""" Celery tasks for automated PX digest emails. Sends weekly/monthly AI-powered executive summaries to hospital admins. """ import logging from datetime import timedelta from celery import shared_task from django.conf import settings from django.core.mail import EmailMultiAlternatives from django.template.loader import render_to_string from django.utils import timezone from django.utils.html import strip_tags from apps.organizations.models import Hospital logger = logging.getLogger(__name__) @shared_task(bind=True, ignore_result=True, max_retries=3, default_retry_delay=300) def send_weekly_digest_task(self): """ Send weekly PX digest to all PX admins. Scheduled: Every Monday at 8 AM. """ _send_digest(self, period="weekly", days=7) @shared_task(bind=True, ignore_result=True, max_retries=3, default_retry_delay=300) def send_monthly_digest_task(self): """ Send monthly PX digest to all PX admins. Scheduled: 1st of each month at 8 AM. """ _send_digest(self, period="monthly", days=30) def _send_digest(task, period="weekly", days=7): """ Core digest generation and sending logic. For each active hospital: 1. Gather KPI data for the period 2. Generate AI executive summary 3. Detect early warnings and recommendations 4. Render email template 5. Send to all PX admins for that hospital """ from django.contrib.auth import get_user_model from apps.analytics.services.ai_analytics import ( ExecutiveSummaryGenerator, EarlyWarningSystem, ActionRecommendationEngine, ) User = get_user_model() hospitals = list(Hospital.objects.filter(status="active")) if not hospitals: logger.info("No active hospitals found, skipping digest") return now = timezone.now() start_date = now - timedelta(days=days) period_label = f"{start_date.strftime('%b %d')} – {now.strftime('%b %d, %Y')}" sent_count = 0 failed_count = 0 for hospital in hospitals: hospital_id = str(hospital.id) # Get PX admins for this hospital admins = User.objects.filter( is_active=True, hospital=hospital, ).filter(role="px_admin") | User.objects.filter(is_active=True, is_superuser=True) admins = list(admins.distinct()) if not admins: logger.info(f"No PX admins found for hospital {hospital.name}, skipping") continue admin_emails = [a.email for a in admins if a.email] if not admin_emails: logger.info(f"No admin emails found for hospital {hospital.name}, skipping") continue try: # Build a mock admin user for AI services class _MockUser: def __init__(self, u): self.id = u.id self.hospital = hospital self.department = None def is_px_admin(self): return True mock_admin = _MockUser(admins[0]) # Gather KPI metrics metrics = _gather_metrics(hospital, start_date, now) # Generate AI executive summary summary = ExecutiveSummaryGenerator.generate( mock_admin, hospital_id=hospital_id, period=f"{days}d" ) # Detect early warnings early_warnings = EarlyWarningSystem.detect( mock_admin, hospital_id=hospital_id, limit=5 ) # Get action recommendations recommendations = ActionRecommendationEngine.generate_recommendations( mock_admin, hospital_id=hospital_id, limit=3 ) # Render email subject = f"PX360 {period.title()} Digest — {hospital.name} ({period_label})" context = { "period": period, "period_label": period_label, "hospital_name": hospital.name, "summary": summary, "metrics": metrics, "early_warnings": early_warnings, "recommendations": recommendations, "dashboard_url": f"{getattr(settings, 'SITE_URL', '')}/analytics/dashboard/", "command_center_url": f"{getattr(settings, 'SITE_URL', '')}/analytics/command-center/", } html_content = render_to_string("emails/px_digest_weekly.html", context) text_content = strip_tags(html_content) # Send email email = EmailMultiAlternatives( subject=subject, body=text_content, from_email=getattr(settings, "DEFAULT_FROM_EMAIL", "noreply@px360.sa"), to=admin_emails, ) email.attach_alternative(html_content, "text/html") email.send() sent_count += len(admin_emails) logger.info( f"Sent {period} digest to {len(admin_emails)} admins at {hospital.name}: " f"{admin_emails}" ) except Exception as e: failed_count += 1 logger.exception( f"Failed to send {period} digest for hospital {hospital.name}: {e}" ) if failed_count <= 3: task.retry(exc=e) else: logger.error(f"Max retries reached for {period} digest, giving up") return { "period": period, "hospitals_processed": len(hospitals), "emails_sent": sent_count, "hospitals_failed": failed_count, } def _gather_metrics(hospital, start_date, now): """Gather key metrics for a hospital in the date range.""" from django.db.models import Avg, Count, F, Q from apps.complaints.models import Complaint from apps.surveys.models import SurveyInstance from apps.px_action_center.models import PXAction from apps.feedback.models import Feedback base_complaint = Complaint.objects.filter( created_at__gte=start_date, hospital=hospital ) base_survey = SurveyInstance.objects.filter( completed_at__gte=start_date, status="completed", survey_template__hospital=hospital ) base_action = PXAction.objects.filter( created_at__gte=start_date, hospital=hospital ) total_complaints = base_complaint.count() resolved = base_complaint.filter(status__in=["resolved", "closed"]).count() # SLA compliance total_with_sla = base_complaint.filter(due_at__isnull=False).count() resolved_within = base_complaint.filter( status__in=["resolved", "closed"], resolved_at__lte=F("due_at") ).count() sla_compliance = round((resolved_within / total_with_sla * 100), 1) if total_with_sla > 0 else 0 # NPS nps_surveys = base_survey.filter(survey_template__survey_type="nps", total_score__isnull=False) if nps_surveys.exists(): promoters = nps_surveys.filter(total_score__gte=9).count() detractors = nps_surveys.filter(total_score__lte=6).count() nps = round(((promoters - detractors) / nps_surveys.count() * 100), 1) if nps_surveys.count() > 0 else 0 else: nps = 0 # Survey avg avg_survey = base_survey.aggregate(avg=Avg("total_score"))["avg"] or 0 avg_survey = round(avg_survey, 2) # Total actions total_actions = base_action.count() # Avg resolution hours resolved_with_time = base_complaint.filter( status__in=["resolved", "closed"], resolved_at__isnull=False, activated_at__isnull=False ) if resolved_with_time.exists(): avg_res = resolved_with_time.annotate( rt=F("resolved_at") - F("activated_at") ).aggregate(avg=Avg("rt"))["avg"] avg_res_hrs = round((avg_res.total_seconds() / 3600), 1) if avg_res else 0 else: avg_res_hrs = 0 return { "total_complaints": total_complaints, "sla_compliance": sla_compliance, "nps_score": nps, "avg_survey_score": avg_survey, "total_actions": total_actions, "avg_resolution_hours": avg_res_hrs, }