HH/apps/analytics/ui_views.py
2026-01-05 19:40:24 +03:00

484 lines
16 KiB
Python

"""
Analytics Console UI views
"""
from datetime import datetime
from django.contrib.auth.decorators import login_required
from django.core.paginator import Paginator
from django.db.models import Avg, Count, F, Value
from django.db.models.functions import Concat
from django.http import JsonResponse
from django.shortcuts import render
from apps.complaints.models import Complaint
from apps.organizations.models import Department, Hospital
from apps.px_action_center.models import PXAction
from apps.surveys.models import SurveyInstance
from apps.physicians.models import PhysicianMonthlyRating
from .models import KPI, KPIValue
from .services import UnifiedAnalyticsService, ExportService
@login_required
def analytics_dashboard(request):
    """
    Render the analytics dashboard with KPI counters and department rankings.

    Query parameters:
        hospital: optional hospital id used to scope the metrics.

    Users who are not PX admins and are attached to a hospital are always
    scoped to their own hospital; the ``hospital`` parameter is ignored for
    them so it cannot be used to read another hospital's figures.

    Template context:
        kpis: dict of complaint, action and survey counters.
        department_rankings: top 5 active departments by average survey score.
        hospitals: hospitals offered in the filter dropdown.
        selected_hospital: hospital the metrics are scoped to, or None.
    """
    user = request.user
    restricted_to_own_hospital = not user.is_px_admin() and bool(user.hospital)

    # Resolve the hospital scope. Restricted users never leave their hospital.
    hospital_filter = request.GET.get('hospital')
    if restricted_to_own_hospital:
        hospital = user.hospital
    elif hospital_filter:
        hospital = Hospital.objects.filter(id=hospital_filter).first()
    elif user.hospital:
        hospital = user.hospital
    else:
        hospital = None

    # Base querysets for the KPI counters
    complaints_queryset = Complaint.objects.all()
    actions_queryset = PXAction.objects.all()
    surveys_queryset = SurveyInstance.objects.filter(status='completed')
    if hospital:
        complaints_queryset = complaints_queryset.filter(hospital=hospital)
        actions_queryset = actions_queryset.filter(hospital=hospital)
        surveys_queryset = surveys_queryset.filter(survey_template__hospital=hospital)

    # KPI calculations
    kpis = {
        'total_complaints': complaints_queryset.count(),
        'open_complaints': complaints_queryset.filter(status='open').count(),
        'overdue_complaints': complaints_queryset.filter(is_overdue=True).count(),
        'total_actions': actions_queryset.count(),
        'open_actions': actions_queryset.filter(status='open').count(),
        'overdue_actions': actions_queryset.filter(is_overdue=True).count(),
        # Avg() yields None when there are no completed surveys
        'avg_survey_score': surveys_queryset.aggregate(avg=Avg('total_score'))['avg'] or 0,
        'negative_surveys': surveys_queryset.filter(is_negative=True).count(),
    }

    # Department rankings (top 5 by survey score), scoped like the KPIs.
    # The hospital filter must be applied before slicing.
    ranked_departments = Department.objects.filter(status='active')
    if hospital:
        ranked_departments = ranked_departments.filter(hospital=hospital)
    department_rankings = ranked_departments.annotate(
        avg_score=Avg('journey_stages__survey_instance__total_score'),
        survey_count=Count('journey_stages__survey_instance'),
    ).filter(
        survey_count__gt=0
    ).order_by('-avg_score')[:5]

    # Hospitals offered in the filter dropdown
    hospitals = Hospital.objects.filter(status='active')
    if restricted_to_own_hospital:
        hospitals = hospitals.filter(id=user.hospital.id)

    context = {
        'kpis': kpis,
        'department_rankings': department_rankings,
        'hospitals': hospitals,
        'selected_hospital': hospital,
    }
    return render(request, 'analytics/dashboard.html', context)
@login_required
def kpi_list(request):
    """
    Render the paginated list of KPI definitions.

    Query parameters:
        category: optional exact match on ``KPI.category``.
        is_active: ``'true'`` or ``'false'``; any other value applies no filter.
        page_size: items per page. Missing, non-numeric, zero or negative
            values fall back to 25.
        page: page number to display (defaults to 1).

    Template context:
        page_obj: the selected paginator page.
        kpis: the KPI objects on that page.
        filters: the raw GET parameters, for re-populating the filter form.
    """
    default_page_size = 25
    queryset = KPI.objects.all()

    # Apply filters
    category_filter = request.GET.get('category')
    if category_filter:
        queryset = queryset.filter(category=category_filter)

    is_active = request.GET.get('is_active')
    if is_active == 'true':
        queryset = queryset.filter(is_active=True)
    elif is_active == 'false':
        queryset = queryset.filter(is_active=False)

    # Ordering
    queryset = queryset.order_by('category', 'name')

    # Pagination: page_size is user input, so a bad value must not surface
    # as a server error (int() failure, or division by zero in Paginator).
    try:
        page_size = int(request.GET.get('page_size', default_page_size))
    except (TypeError, ValueError):
        page_size = default_page_size
    if page_size < 1:
        page_size = default_page_size

    paginator = Paginator(queryset, page_size)
    page_number = request.GET.get('page', 1)
    page_obj = paginator.get_page(page_number)

    context = {
        'page_obj': page_obj,
        'kpis': page_obj.object_list,
        'filters': request.GET,
    }
    return render(request, 'analytics/kpi_list.html', context)
@login_required
def command_center(request):
    """
    Render the PX Command Center page with its initial KPI set.

    Query parameters (all optional):
        date_range: range identifier passed to the analytics service
            (defaults to ``'30d'``).
        hospital, department: ids used to scope the KPIs.
        kpi_category: KPI category forwarded to the analytics service.
        custom_start, custom_end: ``YYYY-MM-DD`` dates. They are used only
            when both are present and both parse; otherwise no custom range
            is passed.

    Template context:
        filters: the filter values as received.
        hospitals: active hospitals the user may choose from.
        departments: active departments for the selected/own hospital.
        kpis: result of ``UnifiedAnalyticsService.get_all_kpis``.
    """
    user = request.user

    # Get filter parameters
    filters = {
        'date_range': request.GET.get('date_range', '30d'),
        'hospital': request.GET.get('hospital', ''),
        'department': request.GET.get('department', ''),
        'kpi_category': request.GET.get('kpi_category', ''),
        'custom_start': request.GET.get('custom_start', ''),
        'custom_end': request.GET.get('custom_end', ''),
    }

    # Users who are not PX admins only see their own hospital.
    restricted_to_own_hospital = not user.is_px_admin() and bool(user.hospital)

    # Get hospitals for filter
    hospitals = Hospital.objects.filter(status='active')
    if restricted_to_own_hospital:
        hospitals = hospitals.filter(id=user.hospital.id)

    # Get departments for filter
    departments = Department.objects.filter(status='active')
    if filters['hospital']:
        departments = departments.filter(hospital_id=filters['hospital'])
    elif restricted_to_own_hospital:
        departments = departments.filter(hospital=user.hospital)

    # Parse the custom range. Malformed dates are ignored instead of raising,
    # matching the behaviour of the Command Center API endpoint.
    # NOTE(review): strptime yields naive datetimes at midnight - confirm the
    # service treats custom_end as inclusive and handles time zones.
    custom_start = None
    custom_end = None
    if filters['custom_start'] and filters['custom_end']:
        try:
            custom_start = datetime.strptime(filters['custom_start'], '%Y-%m-%d')
            custom_end = datetime.strptime(filters['custom_end'], '%Y-%m-%d')
        except ValueError:
            custom_start = None
            custom_end = None

    # Get initial KPIs. Empty filter strings are passed as None.
    kpis = UnifiedAnalyticsService.get_all_kpis(
        user=user,
        date_range=filters['date_range'],
        hospital_id=filters['hospital'] or None,
        department_id=filters['department'] or None,
        # Forward the category so the first render matches the API response
        kpi_category=filters['kpi_category'] or None,
        custom_start=custom_start,
        custom_end=custom_end,
    )

    context = {
        'filters': filters,
        'hospitals': hospitals,
        'departments': departments,
        'kpis': kpis,
    }
    return render(request, 'analytics/command_center.html', context)
@login_required
def command_center_api(request):
    """
    JSON endpoint feeding the Command Center dashboard.

    Only GET is accepted; any other method gets a 405 JSON error.

    Query parameters (all optional):
        date_range: range identifier for the analytics service
            (defaults to ``'30d'``).
        hospital, department: ids used to scope KPIs, charts and tables.
        kpi_category: KPI category forwarded to the analytics service.
        custom_start, custom_end: ``YYYY-MM-DD`` dates, used only when both
            are present and both parse.

    Returns:
        JsonResponse with three keys:
        ``kpis`` - KPI dict with numeric entries normalised for JSON,
        ``charts`` - one entry per chart type,
        ``tables`` - ``overdue_complaints`` (up to 20 rows, earliest due
        first) and ``physician_leaderboard``.
    """
    if request.method != 'GET':
        return JsonResponse({'error': 'Only GET requests allowed'}, status=405)

    user = request.user

    # Get filter parameters; empty id strings are treated as "no filter".
    date_range = request.GET.get('date_range', '30d')
    hospital_id = request.GET.get('hospital') or None
    department_id = request.GET.get('department') or None
    kpi_category = request.GET.get('kpi_category')
    custom_start_str = request.GET.get('custom_start')
    custom_end_str = request.GET.get('custom_end')

    # Parse custom dates. If either bound is malformed, drop the whole range
    # so the service never receives a start without an end.
    # NOTE(review): strptime yields naive datetimes at midnight - confirm the
    # service treats custom_end as inclusive and handles time zones.
    custom_start = None
    custom_end = None
    if custom_start_str and custom_end_str:
        try:
            custom_start = datetime.strptime(custom_start_str, '%Y-%m-%d')
            custom_end = datetime.strptime(custom_end_str, '%Y-%m-%d')
        except ValueError:
            custom_start = None
            custom_end = None

    # Get KPIs
    kpis = UnifiedAnalyticsService.get_all_kpis(
        user=user,
        date_range=date_range,
        hospital_id=hospital_id,
        department_id=department_id,
        kpi_category=kpi_category,
        custom_start=custom_start,
        custom_end=custom_end,
    )

    # Ensure numeric KPIs are plain Python numbers for JSON serialization
    numeric_kpis = [
        'total_complaints', 'open_complaints', 'overdue_complaints',
        'high_severity_complaints', 'resolved_complaints',
        'total_actions', 'open_actions', 'overdue_actions', 'escalated_actions', 'resolved_actions',
        'total_surveys', 'negative_surveys', 'avg_survey_score',
        'negative_social_mentions', 'low_call_ratings', 'total_sentiment_analyses',
    ]
    for key in numeric_kpis:
        if key not in kpis:
            continue
        value = kpis[key]
        fallback = 0.0 if key == 'avg_survey_score' else 0
        if value is None:
            kpis[key] = fallback
        elif isinstance(value, (int, float)):
            # Already a number; only the average is forced to float
            if key == 'avg_survey_score':
                kpis[key] = float(value)
        else:
            # Any other type: try to convert to a number
            try:
                kpis[key] = float(value)
            except (ValueError, TypeError):
                kpis[key] = fallback

    # Handle nested trend data. "or 0" also covers keys that are present
    # with a None value, which int()/float() would reject.
    if 'complaints_trend' in kpis and isinstance(kpis['complaints_trend'], dict):
        trend = kpis['complaints_trend']
        trend['current'] = int(trend.get('current') or 0)
        trend['previous'] = int(trend.get('previous') or 0)
        trend['percentage_change'] = float(trend.get('percentage_change') or 0)

    # Get chart data
    chart_types = [
        'complaints_trend',
        'complaints_by_category',
        'survey_satisfaction_trend',
        'survey_distribution',
        'department_performance',
        'physician_leaderboard',
    ]
    charts = {
        chart_type: UnifiedAnalyticsService.get_chart_data(
            user=user,
            chart_type=chart_type,
            date_range=date_range,
            hospital_id=hospital_id,
            department_id=department_id,
            custom_start=custom_start,
            custom_end=custom_end,
        )
        for chart_type in chart_types
    }

    # Get table data
    tables = {}

    # Overdue complaints table
    complaints_qs = Complaint.objects.filter(is_overdue=True)
    if hospital_id:
        complaints_qs = complaints_qs.filter(hospital_id=hospital_id)
    if department_id:
        complaints_qs = complaints_qs.filter(department_id=department_id)

    # Apply role-based filtering on top of the requested filters
    if not user.is_px_admin() and user.hospital:
        complaints_qs = complaints_qs.filter(hospital=user.hospital)
        if user.is_department_manager() and user.department:
            complaints_qs = complaints_qs.filter(department=user.department)

    # values() reads the related names through explicit joins, so no
    # select_related() is needed; the slice is applied last.
    tables['overdue_complaints'] = list(
        complaints_qs.order_by('due_at').values(
            'id',
            'title',
            'severity',
            'due_at',
            hospital_name=F('hospital__name'),
            department_name=F('department__name'),
            patient_name=Concat('patient__first_name', Value(' '), 'patient__last_name'),
        )[:20]
    )

    # Physician leaderboard table, built from the chart's metadata rows.
    # Tolerate a missing/None chart payload or metadata list.
    physician_data = (charts.get('physician_leaderboard') or {}).get('metadata') or []
    tables['physician_leaderboard'] = [
        {
            'physician_id': p['physician_id'],
            'name': p['name'],
            'specialization': p['specialization'],
            'department': p['department'],
            'rating': float(p['rating']) if p['rating'] is not None else 0.0,
            'surveys': int(p['surveys']) if p['surveys'] is not None else 0,
            'positive': int(p['positive']) if p['positive'] is not None else 0,
            'neutral': int(p['neutral']) if p['neutral'] is not None else 0,
            'negative': int(p['negative']) if p['negative'] is not None else 0,
        }
        for p in physician_data
    ]

    return JsonResponse({
        'kpis': kpis,
        'charts': charts,
        'tables': tables,
    })
@login_required
def export_command_center(request, export_format):
    """
    Export Command Center data to Excel or PDF.

    Args:
        request: the HTTP request; filters are read from its query string
            (``date_range``, ``hospital``, ``department``, ``kpi_category``,
            ``custom_start``, ``custom_end``). The custom dates use
            ``YYYY-MM-DD`` and apply only when both are present and parse.
        export_format: ``'excel'`` or ``'pdf'``.

    Returns:
        The response produced by ``ExportService.export_to_excel`` or
        ``ExportService.export_to_pdf``, or a 400 JSON error for any other
        ``export_format``.
    """
    if export_format not in ['excel', 'pdf']:
        return JsonResponse({'error': 'Invalid export format'}, status=400)

    user = request.user

    # Get filter parameters; empty id strings are treated as "no filter".
    date_range = request.GET.get('date_range', '30d')
    hospital_id = request.GET.get('hospital') or None
    department_id = request.GET.get('department') or None
    kpi_category = request.GET.get('kpi_category')
    custom_start_str = request.GET.get('custom_start')
    custom_end_str = request.GET.get('custom_end')

    # Parse custom dates. If either bound is malformed, drop the whole range
    # so the service never receives a start without an end.
    custom_start = None
    custom_end = None
    if custom_start_str and custom_end_str:
        try:
            custom_start = datetime.strptime(custom_start_str, '%Y-%m-%d')
            custom_end = datetime.strptime(custom_end_str, '%Y-%m-%d')
        except ValueError:
            custom_start = None
            custom_end = None

    # Get all data
    kpis = UnifiedAnalyticsService.get_all_kpis(
        user=user,
        date_range=date_range,
        hospital_id=hospital_id,
        department_id=department_id,
        kpi_category=kpi_category,
        custom_start=custom_start,
        custom_end=custom_end,
    )

    chart_types = [
        'complaints_trend',
        'complaints_by_category',
        'survey_satisfaction_trend',
        'survey_distribution',
        'department_performance',
        'physician_leaderboard',
    ]
    charts = {
        chart_type: UnifiedAnalyticsService.get_chart_data(
            user=user,
            chart_type=chart_type,
            date_range=date_range,
            hospital_id=hospital_id,
            department_id=department_id,
            custom_start=custom_start,
            custom_end=custom_end,
        )
        for chart_type in chart_types
    }

    # Get table data
    tables = {}

    # Overdue complaints, scoped by the requested filters and the user's role
    complaints_qs = Complaint.objects.filter(is_overdue=True)
    if hospital_id:
        complaints_qs = complaints_qs.filter(hospital_id=hospital_id)
    if department_id:
        complaints_qs = complaints_qs.filter(department_id=department_id)
    if not user.is_px_admin() and user.hospital:
        complaints_qs = complaints_qs.filter(hospital=user.hospital)
        if user.is_department_manager() and user.department:
            complaints_qs = complaints_qs.filter(department=user.department)

    # Column order of values_list() must match the headers below.
    # The slice is applied last, after annotation and ordering.
    overdue_rows = complaints_qs.annotate(
        patient_name=Concat('patient__first_name', Value(' '), 'patient__last_name'),
        hospital_name=F('hospital__name'),
        department_name=F('department__name'),
    ).order_by('due_at').values_list(
        'id',
        'title',
        'patient_name',
        'severity',
        'hospital_name',
        'department_name',
        'due_at',
    )[:100]
    tables['overdue_complaints'] = {
        'headers': ['ID', 'Title', 'Patient', 'Severity', 'Hospital', 'Department', 'Due Date'],
        'rows': list(overdue_rows),
    }

    # Physician leaderboard, built from the chart's metadata rows.
    # Tolerate a missing/None chart payload or metadata list.
    physician_data = (charts.get('physician_leaderboard') or {}).get('metadata') or []
    count_fields = ('surveys', 'positive', 'neutral', 'negative')
    tables['physician_leaderboard'] = {
        'headers': ['Name', 'Specialization', 'Department', 'Rating', 'Surveys', 'Positive', 'Neutral', 'Negative'],
        'rows': [
            [
                p['name'],
                p['specialization'],
                p['department'],
                # Missing numbers are exported as zero, not the text "None"
                str(p['rating'] if p['rating'] is not None else 0.0),
                *(str(p[field] if p[field] is not None else 0) for field in count_fields),
            ]
            for p in physician_data
        ],
    }

    # Prepare export data
    export_data = ExportService.prepare_dashboard_data(
        user=user,
        kpis=kpis,
        charts=charts,
        tables=tables,
    )

    # export_format was validated above, so anything that is not 'excel'
    # is 'pdf'.
    if export_format == 'excel':
        return ExportService.export_to_excel(export_data)
    return ExportService.export_to_pdf(export_data)