Marwan Alwali 263292f6be update
2025-11-04 00:50:06 +03:00

555 lines
19 KiB
Python

"""
Advanced Queue Engine for intelligent queue management.
This module provides the AdvancedQueueEngine class which implements:
- Dynamic queue positioning based on multiple factors
- Real-time wait time estimation with load factors
- Automated queue repositioning
- WebSocket broadcasting for real-time updates
- Comprehensive metrics tracking
"""
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Tuple
from decimal import Decimal
from django.db.models import Avg, Count, Q
from django.utils import timezone
from django.core.exceptions import ValidationError
class AdvancedQueueEngine:
"""
Intelligent queue management engine with dynamic positioning and real-time updates.
Features:
- Multi-factor queue positioning (priority, wait time, appointment proximity)
- Load-based wait time estimation
- Automatic queue repositioning
- Real-time WebSocket updates
- Comprehensive metrics tracking
"""
def __init__(self, queue):
"""
Initialize the queue engine.
Args:
queue: WaitingQueue instance
"""
self.queue = queue
# Get or create configuration
try:
self.config = queue.configuration
except Exception:
# Create default configuration if not exists
from appointments.models import QueueConfiguration
self.config = QueueConfiguration.objects.create(queue=queue)
def add_to_queue(
self,
patient,
appointment=None,
priority_score: float = 1.0,
notes: str = ""
) -> 'QueueEntry':
"""
Add patient to queue with intelligent positioning.
Args:
patient: PatientProfile instance
appointment: AppointmentRequest instance (optional)
priority_score: Priority score (0.0-10.0)
notes: Additional notes
Returns:
QueueEntry: Created queue entry
"""
from appointments.models import QueueEntry
# Calculate optimal position
position = self.calculate_optimal_position(patient, priority_score, appointment)
# Estimate wait time
wait_time = self.calculate_estimated_wait_time(position)
estimated_service_time = timezone.now() + wait_time
# Create queue entry
entry = QueueEntry.objects.create(
queue=self.queue,
patient=patient,
appointment=appointment,
queue_position=position,
priority_score=priority_score,
estimated_service_time=estimated_service_time,
notes=notes,
status='WAITING'
)
# Reposition other entries if needed
if self.config.use_dynamic_positioning:
self.reposition_queue_entries()
# Broadcast update via WebSocket
if self.config.enable_websocket_updates:
self.broadcast_queue_update()
# Update metrics
self.update_queue_metrics()
return entry
def calculate_optimal_position(
self,
patient,
priority_score: float,
appointment=None
) -> int:
"""
Calculate optimal queue position using weighted factors.
Args:
patient: PatientProfile instance
priority_score: Priority score
appointment: AppointmentRequest instance (optional)
Returns:
int: Optimal queue position
"""
from appointments.models import QueueEntry
if not self.config.use_dynamic_positioning:
# Simple FIFO - add to end
return self.queue.queue_entries.filter(status='WAITING').count() + 1
# Get current waiting entries
waiting_entries = list(
self.queue.queue_entries.filter(status='WAITING')
.select_related('patient', 'appointment')
.order_by('queue_position')
)
if not waiting_entries:
return 1
# Calculate scores for each position
best_position = len(waiting_entries) + 1
best_score = float('-inf')
for pos in range(1, len(waiting_entries) + 2):
score = self._calculate_position_score(
pos, priority_score, appointment, waiting_entries
)
if score > best_score:
best_score = score
best_position = pos
return best_position
def _calculate_position_score(
self,
position: int,
priority_score: float,
appointment,
waiting_entries: List
) -> float:
"""
Calculate score for a specific position.
Args:
position: Position to evaluate
priority_score: Priority score
appointment: AppointmentRequest instance
waiting_entries: List of current waiting entries
Returns:
float: Position score
"""
# Factor 1: Priority score (higher is better)
priority_factor = priority_score * 10 * float(self.config.priority_weight)
# Factor 2: Wait time fairness (earlier positions for longer waits)
wait_time_factor = 0.0
if appointment and appointment.scheduled_datetime:
time_until_appt = (
appointment.scheduled_datetime - timezone.now()
).total_seconds() / 60
# Normalize to 0-100 scale (inverse - less time = higher score)
wait_time_factor = max(0, 100 - (time_until_appt / 10))
wait_time_factor *= float(self.config.wait_time_weight)
# Factor 3: Appointment time proximity
appt_time_factor = 0.0
if appointment:
# Prefer positions that minimize disruption
# Earlier positions get higher scores for urgent appointments
appt_time_factor = (100 - position) * float(self.config.appointment_time_weight)
return priority_factor + wait_time_factor + appt_time_factor
def calculate_estimated_wait_time(self, position: int) -> timedelta:
"""
Calculate estimated wait time based on position and historical data.
Args:
position: Queue position
Returns:
timedelta: Estimated wait time
"""
from appointments.models import QueueMetrics
if position <= 0:
return timedelta(0)
# Get average service time
if self.config.use_historical_data:
# Use historical data from configured days
days_ago = timezone.now().date() - timedelta(days=self.config.historical_data_days)
avg_service_time = QueueMetrics.objects.filter(
queue=self.queue,
date__gte=days_ago
).aggregate(
avg=Avg('average_service_time_minutes')
)['avg'] or self.config.default_service_time_minutes
else:
avg_service_time = self.config.default_service_time_minutes
# Calculate load factor
current_size = self.queue.queue_entries.filter(status='WAITING').count()
load_factor = self.calculate_load_factor(current_size)
# Estimate wait time
base_wait = float(avg_service_time) * (position - 1)
adjusted_wait = base_wait * load_factor
return timedelta(minutes=int(adjusted_wait))
def calculate_load_factor(self, current_size: int) -> float:
"""
Calculate load factor based on queue size.
Args:
current_size: Current queue size
Returns:
float: Load factor multiplier (1.0 - 2.0)
"""
capacity = self.queue.max_queue_size or 50
utilization = current_size / capacity if capacity > 0 else 0
# Apply thresholds from configuration
if utilization < float(self.config.load_factor_normal_threshold):
return 1.0 # Normal processing
elif utilization < float(self.config.load_factor_moderate_threshold):
return 1.2 # Slightly slower
elif utilization < float(self.config.load_factor_high_threshold):
return 1.5 # Significantly slower
else:
return 2.0 # Very slow, overloaded
def reposition_queue_entries(self):
"""
Dynamically reposition all waiting entries based on current factors.
"""
from appointments.models import QueueEntry
if not self.config.auto_reposition_enabled:
return
waiting_entries = list(
self.queue.queue_entries.filter(status='WAITING')
.select_related('patient', 'appointment')
)
if not waiting_entries:
return
# Calculate scores for each entry
scored_entries = []
for entry in waiting_entries:
score = self._calculate_entry_priority_score(entry)
scored_entries.append((entry, score))
# Sort by score (highest first)
scored_entries.sort(key=lambda x: x[1], reverse=True)
# Update positions
position_changes = 0
for new_position, (entry, score) in enumerate(scored_entries, start=1):
if entry.queue_position != new_position:
old_position = entry.queue_position
entry.queue_position = new_position
entry.save(update_fields=['queue_position', 'updated_at'])
position_changes += 1
# Notify if significant change
if (self.config.notify_on_position_change and
abs(old_position - new_position) >= self.config.position_change_threshold):
self._notify_position_change(entry, old_position, new_position)
# Update metrics
if position_changes > 0:
self._update_repositioning_metrics(position_changes)
def _calculate_entry_priority_score(self, entry) -> float:
"""
Calculate priority score for repositioning.
Args:
entry: QueueEntry instance
Returns:
float: Priority score
"""
# Base priority score
score = entry.priority_score or 1.0
# Add wait time factor
wait_minutes = (timezone.now() - entry.joined_at).total_seconds() / 60
wait_factor = min(50, wait_minutes / 2) # Cap at 50 points
# Add appointment proximity factor
if entry.appointment and entry.appointment.scheduled_datetime:
time_until = (
entry.appointment.scheduled_datetime - timezone.now()
).total_seconds() / 60
if time_until < 30: # Within 30 minutes
proximity_factor = 30
elif time_until < 60: # Within 1 hour
proximity_factor = 15
else:
proximity_factor = 0
else:
proximity_factor = 0
return (score * 10) + wait_factor + proximity_factor
def get_next_patient(self) -> Optional['QueueEntry']:
"""
Get the next patient to be served.
Returns:
QueueEntry: Next queue entry, or None if queue is empty
"""
from appointments.models import QueueEntry
next_entry = self.queue.queue_entries.filter(
status='WAITING'
).order_by('queue_position').first()
if next_entry:
# Mark as called
next_entry.mark_as_called()
# Broadcast update
if self.config.enable_websocket_updates:
self.broadcast_queue_update()
# Update metrics
self.update_queue_metrics()
return next_entry
def broadcast_queue_update(self):
"""
Broadcast queue update via WebSocket.
"""
try:
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
channel_layer = get_channel_layer()
if channel_layer:
async_to_sync(channel_layer.group_send)(
f'queue_{self.queue.id}',
{
'type': 'queue_update',
'data': self.get_queue_status()
}
)
except Exception as e:
# Log error but don't fail
import logging
logger = logging.getLogger(__name__)
logger.error(f"WebSocket broadcast error: {e}")
def get_queue_status(self) -> Dict:
"""
Get current queue status for broadcasting.
Returns:
dict: Queue status data
"""
from appointments.models import QueueEntry
waiting = self.queue.queue_entries.filter(status='WAITING').order_by('queue_position')
return {
'queue_id': self.queue.id,
'queue_name': self.queue.name,
'current_size': waiting.count(),
'max_size': self.queue.max_queue_size,
'average_wait_time': str(self.calculate_estimated_wait_time(waiting.count() + 1)),
'is_accepting': self.queue.is_accepting_patients,
'load_factor': self.calculate_load_factor(waiting.count()),
'entries': [
{
'id': entry.id,
'patient_name': entry.patient.get_full_name(),
'position': entry.queue_position,
'wait_time_minutes': entry.wait_time_minutes or 0,
'priority_score': entry.priority_score,
'status': entry.status
}
for entry in waiting[:10] # Top 10 entries
],
'timestamp': timezone.now().isoformat()
}
def update_queue_metrics(self):
"""
Update queue metrics for analytics.
"""
from appointments.models import QueueMetrics
now = timezone.now()
# Get or create metrics for current hour
metrics, created = QueueMetrics.objects.get_or_create(
queue=self.queue,
date=now.date(),
hour=now.hour,
defaults={
'total_entries': 0,
'completed_entries': 0,
'no_shows': 0,
'left_queue': 0,
'average_wait_time_minutes': 0,
'max_wait_time_minutes': 0,
'min_wait_time_minutes': 0,
'average_service_time_minutes': 0,
'peak_queue_size': 0,
'average_queue_size': 0,
'min_queue_size': 0,
'throughput': 0,
'utilization_rate': 0,
'no_show_rate': 0,
'abandonment_rate': 0,
'repositioning_events': 0,
'average_position_changes': 0,
'average_load_factor': 1.0,
'peak_load_factor': 1.0
}
)
# Update current metrics
current_size = self.queue.queue_entries.filter(status='WAITING').count()
current_load = self.calculate_load_factor(current_size)
metrics.peak_queue_size = max(metrics.peak_queue_size, current_size)
metrics.peak_load_factor = max(float(metrics.peak_load_factor), current_load)
# Update average load factor (simple moving average)
if created:
metrics.average_load_factor = current_load
else:
metrics.average_load_factor = (
(float(metrics.average_load_factor) + current_load) / 2
)
metrics.save()
def _notify_position_change(self, entry, old_position: int, new_position: int):
"""
Notify patient of significant position change.
Args:
entry: QueueEntry instance
old_position: Old queue position
new_position: New queue position
"""
# This would integrate with notification system
# For now, just log the change
import logging
logger = logging.getLogger(__name__)
logger.info(
f"Queue position changed for {entry.patient.get_full_name()}: "
f"{old_position} -> {new_position}"
)
def _update_repositioning_metrics(self, position_changes: int):
"""
Update repositioning metrics.
Args:
position_changes: Number of position changes
"""
from appointments.models import QueueMetrics
now = timezone.now()
try:
metrics = QueueMetrics.objects.get(
queue=self.queue,
date=now.date(),
hour=now.hour
)
metrics.repositioning_events += 1
metrics.average_position_changes = (
(float(metrics.average_position_changes) + position_changes) / 2
)
metrics.save()
except QueueMetrics.DoesNotExist:
pass
def get_analytics_summary(self, days: int = 7) -> Dict:
"""
Get analytics summary for the queue.
Args:
days: Number of days to analyze
Returns:
dict: Analytics summary
"""
from appointments.models import QueueMetrics
start_date = timezone.now().date() - timedelta(days=days)
metrics = QueueMetrics.objects.filter(
queue=self.queue,
date__gte=start_date
)
if not metrics.exists():
return {
'period_days': days,
'no_data': True
}
aggregates = metrics.aggregate(
avg_wait_time=Avg('average_wait_time_minutes'),
max_wait_time=Avg('max_wait_time_minutes'),
avg_queue_size=Avg('average_queue_size'),
peak_queue_size=Avg('peak_queue_size'),
avg_utilization=Avg('utilization_rate'),
avg_no_show_rate=Avg('no_show_rate'),
avg_abandonment_rate=Avg('abandonment_rate'),
total_entries=Count('total_entries'),
total_completed=Count('completed_entries')
)
return {
'period_days': days,
'average_wait_time_minutes': round(aggregates['avg_wait_time'] or 0, 2),
'max_wait_time_minutes': round(aggregates['max_wait_time'] or 0, 2),
'average_queue_size': round(aggregates['avg_queue_size'] or 0, 2),
'peak_queue_size': round(aggregates['peak_queue_size'] or 0, 2),
'average_utilization_rate': round(aggregates['avg_utilization'] or 0, 2),
'average_no_show_rate': round(aggregates['avg_no_show_rate'] or 0, 2),
'average_abandonment_rate': round(aggregates['avg_abandonment_rate'] or 0, 2),
'total_entries': aggregates['total_entries'] or 0,
'total_completed': aggregates['total_completed'] or 0
}