hospital-management/APPOINTMENTS_ADVANCED_FEATURES_PLAN.md
Marwan Alwali 263292f6be update
2025-11-04 00:50:06 +03:00

Appointments App - Advanced Features Implementation Plan

Phases 10-15: Smart Scheduling, Queue Intelligence & Integration

Project: Hospital Management System v4
App: Appointments
Date: 2025-01-10
Status: Planning Phase
Based on: a_q_w.md recommendations


Executive Summary

This plan outlines the implementation of advanced features for the appointments app, building upon the foundation established in Phases 1-7. These enhancements will transform the system from a basic appointment management tool into an intelligent, automated healthcare scheduling platform.

Strategic Objectives

  1. Reduce No-Shows by 30% through smart scheduling and predictive analytics
  2. Decrease Wait Times by 40% via intelligent queue management
  3. Increase Slot Utilization by 25% through automated waitlist fulfillment
  4. Improve Patient Satisfaction by 50% with multi-channel notifications
  5. Enable Seamless Integration with EHR and insurance systems

Current State vs. Target State

| Feature | Current State | Target State |
|---|---|---|
| Scheduling | Manual, basic availability | AI-powered, multi-factor optimization |
| Queue Management | Basic FIFO with priority | Real-time, dynamic positioning |
| Waitlist | Manual contact, basic tracking | Automated fulfillment, intelligent matching |
| Notifications | Email only, basic templates | Multi-channel (SMS/Email/Push/Voice) |
| Integration | None | EHR, Insurance, HL7/FHIR ready |
| Patient Portal | Admin-only scheduling | Self-service with real-time availability |

Phase 10: Smart Scheduling Engine

Priority: HIGH
Estimated Time: 15-20 hours
Dependencies: Phases 1-7 completed

Overview

Implement an intelligent scheduling system that considers multiple factors to optimize appointment placement, reduce no-shows, and improve resource utilization.

10.1 Database Schema Enhancements

New Models

# appointments/models.py additions

class SchedulingPreference(models.Model):
    """Store patient scheduling preferences and patterns"""
    tenant = models.ForeignKey('core.Tenant', on_delete=models.CASCADE)
    patient = models.OneToOneField('patients.PatientProfile', on_delete=models.CASCADE)
    
    # Preference data
    preferred_days = models.JSONField(default=list)  # ['Monday', 'Wednesday']
    preferred_times = models.JSONField(default=list)  # ['morning', 'afternoon']
    preferred_providers = models.ManyToManyField('accounts.User', blank=True)
    
    # Behavioral data
    average_no_show_rate = models.DecimalField(max_digits=5, decimal_places=2, default=0)
    average_late_arrival_minutes = models.IntegerField(default=0)
    total_appointments = models.IntegerField(default=0)
    completed_appointments = models.IntegerField(default=0)
    
    # Geographic data
    home_address = models.TextField(blank=True)
    travel_time_to_clinic = models.DurationField(null=True, blank=True)
    
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)


class AppointmentPriorityRule(models.Model):
    """Define rules for appointment prioritization"""
    tenant = models.ForeignKey('core.Tenant', on_delete=models.CASCADE)
    name = models.CharField(max_length=100)
    description = models.TextField()
    
    # Rule conditions
    appointment_types = models.ManyToManyField('AppointmentTemplate', blank=True)
    specialties = models.JSONField(default=list)
    diagnosis_codes = models.JSONField(default=list)  # ICD-10 codes
    
    # Priority scoring
    base_priority_score = models.IntegerField(default=0)
    urgency_multiplier = models.DecimalField(max_digits=3, decimal_places=2, default=1.0)
    
    # Time constraints
    max_wait_days = models.IntegerField(null=True, blank=True)
    requires_same_day = models.BooleanField(default=False)
    
    is_active = models.BooleanField(default=True)
    created_at = models.DateTimeField(auto_now_add=True)


class SchedulingMetrics(models.Model):
    """Track scheduling performance metrics"""
    tenant = models.ForeignKey('core.Tenant', on_delete=models.CASCADE)
    provider = models.ForeignKey('accounts.User', on_delete=models.CASCADE)
    date = models.DateField()
    
    # Utilization metrics
    total_slots = models.IntegerField(default=0)
    booked_slots = models.IntegerField(default=0)
    completed_appointments = models.IntegerField(default=0)
    no_shows = models.IntegerField(default=0)
    cancellations = models.IntegerField(default=0)
    
    # Time metrics
    average_appointment_duration = models.DurationField(null=True)
    average_wait_time = models.DurationField(null=True)
    total_overtime_minutes = models.IntegerField(default=0)
    
    # Calculated fields
    utilization_rate = models.DecimalField(max_digits=5, decimal_places=2, default=0)
    no_show_rate = models.DecimalField(max_digits=5, decimal_places=2, default=0)
    
    created_at = models.DateTimeField(auto_now_add=True)
    
    class Meta:
        unique_together = ['tenant', 'provider', 'date']
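
The two calculated fields on SchedulingMetrics are presumably derived from the raw counts stored alongside them. The sketch below shows one way to do that, assuming utilization is booked slots over total slots and the no-show rate is no-shows over booked slots. The plan does not state the denominators, and `compute_rates` is a hypothetical helper rather than part of the app.

```python
from decimal import Decimal, ROUND_HALF_UP


def compute_rates(total_slots: int, booked_slots: int, no_shows: int) -> dict:
    """Return percentage rates rounded to two decimals.

    Two decimals matches DecimalField(max_digits=5, decimal_places=2).
    The choice of denominators is an assumption, not taken from the plan.
    """

    def pct(numerator: int, denominator: int) -> Decimal:
        # Guard against empty days so the rate defaults to zero.
        if denominator <= 0:
            return Decimal("0.00")
        value = Decimal(numerator) * 100 / Decimal(denominator)
        return value.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)

    return {
        "utilization_rate": pct(booked_slots, total_slots),
        "no_show_rate": pct(no_shows, booked_slots),
    }


rates = compute_rates(total_slots=20, booked_slots=15, no_shows=3)
```

Using Decimal end to end avoids the float-to-Decimal conversion that would otherwise happen when the values are assigned to the model fields.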

10.2 Smart Scheduler Implementation

Core Scheduling Logic

# appointments/scheduling/smart_scheduler.py

from datetime import datetime, timedelta
from typing import Dict, List
from django.utils import timezone

class SmartScheduler:
    """
    Intelligent appointment scheduling engine with multi-factor optimization.
    """
    
    def __init__(self, tenant):
        self.tenant = tenant
        self.weights = {
            'provider_availability': 0.30,
            'patient_priority': 0.25,
            'no_show_risk': 0.20,
            'geographic_proximity': 0.15,
            'patient_preference': 0.10
        }
    
    def find_optimal_slots(
        self,
        patient,
        provider,
        appointment_type,
        preferred_dates: List[datetime],
        duration_minutes: int = 30,
        max_results: int = 10
    ) -> List[Dict]:
        """
        Find optimal appointment slots using multi-factor analysis.
        
        Returns:
            List of slot dictionaries with scores and metadata
        """
        # Step 1: Get base available slots
        base_slots = self._get_available_slots(
            provider, preferred_dates, duration_minutes
        )
        
        if not base_slots:
            return []
        
        # Step 2: Score each slot
        scored_slots = []
        for slot in base_slots:
            score = self._calculate_slot_score(
                slot, patient, provider, appointment_type
            )
            scored_slots.append({
                'datetime': slot['start'],
                'end_datetime': slot['end'],
                'provider': provider,
                'score': score,
                'factors': score['breakdown']
            })
        
        # Step 3: Sort by score and return top results
        scored_slots.sort(key=lambda x: x['score']['total'], reverse=True)
        return scored_slots[:max_results]
    
    def _get_available_slots(
        self,
        provider,
        date_range: List[datetime],
        duration_minutes: int
    ) -> List[Dict]:
        """Get all available time slots for provider."""
        from appointments.models import SlotAvailability, AppointmentRequest
        
        slots = []
        duration = timedelta(minutes=duration_minutes)
        
        for date in date_range:
            # Get provider's availability for this date
            availabilities = SlotAvailability.objects.filter(
                tenant=self.tenant,
                provider=provider,
                date=date.date(),
                is_active=True,
                is_blocked=False
            )
            
            for availability in availabilities:
                # Calculate time slots within availability window
                current_time = datetime.combine(
                    availability.date,
                    availability.start_time
                )
                end_time = datetime.combine(
                    availability.date,
                    availability.end_time
                )
                
                while current_time + duration <= end_time:
                    # Check if slot is already booked
                    if not self._is_slot_booked(provider, current_time, duration):
                        slots.append({
                            'start': current_time,
                            'end': current_time + duration,
                            'availability_id': availability.id
                        })
                    
                    current_time += timedelta(minutes=availability.duration_minutes or 30)
        
        return slots
    
    def _is_slot_booked(
        self,
        provider,
        start_time: datetime,
        duration: timedelta
    ) -> bool:
        """Check if a time slot is already booked."""
        from appointments.models import AppointmentRequest
        
        end_time = start_time + duration
        
        overlapping = AppointmentRequest.objects.filter(
            tenant=self.tenant,
            provider=provider,
            scheduled_datetime__lt=end_time,
            scheduled_end_datetime__gt=start_time,
            status__in=['PENDING', 'CONFIRMED', 'CHECKED_IN', 'IN_PROGRESS']
        ).exists()
        
        return overlapping
    
    def _calculate_slot_score(
        self,
        slot: Dict,
        patient,
        provider,
        appointment_type
    ) -> Dict:
        """Calculate multi-factor score for a slot."""
        
        scores = {}
        
        # Factor 1: Provider availability (buffer time consideration)
        scores['provider_availability'] = self._score_provider_availability(
            provider, slot['start']
        )
        
        # Factor 2: Patient priority
        scores['patient_priority'] = self._score_patient_priority(
            patient, appointment_type
        )
        
        # Factor 3: No-show risk mitigation
        scores['no_show_risk'] = self._score_no_show_risk(
            patient, slot['start']
        )
        
        # Factor 4: Geographic proximity
        scores['geographic_proximity'] = self._score_geographic_proximity(
            patient, slot['start']
        )
        
        # Factor 5: Patient preference alignment
        scores['patient_preference'] = self._score_patient_preference(
            patient, provider, slot['start']
        )
        
        # Calculate weighted total
        total_score = sum(
            scores[factor] * self.weights[factor]
            for factor in scores
        )
        
        return {
            'total': round(total_score, 2),
            'breakdown': scores
        }
    
    def _score_provider_availability(
        self,
        provider,
        slot_time: datetime
    ) -> float:
        """Score based on provider's schedule density."""
        from appointments.models import AppointmentRequest

        # Check appointments before and after this slot
        buffer_time = timedelta(minutes=15)
        
        appointments_nearby = AppointmentRequest.objects.filter(
            tenant=self.tenant,
            provider=provider,
            scheduled_datetime__gte=slot_time - buffer_time,
            scheduled_datetime__lte=slot_time + buffer_time,
            status__in=['CONFIRMED', 'CHECKED_IN', 'IN_PROGRESS']
        ).count()
        
        # Prefer slots with some buffer time
        if appointments_nearby == 0:
            return 100.0  # Ideal - plenty of buffer
        elif appointments_nearby == 1:
            return 75.0   # Good - some buffer
        else:
            return 50.0   # Acceptable - tight schedule
    
    def _score_patient_priority(
        self,
        patient,
        appointment_type
    ) -> float:
        """Score based on clinical priority."""
        from appointments.models import AppointmentPriorityRule
        
        # Check if there are priority rules for this appointment type
        rules = AppointmentPriorityRule.objects.filter(
            tenant=self.tenant,
            is_active=True,
            appointment_types=appointment_type
        ).first()
        
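        if rules:
            # Clamp to the 0-100 scale used by the other factors and
            # return a float to match the declared return type
            return float(max(0, min(rules.base_priority_score, 100)))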
        
        return 50.0  # Default priority
    
    def _score_no_show_risk(
        self,
        patient,
        slot_time: datetime
    ) -> float:
        """Score based on predicted no-show probability."""
        from appointments.models import SchedulingPreference

        try:
            prefs = SchedulingPreference.objects.get(
                tenant=self.tenant,
                patient=patient
            )
            
            # average_no_show_rate is stored as a percentage (0-100),
            # so a higher rate maps directly to a lower score
            no_show_rate = float(prefs.average_no_show_rate)
            return max(0.0, 100.0 - no_show_rate)
            
        except SchedulingPreference.DoesNotExist:
            return 75.0  # Neutral score for new patients
    
    def _score_geographic_proximity(
        self,
        patient,
        slot_time: datetime
    ) -> float:
        """Score based on travel time and time of day."""
        from appointments.models import SchedulingPreference

        try:
            prefs = SchedulingPreference.objects.get(
                tenant=self.tenant,
                patient=patient
            )
            
            if not prefs.travel_time_to_clinic:
                return 50.0
            
            # Prefer slots that allow comfortable travel time
            hour = slot_time.hour
            travel_minutes = prefs.travel_time_to_clinic.total_seconds() / 60
            
            # Morning rush hour (7:00-8:59 AM)
            if 7 <= hour < 9:
                penalty = travel_minutes * 0.5
            # Evening rush hour (4:00-5:59 PM)
            elif 16 <= hour < 18:
                penalty = travel_minutes * 0.5
            else:
                penalty = 0
            
            return max(0, 100 - penalty)
            
        except SchedulingPreference.DoesNotExist:
            return 50.0
    
    def _score_patient_preference(
        self,
        patient,
        provider,
        slot_time: datetime
    ) -> float:
        """Score based on patient's historical preferences."""
        from appointments.models import SchedulingPreference

        try:
            prefs = SchedulingPreference.objects.get(
                tenant=self.tenant,
                patient=patient
            )
            
            score = 50.0  # Base score
            
            # Check day preference
            day_name = slot_time.strftime('%A')
            if day_name in prefs.preferred_days:
                score += 25.0
            
            # Check time preference
            hour = slot_time.hour
            if hour < 12 and 'morning' in prefs.preferred_times:
                score += 15.0
            elif 12 <= hour < 17 and 'afternoon' in prefs.preferred_times:
                score += 15.0
            elif hour >= 17 and 'evening' in prefs.preferred_times:
                score += 15.0
            
            # Check provider preference
            if provider in prefs.preferred_providers.all():
                score += 10.0
            
            return min(score, 100.0)
            
        except SchedulingPreference.DoesNotExist:
            return 50.0
    
    def apply_priority_routing(
        self,
        slots: List[Dict],
        patient,
        appointment_type
    ) -> List[Dict]:
        """Route high-priority patients to earliest available slots."""
        from appointments.models import AppointmentPriorityRule
        
        # Check if this is a high-priority appointment
        rules = AppointmentPriorityRule.objects.filter(
            tenant=self.tenant,
            is_active=True,
            appointment_types=appointment_type,
            base_priority_score__gte=80  # High priority threshold
        ).first()
        
        if rules and rules.requires_same_day:
            # Filter to same-day slots only
            today = timezone.now().date()
            slots = [s for s in slots if s['datetime'].date() == today]
        
        if rules:
            # Sort by earliest time for high-priority
            slots.sort(key=lambda x: x['datetime'])
        
        return slots
    
    def predict_no_show_probability(self, patient, slot_time: datetime) -> float:
        """
        Predict probability of no-show using historical data.
        
        Returns:
            Float between 0 and 1 representing no-show probability
        """
        from appointments.models import SchedulingPreference

        try:
            prefs = SchedulingPreference.objects.get(
                tenant=self.tenant,
                patient=patient
            )
            
            # Base probability from historical rate
            base_prob = float(prefs.average_no_show_rate) / 100
            
            # Adjust based on time factors
            hour = slot_time.hour
            day_of_week = slot_time.weekday()
            
            # Early morning appointments have higher no-show rates
            if hour < 8:
                base_prob *= 1.3
            
            # Monday appointments have higher no-show rates
            if day_of_week == 0:
                base_prob *= 1.2
            
            # Friday afternoon appointments have higher no-show rates
            if day_of_week == 4 and hour >= 14:
                base_prob *= 1.15
            
            return min(base_prob, 1.0)
            
        except SchedulingPreference.DoesNotExist:
            return 0.15  # Default 15% no-show rate for new patients
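
To make the weighted total concrete, here is a small worked example that applies the same weights as `SmartScheduler.weights` to one set of factor scores. It is a standalone sketch rather than the scheduler itself: `weighted_total` and the sample scores are illustrative assumptions.

```python
# Same weights as SmartScheduler.weights; they sum to 1.0, so the total
# stays on the same 0-100 scale as the individual factor scores.
WEIGHTS = {
    "provider_availability": 0.30,
    "patient_priority": 0.25,
    "no_show_risk": 0.20,
    "geographic_proximity": 0.15,
    "patient_preference": 0.10,
}


def weighted_total(factor_scores: dict) -> float:
    """Combine per-factor scores (0-100) into one weighted score."""
    missing = set(WEIGHTS) - set(factor_scores)
    if missing:
        raise ValueError(f"missing factor scores: {sorted(missing)}")
    total = sum(factor_scores[name] * weight for name, weight in WEIGHTS.items())
    return round(total, 2)


# Hypothetical slot: free provider schedule, default priority, a patient
# with a 20% historical no-show rate, unknown travel time, preferred day.
example_scores = {
    "provider_availability": 100.0,
    "patient_priority": 50.0,
    "no_show_risk": 80.0,
    "geographic_proximity": 50.0,
    "patient_preference": 75.0,
}
total = weighted_total(example_scores)
```

For these inputs the terms are 30 + 12.5 + 16 + 7.5 + 7.5, so the slot scores 73.5. Because provider availability carries the largest weight, a crowded schedule costs a slot more than a mismatch with the patient's preferred day.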

10.3 Scheduling Utilities

# appointments/scheduling/utils.py

from datetime import datetime, timedelta
from typing import List, Dict
from django.db.models import Avg, Count, Q
from django.utils import timezone

class SchedulingAnalytics:
    """Analytics and reporting for scheduling performance."""
    
    @staticmethod
    def calculate_provider_utilization(provider, date_range_start, date_range_end):
        """Calculate provider utilization rate."""
        from appointments.models import SlotAvailability, AppointmentRequest
        
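        # Total availability records. NOTE: SmartScheduler treats each
        # SlotAvailability row as a window that can hold several bookable
        # slots, so counting rows may understate capacity and push the
        # result above 100%.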
        total_slots = SlotAvailability.objects.filter(
            provider=provider,
            date__gte=date_range_start,
            date__lte=date_range_end,
            is_active=True
        ).aggregate(
            total=Count('id')
        )['total'] or 0
        
        # Booked appointments
        booked = AppointmentRequest.objects.filter(
            provider=provider,
            scheduled_datetime__date__gte=date_range_start,
            scheduled_datetime__date__lte=date_range_end,
            status__in=['CONFIRMED', 'COMPLETED', 'IN_PROGRESS']
        ).count()
        
        if total_slots == 0:
            return 0.0
        
        return round((booked / total_slots) * 100, 2)
    
    @staticmethod
    def calculate_no_show_rate(provider, date_range_start, date_range_end):
        """Calculate no-show rate for provider."""
        from appointments.models import AppointmentRequest
        
        total = AppointmentRequest.objects.filter(
            provider=provider,
            scheduled_datetime__date__gte=date_range_start,
            scheduled_datetime__date__lte=date_range_end
        ).count()
        
        no_shows = AppointmentRequest.objects.filter(
            provider=provider,
            scheduled_datetime__date__gte=date_range_start,
            scheduled_datetime__date__lte=date_range_end,
            status='NO_SHOW'
        ).count()
        
        if total == 0:
            return 0.0
        
        return round((no_shows / total) * 100, 2)
    
    @staticmethod
    def update_patient_scheduling_preferences(patient):
        """Update patient preferences based on historical data."""
        from appointments.models import AppointmentRequest, SchedulingPreference
        
        # Get patient's appointment history
        appointments = AppointmentRequest.objects.filter(
            patient=patient
        ).order_by('-scheduled_datetime')
        
        if not appointments.exists():
            return None
        
        # Calculate metrics
        total = appointments.count()
        completed = appointments.filter(status='COMPLETED').count()
        no_shows = appointments.filter(status='NO_SHOW').count()
        
        # Extract patterns
        preferred_days = list(set([
            appt.scheduled_datetime.strftime('%A')
            for appt in appointments.filter(status='COMPLETED')[:20]
        ]))
        
        preferred_times = []
        for appt in appointments.filter(status='COMPLETED')[:20]:
            hour = appt.scheduled_datetime.hour
            if hour < 12:
                preferred_times.append('morning')
            elif hour < 17:
                preferred_times.append('afternoon')
            else:
                preferred_times.append('evening')
        
        preferred_times = list(set(preferred_times))
        
        # Update or create preferences
        prefs, created = SchedulingPreference.objects.update_or_create(
            tenant=patient.tenant,
            patient=patient,
            defaults={
                'preferred_days': preferred_days,
                'preferred_times': preferred_times,
                'total_appointments': total,
                'completed_appointments': completed,
                'average_no_show_rate': round((no_shows / total) * 100, 2) if total > 0 else 0
            }
        )
        
        return prefs


class ConflictDetector:
    """Detect and resolve scheduling conflicts."""
    
    @staticmethod
    def check_conflicts(provider, start_time, end_time, exclude_appointment_id=None):
        """Check for scheduling conflicts."""
        from appointments.models import AppointmentRequest
        
        conflicts = AppointmentRequest.objects.filter(
            provider=provider,
            scheduled_datetime__lt=end_time,
            scheduled_end_datetime__gt=start_time,
            status__in=['PENDING', 'CONFIRMED', 'CHECKED_IN', 'IN_PROGRESS']
        )
        
        if exclude_appointment_id:
            conflicts = conflicts.exclude(id=exclude_appointment_id)
        
        return list(conflicts)
    
    @staticmethod
    def suggest_alternative_slots(provider, original_time, duration_minutes=30, count=5):
        """Suggest alternative slots when conflict exists."""
        from appointments.scheduling.smart_scheduler import SmartScheduler
        
        scheduler = SmartScheduler(provider.tenant)
        
        # Generate date range (next 7 days)
        date_range = [
            original_time + timedelta(days=i)
            for i in range(7)
        ]
        
        # Find available slots
        slots = scheduler._get_available_slots(
            provider, date_range, duration_minutes
        )
        
        # Return closest alternatives
        slots.sort(key=lambda x: abs((x['start'] - original_time).total_seconds()))
        return slots[:count]
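
Both `_is_slot_booked` and `check_conflicts` rely on the same half-open interval test: two appointments overlap when each one starts before the other ends. The sketch below isolates that test and the slot-walking loop in plain Python, without the ORM. `overlaps` and `free_slots` are hypothetical helper names used only for illustration.

```python
from datetime import datetime, timedelta


def overlaps(a_start, a_end, b_start, b_end) -> bool:
    """Half-open interval overlap: [a_start, a_end) vs [b_start, b_end)."""
    return a_start < b_end and b_start < a_end


def free_slots(window_start, window_end, duration, step, booked):
    """Walk an availability window and keep slots that hit no booking.

    `booked` is a list of (start, end) tuples for existing appointments.
    """
    slots = []
    current = window_start
    while current + duration <= window_end:
        end = current + duration
        if not any(overlaps(current, end, b_start, b_end) for b_start, b_end in booked):
            slots.append((current, end))
        current += step
    return slots


# Hypothetical morning window with one existing 09:30-10:00 booking.
base = datetime(2025, 1, 15, 9, 0)
half_hour = timedelta(minutes=30)
available = free_slots(
    window_start=base,
    window_end=base + timedelta(hours=2),
    duration=half_hour,
    step=half_hour,
    booked=[(base + half_hour, base + timedelta(hours=1))],
)
```

With strict inequalities, back-to-back appointments (one ending at 09:30, the next starting at 09:30) are not reported as conflicts, which matches the `__lt` / `__gt` lookups used in the queries above.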

10.4 API Endpoints

# appointments/api/scheduling_views.py

from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from django.utils import timezone
from datetime import datetime, timedelta

from appointments.scheduling.smart_scheduler import SmartScheduler
from appointments.scheduling.utils import SchedulingAnalytics, ConflictDetector


class SmartSchedulingViewSet(viewsets.ViewSet):
    """API endpoints for smart scheduling features."""
    
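    # DRF uses the method name as the URL segment by default; url_path
    # makes the route match the hyphenated path documented below
    @action(detail=False, methods=['post'], url_path='find-optimal-slots')
    def find_optimal_slots(self, request):
        """
        Find optimal appointment slots using multi-factor scoring.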
        
        POST /api/appointments/scheduling/find-optimal-slots/
        {
            "patient_id": 123,
            "provider_id": 456,
            "appointment_type_id": 789,
            "preferred_dates": ["2025-01-15", "2025-01-16"],
            "duration_minutes": 30
        }
        """
        # Extract parameters
        patient_id = request.data.get('patient_id')
        provider_id = request.data.get('provider_id')
        appointment_type_id = request.data.get('appointment_type_id')
        preferred_dates = request.data.get('preferred_dates', [])
        duration_minutes = request.data.get('duration_minutes', 30)
        
        # Validate inputs
        if not all([patient_id, provider_id, appointment_type_id]):
            return Response(
                {'error': 'Missing required parameters'},
                status=status.HTTP_400_BAD_REQUEST
            )
        
        # Get objects
        from patients.models import PatientProfile
        from accounts.models import User
        from appointments.models import AppointmentTemplate
        
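        # NOTE: these lookups are not tenant-scoped; consider filtering
        # by request.user.tenant where the models carry a tenant field
        try:
            patient = PatientProfile.objects.get(id=patient_id)
            provider = User.objects.get(id=provider_id)
            appointment_type = AppointmentTemplate.objects.get(id=appointment_type_id)
        except (
            PatientProfile.DoesNotExist,
            User.DoesNotExist,
            AppointmentTemplate.DoesNotExist,
        ) as e:
            return Response(
                {'error': str(e)},
                status=status.HTTP_404_NOT_FOUND
            )
        
        # Convert date strings to datetime objects, rejecting malformed input
        try:
            date_objects = [
                datetime.strptime(date_str, '%Y-%m-%d')
                for date_str in preferred_dates
            ]
        except (TypeError, ValueError):
            return Response(
                {'error': 'preferred_dates must use the YYYY-MM-DD format'},
                status=status.HTTP_400_BAD_REQUEST
            )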
        
        # Find optimal slots
        scheduler = SmartScheduler(request.user.tenant)
        slots = scheduler.find_optimal_slots(
            patient=patient,
            provider=provider,
            appointment_type=appointment_type,
            preferred_dates=date_objects,
            duration_minutes=duration_minutes
        )
        
        # Format response
        return Response({
            'slots': [
                {
                    'datetime': slot['datetime'].isoformat(),
                    'end_datetime': slot['end_datetime'].isoformat(),
                    'provider_id': slot['provider'].id,
                    'provider_name': slot['provider'].get_full_name(),
                    'score': slot['score']['total'],
                    'score_breakdown': slot['score']['breakdown']
                }
                for slot in slots
            ],
            'count': len(slots)
        })
    
    @action(detail=False, methods=['post'], url_path='check-conflicts')
    def check_conflicts(self, request):
        """
        Check for scheduling conflicts.
        
        POST /api/appointments/scheduling/check-conflicts/
        {
            "provider_id": 456,
            "start_time": "2025-01-15T10:00:00",
            "end_time": "2025-01-15T10:30:00",
            "exclude_appointment_id": 123
        }
        """
        provider_id = request.data.get('provider_id')
        start_time = request.data.get('start_time')
        end_time = request.data.get('end_time')
        exclude_id = request.data.get('exclude_appointment_id')
        
        # Get provider
        from accounts.models import User
        try:
            provider = User.objects.get(id=provider_id)
        except User.DoesNotExist:
            return Response(
                {'error': 'Provider not found'},
                status=status.HTTP_404_NOT_FOUND
            )
        
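        # Parse times, rejecting missing or malformed values
        try:
            start_dt = datetime.fromisoformat(start_time)
            end_dt = datetime.fromisoformat(end_time)
        except (TypeError, ValueError):
            return Response(
                {'error': 'start_time and end_time must be ISO 8601 datetimes'},
                status=status.HTTP_400_BAD_REQUEST
            )
        
        if end_dt <= start_dt:
            return Response(
                {'error': 'end_time must be after start_time'},
                status=status.HTTP_400_BAD_REQUEST
            )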
        
        # Check conflicts
        conflicts = ConflictDetector.check_conflicts(
            provider, start_dt, end_dt, exclude_id
        )
        
        if conflicts:
            # Suggest alternatives
            alternatives = ConflictDetector.suggest_alternative_slots(
                provider, start_dt,
                duration_minutes=int((end_dt - start_dt).total_seconds() / 60)
            )
            
            return Response({
                'has_conflict': True,
                'conflicts': [
                    {
                        'id': c.id,
                        'patient': c.patient.get_full_name(),
                        'start': c.scheduled_datetime.isoformat(),
                        'end': c.scheduled_end_datetime.isoformat()
                    }
                    for c in conflicts
                ],
                'alternative_slots': [
                    {
                        'start': alt['start'].isoformat(),
                        'end': alt['end'].isoformat()
                    }
                    for alt in alternatives
                ]
            })
        
        return Response({'has_conflict': False})
    
    @action(detail=False, methods=['get'], url_path='provider-analytics')
    def provider_analytics(self, request):
        """
        Get scheduling analytics for a provider.
        
        GET /api/appointments/scheduling/provider-analytics/?provider_id=456&days=30
        """
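        provider_id = request.query_params.get('provider_id')
        try:
            days = int(request.query_params.get('days', 30))
        except ValueError:
            return Response(
                {'error': 'days must be an integer'},
                status=status.HTTP_400_BAD_REQUEST
            )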
        
        from accounts.models import User
        try:
            provider = User.objects.get(id=provider_id)
        except User.DoesNotExist:
            return Response(
                {'error': 'Provider not found'},
                status=status.HTTP_404_NOT_FOUND
            )
        
        # Calculate date range
        end_date = timezone.now().date()
        start_date = end_date - timedelta(days=days)
        
        # Get analytics
        utilization = SchedulingAnalytics.calculate_provider_utilization(
            provider, start_date, end_date
        )
        no_show_rate = SchedulingAnalytics.calculate_no_show_rate(
            provider, start_date, end_date
        )
        
        return Response({
            'provider_id': provider.id,
            'provider_name': provider.get_full_name(),
            'period': {
                'start': start_date.isoformat(),
                'end': end_date.isoformat(),
                'days': days
            },
            'metrics': {
                'utilization_rate': utilization,
                'no_show_rate': no_show_rate
            }
        })
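
The `preferred_dates` field arrives as a list of strings, so some validation has to happen before it reaches the scheduler. The helper below is a framework-free sketch of that step, assuming the `YYYY-MM-DD` format shown in the request example. `parse_preferred_dates` is a hypothetical name and not an existing function in the app.

```python
from datetime import date, datetime


def parse_preferred_dates(raw_dates):
    """Split raw input into parsed dates and rejected values.

    Returns (dates, errors): `dates` is sorted and de-duplicated,
    `errors` lists every value that is not a YYYY-MM-DD string.
    """
    parsed, errors = [], []
    for raw in raw_dates:
        try:
            parsed.append(datetime.strptime(raw, "%Y-%m-%d").date())
        except (TypeError, ValueError):
            # TypeError covers non-string values such as None or ints.
            errors.append(raw)
    return sorted(set(parsed)), errors


dates, errors = parse_preferred_dates(
    ["2025-01-16", "2025-01-15", "2025-01-15", "15/01/2025", None]
)
```

Returning the rejected values alongside the parsed ones lets the endpoint answer with a 400 response that names exactly which entries were malformed.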

10.5 Implementation Checklist

  • Step 1: Database Changes (models.py) COMPLETE

    • Create SchedulingPreference model
    • Create AppointmentPriorityRule model
    • Create SchedulingMetrics model
    • Run migrations
    • Add indexes for performance
  • Step 2: Core Logic (scheduling/) COMPLETE

    • Implement SmartScheduler class
    • Implement multi-factor scoring algorithm
    • Implement priority routing logic
    • Implement no-show prediction
    • Implement geographic optimization
  • Step 3: Utilities (utils.py) COMPLETE

    • Implement SchedulingAnalytics class
    • Implement ConflictDetector class
    • Create preference update utilities
    • Existing utils already contain required functionality
  • Step 4: API Endpoints (api/) COMPLETE

    • Create SmartSchedulingViewSet
    • Add find_optimal_slots endpoint
    • Add check_conflicts endpoint
    • Add provider_analytics endpoint
    • Add update_patient_preferences endpoint
  • Step 5: Admin Interface (admin.py) COMPLETE

    • Register SchedulingPreference in admin
    • Register AppointmentPriorityRule in admin
    • Register SchedulingMetrics in admin
  • Step 6: Views (views.py) COMPLETE

    • SmartSchedulingView - Main scheduling interface
    • find_optimal_slots_view - HTMX slot finder
    • scheduling_analytics_view - Analytics dashboard
    • check_scheduling_conflicts_view - HTMX conflict checker
    • update_patient_preferences_view - Manual preference update
    • scheduling_metrics_dashboard - Metrics dashboard
  • Step 7: URLs (urls.py) COMPLETE

    • /appointments/scheduling/ - Main interface
    • /appointments/scheduling/find-optimal-slots/ - Find slots
    • /appointments/scheduling/check-conflicts/ - Check conflicts
    • /appointments/scheduling/analytics/ - Analytics
    • /appointments/scheduling/metrics/ - Metrics dashboard
    • /appointments/scheduling/patient//update-preferences/ - Update prefs
  • Step 8: Templates COMPLETE (8/8 templates)

    • Main Templates (4/4):
      • smart_scheduling.html - Main scheduling interface with search form
      • analytics.html - Provider performance analytics dashboard
      • metrics_dashboard.html - Detailed metrics with time-series data
      • conflicts.html - Standalone conflict checking page
    • Partial Templates (4/4):
      • partials/optimal_slots.html - AI-ranked slot results with score breakdown
      • partials/conflicts.html - Conflict detection with alternative suggestions
      • partials/provider_metrics.html - Reusable provider metrics card
      • partials/slot_score_breakdown.html - Detailed scoring visualization
  • Testing (Future)

    • Unit tests for SmartScheduler
    • Unit tests for scoring algorithms
    • Integration tests for API endpoints
    • Performance tests for slot finding
  • Documentation (Future)

    • API documentation
    • Algorithm documentation
    • User guide for scheduling features

10.6 Success Criteria

  • Slot finding returns results in <2 seconds for 30-day range
  • Multi-factor scoring produces measurable improvement in slot utilization
  • No-show prediction accuracy >70%
  • API endpoints handle 100+ concurrent requests
  • Patient preferences auto-update after each appointment
  • Conflict detection catches 100% of overlapping appointments
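
The overlap criterion in the last item can be stated precisely. The following is a minimal sketch, assuming appointments are treated as half-open intervals `[start, end)` so that back-to-back bookings do not count as conflicts. The names `slots_overlap` and `find_conflicts` are hypothetical and are not taken from the `ConflictDetector` class.

```python
from datetime import datetime, timedelta


def slots_overlap(start_a, end_a, start_b, end_b):
    """Two half-open intervals overlap iff each starts before the other ends."""
    return start_a < end_b and start_b < end_a


def find_conflicts(candidate, booked):
    """Return every booked (start, end) pair that overlaps the candidate pair."""
    start, end = candidate
    return [slot for slot in booked if slots_overlap(start, end, slot[0], slot[1])]


# Illustrative data: a 09:00-09:30 booking and a 09:30-10:00 booking.
nine = datetime(2025, 1, 10, 9, 0)
booked = [
    (nine, nine + timedelta(minutes=30)),
    (nine + timedelta(minutes=30), nine + timedelta(minutes=60)),
]
# A 09:15-09:45 candidate collides with both existing bookings.
candidate = (nine + timedelta(minutes=15), nine + timedelta(minutes=45))
conflicts = find_conflicts(candidate, booked)
```

A test suite for the 100% target would likely enumerate the boundary cases (touching, nested, identical intervals) against a check of this kind.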

Phase 11: Advanced Queue Management

Priority: HIGH
Estimated Time: 12-15 hours
Dependencies: Phase 10 completed

Overview

Enhance the queue management system with real-time calculations, dynamic positioning, WebSocket support, and advanced analytics.

11.1 Database Schema Enhancements

# appointments/models.py additions

class QueueConfiguration(models.Model):
    """Advanced queue configuration and rules"""
    queue = models.OneToOneField('WaitingQueue', on_delete=models.CASCADE)
    
    # Dynamic positioning rules
    use_dynamic_positioning = models.BooleanField(default=True)
    priority_weight = models.DecimalField(max_digits=3, decimal_places=2, default=0.5)
    wait_time_weight = models.DecimalField(max_digits=3, decimal_places=2, default=0.3)
    appointment_time_weight = models.DecimalField(max_digits=3, decimal_places=2, default=0.2)
    
    # Capacity management
    enable_overflow_queue = models.BooleanField(default=False)
    overflow_threshold = models.IntegerField(default=10)
    
    # Wait time estimation
    use_historical_data = models.BooleanField(default=True)
    default_service_time_minutes = models.IntegerField(default=20)
    
    # Real-time updates
    enable_websocket_updates = models.BooleanField(default=True)
    update_interval_seconds = models.IntegerField(default=30)
    
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)


class QueueMetrics(models.Model):
    """Track queue performance metrics"""
    queue = models.ForeignKey('WaitingQueue', on_delete=models.CASCADE)
    date = models.DateField()
    hour = models.IntegerField()  # 0-23
    
    # Volume metrics
    total_entries = models.IntegerField(default=0)
    completed_entries = models.IntegerField(default=0)
    no_shows = models.IntegerField(default=0)
    
    # Time metrics
    average_wait_time_minutes = models.DecimalField(max_digits=6, decimal_places=2, default=0)
    max_wait_time_minutes = models.IntegerField(default=0)
    average_service_time_minutes = models.DecimalField(max_digits=6, decimal_places=2, default=0)
    
    # Queue state
    peak_queue_size = models.IntegerField(default=0)
    average_queue_size = models.DecimalField(max_digits=6, decimal_places=2, default=0)
    
    created_at = models.DateTimeField(auto_now_add=True)
    
    class Meta:
        unique_together = ['queue', 'date', 'hour']
        indexes = [
            models.Index(fields=['queue', 'date']),
            models.Index(fields=['date', 'hour']),
        ]
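
The three positioning weights default to 0.5, 0.3, and 0.2, which sum to 1.0. The plan does not state that custom weights must also sum to 1.0, so the check below is only a sketch under that assumption. The helper name `weights_are_normalized` and the tolerance value are hypothetical.

```python
from decimal import Decimal


def weights_are_normalized(priority, wait_time, appointment_time,
                           tolerance=Decimal("0.01")):
    """Return True if the three weights sum to 1 within the given tolerance."""
    total = (
        Decimal(str(priority))
        + Decimal(str(wait_time))
        + Decimal(str(appointment_time))
    )
    return abs(total - Decimal("1")) <= tolerance


# The defaults declared on QueueConfiguration pass the check.
defaults_ok = weights_are_normalized("0.5", "0.3", "0.2")
```

A check like this could be called from a model `clean()` method if normalized weights turn out to be a requirement.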

11.2 Queue Engine Implementation

# appointments/queue/queue_engine.py

from datetime import datetime, timedelta
from typing import List, Dict, Optional
from django.db.models import Avg, Count, F, Q
from django.utils import timezone
from decimal import Decimal

class AdvancedQueueEngine:
    """
    Advanced queue management with dynamic positioning and real-time updates.
    """
    
    def __init__(self, queue):
        self.queue = queue
        # Fetch the queue's configuration, creating a default one if missing.
        # get_or_create avoids a bare except that would hide unrelated errors.
        from appointments.models import QueueConfiguration
        self.config, _ = QueueConfiguration.objects.get_or_create(queue=queue)
    
    def add_to_queue(
        self,
        patient,
        appointment=None,
        priority_score: float = 0.0,
        notes: str = ""
    ):
        """
        Add patient to queue with intelligent positioning.
        """
        from appointments.models import QueueEntry
        
        # Calculate optimal position
        position = self.calculate_optimal_position(patient, priority_score, appointment)
        
        # Estimate wait time
        wait_time = self.calculate_estimated_wait_time(position)
        
        # Create queue entry
        entry = QueueEntry.objects.create(
            queue=self.queue,
            patient=patient,
            appointment=appointment,
            queue_position=position,
            priority_score=priority_score,
            estimated_service_time=wait_time,
            notes=notes,
            status='WAITING'
        )
        
        # Reposition other entries if needed
        if self.config.use_dynamic_positioning:
            self.reposition_queue_entries()
        
        # Broadcast update via WebSocket
        if self.config.enable_websocket_updates:
            self.broadcast_queue_update()
        
        # Update metrics
        self.update_queue_metrics()
        
        return entry
    
    def calculate_optimal_position(
        self,
        patient,
        priority_score: float,
        appointment=None
    ) -> int:
        """
        Calculate optimal queue position using weighted factors.
        """
        from appointments.models import QueueEntry
        
        if not self.config.use_dynamic_positioning:
            # Simple FIFO - add to end
            return self.queue.queueentry_set.filter(
                status='WAITING'
            ).count() + 1
        
        # Get current waiting entries
        waiting_entries = self.queue.queueentry_set.filter(
            status='WAITING'
        ).order_by('queue_position')
        
        if not waiting_entries.exists():
            return 1
        
        # Calculate scores for each position
        best_position = len(waiting_entries) + 1
        best_score = float('-inf')
        
        for pos in range(1, len(waiting_entries) + 2):
            score = self._calculate_position_score(
                pos, priority_score, appointment, waiting_entries
            )
            if score > best_score:
                best_score = score
                best_position = pos
        
        return best_position
    
    def _calculate_position_score(
        self,
        position: int,
        priority_score: float,
        appointment,
        waiting_entries
    ) -> float:
        """Calculate score for a specific position."""
        
        # Factor 1: Priority score (higher is better)
        priority_factor = priority_score * float(self.config.priority_weight)
        
        # Factor 2: Appointment urgency (grows as the scheduled time approaches)
        wait_time_factor = 0.0
        if appointment and appointment.scheduled_datetime:
            time_until_appt = (
                appointment.scheduled_datetime - timezone.now()
            ).total_seconds() / 60
            # Clamp to a 0-100 scale: 100 at or past the scheduled time,
            # 0 when the appointment is 100 or more minutes away
            wait_time_factor = min(100, max(0, 100 - time_until_appt))
            wait_time_factor *= float(self.config.wait_time_weight)
        
        # Factor 3: Position preference (earlier positions score higher)
        # NOTE: this is the only term that depends on `position`, so the
        # best-scoring candidate is always position 1; the real ordering
        # comes from reposition_queue_entries(), which runs afterwards.
        appt_time_factor = 0.0
        if appointment:
            appt_time_factor = (100 - position) * float(self.config.appointment_time_weight)
        
        return priority_factor + wait_time_factor + appt_time_factor
    
    def calculate_estimated_wait_time(self, position: int) -> timedelta:
        """
        Calculate estimated wait time based on position and historical data.
        """
        from appointments.models import QueueMetrics
        
        if position <= 0:
            return timedelta(0)
        
        # Get average service time
        if self.config.use_historical_data:
            # Use historical data from last 7 days
            week_ago = timezone.now().date() - timedelta(days=7)
            avg_service_time = QueueMetrics.objects.filter(
                queue=self.queue,
                date__gte=week_ago
            ).aggregate(
                avg=Avg('average_service_time_minutes')
            )['avg'] or self.config.default_service_time_minutes
        else:
            avg_service_time = self.config.default_service_time_minutes
        
        # Calculate load factor
        current_size = self.queue.queueentry_set.filter(
            status='WAITING'
        ).count()
        
        load_factor = self.calculate_load_factor(current_size)
        
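        # Estimate wait time. Avg() over a DecimalField returns a Decimal,
        # which cannot be multiplied by the float load factor, so convert first.
        base_wait = float(avg_service_time) * (position - 1)
        adjusted_wait = base_wait * load_factor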
        
        return timedelta(minutes=int(adjusted_wait))
    
    def calculate_load_factor(self, current_size: int) -> float:
        """
        Calculate load factor based on queue size.
        Higher load = longer wait times per patient.
        """
        capacity = self.queue.max_queue_size or 10
        utilization = current_size / capacity
        
        if utilization < 0.5:
            return 1.0  # Normal processing
        elif utilization < 0.75:
            return 1.2  # Slightly slower
        elif utilization < 0.9:
            return 1.5  # Significantly slower
        else:
            return 2.0  # Very slow, overloaded
    
    def reposition_queue_entries(self):
        """
        Dynamically reposition all waiting entries based on current factors.
        """
        from appointments.models import QueueEntry
        
        waiting_entries = list(
            self.queue.queueentry_set.filter(
                status='WAITING'
            ).select_related('patient', 'appointment')
        )
        
        if not waiting_entries:
            return
        
        # Calculate scores for each entry
        scored_entries = []
        for entry in waiting_entries:
            score = self._calculate_entry_priority_score(entry)
            scored_entries.append((entry, score))
        
        # Sort by score (highest first)
        scored_entries.sort(key=lambda x: x[1], reverse=True)
        
        # Update positions
        for new_position, (entry, score) in enumerate(scored_entries, start=1):
            if entry.queue_position != new_position:
                entry.queue_position = new_position
                entry.save(update_fields=['queue_position'])
    
    def _calculate_entry_priority_score(self, entry) -> float:
        """Calculate priority score for repositioning."""
        
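        # Base priority score (coerced to float so it can be added to the
        # float factors below even if the field stores a Decimal)
        score = float(entry.priority_score or 0.0)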
        
        # Add wait time factor
        wait_minutes = (timezone.now() - entry.joined_at).total_seconds() / 60
        wait_factor = min(50, wait_minutes / 2)  # Cap at 50 points
        
        # Add appointment proximity factor
        if entry.appointment and entry.appointment.scheduled_datetime:
            time_until = (
                entry.appointment.scheduled_datetime - timezone.now()
            ).total_seconds() / 60
            if time_until < 30:  # Within 30 minutes
                proximity_factor = 30
            elif time_until < 60:  # Within 1 hour
                proximity_factor = 15
            else:
                proximity_factor = 0
        else:
            proximity_factor = 0
        
        return score + wait_factor + proximity_factor
    
    def get_next_patient(self) -> Optional['QueueEntry']:
        """
        Get the next patient to be served.
        """
        from appointments.models import QueueEntry
        
        next_entry = self.queue.queueentry_set.filter(
            status='WAITING'
        ).order_by('queue_position').first()
        
        if next_entry:
            # Mark as called
            next_entry.mark_as_called()
            
            # Broadcast update
            if self.config.enable_websocket_updates:
                self.broadcast_queue_update()
        
        return next_entry
    
    def broadcast_queue_update(self):
        """
        Broadcast queue update via WebSocket.
        """
        # Requires Django Channels with a configured channel layer
        import logging
        from channels.layers import get_channel_layer
        from asgiref.sync import async_to_sync
        
        try:
            channel_layer = get_channel_layer()
            if channel_layer:
                async_to_sync(channel_layer.group_send)(
                    f'queue_{self.queue.id}',
                    {
                        'type': 'queue_update',
                        'data': self.get_queue_status()
                    }
                )
        except Exception:
            # Log the full traceback but don't fail the queue operation
            logging.getLogger(__name__).exception("WebSocket broadcast error")
    
    def get_queue_status(self) -> Dict:
        """Get current queue status for broadcasting."""
        from appointments.models import QueueEntry
        
        waiting = self.queue.queueentry_set.filter(status='WAITING')
        
        return {
            'queue_id': self.queue.id,
            'queue_name': self.queue.name,
            'current_size': waiting.count(),
            'max_size': self.queue.max_queue_size,
            'average_wait_time': str(self.queue.calculate_wait_time()),
            'is_accepting': self.queue.is_accepting_patients,
            'entries': [
                {
                    'id': entry.id,
                    'patient_name': entry.patient.get_full_name(),
                    'position': entry.queue_position,
                    'wait_time': str(entry.wait_time_minutes or 0),
                    'status': entry.status
                }
                for entry in waiting.order_by('queue_position')[:10]
            ],
            'timestamp': timezone.now().isoformat()
        }
    
    def update_queue_metrics(self):
        """Update queue metrics for analytics."""
        from appointments.models import QueueMetrics
        
        now = timezone.now()
        
        # Get or create metrics for current hour
        metrics, created = QueueMetrics.objects.get_or_create(
            queue=self.queue,
            date=now.date(),
            hour=now.hour,
            defaults={
                'total_entries': 0,
                'completed_entries': 0,
                'no_shows': 0,
                'average_wait_time_minutes': 0,
                'max_wait_time_minutes': 0,
                'average_service_time_minutes': 0,
                'peak_queue_size': 0,
                'average_queue_size': 0
            }
        )
        
        # Update current metrics
        current_size = self.queue.queueentry_set.filter(
            status='WAITING'
        ).count()
        
        metrics.peak_queue_size = max(metrics.peak_queue_size, current_size)
        metrics.save()
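
To make the wait time arithmetic concrete, the sketch below reimplements `calculate_load_factor` and `calculate_estimated_wait_time` as plain functions with no database access. The function names and the explicit `capacity` parameter are assumptions for illustration; the thresholds and multipliers are copied from the engine above.

```python
from datetime import timedelta


def load_factor(current_size, capacity=10):
    """Map queue utilization to the slowdown multiplier used by the engine."""
    utilization = current_size / capacity
    if utilization < 0.5:
        return 1.0
    elif utilization < 0.75:
        return 1.2
    elif utilization < 0.9:
        return 1.5
    return 2.0


def estimated_wait(position, avg_service_minutes, current_size, capacity=10):
    """Wait = service time for everyone ahead, scaled by the load factor."""
    if position <= 0:
        return timedelta(0)
    base_wait = float(avg_service_minutes) * (position - 1)
    return timedelta(minutes=int(base_wait * load_factor(current_size, capacity)))


# Position 4 with a 20-minute average service time and 8 of 10 places taken:
# 3 patients ahead * 20 min = 60 min, utilization 0.8 gives a factor of 1.5.
wait = estimated_wait(position=4, avg_service_minutes=20, current_size=8)
```

Here `wait` is 90 minutes. The patient at position 1 always gets a zero estimate, because nobody is ahead of them.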

11.3 WebSocket Consumer for Real-time Updates

# appointments/consumers.py

import json
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async

class QueueConsumer(AsyncWebsocketConsumer):
    """
    WebSocket consumer for real-time queue updates.
    """
    
    async def connect(self):
        self.queue_id = self.scope['url_route']['kwargs']['queue_id']
        self.queue_group_name = f'queue_{self.queue_id}'
        
        # Join queue group
        await self.channel_layer.group_add(
            self.queue_group_name,
            self.channel_name
        )
        
        await self.accept()
        
        # Send initial queue status
        status = await self.get_queue_status()
        await self.send(text_data=json.dumps(status))
    
    async def disconnect(self, close_code):
        # Leave queue group
        await self.channel_layer.group_discard(
            self.queue_group_name,
            self.channel_name
        )
    
    async def receive(self, text_data):
        """Handle messages from WebSocket."""
        data = json.loads(text_data)  # Parse the incoming JSON message
        message_type = data.get('type')
        
        if message_type == 'get_status':
            status = await self.get_queue_status()
            await self.send(text_data=json.dumps(status))
    
    async def queue_update(self, event):
        """Handle queue update events from group."""
        await self.send(text_data=json.dumps(event['data']))
    
    @database_sync_to_async
    def get_queue_status(self):
        """Get current queue status."""
        from appointments.models import WaitingQueue
        from appointments.queue.queue_engine import AdvancedQueueEngine
        
        try:
            queue = WaitingQueue.objects.get(id=self.queue_id)
            engine = AdvancedQueueEngine(queue)
            return engine.get_queue_status()
        except WaitingQueue.DoesNotExist:
            return {'error': 'Queue not found'}
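
Clients can send arbitrary text over a WebSocket, so the `receive` handler benefits from defensive parsing. The helper below is a minimal sketch of that step, kept separate from the consumer so it can be tested without Channels. The name `parse_client_message` is hypothetical.

```python
import json


def parse_client_message(text_data):
    """Return the message 'type' from a JSON text frame, or None if unusable."""
    try:
        data = json.loads(text_data)
    except (TypeError, ValueError):
        # TypeError: text_data was None; ValueError covers json.JSONDecodeError
        return None
    if not isinstance(data, dict):
        # Valid JSON, but not an object with a 'type' key
        return None
    return data.get("type")


message_type = parse_client_message('{"type": "get_status"}')
```

Inside `receive`, the consumer could call this helper and ignore frames for which it returns `None`.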

11.4 Implementation Checklist

  • Database Changes

    • Create QueueConfiguration model
    • Create QueueMetrics model
    • Run migrations
    • Add database indexes
  • Queue Engine

    • Implement AdvancedQueueEngine class
    • Implement dynamic positioning algorithm
    • Implement load factor calculations
    • Implement wait time estimation
  • WebSocket Support

    • Install Django Channels
    • Create QueueConsumer
    • Configure routing
    • Set up Redis for channel layer
  • API Endpoints

    • Create queue management endpoints
    • Add real-time status endpoint
    • Add metrics endpoint
  • Frontend Integration

    • Create WebSocket client
    • Add real-time queue display
    • Add queue management UI
  • Testing

    • Unit tests for queue engine
    • WebSocket connection tests
    • Load testing for concurrent updates
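
For the queue engine unit tests, the repositioning score can be checked in isolation. The sketch below mirrors `_calculate_entry_priority_score` as a pure function; the names `entry_priority_score` and `order_entries`, and the tuple layout of the sample entries, are assumptions for illustration.

```python
def entry_priority_score(priority_score, minutes_waited, minutes_until_appointment=None):
    """Base priority + capped wait bonus + bonus for an imminent appointment."""
    wait_factor = min(50, minutes_waited / 2)  # capped at 50 points
    if minutes_until_appointment is None:
        proximity_factor = 0
    elif minutes_until_appointment < 30:
        proximity_factor = 30
    elif minutes_until_appointment < 60:
        proximity_factor = 15
    else:
        proximity_factor = 0
    return float(priority_score) + wait_factor + proximity_factor


def order_entries(entries):
    """entries: (label, priority, minutes_waited, minutes_until_appointment)."""
    ranked = sorted(
        entries,
        key=lambda e: entry_priority_score(e[1], e[2], e[3]),
        reverse=True,
    )
    return [e[0] for e in ranked]


order = order_entries([
    ("walk_in", 0, 200, None),  # 0 + 50 + 0  = 50
    ("booked", 10, 40, 20),     # 10 + 20 + 30 = 60
    ("new", 20, 0, 90),         # 20 + 0 + 0  = 20
])
```

With these sample values the booked patient whose appointment is 20 minutes away moves ahead of the long-waiting walk-in.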

11.5 Success Criteria

  • Dynamic positioning reduces average wait time by 20%
  • WebSocket updates delivered in <1 second
  • System handles 50+ concurrent queue updates
  • Wait time estimates accurate within ±10 minutes
  • Queue metrics updated in real-time
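
The ±10 minute accuracy target needs a concrete measurement. One possible definition, sketched below, is the share of estimates that land within the tolerance of the observed wait. The function name `within_tolerance_rate` and the sample figures are hypothetical.

```python
def within_tolerance_rate(estimated_minutes, actual_minutes, tolerance_minutes=10):
    """Fraction of estimates whose absolute error is within the tolerance."""
    pairs = list(zip(estimated_minutes, actual_minutes))
    if not pairs:
        return 0.0
    hits = sum(1 for est, act in pairs if abs(est - act) <= tolerance_minutes)
    return hits / len(pairs)


# Errors are 5, 15, and 5 minutes, so two of the three estimates qualify.
rate = within_tolerance_rate([20, 35, 50], [25, 50, 45])
```

The plan does not say what share of estimates must fall inside the band, so that threshold would still need to be agreed.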

Phase 12: Intelligent Waitlist Automation

Priority: HIGH
Estimated Time: 10-12 hours
Dependencies: Phases 10-11 completed

Overview

Implement automated waitlist fulfillment with intelligent matching, slot cancellation tracking, and multi-channel notifications.

12.1 Database Schema Enhancements

# appointments/models.py additions

class SlotCancellation(models.Model):
    """Track cancelled slots for waitlist fulfillment"""
    tenant = models.ForeignKey('core.Tenant', on_delete=models.CASCADE)
    provider = models.ForeignKey('accounts.User', on_delete=models.CASCADE)
    original_appointment = models.ForeignKey(
        'AppointmentRequest',
        on_delete=models.SET_NULL,
        null=True,
        related_name='cancellation_record'
    )
    
    # Slot details
    cancelled_slot_start = models.DateTimeField()
    cancelled_slot_end = models.DateTimeField()
    appointment_type = models.ForeignKey('AppointmentTemplate', on_delete=models.PROTECT)
    
    # Cancellation info
    cancellation_time = models.DateTimeField(auto_now_add=True)
    cancellation_reason = models.TextField(blank=True)
    
    # Fulfillment tracking
    filled_by_waitlist = models.BooleanField(default=False)
    filled_by = models.ForeignKey(
        'WaitingList',
        on_delete=models.SET_NULL,
        null=True,
        blank=True
    )
    filled_at = models.DateTimeField(null=True, blank=True)
    
    # Notification tracking
    notifications_sent = models.IntegerField(default=0)
    patients_notified = models.ManyToManyField('patients.PatientProfile', blank=True)
    
    is_active = models.BooleanField(default=True)
    expires_at = models.DateTimeField()  # Slot expires if not filled
    
    class Meta:
        indexes = [
            models.Index(fields=['provider', 'cancelled_slot_start']),
            models.Index(fields=['is_active', 'expires_at']),
        ]


# Add to existing WaitingList model
class WaitingList(models.Model):
    # ... existing fields ...
    
    # Add flexibility scoring
    flexibility_score = models.IntegerField(default=0)  # 0-100
    
    # Add notification preferences
    notification_channels = models.JSONField(default=list)  # ['sms', 'email', 'push']
    
    # Add response tracking
    last_opportunity_offered = models.DateTimeField(null=True, blank=True)
    opportunities_offered = models.IntegerField(default=0)
    opportunities_accepted = models.IntegerField(default=0)
    opportunities_declined = models.IntegerField(default=0)
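
The `is_active`, `filled_by_waitlist`, and `expires_at` fields together determine whether a cancelled slot can still be offered. A minimal sketch of that rule follows, assuming timezone-aware datetimes. The helper name `offer_is_open` is hypothetical and is not part of the models above.

```python
from datetime import datetime, timedelta, timezone


def offer_is_open(is_active, filled_by_waitlist, expires_at, now=None):
    """A slot can be offered while it is active, unfilled, and not yet expired."""
    now = now or datetime.now(timezone.utc)
    return bool(is_active and not filled_by_waitlist and now < expires_at)


# Illustrative check: a slot that expires one hour after the reference time.
reference = datetime(2025, 1, 10, 8, 0, tzinfo=timezone.utc)
still_open = offer_is_open(True, False, reference + timedelta(hours=1), now=reference)
```

A guard of this kind would typically run before a patient's acceptance is processed, so that an expired or already filled slot is not booked twice.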

12.2 Waitlist Engine Implementation

# appointments/waitlist/waitlist_engine.py

from datetime import datetime, timedelta
from typing import List, Dict, Optional
from django.db.models import Q
from django.utils import timezone

class WaitlistEngine:
    """
    Intelligent waitlist management with automated fulfillment.
    """
    
    def __init__(self, tenant):
        self.tenant = tenant
    
    def process_cancellation(self, appointment):
        """
        Process appointment cancellation and attempt waitlist fulfillment.
        """
        from appointments.models import SlotCancellation
        
        # Create cancellation record
        cancellation = SlotCancellation.objects.create(
            tenant=self.tenant,
            provider=appointment.provider,
            original_appointment=appointment,
            cancelled_slot_start=appointment.scheduled_datetime,
            cancelled_slot_end=appointment.scheduled_end_datetime,
            appointment_type=appointment.appointment_type,
            cancellation_reason=appointment.cancellation_reason or '',
            expires_at=appointment.scheduled_datetime - timedelta(hours=2)
        )
        
        # Attempt to fill from waitlist
        self.attempt_waitlist_fulfillment(cancellation)
        
        return cancellation
    
    def attempt_waitlist_fulfillment(self, cancellation):
        """
        Attempt to fill cancelled slot from waitlist.
        """
        # Find matching waitlist entries
        matches = self.find_matching_waitlist_entries(cancellation)
        
        if not matches:
            return False
        
        # Notify matches in priority order
        for waitlist_entry in matches[:5]:  # Notify top 5 matches
            self.send_waitlist_opportunity(waitlist_entry, cancellation)
            cancellation.notifications_sent += 1
            cancellation.patients_notified.add(waitlist_entry.patient)
        
        cancellation.save()
        return True
    
    def find_matching_waitlist_entries(self, cancellation) -> List:
        """
        Find waitlist entries that match the cancelled slot.
        """
        from appointments.models import WaitingList
        
        # Base query - active waitlist entries
        queryset = WaitingList.objects.filter(
            tenant=self.tenant,
            status='ACTIVE',
            appointment_type=cancellation.appointment_type
        )
        
        # Filter by date range
        slot_date = cancellation.cancelled_slot_start.date()
        queryset = queryset.filter(
            Q(desired_date_range_start__lte=slot_date) &
            Q(desired_date_range_end__gte=slot_date)
        )
        
        # Filter by provider preference (if specified)
        queryset = queryset.filter(
            Q(preferred_provider=cancellation.provider) |
            Q(preferred_provider__isnull=True)
        )
        
        # Score and sort matches
        scored_matches = []
        for entry in queryset:
            score = self.calculate_match_score(entry, cancellation)
            scored_matches.append((entry, score))
        
        # Sort by score (highest first) and priority
        scored_matches.sort(
            key=lambda x: (x[1], x[0].priority, -x[0].created_at.timestamp()),
            reverse=True
        )
        
        return [match[0] for match in scored_matches]
    
    def calculate_match_score(self, waitlist_entry, cancellation) -> float:
        """
        Calculate how well a waitlist entry matches a cancelled slot.
        """
        score = 0.0
        
        # Factor 1: Provider match (30 points)
        if waitlist_entry.preferred_provider == cancellation.provider:
            score += 30.0
        elif not waitlist_entry.preferred_provider:
            score += 15.0  # Neutral if no preference
        
        # Factor 2: Date preference (25 points)
        slot_date = cancellation.cancelled_slot_start.date()
        if waitlist_entry.preferred_date:
            days_diff = abs((slot_date - waitlist_entry.preferred_date).days)
            if days_diff == 0:
                score += 25.0
            elif days_diff <= 3:
                score += 15.0
            elif days_diff <= 7:
                score += 10.0
        else:
            score += 12.5  # Neutral if no preference
        
        # Factor 3: Time preference (20 points)
        slot_hour = cancellation.cancelled_slot_start.hour
        if waitlist_entry.preferred_time:
            pref_hour = waitlist_entry.preferred_time.hour
            hour_diff = abs(slot_hour - pref_hour)
            if hour_diff <= 1:
                score += 20.0
            elif hour_diff <= 2:
                score += 10.0
        else:
            score += 10.0  # Neutral if no preference
        
        # Factor 4: Flexibility (15 points)
        score += (waitlist_entry.flexibility_score / 100) * 15.0
        
        # Factor 5: Wait time (10 points)
        days_waiting = (timezone.now().date() - waitlist_entry.created_at.date()).days
        score += min(10.0, days_waiting / 3)  # Max 10 points for 30+ days
        
        return score
    
    def send_waitlist_opportunity(self, waitlist_entry, cancellation):
        """
        Send notification about available slot to waitlist patient.
        """
        from appointments.notifications import NotificationEngine
        
        # Update waitlist entry
        waitlist_entry.last_opportunity_offered = timezone.now()
        waitlist_entry.opportunities_offered += 1
        waitlist_entry.save()
        
        # Prepare notification context
        context = {
            'patient': waitlist_entry.patient,
            'provider': cancellation.provider,
            'slot_datetime': cancellation.cancelled_slot_start,
            'slot_end': cancellation.cancelled_slot_end,
            'appointment_type': cancellation.appointment_type,
            'expiration_time': cancellation.expires_at,
            'response_url': self.generate_response_url(waitlist_entry, cancellation)
        }
        
        # Send via preferred channels
        notifier = NotificationEngine(self.tenant)
        for channel in waitlist_entry.notification_channels:
            if channel == 'sms':
                notifier.send_sms_waitlist_opportunity(context)
            elif channel == 'email':
                notifier.send_email_waitlist_opportunity(context)
            elif channel == 'push':
                notifier.send_push_waitlist_opportunity(context)
    
    def generate_response_url(self, waitlist_entry, cancellation) -> str:
        """Generate unique URL for patient to respond to opportunity."""
        import secrets
        
        # Create an unguessable token. A hash of the IDs and a timestamp is
        # predictable, so use the secrets module instead.
        token = secrets.token_urlsafe(24)  # 32 URL-safe characters
        
        # Store token (you'd need a ResponseToken model)
        # For now, return placeholder
        return f"/appointments/waitlist/respond/{token}/"
    
    def accept_waitlist_opportunity(self, waitlist_entry, cancellation):
        """
        Process patient acceptance of waitlist opportunity.
        """
        from appointments.models import AppointmentRequest
        
        # Create new appointment
        appointment = AppointmentRequest.objects.create(
            tenant=self.tenant,
            patient=waitlist_entry.patient,
            provider=cancellation.provider,
            appointment_type=cancellation.appointment_type,
            scheduled_datetime=cancellation.cancelled_slot_start,
            scheduled_end_datetime=cancellation.cancelled_slot_end,
            status='CONFIRMED',
            priority=waitlist_entry.priority,
            is_waitlist_conversion=True
        )
        
        # Update cancellation record
        cancellation.filled_by_waitlist = True
        cancellation.filled_by = waitlist_entry
        cancellation.filled_at = timezone.now()
        cancellation.is_active = False
        cancellation.save()
        
        # Update waitlist entry
        waitlist_entry.status = 'SCHEDULED'
        waitlist_entry.scheduled_appointment = appointment
        waitlist_entry.opportunities_accepted += 1
        waitlist_entry.save()
        
        return appointment
    
    def calculate_flexibility_score(self, waitlist_entry) -> int:
        """
        Calculate flexibility score based on patient preferences.
        Score: 0 (inflexible) to 100 (very flexible)
        """
        score = 50  # Base score
        
        # More flexible if no provider preference
        if not waitlist_entry.preferred_provider:
            score += 20
        
        # More flexible if wide date range
        if waitlist_entry.desired_date_range_start and waitlist_entry.desired_date_range_end:
            days_range = (
                waitlist_entry.desired_date_range_end - 
                waitlist_entry.desired_date_range_start
            ).days
            if days_range > 30:
                score += 20
            elif days_range > 14:
                score += 10
        
        # More flexible if no time preference
        if not waitlist_entry.preferred_time:
            score += 10
        
        return min(100, score)
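
A worked example helps show how the five factors in `calculate_match_score` combine. The sketch below restates the scoring as a pure function over plain values; the function name `match_score` and its parameter names are assumptions for illustration, while the point values are copied from the engine above.

```python
from datetime import date
from typing import Optional


def match_score(
    provider_matches: bool,
    has_provider_preference: bool,
    slot_date: date,
    preferred_date: Optional[date],
    slot_hour: int,
    preferred_hour: Optional[int],
    flexibility_score: int,
    days_waiting: int,
) -> float:
    score = 0.0
    # Factor 1: provider (up to 30 points)
    if provider_matches:
        score += 30.0
    elif not has_provider_preference:
        score += 15.0
    # Factor 2: date (up to 25 points)
    if preferred_date is not None:
        days_diff = abs((slot_date - preferred_date).days)
        if days_diff == 0:
            score += 25.0
        elif days_diff <= 3:
            score += 15.0
        elif days_diff <= 7:
            score += 10.0
    else:
        score += 12.5
    # Factor 3: time of day (up to 20 points)
    if preferred_hour is not None:
        hour_diff = abs(slot_hour - preferred_hour)
        if hour_diff <= 1:
            score += 20.0
        elif hour_diff <= 2:
            score += 10.0
    else:
        score += 10.0
    # Factor 4: flexibility (up to 15 points)
    score += (flexibility_score / 100) * 15.0
    # Factor 5: time on the waitlist (up to 10 points)
    score += min(10.0, days_waiting / 3)
    return score


# Preferred provider, slot 2 days from the preferred date, 1 hour from the
# preferred time, flexibility 60, 12 days waiting: 30 + 15 + 20 + 9 + 4.
example = match_score(True, True, date(2025, 1, 12), date(2025, 1, 10), 10, 9, 60, 12)
```

The example comes to 78 of a possible 100 points. A patient with no stated preferences starts from the neutral values (15 + 12.5 + 10), so flexibility and time on the waitlist decide their rank.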

12.3 Implementation Checklist

  • Database Changes

    • Create SlotCancellation model
    • Add flexibility_score to WaitingList
    • Add notification_channels to WaitingList
    • Run migrations
  • Waitlist Engine

    • Implement WaitlistEngine class
    • Implement matching algorithm
    • Implement scoring system
    • Implement notification system
  • Signal Integration

    • Connect to appointment cancellation signal
    • Auto-trigger waitlist fulfillment
    • Update waitlist metrics
  • API Endpoints