398 lines
18 KiB
Python
398 lines
18 KiB
Python
"""
Management command to test SLA reminder functionality.

Creates test complaints with various scenarios and optionally runs the SLA reminder task
to verify correct recipient selection and timing calculations.

Usage:
    python manage.py test_sla_reminders --dry-run
    python manage.py test_sla_reminders --run-task
    python manage.py test_sla_reminders --scenario assigned --complaint-count 3
"""
|
|
|
|
import random
|
|
from datetime import timedelta
|
|
from django.core.management.base import BaseCommand
|
|
from django.db import transaction
|
|
from django.utils import timezone
|
|
from django.conf import settings
|
|
|
|
from apps.accounts.models import User
|
|
from apps.complaints.models import Complaint, ComplaintCategory
|
|
from apps.organizations.models import Hospital, Department, Staff
|
|
from apps.px_sources.models import PXSource
|
|
|
|
|
|
class Command(BaseCommand):
    """Create backdated test complaints and optionally run the SLA reminder task.

    For every selected scenario the command creates ``--complaint-count``
    complaints whose ``created_at`` lies 25, 26, 27... hours in the past,
    prints the recipient the reminder is expected to reach, and can then call
    ``send_sla_reminders`` and/or delete the complaints it created.
    """

    help = "Test SLA reminder functionality by creating test complaints and optionally running the reminder task"

    # Horizontal rule printed above and below every section title.
    _RULE = "=" * 80
    # Timestamp layout used for all console output.
    _TS_FORMAT = "%Y-%m-%d %H:%M:%S"
    # Number of reference numbers listed individually in the summary.
    _SUMMARY_LIMIT = 5

    def add_arguments(self, parser):
        """Register the command-line options understood by this command."""
        parser.add_argument("--hospital-code", type=str, help="Target hospital code (default: first active hospital)")
        parser.add_argument(
            "--complaint-count",
            type=int,
            default=5,
            help="Number of test complaints to create per scenario (default: 5)",
        )
        parser.add_argument(
            "--dry-run", action="store_true", help="Preview without creating complaints or sending emails"
        )
        parser.add_argument(
            "--run-task", action="store_true", help="Execute the SLA reminder task after creating complaints"
        )
        parser.add_argument(
            "--scenario",
            type=str,
            default="all",
            choices=["assigned", "unassigned", "department-manager", "all"],
            help="Test specific scenario (default: all)",
        )
        parser.add_argument("--cleanup", action="store_true", help="Delete test complaints after testing")
        parser.add_argument(
            "--hours-before-reminder",
            type=int,
            default=1,
            help="Hours before first reminder threshold to create complaints (default: 1)",
        )

    def handle(self, *args, **options):
        """Run the test flow: resolve data, create complaints, run task, clean up.

        Returns early (after printing an error) when no hospital or no usable
        test data can be found.
        """
        complaint_count = options["complaint_count"]
        dry_run = options["dry_run"]
        run_task = options["run_task"]
        scenario = options["scenario"]
        cleanup = options["cleanup"]
        # NOTE(review): --hours-before-reminder is parsed but not consumed; the
        # backdating offset in _run_scenario is fixed at 25 + index hours.
        # Confirm the intended meaning before wiring it in.

        self._banner("🔔 SLA REMINDER TEST COMMAND", blank_after=True)

        hospital = self._resolve_hospital(options["hospital_code"])
        if hospital is None:
            return

        self.stdout.write(f"🏥 Hospital: {hospital.name} (Code: {hospital.code})")

        test_data = self.setup_test_data(hospital, dry_run)
        if not test_data:
            return

        admin_user, _px_coordinator, department_with_manager, department_without_manager, source = test_data

        scenarios = self._build_scenarios(admin_user, department_with_manager, department_without_manager)
        if scenario != "all":
            scenarios = {scenario: scenarios[scenario]}

        created_complaints = []
        for scenario_key, scenario_config in scenarios.items():
            created_complaints.extend(
                self._run_scenario(
                    scenario_key=scenario_key,
                    scenario_config=scenario_config,
                    hospital=hospital,
                    source=source,
                    complaint_count=complaint_count,
                    dry_run=dry_run,
                )
            )

        self._print_summary(created_complaints, dry_run)

        if run_task and not dry_run:
            self._run_reminder_task()
        elif run_task and dry_run:
            self._banner(self.style.WARNING("⚠ Cannot run task in dry-run mode"))

        if cleanup and not dry_run:
            self._cleanup(created_complaints)

        self._banner("✅ TEST COMPLETED", blank_after=True)

    def _banner(self, title, *, blank_after=False):
        """Print ``title`` framed by two horizontal rules."""
        self.stdout.write(f"\n{self._RULE}")
        self.stdout.write(title)
        self.stdout.write(f"{self._RULE}\n" if blank_after else self._RULE)

    def _resolve_hospital(self, hospital_code):
        """Return the hospital for ``hospital_code`` or the first active one.

        Prints an error and returns ``None`` when nothing matches.
        """
        if hospital_code:
            hospital = Hospital.objects.filter(code=hospital_code).first()
            if not hospital:
                self.stdout.write(self.style.ERROR(f"Hospital with code '{hospital_code}' not found"))
            return hospital

        hospital = Hospital.objects.filter(status="active").first()
        if not hospital:
            self.stdout.write(self.style.ERROR("No active hospitals found"))
        return hospital

    @staticmethod
    def _build_scenarios(admin_user, department_with_manager, department_without_manager):
        """Return the scenario table keyed by the ``--scenario`` choice names."""
        return {
            "assigned": {
                "name": "Assigned Complaint",
                "description": "Complaint assigned to a user - email goes to assigned user",
                "assigned_to": admin_user,
                "department": department_with_manager,
            },
            "unassigned": {
                "name": "Unassigned Complaint (No Manager)",
                "description": "Unassigned complaint with no department manager - email goes to hospital admins",
                "assigned_to": None,
                "department": department_without_manager,
            },
            "department-manager": {
                "name": "Unassigned Complaint (With Manager)",
                "description": "Unassigned complaint with department manager - email goes to department manager",
                "assigned_to": None,
                "department": department_with_manager,
            },
        }

    def _run_scenario(self, scenario_key, scenario_config, hospital, source, complaint_count, dry_run):
        """Create (or preview) the complaints of one scenario.

        Returns the list of complaints that were actually created; the list is
        empty in dry-run mode.
        """
        self._banner(f"📋 SCENARIO: {scenario_config['name']}")
        self.stdout.write(f" {scenario_config['description']}")
        self.stdout.write(f"\n Creating {complaint_count} test complaint(s)...\n")

        assigned_to = scenario_config["assigned_to"]
        department = scenario_config["department"]
        created = []

        for i in range(complaint_count):
            # Original author's note: SLA is 72 hours with the first reminder
            # at 24 hours, so each complaint is backdated past that mark
            # (25, 26, 27... hours ago).
            hours_since_creation = 25 + i
            created_at_time = timezone.now() - timedelta(hours=hours_since_creation)

            if dry_run:
                self._report_dry_run(i + 1, created_at_time, hours_since_creation, assigned_to, department)
            else:
                try:
                    with transaction.atomic():
                        complaint = self.create_test_complaint(
                            hospital=hospital,
                            source=source,
                            department=department,
                            assigned_to=assigned_to,
                            created_at=created_at_time,
                            index=i + 1,
                            scenario=scenario_key,
                        )
                        created.append(complaint)
                    self._report_created(complaint, hours_since_creation)
                except Exception as e:
                    # Best effort: report the failure and continue with the
                    # remaining complaints instead of aborting the whole run.
                    self.stdout.write(self.style.ERROR(f" ✗ Error creating complaint: {e}"))

            expected_recipient = self.calculate_expected_recipient(
                assigned_to=assigned_to,
                department=department,
                hospital=hospital,
            )
            self.stdout.write(f" Expected recipient: {expected_recipient}")

        return created

    def _report_created(self, complaint, hours_since_creation):
        """Print the details of a complaint that was just created."""
        assignee = complaint.assigned_to.get_full_name() if complaint.assigned_to else "None"
        department_name = complaint.department.name if complaint.department else "None"
        # due_at is populated outside this command; tolerate it being unset so
        # a successfully created complaint is not reported as a failure.
        due = complaint.due_at.strftime(self._TS_FORMAT) if complaint.due_at else "None"

        self.stdout.write(self.style.SUCCESS(f" ✓ Created: {complaint.reference_number}"))
        self.stdout.write(f" Title: {complaint.title}")
        self.stdout.write(
            f" Created: {complaint.created_at.strftime(self._TS_FORMAT)} ({hours_since_creation} hours ago)"
        )
        self.stdout.write(f" Due: {due}")
        self.stdout.write(f" Assigned to: {assignee}")
        self.stdout.write(f" Department: {department_name}")
        if complaint.department and complaint.department.manager:
            self.stdout.write(f" Dept Manager: {complaint.department.manager.get_full_name()}")

    def _report_dry_run(self, number, created_at_time, hours_since_creation, assigned_to, department):
        """Print what would have been created for one complaint in dry-run mode."""
        assignee = assigned_to.get_full_name() if assigned_to else "None"
        department_name = department.name if department else "None"

        self.stdout.write(f" [DRY RUN] Would create complaint {number}")
        self.stdout.write(
            f" Created: {created_at_time.strftime(self._TS_FORMAT)} ({hours_since_creation} hours ago)"
        )
        self.stdout.write(f" Assigned to: {assignee}")
        self.stdout.write(f" Department: {department_name}")

    def _print_summary(self, created_complaints, dry_run):
        """Print the number of created complaints and the first few references."""
        self._banner("📊 SUMMARY")
        if dry_run:
            self.stdout.write(self.style.WARNING(" DRY RUN - No complaints created"))
            return

        total = len(created_complaints)
        self.stdout.write(f" Total complaints created: {total}")
        for comp in created_complaints[: self._SUMMARY_LIMIT]:
            self.stdout.write(f" - {comp.reference_number}")
        if total > self._SUMMARY_LIMIT:
            self.stdout.write(f" ... and {total - self._SUMMARY_LIMIT} more")

    def _run_reminder_task(self):
        """Call ``send_sla_reminders`` directly and print its result or traceback."""
        self._banner("🚀 RUNNING SLA REMINDER TASK", blank_after=True)

        # Imported lazily, as in the original command.
        from apps.complaints.tasks import send_sla_reminders

        try:
            result = send_sla_reminders()
        except Exception as e:
            import traceback

            self.stdout.write(self.style.ERROR(f"\n✗ Error running task: {e}"))
            self.stdout.write(traceback.format_exc())
        else:
            self.stdout.write(self.style.SUCCESS("\n✓ SLA reminder task completed"))
            self.stdout.write(f" Result: {result}")

    def _cleanup(self, created_complaints):
        """Delete every complaint created during this run."""
        self._banner("🗑️ CLEANING UP TEST DATA", blank_after=True)

        count = len(created_complaints)
        for comp in created_complaints:
            comp.delete()

        self.stdout.write(self.style.SUCCESS(f"✓ Deleted {count} test complaints"))

    def setup_test_data(self, hospital, dry_run=False):
        """Find (or, outside dry-run, create) the users, departments and source to use.

        Returns a tuple ``(admin_user, px_coordinator, dept_with_manager,
        dept_without_manager, source)``, or ``None`` when no hospital admin
        exists or (in dry-run) the hospital has no active department.
        ``px_coordinator`` and ``source`` may be ``None``.
        """
        # Filter on the group name rather than a Group instance: if the group
        # does not exist, ``groups=None`` would match users with no group at
        # all instead of matching nobody.
        admin_user = User.objects.filter(
            hospital=hospital, groups__name="Hospital Admin", is_active=True
        ).first()

        if not admin_user:
            self.stdout.write(self.style.ERROR(f"No hospital admin found for {hospital.name}"))
            self.stdout.write(" Please create a hospital admin user first")
            return None

        px_coordinator = User.objects.filter(
            hospital=hospital, groups__name="PX Coordinator", is_active=True
        ).first()

        active_departments = Department.objects.filter(hospital=hospital, status="active")

        dept_with_manager = active_departments.exclude(manager__isnull=True).first()
        if not dept_with_manager:
            if not dry_run:
                self.stdout.write(self.style.WARNING("No department with manager found, creating one..."))
                dept_with_manager = Department.objects.create(
                    hospital=hospital,
                    name="Test Department",
                    name_ar="قسم الاختبار",
                    status="active",
                )
                # The hospital admin doubles as the manager of the test department.
                dept_with_manager.manager = admin_user
                dept_with_manager.save()
            else:
                self.stdout.write(self.style.WARNING("No department with manager found (dry-run mode)"))
                self.stdout.write(
                    " Tip: Run without --dry-run to create test data, or assign a manager to an existing department"
                )
                # Fall back to any active department so the preview can still be shown.
                dept_with_manager = active_departments.first()
                if not dept_with_manager:
                    return None

        # Must really have no manager; otherwise the "no manager" scenario
        # would exercise the department-manager recipient path instead.
        dept_without_manager = (
            active_departments.filter(manager__isnull=True).exclude(id=dept_with_manager.id).first()
        )
        if not dept_without_manager:
            if not dry_run:
                self.stdout.write(self.style.WARNING("No department without manager found, creating one..."))
                dept_without_manager = Department.objects.create(
                    hospital=hospital,
                    name="Test Department No Manager",
                    name_ar="قسم بدون مدير",
                    status="active",
                )
            else:
                self.stdout.write(self.style.WARNING("No department without manager found (dry-run mode)"))
                self.stdout.write(" Tip: Run without --dry-run to create test data")
                # Display-only fallback: reuse the first department in the preview.
                dept_without_manager = dept_with_manager

        # Prefer the "Patient" source, otherwise any active source (may be None).
        source = PXSource.objects.filter(name_en="Patient", is_active=True).first()
        if not source:
            source = PXSource.objects.filter(is_active=True).first()

        self.stdout.write("\n👥 Test Data:")
        self.stdout.write(f" Admin User: {admin_user.get_full_name()} ({admin_user.email})")
        if px_coordinator:
            self.stdout.write(f" PX Coordinator: {px_coordinator.get_full_name()} ({px_coordinator.email})")
        if dept_with_manager.manager:
            self.stdout.write(
                f" Dept with Manager: {dept_with_manager.name} (Manager: {dept_with_manager.manager.get_full_name()})"
            )
        else:
            self.stdout.write(
                f" Dept: {dept_with_manager.name} (No manager assigned - will use for 'without manager' scenario)"
            )
        self.stdout.write(f" Dept without Manager: {dept_without_manager.name}")
        if source:
            self.stdout.write(f" Source: {source.name_en}")

        return admin_user, px_coordinator, dept_with_manager, dept_without_manager, source

    def create_test_complaint(self, hospital, source, department, assigned_to, created_at, index, scenario):
        """Create one test complaint whose ``created_at`` is set to ``created_at``.

        When ``assigned_to`` is given, the complaint is additionally moved to
        ``in_progress`` with ``activated_at`` one hour after ``created_at``.
        Returns the created complaint.
        """
        # auto_now_add would overwrite the supplied timestamp on insert, so it
        # is switched off on the model field for the duration of the create.
        # NOTE(review): this mutates shared model metadata; fine for a
        # single-process management command, not safe under concurrency.
        created_at_field = Complaint._meta.get_field("created_at")
        original_auto_now_add = created_at_field.auto_now_add
        created_at_field.auto_now_add = False

        try:
            hours_ago = (timezone.now() - created_at).total_seconds() / 3600
            complaint = Complaint.objects.create(
                hospital=hospital,
                source=source,
                department=department,
                assigned_to=assigned_to,
                category=self.get_category(),
                title=f"Test SLA Reminder - {scenario.replace('-', ' ').title()} #{index}",
                description=(
                    "This is a test complaint created to verify SLA reminder functionality. "
                    f"Scenario: {scenario}. "
                    f"This complaint was created {hours_ago:.1f} hours ago "
                    "to test if reminders are sent correctly."
                ),
                severity="medium",
                priority="medium",
                status="open",
                contact_name="Test Patient",
                contact_phone="+966500000000",
                contact_email="test@example.com",
                created_at=created_at,
            )

            if assigned_to:
                complaint.status = "in_progress"
                complaint.activated_at = created_at + timedelta(hours=1)
                complaint.save(update_fields=["status", "activated_at"])

            return complaint
        finally:
            # Always restore the field, even if the insert failed.
            created_at_field.auto_now_add = original_auto_now_add

    def get_category(self):
        """Return the active ``communication`` category, else any active one (or ``None``)."""
        category = ComplaintCategory.objects.filter(code="communication", is_active=True).first()
        if not category:
            category = ComplaintCategory.objects.filter(is_active=True).first()
        return category

    def calculate_expected_recipient(self, assigned_to, department, hospital):
        """Describe who is expected to receive the reminder for a complaint.

        Precedence: the assigned user, then the department manager, then the
        users returned by ``get_hospital_admins_and_coordinators(hospital)``.
        Returns a human-readable string, or ``"NO RECIPIENTS FOUND"``.
        """
        if assigned_to:
            return f"{assigned_to.get_full_name()} ({assigned_to.email}) - Assigned User"

        if department and department.manager:
            manager = department.manager
            return f"{manager.get_full_name()} ({manager.email}) - Department Manager"

        # Imported lazily, as in the original command.
        from apps.complaints.tasks import get_hospital_admins_and_coordinators

        recipients = get_hospital_admins_and_coordinators(hospital)
        if recipients:
            names = ", ".join(f"{r.get_full_name()} ({r.email})" for r in recipients)
            return f"Hospital Admins/Coordinators: {names}"

        return "NO RECIPIENTS FOUND"
|