HH/apps/organizations/views.py
2026-01-24 15:27:27 +03:00

593 lines
21 KiB
Python

"""
Organizations views and viewsets
"""
from django.db import models
from rest_framework import status, viewsets
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from apps.accounts.permissions import (
CanAccessDepartmentData,
CanAccessHospitalData,
IsPXAdminOrHospitalAdmin,
IsPXAdmin
)
from .models import Department, Hospital, Organization, Patient, Staff
from .models import Staff as StaffModel
from .serializers import (
DepartmentSerializer,
HospitalSerializer,
OrganizationSerializer,
PatientListSerializer,
PatientSerializer,
StaffSerializer,
)
class OrganizationViewSet(viewsets.ModelViewSet):
"""
ViewSet for Organization model.
Permissions:
- PX Admins can manage organizations
- Others can view organizations
"""
queryset = Organization.objects.all()
serializer_class = OrganizationSerializer
permission_classes = [IsAuthenticated]
filterset_fields = ['status', 'city']
search_fields = ['name', 'name_ar', 'code', 'license_number']
ordering_fields = ['name', 'created_at']
ordering = ['name']
def get_queryset(self):
"""Filter organizations based on user role"""
queryset = super().get_queryset().prefetch_related('hospitals')
user = self.request.user
# PX Admins see all organizations
if user.is_px_admin():
return queryset
# Hospital Admins and others see their organization
if user.is_hospital_admin() and user.hospital and user.hospital.organization:
return queryset.filter(id=user.hospital.organization.id)
# Others with hospital see their organization
if user.hospital and user.hospital.organization:
return queryset.filter(id=user.hospital.organization.id)
return queryset.none()
class HospitalViewSet(viewsets.ModelViewSet):
"""
ViewSet for Hospital model.
Permissions:
- PX Admins and Hospital Admins can manage hospitals
- Others can view hospitals they belong to
"""
queryset = Hospital.objects.all()
serializer_class = HospitalSerializer
permission_classes = [IsAuthenticated, CanAccessHospitalData]
filterset_fields = ['status', 'city', 'organization']
search_fields = ['name', 'name_ar', 'code', 'city']
ordering_fields = ['name', 'created_at']
ordering = ['name']
def get_queryset(self):
"""Filter hospitals based on user role"""
queryset = super().get_queryset()
user = self.request.user
# PX Admins see all hospitals
if user.is_px_admin():
return queryset
# Hospital Admins see their hospital
if user.is_hospital_admin() and user.hospital:
return queryset.filter(id=user.hospital.id)
# Department Managers see their hospital
if user.is_department_manager() and user.hospital:
return queryset.filter(id=user.hospital.id)
# Others see hospitals they're associated with
if user.hospital:
return queryset.filter(id=user.hospital.id)
return queryset.none()
class DepartmentViewSet(viewsets.ModelViewSet):
"""
ViewSet for Department model.
Permissions:
- PX Admins and Hospital Admins can manage departments
- Department Managers can view their department
"""
queryset = Department.objects.all()
serializer_class = DepartmentSerializer
permission_classes = [IsAuthenticated, CanAccessDepartmentData]
filterset_fields = ['status', 'hospital', 'parent', 'hospital__organization']
search_fields = ['name', 'name_ar', 'code']
ordering_fields = ['name', 'created_at']
ordering = ['hospital', 'name']
def get_queryset(self):
"""Filter departments based on user role"""
queryset = super().get_queryset().select_related('hospital', 'parent', 'manager')
user = self.request.user
# PX Admins see all departments
if user.is_px_admin():
return queryset
# Hospital Admins see departments in their hospital
if user.is_hospital_admin() and user.hospital:
return queryset.filter(hospital=user.hospital)
# Department Managers see their department and sub-departments
if user.is_department_manager() and user.department:
return queryset.filter(
hospital=user.hospital
).filter(
models.Q(id=user.department.id) | models.Q(parent=user.department)
)
# Others see departments in their hospital
if user.hospital:
return queryset.filter(hospital=user.hospital)
return queryset.none()
class StaffViewSet(viewsets.ModelViewSet):
"""
ViewSet for Staff model.
Permissions:
- PX Admins and Hospital Admins can manage staff
- Others can view staff
"""
queryset = StaffModel.objects.all()
serializer_class = StaffSerializer
permission_classes = [IsAuthenticated]
filterset_fields = ['status', 'hospital', 'department', 'staff_type', 'specialization', 'job_title', 'hospital__organization']
search_fields = ['first_name', 'last_name', 'first_name_ar', 'last_name_ar', 'employee_id', 'license_number', 'job_title']
ordering_fields = ['last_name', 'created_at']
ordering = ['last_name', 'first_name']
def get_permissions(self):
"""Set permissions based on action"""
if self.action in ['create_user_account', 'link_user', 'unlink_user', 'send_invitation']:
return [IsAuthenticated()]
return super().get_permissions()
def get_queryset(self):
"""Filter staff based on user role"""
queryset = super().get_queryset().select_related('hospital', 'department', 'user')
user = self.request.user
# PX Admins see all staff
if user.is_px_admin():
return queryset
# Hospital Admins see staff in their hospital
if user.is_hospital_admin() and user.hospital:
return queryset.filter(hospital=user.hospital)
# Department Managers see staff in their department
if user.is_department_manager() and user.department:
return queryset.filter(department=user.department)
# Others see staff in their hospital
if user.hospital:
return queryset.filter(hospital=user.hospital)
return queryset.none()
@action(detail=True, methods=['post'])
def create_user_account(self, request, pk=None):
"""
Create a user account for a staff member.
Auto-generates username, password, and sends email.
"""
staff = self.get_object()
if staff.user:
return Response(
{'error': 'Staff member already has a user account'},
status=status.HTTP_400_BAD_REQUEST
)
# Check permissions
user = request.user
if not user.is_px_admin() and not user.is_hospital_admin():
return Response(
{'error': 'You do not have permission to create user accounts'},
status=status.HTTP_403_FORBIDDEN
)
# Hospital Admins can only create accounts for staff in their hospital
if user.is_hospital_admin() and staff.hospital != user.hospital:
return Response(
{'error': 'You can only create accounts for staff in your hospital'},
status=status.HTTP_403_FORBIDDEN
)
# Get role from request or use default based on staff_type
from .services import StaffService
role = request.data.get('role', StaffService.get_staff_type_role(staff.staff_type))
try:
user_account, was_created = StaffService.create_user_for_staff(
staff,
role=role,
request=request
)
if was_created:
# Generate password for email (only for new users)
password = StaffService.generate_password()
user_account.set_password(password)
user_account.save()
# Send email with credentials
try:
StaffService.send_credentials_email(staff, password, request)
message = 'User account created and credentials emailed successfully'
except Exception as e:
message = f'User account created. Email sending failed: {str(e)}'
else:
# Existing user was linked - no password to generate or email to send
message = 'Existing user account linked successfully. The staff member can now log in with their existing credentials.'
serializer = self.get_serializer(staff)
return Response({
'message': message,
'staff': serializer.data,
'email': user_account.email,
'was_created': was_created
}, status=status.HTTP_200_OK if not was_created else status.HTTP_201_CREATED)
except ValueError as e:
return Response(
{'error': str(e)},
status=status.HTTP_400_BAD_REQUEST
)
@action(detail=True, methods=['post'])
def link_user(self, request, pk=None):
"""
Link an existing user account to a staff member.
"""
staff = self.get_object()
if staff.user:
return Response(
{'error': 'Staff member already has a user account'},
status=status.HTTP_400_BAD_REQUEST
)
# Check permissions
user = request.user
if not user.is_px_admin() and not user.is_hospital_admin():
return Response(
{'error': 'You do not have permission to link user accounts'},
status=status.HTTP_403_FORBIDDEN
)
# Hospital Admins can only link accounts for staff in their hospital
if user.is_hospital_admin() and staff.hospital != user.hospital:
return Response(
{'error': 'You can only link accounts for staff in your hospital'},
status=status.HTTP_403_FORBIDDEN
)
user_id = request.data.get('user_id')
if not user_id:
return Response(
{'error': 'user_id is required'},
status=status.HTTP_400_BAD_REQUEST
)
from .services import StaffService
try:
StaffService.link_user_to_staff(staff, user_id, request=request)
serializer = self.get_serializer(staff)
return Response({
'message': 'User account linked successfully',
'staff': serializer.data
})
except ValueError as e:
return Response(
{'error': str(e)},
status=status.HTTP_400_BAD_REQUEST
)
@action(detail=True, methods=['post'])
def unlink_user(self, request, pk=None):
"""
Remove user account association from a staff member.
"""
staff = self.get_object()
if not staff.user:
return Response(
{'error': 'Staff member has no user account'},
status=status.HTTP_400_BAD_REQUEST
)
# Check permissions
user = request.user
if not user.is_px_admin() and not user.is_hospital_admin():
return Response(
{'error': 'You do not have permission to unlink user accounts'},
status=status.HTTP_403_FORBIDDEN
)
# Hospital Admins can only unlink accounts for staff in their hospital
if user.is_hospital_admin() and staff.hospital != user.hospital:
return Response(
{'error': 'You can only unlink accounts for staff in your hospital'},
status=status.HTTP_403_FORBIDDEN
)
from .services import StaffService
try:
StaffService.unlink_user_from_staff(staff, request=request)
serializer = self.get_serializer(staff)
return Response({
'message': 'User account unlinked successfully',
'staff': serializer.data
})
except ValueError as e:
return Response(
{'error': str(e)},
status=status.HTTP_400_BAD_REQUEST
)
@action(detail=True, methods=['post'])
def send_invitation(self, request, pk=None):
"""
Send credentials email to staff member.
Generates new password and emails it.
"""
staff = self.get_object()
if not staff.user:
return Response(
{'error': 'Staff member has no user account'},
status=status.HTTP_400_BAD_REQUEST
)
# Check permissions
user = request.user
if not user.is_px_admin() and not user.is_hospital_admin():
return Response(
{'error': 'You do not have permission to send invitations'},
status=status.HTTP_403_FORBIDDEN
)
# Hospital Admins can only send invitations to staff in their hospital
if user.is_hospital_admin() and staff.hospital != user.hospital:
return Response(
{'error': 'You can only send invitations to staff in your hospital'},
status=status.HTTP_403_FORBIDDEN
)
from .services import StaffService
try:
# Generate new password
password = StaffService.generate_password()
# Update user password
staff.user.set_password(password)
staff.user.save()
# Send email
StaffService.send_credentials_email(staff, password, request)
serializer = self.get_serializer(staff)
return Response({
'message': 'Invitation email sent successfully',
'staff': serializer.data
})
except ValueError as e:
return Response(
{'error': str(e)},
status=status.HTTP_400_BAD_REQUEST
)
@action(detail=False, methods=['get'])
def hierarchy(self, request):
"""
Get staff hierarchy as D3-compatible JSON.
Used for interactive tree visualization.
Note: This action uses a more permissive queryset to allow all authenticated
users to view the organization hierarchy for visualization purposes.
"""
from django.db.models import Q
# Get filter parameters
hospital_id = request.query_params.get('hospital')
department_id = request.query_params.get('department')
search = request.query_params.get('search', '').strip()
# Build base queryset - use all staff for hierarchy visualization
# This allows any authenticated user to see the full organizational structure
queryset = StaffModel.objects.all().select_related('report_to', 'hospital', 'department')
# Apply filters
if hospital_id:
queryset = queryset.filter(hospital_id=hospital_id)
if department_id:
queryset = queryset.filter(department_id=department_id)
if search:
queryset = queryset.filter(
Q(first_name__icontains=search) |
Q(last_name__icontains=search) |
Q(employee_id__icontains=search)
)
# Get all staff with their managers
staff_list = queryset.select_related('report_to', 'hospital', 'department')
# Build staff lookup dictionary
staff_dict = {staff.id: staff for staff in staff_list}
# Build hierarchy tree
def build_node(staff):
"""Recursively build hierarchy node for D3"""
node = {
'id': staff.id,
'name': staff.get_full_name(),
'employee_id': staff.employee_id,
'job_title': staff.job_title or '',
'hospital': staff.hospital.name if staff.hospital else '',
'department': staff.department.name if staff.department else '',
'status': staff.status,
'staff_type': staff.staff_type,
'team_size': 0, # Will be calculated
'children': []
}
# Find direct reports
direct_reports = [
s for s in staff_list
if s.report_to_id == staff.id
]
# Recursively build children
for report in direct_reports:
child_node = build_node(report)
node['children'].append(child_node)
node['team_size'] += 1 + child_node['team_size']
return node
# Group root nodes by organization
from collections import defaultdict
org_groups = defaultdict(list)
# Find root nodes (staff with no manager in the filtered set)
root_staff = [
staff for staff in staff_list
if staff.report_to_id is None or staff.report_to_id not in staff_dict
]
# Group root staff by organization
for staff in root_staff:
if staff.hospital and staff.hospital.organization:
org_name = staff.hospital.organization.name
else:
org_name = 'Organization'
org_groups[org_name].append(staff)
# Build hierarchy for each organization
hierarchy = []
top_managers = 0
for org_name, org_root_staff in org_groups.items():
# Build hierarchy nodes for this organization's root staff
org_root_nodes = [build_node(staff) for staff in org_root_staff]
# Calculate total team size for this organization
org_team_size = sum(node['team_size'] + 1 for node in org_root_nodes)
# Create organization node as parent
org_node = {
'id': None,
'name': org_name,
'employee_id': '',
'job_title': 'Organization',
'hospital': '',
'department': '',
'status': 'active',
'staff_type': 'organization',
'team_size': org_team_size,
'children': org_root_nodes,
'is_organization_root': True
}
hierarchy.append(org_node)
top_managers += len(org_root_nodes)
# If there are multiple organizations, wrap them in a single root
if len(hierarchy) > 1:
total_team_size = sum(node['team_size'] for node in hierarchy)
hierarchy = [{
'id': None,
'name': 'All Organizations',
'employee_id': '',
'job_title': '',
'hospital': '',
'department': '',
'status': 'active',
'staff_type': 'root',
'team_size': total_team_size,
'children': hierarchy,
'is_virtual_root': True
}]
# Calculate statistics
total_staff = len(staff_list)
return Response({
'hierarchy': hierarchy,
'statistics': {
'total_staff': total_staff,
'top_managers': top_managers
}
})
class PatientViewSet(viewsets.ModelViewSet):
"""
ViewSet for Patient model.
Permissions:
- All authenticated users can view patients
- PX Admins and Hospital Admins can manage patients
"""
queryset = Patient.objects.all()
permission_classes = [IsAuthenticated]
filterset_fields = ['status', 'gender', 'primary_hospital', 'city', 'primary_hospital__organization']
search_fields = ['mrn', 'national_id', 'first_name', 'last_name', 'phone', 'email']
ordering_fields = ['last_name', 'created_at']
ordering = ['last_name', 'first_name']
def get_serializer_class(self):
"""Use simplified serializer for list view"""
if self.action == 'list':
return PatientListSerializer
return PatientSerializer
def get_queryset(self):
"""Filter patients based on user role"""
queryset = super().get_queryset().select_related('primary_hospital')
user = self.request.user
# PX Admins see all patients
if user.is_px_admin():
return queryset
# Hospital Admins see patients in their hospital
if user.is_hospital_admin() and user.hospital:
return queryset.filter(primary_hospital=user.hospital)
# Others see patients in their hospital
if user.hospital:
return queryset.filter(primary_hospital=user.hospital)
return queryset