HH/apps/px_sources/views.py
2026-01-15 14:31:58 +03:00

175 lines
5.4 KiB
Python

"""
PX Sources REST API views and viewsets
"""
from collections import Counter

from rest_framework import status, viewsets
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response

from apps.core.services import AuditService

from .models import PXSource
from .serializers import (
    PXSourceChoiceSerializer,
    PXSourceDetailSerializer,
    PXSourceListSerializer,
    PXSourceSerializer,
)
class PXSourceViewSet(viewsets.ModelViewSet):
    """
    ViewSet exposing CRUD plus a few custom actions for PX Sources.

    Access control implemented in this class:
    - Every endpoint requires an authenticated user (``IsAuthenticated``).
    - Users for whom ``user.is_px_admin()`` is true see all sources.
    - Every other authenticated user only sees sources with
      ``is_active=True``.

    NOTE(review): the intended policy (PX Admins: full access; Hospital
    Admins: view and manage; other users: read-only) is not enforced by
    anything in this class -- write actions are open to any authenticated
    user who can see the object. Confirm it is enforced elsewhere or add a
    dedicated permission class.

    Custom actions: ``choices``, ``activate``, ``deactivate``, ``usage``.
    """

    queryset = PXSource.objects.all()
    permission_classes = [IsAuthenticated]
    filterset_fields = ['is_active']
    search_fields = ['name_en', 'name_ar', 'description']
    ordering_fields = ['name_en', 'created_at']
    ordering = ['name_en']

    def _log_audit_event(self, event_type, description, content_object=None):
        """
        Record an audit entry for the current request.

        ``content_object`` is only forwarded when one is supplied, so a call
        without it reaches ``AuditService.log_from_request`` with exactly
        the keyword arguments ``event_type``, ``description`` and
        ``request``.
        """
        extra = {}
        if content_object is not None:
            extra['content_object'] = content_object
        AuditService.log_from_request(
            event_type=event_type,
            description=description,
            request=self.request,
            **extra,
        )

    def get_serializer_class(self):
        """Return the serializer matching the current action.

        ``list``, ``retrieve`` and ``choices`` have dedicated serializers;
        every other action (including ``None``) uses ``PXSourceSerializer``.
        """
        serializer_by_action = {
            'list': PXSourceListSerializer,
            'retrieve': PXSourceDetailSerializer,
            'choices': PXSourceChoiceSerializer,
        }
        return serializer_by_action.get(self.action, PXSourceSerializer)

    def get_queryset(self):
        """Return all sources for PX admins, active sources for others."""
        queryset = super().get_queryset()
        if self.request.user.is_px_admin():
            return queryset
        # Non-PX-admin users never see inactive sources; this also means
        # detail routes answer 404 for inactive sources for those users.
        return queryset.filter(is_active=True)

    def perform_create(self, serializer):
        """Save the new source and write a ``px_source_created`` audit entry."""
        source = serializer.save()
        self._log_audit_event(
            'px_source_created',
            f"PX Source created: {source.name_en}",
            content_object=source,
        )

    def perform_update(self, serializer):
        """Save the changes and write a ``px_source_updated`` audit entry."""
        source = serializer.save()
        self._log_audit_event(
            'px_source_updated',
            f"PX Source updated: {source.name_en}",
            content_object=source,
        )

    def perform_destroy(self, instance):
        """Delete the source and write a ``px_source_deleted`` audit entry."""
        # Capture the name first: the audit entry is written after the row
        # is gone, so no content_object is attached to it.
        source_name = instance.name_en
        instance.delete()
        self._log_audit_event(
            'px_source_deleted',
            f"PX Source deleted: {source_name}",
        )

    @action(detail=False, methods=['get'])
    def choices(self, request):
        """
        Get source choices for dropdowns.

        Serializes ``PXSource.get_active_sources()``; the role filtering in
        ``get_queryset`` is not applied here.
        """
        # get_serializer() resolves to PXSourceChoiceSerializer through
        # get_serializer_class() and supplies the standard serializer
        # context (request, format, view).
        serializer = self.get_serializer(
            PXSource.get_active_sources(),
            many=True,
        )
        return Response(serializer.data)

    @action(detail=True, methods=['post'])
    def activate(self, request, pk=None):
        """Activate a source and write a ``px_source_activated`` audit entry."""
        source = self.get_object()
        source.activate()
        self._log_audit_event(
            'px_source_activated',
            f"PX Source activated: {source.name_en}",
            content_object=source,
        )
        return Response({
            'message': 'Source activated successfully',
            'is_active': True,
        })

    @action(detail=True, methods=['post'])
    def deactivate(self, request, pk=None):
        """Deactivate a source and write a ``px_source_deactivated`` audit entry."""
        source = self.get_object()
        source.deactivate()
        self._log_audit_event(
            'px_source_deactivated',
            f"PX Source deactivated: {source.name_en}",
            content_object=source,
        )
        return Response({
            'message': 'Source deactivated successfully',
            'is_active': False,
        })

    @action(detail=True, methods=['get'])
    def usage(self, request, pk=None):
        """
        Get usage statistics for a source.

        The response contains the source id and English name, the total
        number of usage records, a per-content-type count, and the first
        ten usage records.
        """
        source = self.get_object()
        # Evaluate the queryset once; the list is reused for the total, the
        # per-type counts and the preview below.
        records = list(
            source.usage_records.all().select_related(
                'content_type', 'hospital', 'user'
            )
        )
        # Plain dict keyed by content-type model name, in first-seen order.
        usage_by_type = dict(
            Counter(record.content_type.model for record in records)
        )
        # NOTE(review): no explicit ordering is applied, so "recent" relies
        # on the usage model's default ordering -- confirm it is newest
        # first.
        recent_usage = [
            {
                'content_type': record.content_type.model,
                'object_id': str(record.object_id),
                'hospital': record.hospital.name_en if record.hospital else None,
                'user': record.user.get_full_name() if record.user else None,
                'created_at': record.created_at,
            }
            for record in records[:10]
        ]
        return Response({
            'source_id': str(source.id),
            'source_name': source.name_en,
            'total_usage': len(records),
            'usage_by_type': usage_by_type,
            'recent_usage': recent_usage,
        })