HH/apps/integrations/services/his_adapter.py

420 lines
14 KiB
Python

"""
HIS Adapter Service - Transforms real HIS data format to internal format
This service handles the transformation of HIS patient data into PX360's
internal format for sending surveys based on PatientType.
Simplified Flow:
1. Parse HIS patient data
2. Determine survey type from PatientType
3. Create survey instance with PENDING status
4. Queue delayed send task
5. Survey sent after delay (e.g., 1 hour for OPD)
"""
from datetime import datetime, timedelta
from typing import Dict, Optional, Tuple
import logging
from django.utils import timezone
from apps.organizations.models import Hospital, Patient
from apps.surveys.models import SurveyTemplate, SurveyInstance, SurveyStatus
from apps.integrations.models import InboundEvent
# Module-level logger, named after this module's import path via __name__.
logger = logging.getLogger(__name__)
class HISAdapter:
    """
    Adapter for transforming HIS patient data format to internal format.

    Every method is a ``@staticmethod``; the class only groups the helpers
    and holds no instance state.

    HIS Data Structure:
        {
            "FetchPatientDataTimeStampList": [{...patient demographics...}],
            "FetchPatientDataTimeStampVisitDataList": [
                {"Type": "Consultation", "BillDate": "05-Jun-2025 11:06"},
                ...
            ],
            "Code": 200,
            "Status": "Success"
        }

    PatientType Codes:
        - "1" → Inpatient
        - "2" or "O" → OPD (Outpatient)
        - "3" or "E" → EMS (Emergency)
        - "4" → Daycase
        - any other value is treated as OPD by
          ``map_patient_type_to_survey_type``
    """

    # Fallback send delays (in hours) keyed by HIS PatientType code, used by
    # get_delay_for_patient_type() when no mapping supplies a delay.
    _DEFAULT_DELAY_HOURS = {
        '1': 24,  # Inpatient - 24 hours
        '2': 1,   # OPD - 1 hour
        '3': 2,   # EMS - 2 hours
        'O': 1,   # OPD - 1 hour
        'E': 2,   # EMS - 2 hours
        '4': 4,   # Daycase - 4 hours
    }

    @staticmethod
    def parse_date(date_str: Optional[str]) -> Optional[datetime]:
        """
        Parse HIS date format 'DD-Mon-YYYY HH:MM' to a timezone-aware datetime.

        Args:
            date_str: Date string such as "05-Jun-2025 11:06". Leading and
                trailing whitespace is ignored.

        Returns:
            Aware datetime produced by ``timezone.make_aware``, or None when
            the value is empty, not a string, or does not match the format.
        """
        # Non-string payload values (e.g. numbers) cannot be parsed; treat
        # them like a missing date instead of raising.
        if not date_str or not isinstance(date_str, str):
            return None
        try:
            # HIS format: "05-Jun-2025 11:06"
            naive_dt = datetime.strptime(date_str.strip(), "%d-%b-%Y %H:%M")
            # Uses the module-level django.utils.timezone import.
            return timezone.make_aware(naive_dt)
        except ValueError:
            return None

    @staticmethod
    def map_patient_type_to_survey_type(patient_type: str) -> str:
        """
        Map HIS PatientType code to survey type name.

        Args:
            patient_type: HIS PatientType code.

        Returns:
            "INPATIENT", "OPD", "EMS" or "DAYCASE". Unknown codes map to "OPD".
        """
        if patient_type == "1":
            return "INPATIENT"
        if patient_type in ("2", "O"):
            return "OPD"
        if patient_type in ("3", "E"):
            return "EMS"
        if patient_type == "4":
            return "DAYCASE"
        # Default to OPD if unknown
        return "OPD"

    @staticmethod
    def split_patient_name(full_name: Optional[str]) -> Tuple[str, str]:
        """
        Split patient name into first and last name.

        The first whitespace-separated token is the first name; all remaining
        tokens, joined by single spaces, form the last name.

        Args:
            full_name: Full name as received from HIS; may be None or empty.

        Returns:
            (first_name, last_name). Either part may be an empty string; a
            missing or blank name yields ("", "").
        """
        parts = (full_name or "").strip().split()
        if not parts:
            # A missing or blank name previously raised here; return empty
            # parts so the caller can still process the record.
            return "", ""
        return parts[0], " ".join(parts[1:])

    @staticmethod
    def get_or_create_hospital(hospital_data: Dict) -> Optional[Hospital]:
        """
        Get or create hospital from HIS data.

        Args:
            hospital_data: Dict read for the "HospitalName" and "HospitalID"
                keys.

        Returns:
            Matching or newly created Hospital, or None when "HospitalName"
            is missing or empty.
        """
        hospital_name = hospital_data.get("HospitalName")
        hospital_id = hospital_data.get("HospitalID")
        if not hospital_name:
            return None

        # Try to find existing hospital by name.
        # NOTE(review): this is a case-insensitive *substring* match, so a
        # short HIS name can match an unrelated hospital - confirm intended.
        hospital = Hospital.objects.filter(name__icontains=hospital_name).first()
        if hospital:
            return hospital

        # If not found, create new hospital (optional - can be disabled in production).
        # Without a HospitalID the code is derived from the first three
        # characters of the name.
        hospital_code = hospital_id or f"HOSP-{hospital_name[:3].upper()}"
        hospital, _ = Hospital.objects.get_or_create(
            code=hospital_code,
            defaults={
                'name': hospital_name,
                'status': 'active'
            }
        )
        return hospital

    @staticmethod
    def get_or_create_patient(patient_data: Dict, hospital: Hospital) -> Patient:
        """
        Get or create patient from HIS demographic data.

        The patient is looked up by MRN (the HIS "PatientID") within the given
        hospital. An existing record is overwritten with the incoming values
        and saved; otherwise a new record is created.

        Args:
            patient_data: HIS demographics dict; read for "PatientID", "SSN",
                "MobileNo", "Email", "PatientName", "DOB" and "Gender".
            hospital: Hospital used as the patient's primary hospital.

        Returns:
            The updated or newly created Patient.
        """
        mrn = patient_data.get("PatientID")  # PatientID serves as MRN
        national_id = patient_data.get("SSN")
        phone = patient_data.get("MobileNo")
        email = patient_data.get("Email")

        first_name, last_name = HISAdapter.split_patient_name(
            patient_data.get("PatientName")
        )
        # parse_date() returns None for a missing or unparseable DOB.
        date_of_birth = HISAdapter.parse_date(patient_data.get("DOB"))
        # "Gender" may be present with a null value, so coalesce before lower().
        gender = (patient_data.get("Gender") or "").lower()

        # Try to find existing patient by MRN
        patient = Patient.objects.filter(mrn=mrn, primary_hospital=hospital).first()
        if patient:
            # Refresh stored fields from HIS; the record is saved unconditionally.
            patient.first_name = first_name
            patient.last_name = last_name
            patient.national_id = national_id
            patient.phone = phone
            # Only update email if it's not None (to avoid NOT NULL constraint)
            if email is not None:
                patient.email = email
            patient.date_of_birth = date_of_birth
            patient.gender = gender
            patient.save()
            return patient

        # Create new patient
        return Patient.objects.create(
            mrn=mrn,
            primary_hospital=hospital,
            first_name=first_name,
            last_name=last_name,
            national_id=national_id,
            phone=phone,
            email=email if email else '',  # Use empty string if email is None
            date_of_birth=date_of_birth,
            gender=gender
        )

    @staticmethod
    def get_survey_template(patient_type: str, hospital: Hospital) -> Optional[SurveyTemplate]:
        """
        Get appropriate survey template based on PatientType using explicit mapping.

        Delegates to ``SurveyTemplateMapping.get_template_for_patient_type``.

        Args:
            patient_type: HIS PatientType code (1, 2, 3, 4, O, E, APPOINTMENT)
            hospital: Hospital instance

        Returns:
            SurveyTemplate or None if not found
        """
        from apps.integrations.models import SurveyTemplateMapping

        return SurveyTemplateMapping.get_template_for_patient_type(
            patient_type, hospital
        )

    @staticmethod
    def get_delay_for_patient_type(patient_type: str, hospital) -> int:
        """
        Get delay hours from SurveyTemplateMapping.

        Lookup order: active hospital-specific mapping, then active global
        mapping (no hospital), then the built-in per-type defaults.

        Args:
            patient_type: HIS PatientType code (1, 2, 3, 4, O, E)
            hospital: Hospital instance

        Returns:
            Delay in hours (1 when the patient type has no default either)
        """
        from apps.integrations.models import SurveyTemplateMapping

        # Hospital-specific mapping first, then the global one.
        scopes = (
            {'hospital': hospital},
            {'hospital__isnull': True},
        )
        for scope in scopes:
            mapping = SurveyTemplateMapping.objects.filter(
                patient_type=patient_type,
                is_active=True,
                **scope
            ).first()
            # NOTE(review): a falsy delay (0 or None) is skipped, so a mapping
            # with a 0-hour delay falls through to the next source - confirm.
            if mapping and mapping.send_delay_hours:
                return mapping.send_delay_hours

        return HISAdapter._DEFAULT_DELAY_HOURS.get(patient_type, 1)  # Default 1 hour

    @staticmethod
    def create_and_send_survey(
        patient: Patient,
        hospital: Hospital,
        patient_data: Dict,
        survey_template: SurveyTemplate
    ) -> Optional[SurveyInstance]:
        """
        Create survey instance and queue for delayed sending.

        The survey is created with PENDING status and a ``send_scheduled_survey``
        task is queued with a countdown equal to the configured delay. If a
        survey already exists for the same patient, hospital and admission id,
        that survey is returned and nothing new is created or queued.

        Args:
            patient: Patient instance
            hospital: Hospital instance
            patient_data: HIS patient data; read for "AdmissionID",
                "DischargeDate", "PatientType", "HospitalID",
                "InsuranceCompanyName" and "IsVIP"
            survey_template: SurveyTemplate instance

        Returns:
            The existing or newly created SurveyInstance
        """
        from apps.surveys.tasks import send_scheduled_survey

        admission_id = patient_data.get("AdmissionID")
        discharge_date_str = patient_data.get("DischargeDate")
        patient_type = patient_data.get("PatientType")

        # Check if survey already sent for this admission.
        # NOTE(review): when "AdmissionID" is absent this compares the stored
        # metadata value against None - confirm that is the intended dedupe.
        existing_survey = SurveyInstance.objects.filter(
            patient=patient,
            hospital=hospital,
            metadata__admission_id=admission_id
        ).first()
        if existing_survey:
            logger.info("Survey already exists for admission %s", admission_id)
            return existing_survey

        delay_hours = HISAdapter.get_delay_for_patient_type(patient_type, hospital)
        scheduled_send_at = timezone.now() + timedelta(hours=delay_hours)

        # Create survey with PENDING status (NOT SENT)
        survey = SurveyInstance.objects.create(
            survey_template=survey_template,
            patient=patient,
            hospital=hospital,
            status=SurveyStatus.PENDING,
            delivery_channel="SMS",
            recipient_phone=patient.phone,
            recipient_email=patient.email,
            scheduled_send_at=scheduled_send_at,
            metadata={
                'admission_id': admission_id,
                'patient_type': patient_type,
                'hospital_id': patient_data.get("HospitalID"),
                'insurance_company': patient_data.get("InsuranceCompanyName"),
                # True only when HIS sends the string "1".
                'is_vip': patient_data.get("IsVIP") == "1",
                'discharge_date': discharge_date_str,
                'scheduled_send_at': scheduled_send_at.isoformat(),
                'delay_hours': delay_hours,
            }
        )

        # Queue delayed send task
        send_scheduled_survey.apply_async(
            args=[str(survey.id)],
            countdown=delay_hours * 3600  # Convert to seconds
        )
        logger.info(
            "Survey %s created for %s, will send in %sh at %s",
            survey.id, patient_type, delay_hours, scheduled_send_at
        )
        return survey

    @staticmethod
    def process_his_data(his_data: Dict) -> Dict:
        """
        Main method to process HIS patient data and send surveys.

        Simplified Flow:
            1. Extract patient data (only the first entry of
               "FetchPatientDataTimeStampList" is processed)
            2. Get or create patient and hospital
            3. Determine survey type from PatientType
            4. Create survey with PENDING status
            5. Queue delayed send task

        Any exception raised along the way is logged and reported through the
        result dict instead of propagating.

        Args:
            his_data: HIS data in real format

        Returns:
            Dict with processing results. Always contains 'success', 'message',
            'patient', 'survey' and 'survey_queued'; a fully processed record
            additionally carries 'patient_type', 'scheduled_send_at' and
            'survey_url'.
        """
        result = {
            'success': False,
            'message': '',
            'patient': None,
            'survey': None,
            'survey_queued': False
        }
        try:
            # Extract patient data
            patient_list = his_data.get("FetchPatientDataTimeStampList", [])
            if not patient_list:
                result['message'] = "No patient data found"
                return result
            patient_data = patient_list[0]

            # Validate status
            if his_data.get("Code") != 200 or his_data.get("Status") != "Success":
                result['message'] = f"HIS Error: {his_data.get('Message', 'Unknown error')}"
                return result

            # All patient types require a discharge date before a survey is sent.
            patient_type = patient_data.get("PatientType")
            if not patient_data.get("DischargeDate"):
                result['message'] = f'Patient type {patient_type} not discharged - no survey sent'
                result['success'] = True  # Not an error, just no action needed
                return result

            # The hospital fields are read from the same patient record.
            hospital = HISAdapter.get_or_create_hospital(patient_data)
            if not hospital:
                result['message'] = "Could not determine hospital"
                return result

            patient = HISAdapter.get_or_create_patient(patient_data, hospital)

            # Get survey template based on PatientType
            survey_template = HISAdapter.get_survey_template(patient_type, hospital)
            if not survey_template:
                result['message'] = f"No survey template found for patient type '{patient_type}'"
                return result

            # Create and queue survey (delayed sending)
            survey = HISAdapter.create_and_send_survey(
                patient, hospital, patient_data, survey_template
            )
            # Reported as queued only while the survey is still PENDING.
            survey_queued = bool(survey) and survey.status == SurveyStatus.PENDING

            result.update({
                'success': True,
                'message': 'Patient data processed successfully',
                'patient': patient,
                'patient_type': patient_type,
                'survey': survey,
                'survey_queued': survey_queued,
                'scheduled_send_at': (
                    survey.scheduled_send_at.isoformat()
                    if survey and survey.scheduled_send_at else None
                ),
                'survey_url': survey.get_survey_url() if survey else None
            })
        except Exception as e:
            # Top-level boundary: log with traceback and report via the result.
            logger.exception("Error processing HIS data: %s", e)
            result['message'] = f"Error processing HIS data: {str(e)}"
            result['success'] = False
        return result