HH/apps/dashboard/services/standards_report_service.py
ismail c5f76b3855
Some checks are pending
Build and Push Docker Image / build (push) Waiting to run
updates
2026-05-11 14:45:30 +03:00

226 lines
9.1 KiB
Python

import json
from collections import defaultdict
from decimal import Decimal
from django.db.models import Count, Q, Sum, Case, When, Value, DecimalField
from apps.standards.models import StandardSource, StandardCategory, Standard, StandardCompliance
class StandardsReportService:
    """Build standards-compliance report data for a single hospital.

    Every query is scoped to ``hospital_id``. When ``source_code`` is set,
    results are additionally restricted to standards whose source has that
    code.
    """

    def __init__(self, hospital_id, source_code=None):
        """Store the report scope.

        Args:
            hospital_id: Value matched against ``StandardCompliance.hospital_id``.
            source_code: Optional ``StandardSource.code`` used to narrow results.
        """
        self.hospital_id = hospital_id
        self.source_code = source_code

    # ------------------------------------------------------------------
    # Pure helpers (no database access)
    # ------------------------------------------------------------------

    @staticmethod
    def _percentage(part, total):
        """Return ``part / total`` as a percentage rounded to one decimal.

        Returns ``0`` when ``total`` is falsy so callers never divide by zero.
        """
        return round(part / total * 100, 1) if total else 0

    @staticmethod
    def _to_float(value):
        """Convert ``value`` to ``float``, mapping only ``None`` to ``None``.

        A truthiness test would also turn a legitimate score of 0 into
        ``None``, so the check is explicitly ``is None``.
        """
        return None if value is None else float(value)

    @staticmethod
    def _resolve_max_score(total_max, category_max):
        """Pick the maximum score for a category as a ``float``.

        Prefers the aggregated ``total_max``; falls back to the category's own
        ``category_max`` and finally to ``0`` when both are missing or zero.
        """
        return float(total_max or category_max or 0)

    @staticmethod
    def _display_name(obj):
        """Return ``obj.name_ar`` if set, else ``obj.name``; ``""`` for ``None``.

        NOTE(review): the ``None`` guard is defensive; confirm whether
        ``Standard.category`` can actually be null.
        """
        if obj is None:
            return ""
        return obj.name_ar or obj.name

    # ------------------------------------------------------------------
    # Shared query construction
    # ------------------------------------------------------------------

    def _assessable_compliance(self, **extra_filters):
        """Return this hospital's compliance rows for assessable, non-heading standards.

        Applies the ``source_code`` restriction when one is configured.
        ``extra_filters`` are passed through to ``filter()`` unchanged.
        """
        qs = StandardCompliance.objects.filter(
            hospital_id=self.hospital_id,
            standard__is_heading=False,
            standard__is_assessable=True,
            **extra_filters,
        )
        if self.source_code:
            qs = qs.filter(standard__source__code=self.source_code)
        return qs

    # ------------------------------------------------------------------
    # Lookups
    # ------------------------------------------------------------------

    def get_sources(self):
        """Return active standard sources as a list of dicts.

        Each dict has the keys ``id``, ``name``, ``name_ar`` and ``code``.
        """
        return list(
            StandardSource.objects.filter(is_active=True).values(
                "id", "name", "name_ar", "code"
            )
        )

    def get_categories(self, source_code=None):
        """Return categories that have a source, as a list of dicts.

        Args:
            source_code: Overrides the instance's ``source_code`` when truthy.

        Each dict has the keys ``id``, ``name``, ``name_ar``, ``source__code``,
        ``max_score`` and ``order``.
        """
        sc = source_code or self.source_code
        qs = StandardCategory.objects.filter(source__isnull=False)
        if sc:
            qs = qs.filter(source__code=sc)
        return list(
            qs.values("id", "name", "name_ar", "source__code", "max_score", "order")
        )

    # ------------------------------------------------------------------
    # Aggregated report data
    # ------------------------------------------------------------------

    def get_summary(self):
        """Return counts and percentages of compliance rows per status.

        The returned dict always carries the same keys (``total``, the five
        status counts and four ``*_pct`` values), including when there are no
        rows. Percentages use ``total`` as the denominator, which counts rows
        of every status, ``not_applicable`` included.
        """
        status_rows = (
            self._assessable_compliance()
            .values("status")
            .annotate(cnt=Count("id"))
            # Clear any ordering so it cannot leak into the GROUP BY.
            .order_by()
        )
        status_map = {row["status"]: row["cnt"] for row in status_rows}
        total = sum(status_map.values())

        met = status_map.get("met", 0)
        partially_met = status_map.get("partially_met", 0)
        not_met = status_map.get("not_met", 0)
        not_assessed = status_map.get("not_assessed", 0)
        not_applicable = status_map.get("not_applicable", 0)

        return {
            "total": total,
            "met": met,
            "partially_met": partially_met,
            "not_met": not_met,
            "not_assessed": not_assessed,
            "not_applicable": not_applicable,
            "met_pct": self._percentage(met, total),
            "partially_met_pct": self._percentage(partially_met, total),
            "not_met_pct": self._percentage(not_met, total),
            "not_assessed_pct": self._percentage(not_assessed, total),
        }

    def get_category_breakdown(self):
        """Return per-category status counts and scores.

        Categories are ordered by ``order`` then ``name``; categories without
        any matching compliance row are omitted. Score totals only consider
        rows whose ``score`` is not null.
        """
        scored = Q(score__isnull=False)
        stats_rows = (
            self._assessable_compliance()
            .values("standard__category")
            .annotate(
                n_total=Count("id"),
                n_met=Count("id", filter=Q(status="met")),
                n_partially_met=Count("id", filter=Q(status="partially_met")),
                n_not_met=Count("id", filter=Q(status="not_met")),
                score_sum=Sum("score", filter=scored),
                max_score_sum=Sum("max_score", filter=scored),
            )
            # Clear any ordering so it cannot leak into the GROUP BY.
            .order_by()
        )
        # One grouped query instead of several queries per category.
        stats_by_category = {row["standard__category"]: row for row in stats_rows}

        categories = StandardCategory.objects.filter(source__isnull=False)
        if self.source_code:
            categories = categories.filter(source__code=self.source_code)

        result = []
        for cat in categories.order_by("order", "name"):
            stats = stats_by_category.get(cat.id)
            if not stats or not stats["n_total"]:
                continue

            total = stats["n_total"]
            met = stats["n_met"]
            score = float(stats["score_sum"] or 0)
            # Aggregated maximum wins; the category's own max_score is only a
            # fallback when no scored rows contributed a maximum.
            max_score = self._resolve_max_score(stats["max_score_sum"], cat.max_score)

            result.append({
                "id": str(cat.id),
                "name": cat.name,
                "name_ar": cat.name_ar,
                "total": total,
                "met": met,
                "partially_met": stats["n_partially_met"],
                "not_met": stats["n_not_met"],
                "met_pct": self._percentage(met, total),
                "score": score,
                "max_score": max_score,
                "score_pct": self._percentage(score, max_score),
            })
        return result

    def get_score_gauge_data(self):
        """Return the overall score, maximum score and percentage.

        Only rows with a non-null ``score`` are considered. Returns ``None``
        when no such row exists.
        """
        compliance_qs = self._assessable_compliance(score__isnull=False)
        if not compliance_qs.exists():
            return None

        agg = compliance_qs.aggregate(
            total_score=Sum("score"),
            total_max=Sum("max_score"),
        )
        total_score = float(agg["total_score"] or 0)
        total_max = float(agg["total_max"] or 0)
        return {
            "score": total_score,
            "max_score": total_max,
            "pct": self._percentage(total_score, total_max),
        }

    # ------------------------------------------------------------------
    # Chart payloads
    # ------------------------------------------------------------------

    def get_status_distribution_chart(self):
        """Return labels, series and colors for the status distribution chart.

        The series is built from ``get_summary()`` in the order met,
        partially met, not met, not assessed.
        """
        summary = self.get_summary()
        return {
            "labels": ["Met", "Partially Met", "Not Met", "Not Assessed"],
            "labels_ar": ["محقق", "محقق جزئياً", "غير محقق", "لم يتم التقييم"],
            "series": [
                summary["met"],
                summary["partially_met"],
                summary["not_met"],
                summary["not_assessed"],
            ],
            "colors": ["#10b981", "#f59e0b", "#ef4444", "#94a3b8"],
        }

    def get_category_compliance_chart(self):
        """Return per-category chart data, or ``None`` when there is no breakdown.

        Category labels use ``name_ar`` when set, otherwise ``name``.
        """
        breakdown = self.get_category_breakdown()
        if not breakdown:
            return None
        return {
            "categories": [b["name_ar"] or b["name"] for b in breakdown],
            "met_pct": [b["met_pct"] for b in breakdown],
            "series": [
                {"name": "Met", "data": [b["met"] for b in breakdown]},
                {"name": "Partially Met", "data": [b["partially_met"] for b in breakdown]},
                {"name": "Not Met", "data": [b["not_met"] for b in breakdown]},
            ],
        }

    def get_all_chart_data(self):
        """Return all chart payloads serialized as one JSON string."""
        return json.dumps({
            "status_distribution": self.get_status_distribution_chart(),
            "category_compliance": self.get_category_compliance_chart(),
            "score_gauge": self.get_score_gauge_data(),
        })

    # ------------------------------------------------------------------
    # Tabular data
    # ------------------------------------------------------------------

    def get_corrective_actions(self):
        """Return compliance rows that have a non-empty corrective action.

        Rows are ordered by descending ``priority`` and then by standard code.
        """
        qs = StandardCompliance.objects.filter(
            hospital_id=self.hospital_id,
            # "greater than the empty string" selects rows with non-empty text.
            corrective_action__gt="",
        ).select_related("standard", "standard__category", "standard__source")
        if self.source_code:
            qs = qs.filter(standard__source__code=self.source_code)

        return [
            {
                "code": c.standard.code,
                "title": c.standard.title_ar or c.standard.title,
                "category": self._display_name(c.standard.category),
                "status": c.get_status_display(),
                "status_ar": c.status_ar,
                "corrective_action": c.corrective_action,
                "priority": c.get_priority_display() if c.priority else "",
                "priority_raw": c.priority,
                "target_date": c.target_date,
            }
            for c in qs.order_by("-priority", "standard__code")
        ]

    def get_standards_table(self, source_code=None, category_id=None):
        """Return one row per active, assessable standard that has a compliance record.

        Args:
            source_code: Overrides the instance's ``source_code`` when truthy.
            category_id: Optional category id to restrict the table to.

        Standards without a compliance record for this hospital are skipped.
        ``score`` and ``max_score`` are ``None`` only when the stored value is
        null; a stored zero is reported as ``0.0``.
        """
        sc = source_code or self.source_code
        compliance_qs = StandardCompliance.objects.filter(
            hospital_id=self.hospital_id,
        ).select_related("standard", "standard__category", "standard__source")
        standards_qs = Standard.objects.filter(
            is_heading=False,
            is_assessable=True,
            is_active=True,
        ).select_related("source", "category", "parent_standard")

        if sc:
            standards_qs = standards_qs.filter(source__code=sc)
            compliance_qs = compliance_qs.filter(standard__source__code=sc)
        if category_id:
            standards_qs = standards_qs.filter(category_id=category_id)
            compliance_qs = compliance_qs.filter(standard__category_id=category_id)

        compliance_map = {c.standard_id: c for c in compliance_qs}

        result = []
        ordered = standards_qs.order_by(
            "source", "category", "order_within_category", "code"
        )
        for s in ordered:
            comp = compliance_map.get(s.id)
            if comp is None:
                continue
            result.append({
                "id": str(comp.id),
                "code": s.code,
                "title": s.title_ar or s.title,
                "category": self._display_name(s.category),
                "source": s.source.name,
                "status": comp.get_status_display(),
                "status_ar": comp.status_ar,
                "status_raw": comp.status,
                "assessment_method": (
                    comp.standard.assessment_method_ar
                    or comp.standard.get_assessment_method_display()
                ),
                "recommendations": comp.recommendations,
                "score": self._to_float(comp.score),
                "max_score": self._to_float(comp.max_score),
            })
        return result