HH/apps/analytics/kpi_service.py
2026-02-22 08:35:53 +03:00

620 lines
23 KiB
Python

"""
KPI Report Calculation Service
This service calculates KPI metrics for monthly reports based on
the complaint and survey data in the system.
"""
import logging
from datetime import datetime, timedelta
from decimal import Decimal
from typing import Dict, List, Optional
from django.db.models import Avg, Count, F, Q
from django.utils import timezone
from apps.complaints.models import Complaint, ComplaintStatus, ComplaintSource
from apps.organizations.models import Department
from apps.surveys.models import SurveyInstance, SurveyStatus, SurveyTemplate
from .kpi_models import (
KPIReport,
KPIReportDepartmentBreakdown,
KPIReportMonthlyData,
KPIReportSourceBreakdown,
KPIReportStatus,
KPIReportType,
)
logger = logging.getLogger(__name__)
class KPICalculationService:
    """
    Service for calculating KPI report metrics.

    Every supported report type stores one ``KPIReportMonthlyData`` row per
    calendar month of the report year, plus the summed numerator and
    denominator on the report itself. Supported KPI types:

    - 72H Resolution Rate
    - Patient Experience Score
    - Satisfaction with Resolution
    - N-PAD-001 Resolution Rate
    - Response Rate
    - Activation Time
    - Unactivated Rate
    """

    # Time limits applied by the time-boxed KPIs.
    _RESOLUTION_WINDOW = timedelta(hours=72)
    _RESPONSE_WINDOW = timedelta(hours=48)
    _ACTIVATION_WINDOW = timedelta(hours=2)

    # Lowest survey total_score (out of 5) counted as positive / satisfied.
    _POSITIVE_SCORE = 4

    # Keywords matched against department names (case-insensitive substring
    # match) to bucket complaints for the department breakdown.
    # NOTE(review): substring matching lets short keywords hit unrelated
    # names (e.g. "ER" is contained in "Surgery", "OR" in "Orthopedics"), so
    # one department can be counted under several categories - confirm
    # whether that is intended.
    _DEPARTMENT_CATEGORIES = {
        "medical": (
            "Medical", "Surgery", "Cardiology", "Orthopedics",
            "Pediatrics", "Obstetrics", "Gynecology",
        ),
        "nursing": ("Nursing", "ICU", "ER", "OR"),
        "admin": ("Administration", "HR", "Finance", "IT", "Reception"),
        "support": (
            "Housekeeping", "Maintenance", "Security", "Cafeteria", "Transport",
        ),
    }

    @classmethod
    def generate_monthly_report(
        cls,
        report_type: str,
        hospital,
        year: int,
        month: int,
        generated_by=None,
    ) -> "KPIReport":
        """
        Generate a complete monthly KPI report.

        Args:
            report_type: Type of KPI report (from KPIReportType)
            hospital: Hospital instance
            year: Report year
            month: Report month (1-12)
            generated_by: User who generated the report (optional)

        Returns:
            KPIReport instance. A report that already exists with status
            COMPLETED is returned as stored, without recalculation.

        Raises:
            ValueError: If ``report_type`` is not one of the supported types.
            Exception: Any error raised while calculating is re-raised after
                the report has been saved with status FAILED.
        """
        report, created = KPIReport.objects.get_or_create(
            report_type=report_type,
            hospital=hospital,
            year=year,
            month=month,
            defaults={
                "report_date": timezone.now().date(),
                "status": KPIReportStatus.PENDING,
                "generated_by": generated_by,
            },
        )
        if not created and report.status == KPIReportStatus.COMPLETED:
            # Report already exists and is complete - return it
            return report

        report.status = KPIReportStatus.GENERATING
        report.save()
        try:
            calculate = cls._calculator_for(report_type)
            if calculate is None:
                # Without this an unknown type would be marked COMPLETED
                # although nothing was calculated.
                raise ValueError(f"Unsupported KPI report type: {report_type!r}")
            calculate(report)

            report.status = KPIReportStatus.COMPLETED
            report.generated_at = timezone.now()
            report.save()
            logger.info("KPI Report %s generated successfully", report.id)
        except Exception as e:
            logger.exception("Error generating KPI report %s", report.id)
            report.status = KPIReportStatus.FAILED
            report.error_message = str(e)
            report.save()
            raise
        return report

    @classmethod
    def _calculator_for(cls, report_type):
        """Return the calculation method for ``report_type``, or None if unsupported."""
        calculators = (
            (KPIReportType.RESOLUTION_72H, cls._calculate_72h_resolution),
            (KPIReportType.PATIENT_EXPERIENCE, cls._calculate_patient_experience),
            (KPIReportType.SATISFACTION_RESOLUTION, cls._calculate_satisfaction_resolution),
            (KPIReportType.N_PAD_001, cls._calculate_n_pad_001),
            (KPIReportType.RESPONSE_RATE, cls._calculate_response_rate),
            (KPIReportType.ACTIVATION_2H, cls._calculate_activation_2h),
            (KPIReportType.UNACTIVATED, cls._calculate_unactivated),
        )
        for kpi_type, calculate in calculators:
            if report_type == kpi_type:
                return calculate
        return None

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _month_bounds(year: int, month: int):
        """
        Return naive ``(start, end)`` datetimes for a calendar month.

        ``start`` is midnight on the first day of the month and ``end`` is
        midnight on the first day of the following month, so the pair is
        meant for a ``>= start`` / ``< end`` range.
        """
        start = datetime(year, month, 1)
        if month == 12:
            end = datetime(year + 1, 1, 1)
        else:
            end = datetime(year, month + 1, 1)
        return start, end

    @staticmethod
    def _as_lookup_value(moment: datetime) -> datetime:
        """
        Return ``moment`` in the form DateTimeField lookups expect.

        With USE_TZ enabled Django warns about naive datetimes and converts
        them using the default time zone; doing the same conversion here
        keeps the results unchanged while avoiding the warning.
        """
        from django.conf import settings

        if settings.USE_TZ and timezone.is_naive(moment):
            return timezone.make_aware(moment, timezone.get_default_timezone())
        return moment

    @classmethod
    def _month_range(cls, year: int, month: int):
        """Return the ``(start, end)`` bounds of a month, ready for ORM lookups."""
        start, end = cls._month_bounds(year, month)
        return cls._as_lookup_value(start), cls._as_lookup_value(end)

    @staticmethod
    def _complaints_between(hospital, start, end):
        """Return the hospital's complaints created on/after ``start`` and before ``end``."""
        return Complaint.objects.filter(
            hospital=hospital,
            created_at__gte=start,
            created_at__lt=end,
            complaint_type="complaint",  # Only actual complaints
        )

    @staticmethod
    def _completed_surveys(hospital, start, end):
        """Return the hospital's surveys completed on/after ``start`` and before ``end``."""
        return SurveyInstance.objects.filter(
            survey_template__hospital=hospital,
            status=SurveyStatus.COMPLETED,
            completed_at__gte=start,
            completed_at__lt=end,
        )

    @staticmethod
    def _save_monthly_data(
        report,
        month,
        numerator,
        denominator,
        details=None,
        higher_is_worse=False,
    ):
        """
        Create or update the monthly data row of ``report`` for ``month``.

        Args:
            report: KPIReport the row belongs to.
            month: Month number (1-12).
            numerator: Count of items meeting the KPI condition.
            denominator: Count of all items considered.
            details: Extra payload stored on the row; None leaves the
                currently stored details untouched.
            higher_is_worse: When True the month is flagged as below target
                if its percentage exceeds ``100 - target_percentage``
                instead of falling short of ``target_percentage``.

        Returns:
            The saved KPIReportMonthlyData instance.
        """
        monthly_data, _ = KPIReportMonthlyData.objects.get_or_create(
            kpi_report=report,
            month=month,
            defaults={
                "numerator": numerator,
                "denominator": denominator,
            },
        )
        monthly_data.numerator = numerator
        monthly_data.denominator = denominator
        monthly_data.calculate_percentage()
        if higher_is_worse:
            monthly_data.is_below_target = (
                monthly_data.percentage > (100 - report.target_percentage)
            )
        else:
            monthly_data.is_below_target = (
                monthly_data.percentage < report.target_percentage
            )
        if details is not None:
            monthly_data.details = details
        monthly_data.save()
        return monthly_data

    @classmethod
    def _calculate_monthly(cls, report: "KPIReport", measure, higher_is_worse=False):
        """
        Run ``measure`` for all twelve months of the report year and store the results.

        Args:
            report: KPIReport being calculated.
            measure: Callable taking ``(month_start, month_end)`` and
                returning ``(numerator, denominator, details)``; ``details``
                may be None when there is nothing extra to store.
            higher_is_worse: Passed through to the monthly rows (see
                ``_save_monthly_data``).
        """
        total_numerator = 0
        total_denominator = 0
        for month in range(1, 13):
            month_start, month_end = cls._month_range(report.year, month)
            numerator, denominator, details = measure(month_start, month_end)
            cls._save_monthly_data(
                report, month, numerator, denominator, details, higher_is_worse
            )
            total_numerator += numerator
            total_denominator += denominator

        report.total_numerator = total_numerator
        report.total_denominator = total_denominator
        # overall_result is left as it was when there is no data at all.
        if total_denominator > 0:
            report.overall_result = Decimal(
                str((total_numerator / total_denominator) * 100)
            )
        report.save()

    # ------------------------------------------------------------------
    # KPI calculations
    # ------------------------------------------------------------------

    @classmethod
    def _calculate_72h_resolution(cls, report: "KPIReport"):
        """Calculate 72-Hour Resolution Rate (MOH-2)"""

        def measure(month_start, month_end):
            complaints = cls._complaints_between(
                report.hospital, month_start, month_end
            )
            # Counted in the database; unresolved complaints have no
            # resolved_at and therefore never satisfy the comparison.
            resolved_in_time = complaints.filter(
                resolved_at__isnull=False,
                resolved_at__lte=F("created_at") + cls._RESOLUTION_WINDOW,
            ).count()
            details = {"source_breakdown": cls._get_source_breakdown(complaints)}
            return resolved_in_time, complaints.count(), details

        cls._calculate_monthly(report, measure)

        # Breakdowns cover the year to date: from 1 January up to the end of
        # the report month.
        year_start, _ = cls._month_range(report.year, 1)
        _, period_end = cls._month_range(report.year, report.month)
        all_complaints = cls._complaints_between(
            report.hospital, year_start, period_end
        )
        # Source breakdown for the pie chart
        cls._create_source_breakdowns(report, all_complaints)
        cls._create_department_breakdown(report, all_complaints)

    @classmethod
    def _calculate_patient_experience(cls, report: "KPIReport"):
        """Calculate Patient Experience Score (MOH-1)"""

        def measure(month_start, month_end):
            surveys = cls._completed_surveys(
                report.hospital, month_start, month_end
            ).filter(survey_template__survey_type__in=["stage", "general"])
            # Positive responses (score >= 4 out of 5)
            positive = surveys.filter(total_score__gte=cls._POSITIVE_SCORE).count()
            average = surveys.aggregate(avg=Avg("total_score"))["avg"]
            # Avg may yield a Decimal for decimal columns; store a plain
            # float so the details payload stays JSON-serializable.
            avg_score = round(float(average), 2) if average else 0
            return positive, surveys.count(), {"avg_score": avg_score}

        cls._calculate_monthly(report, measure)

    @classmethod
    def _calculate_satisfaction_resolution(cls, report: "KPIReport"):
        """Calculate Overall Satisfaction with Resolution (MOH-3)"""

        def measure(month_start, month_end):
            surveys = cls._completed_surveys(
                report.hospital, month_start, month_end
            ).filter(survey_template__survey_type="complaint_resolution")
            # Satisfied = score >= 4
            satisfied = surveys.filter(total_score__gte=cls._POSITIVE_SCORE).count()
            return satisfied, surveys.count(), None

        cls._calculate_monthly(report, measure)

    @classmethod
    def _calculate_n_pad_001(cls, report: "KPIReport"):
        """Calculate N-PAD-001 Resolution Rate"""

        def measure(month_start, month_end):
            complaints = cls._complaints_between(
                report.hospital, month_start, month_end
            )
            # Resolved includes closed and resolved statuses
            resolved = complaints.filter(
                status__in=[ComplaintStatus.RESOLVED, ComplaintStatus.CLOSED]
            ).count()
            return resolved, complaints.count(), None

        cls._calculate_monthly(report, measure)

    @classmethod
    def _calculate_response_rate(cls, report: "KPIReport"):
        """Calculate Department Response Rate (48h)"""

        def measure(month_start, month_end):
            complaints = cls._complaints_between(
                report.hospital, month_start, month_end
            )
            denominator = complaints.count()
            responded_in_time = 0
            # Prefetch the updates so the earliest one is found without
            # issuing one extra query per complaint.
            for complaint in complaints.prefetch_related("updates"):
                update_times = [
                    update.created_at for update in complaint.updates.all()
                ]
                if not update_times or not complaint.created_at:
                    continue
                first_response = min(update_times) - complaint.created_at
                if first_response <= cls._RESPONSE_WINDOW:
                    responded_in_time += 1
            return responded_in_time, denominator, None

        cls._calculate_monthly(report, measure)

    @classmethod
    def _calculate_activation_2h(cls, report: "KPIReport"):
        """Calculate Complaint Activation Within 2 Hours"""

        def measure(month_start, month_end):
            complaints = cls._complaints_between(
                report.hospital, month_start, month_end
            )
            denominator = complaints.count()
            # assigned_at is only ever read as an attribute in this module,
            # so the check stays in Python rather than assuming it can be
            # used in a database lookup.
            activated_in_time = sum(
                1
                for complaint in complaints
                if complaint.assigned_at
                and complaint.created_at
                and complaint.assigned_at - complaint.created_at
                <= cls._ACTIVATION_WINDOW
            )
            return activated_in_time, denominator, None

        cls._calculate_monthly(report, measure)

    @classmethod
    def _calculate_unactivated(cls, report: "KPIReport"):
        """Calculate Unactivated Filled Complaints Rate"""

        def measure(month_start, month_end):
            complaints = cls._complaints_between(
                report.hospital, month_start, month_end
            )
            # Unactivated = no assigned_to
            unactivated = complaints.filter(assigned_to__isnull=True).count()
            return unactivated, complaints.count(), None

        # For unactivated, HIGHER is WORSE, so below target = above threshold
        cls._calculate_monthly(report, measure, higher_is_worse=True)

    # ------------------------------------------------------------------
    # Breakdowns
    # ------------------------------------------------------------------

    @classmethod
    def _get_source_breakdown(cls, complaints) -> Dict[str, int]:
        """
        Get breakdown of complaints by source.

        Args:
            complaints: Iterable of complaints exposing ``source.name_en``.

        Returns:
            Mapping of source name to complaint count; complaints without a
            source are counted under "Other".
        """
        sources: Dict[str, int] = {}
        for complaint in complaints:
            source = complaint.source
            source_name = source.name_en if source else "Other"
            sources[source_name] = sources.get(source_name, 0) + 1
        return sources

    @classmethod
    def _create_source_breakdowns(cls, report: "KPIReport", complaints):
        """Replace the report's source breakdown records (pie chart data)."""
        # Delete existing
        report.source_breakdowns.all().delete()

        source_counts = cls._get_source_breakdown(complaints)
        # Derived from the tally itself so the percentages always refer to
        # exactly the complaints that were counted.
        total = sum(source_counts.values())
        for source_name, count in source_counts.items():
            # Every tallied source has count >= 1, so total is non-zero here.
            percentage = count / total * 100
            KPIReportSourceBreakdown.objects.create(
                kpi_report=report,
                source_name=source_name,
                complaint_count=count,
                percentage=Decimal(str(percentage)),
            )

    @classmethod
    def _create_department_breakdown(cls, report: "KPIReport", complaints):
        """
        Replace the report's department breakdown records.

        One record is created per entry of ``_DEPARTMENT_CATEGORIES``, built
        from the complaints whose department name contains any of that
        category's keywords.
        """
        # Delete existing
        report.department_breakdowns.all().delete()

        for category, keywords in cls._DEPARTMENT_CATEGORIES.items():
            name_matches = Q()
            for keyword in keywords:
                name_matches |= Q(department__name__icontains=keyword)
            dept_complaints = complaints.filter(name_matches)

            complaint_count = dept_complaints.count()
            resolved_count = dept_complaints.filter(
                status__in=[ComplaintStatus.RESOLVED, ComplaintStatus.CLOSED]
            ).count()

            # Average resolution time in days; stays None when nothing in
            # the category has a resolved_at timestamp.
            avg_days = None
            resolution_days = [
                (resolved_at - created_at).total_seconds() / (24 * 3600)
                for resolved_at, created_at in dept_complaints.filter(
                    resolved_at__isnull=False
                ).values_list("resolved_at", "created_at")
            ]
            if resolution_days:
                avg_days = Decimal(
                    str(sum(resolution_days) / len(resolution_days))
                )

            # Up to five distinct category names taken from the first ten
            # complaints, kept in first-seen order so the stored text does
            # not depend on set ordering.
            top_areas_list = [
                c.category.name_en for c in dept_complaints[:10] if c.category
            ]
            top_areas = "\n".join(list(dict.fromkeys(top_areas_list))[:5])

            KPIReportDepartmentBreakdown.objects.create(
                kpi_report=report,
                department_category=category,
                complaint_count=complaint_count,
                resolved_count=resolved_count,
                avg_resolution_days=avg_days,
                top_areas=top_areas,
            )