import calendar import json import logging import re from django.utils import timezone from apps.core.ai_service import AIService from apps.organizations.models import Hospital from .data_sources import get_data_source from .models import ( Presentation, PresentationTheme, PresentationStatus, ReportTemplate, Slide, SlideLayout, ) logger = logging.getLogger(__name__) class TemplateReportGenerator: def __init__(self, template_id, hospital_id, year, quarter=None, created_by=None): self.template = ReportTemplate.objects.get(pk=template_id, active=True) self.hospital_id = hospital_id self.year = year self.quarter = quarter self.created_by = created_by self._data = {} self._ai_insights = {} def generate(self): source_class = get_data_source(self.template.data_source) source = source_class( hospital_id=self.hospital_id, year=self.year, quarter=self.quarter, ) self._data = source.fetch() if self.template.ai_prompt_template: self._ai_insights = self._generate_ai_insights(source) hospital = Hospital.objects.get(pk=self.hospital_id) period_label = self._period_label() presentation = Presentation.objects.create( title=self._resolve_template( self.template.name, {"period_label": period_label, "hospital": hospital.name} ), subtitle=hospital.name, description=f"Generated from template: {self.template.name}", theme=PresentationTheme.HEALTHCARE_MODERN, status=PresentationStatus.PUBLISHED, presentation_type=self.template.slug, created_by=self.created_by, hospital=hospital, presentation_date=timezone.now().date(), is_shared=True, ) self._hospital = hospital order = 0 template_slides = self.template.template_slides.order_by("order") for ts in template_slides: if ts.repeat_source: order = self._create_repeated_slides(presentation, ts, order) else: self._create_slide_from_template(presentation, ts, order, {}) order += 1 return presentation def _period_label(self): if self.quarter: return f"Q{self.quarter}.{self.year}" return str(self.year) def _resolve_template(self, template_str, extra_vars=None): if not template_str: return "" hospital_name = "" if hasattr(self, '_hospital') and self._hospital: hospital_name = self._hospital.name variables = { "period_label": self._period_label(), "year": self.year, "quarter": self.quarter or "", "hospital": hospital_name, } if extra_vars: variables.update(extra_vars) def replacer(match): key = match.group(1).strip() parts = key.split(".") val = variables for part in parts: if isinstance(val, dict): val = val.get(part, "") else: return match.group(0) return str(val) return re.sub(r'\{\{\s*([^}]+)\s*\}\}', replacer, template_str) def _resolve_data_path(self, path, context=None): if not path: return "" parts = path.strip().split(".") val = context or self._data for part in parts: if isinstance(val, dict): val = val.get(part) elif isinstance(val, (list, tuple)): try: val = val[int(part)] except (ValueError, IndexError): return None else: return None return val def _get_dept_manager(self, dept_name): from apps.organizations.models import Department dept = Department.objects.filter(name__iexact=dept_name).first() if dept and dept.manager: return dept.manager.get_full_name() or str(dept.manager) return "" def _generate_ai_insights(self, source): data_summary = source.get_data_summary(self._data) prompt = self.template.ai_prompt_template.replace("{{ data_summary }}", data_summary) if "{{ data_summary }}" not in self.template.ai_prompt_template: prompt = f"{prompt}\n\nData:\n{data_summary}" try: response = AIService.chat_completion( prompt=prompt, system_prompt="You are a healthcare quality analyst. Respond only in valid JSON.", response_format="json_object", temperature=0.4, max_tokens=1500, ) return json.loads(response) except Exception as e: logger.warning(f"AI insights generation failed: {e}") return { "key_finding": f"Report data for {self._period_label()} has been processed.", "analysis": f"Automated analysis for {self._period_label()}.", "recommendations": [ "Review the data and identify areas for improvement", "Share findings with department heads", "Schedule follow-up review within 1 month", ], "next_steps": [ "Schedule review meeting within 2 weeks", "Develop action plan within 1 month", ], } def _build_slide_content(self, ts, item_data=None): mapping = ts.content_mapping or {} layout = ts.layout if layout == SlideLayout.KPI_DASHBOARD: return self._build_kpi_content(mapping, item_data) elif layout == SlideLayout.DATA_TABLE: return self._build_table_content(mapping, item_data, ts) elif layout in (SlideLayout.FULL_CHART, SlideLayout.CHART_METRICS): return self._build_chart_content(mapping, item_data) elif layout == SlideLayout.TWO_COLUMN: return self._build_two_column_content(mapping, item_data) elif layout == SlideLayout.QUOTE: return self._build_quote_content(mapping, item_data) elif layout == SlideLayout.TEAM_GRID: return self._build_team_grid_content(mapping, item_data) elif layout == SlideLayout.COVER: return self._build_cover_content(mapping, item_data) elif layout == SlideLayout.CLOSING: return self._build_closing_content(mapping, item_data) elif layout == SlideLayout.SECTION_DIVIDER: return { "section_number": ts.section_label, "section_label": self._resolve_template( mapping.get("section_label", ""), item_data ), } return mapping def _build_kpi_content(self, mapping, item_data): metrics = [] for m in mapping.get("metrics", []): metric = dict(m) if "value_path" in metric: val = self._resolve_data_path(metric.pop("value_path"), item_data) if val is not None: metric["value"] = str(val) if "description_path" in metric: val = self._resolve_data_path(metric.pop("description_path"), item_data) if val is not None: metric["description"] = str(val) metrics.append(metric) return {"metrics": metrics} def _build_table_content(self, mapping, item_data, ts): headers = mapping.get("headers", []) rows = [] items = item_data if isinstance(item_data, dict) and "items" in item_data: items = item_data["items"] if not isinstance(items, (list, tuple)): return {"headers": headers, "rows": rows} row_template = mapping.get("row_template", []) row_color_config = mapping.get("row_color", ts.style_overrides.get("row_color", {})) max_avg = 0 if row_color_config.get("highlight_top"): field = row_color_config.get("field", "avg_rating") for item in items: if isinstance(item, dict): v = item.get(field, 0) if isinstance(v, (int, float)) and v > max_avg: max_avg = v for item in items: row = [] row_class = "" if isinstance(item, dict) and row_color_config: field = row_color_config.get("field", "avg_rating") val = item.get(field, 0) if isinstance(val, (int, float)): is_top = row_color_config.get("highlight_top") and val == max_avg if is_top: row_class = row_color_config.get("highlight_class", "top") else: for rule in row_color_config.get("rules", []): op = rule.get("op", "gte") threshold = rule.get("value", 0) match = False if op == "gte" and val >= threshold: match = True elif op == "lt" and val < threshold: match = True elif op == "gt" and val > threshold: match = True elif op == "lte" and val <= threshold: match = True if match: row_class = rule.get("class", "") break for cell_tpl in row_template: cell = {} if row_class: cell["row_bg"] = row_class field = cell_tpl.get("field", "") cell_type = cell_tpl.get("type", "text") if field and isinstance(item, dict): raw_val = item.get(field, "") if cell_type == "rating_bar" and isinstance(raw_val, (int, float)): cell["rating_bar"] = { "value": f"{raw_val:.2f}", "pct": round(raw_val / 5 * 100), } else: cell["text"] = str(raw_val) elif cell_type == "text" and "text" in cell_tpl: cell["text"] = cell_tpl["text"] if cell_tpl.get("font_weight"): cell["font_weight"] = cell_tpl["font_weight"] row.append(cell) rows.append(row) return {"headers": headers, "rows": rows} def _build_chart_content(self, mapping, item_data): result = dict(mapping) if "chart_config" in result: chart = result["chart_config"] if "series_path" in chart: series_data = self._resolve_data_path(chart.pop("series_path"), item_data) if series_data is not None: chart["series"] = [{"name": "Data", "data": series_data}] if "categories_path" in chart: cats = self._resolve_data_path(chart.pop("categories_path"), item_data) if cats is not None: if "xaxis" not in chart: chart["xaxis"] = {} chart["xaxis"]["categories"] = cats return result def _build_two_column_content(self, mapping, item_data): result = {} if "left_body_path" in mapping: val = self._resolve_data_path(mapping.pop("left_body_path"), item_data) if val: result["left_body"] = [str(val)] if "right_bullets" in mapping: bullets = [] for b in mapping["right_bullets"]: if isinstance(b, dict) and "text_path" in b: val = self._resolve_data_path(b["text_path"], item_data) bullets.append({"text": str(val) if val else ""}) else: bullets.append(b) result["right_bullets"] = bullets for key in ("left_title", "right_title", "left_body"): if key in mapping: result[key] = mapping[key] return result def _build_quote_content(self, mapping, item_data): result = dict(mapping) if "quote_path" in result: val = self._resolve_data_path(result.pop("quote_path"), item_data) if val: result["quote"] = str(val) if "ai_key" in result and result.pop("ai_key") in self._ai_insights: result["quote"] = self._ai_insights[result["ai_key"]] return result def _build_team_grid_content(self, mapping, item_data): members = [] source_path = mapping.get("source_path", "") items = self._resolve_data_path(source_path, item_data) if source_path else item_data if isinstance(items, dict): items = list(items.values()) limit = mapping.get("limit", 10) sort_by = mapping.get("sort_by", "") reverse = mapping.get("sort_reverse", True) if isinstance(items, list) and sort_by: items = sorted(items, key=lambda x: x.get(sort_by, 0) if isinstance(x, dict) else 0, reverse=reverse) for item in (items or [])[:limit]: if isinstance(item, dict): name = item.get("name", "") members.append({ "name": name, "role": item.get("department", ""), "metric_value": str(item.get(sort_by, "")), "metric_label": mapping.get("metric_label", ""), "initials": name[0].upper() if name else "?", }) if not members: members.append({"name": "No data available", "role": "N/A", "initials": "-"}) return {"members": members} def _build_cover_content(self, mapping, item_data): result = dict(mapping) if "prepared_by" not in result and self.created_by: result["prepared_by"] = self.created_by.get_full_name() hospital = Hospital.objects.filter(pk=self.hospital_id).first() if hospital: result["hospital_name"] = hospital.name return result def _build_closing_content(self, mapping, item_data): result = dict(mapping) hospital = Hospital.objects.filter(pk=self.hospital_id).first() contact = [] if hospital: if hospital.email: contact.append(hospital.email) if hospital.phone: contact.append(f"Phone: {hospital.phone}") result["contact"] = contact return result def _create_slide_from_template(self, presentation, ts, order, item_vars): title = self._resolve_template(ts.title_template, item_vars) subtitle = self._resolve_template(ts.subtitle_template, item_vars) notes = self._resolve_template(ts.speaker_notes_template, item_vars) item_data = item_vars.get("_item_data") if item_vars else None content = self._build_slide_content(ts, item_data) ai_key = ts.content_mapping.get("ai_key", "") if ai_key and ai_key in self._ai_insights: if not subtitle: subtitle = str(self._ai_insights[ai_key]) if not notes: notes = str(self._ai_insights[ai_key]) return Slide.objects.create( presentation=presentation, layout=ts.layout, order=order, title=title, subtitle=subtitle, content=content, speaker_notes=notes, ) def _create_repeated_slides(self, presentation, ts, order): repeat_data = self._resolve_data_path(ts.repeat_source) if not repeat_data: return order if isinstance(repeat_data, dict): items = list(repeat_data.items()) elif isinstance(repeat_data, list): items = [(i, item) for i, item in enumerate(repeat_data)] else: return order for key, item in items: dept_name = str(key) if isinstance(key, str) else str(key) is_list = isinstance(item, list) is_dict = isinstance(item, dict) if ts.layout == SlideLayout.DATA_TABLE: if is_list: items_list = item total = len(item) elif is_dict: items_list = [item] total = 1 else: continue manager = self._get_dept_manager(dept_name) max_rows = ts.max_rows for i in range(0, len(items_list), max_rows): chunk = items_list[i:i + max_rows] title = ts.title_template.replace("{{ item }}", dept_name).replace("{{ department_name }}", dept_name) subtitle = ts.repeat_subtitle_template or "" if manager: subtitle = subtitle.replace("{{ manager }}", manager) if subtitle else f"Head of Department: {manager}" subtitle = subtitle.replace("{{ item_count }}", str(total)) if i > 0: page = i // max_rows + 1 total_pages = (len(items_list) - 1) // max_rows + 1 subtitle += f" | Page {page} of {total_pages}" content = self._build_table_content(ts.content_mapping, {"items": chunk}, ts) Slide.objects.create( presentation=presentation, layout=ts.layout, order=order, title=title, subtitle=subtitle, content=content, ) order += 1 else: item_vars = { "department_name": dept_name, "item_count": len(item) if is_list else 1, "item": item, "_item_data": item, } self._create_slide_from_template(presentation, ts, order, item_vars) order += 1 return order