372 lines
12 KiB
Python
372 lines
12 KiB
Python
"""
|
|
Database utilities and model helpers for data generation.
|
|
Contains common database operations and model field utilities.
|
|
"""
|
|
|
|
import random
|
|
from datetime import datetime, timedelta
|
|
from decimal import Decimal
|
|
from django.db import transaction, IntegrityError
|
|
from django.db.models import DecimalField, CharField, IntegerField
|
|
from django.db.models.fields import NOT_PROVIDED
|
|
from django.utils import timezone as django_timezone
|
|
|
|
|
|
# ================================
|
|
# MODEL FIELD UTILITIES
|
|
# ================================
|
|
|
|
def _model_fields(Model):
|
|
"""Get all concrete model fields"""
|
|
return {f.name for f in Model._meta.get_fields() if getattr(f, "concrete", False) and not f.auto_created}
|
|
|
|
|
|
def _filter_kwargs(Model, data: dict):
|
|
"""Filter dictionary to only include valid model fields"""
|
|
allowed = _model_fields(Model)
|
|
return {k: v for k, v in data.items() if k in allowed}
|
|
|
|
|
|
def _quantize_for_field(value: Decimal, f: DecimalField) -> Decimal:
|
|
"""Quantize a Decimal to a field's decimal_places and clamp to max_digits."""
|
|
# Quantize to required decimal_places
|
|
q = Decimal(1).scaleb(-f.decimal_places) # 10^(-decimal_places)
|
|
v = Decimal(value).quantize(q)
|
|
|
|
# Ensure total digits <= max_digits (digits before + decimal_places)
|
|
# Count digits before decimal:
|
|
sign, digits, exp = v.as_tuple()
|
|
digits_str_len = len(digits)
|
|
# number of digits after decimal is decimal_places
|
|
digits_before = digits_str_len - f.decimal_places if f.decimal_places else digits_str_len
|
|
# if v is 0.x and decimal_places > digits, digits_before can be negative; normalize
|
|
if digits_before < 0:
|
|
digits_before = 0
|
|
|
|
max_before = f.max_digits - f.decimal_places
|
|
if max_before < 0:
|
|
max_before = 0
|
|
|
|
# If too many digits before decimal, clamp to the largest representable value
|
|
if digits_before > max_before:
|
|
# Largest integer part we can store is 10^max_before - 1
|
|
max_int = (10 ** max_before) - 1 if max_before > 0 else 0
|
|
v = Decimal(max_int).quantize(q)
|
|
return v
|
|
|
|
|
|
def _default_decimal_for(BillingConfiguration, f: DecimalField) -> Decimal:
|
|
"""Return a safe default decimal per field name & precision."""
|
|
name = f.name.lower()
|
|
if any(k in name for k in ["tax", "vat", "rate"]):
|
|
# Prefer 15% if it's a rate; use 15 if integer percent
|
|
base = Decimal("15") if f.decimal_places == 0 else Decimal("0.15")
|
|
else:
|
|
base = Decimal("0")
|
|
return _quantize_for_field(base, f)
|
|
|
|
|
|
# ================================
|
|
# SEQUENTIAL NUMBER GENERATORS
|
|
# ================================
|
|
|
|
def _next_seq_number(prefix, Model, field):
|
|
"""Generate next sequential number for a model field"""
|
|
today = django_timezone.now().date().strftime("%Y%m%d")
|
|
i = 1
|
|
while True:
|
|
candidate = f"{prefix}-{today}-{i:04d}"
|
|
if not Model.objects.filter(**{field: candidate}).exists():
|
|
return candidate
|
|
i += 1
|
|
|
|
|
|
def _next_claim_number():
|
|
"""Generate next insurance claim number"""
|
|
from billing.models import InsuranceClaim
|
|
return _next_seq_number("CLM", InsuranceClaim, "claim_number")
|
|
|
|
|
|
def _next_payment_number():
|
|
"""Generate next payment number"""
|
|
from billing.models import Payment
|
|
return _next_seq_number("PMT", Payment, "payment_number")
|
|
|
|
|
|
def _next_bill_number():
|
|
"""Generate next bill number"""
|
|
from billing.models import MedicalBill
|
|
return _next_seq_number("BILL", MedicalBill, "bill_number")
|
|
|
|
|
|
def _next_case_number(tenant):
|
|
"""Generate next surgical case number"""
|
|
from operating_theatre.models import SurgicalCase
|
|
return f"CASE-{tenant.slug.upper()}-{django_timezone.now().year}-{random.randint(1000, 9999):05d}"
|
|
|
|
|
|
# ================================
|
|
# DATABASE OPERATIONS
|
|
# ================================
|
|
|
|
def safe_bulk_create(Model, objects, batch_size=1000, ignore_conflicts=False):
|
|
"""Safely bulk create objects with error handling"""
|
|
if not objects:
|
|
return 0
|
|
|
|
created_count = 0
|
|
for i in range(0, len(objects), batch_size):
|
|
batch = objects[i:i + batch_size]
|
|
try:
|
|
with transaction.atomic():
|
|
if ignore_conflicts:
|
|
Model.objects.bulk_create(batch, ignore_conflicts=True)
|
|
else:
|
|
Model.objects.bulk_create(batch)
|
|
created_count += len(batch)
|
|
except IntegrityError as e:
|
|
print(f"Integrity error in batch {i//batch_size + 1}: {e}")
|
|
# Try individual creates for problematic batch
|
|
for obj in batch:
|
|
try:
|
|
obj.save()
|
|
created_count += 1
|
|
except IntegrityError:
|
|
continue # Skip duplicates
|
|
except Exception as e:
|
|
print(f"Error in batch {i//batch_size + 1}: {e}")
|
|
continue
|
|
|
|
return created_count
|
|
|
|
|
|
def safe_get_or_create(Model, defaults=None, **kwargs):
|
|
"""Safe get_or_create with error handling"""
|
|
try:
|
|
return Model.objects.get_or_create(defaults=defaults or {}, **kwargs)
|
|
except Exception as e:
|
|
print(f"Error in get_or_create for {Model.__name__}: {e}")
|
|
return None, False
|
|
|
|
|
|
def safe_create(Model, **kwargs):
|
|
"""Safe create with error handling"""
|
|
try:
|
|
return Model.objects.create(**kwargs)
|
|
except Exception as e:
|
|
print(f"Error creating {Model.__name__}: {e}")
|
|
return None
|
|
|
|
|
|
# ================================
|
|
# TENANT UTILITIES
|
|
# ================================
|
|
|
|
def get_tenant_users(tenant, roles=None, limit=20):
|
|
"""Get users for a tenant with optional role filtering"""
|
|
from django.contrib.auth import get_user_model
|
|
User = get_user_model()
|
|
|
|
qs = User.objects.filter(tenant=tenant, is_active=True)
|
|
if roles:
|
|
qs = qs.filter(employee_profile__role__in=roles)
|
|
return list(qs[:limit])
|
|
|
|
|
|
def get_tenant_providers(tenant, clinical_only=True):
|
|
"""Get healthcare providers for a tenant"""
|
|
from django.contrib.auth import get_user_model
|
|
User = get_user_model()
|
|
|
|
clinical_roles = ['PHYSICIAN', 'NURSE_PRACTITIONER', 'PHYSICIAN_ASSISTANT', 'RADIOLOGIST']
|
|
qs = User.objects.filter(tenant=tenant, is_active=True)
|
|
|
|
if clinical_only:
|
|
qs = qs.filter(employee_profile__role__in=clinical_roles)
|
|
|
|
return list(qs)
|
|
|
|
|
|
def get_tenant_patients(tenant, limit=100):
|
|
"""Get patients for a tenant"""
|
|
from patients.models import PatientProfile
|
|
return list(PatientProfile.objects.filter(tenant=tenant)[:limit])
|
|
|
|
|
|
def get_tenant_departments(tenant):
|
|
"""Get departments for a tenant"""
|
|
from hr.models import Department
|
|
return list(Department.objects.filter(tenant=tenant))
|
|
|
|
|
|
# ================================
|
|
# PROGRESS TRACKING
|
|
# ================================
|
|
|
|
class ProgressTracker:
|
|
"""Track progress of data generation operations"""
|
|
|
|
def __init__(self, total_operations=0):
|
|
self.total_operations = total_operations
|
|
self.completed_operations = 0
|
|
self.start_time = django_timezone.now()
|
|
|
|
def increment(self, count=1):
|
|
"""Increment completed operations"""
|
|
self.completed_operations += count
|
|
|
|
def get_progress(self):
|
|
"""Get current progress percentage"""
|
|
if self.total_operations == 0:
|
|
return 100
|
|
return int((self.completed_operations / self.total_operations) * 100)
|
|
|
|
def get_eta(self):
|
|
"""Estimate time remaining"""
|
|
if self.completed_operations == 0:
|
|
return None
|
|
|
|
elapsed = django_timezone.now() - self.start_time
|
|
total_estimated = elapsed * (self.total_operations / self.completed_operations)
|
|
remaining = total_estimated - elapsed
|
|
|
|
return remaining
|
|
|
|
def print_progress(self, operation_name=""):
|
|
"""Print current progress"""
|
|
progress = self.get_progress()
|
|
eta = self.get_eta()
|
|
eta_str = f" ETA: {eta}" if eta else ""
|
|
|
|
print(f"[{progress:3d}%] {operation_name}{eta_str}")
|
|
|
|
|
|
# ================================
|
|
# VALIDATION UTILITIES
|
|
# ================================
|
|
|
|
def validate_tenant_exists(tenant_id=None, tenant_slug=None):
|
|
"""Validate that tenant exists"""
|
|
from core.models import Tenant
|
|
|
|
if tenant_id:
|
|
try:
|
|
return Tenant.objects.get(id=tenant_id)
|
|
except Tenant.DoesNotExist:
|
|
raise ValueError(f"Tenant with ID {tenant_id} does not exist")
|
|
|
|
if tenant_slug:
|
|
try:
|
|
return Tenant.objects.get(slug=tenant_slug)
|
|
except Tenant.DoesNotExist:
|
|
raise ValueError(f"Tenant with slug {tenant_slug} does not exist")
|
|
|
|
# Return first active tenant if no specific tenant requested
|
|
tenant = Tenant.objects.filter(is_active=True).first()
|
|
if not tenant:
|
|
raise ValueError("No active tenants found")
|
|
|
|
return tenant
|
|
|
|
|
|
def validate_dependencies():
|
|
"""Validate that all required dependencies exist"""
|
|
from core.models import Tenant
|
|
from django.contrib.auth import get_user_model
|
|
|
|
User = get_user_model()
|
|
|
|
# Check for tenants
|
|
tenant_count = Tenant.objects.filter(is_active=True).count()
|
|
if tenant_count == 0:
|
|
raise ValueError("No active tenants found. Please create tenants first.")
|
|
|
|
# Check for users
|
|
user_count = User.objects.filter(is_active=True).count()
|
|
if user_count == 0:
|
|
raise ValueError("No active users found. Please create users first.")
|
|
|
|
return {
|
|
'tenants': tenant_count,
|
|
'users': user_count
|
|
}
|
|
|
|
|
|
# ================================
|
|
# CLEANUP UTILITIES
|
|
# ================================
|
|
|
|
def cleanup_test_data(tenant=None, confirm=False):
|
|
"""Clean up test data (use with caution!)"""
|
|
if not confirm:
|
|
print("WARNING: This will delete test data. Set confirm=True to proceed.")
|
|
return
|
|
|
|
from django.core.management import call_command
|
|
|
|
# Reset sequences and clear data
|
|
models_to_clear = [
|
|
'laboratory.LabResult',
|
|
'laboratory.Specimen',
|
|
'laboratory.LabOrder',
|
|
'emr.ClinicalNote',
|
|
'emr.CarePlan',
|
|
'emr.ProblemList',
|
|
'emr.VitalSigns',
|
|
'emr.Encounter',
|
|
'appointments.AppointmentRequest',
|
|
'patients.PatientProfile',
|
|
'hr.Employee',
|
|
'accounts.User',
|
|
]
|
|
|
|
for model in models_to_clear:
|
|
try:
|
|
call_command('flush', model, verbosity=0, interactive=False)
|
|
except Exception as e:
|
|
print(f"Error clearing {model}: {e}")
|
|
|
|
print("Test data cleanup completed.")
|
|
|
|
|
|
# ================================
|
|
# BATCH PROCESSING UTILITIES
|
|
# ================================
|
|
|
|
def batch_process(items, batch_size=100, process_func=None, progress_callback=None):
|
|
"""Process items in batches with progress tracking"""
|
|
if not process_func:
|
|
return
|
|
|
|
total_batches = (len(items) + batch_size - 1) // batch_size
|
|
processed = 0
|
|
|
|
for i in range(0, len(items), batch_size):
|
|
batch = items[i:i + batch_size]
|
|
batch_num = (i // batch_size) + 1
|
|
|
|
try:
|
|
process_func(batch)
|
|
processed += len(batch)
|
|
|
|
if progress_callback:
|
|
progress_callback(batch_num, total_batches, processed, len(items))
|
|
|
|
except Exception as e:
|
|
print(f"Error processing batch {batch_num}: {e}")
|
|
continue
|
|
|
|
return processed
|
|
|
|
|
|
def create_with_retry(Model, max_retries=3, **kwargs):
|
|
"""Create model instance with retry on integrity errors"""
|
|
for attempt in range(max_retries):
|
|
try:
|
|
return Model.objects.create(**kwargs)
|
|
except IntegrityError:
|
|
if attempt == max_retries - 1:
|
|
raise
|
|
continue
|
|
return None
|