# Appointments App - Advanced Features Implementation Plan

## Phases 10-15: Smart Scheduling, Queue Intelligence & Integration

**Project**: Hospital Management System v4
**App**: Appointments
**Date**: 2025-01-10
**Status**: Planning Phase
**Based on**: a_q_w.md recommendations

---

## Executive Summary

This plan outlines the implementation of advanced features for the appointments app, building upon the foundation established in Phases 1-7. These enhancements will transform the system from a basic appointment management tool into an intelligent, automated healthcare scheduling platform.

### Strategic Objectives

1. **Reduce No-Shows by 30%** through smart scheduling and predictive analytics
2. **Decrease Wait Times by 40%** via intelligent queue management
3. **Increase Slot Utilization by 25%** through automated waitlist fulfillment
4. **Improve Patient Satisfaction by 50%** with multi-channel notifications
5. **Enable Seamless Integration** with EHR and insurance systems

### Current State vs. Target State

| Feature | Current State | Target State |
|---------|---------------|--------------|
| Scheduling | Manual, basic availability | AI-powered, multi-factor optimization |
| Queue Management | Basic FIFO with priority | Real-time, dynamic positioning |
| Waitlist | Manual contact, basic tracking | Automated fulfillment, intelligent matching |
| Notifications | Email only, basic templates | Multi-channel (SMS/Email/Push/Voice) |
| Integration | None | EHR, Insurance, HL7/FHIR ready |
| Patient Portal | Admin-only scheduling | Self-service with real-time availability |

---

## Phase 10: Smart Scheduling Engine

**Priority**: HIGH
**Estimated Time**: 15-20 hours
**Dependencies**: Phases 1-7 completed

### Overview

Implement an intelligent scheduling system that considers multiple factors to optimize appointment placement, reduce no-shows, and improve resource utilization.

### 10.1 Database Schema Enhancements

#### New Models

```python
# appointments/models.py additions

class SchedulingPreference(models.Model):
    """Store patient scheduling preferences and patterns"""
    tenant = models.ForeignKey('core.Tenant', on_delete=models.CASCADE)
    patient = models.OneToOneField('patients.PatientProfile', on_delete=models.CASCADE)

    # Preference data
    preferred_days = models.JSONField(default=list)  # ['Monday', 'Wednesday']
    preferred_times = models.JSONField(default=list)  # ['morning', 'afternoon']
    preferred_providers = models.ManyToManyField('accounts.User', blank=True)

    # Behavioral data
    # average_no_show_rate is stored as a percentage (0-100), not a fraction
    average_no_show_rate = models.DecimalField(max_digits=5, decimal_places=2, default=0)
    average_late_arrival_minutes = models.IntegerField(default=0)
    total_appointments = models.IntegerField(default=0)
    completed_appointments = models.IntegerField(default=0)

    # Geographic data
    home_address = models.TextField(blank=True)
    travel_time_to_clinic = models.DurationField(null=True, blank=True)

    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)


class AppointmentPriorityRule(models.Model):
    """Define rules for appointment prioritization"""
    tenant = models.ForeignKey('core.Tenant', on_delete=models.CASCADE)
    name = models.CharField(max_length=100)
    description = models.TextField()

    # Rule conditions
    appointment_types = models.ManyToManyField('AppointmentTemplate', blank=True)
    specialties = models.JSONField(default=list)
    diagnosis_codes = models.JSONField(default=list)  # ICD-10 codes

    # Priority scoring
    base_priority_score = models.IntegerField(default=0)
    urgency_multiplier = models.DecimalField(max_digits=3, decimal_places=2, default=1.0)

    # Time constraints
    max_wait_days = models.IntegerField(null=True, blank=True)
    requires_same_day = models.BooleanField(default=False)

    is_active = models.BooleanField(default=True)
    created_at = models.DateTimeField(auto_now_add=True)


class SchedulingMetrics(models.Model):
    """Track scheduling performance metrics"""
    tenant = models.ForeignKey('core.Tenant', on_delete=models.CASCADE)
    provider = models.ForeignKey('accounts.User', on_delete=models.CASCADE)
    date = models.DateField()

    # Utilization metrics
    total_slots = models.IntegerField(default=0)
    booked_slots = models.IntegerField(default=0)
    completed_appointments = models.IntegerField(default=0)
    no_shows = models.IntegerField(default=0)
    cancellations = models.IntegerField(default=0)

    # Time metrics
    average_appointment_duration = models.DurationField(null=True)
    average_wait_time = models.DurationField(null=True)
    total_overtime_minutes = models.IntegerField(default=0)

    # Calculated fields (percentages, 0-100)
    utilization_rate = models.DecimalField(max_digits=5, decimal_places=2, default=0)
    no_show_rate = models.DecimalField(max_digits=5, decimal_places=2, default=0)

    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        unique_together = ['tenant', 'provider', 'date']
```
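
The calculated fields on `SchedulingMetrics` could be derived from the raw counters roughly as follows. This is a minimal sketch in plain Python, independent of Django; the helper name `percentage` and the choice to return zero when the denominator is zero are assumptions for illustration, not part of the plan.

```python
from decimal import Decimal, ROUND_HALF_UP


def percentage(part: int, whole: int) -> Decimal:
    """Return part/whole as a percentage rounded to two decimal places.

    Hypothetical helper: returns 0.00 when `whole` is zero so that days
    without any slots do not raise a division error.
    """
    if whole <= 0:
        return Decimal("0.00")
    value = Decimal(part) * Decimal(100) / Decimal(whole)
    # Two decimal places matches DecimalField(decimal_places=2).
    return value.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


# Example counters for one provider on one day (made-up numbers).
total_slots, booked_slots, no_shows = 16, 12, 3

utilization_rate = percentage(booked_slots, total_slots)
no_show_rate = percentage(no_shows, booked_slots)
```

Using `Decimal` end to end avoids float rounding surprises when the result is written to a `DecimalField`.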

### 10.2 Smart Scheduler Implementation

#### Core Scheduling Logic

```python
# appointments/scheduling/smart_scheduler.py

from datetime import datetime, timedelta
from typing import List, Dict, Optional

from django.db.models import Q, Avg, Count
from django.utils import timezone


class SmartScheduler:
    """
    Intelligent appointment scheduling engine with multi-factor optimization.
    """

    def __init__(self, tenant):
        self.tenant = tenant
        # Relative weight of each scoring factor; the weights sum to 1.0.
        self.weights = {
            'provider_availability': 0.30,
            'patient_priority': 0.25,
            'no_show_risk': 0.20,
            'geographic_proximity': 0.15,
            'patient_preference': 0.10
        }

    def find_optimal_slots(
        self,
        patient,
        provider,
        appointment_type,
        preferred_dates: List[datetime],
        duration_minutes: int = 30,
        max_results: int = 10
    ) -> List[Dict]:
        """
        Find optimal appointment slots using multi-factor analysis.

        Returns:
            List of slot dictionaries with scores and metadata
        """
        # Step 1: Get base available slots
        base_slots = self._get_available_slots(
            provider, preferred_dates, duration_minutes
        )

        if not base_slots:
            return []

        # Step 2: Score each slot
        scored_slots = []
        for slot in base_slots:
            score = self._calculate_slot_score(
                slot, patient, provider, appointment_type
            )
            scored_slots.append({
                'datetime': slot['start'],
                'end_datetime': slot['end'],
                'provider': provider,
                'score': score,
                'factors': score['breakdown']
            })

        # Step 3: Sort by score and return top results
        scored_slots.sort(key=lambda x: x['score']['total'], reverse=True)
        return scored_slots[:max_results]

    def _get_available_slots(
        self,
        provider,
        date_range: List[datetime],
        duration_minutes: int
    ) -> List[Dict]:
        """Get all available time slots for provider."""
        from appointments.models import SlotAvailability, AppointmentRequest

        slots = []
        duration = timedelta(minutes=duration_minutes)

        for date in date_range:
            # Get provider's availability for this date
            availabilities = SlotAvailability.objects.filter(
                tenant=self.tenant,
                provider=provider,
                date=date.date(),
                is_active=True,
                is_blocked=False
            )

            for availability in availabilities:
                # Calculate time slots within availability window
                current_time = datetime.combine(
                    availability.date,
                    availability.start_time
                )
                end_time = datetime.combine(
                    availability.date,
                    availability.end_time
                )

                while current_time + duration <= end_time:
                    # Check if slot is already booked
                    if not self._is_slot_booked(provider, current_time, duration):
                        slots.append({
                            'start': current_time,
                            'end': current_time + duration,
                            'availability_id': availability.id
                        })

                    current_time += timedelta(minutes=availability.duration_minutes or 30)

        return slots

    def _is_slot_booked(
        self,
        provider,
        start_time: datetime,
        duration: timedelta
    ) -> bool:
        """Check if a time slot is already booked."""
        from appointments.models import AppointmentRequest

        end_time = start_time + duration

        overlapping = AppointmentRequest.objects.filter(
            tenant=self.tenant,
            provider=provider,
            scheduled_datetime__lt=end_time,
            scheduled_end_datetime__gt=start_time,
            status__in=['PENDING', 'CONFIRMED', 'CHECKED_IN', 'IN_PROGRESS']
        ).exists()

        return overlapping

    def _calculate_slot_score(
        self,
        slot: Dict,
        patient,
        provider,
        appointment_type
    ) -> Dict:
        """Calculate multi-factor score for a slot."""

        scores = {}

        # Factor 1: Provider availability (buffer time consideration)
        scores['provider_availability'] = self._score_provider_availability(
            provider, slot['start']
        )

        # Factor 2: Patient priority
        scores['patient_priority'] = self._score_patient_priority(
            patient, appointment_type
        )

        # Factor 3: No-show risk mitigation
        scores['no_show_risk'] = self._score_no_show_risk(
            patient, slot['start']
        )

        # Factor 4: Geographic proximity
        scores['geographic_proximity'] = self._score_geographic_proximity(
            patient, slot['start']
        )

        # Factor 5: Patient preference alignment
        scores['patient_preference'] = self._score_patient_preference(
            patient, provider, slot['start']
        )

        # Calculate weighted total
        total_score = sum(
            scores[factor] * self.weights[factor]
            for factor in scores
        )

        return {
            'total': round(total_score, 2),
            'breakdown': scores
        }

    def _score_provider_availability(
        self,
        provider,
        slot_time: datetime
    ) -> float:
        """Score based on provider's schedule density."""
        # Imported locally, as in the other methods; without this the
        # reference to AppointmentRequest below raises NameError.
        from appointments.models import AppointmentRequest

        # Check appointments before and after this slot
        buffer_time = timedelta(minutes=15)

        appointments_nearby = AppointmentRequest.objects.filter(
            tenant=self.tenant,
            provider=provider,
            scheduled_datetime__gte=slot_time - buffer_time,
            scheduled_datetime__lte=slot_time + buffer_time,
            status__in=['CONFIRMED', 'CHECKED_IN', 'IN_PROGRESS']
        ).count()

        # Prefer slots with some buffer time
        if appointments_nearby == 0:
            return 100.0  # Ideal - plenty of buffer
        elif appointments_nearby == 1:
            return 75.0  # Good - some buffer
        else:
            return 50.0  # Acceptable - tight schedule

    def _score_patient_priority(
        self,
        patient,
        appointment_type
    ) -> float:
        """Score based on clinical priority."""
        from appointments.models import AppointmentPriorityRule

        # Check if there are priority rules for this appointment type
        rules = AppointmentPriorityRule.objects.filter(
            tenant=self.tenant,
            is_active=True,
            appointment_types=appointment_type
        ).first()

        if rules:
            return min(rules.base_priority_score, 100.0)

        return 50.0  # Default priority

    def _score_no_show_risk(
        self,
        patient,
        slot_time: datetime
    ) -> float:
        """Score based on predicted no-show probability."""
        from appointments.models import SchedulingPreference

        try:
            prefs = SchedulingPreference.objects.get(
                tenant=self.tenant,
                patient=patient
            )

            # Lower score for higher no-show risk.
            # average_no_show_rate is already a percentage (0-100), so it is
            # subtracted directly rather than multiplied by 100 again.
            no_show_rate = float(prefs.average_no_show_rate)
            return max(0.0, 100.0 - no_show_rate)

        except SchedulingPreference.DoesNotExist:
            return 75.0  # Neutral score for new patients

    def _score_geographic_proximity(
        self,
        patient,
        slot_time: datetime
    ) -> float:
        """Score based on travel time and time of day."""
        from appointments.models import SchedulingPreference

        try:
            prefs = SchedulingPreference.objects.get(
                tenant=self.tenant,
                patient=patient
            )

            if not prefs.travel_time_to_clinic:
                return 50.0

            # Prefer slots that allow comfortable travel time
            hour = slot_time.hour
            travel_minutes = prefs.travel_time_to_clinic.total_seconds() / 60

            # Morning rush hour (7-9 AM): slots starting 07:00-08:59
            if 7 <= hour < 9:
                penalty = travel_minutes * 0.5
            # Evening rush hour (4-6 PM): slots starting 16:00-17:59
            elif 16 <= hour < 18:
                penalty = travel_minutes * 0.5
            else:
                penalty = 0

            return max(0, 100 - penalty)

        except SchedulingPreference.DoesNotExist:
            return 50.0

    def _score_patient_preference(
        self,
        patient,
        provider,
        slot_time: datetime
    ) -> float:
        """Score based on patient's historical preferences."""
        from appointments.models import SchedulingPreference

        try:
            prefs = SchedulingPreference.objects.get(
                tenant=self.tenant,
                patient=patient
            )

            score = 50.0  # Base score

            # Check day preference
            day_name = slot_time.strftime('%A')
            if day_name in prefs.preferred_days:
                score += 25.0

            # Check time preference
            hour = slot_time.hour
            if hour < 12 and 'morning' in prefs.preferred_times:
                score += 15.0
            elif 12 <= hour < 17 and 'afternoon' in prefs.preferred_times:
                score += 15.0
            elif hour >= 17 and 'evening' in prefs.preferred_times:
                score += 15.0

            # Check provider preference
            if provider in prefs.preferred_providers.all():
                score += 10.0

            return min(score, 100.0)

        except SchedulingPreference.DoesNotExist:
            return 50.0

    def apply_priority_routing(
        self,
        slots: List[Dict],
        patient,
        appointment_type
    ) -> List[Dict]:
        """Route high-priority patients to earliest available slots."""
        from appointments.models import AppointmentPriorityRule

        # Check if this is a high-priority appointment
        rules = AppointmentPriorityRule.objects.filter(
            tenant=self.tenant,
            is_active=True,
            appointment_types=appointment_type,
            base_priority_score__gte=80  # High priority threshold
        ).first()

        if rules and rules.requires_same_day:
            # Filter to same-day slots only
            today = timezone.now().date()
            slots = [s for s in slots if s['datetime'].date() == today]

        if rules:
            # Sort by earliest time for high-priority
            slots.sort(key=lambda x: x['datetime'])

        return slots

    def predict_no_show_probability(self, patient, slot_time: datetime) -> float:
        """
        Predict probability of no-show using historical data.

        Returns:
            Float between 0 and 1 representing no-show probability
        """
        from appointments.models import SchedulingPreference

        try:
            prefs = SchedulingPreference.objects.get(
                tenant=self.tenant,
                patient=patient
            )

            # Base probability from historical rate (stored as a percentage)
            base_prob = float(prefs.average_no_show_rate) / 100

            # Adjust based on time factors
            hour = slot_time.hour
            day_of_week = slot_time.weekday()

            # Early morning appointments have higher no-show rates
            if hour < 8:
                base_prob *= 1.3

            # Monday appointments have higher no-show rates
            if day_of_week == 0:
                base_prob *= 1.2

            # Friday afternoon appointments have higher no-show rates
            if day_of_week == 4 and hour >= 14:
                base_prob *= 1.15

            return min(base_prob, 1.0)

        except SchedulingPreference.DoesNotExist:
            return 0.15  # Default 15% no-show rate for new patients
```
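
The weighted total computed in `_calculate_slot_score` can be checked by hand with a small standalone sketch. The weights below are copied from `SmartScheduler.__init__`; the function name `weighted_total` and the example factor scores are assumptions made up for illustration.

```python
from typing import Dict

# Weights copied from SmartScheduler.__init__; they sum to 1.0.
WEIGHTS: Dict[str, float] = {
    "provider_availability": 0.30,
    "patient_priority": 0.25,
    "no_show_risk": 0.20,
    "geographic_proximity": 0.15,
    "patient_preference": 0.10,
}


def weighted_total(scores: Dict[str, float], weights: Dict[str, float] = WEIGHTS) -> float:
    """Combine per-factor scores (each 0-100) into one weighted total (0-100)."""
    missing = set(weights) - set(scores)
    if missing:
        # Fail loudly instead of silently scoring a missing factor as zero.
        raise ValueError(f"missing factor scores: {sorted(missing)}")
    return round(sum(scores[name] * weight for name, weight in weights.items()), 2)


# Hypothetical factor scores for a single slot.
example_scores = {
    "provider_availability": 100.0,
    "patient_priority": 50.0,
    "no_show_risk": 90.0,
    "geographic_proximity": 50.0,
    "patient_preference": 75.0,
}
total = weighted_total(example_scores)  # 30 + 12.5 + 18 + 7.5 + 7.5
```

Because the weights sum to 1.0 and every factor is on a 0-100 scale, the total stays on the same 0-100 scale, which keeps slot scores comparable across patients.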

### 10.3 Scheduling Utilities

```python
# appointments/scheduling/utils.py

from datetime import datetime, timedelta
from typing import List, Dict

from django.db.models import Avg, Count, Q
from django.utils import timezone


class SchedulingAnalytics:
    """Analytics and reporting for scheduling performance."""

    @staticmethod
    def calculate_provider_utilization(provider, date_range_start, date_range_end):
        """Calculate provider utilization rate."""
        from appointments.models import SlotAvailability, AppointmentRequest

        # Total available slots
        total_slots = SlotAvailability.objects.filter(
            provider=provider,
            date__gte=date_range_start,
            date__lte=date_range_end,
            is_active=True
        ).aggregate(
            total=Count('id')
        )['total'] or 0

        # Booked appointments
        booked = AppointmentRequest.objects.filter(
            provider=provider,
            scheduled_datetime__date__gte=date_range_start,
            scheduled_datetime__date__lte=date_range_end,
            status__in=['CONFIRMED', 'COMPLETED', 'IN_PROGRESS']
        ).count()

        if total_slots == 0:
            return 0.0

        return round((booked / total_slots) * 100, 2)

    @staticmethod
    def calculate_no_show_rate(provider, date_range_start, date_range_end):
        """Calculate no-show rate for provider."""
        from appointments.models import AppointmentRequest

        total = AppointmentRequest.objects.filter(
            provider=provider,
            scheduled_datetime__date__gte=date_range_start,
            scheduled_datetime__date__lte=date_range_end
        ).count()

        no_shows = AppointmentRequest.objects.filter(
            provider=provider,
            scheduled_datetime__date__gte=date_range_start,
            scheduled_datetime__date__lte=date_range_end,
            status='NO_SHOW'
        ).count()

        if total == 0:
            return 0.0

        return round((no_shows / total) * 100, 2)

    @staticmethod
    def update_patient_scheduling_preferences(patient):
        """Update patient preferences based on historical data."""
        from appointments.models import AppointmentRequest, SchedulingPreference

        # Get patient's appointment history
        appointments = AppointmentRequest.objects.filter(
            patient=patient
        ).order_by('-scheduled_datetime')

        if not appointments.exists():
            return None

        # Calculate metrics
        total = appointments.count()
        completed = appointments.filter(status='COMPLETED').count()
        no_shows = appointments.filter(status='NO_SHOW').count()

        # Extract patterns
        preferred_days = list(set([
            appt.scheduled_datetime.strftime('%A')
            for appt in appointments.filter(status='COMPLETED')[:20]
        ]))

        preferred_times = []
        for appt in appointments.filter(status='COMPLETED')[:20]:
            hour = appt.scheduled_datetime.hour
            if hour < 12:
                preferred_times.append('morning')
            elif hour < 17:
                preferred_times.append('afternoon')
            else:
                preferred_times.append('evening')

        preferred_times = list(set(preferred_times))

        # Update or create preferences
        prefs, created = SchedulingPreference.objects.update_or_create(
            tenant=patient.tenant,
            patient=patient,
            defaults={
                'preferred_days': preferred_days,
                'preferred_times': preferred_times,
                'total_appointments': total,
                'completed_appointments': completed,
                'average_no_show_rate': round((no_shows / total) * 100, 2) if total > 0 else 0
            }
        )

        return prefs


class ConflictDetector:
    """Detect and resolve scheduling conflicts."""

    @staticmethod
    def check_conflicts(provider, start_time, end_time, exclude_appointment_id=None):
        """Check for scheduling conflicts."""
        from appointments.models import AppointmentRequest

        conflicts = AppointmentRequest.objects.filter(
            provider=provider,
            scheduled_datetime__lt=end_time,
            scheduled_end_datetime__gt=start_time,
            status__in=['PENDING', 'CONFIRMED', 'CHECKED_IN', 'IN_PROGRESS']
        )

        if exclude_appointment_id:
            conflicts = conflicts.exclude(id=exclude_appointment_id)

        return list(conflicts)

    @staticmethod
    def suggest_alternative_slots(provider, original_time, duration_minutes=30, count=5):
        """Suggest alternative slots when conflict exists."""
        from appointments.scheduling.smart_scheduler import SmartScheduler

        scheduler = SmartScheduler(provider.tenant)

        # Generate date range (next 7 days)
        date_range = [
            original_time + timedelta(days=i)
            for i in range(7)
        ]

        # Find available slots
        slots = scheduler._get_available_slots(
            provider, date_range, duration_minutes
        )

        # Return closest alternatives
        slots.sort(key=lambda x: abs((x['start'] - original_time).total_seconds()))
        return slots[:count]
```
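
The filter used by `check_conflicts` and `_is_slot_booked` (`scheduled_datetime__lt=end_time` together with `scheduled_end_datetime__gt=start_time`) is the standard overlap test for half-open intervals. The standalone sketch below shows the same predicate on plain `datetime` values; the function name `overlaps` and the sample times are illustrative assumptions.

```python
from datetime import datetime, timedelta


def overlaps(start_a: datetime, end_a: datetime,
             start_b: datetime, end_b: datetime) -> bool:
    """True when half-open intervals [start_a, end_a) and [start_b, end_b) intersect."""
    return start_a < end_b and end_a > start_b


# An existing 30-minute appointment at 10:00.
existing_start = datetime(2025, 1, 15, 10, 0)
existing_end = existing_start + timedelta(minutes=30)

# A request starting exactly when the existing appointment ends.
back_to_back = overlaps(
    existing_end, existing_end + timedelta(minutes=30),
    existing_start, existing_end,
)

# A request from 10:15 to 10:45, which cuts into the existing appointment.
partial = overlaps(
    datetime(2025, 1, 15, 10, 15), datetime(2025, 1, 15, 10, 45),
    existing_start, existing_end,
)
```

Using strict comparisons means back-to-back appointments are not reported as conflicts, while any shared minute is.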

### 10.4 API Endpoints

```python
# appointments/api/scheduling_views.py

from datetime import datetime, timedelta

from django.utils import timezone
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response

from appointments.scheduling.smart_scheduler import SmartScheduler
from appointments.scheduling.utils import SchedulingAnalytics, ConflictDetector


class SmartSchedulingViewSet(viewsets.ViewSet):
    """API endpoints for smart scheduling features."""

    # url_path makes the route match the hyphenated URL documented below;
    # without it DRF would use the method name (find_optimal_slots).
    @action(detail=False, methods=['post'], url_path='find-optimal-slots')
    def find_optimal_slots(self, request):
        """
        Find optimal appointment slots using multi-factor scoring.

        POST /api/appointments/scheduling/find-optimal-slots/
        {
            "patient_id": 123,
            "provider_id": 456,
            "appointment_type_id": 789,
            "preferred_dates": ["2025-01-15", "2025-01-16"],
            "duration_minutes": 30
        }
        """
        # Extract parameters
        patient_id = request.data.get('patient_id')
        provider_id = request.data.get('provider_id')
        appointment_type_id = request.data.get('appointment_type_id')
        preferred_dates = request.data.get('preferred_dates', [])
        duration_minutes = request.data.get('duration_minutes', 30)

        # Validate inputs
        if not all([patient_id, provider_id, appointment_type_id]):
            return Response(
                {'error': 'Missing required parameters'},
                status=status.HTTP_400_BAD_REQUEST
            )

        # Get objects
        from patients.models import PatientProfile
        from accounts.models import User
        from appointments.models import AppointmentTemplate

        try:
            patient = PatientProfile.objects.get(id=patient_id)
            provider = User.objects.get(id=provider_id)
            appointment_type = AppointmentTemplate.objects.get(id=appointment_type_id)
        except (
            PatientProfile.DoesNotExist,
            User.DoesNotExist,
            AppointmentTemplate.DoesNotExist,
        ) as e:
            # Only "not found" maps to 404; other errors should surface as 500s.
            return Response(
                {'error': str(e)},
                status=status.HTTP_404_NOT_FOUND
            )

        # Convert date strings to datetime objects
        try:
            date_objects = [
                datetime.strptime(date_str, '%Y-%m-%d')
                for date_str in preferred_dates
            ]
        except (TypeError, ValueError):
            return Response(
                {'error': 'preferred_dates must use the YYYY-MM-DD format'},
                status=status.HTTP_400_BAD_REQUEST
            )

        # Find optimal slots
        scheduler = SmartScheduler(request.user.tenant)
        slots = scheduler.find_optimal_slots(
            patient=patient,
            provider=provider,
            appointment_type=appointment_type,
            preferred_dates=date_objects,
            duration_minutes=duration_minutes
        )

        # Format response
        return Response({
            'slots': [
                {
                    'datetime': slot['datetime'].isoformat(),
                    'end_datetime': slot['end_datetime'].isoformat(),
                    'provider_id': slot['provider'].id,
                    'provider_name': slot['provider'].get_full_name(),
                    'score': slot['score']['total'],
                    'score_breakdown': slot['score']['breakdown']
                }
                for slot in slots
            ],
            'count': len(slots)
        })

    @action(detail=False, methods=['post'], url_path='check-conflicts')
    def check_conflicts(self, request):
        """
        Check for scheduling conflicts.

        POST /api/appointments/scheduling/check-conflicts/
        {
            "provider_id": 456,
            "start_time": "2025-01-15T10:00:00",
            "end_time": "2025-01-15T10:30:00",
            "exclude_appointment_id": 123
        }
        """
        provider_id = request.data.get('provider_id')
        start_time = request.data.get('start_time')
        end_time = request.data.get('end_time')
        exclude_id = request.data.get('exclude_appointment_id')

        # Get provider
        from accounts.models import User
        try:
            provider = User.objects.get(id=provider_id)
        except User.DoesNotExist:
            return Response(
                {'error': 'Provider not found'},
                status=status.HTTP_404_NOT_FOUND
            )

        # Parse times; missing or malformed values return 400 instead of a 500
        try:
            start_dt = datetime.fromisoformat(start_time)
            end_dt = datetime.fromisoformat(end_time)
        except (TypeError, ValueError):
            return Response(
                {'error': 'start_time and end_time must be ISO 8601 datetimes'},
                status=status.HTTP_400_BAD_REQUEST
            )

        # Check conflicts
        conflicts = ConflictDetector.check_conflicts(
            provider, start_dt, end_dt, exclude_id
        )

        if conflicts:
            # Suggest alternatives
            alternatives = ConflictDetector.suggest_alternative_slots(
                provider, start_dt,
                duration_minutes=int((end_dt - start_dt).total_seconds() / 60)
            )

            return Response({
                'has_conflict': True,
                'conflicts': [
                    {
                        'id': c.id,
                        'patient': c.patient.get_full_name(),
                        'start': c.scheduled_datetime.isoformat(),
                        'end': c.scheduled_end_datetime.isoformat()
                    }
                    for c in conflicts
                ],
                'alternative_slots': [
                    {
                        'start': alt['start'].isoformat(),
                        'end': alt['end'].isoformat()
                    }
                    for alt in alternatives
                ]
            })

        return Response({'has_conflict': False})

    @action(detail=False, methods=['get'], url_path='provider-analytics')
    def provider_analytics(self, request):
        """
        Get scheduling analytics for a provider.

        GET /api/appointments/scheduling/provider-analytics/?provider_id=456&days=30
        """
        provider_id = request.query_params.get('provider_id')

        # A non-numeric "days" value returns 400 instead of raising ValueError
        try:
            days = int(request.query_params.get('days', 30))
        except ValueError:
            return Response(
                {'error': 'days must be an integer'},
                status=status.HTTP_400_BAD_REQUEST
            )

        from accounts.models import User
        try:
            provider = User.objects.get(id=provider_id)
        except User.DoesNotExist:
            return Response(
                {'error': 'Provider not found'},
                status=status.HTTP_404_NOT_FOUND
            )

        # Calculate date range
        end_date = timezone.now().date()
        start_date = end_date - timedelta(days=days)

        # Get analytics
        utilization = SchedulingAnalytics.calculate_provider_utilization(
            provider, start_date, end_date
        )
        no_show_rate = SchedulingAnalytics.calculate_no_show_rate(
            provider, start_date, end_date
        )

        return Response({
            'provider_id': provider.id,
            'provider_name': provider.get_full_name(),
            'period': {
                'start': start_date.isoformat(),
                'end': end_date.isoformat(),
                'days': days
            },
            'metrics': {
                'utilization_rate': utilization,
                'no_show_rate': no_show_rate
            }
        })
```
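
Request parsing for these endpoints can be factored into small pure functions that are easy to unit-test without Django. The sketch below is one possible shape; the helper names `parse_preferred_dates` and `parse_days` and the 365-day upper bound are assumptions, not part of the plan.

```python
from datetime import datetime
from typing import Iterable, List, Optional


def parse_preferred_dates(values: Iterable[str]) -> List[datetime]:
    """Parse 'YYYY-MM-DD' strings, raising ValueError that names the bad value."""
    parsed = []
    for value in values:
        try:
            parsed.append(datetime.strptime(value, "%Y-%m-%d"))
        except (TypeError, ValueError):
            raise ValueError(f"invalid date: {value!r}") from None
    return parsed


def parse_days(raw: Optional[str], default: int = 30, maximum: int = 365) -> int:
    """Parse the 'days' query parameter and clamp it to the range 1..maximum."""
    if raw is None:
        return default
    days = int(raw)  # raises ValueError for non-numeric input
    return max(1, min(days, maximum))
```

A view could call these helpers inside a single `try`/`except ValueError` block and return HTTP 400 with the error message, keeping the endpoint bodies short.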

### 10.5 Implementation Checklist

- [x] **Step 1: Database Changes (models.py)** ✅ COMPLETE
  - [x] Create SchedulingPreference model
  - [x] Create AppointmentPriorityRule model
  - [x] Create SchedulingMetrics model
  - [x] Run migrations
  - [x] Add indexes for performance

- [x] **Step 2: Core Logic (scheduling/)** ✅ COMPLETE
  - [x] Implement SmartScheduler class
  - [x] Implement multi-factor scoring algorithm
  - [x] Implement priority routing logic
  - [x] Implement no-show prediction
  - [x] Implement geographic optimization

- [x] **Step 3: Utilities (utils.py)** ✅ COMPLETE
  - [x] Implement SchedulingAnalytics class
  - [x] Implement ConflictDetector class
  - [x] Create preference update utilities
  - [x] Existing utils already contain required functionality

- [x] **Step 4: API Endpoints (api/)** ✅ COMPLETE
  - [x] Create SmartSchedulingViewSet
  - [x] Add find_optimal_slots endpoint
  - [x] Add check_conflicts endpoint
  - [x] Add provider_analytics endpoint
  - [x] Add update_patient_preferences endpoint

- [x] **Step 5: Admin Interface (admin.py)** ✅ COMPLETE
  - [x] Register SchedulingPreference in admin
  - [x] Register AppointmentPriorityRule in admin
  - [x] Register SchedulingMetrics in admin

- [x] **Step 6: Views (views.py)** ✅ COMPLETE
  - [x] SmartSchedulingView - Main scheduling interface
  - [x] find_optimal_slots_view - HTMX slot finder
  - [x] scheduling_analytics_view - Analytics dashboard
  - [x] check_scheduling_conflicts_view - HTMX conflict checker
  - [x] update_patient_preferences_view - Manual preference update
  - [x] scheduling_metrics_dashboard - Metrics dashboard

- [x] **Step 7: URLs (urls.py)** ✅ COMPLETE
  - [x] /appointments/scheduling/ - Main interface
  - [x] /appointments/scheduling/find-optimal-slots/ - Find slots
  - [x] /appointments/scheduling/check-conflicts/ - Check conflicts
  - [x] /appointments/scheduling/analytics/ - Analytics
  - [x] /appointments/scheduling/metrics/ - Metrics dashboard
  - [x] /appointments/scheduling/patient/<id>/update-preferences/ - Update prefs

- [x] **Step 8: Templates** ✅ COMPLETE (8/8 templates)
  - [x] **Main Templates (4/4):**
    - [x] `smart_scheduling.html` - Main scheduling interface with search form
    - [x] `analytics.html` - Provider performance analytics dashboard
    - [x] `metrics_dashboard.html` - Detailed metrics with time-series data
    - [x] `conflicts.html` - Standalone conflict checking page
  - [x] **Partial Templates (4/4):**
    - [x] `partials/optimal_slots.html` - AI-ranked slot results with score breakdown
    - [x] `partials/conflicts.html` - Conflict detection with alternative suggestions
    - [x] `partials/provider_metrics.html` - Reusable provider metrics card
    - [x] `partials/slot_score_breakdown.html` - Detailed scoring visualization

- [ ] **Testing** (Future)
  - [ ] Unit tests for SmartScheduler
  - [ ] Unit tests for scoring algorithms
  - [ ] Integration tests for API endpoints
  - [ ] Performance tests for slot finding

- [ ] **Documentation** (Future)
  - [ ] API documentation
  - [ ] Algorithm documentation
  - [ ] User guide for scheduling features

### 10.6 Success Criteria

- ✅ Slot finding returns results in <2 seconds for 30-day range
- ✅ Multi-factor scoring produces measurable improvement in slot utilization
- ✅ No-show prediction accuracy >70%
- ✅ API endpoints handle 100+ concurrent requests
- ✅ Patient preferences auto-update after each appointment
- ✅ Conflict detection catches 100% of overlapping appointments
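
The prediction-accuracy criterion needs a concrete measurement. One possible approach, sketched below under stated assumptions, is to compare the output of `predict_no_show_probability` with the recorded outcome for past appointments. The 0.5 decision threshold, the function name `prediction_accuracy`, and the sample data are all hypothetical.

```python
from typing import Sequence


def prediction_accuracy(
    probabilities: Sequence[float],
    outcomes: Sequence[bool],
    threshold: float = 0.5,
) -> float:
    """Fraction of appointments where (probability >= threshold) matches the outcome."""
    if len(probabilities) != len(outcomes):
        raise ValueError("probabilities and outcomes must have the same length")
    if not probabilities:
        return 0.0
    correct = sum(
        (p >= threshold) == bool(o)
        for p, o in zip(probabilities, outcomes)
    )
    return correct / len(probabilities)


# Made-up predictions and whether each patient actually missed the appointment.
predicted = [0.10, 0.80, 0.30, 0.60, 0.20]
actual_no_show = [False, True, False, False, False]

accuracy = prediction_accuracy(predicted, actual_no_show)  # 4 of 5 correct
```

Plain accuracy can look good simply because most patients show up, so it may be worth tracking it alongside the share of actual no-shows that were flagged.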

---

## Phase 11: Advanced Queue Management

**Priority**: HIGH
**Estimated Time**: 12-15 hours
**Dependencies**: Phase 10 completed

### Overview

Enhance the queue management system with real-time calculations, dynamic positioning, WebSocket support, and advanced analytics.

### 11.1 Database Schema Enhancements

```python
# appointments/models.py additions

class QueueConfiguration(models.Model):
    """Advanced queue configuration and rules"""
    queue = models.OneToOneField('WaitingQueue', on_delete=models.CASCADE)

    # Dynamic positioning rules
    use_dynamic_positioning = models.BooleanField(default=True)
    priority_weight = models.DecimalField(max_digits=3, decimal_places=2, default=0.5)
    wait_time_weight = models.DecimalField(max_digits=3, decimal_places=2, default=0.3)
    appointment_time_weight = models.DecimalField(max_digits=3, decimal_places=2, default=0.2)

    # Capacity management
    enable_overflow_queue = models.BooleanField(default=False)
    overflow_threshold = models.IntegerField(default=10)

    # Wait time estimation
    use_historical_data = models.BooleanField(default=True)
    default_service_time_minutes = models.IntegerField(default=20)

    # Real-time updates
    enable_websocket_updates = models.BooleanField(default=True)
    update_interval_seconds = models.IntegerField(default=30)

    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)


class QueueMetrics(models.Model):
    """Track queue performance metrics"""
    queue = models.ForeignKey('WaitingQueue', on_delete=models.CASCADE)
    date = models.DateField()
    hour = models.IntegerField()  # 0-23

    # Volume metrics
    total_entries = models.IntegerField(default=0)
    completed_entries = models.IntegerField(default=0)
    no_shows = models.IntegerField(default=0)

    # Time metrics
    average_wait_time_minutes = models.DecimalField(max_digits=6, decimal_places=2, default=0)
    max_wait_time_minutes = models.IntegerField(default=0)
    average_service_time_minutes = models.DecimalField(max_digits=6, decimal_places=2, default=0)

    # Queue state
    peak_queue_size = models.IntegerField(default=0)
    average_queue_size = models.DecimalField(max_digits=6, decimal_places=2, default=0)

    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        unique_together = ['queue', 'date', 'hour']
        indexes = [
            models.Index(fields=['queue', 'date']),
            models.Index(fields=['date', 'hour']),
        ]
```
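As a rough illustration of how the three weight fields might be combined, the sketch below computes one weighted score from component values on a 0-100 scale. `PositioningWeights` and `weighted_score` are hypothetical names used only here, and the rule that the weights sum to 1.0 is an assumption suggested by the defaults (0.5 + 0.3 + 0.2), not something the plan states.

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass(frozen=True)
class PositioningWeights:
    """Hypothetical stand-in for the three weight fields on QueueConfiguration."""
    priority: Decimal = Decimal("0.50")
    wait_time: Decimal = Decimal("0.30")
    appointment_time: Decimal = Decimal("0.20")

    def total(self) -> Decimal:
        return self.priority + self.wait_time + self.appointment_time


def weighted_score(weights, priority, wait, appointment) -> float:
    """Combine three 0-100 component scores into one weighted score."""
    # Assumption: weights should sum to 1 so the result stays on a 0-100 scale.
    if weights.total() != Decimal("1"):
        raise ValueError(f"weights sum to {weights.total()}, expected 1")
    return float(
        weights.priority * Decimal(str(priority))
        + weights.wait_time * Decimal(str(wait))
        + weights.appointment_time * Decimal(str(appointment))
    )


# 0.5 * 80 + 0.3 * 40 + 0.2 * 10
print(weighted_score(PositioningWeights(), priority=80, wait=40, appointment=10))
```

Keeping the weights as `Decimal` matches the `DecimalField` storage and avoids rounding surprises when checking the sum; a similar check could live in the model's `clean()` method if the sum-to-one rule is adopted.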

### 11.2 Queue Engine Implementation

```python
# appointments/queue/queue_engine.py

import logging
from datetime import timedelta
from typing import Dict, Optional

from django.db import transaction
from django.db.models import Avg
from django.utils import timezone

logger = logging.getLogger(__name__)


class AdvancedQueueEngine:
    """
    Advanced queue management with dynamic positioning and real-time updates.
    """

    def __init__(self, queue):
        from appointments.models import QueueConfiguration

        self.queue = queue
        # Load the configuration, creating a default one if none exists
        self.config, _ = QueueConfiguration.objects.get_or_create(queue=queue)

    def add_to_queue(
        self,
        patient,
        appointment=None,
        priority_score: float = 0.0,
        notes: str = ""
    ):
        """
        Add patient to queue with intelligent positioning.
        """
        from appointments.models import QueueEntry

        # Create and reposition atomically so a failure cannot leave a half-updated queue
        with transaction.atomic():
            # Calculate optimal position
            position = self.calculate_optimal_position(patient, priority_score, appointment)

            # Estimate wait time
            wait_time = self.calculate_estimated_wait_time(position)

            # Create queue entry
            entry = QueueEntry.objects.create(
                queue=self.queue,
                patient=patient,
                appointment=appointment,
                queue_position=position,
                priority_score=priority_score,
                estimated_service_time=wait_time,
                notes=notes,
                status='WAITING'
            )

            # Reposition other entries if needed
            if self.config.use_dynamic_positioning:
                self.reposition_queue_entries()
                # Repositioning may have moved the new entry as well
                entry.refresh_from_db(fields=['queue_position'])

        # Broadcast update via WebSocket
        if self.config.enable_websocket_updates:
            self.broadcast_queue_update()

        # Update metrics
        self.update_queue_metrics()

        return entry

    def calculate_optimal_position(
        self,
        patient,
        priority_score: float,
        appointment=None
    ) -> int:
        """
        Calculate optimal queue position using weighted factors.
        """
        waiting = self.queue.queueentry_set.filter(status='WAITING')

        if not self.config.use_dynamic_positioning:
            # Simple FIFO - add to end
            return waiting.count() + 1

        # Get current waiting entries
        waiting_entries = list(waiting.order_by('queue_position'))

        if not waiting_entries:
            return 1

        # Calculate scores for each position
        # NOTE: only the appointment factor varies with position, so as written
        # this loop always selects position 1; the final order is decided by
        # reposition_queue_entries(), which add_to_queue() calls afterwards.
        best_position = len(waiting_entries) + 1
        best_score = float('-inf')

        for pos in range(1, len(waiting_entries) + 2):
            score = self._calculate_position_score(
                pos, priority_score, appointment, waiting_entries
            )
            if score > best_score:
                best_score = score
                best_position = pos

        return best_position

    def _calculate_position_score(
        self,
        position: int,
        priority_score: float,
        appointment,
        waiting_entries
    ) -> float:
        """Calculate score for a specific position."""

        # Factor 1: Priority score (higher is better)
        priority_factor = priority_score * float(self.config.priority_weight)

        # Factor 2: Appointment urgency (higher as the scheduled time approaches or passes)
        wait_time_factor = 0.0
        if appointment and appointment.scheduled_datetime:
            time_until_appt = (
                appointment.scheduled_datetime - timezone.now()
            ).total_seconds() / 60
            # Normalize to 0-100 scale
            wait_time_factor = min(100, max(0, 100 - time_until_appt))
            wait_time_factor *= float(self.config.wait_time_weight)

        # Factor 3: Appointment time proximity
        appt_time_factor = 0.0
        if appointment:
            # Prefer positions that minimize disruption
            appt_time_factor = (100 - position) * float(self.config.appointment_time_weight)

        return priority_factor + wait_time_factor + appt_time_factor

    def calculate_estimated_wait_time(self, position: int) -> timedelta:
        """
        Calculate estimated wait time based on position and historical data.
        """
        from appointments.models import QueueMetrics

        if position <= 0:
            return timedelta(0)

        # Get average service time
        if self.config.use_historical_data:
            # Use historical data from last 7 days
            week_ago = timezone.now().date() - timedelta(days=7)
            avg_service_time = QueueMetrics.objects.filter(
                queue=self.queue,
                date__gte=week_ago
            ).aggregate(
                avg=Avg('average_service_time_minutes')
            )['avg'] or self.config.default_service_time_minutes
        else:
            avg_service_time = self.config.default_service_time_minutes

        # Avg() over a DecimalField returns Decimal, which cannot be multiplied by a float
        avg_service_time = float(avg_service_time)

        # Calculate load factor
        current_size = self.queue.queueentry_set.filter(
            status='WAITING'
        ).count()

        load_factor = self.calculate_load_factor(current_size)

        # Estimate wait time
        base_wait = avg_service_time * (position - 1)
        adjusted_wait = base_wait * load_factor

        return timedelta(minutes=int(adjusted_wait))

    def calculate_load_factor(self, current_size: int) -> float:
        """
        Calculate load factor based on queue size.
        Higher load = longer wait times per patient.
        """
        capacity = self.queue.max_queue_size or 10
        utilization = current_size / capacity

        if utilization < 0.5:
            return 1.0  # Normal processing
        elif utilization < 0.75:
            return 1.2  # Slightly slower
        elif utilization < 0.9:
            return 1.5  # Significantly slower
        else:
            return 2.0  # Very slow, overloaded

    def reposition_queue_entries(self):
        """
        Dynamically reposition all waiting entries based on current factors.
        """
        waiting_entries = list(
            self.queue.queueentry_set.filter(
                status='WAITING'
            ).select_related('patient', 'appointment')
        )

        if not waiting_entries:
            return

        # Calculate scores for each entry
        scored_entries = []
        for entry in waiting_entries:
            score = self._calculate_entry_priority_score(entry)
            scored_entries.append((entry, score))

        # Sort by score (highest first)
        scored_entries.sort(key=lambda x: x[1], reverse=True)

        # Update positions
        for new_position, (entry, score) in enumerate(scored_entries, start=1):
            if entry.queue_position != new_position:
                entry.queue_position = new_position
                entry.save(update_fields=['queue_position'])

    def _calculate_entry_priority_score(self, entry) -> float:
        """Calculate priority score for repositioning."""

        # Base priority score (cast in case the field is stored as Decimal)
        score = float(entry.priority_score or 0.0)

        # Add wait time factor
        wait_minutes = (timezone.now() - entry.joined_at).total_seconds() / 60
        wait_factor = min(50, wait_minutes / 2)  # Cap at 50 points

        # Add appointment proximity factor
        if entry.appointment and entry.appointment.scheduled_datetime:
            time_until = (
                entry.appointment.scheduled_datetime - timezone.now()
            ).total_seconds() / 60
            if time_until < 30:  # Within 30 minutes
                proximity_factor = 30
            elif time_until < 60:  # Within 1 hour
                proximity_factor = 15
            else:
                proximity_factor = 0
        else:
            proximity_factor = 0

        return score + wait_factor + proximity_factor

    def get_next_patient(self) -> Optional['QueueEntry']:
        """
        Get the next patient to be served.
        """
        next_entry = self.queue.queueentry_set.filter(
            status='WAITING'
        ).order_by('queue_position').first()

        if next_entry:
            # Mark as called
            next_entry.mark_as_called()

            # Broadcast update
            if self.config.enable_websocket_updates:
                self.broadcast_queue_update()

        return next_entry

    def broadcast_queue_update(self):
        """
        Broadcast queue update via WebSocket.
        """
        # This will be implemented with Django Channels
        from channels.layers import get_channel_layer
        from asgiref.sync import async_to_sync

        try:
            channel_layer = get_channel_layer()
            if channel_layer:
                async_to_sync(channel_layer.group_send)(
                    f'queue_{self.queue.id}',
                    {
                        'type': 'queue_update',
                        'data': self.get_queue_status()
                    }
                )
        except Exception:
            # Log error but don't fail
            logger.exception("WebSocket broadcast error for queue %s", self.queue.id)

    def get_queue_status(self) -> Dict:
        """Get current queue status for broadcasting."""
        waiting = self.queue.queueentry_set.filter(status='WAITING')

        return {
            'queue_id': self.queue.id,
            'queue_name': self.queue.name,
            'current_size': waiting.count(),
            'max_size': self.queue.max_queue_size,
            'average_wait_time': str(self.queue.calculate_wait_time()),
            'is_accepting': self.queue.is_accepting_patients,
            'entries': [
                {
                    'id': entry.id,
                    'patient_name': entry.patient.get_full_name(),
                    'position': entry.queue_position,
                    'wait_time': str(entry.wait_time_minutes or 0),
                    'status': entry.status
                }
                for entry in waiting.order_by('queue_position')[:10]
            ],
            'timestamp': timezone.now().isoformat()
        }

    def update_queue_metrics(self):
        """Update queue metrics for analytics."""
        from appointments.models import QueueMetrics

        now = timezone.now()

        # Get or create metrics for current hour (model defaults zero the counters)
        metrics, created = QueueMetrics.objects.get_or_create(
            queue=self.queue,
            date=now.date(),
            hour=now.hour
        )

        # Update current metrics
        current_size = self.queue.queueentry_set.filter(
            status='WAITING'
        ).count()

        metrics.peak_queue_size = max(metrics.peak_queue_size, current_size)
        metrics.save(update_fields=['peak_queue_size'])
```
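The wait-time estimate above is simple arithmetic: average service time multiplied by the number of patients ahead, scaled by a load factor that steps up with queue utilisation. The framework-free sketch below reproduces that calculation with plain numbers; `load_factor` and `estimated_wait` are illustrative standalone functions, not part of the engine class.

```python
from datetime import timedelta


def load_factor(current_size: int, capacity: int = 10) -> float:
    """Step function using the same utilisation thresholds as the engine."""
    utilization = current_size / capacity
    if utilization < 0.5:
        return 1.0
    if utilization < 0.75:
        return 1.2
    if utilization < 0.9:
        return 1.5
    return 2.0


def estimated_wait(position: int, avg_service_minutes: float,
                   current_size: int, capacity: int = 10) -> timedelta:
    """Service time for everyone ahead, scaled by the load factor."""
    if position <= 1:
        return timedelta(0)  # nobody ahead, no wait
    base = avg_service_minutes * (position - 1)
    return timedelta(minutes=int(base * load_factor(current_size, capacity)))


# 4th in a queue of 8 (capacity 10) with a 20-minute average service time:
# 3 patients ahead * 20 min = 60 min; utilisation 0.8 gives factor 1.5, so 90 min
print(estimated_wait(position=4, avg_service_minutes=20, current_size=8))
```

Because the factor is a step function, an estimate can jump when the queue crosses a threshold (for example from 7 to 8 waiting patients at capacity 10), which is worth keeping in mind when displaying estimates to patients.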

### 11.3 WebSocket Consumer for Real-time Updates

```python
# appointments/consumers.py

import json
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async


class QueueConsumer(AsyncWebsocketConsumer):
    """
    WebSocket consumer for real-time queue updates.
    """

    async def connect(self):
        self.queue_id = self.scope['url_route']['kwargs']['queue_id']
        self.queue_group_name = f'queue_{self.queue_id}'

        # Join queue group
        await self.channel_layer.group_add(
            self.queue_group_name,
            self.channel_name
        )

        await self.accept()

        # Send initial queue status
        status = await self.get_queue_status()
        await self.send(text_data=json.dumps(status))

    async def disconnect(self, close_code):
        # Leave queue group
        await self.channel_layer.group_discard(
            self.queue_group_name,
            self.channel_name
        )

    async def receive(self, text_data=None, bytes_data=None):
        """Handle messages from WebSocket."""
        # Decode the incoming JSON (json.loads, not json.dumps) and ignore bad input
        try:
            data = json.loads(text_data or '')
        except json.JSONDecodeError:
            return
        if not isinstance(data, dict):
            return

        message_type = data.get('type')

        if message_type == 'get_status':
            status = await self.get_queue_status()
            await self.send(text_data=json.dumps(status))

    async def queue_update(self, event):
        """Handle queue update events from group."""
        await self.send(text_data=json.dumps(event['data']))

    @database_sync_to_async
    def get_queue_status(self):
        """Get current queue status."""
        from appointments.models import WaitingQueue
        from appointments.queue.queue_engine import AdvancedQueueEngine

        try:
            queue = WaitingQueue.objects.get(id=self.queue_id)
            engine = AdvancedQueueEngine(queue)
            return engine.get_queue_status()
        except WaitingQueue.DoesNotExist:
            return {'error': 'Queue not found'}
```
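Client messages arrive as JSON text, so the consumer has to decode them before it can read the `type` field, and it should tolerate malformed input rather than raise. The sketch below shows one defensive way to do that outside of Channels; `parse_message_type` is a hypothetical helper name, not an existing function in the app.

```python
import json
from typing import Optional


def parse_message_type(text_data: Optional[str]) -> Optional[str]:
    """Return the 'type' field of a JSON client message, or None if unusable."""
    if not text_data:
        return None
    try:
        payload = json.loads(text_data)
    except json.JSONDecodeError:
        return None
    # Valid JSON can still be a list, number, or string; only objects carry a type
    if not isinstance(payload, dict):
        return None
    message_type = payload.get("type")
    return message_type if isinstance(message_type, str) else None


print(parse_message_type('{"type": "get_status"}'))
print(parse_message_type("not json"))
```

A helper like this keeps the parsing logic unit-testable without opening a WebSocket connection, which fits the "Unit tests" item in the checklist.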

### 11.4 Implementation Checklist

- [ ] **Database Changes**
  - [ ] Create QueueConfiguration model
  - [ ] Create QueueMetrics model
  - [ ] Run migrations
  - [ ] Add database indexes

- [ ] **Queue Engine**
  - [ ] Implement AdvancedQueueEngine class
  - [ ] Implement dynamic positioning algorithm
  - [ ] Implement load factor calculations
  - [ ] Implement wait time estimation

- [ ] **WebSocket Support**
  - [ ] Install Django Channels
  - [ ] Create QueueConsumer
  - [ ] Configure routing
  - [ ] Set up Redis for channel layer

- [ ] **API Endpoints**
  - [ ] Create queue management endpoints
  - [ ] Add real-time status endpoint
  - [ ] Add metrics endpoint

- [ ] **Frontend Integration**
  - [ ] Create WebSocket client
  - [ ] Add real-time queue display
  - [ ] Add queue management UI

- [ ] **Testing**
  - [ ] Unit tests for queue engine
  - [ ] WebSocket connection tests
  - [ ] Load testing for concurrent updates

### 11.5 Success Criteria

- ✅ Dynamic positioning reduces average wait time by 20%
- ✅ WebSocket updates delivered in <1 second
- ✅ System handles 50+ concurrent queue updates
- ✅ Wait time estimates accurate within ±10 minutes
- ✅ Queue metrics updated in real-time
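
The ±10 minute accuracy criterion needs a concrete measurement to be checkable. One possible approach, sketched here under the assumption that estimated and actual wait times are both recorded in minutes, is to report the share of estimates that land within the tolerance. `share_within_tolerance` and the sample data are hypothetical.

```python
from typing import Iterable, Tuple


def share_within_tolerance(pairs: Iterable[Tuple[float, float]],
                           tolerance_minutes: float = 10.0) -> float:
    """Fraction of (estimated, actual) wait pairs that differ by <= tolerance."""
    pairs = list(pairs)
    if not pairs:
        return 0.0
    hits = sum(1 for est, act in pairs if abs(est - act) <= tolerance_minutes)
    return hits / len(pairs)


# Hypothetical sample of (estimated minutes, actual minutes);
# differences are 5, 15, 5 and 8, so three of the four are within 10 minutes
sample = [(20, 25), (40, 55), (15, 10), (60, 68)]
print(share_within_tolerance(sample))
```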

---

## Phase 12: Intelligent Waitlist Automation

**Priority**: HIGH
**Estimated Time**: 10-12 hours
**Dependencies**: Phases 10-11 completed

### Overview

Implement automated waitlist fulfillment with intelligent matching, slot cancellation tracking, and multi-channel notifications.

### 12.1 Database Schema Enhancements

```python
# appointments/models.py additions

class SlotCancellation(models.Model):
    """Track cancelled slots for waitlist fulfillment"""
    tenant = models.ForeignKey('core.Tenant', on_delete=models.CASCADE)
    provider = models.ForeignKey('accounts.User', on_delete=models.CASCADE)
    original_appointment = models.ForeignKey(
        'AppointmentRequest',
        on_delete=models.SET_NULL,
        null=True,
        related_name='cancellation_record'
    )

    # Slot details
    cancelled_slot_start = models.DateTimeField()
    cancelled_slot_end = models.DateTimeField()
    appointment_type = models.ForeignKey('AppointmentTemplate', on_delete=models.PROTECT)

    # Cancellation info
    cancellation_time = models.DateTimeField(auto_now_add=True)
    cancellation_reason = models.TextField(blank=True)

    # Fulfillment tracking
    filled_by_waitlist = models.BooleanField(default=False)
    filled_by = models.ForeignKey(
        'WaitingList',
        on_delete=models.SET_NULL,
        null=True,
        blank=True
    )
    filled_at = models.DateTimeField(null=True, blank=True)

    # Notification tracking
    notifications_sent = models.IntegerField(default=0)
    patients_notified = models.ManyToManyField('patients.PatientProfile', blank=True)

    is_active = models.BooleanField(default=True)
    expires_at = models.DateTimeField()  # Slot expires if not filled

    class Meta:
        indexes = [
            models.Index(fields=['provider', 'cancelled_slot_start']),
            models.Index(fields=['is_active', 'expires_at']),
        ]


# Add to existing WaitingList model
class WaitingList(models.Model):
    # ... existing fields ...

    # Add flexibility scoring
    flexibility_score = models.IntegerField(default=0)  # 0-100

    # Add notification preferences
    notification_channels = models.JSONField(default=list)  # ['sms', 'email', 'push']

    # Add response tracking
    last_opportunity_offered = models.DateTimeField(null=True, blank=True)
    opportunities_offered = models.IntegerField(default=0)
    opportunities_accepted = models.IntegerField(default=0)
    opportunities_declined = models.IntegerField(default=0)
```

### 12.2 Waitlist Engine Implementation

```python
# appointments/waitlist/waitlist_engine.py

import secrets
from datetime import timedelta
from typing import List

from django.db import transaction
from django.db.models import Q
from django.utils import timezone


class WaitlistEngine:
    """
    Intelligent waitlist management with automated fulfillment.
    """

    def __init__(self, tenant):
        self.tenant = tenant

    def process_cancellation(self, appointment):
        """
        Process appointment cancellation and attempt waitlist fulfillment.
        """
        from appointments.models import SlotCancellation

        # Create cancellation record
        cancellation = SlotCancellation.objects.create(
            tenant=self.tenant,
            provider=appointment.provider,
            original_appointment=appointment,
            cancelled_slot_start=appointment.scheduled_datetime,
            cancelled_slot_end=appointment.scheduled_end_datetime,
            appointment_type=appointment.appointment_type,
            cancellation_reason=appointment.cancellation_reason or '',
            expires_at=appointment.scheduled_datetime - timedelta(hours=2)
        )

        # Attempt to fill from waitlist
        self.attempt_waitlist_fulfillment(cancellation)

        return cancellation

    def attempt_waitlist_fulfillment(self, cancellation):
        """
        Attempt to fill cancelled slot from waitlist.
        """
        # Find matching waitlist entries
        matches = self.find_matching_waitlist_entries(cancellation)

        if not matches:
            return False

        # Notify matches in priority order
        for waitlist_entry in matches[:5]:  # Notify top 5 matches
            self.send_waitlist_opportunity(waitlist_entry, cancellation)
            cancellation.notifications_sent += 1
            cancellation.patients_notified.add(waitlist_entry.patient)

        cancellation.save()
        return True

    def find_matching_waitlist_entries(self, cancellation) -> List:
        """
        Find waitlist entries that match the cancelled slot.
        """
        from appointments.models import WaitingList

        # Base query - active waitlist entries
        queryset = WaitingList.objects.filter(
            tenant=self.tenant,
            status='ACTIVE',
            appointment_type=cancellation.appointment_type
        )

        # Filter by date range
        slot_date = cancellation.cancelled_slot_start.date()
        queryset = queryset.filter(
            Q(desired_date_range_start__lte=slot_date) &
            Q(desired_date_range_end__gte=slot_date)
        )

        # Filter by provider preference (if specified)
        queryset = queryset.filter(
            Q(preferred_provider=cancellation.provider) |
            Q(preferred_provider__isnull=True)
        )

        # Score and sort matches
        scored_matches = []
        for entry in queryset:
            score = self.calculate_match_score(entry, cancellation)
            scored_matches.append((entry, score))

        # Sort by score (highest first), then priority, then longest-waiting first
        scored_matches.sort(
            key=lambda x: (x[1], x[0].priority, -x[0].created_at.timestamp()),
            reverse=True
        )

        return [match[0] for match in scored_matches]

    def calculate_match_score(self, waitlist_entry, cancellation) -> float:
        """
        Calculate how well a waitlist entry matches a cancelled slot.
        """
        score = 0.0

        # Factor 1: Provider match (30 points)
        if waitlist_entry.preferred_provider == cancellation.provider:
            score += 30.0
        elif not waitlist_entry.preferred_provider:
            score += 15.0  # Neutral if no preference

        # Factor 2: Date preference (25 points)
        slot_date = cancellation.cancelled_slot_start.date()
        if waitlist_entry.preferred_date:
            days_diff = abs((slot_date - waitlist_entry.preferred_date).days)
            if days_diff == 0:
                score += 25.0
            elif days_diff <= 3:
                score += 15.0
            elif days_diff <= 7:
                score += 10.0
        else:
            score += 12.5  # Neutral if no preference

        # Factor 3: Time preference (20 points)
        slot_hour = cancellation.cancelled_slot_start.hour
        if waitlist_entry.preferred_time:
            pref_hour = waitlist_entry.preferred_time.hour
            hour_diff = abs(slot_hour - pref_hour)
            if hour_diff <= 1:
                score += 20.0
            elif hour_diff <= 2:
                score += 10.0
        else:
            score += 10.0  # Neutral if no preference

        # Factor 4: Flexibility (15 points)
        score += (waitlist_entry.flexibility_score / 100) * 15.0

        # Factor 5: Wait time (10 points)
        days_waiting = (timezone.now().date() - waitlist_entry.created_at.date()).days
        score += min(10.0, days_waiting / 3)  # Max 10 points for 30+ days

        return score

    def send_waitlist_opportunity(self, waitlist_entry, cancellation):
        """
        Send notification about available slot to waitlist patient.
        """
        from appointments.notifications import NotificationEngine

        # Update waitlist entry
        waitlist_entry.last_opportunity_offered = timezone.now()
        waitlist_entry.opportunities_offered += 1
        waitlist_entry.save()

        # Prepare notification context
        context = {
            'patient': waitlist_entry.patient,
            'provider': cancellation.provider,
            'slot_datetime': cancellation.cancelled_slot_start,
            'slot_end': cancellation.cancelled_slot_end,
            'appointment_type': cancellation.appointment_type,
            'expiration_time': cancellation.expires_at,
            'response_url': self.generate_response_url(waitlist_entry, cancellation)
        }

        # Send via preferred channels
        notifier = NotificationEngine(self.tenant)
        for channel in waitlist_entry.notification_channels:
            if channel == 'sms':
                notifier.send_sms_waitlist_opportunity(context)
            elif channel == 'email':
                notifier.send_email_waitlist_opportunity(context)
            elif channel == 'push':
                notifier.send_push_waitlist_opportunity(context)

    def generate_response_url(self, waitlist_entry, cancellation) -> str:
        """Generate unique URL for patient to respond to opportunity."""
        # Create an unguessable token; a hash of the IDs and a timestamp would be predictable
        token = secrets.token_urlsafe(24)

        # Store token (you'd need a ResponseToken model)
        # For now, return placeholder
        return f"/appointments/waitlist/respond/{token}/"

    def accept_waitlist_opportunity(self, waitlist_entry, cancellation):
        """
        Process patient acceptance of waitlist opportunity.

        Returns the new appointment, or None if the slot was already
        filled or has expired.
        """
        from appointments.models import AppointmentRequest, SlotCancellation

        with transaction.atomic():
            # Lock the cancellation row so two patients cannot claim the same slot
            cancellation = SlotCancellation.objects.select_for_update().get(
                pk=cancellation.pk
            )
            if not cancellation.is_active or cancellation.expires_at <= timezone.now():
                return None

            # Create new appointment
            appointment = AppointmentRequest.objects.create(
                tenant=self.tenant,
                patient=waitlist_entry.patient,
                provider=cancellation.provider,
                appointment_type=cancellation.appointment_type,
                scheduled_datetime=cancellation.cancelled_slot_start,
                scheduled_end_datetime=cancellation.cancelled_slot_end,
                status='CONFIRMED',
                priority=waitlist_entry.priority,
                is_waitlist_conversion=True
            )

            # Update cancellation record
            cancellation.filled_by_waitlist = True
            cancellation.filled_by = waitlist_entry
            cancellation.filled_at = timezone.now()
            cancellation.is_active = False
            cancellation.save()

            # Update waitlist entry
            waitlist_entry.status = 'SCHEDULED'
            waitlist_entry.scheduled_appointment = appointment
            waitlist_entry.opportunities_accepted += 1
            waitlist_entry.save()

        return appointment

    def calculate_flexibility_score(self, waitlist_entry) -> int:
        """
        Calculate flexibility score based on patient preferences.
        Score: 0 (inflexible) to 100 (very flexible)
        """
        score = 50  # Base score

        # More flexible if no provider preference
        if not waitlist_entry.preferred_provider:
            score += 20

        # More flexible if wide date range
        if waitlist_entry.desired_date_range_start and waitlist_entry.desired_date_range_end:
            days_range = (
                waitlist_entry.desired_date_range_end -
                waitlist_entry.desired_date_range_start
            ).days
            if days_range > 30:
                score += 20
            elif days_range > 14:
                score += 10

        # More flexible if no time preference
        if not waitlist_entry.preferred_time:
            score += 10

        return min(100, score)
```
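To make the five-factor match score easier to reason about, the sketch below restates it as a pure function over plain values and works one example through by hand. `WaitlistPrefs` is a hypothetical stand-in for a `WaitingList` row, with `wants_this_provider=None` meaning "no provider preference"; the point values are the ones used in the engine above.

```python
from dataclasses import dataclass
from datetime import date
from typing import Optional


@dataclass
class WaitlistPrefs:
    """Hypothetical plain-data stand-in for a WaitingList row."""
    wants_this_provider: Optional[bool]  # None means no provider preference
    preferred_date: Optional[date]
    preferred_hour: Optional[int]
    flexibility_score: int               # 0-100
    days_waiting: int


def match_score(p: WaitlistPrefs, slot_date: date, slot_hour: int) -> float:
    score = 0.0
    # Provider: 30 for a match, 15 when the patient has no preference
    if p.wants_this_provider is None:
        score += 15.0
    elif p.wants_this_provider:
        score += 30.0
    # Date: 25 same day, 15 within 3 days, 10 within a week, 12.5 if no preference
    if p.preferred_date is None:
        score += 12.5
    else:
        days = abs((slot_date - p.preferred_date).days)
        score += 25.0 if days == 0 else 15.0 if days <= 3 else 10.0 if days <= 7 else 0.0
    # Time: 20 within an hour, 10 within two hours or if no preference
    if p.preferred_hour is None:
        score += 10.0
    else:
        diff = abs(slot_hour - p.preferred_hour)
        score += 20.0 if diff <= 1 else 10.0 if diff <= 2 else 0.0
    # Flexibility contributes up to 15 points, time on the waitlist up to 10
    score += p.flexibility_score / 100 * 15.0
    score += min(10.0, p.days_waiting / 3)
    return score


patient = WaitlistPrefs(
    wants_this_provider=True,          # 30
    preferred_date=date(2025, 3, 10),  # slot is 2 days later: 15
    preferred_hour=9,                  # slot at 10:00: 20
    flexibility_score=50,              # 7.5
    days_waiting=12,                   # 4
)
print(match_score(patient, slot_date=date(2025, 3, 12), slot_hour=10))  # 76.5
```

Note that a patient with no stated preferences at all scores at most 62.5 (15 + 12.5 + 10 + 15 + 10), so explicit matches on provider, date, and time outrank fully flexible entries.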

### 12.3 Implementation Checklist

- [ ] **Database Changes**
  - [ ] Create SlotCancellation model
  - [ ] Add flexibility_score to WaitingList
  - [ ] Add notification_channels to WaitingList
  - [ ] Run migrations

- [ ] **Waitlist Engine**
  - [ ] Implement WaitlistEngine class
  - [ ] Implement matching algorithm
  - [ ] Implement scoring system
  - [ ] Implement notification system

- [ ] **Signal Integration**
  - [ ] Connect to appointment cancellation signal
  - [ ] Auto-trigger waitlist fulfillment
  - [ ] Update waitlist metrics

- [ ] **API Endpoints**
- [ ]
|