"""
Executive Summary Views - AI-powered executive dashboard and analytics
This module provides views for:
- Executive dashboard (tabbed: Overview, Trends, Insights, Reports)
- Predictive insights management with HTMX acknowledge
- Executive report generation and PDF export
"""
import json
import logging
from datetime import timedelta, datetime
from django.contrib.auth.decorators import login_required
from django.contrib.auth.mixins import LoginRequiredMixin
from django.db.models import Avg, Count, Q
from django.http import JsonResponse, HttpResponse
from django.shortcuts import redirect, get_object_or_404
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from django.views import View
from django.views.generic import TemplateView
from django.template.loader import render_to_string
from django.contrib import messages
from .models import ExecutiveReport, PredictiveInsight, AIRecommendation, ExecutiveMetric
from .services import (
ExecutiveSummaryService,
AINarrativeService,
PredictiveAnalyticsService,
RecommendationService,
)
logger = logging.getLogger(__name__)
class ExecutiveAccessMixin:
    """Gate a class-based view so only executive or px_admin users get through.

    Anonymous users are redirected to the login page. Authenticated users
    holding neither role get an error message and are sent to the home page.
    """

    def dispatch(self, request, *args, **kwargs):
        """Run the access checks, then defer to the next ``dispatch`` in the MRO."""
        current_user = request.user
        if current_user.is_authenticated:
            if current_user.is_executive() or current_user.is_px_admin():
                return super().dispatch(request, *args, **kwargs)
            messages.error(request, _("You do not have permission to access the executive dashboard."))
            return redirect("core:home")
        return redirect("accounts:login")
# =============================================================================
# Executive Dashboard
# =============================================================================
class ExecutiveDashboardView(ExecutiveAccessMixin, LoginRequiredMixin, TemplateView):
    """
    Executive Dashboard - Tabbed AI-powered overview for top management.

    Tabs: Overview | Trends | Insights | Reports

    The template context is assembled from small ``_get_*`` helpers; each one
    returns a dict that ``get_context_data`` merges into the context.
    """

    template_name = "executive/dashboard.html"

    # Bounds (in days) applied to the ``date_range`` query parameter.
    DEFAULT_DATE_RANGE = 30
    MAX_DATE_RANGE = 365

    def get_context_data(self, **kwargs):
        """Build the dashboard context for the filters found in the query string."""
        context = super().get_context_data(**kwargs)
        user = self.request.user
        date_range, hospital = self._parse_filters(user)

        dashboard_data = ExecutiveSummaryService().get_dashboard_data(
            hospital=hospital,
            date_range=date_range,
        )
        kpis = dashboard_data.get("kpis", {})
        variances = dashboard_data.get("variances", {})
        trends = dashboard_data.get("trends", {})

        context.update(self._get_kpi_cards(kpis, variances, hospital))
        context.update(self._get_chart_data(trends))
        context.update(self._get_risk_alerts())
        context.update(self._get_insights_and_recommendations())
        context.update(self._get_reports_context())
        context.update(self._get_filters(user, hospital, date_range))
        context.update(self._get_latest_report())

        context["last_updated"] = timezone.now().strftime("%Y-%m-%d %H:%M")
        context["is_px_admin"] = user.is_px_admin()
        # Only the first seven leaderboard entries are shown on the dashboard.
        context["hospital_leaderboard"] = dashboard_data.get("hospital_leaderboard", [])[:7]
        return context

    # ------------------------------------------------------------------
    def _parse_filters(self, user):
        """Return ``(date_range, hospital)`` parsed from the GET parameters.

        ``date_range`` falls back to ``DEFAULT_DATE_RANGE`` when it is missing,
        non-numeric or below 1, and is capped at ``MAX_DATE_RANGE`` so a bad
        query string cannot turn into a server error.

        ``hospital`` is the hospital named by the ``hospital`` parameter, else
        the hospital of a hospital-admin user, else ``None``.

        Raises:
            Http404: if a ``hospital`` id is supplied but is unknown or malformed.
        """
        try:
            date_range = int(self.request.GET.get("date_range", self.DEFAULT_DATE_RANGE))
        except (TypeError, ValueError):
            date_range = self.DEFAULT_DATE_RANGE
        if date_range < 1:
            date_range = self.DEFAULT_DATE_RANGE
        date_range = min(date_range, self.MAX_DATE_RANGE)

        hospital = None
        hospital_id = self.request.GET.get("hospital")
        if hospital_id:
            from django.core.exceptions import ValidationError
            from django.http import Http404

            from apps.organizations.models import Hospital

            try:
                hospital = get_object_or_404(Hospital, id=hospital_id)
            except (ValueError, ValidationError):
                # A value that cannot be coerced to the primary-key type is
                # treated like an unknown id instead of surfacing as a 500.
                raise Http404("Hospital not found") from None
        elif user.is_hospital_admin() and user.hospital:
            hospital = user.hospital
        return date_range, hospital

    # ------------------------------------------------------------------
    def _get_kpi_cards(self, kpis, variances, hospital):
        """Return the ``kpi_cards`` context for complaints, surveys and actions.

        Each card carries its totals, a variance dict tagged with an
        ``is_positive`` flag, and a JSON-encoded 7-day sparkline.
        """

        def tagged_variance(key, is_positive):
            # Copy before tagging so the dict owned by the service result is
            # not mutated in place.
            variance = dict(variances.get(key) or {})
            variance["is_positive"] = is_positive
            return variance

        # NOTE(review): ``is_positive`` presumably tells the template whether an
        # increase of the metric is good news - confirm against the template.
        complaints_variance = tagged_variance("complaints_total", False)
        surveys_variance = tagged_variance("surveys_satisfaction", True)
        actions_variance = tagged_variance("actions_total", False)

        sparklines = self._get_sparkline_data(hospital, days=7)
        # Sparkline values are JSON strings, so the fallback is a JSON string too.
        empty_sparkline = "[]"

        return {
            "kpi_cards": {
                "complaints": {
                    "total": int(kpis.get("complaints_total", 0)),
                    "critical": int(kpis.get("complaints_critical", 0)),
                    "overdue": int(kpis.get("complaints_overdue", 0)),
                    "variance": complaints_variance,
                    "resolution_time": kpis.get("complaints_resolution_time", 0),
                    "sparkline": sparklines.get("complaints_total", empty_sparkline),
                },
                "surveys": {
                    "total": int(kpis.get("surveys_total", 0)),
                    "satisfaction": float(kpis.get("surveys_satisfaction", 0)),
                    "nps": float(kpis.get("surveys_nps", 0)),
                    "variance": surveys_variance,
                    "sparkline": sparklines.get("surveys_satisfaction", empty_sparkline),
                },
                "actions": {
                    "total": int(kpis.get("actions_total", 0)),
                    "open": int(kpis.get("actions_open", 0)),
                    "overdue": int(kpis.get("actions_overdue", 0)),
                    "closed": int(kpis.get("actions_closed", 0)),
                    "variance": actions_variance,
                    "sparkline": sparklines.get("actions_total", empty_sparkline),
                },
            },
        }

    # ------------------------------------------------------------------
    def _get_sparkline_data(self, hospital, days=7):
        """Return ``{metric: json_string}`` of trend values rounded to one decimal."""
        svc = ExecutiveSummaryService()
        metric_types = ("complaints_total", "surveys_satisfaction", "actions_total")
        return {
            metric: json.dumps(
                [round(point["value"], 1) for point in svc.get_trend_data(metric, days=days, hospital=hospital)]
            )
            for metric in metric_types
        }

    # ------------------------------------------------------------------
    def _get_chart_data(self, trends):
        """Return the JSON-encoded trend series consumed by the charts."""
        return {
            "chart_data": {
                "complaints_trend": json.dumps(trends.get("complaints_total", [])),
                "surveys_satisfaction_trend": json.dumps(trends.get("surveys_satisfaction", [])),
                "actions_trend": json.dumps(trends.get("actions_total", [])),
            },
        }

    # ------------------------------------------------------------------
    def _get_risk_alerts(self):
        """Return up to ten open high/critical insights plus their count."""
        # NOTE(review): "-severity" orders by the stored value; if severity is
        # stored as text this lists "high" before "critical" - confirm intent.
        alerts = (
            PredictiveInsight.objects.filter(
                severity__in=["high", "critical"],
                status__in=["new", "acknowledged"],
            )
            .select_related("hospital", "department")
            .order_by("-severity", "-created_at")[:10]
        )
        # len() evaluates the sliced queryset once and fills its result cache,
        # so the template iteration reuses those rows instead of triggering
        # separate exists()/count() queries.
        alert_count = len(alerts)
        return {
            "risk_alerts": alerts,
            "has_risk_alerts": alert_count > 0,
            "risk_alerts_count": alert_count,
        }

    # ------------------------------------------------------------------
    def _get_insights_and_recommendations(self):
        """Return the top five pending recommendations and counts of new insights."""
        recs = (
            AIRecommendation.objects.filter(
                status__in=["new", "under_review"],
            )
            .select_related("hospital", "department")
            .order_by("-priority", "-created_at")[:5]
        )
        # Both severity counts come from a single conditional-aggregate query.
        new_counts = PredictiveInsight.objects.filter(status="new").aggregate(
            critical=Count("pk", filter=Q(severity="critical")),
            high=Count("pk", filter=Q(severity="high")),
        )
        return {
            "ai_recommendations": recs,
            "insights_critical_count": new_counts["critical"],
            "insights_high_count": new_counts["high"],
        }

    # ------------------------------------------------------------------
    def _get_reports_context(self):
        """Return the twenty most recently created reports."""
        return {"recent_reports": ExecutiveReport.objects.order_by("-created_at")[:20]}

    # ------------------------------------------------------------------
    def _get_filters(self, user, hospital, date_range):
        """Return the filter-bar context: hospital choices and date-range options."""
        from apps.organizations.models import Hospital

        # A hospital admin who is not also a px_admin only sees their own
        # hospital; everyone else gets all active hospitals.
        if not user.is_px_admin() and user.is_hospital_admin() and user.hospital:
            hospitals = [user.hospital]
        else:
            hospitals = Hospital.objects.filter(status="active")

        return {
            "available_hospitals": hospitals,
            "selected_hospital": hospital,
            "date_range": date_range,
            "date_range_options": [
                (7, _("Last 7 days")),
                (14, _("Last 14 days")),
                (30, _("Last 30 days")),
                (60, _("Last 60 days")),
                (90, _("Last 90 days")),
            ],
        }

    # ------------------------------------------------------------------
    def _get_latest_report(self):
        """Return the newest completed report and its English AI summary fields.

        When no completed report exists the summary keys are still present,
        holding ``None`` or empty lists.
        """
        rpt = ExecutiveReport.objects.filter(status="completed").order_by("-created_at").first()
        ctx = {
            "latest_report": rpt,
            "ai_narrative": None,
            "ai_highlights": [],
            "ai_concerns": [],
            "report_period": None,
            "report_type": None,
        }
        if rpt:
            ctx.update(
                {
                    "ai_narrative": rpt.narrative_en,
                    "ai_highlights": rpt.highlights_en,
                    "ai_concerns": rpt.concerns_en,
                    "report_period": f"{rpt.start_date} to {rpt.end_date}",
                    "report_type": rpt.get_report_type_display(),
                }
            )
        return ctx
# =============================================================================
# Dashboard API
# =============================================================================
@login_required
def executive_dashboard_api(request):
    """API endpoint for executive dashboard HTMX updates.

    Query parameters:
        date_range: number of days to cover. Missing, non-numeric or
            non-positive values fall back to 30; values above 365 are capped.
        hospital: optional hospital id. An unknown or malformed id is ignored.

    Returns:
        JsonResponse with KPIs, variances, up to five risk alerts, the top
        five leaderboard entries, the latest completed report's English
        narrative and a timestamp - or a 403 JSON error for users who are
        neither executive nor px_admin.
    """
    if not (request.user.is_executive() or request.user.is_px_admin()):
        return JsonResponse({"error": _("Permission denied")}, status=403)

    default_days, max_days = 30, 365
    try:
        date_range = int(request.GET.get("date_range", default_days))
    except (TypeError, ValueError):
        date_range = default_days
    if date_range < 1:
        date_range = default_days
    date_range = min(date_range, max_days)

    hospital = None
    hospital_id = request.GET.get("hospital")
    if hospital_id:
        from django.core.exceptions import ValidationError

        from apps.organizations.models import Hospital

        try:
            hospital = Hospital.objects.get(id=hospital_id)
        except (Hospital.DoesNotExist, ValueError, ValidationError):
            # Unknown ids were already ignored; malformed ids now are too,
            # instead of surfacing as a server error.
            hospital = None

    dashboard_data = ExecutiveSummaryService().get_dashboard_data(hospital=hospital, date_range=date_range)

    # select_related avoids one extra query per alert for ``i.hospital.name``.
    risk_alerts = (
        PredictiveInsight.objects.filter(
            severity__in=["high", "critical"],
            status__in=["new", "acknowledged"],
        )
        .select_related("hospital")
        .order_by("-severity", "-created_at")[:5]
    )
    latest_report = ExecutiveReport.objects.filter(status="completed").order_by("-created_at").first()

    return JsonResponse(
        {
            "kpis": dashboard_data.get("kpis", {}),
            "variances": dashboard_data.get("variances", {}),
            "risk_alerts": [
                {
                    "id": str(i.id),
                    "title": i.title_en,
                    "severity": i.severity,
                    "insight_type": i.insight_type,
                    "hospital": i.hospital.name if i.hospital else None,
                    "created_at": i.created_at.isoformat(),
                }
                for i in risk_alerts
            ],
            "hospital_leaderboard": dashboard_data.get("hospital_leaderboard", [])[:5],
            "ai_narrative": latest_report.narrative_en if latest_report else None,
            "last_updated": timezone.now().isoformat(),
        }
    )
# =============================================================================
# Predictive Insights
# =============================================================================
class PredictiveInsightsView(ExecutiveAccessMixin, LoginRequiredMixin, TemplateView):
    """Predictive Insights - View and manage AI-detected patterns and risks."""

    template_name = "executive/insights.html"

    def get_context_data(self, **kwargs):
        """Build the filtered, paginated insight list and its summary counts.

        Filters come from the ``severity``, ``status``, ``insight_type`` and
        ``hospital`` GET parameters. Without a ``hospital`` parameter, a
        hospital admin is restricted to their own hospital.
        """
        from django.core.exceptions import ValidationError
        from django.core.paginator import Paginator

        from apps.organizations.models import Hospital

        context = super().get_context_data(**kwargs)
        user = self.request.user
        params = self.request.GET

        severity_filter = params.get("severity", "")
        status_filter = params.get("status", "")
        insight_type_filter = params.get("insight_type", "")
        hospital_id = params.get("hospital")

        qs = PredictiveInsight.objects.select_related("hospital", "department", "acknowledged_by").order_by(
            "-severity", "-created_at"
        )
        if severity_filter:
            qs = qs.filter(severity=severity_filter)
        if status_filter:
            qs = qs.filter(status=status_filter)
        if insight_type_filter:
            qs = qs.filter(insight_type=insight_type_filter)

        if hospital_id:
            try:
                qs = qs.filter(hospital=Hospital.objects.get(id=hospital_id))
            except (Hospital.DoesNotExist, ValueError, ValidationError):
                # Unknown ids were already ignored (no hospital filter is
                # applied); malformed ids are now ignored the same way
                # instead of raising a server error.
                pass
        elif user.is_hospital_admin() and user.hospital:
            qs = qs.filter(hospital=user.hospital)

        # One conditional-aggregate query replaces four separate COUNTs.
        # Ordering is irrelevant for the aggregate, so it is cleared.
        totals = qs.order_by().aggregate(
            total=Count("pk"),
            new=Count("pk", filter=Q(status="new")),
            critical=Count("pk", filter=Q(severity="critical")),
            high=Count("pk", filter=Q(severity="high")),
        )
        context["total_insights"] = totals["total"]
        context["new_insights"] = totals["new"]
        context["critical_insights"] = totals["critical"]
        context["high_insights"] = totals["high"]

        paginator = Paginator(qs, 25)
        context["insights"] = paginator.get_page(params.get("page"))

        context["severity_choices"] = PredictiveInsight.SEVERITY_LEVELS
        context["status_choices"] = PredictiveInsight.STATUS_CHOICES
        context["insight_type_choices"] = PredictiveInsight.INSIGHT_TYPES

        # Only px_admins get a hospital picker, limited to hospitals that
        # have at least one insight.
        if user.is_px_admin():
            context["available_hospitals"] = Hospital.objects.filter(predictive_insights__isnull=False).distinct()
        else:
            context["available_hospitals"] = []

        context["current_filters"] = {
            "severity": severity_filter,
            "status": status_filter,
            "insight_type": insight_type_filter,
            "hospital": hospital_id,
        }
        return context
@login_required
def acknowledge_insight(request, insight_id):
    """Acknowledge a predictive insight via HTMX or JSON.

    Marks the insight as ``acknowledged`` and records the acting user and
    the time. Requests carrying an ``HX-Request`` header receive a text
    fragment; all other requests receive JSON.

    Returns:
        403 JSON for users who are neither executive nor px_admin, 404 when
        the insight does not exist, 405 JSON for non-POST requests, 500 when
        saving fails, otherwise a success response.
    """
    if not (request.user.is_executive() or request.user.is_px_admin()):
        return JsonResponse({"error": _("Permission denied")}, status=403)

    insight = get_object_or_404(PredictiveInsight, id=insight_id)
    if request.method != "POST":
        return JsonResponse({"error": _("POST required")}, status=405)

    is_htmx = bool(request.headers.get("HX-Request"))

    try:
        insight.status = "acknowledged"
        insight.acknowledged_by = request.user
        insight.acknowledged_at = timezone.now()
        insight.save(update_fields=["status", "acknowledged_by", "acknowledged_at"])
    except Exception as e:
        logger.error("Error acknowledging insight %s: %s", insight_id, e, exc_info=True)
        if is_htmx:
            return HttpResponse(str(_("Error acknowledging insight")), status=500)
        return JsonResponse({"error": _("Error acknowledging insight"), "details": str(e)}, status=500)

    if is_htmx:
        # NOTE(review): the string literals that wrapped this text were empty
        # or unterminated in the source (a syntax error), which suggests the
        # HTML markup around the label was lost. Only the translated text is
        # returned here - restore the intended wrapper markup if one existed.
        return HttpResponse(str(_("Acknowledged")))

    return JsonResponse(
        {
            "status": "success",
            "message": _("Insight acknowledged successfully"),
            "insight_id": str(insight.id),
        }
    )
# =============================================================================
# Per-Tab AI Analysis (HTMX)
# =============================================================================
def _get_analysis_filters(request):
    """Extract date_range and hospital from request (POST or GET params).

    POST values take precedence over GET values.

    Returns:
        ``(date_range, hospital)``. ``date_range`` is a number of days: it
        falls back to 30 when missing, non-numeric or below 1, and is capped
        at 365. ``hospital`` is the hospital named by the ``hospital``
        parameter (``None`` if that id is unknown or malformed); without the
        parameter it is the hospital of a hospital-admin user, else ``None``.
    """
    default_days, max_days = 30, 365
    raw_range = request.POST.get("date_range") or request.GET.get("date_range", default_days)
    try:
        date_range = int(raw_range)
    except (TypeError, ValueError):
        # User-supplied text must not be able to crash the view.
        date_range = default_days
    if date_range < 1:
        date_range = default_days
    date_range = min(date_range, max_days)

    hospital = None
    hospital_id = request.POST.get("hospital") or request.GET.get("hospital")
    if hospital_id:
        from django.core.exceptions import ValidationError

        from apps.organizations.models import Hospital

        try:
            hospital = Hospital.objects.get(id=hospital_id)
        except (Hospital.DoesNotExist, ValueError, ValidationError):
            # Unknown ids were already ignored; malformed ids now are too.
            hospital = None
    elif request.user.is_hospital_admin() and request.user.hospital:
        hospital = request.user.hospital
    return date_range, hospital
@login_required
def analyze_overview(request):
    """On-demand AI analysis for the Overview tab (HTMX POST).

    Resolves the date window and hospital from the request, asks
    ``AINarrativeService`` for an overview analysis of that window and
    returns the rendered ``ai_overview_card`` partial.

    Returns:
        HttpResponse with the rendered partial, or a 403 JSON error for
        users who are neither executive nor px_admin.
    """
    if not (request.user.is_executive() or request.user.is_px_admin()):
        return JsonResponse({"error": _("Permission denied")}, status=403)

    date_range, hospital = _get_analysis_filters(request)
    now = timezone.now()
    start_date = now - timedelta(days=date_range)

    # The dashboard data and trends previously fetched here were never passed
    # on to the analysis, so that unused service call has been removed.
    analysis = AINarrativeService().generate_overview_analysis(
        start_date=start_date.date(),
        end_date=now.date(),
        hospital=hospital,
    )
    html = render_to_string(
        "executive/partials/ai_overview_card.html",
        {"analysis": analysis},
        request=request,
    )
    return HttpResponse(html)
@login_required
def analyze_trends(request):
    """On-demand AI analysis for the Trends tab (HTMX POST).

    Feeds the dashboard's trend series and hospital leaderboard for the
    requested window into ``AINarrativeService`` and returns the rendered
    ``ai_trends_card`` partial. Users who are neither executive nor px_admin
    get a 403 JSON error.
    """
    requester = request.user
    if not requester.is_executive() and not requester.is_px_admin():
        return JsonResponse({"error": _("Permission denied")}, status=403)

    date_range, hospital = _get_analysis_filters(request)
    window_end = timezone.now()
    window_start = window_end - timedelta(days=date_range)

    data = ExecutiveSummaryService().get_dashboard_data(hospital=hospital, date_range=date_range)

    analysis = AINarrativeService().generate_trends_analysis(
        start_date=window_start.date(),
        end_date=window_end.date(),
        hospital=hospital,
        trends_data=data.get("trends", {}),
        leaderboard=data.get("hospital_leaderboard", []),
    )

    return HttpResponse(
        render_to_string(
            "executive/partials/ai_trends_card.html",
            {"analysis": analysis},
            request=request,
        )
    )
@login_required
def analyze_insights(request):
    """On-demand AI analysis for the Insights tab (HTMX POST).

    Collects up to ten open high/critical insights and up to five pending
    recommendations, passes them to ``AINarrativeService`` and returns the
    rendered ``ai_insights_card`` partial. Users who are neither executive
    nor px_admin get a 403 JSON error.
    """
    requester = request.user
    if not requester.is_executive() and not requester.is_px_admin():
        return JsonResponse({"error": _("Permission denied")}, status=403)

    # Only the hospital is used below; the date range is parsed but unused.
    _unused_range, hospital = _get_analysis_filters(request)

    open_alerts_qs = PredictiveInsight.objects.filter(
        severity__in=["high", "critical"],
        status__in=["new", "acknowledged"],
    ).select_related("hospital", "department")
    risk_alerts = list(open_alerts_qs.order_by("-severity", "-created_at")[:10])

    pending_recs_qs = AIRecommendation.objects.filter(status__in=["new", "under_review"]).select_related(
        "hospital", "department"
    )
    ai_recs = list(pending_recs_qs.order_by("-priority", "-created_at")[:5])

    analysis = AINarrativeService().generate_insights_analysis(
        hospital=hospital,
        risk_alerts=risk_alerts,
        ai_recommendations=ai_recs,
    )

    return HttpResponse(
        render_to_string(
            "executive/partials/ai_insights_card.html",
            {"analysis": analysis},
            request=request,
        )
    )
# =============================================================================
# Report Generation
# =============================================================================
@login_required
def generate_executive_report(request):
    """Trigger AI report generation for executive summary.

    POST parameters:
        report_type: ``weekly`` (default), ``monthly`` or ``quarterly``.
        start_date / end_date: optional ``YYYY-MM-DD`` pair. When either is
            missing the window ends today and spans 7, 30 or 90 days
            depending on the report type.

    The view creates an ``ExecutiveReport`` row, fills it with the English
    and Arabic narratives and a metrics snapshot, then makes a best-effort
    attempt to attach a PDF and to refresh predictive insights and
    recommendations. Every outcome redirects back to the dashboard with a
    flash message.
    """
    if not (request.user.is_executive() or request.user.is_px_admin()):
        messages.error(request, _("You do not have permission to generate reports."))
        return redirect("executive:executive_dashboard")
    if request.method != "POST":
        return redirect("executive:executive_dashboard")

    report_type = request.POST.get("report_type", "weekly")
    start_date_str = request.POST.get("start_date")
    end_date_str = request.POST.get("end_date")

    if start_date_str and end_date_str:
        try:
            start_date = datetime.strptime(start_date_str, "%Y-%m-%d").date()
            end_date = datetime.strptime(end_date_str, "%Y-%m-%d").date()
        except ValueError:
            messages.error(request, _("Invalid date format. Use YYYY-MM-DD."))
            return redirect("executive:executive_dashboard")
        if start_date > end_date:
            # An inverted window would otherwise produce a meaningless report.
            messages.error(request, _("Start date must be on or before end date."))
            return redirect("executive:executive_dashboard")
    else:
        end_date = timezone.now().date()
        deltas = {"weekly": 7, "monthly": 30, "quarterly": 90}
        start_date = end_date - timedelta(days=deltas.get(report_type, 7))

    # Bound up front so the error handler can tell whether the row was
    # created, without inspecting locals().
    report = None
    try:
        report = ExecutiveReport.objects.create(
            report_type=report_type,
            status="generating",
            start_date=start_date,
            end_date=end_date,
            ai_model="openrouter/google/gemma-3-27b-it:free",
        )

        narrative_service = AINarrativeService()
        # Report types without their own generator fall back to the weekly one.
        method_map = {
            "monthly": ("generate_monthly_narrative", "monthly"),
            "quarterly": ("generate_quarterly_narrative", "quarterly"),
        }
        method_name, ar_type = method_map.get(report_type, ("generate_weekly_narrative", "weekly"))
        en_result = getattr(narrative_service, method_name)(start_date=start_date, end_date=end_date)
        ar_result = narrative_service.generate_arabic_narrative(
            start_date=start_date,
            end_date=end_date,
            report_type=ar_type,
        )

        report.status = en_result.get("status", "completed")
        report.narrative_en = en_result.get("narrative_en", "")
        report.narrative_ar = ar_result.get("narrative_ar", "")
        report.highlights_en = en_result.get("highlights_en", [])
        report.highlights_ar = ar_result.get("highlights_ar", [])
        report.concerns_en = en_result.get("concerns_en", [])
        report.concerns_ar = ar_result.get("concerns_ar", [])
        report.ai_model = en_result.get("ai_model", "")
        report.error_message = en_result.get("error_message", "") or ar_result.get("error_message", "")

        metrics = ExecutiveSummaryService().aggregate_daily_metrics(target_date=end_date)
        report.metrics_snapshot = {k: float(v) for k, v in metrics.items()}
        report.save()

        # Best effort: a PDF failure must not fail the report itself.
        try:
            from django.core.files.base import ContentFile

            from .pdf_service import generate_executive_pdf

            pdf_bytes = generate_executive_pdf(report, user=request.user)
            report.pdf_file.save(
                f"executive_report_{report.id}_{report.start_date}.pdf",
                ContentFile(pdf_bytes),
                save=True,
            )
        except Exception as pdf_err:
            logger.error("Error saving PDF for report %s: %s", report.id, pdf_err, exc_info=True)

        # Best effort: refresh insights and the recommendations derived from them.
        try:
            PredictiveAnalyticsService().generate_predictive_insights()
            RecommendationService().generate_recommendations_from_insights()
        except Exception as e:
            logger.error("Error generating insights/recommendations: %s", e, exc_info=True)

        messages.success(
            request,
            _("%(report_type)s report generated successfully.") % {"report_type": report.get_report_type_display()},
        )
        return redirect("executive:executive_dashboard")
    except Exception as e:
        logger.error("Error generating executive report: %s", e, exc_info=True)
        messages.error(request, _("Failed to generate report. Please try again."))
        if report is not None:
            report.status = "failed"
            report.error_message = str(e)
            report.save()
        return redirect("executive:executive_dashboard")
# =============================================================================
# PDF Export
# =============================================================================
class ExecutivePDFReportView(ExecutiveAccessMixin, LoginRequiredMixin, View):
    """Generate and download PDF version of an executive report."""

    def get(self, request, report_id, *args, **kwargs):
        """Return the report's PDF as an attachment.

        A stored PDF is served as-is; otherwise one is generated, saved on
        the report and then served. Any failure is logged and the user is
        redirected to the dashboard with an error message.

        Raises:
            Http404: if no report with ``report_id`` exists.
        """
        report = get_object_or_404(ExecutiveReport, id=report_id)
        if not (request.user.is_executive() or request.user.is_px_admin()):
            messages.error(request, _("You do not have permission to access this report."))
            return redirect("executive:executive_dashboard")

        filename = f"executive_report_{report.id}_{report.start_date}.pdf"
        try:
            if report.pdf_file:
                # The context manager closes the stored file after reading,
                # so the handle is not left open.
                with report.pdf_file.open("rb") as stored_pdf:
                    pdf_bytes = stored_pdf.read()
            else:
                from django.core.files.base import ContentFile

                from .pdf_service import generate_executive_pdf

                pdf_bytes = generate_executive_pdf(report, user=request.user)
                report.pdf_file.save(filename, ContentFile(pdf_bytes), save=True)
        except Exception as e:
            logger.error("Error generating PDF for report %s: %s", report_id, e, exc_info=True)
            messages.error(request, _("Failed to generate PDF report."))
            return redirect("executive:executive_dashboard")

        resp = HttpResponse(pdf_bytes, content_type="application/pdf")
        resp["Content-Disposition"] = f'attachment; filename="{filename}"'
        return resp