import os import django # Set up Django environment os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'hospital_management.settings') django.setup() import random from datetime import datetime, date, time, timedelta from django.utils import timezone as django_timezone from django.contrib.auth import get_user_model from radiology.models import ImagingStudy, ImagingSeries, DICOMImage, RadiologyReport, ReportTemplate, ImagingOrder from patients.models import PatientProfile from emr.models import Encounter from core.models import Tenant import uuid from decimal import Decimal User = get_user_model() # Saudi radiology data SAUDI_IMAGING_STUDIES = [ # X-Ray Studies ('Chest X-Ray', 'CXR', 'CR', 'CHEST', ['PA', 'Lateral'], 15), ('Abdominal X-Ray', 'AXR', 'CR', 'ABDOMEN', ['AP'], 10), ('Spine X-Ray', 'SPINE', 'CR', 'SPINE', ['AP', 'Lateral'], 20), ('Extremity X-Ray', 'EXT', 'CR', 'EXTREMITY', ['AP', 'Lateral'], 10), # CT Studies ('CT Head', 'CTH', 'CT', 'HEAD', ['Axial'], 30), ('CT Chest', 'CTC', 'CT', 'CHEST', ['Axial'], 45), ('CT Abdomen Pelvis', 'CTAP', 'CT', 'ABDOMEN', ['Axial'], 60), ('CT Angiogram', 'CTA', 'CT', 'VASCULAR', ['Axial', 'Coronal', 'Sagittal'], 75), # MRI Studies ('MRI Brain', 'MRB', 'MR', 'HEAD', ['T1', 'T2', 'FLAIR', 'DWI'], 90), ('MRI Spine', 'MRS', 'MR', 'SPINE', ['T1', 'T2', 'STIR'], 75), ('MRI Knee', 'MRK', 'MR', 'EXTREMITY', ['T1', 'T2', 'PD'], 60), # Ultrasound ('Abdominal Ultrasound', 'USAB', 'US', 'ABDOMEN', ['Gray scale', 'Doppler'], 30), ('Pelvic Ultrasound', 'USPV', 'US', 'PELVIS', ['Transabdominal', 'Transvaginal'], 30), ('Obstetric Ultrasound', 'USOB', 'US', 'OBSTETRIC', ['2D', '3D', 'Doppler'], 45), # Nuclear Medicine ('Bone Scan', 'BS', 'NM', 'BONE', ['Whole body'], 180), ('Thyroid Scan', 'TS', 'NM', 'THYROID', ['Planar'], 60), ] SAUDI_MODALITIES = ['CR', 'CT', 'MR', 'US', 'NM', 'RF', 'MG', 'DX'] SAUDI_BODY_PARTS = [ 'HEAD', 'NECK', 'CHEST', 'ABDOMEN', 'PELVIS', 'SPINE', 'EXTREMITY', 'CARDIAC', 'VASCULAR', 'BREAST' ] def get_radiology_staff(tenants): """Get radiology staff""" radiology_staff = [] for tenant in tenants: tenant_staff = User.objects.filter( tenant=tenant, is_active=True, role__in=['RADIOLOGIST', 'RAD_TECH', 'RAD_SUPERVISOR'] ) radiology_staff.extend(list(tenant_staff)) if not radiology_staff: radiology_staff = create_mock_radiology_staff(tenants) return radiology_staff def create_mock_radiology_staff(tenants): """Create mock radiology staff""" radiology_staff = [] mock_staff = [ {'first_name': 'Dr. Khalil', 'last_name': 'Al-Radwan', 'role': 'RADIOLOGIST'}, {'first_name': 'Dr. Laila', 'last_name': 'Al-Musheer', 'role': 'RADIOLOGIST'}, {'first_name': 'Omar', 'last_name': 'Al-Tasweer', 'role': 'RAD_TECH'}, {'first_name': 'Aisha', 'last_name': 'Al-Shoaa', 'role': 'RAD_TECH'}, {'first_name': 'Mansour', 'last_name': 'Al-Ashaa', 'role': 'RAD_SUPERVISOR'}, ] for tenant in tenants: for staff_data in mock_staff: try: email = f"{staff_data['first_name'].lower().replace(' ', '').replace('.', '')}.{staff_data['last_name'].lower().replace('-', '')}@{tenant.domain}" existing_user = User.objects.filter(email=email).first() if not existing_user: user = User.objects.create_user( email=email, first_name=staff_data['first_name'], last_name=staff_data['last_name'], role=staff_data['role'], tenant=tenant, is_active=True, password='temp_password_123' ) radiology_staff.append(user) else: radiology_staff.append(existing_user) except Exception as e: print(f"Error creating radiology staff: {e}") continue return radiology_staff def create_report_templates(tenants): """Create radiology report templates""" templates = [] template_configs = [ { 'name': 'Chest X-Ray Report', 'modality': 'CR', 'body_part': 'CHEST', 'clinical_history': 'CLINICAL HISTORY: [Clinical indication]\n\nTECHNIQUE: Standard PA and lateral chest radiographs', 'technique': 'Standard two-view chest radiography', 'findings': 'FINDINGS:\nHeart: [Heart findings]\nLungs: [Lung findings]\nBones: [Bone findings]\nSoft tissues: [Soft tissue findings]', 'impression': 'IMPRESSION:\n[Overall impression]', 'recommendations': 'RECOMMENDATIONS:\n[Follow-up recommendations]' }, { 'name': 'CT Head Report', 'modality': 'CT', 'body_part': 'HEAD', 'clinical_history': 'CLINICAL HISTORY: [Clinical indication]\n\nTECHNIQUE: Axial CT images of the head without contrast', 'technique': 'Non-contrast CT of the head', 'findings': 'FINDINGS:\nBrain parenchyma: [Brain findings]\nVentricular system: [Ventricular findings]\nExtraaxial spaces: [Extraaxial findings]\nBone: [Bone findings]', 'impression': 'IMPRESSION:\n[Overall impression]', 'recommendations': 'RECOMMENDATIONS:\n[Follow-up recommendations]' }, { 'name': 'Abdominal Ultrasound Report', 'modality': 'US', 'body_part': 'ABDOMEN', 'clinical_history': 'CLINICAL HISTORY: [Clinical indication]\n\nTECHNIQUE: Real-time ultrasonography of the abdomen', 'technique': 'Transabdominal ultrasound examination', 'findings': 'FINDINGS:\nLiver: [Liver findings]\nGallbladder: [GB findings]\nKidneys: [Kidney findings]\nPancreas: [Pancreas findings]\nSpleen: [Spleen findings]', 'impression': 'IMPRESSION:\n[Overall impression]', 'recommendations': 'RECOMMENDATIONS:\n[Follow-up recommendations]' } ] for tenant in tenants: for config in template_configs: try: template = ReportTemplate.objects.create( tenant=tenant, template_id=uuid.uuid4(), name=config['name'], description=f"Standard template for {config['name']}", modality=config['modality'], body_part=config['body_part'], clinical_history_template=config['clinical_history'], technique_template=config['technique'], findings_template=config['findings'], impression_template=config['impression'], recommendations_template=config['recommendations'], structured_fields=[ 'clinical_history', 'technique', 'findings', 'impression', 'recommendations' ], is_active=True, is_default=config['name'] == 'Chest X-Ray Report', usage_count=random.randint(10, 100), created_at=django_timezone.now() - timedelta(days=random.randint(30, 365)), updated_at=django_timezone.now() - timedelta(days=random.randint(0, 30)), created_by=random.choice([ s for s in User.objects.filter( tenant=tenant, role='RADIOLOGIST', is_active=True ) ]) if User.objects.filter(tenant=tenant, role='RADIOLOGIST').exists() else None ) templates.append(template) except Exception as e: print(f"Error creating template {config['name']}: {e}") continue print(f"Created {len(templates)} report templates") return templates def create_imaging_orders(tenants, days_back=30): """Create imaging orders""" imaging_orders = [] patients = list(PatientProfile.objects.filter(tenant__in=tenants)) providers = list(User.objects.filter( tenant__in=tenants, role__in=['PHYSICIAN', 'SURGEON'], is_active=True )) encounters = list(Encounter.objects.filter(tenant__in=tenants)) if not all([patients, providers]): print("Missing required data for imaging orders") return imaging_orders start_date = django_timezone.now().date() - timedelta(days=days_back) for day_offset in range(days_back): order_date = start_date + timedelta(days=day_offset) daily_orders = random.randint(8, 20) for _ in range(daily_orders): patient = random.choice(patients) provider = random.choice([p for p in providers if p.tenant == patient.tenant]) patient_encounters = [e for e in encounters if e.patient == patient] encounter = random.choice(patient_encounters) if patient_encounters else None study_data = random.choice(SAUDI_IMAGING_STUDIES) study_name, study_code, modality, body_part, sequences, duration = study_data order_datetime = django_timezone.make_aware( datetime.combine(order_date, time(random.randint(7, 19), random.randint(0, 59))) ) priority = random.choices(['ROUTINE', 'URGENT', 'STAT'], weights=[75, 20, 5])[0] status = random.choices( ['ORDERED', 'SCHEDULED', 'IN_PROGRESS', 'COMPLETED', 'CANCELLED'], weights=[20, 25, 15, 35, 5] )[0] try: imaging_order = ImagingOrder.objects.create( tenant=patient.tenant, order_id=uuid.uuid4(), order_number=f"IMG{random.randint(100000, 999999)}", patient=patient, ordering_provider=provider, order_datetime=order_datetime, priority=priority, modality=modality, study_description=study_name, body_part=body_part, clinical_indication=random.choice([ 'Chest pain evaluation', 'Shortness of breath', 'Abdominal pain', 'Headache workup', 'Back pain', 'Joint pain', 'Trauma evaluation', 'Screening study', 'Follow-up imaging', 'Preoperative assessment' ]), clinical_history=f"Patient presents with {random.choice(['acute', 'chronic', 'recurrent'])} symptoms requiring imaging evaluation", diagnosis_code=f"R{random.randint(10, 99)}.{random.randint(0, 9)}", contrast_required=random.choice([True, False]) if modality in ['CT', 'MR'] else False, contrast_type=random.choice(['IODINATED', 'GADOLINIUM']) if random.choice([True, False]) else None, contrast_route='IV' if random.choice([True, False]) else None, requested_datetime=order_datetime + timedelta( hours=random.randint(4, 72) ), scheduled_datetime=order_datetime + timedelta( hours=random.randint(6, 96) ) if status in ['SCHEDULED', 'IN_PROGRESS', 'COMPLETED'] else None, status=status, encounter=encounter, special_instructions=random.choice([ 'NPO 4 hours prior to exam', 'Remove all metal objects', 'Patient claustrophobic - consider sedation', 'Portable study preferred' ]) if random.choice([True, False]) else None, patient_preparation=random.choice([ 'No preparation required', 'Fast for 4 hours', 'Drink contrast material', 'Arrive 30 minutes early' ]), created_at=order_datetime, updated_at=order_datetime + timedelta(hours=random.randint(1, 48)) ) imaging_orders.append(imaging_order) except Exception as e: print(f"Error creating imaging order: {e}") continue print(f"Created {len(imaging_orders)} imaging orders") return imaging_orders def create_imaging_studies(imaging_orders, radiology_staff): """Create imaging studies""" imaging_studies = [] for imaging_order in imaging_orders: if imaging_order.status not in ['IN_PROGRESS', 'COMPLETED']: continue study_datetime = imaging_order.scheduled_datetime or ( imaging_order.order_datetime + timedelta(hours=random.randint(6, 72)) ) radiologist = random.choice([ s for s in radiology_staff if s.tenant == imaging_order.tenant and s.role == 'RADIOLOGIST' ]) referring_physician = imaging_order.ordering_provider try: imaging_study = ImagingStudy.objects.create( tenant=imaging_order.tenant, study_id=uuid.uuid4(), study_instance_uid=f"1.2.826.0.1.{random.randint(1000000, 9999999)}", accession_number=f"ACC{random.randint(100000, 999999)}", patient=imaging_order.patient, referring_physician=referring_physician, radiologist=radiologist, modality=imaging_order.modality, study_description=imaging_order.study_description, body_part=imaging_order.body_part, study_date=study_datetime.date(), study_time=study_datetime.time(), study_datetime=study_datetime, clinical_indication=imaging_order.clinical_indication, clinical_history=imaging_order.clinical_history, diagnosis_code=imaging_order.diagnosis_code, status=random.choices( ['SCHEDULED', 'IN_PROGRESS', 'COMPLETED'], weights=[10, 20, 70] )[0], priority=imaging_order.priority, kvp=random.randint(80, 140) if imaging_order.modality in ['CR', 'CT'] else None, mas=random.randint(50, 500) if imaging_order.modality in ['CR', 'CT'] else None, exposure_time=random.randint(100, 2000) if imaging_order.modality == 'CR' else None, slice_thickness=Decimal(str(random.choice([1.0, 1.5, 2.0, 3.0, 5.0]))) if imaging_order.modality in [ 'CT', 'MR'] else None, station_name=f"STATION_{random.choice(['A', 'B', 'C', 'D'])}", manufacturer=random.choice([ 'GE Healthcare', 'Siemens', 'Philips', 'Toshiba', 'Hitachi' ]), model_name=f"Model_{random.choice(['X1', 'X2', 'Pro', 'Elite'])}", number_of_series=random.randint(1, 5), number_of_instances=random.randint(20, 500), study_size=random.randint(10, 500), # MB image_quality=random.choices( ['EXCELLENT', 'GOOD', 'FAIR', 'POOR'], weights=[60, 30, 8, 2] )[0], completion_status=random.choices( ['COMPLETE', 'INCOMPLETE', 'CANCELLED'], weights=[80, 15, 5] )[0], encounter=imaging_order.encounter, imaging_order=imaging_order, pacs_location=f"PACS_SERVER_{random.choice(['01', '02', '03'])}", archived=random.choice([True, False]), archive_location=f"ARCHIVE_{random.randint(100, 999)}" if random.choice([True, False]) else None, created_at=study_datetime, updated_at=study_datetime + timedelta(hours=random.randint(1, 12)), created_by=radiologist ) imaging_studies.append(imaging_study) except Exception as e: print(f"Error creating imaging study: {e}") continue print(f"Created {len(imaging_studies)} imaging studies") return imaging_studies def create_imaging_series(imaging_studies): """Create imaging series""" imaging_series_list = [] for study in imaging_studies: # Create 1-4 series per study num_series = study.number_of_series for series_num in range(num_series): series_datetime = study.study_datetime + timedelta(minutes=series_num * 5) try: imaging_series = ImagingSeries.objects.create( study=study, series_id=uuid.uuid4(), series_instance_uid=f"{study.study_instance_uid}.{series_num + 1}", series_number=series_num + 1, modality=study.modality, series_description=random.choice([ 'Axial T1', 'Axial T2', 'Sagittal T1', 'Coronal FLAIR', 'PA View', 'Lateral View', 'Oblique View', 'Pre-contrast', 'Post-contrast', 'Delayed Phase' ]), protocol_name=f"{study.modality}_{study.body_part}_{random.choice(['Standard', 'High_Res', 'Fast'])}", series_date=series_datetime.date(), series_time=series_datetime.time(), series_datetime=series_datetime, slice_thickness=study.slice_thickness, spacing_between_slices=study.slice_thickness * Decimal('1.2') if study.slice_thickness else None, pixel_spacing=[Decimal('0.5'), Decimal('0.5')], image_orientation=[1, 0, 0, 0, 1, 0], number_of_instances=random.randint(10, 200), series_size=random.randint(5, 100), # MB body_part=study.body_part, patient_position=random.choice([ 'SUPINE', 'PRONE', 'LATERAL', 'SITTING', 'STANDING' ]), contrast_agent=random.choice([ 'Iodinated contrast', 'Gadolinium', 'None' ]), contrast_route=random.choice(['IV', 'ORAL', 'NONE']), image_quality=random.choices( ['EXCELLENT', 'GOOD', 'FAIR', 'POOR'], weights=[65, 25, 8, 2] )[0], created_at=series_datetime, updated_at=series_datetime + timedelta(minutes=random.randint(5, 30)) ) imaging_series_list.append(imaging_series) except Exception as e: print(f"Error creating imaging series: {e}") continue print(f"Created {len(imaging_series_list)} imaging series") return imaging_series_list def create_dicom_images(imaging_series_list): """Create DICOM images""" dicom_images = [] for series in imaging_series_list: # Create images for this series num_images = min(series.number_of_instances, 50) # Limit to 50 for demo for image_num in range(num_images): try: dicom_image = DICOMImage.objects.create( series=series, image_id=uuid.uuid4(), sop_instance_uid=f"{series.series_instance_uid}.{image_num + 1}", instance_number=image_num + 1, image_type=random.choice(['ORIGINAL', 'DERIVED', 'SECONDARY']), sop_class_uid=f"1.2.840.10008.5.1.4.1.1.{random.randint(1, 99)}", rows=random.choice([256, 512, 1024]), columns=random.choice([256, 512, 1024]), bits_allocated=random.choice([8, 16]), bits_stored=random.choice([8, 12, 16]), image_position=[ Decimal(str(random.uniform(-200, 200))), Decimal(str(random.uniform(-200, 200))), Decimal(str(image_num * 5)) ], image_orientation=[1, 0, 0, 0, 1, 0], slice_location=Decimal(str(image_num * 5)), window_center=random.randint(40, 400), window_width=random.randint(200, 2000), file_path=f"/dicom/studies/{series.study.study_id}/{series.series_id}/{image_num + 1}.dcm", file_size=random.randint(100000, 2000000), # bytes transfer_syntax_uid="1.2.840.10008.1.2.1", content_date=series.series_datetime.date(), content_time=series.series_datetime.time(), acquisition_datetime=series.series_datetime + timedelta(seconds=image_num * 2), image_quality=random.choices( ['EXCELLENT', 'GOOD', 'FAIR', 'POOR'], weights=[70, 25, 4, 1] )[0], processed=random.choice([True, False]), archived=random.choice([True, False]), created_at=series.series_datetime + timedelta(seconds=image_num * 2), updated_at=series.series_datetime + timedelta(minutes=random.randint(5, 60)) ) dicom_images.append(dicom_image) except Exception as e: print(f"Error creating DICOM image: {e}") continue print(f"Created {len(dicom_images)} DICOM images") return dicom_images def create_radiology_reports(imaging_studies, radiology_staff, templates): """Create radiology reports""" reports = [] for study in imaging_studies: if study.status != 'COMPLETED': continue # 80% of completed studies get reports if random.random() > 0.8: continue radiologist = study.radiologist # Find suitable template suitable_templates = [ t for t in templates if t.tenant == study.tenant and ( t.modality == study.modality or t.body_part == study.body_part ) ] template = random.choice(suitable_templates) if suitable_templates else None # Generate report times dictated_datetime = study.study_datetime + timedelta( hours=random.randint(2, 24) ) transcribed_datetime = dictated_datetime + timedelta( minutes=random.randint(30, 180) ) verified_datetime = transcribed_datetime + timedelta( minutes=random.randint(15, 120) ) finalized_datetime = verified_datetime + timedelta( minutes=random.randint(5, 60) ) # Generate report content clinical_history = f"Clinical indication: {study.clinical_indication}\n\nHistory: {study.clinical_history}" technique = f"{study.modality} {study.body_part} performed according to standard protocol" findings = f"FINDINGS:\nThe {study.body_part.lower()} {study.modality} demonstrates no acute abnormalities. " findings += random.choice([ "Normal anatomical structures are visualized.", "No focal lesions identified.", "Age-appropriate changes noted.", "Mild degenerative changes present." ]) impression = random.choice([ "Normal study", "No acute abnormalities", "Mild degenerative changes", "Follow-up recommended", "Clinical correlation suggested" ]) try: report = RadiologyReport.objects.create( study=study, report_id=uuid.uuid4(), radiologist=radiologist, dictated_by=radiologist, transcribed_by=random.choice([ s for s in radiology_staff if s.tenant == study.tenant ]) if random.choice([True, False]) else None, clinical_history=clinical_history, technique=technique, findings=findings, impression=impression, recommendations=random.choice([ "No follow-up needed", "Follow-up in 6 months", "Clinical correlation recommended", "Additional imaging if clinically indicated" ]), status='FINAL', critical_finding=random.choice([True, False]) if random.random() < 0.05 else False, critical_communicated=random.choice([True, False]) if random.random() < 0.05 else False, critical_communicated_to=study.referring_physician.get_full_name() if random.choice( [True, False]) else None, critical_communicated_datetime=finalized_datetime + timedelta(minutes=10) if random.choice( [True, False]) else None, dictated_datetime=dictated_datetime, transcribed_datetime=transcribed_datetime, verified_datetime=verified_datetime, finalized_datetime=finalized_datetime, template_used=template, structured_data={ 'modality': study.modality, 'body_part': study.body_part, 'indication': study.clinical_indication, 'quality': study.image_quality }, report_length=len(findings) + len(impression), turnaround_time=(finalized_datetime - study.study_datetime).total_seconds() // 60, # minutes addendum=random.choice([ "Addendum: Additional review confirms initial findings.", None ]) if random.random() < 0.1 else None, addendum_datetime=finalized_datetime + timedelta(hours=random.randint(1, 24)) if random.choice( [True, False]) else None, created_at=dictated_datetime, updated_at=finalized_datetime ) reports.append(report) except Exception as e: print(f"Error creating radiology report: {e}") continue print(f"Created {len(reports)} radiology reports") return reports def main(): """Main function to generate radiology data""" print("Starting Saudi Radiology Data Generation...") tenants = list(Tenant.objects.all()) if not tenants: print("āŒ No tenants found.") return print("\n1. Creating Report Templates...") templates = create_report_templates(tenants) print("\n2. Getting Radiology Staff...") radiology_staff = get_radiology_staff(tenants) print("\n3. Creating Imaging Orders...") imaging_orders = create_imaging_orders(tenants, days_back=30) print("\n4. Creating Imaging Studies...") imaging_studies = create_imaging_studies(imaging_orders, radiology_staff) print("\n5. Creating Imaging Series...") imaging_series_list = create_imaging_series(imaging_studies) print("\n6. Creating DICOM Images...") dicom_images = create_dicom_images(imaging_series_list) print("\n7. Creating Radiology Reports...") reports = create_radiology_reports(imaging_studies, radiology_staff, templates) print(f"\nāœ… Saudi Radiology Data Generation Complete!") print(f"šŸ“Š Summary:") print(f" - Report Templates: {len(templates)}") print(f" - Imaging Orders: {len(imaging_orders)}") print(f" - Imaging Studies: {len(imaging_studies)}") print(f" - Imaging Series: {len(imaging_series_list)}") print(f" - DICOM Images: {len(dicom_images)}") print(f" - Radiology Reports: {len(reports)}") if __name__ == "__main__": main()