HH/apps/accounts/views.py
2026-03-15 23:48:45 +03:00

673 lines
24 KiB
Python

"""
Accounts views and viewsets
"""
from django.contrib import messages
from django.contrib.auth import get_user_model
from django.contrib.auth.decorators import login_required
from django.shortcuts import render, redirect
from django.utils.translation import gettext as _
from rest_framework import status, viewsets
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework_simplejwt.views import TokenObtainPairView
from apps.core.services import AuditService
from .models import Role
from .models import AcknowledgementChecklistItem, AcknowledgementContent, UserAcknowledgement
from .permissions import (
CanManageAcknowledgementContent,
CanManageOnboarding,
CanViewOnboarding,
IsOnboardingOwnerOrAdmin,
IsProvisionalUser,
IsPXAdmin,
IsPXAdminOrReadOnly,
IsOwnerOrPXAdmin,
)
from .serializers import (
AccountActivationSerializer,
AcknowledgementChecklistItemSerializer,
AcknowledgementContentSerializer,
AcknowledgeItemSerializer,
ChangePasswordSerializer,
ProvisionalUserSerializer,
ResendInvitationSerializer,
RoleSerializer,
UserAcknowledgementSerializer,
UserCreateSerializer,
UserSerializer,
UserUpdateSerializer,
WizardProgressSerializer,
)
User = get_user_model()
class CustomTokenObtainPairView(TokenObtainPairView):
"""
Custom JWT token view that logs user login and provides redirect info.
"""
def post(self, request, *args, **kwargs):
response = super().post(request, *args, **kwargs)
# Log successful login and add redirect info
if response.status_code == 200:
username = request.data.get("username")
try:
user = User.objects.get(username=username)
AuditService.log_from_request(
event_type="user_login",
description=f"User {user.email} logged in",
request=request,
content_object=user,
)
# Add redirect URL to response data
response_data = response.data
response_data["redirect_url"] = self.get_redirect_url(user)
response.data = response_data
except User.DoesNotExist:
pass
return response
def get_redirect_url(self, user):
"""
Determine the appropriate redirect URL based on user role and hospital context.
"""
# Check if user is a Source User first
from apps.px_sources.models import SourceUser
if SourceUser.objects.filter(user=user).exists():
return "/px_sources/dashboard/"
# PX Admins need to select a hospital first
if user.is_px_admin():
from apps.organizations.models import Hospital
# Check if there's already a hospital in session
# Since we don't have access to request here, frontend should handle this
# Return to hospital selector URL
return "/health/select-hospital/"
# Users without hospital assignment get error page
if not user.hospital:
return "/health/no-hospital/"
# All other users go to dashboard
return "/"
class UserViewSet(viewsets.ModelViewSet):
"""
ViewSet for User model.
Permissions:
- List/Retrieve: Authenticated users
- Create/Update/Delete: PX Admins only
- Users can update their own profile
"""
queryset = User.objects.all()
permission_classes = [IsAuthenticated]
filterset_fields = ["is_active", "hospital", "department", "groups"]
search_fields = ["username", "email", "first_name", "last_name", "employee_id"]
ordering_fields = ["date_joined", "email", "last_name"]
ordering = ["-date_joined"]
def get_serializer_class(self):
"""Return appropriate serializer based on action"""
if self.action == "create":
return UserCreateSerializer
elif self.action in ["update", "partial_update"]:
return UserUpdateSerializer
return UserSerializer
def get_permissions(self):
"""Set permissions based on action"""
if self.action in ["create", "destroy"]:
return [IsPXAdmin()]
elif self.action in ["update", "partial_update"]:
return [IsOwnerOrPXAdmin()]
return [IsAuthenticated()]
def get_queryset(self):
"""Filter queryset based on user role"""
queryset = super().get_queryset()
user = self.request.user
# PX Admins see all users
if user.is_px_admin():
return queryset.select_related("hospital", "department")
# Hospital Admins see users in their hospital
if user.is_hospital_admin() and user.hospital:
return queryset.filter(hospital=user.hospital).select_related("hospital", "department")
# Department Managers see users in their department
if user.is_department_manager() and user.department:
return queryset.filter(department=user.department).select_related("hospital", "department")
# Source Users see only themselves
if user.is_source_user():
return queryset.filter(id=user.id)
# Others see only themselves
return queryset.filter(id=user.id)
def perform_create(self, serializer):
"""Log user creation"""
user = serializer.save()
AuditService.log_from_request(
event_type="other", description=f"User {user.email} created", request=self.request, content_object=user
)
def perform_update(self, serializer):
"""Log user update"""
user = serializer.save()
AuditService.log_from_request(
event_type="other", description=f"User {user.email} updated", request=self.request, content_object=user
)
@action(detail=False, methods=["get"], permission_classes=[IsAuthenticated])
def me(self, request):
"""Get current user profile"""
serializer = self.get_serializer(request.user)
return Response(serializer.data)
@action(detail=False, methods=["put"], permission_classes=[IsAuthenticated])
def update_profile(self, request):
"""Update current user profile"""
serializer = UserUpdateSerializer(request.user, data=request.data, partial=True)
serializer.is_valid(raise_exception=True)
serializer.save()
AuditService.log_from_request(
event_type="other",
description=f"User {request.user.email} updated their profile",
request=request,
content_object=request.user,
)
return Response(UserSerializer(request.user).data)
@action(detail=False, methods=["post"], permission_classes=[IsAuthenticated])
def change_password(self, request):
"""Change user password"""
serializer = ChangePasswordSerializer(data=request.data, context={"request": request})
serializer.is_valid(raise_exception=True)
# Change password
request.user.set_password(serializer.validated_data["new_password"])
request.user.save()
AuditService.log_from_request(
event_type="other",
description=f"User {request.user.email} changed their password",
request=request,
content_object=request.user,
)
return Response({"message": "Password changed successfully"}, status=status.HTTP_200_OK)
@action(detail=True, methods=["post"], permission_classes=[IsPXAdmin])
def assign_role(self, request, pk=None):
"""Assign role to user (PX Admin only)"""
user = self.get_object()
role_id = request.data.get("role_id")
try:
role = Role.objects.get(id=role_id)
user.groups.add(role.group)
AuditService.log_from_request(
event_type="role_change",
description=f"Role {role.display_name} assigned to user {user.email}",
request=request,
content_object=user,
metadata={"role": role.name},
)
return Response({"message": f"Role {role.display_name} assigned successfully"})
except Role.DoesNotExist:
return Response({"error": "Role not found"}, status=status.HTTP_404_NOT_FOUND)
@action(detail=True, methods=["post"], permission_classes=[IsPXAdmin])
def remove_role(self, request, pk=None):
"""Remove role from user (PX Admin only)"""
user = self.get_object()
role_id = request.data.get("role_id")
try:
role = Role.objects.get(id=role_id)
user.groups.remove(role.group)
AuditService.log_from_request(
event_type="role_change",
description=f"Role {role.display_name} removed from user {user.email}",
request=request,
content_object=user,
metadata={"role": role.name},
)
return Response({"message": f"Role {role.display_name} removed successfully"})
except Role.DoesNotExist:
return Response({"error": "Role not found"}, status=status.HTTP_404_NOT_FOUND)
class RoleViewSet(viewsets.ModelViewSet):
"""
ViewSet for Role model.
Permissions:
- List/Retrieve: Authenticated users
- Create/Update/Delete: PX Admins only
"""
queryset = Role.objects.all()
serializer_class = RoleSerializer
permission_classes = [IsPXAdminOrReadOnly]
filterset_fields = ["name", "level"]
search_fields = ["name", "display_name", "description"]
ordering_fields = ["level", "name"]
ordering = ["-level", "name"]
def get_queryset(self):
return super().get_queryset().select_related("group")
# ==================== Settings Views ====================
@login_required
def user_settings(request):
"""
User settings page for managing notification preferences, profile, and security.
"""
user = request.user
if request.method == "POST":
# Get form type
form_type = request.POST.get("form_type", "preferences")
if form_type == "preferences":
# Update notification preferences
user.notification_email_enabled = request.POST.get("notification_email_enabled", "off") == "on"
user.notification_sms_enabled = request.POST.get("notification_sms_enabled", "off") == "on"
user.preferred_notification_channel = request.POST.get("preferred_notification_channel", "email")
user.explanation_notification_channel = request.POST.get("explanation_notification_channel", "email")
user.phone = request.POST.get("phone", "")
user.language = request.POST.get("language", "en")
messages.success(request, _("Notification preferences updated successfully."))
elif form_type == "profile":
# Update profile information
user.first_name = request.POST.get("first_name", "")
user.last_name = request.POST.get("last_name", "")
user.phone = request.POST.get("phone", "")
user.bio = request.POST.get("bio", "")
# Handle avatar upload
if request.FILES.get("avatar"):
user.avatar = request.FILES.get("avatar")
messages.success(request, _("Profile updated successfully."))
elif form_type == "password":
# Change password
current_password = request.POST.get("current_password")
new_password = request.POST.get("new_password")
confirm_password = request.POST.get("confirm_password")
if not user.check_password(current_password):
messages.error(request, _("Current password is incorrect."))
elif new_password != confirm_password:
messages.error(request, _("New passwords do not match."))
elif len(new_password) < 8:
messages.error(request, _("Password must be at least 8 characters long."))
else:
user.set_password(new_password)
messages.success(request, _("Password changed successfully. Please login again."))
# Re-authenticate user with new password
from django.contrib.auth import update_session_auth_hash
update_session_auth_hash(request, user)
user.save()
# Log the update
AuditService.log_from_request(
event_type="other", description=f"User {user.email} updated settings", request=request, content_object=user
)
return redirect("accounts:settings")
context = {
"user": user,
"notification_channels": [("email", _("Email")), ("sms", _("SMS")), ("both", _("Both"))],
"languages": [("en", _("English")), ("ar", _("Arabic"))],
}
# Add user statistics for PX Admin
if user.is_px_admin():
User = get_user_model()
context["total_users_count"] = User.objects.count()
context["active_users_count"] = User.objects.filter(is_active=True, is_provisional=False).count()
context["provisional_users_count"] = User.objects.filter(is_provisional=True).count()
context["inactive_users_count"] = User.objects.filter(is_active=False).count()
return render(request, "accounts/settings.html", context)
# ==================== Onboarding ViewSets ====================
class AcknowledgementContentViewSet(viewsets.ModelViewSet):
"""
ViewSet for AcknowledgementContent model.
Permissions:
- List/Retrieve: Authenticated users
- Create/Update/Delete: PX Admins only
"""
queryset = AcknowledgementContent.objects.all()
serializer_class = AcknowledgementContentSerializer
permission_classes = [CanManageAcknowledgementContent]
filterset_fields = ["role", "is_active"]
search_fields = ["code", "title_en", "title_ar", "description_en", "description_ar"]
ordering_fields = ["role", "order"]
ordering = ["role", "order"]
class AcknowledgementChecklistItemViewSet(viewsets.ModelViewSet):
"""
ViewSet for AcknowledgementChecklistItem model.
Permissions:
- List/Retrieve: Authenticated users
- Create/Update/Delete: PX Admins only
"""
queryset = AcknowledgementChecklistItem.objects.all()
serializer_class = AcknowledgementChecklistItemSerializer
permission_classes = [CanManageAcknowledgementContent]
filterset_fields = ["role", "content", "is_required", "is_active"]
search_fields = ["code", "text_en", "text_ar", "description_en", "description_ar"]
ordering_fields = ["role", "order"]
ordering = ["role", "order"]
def get_queryset(self):
return super().get_queryset().select_related("content")
class UserAcknowledgementViewSet(viewsets.ReadOnlyModelViewSet):
"""
ViewSet for UserAcknowledgement model.
Permissions:
- Users can view their own acknowledgements
- PX Admins can view all
"""
queryset = UserAcknowledgement.objects.all()
serializer_class = UserAcknowledgementSerializer
permission_classes = [IsOnboardingOwnerOrAdmin]
filterset_fields = ["user", "checklist_item", "is_acknowledged"]
ordering_fields = ["-acknowledged_at"]
ordering = ["-acknowledged_at"]
def get_queryset(self):
queryset = super().get_queryset()
user = self.request.user
# PX Admins see all
if user.is_px_admin():
return queryset.select_related("user", "checklist_item")
# Others see only their own
return queryset.filter(user=user).select_related("user", "checklist_item")
@action(detail=True, methods=["get"], permission_classes=[IsAuthenticated])
def download_pdf(self, request, pk=None):
"""
Download PDF for a specific acknowledgement
"""
from django.http import FileResponse, Http404
import os
acknowledgement = self.get_object()
# Check if PDF exists
if not acknowledgement.pdf_file:
return Response({"error": "PDF not available for this acknowledgement"}, status=status.HTTP_404_NOT_FOUND)
# Check file exists
if not os.path.exists(acknowledgement.pdf_file.path):
return Response({"error": "PDF file not found"}, status=status.HTTP_404_NOT_FOUND)
# Return file
try:
response = FileResponse(open(acknowledgement.pdf_file.path, "rb"), content_type="application/pdf")
# Generate filename
filename = f"acknowledgement_{acknowledgement.id}_{acknowledgement.user.employee_id or acknowledgement.user.username}.pdf"
response["Content-Disposition"] = f'attachment; filename="{filename}"'
return response
except Exception as e:
return Response({"error": f"Error downloading PDF: {str(e)}"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# ==================== Onboarding Actions for UserViewSet ====================
def onboarding_create_provisional(self, request):
"""Create provisional user"""
from .services import OnboardingService, EmailService
serializer = ProvisionalUserSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
# Create provisional user
user_data = serializer.validated_data.copy()
roles = user_data.pop("roles", [])
user = OnboardingService.create_provisional_user(user_data)
# Assign roles
for role_name in roles:
from .models import Role as RoleModel
try:
role = RoleModel.objects.get(name=role_name)
user.groups.add(role.group)
except RoleModel.DoesNotExist:
pass
# Send invitation email
EmailService.send_invitation_email(user, request)
return Response(UserSerializer(user).data, status=status.HTTP_201_CREATED)
def onboarding_resend_invitation(self, request, pk=None):
"""Resend invitation email"""
from .services import EmailService
user = self.get_object()
if not user.is_provisional:
return Response({"error": "User is not a provisional user"}, status=status.HTTP_400_BAD_REQUEST)
EmailService.send_reminder_email(user, request)
return Response({"message": "Invitation email resent successfully"})
def onboarding_progress(self, request):
"""Get current user's onboarding progress"""
from .services import OnboardingService
user = request.user
# Get checklist items
required_items = OnboardingService.get_checklist_items(user).filter(is_required=True)
acknowledged_items = UserAcknowledgement.objects.filter(
user=user, checklist_item__in=required_items, is_acknowledged=True
)
progress = {
"current_step": user.current_wizard_step,
"completed_steps": user.wizard_completed_steps,
"progress_percentage": OnboardingService.get_user_progress_percentage(user),
"total_required_items": required_items.count(),
"acknowledged_items": acknowledged_items.count(),
}
serializer = WizardProgressSerializer(progress)
return Response(serializer.data)
def onboarding_content(self, request):
"""Get wizard content for current user"""
from .services import OnboardingService
content = OnboardingService.get_wizard_content(request.user)
serializer = AcknowledgementContentSerializer(content, many=True)
return Response(serializer.data)
def onboarding_checklist(self, request):
"""Get checklist items for current user"""
from .services import OnboardingService
items = OnboardingService.get_checklist_items(request.user)
# Include acknowledgement status
from django.db import models
acknowledged_ids = UserAcknowledgement.objects.filter(user=request.user, is_acknowledged=True).values_list(
"checklist_item_id", flat=True
)
data = []
for item in items:
item_data = AcknowledgementChecklistItemSerializer(item).data
item_data["is_acknowledged"] = item.id in acknowledged_ids
data.append(item_data)
return Response(data)
def onboarding_acknowledge(self, request):
"""Acknowledge a checklist item"""
from .services import OnboardingService
serializer = AcknowledgeItemSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
try:
item = AcknowledgementChecklistItem.objects.get(id=serializer.validated_data["checklist_item_id"])
except AcknowledgementChecklistItem.DoesNotExist:
return Response({"error": "Checklist item not found"}, status=status.HTTP_404_NOT_FOUND)
# Acknowledge item
OnboardingService.acknowledge_item(
request.user, item, signature=serializer.validated_data.get("signature", ""), request=request
)
return Response({"message": "Item acknowledged successfully"})
def onboarding_complete(self, request):
"""Complete wizard and activate account"""
from .services import OnboardingService, EmailService
serializer = AccountActivationSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
# Complete wizard
success = OnboardingService.complete_wizard(
request.user,
serializer.validated_data["username"],
serializer.validated_data["password"],
serializer.validated_data["signature"],
request=request,
)
if not success:
return Response(
{"error": "Failed to complete wizard. Please ensure all required items are acknowledged."},
status=status.HTTP_400_BAD_REQUEST,
)
# Notify admins
from django.contrib.auth import get_user_model
User = get_user_model()
admin_users = User.objects.filter(groups__name="PX Admin")
EmailService.send_completion_notification(request.user, admin_users, request)
return Response({"message": "Account activated successfully"})
def onboarding_status(self, request, pk=None):
"""Get onboarding status for a specific user"""
user = self.get_object()
status_data = {
"user": UserSerializer(user).data,
"is_provisional": user.is_provisional,
"acknowledgement_completed": user.acknowledgement_completed,
"acknowledgement_completed_at": user.acknowledgement_completed_at,
"current_wizard_step": user.current_wizard_step,
"invitation_expires_at": user.invitation_expires_at,
"progress_percentage": user.get_onboarding_progress_percentage(),
}
return Response(status_data)
# Add onboarding actions to UserViewSet with proper function names
UserViewSet.onboarding_create_provisional = action(
detail=False, methods=["post"], permission_classes=[CanManageOnboarding], url_path="onboarding/create-provisional"
)(onboarding_create_provisional)
UserViewSet.onboarding_resend_invitation = action(
detail=True, methods=["post"], permission_classes=[CanManageOnboarding], url_path="onboarding/resend-invitation"
)(onboarding_resend_invitation)
UserViewSet.onboarding_progress = action(
detail=False, methods=["get"], permission_classes=[IsProvisionalUser], url_path="onboarding/progress"
)(onboarding_progress)
UserViewSet.onboarding_content = action(
detail=False, methods=["get"], permission_classes=[IsProvisionalUser], url_path="onboarding/content"
)(onboarding_content)
UserViewSet.onboarding_checklist = action(
detail=False, methods=["get"], permission_classes=[IsProvisionalUser], url_path="onboarding/checklist"
)(onboarding_checklist)
UserViewSet.onboarding_acknowledge = action(
detail=False, methods=["post"], permission_classes=[IsProvisionalUser], url_path="onboarding/acknowledge"
)(onboarding_acknowledge)
UserViewSet.onboarding_complete = action(
detail=False, methods=["post"], permission_classes=[IsProvisionalUser], url_path="onboarding/complete"
)(onboarding_complete)
UserViewSet.onboarding_status = action(
detail=True, methods=["get"], permission_classes=[CanViewOnboarding], url_path="onboarding/status"
)(onboarding_status)
# ==================== Onboarding ViewSets ====================