HH/scenario_test_base.py

373 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Base utilities for realistic scenario testing with time compression.
Provides reusable functionality for:
- Time-compressed simulation
- SLA configuration setup
- Staff hierarchy creation
- Progress logging
- Database verification
"""
import time
from datetime import timedelta
from django.utils import timezone
from django.db import transaction
class ScenarioTestBase:
"""Base class for realistic scenario testing"""
def __init__(self, time_compression_ratio=2):
"""
Initialize scenario test base.
Args:
time_compression_ratio: 1 second of real time = X hours of system time
Default: 2 (1s = 2h)
"""
self.time_compression_ratio = time_compression_ratio
self.start_time = None
self.step_number = 0
def print_header(self, title):
"""Print formatted header"""
print("\n" + "=" * 80)
print(f" {title}")
print("=" * 80)
self.start_time = time.time()
def print_step(self, message, duration_seconds=0):
"""
Print step with optional sleep.
Args:
message: Step description
duration_seconds: How long to wait (0 for no wait)
"""
self.step_number += 1
system_hours = duration_seconds * self.time_compression_ratio
print(f"\n[Step {self.step_number}] {message}")
if system_hours > 0:
print(f" → Waiting {duration_seconds}s (simulates {system_hours:.0f} hours)")
for i in range(1, duration_seconds + 1):
time.sleep(1)
if i % 5 == 0 or i == duration_seconds:
print(f" [{i}/{duration_seconds}s] Simulated time: {i * self.time_compression_ratio:.0f} hours")
else:
print(" → Executing immediately")
def print_success(self, message):
"""Print success message"""
print(f"{message}")
def print_warning(self, message):
"""Print warning message"""
print(f"{message}")
def print_error(self, message):
"""Print error message"""
print(f"{message}")
def print_info(self, message):
"""Print info message"""
print(f" {message}")
def print_email_preview(self, subject, recipient, message_preview):
"""Print email preview"""
print("\n ┌─ EMAIL PREVIEW ─────────────────────────────────────")
print(f" │ To: {recipient}")
print(f" │ Subject: {subject}")
print(f" └─────────────────────────────────────────────────────")
for line in message_preview.split('\n')[:10]: # First 10 lines
print(f"{line}")
if message_preview.count('\n') > 10:
print(" │ ...")
print(" └─────────────────────────────────────────────────────\n")
def create_explanation_sla_config(
self,
hospital,
response_hours=48,
reminder_hours_before=12,
auto_escalate_enabled=True,
escalation_hours_overdue=0,
max_escalation_levels=3,
is_active=True
):
"""Create or update ExplanationSLAConfig for a hospital"""
from apps.complaints.models import ExplanationSLAConfig
config, created = ExplanationSLAConfig.objects.update_or_create(
hospital=hospital,
defaults={
'response_hours': response_hours,
'reminder_hours_before': reminder_hours_before,
'auto_escalate_enabled': auto_escalate_enabled,
'escalation_hours_overdue': escalation_hours_overdue,
'max_escalation_levels': max_escalation_levels,
'is_active': is_active
}
)
if created:
self.print_success(f"Created ExplanationSLAConfig: {response_hours}h response time")
else:
self.print_success(f"Updated ExplanationSLAConfig: {response_hours}h response time")
return config
def create_complaint_sla_config(
self,
hospital,
severity='medium',
priority='medium',
sla_hours=72,
reminder_hours_before=24,
second_reminder_enabled=True,
second_reminder_hours_before=6,
is_active=True
):
"""Create or update ComplaintSLAConfig for a hospital"""
from apps.complaints.models import ComplaintSLAConfig
config, created = ComplaintSLAConfig.objects.update_or_create(
hospital=hospital,
severity=severity,
priority=priority,
defaults={
'sla_hours': sla_hours,
'reminder_hours_before': reminder_hours_before,
'second_reminder_enabled': second_reminder_enabled,
'second_reminder_hours_before': second_reminder_hours_before,
'is_active': is_active
}
)
if created:
self.print_success(f"Created ComplaintSLAConfig: {severity}/{priority} - {sla_hours}h SLA")
else:
self.print_success(f"Updated ComplaintSLAConfig: {severity}/{priority} - {sla_hours}h SLA")
return config
def create_staff_hierarchy(
self,
hospital,
department=None
):
"""
Create staff hierarchy for testing escalation.
Returns:
tuple: (staff, manager, department_head, admin)
"""
from apps.organizations.models import Staff
from apps.accounts.models import User
import secrets
# Get hospital admin user
admin = User.objects.filter(
hospital=hospital,
groups__name='Hospital Admin'
).first()
if not admin:
# Create admin if not exists - use User.objects.create_user
admin = User.objects.create_user(
email=f"admin@{hospital.name.lower().replace(' ', '')}.test",
password="test123",
first_name="Hospital",
last_name="Admin",
hospital=hospital,
phone="+966500000000"
)
# Add to Hospital Admin group
from django.contrib.auth.models import Group
admin_group, _ = Group.objects.get_or_create(name='Hospital Admin')
admin.groups.add(admin_group)
self.print_success(f"Created hospital admin: {admin.email}")
# Create or get department head
dept_head_email = f"dept_head@{hospital.name.lower().replace(' ', '')}.test"
dept_head_exists = Staff.objects.filter(email=dept_head_email).exists()
if dept_head_exists:
department_head = Staff.objects.get(email=dept_head_email)
self.print_info(f"Using existing department head: {department_head.get_full_name()}")
else:
# Generate unique employee_id
emp_id = f"DH-{secrets.token_hex(4).upper()}"
department_head = Staff.objects.create(
hospital=hospital,
department=department,
email=dept_head_email,
employee_id=emp_id,
staff_type='admin',
first_name="Ahmed",
last_name="Al-Farsi",
first_name_ar="أحمد",
last_name_ar="الفارسي",
job_title="Department Head",
specialization="Administration",
phone="+966511111111",
status='active'
)
self.print_success(f"Created department head: {department_head.get_full_name()}")
# Create or get manager
mgr_email = f"manager@{hospital.name.lower().replace(' ', '')}.test"
mgr_exists = Staff.objects.filter(email=mgr_email).exists()
if mgr_exists:
manager = Staff.objects.get(email=mgr_email)
self.print_info(f"Using existing manager: {manager.get_full_name()}")
else:
# Generate unique employee_id
emp_id = f"MGR-{secrets.token_hex(4).upper()}"
manager = Staff.objects.create(
hospital=hospital,
department=department,
report_to=department_head,
email=mgr_email,
employee_id=emp_id,
staff_type='admin',
first_name="Mohammed",
last_name="Al-Rashid",
first_name_ar="محمد",
last_name_ar="الرشيد",
job_title="Manager",
specialization="Operations",
phone="+966512222222",
status='active'
)
self.print_success(f"Created manager: {manager.get_full_name()}")
# Create or get staff member
staff_email = f"staff@{hospital.name.lower().replace(' ', '')}.test"
staff_exists = Staff.objects.filter(email=staff_email).exists()
if staff_exists:
staff = Staff.objects.get(email=staff_email)
self.print_info(f"Using existing staff member: {staff.get_full_name()}")
else:
# Generate unique employee_id
emp_id = f"STF-{secrets.token_hex(4).upper()}"
staff = Staff.objects.create(
hospital=hospital,
department=department,
report_to=manager,
email=staff_email,
employee_id=emp_id,
staff_type='nurse',
first_name="Omar",
last_name="Al-Harbi",
first_name_ar="عمر",
last_name_ar="الحربي",
job_title="Nurse",
specialization="Patient Care",
phone="+966513333333",
status='active'
)
self.print_success(f"Created staff member: {staff.get_full_name()}")
self.print_success(
f"Staff hierarchy: "
f"{staff.get_full_name()}{manager.get_full_name()}"
f"{department_head.get_full_name()} → Admin"
)
return staff, manager, department_head, admin
def verify_explanation_state(self, explanation, expected_state):
"""Verify explanation is in expected state"""
from apps.complaints.models import ComplaintExplanation
# Refresh from database
explanation = ComplaintExplanation.objects.get(id=explanation.id)
is_used = explanation.is_used
is_overdue = explanation.is_overdue
has_reminder = explanation.reminder_sent_at is not None
has_escalation = explanation.escalated_to_manager is not None
if expected_state == 'submitted':
if is_used and not is_overdue:
self.print_success(
f"Explanation submitted successfully "
f"(is_used={is_used}, is_overdue={is_overdue})"
)
return True
else:
self.print_error(
f"Explanation not submitted as expected "
f"(is_used={is_used}, is_overdue={is_overdue})"
)
return False
elif expected_state == 'pending':
if not is_used and not is_overdue and not has_reminder:
self.print_success(
f"Explanation pending correctly "
f"(is_used={is_used}, is_overdue={is_overdue}, reminder={has_reminder})"
)
return True
else:
self.print_error(
f"Explanation not in pending state "
f"(is_used={is_used}, is_overdue={is_overdue}, reminder={has_reminder})"
)
return False
elif expected_state == 'reminded':
if not is_used and has_reminder and not has_escalation:
self.print_success(
f"Reminder sent correctly "
f"(is_used={is_used}, has_reminder={has_reminder}, escalated={has_escalation})"
)
return True
else:
self.print_error(
f"Reminder not sent correctly "
f"(is_used={is_used}, has_reminder={has_reminder}, escalated={has_escalation})"
)
return False
elif expected_state == 'overdue':
if not is_used and is_overdue and has_escalation:
self.print_success(
f"Explanation overdue and escalated correctly "
f"(is_used={is_used}, is_overdue={is_overdue}, escalated={has_escalation})"
)
return True
else:
self.print_error(
f"Explanation not overdue/escalated correctly "
f"(is_used={is_used}, is_overdue={is_overdue}, escalated={has_escalation})"
)
return False
else:
self.print_error(f"Unknown expected state: {expected_state}")
return False
def print_summary(self, total_steps, successful_steps):
"""Print test summary"""
elapsed = time.time() - self.start_time
print("\n" + "=" * 80)
print(" TEST SUMMARY")
print("=" * 80)
print(f" Total Steps: {total_steps}")
print(f" Successful: {successful_steps}")
print(f" Failed: {total_steps - successful_steps}")
print(f" Elapsed Time: {elapsed:.1f}s")
if successful_steps == total_steps:
print("\n ✓✓✓ ALL TESTS PASSED ✓✓✓")
else:
print(f"\n ✗✗✗ {total_steps - successful_steps} TEST(S) FAILED ✗✗✗")
print("=" * 80 + "\n")