Pre-production security fixes to prevent cross-hospital data leaks: - Standards API: add get_queryset() filtering by department__hospital - Reports service: add user param with hospital filtering to all querysets - RCA views: replace is_superuser with tenant_hospital pattern, add access checks to all 11 mutation views - Notifications views: replace is_superuser patterns with _get_notification_hospital helper across all 5 settings functions - Appreciation API: add tenant_hospital fallback to AppreciationViewSet, AppreciationStatsViewSet, and LeaderboardView - AI Analytics: add tenant_hospital fallback in ExecutiveSummaryGenerator and ActionRecommendationEngine - SourceUserRestrictionMiddleware: remove None from ALLOWED_URL_NAMES - Complaint export: fix nullable patient/due_at/description crashes in CSV and Excel export, fix invalid get_category_display/get_source_display calls E2E test updates: - Update isolation gap tests to actively assert hospital filtering - Fix CSV export test to use API context for download handling - Switch clinical-staff tests to serial mode to prevent race conditions
140 lines
4.2 KiB
Python
140 lines
4.2 KiB
Python
"""
|
|
Middleware for PX Source User access restriction.
|
|
|
|
Provides global route-level protection to ensure source users
|
|
can only access their designated pages.
|
|
"""
|
|
|
|
from django.urls import resolve
|
|
from django.shortcuts import redirect
|
|
from django.contrib import messages
|
|
from django.utils.deprecation import MiddlewareMixin
|
|
|
|
|
|
class SourceUserRestrictionMiddleware(MiddlewareMixin):
|
|
"""
|
|
STRICT middleware that restricts source users to ONLY:
|
|
1. /px-sources/* pages (their dashboard, complaints, inquiries)
|
|
2. Password change page
|
|
3. Logout
|
|
|
|
ALL other routes are BLOCKED.
|
|
"""
|
|
|
|
# URL path prefixes that source users CAN access (whitelist)
|
|
ALLOWED_PATH_PREFIXES = [
|
|
"/px-sources/", # Source user portal
|
|
]
|
|
|
|
# Specific URL names that source users CAN access
|
|
ALLOWED_URL_NAMES = {
|
|
# Password change
|
|
"accounts:password_change",
|
|
"accounts:password_change_done",
|
|
# Settings (limited)
|
|
"accounts:settings",
|
|
# Logout
|
|
"accounts:logout",
|
|
# Login (for redirect after logout)
|
|
"accounts:login",
|
|
}
|
|
|
|
# Explicitly blocked paths (even if they match allowed prefixes)
|
|
BLOCKED_PATHS = [
|
|
"/px-sources/new/",
|
|
"/px-sources/create/",
|
|
"/px-sources/<uuid:pk>/edit/",
|
|
"/px-sources/<uuid:pk>/delete/",
|
|
"/px-sources/<uuid:pk>/toggle/",
|
|
"/px-sources/ajax/",
|
|
"/px-sources/api/",
|
|
]
|
|
|
|
def process_request(self, request):
|
|
# Skip for unauthenticated users
|
|
if not request.user.is_authenticated:
|
|
return None
|
|
|
|
# Skip for superusers
|
|
if request.user.is_superuser:
|
|
return None
|
|
|
|
# Check if user is a source user
|
|
if not self._is_source_user(request.user):
|
|
return None
|
|
|
|
# Source user detected - apply strict restrictions
|
|
path = request.path
|
|
|
|
# Get current route name
|
|
try:
|
|
resolver = resolve(path)
|
|
route_name = f"{resolver.namespace}:{resolver.url_name}" if resolver.namespace else resolver.url_name
|
|
except:
|
|
route_name = None
|
|
|
|
# Check if URL name is explicitly allowed
|
|
if route_name in self.ALLOWED_URL_NAMES:
|
|
return None
|
|
|
|
# Check if path starts with allowed prefixes
|
|
for prefix in self.ALLOWED_PATH_PREFIXES:
|
|
if path.startswith(prefix):
|
|
# Check if it's a blocked sub-path
|
|
for blocked in self.BLOCKED_PATHS:
|
|
if blocked in path:
|
|
return self._block_access(request)
|
|
# Path is allowed
|
|
return None
|
|
|
|
# Check for static/media files (allow these)
|
|
if path.startswith("/static/") or path.startswith("/media/"):
|
|
return None
|
|
|
|
# Check for i18n URLs
|
|
if path.startswith("/i18n/"):
|
|
return None
|
|
|
|
# Everything else is BLOCKED for source users
|
|
return self._block_access(request)
|
|
|
|
def _is_source_user(self, user):
|
|
"""Check if user is a source user via Django Group membership."""
|
|
return user.is_source_user()
|
|
|
|
def _block_access(self, request):
|
|
"""Block access and redirect to source user dashboard."""
|
|
return redirect("px_sources:source_user_dashboard")
|
|
|
|
|
|
class SourceUserSessionMiddleware(MiddlewareMixin):
|
|
"""
|
|
Middleware to set shorter session timeout for source users.
|
|
|
|
Source users have limited access, so their sessions expire faster
|
|
for security purposes.
|
|
"""
|
|
|
|
SOURCE_USER_SESSION_TIMEOUT = 3600 # 1 hour
|
|
NORMAL_SESSION_TIMEOUT = 1209600 # 2 weeks
|
|
|
|
def process_request(self, request):
|
|
if not request.user.is_authenticated:
|
|
return None
|
|
|
|
if self._is_source_user(request.user):
|
|
# Set shorter session for source users
|
|
request.session.set_expiry(self.SOURCE_USER_SESSION_TIMEOUT)
|
|
else:
|
|
# Normal session for other users
|
|
request.session.set_expiry(self.NORMAL_SESSION_TIMEOUT)
|
|
|
|
return None
|
|
|
|
def _is_source_user(self, user):
|
|
"""Check if user is an active source user."""
|
|
if not hasattr(user, "is_source_user"):
|
|
return False
|
|
|
|
return user.is_source_user()
|