HH/apps/accounts/views.py
2025-12-24 12:42:31 +03:00

227 lines
7.9 KiB
Python

"""
Accounts views and viewsets
"""
from django.contrib.auth import get_user_model
from rest_framework import status, viewsets
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework_simplejwt.views import TokenObtainPairView
from apps.core.services import AuditService
from .models import Role
from .permissions import IsPXAdmin, IsPXAdminOrReadOnly, IsOwnerOrPXAdmin
from .serializers import (
ChangePasswordSerializer,
RoleSerializer,
UserCreateSerializer,
UserSerializer,
UserUpdateSerializer,
)
# Resolve the project's active user model once at import time, rather than
# importing a concrete User class, so these views follow AUTH_USER_MODEL.
User = get_user_model()
class CustomTokenObtainPairView(TokenObtainPairView):
    """
    JWT token-obtain view that also records successful logins.

    Token issuing is delegated unchanged to ``TokenObtainPairView``; this
    subclass only adds a ``user_login`` audit entry on success.
    """

    def post(self, request, *args, **kwargs):
        """
        Issue a token pair and audit the login when it succeeds.

        Args:
            request: The incoming DRF request carrying the credentials.
            *args, **kwargs: Passed through to the parent ``post``.

        Returns:
            The parent view's response, unmodified in every case.
        """
        response = super().post(request, *args, **kwargs)
        if response.status_code != status.HTTP_200_OK:
            return response

        # The credential is submitted under the user model's USERNAME_FIELD
        # (which may be e.g. ``email`` rather than ``username``), so read and
        # look up by that field instead of a hard-coded 'username' key.
        login_field = User.USERNAME_FIELD
        identifier = request.data.get(login_field)
        try:
            user = User.objects.get(**{login_field: identifier})
        except User.DoesNotExist:
            # Auditing is best-effort: never fail a login that already
            # succeeded just because the user row could not be resolved.
            return response

        AuditService.log_from_request(
            event_type='user_login',
            description=f"User {user.email} logged in",
            request=request,
            content_object=user
        )
        return response
class UserViewSet(viewsets.ModelViewSet):
    """
    ViewSet for User model.

    Permissions:
    - List/Retrieve: Authenticated users
    - Create/Delete: PX Admins only
    - Update: PX Admins, or users updating their own profile
    - Extra actions: the ``permission_classes`` declared on each ``@action``
      (``assign_role`` / ``remove_role`` are PX Admin only)
    """
    queryset = User.objects.all()
    permission_classes = [IsAuthenticated]
    filterset_fields = ['is_active', 'hospital', 'department', 'groups']
    search_fields = ['username', 'email', 'first_name', 'last_name', 'employee_id']
    ordering_fields = ['date_joined', 'email', 'last_name']
    ordering = ['-date_joined']

    def get_serializer_class(self):
        """Return the serializer matching the current action."""
        if self.action == 'create':
            return UserCreateSerializer
        if self.action in ['update', 'partial_update']:
            return UserUpdateSerializer
        return UserSerializer

    def get_permissions(self):
        """
        Return the permission instances for the current action.

        The standard write actions get explicit admin/owner checks. Every
        other action defers to the DRF default, which instantiates
        ``self.permission_classes``.
        """
        if self.action in ['create', 'destroy']:
            return [IsPXAdmin()]
        if self.action in ['update', 'partial_update']:
            return [IsOwnerOrPXAdmin()]
        # Do NOT hard-code IsAuthenticated here: for extra actions the router
        # overrides ``self.permission_classes`` with the value given to
        # ``@action(permission_classes=...)``. Returning a fixed list would
        # silently drop the IsPXAdmin guard on assign_role / remove_role.
        return super().get_permissions()

    def get_queryset(self):
        """
        Restrict the visible users according to the requester's role.

        PX Admins see everyone; hospital admins see their hospital's users;
        department managers see their department's users; anyone else sees
        only their own record.
        """
        # Join hospital/department once, for every branch.
        queryset = super().get_queryset().select_related('hospital', 'department')
        user = self.request.user

        if user.is_px_admin():
            return queryset
        if user.is_hospital_admin() and user.hospital:
            return queryset.filter(hospital=user.hospital)
        if user.is_department_manager() and user.department:
            return queryset.filter(department=user.department)
        return queryset.filter(id=user.id)

    def perform_create(self, serializer):
        """Save the new user and write an audit entry for the creation."""
        user = serializer.save()
        AuditService.log_from_request(
            event_type='other',
            description=f"User {user.email} created",
            request=self.request,
            content_object=user
        )

    def perform_update(self, serializer):
        """Save the changes and write an audit entry for the update."""
        user = serializer.save()
        AuditService.log_from_request(
            event_type='other',
            description=f"User {user.email} updated",
            request=self.request,
            content_object=user
        )

    @action(detail=False, methods=['get'], permission_classes=[IsAuthenticated])
    def me(self, request):
        """Return the serialized profile of the requesting user."""
        serializer = self.get_serializer(request.user)
        return Response(serializer.data)

    @action(detail=False, methods=['put'], permission_classes=[IsAuthenticated])
    def update_profile(self, request):
        """
        Update the requesting user's own profile.

        Although routed on PUT, the update is applied as a partial update,
        so omitted fields are left untouched. Returns the refreshed profile
        rendered with ``UserSerializer``.
        """
        serializer = UserUpdateSerializer(request.user, data=request.data, partial=True)
        serializer.is_valid(raise_exception=True)
        serializer.save()
        AuditService.log_from_request(
            event_type='other',
            description=f"User {request.user.email} updated their profile",
            request=request,
            content_object=request.user
        )
        return Response(UserSerializer(request.user).data)

    @action(detail=False, methods=['post'], permission_classes=[IsAuthenticated])
    def change_password(self, request):
        """
        Change the requesting user's password.

        Validation is delegated to ``ChangePasswordSerializer``; only its
        ``new_password`` value is used here.
        """
        serializer = ChangePasswordSerializer(data=request.data, context={'request': request})
        serializer.is_valid(raise_exception=True)

        request.user.set_password(serializer.validated_data['new_password'])
        request.user.save()
        AuditService.log_from_request(
            event_type='other',
            description=f"User {request.user.email} changed their password",
            request=request,
            content_object=request.user
        )
        return Response({'message': 'Password changed successfully'}, status=status.HTTP_200_OK)

    @staticmethod
    def _find_role(role_id):
        """
        Return the Role whose primary key is ``role_id``, or ``None``.

        A missing or malformed id (wrong type, non-numeric string, invalid
        UUID) is treated the same as an unknown role, so callers can answer
        with a 404 instead of letting the ORM error surface as a 500.
        """
        # Local import keeps this block self-contained.
        from django.core.exceptions import ValidationError

        try:
            return Role.objects.get(id=role_id)
        except (Role.DoesNotExist, ValueError, TypeError, ValidationError):
            return None

    @action(detail=True, methods=['post'], permission_classes=[IsPXAdmin])
    def assign_role(self, request, pk=None):
        """
        Assign a role to the target user (PX Admin only).

        Expects ``role_id`` in the request body. Responds 404 when the role
        cannot be found; otherwise adds the role's group to the user and
        writes a ``role_change`` audit entry.
        """
        user = self.get_object()
        role = self._find_role(request.data.get('role_id'))
        if role is None:
            return Response({'error': 'Role not found'}, status=status.HTTP_404_NOT_FOUND)

        user.groups.add(role.group)
        AuditService.log_from_request(
            event_type='role_change',
            description=f"Role {role.display_name} assigned to user {user.email}",
            request=request,
            content_object=user,
            metadata={'role': role.name}
        )
        return Response({'message': f'Role {role.display_name} assigned successfully'})

    @action(detail=True, methods=['post'], permission_classes=[IsPXAdmin])
    def remove_role(self, request, pk=None):
        """
        Remove a role from the target user (PX Admin only).

        Expects ``role_id`` in the request body. Responds 404 when the role
        cannot be found; otherwise removes the role's group from the user and
        writes a ``role_change`` audit entry.
        """
        user = self.get_object()
        role = self._find_role(request.data.get('role_id'))
        if role is None:
            return Response({'error': 'Role not found'}, status=status.HTTP_404_NOT_FOUND)

        user.groups.remove(role.group)
        AuditService.log_from_request(
            event_type='role_change',
            description=f"Role {role.display_name} removed from user {user.email}",
            request=request,
            content_object=user,
            metadata={'role': role.name}
        )
        return Response({'message': f'Role {role.display_name} removed successfully'})
class RoleViewSet(viewsets.ModelViewSet):
    """
    CRUD endpoints for Role objects.

    Access is governed by ``IsPXAdminOrReadOnly``. The intended policy is:
    - List/Retrieve: Authenticated users
    - Create/Update/Delete: PX Admins only
    """
    serializer_class = RoleSerializer
    permission_classes = [IsPXAdminOrReadOnly]
    queryset = Role.objects.all()

    # Filtering, searching and ordering configuration.
    filterset_fields = ['name', 'level']
    search_fields = ['name', 'display_name', 'description']
    ordering_fields = ['level', 'name']
    ordering = ['-level', 'name']

    def get_queryset(self):
        """Return the base role queryset with each role's group joined in."""
        roles = super().get_queryset()
        return roles.select_related('group')