2025-12-24 12:42:31 +03:00

464 lines
16 KiB
Python

"""
PX Action Center UI views - Server-rendered templates for action center console
"""
from django.contrib import messages
from django.contrib.auth.decorators import login_required
from django.core.paginator import Paginator
from django.db.models import Q, Prefetch
from django.shortcuts import get_object_or_404, redirect, render
from django.utils import timezone
from django.views.decorators.http import require_http_methods
from apps.accounts.models import User
from apps.core.services import AuditService
from apps.organizations.models import Department, Hospital
from .models import (
PXAction,
PXActionAttachment,
PXActionLog,
ActionStatus,
ActionSource,
)
_DEFAULT_PAGE_SIZE = 25
_MAX_PAGE_SIZE = 100
_DEFAULT_ORDERING = '-created_at'


def _parse_page_size(raw_value):
    """Parse the ``page_size`` query parameter into a bounded positive int."""
    try:
        page_size = int(raw_value)
    except (TypeError, ValueError):
        return _DEFAULT_PAGE_SIZE
    if page_size < 1:
        # Zero/negative sizes would make the paginator divide by zero.
        return _DEFAULT_PAGE_SIZE
    return min(page_size, _MAX_PAGE_SIZE)


def _resolve_ordering(raw_value):
    """Return ``raw_value`` if it names a concrete PXAction field, else the default."""
    if not raw_value:
        return _DEFAULT_ORDERING
    field_name = raw_value[1:] if raw_value.startswith('-') else raw_value
    # Only direct, concrete model fields are sortable. This rejects unknown
    # names (which would raise FieldError) and ``relation__field`` traversals
    # (which would let a client order by arbitrary related columns).
    sortable = {
        field.name
        for field in PXAction._meta.get_fields()
        if getattr(field, 'concrete', False)
    }
    return raw_value if field_name in sortable else _DEFAULT_ORDERING


def _is_parseable_date(raw_value):
    """Return True if ``raw_value`` is a date or datetime string Django can parse."""
    from django.utils.dateparse import parse_date, parse_datetime

    try:
        return (
            parse_datetime(raw_value) is not None
            or parse_date(raw_value) is not None
        )
    except ValueError:
        # Well-formed but impossible values such as "2024-13-45".
        return False


@login_required
def action_list(request):
    """
    Render the paginated, filterable list of PX actions.

    The queryset is narrowed in this order: role-based scope, the ``view``
    preset, exact-match filters, free-text search on title/description and
    the creation-date range. It is then ordered and paginated.

    Query parameters read from ``request.GET``:
        view, status, severity, priority, category, source_type, hospital,
        department, assigned_to, search, date_from, date_to, order_by,
        page_size, page.

    Invalid ``order_by`` / ``page_size`` values fall back to defaults and
    invalid dates are ignored with a warning, instead of raising a server
    error.
    """
    user = request.user
    params = request.GET

    # Base queryset with the relations the list is expected to display.
    queryset = PXAction.objects.select_related(
        'hospital', 'department', 'assigned_to',
        'approved_by', 'closed_by', 'content_type',
    )

    # Role-based scope: the first matching role wins.
    if user.is_px_admin():
        pass  # PX admins see everything
    elif user.is_hospital_admin() and user.hospital:
        queryset = queryset.filter(hospital=user.hospital)
    elif user.is_department_manager() and user.department:
        queryset = queryset.filter(department=user.department)
    elif user.hospital:
        queryset = queryset.filter(hospital=user.hospital)
    else:
        queryset = queryset.none()

    # Named view presets (My Actions, Overdue, ...). Unknown names mean "all".
    view_filter = params.get('view', 'all')
    view_presets = {
        'my_actions': {'assigned_to': user},
        'overdue': {
            'is_overdue': True,
            'status__in': [ActionStatus.OPEN, ActionStatus.IN_PROGRESS],
        },
        'escalated': {'escalation_level__gt': 0},
        'pending_approval': {'status': ActionStatus.PENDING_APPROVAL},
        'from_surveys': {'source_type': ActionSource.SURVEY},
        'from_complaints': {
            'source_type__in': [
                ActionSource.COMPLAINT,
                ActionSource.COMPLAINT_RESOLUTION,
            ],
        },
        'from_social': {'source_type': ActionSource.SOCIAL_MEDIA},
    }
    if view_filter in view_presets:
        queryset = queryset.filter(**view_presets[view_filter])

    # Exact-match filters: query parameter -> ORM lookup.
    exact_filters = {
        'status': 'status',
        'severity': 'severity',
        'priority': 'priority',
        'category': 'category',
        'source_type': 'source_type',
        'hospital': 'hospital_id',
        'department': 'department_id',
        'assigned_to': 'assigned_to_id',
    }
    for param, lookup in exact_filters.items():
        value = params.get(param)
        if value:
            queryset = queryset.filter(**{lookup: value})

    # Free-text search.
    search_query = params.get('search')
    if search_query:
        queryset = queryset.filter(
            Q(title__icontains=search_query)
            | Q(description__icontains=search_query)
        )

    # Creation-date range. Values are validated first because an unparseable
    # string makes the ORM raise at filter time.
    # NOTE(review): a date-only ``date_to`` is presumably compared as midnight,
    # which would exclude records created later that day - confirm intended.
    date_lookups = (
        ('date_from', 'created_at__gte'),
        ('date_to', 'created_at__lte'),
    )
    for param, lookup in date_lookups:
        raw_date = params.get(param)
        if not raw_date:
            continue
        if _is_parseable_date(raw_date):
            queryset = queryset.filter(**{lookup: raw_date})
        else:
            messages.warning(request, f"Ignored invalid date filter: {param}.")

    # Ordering (validated) and pagination (bounded).
    queryset = queryset.order_by(_resolve_ordering(params.get('order_by')))
    paginator = Paginator(
        queryset,
        _parse_page_size(params.get('page_size', _DEFAULT_PAGE_SIZE)),
    )
    page_obj = paginator.get_page(params.get('page', 1))

    # Options for the filter dropdowns, limited to the user's hospital unless
    # the user is a PX admin.
    hospitals = Hospital.objects.filter(status='active')
    departments = Department.objects.filter(status='active')
    if not user.is_px_admin() and user.hospital:
        hospitals = hospitals.filter(id=user.hospital.id)
        departments = departments.filter(hospital=user.hospital)

    assignable_users = User.objects.filter(is_active=True)
    if user.hospital:
        assignable_users = assignable_users.filter(hospital=user.hospital)

    # Counters are computed over the filtered queryset, not over all actions.
    stats = {
        'total': queryset.count(),
        'open': queryset.filter(status=ActionStatus.OPEN).count(),
        'in_progress': queryset.filter(status=ActionStatus.IN_PROGRESS).count(),
        'overdue': queryset.filter(is_overdue=True).count(),
        'pending_approval': queryset.filter(
            status=ActionStatus.PENDING_APPROVAL
        ).count(),
        'my_actions': queryset.filter(assigned_to=user).count(),
    }

    context = {
        'page_obj': page_obj,
        'actions': page_obj.object_list,
        'stats': stats,
        'hospitals': hospitals,
        'departments': departments,
        'assignable_users': assignable_users,
        'status_choices': ActionStatus.choices,
        'source_choices': ActionSource.choices,
        'filters': params,
        'current_view': view_filter,
    }
    return render(request, 'actions/action_list.html', context)
@login_required
def action_detail(request, pk):
    """
    Render the detail page for a single PX action.

    The context carries the action, its log timeline and attachments (newest
    first), the evidence-only attachments, the users offered for assignment,
    the SLA progress percentage and the ``can_edit`` / ``can_approve`` flags.

    Access follows the same role rules as the list view: PX admins see every
    action, hospital admins and other hospital users see their hospital's
    actions, department managers see their department's actions, and users
    with no scope see nothing. Denied users are redirected to the list with
    an error message.
    """
    action = get_object_or_404(
        PXAction.objects.select_related(
            'hospital', 'department', 'assigned_to',
            'approved_by', 'closed_by', 'content_type',
        ),
        pk=pk,
    )

    # Object-level access check; the first matching role decides, mirroring
    # the queryset scoping applied by the list view.
    user = request.user
    if user.is_px_admin():
        has_access = True
    elif user.is_hospital_admin() and user.hospital:
        has_access = action.hospital == user.hospital
    elif user.is_department_manager() and user.department:
        has_access = action.department == user.department
    elif user.hospital:
        has_access = action.hospital == user.hospital
    else:
        # No role and no hospital: nothing is visible in the list either.
        has_access = False
    if not has_access:
        messages.error(request, "You don't have permission to view this action.")
        return redirect('actions:action_list')

    # Timeline, newest first. The author is joined here because a prefetch on
    # the action itself is discarded once ``order_by`` builds a new query.
    logs = action.logs.select_related('created_by').order_by('-created_at')

    # Attachments, newest first. ``prefetch_related`` is used for the uploader
    # because the relation's kind is not visible from this module.
    attachments = (
        action.attachments
        .prefetch_related('uploaded_by')
        .order_by('-created_at')
    )
    evidence_attachments = attachments.filter(is_evidence=True)

    # Users offered in the "assign" control.
    assignable_users = User.objects.filter(is_active=True)
    if action.hospital:
        assignable_users = assignable_users.filter(hospital=action.hospital)

    # Let the model refresh its overdue state before rendering.
    action.check_overdue()

    # SLA progress: share of the created -> due window that has elapsed,
    # clamped to 0..100.
    sla_progress = 0
    if action.created_at and action.due_at:
        total_duration = (action.due_at - action.created_at).total_seconds()
        if total_duration > 0:
            elapsed_duration = (timezone.now() - action.created_at).total_seconds()
            sla_progress = max(
                0, min(100, int((elapsed_duration / total_duration) * 100))
            )

    context = {
        'action': action,
        'logs': logs,
        'attachments': attachments,
        'evidence_attachments': evidence_attachments,
        'assignable_users': assignable_users,
        'status_choices': ActionStatus.choices,
        'sla_progress': sla_progress,
        'can_edit': (
            user.is_px_admin()
            or user.is_hospital_admin()
            or action.assigned_to == user
        ),
        'can_approve': user.is_px_admin(),
    }
    return render(request, 'actions/action_detail.html', context)
@login_required
@require_http_methods(["POST"])
def action_assign(request, pk):
    """
    Assign a PX action to the user given in the ``user_id`` POST field.

    Only PX admins and hospital admins may assign, and a hospital admin only
    within their own hospital. The assignee must be an active user and, when
    the action has a hospital, must belong to it - the same set of users the
    detail page offers. On success an action log entry and an audit event are
    written. The view always redirects, with a flash message for the outcome.
    """
    from django.core.exceptions import ValidationError

    action = get_object_or_404(PXAction, pk=pk)
    user = request.user

    # Role check.
    if not (user.is_px_admin() or user.is_hospital_admin()):
        messages.error(request, "You don't have permission to assign actions.")
        return redirect('actions:action_detail', pk=pk)

    # Object-level scope: a hospital admin may only act on their own hospital.
    if not user.is_px_admin() and (
        not user.hospital or action.hospital != user.hospital
    ):
        messages.error(request, "You don't have permission to assign actions.")
        return redirect('actions:action_list')

    user_id = request.POST.get('user_id')
    if not user_id:
        messages.error(request, "Please select a user to assign.")
        return redirect('actions:action_detail', pk=pk)

    # Keep the try body to the lookup only. A malformed id raises ValueError
    # or ValidationError depending on the primary-key type.
    try:
        assignee = User.objects.get(id=user_id, is_active=True)
    except (User.DoesNotExist, ValueError, ValidationError):
        messages.error(request, "User not found.")
        return redirect('actions:action_detail', pk=pk)

    if action.hospital and assignee.hospital != action.hospital:
        messages.error(
            request,
            "Selected user does not belong to this action's hospital.",
        )
        return redirect('actions:action_detail', pk=pk)

    action.assigned_to = assignee
    action.assigned_at = timezone.now()
    action.save(update_fields=['assigned_to', 'assigned_at'])

    assignee_name = assignee.get_full_name()

    # Timeline entry.
    PXActionLog.objects.create(
        action=action,
        log_type='assignment',
        message=f"Assigned to {assignee_name}",
        created_by=user,
    )

    # Audit trail.
    AuditService.log_event(
        event_type='assignment',
        description=f"Action assigned to {assignee_name}",
        user=user,
        content_object=action,
    )

    messages.success(request, f"Action assigned to {assignee_name}.")
    return redirect('actions:action_detail', pk=pk)
@login_required
@require_http_methods(["POST"])
def action_change_status(request, pk):
    """
    Change the status of a PX action.

    POST fields: ``status`` (required, must be a valid ``ActionStatus``
    value) and ``note`` (optional, used as the log message).

    Allowed callers are PX admins, hospital admins acting within their own
    hospital, and the user the action is assigned to.

    Transition rules:
        - PENDING_APPROVAL: needs at least one evidence attachment when the
          action requires approval.
        - APPROVED: PX admins only, and only from PENDING_APPROVAL (the same
          rule the dedicated approve view enforces); records approver/time.
        - CLOSED: records who closed the action and when.

    On success an action log entry and an audit event are written. The view
    always redirects, with a flash message for the outcome.
    """
    action = get_object_or_404(PXAction, pk=pk)
    user = request.user

    # Role check.
    is_assignee = action.assigned_to == user
    if not (user.is_px_admin() or user.is_hospital_admin() or is_assignee):
        messages.error(request, "You don't have permission to change action status.")
        return redirect('actions:action_detail', pk=pk)

    # Object-level scope: a hospital admin who is neither PX admin nor the
    # assignee may only act on actions of their own hospital.
    if not user.is_px_admin() and not is_assignee and (
        not user.hospital or action.hospital != user.hospital
    ):
        messages.error(request, "You don't have permission to change action status.")
        return redirect('actions:action_list')

    new_status = request.POST.get('status')
    note = request.POST.get('note', '')

    if not new_status:
        messages.error(request, "Please select a status.")
        return redirect('actions:action_detail', pk=pk)

    # Reject values that are not declared statuses; ``save()`` does not
    # validate choices, so an arbitrary string would otherwise be stored.
    if new_status not in ActionStatus.values:
        messages.error(request, "Invalid status.")
        return redirect('actions:action_detail', pk=pk)

    # Validate the transition.
    if new_status == ActionStatus.PENDING_APPROVAL:
        if (
            action.requires_approval
            and not action.attachments.filter(is_evidence=True).exists()
        ):
            messages.error(request, "Evidence is required before requesting approval.")
            return redirect('actions:action_detail', pk=pk)
    elif new_status == ActionStatus.APPROVED:
        if not user.is_px_admin():
            messages.error(request, "Only PX Admins can approve actions.")
            return redirect('actions:action_detail', pk=pk)
        if action.status != ActionStatus.PENDING_APPROVAL:
            messages.error(request, "Action is not pending approval.")
            return redirect('actions:action_detail', pk=pk)
        action.approved_by = user
        action.approved_at = timezone.now()
    elif new_status == ActionStatus.CLOSED:
        action.closed_at = timezone.now()
        action.closed_by = user

    old_status = action.status
    action.status = new_status
    action.save()

    # Timeline entry; the caller's note wins over the generated message.
    PXActionLog.objects.create(
        action=action,
        log_type='status_change',
        message=note or f"Status changed from {old_status} to {new_status}",
        created_by=user,
        old_status=old_status,
        new_status=new_status,
    )

    # Audit trail.
    AuditService.log_event(
        event_type='status_change',
        description=f"Action status changed from {old_status} to {new_status}",
        user=user,
        content_object=action,
        metadata={'old_status': old_status, 'new_status': new_status},
    )

    messages.success(request, f"Action status changed to {new_status}.")
    return redirect('actions:action_detail', pk=pk)
@login_required
@require_http_methods(["POST"])
def action_add_note(request, pk):
    """
    Append a free-text note to a PX action's log.

    The note is read from the ``note`` POST field; blank or whitespace-only
    notes are rejected. Only users who are allowed to view the action (same
    role rules as the detail page) may add a note. The view always redirects,
    with a flash message for the outcome.
    """
    action = get_object_or_404(PXAction, pk=pk)

    # Object-level access check: without it any authenticated user could
    # write to the timeline of any action by guessing its primary key.
    user = request.user
    if user.is_px_admin():
        has_access = True
    elif user.is_hospital_admin() and user.hospital:
        has_access = action.hospital == user.hospital
    elif user.is_department_manager() and user.department:
        has_access = action.department == user.department
    elif user.hospital:
        has_access = action.hospital == user.hospital
    else:
        has_access = False
    if not has_access:
        messages.error(request, "You don't have permission to add notes to this action.")
        return redirect('actions:action_list')

    note = (request.POST.get('note') or '').strip()
    if not note:
        messages.error(request, "Please enter a note.")
        return redirect('actions:action_detail', pk=pk)

    # Timeline entry.
    PXActionLog.objects.create(
        action=action,
        log_type='note',
        message=note,
        created_by=user,
    )

    messages.success(request, "Note added successfully.")
    return redirect('actions:action_detail', pk=pk)
@login_required
@require_http_methods(["POST"])
def action_escalate(request, pk):
    """
    Raise the escalation level of a PX action by one.

    Only PX admins and hospital admins may escalate, and a hospital admin
    only within their own hospital. The optional ``reason`` POST field is
    recorded in the action log and in the audit event metadata. The view
    always redirects, with a flash message for the outcome.
    """
    action = get_object_or_404(PXAction, pk=pk)
    user = request.user

    # Role check.
    if not (user.is_px_admin() or user.is_hospital_admin()):
        messages.error(request, "You don't have permission to escalate actions.")
        return redirect('actions:action_detail', pk=pk)

    # Object-level scope: a hospital admin may only act on their own hospital.
    if not user.is_px_admin() and (
        not user.hospital or action.hospital != user.hospital
    ):
        messages.error(request, "You don't have permission to escalate actions.")
        return redirect('actions:action_list')

    reason = request.POST.get('reason', '')

    # Bump the level and stamp the escalation time; only these two columns
    # are written.
    action.escalation_level += 1
    action.escalated_at = timezone.now()
    action.save(update_fields=['escalation_level', 'escalated_at'])

    level = action.escalation_level

    # Timeline entry.
    PXActionLog.objects.create(
        action=action,
        log_type='escalation',
        message=f"Action escalated (Level {level}). Reason: {reason}",
        created_by=user,
    )

    # Audit trail.
    AuditService.log_event(
        event_type='escalation',
        description=f"Action escalated to level {level}",
        user=user,
        content_object=action,
        metadata={'reason': reason, 'escalation_level': level},
    )

    messages.success(request, f"Action escalated to level {level}.")
    return redirect('actions:action_detail', pk=pk)
@login_required
@require_http_methods(["POST"])
def action_approve(request, pk):
    """
    Approve a PX action that is waiting for approval.

    Restricted to PX admins, and only valid while the action's status is
    PENDING_APPROVAL. On success the status becomes APPROVED, the approver
    and approval time are stored, and an action log entry plus an audit
    event are written. The view always redirects to the action's detail
    page, with a flash message for the outcome.
    """
    action = get_object_or_404(PXAction, pk=pk)
    approver = request.user

    # Work out why the request must be refused, if at all. The permission
    # check takes precedence over the status check.
    refusal = None
    if not approver.is_px_admin():
        refusal = "Only PX Admins can approve actions."
    elif action.status != ActionStatus.PENDING_APPROVAL:
        refusal = "Action is not pending approval."

    if refusal is None:
        # Record the approval on the action itself.
        action.status = ActionStatus.APPROVED
        action.approved_by = approver
        action.approved_at = timezone.now()
        action.save()

        # Timeline entry describing the transition.
        log_fields = {
            'action': action,
            'log_type': 'approval',
            'message': f"Action approved by {approver.get_full_name()}",
            'created_by': approver,
            'old_status': ActionStatus.PENDING_APPROVAL,
            'new_status': ActionStatus.APPROVED,
        }
        PXActionLog.objects.create(**log_fields)

        # Audit trail.
        AuditService.log_event(
            event_type='approval',
            description="Action approved",
            user=approver,
            content_object=action,
        )

        messages.success(request, "Action approved successfully.")
    else:
        messages.error(request, refusal)

    return redirect('actions:action_detail', pk=pk)