HH/apps/complaints/analytics.py
Marwan Alwali 2179fbf39a update
2025-12-31 13:16:30 +03:00

319 lines
10 KiB
Python

"""
Complaints analytics service
Provides analytics and metrics for complaints dashboard integration.
"""
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from django.db.models import Count, Q, Avg, F, ExpressionWrapper, DurationField
from django.utils import timezone
from apps.complaints.models import Complaint, ComplaintStatus
class ComplaintAnalytics:
"""Service for complaint analytics and metrics"""
@staticmethod
def get_complaint_trends(hospital=None, date_range=30):
"""
Get complaint trends over time.
Args:
hospital: Optional hospital to filter by
date_range: Number of days to analyze (default 30)
Returns:
dict: Trend data with dates and counts
"""
end_date = timezone.now()
start_date = end_date - timedelta(days=date_range)
queryset = Complaint.objects.filter(
created_at__gte=start_date,
created_at__lte=end_date
)
if hospital:
queryset = queryset.filter(hospital=hospital)
# Group by date
trends = queryset.extra(
select={'date': 'DATE(created_at)'}
).values('date').annotate(
count=Count('id')
).order_by('date')
# Convert dates to strings
labels = []
data = []
for item in trends:
date_val = item['date']
# Handle both date objects and strings
if isinstance(date_val, str):
labels.append(date_val)
else:
labels.append(date_val.strftime('%Y-%m-%d'))
data.append(item['count'])
return {
'labels': labels,
'data': data,
'total': queryset.count()
}
@staticmethod
def get_sla_compliance(hospital=None, date_range=30):
"""
Calculate SLA compliance metrics.
Args:
hospital: Optional hospital to filter by
date_range: Number of days to analyze
Returns:
dict: SLA compliance data
"""
end_date = timezone.now()
start_date = end_date - timedelta(days=date_range)
queryset = Complaint.objects.filter(
created_at__gte=start_date,
created_at__lte=end_date
)
if hospital:
queryset = queryset.filter(hospital=hospital)
total = queryset.count()
overdue = queryset.filter(is_overdue=True).count()
on_time = total - overdue
compliance_rate = (on_time / total * 100) if total > 0 else 0
# Get trend data
daily_compliance = []
for i in range(date_range):
day = start_date + timedelta(days=i)
day_end = day + timedelta(days=1)
day_total = queryset.filter(
created_at__gte=day,
created_at__lt=day_end
).count()
day_overdue = queryset.filter(
created_at__gte=day,
created_at__lt=day_end,
is_overdue=True
).count()
day_compliance = ((day_total - day_overdue) / day_total * 100) if day_total > 0 else 100
daily_compliance.append({
'date': day.strftime('%Y-%m-%d'),
'compliance_rate': round(day_compliance, 2),
'total': day_total,
'overdue': day_overdue
})
return {
'overall_compliance_rate': round(compliance_rate, 2),
'total_complaints': total,
'on_time': on_time,
'overdue': overdue,
'daily_compliance': daily_compliance
}
@staticmethod
def get_resolution_rate(hospital=None, date_range=30):
"""
Calculate resolution rate metrics.
Args:
hospital: Optional hospital to filter by
date_range: Number of days to analyze
Returns:
dict: Resolution rate data
"""
end_date = timezone.now()
start_date = end_date - timedelta(days=date_range)
queryset = Complaint.objects.filter(
created_at__gte=start_date,
created_at__lte=end_date
)
if hospital:
queryset = queryset.filter(hospital=hospital)
total = queryset.count()
resolved = queryset.filter(
status__in=[ComplaintStatus.RESOLVED, ComplaintStatus.CLOSED]
).count()
resolution_rate = (resolved / total * 100) if total > 0 else 0
# Calculate average resolution time
resolved_complaints = queryset.filter(
status__in=[ComplaintStatus.RESOLVED, ComplaintStatus.CLOSED],
resolved_at__isnull=False
)
avg_resolution_time = None
if resolved_complaints.exists():
# Calculate time difference
time_diffs = []
for complaint in resolved_complaints:
if complaint.resolved_at:
diff = (complaint.resolved_at - complaint.created_at).total_seconds() / 3600 # hours
time_diffs.append(diff)
if time_diffs:
avg_resolution_time = sum(time_diffs) / len(time_diffs)
# Resolution by department
by_department = queryset.filter(
status__in=[ComplaintStatus.RESOLVED, ComplaintStatus.CLOSED],
department__isnull=False
).values(
'department__name'
).annotate(
count=Count('id')
).order_by('-count')[:10]
return {
'resolution_rate': round(resolution_rate, 2),
'total_complaints': total,
'resolved': resolved,
'pending': total - resolved,
'avg_resolution_time_hours': round(avg_resolution_time, 2) if avg_resolution_time else None,
'by_department': list(by_department)
}
@staticmethod
def get_top_categories(hospital=None, date_range=30, limit=10):
"""
Get top complaint categories.
Args:
hospital: Optional hospital to filter by
date_range: Number of days to analyze
limit: Number of top categories to return
Returns:
dict: Top categories data
"""
end_date = timezone.now()
start_date = end_date - timedelta(days=date_range)
queryset = Complaint.objects.filter(
created_at__gte=start_date,
created_at__lte=end_date
)
if hospital:
queryset = queryset.filter(hospital=hospital)
categories = queryset.values('category').annotate(
count=Count('id')
).order_by('-count')[:limit]
return {
'categories': [
{
'category': item['category'],
'count': item['count']
}
for item in categories
]
}
@staticmethod
def get_overdue_complaints(hospital=None, limit=10):
"""
Get list of overdue complaints.
Args:
hospital: Optional hospital to filter by
limit: Number of complaints to return
Returns:
QuerySet: Overdue complaints
"""
queryset = Complaint.objects.filter(
is_overdue=True,
status__in=[ComplaintStatus.OPEN, ComplaintStatus.IN_PROGRESS]
).select_related(
'patient', 'hospital', 'department', 'assigned_to'
).order_by('due_at')
if hospital:
queryset = queryset.filter(hospital=hospital)
return queryset[:limit]
@staticmethod
def get_dashboard_summary(hospital=None):
"""
Get comprehensive dashboard summary.
Args:
hospital: Optional hospital to filter by
Returns:
dict: Dashboard summary data
"""
queryset = Complaint.objects.all()
if hospital:
queryset = queryset.filter(hospital=hospital)
# Current status counts
status_counts = {
'total': queryset.count(),
'open': queryset.filter(status=ComplaintStatus.OPEN).count(),
'in_progress': queryset.filter(status=ComplaintStatus.IN_PROGRESS).count(),
'resolved': queryset.filter(status=ComplaintStatus.RESOLVED).count(),
'closed': queryset.filter(status=ComplaintStatus.CLOSED).count(),
'overdue': queryset.filter(is_overdue=True).count(),
}
# Severity breakdown
severity_counts = queryset.values('severity').annotate(
count=Count('id')
)
# Recent high severity
recent_high_severity = queryset.filter(
severity__in=['high', 'critical'],
created_at__gte=timezone.now() - timedelta(days=7)
).count()
# Trends (last 7 days vs previous 7 days)
last_7_days = queryset.filter(
created_at__gte=timezone.now() - timedelta(days=7)
).count()
previous_7_days = queryset.filter(
created_at__gte=timezone.now() - timedelta(days=14),
created_at__lt=timezone.now() - timedelta(days=7)
).count()
trend_percentage = 0
if previous_7_days > 0:
trend_percentage = ((last_7_days - previous_7_days) / previous_7_days) * 100
return {
'status_counts': status_counts,
'severity_counts': list(severity_counts),
'recent_high_severity': recent_high_severity,
'trend': {
'last_7_days': last_7_days,
'previous_7_days': previous_7_days,
'percentage_change': round(trend_percentage, 2)
}
}