HH/apps/px_sources/ui_views.py
ismail c5f76b3855
Some checks are pending
Build and Push Docker Image / build (push) Waiting to run
updates
2026-05-11 14:45:30 +03:00

1655 lines
63 KiB
Python

"""
PX Sources UI views - HTML template rendering
"""
from django.contrib import messages
from django.contrib.auth.decorators import login_required
from django.db import models
from django.http import JsonResponse
from django.shortcuts import get_object_or_404, redirect, render
from django.utils.translation import gettext_lazy as _
from django.views.decorators.http import require_http_methods
from .models import PXSource, SourceUser, SourceComplaint, CommunicationRequest
from .decorators import source_user_required, block_source_user
from apps.accounts.models import User
from apps.complaints.models import Complaint, Inquiry
def check_source_permission(user):
"""Check if user has permission to manage sources"""
return user.is_px_admin() or user.is_hospital_admin()
@login_required
@block_source_user
def source_list(request):
"""
List all PX sources
"""
sources = PXSource.objects.all()
# Filter by active status
is_active = request.GET.get("is_active")
if is_active:
sources = sources.filter(is_active=is_active == "true")
# Search
search = request.GET.get("search")
if search:
sources = sources.filter(
models.Q(name_en__icontains=search)
| models.Q(name_ar__icontains=search)
| models.Q(description__icontains=search)
| models.Q(code__icontains=search)
)
sources = sources.order_by("name_en")
context = {
"sources": sources,
"is_active": is_active,
"search": search,
}
return render(request, "px_sources/source_list.html", context)
@login_required
@block_source_user
def source_detail(request, pk):
"""
View source details
"""
source = get_object_or_404(PXSource, pk=pk)
# Filter usage records by hospital based on selected context
usage_records_qs = source.usage_records.select_related("content_type", "hospital", "user")
if request.user.is_hospital_admin() and request.user.hospital:
usage_records_qs = usage_records_qs.filter(hospital=request.user.hospital)
elif request.user.is_px_admin() and hasattr(request, "tenant_hospital") and request.tenant_hospital:
usage_records_qs = usage_records_qs.filter(hospital=request.tenant_hospital)
usage_records = usage_records_qs.order_by("-created_at")[:20]
# Get source users for this source - filter by hospital based on selected context
source_users_qs = source.source_users.select_related("user")
if request.user.is_hospital_admin() and request.user.hospital:
source_users_qs = source_users_qs.filter(hospital=request.user.hospital)
elif request.user.is_px_admin() and hasattr(request, "tenant_hospital") and request.tenant_hospital:
source_users_qs = source_users_qs.filter(hospital=request.tenant_hospital)
source_users = source_users_qs.order_by("-created_at")
# Get available users (not already assigned to this source) - filter by hospital
assigned_user_ids = source_users.values_list("user_id", flat=True)
available_users_qs = User.objects.exclude(id__in=assigned_user_ids)
if request.user.is_hospital_admin() and request.user.hospital:
available_users_qs = available_users_qs.filter(hospital=request.user.hospital)
elif request.user.is_px_admin() and hasattr(request, "tenant_hospital") and request.tenant_hospital:
available_users_qs = available_users_qs.filter(hospital=request.tenant_hospital)
available_users = available_users_qs.order_by("email")
# Get usage stats - filtered by hospital based on selected context
usage_stats_queryset = source.usage_records.all()
if request.user.is_hospital_admin() and request.user.hospital:
usage_stats_queryset = usage_stats_queryset.filter(hospital=request.user.hospital)
elif request.user.is_px_admin() and hasattr(request, "tenant_hospital") and request.tenant_hospital:
usage_stats_queryset = usage_stats_queryset.filter(hospital=request.tenant_hospital)
# Calculate stats from filtered queryset
from django.utils import timezone
from datetime import timedelta
cutoff = timezone.now() - timedelta(days=30)
recent_usage = usage_stats_queryset.filter(created_at__gte=cutoff)
from django.contrib.contenttypes.models import ContentType
complaint_ct = ContentType.objects.get(app_label="complaints", model="complaint")
inquiry_ct = ContentType.objects.get(app_label="complaints", model="inquiry")
usage_stats = {
"total": usage_stats_queryset.count(),
"recent": recent_usage.count(),
"complaints": recent_usage.filter(content_type=complaint_ct).count(),
"inquiries": recent_usage.filter(content_type=inquiry_ct).count(),
}
source_complaints = source.complaints.select_related("hospital", "department").order_by("-created_at")[:50]
source_inquiries = source.inquiries.select_related("hospital", "department").order_by("-created_at")[:50]
ext_source_complaints = source.source_complaints.select_related("system_complaint", "created_by").order_by("-created_at")[:50]
context = {
"source": source,
"usage_records": usage_records,
"source_users": source_users,
"available_users": available_users,
"usage_stats": usage_stats,
"source_complaints": source_complaints,
"source_inquiries": source_inquiries,
"ext_source_complaints": ext_source_complaints,
"complaints_count": source.complaints.count(),
"inquiries_count": source.inquiries.count(),
}
return render(request, "px_sources/source_detail.html", context)
@login_required
@block_source_user
def source_create(request):
"""
Create a new PX source
"""
if not check_source_permission(request.user):
messages.error(request, _("You don't have permission to create sources."))
return redirect("px_sources:source_list")
if request.method == "POST":
try:
source = PXSource(
code=request.POST.get("code", ""),
name_en=request.POST.get("name_en"),
name_ar=request.POST.get("name_ar", ""),
description=request.POST.get("description", ""),
source_type=request.POST.get("source_type", "internal"),
contact_email=request.POST.get("contact_email", ""),
contact_phone=request.POST.get("contact_phone", ""),
is_active=request.POST.get("is_active") == "on",
)
source.save()
messages.success(request, _("Source created successfully!"))
return redirect("px_sources:source_detail", pk=source.pk)
except Exception as e:
messages.error(request, _("Error creating source: {}").format(str(e)))
context = {
"source_types": PXSource.SOURCE_TYPE_CHOICES,
}
return render(request, "px_sources/source_form.html", context)
@login_required
@block_source_user
def source_edit(request, pk):
"""
Edit an existing PX source
"""
if not check_source_permission(request.user):
messages.error(request, _("You don't have permission to edit sources."))
return redirect("px_sources:source_detail", pk=pk)
source = get_object_or_404(PXSource, pk=pk)
if request.method == "POST":
try:
source.code = request.POST.get("code", source.code)
source.name_en = request.POST.get("name_en")
source.name_ar = request.POST.get("name_ar", "")
source.description = request.POST.get("description", "")
source.source_type = request.POST.get("source_type", "internal")
source.contact_email = request.POST.get("contact_email", "")
source.contact_phone = request.POST.get("contact_phone", "")
source.is_active = request.POST.get("is_active") == "on"
source.save()
messages.success(request, _("Source updated successfully!"))
return redirect("px_sources:source_detail", pk=source.pk)
except Exception as e:
messages.error(request, _("Error updating source: {}").format(str(e)))
context = {
"source": source,
"source_types": PXSource.SOURCE_TYPE_CHOICES,
}
return render(request, "px_sources/source_form.html", context)
@login_required
@block_source_user
def source_delete(request, pk):
"""
Delete a PX source
"""
if not request.user.is_px_admin():
messages.error(request, _("You don't have permission to delete sources."))
return redirect("px_sources:source_detail", pk=pk)
source = get_object_or_404(PXSource, pk=pk)
if request.method == "POST":
source_name = source.name_en
source.delete()
messages.success(request, _("Source '{}' deleted successfully!").format(source_name))
return redirect("px_sources:source_list")
context = {
"source": source,
}
return render(request, "px_sources/source_confirm_delete.html", context)
@login_required
@block_source_user
def source_toggle_status(request, pk):
"""
Toggle source active status (supports both AJAX and regular form submission)
"""
if not (request.user.is_px_admin() or request.user.is_hospital_admin()):
if request.headers.get("X-Requested-With") == "XMLHttpRequest":
return JsonResponse({"error": "Permission denied"}, status=403)
messages.error(request, _("You don't have permission to toggle source status."))
return redirect("px_sources:source_list")
if request.method != "POST":
if request.headers.get("X-Requested-With") == "XMLHttpRequest":
return JsonResponse({"error": "Method not allowed"}, status=405)
messages.error(request, _("Invalid request method."))
return redirect("px_sources:source_list")
source = get_object_or_404(PXSource, pk=pk)
source.is_active = not source.is_active
source.save()
status_text = _("activated") if source.is_active else _("deactivated")
success_message = _("Source '{}' {} successfully.").format(source.name_en, status_text)
# Handle AJAX request
if request.headers.get("X-Requested-With") == "XMLHttpRequest":
return JsonResponse({"success": True, "is_active": source.is_active, "message": str(success_message)})
# Handle regular form submission
messages.success(request, success_message)
return redirect("px_sources:source_list")
@login_required
@block_source_user
def ajax_search_sources(request):
"""
AJAX endpoint for searching sources
"""
term = request.GET.get("term", "")
queryset = PXSource.objects.filter(is_active=True)
if term:
queryset = queryset.filter(
models.Q(name_en__icontains=term)
| models.Q(name_ar__icontains=term)
| models.Q(description__icontains=term)
)
sources = queryset.order_by("name_en")[:20]
results = [
{
"id": str(source.id),
"text": source.name_en,
"name_en": source.name_en,
"name_ar": source.name_ar,
}
for source in sources
]
return JsonResponse({"results": results})
@login_required
@source_user_required
def source_user_dashboard(request):
"""
Dashboard for source users.
Shows:
- User's assigned source
- Statistics (complaints, inquiries from their source)
- Create buttons for complaints/inquiries
- Tables of recent complaints/inquiries from their source
"""
# Get source user profile
source_user = SourceUser.get_active_source_user(request.user)
if not source_user:
messages.error(request, _("You are not assigned as a source user. Please contact your administrator."))
return redirect("/")
# Get source
source = source_user.source
# Get complaints from this source (recent 5) - filter by hospital
from apps.complaints.models import Complaint
complaints_queryset = Complaint.objects.filter(source=source)
if source_user.hospital:
complaints_queryset = complaints_queryset.filter(hospital=source_user.hospital)
complaints = complaints_queryset.select_related("patient", "hospital", "assigned_to").order_by("-created_at")[:5]
# Get inquiries from this source (recent 5) - filter by hospital
from apps.complaints.models import Inquiry
inquiries_queryset = Inquiry.objects.filter(source=source)
if source_user.hospital:
inquiries_queryset = inquiries_queryset.filter(hospital=source_user.hospital)
inquiries = inquiries_queryset.select_related("patient", "hospital", "assigned_to").order_by("-created_at")[:5]
# Calculate statistics - filtered by hospital
total_complaints = complaints_queryset.count()
total_inquiries = inquiries_queryset.count()
open_complaints = complaints_queryset.filter(status="open").count()
open_inquiries = inquiries_queryset.filter(status="open").count()
context = {
"source_user": source_user,
"source": source,
"complaints": complaints,
"inquiries": inquiries,
"total_complaints": total_complaints,
"total_inquiries": total_inquiries,
"open_complaints": open_complaints,
"open_inquiries": open_inquiries,
"can_create_complaints": source_user.can_create_complaints,
"can_create_inquiries": source_user.can_create_inquiries,
"can_create_observations": source_user.can_create_observations,
"can_create_suggestions": source_user.can_create_suggestions,
}
return render(request, "px_sources/source_user_dashboard.html", context)
@login_required
@source_user_required
def ajax_source_choices(request):
"""
AJAX endpoint for getting source choices for dropdowns
"""
queryset = PXSource.get_active_sources()
choices = [
{
"id": str(source.id),
"name_en": source.name_en,
"name_ar": source.name_ar,
}
for source in queryset
]
return JsonResponse({"choices": choices})
@login_required
@block_source_user
def source_user_create(request, pk):
"""
Create a new source user for a specific PX source.
Only PX admins can create source users.
Allows selecting an existing user or creating a new user.
"""
# if not request.user.is_px_admin():
# messages.error(request, _("You don't have permission to create source users."))
# return redirect('px_sources:source_detail', pk=pk)
source = get_object_or_404(PXSource, pk=pk)
if request.method == "POST":
creation_mode = request.POST.get("creation_mode", "existing") # 'existing' or 'new'
try:
if creation_mode == "existing":
# Select from existing users
user_id = request.POST.get("user")
if not user_id:
messages.error(request, _("Please select a user."))
return redirect("px_sources:source_user_create", pk=pk)
user = get_object_or_404(User, pk=user_id)
# Check if user already has a source user profile
if SourceUser.objects.filter(user=user).exists():
messages.error(request, _("User already has a source profile. A user can only manage one source."))
return redirect("px_sources:source_detail", pk=pk)
else: # creation_mode == 'new'
# Create a new user
email = request.POST.get("new_email", "").strip().lower()
first_name = request.POST.get("new_first_name", "").strip()
last_name = request.POST.get("new_last_name", "").strip()
password = request.POST.get("new_password", "")
confirm_password = request.POST.get("new_password_confirm", "")
# Validation
errors = []
if not email:
errors.append(_("Email is required."))
elif User.objects.filter(email=email).exists():
errors.append(_("A user with this email already exists."))
if not first_name:
errors.append(_("First name is required."))
if not last_name:
errors.append(_("Last name is required."))
if not password:
errors.append(_("Password is required."))
elif len(password) < 8:
errors.append(_("Password must be at least 8 characters."))
if password != confirm_password:
errors.append(_("Passwords do not match."))
if errors:
for error in errors:
messages.error(request, error)
return redirect("px_sources:source_user_create", pk=pk)
# Create the user
user = User.objects.create_user(
email=email,
password=password,
first_name=first_name,
last_name=last_name,
phone=request.POST.get("new_phone", "").strip(),
employee_id=request.POST.get("new_employee_id", "").strip(),
)
from django.contrib.auth.models import Group
try:
px_source_group = Group.objects.get(name="PX Source User")
user.groups.add(px_source_group)
except Group.DoesNotExist:
try:
px_admin_group = Group.objects.get(name="PX Admin")
user.groups.add(px_admin_group)
except Group.DoesNotExist:
pass
messages.success(request, _("New user created successfully!"))
# Get hospital from request (for PX Admins with tenant_hospital) or user's hospital
hospital = None
if request.user.is_px_admin() and hasattr(request, "tenant_hospital") and request.tenant_hospital:
hospital = request.tenant_hospital
elif request.user.hospital:
hospital = request.user.hospital
elif user.hospital:
hospital = user.hospital
# Create source user
source_user = SourceUser.objects.create(
user=user,
source=source,
hospital=hospital,
is_active=request.POST.get("is_active") == "on",
can_create_complaints=request.POST.get("can_create_complaints") == "on",
can_create_inquiries=request.POST.get("can_create_inquiries") == "on",
can_create_observations=request.POST.get("can_create_observations") == "on",
can_create_suggestions=request.POST.get("can_create_suggestions") == "on",
)
messages.success(request, _("Source user created successfully!"))
return redirect("px_sources:source_detail", pk=pk)
except Exception as e:
messages.error(request, _("Error creating source user: {}").format(str(e)))
context = {
"source": source,
"available_users": User.objects.exclude(id__in=source.source_users.values_list("user_id", flat=True)).order_by(
"email"
),
"creation_mode": "new", # Default to new user creation
}
return render(request, "px_sources/source_user_form.html", context)
@login_required
@block_source_user
def source_user_edit(request, pk, user_pk):
"""
Edit an existing source user.
Only PX admins can edit source users.
"""
if not request.user.is_px_admin():
messages.error(request, _("You don't have permission to edit source users."))
return redirect("px_sources:source_detail", pk=pk)
source = get_object_or_404(PXSource, pk=pk)
source_user = get_object_or_404(SourceUser, pk=user_pk, source=source)
if request.method == "POST":
try:
source_user.is_active = request.POST.get("is_active") == "on"
source_user.can_create_complaints = request.POST.get("can_create_complaints") == "on"
source_user.can_create_inquiries = request.POST.get("can_create_inquiries") == "on"
source_user.can_create_observations = request.POST.get("can_create_observations") == "on"
source_user.can_create_suggestions = request.POST.get("can_create_suggestions") == "on"
source_user.save()
messages.success(request, _("Source user updated successfully!"))
return redirect("px_sources:source_detail", pk=pk)
except Exception as e:
messages.error(request, _("Error updating source user: {}").format(str(e)))
context = {
"source": source,
"source_user": source_user,
}
return render(request, "px_sources/source_user_form.html", context)
@login_required
@block_source_user
def source_user_delete(request, pk, user_pk):
"""
Delete a source user.
Only PX admins can delete source users.
"""
if not request.user.is_px_admin():
messages.error(request, _("You don't have permission to delete source users."))
return redirect("px_sources:source_detail", pk=pk)
source = get_object_or_404(PXSource, pk=pk)
source_user = get_object_or_404(SourceUser, pk=user_pk, source=source)
if request.method == "POST":
user_name = source_user.user.get_full_name() or source_user.user.email
source_user.delete()
messages.success(request, _("Source user '{}' deleted successfully!").format(user_name))
return redirect("px_sources:source_detail", pk=pk)
context = {
"source": source,
"source_user": source_user,
}
return render(request, "px_sources/source_user_confirm_delete.html", context)
@login_required
@block_source_user
def source_user_toggle_status(request, pk, user_pk):
"""
Toggle source user active status (AJAX).
Only PX admins can toggle status.
"""
if not request.user.is_px_admin():
return JsonResponse({"error": "Permission denied"}, status=403)
if request.method != "POST":
return JsonResponse({"error": "Method not allowed"}, status=405)
source = get_object_or_404(PXSource, pk=pk)
source_user = get_object_or_404(SourceUser, pk=user_pk, source=source)
source_user.is_active = not source_user.is_active
source_user.save()
return JsonResponse(
{
"success": True,
"is_active": source_user.is_active,
"message": "Source user {} successfully".format("activated" if source_user.is_active else "deactivated"),
}
)
@login_required
@source_user_required
def source_user_complaint_list(request):
"""
List complaints for the current Source User.
Shows only complaints from their assigned source.
"""
# Get source user profile
source_user = SourceUser.get_active_source_user(request.user)
if not source_user:
messages.error(request, _("You are not assigned as a source user. Please contact your administrator."))
return redirect("/")
source = source_user.source
# Get complaints from this source - filter by hospital for data isolation
from apps.complaints.models import Complaint, Inquiry
from django.db.models import Q
complaints_queryset = Complaint.objects.filter(source=source)
# Apply hospital filter for data isolation
if source_user.hospital:
complaints_queryset = complaints_queryset.filter(hospital=source_user.hospital)
complaints_queryset = complaints_queryset.select_related("patient", "hospital", "assigned_to", "created_by")
# Apply filters
status_filter = request.GET.get("status")
if status_filter:
complaints_queryset = complaints_queryset.filter(status=status_filter)
priority_filter = request.GET.get("priority")
if priority_filter:
complaints_queryset = complaints_queryset.filter(priority=priority_filter)
category_filter = request.GET.get("category")
if category_filter:
complaints_queryset = complaints_queryset.filter(category=category_filter)
# Search
search = request.GET.get("search")
if search:
complaints_queryset = complaints_queryset.filter(
Q(title__icontains=search) | Q(description__icontains=search) | Q(patient_name__icontains=search)
)
# Order and paginate
complaints_queryset = complaints_queryset.order_by("-created_at")
from django.core.paginator import Paginator, EmptyPage, PageNotAnInteger
paginator = Paginator(complaints_queryset, 20) # 20 per page
page = request.GET.get("page")
try:
complaints = paginator.page(page)
except PageNotAnInteger:
complaints = paginator.page(1)
except EmptyPage:
complaints = paginator.page(paginator.num_pages)
# Calculate totals filtered by hospital
total_complaints_queryset = Complaint.objects.filter(source=source)
total_inquiries_queryset = Inquiry.objects.filter(source=source)
if source_user.hospital:
total_complaints_queryset = total_complaints_queryset.filter(hospital=source_user.hospital)
total_inquiries_queryset = total_inquiries_queryset.filter(hospital=source_user.hospital)
context = {
"complaints": complaints,
"source_user": source_user,
"source": source,
"status_filter": status_filter,
"priority_filter": priority_filter,
"category_filter": category_filter,
"search": search,
"complaints_count": complaints_queryset.count(),
"total_complaints": total_complaints_queryset.count(),
"total_inquiries": total_inquiries_queryset.count(),
}
return render(request, "px_sources/source_user_complaint_list.html", context)
@login_required
@source_user_required
def source_user_inquiry_list(request):
"""
List inquiries for the current Source User.
Shows only inquiries from their assigned source.
"""
# Get source user profile
source_user = SourceUser.get_active_source_user(request.user)
if not source_user:
messages.error(request, _("You are not assigned as a source user. Please contact your administrator."))
return redirect("/")
source = source_user.source
# Get inquiries from this source - filter by hospital for data isolation
from apps.complaints.models import Inquiry, Complaint
from django.db.models import Q
inquiries_queryset = Inquiry.objects.filter(source=source)
# Apply hospital filter for data isolation
if source_user.hospital:
inquiries_queryset = inquiries_queryset.filter(hospital=source_user.hospital)
inquiries_queryset = inquiries_queryset.select_related("patient", "hospital", "assigned_to", "created_by")
# Apply filters
status_filter = request.GET.get("status")
if status_filter:
inquiries_queryset = inquiries_queryset.filter(status=status_filter)
category_filter = request.GET.get("category")
if category_filter:
inquiries_queryset = inquiries_queryset.filter(category=category_filter)
# Search
search = request.GET.get("search")
if search:
inquiries_queryset = inquiries_queryset.filter(
Q(subject__icontains=search) | Q(message__icontains=search) | Q(contact_name__icontains=search)
)
# Order and paginate
inquiries_queryset = inquiries_queryset.order_by("-created_at")
from django.core.paginator import Paginator, EmptyPage, PageNotAnInteger
paginator = Paginator(inquiries_queryset, 20) # 20 per page
page = request.GET.get("page")
try:
inquiries = paginator.page(page)
except PageNotAnInteger:
inquiries = paginator.page(1)
except EmptyPage:
inquiries = paginator.page(paginator.num_pages)
# Calculate totals filtered by hospital
total_complaints_queryset = Complaint.objects.filter(source=source)
total_inquiries_queryset = Inquiry.objects.filter(source=source)
if source_user.hospital:
total_complaints_queryset = total_complaints_queryset.filter(hospital=source_user.hospital)
total_inquiries_queryset = total_inquiries_queryset.filter(hospital=source_user.hospital)
context = {
"inquiries": inquiries,
"source_user": source_user,
"source": source,
"status_filter": status_filter,
"category_filter": category_filter,
"search": search,
"inquiries_count": inquiries_queryset.count(),
"total_complaints": total_complaints_queryset.count(),
"total_inquiries": total_inquiries_queryset.count(),
}
return render(request, "px_sources/source_user_inquiry_list.html", context)
def _add_tailwind_classes(form):
tw_text = "w-full px-4 py-3 border-2 border-blue-100 rounded-xl text-navy focus:ring-2 focus:ring-blue focus:border-transparent transition"
tw_select = tw_text + " bg-white"
tw_area = tw_text + " resize-none"
for field in form.fields.values():
w = field.widget
cls = w.__class__.__name__
if cls == "Select":
w.attrs["class"] = tw_select
elif cls == "Textarea":
w.attrs["class"] = tw_area
w.attrs.setdefault("rows", "5")
elif cls in ("DateInput", "DateTimeInput"):
w.attrs["class"] = tw_text
elif cls in ("EmailInput", "TelInput"):
w.attrs["class"] = tw_text
else:
w.attrs["class"] = tw_text
@login_required
@source_user_required
def source_user_create_complaint(request):
"""
Create a complaint for source users.
Simplified form that automatically:
- Assigns the user's source
- Sets the hospital from the source user's context
- Hides admin-only fields
"""
from apps.complaints.forms import PublicComplaintForm
from apps.complaints.models import Complaint
from apps.complaints.tasks import notify_admins_new_complaint
from apps.core.services import AuditService
import uuid
from datetime import datetime
source_user = SourceUser.get_active_source_user(request.user)
if not source_user or not source_user.can_create_complaints:
messages.error(request, _("You don't have permission to create complaints."))
return redirect("px_sources:source_user_dashboard")
source = source_user.source
if request.method == "POST":
form = PublicComplaintForm(request.POST, request.FILES)
# Add Tailwind CSS classes to form fields
for field_name, field in form.fields.items():
if field.widget.__class__.__name__ == "TextInput":
field.widget.attrs.update(
{
"class": "w-full px-4 py-3 border-2 border-blue-100 rounded-xl text-navy focus:ring-2 focus:ring-blue focus:border-transparent transition"
}
)
elif field.widget.__class__.__name__ == "Textarea":
field.widget.attrs.update(
{
"class": "w-full px-4 py-3 border-2 border-blue-100 rounded-xl text-navy focus:ring-2 focus:ring-blue focus:border-transparent transition",
"rows": "5",
}
)
elif field.widget.__class__.__name__ == "Select":
field.widget.attrs.update(
{
"class": "w-full px-4 py-3 border-2 border-blue-100 rounded-xl text-navy focus:ring-2 focus:ring-blue focus:border-transparent transition bg-white"
}
)
elif field.widget.__class__.__name__ == "DateInput":
field.widget.attrs.update(
{
"class": "w-full px-4 py-3 border-2 border-blue-100 rounded-xl text-navy focus:ring-2 focus:ring-blue focus:border-transparent transition",
"type": "date",
}
)
elif field.widget.__class__.__name__ == "EmailInput":
field.widget.attrs.update(
{
"class": "w-full px-4 py-3 border-2 border-blue-100 rounded-xl text-navy focus:ring-2 focus:ring-blue focus:border-transparent transition"
}
)
elif field.widget.__class__.__name__ == "TelInput":
field.widget.attrs.update(
{
"class": "w-full px-4 py-3 border-2 border-blue-100 rounded-xl text-navy focus:ring-2 focus:ring-blue focus:border-transparent transition"
}
)
# Auto-populate hospital from source user's context
if not form.data.get("hospital"):
if source_user.hospital:
form.data = form.data.copy()
form.data["hospital"] = str(source_user.hospital.id)
if not form.data.get("location"):
from apps.organizations.models import Location
first_location = Location.objects.first()
if first_location:
form.data = form.data.copy()
form.data["location"] = str(first_location.id)
if form.is_valid():
try:
# Create complaint
complaint = form.save(commit=False)
# Set source automatically
complaint.source = source
# Set hospital from source user's context
if source_user.hospital:
complaint.hospital = source_user.hospital
# Map complaint_details to description (form field vs model field)
complaint.description = form.cleaned_data.get("complaint_details", "")
# Generate reference number
today = datetime.now().strftime("%Y%m%d")
random_suffix = str(uuid.uuid4().int)[:6]
complaint.reference_number = f"CMP-{today}-{random_suffix}"
# Set created by
complaint.created_by = request.user
complaint.save()
# Create initial update
from apps.complaints.models import ComplaintUpdate
ComplaintUpdate.objects.create(
complaint=complaint,
update_type="note",
message=f"Complaint submitted by {source.name_en} source user.",
created_by=request.user,
)
# Trigger AI analysis
try:
from apps.complaints.tasks import analyze_complaint_with_ai
analyze_complaint_with_ai.delay(str(complaint.id))
except:
pass # AI analysis is optional
# Notify admins
try:
notify_admins_new_complaint.delay(str(complaint.id))
except:
pass # Notification is optional
# Log audit
try:
AuditService.log_event(
event_type="complaint_created_by_source_user",
description=f"Complaint created by source user: {complaint.reference_number}",
user=request.user,
content_object=complaint,
metadata={
"source": source.name_en,
"source_user_id": str(source_user.id),
},
)
except:
pass # Audit logging is optional
messages.success(request, f"Complaint submitted successfully! Reference: {complaint.reference_number}")
return redirect("px_sources:source_user_complaint_list")
except Exception as e:
messages.error(request, f"Error creating complaint: {str(e)}")
else:
messages.error(request, "Please correct the errors below.")
else:
form = PublicComplaintForm()
# Pre-populate hospital from source user's context
if source_user.hospital:
form.initial["hospital"] = source_user.hospital.id
# Pre-populate location (get first location from user's hospital if available)
if source_user.hospital:
from apps.organizations.models import Location
first_location = Location.objects.first()
if first_location:
form.initial["location"] = first_location.id
# Add Tailwind CSS classes to form fields (for GET request too)
for field_name, field in form.fields.items():
if field.widget.__class__.__name__ == "TextInput":
field.widget.attrs.update(
{
"class": "w-full px-4 py-3 border-2 border-blue-100 rounded-xl text-navy focus:ring-2 focus:ring-blue focus:border-transparent transition"
}
)
elif field.widget.__class__.__name__ == "Textarea":
field.widget.attrs.update(
{
"class": "w-full px-4 py-3 border-2 border-blue-100 rounded-xl text-navy focus:ring-2 focus:ring-blue focus:border-transparent transition",
"rows": "5",
}
)
elif field.widget.__class__.__name__ == "Select":
field.widget.attrs.update(
{
"class": "w-full px-4 py-3 border-2 border-blue-100 rounded-xl text-navy focus:ring-2 focus:ring-blue focus:border-transparent transition bg-white"
}
)
elif field.widget.__class__.__name__ == "DateInput":
field.widget.attrs.update(
{
"class": "w-full px-4 py-3 border-2 border-blue-100 rounded-xl text-navy focus:ring-2 focus:ring-blue focus:border-transparent transition",
"type": "date",
}
)
elif field.widget.__class__.__name__ == "EmailInput":
field.widget.attrs.update(
{
"class": "w-full px-4 py-3 border-2 border-blue-100 rounded-xl text-navy focus:ring-2 focus:ring-blue focus:border-transparent transition"
}
)
elif field.widget.__class__.__name__ == "TelInput":
field.widget.attrs.update(
{
"class": "w-full px-4 py-3 border-2 border-blue-100 rounded-xl text-navy focus:ring-2 focus:ring-blue focus:border-transparent transition"
}
)
context = {
"form": form,
"source": source,
"source_user": source_user,
}
return render(request, "px_sources/source_user_create_complaint.html", context)
@login_required
@source_user_required
def source_user_create_inquiry(request):
"""
Create an inquiry for source users.
Simplified form that automatically:
- Assigns the user's source
- Sets the hospital from the source user's context
- Hides admin-only fields
"""
from apps.complaints.forms import InquiryForm
from apps.complaints.models import Inquiry
import uuid
from datetime import datetime
source_user = SourceUser.get_active_source_user(request.user)
if not source_user or not source_user.can_create_inquiries:
messages.error(request, _("You don't have permission to create inquiries."))
return redirect("px_sources:source_user_dashboard")
source = source_user.source
initial = {}
if source_user.hospital:
initial["hospital"] = source_user.hospital
if request.method == "POST":
form = InquiryForm(request.POST, request=request, initial=initial)
from apps.organizations.models import Location, MainSection, SubSection
form.fields["location"].queryset = Location.active_locations()
location_id = form.data.get("location")
if location_id:
available_sections = (
SubSection.objects.filter(location_id=location_id).values_list("main_section_id", flat=True).distinct()
)
form.fields["main_section"].queryset = MainSection.objects.filter(id__in=available_sections).order_by("name_en")
section_id = form.data.get("main_section")
if section_id:
form.fields["subsection"].queryset = SubSection.objects.filter(
location_id=location_id, main_section_id=section_id
).order_by("name_en")
if form.is_valid():
try:
inquiry = form.save(commit=False)
inquiry.source = source
if source_user.hospital:
inquiry.hospital = source_user.hospital
today = datetime.now().strftime("%Y%m%d")
random_suffix = str(uuid.uuid4().int)[:6]
inquiry.reference_number = f"INQ-{today}-{random_suffix}"
inquiry.created_by = request.user
inquiry.save()
from apps.core.services import AuditService
try:
AuditService.log_event(
event_type="inquiry_created_by_source_user",
description=f"Inquiry created by source user: {inquiry.reference_number}",
user=request.user,
content_object=inquiry,
metadata={
"source": source.name_en,
"source_user_id": str(source_user.id),
},
)
except:
pass
messages.success(request, f"Inquiry submitted successfully! Reference: {inquiry.reference_number}")
return redirect("px_sources:source_user_inquiry_list")
except Exception as e:
messages.error(request, f"Error creating inquiry: {str(e)}")
else:
messages.error(request, "Please correct the errors below.")
else:
form = InquiryForm(request=request, initial=initial)
TAILWIND_TEXT = "w-full px-4 py-3 border-2 border-blue-100 rounded-xl text-navy focus:ring-2 focus:ring-blue focus:border-transparent transition"
TAILWIND_SELECT = "w-full px-4 py-3 border-2 border-blue-100 rounded-xl text-navy focus:ring-2 focus:ring-blue focus:border-transparent transition bg-white"
for field_name in ("contact_name", "contact_email", "contact_phone", "subject"):
if field_name in form.fields:
form.fields[field_name].widget.attrs["class"] = TAILWIND_TEXT
if "message" in form.fields:
form.fields["message"].widget.attrs.update({"class": TAILWIND_TEXT, "rows": "5"})
for field_name in ("category", "location", "main_section", "subsection"):
if field_name in form.fields:
form.fields[field_name].widget.attrs["class"] = TAILWIND_SELECT
if "location" in form.fields:
from apps.organizations.models import Location
form.fields["location"].queryset = Location.active_locations()
context = {
"form": form,
"source": source,
"source_user": source_user,
}
return render(request, "px_sources/source_user_create_inquiry.html", context)
@login_required
@require_http_methods(["POST"])
def source_complaint_create(request, pk):
"""Create a source complaint from PX Source detail page"""
source = get_object_or_404(PXSource, pk=pk)
if not _can_manage_source_complaints(request.user):
messages.error(request, _("You don't have permission to create source complaints."))
return redirect("px_sources:source_detail", pk=pk)
subject = request.POST.get("subject", "").strip()
description = request.POST.get("description", "").strip()
if not subject or not description:
messages.error(request, _("Subject and description are required."))
return redirect("px_sources:source_detail", pk=pk)
SourceComplaint.objects.create(
px_source=source,
subject=subject,
description=description,
patient_name=request.POST.get("patient_name", "").strip(),
contact_phone=request.POST.get("contact_phone", "").strip(),
contact_email=request.POST.get("contact_email", "").strip(),
created_by=request.user,
)
messages.success(request, _("Source complaint created successfully."))
return redirect("px_sources:source_detail", pk=pk)
@login_required
def convert_to_system_complaint(request, pk):
"""Convert a SourceComplaint to a system Complaint — review/edit modal"""
from apps.organizations.models import Hospital, Department
source_complaint = get_object_or_404(SourceComplaint, pk=pk)
if not _can_manage_source_complaints(request.user):
messages.error(request, _("You don't have permission to convert source complaints."))
return redirect("px_sources:source_detail", pk=source_complaint.px_source.pk)
if source_complaint.status == SourceComplaint.StatusChoices.CLOSED:
messages.error(request, _("Closed source complaints cannot be converted."))
return redirect("px_sources:source_detail", pk=source_complaint.px_source.pk)
if request.method == "POST":
title = request.POST.get("title", source_complaint.subject).strip()
description = request.POST.get("description", source_complaint.description).strip()
patient_name = request.POST.get("patient_name", source_complaint.patient_name).strip()
hospital_id = request.POST.get("hospital")
department_id = request.POST.get("department")
if not title or not description:
messages.error(request, _("Title and description are required."))
return redirect("px_sources:source_detail", pk=source_complaint.px_source.pk)
complaint = Complaint.objects.create(
title=title,
description=description,
patient_name=patient_name,
source=source_complaint.px_source,
complaint_source_type="EXTERNAL",
hospital_id=hospital_id or None,
department_id=department_id or None,
created_by=request.user,
)
source_complaint.system_complaint = complaint
source_complaint.status = SourceComplaint.StatusChoices.CONVERTED
source_complaint.save(update_fields=["system_complaint", "status", "updated_at"])
from apps.complaints.models import ComplaintUpdate
ComplaintUpdate.objects.create(
complaint=complaint,
update_type="system_note",
message=f"Converted from source complaint {source_complaint.reference_number}",
created_by=request.user,
)
messages.success(request, _(f"Converted to system complaint: {complaint.reference_number}"))
return redirect("complaints:complaint_detail", pk=complaint.pk)
hospitals = Hospital.objects.filter(status="active")
departments = Department.objects.filter(status="active").select_related("hospital")
context = {
"source_complaint": source_complaint,
"hospitals": hospitals,
"departments": departments,
}
return render(request, "px_sources/convert_to_complaint_modal.html", context)
def _can_manage_source_complaints(user):
return user.is_px_admin() or user.is_hospital_admin() or getattr(user, "is_department_manager", lambda: False)()
@login_required
def source_user_observation_list(request):
source_user = SourceUser.get_active_source_user(request.user)
if not source_user:
messages.error(request, _("You are not assigned as a source user. Please contact your administrator."))
return redirect("/")
source = source_user.source
from apps.observations.models import Observation
from django.db.models import Q
observations_qs = Observation.objects.filter(px_source=source)
if source_user.hospital:
observations_qs = observations_qs.filter(hospital=source_user.hospital)
observations_qs = observations_qs.select_related("hospital", "category", "assigned_department")
status_filter = request.GET.get("status")
if status_filter:
observations_qs = observations_qs.filter(status=status_filter)
category_filter = request.GET.get("category")
if category_filter:
observations_qs = observations_qs.filter(category_id=category_filter)
search = request.GET.get("search")
if search:
observations_qs = observations_qs.filter(
Q(description__icontains=search) | Q(tracking_code__icontains=search) | Q(location_text__icontains=search)
)
observations_qs = observations_qs.order_by("-created_at")
from django.core.paginator import Paginator, EmptyPage, PageNotAnInteger
paginator = Paginator(observations_qs, 20)
page = request.GET.get("page")
try:
observations = paginator.page(page)
except PageNotAnInteger:
observations = paginator.page(1)
except EmptyPage:
observations = paginator.page(paginator.num_pages)
from apps.observations.models import ObservationCategory
categories = ObservationCategory.objects.filter(is_active=True).order_by("name_en")
context = {
"observations": observations,
"source_user": source_user,
"source": source,
"status_filter": status_filter,
"category_filter": category_filter,
"search": search,
"observations_count": observations_qs.count(),
"categories": categories,
}
return render(request, "px_sources/source_user_observation_list.html", context)
@login_required
def source_user_suggestion_list(request):
source_user = SourceUser.get_active_source_user(request.user)
if not source_user:
messages.error(request, _("You are not assigned as a source user. Please contact your administrator."))
return redirect("/")
source = source_user.source
from apps.feedback.models import Feedback, FeedbackType
from django.db.models import Q
suggestions_qs = Feedback.objects.filter(source=source, feedback_type=FeedbackType.SUGGESTION)
if source_user.hospital:
suggestions_qs = suggestions_qs.filter(hospital=source_user.hospital)
suggestions_qs = suggestions_qs.select_related("hospital", "department")
status_filter = request.GET.get("status")
if status_filter:
suggestions_qs = suggestions_qs.filter(status=status_filter)
category_filter = request.GET.get("category")
if category_filter:
suggestions_qs = suggestions_qs.filter(category=category_filter)
search = request.GET.get("search")
if search:
suggestions_qs = suggestions_qs.filter(
Q(title__icontains=search) | Q(message__icontains=search)
)
suggestions_qs = suggestions_qs.order_by("-created_at")
from django.core.paginator import Paginator, EmptyPage, PageNotAnInteger
paginator = Paginator(suggestions_qs, 20)
page = request.GET.get("page")
try:
suggestions = paginator.page(page)
except PageNotAnInteger:
suggestions = paginator.page(1)
except EmptyPage:
suggestions = paginator.page(paginator.num_pages)
context = {
"suggestions": suggestions,
"source_user": source_user,
"source": source,
"status_filter": status_filter,
"category_filter": category_filter,
"search": search,
"suggestions_count": suggestions_qs.count(),
}
return render(request, "px_sources/source_user_suggestion_list.html", context)
@login_required
def source_user_create_observation(request):
from apps.observations.forms import PublicObservationForm
source_user = SourceUser.get_active_source_user(request.user)
if not source_user or not source_user.can_create_observations:
messages.error(request, _("You don't have permission to create observations."))
return redirect("px_sources:source_user_dashboard")
source = source_user.source
if request.method == "POST":
form = PublicObservationForm(request.POST)
if not form.data.get("hospital"):
if source_user.hospital:
form.data = form.data.copy()
form.data["hospital"] = str(source_user.hospital.id)
_add_tailwind_classes(form)
if form.is_valid():
try:
observation = form.save(commit=False)
observation.px_source = source
observation.source = "web_form"
observation.created_by = request.user
observation.metadata = {"created_via": "source_user_portal"}
observation.save()
try:
from apps.core.services import AuditService
AuditService.log_event(
event_type="observation_created_by_source_user",
description=f"Observation created by source user: {observation.tracking_code}",
user=request.user,
content_object=observation,
metadata={"source": source.name_en, "source_user_id": str(source_user.id)},
)
except Exception:
pass
messages.success(request, _("Observation submitted successfully! Tracking code: %s") % observation.tracking_code)
return redirect("px_sources:source_user_observations")
except Exception as e:
messages.error(request, _("Error creating observation: %s") % str(e))
else:
messages.error(request, _("Please correct the errors below."))
else:
form = PublicObservationForm()
if source_user.hospital:
form.initial["hospital"] = source_user.hospital.id
from apps.organizations.models import Location
first_location = Location.objects.first()
if first_location:
form.initial["location"] = first_location.id
_add_tailwind_classes(form)
context = {"form": form, "source": source, "source_user": source_user}
return render(request, "px_sources/source_user_create_observation.html", context)
@login_required
def source_user_create_suggestion(request):
from apps.feedback.forms import PublicSuggestionForm
from apps.feedback.models import Feedback, FeedbackType, FeedbackCategory
source_user = SourceUser.get_active_source_user(request.user)
if not source_user or not source_user.can_create_suggestions:
messages.error(request, _("You don't have permission to create suggestions."))
return redirect("px_sources:source_user_dashboard")
source = source_user.source
if request.method == "POST":
form = PublicSuggestionForm(request.POST)
form.data = form.data.copy()
if not form.data.get("hospital") and source_user.hospital:
form.data["hospital"] = str(source_user.hospital.id)
if not form.data.get("category"):
form.data["category"] = "other"
if not form.data.get("title"):
form.data["title"] = form.data.get("message", "")[:80]
_add_tailwind_classes(form)
if form.is_valid():
try:
message = form.cleaned_data["message"]
title = message[:80].rstrip() + ("..." if len(message) > 80 else "")
feedback = Feedback(
hospital=source_user.hospital or form.cleaned_data.get("hospital"),
feedback_type=FeedbackType.SUGGESTION,
title=title,
message=message,
category=FeedbackCategory.OTHER,
is_anonymous=False,
contact_name=form.cleaned_data["contact_name"],
contact_phone=form.cleaned_data.get("contact_phone", ""),
contact_email=form.cleaned_data.get("contact_email", ""),
source=source,
metadata={"created_via": "source_user_portal"},
)
feedback.save()
try:
from apps.feedback.tasks import analyze_suggestion_with_ai
analyze_suggestion_with_ai.delay(str(feedback.id))
except Exception:
pass
try:
from apps.core.services import AuditService
AuditService.log_event(
event_type="suggestion_created_by_source_user",
description=f"Suggestion created by source user: {feedback.title}",
user=request.user,
content_object=feedback,
metadata={"source": source.name_en, "source_user_id": str(source_user.id)},
)
except Exception:
pass
messages.success(request, _("Suggestion submitted successfully!"))
return redirect("px_sources:source_user_suggestions")
except Exception as e:
messages.error(request, _("Error creating suggestion: %s") % str(e))
else:
messages.error(request, _("Please correct the errors below."))
else:
form = PublicSuggestionForm()
if source_user.hospital:
form.initial["hospital"] = source_user.hospital.id
from apps.organizations.models import Location
first_location = Location.objects.first()
if first_location:
form.initial["location"] = first_location.id
_add_tailwind_classes(form)
context = {"form": form, "source": source, "source_user": source_user}
return render(request, "px_sources/source_user_create_suggestion.html", context)
@login_required
def source_user_create_communication_request(request):
source_user = SourceUser.get_active_source_user(request.user)
if not source_user:
messages.error(request, _("You are not assigned as a source user."))
return redirect("/")
if request.method == "POST":
from apps.notifications.services import NotificationService
from apps.accounts.models import User
from django.conf import settings
hospital = source_user.hospital or request.user.hospital
cr = CommunicationRequest.objects.create(
source_user=source_user,
hospital=hospital,
patient_name=request.POST.get("patient_name", "").strip(),
patient_phone=request.POST.get("patient_phone", "").strip(),
patient_mrn=request.POST.get("patient_mrn", "").strip(),
reason=request.POST.get("reason", "general_inquiry"),
message=request.POST.get("message", "").strip(),
)
site_url = getattr(settings, "SITE_URL", "http://localhost:8000")
detail_url = f"{site_url}/px-sources/communication-requests/manage/{cr.pk}/"
px_employee = User.objects.filter(
groups__name__in=["PX Admin", "PX Employee"],
hospital=hospital,
is_active=True,
).exclude(email="")
for staff_user in px_employee:
try:
NotificationService.send_email(
to_email=staff_user.email,
subject=f"New Communication Request - {cr.get_reason_display()}",
template_name="emails/communication_request_notification",
context={
"patient_name": cr.patient_name,
"reason": cr.get_reason_display(),
"message": cr.message,
"source_user_name": request.user.get_full_name(),
"request_id": str(cr.id),
"detail_url": detail_url,
},
)
except Exception:
pass
messages.success(request, _("Communication request sent to the PX team."))
return redirect("px_sources:source_user_communication_requests")
context = {
"source_user": source_user,
}
return render(request, "px_sources/source_user_create_communication_request.html", context)
@login_required
def source_user_communication_requests(request):
source_user = SourceUser.get_active_source_user(request.user)
if not source_user:
messages.error(request, _("You are not assigned as a source user."))
return redirect("/")
from .models import CommunicationRequest
requests_qs = CommunicationRequest.objects.filter(
source_user=source_user
).order_by("-created_at")
context = {
"source_user": source_user,
"comm_requests": requests_qs,
"total_count": requests_qs.count(),
"pending_count": requests_qs.filter(status="pending").count(),
}
return render(request, "px_sources/source_user_communication_request_list.html", context)
def _get_hospital(request):
if request.user.is_px_admin() and hasattr(request, "tenant_hospital") and request.tenant_hospital:
return request.tenant_hospital
if request.user.hospital:
return request.user.hospital
return None
@login_required
@block_source_user
def communication_request_list(request):
hospital = _get_hospital(request)
qs = CommunicationRequest.objects.select_related(
"source_user", "source_user__user", "source_user__source", "contacted_by"
)
if hospital:
qs = qs.filter(hospital=hospital)
status_filter = request.GET.get("status", "")
if status_filter:
qs = qs.filter(status=status_filter)
qs = qs.order_by("-created_at")
base_qs = CommunicationRequest.objects.all()
if hospital:
base_qs = base_qs.filter(hospital=hospital)
all_total_count = base_qs.count()
filtered_count = qs.count()
pending_count = base_qs.filter(status="pending").count()
context = {
"comm_requests": qs,
"total_count": filtered_count,
"all_total_count": all_total_count,
"pending_count": pending_count,
"status_filter": status_filter,
}
return render(request, "px_sources/communication_request_list.html", context)
@login_required
@block_source_user
def communication_request_detail(request, pk):
comm_req = get_object_or_404(
CommunicationRequest.objects.select_related(
"source_user", "source_user__user", "source_user__source", "contacted_by", "hospital"
),
pk=pk,
)
hospital = _get_hospital(request)
if hospital and comm_req.hospital != hospital:
messages.error(request, _("You do not have permission to view this request."))
return redirect("px_sources:communication_request_list")
if request.method == "POST":
new_status = request.POST.get("status", comm_req.status)
resolution_notes = request.POST.get("resolution_notes", "")
valid_statuses = [s[0] for s in CommunicationRequest.Status.choices]
if new_status in valid_statuses:
comm_req.status = new_status
comm_req.resolution_notes = resolution_notes
if new_status in ("contacted", "resolved", "closed") and not comm_req.contacted_by:
from django.utils import timezone
comm_req.contacted_by = request.user
comm_req.contacted_at = timezone.now()
comm_req.save()
messages.success(request, _("Communication request updated successfully."))
else:
messages.error(request, _("Invalid status."))
return redirect("px_sources:communication_request_detail", pk=comm_req.pk)
context = {
"comm_req": comm_req,
}
return render(request, "px_sources/communication_request_detail.html", context)