2026-01-01 16:44:42 +03:00

531 lines
19 KiB
Python

"""
Appreciation views - API views for appreciation management
"""
from django.contrib.contenttypes.models import ContentType
from django.db.models import Count, Q, F
from django.utils import timezone
from rest_framework import generics, status, viewsets
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from apps.appreciation.models import (
Appreciation,
AppreciationBadge,
AppreciationCategory,
AppreciationStats,
UserBadge,
)
from apps.appreciation.serializers import (
AppreciationBadgeSerializer,
AppreciationCategorySerializer,
AppreciationCreateSerializer,
AppreciationLeaderboardSerializer,
AppreciationSerializer,
AppreciationStatsSerializer,
AppreciationSummarySerializer,
UserBadgeSerializer,
)
from apps.accounts.permissions import IsPXAdminOrHospitalAdmin
class AppreciationCategoryViewSet(viewsets.ModelViewSet):
"""Viewset for AppreciationCategory"""
queryset = AppreciationCategory.objects.all()
serializer_class = AppreciationCategorySerializer
permission_classes = [IsAuthenticated]
def get_queryset(self):
"""Filter categories by hospital"""
queryset = super().get_queryset()
# Filter by hospital if provided
hospital_id = self.request.query_params.get('hospital_id')
if hospital_id:
queryset = queryset.filter(Q(hospital_id=hospital_id) | Q(hospital__isnull=True))
# Only show active categories
queryset = queryset.filter(is_active=True)
return queryset.select_related('hospital')
class AppreciationViewSet(viewsets.ModelViewSet):
"""Viewset for Appreciation"""
queryset = Appreciation.objects.all()
serializer_class = AppreciationSerializer
permission_classes = [IsAuthenticated]
def get_queryset(self):
"""Filter appreciations based on user's access"""
user = self.request.user
queryset = super().get_queryset()
# Filter by hospital
if user.hospital:
queryset = queryset.filter(hospital=user.hospital)
# Filter by department if user is department manager
if user.department and user.is_department_manager():
queryset = queryset.filter(
Q(department=user.department) | Q(department__isnull=True)
)
# Filter by visibility
# Users can see:
# - All appreciations they sent
# - All appreciations they received
# - Department-level appreciations if they're in the department
# - Hospital-level appreciations if they're in the hospital
# - Public appreciations
from apps.appreciation.models import AppreciationVisibility
# Get user's content type
user_content_type = ContentType.objects.get_for_model(user)
# Get physician if user has a physician profile
physician = None
if hasattr(user, 'physician_profile'):
physician = user.phician_profile
physician_content_type = ContentType.objects.get_for_model(type(physician))
# Build visibility filter
visibility_filter = (
Q(sender=user) | # Sent by user
Q(
recipient_content_type=user_content_type,
recipient_object_id=user.id
) # Received by user
)
if physician:
visibility_filter |= Q(
recipient_content_type=physician_content_type,
recipient_object_id=physician.id
) # Received by physician
if user.department:
visibility_filter |= Q(
visibility=AppreciationVisibility.DEPARTMENT,
department=user.department
)
if user.hospital:
visibility_filter |= Q(
visibility=AppreciationVisibility.HOSPITAL,
hospital=user.hospital
)
visibility_filter |= Q(visibility=AppreciationVisibility.PUBLIC)
queryset = queryset.filter(visibility_filter)
# Filter by recipient
recipient_type = self.request.query_params.get('recipient_type')
recipient_id = self.request.query_params.get('recipient_id')
if recipient_type and recipient_id:
if recipient_type == 'user':
content_type = ContentType.objects.get_for_model(
self.request.user.__class__
)
queryset = queryset.filter(
recipient_content_type=content_type,
recipient_object_id=recipient_id
)
elif recipient_type == 'physician':
from apps.organizations.models import Physician
content_type = ContentType.objects.get_for_model(Physician)
queryset = queryset.filter(
recipient_content_type=content_type,
recipient_object_id=recipient_id
)
# Filter by status
status_filter = self.request.query_params.get('status')
if status_filter:
queryset = queryset.filter(status=status_filter)
# Filter by category
category_id = self.request.query_params.get('category_id')
if category_id:
queryset = queryset.filter(category_id=category_id)
return queryset.select_related(
'sender', 'hospital', 'department', 'category'
).prefetch_related('recipient')
def create(self, request, *args, **kwargs):
"""Create a new appreciation"""
serializer = AppreciationCreateSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
# Get validated data
data = serializer.validated_data
# Get recipient
recipient_type = data['recipient_type']
recipient_id = data['recipient_id']
if recipient_type == 'user':
from apps.accounts.models import User
recipient = User.objects.get(id=recipient_id)
content_type = ContentType.objects.get_for_model(User)
else: # physician
from apps.organizations.models import Physician
recipient = Physician.objects.get(id=recipient_id)
content_type = ContentType.objects.get_for_model(Physician)
# Get hospital
from apps.organizations.models import Hospital
hospital = Hospital.objects.get(id=data['hospital_id'])
# Get department
department = None
if data.get('department_id'):
from apps.organizations.models import Department
department = Department.objects.get(id=data['department_id'])
# Get category
category = None
if data.get('category_id'):
category = AppreciationCategory.objects.get(id=data['category_id'])
# Create appreciation
appreciation = Appreciation.objects.create(
sender=request.user,
recipient_content_type=content_type,
recipient_object_id=recipient_id,
hospital=hospital,
department=department,
category=category,
message_en=data['message_en'],
message_ar=data.get('message_ar', ''),
visibility=data['visibility'],
is_anonymous=data['is_anonymous'],
)
# Send appreciation
appreciation.send()
# Serialize and return
serializer = AppreciationSerializer(appreciation)
return Response(serializer.data, status=status.HTTP_201_CREATED)
@action(detail=True, methods=['post'])
def acknowledge(self, request, pk=None):
"""Acknowledge an appreciation"""
appreciation = self.get_object()
# Check if user is the recipient
user_content_type = ContentType.objects.get_for_model(request.user)
if not (
appreciation.recipient_content_type == user_content_type and
appreciation.recipient_object_id == request.user.id
):
return Response(
{'error': 'You can only acknowledge appreciations sent to you'},
status=status.HTTP_403_FORBIDDEN
)
# Acknowledge
appreciation.acknowledge()
# Serialize and return
serializer = AppreciationSerializer(appreciation)
return Response(serializer.data)
@action(detail=False, methods=['get'])
def my_appreciations(self, request):
"""Get appreciations for the current user"""
# Get user's appreciations
user_content_type = ContentType.objects.get_for_model(request.user)
# Check if user has physician profile
physician = None
if hasattr(request.user, 'physician_profile'):
physician = request.user.physician_profile
# Build query
queryset = self.get_queryset().filter(
Q(
recipient_content_type=user_content_type,
recipient_object_id=request.user.id
)
)
if physician:
physician_content_type = ContentType.objects.get_for_model(type(physician))
queryset |= self.get_queryset().filter(
recipient_content_type=physician_content_type,
recipient_object_id=physician.id
)
# Paginate
page = self.paginate_queryset(queryset)
if page is not None:
serializer = AppreciationSerializer(page, many=True)
return self.get_paginated_response(serializer.data)
serializer = AppreciationSerializer(queryset, many=True)
return Response(serializer.data)
@action(detail=False, methods=['get'])
def sent_by_me(self, request):
"""Get appreciations sent by the current user"""
queryset = self.get_queryset().filter(sender=request.user)
# Paginate
page = self.paginate_queryset(queryset)
if page is not None:
serializer = AppreciationSerializer(page, many=True)
return self.get_paginated_response(serializer.data)
serializer = AppreciationSerializer(queryset, many=True)
return Response(serializer.data)
@action(detail=False, methods=['get'])
def summary(self, request):
"""Get appreciation summary for the current user"""
# Get user's content type
user_content_type = ContentType.objects.get_for_model(request.user)
# Get current year and month
now = timezone.now()
current_year = now.year
current_month = now.month
# Count total received
total_received = Appreciation.objects.filter(
recipient_content_type=user_content_type,
recipient_object_id=request.user.id
).count()
# Count total sent
total_sent = Appreciation.objects.filter(
sender=request.user
).count()
# Count this month received
this_month_received = Appreciation.objects.filter(
recipient_content_type=user_content_type,
recipient_object_id=request.user.id,
sent_at__year=current_year,
sent_at__month=current_month
).count()
# Count this month sent
this_month_sent = Appreciation.objects.filter(
sender=request.user,
sent_at__year=current_year,
sent_at__month=current_month
).count()
# Get badges earned
badges_earned = UserBadge.objects.filter(
recipient_content_type=user_content_type,
recipient_object_id=request.user.id
).count()
# Get hospital rank
hospital_rank = None
if request.user.hospital:
# Get stats for this month
stats = AppreciationStats.objects.filter(
recipient_content_type=user_content_type,
recipient_object_id=request.user.id,
year=current_year,
month=current_month
).first()
if stats:
hospital_rank = stats.hospital_rank
# Get top category
top_category = None
if total_received > 0:
top_category_obj = Appreciation.objects.filter(
recipient_content_type=user_content_type,
recipient_object_id=request.user.id
).values('category__name_en', 'category__icon', 'category__color').annotate(
count=Count('id')
).order_by('-count').first()
if top_category_obj and top_category_obj['category__name_en']:
top_category = {
'name': top_category_obj['category__name_en'],
'icon': top_category_obj['category__icon'],
'color': top_category_obj['category__color'],
'count': top_category_obj['count']
}
# Build response
summary = {
'total_received': total_received,
'total_sent': total_sent,
'this_month_received': this_month_received,
'this_month_sent': this_month_sent,
'top_category': top_category,
'badges_earned': badges_earned,
'hospital_rank': hospital_rank,
}
serializer = AppreciationSummarySerializer(summary)
return Response(serializer.data)
class AppreciationStatsViewSet(viewsets.ReadOnlyModelViewSet):
"""Viewset for AppreciationStats"""
queryset = AppreciationStats.objects.all()
serializer_class = AppreciationStatsSerializer
permission_classes = [IsAuthenticated]
def get_queryset(self):
"""Filter stats based on user's access"""
user = self.request.user
queryset = super().get_queryset()
# Filter by hospital
if user.hospital:
queryset = queryset.filter(hospital=user.hospital)
# Filter by year and month
year = self.request.query_params.get('year')
if year:
queryset = queryset.filter(year=int(year))
month = self.request.query_params.get('month')
if month:
queryset = queryset.filter(month=int(month))
return queryset.select_related('hospital', 'department')
class AppreciationBadgeViewSet(viewsets.ModelViewSet):
"""Viewset for AppreciationBadge"""
queryset = AppreciationBadge.objects.all()
serializer_class = AppreciationBadgeSerializer
permission_classes = [IsAuthenticated, IsPXAdminOrHospitalAdmin]
def get_queryset(self):
"""Filter badges by hospital"""
queryset = super().get_queryset()
# Filter by hospital if provided
hospital_id = self.request.query_params.get('hospital_id')
if hospital_id:
queryset = queryset.filter(Q(hospital_id=hospital_id) | Q(hospital__isnull=True))
# Only show active badges
queryset = queryset.filter(is_active=True)
return queryset.select_related('hospital')
class UserBadgeViewSet(viewsets.ReadOnlyModelViewSet):
"""Viewset for UserBadge"""
queryset = UserBadge.objects.all()
serializer_class = UserBadgeSerializer
permission_classes = [IsAuthenticated]
def get_queryset(self):
"""Filter badges based on user's access"""
user = self.request.user
queryset = super().get_queryset()
# Get user's content type
user_content_type = ContentType.objects.get_for_model(user)
# Filter by user or user's physician profile
physician = None
if hasattr(user, 'physician_profile'):
physician = user.physician_profile
physician_content_type = ContentType.objects.get_for_model(type(physician))
queryset = queryset.filter(
Q(
recipient_content_type=user_content_type,
recipient_object_id=user.id
)
)
if physician:
queryset |= queryset.filter(
recipient_content_type=physician_content_type,
recipient_object_id=physician.id
)
return queryset.select_related('badge')
class LeaderboardView(generics.ListAPIView):
"""View for appreciation leaderboard"""
serializer_class = AppreciationLeaderboardSerializer
permission_classes = [IsAuthenticated]
def get_queryset(self):
"""Build leaderboard"""
# Get filters
year = self.request.query_params.get('year')
month = self.request.query_params.get('month')
# Default to current month
if not year or not month:
now = timezone.now()
year = now.year
month = now.month
else:
year = int(year)
month = int(month)
# Get hospital from request user
user = self.request.user
if not user.hospital:
return []
# Get stats for the period
stats = AppreciationStats.objects.filter(
hospital=user.hospital,
year=year,
month=month,
received_count__gt=0
).order_by('-received_count')
# Build leaderboard
leaderboard = []
for rank, stat in enumerate(stats, start=1):
recipient_name = stat.get_recipient_name()
recipient_type = stat.recipient_content_type.model if stat.recipient_content_type else 'unknown'
# Get badges for recipient
badges = []
user_badges = UserBadge.objects.filter(
recipient_content_type=stat.recipient_content_type,
recipient_object_id=stat.recipient_object_id
).select_related('badge')
for user_badge in user_badges:
badges.append({
'name': user_badge.badge.name_en,
'icon': user_badge.badge.icon,
'color': user_badge.badge.color,
})
leaderboard.append({
'rank': rank,
'recipient_type': recipient_type,
'recipient_id': stat.recipient_object_id,
'recipient_name': recipient_name,
'hospital': stat.hospital.name,
'department': stat.department.name if stat.department else None,
'received_count': stat.received_count,
'badges': badges,
})
return leaderboard