""" Business logic services for psychology app. """ from django.db.models import Q, Count, Avg, F from django.utils import timezone from datetime import timedelta from typing import List, Dict, Optional from .models import ( PsychologyConsultation, PsychologyAssessment, PsychologySession, PsychologyGoal, PsychologyProgressReport, ) from core.models import Patient, User from notifications.models import Notification class PsychologyRiskAssessmentService: """ Service for managing risk assessments in psychology. Handles risk level calculations, alerts, and safety protocols. """ @staticmethod def assess_risk_level(consultation: PsychologyConsultation) -> Dict[str, any]: """ Assess overall risk level from consultation. Returns: dict: Risk assessment summary with level and recommendations """ risk_factors = [] risk_level = 'NONE' # Check suicide risk if consultation.suicide_risk in ['MODERATE', 'HIGH']: risk_factors.append(f"Suicide Risk: {consultation.get_suicide_risk_display()}") risk_level = consultation.suicide_risk # Check homicide risk if consultation.homicide_risk in ['MODERATE', 'HIGH']: risk_factors.append(f"Homicide Risk: {consultation.get_homicide_risk_display()}") if consultation.homicide_risk == 'HIGH': risk_level = 'HIGH' elif risk_level != 'HIGH': risk_level = consultation.homicide_risk return { 'risk_level': risk_level, 'risk_factors': risk_factors, 'requires_immediate_attention': risk_level == 'HIGH', 'requires_safety_plan': risk_level in ['MODERATE', 'HIGH'], 'assessment_notes': consultation.risk_assessment_notes, } @staticmethod def create_safety_alert(consultation: PsychologyConsultation) -> bool: """ Create safety alert if high risk detected. Returns: bool: True if alert was created """ risk_assessment = PsychologyRiskAssessmentService.assess_risk_level(consultation) if risk_assessment['requires_immediate_attention']: # Create notification for clinical coordinator and senior psychologists Notification.objects.create( user=consultation.provider, notification_type='SAFETY_ALERT', title=f"High Risk Patient: {consultation.patient.get_full_name()}", message=f"Patient has {', '.join(risk_assessment['risk_factors'])}. Immediate attention required.", ) return True return False @staticmethod def get_high_risk_patients(tenant_id: str) -> List[Patient]: """ Get list of patients with high risk assessments. Args: tenant_id: Tenant ID to filter by Returns: List of Patient objects with high risk """ high_risk_consultations = PsychologyConsultation.objects.filter( Q(suicide_risk='HIGH') | Q(homicide_risk='HIGH'), tenant_id=tenant_id ).values_list('patient_id', flat=True).distinct() return Patient.objects.filter(id__in=high_risk_consultations) class PsychologyGoalTrackingService: """ Service for tracking and managing psychology treatment goals. """ @staticmethod def calculate_goal_progress(patient: Patient) -> Dict[str, any]: """ Calculate overall goal progress for a patient. Returns: dict: Goal progress summary """ goals = PsychologyGoal.objects.filter(patient=patient) total_goals = goals.count() if total_goals == 0: return { 'total_goals': 0, 'average_progress': 0, 'achieved_count': 0, 'in_progress_count': 0, 'not_started_count': 0, 'achievement_rate': 0, } achieved = goals.filter(status='ACHIEVED').count() in_progress = goals.filter(status='IN_PROGRESS').count() not_started = goals.filter(status='NOT_STARTED').count() avg_progress = goals.aggregate(avg=Avg('progress_percentage'))['avg'] or 0 return { 'total_goals': total_goals, 'average_progress': round(avg_progress, 2), 'achieved_count': achieved, 'in_progress_count': in_progress, 'not_started_count': not_started, 'achievement_rate': round((achieved / total_goals) * 100, 2), } @staticmethod def get_overdue_goals(provider: Optional[User] = None) -> List[PsychologyGoal]: """ Get goals that are past their target date. Args: provider: Optional provider to filter by Returns: List of overdue goals """ today = timezone.now().date() queryset = PsychologyGoal.objects.filter( target_date__lt=today, status__in=['NOT_STARTED', 'IN_PROGRESS'] ) if provider: queryset = queryset.filter(consultation__provider=provider) return list(queryset.select_related('patient', 'consultation')) @staticmethod def update_goal_from_session(session: PsychologySession) -> None: """ Update goal progress based on session notes. This can be called after a session to automatically update related goals. """ # Get active goals for the patient active_goals = PsychologyGoal.objects.filter( patient=session.patient, status='IN_PROGRESS' ) # This is a placeholder - actual implementation would parse # session.progress_toward_goals and update specific goals # For now, we just ensure goals exist if active_goals.exists(): # Logic to parse progress notes and update goals pass class PsychologyReportGenerationService: """ Service for generating psychology progress reports. """ @staticmethod def generate_progress_report( patient: Patient, provider: User, period_start: timezone.datetime.date, period_end: timezone.datetime.date ) -> PsychologyProgressReport: """ Generate a progress report for a patient. Args: patient: Patient object provider: Provider (psychologist) period_start: Start date of reporting period period_end: End date of reporting period Returns: PsychologyProgressReport object """ # Get sessions in the period sessions = PsychologySession.objects.filter( patient=patient, session_date__gte=period_start, session_date__lte=period_end ).order_by('session_date') # Get goals goals = PsychologyGoal.objects.filter(patient=patient) # Calculate statistics sessions_count = sessions.count() # Get initial consultation initial_consultation = PsychologyConsultation.objects.filter( patient=patient ).order_by('consultation_date').first() # Build goals progress summary goals_progress_text = "" for goal in goals: goals_progress_text += f"- {goal.goal_description}: {goal.progress_percentage}% ({goal.get_status_display()})\n" # Build treatment summary modalities_used = sessions.values_list('therapy_modality', flat=True).distinct() treatment_provided = f"Therapy modalities used: {', '.join([m for m in modalities_used if m])}\n" treatment_provided += f"Total sessions: {sessions_count}\n" # Create report report = PsychologyProgressReport.objects.create( patient=patient, tenant=patient.tenant, report_date=timezone.now().date(), provider=provider, treatment_start_date=initial_consultation.consultation_date if initial_consultation else period_start, sessions_scheduled=sessions_count, # This should come from appointments sessions_attended=sessions_count, presenting_problems_summary=initial_consultation.presenting_problem if initial_consultation else "", treatment_provided=treatment_provided, goals_progress=goals_progress_text, overall_progress="Progress summary to be completed by clinician.", current_functioning="Current functioning to be assessed by clinician.", current_symptoms="Current symptoms to be documented by clinician.", recommendations="Recommendations to be provided by clinician.", continue_treatment=True, prognosis="Prognosis to be determined by clinician.", ) return report @staticmethod def get_session_summary(patient: Patient, period_start: timezone.datetime.date, period_end: timezone.datetime.date) -> Dict[str, any]: """ Get summary of sessions for a patient in a period. Returns: dict: Session summary statistics """ sessions = PsychologySession.objects.filter( patient=patient, session_date__gte=period_start, session_date__lte=period_end ) return { 'total_sessions': sessions.count(), 'by_type': dict( sessions.values('session_type').annotate(count=Count('id')).values_list('session_type', 'count') ), 'by_modality': dict( sessions.values('therapy_modality').annotate(count=Count('id')).values_list('therapy_modality', 'count') ), 'average_duration': sessions.aggregate(avg=Avg('duration_minutes'))['avg'] or 0, 'risk_sessions': sessions.filter(current_risk_level__in=['MODERATE', 'HIGH']).count(), } class PsychologyStatisticsService: """ Service for generating psychology statistics and analytics. """ @staticmethod def get_provider_statistics(provider: User, start_date: Optional[timezone.datetime.date] = None) -> Dict[str, any]: """ Get statistics for a specific provider. Args: provider: Provider (psychologist) start_date: Optional start date for filtering Returns: dict: Provider statistics """ if start_date is None: start_date = timezone.now().date() - timedelta(days=30) consultations = PsychologyConsultation.objects.filter( provider=provider, consultation_date__gte=start_date ) sessions = PsychologySession.objects.filter( provider=provider, session_date__gte=start_date ) assessments = PsychologyAssessment.objects.filter( provider=provider, assessment_date__gte=start_date ) return { 'period_start': start_date, 'period_end': timezone.now().date(), 'consultations_count': consultations.count(), 'sessions_count': sessions.count(), 'assessments_count': assessments.count(), 'unique_patients': consultations.values('patient').distinct().count(), 'high_risk_cases': consultations.filter( Q(suicide_risk__in=['MODERATE', 'HIGH']) | Q(homicide_risk__in=['MODERATE', 'HIGH']) ).count(), 'average_session_duration': sessions.aggregate(avg=Avg('duration_minutes'))['avg'] or 0, 'most_common_referral_reason': consultations.values('referral_reason').annotate( count=Count('id') ).order_by('-count').first(), } @staticmethod def get_tenant_statistics(tenant_id: str, start_date: Optional[timezone.datetime.date] = None) -> Dict[str, any]: """ Get psychology statistics for a tenant. Args: tenant_id: Tenant ID start_date: Optional start date for filtering Returns: dict: Tenant-wide statistics """ if start_date is None: start_date = timezone.now().date() - timedelta(days=90) consultations = PsychologyConsultation.objects.filter( tenant_id=tenant_id, consultation_date__gte=start_date ) sessions = PsychologySession.objects.filter( tenant_id=tenant_id, session_date__gte=start_date ) goals = PsychologyGoal.objects.filter( patient__tenant_id=tenant_id ) return { 'period_start': start_date, 'period_end': timezone.now().date(), 'total_consultations': consultations.count(), 'total_sessions': sessions.count(), 'total_active_patients': consultations.values('patient').distinct().count(), 'total_goals': goals.count(), 'goals_achieved': goals.filter(status='ACHIEVED').count(), 'average_goal_progress': goals.aggregate(avg=Avg('progress_percentage'))['avg'] or 0, 'high_risk_patients': consultations.filter( Q(suicide_risk__in=['MODERATE', 'HIGH']) | Q(homicide_risk__in=['MODERATE', 'HIGH']) ).values('patient').distinct().count(), 'referral_reasons_breakdown': dict( consultations.values('referral_reason').annotate( count=Count('id') ).values_list('referral_reason', 'count') ), 'therapy_modalities_breakdown': dict( sessions.values('therapy_modality').annotate( count=Count('id') ).values_list('therapy_modality', 'count') ), } class PsychologySessionManagementService: """ Service for managing psychology sessions. """ @staticmethod def get_next_session_number(patient: Patient) -> int: """ Get the next session number for a patient. Args: patient: Patient object Returns: int: Next session number """ last_session = PsychologySession.objects.filter( patient=patient ).order_by('-session_number').first() if last_session: return last_session.session_number + 1 return 1 @staticmethod def get_patient_session_history(patient: Patient, limit: int = 10) -> List[PsychologySession]: """ Get recent session history for a patient. Args: patient: Patient object limit: Number of sessions to return Returns: List of PsychologySession objects """ return list( PsychologySession.objects.filter(patient=patient) .select_related('provider', 'appointment') .order_by('-session_date', '-session_number')[:limit] ) @staticmethod def check_unsigned_sessions(provider: User, days: int = 7) -> List[PsychologySession]: """ Get unsigned sessions for a provider. Args: provider: Provider (psychologist) days: Number of days to look back Returns: List of unsigned sessions """ cutoff_date = timezone.now().date() - timedelta(days=days) return list( PsychologySession.objects.filter( provider=provider, session_date__gte=cutoff_date, signed_by__isnull=True ).select_related('patient').order_by('session_date') ) class PsychologyTreatmentPlanService: """ Service for managing treatment plans and recommendations. """ @staticmethod def create_treatment_plan_from_consultation(consultation: PsychologyConsultation) -> List[PsychologyGoal]: """ Create treatment goals from consultation recommendations. Args: consultation: PsychologyConsultation object Returns: List of created PsychologyGoal objects """ goals = [] # Parse treatment_goals text and create goal objects # This is a simplified version - actual implementation would parse structured data if consultation.treatment_goals: goal_lines = consultation.treatment_goals.split('\n') for i, goal_text in enumerate(goal_lines): if goal_text.strip(): goal = PsychologyGoal.objects.create( patient=consultation.patient, consultation=consultation, goal_description=goal_text.strip(), target_date=timezone.now().date() + timedelta(days=90), # Default 90 days status='NOT_STARTED', progress_percentage=0, ) goals.append(goal) return goals @staticmethod def get_treatment_summary(patient: Patient) -> Dict[str, any]: """ Get comprehensive treatment summary for a patient. Returns: dict: Treatment summary """ consultations = PsychologyConsultation.objects.filter(patient=patient) sessions = PsychologySession.objects.filter(patient=patient) goals = PsychologyGoal.objects.filter(patient=patient) reports = PsychologyProgressReport.objects.filter(patient=patient) initial_consultation = consultations.order_by('consultation_date').first() latest_session = sessions.order_by('-session_date').first() return { 'patient_id': str(patient.id), 'patient_name': patient.get_full_name(), 'treatment_start_date': initial_consultation.consultation_date if initial_consultation else None, 'total_consultations': consultations.count(), 'total_sessions': sessions.count(), 'total_assessments': PsychologyAssessment.objects.filter(patient=patient).count(), 'total_goals': goals.count(), 'goals_achieved': goals.filter(status='ACHIEVED').count(), 'average_goal_progress': goals.aggregate(avg=Avg('progress_percentage'))['avg'] or 0, 'latest_session_date': latest_session.session_date if latest_session else None, 'latest_risk_level': latest_session.current_risk_level if latest_session else None, 'total_reports': reports.count(), 'latest_report_date': reports.order_by('-report_date').first().report_date if reports.exists() else None, 'current_diagnosis': initial_consultation.provisional_diagnosis if initial_consultation else None, } class PsychologyNotificationService: """ Service for managing psychology-specific notifications. """ @staticmethod def notify_high_risk_consultation(consultation: PsychologyConsultation) -> None: """ Send notifications for high-risk consultations. """ risk_assessment = PsychologyRiskAssessmentService.assess_risk_level(consultation) if risk_assessment['requires_immediate_attention']: # Notify clinical coordinator Notification.objects.create( user=consultation.provider, notification_type='URGENT', title=f"High Risk Patient Alert", message=f"{consultation.patient.get_full_name()} - {', '.join(risk_assessment['risk_factors'])}", ) @staticmethod def notify_unsigned_sessions(provider: User) -> int: """ Send notification about unsigned sessions. Returns: int: Number of unsigned sessions """ unsigned_sessions = PsychologySessionManagementService.check_unsigned_sessions(provider, days=7) if unsigned_sessions: Notification.objects.create( user=provider, notification_type='REMINDER', title=f"Unsigned Psychology Sessions", message=f"You have {len(unsigned_sessions)} unsigned psychology sessions from the past week.", ) return len(unsigned_sessions) @staticmethod def notify_overdue_goals(provider: User) -> int: """ Send notification about overdue goals. Returns: int: Number of overdue goals """ overdue_goals = PsychologyGoalTrackingService.get_overdue_goals(provider) if overdue_goals: Notification.objects.create( user=provider, notification_type='REMINDER', title=f"Overdue Treatment Goals", message=f"You have {len(overdue_goals)} overdue treatment goals that need review.", ) return len(overdue_goals)