Removed ~300 lines of redundant hospital filtering code from views. Templates no longer use hospital dropdowns, so views don't need to: - Query Hospital.objects.filter() - Apply RBAC filtering to hospitals queryset - Pass hospitals to context The middleware (TenantMiddleware) already handles hospital filtering via request.tenant_hospital for all users. Files cleaned: - apps/surveys/ui_views.py - apps/callcenter/ui_views.py - apps/complaints/ui_views.py - apps/analytics/ui_views.py - apps/physicians/ui_views.py - apps/projects/ui_views.py - apps/feedback/views.py - apps/dashboard/views.py - apps/journeys/ui_views.py - apps/appreciation/ui_views.py
982 lines
34 KiB
Python
982 lines
34 KiB
Python
"""
|
|
Call Center Console UI views
|
|
"""
|
|
|
|
from django.contrib import messages
|
|
from django.contrib.auth.decorators import login_required
|
|
from django.core.paginator import Paginator
|
|
from django.db.models import Q, Avg
|
|
from django.http import JsonResponse
|
|
from django.shortcuts import get_object_or_404, redirect, render
|
|
from django.utils import timezone
|
|
from django.views.decorators.http import require_http_methods
|
|
from uuid import UUID
|
|
|
|
from apps.complaints.models import Complaint, Inquiry, ComplaintSource
|
|
from apps.px_sources.models import PXSource
|
|
from apps.core.services import AuditService
|
|
from apps.organizations.models import Department, Hospital, Patient, Staff
|
|
|
|
from .models import CallCenterInteraction, CallRecord
|
|
|
|
|
|
@login_required
|
|
def interaction_list(request):
|
|
"""Call center interactions list view"""
|
|
queryset = CallCenterInteraction.objects.select_related("patient", "hospital", "department", "agent")
|
|
|
|
# Apply RBAC filters
|
|
user = request.user
|
|
# Get selected hospital for PX Admins (from middleware)
|
|
selected_hospital = getattr(request, "tenant_hospital", None)
|
|
|
|
if user.is_px_admin():
|
|
# PX Admins see all, but filter by selected hospital if set
|
|
if selected_hospital:
|
|
queryset = queryset.filter(hospital=selected_hospital)
|
|
elif user.hospital:
|
|
queryset = queryset.filter(hospital=user.hospital)
|
|
else:
|
|
queryset = queryset.none()
|
|
|
|
# Apply filters
|
|
call_type_filter = request.GET.get("call_type")
|
|
if call_type_filter:
|
|
queryset = queryset.filter(call_type=call_type_filter)
|
|
|
|
hospital_filter = request.GET.get("hospital")
|
|
if hospital_filter:
|
|
queryset = queryset.filter(hospital_id=hospital_filter)
|
|
|
|
is_low_rating = request.GET.get("is_low_rating")
|
|
if is_low_rating == "true":
|
|
queryset = queryset.filter(is_low_rating=True)
|
|
|
|
# Search
|
|
search_query = request.GET.get("search")
|
|
if search_query:
|
|
queryset = queryset.filter(
|
|
Q(subject__icontains=search_query)
|
|
| Q(caller_name__icontains=search_query)
|
|
| Q(patient__mrn__icontains=search_query)
|
|
)
|
|
|
|
# Date range
|
|
date_from = request.GET.get("date_from")
|
|
if date_from:
|
|
queryset = queryset.filter(call_started_at__gte=date_from)
|
|
|
|
date_to = request.GET.get("date_to")
|
|
if date_to:
|
|
queryset = queryset.filter(call_started_at__lte=date_to)
|
|
|
|
# Ordering
|
|
queryset = queryset.order_by("-call_started_at")
|
|
|
|
# Pagination
|
|
page_size = int(request.GET.get("page_size", 25))
|
|
paginator = Paginator(queryset, page_size)
|
|
page_number = request.GET.get("page", 1)
|
|
page_obj = paginator.get_page(page_number)
|
|
|
|
stats = {
|
|
"total": queryset.count(),
|
|
"low_rating": queryset.filter(is_low_rating=True).count(),
|
|
"avg_satisfaction": queryset.filter(satisfaction_rating__isnull=False).aggregate(
|
|
avg=Avg("satisfaction_rating")
|
|
)["avg"]
|
|
or 0,
|
|
}
|
|
|
|
context = {
|
|
"page_obj": page_obj,
|
|
"interactions": page_obj.object_list,
|
|
"stats": stats,
|
|
"filters": request.GET,
|
|
}
|
|
|
|
return render(request, "callcenter/interaction_list.html", context)
|
|
|
|
|
|
@login_required
|
|
def interaction_detail(request, pk):
|
|
"""Call center interaction detail view"""
|
|
interaction = get_object_or_404(
|
|
CallCenterInteraction.objects.select_related("patient", "hospital", "department", "agent"), pk=pk
|
|
)
|
|
|
|
context = {
|
|
"interaction": interaction,
|
|
}
|
|
|
|
return render(request, "callcenter/interaction_detail.html", context)
|
|
|
|
|
|
# ============================================================================
|
|
# COMPLAINT CREATION FOR CALL CENTER
|
|
# ============================================================================
|
|
|
|
|
|
@login_required
|
|
@require_http_methods(["GET", "POST"])
|
|
def create_complaint(request):
|
|
"""
|
|
Create complaint from call center interaction.
|
|
|
|
Call center staff can create complaints on behalf of patients/callers.
|
|
"""
|
|
if request.method == "POST":
|
|
try:
|
|
# Get form data
|
|
patient_id = request.POST.get("patient_id", None)
|
|
hospital_id = request.POST.get("hospital_id")
|
|
department_id = request.POST.get("department_id", None)
|
|
staff_id = request.POST.get("staff_id", None)
|
|
|
|
title = request.POST.get("title")
|
|
description = request.POST.get("description")
|
|
category = request.POST.get("category")
|
|
subcategory = request.POST.get("subcategory", "")
|
|
priority = request.POST.get("priority")
|
|
severity = request.POST.get("severity")
|
|
encounter_id = request.POST.get("encounter_id", "")
|
|
|
|
# Call center specific fields
|
|
caller_name = request.POST.get("caller_name", "")
|
|
caller_phone = request.POST.get("caller_phone", "")
|
|
caller_relationship = request.POST.get("caller_relationship", "patient")
|
|
|
|
# Validate required fields
|
|
if not all([hospital_id, title, description, category, priority, severity]):
|
|
messages.error(request, "Please fill in all required fields.")
|
|
return redirect("callcenter:create_complaint")
|
|
|
|
# If no patient selected, we need caller info
|
|
if not patient_id and not caller_name:
|
|
messages.error(request, "Please provide either patient or caller information.")
|
|
return redirect("callcenter:create_complaint")
|
|
|
|
# Get first active source for call center
|
|
try:
|
|
call_center_source = PXSource.objects.filter(is_active=True).first()
|
|
except PXSource.DoesNotExist:
|
|
messages.error(request, "No active PX sources available.")
|
|
return redirect("callcenter:create_complaint")
|
|
|
|
# Create complaint
|
|
complaint = Complaint.objects.create(
|
|
patient_id=patient_id if patient_id else None,
|
|
hospital_id=hospital_id,
|
|
department_id=department_id if department_id else None,
|
|
staff_id=staff_id if staff_id else None,
|
|
title=title,
|
|
description=description,
|
|
category=category,
|
|
subcategory=subcategory,
|
|
priority=priority,
|
|
severity=severity,
|
|
source=call_center_source,
|
|
encounter_id=encounter_id,
|
|
)
|
|
|
|
# Create call center interaction record
|
|
CallCenterInteraction.objects.create(
|
|
patient_id=patient_id if patient_id else None,
|
|
caller_name=caller_name,
|
|
caller_phone=caller_phone,
|
|
caller_relationship=caller_relationship,
|
|
hospital_id=hospital_id,
|
|
department_id=department_id if department_id else None,
|
|
agent=request.user,
|
|
call_type="complaint",
|
|
subject=title,
|
|
notes=description,
|
|
metadata={
|
|
"complaint_id": str(complaint.id),
|
|
"category": category,
|
|
"severity": severity,
|
|
},
|
|
)
|
|
|
|
# Log audit
|
|
AuditService.log_event(
|
|
event_type="complaint_created",
|
|
description=f"Complaint created via call center: {complaint.title}",
|
|
user=request.user,
|
|
content_object=complaint,
|
|
metadata={
|
|
"category": complaint.category,
|
|
"severity": complaint.severity,
|
|
"source": "call_center",
|
|
"caller_name": caller_name,
|
|
},
|
|
)
|
|
|
|
messages.success(request, f"Complaint #{complaint.id} created successfully.")
|
|
return redirect("callcenter:complaint_success", pk=complaint.id)
|
|
|
|
except Exception as e:
|
|
messages.error(request, f"Error creating complaint: {str(e)}")
|
|
return redirect("callcenter:create_complaint")
|
|
|
|
return render(request, "callcenter/complaint_form.html", {})
|
|
|
|
|
|
@login_required
|
|
def complaint_success(request, pk):
|
|
"""Success page after creating complaint"""
|
|
complaint = get_object_or_404(Complaint, pk=pk)
|
|
|
|
context = {
|
|
"complaint": complaint,
|
|
}
|
|
|
|
return render(request, "callcenter/complaint_success.html", context)
|
|
|
|
|
|
@login_required
|
|
def complaint_list(request):
|
|
"""List complaints created by call center"""
|
|
queryset = Complaint.objects.filter(complaint_source=ComplaintSource.CALL_CENTER).select_related(
|
|
"patient", "hospital", "department", "staff", "assigned_to"
|
|
)
|
|
|
|
# Apply RBAC filters
|
|
user = request.user
|
|
if user.is_px_admin():
|
|
pass
|
|
elif user.hospital:
|
|
queryset = queryset.filter(hospital=user.hospital)
|
|
else:
|
|
queryset = queryset.none()
|
|
|
|
# Apply filters
|
|
status_filter = request.GET.get("status")
|
|
if status_filter:
|
|
queryset = queryset.filter(status=status_filter)
|
|
|
|
severity_filter = request.GET.get("severity")
|
|
if severity_filter:
|
|
queryset = queryset.filter(severity=severity_filter)
|
|
|
|
hospital_filter = request.GET.get("hospital")
|
|
if hospital_filter:
|
|
queryset = queryset.filter(hospital_id=hospital_filter)
|
|
|
|
# Search
|
|
search_query = request.GET.get("search")
|
|
if search_query:
|
|
queryset = queryset.filter(
|
|
Q(title__icontains=search_query)
|
|
| Q(description__icontains=search_query)
|
|
| Q(patient__mrn__icontains=search_query)
|
|
)
|
|
|
|
# Ordering
|
|
queryset = queryset.order_by("-created_at")
|
|
|
|
# Pagination
|
|
page_size = int(request.GET.get("page_size", 25))
|
|
paginator = Paginator(queryset, page_size)
|
|
page_number = request.GET.get("page", 1)
|
|
page_obj = paginator.get_page(page_number)
|
|
|
|
stats = {
|
|
"total": queryset.count(),
|
|
"open": queryset.filter(status="open").count(),
|
|
"in_progress": queryset.filter(status="in_progress").count(),
|
|
"resolved": queryset.filter(status="resolved").count(),
|
|
}
|
|
|
|
context = {
|
|
"page_obj": page_obj,
|
|
"complaints": page_obj.object_list,
|
|
"stats": stats,
|
|
"filters": request.GET,
|
|
}
|
|
|
|
return render(request, "callcenter/complaint_list.html", context)
|
|
|
|
|
|
# ============================================================================
|
|
# INQUIRY CREATION FOR CALL CENTER
|
|
# ============================================================================
|
|
|
|
|
|
@login_required
|
|
@require_http_methods(["GET", "POST"])
|
|
def create_inquiry(request):
|
|
"""
|
|
Create inquiry from call center interaction.
|
|
|
|
Call center staff can create inquiries for general questions/requests.
|
|
"""
|
|
if request.method == "POST":
|
|
try:
|
|
# Get form data
|
|
patient_id = request.POST.get("patient_id", None)
|
|
hospital_id = request.POST.get("hospital_id")
|
|
department_id = request.POST.get("department_id", None)
|
|
|
|
subject = request.POST.get("subject")
|
|
message = request.POST.get("message")
|
|
category = request.POST.get("category")
|
|
|
|
# Contact info (if no patient)
|
|
contact_name = request.POST.get("contact_name", "")
|
|
contact_phone = request.POST.get("contact_phone", "")
|
|
contact_email = request.POST.get("contact_email", "")
|
|
|
|
# Call center specific
|
|
caller_relationship = request.POST.get("caller_relationship", "patient")
|
|
|
|
# Validate required fields
|
|
if not all([hospital_id, subject, message, category]):
|
|
messages.error(request, "Please fill in all required fields.")
|
|
return redirect("callcenter:create_inquiry")
|
|
|
|
# If no patient, need contact info
|
|
if not patient_id and not contact_name:
|
|
messages.error(request, "Please provide either patient or contact information.")
|
|
return redirect("callcenter:create_inquiry")
|
|
|
|
# Create inquiry
|
|
inquiry = Inquiry.objects.create(
|
|
patient_id=patient_id if patient_id else None,
|
|
hospital_id=hospital_id,
|
|
department_id=department_id if department_id else None,
|
|
subject=subject,
|
|
message=message,
|
|
category=category,
|
|
contact_name=contact_name,
|
|
contact_phone=contact_phone,
|
|
contact_email=contact_email,
|
|
)
|
|
|
|
# Create call center interaction record
|
|
CallCenterInteraction.objects.create(
|
|
patient_id=patient_id if patient_id else None,
|
|
caller_name=contact_name,
|
|
caller_phone=contact_phone,
|
|
caller_relationship=caller_relationship,
|
|
hospital_id=hospital_id,
|
|
department_id=department_id if department_id else None,
|
|
agent=request.user,
|
|
call_type="inquiry",
|
|
subject=subject,
|
|
notes=message,
|
|
metadata={
|
|
"inquiry_id": str(inquiry.id),
|
|
"category": category,
|
|
},
|
|
)
|
|
|
|
# Log audit
|
|
AuditService.log_event(
|
|
event_type="inquiry_created",
|
|
description=f"Inquiry created via call center: {inquiry.subject}",
|
|
user=request.user,
|
|
content_object=inquiry,
|
|
metadata={
|
|
"category": inquiry.category,
|
|
"source": "call_center",
|
|
"contact_name": contact_name,
|
|
},
|
|
)
|
|
|
|
messages.success(request, f"Inquiry #{inquiry.id} created successfully.")
|
|
return redirect("callcenter:inquiry_success", pk=inquiry.id)
|
|
|
|
except Exception as e:
|
|
messages.error(request, f"Error creating inquiry: {str(e)}")
|
|
return redirect("callcenter:create_inquiry")
|
|
|
|
return render(request, "callcenter/inquiry_form.html", {})
|
|
|
|
|
|
@login_required
|
|
def inquiry_success(request, pk):
|
|
"""Success page after creating inquiry"""
|
|
inquiry = get_object_or_404(Inquiry, pk=pk)
|
|
|
|
context = {
|
|
"inquiry": inquiry,
|
|
}
|
|
|
|
return render(request, "callcenter/inquiry_success.html", context)
|
|
|
|
|
|
@login_required
|
|
def inquiry_list(request):
|
|
"""List inquiries created by call center"""
|
|
queryset = Inquiry.objects.select_related("patient", "hospital", "department", "assigned_to", "responded_by")
|
|
|
|
# Apply RBAC filters
|
|
user = request.user
|
|
if user.is_px_admin():
|
|
pass
|
|
elif user.hospital:
|
|
queryset = queryset.filter(hospital=user.hospital)
|
|
else:
|
|
queryset = queryset.none()
|
|
|
|
# Apply filters
|
|
status_filter = request.GET.get("status")
|
|
if status_filter:
|
|
queryset = queryset.filter(status=status_filter)
|
|
|
|
category_filter = request.GET.get("category")
|
|
if category_filter:
|
|
queryset = queryset.filter(category=category_filter)
|
|
|
|
hospital_filter = request.GET.get("hospital")
|
|
if hospital_filter:
|
|
queryset = queryset.filter(hospital_id=hospital_filter)
|
|
|
|
# Search
|
|
search_query = request.GET.get("search")
|
|
if search_query:
|
|
queryset = queryset.filter(
|
|
Q(subject__icontains=search_query)
|
|
| Q(message__icontains=search_query)
|
|
| Q(contact_name__icontains=search_query)
|
|
)
|
|
|
|
# Ordering
|
|
queryset = queryset.order_by("-created_at")
|
|
|
|
# Pagination
|
|
page_size = int(request.GET.get("page_size", 25))
|
|
paginator = Paginator(queryset, page_size)
|
|
page_number = request.GET.get("page", 1)
|
|
page_obj = paginator.get_page(page_number)
|
|
|
|
stats = {
|
|
"total": queryset.count(),
|
|
"open": queryset.filter(status="open").count(),
|
|
"in_progress": queryset.filter(status="in_progress").count(),
|
|
"resolved": queryset.filter(status="resolved").count(),
|
|
}
|
|
|
|
context = {
|
|
"page_obj": page_obj,
|
|
"inquiries": page_obj.object_list,
|
|
"stats": stats,
|
|
"filters": request.GET,
|
|
}
|
|
|
|
return render(request, "callcenter/inquiry_list.html", context)
|
|
|
|
|
|
# ============================================================================
|
|
# AJAX/API HELPERS
|
|
# ============================================================================
|
|
|
|
|
|
@login_required
|
|
def get_departments_by_hospital(request):
|
|
"""Get departments for a hospital (AJAX)"""
|
|
hospital_id = request.GET.get("hospital_id")
|
|
if not hospital_id:
|
|
return JsonResponse({"departments": []})
|
|
|
|
departments = Department.objects.filter(hospital_id=hospital_id, status="active").values("id", "name", "name_ar")
|
|
|
|
return JsonResponse({"departments": list(departments)})
|
|
|
|
|
|
@login_required
|
|
def get_staff_by_hospital(request):
|
|
"""Get staff for a hospital (AJAX)"""
|
|
hospital_id = request.GET.get("hospital_id")
|
|
if not hospital_id:
|
|
return JsonResponse({"staff": []})
|
|
|
|
staff_members = Staff.objects.filter(hospital_id=hospital_id, status="active").values(
|
|
"id", "first_name", "last_name", "staff_type", "specialization"
|
|
)
|
|
|
|
# Format staff names
|
|
staff_list = [
|
|
{
|
|
"id": str(s["id"]),
|
|
"name": f"Dr. {s['first_name']} {s['last_name']}"
|
|
if s["staff_type"] == "physician"
|
|
else f"{s['first_name']} {s['last_name']}",
|
|
"staff_type": s["staff_type"],
|
|
"specialization": s["specialization"],
|
|
}
|
|
for s in staff_members
|
|
]
|
|
|
|
return JsonResponse({"staff": staff_list})
|
|
|
|
|
|
@login_required
|
|
def search_patients(request):
|
|
"""Search patients by MRN or name (AJAX)"""
|
|
query = request.GET.get("q", "")
|
|
hospital_id = request.GET.get("hospital_id", None)
|
|
|
|
if len(query) < 2:
|
|
return JsonResponse({"patients": []})
|
|
|
|
patients = Patient.objects.filter(
|
|
Q(mrn__icontains=query)
|
|
| Q(first_name__icontains=query)
|
|
| Q(last_name__icontains=query)
|
|
| Q(national_id__icontains=query)
|
|
| Q(phone__icontains=query)
|
|
)
|
|
|
|
if hospital_id:
|
|
patients = patients.filter(hospital_id=hospital_id)
|
|
|
|
patients = patients[:20]
|
|
|
|
results = [
|
|
{
|
|
"id": str(p.id),
|
|
"mrn": p.mrn,
|
|
"name": p.get_full_name(),
|
|
"phone": p.phone,
|
|
"email": p.email,
|
|
"national_id": p.national_id,
|
|
}
|
|
for p in patients
|
|
]
|
|
|
|
return JsonResponse({"patients": results})
|
|
|
|
|
|
# ============================================================================
|
|
# CALL RECORDS (CSV IMPORT) VIEWS
|
|
# ============================================================================
|
|
|
|
|
|
@login_required
|
|
def call_records_list(request):
|
|
"""
|
|
Call records list view with stats cards.
|
|
|
|
Shows all imported call records with filtering and search.
|
|
"""
|
|
queryset = CallRecord.objects.select_related("hospital")
|
|
|
|
# Apply RBAC filters
|
|
user = request.user
|
|
if user.is_px_admin():
|
|
pass
|
|
elif user.hospital:
|
|
queryset = queryset.filter(hospital=user.hospital)
|
|
else:
|
|
queryset = queryset.none()
|
|
|
|
# Apply filters
|
|
evaluated_filter = request.GET.get("evaluated")
|
|
if evaluated_filter:
|
|
queryset = queryset.filter(evaluated=evaluated_filter == "true")
|
|
|
|
call_type_filter = request.GET.get("call_type")
|
|
if call_type_filter == "inbound":
|
|
queryset = queryset.filter(Q(inbound_id__isnull=False) | Q(inbound_name__isnull=False))
|
|
queryset = queryset.exclude(Q(inbound_id="") & Q(inbound_name=""))
|
|
elif call_type_filter == "outbound":
|
|
queryset = queryset.filter(Q(outbound_id__isnull=False) | Q(outbound_name__isnull=False))
|
|
queryset = queryset.exclude(Q(outbound_id="") & Q(outbound_name=""))
|
|
|
|
department_filter = request.GET.get("department")
|
|
if department_filter:
|
|
queryset = queryset.filter(department__icontains=department_filter)
|
|
|
|
hospital_filter = request.GET.get("hospital")
|
|
if hospital_filter:
|
|
queryset = queryset.filter(hospital_id=hospital_filter)
|
|
|
|
# Search
|
|
search_query = request.GET.get("search")
|
|
if search_query:
|
|
queryset = queryset.filter(
|
|
Q(first_name__icontains=search_query)
|
|
| Q(last_name__icontains=search_query)
|
|
| Q(department__icontains=search_query)
|
|
| Q(extension__icontains=search_query)
|
|
| Q(inbound_name__icontains=search_query)
|
|
| Q(outbound_name__icontains=search_query)
|
|
)
|
|
|
|
# Date range
|
|
date_from = request.GET.get("date_from")
|
|
if date_from:
|
|
queryset = queryset.filter(call_start__gte=date_from)
|
|
|
|
date_to = request.GET.get("date_to")
|
|
if date_to:
|
|
queryset = queryset.filter(call_start__lte=date_to)
|
|
|
|
# Ordering
|
|
queryset = queryset.order_by("-call_start")
|
|
|
|
# Pagination
|
|
page_size = int(request.GET.get("page_size", 25))
|
|
paginator = Paginator(queryset, page_size)
|
|
page_number = request.GET.get("page", 1)
|
|
page_obj = paginator.get_page(page_number)
|
|
|
|
stats = {
|
|
"total_calls": queryset.count(),
|
|
"total_duration": sum((r.call_duration_seconds or 0) for r in queryset),
|
|
"inbound_calls": queryset.filter(Q(inbound_id__isnull=False) | Q(inbound_name__isnull=False))
|
|
.exclude(Q(inbound_id="") & Q(inbound_name=""))
|
|
.count(),
|
|
"outbound_calls": queryset.filter(Q(outbound_id__isnull=False) | Q(outbound_name__isnull=False))
|
|
.exclude(Q(outbound_id="") & Q(outbound_name=""))
|
|
.count(),
|
|
"evaluated_calls": queryset.filter(evaluated=True).count(),
|
|
"not_evaluated_calls": queryset.filter(evaluated=False).count(),
|
|
"avg_duration": queryset.filter(call_duration_seconds__isnull=False).aggregate(
|
|
avg=Avg("call_duration_seconds")
|
|
)["avg"]
|
|
or 0,
|
|
}
|
|
|
|
# Format duration for display
|
|
def format_duration(seconds):
|
|
if not seconds:
|
|
return "0:00"
|
|
hours = int(seconds // 3600)
|
|
minutes = int((seconds % 3600) // 60)
|
|
secs = int(seconds % 60)
|
|
if hours > 0:
|
|
return f"{hours}:{minutes:02d}:{secs:02d}"
|
|
return f"{minutes}:{secs:02d}"
|
|
|
|
stats["total_duration_formatted"] = format_duration(stats["total_duration"])
|
|
stats["avg_duration_formatted"] = format_duration(stats["avg_duration"])
|
|
|
|
# Get unique departments for filter dropdown
|
|
departments = CallRecord.objects.values_list("department", flat=True).distinct()
|
|
departments = [d for d in departments if d]
|
|
|
|
context = {
|
|
"page_obj": page_obj,
|
|
"call_records": page_obj.object_list,
|
|
"stats": stats,
|
|
"departments": departments,
|
|
"filters": request.GET,
|
|
}
|
|
|
|
return render(request, "callcenter/call_records_list.html", context)
|
|
|
|
|
|
@login_required
|
|
@require_http_methods(["GET", "POST"])
|
|
def import_call_records(request):
|
|
"""
|
|
Import call records from CSV file.
|
|
|
|
CSV must have the same headers as the export format.
|
|
"""
|
|
if request.method == "POST":
|
|
try:
|
|
csv_file = request.FILES.get("csv_file")
|
|
if not csv_file:
|
|
messages.error(request, "Please select a CSV file to upload.")
|
|
return redirect("callcenter:import_call_records")
|
|
|
|
# Check file extension
|
|
if not csv_file.name.endswith(".csv"):
|
|
messages.error(request, "Please upload a valid CSV file.")
|
|
return redirect("callcenter:import_call_records")
|
|
|
|
import csv
|
|
from datetime import datetime
|
|
import hashlib
|
|
import codecs
|
|
|
|
# Decode the file and remove BOM if present
|
|
decoded_file = csv_file.read().decode("utf-8-sig") # utf-8-sig removes BOM
|
|
reader = csv.DictReader(decoded_file.splitlines())
|
|
|
|
# Required headers
|
|
required_headers = [
|
|
"Media ID",
|
|
"Media Type",
|
|
"Call Start",
|
|
"First Name",
|
|
"Last Name",
|
|
"Extension",
|
|
"Department",
|
|
"Call End",
|
|
"Length",
|
|
"File Name",
|
|
]
|
|
|
|
# Validate headers
|
|
if reader.fieldnames is None:
|
|
messages.error(request, "Invalid CSV file format.")
|
|
return redirect("callcenter:import_call_records")
|
|
|
|
# Clean headers (remove any remaining BOM or whitespace)
|
|
cleaned_fieldnames = [f.strip() if f else f for f in reader.fieldnames]
|
|
reader.fieldnames = cleaned_fieldnames
|
|
|
|
missing_headers = [h for h in required_headers if h not in reader.fieldnames]
|
|
if missing_headers:
|
|
messages.error(request, f"Missing required headers: {', '.join(missing_headers)}")
|
|
return redirect("callcenter:import_call_records")
|
|
|
|
# Parse CSV and create records
|
|
imported_count = 0
|
|
skipped_count = 0
|
|
error_count = 0
|
|
|
|
for row_num, row in enumerate(reader, start=2): # Start at 2 (header is row 1)
|
|
try:
|
|
# Parse Media ID
|
|
media_id_str = row.get("Media ID", "").strip()
|
|
if not media_id_str:
|
|
skipped_count += 1
|
|
continue
|
|
|
|
try:
|
|
media_id = UUID(media_id_str)
|
|
except ValueError:
|
|
skipped_count += 1
|
|
continue
|
|
|
|
# Check if record already exists
|
|
if CallRecord.objects.filter(media_id=media_id).exists():
|
|
skipped_count += 1
|
|
continue
|
|
|
|
# Parse call start time
|
|
call_start_str = row.get("Call Start", "").strip()
|
|
if not call_start_str:
|
|
skipped_count += 1
|
|
continue
|
|
|
|
# Try multiple datetime formats
|
|
call_start = None
|
|
for fmt in [
|
|
"%m/%d/%y %H:%M", # 10/30/25 19:57
|
|
"%m/%d/%Y %H:%M", # 10/30/2025 19:57
|
|
"%m/%d/%y %I:%M:%S %p", # 10/30/25 7:57:48 PM
|
|
"%m/%d/%Y %I:%M:%S %p", # 10/30/2025 7:57:48 PM
|
|
"%m/%d/%y %I:%M %p", # 10/30/25 7:57 PM
|
|
"%m/%d/%Y %I:%M %p", # 10/30/2025 7:57 PM
|
|
]:
|
|
try:
|
|
call_start = datetime.strptime(call_start_str, fmt)
|
|
break
|
|
except ValueError:
|
|
continue
|
|
|
|
if not call_start:
|
|
skipped_count += 1
|
|
continue
|
|
|
|
# Parse call end time
|
|
call_end = None
|
|
call_end_str = row.get("Call End", "").strip()
|
|
if call_end_str:
|
|
for fmt in [
|
|
"%m/%d/%y %H:%M",
|
|
"%m/%d/%Y %H:%M",
|
|
"%m/%d/%y %I:%M:%S %p",
|
|
"%m/%d/%Y %I:%M:%S %p",
|
|
"%m/%d/%y %I:%M %p",
|
|
"%m/%d/%Y %I:%M %p",
|
|
]:
|
|
try:
|
|
call_end = datetime.strptime(call_end_str, fmt)
|
|
break
|
|
except ValueError:
|
|
continue
|
|
|
|
# Parse call duration
|
|
call_duration_seconds = None
|
|
length_str = row.get("Length", "").strip()
|
|
if length_str:
|
|
try:
|
|
parts = length_str.split(":")
|
|
if len(parts) == 3:
|
|
# HH:MM:SS format
|
|
call_duration_seconds = int(parts[0]) * 3600 + int(parts[1]) * 60 + int(parts[2])
|
|
elif len(parts) == 2:
|
|
# M:SS or MM:SS format
|
|
call_duration_seconds = int(parts[0]) * 60 + int(parts[1])
|
|
elif len(parts) == 1:
|
|
# Just seconds
|
|
call_duration_seconds = int(parts[0])
|
|
except ValueError:
|
|
pass
|
|
|
|
# Get or create hospital
|
|
hospital = None
|
|
if request.user.hospital:
|
|
hospital = request.user.hospital
|
|
|
|
# Create the record
|
|
CallRecord.objects.create(
|
|
media_id=media_id,
|
|
media_type=row.get("Media Type", "Calls").strip(),
|
|
chain=row.get("Chain", "").strip(),
|
|
evaluated=row.get("Evaluated", "").strip().lower() == "true",
|
|
call_start=call_start,
|
|
call_end=call_end,
|
|
call_length=length_str,
|
|
call_duration_seconds=call_duration_seconds,
|
|
first_name=row.get("First Name", "").strip(),
|
|
last_name=row.get("Last Name", "").strip(),
|
|
extension=row.get("Extension", "").strip(),
|
|
department=row.get("Department", "").strip(),
|
|
location=row.get("Location", "").strip(),
|
|
inbound_id=row.get("Inbound ID", "").strip(),
|
|
inbound_name=row.get("Inbound Name", "").strip(),
|
|
dnis=row.get("DNIS", "").strip(),
|
|
outbound_id=row.get("Outbound ID", "").strip(),
|
|
outbound_name=row.get("Outbound Name", "").strip(),
|
|
flag_name=row.get("Flag Name", "").strip(),
|
|
flag_value=row.get("Flag Value", "").strip(),
|
|
file_location=row.get("File Location", "").strip(),
|
|
file_name=row.get("File Name", "").strip(),
|
|
file_hash=row.get("FileHash", "").strip(),
|
|
external_ref=row.get("External Ref", "").strip(),
|
|
transfer_from=row.get("Transfer From", "").strip(),
|
|
recorded_by=row.get("Recorded By", "").strip(),
|
|
time_zone=row.get("Time Zone", "03:00:00").strip(),
|
|
recording_server_name=row.get("Recording Server Name", "").strip(),
|
|
hospital=hospital,
|
|
)
|
|
|
|
imported_count += 1
|
|
|
|
except Exception as e:
|
|
error_count += 1
|
|
continue
|
|
|
|
messages.success(
|
|
request,
|
|
f"Import completed: {imported_count} records imported, {skipped_count} skipped (duplicates/invalid), {error_count} errors.",
|
|
)
|
|
return redirect("callcenter:call_records_list")
|
|
|
|
except Exception as e:
|
|
messages.error(request, f"Error importing CSV: {str(e)}")
|
|
return redirect("callcenter:import_call_records")
|
|
|
|
# GET request - show upload form
|
|
context = {
|
|
"sample_headers": [
|
|
"Media ID",
|
|
"Media Type",
|
|
"Chain",
|
|
"Evaluated",
|
|
"Call Start",
|
|
"First Name",
|
|
"Last Name",
|
|
"Extension",
|
|
"Department",
|
|
"Location",
|
|
"Inbound ID",
|
|
"Inbound Name",
|
|
"DNIS",
|
|
"Outbound ID",
|
|
"Outbound Name",
|
|
"Length",
|
|
"Call End",
|
|
"Flag Name",
|
|
"Flag Value",
|
|
"File Location",
|
|
"File Name",
|
|
"External Ref",
|
|
"FileHash",
|
|
"Transfer From",
|
|
"Recorded By",
|
|
"Time Zone",
|
|
"Recording Server Name",
|
|
],
|
|
}
|
|
|
|
return render(request, "callcenter/import_call_records.html", context)
|
|
|
|
|
|
@login_required
|
|
def export_call_records_template(request):
|
|
"""
|
|
Export a sample CSV template for importing call records.
|
|
"""
|
|
import csv
|
|
from django.http import HttpResponse
|
|
|
|
response = HttpResponse(content_type="text/csv")
|
|
response["Content-Disposition"] = 'attachment; filename="call_records_template.csv"'
|
|
|
|
writer = csv.writer(response)
|
|
writer.writerow(
|
|
[
|
|
"Media ID",
|
|
"Media Type",
|
|
"Chain",
|
|
"Evaluated",
|
|
"Call Start",
|
|
"First Name",
|
|
"Last Name",
|
|
"Extension",
|
|
"Department",
|
|
"Location",
|
|
"Inbound ID",
|
|
"Inbound Name",
|
|
"DNIS",
|
|
"Outbound ID",
|
|
"Outbound Name",
|
|
"Length",
|
|
"Call End",
|
|
"Flag Name",
|
|
"Flag Value",
|
|
"File Location",
|
|
"File Name",
|
|
"External Ref",
|
|
"FileHash",
|
|
"Transfer From",
|
|
"Recorded By",
|
|
"Time Zone",
|
|
"Recording Server Name",
|
|
]
|
|
)
|
|
# Add one sample row
|
|
writer.writerow(
|
|
[
|
|
"aade2430-2eb0-4e05-93eb-9567e2be07ae",
|
|
"Calls",
|
|
"",
|
|
"False",
|
|
"10/30/2025 7:57:48 PM",
|
|
"Patient",
|
|
"Relation",
|
|
"1379",
|
|
"Patient Relation",
|
|
"",
|
|
"597979769",
|
|
"",
|
|
"",
|
|
"",
|
|
"",
|
|
"00:01:11",
|
|
"10/30/2025 7:59:00 PM",
|
|
"",
|
|
"",
|
|
"E:\\Calls",
|
|
"2025-10-30\\x1379 19.57.48.467 10-30-2025.mp3",
|
|
"12946311",
|
|
"",
|
|
"0",
|
|
"",
|
|
"03:00:00",
|
|
"ahnuzdcnqms02",
|
|
]
|
|
)
|
|
|
|
return response
|