"""Monthly complaint statistics for one hospital (summary, trends, funnel, charts)."""

import calendar
from collections import defaultdict

from django.db.models import Count, Q
from django.db.models.functions import ExtractDay, TruncDate

from apps.complaints.models import Complaint

# Display labels for the raw values stored on Complaint rows.
STATUS_MAP = {
    "open": "Open",
    "in_progress": "In Progress",
    "partially_resolved": "Partially Resolved",
    "resolved": "Resolved",
    "closed": "Closed",
    "cancelled": "Cancelled",
    "contacted": "Contacted",
    "contacted_no_response": "No Response",
}

SOURCE_TYPE_MAP = {
    "internal": "Internal",
    "external": "External",
}

SATISFACTION_MAP = {
    "satisfied": "Satisfied",
    "neutral": "Neutral",
    "dissatisfied": "Dissatisfied",
    "no_response": "No Response",
    "escalated": "Escalated",
}

# (bucket key, chart label, last day of the month that falls in the bucket).
# NOTE(review): the buckets are uneven (7, 8, 7 and 6 days, then the rest of
# the month). The bounds are kept exactly as they were because the labels
# advertise these ranges -- confirm with the report owners before changing.
_WEEK_BUCKETS = (
    ("week1", "Week 1 (1-7)", 7),
    ("week2", "Week 2 (8-15)", 15),
    ("week3", "Week 3 (16-22)", 22),
    ("week4", "Week 4 (23-28)", 28),
    ("week5", "Week 5 (29-31)", 31),
)


def _time_delta_hours(start, end):
    """Return the hours between ``start`` and ``end`` rounded to one decimal.

    Returns ``None`` when either endpoint is missing.
    """
    if start and end:
        return round((end - start).total_seconds() / 3600, 1)
    return None


def _week_key(day):
    """Map a day of the month to its bucket key from ``_WEEK_BUCKETS``."""
    for key, _label, last_day in _WEEK_BUCKETS:
        if day <= last_day:
            return key
    # Anything past the last bound still belongs to the final bucket.
    return _WEEK_BUCKETS[-1][0]


def _percentage(part, whole):
    """Return ``part / whole`` as a percentage rounded to one decimal (0 if empty)."""
    rate = (part / whole * 100) if whole > 0 else 0
    return round(rate, 1)


def _count_by(qs, field):
    """Return ``{value: row count}`` for ``field`` over ``qs``."""
    # order_by() clears any default ordering so it cannot leak into GROUP BY.
    return dict(qs.values_list(field).annotate(count=Count("id")).order_by())


class ComplaintMonthlyService:
    """Build complaint statistics for one hospital and one calendar month.

    Args:
        hospital_id: Value matched against ``Complaint.hospital_id``.
        year: Year matched against ``created_at__year``.
        month: Month matched against ``created_at__month``.
    """

    def __init__(self, hospital_id, year, month):
        self.hospital_id = hospital_id
        self.year = year
        self.month = month

    def _scoped_qs(self):
        """Return the month's complaints without joins, for counting queries."""
        return Complaint.objects.filter(
            hospital_id=self.hospital_id,
            created_at__year=self.year,
            created_at__month=self.month,
        )

    def _base_qs(self):
        """Return the month's complaints with related objects loaded eagerly."""
        return self._scoped_qs().select_related(
            "patient",
            "department",
            "assigned_to",
            "created_by",
            "domain",
            "category",
            "subcategory_obj",
            "location",
            "main_section",
            "source",
        ).prefetch_related("involved_departments__department")

    def get_summary(self):
        """Return headline counts and rates for the month.

        Returns:
            dict with ``total``, per-value breakdowns (``by_status``,
            ``by_source_type``, ``by_severity``, ``by_satisfaction``),
            ``closed_count`` (closed + resolved), ``resolution_rate``,
            ``escalated``, ``activated`` and ``activation_rate``. Rates are
            percentages rounded to one decimal, 0 when there are no complaints.
        """
        qs = self._scoped_qs()

        # One conditional aggregate instead of three separate COUNT queries.
        totals = qs.aggregate(
            n_total=Count("id"),
            n_escalated=Count("id", filter=Q(escalated_at__isnull=False)),
            n_activated=Count("id", filter=Q(activated_at__isnull=False)),
        )
        total = totals["n_total"]
        escalated = totals["n_escalated"]
        activated = totals["n_activated"]

        by_status = _count_by(qs, "status")
        by_source_type = _count_by(qs, "complaint_source_type")
        by_severity = _count_by(qs, "severity")
        # Blank satisfaction values are left out of the breakdown.
        by_satisfaction = _count_by(qs.exclude(satisfaction=""), "satisfaction")

        closed_count = by_status.get("closed", 0) + by_status.get("resolved", 0)

        return {
            "total": total,
            "by_status": by_status,
            "by_source_type": by_source_type,
            "by_severity": by_severity,
            "by_satisfaction": by_satisfaction,
            "closed_count": closed_count,
            "resolution_rate": _percentage(closed_count, total),
            "escalated": escalated,
            "activated": activated,
            "activation_rate": _percentage(activated, total),
        }

    def get_weekly_breakdown(self):
        """Return complaint counts per week bucket, overall and per status.

        Returns:
            dict with ``labels`` (bucket labels), ``counts`` (totals aligned
            with ``labels``) and ``by_status`` mapping each status seen this
            month to a list of counts aligned with ``labels``.
        """
        # The day is extracted by the database so it goes through the same
        # timezone handling as the created_at__month filter and the TruncDate
        # used by get_daily_trend(); reading ``.day`` off the Python datetime
        # could put rows near midnight into a different bucket.
        rows = (
            self._scoped_qs()
            .annotate(day=ExtractDay("created_at"))
            .values_list("day", "status")
            .annotate(count=Count("id"))
            .order_by()
        )

        keys = [key for key, _label, _last_day in _WEEK_BUCKETS]
        labels = [label for _key, label, _last_day in _WEEK_BUCKETS]

        weeks = dict.fromkeys(keys, 0)
        status_weeks = defaultdict(lambda: defaultdict(int))
        for day, status, count in rows:
            wk = _week_key(day)
            weeks[wk] += count
            status_weeks[status][wk] += count

        return {
            "labels": labels,
            "counts": [weeks[k] for k in keys],
            "by_status": {
                status: [per_week.get(k, 0) for k in keys]
                for status, per_week in status_weeks.items()
            },
        }

    def get_daily_trend(self):
        """Return one ``{"day", "label", "count"}`` entry per day of the month.

        Days without complaints are included with a count of 0.
        """
        days_in_month = calendar.monthrange(self.year, self.month)[1]
        daily_raw = (
            self._scoped_qs()
            .annotate(day=TruncDate("created_at"))
            .values("day")
            .annotate(count=Count("id"))
            .order_by("day")
        )
        daily = {
            entry["day"].day: entry["count"]
            for entry in daily_raw
            if entry["day"]
        }
        return [
            {"day": d, "label": str(d), "count": daily.get(d, 0)}
            for d in range(1, days_in_month + 1)
        ]

    def get_stage_funnel(self):
        """Return how many of the month's complaints reached each stage.

        A stage counts as reached when its timestamp column is not null.
        The dict preserves stage order from "Received" to "Closed".
        """

        def reached(field):
            # Counts rows whose stage timestamp is set.
            return Count("id", filter=Q(**{f"{field}__isnull": False}))

        # Single aggregate query instead of one COUNT per stage.
        counts = self._scoped_qs().aggregate(
            n_received=Count("id"),
            n_activated=reached("activated_at"),
            n_form_sent=reached("form_sent_at"),
            n_forwarded=reached("forwarded_to_dept_at"),
            n_responded=reached("response_date"),
            n_resolved=reached("resolved_at"),
            n_closed=reached("closed_at"),
        )
        return {
            "Received": counts["n_received"],
            "Activated": counts["n_activated"],
            "Form Sent": counts["n_form_sent"],
            "Forwarded": counts["n_forwarded"],
            "Responded": counts["n_responded"],
            "Resolved": counts["n_resolved"],
            "Closed": counts["n_closed"],
        }

    def get_avg_stage_times(self):
        """Return average stage durations in hours for closed complaints.

        Only complaints with ``closed_at`` set are considered. A duration is
        included for a complaint only when both of its timestamps are set.

        Returns:
            dict with any of ``activate``, ``send_to_forward``, ``total_close``
            and ``total_resolve``, each rounded to one decimal. Empty when no
            complaint of the month is closed.
        """
        # Only the timestamps are needed, so skip model instances and joins.
        rows = (
            self._scoped_qs()
            .filter(closed_at__isnull=False)
            .values_list(
                "created_at",
                "activated_at",
                "form_sent_at",
                "forwarded_to_dept_at",
                "resolved_at",
                "closed_at",
            )
            .iterator(chunk_size=2000)
        )

        times = defaultdict(list)
        for created, activated, form_sent, forwarded, resolved, closed in rows:
            spans = (
                ("activate", created, activated),
                ("send_to_forward", form_sent, forwarded),
                ("total_close", created, closed),
                ("total_resolve", activated, resolved),
            )
            for key, start, end in spans:
                hours = _time_delta_hours(start, end)
                if hours is not None:
                    times[key].append(hours)

        return {
            key: round(sum(vals) / len(vals), 1)
            for key, vals in times.items()
        }

    def get_chart_data(self):
        """Return the daily, status, source, satisfaction, weekly and funnel
        series as ``x``/``y`` points or ``labels``/``counts`` pairs."""
        daily = self.get_daily_trend()
        summary = self.get_summary()
        weekly = self.get_weekly_breakdown()
        funnel = self.get_stage_funnel()

        by_status_chart = [
            {"x": STATUS_MAP.get(s, s), "y": c}
            for s, c in summary["by_status"].items()
        ]
        by_source_chart = [
            {"x": SOURCE_TYPE_MAP.get(s, s or "N/A"), "y": c}
            for s, c in summary["by_source_type"].items()
        ]
        by_satisfaction_chart = [
            {"x": SATISFACTION_MAP.get(s, s), "y": c}
            for s, c in summary["by_satisfaction"].items()
        ]

        return {
            "daily": [{"x": d["label"], "y": d["count"]} for d in daily],
            "by_status": by_status_chart,
            "by_source": by_source_chart,
            "by_satisfaction": by_satisfaction_chart,
            "weekly": {
                "labels": weekly["labels"],
                "counts": weekly["counts"],
            },
            "funnel": {
                "labels": list(funnel.keys()),
                "counts": list(funnel.values()),
            },
        }

    def get_available_months(self):
        """Return the (year, month) pairs that have complaints, newest first.

        Uses only ``hospital_id``; the service's own year and month are ignored.
        """
        months = (
            Complaint.objects.filter(hospital_id=self.hospital_id)
            .values_list("created_at__year", "created_at__month")
            .distinct()
            .order_by("-created_at__year", "-created_at__month")
        )
        return [
            {"year": y, "month": m, "label": f"{calendar.month_abbr[m]} {y}"}
            for y, m in months
        ]