Marwan Alwali 263292f6be update
2025-11-04 00:50:06 +03:00

321 lines
9.7 KiB
Python

"""
WebSocket consumers for real-time appointments and queue updates.
This module provides WebSocket consumers for:
- Real-time queue status updates
- Queue position changes
- Wait time updates
- Patient notifications
"""
import json
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
from django.utils import timezone
class QueueConsumer(AsyncWebsocketConsumer):
    """
    WebSocket consumer for real-time queue updates.

    Each connection joins the channel-layer group ``queue_<queue_id>`` and
    relays the events delivered to that group to the client:
    - Queue updates
    - Position changes
    - Patient called notifications

    The client may also send ``get_status`` and ``ping`` messages.
    """

    async def _send_json(self, payload):
        """
        Serialize ``payload`` with ``json.dumps`` and send it as a text frame.
        """
        await self.send(text_data=json.dumps(payload))

    async def connect(self):
        """
        Handle WebSocket connection.

        Join the queue-specific group, accept the connection and send the
        initial queue status (or an error message if that lookup fails).
        """
        # NOTE(review): no authentication/authorization check is performed
        # here; confirm that access control is enforced elsewhere (e.g. by
        # middleware wrapping the routing).

        # Get queue ID from URL route
        self.queue_id = self.scope['url_route']['kwargs']['queue_id']
        self.queue_group_name = f'queue_{self.queue_id}'

        # Join queue group
        await self.channel_layer.group_add(
            self.queue_group_name,
            self.channel_name
        )

        # Accept the WebSocket connection
        await self.accept()

        # Send initial queue status; a failure is reported to the client
        # instead of tearing down the freshly accepted connection.
        try:
            status = await self.get_queue_status()
            await self._send_json({
                'type': 'initial_status',
                'data': status
            })
        except Exception as e:
            await self._send_json({
                'type': 'error',
                'message': f'Failed to get initial status: {str(e)}'
            })

    async def disconnect(self, close_code):
        """
        Handle WebSocket disconnection.

        Leave the queue group.
        """
        await self.channel_layer.group_discard(
            self.queue_group_name,
            self.channel_name
        )

    async def receive(self, text_data=None, bytes_data=None):
        """
        Handle messages from WebSocket client.

        Supported message types:
        - get_status: Request current queue status
        - ping: Heartbeat/keepalive

        Args:
            text_data: Text frame payload, expected to be a JSON object
                with a ``type`` key.
            bytes_data: Binary frame payload. Accepted so that binary frames
                do not raise ``TypeError`` when dispatched with this keyword;
                binary messages are rejected with an error response.
        """
        if text_data is None:
            await self._send_json({
                'type': 'error',
                'message': 'Binary messages are not supported'
            })
            return

        # Parse separately so only malformed JSON maps to 'Invalid JSON'.
        try:
            data = json.loads(text_data)
        except json.JSONDecodeError:
            await self._send_json({
                'type': 'error',
                'message': 'Invalid JSON'
            })
            return

        # Valid JSON that is not an object (e.g. a list or a number) has no
        # 'type' key to dispatch on.
        if not isinstance(data, dict):
            await self._send_json({
                'type': 'error',
                'message': 'Message must be a JSON object'
            })
            return

        message_type = data.get('type')
        try:
            if message_type == 'get_status':
                # Client requesting current status
                status = await self.get_queue_status()
                await self._send_json({
                    'type': 'status_update',
                    'data': status
                })
            elif message_type == 'ping':
                # Heartbeat response
                await self._send_json({
                    'type': 'pong',
                    'timestamp': timezone.now().isoformat()
                })
            else:
                await self._send_json({
                    'type': 'error',
                    'message': f'Unknown message type: {message_type}'
                })
        except Exception as e:
            # Boundary handler: report the failure to the client rather than
            # letting the exception end the consumer.
            await self._send_json({
                'type': 'error',
                'message': str(e)
            })

    async def queue_update(self, event):
        """
        Forward a queue update event to the WebSocket client.

        Sends ``event['data']`` wrapped in a ``queue_update`` message.
        Presumably invoked for events published to the queue group; the
        sender is not part of this module.
        """
        await self._send_json({
            'type': 'queue_update',
            'data': event['data']
        })

    async def position_change(self, event):
        """
        Forward a position change event to the WebSocket client.

        Sends ``event['data']`` wrapped in a ``position_change`` message to
        this connection; no per-patient filtering is done here.
        """
        await self._send_json({
            'type': 'position_change',
            'data': event['data']
        })

    async def patient_called(self, event):
        """
        Forward a patient called event to the WebSocket client.

        Sends ``event['data']`` wrapped in a ``patient_called`` message.
        """
        await self._send_json({
            'type': 'patient_called',
            'data': event['data']
        })

    @database_sync_to_async
    def get_queue_status(self):
        """
        Get current queue status from database.

        Returns:
            dict: The result of ``AdvancedQueueEngine.get_queue_status()``
            for this queue, or a dict with ``error`` and ``queue_id`` keys
            when the queue does not exist or the lookup fails.
        """
        # Imported lazily inside the method (presumably to avoid import
        # cycles or loading models before the app registry is ready).
        from appointments.models import WaitingQueue
        from appointments.queue import AdvancedQueueEngine

        try:
            queue = WaitingQueue.objects.get(id=self.queue_id)
            engine = AdvancedQueueEngine(queue)
            return engine.get_queue_status()
        except WaitingQueue.DoesNotExist:
            return {
                'error': 'Queue not found',
                'queue_id': self.queue_id
            }
        except Exception as e:
            return {
                'error': str(e),
                'queue_id': self.queue_id
            }
class PatientQueueConsumer(AsyncWebsocketConsumer):
    """
    WebSocket consumer for individual patient queue updates.

    Each connection joins the channel-layer group
    ``patient_<patient_id>_queue_<queue_id>`` and relays the events
    delivered to that group to the client:
    - Patient updates
    - Position changes
    - Called notifications

    The client may also send ``get_status`` and ``ping`` messages.
    """

    async def _send_json(self, payload):
        """
        Serialize ``payload`` with ``json.dumps`` and send it as a text frame.
        """
        await self.send(text_data=json.dumps(payload))

    async def connect(self):
        """
        Handle WebSocket connection for patient.

        Join the patient-specific group, accept the connection and send the
        patient's initial queue status (or an error message if that lookup
        fails).
        """
        # NOTE(review): the patient and queue IDs come straight from the URL
        # and no authentication/authorization check is performed here, so
        # any client reaching this route can subscribe to any patient's
        # updates. Confirm that access control is enforced elsewhere.

        # Get patient ID and queue ID from URL route
        url_kwargs = self.scope['url_route']['kwargs']
        self.patient_id = url_kwargs['patient_id']
        self.queue_id = url_kwargs['queue_id']
        self.patient_group_name = f'patient_{self.patient_id}_queue_{self.queue_id}'

        # Join patient-specific group
        await self.channel_layer.group_add(
            self.patient_group_name,
            self.channel_name
        )

        # Accept connection
        await self.accept()

        # Send initial patient status; a failure is reported to the client
        # instead of tearing down the freshly accepted connection.
        try:
            status = await self.get_patient_queue_status()
            await self._send_json({
                'type': 'initial_status',
                'data': status
            })
        except Exception as e:
            await self._send_json({
                'type': 'error',
                'message': f'Failed to get patient status: {str(e)}'
            })

    async def disconnect(self, close_code):
        """
        Handle WebSocket disconnection.

        Leave the patient group.
        """
        await self.channel_layer.group_discard(
            self.patient_group_name,
            self.channel_name
        )

    async def receive(self, text_data=None, bytes_data=None):
        """
        Handle messages from patient WebSocket client.

        Supported message types:
        - get_status: Request the patient's current queue status
        - ping: Heartbeat/keepalive

        Args:
            text_data: Text frame payload, expected to be a JSON object
                with a ``type`` key.
            bytes_data: Binary frame payload. Accepted so that binary frames
                do not raise ``TypeError`` when dispatched with this keyword;
                binary messages are rejected with an error response.
        """
        if text_data is None:
            await self._send_json({
                'type': 'error',
                'message': 'Binary messages are not supported'
            })
            return

        # Parse separately so only malformed JSON maps to 'Invalid JSON'.
        try:
            data = json.loads(text_data)
        except json.JSONDecodeError:
            await self._send_json({
                'type': 'error',
                'message': 'Invalid JSON'
            })
            return

        # Valid JSON that is not an object (e.g. a list or a number) has no
        # 'type' key to dispatch on.
        if not isinstance(data, dict):
            await self._send_json({
                'type': 'error',
                'message': 'Message must be a JSON object'
            })
            return

        message_type = data.get('type')
        try:
            if message_type == 'get_status':
                status = await self.get_patient_queue_status()
                await self._send_json({
                    'type': 'status_update',
                    'data': status
                })
            elif message_type == 'ping':
                await self._send_json({
                    'type': 'pong',
                    'timestamp': timezone.now().isoformat()
                })
            else:
                # Report unsupported types instead of silently dropping them.
                await self._send_json({
                    'type': 'error',
                    'message': f'Unknown message type: {message_type}'
                })
        except Exception as e:
            # Boundary handler: report the failure to the client rather than
            # letting the exception end the consumer.
            await self._send_json({
                'type': 'error',
                'message': str(e)
            })

    async def patient_update(self, event):
        """
        Forward a patient-specific update event to the WebSocket client.

        Sends ``event['data']`` wrapped in a ``patient_update`` message.
        Presumably invoked for events published to the patient group; the
        sender is not part of this module.
        """
        await self._send_json({
            'type': 'patient_update',
            'data': event['data']
        })

    async def position_change(self, event):
        """
        Forward a position change event to the WebSocket client.

        Sends ``event['data']`` wrapped in a ``position_change`` message.
        """
        await self._send_json({
            'type': 'position_change',
            'data': event['data']
        })

    async def called_notification(self, event):
        """
        Forward a called notification event to the WebSocket client.

        Sends ``event['data']`` wrapped in a message whose type is
        ``called`` (not ``called_notification``).
        """
        await self._send_json({
            'type': 'called',
            'data': event['data']
        })

    @database_sync_to_async
    def get_patient_queue_status(self):
        """
        Get patient-specific queue status.

        Looks up the ``QueueEntry`` with status ``'WAITING'`` for this
        connection's queue and patient.

        Returns:
            dict: Patient queue status, or a dict with ``error``,
            ``patient_id`` and ``queue_id`` keys when no such entry exists
            or the lookup fails.
        """
        # Imported lazily inside the method (presumably to avoid import
        # cycles or loading models before the app registry is ready).
        from appointments.models import QueueEntry

        try:
            entry = QueueEntry.objects.select_related(
                'queue', 'patient', 'appointment'
            ).get(
                queue_id=self.queue_id,
                patient_id=self.patient_id,
                status='WAITING'
            )
            estimated = entry.estimated_service_time
            return {
                'patient_id': self.patient_id,
                'queue_id': self.queue_id,
                'queue_name': entry.queue.name,
                'position': entry.queue_position,
                'wait_time_minutes': entry.wait_time_minutes or 0,
                'estimated_service_time': estimated.isoformat() if estimated else None,
                'priority_score': entry.priority_score,
                'status': entry.status,
                'joined_at': entry.joined_at.isoformat(),
                'timestamp': timezone.now().isoformat()
            }
        except QueueEntry.DoesNotExist:
            return {
                'error': 'Not in queue',
                'patient_id': self.patient_id,
                'queue_id': self.queue_id
            }
        except Exception as e:
            # Also covers e.g. several matching WAITING entries for .get().
            return {
                'error': str(e),
                'patient_id': self.patient_id,
                'queue_id': self.queue_id
            }