478 lines
18 KiB
Python
478 lines
18 KiB
Python
import calendar
|
|
import json
|
|
import logging
|
|
import re
|
|
|
|
from django.utils import timezone
|
|
|
|
from apps.core.ai_service import AIService
|
|
from apps.organizations.models import Hospital
|
|
|
|
from .data_sources import get_data_source
|
|
from .models import (
|
|
Presentation,
|
|
PresentationTheme,
|
|
PresentationStatus,
|
|
ReportTemplate,
|
|
Slide,
|
|
SlideLayout,
|
|
)
|
|
|
|
# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TemplateReportGenerator:
    """Render a ``ReportTemplate`` into a ``Presentation`` with its ``Slide`` rows.

    The generator fetches data through the template's data source, optionally
    asks ``AIService`` for insights, and then walks the template slides in
    ``order``, building each slide's ``content`` dict according to its layout.

    Builder methods never mutate the template's ``content_mapping``: the same
    mapping is reused for every repeated slide, so they work on copies.
    """

    def __init__(self, template_id, hospital_id, year, quarter=None, created_by=None):
        """Load the active template and store the report parameters.

        Args:
            template_id: Primary key of the ``ReportTemplate`` to render.
            hospital_id: Primary key of the ``Hospital`` the report is for.
            year: Reporting year; passed to the data source and used in labels.
            quarter: Optional quarter; when falsy the period label is the year.
            created_by: Optional object stored as the presentation's
                ``created_by``; cover slides call its ``get_full_name()``.

        Raises:
            ReportTemplate.DoesNotExist: If no active template has this pk.
        """
        self.template = ReportTemplate.objects.get(pk=template_id, active=True)
        self.hospital_id = hospital_id
        self.year = year
        self.quarter = quarter
        self.created_by = created_by
        self._data = {}
        self._ai_insights = {}
        # Filled in by generate() / _get_hospital(); declared here so every
        # method can rely on the attribute existing.
        self._hospital = None

    def generate(self):
        """Create the presentation and all of its slides.

        Returns:
            The newly created ``Presentation``.

        Raises:
            Hospital.DoesNotExist: If ``hospital_id`` matches no hospital.
        """
        # Function-scope import, like the one in _get_dept_manager.
        from django.db import transaction

        source_class = get_data_source(self.template.data_source)
        source = source_class(
            hospital_id=self.hospital_id,
            year=self.year,
            quarter=self.quarter,
        )
        self._data = source.fetch()

        # The AI request is made before any database write, so it is not
        # executed inside the transaction opened below.
        if self.template.ai_prompt_template:
            self._ai_insights = self._generate_ai_insights(source)

        hospital = Hospital.objects.get(pk=self.hospital_id)
        self._hospital = hospital
        period_label = self._period_label()

        # The presentation is created already PUBLISHED and shared, so a
        # failure while building slides must not leave a partial deck behind.
        with transaction.atomic():
            presentation = Presentation.objects.create(
                title=self._resolve_template(
                    self.template.name,
                    {"period_label": period_label, "hospital": hospital.name},
                ),
                subtitle=hospital.name,
                description=f"Generated from template: {self.template.name}",
                theme=PresentationTheme.HEALTHCARE_MODERN,
                status=PresentationStatus.PUBLISHED,
                presentation_type=self.template.slug,
                created_by=self.created_by,
                hospital=hospital,
                presentation_date=timezone.now().date(),
                is_shared=True,
            )

            order = 0
            for ts in self.template.template_slides.order_by("order"):
                if ts.repeat_source:
                    # One template slide may expand into many; the helper
                    # returns the next free position.
                    order = self._create_repeated_slides(presentation, ts, order)
                else:
                    self._create_slide_from_template(presentation, ts, order, {})
                    order += 1

        return presentation

    def _period_label(self):
        """Return ``"Q<quarter>.<year>"`` when a quarter is set, else the year."""
        if self.quarter:
            return f"Q{self.quarter}.{self.year}"
        return str(self.year)

    def _get_hospital(self):
        """Return the report's hospital, querying at most once per success.

        Returns ``None`` when ``hospital_id`` matches no hospital.
        """
        hospital = getattr(self, "_hospital", None)
        if hospital is None:
            hospital = Hospital.objects.filter(pk=self.hospital_id).first()
            self._hospital = hospital
        return hospital

    def _resolve_template(self, template_str, extra_vars=None):
        """Substitute ``{{ name }}`` / ``{{ a.b }}`` placeholders in a string.

        Built-in variables are ``period_label``, ``year``, ``quarter`` and
        ``hospital``; ``extra_vars`` (when it is a dict) overrides/extends them.

        A name that is missing resolves to an empty string. A dotted path that
        tries to descend into a non-dict value is left in the text unchanged.

        Args:
            template_str: Text containing placeholders; falsy yields ``""``.
            extra_vars: Optional dict of additional variables. Non-dict values
                are ignored, because section-divider slides pass raw item data
                here and that may be a list.

        Returns:
            The text with placeholders replaced.
        """
        if not template_str:
            return ""

        hospital = getattr(self, "_hospital", None)
        variables = {
            "period_label": self._period_label(),
            "year": self.year,
            "quarter": self.quarter or "",
            "hospital": hospital.name if hospital else "",
        }
        if isinstance(extra_vars, dict):
            variables.update(extra_vars)

        def replacer(match):
            value = variables
            for part in match.group(1).strip().split("."):
                if not isinstance(value, dict):
                    # Cannot descend further: keep the placeholder verbatim.
                    return match.group(0)
                value = value.get(part, "")
            return str(value)

        return re.sub(r'\{\{\s*([^}]+)\s*\}\}', replacer, template_str)

    def _resolve_data_path(self, path, context=None):
        """Look up a dotted path such as ``"a.b.0"`` in nested dicts/lists.

        Args:
            path: Dotted path; list/tuple levels are indexed with integers.
            context: Object to search. Only ``None`` means "use the fetched
                report data"; an empty dict or list is searched as given
                rather than silently replaced by the global data.

        Returns:
            ``""`` for a falsy path, the resolved value, or ``None`` when any
            step cannot be resolved.
        """
        if not path:
            return ""

        value = self._data if context is None else context
        for part in path.strip().split("."):
            if isinstance(value, dict):
                value = value.get(part)
            elif isinstance(value, (list, tuple)):
                try:
                    value = value[int(part)]
                except (ValueError, IndexError):
                    return None
            else:
                return None
        return value

    def _get_dept_manager(self, dept_name):
        """Return the display name of the manager of the named department.

        Uses the manager's ``get_full_name()``, falling back to ``str()`` when
        that is empty. Returns ``""`` when no department or manager is found.
        """
        from apps.organizations.models import Department

        # NOTE(review): the lookup matches on name only and is not restricted
        # to self.hospital_id - confirm department names are unique across
        # hospitals, or scope this query.
        dept = Department.objects.filter(name__iexact=dept_name).first()
        if dept and dept.manager:
            return dept.manager.get_full_name() or str(dept.manager)
        return ""

    def _fallback_ai_insights(self):
        """Return the static insights used when the AI request is unusable."""
        period = self._period_label()
        return {
            "key_finding": f"Report data for {period} has been processed.",
            "analysis": f"Automated analysis for {period}.",
            "recommendations": [
                "Review the data and identify areas for improvement",
                "Share findings with department heads",
                "Schedule follow-up review within 1 month",
            ],
            "next_steps": [
                "Schedule review meeting within 2 weeks",
                "Develop action plan within 1 month",
            ],
        }

    def _generate_ai_insights(self, source):
        """Ask ``AIService`` for JSON insights about the fetched data.

        The template's prompt has ``{{ data_summary }}`` replaced by the data
        source's summary; if the prompt has no such placeholder the summary is
        appended under a ``Data:`` heading.

        Args:
            source: Data source instance providing ``get_data_summary``.

        Returns:
            The decoded JSON object, or the static fallback insights when the
            request fails, the reply is not valid JSON, or the reply is not a
            JSON object.
        """
        placeholder = "{{ data_summary }}"
        prompt_template = self.template.ai_prompt_template
        data_summary = source.get_data_summary(self._data)

        if placeholder in prompt_template:
            prompt = prompt_template.replace(placeholder, data_summary)
        else:
            prompt = f"{prompt_template}\n\nData:\n{data_summary}"

        try:
            response = AIService.chat_completion(
                prompt=prompt,
                system_prompt="You are a healthcare quality analyst. Respond only in valid JSON.",
                response_format="json_object",
                temperature=0.4,
                max_tokens=1500,
            )
            insights = json.loads(response)
            # Later code does key lookups on the insights, so anything other
            # than a JSON object is treated like a failed request.
            if not isinstance(insights, dict):
                raise ValueError("AI response is not a JSON object")
            return insights
        except Exception as exc:
            # Deliberate best effort: the report is still generated without AI.
            logger.warning("AI insights generation failed: %s", exc)
            return self._fallback_ai_insights()

    def _build_slide_content(self, ts, item_data=None):
        """Build the ``content`` dict for a template slide based on its layout.

        Args:
            ts: Template slide; ``layout`` selects the builder and
                ``content_mapping`` configures it.
            item_data: Data of the current repeated item, or ``None``.

        Returns:
            The content dict. Layouts without a dedicated builder get the
            mapping itself.
        """
        mapping = ts.content_mapping or {}
        layout = ts.layout

        if layout == SlideLayout.KPI_DASHBOARD:
            return self._build_kpi_content(mapping, item_data)
        if layout == SlideLayout.DATA_TABLE:
            return self._build_table_content(mapping, item_data, ts)
        if layout in (SlideLayout.FULL_CHART, SlideLayout.CHART_METRICS):
            return self._build_chart_content(mapping, item_data)
        if layout == SlideLayout.TWO_COLUMN:
            return self._build_two_column_content(mapping, item_data)
        if layout == SlideLayout.QUOTE:
            return self._build_quote_content(mapping, item_data)
        if layout == SlideLayout.TEAM_GRID:
            return self._build_team_grid_content(mapping, item_data)
        if layout == SlideLayout.COVER:
            return self._build_cover_content(mapping, item_data)
        if layout == SlideLayout.CLOSING:
            return self._build_closing_content(mapping, item_data)
        if layout == SlideLayout.SECTION_DIVIDER:
            return {
                "section_number": ts.section_label,
                "section_label": self._resolve_template(
                    mapping.get("section_label", ""), item_data
                ),
            }
        return mapping

    def _build_kpi_content(self, mapping, item_data):
        """Build KPI metrics, resolving ``value_path``/``description_path``.

        Each metric dict is copied; the ``*_path`` keys are removed from the
        copy and, when the path resolves to something other than ``None``,
        replaced by stringified ``value`` / ``description`` entries.

        Returns:
            ``{"metrics": [...]}``.
        """
        metrics = []
        for spec in mapping.get("metrics") or []:
            metric = dict(spec)
            for path_key, target_key in (
                ("value_path", "value"),
                ("description_path", "description"),
            ):
                if path_key in metric:
                    resolved = self._resolve_data_path(metric.pop(path_key), item_data)
                    if resolved is not None:
                        metric[target_key] = str(resolved)
            metrics.append(metric)
        return {"metrics": metrics}

    @staticmethod
    def _table_row_class(item, row_color_config, top_value):
        """Return the row colour class for one table item (``""`` for none).

        ``top_value`` is the highest value of the configured field among the
        rows being rendered, or ``None`` when top highlighting is off or no
        row has a numeric value. Otherwise the first matching rule wins.
        """
        if not (isinstance(item, dict) and row_color_config):
            return ""

        field = row_color_config.get("field", "avg_rating")
        value = item.get(field, 0)
        if not isinstance(value, (int, float)):
            return ""

        # Rows that lack the field fall back to 0 for the rules below, but
        # must not be reported as the top row.
        if top_value is not None and field in item and value == top_value:
            return row_color_config.get("highlight_class", "top")

        for rule in row_color_config.get("rules", []):
            op = rule.get("op", "gte")
            threshold = rule.get("value", 0)
            if not isinstance(threshold, (int, float)):
                # A non-numeric threshold cannot be compared; skip the rule.
                continue
            matched = (
                (op == "gte" and value >= threshold)
                or (op == "lt" and value < threshold)
                or (op == "gt" and value > threshold)
                or (op == "lte" and value <= threshold)
            )
            if matched:
                return rule.get("class", "")
        return ""

    @staticmethod
    def _build_table_cell(cell_tpl, item, row_class):
        """Build one table cell dict from its cell template and row item."""
        cell = {}
        if row_class:
            cell["row_bg"] = row_class

        field = cell_tpl.get("field", "")
        cell_type = cell_tpl.get("type", "text")

        if field and isinstance(item, dict):
            raw_val = item.get(field, "")
            if cell_type == "rating_bar" and isinstance(raw_val, (int, float)):
                cell["rating_bar"] = {
                    "value": f"{raw_val:.2f}",
                    # Percentage of a 5-point scale.
                    "pct": round(raw_val / 5 * 100),
                }
            else:
                # A stored None is shown as an empty cell, not the text "None".
                cell["text"] = "" if raw_val is None else str(raw_val)
        elif cell_type == "text" and "text" in cell_tpl:
            cell["text"] = cell_tpl["text"]

        if cell_tpl.get("font_weight"):
            cell["font_weight"] = cell_tpl["font_weight"]
        return cell

    def _build_table_content(self, mapping, item_data, ts):
        """Build ``headers`` and ``rows`` for a data-table slide.

        Args:
            mapping: Content mapping with ``headers``, ``row_template`` and an
                optional ``row_color`` config.
            item_data: A list/tuple of row items, or a dict holding them
                under ``"items"``. Anything else produces no rows.
            ts: Template slide; ``style_overrides["row_color"]`` is used when
                the mapping has no ``row_color`` key.

        Returns:
            ``{"headers": [...], "rows": [[cell, ...], ...]}``. When
            ``highlight_top`` is set, the top value is computed over the items
            passed in, so a caller that paginates gets a top row per page.
        """
        headers = mapping.get("headers", [])

        items = item_data
        if isinstance(item_data, dict) and "items" in item_data:
            items = item_data["items"]
        if not isinstance(items, (list, tuple)):
            return {"headers": headers, "rows": []}

        row_template = mapping.get("row_template") or []
        if "row_color" in mapping:
            row_color_config = mapping["row_color"]
        else:
            row_color_config = (ts.style_overrides or {}).get("row_color")
        row_color_config = row_color_config or {}

        top_value = None
        if row_color_config.get("highlight_top"):
            field = row_color_config.get("field", "avg_rating")
            numeric_values = [
                item[field]
                for item in items
                if isinstance(item, dict) and isinstance(item.get(field), (int, float))
            ]
            top_value = max(numeric_values, default=None)

        rows = []
        for item in items:
            row_class = self._table_row_class(item, row_color_config, top_value)
            rows.append(
                [self._build_table_cell(cell_tpl, item, row_class) for cell_tpl in row_template]
            )
        return {"headers": headers, "rows": rows}

    def _build_chart_content(self, mapping, item_data):
        """Build chart content, resolving ``series_path``/``categories_path``.

        The nested ``chart_config`` (and its ``xaxis``) are copied before being
        changed, so the template mapping keeps its paths for the next slide.

        Returns:
            A copy of the mapping whose ``chart_config`` has the path keys
            removed and, where they resolved, ``series`` and
            ``xaxis.categories`` filled in.
        """
        result = dict(mapping)
        chart_config = result.get("chart_config")
        if not isinstance(chart_config, dict):
            return result

        chart = dict(chart_config)
        result["chart_config"] = chart

        if "series_path" in chart:
            series_data = self._resolve_data_path(chart.pop("series_path"), item_data)
            if series_data is not None:
                chart["series"] = [{"name": "Data", "data": series_data}]

        if "categories_path" in chart:
            categories = self._resolve_data_path(chart.pop("categories_path"), item_data)
            if categories is not None:
                existing_xaxis = chart.get("xaxis")
                xaxis = dict(existing_xaxis) if isinstance(existing_xaxis, dict) else {}
                xaxis["categories"] = categories
                chart["xaxis"] = xaxis

        return result

    def _build_two_column_content(self, mapping, item_data):
        """Build two-column content from static keys and data paths.

        ``left_body_path`` fills ``left_body`` with a one-element list when it
        resolves to a truthy value; a static ``left_body`` in the mapping takes
        precedence. Bullets with a ``text_path`` are resolved to ``{"text":
        ...}`` (empty text when unresolved); other bullets are passed through.
        The mapping itself is left untouched.
        """
        result = {}

        if "left_body_path" in mapping:
            left_body = self._resolve_data_path(mapping["left_body_path"], item_data)
            if left_body:
                result["left_body"] = [str(left_body)]

        if "right_bullets" in mapping:
            bullets = []
            for bullet in mapping["right_bullets"] or []:
                if isinstance(bullet, dict) and "text_path" in bullet:
                    text = self._resolve_data_path(bullet["text_path"], item_data)
                    bullets.append({"text": str(text) if text else ""})
                else:
                    bullets.append(bullet)
            result["right_bullets"] = bullets

        for key in ("left_title", "right_title", "left_body"):
            if key in mapping:
                result[key] = mapping[key]

        return result

    def _build_quote_content(self, mapping, item_data):
        """Build quote content from ``quote_path`` and/or an AI insight.

        ``quote_path`` and ``ai_key`` are removed from the returned copy. A
        truthy value at ``quote_path`` becomes the quote (as a string); when
        ``ai_key`` names an existing AI insight, that insight replaces it and
        is stored as-is.
        """
        result = dict(mapping)

        if "quote_path" in result:
            quote = self._resolve_data_path(result.pop("quote_path"), item_data)
            if quote:
                result["quote"] = str(quote)

        # Keep the popped key in a local: it is no longer in ``result``.
        ai_key = result.pop("ai_key", None)
        if ai_key is not None and ai_key in self._ai_insights:
            result["quote"] = self._ai_insights[ai_key]

        return result

    def _build_team_grid_content(self, mapping, item_data):
        """Build the member cards of a team-grid slide.

        Items come from ``source_path`` (resolved against ``item_data`` or the
        report data) or, without a path, from ``item_data`` itself; a dict is
        replaced by its values and anything that is not a list/tuple counts as
        empty. Items are optionally sorted by ``sort_by`` (descending unless
        ``sort_reverse`` is false, missing/``None`` values counting as 0) and
        cut to ``limit`` (default 10). Non-dict items are skipped.

        Returns:
            ``{"members": [...]}``, with a single placeholder member when
            nothing could be listed.
        """
        source_path = mapping.get("source_path", "")
        items = self._resolve_data_path(source_path, item_data) if source_path else item_data
        if isinstance(items, dict):
            items = list(items.values())
        if not isinstance(items, (list, tuple)):
            items = []

        limit = mapping.get("limit", 10)
        sort_by = mapping.get("sort_by", "")
        reverse = mapping.get("sort_reverse", True)

        if sort_by:
            def sort_key(entry):
                value = entry.get(sort_by, 0) if isinstance(entry, dict) else 0
                return 0 if value is None else value

            items = sorted(items, key=sort_key, reverse=reverse)

        members = []
        for item in items[:limit]:
            if not isinstance(item, dict):
                continue
            name = item.get("name", "")
            metric_value = item.get(sort_by, "")
            members.append({
                "name": name,
                "role": item.get("department", ""),
                "metric_value": "" if metric_value is None else str(metric_value),
                "metric_label": mapping.get("metric_label", ""),
                "initials": name[0].upper() if name else "?",
            })

        if not members:
            members.append({"name": "No data available", "role": "N/A", "initials": "-"})

        return {"members": members}

    def _build_cover_content(self, mapping, item_data):
        """Build cover content: mapping plus ``prepared_by``/``hospital_name``.

        ``prepared_by`` is only added when the mapping does not define it and
        ``created_by`` is set. ``item_data`` is not used.
        """
        result = dict(mapping)
        if "prepared_by" not in result and self.created_by:
            result["prepared_by"] = self.created_by.get_full_name()

        hospital = self._get_hospital()
        if hospital:
            result["hospital_name"] = hospital.name
        return result

    def _build_closing_content(self, mapping, item_data):
        """Build closing content: mapping plus a ``contact`` list.

        ``contact`` holds the hospital's e-mail and ``"Phone: <phone>"`` for
        whichever of the two is set (empty list when there is no hospital).
        ``item_data`` is not used.
        """
        result = dict(mapping)
        contact = []

        hospital = self._get_hospital()
        if hospital:
            if hospital.email:
                contact.append(hospital.email)
            if hospital.phone:
                contact.append(f"Phone: {hospital.phone}")

        result["contact"] = contact
        return result

    def _create_slide_from_template(self, presentation, ts, order, item_vars):
        """Create one ``Slide`` from a template slide.

        Args:
            presentation: Presentation the slide belongs to.
            ts: Template slide providing layout, text templates and mapping.
            order: Position of the slide in the presentation.
            item_vars: Extra template variables; its ``"_item_data"`` entry,
                if any, is the data context for the content builders.

        Returns:
            The created ``Slide``.
        """
        title = self._resolve_template(ts.title_template, item_vars)
        subtitle = self._resolve_template(ts.subtitle_template, item_vars)
        notes = self._resolve_template(ts.speaker_notes_template, item_vars)

        item_data = item_vars.get("_item_data") if item_vars else None
        content = self._build_slide_content(ts, item_data)

        # An AI insight only fills in a subtitle / notes that are still empty.
        ai_key = (ts.content_mapping or {}).get("ai_key", "")
        if ai_key and ai_key in self._ai_insights:
            insight_text = str(self._ai_insights[ai_key])
            if not subtitle:
                subtitle = insight_text
            if not notes:
                notes = insight_text

        return Slide.objects.create(
            presentation=presentation,
            layout=ts.layout,
            order=order,
            title=title,
            subtitle=subtitle,
            content=content,
            speaker_notes=notes,
        )

    def _create_paged_table_slides(self, presentation, ts, order, dept_name, rows):
        """Create the table slide(s) for one repeated item, ``max_rows`` per page.

        Title and subtitle go through ``_resolve_template`` with ``item``,
        ``department_name``, ``manager`` and ``item_count`` available. Without
        a subtitle template the subtitle is ``"Head of Department: <manager>"``
        when a manager is known. Pages after the first get a
        ``"Page X of Y"`` suffix.

        Returns:
            The next free slide position.
        """
        total = len(rows)
        if not total:
            return order

        max_rows = ts.max_rows
        if not isinstance(max_rows, int) or max_rows < 1:
            # No usable page size configured: put everything on one slide.
            max_rows = total
        total_pages = (total - 1) // max_rows + 1

        manager = self._get_dept_manager(dept_name)
        template_vars = {
            "item": dept_name,
            "department_name": dept_name,
            "manager": manager,
            "item_count": total,
        }
        title = self._resolve_template(ts.title_template, template_vars)
        if ts.repeat_subtitle_template:
            base_subtitle = self._resolve_template(ts.repeat_subtitle_template, template_vars)
        elif manager:
            base_subtitle = f"Head of Department: {manager}"
        else:
            base_subtitle = ""

        mapping = ts.content_mapping or {}
        for start in range(0, total, max_rows):
            subtitle = base_subtitle
            if start > 0:
                page_label = f"Page {start // max_rows + 1} of {total_pages}"
                subtitle = f"{subtitle} | {page_label}" if subtitle else page_label

            content = self._build_table_content(
                mapping, {"items": rows[start:start + max_rows]}, ts
            )
            Slide.objects.create(
                presentation=presentation,
                layout=ts.layout,
                order=order,
                title=title,
                subtitle=subtitle,
                content=content,
            )
            order += 1
        return order

    def _create_repeated_slides(self, presentation, ts, order):
        """Expand a template slide once per entry of its ``repeat_source``.

        ``repeat_source`` is resolved against the report data. A dict yields
        ``(key, value)`` entries and a list ``(index, value)`` entries; any
        other value creates nothing. For data-table layouts each entry (a list
        of rows, or a single dict row) is paginated; other layouts get one
        slide per entry with ``department_name``, ``item_count`` and ``item``
        available as template variables.

        Args:
            presentation: Presentation the slides belong to.
            ts: Template slide to repeat.
            order: First free slide position.

        Returns:
            The next free slide position.
        """
        repeat_data = self._resolve_data_path(ts.repeat_source)
        if isinstance(repeat_data, dict):
            entries = list(repeat_data.items())
        elif isinstance(repeat_data, list):
            entries = list(enumerate(repeat_data))
        else:
            return order

        for key, item in entries:
            dept_name = str(key)

            if ts.layout == SlideLayout.DATA_TABLE:
                if isinstance(item, list):
                    rows = item
                elif isinstance(item, dict):
                    rows = [item]
                else:
                    continue
                order = self._create_paged_table_slides(
                    presentation, ts, order, dept_name, rows
                )
            else:
                item_vars = {
                    "department_name": dept_name,
                    "item_count": len(item) if isinstance(item, list) else 1,
                    "item": item,
                    "_item_data": item,
                }
                self._create_slide_from_template(presentation, ts, order, item_vars)
                order += 1

        return order
|