HH/apps/px_sources/views.py
2026-04-08 17:13:35 +03:00

176 lines
5.7 KiB
Python

"""
PX Sources REST API views and viewsets
"""
from rest_framework import status, viewsets
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from apps.core.services import AuditService
from .models import PXSource
from .serializers import (
PXSourceChoiceSerializer,
PXSourceDetailSerializer,
PXSourceListSerializer,
PXSourceSerializer,
)
from .permissions import IsAdminUser, IsSourceUserOrAdmin, IsSourceUserOwnData
class PXSourceViewSet(viewsets.ModelViewSet):
"""
ViewSet for PX Sources with full CRUD operations.
Permissions:
- PX Admins: Full access to all sources
- Hospital Admins: Can view and manage sources
- Source Users: Read-only access to their own source
- Other users: No access
"""
queryset = PXSource.objects.all()
permission_classes = [IsAuthenticated, IsAdminUser]
filterset_fields = ["is_active"]
search_fields = ["name_en", "name_ar", "description"]
ordering_fields = ["name_en", "created_at"]
ordering = ["name_en"]
def get_serializer_class(self):
"""Use different serializers based on action"""
if self.action == "list":
return PXSourceListSerializer
elif self.action == "retrieve":
return PXSourceDetailSerializer
elif self.action == "choices":
return PXSourceChoiceSerializer
return PXSourceSerializer
def get_queryset(self):
"""Filter sources based on user role"""
queryset = super().get_queryset()
user = self.request.user
# PX Admins see all sources
if user.is_px_admin():
return queryset
# Hospital Admins see all sources
if user.is_hospital_admin():
return queryset
# Source users see only their source (handled by IsSourceUserOrAdmin)
if hasattr(user, "source_user_profile"):
source_user = user.source_user_profile
if source_user.is_active:
return queryset.filter(id=source_user.source.id)
# Default: active sources only
return queryset.filter(is_active=True)
def perform_create(self, serializer):
"""Log source creation"""
source = serializer.save()
AuditService.log_from_request(
event_type="px_source_created",
description=f"PX Source created: {source.name_en}",
request=self.request,
content_object=source,
)
def perform_update(self, serializer):
"""Log source update"""
source = serializer.save()
AuditService.log_from_request(
event_type="px_source_updated",
description=f"PX Source updated: {source.name_en}",
request=self.request,
content_object=source,
)
def perform_destroy(self, instance):
"""Log source deletion"""
source_name = instance.name_en
instance.delete()
AuditService.log_from_request(
event_type="px_source_deleted", description=f"PX Source deleted: {source_name}", request=self.request
)
@action(detail=False, methods=["get"])
def choices(self, request):
"""
Get source choices for dropdowns.
"""
queryset = PXSource.get_active_sources()
serializer = PXSourceChoiceSerializer(queryset, many=True, context={"request": request})
return Response(serializer.data)
@action(detail=True, methods=["post"])
def activate(self, request, pk=None):
"""Activate a source"""
source = self.get_object()
source.activate()
AuditService.log_from_request(
event_type="px_source_activated",
description=f"PX Source activated: {source.name_en}",
request=self.request,
content_object=source,
)
return Response({"message": "Source activated successfully", "is_active": True})
@action(detail=True, methods=["post"])
def deactivate(self, request, pk=None):
"""Deactivate a source"""
source = self.get_object()
source.deactivate()
AuditService.log_from_request(
event_type="px_source_deactivated",
description=f"PX Source deactivated: {source.name_en}",
request=self.request,
content_object=source,
)
return Response({"message": "Source deactivated successfully", "is_active": False})
@action(detail=True, methods=["get"])
def usage(self, request, pk=None):
"""Get usage statistics for a source"""
source = self.get_object()
usage_records = source.usage_records.all().select_related("content_type", "hospital", "user")
# Group by content type
usage_by_type = {}
for record in usage_records:
content_type = record.content_type.model
if content_type not in usage_by_type:
usage_by_type[content_type] = 0
usage_by_type[content_type] += 1
return Response(
{
"source_id": str(source.id),
"source_name": source.name_en,
"total_usage": usage_records.count(),
"usage_by_type": usage_by_type,
"recent_usage": [
{
"content_type": r.content_type.model,
"object_id": str(r.object_id),
"hospital": r.hospital.name if r.hospital else None,
"user": r.user.get_full_name() if r.user else None,
"created_at": r.created_at,
}
for r in usage_records[:10]
],
}
)