HH/apps/surveys/public_views.py
2026-01-24 15:27:30 +03:00

320 lines
11 KiB
Python

"""
Public survey views - Token-based survey forms (no login required)
"""
import math

from django.contrib import messages
from django.http import JsonResponse
from django.shortcuts import get_object_or_404, redirect, render
from django.utils import timezone
from django.views.decorators.http import require_http_methods
from django.views.decorators.csrf import csrf_exempt
from user_agents import parse

from apps.core.services import AuditService

from .models import SurveyInstance, SurveyQuestion, SurveyResponse, SurveyTracking
from .analytics import track_survey_open, track_survey_completion
def _build_response(question, raw_value):
    """
    Convert one non-empty submitted answer into SurveyResponse field values.

    Returns a dict with 'question', 'numeric_value', 'text_value' and
    'choice_value' keys, or None when the question type is not one of the
    types handled here.

    Raises ValueError when a rating/likert/nps answer is not a finite number.
    """
    question_type = question.question_type

    if question_type in ('rating', 'likert', 'nps'):
        # float() raises ValueError for non-numeric input; NaN/inf parse
        # successfully, so they are rejected explicitly.
        numeric_value = float(raw_value)
        if not math.isfinite(numeric_value):
            raise ValueError('numeric answer must be finite')
        return {
            'question': question,
            'numeric_value': numeric_value,
            'text_value': '',
            'choice_value': '',
        }

    if question_type == 'yes_no':
        return {
            'question': question,
            # Convert yes/no to numeric for scoring
            'numeric_value': 5.0 if raw_value == 'yes' else 1.0,
            'text_value': '',
            'choice_value': raw_value,
        }

    if question_type == 'multiple_choice':
        return {
            'question': question,
            'numeric_value': None,
            'text_value': '',
            'choice_value': raw_value,
        }

    if question_type in ('text', 'textarea'):
        return {
            'question': question,
            'numeric_value': None,
            'text_value': raw_value,
            'choice_value': '',
        }

    return None


def _collect_responses(questions, post_data):
    """
    Validate the submitted answers and collect the ones to be saved.

    Reads each answer from post_data under the key 'question_<id>'.
    Returns a (responses, errors) tuple: responses is a list of dicts as
    produced by _build_response, errors is a list of message strings.
    """
    responses = []
    errors = []

    for question in questions:
        raw_value = post_data.get(f'question_{question.id}')

        if not raw_value:
            # Unanswered: only an error when the question is required.
            if question.is_required:
                errors.append(f"Question {question.order + 1} is required")
            continue

        try:
            response = _build_response(question, raw_value)
        except ValueError:
            # Malformed numeric input becomes a form error, not a server error.
            errors.append(f"Question {question.order + 1} has an invalid value")
            continue

        if response is not None:
            responses.append(response)

    return responses, errors


@require_http_methods(["GET", "POST"])
def survey_form(request, token):
    """
    Public survey form - accessible via an access-token link (no login).

    GET renders 'surveys/public_form.html' with the survey and its questions.
    POST validates the submitted answers; on validation errors the form is
    rendered again with the error list, otherwise the responses are saved,
    the survey is marked 'completed', completion is tracked and audited, and
    the client is redirected to the thank-you page.

    The token must match a survey whose status is pending, sent, viewed or
    in_progress and whose token has not expired; otherwise
    'surveys/invalid_token.html' is rendered with error 'invalid_or_expired'.
    """
    # One timestamp for the expiry check and the "opened" bookkeeping so the
    # recorded values are consistent with each other.
    opened_now = timezone.now()

    try:
        survey = SurveyInstance.objects.select_related(
            'survey_template',
            'patient',
            'journey_instance'
        ).prefetch_related(
            'survey_template__questions'
        ).get(
            access_token=token,
            status__in=['pending', 'sent', 'viewed', 'in_progress'],
            token_expires_at__gt=opened_now
        )
    except SurveyInstance.DoesNotExist:
        return render(request, 'surveys/invalid_token.html', {
            'error': 'invalid_or_expired'
        })

    # Device info derived from the User-Agent header
    user_agent_str = request.META.get('HTTP_USER_AGENT', '')
    ip_address = request.META.get('REMOTE_ADDR', '')
    user_agent = parse(user_agent_str)
    if user_agent.is_mobile:
        device_type = 'mobile'
    elif user_agent.is_tablet:
        device_type = 'tablet'
    else:
        device_type = 'desktop'
    browser = f"{user_agent.browser.family} {user_agent.browser.version_string}"

    # Update survey instance tracking fields.
    # NOTE(review): this runs for POST submissions as well as GET page loads,
    # so a submission also counts as an "open" - confirm that is intended.
    survey.open_count += 1
    survey.last_opened_at = opened_now
    if not survey.opened_at:
        survey.opened_at = opened_now
        survey.status = 'viewed'
    elif survey.status == 'sent':
        survey.status = 'viewed'
    survey.save(update_fields=['open_count', 'last_opened_at', 'opened_at', 'status'])

    # Track page view event
    SurveyTracking.track_event(
        survey,
        'page_view',
        user_agent=user_agent_str[:500],
        ip_address=ip_address,
        device_type=device_type,
        browser=browser,
        metadata={
            'referrer': request.META.get('HTTP_REFERER', ''),
            'language': request.GET.get('lang', 'en'),
        }
    )

    # Get questions.
    # NOTE(review): OR-ing the two filtered querysets yields one combined
    # queryset; it presumably does not list required questions before
    # optional ones - confirm the intended ordering.
    questions = survey.survey_template.questions.filter(
        is_required=True
    ).order_by('order') | survey.survey_template.questions.filter(
        is_required=False
    ).order_by('order')

    if request.method == 'POST':
        language = request.POST.get('language', 'en')
        responses_data, errors = _collect_responses(questions, request.POST)

        # If validation errors, show form again
        if errors:
            context = {
                'survey': survey,
                'questions': questions,
                'errors': errors,
                'language': language,
                # Same key the GET context provides to this template.
                'total_questions': questions.count(),
            }
            return render(request, 'surveys/public_form.html', context)

        # Save responses
        for response_data in responses_data:
            SurveyResponse.objects.update_or_create(
                survey_instance=survey,
                question=response_data['question'],
                defaults={
                    'numeric_value': response_data['numeric_value'],
                    'text_value': response_data['text_value'],
                    'choice_value': response_data['choice_value'],
                }
            )

        # Update survey status; time spent is measured from opened_at to the
        # same instant that is stored as completed_at.
        completed_at = timezone.now()
        survey.status = 'completed'
        survey.completed_at = completed_at
        if survey.opened_at:
            elapsed = (completed_at - survey.opened_at).total_seconds()
            survey.time_spent_seconds = int(elapsed)
        survey.save(update_fields=['status', 'completed_at', 'time_spent_seconds'])

        # Track completion event
        SurveyTracking.track_event(
            survey,
            'survey_completed',
            total_time_spent=survey.time_spent_seconds,
            user_agent=user_agent_str[:500],
            ip_address=ip_address,
            metadata={
                'response_count': len(responses_data),
                'language': language,
            }
        )

        # Calculate score
        score = survey.calculate_score()

        # Log completion. Compare against None so that a score of 0 is
        # recorded as 0.0 rather than dropped.
        AuditService.log_event(
            event_type='survey_completed',
            description=f"Survey completed: {survey.survey_template.name}",
            user=None,
            content_object=survey,
            metadata={
                'score': float(score) if score is not None else None,
                'is_negative': survey.is_negative,
                'response_count': len(responses_data)
            }
        )

        # Create PX action if negative
        if survey.is_negative:
            from apps.surveys.tasks import create_action_from_negative_survey
            create_action_from_negative_survey.delay(str(survey.id))

        # Redirect to thank you page
        return redirect('surveys:thank_you', token=token)

    # GET request - show form; language comes from the 'lang' query parameter
    language = request.GET.get('lang', 'en')
    context = {
        'survey': survey,
        'questions': questions,
        'language': language,
        'total_questions': questions.count(),
    }
    return render(request, 'surveys/public_form.html', context)
def thank_you(request, token):
    """
    Thank-you page shown after a survey has been completed.

    Looks up the survey by access token with status 'completed'. When no
    such survey exists, 'surveys/invalid_token.html' is rendered with error
    'not_found'. The display language is read from the 'lang' query
    parameter and defaults to 'en'.
    """
    # Building the queryset is lazy; only .get() below hits the database.
    survey_lookup = SurveyInstance.objects.select_related(
        'survey_template',
        'patient'
    )

    try:
        survey = survey_lookup.get(access_token=token, status='completed')
    except SurveyInstance.DoesNotExist:
        not_found_context = {'error': 'not_found'}
        return render(request, 'surveys/invalid_token.html', not_found_context)

    return render(request, 'surveys/thank_you.html', {
        'survey': survey,
        'language': request.GET.get('lang', 'en'),
    })
def invalid_token(request):
    """Render the page shown for an invalid or expired survey token."""
    template_name = 'surveys/invalid_token.html'
    return render(request, template_name)
@csrf_exempt
@require_http_methods(["POST"])
def track_survey_start(request, token):
    """
    API endpoint to track when the patient starts answering the survey.

    Intended to be called via AJAX when the patient first interacts with the
    form. Moves the survey status from 'viewed' to 'in_progress' and records
    a 'survey_started' tracking event for that transition.

    Returns a JSON body with 'status': 'success' and the current
    'survey_status', or a 404 JSON error when no survey matches the token
    with status 'viewed' or 'in_progress' and an unexpired token.
    """
    # Keep the try body to the lookup itself: it is the only statement
    # expected to raise DoesNotExist.
    try:
        survey = SurveyInstance.objects.get(
            access_token=token,
            status__in=['viewed', 'in_progress'],
            token_expires_at__gt=timezone.now()
        )
    except SurveyInstance.DoesNotExist:
        return JsonResponse({
            'status': 'error',
            'message': 'Survey not found or invalid token'
        }, status=404)

    # Only update if not already in_progress. The start event is recorded
    # together with the transition so repeated calls do not log duplicates.
    if survey.status == 'viewed':
        survey.status = 'in_progress'
        survey.save(update_fields=['status'])

        SurveyTracking.track_event(
            survey,
            'survey_started',
            user_agent=request.META.get('HTTP_USER_AGENT', '')[:500],
            ip_address=request.META.get('REMOTE_ADDR', ''),
            metadata={
                'referrer': request.META.get('HTTP_REFERER', ''),
            }
        )

    return JsonResponse({
        'status': 'success',
        'survey_status': survey.status,
    })