HH/apps/rca/views.py
2026-04-08 17:13:35 +03:00

504 lines
18 KiB
Python

"""
RCA (Root Cause Analysis) views
"""
from django.contrib import messages
from django.contrib.auth.mixins import LoginRequiredMixin
from django.contrib.contenttypes.models import ContentType
from django.db.models import Q, Count
from django.core.exceptions import PermissionDenied
from django.http import JsonResponse
from django.shortcuts import get_object_or_404, redirect, render
from django.urls import reverse_lazy
from django.utils import timezone
from django.views import View
from django.views.generic import (
CreateView,
DeleteView,
DetailView,
ListView,
UpdateView,
)
def _check_rca_access(request, rca):
user = request.user
if user.is_superuser:
return
if user.is_px_admin():
tenant = getattr(request, "tenant_hospital", None)
if tenant and rca.hospital_id == tenant.id:
return
elif user.hospital and rca.hospital_id == user.hospital.id:
return
raise PermissionDenied("You don't have access to this RCA.")
from .forms import (
RCAAttachmentForm,
RCAClosureForm,
RCACorrectiveActionForm,
RCANoteForm,
RCARootCauseForm,
RCAApprovalForm,
RCAStatusChangeForm,
RootCauseAnalysisForm,
)
from .models import (
RCAActionStatus,
RCAStatus,
RCACorrectiveAction,
RCARootCause,
RootCauseAnalysis,
)
class RCAListView(LoginRequiredMixin, ListView):
"""List view for Root Cause Analyses"""
model = RootCauseAnalysis
template_name = "rca/rca_list.html"
context_object_name = "rcas"
paginate_by = 20
def get_queryset(self):
queryset = (
RootCauseAnalysis.objects.filter(is_deleted=False)
.select_related("hospital", "department", "assigned_to", "created_by")
.prefetch_related("root_causes", "corrective_actions")
)
# Get filter parameters
status = self.request.GET.get("status")
severity = self.request.GET.get("severity")
priority = self.request.GET.get("priority")
hospital = self.request.GET.get("hospital")
search = self.request.GET.get("search")
date_from = self.request.GET.get("date_from")
date_to = self.request.GET.get("date_to")
# Apply filters
if status:
queryset = queryset.filter(status=status)
if severity:
queryset = queryset.filter(severity=severity)
if priority:
queryset = queryset.filter(priority=priority)
if hospital:
queryset = queryset.filter(hospital_id=hospital)
if search:
queryset = queryset.filter(Q(title__icontains=search) | Q(description__icontains=search))
if date_from:
queryset = queryset.filter(created_at__gte=date_from)
if date_to:
queryset = queryset.filter(created_at__lte=date_to)
# Filter by user's hospital (if not admin)
user = self.request.user
if user.is_px_admin():
tenant = getattr(self.request, "tenant_hospital", None)
if tenant:
queryset = queryset.filter(hospital=tenant)
elif user.hospital:
queryset = queryset.filter(hospital=user.hospital)
else:
queryset = queryset.none()
return queryset.order_by("-created_at")
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
# Add filter form
from .forms import RCAFilterForm
context["filter_form"] = RCAFilterForm(self.request.GET)
# Add counts
context["total_count"] = self.get_queryset().count()
context["draft_count"] = self.get_queryset().filter(status=RCAStatus.DRAFT).count()
context["in_progress_count"] = self.get_queryset().filter(status=RCAStatus.IN_PROGRESS).count()
context["review_count"] = self.get_queryset().filter(status=RCAStatus.REVIEW).count()
context["approved_count"] = self.get_queryset().filter(status=RCAStatus.APPROVED).count()
context["closed_count"] = self.get_queryset().filter(status=RCAStatus.CLOSED).count()
return context
class RCADetailView(LoginRequiredMixin, DetailView):
"""Detail view for Root Cause Analysis"""
model = RootCauseAnalysis
template_name = "rca/rca_detail.html"
context_object_name = "rca"
def get_queryset(self):
queryset = (
RootCauseAnalysis.objects.filter(is_deleted=False)
.select_related("hospital", "department", "assigned_to", "created_by", "approved_by", "closed_by")
.prefetch_related(
"root_causes__verified_by",
"corrective_actions__root_cause",
"attachments",
"notes__created_by",
"status_logs__changed_by",
)
)
user = self.request.user
if user.is_px_admin():
tenant = getattr(self.request, "tenant_hospital", None)
if tenant:
return queryset.filter(hospital=tenant)
elif user.hospital:
return queryset.filter(hospital=user.hospital)
return queryset.none()
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["status_choices"] = RCAStatus.choices
total_actions = self.object.corrective_actions.count()
completed_actions = self.object.corrective_actions.filter(status=RCAActionStatus.COMPLETED).count()
context["progress_percentage"] = (completed_actions / total_actions * 100) if total_actions > 0 else 0
if self.object.content_type and self.object.object_id:
try:
model_class = self.object.content_type.model_class()
related_item = model_class.objects.filter(pk=self.object.object_id).first()
context["related_item"] = related_item
context["related_item_type"] = self.object.content_type.model
except Exception:
pass
return context
class RCACreateView(LoginRequiredMixin, CreateView):
"""Create view for Root Cause Analysis"""
model = RootCauseAnalysis
form_class = RootCauseAnalysisForm
template_name = "rca/rca_form.html"
success_url = reverse_lazy("rca:rca_list")
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs["user"] = self.request.user
kwargs["request"] = self.request
return kwargs
def get_initial(self):
initial = super().get_initial()
related_model = self.request.GET.get("related_model") or self.request.POST.get("related_model")
related_id = self.request.GET.get("related_id") or self.request.POST.get("related_id")
if related_model and related_id:
initial["related_model"] = related_model
initial["related_id"] = related_id
try:
content_type = ContentType.objects.get(model=related_model)
initial["content_type"] = content_type.pk
initial["object_id"] = related_id
model_class = content_type.model_class()
if model_class and hasattr(model_class, "objects"):
obj = model_class.objects.filter(pk=related_id).first()
if obj:
if hasattr(obj, "hospital_id"):
initial["hospital"] = obj.hospital_id
if hasattr(obj, "department_id"):
initial["department"] = obj.department_id
if hasattr(obj, "title"):
initial["title"] = f"RCA: {obj.title}"
elif hasattr(obj, "subject"):
initial["title"] = f"RCA: {obj.subject}"
if hasattr(obj, "description"):
initial["description"] = obj.description
except ContentType.DoesNotExist:
pass
return initial
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
related_model = self.request.GET.get("related_model")
related_id = self.request.GET.get("related_id")
context["related_model"] = related_model
context["related_id"] = related_id
if related_model and related_id:
try:
content_type = ContentType.objects.get(model=related_model)
model_class = content_type.model_class()
if model_class:
obj = model_class.objects.filter(pk=related_id).first()
if obj:
context["related_item"] = obj
context["related_item_type"] = related_model
except ContentType.DoesNotExist:
pass
return context
def form_valid(self, form):
rca = form.save(commit=False)
rca.created_by = self.request.user
related_model = self.request.POST.get("related_model")
related_id = self.request.POST.get("related_id")
if related_model and related_id:
try:
content_type = ContentType.objects.get(model=related_model)
rca.content_type = content_type
rca.object_id = related_id
except ContentType.DoesNotExist:
pass
rca.save()
rca.status_logs.create(old_status="", new_status=rca.status, changed_by=self.request.user, notes="RCA created")
messages.success(self.request, "Root Cause Analysis created successfully!")
return redirect("rca:rca_detail", pk=rca.pk)
class RCAUpdateView(LoginRequiredMixin, UpdateView):
"""Update view for Root Cause Analysis"""
model = RootCauseAnalysis
form_class = RootCauseAnalysisForm
template_name = "rca/rca_form.html"
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs["user"] = self.request.user
kwargs["request"] = self.request
return kwargs
def get_object(self, queryset=None):
obj = super().get_object(queryset)
_check_rca_access(self.request, obj)
return obj
def get_success_url(self):
return reverse_lazy("rca:rca_detail", kwargs={"pk": self.object.pk})
def form_valid(self, form):
rca = form.save()
messages.success(self.request, "Root Cause Analysis updated successfully!")
return redirect(self.get_success_url())
class RCADeleteView(LoginRequiredMixin, View):
"""Delete (soft delete) view for Root Cause Analysis"""
def post(self, request, pk):
rca = get_object_or_404(RootCauseAnalysis, pk=pk, is_deleted=False)
_check_rca_access(request, rca)
rca.soft_delete(user=request.user)
messages.success(request, "Root Cause Analysis deleted successfully!")
return redirect("rca:rca_list")
class RCAStatusChangeView(LoginRequiredMixin, View):
"""View to change RCA status"""
def post(self, request, pk):
rca = get_object_or_404(RootCauseAnalysis, pk=pk, is_deleted=False)
_check_rca_access(request, rca)
form = RCAStatusChangeForm(request.POST)
if form.is_valid():
old_status = rca.status
new_status = form.cleaned_data["new_status"]
notes = form.cleaned_data["notes"]
rca.status = new_status
# Set timestamps based on status
if new_status == RCAStatus.IN_PROGRESS:
rca.assigned_to = request.user
rca.assigned_at = timezone.now()
elif new_status == RCAStatus.APPROVED:
rca.approved_by = request.user
rca.approved_at = timezone.now()
rca.save()
# Create status log
rca.status_logs.create(old_status=old_status, new_status=new_status, changed_by=request.user, notes=notes)
messages.success(request, f"Status changed from {old_status} to {new_status}")
else:
messages.error(request, "Invalid status change")
return redirect("rca:rca_detail", pk=rca.pk)
class RCAApprovalView(LoginRequiredMixin, View):
"""View to approve RCA"""
def post(self, request, pk):
rca = get_object_or_404(RootCauseAnalysis, pk=pk, is_deleted=False, status=RCAStatus.REVIEW)
_check_rca_access(request, rca)
form = RCAApprovalForm(request.POST)
if form.is_valid():
rca.status = RCAStatus.APPROVED
rca.approved_by = request.user
rca.approved_at = timezone.now()
rca.approval_notes = form.cleaned_data["approval_notes"]
rca.save()
# Create status log
rca.status_logs.create(
old_status=RCAStatus.REVIEW,
new_status=RCAStatus.APPROVED,
changed_by=request.user,
notes=rca.approval_notes,
)
messages.success(request, "RCA approved successfully!")
else:
messages.error(request, "Invalid approval data")
return redirect("rca:rca_detail", pk=rca.pk)
class RCAClosureView(LoginRequiredMixin, View):
"""View to close RCA"""
def post(self, request, pk):
rca = get_object_or_404(
RootCauseAnalysis, pk=pk, is_deleted=False, status__in=[RCAStatus.APPROVED, RCAStatus.IN_PROGRESS]
)
_check_rca_access(request, rca)
form = RCAClosureForm(request.POST)
if form.is_valid():
rca.status = RCAStatus.CLOSED
rca.closed_by = request.user
rca.closed_at = timezone.now()
rca.closure_notes = form.cleaned_data["closure_notes"]
rca.actual_completion_date = form.cleaned_data["actual_completion_date"]
rca.save()
# Create status log
rca.status_logs.create(
old_status=RCAStatus.APPROVED,
new_status=RCAStatus.CLOSED,
changed_by=request.user,
notes=rca.closure_notes,
)
messages.success(request, "RCA closed successfully!")
else:
messages.error(request, "Invalid closure data")
return redirect("rca:rca_detail", pk=rca.pk)
class RCARootCauseCreateView(LoginRequiredMixin, View):
"""View to add a root cause to RCA"""
def post(self, request, pk):
rca = get_object_or_404(RootCauseAnalysis, pk=pk, is_deleted=False)
_check_rca_access(request, rca)
form = RCARootCauseForm(request.POST)
if form.is_valid():
root_cause = form.save(commit=False)
root_cause.rca = rca
root_cause.save()
messages.success(request, "Root cause added successfully!")
else:
messages.error(request, "Invalid root cause data")
return redirect("rca:rca_detail", pk=rca.pk)
class RCARootCauseDeleteView(LoginRequiredMixin, View):
"""View to delete a root cause"""
def post(self, request, pk, root_cause_pk):
rca = get_object_or_404(RootCauseAnalysis, pk=pk, is_deleted=False)
_check_rca_access(request, rca)
root_cause = get_object_or_404(RCARootCause, pk=root_cause_pk, rca=rca)
root_cause.delete()
messages.success(request, "Root cause deleted successfully!")
return redirect("rca:rca_detail", pk=rca.pk)
class RCACorrectiveActionCreateView(LoginRequiredMixin, View):
"""View to add a corrective action to RCA"""
def post(self, request, pk):
rca = get_object_or_404(RootCauseAnalysis, pk=pk, is_deleted=False)
_check_rca_access(request, rca)
form = RCACorrectiveActionForm(request.POST, rca=rca)
if form.is_valid():
action = form.save(commit=False)
action.rca = rca
action.save()
messages.success(request, "Corrective action added successfully!")
else:
messages.error(request, "Invalid corrective action data")
return redirect("rca:rca_detail", pk=rca.pk)
class RCACorrectiveActionDeleteView(LoginRequiredMixin, View):
"""View to delete a corrective action"""
def post(self, request, pk, action_pk):
rca = get_object_or_404(RootCauseAnalysis, pk=pk, is_deleted=False)
_check_rca_access(request, rca)
action = get_object_or_404(RCACorrectiveAction, pk=action_pk, rca=rca)
action.delete()
messages.success(request, "Corrective action deleted successfully!")
return redirect("rca:rca_detail", pk=rca.pk)
class RCAAttachmentCreateView(LoginRequiredMixin, View):
"""View to add an attachment to RCA"""
def post(self, request, pk):
rca = get_object_or_404(RootCauseAnalysis, pk=pk, is_deleted=False)
_check_rca_access(request, rca)
form = RCAAttachmentForm(request.POST, request.FILES)
if form.is_valid():
attachment = form.save(commit=False)
attachment.rca = rca
attachment.uploaded_by = request.user
attachment.filename = request.FILES["file"].name
attachment.file_type = request.FILES["file"].content_type
attachment.file_size = request.FILES["file"].size
attachment.save()
messages.success(request, "Attachment added successfully!")
else:
messages.error(request, "Invalid attachment data")
return redirect("rca:rca_detail", pk=rca.pk)
class RCANoteCreateView(LoginRequiredMixin, View):
"""View to add a note to RCA"""
def post(self, request, pk):
rca = get_object_or_404(RootCauseAnalysis, pk=pk, is_deleted=False)
_check_rca_access(request, rca)
form = RCANoteForm(request.POST)
if form.is_valid():
note = form.save(commit=False)
note.rca = rca
note.created_by = request.user
note.save()
messages.success(request, "Note added successfully!")
else:
messages.error(request, "Invalid note data")
return redirect("rca:rca_detail", pk=rca.pk)