agdar/appointments/api_views.py
2025-11-02 14:35:35 +03:00

442 lines
16 KiB
Python

"""
DRF API ViewSets for Appointments app.
"""
from datetime import datetime, timedelta

from django.db.models import Q, Count
from django.utils import timezone
from django.utils.dateparse import parse_date, parse_time
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework import viewsets, filters, status
from rest_framework.decorators import action
from rest_framework.exceptions import ValidationError
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response

from .models import Appointment, Provider, Room, Schedule, AppointmentReminder
from .serializers import (
    AppointmentListSerializer, AppointmentDetailSerializer,
    AppointmentCreateSerializer, AppointmentUpdateSerializer,
    AppointmentStatusSerializer, ProviderSerializer,
    RoomSerializer, ScheduleSerializer, AppointmentReminderSerializer,
    CalendarSlotSerializer
)
class AppointmentViewSet(viewsets.ModelViewSet):
    """
    API endpoint for Appointment CRUD and status transitions.

    list: Get list of appointments
    retrieve: Get appointment details
    create: Book new appointment (tenant and BOOKED status are set server-side)
    update: Update appointment
    destroy: Delete appointment. This view does not override ``destroy``, so
        the default ModelViewSet delete applies; use the ``cancel`` action to
        cancel an appointment with a reason.

    Custom actions:
    - confirm: Confirm appointment (BOOKED -> CONFIRMED)
    - arrive: Mark patient as arrived (BOOKED/CONFIRMED -> ARRIVED)
    - start: Start appointment (ARRIVED -> IN_PROGRESS)
    - complete: Complete appointment (IN_PROGRESS -> COMPLETED)
    - cancel: Cancel appointment with reason
    - reschedule: Reschedule appointment
    - today / upcoming / statistics / calendar: read-only collection views

    Every queryset is restricted to the requesting user's tenant.
    """

    permission_classes = [IsAuthenticated]
    filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
    filterset_fields = ['patient', 'clinic', 'provider', 'status', 'scheduled_date']
    search_fields = ['appointment_number', 'patient__mrn', 'patient__first_name_en']
    ordering_fields = ['scheduled_date', 'scheduled_time', 'created_at']
    ordering = ['scheduled_date', 'scheduled_time']

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _current_local_date():
        """Return today's date in the active time zone.

        With USE_TZ enabled ``timezone.now()`` is expressed in UTC, so taking
        ``.date()`` of it directly yields the UTC calendar day, which differs
        from the local day around midnight. ``localtime()`` rejects naive
        datetimes, hence the explicit fallback for USE_TZ=False projects.
        """
        now = timezone.now()
        if timezone.is_aware(now):
            return timezone.localtime(now).date()
        return now.date()

    @staticmethod
    def _parse_or_none(parser, value):
        """Apply a ``django.utils.dateparse`` parser, mapping failures to None.

        The parsers return None for badly formatted text but raise ValueError
        for well-formatted impossible values (e.g. month 13) and TypeError
        for non-string input such as a JSON number.
        """
        try:
            return parser(value)
        except (TypeError, ValueError):
            return None

    @staticmethod
    def _bad_request(message):
        """Build the 400 response shape used by every action in this view."""
        return Response({'error': message}, status=status.HTTP_400_BAD_REQUEST)

    @staticmethod
    def _detail_response(appointment):
        """Serialize ``appointment`` with the detail serializer and wrap it."""
        return Response(AppointmentDetailSerializer(appointment).data)

    def _date_query_param(self, name):
        """Return the query parameter ``name`` as a date, or None if absent.

        Raises:
            ValidationError: (HTTP 400) if the parameter is present but is
                not a valid date.
        """
        raw = self.request.query_params.get(name)
        if not raw:
            return None
        parsed = self._parse_or_none(parse_date, raw)
        if parsed is None:
            raise ValidationError(
                {'error': f'Invalid {name} format, expected YYYY-MM-DD'}
            )
        return parsed

    @staticmethod
    def _calendar_entry(appointment):
        """Build the calendar payload dict for a single appointment."""
        start_datetime = datetime.combine(
            appointment.scheduled_date,
            appointment.scheduled_time,
        )
        end_datetime = start_datetime + timedelta(minutes=appointment.duration)
        return {
            'id': str(appointment.id),
            'appointment_number': appointment.appointment_number,
            'patient_name': appointment.patient.full_name_en,
            'patient_mrn': appointment.patient.mrn,
            'clinic_name': appointment.clinic.name_en,
            'provider_name': appointment.provider.user.get_full_name(),
            'service_type': appointment.service_type,
            'scheduled_date': appointment.scheduled_date.isoformat(),
            'scheduled_time': appointment.scheduled_time.isoformat(),
            'end_time': end_datetime.isoformat(),
            'duration': appointment.duration,
            'status': appointment.status,
            'room': appointment.room.name if appointment.room else None,
        }

    # ------------------------------------------------------------------
    # DRF hooks
    # ------------------------------------------------------------------
    def get_queryset(self):
        """Filter by tenant and optionally by date range.

        Query parameters:
            start_date: inclusive lower bound on ``scheduled_date``.
            end_date: inclusive upper bound on ``scheduled_date``.

        Malformed dates are rejected with HTTP 400 instead of being handed
        to the ORM, where they would surface as an unhandled server error.
        """
        queryset = Appointment.objects.filter(tenant=self.request.user.tenant)

        start_date = self._date_query_param('start_date')
        end_date = self._date_query_param('end_date')
        if start_date:
            queryset = queryset.filter(scheduled_date__gte=start_date)
        if end_date:
            queryset = queryset.filter(scheduled_date__lte=end_date)

        return queryset.select_related('patient', 'clinic', 'provider__user', 'room')

    def get_serializer_class(self):
        """Return appropriate serializer based on action."""
        if self.action == 'list':
            return AppointmentListSerializer
        if self.action == 'create':
            return AppointmentCreateSerializer
        if self.action in ['update', 'partial_update']:
            return AppointmentUpdateSerializer
        if self.action in ['confirm', 'arrive', 'start', 'complete', 'cancel']:
            return AppointmentStatusSerializer
        return AppointmentDetailSerializer

    def perform_create(self, serializer):
        """Set tenant and initial status on create."""
        serializer.save(
            tenant=self.request.user.tenant,
            status='BOOKED'
        )

    # ------------------------------------------------------------------
    # Status transitions
    # ------------------------------------------------------------------
    @action(detail=True, methods=['post'])
    def confirm(self, request, pk=None):
        """Confirm appointment.

        Only BOOKED appointments can be confirmed. The optional ``method``
        field of the request body is stored as the confirmation method and
        defaults to ``'SMS'``.
        """
        appointment = self.get_object()
        if appointment.status != 'BOOKED':
            return self._bad_request('Only booked appointments can be confirmed')

        appointment.status = 'CONFIRMED'
        appointment.confirmation_sent_at = timezone.now()
        appointment.confirmation_method = request.data.get('method', 'SMS')
        appointment.save()
        return self._detail_response(appointment)

    @action(detail=True, methods=['post'])
    def arrive(self, request, pk=None):
        """Mark patient as arrived (allowed from BOOKED or CONFIRMED)."""
        appointment = self.get_object()
        if appointment.status not in ['BOOKED', 'CONFIRMED']:
            return self._bad_request('Invalid appointment status for arrival')

        appointment.status = 'ARRIVED'
        appointment.arrival_at = timezone.now()
        appointment.save()
        return self._detail_response(appointment)

    @action(detail=True, methods=['post'])
    def start(self, request, pk=None):
        """Start appointment (allowed only from ARRIVED)."""
        appointment = self.get_object()
        if appointment.status != 'ARRIVED':
            return self._bad_request('Patient must be marked as arrived first')

        appointment.status = 'IN_PROGRESS'
        appointment.start_at = timezone.now()
        appointment.save()
        return self._detail_response(appointment)

    @action(detail=True, methods=['post'])
    def complete(self, request, pk=None):
        """Complete appointment (allowed only from IN_PROGRESS)."""
        appointment = self.get_object()
        if appointment.status != 'IN_PROGRESS':
            return self._bad_request('Appointment must be in progress to complete')

        appointment.status = 'COMPLETED'
        appointment.end_at = timezone.now()
        appointment.save()
        return self._detail_response(appointment)

    @action(detail=True, methods=['post'])
    def cancel(self, request, pk=None):
        """Cancel appointment with reason.

        Requires a non-empty ``cancel_reason`` in the request body. Completed
        or already cancelled appointments are rejected. The requesting user
        is recorded as ``cancelled_by``.
        """
        appointment = self.get_object()
        if appointment.status in ['COMPLETED', 'CANCELLED']:
            return self._bad_request(
                'Cannot cancel completed or already cancelled appointment'
            )

        cancel_reason = request.data.get('cancel_reason')
        if not cancel_reason:
            return self._bad_request('Cancel reason is required')

        appointment.status = 'CANCELLED'
        appointment.cancel_reason = cancel_reason
        appointment.cancelled_by = request.user
        appointment.save()
        return self._detail_response(appointment)

    @action(detail=True, methods=['post'])
    def reschedule(self, request, pk=None):
        """Reschedule appointment.

        Requires ``scheduled_date``, ``scheduled_time`` and
        ``reschedule_reason`` in the request body. Date and time are parsed
        before being assigned, so malformed values yield HTTP 400 and the
        response reflects the normalised values that were stored.
        """
        appointment = self.get_object()
        if appointment.status in ['COMPLETED', 'CANCELLED']:
            return self._bad_request(
                'Cannot reschedule completed or cancelled appointment'
            )

        new_date = request.data.get('scheduled_date')
        new_time = request.data.get('scheduled_time')
        reschedule_reason = request.data.get('reschedule_reason')
        if not all([new_date, new_time, reschedule_reason]):
            return self._bad_request('New date, time, and reason are required')

        parsed_date = self._parse_or_none(parse_date, new_date)
        parsed_time = self._parse_or_none(parse_time, new_time)
        if parsed_date is None or parsed_time is None:
            return self._bad_request('Invalid date or time format')

        appointment.scheduled_date = parsed_date
        appointment.scheduled_time = parsed_time
        appointment.reschedule_reason = reschedule_reason
        appointment.reschedule_count += 1
        # NOTE(review): no action in this view accepts RESCHEDULED as a source
        # status for confirm/arrive, and `upcoming` does not list it - confirm
        # that this is the intended workflow.
        appointment.status = 'RESCHEDULED'
        appointment.save()
        return self._detail_response(appointment)

    # ------------------------------------------------------------------
    # Read-only collection views
    # ------------------------------------------------------------------
    @action(detail=False, methods=['get'])
    def today(self, request):
        """Get today's appointments (local calendar day)."""
        appointments = self.get_queryset().filter(
            scheduled_date=self._current_local_date()
        )
        serializer = AppointmentListSerializer(appointments, many=True)
        return Response(serializer.data)

    @action(detail=False, methods=['get'])
    def upcoming(self, request):
        """Get upcoming appointments.

        Returns at most 20 BOOKED or CONFIRMED appointments scheduled for
        today or later, earliest first.
        """
        appointments = self.get_queryset().filter(
            scheduled_date__gte=self._current_local_date(),
            status__in=['BOOKED', 'CONFIRMED']
        ).order_by('scheduled_date', 'scheduled_time')[:20]
        serializer = AppointmentListSerializer(appointments, many=True)
        return Response(serializer.data)

    @action(detail=False, methods=['get'])
    def statistics(self, request):
        """Get appointment statistics.

        Returns the total count, a per-status breakdown, today's count and
        the count for the seven days starting today (``this_week``).
        """
        queryset = self.get_queryset()
        today = self._current_local_date()

        # Clear any ordering first so it cannot leak into the GROUP BY and
        # split the per-status counts.
        status_counts = (
            queryset.order_by()
            .values('status')
            .annotate(count=Count('id'))
            .values_list('status', 'count')
        )
        stats = {
            'total': queryset.count(),
            'by_status': dict(status_counts),
            'today': queryset.filter(scheduled_date=today).count(),
            'this_week': queryset.filter(
                scheduled_date__gte=today,
                scheduled_date__lt=today + timedelta(days=7)
            ).count(),
        }
        return Response(stats)

    @action(detail=False, methods=['get'])
    def calendar(self, request):
        """Get appointments for calendar view.

        Query parameters:
            start, end: required ISO 8601 date or datetime strings (a
                trailing ``Z`` is accepted); only their date part is used
                and both bounds are inclusive.
            clinic, provider, status: optional exact-match filters.
        """
        start_str = request.query_params.get('start')
        end_str = request.query_params.get('end')
        if not start_str or not end_str:
            return self._bad_request('start and end parameters are required')

        try:
            # fromisoformat() on older Python versions rejects the 'Z' suffix.
            start_date = datetime.fromisoformat(start_str.replace('Z', '+00:00')).date()
            end_date = datetime.fromisoformat(end_str.replace('Z', '+00:00')).date()
        except ValueError:
            return self._bad_request('Invalid date format')

        queryset = self.get_queryset().filter(
            scheduled_date__gte=start_date,
            scheduled_date__lte=end_date
        )

        # Optional exact-match filters: query parameter -> ORM lookup.
        optional_filters = (
            ('clinic', 'clinic_id'),
            ('provider', 'provider_id'),
            ('status', 'status'),
        )
        for param, lookup in optional_filters:
            value = request.query_params.get(param)
            if value:
                queryset = queryset.filter(**{lookup: value})

        return Response([self._calendar_entry(appt) for appt in queryset])
class ProviderViewSet(viewsets.ReadOnlyModelViewSet):
    """
    API endpoint for Provider (read-only).

    list: Get list of providers
    retrieve: Get provider details
    availability: Check provider availability
    appointments: Get the provider's most recent appointments

    Every queryset is restricted to the requesting user's tenant.
    """

    permission_classes = [IsAuthenticated]
    serializer_class = ProviderSerializer
    filter_backends = [DjangoFilterBackend, filters.SearchFilter]
    filterset_fields = ['is_available', 'specialties']
    search_fields = ['user__first_name', 'user__last_name']

    # Relations loaded together with appointments before they are passed to
    # AppointmentListSerializer; the same set AppointmentViewSet uses.
    _appointment_relations = ('patient', 'clinic', 'provider__user', 'room')

    def get_queryset(self):
        """Filter by tenant and optionally by clinic.

        The optional ``clinic`` query parameter is matched against the ids
        of the provider's ``specialties`` relation.
        """
        queryset = Provider.objects.filter(
            tenant=self.request.user.tenant
        ).select_related('user').prefetch_related('specialties')

        # Filter by clinic if provided (clinic is a specialty/clinic relationship)
        clinic_id = self.request.query_params.get('clinic')
        if clinic_id:
            queryset = queryset.filter(specialties__id=clinic_id)
        return queryset

    @action(detail=True, methods=['get'])
    def availability(self, request, pk=None):
        """Check provider availability for a date range.

        Query parameters:
            start_date, end_date: required dates; both bounds are inclusive.

        Returns the provider, their active schedules and the appointments in
        the range that still occupy a slot (BOOKED, CONFIRMED, ARRIVED or
        IN_PROGRESS). Missing or malformed dates yield HTTP 400.
        """
        provider = self.get_object()
        start_raw = request.query_params.get('start_date')
        end_raw = request.query_params.get('end_date')
        if not start_raw or not end_raw:
            return Response(
                {'error': 'start_date and end_date are required'},
                status=status.HTTP_400_BAD_REQUEST
            )

        # parse_date() returns None for badly formatted text and raises
        # ValueError for well-formatted impossible dates; treat both as 400
        # rather than letting the ORM raise an unhandled error.
        try:
            start_date = parse_date(start_raw)
            end_date = parse_date(end_raw)
        except ValueError:
            start_date = end_date = None
        if start_date is None or end_date is None:
            return Response(
                {'error': 'Invalid date format, expected YYYY-MM-DD'},
                status=status.HTTP_400_BAD_REQUEST
            )

        tenant = request.user.tenant

        # Tenant-scoped like the `appointments` action, so rows attached to
        # this provider under another tenant are never exposed.
        schedules = Schedule.objects.filter(
            provider=provider,
            tenant=tenant,
            is_active=True
        )
        # NOTE(review): RESCHEDULED appointments are not treated as occupying
        # a slot here - confirm that this is intended.
        appointments = Appointment.objects.filter(
            provider=provider,
            tenant=tenant,
            scheduled_date__gte=start_date,
            scheduled_date__lte=end_date,
            status__in=['BOOKED', 'CONFIRMED', 'ARRIVED', 'IN_PROGRESS']
        ).select_related(*self._appointment_relations)

        availability_data = {
            'provider': ProviderSerializer(provider).data,
            'schedules': ScheduleSerializer(schedules, many=True).data,
            'booked_slots': AppointmentListSerializer(appointments, many=True).data,
        }
        return Response(availability_data)

    @action(detail=True, methods=['get'])
    def appointments(self, request, pk=None):
        """Get provider's appointments (the 50 most recent, newest first)."""
        provider = self.get_object()
        appointments = Appointment.objects.filter(
            provider=provider,
            tenant=request.user.tenant
        ).select_related(
            *self._appointment_relations
        ).order_by('-scheduled_date', '-scheduled_time')[:50]
        serializer = AppointmentListSerializer(appointments, many=True)
        return Response(serializer.data)
class RoomViewSet(viewsets.ReadOnlyModelViewSet):
    """
    Read-only API endpoint exposing rooms.

    list: Get list of rooms
    retrieve: Get room details

    Supports exact filtering on ``clinic`` and ``is_available`` and text
    search over ``name`` and ``room_number``.
    """

    permission_classes = [IsAuthenticated]
    serializer_class = RoomSerializer
    filter_backends = [DjangoFilterBackend, filters.SearchFilter]
    filterset_fields = ['clinic', 'is_available']
    search_fields = ['name', 'room_number']

    def get_queryset(self):
        """Return the requesting user's tenant rooms with their clinic joined."""
        current_tenant = self.request.user.tenant
        tenant_rooms = Room.objects.filter(tenant=current_tenant)
        return tenant_rooms.select_related('clinic')
class ScheduleViewSet(viewsets.ModelViewSet):
    """
    API endpoint providing full CRUD for schedules.

    list: Get list of schedules
    retrieve: Get schedule details
    create: Create new schedule
    update: Update schedule
    destroy: Delete schedule

    Supports exact filtering on ``provider``, ``day_of_week`` and
    ``is_active``.
    """

    permission_classes = [IsAuthenticated]
    serializer_class = ScheduleSerializer
    filter_backends = [DjangoFilterBackend]
    filterset_fields = ['provider', 'day_of_week', 'is_active']

    def get_queryset(self):
        """Return the requesting user's tenant schedules with provider and user joined."""
        current_tenant = self.request.user.tenant
        tenant_schedules = Schedule.objects.filter(tenant=current_tenant)
        return tenant_schedules.select_related('provider__user')

    def perform_create(self, serializer):
        """Save the new schedule under the requesting user's tenant."""
        owning_tenant = self.request.user.tenant
        serializer.save(tenant=owning_tenant)