Marwan Alwali ab2c4a36c5 update
2025-10-02 10:13:03 +03:00

300 lines
9.7 KiB
Python

"""
Managers for radiology app models.
Provides optimized queries and business logic methods.
"""
from django.db import models, transaction
from django.utils import timezone
from django.core.exceptions import ValidationError
from datetime import timedelta
from .constants import RadiologyBusinessRules
class ImagingStudyManager(models.Manager):
    """Manager for ImagingStudy: common filters plus accession numbering.

    Every query helper returns a QuerySet, so results can be chained with
    further ``filter``/``order_by`` calls before being evaluated.
    """

    def with_related(self):
        """Return studies with their commonly accessed relations loaded.

        Single-valued relations are joined via ``select_related``; series
        (with their images) and the report are fetched via
        ``prefetch_related``.
        """
        return self.select_related(
            'patient',
            'referring_physician',
            'radiologist',
            'encounter',
            'imaging_order',
        ).prefetch_related(
            'series__images',
            'report',
        )

    def by_status(self, status):
        """Return studies whose ``status`` equals the given value."""
        return self.filter(status=status)

    def by_priority(self, priority):
        """Return studies whose ``priority`` equals the given value."""
        return self.filter(priority=priority)

    def stat_studies(self):
        """Return studies with priority ``STAT`` or ``EMERGENCY``."""
        return self.filter(priority__in=['STAT', 'EMERGENCY'])

    def pending_interpretation(self):
        """Return ``COMPLETED`` studies that have no radiologist assigned."""
        return self.filter(status='COMPLETED', radiologist__isnull=True)

    def overdue_studies(self, hours=24):
        """Return unfinished studies older than ``hours`` hours.

        A study qualifies when its ``study_datetime`` is earlier than
        ``now - hours`` and its status is still ``SCHEDULED``, ``ARRIVED``
        or ``IN_PROGRESS``.
        """
        cutoff_time = timezone.now() - timedelta(hours=hours)
        return self.filter(
            study_datetime__lt=cutoff_time,
            status__in=['SCHEDULED', 'ARRIVED', 'IN_PROGRESS'],
        )

    def by_modality(self, modality):
        """Return studies whose ``modality`` equals the given value."""
        return self.filter(modality=modality)

    def by_date_range(self, start_date, end_date):
        """Return studies whose study date lies in ``[start_date, end_date]``.

        The comparison uses the date part of ``study_datetime`` and is
        inclusive at both ends.
        """
        return self.filter(study_datetime__date__range=[start_date, end_date])

    def for_patient(self, patient):
        """Return all studies for ``patient``, most recent first."""
        return self.filter(patient=patient).order_by('-study_datetime')

    def critical_pending(self):
        """Return unfinished STAT/EMERGENCY studies, oldest first."""
        return self.filter(
            priority__in=['STAT', 'EMERGENCY'],
            status__in=['SCHEDULED', 'ARRIVED', 'IN_PROGRESS'],
        ).order_by('study_datetime')

    @transaction.atomic
    def generate_accession_number(self, tenant):
        """Return the next accession number for ``tenant`` for today.

        The result has the form ``RAD-YYYYMMDD-NNNN``. The counter is one
        more than the highest counter parsed from the accession numbers of
        the tenant's studies created today, zero-padded to at least four
        digits (it simply grows wider past 9999).

        NOTE: this is a ``MAX(...) + 1`` read, not a database sequence.
        Nothing here locks rows or reserves the value, so two concurrent
        callers can receive the same number; callers should rely on a
        uniqueness constraint and retry on conflict.
        NOTE(review): the regex form of ``SUBSTRING ... FROM`` looks
        PostgreSQL-specific - confirm the target database.
        """
        from django.db import connection

        today = timezone.now().date()
        date_str = today.strftime('%Y%m%d')

        with connection.cursor() as cursor:
            # The capture group is (\d+) rather than a fixed four digits so
            # counters above 9999 are read back whole; a fixed-width capture
            # would truncate them and make MAX stop advancing.
            cursor.execute("""
                SELECT COALESCE(MAX(CAST(SUBSTRING(accession_number FROM 'RAD-\\d{8}-(\\d+)') AS INTEGER)), 0) + 1
                FROM radiology_imaging_study
                WHERE tenant_id = %s AND DATE(created_at) = %s
            """, [tenant.id, today])
            next_number = cursor.fetchone()[0] or 1

        return f"RAD-{date_str}-{next_number:04d}"
class RadiologyReportManager(models.Manager):
    """Manager for RadiologyReport with status and critical-finding filters."""

    def with_study_details(self):
        """Return reports joined to their study, its people, and authors.

        Joins the study's patient and referring physician as well as the
        report's radiologist, dictating user and transcribing user.
        """
        return self.select_related(
            'study__patient',
            'study__referring_physician',
            'radiologist',
            'dictated_by',
            'transcribed_by',
        )

    def pending_reports(self):
        """Return reports whose status is ``DRAFT`` or ``PRELIMINARY``."""
        return self.filter(status__in=['DRAFT', 'PRELIMINARY'])

    def critical_reports(self):
        """Return reports flagged with a critical finding."""
        return self.filter(critical_finding=True)

    def uncommunicated_critical(self):
        """Return critical-finding reports not yet marked as communicated."""
        return self.filter(
            critical_finding=True,
            critical_communicated=False,
        )

    def by_radiologist(self, radiologist):
        """Return reports assigned to the given radiologist."""
        return self.filter(radiologist=radiologist)

    def overdue_reports(self):
        """Return pending reports that are past their priority's deadline.

        For each priority listed in
        ``RadiologyBusinessRules.PRIORITY_ESCALATION_HOURS`` a report is
        overdue when it is still ``DRAFT``/``PRELIMINARY`` and its study's
        ``study_datetime`` plus the allowed hours lies in the past. Reports
        whose study priority has no entry in that mapping are never
        returned.

        The check runs in the database as a single filter instead of
        loading every pending report (and its study) into Python.
        """
        # One timestamp for the whole query so every priority is judged
        # against the same instant.
        now = timezone.now()

        overdue = models.Q()
        escalation = RadiologyBusinessRules.PRIORITY_ESCALATION_HOURS
        for priority, max_hours in escalation.items():
            # now > study_datetime + max_hours
            #   <=>  study_datetime < now - max_hours
            overdue |= models.Q(
                study__priority=priority,
                study__study_datetime__lt=now - timedelta(hours=max_hours),
            )

        # An empty Q would match every row; with no rules nothing is overdue.
        if not overdue:
            return self.none()

        return self.filter(overdue, status__in=['DRAFT', 'PRELIMINARY'])

    def finalized_today(self):
        """Return ``FINAL`` reports whose finalized date is today."""
        today = timezone.now().date()
        return self.filter(
            status='FINAL',
            finalized_datetime__date=today,
        )
class ImagingOrderManager(models.Manager):
    """Manager for ImagingOrder: scheduling filters plus order numbering."""

    def with_patient_details(self):
        """Return orders with patient, provider, encounter and studies loaded.

        Patient, ordering provider and encounter are joined via
        ``select_related``; the order's studies are prefetched.
        """
        return self.select_related(
            'patient',
            'ordering_provider',
            'encounter',
        ).prefetch_related('studies')

    def pending_orders(self):
        """Return orders whose status is ``PENDING``."""
        return self.filter(status='PENDING')

    def stat_orders(self):
        """Return orders with priority ``STAT`` or ``EMERGENCY``."""
        return self.filter(priority__in=['STAT', 'EMERGENCY'])

    def by_modality(self, modality):
        """Return orders whose ``modality`` equals the given value."""
        return self.filter(modality=modality)

    def overdue_scheduling(self, hours=24):
        """Return ``PENDING`` orders placed more than ``hours`` hours ago."""
        cutoff_time = timezone.now() - timedelta(hours=hours)
        return self.filter(
            order_datetime__lt=cutoff_time,
            status='PENDING',
        )

    def for_scheduling(self):
        """Return ``PENDING``/``SCHEDULED`` orders by priority, then age.

        NOTE(review): ``priority`` is sorted by its stored value; if that is
        a plain string code the order is lexical, not clinical urgency -
        confirm this is intended.
        """
        return self.filter(
            status__in=['PENDING', 'SCHEDULED'],
        ).order_by('priority', 'order_datetime')

    @transaction.atomic
    def generate_order_number(self, tenant):
        """Return the next order number for ``tenant``.

        The result has the form ``IMG-<tenant id>-NNNNNN``. The counter is
        one more than the highest counter parsed from the tenant's existing
        order numbers, zero-padded to at least six digits (it simply grows
        wider past 999999).

        NOTE: this is a ``MAX(...) + 1`` read, not a database sequence.
        Nothing here locks rows or reserves the value, so two concurrent
        callers can receive the same number; callers should rely on a
        uniqueness constraint and retry on conflict.
        NOTE(review): the pattern expects a numeric tenant id (``\\d+``) and
        the regex form of ``SUBSTRING ... FROM`` looks PostgreSQL-specific -
        confirm both.
        """
        from django.db import connection

        with connection.cursor() as cursor:
            # The capture group is (\d+) rather than a fixed six digits so
            # counters above 999999 are read back whole; a fixed-width
            # capture would truncate them and make MAX stop advancing.
            cursor.execute("""
                SELECT COALESCE(MAX(CAST(SUBSTRING(order_number FROM 'IMG-\\d+-(\\d+)') AS INTEGER)), 0) + 1
                FROM radiology_imaging_order
                WHERE tenant_id = %s
            """, [tenant.id])
            next_number = cursor.fetchone()[0] or 1

        return f"IMG-{tenant.id}-{next_number:06d}"
class ImagingSeriesManager(models.Manager):
    """Query helpers for the ImagingSeries model."""

    def with_study_patient(self):
        """Return series joined to the study's patient and referring physician."""
        joined_paths = ('study__patient', 'study__referring_physician')
        return self.select_related(*joined_paths)

    def by_modality(self, modality):
        """Return only the series recorded with the given modality."""
        matching = self.filter(modality=modality)
        return matching

    def with_images(self):
        """Return series with their related images prefetched."""
        return self.prefetch_related('images')

    def incomplete_series(self):
        """Return series with zero instances, skipping cancelled studies."""
        without_instances = self.filter(number_of_instances=0)
        return without_instances.exclude(study__status='CANCELLED')
class DICOMImageManager(models.Manager):
    """Query helpers for the DICOMImage model."""

    def with_series_study(self):
        """Return images joined through series and study to the patient."""
        return self.select_related('series__study__patient')

    def unprocessed(self):
        """Return images whose ``processed`` flag is False."""
        not_processed = {'processed': False}
        return self.filter(**not_processed)

    def by_quality(self, quality):
        """Return images whose ``image_quality`` equals the given value."""
        matching = self.filter(image_quality=quality)
        return matching

    def poor_quality(self):
        """Return images rated ``POOR`` or ``UNACCEPTABLE``."""
        low_grades = ['POOR', 'UNACCEPTABLE']
        return self.filter(image_quality__in=low_grades)

    def large_files(self, size_mb=100):
        """Return images whose file size exceeds ``size_mb`` megabytes.

        The threshold is converted to bytes using 1024 * 1024 bytes per MB
        and the comparison is strictly greater-than.
        """
        return self.filter(file_size__gt=size_mb * 1024 * 1024)

    def archived(self):
        """Return images whose ``archived`` flag is True."""
        is_archived = {'archived': True}
        return self.filter(**is_archived)
class ReportTemplateManager(models.Manager):
    """Query helpers for the ReportTemplate model."""

    def active_templates(self):
        """Return templates whose ``is_active`` flag is True."""
        only_active = self.filter(is_active=True)
        return only_active

    def for_modality(self, modality):
        """Return active templates for ``modality`` or the ``ALL`` wildcard."""
        accepted_modalities = [modality, 'ALL']
        return self.filter(modality__in=accepted_modalities, is_active=True)

    def for_body_part(self, body_part):
        """Return active templates for ``body_part`` or the ``ALL`` wildcard."""
        accepted_parts = [body_part, 'ALL']
        return self.filter(body_part__in=accepted_parts, is_active=True)

    def default_template(self, modality, body_part):
        """Return the first active default template matching both criteria.

        Each criterion also accepts the ``ALL`` wildcard. Returns ``None``
        when no template matches.
        """
        candidates = self.filter(
            modality__in=[modality, 'ALL'],
            body_part__in=[body_part, 'ALL'],
            is_default=True,
            is_active=True,
        )
        return candidates.first()

    def most_used(self, limit=10):
        """Return up to ``limit`` active templates, highest usage count first."""
        ranked = self.filter(is_active=True).order_by('-usage_count')
        return ranked[:limit]