HH/apps/analytics/services/ai_analytics.py
ismail 23d439f5a5 fix: harden multi-tenant data isolation across 8 modules
Pre-production security fixes to prevent cross-hospital data leaks:

- Standards API: add get_queryset() filtering by department__hospital
- Reports service: add user param with hospital filtering to all querysets
- RCA views: replace is_superuser with tenant_hospital pattern, add access
  checks to all 11 mutation views
- Notifications views: replace is_superuser patterns with _get_notification_hospital
  helper across all 5 settings functions
- Appreciation API: add tenant_hospital fallback to AppreciationViewSet,
  AppreciationStatsViewSet, and LeaderboardView
- AI Analytics: add tenant_hospital fallback in ExecutiveSummaryGenerator and
  ActionRecommendationEngine
- SourceUserRestrictionMiddleware: remove None from ALLOWED_URL_NAMES
- Complaint export: fix nullable patient/due_at/description crashes in CSV
  and Excel export, fix invalid get_category_display/get_source_display calls

E2E test updates:
- Update isolation gap tests to actively assert hospital filtering
- Fix CSV export test to use API context for download handling
- Switch clinical-staff tests to serial mode to prevent race conditions
2026-04-07 01:23:10 +03:00

1121 lines
46 KiB
Python

"""
AI-Powered Analytics Service
Provides 5 advanced AI features:
1. AI-Generated Executive Summaries
2. Early Warning System (at-risk departments)
3. Predictive Complaint Volume (time-series forecasting)
4. SLA Breach Prediction
5. Automated Action Recommendations
"""
import json
import logging
import math
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from django.conf import settings
from django.core.cache import cache
from django.db.models import Avg, Count, F, Q
from django.utils import timezone
from apps.complaints.models import Complaint
from apps.feedback.models import Feedback
from apps.organizations.models import Department, Hospital
from apps.px_action_center.models import PXAction
from apps.surveys.models import SurveyInstance
logger = logging.getLogger(__name__)
# =============================================================================
# Shared AI Service Wrapper (uses apps.core.ai_service — reads model from .env)
# =============================================================================
def _ai_chat(system_prompt: str, user_prompt: str, max_tokens: int = 4096) -> Optional[str]:
    """Chat with the shared AIService and return the cleaned response text.

    The request is sent with ``response_format="json_object"``. A Markdown
    code fence around the reply (with or without a ``json`` tag) is removed
    so the result can be handed to ``json.loads`` directly.

    Args:
        system_prompt: Instructions passed as the system prompt.
        user_prompt: The prompt text for this request.
        max_tokens: Upper bound forwarded to ``AIService.chat_completion``.

    Returns:
        The stripped response text, or ``None`` if the call raised or the
        service returned something other than a string.
    """
    # Imported lazily inside the function, as the original block did.
    from apps.core.ai_service import AIService

    # Only the remote call is guarded; string clean-up below cannot raise.
    try:
        reply = AIService.chat_completion(
            prompt=user_prompt,
            system_prompt=system_prompt,
            max_tokens=max_tokens,
            response_format="json_object",
        )
    except Exception as exc:  # service boundary: callers fall back on None
        # logger.exception keeps the traceback that logger.error(f"...") lost.
        logger.exception("AI chat error: %s", exc)
        return None

    if not isinstance(reply, str):
        logger.error("AI chat error: expected a text reply, got %s", type(reply).__name__)
        return None

    text = reply.strip()
    if text.startswith("```json"):
        text = text[7:]
    elif text.startswith("```"):
        text = text[3:]
    if text.endswith("```"):
        text = text[:-3]
    return text.strip()
class _OpenRouterClient:
    """Compatibility facade that forwards every request to ``_ai_chat``."""

    def is_configured(self) -> bool:
        # Always True: configuration is left to AIService.
        return True

    def chat(
        self,
        system_prompt: str,
        user_prompt: str,
        temperature: float = 0.2,
        max_tokens: int = 4096,
    ) -> Optional[str]:
        # ``temperature`` is accepted for signature compatibility only; it is
        # not forwarded to ``_ai_chat``.
        response = _ai_chat(system_prompt, user_prompt, max_tokens)
        return response


_client = _OpenRouterClient()
# =============================================================================
# 1. AI-Generated Executive Summaries
# =============================================================================
class ExecutiveSummaryGenerator:
    """
    Generates bilingual (EN/AR) executive summaries by feeding
    aggregated KPI data to the LLM.

    ``generate`` is the entry point. Results are cached per
    user/hospital/department/period for ``CACHE_TIMEOUT`` seconds.
    """

    CACHE_TIMEOUT = 3600  # 1 hour

    # Reporting windows in days; any other ``period`` value means 30 days.
    _PERIOD_DAYS = {"7d": 7, "90d": 90}

    @staticmethod
    def _tenant_hospital(user):
        """Return the hospital a user is restricted to, or ``None``.

        A user is restricted when it has a truthy ``hospital`` attribute and
        ``user.is_px_admin()`` is falsy (a missing method counts as falsy).
        """
        hospital = getattr(user, "hospital", None)
        if hospital and not getattr(user, "is_px_admin", lambda: False)():
            return hospital
        return None

    @staticmethod
    def _gather_data(user, hospital_id=None, department_id=None, period="30d") -> Dict[str, Any]:
        """Collect all data needed for the summary.

        Args:
            user: Requesting user; supplies the tenant hospital fallback.
            hospital_id: Explicit hospital to report on. Takes precedence
                over the user's own hospital.
            department_id: Optional department; narrows complaints and
                actions only.
            period: ``"7d"``, ``"90d"``, or anything else for 30 days.

        Returns:
            A flat dict of KPI values (counts, rates, top-5 name lists).
        """
        now = timezone.now()
        window = timedelta(days=ExecutiveSummaryGenerator._PERIOD_DAYS.get(period, 30))
        start = now - window
        prev_start = start - window  # comparison window of equal length

        # Build the hospital scope once so that EVERY queryset, including the
        # prior-period one, gets the same tenant restriction.
        hospital_scope: Dict[str, Any] = {}
        survey_scope: Dict[str, Any] = {}
        if hospital_id:
            # Filter by id directly: an unknown id now yields no rows instead
            # of silently dropping the filter and exposing every hospital.
            hospital_scope["hospital_id"] = hospital_id
            survey_scope["survey_template__hospital_id"] = hospital_id
        else:
            tenant = ExecutiveSummaryGenerator._tenant_hospital(user)
            if tenant is not None:
                hospital_scope["hospital"] = tenant
                survey_scope["survey_template__hospital"] = tenant

        # Same reasoning for departments: unknown id -> empty result.
        department_scope: Dict[str, Any] = {"department_id": department_id} if department_id else {}

        base_complaint = Complaint.objects.filter(created_at__gte=start, **hospital_scope, **department_scope)
        base_action = PXAction.objects.filter(created_at__gte=start, **hospital_scope, **department_scope)
        base_survey = SurveyInstance.objects.filter(completed_at__gte=start, status="completed", **survey_scope)
        base_feedback = Feedback.objects.filter(created_at__gte=start, **hospital_scope)
        prev_complaints = Complaint.objects.filter(
            created_at__gte=prev_start, created_at__lt=start, **hospital_scope, **department_scope
        )

        # Complaints stats
        total_complaints = base_complaint.count()
        prev_complaint_count = prev_complaints.count()
        finished = base_complaint.filter(status__in=["resolved", "closed"])
        resolved = finished.count()
        overdue = base_complaint.filter(is_overdue=True).count()
        critical = base_complaint.filter(severity__in=["high", "critical"]).count()

        # Top categories
        top_cats = list(
            base_complaint.filter(category__isnull=False)
            .values("category__name_en")
            .annotate(c=Count("id"))
            .order_by("-c")[:5]
        )
        # Top departments by complaint volume
        top_depts = list(
            base_complaint.filter(department__isnull=False)
            .values("department__name_en")
            .annotate(c=Count("id"))
            .order_by("-c")[:5]
        )

        # Actions stats
        total_actions = base_action.count()
        open_actions = base_action.filter(status__in=["open", "in_progress"]).count()
        closed_actions = base_action.filter(status="closed").count()
        overdue_actions = base_action.filter(is_overdue=True).count()

        # Survey stats
        total_surveys = base_survey.count()
        avg_score = base_survey.aggregate(avg=Avg("total_score"))["avg"] or 0
        negative = base_survey.filter(is_negative=True).count()

        # NPS = (% promoters, score >= 9) - (% detractors, score <= 6)
        nps_surveys = base_survey.filter(survey_template__survey_type="nps", total_score__isnull=False)
        nps_total = nps_surveys.count()
        if nps_total > 0:
            promoters = nps_surveys.filter(total_score__gte=9).count()
            detractors = nps_surveys.filter(total_score__lte=6).count()
            nps = (promoters - detractors) / nps_total * 100
        else:
            nps = 0

        # SLA compliance: resolved on or before due_at, over all complaints with a due_at
        total_with_sla = base_complaint.filter(due_at__isnull=False).count()
        resolved_within = finished.filter(resolved_at__lte=F("due_at")).count()
        sla_rate = (resolved_within / total_with_sla * 100) if total_with_sla > 0 else 0

        # Feedback
        total_fb = base_feedback.count()
        compliments = base_feedback.filter(feedback_type="compliment").count()

        # Avg resolution hours; the aggregate is None when no row qualifies.
        avg_duration = (
            finished.filter(resolved_at__isnull=False, created_at__isnull=False)
            .annotate(rt=F("resolved_at") - F("created_at"))
            .aggregate(avg=Avg("rt"))["avg"]
        )
        avg_res_hrs = (avg_duration.total_seconds() / 3600) if avg_duration else 0

        complaint_change_pct = 0
        if prev_complaint_count > 0:
            complaint_change_pct = ((total_complaints - prev_complaint_count) / prev_complaint_count) * 100

        return {
            "period": period,
            "total_complaints": total_complaints,
            "complaint_change_pct": round(complaint_change_pct, 1),
            "resolved_complaints": resolved,
            "overdue_complaints": overdue,
            "critical_complaints": critical,
            "top_categories": [c["category__name_en"] for c in top_cats],
            "top_departments": [d["department__name_en"] for d in top_depts],
            "total_actions": total_actions,
            "open_actions": open_actions,
            "closed_actions": closed_actions,
            "overdue_actions": overdue_actions,
            "total_surveys": total_surveys,
            "avg_survey_score": round(avg_score, 2),
            "negative_surveys": negative,
            "nps_score": round(nps, 1),
            "sla_compliance": round(sla_rate, 1),
            "total_feedback": total_fb,
            "compliments": compliments,
            "avg_resolution_hours": round(avg_res_hrs, 1),
        }

    @staticmethod
    def _fallback_summary(data: Dict[str, Any]) -> Dict[str, Any]:
        """Build the canned summary used when the AI call returns nothing."""
        return {
            "summary_en": "AI summary unavailable — check OpenRouter configuration.",
            "summary_ar": "ملخص الذكاء الاصطناعي غير متاح — تحقق من إعدادات OpenRouter.",
            "key_findings_en": [
                f"{data['total_complaints']} complaints recorded",
                f"SLA compliance at {data['sla_compliance']}%",
                f"NPS: {data['nps_score']}",
            ],
            "key_findings_ar": [
                f"تم تسجيل {data['total_complaints']} شكوى",
                f"الالتزام باتفاقية مستوى الخدمة {data['sla_compliance']}%",
                f"صافي نقاط الترويج: {data['nps_score']}",
            ],
            "recommendations_en": [
                "Review overdue complaints",
                "Address top complaint categories",
                "Improve SLA compliance",
            ],
            "recommendations_ar": [
                "مراجعة الشكاوى المتأخرة",
                "معالجة فئات الشكاوى الرئيسية",
                "تحسين الالتزام باتفاقية مستوى الخدمة",
            ],
            "risk_level": "medium",
        }

    @staticmethod
    def _parse_summary(raw: str) -> Dict[str, Any]:
        """Parse the AI reply; degrade to a text-only summary if it is unusable.

        A reply that is not valid JSON, or is valid JSON but not an object
        (for example a list), is returned as ``summary_en`` truncated to 500
        characters with every other field empty.
        """
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, dict):
            return parsed
        return {
            "summary_en": raw[:500],
            "summary_ar": "",
            "key_findings_en": [],
            "key_findings_ar": [],
            "recommendations_en": [],
            "recommendations_ar": [],
            "risk_level": "medium",
        }

    @classmethod
    def generate(cls, user, hospital_id=None, department_id=None, period="30d", force_refresh=False) -> Dict[str, Any]:
        """Generate or return cached executive summary.

        Args:
            user: Requesting user (its ``id`` is part of the cache key).
            hospital_id: Optional hospital to report on.
            department_id: Optional department to report on.
            period: Reporting window, see ``_gather_data``.
            force_refresh: Skip the cache lookup and recompute.

        Returns:
            The summary dict from the AI (or a fallback), with the raw KPI
            numbers attached under ``"_data"``.
        """
        cache_key = f"exec_summary_{user.id}_{hospital_id}_{department_id}_{period}"
        if not force_refresh:
            cached = cache.get(cache_key)
            if cached is not None:
                return cached

        data = cls._gather_data(user, hospital_id, department_id, period)
        system_prompt = (
            "You are a senior healthcare quality analyst writing an executive summary "
            "for hospital leadership. Your summaries must be concise, data-driven, "
            "and actionable. Always provide BOTH English and Arabic (فصحى معاصرة) versions. "
            "Respond with ONLY valid JSON, no markdown."
        )
        # Skip missing names so a NULL name_en cannot crash str.join.
        top_cats_str = ", ".join(str(name) for name in data["top_categories"] if name) or "N/A"
        top_depts_str = ", ".join(str(name) for name in data["top_departments"] if name) or "N/A"
        direction = "up" if data["complaint_change_pct"] > 0 else "down"
        user_prompt = f"""Write an executive summary for the past {data["period"]}. Use these exact data points:
COMPLAINTS: {data["total_complaints"]} total ({direction} {abs(data["complaint_change_pct"])}% vs prior period). {data["resolved_complaints"]} resolved, {data["overdue_complaints"]} overdue, {data["critical_complaints"]} critical/high severity. Avg resolution: {data["avg_resolution_hours"]}h. SLA compliance: {data["sla_compliance"]}%.
Top complaint categories: {top_cats_str}
Top departments by volume: {top_depts_str}
ACTIONS: {data["total_actions"]} total, {data["open_actions"]} open/in-progress, {data["closed_actions"]} closed, {data["overdue_actions"]} overdue.
SURVEYS: {data["total_surveys"]} responses. Avg score: {data["avg_survey_score"]}/5. {data["negative_surveys"]} negative. NPS: {data["nps_score"]}.
FEEDBACK: {data["total_feedback"]} items, {data["compliments"]} compliments.
Write a 4-5 sentence English summary, then a 4-5 sentence Arabic summary, then 3 specific recommended actions. Return ONLY this JSON structure:
{{
"summary_en": "...",
"summary_ar": "...",
"key_findings_en": ["finding 1", "finding 2", "finding 3"],
"key_findings_ar": ["اكتشاف 1", "اكتشاف 2", "اكتشاف 3"],
"recommendations_en": ["action 1", "action 2", "action 3"],
"recommendations_ar": ["إجراء 1", "إجراء 2", "إجراء 3"],
"risk_level": "low|medium|high"
}}"""
        raw = _client.chat(system_prompt, user_prompt, temperature=0.2, max_tokens=2048)
        result = cls._parse_summary(raw) if raw else cls._fallback_summary(data)

        result["_data"] = data  # include raw numbers for UI display
        cache.set(cache_key, result, cls.CACHE_TIMEOUT)
        return result
# =============================================================================
# 2. Early Warning System — At-Risk Department Detection
# =============================================================================
class EarlyWarningSystem:
    """
    Detects departments showing risk signals across multiple channels:
    - Rising complaint volume
    - Declining survey scores
    - Increasing SLA breaches
    - Negative social/call center feedback
    Returns ranked list of at-risk departments.

    Each signal is scored 0-100 and combined using ``RISK_WEIGHTS`` (which
    sum to 100), so the overall risk score is also on a 0-100 scale.
    """

    CACHE_TIMEOUT = 1800  # 30 minutes
    RISK_WEIGHTS = {
        "complaint_volume_spike": 25,
        "survey_score_decline": 25,
        "sla_breach_increase": 20,
        "negative_feedback_rise": 15,
        "overdue_actions_rise": 15,
    }

    @staticmethod
    def _pct_change(current, previous):
        """Percent change from ``previous`` to ``current``.

        With no previous activity, any current activity counts as +100 and
        zero-to-zero counts as 0.
        """
        if previous > 0:
            return ((current - previous) / previous) * 100
        return 100 if current > 0 else 0

    @staticmethod
    def _scaled_score(value, full_scale):
        """Scale ``value`` linearly so ``full_scale`` maps to 100, clamped to 0-100."""
        return min(max(value / full_scale * 100, 0), 100)

    @classmethod
    def detect(cls, user, hospital_id=None, limit=10) -> List[Dict[str, Any]]:
        """Scan all active departments and return those with risk scores > threshold.

        Args:
            user: Requesting user; non-PX-admin users with a hospital are
                restricted to it when ``hospital_id`` is not given.
            hospital_id: Optional explicit hospital filter.
            limit: Maximum number of departments returned.

        Returns:
            Signal dicts for departments scoring above 20, highest first.
        """
        cache_key = f"early_warning_{user.id}_{hospital_id}_{limit}"
        cached = cache.get(cache_key)
        # ``is not None`` so a cached empty list is served instead of
        # re-running the whole per-department scan on every request.
        if cached is not None:
            return cached

        now = timezone.now()
        current_start = now - timedelta(days=30)
        prev_start = now - timedelta(days=60)

        # Get departments
        depts = Department.objects.filter(status="active")
        if hospital_id:
            depts = depts.filter(hospital_id=hospital_id)
        elif hasattr(user, "hospital") and user.hospital and not getattr(user, "is_px_admin", lambda: False)():
            depts = depts.filter(hospital=user.hospital)

        results = []
        for dept in depts:
            signals = cls._evaluate_department(dept, current_start, prev_start, now)
            if signals["risk_score"] > 20:  # threshold — show anything above 20%
                signals["department_id"] = str(dept.id)
                signals["department_name"] = dept.name_en if hasattr(dept, "name_en") else str(dept)
                results.append(signals)

        results.sort(key=lambda entry: entry["risk_score"], reverse=True)
        results = results[:limit]
        cache.set(cache_key, results, cls.CACHE_TIMEOUT)
        return results

    @classmethod
    def _evaluate_department(cls, dept, current_start, prev_start, now) -> Dict[str, Any]:
        """Score one department by comparing the current window to the previous one.

        Args:
            dept: Department to evaluate.
            current_start: Start of the current window (open-ended).
            prev_start: Start of the previous window, which ends at
                ``current_start``.
            now: Not used by the calculation; kept for the existing signature.

        Returns:
            One entry per signal (raw numbers plus a 0-100 ``score``) and the
            combined ``risk_score``, ``risk_level`` and ``active_signals``.
        """

        def in_current(queryset):
            return queryset.filter(created_at__gte=current_start)

        def in_previous(queryset):
            return queryset.filter(created_at__gte=prev_start, created_at__lt=current_start)

        signals = {}
        risk_score = 0
        weights = cls.RISK_WEIGHTS

        # 1. Complaint volume spike (50% spike = 100)
        dept_complaints = Complaint.objects.filter(department=dept)
        current_complaints = in_current(dept_complaints).count()
        prev_complaints = in_previous(dept_complaints).count()
        change_pct = cls._pct_change(current_complaints, prev_complaints)
        # Clamped at 0 like every other signal: a falling complaint count must
        # not subtract from the risk contributed by the remaining signals.
        volume_score = cls._scaled_score(change_pct, 50)
        risk_score += volume_score * weights["complaint_volume_spike"] / 100
        signals["complaint_volume_spike"] = {
            "current": current_complaints,
            "previous": prev_complaints,
            "change_pct": round(change_pct, 1),
            "score": round(volume_score, 1),
        }

        # 2. Survey score decline (20% drop = 100)
        dept_surveys = SurveyInstance.objects.filter(
            journey_instance__department=dept,
            status="completed",
            total_score__isnull=False,
        )
        current_surveys = dept_surveys.filter(completed_at__gte=current_start)
        prev_surveys = dept_surveys.filter(completed_at__gte=prev_start, completed_at__lt=current_start)
        # float() keeps the arithmetic below type-consistent whatever numeric
        # type the database returns for the average.
        curr_avg = float(current_surveys.aggregate(a=Avg("total_score"))["a"] or 0)
        prev_avg = float(prev_surveys.aggregate(a=Avg("total_score"))["a"] or 0)
        survey_change = ((curr_avg - prev_avg) / prev_avg) * 100 if prev_avg > 0 else 0
        decline_score = cls._scaled_score(-survey_change, 20)
        risk_score += decline_score * weights["survey_score_decline"] / 100
        signals["survey_score_decline"] = {
            "current_avg": round(curr_avg, 2),
            "previous_avg": round(prev_avg, 2),
            "change_pct": round(survey_change, 1),
            "score": round(decline_score, 1),
        }

        # 3. SLA breach increase (+20 percentage points = 100)
        overdue_complaints = dept_complaints.filter(is_overdue=True)
        curr_breached = in_current(overdue_complaints).count()
        prev_breached = in_previous(overdue_complaints).count()
        # Totals are the complaint counts already fetched for signal 1.
        curr_breach_rate = (curr_breached / current_complaints * 100) if current_complaints > 0 else 0
        prev_breach_rate = (prev_breached / prev_complaints * 100) if prev_complaints > 0 else 0
        breach_change = curr_breach_rate - prev_breach_rate
        sla_score = cls._scaled_score(breach_change, 20)
        risk_score += sla_score * weights["sla_breach_increase"] / 100
        signals["sla_breach_increase"] = {
            "current_rate": round(curr_breach_rate, 1),
            "previous_rate": round(prev_breach_rate, 1),
            "change_pp": round(breach_change, 1),
            "score": round(sla_score, 1),
        }

        # 4. Negative feedback rise (50% rise = 100)
        negative_feedback = Feedback.objects.filter(department=dept, sentiment="negative")
        curr_neg_fb = in_current(negative_feedback).count()
        prev_neg_fb = in_previous(negative_feedback).count()
        fb_change = cls._pct_change(curr_neg_fb, prev_neg_fb)
        fb_score = cls._scaled_score(fb_change, 50)
        risk_score += fb_score * weights["negative_feedback_rise"] / 100
        signals["negative_feedback_rise"] = {
            "current": curr_neg_fb,
            "previous": prev_neg_fb,
            "change_pct": round(fb_change, 1),
            "score": round(fb_score, 1),
        }

        # 5. Overdue actions rise (50% rise = 100)
        overdue_actions = PXAction.objects.filter(department=dept, is_overdue=True)
        curr_overdue = in_current(overdue_actions).count()
        prev_overdue = in_previous(overdue_actions).count()
        overdue_change = cls._pct_change(curr_overdue, prev_overdue)
        overdue_score = cls._scaled_score(overdue_change, 50)
        risk_score += overdue_score * weights["overdue_actions_rise"] / 100
        signals["overdue_actions_rise"] = {
            "current": curr_overdue,
            "previous": prev_overdue,
            "change_pct": round(overdue_change, 1),
            "score": round(overdue_score, 1),
        }

        signals["risk_score"] = round(risk_score, 1)
        if risk_score >= 70:
            signals["risk_level"] = "critical"
        elif risk_score >= 50:
            signals["risk_level"] = "high"
        elif risk_score >= 30:
            signals["risk_level"] = "medium"
        else:
            signals["risk_level"] = "low"
        # A signal is "active" once its own score passes 30.
        signals["active_signals"] = sum(1 for name in weights if signals[name]["score"] > 30)
        return signals
# =============================================================================
# 3. Predictive Complaint Volume — Time-Series Forecasting
# =============================================================================
class ComplaintVolumeForecaster:
    """
    Uses weighted moving average + seasonality detection to forecast
    complaint volumes for the next 30/60/90 days with confidence bands.

    The day-of-week pattern is reported alongside the forecast; it is not
    fed back into the predicted values.
    """

    CACHE_TIMEOUT = 3600  # 1 hour

    # Weights for the trailing 7 days, oldest first. They sum to 1.0.
    _WMA_WEIGHTS = (0.05, 0.08, 0.12, 0.15, 0.20, 0.25, 0.15)

    @classmethod
    def forecast(cls, user, hospital_id=None, forecast_days=30) -> Dict[str, Any]:
        """Forecast daily complaint counts for the next ``forecast_days`` days.

        Args:
            user: Requesting user; non-PX-admin users with a hospital are
                restricted to it when ``hospital_id`` is not given.
            hospital_id: Optional explicit hospital filter.
            forecast_days: Number of future days to predict.

        Returns:
            A dict with date labels, predicted values, upper/lower bands,
            totals, a confidence level and the day-of-week pattern. When
            fewer than 14 days have any complaints, the
            ``_insufficient_data_response`` payload is returned (uncached).
        """
        cache_key = f"complaint_forecast_{user.id}_{hospital_id}_{forecast_days}"
        cached = cache.get(cache_key)
        if cached is not None:
            return cached

        now = timezone.now()
        history_start = now - timedelta(days=120)
        qs = Complaint.objects.filter(created_at__gte=history_start)
        if hospital_id:
            qs = qs.filter(hospital_id=hospital_id)
        elif hasattr(user, "hospital") and user.hospital and not getattr(user, "is_px_admin", lambda: False)():
            qs = qs.filter(hospital=user.hospital)

        # Aggregate daily counts. Only days with at least one complaint appear.
        # NOTE(review): .extra() with raw DATE() is a legacy API; switching to
        # TruncDate needs a decision on which time zone defines a "day".
        daily = list(
            qs.extra(select={"date": "DATE(created_at)"}).values("date").annotate(count=Count("id")).order_by("date")
        )
        observed_days = len(daily)
        if observed_days < 14:
            return cls._insufficient_data_response(forecast_days)

        # Build a gap-free series over the whole history window (0 for empty days).
        date_map = {str(row["date"]): row["count"] for row in daily}
        first_day = history_start.date()
        today = now.date()
        dates = [first_day + timedelta(days=offset) for offset in range((today - first_day).days + 1)]
        counts = [date_map.get(str(day), 0) for day in dates]

        predictions = cls._weighted_moving_average(counts, forecast_days)

        # Confidence bands widen with forecast horizon: 1 std dev on the first
        # day, approaching 3 std devs at the end of the horizon.
        recent_std = cls._std_dev(counts[-14:])
        upper_band = []
        lower_band = []
        for step, pred in enumerate(predictions):
            margin = recent_std * (1 + (step / forecast_days) * 2)
            upper_band.append(round(pred + margin, 1))
            lower_band.append(round(max(0, pred - margin), 1))

        result_labels = [
            (today + timedelta(days=offset)).strftime("%Y-%m-%d") for offset in range(1, forecast_days + 1)
        ]

        total_predicted = round(sum(predictions), 0)
        recent_window = counts[-30:]
        recent_30 = sum(recent_window)
        # Compare average daily rates so a 60- or 90-day forecast total is not
        # measured against a 30-day actual total.
        change_pct = cls._daily_rate_change_pct(total_predicted, forecast_days, recent_30, len(recent_window))

        result = {
            "labels": result_labels,
            "predicted": predictions,
            "upper_band": upper_band,
            "lower_band": lower_band,
            # Key name is historical: this is the total over ``forecast_days``.
            "total_predicted_30d": int(total_predicted),
            "recent_30d_actual": recent_30,
            "change_pct": round(change_pct, 1),
            # Based on days that actually have data; the zero-filled series
            # always spans the full window and would always rate "high".
            "confidence_level": cls._confidence_level(observed_days),
            "day_of_week_pattern": cls._detect_day_of_week_pattern(counts, dates),
        }
        cache.set(cache_key, result, cls.CACHE_TIMEOUT)
        return result

    @classmethod
    def _weighted_moving_average(cls, counts, forecast_days):
        """Roll a 7-day weighted moving average forward ``forecast_days`` steps.

        Each prediction is rounded to one decimal, floored at 0, and appended
        to the history used for the next step.
        """
        history = list(counts)
        predictions = []
        for _ in range(forecast_days):
            window = history[-7:]
            # Pad short histories by repeating the oldest value (0 if empty).
            while len(window) < 7:
                window.insert(0, window[0] if window else 0)
            wma = sum(weight * value for weight, value in zip(cls._WMA_WEIGHTS, window))
            predicted = max(0, round(wma, 1))
            predictions.append(predicted)
            history.append(predicted)
        return predictions

    @staticmethod
    def _daily_rate_change_pct(predicted_total, predicted_days, actual_total, actual_days):
        """Percent change between predicted and actual average daily volume.

        Returns 0 when either window is empty or there is no actual volume.
        """
        if predicted_days <= 0 or actual_days <= 0 or actual_total <= 0:
            return 0
        predicted_rate = predicted_total / predicted_days
        actual_rate = actual_total / actual_days
        return (predicted_rate - actual_rate) / actual_rate * 100

    @staticmethod
    def _confidence_level(observed_days):
        """Rate confidence from the number of days that have complaint data."""
        if observed_days > 60:
            return "high"
        if observed_days > 30:
            return "medium"
        return "low"

    @staticmethod
    def _std_dev(values):
        """Population standard deviation of ``values`` (0 for an empty list)."""
        if not values:
            return 0
        mean = sum(values) / len(values)
        variance = sum((x - mean) ** 2 for x in values) / len(values)
        return math.sqrt(variance)

    @staticmethod
    def _detect_day_of_week_pattern(counts, dates):
        """Detect if certain days of week have higher complaint volumes.

        Returns a mapping of weekday abbreviation to ``"above_average"``
        (more than 10% over the mean of the weekday averages),
        ``"below_average"`` (more than 10% under) or ``"average"``.
        Returns ``{}`` when fewer than 14 counts are supplied.
        """
        if len(counts) < 14:
            return {}
        day_names = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
        per_weekday = {index: [] for index in range(7)}
        for day, count in zip(dates, counts):
            per_weekday[day.weekday()].append(count)
        dow_avgs = {
            day_names[index]: round(sum(values) / len(values), 1) if values else 0
            for index, values in per_weekday.items()
        }
        overall_avg = sum(dow_avgs.values()) / 7

        def classify(avg):
            if avg > overall_avg * 1.1:
                return "above_average"
            if avg < overall_avg * 0.9:
                return "below_average"
            return "average"

        return {name: classify(avg) for name, avg in dow_avgs.items()}

    @staticmethod
    def _insufficient_data_response(forecast_days):
        """Return the empty payload used when there is too little history."""
        return {
            "labels": [],
            "predicted": [],
            "upper_band": [],
            "lower_band": [],
            "total_predicted_30d": 0,
            "recent_30d_actual": 0,
            "change_pct": 0,
            "confidence_level": "insufficient_data",
            "day_of_week_pattern": {},
            "message": "At least 14 days of historical data required for forecasting.",
        }
# =============================================================================
# 4. SLA Breach Prediction
# =============================================================================
class SLABreachPredictor:
    """
    Predicts which open/in-progress complaints are at risk of breaching SLA.
    Uses factors: age, severity, assigned staff workload, historical resolution time.

    The "probability" is an additive heuristic score capped at 100.
    """

    CACHE_TIMEOUT = 900  # 15 minutes

    @classmethod
    def predict(cls, user, hospital_id=None, limit=20) -> List[Dict[str, Any]]:
        """Return complaints ranked by breach probability.

        Args:
            user: Requesting user; non-PX-admin users with a hospital are
                restricted to it when ``hospital_id`` is not given.
            hospital_id: Optional explicit hospital filter.
            limit: Maximum number of complaints returned.

        Returns:
            Prediction dicts for complaints scoring above 30, highest first.
        """
        cache_key = f"sla_breach_pred_{user.id}_{hospital_id}_{limit}"
        cached = cache.get(cache_key)
        # ``is not None`` so a cached empty list is honoured.
        if cached is not None:
            return cached

        now = timezone.now()
        qs = Complaint.objects.filter(
            status__in=["open", "in_progress"],
            due_at__isnull=False,
        )
        if hospital_id:
            qs = qs.filter(hospital_id=hospital_id)
        elif hasattr(user, "hospital") and user.hospital and not getattr(user, "is_px_admin", lambda: False)():
            qs = qs.filter(hospital=user.hospital)
        qs = qs.select_related("hospital", "department", "source", "assigned_to").order_by("due_at")

        results = []
        for complaint in qs[: limit * 2]:  # fetch extra to filter/sort
            prediction = cls._predict_complaint_breach(complaint, now)
            if prediction["breach_probability"] <= 30:  # only show > 30% risk
                continue
            due_at = complaint.due_at
            prediction["complaint_id"] = str(complaint.id)
            prediction["title"] = complaint.title
            prediction["severity"] = complaint.severity
            prediction["status"] = complaint.status
            prediction["due_at"] = due_at.isoformat() if due_at else None
            prediction["hours_remaining"] = round((due_at - now).total_seconds() / 3600, 1) if due_at else None
            department = complaint.department
            if department:
                prediction["department"] = department.name_en if hasattr(department, "name_en") else str(department)
            assignee = complaint.assigned_to
            if assignee:
                prediction["assigned_to"] = f"{assignee.first_name} {assignee.last_name}"
            results.append(prediction)

        results.sort(key=lambda entry: entry["breach_probability"], reverse=True)
        results = results[:limit]
        cache.set(cache_key, results, cls.CACHE_TIMEOUT)
        return results

    @staticmethod
    def _time_pressure(hours_remaining, total_sla_hours):
        """Score the time-remaining factor (0-35 points).

        Returns:
            ``(score, factor_text)``; ``factor_text`` is ``None`` when there
            is nothing worth reporting.
        """
        if hours_remaining <= 0:
            return 35, "SLA already expired"
        if hours_remaining < 4:
            return 30, f"Only {hours_remaining:.0f}h remaining"
        if hours_remaining < 12:
            return 20, f"{hours_remaining:.0f}h remaining — urgent window"
        if hours_remaining < 24:
            return 10, f"{hours_remaining:.0f}h remaining"
        time_pct = 1 - (hours_remaining / total_sla_hours) if total_sla_hours > 0 else 0
        # Floor at 0: with the 24h fallback SLA length, remaining time can
        # exceed the total and would otherwise subtract from the probability.
        score = min(max(time_pct * 35, 0), 35)
        factor = f"{time_pct * 100:.0f}% of SLA time consumed" if time_pct > 0.7 else None
        return score, factor

    @classmethod
    def _predict_complaint_breach(cls, complaint, now) -> Dict[str, Any]:
        """Calculate breach probability for a single complaint.

        Args:
            complaint: The complaint to score.
            now: Reference time for "remaining" and "age" calculations.

        Returns:
            Dict with ``breach_probability`` (0-100), ``risk_factors``
            (human-readable strings) and ``recommendation``.
        """
        probability = 0
        factors = []

        # Total SLA length; 24h is assumed when either timestamp is missing.
        if complaint.due_at and complaint.created_at:
            total_sla_hours = (complaint.due_at - complaint.created_at).total_seconds() / 3600
        else:
            total_sla_hours = 24

        # Factor 1: Time remaining (0-35 points)
        if complaint.due_at:
            hours_remaining = (complaint.due_at - now).total_seconds() / 3600
            time_score, time_factor = cls._time_pressure(hours_remaining, total_sla_hours)
            if time_factor:
                factors.append(time_factor)
        else:
            time_score = 15
            factors.append("No SLA deadline set")
        probability += time_score

        # Factor 2: Severity (0-20 points)
        severity_scores = {"critical": 20, "high": 15, "medium": 8, "low": 3}
        sev_score = severity_scores.get(complaint.severity, 10)
        probability += sev_score
        if sev_score >= 15:
            factors.append(f"{complaint.severity.capitalize()} severity — typically slower to resolve")

        # Factor 3: Assignment status (0-15 points)
        if not complaint.assigned_to_id:
            probability += 15
            factors.append("Unassigned — no owner yet")
        else:
            # Check assignee workload
            workload = Complaint.objects.filter(
                assigned_to_id=complaint.assigned_to_id,
                status__in=["open", "in_progress"],
            ).count()
            if workload >= 10:
                probability += 12
                factors.append(f"Assignee has {workload} active cases — overloaded")
            elif workload >= 5:
                probability += 6
                factors.append(f"Assignee has {workload} active cases — moderate load")

        # Factor 4: Historical resolution time for similar complaints (0-20 points)
        similar = Complaint.objects.filter(
            status__in=["resolved", "closed"],
            severity=complaint.severity,
            resolved_at__isnull=False,
            created_at__isnull=False,
        )
        # Keep the comparison inside the complaint's own hospital so that a
        # complaint without a department is not benchmarked against (and does
        # not reveal averages from) other hospitals.
        hospital_id = getattr(complaint, "hospital_id", None)
        if hospital_id:
            similar = similar.filter(hospital_id=hospital_id)
        if complaint.department:
            similar = similar.filter(department=complaint.department)
        # The aggregate is None when no rows match, so no separate exists() query.
        avg_resolve = similar.annotate(rt=F("resolved_at") - F("created_at")).aggregate(avg=Avg("rt"))["avg"]
        if avg_resolve:
            avg_hrs = avg_resolve.total_seconds() / 3600
            if avg_hrs > total_sla_hours * 0.9:
                probability += 18
                factors.append(f"Similar complaints avg {avg_hrs:.0f}h to resolve — exceeds SLA")
            elif avg_hrs > total_sla_hours * 0.7:
                probability += 10
                factors.append(f"Similar complaints avg {avg_hrs:.0f}h — close to SLA limit")

        # Factor 5: Age without progress (0-10 points)
        if complaint.created_at and complaint.status == "open":
            age_hours = (now - complaint.created_at).total_seconds() / 3600
            if age_hours > 24:
                probability += 10
                factors.append(f"Open for {age_hours:.0f}h without status change")
            elif age_hours > 12:
                probability += 5
                factors.append(f"Open for {age_hours:.0f}h")

        probability = min(round(probability, 1), 100)
        return {
            "breach_probability": probability,
            "risk_factors": factors,
            "recommendation": cls._get_recommendation(probability, factors),
        }

    @staticmethod
    def _get_recommendation(probability, factors):
        """Map a breach probability to a recommendation string.

        ``factors`` is accepted for the existing signature but not consulted.
        """
        if probability >= 80:
            return "Immediate escalation required"
        if probability >= 60:
            return "Escalate or reassign to available staff"
        if probability >= 40:
            return "Add priority flag and monitor closely"
        return "On track — continue standard monitoring"
# =============================================================================
# 5. Automated Action Recommendations
# =============================================================================
class ActionRecommendationEngine:
"""
Analyzes clusters of similar complaints and recommends specific
PX Actions based on historical resolution patterns.
"""
CACHE_TIMEOUT = 3600 # 1 hour
@classmethod
def generate_recommendations(cls, user, hospital_id=None, department_id=None, limit=5) -> List[Dict[str, Any]]:
    """Generate AI-powered action recommendations from complaint analysis.

    Looks at up to 200 of the most recently resolved complaints from the
    last 90 days, groups them by category, and asks the AI for one
    recommendation per category with at least 3 complaints. If the AI yields
    nothing, rule-based recommendations are used instead.

    Args:
        user: Requesting user; non-PX-admin users with a hospital are
            restricted to it when ``hospital_id`` is not given.
        hospital_id: Optional explicit hospital filter.
        department_id: Optional department filter.
        limit: Maximum number of categories to produce recommendations for.

    Returns:
        A list of recommendation dicts, or ``cls._no_data_response()`` when
        no category has enough volume (that response is not cached).
    """
    cache_key = f"action_recommendations_{user.id}_{hospital_id}_{department_id}_{limit}"
    cached = cache.get(cache_key)
    # ``is not None`` so a cached empty list does not trigger a recompute.
    if cached is not None:
        return cached

    # Gather recent resolved complaints with resolution data
    start = timezone.now() - timedelta(days=90)
    qs = Complaint.objects.filter(
        status__in=["resolved", "closed"],
        created_at__gte=start,
        resolution__isnull=False,
    )
    if hospital_id:
        qs = qs.filter(hospital_id=hospital_id)
    elif hasattr(user, "hospital") and user.hospital and not getattr(user, "is_px_admin", lambda: False)():
        qs = qs.filter(hospital=user.hospital)
    if department_id:
        qs = qs.filter(department_id=department_id)
    # ``category`` is read for every row below; joining it here avoids one
    # extra query per complaint.
    qs = qs.select_related("department", "category").order_by("-resolved_at")

    # Group by category/domain
    category_data = {}
    for complaint in qs[:200]:
        cat = getattr(complaint, "category", None) or "uncategorized"
        bucket = category_data.setdefault(
            cat,
            {
                "count": 0,
                "resolutions": [],
                "severities": [],
                "departments": set(),
            },
        )
        bucket["count"] += 1
        resolution = getattr(complaint, "resolution", None)
        if resolution:
            bucket["resolutions"].append(resolution[:200])  # cap prompt size
        if hasattr(complaint, "severity"):
            bucket["severities"].append(complaint.severity)
        department = complaint.department
        if department:
            dept_name = department.name_en if hasattr(department, "name_en") else str(department)
            bucket["departments"].add(dept_name)

    # Filter to categories with enough volume (at least 3 complaints)
    significant_categories = {cat: info for cat, info in category_data.items() if info["count"] >= 3}
    if not significant_categories:
        return cls._no_data_response()

    # Use AI to generate recommendations for top categories
    top_categories = sorted(significant_categories.items(), key=lambda item: item[1]["count"], reverse=True)[:limit]
    recommendations = []
    for category, data in top_categories:
        rec = cls._generate_ai_recommendation(category, data)
        if rec:
            recommendations.append(rec)

    # If AI is unavailable, fall back to rule-based recommendations
    if not recommendations:
        recommendations = cls._generate_rule_based_recommendations(top_categories)

    cache.set(cache_key, recommendations, cls.CACHE_TIMEOUT)
    return recommendations
@classmethod
def _generate_ai_recommendation(cls, category, data) -> Optional[Dict[str, Any]]:
    """Ask the AI client for a recommendation for one complaint cluster.

    Args:
        category: The cluster's category; interpolated into the prompt.
        data: Cluster details with keys ``count``, ``resolutions``,
            ``severities`` and ``departments``.

    Returns:
        The parsed JSON object with ``"source": "ai_generated"`` added, or
        ``None`` if the client is unconfigured, returns nothing, or returns
        anything other than a JSON object.
    """
    # Function-scope import keeps this block self-contained.
    from collections import Counter

    if not _client.is_configured():
        return None

    system_prompt = (
        "You are a healthcare quality improvement expert. Given a cluster of similar complaints "
        "and their resolutions, recommend specific, actionable improvement steps. "
        "Respond with ONLY valid JSON, no markdown."
    )
    resolutions_sample = (
        "\n".join(data["resolutions"][:5]) if data["resolutions"] else "No resolution data available."
    )
    # Counter is linear and breaks ties by first occurrence, so the prompt
    # does not depend on set iteration order.
    severities = data["severities"]
    most_common_sev = Counter(severities).most_common(1)[0][0] if severities else "medium"
    # Sorted so the same cluster always yields the same department sample.
    departments = sorted(data["departments"])[:3]
    depts_str = ", ".join(departments) if departments else "Multiple departments"

    user_prompt = f"""Category: {category}
Complaint count: {data["count"]}
Most common severity: {most_common_sev}
Affected departments: {depts_str}
Sample resolutions:
{resolutions_sample}
Based on this pattern, recommend 2-3 specific, actionable improvement actions. Return ONLY this JSON:
{{
"category": "{category}",
"problem_summary_en": "Brief description of the systemic issue",
"problem_summary_ar": "وصف موجز للمشكلة المنهجية",
"complaint_count": {data["count"]},
"affected_departments": ["dept1", "dept2"],
"recommended_actions_en": ["Action 1", "Action 2", "Action 3"],
"recommended_actions_ar": ["إجراء 1", "إجراء 2", "إجراء 3"],
"expected_impact_en": "Expected improvement if actions are implemented",
"priority": "low|medium|high|critical",
"action_category": "clinical_quality|patient_safety|service_quality|staff_behavior|facility|process_improvement|other"
}}"""
    raw = _client.chat(system_prompt, user_prompt, temperature=0.2, max_tokens=1024)
    if not raw:
        return None
    try:
        result = json.loads(raw)
    except json.JSONDecodeError:
        return None
    # Valid JSON that is not an object (e.g. a list) cannot take the
    # "source" key; treat it like an unusable reply.
    if not isinstance(result, dict):
        return None
    result["source"] = "ai_generated"
    return result
@classmethod
def _generate_rule_based_recommendations(cls, top_categories) -> List[Dict[str, Any]]:
    """Fallback: generate recommendations without AI.

    Each category is matched against a fixed set of templates by
    case-insensitive substring (the first template key contained in the
    category text wins). Categories that match nothing get a generic template.

    Args:
        top_categories: Iterable of ``(category, data)`` pairs. ``data`` must
            provide ``count`` and ``departments`` (a collection of names).

    Returns:
        One recommendation dict per input pair, in input order, each tagged
        with ``"source": "rule_based"``.
    """
    recommendations = []
    category_templates = {
        "wait_time": {
            "problem_en": "Recurring complaints about excessive wait times",
            "problem_ar": "شكاوى متكررة حول أوقات الانتظار الطويلة",
            "actions_en": [
                "Review scheduling system",
                "Add real-time wait tracking",
                "Implement patient communication updates",
            ],
            "actions_ar": ["مراجعة نظام الجدولة", "إضافة تتبع وقت الانتظار", "تطبيق تحديثات التواصل مع المرضى"],
            "impact_en": "Reduce average wait time by 30% and improve patient satisfaction",
            "category": "process_improvement",
            "priority": "high",
        },
        "staff_behavior": {
            "problem_en": "Pattern of complaints about staff communication and behavior",
            "problem_ar": "نمط من الشكاوى حول سلوك وتواصل الموظفين",
            "actions_en": [
                "Conduct patient communication training",
                "Implement feedback loop for staff performance",
                "Establish patient relations team",
            ],
            "actions_ar": [
                "إجراء تدريب على التواصل مع المرضى",
                "تطبيق حلقة التغذية الراجعة لأداء الموظفين",
                "إنشاء فريق علاقات المرضى",
            ],
            "impact_en": "Improve staff-patient relationships and reduce behavior-related complaints",
            "category": "staff_behavior",
            "priority": "high",
        },
        "clinical": {
            "problem_en": "Clinical quality and care delivery concerns",
            "problem_ar": "مخاوف حول الجودة السريرية وتقديم الرعاية",
            "actions_en": ["Conduct clinical audit", "Review care protocols", "Implement peer review process"],
            "actions_ar": ["إجراء مراجعة سريرية", "مراجعة بروتوكولات الرعاية", "تطبيق عملية مراجعة الأقران"],
            "impact_en": "Improve clinical outcomes and patient safety metrics",
            "category": "clinical_quality",
            "priority": "critical",
        },
        "facility": {
            "problem_en": "Facility condition and environmental complaints",
            "problem_ar": "شكاوى حول حالة المنشأة والبيئة",
            "actions_en": ["Conduct facility inspection", "Create maintenance schedule", "Upgrade affected areas"],
            "actions_ar": ["إجراء فحص للمنشأة", "إنشاء جدول صيانة", "ترقية المناطق المتأثرة"],
            "impact_en": "Improve patient environment and facility standards compliance",
            "category": "facility",
            "priority": "medium",
        },
    }
    for category, data in top_categories:
        # str() so a non-string category (e.g. a model instance or enum member)
        # does not raise AttributeError on .lower().
        lowered = str(category).lower()
        template = next(
            (tmpl for key, tmpl in category_templates.items() if key in lowered),
            None,
        )
        if template is None:
            template = {
                "problem_en": f"Recurring complaints in {category}",
                "problem_ar": f"شكاوى متكررة في {category}",
                "actions_en": [
                    "Investigate complaint pattern",
                    "Develop corrective action plan",
                    "Monitor improvement metrics",
                ],
                "actions_ar": ["التحقيق في نمط الشكاوى", "وضع خطة عمل تصحيحية", "مراقبة مقاييس التحسين"],
                "impact_en": f"Reduce {category}-related complaints",
                "category": "other",
                "priority": "medium",
            }
        # Departments arrive as an unordered collection; sort before truncating so
        # both the order and the choice of the five names are reproducible.
        departments = sorted(data["departments"], key=str)[:5]
        recommendations.append(
            {
                "category": category,
                "problem_summary_en": template["problem_en"],
                "problem_summary_ar": template["problem_ar"],
                "complaint_count": data["count"],
                "affected_departments": departments,
                # Copy the action lists so results that share a template do not
                # share (and cannot cross-mutate) the same list objects.
                "recommended_actions_en": list(template["actions_en"]),
                "recommended_actions_ar": list(template["actions_ar"]),
                "expected_impact_en": template["impact_en"],
                "priority": template["priority"],
                "action_category": template["category"],
                "source": "rule_based",
            }
        )
    return recommendations
@staticmethod
def _no_data_response() -> List[Dict[str, Any]]:
    """Return the single placeholder recommendation used when no pattern can be derived.

    A new list and dict are built on every call, so callers may mutate the result.
    """
    actions_en = [
        "Ensure all resolved complaints have resolution notes",
        "Review complaint categorization",
        "Build historical resolution patterns",
    ]
    actions_ar = [
        "ضمان وجود ملاحظات حل لجميع الشكاوى المحلولة",
        "مراجعة تصنيف الشكاوى",
        "بناء أنماط الحل التاريخية",
    ]
    placeholder: Dict[str, Any] = {}
    placeholder["category"] = "general"
    placeholder["problem_summary_en"] = "Insufficient resolved complaint data to identify patterns"
    placeholder["problem_summary_ar"] = "بيانات غير كافية لتحديد الأنماط"
    placeholder["complaint_count"] = 0
    placeholder["affected_departments"] = []
    placeholder["recommended_actions_en"] = actions_en
    placeholder["recommended_actions_ar"] = actions_ar
    placeholder["expected_impact_en"] = "Enable data-driven action recommendations"
    placeholder["priority"] = "medium"
    placeholder["action_category"] = "process_improvement"
    placeholder["source"] = "system"
    return [placeholder]