HH/apps/complaints/views.py
2026-01-12 12:08:18 +03:00

1302 lines
44 KiB
Python

"""
Complaints views and viewsets
"""
from django.db.models import Q
from django.utils import timezone
from rest_framework import status, viewsets
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from apps.core.services import AuditService
from .models import Complaint, ComplaintAttachment, ComplaintUpdate, Inquiry
from .serializers import (
ComplaintAttachmentSerializer,
ComplaintListSerializer,
ComplaintSerializer,
ComplaintUpdateSerializer,
InquirySerializer,
)
def map_complaint_category_to_action_category(complaint_category_code):
    """
    Translate a complaint category code into a PX Action category.

    Matching is case-insensitive and runs in two passes: first an exact
    match of the whole code against the keyword table, then the first
    keyword (in table order) that occurs anywhere inside the code.
    An empty code, or a code matching no keyword, yields 'other'.
    """
    if not complaint_category_code:
        return 'other'

    # Keywords grouped by the action category they map to. Group order and
    # keyword order define the priority of the substring pass below.
    keyword_groups = (
        # Clinical issues
        ('clinical_quality', (
            'clinical', 'medical', 'diagnosis', 'treatment', 'medication', 'care',
        )),
        # Safety issues
        ('patient_safety', (
            'safety', 'risk', 'incident', 'infection', 'harm',
        )),
        # Service quality
        ('service_quality', (
            'service', 'communication', 'wait', 'response',
            'customer_service', 'timeliness', 'waiting_time',
        )),
        # Staff behavior
        ('staff_behavior', (
            'staff', 'behavior', 'attitude', 'professionalism', 'rude', 'respect',
        )),
        # Facility
        ('facility', (
            'facility', 'environment', 'cleanliness', 'equipment',
            'infrastructure', 'parking', 'accessibility',
        )),
        # Process
        ('process_improvement', (
            'process', 'administrative', 'billing', 'procedure',
            'workflow', 'registration', 'appointment',
        )),
    )
    keyword_to_category = {
        keyword: action_category
        for action_category, keywords in keyword_groups
        for keyword in keywords
    }

    code = complaint_category_code.lower()

    # Pass 1: the code is itself a known keyword
    exact = keyword_to_category.get(code)
    if exact is not None:
        return exact

    # Pass 2: first keyword contained in the code, else the fallback
    return next(
        (
            action_category
            for keyword, action_category in keyword_to_category.items()
            if keyword in code
        ),
        'other',
    )
class ComplaintViewSet(viewsets.ModelViewSet):
    """
    ViewSet for Complaints with workflow actions.

    Permissions:
    - All authenticated users can view complaints
    - PX Admins and Hospital Admins can create/manage complaints
    """
    # NOTE(review): only IsAuthenticated is enforced at class level; the
    # admin-only rule stated above is not visible in these attributes.
    queryset = Complaint.objects.all()
    permission_classes = [IsAuthenticated]

    # Fields exposed for exact-match filtering
    filterset_fields = [
        'status',
        'severity',
        'priority',
        'category',
        'source',
        'hospital',
        'department',
        'staff',
        'assigned_to',
        'is_overdue',
        'hospital__organization',
    ]

    # Fields covered by free-text search
    search_fields = [
        'title',
        'description',
        'patient__mrn',
        'patient__first_name',
        'patient__last_name',
    ]

    # Sortable fields; newest complaints first by default
    ordering_fields = [
        'created_at',
        'due_at',
        'severity',
    ]
    ordering = ['-created_at']
def get_serializer_class(self):
    """Return the simplified serializer for the 'list' action, the full one otherwise."""
    return ComplaintListSerializer if self.action == 'list' else ComplaintSerializer
def get_queryset(self):
    """
    Return the complaints the requesting user may see.

    PX Admins get everything; Hospital Admins get their hospital;
    Department Managers get their department; any other user with a
    hospital gets that hospital; everyone else gets an empty queryset.
    """
    complaints = (
        super().get_queryset()
        .select_related(
            'patient', 'hospital', 'department', 'staff',
            'assigned_to', 'resolved_by', 'closed_by'
        )
        .prefetch_related('attachments', 'updates')
    )
    user = self.request.user

    # PX Admins are not scoped at all
    if user.is_px_admin():
        return complaints

    # Work out the single filter that scopes this user, if any
    scope = None
    if user.is_hospital_admin() and user.hospital:
        scope = {'hospital': user.hospital}
    elif user.is_department_manager() and user.department:
        scope = {'department': user.department}
    elif user.hospital:
        scope = {'hospital': user.hospital}

    if scope is None:
        return complaints.none()
    return complaints.filter(**scope)
def perform_create(self, serializer):
    """
    Save the new complaint and record a 'complaint_created' audit event.

    The audit metadata carries the complaint's category, severity and the
    patient's MRN. The MRN is recorded as None when the complaint has no
    patient, instead of failing the request after the complaint is saved.
    """
    complaint = serializer.save()

    # Other actions in this file guard on `complaint.patient`, so the
    # patient is treated as optional here too.
    patient = complaint.patient
    patient_mrn = patient.mrn if patient else None

    AuditService.log_from_request(
        event_type='complaint_created',
        description=f"Complaint created: {complaint.title}",
        request=self.request,
        content_object=complaint,
        metadata={
            # NOTE(review): elsewhere `complaint.category.code` is read, so
            # this may be a model instance rather than a plain value — confirm
            # AuditService can serialize it.
            'category': complaint.category,
            'severity': complaint.severity,
            'patient_mrn': patient_mrn
        }
    )
    # TODO: Optionally create PX Action (Phase 6)
    # from apps.complaints.tasks import create_action_from_complaint
    # create_action_from_complaint.delay(str(complaint.id))
@action(detail=True, methods=['post'])
def assign(self, request, pk=None):
    """
    Assign the complaint to the user given by 'user_id' in the request data.

    Responds 400 when 'user_id' is missing and 404 when no such user exists.
    On success the assignment is stamped with the current time, a timeline
    update is created and an audit event is logged.
    """
    complaint = self.get_object()

    user_id = request.data.get('user_id')
    if not user_id:
        return Response(
            {'error': 'user_id is required'},
            status=status.HTTP_400_BAD_REQUEST
        )

    from apps.accounts.models import User
    try:
        assignee = User.objects.get(id=user_id)
    except User.DoesNotExist:
        return Response(
            {'error': 'User not found'},
            status=status.HTTP_404_NOT_FOUND
        )

    complaint.assigned_to = assignee
    complaint.assigned_at = timezone.now()
    complaint.save(update_fields=['assigned_to', 'assigned_at'])

    assignee_name = assignee.get_full_name()

    # Timeline entry
    ComplaintUpdate.objects.create(
        complaint=complaint,
        update_type='assignment',
        message=f"Assigned to {assignee_name}",
        created_by=request.user
    )

    # Audit trail
    AuditService.log_from_request(
        event_type='assignment',
        description=f"Complaint assigned to {assignee_name}",
        request=request,
        content_object=complaint
    )

    return Response({'message': 'Complaint assigned successfully'})
@action(detail=True, methods=['post'])
def change_status(self, request, pk=None):
    """
    Change the complaint's status and record the transition.

    Request data:
        status: required; must be one of the choices declared on
            Complaint.status.
        note: optional; used as the timeline message instead of the
            default "Status changed from X to Y".

    Responds 400 when 'status' is missing or is not a declared choice.
    Moving to 'resolved' or 'closed' stamps the matching timestamp/user
    fields; moving to 'closed' also queues the resolution satisfaction
    survey once the change has been committed.
    """
    complaint = self.get_object()
    new_status = request.data.get('status')
    note = request.data.get('note', '')

    if not new_status:
        return Response(
            {'error': 'status is required'},
            status=status.HTTP_400_BAD_REQUEST
        )

    # Model.save() does not enforce field choices, so reject unknown values
    # here rather than persisting an arbitrary string as the status.
    valid_statuses = [
        value for value, _label in Complaint._meta.get_field('status').flatchoices
    ]
    if valid_statuses and new_status not in valid_statuses:
        options = ", ".join(str(value) for value in valid_statuses)
        return Response(
            {'error': f'Invalid status. Valid options: {options}'},
            status=status.HTTP_400_BAD_REQUEST
        )

    old_status = complaint.status
    complaint.status = new_status

    # Handle status-specific logic
    if new_status == 'resolved':
        complaint.resolved_at = timezone.now()
        complaint.resolved_by = request.user
    elif new_status == 'closed':
        complaint.closed_at = timezone.now()
        complaint.closed_by = request.user

    complaint.save()

    if new_status == 'closed':
        # Queue the survey only after the closed state is committed, so the
        # worker cannot pick the task up and read the pre-close state.
        from django.db import transaction
        from apps.complaints.tasks import send_complaint_resolution_survey
        complaint_id = str(complaint.id)
        transaction.on_commit(
            lambda: send_complaint_resolution_survey.delay(complaint_id)
        )

    # Create update
    ComplaintUpdate.objects.create(
        complaint=complaint,
        update_type='status_change',
        message=note or f"Status changed from {old_status} to {new_status}",
        created_by=request.user,
        old_status=old_status,
        new_status=new_status
    )

    AuditService.log_from_request(
        event_type='status_change',
        description=f"Complaint status changed from {old_status} to {new_status}",
        request=request,
        content_object=complaint,
        metadata={'old_status': old_status, 'new_status': new_status}
    )

    return Response({'message': 'Status updated successfully'})
@action(detail=True, methods=['post'])
def add_note(self, request, pk=None):
    """
    Attach a free-text note to the complaint's timeline.

    Responds 400 when 'note' is missing or empty; otherwise responds 201
    with the serialized ComplaintUpdate that was created.
    """
    complaint = self.get_object()

    note_text = request.data.get('note')
    if not note_text:
        return Response(
            {'error': 'note is required'},
            status=status.HTTP_400_BAD_REQUEST
        )

    # Notes are stored as timeline updates of type 'note'
    created_update = ComplaintUpdate.objects.create(
        complaint=complaint,
        update_type='note',
        message=note_text,
        created_by=request.user
    )

    return Response(
        ComplaintUpdateSerializer(created_update).data,
        status=status.HTTP_201_CREATED
    )
@action(detail=True, methods=['get'])
def staff_suggestions(self, request, pk=None):
    """
    Get staff matching suggestions for a complaint.

    Reads the 'ai_analysis' entry of the complaint's metadata and returns
    the extracted staff name, the candidate matches, the currently matched
    staff id and whether manual review is needed. A complaint without
    metadata or without AI analysis yields empty/default values.

    Responds 403 unless the requesting user is a PX Admin.
    """
    complaint = self.get_object()

    # Check if user is PX Admin
    if not request.user.is_px_admin():
        return Response(
            {'error': 'Only PX Admins can access staff suggestions'},
            status=status.HTTP_403_FORBIDDEN
        )

    # Metadata may be empty (assign_staff guards for that as well), so
    # default every level instead of calling .get() on a missing value.
    ai_analysis = (complaint.metadata or {}).get('ai_analysis') or {}
    staff_matches = ai_analysis.get('staff_matches') or []
    extracted_name = ai_analysis.get('extracted_staff_name', '')
    needs_staff_review = ai_analysis.get('needs_staff_review', False)
    matched_staff_id = ai_analysis.get('matched_staff_id')

    return Response({
        'extracted_name': extracted_name,
        'staff_matches': staff_matches,
        'current_staff_id': matched_staff_id,
        'needs_staff_review': needs_staff_review,
        'staff_match_count': len(staff_matches)
    })
@action(detail=True, methods=['get'])
def hospital_staff(self, request, pk=None):
    """
    List active staff of the complaint's hospital for manual selection.

    Query parameters:
        department_id: optional; restrict to one department.
        search: optional; case-insensitive match on English/Arabic first
            and last names or on job title.

    Responds 403 unless the requesting user is a PX Admin.
    """
    complaint = self.get_object()

    # PX Admin only
    if not request.user.is_px_admin():
        return Response(
            {'error': 'Only PX Admins can access hospital staff list'},
            status=status.HTTP_403_FORBIDDEN
        )

    from apps.organizations.models import Staff

    department_id = request.query_params.get('department_id')
    search = request.query_params.get('search', '').strip()

    # Base set: active staff of the complaint's hospital
    staff_qs = Staff.objects.filter(
        hospital=complaint.hospital,
        status='active'
    ).select_related('department')

    if department_id:
        staff_qs = staff_qs.filter(department_id=department_id)

    if search:
        name_or_title_match = (
            Q(first_name__icontains=search) |
            Q(last_name__icontains=search) |
            Q(first_name_ar__icontains=search) |
            Q(last_name_ar__icontains=search) |
            Q(job_title__icontains=search)
        )
        staff_qs = staff_qs.filter(name_or_title_match)

    staff_qs = staff_qs.order_by('department__name', 'first_name', 'last_name')

    # Flatten each staff row into a plain dict for the response
    staff_list = [
        {
            'id': str(member.id),
            'name_en': f"{member.first_name} {member.last_name}",
            'name_ar': f"{member.first_name_ar} {member.last_name_ar}" if member.first_name_ar and member.last_name_ar else "",
            'job_title': member.job_title,
            'specialization': member.specialization,
            'department': member.department.name if member.department else None,
            'department_id': str(member.department.id) if member.department else None
        }
        for member in staff_qs
    ]

    return Response({
        'hospital_id': str(complaint.hospital.id),
        'hospital_name': complaint.hospital.name,
        'staff_count': len(staff_list),
        'staff': staff_list
    })
@action(detail=True, methods=['post'])
def assign_staff(self, request, pk=None):
    """
    Manually set the staff member a complaint is about.

    Request data:
        staff_id: required; must belong to the complaint's hospital.
        reason: optional free text, appended to the timeline message.

    Responds 403 for non PX Admins, 400 for a missing staff_id or a staff
    member from another hospital, and 404 for an unknown staff_id. When the
    complaint carries AI analysis metadata, its review flag is cleared and
    the manual assignment is recorded there.
    """
    complaint = self.get_object()

    # PX Admin only
    if not request.user.is_px_admin():
        return Response(
            {'error': 'Only PX Admins can assign staff to complaints'},
            status=status.HTTP_403_FORBIDDEN
        )

    staff_id = request.data.get('staff_id')
    reason = request.data.get('reason', '')
    if not staff_id:
        return Response(
            {'error': 'staff_id is required'},
            status=status.HTTP_400_BAD_REQUEST
        )

    from apps.organizations.models import Staff
    try:
        staff = Staff.objects.get(id=staff_id)
    except Staff.DoesNotExist:
        return Response(
            {'error': 'Staff not found'},
            status=status.HTTP_404_NOT_FOUND
        )

    # Staff must come from the complaint's own hospital
    if staff.hospital != complaint.hospital:
        return Response(
            {'error': 'Staff does not belong to complaint hospital'},
            status=status.HTTP_400_BAD_REQUEST
        )

    # Remember the previous staff for the timeline/audit records
    previous_staff = complaint.staff
    old_staff_id = str(previous_staff.id) if previous_staff else None
    new_staff_id = str(staff.id)
    staff_name = f"{staff.first_name} {staff.last_name}"

    complaint.staff = staff
    complaint.save(update_fields=['staff'])

    # Clear the AI review flag and note who assigned the staff manually
    if not complaint.metadata:
        complaint.metadata = {}
    if 'ai_analysis' in complaint.metadata:
        complaint.metadata['ai_analysis'].update({
            'needs_staff_review': False,
            'staff_manually_assigned': True,
            'staff_assigned_by': str(request.user.id),
            'staff_assigned_at': timezone.now().isoformat(),
            'staff_assignment_reason': reason,
        })
        complaint.save(update_fields=['metadata'])

    # Timeline entry; the reason is appended only when one was given
    timeline_message = f"Staff assigned to {staff_name} ({staff.job_title})"
    if reason:
        timeline_message = f"{timeline_message}. {reason}"
    ComplaintUpdate.objects.create(
        complaint=complaint,
        update_type='assignment',
        message=timeline_message,
        created_by=request.user,
        metadata={
            'old_staff_id': old_staff_id,
            'new_staff_id': new_staff_id,
            'manual_assignment': True
        }
    )

    # Audit trail
    AuditService.log_from_request(
        event_type='staff_assigned',
        description=f"Staff {staff_name} manually assigned to complaint by {request.user.get_full_name()}",
        request=request,
        content_object=complaint,
        metadata={
            'old_staff_id': old_staff_id,
            'new_staff_id': new_staff_id,
            'reason': reason
        }
    )

    return Response({
        'message': 'Staff assigned successfully',
        'staff_id': new_staff_id,
        'staff_name': staff_name
    })
@action(detail=True, methods=['post'])
def change_department(self, request, pk=None):
    """
    Move the complaint to another department of the same hospital.

    Responds 400 when 'department_id' is missing or the department belongs
    to a different hospital, and 404 when the department does not exist.
    On success a timeline update and an audit event are recorded.
    """
    complaint = self.get_object()

    department_id = request.data.get('department_id')
    if not department_id:
        return Response(
            {'error': 'department_id is required'},
            status=status.HTTP_400_BAD_REQUEST
        )

    from apps.organizations.models import Department
    try:
        department = Department.objects.get(id=department_id)
    except Department.DoesNotExist:
        return Response(
            {'error': 'Department not found'},
            status=status.HTTP_404_NOT_FOUND
        )

    # Department must come from the complaint's own hospital
    if department.hospital != complaint.hospital:
        return Response(
            {'error': 'Department does not belong to complaint hospital'},
            status=status.HTTP_400_BAD_REQUEST
        )

    previous_department = complaint.department
    change_metadata = {
        'old_department_id': str(previous_department.id) if previous_department else None,
        'new_department_id': str(department.id)
    }

    complaint.department = department
    complaint.save(update_fields=['department'])

    # Timeline entry (gets its own copy of the metadata dict)
    ComplaintUpdate.objects.create(
        complaint=complaint,
        update_type='assignment',
        message=f"Department changed to {department.name}",
        created_by=request.user,
        metadata=dict(change_metadata)
    )

    # Audit trail
    AuditService.log_from_request(
        event_type='department_change',
        description=f"Complaint department changed to {department.name}",
        request=request,
        content_object=complaint,
        metadata=dict(change_metadata)
    )

    return Response({
        'message': 'Department changed successfully',
        'department_id': str(department.id),
        'department_name': department.name
    })
@action(detail=True, methods=['post'])
def create_action_from_ai(self, request, pk=None):
    """
    Create a PX Action from an AI-suggested action.

    Request data (all optional):
        suggested_action: action text; defaults to 'suggested_action_en'
            from the complaint's AI analysis metadata.
        category: PX Action category; defaults to a mapping of the
            complaint's category code, or 'other' without a category.
        assigned_to: id of the user to assign the action to.

    Responds 400 when no suggested action is available or the category is
    invalid, 404 when the assigned user does not exist, and 201 with the
    new action id on success.
    """
    complaint = self.get_object()

    # Explicit text wins; otherwise fall back to the stored AI suggestion
    suggested_action = request.data.get('suggested_action')
    if not suggested_action and complaint.metadata and 'ai_analysis' in complaint.metadata:
        suggested_action = complaint.metadata['ai_analysis'].get('suggested_action_en')
    if not suggested_action:
        return Response(
            {'error': 'No suggested action available for this complaint'},
            status=status.HTTP_400_BAD_REQUEST
        )

    # Explicit category wins; otherwise derive it from the complaint
    category = request.data.get('category')
    if not category:
        category = (
            map_complaint_category_to_action_category(complaint.category.code)
            if complaint.category
            else 'other'
        )

    valid_categories = [
        'clinical_quality', 'patient_safety', 'service_quality',
        'staff_behavior', 'facility', 'process_improvement', 'other'
    ]
    if category not in valid_categories:
        return Response(
            {'error': f'Invalid category. Valid options: {", ".join(valid_categories)}'},
            status=status.HTTP_400_BAD_REQUEST
        )

    # Optional assignee
    assigned_to = None
    assigned_to_id = request.data.get('assigned_to')
    if assigned_to_id:
        from apps.accounts.models import User
        try:
            assigned_to = User.objects.get(id=assigned_to_id)
        except User.DoesNotExist:
            return Response(
                {'error': 'Assigned user not found'},
                status=status.HTTP_404_NOT_FOUND
            )

    from apps.px_action_center.models import PXAction, PXActionLog
    from django.contrib.contenttypes.models import ContentType

    complaint_id = str(complaint.id)

    # The action points back at the complaint through a generic relation
    px_action = PXAction.objects.create(
        source_type='complaint',
        content_type=ContentType.objects.get_for_model(Complaint),
        object_id=complaint.id,
        title=f"Action from Complaint: {complaint.title}",
        description=suggested_action,
        hospital=complaint.hospital,
        department=complaint.department,
        category=category,
        priority=complaint.priority,
        severity=complaint.severity,
        assigned_to=assigned_to,
        status='open',
        metadata={
            'source_complaint_id': complaint_id,
            'source_complaint_title': complaint.title,
            'ai_generated': True,
            'created_from_ai_suggestion': True
        }
    )
    action_id = str(px_action.id)

    # Log entry on the action side
    PXActionLog.objects.create(
        action=px_action,
        log_type='note',
        message=f"Action created from AI-suggested action for complaint: {complaint.title}",
        created_by=request.user,
        metadata={
            'complaint_id': complaint_id,
            'ai_generated': True
        }
    )

    # Timeline entry on the complaint side
    ComplaintUpdate.objects.create(
        complaint=complaint,
        update_type='note',
        message=f"PX Action created from AI-suggested action (Action #{px_action.id})",
        created_by=request.user,
        metadata={'action_id': action_id}
    )

    # Audit trail
    AuditService.log_from_request(
        event_type='action_created_from_ai',
        description=f"PX Action created from AI-suggested action for complaint: {complaint.title}",
        request=request,
        content_object=px_action,
        metadata={
            'complaint_id': complaint_id,
            'category': category,
            'ai_generated': True
        }
    )

    return Response({
        'action_id': action_id,
        'message': 'Action created successfully from AI-suggested action'
    }, status=status.HTTP_201_CREATED)
@action(detail=True, methods=['post'])
def send_notification(self, request, pk=None):
    """
    Send an email notification to a staff member or department head.

    Request data:
        email_message: required summary text placed in the email body.
        additional_message: optional extra section of the email body.

    Recipient priority (the first candidate that has an email address):
        1. The complaint staff's user account
        2. The complaint staff's own email field
        3. The complaint department's manager

    A candidate without an email address is skipped so the next priority
    is tried. Responds 400 when 'email_message' is empty or no candidate
    has an email, and 500 when the notification service raises. On success
    a timeline update and an audit event are recorded.
    """
    complaint = self.get_object()

    # Get email message (required)
    email_message = request.data.get('email_message', '').strip()
    if not email_message:
        return Response(
            {'error': 'email_message is required'},
            status=status.HTTP_400_BAD_REQUEST
        )

    # Get additional message (optional)
    additional_message = request.data.get('additional_message', '').strip()

    # Candidates for each priority level
    staff = complaint.staff
    staff_user = staff.user if staff else None
    department = complaint.department
    manager = department.manager if department else None

    recipient = None
    recipient_display = None
    recipient_type = None
    recipient_email = None

    # Each branch also requires a usable email so that an account without
    # one does not block the lower-priority fallbacks.
    if staff_user and staff_user.email:
        # Priority 1: Staff member with user account
        recipient = staff_user
        recipient_display = str(staff)
        recipient_type = 'Staff Member (User Account)'
        recipient_email = staff_user.email
    elif staff and staff.email:
        # Priority 2: Staff member with email field
        recipient_display = str(staff)
        recipient_type = 'Staff Member (Email)'
        recipient_email = staff.email
    elif manager and manager.email:
        # Priority 3: Department head
        recipient = manager
        recipient_display = manager.get_full_name()
        recipient_type = 'Department Head'
        recipient_email = manager.email

    if not recipient_email:
        return Response(
            {'error': 'No valid recipient found. Complaint must have staff with email, or a department manager with email.'},
            status=status.HTTP_400_BAD_REQUEST
        )

    recipient_id = str(recipient.id) if recipient else None

    # Construct email content
    subject = f"Complaint Notification - #{complaint.id}"
    email_body = f"""
Dear {recipient_display},
You have been assigned to review the following complaint:
COMPLAINT DETAILS:
----------------
ID: #{complaint.id}
Title: {complaint.title}
Severity: {complaint.get_severity_display()}
Priority: {complaint.get_priority_display()}
Status: {complaint.get_status_display()}
SUMMARY:
--------
{email_message}
"""
    # Add patient info if available
    if complaint.patient:
        email_body += f"""
PATIENT INFORMATION:
------------------
Name: {complaint.patient.get_full_name()}
MRN: {complaint.patient.mrn}
"""
    # Add additional message if provided
    if additional_message:
        email_body += f"""
ADDITIONAL MESSAGE:
------------------
{additional_message}
"""
    # Add link to complaint
    from django.contrib.sites.shortcuts import get_current_site
    site = get_current_site(request)
    complaint_url = f"https://{site.domain}/complaints/{complaint.id}/"
    email_body += f"""
To view the full complaint details, please visit:
{complaint_url}
Thank you for your attention to this matter.
---
This is an automated message from PX360 Complaint Management System.
"""

    # Send email using NotificationService
    from apps.notifications.services import NotificationService
    try:
        notification_log = NotificationService.send_email(
            email=recipient_email,
            subject=subject,
            message=email_body,
            related_object=complaint,
            metadata={
                'notification_type': 'complaint_notification',
                'recipient_type': recipient_type,
                'recipient_id': recipient_id,
                'sender_id': str(request.user.id),
                'has_additional_message': bool(additional_message)
            }
        )
    except Exception as e:
        # Boundary handler: report the failure to the caller as a 500
        return Response(
            {'error': f'Failed to send email: {str(e)}'},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR
        )

    # Create ComplaintUpdate entry
    ComplaintUpdate.objects.create(
        complaint=complaint,
        update_type='communication',
        message=f"Email notification sent to {recipient_type}: {recipient_display}",
        created_by=request.user,
        metadata={
            'recipient_type': recipient_type,
            'recipient_id': recipient_id,
            'notification_log_id': str(notification_log.id) if notification_log else None
        }
    )

    # Log audit
    AuditService.log_from_request(
        event_type='notification_sent',
        description=f"Email notification sent to {recipient_type}: {recipient_display}",
        request=request,
        content_object=complaint,
        metadata={
            'recipient_type': recipient_type,
            'recipient_id': recipient_id,
            'recipient_email': recipient_email
        }
    )

    return Response({
        'success': True,
        'message': 'Email notification sent successfully',
        'recipient': recipient_display,
        'recipient_type': recipient_type,
        'recipient_email': recipient_email
    })
@action(detail=True, methods=['post'])
def request_explanation(self, request, pk=None):
    """
    Request an explanation from the complaint's staff member by email.

    Generates a fresh token, stores it on the ComplaintExplanation record
    for this complaint/staff pair (creating the record if needed) and
    emails a link containing the token.

    Request data:
        request_message: optional text added to the email.

    Responds 400 when the complaint has no staff, when the staff member
    already submitted an explanation, or when the staff member has no
    email address; responds 500 when the notification service raises.
    The email address is resolved before anything is written, so a 400
    response leaves the explanation record and its token untouched.
    """
    complaint = self.get_object()
    staff = complaint.staff

    # Check if complaint has staff to request explanation from
    if not staff:
        return Response(
            {'error': 'Complaint has no staff assigned to request explanation from'},
            status=status.HTTP_400_BAD_REQUEST
        )

    # Check if explanation already exists for this staff
    from .models import ComplaintExplanation
    existing_explanation = ComplaintExplanation.objects.filter(
        complaint=complaint,
        staff=staff
    ).first()
    if existing_explanation and existing_explanation.is_used:
        return Response(
            {'error': 'This staff member has already submitted an explanation'},
            status=status.HTTP_400_BAD_REQUEST
        )

    # Resolve the recipient BEFORE rotating the token or stamping
    # email_sent_at, so an undeliverable request does not mutate state.
    if staff.user and staff.user.email:
        recipient_email = staff.user.email
    elif staff.email:
        recipient_email = staff.email
    else:
        return Response(
            {'error': 'Staff member has no email address'},
            status=status.HTTP_400_BAD_REQUEST
        )
    recipient_display = str(staff)

    # Get optional message
    request_message = request.data.get('request_message', '').strip()

    # Generate unique token
    import secrets
    token = secrets.token_urlsafe(32)

    # Create or update explanation record
    if existing_explanation:
        explanation = existing_explanation
        explanation.token = token
        explanation.is_used = False
        explanation.requested_by = request.user
        explanation.request_message = request_message
        explanation.email_sent_at = timezone.now()
        explanation.save()
    else:
        explanation = ComplaintExplanation.objects.create(
            complaint=complaint,
            staff=staff,
            token=token,
            is_used=False,
            submitted_via='email_link',
            requested_by=request.user,
            request_message=request_message,
            email_sent_at=timezone.now()
        )

    # Build the link the staff member will follow
    from django.contrib.sites.shortcuts import get_current_site
    from apps.notifications.services import NotificationService
    site = get_current_site(request)
    explanation_link = f"https://{site.domain}/complaints/{complaint.id}/explain/{token}/"

    # Build email subject
    subject = f"Explanation Request - Complaint #{complaint.id}"
    # Build email body
    email_body = f"""
Dear {recipient_display},
We have received a complaint that requires your explanation.
COMPLAINT DETAILS:
----------------
Reference: #{complaint.id}
Title: {complaint.title}
Severity: {complaint.get_severity_display()}
Priority: {complaint.get_priority_display()}
{complaint.description}
"""
    # Add patient info if available
    if complaint.patient:
        email_body += f"""
PATIENT INFORMATION:
------------------
Name: {complaint.patient.get_full_name()}
MRN: {complaint.patient.mrn}
"""
    # Add request message if provided
    if request_message:
        email_body += f"""
ADDITIONAL MESSAGE:
------------------
{request_message}
"""
    email_body += f"""
SUBMIT YOUR EXPLANATION:
------------------------
Your perspective is important. Please submit your explanation about this complaint:
{explanation_link}
Note: This link can only be used once. After submission, it will expire.
If you have any questions, please contact the PX team.
---
This is an automated message from PX360 Complaint Management System.
"""

    staff_id = str(staff.id)
    explanation_id = str(explanation.id)

    # Send email
    try:
        notification_log = NotificationService.send_email(
            email=recipient_email,
            subject=subject,
            message=email_body,
            related_object=complaint,
            metadata={
                'notification_type': 'explanation_request',
                'staff_id': staff_id,
                'explanation_id': explanation_id,
                'requested_by_id': str(request.user.id),
                'has_request_message': bool(request_message)
            }
        )
    except Exception as e:
        # Boundary handler: report the failure to the caller as a 500
        return Response(
            {'error': f'Failed to send email: {str(e)}'},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR
        )

    # Create ComplaintUpdate entry
    ComplaintUpdate.objects.create(
        complaint=complaint,
        update_type='communication',
        message=f"Explanation request sent to {recipient_display}",
        created_by=request.user,
        metadata={
            'explanation_id': explanation_id,
            'staff_id': staff_id,
            'notification_log_id': str(notification_log.id) if notification_log else None
        }
    )

    # Log audit
    AuditService.log_from_request(
        event_type='explanation_requested',
        description=f"Explanation request sent to {recipient_display}",
        request=request,
        content_object=complaint,
        metadata={
            'explanation_id': explanation_id,
            'staff_id': staff_id,
            'request_message': request_message
        }
    )

    return Response({
        'success': True,
        'message': 'Explanation request sent successfully',
        'explanation_id': explanation_id,
        'recipient': recipient_display,
        'explanation_link': explanation_link
    })
class ComplaintAttachmentViewSet(viewsets.ModelViewSet):
    """
    ViewSet for Complaint Attachments.

    Attachment visibility follows the role scoping used for complaints:
    PX Admins see everything, Hospital Admins their hospital, Department
    Managers their department, other users with a hospital that hospital,
    and everyone else nothing.
    """
    queryset = ComplaintAttachment.objects.all()
    serializer_class = ComplaintAttachmentSerializer
    permission_classes = [IsAuthenticated]
    filterset_fields = ['complaint']
    ordering = ['-created_at']

    def get_queryset(self):
        """Return the attachments whose complaint the requesting user may see."""
        queryset = super().get_queryset().select_related('complaint', 'uploaded_by')
        user = self.request.user

        # PX Admins see all attachments
        if user.is_px_admin():
            return queryset

        # Hospital Admins see attachments of their hospital's complaints
        if user.is_hospital_admin() and user.hospital:
            return queryset.filter(complaint__hospital=user.hospital)

        # Department Managers are limited to their department's complaints;
        # without this branch they could read attachments of complaints the
        # complaint list itself hides from them.
        if user.is_department_manager() and user.department:
            return queryset.filter(complaint__department=user.department)

        # Others see attachments of their hospital's complaints
        if user.hospital:
            return queryset.filter(complaint__hospital=user.hospital)

        return queryset.none()
class InquiryViewSet(viewsets.ModelViewSet):
    """ViewSet for Inquiries"""
    queryset = Inquiry.objects.all()
    serializer_class = InquirySerializer
    permission_classes = [IsAuthenticated]

    filterset_fields = [
        'status',
        'category',
        'hospital',
        'department',
        'assigned_to',
        'hospital__organization',
    ]
    search_fields = [
        'subject',
        'message',
        'contact_name',
        'patient__mrn',
    ]
    ordering_fields = ['created_at']
    ordering = ['-created_at']

    def get_queryset(self):
        """
        Return the inquiries the requesting user may see.

        PX Admins get everything; Hospital Admins get their hospital;
        Department Managers get their department; any other user with a
        hospital gets that hospital; everyone else gets an empty queryset.
        """
        inquiries = super().get_queryset().select_related(
            'patient', 'hospital', 'department', 'assigned_to', 'responded_by'
        )
        user = self.request.user

        # PX Admins are not scoped at all
        if user.is_px_admin():
            return inquiries

        # Work out the single filter that scopes this user, if any
        scope = None
        if user.is_hospital_admin() and user.hospital:
            scope = {'hospital': user.hospital}
        elif user.is_department_manager() and user.department:
            scope = {'department': user.department}
        elif user.hospital:
            scope = {'hospital': user.hospital}

        if scope is None:
            return inquiries.none()
        return inquiries.filter(**scope)

    @action(detail=True, methods=['post'])
    def respond(self, request, pk=None):
        """
        Store a response on the inquiry and mark it resolved.

        Responds 400 when 'response' is missing or empty.
        """
        inquiry = self.get_object()

        response_text = request.data.get('response')
        if not response_text:
            return Response(
                {'error': 'response is required'},
                status=status.HTTP_400_BAD_REQUEST
            )

        # Record who answered and when, and close the inquiry
        inquiry.response = response_text
        inquiry.responded_by = request.user
        inquiry.responded_at = timezone.now()
        inquiry.status = 'resolved'
        inquiry.save()

        return Response({'message': 'Response submitted successfully'})
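# Function-based views. complaint_explanation_form is public (token-protected,
# no login); generate_complaint_pdf checks authentication itself.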
from django.shortcuts import render, redirect, get_object_or_404
from django.http import JsonResponse
def complaint_explanation_form(request, complaint_id, token):
    """
    Public form through which staff submit an explanation for a complaint.

    No login is required: access is granted by a token that must match a
    ComplaintExplanation of the given complaint (404 otherwise). A token
    that was already used renders the "already submitted" page. A GET
    renders the form. A POST with non-empty 'explanation' text stores it,
    marks the token used, saves uploaded 'attachments', emails the
    complaint's assignee (best effort) and adds a timeline update.
    """
    from .models import ComplaintExplanation, ExplanationAttachment
    from apps.notifications.services import NotificationService
    from django.contrib.sites.shortcuts import get_current_site

    complaint = get_object_or_404(Complaint, id=complaint_id)
    explanation = get_object_or_404(ComplaintExplanation, complaint=complaint, token=token)

    page_context = {
        'complaint': complaint,
        'explanation': explanation
    }

    # One-time token: a second visit only gets the informational page
    if explanation.is_used:
        return render(request, 'complaints/explanation_already_submitted.html', dict(page_context))

    # Anything but a POST just displays the form
    if request.method != 'POST':
        return render(request, 'complaints/explanation_form.html', dict(page_context))

    explanation_text = request.POST.get('explanation', '').strip()
    if not explanation_text:
        return render(request, 'complaints/explanation_form.html', {
            **page_context,
            'error': 'Please provide your explanation.'
        })

    # Persist the explanation and burn the token
    explanation.explanation = explanation_text
    explanation.is_used = True
    explanation.responded_at = timezone.now()
    explanation.save()

    # Persist uploaded files
    files = request.FILES.getlist('attachments')
    for upload in files:
        ExplanationAttachment.objects.create(
            explanation=explanation,
            file=upload,
            filename=upload.name,
            file_type=upload.content_type,
            file_size=upload.size
        )

    staff_id = str(explanation.staff.id) if explanation.staff else None
    explanation_id = str(explanation.id)

    # Tell the complaint's assignee, when there is one with an email
    assignee = complaint.assigned_to
    if assignee and assignee.email:
        site = get_current_site(request)
        complaint_url = f"https://{site.domain}/complaints/{complaint.id}/"
        subject = f"New Explanation Received - Complaint #{complaint.id}"
        email_body = f"""
Dear {assignee.get_full_name()},
A new explanation has been submitted for the following complaint:
COMPLAINT DETAILS:
----------------
Reference: #{complaint.id}
Title: {complaint.title}
Severity: {complaint.get_severity_display()}
EXPLANATION SUBMITTED BY:
------------------------
{explanation.staff}
EXPLANATION:
-----------
{explanation.explanation}
"""
        if files:
            email_body += f"""
ATTACHMENTS:
------------
{len(files)} file(s) attached
"""
        email_body += f"""
To view the complaint and explanation, please visit:
{complaint_url}
---
This is an automated message from PX360 Complaint Management System.
"""
        try:
            NotificationService.send_email(
                email=assignee.email,
                subject=subject,
                message=email_body,
                related_object=complaint,
                metadata={
                    'notification_type': 'explanation_submitted',
                    'explanation_id': explanation_id,
                    'staff_id': staff_id
                }
            )
        except Exception as e:
            # A failed notification must not fail the submission itself
            import logging
            logger = logging.getLogger(__name__)
            logger.error(f"Failed to send notification email: {e}")

    # Timeline entry (no created_by: the submitter is not logged in)
    ComplaintUpdate.objects.create(
        complaint=complaint,
        update_type='communication',
        message=f"Explanation submitted by {explanation.staff}",
        metadata={
            'explanation_id': explanation_id,
            'staff_id': staff_id
        }
    )

    return render(request, 'complaints/explanation_success.html', {
        **page_context,
        'attachment_count': len(files)
    })
from django.http import HttpResponse
def generate_complaint_pdf(request, complaint_id):
    """
    Render a complaint as a PDF download using WeasyPrint.

    The 'complaints/complaint_pdf.html' template is rendered with the
    complaint and converted to PDF. The download is named
    'complaint_<id>_<YYYYmmdd_HHMMSS>.pdf' and a 'pdf_generated' audit
    event is logged.

    Responses:
        401 when the user is not authenticated (checked before any lookup,
            so anonymous callers cannot probe which ids exist)
        404 when the complaint does not exist
        403 when the user may not view the complaint
        500 when WeasyPrint is missing or PDF generation fails
    """
    # Authenticate first; only then touch the database
    user = request.user
    if not user.is_authenticated:
        return HttpResponse('Unauthorized', status=401)

    complaint = get_object_or_404(Complaint, id=complaint_id)

    # Check if user can view this complaint. Hospital/department matches
    # require the user's own value to be set, so that two unset values
    # comparing equal never grant access.
    can_view = False
    if user.is_px_admin():
        can_view = True
    elif (user.is_hospital_admin() and user.hospital is not None
            and user.hospital == complaint.hospital):
        can_view = True
    elif (user.is_department_manager() and user.department is not None
            and user.department == complaint.department):
        can_view = True
    elif user.hospital is not None and user.hospital == complaint.hospital:
        can_view = True
    if not can_view:
        return HttpResponse('Forbidden', status=403)

    # Render HTML template
    from django.template.loader import render_to_string
    html_string = render_to_string('complaints/complaint_pdf.html', {
        'complaint': complaint,
    })

    # Generate PDF using WeasyPrint
    try:
        from weasyprint import HTML
        pdf_file = HTML(string=html_string).write_pdf()

        # Create response. The timestamp comes from the clock: the
        # complaint id is a key, not a datetime, and has no strftime().
        response = HttpResponse(pdf_file, content_type='application/pdf')
        generated_at = timezone.now().strftime('%Y%m%d_%H%M%S')
        filename = f"complaint_{complaint.id}_{generated_at}.pdf"
        response['Content-Disposition'] = f'attachment; filename="{filename}"'

        # Log audit
        AuditService.log_from_request(
            event_type='pdf_generated',
            description=f"PDF generated for complaint: {complaint.title}",
            request=request,
            content_object=complaint,
            metadata={'complaint_id': str(complaint.id)}
        )
        return response
    except ImportError:
        return HttpResponse('WeasyPrint is not installed. Please install it to generate PDFs.', status=500)
    except Exception as e:
        import logging
        logger = logging.getLogger(__name__)
        # exception() keeps the traceback alongside the message
        logger.exception(f"Error generating PDF for complaint {complaint.id}: {e}")
        return HttpResponse(f'Error generating PDF: {str(e)}', status=500)