hospital-management/lab_data.py
Marwan Alwali ab2c4a36c5 update
2025-10-02 10:13:03 +03:00

599 lines
26 KiB
Python

import os
import django
# Set up Django environment
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'hospital_management.settings')
django.setup()
import random
from datetime import datetime, date, time, timedelta
from django.utils import timezone as django_timezone
from django.contrib.auth import get_user_model
from laboratory.models import LabTest, LabOrder, Specimen, LabResult, QualityControl, ReferenceRange
from patients.models import PatientProfile
from emr.models import Encounter
from core.models import Tenant
import uuid
from decimal import Decimal
User = get_user_model()
# Simplified Saudi laboratory test data.
# Catalogue of tests that create_lab_tests() seeds once per tenant. Each entry
# is a 4-tuple unpacked positionally:
#   test_name     - display name, also used in generated descriptions
#   category      - checked against the allowed test categories in
#                   create_lab_tests(); unknown values fall back to 'OTHER'
#   specimen_type - checked against the allowed specimen types in
#                   create_lab_tests(); unknown values fall back to 'BLOOD'
#   tat_hours     - stored as the test's turnaround_time
SAUDI_LAB_TESTS = [
    # (test_name, category, specimen_type, tat_hours)
    ('Complete Blood Count', 'HEMATOLOGY', 'BLOOD', 2),
    ('Erythrocyte Sedimentation Rate', 'HEMATOLOGY', 'BLOOD', 1),
    ('Prothrombin Time', 'HEMATOLOGY', 'BLOOD', 1),
    ('Basic Metabolic Panel', 'CHEMISTRY', 'BLOOD', 1),
    ('Comprehensive Metabolic Panel', 'CHEMISTRY', 'BLOOD', 2),
    ('Lipid Profile', 'CHEMISTRY', 'BLOOD', 4),
    ('Liver Function Tests', 'CHEMISTRY', 'BLOOD', 2),
    ('Thyroid Function Tests', 'ENDOCRINOLOGY', 'BLOOD', 6),
    ('Hemoglobin A1C', 'CHEMISTRY', 'BLOOD', 8),
    ('Blood Culture', 'MICROBIOLOGY', 'BLOOD', 72),
    ('Urine Culture', 'MICROBIOLOGY', 'URINE', 48),
    ('Throat Culture', 'MICROBIOLOGY', 'SWAB', 24),
    ('Hepatitis Panel', 'IMMUNOLOGY', 'BLOOD', 8),
    ('HIV Test', 'IMMUNOLOGY', 'BLOOD', 4),
    ('Urinalysis', 'CHEMISTRY', 'URINE', 1),
    ('Urine Microscopy', 'CHEMISTRY', 'URINE', 1),
]
def create_mock_lab_staff(tenants):
    """Return a mock lab user for every template person in every tenant.

    An e-mail address is derived from the person's name and a ``tenant<id>``
    prefix. A user already registered under that address is reused; otherwise
    a new active user is created for the tenant. A failure for one person is
    printed and that person is skipped.

    Args:
        tenants: Iterable of tenant instances.

    Returns:
        list: Reused and newly created users, in tenant/template order.
    """
    mock_staff = [
        {'first_name': 'Sara', 'last_name': 'Al-Mahmoud', 'role': 'LABORATORY_TECHNICIAN'},
        {'first_name': 'Omar', 'last_name': 'Al-Rashid', 'role': 'LABORATORY_TECHNICIAN'},
        {'first_name': 'Layla', 'last_name': 'Al-Zahra', 'role': 'PATHOLOGIST'},
        {'first_name': 'Ahmed', 'last_name': 'Al-Farisi', 'role': 'LABORATORY_TECHNICIAN'},
    ]

    staff_members = []
    for tenant in tenants:
        # The tenant id stands in for a real e-mail domain.
        tenant_prefix = f"tenant{tenant.id}"
        for staff_data in mock_staff:
            try:
                email = f"{staff_data['first_name'].lower()}.{staff_data['last_name'].lower().replace('-', '')}@{tenant_prefix}.com"
                member = User.objects.filter(email=email).first()
                if not member:
                    # The template 'role' is not applied here; only the basic
                    # account fields are set.
                    member = User.objects.create_user(
                        email=email,
                        first_name=staff_data['first_name'],
                        last_name=staff_data['last_name'],
                        tenant=tenant,
                        is_active=True,
                        password='temp_password_123',
                    )
                staff_members.append(member)
            except Exception as e:
                print(f"Error creating lab staff: {e}")
    return staff_members
def get_lab_staff(tenants):
    """Collect the users who can act as lab staff, per tenant.

    Active users whose employee profile role is laboratory technician,
    pathologist or physician are gathered for each tenant. Any tenant that has
    no such user gets mock staff from ``create_mock_lab_staff`` so that callers
    which pick a staff member per tenant never face an empty pool for a tenant
    that was passed in.

    Args:
        tenants: Iterable of tenant instances.

    Returns:
        list: Existing staff followed by any mock staff that had to be created.
    """
    lab_staff = []
    tenants_without_staff = []
    for tenant in tenants:
        tenant_staff = list(User.objects.filter(
            tenant=tenant,
            is_active=True,
            employee_profile__role__in=['LABORATORY_TECHNICIAN', 'PATHOLOGIST', 'PHYSICIAN'],
        ))
        if tenant_staff:
            lab_staff.extend(tenant_staff)
        else:
            tenants_without_staff.append(tenant)

    # Fall back to mock staff tenant by tenant, not only when nobody at all
    # was found: one staffed tenant must not hide an unstaffed one.
    if tenants_without_staff or not lab_staff:
        print("No existing lab staff found, creating mock staff...")
        lab_staff.extend(create_mock_lab_staff(tenants_without_staff))
    return lab_staff
def create_lab_tests(tenants):
    """Create laboratory test master data for every tenant.

    One ``LabTest`` row is created per tenant for each entry of
    ``SAUDI_LAB_TESTS``. Codes, costs and operational attributes are random
    placeholders. Fields that describe the same fact (STAT availability and
    STAT turnaround, fasting flag / hours / preparation text, specimen type
    and container) are derived from a single draw so that every record is
    internally consistent.

    Args:
        tenants: Iterable of tenant instances to seed.

    Returns:
        list: The ``LabTest`` instances that were created. Entries whose
        creation raised are reported on stdout and left out.
    """
    valid_categories = {
        'HEMATOLOGY', 'CHEMISTRY', 'MICROBIOLOGY', 'IMMUNOLOGY',
        'PATHOLOGY', 'MOLECULAR', 'TOXICOLOGY', 'ENDOCRINOLOGY', 'OTHER',
    }
    valid_specimen_types = {
        'BLOOD', 'SERUM', 'PLASMA', 'URINE', 'STOOL', 'CSF',
        'SPUTUM', 'TISSUE', 'SWAB', 'OTHER',
    }
    blood_tubes = ['EDTA Tube', 'Heparin Tube', 'Serum Tube']
    containers_by_specimen = {
        'BLOOD': blood_tubes,
        'SERUM': blood_tubes,
        'PLASMA': blood_tubes,
        'URINE': ['Urine Container'],
    }
    generic_containers = ['Sterile Container', 'Special Transport Medium']

    lab_tests = []
    for tenant in tenants:
        for test_name, category, specimen_type, tat_hours in SAUDI_LAB_TESTS:
            test_category = category if category in valid_categories else 'OTHER'
            specimen = specimen_type if specimen_type in valid_specimen_types else 'BLOOD'
            # Draw each underlying fact once and derive the dependent fields.
            stat_available = random.choice([True, False])
            fasting_required = random.choice([True, False])
            try:
                lab_test = LabTest.objects.create(
                    tenant=tenant,
                    test_id=uuid.uuid4(),
                    test_code=f"LAB{random.randint(1000, 9999)}",
                    test_name=test_name,
                    test_description=f"Laboratory test for {test_name}",
                    loinc_code=f"{random.randint(10000, 99999)}-{random.randint(0, 9)}",
                    cpt_code=f"{random.randint(80000, 89999)}",
                    snomed_code=f"{random.randint(100000000, 999999999)}",
                    test_category=test_category,
                    test_type=random.choice(['QUANTITATIVE', 'QUALITATIVE', 'SEMI_QUANTITATIVE']),
                    specimen_type=specimen,
                    specimen_volume=Decimal(str(round(random.uniform(2.0, 10.0), 1))),
                    collection_container=random.choice(
                        containers_by_specimen.get(specimen, generic_containers)
                    ),
                    collection_instructions="Collect specimen in appropriate container",
                    processing_time=random.randint(30, 120),
                    turnaround_time=tat_hours,
                    stat_available=stat_available,
                    # A STAT turnaround only makes sense when STAT is offered.
                    stat_turnaround_time=max(1, tat_hours // 2) if stat_available else None,
                    storage_temperature=random.choice(['ROOM_TEMP', 'REFRIGERATED', 'FROZEN']),
                    transport_requirements="Standard transport conditions",
                    stability_time=random.randint(24, 168),
                    clinical_significance=f"Important diagnostic test for {category.lower()}",
                    indications=f"Diagnosis and monitoring of {category.lower()} conditions",
                    contraindications="None known" if random.choice([True, False]) else None,
                    patient_preparation=(
                        "Fasting required" if fasting_required
                        else "No special preparation required"
                    ),
                    fasting_required=fasting_required,
                    fasting_hours=random.choice([8, 12]) if fasting_required else None,
                    methodology=random.choice([
                        'Automated Chemistry Analyzer', 'Flow Cytometry', 'ELISA',
                        'PCR', 'Mass Spectrometry', 'Microscopy',
                    ]),
                    analyzer=random.choice([
                        'Roche Cobas', 'Abbott Architect', 'Siemens Dimension',
                        'Beckman Coulter', 'Ortho Clinical',
                    ]),
                    qc_frequency=random.choice(['DAILY', 'WEEKLY', 'MONTHLY']),
                    cost=Decimal(str(round(random.uniform(50.0, 500.0), 2))),
                    is_active=True,
                    is_orderable=True,
                    department=random.choice(['CLINICAL_LAB', 'HEMATOLOGY', 'CHEMISTRY', 'MICROBIOLOGY']),
                    created_at=django_timezone.now() - timedelta(days=random.randint(30, 365)),
                    updated_at=django_timezone.now() - timedelta(days=random.randint(0, 30)),
                )
                lab_tests.append(lab_test)
            except Exception as e:
                # Best effort: report the failed row and keep seeding the rest.
                print(f"Error creating lab test {test_name}: {e}")

    print(f"Created {len(lab_tests)} lab tests")
    return lab_tests
def create_reference_ranges(lab_tests):
    """Create reference ranges for lab tests.

    Each test gets three ranges: adult male, adult female and a pediatric
    range for both genders. Quantitative tests receive numeric normal and
    critical bounds, all rounded to two decimals; every other test type
    receives a textual range only.

    Args:
        lab_tests: Iterable of ``LabTest`` instances.

    Returns:
        list: The ``ReferenceRange`` instances that were created. Ranges whose
        creation raised are reported on stdout and left out.
    """
    demographics = (
        {'gender': 'MALE', 'age_min': 18, 'age_max': 65},
        {'gender': 'FEMALE', 'age_min': 18, 'age_max': 65},
        {'gender': 'BOTH', 'age_min': 0, 'age_max': 17},  # Pediatric
    )

    reference_ranges = []
    for test in lab_tests:
        for demo in demographics:
            try:
                # Fields shared by the quantitative and textual variants.
                range_fields = {
                    'test': test,
                    'range_id': uuid.uuid4(),
                    'gender': demo['gender'],
                    'age_min': demo['age_min'],
                    'age_max': demo['age_max'],
                    'is_active': True,
                    'created_at': django_timezone.now() - timedelta(days=random.randint(30, 365)),
                    'updated_at': django_timezone.now() - timedelta(days=random.randint(0, 30)),
                }
                if test.test_type == 'QUANTITATIVE':
                    low_value = round(random.uniform(1.0, 50.0), 2)
                    high_value = round(low_value + random.uniform(10.0, 100.0), 2)
                    # Round the critical bounds like the normal bounds so the
                    # Decimals do not carry float noise in their fraction.
                    critical_low = round(max(0.0, low_value - random.uniform(5.0, 15.0)), 2)
                    critical_high = round(high_value + random.uniform(20.0, 50.0), 2)
                    range_fields.update(
                        range_low=Decimal(str(low_value)),
                        range_high=Decimal(str(high_value)),
                        range_text=f"{low_value} - {high_value}",
                        critical_low=Decimal(str(critical_low)),
                        critical_high=Decimal(str(critical_high)),
                        unit=random.choice(['mg/dL', 'mmol/L', 'IU/L', 'g/dL', 'cells/uL']),
                    )
                else:
                    range_fields.update(
                        range_text=random.choice(['Negative', 'Normal', 'Not Detected']),
                        unit='',
                    )
                reference_ranges.append(ReferenceRange.objects.create(**range_fields))
            except Exception as e:
                # Best effort: report the failed range and keep going.
                print(f"Error creating reference range for {test.test_name}: {e}")

    print(f"Created {len(reference_ranges)} reference ranges")
    return reference_ranges
def create_lab_orders(tenants, lab_tests, days_back=30):
    """Create lab orders for the ``days_back`` days before today.

    For each day a random number of orders (15-40) is attempted. Every order
    picks a random patient, a provider and 1-5 tests from the patient's
    tenant and, when the patient has encounters, one of those encounters.
    Patients whose tenant has no provider or no lab test are skipped for that
    attempt instead of aborting the run.

    Args:
        tenants: Tenants whose patients, providers and encounters are used.
        lab_tests: ``LabTest`` instances available for ordering.
        days_back: Number of past days to generate orders for.

    Returns:
        list: The ``LabOrder`` instances that were created; empty when
        patients, providers or lab tests are missing.
    """
    lab_orders = []

    # Get required data
    patients = list(PatientProfile.objects.filter(tenant__in=tenants))
    providers = list(User.objects.filter(
        tenant__in=tenants,
        employee_profile__role__in=['PHYSICIAN', 'NURSE_PRACTITIONER'],
        is_active=True,
    ))
    encounters = list(Encounter.objects.filter(tenant__in=tenants))

    if not all([patients, providers, lab_tests]):
        print(
            f"Missing required data - Patients: {len(patients)}, Providers: {len(providers)}, Lab Tests: {len(lab_tests)}")
        return lab_orders

    # Group once by raw foreign-key id. This replaces a full scan of each list
    # per order and avoids lazy-loading the related tenant/patient rows just
    # to compare them.
    providers_by_tenant = {}
    for provider in providers:
        providers_by_tenant.setdefault(provider.tenant_id, []).append(provider)
    tests_by_tenant = {}
    for lab_test in lab_tests:
        tests_by_tenant.setdefault(lab_test.tenant_id, []).append(lab_test)
    encounters_by_patient = {}
    for encounter in encounters:
        encounters_by_patient.setdefault(encounter.patient_id, []).append(encounter)

    start_date = django_timezone.now().date() - timedelta(days=days_back)
    for day_offset in range(days_back):
        order_date = start_date + timedelta(days=day_offset)
        daily_orders = random.randint(15, 40)
        for _ in range(daily_orders):
            patient = random.choice(patients)
            tenant_providers = providers_by_tenant.get(patient.tenant_id)
            tenant_tests = tests_by_tenant.get(patient.tenant_id)
            # random.choice() raises IndexError on an empty sequence, so a
            # tenant lacking providers or tests must be skipped up front.
            if not tenant_providers or not tenant_tests:
                continue
            provider = random.choice(tenant_providers)

            # Link to encounter if available
            patient_encounters = encounters_by_patient.get(patient.pk)
            encounter = random.choice(patient_encounters) if patient_encounters else None

            order_datetime = django_timezone.make_aware(
                datetime.combine(order_date, time(random.randint(8, 18), random.randint(0, 59)))
            )

            # Select 1-5 tests for this order
            selected_tests = random.sample(tenant_tests, min(random.randint(1, 5), len(tenant_tests)))

            try:
                lab_order = LabOrder.objects.create(
                    tenant=patient.tenant,
                    order_id=uuid.uuid4(),
                    patient=patient,
                    ordering_provider=provider,
                    order_datetime=order_datetime,
                    priority=random.choices(
                        ['ROUTINE', 'URGENT', 'STAT'],
                        weights=[70, 25, 5]
                    )[0],
                    clinical_indication=random.choice([
                        'Routine screening', 'Follow-up', 'Diagnostic workup',
                        'Pre-operative', 'Annual physical', 'Symptom evaluation'
                    ]),
                    diagnosis_code=f"Z{random.randint(10, 99)}.{random.randint(10, 99)}",
                    clinical_notes=f"Ordered for {random.choice(['routine screening', 'follow-up care', 'diagnostic evaluation'])}",
                    collection_datetime=order_datetime + timedelta(hours=random.randint(2, 24)),
                    collection_location=random.choice([
                        'Outpatient Lab', 'Emergency Department', 'Inpatient Room',
                        'ICU', 'Medical Ward'
                    ]),
                    fasting_status=random.choice(['FASTING', 'NON_FASTING', 'UNKNOWN']),
                    status=random.choices(
                        ['ORDERED', 'COLLECTED', 'PROCESSING', 'COMPLETED', 'CANCELLED'],
                        weights=[20, 15, 25, 35, 5]
                    )[0],
                    encounter=encounter,
                    special_instructions="Handle with care" if random.choice([True, False]) else None,
                    created_at=order_datetime,
                    updated_at=order_datetime + timedelta(hours=random.randint(1, 48))
                )
                # Add tests to the order
                lab_order.tests.set(selected_tests)
                lab_orders.append(lab_order)
            except Exception as e:
                # Best effort: report the failed order and keep going.
                print(f"Error creating lab order: {e}")

    print(f"Created {len(lab_orders)} lab orders")
    return lab_orders
def create_specimens(lab_orders, lab_staff):
    """Create specimen records for every non-cancelled lab order.

    Each eligible order gets one or two specimens, collected and received by
    random staff members of the order's tenant. Orders whose tenant has no
    staff in ``lab_staff`` are skipped with a message (once per tenant).

    Args:
        lab_orders: Iterable of ``LabOrder`` instances.
        lab_staff: Users that may collect or receive specimens.

    Returns:
        list: The ``Specimen`` instances that were created. Specimens whose
        creation raised are reported on stdout and left out.
    """
    # Group staff by tenant once so the pool is not refiltered per specimen.
    staff_by_tenant = {}
    for member in lab_staff:
        staff_by_tenant.setdefault(member.tenant_id, []).append(member)

    specimens = []
    unstaffed_tenants = set()
    for order in lab_orders:
        if order.status == 'CANCELLED':
            continue

        tenant_staff = staff_by_tenant.get(order.tenant_id)
        if not tenant_staff:
            # random.choice() raises IndexError on an empty sequence.
            if order.tenant_id not in unstaffed_tenants:
                unstaffed_tenants.add(order.tenant_id)
                print(f"Skipping specimens for tenant {order.tenant_id}: no lab staff available")
            continue

        # Create 1-2 specimens per order (some tests can share specimens)
        for _ in range(random.randint(1, 2)):
            collection_datetime = order.collection_datetime or (
                order.order_datetime + timedelta(hours=random.randint(2, 24))
            )
            collector = random.choice(tenant_staff)
            try:
                specimen = Specimen.objects.create(
                    order=order,
                    specimen_id=uuid.uuid4(),
                    specimen_type=random.choice(['BLOOD', 'SERUM', 'PLASMA', 'URINE']),
                    container_type=random.choice([
                        'EDTA Tube', 'Heparin Tube', 'Serum Tube', 'Urine Container'
                    ]),
                    volume=Decimal(str(round(random.uniform(2.0, 10.0), 1))),
                    collected_datetime=collection_datetime,
                    collected_by=collector,
                    collection_site=random.choice([
                        'Antecubital vein', 'Hand vein', 'Clean catch midstream'
                    ]),
                    collection_method=random.choice(['VENIPUNCTURE', 'CAPILLARY', 'CLEAN_CATCH']),
                    quality=random.choices(
                        ['ACCEPTABLE', 'HEMOLYZED', 'LIPEMIC', 'CLOTTED', 'INSUFFICIENT'],
                        weights=[80, 8, 5, 4, 3]
                    )[0],
                    rejection_reason=random.choice([
                        'Hemolyzed sample', 'Insufficient volume', 'Unlabeled specimen'
                    ]) if random.random() < 0.05 else None,
                    quality_notes="Good quality specimen" if random.choice([True, False]) else None,
                    received_datetime=collection_datetime + timedelta(hours=random.randint(1, 4)),
                    received_by=random.choice(tenant_staff),
                    storage_location=f"Rack {random.randint(1, 20)}, Position {random.randint(1, 50)}",
                    storage_temperature=random.choice(['ROOM_TEMP', 'REFRIGERATED']),
                    status=random.choices(
                        ['COLLECTED', 'RECEIVED', 'PROCESSING', 'ANALYZED', 'ARCHIVED'],
                        weights=[10, 15, 25, 40, 10]
                    )[0],
                    chain_of_custody=f"Collected by {collector.get_full_name()}",
                    created_at=collection_datetime,
                    updated_at=collection_datetime + timedelta(hours=random.randint(1, 24))
                )
                specimens.append(specimen)
            except Exception as e:
                # Best effort: report the failed specimen and keep going.
                print(f"Error creating specimen: {e}")

    print(f"Created {len(specimens)} specimens")
    return specimens
def _generate_result_value(test, ref_range):
    """Return a random ``(result_value, abnormal_flag)`` pair for one test."""
    if test.test_type == 'QUANTITATIVE' and ref_range and ref_range.range_low and ref_range.range_high:
        low = float(ref_range.range_low)
        high = float(ref_range.range_high)
        if random.random() < 0.8:  # 80% normal results
            return round(random.uniform(low, high), 2), 'N'
        # 20% abnormal results, split evenly between high and low
        if random.choice([True, False]):
            return round(random.uniform(high, high * 1.5), 2), 'H'
        return round(random.uniform(low * 0.5, low), 2), 'L'

    # Qualitative result
    value = random.choice(['Negative', 'Positive', 'Normal', 'Abnormal'])
    return value, ('N' if value in ('Negative', 'Normal') else 'A')


def create_lab_results(lab_orders, specimens, lab_staff, reference_ranges):
    """Create one lab result per test of every order that is past collection.

    Orders with status ``ORDERED`` or ``CANCELLED``, orders without a usable
    specimen and orders whose tenant has no staff in ``lab_staff`` are
    skipped. Verification data is consistent: a result is marked verified
    exactly when a pathologist of the order's tenant was picked as verifier,
    and only then does it carry a verification timestamp.

    Args:
        lab_orders: Iterable of ``LabOrder`` instances.
        specimens: ``Specimen`` instances belonging to those orders.
        lab_staff: Users that may analyse or verify results.
        reference_ranges: ``ReferenceRange`` instances for the ordered tests.

    Returns:
        list: The ``LabResult`` instances that were created. Results whose
        creation raised are reported on stdout and left out.
    """
    # Pre-group the lookups by raw foreign-key id so that specimens, ranges
    # and staff are not rescanned for every order and test.
    specimens_by_order = {}
    for specimen in specimens:
        if specimen.status != 'REJECTED':
            specimens_by_order.setdefault(specimen.order_id, []).append(specimen)

    ranges_by_test = {}
    for ref in reference_ranges:
        ranges_by_test.setdefault(ref.test_id, []).append(ref)

    staff_by_tenant = {}
    pathologists_by_tenant = {}
    for member in lab_staff:
        staff_by_tenant.setdefault(member.tenant_id, []).append(member)
        # getattr with a default tolerates staff without an employee profile.
        profile = getattr(member, 'employee_profile', None)
        if getattr(profile, 'role', None) == 'PATHOLOGIST':
            pathologists_by_tenant.setdefault(member.tenant_id, []).append(member)

    lab_results = []
    unstaffed_tenants = set()
    for order in lab_orders:
        if order.status in ('ORDERED', 'CANCELLED'):
            continue

        order_specimens = specimens_by_order.get(order.pk)
        if not order_specimens:
            continue

        tenant_staff = staff_by_tenant.get(order.tenant_id)
        if not tenant_staff:
            # random.choice() raises IndexError on an empty sequence.
            if order.tenant_id not in unstaffed_tenants:
                unstaffed_tenants.add(order.tenant_id)
                print(f"Skipping lab results for tenant {order.tenant_id}: no lab staff available")
            continue
        pathologists = pathologists_by_tenant.get(order.tenant_id, [])

        for test in order.tests.all():
            specimen = random.choice(order_specimens)

            # Get reference range for this test
            test_ranges = ranges_by_test.get(test.pk)
            ref_range = random.choice(test_ranges) if test_ranges else None

            # Clamp so that a missing or zero turnaround cannot make
            # randint() fail.
            analyzed_datetime = specimen.received_datetime + timedelta(
                hours=random.randint(1, max(1, test.turnaround_time or 1))
            )
            analyzer_user = random.choice(tenant_staff)

            # Only a pathologist may verify; without one the result stays
            # unverified instead of failing on an empty choice.
            verifier = None
            if pathologists and random.choice([True, False]):
                verifier = random.choice(pathologists)
            verified_datetime = (
                analyzed_datetime + timedelta(hours=random.randint(1, 4))
                if verifier is not None else None
            )

            try:
                result_value, abnormal_flag = _generate_result_value(test, ref_range)

                # NOTE(review): no branch above assigns 'HH' or 'LL', so
                # is_critical is currently always False and the critical_*
                # fields below stay empty. Left unchanged until it is decided
                # how critical values should be generated.
                is_critical = abnormal_flag in ('HH', 'LL') and random.random() < 0.1

                lab_result = LabResult.objects.create(
                    order=order,
                    test=test,
                    specimen=specimen,
                    result_id=uuid.uuid4(),
                    result_value=str(result_value),
                    result_unit=ref_range.unit if ref_range else '',
                    result_type=test.test_type,
                    reference_range=ref_range.display_range if ref_range else 'See reference',
                    abnormal_flag=abnormal_flag,
                    is_critical=is_critical,
                    critical_called=is_critical and random.choice([True, False]),
                    critical_called_datetime=analyzed_datetime + timedelta(minutes=30) if is_critical else None,
                    critical_called_to=order.ordering_provider.get_full_name() if is_critical else None,
                    analyzed_datetime=analyzed_datetime,
                    analyzed_by=analyzer_user,
                    analyzer=test.analyzer,
                    verified=verifier is not None,
                    verified_by=verifier,
                    verified_datetime=verified_datetime,
                    status=random.choices(
                        ['PENDING', 'PRELIMINARY', 'FINAL', 'CORRECTED'],
                        weights=[10, 20, 65, 5]
                    )[0],
                    technician_comments="Result verified" if random.choice([True, False]) else None,
                    pathologist_comments="Normal findings" if random.choice([True, False]) else None,
                    qc_passed=random.choice([True, False]),
                    qc_notes="QC within acceptable limits" if random.choice([True, False]) else None,
                    reported_datetime=analyzed_datetime + timedelta(hours=random.randint(2, 6)),
                    created_at=analyzed_datetime,
                    updated_at=analyzed_datetime + timedelta(hours=random.randint(1, 8))
                )
                lab_results.append(lab_result)
            except Exception as e:
                # Best effort: report the failed result and keep going.
                print(f"Error creating lab result for {test.test_name}: {e}")

    print(f"Created {len(lab_results)} lab results")
    return lab_results
def create_quality_control(lab_tests, lab_staff):
    """Create quality control records for the past 30 days of every test.

    One QC run per test and day is generated, with roughly half of the
    Saturday/Sunday runs skipped. About 95% of runs pass with an observed
    value inside the acceptable window (target +/- 10%); failing runs always
    get an observed value outside that window. Tests whose tenant has no
    staff in ``lab_staff`` are skipped with a message.

    Args:
        lab_tests: Iterable of ``LabTest`` instances.
        lab_staff: Users that may perform or review QC runs.

    Returns:
        list: The ``QualityControl`` instances that were created. Records
        whose creation raised are reported on stdout and left out.
    """
    # Group staff by tenant once so the pool is not refiltered per record.
    staff_by_tenant = {}
    for member in lab_staff:
        staff_by_tenant.setdefault(member.tenant_id, []).append(member)

    qc_records = []
    for test in lab_tests:
        tenant_staff = staff_by_tenant.get(test.tenant_id)
        if not tenant_staff:
            # random.choice() raises IndexError on an empty sequence.
            print(f"Skipping QC for {test.test_name}: no lab staff available for its tenant")
            continue

        today = django_timezone.now().date()
        # Create QC records for the past 30 days
        for day_offset in range(30):
            qc_date = today - timedelta(days=day_offset)
            # Skip weekends for some tests (weekday() > 4 is Saturday/Sunday)
            if qc_date.weekday() > 4 and random.choice([True, False]):
                continue

            qc_datetime = django_timezone.make_aware(
                datetime.combine(qc_date, time(random.randint(6, 10), random.randint(0, 59)))
            )
            performed_by = random.choice(tenant_staff)

            # Generate target values and acceptable ranges
            target_value = round(random.uniform(50.0, 200.0), 2)
            acceptable_low = round(target_value * 0.9, 2)
            acceptable_high = round(target_value * 1.1, 2)

            # Generate observed value (mostly within range)
            if random.random() < 0.95:  # 95% pass rate
                observed_value = round(random.uniform(acceptable_low, acceptable_high), 2)
                qc_status = 'PASS'
            else:
                # A failing run must lie outside the acceptable window, so
                # draw from the band below or above it, never across it.
                if random.choice([True, False]):
                    observed_value = round(
                        random.uniform(target_value * 0.7, acceptable_low - 0.01), 2)
                else:
                    observed_value = round(
                        random.uniform(acceptable_high + 0.01, target_value * 1.3), 2)
                qc_status = 'FAIL'

            try:
                qc_record = QualityControl.objects.create(
                    tenant=test.tenant,
                    test=test,
                    qc_id=uuid.uuid4(),
                    control_material=random.choice([
                        'Normal Control Level 1', 'Abnormal Control Level 2',
                        'High Control Level 3', 'Low Control Level 1'
                    ]),
                    control_lot=f"LOT{random.randint(100000, 999999)}",
                    control_level=random.choice(['LEVEL_1', 'LEVEL_2', 'LEVEL_3']),
                    target_value=Decimal(str(target_value)),
                    acceptable_range_low=Decimal(str(acceptable_low)),
                    acceptable_range_high=Decimal(str(acceptable_high)),
                    run_datetime=qc_datetime,
                    observed_value=Decimal(str(observed_value)),
                    status=qc_status,
                    performed_by=performed_by,
                    reviewed_by=random.choice(tenant_staff) if random.choice([True, False]) else None,
                    analyzer=test.analyzer,
                    comments=f"QC {qc_status.lower()} for {test.test_name}",
                    corrective_action="Repeat analysis" if qc_status == 'FAIL' else None,
                    created_at=qc_datetime,
                    updated_at=qc_datetime + timedelta(hours=random.randint(1, 4))
                )
                qc_records.append(qc_record)
            except Exception as e:
                # Best effort: report the failed record and keep going.
                print(f"Error creating QC record for {test.test_name}: {e}")

    print(f"Created {len(qc_records)} quality control records")
    return qc_records
def main():
    """Run the laboratory seed pipeline end to end and print a summary.

    The steps run in dependency order: tests, reference ranges, staff,
    orders, specimens, results and finally quality control. Nothing is
    generated when no tenant exists.
    """
    print("Starting Saudi Laboratory Data Generation...")

    all_tenants = list(Tenant.objects.all())
    if not all_tenants:
        print("❌ No tenants found.")
        return

    print("\n1. Creating Lab Tests...")
    tests = create_lab_tests(all_tenants)

    print("\n2. Creating Reference Ranges...")
    ranges = create_reference_ranges(tests)

    print("\n3. Getting Lab Staff...")
    staff = get_lab_staff(all_tenants)

    print("\n4. Creating Lab Orders...")
    orders = create_lab_orders(all_tenants, tests, days_back=30)

    print("\n5. Creating Specimens...")
    collected = create_specimens(orders, staff)

    print("\n6. Creating Lab Results...")
    results = create_lab_results(orders, collected, staff, ranges)

    print("\n7. Creating Quality Control...")
    qc_runs = create_quality_control(tests, staff)

    # Final report: one line per generated record type.
    print("\n✅ Saudi Laboratory Data Generation Complete!")
    print("📊 Summary:")
    print(f" - Lab Tests: {len(tests)}")
    print(f" - Reference Ranges: {len(ranges)}")
    print(f" - Lab Orders: {len(orders)}")
    print(f" - Specimens: {len(collected)}")
    print(f" - Lab Results: {len(results)}")
    print(f" - Quality Control Records: {len(qc_runs)}")


if __name__ == "__main__":
    main()