339 lines
16 KiB
Python
339 lines
16 KiB
Python
import random
|
||
import uuid
|
||
from datetime import datetime, timedelta
|
||
from django.core.management.base import BaseCommand
|
||
from django.utils import timezone
|
||
from django.db import transaction
|
||
|
||
from accounts.models import User
|
||
from blood_bank.models import (
|
||
BloodGroup, Donor, BloodComponent, BloodUnit, BloodTest,
|
||
BloodRequest, InventoryLocation, QualityControl
|
||
)
|
||
from patients.models import PatientProfile
|
||
from hr.models import Department
|
||
|
||
|
||
class Command(BaseCommand):
    """Seed the blood bank app with Saudi-influenced test data.

    Reference data (blood groups, components, inventory locations) is created
    idempotently with ``get_or_create``. Donors, blood units, blood tests,
    blood requests and quality-control records are created with fresh random
    values on every run.

    The whole run executes inside one ``transaction.atomic()`` block. Every
    randomly generated row is additionally written inside its own nested
    ``atomic()`` block (a savepoint): catching a database error inside an
    atomic block without a savepoint leaves the transaction unusable, so the
    "log and skip this row" behaviour of the seeders depends on it.
    """

    help = 'Generate Saudi-influenced blood bank test data'

    def add_arguments(self, parser):
        """Register the ``--donors`` and ``--units`` command-line options."""
        parser.add_argument('--donors', type=int, default=50, help='Number of donors to create (default: 50)')
        parser.add_argument('--units', type=int, default=100, help='Number of blood units to create (default: 100)')

    def handle(self, *args, **options):
        """Run all seeders in order inside a single transaction.

        Requires at least one existing ``User``; blood requests are only
        generated when both patients and departments exist. Any exception that
        escapes a seeder rolls the whole transaction back and is reported on
        stdout; the command then returns without printing the success line.
        """
        self.stdout.write(self.style.SUCCESS('Starting Saudi blood bank data generation...'))
        try:
            with transaction.atomic():
                users = list(User.objects.all())
                patients = list(PatientProfile.objects.all())
                departments = list(Department.objects.all())

                if not users:
                    self.stdout.write(self.style.ERROR('No users found. Please create users first.'))
                    return

                # Reference data first: the seeders below pick from it.
                self.create_blood_groups()
                self.create_blood_components()
                self.create_inventory_locations()

                donors = self.create_saudi_donors(options['donors'], users)
                blood_units = self.create_blood_units(options['units'], donors, users)
                self.create_blood_tests(blood_units, users)

                if patients and departments:
                    self.create_blood_requests(20, patients, departments, users)

                self.create_quality_control_records(users)

        except Exception as e:
            # Top-level boundary: report the failure instead of a traceback.
            self.stdout.write(self.style.ERROR(f'Error: {str(e)}'))
            return

        self.stdout.write(self.style.SUCCESS('Saudi blood bank data generation completed!'))

    # ---------- helpers ----------

    @staticmethod
    def _years_before(day, years):
        """Return the calendar date ``years`` years before ``day``.

        A 29 February that has no counterpart in the target year maps to
        28 February. Subtracting ``years * 365`` days instead would ignore
        leap days and land a few days short of the intended age.
        """
        try:
            return day.replace(year=day.year - years)
        except ValueError:
            # Only reachable for 29 February -> non-leap target year.
            return day.replace(year=day.year - years, day=28)

    # ---------- seeders ----------

    def create_blood_groups(self):
        """Create ABO/Rh blood groups with correct choices (POS/NEG)."""
        blood_groups = [
            ('A', 'POS'), ('A', 'NEG'),
            ('B', 'POS'), ('B', 'NEG'),
            ('AB', 'POS'), ('AB', 'NEG'),
            ('O', 'POS'), ('O', 'NEG'),
        ]
        for abo, rh in blood_groups:
            # get_or_create keeps the command re-runnable.
            BloodGroup.objects.get_or_create(abo_type=abo, rh_factor=rh)
        self.stdout.write('✓ Created blood groups')

    def create_blood_components(self):
        """Create blood components with required fields.

        Existing components (matched by ``name``) are left untouched; the
        remaining values are only applied when a component is first created.
        """
        components = [
            # (name, volume_ml, shelf_life_days, storage_temperature, description)
            ('whole_blood', 450, 35, '2–6°C', 'Whole blood unit'),
            ('packed_rbc', 300, 42, '2–6°C', 'Packed red blood cells'),
            ('fresh_frozen_plasma', 250, 365, '≤ -18°C', 'Fresh frozen plasma'),
            ('platelets', 50, 5, '20–24°C with agitation', 'Platelet concentrate'),
            # Optional extras (your model allows them)
            ('cryoprecipitate', 15, 365, '≤ -18°C', 'Cryoprecipitated AHF'),
            ('granulocytes', 200, 1, '20–24°C', 'Granulocyte concentrate'),
        ]
        for name, volume, shelf_life, temp, desc in components:
            BloodComponent.objects.get_or_create(
                name=name,
                defaults={
                    'description': desc,
                    'shelf_life_days': shelf_life,
                    'storage_temperature': temp,
                    'volume_ml': volume,
                    'is_active': True,
                },
            )
        self.stdout.write('✓ Created blood components')

    def create_inventory_locations(self):
        """Create storage locations with required fields.

        Existing locations (matched by ``name``) are left untouched, so their
        ``current_stock`` is not reset on a re-run.
        """
        locations = [
            # (name, type, capacity, temperature_range)
            ('Main Refrigerator A', 'refrigerator', 100, '2–6°C'),
            ('Main Refrigerator B', 'refrigerator', 100, '2–6°C'),
            ('Platelet Agitator 1', 'platelet_agitator', 30, '20–24°C'),
            ('Plasma Freezer A', 'freezer', 200, '≤ -18°C'),
            ('Quarantine Storage', 'quarantine', 50, '2–6°C'),
            ('Testing Area 1', 'testing', 20, 'Room temp'),
        ]
        for name, loc_type, capacity, temp_range in locations:
            InventoryLocation.objects.get_or_create(
                name=name,
                defaults={
                    'location_type': loc_type,
                    'temperature_range': temp_range,
                    'capacity': capacity,
                    'is_active': True,
                    'current_stock': 0,
                },
            )
        self.stdout.write('✓ Created inventory locations')

    def create_saudi_donors(self, count, users):
        """Create Saudi-influenced donors aligned with Donor model fields.

        Args:
            count: Number of donors to attempt to create.
            users: Non-empty list of users; one is picked at random and used
                as ``created_by`` for every donor.

        Returns:
            The list of donors that were actually created. Donors whose
            creation raised are reported on stdout and skipped.
        """
        saudi_male_names = [
            'Abdullah', 'Mohammed', 'Ahmed', 'Ali', 'Omar', 'Khalid', 'Fahd', 'Salman',
            'Faisal', 'Turki', 'Nasser', 'Saud', 'Bandar', 'Majid', 'Waleed', 'Yazeed',
            'Abdulaziz', 'Abdulrahman', 'Ibrahim', 'Hassan', 'Hussein', 'Mansour'
        ]
        saudi_female_names = [
            'Fatima', 'Aisha', 'Maryam', 'Khadija', 'Zainab', 'Noura', 'Sarah', 'Hala',
            'Reem', 'Lama', 'Nada', 'Rana', 'Dina', 'Lina', 'Maha', 'Wafa', 'Amal',
            'Najla', 'Huda', 'Layla', 'Nour', 'Ghada', 'Rania'
        ]
        saudi_family_names = [
            'Al-Saud', 'Al-Rashid', 'Al-Otaibi', 'Al-Dosari', 'Al-Harbi', 'Al-Zahrani',
            'Al-Ghamdi', 'Al-Qahtani', 'Al-Mutairi', 'Al-Malki', 'Al-Subai', 'Al-Shehri',
            'Al-Dawsari', 'Al-Enezi', 'Al-Rasheed', 'Al-Faraj', 'Al-Mansour', 'Al-Nasser'
        ]

        blood_groups = list(BloodGroup.objects.all())
        if not blood_groups:
            # random.choice([]) would fail on every iteration; say so once.
            self.stdout.write('No blood groups available for donors')
            return []

        created_by = random.choice(users)
        all_first_names = saudi_male_names + saudi_female_names
        mobile_prefixes = [50, 51, 52, 53, 54, 55]
        today = timezone.now().date()

        donors = []
        for i in range(count):
            try:
                gender = random.choice(['M', 'F'])
                first_name = random.choice(saudi_male_names if gender == 'M' else saudi_female_names)
                last_name = random.choice(saudi_family_names)

                national_id = f"{random.choice([1, 2])}{random.randint(100000000, 999999999)}"  # 10 digits

                age = random.randint(18, 65)
                # Calendar-year arithmetic so the donor is exactly `age` years old.
                birth_date = self._years_before(today, age)

                phone = f"+966{random.choice(mobile_prefixes)}{random.randint(1000000, 9999999)}"
                weight = random.randint(60, 120) if gender == 'M' else random.randint(45, 90)
                height = random.randint(150, 190)

                blood_group = random.choice(blood_groups)

                # Keep the donation history self-consistent: a donor with a
                # last donation date has donated at least once, and a donor
                # without one has never donated.
                if random.random() < 0.6:
                    last_donation_date = timezone.now() - timedelta(days=random.randint(60, 365))
                    total_donations = random.randint(1, 10)
                else:
                    last_donation_date = None
                    total_donations = 0

                email_family = last_name.lower().replace('-', '').replace(' ', '')

                # Savepoint: a rejected row (e.g. a duplicate value on a unique
                # column) is rolled back alone and the outer transaction
                # stays usable for the next donor.
                with transaction.atomic():
                    donor = Donor.objects.create(
                        donor_id=uuid.uuid4(),
                        first_name=first_name,
                        last_name=last_name,
                        date_of_birth=birth_date,
                        gender=gender,
                        blood_group=blood_group,
                        national_id=national_id,
                        phone=phone,  # field is `phone`
                        email=f"{first_name.lower()}.{email_family}@gmail.com",
                        address=f"{random.randint(1, 999)} King Fahd Road, Saudi Arabia",
                        emergency_contact_name=f"{random.choice(all_first_names)} {random.choice(saudi_family_names)}",
                        emergency_contact_phone=f"+966{random.choice(mobile_prefixes)}{random.randint(1000000, 9999999)}",
                        donor_type='voluntary',
                        status='active',
                        last_donation_date=last_donation_date,
                        total_donations=total_donations,
                        weight=weight,
                        height=height,
                        notes="Saudi donor",
                        created_by=created_by,  # required
                    )
            except Exception as e:
                self.stdout.write(f"Error creating donor {i}: {str(e)}")
                continue
            donors.append(donor)

        self.stdout.write(f'✓ Created {len(donors)} Saudi donors')
        return donors

    def create_blood_units(self, count, donors, users):
        """Create blood units; save location as string and update stock.

        Args:
            count: Number of blood units to attempt to create.
            donors: Donors to draw from; an empty list skips the seeder.
            users: Non-empty list of users to pick ``collected_by`` from.

        Returns:
            The list of blood units that were actually created. A unit and
            the stock increment of its location are written together in one
            savepoint, so a failure leaves neither behind.
        """
        if not donors:
            self.stdout.write('No donors available for blood units')
            return []

        components = list(BloodComponent.objects.filter(is_active=True))
        locations = list(InventoryLocation.objects.filter(is_active=True))
        if not components or not locations:
            self.stdout.write('No active components or locations available for blood units')
            return []

        statuses = ['available', 'reserved', 'issued', 'collected', 'testing', 'quarantine']
        blood_units = []

        for i in range(count):
            inv = None
            stock_before = None
            try:
                donor = random.choice(donors)
                component = random.choice(components)
                inv = random.choice(locations)
                stock_before = inv.current_stock

                unit_number = f"SA{timezone.now().year}{random.randint(100000, 999999)}"
                collection_date = timezone.now() - timedelta(days=random.randint(0, 30))
                expiry_date = collection_date + timedelta(days=component.shelf_life_days)

                status = random.choice(statuses)

                # Savepoint: the random unit_number may collide with an
                # existing one; roll back just this unit and carry on.
                with transaction.atomic():
                    blood_unit = BloodUnit.objects.create(
                        unit_number=unit_number,
                        donor=donor,
                        blood_group=donor.blood_group,
                        component=component,
                        volume_ml=component.volume_ml,
                        collection_date=collection_date,
                        expiry_date=expiry_date,
                        status=status,
                        location=inv.name,  # CharField on model
                        collected_by=random.choice(users),
                        collection_site='King Fahd Medical City Blood Bank',
                        bag_type='single',
                        anticoagulant='CPDA-1',
                        notes=f"Collected from donor {donor.donor_id}"
                    )

                    # Update inventory stock (bounded by capacity)
                    inv.current_stock = min(inv.capacity, inv.current_stock + 1)
                    inv.save(update_fields=['current_stock'])

            except Exception as e:
                # The same location object is reused across iterations, so
                # undo the in-memory increment that the savepoint rolled back.
                if inv is not None and stock_before is not None:
                    inv.current_stock = stock_before
                self.stdout.write(f"Error creating blood unit {i}: {str(e)}")
                continue
            blood_units.append(blood_unit)

        self.stdout.write(f'✓ Created {len(blood_units)} blood units')
        return blood_units

    def create_blood_tests(self, blood_units, users):
        """Create infectious disease tests for each unit.

        One test per entry in ``test_types`` is created for every unit. The
        tests of a unit are written in one savepoint, so a unit either gets
        the full panel or (on error) none of it; failed units are reported
        and excluded from the final count.
        """
        test_types = ['hiv', 'hbv', 'hcv', 'syphilis']  # must match TEST_TYPE_CHOICES keys

        tested_units = 0
        for unit in blood_units:
            try:
                with transaction.atomic():
                    for test_type in test_types:
                        result = random.choices(['NEG', 'POS'], weights=[0.95, 0.05])[0]
                        BloodTest.objects.create(
                            blood_unit=unit,
                            test_type=test_type,
                            result=result,
                            test_date=unit.collection_date + timedelta(hours=random.randint(2, 24)),
                            tested_by=random.choice(users),
                            equipment_used=f"Abbott PRISM {random.randint(1000, 9999)}",
                            lot_number=f"LOT{random.randint(100000, 999999)}",
                            notes="Routine infectious disease screening"
                        )
            except Exception as e:
                self.stdout.write(f"Error creating tests for unit {unit.unit_number}: {str(e)}")
                continue
            tested_units += 1

        # Report the units that really received tests, not the input size.
        self.stdout.write(f'✓ Created blood tests for {tested_units} units')

    def create_blood_requests(self, count, patients, departments, users):
        """Create blood requests from departments.

        Args:
            count: Number of requests to attempt to create.
            patients: Non-empty list of patient profiles to pick from.
            departments: Non-empty list of departments to pick from.
            users: Non-empty list of users to pick the physician from.
        """
        components = list(BloodComponent.objects.filter(is_active=True))
        blood_groups = list(BloodGroup.objects.all())
        if not components or not blood_groups:
            self.stdout.write('No active components or blood groups available for blood requests')
            return

        indications = [
            'Surgical blood loss during cardiac surgery',
            'Anemia secondary to chronic kidney disease',
            'Postpartum hemorrhage',
            'Trauma-related blood loss',
            'Chemotherapy-induced anemia'
        ]

        for i in range(count):
            try:
                patient = random.choice(patients)
                department = random.choice(departments)
                component = random.choice(components)

                request_number = f"REQ{timezone.now().year}{random.randint(10000, 99999)}"
                urgency = random.choice(['routine', 'urgent', 'emergency'])
                status = random.choice(['pending', 'processing', 'ready'])

                # NOTE(review): request_date is only used to derive
                # required_by; it is not passed to the model (the original
                # note says the model sets it via auto_now_add), so
                # required_by may lie in the past — confirm this is intended.
                request_date = timezone.now() - timedelta(days=random.randint(0, 7))
                required_by = request_date + timedelta(hours=random.randint(2, 48))

                # Savepoint: the random request_number may collide.
                with transaction.atomic():
                    BloodRequest.objects.create(
                        request_number=request_number,
                        patient=patient,
                        requesting_department=department,
                        requesting_physician=random.choice(users),
                        component_requested=component,
                        units_requested=random.randint(1, 3),
                        urgency=urgency,
                        indication=random.choice(indications),
                        patient_blood_group=random.choice(blood_groups),
                        hemoglobin_level=round(random.uniform(7.0, 12.0), 1),
                        status=status,
                        # auto_now_add request_date on model; we still set required_by
                        required_by=required_by,
                        notes=f"Request from {department.name}"
                    )
            except Exception as e:
                self.stdout.write(f"Error creating blood request {i}: {str(e)}")
                continue

        self.stdout.write('✓ Created blood requests')

    def create_quality_control_records(self, users):
        """Create quality control records.

        Ten records are attempted. Roughly half of the passing records are
        additionally marked as reviewed by a random user.
        """
        test_types = ['temperature_monitoring', 'equipment_calibration', 'reagent_testing']
        result_text = {'pass': "Pass", 'fail': "Out of range"}

        for i in range(10):
            try:
                test_type = random.choice(test_types)
                status = random.choice(['pass', 'fail', 'pending'])
                performed_by = random.choice(users)

                # Savepoint: record creation and its optional review update
                # succeed or fail together without poisoning the outer
                # transaction.
                with transaction.atomic():
                    qc = QualityControl.objects.create(
                        test_type=test_type,
                        test_date=timezone.now() - timedelta(days=random.randint(0, 30)),
                        equipment_tested=f"Equipment {random.randint(1000, 9999)}",
                        parameters_tested="Temperature range, accuracy",
                        expected_results="Within acceptable limits",
                        actual_results=result_text.get(status, "Pending"),
                        status=status,
                        performed_by=performed_by,
                        corrective_action="Recalibration performed" if status == 'fail' else "",
                        next_test_date=timezone.now() + timedelta(days=30)
                    )
                    # Optionally auto-review passes
                    if status == 'pass' and random.random() < 0.5:
                        qc.reviewed_by = random.choice(users)
                        qc.review_date = timezone.now()
                        qc.review_notes = "Reviewed and verified."
                        qc.save(update_fields=['reviewed_by', 'review_date', 'review_notes'])
            except Exception as e:
                self.stdout.write(f"Error creating QC record {i}: {str(e)}")
                continue

        self.stdout.write('✓ Created quality control records')