HH/apps/presentations/template_generator.py
ismail c5f76b3855
Some checks are pending
Build and Push Docker Image / build (push) Waiting to run
updates
2026-05-11 14:45:30 +03:00

478 lines
18 KiB
Python

import calendar
import json
import logging
import re
from django.utils import timezone
from apps.core.ai_service import AIService
from apps.organizations.models import Hospital
from .data_sources import get_data_source
from .models import (
Presentation,
PresentationTheme,
PresentationStatus,
ReportTemplate,
Slide,
SlideLayout,
)
# Module-level logger named after this module; used below to report AI
# insight generation failures.
logger = logging.getLogger(__name__)
class TemplateReportGenerator:
    """Generate a ``Presentation`` and its ``Slide`` rows from a ``ReportTemplate``.

    The generator fetches data through the template's data source, optionally
    asks the AI service for narrative insights, and then walks the template's
    slides in order, turning each one into one or more concrete slides.

    Template strings use ``{{ dotted.path }}`` placeholders (see
    ``_resolve_template``); content mappings reference fetched data through
    dotted paths (see ``_resolve_data_path``).
    """

    # Placeholder syntax shared by every template string.
    _PLACEHOLDER_RE = re.compile(r'\{\{\s*([^}]+)\s*\}\}')

    # Comparison operators accepted in ``row_color`` rules of table slides.
    # An unknown operator never matches.
    _ROW_RULE_OPS = {
        "gte": lambda value, threshold: value >= threshold,
        "gt": lambda value, threshold: value > threshold,
        "lte": lambda value, threshold: value <= threshold,
        "lt": lambda value, threshold: value < threshold,
    }

    # Upper bound used to convert a rating into a percentage for rating bars.
    _RATING_SCALE = 5

    def __init__(self, template_id, hospital_id, year, quarter=None, created_by=None):
        """Load the active template and remember the reporting scope.

        Raises:
            ReportTemplate.DoesNotExist: no active template has ``template_id``.
        """
        self.template = ReportTemplate.objects.get(pk=template_id, active=True)
        self.hospital_id = hospital_id
        self.year = year
        self.quarter = quarter
        self.created_by = created_by
        self._data = {}
        self._ai_insights = {}
        # Filled in by generate() / _get_hospital() once the hospital is loaded.
        self._hospital = None

    def generate(self):
        """Fetch the data, create the presentation and all of its slides.

        The presentation and its slides are written inside one transaction so
        that a failure while building a slide does not leave a published,
        shared presentation with only part of its slides.

        Returns:
            The created ``Presentation``.
        """
        # Local import: keeps the module's import block untouched.
        from django.db import transaction

        source_class = get_data_source(self.template.data_source)
        source = source_class(
            hospital_id=self.hospital_id,
            year=self.year,
            quarter=self.quarter,
        )
        self._data = source.fetch()
        if self.template.ai_prompt_template:
            self._ai_insights = self._generate_ai_insights(source)

        hospital = Hospital.objects.get(pk=self.hospital_id)
        self._hospital = hospital
        period_label = self._period_label()

        with transaction.atomic():
            presentation = Presentation.objects.create(
                title=self._resolve_template(
                    self.template.name,
                    {"period_label": period_label, "hospital": hospital.name},
                ),
                subtitle=hospital.name,
                description=f"Generated from template: {self.template.name}",
                theme=PresentationTheme.HEALTHCARE_MODERN,
                status=PresentationStatus.PUBLISHED,
                presentation_type=self.template.slug,
                created_by=self.created_by,
                hospital=hospital,
                presentation_date=timezone.now().date(),
                is_shared=True,
            )
            order = 0
            for ts in self.template.template_slides.order_by("order"):
                if ts.repeat_source:
                    order = self._create_repeated_slides(presentation, ts, order)
                else:
                    self._create_slide_from_template(presentation, ts, order, {})
                    order += 1
        return presentation

    def _period_label(self):
        """Return ``"Q<quarter>.<year>"`` when a quarter is set, else the year."""
        if self.quarter:
            return f"Q{self.quarter}.{self.year}"
        return str(self.year)

    def _get_hospital(self):
        """Return the report's hospital, loading it at most once when it exists."""
        hospital = getattr(self, "_hospital", None)
        if hospital is None:
            hospital = Hospital.objects.filter(pk=self.hospital_id).first()
            self._hospital = hospital
        return hospital

    def _resolve_template(self, template_str, extra_vars=None):
        """Substitute ``{{ name }}`` / ``{{ a.b }}`` placeholders in a string.

        Built-in variables are ``period_label``, ``year``, ``quarter`` and
        ``hospital``; ``extra_vars`` (a dict) is layered on top. Anything that
        is not a dict is ignored, because callers may pass raw item data that
        happens to be a list.

        A missing key resolves to an empty string. A dotted path that walks
        into a non-dict value leaves the placeholder text untouched.
        """
        if not template_str:
            return ""
        hospital = getattr(self, "_hospital", None)
        variables = {
            "period_label": self._period_label(),
            "year": self.year,
            "quarter": self.quarter or "",
            "hospital": hospital.name if hospital else "",
        }
        if isinstance(extra_vars, dict):
            variables.update(extra_vars)

        def replacer(match):
            value = variables
            for part in match.group(1).strip().split("."):
                if not isinstance(value, dict):
                    return match.group(0)
                value = value.get(part, "")
            return str(value)

        return self._PLACEHOLDER_RE.sub(replacer, template_str)

    def _resolve_data_path(self, path, context=None):
        """Follow a dotted ``path`` through nested dicts, lists and tuples.

        Args:
            path: dotted path; numeric segments index into lists/tuples.
            context: object to start from. Only ``None`` selects the fetched
                report data, so an empty per-item context is not silently
                replaced by the global data set.

        Returns:
            The value found, ``None`` when the path cannot be followed, or
            ``""`` when ``path`` itself is empty.
        """
        if not path:
            return ""
        value = self._data if context is None else context
        for part in path.strip().split("."):
            if isinstance(value, dict):
                value = value.get(part)
            elif isinstance(value, (list, tuple)):
                try:
                    value = value[int(part)]
                except (ValueError, IndexError):
                    return None
            else:
                return None
        return value

    def _get_dept_manager(self, dept_name):
        """Return the display name of the manager of ``dept_name``, or ``""``.

        NOTE(review): the lookup matches on name only and is not restricted to
        ``self.hospital_id`` — confirm whether department names are unique
        across hospitals.
        """
        from apps.organizations.models import Department

        dept = Department.objects.filter(name__iexact=dept_name).first()
        if dept and dept.manager:
            return dept.manager.get_full_name() or str(dept.manager)
        return ""

    def _fallback_insights(self):
        """Return the static insights used when the AI service is unusable."""
        period = self._period_label()
        return {
            "key_finding": f"Report data for {period} has been processed.",
            "analysis": f"Automated analysis for {period}.",
            "recommendations": [
                "Review the data and identify areas for improvement",
                "Share findings with department heads",
                "Schedule follow-up review within 1 month",
            ],
            "next_steps": [
                "Schedule review meeting within 2 weeks",
                "Develop action plan within 1 month",
            ],
        }

    def _generate_ai_insights(self, source):
        """Ask the AI service for insights about the fetched data.

        The data summary replaces ``{{ data_summary }}`` in the template's
        prompt, or is appended when the prompt has no such placeholder.
        Generation is best effort: any failure, or a response that is not a
        JSON object, is logged and replaced by static fallback insights.
        """
        placeholder = "{{ data_summary }}"
        prompt_template = self.template.ai_prompt_template
        data_summary = source.get_data_summary(self._data)
        if placeholder in prompt_template:
            prompt = prompt_template.replace(placeholder, data_summary)
        else:
            prompt = f"{prompt_template}\n\nData:\n{data_summary}"

        try:
            response = AIService.chat_completion(
                prompt=prompt,
                system_prompt="You are a healthcare quality analyst. Respond only in valid JSON.",
                response_format="json_object",
                temperature=0.4,
                max_tokens=1500,
            )
            insights = json.loads(response)
        except Exception as exc:  # best effort: the report must still be generated
            logger.warning("AI insights generation failed: %s", exc)
            return self._fallback_insights()

        # Callers look insights up by key, so anything but an object is unusable.
        if not isinstance(insights, dict):
            logger.warning(
                "AI insights generation failed: expected a JSON object, got %s",
                type(insights).__name__,
            )
            return self._fallback_insights()
        return insights

    def _build_slide_content(self, ts, item_data=None):
        """Build the ``content`` payload for a template slide based on its layout.

        Layouts without a dedicated builder get the raw content mapping.
        """
        mapping = ts.content_mapping or {}
        layout = ts.layout
        if layout == SlideLayout.KPI_DASHBOARD:
            return self._build_kpi_content(mapping, item_data)
        if layout == SlideLayout.DATA_TABLE:
            return self._build_table_content(mapping, item_data, ts)
        if layout in (SlideLayout.FULL_CHART, SlideLayout.CHART_METRICS):
            return self._build_chart_content(mapping, item_data)
        if layout == SlideLayout.TWO_COLUMN:
            return self._build_two_column_content(mapping, item_data)
        if layout == SlideLayout.QUOTE:
            return self._build_quote_content(mapping, item_data)
        if layout == SlideLayout.TEAM_GRID:
            return self._build_team_grid_content(mapping, item_data)
        if layout == SlideLayout.COVER:
            return self._build_cover_content(mapping, item_data)
        if layout == SlideLayout.CLOSING:
            return self._build_closing_content(mapping, item_data)
        if layout == SlideLayout.SECTION_DIVIDER:
            return {
                "section_number": ts.section_label,
                "section_label": self._resolve_template(
                    mapping.get("section_label", ""), item_data
                ),
            }
        return mapping

    def _build_kpi_content(self, mapping, item_data):
        """Resolve ``value_path`` / ``description_path`` of every KPI metric.

        Each metric is copied before its ``*_path`` keys are removed, so the
        template's mapping is left intact.
        """
        path_targets = (("value_path", "value"), ("description_path", "description"))
        metrics = []
        for template_metric in mapping.get("metrics", []):
            metric = dict(template_metric)
            for path_key, target_key in path_targets:
                if path_key in metric:
                    resolved = self._resolve_data_path(metric.pop(path_key), item_data)
                    if resolved is not None:
                        metric[target_key] = str(resolved)
            metrics.append(metric)
        return {"metrics": metrics}

    def _build_table_content(self, mapping, item_data, ts):
        """Build ``{"headers": [...], "rows": [[cell, ...], ...]}`` for a table.

        ``item_data`` is either the list of row items or a dict holding them
        under ``"items"``. Row colouring comes from ``mapping["row_color"]``
        and falls back to ``ts.style_overrides["row_color"]``.
        """
        headers = mapping.get("headers", [])
        items = item_data
        if isinstance(item_data, dict) and "items" in item_data:
            items = item_data["items"]
        if not isinstance(items, (list, tuple)):
            return {"headers": headers, "rows": []}

        row_template = mapping.get("row_template", [])
        if "row_color" in mapping:
            row_color = mapping["row_color"] or {}
        else:
            # style_overrides may be unset on the template slide.
            overrides = getattr(ts, "style_overrides", None) or {}
            row_color = overrides.get("row_color") or {}

        top_value = 0
        if row_color.get("highlight_top"):
            top_value = self._top_numeric_value(items, row_color.get("field", "avg_rating"))

        rows = []
        for item in items:
            row_class = self._row_css_class(item, row_color, top_value)
            rows.append(
                [self._build_table_cell(cell_tpl, item, row_class) for cell_tpl in row_template]
            )
        return {"headers": headers, "rows": rows}

    @staticmethod
    def _top_numeric_value(items, field):
        """Return the largest numeric ``field`` among dict items, never below 0."""
        top_value = 0
        for item in items:
            if isinstance(item, dict):
                candidate = item.get(field, 0)
                if isinstance(candidate, (int, float)) and candidate > top_value:
                    top_value = candidate
        return top_value

    def _row_css_class(self, item, row_color, top_value):
        """Return the highlight class for a table row, or ``""`` for none.

        The top-valued row wins when ``highlight_top`` is set; otherwise the
        first matching rule in ``row_color["rules"]`` decides.
        """
        if not row_color or not isinstance(item, dict):
            return ""
        value = item.get(row_color.get("field", "avg_rating"), 0)
        if not isinstance(value, (int, float)):
            return ""
        if row_color.get("highlight_top") and value == top_value:
            return row_color.get("highlight_class", "top")
        for rule in row_color.get("rules", []):
            compare = self._ROW_RULE_OPS.get(rule.get("op", "gte"))
            if compare is not None and compare(value, rule.get("value", 0)):
                return rule.get("class", "")
        return ""

    def _build_table_cell(self, cell_tpl, item, row_class):
        """Build one table cell dict from its cell template and the row item."""
        cell = {}
        if row_class:
            cell["row_bg"] = row_class
        field = cell_tpl.get("field", "")
        cell_type = cell_tpl.get("type", "text")
        if field and isinstance(item, dict):
            raw_value = item.get(field, "")
            if cell_type == "rating_bar" and isinstance(raw_value, (int, float)):
                cell["rating_bar"] = {
                    "value": f"{raw_value:.2f}",
                    "pct": round(raw_value / self._RATING_SCALE * 100),
                }
            else:
                cell["text"] = str(raw_value)
        elif cell_type == "text" and "text" in cell_tpl:
            # Static cell: literal text taken from the template.
            cell["text"] = cell_tpl["text"]
        if cell_tpl.get("font_weight"):
            cell["font_weight"] = cell_tpl["font_weight"]
        return cell

    def _build_chart_content(self, mapping, item_data):
        """Resolve ``series_path`` / ``categories_path`` inside ``chart_config``.

        ``chart_config`` and its ``xaxis`` are copied before being modified:
        the mapping belongs to the template slide and is reused for every
        repeated slide, so it must keep its ``*_path`` entries.
        """
        result = dict(mapping)
        if not isinstance(result.get("chart_config"), dict):
            return result
        chart = dict(result["chart_config"])
        result["chart_config"] = chart

        if "series_path" in chart:
            series_data = self._resolve_data_path(chart.pop("series_path"), item_data)
            if series_data is not None:
                chart["series"] = [{"name": "Data", "data": series_data}]
        if "categories_path" in chart:
            categories = self._resolve_data_path(chart.pop("categories_path"), item_data)
            if categories is not None:
                xaxis = dict(chart.get("xaxis") or {})
                xaxis["categories"] = categories
                chart["xaxis"] = xaxis
        return result

    def _build_two_column_content(self, mapping, item_data):
        """Build two-column content; a static ``left_body`` overrides a resolved one.

        The mapping is only read, never modified, so repeated slides keep
        their ``left_body_path``.
        """
        result = {}
        if "left_body_path" in mapping:
            left_body = self._resolve_data_path(mapping["left_body_path"], item_data)
            if left_body:
                result["left_body"] = [str(left_body)]
        if "right_bullets" in mapping:
            bullets = []
            for bullet in mapping["right_bullets"]:
                if isinstance(bullet, dict) and "text_path" in bullet:
                    text = self._resolve_data_path(bullet["text_path"], item_data)
                    bullets.append({"text": str(text) if text else ""})
                else:
                    bullets.append(bullet)
            result["right_bullets"] = bullets
        for key in ("left_title", "right_title", "left_body"):
            if key in mapping:
                result[key] = mapping[key]
        return result

    def _build_quote_content(self, mapping, item_data):
        """Build quote content; an AI insight named by ``ai_key`` wins over ``quote_path``.

        ``quote_path`` and ``ai_key`` are configuration keys and are removed
        from the returned content.
        """
        result = dict(mapping)
        if "quote_path" in result:
            quote = self._resolve_data_path(result.pop("quote_path"), item_data)
            if quote:
                result["quote"] = str(quote)
        # Keep the popped key in a local: it is no longer in ``result``.
        ai_key = result.pop("ai_key", None)
        if ai_key is not None and ai_key in self._ai_insights:
            result["quote"] = self._ai_insights[ai_key]
        return result

    def _build_team_grid_content(self, mapping, item_data):
        """Build up to ``limit`` member cards, optionally sorted by ``sort_by``.

        Items come from ``source_path`` when given, otherwise from
        ``item_data``; a dict contributes its values. When nothing usable is
        found a single "No data available" card is returned.
        """
        source_path = mapping.get("source_path", "")
        items = self._resolve_data_path(source_path, item_data) if source_path else item_data
        if isinstance(items, dict):
            items = list(items.values())
        if not isinstance(items, (list, tuple)):
            items = []

        limit = mapping.get("limit", 10)
        sort_by = mapping.get("sort_by", "")
        reverse = mapping.get("sort_reverse", True)
        if sort_by:
            # Missing or None metrics sort as 0 instead of breaking the comparison.
            items = sorted(
                items,
                key=lambda entry: (entry.get(sort_by) or 0) if isinstance(entry, dict) else 0,
                reverse=reverse,
            )

        members = []
        for item in items[:limit]:
            if not isinstance(item, dict):
                continue
            name = str(item.get("name") or "")
            members.append({
                "name": name,
                "role": item.get("department", ""),
                "metric_value": str(item.get(sort_by, "")),
                "metric_label": mapping.get("metric_label", ""),
                "initials": name[0].upper() if name else "?",
            })
        if not members:
            members.append({"name": "No data available", "role": "N/A", "initials": "-"})
        return {"members": members}

    def _build_cover_content(self, mapping, item_data):
        """Add ``prepared_by`` (unless mapped) and ``hospital_name`` to the cover."""
        result = dict(mapping)
        if "prepared_by" not in result and self.created_by:
            # Same fallback as _get_dept_manager when the full name is empty.
            result["prepared_by"] = self.created_by.get_full_name() or str(self.created_by)
        hospital = self._get_hospital()
        if hospital:
            result["hospital_name"] = hospital.name
        return result

    def _build_closing_content(self, mapping, item_data):
        """Set ``contact`` to the hospital's email and phone (when present)."""
        result = dict(mapping)
        contact = []
        hospital = self._get_hospital()
        if hospital:
            if hospital.email:
                contact.append(hospital.email)
            if hospital.phone:
                contact.append(f"Phone: {hospital.phone}")
        result["contact"] = contact
        return result

    def _create_slide_from_template(self, presentation, ts, order, item_vars):
        """Create one slide from ``ts``.

        ``item_vars`` supplies extra template variables; its ``"_item_data"``
        entry, when present, is the data context for the content builders.
        An AI insight named by the mapping's ``ai_key`` fills an empty
        subtitle and empty speaker notes.
        """
        item_vars = item_vars or {}
        title = self._resolve_template(ts.title_template, item_vars)
        subtitle = self._resolve_template(ts.subtitle_template, item_vars)
        notes = self._resolve_template(ts.speaker_notes_template, item_vars)
        content = self._build_slide_content(ts, item_vars.get("_item_data"))

        ai_key = (ts.content_mapping or {}).get("ai_key", "")
        if ai_key and ai_key in self._ai_insights:
            insight = str(self._ai_insights[ai_key])
            subtitle = subtitle or insight
            notes = notes or insight

        return Slide.objects.create(
            presentation=presentation,
            layout=ts.layout,
            order=order,
            title=title,
            subtitle=subtitle,
            content=content,
            speaker_notes=notes,
        )

    def _create_repeated_slides(self, presentation, ts, order):
        """Create slides for every entry of the data found at ``ts.repeat_source``.

        A dict repeats over its ``(key, value)`` pairs and a list over
        ``(index, value)``; anything else produces no slides.

        Returns:
            The next free slide order.
        """
        repeat_data = self._resolve_data_path(ts.repeat_source)
        if not repeat_data:
            return order
        if isinstance(repeat_data, dict):
            entries = list(repeat_data.items())
        elif isinstance(repeat_data, list):
            entries = list(enumerate(repeat_data))
        else:
            return order

        for key, item in entries:
            dept_name = str(key)
            if ts.layout == SlideLayout.DATA_TABLE:
                order = self._create_table_slides(presentation, ts, order, dept_name, item)
            else:
                item_vars = {
                    "department_name": dept_name,
                    "item_count": len(item) if isinstance(item, list) else 1,
                    "item": item,
                    "_item_data": item,
                }
                self._create_slide_from_template(presentation, ts, order, item_vars)
                order += 1
        return order

    def _create_table_slides(self, presentation, ts, order, dept_name, item):
        """Create the paginated table slides for one repeated entry.

        Rows are split into pages of ``ts.max_rows``; a missing or
        non-positive ``max_rows`` puts every row on one slide. Pages after
        the first get a ``" | Page X of Y"`` suffix on the subtitle.

        Title and subtitle go through ``_resolve_template`` like every other
        slide, with ``item`` / ``department_name``, ``manager`` and
        ``item_count`` as extra variables.

        Returns:
            The next free slide order.
        """
        if isinstance(item, list):
            rows = item
        elif isinstance(item, dict):
            rows = [item]
        else:
            return order
        total = len(rows)
        if not total:
            return order

        max_rows = ts.max_rows
        page_size = max_rows if isinstance(max_rows, int) and max_rows > 0 else total
        total_pages = (total - 1) // page_size + 1

        manager = self._get_dept_manager(dept_name)
        variables = {
            "item": dept_name,
            "department_name": dept_name,
            "manager": manager,
            "item_count": total,
        }
        title = self._resolve_template(ts.title_template, variables)
        if ts.repeat_subtitle_template:
            base_subtitle = self._resolve_template(ts.repeat_subtitle_template, variables)
        elif manager:
            base_subtitle = f"Head of Department: {manager}"
        else:
            base_subtitle = ""

        mapping = ts.content_mapping or {}
        for page_index, start in enumerate(range(0, total, page_size)):
            subtitle = base_subtitle
            if page_index > 0:
                subtitle += f" | Page {page_index + 1} of {total_pages}"
            content = self._build_table_content(
                mapping, {"items": rows[start:start + page_size]}, ts
            )
            Slide.objects.create(
                presentation=presentation,
                layout=ts.layout,
                order=order,
                title=title,
                subtitle=subtitle,
                content=content,
            )
            order += 1
        return order