830 lines
34 KiB
Python
830 lines
34 KiB
Python
import calendar
|
|
from collections import defaultdict
|
|
from datetime import date, datetime
|
|
|
|
from django.db.models import Count, Q, F
|
|
from django.db.models.functions import TruncMonth
|
|
from django.utils import timezone as dj_timezone
|
|
|
|
from apps.complaints.models import Complaint
|
|
|
|
SOURCE_TYPES = ["MOH", "CHI", "CCHI", "Insurance"]
|
|
LOCATION_TYPES = ["IP", "OP", "ER"]
|
|
DEPT_TYPES = ["Medical", "Admin", "Nursing", "Support Services"]
|
|
TIMELINE_BUCKETS = ["24h", "48h", "72h", "72h+"]
|
|
MONTHS_ABBR = [calendar.month_abbr[m] for m in range(1, 13)]
|
|
|
|
|
|
class ComplaintQuarterlyService:
|
|
def __init__(self, hospital_id, start_date, end_date):
|
|
self.hospital_id = hospital_id
|
|
if isinstance(start_date, str):
|
|
start_date = date.fromisoformat(start_date)
|
|
if isinstance(end_date, str):
|
|
end_date = date.fromisoformat(end_date)
|
|
self.start_date = start_date
|
|
self.end_date = end_date
|
|
self.year = start_date.year
|
|
self._cache = {}
|
|
|
|
if start_date.year == end_date.year:
|
|
self.active_months = list(range(start_date.month, end_date.month + 1))
|
|
else:
|
|
self.active_months = []
|
|
y = start_date.year
|
|
m = start_date.month
|
|
while (y, m) <= (end_date.year, end_date.month):
|
|
self.active_months.append(m)
|
|
m += 1
|
|
if m > 12:
|
|
m = 1
|
|
y += 1
|
|
|
|
self.active_month_labels = [calendar.month_abbr[m] for m in self.active_months]
|
|
|
|
def _base_qs(self):
|
|
dt_start = dj_timezone.make_aware(datetime.combine(self.start_date, datetime.min.time()))
|
|
dt_end = dj_timezone.make_aware(datetime.combine(self.end_date, datetime.max.time()))
|
|
return Complaint.objects.filter(
|
|
hospital_id=self.hospital_id,
|
|
created_at__range=(dt_start, dt_end),
|
|
).select_related(
|
|
"department", "domain", "category", "location",
|
|
"source", "assigned_to", "created_by",
|
|
)
|
|
|
|
def _month_qs(self, month):
|
|
return self._base_qs().filter(created_at__month=month)
|
|
|
|
def _cached(self, key, fn):
|
|
if key not in self._cache:
|
|
self._cache[key] = fn()
|
|
return self._cache[key]
|
|
|
|
def get_available_years(self):
|
|
years = (
|
|
Complaint.objects.filter(hospital_id=self.hospital_id)
|
|
.values_list("created_at__year", flat=True)
|
|
.distinct()
|
|
.order_by("-created_at__year")
|
|
)
|
|
return list(years)
|
|
|
|
def get_kpi_data(self):
|
|
return self._cached("kpi_data", self._compute_kpi_data)
|
|
|
|
def _compute_kpi_data(self):
|
|
result = {}
|
|
for m in self.active_months:
|
|
qs = self._month_qs(m)
|
|
total = qs.count()
|
|
closed = qs.filter(status__in=["closed", "resolved"]).count()
|
|
resolved_72h = 0
|
|
resolved_48h = 0
|
|
for c in qs.filter(status__in=["closed", "resolved"]).iterator(chunk_size=2000):
|
|
if c.resolved_at and c.activated_at:
|
|
hours = (c.resolved_at - c.activated_at).total_seconds() / 3600
|
|
if hours <= 72:
|
|
resolved_72h += 1
|
|
if hours <= 48:
|
|
resolved_48h += 1
|
|
elif c.closed_at and c.created_at:
|
|
hours = (c.closed_at - c.created_at).total_seconds() / 3600
|
|
if hours <= 48:
|
|
resolved_48h += 1
|
|
activated = qs.filter(activated_at__isnull=False).count()
|
|
not_activated = total - activated
|
|
|
|
result[m] = {
|
|
"total": total,
|
|
"closed": closed,
|
|
"resolution_rate": round(closed / total, 3) if total > 0 else 0,
|
|
"resolved_72h": resolved_72h,
|
|
"resolved_72h_rate": round(resolved_72h / total, 3) if total > 0 else 0,
|
|
"resolved_48h": resolved_48h,
|
|
"resolved_48h_rate": round(resolved_48h / total, 3) if total > 0 else 0,
|
|
"activated": activated,
|
|
"activation_rate": round(activated / total, 3) if total > 0 else 0,
|
|
"not_activated": not_activated,
|
|
"not_activated_rate": round(not_activated / total, 3) if total > 0 else 0,
|
|
}
|
|
|
|
totals = {
|
|
"total": sum(r["total"] for r in result.values()),
|
|
"closed": sum(r["closed"] for r in result.values()),
|
|
"resolved_72h": sum(r["resolved_72h"] for r in result.values()),
|
|
"resolved_48h": sum(r["resolved_48h"] for r in result.values()),
|
|
"activated": sum(r["activated"] for r in result.values()),
|
|
"not_activated": sum(r["not_activated"] for r in result.values()),
|
|
}
|
|
totals["resolution_rate"] = round(totals["closed"] / totals["total"], 3) if totals["total"] > 0 else 0
|
|
totals["resolved_72h_rate"] = round(totals["resolved_72h"] / totals["total"], 3) if totals["total"] > 0 else 0
|
|
totals["resolved_48h_rate"] = round(totals["resolved_48h"] / totals["total"], 3) if totals["total"] > 0 else 0
|
|
totals["activation_rate"] = round(totals["activated"] / totals["total"], 3) if totals["total"] > 0 else 0
|
|
totals["not_activated_rate"] = round(totals["not_activated"] / totals["total"], 3) if totals["total"] > 0 else 0
|
|
|
|
return {"months": result, "totals": totals}
|
|
|
|
def get_satisfaction_data(self):
|
|
return self._cached("satisfaction_data", self._compute_satisfaction_data)
|
|
|
|
def _compute_satisfaction_data(self):
|
|
result = {}
|
|
call_months = {}
|
|
survey_months = {}
|
|
for m in self.active_months:
|
|
qs = self._month_qs(m)
|
|
total = qs.count()
|
|
sat_qs = qs.filter(satisfaction__isnull=False).exclude(satisfaction="")
|
|
surveyed = sat_qs.count()
|
|
satisfied = sat_qs.filter(satisfaction="satisfied").count()
|
|
result[m] = {
|
|
"surveyed": surveyed,
|
|
"satisfied": satisfied,
|
|
"rate": round(satisfied / surveyed, 3) if surveyed > 0 else 0,
|
|
}
|
|
|
|
call_resp = qs.filter(resolution_survey_sent_at__isnull=False).count()
|
|
call_months[m] = {
|
|
"responses": call_resp,
|
|
"total": total,
|
|
"rate": round(call_resp / total, 3) if total > 0 else 0,
|
|
}
|
|
|
|
survey_resp = sat_qs.count()
|
|
survey_months[m] = {
|
|
"responses": survey_resp,
|
|
"total": total,
|
|
"rate": round(survey_resp / total, 3) if total > 0 else 0,
|
|
}
|
|
|
|
total_surveyed = sum(r["surveyed"] for r in result.values())
|
|
total_satisfied = sum(r["satisfied"] for r in result.values())
|
|
|
|
call_total = sum(c["responses"] for c in call_months.values())
|
|
call_grand = sum(c["total"] for c in call_months.values())
|
|
survey_total = sum(s["responses"] for s in survey_months.values())
|
|
|
|
return {
|
|
"months": result,
|
|
"totals": {
|
|
"surveyed": total_surveyed,
|
|
"satisfied": total_satisfied,
|
|
"rate": round(total_satisfied / total_surveyed, 3) if total_surveyed > 0 else 0,
|
|
},
|
|
"call_months": call_months,
|
|
"call_totals": {
|
|
"responses": call_total,
|
|
"total": call_grand,
|
|
"rate": round(call_total / call_grand, 3) if call_grand > 0 else 0,
|
|
},
|
|
"survey_months": survey_months,
|
|
"survey_totals": {
|
|
"responses": survey_total,
|
|
"total": call_grand,
|
|
"rate": round(survey_total / call_grand, 3) if call_grand > 0 else 0,
|
|
},
|
|
}
|
|
|
|
def get_source_breakdown(self):
|
|
return self._cached("source_breakdown", self._compute_source_breakdown)
|
|
|
|
def _compute_source_breakdown(self):
|
|
result = {}
|
|
for m in self.active_months:
|
|
qs = self._month_qs(m)
|
|
total = qs.count()
|
|
ext = qs.filter(complaint_source_type="external").count()
|
|
internal = total - ext
|
|
|
|
moh = qs.filter(Q(source__code__iexact="moh") | Q(moh_reference__gt="")).count()
|
|
chi = qs.filter(
|
|
Q(source__code__iexact="chi") | Q(source__code__iexact="cchi") | Q(chi_reference__gt="")
|
|
).count()
|
|
insurance = qs.exclude(
|
|
Q(source__code__iexact="moh") | Q(source__code__iexact="chi") | Q(source__code__iexact="cchi")
|
|
).filter(source__name_en__icontains="insurance").count()
|
|
patients = qs.filter(
|
|
Q(source__name_en__icontains="patient") | Q(source__name_en__icontains="walk")
|
|
).count()
|
|
relatives = qs.filter(
|
|
Q(source__name_en__icontains="relative")
|
|
| Q(relation_to_patient="relative")
|
|
).count()
|
|
|
|
result[m] = {
|
|
"total": total,
|
|
"external": ext,
|
|
"internal": internal,
|
|
"moh": moh,
|
|
"chi": chi,
|
|
"insurance": insurance,
|
|
"patients": patients,
|
|
"relatives": relatives,
|
|
}
|
|
return result
|
|
|
|
def get_location_breakdown(self):
|
|
return self._cached("location_breakdown", self._compute_location_breakdown)
|
|
|
|
def _compute_location_breakdown(self):
|
|
result = {}
|
|
for m in self.active_months:
|
|
qs = self._month_qs(m)
|
|
total = qs.count()
|
|
ip = qs.filter(
|
|
Q(location__name_en__icontains="inpatient") | Q(location__name_en__icontains="in-patient")
|
|
).count()
|
|
er = qs.filter(
|
|
Q(location__name_en__icontains="emergency") | Q(location__name_en__icontains="er")
|
|
).count()
|
|
op = total - ip - er
|
|
|
|
result[m] = {"total": total, "ip": ip, "op": op, "er": er}
|
|
return result
|
|
|
|
def get_dept_type_breakdown(self):
|
|
return self._cached("dept_type_breakdown", self._compute_dept_type_breakdown)
|
|
|
|
def _compute_dept_type_breakdown(self):
|
|
result = {}
|
|
for m in self.active_months:
|
|
qs = self._month_qs(m)
|
|
total = qs.count()
|
|
medical = qs.filter(domain__domain_type="medical").count()
|
|
admin = qs.filter(domain__domain_type="admin").count()
|
|
nursing = qs.filter(domain__domain_type="nursing").count()
|
|
support = total - medical - admin - nursing
|
|
|
|
result[m] = {
|
|
"total": total,
|
|
"medical": medical,
|
|
"admin": admin,
|
|
"nursing": nursing,
|
|
"support": support,
|
|
}
|
|
return result
|
|
|
|
def get_escalated_breakdown(self):
|
|
return self._cached("escalated_breakdown", self._compute_escalated_breakdown)
|
|
|
|
def _compute_escalated_breakdown(self):
|
|
qs = self._base_qs().filter(escalated_at__isnull=False)
|
|
by_category = defaultdict(lambda: defaultdict(int))
|
|
by_source = defaultdict(int)
|
|
for c in qs.iterator(chunk_size=2000):
|
|
dept_name = c.department.name if c.department else "Unknown"
|
|
domain_type = "medical"
|
|
if c.domain:
|
|
dt = c.domain.domain_type or ""
|
|
if dt == "admin" or dt == "non_medical":
|
|
domain_type = "non_medical"
|
|
elif dt == "nursing":
|
|
domain_type = "nursing"
|
|
elif dt in ("support", "support_services"):
|
|
domain_type = "support"
|
|
by_category[domain_type][dept_name] += 1
|
|
|
|
if c.complaint_source_type == "internal":
|
|
by_source["internal"] += 1
|
|
elif c.source and c.source.code and c.source.code.upper() == "MOH":
|
|
by_source["moh"] += 1
|
|
elif c.source and c.source.code and c.source.code.upper() in ("CHI", "CCHI"):
|
|
by_source["chi"] += 1
|
|
else:
|
|
by_source["internal"] += 1
|
|
|
|
total_escalated = qs.count()
|
|
total_complaints = self._base_qs().count()
|
|
|
|
return {
|
|
"by_category": dict(by_category),
|
|
"by_source": dict(by_source),
|
|
"total_escalated": total_escalated,
|
|
"total_complaints": total_complaints,
|
|
"escalation_rate": round(total_escalated / total_complaints, 3) if total_complaints > 0 else 0,
|
|
}
|
|
|
|
def _bucket_response_time(self, complaint):
|
|
if not complaint.closed_at or not complaint.created_at:
|
|
return None
|
|
hours = (complaint.closed_at - complaint.created_at).total_seconds() / 3600
|
|
if hours <= 24:
|
|
return "24h"
|
|
elif hours <= 48:
|
|
return "48h"
|
|
elif hours <= 72:
|
|
return "72h"
|
|
else:
|
|
return "72h+"
|
|
|
|
def _response_time_hours(self, complaint):
|
|
if not complaint.closed_at or not complaint.created_at:
|
|
return None
|
|
return (complaint.closed_at - complaint.created_at).total_seconds() / 3600
|
|
|
|
def get_response_rates(self):
|
|
return self._cached("response_rates", self._compute_response_rates)
|
|
|
|
def _compute_response_rates(self):
|
|
qs = self._base_qs().filter(status__in=["closed", "resolved"])
|
|
buckets = {
|
|
"internal": defaultdict(int),
|
|
"external_moh": defaultdict(int),
|
|
"external_chi": defaultdict(int),
|
|
}
|
|
individual = {
|
|
"internal": [],
|
|
"external_moh": [],
|
|
"external_chi": [],
|
|
}
|
|
for c in qs.iterator(chunk_size=2000):
|
|
bucket = self._bucket_response_time(c)
|
|
if not bucket:
|
|
continue
|
|
hours = self._response_time_hours(c)
|
|
if c.complaint_source_type == "internal":
|
|
buckets["internal"][bucket] += 1
|
|
individual["internal"].append({
|
|
"ref": c.reference_number or "",
|
|
"dept": c.department.name if c.department else "Unknown",
|
|
"hours": round(hours, 2) if hours else None,
|
|
"bucket": bucket,
|
|
"status": c.status,
|
|
})
|
|
elif c.source and c.source.code and c.source.code.upper() == "MOH":
|
|
buckets["external_moh"][bucket] += 1
|
|
individual["external_moh"].append({
|
|
"ref": c.reference_number or "",
|
|
"dept": c.department.name if c.department else "Unknown",
|
|
"hours": round(hours, 2) if hours else None,
|
|
"bucket": bucket,
|
|
"status": c.status,
|
|
})
|
|
elif c.source and c.source.code and c.source.code.upper() in ("CHI", "CCHI"):
|
|
buckets["external_chi"][bucket] += 1
|
|
individual["external_chi"].append({
|
|
"ref": c.reference_number or "",
|
|
"dept": c.department.name if c.department else "Unknown",
|
|
"hours": round(hours, 2) if hours else None,
|
|
"bucket": bucket,
|
|
"status": c.status,
|
|
})
|
|
else:
|
|
buckets["internal"][bucket] += 1
|
|
individual["internal"].append({
|
|
"ref": c.reference_number or "",
|
|
"dept": c.department.name if c.department else "Unknown",
|
|
"hours": round(hours, 2) if hours else None,
|
|
"bucket": bucket,
|
|
"status": c.status,
|
|
})
|
|
|
|
def _to_rate(d):
|
|
total = sum(d.values())
|
|
return {
|
|
"counts": {b: d.get(b, 0) for b in TIMELINE_BUCKETS},
|
|
"total": total,
|
|
"rates": {b: round(d.get(b, 0) / total, 3) if total > 0 else 0 for b in TIMELINE_BUCKETS},
|
|
}
|
|
|
|
return {
|
|
"internal": {**_to_rate(buckets["internal"]), "individual": individual["internal"]},
|
|
"moh": {**_to_rate(buckets["external_moh"]), "individual": individual["external_moh"]},
|
|
"chi": {**_to_rate(buckets["external_chi"]), "individual": individual["external_chi"]},
|
|
}
|
|
|
|
def get_monthly_summary(self):
|
|
return self._cached("monthly_summary", self._compute_monthly_summary)
|
|
|
|
def _compute_monthly_summary(self):
|
|
source = self.get_source_breakdown()
|
|
result = []
|
|
for m in self.active_months:
|
|
s = source[m]
|
|
total = s["total"]
|
|
result.append({
|
|
"month": calendar.month_abbr[m],
|
|
"month_num": m,
|
|
"moh": s["moh"],
|
|
"chi": s["chi"],
|
|
"insurance": s["insurance"],
|
|
"internal": s["internal"],
|
|
"total": total,
|
|
"moh_pct": round(s["moh"] / total, 3) if total > 0 else 0,
|
|
"chi_pct": round(s["chi"] / total, 3) if total > 0 else 0,
|
|
})
|
|
return result
|
|
|
|
def get_per_department_breakdown(self):
|
|
return self._cached("per_department", self._compute_per_department)
|
|
|
|
def _compute_per_department(self):
|
|
qs = self._base_qs()
|
|
dept_data = defaultdict(lambda: defaultdict(int))
|
|
dept_domain = {}
|
|
|
|
for c in qs.iterator(chunk_size=2000):
|
|
if not c.department:
|
|
continue
|
|
dept_name = c.department.name
|
|
dept_id = c.department_id
|
|
|
|
if c.department.category:
|
|
domain_type = c.department.category
|
|
elif c.domain:
|
|
dt = c.domain.domain_type or ""
|
|
if dt == "admin" or dt == "non_medical":
|
|
domain_type = "non_medical"
|
|
elif dt == "nursing":
|
|
domain_type = "nursing"
|
|
elif dt in ("support", "support_services"):
|
|
domain_type = "support_services"
|
|
else:
|
|
domain_type = "medical"
|
|
|
|
dept_domain[dept_id] = domain_type
|
|
|
|
is_moh = (c.source and c.source.code and c.source.code.upper() == "MOH")
|
|
is_chi = (c.source and c.source.code and c.source.code.upper() in ("CHI", "CCHI"))
|
|
is_insurance = (
|
|
not is_moh and not is_chi and
|
|
c.source and c.source.name_en and "insurance" in c.source.name_en.lower()
|
|
)
|
|
is_escalated = c.escalated_at is not None
|
|
|
|
key = (dept_id, dept_name)
|
|
dept_data[key]["total"] += 1
|
|
if is_moh:
|
|
dept_data[key]["moh"] += 1
|
|
if is_chi:
|
|
dept_data[key]["chi"] += 1
|
|
if is_insurance:
|
|
dept_data[key]["insurance"] += 1
|
|
if c.complaint_source_type == "internal" and not is_moh and not is_chi and not is_insurance:
|
|
dept_data[key]["internal"] += 1
|
|
if is_escalated:
|
|
dept_data[key]["escalated"] += 1
|
|
|
|
if c.closed_at and c.created_at:
|
|
hours = (c.closed_at - c.created_at).total_seconds() / 3600
|
|
dept_data[key]["response_hours_sum"] += hours
|
|
dept_data[key]["response_count"] += 1
|
|
|
|
categories = defaultdict(list)
|
|
for (dept_id, dept_name), data in sorted(dept_data.items(), key=lambda x: x[0][1]):
|
|
domain_type = dept_domain.get(dept_id, "medical")
|
|
avg_response = round(data["response_hours_sum"] / data["response_count"], 1) if data["response_count"] > 0 else 0
|
|
categories[domain_type].append({
|
|
"name": dept_name,
|
|
"moh": data["moh"],
|
|
"chi": data["chi"],
|
|
"insurance": data.get("insurance", 0),
|
|
"internal": data["internal"],
|
|
"total": data["total"],
|
|
"escalated": data["escalated"],
|
|
"avg_response_days": round(avg_response / 24, 1),
|
|
})
|
|
|
|
source_totals = {
|
|
"moh": sum(d["moh"] for cats in categories.values() for d in cats),
|
|
"chi": sum(d["chi"] for cats in categories.values() for d in cats),
|
|
"internal": sum(d["internal"] for cats in categories.values() for d in cats),
|
|
"insurance": sum(d["insurance"] for cats in categories.values() for d in cats),
|
|
"total": sum(d["total"] for cats in categories.values() for d in cats),
|
|
}
|
|
|
|
avg_response = {}
|
|
for dtype in ["medical", "non_medical", "nursing", "support_services"]:
|
|
depts = categories.get(dtype, [])
|
|
total_hours = sum(d["avg_response_days"] * 24 * (d["total"] - d.get("_excluded", 0)) for d in depts)
|
|
total_count = sum(d["total"] for d in depts)
|
|
avg_response[dtype] = round(total_hours / (total_count * 24), 1) if total_count > 0 else 0
|
|
|
|
return {
|
|
"categories": dict(categories),
|
|
"source_totals": source_totals,
|
|
"avg_response_days": avg_response,
|
|
}
|
|
|
|
def get_per_dept_response_times(self):
|
|
return self._cached("per_dept_response_times", self._compute_per_dept_response_times)
|
|
|
|
def _compute_per_dept_response_times(self):
|
|
qs = self._base_qs().filter(status__in=["closed", "resolved"])
|
|
dept_times = defaultdict(list)
|
|
dept_domain = {}
|
|
|
|
for c in qs.iterator(chunk_size=2000):
|
|
if not c.department:
|
|
continue
|
|
if not c.closed_at or not c.created_at:
|
|
continue
|
|
|
|
dept_name = c.department.name
|
|
if c.department.category:
|
|
domain_type = c.department.category
|
|
elif c.domain:
|
|
dt = c.domain.domain_type or ""
|
|
if dt == "admin" or dt == "non_medical":
|
|
domain_type = "non_medical"
|
|
elif dt == "nursing":
|
|
domain_type = "nursing"
|
|
elif dt in ("support", "support_services"):
|
|
domain_type = "support_services"
|
|
else:
|
|
domain_type = "medical"
|
|
|
|
dept_domain[dept_name] = domain_type
|
|
hours = (c.closed_at - c.created_at).total_seconds() / 3600
|
|
|
|
is_escalated = c.escalated_at is not None
|
|
status_label = "Escalated" if is_escalated else (
|
|
"Closed" if c.status == "closed" else ""
|
|
)
|
|
|
|
dept_times[dept_name].append({
|
|
"hours": round(hours, 2),
|
|
"days": round(hours / 24, 1),
|
|
"status": status_label,
|
|
})
|
|
|
|
categories = defaultdict(dict)
|
|
for dept_name, times in dept_times.items():
|
|
domain_type = dept_domain.get(dept_name, "medical")
|
|
categories[domain_type][dept_name] = times
|
|
|
|
return dict(categories)
|
|
|
|
def get_all_complaints_raw(self):
|
|
return self._cached("all_raw", self._compute_all_raw)
|
|
|
|
def _compute_all_raw(self):
|
|
qs = self._base_qs().order_by("created_at")
|
|
rows = []
|
|
for c in qs.iterator(chunk_size=2000):
|
|
source_label = c.complaint_source_type
|
|
if c.moh_reference:
|
|
source_label = "MOH"
|
|
elif c.chi_reference:
|
|
source_label = "CHI"
|
|
elif c.source and c.source.name_en:
|
|
sn = c.source.name_en.lower()
|
|
if "insurance" in sn:
|
|
source_label = "Insurance"
|
|
elif "patient" in sn or "walk" in sn:
|
|
source_label = "Patient"
|
|
elif "relative" in sn:
|
|
source_label = "Patient's relatives"
|
|
|
|
location_label = ""
|
|
if c.location and c.location.name_en:
|
|
location_label = c.location.name_en
|
|
|
|
dept_label = c.department.name if c.department else ""
|
|
domain_label = c.domain.name_en if c.domain else ""
|
|
|
|
hours_to_bucket = ""
|
|
if c.closed_at and c.created_at:
|
|
h = (c.closed_at - c.created_at).total_seconds() / 3600
|
|
if h <= 24:
|
|
hours_to_bucket = "24 Hours"
|
|
elif h <= 48:
|
|
hours_to_bucket = "48 Hours"
|
|
elif h <= 72:
|
|
hours_to_bucket = "72 Hours"
|
|
else:
|
|
hours_to_bucket = "More than 72 Hours"
|
|
|
|
created_by_name = ""
|
|
if c.created_by:
|
|
created_by_name = c.created_by.get_full_name() or str(c.created_by)
|
|
assigned_to_name = ""
|
|
if c.assigned_to:
|
|
assigned_to_name = c.assigned_to.get_full_name() or str(c.assigned_to)
|
|
|
|
processing_time = None
|
|
if c.resolved_at and c.activated_at:
|
|
processing_time = (c.resolved_at - c.activated_at).total_seconds() / 3600
|
|
|
|
rows.append({
|
|
"reference": c.reference_number or "",
|
|
"source": source_label,
|
|
"location": location_label,
|
|
"department": dept_label,
|
|
"domain": domain_label,
|
|
"created_at": c.created_at,
|
|
"entered_by": created_by_name,
|
|
"timeline_bucket": hours_to_bucket,
|
|
"form_sent_at": c.form_sent_at,
|
|
"activated_at": c.activated_at,
|
|
"assigned_to": assigned_to_name,
|
|
"sent_at": c.forwarded_to_dept_at,
|
|
"time_entry_to_send": (
|
|
(c.forwarded_to_dept_at - c.created_at).total_seconds() / 3600
|
|
if c.forwarded_to_dept_at and c.created_at else None
|
|
),
|
|
"first_reminder": c.reminder_sent_at,
|
|
"time_first_reminder_to_send": (
|
|
(c.reminder_sent_at - c.forwarded_to_dept_at).total_seconds() / 3600
|
|
if c.reminder_sent_at and c.forwarded_to_dept_at else None
|
|
),
|
|
"second_reminder": c.second_reminder_sent_at,
|
|
"escalated_at": c.escalated_at,
|
|
"closed_at": c.closed_at,
|
|
"resolved_at": c.resolved_at,
|
|
"response_date": c.response_date,
|
|
"processing_hours": round(processing_time, 2) if processing_time else None,
|
|
"status": c.status,
|
|
"satisfaction": c.satisfaction or "",
|
|
})
|
|
return rows
|
|
|
|
def get_moh_kpi_data(self):
|
|
return self._cached("moh_kpi", self._compute_moh_kpi)
|
|
|
|
def _compute_moh_kpi(self):
|
|
moh_months = {}
|
|
real_months = {}
|
|
for m in self.active_months:
|
|
qs = self._month_qs(m).filter(
|
|
Q(source__code__iexact="moh") | Q(moh_reference__gt="")
|
|
)
|
|
total = qs.count()
|
|
satisfied_scoring = qs.filter(satisfaction="satisfied").count()
|
|
moh_months[m] = {
|
|
"satisfied_scoring": satisfied_scoring,
|
|
"total": total,
|
|
"rate": round(satisfied_scoring / total, 3) if total > 0 else 0,
|
|
}
|
|
|
|
real_qs = qs.filter(satisfaction__isnull=False).exclude(satisfaction="")
|
|
real_resp = real_qs.count()
|
|
real_satisfied = real_qs.filter(satisfaction="satisfied").count()
|
|
real_months[m] = {
|
|
"satisfied_scoring": real_satisfied,
|
|
"total": total,
|
|
"rate": round(real_satisfied / total, 3) if total > 0 else 0,
|
|
}
|
|
|
|
moh_total = sum(v["total"] for v in moh_months.values())
|
|
moh_satisfied = sum(v["satisfied_scoring"] for v in moh_months.values())
|
|
real_satisfied_total = sum(v["satisfied_scoring"] for v in real_months.values())
|
|
|
|
return {
|
|
"moh": {
|
|
"months": moh_months,
|
|
"totals": {
|
|
"satisfied_scoring": moh_satisfied,
|
|
"total": moh_total,
|
|
"rate": round(moh_satisfied / moh_total, 3) if moh_total > 0 else 0,
|
|
},
|
|
},
|
|
"real": {
|
|
"months": real_months,
|
|
"totals": {
|
|
"satisfied_scoring": real_satisfied_total,
|
|
"total": moh_total,
|
|
"rate": round(real_satisfied_total / moh_total, 3) if moh_total > 0 else 0,
|
|
},
|
|
},
|
|
}
|
|
|
|
def get_location_ratios(self):
|
|
return self._cached("location_ratios", self._compute_location_ratios)
|
|
|
|
def _compute_location_ratios(self):
|
|
from apps.dashboard.services.census import CensusService, PATIENT_TYPE_MAP
|
|
|
|
location = self.get_location_breakdown()
|
|
census = CensusService(hospital_id=self.hospital_id, year=self.year)
|
|
monthly_visits = census.get_monthly_counts(self.year)
|
|
|
|
result = {}
|
|
for m in self.active_months:
|
|
visits = monthly_visits.get(m, {"OPD": 0, "ER": 0, "IP": 0})
|
|
loc = location[m]
|
|
result[m] = {
|
|
"ip_complaints": loc["ip"],
|
|
"ip_patients": visits["IP"],
|
|
"ip_ratio": round(loc["ip"] / visits["IP"], 6) if visits["IP"] > 0 else 0,
|
|
"op_complaints": loc["op"],
|
|
"op_patients": visits["OPD"],
|
|
"op_ratio": round(loc["op"] / visits["OPD"], 6) if visits["OPD"] > 0 else 0,
|
|
"er_complaints": loc["er"],
|
|
"er_patients": visits["ER"],
|
|
"er_ratio": round(loc["er"] / visits["ER"], 6) if visits["ER"] > 0 else 0,
|
|
}
|
|
|
|
totals = {}
|
|
for area in ["ip", "op", "er"]:
|
|
total_comp = sum(result[m][f"{area}_complaints"] for m in self.active_months)
|
|
total_pat = sum(result[m][f"{area}_patients"] for m in self.active_months)
|
|
totals[area] = {
|
|
"complaints": total_comp,
|
|
"patients": total_pat,
|
|
"ratio": round(total_comp / total_pat, 6) if total_pat > 0 else 0,
|
|
}
|
|
|
|
return {"months": result, "totals": totals}
|
|
|
|
def get_quarterly_volumes(self):
|
|
return self._cached("quarterly_volumes", self._compute_quarterly_volumes)
|
|
|
|
def _compute_quarterly_volumes(self):
|
|
from apps.dashboard.services.census import CensusService
|
|
|
|
census = CensusService(hospital_id=self.hospital_id, year=self.year)
|
|
return census.get_quarterly_data(self.year)
|
|
|
|
def get_source_totals(self):
|
|
return self._cached("source_totals", self._compute_source_totals)
|
|
|
|
def _compute_source_totals(self):
|
|
source = self.get_source_breakdown()
|
|
total_all = sum(s["total"] for s in source.values())
|
|
moh = sum(s["moh"] for s in source.values())
|
|
chi = sum(s["chi"] for s in source.values())
|
|
insurance = sum(s["insurance"] for s in source.values())
|
|
internal = sum(s["internal"] for s in source.values())
|
|
external = sum(s["external"] for s in source.values())
|
|
return {
|
|
"internal": {"count": internal, "pct": round(internal / total_all, 3) if total_all else 0},
|
|
"external": {"count": external, "pct": round(external / total_all, 3) if total_all else 0},
|
|
"moh": {"count": moh, "pct": round(moh / total_all, 3) if total_all else 0},
|
|
"chi": {"count": chi, "pct": round(chi / total_all, 3) if total_all else 0},
|
|
"insurance": {"count": insurance, "pct": round(insurance / total_all, 3) if total_all else 0},
|
|
"total": total_all,
|
|
}
|
|
|
|
def get_dept_type_monthly_table(self):
|
|
return self._cached("dept_type_table", self._compute_dept_type_table)
|
|
|
|
def _compute_dept_type_table(self):
|
|
dept = self.get_dept_type_breakdown()
|
|
result = []
|
|
for m in self.active_months:
|
|
d = dept[m]
|
|
result.append({
|
|
"month": calendar.month_abbr[m],
|
|
"medical": d["medical"],
|
|
"admin": d["admin"],
|
|
"nursing": d["nursing"],
|
|
"support": d["support"],
|
|
"total": d["total"],
|
|
})
|
|
totals = {
|
|
"medical": sum(r["medical"] for r in result),
|
|
"admin": sum(r["admin"] for r in result),
|
|
"nursing": sum(r["nursing"] for r in result),
|
|
"support": sum(r["support"] for r in result),
|
|
"total": sum(r["total"] for r in result),
|
|
}
|
|
total_all = totals["total"]
|
|
pcts = {k: round(v / total_all, 3) if total_all else 0 for k, v in totals.items() if k != "total"}
|
|
pcts["total"] = 1
|
|
return {"months": result, "totals": totals, "percentages": pcts}
|
|
|
|
def get_chart_data(self):
|
|
kpi = self.get_kpi_data()
|
|
source = self.get_source_breakdown()
|
|
location = self.get_location_breakdown()
|
|
dept_type = self.get_dept_type_breakdown()
|
|
satisfaction = self.get_satisfaction_data()
|
|
|
|
return {
|
|
"kpi_resolution": {
|
|
"months": self.active_month_labels,
|
|
"values": [kpi["months"][m]["resolution_rate"] for m in self.active_months],
|
|
"target": 0.95,
|
|
"threshold": 0.9,
|
|
},
|
|
"kpi_72h": {
|
|
"months": self.active_month_labels,
|
|
"values": [kpi["months"][m]["resolved_72h_rate"] for m in self.active_months],
|
|
},
|
|
"monthly_totals": {
|
|
"months": self.active_month_labels,
|
|
"values": [kpi["months"][m]["total"] for m in self.active_months],
|
|
},
|
|
"source_distribution": {
|
|
"external": sum(s["external"] for s in source.values()),
|
|
"internal": sum(s["internal"] for s in source.values()),
|
|
},
|
|
"location_distribution": {
|
|
"IP": sum(l["ip"] for l in location.values()),
|
|
"OP": sum(l["op"] for l in location.values()),
|
|
"ER": sum(l["er"] for l in location.values()),
|
|
},
|
|
"dept_type_distribution": {
|
|
"Medical": sum(d["medical"] for d in dept_type.values()),
|
|
"Admin": sum(d["admin"] for d in dept_type.values()),
|
|
"Nursing": sum(d["nursing"] for d in dept_type.values()),
|
|
"Support": sum(d["support"] for d in dept_type.values()),
|
|
},
|
|
"satisfaction": {
|
|
"months": self.active_month_labels,
|
|
"values": [satisfaction["months"][m]["rate"] for m in self.active_months],
|
|
},
|
|
}
|