301 lines
10 KiB
Python
301 lines
10 KiB
Python
#!/usr/bin/env python
"""Test that chart data serializes correctly to JSON"""
import json
import os
import sys

import django

# Setup Django.
# The order below matters: the settings module and import path must be in
# place before django.setup(), and the model imports must come after it.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'PX360.settings')
# Use the absolute directory of this script so the import path does not
# depend on the current working directory (a relative __file__ would
# otherwise yield '' or a cwd-relative entry).
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
django.setup()

from apps.accounts.models import User  # noqa: E402
from apps.surveys.models import SurveyInstance, SurveyTemplate  # noqa: E402
|
|
|
|
def _percentage(part, whole):
    """Return part/whole as a percentage rounded to 1 decimal (0 if whole <= 0)."""
    return round((part / whole * 100) if whole > 0 else 0, 1)


def _print_banner(title):
    """Print a section title framed by two 80-character rules."""
    print("\n" + "=" * 80)
    print(title)
    print("=" * 80)


def _display_name(key, mapping):
    """Return the mapped label for key, a title-cased fallback, or 'Unknown' for None."""
    if key in mapping:
        return mapping[key]
    # A NULL column value arrives as None; calling .title() on it would crash.
    return 'Unknown' if key is None else key.title()


def _report_json(title, payload, has_data):
    """Serialize payload to JSON and print a summary; return True on success."""
    try:
        encoded = json.dumps(payload)
    except (TypeError, ValueError) as exc:
        # json.dumps raises TypeError for unsupported types and ValueError
        # for circular references.
        print(f"✗ Failed to serialize {title.lower()}: {exc}")
        return False
    print(f"✓ {title} JSON: {len(encoded)} chars")
    print(f" Preview: {encoded[:100]}...")
    print(f" Has data: {has_data}")
    return True


def test_serialization():
    """Test that all chart data structures serialize correctly.

    Builds the six chart payloads (score distribution, engagement funnel,
    completion time distribution, device distribution, 30-day trend and
    survey types) for the surveys visible to the test user, and checks that
    each one can be encoded with ``json.dumps``. Progress is printed to
    stdout.

    Returns:
        bool: True when every payload serializes; False when the test user
        is missing, no surveys are visible, or any payload fails to encode.
    """
    # Imported here, as in the original script, so they are resolved only
    # after django.setup() has run at module import time.
    import datetime

    from django.db.models import Count, Q
    from django.db.models.functions import TruncDate
    from django.utils import timezone

    from apps.surveys.models import SurveyTracking

    print("=" * 80)
    print("CHART DATA JSON SERIALIZATION TEST")
    print("=" * 80)

    # Get a test user
    try:
        user = User.objects.get(email='test.user@example.com')
    except User.DoesNotExist:
        print("\n✗ Test user not found")
        return False

    is_admin = user.is_px_admin()
    print(f"\n✓ Testing as user: {user.email}")
    print(f" Hospital: {user.hospital}")
    print(f" PX Admin: {is_admin}")

    # Get stats queryset: PX admins see everything, hospital users see their
    # own hospital's surveys, anyone else sees nothing.
    stats_queryset = SurveyInstance.objects.select_related('survey_template')
    if not is_admin:
        if user.hospital:
            stats_queryset = stats_queryset.filter(
                survey_template__hospital=user.hospital
            )
        else:
            stats_queryset = stats_queryset.none()

    total_count = stats_queryset.count()
    print(f"\n✓ Total surveys in queryset: {total_count}")

    if total_count == 0:
        print("\n✗ No surveys found - cannot test charts")
        return False

    # ------------------------------------------------------------------
    # Test Score Distribution
    # ------------------------------------------------------------------
    _print_banner("1. SCORE DISTRIBUTION")
    score_ranges = [
        ('1-2', 1, 2),
        ('2-3', 2, 3),
        ('3-4', 3, 4),
        ('4-5', 4, 5),
    ]

    score_distribution = []
    for label, min_score, max_score in score_ranges:
        # Buckets are half-open [min, max) except the top one, which also
        # includes the maximum score of 5.
        upper_lookup = 'total_score__lte' if max_score == 5 else 'total_score__lt'
        count = stats_queryset.filter(
            total_score__gte=min_score,
            **{upper_lookup: max_score},
        ).count()
        score_distribution.append({
            'range': label,
            'count': count,
            'percentage': _percentage(count, total_count),
        })

    if not _report_json(
        "Score distribution",
        score_distribution,
        any(item['count'] > 0 for item in score_distribution),
    ):
        return False

    # ------------------------------------------------------------------
    # Test Engagement Funnel
    # ------------------------------------------------------------------
    _print_banner("2. ENGAGEMENT FUNNEL")
    sent_count = stats_queryset.filter(status__in=['sent', 'pending']).count()
    completed_count = stats_queryset.filter(status='completed').count()
    opened_count = stats_queryset.filter(open_count__gt=0).count()
    in_progress_count = stats_queryset.filter(status='in_progress').count()
    viewed_count = stats_queryset.filter(status='viewed').count()

    engagement_funnel = [
        {'stage': 'Sent/Pending', 'count': sent_count, 'percentage': 100},
        {'stage': 'Viewed', 'count': viewed_count,
         'percentage': _percentage(viewed_count, sent_count)},
        {'stage': 'Opened', 'count': opened_count,
         'percentage': _percentage(opened_count, sent_count)},
        # In-progress is measured against opened surveys, not sent ones.
        {'stage': 'In Progress', 'count': in_progress_count,
         'percentage': _percentage(in_progress_count, opened_count)},
        {'stage': 'Completed', 'count': completed_count,
         'percentage': _percentage(completed_count, sent_count)},
    ]

    if not _report_json(
        "Engagement funnel",
        engagement_funnel,
        any(item['count'] > 0 for item in engagement_funnel),
    ):
        return False

    # ------------------------------------------------------------------
    # Test Completion Time
    # ------------------------------------------------------------------
    _print_banner("3. COMPLETION TIME DISTRIBUTION")
    completed_surveys = stats_queryset.filter(
        status='completed',
        time_spent_seconds__isnull=False,
    )

    completion_time_ranges = [
        ('< 1 min', 0, 60),
        ('1-5 min', 60, 300),
        ('5-10 min', 300, 600),
        ('10-20 min', 600, 1200),
        ('20+ min', 1200, float('inf')),
    ]

    completion_time_distribution = []
    for label, min_seconds, max_seconds in completion_time_ranges:
        bucket = completed_surveys.filter(time_spent_seconds__gte=min_seconds)
        # The open-ended last bucket has no upper bound to apply.
        if max_seconds != float('inf'):
            bucket = bucket.filter(time_spent_seconds__lt=max_seconds)
        count = bucket.count()
        completion_time_distribution.append({
            'range': label,
            'count': count,
            # Denominator is all completed surveys, including those with no
            # recorded time, as in the original calculation.
            'percentage': _percentage(count, completed_count),
        })

    if not _report_json(
        "Completion time",
        completion_time_distribution,
        any(item['count'] > 0 for item in completion_time_distribution),
    ):
        return False

    # ------------------------------------------------------------------
    # Test Device Distribution
    # ------------------------------------------------------------------
    _print_banner("4. DEVICE DISTRIBUTION")
    # Materialise the grouped rows once; each row is one device type with
    # its event count.
    tracking_events = list(
        SurveyTracking.objects.filter(
            survey_instance__in=stats_queryset
        ).values('device_type').annotate(
            count=Count('id')
        ).order_by('-count')
    )
    # Percentages are shares of all tracking events. Calling .count() on the
    # grouped queryset would return the number of device types instead.
    total_events = sum(entry['count'] for entry in tracking_events)

    device_mapping = {
        'mobile': 'Mobile',
        'tablet': 'Tablet',
        'desktop': 'Desktop',
    }

    device_distribution = [
        {
            'type': entry['device_type'],
            'name': _display_name(entry['device_type'], device_mapping),
            'count': entry['count'],
            'percentage': _percentage(entry['count'], total_events),
        }
        for entry in tracking_events
    ]

    if not _report_json(
        "Device distribution",
        device_distribution,
        len(device_distribution) > 0,
    ):
        return False

    # ------------------------------------------------------------------
    # Test Trend Data
    # ------------------------------------------------------------------
    _print_banner("5. 30-DAY TREND")
    thirty_days_ago = timezone.now() - datetime.timedelta(days=30)

    trend_queryset = stats_queryset.filter(sent_at__gte=thirty_days_ago)
    if trend_queryset.exists():
        trend_queryset = trend_queryset.annotate(date=TruncDate('sent_at'))
    else:
        # Nothing sent in the window: fall back to grouping by creation date.
        trend_queryset = stats_queryset.filter(
            created_at__gte=thirty_days_ago
        ).annotate(
            date=TruncDate('created_at')
        )

    trend_data = trend_queryset.values('date').annotate(
        sent=Count('id'),
        completed=Count('id', filter=Q(status='completed')),
    ).order_by('date')

    trend_labels = []
    trend_sent = []
    trend_completed = []
    for entry in trend_data:
        # Skip rows whose truncated date is NULL.
        if entry['date']:
            trend_labels.append(entry['date'].strftime('%Y-%m-%d'))
            trend_sent.append(entry['sent'])
            trend_completed.append(entry['completed'])

    try:
        labels_json = json.dumps(trend_labels)
        sent_json = json.dumps(trend_sent)
        completed_json = json.dumps(trend_completed)
    except (TypeError, ValueError) as e:
        print(f"✗ Failed to serialize trend data: {e}")
        return False

    print(f"✓ Trend labels JSON: {len(labels_json)} chars ({len(trend_labels)} days)")
    print(f"✓ Trend sent JSON: {len(sent_json)} chars")
    print(f"✓ Trend completed JSON: {len(completed_json)} chars")
    print(f" Has data: {len(trend_labels) > 0}")

    # ------------------------------------------------------------------
    # Test Survey Types
    # ------------------------------------------------------------------
    _print_banner("6. SURVEY TYPES")
    survey_type_data = stats_queryset.values(
        'survey_template__survey_type'
    ).annotate(
        count=Count('id')
    ).order_by('-count')

    survey_type_mapping = {
        'stage': 'Journey Stage',
        'complaint_resolution': 'Complaint Resolution',
        'general': 'General',
        'nps': 'NPS',
    }

    survey_types = [
        {
            'type': entry['survey_template__survey_type'],
            'name': _display_name(
                entry['survey_template__survey_type'], survey_type_mapping
            ),
            'count': entry['count'],
            'percentage': _percentage(entry['count'], total_count),
        }
        for entry in survey_type_data
    ]

    if not _report_json("Survey types", survey_types, len(survey_types) > 0):
        return False

    print("\n" + "=" * 80)
    print("✓ ALL TESTS PASSED - Chart data serializes correctly!")
    print("=" * 80)
    return True
|
|
|
|
if __name__ == '__main__':
    # Propagate the outcome as the process exit status: 0 when every chart
    # payload serialized, 1 otherwise.
    passed = test_serialization()
    sys.exit(1 if not passed else 0)
|