import logging

from celery import shared_task
from django.utils import timezone

from apps.organizations.models import Hospital

from .kpi_models import KPIReportType
from .kpi_service import KPICalculationService

logger = logging.getLogger(__name__)


class _SystemUser:
    """Minimal user stand-in passed to the analytics services by background tasks.

    It carries an ``id``, has no ``hospital`` or ``department``, and always
    reports itself as a PX admin via ``is_px_admin()``.
    """

    def __init__(self, uid):
        self.id = uid
        self.hospital = None
        self.department = None

    def is_px_admin(self):
        return True


@shared_task(bind=True, ignore_result=True)
def calculate_daily_kpis(self):
    """Generate the current month's KPI report of every type for each active hospital.

    A failure for one (hospital, report type) pair is logged and does not stop
    the remaining pairs from being processed.
    """
    # NOTE(review): timezone.now() may be UTC when USE_TZ is enabled, so around
    # a month boundary this can differ from the local calendar month — confirm
    # which one generate_monthly_report expects.
    today = timezone.now()
    year = today.year
    month = today.month

    hospitals = Hospital.objects.filter(status="active")
    report_types = [value for value, _label in KPIReportType.choices]

    for hospital in hospitals:
        for report_type in report_types:
            try:
                KPICalculationService.generate_monthly_report(
                    report_type=report_type,
                    hospital=hospital,
                    year=year,
                    month=month,
                )
                logger.info(f"Daily KPI calculated: {report_type} for {hospital.name} ({year}-{month:02d})")
            except Exception as e:
                # Best-effort: keep going with the other reports/hospitals.
                logger.exception(f"Failed daily KPI: {report_type} for {hospital.name}: {e}")


# =============================================================================
# AI-Powered Analytics — Async Celery Tasks
# =============================================================================


@shared_task(bind=True, ignore_result=True, max_retries=2, default_retry_delay=60)
def generate_executive_summary_task(
    self, user_id=None, hospital_id=None, department_id=None, period="30d", force_refresh=False
):
    """
    Async task: Generate AI executive summary and cache it.
    Called by Celery beat daily, or manually when user forces refresh.

    Args:
        user_id: Id given to the mock admin user; a falsy value becomes "system".
        hospital_id: Forwarded to ExecutiveSummaryGenerator.generate.
        department_id: Forwarded to ExecutiveSummaryGenerator.generate.
        period: Forwarded to ExecutiveSummaryGenerator.generate.
        force_refresh: Forwarded to ExecutiveSummaryGenerator.generate.

    On any exception the failure is logged and the task is retried
    (max_retries=2, default_retry_delay=60 per the decorator).
    """
    from apps.analytics.services.ai_analytics import ExecutiveSummaryGenerator

    # Mock user-like object for role scoping (reports itself as PX admin).
    user = _SystemUser(user_id or "system")

    try:
        result = ExecutiveSummaryGenerator.generate(
            user,
            hospital_id=hospital_id,
            department_id=department_id,
            period=period,
            force_refresh=force_refresh,
        )
        logger.info(
            f"Executive summary generated: {period} for hospital={hospital_id}, "
            f"risk={result.get('risk_level', 'unknown')}"
        )
    except Exception as e:
        logger.exception(f"Failed to generate executive summary: {e}")
        # retry() raises; the explicit ``raise`` makes the control flow obvious.
        raise self.retry(exc=e)


@shared_task(bind=True, ignore_result=True, max_retries=2, default_retry_delay=60)
def generate_action_recommendations_task(self, user_id=None, hospital_id=None, department_id=None):
    """
    Async task: Generate AI action recommendations from complaint clusters.

    Args:
        user_id: Id given to the mock admin user; a falsy value becomes "system".
        hospital_id: Forwarded to ActionRecommendationEngine.generate_recommendations.
        department_id: Forwarded to ActionRecommendationEngine.generate_recommendations.

    On any exception the failure is logged and the task is retried.
    """
    from apps.analytics.services.ai_analytics import ActionRecommendationEngine

    user = _SystemUser(user_id or "system")

    try:
        results = ActionRecommendationEngine.generate_recommendations(
            user, hospital_id=hospital_id, department_id=department_id, limit=5
        )
        logger.info(f"Action recommendations generated: {len(results)} recommendations")
    except Exception as e:
        logger.exception(f"Failed to generate action recommendations: {e}")
        raise self.retry(exc=e)


@shared_task(bind=True, ignore_result=True)
def precompute_dashboard_cache_task(self):
    """
    Async task: Pre-compute all cacheable analytics data for all active hospitals.
    Run daily at 3 AM. Users can trigger on-demand refresh via dashboard button.

    The analytics services are invoked for up to three admin users against every
    active hospital. A failure for one (admin, hospital) pair is logged and
    skips the remaining services for that pair only.
    """
    from apps.analytics.services.ai_analytics import (
        ExecutiveSummaryGenerator,
        EarlyWarningSystem,
        ComplaintVolumeForecaster,
        SLABreachPredictor,
        ActionRecommendationEngine,
        VisitEfficiencyAnalyzer,
    )
    from django.contrib.auth import get_user_model

    User = get_user_model()
    admin_users = User.objects.filter(is_active=True, role="px_admin")
    if not admin_users.exists():
        # Fallback: use active superusers when there is no active px_admin.
        admin_users = User.objects.filter(is_active=True, is_superuser=True)

    # Order before slicing so the same admins are selected on every run;
    # slicing an unordered queryset gives no such guarantee.
    selected_admins = list(admin_users.order_by("pk")[:3])  # limit to first 3 admins
    hospitals = list(Hospital.objects.filter(status="active"))

    for admin in selected_admins:
        for hospital in hospitals:
            hospital_id = str(hospital.id)
            try:
                ExecutiveSummaryGenerator.generate(admin, hospital_id=hospital_id, period="30d")
                EarlyWarningSystem.detect(admin, hospital_id=hospital_id, limit=5)
                ComplaintVolumeForecaster.forecast(admin, hospital_id=hospital_id, forecast_days=30)
                SLABreachPredictor.predict(admin, hospital_id=hospital_id, limit=10)
                ActionRecommendationEngine.generate_recommendations(admin, hospital_id=hospital_id, limit=5)
                VisitEfficiencyAnalyzer.analyze(admin, hospital_id=hospital_id)
                logger.info(f"Precomputed analytics for admin={admin.id}, hospital={hospital.id}")
            except Exception as e:
                logger.exception(f"Failed to precompute for admin={admin.id}, hospital={hospital.id}: {e}")


# =============================================================================
# Hourly AI Analytics Pre-computation Tasks
# =============================================================================


def _get_system_admin():
    """Return a mock admin user for background tasks."""
    return _SystemUser("system")


@shared_task(bind=True, ignore_result=True, max_retries=2, default_retry_delay=60)
def precompute_executive_summary_task(self, hospital_id=None, period="30d"):
    """Pre-compute executive summary for a single hospital.

    Always calls the generator with force_refresh=True. On any exception the
    failure is logged and the task is retried.
    """
    from apps.analytics.services.ai_analytics import ExecutiveSummaryGenerator

    user = _get_system_admin()
    try:
        result = ExecutiveSummaryGenerator.generate(user, hospital_id=hospital_id, period=period, force_refresh=True)
        logger.info(
            f"Precomputed executive summary: period={period} hospital={hospital_id} "
            f"risk={result.get('risk_level', 'unknown')}"
        )
    except Exception as e:
        logger.exception(f"Failed precompute_executive_summary_task: {e}")
        raise self.retry(exc=e)


@shared_task(bind=True, ignore_result=True, max_retries=2, default_retry_delay=60)
def precompute_early_warning_task(self, hospital_id=None, limit=5):
    """Pre-compute early warning system for a single hospital.

    On any exception the failure is logged and the task is retried.
    """
    from apps.analytics.services.ai_analytics import EarlyWarningSystem

    user = _get_system_admin()
    try:
        results = EarlyWarningSystem.detect(user, hospital_id=hospital_id, limit=limit)
        logger.info(f"Precomputed early warnings: hospital={hospital_id} count={len(results)}")
    except Exception as e:
        logger.exception(f"Failed precompute_early_warning_task: {e}")
        raise self.retry(exc=e)


@shared_task(bind=True, ignore_result=True, max_retries=2, default_retry_delay=60)
def precompute_complaint_forecast_task(self, hospital_id=None, forecast_days=30):
    """Pre-compute complaint volume forecast for a single hospital.

    The forecast's return value is not used here. On any exception the failure
    is logged and the task is retried.
    """
    from apps.analytics.services.ai_analytics import ComplaintVolumeForecaster

    user = _get_system_admin()
    try:
        ComplaintVolumeForecaster.forecast(user, hospital_id=hospital_id, forecast_days=forecast_days)
        logger.info(f"Precomputed complaint forecast: hospital={hospital_id}")
    except Exception as e:
        logger.exception(f"Failed precompute_complaint_forecast_task: {e}")
        raise self.retry(exc=e)


@shared_task(bind=True, ignore_result=True, max_retries=2, default_retry_delay=60)
def precompute_sla_breach_prediction_task(self, hospital_id=None, limit=10):
    """Pre-compute SLA breach predictions for a single hospital.

    On any exception the failure is logged and the task is retried.
    """
    from apps.analytics.services.ai_analytics import SLABreachPredictor

    user = _get_system_admin()
    try:
        results = SLABreachPredictor.predict(user, hospital_id=hospital_id, limit=limit)
        logger.info(f"Precomputed SLA breach predictions: hospital={hospital_id} count={len(results)}")
    except Exception as e:
        logger.exception(f"Failed precompute_sla_breach_prediction_task: {e}")
        raise self.retry(exc=e)


@shared_task(bind=True, ignore_result=True, max_retries=2, default_retry_delay=60)
def precompute_action_recommendations_task(self, hospital_id=None, limit=5):
    """Pre-compute action recommendations for a single hospital.

    On any exception the failure is logged and the task is retried.
    """
    from apps.analytics.services.ai_analytics import ActionRecommendationEngine

    user = _get_system_admin()
    try:
        results = ActionRecommendationEngine.generate_recommendations(user, hospital_id=hospital_id, limit=limit)
        logger.info(f"Precomputed action recommendations: hospital={hospital_id} count={len(results)}")
    except Exception as e:
        logger.exception(f"Failed precompute_action_recommendations_task: {e}")
        raise self.retry(exc=e)


@shared_task(bind=True, ignore_result=True, max_retries=2, default_retry_delay=60)
def precompute_visit_efficiency_task(self, hospital_id=None):
    """Pre-compute visit efficiency analysis for a single hospital.

    On any exception the failure is logged and the task is retried.
    """
    from apps.analytics.services.ai_analytics import VisitEfficiencyAnalyzer

    user = _get_system_admin()
    try:
        result = VisitEfficiencyAnalyzer.analyze(user, hospital_id=hospital_id)
        logger.info(
            f"Precomputed visit efficiency: hospital={hospital_id} "
            f"score={result.get('efficiency_score', 'N/A')}"
        )
    except Exception as e:
        logger.exception(f"Failed precompute_visit_efficiency_task: {e}")
        raise self.retry(exc=e)


def _dispatch_ai_precompute(hospital_id):
    """Queue the six precompute sub-tasks for one scope (``None`` = no hospital filter)."""
    precompute_executive_summary_task.delay(hospital_id=hospital_id, period="30d")
    precompute_early_warning_task.delay(hospital_id=hospital_id, limit=5)
    precompute_complaint_forecast_task.delay(hospital_id=hospital_id, forecast_days=30)
    precompute_sla_breach_prediction_task.delay(hospital_id=hospital_id, limit=10)
    precompute_action_recommendations_task.delay(hospital_id=hospital_id, limit=5)
    precompute_visit_efficiency_task.delay(hospital_id=hospital_id)


@shared_task(bind=True, ignore_result=True)
def precompute_all_ai_analytics_task(self):
    """
    Master task: pre-compute ALL AI analytics for every active hospital every hour.
    Dispatches sub-tasks per hospital so failures are isolated.
    """
    hospital_ids = list(Hospital.objects.filter(status="active").values_list("id", flat=True))

    for hid in hospital_ids:
        hospital_id = str(hid)
        _dispatch_ai_precompute(hospital_id)
        logger.info(f"Dispatched hourly AI precompute tasks for hospital={hospital_id}")

    # Also run for global (no hospital filter)
    _dispatch_ai_precompute(None)
    logger.info("Dispatched hourly AI precompute tasks for global scope")