684 lines
23 KiB
Python
684 lines
23 KiB
Python
"""
|
|
Core business logic services.
|
|
|
|
This module contains service classes that encapsulate business logic
|
|
for core operations like consent management, patient services, etc.
|
|
"""
|
|
|
|
import logging
|
|
from typing import List, Optional, Tuple
|
|
|
|
from django.db import transaction
|
|
from django.utils import timezone
|
|
|
|
from core.models import Consent, Patient
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ConsentService:
    """Service class for consent management operations.

    Every method is a static method; the class only groups the consent
    rules (``SERVICE_CONSENT_REQUIREMENTS``) with the queries that enforce
    them.
    """

    # Service type consent requirements configuration.
    # Keys are upper-case service type codes. A service type that is not
    # listed here is treated as needing neither service-specific nor
    # photo/video consent (see ``_requirements_for``).
    SERVICE_CONSENT_REQUIREMENTS = {
        'MEDICAL': {
            'requires_specific': False,
            'requires_photo_video': False,
            'description': 'General medical consultations and examinations',
        },
        'ABA': {
            'requires_specific': True,
            'requires_photo_video': True,
            'description': 'Applied Behavior Analysis therapy (often involves video recording)',
        },
        'OT': {
            'requires_specific': True,
            'requires_photo_video': False,
            'description': 'Occupational Therapy services',
        },
        'SLP': {
            'requires_specific': True,
            'requires_photo_video': False,
            'description': 'Speech-Language Pathology services',
        },
        'NURSING': {
            'requires_specific': False,
            'requires_photo_video': False,
            'description': 'Nursing care and procedures',
        },
        'SURGERY': {
            'requires_specific': True,
            'requires_photo_video': False,
            'description': 'Surgical procedures',
        },
        'PROCEDURE': {
            'requires_specific': True,
            'requires_photo_video': False,
            'description': 'Medical procedures',
        },
        'ANESTHESIA': {
            'requires_specific': True,
            'requires_photo_video': False,
            'description': 'Anesthesia administration',
        },
        'BLOOD_TRANSFUSION': {
            'requires_specific': True,
            'requires_photo_video': False,
            'description': 'Blood transfusion procedures',
        },
        'EXPERIMENTAL_TREATMENT': {
            'requires_specific': True,
            'requires_photo_video': False,
            'description': 'Experimental or research treatments',
        },
        'BEHAVIORAL_THERAPY': {
            'requires_specific': True,
            'requires_photo_video': True,
            'description': 'Behavioral therapy services',
        },
        'RESEARCH': {
            'requires_specific': True,
            'requires_photo_video': True,
            'description': 'Research participation',
        },
        'PHYSIOTHERAPY': {
            'requires_specific': True,
            'requires_photo_video': False,
            'description': 'Physiotherapy services',
        },
        'PSYCHOLOGY': {
            'requires_specific': True,
            'requires_photo_video': False,
            'description': 'Psychology and counseling services',
        },
        'NUTRITION': {
            'requires_specific': False,
            'requires_photo_video': False,
            'description': 'Nutrition counseling and dietary services',
        },
    }

    @staticmethod
    def _requirements_for(service_type: Optional[str]) -> dict:
        """Return the requirements entry for ``service_type``.

        The lookup is case-insensitive. A missing/empty service type or one
        that is not configured yields an empty dict, so callers can use
        ``.get(flag, False)`` without further checks.
        """
        if not service_type:
            return {}
        return ConsentService.SERVICE_CONSENT_REQUIREMENTS.get(service_type.upper(), {})

    @staticmethod
    def _has_signed_consent(patient: Patient, consent_type: str, **extra_filters) -> bool:
        """Return True if the patient has an active, signed consent of this type.

        ``extra_filters`` are passed straight to ``Consent.objects.filter`` and
        narrow the match further (used for the service-specific text match).
        """
        return Consent.objects.filter(
            patient=patient,
            consent_type=consent_type,
            is_active=True,
            signed_at__isnull=False,
            **extra_filters
        ).exists()

    @staticmethod
    def _has_service_specific_consent(patient: Patient, service_type: str) -> bool:
        """Return True if a signed SERVICE_SPECIFIC consent mentions ``service_type``."""
        # NOTE(review): this is a case-insensitive *substring* match on the
        # consent text, so a short code such as "OT" also matches words like
        # "not" or "photo". Kept as-is because no structured service field is
        # visible here; consider matching on a dedicated field instead.
        return ConsentService._has_signed_consent(
            patient,
            'SERVICE_SPECIFIC',
            content_text__icontains=service_type,
        )

    @staticmethod
    def verify_consent_for_service(patient: Patient, service_type: str) -> Tuple[bool, str]:
        """
        Check if patient has signed consent for service.

        Checks run in order and stop at the first missing consent: general
        treatment consent, then service-specific consent, then photo/video
        consent (the latter two only when the service type requires them).

        Args:
            patient: Patient instance
            service_type: Service type to check consent for

        Returns:
            Tuple[bool, str]: (has_consent, message). Any exception raised
            while checking is logged and reported as ``(False, <error text>)``
            rather than propagated.
        """
        try:
            # General treatment consent is required for all services.
            if not ConsentService._has_signed_consent(patient, 'GENERAL_TREATMENT'):
                return False, "General treatment consent required. Please sign consent form."

            # Without a service type the remaining rules cannot be evaluated;
            # report that explicitly instead of surfacing an AttributeError.
            if service_type is None:
                return False, "Service type is required to verify consent"

            requirements = ConsentService._requirements_for(service_type)

            if requirements.get('requires_specific', False):
                if not ConsentService._has_service_specific_consent(patient, service_type):
                    return False, f"Service-specific consent required for {service_type}"

            # Photo/video consent applies to services that involve recording.
            if requirements.get('requires_photo_video', False):
                if not ConsentService._has_signed_consent(patient, 'PHOTO_VIDEO'):
                    return False, "Photo/Video consent required for this service"

            logger.info(
                "Consent verified for patient %s for service %s",
                patient.mrn,
                service_type,
            )
            return True, "All required consents verified"

        except Exception as e:
            # getattr: the handler must not raise a second error when the
            # failure was caused by a missing/invalid patient object.
            logger.exception(
                "Error verifying consent for patient %s: %s",
                getattr(patient, 'mrn', None),
                e,
            )
            return False, f"Error verifying consent: {str(e)}"

    @staticmethod
    def _requires_service_specific_consent(service_type: str) -> bool:
        """
        Check if service requires service-specific consent.

        Args:
            service_type: Service type to check (case-insensitive)

        Returns:
            bool: True if service-specific consent required; False for
            unknown or empty service types
        """
        return ConsentService._requirements_for(service_type).get('requires_specific', False)

    @staticmethod
    def _requires_photo_video_consent(service_type: str) -> bool:
        """
        Check if service requires photo/video consent.

        Args:
            service_type: Service type to check (case-insensitive)

        Returns:
            bool: True if photo/video consent required; False for unknown
            or empty service types
        """
        return ConsentService._requirements_for(service_type).get('requires_photo_video', False)

    @staticmethod
    def get_missing_consents(patient: Patient, service_type: Optional[str] = None) -> List[str]:
        """
        Get list of missing consent types for patient.

        Unlike ``verify_consent_for_service`` this does not stop at the first
        gap: every applicable consent type is checked.

        Args:
            patient: Patient instance
            service_type: Optional service type to check specific requirements;
                when omitted only general treatment consent is checked

        Returns:
            List[str]: List of missing consent types, drawn from
            'GENERAL_TREATMENT', 'SERVICE_SPECIFIC' and 'PHOTO_VIDEO'
        """
        missing = []

        if not ConsentService._has_signed_consent(patient, 'GENERAL_TREATMENT'):
            missing.append('GENERAL_TREATMENT')

        # Empty dict when no (or an unknown) service type is given, which
        # skips both service-dependent checks below.
        requirements = ConsentService._requirements_for(service_type)

        if requirements.get('requires_specific', False):
            if not ConsentService._has_service_specific_consent(patient, service_type):
                missing.append('SERVICE_SPECIFIC')

        if requirements.get('requires_photo_video', False):
            if not ConsentService._has_signed_consent(patient, 'PHOTO_VIDEO'):
                missing.append('PHOTO_VIDEO')

        return missing

    @staticmethod
    def get_active_consents(patient: Patient) -> List[Consent]:
        """
        Get all active consents for patient.

        Args:
            patient: Patient instance

        Returns:
            List[Consent]: Active, signed consents, most recently signed first
        """
        return list(
            Consent.objects.filter(
                patient=patient,
                is_active=True,
                signed_at__isnull=False,
            ).order_by('-signed_at')
        )

    @staticmethod
    @transaction.atomic
    def sign_consent(
        consent: Consent,
        signed_by_name: str,
        signed_by_relationship: str,
        signature_method: str,
        signature_image=None,
        signed_ip: str = None,
        signed_user_agent: str = None
    ) -> Consent:
        """
        Sign a consent form.

        Stamps the signer details and the current time on the consent and
        saves it. Optional fields are only written when a truthy value is
        supplied, so existing values are never cleared.

        Args:
            consent: Consent instance to sign
            signed_by_name: Name of person signing
            signed_by_relationship: Relationship to patient
            signature_method: Method used for signature
            signature_image: Optional signature image
            signed_ip: IP address of signer
            signed_user_agent: User agent of signer

        Returns:
            Consent: Signed consent instance
        """
        # NOTE(review): nothing here prevents re-signing a consent that
        # already has ``signed_at`` set; the earlier signature would be
        # overwritten. Confirm whether callers guard against that.
        consent.signed_by_name = signed_by_name
        consent.signed_by_relationship = signed_by_relationship
        consent.signed_at = timezone.now()
        consent.signature_method = signature_method

        if signature_image:
            consent.signature_image = signature_image

        if signed_ip:
            consent.signed_ip = signed_ip

        if signed_user_agent:
            consent.signed_user_agent = signed_user_agent

        consent.save()

        logger.info(
            "Consent signed: %s (%s) for patient %s by %s",
            consent.id,
            consent.consent_type,
            consent.patient.mrn,
            signed_by_name,
        )

        return consent

    @staticmethod
    @transaction.atomic
    def create_consent(
        patient: Patient,
        consent_type: str,
        content_text: str,
        **kwargs
    ) -> Consent:
        """
        Create a new consent form for patient.

        The consent is created active and under the patient's tenant.

        Args:
            patient: Patient instance
            consent_type: Type of consent
            content_text: Consent form text
            **kwargs: Additional consent fields, passed to
                ``Consent.objects.create``

        Returns:
            Consent: Created consent instance
        """
        consent = Consent.objects.create(
            tenant=patient.tenant,
            patient=patient,
            consent_type=consent_type,
            content_text=content_text,
            is_active=True,
            **kwargs
        )

        logger.info(
            "Consent created: %s (%s) for patient %s",
            consent.id,
            consent_type,
            patient.mrn,
        )

        return consent
|
|
|
|
|
|
class ConsentEmailService:
    """Service for email-based consent signing.

    Covers the whole remote-signing round trip: issuing a token and mailing
    the signing link, validating a token, recording the signature, and
    mailing a confirmation afterwards.
    """

    # strftime pattern used for dates in the plain-text fallback emails.
    _TIMESTAMP_FORMAT = '%B %d, %Y at %I:%M %p'

    @staticmethod
    @transaction.atomic
    def send_consent_for_signing(
        consent,
        email: str,
        sent_by,
        expiry_hours: int = 72
    ):
        """
        Send consent form to parent/guardian for signing.

        Creates a ``ConsentToken`` and emails the signing link. Both happen
        inside one transaction, so a failure to send the email propagates and
        the token is not kept.

        Args:
            consent: Consent instance to send
            email: Email address to send to
            sent_by: User sending the consent
            expiry_hours: Hours until token expires (default 72)

        Returns:
            ConsentToken: Created token instance

        Raises:
            ValueError: If ``expiry_hours`` is not positive (the link would
                already be expired when it is sent).
        """
        from datetime import timedelta
        from core.models import ConsentToken

        if expiry_hours <= 0:
            raise ValueError("expiry_hours must be a positive number of hours")

        token = ConsentToken.objects.create(
            consent=consent,
            token=ConsentToken.generate_token(),
            email=email,
            expires_at=timezone.now() + timedelta(hours=expiry_hours),
            sent_by=sent_by
        )

        ConsentEmailService._send_consent_email(token)

        logger.info(
            "Consent signing link sent to %s for consent %s",
            email,
            consent.id,
        )

        return token

    @staticmethod
    def _format_timestamp(value) -> str:
        """Format a datetime for the plain-text fallback emails.

        Aware datetimes are converted to the current time zone first so the
        recipient does not see a raw UTC clock time; naive values are
        formatted unchanged.
        """
        if timezone.is_aware(value):
            value = timezone.localtime(value)
        return value.strftime(ConsentEmailService._TIMESTAMP_FORMAT)

    @staticmethod
    def _render_bodies(template_stem: str, context: dict, build_fallback_text):
        """Render the HTML and text bodies for an email.

        Args:
            template_stem: Template path without extension; ``.html`` and
                ``.txt`` are appended.
            context: Template context.
            build_fallback_text: Zero-argument callable producing the
                plain-text body used when rendering fails. It is only called
                on failure.

        Returns:
            tuple: ``(text_message, html_message)``; ``html_message`` is None
            when the fallback text is used.
        """
        from django.template.loader import render_to_string

        try:
            html_message = render_to_string(template_stem + '.html', context)
            text_message = render_to_string(template_stem + '.txt', context)
        except Exception:
            # Fallback to simple text if the templates don't exist or cannot
            # be rendered. Logged so that a broken template is noticed
            # instead of being silently masked by the fallback.
            logger.warning(
                "Could not render email templates %s.*; using plain-text fallback",
                template_stem,
                exc_info=True,
            )
            return build_fallback_text(), None
        return text_message, html_message

    @staticmethod
    def _deliver(email_config, subject: str, text_message: str, html_message, recipient: str):
        """Send one email to ``recipient`` over the tenant's connection.

        Delivery errors are raised according to the connection returned by
        ``_get_email_connection``; callers decide whether to handle them.
        """
        from django.conf import settings
        from django.core.mail import send_mail

        send_mail(
            subject=subject,
            message=text_message,
            from_email=email_config.get('from_email', settings.DEFAULT_FROM_EMAIL),
            recipient_list=[recipient],
            html_message=html_message,
            connection=ConsentEmailService._get_email_connection(email_config),
        )

    @staticmethod
    def _send_consent_email(token):
        """Send email with consent signing link using tenant-specific settings.

        Args:
            token: ConsentToken whose ``consent``, ``token``, ``email`` and
                ``expires_at`` are used to build and address the message.

        Raises:
            Exception: Delivery errors from the mail backend are propagated.
        """
        from django.conf import settings
        from django.urls import reverse
        from core.settings_service import get_tenant_settings_service

        consent = token.consent

        # Get tenant-specific email settings
        settings_service = get_tenant_settings_service(consent.tenant)
        email_config = settings_service.get_email_configuration()

        # Build signing URL. reverse() returns a path with a leading slash,
        # so drop any trailing slash on SITE_URL to avoid "//" in the link.
        signing_url = settings.SITE_URL.rstrip('/') + reverse(
            'core:consent_sign_public',
            kwargs={'token': token.token}
        )

        context = {
            'consent': consent,
            'patient': consent.patient,
            'signing_url': signing_url,
            'expires_at': token.expires_at,
            'clinic_name': consent.tenant.name,
        }

        subject = f"Consent Form for {consent.patient.full_name_en}"

        def fallback_text() -> str:
            return "\n".join([
                "",
                f"Consent Form for {consent.patient.full_name_en}",
                "",
                "Dear Parent/Guardian,",
                "",
                f"{consent.tenant.name} requires your consent for treatment services.",
                "",
                "Please review and sign the consent form by visiting this link:",
                f"{signing_url}",
                "",
                "This link will expire on "
                f"{ConsentEmailService._format_timestamp(token.expires_at)}",
                "",
                "You do not need to create an account or log in.",
                "",
                "If you have any questions, please contact us.",
                "",
            ])

        text_message, html_message = ConsentEmailService._render_bodies(
            'emails/consent_signing_request',
            context,
            fallback_text,
        )

        ConsentEmailService._deliver(
            email_config, subject, text_message, html_message, token.email
        )

    @staticmethod
    def verify_token(token_string: str) -> Tuple[bool, str, Optional['ConsentToken']]:
        """
        Verify if token is valid.

        Args:
            token_string: Token string to verify

        Returns:
            Tuple[bool, str, Optional[ConsentToken]]: (is_valid, message, token).
            The token is only returned when it is valid; otherwise the
            message says whether it was already used, expired, unknown, or
            invalid for another reason.
        """
        from core.models import ConsentToken

        try:
            token = ConsentToken.objects.select_related(
                'consent', 'consent__patient'
            ).get(token=token_string)
        except ConsentToken.DoesNotExist:
            return False, "Invalid consent link.", None

        if token.is_valid():
            return True, "Token is valid.", token

        # Pick the most specific explanation for the rejection.
        if token.used_at:
            return False, "This consent has already been signed.", None
        if token.expires_at < timezone.now():
            return False, "This link has expired. Please request a new one.", None
        return False, "This link is no longer valid.", None

    @staticmethod
    @transaction.atomic
    def sign_consent_via_token(
        token,
        signed_by_name: str,
        signed_by_relationship: str,
        signature_method: str,
        signature_image=None,
        signed_ip: str = None,
        signed_user_agent: str = None
    ):
        """
        Sign consent using email token.

        Signs the consent, marks the token as used and sends a confirmation
        email. The confirmation is best-effort and cannot undo the signature.

        Args:
            token: ConsentToken instance
            signed_by_name: Name of person signing
            signed_by_relationship: Relationship to patient
            signature_method: Method used for signature
            signature_image: Optional signature image
            signed_ip: IP address of signer
            signed_user_agent: User agent of signer

        Returns:
            Consent: Signed consent instance
        """
        # NOTE(review): the token's validity is not re-checked here; this
        # relies on the caller having used verify_token() first. Confirm
        # whether a used or expired token can reach this point.
        consent = ConsentService.sign_consent(
            consent=token.consent,
            signed_by_name=signed_by_name,
            signed_by_relationship=signed_by_relationship,
            signature_method=signature_method,
            signature_image=signature_image,
            signed_ip=signed_ip,
            signed_user_agent=signed_user_agent
        )

        token.mark_as_used()

        ConsentEmailService._send_confirmation_email(token, consent)

        logger.info(
            "Consent %s signed via email token by %s",
            consent.id,
            signed_by_name,
        )

        return consent

    @staticmethod
    def _send_confirmation_email(token, consent):
        """Send confirmation email after consent is signed using tenant-specific settings.

        Best-effort: any failure is logged and swallowed. This runs inside the
        signing transaction, and the SMTP connection is created with
        ``fail_silently=False``, so an error that escaped from here would roll
        back the signature that was just recorded.

        Args:
            token: ConsentToken; its ``email`` is the recipient.
            consent: The signed Consent.
        """
        from core.settings_service import get_tenant_settings_service

        try:
            # Get tenant-specific email settings
            settings_service = get_tenant_settings_service(consent.tenant)
            email_config = settings_service.get_email_configuration()

            context = {
                'consent': consent,
                'patient': consent.patient,
                'signed_by_name': consent.signed_by_name,
                'signed_at': consent.signed_at,
                'clinic_name': consent.tenant.name,
            }

            subject = f"Consent Form Signed - {consent.patient.full_name_en}"

            def fallback_text() -> str:
                return "\n".join([
                    "",
                    "Consent Form Signed Successfully",
                    "",
                    f"Dear {consent.signed_by_name},",
                    "",
                    "Thank you for signing the consent form for "
                    f"{consent.patient.full_name_en}.",
                    "",
                    "Consent Details:",
                    f"- Patient: {consent.patient.full_name_en}",
                    f"- Signed by: {consent.signed_by_name}",
                    "- Signed on: "
                    f"{ConsentEmailService._format_timestamp(consent.signed_at)}",
                    f"- Consent Type: {consent.get_consent_type_display()}",
                    "",
                    "A copy of this consent has been recorded in our system.",
                    "",
                    f"If you have any questions, please contact {consent.tenant.name}.",
                    "",
                ])

            text_message, html_message = ConsentEmailService._render_bodies(
                'emails/consent_signed_confirmation',
                context,
                fallback_text,
            )

            ConsentEmailService._deliver(
                email_config, subject, text_message, html_message, token.email
            )
        except Exception:
            # Don't fail if confirmation email fails
            logger.exception(
                "Failed to send consent confirmation email for consent %s",
                getattr(consent, 'id', None),
            )

    @staticmethod
    def _get_email_connection(email_config):
        """Create email connection from tenant configuration.

        Args:
            email_config: Mapping of tenant email settings. ``backend`` selects
                the transport; for ``'smtp'`` the keys ``host``, ``port``,
                ``username``, ``password`` and ``use_tls`` are read, each with
                a default.

        Returns:
            An SMTP backend connection when ``backend`` is ``'smtp'``;
            otherwise a console backend connection (also the default when
            ``backend`` is missing).
        """
        from django.core.mail import get_connection

        if email_config.get('backend', 'console') != 'smtp':
            # Use console backend for testing
            return get_connection(
                backend='django.core.mail.backends.console.EmailBackend'
            )

        # Use SMTP with tenant settings
        return get_connection(
            backend='django.core.mail.backends.smtp.EmailBackend',
            host=email_config.get('host', 'localhost'),
            port=email_config.get('port', 587),
            username=email_config.get('username', ''),
            password=email_config.get('password', ''),
            use_tls=email_config.get('use_tls', True),
            fail_silently=False,
        )
|
|
|
|
|
|
class PatientService:
    """Service class for patient-related operations."""

    @staticmethod
    def get_patient_summary(patient: Patient) -> dict:
        """
        Get comprehensive summary of patient information.

        Args:
            patient: Patient instance

        Returns:
            dict: Patient summary information with these keys:
                - 'patient': the patient passed in
                - 'financial': outstanding balance, the number of outstanding
                  invoices and the first three of them
                - 'consents': the active consents and their count
                - 'appointments': up to five upcoming BOOKED/CONFIRMED
                  appointments (earliest first) and their count
        """
        from finance.services import FinancialClearanceService

        # Get financial information
        outstanding_balance = FinancialClearanceService.get_outstanding_balance(patient)
        outstanding_invoices = FinancialClearanceService.get_outstanding_invoices(patient)

        # Get consent information
        active_consents = ConsentService.get_active_consents(patient)

        # Get appointment information
        from appointments.models import Appointment

        # "Today" must be the local calendar date: with time zone support on,
        # timezone.now() is in UTC and its date can be a day off locally.
        now = timezone.now()
        today = timezone.localtime(now).date() if timezone.is_aware(now) else now.date()

        # Evaluate the sliced queryset once; the count and the list then come
        # from the same rows and a single query.
        upcoming_appointments = list(
            Appointment.objects.filter(
                patient=patient,
                scheduled_date__gte=today,
                status__in=['BOOKED', 'CONFIRMED']
            ).order_by('scheduled_date', 'scheduled_time')[:5]
        )

        return {
            'patient': patient,
            'financial': {
                'outstanding_balance': outstanding_balance,
                'outstanding_invoices_count': len(outstanding_invoices),
                'outstanding_invoices': outstanding_invoices[:3],  # Latest 3
            },
            'consents': {
                'active_count': len(active_consents),
                'consents': active_consents,
            },
            'appointments': {
                'upcoming_count': len(upcoming_appointments),
                'upcoming': upcoming_appointments,
            },
        }
|