HH/apps/accounts/views.py
2026-01-15 15:02:42 +03:00

673 lines
22 KiB
Python

"""
Accounts views and viewsets
"""
from django.contrib import messages
from django.contrib.auth import get_user_model
from django.contrib.auth.decorators import login_required
from django.shortcuts import render, redirect
from django.utils.translation import gettext as _
from rest_framework import status, viewsets
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework_simplejwt.views import TokenObtainPairView
from apps.core.services import AuditService
from .models import Role
from .models import AcknowledgementChecklistItem, AcknowledgementContent, UserAcknowledgement
from .permissions import (
CanManageAcknowledgementContent,
CanManageOnboarding,
CanViewOnboarding,
IsOnboardingOwnerOrAdmin,
IsProvisionalUser,
IsPXAdmin,
IsPXAdminOrReadOnly,
IsOwnerOrPXAdmin,
)
from .serializers import (
AccountActivationSerializer,
AcknowledgementChecklistItemSerializer,
AcknowledgementContentSerializer,
AcknowledgeItemSerializer,
ChangePasswordSerializer,
ProvisionalUserSerializer,
ResendInvitationSerializer,
RoleSerializer,
UserAcknowledgementSerializer,
UserCreateSerializer,
UserSerializer,
UserUpdateSerializer,
WizardProgressSerializer,
)
User = get_user_model()
class CustomTokenObtainPairView(TokenObtainPairView):
    """
    JWT token view that audits successful logins and adds redirect info.

    After the parent view returns a 200 response, the user is looked up from
    the submitted credentials, a ``user_login`` audit event is recorded and a
    ``redirect_url`` key is added to the response payload.
    """

    def post(self, request, *args, **kwargs):
        """
        Issue the token pair, then log the login and attach ``redirect_url``.

        The parent's response is returned untouched when its status is not
        200 or when no user matches the submitted identifier.
        """
        response = super().post(request, *args, **kwargs)
        if response.status_code != 200:
            return response

        # SimpleJWT reads the login identifier from the field named by the
        # user model's USERNAME_FIELD, so look the user up under that same
        # key instead of assuming it is literally called 'username'.
        username_field = User.USERNAME_FIELD
        identifier = request.data.get(username_field)

        # Keep the try body down to the single call that can raise.
        try:
            user = User.objects.get(**{username_field: identifier})
        except User.DoesNotExist:
            return response

        AuditService.log_from_request(
            event_type='user_login',
            description=f"User {user.email} logged in",
            request=request,
            content_object=user
        )

        # Add redirect URL to response data
        response.data['redirect_url'] = self.get_redirect_url(user)
        return response

    def get_redirect_url(self, user):
        """
        Determine the post-login redirect URL for ``user``.

        Checks are applied in order: source users, PX admins, users without
        a hospital, then everyone else.
        """
        # Imported locally, as in the original code (presumably to avoid a
        # circular import between apps -- confirm before hoisting).
        from apps.px_sources.models import SourceUser

        # Check if user is a Source User first
        if SourceUser.objects.filter(user=user).exists():
            return '/px_sources/dashboard/'

        # PX Admins need to select a hospital first. The request (and thus
        # the session) is not available here, so the frontend is expected to
        # handle an already-selected hospital.
        if user.is_px_admin():
            return '/health/select-hospital/'

        # Users without hospital assignment get error page
        if not user.hospital:
            return '/health/no-hospital/'

        # All other users go to dashboard
        return '/'
class UserViewSet(viewsets.ModelViewSet):
    """
    ViewSet for User model.

    Permissions:
    - Create/Delete: ``IsPXAdmin``
    - Update/Partial update: ``IsOwnerOrPXAdmin``
    - Every other action (list, retrieve and ``@action`` endpoints): the
      ``permission_classes`` in effect for that action, i.e. the ones passed
      to ``@action`` or, failing that, the class-level ``IsAuthenticated``.
    """
    queryset = User.objects.all()
    permission_classes = [IsAuthenticated]
    filterset_fields = ['is_active', 'hospital', 'department', 'groups']
    search_fields = ['username', 'email', 'first_name', 'last_name', 'employee_id']
    ordering_fields = ['date_joined', 'email', 'last_name']
    ordering = ['-date_joined']

    def get_serializer_class(self):
        """Return appropriate serializer based on action"""
        if self.action == 'create':
            return UserCreateSerializer
        if self.action in ('update', 'partial_update'):
            return UserUpdateSerializer
        return UserSerializer

    def get_permissions(self):
        """
        Set permissions based on action.

        Falls back to ``super().get_permissions()`` so that the
        ``permission_classes`` supplied to ``@action`` are honoured. Returning
        a fixed ``[IsAuthenticated()]`` here would silently discard them and
        let any authenticated user call admin-only actions.
        """
        if self.action in ('create', 'destroy'):
            return [IsPXAdmin()]
        if self.action in ('update', 'partial_update'):
            return [IsOwnerOrPXAdmin()]
        return super().get_permissions()

    def get_queryset(self):
        """Filter queryset based on user role"""
        queryset = super().get_queryset()
        user = self.request.user

        # PX Admins see all users
        if user.is_px_admin():
            return queryset.select_related('hospital', 'department')

        # Hospital Admins see users in their hospital
        if user.is_hospital_admin() and user.hospital:
            return queryset.filter(hospital=user.hospital).select_related('hospital', 'department')

        # Department Managers see users in their department
        if user.is_department_manager() and user.department:
            return queryset.filter(department=user.department).select_related('hospital', 'department')

        # Others see only themselves
        return queryset.filter(id=user.id)

    def perform_create(self, serializer):
        """Save the new user and record an audit entry for the creation."""
        user = serializer.save()
        AuditService.log_from_request(
            event_type='other',
            description=f"User {user.email} created",
            request=self.request,
            content_object=user
        )

    def perform_update(self, serializer):
        """Save the user changes and record an audit entry for the update."""
        user = serializer.save()
        AuditService.log_from_request(
            event_type='other',
            description=f"User {user.email} updated",
            request=self.request,
            content_object=user
        )

    @action(detail=False, methods=['get'], permission_classes=[IsAuthenticated])
    def me(self, request):
        """Get current user profile"""
        serializer = self.get_serializer(request.user)
        return Response(serializer.data)

    @action(detail=False, methods=['put'], permission_classes=[IsAuthenticated])
    def update_profile(self, request):
        """
        Update current user profile.

        Although routed on PUT, the serializer is run with ``partial=True``,
        so omitted fields are left unchanged.
        """
        serializer = UserUpdateSerializer(request.user, data=request.data, partial=True)
        serializer.is_valid(raise_exception=True)
        serializer.save()
        AuditService.log_from_request(
            event_type='other',
            description=f"User {request.user.email} updated their profile",
            request=request,
            content_object=request.user
        )
        return Response(UserSerializer(request.user).data)

    @action(detail=False, methods=['post'], permission_classes=[IsAuthenticated])
    def change_password(self, request):
        """Validate the payload, set the new password and audit the change."""
        serializer = ChangePasswordSerializer(data=request.data, context={'request': request})
        serializer.is_valid(raise_exception=True)

        # Change password
        request.user.set_password(serializer.validated_data['new_password'])
        request.user.save()

        AuditService.log_from_request(
            event_type='other',
            description=f"User {request.user.email} changed their password",
            request=request,
            content_object=request.user
        )
        return Response({'message': 'Password changed successfully'}, status=status.HTTP_200_OK)

    @action(detail=True, methods=['post'], permission_classes=[IsPXAdmin])
    def assign_role(self, request, pk=None):
        """
        Assign role to user (PX Admin only).

        Expects ``role_id`` in the request body; responds 404 when no such
        role exists.
        """
        user = self.get_object()
        role_id = request.data.get('role_id')

        # Only the lookup can raise Role.DoesNotExist; keep the try minimal.
        try:
            role = Role.objects.get(id=role_id)
        except Role.DoesNotExist:
            return Response({'error': 'Role not found'}, status=status.HTTP_404_NOT_FOUND)

        user.groups.add(role.group)
        AuditService.log_from_request(
            event_type='role_change',
            description=f"Role {role.display_name} assigned to user {user.email}",
            request=request,
            content_object=user,
            metadata={'role': role.name}
        )
        return Response({'message': f'Role {role.display_name} assigned successfully'})

    @action(detail=True, methods=['post'], permission_classes=[IsPXAdmin])
    def remove_role(self, request, pk=None):
        """
        Remove role from user (PX Admin only).

        Expects ``role_id`` in the request body; responds 404 when no such
        role exists.
        """
        user = self.get_object()
        role_id = request.data.get('role_id')

        # Only the lookup can raise Role.DoesNotExist; keep the try minimal.
        try:
            role = Role.objects.get(id=role_id)
        except Role.DoesNotExist:
            return Response({'error': 'Role not found'}, status=status.HTTP_404_NOT_FOUND)

        user.groups.remove(role.group)
        AuditService.log_from_request(
            event_type='role_change',
            description=f"Role {role.display_name} removed from user {user.email}",
            request=request,
            content_object=user,
            metadata={'role': role.name}
        )
        return Response({'message': f'Role {role.display_name} removed successfully'})
class RoleViewSet(viewsets.ModelViewSet):
    """
    CRUD endpoints for the Role model.

    Access control is delegated to ``IsPXAdminOrReadOnly`` (by its name:
    read access for everyone it admits, writes for PX Admins -- see the
    permission class for the exact rule).
    """
    serializer_class = RoleSerializer
    queryset = Role.objects.all()
    permission_classes = [IsPXAdminOrReadOnly]

    # Filtering, searching and ordering exposed to API clients.
    search_fields = ['name', 'display_name', 'description']
    filterset_fields = ['name', 'level']
    ordering_fields = ['level', 'name']
    ordering = ['-level', 'name']

    def get_queryset(self):
        """Return the role queryset with each role's ``group`` joined in."""
        roles = super().get_queryset()
        return roles.select_related('group')
# ==================== Settings Views ====================
def _apply_notification_preferences(request, user):
    """Copy the notification-preference form fields onto ``user`` (unsaved)."""
    post = request.POST
    user.notification_email_enabled = post.get('notification_email_enabled', 'off') == 'on'
    user.notification_sms_enabled = post.get('notification_sms_enabled', 'off') == 'on'
    user.preferred_notification_channel = post.get('preferred_notification_channel', 'email')
    user.explanation_notification_channel = post.get('explanation_notification_channel', 'email')
    user.phone = post.get('phone', '')
    user.language = post.get('language', 'en')
    messages.success(request, _('Notification preferences updated successfully.'))
    return True


def _apply_profile(request, user):
    """Copy the profile form fields (and optional avatar) onto ``user`` (unsaved)."""
    post = request.POST
    user.first_name = post.get('first_name', '')
    user.last_name = post.get('last_name', '')
    user.phone = post.get('phone', '')
    user.bio = post.get('bio', '')

    # Only replace the avatar when a new file was actually uploaded.
    avatar = request.FILES.get('avatar')
    if avatar:
        user.avatar = avatar

    messages.success(request, _('Profile updated successfully.'))
    return True


def _apply_password_change(request, user):
    """
    Validate the password form and set the new password on ``user`` (unsaved).

    Returns True when the password was changed, False when validation failed
    (an error message has then been queued for the user).
    """
    current_password = request.POST.get('current_password')
    # Default to '' so a missing field fails the length check instead of
    # raising TypeError from len(None).
    new_password = request.POST.get('new_password', '')
    confirm_password = request.POST.get('confirm_password', '')

    if not user.check_password(current_password):
        messages.error(request, _('Current password is incorrect.'))
        return False
    if new_password != confirm_password:
        messages.error(request, _('New passwords do not match.'))
        return False
    if len(new_password) < 8:
        messages.error(request, _('Password must be at least 8 characters long.'))
        return False

    user.set_password(new_password)
    messages.success(request, _('Password changed successfully. Please login again.'))

    # Refresh the session's auth hash so the current session stays valid
    # after the password change.
    from django.contrib.auth import update_session_auth_hash
    update_session_auth_hash(request, user)
    return True


@login_required
def user_settings(request):
    """
    User settings page for managing notification preferences, profile, and security.

    GET renders ``accounts/settings.html``. POST dispatches on the
    ``form_type`` field (``preferences`` by default, ``profile`` or
    ``password``), applies the change and redirects back to the settings
    page. The user is saved and an audit entry written only when a change
    was actually applied.
    """
    user = request.user

    if request.method == 'POST':
        handlers = {
            'preferences': _apply_notification_preferences,
            'profile': _apply_profile,
            'password': _apply_password_change,
        }
        form_type = request.POST.get('form_type', 'preferences')
        handler = handlers.get(form_type)
        changed = handler(request, user) if handler is not None else False

        # Skip persistence and auditing for failed validation or an unknown
        # form type: nothing changed, so an "updated settings" audit entry
        # would be misleading.
        if changed:
            user.save()
            AuditService.log_from_request(
                event_type='other',
                description=f"User {user.email} updated settings",
                request=request,
                content_object=user
            )
        return redirect('accounts:settings')

    context = {
        'user': user,
        'notification_channels': [
            ('email', _('Email')),
            ('sms', _('SMS')),
            ('both', _('Both'))
        ],
        'languages': [
            ('en', _('English')),
            ('ar', _('Arabic'))
        ]
    }
    return render(request, 'accounts/settings.html', context)
# ==================== Onboarding ViewSets ====================
class AcknowledgementContentViewSet(viewsets.ModelViewSet):
    """
    CRUD endpoints for AcknowledgementContent.

    Access control is delegated to ``CanManageAcknowledgementContent``
    (intended policy per the original notes: authenticated users may read,
    PX Admins may write -- confirm against the permission class).
    """
    serializer_class = AcknowledgementContentSerializer
    queryset = AcknowledgementContent.objects.all()
    permission_classes = [CanManageAcknowledgementContent]

    # Filtering, searching and ordering exposed to API clients.
    search_fields = ['code', 'title_en', 'title_ar', 'description_en', 'description_ar']
    filterset_fields = ['role', 'is_active']
    ordering_fields = ['role', 'order']
    ordering = ['role', 'order']
class AcknowledgementChecklistItemViewSet(viewsets.ModelViewSet):
    """
    CRUD endpoints for AcknowledgementChecklistItem.

    Access control is delegated to ``CanManageAcknowledgementContent``
    (intended policy per the original notes: authenticated users may read,
    PX Admins may write -- confirm against the permission class).
    """
    serializer_class = AcknowledgementChecklistItemSerializer
    queryset = AcknowledgementChecklistItem.objects.all()
    permission_classes = [CanManageAcknowledgementContent]

    # Filtering, searching and ordering exposed to API clients.
    search_fields = ['code', 'text_en', 'text_ar', 'description_en', 'description_ar']
    filterset_fields = ['role', 'content', 'is_required', 'is_active']
    ordering_fields = ['role', 'order']
    ordering = ['role', 'order']

    def get_queryset(self):
        """Return the checklist items with their related ``content`` joined in."""
        checklist_items = super().get_queryset()
        return checklist_items.select_related('content')
class UserAcknowledgementViewSet(viewsets.ReadOnlyModelViewSet):
    """
    Read-only ViewSet for UserAcknowledgement model.

    Visibility (enforced in ``get_queryset``):
    - PX Admins can view all acknowledgements
    - Everyone else sees only their own
    """
    queryset = UserAcknowledgement.objects.all()
    serializer_class = UserAcknowledgementSerializer
    permission_classes = [IsOnboardingOwnerOrAdmin]
    filterset_fields = ['user', 'checklist_item', 'is_acknowledged']
    # Field names here must NOT carry a '-' prefix: the ordering filter strips
    # the direction from the requested term before comparing, so an entry
    # like '-acknowledged_at' can never match and client ordering is
    # silently ignored. The direction belongs in ``ordering`` only.
    ordering_fields = ['acknowledged_at']
    ordering = ['-acknowledged_at']

    def get_queryset(self):
        """Return all acknowledgements for PX Admins, otherwise only the caller's."""
        queryset = super().get_queryset().select_related('user', 'checklist_item')
        user = self.request.user

        # PX Admins see all
        if user.is_px_admin():
            return queryset

        # Others see only their own
        return queryset.filter(user=user)
# ==================== Onboarding Actions for UserViewSet ====================
def onboarding_create_provisional(self, request):
    """
    Create a provisional user, assign the requested roles and send the invitation.

    The payload is validated with ``ProvisionalUserSerializer``; its optional
    ``roles`` entry is treated as a list of role names. Names that match no
    ``Role`` are skipped (best effort) and reported in a log warning.

    Returns a 201 response containing the serialized user.
    """
    import logging

    from .services import OnboardingService, EmailService

    serializer = ProvisionalUserSerializer(data=request.data)
    serializer.is_valid(raise_exception=True)

    # Create provisional user
    user_data = serializer.validated_data.copy()
    role_names = list(user_data.pop('roles', None) or [])
    user = OnboardingService.create_provisional_user(user_data)

    # Assign roles: one query for every requested name instead of a lookup
    # (and a re-import of the already imported Role model) per iteration.
    if role_names:
        matched_roles = list(
            Role.objects.filter(name__in=role_names).select_related('group')
        )
        user.groups.add(*(role.group for role in matched_roles))

        unknown = set(role_names) - {role.name for role in matched_roles}
        if unknown:
            # Unknown roles are still tolerated, but no longer silently.
            logging.getLogger(__name__).warning(
                "Ignoring unknown role(s) %s while creating provisional user %s",
                sorted(unknown),
                user.pk,
            )

    # Send invitation email
    EmailService.send_invitation_email(user, request)

    return Response(
        UserSerializer(user).data,
        status=status.HTTP_201_CREATED
    )
def onboarding_resend_invitation(self, request, pk=None):
    """
    Send a reminder email to the provisional user identified by ``pk``.

    Responds 400 when the target user is not provisional.
    """
    from .services import EmailService

    target = self.get_object()
    if target.is_provisional:
        EmailService.send_reminder_email(target, request)
        return Response({'message': 'Invitation email resent successfully'})

    # Non-provisional accounts have no pending invitation to resend.
    error_body = {'error': 'User is not a provisional user'}
    return Response(error_body, status=status.HTTP_400_BAD_REQUEST)
def onboarding_progress(self, request):
    """
    Report the requesting user's onboarding wizard progress.

    The payload holds the current step, completed steps, progress percentage
    and the counts of required vs. acknowledged checklist items, serialized
    through ``WizardProgressSerializer``.
    """
    from .services import OnboardingService

    current_user = request.user

    # Required checklist items for this user, and those already acknowledged.
    checklist = OnboardingService.get_checklist_items(current_user)
    required = checklist.filter(is_required=True)
    acknowledged = UserAcknowledgement.objects.filter(
        user=current_user,
        checklist_item__in=required,
        is_acknowledged=True
    )

    payload = {}
    payload['current_step'] = current_user.current_wizard_step
    payload['completed_steps'] = current_user.wizard_completed_steps
    payload['progress_percentage'] = OnboardingService.get_user_progress_percentage(current_user)
    payload['total_required_items'] = required.count()
    payload['acknowledged_items'] = acknowledged.count()

    return Response(WizardProgressSerializer(payload).data)
def onboarding_content(self, request):
    """Return the serialized wizard content for the requesting user."""
    from .services import OnboardingService

    wizard_content = OnboardingService.get_wizard_content(request.user)
    payload = AcknowledgementContentSerializer(wizard_content, many=True).data
    return Response(payload)
def onboarding_checklist(self, request):
    """
    Get checklist items for current user.

    Each serialized item is extended with an ``is_acknowledged`` flag saying
    whether the requesting user has already acknowledged it.
    """
    from .services import OnboardingService

    items = OnboardingService.get_checklist_items(request.user)

    # Materialise the acknowledged ids into a set once, so the membership
    # test below is O(1) per item rather than a scan over the query results.
    acknowledged_ids = set(
        UserAcknowledgement.objects.filter(
            user=request.user,
            is_acknowledged=True
        ).values_list('checklist_item_id', flat=True)
    )

    data = []
    for item in items:
        item_data = AcknowledgementChecklistItemSerializer(item).data
        item_data['is_acknowledged'] = item.id in acknowledged_ids
        data.append(item_data)

    return Response(data)
def onboarding_acknowledge(self, request):
    """
    Record the requesting user's acknowledgement of one checklist item.

    The payload is validated with ``AcknowledgeItemSerializer``. Responds 404
    when ``checklist_item_id`` matches no checklist item.
    """
    from .services import OnboardingService

    payload = AcknowledgeItemSerializer(data=request.data)
    payload.is_valid(raise_exception=True)
    validated = payload.validated_data

    item_id = validated['checklist_item_id']
    try:
        checklist_item = AcknowledgementChecklistItem.objects.get(id=item_id)
    except AcknowledgementChecklistItem.DoesNotExist:
        not_found = {'error': 'Checklist item not found'}
        return Response(not_found, status=status.HTTP_404_NOT_FOUND)

    # Signature is optional; an empty string is passed when it is absent.
    signature = validated.get('signature', '')
    OnboardingService.acknowledge_item(
        request.user,
        checklist_item,
        signature=signature,
        request=request
    )
    return Response({'message': 'Item acknowledged successfully'})
def onboarding_complete(self, request):
    """
    Complete wizard and activate account.

    Validates the payload with ``AccountActivationSerializer`` and hands the
    username, password and signature to ``OnboardingService.complete_wizard``.
    Responds 400 when that call reports failure; otherwise sends a completion
    notification for the users in the 'PX Admin' group and responds with a
    success message.
    """
    from .services import OnboardingService, EmailService

    serializer = AccountActivationSerializer(data=request.data)
    serializer.is_valid(raise_exception=True)
    credentials = serializer.validated_data

    # Complete wizard
    success = OnboardingService.complete_wizard(
        request.user,
        credentials['username'],
        credentials['password'],
        credentials['signature'],
        request=request
    )
    if not success:
        return Response(
            {'error': 'Failed to complete wizard. Please ensure all required items are acknowledged.'},
            status=status.HTTP_400_BAD_REQUEST
        )

    # Notify admins. The module-level ``User`` is used directly; re-running
    # get_user_model() here only shadowed it with the same class.
    admin_users = User.objects.filter(groups__name='PX Admin')
    EmailService.send_completion_notification(request.user, admin_users, request)

    return Response({'message': 'Account activated successfully'})
def onboarding_status(self, request, pk=None):
    """
    Return the onboarding status of the user identified by ``pk``.

    The payload combines the serialized user with the onboarding fields read
    from the user object and its progress percentage.
    """
    target = self.get_object()

    payload = {'user': UserSerializer(target).data}
    # Plain attributes copied straight from the user object, in output order.
    for field in (
        'is_provisional',
        'acknowledgement_completed',
        'acknowledgement_completed_at',
        'current_wizard_step',
        'invitation_expires_at',
    ):
        payload[field] = getattr(target, field)
    payload['progress_percentage'] = target.get_onboarding_progress_percentage()

    return Response(payload)
# Add onboarding actions to UserViewSet with proper function names
def _register_onboarding_action(handler, *, detail, methods, permission, url_path):
    """Wrap ``handler`` with ``@action`` and attach it to UserViewSet under its own name."""
    decorated = action(
        detail=detail,
        methods=methods,
        permission_classes=[permission],
        url_path=url_path
    )(handler)
    setattr(UserViewSet, handler.__name__, decorated)


# Management endpoints
_register_onboarding_action(
    onboarding_create_provisional,
    detail=False, methods=['post'], permission=CanManageOnboarding,
    url_path='onboarding/create-provisional',
)
_register_onboarding_action(
    onboarding_resend_invitation,
    detail=True, methods=['post'], permission=CanManageOnboarding,
    url_path='onboarding/resend-invitation',
)

# Wizard endpoints for the provisional user
_register_onboarding_action(
    onboarding_progress,
    detail=False, methods=['get'], permission=IsProvisionalUser,
    url_path='onboarding/progress',
)
_register_onboarding_action(
    onboarding_content,
    detail=False, methods=['get'], permission=IsProvisionalUser,
    url_path='onboarding/content',
)
_register_onboarding_action(
    onboarding_checklist,
    detail=False, methods=['get'], permission=IsProvisionalUser,
    url_path='onboarding/checklist',
)
_register_onboarding_action(
    onboarding_acknowledge,
    detail=False, methods=['post'], permission=IsProvisionalUser,
    url_path='onboarding/acknowledge',
)
_register_onboarding_action(
    onboarding_complete,
    detail=False, methods=['post'], permission=IsProvisionalUser,
    url_path='onboarding/complete',
)

# Per-user status endpoint
_register_onboarding_action(
    onboarding_status,
    detail=True, methods=['get'], permission=CanViewOnboarding,
    url_path='onboarding/status',
)
# ==================== Onboarding ViewSets ====================