2025-10-06 15:25:37 +03:00

1502 lines
51 KiB
Python

"""
Radiology app views with healthcare-focused CRUD operations.
Implements appropriate access patterns for radiology and imaging workflows.
"""
from django.shortcuts import render, get_object_or_404, redirect
from django.contrib.auth.decorators import login_required
from django.contrib.auth.mixins import LoginRequiredMixin, PermissionRequiredMixin
from django.views.generic import (
ListView, DetailView, CreateView, UpdateView, DeleteView, TemplateView
)
from django.http import JsonResponse, HttpResponse, FileResponse, Http404
from django.db.models import Q, Count, Avg, Sum, F
from django.utils import timezone
from django.contrib import messages
from django.urls import reverse_lazy, reverse
from django.core.paginator import Paginator
from django.template.loader import render_to_string
from datetime import datetime, timedelta, date
import json
import os
import mimetypes
from PIL import Image
import numpy as np
from core.utils import AuditLogger
from .models import (
ImagingOrder, ImagingStudy, ImagingSeries, DICOMImage,
RadiologyReport, ReportTemplate
)
from .forms import (
ImagingOrderForm, ImagingStudyForm, RadiologyReportForm,
ReportTemplateForm, ImagingSeriesForm
)
class RadiologyDashboardView(LoginRequiredMixin, TemplateView):
    """
    Main radiology dashboard with key metrics and recent activity.

    Every figure is restricted to the tenant of the requesting user.
    """
    template_name = 'radiology/dashboard.html'

    def get_context_data(self, **kwargs):
        """
        Add the dashboard counters and the ten most recent orders,
        studies and reports to the template context.
        """
        context = super().get_context_data(**kwargs)
        tenant = self.request.user.tenant
        today = timezone.now().date()

        studies = ImagingStudy.objects.filter(tenant=tenant)
        # Reports are tenant-scoped through their study.
        reports = RadiologyReport.objects.filter(study__tenant=tenant)
        # The "today" counters must be limited to reports finalized today;
        # without the date filter they count every signed report on record.
        # Field names follow the radiology_stats endpoint in this module.
        signed_today = reports.filter(
            status='SIGNED',
            finalized_datetime__date=today,
        )

        # Dashboard statistics
        context.update({
            'pending_orders': ImagingOrder.objects.filter(
                tenant=tenant,
                status='PENDING',
            ).count(),
            'scheduled_studies': studies.filter(status='SCHEDULED').count(),
            'in_progress_studies': studies.filter(status='IN_PROGRESS').count(),
            'studies_completed_today': studies.filter(
                completed_datetime__date=today,
                status='COMPLETED',
            ).count(),
            'reports_pending': reports.filter(status='DRAFT').count(),
            'reports_signed_today': signed_today.count(),
            'critical_findings': signed_today.filter(
                critical_finding=True,
            ).count(),
            'total_images_today': DICOMImage.objects.filter(
                series__study__tenant=tenant,
                created_at__date=today,
            ).count(),
        })

        # Recent orders
        context['recent_orders'] = ImagingOrder.objects.filter(
            tenant=tenant
        ).select_related('patient', 'ordering_provider').order_by('-order_datetime')[:10]

        # Recent studies
        context['recent_studies'] = studies.select_related(
            'patient'
        ).order_by('-study_datetime')[:10]

        # Recent reports: an explicit ordering is required, otherwise the
        # ten rows returned by the slice are arbitrary rather than recent.
        context['recent_reports'] = reports.select_related(
            'study__imaging_order__patient'
        ).order_by('-created_at')[:10]

        return context
class ReportTemplateListView(LoginRequiredMixin, ListView):
    """
    Paginated listing of the requesting tenant's report templates.

    Supported query-string parameters: ``search``, ``modality`` and
    ``active_only``.
    """
    model = ReportTemplate
    template_name = 'radiology/report_template_list.html'
    context_object_name = 'report_templates'
    paginate_by = 25

    def get_queryset(self):
        """
        Return the tenant's templates, narrowed by the request filters
        and ordered by ``name``.
        """
        params = self.request.GET
        templates = ReportTemplate.objects.filter(tenant=self.request.user.tenant)

        # Free-text search across name, modality and body part.
        # NOTE(review): the search uses 'template_name' while the ordering
        # below uses 'name' -- confirm which field ReportTemplate defines.
        term = params.get('search')
        if term:
            text_match = (
                Q(template_name__icontains=term)
                | Q(modality__icontains=term)
                | Q(body_part__icontains=term)
            )
            templates = templates.filter(text_match)

        # Exact modality filter
        chosen_modality = params.get('modality')
        if chosen_modality:
            templates = templates.filter(modality=chosen_modality)

        # Any non-empty value of ``active_only`` hides inactive templates.
        if params.get('active_only'):
            templates = templates.filter(is_active=True)

        return templates.order_by('name')

    def get_context_data(self, **kwargs):
        """
        Expose the modality choices and the current search term so the
        template can render the filter controls.
        """
        context = super().get_context_data(**kwargs)
        context['modalities'] = ReportTemplate._meta.get_field('modality').choices
        context['search_query'] = self.request.GET.get('search', '')
        return context
class ReportTemplateDetailView(LoginRequiredMixin, DetailView):
    """
    Show a single report template together with the latest reports
    written from it.
    """
    model = ReportTemplate
    template_name = 'radiology/templates/report_template_detail.html'
    context_object_name = 'report_template'

    def get_queryset(self):
        """
        Limit lookups to templates owned by the requesting user's tenant.
        """
        tenant = self.request.user.tenant
        return ReportTemplate.objects.filter(tenant=tenant)

    def get_context_data(self, **kwargs):
        """
        Add the ten newest reports that used this template.
        """
        context = super().get_context_data(**kwargs)

        # NOTE(review): other report queries in this module traverse
        # 'study__imaging_order__patient'; confirm that
        # 'study__order__patient' is a valid relation path here.
        latest_reports = (
            RadiologyReport.objects
            .filter(
                template_used=self.object,
                study__tenant=self.request.user.tenant,
            )
            .select_related('study__order__patient')
            .order_by('-created_at')
        )
        context['recent_reports'] = latest_reports[:10]
        return context
class ReportTemplateCreateView(LoginRequiredMixin, PermissionRequiredMixin, CreateView):
    """
    Create a report template for the requesting user's tenant.

    Requires the ``radiology.add_reporttemplate`` permission.
    """
    model = ReportTemplate
    form_class = ReportTemplateForm
    template_name = 'radiology/templates/report_template_form.html'
    permission_required = 'radiology.add_reporttemplate'
    success_url = reverse_lazy('radiology:report_template_list')

    def form_valid(self, form):
        """
        Stamp tenant and author on the new template, save it, then write
        an audit entry and a success message.
        """
        user = self.request.user
        form.instance.tenant = user.tenant
        form.instance.created_by = user

        # The parent implementation saves the form and sets self.object.
        response = super().form_valid(form)
        created = self.object

        # Log the action
        audit_details = {
            'template_name': created.template_name,
            'modality': created.modality
        }
        AuditLogger.log_event(
            user=user,
            action='REPORT_TEMPLATE_CREATED',
            model='ReportTemplate',
            object_id=str(created.id),
            details=audit_details
        )

        messages.success(
            self.request,
            f'Report template "{created.template_name}" created successfully.'
        )
        return response
class ReportTemplateUpdateView(LoginRequiredMixin, PermissionRequiredMixin, UpdateView):
    """
    Edit one of the requesting tenant's report templates.

    Requires the ``radiology.change_reporttemplate`` permission.
    """
    model = ReportTemplate
    form_class = ReportTemplateForm
    template_name = 'radiology/templates/report_template_form.html'
    permission_required = 'radiology.change_reporttemplate'

    def get_queryset(self):
        """
        Only templates of the requesting user's tenant may be edited.
        """
        tenant = self.request.user.tenant
        return ReportTemplate.objects.filter(tenant=tenant)

    def get_success_url(self):
        """
        Return to the detail page of the template that was just saved.
        """
        return reverse(
            'radiology:report_template_detail',
            kwargs={'pk': self.object.pk}
        )

    def form_valid(self, form):
        """
        Save the changes, then record which fields changed in the audit
        log and flash a success message.
        """
        response = super().form_valid(form)
        updated = self.object

        # Log the action
        audit_details = {
            'template_name': updated.template_name,
            'changes': form.changed_data
        }
        AuditLogger.log_event(
            user=self.request.user,
            action='REPORT_TEMPLATE_UPDATED',
            model='ReportTemplate',
            object_id=str(updated.id),
            details=audit_details
        )

        messages.success(
            self.request,
            f'Report template "{updated.template_name}" updated successfully.'
        )
        return response
class ReportTemplateDeleteView(LoginRequiredMixin, PermissionRequiredMixin, DeleteView):
    """
    Delete a report template (soft delete by deactivating).

    The row is never removed: ``is_active`` is cleared instead, and the
    deactivation is written to the audit log.
    """
    model = ReportTemplate
    template_name = 'radiology/templates/report_template_confirm_delete.html'
    permission_required = 'radiology.delete_reporttemplate'
    success_url = reverse_lazy('radiology:report_template_list')

    def get_queryset(self):
        """
        Only templates of the requesting user's tenant may be deactivated.
        """
        return ReportTemplate.objects.filter(tenant=self.request.user.tenant)

    def _deactivate(self, request):
        """
        Deactivate ``self.object``, audit the change and redirect to the
        template list.
        """
        template = self.object
        # Soft delete by deactivating instead of actual deletion
        template.is_active = False
        template.save()

        # Log the action
        AuditLogger.log_action(
            user=request.user,
            action='REPORT_TEMPLATE_DEACTIVATED',
            model='ReportTemplate',
            object_id=str(template.id),
            details={'template_name': template.name}
        )

        messages.success(request, f'Report template "{template.name}" deactivated successfully.')
        return redirect(self.success_url)

    def form_valid(self, form):
        """
        POST handler on Django >= 4.0.

        Since Django 4.0 ``DeleteView.post()`` calls ``form_valid()``
        (whose default hard-deletes the object) and no longer calls
        ``delete()``. Overriding it here keeps the delete a soft one.
        """
        return self._deactivate(self.request)

    def delete(self, request, *args, **kwargs):
        """
        POST/DELETE handler on Django < 4.0, and DELETE handler on newer
        versions.
        """
        self.object = self.get_object()
        return self._deactivate(request)
class ImagingOrderListView(LoginRequiredMixin, ListView):
    """
    List all imaging orders with filtering and search.

    Supported query-string parameters: ``search``, ``status``,
    ``modality``, ``priority``, ``date_from`` and ``date_to``.
    """
    model = ImagingOrder
    template_name = 'radiology/orders/imaging_order_list.html'
    context_object_name = 'imaging_orders'
    paginate_by = 25

    @staticmethod
    def _parse_date_param(raw_value):
        """
        Return the date encoded in ``raw_value``, or ``None`` when the
        parameter is missing or is not a valid date.
        """
        # Imported locally so this view does not depend on a module-level
        # import being added elsewhere in the file.
        from django.utils.dateparse import parse_date

        if not raw_value:
            return None
        try:
            # parse_date() returns None for badly formatted input and
            # raises ValueError for well-formed but impossible dates.
            return parse_date(raw_value)
        except ValueError:
            return None

    def get_queryset(self):
        """
        Return the tenant's orders, narrowed by the request filters and
        sorted newest first.
        """
        params = self.request.GET
        queryset = ImagingOrder.objects.filter(tenant=self.request.user.tenant)

        # Search functionality
        search = params.get('search')
        if search:
            queryset = queryset.filter(
                Q(patient__first_name__icontains=search) |
                Q(patient__last_name__icontains=search) |
                Q(patient__mrn__icontains=search) |
                Q(study_description__icontains=search)
            )

        # Exact-match filters: status, modality, priority
        for field_name in ('status', 'modality', 'priority'):
            value = params.get(field_name)
            if value:
                queryset = queryset.filter(**{field_name: value})

        # Filter by date range. User-supplied values are parsed first so a
        # malformed date is ignored instead of raising inside the ORM and
        # turning the request into a server error.
        date_from = self._parse_date_param(params.get('date_from'))
        if date_from is not None:
            queryset = queryset.filter(order_datetime__date__gte=date_from)
        date_to = self._parse_date_param(params.get('date_to'))
        if date_to is not None:
            queryset = queryset.filter(order_datetime__date__lte=date_to)

        return queryset.select_related(
            'patient', 'ordering_provider'
        ).order_by('-order_datetime')

    def get_context_data(self, **kwargs):
        """
        Expose the status, modality and priority choices for the filter
        controls.
        """
        context = super().get_context_data(**kwargs)
        context.update({
            'statuses': ImagingOrder._meta.get_field('status').choices,
            'modalities': ImagingOrder._meta.get_field('modality').choices,
            'priorities': ImagingOrder._meta.get_field('priority').choices,
        })
        return context
class ImagingOrderDetailView(LoginRequiredMixin, DetailView):
    """
    Display detailed information about an imaging order.

    Besides the order itself, the context carries its studies, their
    reports, a few summary counters and suggested report templates.
    """
    model = ImagingOrder
    template_name = 'radiology/orders/imaging_order_detail.html'
    context_object_name = 'order'

    def get_queryset(self):
        """Use the custom manager, limited to the requesting user's tenant."""
        return ImagingOrder.objects.with_patient_details().filter(
            tenant=self.request.user.tenant
        )

    def get_context_data(self, **kwargs):
        """
        Add studies, reports, summary statistics and report templates
        for the displayed order.
        """
        context = super().get_context_data(**kwargs)
        imaging_order = self.object

        # Get studies for this order using the custom manager
        studies = ImagingStudy.objects.with_related().filter(
            imaging_order=imaging_order
        ).order_by('-study_datetime')
        context['studies'] = studies

        # A single id query replaces the repeated exists()/count() calls:
        # it tells us whether there are studies, how many, and which ones.
        study_ids = list(studies.values_list('id', flat=True))

        if study_ids:
            reports = RadiologyReport.objects.with_study_details().filter(
                study_id__in=study_ids
            ).order_by('-created_at')

            # Mapping of study_id to its reports for template use. A plain
            # dict is kept on purpose so that a lookup from the template
            # can never insert new keys.
            reports_by_study = {}
            for report in reports:
                reports_by_study.setdefault(report.study_id, []).append(report)

            order_reports = RadiologyReport.objects.filter(study_id__in=study_ids)
            completed_studies = studies.filter(status='COMPLETED').count()
            pending_reports = order_reports.filter(status='DRAFT').count()
            critical_findings = order_reports.filter(critical_finding=True).count()
        else:
            # No studies means no reports; skip the queries entirely.
            reports = RadiologyReport.objects.none()
            reports_by_study = {}
            completed_studies = 0
            pending_reports = 0
            critical_findings = 0

        context['reports'] = reports
        context['reports_by_study'] = reports_by_study

        # Summary statistics
        context['study_count'] = len(study_ids)
        context['completed_studies'] = completed_studies
        context['pending_reports'] = pending_reports
        context['critical_findings'] = critical_findings

        # Check if order can be modified (business rule)
        context['can_modify'] = imaging_order.status in ['PENDING', 'SCHEDULED']

        # Report templates for quick access, capped at five.
        # NOTE(review): no tenant filter is visible on this query; confirm
        # that for_modality() scopes templates to the current tenant.
        context['report_templates'] = ReportTemplate.objects.for_modality(
            imaging_order.modality
        ).filter(
            body_part__in=[imaging_order.body_part, 'ALL']
        )[:5]

        return context
class ImagingOrderCreateView(LoginRequiredMixin, PermissionRequiredMixin, CreateView):
    """
    Place a new imaging order on behalf of the requesting user.

    Requires the ``radiology.add_imagingorder`` permission.
    """
    model = ImagingOrder
    form_class = ImagingOrderForm
    template_name = 'radiology/orders/imaging_order_form.html'
    permission_required = 'radiology.add_imagingorder'
    success_url = reverse_lazy('radiology:imaging_order_list')

    def form_valid(self, form):
        """
        Fill in tenant, ordering provider and order time, save the
        order, then audit the creation and flash a success message.
        """
        user = self.request.user
        new_order = form.instance
        new_order.tenant = user.tenant
        new_order.ordering_provider = user
        new_order.order_datetime = timezone.now()

        # The parent implementation saves the form and sets self.object.
        response = super().form_valid(form)
        saved = self.object
        patient_name = f"{saved.patient.first_name} {saved.patient.last_name}"

        # Log the action
        AuditLogger.log_action(
            user=user,
            action='IMAGING_ORDER_CREATED',
            model='ImagingOrder',
            object_id=str(saved.order_id),
            details={
                'patient_name': patient_name,
                'modality': saved.modality,
                'study_description': saved.study_description,
                'priority': saved.priority
            }
        )

        messages.success(self.request, 'Imaging order created successfully.')
        return response
class ImagingOrderUpdateView(LoginRequiredMixin, PermissionRequiredMixin, UpdateView):
    """
    Edit an imaging order; only ``status`` and ``notes`` are editable.

    Requires the ``radiology.change_imagingorder`` permission.
    """
    model = ImagingOrder
    # Clinical orders expose a deliberately small set of editable fields.
    fields = ['status', 'notes']
    template_name = 'radiology/orders/imaging_order_form.html'
    permission_required = 'radiology.change_imagingorder'

    def get_queryset(self):
        """
        Only orders of the requesting user's tenant may be edited.
        """
        tenant = self.request.user.tenant
        return ImagingOrder.objects.filter(tenant=tenant)

    def get_success_url(self):
        """
        Return to the detail page of the order that was just saved.
        """
        return reverse(
            'radiology:imaging_order_detail',
            kwargs={'pk': self.object.pk}
        )

    def form_valid(self, form):
        """
        Save the changes, then record the changed fields in the audit
        log and flash a success message.
        """
        response = super().form_valid(form)
        saved = self.object
        patient_name = f"{saved.patient.first_name} {saved.patient.last_name}"

        # Log the action
        AuditLogger.log_action(
            user=self.request.user,
            action='IMAGING_ORDER_UPDATED',
            model='ImagingOrder',
            object_id=str(saved.order_id),
            details={
                'patient_name': patient_name,
                'changes': form.changed_data
            }
        )

        messages.success(self.request, 'Imaging order updated successfully.')
        return response
class ImagingStudyListView(LoginRequiredMixin, ListView):
    """
    Paginated listing of the requesting tenant's imaging studies.

    Supported query-string parameters: ``search``, ``status`` and
    ``modality``.
    """
    model = ImagingStudy
    template_name = 'radiology/studies/imaging_study_list.html'
    context_object_name = 'imaging_studies'
    paginate_by = 25

    def get_queryset(self):
        """
        Return the tenant's studies, narrowed by the request filters and
        sorted by study time, newest first.
        """
        params = self.request.GET
        studies = ImagingStudy.objects.filter(tenant=self.request.user.tenant)

        # Free-text search on study id and patient identity
        term = params.get('search')
        if term:
            text_match = (
                Q(study_id__icontains=term)
                | Q(imaging_order__patient__first_name__icontains=term)
                | Q(imaging_order__patient__last_name__icontains=term)
                | Q(imaging_order__patient__mrn__icontains=term)
            )
            studies = studies.filter(text_match)

        # Exact status filter
        chosen_status = params.get('status')
        if chosen_status:
            studies = studies.filter(status=chosen_status)

        # The modality is stored on the order, not on the study.
        chosen_modality = params.get('modality')
        if chosen_modality:
            studies = studies.filter(imaging_order__modality=chosen_modality)

        studies = studies.select_related('imaging_order__patient')
        return studies.order_by('-study_datetime')

    def get_context_data(self, **kwargs):
        """
        Expose the status and modality choices for the filter controls.
        """
        context = super().get_context_data(**kwargs)
        context['statuses'] = ImagingStudy._meta.get_field('status').choices
        context['modalities'] = ImagingOrder._meta.get_field('modality').choices
        return context
class ImagingStudyDetailView(LoginRequiredMixin, DetailView):
    """
    Show one imaging study with its series, report and image count.
    """
    model = ImagingStudy
    template_name = 'radiology/studies/imaging_study_detail.html'
    context_object_name = 'study'

    def get_queryset(self):
        """
        Limit lookups to studies of the requesting user's tenant.
        """
        tenant = self.request.user.tenant
        return ImagingStudy.objects.filter(tenant=tenant)

    def get_context_data(self, **kwargs):
        """
        Add the study's series, its report and the total number of
        images stored for it.
        """
        context = super().get_context_data(**kwargs)
        study = self.object

        image_total = DICOMImage.objects.filter(
            series__study=study,
            series__study__tenant=self.request.user.tenant
        ).count()

        context.update({
            # NOTE(review): 'series' and 'report' are passed through as-is;
            # presumably they are reverse relations on ImagingStudy --
            # confirm what the template expects to receive.
            'series': study.series,
            'reports': study.report,
            'total_images': image_total,
        })
        return context
class ImagingStudyCreateView(LoginRequiredMixin, PermissionRequiredMixin, CreateView):
    """
    Register a new imaging study with the requesting user as
    technologist.

    Requires the ``radiology.add_imagingstudy`` permission.
    """
    model = ImagingStudy
    form_class = ImagingStudyForm
    template_name = 'radiology/studies/imaging_study_form.html'
    permission_required = 'radiology.add_imagingstudy'
    success_url = reverse_lazy('radiology:imaging_study_list')

    def form_valid(self, form):
        """
        Stamp tenant and technologist, save the study, then audit the
        creation and flash a success message.
        """
        user = self.request.user
        form.instance.tenant = user.tenant
        form.instance.technologist = user

        # The parent implementation saves the form and sets self.object.
        response = super().form_valid(form)
        saved = self.object
        patient_name = f"{saved.order.patient.first_name} {saved.order.patient.last_name}"

        # Log the action
        AuditLogger.log_action(
            user=user,
            action='IMAGING_STUDY_CREATED',
            model='ImagingStudy',
            object_id=str(saved.study_id),
            details={
                'patient_name': patient_name,
                'modality': saved.order.modality
            }
        )

        messages.success(self.request, 'Imaging study created successfully.')
        return response
class ImagingStudyUpdateView(LoginRequiredMixin, PermissionRequiredMixin, UpdateView):
    """
    Edit an imaging study; only ``status`` and ``technical_notes`` are
    editable.

    Requires the ``radiology.change_imagingstudy`` permission.
    """
    model = ImagingStudy
    # Deliberately small set of editable fields.
    fields = ['status', 'technical_notes']
    template_name = 'radiology/studies/imaging_study_form.html'
    permission_required = 'radiology.change_imagingstudy'

    def get_queryset(self):
        """
        Only studies of the requesting user's tenant may be edited.
        """
        tenant = self.request.user.tenant
        return ImagingStudy.objects.filter(tenant=tenant)

    def get_success_url(self):
        """
        Return to the detail page of the study that was just saved.
        """
        return reverse(
            'radiology:imaging_study_detail',
            kwargs={'pk': self.object.pk}
        )

    def form_valid(self, form):
        """
        Save the changes, then record the changed fields in the audit
        log and flash a success message.
        """
        response = super().form_valid(form)
        saved = self.object
        patient_name = f"{saved.order.patient.first_name} {saved.order.patient.last_name}"

        # Log the action
        AuditLogger.log_action(
            user=self.request.user,
            action='IMAGING_STUDY_UPDATED',
            model='ImagingStudy',
            object_id=str(saved.study_id),
            details={
                'patient_name': patient_name,
                'changes': form.changed_data
            }
        )

        messages.success(self.request, 'Imaging study updated successfully.')
        return response
class ImagingSeriesListView(LoginRequiredMixin, ListView):
    """
    Paginated listing of imaging series belonging to the requesting
    tenant's studies.

    Supported query-string parameters: ``study`` and ``search``.
    """
    model = ImagingSeries
    template_name = 'radiology/series/imaging_series_list.html'
    context_object_name = 'series'
    paginate_by = 25

    def get_queryset(self):
        """
        Return the tenant's series, narrowed by the request filters and
        sorted by creation time, newest first.
        """
        params = self.request.GET
        # Series are tenant-scoped through their study.
        series_qs = ImagingSeries.objects.filter(
            study__tenant=self.request.user.tenant
        )

        # Restrict to one study
        chosen_study = params.get('study')
        if chosen_study:
            series_qs = series_qs.filter(study_id=chosen_study)

        # Free-text search on the description and the patient's name
        term = params.get('search')
        if term:
            text_match = (
                Q(series_description__icontains=term)
                | Q(study__imaging_order__patient__first_name__icontains=term)
                | Q(study__imaging_order__patient__last_name__icontains=term)
            )
            series_qs = series_qs.filter(text_match)

        series_qs = series_qs.select_related('study__imaging_order__patient')
        return series_qs.order_by('-created_at')

    def get_context_data(self, **kwargs):
        """
        Add the tenant's fifty most recent studies for the study filter.
        """
        context = super().get_context_data(**kwargs)
        recent_studies = (
            ImagingStudy.objects
            .filter(tenant=self.request.user.tenant)
            .select_related('imaging_order__patient')
            .order_by('-study_datetime')
        )
        context['studies'] = recent_studies[:50]
        return context
class ImagingSeriesDetailView(LoginRequiredMixin, DetailView):
    """
    Show one imaging series together with its images.
    """
    model = ImagingSeries
    template_name = 'radiology/series/imaging_series_detail.html'
    context_object_name = 'series'

    def get_queryset(self):
        """
        Limit lookups to series whose study belongs to the requesting
        user's tenant.
        """
        tenant = self.request.user.tenant
        return ImagingSeries.objects.filter(study__tenant=tenant)

    def get_context_data(self, **kwargs):
        """
        Add the images of this series, ordered by instance number.
        """
        context = super().get_context_data(**kwargs)
        series_images = self.object.images.all()
        context['images'] = series_images.order_by('instance_number')
        return context
class ImagingSeriesCreateView(LoginRequiredMixin, PermissionRequiredMixin, CreateView):
    """
    Register a new imaging series.

    Requires the ``radiology.add_imagingseries`` permission.
    """
    model = ImagingSeries
    form_class = ImagingSeriesForm
    template_name = 'radiology/series/imaging_series_form.html'
    permission_required = 'radiology.add_imagingseries'
    success_url = reverse_lazy('radiology:imaging_series_list')

    def form_valid(self, form):
        """
        Save the series, then audit the creation and flash a success
        message.
        """
        user = self.request.user
        # NOTE(review): series queries elsewhere in this module scope by
        # 'study__tenant'; confirm ImagingSeries actually has a 'tenant'
        # field for this assignment to persist.
        form.instance.tenant = user.tenant

        # The parent implementation saves the form and sets self.object.
        response = super().form_valid(form)
        saved = self.object

        # Log the action
        audit_details = {
            'series_description': saved.series_description,
            'study_id': str(saved.study.study_id)
        }
        AuditLogger.log_action(
            user=user,
            action='IMAGING_SERIES_CREATED',
            model='ImagingSeries',
            object_id=str(saved.series_id),
            details=audit_details
        )

        messages.success(self.request, 'Imaging series created successfully.')
        return response
class DICOMImageListView(LoginRequiredMixin, ListView):
    """
    List all DICOM images with filtering.

    Supported query-string parameters: ``series`` and ``study``.
    """
    model = DICOMImage
    template_name = 'radiology/dicom/dicom_file_list.html'
    context_object_name = 'dicom_images'
    paginate_by = 50

    def get_queryset(self):
        """
        Return the tenant's images, narrowed by the request filters and
        ordered by instance number.
        """
        # Images are tenant-scoped through series -> study.
        queryset = DICOMImage.objects.filter(
            series__study__tenant=self.request.user.tenant
        )

        # Filter by series
        series_id = self.request.GET.get('series')
        if series_id:
            queryset = queryset.filter(series_id=series_id)

        # Filter by study
        study_id = self.request.GET.get('study')
        if study_id:
            queryset = queryset.filter(series__study_id=study_id)

        return queryset.select_related(
            'series__study__imaging_order__patient'
        ).order_by('instance_number')

    def get_context_data(self, **kwargs):
        """
        Add the tenant's fifty most recent series for the series filter.
        """
        context = super().get_context_data(**kwargs)
        context.update({
            # The relation path matches the one get_queryset() uses
            # ('imaging_order'). The two paths previously disagreed, so
            # one of the two queries in this view could not be valid.
            'series': ImagingSeries.objects.filter(
                study__tenant=self.request.user.tenant
            ).select_related(
                'study__imaging_order__patient'
            ).order_by('-created_at')[:50],
        })
        return context
class DICOMImageDetailView(LoginRequiredMixin, DetailView):
    """
    Display detailed information about a DICOM image.
    """
    model = DICOMImage
    template_name = 'radiology/dicom_image_detail.html'
    context_object_name = 'dicom_image'

    def get_queryset(self):
        """
        Limit lookups to images of the requesting user's tenant.

        The tenant is reached through series -> study, the same path
        every other DICOMImage query in this module uses, instead of a
        direct ``tenant`` lookup on the image.
        """
        return DICOMImage.objects.filter(
            series__study__tenant=self.request.user.tenant
        )
class RadiologyReportListView(LoginRequiredMixin, ListView):
    """
    List all radiology reports with filtering and search.

    Supported query-string parameters: ``search``, ``status``,
    ``critical_only`` and ``radiologist``.
    """
    model = RadiologyReport
    template_name = 'radiology/reports/radiology_report_list.html'
    context_object_name = 'radiology_reports'
    paginate_by = 25

    def get_queryset(self):
        """
        Return the tenant's reports, narrowed by the request filters and
        sorted by creation time, newest first.
        """
        # Reports are tenant-scoped through their study.
        queryset = RadiologyReport.objects.filter(
            study__tenant=self.request.user.tenant
        )

        # Search functionality. The patient is reached through
        # 'study__imaging_order', the same relation path that the
        # select_related() call below applies on every request.
        search = self.request.GET.get('search')
        if search:
            queryset = queryset.filter(
                Q(study__imaging_order__patient__first_name__icontains=search) |
                Q(study__imaging_order__patient__last_name__icontains=search) |
                Q(study__imaging_order__patient__mrn__icontains=search) |
                Q(findings__icontains=search) |
                Q(impression__icontains=search)
            )

        # Filter by status
        status = self.request.GET.get('status')
        if status:
            queryset = queryset.filter(status=status)

        # Filter by critical findings (any non-empty value enables it)
        critical_only = self.request.GET.get('critical_only')
        if critical_only:
            queryset = queryset.filter(critical_finding=True)

        # Filter by radiologist
        radiologist_id = self.request.GET.get('radiologist')
        if radiologist_id:
            queryset = queryset.filter(radiologist_id=radiologist_id)

        return queryset.select_related(
            'study__imaging_order__patient', 'radiologist', 'template_used'
        ).order_by('-created_at')

    def get_context_data(self, **kwargs):
        """
        Expose the report status choices for the filter controls.
        """
        context = super().get_context_data(**kwargs)
        context.update({
            'statuses': RadiologyReport._meta.get_field('status').choices,
        })
        return context
class RadiologyReportDetailView(LoginRequiredMixin, DetailView):
    """
    Show a single radiology report.
    """
    model = RadiologyReport
    template_name = 'radiology/reports/radiology_report_detail.html'
    context_object_name = 'report'

    def get_queryset(self):
        """
        Limit lookups to reports whose study belongs to the requesting
        user's tenant.
        """
        tenant = self.request.user.tenant
        return RadiologyReport.objects.filter(study__tenant=tenant)
class RadiologyReportCreateView(LoginRequiredMixin, PermissionRequiredMixin, CreateView):
    """
    Write a new radiology report with the requesting user as
    radiologist.

    Requires the ``radiology.add_radiologyreport`` permission.
    """
    model = RadiologyReport
    form_class = RadiologyReportForm
    template_name = 'radiology/reports/radiology_report_form.html'
    permission_required = 'radiology.add_radiologyreport'
    success_url = reverse_lazy('radiology:radiology_report_list')

    def form_valid(self, form):
        """
        Stamp tenant and radiologist, save the report, then audit the
        creation and flash a success message.
        """
        user = self.request.user
        # NOTE(review): report queries elsewhere in this module scope by
        # 'study__tenant'; confirm RadiologyReport actually has a
        # 'tenant' field for this assignment to persist.
        form.instance.tenant = user.tenant
        form.instance.radiologist = user

        # The parent implementation saves the form and sets self.object.
        response = super().form_valid(form)
        saved = self.object
        patient_name = f"{saved.study.order.patient.first_name} {saved.study.order.patient.last_name}"

        # Log the action
        AuditLogger.log_event(
            user=user,
            action='RADIOLOGY_REPORT_CREATED',
            model='RadiologyReport',
            object_id=str(saved.report_id),
            details={
                'patient_name': patient_name,
                'critical_finding': saved.critical_finding
            }
        )

        messages.success(self.request, 'Radiology report created successfully.')
        return response
@login_required
def radiology_stats(request):
    """
    HTMX endpoint rendering the radiology statistics partial.

    All counters are restricted to the requesting user's tenant.
    """
    tenant = request.user.tenant
    today = timezone.now().date()

    tenant_studies = ImagingStudy.objects.filter(tenant=tenant)
    # Reports are tenant-scoped through their study.
    tenant_reports = RadiologyReport.objects.filter(study__tenant=tenant)

    pending_orders = ImagingOrder.objects.filter(
        tenant=tenant,
        status='PENDING'
    ).count()
    scheduled_studies = tenant_studies.filter(status='SCHEDULED').count()
    in_progress_studies = tenant_studies.filter(status='IN_PROGRESS').count()
    reports_pending = tenant_reports.filter(status='DRAFT').count()
    # Signed reports with a critical finding that were finalized today
    critical_findings = tenant_reports.filter(
        critical_finding=True,
        status='SIGNED',
        finalized_datetime__date=today
    ).count()

    stats = {
        'pending_orders': pending_orders,
        'scheduled_studies': scheduled_studies,
        'in_progress_studies': in_progress_studies,
        'reports_pending': reports_pending,
        'critical_findings': critical_findings,
    }
    return render(request, 'radiology/partials/radiology_stats.html', {'stats': stats})
@login_required
def order_search(request):
    """
    HTMX endpoint rendering up to twenty imaging orders that match the
    ``search``, ``status`` and ``modality`` query parameters.
    """
    params = request.GET
    term = params.get('search', '')
    chosen_status = params.get('status', '')
    chosen_modality = params.get('modality', '')

    matches = ImagingOrder.objects.filter(tenant=request.user.tenant)

    # Free-text search on patient identity and study description
    if term:
        text_match = (
            Q(patient__first_name__icontains=term)
            | Q(patient__last_name__icontains=term)
            | Q(patient__mrn__icontains=term)
            | Q(study_description__icontains=term)
        )
        matches = matches.filter(text_match)

    if chosen_status:
        matches = matches.filter(status=chosen_status)

    if chosen_modality:
        matches = matches.filter(modality=chosen_modality)

    # Newest first, capped at twenty rows
    matches = matches.select_related('patient', 'ordering_provider')
    orders = matches.order_by('-order_datetime')[:20]

    return render(request, 'radiology/partials/order_list.html', {'orders': orders})
@login_required
def schedule_study(request, order_id):
    """
    Schedule an imaging study for an order.

    Expects a POST with ``scheduled_datetime`` in ISO 8601 format. The
    study of the order is created (or rescheduled when it already
    exists), the order is moved to ``SCHEDULED`` and the action is
    audited.

    Returns the order-status partial for HTMX requests and a redirect to
    the order detail page otherwise. Non-POST requests and a missing
    ``scheduled_datetime`` yield ``{"success": false}``; an unparsable
    value yields the same payload with status 400.

    Raises Http404 when the order does not belong to the user's tenant.
    """
    # Imported locally so this view does not depend on a module-level
    # import being added elsewhere in the file.
    from django.conf import settings

    if request.method != 'POST':
        return JsonResponse({'success': False})

    order = get_object_or_404(
        ImagingOrder,
        id=order_id,
        tenant=request.user.tenant
    )

    raw_datetime = request.POST.get('scheduled_datetime')
    if not raw_datetime:
        return JsonResponse({'success': False})

    # User-supplied text: a malformed value must not surface as a
    # server error.
    try:
        scheduled_datetime = datetime.fromisoformat(raw_datetime)
    except ValueError:
        return JsonResponse(
            {'success': False, 'error': 'Invalid scheduled_datetime.'},
            status=400
        )

    # Browser datetime inputs carry no UTC offset; attach the current
    # time zone so a naive value is not stored while USE_TZ is enabled.
    if settings.USE_TZ and timezone.is_naive(scheduled_datetime):
        scheduled_datetime = timezone.make_aware(scheduled_datetime)

    # Create or update study
    # NOTE(review): other queries in this module reference the relation
    # as 'imaging_order'; confirm 'order' is a valid lookup on
    # ImagingStudy.
    study, created = ImagingStudy.objects.get_or_create(
        order=order,
        tenant=request.user.tenant,
        defaults={
            'study_datetime': scheduled_datetime,
            'status': 'SCHEDULED',
            'technologist': request.user
        }
    )
    if not created:
        study.study_datetime = scheduled_datetime
        study.status = 'SCHEDULED'
        study.save()

    # Update order status
    order.status = 'SCHEDULED'
    order.save()

    # Log the action
    AuditLogger.log_action(
        user=request.user,
        action='STUDY_SCHEDULED',
        model='ImagingOrder',
        object_id=str(order.order_id),
        details={
            'patient_name': f"{order.patient.first_name} {order.patient.last_name}",
            'scheduled_time': scheduled_datetime.isoformat()
        }
    )

    messages.success(request, 'Study scheduled successfully.')

    if request.headers.get('HX-Request'):
        return render(request, 'radiology/partials/order_status.html', {'order': order})

    return redirect('radiology:imaging_order_detail', pk=order.pk)
@login_required
def start_study(request, study_id):
    """
    Mark an imaging study, and its order, as in progress.

    Only POST performs the transition; any other method receives
    ``{"success": false}``. HTMX requests get the study-status partial,
    other requests are redirected to the study detail page.
    """
    if request.method != 'POST':
        return JsonResponse({'success': False})

    user = request.user
    study = get_object_or_404(
        ImagingStudy,
        id=study_id,
        tenant=user.tenant
    )

    # The user starting the study becomes its technologist.
    study.status = 'IN_PROGRESS'
    study.started_datetime = timezone.now()
    study.technologist = user
    study.save()

    # Update order status
    study.order.status = 'IN_PROGRESS'
    study.order.save()

    # Log the action
    patient_name = f"{study.order.patient.first_name} {study.order.patient.last_name}"
    AuditLogger.log_action(
        user=user,
        action='STUDY_STARTED',
        model='ImagingStudy',
        object_id=str(study.study_id),
        details={
            'patient_name': patient_name,
            'modality': study.order.modality
        }
    )

    messages.success(request, 'Study started successfully.')

    if request.headers.get('HX-Request'):
        return render(request, 'radiology/partials/study_status.html', {'study': study})

    return redirect('radiology:imaging_study_detail', pk=study.pk)
@login_required
def complete_study(request, study_id):
    """
    Mark an imaging study, and its order, as completed.

    Only POST performs the transition; any other method receives
    ``{"success": false}``. HTMX requests get the study-status partial,
    other requests are redirected to the study detail page.
    """
    if request.method != 'POST':
        return JsonResponse({'success': False})

    user = request.user
    study = get_object_or_404(
        ImagingStudy,
        id=study_id,
        tenant=user.tenant
    )

    study.status = 'COMPLETED'
    study.completed_datetime = timezone.now()
    study.save()

    # Update order status
    study.order.status = 'COMPLETED'
    study.order.save()

    # Log the action
    patient_name = f"{study.order.patient.first_name} {study.order.patient.last_name}"
    AuditLogger.log_action(
        user=user,
        action='STUDY_COMPLETED',
        model='ImagingStudy',
        object_id=str(study.study_id),
        details={
            'patient_name': patient_name,
            'modality': study.order.modality
        }
    )

    messages.success(request, 'Study completed successfully.')

    if request.headers.get('HX-Request'):
        return render(request, 'radiology/partials/study_status.html', {'study': study})

    return redirect('radiology:imaging_study_detail', pk=study.pk)
@login_required
def sign_report(request, report_id):
    """
    Sign a radiology report.

    Only POST performs the signature; any other method receives
    ``{"success": false}``. A report that is not in ``DRAFT`` status is
    left untouched and the user is sent back to it with an error
    message. HTMX requests get the report-status partial, other requests
    are redirected to the report detail page.

    Raises Http404 when the report does not belong to the user's tenant.
    """
    if request.method != 'POST':
        return JsonResponse({'success': False})

    # The tenant is reached through the study, the same path every other
    # RadiologyReport query in this module uses, instead of a direct
    # 'tenant' lookup on the report.
    report = get_object_or_404(
        RadiologyReport,
        id=report_id,
        study__tenant=request.user.tenant
    )

    # Only allow signing if report is in draft status
    if report.status != 'DRAFT':
        messages.error(request, 'Only draft reports can be signed.')
        return redirect('radiology:radiology_report_detail', pk=report.pk)

    report.status = 'SIGNED'
    report.signed_datetime = timezone.now()
    report.signed_by = request.user
    report.save()

    # Log the action
    AuditLogger.log_action(
        user=request.user,
        action='REPORT_SIGNED',
        model='RadiologyReport',
        object_id=str(report.report_id),
        details={
            'patient_name': f"{report.study.order.patient.first_name} {report.study.order.patient.last_name}",
            'critical_finding': report.critical_finding
        }
    )

    messages.success(request, 'Report signed successfully.')

    if request.headers.get('HX-Request'):
        return render(request, 'radiology/partials/report_status.html', {'report': report})

    return redirect('radiology:radiology_report_detail', pk=report.pk)
# DICOM Image Viewing Views
class DICOMImageViewerView(LoginRequiredMixin, DetailView):
    """
    Viewer page for one DICOM image, with navigation to the other
    images of the same series.
    """
    model = DICOMImage
    template_name = 'radiology/dicom/dicom_viewer.html'
    context_object_name = 'dicom_image'

    def get_queryset(self):
        """
        Limit lookups to images of the requesting user's tenant.
        """
        tenant = self.request.user.tenant
        return DICOMImage.objects.filter(series__study__tenant=tenant)

    def get_context_data(self, **kwargs):
        """
        Add the series images, the position of the current image among
        them, its neighbours and, when a file is present, its DICOM
        metadata.
        """
        context = super().get_context_data(**kwargs)
        current = self.object

        # All images of the same series, in acquisition order
        siblings = DICOMImage.objects.filter(
            series=current.series
        ).order_by('instance_number')

        position = list(siblings).index(current)
        total = siblings.count()

        context.update({
            'series_images': siblings,
            'current_index': position,
            'total_images': total,
        })

        # Neighbours are only set when they exist, so the template can
        # test for the presence of the keys.
        has_previous = position > 0
        has_next = position < total - 1
        if has_previous:
            context['previous_image'] = siblings[position - 1]
        if has_next:
            context['next_image'] = siblings[position + 1]

        # Metadata is only read when the image reports having a file.
        if current.has_dicom_file():
            context['dicom_metadata'] = current.get_dicom_metadata()

        return context
class DICOMSeriesViewerView(LoginRequiredMixin, DetailView):
    """
    Viewer page showing every image of one imaging series.
    """
    model = ImagingSeries
    template_name = 'radiology/dicom/series_viewer.html'
    context_object_name = 'series'

    def get_queryset(self):
        """
        Limit lookups to series whose study belongs to the requesting
        user's tenant.
        """
        tenant = self.request.user.tenant
        return ImagingSeries.objects.filter(study__tenant=tenant)

    def get_context_data(self, **kwargs):
        """
        Add the series images, their count and, when the series is not
        empty, the first image for the initial display.
        """
        context = super().get_context_data(**kwargs)

        series_images = DICOMImage.objects.filter(
            series=self.object
        ).order_by('instance_number')

        context['images'] = series_images
        context['image_count'] = series_images.count()

        # 'first_image' is left out of the context for an empty series.
        if series_images.exists():
            context['first_image'] = series_images.first()

        return context
@login_required
def dicom_image_file(request, image_id):
    """
    Stream the DICOM file of an image for inline viewing.

    The image must belong to the requesting user's tenant. Every access
    is written to the audit log before the file is opened.

    Raises Http404 when the image is unknown, has no DICOM file, or the
    file is missing on disk.
    """
    user = request.user
    dicom_image = get_object_or_404(
        DICOMImage,
        image_id=image_id,
        series__study__tenant=user.tenant
    )

    if not dicom_image.has_dicom_file():
        raise Http404("DICOM file not found")

    # Log access for audit
    AuditLogger.log_event(
        tenant=user.tenant,
        event_type='READ',
        event_category='DATA_ACCESS',
        action='DICOM_IMAGE_ACCESSED',
        description='Dicom image accessed',
        user=user,
        content_object=dicom_image,
        request=request
    )

    # Serve the file
    download_name = f'{dicom_image.sop_instance_uid}.dcm'
    try:
        file_handle = open(dicom_image.file_path, 'rb')
        response = FileResponse(file_handle, content_type='application/dicom')
        response['Content-Disposition'] = f'inline; filename="{download_name}"'
        return response
    except FileNotFoundError:
        # The database row exists but the file is gone from storage.
        raise Http404("DICOM file not found on disk")
def _first_float(value):
    """
    Coerce a DICOM element value to ``float``.

    Elements such as Window Center may carry several values; in that case
    the first one is used. Returns ``None`` when no number can be obtained
    (missing, empty or non-numeric value).
    """
    try:
        return float(value)
    except (TypeError, ValueError):
        pass
    try:
        return float(value[0])
    except (TypeError, ValueError, IndexError, KeyError):
        return None


def _pixels_to_uint8(ds):
    """
    Return the dataset's pixel data scaled to 8-bit for display.

    Pixel data that is already ``uint8`` is returned untouched. Otherwise
    the window given by Window Center/Width is applied when both are
    usable; if not, the full value range of the image is stretched to
    0-255. A degenerate range (e.g. a constant image) yields an all-zero
    image instead of dividing by zero.
    """
    pixels = ds.pixel_array
    if pixels.dtype == np.uint8:
        return pixels

    pixels = pixels.astype(np.float64)
    center = _first_float(getattr(ds, 'WindowCenter', None))
    width = _first_float(getattr(ds, 'WindowWidth', None))

    if center is not None and width is not None and width > 0:
        # Window values are expressed in rescaled units, so the rescale
        # slope/intercept must be applied to the stored values first.
        slope = _first_float(getattr(ds, 'RescaleSlope', None))
        intercept = _first_float(getattr(ds, 'RescaleIntercept', None))
        pixels = (
            pixels * (1.0 if slope is None else slope)
            + (0.0 if intercept is None else intercept)
        )
        # True division keeps narrow windows (width < 2) from collapsing
        low = center - width / 2
        high = center + width / 2
    else:
        # Simple normalization over the image's own value range
        low = float(pixels.min())
        high = float(pixels.max())

    if not high > low:
        return np.zeros(pixels.shape, dtype=np.uint8)

    scaled = (np.clip(pixels, low, high) - low) / (high - low) * 255
    return scaled.astype(np.uint8)


def _render_dicom_png(file_path, max_size):
    """
    Read the DICOM file at ``file_path`` and return it as PNG bytes.

    The image is shrunk to fit within ``max_size`` (width, height) while
    keeping its aspect ratio.
    """
    # Imported lazily so a missing optional dependency surfaces as an
    # exception the calling view can turn into a 404.
    import io
    from pydicom import dcmread
    from PIL import Image

    ds = dcmread(file_path)
    picture = Image.fromarray(_pixels_to_uint8(ds))
    picture.thumbnail(max_size, Image.Resampling.LANCZOS)

    buffer = io.BytesIO()
    picture.save(buffer, format='PNG')
    return buffer.getvalue()


def _dicom_png_response(request, image_id, max_size, max_age, label):
    """
    Build the PNG response shared by the thumbnail and preview views.

    Args:
        request: the current request; its user's tenant scopes the lookup.
        image_id: value of ``DICOMImage.image_id`` to render.
        max_size: (width, height) bounding box for the rendered image.
        max_age: value in seconds for the ``Cache-Control: max-age`` header.
        label: word used in the error message ("thumbnail" or "preview").

    Raises:
        Http404: if the image is not found for this tenant, has no DICOM
            file, or cannot be rendered.
    """
    dicom_image = get_object_or_404(
        DICOMImage,
        image_id=image_id,
        series__study__tenant=request.user.tenant
    )
    if not dicom_image.has_dicom_file():
        raise Http404("DICOM file not found")

    try:
        png_bytes = _render_dicom_png(dicom_image.file_path, max_size)
    except Exception as exc:
        # Any read/decode/render failure is reported as "not found"
        raise Http404(f"Could not generate {label}: {exc}") from exc

    response = HttpResponse(png_bytes, content_type='image/png')
    response['Cache-Control'] = f'max-age={max_age}'
    return response


@login_required
def dicom_image_thumbnail(request, image_id):
    """
    Generate and serve thumbnail for DICOM image.

    Returns a PNG of at most 200x200 pixels, cacheable for 1 hour.
    """
    return _dicom_png_response(
        request, image_id, max_size=(200, 200), max_age=3600, label='thumbnail'
    )


@login_required
def dicom_image_preview(request, image_id):
    """
    Generate and serve preview image for DICOM.

    Returns a PNG of at most 800x800 pixels, cacheable for 30 minutes.
    """
    return _dicom_png_response(
        request, image_id, max_size=(800, 800), max_age=1800, label='preview'
    )
@login_required
def dicom_metadata_json(request, image_id):
    """
    Return DICOM metadata as JSON.

    The image is looked up by ``image_id`` within the requesting user's
    tenant (404 page if it does not exist). The response is a flat JSON
    object of selected header attributes; attributes absent from the file
    come back as an empty string (or 0 for the integer fields).

    Error responses are JSON as well: status 404 with an ``error`` key when
    the image has no DICOM file, status 500 with an ``error`` key when the
    file cannot be read.
    """
    dicom_image = get_object_or_404(
        DICOMImage,
        image_id=image_id,
        series__study__tenant=request.user.tenant
    )
    if not dicom_image.has_dicom_file():
        return JsonResponse({'error': 'DICOM file not found'}, status=404)

    try:
        from pydicom import dcmread

        # Only header attributes are needed, so stop before the (potentially
        # very large) pixel data instead of loading it into memory.
        ds = dcmread(dicom_image.file_path, stop_before_pixels=True)

        # Extract key metadata
        metadata = {
            'patient_name': str(ds.get('PatientName', '')),
            'patient_id': str(ds.get('PatientID', '')),
            'study_date': str(ds.get('StudyDate', '')),
            'study_time': str(ds.get('StudyTime', '')),
            'modality': str(ds.get('Modality', '')),
            'study_description': str(ds.get('StudyDescription', '')),
            'series_description': str(ds.get('SeriesDescription', '')),
            'instance_number': str(ds.get('InstanceNumber', '')),
            'rows': int(ds.get('Rows', 0)),
            'columns': int(ds.get('Columns', 0)),
            'bits_allocated': int(ds.get('BitsAllocated', 0)),
            'bits_stored': int(ds.get('BitsStored', 0)),
            'pixel_spacing': str(ds.get('PixelSpacing', '')),
            'slice_thickness': str(ds.get('SliceThickness', '')),
            'window_center': str(ds.get('WindowCenter', '')),
            'window_width': str(ds.get('WindowWidth', '')),
            'manufacturer': str(ds.get('Manufacturer', '')),
            'model_name': str(ds.get('ManufacturerModelName', '')),
            'sop_instance_uid': str(ds.get('SOPInstanceUID', '')),
            'study_instance_uid': str(ds.get('StudyInstanceUID', '')),
            'series_instance_uid': str(ds.get('SeriesInstanceUID', '')),
        }
        return JsonResponse(metadata)
    except Exception as e:
        # Boundary handler: any import/read/convert failure becomes a JSON 500
        return JsonResponse({'error': f'Could not read DICOM metadata: {str(e)}'}, status=500)
@login_required
def series_images_json(request, series_id):
    """
    Describe the images of a series as JSON for viewer navigation.

    The series is looked up by ``series_id`` within the requesting user's
    tenant. The payload carries the series' id, description and modality,
    the number of images, and one entry per image (ordered by instance
    number) with its identifiers, file information and viewer URLs.
    """
    series = get_object_or_404(
        ImagingSeries,
        series_id=series_id,
        study__tenant=request.user.tenant,
    )

    def describe(image):
        """Build the JSON entry for one image of the series."""
        url_kwargs = {'image_id': image.image_id}
        return {
            'id': str(image.image_id),
            'instance_number': image.instance_number,
            'sop_instance_uid': image.sop_instance_uid,
            'has_file': image.has_dicom_file(),
            'file_size_mb': image.file_size_mb,
            'thumbnail_url': reverse('radiology:dicom_image_thumbnail', kwargs=url_kwargs),
            'preview_url': reverse('radiology:dicom_image_preview', kwargs=url_kwargs),
            'viewer_url': reverse('radiology:dicom_image_viewer', kwargs={'pk': image.pk}),
        }

    ordered_images = DICOMImage.objects.filter(series=series).order_by('instance_number')
    images_data = [describe(image) for image in ordered_images]

    payload = {
        'series_id': str(series.series_id),
        'series_description': series.series_description,
        'modality': series.modality,
        'image_count': len(images_data),
        'images': images_data,
    }
    return JsonResponse(payload)
@login_required
def study_series_json(request, study_id):
    """
    Return list of series in a study as JSON.

    The study is looked up by ``study_id`` within the requesting user's
    tenant. The payload carries the study's id, description and accession
    number, the number of series, and one entry per series (ordered by
    series number) including its image count and series viewer URL.
    """
    study = get_object_or_404(
        ImagingStudy,
        study_id=study_id,
        tenant=request.user.tenant
    )
    series_list = ImagingSeries.objects.filter(
        study=study
    ).order_by('series_number')

    # Count the images of every series in a single grouped query rather than
    # issuing one COUNT per series. order_by() clears any default ordering so
    # that it cannot leak into the GROUP BY clause.
    image_counts = {
        row['series__pk']: row['total']
        for row in (
            DICOMImage.objects
            .filter(series__study=study)
            .order_by()
            .values('series__pk')
            .annotate(total=Count('pk'))
        )
    }

    series_data = [
        {
            'id': str(series.series_id),
            'series_number': series.series_number,
            'series_description': series.series_description,
            'modality': series.modality,
            # Series without images do not appear in the grouped result
            'image_count': image_counts.get(series.pk, 0),
            'viewer_url': reverse('radiology:dicom_series_viewer', kwargs={'pk': series.pk}),
        }
        for series in series_list
    ]

    return JsonResponse({
        'study_id': str(study.study_id),
        'study_description': study.study_description,
        'accession_number': study.accession_number,
        'series_count': len(series_data),
        'series': series_data
    })
@login_required
def dictate_report(request, study_id):
"""
Start dictating a report for a study.

On POST, the study is looked up by ``id`` within the requesting user's
tenant and a ``RadiologyReport`` for that study and tenant is fetched or
created. A newly created report gets the requesting user as radiologist,
the optional template named by the ``template_id`` POST field, and status
``DRAFT``; its creation is written to the audit log. The view redirects to
``radiology:radiology_report_detail`` for the report.

Requests that do not end in that redirect receive the JSON body
``{'success': False}``.
"""
if request.method == 'POST':
# NOTE(review): study_series_json in this module looks studies up with
# study_id=...; here the lookup is by id — confirm which identifier the
# URL passes.
study = get_object_or_404(
ImagingStudy,
id=study_id,
tenant=request.user.tenant
)
# The template is optional; when given it must belong to the same tenant
template_id = request.POST.get('template_id')
template = None
if template_id:
template = get_object_or_404(
ReportTemplate,
id=template_id,
tenant=request.user.tenant
)
# Create report if it doesn't exist; defaults apply only to a new report
report, created = RadiologyReport.objects.get_or_create(
study=study,
tenant=request.user.tenant,
defaults={
'radiologist': request.user,
'template': template,
'status': 'DRAFT'
}
)
if created:
# Log the action
# NOTE(review): dicom_image_file in this module calls
# AuditLogger.log_event; confirm log_action exists with this signature.
AuditLogger.log_action(
user=request.user,
action='REPORT_DICTATION_STARTED',
model='RadiologyReport',
object_id=str(report.report_id),
details={
'patient_name': f"{study.order.patient.first_name} {study.order.patient.last_name}",
'template_used': template.template_name if template else None
}
)
# NOTE(review): confirm whether this message is meant only for newly
# created reports or for every POST.
messages.success(request, 'Report dictation started.')
return redirect('radiology:radiology_report_detail', pk=report.pk)
# Fallback response when no redirect was issued above
return JsonResponse({'success': False})