ismail c5f76b3855
Some checks are pending
Build and Push Docker Image / build (push) Waiting to run
updates
2026-05-11 14:45:30 +03:00

646 lines
26 KiB
Python

"""
Observations Celery tasks - SLA tracking and notifications.
This module implements tasks for:
- Checking overdue observations
- Sending SLA reminder emails
- Escalation handling
"""
import logging
from celery import shared_task
from django.utils import timezone
logger = logging.getLogger(__name__)
@shared_task
def check_overdue_observations():
    """
    Scan open observations that have a due date and count the overdue ones.

    Intended to run periodically (every 15 minutes, configured in
    config/celery.py). Each candidate is evaluated through
    ``Observation.check_overdue()``; that model method is expected to
    maintain the is_overdue / breached_at fields itself.

    Returns:
        dict: ``{"overdue_count": <number of observations reported overdue>}``
    """
    from apps.observations.models import Observation, ObservationStatus

    # Only observations that are still being worked on can breach their SLA.
    open_statuses = [
        ObservationStatus.NEW,
        ObservationStatus.TRIAGED,
        ObservationStatus.ASSIGNED,
        ObservationStatus.IN_PROGRESS,
    ]
    candidates = (
        Observation.objects
        .filter(status__in=open_statuses, due_at__isnull=False)
        .select_related("hospital", "assigned_department")
    )

    flagged = 0
    for obs in candidates:
        if not obs.check_overdue():
            continue
        flagged += 1
        logger.warning(
            f"Observation {obs.id} is overdue: {obs.tracking_code} (due: {obs.due_at})"
        )

    # Stay quiet in the logs when nothing is overdue.
    if flagged:
        logger.info(f"Found {flagged} overdue observations")
    return {"overdue_count": flagged}
def _observation_reminder_site_url():
    """
    Resolve the base URL used to build links in observation reminder emails.

    Uses the domain of the current ``django.contrib.sites`` Site and falls
    back to ``settings.SITE_URL`` (empty string when unset) if that lookup
    fails for any reason.
    """
    from django.conf import settings

    try:
        from django.contrib.sites.models import Site

        return Site.objects.get_current().domain
    except Exception:
        return getattr(settings, "SITE_URL", "")


def _send_observation_sla_reminder(observation, now, site_url, *, urgent):
    """
    Render and send one SLA reminder email to the observation's assignee.

    Args:
        observation: Observation whose ``assigned_to`` has an email address
            and whose ``due_at`` is set.
        now: Timestamp of the current task run, used to compute the time
            remaining until ``due_at``.
        site_url: Base URL placed in the email context and observation link.
        urgent: ``False`` sends the first reminder, ``True`` the second
            ("URGENT") reminder with its own template and wording.

    Raises:
        Exception: anything raised while rendering the template or sending
            the email propagates to the caller, which decides how to log it.
    """
    from django.template.loader import render_to_string

    from apps.notifications.services import NotificationService

    assignee = observation.assigned_to
    # Never report a negative remaining time, even if the deadline just passed.
    hours_remaining = max(0, round((observation.due_at - now).total_seconds() / 3600))
    headline = observation.title or observation.description[:50]
    due_text = observation.due_at.strftime("%Y-%m-%d %H:%M")

    if urgent:
        lead = "URGENT: "
        call_to_action = "Immediate action required."
        template_name = "emails/observation_sla_second_reminder.html"
    else:
        lead = ""
        call_to_action = "Please take action."
        template_name = "emails/observation_sla_reminder.html"

    plain_message = (
        f"{lead}Observation '{headline}' "
        f"(tracking code: {observation.tracking_code}) is due at "
        f"{due_text}. "
        f"Time remaining: {hours_remaining} hours. {call_to_action}"
    )
    context = {
        "recipient_name": assignee.get_full_name(),
        "observation": observation,
        "observation_url": f"{site_url}/observations/{observation.id}/",
        "site_url": site_url,
        "due_date": observation.due_at,
        "hours_remaining": hours_remaining,
    }
    if not urgent:
        # Only the first-reminder template receives this flag.
        context["is_unassigned"] = False

    html_message = render_to_string(template_name, context)
    NotificationService.send_email(
        email=assignee.email,
        subject=f"{lead}SLA Reminder - Observation {observation.tracking_code}",
        message=plain_message,
        html_message=html_message,
        related_object=observation,
    )


@shared_task
def send_observation_sla_reminders():
    """
    Periodic task to send SLA reminder emails for observations approaching deadline.

    Runs every hour (configured in config/celery.py). For every active,
    not-yet-overdue observation with a due date and an assignee email, a first
    and/or second reminder is sent once the configured number of hours since
    creation has elapsed and that reminder has not been sent before. The
    matching ``*_sent_at`` timestamp is stored only after a successful send,
    so a failed send is retried on the next run.

    Returns:
        dict: ``{"first_reminder_count": int, "second_reminder_count": int}``
    """
    from apps.observations.models import Observation, ObservationStatus

    now = timezone.now()
    active_statuses = [
        ObservationStatus.NEW,
        ObservationStatus.TRIAGED,
        ObservationStatus.ASSIGNED,
        ObservationStatus.IN_PROGRESS,
    ]
    active_observations = Observation.objects.filter(
        status__in=active_statuses,
        due_at__isnull=False,
        is_overdue=False,
    ).select_related("hospital", "assigned_to", "category")

    # The site URL does not depend on the observation; resolve it once per run.
    site_url = _observation_reminder_site_url()

    first_reminder_count = 0
    second_reminder_count = 0
    for observation in active_observations:
        config = observation.get_sla_config()
        if not config:
            continue

        # Both reminders go to the assignee; without an email there is nothing to send.
        assignee = observation.assigned_to
        if not (assignee and assignee.email):
            continue

        first_reminder_hours = config.get_first_reminder_hours_after()
        second_reminder_hours = config.get_second_reminder_hours_after()
        hours_since_creation = (now - observation.created_at).total_seconds() / 3600

        # The two reminders are evaluated independently: an observation that is
        # already past both thresholds receives both emails in the same run.
        if (
            first_reminder_hours > 0
            and observation.reminder_sent_at is None
            and hours_since_creation >= first_reminder_hours
        ):
            try:
                _send_observation_sla_reminder(observation, now, site_url, urgent=False)
                observation.reminder_sent_at = now
                observation.save(update_fields=["reminder_sent_at"])
                first_reminder_count += 1
            except Exception as e:
                # Best effort: one failing email must not abort the whole batch.
                logger.error(f"Failed to send observation reminder: {e}", exc_info=True)

        if (
            second_reminder_hours > 0
            and observation.second_reminder_sent_at is None
            and hours_since_creation >= second_reminder_hours
        ):
            try:
                _send_observation_sla_reminder(observation, now, site_url, urgent=True)
                observation.second_reminder_sent_at = now
                observation.save(update_fields=["second_reminder_sent_at"])
                second_reminder_count += 1
            except Exception as e:
                logger.error(f"Failed to send second observation reminder: {e}", exc_info=True)

    logger.info(
        f"Sent {first_reminder_count} first reminders and {second_reminder_count} second reminders for observations"
    )
    return {
        "first_reminder_count": first_reminder_count,
        "second_reminder_count": second_reminder_count,
    }
@shared_task
def schedule_monthly_followups():
    """
    Schedule a follow-up date for resolved/closed observations that lack one.

    Every observation in RESOLVED or CLOSED status that has a ``resolved_at``
    timestamp but no ``monthly_follow_up_due_at`` gets its follow-up due date
    set to 30 days after ``resolved_at``.

    Returns:
        dict: ``{"scheduled": <number of observations updated>}``
    """
    from datetime import timedelta

    from apps.observations.models import Observation, ObservationStatus

    follow_up_delay = timedelta(days=30)
    resolved_observations = Observation.objects.filter(
        status__in=[ObservationStatus.RESOLVED, ObservationStatus.CLOSED],
        monthly_follow_up_due_at__isnull=True,
        resolved_at__isnull=False,
    )

    scheduled = 0
    for obs in resolved_observations:
        obs.monthly_follow_up_due_at = obs.resolved_at + follow_up_delay
        # Restrict the write to the one column this task owns.
        obs.save(update_fields=["monthly_follow_up_due_at"])
        scheduled += 1

    logger.info(f"Scheduled {scheduled} monthly observation follow-ups")
    return {"scheduled": scheduled}
@shared_task
def process_monthly_followups():
    """
    Email assignees about observations whose monthly follow-up is due.

    Selects RESOLVED/CLOSED observations whose ``monthly_follow_up_due_at`` is
    at or before now and whose ``monthly_follow_up_completed_at`` is unset,
    and sends the assignee (when one with an email exists) a follow-up email.

    NOTE(review): this task does not record that a notification was sent, so
    an observation keeps matching on every run until
    ``monthly_follow_up_completed_at`` is set elsewhere - confirm that
    repeated emails are intended.

    Returns:
        dict: ``{"notified": <number of emails sent successfully>}``
    """
    from django.conf import settings
    from django.template.loader import render_to_string
    from apps.observations.models import Observation, ObservationStatus
    from apps.notifications.services import NotificationService

    now = timezone.now()
    due_observations = Observation.objects.filter(
        status__in=[ObservationStatus.RESOLVED, ObservationStatus.CLOSED],
        monthly_follow_up_due_at__lte=now,
        monthly_follow_up_completed_at__isnull=True,
    ).select_related("hospital", "assigned_to", "assigned_department", "category")

    # The site URL is the same for every email; resolve it once per run.
    # Prefer the sites framework, fall back to settings.SITE_URL on any failure.
    try:
        from django.contrib.sites.models import Site

        site_url = Site.objects.get_current().domain
    except Exception:
        site_url = getattr(settings, "SITE_URL", "")

    notified = 0
    for obs in due_observations:
        assignee = obs.assigned_to
        if not (assignee and assignee.email):
            continue
        try:
            plain_message = (
                f"Observation '{obs.title or obs.description[:50]}' "
                f"(tracking code: {obs.tracking_code}) requires monthly follow-up review. "
                f"Please verify that the issue has been fully addressed and sustained."
            )
            context = {
                "recipient_name": assignee.get_full_name(),
                "observation": obs,
                "observation_url": f"{site_url}/observations/{obs.id}/",
                "site_url": site_url,
            }
            html_message = render_to_string("emails/observation_monthly_followup.html", context)
            NotificationService.send_email(
                email=assignee.email,
                subject=f"Monthly Follow-Up Due: Observation {obs.tracking_code}",
                message=plain_message,
                html_message=html_message,
                related_object=obs,
            )
            notified += 1
        except Exception as e:
            # Best effort: a single failing email must not stop the batch.
            logger.error(
                f"Failed to send follow-up notification for observation {obs.id}: {e}",
                exc_info=True,
            )

    logger.info(f"Sent {notified} monthly follow-up notifications for observations")
    return {"notified": notified}
@shared_task
def send_new_observation_notification(observation_id):
    """
    Notify about a newly created observation in the background.

    Loads the observation by id and hands it to
    ``ObservationService.notify_new_observation``. Running this as a task
    keeps the notification work out of the observation creation response.
    A missing observation is logged as an error and otherwise ignored.

    Args:
        observation_id: Primary key of the Observation to announce.
    """
    from apps.observations.models import Observation
    from apps.observations.services import ObservationService

    try:
        ObservationService.notify_new_observation(
            Observation.objects.get(id=observation_id)
        )
    except Observation.DoesNotExist:
        logger.error(f"Observation {observation_id} not found for notification")
@shared_task
def analyze_observation_with_ai(observation_id):
    """
    Classify an observation with the AI service and persist the outcome.

    The task asks ``AIService`` for an analysis of the observation's title and
    description plus an emotion analysis of the description. It then updates
    severity, category (when an active ObservationCategory matches), title and
    taxonomy links, stores the full analysis under ``metadata["ai_analysis"]``
    and adds an internal bilingual timeline note.

    Args:
        observation_id: UUID of the Observation

    Returns:
        dict: On success ``status`` "success" with observation_id, severity,
        category and suggested_actions_count. On failure ``status`` "error"
        with a ``reason`` (AI service error, observation not found, or any
        unexpected exception).
    """
    from apps.observations.models import Observation, ObservationCategory, ObservationNote
    from apps.core.ai_service import AIService, AIServiceError

    try:
        observation = Observation.objects.select_related("hospital").get(id=observation_id)
        logger.info(f"Starting AI analysis for observation {observation_id}")

        try:
            hospital = observation.hospital
            analysis = AIService.analyze_observation(
                title=observation.title or None,
                description=observation.description,
                hospital_id=hospital.id if hospital else None,
            )
            emotion_analysis = AIService.analyze_emotion(text=observation.description)

            # Remember the pre-analysis values so they can be kept in metadata.
            previous_severity = observation.severity
            previous_category = observation.category

            # Severity always follows the AI result.
            observation.severity = analysis["severity"]

            # Category: exact (case-insensitive) name match first, then substring match.
            ai_category_name = analysis.get("category", "")
            if ai_category_name:
                active_categories = ObservationCategory.objects.filter(is_active=True)
                matched = (
                    active_categories.filter(name_en__iexact=ai_category_name).first()
                    or active_categories.filter(name_en__icontains=ai_category_name).first()
                )
                if matched:
                    observation.category = matched
                    logger.info(f"AI matched observation category: {matched.name_en}")
                else:
                    logger.warning(f"AI suggested category '{ai_category_name}' but no match found")

            # Title: prefer the English title, fall back to the generic one.
            ai_title = analysis.get("title_en") or analysis.get("title")
            if ai_title:
                observation.title = ai_title

            # Persist the complete analysis in metadata.
            if not observation.metadata:
                observation.metadata = {}
            previous_category_name = previous_category.name_en if previous_category else None

            # (key, default) pairs copied verbatim from the AI analysis, in storage order.
            copied_fields = (
                ("title_en", ""),
                ("title_ar", ""),
                ("brief_summary_en", ""),
                ("brief_summary_ar", ""),
                ("short_description_en", ""),
                ("short_description_ar", ""),
                ("suggested_actions", []),
                ("suggested_action_en", ""),
                ("suggested_action_ar", ""),
                ("severity", ""),
                ("category", ""),
                ("department", ""),
                ("reasoning_en", ""),
                ("reasoning_ar", ""),
            )
            ai_record = {key: analysis.get(key, default) for key, default in copied_fields}
            ai_record.update(
                {
                    "emotion": emotion_analysis.get("emotion", "neutral"),
                    "emotion_intensity": emotion_analysis.get("intensity", 0.0),
                    "emotion_confidence": emotion_analysis.get("confidence", 0.0),
                    "analyzed_at": timezone.now().isoformat(),
                    "old_severity": previous_severity,
                    "old_category": previous_category_name,
                }
            )
            observation.metadata["ai_analysis"] = ai_record

            # Link taxonomy levels the AI managed to map.
            taxonomy_mapping = analysis.get("taxonomy_mapping", {})
            if taxonomy_mapping:
                taxonomy_attrs = {
                    "domain": "taxonomy_domain_id",
                    "category": "taxonomy_category_id",
                    "subcategory": "taxonomy_subcategory_id",
                    "classification": "taxonomy_classification_id",
                }
                for level, attr_name in taxonomy_attrs.items():
                    node = taxonomy_mapping.get(level)
                    if node:
                        setattr(observation, attr_name, node.get("id"))
                observation.metadata["taxonomy"] = analysis.get("taxonomy")

            observation.save(
                update_fields=[
                    "severity",
                    "category",
                    "title",
                    "metadata",
                    "taxonomy_domain",
                    "taxonomy_category",
                    "taxonomy_subcategory",
                    "taxonomy_classification",
                ]
            )

            # Timeline note announcing completion in English and Arabic.
            severity_display = analysis["severity"]
            emotion_display = emotion_analysis.get("emotion", "neutral")
            if observation.category:
                category_display = observation.category.name_en
            else:
                category_display = analysis.get("category", "N/A")
            message_en = (
                f"AI analysis complete: Severity={severity_display}, "
                f"Category={category_display}, "
                f"Emotion={emotion_display}"
            )
            message_ar = (
                f"اكتمل تحليل الذكاء الاصطناعي: الشدة={severity_display}, "
                f"الفئة={category_display}, "
                f"العاطفة={emotion_display}"
            )
            ObservationNote.objects.create(
                observation=observation,
                note=f"{message_en}\n{message_ar}",
                is_internal=True,
                created_by=None,
            )

            logger.info(f"AI analysis completed for observation {observation_id}")
            return {
                "status": "success",
                "observation_id": str(observation_id),
                "severity": analysis.get("severity"),
                "category": analysis.get("category"),
                "suggested_actions_count": len(analysis.get("suggested_actions", [])),
            }
        except AIServiceError as e:
            # Record the failure on the observation so it is visible alongside the record.
            logger.error(f"AI analysis failed for observation {observation_id}: {e}")
            if not observation.metadata:
                observation.metadata = {}
            observation.metadata["ai_analysis"] = {
                "error": str(e),
                "analyzed_at": timezone.now().isoformat(),
                "reasoning_en": f"AI service unavailable: {str(e)}",
                "reasoning_ar": f"خدمة الذكاء الاصطناعي غير متوفرة: {str(e)}",
            }
            observation.save(update_fields=["metadata"])
            return {"status": "error", "reason": f"AI service error: {str(e)}"}
    except Observation.DoesNotExist:
        logger.error(f"Observation {observation_id} not found for AI analysis")
        return {"status": "error", "reason": "Observation not found"}
    except Exception as e:
        logger.error(f"Unexpected error analyzing observation {observation_id}: {e}", exc_info=True)
        return {"status": "error", "reason": str(e)}
@shared_task
def check_overdue_observation_dept_responses():
    """
    Periodic task to check for overdue department responses on observations.

    Runs every 15 minutes. For each observation whose department response
    deadline has passed without a response:

    1. The ``dept_response_is_overdue`` flag is set (once).
    2. If the SLA config enables auto-escalation, the configured extra delay
       past the deadline has elapsed, no escalation happened yet and at least
       one reminder was already sent, the case is escalated: the department
       manager is emailed (when one with an email exists), the escalation
       time is stored, and a status log entry plus an internal note are added.

    Returns:
        dict: ``{"overdue_count": int, "escalated_count": int}``
    """
    from datetime import timedelta

    from apps.observations.models import Observation, ObservationNote, ObservationStatusLog
    from apps.notifications.services import NotificationService

    now = timezone.now()
    pending_observations = Observation.objects.filter(
        dept_response_sla_due_at__isnull=False,
        dept_response_sla_due_at__lte=now,
        department_responded_at__isnull=True,
    ).select_related("hospital", "assigned_department")

    overdue_count = 0
    escalated_count = 0
    for observation in pending_observations:
        # Step 1: flag the observation the first time it is seen past its deadline.
        if not observation.dept_response_is_overdue:
            observation.dept_response_is_overdue = True
            observation.save(update_fields=["dept_response_is_overdue"])
            overdue_count += 1
            logger.warning(f"Observation {observation.tracking_code} dept response is overdue")

        # Step 2: escalation, guarded by a series of preconditions.
        dept = observation.assigned_department
        if not dept:
            continue

        sla_config = observation.get_sla_config()
        if not sla_config or not sla_config.dept_response_auto_escalate_enabled:
            continue

        # Optional grace period after the deadline before escalating.
        escalation_delay = sla_config.dept_response_escalation_hours_overdue
        if escalation_delay > 0:
            escalation_threshold = observation.dept_response_sla_due_at + timedelta(
                hours=escalation_delay
            )
            if now < escalation_threshold:
                continue

        # Escalate at most once per observation.
        if observation.dept_response_escalated_at:
            continue

        # Only escalate after the department has received at least one reminder.
        if not (
            observation.dept_response_reminder_sent_at
            or observation.dept_response_second_reminder_sent_at
        ):
            continue

        if dept.manager and dept.manager.email:
            try:
                NotificationService.send_email(
                    email=dept.manager.email,
                    subject=f"ESCALATION: Observation {observation.tracking_code} - Department Response Overdue",
                    message=(
                        f"The department response for observation {observation.tracking_code} "
                        f"is overdue. The response deadline was "
                        f"{observation.dept_response_sla_due_at.strftime('%Y-%m-%d %H:%M')}. "
                        f"Please ensure the department submits a response immediately."
                    ),
                    related_object=observation,
                )
            except Exception as e:
                # Best effort: the escalation is still recorded below.
                logger.error(f"Failed to send escalation email: {e}", exc_info=True)

        observation.dept_response_escalated_at = now
        observation.save(update_fields=["dept_response_escalated_at"])
        escalated_count += 1

        # The same text is recorded in the status log and as an internal note.
        escalation_text = f"Department response SLA escalated to {dept.get_localized_name()} manager"
        ObservationStatusLog.objects.create(
            observation=observation,
            from_status="",
            to_status=observation.status,
            changed_by=None,
            comment=escalation_text,
        )
        ObservationNote.objects.create(
            observation=observation,
            note=escalation_text,
            created_by=None,
            is_internal=True,
        )

    if overdue_count > 0 or escalated_count > 0:
        logger.info(
            f"Observation dept response: {overdue_count} overdue, {escalated_count} escalated"
        )
    return {"overdue_count": overdue_count, "escalated_count": escalated_count}
@shared_task
def send_observation_dept_response_reminders():
    """
    Remind department respondents about observation responses that are due soon.

    Runs every hour. Considers observations that have a department response
    deadline, no response yet and are not flagged overdue. While time remains
    before the deadline, the department respondent's user is emailed a first
    reminder once the remaining hours drop to the configured first threshold,
    and, on a later run and only if enabled, a second ("URGENT") reminder
    once they drop to the second threshold. The corresponding
    ``*_reminder_sent_at`` timestamp is stored after the send attempt, whether
    or not the email went through.

    Returns:
        dict: ``{"first_reminder_count": int, "second_reminder_count": int}``
    """
    from apps.observations.models import Observation, ObservationSLAConfig
    from apps.notifications.services import NotificationService

    now = timezone.now()
    awaiting_response = (
        Observation.objects
        .filter(
            dept_response_sla_due_at__isnull=False,
            department_responded_at__isnull=True,
            dept_response_is_overdue=False,
        )
        .select_related("hospital", "assigned_department")
    )

    first_sent = 0
    second_sent = 0
    for obs in awaiting_response:
        sla_config = obs.get_sla_config()
        if not sla_config:
            continue
        dept = obs.assigned_department
        if not dept:
            continue

        # The only recipient is the department respondent's user, if reachable by email.
        has_respondent_email = dept.respondent and dept.respondent.user and dept.respondent.user.email
        recipients = [dept.respondent.user] if has_respondent_email else []
        if not recipients:
            continue

        hours_remaining = (obs.dept_response_sla_due_at - now).total_seconds() / 3600
        hours_display = max(0, round(hours_remaining))

        if obs.dept_response_reminder_sent_at is None:
            # First reminder still pending; the second one cannot fire before it.
            due_soon = (
                hours_remaining <= sla_config.dept_response_reminder_hours_before
                and hours_remaining > 0
            )
            if not due_soon:
                continue
            subject = f"Reminder: Observation {obs.tracking_code} - Response Required"
            message = (
                f"This is a reminder that observation {obs.tracking_code} "
                f"is awaiting your department's response. "
                f"Time remaining: {hours_display} hours. "
                f"Please submit your response before the deadline."
            )
            for recipient in recipients:
                try:
                    NotificationService.send_email(
                        email=recipient.email,
                        subject=subject,
                        message=message,
                        related_object=obs,
                    )
                except Exception as e:
                    logger.error(f"Failed to send observation dept reminder: {e}")
            obs.dept_response_reminder_sent_at = now
            obs.save(update_fields=["dept_response_reminder_sent_at"])
            first_sent += 1
        elif (
            sla_config.dept_response_second_reminder_enabled
            and obs.dept_response_second_reminder_sent_at is None
            and hours_remaining <= sla_config.dept_response_second_reminder_hours_before
            and hours_remaining > 0
        ):
            subject = f"URGENT: Observation {obs.tracking_code} - Response Overdue Soon"
            message = (
                f"URGENT: Observation {obs.tracking_code} "
                f"response deadline is approaching. "
                f"Time remaining: {hours_display} hours. "
                f"Please submit your response immediately."
            )
            for recipient in recipients:
                try:
                    NotificationService.send_email(
                        email=recipient.email,
                        subject=subject,
                        message=message,
                        related_object=obs,
                    )
                except Exception as e:
                    logger.error(f"Failed to send observation dept 2nd reminder: {e}")
            obs.dept_response_second_reminder_sent_at = now
            obs.save(update_fields=["dept_response_second_reminder_sent_at"])
            second_sent += 1

    logger.info(
        f"Observation dept response reminders: {first_sent} first, {second_sent} second"
    )
    return {
        "first_reminder_count": first_sent,
        "second_reminder_count": second_sent,
    }