ismail 7dae32d206
All checks were successful
Build and Push Docker Image / build (push) Successful in 2m15s
fix: replace pytz with zoneinfo, fix predictive insights list/dict bug
2026-05-12 01:32:41 +03:00

424 lines
15 KiB
Python

"""
Executive Summary Celery Tasks
Scheduled tasks for:
- Generating weekly/monthly executive reports
- Calculating daily metrics
- Generating predictive insights
- Creating AI recommendations
- Sending PDF reports via email
"""
from celery import shared_task
from django.utils import timezone
from django.core.files.base import ContentFile
from celery import shared_task
from datetime import timedelta
import logging
logger = logging.getLogger(__name__)
@shared_task
def generate_weekly_executive_summary():
    """
    Build and store the weekly executive report.

    Intended schedule (configured outside this module): every Monday at 6 AM.

    The reporting window is the 7 days ending on the current date. If a weekly
    ExecutiveReport with the same start/end dates already exists, the task
    returns early. Otherwise it requests an English and an Arabic narrative
    from AINarrativeService, stores both in a new ExecutiveReport, and then
    tries to render a PDF and attach it to the report. A PDF failure is logged
    but does not change the task result.

    Returns:
        dict: ``{"status": "skipped", "report_id": ...}`` when the report
        already exists, ``{"status": "success", "report_id": ...,
        "report_type": "weekly"}`` after creating one, or
        ``{"status": "error", "error": ...}`` if any step raised.
    """
    from .models import ExecutiveReport
    from .services import AINarrativeService

    try:
        logger.info("Starting weekly executive summary generation")

        end_date = timezone.now().date()
        start_date = end_date - timedelta(days=7)
        window = {"start_date": start_date, "end_date": end_date}

        # Idempotency guard: at most one weekly report per (start, end) window.
        duplicate = ExecutiveReport.objects.filter(report_type="weekly", **window).first()
        if duplicate:
            logger.info(f"Weekly report already exists for {start_date} to {end_date}")
            return {"status": "skipped", "report_id": str(duplicate.id)}

        narrator = AINarrativeService()

        logger.info("Generating English narrative...")
        english = narrator.generate_weekly_narrative(**window)

        logger.info("Generating Arabic narrative...")
        arabic = narrator.generate_arabic_narrative(report_type="weekly", **window)

        # Status, model name and timing are taken from the English run only;
        # the error message falls back to the Arabic one when English has none.
        report_fields = {
            "report_type": "weekly",
            "status": english.get("status", "completed"),
            "narrative_en": english.get("narrative_en", ""),
            "narrative_ar": arabic.get("narrative_ar", ""),
            "highlights_en": english.get("highlights_en", []),
            "highlights_ar": arabic.get("highlights_ar", []),
            "concerns_en": english.get("concerns_en", []),
            "concerns_ar": arabic.get("concerns_ar", []),
            "ai_model": english.get("ai_model", ""),
            "generation_time_ms": english.get("generation_time_ms"),
            "error_message": english.get("error", "") or arabic.get("error", ""),
        }
        report = ExecutiveReport.objects.create(**window, **report_fields)

        # Best effort: the report row is kept even when the PDF cannot be built.
        try:
            from .pdf_service import generate_executive_pdf

            rendered = generate_executive_pdf(report)
            pdf_name = f"executive_report_{report.id}_{report.start_date}.pdf"
            report.pdf_file.save(pdf_name, ContentFile(rendered), save=True)
            logger.info(f"PDF saved for report {report.id}")
        except Exception as pdf_err:
            logger.error(f"Failed to generate PDF for report {report.id}: {pdf_err}", exc_info=True)

        logger.info(f"Weekly executive summary generated: {report.id}")
        outcome = {"status": "success"}
        outcome["report_id"] = str(report.id)
        outcome["report_type"] = "weekly"
        return outcome
    except Exception as e:
        logger.error(f"Failed to generate weekly executive summary: {str(e)}", exc_info=True)
        return {"status": "error", "error": str(e)}
@shared_task
def generate_monthly_executive_summary():
    """
    Build and store the monthly executive report.

    Intended schedule (configured outside this module): the 1st of each month
    at 7 AM.

    The reporting window is the 30 days ending on the current date.
    NOTE(review): this is a fixed 30-day window, not the previous calendar
    month - confirm that is the intended period.

    If a monthly ExecutiveReport with the same start/end dates already exists,
    the task returns early. Otherwise it requests an English and an Arabic
    narrative from AINarrativeService, stores both in a new ExecutiveReport,
    and then tries to render a PDF and attach it. A PDF failure is logged but
    does not change the task result.

    Returns:
        dict: ``{"status": "skipped", "report_id": ...}`` when the report
        already exists, ``{"status": "success", "report_id": ...,
        "report_type": "monthly"}`` after creating one, or
        ``{"status": "error", "error": ...}`` if any step raised.
    """
    from .models import ExecutiveReport
    from .services import AINarrativeService

    try:
        logger.info("Starting monthly executive summary generation")

        end_date = timezone.now().date()
        start_date = end_date - timedelta(days=30)
        window = {"start_date": start_date, "end_date": end_date}

        # Idempotency guard: at most one monthly report per (start, end) window.
        duplicate = ExecutiveReport.objects.filter(report_type="monthly", **window).first()
        if duplicate:
            logger.info(f"Monthly report already exists for {start_date} to {end_date}")
            return {"status": "skipped", "report_id": str(duplicate.id)}

        narrator = AINarrativeService()

        logger.info("Generating English narrative...")
        english = narrator.generate_monthly_narrative(**window)

        logger.info("Generating Arabic narrative...")
        arabic = narrator.generate_arabic_narrative(report_type="monthly", **window)

        # Status, model name and timing are taken from the English run only;
        # the error message falls back to the Arabic one when English has none.
        report_fields = {
            "report_type": "monthly",
            "status": english.get("status", "completed"),
            "narrative_en": english.get("narrative_en", ""),
            "narrative_ar": arabic.get("narrative_ar", ""),
            "highlights_en": english.get("highlights_en", []),
            "highlights_ar": arabic.get("highlights_ar", []),
            "concerns_en": english.get("concerns_en", []),
            "concerns_ar": arabic.get("concerns_ar", []),
            "ai_model": english.get("ai_model", ""),
            "generation_time_ms": english.get("generation_time_ms"),
            "error_message": english.get("error", "") or arabic.get("error", ""),
        }
        report = ExecutiveReport.objects.create(**window, **report_fields)

        # Best effort: the report row is kept even when the PDF cannot be built.
        try:
            from .pdf_service import generate_executive_pdf

            rendered = generate_executive_pdf(report)
            pdf_name = f"executive_report_{report.id}_{report.start_date}.pdf"
            report.pdf_file.save(pdf_name, ContentFile(rendered), save=True)
            logger.info(f"PDF saved for report {report.id}")
        except Exception as pdf_err:
            logger.error(f"Failed to generate PDF for report {report.id}: {pdf_err}", exc_info=True)

        logger.info(f"Monthly executive summary generated: {report.id}")
        outcome = {"status": "success"}
        outcome["report_id"] = str(report.id)
        outcome["report_type"] = "monthly"
        return outcome
    except Exception as e:
        logger.error(f"Failed to generate monthly executive summary: {str(e)}", exc_info=True)
        return {"status": "error", "error": str(e)}
@shared_task
def calculate_daily_metrics():
    """
    Calculate and store yesterday's metrics, system-wide and per active hospital.

    Intended schedule (configured outside this module): daily at 1 AM.

    For the target date, ExecutiveSummaryService.aggregate_daily_metrics is
    called once with ``hospital=None`` (system-wide) and once per Hospital
    whose status is "active". Each numeric metric is upserted into
    ExecutiveMetric together with its percentage variance against the most
    recent earlier record of the same metric type and hospital.

    Returns:
        dict: ``{"status": "success", "date": ..., "hospitals_processed": ...,
        "system_metrics_saved": ..., "hospital_metrics_saved": ...}`` or
        ``{"status": "error", "error": ...}`` if any step raised.
    """
    from decimal import Decimal, InvalidOperation

    from .models import ExecutiveMetric
    from .services import ExecutiveSummaryService
    from apps.organizations.models import Hospital

    try:
        logger.info("Starting daily metrics calculation")

        # "Yesterday" must be resolved on the local calendar. With USE_TZ=True,
        # timezone.now() is in UTC, so shortly after local midnight in a UTC+
        # zone its date is still the previous day and subtracting one more day
        # would land two days back. When USE_TZ=False, now() is naive and
        # already local, so it is used as-is.
        current = timezone.now()
        if timezone.is_aware(current):
            current = timezone.localtime(current)
        target_date = current.date() - timedelta(days=1)

        summary_service = ExecutiveSummaryService()

        def _save_metrics(metrics, date, hospital=None):
            """Upsert every finite numeric metric for ``date``; return how many were saved."""
            saved = 0
            for metric_type, metric_value in metrics.items():
                try:
                    value = Decimal(str(metric_value))
                except (InvalidOperation, ValueError, TypeError):
                    # Non-numeric entries are skipped on purpose, but leave a trace.
                    logger.debug("Skipping non-numeric metric %s=%r", metric_type, metric_value)
                    continue

                # NaN/Infinity parse as Decimal but cannot be compared below
                # (Decimal NaN ordering raises) and would abort the whole run.
                if not value.is_finite():
                    logger.debug("Skipping non-finite metric %s=%r", metric_type, metric_value)
                    continue

                # Variance is measured against the latest earlier record, which
                # is not necessarily the immediately preceding day.
                previous = (
                    ExecutiveMetric.objects.filter(
                        metric_type=metric_type,
                        hospital=hospital,
                        metric_date__lt=date,
                    )
                    .order_by("-metric_date")
                    .first()
                )

                variance = None
                variance_direction = "neutral"
                if previous and previous.metric_value is not None:
                    prev_val = float(previous.metric_value)
                    curr_val = float(value)
                    if prev_val != 0:
                        variance = Decimal(str(((curr_val - prev_val) / prev_val) * 100))
                    else:
                        # A percentage change from zero is undefined; recorded as 0.
                        variance = Decimal("0")
                    if variance > 0:
                        variance_direction = "up"
                    elif variance < 0:
                        variance_direction = "down"

                ExecutiveMetric.objects.update_or_create(
                    metric_date=date,
                    metric_type=metric_type,
                    hospital=hospital,
                    defaults={
                        "metric_value": value,
                        "variance": variance,
                        "variance_direction": variance_direction,
                    },
                )
                saved += 1
            return saved

        # System-wide metrics (hospital=None).
        logger.info(f"Calculating system-wide metrics for {target_date}")
        metrics = summary_service.aggregate_daily_metrics(target_date, hospital=None)
        system_saved = _save_metrics(metrics, target_date, hospital=None)

        # Per-hospital metrics.
        hospitals = Hospital.objects.filter(status="active")
        total_hospital_saved = 0
        for hospital in hospitals:
            logger.info(f"Calculating metrics for {hospital.name}")
            metrics = summary_service.aggregate_daily_metrics(target_date, hospital=hospital)
            total_hospital_saved += _save_metrics(metrics, target_date, hospital=hospital)

        logger.info(
            f"Daily metrics calculated for {target_date}: {system_saved} system-wide, {total_hospital_saved} hospital-level"
        )
        return {
            "status": "success",
            "date": target_date.isoformat(),
            "hospitals_processed": hospitals.count(),
            "system_metrics_saved": system_saved,
            "hospital_metrics_saved": total_hospital_saved,
        }
    except Exception as e:
        logger.error(f"Failed to calculate daily metrics: {str(e)}", exc_info=True)
        return {"status": "error", "error": str(e)}
@shared_task
def generate_predictive_insights():
    """
    Run the predictive analytics service and report how many insights it produced.

    Intended schedule (configured outside this module): every 6 hours.

    PredictiveAnalyticsService.generate_predictive_insights may return either
    a list (its length is the created count) or a dict carrying
    "insights_created" / "insights_updated" counters; both shapes are accepted.

    Returns:
        dict: ``{"status": "success", "insights_created": ...,
        "insights_updated": ...}`` or ``{"status": "error", "error": ...}``
        if the service call or result handling raised.
    """
    from .services import PredictiveAnalyticsService

    try:
        logger.info("Starting predictive insights generation")

        result = PredictiveAnalyticsService().generate_predictive_insights()

        # A list result means "these are the created insights".
        if isinstance(result, list):
            insights_created = len(result)
        else:
            insights_created = result.get("insights_created", 0)
        logger.info(f"Predictive insights generated: {insights_created} insights created")

        # Only a dict result can carry an update counter.
        if isinstance(result, dict):
            updated_count = result.get("insights_updated", 0)
        else:
            updated_count = 0

        return {
            "status": "success",
            "insights_created": insights_created,
            "insights_updated": updated_count,
        }
    except Exception as e:
        logger.error(f"Failed to generate predictive insights: {str(e)}", exc_info=True)
        return {"status": "error", "error": str(e)}
@shared_task
def generate_ai_recommendations():
    """
    Generate AI recommendations from predictive insights.

    Intended schedule (configured outside this module): daily at 3 AM.

    Delegates to RecommendationService.generate_recommendations_from_insights.
    The result may be a dict carrying a "recommendations_created" counter or a
    list of created recommendations (its length is used as the count), the
    same tolerance the predictive-insights task in this module applies to its
    own service.

    Returns:
        dict: ``{"status": "success", "recommendations_created": ...}`` or
        ``{"status": "error", "error": ...}`` if the service call or result
        handling raised.
    """
    from .services import RecommendationService

    try:
        logger.info("Starting AI recommendations generation")

        recommendation_service = RecommendationService()
        result = recommendation_service.generate_recommendations_from_insights()

        # Accept both result shapes; anything else still fails loudly below.
        if isinstance(result, list):
            recommendations_created = len(result)
        else:
            recommendations_created = result.get("recommendations_created", 0)

        logger.info("AI recommendations generated: %s", recommendations_created)
        return {
            "status": "success",
            "recommendations_created": recommendations_created,
        }
    except Exception as e:
        logger.exception("Failed to generate AI recommendations: %s", e)
        return {"status": "error", "error": str(e)}
@shared_task
def send_executive_pdf_report():
    """
    Email the latest completed executive PDF report to all executive users.

    Intended schedule (configured outside this module): weekly and monthly,
    after report generation.

    The newest ExecutiveReport with status "completed" and an attached PDF is
    selected. Recipients are active users in the "Executive" group that have
    an email address. The PDF is read once through the storage API and
    attached to one message per recipient; a failure for one recipient is
    logged and does not stop the others.

    Returns:
        dict: ``{"status": "skipped", "reason": "no_pdf_report"}`` or
        ``{"status": "skipped", "reason": "no_executives"}`` when there is
        nothing to do, ``{"status": "success", "emails_sent": ...,
        "report_id": ...}`` after the send loop, or
        ``{"status": "error", "error": ...}`` if selection, reading the PDF,
        or message preparation raised.
    """
    import os

    from django.core.mail import EmailMessage

    from .models import ExecutiveReport
    from apps.accounts.models import User

    try:
        logger.info("Starting executive PDF report email delivery")

        # A FileField with no file is stored as an empty string, not NULL, so
        # the isnull filter alone would also match reports without a PDF.
        latest_report = (
            ExecutiveReport.objects.filter(status="completed", pdf_file__isnull=False)
            .exclude(pdf_file="")
            .order_by("-created_at")
            .first()
        )
        if not latest_report:
            logger.info("No completed PDF report found to send")
            return {"status": "skipped", "reason": "no_pdf_report"}

        executives = User.objects.filter(
            groups__name="Executive",
            is_active=True,
        ).distinct()
        if not executives.exists():
            logger.info("No executive users found to send report to")
            return {"status": "skipped", "reason": "no_executives"}

        # Read the PDF once via the storage backend. Unlike ``.path`` this also
        # works for storages without a local filesystem path, and an unreadable
        # file fails the task here instead of once per recipient.
        with latest_report.pdf_file.open("rb") as pdf_handle:
            pdf_bytes = pdf_handle.read()
        pdf_name = os.path.basename(latest_report.pdf_file.name)

        # Treat missing highlight/concern lists as empty.
        highlights = latest_report.highlights_en or []
        concerns = latest_report.concerns_en or []
        report_label = latest_report.get_report_type_display()

        subject = f"Executive Summary Report - {report_label}"
        body_lines = [
            "Dear Executive,",
            f"Please find attached the {report_label.lower()} summary report for the period "
            f"{latest_report.start_date} to {latest_report.end_date}.",
            "Key Highlights:",
            "\n".join(str(highlight) for highlight in highlights[:5]),
            "Key Concerns:",
            "\n".join(str(concern) for concern in concerns[:5]),
            "This report was generated automatically by the PX360 AI system.",
            "Best regards,",
            "PX360 Team",
            "",  # keeps the trailing newline of the message body
        ]
        body = "\n".join(body_lines)

        emails_sent = 0
        for executive in executives:
            if not executive.email:
                continue
            try:
                # No from_email is given, so Django's configured default applies.
                email = EmailMessage(
                    subject=subject,
                    body=body,
                    to=[executive.email],
                )
                email.attach(pdf_name, pdf_bytes, "application/pdf")
                email.send()
            except Exception as e:
                # Best effort per recipient: log with traceback and carry on.
                logger.exception("Failed to send report to %s: %s", executive.email, e)
            else:
                emails_sent += 1
                logger.info(f"Report sent to {executive.email}")

        logger.info(f"PDF reports sent to {emails_sent} executives")
        return {
            "status": "success",
            "emails_sent": emails_sent,
            "report_id": str(latest_report.id),
        }
    except Exception as e:
        logger.error(f"Failed to send executive PDF reports: {str(e)}", exc_info=True)
        return {"status": "error", "error": str(e)}