HH/apps/analytics/tasks_digest.py
2026-04-08 17:13:35 +03:00

236 lines
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Celery tasks for automated PX digest emails.
Sends weekly/monthly AI-powered executive summaries to hospital admins.
"""
import logging
from datetime import timedelta
from celery import shared_task
from django.conf import settings
from django.core.mail import EmailMultiAlternatives
from django.template.loader import render_to_string
from django.utils import timezone
from django.utils.html import strip_tags
from apps.organizations.models import Hospital
logger = logging.getLogger(__name__)
@shared_task(bind=True, ignore_result=True, max_retries=3, default_retry_delay=300)
def send_weekly_digest_task(self):
"""
Send weekly PX digest to all PX admins.
Scheduled: Every Monday at 8 AM.
"""
_send_digest(self, period="weekly", days=7)
@shared_task(bind=True, ignore_result=True, max_retries=3, default_retry_delay=300)
def send_monthly_digest_task(self):
"""
Send monthly PX digest to all PX admins.
Scheduled: 1st of each month at 8 AM.
"""
_send_digest(self, period="monthly", days=30)
def _send_digest(task, period="weekly", days=7):
"""
Core digest generation and sending logic.
For each active hospital:
1. Gather KPI data for the period
2. Generate AI executive summary
3. Detect early warnings and recommendations
4. Render email template
5. Send to all PX admins for that hospital
"""
from django.contrib.auth import get_user_model
from apps.analytics.services.ai_analytics import (
ExecutiveSummaryGenerator,
EarlyWarningSystem,
ActionRecommendationEngine,
)
User = get_user_model()
hospitals = list(Hospital.objects.filter(status="active"))
if not hospitals:
logger.info("No active hospitals found, skipping digest")
return
now = timezone.now()
start_date = now - timedelta(days=days)
period_label = f"{start_date.strftime('%b %d')} {now.strftime('%b %d, %Y')}"
sent_count = 0
failed_count = 0
for hospital in hospitals:
hospital_id = str(hospital.id)
# Get PX admins for this hospital
admins = User.objects.filter(
is_active=True,
hospital=hospital,
).filter(role="px_admin") | User.objects.filter(is_active=True, is_superuser=True)
admins = list(admins.distinct())
if not admins:
logger.info(f"No PX admins found for hospital {hospital.name}, skipping")
continue
admin_emails = [a.email for a in admins if a.email]
if not admin_emails:
logger.info(f"No admin emails found for hospital {hospital.name}, skipping")
continue
try:
# Build a mock admin user for AI services
class _MockUser:
def __init__(self, u):
self.id = u.id
self.hospital = hospital
self.department = None
def is_px_admin(self):
return True
mock_admin = _MockUser(admins[0])
# Gather KPI metrics
metrics = _gather_metrics(hospital, start_date, now)
# Generate AI executive summary
summary = ExecutiveSummaryGenerator.generate(
mock_admin, hospital_id=hospital_id, period=f"{days}d"
)
# Detect early warnings
early_warnings = EarlyWarningSystem.detect(
mock_admin, hospital_id=hospital_id, limit=5
)
# Get action recommendations
recommendations = ActionRecommendationEngine.generate_recommendations(
mock_admin, hospital_id=hospital_id, limit=3
)
# Render email
subject = f"PX360 {period.title()} Digest — {hospital.name} ({period_label})"
context = {
"period": period,
"period_label": period_label,
"hospital_name": hospital.name,
"summary": summary,
"metrics": metrics,
"early_warnings": early_warnings,
"recommendations": recommendations,
"dashboard_url": f"{getattr(settings, 'SITE_URL', '')}/analytics/dashboard/",
"command_center_url": f"{getattr(settings, 'SITE_URL', '')}/analytics/command-center/",
}
html_content = render_to_string("emails/px_digest_weekly.html", context)
text_content = strip_tags(html_content)
# Send email
email = EmailMultiAlternatives(
subject=subject,
body=text_content,
from_email=getattr(settings, "DEFAULT_FROM_EMAIL", "noreply@px360.sa"),
to=admin_emails,
)
email.attach_alternative(html_content, "text/html")
email.send()
sent_count += len(admin_emails)
logger.info(
f"Sent {period} digest to {len(admin_emails)} admins at {hospital.name}: "
f"{admin_emails}"
)
except Exception as e:
failed_count += 1
logger.exception(
f"Failed to send {period} digest for hospital {hospital.name}: {e}"
)
if failed_count <= 3:
task.retry(exc=e)
else:
logger.error(f"Max retries reached for {period} digest, giving up")
return {
"period": period,
"hospitals_processed": len(hospitals),
"emails_sent": sent_count,
"hospitals_failed": failed_count,
}
def _gather_metrics(hospital, start_date, now):
"""Gather key metrics for a hospital in the date range."""
from django.db.models import Avg, Count, F, Q
from apps.complaints.models import Complaint
from apps.surveys.models import SurveyInstance
from apps.px_action_center.models import PXAction
from apps.feedback.models import Feedback
base_complaint = Complaint.objects.filter(
created_at__gte=start_date, hospital=hospital
)
base_survey = SurveyInstance.objects.filter(
completed_at__gte=start_date, status="completed",
survey_template__hospital=hospital
)
base_action = PXAction.objects.filter(
created_at__gte=start_date, hospital=hospital
)
total_complaints = base_complaint.count()
resolved = base_complaint.filter(status__in=["resolved", "closed"]).count()
# SLA compliance
total_with_sla = base_complaint.filter(due_at__isnull=False).count()
resolved_within = base_complaint.filter(
status__in=["resolved", "closed"], resolved_at__lte=F("due_at")
).count()
sla_compliance = round((resolved_within / total_with_sla * 100), 1) if total_with_sla > 0 else 0
# NPS
nps_surveys = base_survey.filter(survey_template__survey_type="nps", total_score__isnull=False)
if nps_surveys.exists():
promoters = nps_surveys.filter(total_score__gte=9).count()
detractors = nps_surveys.filter(total_score__lte=6).count()
nps = round(((promoters - detractors) / nps_surveys.count() * 100), 1) if nps_surveys.count() > 0 else 0
else:
nps = 0
# Survey avg
avg_survey = base_survey.aggregate(avg=Avg("total_score"))["avg"] or 0
avg_survey = round(avg_survey, 2)
# Total actions
total_actions = base_action.count()
# Avg resolution hours
resolved_with_time = base_complaint.filter(
status__in=["resolved", "closed"], resolved_at__isnull=False, created_at__isnull=False
)
if resolved_with_time.exists():
avg_res = resolved_with_time.annotate(
rt=F("resolved_at") - F("created_at")
).aggregate(avg=Avg("rt"))["avg"]
avg_res_hrs = round((avg_res.total_seconds() / 3600), 1) if avg_res else 0
else:
avg_res_hrs = 0
return {
"total_complaints": total_complaints,
"sla_compliance": sla_compliance,
"nps_score": nps,
"avg_survey_score": avg_survey,
"total_actions": total_actions,
"avg_resolution_hours": avg_res_hrs,
}