Removed ~300 lines of redundant hospital filtering code from views. Templates no longer use hospital dropdowns, so views don't need to: - Query Hospital.objects.filter() - Apply RBAC filtering to hospitals queryset - Pass hospitals to context The middleware (TenantMiddleware) already handles hospital filtering via request.tenant_hospital for all users. Files cleaned: - apps/surveys/ui_views.py - apps/callcenter/ui_views.py - apps/complaints/ui_views.py - apps/analytics/ui_views.py - apps/physicians/ui_views.py - apps/projects/ui_views.py - apps/feedback/views.py - apps/dashboard/views.py - apps/journeys/ui_views.py - apps/appreciation/ui_views.py
1680 lines
62 KiB
Python
1680 lines
62 KiB
Python
"""
|
|
Survey Console UI views - Server-rendered templates for survey management
|
|
"""
|
|
|
|
from django.contrib import messages
|
|
from django.contrib.auth.decorators import login_required
|
|
from django.core.paginator import Paginator
|
|
from django.db.models import Q, Prefetch, Avg, Count, F, Case, When, IntegerField
|
|
from django.db.models.functions import TruncDate
|
|
from django.http import FileResponse, HttpResponse
|
|
from django.shortcuts import get_object_or_404, redirect, render
|
|
from django.utils import timezone
|
|
from django.views.decorators.http import require_http_methods
|
|
from django.db.models import ExpressionWrapper, FloatField
|
|
|
|
from apps.core.services import AuditService
|
|
from apps.core.decorators import block_source_user
|
|
from apps.organizations.models import Department, Hospital
|
|
|
|
from .forms import (
|
|
ManualSurveySendForm,
|
|
SurveyQuestionFormSet,
|
|
SurveyTemplateForm,
|
|
ManualPhoneSurveySendForm,
|
|
BulkCSVSurveySendForm,
|
|
)
|
|
from .services import SurveyDeliveryService
|
|
from .models import SurveyInstance, SurveyTemplate, SurveyQuestion
|
|
from .tasks import send_satisfaction_feedback
|
|
from datetime import datetime
|
|
|
|
|
|
@block_source_user
@login_required
def survey_instance_list(request):
    """
    Survey instances list view with filters.

    Features:
    - Server-side pagination
    - Filters (status, survey type, negative flag, hospital, date range)
    - Search by patient MRN, patient name or encounter ID
    - Ordering limited to a fixed set of sortable fields

    ``order_by`` and ``page_size`` arrive straight from the URL, so they are
    validated here: an unknown ordering or a non-numeric / out-of-range page
    size falls back to a safe value instead of raising.
    """
    default_ordering = "-created_at"
    default_page_size = 25
    max_page_size = 200
    # Fields a client may sort by. Anything else is ignored so that a crafted
    # URL can neither trigger a FieldError nor sort on arbitrary relations.
    # Extend this set if the template offers additional sort columns.
    sortable_fields = {
        "created_at",
        "sent_at",
        "completed_at",
        "total_score",
        "status",
        "is_negative",
        "patient__mrn",
        "patient__first_name",
        "patient__last_name",
        "survey_template__survey_type",
    }

    user = request.user

    # Source Users don't have access to surveys
    if user.is_source_user():
        from django.core.exceptions import PermissionDenied

        raise PermissionDenied("Source users do not have access to surveys.")

    # Base queryset with optimizations
    queryset = SurveyInstance.objects.select_related(
        "survey_template", "patient", "journey_instance__journey_template"
    ).prefetch_related("responses__question")

    # Apply RBAC filters: PX admins see everything, everyone else only sees
    # surveys of their own hospital, users without a hospital see nothing.
    if user.is_px_admin():
        pass
    elif user.hospital:
        queryset = queryset.filter(survey_template__hospital=user.hospital)
    else:
        queryset = queryset.none()

    # Apply filters
    status_filter = request.GET.get("status")
    if status_filter:
        queryset = queryset.filter(status=status_filter)

    survey_type = request.GET.get("survey_type")
    if survey_type:
        queryset = queryset.filter(survey_template__survey_type=survey_type)

    if request.GET.get("is_negative") == "true":
        queryset = queryset.filter(is_negative=True)

    hospital_filter = request.GET.get("hospital")
    if hospital_filter:
        queryset = queryset.filter(survey_template__hospital_id=hospital_filter)

    # Search
    search_query = request.GET.get("search")
    if search_query:
        queryset = queryset.filter(
            Q(patient__mrn__icontains=search_query)
            | Q(patient__first_name__icontains=search_query)
            | Q(patient__last_name__icontains=search_query)
            | Q(encounter_id__icontains=search_query)
        )

    # Date range
    date_from = request.GET.get("date_from")
    if date_from:
        queryset = queryset.filter(sent_at__gte=date_from)

    date_to = request.GET.get("date_to")
    if date_to:
        queryset = queryset.filter(sent_at__lte=date_to)

    # Ordering (validated against the whitelist above)
    order_by = request.GET.get("order_by") or default_ordering
    order_field = order_by[1:] if order_by.startswith("-") else order_by
    if order_field not in sortable_fields:
        order_by = default_ordering
    queryset = queryset.order_by(order_by)

    # Pagination
    try:
        page_size = int(request.GET.get("page_size", default_page_size))
    except (TypeError, ValueError):
        page_size = default_page_size
    # Paginator cannot work with a page size below 1; the upper bound keeps a
    # single request from loading an unbounded number of rows.
    page_size = max(1, min(page_size, max_page_size))

    paginator = Paginator(queryset, page_size)
    page_obj = paginator.get_page(request.GET.get("page", 1))

    context = {
        "surveys": page_obj,
        "filters": request.GET,
    }

    return render(request, "surveys/instance_list.html", context)
|
|
|
|
|
|
@block_source_user
@login_required
def survey_instance_detail(request, pk):
    """
    Survey instance detail view with responses.

    Features:
    - Full survey details
    - All responses
    - Score comparison with the template average
    - Related completed surveys from the same patient
    - Per-question statistics across all completed surveys of the template

    Access follows the same rule as the instance list: PX admins may open any
    survey, other users only surveys whose template belongs to their hospital.
    """
    survey = get_object_or_404(
        SurveyInstance.objects.select_related(
            "survey_template", "patient", "journey_instance__journey_template"
        ).prefetch_related("responses__question"),
        pk=pk,
    )

    # Check permission (mirrors the RBAC filter applied in the list view)
    user = request.user
    if not user.is_px_admin():
        if not user.hospital or survey.survey_template.hospital != user.hospital:
            messages.error(request, "You don't have permission to view this survey.")
            return redirect("surveys:instance_list")

    # Get responses; select_related avoids one query per response when the
    # loop below reads response.question.
    responses = survey.responses.select_related("question").order_by("question__order")

    # All completed surveys of the same template: the population for the
    # template average and the per-question statistics.
    completed_for_template = SurveyInstance.objects.filter(
        survey_template=survey.survey_template, status="completed"
    )

    # Calculate average score for this survey template
    template_average = completed_for_template.aggregate(avg_score=Avg("total_score"))["avg_score"] or 0

    # Get related surveys from the same patient
    related_surveys = (
        SurveyInstance.objects.filter(patient=survey.patient, status="completed")
        .exclude(id=survey.id)
        .select_related("survey_template")
        .order_by("-completed_at")[:5]
    )

    # Get response statistics for each question
    question_stats = {}
    for response in responses:
        question = response.question
        # The question filter must come BEFORE values()/annotate()/aggregate():
        # Django then reuses the same join to "responses", so the statistics
        # only cover answers to this question. Filtering afterwards (or not at
        # all) mixes in the answers to every other question of the survey.
        answers = completed_for_template.filter(responses__question=question)

        if question.question_type in ("multiple_choice", "single_choice"):
            option_rows = [
                row
                for row in answers.values("responses__choice_value")
                .annotate(count=Count("id"))
                .order_by("-count")
                if row["responses__choice_value"]
            ]
            # Percentages are relative to the number of answers given, not to
            # the number of distinct options.
            answered_total = sum(row["count"] for row in option_rows)

            question_stats[question.id] = {
                "type": "choice",
                "options": [
                    {
                        "value": row["responses__choice_value"],
                        "count": row["count"],
                        "percentage": round((row["count"] / answered_total * 100) if answered_total > 0 else 0, 1),
                    }
                    for row in option_rows
                ],
            }
        elif question.question_type == "rating":
            rating_stats = answers.aggregate(
                avg_rating=Avg("responses__numeric_value"), total_responses=Count("responses")
            )
            question_stats[question.id] = {
                "type": "rating",
                "average": round(rating_stats["avg_rating"] or 0, 2),
                "total_responses": rating_stats["total_responses"] or 0,
            }

    context = {
        "survey": survey,
        "responses": responses,
        "template_average": round(template_average, 2),
        "related_surveys": related_surveys,
        "question_stats": question_stats,
    }

    return render(request, "surveys/instance_detail.html", context)
|
|
|
|
|
|
@block_source_user
@login_required
def survey_template_list(request):
    """
    Survey templates list view.

    PX admins see the templates of the hospital on ``request.tenant_hospital``
    (nothing when no hospital is set); other users see the templates of their
    own hospital. Supports filtering by survey type, hospital and active flag,
    with server-side pagination. A malformed ``page_size`` query parameter
    falls back to the default instead of raising.
    """
    default_page_size = 25
    max_page_size = 200

    queryset = SurveyTemplate.objects.select_related("hospital").prefetch_related("questions")

    # Apply RBAC filters
    user = request.user
    if user.is_px_admin():
        # PX Admins see templates for their selected hospital
        tenant_hospital = getattr(request, "tenant_hospital", None)
        if tenant_hospital:
            queryset = queryset.filter(hospital=tenant_hospital)
        else:
            # If no hospital selected, show none (user needs to select a hospital)
            queryset = queryset.none()
    elif user.hospital:
        queryset = queryset.filter(hospital=user.hospital)
    else:
        queryset = queryset.none()

    # Apply filters
    survey_type = request.GET.get("survey_type")
    if survey_type:
        queryset = queryset.filter(survey_type=survey_type)

    hospital_filter = request.GET.get("hospital")
    if hospital_filter:
        queryset = queryset.filter(hospital_id=hospital_filter)

    is_active = request.GET.get("is_active")
    if is_active in ("true", "false"):
        queryset = queryset.filter(is_active=(is_active == "true"))

    # Ordering
    queryset = queryset.order_by("hospital", "survey_type", "name")

    # Pagination
    try:
        page_size = int(request.GET.get("page_size", default_page_size))
    except (TypeError, ValueError):
        page_size = default_page_size
    # Paginator cannot work with a page size below 1; the upper bound keeps a
    # single request from loading an unbounded number of rows.
    page_size = max(1, min(page_size, max_page_size))

    paginator = Paginator(queryset, page_size)
    page_obj = paginator.get_page(request.GET.get("page", 1))

    context = {
        "page_obj": page_obj,
        "templates": page_obj.object_list,
        "filters": request.GET,
    }

    return render(request, "surveys/template_list.html", context)
|
|
|
|
|
|
@block_source_user
@login_required
def survey_template_create(request):
    """
    Create a new survey template with questions.

    Only PX admins and hospital admins may create templates; other users are
    redirected to the template list with an error message. On a valid POST the
    template and all of its questions are written in a single transaction, so
    a failure while saving a question does not leave a half-created template.
    """
    # Check permission
    user = request.user
    if not user.is_px_admin() and not user.is_hospital_admin():
        messages.error(request, "You don't have permission to create survey templates.")
        return redirect("surveys:template_list")

    if request.method == "POST":
        form = SurveyTemplateForm(request.POST, user=user)
        formset = SurveyQuestionFormSet(request.POST)

        if form.is_valid() and formset.is_valid():
            from django.db import transaction

            # Template and questions succeed or fail together.
            with transaction.atomic():
                template = form.save(commit=False)
                template.created_by = user
                template.save()

                # The formset was not bound to an instance, so each question
                # has to be attached to the new template before saving.
                for question in formset.save(commit=False):
                    question.survey_template = template
                    question.save()

            messages.success(request, "Survey template created successfully.")
            return redirect("surveys:template_detail", pk=template.pk)
    else:
        form = SurveyTemplateForm(user=user)
        formset = SurveyQuestionFormSet()

    context = {
        "form": form,
        "formset": formset,
    }

    return render(request, "surveys/template_form.html", context)
|
|
|
|
|
|
@block_source_user
@login_required
def survey_template_detail(request, pk):
    """
    View survey template details together with usage statistics.

    PX admins may view any template. Every other user, hospital admins
    included, may only view templates of their own hospital; this matches the
    hospital scoping applied by the template list view.
    """
    template = get_object_or_404(SurveyTemplate.objects.select_related("hospital").prefetch_related("questions"), pk=pk)

    # Check permission
    user = request.user
    if not user.is_px_admin():
        if not user.hospital or template.hospital != user.hospital:
            messages.error(request, "You don't have permission to view this template.")
            return redirect("surveys:template_list")

    # Get statistics in a single query using conditional aggregates
    instance_stats = template.instances.aggregate(
        total=Count("pk"),
        completed=Count("pk", filter=Q(status="completed")),
        negative=Count("pk", filter=Q(is_negative=True)),
        avg_score=Avg("total_score", filter=Q(status="completed")),
    )
    total_instances = instance_stats["total"]
    completed_instances = instance_stats["completed"]
    avg_score = instance_stats["avg_score"] or 0

    context = {
        "template": template,
        "questions": template.questions.all().order_by("order"),
        "stats": {
            "total_instances": total_instances,
            "completed_instances": completed_instances,
            "negative_instances": instance_stats["negative"],
            "completion_rate": round((completed_instances / total_instances * 100) if total_instances > 0 else 0, 1),
            "avg_score": round(avg_score, 2),
        },
    }

    return render(request, "surveys/template_detail.html", context)
|
|
|
|
|
|
@block_source_user
@login_required
def survey_template_edit(request, pk):
    """
    Edit an existing survey template with questions.

    PX admins may edit any template. Every other user, hospital admins
    included, may only edit templates of their own hospital. The template form
    and the question formset are saved in one transaction so a failure in the
    formset does not leave the template half-updated.
    """
    template = get_object_or_404(SurveyTemplate, pk=pk)

    # Check permission
    user = request.user
    if not user.is_px_admin():
        if not user.hospital or template.hospital != user.hospital:
            messages.error(request, "You don't have permission to edit this template.")
            return redirect("surveys:template_list")

    if request.method == "POST":
        form = SurveyTemplateForm(request.POST, instance=template, user=user)
        formset = SurveyQuestionFormSet(request.POST, instance=template)

        if form.is_valid() and formset.is_valid():
            from django.db import transaction

            with transaction.atomic():
                form.save()
                formset.save()

            messages.success(request, "Survey template updated successfully.")
            return redirect("surveys:template_detail", pk=template.pk)
    else:
        form = SurveyTemplateForm(instance=template, user=user)
        formset = SurveyQuestionFormSet(instance=template)

    context = {
        "form": form,
        "formset": formset,
        "template": template,
    }

    return render(request, "surveys/template_form.html", context)
|
|
|
|
|
|
@block_source_user
@login_required
def survey_template_delete(request, pk):
    """
    Delete a survey template.

    GET renders a confirmation page; POST performs the deletion. PX admins may
    delete any template. Every other user, hospital admins included, may only
    delete templates of their own hospital.
    """
    template = get_object_or_404(SurveyTemplate, pk=pk)

    # Check permission
    user = request.user
    if not user.is_px_admin():
        if not user.hospital or template.hospital != user.hospital:
            messages.error(request, "You don't have permission to delete this template.")
            return redirect("surveys:template_list")

    if request.method == "POST":
        # Keep the name: it is needed for the message after the row is gone.
        template_name = template.name
        template.delete()
        messages.success(request, f"Survey template '{template_name}' deleted successfully.")
        return redirect("surveys:template_list")

    context = {
        "template": template,
    }

    return render(request, "surveys/template_confirm_delete.html", context)
|
|
|
|
|
|
@block_source_user
@login_required
@require_http_methods(["POST"])
def survey_log_patient_contact(request, pk):
    """
    Log patient contact for negative survey.

    This records that the user contacted the patient to discuss
    the negative survey feedback.

    POST fields:
    - ``contact_notes``: required, must contain non-whitespace text
    - ``issue_resolved``: checkbox, ``"on"`` when ticked

    PX admins may log contact on any survey; other users only on surveys whose
    template belongs to their hospital. Always redirects back to the survey
    detail page, with a flash message describing the outcome.
    """
    survey = get_object_or_404(SurveyInstance, pk=pk)

    # Check permission
    user = request.user
    if not user.is_px_admin():
        if not user.hospital or survey.survey_template.hospital != user.hospital:
            messages.error(request, "You don't have permission to modify this survey.")
            return redirect("surveys:instance_detail", pk=pk)

    # Check if survey is negative
    if not survey.is_negative:
        messages.warning(request, "This survey is not marked as negative.")
        return redirect("surveys:instance_detail", pk=pk)

    # Get form data; whitespace-only notes count as missing
    contact_notes = request.POST.get("contact_notes", "").strip()
    issue_resolved = request.POST.get("issue_resolved") == "on"

    if not contact_notes:
        messages.error(request, "Please provide contact notes.")
        return redirect("surveys:instance_detail", pk=pk)

    try:
        # Update survey
        survey.patient_contacted = True
        survey.patient_contacted_at = timezone.now()
        survey.patient_contacted_by = user
        survey.contact_notes = contact_notes
        survey.issue_resolved = issue_resolved
        survey.save(
            update_fields=[
                "patient_contacted",
                "patient_contacted_at",
                "patient_contacted_by",
                "contact_notes",
                "issue_resolved",
            ]
        )

        # Log audit
        AuditService.log_event(
            event_type="survey_patient_contacted",
            description=f"Patient contacted for negative survey by {user.get_full_name()}",
            user=user,
            content_object=survey,
            metadata={
                "contact_notes": contact_notes,
                "issue_resolved": issue_resolved,
                # Compare against None so that a legitimate score of 0 is kept.
                "survey_score": float(survey.total_score) if survey.total_score is not None else None,
            },
        )

        status = "resolved" if issue_resolved else "discussed"
        messages.success(request, f"Patient contact logged successfully. Issue marked as {status}.")

    except Exception as e:
        # Top-level boundary of the view: report to the user, but also keep
        # the traceback so the failure can be diagnosed.
        import logging

        logging.getLogger(__name__).exception("Error logging patient contact for survey %s", pk)
        messages.error(request, f"Error logging patient contact: {str(e)}")

    return redirect("surveys:instance_detail", pk=pk)
|
|
|
|
|
|
@block_source_user
@login_required
def survey_comments_list(request):
    """
    Survey comments list view with AI analysis.

    Features:
    - Display completed surveys that have a non-empty comment
    - Filters (sentiment, survey type, hospital, patient type, date range)
    - Search by patient MRN, patient name or comment text
    - Server-side pagination
    - Patient type display from survey metadata, falling back to journey type
    - Sentiment and patient-type statistics for the charts; these cover every
      comment the user may see (RBAC only), not just the filtered page

    ``order_by`` and ``page_size`` arrive straight from the URL, so they are
    validated here: an unknown ordering or a non-numeric / out-of-range page
    size falls back to a safe value instead of raising.
    """
    import json

    default_ordering = "-completed_at"
    default_page_size = 25
    max_page_size = 200
    # Fields a client may sort by. Anything else is ignored so that a crafted
    # URL can neither trigger a FieldError nor sort on arbitrary relations.
    # Extend this set if the template offers additional sort columns.
    sortable_fields = {
        "completed_at",
        "created_at",
        "sent_at",
        "total_score",
        "is_negative",
        "patient__mrn",
        "patient__first_name",
        "patient__last_name",
        "survey_template__survey_type",
    }

    # HIS patient-type codes per bucket: the single source for the filter, the
    # chart statistics and the per-row badge.
    # NOTE(review): a previous comment in this view described the codes as
    # "1"=Inpatient and "2"/"O"=Outpatient, which contradicted every mapping
    # the code actually used. The code's mapping is kept here; confirm the
    # intended meaning against the HIS adapter.
    patient_type_codes = {
        "outpatient": ["1"],
        "inpatient": ["2", "O"],
        "emergency": ["3", "E"],
    }
    known_codes = [code for codes in patient_type_codes.values() for code in codes]

    badges = {
        "outpatient": {"label": "Outpatient", "icon": "bi-person-walking", "color": "bg-primary"},
        "inpatient": {"label": "Inpatient", "icon": "bi-hospital", "color": "bg-warning"},
        "emergency": {"label": "Emergency", "icon": "bi-ambulance", "color": "bg-danger"},
    }
    badge_by_code = {code: badges[bucket] for bucket, codes in patient_type_codes.items() for code in codes}
    badge_by_journey_type = {
        "opd": badges["outpatient"],
        "inpatient": badges["inpatient"],
        "ems": badges["emergency"],
        "day_case": {"label": "Day Case", "icon": "bi-calendar-day", "color": "bg-info"},
    }
    unknown_badge = {"code": None, "label": "N/A", "icon": "bi-question-circle", "color": "bg-secondary"}

    def get_patient_type_display(survey):
        """Get patient type display for a survey from its metadata or journey."""
        # Metadata may be empty/None for surveys that carry no extra data.
        patient_type_code = (survey.metadata or {}).get("patient_type")
        if patient_type_code:
            badge = badge_by_code.get(str(patient_type_code))
            if badge:
                return {"code": patient_type_code, **badge}

        # Fallback: try to get from journey if available
        journey = survey.journey_instance
        if journey and journey.journey_template:
            badge = badge_by_journey_type.get(journey.journey_template.journey_type)
            if badge:
                return {"code": None, **badge}

        return dict(unknown_badge)

    def percent_of(part, whole):
        """Return ``part`` as a percentage of ``whole`` (one decimal), 0 when empty."""
        return round((part / whole * 100) if whole > 0 else 0, 1)

    # Base queryset - only completed surveys with comments, limited by RBAC.
    # Shared by the listing and the statistics so both apply the same rule.
    user = request.user
    visible_comments = SurveyInstance.objects.filter(status="completed", comment__isnull=False).exclude(comment="")
    if user.is_px_admin():
        pass  # See all
    elif user.hospital:
        visible_comments = visible_comments.filter(survey_template__hospital=user.hospital)
    else:
        visible_comments = visible_comments.none()

    queryset = visible_comments.select_related("survey_template", "patient", "journey_instance__journey_template")

    # Apply filters
    sentiment_filter = request.GET.get("sentiment")
    if sentiment_filter:
        queryset = queryset.filter(comment_analysis__sentiment=sentiment_filter)

    survey_type = request.GET.get("survey_type")
    if survey_type:
        queryset = queryset.filter(survey_template__survey_type=survey_type)

    hospital_filter = request.GET.get("hospital")
    if hospital_filter:
        queryset = queryset.filter(survey_template__hospital_id=hospital_filter)

    # Patient type filter (unknown filter values are ignored)
    patient_type_filter = request.GET.get("patient_type")
    if patient_type_filter in patient_type_codes:
        queryset = queryset.filter(metadata__patient_type__in=patient_type_codes[patient_type_filter])

    # Search
    search_query = request.GET.get("search")
    if search_query:
        queryset = queryset.filter(
            Q(patient__mrn__icontains=search_query)
            | Q(patient__first_name__icontains=search_query)
            | Q(patient__last_name__icontains=search_query)
            | Q(comment__icontains=search_query)
        )

    # Date range
    date_from = request.GET.get("date_from")
    if date_from:
        queryset = queryset.filter(completed_at__gte=date_from)

    date_to = request.GET.get("date_to")
    if date_to:
        queryset = queryset.filter(completed_at__lte=date_to)

    # Ordering (validated against the whitelist above)
    order_by = request.GET.get("order_by") or default_ordering
    order_field = order_by[1:] if order_by.startswith("-") else order_by
    if order_field not in sortable_fields:
        order_by = default_ordering
    queryset = queryset.order_by(order_by)

    # Pagination
    try:
        page_size = int(request.GET.get("page_size", default_page_size))
    except (TypeError, ValueError):
        page_size = default_page_size
    page_size = max(1, min(page_size, max_page_size))

    paginator = Paginator(queryset, page_size)
    page_obj = paginator.get_page(request.GET.get("page", 1))

    # Statistics (RBAC-limited, independent of the listing filters)
    stats_queryset = visible_comments
    sentiments = ("positive", "negative", "neutral")

    total_count = stats_queryset.count()
    sentiment_counts = {
        sentiment: stats_queryset.filter(comment_analysis__sentiment=sentiment).count() for sentiment in sentiments
    }
    analyzed_count = stats_queryset.filter(comment_analyzed=True).count()

    # Patient Type Distribution Chart Data
    bucket_counts = {
        bucket: stats_queryset.filter(metadata__patient_type__in=codes).count()
        for bucket, codes in patient_type_codes.items()
    }
    unknown_count = total_count - sum(bucket_counts.values())

    patient_type_distribution = [
        {
            "type": bucket,
            "label": badges[bucket]["label"],
            "count": count,
            "percentage": percent_of(count, total_count),
        }
        for bucket, count in bucket_counts.items()
    ]
    patient_type_distribution.append(
        {
            "type": "unknown",
            "label": "N/A",
            "count": unknown_count,
            "percentage": percent_of(unknown_count, total_count),
        }
    )

    # Sentiment by Patient Type Chart Data; the last bucket holds every
    # survey whose patient type is not one of the known codes.
    bucket_querysets = [stats_queryset.filter(metadata__patient_type__in=codes) for codes in patient_type_codes.values()]
    bucket_querysets.append(stats_queryset.exclude(metadata__patient_type__in=known_codes))

    sentiment_by_patient_type = {
        "types": ["Outpatient", "Inpatient", "Emergency", "N/A"],
        "positive": [],
        "negative": [],
        "neutral": [],
    }
    for pt_queryset in bucket_querysets:
        for sentiment in sentiments:
            sentiment_by_patient_type[sentiment].append(
                pt_queryset.filter(comment_analysis__sentiment=sentiment).count()
            )

    # Add patient type info to surveys
    for survey in page_obj.object_list:
        survey.patient_type = get_patient_type_display(survey)

    stats = {
        "total": total_count,
        "positive": sentiment_counts["positive"],
        "negative": sentiment_counts["negative"],
        "neutral": sentiment_counts["neutral"],
        "analyzed": analyzed_count,
        "unanalyzed": total_count - analyzed_count,
    }

    context = {
        "page_obj": page_obj,
        "surveys": page_obj.object_list,
        "stats": stats,
        "filters": request.GET,
        "patient_type_distribution": patient_type_distribution,
        "sentiment_by_patient_type": sentiment_by_patient_type,
        # Chart data serialized to JSON
        "patient_type_distribution_json": json.dumps(patient_type_distribution),
        "sentiment_by_patient_type_json": json.dumps(sentiment_by_patient_type),
    }

    return render(request, "surveys/comment_list.html", context)
|
|
|
|
|
|
@block_source_user
@login_required
@require_http_methods(["POST"])
def survey_send_satisfaction_feedback(request, pk):
    """
    Send satisfaction feedback form to patient.

    This queues the ``send_satisfaction_feedback`` task, which sends a form
    asking how the patient's negative survey concerns were addressed.

    Preconditions, each reported with a flash message when not met:
    - the user is a PX admin or belongs to the hospital of the survey template
    - the survey is marked as negative
    - patient contact has already been logged
    - satisfaction feedback has not been sent before

    Always redirects back to the survey detail page.
    """
    survey = get_object_or_404(SurveyInstance, pk=pk)

    # Check permission
    user = request.user
    if not user.is_px_admin():
        if not user.hospital or survey.survey_template.hospital != user.hospital:
            messages.error(request, "You don't have permission to modify this survey.")
            return redirect("surveys:instance_detail", pk=pk)

    # Check if survey is negative
    if not survey.is_negative:
        messages.warning(request, "This survey is not marked as negative.")
        return redirect("surveys:instance_detail", pk=pk)

    # Check if patient was contacted
    if not survey.patient_contacted:
        messages.error(request, "Please log patient contact before sending satisfaction feedback.")
        return redirect("surveys:instance_detail", pk=pk)

    # Check if already sent
    if survey.satisfaction_feedback_sent:
        messages.warning(request, "Satisfaction feedback has already been sent for this survey.")
        return redirect("surveys:instance_detail", pk=pk)

    try:
        # Trigger async task to send satisfaction feedback
        send_satisfaction_feedback.delay(str(survey.id), str(user.id))
    except Exception as e:
        # Top-level boundary of the view: report to the user, but also keep
        # the traceback so the failure can be diagnosed.
        import logging

        logging.getLogger(__name__).exception("Error queuing satisfaction feedback for survey %s", pk)
        messages.error(request, f"Error sending satisfaction feedback: {str(e)}")
    else:
        messages.success(
            request,
            "Satisfaction feedback form is being sent to the patient. "
            "They will receive a link to provide their feedback.",
        )

    return redirect("surveys:instance_detail", pk=pk)
|
|
|
|
|
|
@block_source_user
@login_required
def manual_survey_send(request):
    """
    Manually send a survey to a patient or staff member.

    Flow:
    - Only PX admins and hospital admins may use this view
    - GET (or an invalid POST) renders the send form
    - A valid POST looks up the recipient, checks that contact details exist
      for the chosen channel (email/SMS), creates the survey instance and
      hands it to the delivery service
    - On success the send is audited and the user lands on the new instance;
      on delivery failure the instance is deleted and the form is shown again
    """
    page = "surveys/manual_send.html"
    user = request.user

    # Check permission
    if not (user.is_px_admin() or user.is_hospital_admin()):
        messages.error(request, "You don't have permission to send surveys manually.")
        return redirect("surveys:instance_list")

    if request.method != "POST":
        return render(request, page, {"form": ManualSurveySendForm(user)})

    form = ManualSurveySendForm(user, request.POST)
    if not form.is_valid():
        return render(request, page, {"form": form})

    try:
        # Get form data
        cleaned = form.cleaned_data
        survey_template = cleaned["survey_template"]
        recipient_type = cleaned["recipient_type"]
        recipient_id = cleaned["recipient"]
        delivery_channel = cleaned["delivery_channel"]
        custom_message = cleaned.get("custom_message", "")

        # Get recipient object
        from apps.organizations.models import Patient, Staff

        is_patient = recipient_type == "patient"
        recipient = get_object_or_404(Patient if is_patient else Staff, pk=recipient_id)
        recipient_name = f"{recipient.first_name} {recipient.last_name}"

        # Check if recipient has contact info for selected channel
        email_address = None
        phone_number = None
        if delivery_channel == "email":
            email_address = recipient.email
            if not email_address:
                messages.error(request, f"{recipient_type.title()} does not have an email address.")
                return render(request, page, {"form": form})
        elif delivery_channel == "sms":
            phone_number = recipient.phone
            if not phone_number:
                messages.error(request, f"{recipient_type.title()} does not have a phone number.")
                return render(request, page, {"form": form})

        # Create survey instance
        from .models import SurveyStatus

        instance_metadata = {
            "sent_manually": True,
            "sent_by": str(user.id),
            "custom_message": custom_message,
            "recipient_type": recipient_type,
        }
        survey_instance = SurveyInstance.objects.create(
            survey_template=survey_template,
            patient=recipient if is_patient else None,
            staff=recipient if recipient_type == "staff" else None,
            hospital=survey_template.hospital,
            delivery_channel=delivery_channel,
            recipient_email=email_address,
            recipient_phone=phone_number,
            status=SurveyStatus.SENT,
            metadata=instance_metadata,
        )

        # Send survey
        if not SurveyDeliveryService.deliver_survey(survey_instance):
            messages.error(request, "Failed to send survey. Please try again.")
            # Do not keep a record for a survey that never went out.
            survey_instance.delete()
            return render(request, page, {"form": form})

        # Log audit
        audit_metadata = {
            "survey_template": survey_template.name,
            "recipient_type": recipient_type,
            "recipient_id": recipient_id,
            "delivery_channel": delivery_channel,
            "custom_message": custom_message,
        }
        AuditService.log_event(
            event_type="survey_sent_manually",
            description=f"Survey sent manually to {recipient_name} ({recipient_type}) by {user.get_full_name()}",
            user=user,
            content_object=survey_instance,
            metadata=audit_metadata,
        )

        messages.success(
            request,
            f"Survey sent successfully to {recipient_name} via {delivery_channel.upper()}. "
            f"Survey ID: {survey_instance.id}",
        )
        return redirect("surveys:instance_detail", pk=survey_instance.pk)

    except Exception as e:
        import logging

        logger = logging.getLogger(__name__)
        logger.error(f"Error sending survey manually: {str(e)}", exc_info=True)
        messages.error(request, f"Error sending survey: {str(e)}")
        return render(request, page, {"form": form})
|
|
|
|
|
|
@block_source_user
@login_required
def manual_survey_send_phone(request):
    """
    Send survey to a manually entered phone number.

    Flow:
    - Only PX admins and hospital admins may use this view
    - GET (or an invalid POST) renders the phone send form
    - A valid POST creates an SMS survey instance for the entered number
      (optionally with a recipient name) and hands it to the delivery service
    - On success the send is audited and the user lands on the new instance;
      on delivery failure the instance is deleted and the form is shown again
    """
    page = "surveys/manual_send_phone.html"
    user = request.user

    # Check permission
    if not (user.is_px_admin() or user.is_hospital_admin()):
        messages.error(request, "You don't have permission to send surveys manually.")
        return redirect("surveys:instance_list")

    if request.method != "POST":
        return render(request, page, {"form": ManualPhoneSurveySendForm(user)})

    form = ManualPhoneSurveySendForm(user, request.POST)
    if not form.is_valid():
        return render(request, page, {"form": form})

    try:
        cleaned = form.cleaned_data
        survey_template = cleaned["survey_template"]
        phone_number = cleaned["phone_number"]
        recipient_name = cleaned.get("recipient_name", "")
        custom_message = cleaned.get("custom_message", "")

        # Create survey instance
        from .models import SurveyStatus

        instance_metadata = {
            "sent_manually": True,
            "sent_by": str(user.id),
            "custom_message": custom_message,
            "recipient_name": recipient_name,
            "recipient_type": "manual_phone",
        }
        survey_instance = SurveyInstance.objects.create(
            survey_template=survey_template,
            hospital=survey_template.hospital,
            delivery_channel="sms",
            recipient_phone=phone_number,
            status=SurveyStatus.SENT,
            metadata=instance_metadata,
        )

        # Send survey
        if not SurveyDeliveryService.deliver_survey(survey_instance):
            messages.error(request, "Failed to send survey. Please try again.")
            # Do not keep a record for a survey that never went out.
            survey_instance.delete()
            return render(request, page, {"form": form})

        # Log audit
        audit_metadata = {
            "survey_template": survey_template.name,
            "phone_number": phone_number,
            "recipient_name": recipient_name,
            "custom_message": custom_message,
        }
        AuditService.log_event(
            event_type="survey_sent_manually_phone",
            description=f"Survey sent manually to phone {phone_number} by {user.get_full_name()}",
            user=user,
            content_object=survey_instance,
            metadata=audit_metadata,
        )

        # Prefer the entered name in the confirmation, else show the number.
        display_name = recipient_name or phone_number
        messages.success(
            request, f"Survey sent successfully to {display_name} via SMS. Survey ID: {survey_instance.id}"
        )
        return redirect("surveys:instance_detail", pk=survey_instance.pk)

    except Exception as e:
        import logging

        logger = logging.getLogger(__name__)
        logger.error(f"Error sending survey to phone: {str(e)}", exc_info=True)
        messages.error(request, f"Error sending survey: {str(e)}")
        return render(request, page, {"form": form})
|
|
|
|
|
|
@block_source_user
@login_required
def manual_survey_send_csv(request):
    """
    Bulk-send a survey by SMS to the phone numbers listed in an uploaded CSV.

    CSV format (the header row and the name column are both optional)::

        phone_number,name
        +966501234567,John Doe
        +966501234568,Jane Smith

    Behaviour:
    - Only PX admins and hospital admins may use the view; other users are
      redirected to the survey instance list.
    - Blank lines are ignored wherever they occur, including before the
      header or first data row.
    - The first non-blank row is treated as a header when its first column
      is one of: phone, phone_number, mobile, number, tel.
    - Rows whose first column is blank are ignored.
    - Spaces, dashes and parentheses are stripped from each number; numbers
      that do not start with "+" are counted as failures.
    - One ``SurveyInstance`` is created per valid row and handed to
      ``SurveyDeliveryService.deliver_survey``; instances whose delivery
      reports failure are deleted.
    - A single audit event summarises the whole batch.

    Returns a redirect to the instance list when at least one survey was
    sent, otherwise re-renders the upload form with error messages.
    """
    import csv
    import io
    import logging

    logger = logging.getLogger(__name__)
    page = "surveys/manual_send_csv.html"
    header_names = {"phone", "phone_number", "mobile", "number", "tel"}
    # Deletes space, dash and parentheses in a single pass
    phone_noise = str.maketrans("", "", " -()")

    user = request.user

    # Check permission
    if not user.is_px_admin() and not user.is_hospital_admin():
        messages.error(request, "You don't have permission to send surveys manually.")
        return redirect("surveys:instance_list")

    if request.method != "POST":
        return render(request, page, {"form": BulkCSVSurveySendForm(user)})

    form = BulkCSVSurveySendForm(user, request.POST, request.FILES)
    if not form.is_valid():
        return render(request, page, {"form": form})

    try:
        survey_template = form.cleaned_data["survey_template"]
        csv_file = form.cleaned_data["csv_file"]
        custom_message = form.cleaned_data.get("custom_message", "")

        # "utf-8-sig" drops a leading byte-order mark if the file has one
        decoded_file = csv_file.read().decode("utf-8-sig")

        # csv.reader yields [] for a blank line; dropping those up front means
        # a file that merely starts with an empty line is not reported as empty.
        parsed_rows = [row for row in csv.reader(io.StringIO(decoded_file)) if row]
        if not parsed_rows:
            messages.error(request, "CSV file is empty.")
            return render(request, page, {"form": form})

        # Skip the header row if one is present
        if parsed_rows[0][0].strip().lower() in header_names:
            parsed_rows = parsed_rows[1:]

        # Keep only rows that actually carry something in the phone column
        rows = [row for row in parsed_rows if row[0].strip()]
        if not rows:
            messages.error(request, "No valid phone numbers found in CSV.")
            return render(request, page, {"form": form})

        from .models import SurveyStatus

        success_count = 0
        failed_count = 0
        failed_numbers = []
        created_instances = []

        for row in rows:
            # Derived before the try block so the failure report below always
            # names the current row's number, never a value left over from
            # an earlier iteration.
            phone_number = row[0].strip().translate(phone_noise)
            recipient_name = row[1].strip() if len(row) > 1 else ""

            try:
                logger.info(f"Processing row: phone={phone_number}, name={recipient_name}")

                # Skip empty or invalid
                if not phone_number or not phone_number.startswith("+"):
                    failed_count += 1
                    failed_numbers.append(f"{phone_number} (invalid format - must start with +)")
                    logger.warning(f"Invalid phone format: {phone_number}")
                    continue

                # Create survey instance
                survey_instance = SurveyInstance.objects.create(
                    survey_template=survey_template,
                    hospital=survey_template.hospital,
                    delivery_channel="sms",
                    recipient_phone=phone_number,
                    status=SurveyStatus.SENT,
                    metadata={
                        "sent_manually": True,
                        "sent_by": str(user.id),
                        "custom_message": custom_message,
                        "recipient_name": recipient_name,
                        "recipient_type": "csv_upload",
                        "csv_row": row,
                    },
                )
                logger.info(f"Created survey instance: {survey_instance.id}")

                # Send survey
                success = SurveyDeliveryService.deliver_survey(survey_instance)
                logger.info(f"Survey delivery result for {phone_number}: success={success}")

                if success:
                    success_count += 1
                    created_instances.append(survey_instance)
                else:
                    failed_count += 1
                    failed_numbers.append(f"{phone_number} (delivery failed - check logs)")
                    survey_instance.delete()

            except Exception as e:
                # One bad row must not abort the batch: record it and move on.
                # NOTE(review): if deliver_survey() raises, the instance created
                # above stays in the database with status SENT even though the
                # row is reported as failed - confirm whether it should be
                # deleted here, as is done when delivery returns a falsy result.
                failed_count += 1
                failed_numbers.append(f"{phone_number} (exception: {str(e)})")
                logger.error(f"Exception processing row {row}: {str(e)}", exc_info=True)

        # Log audit for bulk operation
        AuditService.log_event(
            event_type="survey_sent_manually_csv",
            description=f"Bulk survey send via CSV: {success_count} successful, {failed_count} failed by {user.get_full_name()}",
            user=user,
            metadata={
                "survey_template": survey_template.name,
                "success_count": success_count,
                "failed_count": failed_count,
                "total_count": len(rows),
                "custom_message": custom_message,
            },
        )

        # Show results
        if success_count > 0 and failed_count == 0:
            messages.success(request, f"Successfully sent {success_count} surveys from CSV.")
        elif success_count > 0 and failed_count > 0:
            # Only the first five failures are listed to keep the banner short
            preview = ", ".join(failed_numbers[:5])
            more_marker = "..." if len(failed_numbers) > 5 else ""
            messages.warning(
                request,
                f"Sent {success_count} surveys successfully. {failed_count} failed. Check failed numbers: {preview}{more_marker}",
            )
        else:
            messages.error(
                request, f"All {failed_count} surveys failed to send. Check the phone numbers and try again."
            )

        # Go to the list view when something was sent; otherwise stay on the form
        if created_instances:
            return redirect("surveys:instance_list")
        return render(request, page, {"form": form})

    except Exception as e:
        logger.error(f"Error processing CSV upload: {str(e)}", exc_info=True)
        messages.error(request, f"Error processing CSV: {str(e)}")
        return render(request, page, {"form": form})
|
|
|
|
|
|
@block_source_user
@login_required
def survey_analytics_reports(request):
    """
    List the survey analytics report files found in the reports directory.

    The directory is ``settings.SURVEY_REPORTS_DIR`` (default ``"reports"``).
    Every regular file directly inside it is listed with its type (derived
    from the ``.json`` / ``.html`` / ``.md`` suffix, otherwise ``"unknown"``),
    its size and its timestamps, ordered by ``st_ctime`` descending. Counts
    per type are passed to the template under ``stats``.

    Only PX admins and hospital admins may view the page; other users are
    redirected to the survey instance list.
    """
    import os
    from django.conf import settings

    user = request.user

    # Permission gate - admins and hospital admins only
    if not (user.is_px_admin() or user.is_hospital_admin()):
        messages.error(request, "You don't have permission to view analytics reports.")
        return redirect("surveys:instance_list")

    output_dir = getattr(settings, "SURVEY_REPORTS_DIR", "reports")

    suffix_labels = ((".json", "json"), (".html", "html"), (".md", "markdown"))

    def classify(name):
        """Map a file name to its report type label."""
        for suffix, label in suffix_labels:
            if name.endswith(suffix):
                return label
        return "unknown"

    # Collect one entry per regular file in the reports directory
    entries = os.listdir(output_dir) if os.path.exists(output_dir) else []
    collected = []
    for filename in entries:
        filepath = os.path.join(output_dir, filename)
        if not os.path.isfile(filepath):
            continue
        info = os.stat(filepath)
        collected.append(
            {
                "filename": filename,
                "type": classify(filename),
                "size": info.st_size,
                "size_human": _human_readable_size(info.st_size),
                "created": info.st_ctime,
                "created_date": datetime.fromtimestamp(info.st_ctime),
                "modified": info.st_mtime,
            }
        )

    # Newest first
    reports = sorted(collected, key=lambda entry: entry["created"], reverse=True)

    def count_of(kind):
        """Number of listed reports of the given type."""
        return sum(1 for entry in reports if entry["type"] == kind)

    context = {
        "reports": reports,
        "stats": {
            "total": len(reports),
            "json": count_of("json"),
            "html": count_of("html"),
            "markdown": count_of("markdown"),
        },
    }

    return render(request, "surveys/analytics_reports.html", context)
|
|
|
|
|
|
@block_source_user
@login_required
def survey_analytics_report_view(request, filename):
    """
    Show a single survey analytics report.

    HTML reports are read from disk and passed to
    ``surveys/analytics_report_view.html`` as ``content``. Every other file
    type is shown through ``surveys/analytics_report_info.html`` with
    metadata only.

    Args:
        request: The current request.
        filename: Report file name, resolved inside
            ``settings.SURVEY_REPORTS_DIR`` (default ``"reports"``).

    Returns:
        The rendered page, or a redirect to ``surveys:analytics_reports``
        with an error message when the user lacks permission, the path
        leaves the reports directory, or the file does not exist.
    """
    import os
    from django.conf import settings

    user = request.user

    # Check permission
    if not user.is_px_admin() and not user.is_hospital_admin():
        messages.error(request, "You don't have permission to view analytics reports.")
        return redirect("surveys:analytics_reports")

    output_dir = getattr(settings, "SURVEY_REPORTS_DIR", "reports")
    reports_root = os.path.abspath(output_dir)
    filepath = os.path.abspath(os.path.join(reports_root, filename))

    # Security check - the resolved path must sit strictly inside the reports
    # directory. The root is compared WITH a trailing separator: a bare
    # startswith(root) would also accept sibling paths such as "<root>_old/x".
    if not filepath.startswith(os.path.join(reports_root, "")):
        messages.error(request, "Invalid report file.")
        return redirect("surveys:analytics_reports")

    # isfile (not exists) so that sub-directories are not treated as reports
    if not os.path.isfile(filepath):
        messages.error(request, "Report file not found.")
        return redirect("surveys:analytics_reports")

    # Determine report type
    report_type = "unknown"
    if filename.endswith(".json"):
        report_type = "json"
    elif filename.endswith(".html"):
        report_type = "html"
    elif filename.endswith(".md"):
        report_type = "markdown"

    # Get file info
    stat = os.stat(filepath)
    context = {
        "filename": filename,
        "report_type": report_type,
        "size": stat.st_size,
        "size_human": _human_readable_size(stat.st_size),
        "created_date": datetime.fromtimestamp(stat.st_ctime),
        "modified_date": datetime.fromtimestamp(stat.st_mtime),
    }

    # For HTML files, hand the file content to the viewer template
    if report_type == "html":
        with open(filepath, "r", encoding="utf-8") as f:
            context["content"] = f.read()
        return render(request, "surveys/analytics_report_view.html", context)

    # For JSON, Markdown and anything else, show info and provide download
    return render(request, "surveys/analytics_report_info.html", context)
|
|
|
|
|
|
@block_source_user
@login_required
def survey_analytics_report_download(request, filename):
    """
    Download a survey analytics report file as an attachment.

    Args:
        request: The current request.
        filename: Report file name, resolved inside
            ``settings.SURVEY_REPORTS_DIR`` (default ``"reports"``).

    Returns:
        A ``FileResponse`` streaming the file, or a redirect to
        ``surveys:analytics_reports`` with an error message when the user
        lacks permission, the path leaves the reports directory, or the file
        does not exist.
    """
    import os
    from django.conf import settings
    from django.http import FileResponse

    user = request.user

    # Check permission
    if not user.is_px_admin() and not user.is_hospital_admin():
        messages.error(request, "You don't have permission to download analytics reports.")
        return redirect("surveys:analytics_reports")

    output_dir = getattr(settings, "SURVEY_REPORTS_DIR", "reports")
    reports_root = os.path.abspath(output_dir)
    filepath = os.path.abspath(os.path.join(reports_root, filename))

    # Security check - compare against the root WITH a trailing separator so
    # that sibling paths sharing the prefix (e.g. "<root>_old/x") are rejected.
    if not filepath.startswith(os.path.join(reports_root, "")):
        messages.error(request, "Invalid report file.")
        return redirect("surveys:analytics_reports")

    # isfile (not exists): opening a directory below would raise
    if not os.path.isfile(filepath):
        messages.error(request, "Report file not found.")
        return redirect("surveys:analytics_reports")

    # Determine content type from the file suffix
    content_type = "application/octet-stream"
    for suffix, mime in (
        (".html", "text/html"),
        (".json", "application/json"),
        (".md", "text/markdown"),
    ):
        if filename.endswith(suffix):
            content_type = mime
            break

    # Send file; the file handle is handed to FileResponse
    return FileResponse(open(filepath, "rb"), content_type=content_type, as_attachment=True, filename=filename)
|
|
|
|
|
|
@block_source_user
@login_required
def survey_analytics_report_view_inline(request, filename):
    """
    View a survey analytics report inline in the browser.

    Unlike the download view, HTML and JSON files are served without forcing
    a download, and Markdown files are rendered through
    ``surveys/analytics_report_markdown_view.html``. Any other file type
    falls back to an attachment download.

    Args:
        request: The current request.
        filename: Report file name, resolved inside
            ``settings.SURVEY_REPORTS_DIR`` (default ``"reports"``).

    Returns:
        The response for the file, or a redirect to
        ``surveys:analytics_reports`` with an error message when the user
        lacks permission, the path leaves the reports directory, or the file
        does not exist.
    """
    import os
    from django.conf import settings
    from django.http import FileResponse

    user = request.user

    # Check permission
    if not user.is_px_admin() and not user.is_hospital_admin():
        messages.error(request, "You don't have permission to view analytics reports.")
        return redirect("surveys:analytics_reports")

    output_dir = getattr(settings, "SURVEY_REPORTS_DIR", "reports")
    reports_root = os.path.abspath(output_dir)
    filepath = os.path.abspath(os.path.join(reports_root, filename))

    # Security check - compare against the root WITH a trailing separator so
    # that sibling paths sharing the prefix (e.g. "<root>_old/x") are rejected.
    if not filepath.startswith(os.path.join(reports_root, "")):
        messages.error(request, "Invalid report file.")
        return redirect("surveys:analytics_reports")

    # isfile (not exists): directories cannot be opened or rendered below
    if not os.path.isfile(filepath):
        messages.error(request, "Report file not found.")
        return redirect("surveys:analytics_reports")

    # HTML: serve inline, with a CSP that allows the report's inline scripts
    if filename.endswith(".html"):
        response = FileResponse(open(filepath, "rb"), content_type="text/html", as_attachment=False)
        response["Content-Security-Policy"] = (
            "default-src 'self' 'unsafe-inline' 'unsafe-eval' cdn.jsdelivr.net cdnjs.cloudflare.com; script-src 'self' 'unsafe-inline' 'unsafe-eval' cdn.jsdelivr.net; style-src 'self' 'unsafe-inline' cdn.jsdelivr.net cdnjs.cloudflare.com;"
        )
        return response

    # JSON: serve inline as-is
    if filename.endswith(".json"):
        return FileResponse(open(filepath, "rb"), content_type="application/json", as_attachment=False)

    # Markdown: render it in a template with viewer
    if filename.endswith(".md"):
        with open(filepath, "r", encoding="utf-8") as f:
            content = f.read()

        stat = os.stat(filepath)
        context = {
            "filename": filename,
            "content": content,
            "report_type": "markdown",
            "size": stat.st_size,
            "size_human": _human_readable_size(stat.st_size),
            "created_date": datetime.fromtimestamp(stat.st_ctime),
            "modified_date": datetime.fromtimestamp(stat.st_mtime),
        }
        return render(request, "surveys/analytics_report_markdown_view.html", context)

    # For other files, fallback to download
    return FileResponse(
        open(filepath, "rb"), content_type="application/octet-stream", as_attachment=True, filename=filename
    )
|
|
|
|
|
|
@block_source_user
@login_required
@require_http_methods(["POST"])
def survey_analytics_report_delete(request, filename):
    """
    Delete a survey analytics report file.

    Only PX admins may delete reports, and only via POST.

    Args:
        request: The current request.
        filename: Report file name, resolved inside
            ``settings.SURVEY_REPORTS_DIR`` (default ``"reports"``).

    Returns:
        Always a redirect to ``surveys:analytics_reports``; the outcome is
        reported through the messages framework.
    """
    import os
    from django.conf import settings

    user = request.user

    # Check permission - only PX admins can delete reports
    if not user.is_px_admin():
        messages.error(request, "You don't have permission to delete analytics reports.")
        return redirect("surveys:analytics_reports")

    output_dir = getattr(settings, "SURVEY_REPORTS_DIR", "reports")
    reports_root = os.path.abspath(output_dir)
    filepath = os.path.abspath(os.path.join(reports_root, filename))

    # Security check - this guards a destructive operation, so the path must
    # sit strictly inside the reports directory. The root is compared WITH a
    # trailing separator: a bare startswith(root) would also accept sibling
    # paths such as "<root>_old/x".
    if not filepath.startswith(os.path.join(reports_root, "")):
        messages.error(request, "Invalid report file.")
        return redirect("surveys:analytics_reports")

    # isfile (not exists): os.remove cannot delete directories
    if not os.path.isfile(filepath):
        messages.error(request, "Report file not found.")
        return redirect("surveys:analytics_reports")

    try:
        os.remove(filepath)
    except OSError as e:
        messages.error(request, f"Error deleting report: {str(e)}")
    else:
        messages.success(request, f"Report '{filename}' deleted successfully.")

    return redirect("surveys:analytics_reports")
|
|
|
|
|
|
def _human_readable_size(size_bytes):
|
|
"""Convert bytes to human readable format"""
|
|
for unit in ["B", "KB", "MB", "GB"]:
|
|
if size_bytes < 1024.0:
|
|
return f"{size_bytes:.2f} {unit}"
|
|
size_bytes /= 1024.0
|
|
return f"{size_bytes:.2f} TB"
|
|
|
|
|
|
# ============================================================================
|
|
# ENHANCED SURVEY REPORTS - Separate reports per survey type
|
|
# ============================================================================
|
|
|
|
|
|
@block_source_user
@login_required
def enhanced_survey_reports_list(request):
    """
    List the enhanced survey report sets.

    A report set is a sub-directory of ``settings.SURVEY_REPORTS_DIR``
    (default ``"reports"``) whose name starts with ``reports_`` and which
    contains an ``index.html``. Sets are listed in reverse name order with
    their timestamps, the number of HTML reports other than the index, and
    the combined size of the files directly inside the directory.

    Only PX admins and hospital admins may view the page.
    """
    from django.conf import settings
    import os

    user = request.user

    # Permission gate
    if not (user.is_px_admin() or user.is_hospital_admin()):
        messages.error(request, "You don't have permission to view analytics reports.")
        return redirect("surveys:analytics_reports")

    output_dir = getattr(settings, "SURVEY_REPORTS_DIR", "reports")

    candidates = sorted(os.listdir(output_dir), reverse=True) if os.path.exists(output_dir) else []

    report_sets = []
    for item in candidates:
        item_path = os.path.join(output_dir, item)

        # Only directories named "reports_*" qualify as report sets
        if not (os.path.isdir(item_path) and item.startswith("reports_")):
            continue

        # A set without an index page is not listed
        if not os.path.exists(os.path.join(item_path, "index.html")):
            continue

        dir_stat = os.stat(item_path)

        # Individual reports are the HTML files other than the index
        report_count = 0
        for name in os.listdir(item_path):
            if name.endswith(".html") and name != "index.html":
                report_count += 1

        created = datetime.fromtimestamp(dir_stat.st_ctime)
        modified = datetime.fromtimestamp(dir_stat.st_mtime)

        # Total size of the regular files directly inside the directory
        total_size = 0
        for name in os.listdir(item_path):
            member = os.path.join(item_path, name)
            if os.path.isfile(member):
                total_size += os.path.getsize(member)

        report_sets.append(
            {
                "dir_name": item,
                "created": created,
                "modified": modified,
                "report_count": report_count,
                "size": total_size,
            }
        )

    return render(request, "surveys/enhanced_reports_list.html", {"report_sets": report_sets})
|
|
|
|
|
|
@block_source_user
@login_required
def enhanced_survey_report_view(request, dir_name):
    """
    View the index page of an enhanced report set.

    Args:
        request: The current request.
        dir_name: Name of the report-set directory, resolved inside
            ``settings.SURVEY_REPORTS_DIR`` (default ``"reports"``).

    Returns:
        An ``HttpResponse`` with the content of the set's ``index.html``, or
        a redirect with an error message when the user lacks permission, the
        path leaves the reports directory, or the directory or its index
        file is missing.
    """
    from django.conf import settings
    import os

    user = request.user

    # Check permission
    if not user.is_px_admin() and not user.is_hospital_admin():
        messages.error(request, "You don't have permission to view analytics reports.")
        return redirect("surveys:analytics_reports")

    output_dir = getattr(settings, "SURVEY_REPORTS_DIR", "reports")
    reports_root = os.path.abspath(output_dir)
    dir_path = os.path.abspath(os.path.join(reports_root, dir_name))

    # Security check - the directory must sit strictly inside the reports
    # root. Comparing against the root WITH a trailing separator rejects both
    # sibling paths sharing the prefix ("<root>_old") and the root itself.
    if not dir_path.startswith(os.path.join(reports_root, "")):
        messages.error(request, "Invalid report directory.")
        return redirect("surveys:enhanced_reports_list")

    if not os.path.isdir(dir_path):
        messages.error(request, "Report directory not found.")
        return redirect("surveys:enhanced_reports_list")

    # Serve the index.html file
    index_path = os.path.join(dir_path, "index.html")
    if os.path.isfile(index_path):
        with open(index_path, "r", encoding="utf-8") as f:
            content = f.read()
        return HttpResponse(content, content_type="text/html")

    messages.error(request, "Index file not found.")
    return redirect("surveys:enhanced_reports_list")
|
|
|
|
|
|
@block_source_user
@login_required
def enhanced_survey_report_file(request, dir_name, filename):
    """
    View/serve an individual file from an enhanced report set.

    HTML files are returned inline with a Content-Security-Policy header
    that permits inline scripts and the listed CDN hosts; JSON files are
    streamed as ``application/json``; anything else is sent as an attachment.

    Args:
        request: The current request.
        dir_name: Name of the report-set directory, resolved inside
            ``settings.SURVEY_REPORTS_DIR`` (default ``"reports"``).
        filename: Name of the file inside that directory.

    Returns:
        The response for the file, or a redirect with an error message when
        the user lacks permission, either path component escapes its parent,
        or the file does not exist.
    """
    from django.conf import settings
    import os
    from django.http import FileResponse

    user = request.user

    # Check permission
    if not user.is_px_admin() and not user.is_hospital_admin():
        messages.error(request, "You don't have permission to view analytics reports.")
        return redirect("surveys:analytics_reports")

    output_dir = getattr(settings, "SURVEY_REPORTS_DIR", "reports")
    reports_root = os.path.abspath(output_dir)
    dir_path = os.path.abspath(os.path.join(reports_root, dir_name))
    filepath = os.path.abspath(os.path.join(dir_path, filename))

    # Security check - both levels are validated. Checking the file only
    # against dir_path is not enough, because dir_name itself could point
    # outside the reports root. Each parent is compared WITH a trailing
    # separator so sibling paths sharing a prefix are rejected too.
    dir_is_inside_root = dir_path.startswith(os.path.join(reports_root, ""))
    file_is_inside_dir = filepath.startswith(os.path.join(dir_path, ""))
    if not (dir_is_inside_root and file_is_inside_dir):
        messages.error(request, "Invalid file path.")
        return redirect("surveys:enhanced_reports_list")

    # isfile (not exists): directories cannot be opened below
    if not os.path.isfile(filepath):
        messages.error(request, "File not found.")
        return redirect("surveys:enhanced_reports_list")

    # Serve file based on type
    if filename.endswith(".html"):
        with open(filepath, "r", encoding="utf-8") as f:
            content = f.read()
        response = HttpResponse(content, content_type="text/html")
        # Allow inline scripts/CDN resources
        response["Content-Security-Policy"] = (
            "default-src 'self' 'unsafe-inline' 'unsafe-eval' cdn.jsdelivr.net cdnjs.cloudflare.com unpkg.com; script-src 'self' 'unsafe-inline' 'unsafe-eval' cdn.jsdelivr.net unpkg.com; style-src 'self' 'unsafe-inline' cdn.jsdelivr.net cdnjs.cloudflare.com fonts.googleapis.com; font-src 'self' fonts.gstatic.com;"
        )
        return response

    if filename.endswith(".json"):
        return FileResponse(open(filepath, "rb"), content_type="application/json")

    return FileResponse(open(filepath, "rb"), as_attachment=True)
|
|
|
|
|
|
@block_source_user
@login_required
def generate_enhanced_report_ui(request):
    """
    UI view to generate enhanced survey reports from a form.

    GET renders the form with the active survey templates ordered by name.
    POST reads ``template``, ``start_date`` and ``end_date`` (``YYYY-MM-DD``)
    from the submitted data, calls ``generate_enhanced_survey_reports`` and
    redirects to the newly created report set. An unknown template id is
    ignored, i.e. no template filter is applied. Any error during generation
    is reported through the messages framework and the user is sent back to
    the enhanced reports list.

    Only PX admins and hospital admins may use the view.
    """
    # Imported locally, as the other report views in this module do: the
    # redirect below needs os.path.basename, and without this import the
    # resulting NameError was swallowed by the broad except, turning every
    # successful generation into an error message.
    import os

    from apps.surveys.models import SurveyTemplate
    from django.conf import settings

    user = request.user

    # Check permission
    if not user.is_px_admin() and not user.is_hospital_admin():
        messages.error(request, "You don't have permission to generate analytics reports.")
        return redirect("surveys:analytics_reports")

    if request.method == "POST":
        template_id = request.POST.get("template")
        start_date = request.POST.get("start_date")
        end_date = request.POST.get("end_date")

        try:
            # Get template name if specified
            template_name = None
            if template_id:
                try:
                    template_name = SurveyTemplate.objects.get(id=template_id).name
                except SurveyTemplate.DoesNotExist:
                    # Unknown id: fall through with no template filter
                    pass

            # Parse dates; empty fields mean "no bound"
            start_dt = datetime.strptime(start_date, "%Y-%m-%d").date() if start_date else None
            end_dt = datetime.strptime(end_date, "%Y-%m-%d").date() if end_date else None

            # Generate enhanced reports
            from .analytics_utils import generate_enhanced_survey_reports

            result = generate_enhanced_survey_reports(template_name=template_name, start_date=start_dt, end_date=end_dt)

            messages.success(request, f"Generated {len(result['individual_reports'])} reports successfully!")

            # Redirect to the report directory
            dir_name = os.path.basename(result["reports_dir"])
            return redirect("surveys:enhanced_report_view", dir_name=dir_name)

        except Exception as e:
            messages.error(request, f"Error generating reports: {str(e)}")
            return redirect("surveys:enhanced_reports_list")

    # GET request - show form
    templates = SurveyTemplate.objects.filter(is_active=True).order_by("name")

    context = {
        "templates": templates,
    }

    return render(request, "surveys/generate_enhanced_report.html", context)
|