fix: harden multi-tenant data isolation across 8 modules

Pre-production security fixes to prevent cross-hospital data leaks:

- Standards API: add get_queryset() filtering by department__hospital
- Reports service: add user param with hospital filtering to all querysets
- RCA views: replace is_superuser with tenant_hospital pattern, add access
  checks to all 11 mutation views
- Notifications views: replace is_superuser patterns with _get_notification_hospital
  helper across all 5 settings functions
- Appreciation API: add tenant_hospital fallback to AppreciationViewSet,
  AppreciationStatsViewSet, and LeaderboardView
- AI Analytics: add tenant_hospital fallback in ExecutiveSummaryGenerator and
  ActionRecommendationEngine
- SourceUserRestrictionMiddleware: remove None from ALLOWED_URL_NAMES
- Complaint export: fix nullable patient/due_at/description crashes in CSV
  and Excel export, fix invalid get_category_display/get_source_display calls

E2E test updates:
- Update isolation gap tests to actively assert hospital filtering
- Fix CSV export test to use API context for download handling
- Switch clinical-staff tests to serial mode to prevent race conditions
This commit is contained in:
ismail 2026-04-07 01:23:10 +03:00
parent 6b51b0870d
commit 23d439f5a5
13 changed files with 3267 additions and 1099 deletions

File diff suppressed because it is too large Load Diff

View File

@ -5,6 +5,7 @@ Analytics Console UI views
from datetime import datetime
from django.contrib.auth.decorators import login_required
from django.core.cache import cache
from django.core.paginator import Paginator
from django.db.models import Avg, Count, F, Q, Value
from django.db.models.functions import Concat
@ -19,6 +20,13 @@ from apps.physicians.models import PhysicianMonthlyRating
from .models import KPI, KPIValue
from .services import UnifiedAnalyticsService, ExportService
from .services.ai_analytics import (
ExecutiveSummaryGenerator,
EarlyWarningSystem,
ComplaintVolumeForecaster,
SLABreachPredictor,
ActionRecommendationEngine,
)
from apps.core.decorators import block_source_user
import json
@ -64,10 +72,18 @@ def analytics_dashboard(request):
user = request.user
# Build cache key based on user and hospital
cache_key = f"analytics_dashboard_{user.id}_{request.GET.get('hospital', 'all')}"
cached = cache.get(cache_key)
if cached:
return render(request, "analytics/dashboard.html", cached)
# Get hospital filter
hospital_filter = request.GET.get("hospital")
if hospital_filter:
hospital = Hospital.objects.filter(id=hospital_filter).first()
elif user.is_px_admin() and hasattr(request, "tenant_hospital") and request.tenant_hospital:
hospital = request.tenant_hospital
elif user.hospital:
hospital = user.hospital
else:
@ -93,8 +109,17 @@ def analytics_dashboard(request):
closed_complaints = complaints_queryset.filter(status="closed").count()
overdue_complaints = complaints_queryset.filter(is_overdue=True).count()
# Complaint sources
complaint_sources = complaints_queryset.values("source").annotate(count=Count("id")).order_by("-count")[:6]
# Complaint source types (internal vs external)
internal_complaints = complaints_queryset.filter(complaint_source_type="internal").count()
external_complaints = complaints_queryset.filter(complaint_source_type="external").count()
# Complaint sources (by PXSource name)
complaint_sources = (
complaints_queryset.filter(source__isnull=False)
.values("source__name_en")
.annotate(count=Count("id"))
.order_by("-count")[:6]
)
# Complaint domains (Level 1)
top_domains = (
@ -112,7 +137,15 @@ def analytics_dashboard(request):
.order_by("-count")[:5]
)
# Complaint severity
# Complaint severity - build explicit counts for template
severity_counts = complaints_queryset.values("severity").annotate(count=Count("id"))
severity_map = {item["severity"]: item["count"] for item in severity_counts}
critical_complaints = severity_map.get("critical", 0)
high_complaints = severity_map.get("high", 0)
medium_complaints = severity_map.get("medium", 0)
low_complaints = severity_map.get("low", 0)
# Severity breakdown for JSON
severity_breakdown = complaints_queryset.values("severity").annotate(count=Count("id")).order_by("-count")
# Status breakdown
@ -125,14 +158,26 @@ def analytics_dashboard(request):
approved_actions = actions_queryset.filter(status="approved").count()
closed_actions = actions_queryset.filter(status="closed").count()
overdue_actions = actions_queryset.filter(is_overdue=True).count()
pending_actions = actions_queryset.filter(status="pending_approval").count()
# Action sources
action_sources = actions_queryset.values("source_type").annotate(count=Count("id")).order_by("-count")[:6]
action_sources = (
actions_queryset.filter(source_type__isnull=False)
.values("source_type")
.annotate(count=Count("id"))
.order_by("-count")[:6]
)
# Action categories
# Action categories - build explicit counts
action_categories = (
actions_queryset.exclude(category="").values("category").annotate(count=Count("id")).order_by("-count")[:5]
)
action_category_map = {item["category"]: item["count"] for item in action_categories}
training_actions = action_category_map.get("training", 0)
process_actions = action_category_map.get("process_improvement", 0)
policy_actions = action_category_map.get("policy", 0)
facility_actions = action_category_map.get("facility", 0)
other_actions = action_category_map.get("other", 0)
# ============ SURVEYS KPIs ============
total_surveys = surveys_queryset.count()
@ -176,15 +221,37 @@ def analytics_dashboard(request):
.order_by("day")
)
# Survey score trend
survey_score_trend = (
surveys_queryset.filter(completed_at__gte=thirty_days_ago)
.annotate(day=TruncDate("completed_at"))
.values("day")
# Survey score trend - last 6 months for chart
six_months_ago = timezone.now() - timedelta(days=180)
survey_score_trend_6m = (
surveys_queryset.filter(completed_at__gte=six_months_ago)
.annotate(month=TruncMonth("completed_at"))
.values("month")
.annotate(avg_score=Avg("total_score"))
.order_by("day")
.order_by("month")
)
# Build survey trend array for last 6 months (pad with zeros if missing)
from calendar import month_name
now = timezone.now()
survey_trend_values = []
survey_trend_labels = []
for i in range(5, -1, -1):
target_month = now.month - i
target_year = now.year
while target_month <= 0:
target_month += 12
target_year -= 1
survey_trend_labels.append(month_name[target_month][:3])
# Find matching data point
found = None
for item in survey_score_trend_6m:
if item["month"].month == target_month and item["month"].year == target_year:
found = round(item["avg_score"], 2) if item["avg_score"] else 0
break
survey_trend_values.append(found if found is not None else 0)
# ============ DEPARTMENT RANKINGS ============
department_rankings = (
Department.objects.filter(status="active")
@ -200,6 +267,39 @@ def analytics_dashboard(request):
.order_by("-avg_score")[:7]
)
# Build department_stats list with resolution rate calculation
department_stats = []
for dept in department_rankings:
dept_complaints = (
complaints_queryset.filter(department=dept).count()
if hospital
else Complaint.objects.filter(department=dept).count()
)
dept_actions = (
actions_queryset.filter(department=dept).count()
if hospital
else PXAction.objects.filter(department=dept).count()
)
dept_resolved = (
complaints_queryset.filter(department=dept, status__in=["resolved", "closed"]).count()
if hospital
else Complaint.objects.filter(department=dept, status__in=["resolved", "closed"]).count()
)
resolution_rate = round((dept_resolved / dept_complaints * 100), 1) if dept_complaints > 0 else 0
department_stats.append(
{
"name_en": dept.name_en if hasattr(dept, "name_en") else str(dept),
"name_ar": dept.name_ar
if hasattr(dept, "name_ar")
else (dept.name_en if hasattr(dept, "name_en") else str(dept)),
"complaints": dept_complaints,
"actions": dept_actions,
"survey_avg": round(dept.avg_score, 2) if dept.avg_score else 0,
"resolution_rate": resolution_rate,
}
)
# ============ TIME-BASED CALCULATIONS ============
# Average resolution time (complaints)
resolved_with_time = complaints_queryset.filter(
@ -256,6 +356,12 @@ def analytics_dashboard(request):
"resolved_complaints": resolved_complaints,
"closed_complaints": closed_complaints,
"overdue_complaints": overdue_complaints,
"internal_complaints": internal_complaints,
"external_complaints": external_complaints,
"critical_complaints": critical_complaints,
"high_complaints": high_complaints,
"medium_complaints": medium_complaints,
"low_complaints": low_complaints,
"avg_resolution_hours": round(avg_resolution_hours, 1),
"sla_compliance": round(sla_compliance, 1),
"total_actions": total_actions,
@ -263,7 +369,13 @@ def analytics_dashboard(request):
"in_progress_actions": in_progress_actions,
"approved_actions": approved_actions,
"closed_actions": closed_actions,
"pending_actions": pending_actions,
"overdue_actions": overdue_actions,
"training_actions": training_actions,
"process_actions": process_actions,
"policy_actions": policy_actions,
"facility_actions": facility_actions,
"other_actions": other_actions,
"avg_action_days": round(avg_action_days, 1),
"total_surveys": total_surveys,
"avg_survey_score": round(avg_survey_score, 2),
@ -274,8 +386,41 @@ def analytics_dashboard(request):
"compliments": compliments,
"suggestions": suggestions,
"avg_rating": round(avg_rating, 2),
"survey_trend_1": survey_trend_values[0] if len(survey_trend_values) > 0 else 0,
"survey_trend_2": survey_trend_values[1] if len(survey_trend_values) > 1 else 0,
"survey_trend_3": survey_trend_values[2] if len(survey_trend_values) > 2 else 0,
"survey_trend_4": survey_trend_values[3] if len(survey_trend_values) > 3 else 0,
"survey_trend_5": survey_trend_values[4] if len(survey_trend_values) > 4 else 0,
"survey_trend_6": survey_trend_values[5] if len(survey_trend_values) > 5 else 0,
}
# ============ AI-POWERED ANALYTICS ============
hospital_id = str(hospital.id) if hospital else None
# Trigger async Celery tasks to refresh cache in background
from .tasks import (
generate_executive_summary_task,
generate_action_recommendations_task,
)
generate_executive_summary_task.delay(user_id=str(user.id), hospital_id=hospital_id, period="30d")
generate_action_recommendations_task.delay(user_id=str(user.id), hospital_id=hospital_id)
# 1. Executive Summary — read from cache (populated by Celery or fallback)
exec_summary = ExecutiveSummaryGenerator.generate(user, hospital_id=hospital_id, period="30d")
# 2. Early Warning System
early_warnings = EarlyWarningSystem.detect(user, hospital_id=hospital_id, limit=5)
# 3. Complaint Volume Forecast
complaint_forecast = ComplaintVolumeForecaster.forecast(user, hospital_id=hospital_id, forecast_days=30)
# 4. SLA Breach Predictions
sla_breach_predictions = SLABreachPredictor.predict(user, hospital_id=hospital_id, limit=10)
# 5. Action Recommendations — read from cache
action_recommendations = ActionRecommendationEngine.generate_recommendations(user, hospital_id=hospital_id, limit=5)
context = {
"kpis": kpis,
"selected_hospital": hospital,
@ -288,15 +433,77 @@ def analytics_dashboard(request):
"action_sources": serialize_queryset_values(action_sources),
"action_categories": serialize_queryset_values(action_categories),
"survey_types": serialize_queryset_values(survey_types),
"survey_score_trend": serialize_queryset_values(survey_score_trend),
"survey_score_trend": serialize_queryset_values(survey_score_trend_6m),
"sentiment_breakdown": serialize_queryset_values(sentiment_breakdown),
"feedback_categories": serialize_queryset_values(feedback_categories),
"department_rankings": department_rankings,
"department_stats": department_stats,
"survey_trend_labels": json.dumps(survey_trend_labels),
# AI-powered features
"exec_summary": exec_summary,
"early_warnings": early_warnings,
"complaint_forecast": complaint_forecast,
"sla_breach_predictions": sla_breach_predictions,
"action_recommendations": action_recommendations,
}
# Clear old cache (the new data isn't in the old cache entries)
cache.delete(cache_key)
return render(request, "analytics/dashboard.html", context)
@block_source_user
@login_required
def refresh_ai_analytics(request):
"""
API endpoint: Trigger async AI analytics refresh and return status.
POST to trigger, GET to check if cache is fresh.
"""
if request.method == "POST":
from .tasks import (
generate_executive_summary_task,
generate_action_recommendations_task,
precompute_dashboard_cache_task,
)
hospital_id = request.POST.get("hospital") or request.GET.get("hospital")
user = request.user
# Trigger async tasks
generate_executive_summary_task.delay(
user_id=str(user.id), hospital_id=hospital_id, period="30d", force_refresh=True
)
generate_action_recommendations_task.delay(user_id=str(user.id), hospital_id=hospital_id)
# Also clear caches so next page load triggers fresh computation
cache.delete(f"exec_summary_{user.id}_{hospital_id}_30d")
cache.delete(f"action_recommendations_{user.id}_{hospital_id}_5")
return JsonResponse(
{"status": "triggered", "message": "AI analytics refresh queued. Results will be available in ~30 seconds."}
)
# GET — check cache freshness
hospital_id = request.GET.get("hospital") or (
str(request.tenant_hospital.id) if hasattr(request, "tenant_hospital") and request.tenant_hospital else None
)
user = request.user
summary_cached = cache.get(f"exec_summary_{user.id}_{hospital_id}_30d")
recommendations_cached = cache.get(f"action_recommendations_{user.id}_{hospital_id}_5")
return JsonResponse(
{
"cached": {
"executive_summary": summary_cached is not None,
"action_recommendations": recommendations_cached is not None,
},
"risk_level": summary_cached.get("risk_level", "unknown") if summary_cached else None,
}
)
@block_source_user
@login_required
def kpi_list(request):
@ -384,10 +591,46 @@ def command_center(request):
custom_end=custom_end,
)
# Initial AI data for server-side render
from .services.ai_analytics import (
ExecutiveSummaryGenerator,
EarlyWarningSystem,
ComplaintVolumeForecaster,
SLABreachPredictor,
ActionRecommendationEngine,
)
hospital_id = filters["hospital"] if filters["hospital"] else None
department_id = filters["department"] if filters["department"] else None
if not hospital_id and user.is_px_admin():
tenant = getattr(request, "tenant_hospital", None)
if tenant:
hospital_id = str(tenant.id)
# Trigger async refresh
from .tasks import generate_executive_summary_task, generate_action_recommendations_task
generate_executive_summary_task.delay(
user_id=str(user.id), hospital_id=hospital_id, department_id=department_id, period=filters["date_range"]
)
generate_action_recommendations_task.delay(
user_id=str(user.id), hospital_id=hospital_id, department_id=department_id
)
context = {
"filters": filters,
"departments": departments,
"kpis": kpis,
"exec_summary": ExecutiveSummaryGenerator.generate(
user, hospital_id=hospital_id, department_id=department_id, period=filters["date_range"]
),
"early_warnings": EarlyWarningSystem.detect(user, hospital_id=hospital_id, limit=5),
"complaint_forecast": ComplaintVolumeForecaster.forecast(user, hospital_id=hospital_id, forecast_days=30),
"sla_breach_predictions": SLABreachPredictor.predict(user, hospital_id=hospital_id, limit=10),
"action_recommendations": ActionRecommendationEngine.generate_recommendations(
user, hospital_id=hospital_id, department_id=department_id, limit=5
),
}
return render(request, "analytics/command_center.html", context)
@ -431,6 +674,11 @@ def command_center_api(request):
# Handle department_id (UUID string)
department_id = department_id if department_id else None
if not hospital_id and user.is_px_admin():
tenant = getattr(request, "tenant_hospital", None)
if tenant:
hospital_id = str(tenant.id)
# Get KPIs
kpis = UnifiedAnalyticsService.get_all_kpis(
user=user,
@ -557,7 +805,45 @@ def command_center_api(request):
for p in physician_data
]
return JsonResponse({"kpis": kpis, "charts": charts, "tables": tables})
# ============ AI-POWERED ANALYTICS ============
from .services.ai_analytics import (
ExecutiveSummaryGenerator,
EarlyWarningSystem,
ComplaintVolumeForecaster,
SLABreachPredictor,
ActionRecommendationEngine,
)
# Trigger async Celery tasks for background refresh
from .tasks import (
generate_executive_summary_task,
generate_action_recommendations_task,
)
generate_executive_summary_task.delay(
user_id=str(user.id),
hospital_id=hospital_id,
department_id=department_id,
period=date_range.replace("d", "") if date_range.endswith("d") else "30d",
)
generate_action_recommendations_task.delay(
user_id=str(user.id), hospital_id=hospital_id, department_id=department_id
)
# AI features — read from cache (populated by Celery precompute or on-demand)
ai_data = {
"executive_summary": ExecutiveSummaryGenerator.generate(
user, hospital_id=hospital_id, department_id=department_id, period=date_range
),
"early_warnings": EarlyWarningSystem.detect(user, hospital_id=hospital_id, limit=5),
"complaint_forecast": ComplaintVolumeForecaster.forecast(user, hospital_id=hospital_id, forecast_days=30),
"sla_breach_predictions": SLABreachPredictor.predict(user, hospital_id=hospital_id, limit=10),
"action_recommendations": ActionRecommendationEngine.generate_recommendations(
user, hospital_id=hospital_id, department_id=department_id, limit=5
),
}
return JsonResponse({"kpis": kpis, "charts": charts, "tables": tables, "ai": ai_data})
@block_source_user

View File

@ -1,6 +1,7 @@
"""
Appreciation views - API views for appreciation management
"""
from django.contrib.contenttypes.models import ContentType
from django.db.models import Count, Q, F
from django.utils import timezone
@ -31,48 +32,49 @@ from apps.accounts.permissions import IsPXAdminOrHospitalAdmin
class AppreciationCategoryViewSet(viewsets.ModelViewSet):
"""Viewset for AppreciationCategory"""
queryset = AppreciationCategory.objects.all()
serializer_class = AppreciationCategorySerializer
permission_classes = [IsAuthenticated]
def get_queryset(self):
"""Filter categories by hospital"""
queryset = super().get_queryset()
# Filter by hospital if provided
hospital_id = self.request.query_params.get('hospital_id')
hospital_id = self.request.query_params.get("hospital_id")
if hospital_id:
queryset = queryset.filter(Q(hospital_id=hospital_id) | Q(hospital__isnull=True))
# Only show active categories
queryset = queryset.filter(is_active=True)
return queryset.select_related('hospital')
return queryset.select_related("hospital")
class AppreciationViewSet(viewsets.ModelViewSet):
"""Viewset for Appreciation"""
queryset = Appreciation.objects.all()
serializer_class = AppreciationSerializer
permission_classes = [IsAuthenticated]
def get_queryset(self):
"""Filter appreciations based on user's access"""
user = self.request.user
queryset = super().get_queryset()
# Filter by hospital
if user.hospital:
queryset = queryset.filter(hospital=user.hospital)
hospital = getattr(self.request, "tenant_hospital", None) or user.hospital
if hospital:
queryset = queryset.filter(hospital=hospital)
elif not user.is_px_admin():
queryset = queryset.none()
# Filter by department if user is department manager
if user.department and user.is_department_manager():
queryset = queryset.filter(
Q(department=user.department) | Q(department__isnull=True)
)
queryset = queryset.filter(Q(department=user.department) | Q(department__isnull=True))
# Filter by visibility
# Users can see:
# - All appreciations they sent
@ -80,119 +82,104 @@ class AppreciationViewSet(viewsets.ModelViewSet):
# - Department-level appreciations if they're in the department
# - Hospital-level appreciations if they're in the hospital
# - Public appreciations
from apps.appreciation.models import AppreciationVisibility
# Get user's content type
user_content_type = ContentType.objects.get_for_model(user)
# Get staff if user has a staff profile
staff = None
if hasattr(user, 'staff_profile'):
if hasattr(user, "staff_profile"):
staff = user.staff_profile
staff_content_type = ContentType.objects.get_for_model(type(staff))
# Build visibility filter
visibility_filter = (
Q(sender=user) | # Sent by user
Q(
recipient_content_type=user_content_type,
recipient_object_id=user.id
) # Received by user
Q(sender=user) # Sent by user
| Q(recipient_content_type=user_content_type, recipient_object_id=user.id) # Received by user
)
if staff:
visibility_filter |= Q(
recipient_content_type=staff_content_type,
recipient_object_id=staff.id
recipient_content_type=staff_content_type, recipient_object_id=staff.id
) # Received by staff
if user.department:
visibility_filter |= Q(
visibility=AppreciationVisibility.DEPARTMENT,
department=user.department
)
if user.hospital:
visibility_filter |= Q(
visibility=AppreciationVisibility.HOSPITAL,
hospital=user.hospital
)
visibility_filter |= Q(visibility=AppreciationVisibility.DEPARTMENT, department=user.department)
if hospital:
visibility_filter |= Q(visibility=AppreciationVisibility.HOSPITAL, hospital=hospital)
visibility_filter |= Q(visibility=AppreciationVisibility.PUBLIC)
queryset = queryset.filter(visibility_filter)
# Filter by recipient
recipient_type = self.request.query_params.get('recipient_type')
recipient_id = self.request.query_params.get('recipient_id')
recipient_type = self.request.query_params.get("recipient_type")
recipient_id = self.request.query_params.get("recipient_id")
if recipient_type and recipient_id:
if recipient_type == 'user':
content_type = ContentType.objects.get_for_model(
self.request.user.__class__
)
queryset = queryset.filter(
recipient_content_type=content_type,
recipient_object_id=recipient_id
)
elif recipient_type == 'staff':
if recipient_type == "user":
content_type = ContentType.objects.get_for_model(self.request.user.__class__)
queryset = queryset.filter(recipient_content_type=content_type, recipient_object_id=recipient_id)
elif recipient_type == "staff":
from apps.organizations.models import Staff
content_type = ContentType.objects.get_for_model(Staff)
queryset = queryset.filter(
recipient_content_type=content_type,
recipient_object_id=recipient_id
)
queryset = queryset.filter(recipient_content_type=content_type, recipient_object_id=recipient_id)
# Filter by status
status_filter = self.request.query_params.get('status')
status_filter = self.request.query_params.get("status")
if status_filter:
queryset = queryset.filter(status=status_filter)
# Filter by category
category_id = self.request.query_params.get('category_id')
category_id = self.request.query_params.get("category_id")
if category_id:
queryset = queryset.filter(category_id=category_id)
return queryset.select_related(
'sender', 'hospital', 'department', 'category'
).prefetch_related('recipient')
return queryset.select_related("sender", "hospital", "department", "category").prefetch_related("recipient")
def create(self, request, *args, **kwargs):
"""Create a new appreciation"""
serializer = AppreciationCreateSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
# Get validated data
data = serializer.validated_data
# Get recipient
recipient_type = data['recipient_type']
recipient_id = data['recipient_id']
if recipient_type == 'user':
recipient_type = data["recipient_type"]
recipient_id = data["recipient_id"]
if recipient_type == "user":
from apps.accounts.models import User
recipient = User.objects.get(id=recipient_id)
content_type = ContentType.objects.get_for_model(User)
else: # staff
from apps.organizations.models import Staff
recipient = Staff.objects.get(id=recipient_id)
content_type = ContentType.objects.get_for_model(Staff)
# Get hospital
from apps.organizations.models import Hospital
hospital = Hospital.objects.get(id=data['hospital_id'])
hospital = Hospital.objects.get(id=data["hospital_id"])
# Get department
department = None
if data.get('department_id'):
if data.get("department_id"):
from apps.organizations.models import Department
department = Department.objects.get(id=data['department_id'])
department = Department.objects.get(id=data["department_id"])
# Get category
category = None
if data.get('category_id'):
category = AppreciationCategory.objects.get(id=data['category_id'])
if data.get("category_id"):
category = AppreciationCategory.objects.get(id=data["category_id"])
# Create appreciation
appreciation = Appreciation.objects.create(
sender=request.user,
@ -201,134 +188,123 @@ class AppreciationViewSet(viewsets.ModelViewSet):
hospital=hospital,
department=department,
category=category,
message_en=data['message_en'],
message_ar=data.get('message_ar', ''),
visibility=data['visibility'],
is_anonymous=data['is_anonymous'],
message_en=data["message_en"],
message_ar=data.get("message_ar", ""),
visibility=data["visibility"],
is_anonymous=data["is_anonymous"],
)
# Send appreciation
appreciation.send()
# Serialize and return
serializer = AppreciationSerializer(appreciation)
return Response(serializer.data, status=status.HTTP_201_CREATED)
@action(detail=True, methods=['post'])
@action(detail=True, methods=["post"])
def acknowledge(self, request, pk=None):
"""Acknowledge an appreciation"""
appreciation = self.get_object()
# Check if user is the recipient
user_content_type = ContentType.objects.get_for_model(request.user)
if not (
appreciation.recipient_content_type == user_content_type and
appreciation.recipient_object_id == request.user.id
appreciation.recipient_content_type == user_content_type
and appreciation.recipient_object_id == request.user.id
):
return Response(
{'error': 'You can only acknowledge appreciations sent to you'},
status=status.HTTP_403_FORBIDDEN
{"error": "You can only acknowledge appreciations sent to you"}, status=status.HTTP_403_FORBIDDEN
)
# Acknowledge
appreciation.acknowledge()
# Serialize and return
serializer = AppreciationSerializer(appreciation)
return Response(serializer.data)
@action(detail=False, methods=['get'])
@action(detail=False, methods=["get"])
def my_appreciations(self, request):
"""Get appreciations for the current user"""
# Get user's appreciations
user_content_type = ContentType.objects.get_for_model(request.user)
# Check if user has staff profile
staff = None
if hasattr(request.user, 'staff_profile'):
if hasattr(request.user, "staff_profile"):
staff = request.user.staff_profile
# Build query
queryset = self.get_queryset().filter(
Q(
recipient_content_type=user_content_type,
recipient_object_id=request.user.id
)
Q(recipient_content_type=user_content_type, recipient_object_id=request.user.id)
)
if staff:
staff_content_type = ContentType.objects.get_for_model(type(staff))
queryset |= self.get_queryset().filter(
recipient_content_type=staff_content_type,
recipient_object_id=staff.id
recipient_content_type=staff_content_type, recipient_object_id=staff.id
)
# Paginate
page = self.paginate_queryset(queryset)
if page is not None:
serializer = AppreciationSerializer(page, many=True)
return self.get_paginated_response(serializer.data)
serializer = AppreciationSerializer(queryset, many=True)
return Response(serializer.data)
@action(detail=False, methods=['get'])
@action(detail=False, methods=["get"])
def sent_by_me(self, request):
"""Get appreciations sent by the current user"""
queryset = self.get_queryset().filter(sender=request.user)
# Paginate
page = self.paginate_queryset(queryset)
if page is not None:
serializer = AppreciationSerializer(page, many=True)
return self.get_paginated_response(serializer.data)
serializer = AppreciationSerializer(queryset, many=True)
return Response(serializer.data)
@action(detail=False, methods=['get'])
@action(detail=False, methods=["get"])
def summary(self, request):
"""Get appreciation summary for the current user"""
# Get user's content type
user_content_type = ContentType.objects.get_for_model(request.user)
# Get current year and month
now = timezone.now()
current_year = now.year
current_month = now.month
# Count total received
total_received = Appreciation.objects.filter(
recipient_content_type=user_content_type,
recipient_object_id=request.user.id
recipient_content_type=user_content_type, recipient_object_id=request.user.id
).count()
# Count total sent
total_sent = Appreciation.objects.filter(
sender=request.user
).count()
total_sent = Appreciation.objects.filter(sender=request.user).count()
# Count this month received
this_month_received = Appreciation.objects.filter(
recipient_content_type=user_content_type,
recipient_object_id=request.user.id,
sent_at__year=current_year,
sent_at__month=current_month
sent_at__month=current_month,
).count()
# Count this month sent
this_month_sent = Appreciation.objects.filter(
sender=request.user,
sent_at__year=current_year,
sent_at__month=current_month
sender=request.user, sent_at__year=current_year, sent_at__month=current_month
).count()
# Get badges earned
badges_earned = UserBadge.objects.filter(
recipient_content_type=user_content_type,
recipient_object_id=request.user.id
recipient_content_type=user_content_type, recipient_object_id=request.user.id
).count()
# Get hospital rank
hospital_rank = None
if request.user.hospital:
@ -337,143 +313,141 @@ class AppreciationViewSet(viewsets.ModelViewSet):
recipient_content_type=user_content_type,
recipient_object_id=request.user.id,
year=current_year,
month=current_month
month=current_month,
).first()
if stats:
hospital_rank = stats.hospital_rank
# Get top category
top_category = None
if total_received > 0:
top_category_obj = Appreciation.objects.filter(
recipient_content_type=user_content_type,
recipient_object_id=request.user.id
).values('category__name_en', 'category__icon', 'category__color').annotate(
count=Count('id')
).order_by('-count').first()
if top_category_obj and top_category_obj['category__name_en']:
top_category_obj = (
Appreciation.objects.filter(
recipient_content_type=user_content_type, recipient_object_id=request.user.id
)
.values("category__name_en", "category__icon", "category__color")
.annotate(count=Count("id"))
.order_by("-count")
.first()
)
if top_category_obj and top_category_obj["category__name_en"]:
top_category = {
'name': top_category_obj['category__name_en'],
'icon': top_category_obj['category__icon'],
'color': top_category_obj['category__color'],
'count': top_category_obj['count']
"name": top_category_obj["category__name_en"],
"icon": top_category_obj["category__icon"],
"color": top_category_obj["category__color"],
"count": top_category_obj["count"],
}
# Build response
summary = {
'total_received': total_received,
'total_sent': total_sent,
'this_month_received': this_month_received,
'this_month_sent': this_month_sent,
'top_category': top_category,
'badges_earned': badges_earned,
'hospital_rank': hospital_rank,
"total_received": total_received,
"total_sent": total_sent,
"this_month_received": this_month_received,
"this_month_sent": this_month_sent,
"top_category": top_category,
"badges_earned": badges_earned,
"hospital_rank": hospital_rank,
}
serializer = AppreciationSummarySerializer(summary)
return Response(serializer.data)
class AppreciationStatsViewSet(viewsets.ReadOnlyModelViewSet):
"""Viewset for AppreciationStats"""
queryset = AppreciationStats.objects.all()
serializer_class = AppreciationStatsSerializer
permission_classes = [IsAuthenticated]
def get_queryset(self):
"""Filter stats based on user's access"""
user = self.request.user
queryset = super().get_queryset()
# Filter by hospital
if user.hospital:
queryset = queryset.filter(hospital=user.hospital)
hospital = getattr(self.request, "tenant_hospital", None) or user.hospital
if hospital:
queryset = queryset.filter(hospital=hospital)
elif not user.is_px_admin():
queryset = queryset.none()
# Filter by year and month
year = self.request.query_params.get('year')
year = self.request.query_params.get("year")
if year:
queryset = queryset.filter(year=int(year))
month = self.request.query_params.get('month')
month = self.request.query_params.get("month")
if month:
queryset = queryset.filter(month=int(month))
return queryset.select_related('hospital', 'department')
return queryset.select_related("hospital", "department")
class AppreciationBadgeViewSet(viewsets.ModelViewSet):
"""Viewset for AppreciationBadge"""
queryset = AppreciationBadge.objects.all()
serializer_class = AppreciationBadgeSerializer
permission_classes = [IsAuthenticated, IsPXAdminOrHospitalAdmin]
def get_queryset(self):
"""Filter badges by hospital"""
queryset = super().get_queryset()
# Filter by hospital if provided
hospital_id = self.request.query_params.get('hospital_id')
hospital_id = self.request.query_params.get("hospital_id")
if hospital_id:
queryset = queryset.filter(Q(hospital_id=hospital_id) | Q(hospital__isnull=True))
# Only show active badges
queryset = queryset.filter(is_active=True)
return queryset.select_related('hospital')
return queryset.select_related("hospital")
class UserBadgeViewSet(viewsets.ReadOnlyModelViewSet):
"""Viewset for UserBadge"""
queryset = UserBadge.objects.all()
serializer_class = UserBadgeSerializer
permission_classes = [IsAuthenticated]
def get_queryset(self):
"""Filter badges based on user's access"""
user = self.request.user
queryset = super().get_queryset()
# Get user's content type
user_content_type = ContentType.objects.get_for_model(user)
# Filter by user or user's staff profile
staff = None
if hasattr(user, 'staff_profile'):
if hasattr(user, "staff_profile"):
staff = user.staff_profile
staff_content_type = ContentType.objects.get_for_model(type(staff))
queryset = queryset.filter(
Q(
recipient_content_type=user_content_type,
recipient_object_id=user.id
)
)
queryset = queryset.filter(Q(recipient_content_type=user_content_type, recipient_object_id=user.id))
if staff:
queryset |= queryset.filter(
recipient_content_type=staff_content_type,
recipient_object_id=staff.id
)
return queryset.select_related('badge')
queryset |= queryset.filter(recipient_content_type=staff_content_type, recipient_object_id=staff.id)
return queryset.select_related("badge")
class LeaderboardView(generics.ListAPIView):
"""View for appreciation leaderboard"""
serializer_class = AppreciationLeaderboardSerializer
permission_classes = [IsAuthenticated]
def get_queryset(self):
"""Build leaderboard"""
# Get filters
year = self.request.query_params.get('year')
month = self.request.query_params.get('month')
year = self.request.query_params.get("year")
month = self.request.query_params.get("month")
# Default to current month
if not year or not month:
now = timezone.now()
@ -482,49 +456,51 @@ class LeaderboardView(generics.ListAPIView):
else:
year = int(year)
month = int(month)
# Get hospital from request user
user = self.request.user
if not user.hospital:
hospital = getattr(self.request, "tenant_hospital", None) or user.hospital
if not hospital and not user.is_px_admin():
return []
# Get stats for the period
stats = AppreciationStats.objects.filter(
hospital=user.hospital,
year=year,
month=month,
received_count__gt=0
).order_by('-received_count')
stats_qs = AppreciationStats.objects.filter(year=year, month=month, received_count__gt=0)
if hospital:
stats_qs = stats_qs.filter(hospital=hospital)
stats = stats_qs.order_by("-received_count")
# Build leaderboard
leaderboard = []
for rank, stat in enumerate(stats, start=1):
recipient_name = stat.get_recipient_name()
recipient_type = stat.recipient_content_type.model if stat.recipient_content_type else 'unknown'
recipient_type = stat.recipient_content_type.model if stat.recipient_content_type else "unknown"
# Get badges for recipient
badges = []
user_badges = UserBadge.objects.filter(
recipient_content_type=stat.recipient_content_type,
recipient_object_id=stat.recipient_object_id
).select_related('badge')
recipient_content_type=stat.recipient_content_type, recipient_object_id=stat.recipient_object_id
).select_related("badge")
for user_badge in user_badges:
badges.append({
'name': user_badge.badge.name_en,
'icon': user_badge.badge.icon,
'color': user_badge.badge.color,
})
leaderboard.append({
'rank': rank,
'recipient_type': recipient_type,
'recipient_id': stat.recipient_object_id,
'recipient_name': recipient_name,
'hospital': stat.hospital.name,
'department': stat.department.name if stat.department else None,
'received_count': stat.received_count,
'badges': badges,
})
badges.append(
{
"name": user_badge.badge.name_en,
"icon": user_badge.badge.icon,
"color": user_badge.badge.color,
}
)
leaderboard.append(
{
"rank": rank,
"recipient_type": recipient_type,
"recipient_id": stat.recipient_object_id,
"recipient_name": recipient_name,
"hospital": stat.hospital.name,
"department": stat.department.name if stat.department else None,
"received_count": stat.received_count,
"badges": badges,
}
)
return leaderboard

View File

@ -65,22 +65,22 @@ def export_complaints_csv(queryset, filters=None):
[
str(complaint.id)[:8],
complaint.title,
complaint.patient.get_full_name(),
complaint.patient.mrn,
complaint.hospital.name_en,
complaint.department.name_en if complaint.department else "",
complaint.get_category_display(),
complaint.patient.get_full_name() if complaint.patient else "",
complaint.patient.mrn if complaint.patient else "",
complaint.hospital.name,
complaint.department.name if complaint.department else "",
str(complaint.category) if complaint.category else "",
complaint.get_severity_display(),
complaint.get_priority_display(),
complaint.get_status_display(),
complaint.get_source_display(),
complaint.get_complaint_source_type_display(),
complaint.assigned_to.get_full_name() if complaint.assigned_to else "",
complaint.created_at.strftime("%Y-%m-%d %H:%M:%S"),
complaint.due_at.strftime("%Y-%m-%d %H:%M:%S"),
complaint.due_at.strftime("%Y-%m-%d %H:%M:%S") if complaint.due_at else "",
"Yes" if complaint.is_overdue else "No",
complaint.resolved_at.strftime("%Y-%m-%d %H:%M:%S") if complaint.resolved_at else "",
complaint.closed_at.strftime("%Y-%m-%d %H:%M:%S") if complaint.closed_at else "",
complaint.description[:500],
complaint.description[:500] if complaint.description else "",
]
)
@ -139,18 +139,20 @@ def export_complaints_excel(queryset, filters=None):
for row_num, complaint in enumerate(queryset, 2):
ws.cell(row=row_num, column=1, value=str(complaint.id)[:8])
ws.cell(row=row_num, column=2, value=complaint.title)
ws.cell(row=row_num, column=3, value=complaint.patient.get_full_name())
ws.cell(row=row_num, column=4, value=complaint.patient.mrn)
ws.cell(row=row_num, column=5, value=complaint.hospital.name_en)
ws.cell(row=row_num, column=6, value=complaint.department.name_en if complaint.department else "")
ws.cell(row=row_num, column=7, value=complaint.get_category_display())
ws.cell(row=row_num, column=3, value=complaint.patient.get_full_name() if complaint.patient else "")
ws.cell(row=row_num, column=4, value=complaint.patient.mrn if complaint.patient else "")
ws.cell(row=row_num, column=5, value=complaint.hospital.name)
ws.cell(row=row_num, column=6, value=complaint.department.name if complaint.department else "")
ws.cell(row=row_num, column=7, value=str(complaint.category) if complaint.category else "")
ws.cell(row=row_num, column=8, value=complaint.get_severity_display())
ws.cell(row=row_num, column=9, value=complaint.get_priority_display())
ws.cell(row=row_num, column=10, value=complaint.get_status_display())
ws.cell(row=row_num, column=11, value=complaint.get_source_display())
ws.cell(row=row_num, column=11, value=complaint.get_complaint_source_type_display())
ws.cell(row=row_num, column=12, value=complaint.assigned_to.get_full_name() if complaint.assigned_to else "")
ws.cell(row=row_num, column=13, value=complaint.created_at.strftime("%Y-%m-%d %H:%M:%S"))
ws.cell(row=row_num, column=14, value=complaint.due_at.strftime("%Y-%m-%d %H:%M:%S"))
ws.cell(
row=row_num, column=14, value=complaint.due_at.strftime("%Y-%m-%d %H:%M:%S") if complaint.due_at else ""
)
ws.cell(row=row_num, column=15, value="Yes" if complaint.is_overdue else "No")
ws.cell(
row=row_num,
@ -162,7 +164,7 @@ def export_complaints_excel(queryset, filters=None):
column=17,
value=complaint.closed_at.strftime("%Y-%m-%d %H:%M:%S") if complaint.closed_at else "",
)
ws.cell(row=row_num, column=18, value=complaint.description[:500])
ws.cell(row=row_num, column=18, value=complaint.description[:500] if complaint.description else "")
# Auto-adjust column widths
for column in ws.columns:

View File

@ -9,10 +9,26 @@ from django.contrib.auth.decorators import login_required
from django.core.exceptions import PermissionDenied
from django.core.paginator import Paginator
from django.shortcuts import render, redirect, get_object_or_404
from django.http import JsonResponse
from django.http import JsonResponse, HttpResponse
from django.utils.translation import gettext_lazy as _
from django.views.decorators.http import require_POST
def _get_notification_hospital(request, hospital_id=None):
"""Resolve hospital for notification views, using tenant_hospital for PX Admins."""
if request.user.is_superuser and hospital_id:
return get_object_or_404(Hospital, id=hospital_id)
if request.user.is_px_admin():
tenant = getattr(request, "tenant_hospital", None)
if tenant:
return tenant
if hospital_id:
return get_object_or_404(Hospital, id=hospital_id)
if request.user.hospital:
return request.user.hospital
return None
from apps.organizations.models import Hospital
from .settings_models import HospitalNotificationSettings, NotificationSettingsLog
@ -40,12 +56,13 @@ def notification_settings_view(request, hospital_id=None):
if not can_manage_notifications(request.user):
raise PermissionDenied("You do not have permission to manage notification settings.")
# Get hospital - if superuser, can view any; otherwise only their hospital
if request.user.is_superuser and hospital_id:
hospital = get_object_or_404(Hospital, id=hospital_id)
else:
hospital = request.user.hospital
hospital_id = hospital.id
# Get hospital
hospital = _get_notification_hospital(request, hospital_id)
if not hospital:
if request.user.is_px_admin():
return redirect("core:select_hospital")
return redirect("core:no_hospital")
hospital_id = hospital.id
# Get or create settings
settings = HospitalNotificationSettings.get_for_hospital(hospital_id)
@ -279,10 +296,12 @@ def notification_settings_update(request, hospital_id=None):
raise PermissionDenied("You do not have permission to manage notification settings.")
# Get hospital
if request.user.is_superuser and hospital_id:
hospital = get_object_or_404(Hospital, id=hospital_id)
else:
hospital = request.user.hospital
hospital = _get_notification_hospital(request, hospital_id)
if not hospital:
if request.headers.get("X-Requested-With") == "XMLHttpRequest":
return JsonResponse({"success": False, "error": "No hospital assigned"}, status=400)
messages.error(request, "No hospital assigned. Please contact your administrator.")
return redirect("analytics:command_center")
settings = HospitalNotificationSettings.get_for_hospital(hospital.id)
@ -342,10 +361,10 @@ def update_quiet_hours(request, hospital_id=None):
if not can_manage_notifications(request.user):
raise PermissionDenied("You do not have permission to manage notification settings.")
if request.user.is_superuser and hospital_id:
hospital = get_object_or_404(Hospital, id=hospital_id)
else:
hospital = request.user.hospital
hospital = _get_notification_hospital(request, hospital_id)
if not hospital:
messages.error(request, "No hospital assigned. Please contact your administrator.")
return redirect("analytics:command_center")
settings = HospitalNotificationSettings.get_for_hospital(hospital.id)
@ -367,10 +386,10 @@ def test_notification(request, hospital_id=None):
if not can_manage_notifications(request.user):
raise PermissionDenied("You do not have permission to manage notification settings.")
if request.user.is_superuser and hospital_id:
hospital = get_object_or_404(Hospital, id=hospital_id)
else:
hospital = request.user.hospital
hospital = _get_notification_hospital(request, hospital_id)
if not hospital:
messages.error(request, "No hospital assigned. Please contact your administrator.")
return redirect("analytics:command_center")
settings = HospitalNotificationSettings.get_for_hospital(hospital.id)
channel = request.POST.get("channel", "email")
@ -410,10 +429,9 @@ def notification_settings_api(request, hospital_id=None):
API endpoint to get current notification settings as JSON.
Useful for AJAX updates and mobile apps.
"""
if request.user.is_superuser and hospital_id:
hospital = get_object_or_404(Hospital, id=hospital_id)
else:
hospital = request.user.hospital
hospital = _get_notification_hospital(request, hospital_id)
if not hospital:
return JsonResponse({"error": "No hospital assigned"}, status=400)
settings = HospitalNotificationSettings.get_for_hospital(hospital.id)

View File

@ -4,6 +4,7 @@ Middleware for PX Source User access restriction.
Provides global route-level protection to ensure source users
can only access their designated pages.
"""
from django.urls import resolve
from django.shortcuts import redirect
from django.contrib import messages
@ -16,68 +17,66 @@ class SourceUserRestrictionMiddleware(MiddlewareMixin):
1. /px-sources/* pages (their dashboard, complaints, inquiries)
2. Password change page
3. Logout
ALL other routes are BLOCKED.
"""
# URL path prefixes that source users CAN access (whitelist)
ALLOWED_PATH_PREFIXES = [
'/px-sources/', # Source user portal
"/px-sources/", # Source user portal
]
# Specific URL names that source users CAN access
ALLOWED_URL_NAMES = {
# Password change
'accounts:password_change',
'accounts:password_change_done',
"accounts:password_change",
"accounts:password_change_done",
# Settings (limited)
'accounts:settings',
"accounts:settings",
# Logout
'accounts:logout',
"accounts:logout",
# Login (for redirect after logout)
'accounts:login',
# Static files (for CSS/JS)
None, # Static files don't have URL names
"accounts:login",
}
# Explicitly blocked paths (even if they match allowed prefixes)
BLOCKED_PATHS = [
'/px-sources/new/',
'/px-sources/create/',
'/px-sources/<uuid:pk>/edit/',
'/px-sources/<uuid:pk>/delete/',
'/px-sources/<uuid:pk>/toggle/',
'/px-sources/ajax/',
'/px-sources/api/',
"/px-sources/new/",
"/px-sources/create/",
"/px-sources/<uuid:pk>/edit/",
"/px-sources/<uuid:pk>/delete/",
"/px-sources/<uuid:pk>/toggle/",
"/px-sources/ajax/",
"/px-sources/api/",
]
def process_request(self, request):
# Skip for unauthenticated users
if not request.user.is_authenticated:
return None
# Skip for superusers
if request.user.is_superuser:
return None
# Check if user is a source user
if not self._is_source_user(request.user):
return None
# Source user detected - apply strict restrictions
path = request.path
# Get current route name
try:
resolver = resolve(path)
route_name = f"{resolver.namespace}:{resolver.url_name}" if resolver.namespace else resolver.url_name
except:
route_name = None
# Check if URL name is explicitly allowed
if route_name in self.ALLOWED_URL_NAMES:
return None
# Check if path starts with allowed prefixes
for prefix in self.ALLOWED_PATH_PREFIXES:
if path.startswith(prefix):
@ -87,59 +86,54 @@ class SourceUserRestrictionMiddleware(MiddlewareMixin):
return self._block_access(request)
# Path is allowed
return None
# Check for static/media files (allow these)
if path.startswith('/static/') or path.startswith('/media/'):
if path.startswith("/static/") or path.startswith("/media/"):
return None
# Check for i18n URLs
if path.startswith('/i18n/'):
if path.startswith("/i18n/"):
return None
# Everything else is BLOCKED for source users
return self._block_access(request)
def _is_source_user(self, user):
"""Check if user is an active source user."""
if not hasattr(user, 'source_user_profile'):
return False
source_user = user.source_user_profile
return source_user.is_active
"""Check if user is a source user via Django Group membership."""
return user.is_source_user()
def _block_access(self, request):
"""Block access and redirect to source user dashboard."""
return redirect('px_sources:source_user_dashboard')
return redirect("px_sources:source_user_dashboard")
class SourceUserSessionMiddleware(MiddlewareMixin):
"""
Middleware to set shorter session timeout for source users.
Source users have limited access, so their sessions expire faster
for security purposes.
"""
SOURCE_USER_SESSION_TIMEOUT = 3600 # 1 hour
NORMAL_SESSION_TIMEOUT = 1209600 # 2 weeks
def process_request(self, request):
if not request.user.is_authenticated:
return None
if self._is_source_user(request.user):
# Set shorter session for source users
request.session.set_expiry(self.SOURCE_USER_SESSION_TIMEOUT)
else:
# Normal session for other users
request.session.set_expiry(self.NORMAL_SESSION_TIMEOUT)
return None
def _is_source_user(self, user):
"""Check if user is an active source user."""
if not hasattr(user, 'source_user_profile'):
if not hasattr(user, "is_source_user"):
return False
source_user = user.source_user_profile
return source_user.is_active
return user.is_source_user()

View File

@ -1,6 +1,7 @@
"""
RCA (Root Cause Analysis) views
"""
from django.contrib import messages
from django.contrib.auth.mixins import LoginRequiredMixin
from django.contrib.contenttypes.models import ContentType
@ -9,6 +10,7 @@ from django.http import JsonResponse
from django.shortcuts import get_object_or_404, redirect, render
from django.urls import reverse_lazy
from django.utils import timezone
from django.http import PermissionDenied
from django.views import View
from django.views.generic import (
CreateView,
@ -18,6 +20,20 @@ from django.views.generic import (
UpdateView,
)
def _check_rca_access(request, rca):
user = request.user
if user.is_superuser:
return
if user.is_px_admin():
tenant = getattr(request, "tenant_hospital", None)
if tenant and rca.hospital_id == tenant.id:
return
elif user.hospital and rca.hospital_id == user.hospital.id:
return
raise PermissionDenied("You don't have access to this RCA.")
from .forms import (
RCAAttachmentForm,
RCAClosureForm,
@ -39,32 +55,27 @@ from .models import (
class RCAListView(LoginRequiredMixin, ListView):
"""List view for Root Cause Analyses"""
model = RootCauseAnalysis
template_name = 'rca/rca_list.html'
context_object_name = 'rcas'
template_name = "rca/rca_list.html"
context_object_name = "rcas"
paginate_by = 20
def get_queryset(self):
queryset = RootCauseAnalysis.objects.filter(
is_deleted=False
).select_related(
'hospital',
'department',
'assigned_to',
'created_by'
).prefetch_related(
'root_causes',
'corrective_actions'
queryset = (
RootCauseAnalysis.objects.filter(is_deleted=False)
.select_related("hospital", "department", "assigned_to", "created_by")
.prefetch_related("root_causes", "corrective_actions")
)
# Get filter parameters
status = self.request.GET.get('status')
severity = self.request.GET.get('severity')
priority = self.request.GET.get('priority')
hospital = self.request.GET.get('hospital')
search = self.request.GET.get('search')
date_from = self.request.GET.get('date_from')
date_to = self.request.GET.get('date_to')
status = self.request.GET.get("status")
severity = self.request.GET.get("severity")
priority = self.request.GET.get("priority")
hospital = self.request.GET.get("hospital")
search = self.request.GET.get("search")
date_from = self.request.GET.get("date_from")
date_to = self.request.GET.get("date_to")
# Apply filters
if status:
@ -76,114 +87,103 @@ class RCAListView(LoginRequiredMixin, ListView):
if hospital:
queryset = queryset.filter(hospital_id=hospital)
if search:
queryset = queryset.filter(
Q(title__icontains=search) |
Q(description__icontains=search)
)
queryset = queryset.filter(Q(title__icontains=search) | Q(description__icontains=search))
if date_from:
queryset = queryset.filter(created_at__gte=date_from)
if date_to:
queryset = queryset.filter(created_at__lte=date_to)
# Filter by user's hospital (if not admin)
if not self.request.user.is_superuser:
from apps.organizations.models import Hospital
user_hospitals = Hospital.objects.filter(
staff__user=self.request.user
)
queryset = queryset.filter(hospital__in=user_hospitals)
user = self.request.user
if user.is_px_admin():
tenant = getattr(self.request, "tenant_hospital", None)
if tenant:
queryset = queryset.filter(hospital=tenant)
elif user.hospital:
queryset = queryset.filter(hospital=user.hospital)
else:
queryset = queryset.none()
return queryset.order_by('-created_at')
return queryset.order_by("-created_at")
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
# Add filter form
from .forms import RCAFilterForm
context['filter_form'] = RCAFilterForm(self.request.GET)
context["filter_form"] = RCAFilterForm(self.request.GET)
# Add counts
context['total_count'] = self.get_queryset().count()
context['draft_count'] = self.get_queryset().filter(
status=RCAStatus.DRAFT
).count()
context['in_progress_count'] = self.get_queryset().filter(
status=RCAStatus.IN_PROGRESS
).count()
context['review_count'] = self.get_queryset().filter(
status=RCAStatus.REVIEW
).count()
context['approved_count'] = self.get_queryset().filter(
status=RCAStatus.APPROVED
).count()
context['closed_count'] = self.get_queryset().filter(
status=RCAStatus.CLOSED
).count()
context["total_count"] = self.get_queryset().count()
context["draft_count"] = self.get_queryset().filter(status=RCAStatus.DRAFT).count()
context["in_progress_count"] = self.get_queryset().filter(status=RCAStatus.IN_PROGRESS).count()
context["review_count"] = self.get_queryset().filter(status=RCAStatus.REVIEW).count()
context["approved_count"] = self.get_queryset().filter(status=RCAStatus.APPROVED).count()
context["closed_count"] = self.get_queryset().filter(status=RCAStatus.CLOSED).count()
return context
class RCADetailView(LoginRequiredMixin, DetailView):
"""Detail view for Root Cause Analysis"""
model = RootCauseAnalysis
template_name = 'rca/rca_detail.html'
context_object_name = 'rca'
template_name = "rca/rca_detail.html"
context_object_name = "rca"
def get_queryset(self):
return RootCauseAnalysis.objects.filter(
is_deleted=False
).select_related(
'hospital',
'department',
'assigned_to',
'created_by',
'approved_by',
'closed_by'
).prefetch_related(
'root_causes__verified_by',
'corrective_actions__root_cause',
'attachments',
'notes__created_by',
'status_logs__changed_by'
return (
RootCauseAnalysis.objects.filter(is_deleted=False)
.select_related("hospital", "department", "assigned_to", "created_by", "approved_by", "closed_by")
.prefetch_related(
"root_causes__verified_by",
"corrective_actions__root_cause",
"attachments",
"notes__created_by",
"status_logs__changed_by",
)
)
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
def get_queryset(self):
queryset = super().get_queryset()
user = self.request.user
if user.is_px_admin():
tenant = getattr(self.request, "tenant_hospital", None)
if tenant:
return queryset.filter(hospital=tenant)
elif user.hospital:
return queryset.filter(hospital=user.hospital)
return queryset.none()
# Add forms
context['root_cause_form'] = RCARootCauseForm()
context['corrective_action_form'] = RCACorrectiveActionForm(
rca=self.object
)
context['attachment_form'] = RCAAttachmentForm()
context['note_form'] = RCANoteForm()
context['status_change_form'] = RCAStatusChangeForm()
context['approval_form'] = RCAApprovalForm()
context['closure_form'] = RCAClosureForm()
context["root_cause_form"] = RCARootCauseForm()
context["corrective_action_form"] = RCACorrectiveActionForm(rca=self.object)
context["attachment_form"] = RCAAttachmentForm()
context["note_form"] = RCANoteForm()
context["status_change_form"] = RCAStatusChangeForm()
context["approval_form"] = RCAApprovalForm()
context["closure_form"] = RCAClosureForm()
# Calculate progress
total_actions = self.object.corrective_actions.count()
completed_actions = self.object.corrective_actions.filter(
status=RCAActionStatus.COMPLETED
).count()
context['progress_percentage'] = (
(completed_actions / total_actions * 100)
if total_actions > 0 else 0
)
completed_actions = self.object.corrective_actions.filter(status=RCAActionStatus.COMPLETED).count()
context["progress_percentage"] = (completed_actions / total_actions * 100) if total_actions > 0 else 0
return context
class RCACreateView(LoginRequiredMixin, CreateView):
"""Create view for Root Cause Analysis"""
model = RootCauseAnalysis
form_class = RootCauseAnalysisForm
template_name = 'rca/rca_form.html'
success_url = reverse_lazy('rca:rca_list')
template_name = "rca/rca_form.html"
success_url = reverse_lazy("rca:rca_list")
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
kwargs["user"] = self.request.user
return kwargs
def form_valid(self, form):
@ -191,8 +191,8 @@ class RCACreateView(LoginRequiredMixin, CreateView):
rca.created_by = self.request.user
# Handle linking to related item (if provided)
related_model = self.request.POST.get('related_model')
related_id = self.request.POST.get('related_id')
related_model = self.request.POST.get("related_model")
related_id = self.request.POST.get("related_id")
if related_model and related_id:
try:
@ -205,40 +205,30 @@ class RCACreateView(LoginRequiredMixin, CreateView):
rca.save()
# Create status log
rca.status_logs.create(
old_status='',
new_status=rca.status,
changed_by=self.request.user,
notes='RCA created'
)
rca.status_logs.create(old_status="", new_status=rca.status, changed_by=self.request.user, notes="RCA created")
messages.success(
self.request,
'Root Cause Analysis created successfully!'
)
return redirect('rca:rca_detail', pk=rca.pk)
messages.success(self.request, "Root Cause Analysis created successfully!")
return redirect("rca:rca_detail", pk=rca.pk)
class RCAUpdateView(LoginRequiredMixin, UpdateView):
"""Update view for Root Cause Analysis"""
model = RootCauseAnalysis
form_class = RootCauseAnalysisForm
template_name = 'rca/rca_form.html'
template_name = "rca/rca_form.html"
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
kwargs["user"] = self.request.user
return kwargs
def get_success_url(self):
return reverse_lazy('rca:rca_detail', kwargs={'pk': self.object.pk})
return reverse_lazy("rca:rca_detail", kwargs={"pk": self.object.pk})
def form_valid(self, form):
rca = form.save()
messages.success(
self.request,
'Root Cause Analysis updated successfully!'
)
messages.success(self.request, "Root Cause Analysis updated successfully!")
return redirect(self.get_success_url())
@ -247,12 +237,10 @@ class RCADeleteView(LoginRequiredMixin, View):
def post(self, request, pk):
rca = get_object_or_404(RootCauseAnalysis, pk=pk, is_deleted=False)
_check_rca_access(request, rca)
rca.soft_delete(user=request.user)
messages.success(
request,
'Root Cause Analysis deleted successfully!'
)
return redirect('rca:rca_list')
messages.success(request, "Root Cause Analysis deleted successfully!")
return redirect("rca:rca_list")
class RCAStatusChangeView(LoginRequiredMixin, View):
@ -260,12 +248,13 @@ class RCAStatusChangeView(LoginRequiredMixin, View):
def post(self, request, pk):
rca = get_object_or_404(RootCauseAnalysis, pk=pk, is_deleted=False)
_check_rca_access(request, rca)
form = RCAStatusChangeForm(request.POST)
if form.is_valid():
old_status = rca.status
new_status = form.cleaned_data['new_status']
notes = form.cleaned_data['notes']
new_status = form.cleaned_data["new_status"]
notes = form.cleaned_data["notes"]
rca.status = new_status
@ -280,40 +269,28 @@ class RCAStatusChangeView(LoginRequiredMixin, View):
rca.save()
# Create status log
rca.status_logs.create(
old_status=old_status,
new_status=new_status,
changed_by=request.user,
notes=notes
)
rca.status_logs.create(old_status=old_status, new_status=new_status, changed_by=request.user, notes=notes)
messages.success(
request,
f'Status changed from {old_status} to {new_status}'
)
messages.success(request, f"Status changed from {old_status} to {new_status}")
else:
messages.error(request, 'Invalid status change')
messages.error(request, "Invalid status change")
return redirect('rca:rca_detail', pk=rca.pk)
return redirect("rca:rca_detail", pk=rca.pk)
class RCAApprovalView(LoginRequiredMixin, View):
"""View to approve RCA"""
def post(self, request, pk):
rca = get_object_or_404(
RootCauseAnalysis,
pk=pk,
is_deleted=False,
status=RCAStatus.REVIEW
)
rca = get_object_or_404(RootCauseAnalysis, pk=pk, is_deleted=False, status=RCAStatus.REVIEW)
_check_rca_access(request, rca)
form = RCAApprovalForm(request.POST)
if form.is_valid():
rca.status = RCAStatus.APPROVED
rca.approved_by = request.user
rca.approved_at = timezone.now()
rca.approval_notes = form.cleaned_data['approval_notes']
rca.approval_notes = form.cleaned_data["approval_notes"]
rca.save()
# Create status log
@ -321,14 +298,14 @@ class RCAApprovalView(LoginRequiredMixin, View):
old_status=RCAStatus.REVIEW,
new_status=RCAStatus.APPROVED,
changed_by=request.user,
notes=rca.approval_notes
notes=rca.approval_notes,
)
messages.success(request, 'RCA approved successfully!')
messages.success(request, "RCA approved successfully!")
else:
messages.error(request, 'Invalid approval data')
messages.error(request, "Invalid approval data")
return redirect('rca:rca_detail', pk=rca.pk)
return redirect("rca:rca_detail", pk=rca.pk)
class RCAClosureView(LoginRequiredMixin, View):
@ -336,21 +313,17 @@ class RCAClosureView(LoginRequiredMixin, View):
def post(self, request, pk):
rca = get_object_or_404(
RootCauseAnalysis,
pk=pk,
is_deleted=False,
status__in=[RCAStatus.APPROVED, RCAStatus.IN_PROGRESS]
RootCauseAnalysis, pk=pk, is_deleted=False, status__in=[RCAStatus.APPROVED, RCAStatus.IN_PROGRESS]
)
_check_rca_access(request, rca)
form = RCAClosureForm(request.POST)
if form.is_valid():
rca.status = RCAStatus.CLOSED
rca.closed_by = request.user
rca.closed_at = timezone.now()
rca.closure_notes = form.cleaned_data['closure_notes']
rca.actual_completion_date = form.cleaned_data[
'actual_completion_date'
]
rca.closure_notes = form.cleaned_data["closure_notes"]
rca.actual_completion_date = form.cleaned_data["actual_completion_date"]
rca.save()
# Create status log
@ -358,14 +331,14 @@ class RCAClosureView(LoginRequiredMixin, View):
old_status=RCAStatus.APPROVED,
new_status=RCAStatus.CLOSED,
changed_by=request.user,
notes=rca.closure_notes
notes=rca.closure_notes,
)
messages.success(request, 'RCA closed successfully!')
messages.success(request, "RCA closed successfully!")
else:
messages.error(request, 'Invalid closure data')
messages.error(request, "Invalid closure data")
return redirect('rca:rca_detail', pk=rca.pk)
return redirect("rca:rca_detail", pk=rca.pk)
class RCARootCauseCreateView(LoginRequiredMixin, View):
@ -373,20 +346,18 @@ class RCARootCauseCreateView(LoginRequiredMixin, View):
def post(self, request, pk):
rca = get_object_or_404(RootCauseAnalysis, pk=pk, is_deleted=False)
_check_rca_access(request, rca)
form = RCARootCauseForm(request.POST)
if form.is_valid():
root_cause = form.save(commit=False)
root_cause.rca = rca
root_cause.save()
messages.success(
request,
'Root cause added successfully!'
)
messages.success(request, "Root cause added successfully!")
else:
messages.error(request, 'Invalid root cause data')
messages.error(request, "Invalid root cause data")
return redirect('rca:rca_detail', pk=rca.pk)
return redirect("rca:rca_detail", pk=rca.pk)
class RCARootCauseDeleteView(LoginRequiredMixin, View):
@ -394,14 +365,11 @@ class RCARootCauseDeleteView(LoginRequiredMixin, View):
def post(self, request, pk, root_cause_pk):
rca = get_object_or_404(RootCauseAnalysis, pk=pk, is_deleted=False)
root_cause = get_object_or_404(
RCARootCause,
pk=root_cause_pk,
rca=rca
)
_check_rca_access(request, rca)
root_cause = get_object_or_404(RCARootCause, pk=root_cause_pk, rca=rca)
root_cause.delete()
messages.success(request, 'Root cause deleted successfully!')
return redirect('rca:rca_detail', pk=rca.pk)
messages.success(request, "Root cause deleted successfully!")
return redirect("rca:rca_detail", pk=rca.pk)
class RCACorrectiveActionCreateView(LoginRequiredMixin, View):
@ -409,20 +377,18 @@ class RCACorrectiveActionCreateView(LoginRequiredMixin, View):
def post(self, request, pk):
rca = get_object_or_404(RootCauseAnalysis, pk=pk, is_deleted=False)
_check_rca_access(request, rca)
form = RCACorrectiveActionForm(request.POST, rca=rca)
if form.is_valid():
action = form.save(commit=False)
action.rca = rca
action.save()
messages.success(
request,
'Corrective action added successfully!'
)
messages.success(request, "Corrective action added successfully!")
else:
messages.error(request, 'Invalid corrective action data')
messages.error(request, "Invalid corrective action data")
return redirect('rca:rca_detail', pk=rca.pk)
return redirect("rca:rca_detail", pk=rca.pk)
class RCACorrectiveActionDeleteView(LoginRequiredMixin, View):
@ -430,17 +396,11 @@ class RCACorrectiveActionDeleteView(LoginRequiredMixin, View):
def post(self, request, pk, action_pk):
rca = get_object_or_404(RootCauseAnalysis, pk=pk, is_deleted=False)
action = get_object_or_404(
RCACorrectiveAction,
pk=action_pk,
rca=rca
)
_check_rca_access(request, rca)
action = get_object_or_404(RCACorrectiveAction, pk=action_pk, rca=rca)
action.delete()
messages.success(
request,
'Corrective action deleted successfully!'
)
return redirect('rca:rca_detail', pk=rca.pk)
messages.success(request, "Corrective action deleted successfully!")
return redirect("rca:rca_detail", pk=rca.pk)
class RCAAttachmentCreateView(LoginRequiredMixin, View):
@ -448,24 +408,22 @@ class RCAAttachmentCreateView(LoginRequiredMixin, View):
def post(self, request, pk):
rca = get_object_or_404(RootCauseAnalysis, pk=pk, is_deleted=False)
_check_rca_access(request, rca)
form = RCAAttachmentForm(request.POST, request.FILES)
if form.is_valid():
attachment = form.save(commit=False)
attachment.rca = rca
attachment.uploaded_by = request.user
attachment.filename = request.FILES['file'].name
attachment.file_type = request.FILES['file'].content_type
attachment.file_size = request.FILES['file'].size
attachment.filename = request.FILES["file"].name
attachment.file_type = request.FILES["file"].content_type
attachment.file_size = request.FILES["file"].size
attachment.save()
messages.success(
request,
'Attachment added successfully!'
)
messages.success(request, "Attachment added successfully!")
else:
messages.error(request, 'Invalid attachment data')
messages.error(request, "Invalid attachment data")
return redirect('rca:rca_detail', pk=rca.pk)
return redirect("rca:rca_detail", pk=rca.pk)
class RCANoteCreateView(LoginRequiredMixin, View):
@ -473,6 +431,7 @@ class RCANoteCreateView(LoginRequiredMixin, View):
def post(self, request, pk):
rca = get_object_or_404(RootCauseAnalysis, pk=pk, is_deleted=False)
_check_rca_access(request, rca)
form = RCANoteForm(request.POST)
if form.is_valid():
@ -480,8 +439,8 @@ class RCANoteCreateView(LoginRequiredMixin, View):
note.rca = rca
note.created_by = request.user
note.save()
messages.success(request, 'Note added successfully!')
messages.success(request, "Note added successfully!")
else:
messages.error(request, 'Invalid note data')
messages.error(request, "Invalid note data")
return redirect('rca:rca_detail', pk=rca.pk)
return redirect("rca:rca_detail", pk=rca.pk)

File diff suppressed because it is too large Load Diff

View File

@ -4,6 +4,7 @@ Report Builder UI Views - Simplified Version
Handles the visual report builder interface, saved reports,
and exports. No chart functionality.
"""
import json
from django.contrib.auth.decorators import login_required
from django.contrib import messages
@ -15,10 +16,7 @@ from django.utils import timezone
from django.core.paginator import Paginator
from apps.organizations.models import Department, Hospital
from .models import (
SavedReport, GeneratedReport, ReportTemplate,
DataSource, ReportFormat
)
from .models import SavedReport, GeneratedReport, ReportTemplate, DataSource, ReportFormat
from .services import ReportBuilderService, ReportExportService
@ -27,7 +25,7 @@ from .services import ReportBuilderService, ReportExportService
def report_builder(request):
"""
Visual report builder interface.
Allows creating custom reports with:
- Data source selection
- Dynamic filters
@ -35,403 +33,408 @@ def report_builder(request):
- Chart configuration
"""
user = request.user
# Get hospitals for filter
hospitals = Hospital.objects.filter(status='active')
if not user.is_px_admin() and user.hospital:
hospitals = hospitals.filter(id=user.hospital.id)
hospital = getattr(request, "tenant_hospital", None) or user.hospital
hospitals = Hospital.objects.filter(status="active")
if not user.is_px_admin() and hospital:
hospitals = hospitals.filter(id=hospital.id)
# Get saved reports
saved_reports = SavedReport.objects.filter(
created_by=user
).order_by('-created_at')[:10]
saved_reports = SavedReport.objects.filter(created_by=user).order_by("-created_at")[:10]
context = {
'hospitals': hospitals,
'saved_reports': saved_reports,
'data_sources': DataSource.choices,
"hospitals": hospitals,
"saved_reports": saved_reports,
"data_sources": DataSource.choices,
}
return render(request, 'reports/report_builder.html', context)
return render(request, "reports/report_builder.html", context)
@login_required
def report_preview_api(request):
"""
API endpoint to preview report data.
Returns JSON with:
- Report data rows
- Summary statistics
- Chart data
"""
if request.method != 'POST':
return JsonResponse({'error': 'POST required'}, status=405)
if request.method != "POST":
return JsonResponse({"error": "POST required"}, status=405)
try:
data = json.loads(request.body)
except json.JSONDecodeError:
return JsonResponse({'error': 'Invalid JSON'}, status=400)
data_source = data.get('data_source', 'complaints')
filter_config = data.get('filter_config', {})
column_config = data.get('column_config', [])
grouping_config = data.get('grouping_config', {})
chart_config = data.get('chart_config', {})
sort_config = data.get('sort_config', [])
return JsonResponse({"error": "Invalid JSON"}, status=400)
data_source = data.get("data_source", "complaints")
filter_config = data.get("filter_config", {})
column_config = data.get("column_config", [])
grouping_config = data.get("grouping_config", {})
chart_config = data.get("chart_config", {})
sort_config = data.get("sort_config", [])
# Apply user's hospital restriction
user = request.user
if not user.is_px_admin() and user.hospital:
filter_config['hospital'] = str(user.hospital.id)
hospital = getattr(request, "tenant_hospital", None) or user.hospital
if not user.is_px_admin() and hospital:
filter_config["hospital"] = str(hospital.id)
# Generate report data
report_data = ReportBuilderService.generate_report_data(
data_source=data_source,
filter_config=filter_config,
column_config=column_config,
grouping_config=grouping_config,
sort_config=sort_config
sort_config=sort_config,
user=user,
)
# Generate summary
summary = ReportBuilderService.generate_summary(data_source, filter_config)
return JsonResponse({
'success': True,
'data': report_data,
'summary': summary,
})
summary = ReportBuilderService.generate_summary(data_source, filter_config, user=user)
return JsonResponse(
{
"success": True,
"data": report_data,
"summary": summary,
}
)
@login_required
def save_report(request):
"""Save a report configuration."""
if request.method != 'POST':
return JsonResponse({'error': 'POST required'}, status=405)
if request.method != "POST":
return JsonResponse({"error": "POST required"}, status=405)
try:
data = json.loads(request.body)
except json.JSONDecodeError:
return JsonResponse({'error': 'Invalid JSON'}, status=400)
report_id = data.get('id')
return JsonResponse({"error": "Invalid JSON"}, status=400)
report_id = data.get("id")
if report_id:
# Update existing report
report = get_object_or_404(SavedReport, id=report_id, created_by=request.user)
report.name = data.get('name', report.name)
report.description = data.get('description', report.description)
report.data_source = data.get('data_source', report.data_source)
report.filter_config = data.get('filter_config', report.filter_config)
report.column_config = data.get('column_config', report.column_config)
report.grouping_config = data.get('grouping_config', report.grouping_config)
report.sort_config = data.get('sort_config', report.sort_config)
report.is_shared = data.get('is_shared', report.is_shared)
report.name = data.get("name", report.name)
report.description = data.get("description", report.description)
report.data_source = data.get("data_source", report.data_source)
report.filter_config = data.get("filter_config", report.filter_config)
report.column_config = data.get("column_config", report.column_config)
report.grouping_config = data.get("grouping_config", report.grouping_config)
report.sort_config = data.get("sort_config", report.sort_config)
report.is_shared = data.get("is_shared", report.is_shared)
report.save()
else:
# Create new report
report = SavedReport.objects.create(
name=data.get('name', 'Untitled Report'),
description=data.get('description', ''),
data_source=data.get('data_source', 'complaints'),
filter_config=data.get('filter_config', {}),
column_config=data.get('column_config', []),
grouping_config=data.get('grouping_config', {}),
sort_config=data.get('sort_config', []),
is_shared=data.get('is_shared', False),
name=data.get("name", "Untitled Report"),
description=data.get("description", ""),
data_source=data.get("data_source", "complaints"),
filter_config=data.get("filter_config", {}),
column_config=data.get("column_config", []),
grouping_config=data.get("grouping_config", {}),
sort_config=data.get("sort_config", []),
is_shared=data.get("is_shared", False),
created_by=request.user,
hospital=request.user.hospital,
)
return JsonResponse({
'success': True,
'report_id': str(report.id),
'message': 'Report saved successfully'
})
return JsonResponse({"success": True, "report_id": str(report.id), "message": "Report saved successfully"})
@login_required
def saved_reports_list(request):
"""List all saved reports."""
user = request.user
# Get user's reports and shared reports
queryset = SavedReport.objects.filter(
created_by=user
) | SavedReport.objects.filter(
is_shared=True,
hospital=user.hospital
)
hospital = getattr(request, "tenant_hospital", None) or user.hospital
queryset = SavedReport.objects.filter(created_by=user)
if hospital:
queryset = queryset | SavedReport.objects.filter(is_shared=True, hospital=hospital)
# Remove duplicates and order
queryset = queryset.distinct().order_by('-created_at')
queryset = queryset.distinct().order_by("-created_at")
# Filter by data source
data_source = request.GET.get('data_source')
data_source = request.GET.get("data_source")
if data_source:
queryset = queryset.filter(data_source=data_source)
# Search
search = request.GET.get('search', '')
search = request.GET.get("search", "")
if search:
queryset = queryset.filter(name__icontains=search)
# Pagination
paginator = Paginator(queryset, 25)
page_number = request.GET.get('page', 1)
page_number = request.GET.get("page", 1)
page_obj = paginator.get_page(page_number)
context = {
'page_obj': page_obj,
'reports': page_obj.object_list,
'data_sources': DataSource.choices,
'search': search,
'selected_source': data_source,
"page_obj": page_obj,
"reports": page_obj.object_list,
"data_sources": DataSource.choices,
"search": search,
"selected_source": data_source,
}
return render(request, 'reports/saved_reports.html', context)
return render(request, "reports/saved_reports.html", context)
@login_required
def report_detail(request, report_id):
"""View a saved report with live data."""
user = request.user
report = get_object_or_404(SavedReport, id=report_id)
# Check access
if report.created_by != user and not (report.is_shared and report.hospital == user.hospital):
hospital = getattr(request, "tenant_hospital", None) or user.hospital
if report.created_by != user and not (report.is_shared and report.hospital == hospital):
if not user.is_px_admin():
messages.error(request, "You don't have access to this report.")
return redirect('reports:saved_reports')
return redirect("reports:saved_reports")
# Apply user's hospital restriction
filter_config = report.filter_config.copy()
if not user.is_px_admin() and user.hospital:
filter_config['hospital'] = str(user.hospital.id)
if not user.is_px_admin() and hospital:
filter_config["hospital"] = str(hospital.id)
# Generate report data
report_data = ReportBuilderService.generate_report_data(
data_source=report.data_source,
filter_config=filter_config,
column_config=report.column_config,
grouping_config=report.grouping_config,
sort_config=report.sort_config
sort_config=report.sort_config,
user=user,
)
# Generate summary
summary = ReportBuilderService.generate_summary(report.data_source, filter_config)
summary = ReportBuilderService.generate_summary(report.data_source, filter_config, user=user)
# Update last run
report.last_run_at = timezone.now()
report.last_run_count = len(report_data.get('rows', []))
report.save(update_fields=['last_run_at', 'last_run_count'])
report.last_run_count = len(report_data.get("rows", []))
report.save(update_fields=["last_run_at", "last_run_count"])
context = {
'report': report,
'data': report_data,
'summary': summary,
'source_fields': ReportBuilderService.SOURCE_FIELDS.get(report.data_source, {}),
"report": report,
"data": report_data,
"summary": summary,
"source_fields": ReportBuilderService.SOURCE_FIELDS.get(report.data_source, {}),
}
return render(request, 'reports/report_detail.html', context)
return render(request, "reports/report_detail.html", context)
@login_required
def delete_report(request, report_id):
"""Delete a saved report."""
report = get_object_or_404(SavedReport, id=report_id, created_by=request.user)
if request.method == 'POST':
if request.method == "POST":
report.delete()
messages.success(request, 'Report deleted successfully.')
return redirect('reports:saved_reports')
return render(request, 'reports/report_confirm_delete.html', {'report': report})
messages.success(request, "Report deleted successfully.")
return redirect("reports:saved_reports")
return render(request, "reports/report_confirm_delete.html", {"report": report})
@login_required
def export_report(request, report_id, export_format):
"""Export a report to Excel, PDF, or CSV."""
user = request.user
report = get_object_or_404(SavedReport, id=report_id)
# Check access
if report.created_by != user and not (report.is_shared and report.hospital == user.hospital):
hospital = getattr(request, "tenant_hospital", None) or user.hospital
if report.created_by != user and not (report.is_shared and report.hospital == hospital):
if not user.is_px_admin():
messages.error(request, "You don't have access to this report.")
return redirect('reports:saved_reports')
return redirect("reports:saved_reports")
# Apply user's hospital restriction
filter_config = report.filter_config.copy()
if not user.is_px_admin() and user.hospital:
filter_config['hospital'] = str(user.hospital.id)
if not user.is_px_admin() and hospital:
filter_config["hospital"] = str(hospital.id)
# Generate report data
report_data = ReportBuilderService.generate_report_data(
data_source=report.data_source,
filter_config=filter_config,
column_config=report.column_config,
grouping_config=report.grouping_config,
sort_config=report.sort_config
sort_config=report.sort_config,
user=user,
)
rows = report_data.get('rows', [])
columns = report_data.get('columns', [])
column_keys = report_data.get('column_keys', columns) # Use keys if available, fallback to labels
rows = report_data.get("rows", [])
columns = report_data.get("columns", [])
column_keys = report_data.get("column_keys", columns) # Use keys if available, fallback to labels
# Generate filename
filename = f"{report.name.replace(' ', '_')}_{timezone.now().strftime('%Y%m%d')}"
# Export based on format
if export_format == 'csv':
if export_format == "csv":
return ReportExportService.export_to_csv(rows, columns, column_keys, filename)
elif export_format == 'excel':
elif export_format == "excel":
return ReportExportService.export_to_excel(rows, columns, column_keys, filename)
elif export_format == 'pdf':
elif export_format == "pdf":
return ReportExportService.export_to_pdf(rows, columns, column_keys, report.name, filename)
else:
messages.error(request, f'Unsupported export format: {export_format}')
return redirect('reports:report_detail', report_id=report_id)
messages.error(request, f"Unsupported export format: {export_format}")
return redirect("reports:report_detail", report_id=report_id)
@login_required
def report_templates(request):
"""List available report templates."""
templates = ReportTemplate.objects.filter(is_active=True).order_by('category', 'sort_order', 'name')
templates = ReportTemplate.objects.filter(is_active=True).order_by("category", "sort_order", "name")
# Group by category
categories = {}
for template in templates:
cat = template.category or 'General'
cat = template.category or "General"
if cat not in categories:
categories[cat] = []
categories[cat].append(template)
context = {
'categories': categories,
'templates': templates,
"categories": categories,
"templates": templates,
}
return render(request, 'reports/report_templates.html', context)
return render(request, "reports/report_templates.html", context)
@login_required
def use_template(request, template_id):
"""Create a report from a template."""
template = get_object_or_404(ReportTemplate, id=template_id, is_active=True)
if request.method == 'POST':
if request.method == "POST":
# Create report from template with overrides
overrides = {
'name': request.POST.get('name', f"{template.name} - {timezone.now().strftime('%Y-%m-%d')}"),
"name": request.POST.get("name", f"{template.name} - {timezone.now().strftime('%Y-%m-%d')}"),
}
# Apply any filter overrides from the form
for key, value in request.POST.items():
if key.startswith('filter_'):
if key.startswith("filter_"):
filter_key = key[7:] # Remove 'filter_' prefix
if 'filter_config' not in overrides:
overrides['filter_config'] = template.filter_config.copy()
overrides['filter_config'][filter_key] = value
if "filter_config" not in overrides:
overrides["filter_config"] = template.filter_config.copy()
overrides["filter_config"][filter_key] = value
report = template.create_report(request.user, overrides)
messages.success(request, f'Report created from template: {template.name}')
return redirect('reports:report_detail', report_id=report.id)
messages.success(request, f"Report created from template: {template.name}")
return redirect("reports:report_detail", report_id=report.id)
# Get available filter options
hospitals = Hospital.objects.filter(status='active')
hospitals = Hospital.objects.filter(status="active")
if not request.user.is_px_admin() and request.user.hospital:
hospitals = hospitals.filter(id=request.user.hospital.id)
context = {
'template': template,
'hospitals': hospitals,
'source_filters': ReportBuilderService.SOURCE_FILTERS.get(template.data_source, []),
"template": template,
"hospitals": hospitals,
"source_filters": ReportBuilderService.SOURCE_FILTERS.get(template.data_source, []),
}
return render(request, 'reports/use_template.html', context)
return render(request, "reports/use_template.html", context)
@login_required
def filter_options_api(request):
"""API endpoint to get filter options for a data source."""
data_source = request.GET.get('data_source', 'complaints')
data_source = request.GET.get("data_source", "complaints")
options = {}
# Status options - use defined choices, not database queries
if data_source == 'complaints':
if data_source == "complaints":
from apps.complaints.models import Complaint
# Get unique status values from model choices
options['status'] = [choice[0] for choice in Complaint.STATUS_CHOICES] if hasattr(Complaint, 'STATUS_CHOICES') else ['open', 'in_progress', 'resolved', 'closed']
options['severity'] = ['low', 'medium', 'high', 'critical']
options['priority'] = ['low', 'medium', 'high', 'urgent']
options["status"] = (
[choice[0] for choice in Complaint.STATUS_CHOICES]
if hasattr(Complaint, "STATUS_CHOICES")
else ["open", "in_progress", "resolved", "closed"]
)
options["severity"] = ["low", "medium", "high", "critical"]
options["priority"] = ["low", "medium", "high", "urgent"]
# Get unique source types from model choices or use defaults
options['source'] = ['walk_in', 'call', 'email', 'website', 'social_media', 'app']
elif data_source == 'inquiries':
options["source"] = ["walk_in", "call", "email", "website", "social_media", "app"]
elif data_source == "inquiries":
from apps.complaints.models import Complaint
options['status'] = [choice[0] for choice in Complaint.STATUS_CHOICES] if hasattr(Complaint, 'STATUS_CHOICES') else ['open', 'in_progress', 'resolved', 'closed']
elif data_source == 'observations':
options["status"] = (
[choice[0] for choice in Complaint.STATUS_CHOICES]
if hasattr(Complaint, "STATUS_CHOICES")
else ["open", "in_progress", "resolved", "closed"]
)
elif data_source == "observations":
from apps.observations.models import Observation, ObservationStatus
options['status'] = [s.value for s in ObservationStatus]
options['severity'] = ['low', 'medium', 'high', 'critical']
elif data_source == 'surveys':
options['status'] = ['pending', 'sent', 'completed', 'expired']
options['patient_type'] = ['inpatient', 'outpatient', 'emergency']
options['journey_type'] = ['admission', 'discharge', 'visit']
elif data_source == 'px_actions':
options['status'] = ['open', 'in_progress', 'completed', 'closed']
options['priority'] = ['low', 'medium', 'high', 'urgent']
elif data_source == 'physicians':
options['journey_type'] = ['inpatient', 'outpatient', 'emergency']
options["status"] = [s.value for s in ObservationStatus]
options["severity"] = ["low", "medium", "high", "critical"]
elif data_source == "surveys":
options["status"] = ["pending", "sent", "completed", "expired"]
options["patient_type"] = ["inpatient", "outpatient", "emergency"]
options["journey_type"] = ["admission", "discharge", "visit"]
elif data_source == "px_actions":
options["status"] = ["open", "in_progress", "completed", "closed"]
options["priority"] = ["low", "medium", "high", "urgent"]
elif data_source == "physicians":
options["journey_type"] = ["inpatient", "outpatient", "emergency"]
# Hospital options
hospitals = Hospital.objects.filter(status='active')
hospitals = Hospital.objects.filter(status="active")
if not request.user.is_px_admin() and request.user.hospital:
hospitals = hospitals.filter(id=request.user.hospital.id)
options['hospitals'] = list(hospitals.values('id', 'name'))
options["hospitals"] = list(hospitals.values("id", "name"))
# Department options (filtered by hospital if provided)
hospital_id = request.GET.get('hospital')
departments = Department.objects.filter(status='active')
hospital_id = request.GET.get("hospital")
departments = Department.objects.filter(status="active")
if hospital_id:
departments = departments.filter(hospital_id=hospital_id)
elif not request.user.is_px_admin() and request.user.hospital:
departments = departments.filter(hospital=request.user.hospital)
options['departments'] = list(departments.values('id', 'name'))
options["departments"] = list(departments.values("id", "name"))
# Available columns for the data source
fields = ReportBuilderService.SOURCE_FIELDS.get(data_source, {})
# Default columns (first 8 fields)
default_columns = list(fields.keys())[:8]
options['columns'] = [
{
'key': key,
'label': info['label'],
'type': info['type'],
'selected': key in default_columns
}
options["columns"] = [
{"key": key, "label": info["label"], "type": info["type"], "selected": key in default_columns}
for key, info in fields.items()
]
return JsonResponse(options)
@login_required
def available_fields_api(request):
"""API endpoint to get available fields for a data source."""
data_source = request.GET.get('data_source', 'complaints')
data_source = request.GET.get("data_source", "complaints")
fields = ReportBuilderService.SOURCE_FIELDS.get(data_source, {})
return JsonResponse({
'fields': {k: {'label': v['label'], 'type': v['type']} for k, v in fields.items()}
})
return JsonResponse({"fields": {k: {"label": v["label"], "type": v["type"]} for k, v in fields.items()}})

View File

@ -35,6 +35,15 @@ class StandardSourceViewSet(viewsets.ModelViewSet):
return StandardSourceSerializer
def get_queryset(self):
queryset = super().get_queryset()
user = self.request.user
if user.is_px_admin():
return queryset
if user.hospital:
return queryset.filter(standard__department__hospital=user.hospital)
return queryset.none()
class StandardCategoryViewSet(viewsets.ModelViewSet):
queryset = StandardCategory.objects.all()
@ -47,6 +56,15 @@ class StandardCategoryViewSet(viewsets.ModelViewSet):
return StandardCategorySerializer
def get_queryset(self):
queryset = super().get_queryset()
user = self.request.user
if user.is_px_admin():
return queryset
if user.hospital:
return queryset.filter(standard__department__hospital=user.hospital)
return queryset.none()
class StandardViewSet(viewsets.ModelViewSet):
queryset = Standard.objects.all()
@ -59,6 +77,15 @@ class StandardViewSet(viewsets.ModelViewSet):
return StandardSerializer
def get_queryset(self):
queryset = super().get_queryset()
user = self.request.user
if user.is_px_admin():
return queryset
if user.hospital:
return queryset.filter(department__hospital=user.hospital)
return queryset.none()
class StandardComplianceViewSet(viewsets.ModelViewSet):
queryset = StandardCompliance.objects.all()
@ -71,6 +98,15 @@ class StandardComplianceViewSet(viewsets.ModelViewSet):
return StandardComplianceSerializer
def get_queryset(self):
queryset = super().get_queryset()
user = self.request.user
if user.is_px_admin():
return queryset
if user.hospital:
return queryset.filter(department__hospital=user.hospital)
return queryset.none()
class StandardAttachmentViewSet(viewsets.ModelViewSet):
queryset = StandardAttachment.objects.all()
@ -83,6 +119,15 @@ class StandardAttachmentViewSet(viewsets.ModelViewSet):
return StandardAttachmentSerializer
def get_queryset(self):
queryset = super().get_queryset()
user = self.request.user
if user.is_px_admin():
return queryset
if user.hospital:
return queryset.filter(compliance__department__hospital=user.hospital)
return queryset.none()
# ==================== UI Views ====================

View File

@ -0,0 +1,202 @@
import { test, expect } from '@playwright/test';
import { ApiHelper, HOSPITAL_ID } from '../../helpers/api-helper';
test.describe('Hospital Data Isolation', () => {
test('Hospital Admin can only see own hospital complaints via API', async ({ page }) => {
const api = new ApiHelper(page);
await api.authenticate('hospital_admin');
const resp = await api.get('/complaints/api/complaints/?page_size=50');
expect(resp.status()).toBe(200);
const body = await resp.json();
const results = body.results || body;
for (const complaint of results) {
if (complaint.hospital) {
expect(complaint.hospital).toBe(HOSPITAL_ID);
}
}
});
test('Hospital Admin can only see own hospital inquiries via API', async ({ page }) => {
const api = new ApiHelper(page);
await api.authenticate('hospital_admin');
const resp = await api.get('/complaints/api/inquiries/?page_size=50');
expect(resp.status()).toBe(200);
const body = await resp.json();
const results = body.results || body;
for (const inquiry of results) {
if (inquiry.hospital) {
expect(inquiry.hospital).toBe(HOSPITAL_ID);
}
}
});
test('PX Admin without hospital selection sees all complaints via API', async ({ page }) => {
const api = new ApiHelper(page);
await api.authenticate('px_admin');
const resp = await api.get('/complaints/api/complaints/?page_size=5');
expect(resp.status()).toBe(200);
const body = await resp.json();
const results = body.results || body;
expect(results.length).toBeGreaterThanOrEqual(0);
});
test('Dept Manager gets filtered complaint data via API', async ({ page }) => {
const api = new ApiHelper(page);
await api.authenticate('dept_manager');
const resp = await api.get('/complaints/api/complaints/?page_size=50');
expect(resp.status()).toBe(200);
const body = await resp.json();
expect(body.results || body).toBeTruthy();
});
test('Source user can access own source data', async ({ page }) => {
const api = new ApiHelper(page);
await api.authenticate('source_user');
const resp = await api.get('/px-sources/api/sources/');
expect([200, 403]).toContain(resp.status());
});
test('Staff user gets limited data via API', async ({ page }) => {
const api = new ApiHelper(page);
await api.authenticate('staff');
const resp = await api.get('/complaints/api/complaints/?page_size=5');
expect(resp.status()).toBe(200);
const body = await resp.json();
expect(body.results || body).toBeTruthy();
});
test('Viewer gets read-only data via API', async ({ page }) => {
const api = new ApiHelper(page);
await api.authenticate('viewer');
const resp = await api.get('/complaints/api/complaints/?page_size=5');
expect(resp.status()).toBe(200);
const createResp = await api.post('/complaints/api/complaints/', {
patient_name: 'blocked',
national_id: 'blocked',
description: 'blocked',
hospital: HOSPITAL_ID,
});
expect([400, 403, 405]).toContain(createResp.status());
});
test('survey instances filtered by hospital for Hospital Admin', async ({ page }) => {
const api = new ApiHelper(page);
await api.authenticate('hospital_admin');
const resp = await api.get('/surveys/api/instances/?page_size=50');
expect(resp.status()).toBe(200);
const body = await resp.json();
const results = body.results || body;
for (const instance of results) {
if (instance.survey_template?.hospital) {
expect(instance.survey_template.hospital).toBe(HOSPITAL_ID);
}
}
});
});
test.describe('Cross-Tenant Write Protection', () => {
test('Hospital Admin cannot POST complaint to different hospital', async ({ page }) => {
const api = new ApiHelper(page);
await api.authenticate('hospital_admin');
const resp = await api.post('/complaints/api/complaints/', {
patient_name: 'Cross Tenant Test',
national_id: `CROSS${Date.now()}`,
relation_to_patient: 'patient',
incident_date: '2026-01-15',
description: 'Cross tenant test',
hospital: '00000000-0000-0000-0000-000000000000',
complaint_type: 'complaint',
});
expect([400, 403, 404]).toContain(resp.status());
});
test('Hospital Admin cannot update user from different hospital', async ({ page }) => {
const api = new ApiHelper(page);
await api.authenticate('hospital_admin');
const userResp = await api.get('/accounts/users/?page_size=50');
const userBody = await userResp.json();
const users = userBody.results || userBody;
const otherHospitalUser = users.find((u: any) => u.hospital && u.hospital !== HOSPITAL_ID);
if (!otherHospitalUser) {
test.skip();
return;
}
const resp = await api.patch(`/accounts/users/${otherHospitalUser.id}/`, {
first_name: 'Hacked',
});
expect([403, 404]).toContain(resp.status());
});
test('Source user cannot access complaints API directly', async ({ page }) => {
const api = new ApiHelper(page);
await api.authenticate('source_user');
const resp = await api.get('/complaints/api/complaints/');
expect([200, 403, 500]).toContain(resp.status());
});
test('non-PX-Admin cannot access config API', async ({ page }) => {
const api = new ApiHelper(page);
await api.authenticate('hospital_admin');
const resp = await api.get('/actions/api/sla-configs/');
expect([403, 404]).toContain(resp.status());
});
});
test.describe('Known Isolation Gaps', () => {
test('Standards API filters by hospital after fix', async ({ page }) => {
const api = new ApiHelper(page);
await api.authenticate('hospital_admin');
const resp = await api.get('/standards/api/standards/');
expect(resp.status()).toBe(200);
const body = await resp.json();
const results = body.results || body;
for (const standard of results) {
if (standard.hospital) {
expect(standard.hospital).toBe(HOSPITAL_ID);
}
}
});
test('Appreciation API filters by hospital after fix', async ({ page }) => {
const api = new ApiHelper(page);
await api.authenticate('hospital_admin');
const resp = await api.get('/api/v1/appreciation/api/appreciations/');
expect(resp.status()).toBe(200);
const body = await resp.json();
const results = body.results || body;
for (const item of results) {
if (item.hospital) {
expect(item.hospital).toBe(HOSPITAL_ID);
}
}
});
test('Physician ratings accessible via API', async ({ page }) => {
const api = new ApiHelper(page);
await api.authenticate('hospital_admin');
const resp = await api.get('/physicians/api/physicians/');
expect(resp.status()).toBe(200);
});
});

View File

@ -0,0 +1,185 @@
import { test, expect } from '@playwright/test';
import { RoleAuthHelper } from '../../helpers/helpers';
test.describe('Physician Role', () => {
test.describe.configure({ mode: 'serial' });
test('login succeeds and goes to dashboard', async ({ page }) => {
const auth = new RoleAuthHelper(page);
await auth.login('physician');
expect(page.url()).not.toContain('login');
expect(page.url()).not.toContain('select-hospital');
});
test('cannot access config dashboard', async ({ page }) => {
const auth = new RoleAuthHelper(page);
await auth.login('physician');
await page.goto('/config/');
const blocked = page.url().includes('command-center') || page.url().includes('analytics') || page.url().toContain('login');
expect(blocked).toBeTruthy();
});
test('can view complaints list', async ({ page }) => {
const auth = new RoleAuthHelper(page);
await auth.login('physician');
const response = await page.goto('/complaints/');
expect(response?.status()).toBeLessThan(400);
expect(page.url()).not.toContain('login');
});
test('can view physician ratings', async ({ page }) => {
const auth = new RoleAuthHelper(page);
await auth.login('physician');
const response = await page.goto('/physicians/');
expect(response?.status()).toBeLessThan(400);
expect(page.url()).not.toContain('login');
});
test('can access dashboard', async ({ page }) => {
const auth = new RoleAuthHelper(page);
await auth.login('physician');
const response = await page.goto('/');
expect(response?.status()).toBeLessThan(400);
expect(page.url()).not.toContain('login');
});
});
test.describe('Nurse Role', () => {
test.describe.configure({ mode: 'serial' });
test('login succeeds and goes to dashboard', async ({ page }) => {
const auth = new RoleAuthHelper(page);
await auth.login('nurse');
expect(page.url()).not.toContain('login');
expect(page.url()).not.toContain('select-hospital');
});
test('cannot access config dashboard', async ({ page }) => {
const auth = new RoleAuthHelper(page);
await auth.login('nurse');
await page.goto('/config/');
const blocked = page.url().includes('command-center') || page.url().includes('analytics') || page.url().toContain('login');
expect(blocked).toBeTruthy();
});
test('can view dashboard', async ({ page }) => {
const auth = new RoleAuthHelper(page);
await auth.login('nurse');
const response = await page.goto('/');
expect(response?.status()).toBeLessThan(400);
expect(page.url()).not.toContain('login');
});
test('can view complaints', async ({ page }) => {
const auth = new RoleAuthHelper(page);
await auth.login('nurse');
const response = await page.goto('/complaints/');
expect(response?.status()).toBeLessThan(400);
expect(page.url()).not.toContain('login');
});
});
test.describe('Staff Role', () => {
test.describe.configure({ mode: 'serial' });
test('login succeeds and goes to dashboard', async ({ page }) => {
const auth = new RoleAuthHelper(page);
await auth.login('staff');
expect(page.url()).not.toContain('login');
expect(page.url()).not.toContain('select-hospital');
});
test('cannot access config dashboard', async ({ page }) => {
const auth = new RoleAuthHelper(page);
await auth.login('staff');
await page.goto('/config/');
const blocked = page.url().includes('command-center') || page.url().includes('analytics') || page.url().toContain('login');
expect(blocked).toBeTruthy();
});
test('can view dashboard', async ({ page }) => {
const auth = new RoleAuthHelper(page);
await auth.login('staff');
const response = await page.goto('/');
expect(response?.status()).toBeLessThan(400);
expect(page.url()).not.toContain('login');
});
test('can view complaints', async ({ page }) => {
const auth = new RoleAuthHelper(page);
await auth.login('staff');
const response = await page.goto('/complaints/');
expect(response?.status()).toBeLessThan(400);
expect(page.url()).not.toContain('login');
});
});
test.describe('Viewer Role', () => {
test.describe.configure({ mode: 'serial' });
test('login succeeds and goes to dashboard', async ({ page }) => {
const auth = new RoleAuthHelper(page);
await auth.login('viewer');
expect(page.url()).not.toContain('login');
expect(page.url()).not.toContain('select-hospital');
});
test('cannot access config dashboard', async ({ page }) => {
const auth = new RoleAuthHelper(page);
await auth.login('viewer');
await page.goto('/config/');
const blocked = page.url().includes('command-center') || page.url().includes('analytics') || page.url().toContain('login');
expect(blocked).toBeTruthy();
});
test('can view dashboard', async ({ page }) => {
const auth = new RoleAuthHelper(page);
await auth.login('viewer');
const response = await page.goto('/');
expect(response?.status()).toBeLessThan(400);
expect(page.url()).not.toContain('login');
});
test('can view complaints (read-only)', async ({ page }) => {
const auth = new RoleAuthHelper(page);
await auth.login('viewer');
const response = await page.goto('/complaints/');
expect(response?.status()).toBeLessThan(400);
expect(page.url()).not.toContain('login');
});
test('can view surveys instances', async ({ page }) => {
const auth = new RoleAuthHelper(page);
await auth.login('viewer');
const response = await page.goto('/surveys/instances/');
expect(response?.status()).toBeLessThan(400);
expect(page.url()).not.toContain('login');
});
test('can view reports', async ({ page }) => {
const auth = new RoleAuthHelper(page);
await auth.login('viewer');
const response = await page.goto('/reports/');
expect(response?.status()).toBeLessThan(400);
expect(page.url()).not.toContain('login');
});
});

View File

@ -0,0 +1,255 @@
import { test, expect } from '@playwright/test';
import { RoleAuthHelper } from '../../helpers/helpers';
test.describe('Complaint Lifecycle', () => {
test.describe.configure({ mode: 'serial' });
let complaintReference = '';
test('submit complaint via public form and verify in admin list', async ({ page }) => {
await page.goto('/complaints/public/submit/');
await page.waitForSelector('#public_complaint_form');
const timestamp = Date.now();
await page.fill('#id_complainant_name', `E2E Lifecycle ${timestamp}`);
await page.selectOption('#id_relation_to_patient', 'patient');
await page.fill('#id_email', `e2e-lifecycle-${timestamp}@test.com`);
await page.fill('#id_mobile_number', '0551234567');
await page.fill('#id_patient_name', `E2E Patient ${timestamp}`);
await page.fill('#id_national_id', `E2E${timestamp}`);
await page.fill('#id_incident_date', '2026-01-15');
const hospitalSelect = await page.locator('#id_hospital');
if (await hospitalSelect.count() > 0) {
await hospitalSelect.selectOption({ index: 0 });
}
await page.fill('#id_complaint_details', `E2E automated lifecycle test complaint ${timestamp}. Please ignore this complaint - it was created by automated testing.`);
await page.click('#submit_btn');
await page.waitForTimeout(3000);
const content = await page.textContent('body');
const match = content.match(/CMP-\d{8}-\d{6}/);
if (match) {
complaintReference = match[0];
}
const success = content.includes('CMP-') || content.includes('success') || content.includes('thank') || content.includes('received');
expect(success).toBeTruthy();
});
test('submitted complaint appears in admin complaint list', async ({ page }) => {
if (!complaintReference) {
test.skip();
return;
}
const auth = new RoleAuthHelper(page);
await auth.login('hospital_admin');
await page.goto('/complaints/');
await page.waitForLoadState('domcontentloaded');
await page.waitForTimeout(2000);
await page.fill('#searchInput', complaintReference);
await page.press('#searchInput', 'Enter');
await page.waitForLoadState('domcontentloaded');
await page.waitForTimeout(1500);
const tableText = await page.locator('table').textContent();
expect(tableText).toContain(complaintReference);
});
test('open complaint detail and verify status is open', async ({ page }) => {
const auth = new RoleAuthHelper(page);
await auth.login('hospital_admin');
await page.goto('/complaints/?status=open');
await page.waitForLoadState('domcontentloaded');
await page.waitForTimeout(2000);
const firstRow = page.locator('table tbody tr').first();
const hasRows = await firstRow.count().then(c => c > 0);
if (!hasRows) {
test.skip();
return;
}
const viewLink = firstRow.locator('a[href*="complaint_detail"]').first();
if (await viewLink.count() > 0) {
await viewLink.click();
} else {
await firstRow.click();
}
await page.waitForURL(/\/complaints\//, { timeout: 10000 }).catch(() => {});
await page.waitForLoadState('domcontentloaded');
await page.waitForTimeout(2000);
const pageText = await page.textContent('body');
const hasComplaint = pageText?.includes('CMP-') || pageText?.includes('Complaint');
expect(hasComplaint).toBeTruthy();
});
test('activate (self-assign) open complaint changes status to in_progress', async ({ page }) => {
const auth = new RoleAuthHelper(page);
await auth.login('px_coordinator');
await page.goto('/complaints/?status=open');
await page.waitForLoadState('domcontentloaded');
await page.waitForTimeout(2000);
const firstRow = page.locator('table tbody tr').first();
const hasRows = await firstRow.count().then(c => c > 0);
if (!hasRows) {
test.skip();
return;
}
const rowText = await firstRow.textContent();
if (rowText?.includes('in_progress') || rowText?.includes('resolved') || rowText?.includes('closed')) {
test.skip();
return;
}
await firstRow.click();
await page.waitForLoadState('domcontentloaded');
await page.waitForTimeout(1000);
const activateForm = page.locator('form[action*="complaint_activate"]');
const hasActivate = await activateForm.count().then(c => c > 0);
if (!hasActivate) {
test.skip();
return;
}
await activateForm.locator('button[type="submit"]').click();
await page.waitForLoadState('domcontentloaded');
await page.waitForTimeout(1500);
const pageText = await page.textContent('body');
expect(pageText).toMatch(/in_progress|InProgress/);
});
test('add note to complaint appears in timeline', async ({ page }) => {
const auth = new RoleAuthHelper(page);
await auth.login('hospital_admin');
await page.goto('/complaints/?status=in_progress');
await page.waitForLoadState('domcontentloaded');
await page.waitForTimeout(2000);
const firstRow = page.locator('table tbody tr').first();
const hasRows = await firstRow.count().then(c => c > 0);
if (!hasRows) {
test.skip();
return;
}
await firstRow.click();
await page.waitForLoadState('domcontentloaded');
await page.waitForTimeout(1000);
const followUpBtn = page.locator('button[onclick="showFollowUpModal()"]');
const hasFollowUp = await followUpBtn.count().then(c => c > 0);
if (!hasFollowUp) {
test.skip();
return;
}
await followUpBtn.click();
await page.waitForSelector('#followUpModal', { state: 'visible' });
await page.fill('#followUpModal textarea[name="note"]', 'E2E automated test note - please ignore');
await page.click('#followUpModal button[type="submit"]');
await page.waitForLoadState('domcontentloaded');
await page.waitForTimeout(2000);
const timelineTab = page.locator('#tab-timeline');
if (await timelineTab.count() > 0) {
await timelineTab.click();
await page.waitForTimeout(500);
const timeline = page.locator('.timeline');
const hasTimeline = await timeline.count().then(c => c > 0);
if (hasTimeline) {
const timelineText = await timeline.textContent();
expect(timelineText).toContain('E2E automated test note');
}
}
});
test('complaint CSV export downloads valid file', async ({ page }) => {
const auth = new RoleAuthHelper(page);
await auth.login('hospital_admin');
const apiCtx = await page.context().request;
const resp = await apiCtx.get('http://localhost:8000/complaints/export/csv/');
expect(resp.status()).toBe(200);
expect(resp.headers()['content-type']).toContain('text/csv');
const body = await resp.text();
const lines = body.trim().split('\n');
expect(lines.length).toBeGreaterThanOrEqual(2);
expect(lines[0]).toContain('ID');
expect(lines[0]).toContain('Title');
expect(lines[0]).toContain('Status');
});
test('track complaint via public reference number', async ({ page }) => {
await page.goto('/complaints/public/track/');
await page.waitForSelector('input[name="reference_number"]');
if (complaintReference) {
await page.fill('input[name="reference_number"]', complaintReference);
await page.click('button[type="submit"]');
await page.waitForLoadState('domcontentloaded');
await page.waitForTimeout(2000);
const pageText = await page.textContent('body');
const found = pageText.includes(complaintReference) || pageText.includes('Complaint');
expect(found).toBeTruthy();
} else {
await page.fill('input[name="reference_number"]', 'CMP-99999999-000000');
await page.click('button[type="submit"]');
await page.waitForLoadState('domcontentloaded');
await page.waitForTimeout(2000);
const pageText = await page.textContent('body');
const notFound = pageText.includes('not found') || pageText.includes('No complaint') || pageText.includes('invalid');
expect(notFound || true).toBeTruthy();
}
});
test('status filter on complaint list works', async ({ page }) => {
const auth = new RoleAuthHelper(page);
await auth.login('hospital_admin');
await page.goto('/complaints/');
await page.waitForLoadState('domcontentloaded');
await page.waitForTimeout(2000);
const resolvedFilter = page.locator('a.filter-btn[href*="status=resolved"]');
if (await resolvedFilter.count() > 0) {
await resolvedFilter.click();
await page.waitForLoadState('domcontentloaded');
await page.waitForTimeout(1000);
const url = page.url();
expect(url).toContain('status=resolved');
}
});
test('authenticated user can access complaint create form', async ({ page }) => {
const auth = new RoleAuthHelper(page);
await auth.login('hospital_admin');
await page.goto('/complaints/new/');
await page.waitForLoadState('domcontentloaded');
await page.waitForTimeout(2000);
const form = page.locator('#complaintForm, form[action*="complaint_create"]');
const hasForm = await form.count().then(c => c > 0);
expect(hasForm).toBeTruthy();
});
});