HH/apps/simulator/his_simulator.py
2026-01-24 15:27:30 +03:00

353 lines
12 KiB
Python

#!/usr/bin/env python
"""
HIS Simulator - Continuous patient journey event generator
This script simulates a Hospital Information System (HIS) by continuously
generating patient journey events and sending them to the PX360 API.
Usage:
python his_simulator.py [--url URL] [--delay SECONDS] [--max-patients N]
Arguments:
--url: API endpoint URL (default: http://localhost:8000/api/simulator/his-events/)
--delay: Delay between events in seconds (default: 5)
--max-patients: Maximum number of patients to simulate (default: infinite)
"""
import argparse
import json
import random
import time
import os
import sys
import django
from datetime import datetime, timedelta
from typing import List, Dict
import requests
# Add project root to Python path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
# Setup Django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.dev')
django.setup()
from apps.organizations.models import Hospital
# Arabic names for realistic patient data
ARABIC_FIRST_NAMES = [
"Ahmed", "Mohammed", "Abdullah", "Omar", "Ali",
"Saud", "Fahad", "Turki", "Khalid", "Youssef",
"Abdulrahman", "Abdulaziz", "Abdulwahab", "Majid", "Nasser",
"Fatima", "Aisha", "Sarah", "Nora", "Layla",
"Hessa", "Reem", "Mona", "Dalal", "Jawaher"
]
ARABIC_LAST_NAMES = [
"Al-Saud", "Al-Rashid", "Al-Qahtani", "Al-Harbi", "Al-Otaibi",
"Al-Dossary", "Al-Shammari", "Al-Mutairi", "Al-Anazi", "Al-Zahrani",
"Al-Ghamdi", "Al-Ahmari", "Al-Malki", "Al-Khaldi", "Al-Bakr"
]
# Departments and journey types
DEPARTMENTS = [
"Cardiology", "Orthopedics", "Pediatrics", "Emergency", "General",
"Internal Medicine", "Surgery", "Oncology", "Neurology", "Gynecology"
]
def get_active_hospital_codes() -> List[str]:
"""Query active hospitals from the database and return their codes"""
try:
hospital_codes = list(
Hospital.objects.filter(status='active').values_list('code', flat=True)
)
if not hospital_codes:
# Fallback to default if no active hospitals found
print("⚠️ Warning: No active hospitals found, using default ALH-main")
return ["ALH-main"]
return hospital_codes
except Exception as e:
print(f"⚠️ Error querying hospitals: {e}, using default ALH-main")
return ["ALH-main"]
JOURNEY_TYPES = {
"ems": ["EMS_STAGE_1_DISPATCHED", "EMS_STAGE_2_ON_SCENE", "EMS_STAGE_3_TRANSPORT", "EMS_STAGE_4_HANDOFF"],
"inpatient": [
"INPATIENT_STAGE_1_ADMISSION", "INPATIENT_STAGE_2_TREATMENT",
"INPATIENT_STAGE_3_NURSING", "INPATIENT_STAGE_4_LAB",
"INPATIENT_STAGE_5_RADIOLOGY", "INPATIENT_STAGE_6_DISCHARGE"
],
"opd": [
"OPD_STAGE_1_REGISTRATION", "OPD_STAGE_2_CONSULTATION",
"OPD_STAGE_3_LAB", "OPD_STAGE_4_RADIOLOGY", "OPD_STAGE_5_PHARMACY"
]
}
def generate_random_saudi_phone() -> str:
"""Generate random Saudi phone number"""
return f"+9665{random.randint(0, 9)}{random.randint(1000000, 9999999)}"
def generate_random_national_id() -> str:
"""Generate random Saudi national ID (10 digits)"""
return "".join([str(random.randint(0, 9)) for _ in range(10)])
def generate_random_mrn() -> str:
"""Generate random MRN"""
return f"MRN-{random.randint(100000, 999999)}"
def generate_random_encounter_id() -> str:
"""Generate random encounter ID"""
year = datetime.now().year
return f"ENC-{year}-{random.randint(1, 99999):05d}"
def generate_random_email(first_name: str, last_name: str) -> str:
"""Generate random email address"""
domains = ["gmail.com", "yahoo.com", "hotmail.com", "outlook.com"]
domain = random.choice(domains)
return f"{first_name.lower()}.{last_name.lower()}@{domain}"
def generate_patient_journey() -> Dict:
"""Generate a complete or partial patient journey"""
encounter_id = generate_random_encounter_id()
mrn = generate_random_mrn()
national_id = generate_random_national_id()
first_name = random.choice(ARABIC_FIRST_NAMES)
last_name = random.choice(ARABIC_LAST_NAMES)
phone = generate_random_saudi_phone()
email = generate_random_email(first_name, last_name)
visit_type = random.choice(["ems", "inpatient", "opd"])
department = random.choice(DEPARTMENTS)
# Query active hospitals dynamically
hospital_codes = get_active_hospital_codes()
hospital_code = random.choice(hospital_codes)
# Get available event codes for this journey type
available_events = JOURNEY_TYPES[visit_type]
# Determine how many stages to complete (random: some full, some partial)
# 40% chance of full journey, 60% chance of partial
is_full_journey = random.random() < 0.4
num_stages = len(available_events) if is_full_journey else random.randint(1, len(available_events) - 1)
# Select events for this journey
journey_events = available_events[:num_stages]
# Generate events with timestamps
base_time = datetime.now()
events = []
for i, event_code in enumerate(journey_events):
# Stagger events by 1-2 hours
event_time = base_time + timedelta(hours=i*1.5, minutes=random.randint(0, 30))
event = {
"encounter_id": encounter_id,
"mrn": mrn,
"national_id": national_id,
"first_name": first_name,
"last_name": last_name,
"phone": phone,
"email": email,
"event_type": event_code,
"timestamp": event_time.isoformat() + "Z",
"visit_type": visit_type,
"department": department,
"hospital_code": hospital_code
}
events.append(event)
return {
"events": events,
"summary": {
"encounter_id": encounter_id,
"patient_name": f"{first_name} {last_name}",
"visit_type": visit_type,
"stages_completed": num_stages,
"total_stages": len(available_events),
"is_full_journey": is_full_journey,
"hospital_code": hospital_code
}
}
def send_events_to_api(api_url: str, events: List[Dict]) -> bool:
"""Send events to the PX360 API"""
try:
# API expects a dictionary with 'events' key
payload = {"events": events}
response = requests.post(
api_url,
json=payload,
headers={"Content-Type": "application/json"},
timeout=10
)
if response.status_code == 200:
return True
else:
print(f" ❌ API Error: {response.status_code} - {response.text}")
return False
except requests.exceptions.RequestException as e:
print(f" ❌ Request failed: {str(e)}")
return False
def print_journey_summary(summary: Dict, success: bool):
"""Print formatted journey summary"""
status_symbol = "" if success else ""
journey_type_symbol = {
"ems": "🚑",
"inpatient": "🏥",
"opd": "🏥"
}.get(summary["visit_type"], "📋")
status_text = "Full Journey" if summary["is_full_journey"] else "Partial Journey"
print(f"\n{status_symbol} {journey_type_symbol} Patient Journey Created")
print(f" Patient: {summary['patient_name']}")
print(f" Encounter ID: {summary['encounter_id']}")
print(f" Hospital: {summary['hospital_code']}")
print(f" Type: {summary['visit_type'].upper()} - {status_text}")
print(f" Stages: {summary['stages_completed']}/{summary['total_stages']} completed")
print(f" API Status: {'Success' if success else 'Failed'}")
def print_statistics(stats: Dict):
"""Print simulation statistics"""
print(f"\n{'='*70}")
print(f"📊 SIMULATION STATISTICS")
print(f"{'='*70}")
print(f"Total Journeys: {stats['total']}")
print(f"Successful: {stats['successful']} ({stats['success_rate']:.1f}%)")
print(f"Failed: {stats['failed']}")
print(f"Full Journeys: {stats['full_journeys']}")
print(f"Partial Journeys: {stats['partial_journeys']}")
print(f"EMS Journeys: {stats['ems_journeys']}")
print(f"Inpatient Journeys: {stats['inpatient_journeys']}")
print(f"OPD Journeys: {stats['opd_journeys']}")
print(f"Total Events Sent: {stats['total_events']}")
if stats['hospital_distribution']:
print(f"\n🏥 Hospital Distribution:")
for hospital, count in sorted(stats['hospital_distribution'].items()):
percentage = (count / stats['total']) * 100 if stats['total'] > 0 else 0
print(f" {hospital}: {count} ({percentage:.1f}%)")
print(f"{'='*70}\n")
def main():
"""Main simulator loop"""
parser = argparse.ArgumentParser(description="HIS Simulator - Continuous event generator")
parser.add_argument("--url",
default="http://localhost:8000/api/simulator/his-events/",
help="API endpoint URL")
parser.add_argument("--delay",
type=int,
default=5,
help="Delay between events in seconds")
parser.add_argument("--max-patients",
type=int,
default=0,
help="Maximum number of patients to simulate (0 = infinite)")
args = parser.parse_args()
print("="*70)
print("🏥 HIS SIMULATOR - Patient Journey Event Generator")
print("="*70)
print(f"API URL: {args.url}")
print(f"Delay: {args.delay} seconds between events")
print(f"Max Patients: {args.max_patients if args.max_patients > 0 else 'Infinite'}")
print("="*70)
print("\nStarting simulation... Press Ctrl+C to stop\n")
# Statistics
stats = {
"total": 0,
"successful": 0,
"failed": 0,
"full_journeys": 0,
"partial_journeys": 0,
"ems_journeys": 0,
"inpatient_journeys": 0,
"opd_journeys": 0,
"total_events": 0,
"hospital_distribution": {}
}
patient_count = 0
try:
while True:
# Check max patients limit
if args.max_patients > 0 and patient_count >= args.max_patients:
print(f"\n✓ Reached maximum patient limit: {args.max_patients}")
break
# Generate patient journey
journey_data = generate_patient_journey()
events = journey_data["events"]
summary = journey_data["summary"]
# Send events to API
print(f"\n📤 Sending {len(events)} events for {summary['patient_name']}...")
success = send_events_to_api(args.url, events)
# Update statistics
patient_count += 1
stats["total"] += 1
stats["total_events"] += len(events)
if success:
stats["successful"] += 1
else:
stats["failed"] += 1
if summary["is_full_journey"]:
stats["full_journeys"] += 1
else:
stats["partial_journeys"] += 1
if summary["visit_type"] == "ems":
stats["ems_journeys"] += 1
elif summary["visit_type"] == "inpatient":
stats["inpatient_journeys"] += 1
else:
stats["opd_journeys"] += 1
# Track hospital distribution
hospital = summary["hospital_code"]
stats["hospital_distribution"][hospital] = stats["hospital_distribution"].get(hospital, 0) + 1
# Calculate success rate
stats["success_rate"] = (stats["successful"] / stats["total"]) * 100 if stats["total"] > 0 else 0
# Print journey summary
print_journey_summary(summary, success)
# Print statistics every 10 patients
if patient_count % 10 == 0:
print_statistics(stats)
# Wait before next patient
time.sleep(args.delay)
except KeyboardInterrupt:
print("\n\n⏹️ Simulation stopped by user")
print_statistics(stats)
print("Goodbye! 👋\n")
if __name__ == "__main__":
main()