"""Mock patient-journey event generator for exercising an integration endpoint.

Builds random (fake) patient, visit and journey-step payloads and POSTs them
as JSON to ``API_ENDPOINT`` at random intervals until interrupted.
"""

import json
import random
import time
import uuid
from datetime import datetime, timedelta

try:
    import requests
except ImportError:  # Payload generation works without it; sending does not.
    requests = None

# Configuration
API_ENDPOINT = "http://localhost:8000/config/test/"  # Update with your actual endpoint
SIMULATION_SPEED_FACTOR = 1  # 1 is real-time, higher is faster (must be > 0)
BATCH_SIZE = 1  # Number of events to send per cycle

# Mock Data Constants
DEPARTMENTS = ["Emergency", "Cardiology", "Pediatrics", "Radiology", "General Medicine"]
VISIT_TYPES = ["Inpatient", "Outpatient", "Emergency"]
STAGES = ["Triage", "Consultation", "Diagnostic Testing", "Observation", "Discharge"]
ROLES = ["Doctor", "Nurse", "Technician", "Admin"]

# Staff Directory Simulation (roles/departments are drawn once at import time)
STAFF_LIST = [
    {
        "staff_id": f"STF-{1000 + i}",
        "first_name": f"StaffFirst_{i}",
        "last_name": f"StaffLast_{i}",
        "role": random.choice(ROLES),
        "department": random.choice(DEPARTMENTS),
    }
    for i in range(20)
]


def generate_patient_journey() -> dict:
    """Generate one random patient journey payload.

    The payload has three sections: ``patient_info`` (IDs, MRN, national ID,
    name, phone), ``visit_info`` (encounter, department, check-in and
    discharge times) and ``journey_steps`` (1 to 4 steps, taken in order from
    the start of ``STAGES``).

    Timestamps are naive local-time ISO 8601 strings. Discharge is the moment
    of generation, check-in lies 30 to 240 minutes before it, and every step
    timestamp falls after check-in and no later than discharge.

    Returns:
        dict: The JSON-serialisable payload.
    """
    # Read the clock once so check-in and discharge share the same reference.
    now = datetime.now()

    p_id = f"PAT-{random.randint(10000, 99999)}"
    mrn = f"MRN-{random.randint(100000, 999999)}"
    national_id = (
        f"{random.randint(100, 999)}-{random.randint(10, 99)}-{random.randint(1000, 9999)}"
    )
    e_id = f"ENC-{uuid.uuid4().hex[:8].upper()}"

    # Draw the gaps between steps first: check-in must be at least their sum
    # in the past, otherwise the last steps would be stamped after discharge.
    num_steps = random.randint(1, 4)
    step_gaps = [random.randint(10, 45) for _ in range(num_steps)]
    min_minutes_back = max(30, sum(step_gaps))
    max_minutes_back = max(240, min_minutes_back)
    checkin = now - timedelta(minutes=random.randint(min_minutes_back, max_minutes_back))

    payload = {
        "patient_info": {
            "patient_id": p_id,
            "mrn": mrn,
            "national_id": national_id,
            "first_name": f"Patient_{p_id[-3:]}",
            "last_name": "Doe",
            "phone": f"+1-555-{random.randint(100, 999)}-{random.randint(1000, 9999)}",
        },
        "visit_info": {
            "encounter_id": e_id,
            "patient_id": p_id,
            "visit_type": random.choice(VISIT_TYPES),
            "department": random.choice(DEPARTMENTS),
            "checkin_time": checkin.isoformat(),
            "discharge_time": now.isoformat(),
        },
        "journey_steps": [],
    }

    step_time = checkin
    for i, gap in enumerate(step_gaps):
        staff = random.choice(STAFF_LIST)
        step_time += timedelta(minutes=gap)
        payload["journey_steps"].append(
            {
                "encounter_id": e_id,
                "patient_id": p_id,
                # Clamp so a longer journey would repeat the final stage
                # instead of indexing past the end of STAGES.
                "stage": STAGES[min(i, len(STAGES) - 1)],
                "timestamp": step_time.isoformat(),
                "staff_id": staff["staff_id"],
                "wait_minutes": random.randint(0, 30),
            }
        )

    return payload


def send_data(payload) -> bool:
    """POST a journey payload as JSON to ``API_ENDPOINT``.

    Request failures (connection errors, timeouts, non-2xx responses) are
    reported on stdout rather than raised, so the simulation keeps running.

    Args:
        payload: A payload as produced by ``generate_patient_journey``.

    Returns:
        bool: True if the endpoint answered with a success status, else False.

    Raises:
        ImportError: If the ``requests`` package is not installed.
    """
    if requests is None:
        raise ImportError("The 'requests' package is required to send data.")

    print(
        f"[{datetime.now().strftime('%H:%M:%S')}] Sending journey data for "
        f"Patient MRN: {payload['patient_info']['mrn']}..."
    )

    try:
        # Note: In a real scenario, you'd add headers={'Authorization': 'Bearer YOUR_TOKEN'}
        response = requests.post(API_ENDPOINT, json=payload, timeout=10)
        response.raise_for_status()  # Turn 4xx/5xx responses into exceptions
    except requests.exceptions.RequestException as e:
        print(f"Error sending data: {e}")
        return False

    print(f"Successfully sent {len(payload['journey_steps'])} journey steps to {API_ENDPOINT}")
    # print(json.dumps(payload, indent=2))  # Uncomment to see full payload
    return True


def run_simulation() -> None:
    """Run the simulation loop until interrupted with Ctrl+C.

    Each cycle generates and sends ``BATCH_SIZE`` journeys (at least one),
    then sleeps for a random 2-10 seconds divided by
    ``SIMULATION_SPEED_FACTOR``.

    Raises:
        ValueError: If ``SIMULATION_SPEED_FACTOR`` is not greater than zero.
    """
    # Fail fast: zero would raise ZeroDivisionError mid-loop and a negative
    # value would hand time.sleep() a negative duration.
    if SIMULATION_SPEED_FACTOR <= 0:
        raise ValueError("SIMULATION_SPEED_FACTOR must be greater than 0.")

    print("Starting PX360 Integration Simulation...")
    print(f"Target Endpoint: {API_ENDPOINT}")
    print("-" * 50)

    try:
        while True:
            # Generate and send data
            for _ in range(max(1, BATCH_SIZE)):
                send_data(generate_patient_journey())

            # Wait for a random interval between 2 and 10 seconds (scaled)
            wait_time = random.uniform(2, 10) / SIMULATION_SPEED_FACTOR
            print(f"Next event in {wait_time:.2f} seconds...\n")
            time.sleep(wait_time)
    except KeyboardInterrupt:
        print("\nSimulation stopped by user.")


if __name__ == "__main__":
    run_simulation()