HH/apps/integrations/services/his_adapter.py
2026-03-28 14:03:56 +03:00

915 lines
34 KiB
Python

"""
HIS Adapter Service - Transforms real HIS data format to internal format
This service handles the transformation of HIS patient data into PX360's
internal format for sending surveys based on PatientType.
Simplified Flow:
1. Parse HIS patient data
2. Determine survey type from PatientType
3. Create/update HISPatientVisit record
4. Create patient record (always)
5. Create survey instance with PENDING status (only when visit complete)
6. Queue delayed send task
7. Survey sent after delay (e.g., 1 hour for OPD)
"""
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
import logging
from django.utils import timezone
from apps.organizations.models import Hospital, Patient, Staff
from apps.surveys.models import SurveyTemplate, SurveyInstance, SurveyStatus
from apps.integrations.models import InboundEvent, HISPatientVisit
logger = logging.getLogger(__name__)
PATIENT_TYPE_TO_VISIT_LIST = {
"ED": "FetchPatientDataTimeStampVisitEDDataList",
"IP": "FetchPatientDataTimeStampVisitIPDataList",
"OP": "FetchPatientDataTimeStampVisitOPDataList",
}
class HISAdapter:
"""
Adapter for transforming HIS patient data format to internal format.
HIS Data Structure:
{
"FetchPatientDataTimeStampList": [{...patient demographics...}],
"FetchPatientDataTimeStampVisitEDDataList": [...],
"FetchPatientDataTimeStampVisitIPDataList": [...],
"FetchPatientDataTimeStampVisitOPDataList": [...],
"Code": 200,
"Status": "Success"
}
PatientType Codes:
- "1" -> Inpatient
- "2" or "O" -> OPD (Outpatient)
- "3" or "E" -> EMS (Emergency)
"""
@staticmethod
def parse_date(date_str: Optional[str]) -> Optional[datetime]:
"""Parse HIS date format 'DD-Mon-YYYY HH:MM' to timezone-aware datetime"""
if not date_str:
return None
try:
naive_dt = datetime.strptime(date_str, "%d-%b-%Y %H:%M")
return timezone.make_aware(naive_dt)
except ValueError:
return None
@staticmethod
def map_patient_type_to_survey_type(patient_type: str) -> str:
"""
Map HIS PatientType string to survey type name.
Based on HIS sample data:
- "IP" -> Inpatient
- "OP" -> Outpatient
- "ED" -> Emergency
Returns survey type name for template lookup.
"""
patient_type_upper = patient_type.upper() if patient_type else ""
if patient_type_upper == "IP":
return "INPATIENT"
elif patient_type_upper == "OP":
return "OPD"
elif patient_type_upper == "ED":
return "EMS"
elif patient_type_upper == "DAYCASE":
return "DAYCASE"
elif patient_type_upper == "APPOINTMENT":
return "APPOINTMENT"
else:
return "OPD"
@staticmethod
def split_patient_name(full_name: str) -> Tuple[str, str]:
"""Split patient name into first and last name"""
parts = full_name.strip().split()
if len(parts) == 1:
return parts[0], ""
elif len(parts) == 2:
return parts[0], parts[1]
else:
return parts[0], " ".join(parts[1:])
@staticmethod
def extract_patient_visits(his_data: Dict, patient_id: str, patient_type: str) -> List[Dict]:
"""
Extract visit events for a specific patient from the appropriate sublist.
Uses PatientID to match and PatientType to determine which sublist to use:
- "ED" -> FetchPatientDataTimeStampVisitEDDataList
- "IP" -> FetchPatientDataTimeStampVisitIPDataList
- "OP" -> FetchPatientDataTimeStampVisitOPDataList
Args:
his_data: Full HIS response dict
patient_id: HIS PatientID to filter by
patient_type: HIS PatientType (ED, IP, OP)
Returns:
List of visit event dicts for this patient, sorted by BillDate
"""
list_key = PATIENT_TYPE_TO_VISIT_LIST.get(patient_type.upper())
if not list_key:
logger.warning(f"Unknown patient type '{patient_type}', no visit list found")
return []
all_visits = his_data.get(list_key, [])
patient_visits = []
for visit in all_visits:
if visit.get("PatientID") == str(patient_id):
visit_event = {
"type": visit.get("Type", ""),
"bill_date": visit.get("BillDate", ""),
"patient_type": visit.get("PatientType", patient_type),
"visit_category": patient_type,
"admission_id": visit.get("AdmissionID", ""),
"patient_id": visit.get("PatientID", ""),
"reg_code": visit.get("RegCode", ""),
"ssn": visit.get("SSN", ""),
"mobile_no": visit.get("MobileNo", ""),
}
if visit_event["bill_date"]:
parsed_date = HISAdapter.parse_date(visit_event["bill_date"])
visit_event["parsed_date"] = parsed_date.isoformat() if parsed_date else None
visit_event["sort_key"] = parsed_date if parsed_date else datetime.min
else:
visit_event["parsed_date"] = None
visit_event["sort_key"] = datetime.min
patient_visits.append(visit_event)
patient_visits.sort(key=lambda x: x.get("sort_key", datetime.min))
for visit in patient_visits:
visit.pop("sort_key", None)
return patient_visits
@staticmethod
def extract_visit_timeline(his_data: Dict) -> List[Dict]:
"""
Extract and sort visit timeline from HIS data (all patients).
Combines visits from all 3 lists and sorts by BillDate.
Used for backward compatibility with the webhook flow.
"""
all_visits = []
visit_lists = [
("ED", his_data.get("FetchPatientDataTimeStampVisitEDDataList", [])),
("IP", his_data.get("FetchPatientDataTimeStampVisitIPDataList", [])),
("OP", his_data.get("FetchPatientDataTimeStampVisitOPDataList", [])),
]
for visit_type, visits in visit_lists:
for visit in visits:
visit_event = {
"type": visit.get("Type", ""),
"bill_date": visit.get("BillDate", ""),
"patient_type": visit.get("PatientType", visit_type),
"visit_category": visit_type,
"admission_id": visit.get("AdmissionID", ""),
"patient_id": visit.get("PatientID", ""),
"reg_code": visit.get("RegCode", ""),
"ssn": visit.get("SSN", ""),
"mobile_no": visit.get("MobileNo", ""),
}
if visit_event["bill_date"]:
parsed_date = HISAdapter.parse_date(visit_event["bill_date"])
visit_event["parsed_date"] = parsed_date.isoformat() if parsed_date else None
visit_event["sort_key"] = parsed_date if parsed_date else datetime.min
all_visits.append(visit_event)
all_visits.sort(key=lambda x: x.get("sort_key", datetime.min))
for visit in all_visits:
visit.pop("sort_key", None)
return all_visits
@staticmethod
def get_or_create_hospital(hospital_data: Dict) -> Optional[Hospital]:
"""Get or create hospital from HIS data"""
hospital_name = hospital_data.get("HospitalName")
hospital_id = hospital_data.get("HospitalID")
if not hospital_name:
return None
hospital = Hospital.objects.filter(name__icontains=hospital_name).first()
if hospital:
return hospital
hospital_code = hospital_id if hospital_id else f"HOSP-{hospital_name[:3].upper()}"
hospital, created = Hospital.objects.get_or_create(
code=hospital_code, defaults={"name": hospital_name, "status": "active"}
)
return hospital
@staticmethod
def get_or_create_patient(patient_data: Dict, hospital: Hospital) -> Patient:
"""Get or create patient from HIS demographic data"""
patient_id = patient_data.get("PatientID")
mrn = patient_id
national_id = patient_data.get("SSN")
phone = patient_data.get("MobileNo")
email = patient_data.get("Email")
full_name = patient_data.get("PatientName")
nationality = patient_data.get("PatientNationality", "")
first_name, last_name = HISAdapter.split_patient_name(full_name)
dob_str = patient_data.get("DOB")
date_of_birth = HISAdapter.parse_date(dob_str) if dob_str else None
gender = patient_data.get("Gender", "").lower()
patient = Patient.objects.filter(mrn=mrn, primary_hospital=hospital).first()
if patient:
patient.first_name = first_name
patient.last_name = last_name
patient.national_id = national_id
patient.phone = phone
if email is not None:
patient.email = email
patient.date_of_birth = date_of_birth
patient.gender = gender
patient.nationality = nationality
patient.save()
return patient
mrn_taken = Patient.objects.filter(mrn=mrn).exists()
if mrn_taken and national_id:
patient = Patient.objects.filter(national_id=national_id).first()
if patient:
patient.mrn = mrn
patient.primary_hospital = hospital
patient.first_name = first_name
patient.last_name = last_name
patient.phone = phone
if email is not None:
patient.email = email
patient.date_of_birth = date_of_birth
patient.gender = gender
patient.nationality = nationality
patient.save()
return patient
if mrn_taken:
unique_mrn = f"{mrn}_{hospital.id}"
logger.warning(f"MRN collision for {mrn}, using {unique_mrn}")
mrn = unique_mrn
patient = Patient.objects.create(
mrn=mrn,
primary_hospital=hospital,
first_name=first_name,
last_name=last_name,
national_id=national_id,
phone=phone,
email=email if email else "",
date_of_birth=date_of_birth,
gender=gender,
nationality=nationality,
)
return patient
@staticmethod
def get_survey_template(patient_type: str, hospital: Hospital) -> Optional[SurveyTemplate]:
"""
Get appropriate survey template based on PatientType using explicit mapping.
"""
from apps.integrations.models import SurveyTemplateMapping
survey_template = SurveyTemplateMapping.get_template_for_patient_type(patient_type, hospital)
return survey_template
@staticmethod
def get_delay_for_patient_type(patient_type: str, hospital) -> int:
"""
Get delay hours from SurveyTemplateMapping.
Falls back to default delays if no mapping found.
"""
from apps.integrations.models import SurveyTemplateMapping
mapping = SurveyTemplateMapping.objects.filter(
patient_type=patient_type, hospital=hospital, is_active=True
).first()
if mapping and mapping.send_delay_hours:
return mapping.send_delay_hours
mapping = SurveyTemplateMapping.objects.filter(
patient_type=patient_type, hospital__isnull=True, is_active=True
).first()
if mapping and mapping.send_delay_hours:
return mapping.send_delay_hours
default_delays = {
"IP": 24,
"OP": 1,
"ED": 2,
"DAYCASE": 4,
}
return default_delays.get(patient_type, 1)
@staticmethod
def is_op_visit_complete(
visit_timeline: List[Dict], patient_type: str, hospital
) -> Tuple[bool, Optional[datetime]]:
"""
Check if an OP visit is complete by checking if the last visit event
is older than the configured send_delay_hours.
Args:
visit_timeline: List of visit events for this patient
patient_type: HIS PatientType code
hospital: Hospital instance
Returns:
Tuple of (is_complete: bool, last_visit_date: datetime or None)
"""
if not visit_timeline:
return False, None
last_visit_date = None
for event in visit_timeline:
if event.get("bill_date"):
parsed = HISAdapter.parse_date(event["bill_date"])
if parsed and (last_visit_date is None or parsed > last_visit_date):
last_visit_date = parsed
if not last_visit_date:
return False, None
delay_hours = HISAdapter.get_delay_for_patient_type(patient_type, hospital)
cutoff = timezone.now() - timedelta(hours=delay_hours)
is_complete = last_visit_date <= cutoff
return is_complete, last_visit_date
@staticmethod
def save_patient_visit(
patient: Patient,
hospital: Hospital,
patient_data: Dict,
visit_timeline: List[Dict],
is_visit_complete: bool = False,
discharge_date: Optional[datetime] = None,
effective_discharge_date: Optional[datetime] = None,
) -> HISPatientVisit:
"""
Create or update HISPatientVisit record.
This is called on every fetch for every patient, regardless of
whether the visit is complete.
"""
admission_id = patient_data.get("AdmissionID", "")
patient_id_his = str(patient_data.get("PatientID", ""))
patient_type = patient_data.get("PatientType", "")
reg_code = patient_data.get("RegCode", "")
admit_date_str = patient_data.get("AdmitDate")
admit_date = HISAdapter.parse_date(admit_date_str) if admit_date_str else None
is_vip = str(patient_data.get("IsVIP", "0")).strip() == "1"
visit, created = HISPatientVisit.objects.update_or_create(
admission_id=admission_id,
defaults={
"patient": patient,
"hospital": hospital,
"reg_code": reg_code,
"patient_id_his": patient_id_his,
"patient_type": patient_type,
"admit_date": admit_date,
"discharge_date": discharge_date,
"effective_discharge_date": effective_discharge_date,
"visit_data": patient_data,
"visit_timeline": visit_timeline,
"primary_doctor": patient_data.get("PrimaryDoctor", ""),
"consultant_id": patient_data.get("ConsultantID", ""),
"company_name": patient_data.get("CompanyName", ""),
"grade_name": patient_data.get("GradeName", ""),
"insurance_company_name": patient_data.get("InsuranceCompanyName", ""),
"bill_type": patient_data.get("BillType", ""),
"is_vip": is_vip,
"nationality": patient_data.get("PatientNationality", ""),
"is_visit_complete": is_visit_complete,
"last_his_fetch_at": timezone.now(),
},
)
HISAdapter._resolve_staff_fks(visit, patient_data)
HISAdapter._sync_visit_events(visit, visit_timeline)
action = "Created" if created else "Updated"
logger.info(
f"{action} HISPatientVisit: {admission_id} type={patient_type} complete={is_visit_complete} patient={patient}"
)
return visit
@staticmethod
def _resolve_staff_fks(visit: HISPatientVisit, patient_data: Dict) -> None:
"""
Match HIS doctor/consultant IDs to Staff records via employee_id.
HIS formats:
- ConsultantID: "11065" (numeric string only, no name)
- PrimaryDoctor: "16468-HEBA ELSHABOURY ABDELATTY" (ID prefix + dash + name)
PrimaryDoctor: get_or_create on employee_id (auto-creates Staff from HIS name).
ConsultantID: lookup only (no creation — no name available from HIS).
Uses .update() to avoid extra queries or save signals.
"""
updates = {}
consultant_raw = patient_data.get("ConsultantID", "").strip()
if consultant_raw and consultant_raw.isdigit():
consultant = Staff.objects.filter(employee_id=consultant_raw).first()
if consultant:
updates["consultant_fk"] = consultant
primary_doctor_raw = patient_data.get("PrimaryDoctor", "").strip()
if primary_doctor_raw and "-" in primary_doctor_raw:
doctor_code, doctor_name = primary_doctor_raw.split("-", 1)
doctor_code = doctor_code.strip()
doctor_name = doctor_name.strip()
if doctor_code.isdigit() and doctor_name:
defaults = HISAdapter._staff_defaults_from_name(doctor_name, visit.hospital)
doctor, created = Staff.objects.get_or_create(
employee_id=doctor_code,
defaults=defaults,
)
if created:
logger.info(f"Auto-created Staff from HIS: {doctor} (employee_id={doctor_code})")
updates["primary_doctor_fk"] = doctor
if updates:
HISPatientVisit.objects.filter(pk=visit.pk).update(**updates)
@staticmethod
def _staff_defaults_from_name(full_name: str, hospital: Hospital) -> Dict:
"""
Build Staff defaults dict from a raw HIS doctor name string.
E.g. "HEBA ELSHABOURY ABDELATTY" → first_name="HEBA", last_name="ELSABOURY ABDELATTY"
"""
parts = full_name.split(None, 1)
first_name = parts[0] if parts else "Doctor"
last_name = parts[1] if len(parts) > 1 else ""
return {
"first_name": first_name,
"last_name": last_name,
"name": full_name,
"staff_type": Staff.StaffType.PHYSICIAN,
"job_title": "Physician",
"hospital": hospital,
"physician": True,
}
@staticmethod
def _sync_visit_events(visit: HISPatientVisit, visit_timeline: List[Dict]) -> None:
"""
Sync timeline events to HISVisitEvent model.
Creates/updates HISVisitEvent records from the timeline JSON.
Also auto-creates HISEventType records for unique event types.
"""
from apps.integrations.models import HISVisitEvent, HISEventType
if not visit_timeline:
return
existing_keys = set(HISVisitEvent.objects.filter(visit=visit).values_list("bill_date", "event_type"))
events_to_create = []
event_types_seen = set()
for event in visit_timeline:
bill_date = event.get("bill_date", "")
event_type = event.get("type", "")
patient_type = event.get("patient_type", "")
key = (bill_date, event_type)
if key in existing_keys:
continue
parsed_date_str = event.get("parsed_date")
parsed_date = None
if parsed_date_str:
try:
from datetime import datetime
parsed_date = datetime.fromisoformat(parsed_date_str.replace("Z", "+00:00"))
except (ValueError, TypeError):
pass
events_to_create.append(
HISVisitEvent(
visit=visit,
event_type=event_type,
bill_date=bill_date,
parsed_date=parsed_date,
patient_type=patient_type,
visit_category=event.get("visit_category", ""),
admission_id=event.get("admission_id", ""),
patient_id=event.get("patient_id", ""),
reg_code=event.get("reg_code", ""),
ssn=event.get("ssn", ""),
mobile_no=event.get("mobile_no", ""),
)
)
if event_type:
event_types_seen.add((event_type, patient_type))
if events_to_create:
HISVisitEvent.objects.bulk_create(events_to_create, ignore_conflicts=True)
logger.info(f"Created {len(events_to_create)} HISVisitEvent records for visit {visit.admission_id}")
if event_types_seen:
from django.db.models import F
for event_type, patient_type in event_types_seen:
et, created = HISEventType.objects.get_or_create(event_type=event_type)
if patient_type and patient_type not in et.patient_types:
et.patient_types.append(patient_type)
et.save(update_fields=["patient_types"])
HISEventType.objects.filter(id=et.id).update(event_count=F("event_count") + 1)
@staticmethod
def create_and_send_survey(
patient: Patient,
hospital: Hospital,
patient_data: Dict,
survey_template: SurveyTemplate = None,
visit_timeline: List[Dict] = None,
his_visit: HISPatientVisit = None,
) -> Optional[SurveyInstance]:
"""
Create survey instance using the template from SurveyTemplateMapping.
The template's questions are filtered at display time by the public
serializer - is_base questions are always shown, event_type questions
are only shown if the patient experienced that event.
"""
from apps.surveys.models import (
SurveyInstance,
SurveyStatus,
)
from apps.surveys.tasks import send_scheduled_survey
admission_id = patient_data.get("AdmissionID")
discharge_date_str = patient_data.get("DischargeDate")
patient_type = patient_data.get("PatientType")
existing_survey = SurveyInstance.objects.filter(
patient=patient, hospital=hospital, metadata__admission_id=admission_id
).first()
if existing_survey:
logger.info(f"Survey already exists for admission {admission_id}")
if his_visit and not his_visit.survey_instance:
his_visit.survey_instance = existing_survey
his_visit.save(update_fields=["survey_instance"])
return existing_survey
if not survey_template:
survey_template = HISAdapter.get_survey_template(patient_type, hospital)
if not survey_template:
logger.warning(f"No survey template mapping found for {patient_type} at {hospital}")
return None
delay_hours = HISAdapter.get_delay_for_patient_type(patient_type, hospital)
scheduled_send_at = timezone.now() + timedelta(hours=delay_hours)
effective_discharge = None
if his_visit and his_visit.effective_discharge_date:
effective_discharge = his_visit.effective_discharge_date.isoformat()
# Collect unique event types from patient's journey for metadata
event_types = []
if his_visit:
events_qs = his_visit.visit_events.order_by("parsed_date")
for evt in events_qs:
if evt.event_type and evt.event_type not in event_types:
event_types.append(evt.event_type)
survey = SurveyInstance.objects.create(
survey_template=survey_template,
patient=patient,
hospital=hospital,
status=SurveyStatus.PENDING,
delivery_channel="SMS",
recipient_phone=patient.phone,
recipient_email=patient.email,
scheduled_send_at=scheduled_send_at,
metadata={
"admission_id": admission_id,
"patient_type": patient_type,
"hospital_id": patient_data.get("HospitalID"),
"insurance_company": patient_data.get("InsuranceCompanyName"),
"is_vip": patient_data.get("IsVIP") == "1",
"discharge_date": discharge_date_str,
"effective_discharge_date": effective_discharge,
"scheduled_send_at": scheduled_send_at.isoformat(),
"delay_hours": delay_hours,
"visit_timeline": visit_timeline or [],
"event_types": event_types,
"question_source": "template",
},
)
send_scheduled_survey.apply_async(args=[str(survey.id)], countdown=delay_hours * 3600)
if his_visit:
his_visit.survey_instance = survey
his_visit.save(update_fields=["survey_instance"])
logger.info(
f"Survey {survey.id} created for {patient_type} (template: {survey_template.name}), "
f"will send in {delay_hours}h at {scheduled_send_at}"
)
return survey
@staticmethod
def process_single_patient(his_data: Dict, patient_data: Dict) -> Dict:
"""
Process a single patient from HIS data.
Two phases:
Phase A: Always save patient and visit data
Phase B: Create survey only if visit is complete
Args:
his_data: Full HIS response dict (needed for visit timeline extraction)
patient_data: Single patient dict from FetchPatientDataTimeStampList
Returns:
Dict with processing results
"""
result = {
"success": False,
"message": "",
"patient": None,
"survey": None,
"survey_queued": False,
"visit_saved": False,
"visit_complete": False,
"patient_type": None,
}
try:
patient_type = patient_data.get("PatientType")
patient_id = str(patient_data.get("PatientID", ""))
discharge_date_str = patient_data.get("DischargeDate")
hospital = HISAdapter.get_or_create_hospital(patient_data)
if not hospital:
result["message"] = "Could not determine hospital"
return result
patient = HISAdapter.get_or_create_patient(patient_data, hospital)
result["patient"] = patient
result["patient_type"] = patient_type
visit_timeline = HISAdapter.extract_patient_visits(his_data, patient_id, patient_type)
discharge_date = HISAdapter.parse_date(discharge_date_str) if discharge_date_str else None
is_visit_complete = False
effective_discharge_date = None
if patient_type and patient_type.upper() in ("ED", "IP"):
if discharge_date:
is_visit_complete = True
else:
result["message"] = f"{patient_type} patient not discharged - visit saved, no survey"
result["success"] = True
result["visit_saved"] = True
HISAdapter.save_patient_visit(
patient,
hospital,
patient_data,
visit_timeline,
is_visit_complete=False,
discharge_date=None,
)
return result
elif patient_type and patient_type.upper() == "OP":
is_complete, last_visit_date = HISAdapter.is_op_visit_complete(visit_timeline, patient_type, hospital)
if is_complete:
is_visit_complete = True
effective_discharge_date = last_visit_date
else:
result["message"] = f"OP visit still in progress (last activity: {last_visit_date})"
result["success"] = True
result["visit_saved"] = True
result["visit_complete"] = False
HISAdapter.save_patient_visit(
patient,
hospital,
patient_data,
visit_timeline,
is_visit_complete=False,
discharge_date=None,
)
return result
else:
if discharge_date:
is_visit_complete = True
else:
result["message"] = f"Patient type {patient_type} not discharged - visit saved, no survey"
result["success"] = True
result["visit_saved"] = True
HISAdapter.save_patient_visit(
patient,
hospital,
patient_data,
visit_timeline,
is_visit_complete=False,
discharge_date=None,
)
return result
his_visit = HISAdapter.save_patient_visit(
patient,
hospital,
patient_data,
visit_timeline,
is_visit_complete=is_visit_complete,
discharge_date=discharge_date,
effective_discharge_date=effective_discharge_date,
)
result["visit_saved"] = True
result["visit_complete"] = True
if his_visit.survey_instance:
result["message"] = f"Survey already exists for admission {patient_data.get('AdmissionID')}"
result["success"] = True
result["survey"] = his_visit.survey_instance
result["survey_queued"] = True
return result
survey = HISAdapter.create_and_send_survey(
patient, hospital, patient_data, visit_timeline=visit_timeline, his_visit=his_visit
)
if survey:
survey_queued = survey.status == SurveyStatus.PENDING
else:
survey_queued = False
result.update(
{
"success": True,
"message": "Patient data processed successfully",
"survey": survey,
"survey_queued": survey_queued,
"scheduled_send_at": survey.scheduled_send_at.isoformat()
if survey and survey.scheduled_send_at
else None,
"survey_url": survey.get_survey_url() if survey else None,
}
)
except Exception as e:
logger.error(f"Error processing HIS data: {str(e)}", exc_info=True)
result["message"] = f"Error processing HIS data: {str(e)}"
result["success"] = False
return result
@staticmethod
def process_his_response(his_data: Dict) -> Dict:
"""
Process a full HIS API response containing multiple patients.
Iterates each patient in FetchPatientDataTimeStampList, extracts
their visits from the appropriate sublist, saves patient/visit data,
and creates surveys for completed visits.
Args:
his_data: Full HIS response dict
Returns:
Dict with summary of processing results
"""
result = {
"success": False,
"total_patients": 0,
"visits_saved": 0,
"surveys_created": 0,
"surveys_skipped": 0,
"errors": [],
"details": [],
}
try:
if his_data.get("Code") != 200 or his_data.get("Status") != "Success":
result["errors"].append(f"HIS Error: {his_data.get('Message', 'Unknown error')}")
return result
patient_list = his_data.get("FetchPatientDataTimeStampList", [])
if not patient_list:
result["message"] = "No patient data found"
result["success"] = True
return result
result["total_patients"] = len(patient_list)
for patient_data in patient_list:
patient_result = HISAdapter.process_single_patient(his_data, patient_data)
detail = {
"patient_name": patient_data.get("PatientName", "Unknown"),
"patient_type": patient_data.get("PatientType"),
"admission_id": patient_data.get("AdmissionID"),
}
if patient_result["success"]:
if patient_result.get("visit_saved"):
result["visits_saved"] += 1
if patient_result.get("visit_complete") and patient_result.get("survey"):
result["surveys_created"] += 1
detail["survey_created"] = True
elif patient_result.get("visit_complete") and not patient_result.get("survey"):
result["surveys_skipped"] += 1
detail["reason"] = patient_result.get("message", "No survey created")
else:
detail["reason"] = patient_result.get("message", "Visit in progress")
else:
result["errors"].append(f"{patient_data.get('PatientName')}: {patient_result.get('message')}")
detail["error"] = patient_result.get("message")
result["details"].append(detail)
result["success"] = True
except Exception as e:
logger.error(f"Error processing HIS response: {str(e)}", exc_info=True)
result["errors"].append(str(e))
return result
@staticmethod
def process_his_data(his_data: Dict) -> Dict:
"""
Process HIS patient data (webhook/push flow).
Handles a full HIS payload received via webhook.
Processes the first patient in the list.
For the pull flow, use process_his_response() instead.
Args:
his_data: HIS data in real format
Returns:
Dict with processing results
"""
patient_list = his_data.get("FetchPatientDataTimeStampList", [])
if not patient_list:
return {"success": False, "message": "No patient data found"}
patient_data = patient_list[0]
if his_data.get("Code") != 200 or his_data.get("Status") != "Success":
return {"success": False, "message": f"HIS Error: {his_data.get('Message', 'Unknown error')}"}
return HISAdapter.process_single_patient(his_data, patient_data)