"""
Accounts services - Onboarding, email notifications, and other account services
"""

import logging
import secrets
from datetime import timedelta

from django.conf import settings
from django.core.mail import send_mail
from django.template.loader import render_to_string
from django.utils import timezone

from .models import (
    AcknowledgementChecklistItem,
    AcknowledgementContent,
    UserAcknowledgement,
    UserProvisionalLog,
)


class OnboardingService:
    """Service for managing user onboarding and acknowledgements.

    Every method is a ``@staticmethod``; the class only acts as a namespace.
    """

    # Number of days a freshly issued invitation token stays valid.
    INVITATION_VALID_DAYS = 7

    # Keys of ``user_data`` that must never be copied into the audit log.
    _SENSITIVE_FIELDS = frozenset({'invitation_token', 'password'})

    @staticmethod
    def create_provisional_user(user_data):
        """
        Create a provisional user with invitation token

        The caller's ``user_data`` dict is not modified.

        Args:
            user_data: Dict with user fields (email, first_name, last_name, etc.)

        Returns:
            User instance with is_provisional=True
        """
        from django.contrib.auth import get_user_model
        User = get_user_model()

        # Work on a copy so the caller's dict is left untouched.
        fields = dict(user_data)
        fields['is_provisional'] = True
        fields['is_active'] = True  # Active but needs onboarding
        fields['invitation_token'] = OnboardingService.generate_token()
        fields['invitation_expires_at'] = timezone.now() + timedelta(
            days=OnboardingService.INVITATION_VALID_DAYS
        )

        user = User.objects.create(**fields)
        # A real password is only chosen when the wizard is completed.
        user.set_unusable_password()
        user.save(update_fields=['password'])

        # Keep secrets (token, raw password) out of the audit trail and store
        # the expiry as text.
        # NOTE(review): assumes ``metadata`` is a JSON-style field - confirm.
        audit_metadata = {
            key: value
            for key, value in fields.items()
            if key not in OnboardingService._SENSITIVE_FIELDS
        }
        audit_metadata['invitation_expires_at'] = (
            fields['invitation_expires_at'].isoformat()
        )

        # Log creation
        UserProvisionalLog.objects.create(
            user=user,
            event_type='created',
            description="Provisional user created",
            metadata=audit_metadata,
        )

        return user

    @staticmethod
    def generate_token():
        """Generate a secure, URL-safe invitation token (32 random bytes)."""
        return secrets.token_urlsafe(32)

    @staticmethod
    def validate_token(token):
        """
        Validate invitation token

        An expired token is recorded in the provisional log each time it is
        presented.

        Args:
            token: Invitation token string

        Returns:
            User instance if valid, None otherwise
        """
        # An empty/None token must never reach the ORM: ``field=None`` is an
        # IS NULL lookup and could match a user that has no token at all.
        if not token:
            return None

        from django.contrib.auth import get_user_model
        User = get_user_model()

        try:
            user = User.objects.get(invitation_token=token, is_provisional=True)
        except (User.DoesNotExist, User.MultipleObjectsReturned):
            # Unknown token, or an ambiguous one that cannot be trusted.
            return None

        # Check if expired
        expires_at = user.invitation_expires_at
        if expires_at and expires_at < timezone.now():
            # Log expiration
            UserProvisionalLog.objects.create(
                user=user,
                event_type='invitation_expired',
                description="Invitation token expired",
            )
            return None

        return user

    @staticmethod
    def _role_filter(user):
        """Build the Q filter matching rows for the user's role or for all roles.

        The role is the name of the user's first group; users without a group
        only match rows whose ``role`` is NULL.
        """
        from django.db.models import Q

        group = user.groups.first()
        role = group.name if group is not None else None
        return Q(role=role) | Q(role__isnull=True)

    @staticmethod
    def get_wizard_content(user):
        """
        Get content sections for user's wizard

        Args:
            user: User instance

        Returns:
            QuerySet of active AcknowledgementContent, sorted by ``order``
        """
        return (
            AcknowledgementContent.objects
            .filter(is_active=True)
            .filter(OnboardingService._role_filter(user))
            .order_by('order')
        )

    @staticmethod
    def get_checklist_items(user):
        """
        Get checklist items for user

        Args:
            user: User instance

        Returns:
            QuerySet of active AcknowledgementChecklistItem, sorted by ``order``
        """
        return (
            AcknowledgementChecklistItem.objects
            .filter(is_active=True)
            .filter(OnboardingService._role_filter(user))
            .order_by('order')
        )

    @staticmethod
    def get_user_acknowledgements(user):
        """
        Get all acknowledgements for a user

        Args:
            user: User instance

        Returns:
            QuerySet of UserAcknowledgement
        """
        return UserAcknowledgement.objects.filter(user=user)

    @staticmethod
    def get_user_progress_percentage(user):
        """
        Calculate user's onboarding progress percentage

        Args:
            user: User instance

        Returns:
            Float percentage (0-100); 100.0 when nothing is required
        """
        # Get all required checklist items for user
        required_items = OnboardingService.get_checklist_items(user).filter(
            is_required=True
        )

        total_count = required_items.count()
        if total_count == 0:
            return 100.0

        # Count acknowledged items
        acknowledged_count = UserAcknowledgement.objects.filter(
            user=user,
            checklist_item__in=required_items,
            is_acknowledged=True,
        ).count()

        return (acknowledged_count / total_count) * 100

    @staticmethod
    def save_wizard_step(user, step_id):
        """
        Save completed wizard step

        A step that is already recorded is left alone: nothing is saved and
        no log entry is written.

        Args:
            user: User instance
            step_id: Step identifier
        """
        # NOTE(review): assumes wizard_completed_steps is a list and that
        # current_wizard_step is comparable with step_id - confirm on the model.
        if step_id in user.wizard_completed_steps:
            return

        user.wizard_completed_steps.append(step_id)
        user.current_wizard_step = max(user.current_wizard_step, step_id)
        user.save(update_fields=['wizard_completed_steps', 'current_wizard_step'])

        # Log step completion
        UserProvisionalLog.objects.create(
            user=user,
            event_type='step_completed',
            description=f"Completed wizard step {step_id}",
            metadata={'step_id': step_id},
        )

    @staticmethod
    def acknowledge_item(user, checklist_item, signature=None, request=None):
        """
        Acknowledge a checklist item

        Creates the acknowledgement if it does not exist yet. An existing
        record that is not acknowledged is updated; one that is already
        acknowledged is returned unchanged.

        Args:
            user: User instance
            checklist_item: AcknowledgementChecklistItem instance
            signature: Optional signature data
            request: Optional request object for IP/user agent

        Returns:
            UserAcknowledgement instance
        """
        has_request = request is not None
        values = {
            'is_acknowledged': True,
            'signature': signature or '',
            'signature_ip': (
                OnboardingService._get_client_ip(request) if has_request else None
            ),
            'signature_user_agent': (
                request.META.get('HTTP_USER_AGENT', '') if has_request else ''
            ),
        }

        # Get or create acknowledgement
        acknowledgement, created = UserAcknowledgement.objects.get_or_create(
            user=user,
            checklist_item=checklist_item,
            defaults=values,
        )

        newly_acknowledged = created
        if not created and not acknowledgement.is_acknowledged:
            # A pre-existing but unacknowledged row would otherwise be handed
            # back untouched and never count towards the user's progress.
            for field_name, value in values.items():
                setattr(acknowledgement, field_name, value)
            acknowledgement.save(update_fields=list(values))
            newly_acknowledged = True

        if newly_acknowledged:
            # Log acknowledgement
            UserProvisionalLog.objects.create(
                user=user,
                event_type='step_completed',
                description=f"Acknowledged item: {checklist_item.code}",
                metadata={'checklist_item_id': str(checklist_item.id)},
            )

        return acknowledgement

    @staticmethod
    def complete_wizard(user, username, password, signature_data, request=None):
        """
        Complete wizard and activate user account

        Args:
            user: User instance
            username: Desired username
            password: Desired password
            signature_data: Final signature data (accepted for interface
                compatibility; not stored by this method)
            request: Optional request object

        Returns:
            Boolean indicating success. False when the username is taken by
            another account or a required checklist item is not acknowledged.
        """
        from django.contrib.auth import get_user_model
        User = get_user_model()

        # Check if username is available; the user's own row does not count.
        if User.objects.filter(username=username).exclude(pk=user.pk).exists():
            return False

        # Check if all required items are acknowledged
        required_items = OnboardingService.get_checklist_items(user).filter(
            is_required=True
        )
        acknowledged_items = UserAcknowledgement.objects.filter(
            user=user,
            checklist_item__in=required_items,
            is_acknowledged=True,
        )
        if acknowledged_items.count() != required_items.count():
            return False

        # Activate user
        user.is_provisional = False
        user.acknowledgement_completed = True
        user.acknowledgement_completed_at = timezone.now()
        user.username = username
        user.set_password(password)
        user.invitation_token = None
        user.invitation_expires_at = None
        user.save(update_fields=[
            'is_provisional', 'acknowledgement_completed',
            'acknowledgement_completed_at', 'username',
            # set_password() only changes the in-memory hash; without this
            # entry the chosen password would never reach the database.
            'password',
            'invitation_token', 'invitation_expires_at',
        ])

        # Log activation
        has_request = request is not None
        UserProvisionalLog.objects.create(
            user=user,
            event_type='user_activated',
            description="User account activated after completing onboarding",
            ip_address=(
                OnboardingService._get_client_ip(request) if has_request else None
            ),
            user_agent=(
                request.META.get('HTTP_USER_AGENT', '') if has_request else ''
            ),
        )

        return True

    @staticmethod
    def _get_client_ip(request):
        """Get client IP address from request.

        Uses the first entry of ``X-Forwarded-For`` when present, otherwise
        ``REMOTE_ADDR``. Returns None when ``request`` is None.
        """
        if request is None:
            return None

        # NOTE(review): X-Forwarded-For is client-supplied unless a trusted
        # proxy overwrites it, so treat the value as informational only.
        forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR') or ''
        first_hop = forwarded_for.split(',')[0].strip()
        return first_hop or request.META.get('REMOTE_ADDR')

    @staticmethod
    def get_pending_onboarding_users():
        """
        Get count of pending onboarding users

        Returns:
            Integer count of provisional users who have not completed the
            acknowledgement step
        """
        from django.contrib.auth import get_user_model
        User = get_user_model()

        return User.objects.filter(
            is_provisional=True,
            acknowledgement_completed=False,
        ).count()


class EmailService:
    """Service for sending onboarding-related emails.

    Every public method is best-effort: delivery failures are logged and
    reported through a ``False`` return value instead of an exception.
    Template rendering errors are not caught and propagate to the caller.
    """

    _logger = logging.getLogger(__name__)

    # Fallback used when ``settings.BASE_URL`` is missing or empty.
    _DEFAULT_BASE_URL = 'http://localhost:8000'

    @staticmethod
    def _base_url():
        """Return the site base URL without a trailing slash."""
        configured = (
            getattr(settings, 'BASE_URL', None) or EmailService._DEFAULT_BASE_URL
        )
        # Paths are appended with a leading "/", so avoid producing "//".
        return configured.rstrip('/')

    @staticmethod
    def _can_receive_invite(user, label):
        """Return True when the user has both an email address and a token."""
        if not user.email:
            EmailService._logger.warning(
                "Cannot send %s: user has no email address", label
            )
            return False
        if not user.invitation_token:
            # Without a token the activation link would end in "/None/".
            EmailService._logger.warning(
                "Cannot send %s: user has no invitation token", label
            )
            return False
        return True

    @staticmethod
    def _activation_context(user):
        """Build the template context shared by invitation and reminder mails."""
        activation_url = (
            f"{EmailService._base_url()}"
            f"/accounts/onboarding/activate/{user.invitation_token}/"
        )
        return {
            'user': user,
            'activation_url': activation_url,
            'expires_at': user.invitation_expires_at,
        }

    @staticmethod
    def _deliver(template_name, context, recipients, label, audit=None):
        """Render the ``accounts/onboarding/<template_name>_*`` templates and send them.

        Args:
            template_name: Template prefix ('invitation', 'reminder', 'completion')
            context: Template context dict
            recipients: List of recipient addresses
            label: Human-readable mail name used in log messages
            audit: Optional kwargs for a UserProvisionalLog entry that is
                written after a successful send

        Returns:
            Boolean indicating success
        """
        prefix = f"accounts/onboarding/{template_name}"
        subject = render_to_string(f"{prefix}_subject.txt", context).strip()
        message_html = render_to_string(f"{prefix}_email.html", context)
        message_text = render_to_string(f"{prefix}_email.txt", context)

        try:
            send_mail(
                subject=subject,
                message=message_text,
                from_email=settings.DEFAULT_FROM_EMAIL,
                recipient_list=recipients,
                html_message=message_html,
                fail_silently=False,
            )
            if audit is not None:
                UserProvisionalLog.objects.create(**audit)
        except Exception:
            # Deliberately broad: mail backends raise many unrelated error
            # types and callers only expect a boolean outcome.
            EmailService._logger.exception("Error sending %s", label)
            return False

        return True

    @staticmethod
    def send_invitation_email(user, request=None):
        """
        Send invitation email to provisional user

        Args:
            user: User instance
            request: Optional request object (currently unused; URLs are
                built from ``settings.BASE_URL``)

        Returns:
            Boolean indicating success
        """
        if not EmailService._can_receive_invite(user, "invitation email"):
            return False

        return EmailService._deliver(
            'invitation',
            EmailService._activation_context(user),
            [user.email],
            "invitation email",
            audit={
                'user': user,
                'event_type': 'invitation_sent',
                'description': "Invitation email sent",
                'metadata': {'email': user.email},
            },
        )

    @staticmethod
    def send_reminder_email(user, request=None):
        """
        Send reminder email to pending user

        Args:
            user: User instance
            request: Optional request object (currently unused)

        Returns:
            Boolean indicating success
        """
        if not EmailService._can_receive_invite(user, "reminder email"):
            return False

        return EmailService._deliver(
            'reminder',
            EmailService._activation_context(user),
            [user.email],
            "reminder email",
            audit={
                'user': user,
                'event_type': 'invitation_resent',
                'description': "Reminder email sent",
                'metadata': {'email': user.email},
            },
        )

    @staticmethod
    def send_completion_notification(user, admin_users, request=None):
        """
        Send notification to admins about user completion

        Args:
            user: User instance who completed onboarding
            admin_users: QuerySet of admin users to notify
            request: Optional request object (currently unused)

        Returns:
            Boolean indicating success; False when no admin has an email
            address
        """
        # Get admin email list first so nothing is rendered when there is
        # nobody to notify.
        admin_emails = [admin.email for admin in admin_users if admin.email]
        if not admin_emails:
            return False

        user_detail_url = (
            f"{EmailService._base_url()}/accounts/management/progress/{user.id}/"
        )
        context = {
            'user': user,
            'user_detail_url': user_detail_url,
        }

        return EmailService._deliver(
            'completion',
            context,
            admin_emails,
            "completion notification",
        )