""" Accounts UI views - Handle HTML rendering for onboarding and authentication """ from django.shortcuts import redirect, render from django.contrib.auth.decorators import login_required from django.contrib.auth import authenticate, login, logout, update_session_auth_hash from django.contrib.auth import get_user_model from django.contrib.auth.forms import PasswordResetForm, SetPasswordForm from django.contrib.auth.views import PasswordResetConfirmView from django.utils import timezone from django.views.decorators.http import require_http_methods from django.http import JsonResponse from django.contrib import messages from django.views.decorators.cache import never_cache from django.utils.translation import gettext_lazy as _ from django.contrib.auth.tokens import default_token_generator import logging from .models import ( AcknowledgementCategory, AcknowledgementContent, AcknowledgementChecklistItem, UserAcknowledgement, UserProvisionalLog, ) from .permissions import IsPXAdmin, CanManageOnboarding, CanViewOnboarding from .services import OnboardingService User = get_user_model() logger = logging.getLogger(__name__) # ==================== Authentication Views ==================== @never_cache def login_view(request): """ Login view for users to authenticate """ # If user is already authenticated, redirect based on role if request.user.is_authenticated: from apps.px_sources.models import SourceUser if SourceUser.objects.filter(user=request.user, is_active=True).exists(): return redirect("/px-sources/dashboard/") return redirect("/") if request.method == "POST": email = request.POST.get("email", "").strip() password = request.POST.get("password", "") remember_me = request.POST.get("remember_me") if email and password: # Authenticate user user = authenticate(request, username=email, password=password) if user is not None: # Check if user is active if not user.is_active: messages.error(request, "This account has been deactivated. Please contact your administrator.") return render(request, "accounts/login.html") # Login user login(request, user) from apps.accounts.services import StaffActivityService StaffActivityService.log( user=user, activity_type="login", description=f"User {user.email} logged in", request=request, module="accounts", ) # Set session expiry based on remember_me if not remember_me: request.session.set_expiry(0) # Session expires when browser closes else: request.session.set_expiry(1209600) # 2 weeks in seconds # Check if user is a Source User from apps.px_sources.models import SourceUser if SourceUser.objects.filter(user=user, is_active=True).exists(): return redirect("/px-sources/dashboard/") # Redirect to next URL or dashboard next_url = request.GET.get("next", "") if next_url: return redirect(next_url) return redirect("/") else: messages.error(request, "Invalid email or password. Please try again.") else: messages.error(request, "Please provide both email and password.") context = { "page_title": "Login - PX360", } return render(request, "accounts/login.html", context) @login_required def logout_view(request): """ Logout view for users to sign out """ from apps.accounts.services import StaffActivityService StaffActivityService.log( user=request.user, activity_type="logout", description=f"User {request.user.email} logged out", request=request, module="accounts", ) logout(request) messages.success(request, "You have been logged out successfully.") return redirect("accounts:login") @never_cache def password_reset_view(request): """ Password reset view - allows users to request a password reset email """ if request.user.is_authenticated: messages.info(request, "You are already logged in. You can change your password from your profile.") return redirect("/") if request.method == "POST": form = PasswordResetForm(request.POST) if form.is_valid(): form.save( request=request, use_https=request.is_secure(), email_template_name="accounts/email/password_reset_email.html", subject_template_name="accounts/email/password_reset_subject.txt", ) messages.success( request, "We've sent you an email with instructions to reset your password. Please check your inbox." ) return redirect("accounts:login") else: form = PasswordResetForm() context = { "form": form, "page_title": "Reset Password - PX360", } return render(request, "accounts/password_reset.html", context) class CustomPasswordResetConfirmView(PasswordResetConfirmView): """ Custom password reset confirm view with custom template """ template_name = "accounts/password_reset_confirm.html" success_url = "/accounts/login/" def form_valid(self, form): messages.success( self.request, "Your password has been reset successfully. You can now login with your new password." ) return super().form_valid(form) @login_required def change_password_view(request): """ Change password view for authenticated users """ if request.method == "POST": form = SetPasswordForm(request.user, request.POST) if form.is_valid(): user = form.save() update_session_auth_hash(request, user) # Keep user logged in messages.success(request, "Your password has been changed successfully.") # Check if user is a Source User from apps.px_sources.models import SourceUser if SourceUser.objects.filter(user=user, is_active=True).exists(): return redirect("/px-sources/dashboard/") else: return redirect("/") else: form = SetPasswordForm(request.user) # Determine redirect URL based on user type from apps.px_sources.models import SourceUser redirect_url = ( "/px-sources/dashboard/" if SourceUser.objects.filter(user=request.user, is_active=True).exists() else "/" ) context = { "form": form, "page_title": "Change Password - PX360", "redirect_url": redirect_url, } return render(request, "accounts/change_password.html", context) # ==================== Onboarding Wizard Views ==================== @never_cache def onboarding_activate(request, token): """ Activate provisional user account using invitation token. Validates token, logs user in, and redirects to onboarding wizard. """ # If user is already authenticated and not provisional, redirect to dashboard if request.user.is_authenticated and not request.user.is_provisional: return redirect("/") # Validate the token user = OnboardingService.validate_token(token) if user is None: # Invalid or expired token messages.error(request, "Invalid or expired activation link. Please contact your administrator for assistance.") return render( request, "accounts/onboarding/activation_error.html", { "page_title": "Activation Error", }, ) # Token is valid - log the user in # Use a backend that doesn't require password from django.contrib.auth.backends import ModelBackend backend = ModelBackend() user.backend = f"{backend.__module__}.{backend.__class__.__name__}" login(request, user) # Log the wizard start from .models import UserProvisionalLog UserProvisionalLog.objects.create( user=user, event_type="wizard_started", description="User started onboarding wizard via activation link", metadata={"token": token[:10] + "..."}, # Only log partial token for security ) # Store token in session for the wizard flow request.session["onboarding_token"] = token # Redirect to welcome page messages.success(request, f"Welcome {user.first_name}! Please complete your onboarding.") return redirect("accounts:onboarding-welcome") @never_cache def onboarding_welcome(request, token=None): """ Welcome page for onboarding wizard """ # If user is already authenticated and not provisional, redirect to dashboard if request.user.is_authenticated and not request.user.is_provisional: return redirect("/") # Check if user is authenticated and provisional if not request.user.is_authenticated: # Not logged in - redirect to login messages.warning(request, "Please use your activation link to access the onboarding wizard.") return redirect("accounts:login") if not request.user.is_provisional: # User is already activated messages.info(request, "Your account is already activated.") return redirect("/") context = { "page_title": "Welcome to PX360", "user": request.user, } return render(request, "accounts/onboarding/welcome.html", context) @login_required def onboarding_step_content(request, step): """ Display content step of the onboarding wizard """ user = request.user # Check if user is provisional if not user.is_provisional: return redirect("/") # Get content for user's role content_list = get_wizard_content_for_user(user) if not content_list: return redirect("/accounts/onboarding/wizard/checklist/") # Get current step content try: current_content = content_list[step - 1] except IndexError: # Step doesn't exist, go to checklist return redirect("/accounts/onboarding/wizard/checklist/") if request.method == "POST": OnboardingService.save_wizard_step(user, step) if step < len(content_list): return redirect("/accounts/onboarding/wizard/step/{}/".format(step + 1)) return redirect("/accounts/onboarding/wizard/checklist/") # Get completed steps completed_steps = user.wizard_completed_steps or [] # Calculate progress progress_percentage = int((len(completed_steps) / len(content_list)) * 100) # Get previous and next steps previous_step = step - 1 if step > 1 else None next_step = step + 1 if step < len(content_list) else None context = { "page_title": f"Onboarding - Step {step}", "step": step, "content": content_list, "current_content": current_content, "completed_steps": completed_steps, "progress_percentage": progress_percentage, "previous_step": previous_step, "next_step": next_step, "user": user, } return render(request, "accounts/onboarding/step_content.html", context) @login_required def onboarding_step_checklist(request): """ Display checklist step of the onboarding wizard """ user = request.user # Check if user is provisional if not user.is_provisional: return redirect("/dashboard/") # Get checklist items for user's role checklist_items = get_checklist_items_for_user(user) # Get acknowledged items acknowledged_ids = UserAcknowledgement.objects.filter(user=user, is_acknowledged=True).values_list( "checklist_item_id", flat=True ) # Add acknowledgement status to items for item in checklist_items: item.is_acknowledged = item.id in acknowledged_ids # Get required items IDs required_items = [str(item.id) for item in checklist_items if item.is_required] # Calculate progress total_count = len(checklist_items) acknowledged_count = len([i for i in checklist_items if i.is_acknowledged]) progress_percentage = int((acknowledged_count / total_count) * 100) if total_count > 0 else 0 context = { "page_title": "Acknowledgement Checklist", "checklist_items": checklist_items, "acknowledged_count": acknowledged_count, "total_count": total_count, "progress_percentage": progress_percentage, "required_items_json": required_items, "user": user, } return render(request, "accounts/onboarding/step_checklist.html", context) @login_required def onboarding_step_activation(request): """ Display account activation step """ user = request.user # Check if user is provisional if not user.is_provisional: return redirect("/dashboard/") # Check if all required acknowledgements are completed required_items = get_checklist_items_for_user(user).filter(is_required=True) acknowledged_items = UserAcknowledgement.objects.filter( user=user, checklist_item__in=required_items, is_acknowledged=True ) if required_items.count() != acknowledged_items.count(): messages.warning(request, "Please complete all required acknowledgements first.") return redirect("/accounts/onboarding/wizard/checklist/") context = { "page_title": "Account Activation", "user": user, } return render(request, "accounts/onboarding/step_activation.html", context) @login_required def onboarding_complete(request): """ Display completion page """ user = request.user # Check if user is not provisional (i.e., completed onboarding) if user.is_provisional: return redirect("/accounts/onboarding/wizard/step/1/") context = { "page_title": "Onboarding Complete", "user": user, } return render(request, "accounts/onboarding/complete.html", context) # ==================== Provisional User Management Views ==================== @login_required def preview_wizard_as_role(request, role=None): """ Preview the onboarding wizard as a specific role. Allows admins to see exactly what users with different roles will see. """ if not request.user.is_px_admin(): messages.error(request, "You do not have permission to view this page.") return redirect("/dashboard/") from .models import Role available_roles = Role.objects.all() preview_content = AcknowledgementContent.objects.filter(is_active=True).order_by("category", "order") preview_checklist = AcknowledgementChecklistItem.objects.filter(is_active=True).order_by( "category", "order", "code" ) context = { "page_title": "Wizard Preview", "available_roles": available_roles, "selected_role": None, "preview_content": preview_content, "preview_checklist": preview_checklist, "content_count": preview_content.count(), "checklist_count": preview_checklist.count(), } return render(request, "accounts/onboarding/preview_wizard.html", context) @login_required def acknowledgement_dashboard(request): """ Acknowledgement reporting dashboard for admins. Shows completion statistics, pending users, and analytics. """ if not (request.user.is_px_admin() or request.user.is_hospital_admin()): messages.error(request, "You do not have permission to view this page.") return redirect("/dashboard/") from django.db.models import Count, Avg, Case, When, IntegerField, F from apps.organizations.models import Hospital # Base queryset - filter by hospital based on user role base_queryset = User.objects.all() if request.user.is_hospital_admin() and not request.user.is_px_admin(): base_queryset = base_queryset.filter(hospital=request.user.hospital) elif request.user.is_px_admin() and request.tenant_hospital: base_queryset = base_queryset.filter(hospital=request.tenant_hospital) # ===== Overall Statistics ===== total_users = base_queryset.filter(is_provisional=False).count() provisional_users = base_queryset.filter(is_provisional=True) stats = { "total_provisional": provisional_users.count(), "completed_onboarding": base_queryset.filter(is_provisional=False, acknowledgement_completed=True).count(), "in_progress": provisional_users.filter(acknowledgement_completed=False).count(), "expired_invitations": provisional_users.filter(invitation_expires_at__lt=timezone.now()).count(), } # Calculate completion rate total_onboarded = stats["completed_onboarding"] + stats["in_progress"] stats["completion_rate"] = ( round((stats["completed_onboarding"] / total_onboarded) * 100, 1) if total_onboarded > 0 else 0 ) # ===== Recent Activity ===== recent_activations = ( base_queryset.filter(acknowledgement_completed=True, acknowledgement_completed_at__isnull=False) .select_related("hospital", "department") .order_by("-acknowledgement_completed_at")[:10] ) # ===== Pending Users List ===== pending_users = ( provisional_users.filter(acknowledgement_completed=False) .select_related("hospital", "department") .order_by("invitation_expires_at")[:20] ) # Add days remaining for each pending user for user in pending_users: if user.invitation_expires_at: user.days_remaining = (user.invitation_expires_at - timezone.now()).days user.is_expiring_soon = user.days_remaining <= 2 else: user.days_remaining = None user.is_expiring_soon = False # ===== Completion by Role ===== completion_by_role = [] role_names = ["PX Admin", "Hospital Admin", "Department Manager", "Staff", "Physician", "Nurse"] for role_name in role_names: role_users = base_queryset.filter(groups__name=role_name) total = role_users.count() completed = role_users.filter(acknowledgement_completed=True).count() if total > 0: completion_by_role.append( { "role": role_name, "total": total, "completed": completed, "pending": total - completed, "rate": round((completed / total) * 100, 1), } ) # Sort by completion rate completion_by_role.sort(key=lambda x: x["rate"], reverse=True) # ===== Completion by Hospital (PX Admin only) ===== completion_by_hospital = [] if request.user.is_px_admin(): for hospital in Hospital.objects.filter(status="active"): hospital_users = User.objects.filter(hospital=hospital) total = hospital_users.count() completed = hospital_users.filter(acknowledgement_completed=True).count() if total > 0: completion_by_hospital.append( { "hospital": hospital, "total": total, "completed": completed, "pending": total - completed, "rate": round((completed / total) * 100, 1), } ) completion_by_hospital.sort(key=lambda x: x["rate"], reverse=True) # ===== Daily Activity Chart Data (Last 30 days) ===== from datetime import timedelta daily_data = [] for i in range(29, -1, -1): date = timezone.now().date() - timedelta(days=i) count = UserAcknowledgement.objects.filter(acknowledged_at__date=date).count() daily_data.append({"date": date.strftime("%Y-%m-%d"), "count": count}) # ===== Checklist Item Completion Rates ===== checklist_stats = [] checklist_items = AcknowledgementChecklistItem.objects.filter(is_active=True, is_required=True) for item in checklist_items: total_ack = UserAcknowledgement.objects.filter(checklist_item=item).count() # Estimate total users who should acknowledge this item if item.category: eligible_users = base_queryset.filter(groups__name__iexact=item.category.name_en.replace("_", " ")).count() else: eligible_users = base_queryset.count() if eligible_users > 0: checklist_stats.append( { "item": item, "acknowledged_count": total_ack, "eligible_count": eligible_users, "completion_rate": round((total_ack / eligible_users) * 100, 1), } ) # Sort by completion rate (lowest first to highlight items needing attention) checklist_stats.sort(key=lambda x: x["completion_rate"]) context = { "page_title": "Acknowledgement Dashboard", "stats": stats, "recent_activations": recent_activations, "pending_users": pending_users, "completion_by_role": completion_by_role, "completion_by_hospital": completion_by_hospital, "daily_data": daily_data, "checklist_stats": checklist_stats[:10], # Top 10 items needing attention "is_px_admin": request.user.is_px_admin(), } return render(request, "accounts/onboarding/dashboard.html", context) @login_required @require_http_methods(["GET", "POST"]) def bulk_invite_users(request): """ Bulk invite users via CSV upload. Expected CSV columns: email, first_name, last_name, role, hospital_id, department_id (optional) """ if not request.user.is_px_admin(): messages.error(request, "You do not have permission to view this page.") return redirect("/dashboard/") import csv import io from .services import OnboardingService, EmailService from apps.organizations.models import Hospital, Department from .models import Role results = {"success": [], "errors": [], "total": 0} if request.method == "POST": csv_file = request.FILES.get("csv_file") emails_text = request.POST.get("emails", "").strip() if not csv_file and not emails_text: messages.error(request, "Please select a CSV file or enter email addresses.") return redirect("accounts:bulk-invite-users") rows_to_process = [] # 1. Process CSV if exists if csv_file: if not csv_file.name.endswith(".csv"): messages.error(request, "Please upload a valid CSV file.") return redirect("accounts:bulk-invite-users") try: decoded_file = csv_file.read().decode("utf-8") io_string = io.StringIO(decoded_file) reader = csv.DictReader(io_string) required_fields = ["email", "first_name", "last_name", "role"] if reader.fieldnames: missing_fields = [f for f in required_fields if f not in reader.fieldnames] if missing_fields: messages.error(request, f"Missing required columns in CSV: {', '.join(missing_fields)}") return redirect("accounts:bulk-invite-users") for row in reader: rows_to_process.append( { "email": row.get("email", "").strip(), "first_name": row.get("first_name", "").strip(), "last_name": row.get("last_name", "").strip(), "role": row.get("role", "").strip(), "hospital_id": row.get("hospital_id", "").strip(), "department_id": row.get("department_id", "").strip(), } ) except Exception as e: messages.error(request, f"Error processing CSV file: {str(e)}") # 2. Process Manual entry if exists if emails_text: role_id = request.POST.get("role_id") hospital_id = request.POST.get("hospital_id") if not role_id: messages.error(request, "Please select a role for manual email entries.") else: try: role_obj = Role.objects.get(id=role_id) role_name = role_obj.name for email in emails_text.splitlines(): email = email.strip() if email and "@" in email: # Basic name extraction from email name_part = email.split("@")[0].replace(".", " ").replace("_", " ").title() parts = name_part.split() f_name = parts[0] if parts else "Staff" l_name = " ".join(parts[1:]) if len(parts) > 1 else "Member" rows_to_process.append( { "email": email, "first_name": f_name, "last_name": l_name, "role": role_name, "hospital_id": hospital_id, "department_id": "", } ) except Exception as e: messages.error(request, f"Error processing manual entries: {str(e)}") # 3. Process all rows for row in rows_to_process: results["total"] += 1 email = row.get("email", "").strip() try: first_name = row.get("first_name", "").strip() last_name = row.get("last_name", "").strip() role_name = row.get("role", "").strip() if not all([email, first_name, last_name, role_name]): results["errors"].append( {"row": results["total"], "email": email or "N/A", "error": "Missing required fields"} ) continue # Check if user already exists if User.objects.filter(email=email).exists(): results["errors"].append( {"row": results["total"], "email": email, "error": "User with this email already exists"} ) continue # Get role try: role = Role.objects.get(name=role_name) except Role.DoesNotExist: results["errors"].append( {"row": results["total"], "email": email, "error": f'Role "{role_name}" does not exist'} ) continue # Get hospital hospital_id = row.get("hospital_id", "").strip() hospital = None if hospital_id: try: hospital = Hospital.objects.get(id=hospital_id) except (Hospital.DoesNotExist, ValueError): # Try searching by name if ID fails (common in CSV) hospital = Hospital.objects.filter(name__iexact=hospital_id).first() if not hospital: results["errors"].append( { "row": results["total"], "email": email, "error": f'Hospital "{hospital_id}" not found', } ) continue # Get department department_id = row.get("department_id", "").strip() department = None if department_id: try: department = Department.objects.get(id=department_id) except (Department.DoesNotExist, ValueError): department = Department.objects.filter(name__iexact=department_id).first() if not department: # Log warning but don't fail for department pass # Create provisional user user_data = { "email": email, "first_name": first_name, "last_name": last_name, "hospital": hospital, "department": department, } user = OnboardingService.create_provisional_user(user_data) # Assign role user.groups.add(role.group) # Send invitation email EmailService.send_invitation_email(user, request) results["success"].append({"email": email, "name": f"{first_name} {last_name}"}) except Exception as e: results["errors"].append( {"row": results["total"], "email": email if "email" in locals() else "N/A", "error": str(e)} ) # Show summary messages if results["success"]: messages.success(request, f"Successfully invited {len(results['success'])} users.") if results["errors"]: messages.warning(request, f"Failed to invite {len(results['errors'])} users. See details below.") # Get data for template roles = Role.objects.all() hospitals = Hospital.objects.filter(status="active").order_by("name") context = { "page_title": "Bulk Invite Users", "results": results, "roles": roles, "hospitals": hospitals, } return render(request, "accounts/onboarding/bulk_invite.html", context) @login_required @require_http_methods(["POST"]) def bulk_resend_invitations(request): """ Bulk resend invitation emails to selected provisional users. """ if not request.user.is_px_admin(): messages.error(request, "You do not have permission to perform this action.") return redirect("/dashboard/") from .services import OnboardingService, EmailService user_ids = request.POST.getlist("user_ids") if not user_ids: messages.warning(request, "No users selected.") return redirect("accounts:provisional-user-list") success_count = 0 error_count = 0 for user_id in user_ids: try: user = User.objects.get(id=user_id, is_provisional=True) # Generate new token if expired or missing if not user.invitation_token or ( user.invitation_expires_at and user.invitation_expires_at < timezone.now() ): user.invitation_token = OnboardingService.generate_token() user.invitation_expires_at = timezone.now() + timedelta(days=7) user.save(update_fields=["invitation_token", "invitation_expires_at"]) # Resend invitation email EmailService.send_invitation_email(user, request) # Log the resend UserProvisionalLog.objects.create( user=user, event_type="invitation_resent", description="Invitation email resent via bulk operation", metadata={"resent_by": str(request.user.id)}, ) success_count += 1 except User.DoesNotExist: error_count += 1 except Exception as e: logger.error(f"Failed to resend invitation to user {user_id}: {str(e)}") error_count += 1 if success_count > 0: messages.success(request, f"Successfully resent invitations to {success_count} users.") if error_count > 0: messages.warning(request, f"Failed to resend invitations to {error_count} users.") return redirect("accounts:provisional-user-list") @login_required @require_http_methods(["POST"]) def bulk_deactivate_users(request): """ Bulk deactivate (delete) selected provisional users. """ if not request.user.is_px_admin(): messages.error(request, "You do not have permission to perform this action.") return redirect("/dashboard/") user_ids = request.POST.getlist("user_ids") if not user_ids: messages.warning(request, "No users selected.") return redirect("accounts:provisional-user-list") # Require confirmation for bulk deletion confirm = request.POST.get("confirm_deletion") if confirm != "yes": messages.warning(request, "Please confirm the deletion by checking the confirmation box.") return redirect("accounts:provisional-user-list") success_count = 0 error_count = 0 for user_id in user_ids: try: user = User.objects.get(id=user_id, is_provisional=True) email = user.email # Store for logging # Delete the user user.delete() success_count += 1 logger.info(f"User {email} (ID: {user_id}) deleted by {request.user.email}") except User.DoesNotExist: error_count += 1 except Exception as e: logger.error(f"Failed to delete user {user_id}: {str(e)}") error_count += 1 if success_count > 0: messages.success(request, f"Successfully deactivated {success_count} users.") if error_count > 0: messages.warning(request, f"Failed to deactivate {error_count} users.") return redirect("accounts:provisional-user-list") @login_required def export_acknowledgements(request): """ Export user acknowledgements to CSV for compliance/audit purposes. """ import csv from django.http import HttpResponse if not (request.user.is_px_admin() or request.user.is_hospital_admin()): messages.error(request, "You do not have permission to view this page.") return redirect("/dashboard/") # Get filter parameters format_type = request.GET.get("format", "csv") status_filter = request.GET.get("status", "all") # Build queryset acknowledgements = UserAcknowledgement.objects.select_related("user", "checklist_item").order_by("-acknowledged_at") # Filter by hospital for hospital admins if request.user.is_hospital_admin() and not request.user.is_px_admin(): acknowledgements = acknowledgements.filter(user__hospital=request.user.hospital) # Apply status filter if provided if status_filter == "completed": acknowledgements = acknowledgements.filter(is_acknowledged=True) elif status_filter == "pending": # Get users who haven't acknowledged pass # Create response response = HttpResponse(content_type="text/csv") response["Content-Disposition"] = ( f'attachment; filename="acknowledgements_{timezone.now().strftime("%Y%m%d_%H%M%S")}.csv"' ) writer = csv.writer(response) # Write header writer.writerow( [ "User ID", "Email", "First Name", "Last Name", "Employee ID", "Hospital", "Department", "Role", "Checklist Item Code", "Checklist Item Text", "Is Acknowledged", "Acknowledged At", "Signature IP", "PDF File URL", ] ) # Write data for ack in acknowledgements: user = ack.user writer.writerow( [ str(user.id), user.email, user.first_name, user.last_name, user.employee_id or "", user.hospital.name if user.hospital else "", user.department.name if user.department else "", user.groups.first().name if user.groups.exists() else "", ack.checklist_item.code, ack.checklist_item.text_en, "Yes" if ack.is_acknowledged else "No", ack.acknowledged_at.strftime("%Y-%m-%d %H:%M:%S") if ack.acknowledged_at else "", ack.signature_ip or "", request.build_absolute_uri(ack.pdf_file.url) if ack.pdf_file else "", ] ) return response @login_required def export_provisional_users(request): """ Export provisional users to CSV. """ import csv from django.http import HttpResponse if not (request.user.is_px_admin() or request.user.is_hospital_admin()): messages.error(request, "You do not have permission to view this page.") return redirect("/dashboard/") # Build queryset users = User.objects.filter(is_provisional=True).select_related("hospital", "department") # Filter by hospital for hospital admins if request.user.is_hospital_admin() and not request.user.is_px_admin(): users = users.filter(hospital=request.user.hospital) # Create response response = HttpResponse(content_type="text/csv") response["Content-Disposition"] = ( f'attachment; filename="provisional_users_{timezone.now().strftime("%Y%m%d_%H%M%S")}.csv"' ) writer = csv.writer(response) # Write header writer.writerow( [ "User ID", "Email", "First Name", "Last Name", "Employee ID", "Hospital", "Department", "Role", "Invitation Expires At", "Days Remaining", "Status", ] ) # Write data for user in users: days_remaining = None if user.invitation_expires_at: days_remaining = (user.invitation_expires_at - timezone.now()).days status = "Active" if user.invitation_expires_at and user.invitation_expires_at < timezone.now(): status = "Expired" elif user.acknowledgement_completed: status = "Completed" writer.writerow( [ str(user.id), user.email, user.first_name, user.last_name, user.employee_id or "", user.hospital.name if user.hospital else "", user.department.name if user.department else "", user.groups.first().name if user.groups.exists() else "", user.invitation_expires_at.strftime("%Y-%m-%d %H:%M:%S") if user.invitation_expires_at else "", days_remaining if days_remaining is not None else "", status, ] ) return response @login_required @require_http_methods(["GET", "POST"]) def provisional_user_list(request): """ List and manage provisional users (PX Admin only) """ if not request.user.is_px_admin(): messages.error(request, "You do not have permission to view this page.") return redirect("/dashboard/") if request.method == "POST": # Handle create provisional user from .serializers import ProvisionalUserSerializer from .services import OnboardingService, EmailService serializer = ProvisionalUserSerializer(data=request.POST) if serializer.is_valid(): user_data = serializer.validated_data.copy() roles = request.POST.getlist("roles", []) # Remove roles from user_data (not a User model field) user_data.pop("roles", None) # Create provisional user user = OnboardingService.create_provisional_user(user_data) # Assign roles for role_name in roles: try: from .models import Role role = Role.objects.get(name=role_name) user.groups.add(role.group) except Role.DoesNotExist: pass # Send invitation email EmailService.send_invitation_email(user, request) messages.success(request, f"Provisional user {user.email} created successfully.") return redirect("accounts:provisional-user-list") else: messages.error(request, "Failed to create provisional user. Please check the form.") # Get all provisional users provisional_users = User.objects.filter(is_provisional=True) # Filter by hospital based on user role # Check PX Admin first to avoid logic issues when user has multiple roles if request.user.is_px_admin() and request.tenant_hospital: from django.db.models import Q provisional_users = provisional_users.filter(Q(hospital=request.tenant_hospital) | Q(hospital__isnull=True)) elif request.user.is_hospital_admin() and request.user.hospital: provisional_users = provisional_users.filter(hospital=request.user.hospital) provisional_users = provisional_users.select_related("hospital", "department").order_by("-created_at") # Calculate statistics total_count = provisional_users.count() completed_count = provisional_users.filter(acknowledgement_completed_at__isnull=False).count() in_progress_count = total_count - completed_count # Get available roles from .models import Role roles = Role.objects.all() # Get hospitals and departments for the form from apps.organizations.models import Hospital, Department hospitals = Hospital.objects.filter(status="active").order_by("name") departments = Department.objects.filter(status="active").order_by("name") context = { "page_title": "Provisional Users", "provisional_accounts": provisional_users, "roles": roles, "hospitals": hospitals, "departments": departments, "total_count": total_count, "completed_count": completed_count, "in_progress_count": in_progress_count, "now": timezone.now(), } return render(request, "accounts/onboarding/provisional_list.html", context) @login_required def provisional_user_progress(request, user_id): """ View onboarding progress for a specific user """ if not (request.user.is_px_admin() or request.user.id == user_id): messages.error(request, "You do not have permission to view this page.") return redirect("/dashboard/") user = User.objects.get(id=user_id) # Get checklist items checklist_items = get_checklist_items_for_user(user) # Get acknowledged items acknowledged_items = UserAcknowledgement.objects.filter(user=user, is_acknowledged=True).select_related( "checklist_item" ) # Create a lookup dict: checklist_item_id -> acknowledged_at timestamp acknowledged_timestamps = {} for ack in acknowledged_items: if ack.checklist_item_id: acknowledged_timestamps[ack.checklist_item_id] = ack.acknowledged_at # Get logs from .models import UserProvisionalLog logs = UserProvisionalLog.objects.filter(user=user).order_by("-created_at") # Calculate progress total_items = checklist_items.filter(is_required=True).count() acknowledged_count = acknowledged_items.filter(checklist_item__is_required=True).count() progress_percentage = int((acknowledged_count / total_items) * 100) if total_items > 0 else 0 # Build checklist_details for template checklist_details = [] for item in checklist_items: checklist_details.append( { "name": item.text_en, "completed": item.id in acknowledged_timestamps, "completed_at": acknowledged_timestamps.get(item.id), "required": item.is_required, } ) # Build wizard content progress wizard_content = get_wizard_content_for_user(user) content_total = wizard_content.count() content_viewed = len(user.wizard_completed_steps or []) content_complete = content_total > 0 and content_viewed >= content_total progress = { "overall_percentage": progress_percentage, "checklist_completed": acknowledged_count, "checklist_total": total_items, "content_viewed": content_viewed, "content_total": content_total, "current_step": user.current_wizard_step or "-", "activation_complete": True, "activation_completed_at": user.created_at, "checklist_complete": acknowledged_count >= total_items and total_items > 0, "checklist_completed_at": None, "content_complete": content_complete, "content_completed_at": user.acknowledgement_completed_at if content_complete else None, "checklist_details": checklist_details, } if acknowledged_count >= total_items and total_items > 0: last_ack = acknowledged_items.order_by("-acknowledged_at").first() if last_ack: progress["checklist_completed_at"] = last_ack.acknowledged_at context = { "page_title": f"Onboarding Progress - {user.email}", "account": user, "user": user, "checklist_items": checklist_items, "logs": logs, "total_items": total_items, "acknowledged_count": acknowledged_count, "remaining_count": total_items - acknowledged_count, "progress_percentage": progress_percentage, "progress": progress, } return render(request, "accounts/onboarding/progress_detail.html", context) # ==================== Acknowledgement Management Views ==================== @login_required def acknowledgement_content_list(request): """ List acknowledgement content (PX Admin only) """ if not request.user.is_px_admin(): messages.error(request, "You do not have permission to view this page.") return redirect("/dashboard/") # Get all content content_list = AcknowledgementContent.objects.select_related("category").order_by("category", "order") context = { "page_title": "Acknowledgement Content", "content_list": content_list, "content_items": content_list, "category_list": AcknowledgementCategory.objects.filter(is_active=True).order_by("order", "name_en"), } return render(request, "accounts/onboarding/content_list.html", context) @login_required def acknowledgement_checklist_list(request): if not request.user.is_px_admin(): messages.error(request, "You do not have permission to view this page.") return redirect("/dashboard/") checklist_items = AcknowledgementChecklistItem.objects.select_related("content", "category").order_by( "category", "order" ) content_list = ( AcknowledgementContent.objects.filter(is_active=True).select_related("category").order_by("category", "order") ) context = { "page_title": "Acknowledgement Checklist Items", "checklist_items": checklist_items, "content_list": content_list, "category_list": AcknowledgementCategory.objects.filter(is_active=True).order_by("order", "name_en"), } return render(request, "accounts/onboarding/checklist_list.html", context) @login_required def acknowledgement_category_list(request): if not request.user.is_px_admin(): messages.error(request, "You do not have permission to view this page.") return redirect("/dashboard/") categories = AcknowledgementCategory.objects.all().order_by("order", "name_en") context = { "page_title": "Acknowledgement Categories", "categories": categories, } return render(request, "accounts/onboarding/category_list.html", context) # ==================== Helper Functions ==================== def get_wizard_content_for_user(user): return AcknowledgementContent.objects.filter(is_active=True).order_by("category", "order") def get_checklist_items_for_user(user): return AcknowledgementChecklistItem.objects.filter(is_active=True).order_by("category", "order", "code") @login_required def staff_activity_log(request): from django.contrib.contenttypes.models import ContentType from django.core.paginator import Paginator from .models import StaffActivityLog queryset = StaffActivityLog.objects.select_related("user", "content_type").order_by("-created_at") user_filter = request.user if user_filter.is_px_admin(): pass elif user_filter.is_hospital_admin() and user_filter.hospital: queryset = queryset.filter(user__hospital=user_filter.hospital) else: queryset = queryset.filter(user=request.user) user_id = request.GET.get("user") if user_id: queryset = queryset.filter(user_id=user_id) activity_type = request.GET.get("activity_type") if activity_type: queryset = queryset.filter(activity_type=activity_type) module = request.GET.get("module") if module: queryset = queryset.filter(module=module) date_from = request.GET.get("date_from") if date_from: queryset = queryset.filter(created_at__gte=date_from) date_to = request.GET.get("date_to") if date_to: queryset = queryset.filter(created_at__lte=date_to) search = request.GET.get("search") if search: queryset = queryset.filter(description__icontains=search) paginator = Paginator(queryset, 25) page_number = request.GET.get("page", 1) page_obj = paginator.get_page(page_number) modules = ( StaffActivityLog.objects.filter(user__isnull=False) .values_list("module", flat=True) .distinct() .order_by("module") ) context = { "page_obj": page_obj, "activities": page_obj.object_list, "activity_types": StaffActivityLog.ActivityType.choices, "modules": [m for m in modules if m], "filters": request.GET, } return render(request, "accounts/staff_activity_log.html", context)