HH/apps/simulator/his_simulator.py

504 lines
18 KiB
Python

#!/usr/bin/env python
"""
HIS Simulator - Continuous patient journey event generator
This script simulates a Hospital Information System (HIS) by continuously
generating patient journey events and sending them to the PX360 API.
Usage:
python his_simulator.py [--url URL] [--delay SECONDS] [--max-patients N]
Arguments:
--url: API endpoint URL (default: http://localhost:8000/api/integrations/events/)
--delay: Delay between events in seconds (default: 5)
--max-patients: Maximum number of patients to simulate (default: 0, meaning infinite)
"""
import argparse
import json
import random
import time
import os
import sys
import django
from datetime import datetime, timedelta
from typing import List, Dict
import requests
# Add project root to Python path so the `apps` and `config` packages resolve
# when this file is run directly as a script: three dirname() hops up from
# this file's absolute path.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
# Setup Django. setdefault() keeps any DJANGO_SETTINGS_MODULE that is already
# set in the environment and only falls back to the dev settings otherwise.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.dev')
django.setup()
# Model imports have to come after django.setup(), which is why this import
# is not grouped with the others at the top of the file.
from apps.organizations.models import Hospital
# Romanized Arabic given names used to build realistic-looking patient names.
ARABIC_FIRST_NAMES = [
    "Ahmed",
    "Mohammed",
    "Abdullah",
    "Omar",
    "Ali",
    "Saud",
    "Fahad",
    "Turki",
    "Khalid",
    "Youssef",
    "Abdulrahman",
    "Abdulaziz",
    "Abdulwahab",
    "Majid",
    "Nasser",
    "Fatima",
    "Aisha",
    "Sarah",
    "Nora",
    "Layla",
    "Hessa",
    "Reem",
    "Mona",
    "Dalal",
    "Jawaher",
]
# Romanized family names paired at random with the given names above.
ARABIC_LAST_NAMES = [
    "Al-Saud",
    "Al-Rashid",
    "Al-Qahtani",
    "Al-Harbi",
    "Al-Otaibi",
    "Al-Dossary",
    "Al-Shammari",
    "Al-Mutairi",
    "Al-Anazi",
    "Al-Zahrani",
    "Al-Ghamdi",
    "Al-Ahmari",
    "Al-Malki",
    "Al-Khaldi",
    "Al-Bakr",
]
# Hospital department names available to the simulator.
DEPARTMENTS = [
    "Cardiology",
    "Orthopedics",
    "Pediatrics",
    "Emergency",
    "General",
    "Internal Medicine",
    "Surgery",
    "Oncology",
    "Neurology",
    "Gynecology",
]
def get_active_hospital_codes() -> List[str]:
    """Return the ``code`` of every hospital whose status is ``'active'``.

    This is best-effort: when the query returns nothing, or anything in the
    lookup raises, a warning is printed and the single default code
    ``ALH-main`` is returned instead.
    """
    try:
        active_hospitals = Hospital.objects.filter(status='active')
        codes = list(active_hospitals.values_list('code', flat=True))
        if codes:
            return codes
        # No active hospital rows: fall back to the default code.
        print("⚠️ Warning: No active hospitals found, using default ALH-main")
        return ["ALH-main"]
    except Exception as e:
        print(f"⚠️ Error querying hospitals: {e}, using default ALH-main")
        return ["ALH-main"]
# Visit-type labels as emitted by the HIS, in journey order.
# NOTE(review): "Clinical Condtion" looks misspelled but presumably mirrors
# the value the HIS actually sends -- confirm before changing it.
HIS_VISIT_TYPES = [
    "Consultation",
    "Doctor Visited",
    "Clinical Condtion",
    "ChiefComplaint",
    "Prescribed Drugs",
]
def generate_random_saudi_phone() -> str:
    """Generate a random Saudi mobile number in local format.

    Returns:
        A 10-digit string: the ``05`` mobile prefix, one random digit, then
        seven more random digits (zero-padded so the length is always 10).
    """
    # Previously the tail was eight digits, which made the number 11 digits long.
    return f"05{random.randint(0, 9)}{random.randint(0, 9999999):07d}"
def generate_patient_id() -> str:
    """Return a random six-digit patient ID (100000-999999) as a string."""
    # randrange's upper bound is exclusive, hence 1000000.
    number = random.randrange(100000, 1000000)
    return f"{number}"
def generate_admission_id() -> str:
    """Return a random six-digit admission ID (100000-999999) as a string."""
    lowest, highest = 100000, 999999
    return "{}".format(random.randint(lowest, highest))
def generate_ssn() -> str:
    """Return a random 10-digit string standing in for a Saudi National ID."""
    digits = (str(random.randint(0, 9)) for _ in range(10))
    return "".join(digits)
def generate_reg_code() -> str:
    """Return a registration code: ``ALHH.`` followed by eight random digits."""
    serial = random.randint(10000000, 99999999)
    return "ALHH." + str(serial)
def generate_consultant_id() -> str:
    """Return a random three-digit consultant ID (100-999) as a string."""
    consultant_number = random.randint(100, 999)
    return f"{consultant_number}"
def generate_primary_doctor(first_name: str, last_name: str) -> str:
    """Return a doctor label of the form ``<4-digit id>-<first_name> <last_name>``."""
    full_name = " ".join((first_name, last_name))
    return "{}-{}".format(random.randint(1000, 9999), full_name)
def generate_company_id() -> str:
    """Return a random five-digit company ID (10000-99999) as a string."""
    company_number = random.randint(10000, 99999)
    return str(company_number)
def generate_grade_id() -> str:
    """Return a random four-digit grade ID (1000-9999) as a string."""
    return "{:d}".format(random.randint(1000, 9999))
def parse_date(date_obj: datetime) -> str:
    """Format ``date_obj`` as ``DD-Mon-YYYY HH:MM``.

    Despite the name this function formats rather than parses. Month
    abbreviations come from a fixed English table, and seconds are dropped.
    """
    month_abbreviations = (
        'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
        'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec',
    )
    month_label = month_abbreviations[date_obj.month - 1]
    return "{:02d}-{}-{} {:02d}:{:02d}".format(
        date_obj.day, month_label, date_obj.year, date_obj.hour, date_obj.minute
    )
def generate_random_national_id() -> str:
    """Return a random 10-digit string standing in for a Saudi national ID."""
    id_length = 10
    return "".join(map(str, (random.randint(0, 9) for _ in range(id_length))))
def generate_random_mrn() -> str:
    """Return a medical record number: ``MRN-`` plus six random digits."""
    record_number = random.randint(100000, 999999)
    return "MRN-" + str(record_number)
def generate_random_encounter_id() -> str:
    """Return an encounter ID of the form ``ENC-<current year>-<5-digit number>``.

    The trailing number is drawn from 1-99999 and zero-padded to five digits.
    """
    current_year = datetime.now().year
    sequence = str(random.randint(1, 99999)).zfill(5)
    return "-".join(("ENC", str(current_year), sequence))
def generate_random_email(first_name: str, last_name: str) -> str:
    """Return ``<first>.<last>@<domain>`` with the names lower-cased.

    The domain is picked at random from a fixed list of four public mail
    providers.
    """
    domain = random.choice(["gmail.com", "yahoo.com", "hotmail.com", "outlook.com"])
    local_part = ".".join((first_name.lower(), last_name.lower()))
    return local_part + "@" + domain
def generate_patient_type() -> str:
    """Return a random PatientType code with a weighted distribution.

    Distribution:
    - OPD, encoded as "2" or "O": 60%
    - Inpatient, encoded as "1": 30%
    - EMS/Emergency, encoded as "3" or "E": 10%
    """
    roll = random.random()
    if roll >= 0.9:
        # Top 10% of the roll: EMS, using either of its two encodings.
        return random.choice(["3", "E"])
    if roll >= 0.6:
        # Next 30%: inpatient has a single encoding.
        return "1"
    # Remaining 60%: OPD, using either of its two encodings.
    return random.choice(["2", "O"])
def generate_his_patient_data() -> tuple:
    """Generate one randomized patient demographic record in HIS format.

    The hospital name is looked up from the active hospitals in the database;
    every other field is random. Related fields are kept consistent with each
    other: ``GenderID`` matches ``Gender``, ``FullAge`` is derived from
    ``DOB``, the primary doctor gets their own name, and ``DischargeDate``
    never lies in the future.

    Returns:
        tuple: ``(patient_data, admission_id, first_name, last_name,
        is_discharged, patient_type)``. ``patient_data`` is the record dict;
        the other items are handed back separately for the caller's summary.
    """
    # One timestamp so every derived date is measured from the same instant.
    now = datetime.now()

    patient_id = generate_patient_id()
    admission_id = generate_admission_id()
    national_id = generate_ssn()
    first_name = random.choice(ARABIC_FIRST_NAMES)
    last_name = random.choice(ARABIC_LAST_NAMES)
    phone = generate_random_saudi_phone()
    email = generate_random_email(first_name, last_name)
    reg_code = generate_reg_code()
    consultant_id = generate_consultant_id()
    # The doctor is a different person, so draw a name independent of the
    # patient's instead of reusing first_name/last_name.
    primary_doctor = generate_primary_doctor(
        random.choice(ARABIC_FIRST_NAMES),
        random.choice(ARABIC_LAST_NAMES),
    )
    company_id = generate_company_id()
    grade_id = generate_grade_id()

    # Get hospital info
    hospital_code = random.choice(get_active_hospital_codes())
    try:
        hospital = Hospital.objects.filter(code=hospital_code).first()
    except Exception as e:
        # Same best-effort policy as get_active_hospital_codes(): a database
        # problem should degrade to the default name, not stop the simulator.
        print(f"⚠️ Error querying hospital {hospital_code}: {e}, using default NUZHA-UAT")
        hospital = None
    hospital_name = hospital.name if hospital else "NUZHA-UAT"

    # Generate patient type with realistic distribution
    patient_type = generate_patient_type()

    # Pick gender once so the numeric and textual fields agree.
    # NOTE(review): assumes the HIS encodes 1 = Male, 2 = Female -- confirm.
    # Gender is not correlated with the randomly chosen first name.
    gender_id, gender = random.choice([("1", "Male"), ("2", "Female")])

    # Lower bound uses 366-day years so leap days cannot push the derived
    # age below 18.
    dob = now - timedelta(days=random.randint(18 * 366, 80 * 365))
    # Whole years elapsed; subtract one if this year's birthday is still ahead.
    age_years = now.year - dob.year - ((now.month, now.day) < (dob.month, dob.day))

    admit_date = now - timedelta(days=random.randint(0, 7),
                                 hours=random.randint(0, 23))

    # Determine if discharged (40% chance)
    is_discharged = random.random() < 0.4
    discharge_date = None
    if is_discharged:
        # Stay lasts 4-48 hours; cap at "now" so a very recent admission does
        # not yield a discharge timestamp in the future.
        discharge_date = min(admit_date + timedelta(hours=random.randint(4, 48)), now)

    # Generate insurance company
    insurance_company = random.choice([
        "Arabian Shield Cooperative Insurance Company",
        "Tawuniya Cooperative Insurance Company",
        "Malath Cooperative Insurance Company",
        "MedGulf Cooperative Insurance Company",
        "AXA Cooperative Insurance Company",
    ])

    # Generate company and grade info as (CompanyName, GradeName) pairs
    company_name, grade_name = random.choice([
        ("Al Hammadi for Mgmt / Arabian Shield", "A"),
        ("Saudi Aramco", "A+"),
        ("Saudi Electricity Company", "B"),
        ("STC", "A"),
        ("Ministry of Health", "C"),
    ])

    # Generate bill type
    bill_type = random.choice(["CR", "CA", "CP"])

    # VIP status (0 or 1)
    is_vip = random.choice(["0", "1"])

    patient_data = {
        "Type": "Patient Demographic details",
        "PatientID": patient_id,
        "AdmissionID": admission_id,
        # NOTE(review): HospitalID is still random and unrelated to the
        # hospital chosen above -- confirm whether consumers rely on it.
        "HospitalID": str(random.randint(1, 10)),
        "HospitalName": hospital_name,
        "PatientType": patient_type,
        "AdmitDate": parse_date(admit_date),
        "DischargeDate": parse_date(discharge_date) if discharge_date else None,
        "RegCode": reg_code,
        "SSN": national_id,
        "PatientName": f"{first_name} {last_name}",
        "GenderID": gender_id,
        "Gender": gender,
        "FullAge": f"{age_years} Year(s)",
        "PatientNationality": "Saudi",
        "MobileNo": phone,
        "Email": email,
        "DOB": parse_date(dob),
        "ConsultantID": consultant_id,
        "PrimaryDoctor": primary_doctor,
        "CompanyID": company_id,
        "GradeID": grade_id,
        "CompanyName": company_name,
        "GradeName": grade_name,
        "InsuranceCompanyName": insurance_company,
        "BillType": bill_type,
        "IsVIP": is_vip,
    }
    return patient_data, admission_id, first_name, last_name, is_discharged, patient_type
def generate_his_visit_data(admit_date: datetime, is_discharged: bool) -> tuple:
    """Build the visit timeline for one admission in HIS format.

    With 40% probability the journey is "full" and contains every entry of
    HIS_VISIT_TYPES; otherwise it contains a random-length prefix of at least
    one and at most all-but-one entries.

    Args:
        admit_date: Admission time; the first visit is stamped 30 minutes later.
        is_discharged: Accepted for the caller's convenience; not used here.

    Returns:
        tuple: ``(visit_data, is_full_journey)`` where ``visit_data`` is a
        list of ``{"Type", "BillDate"}`` dicts in journey order.
    """
    available = len(HIS_VISIT_TYPES)
    is_full_journey = random.random() < 0.4
    if is_full_journey:
        num_visits = available
    else:
        num_visits = random.randint(1, available - 1)

    first_visit = admit_date + timedelta(minutes=30)
    # Visit i is offset by i * 15 minutes plus 0-59 random seconds.
    visit_data = [
        {
            "Type": visit_type,
            "BillDate": parse_date(
                first_visit + timedelta(minutes=position * 15, seconds=random.randint(0, 59))
            ),
        }
        for position, visit_type in enumerate(HIS_VISIT_TYPES[:num_visits])
    ]
    return visit_data, is_full_journey
def generate_his_patient_journey() -> tuple:
    """Generate one complete HIS patient journey payload plus a summary.

    Returns:
        tuple: ``(his_data, summary)``.

        ``his_data`` is the dict posted to the API: the patient record under
        ``FetchPatientDataTimeStampList``, the visits under
        ``FetchPatientDataTimeStampVisitDataList``, and the fixed
        ``Code``/``Status``/message fields.

        ``summary`` is a flat dict (``admission_id``, ``patient_name``,
        ``visits_completed``, ``total_possible_visits``, ``is_full_journey``,
        ``is_discharged``, ``patient_type``) used for console reporting.
    """
    (
        patient_data,
        admission_id,
        first_name,
        last_name,
        is_discharged,
        patient_type,
    ) = generate_his_patient_data()

    # The admit time is only available as the formatted string, so parse it
    # back using the same DD-Mon-YYYY HH:MM layout parse_date() emits.
    admit_date = datetime.strptime(patient_data["AdmitDate"], "%d-%b-%Y %H:%M")

    visit_data, is_full_journey = generate_his_visit_data(admit_date, is_discharged)

    # Construct HIS format response
    his_data = {
        "FetchPatientDataTimeStampList": [patient_data],
        "FetchPatientDataTimeStampVisitDataList": visit_data,
        "Code": 200,
        "Status": "Success",
        "Message": "",
        "Message2L": "",
        "MobileNo": "",
        "ValidateMessage": "",
    }
    summary = {
        "admission_id": admission_id,
        "patient_name": f"{first_name} {last_name}",
        "visits_completed": len(visit_data),
        "total_possible_visits": len(HIS_VISIT_TYPES),
        "is_full_journey": is_full_journey,
        "is_discharged": is_discharged,
        "patient_type": patient_type,
    }
    return his_data, summary
def send_his_data_to_api(api_url: str, his_data: Dict) -> bool:
    """POST one HIS payload to the PX360 API.

    Args:
        api_url: Endpoint that receives the payload.
        his_data: HIS-format dict; sent as the JSON request body.

    Returns:
        True when the server answers with any 2xx status, False on any other
        status or when the request itself fails. Failures are printed, never
        raised.
    """
    try:
        # Send HIS format data directly; give up after 10 seconds.
        response = requests.post(
            api_url,
            json=his_data,
            headers={"Content-Type": "application/json"},
            timeout=10,
        )
    except requests.exceptions.RequestException as e:
        print(f" ❌ Request failed: {str(e)}")
        return False

    # Accept the whole 2xx range: an endpoint that creates records may reply
    # 201/202/204, which the previous `== 200` check reported as a failure.
    if 200 <= response.status_code < 300:
        return True
    print(f" ❌ API Error: {response.status_code} - {response.text}")
    return False
def print_his_journey_summary(summary: Dict, success: bool):
    """Print a formatted summary of one generated patient journey.

    Args:
        summary: Dict with ``patient_name``, ``admission_id``,
            ``visits_completed``, ``total_possible_visits``,
            ``is_full_journey`` and ``is_discharged``.
        success: Whether the API call for this journey succeeded.
    """
    # Both branches used to be empty strings, so the header never showed the
    # outcome; use distinct markers for success and failure.
    status_symbol = "✅" if success else "❌"
    status_text = "Full Journey" if summary["is_full_journey"] else "Partial Journey"
    discharge_text = "Discharged" if summary["is_discharged"] else "Active"
    print(f"\n{status_symbol} 🏥 HIS Patient Journey Created")
    print(f" Patient: {summary['patient_name']}")
    print(f" Admission ID: {summary['admission_id']}")
    print(f" Status: {discharge_text}")
    print(f" Type: {status_text}")
    print(f" Visits: {summary['visits_completed']}/{summary['total_possible_visits']} completed")
    print(f" API Status: {'Success' if success else 'Failed'}")
def print_his_statistics(stats: Dict):
    """Print the running simulation statistics.

    Args:
        stats: Counter dict with ``total``, ``successful``, ``failed``,
            ``full_journeys``, ``partial_journeys``, ``discharged``,
            ``active`` and ``total_visits``. ``success_rate`` and the two
            ``*_distribution`` dicts are optional.
    """
    total = stats['total']

    # `success_rate` is only stored after the first patient completes, so an
    # early Ctrl+C used to raise KeyError here. Derive it when it is missing.
    success_rate = stats.get('success_rate')
    if success_rate is None:
        success_rate = (stats['successful'] / total) * 100 if total > 0 else 0

    def share_of_total(count):
        """Return count as a percentage of total, or 0 when total is 0."""
        return (count / total) * 100 if total > 0 else 0

    # Map type codes to readable names
    type_names = {"2": "OPD", "O": "OPD", "1": "Inpatient", "3": "EMS", "E": "EMS"}
    banner = '=' * 70

    print(f"\n{banner}")
    print("📊 HIS SIMULATION STATISTICS")
    print(banner)
    print(f"Total Patients: {total}")
    print(f"Successful: {stats['successful']} ({success_rate:.1f}%)")
    print(f"Failed: {stats['failed']}")
    print(f"Full Journeys: {stats['full_journeys']}")
    print(f"Partial Journeys: {stats['partial_journeys']}")
    print(f"Discharged: {stats['discharged']}")
    print(f"Active: {stats['active']}")
    print(f"Total Visits Sent: {stats['total_visits']}")

    patient_types = stats.get('patient_type_distribution')
    if patient_types:
        print("\n📋 Patient Type Distribution:")
        for ptype, count in sorted(patient_types.items()):
            type_name = type_names.get(ptype, "Unknown")
            print(f" {type_name} ({ptype}): {count} ({share_of_total(count):.1f}%)")

    hospitals = stats.get('hospital_distribution')
    if hospitals:
        print("\n🏥 Hospital Distribution:")
        for hospital, count in sorted(hospitals.items()):
            print(f" {hospital}: {count} ({share_of_total(count):.1f}%)")

    print(f"{banner}\n")
def main():
    """Run the simulator loop from the command line.

    Parses ``--url``, ``--delay`` and ``--max-patients``, then repeatedly
    generates a patient journey, posts it, updates the running statistics and
    sleeps. Statistics are printed every 10 patients and once more when the
    run ends, either because the patient limit was reached or because the
    user pressed Ctrl+C.
    """
    parser = argparse.ArgumentParser(description="HIS Simulator - Continuous event generator")
    parser.add_argument("--url",
                        default="http://localhost:8000/api/integrations/events/",
                        help="API endpoint URL for HIS patient data")
    parser.add_argument("--delay",
                        type=int,
                        default=5,
                        help="Delay between events in seconds")
    parser.add_argument("--max-patients",
                        type=int,
                        default=0,
                        help="Maximum number of patients to simulate (0 = infinite)")
    args = parser.parse_args()

    print("="*70)
    print("🏥 HIS SIMULATOR - Real HIS Data Format Generator")
    print("="*70)
    print(f"API URL: {args.url}")
    print(f"Delay: {args.delay} seconds between patients")
    print(f"Max Patients: {args.max_patients if args.max_patients > 0 else 'Infinite'}")
    print("="*70)
    print("\nStarting simulation... Press Ctrl+C to stop\n")

    # Statistics. success_rate is seeded here so the Ctrl+C handler can print
    # the stats even if no patient has completed yet.
    stats = {
        "total": 0,
        "successful": 0,
        "failed": 0,
        "success_rate": 0.0,
        "full_journeys": 0,
        "partial_journeys": 0,
        "discharged": 0,
        "active": 0,
        "total_visits": 0,
        "hospital_distribution": {},
        "patient_type_distribution": {},
    }
    patient_count = 0

    try:
        while True:
            # Generate HIS patient journey
            his_data, summary = generate_his_patient_journey()
            visits = his_data['FetchPatientDataTimeStampVisitDataList']

            # Send HIS data to API
            print(f"\n📤 Sending patient data for {summary['patient_name']}...")
            print(f" Admission ID: {summary['admission_id']}")
            print(f" Visits: {len(visits)}")
            success = send_his_data_to_api(args.url, his_data)

            # Update statistics
            patient_count += 1
            stats["total"] += 1
            stats["total_visits"] += len(visits)
            stats["successful" if success else "failed"] += 1
            stats["success_rate"] = (stats["successful"] / stats["total"]) * 100
            stats["full_journeys" if summary["is_full_journey"] else "partial_journeys"] += 1
            stats["discharged" if summary["is_discharged"] else "active"] += 1

            # Track hospital and patient type distribution
            hospital = his_data['FetchPatientDataTimeStampList'][0]['HospitalName']
            hospitals = stats["hospital_distribution"]
            hospitals[hospital] = hospitals.get(hospital, 0) + 1
            patient_type = summary.get("patient_type", "Unknown")
            patient_types = stats["patient_type_distribution"]
            patient_types[patient_type] = patient_types.get(patient_type, 0) + 1

            # Print journey summary
            print_his_journey_summary(summary, success)

            # Check the limit before sleeping so the run ends right after the
            # last patient, and always finish with a statistics report.
            if args.max_patients > 0 and patient_count >= args.max_patients:
                print(f"\n✓ Reached maximum patient limit: {args.max_patients}")
                print_his_statistics(stats)
                break

            # Print statistics every 10 patients
            if patient_count % 10 == 0:
                print_his_statistics(stats)

            # Wait before next patient
            time.sleep(args.delay)
    except KeyboardInterrupt:
        print("\n\n⏹️ Simulation stopped by user")
        print_his_statistics(stats)
        print("Goodbye! 👋\n")


if __name__ == "__main__":
    main()