HH/apps/complaints/tasks.py

1080 lines
44 KiB
Python

"""
Complaints Celery tasks
This module contains tasks for:
- Checking overdue complaints
- Sending SLA reminders
- Triggering resolution satisfaction surveys
- Creating PX actions from complaints
- AI-powered complaint analysis
"""
import logging
from typing import Optional, Dict, Any, Tuple
from celery import shared_task
from django.db import transaction
from django.db.models import Q
from django.utils import timezone
logger = logging.getLogger(__name__)
def match_staff_from_name(staff_name: str, hospital_id: str, department_name: Optional[str] = None, return_all: bool = False) -> Tuple[list, float, str]:
"""
Match staff member from extracted name using multiple matching strategies.
Args:
staff_name: Name extracted from complaint (without titles)
hospital_id: Hospital ID to search within
department_name: Optional department name to prioritize matching
return_all: If True, return all matching staff. If False, return single best match.
Returns:
If return_all=True: Tuple of (matches_list, confidence_score, matching_method)
- matches_list: List of dicts with matched staff details
- confidence_score: Float from 0.0 to 1.0 (best match confidence)
- matching_method: Description of how staff was matched
If return_all=False: Tuple of (staff_id, confidence_score, matching_method)
- staff_id: UUID of matched staff or None
- confidence_score: Float from 0.0 to 1.0
- matching_method: Description of how staff was matched
"""
from apps.organizations.models import Staff, Department
if not staff_name or not staff_name.strip():
return [], 0.0, "No staff name provided"
staff_name = staff_name.strip()
matches = []
# Build base query - staff from this hospital, active status
base_query = Staff.objects.filter(
hospital_id=hospital_id,
status='active'
)
# If department is specified, prioritize staff in that department
dept_id = None
if department_name:
department = Department.objects.filter(
hospital_id=hospital_id,
name__iexact=department_name,
status='active'
).first()
if department:
dept_id = department.id
# Layer 1: Exact English match (full name)
# Try "first_name last_name" format
words = staff_name.split()
if len(words) >= 2:
first_name = words[0]
last_name = ' '.join(words[1:])
# Exact match in correct department
exact_query = base_query.filter(
Q(first_name__iexact=first_name) & Q(last_name__iexact=last_name)
)
if dept_id:
exact_query = exact_query.filter(department_id=dept_id)
exact_matches = list(exact_query)
if exact_matches:
confidence = 0.95 if dept_id else 0.90
method = f"Exact English match in {'correct' if dept_id else 'any'} department"
for staff in exact_matches:
matches.append({
'id': str(staff.id),
'name_en': f"{staff.first_name} {staff.last_name}",
'name_ar': f"{staff.first_name_ar} {staff.last_name_ar}" if staff.first_name_ar and staff.last_name_ar else "",
'job_title': staff.job_title,
'specialization': staff.specialization,
'department': staff.department.name if staff.department else None,
'department_id': str(staff.department.id) if staff.department else None,
'confidence': confidence,
'matching_method': method
})
logger.info(f"Found {len(exact_matches)} exact English matches for: {staff_name}")
# Layer 2: Exact Arabic match
arabic_query = base_query.filter(
Q(first_name_ar__iexact=staff_name) | Q(last_name_ar__iexact=staff_name)
)
if dept_id:
arabic_query = arabic_query.filter(department_id=dept_id)
# Try full Arabic name match
for staff in arabic_query:
full_arabic_name = f"{staff.first_name_ar} {staff.last_name_ar}".strip()
if full_arabic_name == staff_name:
confidence = 0.95 if dept_id else 0.90
method = f"Exact Arabic match in {'correct' if dept_id else 'any'} department"
# Check if already in matches
if not any(m['id'] == str(staff.id) for m in matches):
matches.append({
'id': str(staff.id),
'name_en': f"{staff.first_name} {staff.last_name}",
'name_ar': f"{staff.first_name_ar} {staff.last_name_ar}",
'job_title': staff.job_title,
'specialization': staff.specialization,
'department': staff.department.name if staff.department else None,
'department_id': str(staff.department.id) if staff.department else None,
'confidence': confidence,
'matching_method': method
})
logger.info(f"Found Arabic match: {staff.first_name_ar} {staff.last_name_ar}")
# Layer 3: Partial match (first name or last name)
partial_query = base_query.filter(
Q(first_name__icontains=staff_name) |
Q(last_name__icontains=staff_name) |
Q(first_name_ar__icontains=staff_name) |
Q(last_name_ar__icontains=staff_name)
)
if dept_id:
partial_query = partial_query.filter(department_id=dept_id)
partial_matches = list(partial_query)
for staff in partial_matches:
# Check if already in matches
if not any(m['id'] == str(staff.id) for m in matches):
confidence = 0.70 if dept_id else 0.60
method = f"Partial match in {'correct' if dept_id else 'any'} department"
matches.append({
'id': str(staff.id),
'name_en': f"{staff.first_name} {staff.last_name}",
'name_ar': f"{staff.first_name_ar} {staff.last_name_ar}" if staff.first_name_ar and staff.last_name_ar else "",
'job_title': staff.job_title,
'specialization': staff.specialization,
'department': staff.department.name if staff.department else None,
'department_id': str(staff.department.id) if staff.department else None,
'confidence': confidence,
'matching_method': method
})
if partial_matches:
logger.info(f"Found {len(partial_matches)} partial matches for: {staff_name}")
# Layer 4: Fuzzy match using individual words
# Handle cases like "Dr. Ahmed" or "Nurse Sarah"
word_query = base_query.filter(
Q(first_name__in=words) | Q(first_name_ar__in=words)
)
if dept_id:
word_query = word_query.filter(department_id=dept_id)
word_matches = list(word_query)
for staff in word_matches:
# Check if already in matches
if not any(m['id'] == str(staff.id) for m in matches):
confidence = 0.50 if dept_id else 0.45
method = f"Word match in {'correct' if dept_id else 'any'} department"
matches.append({
'id': str(staff.id),
'name_en': f"{staff.first_name} {staff.last_name}",
'name_ar': f"{staff.first_name_ar} {staff.last_name_ar}" if staff.first_name_ar and staff.last_name_ar else "",
'job_title': staff.job_title,
'specialization': staff.specialization,
'department': staff.department.name if staff.department else None,
'department_id': str(staff.department.id) if staff.department else None,
'confidence': confidence,
'matching_method': method
})
if word_matches:
logger.info(f"Found {len(word_matches)} word matches for: {staff_name}")
# If return_all is False, return only the best match (highest confidence)
if not return_all:
if matches:
# Sort by confidence (descending)
matches.sort(key=lambda x: x['confidence'], reverse=True)
best_match = matches[0]
logger.info(
f"Best match: {best_match['name_en']} "
f"(confidence: {best_match['confidence']:.2f}, method: {best_match['matching_method']})"
)
return str(best_match['id']), best_match['confidence'], best_match['matching_method']
else:
logger.warning(f"No staff match found for name: {staff_name}")
return None, 0.0, "No match found"
# Return all matches
if matches:
# Sort by confidence (descending)
matches.sort(key=lambda x: x['confidence'], reverse=True)
best_confidence = matches[0]['confidence']
best_method = matches[0]['matching_method']
logger.info(f"Returning {len(matches)} matches for: {staff_name}")
return matches, best_confidence, best_method
else:
logger.warning(f"No staff match found for name: {staff_name}")
return [], 0.0, "No match found"
@shared_task
def check_overdue_complaints():
"""
Periodic task to check for overdue complaints.
Runs every 15 minutes (configured in config/celery.py).
Updates is_overdue flag for complaints past their SLA deadline.
Triggers automatic escalation based on escalation rules.
"""
from apps.complaints.models import Complaint, ComplaintStatus
# Get active complaints (not closed or cancelled)
active_complaints = Complaint.objects.filter(
status__in=[ComplaintStatus.OPEN, ComplaintStatus.IN_PROGRESS, ComplaintStatus.RESOLVED]
).select_related('hospital', 'patient', 'department')
overdue_count = 0
escalated_count = 0
for complaint in active_complaints:
if complaint.check_overdue():
overdue_count += 1
logger.warning(
f"Complaint {complaint.id} is overdue: {complaint.title} "
f"(due: {complaint.due_at})"
)
# Trigger automatic escalation
result = escalate_complaint_auto.delay(str(complaint.id))
if result:
escalated_count += 1
if overdue_count > 0:
logger.info(f"Found {overdue_count} overdue complaints, triggered {escalated_count} escalations")
return {
'overdue_count': overdue_count,
'escalated_count': escalated_count
}
@shared_task
def send_complaint_resolution_survey(complaint_id):
"""
Send resolution satisfaction survey when complaint is closed.
This task is triggered when a complaint status changes to CLOSED.
Args:
complaint_id: UUID of the Complaint
Returns:
dict: Result with survey_instance_id
"""
from apps.complaints.models import Complaint
from apps.core.services import create_audit_log
from apps.surveys.models import SurveyInstance, SurveyTemplate
try:
complaint = Complaint.objects.select_related(
'patient', 'hospital'
).get(id=complaint_id)
# Check if survey already sent
if complaint.resolution_survey:
logger.info(f"Resolution survey already sent for complaint {complaint_id}")
return {'status': 'skipped', 'reason': 'already_sent'}
# Get resolution satisfaction survey template
try:
survey_template = SurveyTemplate.objects.get(
hospital=complaint.hospital,
survey_type='complaint_resolution',
is_active=True
)
except SurveyTemplate.DoesNotExist:
logger.warning(
f"No resolution satisfaction survey template found for hospital {complaint.hospital.name}"
)
return {'status': 'skipped', 'reason': 'no_template'}
# Create survey instance
with transaction.atomic():
survey_instance = SurveyInstance.objects.create(
survey_template=survey_template,
patient=complaint.patient,
encounter_id=complaint.encounter_id,
delivery_channel='sms', # Default
recipient_phone=complaint.patient.phone,
recipient_email=complaint.patient.email,
metadata={
'complaint_id': str(complaint.id),
'complaint_title': complaint.title
}
)
# Link survey to complaint
complaint.resolution_survey = survey_instance
complaint.resolution_survey_sent_at = timezone.now()
complaint.save(update_fields=['resolution_survey', 'resolution_survey_sent_at'])
# Send survey
from apps.notifications.services import NotificationService
notification_log = NotificationService.send_survey_invitation(
survey_instance=survey_instance,
language='en' # TODO: Get from patient preference
)
# Update survey status
survey_instance.status = 'active'
survey_instance.sent_at = timezone.now()
survey_instance.save(update_fields=['status', 'sent_at'])
# Log audit event
create_audit_log(
event_type='survey_sent',
description=f"Resolution satisfaction survey sent for complaint: {complaint.title}",
content_object=survey_instance,
metadata={
'complaint_id': str(complaint.id),
'survey_template': survey_template.name
}
)
logger.info(
f"Resolution satisfaction survey sent for complaint {complaint.id}"
)
return {
'status': 'sent',
'survey_instance_id': str(survey_instance.id),
'notification_log_id': str(notification_log.id)
}
except Complaint.DoesNotExist:
error_msg = f"Complaint {complaint_id} not found"
logger.error(error_msg)
return {'status': 'error', 'reason': error_msg}
except Exception as e:
error_msg = f"Error sending resolution survey: {str(e)}"
logger.error(error_msg, exc_info=True)
return {'status': 'error', 'reason': error_msg}
@shared_task
def check_resolution_survey_threshold(survey_instance_id, complaint_id):
"""
Check if resolution survey score breaches threshold and create PX Action if needed.
This task is triggered when a complaint resolution survey is completed.
Args:
survey_instance_id: UUID of the SurveyInstance
complaint_id: UUID of the Complaint
Returns:
dict: Result with action status
"""
from apps.complaints.models import Complaint, ComplaintThreshold
from apps.surveys.models import SurveyInstance
from apps.px_action_center.models import PXAction
from django.contrib.contenttypes.models import ContentType
try:
survey = SurveyInstance.objects.get(id=survey_instance_id)
complaint = Complaint.objects.select_related('hospital', 'patient').get(id=complaint_id)
# Get threshold for this hospital
try:
threshold = ComplaintThreshold.objects.get(
hospital=complaint.hospital,
threshold_type='resolution_survey_score',
is_active=True
)
except ComplaintThreshold.DoesNotExist:
logger.info(f"No resolution survey threshold configured for hospital {complaint.hospital.name_en}")
return {'status': 'no_threshold'}
# Check if threshold is breached
if threshold.check_threshold(survey.score):
logger.warning(
f"Resolution survey score {survey.score} breaches threshold {threshold.threshold_value} "
f"for complaint {complaint_id}"
)
# Create PX Action
complaint_ct = ContentType.objects.get_for_model(Complaint)
action = PXAction.objects.create(
title=f"Low Resolution Satisfaction: {complaint.title[:100]}",
description=(
f"Complaint resolution survey scored {survey.score}% "
f"(threshold: {threshold.threshold_value}%). "
f"Original complaint: {complaint.description[:200]}"
),
source='complaint_resolution_survey',
priority='high' if survey.score < 30 else 'medium',
hospital=complaint.hospital,
department=complaint.department,
patient=complaint.patient,
content_type=complaint_ct,
object_id=complaint.id,
metadata={
'complaint_id': str(complaint.id),
'survey_id': str(survey.id),
'survey_score': survey.score,
'threshold_value': threshold.threshold_value,
}
)
# Log audit
from apps.core.services import create_audit_log
create_audit_log(
event_type='px_action_created',
description=f"PX Action created from low resolution survey score",
content_object=action,
metadata={
'complaint_id': str(complaint.id),
'survey_score': survey.score,
'trigger': 'resolution_survey_threshold'
}
)
logger.info(f"Created PX Action {action.id} from low resolution survey score")
return {
'status': 'action_created',
'action_id': str(action.id),
'survey_score': survey.score,
'threshold': threshold.threshold_value
}
else:
logger.info(f"Resolution survey score {survey.score} is above threshold {threshold.threshold_value}")
return {'status': 'threshold_not_breached', 'survey_score': survey.score}
except SurveyInstance.DoesNotExist:
error_msg = f"SurveyInstance {survey_instance_id} not found"
logger.error(error_msg)
return {'status': 'error', 'reason': error_msg}
except Complaint.DoesNotExist:
error_msg = f"Complaint {complaint_id} not found"
logger.error(error_msg)
return {'status': 'error', 'reason': error_msg}
except Exception as e:
error_msg = f"Error checking resolution survey threshold: {str(e)}"
logger.error(error_msg, exc_info=True)
return {'status': 'error', 'reason': error_msg}
@shared_task
def create_action_from_complaint(complaint_id):
"""
Create PX Action from complaint (if configured).
This task is triggered when a complaint is created,
if the hospital configuration requires automatic action creation.
Args:
complaint_id: UUID of the Complaint
Returns:
dict: Result with action_id
"""
from apps.complaints.models import Complaint
from apps.organizations.models import Hospital
from apps.px_action_center.models import PXAction
from django.contrib.contenttypes.models import ContentType
try:
complaint = Complaint.objects.select_related('hospital', 'patient', 'department').get(id=complaint_id)
# Check if hospital has auto-create enabled
# For now, we'll check metadata on hospital or use a simple rule
# In production, you'd have a HospitalComplaintConfig model
# Handle case where metadata field might not exist (legacy data)
hospital_metadata = getattr(complaint.hospital, 'metadata', None)
if hospital_metadata is None:
hospital_metadata = {}
auto_create = hospital_metadata.get('auto_create_action_on_complaint', False)
if not auto_create:
logger.info(f"Auto-create PX Action disabled for hospital {complaint.hospital.name}")
return {'status': 'disabled'}
# Use JSON-serializable values instead of model objects
category_name = complaint.category.name_en if complaint.category else None
category_id = str(complaint.category.id) if complaint.category else None
# Create PX Action
complaint_ct = ContentType.objects.get_for_model(Complaint)
action = PXAction.objects.create(
title=f"New Complaint: {complaint.title[:100]}",
description=complaint.description[:500],
source='complaint',
priority=complaint.priority,
hospital=complaint.hospital,
department=complaint.department,
patient=complaint.patient,
content_type=complaint_ct,
object_id=complaint.id,
metadata={
'complaint_id': str(complaint.id),
'complaint_category': category_name,
'complaint_category_id': category_id,
'complaint_severity': complaint.severity,
}
)
# Log audit
from apps.core.services import create_audit_log
create_audit_log(
event_type='px_action_created',
description=f"PX Action created from complaint",
content_object=action,
metadata={
'complaint_id': str(complaint.id),
'trigger': 'complaint_creation'
}
)
logger.info(f"Created PX Action {action.id} from complaint {complaint_id}")
return {
'status': 'action_created',
'action_id': str(action.id)
}
except Complaint.DoesNotExist:
error_msg = f"Complaint {complaint_id} not found"
logger.error(error_msg)
return {'status': 'error', 'reason': error_msg}
except Exception as e:
error_msg = f"Error creating action from complaint: {str(e)}"
logger.error(error_msg, exc_info=True)
return {'status': 'error', 'reason': error_msg}
@shared_task
def escalate_complaint_auto(complaint_id):
"""
Automatically escalate complaint based on escalation rules.
This task is triggered when a complaint becomes overdue.
It finds matching escalation rules and reassigns the complaint.
Args:
complaint_id: UUID of the Complaint
Returns:
dict: Result with escalation status
"""
from apps.complaints.models import Complaint, ComplaintUpdate, EscalationRule
from apps.accounts.models import User
try:
complaint = Complaint.objects.select_related(
'hospital', 'department', 'assigned_to'
).get(id=complaint_id)
# Calculate hours overdue
hours_overdue = (timezone.now() - complaint.due_at).total_seconds() / 3600
# Get applicable escalation rules for this hospital
rules = EscalationRule.objects.filter(
hospital=complaint.hospital,
is_active=True,
trigger_on_overdue=True
).order_by('order')
# Filter rules by severity and priority if specified
if complaint.severity:
rules = rules.filter(
Q(severity_filter='') | Q(severity_filter=complaint.severity)
)
if complaint.priority:
rules = rules.filter(
Q(priority_filter='') | Q(priority_filter=complaint.priority)
)
# Find first matching rule based on hours overdue
matching_rule = None
for rule in rules:
if hours_overdue >= rule.trigger_hours_overdue:
matching_rule = rule
break
if not matching_rule:
logger.info(f"No matching escalation rule found for complaint {complaint_id}")
return {'status': 'no_matching_rule'}
# Determine escalation target
escalation_target = None
if matching_rule.escalate_to_role == 'department_manager':
if complaint.department and complaint.department.manager:
escalation_target = complaint.department.manager
elif matching_rule.escalate_to_role == 'hospital_admin':
# Find hospital admin for this hospital
escalation_target = User.objects.filter(
hospital=complaint.hospital,
groups__name='Hospital Admin',
is_active=True
).first()
elif matching_rule.escalate_to_role == 'px_admin':
# Find PX admin
escalation_target = User.objects.filter(
groups__name='PX Admin',
is_active=True
).first()
elif matching_rule.escalate_to_role == 'specific_user':
escalation_target = matching_rule.escalate_to_user
if not escalation_target:
logger.warning(
f"Could not find escalation target for rule {matching_rule.name} "
f"on complaint {complaint_id}"
)
return {'status': 'no_target_found', 'rule': matching_rule.name}
# Perform escalation
old_assignee = complaint.assigned_to
complaint.assigned_to = escalation_target
complaint.escalated_at = timezone.now()
complaint.save(update_fields=['assigned_to', 'escalated_at'])
# Create update
ComplaintUpdate.objects.create(
complaint=complaint,
update_type='escalation',
message=(
f"Automatically escalated to {escalation_target.get_full_name()} "
f"(Rule: {matching_rule.name}). "
f"Complaint is {hours_overdue:.1f} hours overdue."
),
created_by=None, # System action
metadata={
'rule_id': str(matching_rule.id),
'rule_name': matching_rule.name,
'hours_overdue': hours_overdue,
'old_assignee_id': str(old_assignee.id) if old_assignee else None,
'new_assignee_id': str(escalation_target.id)
}
)
# Send notifications
send_complaint_notification.delay(
complaint_id=str(complaint.id),
event_type='escalated'
)
# Log audit
from apps.core.services import create_audit_log
create_audit_log(
event_type='complaint_escalated',
description=f"Complaint automatically escalated to {escalation_target.get_full_name()}",
content_object=complaint,
metadata={
'rule': matching_rule.name,
'hours_overdue': hours_overdue,
'escalated_to': escalation_target.get_full_name()
}
)
logger.info(
f"Escalated complaint {complaint_id} to {escalation_target.get_full_name()} "
f"using rule '{matching_rule.name}'"
)
return {
'status': 'escalated',
'rule': matching_rule.name,
'escalated_to': escalation_target.get_full_name(),
'hours_overdue': round(hours_overdue, 2)
}
except Complaint.DoesNotExist:
error_msg = f"Complaint {complaint_id} not found"
logger.error(error_msg)
return {'status': 'error', 'reason': error_msg}
except Exception as e:
error_msg = f"Error escalating complaint: {str(e)}"
logger.error(error_msg, exc_info=True)
return {'status': 'error', 'reason': error_msg}
@shared_task
def analyze_complaint_with_ai(complaint_id):
"""
Analyze a complaint using AI to determine severity and priority and category.
This task is triggered when a complaint is created.
It uses the AI service to analyze the complaint content and classify it.
Args:
complaint_id: UUID of the Complaint
Returns:
dict: Result with severity, priority, category, and reasoning
"""
from apps.complaints.models import Complaint
from apps.core.ai_service import AIService, AIServiceError
try:
complaint = Complaint.objects.select_related('hospital').get(id=complaint_id)
logger.info(f"Starting AI analysis for complaint {complaint_id}")
# Get category name if category exists
category_name = None
if complaint.category:
category_name = complaint.category.name_en
# Analyze complaint using AI service
try:
analysis = AIService.analyze_complaint(
title=complaint.title,
description=complaint.description,
category=category_name,
hospital_id=complaint.hospital.id
)
# Analyze emotion using AI service
emotion_analysis = AIService.analyze_emotion(
text=complaint.description
)
# Update complaint with AI-determined values
old_severity = complaint.severity
old_priority = complaint.priority
old_category = complaint.category
old_department = complaint.department
complaint.severity = analysis['severity']
complaint.priority = analysis['priority']
from apps.complaints.models import ComplaintCategory
if category := ComplaintCategory.objects.filter(name_en=analysis['category']).first():
complaint.category = category
# Update department from AI analysis
department_name = analysis.get('department', '')
if department_name:
from apps.organizations.models import Department
# Try exact match first (case-insensitive)
if department := Department.objects.filter(
hospital_id=complaint.hospital.id,
name__iexact=department_name,
status='active'
).first():
complaint.department = department
logger.info(f"Matched department exactly: {department.name}")
# If no exact match, try partial match
elif department := Department.objects.filter(
hospital_id=complaint.hospital.id,
name__icontains=department_name,
status='active'
).first():
complaint.department = department
logger.info(f"Matched department partially: {department.name} from '{department_name}'")
else:
logger.warning(f"AI suggested department '{department_name}' but no match found in hospital '{complaint.hospital.name}'")
# Update title from AI analysis (use English version)
if analysis.get('title_en'):
complaint.title = analysis['title_en']
elif analysis.get('title'):
complaint.title = analysis['title']
# Get staff_name from analyze_complaint result (already extracted by AI)
staff_name = analysis.get('staff_name', '').strip()
# Always get ALL matching staff for PX Admin review
staff_matches = []
staff_confidence = 0.0
staff_matching_method = None
matched_staff_id = None
# Capture old staff before matching
old_staff = complaint.staff
if staff_name:
logger.info(f"AI extracted staff name: {staff_name}")
# Try matching WITH department filter first (higher confidence if match found)
staff_matches, staff_confidence, staff_matching_method = match_staff_from_name(
staff_name=staff_name,
hospital_id=str(complaint.hospital.id),
department_name=department_name,
return_all=True # Return ALL matches
)
# If no match found with department, try WITHOUT department filter
if not staff_matches:
logger.info(f"No match found with department filter '{department_name}', trying without department filter...")
staff_matches, staff_confidence, staff_matching_method = match_staff_from_name(
staff_name=staff_name,
hospital_id=str(complaint.hospital.id),
department_name=None, # Search all departments
return_all=True
)
# Logic for staff assignment
needs_staff_review = False
if staff_matches:
# If only ONE match, assign it (regardless of confidence for PX Admin review)
if len(staff_matches) == 1:
matched_staff_id = staff_matches[0]['id']
from apps.organizations.models import Staff
try:
staff = Staff.objects.get(id=matched_staff_id)
complaint.staff = staff
logger.info(
f"Assigned staff {staff.first_name} {staff.last_name} "
f"to complaint {complaint_id} "
f"(confidence: {staff_confidence:.2f}, method: {staff_matching_method})"
)
except Staff.DoesNotExist:
logger.warning(f"Staff {matched_staff_id} not found in database")
# Still mark for review if confidence is low
if staff_confidence < 0.6:
needs_staff_review = True
else:
# Multiple matches found - don't assign, mark for review
logger.info(
f"Multiple staff matches found ({len(staff_matches)}), "
f"marking for PX Admin review"
)
needs_staff_review = True
# Assign to department instead if available
if department_name:
# Department already set from AI analysis
pass
elif staff_matches[0].get('department_id'):
from apps.organizations.models import Department
try:
dept = Department.objects.get(id=staff_matches[0]['department_id'])
complaint.department = dept
logger.info(f"Assigned to department: {dept.name}")
except Department.DoesNotExist:
pass
else:
# No matches found
logger.warning(f"No staff match found for name: {staff_name}")
needs_staff_review = False # No review needed if no name found
# Save reasoning in metadata
# Use JSON-serializable values instead of model objects
old_category_name = old_category.name_en if old_category else None
old_category_id = str(old_category.id) if old_category else None
old_department_name = old_department.name if old_department else None
old_department_id = str(old_department.id) if old_department else None
old_staff_name = f"{old_staff.first_name} {old_staff.last_name}" if old_staff else None
old_staff_id = str(old_staff.id) if old_staff else None
# Initialize metadata if needed
if not complaint.metadata:
complaint.metadata = {}
# Update or create ai_analysis in metadata with bilingual support and emotion
complaint.metadata['ai_analysis'] = {
'title_en': analysis.get('title_en', ''),
'title_ar': analysis.get('title_ar', ''),
'short_description_en': analysis.get('short_description_en', ''),
'short_description_ar': analysis.get('short_description_ar', ''),
'suggested_action_en': analysis.get('suggested_action_en', ''),
'suggested_action_ar': analysis.get('suggested_action_ar', ''),
'reasoning_en': analysis.get('reasoning_en', ''),
'reasoning_ar': analysis.get('reasoning_ar', ''),
'emotion': emotion_analysis.get('emotion', 'neutral'),
'emotion_intensity': emotion_analysis.get('intensity', 0.0),
'emotion_confidence': emotion_analysis.get('confidence', 0.0),
'analyzed_at': timezone.now().isoformat(),
'old_severity': old_severity,
'old_priority': old_priority,
'old_category': old_category_name,
'old_category_id': old_category_id,
'old_department': old_department_name,
'old_department_id': old_department_id,
'old_staff': old_staff_name,
'old_staff_id': old_staff_id,
'extracted_staff_name': staff_name,
'staff_matches': staff_matches,
'matched_staff_id': matched_staff_id,
'staff_confidence': staff_confidence,
'staff_matching_method': staff_matching_method,
'needs_staff_review': needs_staff_review,
'staff_match_count': len(staff_matches)
}
complaint.save(update_fields=['severity', 'priority', 'category', 'department', 'staff', 'title', 'metadata'])
# Re-calculate SLA due date based on new severity
complaint.due_at = complaint.calculate_sla_due_date()
complaint.save(update_fields=['due_at'])
# Create timeline update for AI completion
from apps.complaints.models import ComplaintUpdate
# Build bilingual message
emotion_display = emotion_analysis.get('emotion', 'neutral')
emotion_intensity = emotion_analysis.get('intensity', 0.0)
# Build English message
message_en = f"AI analysis complete: Severity={analysis['severity']}, Priority={analysis['priority']}, Category={analysis.get('category', 'N/A')}, Department={department_name or 'N/A'}"
if matched_staff_id:
message_en += f", Staff={f'{complaint.staff.first_name} {complaint.staff.last_name}' if complaint.staff else 'N/A'} (confidence: {staff_confidence:.2f})"
message_en += f", Emotion={emotion_display} (Intensity: {emotion_intensity:.2f})"
# Build Arabic message
message_ar = f"اكتمل تحليل الذكاء الاصطناعي: الشدة={analysis['severity']}, الأولوية={analysis['priority']}, الفئة={analysis.get('category', 'N/A')}, القسم={department_name or 'N/A'}"
if matched_staff_id and complaint.staff:
staff_name_ar = complaint.staff.first_name_ar if complaint.staff.first_name_ar else complaint.staff.first_name
message_ar += f", الموظف={staff_name_ar} {complaint.staff.last_name_ar if complaint.staff.last_name_ar else complaint.staff.last_name} (الثقة: {staff_confidence:.2f})"
message_ar += f", العاطفة={emotion_display} (الشدة: {emotion_intensity:.2f})"
message = f"{message_en}\n\n{message_ar}"
ComplaintUpdate.objects.create(
complaint=complaint,
update_type='note',
message=message
)
logger.info(
f"AI analysis complete for complaint {complaint_id}: "
f"severity={old_severity}->{analysis['severity']}, "
f"priority={old_priority}->{analysis['priority']}, "
f"category={old_category_name}->{analysis['category']}, "
f"department={old_department_name}->{department_name}, "
f"title_en={analysis.get('title_en', '')}"
)
return {
'status': 'success',
'complaint_id': str(complaint_id),
'severity': analysis['severity'],
'priority': analysis['priority'],
'category': analysis['category'],
'department': department_name,
'title_en': analysis.get('title_en', ''),
'title_ar': analysis.get('title_ar', ''),
'short_description_en': analysis.get('short_description_en', ''),
'short_description_ar': analysis.get('short_description_ar', ''),
'suggested_action_en': analysis.get('suggested_action_en', ''),
'suggested_action_ar': analysis.get('suggested_action_ar', ''),
'reasoning_en': analysis.get('reasoning_en', ''),
'reasoning_ar': analysis.get('reasoning_ar', ''),
'emotion': emotion_analysis.get('emotion', 'neutral'),
'emotion_intensity': emotion_analysis.get('intensity', 0.0),
'emotion_confidence': emotion_analysis.get('confidence', 0.0),
'old_severity': old_severity,
'old_priority': old_priority
}
except AIServiceError as e:
logger.error(f"AI service error for complaint {complaint_id}: {str(e)}")
# Keep default values (medium/medium) and log the error
return {
'status': 'ai_error',
'complaint_id': str(complaint_id),
'reason': str(e)
}
except Complaint.DoesNotExist:
error_msg = f"Complaint {complaint_id} not found"
logger.error(error_msg)
return {'status': 'error', 'reason': error_msg}
except Exception as e:
error_msg = f"Error analyzing complaint {complaint_id} with AI: {str(e)}"
logger.error(error_msg, exc_info=True)
return {'status': 'error', 'reason': error_msg}
@shared_task
def send_complaint_notification(complaint_id, event_type):
"""
Send notification for complaint events.
Args:
complaint_id: UUID of the Complaint
event_type: Type of event (created, assigned, overdue, escalated, resolved, closed)
Returns:
dict: Result with notification status
"""
from apps.complaints.models import Complaint
from apps.notifications.services import NotificationService
try:
complaint = Complaint.objects.select_related(
'hospital', 'patient', 'assigned_to', 'department'
).get(id=complaint_id)
# Determine recipients based on event type
recipients = []
if event_type == 'created':
# Notify assigned user or department manager
if complaint.assigned_to:
recipients.append(complaint.assigned_to)
elif complaint.department and complaint.department.manager:
recipients.append(complaint.department.manager)
elif event_type == 'assigned':
# Notify the assignee
if complaint.assigned_to:
recipients.append(complaint.assigned_to)
elif event_type in ['overdue', 'escalated']:
# Notify assignee and their manager
if complaint.assigned_to:
recipients.append(complaint.assigned_to)
if complaint.department and complaint.department.manager:
recipients.append(complaint.department.manager)
elif event_type == 'resolved':
# Notify patient
recipients.append(complaint.patient)
elif event_type == 'closed':
# Notify patient
recipients.append(complaint.patient)
# Send notifications
notification_count = 0
for recipient in recipients:
try:
# Check if NotificationService has send_notification method
if hasattr(NotificationService, 'send_notification'):
NotificationService.send_notification(
recipient=recipient,
title=f"Complaint {event_type.title()}: {complaint.title[:50]}",
message=f"Complaint #{str(complaint.id)[:8]} has been {event_type}.",
notification_type='complaint',
related_object=complaint
)
notification_count += 1
else:
logger.warning(f"NotificationService.send_notification method not available")
except Exception as e:
logger.error(f"Failed to send notification to {recipient}: {str(e)}")
logger.info(f"Sent {notification_count} notifications for complaint {complaint_id} event: {event_type}")
return {
'status': 'sent',
'notification_count': notification_count,
'event_type': event_type
}
except Complaint.DoesNotExist:
error_msg = f"Complaint {complaint_id} not found"
logger.error(error_msg)
return {'status': 'error', 'reason': error_msg}
except Exception as e:
error_msg = f"Error sending complaint notification: {str(e)}"
logger.error(error_msg, exc_info=True)
return {'status': 'error', 'reason': error_msg}