agdar/appointments/views.py
Marwan Alwali 25c9701c34 update
2025-11-06 18:18:43 +03:00

1734 lines
64 KiB
Python

"""
Appointments views for the Tenhal Multidisciplinary Healthcare Platform.
This module contains views for appointment management including:
- Appointment CRUD operations
- State machine transitions (confirm, reschedule, cancel, arrive, start, complete)
- Calendar views
- Provider schedules
- Appointment reminders
"""
from django.contrib.auth.mixins import LoginRequiredMixin
from django.db.models import Q, Count
from django.http import JsonResponse, HttpResponse
from django.shortcuts import get_object_or_404, redirect, render
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from django.views import View
from django.views.generic import ListView, DetailView, CreateView, UpdateView, DeleteView
from django.urls import reverse_lazy
from django.contrib import messages
from core.mixins import (
TenantFilterMixin,
RolePermissionMixin,
AuditLogMixin,
HTMXResponseMixin,
SuccessMessageMixin,
PaginationMixin,
)
from core.models import *
from .models import *
from .forms import *
class AppointmentListView(LoginRequiredMixin, TenantFilterMixin, PaginationMixin,
                          HTMXResponseMixin, ListView):
    """
    Appointment list view with filtering and search.

    Features:
    - Filter by status, clinic, provider, date range
    - Search by patient name/MRN/appointment number
    - Role-based filtering (clinical staff see only their own appointments)
    - Export to CSV (NOTE(review): not implemented in this class body;
      confirm whether a mixin provides it)

    Query parameters read from ``request.GET``: ``search``, ``status``,
    ``clinic``, ``provider``, ``date_from``, ``date_to``.
    """
    model = Appointment
    template_name = 'appointments/appointment_list.html'
    htmx_template_name = 'appointments/partials/appointment_list_partial.html'
    context_object_name = 'appointments'
    paginate_by = 25

    @staticmethod
    def _parse_iso_date(raw):
        """Return ``raw`` parsed as ``YYYY-MM-DD``, or ``None`` if blank or malformed."""
        from datetime import datetime

        if not raw:
            return None
        try:
            return datetime.strptime(raw, '%Y-%m-%d').date()
        except ValueError:
            # A malformed date simply disables that filter instead of
            # bubbling up as a server error.
            return None

    def get_queryset(self):
        """Return the appointment queryset narrowed by role, search and filters."""
        queryset = super().get_queryset()
        user = self.request.user
        params = self.request.GET

        # Role-based filtering: clinical staff see only their appointments
        if user.role in [User.Role.DOCTOR, User.Role.NURSE, User.Role.OT,
                         User.Role.SLP, User.Role.ABA]:
            queryset = queryset.filter(provider__user=user)

        # Free-text search over patient name, MRN and appointment number
        search_query = params.get('search', '').strip()
        if search_query:
            queryset = queryset.filter(
                Q(patient__first_name_en__icontains=search_query) |
                Q(patient__last_name_en__icontains=search_query) |
                Q(patient__mrn__icontains=search_query) |
                Q(appointment_number__icontains=search_query)
            )

        # Exact-match filters
        status = params.get('status')
        if status:
            queryset = queryset.filter(status=status)
        clinic_id = params.get('clinic')
        if clinic_id:
            queryset = queryset.filter(clinic_id=clinic_id)
        provider_id = params.get('provider')
        if provider_id:
            queryset = queryset.filter(provider_id=provider_id)

        # Date range. The date column used everywhere else in this module
        # (ordering below, calendar and events queries) is ``scheduled_date``;
        # the previous ``appointment_date`` lookup did not match it.
        date_from = self._parse_iso_date(params.get('date_from'))
        if date_from is not None:
            queryset = queryset.filter(scheduled_date__gte=date_from)
        date_to = self._parse_iso_date(params.get('date_to'))
        if date_to is not None:
            queryset = queryset.filter(scheduled_date__lte=date_to)

        # Default ordering: newest first
        return queryset.select_related(
            'patient', 'provider', 'clinic', 'room', 'cancelled_by',
        ).order_by('-scheduled_date', '-scheduled_time')

    def get_context_data(self, **kwargs):
        """Add the search form, filter option lists and current filter values."""
        context = super().get_context_data(**kwargs)
        params = self.request.GET
        tenant = self.request.user.tenant

        # Search form bound to the current query string
        context['search_form'] = AppointmentSearchForm(params)

        # Options for the filter dropdowns
        context['clinics'] = Clinic.objects.filter(tenant=tenant)
        context['providers'] = User.objects.filter(
            tenant=tenant,
            role__in=[User.Role.DOCTOR, User.Role.NURSE, User.Role.OT,
                      User.Role.SLP, User.Role.ABA]
        )
        context['status_choices'] = Appointment.Status.choices

        # Echo the raw filter values back so the template can re-select them
        context['current_filters'] = {
            'search': params.get('search', ''),
            'status': params.get('status', ''),
            'clinic': params.get('clinic', ''),
            'provider': params.get('provider', ''),
            'date_from': params.get('date_from', ''),
            'date_to': params.get('date_to', ''),
        }
        return context
class AppointmentCalendarView(LoginRequiredMixin, TenantFilterMixin, ListView):
    """
    Calendar view for appointments.

    Features:
    - Weekly/monthly calendar grid
    - Filter by clinic/provider
    - Drag-and-drop rescheduling (HTMX)
    - Color-coded by status

    Query parameters read from ``request.GET``: ``view`` (``week`` or
    anything else for month), ``date`` (``YYYY-MM-DD``), ``clinic``,
    ``provider``.
    """
    model = Appointment
    template_name = 'appointments/appointment_calendar.html'
    context_object_name = 'appointments'

    def _get_base_date(self):
        """Return the anchor date from ``?date=``, falling back to today."""
        from datetime import datetime

        date_str = self.request.GET.get('date')
        if date_str:
            try:
                return datetime.strptime(date_str, '%Y-%m-%d').date()
            except ValueError:
                # Malformed input: show the current period rather than a 500.
                pass
        return timezone.now().date()

    def get_queryset(self):
        """Return appointments inside the selected week or month."""
        import calendar
        from datetime import timedelta

        queryset = super().get_queryset()
        user = self.request.user

        # Role-based filtering: clinical staff see only their own calendar.
        # ADMIN is intentionally not restricted here, matching the list view.
        if user.role in [User.Role.DOCTOR, User.Role.NURSE, User.Role.OT,
                         User.Role.SLP, User.Role.ABA]:
            queryset = queryset.filter(provider__user=user)

        # Resolve the visible date range (default: current week)
        view_type = self.request.GET.get('view', 'week')  # week or month
        base_date = self._get_base_date()
        if view_type == 'week':
            # date.weekday() is 0 for Monday, so the range is Monday..Sunday
            start_date = base_date - timedelta(days=base_date.weekday())
            end_date = start_date + timedelta(days=6)
        else:  # month
            start_date = base_date.replace(day=1)
            last_day = calendar.monthrange(base_date.year, base_date.month)[1]
            end_date = base_date.replace(day=last_day)

        queryset = queryset.filter(
            scheduled_date__gte=start_date,
            scheduled_date__lte=end_date
        )

        # Optional filters
        clinic_id = self.request.GET.get('clinic')
        if clinic_id:
            queryset = queryset.filter(clinic_id=clinic_id)
        provider_id = self.request.GET.get('provider')
        if provider_id:
            queryset = queryset.filter(provider_id=provider_id)

        return queryset.select_related('patient', 'provider', 'clinic', 'service')

    def get_context_data(self, **kwargs):
        """Add filter options, view state and calendar display settings."""
        context = super().get_context_data(**kwargs)
        tenant = self.request.user.tenant

        # Options for the filter dropdowns
        context['clinics'] = Clinic.objects.filter(tenant=tenant)
        context['providers'] = User.objects.filter(
            tenant=tenant,
            role__in=[User.Role.DOCTOR, User.Role.NURSE, User.Role.OT,
                      User.Role.SLP, User.Role.ABA, User.Role.ADMIN]
        )

        # Distinct service types present on this tenant's appointments
        context['services'] = Appointment.objects.filter(
            tenant=tenant
        ).values_list('service_type', flat=True).distinct().order_by('service_type')

        # Current view type and date (raw query value, or today's date object)
        context['view_type'] = self.request.GET.get('view', 'week')
        context['current_date'] = self.request.GET.get('date', timezone.now().date())

        # Calendar customization settings
        context['calendar_settings'] = {
            'initial_view': 'dayGridMonth',
            'slot_duration': '00:30:00',
            'slot_min_time': '08:00:00',
            'slot_max_time': '18:00:00',
            'weekends': True,
            'all_day_slot': False,
            'event_time_format': 'h:mm a',
            'slot_label_format': 'h:mm a',
        }

        # Event colors keyed by appointment status
        context['status_colors'] = {
            'BOOKED': '#0d6efd',
            'CONFIRMED': '#198754',
            'ARRIVED': '#ffc107',
            'IN_PROGRESS': '#fd7e14',
            'COMPLETED': '#6c757d',
            'CANCELLED': '#dc3545',
            'NO_SHOW': '#6c757d',
            'RESCHEDULED': '#0dcaf0',
        }
        return context
class AppointmentDetailView(LoginRequiredMixin, TenantFilterMixin, DetailView):
    """
    Appointment detail view.

    Features:
    - Full appointment details
    - Patient information
    - Status history
    - Available actions based on current status
    - Related clinical documents
    """
    model = Appointment
    template_name = 'appointments/appointment_detail.html'
    context_object_name = 'appointment'

    def get_context_data(self, **kwargs):
        """Add available actions, history, clinical documents and edit flag."""
        context = super().get_context_data(**kwargs)
        appointment = self.object

        # Actions the template may offer for the current status
        context['available_actions'] = self._get_available_actions(appointment)

        # Status history is only present when the model exposes ``history``
        # (e.g. via simple_history); show the first ten entries.
        if hasattr(appointment, 'history'):
            context['status_history'] = appointment.history.all()[:10]

        # Clinical documents linked to this appointment
        context['clinical_documents'] = self._get_clinical_documents(appointment)

        # Whether the current user may modify the appointment
        context['can_modify'] = self._can_user_modify(appointment)
        return context

    def _get_available_actions(self, appointment):
        """Return the list of action names allowed from the current status."""
        actions = []
        status = appointment.status
        if status == Appointment.Status.BOOKED:
            actions.extend(['confirm', 'reschedule', 'cancel'])
        elif status == Appointment.Status.CONFIRMED:
            actions.extend(['arrive', 'reschedule', 'cancel'])
        elif status == Appointment.Status.ARRIVED:
            actions.extend(['start', 'cancel'])
        elif status == Appointment.Status.IN_PROGRESS:
            actions.extend(['complete'])
        elif status == Appointment.Status.RESCHEDULED:
            actions.extend(['confirm', 'cancel'])
        # Any other status (completed, cancelled, no-show) offers no actions.
        return actions

    def _get_clinical_documents(self, appointment):
        """
        Return a dict of the first clinical document of each kind linked
        to ``appointment`` (value is ``None`` when none exists).
        """
        # Imported lazily inside the method, as in the original code.
        from nursing.models import NursingEncounter
        from medical.models import MedicalConsultation, MedicalFollowUp
        from aba.models import ABAConsult
        from ot.models import OTConsult, OTSession
        from slp.models import SLPConsult, SLPAssessment, SLPIntervention

        document_models = (
            ('nursing', NursingEncounter),
            ('medical_consult', MedicalConsultation),
            ('medical_followup', MedicalFollowUp),
            ('aba', ABAConsult),
            ('ot_consult', OTConsult),
            ('ot_session', OTSession),
            ('slp_consult', SLPConsult),
            ('slp_assessment', SLPAssessment),
            ('slp_intervention', SLPIntervention),
        )
        return {
            key: model.objects.filter(appointment=appointment).first()
            for key, model in document_models
        }

    def _can_user_modify(self, appointment):
        """Return True if the current user may modify ``appointment``."""
        user = self.request.user

        # Admin and FrontDesk can always modify
        if user.role in [User.Role.ADMIN, User.Role.FRONT_DESK]:
            return True

        # The provider can modify their own appointments. ``provider`` is a
        # provider record that points at a user (see the ``provider__user``
        # lookups elsewhere in this module), so compare its user rather
        # than the provider object itself.
        provider = appointment.provider
        return provider is not None and provider.user_id == user.pk
class AppointmentCreateView(LoginRequiredMixin, RolePermissionMixin, AuditLogMixin,
                            SuccessMessageMixin, CreateView):
    """
    Appointment creation view.

    Features:
    - Auto-generate appointment number
    - Check provider availability
    - Check patient consent
    - Send confirmation notification (currently a stub)
    - Auto-populate patient from ?patient= URL parameter
    """
    model = Appointment
    form_class = AppointmentForm
    template_name = 'appointments/appointment_form.html'
    success_message = _("Appointment created successfully! Number: {appointment_number}")
    allowed_roles = [User.Role.ADMIN, User.Role.FRONT_DESK]

    def get_initial(self):
        """Return initial form values, pre-selecting the patient from ``?patient=``."""
        initial = super().get_initial()
        patient_id = self.request.GET.get('patient')
        if patient_id:
            from django.core.exceptions import ValidationError
            from core.models import Patient

            try:
                # Only accept a patient that belongs to the user's tenant
                initial['patient'] = Patient.objects.get(
                    id=patient_id,
                    tenant=self.request.user.tenant
                )
            except (Patient.DoesNotExist, ValueError, ValidationError):
                # Unknown or malformed id (ValueError for integer keys,
                # ValidationError for UUID keys): deliberately ignored so
                # the form simply opens without a pre-selected patient.
                pass
        return initial

    def get_success_url(self):
        """Redirect to the new appointment's detail page."""
        return reverse_lazy('appointments:appointment_detail', kwargs={'pk': self.object.pk})

    def form_valid(self, form):
        """Set tenant, number and initial status, save, then notify."""
        # Set tenant
        form.instance.tenant = self.request.user.tenant
        # Generate appointment number only when the form did not supply one
        if not form.instance.appointment_number:
            form.instance.appointment_number = self._generate_appointment_number()
        # New appointments always start as BOOKED
        form.instance.status = Appointment.Status.BOOKED
        # Save appointment
        response = super().form_valid(form)
        # Send confirmation notification (async in production)
        self._send_confirmation_notification()
        # Fill in the appointment number placeholder.
        # NOTE(review): this runs after super().form_valid(); if the
        # SuccessMessageMixin emits its message inside form_valid, the
        # placeholder is still unformatted at that point - confirm.
        self.success_message = self.success_message.format(
            appointment_number=self.object.appointment_number
        )
        return response

    def _generate_appointment_number(self):
        """
        Return an appointment number of the form ``APT-<tenant code>-<year>-<n>``.

        Tries ten random five-digit suffixes that are not already in use,
        then falls back to a Unix-timestamp suffix.
        """
        import random

        tenant = self.request.user.tenant
        year = timezone.now().year
        # The loop variable is named so it does not shadow the module-level
        # gettext alias ``_``.
        for _attempt in range(10):
            random_num = random.randint(10000, 99999)
            number = f"APT-{tenant.code}-{year}-{random_num}"
            if not Appointment.objects.filter(appointment_number=number).exists():
                return number
        # Fallback
        timestamp = int(timezone.now().timestamp())
        return f"APT-{tenant.code}-{year}-{timestamp}"

    def _send_confirmation_notification(self):
        """Send appointment confirmation notification (not implemented yet)."""
        # TODO: Implement notification sending
        # This would use the notifications app to send SMS/WhatsApp/Email
        pass

    def get_context_data(self, **kwargs):
        """Add form title and submit button text to the context."""
        context = super().get_context_data(**kwargs)
        context['form_title'] = _('Create New Appointment')
        context['submit_text'] = _('Create Appointment')
        return context
class AppointmentUpdateView(LoginRequiredMixin, RolePermissionMixin, TenantFilterMixin,
                            AuditLogMixin, SuccessMessageMixin, UpdateView):
    """
    Edit the details of an existing appointment.

    The status field is rendered disabled here; status changes go through
    the dedicated state transition views.
    """
    model = Appointment
    form_class = AppointmentForm
    template_name = 'appointments/appointment_form.html'
    success_message = _("Appointment updated successfully!")
    allowed_roles = [User.Role.ADMIN, User.Role.FRONT_DESK, User.Role.DOCTOR,
                     User.Role.NURSE, User.Role.OT, User.Role.SLP, User.Role.ABA]

    def get_success_url(self):
        """Return the URL of the edited appointment's detail page."""
        url_kwargs = {'pk': self.object.pk}
        return reverse_lazy('appointments:appointment_detail', kwargs=url_kwargs)

    def get_form(self, form_class=None):
        """Build the form and lock its status field, when it has one."""
        form = super().get_form(form_class)
        status_field = form.fields.get('status')
        if status_field is not None:
            # Read-only: the action buttons drive status changes instead
            status_field.disabled = True
            status_field.help_text = 'Use action buttons to change status'
        return form

    def get_context_data(self, **kwargs):
        """Supply the page title and submit button label."""
        context = super().get_context_data(**kwargs)
        title = _('Update Appointment: %(number)s') % {'number': self.object.appointment_number}
        context['form_title'] = title
        context['submit_text'] = _('Update Appointment')
        return context
# State Machine Transition Views
class AppointmentConfirmView(LoginRequiredMixin, RolePermissionMixin, AuditLogMixin, View):
    """
    Staff-side confirmation of an appointment (BOOKED → CONFIRMED).

    Sets the status, stamps ``confirmation_at`` and calls the notification
    hook (currently a stub).
    """
    allowed_roles = [User.Role.ADMIN, User.Role.FRONT_DESK]

    def post(self, request, pk):
        """Confirm the appointment if it is still BOOKED, then go back to its detail page."""
        appt = get_object_or_404(Appointment, pk=pk, tenant=request.user.tenant)

        if appt.status == Appointment.Status.BOOKED:
            appt.status = Appointment.Status.CONFIRMED
            appt.confirmation_at = timezone.now()
            appt.save()
            self._send_notification(appt, 'confirmed')
            messages.success(request, _('Appointment confirmed successfully!'))
        else:
            # Any other status is an invalid starting point for this transition
            messages.error(request, _('Appointment cannot be confirmed from current status.'))

        return redirect('appointments:appointment_detail', pk=pk)

    def _send_notification(self, appointment, event_type):
        """Notification hook for appointment events (not implemented yet)."""
        # TODO: Implement notification
        pass
class AppointmentRescheduleView(LoginRequiredMixin, RolePermissionMixin, AuditLogMixin,
                                SuccessMessageMixin, UpdateView):
    """
    Reschedule appointment.

    Features:
    - Change date/time
    - Record reschedule reason
    - Update status to RESCHEDULED
    - Send notification (currently a stub)
    """
    model = Appointment
    form_class = RescheduleForm
    template_name = 'appointments/appointment_reschedule.html'
    success_message = _("Appointment rescheduled successfully!")
    allowed_roles = [User.Role.ADMIN, User.Role.FRONT_DESK]

    def get_queryset(self):
        """
        Restrict object lookup to the requesting user's tenant.

        This view does not inherit a tenant-filtering mixin, so without this
        override any appointment primary key could be loaded and edited,
        regardless of which tenant owns it.
        """
        return super().get_queryset().filter(tenant=self.request.user.tenant)

    def get_success_url(self):
        """Redirect to appointment detail."""
        return reverse_lazy('appointments:appointment_detail', kwargs={'pk': self.object.pk})

    def form_valid(self, form):
        """Mark the appointment as rescheduled, save it and notify."""
        # Update status and stamp the reschedule time
        form.instance.status = Appointment.Status.RESCHEDULED
        form.instance.reschedule_at = timezone.now()
        # Save
        response = super().form_valid(form)
        # Send notification
        self._send_notification(self.object, 'rescheduled')
        return response

    def _send_notification(self, appointment, event_type):
        """Send notification (not implemented yet)."""
        # TODO: Implement notification
        pass
class AppointmentCancelView(LoginRequiredMixin, RolePermissionMixin, AuditLogMixin, View):
    """
    Cancel appointment.

    Features:
    - Update status to CANCELLED
    - Record cancellation reason and timestamp
    - Send notification (currently a stub)

    Cancellation is accepted only from the statuses for which the detail
    view's ``_get_available_actions`` offers the ``cancel`` action.
    """
    allowed_roles = [User.Role.ADMIN, User.Role.FRONT_DESK, User.Role.DOCTOR,
                     User.Role.NURSE, User.Role.OT, User.Role.SLP, User.Role.ABA]

    def post(self, request, pk):
        """Cancel the appointment if its current status allows it."""
        appointment = get_object_or_404(
            Appointment,
            pk=pk,
            tenant=request.user.tenant
        )

        # Check if transition is valid, like the other transition views do.
        # Without this, a completed or already-cancelled appointment would
        # be overwritten with a fresh cancellation.
        cancellable_statuses = (
            Appointment.Status.BOOKED,
            Appointment.Status.CONFIRMED,
            Appointment.Status.ARRIVED,
            Appointment.Status.RESCHEDULED,
        )
        if appointment.status not in cancellable_statuses:
            messages.error(request, _('Appointment cannot be cancelled from current status.'))
            return redirect('appointments:appointment_detail', pk=pk)

        # Get cancellation reason (optional)
        cancel_reason = request.POST.get('cancel_reason', '')

        # Update status
        appointment.status = Appointment.Status.CANCELLED
        appointment.cancel_reason = cancel_reason
        appointment.cancel_at = timezone.now()
        # NOTE(review): the list view select_related()s a ``cancelled_by``
        # relation that is never populated here - confirm whether it should
        # be set to the acting user.
        appointment.save()

        # Send notification
        self._send_notification(appointment, 'cancelled')
        messages.success(request, _('Appointment cancelled successfully!'))
        return redirect('appointments:appointment_detail', pk=pk)

    def _send_notification(self, appointment, event_type):
        """Send notification (not implemented yet)."""
        # TODO: Implement notification
        pass
class AppointmentArriveView(LoginRequiredMixin, RolePermissionMixin, AuditLogMixin, View):
    """
    Record a patient's arrival (CONFIRMED → ARRIVED).

    Sets the status and stamps ``arrival_at`` with the current time.
    """
    allowed_roles = [User.Role.ADMIN, User.Role.FRONT_DESK]

    def post(self, request, pk):
        """Mark the appointment as arrived when it is CONFIRMED, then show its detail page."""
        appt = get_object_or_404(Appointment, pk=pk, tenant=request.user.tenant)

        if appt.status == Appointment.Status.CONFIRMED:
            appt.status = Appointment.Status.ARRIVED
            appt.arrival_at = timezone.now()
            appt.save()
            messages.success(request, _('Patient marked as arrived!'))
        else:
            # Arrival is only a valid transition out of CONFIRMED
            messages.error(request, _('Patient can only arrive for confirmed appointments.'))

        return redirect('appointments:appointment_detail', pk=pk)
class AppointmentStartView(LoginRequiredMixin, RolePermissionMixin, AuditLogMixin, View):
    """
    Start appointment (ARRIVED → IN_PROGRESS).

    Features:
    - Update status to IN_PROGRESS
    - Set start timestamp
    - Redirect to appropriate clinical form
    """
    allowed_roles = [User.Role.DOCTOR, User.Role.NURSE, User.Role.OT,
                     User.Role.SLP, User.Role.ABA]

    def post(self, request, pk):
        """Start the appointment; responds 404 unless the user is its provider."""
        appointment = get_object_or_404(
            Appointment,
            pk=pk,
            tenant=request.user.tenant,
            # Only the provider can start their appointment. The provider
            # relation points at a provider record with a ``user`` link
            # (as in the ``provider__user`` lookups elsewhere in this
            # module), so match on that link rather than on the provider
            # itself.
            provider__user=request.user
        )

        # Check if transition is valid
        if appointment.status != Appointment.Status.ARRIVED:
            messages.error(request, _('Appointment can only be started after patient arrives.'))
            return redirect('appointments:appointment_detail', pk=pk)

        # Update status
        appointment.status = Appointment.Status.IN_PROGRESS
        appointment.start_at = timezone.now()
        appointment.save()
        messages.success(request, _('Appointment started!'))

        # Redirect to appropriate clinical form based on clinic
        return self._redirect_to_clinical_form(appointment)

    def _redirect_to_clinical_form(self, appointment):
        """
        Redirect to the clinical form matching the clinic's specialty.

        Falls back to the appointment detail page for any specialty that
        has no dedicated form route below.
        """
        clinic_specialty = appointment.clinic.specialty
        if clinic_specialty == Clinic.Specialty.MEDICAL:
            return redirect('medical:consultation_create', appointment_id=appointment.pk)
        elif clinic_specialty == Clinic.Specialty.NURSING:
            return redirect('nursing:encounter_create', appointment_id=appointment.pk)
        elif clinic_specialty == Clinic.Specialty.ABA:
            return redirect('aba:consult_create', appointment_id=appointment.pk)
        elif clinic_specialty == Clinic.Specialty.OT:
            return redirect('ot:consult_create', appointment_id=appointment.pk)
        elif clinic_specialty == Clinic.Specialty.SLP:
            return redirect('slp:consult_create', appointment_id=appointment.pk)
        else:
            return redirect('appointments:appointment_detail', pk=appointment.pk)
class AppointmentCompleteView(LoginRequiredMixin, RolePermissionMixin, AuditLogMixin, View):
    """
    Complete appointment (IN_PROGRESS → COMPLETED).

    Features:
    - Update status to COMPLETED
    - Set end timestamp
    - Trigger post-appointment workflow (currently a stub)
    """
    allowed_roles = [User.Role.DOCTOR, User.Role.NURSE, User.Role.OT,
                     User.Role.SLP, User.Role.ABA]

    def post(self, request, pk):
        """Complete the appointment; responds 404 unless the user is its provider."""
        appointment = get_object_or_404(
            Appointment,
            pk=pk,
            tenant=request.user.tenant,
            # Match on the provider's linked user, consistent with the
            # ``provider__user`` lookups used elsewhere in this module;
            # the provider relation itself is not a user.
            provider__user=request.user
        )

        # Check if transition is valid
        if appointment.status != Appointment.Status.IN_PROGRESS:
            messages.error(request, _('Only in-progress appointments can be completed.'))
            return redirect('appointments:appointment_detail', pk=pk)

        # Update status
        appointment.status = Appointment.Status.COMPLETED
        appointment.end_at = timezone.now()
        appointment.save()

        # Trigger post-appointment workflow
        self._trigger_post_appointment_workflow(appointment)
        messages.success(request, _('Appointment completed successfully!'))
        return redirect('appointments:appointment_detail', pk=pk)

    def _trigger_post_appointment_workflow(self, appointment):
        """Trigger post-appointment tasks (not implemented yet)."""
        # TODO: Implement post-appointment workflow
        # - Create invoice
        # - Schedule follow-up
        # - Send satisfaction survey
        pass
class AppointmentNoShowView(LoginRequiredMixin, RolePermissionMixin, AuditLogMixin, View):
    """
    Flag an appointment as a no-show.

    Sets the status to NO_SHOW, stamps ``no_show_at`` and calls the
    notification hook (currently a stub). The current status is not
    checked before the change.
    """
    allowed_roles = [User.Role.ADMIN, User.Role.FRONT_DESK]

    def post(self, request, pk):
        """Record the no-show and return to the appointment detail page."""
        lookup = {'pk': pk, 'tenant': request.user.tenant}
        appt = get_object_or_404(Appointment, **lookup)

        appt.status = Appointment.Status.NO_SHOW
        appt.no_show_at = timezone.now()
        appt.save()

        self._send_notification(appt, 'no_show')
        messages.warning(request, _('Appointment marked as no-show.'))
        return redirect('appointments:appointment_detail', pk=pk)

    def _send_notification(self, appointment, event_type):
        """Notification hook (not implemented yet)."""
        # TODO: Implement notification
        pass
# ============================================================================
# Patient Confirmation Views (Phase 5)
# ============================================================================
class ConfirmAppointmentView(View):
    """
    Patient-facing appointment confirmation view.

    Features:
    - Public access (no login required)
    - Token-based authentication
    - Confirm or decline appointment
    - Mobile-friendly interface
    - Metadata tracking (IP, user agent)
    """

    def get(self, request, token):
        """Display the confirmation page, or an error/already-processed page."""
        from .confirmation_service import ConfirmationService

        # Get confirmation by token
        confirmation = ConfirmationService.get_confirmation_by_token(token)
        if not confirmation:
            return self._render_error(
                request,
                _('Invalid Confirmation Link'),
                _('This confirmation link is invalid or has expired. Please contact the clinic.')
            )

        # Already processed or expired links never show the form
        blocked = self._blocked_response(request, confirmation)
        if blocked is not None:
            return blocked

        # Render confirmation page
        context = {
            'confirmation': confirmation,
            'appointment': confirmation.appointment,
            'patient': confirmation.appointment.patient,
            'token': token,
        }
        return render(request, 'appointments/confirm_appointment.html', context)

    def post(self, request, token):
        """Process a ``confirm`` or ``decline`` action submitted by the patient."""
        from .confirmation_service import ConfirmationService

        # Get confirmation
        confirmation = ConfirmationService.get_confirmation_by_token(token)
        if not confirmation:
            return JsonResponse({
                'success': False,
                'error': _('Invalid confirmation link')
            }, status=400)

        # Apply the same guards as GET: a direct POST must not be able to
        # act on a link that GET would refuse to show a form for.
        blocked = self._blocked_response(request, confirmation)
        if blocked is not None:
            return blocked

        # Get action (confirm or decline)
        action = request.POST.get('action')
        if action == 'confirm':
            # Confirm appointment
            success, message = ConfirmationService.confirm_appointment(
                confirmation=confirmation,
                method='LINK',
                ip_address=self._get_client_ip(request),
                user_agent=request.META.get('HTTP_USER_AGENT', '')
            )
            if success:
                return self._render_success(request, confirmation, 'confirmed')
            return self._render_error(request, _('Confirmation Failed'), message)

        if action == 'decline':
            # Get decline reason
            reason = request.POST.get('reason', 'Patient declined')
            # Decline appointment
            success, message = ConfirmationService.decline_appointment(
                confirmation=confirmation,
                reason=reason,
                ip_address=self._get_client_ip(request),
                user_agent=request.META.get('HTTP_USER_AGENT', '')
            )
            if success:
                return self._render_success(request, confirmation, 'declined')
            return self._render_error(request, _('Decline Failed'), message)

        return JsonResponse({
            'success': False,
            'error': _('Invalid action')
        }, status=400)

    def _blocked_response(self, request, confirmation):
        """
        Return a response if ``confirmation`` can no longer be acted on,
        otherwise ``None``.
        """
        # Check if already processed
        if confirmation.status in ['CONFIRMED', 'DECLINED']:
            return self._render_already_processed(request, confirmation)
        # Check if expired
        if confirmation.is_expired:
            return self._render_error(
                request,
                _('Link Expired'),
                _('This confirmation link has expired. Please contact the clinic to reschedule.')
            )
        return None

    def _render_error(self, request, title, message):
        """Render the error page with the given title and message."""
        context = {
            'error_title': title,
            'error_message': message,
        }
        return render(request, 'appointments/confirmation_error.html', context)

    def _render_success(self, request, confirmation, action):
        """Render the success page for ``action`` ('confirmed' or 'declined')."""
        context = {
            'confirmation': confirmation,
            'appointment': confirmation.appointment,
            'action': action,
        }
        return render(request, 'appointments/confirmation_success.html', context)

    def _render_already_processed(self, request, confirmation):
        """Render the page shown when the link was already used."""
        context = {
            'confirmation': confirmation,
            'appointment': confirmation.appointment,
            'status': confirmation.status,
        }
        return render(request, 'appointments/confirmation_already_processed.html', context)

    def _get_client_ip(self, request):
        """
        Return the client IP: the first ``X-Forwarded-For`` entry when that
        header is present, otherwise ``REMOTE_ADDR``.
        """
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            # Entries are comma-separated and may carry surrounding spaces
            ip = x_forwarded_for.split(',')[0].strip()
        else:
            ip = request.META.get('REMOTE_ADDR')
        return ip
# ============================================================================
# Availability API Views
# ============================================================================
class AvailableSlotsView(LoginRequiredMixin, View):
    """
    API endpoint to get available time slots for a provider on a specific date.

    Features:
    - Returns JSON list of available slots
    - Delegates the slot computation to ``AvailabilityService``
    - Supports custom duration

    Query parameters: ``provider`` (required), ``date`` (required,
    ``YYYY-MM-DD``), ``duration`` (optional integer, default 30).
    """

    def get(self, request):
        """Return the available slots as JSON, or a JSON error with status 400/500."""
        from datetime import datetime
        from .availability_service import AvailabilityService

        # Get parameters
        provider_id = request.GET.get('provider')
        date_str = request.GET.get('date')

        # Validate required parameters
        if not provider_id or not date_str:
            return JsonResponse({
                'success': False,
                'error': _('Provider and date are required')
            }, status=400)

        # Validate duration here so non-numeric input is a client error
        # rather than an unhandled exception.
        try:
            duration = int(request.GET.get('duration', 30))
        except (TypeError, ValueError):
            duration = None
        if duration is None or duration <= 0:
            return JsonResponse({
                'success': False,
                'error': _('Duration must be a positive integer')
            }, status=400)

        try:
            # Parse date
            date = datetime.strptime(date_str, '%Y-%m-%d').date()
            # Get available slots
            slots = AvailabilityService.get_available_slots(
                provider_id=provider_id,
                date=date,
                duration=duration
            )
            return JsonResponse({
                'success': True,
                'slots': slots,
                'provider_id': provider_id,
                'date': date_str,
                'duration': duration
            })
        except ValueError as e:
            return JsonResponse({
                'success': False,
                'error': _('Invalid date format: %(error)s') % {'error': str(e)}
            }, status=400)
        except Exception as e:
            # Top-level API boundary: report any other failure as JSON
            return JsonResponse({
                'success': False,
                'error': _('Error getting available slots: %(error)s') % {'error': str(e)}
            }, status=500)
class AppointmentQuickViewView(LoginRequiredMixin, TenantFilterMixin, DetailView):
    """
    Compact appointment detail for the calendar modal.

    Renders a partial HTML template intended to be loaded over AJAX.
    """
    model = Appointment
    template_name = 'appointments/appointment_quick_view.html'
    context_object_name = 'appointment'

    def get_queryset(self):
        """Return the inherited queryset with the related rows the partial uses pre-joined."""
        base_queryset = super().get_queryset()
        related = ('patient', 'clinic', 'provider__user', 'room')
        return base_queryset.select_related(*related)
class AppointmentEventsView(LoginRequiredMixin, TenantFilterMixin, View):
    """
    JSON feed of appointments for the calendar.

    Responds with a list of event dicts for the tenant's appointments whose
    scheduled date lies between ``start`` and ``end`` (both inclusive).
    """

    def get(self, request):
        """Return the matching appointments as a JSON list, or a 400 JSON error."""
        from datetime import datetime, timedelta

        params = request.GET

        # Both range bounds are mandatory
        start_str = params.get('start')
        end_str = params.get('end')
        if not (start_str and end_str):
            return JsonResponse({'error': _('start and end parameters are required')}, status=400)

        # A trailing "Z" is rewritten to an explicit UTC offset before parsing
        try:
            start_date, end_date = [
                datetime.fromisoformat(raw.replace('Z', '+00:00')).date()
                for raw in (start_str, end_str)
            ]
        except ValueError:
            return JsonResponse({'error': _('Invalid date format')}, status=400)

        appointments = Appointment.objects.filter(
            tenant=request.user.tenant,
            scheduled_date__gte=start_date,
            scheduled_date__lte=end_date
        ).select_related('patient', 'provider__user', 'clinic', 'room')

        # Optional filters: each lookup applies only when its parameter is non-blank
        optional_lookups = (
            ('service_type__icontains', 'service'),
            ('provider_id', 'provider'),
            ('status', 'status'),
        )
        for lookup, param_name in optional_lookups:
            value = params.get(param_name, '').strip()
            if value:
                appointments = appointments.filter(**{lookup: value})

        def to_event(appt):
            """Serialize one appointment; end time is start plus its duration in minutes."""
            begins = datetime.combine(appt.scheduled_date, appt.scheduled_time)
            finishes = begins + timedelta(minutes=appt.duration)
            return {
                'id': str(appt.id),
                'patient_name': appt.patient.full_name_en,
                'service_name': appt.service_type,
                'provider_name': appt.provider.user.get_full_name(),
                'room_name': appt.room.name if appt.room else '',
                'start_time': begins.isoformat(),
                'end_time': finishes.isoformat(),
                'status': appt.status,
            }

        events = [to_event(appt) for appt in appointments]
        # safe=False because the top-level JSON value is a list
        return JsonResponse(events, safe=False)
class DeclineAppointmentView(View):
    """
    Quick decline view (alternative to confirm page).

    Features:
    - Direct decline link
    - Optional reason
    - Confirmation message
    """

    def get(self, request, token):
        """Display the decline form, or an error/already-processed page."""
        from .confirmation_service import ConfirmationService

        # Get confirmation
        confirmation = ConfirmationService.get_confirmation_by_token(token)
        if not confirmation:
            return render(request, 'appointments/confirmation_error.html', {
                'error_title': _('Invalid Link'),
                'error_message': _('This link is invalid or has expired.')
            })

        # Already processed or expired links never show the form
        blocked = self._blocked_response(request, confirmation)
        if blocked is not None:
            return blocked

        # Render decline form
        context = {
            'confirmation': confirmation,
            'appointment': confirmation.appointment,
            'token': token,
        }
        return render(request, 'appointments/decline_appointment.html', context)

    def post(self, request, token):
        """Process the decline submitted by the patient."""
        from .confirmation_service import ConfirmationService

        # Get confirmation
        confirmation = ConfirmationService.get_confirmation_by_token(token)
        if not confirmation:
            return JsonResponse({
                'success': False,
                'error': _('Invalid confirmation link')
            }, status=400)

        # Apply the same guards as GET so a direct POST cannot act on a
        # link that GET would refuse to show a form for.
        blocked = self._blocked_response(request, confirmation)
        if blocked is not None:
            return blocked

        # Get reason
        reason = request.POST.get('reason', _('Patient declined'))

        # Decline appointment
        success, message = ConfirmationService.decline_appointment(
            confirmation=confirmation,
            reason=reason,
            ip_address=self._get_client_ip(request),
            user_agent=request.META.get('HTTP_USER_AGENT', '')
        )
        if success:
            context = {
                'confirmation': confirmation,
                'appointment': confirmation.appointment,
                'action': 'declined',
            }
            return render(request, 'appointments/confirmation_success.html', context)
        return render(request, 'appointments/confirmation_error.html', {
            'error_title': _('Decline Failed'),
            'error_message': message
        })

    def _blocked_response(self, request, confirmation):
        """
        Return a response if ``confirmation`` can no longer be acted on,
        otherwise ``None``.
        """
        # Check if already processed
        if confirmation.status in ['CONFIRMED', 'DECLINED']:
            context = {
                'confirmation': confirmation,
                'appointment': confirmation.appointment,
                'status': confirmation.status,
            }
            return render(request, 'appointments/confirmation_already_processed.html', context)
        # Check if expired, as the patient confirmation page does
        if confirmation.is_expired:
            return render(request, 'appointments/confirmation_error.html', {
                'error_title': _('Link Expired'),
                'error_message': _('This confirmation link has expired. Please contact the clinic to reschedule.')
            })
        return None

    def _get_client_ip(self, request):
        """
        Return the client IP: the first ``X-Forwarded-For`` entry when that
        header is present, otherwise ``REMOTE_ADDR``.
        """
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            # Entries are comma-separated and may carry surrounding spaces
            ip = x_forwarded_for.split(',')[0].strip()
        else:
            ip = request.META.get('REMOTE_ADDR')
        return ip
class AppointmentPDFView(LoginRequiredMixin, TenantFilterMixin, View):
    """
    Generate PDF for appointment details.

    Features:
    - Appointment information
    - Patient details
    - Provider and clinic information
    - Instructions from clinical documents if available
    - Professional formatting with Arabic support

    Database and user-entered text is XML-escaped before it is placed into
    ReportLab paragraph markup. The Arabic TrueType font path can be
    overridden with the ``APPOINTMENT_PDF_ARABIC_FONT`` setting; when the
    font cannot be loaded the PDF is rendered without the Arabic text.
    """

    # Name under which the Arabic font is registered with ReportLab.
    ARABIC_FONT_NAME = 'Arabic'
    # Fallback font file used when the setting above is not defined.
    DEFAULT_ARABIC_FONT_PATH = '/System/Library/Fonts/SFArabic.ttf'

    # (module path, model name, text attribute, label) for every clinical
    # document type whose text is printed under "Clinical Instructions".
    _INSTRUCTION_SOURCES = (
        ('nursing.models', 'NursingEncounter', 'instructions', 'Nursing'),
        ('medical.models', 'MedicalConsultation', 'instructions', 'Medical Consultation'),
        ('medical.models', 'MedicalFollowUp', 'instructions', 'Medical Follow-up'),
        ('aba.models', 'ABAConsult', 'recommendations', 'ABA Recommendations'),
        ('ot.models', 'OTConsult', 'recommendations', 'OT Recommendations'),
        ('ot.models', 'OTSession', 'home_program', 'OT Home Program'),
        ('slp.models', 'SLPConsult', 'recommendations', 'SLP Recommendations'),
        ('slp.models', 'SLPAssessment', 'recommendations', 'SLP Assessment Recommendations'),
        ('slp.models', 'SLPIntervention', 'home_program', 'SLP Home Program'),
    )

    def get(self, request, pk):
        """
        Generate and return the appointment PDF.

        The appointment is looked up by ``pk`` within ``request.user.tenant``
        (404 otherwise). The response is served as an attachment unless the
        query string contains ``view=inline``.
        """
        import logging
        import os
        from io import BytesIO
        from xml.sax.saxutils import escape

        import arabic_reshaper
        from bidi.algorithm import get_display
        from django.conf import settings
        from reportlab.lib import colors
        from reportlab.lib.enums import TA_CENTER
        from reportlab.lib.pagesizes import A4
        from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
        from reportlab.lib.units import inch
        from reportlab.pdfbase import pdfmetrics
        from reportlab.pdfbase.ttfonts import TTFont
        from reportlab.platypus import (
            Image,
            Paragraph,
            SimpleDocTemplate,
            Spacer,
            Table,
            TableStyle,
        )

        logger = logging.getLogger(__name__)

        appointment = get_object_or_404(
            Appointment.objects.select_related(
                'patient', 'provider__user', 'clinic', 'room', 'tenant'
            ),
            pk=pk,
            tenant=request.user.tenant
        )
        patient = appointment.patient
        tenant = appointment.tenant

        # --- Arabic font -------------------------------------------------
        font_name = self.ARABIC_FONT_NAME
        font_path = getattr(
            settings, 'APPOINTMENT_PDF_ARABIC_FONT', self.DEFAULT_ARABIC_FONT_PATH
        )
        try:
            # Registration is process-wide; skip it once the font is known.
            if font_name not in pdfmetrics.getRegisteredFontNames():
                pdfmetrics.registerFont(TTFont(font_name, font_path))
            arabic_font_available = True
        except Exception:
            # Best effort: without the font the PDF is still generated,
            # only the Arabic fragments are left out.
            logger.warning(
                "Arabic font could not be loaded from %s; "
                "rendering appointment PDF without Arabic text",
                font_path,
                exc_info=True,
            )
            arabic_font_available = False

        # --- Markup helpers ----------------------------------------------
        def esc(value):
            """Escape a value for safe inclusion in Paragraph markup."""
            return '' if value is None else escape(str(value))

        def esc_multiline(value):
            """Escape free text and keep its line breaks as <br/> tags."""
            return '<br/>'.join(esc(value).splitlines())

        def format_arabic(text):
            """Format Arabic text for proper display in PDF."""
            if not text:
                return ""
            return get_display(arabic_reshaper.reshape(text))

        def arabic_span(text, size):
            """Return a <font> span for Arabic text, or '' if not renderable."""
            if not (text and arabic_font_available):
                return ''
            # Escape after reshaping/bidi so the final glyph order is what
            # gets escaped.
            shaped = esc(format_arabic(text))
            return f'<font name="{font_name}" size={size}>{shaped}</font>'

        def bilingual(english_html, arabic_text, size=9):
            """Join already-escaped English markup with its Arabic span."""
            span = arabic_span(arabic_text, size)
            return f'{english_html} / {span}' if span else english_html

        # --- Styles ------------------------------------------------------
        styles = getSampleStyleSheet()
        normal_style = styles['Normal']
        title_style = ParagraphStyle(
            'CustomTitle',
            parent=styles['Heading1'],
            fontSize=18,
            textColor=colors.HexColor('#0d6efd'),
            spaceAfter=20,
            alignment=TA_CENTER
        )
        heading_style = ParagraphStyle(
            'CustomHeading',
            parent=styles['Heading2'],
            fontSize=14,
            textColor=colors.HexColor('#212529'),
            spaceAfter=12,
            spaceBefore=12
        )
        cell_style = ParagraphStyle('Cell', parent=normal_style, fontSize=10)
        label_style = ParagraphStyle(
            'Label', parent=normal_style, fontSize=10, fontName='Helvetica-Bold'
        )

        def section_heading(english, arabic):
            """Build a bilingual section heading paragraph."""
            return Paragraph(bilingual(english, arabic, size=12), heading_style)

        def info_row(label_en, label_ar, value_html):
            """Build one [label, value] table row of Paragraphs."""
            return [
                Paragraph(bilingual(label_en, label_ar) + ':', label_style),
                Paragraph(value_html, cell_style),
            ]

        def info_table(rows):
            """Build the two-column label/value table used by both sections."""
            table = Table(rows, colWidths=[2.5 * inch, 3.5 * inch])
            table.setStyle(TableStyle([
                ('BACKGROUND', (0, 0), (0, -1), colors.HexColor('#f8f9fa')),
                ('TEXTCOLOR', (0, 0), (-1, -1), colors.black),
                ('ALIGN', (0, 0), (0, -1), 'RIGHT'),
                ('ALIGN', (1, 0), (1, -1), 'LEFT'),
                ('FONTNAME', (0, 0), (0, -1), 'Helvetica-Bold'),
                ('FONTNAME', (1, 0), (1, -1), 'Helvetica'),
                ('FONTSIZE', (0, 0), (-1, -1), 10),
                ('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
                ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
                ('LEFTPADDING', (0, 0), (-1, -1), 8),
                ('RIGHTPADDING', (0, 0), (-1, -1), 8),
                ('TOPPADDING', (0, 0), (-1, -1), 6),
                ('BOTTOMPADDING', (0, 0), (-1, -1), 6),
            ]))
            return table

        elements = []

        # --- Header with logo and tenant info ----------------------------
        logo = None
        try:
            from core.models import TenantSetting
            logo_setting = TenantSetting.objects.filter(
                tenant=tenant,
                template__key='basic_logo'
            ).first()
            if logo_setting and logo_setting.file_value:
                logo_path = os.path.join(
                    settings.MEDIA_ROOT, str(logo_setting.file_value)
                )
                if os.path.exists(logo_path):
                    logo = Image(logo_path, width=0.8 * inch, height=0.8 * inch)
                    logo.hAlign = 'LEFT'
        except Exception:
            # If logo retrieval fails, continue without logo
            logo = None

        if logo is not None:
            tenant_info_html = f'<b>{esc(tenant.name)}</b><br/>'
            tenant_ar = arabic_span(tenant.name_ar, 11)
            if tenant_ar:
                tenant_info_html += tenant_ar + '<br/>'
            tenant_info = Paragraph(tenant_info_html, ParagraphStyle(
                'TenantInfo',
                parent=normal_style,
                fontSize=12,
                alignment=TA_CENTER
            ))
            header_table = Table([[logo, tenant_info]], colWidths=[2 * inch, 4 * inch])
            header_table.setStyle(TableStyle([
                ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
                ('ALIGN', (0, 0), (0, 0), 'LEFT'),
                ('ALIGN', (1, 0), (1, 0), 'CENTER'),
            ]))
            elements.append(header_table)
        else:
            # No logo, just tenant name
            tenant_name_html = (
                f'<b>{esc(tenant.name)}</b><br/>' + arabic_span(tenant.name_ar, 14)
            )
            elements.append(Paragraph(
                tenant_name_html,
                ParagraphStyle(
                    'TenantName',
                    parent=styles['Heading1'],
                    fontSize=16,
                    alignment=TA_CENTER
                )
            ))
        elements.append(Spacer(1, 0.15 * inch))

        # --- Title -------------------------------------------------------
        title_html = (
            f"Appointment Details - {esc(appointment.appointment_number)}<br/>"
            + arabic_span("تفاصيل الموعد", 16)
        )
        elements.append(Paragraph(title_html, title_style))
        elements.append(Spacer(1, 0.15 * inch))

        # --- Appointment Information Section -----------------------------
        elements.append(section_heading("Appointment Information", "معلومات الموعد"))

        room = appointment.room
        room_value = f"{room.room_number} - {room.name}" if room else 'Not assigned'
        provider_user = appointment.provider.user
        provider_value = (
            f"{provider_user.get_full_name()} ({provider_user.get_role_display()})"
        )
        time_value = (
            f"{appointment.scheduled_time.strftime('%H:%M')} "
            f"({appointment.duration} minutes)"
        )

        appointment_rows = [
            info_row("Appointment Number", "رقم الموعد",
                     esc(appointment.appointment_number)),
            info_row("Status", "الحالة", esc(appointment.get_status_display())),
            info_row("Service Type", "نوع الخدمة", esc(appointment.service_type)),
            info_row("Date", "التاريخ",
                     esc(appointment.scheduled_date.strftime('%A, %B %d, %Y'))),
            info_row("Time", "الوقت", esc(time_value)),
            info_row("Clinic", "العيادة",
                     bilingual(esc(appointment.clinic.name_en),
                               appointment.clinic.name_ar)),
            info_row("Room", "الغرفة", esc(room_value)),
            info_row("Provider", "مقدم الخدمة", esc(provider_value)),
        ]
        elements.append(info_table(appointment_rows))
        elements.append(Spacer(1, 0.3 * inch))

        # --- Patient Information Section ---------------------------------
        elements.append(section_heading("Patient Information", "معلومات المريض"))

        if patient.first_name_ar and patient.last_name_ar:
            patient_name_ar = f"{patient.first_name_ar} {patient.last_name_ar}"
        else:
            patient_name_ar = ""
        patient_name_en = f"{patient.first_name_en} {patient.last_name_en}"
        if patient.date_of_birth:
            birth_date_value = patient.date_of_birth.strftime('%Y-%m-%d')
        else:
            birth_date_value = 'Not provided'

        patient_rows = [
            info_row("Name", "الاسم",
                     bilingual(esc(patient_name_en), patient_name_ar)),
            info_row("MRN", "رقم السجل الطبي", esc(patient.mrn)),
            info_row("Date of Birth", "تاريخ الميلاد", esc(birth_date_value)),
            info_row("Gender", "الجنس", esc(patient.get_sex_display())),
            info_row("Phone", "الهاتف", esc(patient.phone)),
            info_row("Email", "البريد الإلكتروني",
                     esc(patient.email if patient.email else 'Not provided')),
        ]
        elements.append(info_table(patient_rows))
        elements.append(Spacer(1, 0.3 * inch))

        # --- Notes Section (if available) --------------------------------
        if appointment.notes:
            elements.append(section_heading("Notes", "ملاحظات"))
            elements.append(Paragraph(esc_multiline(appointment.notes), normal_style))
            elements.append(Spacer(1, 0.2 * inch))

        # --- Instructions Section (from clinical documents) --------------
        instructions = self._get_clinical_instructions(appointment)
        if instructions:
            elements.append(
                section_heading("Clinical Instructions", "التعليمات السريرية")
            )
            for instruction in instructions:
                elements.append(Paragraph(esc_multiline(instruction), normal_style))
                elements.append(Spacer(1, 0.1 * inch))

        # --- Footer with generation info ---------------------------------
        elements.append(Spacer(1, 0.5 * inch))
        footer_text = f"Generated on: {timezone.now().strftime('%Y-%m-%d %H:%M:%S')}"
        elements.append(Paragraph(footer_text, ParagraphStyle(
            'Footer',
            parent=normal_style,
            fontSize=8,
            textColor=colors.grey,
            alignment=TA_CENTER
        )))

        # --- Build PDF ---------------------------------------------------
        with BytesIO() as buffer:
            SimpleDocTemplate(
                buffer,
                pagesize=A4,
                rightMargin=0.75 * inch,
                leftMargin=0.75 * inch,
                topMargin=1.5 * inch,
                bottomMargin=0.75 * inch
            ).build(elements)
            pdf = buffer.getvalue()

        # --- Create response ---------------------------------------------
        # ?view=inline shows the PDF in the browser; anything else downloads.
        if request.GET.get('view', 'download') == 'inline':
            disposition = 'inline'
        else:
            disposition = 'attachment'
        response = HttpResponse(pdf, content_type='application/pdf')
        response['Content-Disposition'] = (
            f'{disposition}; filename="appointment_{appointment.appointment_number}.pdf"'
        )
        return response

    def _get_clinical_instructions(self, appointment):
        """
        Extract instructions from clinical documents.

        For every entry in ``_INSTRUCTION_SOURCES`` the first document linked
        to ``appointment`` is looked up and, when its text attribute is
        non-empty, ``"<label>: <text>"`` is added to the result.

        Each source is handled independently: a model that cannot be
        imported or queried is logged and skipped without discarding the
        instructions found in the other sources.

        Returns:
            list[str]: The collected instruction strings (possibly empty).
        """
        import logging
        from importlib import import_module

        logger = logging.getLogger(__name__)
        instructions = []

        for module_path, model_name, field_name, label in self._INSTRUCTION_SOURCES:
            try:
                model = getattr(import_module(module_path), model_name)
            except Exception:
                # If the model doesn't exist or can't be loaded, just skip it.
                logger.debug(
                    "Clinical model %s.%s is unavailable; skipping",
                    module_path, model_name, exc_info=True,
                )
                continue

            try:
                document = model.objects.filter(appointment=appointment).first()
                text = getattr(document, field_name, None) if document else None
            except Exception:
                logger.warning(
                    "Could not read %s for appointment %s; skipping",
                    model_name, getattr(appointment, 'pk', None), exc_info=True,
                )
                continue

            if text:
                instructions.append(f"{label}: {text}")

        return instructions
class AppointmentEmailPDFView(LoginRequiredMixin, TenantFilterMixin, View):
    """
    Email appointment PDF to patient.

    Features:
    - Generate PDF
    - Send via email with optional custom message
    - Uses existing email infrastructure
    """

    def post(self, request, pk):
        """
        Send appointment PDF via email.

        Reads ``email_address`` (required) and ``email_message`` (optional)
        from the POST data, generates the PDF through ``AppointmentPDFView``
        and sends it as an attachment. The outcome is reported through the
        messages framework and the user is always redirected back to the
        appointment detail page.
        """
        import logging

        from django.core.exceptions import ValidationError
        from django.core.mail import EmailMessage
        from django.core.validators import validate_email

        logger = logging.getLogger(__name__)

        # Get appointment (404 if it does not belong to the user's tenant)
        appointment = get_object_or_404(
            Appointment.objects.select_related(
                'patient', 'provider__user', 'clinic', 'room', 'tenant'
            ),
            pk=pk,
            tenant=request.user.tenant
        )

        # Get email address and message from form
        email_address = request.POST.get('email_address', '').strip()
        custom_message = request.POST.get('email_message', '').strip()

        # Validate email
        if not email_address:
            messages.error(request, _('Email address is required.'))
            return redirect('appointments:appointment_detail', pk=pk)
        try:
            validate_email(email_address)
        except ValidationError:
            messages.error(request, _('Please enter a valid email address.'))
            return redirect('appointments:appointment_detail', pk=pk)

        try:
            # Generate PDF using the same logic as AppointmentPDFView. The
            # current request already carries the user and query parameters
            # that view reads, so no synthetic request is needed.
            pdf_view = AppointmentPDFView()
            pdf_view.request = request
            pdf_content = pdf_view.get(request, pk).content

            patient = appointment.patient
            subject = f"Appointment Details - {appointment.appointment_number}"

            # Plain-text body; leading/trailing empty entries produce the
            # surrounding newlines.
            body_lines = [
                "",
                f"Dear {patient.first_name_en} {patient.last_name_en},",
                "Please find attached the details for your appointment.",
                f"Appointment Number: {appointment.appointment_number}",
                f"Date: {appointment.scheduled_date.strftime('%A, %B %d, %Y')}",
                f"Time: {appointment.scheduled_time.strftime('%H:%M')}",
                f"Clinic: {appointment.clinic.name_en}",
                f"Provider: {appointment.provider.user.get_full_name()}",
                "",
            ]
            email_body = "\n".join(body_lines)
            if custom_message:
                email_body += f"\n{custom_message}\n\n"
            email_body += f"\nBest regards,\n{appointment.tenant.name}\n"

            email = EmailMessage(
                subject=subject,
                body=email_body,
                from_email=None,  # Will use DEFAULT_FROM_EMAIL from settings
                to=[email_address],
            )
            email.attach(
                f'appointment_{appointment.appointment_number}.pdf',
                pdf_content,
                'application/pdf'
            )
            email.send(fail_silently=False)
        except Exception as e:
            # Report the failure to the user, and keep a traceback in the
            # logs so it can be diagnosed.
            logger.exception(
                "Failed to email appointment PDF for appointment %s", pk
            )
            messages.error(
                request,
                _('Failed to send email: %(error)s') % {'error': str(e)}
            )
        else:
            messages.success(
                request,
                _('Appointment PDF has been sent to %(email)s successfully!') % {'email': email_address}
            )

        return redirect('appointments:appointment_detail', pk=pk)