2025-10-03 20:11:25 +03:00

2223 lines
77 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Views for EMR app.
"""
from django.shortcuts import render, get_object_or_404, redirect
from django.contrib.auth.decorators import login_required
from django.contrib.auth.mixins import LoginRequiredMixin
from django.urls import reverse_lazy, reverse
from django.views.generic import ListView, DetailView, CreateView, UpdateView, DeleteView, TemplateView
from django.http import JsonResponse, HttpResponse, Http404
from django.utils.translation import gettext_lazy as _
from django.utils import timezone
from django.contrib import messages
from django.core.paginator import Paginator
from django.template.loader import render_to_string
from datetime import datetime, timedelta, date
import json
from django.contrib.messages.views import SuccessMessageMixin
from core.models import AuditLogEntry
from core.utils import AuditLogger
from patients.models import PatientProfile
from .models import *
from .forms import *
from core.mixins import TenantMixin, FormKwargsMixin
from django.utils.dateformat import format as dj_format
from django.core.paginator import Paginator, EmptyPage, PageNotAnInteger
from django.db.models import *
from pharmacy.models import *
from quality.models import *
from django.contrib.contenttypes.models import ContentType
class EMRDashboardView(LoginRequiredMixin, TemplateView):
    """
    Landing page of the EMR module: recent encounters plus headline counters
    for the tenant of the logged-in user.
    """
    template_name = 'emr/dashboard.html'

    def get_context_data(self, **kwargs):
        """Extend the template context with the dashboard figures."""
        context = super().get_context_data(**kwargs)
        tenant = self.request.user.tenant
        today = timezone.now().date()
        encounters = Encounter.objects

        # Ten most recent encounters for the tenant.
        context['recent_encounters'] = (
            encounters.filter(tenant=tenant)
            .select_related('patient', 'provider')
            .order_by('-start_datetime')[:10]
        )

        # Encounter counters; the custom manager methods take the tenant directly.
        context['total_encounters'] = encounters.filter(tenant=tenant).count()
        context['active_encounters'] = encounters.active_encounters(tenant).count()
        context['todays_encounters'] = encounters.todays_encounters(tenant).count()
        context['pending_documentation'] = encounters.unsigned_encounters(tenant).count()

        # Completed notes that have not been electronically signed yet.
        context['unsigned_notes'] = ClinicalNote.objects.filter(
            patient__tenant=tenant,
            status='COMPLETED',
            electronically_signed=False,
        ).count()

        context['active_problems'] = ProblemList.objects.filter(
            tenant=tenant, status='ACTIVE'
        ).count()
        context['active_care_plans'] = CarePlan.objects.filter(
            tenant=tenant, status='ACTIVE'
        ).count()

        # Vitals measured today whose critical_values is not an empty list.
        todays_vitals = VitalSigns.objects.filter(
            patient__tenant=tenant,
            measured_datetime__date=today,
        )
        context['critical_vitals'] = todays_vitals.exclude(critical_values=[]).count()
        return context
class EncounterListView(LoginRequiredMixin, ListView):
    """
    Paginated, filterable list of the tenant's encounters.
    """
    model = Encounter
    template_name = 'emr/encounters/encounter_list.html'
    context_object_name = 'encounters'
    paginate_by = 25

    def get_queryset(self):
        """Return tenant encounters narrowed by the GET filters, newest first."""
        params = self.request.GET
        encounters = Encounter.objects.filter(tenant=self.request.user.tenant)

        # Free-text search across patient, provider and chief complaint.
        term = params.get('search')
        if term:
            text_match = Q()
            for lookup in (
                'patient__first_name',
                'patient__last_name',
                'patient__mrn',
                'provider__first_name',
                'provider__last_name',
                'chief_complaint',
            ):
                text_match |= Q(**{f'{lookup}__icontains': term})
            encounters = encounters.filter(text_match)

        # Each GET parameter maps onto one ORM lookup; empty values are ignored.
        for param, lookup in (
            ('encounter_type', 'encounter_type'),
            ('status', 'status'),
            ('provider', 'provider_id'),
            ('date_from', 'start_datetime__date__gte'),
            ('date_to', 'start_datetime__date__lte'),
        ):
            value = params.get(param)
            if value:
                encounters = encounters.filter(**{lookup: value})

        encounters = encounters.select_related('patient', 'provider')
        return encounters.order_by('-start_datetime')

    def get_context_data(self, **kwargs):
        """Expose the choice lists the filter form renders."""
        context = super().get_context_data(**kwargs)
        field = Encounter._meta.get_field
        context['encounter_types'] = field('encounter_type').choices
        context['encounter_statuses'] = field('status').choices
        return context
class EncounterCreateView(LoginRequiredMixin, CreateView):
    """
    Create view for encounters with permission check, business-rule
    validation, audit logging and user-facing error messages.
    """
    model = Encounter
    form_class = EncounterForm
    template_name = 'emr/encounters/encounter_create.html'
    success_message = _('Encounter for %(patient)s created successfully.')

    def dispatch(self, request, *args, **kwargs):
        """Refuse authenticated users lacking ``emr.add_encounter``.

        Anonymous users are passed on so LoginRequiredMixin can send them to
        the login page instead of showing a misleading permission error.
        """
        if request.user.is_authenticated and not request.user.has_perm('emr.add_encounter'):
            messages.error(request, _('You do not have permission to create encounters.'))
            return redirect('emr:encounter_list')
        return super().dispatch(request, *args, **kwargs)

    def get_form_kwargs(self):
        """Pass the current tenant and user to the form."""
        kwargs = super().get_form_kwargs()
        kwargs['tenant'] = self.request.user.tenant
        kwargs['user'] = self.request.user
        return kwargs

    def form_valid(self, form):
        """Save the encounter, write an audit entry and flash a success message.

        Any unexpected error is logged and turned into a generic error
        message with the form re-rendered.
        """
        import logging
        logger = logging.getLogger(__name__)

        try:
            form.instance.tenant = self.request.user.tenant
            form.instance.created_by = self.request.user

            # Business rule: an encounter cannot end before (or when) it starts.
            if form.instance.end_datetime and form.instance.end_datetime <= form.instance.start_datetime:
                messages.error(self.request, _('End date/time must be after start date/time.'))
                return self.form_invalid(form)

            response = super().form_valid(form)

            # Audit logging is best-effort: a failure must not block the user flow.
            try:
                AuditLogEntry.objects.create(
                    tenant=self.request.user.tenant,
                    user=self.request.user,
                    event_type='CREATE',
                    event_category='CLINICAL_DATA',
                    action='Create Encounter',
                    description=f'Encounter created for {self.object.patient.get_full_name()}',
                    content_type=ContentType.objects.get_for_model(Encounter),
                    object_id=self.object.pk,
                    object_repr=str(self.object),
                    patient_id=str(self.object.patient.patient_id),
                    patient_mrn=self.object.patient.mrn,
                    changes={'encounter_type': self.object.encounter_type, 'status': self.object.status}
                )
            except Exception:
                logger.exception('Audit logging failed for encounter %s', self.object.pk)

            messages.success(self.request, self.success_message % {'patient': self.object.patient.get_full_name()})
            return response
        except Exception:
            # Keep the friendly error page, but leave a traceback for operators.
            logger.exception('Error while creating encounter')
            messages.error(self.request, _('An error occurred while creating the encounter. Please try again.'))
            return self.form_invalid(form)

    def form_invalid(self, form):
        """Flash every form error as a message, then re-render the form."""
        for field, errors in form.errors.items():
            for error in errors:
                messages.error(self.request, f"{field.title()}: {error}")
        return super().form_invalid(form)

    def get_success_url(self):
        """Redirect to the detail page of the new encounter."""
        return reverse_lazy('emr:encounter_detail', kwargs={'pk': self.object.pk})

    def get_context_data(self, **kwargs):
        """Expose the optional ``patient`` GET parameter to the template."""
        context = super().get_context_data(**kwargs)
        context['patient_id'] = self.request.GET.get('patient')
        return context
class EncounterDetailView(LoginRequiredMixin, DetailView):
    """
    Single-encounter page with its vitals, notes, problems and the patient's
    recent appointments, orders, studies and bills.
    """
    model = Encounter
    template_name = 'emr/encounters/encounter_detail.html'
    context_object_name = 'encounter'

    def get_queryset(self):
        """Tenant-scoped encounters with the relations the page needs."""
        scoped = Encounter.objects.filter(tenant=self.request.user.tenant)
        scoped = scoped.select_related('patient', 'provider', 'appointment', 'admission')
        return scoped.prefetch_related(
            'vital_signs__measured_by',
            'clinical_notes__author',
            'problems_identified__diagnosing_provider',
            'problems_identified__care_plans',
            'problems_identified',
        )

    def get_context_data(self, **kwargs):
        """Add encounter children and the patient's ten latest related records."""
        context = super().get_context_data(**kwargs)
        encounter = self.object

        # Records attached directly to this encounter.
        context['vital_signs'] = encounter.vital_signs.all().order_by('-measured_datetime')
        context['clinical_notes'] = encounter.clinical_notes.all().order_by('-note_datetime')
        context['problems'] = encounter.problems_identified.all().order_by('-created_at')
        context['duration_display'] = encounter.duration
        context['status_color'] = encounter.get_status_color()

        # Patient-level history, each list capped at ten entries.
        patient = encounter.patient
        context['related_appointments'] = (
            patient.appointment_requests.all()
            .select_related('provider')
            .order_by('-scheduled_datetime')[:10]
        )
        context['lab_orders'] = (
            patient.lab_orders.all()
            .select_related('ordering_provider')
            .prefetch_related('tests', 'results__test')
            .order_by('-order_datetime')[:10]
        )
        context['imaging_orders'] = (
            patient.imaging_orders.all()
            .select_related('ordering_provider')
            .order_by('-order_datetime')[:10]
        )
        context['imaging_studies'] = (
            patient.imaging_studies.all()
            .select_related('referring_physician', 'radiologist')
            .order_by('-study_datetime')[:10]
        )
        context['medical_bills'] = (
            patient.medical_bills.all()
            .select_related('primary_insurance', 'secondary_insurance')
            .order_by('-bill_date')[:10]
        )
        return context
class VitalSignsListView(LoginRequiredMixin, ListView):
    """
    Paginated, filterable list of vital-sign measurements for the tenant.
    """
    model = VitalSigns
    template_name = 'emr/vital_signs_list.html'
    context_object_name = 'vital_signs'
    paginate_by = 25

    def get_queryset(self):
        """Return tenant vitals narrowed by the GET filters, newest first."""
        params = self.request.GET
        vitals = VitalSigns.objects.filter(patient__tenant=self.request.user.tenant)

        # Each GET parameter maps onto one ORM lookup; empty values are ignored.
        for param, lookup in (
            ('patient', 'patient_id'),
            ('encounter', 'encounter_id'),
            ('date_from', 'measured_datetime__date__gte'),
            ('date_to', 'measured_datetime__date__lte'),
        ):
            value = params.get(param)
            if value:
                vitals = vitals.filter(**{lookup: value})

        # Free-text search on the patient's name or MRN.
        term = params.get('search')
        if term:
            name_or_mrn = (
                Q(patient__first_name__icontains=term)
                | Q(patient__last_name__icontains=term)
                | Q(patient__mrn__icontains=term)
            )
            vitals = vitals.filter(name_or_mrn)

        vitals = vitals.select_related('patient', 'encounter', 'measured_by')
        return vitals.order_by('-measured_datetime')
class ProblemListView(LoginRequiredMixin, ListView):
    """
    Paginated, filterable list of problem-list entries for the tenant.
    """
    model = ProblemList
    template_name = 'emr/problems/problem_list.html'
    context_object_name = 'problems'
    paginate_by = 25

    def get_queryset(self):
        """Return tenant problems narrowed by the GET filters, newest first."""
        params = self.request.GET
        problems = ProblemList.objects.filter(tenant=self.request.user.tenant)

        # Each GET parameter maps onto one ORM lookup; empty values are ignored.
        for param, lookup in (
            ('patient', 'patient_id'),
            ('status', 'status'),
            ('problem_type', 'problem_type'),
            ('priority', 'priority'),
        ):
            value = params.get(param)
            if value:
                problems = problems.filter(**{lookup: value})

        # Free-text search on patient identity or the problem's name/code.
        term = params.get('search')
        if term:
            text_match = Q()
            for lookup in (
                'patient__first_name',
                'patient__last_name',
                'patient__mrn',
                'problem_name',
                'problem_code',
            ):
                text_match |= Q(**{f'{lookup}__icontains': term})
            problems = problems.filter(text_match)

        problems = problems.select_related('patient', 'diagnosing_provider', 'managing_provider')
        problems = problems.prefetch_related('care_plans__primary_provider')
        return problems.order_by('-created_at')

    def get_context_data(self, **kwargs):
        """Add filter choices and counts computed over ``context['problems']``."""
        context = super().get_context_data(**kwargs)
        field = ProblemList._meta.get_field
        listed = context['problems']

        context['problem_types'] = field('problem_type').choices
        context['problem_statuses'] = field('status').choices
        context['priorities'] = field('priority').choices
        context['active_problems_count'] = len([p for p in listed if p.is_active])
        context['chronic_problems_count'] = len([p for p in listed if p.is_chronic])
        return context
class CarePlanListView(LoginRequiredMixin, ListView):
    """
    Paginated, filterable list of care plans for the tenant.
    """
    model = CarePlan
    template_name = 'emr/care_plans/care_plan_list.html'
    context_object_name = 'care_plans'
    paginate_by = 25

    def get_queryset(self):
        """Return tenant care plans narrowed by the GET filters, newest first."""
        params = self.request.GET
        plans = CarePlan.objects.filter(tenant=self.request.user.tenant)

        # Each GET parameter maps onto one ORM lookup; empty values are ignored.
        for param, lookup in (
            ('patient', 'patient_id'),
            ('status', 'status'),
            ('plan_type', 'plan_type'),
            ('provider', 'primary_provider_id'),
        ):
            value = params.get(param)
            if value:
                plans = plans.filter(**{lookup: value})

        # Free-text search on patient identity or the plan's title/description.
        term = params.get('search')
        if term:
            text_match = Q()
            for lookup in (
                'patient__first_name',
                'patient__last_name',
                'patient__mrn',
                'title',
                'description',
            ):
                text_match |= Q(**{f'{lookup}__icontains': term})
            plans = plans.filter(text_match)

        plans = plans.select_related('patient', 'primary_provider')
        plans = plans.prefetch_related('care_team', 'related_problems')
        return plans.order_by('-created_at')

    def get_context_data(self, **kwargs):
        """Add filter choices and counts computed over ``context['care_plans']``."""
        context = super().get_context_data(**kwargs)
        field = CarePlan._meta.get_field
        listed = context['care_plans']

        context['plan_types'] = field('plan_type').choices
        context['plan_statuses'] = field('status').choices
        context['active_plans_count'] = len([cp for cp in listed if cp.is_active])
        context['overdue_plans_count'] = len([cp for cp in listed if cp.is_overdue])
        context['completed_plans_count'] = len(
            [cp for cp in listed if cp.status == 'COMPLETED']
        )
        return context
class ClinicalNoteListView(LoginRequiredMixin, ListView):
    """
    Paginated, filterable list of clinical notes for the tenant's patients.
    """
    model = ClinicalNote
    template_name = 'emr/clinical_notes/clinical_note_list.html'
    context_object_name = 'notes'
    paginate_by = 25

    def get_queryset(self):
        """Return tenant notes narrowed by the GET filters, newest first."""
        params = self.request.GET
        notes = ClinicalNote.objects.filter(patient__tenant=self.request.user.tenant)

        # Each GET parameter maps onto one ORM lookup; empty values are ignored.
        for param, lookup in (
            ('patient', 'patient_id'),
            ('note_type', 'note_type'),
            ('status', 'status'),
            ('author', 'author_id'),
        ):
            value = params.get(param)
            if value:
                notes = notes.filter(**{lookup: value})

        # Free-text search on patient identity or the note's title/content.
        term = params.get('search')
        if term:
            text_match = Q()
            for lookup in (
                'patient__first_name',
                'patient__last_name',
                'patient__mrn',
                'title',
                'content',
            ):
                text_match |= Q(**{f'{lookup}__icontains': term})
            notes = notes.filter(text_match)

        notes = notes.select_related('patient', 'encounter', 'author')
        return notes.order_by('-note_datetime')

    def get_context_data(self, **kwargs):
        """Expose the choice lists the filter form renders."""
        context = super().get_context_data(**kwargs)
        field = ClinicalNote._meta.get_field
        context['note_types'] = field('note_type').choices
        context['note_statuses'] = field('status').choices
        return context
class ClinicalNoteDetailView(LoginRequiredMixin, TenantMixin, DetailView):
    """
    Detail page for a single clinical note.
    """
    context_object_name = 'clinical_note'
    template_name = 'emr/clinical_notes/clinical_note_detail.html'
    model = ClinicalNote
class ClinicalNoteCreateView(LoginRequiredMixin, SuccessMessageMixin, CreateView):
    """
    Create a clinical note authored by the current user.

    SuccessMessageMixin is what actually flashes ``success_message``; without
    it the attribute was defined but never displayed.
    """
    model = ClinicalNote
    form_class = ClinicalNoteForm
    template_name = 'emr/clinical_notes/clinical_note_form.html'
    success_message = _('Clinical note created successfully.')

    def form_valid(self, form):
        """Stamp the author, save the note and record an audit entry."""
        form.instance.author = self.request.user
        response = super().form_valid(form)
        AuditLogEntry.objects.create(
            tenant=self.request.user.tenant,
            user=self.request.user,
            event_type='CREATE',
            event_category='CLINICAL_DATA',
            action='Create Clinical Note',
            description=f'Clinical note created: {self.object.title}',
            content_type=ContentType.objects.get_for_model(ClinicalNote),
            object_id=self.object.pk,
            object_repr=str(self.object),
            patient_id=str(self.object.patient.patient_id),
            patient_mrn=self.object.patient.mrn,
            changes={'note_type': self.object.note_type, 'status': self.object.status}
        )
        return response

    def get_success_url(self):
        """Redirect to the detail page of the new note."""
        return reverse_lazy('emr:clinical_note_detail', kwargs={'pk': self.object.pk})
class ClinicalNoteUpdateView(LoginRequiredMixin, SuccessMessageMixin, UpdateView):
    """
    Edit an existing clinical note belonging to the current user's tenant.
    """
    model = ClinicalNote
    form_class = ClinicalNoteForm
    template_name = 'emr/clinical_notes/clinical_note_form.html'
    success_message = _('Clinical note updated successfully.')

    def dispatch(self, request, *args, **kwargs):
        """Resolve the tenant once; reject authenticated users without one.

        Anonymous users have no ``tenant`` attribute, so they are passed on
        for LoginRequiredMixin to redirect to the login page.
        """
        self.tenant = getattr(request.user, 'tenant', None)
        if request.user.is_authenticated and not self.tenant:
            return JsonResponse({"error": "No tenant found"}, status=400)
        return super().dispatch(request, *args, **kwargs)

    def get_queryset(self):
        """Only notes of the tenant's patients may be edited."""
        return ClinicalNote.objects.filter(patient__tenant=self.tenant)

    def form_valid(self, form):
        """Save the note and record an audit entry."""
        response = super().form_valid(form)
        AuditLogEntry.objects.create(
            tenant=self.request.user.tenant,
            user=self.request.user,
            event_type='UPDATE',
            event_category='CLINICAL_DATA',
            action='Update Clinical Note',
            description=f'Clinical note updated: {self.object.title}',
            content_type=ContentType.objects.get_for_model(ClinicalNote),
            object_id=self.object.pk,
            object_repr=str(self.object),
            patient_id=str(self.object.patient.patient_id),
            patient_mrn=self.object.patient.mrn,
            changes={'status': self.object.status}
        )
        return response

    def get_success_url(self):
        """Redirect to the detail page of the edited note."""
        return reverse_lazy('emr:clinical_note_detail', kwargs={'pk': self.object.pk})

    def get_context_data(self, **kwargs):
        """Expose the note's patient and encounter to the template.

        The URL ``pk`` identifies the note, not an encounter, so both are
        taken from the note being edited.
        """
        ctx = super().get_context_data(**kwargs)
        note = self.object
        ctx['patient'] = note.patient
        ctx['encounter'] = note.encounter
        ctx['author'] = self.request.user  # or Employee profile if you have one
        return ctx
class ClinicalNoteDeleteView(LoginRequiredMixin, SuccessMessageMixin, DeleteView):
    """
    Confirm and delete a clinical note of the current tenant, with auditing.

    Django 4.0+ handles the confirmation POST through ``form_valid()`` and no
    longer calls ``delete()``, so the audit step is wired into both entry
    points.
    """
    model = ClinicalNote
    template_name = 'emr/clinical_notes/clinical_note_confirm_delete.html'
    success_url = reverse_lazy('emr:clinical_note_list')
    success_message = _('Clinical note deleted successfully.')

    def get_queryset(self):
        """Only notes of the tenant's patients may be deleted."""
        return ClinicalNote.objects.filter(patient__tenant=self.request.user.tenant)

    def _audit_deletion(self, note):
        """Write the audit entry describing the note about to be removed."""
        AuditLogEntry.objects.create(
            tenant=self.request.user.tenant,
            user=self.request.user,
            event_type='DELETE',
            event_category='CLINICAL_DATA',
            action='Delete Clinical Note',
            description=f'Clinical note deleted: {note.title}',
            content_type=ContentType.objects.get_for_model(ClinicalNote),
            object_id=note.pk,
            object_repr=str(note),
            patient_id=str(note.patient.patient_id),
            patient_mrn=note.patient.mrn,
            changes={'status': 'Clinical note deleted'}
        )

    def form_valid(self, form):
        """POST path on Django 4.0+; SuccessMessageMixin flashes the message."""
        self._audit_deletion(self.object)
        return super().form_valid(form)

    def delete(self, request, *args, **kwargs):
        """POST path on Django < 4.0 and handler for HTTP DELETE requests."""
        self._audit_deletion(self.get_object())
        messages.success(request, self.success_message)
        return super().delete(request, *args, **kwargs)
class VitalSignsDetailView(LoginRequiredMixin, TenantMixin, DetailView):
    """
    Detail page for a single vital-signs record.
    """
    context_object_name = 'vital_signs'
    template_name = 'emr/vital_signs/vital_signs_detail.html'
    model = VitalSigns
class VitalSignsCreateView(LoginRequiredMixin, SuccessMessageMixin, CreateView):
    """
    Record vital signs for the encounter identified by the URL ``pk``.
    """
    model = VitalSigns
    form_class = VitalSignsForm
    template_name = 'emr/vital_signs/vital_signs_form.html'
    success_message = _('Vital signs recorded successfully.')

    def dispatch(self, request, *args, **kwargs):
        """Resolve the tenant once; reject authenticated users without one.

        The tenant is stored on ``self`` because ``form_valid`` and
        ``get_context_data`` both read it. Anonymous users have no ``tenant``
        attribute and are passed on for LoginRequiredMixin to redirect.
        """
        self.tenant = getattr(request.user, 'tenant', None)
        if request.user.is_authenticated and not self.tenant:
            return JsonResponse({"error": "No tenant found"}, status=400)
        return super().dispatch(request, *args, **kwargs)

    def get_form_kwargs(self):
        """Pass the current user to the form (it does not take a tenant)."""
        kwargs = super().get_form_kwargs()
        kwargs['user'] = self.request.user
        return kwargs

    def _get_encounter(self):
        """Fetch the URL's encounter, restricted to the current tenant."""
        return get_object_or_404(Encounter, pk=self.kwargs['pk'], tenant=self.tenant)

    def form_valid(self, form):
        """Fill the server-controlled fields, save and record an audit entry."""
        encounter = self._get_encounter()
        # These fields are not on the form; they come from the session and URL.
        form.instance.measured_by = self.request.user
        form.instance.encounter = encounter
        form.instance.patient = encounter.patient
        response = super().form_valid(form)
        AuditLogEntry.objects.create(
            tenant=self.tenant,
            user=self.request.user,
            action='CREATE',
            model_name='VitalSigns',
            object_id=str(self.object.pk),
            changes={'status': 'Vital signs recorded'}
        )
        return response

    def get_success_url(self):
        """Redirect to the detail page of the new record."""
        return reverse_lazy('emr:vital_signs_detail', kwargs={'pk': self.object.pk})

    def get_context_data(self, **kwargs):
        """Expose the encounter, its patient and the measuring user."""
        ctx = super().get_context_data(**kwargs)
        encounter = self._get_encounter()
        ctx['patient'] = encounter.patient
        ctx['encounter'] = encounter
        ctx['measured_by'] = self.request.user  # or Employee profile if you have one
        return ctx
class ProblemListListView(LoginRequiredMixin, ListView):
    """
    Plain paginated list of the tenant's problem-list entries, newest first.
    """
    model = ProblemList
    template_name = 'emr/problems/problem_list.html'
    context_object_name = 'problems'
    paginate_by = 20

    def get_queryset(self):
        """Return only the current tenant's problems.

        Without the tenant filter this view listed problems of every tenant.
        """
        # NOTE(review): other views join 'diagnosing_provider'/'managing_provider';
        # confirm 'identified_by' is a real relation on ProblemList.
        return (
            super().get_queryset()
            .filter(tenant=self.request.user.tenant)
            .select_related('patient', 'identified_by')
            .order_by('-created_at')
        )
class ProblemListDetailView(LoginRequiredMixin, TenantMixin, DetailView):
    """
    Detail page for a single problem-list entry.
    """
    context_object_name = 'problem'
    template_name = 'emr/problems/problem_detail.html'
    model = ProblemList
class ProblemListCreateView(LoginRequiredMixin, SuccessMessageMixin, CreateView):
    """
    Add a problem to a patient's problem list.

    SuccessMessageMixin is what actually flashes ``success_message``; without
    it the attribute was defined but never displayed.
    """
    model = ProblemList
    form_class = ProblemListForm
    template_name = 'emr/problems/problem_form.html'
    success_message = _('Problem added successfully.')

    def form_valid(self, form):
        """Stamp identifier and tenant, save and record an audit entry."""
        form.instance.identified_by = self.request.user
        form.instance.tenant = self.request.user.tenant
        response = super().form_valid(form)
        AuditLogEntry.objects.create(
            tenant=self.request.user.tenant,
            user=self.request.user,
            event_type='CREATE',
            event_category='CLINICAL_DATA',
            action='Create Problem',
            description=f'Problem added: {self.object.problem_name}',
            content_type=ContentType.objects.get_for_model(ProblemList),
            object_id=self.object.pk,
            object_repr=str(self.object),
            patient_id=str(self.object.patient.patient_id),
            patient_mrn=self.object.patient.mrn,
            changes={'problem_type': self.object.problem_type, 'status': self.object.status}
        )
        return response

    def get_success_url(self):
        """Redirect to the detail page of the new problem."""
        return reverse_lazy('emr:problem_detail', kwargs={'pk': self.object.pk})
class ProblemListUpdateView(LoginRequiredMixin, SuccessMessageMixin, UpdateView):
    """
    Edit a problem-list entry belonging to the current user's tenant.
    """
    model = ProblemList
    form_class = ProblemListForm
    template_name = 'emr/problems/problem_form.html'
    success_message = _('Problem updated successfully.')

    def get_queryset(self):
        """Only the tenant's own problems may be edited."""
        return ProblemList.objects.filter(tenant=self.request.user.tenant)

    def form_valid(self, form):
        """Save the problem and record an audit entry."""
        response = super().form_valid(form)
        AuditLogEntry.objects.create(
            tenant=self.request.user.tenant,
            user=self.request.user,
            event_type='UPDATE',
            event_category='CLINICAL_DATA',
            action='Update Problem',
            description=f'Problem updated: {self.object.problem_name}',
            content_type=ContentType.objects.get_for_model(ProblemList),
            object_id=self.object.pk,
            object_repr=str(self.object),
            patient_id=str(self.object.patient.patient_id),
            patient_mrn=self.object.patient.mrn,
            changes={'status': self.object.status}
        )
        return response

    def get_success_url(self):
        """Redirect to the detail page of the edited problem."""
        return reverse_lazy('emr:problem_detail', kwargs={'pk': self.object.pk})
class ProblemListDeleteView(LoginRequiredMixin, DeleteView):
    """
    Confirm and delete a problem of the current tenant, with auditing.

    Django 4.0+ handles the confirmation POST through ``form_valid()`` and no
    longer calls ``delete()``, so the audit/message step is wired into both
    entry points.
    """
    model = ProblemList
    template_name = 'emr/problems/problem_confirm_delete.html'
    success_url = reverse_lazy('emr:problem_list')
    success_message = _('Problem deleted successfully.')

    def get_queryset(self):
        """Only the tenant's own problems may be deleted."""
        return ProblemList.objects.filter(tenant=self.request.user.tenant)

    def _announce_deletion(self, problem):
        """Write the audit entry and flash the success message."""
        AuditLogEntry.objects.create(
            tenant=self.request.user.tenant,
            user=self.request.user,
            action='DELETE',
            model_name='ProblemList',
            object_id=str(problem.pk),
            changes={'status': 'Problem deleted'}
        )
        messages.success(self.request, self.success_message)

    def form_valid(self, form):
        """POST path on Django 4.0+."""
        self._announce_deletion(self.object)
        return super().form_valid(form)

    def delete(self, request, *args, **kwargs):
        """POST path on Django < 4.0 and handler for HTTP DELETE requests."""
        self._announce_deletion(self.get_object())
        return super().delete(request, *args, **kwargs)
#
class CarePlanDetailView(LoginRequiredMixin, DetailView):
    """
    Detail page for a single care plan of the current user's tenant.
    """
    model = CarePlan
    template_name = 'emr/care_plans/care_plan_detail.html'
    context_object_name = 'care_plan'

    def get_queryset(self):
        """Restrict lookups to the tenant so other tenants' plans return 404."""
        return CarePlan.objects.filter(tenant=self.request.user.tenant)
class CarePlanCreateView(LoginRequiredMixin, SuccessMessageMixin, CreateView):
    """
    Create a care plan for the current user's tenant.

    SuccessMessageMixin is what actually flashes ``success_message``; without
    it the attribute was defined but never displayed.
    """
    model = CarePlan
    form_class = CarePlanForm
    template_name = 'emr/care_plans/care_plan_form.html'
    success_message = _('Care plan created successfully.')

    def form_valid(self, form):
        """Assign the tenant, save the plan and record an audit entry."""
        # Care plans are always queried by tenant, so set it server-side as
        # the other create views in this module do.
        form.instance.tenant = self.request.user.tenant
        response = super().form_valid(form)
        AuditLogEntry.objects.create(
            tenant=self.request.user.tenant,
            user=self.request.user,
            action='CREATE',
            model_name='CarePlan',
            object_id=str(self.object.pk),
            changes={'status': 'Care plan created'}
        )
        return response

    def get_success_url(self):
        """Redirect to the detail page of the new care plan."""
        return reverse_lazy('emr:care_plan_detail', kwargs={'pk': self.object.pk})
class CarePlanUpdateView(LoginRequiredMixin, SuccessMessageMixin, UpdateView):
    """
    Edit a care plan belonging to the current user's tenant.
    """
    model = CarePlan
    form_class = CarePlanForm
    template_name = 'emr/care_plans/care_plan_form.html'
    success_message = _('Care plan updated successfully.')

    def get_queryset(self):
        """Only the tenant's own care plans may be edited."""
        return CarePlan.objects.filter(tenant=self.request.user.tenant)

    def form_valid(self, form):
        """Save the plan and record an audit entry."""
        response = super().form_valid(form)
        AuditLogEntry.objects.create(
            tenant=self.request.user.tenant,
            user=self.request.user,
            action='UPDATE',
            model_name='CarePlan',
            object_id=str(self.object.pk),
            changes={'status': 'Care plan updated'}
        )
        return response

    def get_success_url(self):
        """Redirect to the detail page of the edited care plan."""
        return reverse_lazy('emr:care_plan_detail', kwargs={'pk': self.object.pk})
class CarePlanProgressUpdateView(LoginRequiredMixin, UpdateView):
    """
    Update only progress-related fields of a CarePlan.
    """
    model = CarePlan
    form_class = CarePlanProgressForm
    template_name = 'emr/care_plans/care_plan_progress_form.html'
    # permission_required = 'emr.change_careplan'

    def get_form_kwargs(self):
        """Pass the current tenant to the form."""
        kwargs = super().get_form_kwargs()
        kwargs['tenant'] = self.request.user.tenant
        return kwargs

    def get_queryset(self):
        """Limit editable plans to the tenant; none when the user has no tenant."""
        tenant = self.request.user.tenant
        if not tenant:
            return CarePlan.objects.none()
        return CarePlan.objects.filter(tenant=tenant)

    def form_valid(self, form):
        """Apply date defaults, save, audit (best-effort) and flash a message."""
        # Auto-set last_reviewed to today if the user left it blank.
        if not form.cleaned_data.get('last_reviewed'):
            form.instance.last_reviewed = timezone.now().date()
        # A plan marked completed without an end date ends today.
        if form.cleaned_data.get('status') == 'COMPLETED' and not form.instance.end_date:
            form.instance.end_date = timezone.now().date()
        response = super().form_valid(form)
        try:
            AuditLogger.log_event(
                tenant=form.instance.tenant,
                event_type='UPDATE',
                event_category='CARE_PLAN',
                action='Update Care Plan Progress',
                description=f'Progress updated for care plan #{self.object.pk} ({self.object.title})',
                user=self.request.user,
                content_object=self.object,
                request=self.request,
            )
        except Exception:
            # Don't block the user flow if audit logging fails, but leave a
            # traceback instead of discarding the error silently.
            import logging
            logging.getLogger(__name__).exception(
                'Audit logging failed for care plan %s', self.object.pk
            )
        messages.success(self.request, 'Care plan progress updated successfully.')
        return response

    def get_success_url(self):
        """Redirect to the care plan's detail page."""
        return reverse('emr:care_plan_detail', args=[self.object.pk])
class CarePlanDeleteView(LoginRequiredMixin, DeleteView):
    """
    Confirm and delete a care plan of the current tenant, with auditing.

    Django 4.0+ handles the confirmation POST through ``form_valid()`` and no
    longer calls ``delete()``, so the audit/message step is wired into both
    entry points.
    """
    model = CarePlan
    template_name = 'emr/care_plans/care_plan_confirm_delete.html'
    success_url = reverse_lazy('emr:care_plan_list')
    success_message = _('Care plan deleted successfully.')

    def get_queryset(self):
        """Only the tenant's own care plans may be deleted."""
        return CarePlan.objects.filter(tenant=self.request.user.tenant)

    def _announce_deletion(self, plan):
        """Write the audit entry and flash the success message."""
        AuditLogEntry.objects.create(
            tenant=self.request.user.tenant,
            user=self.request.user,
            action='DELETE',
            model_name='CarePlan',
            object_id=str(plan.pk),
            changes={'status': 'Care plan deleted'}
        )
        messages.success(self.request, self.success_message)

    def form_valid(self, form):
        """POST path on Django 4.0+."""
        self._announce_deletion(self.object)
        return super().form_valid(form)

    def delete(self, request, *args, **kwargs):
        """POST path on Django < 4.0 and handler for HTTP DELETE requests."""
        self._announce_deletion(self.get_object())
        return super().delete(request, *args, **kwargs)
class NoteTemplateListView(LoginRequiredMixin, TenantMixin, ListView):
    """
    Paginated list of note templates.
    """
    paginate_by = 20
    context_object_name = 'templates'
    template_name = 'emr/templates/note_template_list.html'
    model = NoteTemplate
class NoteTemplateDetailView(LoginRequiredMixin, TenantMixin, DetailView):
    """
    Detail page for a single note template.
    """
    context_object_name = 'template'
    template_name = 'emr/templates/note_template_detail.html'
    model = NoteTemplate

    def context_data(self, **kwargs):
        """Return the tenant's note templates, newest first.

        NOTE(review): this is not Django's ``get_context_data`` hook and it
        returns a queryset rather than a context dict, so nothing visible in
        this module calls it. Confirm whether it was meant to feed the
        template context.
        """
        return NoteTemplate.objects.filter(
            tenant=self.request.user.tenant
        ).order_by('-created_at')
class NoteTemplateCreateView(LoginRequiredMixin, SuccessMessageMixin, CreateView):
    """
    Create a note template for the current user's tenant.

    SuccessMessageMixin is what actually flashes ``success_message``; without
    it the attribute was defined but never displayed.
    """
    model = NoteTemplate
    form_class = NoteTemplateForm
    template_name = 'emr/templates/note_template_form.html'
    success_message = _('Note template created successfully.')

    def form_valid(self, form):
        """Stamp tenant and creator, save and record an audit entry."""
        form.instance.tenant = self.request.user.tenant
        form.instance.created_by = self.request.user
        response = super().form_valid(form)
        AuditLogEntry.objects.create(
            tenant=self.request.user.tenant,
            user=self.request.user,
            action='CREATE',
            model_name='NoteTemplate',
            object_id=str(self.object.pk),
            changes={'status': 'Template created'}
        )
        return response

    def get_success_url(self):
        """Redirect to the detail page of the new template."""
        return reverse_lazy('emr:note_template_detail', kwargs={'pk': self.object.pk})
class NoteTemplateUpdateView(LoginRequiredMixin, SuccessMessageMixin, UpdateView):
    """
    Edit a note template belonging to the current user's tenant.
    """
    model = NoteTemplate
    form_class = NoteTemplateForm
    template_name = 'emr/templates/note_template_form.html'
    success_message = _('Note template updated successfully.')

    def get_queryset(self):
        """Only the tenant's own templates may be edited."""
        return NoteTemplate.objects.filter(tenant=self.request.user.tenant)

    def form_valid(self, form):
        """Save the template and record an audit entry."""
        response = super().form_valid(form)
        AuditLogEntry.objects.create(
            tenant=self.request.user.tenant,
            user=self.request.user,
            action='UPDATE',
            model_name='NoteTemplate',
            object_id=str(self.object.pk),
            changes={'status': 'Template updated'}
        )
        return response

    def get_success_url(self):
        """Redirect to the detail page of the edited template."""
        return reverse_lazy('emr:note_template_detail', kwargs={'pk': self.object.pk})
class NoteTemplateDeleteView(LoginRequiredMixin, DeleteView):
    """
    Confirm and delete a note template of the current tenant.

    Deletion is refused while any clinical note references the template.
    Django 4.0+ handles the confirmation POST through ``form_valid()`` and no
    longer calls ``delete()``, so the guard and audit step are wired into
    both entry points.
    """
    model = NoteTemplate
    template_name = 'emr/note_template_confirm_delete.html'
    success_url = reverse_lazy('emr:note_template_list')
    success_message = _('Note template deleted successfully.')

    def get_queryset(self):
        """Only the tenant's own templates may be deleted."""
        return NoteTemplate.objects.filter(tenant=self.request.user.tenant)

    def _prepare_deletion(self, template):
        """Return a redirect if the template is in use, else audit and return None."""
        if ClinicalNote.objects.filter(template=template).exists():
            messages.error(self.request, _('Cannot delete a template in use.'))
            return redirect('emr:note_template_detail', pk=template.pk)
        AuditLogEntry.objects.create(
            tenant=self.request.user.tenant,
            user=self.request.user,
            action='DELETE',
            model_name='NoteTemplate',
            object_id=str(template.pk),
            changes={'status': 'Template deleted'}
        )
        messages.success(self.request, self.success_message)
        return None

    def form_valid(self, form):
        """POST path on Django 4.0+."""
        refusal = self._prepare_deletion(self.object)
        if refusal is not None:
            return refusal
        return super().form_valid(form)

    def delete(self, request, *args, **kwargs):
        """POST path on Django < 4.0 and handler for HTTP DELETE requests."""
        refusal = self._prepare_deletion(self.get_object())
        if refusal is not None:
            return refusal
        return super().delete(request, *args, **kwargs)
@login_required
def start_encounter(request, pk):
    """
    Move a scheduled encounter of the user's tenant to IN_PROGRESS.

    Responds with 404 when the encounter does not exist for the tenant.
    Otherwise always redirects to the encounter detail page, flashing either
    a success or an error message.
    """
    import logging
    logger = logging.getLogger(__name__)

    # Looked up outside any try block so Http404 reaches Django untouched.
    enc = get_object_or_404(
        Encounter,
        pk=pk,
        patient__tenant=request.user.tenant
    )

    if not request.user.has_perm('emr.change_encounter'):
        messages.error(request, _('You do not have permission to start encounters.'))
        return redirect('emr:encounter_detail', pk=pk)
    if enc.status != 'SCHEDULED':
        messages.error(request, _('Only scheduled encounters can be started.'))
        return redirect('emr:encounter_detail', pk=pk)

    old_status = enc.status
    try:
        enc.status = 'IN_PROGRESS'
        enc.save()
    except Exception:
        logger.exception('Error starting encounter %s', pk)
        messages.error(request, _('An error occurred while starting the encounter. Please try again.'))
        return redirect('emr:encounter_detail', pk=pk)

    # Audit logging is best-effort: a failure must not undo the status change.
    try:
        AuditLogEntry.objects.create(
            tenant=request.user.tenant,
            user=request.user,
            action='UPDATE',
            model_name='Encounter',
            object_id=str(enc.pk),
            changes={'status': f'Changed from {old_status} to IN_PROGRESS'}
        )
    except Exception:
        logger.exception('Audit logging failed for encounter %s', enc.pk)

    messages.success(request, _('Encounter started successfully.'))
    return redirect('emr:encounter_detail', pk=pk)
@login_required
def complete_encounter(request, pk):
    """
    Move an in-progress encounter of the user's tenant to COMPLETED.

    Responds with 404 when the encounter does not exist for the tenant.
    Otherwise redirects to the encounter detail page with a success or
    error message.
    """
    enc = get_object_or_404(Encounter, pk=pk, patient__tenant=request.user.tenant)
    if enc.status != 'IN_PROGRESS':
        messages.error(request, _('Only in-progress encounters can be completed.'))
        return redirect('emr:encounter_detail', pk=pk)

    enc.status = 'COMPLETED'
    enc.save()

    # Best-effort audit, as in start_encounter: the status is already saved,
    # so an audit failure must not turn into an error page.
    try:
        AuditLogEntry.objects.create(
            tenant=request.user.tenant, user=request.user,
            action='UPDATE', model_name='Encounter',
            object_id=str(enc.pk), changes={'status': 'Encounter completed'}
        )
    except Exception:
        import logging
        logging.getLogger(__name__).exception(
            'Audit logging failed for encounter %s', enc.pk
        )

    messages.success(request, _('Encounter completed.'))
    return redirect('emr:encounter_detail', pk=pk)
@login_required
def resolve_problem(request, problem_id):
    """
    Mark an active problem-list entry as RESOLVED.

    The problem is fetched within the current user's tenant (404 if it does
    not exist). Only problems in status ACTIVE are changed; other statuses
    produce an error message. After a successful change an audit entry is
    written on a best-effort basis. The user is always redirected to the
    problem detail page.
    """
    prob = get_object_or_404(ProblemList, pk=problem_id, patient__tenant=request.user.tenant)

    if prob.status != 'ACTIVE':
        messages.error(request, _('Only active problems can be resolved.'))
        return redirect('emr:problem_detail', pk=prob.pk)

    prob.status = 'RESOLVED'
    prob.save()

    # Log the resolution. This is a function view, so there is no ``self``:
    # the request and the problem instance are used directly. (The previous
    # ``self.request`` / ``self.object`` references raised NameError, which
    # the except below swallowed, so no audit entry was ever written.)
    try:
        patient = prob.patient
        AuditLogEntry.objects.create(
            tenant=request.user.tenant,
            user=request.user,
            event_type='UPDATE',
            event_category='PROBLEM_LIST',
            action='Resolve Problem',
            description=f'Problem resolved for {patient.get_full_name()}',
            content_type=ContentType.objects.get_for_model(ProblemList),
            object_id=prob.pk,
            object_repr=str(prob),
            patient_id=str(patient.patient_id),
            patient_mrn=patient.mrn,
            changes={
                # NOTE(review): ``problem_status`` is not used anywhere else in
                # this file; read defensively so a missing attribute does not
                # drop the whole audit entry. Confirm against the model.
                'problem_status': getattr(prob, 'problem_status', None),
                'status': prob.status,
            },
        )
    except Exception as e:
        # Log audit failure but don't block user flow
        print(f"Audit logging failed: {e}")

    messages.success(request, _('Problem resolved.'))
    return redirect('emr:problem_detail', pk=prob.pk)
@login_required
def complete_care_plan(request, pk):
    """
    Mark an active care plan as COMPLETED.

    The care plan is fetched within the current user's tenant (404 if it
    does not exist). Only plans in status ACTIVE are changed; the change is
    saved, recorded through AuditLogger and confirmed with a success message.
    Any other status produces an error message. The user is always
    redirected to the care plan detail page.
    """
    cp = get_object_or_404(CarePlan, pk=pk, patient__tenant=request.user.tenant)

    if cp.status == 'ACTIVE':
        cp.status = 'COMPLETED'
        cp.save()
        AuditLogger.log_event(
            tenant=request.user.tenant,
            event_type='UPDATE',
            event_category='CARE_PLAN',
            action='Care plan completed',
            # The description previously read "Progress updated ...", which
            # mislabelled a completion event in the audit trail.
            description=f'Completion of care plan #{cp.pk} ({cp.title})',
            user=request.user,
            content_object=cp,
            request=request,
        )
        messages.success(request, _('Care plan completed.'))
    else:
        messages.error(request, _('Only active care plans can be completed.'))

    return redirect('emr:care_plan_detail', pk=pk)
@login_required
def approve_care_plan(request, pk):
    """
    Approve a draft care plan by switching its status to ACTIVE.

    The care plan is fetched within the current user's tenant (404 if it
    does not exist). Only plans in status DRAFT are changed; the change is
    saved, recorded through AuditLogger and confirmed with a success message.
    Any other status produces an error message. The user is always
    redirected to the care plan detail page.
    """
    care_plan = get_object_or_404(
        CarePlan,
        pk=pk,
        patient__tenant=request.user.tenant,
    )

    if care_plan.status != 'DRAFT':
        messages.error(request, _('Only pending care plans can be approved.'))
    else:
        care_plan.status = 'ACTIVE'
        care_plan.save()
        AuditLogger.log_event(
            tenant=request.user.tenant,
            event_type='UPDATE',
            event_category='CARE_PLAN',
            action='Care plan approved',
            description=f'Approval of care plan #{care_plan.pk} ({care_plan.title})',
            user=request.user,
            content_object=care_plan,
            request=request,
        )
        messages.success(request, _('Care plan approved.'))

    return redirect('emr:care_plan_detail', pk=pk)
@login_required
def vital_signs_search(request):
    """
    Render the vital-signs search partial.

    Lists the ten most recently measured vital-sign records of the current
    tenant, optionally narrowed by the ``search`` GET parameter, which is
    matched case-insensitively against the patient's first or last name.
    """
    tenant = request.user.tenant
    term = request.GET.get('search', '')

    records = VitalSigns.objects.filter(encounter__patient__tenant=tenant)
    if term:
        name_matches = (
            Q(encounter__patient__first_name__icontains=term)
            | Q(encounter__patient__last_name__icontains=term)
        )
        records = records.filter(name_matches)

    latest = records.order_by('-measured_datetime')[:10]
    return render(
        request,
        'emr/partials/vital_signs_search.html',
        {'vital_signs': latest},
    )
@login_required
def clinical_decision_support(request):
    """
    Render the Clinical Decision Support page.

    Without a ``patient_id`` GET parameter the page is rendered with an empty
    context. With one, the patient is looked up inside the current tenant and
    the context is filled with unacknowledged critical alerts, pending/active
    recommendations, unresolved allergy alerts and active diagnostic
    suggestions for that patient, plus active treatment protocols and
    clinical guidelines (which are not filtered by patient).
    ``drug_interactions`` and ``risk_assessments`` are always empty lists
    here. Any exception raised while loading is turned into an error message
    and the page is rendered with whatever was collected up to that point.
    """
    tenant = request.user.tenant
    requested_patient_id = request.GET.get('patient_id')

    # Start from an "empty page" context and fill it in as data is loaded.
    context = {
        'patient': None,
        'critical_alerts': [],
        'recommendations': [],
        'drug_interactions': [],
        'allergy_alerts': [],
        'diagnostic_suggestions': [],
        'treatment_protocols': [],
        'risk_assessments': [],
        'clinical_guidelines': [],
    }

    if requested_patient_id:
        try:
            patient = get_object_or_404(
                PatientProfile,
                patient_id=requested_patient_id,
                tenant=tenant,
            )
            context['patient'] = patient

            # Unacknowledged critical alerts, newest first.
            context['critical_alerts'] = (
                CriticalAlert.objects
                .filter(patient=patient, acknowledged=False)
                .order_by('-created_at')[:5]
            )

            # Open recommendations, highest priority first.
            context['recommendations'] = (
                ClinicalRecommendation.objects
                .filter(patient=patient, status__in=['PENDING', 'ACTIVE'])
                .order_by('-priority', '-created_at')[:10]
            )

            # Unresolved allergy alerts, most severe first.
            context['allergy_alerts'] = (
                AllergyAlert.objects
                .filter(patient=patient, resolved=False)
                .order_by('-severity')[:5]
            )

            # Active diagnostic suggestions, most confident first.
            context['diagnostic_suggestions'] = (
                DiagnosticSuggestion.objects
                .filter(patient=patient, status='ACTIVE')
                .order_by('-confidence')[:5]
            )

            # Active treatment protocols, best success rate first.
            context['treatment_protocols'] = (
                TreatmentProtocol.objects
                .filter(is_active=True)
                .order_by('-success_rate')[:5]
            )

            # Active clinical guidelines, most recently updated first.
            context['clinical_guidelines'] = (
                ClinicalGuideline.objects
                .filter(is_active=True)
                .order_by('-last_updated')[:6]
            )
        except Exception as e:
            messages.error(request, f'Error loading patient data: {str(e)}')

    return render(request, 'emr/clinical_decision_support.html', context)
@login_required
def patient_search_api(request):
    """
    JSON patient search for the current tenant.

    Reads the ``q`` GET parameter; queries shorter than three characters
    (after stripping) return an empty list. Otherwise up to ten patients
    whose first name, last name, MRN or mobile number contains the query
    (case-insensitive) are returned, ordered by last then first name.
    """
    tenant = request.user.tenant
    term = request.GET.get('q', '').strip()

    if len(term) < 3:
        return JsonResponse({'patients': []})

    matches = (
        Q(first_name__icontains=term)
        | Q(last_name__icontains=term)
        | Q(mrn__icontains=term)
        | Q(mobile_number__icontains=term)
    )
    found = (
        PatientProfile.objects
        .filter(tenant=tenant)
        .filter(matches)
        .select_related()
        .order_by('last_name', 'first_name')[:10]
    )

    def serialize(profile):
        """Turn one patient profile into its JSON-ready dictionary."""
        birth_date = profile.date_of_birth
        return {
            'mrn': profile.mrn,
            'patient_id': profile.patient_id,
            'first_name': profile.first_name,
            'last_name': profile.last_name,
            'date_of_birth': birth_date.strftime('%Y-%m-%d') if birth_date else None,
            'gender': profile.get_gender_display(),
            'mobile_number': profile.mobile_number,
        }

    return JsonResponse({'patients': [serialize(profile) for profile in found]})
@login_required
def get_clinical_recommendations(request):
    """
    JSON endpoint listing open clinical recommendations for a patient.

    Expects a ``patient_id`` GET parameter. Responses:
      * 400 when ``patient_id`` is missing,
      * 404 when no such patient exists in the current tenant,
      * 500 with the exception text for any other failure,
      * 200 with up to twenty PENDING/ACTIVE recommendations, ordered by
        descending priority and creation time.
    """
    tenant = request.user.tenant
    patient_id = request.GET.get('patient_id')
    if not patient_id:
        return JsonResponse({'error': 'Patient ID required'}, status=400)

    try:
        patient = get_object_or_404(PatientProfile, patient_id=patient_id, tenant=tenant)
        recommendations = ClinicalRecommendation.objects.filter(
            patient=patient,
            status__in=['PENDING', 'ACTIVE']
        ).order_by('-priority', '-created_at')[:20]

        recommendation_data = [
            {
                'id': str(rec.id),
                'title': rec.title,
                'description': rec.description,
                'category': rec.category,
                'priority': rec.get_priority_display(),
                'evidence_level': rec.evidence_level,
                'source': rec.source,
                'category_display': rec.get_category_display(),
            }
            for rec in recommendations
        ]
        return JsonResponse({'recommendations': recommendation_data})
    except Http404:
        # Http404 is an Exception subclass; without this clause the generic
        # handler below reported an unknown patient as a server error (500).
        return JsonResponse({'error': 'Patient not found'}, status=404)
    except Exception as e:
        return JsonResponse({'error': str(e)}, status=500)
@login_required
def check_drug_interactions(request):
    """
    JSON endpoint returning drug interactions for a patient.

    Expects a ``patient_id`` GET parameter. The interaction list is currently
    hard-coded mock data; the patient lookup only validates that the patient
    exists in the current tenant. Responses:
      * 400 when ``patient_id`` is missing,
      * 404 when no such patient exists in the current tenant,
      * 500 with the exception text for any other failure,
      * 200 with the mock interaction list.
    """
    tenant = request.user.tenant
    patient_id = request.GET.get('patient_id')
    if not patient_id:
        return JsonResponse({'error': 'Patient ID required'}, status=400)

    try:
        # Validates access only; the result is not needed until real
        # medication data is wired in.
        get_object_or_404(PatientProfile, patient_id=patient_id, tenant=tenant)

        # Get current medications (this would need to be implemented based on pharmacy system)
        # For now, return mock data
        interaction_data = [
            {
                'id': 'mock-1',
                'drug1': 'Aspirin',
                'drug2': 'Warfarin',
                'severity': 'High',
                'description': 'Increased risk of bleeding',
                'severity_level': 3,
            },
            {
                'id': 'mock-2',
                'drug1': 'Lisinopril',
                'drug2': 'Potassium',
                'severity': 'Moderate',
                'description': 'Risk of hyperkalemia',
                'severity_level': 2,
            },
        ]
        return JsonResponse({'interactions': interaction_data})
    except Http404:
        # Http404 is an Exception subclass; without this clause the generic
        # handler below reported an unknown patient as a server error (500).
        return JsonResponse({'error': 'Patient not found'}, status=404)
    except Exception as e:
        return JsonResponse({'error': str(e)}, status=500)
@login_required
def check_allergies(request):
    """
    JSON endpoint listing unresolved allergy alerts for a patient.

    Expects a ``patient_id`` GET parameter. Responses:
      * 400 when ``patient_id`` is missing,
      * 404 when no such patient exists in the current tenant,
      * 500 with the exception text for any other failure,
      * 200 with up to ten unresolved alerts, most severe first.
    """
    tenant = request.user.tenant
    patient_id = request.GET.get('patient_id')
    if not patient_id:
        return JsonResponse({'error': 'Patient ID required'}, status=400)

    try:
        patient = get_object_or_404(PatientProfile, patient_id=patient_id, tenant=tenant)
        alerts = AllergyAlert.objects.filter(
            patient=patient,
            resolved=False
        ).order_by('-severity')[:10]

        alert_data = [
            {
                'id': str(alert.id),
                'allergen': alert.allergen,
                'reaction_type': alert.reaction_type,
                'severity': alert.get_severity_display(),
                'severity_level': alert.severity,
            }
            for alert in alerts
        ]
        return JsonResponse({'alerts': alert_data})
    except Http404:
        # Http404 is an Exception subclass; without this clause the generic
        # handler below reported an unknown patient as a server error (500).
        return JsonResponse({'error': 'Patient not found'}, status=404)
    except Exception as e:
        return JsonResponse({'error': str(e)}, status=500)
@login_required
def calculate_risk_scores(request):
    """
    JSON endpoint returning risk scores for a patient.

    Expects a ``patient_id`` GET parameter. The scores are currently
    hard-coded mock data; the patient lookup only validates that the patient
    exists in the current tenant. Responses:
      * 400 when ``patient_id`` is missing,
      * 404 when no such patient exists in the current tenant,
      * 500 with the exception text for any other failure,
      * 200 with the mock risk score list.
    """
    tenant = request.user.tenant
    patient_id = request.GET.get('patient_id')
    if not patient_id:
        return JsonResponse({'error': 'Patient ID required'}, status=400)

    try:
        # Validates access only; the result is not needed until real risk
        # calculation is wired in.
        get_object_or_404(PatientProfile, patient_id=patient_id, tenant=tenant)

        # Calculate mock risk scores (this would need to be implemented based on quality system)
        risk_data = [
            {
                'id': 'mock-1',
                'name': 'Cardiovascular Risk',
                'score': 15.2,
                'level': 'Moderate',
                'description': '10-year cardiovascular risk assessment',
            },
            {
                'id': 'mock-2',
                'name': 'Diabetes Risk',
                'score': 8.5,
                'level': 'Low',
                'description': 'Risk of developing type 2 diabetes',
            },
            {
                'id': 'mock-3',
                'name': 'Fall Risk',
                'score': 22.1,
                'level': 'High',
                'description': 'Assessment of fall risk factors',
            },
        ]
        return JsonResponse({'risk_scores': risk_data})
    except Http404:
        # Http404 is an Exception subclass; without this clause the generic
        # handler below reported an unknown patient as a server error (500).
        return JsonResponse({'error': 'Patient not found'}, status=404)
    except Exception as e:
        return JsonResponse({'error': str(e)}, status=500)
@login_required
def accept_recommendation(request):
    """
    JSON endpoint that marks a clinical recommendation as ACCEPTED.

    Requires POST with a ``recommendation_id`` field. On success the
    recommendation's status, ``accepted_by`` and ``accepted_at`` are updated.
    Responses:
      * 405 for non-POST requests,
      * 400 when ``recommendation_id`` is missing,
      * 404 when no such recommendation exists in the current tenant,
      * 500 with the exception text for any other failure,
      * 200 ``{'success': True}`` on success.
    """
    if request.method != 'POST':
        return JsonResponse({'error': 'POST method required'}, status=405)

    tenant = request.user.tenant
    recommendation_id = request.POST.get('recommendation_id')
    if not recommendation_id:
        return JsonResponse({'error': 'Recommendation ID required'}, status=400)

    try:
        recommendation = get_object_or_404(
            ClinicalRecommendation,
            id=recommendation_id,
            patient__tenant=tenant
        )
        recommendation.status = 'ACCEPTED'
        recommendation.accepted_by = request.user
        recommendation.accepted_at = timezone.now()
        recommendation.save()
        return JsonResponse({'success': True})
    except Http404:
        # Http404 is an Exception subclass; without this clause the generic
        # handler below reported a missing recommendation as a 500.
        return JsonResponse({'error': 'Recommendation not found'}, status=404)
    except Exception as e:
        return JsonResponse({'error': str(e)}, status=500)
@login_required
def defer_recommendation(request):
    """
    JSON endpoint that marks a clinical recommendation as DEFERRED.

    Requires POST with a ``recommendation_id`` field. On success the
    recommendation's status, ``deferred_by`` and ``deferred_at`` are updated.
    Responses:
      * 405 for non-POST requests,
      * 400 when ``recommendation_id`` is missing,
      * 404 when no such recommendation exists in the current tenant,
      * 500 with the exception text for any other failure,
      * 200 ``{'success': True}`` on success.
    """
    if request.method != 'POST':
        return JsonResponse({'error': 'POST method required'}, status=405)

    tenant = request.user.tenant
    recommendation_id = request.POST.get('recommendation_id')
    if not recommendation_id:
        return JsonResponse({'error': 'Recommendation ID required'}, status=400)

    try:
        recommendation = get_object_or_404(
            ClinicalRecommendation,
            id=recommendation_id,
            patient__tenant=tenant
        )
        recommendation.status = 'DEFERRED'
        recommendation.deferred_by = request.user
        recommendation.deferred_at = timezone.now()
        recommendation.save()
        return JsonResponse({'success': True})
    except Http404:
        # Http404 is an Exception subclass; without this clause the generic
        # handler below reported a missing recommendation as a 500.
        return JsonResponse({'error': 'Recommendation not found'}, status=404)
    except Exception as e:
        return JsonResponse({'error': str(e)}, status=500)
@login_required
def dismiss_recommendation(request):
    """
    JSON endpoint that marks a clinical recommendation as DISMISSED.

    Requires POST with a ``recommendation_id`` field. On success the
    recommendation's status, ``dismissed_by`` and ``dismissed_at`` are
    updated. Responses:
      * 405 for non-POST requests,
      * 400 when ``recommendation_id`` is missing,
      * 404 when no such recommendation exists in the current tenant,
      * 500 with the exception text for any other failure,
      * 200 ``{'success': True}`` on success.
    """
    if request.method != 'POST':
        return JsonResponse({'error': 'POST method required'}, status=405)

    tenant = request.user.tenant
    recommendation_id = request.POST.get('recommendation_id')
    if not recommendation_id:
        return JsonResponse({'error': 'Recommendation ID required'}, status=400)

    try:
        recommendation = get_object_or_404(
            ClinicalRecommendation,
            id=recommendation_id,
            patient__tenant=tenant
        )
        recommendation.status = 'DISMISSED'
        recommendation.dismissed_by = request.user
        recommendation.dismissed_at = timezone.now()
        recommendation.save()
        return JsonResponse({'success': True})
    except Http404:
        # Http404 is an Exception subclass; without this clause the generic
        # handler below reported a missing recommendation as a 500.
        return JsonResponse({'error': 'Recommendation not found'}, status=404)
    except Exception as e:
        return JsonResponse({'error': str(e)}, status=500)
@login_required
def acknowledge_alert(request):
    """
    JSON endpoint that acknowledges a critical alert.

    Requires POST with an ``alert_id`` field. On success the alert's
    ``acknowledged`` flag, ``acknowledged_by`` and ``acknowledged_at`` are
    updated. Responses:
      * 405 for non-POST requests,
      * 400 when ``alert_id`` is missing,
      * 404 when no such alert exists in the current tenant,
      * 500 with the exception text for any other failure,
      * 200 ``{'success': True}`` on success.
    """
    if request.method != 'POST':
        return JsonResponse({'error': 'POST method required'}, status=405)

    tenant = request.user.tenant
    alert_id = request.POST.get('alert_id')
    if not alert_id:
        return JsonResponse({'error': 'Alert ID required'}, status=400)

    try:
        alert = get_object_or_404(
            CriticalAlert,
            id=alert_id,
            patient__tenant=tenant
        )
        alert.acknowledged = True
        alert.acknowledged_by = request.user
        alert.acknowledged_at = timezone.now()
        alert.save()
        return JsonResponse({'success': True})
    except Http404:
        # Http404 is an Exception subclass; without this clause the generic
        # handler below reported a missing alert as a server error (500).
        return JsonResponse({'error': 'Alert not found'}, status=404)
    except Exception as e:
        return JsonResponse({'error': str(e)}, status=500)
@login_required
def apply_protocol(request):
    """
    JSON endpoint that applies a treatment protocol to a patient.

    Requires POST with ``protocol_id`` and ``patient_id`` fields. A new
    CarePlan is created for the patient, copying goals, interventions and
    monitoring parameters from the protocol, with the current user as
    primary provider and today as start date. Responses:
      * 405 for non-POST requests,
      * 400 when either ID is missing,
      * 404 when the active protocol or the tenant's patient is not found,
      * 500 with the exception text for any other failure,
      * 200 ``{'success': True, 'care_plan_id': ...}`` on success.
    """
    if request.method != 'POST':
        return JsonResponse({'error': 'POST method required'}, status=405)

    tenant = request.user.tenant
    protocol_id = request.POST.get('protocol_id')
    patient_id = request.POST.get('patient_id')
    if not protocol_id or not patient_id:
        return JsonResponse({'error': 'Protocol ID and Patient ID required'}, status=400)

    try:
        # Only active protocols can be applied; the protocol lookup is not
        # tenant-scoped, the patient lookup is.
        protocol = get_object_or_404(TreatmentProtocol, id=protocol_id, is_active=True)
        patient = get_object_or_404(PatientProfile, patient_id=patient_id, tenant=tenant)

        # Create a care plan based on the protocol
        care_plan = CarePlan.objects.create(
            tenant=tenant,
            patient=patient,
            title=f"{protocol.name} - Applied",
            description=f"Applied treatment protocol: {protocol.description}",
            plan_type='TREATMENT',
            category='TREATMENT',
            start_date=timezone.now().date(),
            primary_provider=request.user,
            goals=protocol.goals,
            interventions=protocol.interventions,
            monitoring_parameters=protocol.monitoring_parameters,
        )
        return JsonResponse({'success': True, 'care_plan_id': str(care_plan.id)})
    except Http404:
        # Http404 is an Exception subclass; without this clause the generic
        # handler below reported a missing protocol/patient as a 500.
        return JsonResponse({'error': 'Protocol or patient not found'}, status=404)
    except Exception as e:
        return JsonResponse({'error': str(e)}, status=500)
@login_required
def get_template_content(request, pk):
    """
    Return a note template's id, name and raw content as JSON.

    The template is looked up by primary key within the current user's
    tenant; a missing template results in a 404.
    """
    note_template = get_object_or_404(NoteTemplate, pk=pk, tenant=request.user.tenant)
    payload = {
        'template_id': str(note_template.template_id),
        'name': note_template.name,
        'content': note_template.template_content,
    }
    return JsonResponse(payload)
@login_required
def emr_stats(request):
    """
    HTMX endpoint rendering the EMR statistics partial.

    All figures are restricted to the current user's tenant. "Recent" means
    encounters whose start date falls within the last 30 days. The context
    contains encounter counts, today's activity, provider activity, average
    wait time (minutes), documentation rate (percent), the change of recent
    encounters versus the preceding 30-day window (percent) and the average
    encounter duration (hours). ``encounter_efficiency``, ``quality_score``
    and ``critical_alerts`` are fixed placeholder values.
    """
    tenant = request.user.tenant
    today = timezone.now().date()
    last_month = today - timedelta(days=30)

    # Base querysets
    enc = Encounter.objects.filter(patient__tenant=tenant)
    vs = VitalSigns.objects.filter(encounter__patient__tenant=tenant)
    providers = User.objects.filter(tenant=tenant, groups__name__in=['Physicians', 'Nurses', 'Providers'])
    recent = enc.filter(start_datetime__date__gte=last_month)

    # Basic stats
    total_encounters = enc.count()
    active_encounters = enc.filter(status='IN_PROGRESS').count()
    pending_documentation = enc.filter(documentation_complete=False).count()

    # Today's activity
    todays_encounters = enc.filter(start_datetime__date=today).count()
    completed_today = enc.filter(end_datetime__date=today, status='COMPLETED').count()
    vitals_today = vs.filter(measured_datetime__date=today).count()

    # Providers who had at least one recent encounter
    active_providers = providers.filter(
        pk__in=recent.values('provider')
    ).distinct().count()

    # Average wait time in minutes, measured from record creation
    # (``created_at``) to encounter start. Falls back to 15 when no recent
    # encounter yields a usable value.
    avg_wait_time = 15
    wait_times = recent.filter(
        status__in=['IN_PROGRESS', 'COMPLETED', 'ON_HOLD']
    ).annotate(
        wait_time=ExpressionWrapper(
            F('start_datetime') - F('created_at'),
            output_field=DurationField()
        )
    ).exclude(wait_time=None).values_list('wait_time', flat=True)
    # Falsy durations (None or exactly zero) are skipped, as before.
    wait_minutes = [wait.total_seconds() / 60 for wait in wait_times if wait]
    if wait_minutes:
        avg_wait_time = sum(wait_minutes) / len(wait_minutes)

    # Documentation rate over recent encounters
    total_recent = recent.count()
    documented_recent = recent.filter(documentation_complete=True).count()
    documentation_rate = (documented_recent / total_recent * 100) if total_recent > 0 else 0

    # Encounter efficiency and quality score (mock values for now)
    encounter_efficiency = 85.5
    quality_score = 92.3

    # Month-over-month change: the recent 30 days versus the 30 days before
    # that. (Previously the all-time total was compared with the earlier
    # window, which inflated the figure with every older encounter.)
    last_month_encounters = enc.filter(
        start_datetime__date__gte=last_month - timedelta(days=30),
        start_datetime__date__lt=last_month
    ).count()
    encounters_change = 0
    if last_month_encounters > 0:
        encounters_change = ((total_recent - last_month_encounters) / last_month_encounters) * 100

    # Average encounters per (recently active) provider
    avg_encounters_per_provider = 0
    if active_providers > 0:
        avg_encounters_per_provider = total_encounters / active_providers

    # Average encounter duration in hours. The durations are fetched once
    # instead of issuing separate exists()/iteration/len() queries.
    avg_encounter_duration = 0
    durations = list(
        enc.filter(
            end_datetime__isnull=False,
            start_datetime__isnull=False
        ).annotate(
            duration=ExpressionWrapper(
                F('end_datetime') - F('start_datetime'),
                output_field=DurationField()
            )
        ).values_list('duration', flat=True)
    )
    if durations:
        total_seconds = sum(d.total_seconds() for d in durations)
        avg_encounter_duration = total_seconds / len(durations) / 3600

    # Critical alerts (mock value for now)
    critical_alerts = 0

    context = {
        'total_encounters': total_encounters,
        'active_encounters': active_encounters,
        'pending_documentation': pending_documentation,
        'critical_alerts': critical_alerts,
        'todays_encounters': todays_encounters,
        'completed_today': completed_today,
        'vitals_today': vitals_today,
        'active_providers': active_providers,
        'avg_encounters_per_provider': avg_encounters_per_provider,
        'documentation_rate': documentation_rate,
        'avg_wait_time': avg_wait_time,
        'encounter_efficiency': encounter_efficiency,
        'quality_score': quality_score,
        'encounters_change': encounters_change,
        'avg_encounter_duration': avg_encounter_duration,
    }
    return render(request, 'emr/partials/emr_stats.html', context)
@login_required
def encounter_search(request):
    """
    HTMX endpoint rendering the encounter list partial for a search.

    Reads the ``search``, ``status`` and ``encounter_type`` GET parameters.
    ``search`` is matched case-insensitively against the patient's first
    name, last name, MRN and the chief complaint; the other two are exact
    filters. At most twenty encounters of the current tenant are returned,
    newest start time first.
    """
    params = request.GET
    term = params.get('search', '')
    exact_filters = {
        'status': params.get('status', ''),
        'encounter_type': params.get('encounter_type', ''),
    }

    matches = Encounter.objects.filter(tenant=request.user.tenant)

    if term:
        text_match = (
            Q(patient__first_name__icontains=term)
            | Q(patient__last_name__icontains=term)
            | Q(patient__mrn__icontains=term)
            | Q(chief_complaint__icontains=term)
        )
        matches = matches.filter(text_match)

    # Apply each exact filter only when a value was supplied.
    for field_name, wanted in exact_filters.items():
        if wanted:
            matches = matches.filter(**{field_name: wanted})

    page = matches.select_related('patient', 'provider').order_by('-start_datetime')[:20]
    return render(request, 'emr/partials/encounter_list.html', {'encounters': page})
@login_required
def vital_signs_chart(request, patient_id):
    """
    HTMX endpoint rendering the vital signs chart partial for a patient.

    The patient is looked up by ``id`` within the current user's tenant
    (404 if missing); the fifteen most recently measured vital-sign records
    of that patient are passed to the template.
    """
    patient = get_object_or_404(PatientProfile, id=patient_id, tenant=request.user.tenant)

    # Most recent measurements first.
    recent_vitals = VitalSigns.objects.filter(patient=patient).order_by('-measured_datetime')[:15]

    return render(
        request,
        'emr/partials/vital_signs_chart.html',
        {'patient': patient, 'vital_signs': recent_vitals},
    )
@login_required
def problem_list_patient(request, patient_id):
    """
    HTMX endpoint rendering the problem list partial for a patient.

    The patient is looked up by ``id`` within the current user's tenant
    (404 if missing); all of the patient's problems are listed, newest
    first, with the diagnosing provider loaded in the same query.
    """
    patient = get_object_or_404(PatientProfile, id=patient_id, tenant=request.user.tenant)

    patient_problems = (
        ProblemList.objects
        .filter(patient=patient)
        .select_related('diagnosing_provider')
        .order_by('-created_at')
    )

    template_context = {'patient': patient, 'problems': patient_problems}
    return render(request, 'emr/partials/problem_list.html', template_context)
@login_required
def add_vital_signs(request, pk):
    """
    HTMX endpoint recording a set of vital signs for an encounter.

    Only POST does any work; other methods flash an error. The encounter is
    looked up by ``pk`` within the current tenant (404 if missing). Each
    measurement is optional and taken from the POST data; blood pressure is
    stored only when both systolic and diastolic values are present.
    Non-numeric values flash an error instead of raising. After the record
    is created an audit event is written on a best-effort basis. Every path
    redirects to the encounter detail page.
    """
    tenant = request.user.tenant

    if request.method != 'POST':
        messages.error(request, 'An error occurred while recording the vital signs. Please try again later.')
        return redirect('emr:encounter_detail', pk=pk)

    encounter = get_object_or_404(
        Encounter,
        pk=pk,
        tenant=tenant
    )
    post = request.POST

    vital_signs_data = {
        'encounter': encounter,
        'patient': encounter.patient,
        'measured_by': request.user,
        'measured_datetime': timezone.now(),
    }

    # Add measurements if provided. Conversion errors are handled here so a
    # typo in the form does not surface as a server error.
    try:
        if post.get('temperature'):
            vital_signs_data['temperature'] = float(post.get('temperature'))
            vital_signs_data['temperature_method'] = post.get('temperature_method', 'ORAL')

        if post.get('systolic_bp') and post.get('diastolic_bp'):
            vital_signs_data['systolic_bp'] = int(post.get('systolic_bp'))
            vital_signs_data['diastolic_bp'] = int(post.get('diastolic_bp'))
            vital_signs_data['bp_position'] = post.get('bp_position', 'SITTING')

        if post.get('heart_rate'):
            vital_signs_data['heart_rate'] = int(post.get('heart_rate'))
            vital_signs_data['heart_rhythm'] = post.get('heart_rhythm', 'REGULAR')

        if post.get('respiratory_rate'):
            vital_signs_data['respiratory_rate'] = int(post.get('respiratory_rate'))

        if post.get('oxygen_saturation'):
            vital_signs_data['oxygen_saturation'] = int(post.get('oxygen_saturation'))
            vital_signs_data['oxygen_delivery'] = post.get('oxygen_delivery', 'ROOM_AIR')

        if post.get('pain_scale'):
            vital_signs_data['pain_scale'] = int(post.get('pain_scale'))
            vital_signs_data['pain_location'] = post.get('pain_location', '')

        if post.get('weight'):
            vital_signs_data['weight'] = float(post.get('weight'))

        if post.get('height'):
            vital_signs_data['height'] = float(post.get('height'))
    except (TypeError, ValueError):
        messages.error(request, 'Invalid vital signs value. Please enter numeric measurements.')
        return redirect('emr:encounter_detail', pk=pk)

    # Create vital signs record
    vital_signs = VitalSigns.objects.create(**vital_signs_data)

    # Log the action using the same AuditLogger.log_event keywords as the
    # other views in this module. Auditing is best-effort so that a logging
    # problem cannot fail the request after the record has been saved.
    try:
        AuditLogger.log_event(
            tenant=tenant,
            event_type='CREATE',
            event_category='CLINICAL_DATA',
            action='Vital signs recorded',
            description=f"Vital signs recorded for {encounter.patient.get_full_name()}",
            user=request.user,
            content_object=vital_signs,
            request=request,
        )
    except Exception as e:
        print(f"Audit logging failed: {e}")

    messages.success(request, 'Vital signs recorded successfully')
    return redirect('emr:encounter_detail', pk=pk)
@login_required
def add_problem(request, encounter_id):
    """
    HTMX endpoint for adding a problem to an encounter's patient.

    The encounter is looked up within the current tenant (404 if missing).
    GET renders the problem form modal with defaults pre-filled. POST
    validates the form; a valid form is saved with tenant, patient,
    diagnosing provider and related encounter set from the request, an audit
    entry is written on a best-effort basis, and the user is redirected to
    the encounter detail page. An invalid form re-renders the modal with its
    errors.
    """
    tenant = request.user.tenant
    encounter = get_object_or_404(Encounter, id=encounter_id, tenant=tenant)

    if request.method != 'POST':
        # GET request - show form with sensible defaults
        form = ProblemListForm(tenant=tenant, initial={
            'patient': encounter.patient,
            'diagnosing_provider': request.user,
            'related_encounter': encounter,
            'status': 'ACTIVE',
            'priority': 'MEDIUM'
        })
    else:
        form = ProblemListForm(request.POST, tenant=tenant)
        if form.is_valid():
            new_problem = form.save(commit=False)
            new_problem.tenant = tenant
            new_problem.patient = encounter.patient
            new_problem.diagnosing_provider = request.user
            new_problem.related_encounter = encounter
            new_problem.save()

            # Audit on a best-effort basis; never block the user flow.
            try:
                AuditLogEntry.objects.create(
                    tenant=tenant,
                    user=request.user,
                    event_type='CREATE',
                    event_category='CLINICAL_DATA',
                    action='Add Problem',
                    description=f'Problem added: {new_problem.problem_name}',
                    content_type=ContentType.objects.get_for_model(ProblemList),
                    object_id=new_problem.pk,
                    object_repr=str(new_problem),
                    patient_id=str(encounter.patient.patient_id),
                    patient_mrn=encounter.patient.mrn,
                    changes={'problem_type': new_problem.problem_type, 'status': new_problem.status}
                )
            except Exception as audit_error:
                print(f"Audit logging failed: {audit_error}")

            messages.success(request, f'Problem "{new_problem.problem_name}" added successfully')
            return redirect('emr:encounter_detail', pk=encounter_id)
        # Invalid form: fall through and re-render it with its errors.

    return render(request, 'emr/partials/problem_form_modal.html', {
        'form': form,
        'encounter': encounter,
        'patient': encounter.patient
    })
@login_required
def add_care_plan(request, encounter_id):
    """
    HTMX endpoint for adding a care plan for an encounter's patient.

    The encounter is looked up within the current tenant (404 if missing).
    GET renders the care plan form modal with defaults pre-filled. POST
    validates the form; a valid form is saved with tenant, patient and
    primary provider set from the request (including many-to-many data), an
    audit entry is written on a best-effort basis, and the user is
    redirected to the encounter detail page. An invalid form re-renders the
    modal with its errors.
    """
    tenant = request.user.tenant
    encounter = get_object_or_404(Encounter, id=encounter_id, tenant=tenant)

    if request.method != 'POST':
        # GET request - show form with sensible defaults
        form = CarePlanForm(tenant=tenant, initial={
            'patient': encounter.patient,
            'primary_provider': request.user,
            'start_date': timezone.now().date(),
            'status': 'DRAFT',
            'priority': 'ROUTINE'
        })
    else:
        form = CarePlanForm(request.POST, tenant=tenant)
        if form.is_valid():
            new_plan = form.save(commit=False)
            new_plan.tenant = tenant
            new_plan.patient = encounter.patient
            new_plan.primary_provider = request.user
            new_plan.save()
            # commit=False defers many-to-many data; persist it now.
            form.save_m2m()

            # Audit on a best-effort basis; never block the user flow.
            try:
                AuditLogEntry.objects.create(
                    tenant=tenant,
                    user=request.user,
                    event_type='CREATE',
                    event_category='CARE_PLAN',
                    action='Add Care Plan',
                    description=f'Care plan created: {new_plan.title}',
                    content_type=ContentType.objects.get_for_model(CarePlan),
                    object_id=new_plan.pk,
                    object_repr=str(new_plan),
                    patient_id=str(encounter.patient.patient_id),
                    patient_mrn=encounter.patient.mrn,
                    changes={'plan_type': new_plan.plan_type, 'status': new_plan.status}
                )
            except Exception as audit_error:
                print(f"Audit logging failed: {audit_error}")

            messages.success(request, f'Care plan "{new_plan.title}" created successfully')
            return redirect('emr:encounter_detail', pk=encounter_id)
        # Invalid form: fall through and re-render it with its errors.

    return render(request, 'emr/partials/care_plan_form_modal.html', {
        'form': form,
        'encounter': encounter,
        'patient': encounter.patient
    })
@login_required
def update_encounter_status(request, encounter_id):
    """
    HTMX endpoint for updating an encounter's status.

    Requires POST with a ``status`` field that is one of the choices declared
    on ``Encounter.status``; anything else yields a 400 JSON error. The
    encounter is looked up within the current tenant (404 if missing). When
    the new status closes the encounter, ``end_datetime`` is set to now. The
    change is audited on a best-effort basis and the response carries the
    status display label and its CSS class.
    """
    if request.method != 'POST':
        return JsonResponse({'error': 'Invalid request'}, status=400)

    encounter = get_object_or_404(
        Encounter,
        id=encounter_id,
        tenant=request.user.tenant
    )

    new_status = request.POST.get('status')
    valid_statuses = dict(Encounter._meta.get_field('status').choices)
    if new_status not in valid_statuses:
        return JsonResponse({'error': 'Invalid request'}, status=400)

    old_status = encounter.status
    encounter.status = new_status

    # Closing statuses stamp the end time. 'COMPLETED' is handled alongside
    # 'FINISHED' because the other views in this module use 'COMPLETED' and
    # the statistics view filters completed encounters by end_datetime.
    if new_status in ('FINISHED', 'COMPLETED'):
        encounter.end_datetime = timezone.now()
    # NOTE(review): the previous IN_PROGRESS branch only assigned None when
    # end_datetime was already empty (a no-op) and was dropped. If reopening
    # an encounter should clear end_datetime, add that here.

    encounter.save()

    # Log the action using the same AuditLogger.log_event keywords as the
    # other views in this module. Auditing is best-effort so that a logging
    # problem cannot fail the request after the status has been saved.
    try:
        AuditLogger.log_event(
            tenant=request.user.tenant,
            event_type='UPDATE',
            event_category='CLINICAL_DATA',
            action='Encounter status updated',
            description=f"Encounter status changed from {old_status} to {new_status}",
            user=request.user,
            content_object=encounter,
            request=request,
        )
    except Exception as e:
        print(f"Audit logging failed: {e}")

    return JsonResponse({
        'success': True,
        'new_status': encounter.get_status_display(),
        'status_class': get_status_class(new_status)
    })
@login_required
def sign_note(request, note_id):
    """
    HTMX endpoint for electronically signing a clinical note.

    Only POST signs anything; other methods flash an error. The note must
    belong to the current tenant, be authored by the current user and be in
    status DRAFT, otherwise the lookup results in a 404. On success the note
    is marked SIGNED with the current time and an ELECTRONIC signature
    method, the event is recorded through AuditLogger and a success message
    is flashed. Both paths redirect to the clinical note detail page.
    """
    tenant = request.user.tenant

    if request.method != 'POST':
        messages.error(request, 'An error occurred while signing the note. Please try again later.')
        return redirect('emr:clinical_note_detail', id=note_id)

    # Only the author may sign, and only while the note is still a draft.
    draft_note = get_object_or_404(
        ClinicalNote,
        id=note_id,
        patient__tenant=tenant,
        author=request.user,
        status='DRAFT',
    )

    draft_note.status = 'SIGNED'
    draft_note.electronically_signed = True
    draft_note.signed_datetime = timezone.now()
    draft_note.signature_method = 'ELECTRONIC'
    draft_note.save()

    AuditLogger.log_event(
        tenant=tenant,
        event_type='NOTE_SIGNED',
        event_category='ClinicalNote',
        action='Sign clinical note',
        description=f"Clinical note signed: {draft_note.title}",
        user=request.user,
        content_object=draft_note,
        request=request,
    )

    messages.success(request, f'Note "{draft_note.title}" signed successfully')
    return redirect('emr:clinical_note_detail', id=note_id)
def get_status_class(status):
    """
    Map an encounter status code to the CSS class used to display it.

    Statuses that are not listed fall back to ``'secondary'``.
    """
    # Grouped by CSS class; inverted below into a status -> class lookup.
    statuses_by_class = (
        ('secondary', ('PLANNED', 'UNKNOWN')),
        ('info', ('ARRIVED',)),
        ('warning', ('TRIAGED', 'ON_HOLD')),
        ('primary', ('IN_PROGRESS',)),
        ('success', ('FINISHED',)),
        ('danger', ('CANCELLED', 'ENTERED_IN_ERROR')),
    )
    class_for_status = {
        code: css_class
        for css_class, codes in statuses_by_class
        for code in codes
    }
    return class_for_status.get(status, 'secondary')
def _norm_code(s: str) -> str:
    """Return *s* upper-cased with all space characters removed ('' for falsy input)."""
    if not s:
        return ""
    return s.replace(" ", "").upper()
class Icd10SearchView(LoginRequiredMixin, ListView):
    """
    Paginated ICD-10 search by description or by code.

    URL params: ?q=...&mode=description|code&page=N
    An empty query yields no results. In ``code`` mode the query is
    normalised with ``_norm_code`` and results are ranked exact match first,
    then prefix matches, then other substring matches. In any other mode the
    query is matched against description, section name and chapter name.
    """
    model = Icd10
    template_name = "emr/icd10_search.html"
    context_object_name = "results"
    paginate_by = 20

    def get_queryset(self):
        """Build the result queryset for the current ``q`` / ``mode`` parameters."""
        params = self.request.GET
        term = (params.get("q") or "").strip()
        mode = (params.get("mode") or "description").strip().lower()

        if not term:
            return Icd10.objects.none()

        try:
            if mode != "code":
                text_match = (
                    Q(description__icontains=term)
                    | Q(section_name__icontains=term)
                    | Q(chapter_name__icontains=term)
                )
                return Icd10.objects.filter(text_match).order_by("code")

            normalized = _norm_code(term)
            code_match = (
                Q(code__iexact=normalized)
                | Q(code__istartswith=normalized)
                | Q(code__icontains=normalized)
            )
            # 0 = exact match, 1 = prefix match, 2 = any other substring match.
            match_rank = Case(
                When(code__iexact=normalized, then=Value(0)),
                When(code__istartswith=normalized, then=Value(1)),
                default=Value(2),
                output_field=IntegerField(),
            )
            ranked = Icd10.objects.filter(code_match).annotate(exact_first=match_rank)
            return ranked.order_by("exact_first", "code")
        except Exception as exc:
            messages.error(self.request, f"Search failed: {exc}")
            return Icd10.objects.none()

    def get_context_data(self, **kwargs):
        """Add the raw query, the mode and the total match count to the context."""
        ctx = super().get_context_data(**kwargs)
        params = self.request.GET
        ctx.update({
            "q": params.get("q", ""),
            "mode": params.get("mode", "description"),
            "total": self.get_queryset().count(),
            "error": None,  # template compatibility
        })
        return ctx
class Icd10DetailView(LoginRequiredMixin, DetailView):
    """
    Detail page for a single ICD-10 record, looked up by code (e.g., A00.0).

    The code from the URL is normalised with ``_norm_code`` and matched
    case-insensitively. The context additionally carries the record's chain
    of ancestors (outermost first) and its children ordered by code.
    """
    model = Icd10
    template_name = "emr/icd10_detail.html"
    context_object_name = "record"
    slug_field = "code"
    slug_url_kwarg = "code"

    def get_object(self, queryset=None):
        """Return the record for the URL's code, or raise Http404."""
        raw_code = (self.kwargs.get(self.slug_url_kwarg) or "").strip()
        if not raw_code:
            raise Http404("ICD-10 code is required.")

        wanted = _norm_code(raw_code)
        try:
            return self.model.objects.get(code__iexact=wanted)
        except self.model.DoesNotExist:
            raise Http404("ICD-10 code not found.")

    def get_context_data(self, **kwargs):
        """Add the code, the ancestor chain and the ordered children."""
        ctx = super().get_context_data(**kwargs)
        record = ctx["record"]

        # Walk up the parent links; inserting at the front leaves the
        # outermost ancestor first and the direct parent last.
        lineage = []
        node = record.parent
        while node is not None:
            lineage.insert(0, node)
            node = node.parent

        ctx["code"] = record.code
        ctx["ancestors"] = lineage
        ctx["children"] = record.children.all().order_by("code")
        return ctx
@login_required
def icd10_search_api(request):
    """
    JSON autocomplete endpoint for ICD-10 records.

    Reads the ``q`` GET parameter; queries shorter than two characters
    (after stripping) return an empty result list. Otherwise up to twenty
    non-header records whose description, code or section name contains the
    query are returned ordered by code. Each result's ``display`` is the code
    followed by the description truncated to 100 characters. Any exception
    is reported as a 500 JSON error carrying the exception text.
    """
    term = request.GET.get('q', '').strip()
    if len(term) < 2:
        return JsonResponse({'results': []})

    def to_result(rec):
        """Serialise one ICD-10 record for the autocomplete list."""
        text = rec.description
        ellipsis = '...' if len(text) > 100 else ''
        return {
            'code': rec.code,
            'description': text,
            'display': f"{rec.code} - {text[:100]}{ellipsis}",
            'chapter_name': rec.chapter_name,
            'section_name': rec.section_name
        }

    try:
        # Search in description, code, and section name; skip header rows.
        any_field = (
            Q(description__icontains=term)
            | Q(code__icontains=term)
            | Q(section_name__icontains=term)
        )
        hits = Icd10.objects.filter(any_field).exclude(is_header=True).order_by('code')[:20]
        return JsonResponse({'results': [to_result(rec) for rec in hits]})
    except Exception as e:
        return JsonResponse({'error': str(e)}, status=500)