agdar/appointments/package_integration_service.py
Marwan Alwali 7e014ee160 update
2025-11-16 14:56:32 +03:00

365 lines
14 KiB
Python

"""
Package Integration Service for Appointments.
This service integrates the appointments app with the finance package system,
enabling automatic scheduling of appointments when a patient purchases a package.
"""
from datetime import datetime, date, time, timedelta
from typing import List, Dict, Optional, Tuple
from django.db import transaction
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from .models import Appointment, Provider
from .availability_service import AvailabilityService
class PackageIntegrationService:
    """Service for integrating appointments with finance packages.

    Every method is a ``@staticmethod``; the class only serves as a namespace
    for the scheduling, usage-tracking and reporting helpers below.
    """

    # Upper bound on how many consecutive days are probed for a free slot.
    MAX_SEARCH_DAYS = 90
    # Session length (minutes) used when the package exposes no service.
    DEFAULT_DURATION_MINUTES = 30
    # Appointment statuses reported as "scheduled" in the progress summary.
    SCHEDULED_STATUSES = ('BOOKED', 'CONFIRMED', 'ARRIVED', 'IN_PROGRESS')

    @staticmethod
    def schedule_package_appointments(
        package_purchase,
        provider_id: str,
        start_date: date,
        end_date: Optional[date] = None,
        preferred_days: Optional[List[int]] = None,
        use_multiple_providers: bool = False,
        provider_assignments: Optional[Dict[int, str]] = None,
        auto_schedule: bool = True
    ) -> Tuple[List["Appointment"], List[str]]:
        """
        Schedule all remaining appointments for a purchased package.

        Sessions are booked one after another: each session is searched for
        starting the day after the previously booked one. A session that
        cannot be booked is reported in the error list and does not stop the
        remaining sessions from being attempted.

        Args:
            package_purchase: finance.PackagePurchase object
            provider_id: Default provider ID (used if not using multiple providers)
            start_date: Preferred start date for scheduling; dates in the past
                are replaced by today
            end_date: Target end date (optional, defaults to package expiry).
                If the package has no expiry either, the search for each
                session is only bounded by ``MAX_SEARCH_DAYS``.
            preferred_days: List of preferred day numbers (0=Sunday, 6=Saturday);
                empty or None means any day
            use_multiple_providers: Whether to use different providers for sessions
            provider_assignments: Dict mapping session numbers to provider IDs.
                Keys are matched against the 1-based position among the
                sessions still to be scheduled (1 = next unused session).
            auto_schedule: Whether to automatically schedule all sessions

        Returns:
            Tuple of (list of created appointments, list of error messages)
        """
        appointments: List["Appointment"] = []
        errors: List[str] = []

        if not auto_schedule:
            return appointments, ["Auto-scheduling is disabled"]

        # NOTE(review): only *used* sessions are subtracted here; sessions that
        # are already booked but not yet completed are not visible in this
        # count, so calling this twice may book the same sessions again.
        sessions_used = package_purchase.sessions_used
        sessions_to_schedule = package_purchase.total_sessions - sessions_used
        if sessions_to_schedule <= 0:
            return appointments, ["No sessions remaining in package"]

        # Fall back to the package expiry; this may still be None.
        if not end_date:
            end_date = package_purchase.expiry_date

        try:
            default_provider = Provider.objects.get(id=provider_id)
        except Provider.DoesNotExist:
            return appointments, ["Default provider not found"]

        clinic = PackageIntegrationService._get_clinic_from_package(package_purchase)
        if not clinic:
            return appointments, ["Could not determine clinic from package"]

        service_type = PackageIntegrationService._get_service_type_from_package(package_purchase)
        duration = PackageIntegrationService._get_duration_from_package(package_purchase)
        day_filter = preferred_days or []

        # Never schedule in the past.
        current_date = max(start_date, date.today())

        for session_num in range(1, sessions_to_schedule + 1):
            provider = default_provider
            if (
                use_multiple_providers
                and provider_assignments
                and session_num in provider_assignments
            ):
                try:
                    provider = Provider.objects.get(id=provider_assignments[session_num])
                except Provider.DoesNotExist:
                    errors.append(f"Session {session_num}: Provider not found")
                    continue

            appointment, error = PackageIntegrationService._schedule_single_appointment(
                package_purchase=package_purchase,
                # Absolute session number within the package.
                session_number=session_num + sessions_used,
                provider=provider,
                clinic=clinic,
                service_type=service_type,
                duration=duration,
                start_date=current_date,
                end_date=end_date,
                preferred_days=day_filter
            )

            if appointment:
                appointments.append(appointment)
                # The next session is searched for from the following day.
                current_date = appointment.scheduled_date + timedelta(days=1)
            else:
                errors.append(f"Session {session_num}: {error}")

        return appointments, errors

    @staticmethod
    def _schedule_single_appointment(
        package_purchase,
        session_number: int,
        provider: "Provider",
        clinic,
        service_type: str,
        duration: int,
        start_date: date,
        end_date: Optional[date],
        preferred_days: List[int]
    ) -> Tuple[Optional["Appointment"], Optional[str]]:
        """
        Schedule a single appointment within the date range.

        Walks forward one day at a time from ``start_date`` and books the
        first slot reported by ``AvailabilityService`` on a day that matches
        ``preferred_days``. The walk stops after ``end_date`` (when given) or
        after ``MAX_SEARCH_DAYS`` days, whichever comes first.

        Args:
            package_purchase: finance.PackagePurchase object
            session_number: Session number within package
            provider: Provider object
            clinic: Clinic object
            service_type: Service type
            duration: Duration in minutes
            start_date: First date to try
            end_date: Last date to try (inclusive); None means no end date
            preferred_days: Allowed day numbers (0=Sunday, 6=Saturday);
                an empty list allows every day

        Returns:
            Tuple of (Appointment object or None, error message or None)
        """
        max_attempts = PackageIntegrationService.MAX_SEARCH_DAYS

        if end_date is not None and start_date > end_date:
            return None, (
                f"No available dates: search start {start_date} "
                f"is after end date {end_date}"
            )

        for offset in range(max_attempts):
            candidate = start_date + timedelta(days=offset)
            if end_date is not None and candidate > end_date:
                # The window closed before the day cap was reached.
                return None, (
                    f"No available slots found between {start_date} and {end_date}"
                )

            # date.weekday() is Monday=0; shift so that Sunday=0.
            day_of_week = (candidate.weekday() + 1) % 7
            if preferred_days and day_of_week not in preferred_days:
                continue

            available_slots = AvailabilityService.get_available_slots(
                provider_id=str(provider.id),
                date=candidate,
                duration=duration
            )
            if not available_slots:
                continue

            # Take the first available slot; its 'time' is parsed as HH:MM.
            slot_time = datetime.strptime(available_slots[0]['time'], '%H:%M').time()

            try:
                appointment = PackageIntegrationService._create_appointment(
                    package_purchase=package_purchase,
                    session_number=session_number,
                    provider=provider,
                    clinic=clinic,
                    service_type=service_type,
                    scheduled_date=candidate,
                    scheduled_time=slot_time,
                    duration=duration
                )
            except Exception as e:
                # Deliberately broad: failures are reported to the caller as
                # a message instead of aborting the whole scheduling run.
                return None, f"Failed to create appointment: {str(e)}"
            return appointment, None

        return None, f"No available slots found within {max_attempts} days"

    @staticmethod
    def _create_appointment(
        package_purchase,
        session_number: int,
        provider: "Provider",
        clinic,
        service_type: str,
        scheduled_date: date,
        scheduled_time: time,
        duration: int
    ) -> "Appointment":
        """
        Create an appointment for a package session.

        The appointment is created with status 'BOOKED', linked to the
        package purchase, and given a freshly generated appointment number.

        Args:
            package_purchase: finance.PackagePurchase object
            session_number: Session number within package
            provider: Provider object
            clinic: Clinic object
            service_type: Service type
            scheduled_date: Date for appointment
            scheduled_time: Time for appointment
            duration: Duration in minutes

        Returns:
            Created Appointment object
        """
        tenant = package_purchase.tenant
        appointment_number = PackageIntegrationService._generate_appointment_number(tenant)

        return Appointment.objects.create(
            tenant=tenant,
            appointment_number=appointment_number,
            patient=package_purchase.patient,
            clinic=clinic,
            provider=provider,
            service_type=service_type,
            scheduled_date=scheduled_date,
            scheduled_time=scheduled_time,
            duration=duration,
            status='BOOKED',
            package_purchase=package_purchase,
            session_number_in_package=session_number,
            notes=f"Package: {package_purchase.package.name_en}, Session {session_number}/{package_purchase.total_sessions}"
        )

    @staticmethod
    def _next_sequence_number(latest: Optional[str]) -> int:
        """Return the sequence number following ``latest``.

        ``latest`` is an existing appointment number; the text after its last
        '-' is taken as the current sequence. Returns 1 when there is no
        previous number or its suffix is not an integer.
        """
        if not latest:
            return 1
        try:
            return int(latest.split('-')[-1]) + 1
        except (ValueError, IndexError):
            return 1

    @staticmethod
    def _generate_appointment_number(tenant) -> str:
        """Generate an appointment number of the form APT-YYYYMMDD-NNNN.

        The sequence part continues from the highest existing appointment
        number of the tenant; the date part is today's date.

        NOTE(review): the highest number is found with ``Max`` over a text
        value, which presumably compares lexicographically, so ordering breaks
        once the sequence needs more than four digits - confirm.
        NOTE(review): the read-then-create sequence is not protected against
        concurrent callers; uniqueness relies on whatever constraint the
        model declares - confirm.
        """
        from django.db.models import Max

        latest = Appointment.objects.filter(
            tenant=tenant
        ).aggregate(Max('appointment_number'))['appointment_number__max']

        new_num = PackageIntegrationService._next_sequence_number(latest)
        today = date.today()
        return f"APT-{today.strftime('%Y%m%d')}-{new_num:04d}"

    @staticmethod
    def _first_service(package_purchase):
        """Return the service of the package's first package-service row, or None."""
        package_service = package_purchase.package.packageservice_set.first()
        if package_service and package_service.service:
            return package_service.service
        return None

    @staticmethod
    def _get_clinic_from_package(package_purchase):
        """Get the clinic of the first service in the package, or None."""
        service = PackageIntegrationService._first_service(package_purchase)
        return service.clinic if service else None

    @staticmethod
    def _get_service_type_from_package(package_purchase):
        """Get service type from package (the package's English name)."""
        return package_purchase.package.name_en

    @staticmethod
    def _get_duration_from_package(package_purchase):
        """Get session duration from the first service in the package.

        Falls back to ``DEFAULT_DURATION_MINUTES`` when the package has no
        service to read the duration from.
        """
        service = PackageIntegrationService._first_service(package_purchase)
        if service:
            return service.duration
        return PackageIntegrationService.DEFAULT_DURATION_MINUTES

    @staticmethod
    def increment_package_usage(appointment: "Appointment"):
        """
        Increment package usage when appointment is completed.

        Does nothing unless the appointment belongs to a package purchase and
        its status is 'COMPLETED'. When the last session is consumed the
        purchase itself is marked 'COMPLETED'.

        NOTE(review): this is an in-memory read-modify-write followed by
        ``save()``; nothing here prevents double counting if it is called
        twice for the same appointment or concurrently - confirm callers.

        Args:
            appointment: Appointment object
        """
        package_purchase = appointment.package_purchase
        if not package_purchase or appointment.status != 'COMPLETED':
            return

        package_purchase.sessions_used += 1
        if package_purchase.sessions_used >= package_purchase.total_sessions:
            package_purchase.status = 'COMPLETED'
        package_purchase.save()

    @staticmethod
    def get_available_packages_for_patient(patient, clinic=None):
        """
        Get available packages for a patient.

        A package is available when its status is 'ACTIVE' and it still has
        unused sessions.

        Args:
            patient: Patient object
            clinic: Optional clinic filter; keeps only packages that contain
                a service of this clinic

        Returns:
            QuerySet of PackagePurchase objects
        """
        from finance.models import PackagePurchase
        from django.db.models import F

        queryset = PackagePurchase.objects.filter(
            patient=patient,
            status='ACTIVE',
            sessions_used__lt=F('total_sessions')
        ).select_related('package')

        if clinic:
            # The join over package services can repeat a purchase once per
            # matching service, hence distinct().
            queryset = queryset.filter(
                package__packageservice__service__clinic=clinic
            ).distinct()

        return queryset

    @staticmethod
    def get_package_progress(package_purchase):
        """
        Get progress information for a package purchase.

        Args:
            package_purchase: PackagePurchase object

        Returns:
            Dictionary with the session totals of the purchase, appointment
            counts per status group, and one entry per linked appointment
            ordered by session number.
        """
        # Fetch once and count in Python instead of issuing one COUNT query
        # per status group on top of the full fetch.
        appointments = list(
            Appointment.objects.filter(
                package_purchase=package_purchase
            ).order_by('session_number_in_package')
        )
        statuses = [appt.status for appt in appointments]
        scheduled_statuses = PackageIntegrationService.SCHEDULED_STATUSES

        return {
            'total_sessions': package_purchase.total_sessions,
            'sessions_used': package_purchase.sessions_used,
            'sessions_remaining': package_purchase.sessions_remaining,
            'scheduled_appointments': sum(
                1 for status in statuses if status in scheduled_statuses
            ),
            'completed_appointments': statuses.count('COMPLETED'),
            'cancelled_appointments': statuses.count('CANCELLED'),
            'appointments': [
                {
                    'session_number': appt.session_number_in_package,
                    'appointment_number': appt.appointment_number,
                    'scheduled_date': appt.scheduled_date.isoformat(),
                    'scheduled_time': appt.scheduled_time.strftime('%H:%M'),
                    'provider': appt.provider.user.get_full_name(),
                    'status': appt.get_status_display(),
                }
                for appt in appointments
            ]
        }