# HH/run.py
# 2026-01-06 18:18:35 +03:00

# 117 lines
# 4.0 KiB
# Python

import json
import time
import random
import uuid
import requests
from datetime import datetime, timedelta
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# URL that receives the POSTed journey payloads; update with your actual endpoint.
API_ENDPOINT = "http://localhost:8000/config/test/"
# Divisor applied to the pause between events: 1 is real-time, higher is faster.
SIMULATION_SPEED_FACTOR = 1
# Number of events to send at once.
BATCH_SIZE = 1

# ---------------------------------------------------------------------------
# Mock data constants
# ---------------------------------------------------------------------------
DEPARTMENTS = [
    "Emergency",
    "Cardiology",
    "Pediatrics",
    "Radiology",
    "General Medicine",
]
VISIT_TYPES = [
    "Inpatient",
    "Outpatient",
    "Emergency",
]
# Listed in visit order; journey steps take their stage from the start of this list.
STAGES = [
    "Triage",
    "Consultation",
    "Diagnostic Testing",
    "Observation",
    "Discharge",
]
ROLES = [
    "Doctor",
    "Nurse",
    "Technician",
    "Admin",
]

# ---------------------------------------------------------------------------
# Staff directory simulation
# ---------------------------------------------------------------------------
# Twenty placeholder staff records with IDs STF-1000 .. STF-1019. Role and
# department are picked at random once, when the module is loaded.
STAFF_LIST = [
    dict(
        staff_id=f"STF-{1000 + idx}",
        first_name=f"StaffFirst_{idx}",
        last_name=f"StaffLast_{idx}",
        role=random.choice(ROLES),
        department=random.choice(DEPARTMENTS),
    )
    for idx in range(20)
]
def generate_patient_journey() -> dict:
    """Generate one mock patient-journey payload with National ID and MRN.

    Returns:
        A dict with three keys:

        * ``"patient_info"`` - random patient identifiers (``patient_id``,
          ``mrn``, ``national_id``), placeholder names and a phone number.
        * ``"visit_info"`` - encounter ID, visit type, department,
          ``checkin_time`` and ``discharge_time`` as ISO-8601 strings
          (naive local time).
        * ``"journey_steps"`` - 1 to 4 step dicts in chronological order,
          each with a stage taken in order from ``STAGES``, a timestamp, a
          random staff member's ID and a random wait of 0-30 minutes.

    The timeline is self-consistent: check-in is 30-240 minutes in the past,
    every step timestamp falls after check-in, and no step is later than the
    discharge time (the moment the payload is generated).
    """
    p_id = f"PAT-{random.randint(10000, 99999)}"
    mrn = f"MRN-{random.randint(100000, 999999)}"
    national_id = (
        f"{random.randint(100, 999)}-{random.randint(10, 99)}-{random.randint(1000, 9999)}"
    )
    e_id = f"ENC-{uuid.uuid4().hex[:8].upper()}"

    # Draw the gaps between steps first so check-in can be placed far enough
    # back that the last step never lands after the discharge time.
    num_steps = random.randint(1, 4)
    step_gaps = [random.randint(10, 45) for _ in range(num_steps)]

    # Read the clock once so check-in and discharge share the same reference.
    now = datetime.now()
    # The gaps sum to at most 4 * 45 = 180 minutes, so the result stays inside
    # the 30-240 minute window.
    minutes_since_checkin = max(random.randint(30, 240), sum(step_gaps))
    checkin = now - timedelta(minutes=minutes_since_checkin)

    payload = {
        "patient_info": {
            "patient_id": p_id,
            "mrn": mrn,
            "national_id": national_id,
            "first_name": f"Patient_{p_id[-3:]}",
            "last_name": "Doe",
            "phone": f"+1-555-{random.randint(100, 999)}-{random.randint(1000, 9999)}",
        },
        "visit_info": {
            "encounter_id": e_id,
            "patient_id": p_id,
            "visit_type": random.choice(VISIT_TYPES),
            "department": random.choice(DEPARTMENTS),
            "checkin_time": checkin.isoformat(),
            "discharge_time": now.isoformat(),
        },
        "journey_steps": [],
    }

    step_time = checkin
    for i, gap in enumerate(step_gaps):
        step_time += timedelta(minutes=gap)
        staff = random.choice(STAFF_LIST)
        payload["journey_steps"].append(
            {
                "encounter_id": e_id,
                "patient_id": p_id,
                # Clamp to the final stage if there are ever more steps than stages.
                "stage": STAGES[min(i, len(STAGES) - 1)],
                "timestamp": step_time.isoformat(),
                "staff_id": staff["staff_id"],
                "wait_minutes": random.randint(0, 30),
            }
        )
    return payload
def send_data(payload):
    """POST one journey payload as JSON to ``API_ENDPOINT`` and print the outcome.

    Args:
        payload: Journey dict; ``payload['patient_info']['mrn']`` and
            ``payload['journey_steps']`` are read for the progress messages.

    Returns:
        None. Any ``requests`` error (connection failure, timeout, non-2xx
        status) is caught and printed rather than raised.
    """
    mrn = payload['patient_info']['mrn']
    stamp = datetime.now().strftime('%H:%M:%S')
    print(f"[{stamp}] Sending journey data for Patient MRN: {mrn}...")

    try:
        # No auth header is sent here; a real deployment would pass
        # headers={'Authorization': 'Bearer YOUR_TOKEN'}.
        reply = requests.post(API_ENDPOINT, json=payload, timeout=10)
        reply.raise_for_status()
    except requests.exceptions.RequestException as err:
        print(f"Error sending data: {err}")
        return

    # Debugging tip: json.dumps(payload, indent=2) shows the full payload.
    step_count = len(payload['journey_steps'])
    print(f"Successfully sent {step_count} journey steps to {API_ENDPOINT}")
def run_simulation():
    """Run the simulation loop until interrupted with Ctrl+C.

    Each cycle generates and sends ``BATCH_SIZE`` patient journeys (at least
    one), then sleeps for a random 2-10 seconds divided by
    ``SIMULATION_SPEED_FACTOR``.

    Raises:
        ValueError: If ``SIMULATION_SPEED_FACTOR`` is not positive. This is
            checked before any data is sent, so a misconfiguration does not
            surface as a division or sleep error after the first event.
    """
    if SIMULATION_SPEED_FACTOR <= 0:
        raise ValueError(
            f"SIMULATION_SPEED_FACTOR must be positive, got {SIMULATION_SPEED_FACTOR!r}"
        )

    print("Starting PX360 Integration Simulation...")
    print(f"Target Endpoint: {API_ENDPOINT}")
    print("-" * 50)
    try:
        while True:
            # Honour the configured batch size; max() keeps the simulator
            # sending even if BATCH_SIZE is set to 0 or a negative number.
            for _ in range(max(1, BATCH_SIZE)):
                send_data(generate_patient_journey())
            # Wait for a random interval between 2 and 10 seconds (scaled).
            wait_time = random.uniform(2, 10) / SIMULATION_SPEED_FACTOR
            print(f"Next event in {wait_time:.2f} seconds...\n")
            time.sleep(wait_time)
    except KeyboardInterrupt:
        print("\nSimulation stopped by user.")
# Script entry point: start the simulator only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    run_simulation()