#!/usr/bin/env python """ HIS Simulator - Continuous patient journey event generator This script simulates a Hospital Information System (HIS) by continuously generating patient journey events and sending them to the PX360 API. Usage: python his_simulator.py [--url URL] [--delay SECONDS] [--max-patients N] Arguments: --url: API endpoint URL (default: http://localhost:8000/api/simulator/his-events/) --delay: Delay between events in seconds (default: 5) --max-patients: Maximum number of patients to simulate (default: infinite) """ import argparse import json import random import time import os import sys import django from datetime import datetime, timedelta from typing import List, Dict import requests # Add project root to Python path sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) # Setup Django os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.dev') django.setup() from apps.organizations.models import Hospital # Arabic names for realistic patient data ARABIC_FIRST_NAMES = [ "Ahmed", "Mohammed", "Abdullah", "Omar", "Ali", "Saud", "Fahad", "Turki", "Khalid", "Youssef", "Abdulrahman", "Abdulaziz", "Abdulwahab", "Majid", "Nasser", "Fatima", "Aisha", "Sarah", "Nora", "Layla", "Hessa", "Reem", "Mona", "Dalal", "Jawaher" ] ARABIC_LAST_NAMES = [ "Al-Saud", "Al-Rashid", "Al-Qahtani", "Al-Harbi", "Al-Otaibi", "Al-Dossary", "Al-Shammari", "Al-Mutairi", "Al-Anazi", "Al-Zahrani", "Al-Ghamdi", "Al-Ahmari", "Al-Malki", "Al-Khaldi", "Al-Bakr" ] # Departments and journey types DEPARTMENTS = [ "Cardiology", "Orthopedics", "Pediatrics", "Emergency", "General", "Internal Medicine", "Surgery", "Oncology", "Neurology", "Gynecology" ] def get_active_hospital_codes() -> List[str]: """Query active hospitals from the database and return their codes""" try: hospital_codes = list( Hospital.objects.filter(status='active').values_list('code', flat=True) ) if not hospital_codes: # Fallback to default if no active hospitals found print("⚠️ Warning: No active hospitals found, using default ALH-main") return ["ALH-main"] return hospital_codes except Exception as e: print(f"⚠️ Error querying hospitals: {e}, using default ALH-main") return ["ALH-main"] JOURNEY_TYPES = { "ems": ["EMS_STAGE_1_DISPATCHED", "EMS_STAGE_2_ON_SCENE", "EMS_STAGE_3_TRANSPORT", "EMS_STAGE_4_HANDOFF"], "inpatient": [ "INPATIENT_STAGE_1_ADMISSION", "INPATIENT_STAGE_2_TREATMENT", "INPATIENT_STAGE_3_NURSING", "INPATIENT_STAGE_4_LAB", "INPATIENT_STAGE_5_RADIOLOGY", "INPATIENT_STAGE_6_DISCHARGE" ], "opd": [ "OPD_STAGE_1_REGISTRATION", "OPD_STAGE_2_CONSULTATION", "OPD_STAGE_3_LAB", "OPD_STAGE_4_RADIOLOGY", "OPD_STAGE_5_PHARMACY" ] } def generate_random_saudi_phone() -> str: """Generate random Saudi phone number""" return f"+9665{random.randint(0, 9)}{random.randint(1000000, 9999999)}" def generate_random_national_id() -> str: """Generate random Saudi national ID (10 digits)""" return "".join([str(random.randint(0, 9)) for _ in range(10)]) def generate_random_mrn() -> str: """Generate random MRN""" return f"MRN-{random.randint(100000, 999999)}" def generate_random_encounter_id() -> str: """Generate random encounter ID""" year = datetime.now().year return f"ENC-{year}-{random.randint(1, 99999):05d}" def generate_random_email(first_name: str, last_name: str) -> str: """Generate random email address""" domains = ["gmail.com", "yahoo.com", "hotmail.com", "outlook.com"] domain = random.choice(domains) return f"{first_name.lower()}.{last_name.lower()}@{domain}" def generate_patient_journey() -> Dict: """Generate a complete or partial patient journey""" encounter_id = generate_random_encounter_id() mrn = generate_random_mrn() national_id = generate_random_national_id() first_name = random.choice(ARABIC_FIRST_NAMES) last_name = random.choice(ARABIC_LAST_NAMES) phone = generate_random_saudi_phone() email = generate_random_email(first_name, last_name) visit_type = random.choice(["ems", "inpatient", "opd"]) department = random.choice(DEPARTMENTS) # Query active hospitals dynamically hospital_codes = get_active_hospital_codes() hospital_code = random.choice(hospital_codes) # Get available event codes for this journey type available_events = JOURNEY_TYPES[visit_type] # Determine how many stages to complete (random: some full, some partial) # 40% chance of full journey, 60% chance of partial is_full_journey = random.random() < 0.4 num_stages = len(available_events) if is_full_journey else random.randint(1, len(available_events) - 1) # Select events for this journey journey_events = available_events[:num_stages] # Generate events with timestamps base_time = datetime.now() events = [] for i, event_code in enumerate(journey_events): # Stagger events by 1-2 hours event_time = base_time + timedelta(hours=i*1.5, minutes=random.randint(0, 30)) event = { "encounter_id": encounter_id, "mrn": mrn, "national_id": national_id, "first_name": first_name, "last_name": last_name, "phone": phone, "email": email, "event_type": event_code, "timestamp": event_time.isoformat() + "Z", "visit_type": visit_type, "department": department, "hospital_code": hospital_code } events.append(event) return { "events": events, "summary": { "encounter_id": encounter_id, "patient_name": f"{first_name} {last_name}", "visit_type": visit_type, "stages_completed": num_stages, "total_stages": len(available_events), "is_full_journey": is_full_journey, "hospital_code": hospital_code } } def send_events_to_api(api_url: str, events: List[Dict]) -> bool: """Send events to the PX360 API""" try: # API expects a dictionary with 'events' key payload = {"events": events} response = requests.post( api_url, json=payload, headers={"Content-Type": "application/json"}, timeout=10 ) if response.status_code == 200: return True else: print(f" ❌ API Error: {response.status_code} - {response.text}") return False except requests.exceptions.RequestException as e: print(f" ❌ Request failed: {str(e)}") return False def print_journey_summary(summary: Dict, success: bool): """Print formatted journey summary""" status_symbol = "✅" if success else "❌" journey_type_symbol = { "ems": "🚑", "inpatient": "🏥", "opd": "🏥" }.get(summary["visit_type"], "📋") status_text = "Full Journey" if summary["is_full_journey"] else "Partial Journey" print(f"\n{status_symbol} {journey_type_symbol} Patient Journey Created") print(f" Patient: {summary['patient_name']}") print(f" Encounter ID: {summary['encounter_id']}") print(f" Hospital: {summary['hospital_code']}") print(f" Type: {summary['visit_type'].upper()} - {status_text}") print(f" Stages: {summary['stages_completed']}/{summary['total_stages']} completed") print(f" API Status: {'Success' if success else 'Failed'}") def print_statistics(stats: Dict): """Print simulation statistics""" print(f"\n{'='*70}") print(f"📊 SIMULATION STATISTICS") print(f"{'='*70}") print(f"Total Journeys: {stats['total']}") print(f"Successful: {stats['successful']} ({stats['success_rate']:.1f}%)") print(f"Failed: {stats['failed']}") print(f"Full Journeys: {stats['full_journeys']}") print(f"Partial Journeys: {stats['partial_journeys']}") print(f"EMS Journeys: {stats['ems_journeys']}") print(f"Inpatient Journeys: {stats['inpatient_journeys']}") print(f"OPD Journeys: {stats['opd_journeys']}") print(f"Total Events Sent: {stats['total_events']}") if stats['hospital_distribution']: print(f"\n🏥 Hospital Distribution:") for hospital, count in sorted(stats['hospital_distribution'].items()): percentage = (count / stats['total']) * 100 if stats['total'] > 0 else 0 print(f" {hospital}: {count} ({percentage:.1f}%)") print(f"{'='*70}\n") def main(): """Main simulator loop""" parser = argparse.ArgumentParser(description="HIS Simulator - Continuous event generator") parser.add_argument("--url", default="http://localhost:8000/api/simulator/his-events/", help="API endpoint URL") parser.add_argument("--delay", type=int, default=5, help="Delay between events in seconds") parser.add_argument("--max-patients", type=int, default=0, help="Maximum number of patients to simulate (0 = infinite)") args = parser.parse_args() print("="*70) print("🏥 HIS SIMULATOR - Patient Journey Event Generator") print("="*70) print(f"API URL: {args.url}") print(f"Delay: {args.delay} seconds between events") print(f"Max Patients: {args.max_patients if args.max_patients > 0 else 'Infinite'}") print("="*70) print("\nStarting simulation... Press Ctrl+C to stop\n") # Statistics stats = { "total": 0, "successful": 0, "failed": 0, "full_journeys": 0, "partial_journeys": 0, "ems_journeys": 0, "inpatient_journeys": 0, "opd_journeys": 0, "total_events": 0, "hospital_distribution": {} } patient_count = 0 try: while True: # Check max patients limit if args.max_patients > 0 and patient_count >= args.max_patients: print(f"\n✓ Reached maximum patient limit: {args.max_patients}") break # Generate patient journey journey_data = generate_patient_journey() events = journey_data["events"] summary = journey_data["summary"] # Send events to API print(f"\n📤 Sending {len(events)} events for {summary['patient_name']}...") success = send_events_to_api(args.url, events) # Update statistics patient_count += 1 stats["total"] += 1 stats["total_events"] += len(events) if success: stats["successful"] += 1 else: stats["failed"] += 1 if summary["is_full_journey"]: stats["full_journeys"] += 1 else: stats["partial_journeys"] += 1 if summary["visit_type"] == "ems": stats["ems_journeys"] += 1 elif summary["visit_type"] == "inpatient": stats["inpatient_journeys"] += 1 else: stats["opd_journeys"] += 1 # Track hospital distribution hospital = summary["hospital_code"] stats["hospital_distribution"][hospital] = stats["hospital_distribution"].get(hospital, 0) + 1 # Calculate success rate stats["success_rate"] = (stats["successful"] / stats["total"]) * 100 if stats["total"] > 0 else 0 # Print journey summary print_journey_summary(summary, success) # Print statistics every 10 patients if patient_count % 10 == 0: print_statistics(stats) # Wait before next patient time.sleep(args.delay) except KeyboardInterrupt: print("\n\n⏹️ Simulation stopped by user") print_statistics(stats) print("Goodbye! 👋\n") if __name__ == "__main__": main()