# Appointments App - Advanced Features Implementation Plan

## Phases 10-15: Smart Scheduling, Queue Intelligence & Integration

**Project**: Hospital Management System v4
**App**: Appointments
**Date**: 2025-01-10
**Status**: Planning Phase
**Based on**: a_q_w.md recommendations

---

## Executive Summary

This plan outlines the implementation of advanced features for the appointments app, building upon the foundation established in Phases 1-7. These enhancements will transform the system from a basic appointment management tool into an intelligent, automated healthcare scheduling platform.

### Strategic Objectives

1. **Reduce No-Shows by 30%** through smart scheduling and predictive analytics
2. **Decrease Wait Times by 40%** via intelligent queue management
3. **Increase Slot Utilization by 25%** through automated waitlist fulfillment
4. **Improve Patient Satisfaction by 50%** with multi-channel notifications
5. **Enable Seamless Integration** with EHR and insurance systems

### Current State vs. Target State

| Feature | Current State | Target State |
|---------|---------------|--------------|
| Scheduling | Manual, basic availability | AI-powered, multi-factor optimization |
| Queue Management | Basic FIFO with priority | Real-time, dynamic positioning |
| Waitlist | Manual contact, basic tracking | Automated fulfillment, intelligent matching |
| Notifications | Email only, basic templates | Multi-channel (SMS/Email/Push/Voice) |
| Integration | None | EHR, Insurance, HL7/FHIR ready |
| Patient Portal | Admin-only scheduling | Self-service with real-time availability |

---

## Phase 10: Smart Scheduling Engine

**Priority**: HIGH
**Estimated Time**: 15-20 hours
**Dependencies**: Phases 1-7 completed

### Overview

Implement an intelligent scheduling system that considers multiple factors to optimize appointment placement, reduce no-shows, and improve resource utilization.
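The multi-factor idea can be sketched without Django as a weighted sum of per-factor scores, each on a 0-100 scale. This is a minimal illustration, not the planned implementation: the weights mirror the defaults given for `SmartScheduler` in section 10.2, while the helper name `weighted_slot_score` and the sample factor values are hypothetical.

```python
from typing import Dict, Mapping

# Factor weights, mirroring the SmartScheduler defaults in section 10.2.
DEFAULT_WEIGHTS: Dict[str, float] = {
    "provider_availability": 0.30,
    "patient_priority": 0.25,
    "no_show_risk": 0.20,
    "geographic_proximity": 0.15,
    "patient_preference": 0.10,
}


def weighted_slot_score(
    factor_scores: Mapping[str, float],
    weights: Mapping[str, float] = DEFAULT_WEIGHTS,
) -> float:
    """Combine per-factor scores (each 0-100) into a single 0-100 total.

    Hypothetical helper for illustration; not part of the appointments app.
    """
    missing = set(weights) - set(factor_scores)
    if missing:
        raise ValueError(f"missing factor scores: {sorted(missing)}")
    total = sum(factor_scores[name] * weight for name, weight in weights.items())
    return round(total, 2)


# Made-up factor scores for two candidate slots.
morning_slot = {
    "provider_availability": 100.0,
    "patient_priority": 50.0,
    "no_show_risk": 75.0,
    "geographic_proximity": 50.0,
    "patient_preference": 90.0,
}
rush_hour_slot = {
    "provider_availability": 75.0,
    "patient_priority": 50.0,
    "no_show_risk": 75.0,
    "geographic_proximity": 100.0,
    "patient_preference": 50.0,
}

print(weighted_slot_score(morning_slot))    # 74.0
print(weighted_slot_score(rush_hour_slot))  # 70.0
```

Because the weights sum to 1.0, the total stays on the same 0-100 scale as the individual factors, which keeps scores comparable if the weights are later tuned per tenant.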
### 10.1 Database Schema Enhancements

#### New Models

```python
# appointments/models.py additions

class SchedulingPreference(models.Model):
    """Store patient scheduling preferences and patterns"""
    tenant = models.ForeignKey('core.Tenant', on_delete=models.CASCADE)
    patient = models.OneToOneField('patients.PatientProfile', on_delete=models.CASCADE)

    # Preference data
    preferred_days = models.JSONField(default=list)  # ['Monday', 'Wednesday']
    preferred_times = models.JSONField(default=list)  # ['morning', 'afternoon']
    preferred_providers = models.ManyToManyField('accounts.User', blank=True)

    # Behavioral data (average_no_show_rate is a percentage, 0-100)
    average_no_show_rate = models.DecimalField(max_digits=5, decimal_places=2, default=0)
    average_late_arrival_minutes = models.IntegerField(default=0)
    total_appointments = models.IntegerField(default=0)
    completed_appointments = models.IntegerField(default=0)

    # Geographic data
    home_address = models.TextField(blank=True)
    travel_time_to_clinic = models.DurationField(null=True, blank=True)

    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)


class AppointmentPriorityRule(models.Model):
    """Define rules for appointment prioritization"""
    tenant = models.ForeignKey('core.Tenant', on_delete=models.CASCADE)
    name = models.CharField(max_length=100)
    description = models.TextField()

    # Rule conditions
    appointment_types = models.ManyToManyField('AppointmentTemplate', blank=True)
    specialties = models.JSONField(default=list)
    diagnosis_codes = models.JSONField(default=list)  # ICD-10 codes

    # Priority scoring
    base_priority_score = models.IntegerField(default=0)
    urgency_multiplier = models.DecimalField(max_digits=3, decimal_places=2, default=1.0)

    # Time constraints
    max_wait_days = models.IntegerField(null=True, blank=True)
    requires_same_day = models.BooleanField(default=False)

    is_active = models.BooleanField(default=True)
    created_at = models.DateTimeField(auto_now_add=True)


class SchedulingMetrics(models.Model):
    """Track scheduling performance metrics"""
    tenant = models.ForeignKey('core.Tenant', on_delete=models.CASCADE)
    provider = models.ForeignKey('accounts.User', on_delete=models.CASCADE)
    date = models.DateField()

    # Utilization metrics
    total_slots = models.IntegerField(default=0)
    booked_slots = models.IntegerField(default=0)
    completed_appointments = models.IntegerField(default=0)
    no_shows = models.IntegerField(default=0)
    cancellations = models.IntegerField(default=0)

    # Time metrics
    average_appointment_duration = models.DurationField(null=True)
    average_wait_time = models.DurationField(null=True)
    total_overtime_minutes = models.IntegerField(default=0)

    # Calculated fields
    utilization_rate = models.DecimalField(max_digits=5, decimal_places=2, default=0)
    no_show_rate = models.DecimalField(max_digits=5, decimal_places=2, default=0)

    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        unique_together = ['tenant', 'provider', 'date']
```

### 10.2 Smart Scheduler Implementation

#### Core Scheduling Logic

```python
# appointments/scheduling/smart_scheduler.py

from datetime import datetime, timedelta
from typing import List, Dict, Optional
from django.db.models import Q, Avg, Count
from django.utils import timezone


class SmartScheduler:
    """
    Intelligent appointment scheduling engine with multi-factor optimization.
    """

    def __init__(self, tenant):
        self.tenant = tenant
        # Factor weights; they sum to 1.0 so the total stays on a 0-100 scale
        self.weights = {
            'provider_availability': 0.30,
            'patient_priority': 0.25,
            'no_show_risk': 0.20,
            'geographic_proximity': 0.15,
            'patient_preference': 0.10
        }

    def find_optimal_slots(
        self,
        patient,
        provider,
        appointment_type,
        preferred_dates: List[datetime],
        duration_minutes: int = 30,
        max_results: int = 10
    ) -> List[Dict]:
        """
        Find optimal appointment slots using multi-factor analysis.

        Returns:
            List of slot dictionaries with scores and metadata
        """
        # Step 1: Get base available slots
        base_slots = self._get_available_slots(
            provider, preferred_dates, duration_minutes
        )

        if not base_slots:
            return []

        # Step 2: Score each slot
        scored_slots = []
        for slot in base_slots:
            score = self._calculate_slot_score(
                slot, patient, provider, appointment_type
            )
            scored_slots.append({
                'datetime': slot['start'],
                'end_datetime': slot['end'],
                'provider': provider,
                'score': score,
                'factors': score['breakdown']
            })

        # Step 3: Sort by score and return top results
        scored_slots.sort(key=lambda x: x['score']['total'], reverse=True)
        return scored_slots[:max_results]

    def _get_available_slots(
        self,
        provider,
        date_range: List[datetime],
        duration_minutes: int
    ) -> List[Dict]:
        """Get all available time slots for provider."""
        from appointments.models import SlotAvailability, AppointmentRequest

        slots = []
        duration = timedelta(minutes=duration_minutes)

        for date in date_range:
            # Get provider's availability for this date
            availabilities = SlotAvailability.objects.filter(
                tenant=self.tenant,
                provider=provider,
                date=date.date(),
                is_active=True,
                is_blocked=False
            )

            for availability in availabilities:
                # Calculate time slots within availability window
                current_time = datetime.combine(
                    availability.date, availability.start_time
                )
                end_time = datetime.combine(
                    availability.date, availability.end_time
                )

                while current_time + duration <= end_time:
                    # Check if slot is already booked
                    if not self._is_slot_booked(provider, current_time, duration):
                        slots.append({
                            'start': current_time,
                            'end': current_time + duration,
                            'availability_id': availability.id
                        })
                    current_time += timedelta(minutes=availability.duration_minutes or 30)

        return slots

    def _is_slot_booked(
        self,
        provider,
        start_time: datetime,
        duration: timedelta
    ) -> bool:
        """Check if a time slot is already booked."""
        from appointments.models import AppointmentRequest

        end_time = start_time + duration

        overlapping = AppointmentRequest.objects.filter(
            tenant=self.tenant,
            provider=provider,
            scheduled_datetime__lt=end_time,
            scheduled_end_datetime__gt=start_time,
            status__in=['PENDING', 'CONFIRMED', 'CHECKED_IN', 'IN_PROGRESS']
        ).exists()

        return overlapping

    def _calculate_slot_score(
        self,
        slot: Dict,
        patient,
        provider,
        appointment_type
    ) -> Dict:
        """Calculate multi-factor score for a slot."""
        scores = {}

        # Factor 1: Provider availability (buffer time consideration)
        scores['provider_availability'] = self._score_provider_availability(
            provider, slot['start']
        )

        # Factor 2: Patient priority
        scores['patient_priority'] = self._score_patient_priority(
            patient, appointment_type
        )

        # Factor 3: No-show risk mitigation
        scores['no_show_risk'] = self._score_no_show_risk(
            patient, slot['start']
        )

        # Factor 4: Geographic proximity
        scores['geographic_proximity'] = self._score_geographic_proximity(
            patient, slot['start']
        )

        # Factor 5: Patient preference alignment
        scores['patient_preference'] = self._score_patient_preference(
            patient, provider, slot['start']
        )

        # Calculate weighted total
        total_score = sum(
            scores[factor] * self.weights[factor]
            for factor in scores
        )

        return {
            'total': round(total_score, 2),
            'breakdown': scores
        }

    def _score_provider_availability(
        self,
        provider,
        slot_time: datetime
    ) -> float:
        """Score based on provider's schedule density."""
        from appointments.models import AppointmentRequest

        # Check appointments before and after this slot
        buffer_time = timedelta(minutes=15)

        appointments_nearby = AppointmentRequest.objects.filter(
            tenant=self.tenant,
            provider=provider,
            scheduled_datetime__gte=slot_time - buffer_time,
            scheduled_datetime__lte=slot_time + buffer_time,
            status__in=['CONFIRMED', 'CHECKED_IN', 'IN_PROGRESS']
        ).count()

        # Prefer slots with some buffer time
        if appointments_nearby == 0:
            return 100.0  # Ideal - plenty of buffer
        elif appointments_nearby == 1:
            return 75.0  # Good - some buffer
        else:
            return 50.0  # Acceptable - tight schedule

    def _score_patient_priority(
        self,
        patient,
        appointment_type
    ) -> float:
        """Score based on clinical priority."""
        from appointments.models import AppointmentPriorityRule

        # Check if there is a priority rule for this appointment type
        rule = AppointmentPriorityRule.objects.filter(
            tenant=self.tenant,
            is_active=True,
            appointment_types=appointment_type
        ).first()

        if rule:
            return min(float(rule.base_priority_score), 100.0)

        return 50.0  # Default priority

    def _score_no_show_risk(
        self,
        patient,
        slot_time: datetime
    ) -> float:
        """Score based on predicted no-show probability."""
        from appointments.models import SchedulingPreference

        try:
            prefs = SchedulingPreference.objects.get(
                tenant=self.tenant,
                patient=patient
            )

            # average_no_show_rate is stored as a percentage (0-100),
            # so a higher rate maps directly to a lower score
            no_show_rate = float(prefs.average_no_show_rate)
            return max(0.0, 100.0 - no_show_rate)

        except SchedulingPreference.DoesNotExist:
            return 75.0  # Neutral score for new patients

    def _score_geographic_proximity(
        self,
        patient,
        slot_time: datetime
    ) -> float:
        """Score based on travel time and time of day."""
        from appointments.models import SchedulingPreference

        try:
            prefs = SchedulingPreference.objects.get(
                tenant=self.tenant,
                patient=patient
            )

            if not prefs.travel_time_to_clinic:
                return 50.0

            # Prefer slots that allow comfortable travel time
            hour = slot_time.hour
            travel_minutes = prefs.travel_time_to_clinic.total_seconds() / 60

            # Morning rush hour (7-9 AM)
            if 7 <= hour <= 9:
                penalty = travel_minutes * 0.5
            # Evening rush hour (4-6 PM)
            elif 16 <= hour <= 18:
                penalty = travel_minutes * 0.5
            else:
                penalty = 0

            return max(0, 100 - penalty)

        except SchedulingPreference.DoesNotExist:
            return 50.0

    def _score_patient_preference(
        self,
        patient,
        provider,
        slot_time: datetime
    ) -> float:
        """Score based on patient's historical preferences."""
        from appointments.models import SchedulingPreference

        try:
            prefs = SchedulingPreference.objects.get(
                tenant=self.tenant,
                patient=patient
            )

            score = 50.0  # Base score

            # Check day preference
            day_name = slot_time.strftime('%A')
            if day_name in prefs.preferred_days:
                score += 25.0

            # Check time preference
            hour = slot_time.hour
            if hour < 12 and 'morning' in prefs.preferred_times:
                score += 15.0
            elif 12 <= hour < 17 and 'afternoon' in prefs.preferred_times:
                score += 15.0
            elif hour >= 17 and 'evening' in prefs.preferred_times:
                score += 15.0

            # Check provider preference (single EXISTS query)
            if prefs.preferred_providers.filter(pk=provider.pk).exists():
                score += 10.0

            return min(score, 100.0)

        except SchedulingPreference.DoesNotExist:
            return 50.0

    def apply_priority_routing(
        self,
        slots: List[Dict],
        patient,
        appointment_type
    ) -> List[Dict]:
        """Route high-priority patients to earliest available slots."""
        from appointments.models import AppointmentPriorityRule

        # Check if this is a high-priority appointment
        rule = AppointmentPriorityRule.objects.filter(
            tenant=self.tenant,
            is_active=True,
            appointment_types=appointment_type,
            base_priority_score__gte=80  # High priority threshold
        ).first()

        if rule and rule.requires_same_day:
            # Filter to same-day slots only
            today = timezone.now().date()
            slots = [s for s in slots if s['datetime'].date() == today]

        if rule:
            # Sort by earliest time for high-priority
            slots.sort(key=lambda x: x['datetime'])

        return slots

    def predict_no_show_probability(self, patient, slot_time: datetime) -> float:
        """
        Predict probability of no-show using historical data.

        Returns:
            Float between 0 and 1 representing no-show probability
        """
        from appointments.models import SchedulingPreference

        try:
            prefs = SchedulingPreference.objects.get(
                tenant=self.tenant,
                patient=patient
            )

            # Base probability from historical rate (stored as a percentage)
            base_prob = float(prefs.average_no_show_rate) / 100

            # Adjust based on time factors
            hour = slot_time.hour
            day_of_week = slot_time.weekday()

            # Early morning appointments have higher no-show rates
            if hour < 8:
                base_prob *= 1.3

            # Monday appointments have higher no-show rates
            if day_of_week == 0:
                base_prob *= 1.2

            # Friday afternoon appointments have higher no-show rates
            if day_of_week == 4 and hour >= 14:
                base_prob *= 1.15

            return min(base_prob, 1.0)

        except SchedulingPreference.DoesNotExist:
            return 0.15  # Default 15% no-show rate for new patients
```

### 10.3 Scheduling Utilities

```python
# appointments/scheduling/utils.py

from datetime import datetime, timedelta
from typing import List, Dict
from django.db.models import Avg, Count, Q
from django.utils import timezone


class SchedulingAnalytics:
    """Analytics and reporting for scheduling performance."""

    @staticmethod
    def calculate_provider_utilization(provider, date_range_start, date_range_end):
        """Calculate provider utilization rate."""
        from appointments.models import SlotAvailability, AppointmentRequest

        # Total available slots
        total_slots = SlotAvailability.objects.filter(
            provider=provider,
            date__gte=date_range_start,
            date__lte=date_range_end,
            is_active=True
        ).aggregate(
            total=Count('id')
        )['total'] or 0

        # Booked appointments
        booked = AppointmentRequest.objects.filter(
            provider=provider,
            scheduled_datetime__date__gte=date_range_start,
            scheduled_datetime__date__lte=date_range_end,
            status__in=['CONFIRMED', 'COMPLETED', 'IN_PROGRESS']
        ).count()

        if total_slots == 0:
            return 0.0

        return round((booked / total_slots) * 100, 2)

    @staticmethod
    def calculate_no_show_rate(provider, date_range_start, date_range_end):
        """Calculate no-show rate for provider."""
        from appointments.models import AppointmentRequest

        total = AppointmentRequest.objects.filter(
            provider=provider,
            scheduled_datetime__date__gte=date_range_start,
            scheduled_datetime__date__lte=date_range_end
        ).count()

        no_shows = AppointmentRequest.objects.filter(
            provider=provider,
            scheduled_datetime__date__gte=date_range_start,
            scheduled_datetime__date__lte=date_range_end,
            status='NO_SHOW'
        ).count()

        if total == 0:
            return 0.0

        return round((no_shows / total) * 100, 2)

    @staticmethod
    def update_patient_scheduling_preferences(patient):
        """Update patient preferences based on historical data."""
        from appointments.models import AppointmentRequest, SchedulingPreference

        # Get patient's appointment history
        appointments = AppointmentRequest.objects.filter(
            patient=patient
        ).order_by('-scheduled_datetime')

        if not appointments.exists():
            return None

        # Calculate metrics
        total = appointments.count()
        completed = appointments.filter(status='COMPLETED').count()
        no_shows = appointments.filter(status='NO_SHOW').count()

        # Extract patterns from the 20 most recent completed appointments
        recent_completed = list(appointments.filter(status='COMPLETED')[:20])

        preferred_days = list({
            appt.scheduled_datetime.strftime('%A')
            for appt in recent_completed
        })

        preferred_times = set()
        for appt in recent_completed:
            hour = appt.scheduled_datetime.hour
            if hour < 12:
                preferred_times.add('morning')
            elif hour < 17:
                preferred_times.add('afternoon')
            else:
                preferred_times.add('evening')
        preferred_times = list(preferred_times)

        # Update or create preferences (no-show rate stored as a percentage)
        prefs, created = SchedulingPreference.objects.update_or_create(
            tenant=patient.tenant,
            patient=patient,
            defaults={
                'preferred_days': preferred_days,
                'preferred_times': preferred_times,
                'total_appointments': total,
                'completed_appointments': completed,
                'average_no_show_rate': round((no_shows / total) * 100, 2) if total > 0 else 0
            }
        )

        return prefs


class ConflictDetector:
    """Detect and resolve scheduling conflicts."""

    @staticmethod
    def check_conflicts(provider, start_time, end_time, exclude_appointment_id=None):
        """Check for scheduling conflicts."""
        from appointments.models import AppointmentRequest

        # Two intervals overlap when each starts before the other ends
        conflicts = AppointmentRequest.objects.filter(
            provider=provider,
            scheduled_datetime__lt=end_time,
            scheduled_end_datetime__gt=start_time,
            status__in=['PENDING', 'CONFIRMED', 'CHECKED_IN', 'IN_PROGRESS']
        )

        if exclude_appointment_id:
            conflicts = conflicts.exclude(id=exclude_appointment_id)

        return list(conflicts)

    @staticmethod
    def suggest_alternative_slots(provider, original_time, duration_minutes=30, count=5):
        """Suggest alternative slots when conflict exists."""
        from appointments.scheduling.smart_scheduler import SmartScheduler

        scheduler = SmartScheduler(provider.tenant)

        # Generate date range (next 7 days)
        date_range = [
            original_time + timedelta(days=i)
            for i in range(7)
        ]

        # Find available slots
        slots = scheduler._get_available_slots(
            provider, date_range, duration_minutes
        )

        # Return closest alternatives
        slots.sort(key=lambda x: abs((x['start'] - original_time).total_seconds()))
        return slots[:count]
```

### 10.4 API Endpoints

```python
# appointments/api/scheduling_views.py

from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from django.utils import timezone
from datetime import datetime, timedelta

from appointments.scheduling.smart_scheduler import SmartScheduler
from appointments.scheduling.utils import SchedulingAnalytics, ConflictDetector


class SmartSchedulingViewSet(viewsets.ViewSet):
    """API endpoints for smart scheduling features."""

    @action(detail=False, methods=['post'])
    def find_optimal_slots(self, request):
        """
        Find optimal appointment slots using AI.

        POST /api/appointments/scheduling/find-optimal-slots/
        {
            "patient_id": 123,
            "provider_id": 456,
            "appointment_type_id": 789,
            "preferred_dates": ["2025-01-15", "2025-01-16"],
            "duration_minutes": 30
        }
        """
        # Extract parameters
        patient_id = request.data.get('patient_id')
        provider_id = request.data.get('provider_id')
        appointment_type_id = request.data.get('appointment_type_id')
        preferred_dates = request.data.get('preferred_dates', [])
        duration_minutes = request.data.get('duration_minutes', 30)

        # Validate inputs
        if not all([patient_id, provider_id, appointment_type_id]):
            return Response(
                {'error': 'Missing required parameters'},
                status=status.HTTP_400_BAD_REQUEST
            )

        # Get objects
        from django.core.exceptions import ObjectDoesNotExist
        from patients.models import PatientProfile
        from accounts.models import User
        from appointments.models import AppointmentTemplate

        try:
            patient = PatientProfile.objects.get(id=patient_id)
            provider = User.objects.get(id=provider_id)
            appointment_type = AppointmentTemplate.objects.get(id=appointment_type_id)
        except ObjectDoesNotExist:
            # Only a missing record is a 404; other errors should surface
            return Response(
                {'error': 'Patient, provider, or appointment type not found'},
                status=status.HTTP_404_NOT_FOUND
            )

        # Convert date strings to datetime objects
        try:
            date_objects = [
                datetime.strptime(date_str, '%Y-%m-%d')
                for date_str in preferred_dates
            ]
        except (TypeError, ValueError):
            return Response(
                {'error': 'preferred_dates must be a list of YYYY-MM-DD strings'},
                status=status.HTTP_400_BAD_REQUEST
            )

        # Find optimal slots
        scheduler = SmartScheduler(request.user.tenant)
        slots = scheduler.find_optimal_slots(
            patient=patient,
            provider=provider,
            appointment_type=appointment_type,
            preferred_dates=date_objects,
            duration_minutes=duration_minutes
        )

        # Format response
        return Response({
            'slots': [
                {
                    'datetime': slot['datetime'].isoformat(),
                    'end_datetime': slot['end_datetime'].isoformat(),
                    'provider_id': slot['provider'].id,
                    'provider_name': slot['provider'].get_full_name(),
                    'score': slot['score']['total'],
                    'score_breakdown': slot['score']['breakdown']
                }
                for slot in slots
            ],
            'count': len(slots)
        })

    @action(detail=False, methods=['post'])
    def check_conflicts(self, request):
        """
        Check for scheduling conflicts.

        POST /api/appointments/scheduling/check-conflicts/
        {
            "provider_id": 456,
            "start_time": "2025-01-15T10:00:00",
            "end_time": "2025-01-15T10:30:00",
            "exclude_appointment_id": 123
        }
        """
        provider_id = request.data.get('provider_id')
        start_time = request.data.get('start_time')
        end_time = request.data.get('end_time')
        exclude_id = request.data.get('exclude_appointment_id')

        # Get provider
        from accounts.models import User

        try:
            provider = User.objects.get(id=provider_id)
        except User.DoesNotExist:
            return Response(
                {'error': 'Provider not found'},
                status=status.HTTP_404_NOT_FOUND
            )

        # Parse times (missing or malformed values are a client error)
        try:
            start_dt = datetime.fromisoformat(start_time)
            end_dt = datetime.fromisoformat(end_time)
        except (TypeError, ValueError):
            return Response(
                {'error': 'start_time and end_time must be ISO 8601 datetimes'},
                status=status.HTTP_400_BAD_REQUEST
            )

        # Check conflicts
        conflicts = ConflictDetector.check_conflicts(
            provider, start_dt, end_dt, exclude_id
        )

        if conflicts:
            # Suggest alternatives
            alternatives = ConflictDetector.suggest_alternative_slots(
                provider,
                start_dt,
                duration_minutes=int((end_dt - start_dt).total_seconds() / 60)
            )

            return Response({
                'has_conflict': True,
                'conflicts': [
                    {
                        'id': c.id,
                        'patient': c.patient.get_full_name(),
                        'start': c.scheduled_datetime.isoformat(),
                        'end': c.scheduled_end_datetime.isoformat()
                    }
                    for c in conflicts
                ],
                'alternative_slots': [
                    {
                        'start': alt['start'].isoformat(),
                        'end': alt['end'].isoformat()
                    }
                    for alt in alternatives
                ]
            })

        return Response({'has_conflict': False})

    @action(detail=False, methods=['get'])
    def provider_analytics(self, request):
        """
        Get scheduling analytics for a provider.

        GET /api/appointments/scheduling/provider-analytics/?provider_id=456&days=30
        """
        provider_id = request.query_params.get('provider_id')
        days = int(request.query_params.get('days', 30))

        from accounts.models import User

        try:
            provider = User.objects.get(id=provider_id)
        except User.DoesNotExist:
            return Response(
                {'error': 'Provider not found'},
                status=status.HTTP_404_NOT_FOUND
            )

        # Calculate date range
        end_date = timezone.now().date()
        start_date = end_date - timedelta(days=days)

        # Get analytics
        utilization = SchedulingAnalytics.calculate_provider_utilization(
            provider, start_date, end_date
        )
        no_show_rate = SchedulingAnalytics.calculate_no_show_rate(
            provider, start_date, end_date
        )

        return Response({
            'provider_id': provider.id,
            'provider_name': provider.get_full_name(),
            'period': {
                'start': start_date.isoformat(),
                'end': end_date.isoformat(),
                'days': days
            },
            'metrics': {
                'utilization_rate': utilization,
                'no_show_rate': no_show_rate
            }
        })
```

### 10.5 Implementation Checklist

- [x] **Step 1: Database Changes (models.py)** ✅ COMPLETE
  - [x] Create SchedulingPreference model
  - [x] Create AppointmentPriorityRule model
  - [x] Create SchedulingMetrics model
  - [x] Run migrations
  - [x] Add indexes for performance
- [x] **Step 2: Core Logic (scheduling/)** ✅ COMPLETE
  - [x] Implement SmartScheduler class
  - [x] Implement multi-factor scoring algorithm
  - [x] Implement priority routing logic
  - [x] Implement no-show prediction
  - [x] Implement geographic optimization
- [x] **Step 3: Utilities (utils.py)** ✅ COMPLETE
  - [x] Implement SchedulingAnalytics class
  - [x] Implement ConflictDetector class
  - [x] Create preference update utilities
  - [x] Existing utils already contain required functionality
- [x] **Step 4: API Endpoints (api/)** ✅ COMPLETE
  - [x] Create SmartSchedulingViewSet
  - [x] Add find_optimal_slots endpoint
  - [x] Add check_conflicts endpoint
  - [x] Add provider_analytics endpoint
  - [x] Add update_patient_preferences endpoint
- [x] **Step 5: Admin Interface (admin.py)** ✅ COMPLETE
  - [x] Register SchedulingPreference in admin
  - [x] Register AppointmentPriorityRule in admin
  - [x] Register SchedulingMetrics in admin
- [x] **Step 6: Views (views.py)** ✅ COMPLETE
  - [x] SmartSchedulingView - Main scheduling interface
  - [x] find_optimal_slots_view - HTMX slot finder
  - [x] scheduling_analytics_view - Analytics dashboard
  - [x] check_scheduling_conflicts_view - HTMX conflict checker
  - [x] update_patient_preferences_view - Manual preference update
  - [x] scheduling_metrics_dashboard - Metrics dashboard
- [x] **Step 7: URLs (urls.py)** ✅ COMPLETE
  - [x] /appointments/scheduling/ - Main interface
  - [x] /appointments/scheduling/find-optimal-slots/ - Find slots
  - [x] /appointments/scheduling/check-conflicts/ - Check conflicts
  - [x] /appointments/scheduling/analytics/ - Analytics
  - [x] /appointments/scheduling/metrics/ - Metrics dashboard
  - [x] /appointments/scheduling/patient//update-preferences/ - Update prefs
- [x] **Step 8: Templates** ✅ COMPLETE (8/8 templates)
  - [x] **Main Templates (4/4):**
    - [x] `smart_scheduling.html` - Main scheduling interface with search form
    - [x] `analytics.html` - Provider performance analytics dashboard
    - [x] `metrics_dashboard.html` - Detailed metrics with time-series data
    - [x] `conflicts.html` - Standalone conflict checking page
  - [x] **Partial Templates (4/4):**
    - [x] `partials/optimal_slots.html` - AI-ranked slot results with score breakdown
    - [x] `partials/conflicts.html` - Conflict detection with alternative suggestions
    - [x] `partials/provider_metrics.html` - Reusable provider metrics card
    - [x] `partials/slot_score_breakdown.html` - Detailed scoring visualization
- [ ] **Testing** (Future)
  - [ ] Unit tests for SmartScheduler
  - [ ] Unit tests for scoring algorithms
  - [ ] Integration tests for API endpoints
  - [ ] Performance tests for slot finding
- [ ] **Documentation** (Future)
  - [ ] API documentation
  - [ ] Algorithm documentation
  - [ ] User guide for scheduling features

### 10.6 Success Criteria

- ✅ Slot finding
returns results in <2 seconds for 30-day range - ✅ Multi-factor scoring produces measurable improvement in slot utilization - ✅ No-show prediction accuracy >70% - ✅ API endpoints handle 100+ concurrent requests - ✅ Patient preferences auto-update after each appointment - ✅ Conflict detection catches 100% of overlapping appointments --- ## Phase 11: Advanced Queue Management **Priority**: HIGH **Estimated Time**: 12-15 hours **Dependencies**: Phase 10 completed ### Overview Enhance the queue management system with real-time calculations, dynamic positioning, WebSocket support, and advanced analytics. ### 11.1 Database Schema Enhancements ```python # appointments/models.py additions class QueueConfiguration(models.Model): """Advanced queue configuration and rules""" queue = models.OneToOneField('WaitingQueue', on_delete=models.CASCADE) # Dynamic positioning rules use_dynamic_positioning = models.BooleanField(default=True) priority_weight = models.DecimalField(max_digits=3, decimal_places=2, default=0.5) wait_time_weight = models.DecimalField(max_digits=3, decimal_places=2, default=0.3) appointment_time_weight = models.DecimalField(max_digits=3, decimal_places=2, default=0.2) # Capacity management enable_overflow_queue = models.BooleanField(default=False) overflow_threshold = models.IntegerField(default=10) # Wait time estimation use_historical_data = models.BooleanField(default=True) default_service_time_minutes = models.IntegerField(default=20) # Real-time updates enable_websocket_updates = models.BooleanField(default=True) update_interval_seconds = models.IntegerField(default=30) created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) class QueueMetrics(models.Model): """Track queue performance metrics""" queue = models.ForeignKey('WaitingQueue', on_delete=models.CASCADE) date = models.DateField() hour = models.IntegerField() # 0-23 # Volume metrics total_entries = models.IntegerField(default=0) completed_entries = 
models.IntegerField(default=0) no_shows = models.IntegerField(default=0) # Time metrics average_wait_time_minutes = models.DecimalField(max_digits=6, decimal_places=2, default=0) max_wait_time_minutes = models.IntegerField(default=0) average_service_time_minutes = models.DecimalField(max_digits=6, decimal_places=2, default=0) # Queue state peak_queue_size = models.IntegerField(default=0) average_queue_size = models.DecimalField(max_digits=6, decimal_places=2, default=0) created_at = models.DateTimeField(auto_now_add=True) class Meta: unique_together = ['queue', 'date', 'hour'] indexes = [ models.Index(fields=['queue', 'date']), models.Index(fields=['date', 'hour']), ] ``` ### 11.2 Queue Engine Implementation ```python # appointments/queue/queue_engine.py from datetime import datetime, timedelta from typing import List, Dict, Optional from django.db.models import Avg, Count, F, Q from django.utils import timezone from decimal import Decimal class AdvancedQueueEngine: """ Advanced queue management with dynamic positioning and real-time updates. """ def __init__(self, queue): self.queue = queue try: self.config = queue.queueconfiguration except: # Create default configuration if not exists from appointments.models import QueueConfiguration self.config = QueueConfiguration.objects.create(queue=queue) def add_to_queue( self, patient, appointment=None, priority_score: float = 0.0, notes: str = "" ): """ Add patient to queue with intelligent positioning. 
""" from appointments.models import QueueEntry # Calculate optimal position position = self.calculate_optimal_position(patient, priority_score, appointment) # Estimate wait time wait_time = self.calculate_estimated_wait_time(position) # Create queue entry entry = QueueEntry.objects.create( queue=self.queue, patient=patient, appointment=appointment, queue_position=position, priority_score=priority_score, estimated_service_time=wait_time, notes=notes, status='WAITING' ) # Reposition other entries if needed if self.config.use_dynamic_positioning: self.reposition_queue_entries() # Broadcast update via WebSocket if self.config.enable_websocket_updates: self.broadcast_queue_update() # Update metrics self.update_queue_metrics() return entry def calculate_optimal_position( self, patient, priority_score: float, appointment=None ) -> int: """ Calculate optimal queue position using weighted factors. """ from appointments.models import QueueEntry if not self.config.use_dynamic_positioning: # Simple FIFO - add to end return self.queue.queueentry_set.filter( status='WAITING' ).count() + 1 # Get current waiting entries waiting_entries = self.queue.queueentry_set.filter( status='WAITING' ).order_by('queue_position') if not waiting_entries.exists(): return 1 # Calculate scores for each position best_position = len(waiting_entries) + 1 best_score = float('-inf') for pos in range(1, len(waiting_entries) + 2): score = self._calculate_position_score( pos, priority_score, appointment, waiting_entries ) if score > best_score: best_score = score best_position = pos return best_position def _calculate_position_score( self, position: int, priority_score: float, appointment, waiting_entries ) -> float: """Calculate score for a specific position.""" # Factor 1: Priority score (higher is better) priority_factor = priority_score * float(self.config.priority_weight) # Factor 2: Wait time fairness (earlier positions for longer waits) wait_time_factor = 0.0 if appointment and 
appointment.scheduled_datetime: time_until_appt = ( appointment.scheduled_datetime - timezone.now() ).total_seconds() / 60 # Normalize to 0-100 scale wait_time_factor = min(100, max(0, 100 - time_until_appt)) wait_time_factor *= float(self.config.wait_time_weight) # Factor 3: Appointment time proximity appt_time_factor = 0.0 if appointment: # Prefer positions that minimize disruption appt_time_factor = (100 - position) * float(self.config.appointment_time_weight) return priority_factor + wait_time_factor + appt_time_factor def calculate_estimated_wait_time(self, position: int) -> timedelta: """ Calculate estimated wait time based on position and historical data. """ from appointments.models import QueueMetrics if position <= 0: return timedelta(0) # Get average service time if self.config.use_historical_data: # Use historical data from last 7 days week_ago = timezone.now().date() - timedelta(days=7) avg_service_time = QueueMetrics.objects.filter( queue=self.queue, date__gte=week_ago ).aggregate( avg=Avg('average_service_time_minutes') )['avg'] or self.config.default_service_time_minutes else: avg_service_time = self.config.default_service_time_minutes # Calculate load factor current_size = self.queue.queueentry_set.filter( status='WAITING' ).count() load_factor = self.calculate_load_factor(current_size) # Estimate wait time base_wait = avg_service_time * (position - 1) adjusted_wait = base_wait * load_factor return timedelta(minutes=int(adjusted_wait)) def calculate_load_factor(self, current_size: int) -> float: """ Calculate load factor based on queue size. Higher load = longer wait times per patient. 
        """
        capacity = self.queue.max_queue_size or 10
        utilization = current_size / capacity

        if utilization < 0.5:
            return 1.0  # Normal processing
        elif utilization < 0.75:
            return 1.2  # Slightly slower
        elif utilization < 0.9:
            return 1.5  # Significantly slower
        else:
            return 2.0  # Very slow, overloaded

    def reposition_queue_entries(self):
        """
        Dynamically reposition all waiting entries based on current factors.
        """
        from appointments.models import QueueEntry

        waiting_entries = list(
            self.queue.queueentry_set.filter(
                status='WAITING'
            ).select_related('patient', 'appointment')
        )

        if not waiting_entries:
            return

        # Calculate scores for each entry
        scored_entries = []
        for entry in waiting_entries:
            score = self._calculate_entry_priority_score(entry)
            scored_entries.append((entry, score))

        # Sort by score (highest first)
        scored_entries.sort(key=lambda x: x[1], reverse=True)

        # Update positions
        for new_position, (entry, score) in enumerate(scored_entries, start=1):
            if entry.queue_position != new_position:
                entry.queue_position = new_position
                entry.save(update_fields=['queue_position'])

    def _calculate_entry_priority_score(self, entry) -> float:
        """Calculate priority score for repositioning."""
        # Base priority score
        score = entry.priority_score or 0.0

        # Add wait time factor
        wait_minutes = (timezone.now() - entry.joined_at).total_seconds() / 60
        wait_factor = min(50, wait_minutes / 2)  # Cap at 50 points

        # Add appointment proximity factor
        if entry.appointment and entry.appointment.scheduled_datetime:
            time_until = (
                entry.appointment.scheduled_datetime - timezone.now()
            ).total_seconds() / 60
            if time_until < 30:  # Within 30 minutes
                proximity_factor = 30
            elif time_until < 60:  # Within 1 hour
                proximity_factor = 15
            else:
                proximity_factor = 0
        else:
            proximity_factor = 0

        return score + wait_factor + proximity_factor

    def get_next_patient(self) -> Optional['QueueEntry']:
        """
        Get the next patient to be served.
        """
        from appointments.models import QueueEntry

        next_entry = self.queue.queueentry_set.filter(
            status='WAITING'
        ).order_by('queue_position').first()

        if next_entry:
            # Mark as called
            next_entry.mark_as_called()

            # Broadcast update
            if self.config.enable_websocket_updates:
                self.broadcast_queue_update()

        return next_entry

    def broadcast_queue_update(self):
        """
        Broadcast queue update via WebSocket.
        """
        # This will be implemented with Django Channels
        import logging

        from channels.layers import get_channel_layer
        from asgiref.sync import async_to_sync

        try:
            channel_layer = get_channel_layer()
            if channel_layer:
                async_to_sync(channel_layer.group_send)(
                    f'queue_{self.queue.id}',
                    {
                        'type': 'queue_update',
                        'data': self.get_queue_status()
                    }
                )
        except Exception:
            # Log error (with traceback) but don't fail
            logging.getLogger(__name__).exception("WebSocket broadcast error")

    def get_queue_status(self) -> Dict:
        """Get current queue status for broadcasting."""
        from appointments.models import QueueEntry

        waiting = self.queue.queueentry_set.filter(status='WAITING')

        return {
            'queue_id': self.queue.id,
            'queue_name': self.queue.name,
            'current_size': waiting.count(),
            'max_size': self.queue.max_queue_size,
            'average_wait_time': str(self.queue.calculate_wait_time()),
            'is_accepting': self.queue.is_accepting_patients,
            'entries': [
                {
                    'id': entry.id,
                    'patient_name': entry.patient.get_full_name(),
                    'position': entry.queue_position,
                    'wait_time': str(entry.wait_time_minutes or 0),
                    'status': entry.status
                }
                for entry in waiting.order_by('queue_position')[:10]
            ],
            'timestamp': timezone.now().isoformat()
        }

    def update_queue_metrics(self):
        """Update queue metrics for analytics."""
        from appointments.models import QueueMetrics

        now = timezone.now()

        # Get or create metrics for current hour
        metrics, created = QueueMetrics.objects.get_or_create(
            queue=self.queue,
            date=now.date(),
            hour=now.hour,
            defaults={
                'total_entries': 0,
                'completed_entries': 0,
                'no_shows': 0,
                'average_wait_time_minutes': 0,
                'max_wait_time_minutes': 0,
                'average_service_time_minutes': 0,
                'peak_queue_size': 0,
                'average_queue_size': 0
            }
        )

        # Update current metrics
        current_size = self.queue.queueentry_set.filter(
            status='WAITING'
        ).count()

        metrics.peak_queue_size = max(metrics.peak_queue_size, current_size)
        metrics.save()
```

### 11.3 WebSocket Consumer for Real-time Updates

```python
# appointments/consumers.py

import json
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async


class QueueConsumer(AsyncWebsocketConsumer):
    """
    WebSocket consumer for real-time queue updates.
    """

    async def connect(self):
        self.queue_id = self.scope['url_route']['kwargs']['queue_id']
        self.queue_group_name = f'queue_{self.queue_id}'

        # Join queue group
        await self.channel_layer.group_add(
            self.queue_group_name,
            self.channel_name
        )

        await self.accept()

        # Send initial queue status
        status = await self.get_queue_status()
        await self.send(text_data=json.dumps(status))

    async def disconnect(self, close_code):
        # Leave queue group
        await self.channel_layer.group_discard(
            self.queue_group_name,
            self.channel_name
        )

    async def receive(self, text_data=None, bytes_data=None):
        """Handle messages from WebSocket."""
        if text_data is None:
            # Binary frames are not part of this protocol
            return

        # Parse the incoming JSON payload (loads, not dumps)
        data = json.loads(text_data)
        message_type = data.get('type')

        if message_type == 'get_status':
            status = await self.get_queue_status()
            await self.send(text_data=json.dumps(status))

    async def queue_update(self, event):
        """Handle queue update events from group."""
        await self.send(text_data=json.dumps(event['data']))

    @database_sync_to_async
    def get_queue_status(self):
        """Get current queue status."""
        from appointments.models import WaitingQueue
        from appointments.queue.queue_engine import AdvancedQueueEngine

        try:
            queue = WaitingQueue.objects.get(id=self.queue_id)
            engine = AdvancedQueueEngine(queue)
            return engine.get_queue_status()
        except WaitingQueue.DoesNotExist:
            return {'error': 'Queue not found'}
```

### 11.4 Implementation Checklist

- [ ] **Database Changes**
  - [ ] Create QueueConfiguration model
  - [ ] Create QueueMetrics model
  - [ ] Run migrations
  - [ ] Add database indexes
- [ ] **Queue Engine**
  - [ ] Implement AdvancedQueueEngine class
  - [ ] Implement dynamic positioning algorithm
  - [ ] Implement load factor calculations
  - [ ] Implement wait time estimation
- [ ] **WebSocket Support**
  - [ ] Install Django Channels
  - [ ] Create QueueConsumer
  - [ ] Configure routing
  - [ ] Set up Redis for channel layer
- [ ] **API Endpoints**
  - [ ] Create queue management endpoints
  - [ ] Add real-time status endpoint
  - [ ] Add metrics endpoint
- [ ] **Frontend Integration**
  - [ ] Create WebSocket client
  - [ ] Add real-time queue display
  - [ ] Add queue management UI
- [ ] **Testing**
  - [ ] Unit tests for queue engine
  - [ ] WebSocket connection tests
  - [ ] Load testing for concurrent updates

### 11.5 Success Criteria

- ✅ Dynamic positioning reduces average wait time by 20%
- ✅ WebSocket updates delivered in <1 second
- ✅ System handles 50+ concurrent queue updates
- ✅ Wait time estimates accurate within ±10 minutes
- ✅ Queue metrics updated in real-time

---

## Phase 12: Intelligent Waitlist Automation

**Priority**: HIGH
**Estimated Time**: 10-12 hours
**Dependencies**: Phases 10-11 completed

### Overview

Implement automated waitlist fulfillment with intelligent matching, slot cancellation tracking, and multi-channel notifications.
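
As a rough, framework-free illustration of the "intelligent matching" idea in this overview, the sketch below filters waitlist candidates for a freed slot and ranks the eligible ones. Everything in it is an assumption made for illustration: the `Candidate` and `FreedSlot` dataclasses, the function names, and the two-factor weights are hypothetical stand-ins, not the project's models or its final scoring rules.

```python
from dataclasses import dataclass
from datetime import date, datetime
from typing import List, Optional


@dataclass
class Candidate:
    """Hypothetical, simplified stand-in for a waitlist entry."""
    name: str
    appointment_type: str
    range_start: date
    range_end: date
    preferred_provider: Optional[str]  # None means "any provider"
    joined_on: date


@dataclass
class FreedSlot:
    """Hypothetical, simplified stand-in for a cancelled slot."""
    provider: str
    appointment_type: str
    start: datetime


def is_eligible(c: Candidate, slot: FreedSlot) -> bool:
    """Hard filters: same type, slot date inside the desired range, provider acceptable."""
    return (
        c.appointment_type == slot.appointment_type
        and c.range_start <= slot.start.date() <= c.range_end
        and c.preferred_provider in (None, slot.provider)
    )


def match_score(c: Candidate, slot: FreedSlot, today: date) -> float:
    """Soft ranking with illustrative weights (assumed, not prescribed)."""
    # Exact provider match scores higher than "no preference"
    provider_points = 30.0 if c.preferred_provider == slot.provider else 15.0
    # One point per three days on the waitlist, capped at 10
    waiting_points = min(10.0, (today - c.joined_on).days / 3)
    return provider_points + waiting_points


def rank_candidates(
    candidates: List[Candidate], slot: FreedSlot, today: date, limit: int = 5
) -> List[Candidate]:
    """Return up to `limit` eligible candidates, best match first."""
    eligible = [c for c in candidates if is_eligible(c, slot)]
    eligible.sort(key=lambda c: match_score(c, slot, today), reverse=True)
    return eligible[:limit]


if __name__ == "__main__":
    slot = FreedSlot("dr_lee", "checkup", datetime(2025, 2, 10, 9, 0))
    today = date(2025, 2, 1)
    pool = [
        Candidate("ana", "checkup", date(2025, 2, 1), date(2025, 2, 28), "dr_lee", date(2025, 1, 29)),
        Candidate("ben", "checkup", date(2025, 2, 1), date(2025, 2, 28), None, date(2025, 1, 2)),
        Candidate("cara", "checkup", date(2025, 2, 11), date(2025, 2, 20), None, date(2025, 1, 2)),
    ]
    for c in rank_candidates(pool, slot, today):
        print(c.name, match_score(c, slot, today))
```

Separating hard filters (eligibility) from soft scoring (ranking) keeps each rule easy to test on its own; the real engine would presumably apply the filters in the database query and only score the rows that survive.
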
### 12.1 Database Schema Enhancements

```python
# appointments/models.py additions

class SlotCancellation(models.Model):
    """Track cancelled slots for waitlist fulfillment"""
    tenant = models.ForeignKey('core.Tenant', on_delete=models.CASCADE)
    provider = models.ForeignKey('accounts.User', on_delete=models.CASCADE)
    original_appointment = models.ForeignKey(
        'AppointmentRequest',
        on_delete=models.SET_NULL,
        null=True,
        related_name='cancellation_record'
    )

    # Slot details
    cancelled_slot_start = models.DateTimeField()
    cancelled_slot_end = models.DateTimeField()
    appointment_type = models.ForeignKey('AppointmentTemplate', on_delete=models.PROTECT)

    # Cancellation info
    cancellation_time = models.DateTimeField(auto_now_add=True)
    cancellation_reason = models.TextField(blank=True)

    # Fulfillment tracking
    filled_by_waitlist = models.BooleanField(default=False)
    filled_by = models.ForeignKey(
        'WaitingList',
        on_delete=models.SET_NULL,
        null=True,
        blank=True
    )
    filled_at = models.DateTimeField(null=True, blank=True)

    # Notification tracking
    notifications_sent = models.IntegerField(default=0)
    patients_notified = models.ManyToManyField('patients.PatientProfile', blank=True)

    is_active = models.BooleanField(default=True)
    expires_at = models.DateTimeField()  # Slot expires if not filled

    class Meta:
        indexes = [
            models.Index(fields=['provider', 'cancelled_slot_start']),
            models.Index(fields=['is_active', 'expires_at']),
        ]


# Add to existing WaitingList model
class WaitingList(models.Model):
    # ... existing fields ...

    # Add flexibility scoring
    flexibility_score = models.IntegerField(default=0)  # 0-100

    # Add notification preferences
    notification_channels = models.JSONField(default=list)  # ['sms', 'email', 'push']

    # Add response tracking
    last_opportunity_offered = models.DateTimeField(null=True, blank=True)
    opportunities_offered = models.IntegerField(default=0)
    opportunities_accepted = models.IntegerField(default=0)
    opportunities_declined = models.IntegerField(default=0)
```

### 12.2 Waitlist Engine Implementation

```python
# appointments/waitlist/waitlist_engine.py

from datetime import datetime, timedelta
from typing import List, Dict, Optional
from django.db.models import Q
from django.utils import timezone


class WaitlistEngine:
    """
    Intelligent waitlist management with automated fulfillment.
    """

    def __init__(self, tenant):
        self.tenant = tenant

    def process_cancellation(self, appointment):
        """
        Process appointment cancellation and attempt waitlist fulfillment.
        """
        from appointments.models import SlotCancellation

        # Create cancellation record
        cancellation = SlotCancellation.objects.create(
            tenant=self.tenant,
            provider=appointment.provider,
            original_appointment=appointment,
            cancelled_slot_start=appointment.scheduled_datetime,
            cancelled_slot_end=appointment.scheduled_end_datetime,
            appointment_type=appointment.appointment_type,
            cancellation_reason=appointment.cancellation_reason or '',
            expires_at=appointment.scheduled_datetime - timedelta(hours=2)
        )

        # Attempt to fill from waitlist
        self.attempt_waitlist_fulfillment(cancellation)

        return cancellation

    def attempt_waitlist_fulfillment(self, cancellation):
        """
        Attempt to fill cancelled slot from waitlist.
        """
        # Find matching waitlist entries
        matches = self.find_matching_waitlist_entries(cancellation)

        if not matches:
            return False

        # Notify matches in priority order
        for waitlist_entry in matches[:5]:  # Notify top 5 matches
            self.send_waitlist_opportunity(waitlist_entry, cancellation)
            cancellation.notifications_sent += 1
            cancellation.patients_notified.add(waitlist_entry.patient)

        cancellation.save()
        return True

    def find_matching_waitlist_entries(self, cancellation) -> List:
        """
        Find waitlist entries that match the cancelled slot.
        """
        from appointments.models import WaitingList

        # Base query - active waitlist entries
        queryset = WaitingList.objects.filter(
            tenant=self.tenant,
            status='ACTIVE',
            appointment_type=cancellation.appointment_type
        )

        # Filter by date range
        slot_date = cancellation.cancelled_slot_start.date()
        queryset = queryset.filter(
            Q(desired_date_range_start__lte=slot_date) &
            Q(desired_date_range_end__gte=slot_date)
        )

        # Filter by provider preference (if specified)
        queryset = queryset.filter(
            Q(preferred_provider=cancellation.provider) |
            Q(preferred_provider__isnull=True)
        )

        # Score and sort matches
        scored_matches = []
        for entry in queryset:
            score = self.calculate_match_score(entry, cancellation)
            scored_matches.append((entry, score))

        # Sort by score (highest first) and priority
        scored_matches.sort(
            key=lambda x: (x[1], x[0].priority, -x[0].created_at.timestamp()),
            reverse=True
        )

        return [match[0] for match in scored_matches]

    def calculate_match_score(self, waitlist_entry, cancellation) -> float:
        """
        Calculate how well a waitlist entry matches a cancelled slot.
        """
        score = 0.0

        # Factor 1: Provider match (30 points)
        if waitlist_entry.preferred_provider == cancellation.provider:
            score += 30.0
        elif not waitlist_entry.preferred_provider:
            score += 15.0  # Neutral if no preference

        # Factor 2: Date preference (25 points)
        slot_date = cancellation.cancelled_slot_start.date()
        if waitlist_entry.preferred_date:
            days_diff = abs((slot_date - waitlist_entry.preferred_date).days)
            if days_diff == 0:
                score += 25.0
            elif days_diff <= 3:
                score += 15.0
            elif days_diff <= 7:
                score += 10.0
        else:
            score += 12.5  # Neutral if no preference

        # Factor 3: Time preference (20 points)
        slot_hour = cancellation.cancelled_slot_start.hour
        if waitlist_entry.preferred_time:
            pref_hour = waitlist_entry.preferred_time.hour
            hour_diff = abs(slot_hour - pref_hour)
            if hour_diff <= 1:
                score += 20.0
            elif hour_diff <= 2:
                score += 10.0
        else:
            score += 10.0  # Neutral if no preference

        # Factor 4: Flexibility (15 points)
        score += (waitlist_entry.flexibility_score / 100) * 15.0

        # Factor 5: Wait time (10 points)
        days_waiting = (timezone.now().date() - waitlist_entry.created_at.date()).days
        score += min(10.0, days_waiting / 3)  # Max 10 points for 30+ days

        return score

    def send_waitlist_opportunity(self, waitlist_entry, cancellation):
        """
        Send notification about available slot to waitlist patient.
        """
        from appointments.notifications import NotificationEngine

        # Update waitlist entry
        waitlist_entry.last_opportunity_offered = timezone.now()
        waitlist_entry.opportunities_offered += 1
        waitlist_entry.save()

        # Prepare notification context
        context = {
            'patient': waitlist_entry.patient,
            'provider': cancellation.provider,
            'slot_datetime': cancellation.cancelled_slot_start,
            'slot_end': cancellation.cancelled_slot_end,
            'appointment_type': cancellation.appointment_type,
            'expiration_time': cancellation.expires_at,
            'response_url': self.generate_response_url(waitlist_entry, cancellation)
        }

        # Send via preferred channels
        notifier = NotificationEngine(self.tenant)

        for channel in waitlist_entry.notification_channels:
            if channel == 'sms':
                notifier.send_sms_waitlist_opportunity(context)
            elif channel == 'email':
                notifier.send_email_waitlist_opportunity(context)
            elif channel == 'push':
                notifier.send_push_waitlist_opportunity(context)

    def generate_response_url(self, waitlist_entry, cancellation) -> str:
        """Generate unique URL for patient to respond to opportunity."""
        import secrets

        # Create an unguessable token (32 URL-safe characters). A hash of the
        # entry ID, cancellation ID and timestamp would be predictable.
        token = secrets.token_urlsafe(24)

        # Store token (you'd need a ResponseToken model that maps it back to
        # this waitlist entry and cancellation)
        # For now, return placeholder
        return f"/appointments/waitlist/respond/{token}/"

    def accept_waitlist_opportunity(self, waitlist_entry, cancellation):
        """
        Process patient acceptance of waitlist opportunity.
        """
        from appointments.models import AppointmentRequest

        # Create new appointment
        appointment = AppointmentRequest.objects.create(
            tenant=self.tenant,
            patient=waitlist_entry.patient,
            provider=cancellation.provider,
            appointment_type=cancellation.appointment_type,
            scheduled_datetime=cancellation.cancelled_slot_start,
            scheduled_end_datetime=cancellation.cancelled_slot_end,
            status='CONFIRMED',
            priority=waitlist_entry.priority,
            is_waitlist_conversion=True
        )

        # Update cancellation record
        cancellation.filled_by_waitlist = True
        cancellation.filled_by = waitlist_entry
        cancellation.filled_at = timezone.now()
        cancellation.is_active = False
        cancellation.save()

        # Update waitlist entry
        waitlist_entry.status = 'SCHEDULED'
        waitlist_entry.scheduled_appointment = appointment
        waitlist_entry.opportunities_accepted += 1
        waitlist_entry.save()

        return appointment

    def calculate_flexibility_score(self, waitlist_entry) -> int:
        """
        Calculate flexibility score based on patient preferences.
        Score: 0 (inflexible) to 100 (very flexible)
        """
        score = 50  # Base score

        # More flexible if no provider preference
        if not waitlist_entry.preferred_provider:
            score += 20

        # More flexible if wide date range
        if waitlist_entry.desired_date_range_start and waitlist_entry.desired_date_range_end:
            days_range = (
                waitlist_entry.desired_date_range_end -
                waitlist_entry.desired_date_range_start
            ).days
            if days_range > 30:
                score += 20
            elif days_range > 14:
                score += 10

        # More flexible if no time preference
        if not waitlist_entry.preferred_time:
            score += 10

        return min(100, score)
```

### 12.3 Implementation Checklist

- [ ] **Database Changes**
  - [ ] Create SlotCancellation model
  - [ ] Add flexibility_score to WaitingList
  - [ ] Add notification_channels to WaitingList
  - [ ] Run migrations
- [ ] **Waitlist Engine**
  - [ ] Implement WaitlistEngine class
  - [ ] Implement matching algorithm
  - [ ] Implement scoring system
  - [ ] Implement notification system
- [ ] **Signal Integration**
  - [ ] Connect to appointment cancellation signal
  - [ ] Auto-trigger waitlist fulfillment
  - [ ] Update waitlist metrics
- [ ] **API Endpoints**
  - [ ]