"""
Base utilities for realistic scenario testing with time compression.

Provides reusable functionality for:
- Time-compressed simulation
- SLA configuration setup
- Staff hierarchy creation
- Progress logging
- Database verification
"""
|
||
import time
|
||
from datetime import timedelta
|
||
from django.utils import timezone
|
||
from django.db import transaction
|
||
|
||
|
||
class ScenarioTestBase:
    """Base class for realistic scenario testing.

    Combines three kinds of helpers:

    - console progress logging (``print_*`` methods),
    - a time-compressed wait (``print_step``) where one real second stands
      for ``time_compression_ratio`` simulated hours,
    - fixture/verification helpers that talk to the project's Django models.
      Those models are imported lazily inside the methods that need them, so
      the logging helpers work without them being importable.

    Attributes:
        time_compression_ratio: Simulated hours per real second.
        start_time: ``time.time()`` captured by ``print_header``; ``None``
            until a header has been printed.
        step_number: Count of steps announced through ``print_step``.
    """

    def __init__(self, time_compression_ratio=2):
        """
        Initialize scenario test base.

        Args:
            time_compression_ratio: 1 second of real time = X hours of system time
                Default: 2 (1s = 2h)
        """
        self.time_compression_ratio = time_compression_ratio
        self.start_time = None
        self.step_number = 0

    def print_header(self, title):
        """Print a formatted header and start the elapsed-time clock.

        Args:
            title: Text shown between the two rule lines.
        """
        print("\n" + "=" * 80)
        print(f" {title}")
        print("=" * 80)
        # The summary reports elapsed time relative to this moment.
        self.start_time = time.time()

    def print_step(self, message, duration_seconds=0):
        """
        Print step with optional sleep.

        Args:
            message: Step description
            duration_seconds: How long to wait (0 for no wait). May be
                fractional; whole seconds are slept one at a time with
                progress output, then the remainder is slept in one go.
        """
        self.step_number += 1
        system_hours = duration_seconds * self.time_compression_ratio

        print(f"\n[Step {self.step_number}] {message}")

        if system_hours <= 0:
            print(" → Executing immediately")
            return

        print(f" → Waiting {duration_seconds}s (simulates {system_hours:.0f} hours)")

        # range() only accepts integers, so tick through the whole seconds
        # and handle any fractional remainder separately below.
        whole_seconds = int(duration_seconds)
        for elapsed in range(1, whole_seconds + 1):
            time.sleep(1)
            # Report every fifth second and on the final second.
            if elapsed % 5 == 0 or elapsed == duration_seconds:
                simulated = elapsed * self.time_compression_ratio
                print(f" [{elapsed}/{duration_seconds}s] Simulated time: {simulated:.0f} hours")

        remainder = duration_seconds - whole_seconds
        if remainder > 0:
            time.sleep(remainder)
            print(f" [{duration_seconds}/{duration_seconds}s] Simulated time: {system_hours:.0f} hours")

    def print_success(self, message):
        """Print success message"""
        print(f" ✓ {message}")

    def print_warning(self, message):
        """Print warning message"""
        print(f" ⚠ {message}")

    def print_error(self, message):
        """Print error message"""
        print(f" ✗ {message}")

    def print_info(self, message):
        """Print info message"""
        print(f" ℹ {message}")

    def print_email_preview(self, subject, recipient, message_preview, max_lines=10):
        """Print a boxed preview of an e-mail.

        Args:
            subject: Subject line to display.
            recipient: Address shown on the ``To:`` line.
            message_preview: Message body; ``None`` is treated as empty.
            max_lines: Maximum number of body lines to show. When the body
                has more lines than this, a ``...`` marker follows the last
                shown line.
        """
        body_lines = (message_preview or "").splitlines()

        print("\n ┌─ EMAIL PREVIEW ─────────────────────────────────────")
        print(f" │ To: {recipient}")
        print(f" │ Subject: {subject}")
        # Divider between headers and body; the box is closed only once,
        # after the body.
        print(" ├─────────────────────────────────────────────────────")
        for line in body_lines[:max_lines]:
            print(f" │ {line}")
        # Compare the line count (not the newline count) so a body that is
        # exactly one line too long is still flagged as truncated.
        if len(body_lines) > max_lines:
            print(" │ ...")
        print(" └─────────────────────────────────────────────────────\n")

    def create_explanation_sla_config(
        self,
        hospital,
        response_hours=48,
        reminder_hours_before=12,
        auto_escalate_enabled=True,
        escalation_hours_overdue=0,
        max_escalation_levels=3,
        is_active=True
    ):
        """Create or update ExplanationSLAConfig for a hospital.

        The row is looked up by ``hospital``; every other argument is written
        as a field value through ``update_or_create``.

        Returns:
            The created or updated ExplanationSLAConfig instance.
        """
        from apps.complaints.models import ExplanationSLAConfig

        config, created = ExplanationSLAConfig.objects.update_or_create(
            hospital=hospital,
            defaults={
                'response_hours': response_hours,
                'reminder_hours_before': reminder_hours_before,
                'auto_escalate_enabled': auto_escalate_enabled,
                'escalation_hours_overdue': escalation_hours_overdue,
                'max_escalation_levels': max_escalation_levels,
                'is_active': is_active
            }
        )

        action = "Created" if created else "Updated"
        self.print_success(f"{action} ExplanationSLAConfig: {response_hours}h response time")

        return config

    def create_complaint_sla_config(
        self,
        hospital,
        severity='medium',
        priority='medium',
        sla_hours=72,
        reminder_hours_before=24,
        second_reminder_enabled=True,
        second_reminder_hours_before=6,
        is_active=True
    ):
        """Create or update ComplaintSLAConfig for a hospital.

        The row is looked up by ``(hospital, severity, priority)``; the
        remaining arguments are written as field values through
        ``update_or_create``.

        Returns:
            The created or updated ComplaintSLAConfig instance.
        """
        from apps.complaints.models import ComplaintSLAConfig

        config, created = ComplaintSLAConfig.objects.update_or_create(
            hospital=hospital,
            severity=severity,
            priority=priority,
            defaults={
                'sla_hours': sla_hours,
                'reminder_hours_before': reminder_hours_before,
                'second_reminder_enabled': second_reminder_enabled,
                'second_reminder_hours_before': second_reminder_hours_before,
                'is_active': is_active
            }
        )

        action = "Created" if created else "Updated"
        self.print_success(f"{action} ComplaintSLAConfig: {severity}/{priority} - {sla_hours}h SLA")

        return config

    def _get_or_create_staff(self, staff_model, label, email, id_prefix, **fields):
        """Return the staff row with ``email``, creating it when absent.

        Args:
            staff_model: The Staff model class to query.
            label: Human-readable role used in the log line.
            email: Address used to look up an existing row.
            id_prefix: Prefix for the generated ``employee_id``.
            **fields: Extra field values passed to ``create``.
        """
        import secrets

        # One query instead of exists() followed by get().
        existing = staff_model.objects.filter(email=email).first()
        if existing is not None:
            self.print_info(f"Using existing {label}: {existing.get_full_name()}")
            return existing

        member = staff_model.objects.create(
            email=email,
            # Random suffix keeps employee_id distinct between runs.
            employee_id=f"{id_prefix}-{secrets.token_hex(4).upper()}",
            status='active',
            **fields
        )
        self.print_success(f"Created {label}: {member.get_full_name()}")
        return member

    def create_staff_hierarchy(
        self,
        hospital,
        department=None
    ):
        """
        Create staff hierarchy for testing escalation.

        Existing rows (matched by their generated e-mail address) are reused;
        missing ones are created so that staff reports to manager and manager
        reports to department head.

        Args:
            hospital: Hospital the users/staff belong to; its ``name`` is used
                to build the ``.test`` e-mail domain.
            department: Optional department assigned to newly created staff.

        Returns:
            tuple: (staff, manager, department_head, admin)
        """
        # Imported up front so a missing app fails before any row is written.
        from apps.organizations.models import Staff
        from apps.accounts.models import User

        # All generated addresses share one domain derived from the hospital name.
        email_domain = f"{hospital.name.lower().replace(' ', '')}.test"

        # Get hospital admin user
        admin = User.objects.filter(
            hospital=hospital,
            groups__name='Hospital Admin'
        ).first()

        if not admin:
            from django.contrib.auth.models import Group

            admin = User.objects.create_user(
                email=f"admin@{email_domain}",
                password="test123",
                first_name="Hospital",
                last_name="Admin",
                hospital=hospital,
                phone="+966500000000"
            )
            # Add to Hospital Admin group
            admin_group, _ = Group.objects.get_or_create(name='Hospital Admin')
            admin.groups.add(admin_group)
            self.print_success(f"Created hospital admin: {admin.email}")

        department_head = self._get_or_create_staff(
            Staff,
            "department head",
            f"dept_head@{email_domain}",
            "DH",
            hospital=hospital,
            department=department,
            staff_type='admin',
            first_name="Ahmed",
            last_name="Al-Farsi",
            first_name_ar="أحمد",
            last_name_ar="الفارسي",
            job_title="Department Head",
            specialization="Administration",
            phone="+966511111111"
        )

        manager = self._get_or_create_staff(
            Staff,
            "manager",
            f"manager@{email_domain}",
            "MGR",
            hospital=hospital,
            department=department,
            report_to=department_head,
            staff_type='admin',
            first_name="Mohammed",
            last_name="Al-Rashid",
            first_name_ar="محمد",
            last_name_ar="الرشيد",
            job_title="Manager",
            specialization="Operations",
            phone="+966512222222"
        )

        staff = self._get_or_create_staff(
            Staff,
            "staff member",
            f"staff@{email_domain}",
            "STF",
            hospital=hospital,
            department=department,
            report_to=manager,
            staff_type='nurse',
            first_name="Omar",
            last_name="Al-Harbi",
            first_name_ar="عمر",
            last_name_ar="الحربي",
            job_title="Nurse",
            specialization="Patient Care",
            phone="+966513333333"
        )

        self.print_success(
            f"Staff hierarchy: "
            f"{staff.get_full_name()} → {manager.get_full_name()} → "
            f"{department_head.get_full_name()} → Admin"
        )

        return staff, manager, department_head, admin

    def verify_explanation_state(self, explanation, expected_state):
        """Verify explanation is in expected state.

        The explanation is re-read from the database by id before checking.

        Args:
            explanation: Object whose ``id`` identifies the ComplaintExplanation.
            expected_state: One of ``'submitted'``, ``'pending'``,
                ``'reminded'`` or ``'overdue'``.

        Returns:
            bool: True when the stored flags match the expected state; False
            otherwise, including for an unknown ``expected_state``.
        """
        from apps.complaints.models import ComplaintExplanation

        # Refresh from database
        explanation = ComplaintExplanation.objects.get(id=explanation.id)

        is_used = explanation.is_used
        is_overdue = explanation.is_overdue
        has_reminder = explanation.reminder_sent_at is not None
        has_escalation = explanation.escalated_to_manager is not None

        # state -> (condition, success text, failure text, flag details)
        checks = {
            'submitted': (
                is_used and not is_overdue,
                "Explanation submitted successfully ",
                "Explanation not submitted as expected ",
                f"(is_used={is_used}, is_overdue={is_overdue})",
            ),
            'pending': (
                not is_used and not is_overdue and not has_reminder,
                "Explanation pending correctly ",
                "Explanation not in pending state ",
                f"(is_used={is_used}, is_overdue={is_overdue}, reminder={has_reminder})",
            ),
            'reminded': (
                not is_used and has_reminder and not has_escalation,
                "Reminder sent correctly ",
                "Reminder not sent correctly ",
                f"(is_used={is_used}, has_reminder={has_reminder}, escalated={has_escalation})",
            ),
            'overdue': (
                not is_used and is_overdue and has_escalation,
                "Explanation overdue and escalated correctly ",
                "Explanation not overdue/escalated correctly ",
                f"(is_used={is_used}, is_overdue={is_overdue}, escalated={has_escalation})",
            ),
        }

        if expected_state not in checks:
            self.print_error(f"Unknown expected state: {expected_state}")
            return False

        matches, success_text, failure_text, details = checks[expected_state]
        if matches:
            self.print_success(success_text + details)
            return True

        self.print_error(failure_text + details)
        return False

    def print_summary(self, total_steps, successful_steps):
        """Print test summary.

        Args:
            total_steps: Number of steps that were run.
            successful_steps: Number of those steps that passed.
        """
        # start_time is only set by print_header; report 0.0s when no header
        # was printed instead of failing on ``float - None``.
        elapsed = 0.0 if self.start_time is None else time.time() - self.start_time
        failed_steps = total_steps - successful_steps

        print("\n" + "=" * 80)
        print(" TEST SUMMARY")
        print("=" * 80)
        print(f" Total Steps: {total_steps}")
        print(f" Successful: {successful_steps}")
        print(f" Failed: {failed_steps}")
        print(f" Elapsed Time: {elapsed:.1f}s")

        if successful_steps == total_steps:
            print("\n ✓✓✓ ALL TESTS PASSED ✓✓✓")
        else:
            print(f"\n ✗✗✗ {failed_steps} TEST(S) FAILED ✗✗✗")

        print("=" * 80 + "\n")
|