agdar/psychology/services.py
Marwan Alwali 2f1681b18c update
2025-11-11 13:44:48 +03:00

587 lines
21 KiB
Python

"""
Business logic services for psychology app.
"""
from datetime import date, timedelta
from typing import Any, Dict, List, Optional

from django.db.models import Avg, Count, F, Max, Q
from django.utils import timezone

from core.models import Patient, User
from notifications.models import Notification

from .models import (
    PsychologyAssessment,
    PsychologyConsultation,
    PsychologyGoal,
    PsychologyProgressReport,
    PsychologySession,
)
class PsychologyRiskAssessmentService:
    """
    Service for managing risk assessments in psychology.

    Derives an overall risk level from a consultation's suicide and homicide
    risk ratings, creates safety notifications, and looks up high-risk patients.
    """

    @staticmethod
    def assess_risk_level(consultation: PsychologyConsultation) -> Dict[str, Any]:
        """
        Assess overall risk level from consultation.

        Only 'MODERATE' and 'HIGH' ratings count as risk factors. The overall
        level is the more severe of the two ratings, and stays 'NONE' when
        neither rating is 'MODERATE' or 'HIGH'.

        Args:
            consultation: Consultation whose ``suicide_risk``,
                ``homicide_risk`` and ``risk_assessment_notes`` are read.

        Returns:
            dict: Risk assessment summary with keys ``risk_level``,
            ``risk_factors``, ``requires_immediate_attention``,
            ``requires_safety_plan`` and ``assessment_notes``.
        """
        # NOTE(review): any rating outside this tuple (e.g. a 'LOW' value, if
        # the model defines one) is reported as 'NONE' - confirm this is intended.
        elevated = ('MODERATE', 'HIGH')
        suicide_risk = consultation.suicide_risk
        homicide_risk = consultation.homicide_risk

        risk_factors = []
        if suicide_risk in elevated:
            risk_factors.append(f"Suicide Risk: {consultation.get_suicide_risk_display()}")
        if homicide_risk in elevated:
            risk_factors.append(f"Homicide Risk: {consultation.get_homicide_risk_display()}")

        # The most severe of the two ratings wins.
        ratings = (suicide_risk, homicide_risk)
        if 'HIGH' in ratings:
            risk_level = 'HIGH'
        elif 'MODERATE' in ratings:
            risk_level = 'MODERATE'
        else:
            risk_level = 'NONE'

        return {
            'risk_level': risk_level,
            'risk_factors': risk_factors,
            'requires_immediate_attention': risk_level == 'HIGH',
            'requires_safety_plan': risk_level in elevated,
            'assessment_notes': consultation.risk_assessment_notes,
        }

    @staticmethod
    def create_safety_alert(consultation: PsychologyConsultation) -> bool:
        """
        Create safety alert if high risk detected.

        Args:
            consultation: Consultation to evaluate.

        Returns:
            bool: True if alert was created
        """
        risk_assessment = PsychologyRiskAssessmentService.assess_risk_level(consultation)
        if not risk_assessment['requires_immediate_attention']:
            return False

        # Only the consultation's own provider is notified here.
        # NOTE(review): the original comment promised alerts to the clinical
        # coordinator and senior psychologists - that fan-out is not implemented.
        Notification.objects.create(
            user=consultation.provider,
            notification_type='SAFETY_ALERT',
            title=f"High Risk Patient: {consultation.patient.get_full_name()}",
            message=f"Patient has {', '.join(risk_assessment['risk_factors'])}. Immediate attention required.",
        )
        return True

    @staticmethod
    def get_high_risk_patients(tenant_id: str) -> List[Patient]:
        """
        Get patients with at least one high risk consultation.

        A patient qualifies when any of their consultations in the tenant has
        ``suicide_risk`` or ``homicide_risk`` equal to 'HIGH'.

        Args:
            tenant_id: Tenant ID to filter consultations by

        Returns:
            The result of ``Patient.objects.filter(...)`` for the matching
            patient ids (with a standard Django manager this is a lazy
            queryset rather than a materialised list).
        """
        high_risk_patient_ids = PsychologyConsultation.objects.filter(
            Q(suicide_risk='HIGH') | Q(homicide_risk='HIGH'),
            tenant_id=tenant_id,
        ).values_list('patient_id', flat=True).distinct()
        return Patient.objects.filter(id__in=high_risk_patient_ids)
class PsychologyGoalTrackingService:
    """
    Service for tracking and managing psychology treatment goals.
    """

    @staticmethod
    def calculate_goal_progress(patient: Patient) -> Dict[str, Any]:
        """
        Calculate overall goal progress for a patient.

        All figures come from a single aggregate query over the patient's
        goals, using conditional counts per status.

        Args:
            patient: Patient whose goals are summarised.

        Returns:
            dict: Goal progress summary with keys ``total_goals``,
            ``average_progress``, ``achieved_count``, ``in_progress_count``,
            ``not_started_count`` and ``achievement_rate`` (percentage of
            goals with status 'ACHIEVED'). All values are 0 when the patient
            has no goals.
        """
        totals = PsychologyGoal.objects.filter(patient=patient).aggregate(
            total=Count('id'),
            achieved=Count('id', filter=Q(status='ACHIEVED')),
            in_progress=Count('id', filter=Q(status='IN_PROGRESS')),
            not_started=Count('id', filter=Q(status='NOT_STARTED')),
            avg=Avg('progress_percentage'),
        )
        total_goals = totals['total']

        if total_goals == 0:
            return {
                'total_goals': 0,
                'average_progress': 0,
                'achieved_count': 0,
                'in_progress_count': 0,
                'not_started_count': 0,
                'achievement_rate': 0,
            }

        achieved = totals['achieved']
        # Avg yields None when every progress_percentage is NULL.
        avg_progress = totals['avg'] or 0
        return {
            'total_goals': total_goals,
            'average_progress': round(avg_progress, 2),
            'achieved_count': achieved,
            'in_progress_count': totals['in_progress'],
            'not_started_count': totals['not_started'],
            'achievement_rate': round((achieved / total_goals) * 100, 2),
        }

    @staticmethod
    def get_overdue_goals(provider: Optional[User] = None) -> List[PsychologyGoal]:
        """
        Get goals that are past their target date.

        A goal is overdue when its ``target_date`` is before today and its
        status is still 'NOT_STARTED' or 'IN_PROGRESS'.

        Args:
            provider: Optional provider to filter by (matched against the
                provider of the goal's consultation)

        Returns:
            List of overdue goals
        """
        # NOTE(review): timezone.now().date() is the date of the current
        # instant as returned by now(); if USE_TZ is enabled this may differ
        # from the local calendar date around midnight - confirm.
        today = timezone.now().date()
        queryset = PsychologyGoal.objects.filter(
            target_date__lt=today,
            status__in=['NOT_STARTED', 'IN_PROGRESS'],
        )
        if provider is not None:
            queryset = queryset.filter(consultation__provider=provider)
        return list(queryset.select_related('patient', 'consultation'))

    @staticmethod
    def update_goal_from_session(session: PsychologySession) -> None:
        """
        Update goal progress based on session notes.

        Placeholder: currently this only checks whether the session's patient
        has goals with status 'IN_PROGRESS' and changes nothing.

        Args:
            session: Session whose patient's active goals are looked up.
        """
        # Get active goals for the patient
        active_goals = PsychologyGoal.objects.filter(
            patient=session.patient,
            status='IN_PROGRESS',
        )
        # TODO: parse session.progress_toward_goals and update the matching
        # goals. For now, we just check that goals exist.
        if active_goals.exists():
            pass
class PsychologyReportGenerationService:
    """
    Service for generating psychology progress reports.
    """

    @staticmethod
    def generate_progress_report(
        patient: Patient,
        provider: User,
        period_start: date,
        period_end: date,
    ) -> PsychologyProgressReport:
        """
        Generate a progress report for a patient.

        The report is pre-filled from the patient's sessions in the period,
        all of the patient's goals, and the earliest consultation on record.
        Narrative fields are created with placeholder text for the clinician
        to complete.

        Args:
            patient: Patient object
            provider: Provider (psychologist)
            period_start: Start date of reporting period (inclusive)
            period_end: End date of reporting period (inclusive)

        Returns:
            The newly created PsychologyProgressReport object
        """
        # Get sessions in the period
        sessions = PsychologySession.objects.filter(
            patient=patient,
            session_date__gte=period_start,
            session_date__lte=period_end,
        ).order_by('session_date')
        sessions_count = sessions.count()

        # Get initial consultation (earliest by consultation_date)
        initial_consultation = PsychologyConsultation.objects.filter(
            patient=patient
        ).order_by('consultation_date').first()

        # Build goals progress summary, one line per goal
        goals = PsychologyGoal.objects.filter(patient=patient)
        goals_progress_text = ''.join(
            f"- {goal.goal_description}: {goal.progress_percentage}% ({goal.get_status_display()})\n"
            for goal in goals
        )

        # Build treatment summary.
        # Re-order by the selected column before distinct(): the base queryset
        # is ordered by session_date, and Django adds order_by() columns to
        # SELECT DISTINCT, so the same modality would otherwise be listed once
        # per distinct session date.
        modalities_used = (
            sessions.order_by('therapy_modality')
            .values_list('therapy_modality', flat=True)
            .distinct()
        )
        modalities_text = ', '.join(m for m in modalities_used if m)
        treatment_provided = (
            f"Therapy modalities used: {modalities_text}\n"
            f"Total sessions: {sessions_count}\n"
        )

        if initial_consultation:
            treatment_start_date = initial_consultation.consultation_date
            presenting_problems = initial_consultation.presenting_problem
        else:
            # No consultation on record: fall back to the reporting period.
            treatment_start_date = period_start
            presenting_problems = ""

        # Create report
        return PsychologyProgressReport.objects.create(
            patient=patient,
            tenant=patient.tenant,
            report_date=timezone.now().date(),
            provider=provider,
            treatment_start_date=treatment_start_date,
            sessions_scheduled=sessions_count,  # This should come from appointments
            sessions_attended=sessions_count,
            presenting_problems_summary=presenting_problems,
            treatment_provided=treatment_provided,
            goals_progress=goals_progress_text,
            overall_progress="Progress summary to be completed by clinician.",
            current_functioning="Current functioning to be assessed by clinician.",
            current_symptoms="Current symptoms to be documented by clinician.",
            recommendations="Recommendations to be provided by clinician.",
            continue_treatment=True,
            prognosis="Prognosis to be determined by clinician.",
        )

    @staticmethod
    def get_session_summary(patient: Patient, period_start: date, period_end: date) -> Dict[str, Any]:
        """
        Get summary of sessions for a patient in a period.

        Args:
            patient: Patient object
            period_start: Start date of the period (inclusive)
            period_end: End date of the period (inclusive)

        Returns:
            dict: Session summary statistics with keys ``total_sessions``,
            ``by_type`` and ``by_modality`` (value -> session count),
            ``average_duration`` (0 when there is nothing to average) and
            ``risk_sessions`` (sessions whose ``current_risk_level`` is
            'MODERATE' or 'HIGH').
        """
        sessions = PsychologySession.objects.filter(
            patient=patient,
            session_date__gte=period_start,
            session_date__lte=period_end,
        )
        return {
            'total_sessions': sessions.count(),
            'by_type': dict(
                sessions.values('session_type').annotate(count=Count('id')).values_list('session_type', 'count')
            ),
            'by_modality': dict(
                sessions.values('therapy_modality').annotate(count=Count('id')).values_list('therapy_modality', 'count')
            ),
            'average_duration': sessions.aggregate(avg=Avg('duration_minutes'))['avg'] or 0,
            'risk_sessions': sessions.filter(current_risk_level__in=['MODERATE', 'HIGH']).count(),
        }
class PsychologyStatisticsService:
    """
    Service for generating psychology statistics and analytics.
    """

    @staticmethod
    def get_provider_statistics(provider: User, start_date: Optional[date] = None) -> Dict[str, Any]:
        """
        Get statistics for a specific provider.

        Args:
            provider: Provider (psychologist)
            start_date: Optional start date for filtering; defaults to
                30 days before today

        Returns:
            dict: Provider statistics. ``high_risk_cases`` counts
            consultations rated 'MODERATE' or 'HIGH' for suicide or homicide
            risk. ``most_common_referral_reason`` is a dict with
            ``referral_reason`` and ``count`` keys, or None when there are no
            consultations in the period.
        """
        # Evaluate "today" once so the default start and period_end agree.
        today = timezone.now().date()
        if start_date is None:
            start_date = today - timedelta(days=30)

        consultations = PsychologyConsultation.objects.filter(
            provider=provider,
            consultation_date__gte=start_date,
        )
        sessions = PsychologySession.objects.filter(
            provider=provider,
            session_date__gte=start_date,
        )
        assessments = PsychologyAssessment.objects.filter(
            provider=provider,
            assessment_date__gte=start_date,
        )
        elevated_risk = (
            Q(suicide_risk__in=['MODERATE', 'HIGH']) |
            Q(homicide_risk__in=['MODERATE', 'HIGH'])
        )

        return {
            'period_start': start_date,
            'period_end': today,
            'consultations_count': consultations.count(),
            'sessions_count': sessions.count(),
            'assessments_count': assessments.count(),
            'unique_patients': consultations.values('patient').distinct().count(),
            'high_risk_cases': consultations.filter(elevated_risk).count(),
            'average_session_duration': sessions.aggregate(avg=Avg('duration_minutes'))['avg'] or 0,
            # Secondary sort key makes the pick deterministic when counts tie.
            'most_common_referral_reason': consultations.values('referral_reason').annotate(
                count=Count('id')
            ).order_by('-count', 'referral_reason').first(),
        }

    @staticmethod
    def get_tenant_statistics(tenant_id: str, start_date: Optional[date] = None) -> Dict[str, Any]:
        """
        Get psychology statistics for a tenant.

        Consultation and session figures are limited to ``start_date``
        onwards; goal figures cover all goals of the tenant's patients
        regardless of date.

        Args:
            tenant_id: Tenant ID
            start_date: Optional start date for filtering; defaults to
                90 days before today

        Returns:
            dict: Tenant-wide statistics. ``high_risk_patients`` counts
            distinct patients with a consultation rated 'MODERATE' or 'HIGH'
            for suicide or homicide risk.
        """
        # Evaluate "today" once so the default start and period_end agree.
        today = timezone.now().date()
        if start_date is None:
            start_date = today - timedelta(days=90)

        consultations = PsychologyConsultation.objects.filter(
            tenant_id=tenant_id,
            consultation_date__gte=start_date,
        )
        sessions = PsychologySession.objects.filter(
            tenant_id=tenant_id,
            session_date__gte=start_date,
        )
        goals = PsychologyGoal.objects.filter(
            patient__tenant_id=tenant_id
        )
        elevated_risk = (
            Q(suicide_risk__in=['MODERATE', 'HIGH']) |
            Q(homicide_risk__in=['MODERATE', 'HIGH'])
        )

        return {
            'period_start': start_date,
            'period_end': today,
            'total_consultations': consultations.count(),
            'total_sessions': sessions.count(),
            'total_active_patients': consultations.values('patient').distinct().count(),
            'total_goals': goals.count(),
            'goals_achieved': goals.filter(status='ACHIEVED').count(),
            'average_goal_progress': goals.aggregate(avg=Avg('progress_percentage'))['avg'] or 0,
            'high_risk_patients': consultations.filter(elevated_risk).values('patient').distinct().count(),
            'referral_reasons_breakdown': dict(
                consultations.values('referral_reason').annotate(
                    count=Count('id')
                ).values_list('referral_reason', 'count')
            ),
            'therapy_modalities_breakdown': dict(
                sessions.values('therapy_modality').annotate(
                    count=Count('id')
                ).values_list('therapy_modality', 'count')
            ),
        }
class PsychologySessionManagementService:
    """
    Service for managing psychology sessions.
    """

    @staticmethod
    def get_next_session_number(patient: Patient) -> int:
        """
        Get the next session number for a patient.

        Uses a MAX aggregate rather than loading the top-sorted row: the
        aggregate ignores NULL session numbers, so a session without a number
        can neither be picked as "last" nor cause ``None + 1``.

        Args:
            patient: Patient object

        Returns:
            int: Highest existing session number plus one, or 1 when the
            patient has no numbered sessions
        """
        highest = PsychologySession.objects.filter(
            patient=patient
        ).aggregate(highest=Max('session_number'))['highest']
        return (highest or 0) + 1

    @staticmethod
    def get_patient_session_history(patient: Patient, limit: int = 10) -> List[PsychologySession]:
        """
        Get recent session history for a patient.

        Args:
            patient: Patient object
            limit: Number of sessions to return

        Returns:
            List of PsychologySession objects, newest first (by session date,
            then session number)
        """
        return list(
            PsychologySession.objects.filter(patient=patient)
            .select_related('provider', 'appointment')
            .order_by('-session_date', '-session_number')[:limit]
        )

    @staticmethod
    def check_unsigned_sessions(provider: User, days: int = 7) -> List[PsychologySession]:
        """
        Get unsigned sessions for a provider.

        A session is unsigned when its ``signed_by`` is NULL.

        Args:
            provider: Provider (psychologist)
            days: Number of days to look back

        Returns:
            List of unsigned sessions, oldest first
        """
        cutoff_date = timezone.now().date() - timedelta(days=days)
        return list(
            PsychologySession.objects.filter(
                provider=provider,
                session_date__gte=cutoff_date,
                signed_by__isnull=True,
            ).select_related('patient').order_by('session_date')
        )
class PsychologyTreatmentPlanService:
    """
    Service for managing treatment plans and recommendations.
    """

    @staticmethod
    def create_treatment_plan_from_consultation(
        consultation: PsychologyConsultation,
        target_days: int = 90,
    ) -> List[PsychologyGoal]:
        """
        Create treatment goals from consultation recommendations.

        Each non-blank line of ``consultation.treatment_goals`` becomes one
        goal with status 'NOT_STARTED' and 0% progress. This is a simplified
        version - structured goal data is not parsed.

        Args:
            consultation: PsychologyConsultation object
            target_days: Days from today until each goal's target date
                (default 90)

        Returns:
            List of created PsychologyGoal objects (empty when the
            consultation has no treatment goals text)
        """
        if not consultation.treatment_goals:
            return []

        # Same target date for every goal; computed once, outside the loop.
        target_date = timezone.now().date() + timedelta(days=target_days)

        goals = []
        for goal_line in consultation.treatment_goals.split('\n'):
            description = goal_line.strip()
            if not description:
                continue
            goals.append(
                PsychologyGoal.objects.create(
                    patient=consultation.patient,
                    consultation=consultation,
                    goal_description=description,
                    target_date=target_date,
                    status='NOT_STARTED',
                    progress_percentage=0,
                )
            )
        return goals

    @staticmethod
    def get_treatment_summary(patient: Patient) -> Dict[str, Any]:
        """
        Get comprehensive treatment summary for a patient.

        Args:
            patient: Patient object

        Returns:
            dict: Treatment summary. Date and risk fields are None when the
            patient has no matching consultation, session or report.
        """
        consultations = PsychologyConsultation.objects.filter(patient=patient)
        sessions = PsychologySession.objects.filter(patient=patient)
        goals = PsychologyGoal.objects.filter(patient=patient)
        reports = PsychologyProgressReport.objects.filter(patient=patient)

        initial_consultation = consultations.order_by('consultation_date').first()
        # Break same-day ties by session number, matching the ordering used
        # for session history, so the reported risk level is deterministic.
        latest_session = sessions.order_by('-session_date', '-session_number').first()
        # Single fetch instead of exists() followed by first().
        latest_report = reports.order_by('-report_date').first()

        return {
            'patient_id': str(patient.id),
            'patient_name': patient.get_full_name(),
            'treatment_start_date': initial_consultation.consultation_date if initial_consultation else None,
            'total_consultations': consultations.count(),
            'total_sessions': sessions.count(),
            'total_assessments': PsychologyAssessment.objects.filter(patient=patient).count(),
            'total_goals': goals.count(),
            'goals_achieved': goals.filter(status='ACHIEVED').count(),
            'average_goal_progress': goals.aggregate(avg=Avg('progress_percentage'))['avg'] or 0,
            'latest_session_date': latest_session.session_date if latest_session else None,
            'latest_risk_level': latest_session.current_risk_level if latest_session else None,
            'total_reports': reports.count(),
            'latest_report_date': latest_report.report_date if latest_report else None,
            # NOTE(review): "current" diagnosis is taken from the earliest
            # consultation - confirm this should not be the most recent one.
            'current_diagnosis': initial_consultation.provisional_diagnosis if initial_consultation else None,
        }
class PsychologyNotificationService:
    """
    Service for managing psychology-specific notifications.
    """

    @staticmethod
    def notify_high_risk_consultation(consultation: PsychologyConsultation) -> None:
        """
        Send notifications for high-risk consultations.

        When the consultation's risk assessment requires immediate attention,
        an 'URGENT' notification is created for the consultation's provider;
        otherwise nothing happens.
        """
        assessment = PsychologyRiskAssessmentService.assess_risk_level(consultation)
        if not assessment['requires_immediate_attention']:
            return

        patient_name = consultation.patient.get_full_name()
        factors = ', '.join(assessment['risk_factors'])
        Notification.objects.create(
            user=consultation.provider,
            notification_type='URGENT',
            title="High Risk Patient Alert",
            message=f"{patient_name} - {factors}",
        )

    @staticmethod
    def notify_unsigned_sessions(provider: User) -> int:
        """
        Send notification about unsigned sessions.

        Looks back 7 days and creates a single 'REMINDER' notification for the
        provider when at least one unsigned session is found.

        Returns:
            int: Number of unsigned sessions
        """
        unsigned_count = len(
            PsychologySessionManagementService.check_unsigned_sessions(provider, days=7)
        )
        if unsigned_count:
            Notification.objects.create(
                user=provider,
                notification_type='REMINDER',
                title="Unsigned Psychology Sessions",
                message=f"You have {unsigned_count} unsigned psychology sessions from the past week.",
            )
        return unsigned_count

    @staticmethod
    def notify_overdue_goals(provider: User) -> int:
        """
        Send notification about overdue goals.

        Creates a single 'REMINDER' notification for the provider when at
        least one of their goals is overdue.

        Returns:
            int: Number of overdue goals
        """
        overdue_count = len(PsychologyGoalTrackingService.get_overdue_goals(provider))
        if overdue_count:
            Notification.objects.create(
                user=provider,
                notification_type='REMINDER',
                title="Overdue Treatment Goals",
                message=f"You have {overdue_count} overdue treatment goals that need review.",
            )
        return overdue_count