2025-08-12 13:33:25 +03:00

504 lines
19 KiB
Python

from rest_framework import viewsets, permissions, status
from rest_framework.decorators import action
from rest_framework.response import Response
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework import filters
from django.db.models import Q, Count, Avg
from django.utils import timezone
from datetime import timedelta
from ..models import (
ImagingStudy, ImagingSeries, DICOMImage, RadiologyReport,
ReportTemplate, ImagingOrder
)
from .serializers import (
ImagingOrderSerializer, ImagingStudySerializer, ImagingSeriesSerializer,
DICOMImageSerializer, RadiologyReportSerializer, ReportTemplateSerializer,
RadiologyStatsSerializer, StudySchedulingSerializer, StudyCompletionSerializer,
ReportDictationSerializer
)
from core.utils import AuditLogger
class BaseViewSet(viewsets.ModelViewSet):
"""Base ViewSet with common functionality"""
permission_classes = [permissions.IsAuthenticated]
filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
def get_queryset(self):
# Filter by tenant if user has one
if hasattr(self.request.user, 'tenant') and self.request.user.tenant:
return self.queryset.filter(tenant=self.request.user.tenant)
return self.queryset
def perform_create(self, serializer):
if hasattr(self.request.user, 'tenant'):
serializer.save(tenant=self.request.user.tenant)
else:
serializer.save()
class ImagingOrderViewSet(BaseViewSet):
"""ViewSet for ImagingOrder model"""
queryset = ImagingOrder.objects.all()
serializer_class = ImagingOrderSerializer
filterset_fields = [
'status', 'priority', 'modality', 'patient', 'ordering_provider',
'contrast_required', 'location'
]
search_fields = [
'order_number', 'patient__first_name', 'patient__last_name',
'patient__mrn', 'body_part', 'clinical_indication'
]
ordering_fields = ['order_date', 'scheduled_date', 'priority']
ordering = ['-order_date']
@action(detail=False, methods=['get'])
def pending(self, request):
"""Get pending orders"""
queryset = self.get_queryset().filter(status='PENDING')
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
@action(detail=False, methods=['get'])
def scheduled_today(self, request):
"""Get orders scheduled for today"""
today = timezone.now().date()
queryset = self.get_queryset().filter(
scheduled_date__date=today,
status='SCHEDULED'
)
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
@action(detail=True, methods=['post'])
def schedule(self, request, pk=None):
"""Schedule an imaging order"""
order = self.get_object()
serializer = StudySchedulingSerializer(data=request.data)
if serializer.is_valid():
order.scheduled_date = serializer.validated_data['scheduled_date']
order.location = serializer.validated_data['location']
order.room_number = serializer.validated_data['room_number']
order.equipment_id = serializer.validated_data['equipment_id']
order.special_instructions = serializer.validated_data.get('special_instructions', '')
order.status = 'SCHEDULED'
order.save()
# Log the action
AuditLogger.log_action(
user=request.user,
action='ORDER_SCHEDULED',
model='ImagingOrder',
object_id=str(order.order_id),
details={
'order_number': order.order_number,
'scheduled_date': str(serializer.validated_data['scheduled_date'])
}
)
return Response({'message': 'Order scheduled successfully'})
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@action(detail=True, methods=['post'])
def cancel(self, request, pk=None):
"""Cancel an imaging order"""
order = self.get_object()
reason = request.data.get('reason', '')
order.status = 'CANCELLED'
order.radiologist_notes = f"Cancelled: {reason}"
order.save()
# Log the action
AuditLogger.log_action(
user=request.user,
action='ORDER_CANCELLED',
model='ImagingOrder',
object_id=str(order.order_id),
details={
'order_number': order.order_number,
'reason': reason
}
)
return Response({'message': 'Order cancelled successfully'})
class ImagingStudyViewSet(BaseViewSet):
"""ViewSet for ImagingStudy model"""
queryset = ImagingStudy.objects.all()
serializer_class = ImagingStudySerializer
filterset_fields = [
'status', 'modality', 'imaging_order', 'performed_by', 'contrast_used'
]
search_fields = [
'accession_number', 'study_instance_uid', 'study_description',
'imaging_order__order_number', 'imaging_order__patient__first_name',
'imaging_order__patient__last_name'
]
ordering_fields = ['study_date', 'study_time']
ordering = ['-study_date', '-study_time']
@action(detail=False, methods=['get'])
def in_progress(self, request):
"""Get studies in progress"""
queryset = self.get_queryset().filter(status='IN_PROGRESS')
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
@action(detail=False, methods=['get'])
def completed_today(self, request):
"""Get studies completed today"""
today = timezone.now().date()
queryset = self.get_queryset().filter(
study_date=today,
status='COMPLETED'
)
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
@action(detail=True, methods=['post'])
def start(self, request, pk=None):
"""Start an imaging study"""
study = self.get_object()
study.status = 'IN_PROGRESS'
study.performed_by = request.user
study.save()
# Update order status
study.imaging_order.status = 'IN_PROGRESS'
study.imaging_order.save()
# Log the action
AuditLogger.log_action(
user=request.user,
action='STUDY_STARTED',
model='ImagingStudy',
object_id=str(study.study_id),
details={'accession_number': study.accession_number}
)
return Response({'message': 'Study started successfully'})
@action(detail=True, methods=['post'])
def complete(self, request, pk=None):
"""Complete an imaging study"""
study = self.get_object()
serializer = StudyCompletionSerializer(data=request.data)
if serializer.is_valid():
study.status = 'COMPLETED'
study.contrast_used = serializer.validated_data.get('contrast_used', False)
study.contrast_agent = serializer.validated_data.get('contrast_agent', '')
study.contrast_volume = serializer.validated_data.get('contrast_volume')
study.radiation_dose = serializer.validated_data.get('radiation_dose')
study.study_comments = serializer.validated_data.get('study_comments', '')
study.save()
# Update order status
study.imaging_order.status = 'COMPLETED'
study.imaging_order.completed_date = timezone.now()
study.imaging_order.save()
# Log the action
AuditLogger.log_action(
user=request.user,
action='STUDY_COMPLETED',
model='ImagingStudy',
object_id=str(study.study_id),
details={'accession_number': study.accession_number}
)
return Response({'message': 'Study completed successfully'})
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class ImagingSeriesViewSet(viewsets.ReadOnlyModelViewSet):
"""ViewSet for ImagingSeries model (read-only)"""
queryset = ImagingSeries.objects.all()
serializer_class = ImagingSeriesSerializer
permission_classes = [permissions.IsAuthenticated]
filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
filterset_fields = ['imaging_study', 'modality', 'body_part_examined']
search_fields = ['series_instance_uid', 'series_description']
ordering_fields = ['series_number', 'series_date', 'series_time']
ordering = ['series_number']
def get_queryset(self):
if hasattr(self.request.user, 'tenant') and self.request.user.tenant:
return self.queryset.filter(tenant=self.request.user.tenant)
return self.queryset
class DICOMImageViewSet(viewsets.ReadOnlyModelViewSet):
"""ViewSet for DICOMImage model (read-only)"""
queryset = DICOMImage.objects.all()
serializer_class = DICOMImageSerializer
permission_classes = [permissions.IsAuthenticated]
filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
filterset_fields = ['imaging_series']
search_fields = ['sop_instance_uid']
ordering_fields = ['instance_number', 'slice_location']
ordering = ['instance_number']
def get_queryset(self):
if hasattr(self.request.user, 'tenant') and self.request.user.tenant:
return self.queryset.filter(tenant=self.request.user.tenant)
return self.queryset
class ReportTemplateViewSet(BaseViewSet):
"""ViewSet for ReportTemplate model"""
queryset = ReportTemplate.objects.all()
serializer_class = ReportTemplateSerializer
filterset_fields = ['modality', 'body_part', 'procedure_type', 'is_active']
search_fields = ['name', 'description', 'template_content']
ordering_fields = ['name', 'modality']
ordering = ['name']
@action(detail=False, methods=['get'])
def by_modality(self, request):
"""Get templates by modality"""
modality = request.query_params.get('modality')
if modality:
queryset = self.get_queryset().filter(modality=modality, is_active=True)
else:
queryset = self.get_queryset().filter(is_active=True)
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
class RadiologyReportViewSet(BaseViewSet):
"""ViewSet for RadiologyReport model"""
queryset = RadiologyReport.objects.all()
serializer_class = RadiologyReportSerializer
filterset_fields = [
'status', 'radiologist', 'imaging_study', 'critical_result'
]
search_fields = [
'imaging_study__accession_number', 'findings', 'impression',
'imaging_study__imaging_order__patient__first_name',
'imaging_study__imaging_order__patient__last_name'
]
ordering_fields = ['dictated_date', 'signed_date']
ordering = ['-dictated_date']
@action(detail=False, methods=['post'])
def dictate(self, request):
"""Dictate a radiology report"""
serializer = ReportDictationSerializer(data=request.data)
if serializer.is_valid():
study = ImagingStudy.objects.get(id=serializer.validated_data['study_id'])
template_id = serializer.validated_data.get('template_id')
template = None
if template_id:
template = ReportTemplate.objects.get(id=template_id)
# Create report
report = RadiologyReport.objects.create(
imaging_study=study,
report_template=template,
radiologist=request.user,
dictated_date=timezone.now(),
status='DRAFT',
clinical_history=serializer.validated_data['clinical_history'],
technique=serializer.validated_data['technique'],
findings=serializer.validated_data['findings'],
impression=serializer.validated_data['impression'],
recommendations=serializer.validated_data.get('recommendations', ''),
comparison_studies=serializer.validated_data.get('comparison_studies', ''),
critical_result=serializer.validated_data.get('critical_result', False),
tenant=getattr(request.user, 'tenant', None)
)
# Log the action
AuditLogger.log_action(
user=request.user,
action='REPORT_DICTATED',
model='RadiologyReport',
object_id=str(report.report_id),
details={
'study_accession': study.accession_number,
'critical_result': report.critical_result
}
)
return Response({
'message': 'Report dictated successfully',
'report': RadiologyReportSerializer(report).data
})
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@action(detail=False, methods=['get'])
def pending_signature(self, request):
"""Get reports pending signature"""
queryset = self.get_queryset().filter(status='TRANSCRIBED')
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
@action(detail=False, methods=['get'])
def critical_results(self, request):
"""Get critical results"""
queryset = self.get_queryset().filter(
critical_result=True,
status='SIGNED'
)
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
@action(detail=True, methods=['post'])
def sign(self, request, pk=None):
"""Sign a radiology report"""
report = self.get_object()
if report.status != 'TRANSCRIBED':
return Response(
{'error': 'Report must be transcribed before signing'},
status=status.HTTP_400_BAD_REQUEST
)
report.status = 'SIGNED'
report.signed_date = timezone.now()
report.save()
# Log the action
AuditLogger.log_action(
user=request.user,
action='REPORT_SIGNED',
model='RadiologyReport',
object_id=str(report.report_id),
details={
'study_accession': report.imaging_study.accession_number,
'critical_result': report.critical_result
}
)
return Response({'message': 'Report signed successfully'})
@action(detail=True, methods=['post'])
def add_addendum(self, request, pk=None):
"""Add addendum to a report"""
report = self.get_object()
addendum_text = request.data.get('addendum', '')
if not addendum_text:
return Response(
{'error': 'Addendum text is required'},
status=status.HTTP_400_BAD_REQUEST
)
current_addendum = report.addendum or ''
timestamp = timezone.now().strftime('%Y-%m-%d %H:%M:%S')
new_addendum = f"{current_addendum}\n\nADDENDUM ({timestamp}):\n{addendum_text}"
report.addendum = new_addendum
report.save()
# Log the action
AuditLogger.log_action(
user=request.user,
action='REPORT_ADDENDUM_ADDED',
model='RadiologyReport',
object_id=str(report.report_id),
details={'study_accession': report.imaging_study.accession_number}
)
return Response({'message': 'Addendum added successfully'})
class RadiologyStatsViewSet(viewsets.ViewSet):
"""ViewSet for radiology statistics"""
permission_classes = [permissions.IsAuthenticated]
@action(detail=False, methods=['get'])
def dashboard(self, request):
"""Get radiology dashboard statistics"""
tenant_filter = {}
if hasattr(request.user, 'tenant') and request.user.tenant:
tenant_filter['tenant'] = request.user.tenant
today = timezone.now().date()
# Order statistics
orders = ImagingOrder.objects.filter(**tenant_filter)
total_orders = orders.count()
pending_orders = orders.filter(status='PENDING').count()
completed_today = orders.filter(
completed_date__date=today
).count()
# Study statistics
studies = ImagingStudy.objects.filter(**tenant_filter)
studies_performed = studies.filter(
study_date=today
).count()
# Report statistics
reports = RadiologyReport.objects.filter(**tenant_filter)
reports_pending = reports.filter(status__in=['DRAFT', 'TRANSCRIBED']).count()
critical_results = reports.filter(
critical_result=True,
signed_date__date=today
).count()
# Turnaround time
completed_reports = reports.filter(
signed_date__date=today
)
turnaround_times = []
for report in completed_reports:
if report.imaging_study.study_date and report.signed_date:
study_datetime = timezone.make_aware(
timezone.datetime.combine(
report.imaging_study.study_date,
report.imaging_study.study_time or timezone.datetime.min.time()
)
)
delta = report.signed_date - study_datetime
turnaround_times.append(delta.total_seconds() / 3600) # hours
average_turnaround = sum(turnaround_times) / len(turnaround_times) if turnaround_times else 0
# Equipment utilization (mock data)
equipment_utilization = {
'CT Scanner 1': 85.5,
'MRI 1': 92.3,
'X-Ray Room 1': 67.8,
'Ultrasound 1': 78.9
}
# Modality breakdown
modality_breakdown = orders.values('modality').annotate(
count=Count('id')
).order_by('-count')
# Status breakdown
status_breakdown = orders.values('status').annotate(
count=Count('id')
).order_by('-count')
stats = {
'total_orders': total_orders,
'pending_orders': pending_orders,
'completed_today': completed_today,
'studies_performed': studies_performed,
'reports_pending': reports_pending,
'critical_results': critical_results,
'average_turnaround': round(average_turnaround, 2),
'equipment_utilization': equipment_utilization,
'modality_breakdown': {item['modality']: item['count'] for item in modality_breakdown},
'status_breakdown': {item['status']: item['count'] for item in status_breakdown}
}
serializer = RadiologyStatsSerializer(stats)
return Response(serializer.data)