Pre-production security fixes to prevent cross-hospital data leaks: - Standards API: add get_queryset() filtering by department__hospital - Reports service: add user param with hospital filtering to all querysets - RCA views: replace is_superuser with tenant_hospital pattern, add access checks to all 11 mutation views - Notifications views: replace is_superuser patterns with _get_notification_hospital helper across all 5 settings functions - Appreciation API: add tenant_hospital fallback to AppreciationViewSet, AppreciationStatsViewSet, and LeaderboardView - AI Analytics: add tenant_hospital fallback in ExecutiveSummaryGenerator and ActionRecommendationEngine - SourceUserRestrictionMiddleware: remove None from ALLOWED_URL_NAMES - Complaint export: fix nullable patient/due_at/description crashes in CSV and Excel export, fix invalid get_category_display/get_source_display calls E2E test updates: - Update isolation gap tests to actively assert hospital filtering - Fix CSV export test to use API context for download handling - Switch clinical-staff tests to serial mode to prevent race conditions
507 lines
18 KiB
Python
507 lines
18 KiB
Python
"""
|
|
Appreciation views - API views for appreciation management
|
|
"""
|
|
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.db.models import Count, Q, F
|
|
from django.utils import timezone
|
|
from rest_framework import generics, status, viewsets
|
|
from rest_framework.decorators import action
|
|
from rest_framework.permissions import IsAuthenticated
|
|
from rest_framework.response import Response
|
|
|
|
from apps.appreciation.models import (
|
|
Appreciation,
|
|
AppreciationBadge,
|
|
AppreciationCategory,
|
|
AppreciationStats,
|
|
UserBadge,
|
|
)
|
|
from apps.appreciation.serializers import (
|
|
AppreciationBadgeSerializer,
|
|
AppreciationCategorySerializer,
|
|
AppreciationCreateSerializer,
|
|
AppreciationLeaderboardSerializer,
|
|
AppreciationSerializer,
|
|
AppreciationStatsSerializer,
|
|
AppreciationSummarySerializer,
|
|
UserBadgeSerializer,
|
|
)
|
|
from apps.accounts.permissions import IsPXAdminOrHospitalAdmin
|
|
|
|
|
|
class AppreciationCategoryViewSet(viewsets.ModelViewSet):
    """CRUD endpoints for appreciation categories.

    Only active categories are ever returned. When the caller passes a
    ``hospital_id`` query parameter the result is narrowed to categories of
    that hospital plus categories that have no hospital at all.
    """

    # NOTE(review): write actions are open to every authenticated user here,
    # while AppreciationBadgeViewSet also requires IsPXAdminOrHospitalAdmin.
    # Confirm that this difference is intentional.
    queryset = AppreciationCategory.objects.all()
    serializer_class = AppreciationCategorySerializer
    permission_classes = [IsAuthenticated]

    def get_queryset(self):
        """Return active categories, optionally narrowed by ``hospital_id``."""
        categories = super().get_queryset()

        requested_hospital = self.request.query_params.get("hospital_id")
        if requested_hospital:
            # Categories without a hospital are kept alongside the requested
            # hospital's own categories.
            owned_by_hospital = Q(hospital_id=requested_hospital)
            without_hospital = Q(hospital__isnull=True)
            categories = categories.filter(owned_by_hospital | without_hospital)

        active_categories = categories.filter(is_active=True)
        return active_categories.select_related("hospital")
|
|
|
|
|
|
class AppreciationViewSet(viewsets.ModelViewSet):
    """API endpoints for sending, listing and acknowledging appreciations.

    Listing is hospital-scoped: rows are limited to the request's
    ``tenant_hospital`` (falling back to ``user.hospital``) and then to what
    the user may see under the visibility rules in ``get_queryset``.

    A user counts as the recipient of an appreciation when it targets either
    their own account or, if they have one, their ``staff_profile``. The same
    rule is applied by ``get_queryset``, ``my_appreciations``, ``acknowledge``
    and ``summary``.
    """

    queryset = Appreciation.objects.all()
    serializer_class = AppreciationSerializer
    permission_classes = [IsAuthenticated]

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _recipient_identities(user):
        """Return ``(content_type, object_id)`` pairs that identify ``user``.

        The first pair is always the user account itself; a second pair is
        added for the user's staff profile when one exists.
        """
        identities = [(ContentType.objects.get_for_model(user), user.id)]
        # getattr with a default covers both a missing relation and a
        # relation that resolves to None.
        staff = getattr(user, "staff_profile", None)
        if staff is not None:
            identities.append((ContentType.objects.get_for_model(type(staff)), staff.id))
        return identities

    @classmethod
    def _received_by(cls, user):
        """Return a ``Q`` matching rows addressed to ``user`` or their staff profile."""
        received = Q()
        for content_type, object_id in cls._recipient_identities(user):
            received |= Q(recipient_content_type=content_type, recipient_object_id=object_id)
        return received

    def _scoped_hospital(self):
        """Return the hospital the current request is restricted to, if any."""
        return getattr(self.request, "tenant_hospital", None) or self.request.user.hospital

    def _listing_response(self, queryset):
        """Serialize ``queryset`` as a list, paginated when a paginator is active."""
        page = self.paginate_queryset(queryset)
        if page is not None:
            return self.get_paginated_response(AppreciationSerializer(page, many=True).data)
        return Response(AppreciationSerializer(queryset, many=True).data)

    # ------------------------------------------------------------------
    # Queryset
    # ------------------------------------------------------------------

    def get_queryset(self):
        """Return the appreciations the current user is allowed to see.

        Filters are applied in this order:

        1. Hospital scope; users without a hospital who are not PX admins
           get an empty result.
        2. Department managers only see their own department's rows plus
           rows without a department.
        3. Visibility: sent by the user, received by the user (account or
           staff profile), department-visible in the user's department,
           hospital-visible in the scoped hospital, or public.
        4. Optional query parameters: ``recipient_type`` + ``recipient_id``,
           ``status`` and ``category_id``.
        """
        from apps.appreciation.models import AppreciationVisibility

        user = self.request.user
        params = self.request.query_params
        queryset = super().get_queryset()

        # 1. Hospital scope
        hospital = self._scoped_hospital()
        if hospital:
            queryset = queryset.filter(hospital=hospital)
        elif not user.is_px_admin():
            queryset = queryset.none()

        # 2. Department managers
        if user.department and user.is_department_manager():
            queryset = queryset.filter(Q(department=user.department) | Q(department__isnull=True))

        # 3. Visibility rules
        visible = Q(sender=user) | self._received_by(user) | Q(visibility=AppreciationVisibility.PUBLIC)
        if user.department:
            visible |= Q(visibility=AppreciationVisibility.DEPARTMENT, department=user.department)
        if hospital:
            visible |= Q(visibility=AppreciationVisibility.HOSPITAL, hospital=hospital)
        queryset = queryset.filter(visible)

        # 4. Optional filters. Unknown recipient types are ignored.
        recipient_type = params.get("recipient_type")
        recipient_id = params.get("recipient_id")
        if recipient_id and recipient_type in ("user", "staff"):
            if recipient_type == "user":
                recipient_model = user.__class__
            else:
                from apps.organizations.models import Staff

                recipient_model = Staff
            queryset = queryset.filter(
                recipient_content_type=ContentType.objects.get_for_model(recipient_model),
                recipient_object_id=recipient_id,
            )

        status_filter = params.get("status")
        if status_filter:
            queryset = queryset.filter(status=status_filter)

        category_id = params.get("category_id")
        if category_id:
            queryset = queryset.filter(category_id=category_id)

        return queryset.select_related("sender", "hospital", "department", "category").prefetch_related("recipient")

    # ------------------------------------------------------------------
    # Actions
    # ------------------------------------------------------------------

    def create(self, request, *args, **kwargs):
        """Create and send a new appreciation.

        The payload is validated with ``AppreciationCreateSerializer``.
        Referenced objects (recipient, hospital, department, category) that
        do not exist produce HTTP 404 instead of an unhandled
        ``DoesNotExist``. The target hospital must be the hospital the
        request is scoped to; callers without a hospital scope must be PX
        admins. Otherwise HTTP 403 is returned.

        Returns HTTP 201 with the appreciation rendered by
        ``AppreciationSerializer``.
        """
        from apps.accounts.models import User
        from apps.organizations.models import Department, Hospital, Staff

        serializer = AppreciationCreateSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        data = serializer.validated_data

        # Recipient: any recipient_type other than "user" is treated as staff.
        recipient_id = data["recipient_id"]
        recipient_model = User if data["recipient_type"] == "user" else Staff
        generics.get_object_or_404(recipient_model.objects.all(), id=recipient_id)
        content_type = ContentType.objects.get_for_model(recipient_model)

        # Hospital: must match the scope used for reads in get_queryset so
        # that a caller cannot write into a hospital they cannot read from.
        hospital = generics.get_object_or_404(Hospital.objects.all(), id=data["hospital_id"])
        scoped_hospital = self._scoped_hospital()
        if scoped_hospital:
            allowed = hospital == scoped_hospital
        else:
            allowed = request.user.is_px_admin()
        if not allowed:
            return Response(
                {"error": "You can only send appreciations within your own hospital"},
                status=status.HTTP_403_FORBIDDEN,
            )

        department = None
        if data.get("department_id"):
            department = generics.get_object_or_404(Department.objects.all(), id=data["department_id"])

        category = None
        if data.get("category_id"):
            category = generics.get_object_or_404(AppreciationCategory.objects.all(), id=data["category_id"])

        appreciation = Appreciation.objects.create(
            sender=request.user,
            recipient_content_type=content_type,
            recipient_object_id=recipient_id,
            hospital=hospital,
            department=department,
            category=category,
            message_en=data["message_en"],
            message_ar=data.get("message_ar", ""),
            visibility=data["visibility"],
            is_anonymous=data["is_anonymous"],
        )
        appreciation.send()

        return Response(AppreciationSerializer(appreciation).data, status=status.HTTP_201_CREATED)

    @action(detail=True, methods=["post"])
    def acknowledge(self, request, pk=None):
        """Acknowledge an appreciation addressed to the current user.

        The caller must be the recipient, either directly or through their
        staff profile; anyone else receives HTTP 403.
        """
        appreciation = self.get_object()

        # IDs are compared as strings so a generic object-id column that
        # stores the key in a different Python type still matches.
        is_recipient = any(
            appreciation.recipient_content_type == content_type
            and str(appreciation.recipient_object_id) == str(object_id)
            for content_type, object_id in self._recipient_identities(request.user)
        )
        if not is_recipient:
            return Response(
                {"error": "You can only acknowledge appreciations sent to you"}, status=status.HTTP_403_FORBIDDEN
            )

        appreciation.acknowledge()
        return Response(AppreciationSerializer(appreciation).data)

    @action(detail=False, methods=["get"])
    def my_appreciations(self, request):
        """List visible appreciations received by the current user."""
        received = self.get_queryset().filter(self._received_by(request.user))
        return self._listing_response(received)

    @action(detail=False, methods=["get"])
    def sent_by_me(self, request):
        """List visible appreciations sent by the current user."""
        sent = self.get_queryset().filter(sender=request.user)
        return self._listing_response(sent)

    @action(detail=False, methods=["get"])
    def summary(self, request):
        """Return appreciation counters for the current user.

        "Received" figures and badges cover both the user account and the
        user's staff profile. The response, rendered through
        ``AppreciationSummarySerializer``, contains ``total_received``,
        ``total_sent``, ``this_month_received``, ``this_month_sent``,
        ``top_category``, ``badges_earned`` and ``hospital_rank``.
        """
        user = request.user
        now = timezone.now()
        this_month = {"sent_at__year": now.year, "sent_at__month": now.month}

        received_filter = self._received_by(user)
        received = Appreciation.objects.filter(received_filter)
        sent = Appreciation.objects.filter(sender=user)

        total_received = received.count()
        badges_earned = UserBadge.objects.filter(received_filter).count()

        # Rank comes from this month's stats row; the user account is tried
        # first and the staff profile is used as a fallback.
        hospital_rank = None
        if user.hospital:
            for content_type, object_id in self._recipient_identities(user):
                stats = AppreciationStats.objects.filter(
                    recipient_content_type=content_type,
                    recipient_object_id=object_id,
                    year=now.year,
                    month=now.month,
                ).first()
                if stats:
                    hospital_rank = stats.hospital_rank
                    break

        # Most frequent category among received appreciations. If the largest
        # group has no category name, no top category is reported.
        top_category = None
        if total_received > 0:
            leading = (
                received.values("category__name_en", "category__icon", "category__color")
                .annotate(count=Count("id"))
                .order_by("-count")
                .first()
            )
            if leading and leading["category__name_en"]:
                top_category = {
                    "name": leading["category__name_en"],
                    "icon": leading["category__icon"],
                    "color": leading["category__color"],
                    "count": leading["count"],
                }

        summary = {
            "total_received": total_received,
            "total_sent": sent.count(),
            "this_month_received": received.filter(**this_month).count(),
            "this_month_sent": sent.filter(**this_month).count(),
            "top_category": top_category,
            "badges_earned": badges_earned,
            "hospital_rank": hospital_rank,
        }
        return Response(AppreciationSummarySerializer(summary).data)
|
|
|
|
|
|
class AppreciationStatsViewSet(viewsets.ReadOnlyModelViewSet):
    """Read-only endpoints for appreciation statistics.

    Rows are limited to the request's ``tenant_hospital`` (falling back to
    ``user.hospital``). Users without a hospital get an empty result unless
    they are PX admins. ``year`` and ``month`` query parameters narrow the
    result further.
    """

    queryset = AppreciationStats.objects.all()
    serializer_class = AppreciationStatsSerializer
    permission_classes = [IsAuthenticated]

    def _int_query_param(self, name):
        """Return query parameter ``name`` as an int, or None when absent or empty.

        Raises:
            rest_framework.exceptions.ValidationError: the value is not an
                integer. DRF renders this as HTTP 400, where a bare ``int()``
                call would surface as an unhandled ``ValueError``.
        """
        from rest_framework.exceptions import ValidationError

        raw_value = self.request.query_params.get(name)
        if not raw_value:
            return None
        try:
            return int(raw_value)
        except ValueError:
            raise ValidationError({name: "A valid integer is required."}) from None

    def get_queryset(self):
        """Return the stats rows the current user may see, with optional period filters."""
        user = self.request.user
        queryset = super().get_queryset()

        # Hospital scope
        hospital = getattr(self.request, "tenant_hospital", None) or user.hospital
        if hospital:
            queryset = queryset.filter(hospital=hospital)
        elif not user.is_px_admin():
            queryset = queryset.none()

        # Optional period filters
        year = self._int_query_param("year")
        if year is not None:
            queryset = queryset.filter(year=year)

        month = self._int_query_param("month")
        if month is not None:
            queryset = queryset.filter(month=month)

        return queryset.select_related("hospital", "department")
|
|
|
|
|
|
class AppreciationBadgeViewSet(viewsets.ModelViewSet):
    """CRUD endpoints for appreciation badges.

    Access requires both ``IsAuthenticated`` and ``IsPXAdminOrHospitalAdmin``.
    Only active badges are returned; an optional ``hospital_id`` query
    parameter narrows the result to that hospital's badges plus badges that
    have no hospital.
    """

    queryset = AppreciationBadge.objects.all()
    serializer_class = AppreciationBadgeSerializer
    permission_classes = [IsAuthenticated, IsPXAdminOrHospitalAdmin]

    def get_queryset(self):
        """Return active badges, optionally narrowed by ``hospital_id``."""
        badges = super().get_queryset()

        requested_hospital = self.request.query_params.get("hospital_id")
        if requested_hospital:
            # Badges without a hospital are kept alongside the requested
            # hospital's own badges.
            owned_by_hospital = Q(hospital_id=requested_hospital)
            without_hospital = Q(hospital__isnull=True)
            badges = badges.filter(owned_by_hospital | without_hospital)

        active_badges = badges.filter(is_active=True)
        return active_badges.select_related("hospital")
|
|
|
|
|
|
class UserBadgeViewSet(viewsets.ReadOnlyModelViewSet):
    """Read-only endpoints for the badges earned by the current user.

    A badge belongs to the current user when its recipient is either the
    user account or, if the user has one, their ``staff_profile``.
    """

    queryset = UserBadge.objects.all()
    serializer_class = UserBadgeSerializer
    permission_classes = [IsAuthenticated]

    def get_queryset(self):
        """Return badges awarded to the user or to the user's staff profile."""
        user = self.request.user

        # Build ONE combined condition. OR-ing the user-filtered queryset
        # with a further-filtered copy of itself can never add staff rows,
        # because that copy is already restricted to the user's own rows.
        owned = Q(
            recipient_content_type=ContentType.objects.get_for_model(user),
            recipient_object_id=user.id,
        )

        # getattr with a default covers both a missing relation and a
        # relation that resolves to None.
        staff = getattr(user, "staff_profile", None)
        if staff is not None:
            owned |= Q(
                recipient_content_type=ContentType.objects.get_for_model(type(staff)),
                recipient_object_id=staff.id,
            )

        return super().get_queryset().filter(owned).select_related("badge")
|
|
|
|
|
|
class LeaderboardView(generics.ListAPIView):
    """Appreciation leaderboard for one month.

    Entries are built from ``AppreciationStats`` rows with at least one
    received appreciation, ordered by ``received_count`` descending. The
    result is limited to the request's ``tenant_hospital`` (falling back to
    ``user.hospital``). Users without a hospital get an empty leaderboard
    unless they are PX admins, who see all hospitals.
    """

    serializer_class = AppreciationLeaderboardSerializer
    permission_classes = [IsAuthenticated]

    def _requested_period(self):
        """Return the ``(year, month)`` to report on.

        Both ``year`` and ``month`` query parameters must be supplied to
        select a period; if either is missing the current year and month
        are used.

        Raises:
            rest_framework.exceptions.ValidationError: a supplied value is
                not an integer. DRF renders this as HTTP 400, where a bare
                ``int()`` call would surface as an unhandled ``ValueError``.
        """
        from rest_framework.exceptions import ValidationError

        params = self.request.query_params
        year = params.get("year")
        month = params.get("month")

        if not year or not month:
            now = timezone.now()
            return now.year, now.month

        try:
            return int(year), int(month)
        except ValueError:
            raise ValidationError({"detail": "year and month must be valid integers."}) from None

    def get_queryset(self):
        """Build the leaderboard as a list of plain dicts, one per stats row."""
        year, month = self._requested_period()

        user = self.request.user
        hospital = getattr(self.request, "tenant_hospital", None) or user.hospital
        if not hospital and not user.is_px_admin():
            return []

        stats = AppreciationStats.objects.filter(year=year, month=month, received_count__gt=0)
        if hospital:
            stats = stats.filter(hospital=hospital)
        # Join the relations read for every row below so they do not each
        # cost an extra query per leaderboard entry.
        stats = stats.select_related("hospital", "department", "recipient_content_type").order_by("-received_count")

        leaderboard = []
        for rank, stat in enumerate(stats, start=1):
            content_type = stat.recipient_content_type

            earned_badges = UserBadge.objects.filter(
                recipient_content_type=content_type, recipient_object_id=stat.recipient_object_id
            ).select_related("badge")
            badges = [
                {
                    "name": earned.badge.name_en,
                    "icon": earned.badge.icon,
                    "color": earned.badge.color,
                }
                for earned in earned_badges
            ]

            leaderboard.append(
                {
                    "rank": rank,
                    "recipient_type": content_type.model if content_type else "unknown",
                    "recipient_id": stat.recipient_object_id,
                    "recipient_name": stat.get_recipient_name(),
                    "hospital": stat.hospital.name,
                    "department": stat.department.name if stat.department else None,
                    "received_count": stat.received_count,
                    "badges": badges,
                }
            )

        return leaderboard
|