HH/apps/reports/views.py
ismail 23d439f5a5 fix: harden multi-tenant data isolation across 8 modules
Pre-production security fixes to prevent cross-hospital data leaks:

- Standards API: add get_queryset() filtering by department__hospital
- Reports service: add user param with hospital filtering to all querysets
- RCA views: replace is_superuser with tenant_hospital pattern, add access
  checks to all 11 mutation views
- Notifications views: replace is_superuser patterns with _get_notification_hospital
  helper across all 5 settings functions
- Appreciation API: add tenant_hospital fallback to AppreciationViewSet,
  AppreciationStatsViewSet, and LeaderboardView
- AI Analytics: add tenant_hospital fallback in ExecutiveSummaryGenerator and
  ActionRecommendationEngine
- SourceUserRestrictionMiddleware: remove None from ALLOWED_URL_NAMES
- Complaint export: fix nullable patient/due_at/description crashes in CSV
  and Excel export, fix invalid get_category_display/get_source_display calls

E2E test updates:
- Update isolation gap tests to actively assert hospital filtering
- Fix CSV export test to use API context for download handling
- Switch clinical-staff tests to serial mode to prevent race conditions
2026-04-07 01:23:10 +03:00

441 lines
16 KiB
Python

"""
Report Builder UI Views - Simplified Version
Handles the visual report builder interface, saved reports,
and exports. No chart functionality.
"""
import json
from django.contrib.auth.decorators import login_required
from django.contrib import messages
from django.http import JsonResponse, HttpResponse
from django.shortcuts import render, redirect, get_object_or_404
from django.views.decorators.http import require_POST, require_GET
from django.views.decorators.csrf import ensure_csrf_cookie
from django.utils import timezone
from django.core.paginator import Paginator
from apps.organizations.models import Department, Hospital
from .models import SavedReport, GeneratedReport, ReportTemplate, DataSource, ReportFormat
from .services import ReportBuilderService, ReportExportService
@login_required
@ensure_csrf_cookie
def report_builder(request):
"""
Visual report builder interface.
Allows creating custom reports with:
- Data source selection
- Dynamic filters
- Column selection
- Chart configuration
"""
user = request.user
# Get hospitals for filter
hospital = getattr(request, "tenant_hospital", None) or user.hospital
hospitals = Hospital.objects.filter(status="active")
if not user.is_px_admin() and hospital:
hospitals = hospitals.filter(id=hospital.id)
# Get saved reports
saved_reports = SavedReport.objects.filter(created_by=user).order_by("-created_at")[:10]
context = {
"hospitals": hospitals,
"saved_reports": saved_reports,
"data_sources": DataSource.choices,
}
return render(request, "reports/report_builder.html", context)
@login_required
def report_preview_api(request):
"""
API endpoint to preview report data.
Returns JSON with:
- Report data rows
- Summary statistics
- Chart data
"""
if request.method != "POST":
return JsonResponse({"error": "POST required"}, status=405)
try:
data = json.loads(request.body)
except json.JSONDecodeError:
return JsonResponse({"error": "Invalid JSON"}, status=400)
data_source = data.get("data_source", "complaints")
filter_config = data.get("filter_config", {})
column_config = data.get("column_config", [])
grouping_config = data.get("grouping_config", {})
chart_config = data.get("chart_config", {})
sort_config = data.get("sort_config", [])
# Apply user's hospital restriction
user = request.user
hospital = getattr(request, "tenant_hospital", None) or user.hospital
if not user.is_px_admin() and hospital:
filter_config["hospital"] = str(hospital.id)
# Generate report data
report_data = ReportBuilderService.generate_report_data(
data_source=data_source,
filter_config=filter_config,
column_config=column_config,
grouping_config=grouping_config,
sort_config=sort_config,
user=user,
)
# Generate summary
summary = ReportBuilderService.generate_summary(data_source, filter_config, user=user)
return JsonResponse(
{
"success": True,
"data": report_data,
"summary": summary,
}
)
@login_required
def save_report(request):
"""Save a report configuration."""
if request.method != "POST":
return JsonResponse({"error": "POST required"}, status=405)
try:
data = json.loads(request.body)
except json.JSONDecodeError:
return JsonResponse({"error": "Invalid JSON"}, status=400)
report_id = data.get("id")
if report_id:
# Update existing report
report = get_object_or_404(SavedReport, id=report_id, created_by=request.user)
report.name = data.get("name", report.name)
report.description = data.get("description", report.description)
report.data_source = data.get("data_source", report.data_source)
report.filter_config = data.get("filter_config", report.filter_config)
report.column_config = data.get("column_config", report.column_config)
report.grouping_config = data.get("grouping_config", report.grouping_config)
report.sort_config = data.get("sort_config", report.sort_config)
report.is_shared = data.get("is_shared", report.is_shared)
report.save()
else:
# Create new report
report = SavedReport.objects.create(
name=data.get("name", "Untitled Report"),
description=data.get("description", ""),
data_source=data.get("data_source", "complaints"),
filter_config=data.get("filter_config", {}),
column_config=data.get("column_config", []),
grouping_config=data.get("grouping_config", {}),
sort_config=data.get("sort_config", []),
is_shared=data.get("is_shared", False),
created_by=request.user,
hospital=request.user.hospital,
)
return JsonResponse({"success": True, "report_id": str(report.id), "message": "Report saved successfully"})
@login_required
def saved_reports_list(request):
"""List all saved reports."""
user = request.user
# Get user's reports and shared reports
hospital = getattr(request, "tenant_hospital", None) or user.hospital
queryset = SavedReport.objects.filter(created_by=user)
if hospital:
queryset = queryset | SavedReport.objects.filter(is_shared=True, hospital=hospital)
# Remove duplicates and order
queryset = queryset.distinct().order_by("-created_at")
# Filter by data source
data_source = request.GET.get("data_source")
if data_source:
queryset = queryset.filter(data_source=data_source)
# Search
search = request.GET.get("search", "")
if search:
queryset = queryset.filter(name__icontains=search)
# Pagination
paginator = Paginator(queryset, 25)
page_number = request.GET.get("page", 1)
page_obj = paginator.get_page(page_number)
context = {
"page_obj": page_obj,
"reports": page_obj.object_list,
"data_sources": DataSource.choices,
"search": search,
"selected_source": data_source,
}
return render(request, "reports/saved_reports.html", context)
@login_required
def report_detail(request, report_id):
"""View a saved report with live data."""
user = request.user
report = get_object_or_404(SavedReport, id=report_id)
# Check access
hospital = getattr(request, "tenant_hospital", None) or user.hospital
if report.created_by != user and not (report.is_shared and report.hospital == hospital):
if not user.is_px_admin():
messages.error(request, "You don't have access to this report.")
return redirect("reports:saved_reports")
# Apply user's hospital restriction
filter_config = report.filter_config.copy()
if not user.is_px_admin() and hospital:
filter_config["hospital"] = str(hospital.id)
# Generate report data
report_data = ReportBuilderService.generate_report_data(
data_source=report.data_source,
filter_config=filter_config,
column_config=report.column_config,
grouping_config=report.grouping_config,
sort_config=report.sort_config,
user=user,
)
# Generate summary
summary = ReportBuilderService.generate_summary(report.data_source, filter_config, user=user)
# Update last run
report.last_run_at = timezone.now()
report.last_run_count = len(report_data.get("rows", []))
report.save(update_fields=["last_run_at", "last_run_count"])
context = {
"report": report,
"data": report_data,
"summary": summary,
"source_fields": ReportBuilderService.SOURCE_FIELDS.get(report.data_source, {}),
}
return render(request, "reports/report_detail.html", context)
@login_required
def delete_report(request, report_id):
"""Delete a saved report."""
report = get_object_or_404(SavedReport, id=report_id, created_by=request.user)
if request.method == "POST":
report.delete()
messages.success(request, "Report deleted successfully.")
return redirect("reports:saved_reports")
return render(request, "reports/report_confirm_delete.html", {"report": report})
@login_required
def export_report(request, report_id, export_format):
"""Export a report to Excel, PDF, or CSV."""
user = request.user
report = get_object_or_404(SavedReport, id=report_id)
# Check access
hospital = getattr(request, "tenant_hospital", None) or user.hospital
if report.created_by != user and not (report.is_shared and report.hospital == hospital):
if not user.is_px_admin():
messages.error(request, "You don't have access to this report.")
return redirect("reports:saved_reports")
# Apply user's hospital restriction
filter_config = report.filter_config.copy()
if not user.is_px_admin() and hospital:
filter_config["hospital"] = str(hospital.id)
# Generate report data
report_data = ReportBuilderService.generate_report_data(
data_source=report.data_source,
filter_config=filter_config,
column_config=report.column_config,
grouping_config=report.grouping_config,
sort_config=report.sort_config,
user=user,
)
rows = report_data.get("rows", [])
columns = report_data.get("columns", [])
column_keys = report_data.get("column_keys", columns) # Use keys if available, fallback to labels
# Generate filename
filename = f"{report.name.replace(' ', '_')}_{timezone.now().strftime('%Y%m%d')}"
# Export based on format
if export_format == "csv":
return ReportExportService.export_to_csv(rows, columns, column_keys, filename)
elif export_format == "excel":
return ReportExportService.export_to_excel(rows, columns, column_keys, filename)
elif export_format == "pdf":
return ReportExportService.export_to_pdf(rows, columns, column_keys, report.name, filename)
else:
messages.error(request, f"Unsupported export format: {export_format}")
return redirect("reports:report_detail", report_id=report_id)
@login_required
def report_templates(request):
"""List available report templates."""
templates = ReportTemplate.objects.filter(is_active=True).order_by("category", "sort_order", "name")
# Group by category
categories = {}
for template in templates:
cat = template.category or "General"
if cat not in categories:
categories[cat] = []
categories[cat].append(template)
context = {
"categories": categories,
"templates": templates,
}
return render(request, "reports/report_templates.html", context)
@login_required
def use_template(request, template_id):
"""Create a report from a template."""
template = get_object_or_404(ReportTemplate, id=template_id, is_active=True)
if request.method == "POST":
# Create report from template with overrides
overrides = {
"name": request.POST.get("name", f"{template.name} - {timezone.now().strftime('%Y-%m-%d')}"),
}
# Apply any filter overrides from the form
for key, value in request.POST.items():
if key.startswith("filter_"):
filter_key = key[7:] # Remove 'filter_' prefix
if "filter_config" not in overrides:
overrides["filter_config"] = template.filter_config.copy()
overrides["filter_config"][filter_key] = value
report = template.create_report(request.user, overrides)
messages.success(request, f"Report created from template: {template.name}")
return redirect("reports:report_detail", report_id=report.id)
# Get available filter options
hospitals = Hospital.objects.filter(status="active")
if not request.user.is_px_admin() and request.user.hospital:
hospitals = hospitals.filter(id=request.user.hospital.id)
context = {
"template": template,
"hospitals": hospitals,
"source_filters": ReportBuilderService.SOURCE_FILTERS.get(template.data_source, []),
}
return render(request, "reports/use_template.html", context)
@login_required
def filter_options_api(request):
"""API endpoint to get filter options for a data source."""
data_source = request.GET.get("data_source", "complaints")
options = {}
# Status options - use defined choices, not database queries
if data_source == "complaints":
from apps.complaints.models import Complaint
# Get unique status values from model choices
options["status"] = (
[choice[0] for choice in Complaint.STATUS_CHOICES]
if hasattr(Complaint, "STATUS_CHOICES")
else ["open", "in_progress", "resolved", "closed"]
)
options["severity"] = ["low", "medium", "high", "critical"]
options["priority"] = ["low", "medium", "high", "urgent"]
# Get unique source types from model choices or use defaults
options["source"] = ["walk_in", "call", "email", "website", "social_media", "app"]
elif data_source == "inquiries":
from apps.complaints.models import Complaint
options["status"] = (
[choice[0] for choice in Complaint.STATUS_CHOICES]
if hasattr(Complaint, "STATUS_CHOICES")
else ["open", "in_progress", "resolved", "closed"]
)
elif data_source == "observations":
from apps.observations.models import Observation, ObservationStatus
options["status"] = [s.value for s in ObservationStatus]
options["severity"] = ["low", "medium", "high", "critical"]
elif data_source == "surveys":
options["status"] = ["pending", "sent", "completed", "expired"]
options["patient_type"] = ["inpatient", "outpatient", "emergency"]
options["journey_type"] = ["admission", "discharge", "visit"]
elif data_source == "px_actions":
options["status"] = ["open", "in_progress", "completed", "closed"]
options["priority"] = ["low", "medium", "high", "urgent"]
elif data_source == "physicians":
options["journey_type"] = ["inpatient", "outpatient", "emergency"]
# Hospital options
hospitals = Hospital.objects.filter(status="active")
if not request.user.is_px_admin() and request.user.hospital:
hospitals = hospitals.filter(id=request.user.hospital.id)
options["hospitals"] = list(hospitals.values("id", "name"))
# Department options (filtered by hospital if provided)
hospital_id = request.GET.get("hospital")
departments = Department.objects.filter(status="active")
if hospital_id:
departments = departments.filter(hospital_id=hospital_id)
elif not request.user.is_px_admin() and request.user.hospital:
departments = departments.filter(hospital=request.user.hospital)
options["departments"] = list(departments.values("id", "name"))
# Available columns for the data source
fields = ReportBuilderService.SOURCE_FIELDS.get(data_source, {})
# Default columns (first 8 fields)
default_columns = list(fields.keys())[:8]
options["columns"] = [
{"key": key, "label": info["label"], "type": info["type"], "selected": key in default_columns}
for key, info in fields.items()
]
return JsonResponse(options)
@login_required
def available_fields_api(request):
"""API endpoint to get available fields for a data source."""
data_source = request.GET.get("data_source", "complaints")
fields = ReportBuilderService.SOURCE_FIELDS.get(data_source, {})
return JsonResponse({"fields": {k: {"label": v["label"], "type": v["type"]} for k, v in fields.items()}})