"""Generate mock laboratory data for every tenant of the hospital system.

Run as a standalone script. It creates lab test master data, reference
ranges, lab orders, specimens, results and quality-control records through
the Django ORM, printing a short summary at the end.
"""
import os
import random
import uuid
from collections import defaultdict
from datetime import datetime, date, time, timedelta
from decimal import Decimal

import django

# Set up Django environment. This has to happen before any model import
# below, which is why the imports are split around it.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'hospital_management.settings')
django.setup()

from django.utils import timezone as django_timezone  # noqa: E402
from django.contrib.auth import get_user_model  # noqa: E402
from laboratory.models import (  # noqa: E402
    LabTest,
    LabOrder,
    Specimen,
    LabResult,
    QualityControl,
    ReferenceRange,
)
from patients.models import PatientProfile  # noqa: E402
from emr.models import Encounter  # noqa: E402
from core.models import Tenant  # noqa: E402

User = get_user_model()

# Simplified Saudi laboratory test data
SAUDI_LAB_TESTS = [
    # (test_name, category, specimen_type, tat_hours)
    ('Complete Blood Count', 'HEMATOLOGY', 'BLOOD', 2),
    ('Erythrocyte Sedimentation Rate', 'HEMATOLOGY', 'BLOOD', 1),
    ('Prothrombin Time', 'HEMATOLOGY', 'BLOOD', 1),
    ('Basic Metabolic Panel', 'CHEMISTRY', 'BLOOD', 1),
    ('Comprehensive Metabolic Panel', 'CHEMISTRY', 'BLOOD', 2),
    ('Lipid Profile', 'CHEMISTRY', 'BLOOD', 4),
    ('Liver Function Tests', 'CHEMISTRY', 'BLOOD', 2),
    ('Thyroid Function Tests', 'ENDOCRINOLOGY', 'BLOOD', 6),
    ('Hemoglobin A1C', 'CHEMISTRY', 'BLOOD', 8),
    ('Blood Culture', 'MICROBIOLOGY', 'BLOOD', 72),
    ('Urine Culture', 'MICROBIOLOGY', 'URINE', 48),
    ('Throat Culture', 'MICROBIOLOGY', 'SWAB', 24),
    ('Hepatitis Panel', 'IMMUNOLOGY', 'BLOOD', 8),
    ('HIV Test', 'IMMUNOLOGY', 'BLOOD', 4),
    ('Urinalysis', 'CHEMISTRY', 'URINE', 1),
    ('Urine Microscopy', 'CHEMISTRY', 'URINE', 1),
]

# Values accepted for LabTest.test_category / specimen_type by this script;
# anything else falls back to 'OTHER' / 'BLOOD' respectively.
_VALID_TEST_CATEGORIES = (
    'HEMATOLOGY', 'CHEMISTRY', 'MICROBIOLOGY', 'IMMUNOLOGY', 'PATHOLOGY',
    'MOLECULAR', 'TOXICOLOGY', 'ENDOCRINOLOGY', 'OTHER',
)
_VALID_SPECIMEN_TYPES = (
    'BLOOD', 'SERUM', 'PLASMA', 'URINE', 'STOOL', 'CSF', 'SPUTUM',
    'TISSUE', 'SWAB', 'OTHER',
)

_LAB_STAFF_ROLES = ['LABORATORY_TECHNICIAN', 'PATHOLOGIST', 'PHYSICIAN']


def _group_by(items, attr):
    """Return a dict mapping ``getattr(item, attr)`` to the list of items.

    Used to replace repeated linear scans (one per order/test) with a single
    pass. Keys are whatever the attribute holds; for model instances the
    lookup relies on their own equality and hashing.
    """
    groups = defaultdict(list)
    for item in items:
        groups[getattr(item, attr)].append(item)
    return dict(groups)


def _staff_role(user):
    """Return the user's employee-profile role, or None when unavailable.

    Mock staff created by this script have no employee profile, so the
    lookup must not raise. NOTE(review): this relies on Django reporting a
    missing related object with an AttributeError subclass, which makes
    ``getattr`` fall back to its default - confirm for this relation.
    """
    profile = getattr(user, 'employee_profile', None)
    return getattr(profile, 'role', None)


def _next_test_code(used_codes):
    """Return a ``LABnnnn`` code that was not handed out earlier in this run.

    Gives up after a bounded number of attempts and returns the last
    candidate, so an exhausted code space cannot loop forever.
    """
    code = f"LAB{random.randint(1000, 9999)}"
    for _ in range(100):
        if code not in used_codes:
            break
        code = f"LAB{random.randint(1000, 9999)}"
    used_codes.add(code)
    return code


def create_mock_lab_staff(tenants):
    """Create (or fetch) a fixed set of mock lab users for each tenant.

    Args:
        tenants: iterable of tenant objects to create staff for.

    Returns:
        List of user objects, both newly created and already existing ones
        matched by email.
    """
    lab_staff = []
    # NOTE: 'role' is informational only; it is not passed to create_user
    # below, so no employee profile/role is stored for these users.
    mock_staff = [
        {'first_name': 'Sara', 'last_name': 'Al-Mahmoud', 'role': 'LABORATORY_TECHNICIAN'},
        {'first_name': 'Omar', 'last_name': 'Al-Rashid', 'role': 'LABORATORY_TECHNICIAN'},
        {'first_name': 'Layla', 'last_name': 'Al-Zahra', 'role': 'PATHOLOGIST'},
        {'first_name': 'Ahmed', 'last_name': 'Al-Farisi', 'role': 'LABORATORY_TECHNICIAN'},
    ]

    for tenant in tenants:
        # Use a simple naming scheme instead of domain
        tenant_prefix = f"tenant{tenant.id}"
        for staff_data in mock_staff:
            try:
                email = (
                    f"{staff_data['first_name'].lower()}."
                    f"{staff_data['last_name'].lower().replace('-', '')}"
                    f"@{tenant_prefix}.com"
                )
                existing_user = User.objects.filter(email=email).first()
                if existing_user:
                    lab_staff.append(existing_user)
                    continue
                user = User.objects.create_user(
                    email=email,
                    first_name=staff_data['first_name'],
                    last_name=staff_data['last_name'],
                    tenant=tenant,
                    is_active=True,
                    # Placeholder credential for seed data only.
                    password='temp_password_123',
                )
                lab_staff.append(user)
            except Exception as e:
                # Best effort: one failing user must not abort the seeding run.
                print(f"Error creating lab staff: {e}")
                continue

    return lab_staff


def get_lab_staff(tenants):
    """Return lab staff for all tenants, creating mock staff where needed.

    Mock staff are created per tenant: a tenant without any active lab user
    gets mock users even if other tenants already have real staff, so later
    steps always find a staff member for each tenant.

    Args:
        tenants: iterable of tenant objects.

    Returns:
        List of user objects across all tenants.
    """
    lab_staff = []
    for tenant in tenants:
        tenant_staff = list(User.objects.filter(
            tenant=tenant,
            is_active=True,
            employee_profile__role__in=_LAB_STAFF_ROLES,
        ))
        # Create mock staff if none exist
        if not tenant_staff:
            print("No existing lab staff found, creating mock staff...")
            tenant_staff = create_mock_lab_staff([tenant])
        lab_staff.extend(tenant_staff)

    return lab_staff


def create_lab_tests(tenants):
    """Create laboratory test master data for each tenant.

    One LabTest is created per entry of SAUDI_LAB_TESTS and tenant, with
    randomized descriptive attributes. Failures are printed and skipped.

    Args:
        tenants: iterable of tenant objects.

    Returns:
        List of created LabTest objects.
    """
    lab_tests = []
    used_codes = set()  # avoids handing out the same test_code twice per run

    for tenant in tenants:
        for test_name, category, specimen_type, tat_hours in SAUDI_LAB_TESTS:
            try:
                lab_test = LabTest.objects.create(
                    tenant=tenant,
                    test_id=uuid.uuid4(),
                    test_code=_next_test_code(used_codes),
                    test_name=test_name,
                    test_description=f"Laboratory test for {test_name}",
                    loinc_code=f"{random.randint(10000, 99999)}-{random.randint(0, 9)}",
                    cpt_code=f"{random.randint(80000, 89999)}",
                    snomed_code=f"{random.randint(100000000, 999999999)}",
                    test_category=category if category in _VALID_TEST_CATEGORIES else 'OTHER',
                    test_type=random.choice(['QUANTITATIVE', 'QUALITATIVE', 'SEMI_QUANTITATIVE']),
                    specimen_type=specimen_type if specimen_type in _VALID_SPECIMEN_TYPES else 'BLOOD',
                    specimen_volume=Decimal(str(round(random.uniform(2.0, 10.0), 1))),
                    collection_container=random.choice([
                        'EDTA Tube', 'Heparin Tube', 'Serum Tube', 'Urine Container',
                        'Sterile Container', 'Special Transport Medium'
                    ]),
                    collection_instructions="Collect specimen in appropriate container",
                    processing_time=random.randint(30, 120),
                    turnaround_time=tat_hours,
                    stat_available=random.choice([True, False]),
                    stat_turnaround_time=(
                        max(1, tat_hours // 2) if random.choice([True, False]) else None
                    ),
                    storage_temperature=random.choice(['ROOM_TEMP', 'REFRIGERATED', 'FROZEN']),
                    transport_requirements="Standard transport conditions",
                    stability_time=random.randint(24, 168),
                    clinical_significance=f"Important diagnostic test for {category.lower()}",
                    indications=f"Diagnosis and monitoring of {category.lower()} conditions",
                    contraindications="None known" if random.choice([True, False]) else None,
                    patient_preparation=(
                        "No special preparation required"
                        if random.choice([True, False]) else "Fasting required"
                    ),
                    fasting_required=random.choice([True, False]),
                    fasting_hours=random.choice([8, 12]) if random.choice([True, False]) else None,
                    methodology=random.choice([
                        'Automated Chemistry Analyzer', 'Flow Cytometry', 'ELISA',
                        'PCR', 'Mass Spectrometry', 'Microscopy'
                    ]),
                    analyzer=random.choice([
                        'Roche Cobas', 'Abbott Architect', 'Siemens Dimension',
                        'Beckman Coulter', 'Ortho Clinical'
                    ]),
                    qc_frequency=random.choice(['DAILY', 'WEEKLY', 'MONTHLY']),
                    cost=Decimal(str(round(random.uniform(50.0, 500.0), 2))),
                    is_active=True,
                    is_orderable=True,
                    department=random.choice(['CLINICAL_LAB', 'HEMATOLOGY', 'CHEMISTRY', 'MICROBIOLOGY']),
                    created_at=django_timezone.now() - timedelta(days=random.randint(30, 365)),
                    updated_at=django_timezone.now() - timedelta(days=random.randint(0, 30)),
                )
                lab_tests.append(lab_test)
            except Exception as e:
                print(f"Error creating lab test {test_name}: {e}")
                continue

    print(f"Created {len(lab_tests)} lab tests")
    return lab_tests


def create_reference_ranges(lab_tests):
    """Create reference ranges for lab tests.

    Each test gets three ranges (adult male, adult female, pediatric).
    Quantitative tests receive numeric limits; all other test types receive
    a textual range only.

    Args:
        lab_tests: iterable of LabTest objects.

    Returns:
        List of created ReferenceRange objects.
    """
    reference_ranges = []
    # Create reference ranges for different demographics
    demographics = [
        {'gender': 'MALE', 'age_min': 18, 'age_max': 65},
        {'gender': 'FEMALE', 'age_min': 18, 'age_max': 65},
        {'gender': 'BOTH', 'age_min': 0, 'age_max': 17},  # Pediatric
    ]

    for test in lab_tests:
        for demo in demographics:
            try:
                common = {
                    'test': test,
                    'range_id': uuid.uuid4(),
                    'gender': demo['gender'],
                    'age_min': demo['age_min'],
                    'age_max': demo['age_max'],
                    'is_active': True,
                    'created_at': django_timezone.now() - timedelta(days=random.randint(30, 365)),
                    'updated_at': django_timezone.now() - timedelta(days=random.randint(0, 30)),
                }
                if test.test_type == 'QUANTITATIVE':
                    low_value = round(random.uniform(1.0, 50.0), 2)
                    high_value = round(low_value + random.uniform(10.0, 100.0), 2)
                    # Critical limits are rounded like the normal limits so
                    # all four values share the same precision.
                    critical_low = round(max(0, low_value - random.uniform(5.0, 15.0)), 2)
                    critical_high = round(high_value + random.uniform(20.0, 50.0), 2)
                    reference_range = ReferenceRange.objects.create(
                        range_low=Decimal(str(low_value)),
                        range_high=Decimal(str(high_value)),
                        range_text=f"{low_value} - {high_value}",
                        critical_low=Decimal(str(critical_low)),
                        critical_high=Decimal(str(critical_high)),
                        unit=random.choice(['mg/dL', 'mmol/L', 'IU/L', 'g/dL', 'cells/uL']),
                        **common,
                    )
                else:
                    reference_range = ReferenceRange.objects.create(
                        range_text=random.choice(['Negative', 'Normal', 'Not Detected']),
                        unit='',
                        **common,
                    )
                reference_ranges.append(reference_range)
            except Exception as e:
                print(f"Error creating reference range for {test.test_name}: {e}")
                continue

    print(f"Created {len(reference_ranges)} reference ranges")
    return reference_ranges


def create_lab_orders(tenants, lab_tests, days_back=30):
    """Create lab orders for random patients over the past ``days_back`` days.

    Orders whose patient belongs to a tenant without an ordering provider or
    without lab tests are skipped instead of aborting the run.

    Args:
        tenants: iterable of tenant objects to draw patients/providers from.
        lab_tests: list of LabTest objects that may be ordered.
        days_back: number of past days to generate orders for.

    Returns:
        List of created LabOrder objects (empty when patients, providers or
        lab tests are missing).
    """
    lab_orders = []

    # Get required data
    patients = list(PatientProfile.objects.filter(tenant__in=tenants))
    providers = list(User.objects.filter(
        tenant__in=tenants,
        employee_profile__role__in=['PHYSICIAN', 'NURSE_PRACTITIONER'],
        is_active=True,
    ))
    encounters = list(Encounter.objects.filter(tenant__in=tenants))

    if not all([patients, providers, lab_tests]):
        print(
            f"Missing required data - Patients: {len(patients)}, Providers: {len(providers)}, Lab Tests: {len(lab_tests)}")
        return lab_orders

    # Group once instead of scanning the full lists for every order.
    providers_by_tenant = _group_by(providers, 'tenant')
    tests_by_tenant = _group_by(lab_tests, 'tenant')
    encounters_by_patient = _group_by(encounters, 'patient')
    skipped = 0

    start_date = django_timezone.now().date() - timedelta(days=days_back)

    for day_offset in range(days_back):
        order_date = start_date + timedelta(days=day_offset)
        daily_orders = random.randint(15, 40)

        for _ in range(daily_orders):
            patient = random.choice(patients)
            tenant_providers = providers_by_tenant.get(patient.tenant, [])
            tenant_tests = tests_by_tenant.get(patient.tenant, [])
            if not tenant_providers or not tenant_tests:
                # random.choice on an empty list would raise; skip instead.
                skipped += 1
                continue
            provider = random.choice(tenant_providers)

            # Link to encounter if available
            patient_encounters = encounters_by_patient.get(patient, [])
            encounter = random.choice(patient_encounters) if patient_encounters else None

            order_datetime = django_timezone.make_aware(
                datetime.combine(order_date, time(random.randint(8, 18), random.randint(0, 59)))
            )

            # Select 1-5 tests for this order
            selected_tests = random.sample(
                tenant_tests, min(random.randint(1, 5), len(tenant_tests))
            )

            try:
                lab_order = LabOrder.objects.create(
                    tenant=patient.tenant,
                    order_id=uuid.uuid4(),
                    patient=patient,
                    ordering_provider=provider,
                    order_datetime=order_datetime,
                    priority=random.choices(
                        ['ROUTINE', 'URGENT', 'STAT'], weights=[70, 25, 5]
                    )[0],
                    clinical_indication=random.choice([
                        'Routine screening', 'Follow-up', 'Diagnostic workup',
                        'Pre-operative', 'Annual physical', 'Symptom evaluation'
                    ]),
                    diagnosis_code=f"Z{random.randint(10, 99)}.{random.randint(10, 99)}",
                    clinical_notes=f"Ordered for {random.choice(['routine screening', 'follow-up care', 'diagnostic evaluation'])}",
                    collection_datetime=order_datetime + timedelta(hours=random.randint(2, 24)),
                    collection_location=random.choice([
                        'Outpatient Lab', 'Emergency Department', 'Inpatient Room',
                        'ICU', 'Medical Ward'
                    ]),
                    fasting_status=random.choice(['FASTING', 'NON_FASTING', 'UNKNOWN']),
                    status=random.choices(
                        ['ORDERED', 'COLLECTED', 'PROCESSING', 'COMPLETED', 'CANCELLED'],
                        weights=[20, 15, 25, 35, 5]
                    )[0],
                    encounter=encounter,
                    special_instructions="Handle with care" if random.choice([True, False]) else None,
                    created_at=order_datetime,
                    updated_at=order_datetime + timedelta(hours=random.randint(1, 48)),
                )
                # Add tests to the order
                lab_order.tests.set(selected_tests)
                lab_orders.append(lab_order)
            except Exception as e:
                print(f"Error creating lab order: {e}")
                continue

    if skipped:
        print(f"Skipped {skipped} lab orders (no provider or lab tests for the patient's tenant)")
    print(f"Created {len(lab_orders)} lab orders")
    return lab_orders


def create_specimens(lab_orders, lab_staff):
    """Create one or two specimen records for every non-cancelled order.

    Orders whose tenant has no lab staff are skipped, since a collector and
    a receiver are required for each specimen.

    Args:
        lab_orders: iterable of LabOrder objects.
        lab_staff: list of user objects that can collect/receive specimens.

    Returns:
        List of created Specimen objects.
    """
    specimens = []
    staff_by_tenant = _group_by(lab_staff, 'tenant')

    for order in lab_orders:
        if order.status in ['CANCELLED']:
            continue

        tenant_staff = staff_by_tenant.get(order.tenant, [])
        if not tenant_staff:
            print("Skipping specimens for an order: no lab staff for its tenant")
            continue

        # Create 1-2 specimens per order (some tests can share specimens)
        num_specimens = random.randint(1, 2)

        for _ in range(num_specimens):
            collection_datetime = order.collection_datetime or (
                order.order_datetime + timedelta(hours=random.randint(2, 24))
            )
            collector = random.choice(tenant_staff)

            try:
                specimen = Specimen.objects.create(
                    order=order,
                    specimen_id=uuid.uuid4(),
                    specimen_type=random.choice(['BLOOD', 'SERUM', 'PLASMA', 'URINE']),
                    container_type=random.choice([
                        'EDTA Tube', 'Heparin Tube', 'Serum Tube', 'Urine Container'
                    ]),
                    volume=Decimal(str(round(random.uniform(2.0, 10.0), 1))),
                    collected_datetime=collection_datetime,
                    collected_by=collector,
                    collection_site=random.choice([
                        'Antecubital vein', 'Hand vein', 'Clean catch midstream'
                    ]),
                    collection_method=random.choice(['VENIPUNCTURE', 'CAPILLARY', 'CLEAN_CATCH']),
                    quality=random.choices(
                        ['ACCEPTABLE', 'HEMOLYZED', 'LIPEMIC', 'CLOTTED', 'INSUFFICIENT'],
                        weights=[80, 8, 5, 4, 3]
                    )[0],
                    rejection_reason=random.choice([
                        'Hemolyzed sample', 'Insufficient volume', 'Unlabeled specimen'
                    ]) if random.random() < 0.05 else None,
                    quality_notes="Good quality specimen" if random.choice([True, False]) else None,
                    received_datetime=collection_datetime + timedelta(hours=random.randint(1, 4)),
                    received_by=random.choice(tenant_staff),
                    storage_location=f"Rack {random.randint(1, 20)}, Position {random.randint(1, 50)}",
                    storage_temperature=random.choice(['ROOM_TEMP', 'REFRIGERATED']),
                    status=random.choices(
                        ['COLLECTED', 'RECEIVED', 'PROCESSING', 'ANALYZED', 'ARCHIVED'],
                        weights=[10, 15, 25, 40, 10]
                    )[0],
                    chain_of_custody=f"Collected by {collector.get_full_name()}",
                    created_at=collection_datetime,
                    updated_at=collection_datetime + timedelta(hours=random.randint(1, 24)),
                )
                specimens.append(specimen)
            except Exception as e:
                print(f"Error creating specimen: {e}")
                continue

    print(f"Created {len(specimens)} specimens")
    return specimens


def _generate_result(test, ref_range):
    """Return ``(result_value, abnormal_flag)`` for one test.

    A numeric value is produced for quantitative tests that have numeric
    range limits (80% inside the range, 20% above or below it); every other
    case yields a qualitative text result.
    """
    has_numeric_range = (
        ref_range is not None
        and ref_range.range_low is not None
        and ref_range.range_high is not None
    )
    if test.test_type == 'QUANTITATIVE' and has_numeric_range:
        low = float(ref_range.range_low)
        high = float(ref_range.range_high)
        if random.random() < 0.8:  # 80% normal results
            return round(random.uniform(low, high), 2), 'NORMAL'
        # 20% abnormal results
        if random.choice([True, False]):
            return round(random.uniform(high, high * 1.5), 2), 'HIGH'
        return round(random.uniform(low * 0.5, low), 2), 'LOW'

    # Qualitative result
    result_value = random.choice(['Negative', 'Positive', 'Normal', 'Abnormal'])
    abnormal_flag = 'NORMAL' if result_value in ['Negative', 'Normal'] else 'ABNORMAL'
    return result_value, abnormal_flag


def create_lab_results(lab_orders, specimens, lab_staff, reference_ranges):
    """Create one lab result per ordered test for orders that have specimens.

    Orders still in 'ORDERED' state or 'CANCELLED' are skipped, as are orders
    without usable specimens or without lab staff in their tenant.
    Verification and critical-call fields are kept mutually consistent: the
    verifier and verification time are only set on verified results, and the
    critical-call time/recipient only when the call was actually made.

    Args:
        lab_orders: iterable of LabOrder objects.
        specimens: list of Specimen objects created for those orders.
        lab_staff: list of user objects that analyze/verify results.
        reference_ranges: list of ReferenceRange objects.

    Returns:
        List of created LabResult objects.
    """
    lab_results = []

    # Group once instead of filtering the full lists for every order/test.
    specimens_by_order = _group_by(
        [s for s in specimens if s.status not in ['REJECTED']], 'order'
    )
    ranges_by_test = _group_by(reference_ranges, 'test')
    staff_by_tenant = _group_by(lab_staff, 'tenant')
    pathologists_by_tenant = _group_by(
        [s for s in lab_staff if _staff_role(s) == 'PATHOLOGIST'], 'tenant'
    )

    for order in lab_orders:
        if order.status in ['ORDERED', 'CANCELLED']:
            continue

        order_specimens = specimens_by_order.get(order, [])
        if not order_specimens:
            continue

        tenant_staff = staff_by_tenant.get(order.tenant, [])
        if not tenant_staff:
            print("Skipping results for an order: no lab staff for its tenant")
            continue
        pathologists = pathologists_by_tenant.get(order.tenant, [])

        for test in order.tests.all():
            specimen = random.choice(order_specimens)

            # Get reference range for this test
            test_ranges = ranges_by_test.get(test, [])
            ref_range = random.choice(test_ranges) if test_ranges else None

            # Guard against a missing/zero turnaround time, which would make
            # randint(1, 0) raise.
            max_hours = max(1, test.turnaround_time or 1)
            analyzed_datetime = specimen.received_datetime + timedelta(
                hours=random.randint(1, max_hours)
            )
            analyzer_user = random.choice(tenant_staff)

            try:
                result_value, abnormal_flag = _generate_result(test, ref_range)

                is_critical = abnormal_flag in ['HIGH', 'LOW'] and random.random() < 0.1
                critical_called = is_critical and random.choice([True, False])

                # A result can only be verified when a pathologist exists.
                verified = bool(pathologists) and random.choice([True, False])
                verified_by = random.choice(pathologists) if verified else None
                verified_datetime = (
                    analyzed_datetime + timedelta(hours=random.randint(1, 4))
                    if verified else None
                )

                lab_result = LabResult.objects.create(
                    order=order,
                    test=test,
                    specimen=specimen,
                    result_id=uuid.uuid4(),
                    result_value=str(result_value),
                    result_unit=ref_range.unit if ref_range else '',
                    result_type=test.test_type,
                    reference_range=ref_range.display_range if ref_range else 'See reference',
                    abnormal_flag=abnormal_flag,
                    is_critical=is_critical,
                    critical_called=critical_called,
                    critical_called_datetime=(
                        analyzed_datetime + timedelta(minutes=30) if critical_called else None
                    ),
                    critical_called_to=(
                        order.ordering_provider.get_full_name() if critical_called else None
                    ),
                    analyzed_datetime=analyzed_datetime,
                    analyzed_by=analyzer_user,
                    analyzer=test.analyzer,
                    verified=verified,
                    verified_by=verified_by,
                    verified_datetime=verified_datetime,
                    status=random.choices(
                        ['PENDING', 'PRELIMINARY', 'FINAL', 'CORRECTED'],
                        weights=[10, 20, 65, 5]
                    )[0],
                    technician_comments="Result verified" if random.choice([True, False]) else None,
                    pathologist_comments="Normal findings" if random.choice([True, False]) else None,
                    qc_passed=random.choice([True, False]),
                    qc_notes="QC within acceptable limits" if random.choice([True, False]) else None,
                    reported_datetime=analyzed_datetime + timedelta(hours=random.randint(2, 6)),
                    created_at=analyzed_datetime,
                    updated_at=analyzed_datetime + timedelta(hours=random.randint(1, 8)),
                )
                lab_results.append(lab_result)
            except Exception as e:
                print(f"Error creating lab result for {test.test_name}: {e}")
                continue

    print(f"Created {len(lab_results)} lab results")
    return lab_results


def create_quality_control(lab_tests, lab_staff):
    """Create quality control records for each test over the past 30 days.

    Roughly half of the weekend days are skipped. About 95% of runs pass
    (observed value within +/-10% of target); failing runs get an observed
    value strictly outside that window so status and value agree.

    Args:
        lab_tests: iterable of LabTest objects.
        lab_staff: list of user objects that perform/review QC.

    Returns:
        List of created QualityControl objects.
    """
    qc_records = []
    staff_by_tenant = _group_by(lab_staff, 'tenant')
    today = django_timezone.now().date()

    for test in lab_tests:
        tenant_staff = staff_by_tenant.get(test.tenant, [])
        if not tenant_staff:
            print(f"Skipping QC for {test.test_name}: no lab staff for its tenant")
            continue

        # Create QC records for the past 30 days
        for day_offset in range(30):
            qc_date = today - timedelta(days=day_offset)

            # Skip weekends for some tests
            if qc_date.weekday() > 4 and random.choice([True, False]):
                continue

            qc_datetime = django_timezone.make_aware(
                datetime.combine(qc_date, time(random.randint(6, 10), random.randint(0, 59)))
            )
            performed_by = random.choice(tenant_staff)

            # Generate target values and acceptable ranges
            target_value = round(random.uniform(50.0, 200.0), 2)
            acceptable_low = round(target_value * 0.9, 2)
            acceptable_high = round(target_value * 1.1, 2)

            # Generate observed value (mostly within range)
            if random.random() < 0.95:  # 95% pass rate
                observed_value = round(random.uniform(acceptable_low, acceptable_high), 2)
                qc_status = 'PASS'
            else:
                # Draw from 70-89% or 111-130% of target so a failing run
                # never lands inside the acceptable window.
                if random.choice([True, False]):
                    observed_value = round(
                        random.uniform(target_value * 0.7, target_value * 0.89), 2
                    )
                else:
                    observed_value = round(
                        random.uniform(target_value * 1.11, target_value * 1.3), 2
                    )
                qc_status = 'FAIL'

            try:
                qc_record = QualityControl.objects.create(
                    tenant=test.tenant,
                    test=test,
                    qc_id=uuid.uuid4(),
                    control_material=random.choice([
                        'Normal Control Level 1', 'Abnormal Control Level 2',
                        'High Control Level 3', 'Low Control Level 1'
                    ]),
                    control_lot=f"LOT{random.randint(100000, 999999)}",
                    control_level=random.choice(['LEVEL_1', 'LEVEL_2', 'LEVEL_3']),
                    target_value=Decimal(str(target_value)),
                    acceptable_range_low=Decimal(str(acceptable_low)),
                    acceptable_range_high=Decimal(str(acceptable_high)),
                    run_datetime=qc_datetime,
                    observed_value=Decimal(str(observed_value)),
                    status=qc_status,
                    performed_by=performed_by,
                    reviewed_by=(
                        random.choice(tenant_staff) if random.choice([True, False]) else None
                    ),
                    analyzer=test.analyzer,
                    comments=f"QC {qc_status.lower()} for {test.test_name}",
                    corrective_action="Repeat analysis" if qc_status == 'FAIL' else None,
                    created_at=qc_datetime,
                    updated_at=qc_datetime + timedelta(hours=random.randint(1, 4)),
                )
                qc_records.append(qc_record)
            except Exception as e:
                print(f"Error creating QC record for {test.test_name}: {e}")
                continue

    print(f"Created {len(qc_records)} quality control records")
    return qc_records


def main():
    """Main function to generate laboratory data for all tenants."""
    print("Starting Saudi Laboratory Data Generation...")

    tenants = list(Tenant.objects.all())
    if not tenants:
        print("❌ No tenants found.")
        return

    print("\n1. Creating Lab Tests...")
    lab_tests = create_lab_tests(tenants)

    print("\n2. Creating Reference Ranges...")
    reference_ranges = create_reference_ranges(lab_tests)

    print("\n3. Getting Lab Staff...")
    lab_staff = get_lab_staff(tenants)

    print("\n4. Creating Lab Orders...")
    lab_orders = create_lab_orders(tenants, lab_tests, days_back=30)

    print("\n5. Creating Specimens...")
    specimens = create_specimens(lab_orders, lab_staff)

    print("\n6. Creating Lab Results...")
    lab_results = create_lab_results(lab_orders, specimens, lab_staff, reference_ranges)

    print("\n7. Creating Quality Control...")
    qc_records = create_quality_control(lab_tests, lab_staff)

    print("\n✅ Saudi Laboratory Data Generation Complete!")
    print("📊 Summary:")
    print(f" - Lab Tests: {len(lab_tests)}")
    print(f" - Reference Ranges: {len(reference_ranges)}")
    print(f" - Lab Orders: {len(lab_orders)}")
    print(f" - Specimens: {len(specimens)}")
    print(f" - Lab Results: {len(lab_results)}")
    print(f" - Quality Control Records: {len(qc_records)}")


if __name__ == "__main__":
    main()