Marwan Alwali ab2c4a36c5 update
2025-10-02 10:13:03 +03:00

446 lines
16 KiB
Python

"""
Accounts API views.
"""
from rest_framework import viewsets, filters, status
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated, AllowAny
from django_filters.rest_framework import DjangoFilterBackend
from django.db.models import Q, Count
from django.utils import timezone
from django.contrib.auth import login, logout
from ..models import User, TwoFactorDevice, SocialAccount, UserSession, PasswordHistory
from .serializers import (
UserSerializer, UserProfileSerializer, TwoFactorDeviceSerializer,
SocialAccountSerializer, UserSessionSerializer, PasswordHistorySerializer,
LoginSerializer, PasswordChangeSerializer, PasswordResetSerializer
)
from core.utils import AuditLogger
class UserViewSet(viewsets.ModelViewSet):
"""
User API viewset.
"""
serializer_class = UserSerializer
permission_classes = [IsAuthenticated]
filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
filterset_fields = [
'role', 'department', 'is_active', 'is_verified', 'is_approved',
'two_factor_enabled'
]
search_fields = [
'username', 'email', 'first_name', 'last_name',
'employee_id', 'license_number', 'npi_number'
]
ordering_fields = ['username', 'email', 'last_name', 'created_at', 'last_login']
ordering = ['last_name', 'first_name']
def get_queryset(self):
if hasattr(self.request, 'tenant') and self.request.tenant:
return User.objects.filter(tenant=self.request.tenant)
return User.objects.none()
def perform_create(self, serializer):
user = serializer.save(tenant=self.request.user.tenant)
# Log user creation
AuditLogger.log_event(
tenant=self.request.user.tenant,
event_type='CREATE',
event_category='DATA_MODIFICATION',
action='Create User',
description=f'Created user: {user.username}',
user=self.request.user,
content_object=user,
request=self.request
)
def perform_update(self, serializer):
user = serializer.save()
# Log user update
AuditLogger.log_event(
tenant=self.request.user.tenant,
event_type='UPDATE',
event_category='DATA_MODIFICATION',
action='Update User',
description=f'Updated user: {user.username}',
user=self.request.user,
content_object=user,
request=self.request
)
@action(detail=False, methods=['get'])
def me(self, request):
"""
Get current user profile.
"""
serializer = UserProfileSerializer(request.user)
return Response(serializer.data)
@action(detail=False, methods=['put', 'patch'])
def update_profile(self, request):
"""
Update current user profile.
"""
serializer = UserProfileSerializer(
request.user,
data=request.data,
partial=request.method == 'PATCH'
)
if serializer.is_valid():
serializer.save()
# Log profile update
AuditLogger.log_event(
tenant=getattr(request, 'tenant', None),
event_type='UPDATE',
event_category='DATA_MODIFICATION',
action='Update Profile',
description=f'User updated their profile: {request.user.username}',
user=request.user,
content_object=request.user,
request=request
)
return Response(serializer.data)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@action(detail=False, methods=['post'])
def change_password(self, request):
"""
Change user password.
"""
serializer = PasswordChangeSerializer(
data=request.data,
context={'request': request}
)
if serializer.is_valid():
serializer.save()
# Log password change
AuditLogger.log_event(
tenant=getattr(request, 'tenant', None),
event_type='UPDATE',
event_category='SECURITY',
action='Change Password',
description=f'User changed their password: {request.user.username}',
user=request.user,
content_object=request.user,
request=request
)
return Response({'message': 'Password changed successfully'})
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@action(detail=True, methods=['post'])
def lock_account(self, request, pk=None):
"""
Lock user account.
"""
user = self.get_object()
duration = request.data.get('duration_minutes', 15)
user.lock_account(duration)
# Log account lock
AuditLogger.log_event(
tenant=getattr(request, 'tenant', None),
event_type='UPDATE',
event_category='SECURITY',
action='Lock User Account',
description=f'Locked user account: {user.username}',
user=request.user,
content_object=user,
additional_data={'duration_minutes': duration},
request=request
)
return Response({'message': 'Account locked successfully'})
@action(detail=True, methods=['post'])
def unlock_account(self, request, pk=None):
"""
Unlock user account.
"""
user = self.get_object()
user.unlock_account()
# Log account unlock
AuditLogger.log_event(
tenant=getattr(request, 'tenant', None),
event_type='UPDATE',
event_category='SECURITY',
action='Unlock User Account',
description=f'Unlocked user account: {user.username}',
user=request.user,
content_object=user,
request=request
)
return Response({'message': 'Account unlocked successfully'})
@action(detail=False, methods=['get'])
def statistics(self, request):
"""
Get user statistics.
"""
queryset = self.get_queryset()
stats = {
'total_users': queryset.count(),
'active_users': queryset.filter(is_active=True).count(),
'pending_approval': queryset.filter(is_approved=False).count(),
'locked_accounts': queryset.filter(locked_until__gt=timezone.now()).count(),
'two_factor_enabled': queryset.filter(two_factor_enabled=True).count(),
'users_by_role': list(
queryset.values('role').annotate(count=Count('id')).order_by('-count')
),
'users_by_department': list(
queryset.exclude(department__isnull=True).exclude(department='')
.values('department').annotate(count=Count('id')).order_by('-count')[:10]
),
}
return Response(stats)
class TwoFactorDeviceViewSet(viewsets.ModelViewSet):
"""
Two-factor device API viewset.
"""
serializer_class = TwoFactorDeviceSerializer
permission_classes = [IsAuthenticated]
filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
filterset_fields = ['device_type', 'is_active', 'is_verified']
search_fields = ['name']
ordering_fields = ['name', 'created_at', 'last_used_at']
ordering = ['-created_at']
def get_queryset(self):
# Users can only see their own devices
return TwoFactorDevice.objects.filter(user=self.request.user)
def perform_create(self, serializer):
device = serializer.save(user=self.request.user)
# Log device creation
AuditLogger.log_event(
tenant=self.request.user.tenant,
event_type='CREATE',
event_category='SECURITY',
action='Create Two-Factor Device',
description=f'Created two-factor device: {device.name}',
user=self.request.user,
content_object=device,
request=self.request
)
def perform_destroy(self, instance):
device_name = instance.name
instance.delete()
# Log device deletion
AuditLogger.log_event(
tenant=self.request.user.tenant,
event_type='DELETE',
event_category='SECURITY',
action='Delete Two-Factor Device',
description=f'Deleted two-factor device: {device_name}',
user=self.request.user,
additional_data={'device_name': device_name},
request=self.request
)
class SocialAccountViewSet(viewsets.ReadOnlyModelViewSet):
"""
Social account API viewset.
"""
serializer_class = SocialAccountSerializer
permission_classes = [IsAuthenticated]
filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
filterset_fields = ['provider', 'is_active']
search_fields = ['provider_email', 'display_name']
ordering_fields = ['provider', 'created_at', 'last_login_at']
ordering = ['-created_at']
def get_queryset(self):
# Users can only see their own social accounts
return SocialAccount.objects.filter(user=self.request.user)
class UserSessionViewSet(viewsets.ReadOnlyModelViewSet):
"""
User session API viewset.
"""
serializer_class = UserSessionSerializer
permission_classes = [IsAuthenticated]
filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
filterset_fields = ['device_type', 'is_active', 'login_method', 'country']
search_fields = ['ip_address', 'browser', 'operating_system']
ordering_fields = ['created_at', 'last_activity_at', 'expires_at']
ordering = ['-created_at']
def get_queryset(self):
if self.request.user.is_staff:
# Staff can see all sessions in their tenant
if hasattr(self.request, 'tenant') and self.request.tenant:
return UserSession.objects.filter(user__tenant=self.request.tenant)
else:
# Regular users can only see their own sessions
return UserSession.objects.filter(user=self.request.user)
return UserSession.objects.none()
@action(detail=True, methods=['post'])
def end_session(self, request, pk=None):
"""
End a user session.
"""
session = self.get_object()
# Check if user can end this session
if not request.user.is_staff and session.user != request.user:
return Response(
{'error': 'You can only end your own sessions'},
status=status.HTTP_403_FORBIDDEN
)
session.end_session()
# Log session termination
AuditLogger.log_event(
tenant=getattr(request, 'tenant', None),
event_type='UPDATE',
event_category='AUTHENTICATION',
action='End User Session',
description=f'Ended session for user: {session.user.username}',
user=request.user,
content_object=session,
additional_data={
'target_user': session.user.username,
'session_ip': session.ip_address,
},
request=request
)
return Response({'message': 'Session ended successfully'})
@action(detail=False, methods=['get'])
def active(self, request):
"""
Get active sessions.
"""
queryset = self.get_queryset().filter(is_active=True)
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
class AuthViewSet(viewsets.ViewSet):
"""
Authentication API viewset.
"""
@action(detail=False, methods=['post'], permission_classes=[AllowAny])
def login(self, request):
"""
User login.
"""
serializer = LoginSerializer(data=request.data)
if serializer.is_valid():
user = serializer.validated_data['user']
login(request, user)
# Log successful login
AuditLogger.log_event(
tenant=getattr(request, 'tenant', None),
event_type='LOGIN',
event_category='AUTHENTICATION',
action='User Login',
description=f'User logged in: {user.username}',
user=user,
content_object=user,
request=request
)
# Reset failed login attempts
user.reset_failed_login()
return Response({
'message': 'Login successful',
'user': UserSerializer(user).data
})
else:
# Log failed login attempt
username = request.data.get('username')
if username:
try:
user = User.objects.get(username=username)
user.increment_failed_login()
AuditLogger.log_event(
tenant=getattr(request, 'tenant', None),
event_type='LOGIN',
event_category='SECURITY',
action='Failed Login Attempt',
description=f'Failed login attempt for user: {username}',
user=None,
is_successful=False,
additional_data={'username': username},
request=request
)
except User.DoesNotExist:
pass
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@action(detail=False, methods=['post'])
def logout(self, request):
"""
User logout.
"""
user = request.user
logout(request)
# Log logout
AuditLogger.log_event(
tenant=getattr(request, 'tenant', None),
event_type='LOGOUT',
event_category='AUTHENTICATION',
action='User Logout',
description=f'User logged out: {user.username}',
user=user,
content_object=user,
request=request
)
return Response({'message': 'Logout successful'})
@action(detail=False, methods=['post'], permission_classes=[AllowAny])
def password_reset(self, request):
"""
Password reset request.
"""
serializer = PasswordResetSerializer(data=request.data)
if serializer.is_valid():
email = serializer.validated_data['email']
# Log password reset request
AuditLogger.log_event(
tenant=getattr(request, 'tenant', None),
event_type='UPDATE',
event_category='SECURITY',
action='Password Reset Request',
description=f'Password reset requested for email: {email}',
user=None,
additional_data={'email': email},
request=request
)
# In a real implementation, send password reset email here
return Response({'message': 'Password reset email sent'})
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)