Marwan Alwali 263292f6be update
2025-11-04 00:50:06 +03:00

372 lines
12 KiB
Python

"""
Database utilities and model helpers for data generation.
Contains common database operations and model field utilities.
"""
import random
from datetime import datetime, timedelta
from decimal import Decimal
from django.db import transaction, IntegrityError
from django.db.models import DecimalField, CharField, IntegerField
from django.db.models.fields import NOT_PROVIDED
from django.utils import timezone as django_timezone
# ================================
# MODEL FIELD UTILITIES
# ================================
def _model_fields(Model):
"""Get all concrete model fields"""
return {f.name for f in Model._meta.get_fields() if getattr(f, "concrete", False) and not f.auto_created}
def _filter_kwargs(Model, data: dict):
"""Filter dictionary to only include valid model fields"""
allowed = _model_fields(Model)
return {k: v for k, v in data.items() if k in allowed}
def _quantize_for_field(value: Decimal, f: DecimalField) -> Decimal:
"""Quantize a Decimal to a field's decimal_places and clamp to max_digits."""
# Quantize to required decimal_places
q = Decimal(1).scaleb(-f.decimal_places) # 10^(-decimal_places)
v = Decimal(value).quantize(q)
# Ensure total digits <= max_digits (digits before + decimal_places)
# Count digits before decimal:
sign, digits, exp = v.as_tuple()
digits_str_len = len(digits)
# number of digits after decimal is decimal_places
digits_before = digits_str_len - f.decimal_places if f.decimal_places else digits_str_len
# if v is 0.x and decimal_places > digits, digits_before can be negative; normalize
if digits_before < 0:
digits_before = 0
max_before = f.max_digits - f.decimal_places
if max_before < 0:
max_before = 0
# If too many digits before decimal, clamp to the largest representable value
if digits_before > max_before:
# Largest integer part we can store is 10^max_before - 1
max_int = (10 ** max_before) - 1 if max_before > 0 else 0
v = Decimal(max_int).quantize(q)
return v
def _default_decimal_for(BillingConfiguration, f: DecimalField) -> Decimal:
"""Return a safe default decimal per field name & precision."""
name = f.name.lower()
if any(k in name for k in ["tax", "vat", "rate"]):
# Prefer 15% if it's a rate; use 15 if integer percent
base = Decimal("15") if f.decimal_places == 0 else Decimal("0.15")
else:
base = Decimal("0")
return _quantize_for_field(base, f)
# ================================
# SEQUENTIAL NUMBER GENERATORS
# ================================
def _next_seq_number(prefix, Model, field):
"""Generate next sequential number for a model field"""
today = django_timezone.now().date().strftime("%Y%m%d")
i = 1
while True:
candidate = f"{prefix}-{today}-{i:04d}"
if not Model.objects.filter(**{field: candidate}).exists():
return candidate
i += 1
def _next_claim_number():
"""Generate next insurance claim number"""
from billing.models import InsuranceClaim
return _next_seq_number("CLM", InsuranceClaim, "claim_number")
def _next_payment_number():
"""Generate next payment number"""
from billing.models import Payment
return _next_seq_number("PMT", Payment, "payment_number")
def _next_bill_number():
"""Generate next bill number"""
from billing.models import MedicalBill
return _next_seq_number("BILL", MedicalBill, "bill_number")
def _next_case_number(tenant):
"""Generate next surgical case number"""
from operating_theatre.models import SurgicalCase
return f"CASE-{tenant.slug.upper()}-{django_timezone.now().year}-{random.randint(1000, 9999):05d}"
# ================================
# DATABASE OPERATIONS
# ================================
def safe_bulk_create(Model, objects, batch_size=1000, ignore_conflicts=False):
"""Safely bulk create objects with error handling"""
if not objects:
return 0
created_count = 0
for i in range(0, len(objects), batch_size):
batch = objects[i:i + batch_size]
try:
with transaction.atomic():
if ignore_conflicts:
Model.objects.bulk_create(batch, ignore_conflicts=True)
else:
Model.objects.bulk_create(batch)
created_count += len(batch)
except IntegrityError as e:
print(f"Integrity error in batch {i//batch_size + 1}: {e}")
# Try individual creates for problematic batch
for obj in batch:
try:
obj.save()
created_count += 1
except IntegrityError:
continue # Skip duplicates
except Exception as e:
print(f"Error in batch {i//batch_size + 1}: {e}")
continue
return created_count
def safe_get_or_create(Model, defaults=None, **kwargs):
"""Safe get_or_create with error handling"""
try:
return Model.objects.get_or_create(defaults=defaults or {}, **kwargs)
except Exception as e:
print(f"Error in get_or_create for {Model.__name__}: {e}")
return None, False
def safe_create(Model, **kwargs):
"""Safe create with error handling"""
try:
return Model.objects.create(**kwargs)
except Exception as e:
print(f"Error creating {Model.__name__}: {e}")
return None
# ================================
# TENANT UTILITIES
# ================================
def get_tenant_users(tenant, roles=None, limit=20):
"""Get users for a tenant with optional role filtering"""
from django.contrib.auth import get_user_model
User = get_user_model()
qs = User.objects.filter(tenant=tenant, is_active=True)
if roles:
qs = qs.filter(employee_profile__role__in=roles)
return list(qs[:limit])
def get_tenant_providers(tenant, clinical_only=True):
"""Get healthcare providers for a tenant"""
from django.contrib.auth import get_user_model
User = get_user_model()
clinical_roles = ['PHYSICIAN', 'NURSE_PRACTITIONER', 'PHYSICIAN_ASSISTANT', 'RADIOLOGIST']
qs = User.objects.filter(tenant=tenant, is_active=True)
if clinical_only:
qs = qs.filter(employee_profile__role__in=clinical_roles)
return list(qs)
def get_tenant_patients(tenant, limit=100):
"""Get patients for a tenant"""
from patients.models import PatientProfile
return list(PatientProfile.objects.filter(tenant=tenant)[:limit])
def get_tenant_departments(tenant):
"""Get departments for a tenant"""
from hr.models import Department
return list(Department.objects.filter(tenant=tenant))
# ================================
# PROGRESS TRACKING
# ================================
class ProgressTracker:
"""Track progress of data generation operations"""
def __init__(self, total_operations=0):
self.total_operations = total_operations
self.completed_operations = 0
self.start_time = django_timezone.now()
def increment(self, count=1):
"""Increment completed operations"""
self.completed_operations += count
def get_progress(self):
"""Get current progress percentage"""
if self.total_operations == 0:
return 100
return int((self.completed_operations / self.total_operations) * 100)
def get_eta(self):
"""Estimate time remaining"""
if self.completed_operations == 0:
return None
elapsed = django_timezone.now() - self.start_time
total_estimated = elapsed * (self.total_operations / self.completed_operations)
remaining = total_estimated - elapsed
return remaining
def print_progress(self, operation_name=""):
"""Print current progress"""
progress = self.get_progress()
eta = self.get_eta()
eta_str = f" ETA: {eta}" if eta else ""
print(f"[{progress:3d}%] {operation_name}{eta_str}")
# ================================
# VALIDATION UTILITIES
# ================================
def validate_tenant_exists(tenant_id=None, tenant_slug=None):
"""Validate that tenant exists"""
from core.models import Tenant
if tenant_id:
try:
return Tenant.objects.get(id=tenant_id)
except Tenant.DoesNotExist:
raise ValueError(f"Tenant with ID {tenant_id} does not exist")
if tenant_slug:
try:
return Tenant.objects.get(slug=tenant_slug)
except Tenant.DoesNotExist:
raise ValueError(f"Tenant with slug {tenant_slug} does not exist")
# Return first active tenant if no specific tenant requested
tenant = Tenant.objects.filter(is_active=True).first()
if not tenant:
raise ValueError("No active tenants found")
return tenant
def validate_dependencies():
"""Validate that all required dependencies exist"""
from core.models import Tenant
from django.contrib.auth import get_user_model
User = get_user_model()
# Check for tenants
tenant_count = Tenant.objects.filter(is_active=True).count()
if tenant_count == 0:
raise ValueError("No active tenants found. Please create tenants first.")
# Check for users
user_count = User.objects.filter(is_active=True).count()
if user_count == 0:
raise ValueError("No active users found. Please create users first.")
return {
'tenants': tenant_count,
'users': user_count
}
# ================================
# CLEANUP UTILITIES
# ================================
def cleanup_test_data(tenant=None, confirm=False):
"""Clean up test data (use with caution!)"""
if not confirm:
print("WARNING: This will delete test data. Set confirm=True to proceed.")
return
from django.core.management import call_command
# Reset sequences and clear data
models_to_clear = [
'laboratory.LabResult',
'laboratory.Specimen',
'laboratory.LabOrder',
'emr.ClinicalNote',
'emr.CarePlan',
'emr.ProblemList',
'emr.VitalSigns',
'emr.Encounter',
'appointments.AppointmentRequest',
'patients.PatientProfile',
'hr.Employee',
'accounts.User',
]
for model in models_to_clear:
try:
call_command('flush', model, verbosity=0, interactive=False)
except Exception as e:
print(f"Error clearing {model}: {e}")
print("Test data cleanup completed.")
# ================================
# BATCH PROCESSING UTILITIES
# ================================
def batch_process(items, batch_size=100, process_func=None, progress_callback=None):
"""Process items in batches with progress tracking"""
if not process_func:
return
total_batches = (len(items) + batch_size - 1) // batch_size
processed = 0
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
batch_num = (i // batch_size) + 1
try:
process_func(batch)
processed += len(batch)
if progress_callback:
progress_callback(batch_num, total_batches, processed, len(items))
except Exception as e:
print(f"Error processing batch {batch_num}: {e}")
continue
return processed
def create_with_retry(Model, max_retries=3, **kwargs):
"""Create model instance with retry on integrity errors"""
for attempt in range(max_retries):
try:
return Model.objects.create(**kwargs)
except IntegrityError:
if attempt == max_retries - 1:
raise
continue
return None