agdar/appointments/views.py
Marwan Alwali a4665842c9 update
2025-11-23 10:58:07 +03:00

3160 lines
119 KiB
Python

"""
Appointments views for the Tenhal Multidisciplinary Healthcare Platform.
This module contains views for appointment management including:
- Appointment CRUD operations
- State machine transitions (confirm, reschedule, cancel, arrive, start, complete)
- Calendar views
- Provider schedules
- Appointment reminders
"""
from django.contrib.auth.mixins import LoginRequiredMixin
from django.contrib.auth.decorators import login_required
from django.core.paginator import Paginator
from django.db.models import Q, Count
from django.http import JsonResponse, HttpResponse
from django.shortcuts import get_object_or_404, redirect, render
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from django.views import View
from django.views.generic import ListView, DetailView, CreateView, UpdateView, DeleteView
from django.urls import reverse_lazy
from django.contrib import messages
from datetime import date
from core.mixins import (
TenantFilterMixin,
RolePermissionMixin,
AuditLogMixin,
HTMXResponseMixin,
SuccessMessageMixin,
PaginationMixin,
)
from core.models import *
from .models import *
from .forms import *
class AppointmentListView(LoginRequiredMixin, TenantFilterMixin, PaginationMixin,
HTMXResponseMixin, ListView):
"""
Appointment list view with filtering and search.
Features:
- Filter by status, clinic, provider, date range
- Search by patient name/MRN
- Role-based filtering (providers see only their appointments)
- Export to CSV
"""
model = Appointment
template_name = 'appointments/appointment_list.html'
htmx_template_name = 'appointments/partials/appointment_list_partial.html'
context_object_name = 'appointments'
paginate_by = 25
def get_queryset(self):
"""Get filtered queryset based on role and filters."""
queryset = super().get_queryset()
user = self.request.user
# Role-based filtering
if user.role in [User.Role.DOCTOR, User.Role.NURSE, User.Role.OT,
User.Role.SLP, User.Role.ABA]:
# Clinical staff see only their appointments
queryset = queryset.filter(provider__user=user)
# Apply search
search_query = self.request.GET.get('search', '').strip()
if search_query:
queryset = queryset.filter(
Q(patient__first_name_en__icontains=search_query) |
Q(patient__last_name_en__icontains=search_query) |
Q(patient__mrn__icontains=search_query) |
Q(appointment_number__icontains=search_query)
)
# Apply filters
status = self.request.GET.get('status')
if status:
queryset = queryset.filter(status=status)
clinic_id = self.request.GET.get('clinic')
if clinic_id:
queryset = queryset.filter(clinic_id=clinic_id)
provider_id = self.request.GET.get('provider')
if provider_id:
queryset = queryset.filter(provider_id=provider_id)
date_from = self.request.GET.get('date_from')
if date_from:
queryset = queryset.filter(appointment_date__gte=date_from)
date_to = self.request.GET.get('date_to')
if date_to:
queryset = queryset.filter(appointment_date__lte=date_to)
# Default ordering
return queryset.select_related(
'patient', 'provider', 'clinic', 'room', 'cancelled_by',
).order_by('-scheduled_date', '-scheduled_time')
def get_context_data(self, **kwargs):
"""Add filter options to context."""
context = super().get_context_data(**kwargs)
# Add search form
context['search_form'] = AppointmentSearchForm(self.request.GET)
# Add filter options
context['clinics'] = Clinic.objects.filter(tenant=self.request.user.tenant)
context['providers'] = User.objects.filter(
tenant=self.request.user.tenant,
role__in=[User.Role.DOCTOR, User.Role.NURSE, User.Role.OT,
User.Role.SLP, User.Role.ABA]
)
context['status_choices'] = Appointment.Status.choices
# Add current filters
context['current_filters'] = {
'search': self.request.GET.get('search', ''),
'status': self.request.GET.get('status', ''),
'clinic': self.request.GET.get('clinic', ''),
'provider': self.request.GET.get('provider', ''),
'date_from': self.request.GET.get('date_from', ''),
'date_to': self.request.GET.get('date_to', ''),
}
return context
class AppointmentCalendarView(LoginRequiredMixin, TenantFilterMixin, ListView):
"""
Calendar view for appointments.
Features:
- Weekly/monthly calendar grid
- Filter by clinic/provider
- Drag-and-drop rescheduling (HTMX)
- Color-coded by status
"""
model = Appointment
template_name = 'appointments/appointment_calendar.html'
context_object_name = 'appointments'
def get_queryset(self):
"""Get appointments for the selected date range."""
queryset = super().get_queryset()
user = self.request.user
# Role-based filtering
if user.role in [User.Role.DOCTOR, User.Role.NURSE, User.Role.OT,
User.Role.SLP, User.Role.ABA, User.Role.ADMIN]:
queryset = queryset.filter(provider__user=user)
# Get date range (default: current week)
view_type = self.request.GET.get('view', 'week') # week or month
date_str = self.request.GET.get('date')
if date_str:
from datetime import datetime
base_date = datetime.strptime(date_str, '%Y-%m-%d').date()
else:
base_date = timezone.now().date()
if view_type == 'week':
# Get week range
start_date = base_date - timezone.timedelta(days=base_date.weekday())
end_date = start_date + timezone.timedelta(days=6)
else: # month
# Get month range
start_date = base_date.replace(day=1)
if base_date.month == 12:
end_date = base_date.replace(year=base_date.year + 1, month=1, day=1)
else:
end_date = base_date.replace(month=base_date.month + 1, day=1)
end_date = end_date - timezone.timedelta(days=1)
queryset = queryset.filter(
scheduled_date__gte=start_date,
scheduled_date__lte=end_date
)
# Apply filters
clinic_id = self.request.GET.get('clinic')
if clinic_id:
queryset = queryset.filter(clinic_id=clinic_id)
provider_id = self.request.GET.get('provider')
if provider_id:
queryset = queryset.filter(provider_id=provider_id)
return queryset.select_related('patient', 'provider', 'clinic', 'service')
def get_context_data(self, **kwargs):
"""Add calendar data to context."""
context = super().get_context_data(**kwargs)
# Add filter options
context['clinics'] = Clinic.objects.filter(tenant=self.request.user.tenant)
context['providers'] = User.objects.filter(
tenant=self.request.user.tenant,
role__in=[User.Role.DOCTOR, User.Role.NURSE, User.Role.OT,
User.Role.SLP, User.Role.ABA, User.Role.ADMIN]
)
# Get unique service types for filter
context['services'] = Appointment.objects.filter(
tenant=self.request.user.tenant
).values_list('service_type', flat=True).distinct().order_by('service_type')
# Add view type and date
context['view_type'] = self.request.GET.get('view', 'week')
context['current_date'] = self.request.GET.get('date', timezone.now().date())
# Calendar customization settings
context['calendar_settings'] = {
'initial_view': 'dayGridMonth',
'slot_duration': '00:30:00',
'slot_min_time': '08:00:00',
'slot_max_time': '18:00:00',
'weekends': True,
'all_day_slot': False,
'event_time_format': 'h:mm a',
'slot_label_format': 'h:mm a',
}
# Event status colors
context['status_colors'] = {
'BOOKED': '#0d6efd',
'CONFIRMED': '#198754',
'ARRIVED': '#ffc107',
'IN_PROGRESS': '#fd7e14',
'COMPLETED': '#6c757d',
'CANCELLED': '#dc3545',
'NO_SHOW': '#6c757d',
'RESCHEDULED': '#0dcaf0',
}
return context
class AppointmentDetailView(LoginRequiredMixin, TenantFilterMixin, DetailView):
"""
Appointment detail view.
Features:
- Full appointment details
- Patient information
- Status history
- Available actions based on current status
- Related clinical documents
"""
model = Appointment
template_name = 'appointments/appointment_detail.html'
context_object_name = 'appointment'
def get_context_data(self, **kwargs):
"""Add related data and available actions."""
context = super().get_context_data(**kwargs)
appointment = self.object
# Get available actions based on current status
context['available_actions'] = self._get_available_actions(appointment)
# Get status history (if using simple_history)
if hasattr(appointment, 'history'):
context['status_history'] = appointment.history.all()[:10]
# Get related clinical documents
context['clinical_documents'] = self._get_clinical_documents(appointment)
# Check if user can modify
context['can_modify'] = self._can_user_modify(appointment)
# Check for paid invoice
from finance.models import Invoice
paid_invoice = Invoice.objects.filter(
appointment=appointment,
status=Invoice.Status.PAID
).first()
context['paid_invoice'] = paid_invoice
return context
def _get_available_actions(self, appointment):
"""Get list of available actions based on current status."""
actions = []
status = appointment.status
if status == Appointment.Status.BOOKED:
actions.extend(['confirm', 'reschedule', 'cancel'])
elif status == Appointment.Status.CONFIRMED:
actions.extend(['arrive', 'reschedule', 'cancel'])
elif status == Appointment.Status.ARRIVED:
actions.extend(['start', 'cancel'])
elif status == Appointment.Status.IN_PROGRESS:
actions.extend(['complete'])
elif status == Appointment.Status.RESCHEDULED:
actions.extend(['confirm', 'cancel'])
return actions
def _get_clinical_documents(self, appointment):
"""Get clinical documents related to this appointment."""
documents = {}
patient = appointment.patient
# Import models
from nursing.models import NursingEncounter
from medical.models import MedicalConsultation, MedicalFollowUp
from aba.models import ABAConsult
from ot.models import OTConsult, OTSession
from slp.models import SLPConsult, SLPAssessment, SLPIntervention
# Get documents linked to this appointment
documents['nursing'] = NursingEncounter.objects.filter(
appointment=appointment
).first()
documents['medical_consult'] = MedicalConsultation.objects.filter(
appointment=appointment
).first()
documents['medical_followup'] = MedicalFollowUp.objects.filter(
appointment=appointment
).first()
documents['aba'] = ABAConsult.objects.filter(
appointment=appointment
).first()
documents['ot_consult'] = OTConsult.objects.filter(
appointment=appointment
).first()
documents['ot_session'] = OTSession.objects.filter(
appointment=appointment
).first()
documents['slp_consult'] = SLPConsult.objects.filter(
appointment=appointment
).first()
documents['slp_assessment'] = SLPAssessment.objects.filter(
appointment=appointment
).first()
documents['slp_intervention'] = SLPIntervention.objects.filter(
appointment=appointment
).first()
return documents
def _can_user_modify(self, appointment):
"""Check if current user can modify appointment."""
user = self.request.user
# Admin and FrontDesk can always modify
if user.role in [User.Role.ADMIN, User.Role.FRONT_DESK]:
return True
# Provider can modify their own appointments
if appointment.provider == user:
return True
return False
class AppointmentCreateView(LoginRequiredMixin, RolePermissionMixin, AuditLogMixin,
SuccessMessageMixin, CreateView):
"""
Appointment creation view.
Features:
- Auto-generate appointment number
- Check provider availability
- Check patient consent BEFORE appointment creation
- Send confirmation notification
- Auto-populate patient from ?patient= URL parameter
- Load available packages for patient
"""
model = Appointment
form_class = AppointmentForm
template_name = 'appointments/appointment_form.html'
success_message = _("Appointment created successfully! Number: {appointment_number}")
allowed_roles = [User.Role.ADMIN, User.Role.FRONT_DESK]
def get_form_kwargs(self):
"""Pass patient and tenant to form if available."""
kwargs = super().get_form_kwargs()
# Always pass tenant
kwargs['tenant'] = self.request.user.tenant
# Check for patient parameter in URL or form data
patient_id = self.request.GET.get('patient')
if not patient_id and self.request.method == 'POST':
patient_id = self.request.POST.get('patient')
if patient_id:
try:
from core.models import Patient
patient = Patient.objects.get(
id=patient_id,
tenant=self.request.user.tenant
)
kwargs['patient'] = patient
except (Patient.DoesNotExist, ValueError):
pass
return kwargs
def get_initial(self):
"""Set initial form values, including patient from URL parameter."""
initial = super().get_initial()
# Check for patient parameter in URL
patient_id = self.request.GET.get('patient')
if patient_id:
try:
# Verify patient exists and belongs to tenant
from core.models import Patient
patient = Patient.objects.get(
id=patient_id,
tenant=self.request.user.tenant
)
initial['patient'] = patient
except (Patient.DoesNotExist, ValueError):
# Invalid patient ID, ignore
pass
return initial
def form_valid(self, form):
"""Validate consent before creating appointment."""
from core.services import ConsentService
from core.models import Patient
# Get patient from form
patient = form.cleaned_data.get('patient')
if not patient:
messages.error(self.request, _("Patient is required to create an appointment."))
return self.form_invalid(form)
# Get service type from clinic specialty
clinic = form.cleaned_data.get('clinic')
service_type = self._get_service_type_from_clinic(clinic)
# Check consent before creating appointment
has_consent, consent_message = ConsentService.verify_consent_for_service(
patient, service_type
)
if not has_consent:
# Get missing consents for detailed feedback
missing_consents = ConsentService.get_missing_consents(patient, service_type)
# Build error message
error_msg = _(
"Cannot create appointment: {message}. "
"Patient must sign required consent forms before booking."
).format(message=consent_message)
if missing_consents:
error_msg += " " + _("Missing consent types: {types}.").format(
types=', '.join(missing_consents)
)
messages.error(self.request, error_msg)
# Add a button/link to create consent in the message
messages.warning(
self.request,
_("Please create and sign the required consent forms for this patient first. "
"Go to patient detail page → Consents tab to create consent forms.")
)
# Store form data in session to allow user to return after signing consent
self.request.session['pending_appointment_data'] = {
'patient_id': str(patient.id),
'clinic_id': str(clinic.id) if clinic else None,
'provider_id': str(form.cleaned_data.get('provider').id) if form.cleaned_data.get('provider') else None,
'scheduled_date': str(form.cleaned_data.get('scheduled_date')) if form.cleaned_data.get('scheduled_date') else None,
'scheduled_time': str(form.cleaned_data.get('scheduled_time')) if form.cleaned_data.get('scheduled_time') else None,
'service_type': form.cleaned_data.get('service_type', ''),
'notes': form.cleaned_data.get('notes', ''),
}
# Redirect to patient detail page with consent tab
redirect_url = reverse_lazy('core:patient_detail', kwargs={'pk': patient.pk})
redirect_url = f"{redirect_url}?tab=consents&missing={','.join(missing_consents)}&return_to=appointment_create"
return redirect(redirect_url)
# Consent verified, proceed with appointment creation
return self._create_appointment(form)
def _get_service_type_from_clinic(self, clinic):
"""Get service type from clinic specialty."""
if not clinic:
return 'MEDICAL' # Default
# Map clinic specialty to service type
specialty_to_service = {
'MEDICAL': 'MEDICAL',
'NURSING': 'NURSING',
'ABA': 'ABA',
'OT': 'OT',
'SLP': 'SLP',
'PSYCHOLOGY': 'PSYCHOLOGY',
'PHYSIOTHERAPY': 'PHYSIOTHERAPY',
'NUTRITION': 'NUTRITION',
}
return specialty_to_service.get(clinic.specialty, 'MEDICAL')
def _create_appointment(self, form):
"""Create appointment after consent validation."""
# Set tenant
form.instance.tenant = self.request.user.tenant
# Generate appointment number
if not form.instance.appointment_number:
form.instance.appointment_number = self._generate_appointment_number()
# Set initial status
form.instance.status = Appointment.Status.BOOKED
# Handle package selection
package_purchase = form.cleaned_data.get('package_purchase')
if package_purchase:
patient = form.cleaned_data.get('patient')
# CRITICAL: Verify package belongs to this patient
if package_purchase.patient != patient:
messages.error(
self.request,
_('Selected package does not belong to this patient. Please select a valid package.')
)
return self.form_invalid(form)
# Verify package has remaining sessions
if package_purchase.sessions_remaining <= 0:
messages.error(self.request, _('Selected package has no remaining sessions'))
return self.form_invalid(form)
# Verify package is not expired
if package_purchase.is_expired:
messages.error(self.request, _('Selected package has expired'))
return self.form_invalid(form)
# Link appointment to package
form.instance.package_purchase = package_purchase
# Use atomic transaction to get correct session number
from django.db import transaction
with transaction.atomic():
# Get the maximum session number for this package and add 1
max_session = Appointment.objects.filter(
package_purchase=package_purchase
).aggregate(
max_num=models.Max('session_number_in_package')
)['max_num']
form.instance.session_number_in_package = (max_session or 0) + 1
# Add package info to notes
if form.instance.notes:
form.instance.notes += f"\n\n[Package: {package_purchase.package.name_en}, Session {form.instance.session_number_in_package}/{package_purchase.total_sessions}]"
else:
form.instance.notes = f"Package: {package_purchase.package.name_en}, Session {form.instance.session_number_in_package}/{package_purchase.total_sessions}"
# Save appointment
response = super().form_valid(form)
# Clear pending appointment data from session
if 'pending_appointment_data' in self.request.session:
del self.request.session['pending_appointment_data']
# Send confirmation notification (async in production)
self._send_confirmation_notification()
# Update success message
if package_purchase:
self.success_message = _(
"Appointment created successfully! Number: {appointment_number}. "
"Package session {session_num}/{total_sessions}"
).format(
appointment_number=self.object.appointment_number,
session_num=self.object.session_number_in_package,
total_sessions=package_purchase.total_sessions
)
else:
self.success_message = self.success_message.format(
appointment_number=self.object.appointment_number
)
return response
def get_success_url(self):
"""Redirect to appointment detail."""
return reverse_lazy('appointments:appointment_detail', kwargs={'pk': self.object.pk})
def _generate_appointment_number(self):
"""Generate unique appointment number."""
import random
tenant = self.request.user.tenant
year = timezone.now().year
for _ in range(10):
random_num = random.randint(10000, 99999)
number = f"APT-{tenant.code}-{year}-{random_num}"
if not Appointment.objects.filter(appointment_number=number).exists():
return number
# Fallback
timestamp = int(timezone.now().timestamp())
return f"APT-{tenant.code}-{year}-{timestamp}"
def _send_confirmation_notification(self):
"""Send appointment confirmation notification."""
# TODO: Implement notification sending
# This would use the notifications app to send SMS/WhatsApp/Email
pass
def get_context_data(self, **kwargs):
"""Add form title to context."""
context = super().get_context_data(**kwargs)
context['form_title'] = _('Create New Appointment')
context['submit_text'] = _('Create Appointment')
# Check if returning from consent creation
if 'pending_appointment_data' in self.request.session:
context['has_pending_data'] = True
messages.info(
self.request,
_("Consent forms have been signed. You can now complete the appointment booking.")
)
return context
class AppointmentUpdateView(LoginRequiredMixin, RolePermissionMixin, TenantFilterMixin,
AuditLogMixin, SuccessMessageMixin, UpdateView):
"""
Appointment update view.
Features:
- Update appointment details
- Cannot change status (use state transition views)
- Audit trail
"""
model = Appointment
form_class = AppointmentForm
template_name = 'appointments/appointment_form.html'
success_message = _("Appointment updated successfully!")
allowed_roles = [User.Role.ADMIN, User.Role.FRONT_DESK, User.Role.DOCTOR,
User.Role.NURSE, User.Role.OT, User.Role.SLP, User.Role.ABA]
def get_success_url(self):
"""Redirect to appointment detail."""
return reverse_lazy('appointments:appointment_detail', kwargs={'pk': self.object.pk})
def get_form(self, form_class=None):
"""Disable status field."""
form = super().get_form(form_class)
# Make status read-only (use state transition views instead)
if 'status' in form.fields:
form.fields['status'].disabled = True
form.fields['status'].help_text = 'Use action buttons to change status'
return form
def get_context_data(self, **kwargs):
"""Add form title to context."""
context = super().get_context_data(**kwargs)
context['form_title'] = _('Update Appointment: %(number)s') % {'number': self.object.appointment_number}
context['submit_text'] = _('Update Appointment')
return context
# State Machine Transition Views
class AppointmentConfirmView(LoginRequiredMixin, RolePermissionMixin, AuditLogMixin, View):
"""
Confirm appointment (BOOKED → CONFIRMED).
Features:
- Update status to CONFIRMED
- Set confirmation timestamp
- Send confirmation notification
"""
allowed_roles = [User.Role.ADMIN, User.Role.FRONT_DESK]
def post(self, request, pk):
"""Confirm appointment."""
appointment = get_object_or_404(
Appointment,
pk=pk,
tenant=request.user.tenant
)
# Check if transition is valid
if appointment.status != Appointment.Status.BOOKED:
messages.error(request, _('Appointment cannot be confirmed from current status.'))
return redirect('appointments:appointment_detail', pk=pk)
# Update status
appointment.status = Appointment.Status.CONFIRMED
appointment.confirmation_at = timezone.now()
appointment.save()
# Send notification
self._send_notification(appointment, 'confirmed')
messages.success(request, _('Appointment confirmed successfully!'))
return redirect('appointments:appointment_detail', pk=pk)
def _send_notification(self, appointment, event_type):
"""Send notification for appointment event."""
# TODO: Implement notification
pass
class AppointmentRescheduleView(LoginRequiredMixin, RolePermissionMixin, AuditLogMixin,
SuccessMessageMixin, UpdateView):
"""
Reschedule appointment.
Features:
- Change date/time
- Record reschedule reason
- Update status to RESCHEDULED
- Send notification
"""
model = Appointment
form_class = RescheduleForm
template_name = 'appointments/appointment_reschedule.html'
success_message = _("Appointment rescheduled successfully!")
allowed_roles = [User.Role.ADMIN, User.Role.FRONT_DESK]
def get_success_url(self):
"""Redirect to appointment detail."""
return reverse_lazy('appointments:appointment_detail', kwargs={'pk': self.object.pk})
def form_valid(self, form):
"""Update status and send notification."""
# Update status
form.instance.status = Appointment.Status.RESCHEDULED
form.instance.reschedule_at = timezone.now()
# Save
response = super().form_valid(form)
# Send notification
self._send_notification(self.object, 'rescheduled')
return response
def _send_notification(self, appointment, event_type):
"""Send notification."""
# TODO: Implement notification
pass
class AppointmentCancelView(LoginRequiredMixin, RolePermissionMixin, AuditLogMixin, View):
"""
Cancel appointment.
Features:
- Update status to CANCELLED
- Record cancellation reason
- Send notification
"""
allowed_roles = [User.Role.ADMIN, User.Role.FRONT_DESK, User.Role.DOCTOR,
User.Role.NURSE, User.Role.OT, User.Role.SLP, User.Role.ABA]
def post(self, request, pk):
"""Cancel appointment."""
appointment = get_object_or_404(
Appointment,
pk=pk,
tenant=request.user.tenant
)
# Get cancellation reason
cancel_reason = request.POST.get('cancel_reason', '')
# Update status
appointment.status = Appointment.Status.CANCELLED
appointment.cancel_reason = cancel_reason
appointment.cancel_at = timezone.now()
appointment.save()
# Send notification
self._send_notification(appointment, 'cancelled')
messages.success(request, _('Appointment cancelled successfully!'))
return redirect('appointments:appointment_detail', pk=pk)
def _send_notification(self, appointment, event_type):
"""Send notification."""
# TODO: Implement notification
pass
class AppointmentArriveView(LoginRequiredMixin, RolePermissionMixin, AuditLogMixin, View):
"""
Mark patient as arrived (CONFIRMED → ARRIVED).
Features:
- Check for paid invoice before arrival
- Update status to ARRIVED
- Set arrival timestamp
- Trigger check-in workflow
"""
allowed_roles = [User.Role.ADMIN, User.Role.FRONT_DESK]
def post(self, request, pk):
"""Mark patient as arrived."""
appointment = get_object_or_404(
Appointment,
pk=pk,
tenant=request.user.tenant
)
# Check if transition is valid
if appointment.status != Appointment.Status.CONFIRMED:
messages.error(request, _('Patient can only arrive for confirmed appointments.'))
return redirect('appointments:appointment_detail', pk=pk)
# Check if there's a paid invoice for this appointment
from finance.models import Invoice
paid_invoice = Invoice.objects.filter(
appointment=appointment,
status=Invoice.Status.PAID
).first()
if not paid_invoice:
# No paid invoice found, redirect to invoice creation
messages.warning(
request,
_('No paid invoice found for this appointment. Please create and process an invoice before marking the patient as arrived.')
)
# Redirect to invoice creation with pre-populated data
return redirect(f"{reverse_lazy('finance:invoice_create')}?patient={appointment.patient.pk}&appointment={appointment.pk}")
# Update status
appointment.status = Appointment.Status.ARRIVED
appointment.arrival_at = timezone.now()
appointment.save()
messages.success(request, _('Patient marked as arrived!'))
# Automatically redirect to create the appropriate clinical session
return self._redirect_to_create_clinical_session(appointment)
def _redirect_to_create_clinical_session(self, appointment):
"""Redirect to create appropriate clinical session based on clinic specialty."""
clinic_specialty = appointment.clinic.specialty
if clinic_specialty == 'MEDICAL':
# Check if it's initial consultation or follow-up
from medical.models import MedicalConsultation
has_previous_consult = MedicalConsultation.objects.filter(
patient=appointment.patient
).exists()
if has_previous_consult:
return redirect('medical:followup_create') + f'?appointment={appointment.pk}'
else:
return redirect('medical:consultation_create') + f'?appointment={appointment.pk}'
elif clinic_specialty == 'NURSING':
return redirect('nursing:encounter_create') + f'?appointment={appointment.pk}'
elif clinic_specialty == 'ABA':
return redirect('aba:consult_create') + f'?appointment={appointment.pk}'
elif clinic_specialty == 'OT':
# Check if it's consultation or session
from ot.models import OTConsult
has_previous_consult = OTConsult.objects.filter(
patient=appointment.patient
).exists()
if has_previous_consult:
return redirect('ot:session_create') + f'?appointment={appointment.pk}'
else:
return redirect('ot:consult_create') + f'?appointment={appointment.pk}'
elif clinic_specialty == 'SLP':
# Check if it's consultation, assessment, or intervention
from slp.models import SLPConsult
has_previous_consult = SLPConsult.objects.filter(
patient=appointment.patient
).exists()
if has_previous_consult:
# Could be assessment or intervention - default to intervention
return redirect('slp:intervention_create') + f'?appointment={appointment.pk}'
else:
return redirect('slp:consult_create') + f'?appointment={appointment.pk}'
elif clinic_specialty == 'PSYCHOLOGY':
return redirect('psychology:consult_create') + f'?appointment={appointment.pk}'
else:
# Unknown specialty, just go back to appointment detail
return redirect('appointments:appointment_detail', pk=appointment.pk)
class AppointmentStartView(LoginRequiredMixin, RolePermissionMixin, AuditLogMixin, View):
"""
Start appointment (ARRIVED → IN_PROGRESS).
Features:
- Update status to IN_PROGRESS
- Set start timestamp
- Redirect to appropriate clinical form
"""
allowed_roles = [User.Role.DOCTOR, User.Role.NURSE, User.Role.OT,
User.Role.SLP, User.Role.ABA]
def post(self, request, pk):
"""Start appointment."""
appointment = get_object_or_404(
Appointment,
pk=pk,
tenant=request.user.tenant,
provider=request.user # Only provider can start their appointment
)
# Check if transition is valid
if appointment.status != Appointment.Status.ARRIVED:
messages.error(request, _('Appointment can only be started after patient arrives.'))
return redirect('appointments:appointment_detail', pk=pk)
# Update status
appointment.status = Appointment.Status.IN_PROGRESS
appointment.start_at = timezone.now()
appointment.save()
messages.success(request, _('Appointment started!'))
# Redirect to appropriate clinical form based on clinic
return self._redirect_to_clinical_form(appointment)
def _redirect_to_clinical_form(self, appointment):
"""Redirect to appropriate clinical form based on clinic specialty."""
clinic_specialty = appointment.clinic.specialty
if clinic_specialty == Clinic.Specialty.MEDICAL:
return redirect('medical:consultation_create', appointment_id=appointment.pk)
elif clinic_specialty == Clinic.Specialty.NURSING:
return redirect('nursing:encounter_create', appointment_id=appointment.pk)
elif clinic_specialty == Clinic.Specialty.ABA:
return redirect('aba:consult_create', appointment_id=appointment.pk)
elif clinic_specialty == Clinic.Specialty.OT:
return redirect('ot:consult_create', appointment_id=appointment.pk)
elif clinic_specialty == Clinic.Specialty.SLP:
return redirect('slp:consult_create', appointment_id=appointment.pk)
else:
return redirect('appointments:appointment_detail', pk=appointment.pk)
class AppointmentCompleteView(LoginRequiredMixin, RolePermissionMixin, AuditLogMixin, View):
"""
Complete appointment (IN_PROGRESS → COMPLETED).
Features:
- Update status to COMPLETED
- Set end timestamp
- Trigger post-appointment workflow (invoice, follow-up)
"""
allowed_roles = [User.Role.DOCTOR, User.Role.NURSE, User.Role.OT,
User.Role.SLP, User.Role.ABA]
def post(self, request, pk):
"""Complete appointment."""
appointment = get_object_or_404(
Appointment,
pk=pk,
tenant=request.user.tenant,
provider=request.user
)
# Check if transition is valid
if appointment.status != Appointment.Status.IN_PROGRESS:
messages.error(request, _('Only in-progress appointments can be completed.'))
return redirect('appointments:appointment_detail', pk=pk)
# Update status
appointment.status = Appointment.Status.COMPLETED
appointment.end_at = timezone.now()
appointment.save()
# Trigger post-appointment workflow
self._trigger_post_appointment_workflow(appointment)
messages.success(request, _('Appointment completed successfully!'))
return redirect('appointments:appointment_detail', pk=pk)
def _trigger_post_appointment_workflow(self, appointment):
"""Trigger post-appointment tasks."""
# TODO: Implement post-appointment workflow
# - Create invoice
# - Schedule follow-up
# - Send satisfaction survey
pass
class AppointmentNoShowView(LoginRequiredMixin, RolePermissionMixin, AuditLogMixin, View):
"""
Mark appointment as no-show.
Features:
- Update status to NO_SHOW
- Record timestamp
- Send notification
"""
allowed_roles = [User.Role.ADMIN, User.Role.FRONT_DESK]
def post(self, request, pk):
"""Mark as no-show."""
appointment = get_object_or_404(
Appointment,
pk=pk,
tenant=request.user.tenant
)
# Update status
appointment.status = Appointment.Status.NO_SHOW
appointment.no_show_at = timezone.now()
appointment.save()
# Send notification
self._send_notification(appointment, 'no_show')
messages.warning(request, _('Appointment marked as no-show.'))
return redirect('appointments:appointment_detail', pk=pk)
def _send_notification(self, appointment, event_type):
"""Send notification."""
# TODO: Implement notification
pass
# ============================================================================
# Patient Confirmation Views (Phase 5)
# ============================================================================
class ConfirmAppointmentView(View):
"""
Patient-facing appointment confirmation view.
Features:
- Public access (no login required)
- Token-based authentication
- Confirm or decline appointment
- Mobile-friendly interface
- Metadata tracking (IP, user agent)
"""
def get(self, request, token):
"""Display confirmation page."""
from .confirmation_service import ConfirmationService
# Get confirmation by token
confirmation = ConfirmationService.get_confirmation_by_token(token)
if not confirmation:
return self._render_error(
request,
_('Invalid Confirmation Link'),
_('This confirmation link is invalid or has expired. Please contact the clinic.')
)
# Check if already processed
if confirmation.status in ['CONFIRMED', 'DECLINED']:
return self._render_already_processed(request, confirmation)
# Check if expired
if confirmation.is_expired:
return self._render_error(
request,
_('Link Expired'),
_('This confirmation link has expired. Please contact the clinic to reschedule.')
)
# Render confirmation page
context = {
'confirmation': confirmation,
'appointment': confirmation.appointment,
'patient': confirmation.appointment.patient,
'token': token,
}
return render(request, 'appointments/confirm_appointment.html', context)
def post(self, request, token):
"""Process confirmation or decline."""
from .confirmation_service import ConfirmationService
# Get confirmation
confirmation = ConfirmationService.get_confirmation_by_token(token)
if not confirmation:
return JsonResponse({
'success': False,
'error': _('Invalid confirmation link')
}, status=400)
# Get action (confirm or decline)
action = request.POST.get('action')
if action == 'confirm':
# Confirm appointment
success, message = ConfirmationService.confirm_appointment(
confirmation=confirmation,
method='LINK',
ip_address=self._get_client_ip(request),
user_agent=request.META.get('HTTP_USER_AGENT', '')
)
if success:
return self._render_success(request, confirmation, 'confirmed')
else:
return self._render_error(request, _('Confirmation Failed'), message)
elif action == 'decline':
# Get decline reason
reason = request.POST.get('reason', 'Patient declined')
# Decline appointment
success, message = ConfirmationService.decline_appointment(
confirmation=confirmation,
reason=reason,
ip_address=self._get_client_ip(request),
user_agent=request.META.get('HTTP_USER_AGENT', '')
)
if success:
return self._render_success(request, confirmation, 'declined')
else:
return self._render_error(request, _('Decline Failed'), message)
else:
return JsonResponse({
'success': False,
'error': _('Invalid action')
}, status=400)
def _render_error(self, request, title, message):
"""Render error page."""
context = {
'error_title': title,
'error_message': message,
}
return render(request, 'appointments/confirmation_error.html', context)
def _render_success(self, request, confirmation, action):
"""Render success page."""
context = {
'confirmation': confirmation,
'appointment': confirmation.appointment,
'action': action,
}
return render(request, 'appointments/confirmation_success.html', context)
def _render_already_processed(self, request, confirmation):
"""Render already processed page."""
context = {
'confirmation': confirmation,
'appointment': confirmation.appointment,
'status': confirmation.status,
}
return render(request, 'appointments/confirmation_already_processed.html', context)
def _get_client_ip(self, request):
"""Get client IP address."""
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0]
else:
ip = request.META.get('REMOTE_ADDR')
return ip
# ============================================================================
# Availability API Views
# ============================================================================
class AvailableSlotsView(LoginRequiredMixin, View):
"""
API endpoint to get available time slots for a provider on a specific date.
Features:
- Returns JSON list of available slots
- Checks provider schedule
- Excludes booked appointments
- Supports custom duration
"""
def get(self, request):
"""Get available slots."""
from datetime import datetime
from .availability_service import AvailabilityService
# Get parameters
provider_id = request.GET.get('provider')
date_str = request.GET.get('date')
duration = int(request.GET.get('duration', 30))
# Validate parameters
if not provider_id or not date_str:
return JsonResponse({
'success': False,
'error': _('Provider and date are required')
}, status=400)
try:
# Parse date
date = datetime.strptime(date_str, '%Y-%m-%d').date()
# Get available slots (now returns dict with slots and metadata)
result = AvailabilityService.get_available_slots(
provider_id=provider_id,
date=date,
duration=duration
)
# Return enhanced response with reason codes and metadata
return JsonResponse({
'success': True,
'slots': result.get('slots', []),
'reason': result.get('reason'),
'message': result.get('message'),
'working_days': result.get('working_days'),
'provider_name': result.get('provider_name'),
'total_slots': result.get('total_slots'),
'booked_slots': result.get('booked_slots'),
'provider_id': provider_id,
'date': date_str,
'duration': duration
})
except ValueError as e:
return JsonResponse({
'success': False,
'error': _('Invalid date format: %(error)s') % {'error': str(e)}
}, status=400)
except Exception as e:
return JsonResponse({
'success': False,
'error': _('Error getting available slots: %(error)s') % {'error': str(e)}
}, status=500)
class CheckConsentStatusView(LoginRequiredMixin, View):
"""
API endpoint to check consent status for a patient and service type.
Features:
- Returns consent status (valid/missing)
- Lists missing consent types
- Provides consent creation URL
"""
def get(self, request):
"""Check consent status."""
from core.services import ConsentService
from core.models import Patient, Clinic
# Get parameters
patient_id = request.GET.get('patient')
clinic_id = request.GET.get('clinic')
# Validate parameters
if not patient_id:
return JsonResponse({
'success': False,
'error': _('Patient is required')
}, status=400)
if not clinic_id:
return JsonResponse({
'success': False,
'error': _('Clinic is required')
}, status=400)
try:
# Get patient
patient = Patient.objects.get(
id=patient_id,
tenant=request.user.tenant
)
# Get clinic
clinic = Clinic.objects.get(
id=clinic_id,
tenant=request.user.tenant
)
# Get service type from clinic specialty
service_type = self._get_service_type_from_clinic(clinic)
# Check consent
has_consent, consent_message = ConsentService.verify_consent_for_service(
patient, service_type
)
# Get missing consents
missing_consents = ConsentService.get_missing_consents(patient, service_type)
# Get the first missing consent type to pre-populate
consent_type_param = f"&consent_type={missing_consents[0]}" if missing_consents else ""
# Build response
response_data = {
'success': True,
'has_consent': has_consent,
'message': consent_message,
'missing_consents': missing_consents,
'patient_id': str(patient.id),
'patient_name': patient.full_name_en,
'service_type': service_type,
'consent_url': f"{reverse_lazy('core:consent_create')}?patient={patient.pk}{consent_type_param}"
}
return JsonResponse(response_data)
except Patient.DoesNotExist:
return JsonResponse({
'success': False,
'error': _('Patient not found')
}, status=404)
except Clinic.DoesNotExist:
return JsonResponse({
'success': False,
'error': _('Clinic not found')
}, status=404)
except Exception as e:
return JsonResponse({
'success': False,
'error': _('Error checking consent: %(error)s') % {'error': str(e)}
}, status=500)
def _get_service_type_from_clinic(self, clinic):
"""Get service type from clinic specialty."""
if not clinic:
return 'MEDICAL'
specialty_to_service = {
'MEDICAL': 'MEDICAL',
'NURSING': 'NURSING',
'ABA': 'ABA',
'OT': 'OT',
'SLP': 'SLP',
'PSYCHOLOGY': 'PSYCHOLOGY',
'PHYSIOTHERAPY': 'PHYSIOTHERAPY',
'NUTRITION': 'NUTRITION',
}
return specialty_to_service.get(clinic.specialty, 'MEDICAL')
class AvailableRoomsView(LoginRequiredMixin, View):
"""
API endpoint to get available rooms for a clinic at a specific date/time.
Features:
- Returns only rooms that are available (no conflicts)
- Checks room conflicts based on date, time, and duration
- Filters by clinic
"""
def get(self, request):
"""Get available rooms."""
from datetime import datetime
from .room_conflict_service import RoomAvailabilityService
from core.models import Clinic
# Get parameters
clinic_id = request.GET.get('clinic')
date_str = request.GET.get('date')
time_str = request.GET.get('time')
duration = int(request.GET.get('duration', 30))
# Validate parameters
if not clinic_id:
return JsonResponse({
'success': False,
'error': _('Clinic is required')
}, status=400)
try:
# Get clinic
clinic = Clinic.objects.get(
id=clinic_id,
tenant=request.user.tenant
)
# If date and time provided, filter by availability
if date_str and time_str:
try:
# Parse date and time
scheduled_date = datetime.strptime(date_str, '%Y-%m-%d').date()
scheduled_time = datetime.strptime(time_str, '%H:%M').time()
# Get available rooms using the service
available_rooms = RoomAvailabilityService.get_available_rooms(
clinic=clinic,
scheduled_date=scheduled_date,
scheduled_time=scheduled_time,
duration=duration,
tenant=request.user.tenant
)
except ValueError as e:
return JsonResponse({
'success': False,
'error': _('Invalid date or time format: %(error)s') % {'error': str(e)}
}, status=400)
else:
# No date/time provided, return all rooms for clinic
# Room is already imported via 'from .models import *' at top of file
available_rooms = Room.objects.filter(
clinic=clinic,
tenant=request.user.tenant
).filter(
Q(is_available=True) | Q(is_available__isnull=True)
)
# Build response
rooms_data = []
for room in available_rooms:
room_data = {
'id': str(room.id),
'room_number': room.room_number,
'name': room.name,
}
# Add optional fields if they exist
if hasattr(room, 'room_type'):
room_data['room_type'] = room.room_type
if hasattr(room, 'capacity'):
room_data['capacity'] = room.capacity
rooms_data.append(room_data)
return JsonResponse({
'success': True,
'rooms': rooms_data,
'clinic_id': str(clinic.id),
'date': date_str,
'time': time_str,
'duration': duration
})
except Clinic.DoesNotExist:
return JsonResponse({
'success': False,
'error': _('Clinic not found')
}, status=404)
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"Error in AvailableRoomsView: {error_details}")
return JsonResponse({
'success': False,
'error': _('Error getting available rooms: %(error)s') % {'error': str(e)}
}, status=500)
class AppointmentQuickViewView(LoginRequiredMixin, TenantFilterMixin, DetailView):
"""
Quick view for appointment details (used in calendar modal).
Returns a partial HTML template for AJAX loading.
"""
model = Appointment
template_name = 'appointments/appointment_quick_view.html'
context_object_name = 'appointment'
def get_queryset(self):
"""Filter by tenant."""
return super().get_queryset().select_related(
'patient', 'clinic', 'provider__user', 'room'
)
class AppointmentEventsView(LoginRequiredMixin, TenantFilterMixin, View):
"""
API endpoint for calendar events.
Returns appointments in FullCalendar-compatible JSON format.
"""
def get(self, request):
"""Get appointments as calendar events."""
from datetime import datetime, timedelta
# Get date range from query params
start_str = request.GET.get('start')
end_str = request.GET.get('end')
if not start_str or not end_str:
return JsonResponse({'error': _('start and end parameters are required')}, status=400)
# Parse dates
try:
start_date = datetime.fromisoformat(start_str.replace('Z', '+00:00')).date()
end_date = datetime.fromisoformat(end_str.replace('Z', '+00:00')).date()
except ValueError:
return JsonResponse({'error': _('Invalid date format')}, status=400)
# Get appointments in date range
queryset = Appointment.objects.filter(
tenant=request.user.tenant,
scheduled_date__gte=start_date,
scheduled_date__lte=end_date
).select_related('patient', 'provider__user', 'clinic', 'room')
# Apply filters
service = request.GET.get('service', '').strip()
if service:
queryset = queryset.filter(service_type__icontains=service)
provider_id = request.GET.get('provider', '').strip()
if provider_id:
queryset = queryset.filter(provider_id=provider_id)
status_filter = request.GET.get('status', '').strip()
if status_filter:
queryset = queryset.filter(status=status_filter)
# Build events list
events = []
for appointment in queryset:
# Calculate end time
start_datetime = datetime.combine(
appointment.scheduled_date,
appointment.scheduled_time
)
end_datetime = start_datetime + timedelta(minutes=appointment.duration)
events.append({
'id': str(appointment.id),
'patient_name': appointment.patient.full_name_en,
'service_name': appointment.service_type,
'provider_name': appointment.provider.user.get_full_name(),
'room_name': appointment.room.name if appointment.room else '',
'start_time': start_datetime.isoformat(),
'end_time': end_datetime.isoformat(),
'status': appointment.status,
})
return JsonResponse(events, safe=False)
class DeclineAppointmentView(View):
"""
Quick decline view (alternative to confirm page).
Features:
- Direct decline link
- Optional reason
- Confirmation message
"""
def get(self, request, token):
"""Display decline confirmation page."""
from .confirmation_service import ConfirmationService
# Get confirmation
confirmation = ConfirmationService.get_confirmation_by_token(token)
if not confirmation:
return render(request, 'appointments/confirmation_error.html', {
'error_title': _('Invalid Link'),
'error_message': _('This link is invalid or has expired.')
})
# Check if already processed
if confirmation.status in ['CONFIRMED', 'DECLINED']:
context = {
'confirmation': confirmation,
'appointment': confirmation.appointment,
'status': confirmation.status,
}
return render(request, 'appointments/confirmation_already_processed.html', context)
# Render decline form
context = {
'confirmation': confirmation,
'appointment': confirmation.appointment,
'token': token,
}
return render(request, 'appointments/decline_appointment.html', context)
def post(self, request, token):
"""Process decline."""
from .confirmation_service import ConfirmationService
# Get confirmation
confirmation = ConfirmationService.get_confirmation_by_token(token)
if not confirmation:
return JsonResponse({
'success': False,
'error': _('Invalid confirmation link')
}, status=400)
# Get reason
reason = request.POST.get('reason', _('Patient declined'))
# Decline appointment
success, message = ConfirmationService.decline_appointment(
confirmation=confirmation,
reason=reason,
ip_address=self._get_client_ip(request),
user_agent=request.META.get('HTTP_USER_AGENT', '')
)
if success:
context = {
'confirmation': confirmation,
'appointment': confirmation.appointment,
'action': 'declined',
}
return render(request, 'appointments/confirmation_success.html', context)
else:
return render(request, 'appointments/confirmation_error.html', {
'error_title': _('Decline Failed'),
'error_message': message
})
def _get_client_ip(self, request):
"""Get client IP address."""
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0]
else:
ip = request.META.get('REMOTE_ADDR')
return ip
class AppointmentPDFView(LoginRequiredMixin, TenantFilterMixin, View):
"""
Generate PDF for appointment details.
Features:
- Appointment information
- Patient details
- Provider and clinic information
- Instructions from clinical documents if available
- Professional formatting with Arabic support
"""
def get(self, request, pk):
"""Generate and return PDF."""
from reportlab.lib.pagesizes import A4
from reportlab.lib.units import inch
from reportlab.lib import colors
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer, Image
from reportlab.pdfgen import canvas
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
from reportlab.lib.enums import TA_CENTER, TA_RIGHT, TA_LEFT
from io import BytesIO
import os
from django.conf import settings
import arabic_reshaper
from bidi.algorithm import get_display
# Get appointment
appointment = get_object_or_404(
Appointment.objects.select_related(
'patient', 'provider__user', 'clinic', 'room', 'tenant'
),
pk=pk,
tenant=request.user.tenant
)
# Create PDF buffer
buffer = BytesIO()
# Create PDF document
doc = SimpleDocTemplate(
buffer,
pagesize=A4,
rightMargin=0.75*inch,
leftMargin=0.75*inch,
topMargin=1.5*inch,
bottomMargin=0.75*inch
)
# Container for PDF elements
elements = []
# Register Arabic font
try:
pdfmetrics.registerFont(TTFont('Arabic', '/System/Library/Fonts/SFArabic.ttf'))
ARABIC_FONT_AVAILABLE = True
except Exception as e:
ARABIC_FONT_AVAILABLE = False
# Styles
styles = getSampleStyleSheet()
# Helper function for Arabic text
def format_arabic(text):
"""Format Arabic text for proper display in PDF."""
if not text:
return ""
reshaped_text = arabic_reshaper.reshape(text)
return get_display(reshaped_text)
# Custom styles
title_style = ParagraphStyle(
'CustomTitle',
parent=styles['Heading1'],
fontSize=18,
textColor=colors.HexColor('#0d6efd'),
spaceAfter=20,
alignment=TA_CENTER
)
heading_style = ParagraphStyle(
'CustomHeading',
parent=styles['Heading2'],
fontSize=14,
textColor=colors.HexColor('#212529'),
spaceAfter=12,
spaceBefore=12
)
normal_style = styles['Normal']
# Header with logo and tenant info
tenant = appointment.tenant
header_data = []
# Try to add logo if available from tenant settings
logo_path = None
try:
from core.models import TenantSetting, SettingTemplate
logo_setting = TenantSetting.objects.filter(
tenant=tenant,
template__key='basic_logo'
).first()
if logo_setting and logo_setting.file_value:
logo_path = os.path.join(settings.MEDIA_ROOT, str(logo_setting.file_value))
if os.path.exists(logo_path):
logo = Image(logo_path, width=0.8*inch, height=0.8*inch)
logo.hAlign = 'LEFT'
else:
logo_path = None
except Exception as e:
# If logo retrieval fails, continue without logo
logo_path = None
# Create header table
if logo_path:
tenant_info_html = f'<b>{tenant.name}</b><br/>'
if tenant.name_ar and ARABIC_FONT_AVAILABLE:
tenant_info_html += f'<font name="Arabic" size=11>{format_arabic(tenant.name_ar)}</font><br/>'
header_data = [[logo, Paragraph(tenant_info_html, ParagraphStyle(
'TenantInfo',
parent=styles['Normal'],
fontSize=12,
alignment=TA_CENTER
))]]
header_table = Table(header_data, colWidths=[2*inch, 4*inch])
header_table.setStyle(TableStyle([
('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
('ALIGN', (0, 0), (0, 0), 'LEFT'),
('ALIGN', (1, 0), (1, 0), 'CENTER'),
]))
elements.append(header_table)
else:
# No logo, just tenant name
tenant_name_html = f'<b>{tenant.name}</b><br/>'
if tenant.name_ar and ARABIC_FONT_AVAILABLE:
tenant_name_html += f'<font name="Arabic" size=14>{format_arabic(tenant.name_ar)}</font>'
tenant_name = Paragraph(tenant_name_html,
ParagraphStyle('TenantName', parent=styles['Heading1'], fontSize=16, alignment=TA_CENTER))
elements.append(tenant_name)
elements.append(Spacer(1, 0.15*inch))
# Title
title_html = f"Appointment Details - {appointment.appointment_number}<br/>"
if ARABIC_FONT_AVAILABLE:
title_html += f'<font name="Arabic" size=16>{format_arabic("تفاصيل الموعد")}</font>'
title = Paragraph(title_html, title_style)
elements.append(title)
elements.append(Spacer(1, 0.15*inch))
# Appointment Information Section
heading_html = "Appointment Information / "
if ARABIC_FONT_AVAILABLE:
heading_html += f'<font name="Arabic" size=12>{format_arabic("معلومات الموعد")}</font>'
elements.append(Paragraph(heading_html, heading_style))
# Build appointment data with Arabic font support using Paragraphs
appointment_data = []
cell_style = ParagraphStyle('Cell', parent=styles['Normal'], fontSize=10)
label_style = ParagraphStyle('Label', parent=styles['Normal'], fontSize=10, fontName='Helvetica-Bold')
label_html = "Appointment Number"
if ARABIC_FONT_AVAILABLE:
label_html += f' / <font name="Arabic" size=9>{format_arabic("رقم الموعد")}</font>'
appointment_data.append([
Paragraph(label_html + ':', label_style),
Paragraph(appointment.appointment_number, cell_style)
])
label_html = "Status"
if ARABIC_FONT_AVAILABLE:
label_html += f' / <font name="Arabic" size=9>{format_arabic("الحالة")}</font>'
appointment_data.append([
Paragraph(label_html + ':', label_style),
Paragraph(appointment.get_status_display(), cell_style)
])
label_html = "Service Type"
if ARABIC_FONT_AVAILABLE:
label_html += f' / <font name="Arabic" size=9>{format_arabic("نوع الخدمة")}</font>'
appointment_data.append([
Paragraph(label_html + ':', label_style),
Paragraph(appointment.service_type, cell_style)
])
label_html = "Date"
if ARABIC_FONT_AVAILABLE:
label_html += f' / <font name="Arabic" size=9>{format_arabic("التاريخ")}</font>'
appointment_data.append([
Paragraph(label_html + ':', label_style),
Paragraph(appointment.scheduled_date.strftime('%A, %B %d, %Y'), cell_style)
])
label_html = "Time"
if ARABIC_FONT_AVAILABLE:
label_html += f' / <font name="Arabic" size=9>{format_arabic("الوقت")}</font>'
appointment_data.append([
Paragraph(label_html + ':', label_style),
Paragraph(f"{appointment.scheduled_time.strftime('%H:%M')} ({appointment.duration} minutes)", cell_style)
])
label_html = "Clinic"
if ARABIC_FONT_AVAILABLE:
label_html += f' / <font name="Arabic" size=9>{format_arabic("العيادة")}</font>'
clinic_value = appointment.clinic.name_en
if appointment.clinic.name_ar and ARABIC_FONT_AVAILABLE:
clinic_value += f' / <font name="Arabic" size=9>{format_arabic(appointment.clinic.name_ar)}</font>'
appointment_data.append([
Paragraph(label_html + ':', label_style),
Paragraph(clinic_value, cell_style)
])
label_html = "Room"
if ARABIC_FONT_AVAILABLE:
label_html += f' / <font name="Arabic" size=9>{format_arabic("الغرفة")}</font>'
room_value = f"{appointment.room.room_number} - {appointment.room.name}" if appointment.room else 'Not assigned'
appointment_data.append([
Paragraph(label_html + ':', label_style),
Paragraph(room_value, cell_style)
])
label_html = "Provider"
if ARABIC_FONT_AVAILABLE:
label_html += f' / <font name="Arabic" size=9>{format_arabic("مقدم الخدمة")}</font>'
appointment_data.append([
Paragraph(label_html + ':', label_style),
Paragraph(f"{appointment.provider.user.get_full_name()} ({appointment.provider.user.get_role_display()})", cell_style)
])
appointment_table = Table(appointment_data, colWidths=[2.5*inch, 3.5*inch])
appointment_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (0, -1), colors.HexColor('#f8f9fa')),
('TEXTCOLOR', (0, 0), (-1, -1), colors.black),
('ALIGN', (0, 0), (0, -1), 'RIGHT'),
('ALIGN', (1, 0), (1, -1), 'LEFT'),
('FONTNAME', (0, 0), (0, -1), 'Helvetica-Bold'),
('FONTNAME', (1, 0), (1, -1), 'Helvetica'),
('FONTSIZE', (0, 0), (-1, -1), 10),
('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
('LEFTPADDING', (0, 0), (-1, -1), 8),
('RIGHTPADDING', (0, 0), (-1, -1), 8),
('TOPPADDING', (0, 0), (-1, -1), 6),
('BOTTOMPADDING', (0, 0), (-1, -1), 6),
]))
elements.append(appointment_table)
elements.append(Spacer(1, 0.3*inch))
# Patient Information Section
heading_html = "Patient Information / "
if ARABIC_FONT_AVAILABLE:
heading_html += f'<font name="Arabic" size=12>{format_arabic("معلومات المريض")}</font>'
elements.append(Paragraph(heading_html, heading_style))
patient = appointment.patient
patient_name_ar = f"{patient.first_name_ar} {patient.last_name_ar}" if patient.first_name_ar and patient.last_name_ar else ""
# Build patient data with Arabic font support using Paragraphs
patient_data = []
label_html = "Name"
if ARABIC_FONT_AVAILABLE:
label_html += f' / <font name="Arabic" size=9>{format_arabic("الاسم")}</font>'
patient_value = f"{patient.first_name_en} {patient.last_name_en}"
if patient_name_ar and ARABIC_FONT_AVAILABLE:
patient_value += f' / <font name="Arabic" size=9>{format_arabic(patient_name_ar)}</font>'
patient_data.append([
Paragraph(label_html + ':', label_style),
Paragraph(patient_value, cell_style)
])
label_html = "MRN"
if ARABIC_FONT_AVAILABLE:
label_html += f' / <font name="Arabic" size=9>{format_arabic("رقم السجل الطبي")}</font>'
patient_data.append([
Paragraph(label_html + ':', label_style),
Paragraph(patient.mrn, cell_style)
])
label_html = "Date of Birth"
if ARABIC_FONT_AVAILABLE:
label_html += f' / <font name="Arabic" size=9>{format_arabic("تاريخ الميلاد")}</font>'
patient_data.append([
Paragraph(label_html + ':', label_style),
Paragraph(patient.date_of_birth.strftime('%Y-%m-%d'), cell_style)
])
label_html = "Gender"
if ARABIC_FONT_AVAILABLE:
label_html += f' / <font name="Arabic" size=9>{format_arabic("الجنس")}</font>'
patient_data.append([
Paragraph(label_html + ':', label_style),
Paragraph(patient.get_sex_display(), cell_style)
])
label_html = "Phone"
if ARABIC_FONT_AVAILABLE:
label_html += f' / <font name="Arabic" size=9>{format_arabic("الهاتف")}</font>'
patient_data.append([
Paragraph(label_html + ':', label_style),
Paragraph(str(patient.phone), cell_style)
])
label_html = "Email"
if ARABIC_FONT_AVAILABLE:
label_html += f' / <font name="Arabic" size=9>{format_arabic("البريد الإلكتروني")}</font>'
patient_data.append([
Paragraph(label_html + ':', label_style),
Paragraph(patient.email if patient.email else 'Not provided', cell_style)
])
patient_table = Table(patient_data, colWidths=[2.5*inch, 3.5*inch])
patient_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (0, -1), colors.HexColor('#f8f9fa')),
('TEXTCOLOR', (0, 0), (-1, -1), colors.black),
('ALIGN', (0, 0), (0, -1), 'RIGHT'),
('ALIGN', (1, 0), (1, -1), 'LEFT'),
('FONTNAME', (0, 0), (0, -1), 'Helvetica-Bold'),
('FONTNAME', (1, 0), (1, -1), 'Helvetica'),
('FONTSIZE', (0, 0), (-1, -1), 10),
('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
('LEFTPADDING', (0, 0), (-1, -1), 8),
('RIGHTPADDING', (0, 0), (-1, -1), 8),
('TOPPADDING', (0, 0), (-1, -1), 6),
('BOTTOMPADDING', (0, 0), (-1, -1), 6),
]))
elements.append(patient_table)
elements.append(Spacer(1, 0.3*inch))
# Notes Section (if available)
if appointment.notes:
heading_html = "Notes / "
if ARABIC_FONT_AVAILABLE:
heading_html += f'<font name="Arabic" size=12>{format_arabic("ملاحظات")}</font>'
elements.append(Paragraph(heading_html, heading_style))
notes_text = Paragraph(appointment.notes, normal_style)
elements.append(notes_text)
elements.append(Spacer(1, 0.2*inch))
# Instructions Section (from clinical documents)
instructions = self._get_clinical_instructions(appointment)
if instructions:
heading_html = "Clinical Instructions / "
if ARABIC_FONT_AVAILABLE:
heading_html += f'<font name="Arabic" size=12>{format_arabic("التعليمات السريرية")}</font>'
elements.append(Paragraph(heading_html, heading_style))
for instruction in instructions:
instruction_text = Paragraph(f"{instruction}", normal_style)
elements.append(instruction_text)
elements.append(Spacer(1, 0.1*inch))
# Footer with generation info
elements.append(Spacer(1, 0.5*inch))
footer_text = f"Generated on: {timezone.now().strftime('%Y-%m-%d %H:%M:%S')}"
footer = Paragraph(footer_text, ParagraphStyle(
'Footer',
parent=styles['Normal'],
fontSize=8,
textColor=colors.grey,
alignment=1
))
elements.append(footer)
# Build PDF
doc.build(elements)
# Get PDF value
pdf = buffer.getvalue()
buffer.close()
# Create response
response = HttpResponse(content_type='application/pdf')
# Check if view parameter is set to inline
view_mode = request.GET.get('view', 'download')
if view_mode == 'inline':
response['Content-Disposition'] = f'inline; filename="appointment_{appointment.appointment_number}.pdf"'
else:
response['Content-Disposition'] = f'attachment; filename="appointment_{appointment.appointment_number}.pdf"'
response.write(pdf)
return response
def _get_clinical_instructions(self, appointment):
"""Extract instructions from clinical documents."""
instructions = []
# Import models
try:
from nursing.models import NursingEncounter
from medical.models import MedicalConsultation, MedicalFollowUp
from aba.models import ABAConsult
from ot.models import OTConsult, OTSession
from slp.models import SLPConsult, SLPAssessment, SLPIntervention
# Check nursing encounter
nursing = NursingEncounter.objects.filter(appointment=appointment).first()
if nursing and hasattr(nursing, 'instructions') and nursing.instructions:
instructions.append(f"Nursing: {nursing.instructions}")
# Check medical consultation
medical_consult = MedicalConsultation.objects.filter(appointment=appointment).first()
if medical_consult and hasattr(medical_consult, 'instructions') and medical_consult.instructions:
instructions.append(f"Medical Consultation: {medical_consult.instructions}")
# Check medical follow-up
medical_followup = MedicalFollowUp.objects.filter(appointment=appointment).first()
if medical_followup and hasattr(medical_followup, 'instructions') and medical_followup.instructions:
instructions.append(f"Medical Follow-up: {medical_followup.instructions}")
# Check ABA
aba = ABAConsult.objects.filter(appointment=appointment).first()
if aba and hasattr(aba, 'recommendations') and aba.recommendations:
instructions.append(f"ABA Recommendations: {aba.recommendations}")
# Check OT consultation
ot_consult = OTConsult.objects.filter(appointment=appointment).first()
if ot_consult and hasattr(ot_consult, 'recommendations') and ot_consult.recommendations:
instructions.append(f"OT Recommendations: {ot_consult.recommendations}")
# Check OT session
ot_session = OTSession.objects.filter(appointment=appointment).first()
if ot_session and hasattr(ot_session, 'home_program') and ot_session.home_program:
instructions.append(f"OT Home Program: {ot_session.home_program}")
# Check SLP consultation
slp_consult = SLPConsult.objects.filter(appointment=appointment).first()
if slp_consult and hasattr(slp_consult, 'recommendations') and slp_consult.recommendations:
instructions.append(f"SLP Recommendations: {slp_consult.recommendations}")
# Check SLP assessment
slp_assessment = SLPAssessment.objects.filter(appointment=appointment).first()
if slp_assessment and hasattr(slp_assessment, 'recommendations') and slp_assessment.recommendations:
instructions.append(f"SLP Assessment Recommendations: {slp_assessment.recommendations}")
# Check SLP intervention
slp_intervention = SLPIntervention.objects.filter(appointment=appointment).first()
if slp_intervention and hasattr(slp_intervention, 'home_program') and slp_intervention.home_program:
instructions.append(f"SLP Home Program: {slp_intervention.home_program}")
except Exception as e:
# If any model doesn't exist or has issues, just skip it
pass
return instructions
class AppointmentEmailPDFView(LoginRequiredMixin, TenantFilterMixin, View):
"""
Email appointment PDF to patient.
Features:
- Generate PDF
- Send via email with optional custom message
- Uses existing email infrastructure
"""
def post(self, request, pk):
"""Send appointment PDF via email."""
from django.core.mail import EmailMessage
from django.template.loader import render_to_string
from io import BytesIO
# Get appointment
appointment = get_object_or_404(
Appointment.objects.select_related(
'patient', 'provider__user', 'clinic', 'room', 'tenant'
),
pk=pk,
tenant=request.user.tenant
)
# Get email address and message from form
email_address = request.POST.get('email_address', '').strip()
custom_message = request.POST.get('email_message', '').strip()
# Validate email
if not email_address:
messages.error(request, _('Email address is required.'))
return redirect('appointments:appointment_detail', pk=pk)
try:
# Generate PDF using the same logic as AppointmentPDFView
pdf_view = AppointmentPDFView()
pdf_view.request = request
# Create a mock request with GET parameters to generate PDF
from django.test import RequestFactory
factory = RequestFactory()
pdf_request = factory.get(f'/appointments/{pk}/pdf/')
pdf_request.user = request.user
# Generate PDF
pdf_response = pdf_view.get(pdf_request, pk)
pdf_content = pdf_response.content
# Prepare email subject
subject = f"Appointment Details - {appointment.appointment_number}"
# Prepare email body
context = {
'appointment': appointment,
'patient': appointment.patient,
'custom_message': custom_message,
'tenant': appointment.tenant,
}
# Create email body (plain text)
email_body = f"""
Dear {appointment.patient.first_name_en} {appointment.patient.last_name_en},
Please find attached the details for your appointment.
Appointment Number: {appointment.appointment_number}
Date: {appointment.scheduled_date.strftime('%A, %B %d, %Y')}
Time: {appointment.scheduled_time.strftime('%H:%M')}
Clinic: {appointment.clinic.name_en}
Provider: {appointment.provider.user.get_full_name()}
"""
if custom_message:
email_body += f"\n{custom_message}\n\n"
email_body += f"""
Best regards,
{appointment.tenant.name}
"""
# Create email
email = EmailMessage(
subject=subject,
body=email_body,
from_email=None, # Will use DEFAULT_FROM_EMAIL from settings
to=[email_address],
)
# Attach PDF
email.attach(
f'appointment_{appointment.appointment_number}.pdf',
pdf_content,
'application/pdf'
)
# Send email
email.send(fail_silently=False)
messages.success(
request,
_('Appointment PDF has been sent to %(email)s successfully!') % {'email': email_address}
)
except Exception as e:
messages.error(
request,
_('Failed to send email: %(error)s') % {'error': str(e)}
)
return redirect('appointments:appointment_detail', pk=pk)
# ============================================================================
# Session Views (Group Session Support)
# ============================================================================
class SessionListView(LoginRequiredMixin, TenantFilterMixin, PaginationMixin, ListView):
"""
List view for sessions (both individual and group).
Features:
- Filter by session type, status, clinic, provider
- Search by session number
- Show capacity information for group sessions
"""
model = Session
template_name = 'appointments/session_list.html'
context_object_name = 'sessions'
paginate_by = 25
def get_queryset(self):
"""Get filtered queryset."""
from .models import Session
queryset = Session.objects.filter(tenant=self.request.user.tenant)
# Apply search
search_query = self.request.GET.get('search_query', '').strip()
if search_query:
queryset = queryset.filter(
Q(session_number__icontains=search_query) |
Q(provider__user__first_name__icontains=search_query) |
Q(provider__user__last_name__icontains=search_query) |
Q(service_type__icontains=search_query)
)
# Apply filters
session_type = self.request.GET.get('session_type')
if session_type:
queryset = queryset.filter(session_type=session_type)
status = self.request.GET.get('status')
if status:
queryset = queryset.filter(status=status)
clinic_id = self.request.GET.get('clinic')
if clinic_id:
queryset = queryset.filter(clinic_id=clinic_id)
provider_id = self.request.GET.get('provider')
if provider_id:
queryset = queryset.filter(provider_id=provider_id)
date_from = self.request.GET.get('date_from')
if date_from:
queryset = queryset.filter(scheduled_date__gte=date_from)
date_to = self.request.GET.get('date_to')
if date_to:
queryset = queryset.filter(scheduled_date__lte=date_to)
return queryset.select_related(
'provider__user', 'clinic', 'room'
).prefetch_related('participants').order_by('-scheduled_date', '-scheduled_time')
def get_context_data(self, **kwargs):
"""Add search form and filter options."""
context = super().get_context_data(**kwargs)
from .forms import SessionSearchForm
context['search_form'] = SessionSearchForm(
self.request.GET,
tenant=self.request.user.tenant
)
return context
class SessionDetailView(LoginRequiredMixin, TenantFilterMixin, DetailView):
"""
Detail view for a session showing all participants.
Features:
- Show session details
- List all participants with their status
- Show capacity information
- Actions: add patient, start session, complete session
"""
model = Session
template_name = 'appointments/session_detail.html'
context_object_name = 'session'
def get_context_data(self, **kwargs):
"""Add participants and available actions."""
context = super().get_context_data(**kwargs)
session = self.object
# Get participants
context['participants'] = session.participants.select_related('patient').order_by('created_at')
# Get available actions
context['can_add_patients'] = not session.is_full and session.status == 'SCHEDULED'
context['can_start'] = session.status == 'SCHEDULED'
context['can_complete'] = session.status == 'IN_PROGRESS'
context['can_cancel'] = session.status in ['SCHEDULED', 'IN_PROGRESS']
return context
class GroupSessionCreateView(LoginRequiredMixin, RolePermissionMixin, AuditLogMixin,
SuccessMessageMixin, CreateView):
"""
Create a new group session.
Features:
- Create empty group session
- Set capacity (1-20)
- Validate provider availability
"""
model = Session
template_name = 'appointments/group_session_form.html'
success_message = _("Group session created successfully! Session: {session_number}")
allowed_roles = [User.Role.ADMIN, User.Role.FRONT_DESK]
def get_form_class(self):
from .forms import GroupSessionCreateForm
return GroupSessionCreateForm
def form_valid(self, form):
"""Create session using SessionService."""
from .session_service import SessionService
try:
# Create session
session = SessionService.create_group_session(
provider=form.cleaned_data['provider'],
clinic=form.cleaned_data['clinic'],
scheduled_date=form.cleaned_data['scheduled_date'],
scheduled_time=form.cleaned_data['scheduled_time'],
duration=form.cleaned_data['duration'],
service_type=form.cleaned_data['service_type'],
max_capacity=form.cleaned_data['max_capacity'],
room=form.cleaned_data.get('room'),
group_notes=form.cleaned_data.get('group_notes', '')
)
self.object = session
self.success_message = self.success_message.format(session_number=session.session_number)
messages.success(self.request, self.success_message)
return redirect(self.get_success_url())
except ValueError as e:
messages.error(self.request, str(e))
return self.form_invalid(form)
def get_success_url(self):
"""Redirect to session detail."""
return reverse_lazy('appointments:session_detail', kwargs={'pk': self.object.pk})
class AddPatientToSessionView(LoginRequiredMixin, RolePermissionMixin, AuditLogMixin, View):
"""
Add a patient to an existing session.
Features:
- Check capacity
- Validate patient availability
- Generate appointment number
"""
allowed_roles = [User.Role.ADMIN, User.Role.FRONT_DESK]
def get(self, request, pk):
"""Show form to add patient."""
from .models import Session
from .forms import AddPatientToSessionForm
session = get_object_or_404(Session, pk=pk, tenant=request.user.tenant)
if session.is_full:
messages.error(request, _('Session is full. Cannot add more patients.'))
return redirect('appointments:session_detail', pk=pk)
form = AddPatientToSessionForm(tenant=request.user.tenant, session=session)
return render(request, 'appointments/add_patient_to_session.html', {
'form': form,
'session': session
})
def post(self, request, pk):
"""Add patient to session with consent validation."""
from .models import Session
from .forms import AddPatientToSessionForm
from .session_service import SessionService
from core.services import ConsentService
session = get_object_or_404(Session, pk=pk, tenant=request.user.tenant)
form = AddPatientToSessionForm(
request.POST,
tenant=request.user.tenant,
session=session
)
if form.is_valid():
patient = form.cleaned_data['patient']
# Get service type from clinic specialty
service_type = self._get_service_type_from_clinic(session.clinic)
# Check consent before adding patient to session
has_consent, consent_message = ConsentService.verify_consent_for_service(
patient, service_type
)
if not has_consent:
# Get missing consents for detailed feedback
missing_consents = ConsentService.get_missing_consents(patient, service_type)
# Build error message
error_msg = _(
"Cannot add patient to session: {message}. "
"Patient must sign required consent forms before booking."
).format(message=consent_message)
if missing_consents:
error_msg += " " + _("Missing consent types: {types}.").format(
types=', '.join(missing_consents)
)
messages.error(request, error_msg)
# Add a button/link to create consent in the message
messages.warning(
request,
_("Please create and sign the required consent forms for this patient first. "
"Go to patient detail page → Consents tab to create consent forms.")
)
# Redirect to patient detail page with consent tab
redirect_url = reverse_lazy('core:patient_detail', kwargs={'pk': patient.pk})
redirect_url = f"{redirect_url}?tab=consents&missing={','.join(missing_consents)}&return_to=session_{session.pk}"
return redirect(redirect_url)
# Consent verified, proceed with adding patient to session
try:
participant = SessionService.add_patient_to_session(
session=session,
patient=patient,
individual_notes=form.cleaned_data.get('individual_notes', '')
)
messages.success(
request,
_('Patient %(patient)s added to session. Appointment #: %(appt)s') % {
'patient': participant.patient.full_name_en,
'appt': participant.appointment_number
}
)
return redirect('appointments:session_detail', pk=pk)
except ValueError as e:
messages.error(request, str(e))
return render(request, 'appointments/add_patient_to_session.html', {
'form': form,
'session': session
})
def _get_service_type_from_clinic(self, clinic):
"""Get service type from clinic specialty."""
if not clinic:
return 'MEDICAL' # Default
# Map clinic specialty to service type
specialty_to_service = {
'MEDICAL': 'MEDICAL',
'NURSING': 'NURSING',
'ABA': 'ABA',
'OT': 'OT',
'SLP': 'SLP',
'PSYCHOLOGY': 'PSYCHOLOGY',
'PHYSIOTHERAPY': 'PHYSIOTHERAPY',
'NUTRITION': 'NUTRITION',
}
return specialty_to_service.get(clinic.specialty, 'MEDICAL')
class SessionParticipantCheckInView(LoginRequiredMixin, RolePermissionMixin, AuditLogMixin, View):
"""
Check in a participant (mark as arrived).
Features:
- Validate prerequisites (finance, consent)
- Update participant status
"""
allowed_roles = [User.Role.ADMIN, User.Role.FRONT_DESK]
def post(self, request, pk):
"""Check in participant."""
from .models import SessionParticipant
from .session_service import SessionService
participant = get_object_or_404(
SessionParticipant,
pk=pk,
session__tenant=request.user.tenant
)
try:
SessionService.check_in_participant(participant, checked_in_by=request.user)
messages.success(
request,
_('%(patient)s checked in successfully!') % {'patient': participant.patient.full_name_en}
)
except ValueError as e:
messages.error(request, str(e))
return redirect('appointments:session_detail', pk=participant.session.pk)
class SessionParticipantStatusUpdateView(LoginRequiredMixin, RolePermissionMixin, AuditLogMixin, View):
"""
Update participant status (confirm, no-show, cancel, etc.).
"""
allowed_roles = [User.Role.ADMIN, User.Role.FRONT_DESK, User.Role.DOCTOR,
User.Role.NURSE, User.Role.OT, User.Role.SLP, User.Role.ABA]
def post(self, request, pk):
"""Update participant status."""
from .models import SessionParticipant
from .session_service import SessionService
participant = get_object_or_404(
SessionParticipant,
pk=pk,
session__tenant=request.user.tenant
)
action = request.POST.get('action')
notes = request.POST.get('notes', '')
try:
if action == 'confirm':
SessionService.confirm_participant(participant)
messages.success(request, _('Participant confirmed.'))
elif action == 'arrived':
SessionService.check_in_participant(participant, checked_in_by=request.user)
messages.success(request, _('Participant checked in.'))
elif action == 'attended':
SessionService.mark_participant_attended(participant)
messages.success(request, _('Participant marked as attended.'))
elif action == 'no_show':
reason = request.POST.get('no_show_reason', 'PATIENT_FORGOT')
SessionService.mark_participant_no_show(participant, reason, notes)
messages.warning(request, _('Participant marked as no-show.'))
elif action == 'cancel':
SessionService.remove_patient_from_session(
participant, notes or 'Cancelled', request.user
)
messages.info(request, _('Participant cancelled.'))
else:
messages.error(request, _('Invalid action.'))
except ValueError as e:
messages.error(request, str(e))
return redirect('appointments:session_detail', pk=participant.session.pk)
class SessionStartView(LoginRequiredMixin, RolePermissionMixin, AuditLogMixin, View):
"""
Start a session (mark as in progress).
"""
allowed_roles = [User.Role.DOCTOR, User.Role.NURSE, User.Role.OT,
User.Role.SLP, User.Role.ABA]
def post(self, request, pk):
"""Start session."""
from .models import Session
from .session_service import SessionService
session = get_object_or_404(
Session,
pk=pk,
tenant=request.user.tenant,
provider__user=request.user
)
try:
SessionService.start_session(session)
messages.success(request, _('Session started!'))
except ValueError as e:
messages.error(request, str(e))
return redirect('appointments:session_detail', pk=pk)
class SessionCompleteView(LoginRequiredMixin, RolePermissionMixin, AuditLogMixin, View):
"""
Complete a session.
"""
allowed_roles = [User.Role.DOCTOR, User.Role.NURSE, User.Role.OT,
User.Role.SLP, User.Role.ABA]
def post(self, request, pk):
"""Complete session."""
from .models import Session
from .session_service import SessionService
session = get_object_or_404(
Session,
pk=pk,
tenant=request.user.tenant,
provider__user=request.user
)
try:
SessionService.complete_session(session)
messages.success(request, _('Session completed!'))
except ValueError as e:
messages.error(request, str(e))
return redirect('appointments:session_detail', pk=pk)
class SessionCancelView(LoginRequiredMixin, RolePermissionMixin, AuditLogMixin, View):
"""
Cancel a session and all its participants.
"""
allowed_roles = [User.Role.ADMIN, User.Role.FRONT_DESK]
def post(self, request, pk):
"""Cancel session."""
from .models import Session
from .session_service import SessionService
session = get_object_or_404(Session, pk=pk, tenant=request.user.tenant)
cancel_reason = request.POST.get('cancel_reason', '')
if not cancel_reason:
messages.error(request, _('Cancellation reason is required.'))
return redirect('appointments:session_detail', pk=pk)
try:
SessionService.cancel_session(session, cancel_reason, request.user)
messages.warning(request, _('Session cancelled. All participants have been notified.'))
except ValueError as e:
messages.error(request, str(e))
return redirect('appointments:session_detail', pk=pk)
class AvailableGroupSessionsView(LoginRequiredMixin, TenantFilterMixin, ListView):
"""
List available group sessions with open spots.
Features:
- Show only group sessions with available capacity
- Filter by clinic, service type, date range
- Quick add patient action
"""
model = Session
template_name = 'appointments/available_group_sessions.html'
context_object_name = 'sessions'
paginate_by = 20
def get_queryset(self):
"""Get available group sessions."""
from .session_service import SessionService
from datetime import date, timedelta
# Get filter parameters
clinic_id = self.request.GET.get('clinic')
service_type = self.request.GET.get('service_type')
date_from = self.request.GET.get('date_from')
date_to = self.request.GET.get('date_to')
# Default date range: today to 30 days from now
if not date_from:
date_from = date.today()
else:
date_from = date.fromisoformat(date_from)
if not date_to:
date_to = date.today() + timedelta(days=30)
else:
date_to = date.fromisoformat(date_to)
# Get clinic
if clinic_id:
from core.models import Clinic
clinic = Clinic.objects.get(id=clinic_id, tenant=self.request.user.tenant)
else:
# Get first clinic for tenant
from core.models import Clinic
clinic = Clinic.objects.filter(tenant=self.request.user.tenant).first()
if not clinic:
from .models import Session
return Session.objects.none()
# Get available sessions
return SessionService.get_available_group_sessions(
clinic=clinic,
date_from=date_from,
date_to=date_to,
service_type=service_type
)
def get_context_data(self, **kwargs):
"""Add filter form."""
context = super().get_context_data(**kwargs)
from core.models import Clinic
context['clinics'] = Clinic.objects.filter(
tenant=self.request.user.tenant,
is_active=True
)
return context
# ============================================================================
# Package Selection API Views
# ============================================================================
class GetPackagesForPatientView(LoginRequiredMixin, View):
"""
API endpoint to get packages for a patient.
Returns both assigned packages (PackagePurchases) and available packages (Packages).
"""
def get(self, request):
"""Get packages for patient."""
from finance.models import Package, PackagePurchase
from django.db.models import F
patient_id = request.GET.get('patient')
if not patient_id:
return JsonResponse({
'success': False,
'error': _('Patient ID is required')
}, status=400)
try:
from core.models import Patient
patient = Patient.objects.get(
id=patient_id,
tenant=request.user.tenant
)
# Get assigned packages (existing PackagePurchases for this patient)
assigned_packages = PackagePurchase.objects.filter(
patient=patient,
status='ACTIVE'
).filter(
sessions_used__lt=F('total_sessions')
).select_related('package').order_by('-purchase_date')
assigned_data = []
for pp in assigned_packages:
assigned_data.append({
'id': str(pp.id),
'type': 'purchase',
'name': pp.package.name_en,
'name_ar': pp.package.name_ar,
'total_sessions': pp.total_sessions,
'sessions_used': pp.sessions_used,
'sessions_remaining': pp.sessions_remaining,
'purchase_date': pp.purchase_date.isoformat(),
'expiry_date': pp.expiry_date.isoformat(),
'is_expired': pp.is_expired,
'package_id': str(pp.package.id)
})
# Get available packages (all active packages not yet assigned to this patient)
# Get IDs of packages already assigned to this patient
assigned_package_ids = assigned_packages.values_list('package_id', flat=True)
# Get all active packages
available_packages = Package.objects.filter(
tenant=request.user.tenant,
is_active=True
).prefetch_related('packageservice_set__service')
available_data = []
for pkg in available_packages:
# Include all packages (user can purchase same package multiple times)
available_data.append({
'id': str(pkg.id),
'type': 'package',
'name': pkg.name_en,
'name_ar': pkg.name_ar,
'total_sessions': pkg.total_sessions,
'price': float(pkg.price),
'validity_days': pkg.validity_days,
'description': pkg.description,
'services': [
{
'name': ps.service.name_en,
'sessions': ps.sessions,
'clinic': ps.service.clinic.name_en if ps.service.clinic else None,
'clinic_id': str(ps.service.clinic.id) if ps.service.clinic else None
}
for ps in pkg.packageservice_set.all()
]
})
return JsonResponse({
'success': True,
'patient_id': str(patient.id),
'patient_name': patient.full_name_en,
'assigned_packages': assigned_data,
'available_packages': available_data
})
except Patient.DoesNotExist:
return JsonResponse({
'success': False,
'error': _('Patient not found')
}, status=404)
except Exception as e:
import traceback
print(f"Error in GetPackagesForPatientView: {traceback.format_exc()}")
return JsonResponse({
'success': False,
'error': _('Error fetching packages: %(error)s') % {'error': str(e)}
}, status=500)
class AssignPackageToPatientView(LoginRequiredMixin, RolePermissionMixin, View):
"""
API endpoint to assign a package to a patient.
Creates a new PackagePurchase record.
"""
allowed_roles = [User.Role.ADMIN, User.Role.FRONT_DESK]
# Exempt from CSRF for AJAX calls
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
@method_decorator(csrf_exempt)
def dispatch(self, *args, **kwargs):
return super().dispatch(*args, **kwargs)
def post(self, request):
"""Assign package to patient."""
from finance.models import Package, PackagePurchase
from datetime import date, timedelta
import json
try:
# Try to parse JSON body
try:
data = json.loads(request.body.decode('utf-8'))
except (json.JSONDecodeError, AttributeError):
# Fallback to POST data
data = {
'package_id': request.POST.get('package_id'),
'patient_id': request.POST.get('patient_id')
}
package_id = data.get('package_id')
patient_id = data.get('patient_id')
print(f"AssignPackageToPatientView - package_id: {package_id}, patient_id: {patient_id}")
if not package_id or not patient_id:
return JsonResponse({
'success': False,
'error': _('Package ID and Patient ID are required')
}, status=400)
# Get package and patient
from core.models import Patient
package = Package.objects.get(
id=package_id,
tenant=request.user.tenant,
is_active=True
)
patient = Patient.objects.get(
id=patient_id,
tenant=request.user.tenant
)
# Create PackagePurchase
package_purchase = PackagePurchase.objects.create(
tenant=request.user.tenant,
patient=patient,
package=package,
purchase_date=date.today(),
expiry_date=date.today() + timedelta(days=package.validity_days),
total_sessions=package.total_sessions,
sessions_used=0,
status='ACTIVE',
invoice=None # No invoice required for package assignment
)
print(f"PackagePurchase created: {package_purchase.id}")
return JsonResponse({
'success': True,
'package_purchase_id': str(package_purchase.id),
'message': _('Package assigned to patient successfully'),
'package_name': package.name_en,
'total_sessions': package_purchase.total_sessions,
'expiry_date': package_purchase.expiry_date.isoformat()
})
except Package.DoesNotExist:
print("Package not found")
return JsonResponse({
'success': False,
'error': _('Package not found')
}, status=404)
except Patient.DoesNotExist:
print("Patient not found")
return JsonResponse({
'success': False,
'error': _('Patient not found')
}, status=404)
except Exception as e:
import traceback
error_trace = traceback.format_exc()
print(f"Error in AssignPackageToPatientView: {error_trace}")
return JsonResponse({
'success': False,
'error': _('Error assigning package: %(error)s') % {'error': str(e)}
}, status=500)
class GetClinicsForPackageView(LoginRequiredMixin, View):
"""
API endpoint to get clinics that can perform services in a package.
"""
def get(self, request):
"""Get clinics for package services."""
from finance.models import Package, PackagePurchase
package_id = request.GET.get('package_id')
package_purchase_id = request.GET.get('package_purchase_id')
if not package_id and not package_purchase_id:
return JsonResponse({
'success': False,
'error': _('Package ID or PackagePurchase ID is required')
}, status=400)
try:
# Get package (either from Package or PackagePurchase)
if package_purchase_id:
package_purchase = PackagePurchase.objects.get(
id=package_purchase_id,
patient__tenant=request.user.tenant
)
package = package_purchase.package
else:
package = Package.objects.get(
id=package_id,
tenant=request.user.tenant
)
# Get all services in package
package_services = package.packageservice_set.select_related('service__clinic').all()
# Get unique clinics from services
clinics = set()
for ps in package_services:
if ps.service and ps.service.clinic:
clinics.add(ps.service.clinic)
# Build response
clinics_data = []
for clinic in clinics:
clinics_data.append({
'id': str(clinic.id),
'name_en': clinic.name_en,
'name_ar': clinic.name_ar,
'specialty': clinic.specialty
})
return JsonResponse({
'success': True,
'clinics': clinics_data,
'package_name': package.name_en
})
except (Package.DoesNotExist, PackagePurchase.DoesNotExist):
return JsonResponse({
'success': False,
'error': _('Package not found')
}, status=404)
except Exception as e:
import traceback
print(f"Error in GetClinicsForPackageView: {traceback.format_exc()}")
return JsonResponse({
'success': False,
'error': _('Error fetching clinics: %(error)s') % {'error': str(e)}
}, status=500)
# ============================================================================
# Package Auto-Scheduling View
# ============================================================================
@login_required
def schedule_package_view(request, package_purchase_id):
"""
Auto-schedule all appointments for a package purchase.
Features:
- Select provider for all sessions
- Set start date and end date
- Select preferred days
- Auto-schedule all remaining sessions
"""
from finance.models import PackagePurchase
from .package_integration_service import PackageIntegrationService
# Get package purchase
package_purchase = get_object_or_404(
PackagePurchase.objects.select_related('package', 'patient'),
id=package_purchase_id,
patient__tenant=request.user.tenant
)
# Check if package has remaining sessions
if package_purchase.sessions_remaining <= 0:
messages.warning(request, _('This package has no remaining sessions to schedule.'))
return redirect('finance:package_purchase_detail', pk=package_purchase_id)
# Check if package is expired
if package_purchase.is_expired:
messages.error(request, _('This package has expired and cannot be used.'))
return redirect('finance:package_purchase_detail', pk=package_purchase_id)
if request.method == 'POST':
# Get form data
provider_id = request.POST.get('provider')
room_id = request.POST.get('room') # NEW: Get room selection
start_date_str = request.POST.get('start_date')
end_date_str = request.POST.get('end_date')
preferred_days = request.POST.getlist('preferred_days')
sessions_to_schedule = request.POST.get('sessions_to_schedule')
# Validate required fields
if not provider_id:
messages.error(request, _('Please select a provider'))
return redirect('appointments:schedule_package', package_purchase_id=package_purchase_id)
if not start_date_str:
messages.error(request, _('Please select a start date'))
return redirect('appointments:schedule_package', package_purchase_id=package_purchase_id)
if not sessions_to_schedule:
messages.error(request, _('Please enter the number of sessions to schedule'))
return redirect('appointments:schedule_package', package_purchase_id=package_purchase_id)
# Validate sessions_to_schedule
try:
sessions_to_schedule = int(sessions_to_schedule)
if sessions_to_schedule < 1:
messages.error(request, _('Number of sessions must be at least 1'))
return redirect('appointments:schedule_package', package_purchase_id=package_purchase_id)
if sessions_to_schedule > package_purchase.sessions_remaining:
messages.error(request, _('Cannot schedule more than %(max)s sessions') % {'max': package_purchase.sessions_remaining})
return redirect('appointments:schedule_package', package_purchase_id=package_purchase_id)
except ValueError:
messages.error(request, _('Invalid number of sessions'))
return redirect('appointments:schedule_package', package_purchase_id=package_purchase_id)
# Parse dates
from datetime import date as date_class
start_date = date_class.fromisoformat(start_date_str)
end_date = date_class.fromisoformat(end_date_str) if end_date_str else None
# Convert preferred days to integers
preferred_days_int = [int(d) for d in preferred_days] if preferred_days else None
# Schedule appointments (only the specified number)
appointments, errors = PackageIntegrationService.schedule_package_appointments(
package_purchase=package_purchase,
provider_id=provider_id,
room_id=room_id, # NEW: Pass room_id
start_date=start_date,
end_date=end_date,
preferred_days=preferred_days_int,
use_multiple_providers=False,
provider_assignments=None,
auto_schedule=True,
sessions_to_schedule=sessions_to_schedule
)
# Show results
if appointments:
if errors:
messages.warning(
request,
_('Scheduled %(count)d appointment(s) with some errors: %(errors)s') % {
'count': len(appointments),
'errors': ', '.join(errors[:3])
}
)
else:
messages.success(
request,
_('Successfully scheduled %(count)d appointment(s)!') % {'count': len(appointments)}
)
else:
messages.error(
request,
_('Failed to schedule appointments: %(errors)s') % {'errors': ', '.join(errors)}
)
return redirect('finance:package_purchase_detail', pk=package_purchase_id)
# GET request - show form
# Get available providers for the package's clinic
clinic = PackageIntegrationService._get_clinic_from_package(package_purchase)
if clinic:
providers = Provider.objects.filter(
tenant=request.user.tenant,
specialties=clinic,
is_available=True
)
else:
providers = Provider.objects.filter(
tenant=request.user.tenant,
is_available=True
)
return render(request, 'appointments/schedule_package_form.html', {
'package_purchase': package_purchase,
'providers': providers,
})