HH/apps/surveys/analytics.py
2026-01-24 15:27:30 +03:00

376 lines
12 KiB
Python

"""
Survey analytics and tracking utilities.
This module provides functions to calculate survey engagement metrics:
- Open rate
- Completion rate
- Abandonment rate
- Time to complete
- And other engagement metrics
"""
from django.db.models import Avg, Count, F, Q, Sum
from django.utils import timezone
from django.db.models.functions import TruncDay, TruncHour
from .models import SurveyInstance, SurveyTracking
def get_survey_engagement_stats(survey_template_id=None, hospital_id=None, days=30):
    """
    Get comprehensive survey engagement statistics.

    Only survey instances created within the last ``days`` days are
    considered. Open and completion rates are relative to the number of
    surveys in that window; the abandonment rate is relative to the number
    of surveys that were opened.

    Args:
        survey_template_id: Filter by specific survey template (optional)
        hospital_id: Filter by hospital (optional)
        days: Number of days to look back (default 30)

    Returns:
        dict: Engagement statistics with the keys ``total_sent``,
        ``total_opened``, ``total_completed``, ``total_abandoned``,
        ``open_rate``, ``completion_rate``, ``abandonment_rate`` (percentages
        rounded to 2 decimals) and ``avg_time_to_complete_minutes``.
    """
    from datetime import timedelta
    from django.utils import timezone

    cutoff_date = timezone.now() - timedelta(days=days)
    queryset = SurveyInstance.objects.filter(created_at__gte=cutoff_date)
    if survey_template_id:
        queryset = queryset.filter(survey_template_id=survey_template_id)
    if hospital_id:
        queryset = queryset.filter(hospital_id=hospital_id)

    # A single conditional-aggregate query keeps the four counters mutually
    # consistent and avoids four separate COUNT round-trips.
    counts = queryset.aggregate(
        total_sent=Count('id'),
        total_opened=Count('id', filter=Q(opened_at__isnull=False)),
        total_completed=Count('id', filter=Q(status='completed')),
        total_abandoned=Count(
            'id',
            filter=Q(
                opened_at__isnull=False,
                status__in=['viewed', 'in_progress', 'abandoned'],
            ),
        ),
    )
    total_sent = counts['total_sent']
    total_opened = counts['total_opened']
    total_completed = counts['total_completed']
    total_abandoned = counts['total_abandoned']

    # Calculate rates (guarding against empty denominators)
    open_rate = (total_opened / total_sent * 100) if total_sent > 0 else 0
    completion_rate = (total_completed / total_sent * 100) if total_sent > 0 else 0
    abandonment_rate = (total_abandoned / total_opened * 100) if total_opened > 0 else 0

    # Calculate average time to complete (in minutes), measured from
    # sent_at to completed_at. Only the durations are fetched, once, so the
    # divisor always matches the rows that were actually summed.
    durations = list(
        queryset.filter(
            status='completed',
            sent_at__isnull=False,
            completed_at__isnull=False,
        ).annotate(
            time_diff=F('completed_at') - F('sent_at')
        ).values_list('time_diff', flat=True)
    )
    avg_time_minutes = 0
    if durations:
        total_seconds = sum(
            duration.total_seconds()
            for duration in durations
            if duration is not None
        )
        avg_time_minutes = total_seconds / len(durations) / 60

    return {
        'total_sent': total_sent,
        'total_opened': total_opened,
        'total_completed': total_completed,
        'total_abandoned': total_abandoned,
        'open_rate': round(open_rate, 2),
        'completion_rate': round(completion_rate, 2),
        'abandonment_rate': round(abandonment_rate, 2),
        'avg_time_to_complete_minutes': round(avg_time_minutes, 2),
    }
def get_patient_survey_timeline(patient_id):
    """
    Get timeline of surveys for a specific patient.

    Surveys are returned most recently sent first.

    Args:
        patient_id: Patient ID

    Returns:
        list: One dict per survey. ``time_to_complete_seconds`` holds the
        seconds between ``sent_at`` and ``completed_at`` for completed
        surveys, the seconds elapsed since ``opened_at`` for surveys that
        were sent and opened but not completed, and ``None`` otherwise.
    """
    surveys = SurveyInstance.objects.filter(
        patient_id=patient_id
    ).select_related(
        'survey_template'
    ).order_by('-sent_at')

    # Single reference time so every in-progress row is measured against
    # the same instant.
    now = timezone.now()

    timeline = []
    for survey in surveys:
        # Calculate time to complete
        time_to_complete = None
        if survey.sent_at and survey.completed_at:
            time_to_complete = (survey.completed_at - survey.sent_at).total_seconds()
        elif survey.sent_at and survey.opened_at:
            # completed_at is necessarily unset on this branch: report the
            # time elapsed since the survey was opened instead.
            time_to_complete = (now - survey.opened_at).total_seconds()

        timeline.append({
            'survey_id': str(survey.id),
            'survey_name': survey.survey_template.name,
            'survey_type': survey.survey_template.survey_type,
            'sent_at': survey.sent_at,
            'opened_at': survey.opened_at,
            'completed_at': survey.completed_at,
            'status': survey.status,
            'time_spent_seconds': survey.time_spent_seconds,
            'time_to_complete_seconds': time_to_complete,
            'open_count': getattr(survey, 'open_count', 0),
            # Compare against None so a legitimate score of 0 is reported
            # as 0.0 rather than being dropped.
            'total_score': (
                float(survey.total_score)
                if survey.total_score is not None
                else None
            ),
            'is_negative': survey.is_negative,
            'delivery_channel': survey.delivery_channel,
        })
    return timeline
def get_survey_completion_times(survey_template_id=None, hospital_id=None, days=30):
    """
    Get individual survey completion times.

    Only completed surveys created within the last ``days`` days that have
    both ``sent_at`` and ``completed_at`` set are included.

    Args:
        survey_template_id: Filter by survey template (optional)
        hospital_id: Filter by hospital (optional)
        days: Number of days to look back (default 30)

    Returns:
        list: One dict per survey, sorted by ascending
        ``time_to_complete_minutes`` (time from ``sent_at`` to
        ``completed_at``).
    """
    from datetime import timedelta
    from django.utils import timezone

    cutoff_date = timezone.now() - timedelta(days=days)
    queryset = SurveyInstance.objects.filter(
        status='completed',
        sent_at__isnull=False,
        completed_at__isnull=False,
        created_at__gte=cutoff_date
    ).select_related(
        'patient',
        'survey_template'
    ).annotate(
        time_to_complete=F('completed_at') - F('sent_at')
    )
    if survey_template_id:
        queryset = queryset.filter(survey_template_id=survey_template_id)
    if hospital_id:
        queryset = queryset.filter(hospital_id=hospital_id)

    completion_times = []
    for survey in queryset:
        # A zero-length duration is falsy, so test against None explicitly
        # to avoid silently dropping such surveys.
        if survey.time_to_complete is None:
            continue
        completion_times.append({
            'survey_id': str(survey.id),
            'patient_name': survey.patient.get_full_name(),
            'survey_name': survey.survey_template.name,
            'sent_at': survey.sent_at,
            'completed_at': survey.completed_at,
            'time_to_complete_minutes': survey.time_to_complete.total_seconds() / 60,
            'time_spent_seconds': survey.time_spent_seconds,
            # Compare against None so a legitimate score of 0 is kept.
            'total_score': (
                float(survey.total_score)
                if survey.total_score is not None
                else None
            ),
            'is_negative': survey.is_negative,
        })

    # Sort by time to complete
    completion_times.sort(key=lambda x: x['time_to_complete_minutes'])
    return completion_times
def get_survey_abandonment_analysis(survey_template_id=None, hospital_id=None, days=30):
    """
    Summarise surveys that were opened but never completed.

    A survey counts as abandoned when it has an ``opened_at`` timestamp and
    its status is ``viewed``, ``in_progress`` or ``abandoned``; only
    instances created within the last ``days`` days are examined.

    Args:
        survey_template_id: Restrict to one survey template (optional)
        hospital_id: Restrict to one hospital (optional)
        days: Size of the look-back window in days (default 30)

    Returns:
        dict: ``total_abandoned``, per-channel and per-template counts
        (``by_channel``, ``by_template``) and
        ``avg_time_until_abandonment_minutes``, the mean time elapsed since
        each survey was opened, rounded to 2 decimals.
    """
    from django.utils import timezone
    from datetime import timedelta

    window_start = timezone.now() - timedelta(days=days)

    abandoned_queryset = SurveyInstance.objects.filter(
        opened_at__isnull=False,
        status__in=['viewed', 'in_progress', 'abandoned'],
        created_at__gte=window_start
    ).select_related('survey_template', 'patient')
    if survey_template_id:
        abandoned_queryset = abandoned_queryset.filter(
            survey_template_id=survey_template_id
        )
    if hospital_id:
        abandoned_queryset = abandoned_queryset.filter(hospital_id=hospital_id)

    # Tally abandoned surveys per delivery channel
    by_channel = {}
    for instance in abandoned_queryset:
        key = instance.delivery_channel
        by_channel[key] = by_channel.get(key, 0) + 1

    # Tally abandoned surveys per template name
    by_template = {}
    for instance in abandoned_queryset:
        key = instance.survey_template.name
        by_template[key] = by_template.get(key, 0) + 1

    # Seconds elapsed between each survey being opened and now
    elapsed_seconds = [
        (timezone.now() - instance.opened_at).total_seconds()
        for instance in abandoned_queryset
        if instance.opened_at
    ]
    avg_abandonment_time_minutes = (
        sum(elapsed_seconds) / len(elapsed_seconds) / 60
        if elapsed_seconds
        else 0
    )

    return {
        'total_abandoned': abandoned_queryset.count(),
        'by_channel': by_channel,
        'by_template': by_template,
        'avg_time_until_abandonment_minutes': round(avg_abandonment_time_minutes, 2),
    }
def get_hourly_survey_activity(hospital_id=None, days=7):
    """
    Count survey instances per creation hour and status.

    Args:
        hospital_id: Restrict to one hospital (optional)
        days: Size of the look-back window in days (default 7)

    Returns:
        list: Dicts with the keys ``hour`` (``created_at`` truncated to the
        hour), ``status`` and ``count``, ordered by ``hour``.
    """
    from django.utils import timezone
    from datetime import timedelta

    window_start = timezone.now() - timedelta(days=days)

    instances = SurveyInstance.objects.filter(created_at__gte=window_start)
    if hospital_id:
        instances = instances.filter(hospital_id=hospital_id)

    # Bucket by hour, then group on (hour, status) and count each group
    bucketed = instances.annotate(hour=TruncHour('created_at'))
    grouped = bucketed.values('hour', 'status').annotate(count=Count('id'))
    return list(grouped.order_by('hour'))
def track_survey_open(survey_instance):
    """
    Track when a survey is opened.

    This should be called every time a survey is accessed. It increments
    ``open_count``, refreshes ``last_opened_at`` and, on the first open
    only, sets ``opened_at`` and moves the status to ``viewed``. The
    instance is saved and a ``page_view`` tracking event is created.

    Args:
        survey_instance: SurveyInstance

    Returns:
        SurveyTracking: Created tracking event
    """
    # Use one timestamp so opened_at and last_opened_at agree on first open.
    now = timezone.now()

    # Increment open count; a missing attribute or a stored None both count
    # as zero so the increment cannot raise TypeError.
    # NOTE(review): this is a read-modify-write on the in-memory instance;
    # concurrent opens of the same survey may lose an increment.
    current_count = getattr(survey_instance, 'open_count', None) or 0
    survey_instance.open_count = current_count + 1
    survey_instance.last_opened_at = now

    # Update status if first open
    if not survey_instance.opened_at:
        survey_instance.opened_at = now
        survey_instance.status = 'viewed'

    survey_instance.save(
        update_fields=['open_count', 'last_opened_at', 'opened_at', 'status']
    )

    # Create tracking event
    tracking = SurveyTracking.objects.create(
        survey_instance=survey_instance,
        event_type='page_view',
        user_agent='',  # Will be filled by view
        ip_address='',  # Will be filled by view
    )
    return tracking
def track_survey_completion(survey_instance):
    """
    Track when a survey is completed.

    Stores the whole seconds between ``opened_at`` and ``completed_at`` in
    ``time_spent_seconds`` (``None`` when either timestamp is missing),
    saves that field and creates a ``survey_completed`` tracking event
    carrying the same duration.

    Args:
        survey_instance: SurveyInstance

    Returns:
        SurveyTracking: Created tracking event
    """
    # Calculate time spent
    time_spent_seconds = None
    if survey_instance.opened_at and survey_instance.completed_at:
        elapsed = survey_instance.completed_at - survey_instance.opened_at
        # Convert unconditionally: a 0-second duration is a valid value and
        # must not be collapsed to None by a truthiness check.
        time_spent_seconds = int(elapsed.total_seconds())

    # Update survey instance
    survey_instance.time_spent_seconds = time_spent_seconds
    survey_instance.save(update_fields=['time_spent_seconds'])

    # Create tracking event
    tracking = SurveyTracking.objects.create(
        survey_instance=survey_instance,
        event_type='survey_completed',
        total_time_spent=time_spent_seconds,
    )
    return tracking
def track_survey_abandonment(survey_instance):
    """
    Record that a survey was started but not completed.

    Sets the instance status to ``abandoned``, saves that field and creates
    a ``survey_abandoned`` tracking event whose ``total_time_spent`` is the
    whole seconds elapsed since ``opened_at`` (``None`` when the survey has
    no ``opened_at``).

    Args:
        survey_instance: SurveyInstance

    Returns:
        SurveyTracking: Created tracking event
    """
    # Seconds elapsed since the survey was opened, if it ever was
    if survey_instance.opened_at:
        elapsed = (timezone.now() - survey_instance.opened_at).total_seconds()
    else:
        elapsed = None

    # Persist the new status
    survey_instance.status = 'abandoned'
    survey_instance.save(update_fields=['status'])

    # Record the abandonment event
    return SurveyTracking.objects.create(
        survey_instance=survey_instance,
        event_type='survey_abandoned',
        total_time_spent=int(elapsed) if elapsed else None,
    )