""" Base utilities for realistic scenario testing with time compression. Provides reusable functionality for: - Time-compressed simulation - SLA configuration setup - Staff hierarchy creation - Progress logging - Database verification """ import time from datetime import timedelta from django.utils import timezone from django.db import transaction class ScenarioTestBase: """Base class for realistic scenario testing""" def __init__(self, time_compression_ratio=2): """ Initialize scenario test base. Args: time_compression_ratio: 1 second of real time = X hours of system time Default: 2 (1s = 2h) """ self.time_compression_ratio = time_compression_ratio self.start_time = None self.step_number = 0 def print_header(self, title): """Print formatted header""" print("\n" + "=" * 80) print(f" {title}") print("=" * 80) self.start_time = time.time() def print_step(self, message, duration_seconds=0): """ Print step with optional sleep. Args: message: Step description duration_seconds: How long to wait (0 for no wait) """ self.step_number += 1 system_hours = duration_seconds * self.time_compression_ratio print(f"\n[Step {self.step_number}] {message}") if system_hours > 0: print(f" → Waiting {duration_seconds}s (simulates {system_hours:.0f} hours)") for i in range(1, duration_seconds + 1): time.sleep(1) if i % 5 == 0 or i == duration_seconds: print(f" [{i}/{duration_seconds}s] Simulated time: {i * self.time_compression_ratio:.0f} hours") else: print(" → Executing immediately") def print_success(self, message): """Print success message""" print(f" ✓ {message}") def print_warning(self, message): """Print warning message""" print(f" ⚠ {message}") def print_error(self, message): """Print error message""" print(f" ✗ {message}") def print_info(self, message): """Print info message""" print(f" ℹ {message}") def print_email_preview(self, subject, recipient, message_preview): """Print email preview""" print("\n ┌─ EMAIL PREVIEW ─────────────────────────────────────") print(f" │ To: {recipient}") print(f" │ Subject: {subject}") print(f" └─────────────────────────────────────────────────────") for line in message_preview.split('\n')[:10]: # First 10 lines print(f" │ {line}") if message_preview.count('\n') > 10: print(" │ ...") print(" └─────────────────────────────────────────────────────\n") def create_explanation_sla_config( self, hospital, response_hours=48, reminder_hours_before=12, auto_escalate_enabled=True, escalation_hours_overdue=0, max_escalation_levels=3, is_active=True ): """Create or update ExplanationSLAConfig for a hospital""" from apps.complaints.models import ExplanationSLAConfig config, created = ExplanationSLAConfig.objects.update_or_create( hospital=hospital, defaults={ 'response_hours': response_hours, 'reminder_hours_before': reminder_hours_before, 'auto_escalate_enabled': auto_escalate_enabled, 'escalation_hours_overdue': escalation_hours_overdue, 'max_escalation_levels': max_escalation_levels, 'is_active': is_active } ) if created: self.print_success(f"Created ExplanationSLAConfig: {response_hours}h response time") else: self.print_success(f"Updated ExplanationSLAConfig: {response_hours}h response time") return config def create_complaint_sla_config( self, hospital, severity='medium', priority='medium', sla_hours=72, reminder_hours_before=24, second_reminder_enabled=True, second_reminder_hours_before=6, is_active=True ): """Create or update ComplaintSLAConfig for a hospital""" from apps.complaints.models import ComplaintSLAConfig config, created = ComplaintSLAConfig.objects.update_or_create( hospital=hospital, severity=severity, priority=priority, defaults={ 'sla_hours': sla_hours, 'reminder_hours_before': reminder_hours_before, 'second_reminder_enabled': second_reminder_enabled, 'second_reminder_hours_before': second_reminder_hours_before, 'is_active': is_active } ) if created: self.print_success(f"Created ComplaintSLAConfig: {severity}/{priority} - {sla_hours}h SLA") else: self.print_success(f"Updated ComplaintSLAConfig: {severity}/{priority} - {sla_hours}h SLA") return config def create_staff_hierarchy( self, hospital, department=None ): """ Create staff hierarchy for testing escalation. Returns: tuple: (staff, manager, department_head, admin) """ from apps.organizations.models import Staff from apps.accounts.models import User import secrets # Get hospital admin user admin = User.objects.filter( hospital=hospital, groups__name='Hospital Admin' ).first() if not admin: # Create admin if not exists - use User.objects.create_user admin = User.objects.create_user( email=f"admin@{hospital.name.lower().replace(' ', '')}.test", password="test123", first_name="Hospital", last_name="Admin", hospital=hospital, phone="+966500000000" ) # Add to Hospital Admin group from django.contrib.auth.models import Group admin_group, _ = Group.objects.get_or_create(name='Hospital Admin') admin.groups.add(admin_group) self.print_success(f"Created hospital admin: {admin.email}") # Create or get department head dept_head_email = f"dept_head@{hospital.name.lower().replace(' ', '')}.test" dept_head_exists = Staff.objects.filter(email=dept_head_email).exists() if dept_head_exists: department_head = Staff.objects.get(email=dept_head_email) self.print_info(f"Using existing department head: {department_head.get_full_name()}") else: # Generate unique employee_id emp_id = f"DH-{secrets.token_hex(4).upper()}" department_head = Staff.objects.create( hospital=hospital, department=department, email=dept_head_email, employee_id=emp_id, staff_type='admin', first_name="Ahmed", last_name="Al-Farsi", first_name_ar="أحمد", last_name_ar="الفارسي", job_title="Department Head", specialization="Administration", phone="+966511111111", status='active' ) self.print_success(f"Created department head: {department_head.get_full_name()}") # Create or get manager mgr_email = f"manager@{hospital.name.lower().replace(' ', '')}.test" mgr_exists = Staff.objects.filter(email=mgr_email).exists() if mgr_exists: manager = Staff.objects.get(email=mgr_email) self.print_info(f"Using existing manager: {manager.get_full_name()}") else: # Generate unique employee_id emp_id = f"MGR-{secrets.token_hex(4).upper()}" manager = Staff.objects.create( hospital=hospital, department=department, report_to=department_head, email=mgr_email, employee_id=emp_id, staff_type='admin', first_name="Mohammed", last_name="Al-Rashid", first_name_ar="محمد", last_name_ar="الرشيد", job_title="Manager", specialization="Operations", phone="+966512222222", status='active' ) self.print_success(f"Created manager: {manager.get_full_name()}") # Create or get staff member staff_email = f"staff@{hospital.name.lower().replace(' ', '')}.test" staff_exists = Staff.objects.filter(email=staff_email).exists() if staff_exists: staff = Staff.objects.get(email=staff_email) self.print_info(f"Using existing staff member: {staff.get_full_name()}") else: # Generate unique employee_id emp_id = f"STF-{secrets.token_hex(4).upper()}" staff = Staff.objects.create( hospital=hospital, department=department, report_to=manager, email=staff_email, employee_id=emp_id, staff_type='nurse', first_name="Omar", last_name="Al-Harbi", first_name_ar="عمر", last_name_ar="الحربي", job_title="Nurse", specialization="Patient Care", phone="+966513333333", status='active' ) self.print_success(f"Created staff member: {staff.get_full_name()}") self.print_success( f"Staff hierarchy: " f"{staff.get_full_name()} → {manager.get_full_name()} → " f"{department_head.get_full_name()} → Admin" ) return staff, manager, department_head, admin def verify_explanation_state(self, explanation, expected_state): """Verify explanation is in expected state""" from apps.complaints.models import ComplaintExplanation # Refresh from database explanation = ComplaintExplanation.objects.get(id=explanation.id) is_used = explanation.is_used is_overdue = explanation.is_overdue has_reminder = explanation.reminder_sent_at is not None has_escalation = explanation.escalated_to_manager is not None if expected_state == 'submitted': if is_used and not is_overdue: self.print_success( f"Explanation submitted successfully " f"(is_used={is_used}, is_overdue={is_overdue})" ) return True else: self.print_error( f"Explanation not submitted as expected " f"(is_used={is_used}, is_overdue={is_overdue})" ) return False elif expected_state == 'pending': if not is_used and not is_overdue and not has_reminder: self.print_success( f"Explanation pending correctly " f"(is_used={is_used}, is_overdue={is_overdue}, reminder={has_reminder})" ) return True else: self.print_error( f"Explanation not in pending state " f"(is_used={is_used}, is_overdue={is_overdue}, reminder={has_reminder})" ) return False elif expected_state == 'reminded': if not is_used and has_reminder and not has_escalation: self.print_success( f"Reminder sent correctly " f"(is_used={is_used}, has_reminder={has_reminder}, escalated={has_escalation})" ) return True else: self.print_error( f"Reminder not sent correctly " f"(is_used={is_used}, has_reminder={has_reminder}, escalated={has_escalation})" ) return False elif expected_state == 'overdue': if not is_used and is_overdue and has_escalation: self.print_success( f"Explanation overdue and escalated correctly " f"(is_used={is_used}, is_overdue={is_overdue}, escalated={has_escalation})" ) return True else: self.print_error( f"Explanation not overdue/escalated correctly " f"(is_used={is_used}, is_overdue={is_overdue}, escalated={has_escalation})" ) return False else: self.print_error(f"Unknown expected state: {expected_state}") return False def print_summary(self, total_steps, successful_steps): """Print test summary""" elapsed = time.time() - self.start_time print("\n" + "=" * 80) print(" TEST SUMMARY") print("=" * 80) print(f" Total Steps: {total_steps}") print(f" Successful: {successful_steps}") print(f" Failed: {total_steps - successful_steps}") print(f" Elapsed Time: {elapsed:.1f}s") if successful_steps == total_steps: print("\n ✓✓✓ ALL TESTS PASSED ✓✓✓") else: print(f"\n ✗✗✗ {total_steps - successful_steps} TEST(S) FAILED ✗✗✗") print("=" * 80 + "\n")