175 lines
5.4 KiB
Python
175 lines
5.4 KiB
Python
"""
|
|
PX Sources REST API views and viewsets
|
|
"""
|
|
from collections import Counter

from rest_framework import status, viewsets
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response

from apps.core.services import AuditService

from .models import PXSource
from .serializers import (
    PXSourceChoiceSerializer,
    PXSourceDetailSerializer,
    PXSourceListSerializer,
    PXSourceSerializer,
)
|
|
|
|
|
|
class PXSourceViewSet(viewsets.ModelViewSet):
    """
    ViewSet for PX Sources with full CRUD operations.

    On top of the standard ModelViewSet routes it exposes four extra
    actions: ``choices`` (list route) and ``activate``, ``deactivate`` and
    ``usage`` (detail routes). Create, update, delete, activate and
    deactivate each write an audit entry through ``AuditService``.

    Intended permissions:
    - PX Admins: Full access to all sources
    - Hospital Admins: Can view and manage sources
    - Other users: Read-only access

    NOTE(review): the only permission class configured here is
    ``IsAuthenticated`` and the only role check is ``is_px_admin()`` in
    ``get_queryset``; nothing in this class makes other users read-only.
    Confirm the write restrictions are enforced elsewhere.
    """

    queryset = PXSource.objects.all()
    permission_classes = [IsAuthenticated]
    filterset_fields = ['is_active']
    search_fields = ['name_en', 'name_ar', 'description']
    ordering_fields = ['name_en', 'created_at']
    ordering = ['name_en']

    # Serializer per action; any action not listed here (create, update,
    # partial_update, destroy, activate, deactivate, usage) falls back to
    # PXSourceSerializer.
    _serializer_by_action = {
        'list': PXSourceListSerializer,
        'retrieve': PXSourceDetailSerializer,
        'choices': PXSourceChoiceSerializer,
    }

    def get_serializer_class(self):
        """Return the serializer class matching the current action."""
        return self._serializer_by_action.get(self.action, PXSourceSerializer)

    def get_queryset(self):
        """
        Restrict the visible sources by user role.

        PX admins get the unfiltered queryset; every other authenticated
        user only gets sources with ``is_active=True``. Detail routes
        resolve their object through this queryset, so an inactive source
        is a 404 for anyone who is not a PX admin.
        """
        queryset = super().get_queryset()
        if self.request.user.is_px_admin():
            return queryset
        return queryset.filter(is_active=True)

    def _audit(self, event_type, description, **extra):
        """Write one audit entry for the current request.

        ``extra`` is forwarded untouched so callers can attach
        ``content_object`` when the instance still exists.
        """
        AuditService.log_from_request(
            event_type=event_type,
            description=description,
            request=self.request,
            **extra,
        )

    def perform_create(self, serializer):
        """Save the new source and log its creation."""
        source = serializer.save()
        self._audit(
            'px_source_created',
            f"PX Source created: {source.name_en}",
            content_object=source,
        )

    def perform_update(self, serializer):
        """Save the changes and log the update."""
        source = serializer.save()
        self._audit(
            'px_source_updated',
            f"PX Source updated: {source.name_en}",
            content_object=source,
        )

    def perform_destroy(self, instance):
        """Delete the source and log the deletion."""
        # Capture the name first: the entry is written after the delete
        # and no content_object is attached to it.
        source_name = instance.name_en
        instance.delete()
        self._audit(
            'px_source_deleted',
            f"PX Source deleted: {source_name}",
        )

    @action(detail=False, methods=['get'])
    def choices(self, request):
        """
        Get source choices for dropdowns.

        Serializes ``PXSource.get_active_sources()`` rather than this
        view's own queryset, so the role filter of ``get_queryset`` is not
        applied here.
        """
        # get_serializer() resolves PXSourceChoiceSerializer through
        # get_serializer_class() and supplies the serializer context
        # (including the request).
        serializer = self.get_serializer(
            PXSource.get_active_sources(),
            many=True,
        )
        return Response(serializer.data)

    @action(detail=True, methods=['post'])
    def activate(self, request, pk=None):
        """Activate a source and log the change."""
        source = self.get_object()
        source.activate()
        self._audit(
            'px_source_activated',
            f"PX Source activated: {source.name_en}",
            content_object=source,
        )
        return Response({
            'message': 'Source activated successfully',
            'is_active': True
        })

    @action(detail=True, methods=['post'])
    def deactivate(self, request, pk=None):
        """Deactivate a source and log the change."""
        source = self.get_object()
        source.deactivate()
        self._audit(
            'px_source_deactivated',
            f"PX Source deactivated: {source.name_en}",
            content_object=source,
        )
        return Response({
            'message': 'Source deactivated successfully',
            'is_active': False
        })

    @action(detail=True, methods=['get'])
    def usage(self, request, pk=None):
        """
        Get usage statistics for a source.

        Returns the source id and name, the total number of usage records,
        a per-content-type count, and details of the first 10 usage
        records in the relation's default order.
        """
        source = self.get_object()

        # Tally by content type from a single column instead of loading
        # every usage record (with its joins) just to count it.
        usage_by_type = Counter(
            source.usage_records.values_list('content_type__model', flat=True)
        )

        # Only the rows that are actually serialized are fetched in full.
        recent_records = source.usage_records.select_related(
            'content_type', 'hospital', 'user'
        )[:10]

        return Response({
            'source_id': str(source.id),
            'source_name': source.name_en,
            'total_usage': sum(usage_by_type.values()),
            'usage_by_type': dict(usage_by_type),
            'recent_usage': [
                {
                    'content_type': record.content_type.model,
                    'object_id': str(record.object_id),
                    'hospital': record.hospital.name_en if record.hospital else None,
                    'user': record.user.get_full_name() if record.user else None,
                    'created_at': record.created_at,
                }
                for record in recent_records
            ]
        })
|