HH/apps/executive_summary/services.py
ismail c5f76b3855
Some checks are pending
Build and Push Docker Image / build (push) Waiting to run
updates
2026-05-11 14:45:30 +03:00

2199 lines
93 KiB
Python

"""
Executive Summary Services - Aggregation, AI narratives, predictive analytics, and recommendations
This module provides service classes for:
- ExecutiveSummaryService: Aggregates daily metrics and dashboard data
- AINarrativeService: Generates AI-powered narrative summaries (English and Arabic)
- PredictiveAnalyticsService: Detects anomalies, predicts SLA breaches, calculates risk scores
- RecommendationService: Generates AI recommendations based on insights
All services use Django ORM queries following patterns from apps/dashboard/views.py
and leverage LiteLLM for AI-powered analysis.
"""
import logging
import statistics
from datetime import timedelta, date
from decimal import Decimal
from typing import Any, Optional
from django.db.models import Avg, Count, Q, Sum
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
logger = logging.getLogger(__name__)
# =============================================================================
# ExecutiveSummaryService
# =============================================================================
class ExecutiveSummaryService:
"""
Service for aggregating executive metrics and generating dashboard data.
Aggregates data from complaints, surveys, PX actions, observations,
call center interactions, and physician ratings across all hospitals.
"""
def __init__(self) -> None:
from apps.complaints.models import Complaint, ComplaintStatus
from apps.complaints.models import Inquiry
from apps.observations.models import Observation
from apps.px_action_center.models import PXAction
from apps.surveys.models import SurveyInstance
self.Complaint = Complaint
self.ComplaintStatus = ComplaintStatus
self.Inquiry = Inquiry
self.PXAction = PXAction
self.SurveyInstance = SurveyInstance
self.Observation = Observation
def aggregate_daily_metrics(self, target_date: Optional[date] = None, hospital=None) -> dict[str, Decimal]:
"""
Aggregate daily metrics across all hospitals (or a specific hospital) for a given date.
Args:
target_date: Date to aggregate metrics for (defaults to today)
hospital: Optional Hospital instance to filter by
Returns:
Dictionary of metric_type -> Decimal value
"""
if target_date is None:
target_date = timezone.now().date()
start_of_day = timezone.make_aware(timezone.datetime.combine(target_date, timezone.datetime.min.time()))
end_of_day = start_of_day + timedelta(days=1)
base_filters: dict[str, Any] = {"created_at__gte": start_of_day, "created_at__lt": end_of_day}
if hospital:
base_filters["hospital"] = hospital
metrics: dict[str, Decimal] = {}
# --- Complaints ---
complaints_qs = self.Complaint.objects.filter(**base_filters)
metrics["complaints_total"] = Decimal(complaints_qs.count())
metrics["complaints_critical"] = Decimal(complaints_qs.filter(severity="critical").count())
metrics["complaints_overdue"] = Decimal(
complaints_qs.filter(is_overdue=True, status__in=["open", "in_progress"]).count()
)
resolved_complaints = complaints_qs.filter(
status="closed",
closed_at__isnull=False,
)
if resolved_complaints.exists():
total_hours = sum((c.closed_at - c.created_at).total_seconds() / 3600 for c in resolved_complaints)
avg_hours = total_hours / resolved_complaints.count()
metrics["complaints_resolution_time"] = Decimal(str(avg_hours)).quantize(Decimal("0.01"))
else:
metrics["complaints_resolution_time"] = Decimal("0")
# --- Surveys ---
survey_base_filters: dict[str, Any] = {"completed_at__gte": start_of_day, "completed_at__lt": end_of_day}
if hospital:
survey_base_filters["hospital"] = hospital
surveys_qs = self.SurveyInstance.objects.filter(**survey_base_filters)
metrics["surveys_total"] = Decimal(surveys_qs.count())
satisfaction = surveys_qs.filter(total_score__isnull=False).aggregate(avg_score=Avg("total_score"))["avg_score"]
if satisfaction is not None:
metrics["surveys_satisfaction"] = Decimal(str(satisfaction)).quantize(Decimal("0.01"))
else:
metrics["surveys_satisfaction"] = Decimal("0")
# NPS-style calculation
total_surveys = surveys_qs.count()
if total_surveys > 0:
positive = surveys_qs.filter(is_negative=False).count()
negative = surveys_qs.filter(is_negative=True).count()
nps = Decimal(((positive - negative) / total_surveys) * 100).quantize(Decimal("0.01"))
else:
nps = Decimal("0")
metrics["surveys_nps"] = nps
surveys_sent = self.SurveyInstance.objects.filter(
sent_at__gte=start_of_day,
sent_at__lt=end_of_day,
**({"hospital": hospital} if hospital else {}),
).count()
if surveys_sent > 0:
metrics["surveys_response_rate"] = Decimal(str((total_surveys / surveys_sent) * 100)).quantize(
Decimal("0.01")
)
else:
metrics["surveys_response_rate"] = Decimal("0")
# --- PX Actions ---
action_base_filters: dict[str, Any] = {"created_at__gte": start_of_day, "created_at__lt": end_of_day}
if hospital:
action_base_filters["hospital"] = hospital
actions_qs = self.PXAction.objects.filter(**action_base_filters)
metrics["actions_total"] = Decimal(actions_qs.count())
metrics["actions_open"] = Decimal(actions_qs.filter(status="open").count())
metrics["actions_overdue"] = Decimal(
actions_qs.filter(is_overdue=True, status__in=["open", "in_progress"]).count()
)
metrics["actions_closed"] = Decimal(actions_qs.filter(status="closed").count())
# --- Observations ---
obs_base_filters: dict[str, Any] = {"created_at__gte": start_of_day, "created_at__lt": end_of_day}
if hospital:
obs_base_filters["hospital"] = hospital
observations_qs = self.Observation.objects.filter(**obs_base_filters)
metrics["observations_total"] = Decimal(observations_qs.count())
metrics["observations_critical"] = Decimal(observations_qs.filter(severity="critical").count())
# --- Inquiries ---
inquiry_base_filters: dict[str, Any] = {"created_at__gte": start_of_day, "created_at__lt": end_of_day}
if hospital:
inquiry_base_filters["hospital"] = hospital
inquiries_qs = self.Inquiry.objects.filter(**inquiry_base_filters)
metrics["inquiries_total"] = Decimal(inquiries_qs.count())
metrics["inquiries_resolved"] = Decimal(inquiries_qs.filter(status="resolved").count())
# --- Call Center (if available) ---
try:
from apps.callcenter.models import CallCenterInteraction
call_base_filters: dict[str, Any] = {
"call_started_at__gte": start_of_day,
"call_started_at__lt": end_of_day,
}
if hospital:
call_base_filters["hospital"] = hospital
calls_qs = CallCenterInteraction.objects.filter(**call_base_filters)
metrics["call_center_total"] = Decimal(calls_qs.count())
total_calls = calls_qs.count()
if total_calls > 0:
low_ratings = calls_qs.filter(is_low_rating=True).count()
satisfaction_rate = ((total_calls - low_ratings) / total_calls) * 100
metrics["call_center_satisfaction"] = Decimal(str(satisfaction_rate)).quantize(Decimal("0.01"))
else:
metrics["call_center_satisfaction"] = Decimal("0")
except Exception:
metrics["call_center_total"] = Decimal("0")
metrics["call_center_satisfaction"] = Decimal("0")
# --- Physician Ratings ---
try:
from apps.organizations.models import Staff
from apps.physicians.models import PhysicianMonthlyRating
now = timezone.now()
physician_qs = PhysicianMonthlyRating.objects.filter(year=now.year, month=now.month)
if hospital:
physician_qs = physician_qs.filter(staff__hospital=hospital)
avg_rating = physician_qs.aggregate(avg=Avg("average_rating"))["avg"]
if avg_rating is not None:
metrics["physician_avg_rating"] = Decimal(str(avg_rating)).quantize(Decimal("0.01"))
else:
metrics["physician_avg_rating"] = Decimal("0")
except Exception:
metrics["physician_avg_rating"] = Decimal("0")
return metrics
def get_dashboard_data(self, hospital=None, date_range: int = 30) -> dict[str, Any]:
"""
Return comprehensive dashboard data with KPIs, trends, and hospital comparisons.
Args:
hospital: Optional Hospital instance to scope data
date_range: Number of days to include (default 30)
Returns:
Dictionary containing:
- kpis: current period KPI values
- trends: trend data for each metric type
- hospital_comparison: comparison across hospitals
- variance: period-over-period changes
"""
now = timezone.now()
current_start = now - timedelta(days=date_range)
previous_start = now - timedelta(days=date_range * 2)
# Current period metrics
current_metrics = self._aggregate_period_metrics(current_start, now, hospital)
previous_metrics = self._aggregate_period_metrics(previous_start, current_start, hospital)
# Calculate variances
variances = {}
for key in current_metrics:
current_val = current_metrics.get(key, 0)
previous_val = previous_metrics.get(key, 0)
variances[key] = self.calculate_variance(current_val, previous_val)
# Hospital leaderboard
leaderboard = self.get_hospital_leaderboard()
# Trend data
trend_data = {}
for metric_type in [
"complaints_total",
"surveys_satisfaction",
"actions_total",
"observations_total",
]:
trend_data[metric_type] = self.get_trend_data(metric_type, days=date_range, hospital=hospital)
return {
"kpis": current_metrics,
"previous_kpis": previous_metrics,
"variances": variances,
"hospital_leaderboard": leaderboard,
"trends": trend_data,
"date_range": date_range,
"generated_at": now.isoformat(),
}
def calculate_variance(self, current: float | Decimal, previous: float | Decimal) -> dict[str, Any]:
"""
Calculate percentage variance between current and previous values.
Args:
current: Current period value
previous: Previous period value
Returns:
Dictionary with percentage, direction, and absolute_change
"""
current_val = float(current)
previous_val = float(previous)
if previous_val == 0:
return {
"percentage": 0.0,
"direction": "neutral",
"absolute_change": current_val,
}
percentage = ((current_val - previous_val) / previous_val) * 100
absolute_change = current_val - previous_val
if percentage > 0:
direction = "up"
elif percentage < 0:
direction = "down"
else:
direction = "neutral"
return {
"percentage": round(percentage, 1),
"direction": direction,
"absolute_change": round(absolute_change, 2),
}
def get_hospital_leaderboard(self) -> list[dict[str, Any]]:
"""
Rank hospitals by satisfaction, complaint resolution, and overall performance.
Returns:
List of dictionaries with hospital rankings and scores.
"""
from apps.organizations.models import Hospital
now = timezone.now()
last_30d = now - timedelta(days=30)
hospitals = Hospital.objects.filter(status="active")
leaderboard = []
for hospital in hospitals:
# Complaint resolution rate
total_complaints = self.Complaint.objects.filter(hospital=hospital, created_at__gte=last_30d).count()
resolved_complaints = self.Complaint.objects.filter(
hospital=hospital, status="closed", created_at__gte=last_30d
).count()
resolution_rate = (resolved_complaints / total_complaints * 100) if total_complaints > 0 else 0
# Satisfaction score
surveys = self.SurveyInstance.objects.filter(
hospital=hospital, completed_at__gte=last_30d, total_score__isnull=False
)
avg_satisfaction = surveys.aggregate(avg_score=Avg("total_score"))["avg_score"] or 0
# Overdue rate (lower is better)
overdue_complaints = self.Complaint.objects.filter(
hospital=hospital, is_overdue=True, status__in=["open", "in_progress"]
).count()
active_complaints = self.Complaint.objects.filter(
hospital=hospital, status__in=["open", "in_progress"]
).count()
overdue_rate = (overdue_complaints / active_complaints * 100) if active_complaints > 0 else 0
# Action closure rate
total_actions = self.PXAction.objects.filter(hospital=hospital, created_at__gte=last_30d).count()
closed_actions = self.PXAction.objects.filter(
hospital=hospital, status="closed", created_at__gte=last_30d
).count()
action_closure_rate = (closed_actions / total_actions * 100) if total_actions > 0 else 0
# Composite score (weighted)
composite_score = (
(resolution_rate * 0.3)
+ (float(avg_satisfaction) * 20 * 0.3)
+ ((100 - overdue_rate) * 0.2)
+ (action_closure_rate * 0.2)
)
leaderboard.append(
{
"hospital": hospital,
"hospital_name": hospital.name,
"resolution_rate": round(resolution_rate, 1),
"satisfaction_score": round(float(avg_satisfaction), 1),
"overdue_rate": round(overdue_rate, 1),
"action_closure_rate": round(action_closure_rate, 1),
"composite_score": round(composite_score, 1),
}
)
# Sort by composite score descending
leaderboard.sort(key=lambda x: x["composite_score"], reverse=True)
# Add rank
for idx, entry in enumerate(leaderboard, start=1):
entry["rank"] = idx
return leaderboard
def get_trend_data(
self,
metric_type: str,
days: int = 30,
hospital=None,
) -> list[dict[str, Any]]:
"""
Return trend data for charts over the specified number of days.
Args:
metric_type: One of the metric types (complaints_total, surveys_satisfaction, etc.)
days: Number of days to include (default 30)
hospital: Optional Hospital to filter by
Returns:
List of dicts with date and value for each day.
"""
now = timezone.now()
start_date = now - timedelta(days=days)
trend_data: list[dict[str, Any]] = []
for day_offset in range(days):
day_date = start_date + timedelta(days=day_offset)
day_start = timezone.make_aware(timezone.datetime.combine(day_date.date(), timezone.datetime.min.time()))
day_end = day_start + timedelta(days=1)
day_filters: dict[str, Any] = {"created_at__gte": day_start, "created_at__lt": day_end}
survey_day_filters: dict[str, Any] = {"completed_at__gte": day_start, "completed_at__lt": day_end}
call_day_filters: dict[str, Any] = {"call_started_at__gte": day_start, "call_started_at__lt": day_end}
if hospital:
day_filters["hospital"] = hospital
survey_day_filters["hospital"] = hospital
call_day_filters["hospital"] = hospital
value = 0.0
if metric_type.startswith("complaints"):
qs = self.Complaint.objects.filter(**day_filters)
if metric_type == "complaints_total":
value = float(qs.count())
elif metric_type == "complaints_critical":
value = float(qs.filter(severity="critical").count())
elif metric_type == "complaints_overdue":
value = float(qs.filter(is_overdue=True).count())
elif metric_type == "complaints_resolution_time":
resolved = qs.filter(status="closed", closed_at__isnull=False)
if resolved.exists():
avg_hours = (
sum((c.closed_at - c.created_at).total_seconds() / 3600 for c in resolved)
/ resolved.count()
)
value = avg_hours
elif metric_type.startswith("surveys"):
qs = self.SurveyInstance.objects.filter(**survey_day_filters)
if metric_type == "surveys_total":
value = float(qs.count())
elif metric_type == "surveys_satisfaction":
agg = qs.filter(total_score__isnull=False).aggregate(avg_score=Avg("total_score"))
value = float(agg["avg_score"] or 0)
elif metric_type == "surveys_nps":
total = qs.count()
if total > 0:
positive = qs.filter(is_negative=False).count()
negative = qs.filter(is_negative=True).count()
value = ((positive - negative) / total) * 100
elif metric_type.startswith("actions"):
qs = self.PXAction.objects.filter(**day_filters)
if metric_type == "actions_total":
value = float(qs.count())
elif metric_type == "actions_open":
value = float(qs.filter(status="open").count())
elif metric_type == "actions_overdue":
value = float(qs.filter(is_overdue=True).count())
elif metric_type == "actions_closed":
value = float(qs.filter(status="closed").count())
elif metric_type.startswith("observations"):
qs = self.Observation.objects.filter(**day_filters)
if metric_type == "observations_total":
value = float(qs.count())
elif metric_type == "observations_critical":
value = float(qs.filter(severity="critical").count())
elif metric_type.startswith("inquiries"):
from apps.complaints.models import Inquiry
qs = Inquiry.objects.filter(**day_filters)
if metric_type == "inquiries_total":
value = float(qs.count())
elif metric_type == "inquiries_resolved":
value = float(qs.filter(status="resolved").count())
elif metric_type.startswith("call_center"):
try:
from apps.callcenter.models import CallCenterInteraction
qs = CallCenterInteraction.objects.filter(**call_day_filters)
if metric_type == "call_center_total":
value = float(qs.count())
elif metric_type == "call_center_satisfaction":
total = qs.count()
if total > 0:
low = qs.filter(is_low_rating=True).count()
value = ((total - low) / total) * 100
except Exception:
value = 0.0
elif metric_type == "physician_avg_rating":
try:
from apps.physicians.models import PhysicianMonthlyRating
qs = PhysicianMonthlyRating.objects.filter(created_at__gte=day_start, created_at__lt=day_end)
if hospital:
qs = qs.filter(staff__hospital=hospital)
agg = qs.aggregate(avg=Avg("average_rating"))
value = float(agg["avg"] or 0)
except Exception:
value = 0.0
trend_data.append(
{
"date": day_date.date().isoformat(),
"value": round(value, 2),
}
)
return trend_data
# -------------------------------------------------------------------------
# Internal helpers
# -------------------------------------------------------------------------
def _aggregate_period_metrics(
self,
start_date,
end_date,
hospital=None,
) -> dict[str, float]:
"""Aggregate metrics for a given period."""
base_filters: dict[str, Any] = {"created_at__gte": start_date, "created_at__lt": end_date}
if hospital:
base_filters["hospital"] = hospital
survey_base_filters: dict[str, Any] = {"completed_at__gte": start_date, "completed_at__lt": end_date}
if hospital:
survey_base_filters["hospital"] = hospital
action_base_filters: dict[str, Any] = {"created_at__gte": start_date, "created_at__lt": end_date}
if hospital:
action_base_filters["hospital"] = hospital
obs_base_filters: dict[str, Any] = {"created_at__gte": start_date, "created_at__lt": end_date}
if hospital:
obs_base_filters["hospital"] = hospital
inquiry_base_filters: dict[str, Any] = {"created_at__gte": start_date, "created_at__lt": end_date}
if hospital:
inquiry_base_filters["hospital"] = hospital
metrics: dict[str, float] = {}
# Complaints
complaints_qs = self.Complaint.objects.filter(**base_filters)
metrics["complaints_total"] = float(complaints_qs.count())
metrics["complaints_critical"] = float(complaints_qs.filter(severity="critical").count())
metrics["complaints_overdue"] = float(
complaints_qs.filter(is_overdue=True, status__in=["open", "in_progress"]).count()
)
resolved = complaints_qs.filter(status="closed", closed_at__isnull=False)
if resolved.exists():
avg_hours = sum((c.closed_at - c.created_at).total_seconds() / 3600 for c in resolved) / resolved.count()
metrics["complaints_resolution_time"] = avg_hours
else:
metrics["complaints_resolution_time"] = 0.0
# Surveys
surveys_qs = self.SurveyInstance.objects.filter(**survey_base_filters)
metrics["surveys_total"] = float(surveys_qs.count())
satisfaction = surveys_qs.filter(total_score__isnull=False).aggregate(avg_score=Avg("total_score"))["avg_score"]
metrics["surveys_satisfaction"] = float(satisfaction or 0)
total_surveys = surveys_qs.count()
if total_surveys > 0:
positive = surveys_qs.filter(is_negative=False).count()
negative = surveys_qs.filter(is_negative=True).count()
metrics["surveys_nps"] = ((positive - negative) / total_surveys) * 100
else:
metrics["surveys_nps"] = 0.0
# PX Actions
actions_qs = self.PXAction.objects.filter(**action_base_filters)
metrics["actions_total"] = float(actions_qs.count())
metrics["actions_open"] = float(actions_qs.filter(status="open").count())
metrics["actions_overdue"] = float(
actions_qs.filter(is_overdue=True, status__in=["open", "in_progress"]).count()
)
metrics["actions_closed"] = float(actions_qs.filter(status="closed").count())
# Observations
observations_qs = self.Observation.objects.filter(**obs_base_filters)
metrics["observations_total"] = float(observations_qs.count())
metrics["observations_critical"] = float(observations_qs.filter(severity="critical").count())
# Inquiries
from apps.complaints.models import Inquiry
inquiries_qs = Inquiry.objects.filter(**inquiry_base_filters)
metrics["inquiries_total"] = float(inquiries_qs.count())
metrics["inquiries_resolved"] = float(inquiries_qs.filter(status="resolved").count())
# Call Center
try:
from apps.callcenter.models import CallCenterInteraction
call_base_filters: dict[str, Any] = {
"call_started_at__gte": start_date,
"call_started_at__lt": end_date,
}
if hospital:
call_base_filters["hospital"] = hospital
calls_qs = CallCenterInteraction.objects.filter(**call_base_filters)
total_calls = calls_qs.count()
metrics["call_center_total"] = float(total_calls)
if total_calls > 0:
low = calls_qs.filter(is_low_rating=True).count()
metrics["call_center_satisfaction"] = ((total_calls - low) / total_calls) * 100
else:
metrics["call_center_satisfaction"] = 0.0
except Exception:
metrics["call_center_total"] = 0.0
metrics["call_center_satisfaction"] = 0.0
# Physician Ratings
try:
from apps.physicians.models import PhysicianMonthlyRating
now = timezone.now()
physician_qs = PhysicianMonthlyRating.objects.filter(year=now.year, month=now.month)
if hospital:
physician_qs = physician_qs.filter(staff__hospital=hospital)
avg_rating = physician_qs.aggregate(avg=Avg("average_rating"))["avg"]
metrics["physician_avg_rating"] = float(avg_rating or 0)
except Exception:
metrics["physician_avg_rating"] = 0.0
return metrics
# =============================================================================
# AINarrativeService
# =============================================================================
class AINarrativeService:
"""
Service for generating AI-powered narrative summaries in English and Arabic.
Uses LiteLLM with OpenRouter to analyze performance data and generate
bilingual executive narratives with insights, achievements, and concerns.
"""
MODEL_NAME = "openrouter/google/gemma-3-27b-it:free"
def __init__(self) -> None:
self.summary_service = ExecutiveSummaryService()
def generate_weekly_narrative(
self,
start_date: date,
end_date: date,
hospital=None,
) -> dict[str, Any]:
"""Generate an English narrative summary for a weekly period."""
return self._generate_narrative(
start_date=start_date,
end_date=end_date,
hospital=hospital,
language="en",
period_label="weekly",
)
def generate_monthly_narrative(
self,
start_date: date,
end_date: date,
hospital=None,
) -> dict[str, Any]:
"""Generate an English narrative summary for a monthly period."""
return self._generate_narrative(
start_date=start_date,
end_date=end_date,
hospital=hospital,
language="en",
period_label="monthly",
)
def generate_quarterly_narrative(
self,
start_date: date,
end_date: date,
hospital=None,
) -> dict[str, Any]:
"""Generate an English narrative summary for a quarterly period."""
return self._generate_narrative(
start_date=start_date,
end_date=end_date,
hospital=hospital,
language="en",
period_label="quarterly",
)
def generate_arabic_narrative(
self,
start_date: date,
end_date: date,
hospital=None,
report_type: str = "weekly",
) -> dict[str, Any]:
"""
Generate an Arabic narrative summary.
Args:
start_date: Start of the reporting period
end_date: End of the reporting period
hospital: Optional Hospital to scope the narrative
report_type: weekly, monthly, or quarterly
Returns:
Dictionary with Arabic narrative, highlights, concerns, and metadata
"""
period_map = {
"weekly": "أسبوعي",
"monthly": "شهري",
"quarterly": "ربع سنوي",
}
period_label = period_map.get(report_type, "أسبوعي")
return self._generate_narrative(
start_date=start_date,
end_date=end_date,
hospital=hospital,
language="ar",
period_label=period_label,
)
def _generate_narrative(
self,
start_date: date,
end_date: date,
hospital=None,
language: str = "en",
period_label: str = "weekly",
) -> dict[str, Any]:
"""Internal unified narrative generator supporting EN/AR and any period."""
import json
import time
start_time = time.time()
try:
metrics_data = self._collect_period_metrics(start_date, end_date, hospital)
formatted_metrics = self.format_metrics_for_ai(metrics_data)
hospital_name = hospital.name if hospital else ("All Hospitals" if language == "en" else "جميع المستشفيات")
if language == "ar":
prompt = f"""أنت محلل تنفيذي لمنصة تجربة المرضى (PX) في قطاع الرعاية الصحية.
أعد ملخصًا {period_label}ًا سرديًا شاملاً يحلل بيانات الأداء لـ {hospital_name}.
## فترة التقرير: من {start_date.isoformat()} إلى {end_date.isoformat()}
## بيانات الأداء:
{formatted_metrics}
قم بتحليل البيانات وتقديم:
1. **الملخص التنفيذي**: فقرتان إلى ثلاث فقرات توضح نظرة عامة على أداء الفترة {period_label}، مع إبراز أهم الاتجاهات والأنماط.
2. **أبرز الإنجازات**: اذكر 3-5 نتائج إيجابية أو تحسينات أو نجاحات تم تحديدها في البيانات.
3. **أهم المخاوف**: اذكر 3-5 مجالات تحتاج إلى اهتمام، بما في ذلك تراجع المؤشرات أو خرق اتفاقيات مستوى الخدمة أو المخاطر الناشئة.
4. **رؤى قابلة للتنفيذ**: قدم 3-5 توصيات محددة قائمة على البيانات للتحسين.
5. **تحليل الاتجاهات**: حدد أي اتجاهات تصاعدية أو تنازلية ملحوظة مقارنة بالفترة السابقة.
اكتب بلغة مهنية مناسبة للمستوى التنفيذي. كن محددًا بالأرقام والنسب المئوية. ركّز على تجربة المرضى، وحل الشكاوى، ودرجات الرضا، والكفاءة التشغيلية.
أعد استجابتك بتنسيق JSON بالمفاتيح التالية:
- "executive_summary": نص (فقرتان إلى ثلاث فقرات)
- "highlights": مصفوفة نصوص (3-5 إنجازات)
- "concerns": مصفوفة نصوص (3-5 مخاوف)
- "actionable_insights": مصفوفة نصوص (3-5 توصيات)
- "trend_analysis": نص (ملخص الاتجاهات)
"""
result_keys = {
"narrative_key": "narrative_ar",
"highlights_key": "highlights_ar",
"concerns_key": "concerns_ar",
"actionable_key": "actionable_insights_ar",
"trend_key": "trend_analysis_ar",
"fallback_text": f"تعذر إنشاء السرد بسبب خطأ",
}
else:
prompt = f"""You are an executive analyst for a healthcare patient experience (PX) platform.
Generate a comprehensive {period_label} narrative summary analyzing performance data for {hospital_name}.
## Reporting Period: {start_date.isoformat()} to {end_date.isoformat()}
## Performance Data:
{formatted_metrics}
Please analyze the data and provide:
1. **Executive Summary**: A 2-3 paragraph overview of the {period_label}'s performance, highlighting the most significant trends and patterns.
2. **Key Achievements**: List 3-5 positive outcomes, improvements, or successes identified in the data.
3. **Key Concerns**: List 3-5 areas that require attention, including declining metrics, SLA breaches, or emerging risks.
4. **Actionable Insights**: Provide 3-5 specific, data-driven recommendations for improvement.
5. **Trend Analysis**: Identify notable upward or downward trends compared to the previous period.
Write in a professional, executive-appropriate tone. Be specific with numbers and percentages. Focus on patient experience, complaint resolution, satisfaction scores, and operational efficiency.
Format your response as JSON with these keys:
- "executive_summary": string (2-3 paragraphs)
- "highlights": array of strings (3-5 achievements)
- "concerns": array of strings (3-5 concerns)
- "actionable_insights": array of strings (3-5 recommendations)
- "trend_analysis": string (trend summary)
"""
result_keys = {
"narrative_key": "narrative_en",
"highlights_key": "highlights_en",
"concerns_key": "concerns_en",
"actionable_key": "actionable_insights_en",
"trend_key": "trend_analysis_en",
"fallback_text": "Unable to generate narrative due to an error",
}
response = self._call_ai(prompt)
try:
result = json.loads(self._clean_ai_response(response))
except json.JSONDecodeError:
cleaned = self._clean_ai_response(response)
if cleaned.startswith("{"):
try:
result = json.loads(cleaned)
except json.JSONDecodeError:
result = {
"executive_summary": cleaned,
"highlights": [],
"concerns": [],
"actionable_insights": [],
"trend_analysis": "",
}
else:
result = {
"executive_summary": cleaned,
"highlights": [],
"concerns": [],
"actionable_insights": [],
"trend_analysis": "",
}
generation_time_ms = int((time.time() - start_time) * 1000)
return {
result_keys["narrative_key"]: result.get("executive_summary", ""),
result_keys["highlights_key"]: result.get("highlights", []),
result_keys["concerns_key"]: result.get("concerns", []),
result_keys["actionable_key"]: result.get("actionable_insights", []),
result_keys["trend_key"]: result.get("trend_analysis", ""),
"ai_model": self.MODEL_NAME,
"status": "completed",
"generation_time_ms": generation_time_ms,
}
except Exception as e:
logger.error(f"Error generating {language} {period_label} narrative: {e}", exc_info=True)
generation_time_ms = int((time.time() - start_time) * 1000)
return {
result_keys["narrative_key"]: f"{result_keys['fallback_text']}: {str(e)}",
result_keys["highlights_key"]: [],
result_keys["concerns_key"]: [],
result_keys["actionable_key"]: [],
result_keys["trend_key"]: "",
"ai_model": self.MODEL_NAME,
"status": "failed",
"error_message": str(e),
"generation_time_ms": generation_time_ms,
}
def format_metrics_for_ai(self, metrics_data: dict[str, Any]) -> str:
"""
Format metrics data into a human-readable prompt for AI analysis.
Args:
metrics_data: Dictionary of metric names to values with optional variances
Returns:
Formatted string suitable for AI prompt input.
"""
lines = []
# Complaints
lines.append("### Complaints")
lines.append(f"- Total complaints: {metrics_data.get('complaints_total', 0)}")
lines.append(f"- Critical complaints: {metrics_data.get('complaints_critical', 0)}")
lines.append(f"- Overdue complaints: {metrics_data.get('complaints_overdue', 0)}")
lines.append(f"- Average resolution time: {metrics_data.get('complaints_resolution_time', 0):.1f} hours")
if "complaints_variance" in metrics_data:
lines.append(f"- Period variance: {metrics_data['complaints_variance']:+.1f}%")
lines.append("")
# Surveys
lines.append("### Surveys")
lines.append(f"- Total surveys completed: {metrics_data.get('surveys_total', 0)}")
lines.append(f"- Average satisfaction score: {metrics_data.get('surveys_satisfaction', 0):.1f}/5.0")
lines.append(f"- NPS score: {metrics_data.get('surveys_nps', 0):.1f}")
lines.append(f"- Survey response rate: {metrics_data.get('surveys_response_rate', 0):.1f}%")
if "surveys_variance" in metrics_data:
lines.append(f"- Period variance: {metrics_data['surveys_variance']:+.1f}%")
lines.append("")
# PX Actions
lines.append("### PX Actions")
lines.append(f"- Total actions: {metrics_data.get('actions_total', 0)}")
lines.append(f"- Open actions: {metrics_data.get('actions_open', 0)}")
lines.append(f"- Overdue actions: {metrics_data.get('actions_overdue', 0)}")
lines.append(f"- Closed actions: {metrics_data.get('actions_closed', 0)}")
lines.append("")
# Observations
lines.append("### Observations")
lines.append(f"- Total observations: {metrics_data.get('observations_total', 0)}")
lines.append(f"- Critical observations: {metrics_data.get('observations_critical', 0)}")
lines.append("")
# Inquiries
lines.append("### Inquiries")
lines.append(f"- Total inquiries: {metrics_data.get('inquiries_total', 0)}")
lines.append(f"- Resolved inquiries: {metrics_data.get('inquiries_resolved', 0)}")
lines.append("")
# Call Center
lines.append("### Call Center")
lines.append(f"- Total interactions: {metrics_data.get('call_center_total', 0)}")
lines.append(f"- Satisfaction rate: {metrics_data.get('call_center_satisfaction', 0):.1f}%")
lines.append("")
# Physician Ratings
lines.append("### Physician Ratings")
lines.append(f"- Average physician rating: {metrics_data.get('physician_avg_rating', 0):.2f}/5.0")
return "\n".join(lines)
# -------------------------------------------------------------------------
# Internal helpers
# -------------------------------------------------------------------------
# -------------------------------------------------------------------------
# Tab-specific on-demand AI analysis
# -------------------------------------------------------------------------
def generate_overview_analysis(
self,
start_date: date,
end_date: date,
hospital=None,
) -> dict[str, Any]:
"""Generate on-demand AI analysis for the Overview tab."""
import json as _json
import time as _time
t0 = _time.time()
try:
metrics_data = self._collect_period_metrics(start_date, end_date, hospital)
formatted = self.format_metrics_for_ai(metrics_data)
hospital_name = hospital.name if hospital else "All Hospitals"
prompt = f"""You are an executive healthcare PX analyst.
Analyze the following KPI overview for {hospital_name} ({start_date.isoformat()} to {end_date.isoformat()}):
{formatted}
Provide:
1. A 2-3 paragraph executive overview of current performance.
2. 3-5 key highlights (positive outcomes).
3. 3-5 key concerns (areas needing attention).
4. 3-5 actionable recommendations.
Format as JSON:
{{"narrative": "...", "highlights": ["..."], "concerns": ["..."], "recommendations": ["..."]}}"""
response = self._call_ai(prompt)
result = _json.loads(self._clean_ai_response(response))
return {
"narrative": result.get("narrative", ""),
"highlights": result.get("highlights", []),
"concerns": result.get("concerns", []),
"recommendations": result.get("recommendations", []),
"generation_time_ms": int((_time.time() - t0) * 1000),
"status": "completed",
}
except Exception as e:
logger.error(f"Error generating overview analysis: {e}", exc_info=True)
return {
"narrative": f"Analysis unavailable: {e}",
"highlights": [],
"concerns": [],
"recommendations": [],
"generation_time_ms": int((_time.time() - t0) * 1000),
"status": "failed",
}
def generate_trends_analysis(
self,
start_date: date,
end_date: date,
hospital=None,
trends_data: dict[str, list] | None = None,
leaderboard: list[dict] | None = None,
) -> dict[str, Any]:
"""Generate on-demand AI analysis for the Trends tab."""
import json as _json
import time as _time
t0 = _time.time()
try:
hospital_name = hospital.name if hospital else "All Hospitals"
trend_summary_parts = []
for metric_key, points in (trends_data or {}).items():
values = [p.get("value", 0) for p in points]
if values:
trend_summary_parts.append(
f"- {metric_key}: min={min(values):.1f}, max={max(values):.1f}, "
f"avg={sum(values) / len(values):.1f}, latest={values[-1]:.1f}"
)
lb_parts = []
for entry in (leaderboard or [])[:5]:
lb_parts.append(
f" #{entry.get('rank', '?')} {entry.get('hospital_name', '?')} "
f"- composite: {entry.get('composite_score', 0):.1f}, "
f"resolution: {entry.get('resolution_rate', 0):.1f}%, "
f"satisfaction: {entry.get('satisfaction_score', 0):.1f}"
)
prompt = f"""You are an executive healthcare PX analyst specializing in trend analysis.
Analyze trends for {hospital_name} ({start_date.isoformat()} to {end_date.isoformat()}):
## Metric Trends:
{chr(10).join(trend_summary_parts) or "No trend data available."}
## Hospital Leaderboard:
{chr(10).join(lb_parts) or "No leaderboard data available."}
Provide:
1. A 2-3 paragraph trend analysis narrative.
2. 3-5 key highlights (positive patterns).
3. 3-5 key concerns (negative or emerging patterns).
4. 3-5 actionable recommendations based on trends.
Format as JSON:
{{"narrative": "...", "highlights": ["..."], "concerns": ["..."], "recommendations": ["..."]}}"""
response = self._call_ai(prompt)
result = _json.loads(self._clean_ai_response(response))
return {
"narrative": result.get("narrative", ""),
"highlights": result.get("highlights", []),
"concerns": result.get("concerns", []),
"recommendations": result.get("recommendations", []),
"generation_time_ms": int((_time.time() - t0) * 1000),
"status": "completed",
}
except Exception as e:
logger.error(f"Error generating trends analysis: {e}", exc_info=True)
return {
"narrative": f"Analysis unavailable: {e}",
"highlights": [],
"concerns": [],
"recommendations": [],
"generation_time_ms": int((_time.time() - t0) * 1000),
"status": "failed",
}
def generate_insights_analysis(
self,
hospital=None,
risk_alerts: list | None = None,
ai_recommendations: list | None = None,
) -> dict[str, Any]:
"""Generate on-demand AI analysis for the Insights tab."""
import json as _json
import time as _time
t0 = _time.time()
try:
hospital_name = hospital.name if hospital else "All Hospitals"
alert_parts = []
for alert in (risk_alerts or [])[:10]:
alert_parts.append(
f"- [{alert.severity}] {alert.title_en} "
f"({alert.get_insight_type_display()}) "
f"{'- ' + alert.hospital.name if alert.hospital else ''}"
)
rec_parts = []
for rec in (ai_recommendations or [])[:5]:
rec_parts.append(f"- [{rec.get_priority_display()}] {rec.title_en}: {rec.description_en[:120]}...")
prompt = f"""You are an executive healthcare PX risk analyst.
Analyze the current risk landscape for {hospital_name}:
## Active Risk Alerts ({len(risk_alerts or [])} total):
{chr(10).join(alert_parts) or "No active risk alerts."}
## AI Recommendations ({len(ai_recommendations or [])} total):
{chr(10).join(rec_parts) or "No active recommendations."}
Provide:
1. A 2-3 paragraph risk assessment narrative.
2. 3-5 key highlights (mitigated risks or positive findings).
3. 3-5 key concerns (active or emerging risks).
4. 3-5 actionable recommendations for risk mitigation.
Format as JSON:
{{"narrative": "...", "highlights": ["..."], "concerns": ["..."], "recommendations": ["..."]}}"""
response = self._call_ai(prompt)
result = _json.loads(self._clean_ai_response(response))
return {
"narrative": result.get("narrative", ""),
"highlights": result.get("highlights", []),
"concerns": result.get("concerns", []),
"recommendations": result.get("recommendations", []),
"generation_time_ms": int((_time.time() - t0) * 1000),
"status": "completed",
}
except Exception as e:
logger.error(f"Error generating insights analysis: {e}", exc_info=True)
return {
"narrative": f"Analysis unavailable: {e}",
"highlights": [],
"concerns": [],
"recommendations": [],
"generation_time_ms": int((_time.time() - t0) * 1000),
"status": "failed",
}
def _collect_period_metrics(
self,
start_date: date,
end_date: date,
hospital=None,
) -> dict[str, Any]:
"""Collect metrics for a given period with variances."""
now = timezone.now()
period_length = (end_date - start_date).days
current_start = timezone.make_aware(timezone.datetime.combine(start_date, timezone.datetime.min.time()))
current_end = timezone.make_aware(timezone.datetime.combine(end_date, timezone.datetime.max.time()))
previous_start = current_start - timedelta(days=period_length)
previous_end = current_start
# Current period
current_metrics = self.summary_service._aggregate_period_metrics(current_start, current_end, hospital)
# Previous period average
previous_metrics = self.summary_service._aggregate_period_metrics(previous_start, previous_end, hospital)
metrics = dict(current_metrics)
# Add variance for key metrics
for key in [
"complaints_total",
"surveys_satisfaction",
"surveys_nps",
"actions_total",
"observations_total",
]:
current_val = current_metrics.get(key, 0)
previous_val = previous_metrics.get(key, 0)
variance_data = self.summary_service.calculate_variance(current_val, previous_val)
metrics[f"{key}_variance"] = variance_data["percentage"]
return metrics
def _call_ai(self, prompt: str) -> str:
from apps.core.ai_service import AIService
model = AIService._strip_model_prefix(self.MODEL_NAME)
messages = [{"role": "user", "content": prompt}]
return AIService._openrouter_completion(
model=model,
messages=messages,
temperature=0.4,
max_tokens=2000,
)
@staticmethod
def _clean_ai_response(text: str) -> str:
"""Strip markdown code fences from AI response before JSON parsing."""
text = text.strip()
if text.startswith("```"):
lines = text.split("\n")
if lines[0].startswith("```"):
lines = lines[1:]
if lines and lines[-1].strip() == "```":
lines = lines[:-1]
text = "\n".join(lines)
return text.strip()
# =============================================================================
# PredictiveAnalyticsService
# =============================================================================
class PredictiveAnalyticsService:
"""
Service for predictive analytics, anomaly detection, and risk scoring.
Uses statistical methods to detect anomalies, predict SLA breaches,
identify trend changes, and calculate risk scores.
"""
def __init__(self) -> None:
self.summary_service = ExecutiveSummaryService()
def detect_anomalies(
self,
metric_type: str,
days: int = 90,
hospital=None,
) -> dict[str, Any]:
"""
Detect statistical anomalies in a metric over the specified period.
Uses z-score method: values beyond 2 standard deviations from the mean
are flagged as anomalies.
Args:
metric_type: The metric to analyze
days: Number of days to analyze (default 90)
hospital: Optional Hospital to filter by
Returns:
Dictionary with anomaly data including flagged dates and severity.
"""
trend_data = self.summary_service.get_trend_data(metric_type, days=days, hospital=hospital)
values = [point["value"] for point in trend_data]
if len(values) < 7:
return {
"metric_type": metric_type,
"days_analyzed": days,
"anomalies": [],
"message": "Insufficient data for anomaly detection (need at least 7 days)",
}
mean_val = statistics.mean(values)
stdev_val = statistics.stdev(values) if len(values) > 1 else 0
anomalies = []
if stdev_val > 0:
for point in trend_data:
z_score = abs((point["value"] - mean_val) / stdev_val)
if z_score > 2:
severity = "critical" if z_score > 3 else "high" if z_score > 2.5 else "medium"
anomalies.append(
{
"date": point["date"],
"value": point["value"],
"z_score": round(z_score, 2),
"severity": severity,
"deviation_from_mean": round(point["value"] - mean_val, 2),
}
)
return {
"metric_type": metric_type,
"days_analyzed": days,
"mean": round(mean_val, 2),
"stdev": round(stdev_val, 2),
"anomaly_count": len(anomalies),
"anomalies": anomalies,
}
def predict_sla_breach(
self,
hospital=None,
days_ahead: int = 7,
) -> list[dict[str, Any]]:
"""
Predict potential SLA breaches in the coming days.
Analyzes currently open/in-progress items approaching their SLA deadline
and estimates breach probability based on historical resolution patterns.
Args:
hospital: Optional Hospital to filter by
days_ahead: Number of days to predict ahead (default 7)
Returns:
List of predicted breach risk items with probability and details.
"""
now = timezone.now()
prediction_horizon = now + timedelta(days=days_ahead)
predictions = []
# Analyze complaints
complaints_qs = self.summary_service.Complaint.objects.filter(
status__in=["open", "in_progress"],
due_at__lte=prediction_horizon,
due_at__gte=now,
)
if hospital:
complaints_qs = complaints_qs.filter(hospital=hospital)
for complaint in complaints_qs.select_related("hospital", "department"):
hours_remaining = (complaint.due_at - now).total_seconds() / 3600
age_hours = (now - complaint.created_at).total_seconds() / 3600
# Simple risk model: higher risk as deadline approaches and age increases
if hours_remaining > 0:
breach_probability = min(
100,
round(
(1 - (hours_remaining / max(age_hours, 1))) * 100 + 20,
1,
),
)
else:
breach_probability = 100.0
severity = "critical" if breach_probability > 80 else "high" if breach_probability > 60 else "medium"
predictions.append(
{
"entity_type": "complaint",
"entity_id": str(complaint.id),
"title": complaint.title,
"hospital": complaint.hospital.name if complaint.hospital else None,
"department": complaint.department.name if complaint.department else None,
"due_at": complaint.due_at.isoformat(),
"hours_remaining": round(hours_remaining, 1),
"age_hours": round(age_hours, 1),
"breach_probability": breach_probability,
"severity": severity,
}
)
# Analyze PX Actions
actions_qs = self.summary_service.PXAction.objects.filter(
status__in=["open", "in_progress"],
due_at__lte=prediction_horizon,
due_at__gte=now,
)
if hospital:
actions_qs = actions_qs.filter(hospital=hospital)
for action in actions_qs.select_related("hospital", "department"):
hours_remaining = (action.due_at - now).total_seconds() / 3600
age_hours = (now - action.created_at).total_seconds() / 3600
if hours_remaining > 0:
breach_probability = min(
100,
round(
(1 - (hours_remaining / max(age_hours, 1))) * 100 + 20,
1,
),
)
else:
breach_probability = 100.0
severity = "critical" if breach_probability > 80 else "high" if breach_probability > 60 else "medium"
predictions.append(
{
"entity_type": "px_action",
"entity_id": str(action.id),
"title": action.title,
"hospital": action.hospital.name if action.hospital else None,
"department": action.department.name if action.department else None,
"due_at": action.due_at.isoformat(),
"hours_remaining": round(hours_remaining, 1),
"age_hours": round(age_hours, 1),
"breach_probability": breach_probability,
"severity": severity,
}
)
# Analyze Observations
observations_qs = self.summary_service.Observation.objects.filter(
status__in=["new", "triaged", "assigned", "in_progress"],
due_at__lte=prediction_horizon,
due_at__gte=now,
)
if hospital:
observations_qs = observations_qs.filter(hospital=hospital)
for obs in observations_qs.select_related("hospital", "assigned_department"):
hours_remaining = (obs.due_at - now).total_seconds() / 3600
age_hours = (now - obs.created_at).total_seconds() / 3600
if hours_remaining > 0:
breach_probability = min(
100,
round(
(1 - (hours_remaining / max(age_hours, 1))) * 100 + 20,
1,
),
)
else:
breach_probability = 100.0
severity = "critical" if breach_probability > 80 else "high" if breach_probability > 60 else "medium"
predictions.append(
{
"entity_type": "observation",
"entity_id": str(obs.id),
"title": obs.title or obs.description[:100],
"hospital": obs.hospital.name if obs.hospital else None,
"department": obs.assigned_department.name if obs.assigned_department else None,
"due_at": obs.due_at.isoformat(),
"hours_remaining": round(hours_remaining, 1),
"age_hours": round(age_hours, 1),
"breach_probability": breach_probability,
"severity": severity,
}
)
# Sort by breach probability descending
predictions.sort(key=lambda x: x["breach_probability"], reverse=True)
return predictions
def identify_trend_changes(
self,
metric_type: str,
days: int = 60,
hospital=None,
) -> dict[str, Any]:
"""
Identify significant trend changes in a metric over the specified period.
Compares the most recent half of the period against the earlier half
to identify significant increases or decreases.
Args:
metric_type: The metric to analyze
days: Number of days to analyze (default 60)
hospital: Optional Hospital to filter by
Returns:
Dictionary with trend change analysis including direction and magnitude.
"""
trend_data = self.summary_service.get_trend_data(metric_type, days=days, hospital=hospital)
if len(trend_data) < 14:
return {
"metric_type": metric_type,
"days_analyzed": days,
"change_detected": False,
"message": "Insufficient data for trend analysis (need at least 14 days)",
}
mid_point = len(trend_data) // 2
first_half = [d["value"] for d in trend_data[:mid_point]]
second_half = [d["value"] for d in trend_data[mid_point:]]
first_avg = statistics.mean(first_half) if first_half else 0
second_avg = statistics.mean(second_half) if second_half else 0
if first_avg > 0:
change_percentage = ((second_avg - first_avg) / first_avg) * 100
else:
change_percentage = 0.0 if second_avg == 0 else 100.0
direction = "increasing" if change_percentage > 5 else "decreasing" if change_percentage < -5 else "stable"
significance = (
"significant" if abs(change_percentage) > 20 else "moderate" if abs(change_percentage) > 10 else "minor"
)
# Detect slope changes (acceleration/deceleration)
recent_trend = "accelerating"
if len(second_half) >= 7:
recent_week = second_half[-7:]
previous_week = second_half[:7] if len(second_half) > 7 else first_half[-7:]
recent_slope = (recent_week[-1] - recent_week[0]) / max(len(recent_week), 1)
previous_slope = (previous_week[-1] - previous_week[0]) / max(len(previous_week), 1)
if abs(recent_slope) > abs(previous_slope) * 1.5:
recent_trend = "accelerating"
elif abs(recent_slope) < abs(previous_slope) * 0.5:
recent_trend = "decelerating"
else:
recent_trend = "steady"
return {
"metric_type": metric_type,
"days_analyzed": days,
"first_half_avg": round(first_avg, 2),
"second_half_avg": round(second_avg, 2),
"change_percentage": round(change_percentage, 1),
"direction": direction,
"significance": significance,
"recent_trend": recent_trend,
"change_detected": abs(change_percentage) > 5,
}
def calculate_risk_scores(
self,
hospital=None,
) -> list[dict[str, Any]]:
"""
Calculate risk scores for hospitals or departments based on multiple factors.
Risk score is a composite of:
- SLA breach rate (weight: 30%)
- Overdue rate (weight: 25%)
- Satisfaction decline (weight: 20%)
- Critical volume (weight: 15%)
- Resolution time trend (weight: 10%)
Args:
hospital: Optional Hospital to calculate for (if None, calculates for all)
Returns:
List of risk score entries sorted by risk score descending.
"""
from apps.organizations.models import Hospital
now = timezone.now()
last_30d = now - timedelta(days=30)
previous_30d = now - timedelta(days=60)
hospitals_to_analyze = (
Hospital.objects.filter(status="active", id=hospital.id)
if hospital
else Hospital.objects.filter(status="active")
)
risk_scores = []
for h in hospitals_to_analyze:
# SLA breach rate
total_items = (
self.summary_service.Complaint.objects.filter(hospital=h, created_at__gte=last_30d).count()
+ self.summary_service.PXAction.objects.filter(hospital=h, created_at__gte=last_30d).count()
+ self.summary_service.Observation.objects.filter(hospital=h, created_at__gte=last_30d).count()
)
breached_items = (
self.summary_service.Complaint.objects.filter(hospital=h, breached_at__gte=last_30d).count()
+ self.summary_service.PXAction.objects.filter(
hospital=h, is_overdue=True, status__in=["open", "in_progress"]
).count()
+ self.summary_service.Observation.objects.filter(hospital=h, breached_at__gte=last_30d).count()
)
sla_breach_rate = (breached_items / total_items * 100) if total_items > 0 else 0
# Overdue rate
overdue_items = (
self.summary_service.Complaint.objects.filter(
hospital=h, is_overdue=True, status__in=["open", "in_progress"]
).count()
+ self.summary_service.PXAction.objects.filter(
hospital=h, is_overdue=True, status__in=["open", "in_progress"]
).count()
+ self.summary_service.Observation.objects.filter(
hospital=h, is_overdue=True, status__in=["new", "triaged", "assigned", "in_progress"]
).count()
)
active_items = (
self.summary_service.Complaint.objects.filter(hospital=h, status__in=["open", "in_progress"]).count()
+ self.summary_service.PXAction.objects.filter(hospital=h, status__in=["open", "in_progress"]).count()
+ self.summary_service.Observation.objects.filter(
hospital=h, status__in=["new", "triaged", "assigned", "in_progress"]
).count()
)
overdue_rate = (overdue_items / active_items * 100) if active_items > 0 else 0
# Satisfaction decline
current_satisfaction = (
self.summary_service.SurveyInstance.objects.filter(
hospital=h, completed_at__gte=last_30d, total_score__isnull=False
).aggregate(avg=Avg("total_score"))["avg"]
or 0
)
previous_satisfaction = (
self.summary_service.SurveyInstance.objects.filter(
hospital=h, completed_at__gte=previous_30d, completed_at__lt=last_30d, total_score__isnull=False
).aggregate(avg=Avg("total_score"))["avg"]
or 0
)
satisfaction_decline = float(previous_satisfaction) - float(current_satisfaction)
# Critical volume
critical_items = (
self.summary_service.Complaint.objects.filter(
hospital=h, severity="critical", created_at__gte=last_30d
).count()
+ self.summary_service.Observation.objects.filter(
hospital=h, severity="critical", created_at__gte=last_30d
).count()
)
critical_rate = (critical_items / total_items * 100) if total_items > 0 else 0
# Resolution time trend
current_resolution = self._get_avg_resolution_time(h, last_30d)
previous_resolution = self._get_avg_resolution_time(h, previous_30d)
if previous_resolution > 0:
resolution_time_change = ((current_resolution - previous_resolution) / previous_resolution) * 100
else:
resolution_time_change = 0
# Composite risk score (0-100, higher = more risk)
risk_score = (
min(sla_breach_rate, 100) * 0.30
+ min(overdue_rate, 100) * 0.25
+ min(max(satisfaction_decline * 20, 0), 100) * 0.20
+ min(critical_rate * 5, 100) * 0.15
+ min(max(resolution_time_change, 0), 100) * 0.10
)
risk_level = (
"critical" if risk_score > 70 else "high" if risk_score > 50 else "medium" if risk_score > 30 else "low"
)
risk_scores.append(
{
"hospital": h,
"hospital_name": h.name,
"risk_score": round(risk_score, 1),
"risk_level": risk_level,
"sla_breach_rate": round(sla_breach_rate, 1),
"overdue_rate": round(overdue_rate, 1),
"satisfaction_decline": round(satisfaction_decline, 2),
"critical_rate": round(critical_rate, 1),
"resolution_time_change_pct": round(resolution_time_change, 1),
}
)
risk_scores.sort(key=lambda x: x["risk_score"], reverse=True)
return risk_scores
def generate_predictive_insights(self) -> list:
"""
Create PredictiveInsight objects from anomaly detection, trend analysis,
and SLA breach predictions.
Returns:
List of created PredictiveInsight instances.
"""
from apps.executive_summary.models import PredictiveInsight
created_insights = []
try:
# 1. Detect anomalies across key metrics
for metric_type in [
"complaints_total",
"complaints_critical",
"surveys_satisfaction",
"actions_overdue",
]:
anomaly_result = self.detect_anomalies(metric_type, days=90)
for anomaly in anomaly_result.get("anomalies", []):
direction = "increased" if anomaly["deviation_from_mean"] > 0 else "decreased"
insight, created = PredictiveInsight.objects.get_or_create(
insight_type="anomaly",
metric_type=metric_type,
predicted_date=anomaly["date"],
defaults={
"current_value": Decimal(str(anomaly["value"])),
"severity": anomaly["severity"],
"title_en": f"Anomaly detected in {metric_type.replace('_', ' ').title()}",
"title_ar": f"تم اكتشاف شذوذ في {metric_type}",
"description_en": f"Statistical anomaly detected: value of {anomaly['value']} on {anomaly['date']} with z-score of {anomaly['z_score']}. Value {direction} by {abs(anomaly['deviation_from_mean']):.2f} from the mean of {anomaly_result.get('mean', 0):.2f}.",
"description_ar": f"تم اكتشاف شذوذ إحصائي: قيمة {anomaly['value']} في {anomaly['date']} بمعيار z-score {anomaly['z_score']}.",
"confidence_score": Decimal(str(min(100, anomaly["z_score"] * 30))),
"ai_model": "statistical_zscore",
"detection_metadata": {
"z_score": anomaly["z_score"],
"mean": anomaly_result.get("mean"),
"stdev": anomaly_result.get("stdev"),
},
},
)
if created:
created_insights.append(insight)
# 2. Predict SLA breaches
breach_predictions = self.predict_sla_breach(days_ahead=7)
for prediction in breach_predictions[:20]: # Limit to top 20
if prediction["breach_probability"] > 50:
severity_map = {"critical": "critical", "high": "high", "medium": "medium"}
insight, created = PredictiveInsight.objects.get_or_create(
insight_type="sla_breach_risk",
metric_type="sla_breach_prediction",
entity_type=prediction["entity_type"],
entity_id=prediction["entity_id"],
defaults={
"severity": severity_map.get(prediction["severity"], "medium"),
"title_en": f"SLA Breach Risk: {prediction['title'][:100]}",
"title_ar": f"خطر خرق اتفاقية مستوى الخدمة",
"description_en": f"Predicted SLA breach with {prediction['breach_probability']}% probability. {prediction['hours_remaining']} hours remaining until deadline for {prediction['entity_type']}.",
"description_ar": f"توقع خرق اتفاقية مستوى الخدمة بنسبة {prediction['breach_probability']}%. متبقي {prediction['hours_remaining']} ساعة حتى الموعد النهائي.",
"predicted_value": Decimal(str(prediction["breach_probability"])),
"confidence_score": Decimal(str(prediction["breach_probability"])),
"predicted_date": timezone.now().date() + timedelta(days=7),
"ai_model": "sla_prediction_model",
"detection_metadata": prediction,
},
)
if created:
created_insights.append(insight)
# 3. Identify trend changes
for metric_type in [
"complaints_total",
"surveys_satisfaction",
"actions_total",
]:
trend_change = self.identify_trend_changes(metric_type, days=60)
if trend_change.get("change_detected") and trend_change.get("significance") in [
"significant",
"moderate",
]:
insight_type = (
"positive_trend"
if trend_change["direction"] == "decreasing" and "complaint" in metric_type
else "trend_change"
)
if "satisfaction" in metric_type and trend_change["direction"] == "increasing":
insight_type = "positive_trend"
severity = "high" if trend_change["significance"] == "significant" else "medium"
insight, created = PredictiveInsight.objects.get_or_create(
insight_type=insight_type,
metric_type=metric_type,
defaults={
"severity": severity,
"title_en": f"Trend Change: {metric_type.replace('_', ' ').title()} is {trend_change['direction']}",
"title_ar": f"تغيير الاتجاه: {metric_type} في اتجاه {trend_change['direction']}",
"description_en": f"{trend_change['significance'].title()} {trend_change['direction']} trend detected in {metric_type}. Average changed from {trend_change['first_half_avg']:.2f} to {trend_change['second_half_avg']:.2f} ({trend_change['change_percentage']:+.1f}% change).",
"description_ar": f"تم اكتشاف اتجاه {trend_change['direction']} {trend_change['significance']} في {metric_type}.",
"current_value": Decimal(str(trend_change["second_half_avg"])),
"predicted_value": Decimal(str(trend_change["first_half_avg"])),
"confidence_score": Decimal(str(abs(trend_change["change_percentage"]))),
"ai_model": "trend_analysis",
"detection_metadata": trend_change,
},
)
if created:
created_insights.append(insight)
except Exception as e:
logger.error(f"Error generating predictive insights: {e}", exc_info=True)
return created_insights
# -------------------------------------------------------------------------
# Internal helpers
# -------------------------------------------------------------------------
def _get_avg_resolution_time(self, hospital, start_date) -> float:
"""Get average resolution time in hours for a hospital in a period."""
resolved = self.summary_service.Complaint.objects.filter(
hospital=hospital,
status="closed",
closed_at__isnull=False,
created_at__gte=start_date,
)
if resolved.exists():
total_hours = sum((c.closed_at - c.created_at).total_seconds() / 3600 for c in resolved)
return total_hours / resolved.count()
return 0.0
# =============================================================================
# RecommendationService
# =============================================================================
class RecommendationService:
"""
Service for generating AI recommendations based on predictive insights
and data analysis patterns.
Creates AIRecommendation objects with actionable guidance for
process improvements, resource allocation, and training needs.
"""
MODEL_NAME = "openrouter/google/gemma-3-27b-it:free"
def __init__(self) -> None:
self.predictive_service = PredictiveAnalyticsService()
self.summary_service = ExecutiveSummaryService()
def generate_recommendations_from_insights(self) -> list:
"""
Create AIRecommendation objects based on existing PredictiveInsights.
Analyzes unhandled predictive insights and generates actionable
recommendations for each significant insight.
Returns:
List of created AIRecommendation instances.
"""
from apps.executive_summary.models import AIRecommendation, PredictiveInsight
created_recommendations = []
try:
# Get active insights without existing recommendations
insights = (
PredictiveInsight.objects.filter(
status__in=["new", "acknowledged"],
)
.exclude(
recommendations__isnull=False,
)
.order_by("-severity", "-created_at")[:50]
)
for insight in insights:
recommendation_data = self._generate_recommendation_for_insight(insight)
if recommendation_data:
recommendation = AIRecommendation.objects.create(
category=recommendation_data.get("category", "process_improvement"),
priority=recommendation_data.get("priority", "medium"),
status="new",
title_en=recommendation_data.get("title_en", ""),
title_ar=recommendation_data.get("title_ar", ""),
description_en=recommendation_data.get("description_en", ""),
description_ar=recommendation_data.get("description_ar", ""),
expected_impact_en=recommendation_data.get("expected_impact_en", ""),
expected_impact_ar=recommendation_data.get("expected_impact_ar", ""),
hospital=insight.hospital,
department=insight.department,
related_insight=insight,
confidence_score=insight.confidence_score,
ai_model=self.MODEL_NAME,
generation_metadata={
"insight_type": insight.insight_type,
"insight_severity": insight.severity,
},
)
created_recommendations.append(recommendation)
except Exception as e:
logger.error(f"Error generating recommendations from insights: {e}", exc_info=True)
return created_recommendations
def analyze_best_practices(self) -> dict[str, Any]:
"""
Identify successful patterns from high-performing hospitals.
Analyzes hospitals in the top quartile of the leaderboard to identify
common practices and strategies that correlate with success.
Returns:
Dictionary with best practices analysis including common patterns
and specific recommendations for lower-performing hospitals.
"""
leaderboard = self.summary_service.get_hospital_leaderboard()
if len(leaderboard) < 4:
return {
"message": "Insufficient data for best practices analysis (need at least 4 hospitals)",
"best_practices": [],
}
# Top quartile (top 25%)
top_quartile_size = max(1, len(leaderboard) // 4)
top_hospitals = leaderboard[:top_quartile_size]
bottom_hospitals = leaderboard[top_quartile_size:]
# Analyze common patterns in top performers
top_resolution_rates = [h["resolution_rate"] for h in top_hospitals]
top_satisfaction_scores = [h["satisfaction_score"] for h in top_hospitals]
top_overdue_rates = [h["overdue_rate"] for h in top_hospitals]
top_action_closure_rates = [h["action_closure_rate"] for h in top_hospitals]
avg_top_resolution = statistics.mean(top_resolution_rates) if top_resolution_rates else 0
avg_top_satisfaction = statistics.mean(top_satisfaction_scores) if top_satisfaction_scores else 0
avg_top_overdue = statistics.mean(top_overdue_rates) if top_overdue_rates else 0
avg_top_closure = statistics.mean(top_action_closure_rates) if top_action_closure_rates else 0
best_practices = []
if avg_top_resolution > 70:
best_practices.append(
{
"practice": "High complaint resolution rate",
"metric": "resolution_rate",
"top_performer_avg": round(avg_top_resolution, 1),
"description": f"Top-performing hospitals achieve an average resolution rate of {avg_top_resolution:.1f}%, compared to lower performers.",
"recommendation": "Implement streamlined resolution workflows and set daily resolution targets.",
}
)
if avg_top_satisfaction > 3.5:
best_practices.append(
{
"practice": "High patient satisfaction",
"metric": "satisfaction_score",
"top_performer_avg": round(avg_top_satisfaction, 1),
"description": f"Top hospitals maintain satisfaction scores above {avg_top_satisfaction:.1f}/5.0.",
"recommendation": "Focus on proactive communication and rapid response to patient concerns.",
}
)
if avg_top_overdue < 15:
best_practices.append(
{
"practice": "Low overdue rate",
"metric": "overdue_rate",
"top_performer_avg": round(avg_top_overdue, 1),
"description": f"Best-in-class hospitals keep overdue rates below {avg_top_overdue:.1f}%.",
"recommendation": "Implement SLA monitoring dashboards and proactive escalation processes.",
}
)
# Gap analysis for lower performers
gap_analysis = []
for hospital in bottom_hospitals[:5]:
gaps = []
if hospital["resolution_rate"] < avg_top_resolution - 10:
gaps.append(
{
"area": "resolution_rate",
"current": hospital["resolution_rate"],
"target": round(avg_top_resolution, 1),
"gap": round(avg_top_resolution - hospital["resolution_rate"], 1),
}
)
if hospital["satisfaction_score"] < avg_top_satisfaction - 0.5:
gaps.append(
{
"area": "satisfaction",
"current": hospital["satisfaction_score"],
"target": round(avg_top_satisfaction, 1),
"gap": round(avg_top_satisfaction - hospital["satisfaction_score"], 1),
}
)
if gaps:
gap_analysis.append(
{
"hospital_name": hospital["hospital_name"],
"rank": hospital["rank"],
"gaps": gaps,
}
)
return {
"top_quartile_size": top_quartile_size,
"top_performer_averages": {
"resolution_rate": round(avg_top_resolution, 1),
"satisfaction_score": round(avg_top_satisfaction, 1),
"overdue_rate": round(avg_top_overdue, 1),
"action_closure_rate": round(avg_top_closure, 1),
},
"best_practices": best_practices,
"gap_analysis": gap_analysis,
}
def suggest_resource_allocation(self) -> dict[str, Any]:
"""
Suggest optimal resource allocation based on data patterns.
Analyzes workload distribution, bottleneck areas, and performance
gaps to recommend where resources should be allocated or reallocated.
Returns:
Dictionary with resource allocation recommendations by area.
"""
from apps.organizations.models import Hospital
now = timezone.now()
last_30d = now - timedelta(days=30)
suggestions: dict[str, Any] = {
"complaints_handling": [],
"survey_management": [],
"action_resolution": [],
"observation_triage": [],
"call_center": [],
}
hospitals = Hospital.objects.filter(status="active")
for hospital in hospitals:
# --- Complaints workload analysis ---
open_complaints = self.summary_service.Complaint.objects.filter(
hospital=hospital, status__in=["open", "in_progress"]
).count()
overdue_complaints = self.summary_service.Complaint.objects.filter(
hospital=hospital, is_overdue=True, status__in=["open", "in_progress"]
).count()
critical_complaints = self.summary_service.Complaint.objects.filter(
hospital=hospital, severity="critical", status__in=["open", "in_progress"]
).count()
if overdue_complaints > 5 or (open_complaints > 0 and overdue_complaints / open_complaints > 0.2):
suggestions["complaints_handling"].append(
{
"hospital": hospital.name,
"issue": "High overdue complaint rate",
"current_open": open_complaints,
"current_overdue": overdue_complaints,
"recommendation": f"Allocate additional complaint handling resources. {overdue_complaints} complaints are currently overdue.",
"priority": "high" if overdue_complaints > 10 else "medium",
}
)
if critical_complaints > 3:
suggestions["complaints_handling"].append(
{
"hospital": hospital.name,
"issue": "High critical complaint volume",
"critical_count": critical_complaints,
"recommendation": f"Prioritize critical complaint resolution. {critical_complaints} critical complaints require immediate attention.",
"priority": "urgent",
}
)
# --- Survey follow-up ---
negative_surveys = self.summary_service.SurveyInstance.objects.filter(
hospital=hospital, is_negative=True, completed_at__gte=last_30d
).count()
contacted = self.summary_service.SurveyInstance.objects.filter(
hospital=hospital, is_negative=True, completed_at__gte=last_30d, patient_contacted=True
).count()
follow_up_rate = (contacted / negative_surveys * 100) if negative_surveys > 0 else 100
if follow_up_rate < 70 and negative_surveys > 10:
suggestions["survey_management"].append(
{
"hospital": hospital.name,
"issue": "Low negative survey follow-up rate",
"negative_surveys": negative_surveys,
"contacted": contacted,
"follow_up_rate": round(follow_up_rate, 1),
"recommendation": f"Improve negative survey follow-up process. Only {follow_up_rate:.0f}% of {negative_surveys} negative surveys have been followed up.",
"priority": "high",
}
)
# --- Action resolution ---
open_actions = self.summary_service.PXAction.objects.filter(
hospital=hospital, status__in=["open", "in_progress"]
).count()
overdue_actions = self.summary_service.PXAction.objects.filter(
hospital=hospital, is_overdue=True, status__in=["open", "in_progress"]
).count()
if overdue_actions > 5:
suggestions["action_resolution"].append(
{
"hospital": hospital.name,
"issue": "High overdue action count",
"open_actions": open_actions,
"overdue_actions": overdue_actions,
"recommendation": f"Accelerate action resolution. {overdue_actions} actions are overdue.",
"priority": "high" if overdue_actions > 10 else "medium",
}
)
# --- Observation triage ---
new_observations = self.summary_service.Observation.objects.filter(hospital=hospital, status="new").count()
critical_observations = self.summary_service.Observation.objects.filter(
hospital=hospital, severity="critical", status__in=["new", "triaged"]
).count()
if new_observations > 10:
suggestions["observation_triage"].append(
{
"hospital": hospital.name,
"issue": "Observation triage backlog",
"new_observations": new_observations,
"critical_observations": critical_observations,
"recommendation": f"Clear observation triage backlog. {new_observations} observations awaiting triage, {critical_observations} are critical.",
"priority": "high" if critical_observations > 0 else "medium",
}
)
# --- Call Center Analysis ---
try:
from apps.callcenter.models import CallCenterInteraction
calls_7d = CallCenterInteraction.objects.filter(call_started_at__gte=now - timedelta(days=7))
total_calls = calls_7d.count()
if total_calls > 0:
low_rating_rate = calls_7d.filter(is_low_rating=True).count() / total_calls * 100
if low_rating_rate > 20:
suggestions["call_center"].append(
{
"issue": "High low-rating call rate",
"total_calls_7d": total_calls,
"low_rating_rate": round(low_rating_rate, 1),
"recommendation": f"Review call center training. {low_rating_rate:.1f}% of calls in the last 7 days received low ratings.",
"priority": "high",
}
)
except Exception:
pass
# Count total recommendations by priority
all_suggestions = []
for category_items in suggestions.values():
all_suggestions.extend(category_items)
urgent_count = sum(1 for s in all_suggestions if s.get("priority") == "urgent")
high_count = sum(1 for s in all_suggestions if s.get("priority") == "high")
medium_count = sum(1 for s in all_suggestions if s.get("priority") == "medium")
suggestions["summary"] = {
"total_recommendations": len(all_suggestions),
"urgent": urgent_count,
"high": high_count,
"medium": medium_count,
}
return suggestions
# -------------------------------------------------------------------------
# Internal helpers
# -------------------------------------------------------------------------
def _generate_recommendation_for_insight(self, insight) -> Optional[dict[str, str]]:
"""Generate a recommendation for a specific insight using AI."""
from apps.executive_summary.models import PredictiveInsight
try:
category_mapping = {
"trend_change": "process_improvement",
"anomaly": "quality_assurance",
"risk_warning": "preventive_action",
"sla_breach_risk": "process_improvement",
"performance_drop": "training",
"volume_spike": "resource_allocation",
"satisfaction_decline": "communication",
"positive_trend": "process_improvement",
}
category = category_mapping.get(insight.insight_type, "process_improvement")
priority = "high" if insight.severity in ["high", "critical"] else "medium"
prompt = f"""You are a healthcare patient experience (PX) consultant.
Based on the following predictive insight, generate a specific, actionable recommendation.
## Insight Details:
- Type: {insight.insight_type}
- Severity: {insight.severity}
- Title: {insight.title_en}
- Description: {insight.description_en}
- Metric: {insight.metric_type}
- Current Value: {insight.current_value}
- Predicted Value: {insight.predicted_value}
Generate a recommendation in both English and Arabic. Include:
1. A concise title (max 100 characters)
2. A detailed description of the recommended action
3. The expected impact if implemented
Format as JSON:
{{
"title_en": "...",
"title_ar": "...",
"description_en": "...",
"description_ar": "...",
"expected_impact_en": "...",
"expected_impact_ar": "..."
}}
"""
from apps.core.ai_service import AIService
raw = AIService._openrouter_completion(
model=self.MODEL_NAME,
messages=[{"role": "user", "content": prompt}],
temperature=0.4,
max_tokens=1000,
)
import json
try:
cleaned = AINarrativeService._clean_ai_response(raw)
result = json.loads(cleaned)
except json.JSONDecodeError:
# Fallback to basic recommendation
result = {
"title_en": f"Address {insight.insight_type}: {insight.title_en[:80]}",
"title_ar": insight.title_ar or f"معالجة {insight.insight_type}",
"description_en": insight.description_en,
"description_ar": insight.description_ar or "",
"expected_impact_en": "Addressing this insight should improve overall patient experience metrics.",
"expected_impact_ar": "معالجة هذا البصيرة يجب أن تحسن مقاييس تجربة المريض بشكل عام.",
}
return {
"category": category,
"priority": priority,
**result,
}
except Exception as e:
logger.error(f"Error generating recommendation for insight {insight.id}: {e}")
return None