HH/apps/simulator/views.py
2026-01-24 15:27:30 +03:00

672 lines
23 KiB
Python

"""
Simulator views for testing external notification APIs.
This module provides API endpoints that:
- Simulate external email and SMS services
- Receive and process HIS journey events
- Create journeys, send surveys, and trigger notifications
"""
import logging
from datetime import datetime, timedelta
from django.conf import settings
from django.core.mail import send_mail
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from rest_framework.decorators import api_view
from rest_framework.response import Response
from rest_framework import status
import json
from .serializers import HISJourneyEventSerializer, HISJourneyEventListSerializer
from apps.journeys.models import (
JourneyType,
PatientJourneyTemplate,
PatientJourneyInstance,
PatientJourneyStageInstance,
StageStatus
)
from apps.organizations.models import Hospital, Department, Patient
from apps.surveys.models import SurveyTemplate, SurveyInstance
from apps.notifications.services import NotificationService
logger = logging.getLogger(__name__)
# Request counter for tracking
request_counter = {'email': 0, 'sms': 0}
# Request history (last 10 requests)
request_history = []
def log_simulator_request(channel, payload, status):
"""Log simulator request to history and file."""
request_id = len(request_history) + 1
entry = {
'id': request_id,
'channel': channel,
'timestamp': datetime.now().isoformat(),
'status': status,
'payload': payload
}
request_history.append(entry)
# Keep only last 10 requests
if len(request_history) > 10:
request_history.pop(0)
# Log to file
logger.info(f"[Simulator] {channel.upper()} Request #{request_id}: {status}")
@csrf_exempt
@require_http_methods(["POST"])
def email_simulator(request):
"""
Simulate external email API endpoint.
Accepts POST request with JSON payload:
{
"to": "recipient@example.com",
"subject": "Email subject",
"message": "Plain text message",
"html_message": "Optional HTML content"
}
Sends real email via Django SMTP and returns 200 OK.
"""
request_counter['email'] += 1
try:
# Parse request body
data = json.loads(request.body)
# Validate required fields
required_fields = ['to', 'subject', 'message']
missing_fields = [field for field in required_fields if field not in data]
if missing_fields:
response = {
'success': False,
'error': f'Missing required fields: {", ".join(missing_fields)}'
}
log_simulator_request('email', data, 'failed')
return JsonResponse(response, status=400)
# Extract fields
to_email = data['to']
subject = data['subject']
message = data['message']
html_message = data.get('html_message', None)
# Log the request
logger.info(f"[Email Simulator] Sending email to {to_email}: {subject}")
# Print formatted email to terminal
print(f"\n{'' + ''*68 + ''}")
print(f"{' ' * 15}📧 EMAIL SIMULATOR{' ' * 34}")
print(f"{''*68}")
print(f"║ Request #: {request_counter['email']:<52}")
print(f"{''*68}")
print(f"║ To: {to_email:<52}")
print(f"║ Subject: {subject[:52]:<52}")
print(f"{''*68}")
print(f"║ Message:{' '*55}")
# Word wrap message
words = message.split()
line = ""
for word in words:
if len(line) + len(word) + 1 > 68:
print(f"{line:<68}")
line = "" + word
else:
if line == "":
line += word
else:
line += " " + word
# Print last line if not empty
if line != "":
print(f"{line:<68}")
# Print HTML section if present
if html_message:
print(f"{''*68}")
print(f"║ HTML Message:{' '*48}")
html_preview = html_message[:200].replace('\n', ' ')
print(f"{html_preview:<66}")
if len(html_message) > 200:
print(f"║ ... ({len(html_message)} total characters){' '*30}")
print(f"{''*68}\n")
# Send real email via Django SMTP
try:
send_mail(
subject=subject,
message=message,
from_email=settings.DEFAULT_FROM_EMAIL,
recipient_list=[to_email],
html_message=html_message,
fail_silently=False
)
logger.info(f"[Email Simulator] Email sent via SMTP to {to_email}")
email_status = 'sent'
except Exception as email_error:
logger.error(f"[Email Simulator] SMTP Error: {str(email_error)}")
# Log as 'partial' since we displayed it but failed to send
email_status = 'partial'
# Return error response
response = {
'success': False,
'error': f'Email displayed but failed to send via SMTP: {str(email_error)}',
'data': {
'to': to_email,
'subject': subject,
'message_length': len(message),
'has_html': html_message is not None,
'displayed': True,
'sent': False
}
}
log_simulator_request('email', data, email_status)
return JsonResponse(response, status=500)
# Log success
logger.info(f"[Email Simulator] Email sent successfully to {to_email}")
log_simulator_request('email', data, email_status)
response = {
'success': True,
'message': 'Email sent successfully',
'data': {
'to': to_email,
'subject': subject,
'message_length': len(message),
'has_html': html_message is not None
}
}
return JsonResponse(response, status=200)
except json.JSONDecodeError:
response = {
'success': False,
'error': 'Invalid JSON format'
}
log_simulator_request('email', {}, 'failed')
return JsonResponse(response, status=400)
except Exception as e:
logger.error(f"[Email Simulator] Error: {str(e)}")
response = {
'success': False,
'error': str(e)
}
log_simulator_request('email', data if 'data' in locals() else {}, 'failed')
return JsonResponse(response, status=500)
@csrf_exempt
@require_http_methods(["POST"])
def sms_simulator(request):
"""
Simulate external SMS API endpoint.
Accepts POST request with JSON payload:
{
"to": "+966501234567",
"message": "SMS message text"
}
Prints SMS to terminal with formatted output and returns 200 OK.
"""
request_counter['sms'] += 1
try:
# Parse request body
data = json.loads(request.body)
# Validate required fields
required_fields = ['to', 'message']
missing_fields = [field for field in required_fields if field not in data]
if missing_fields:
response = {
'success': False,
'error': f'Missing required fields: {", ".join(missing_fields)}'
}
log_simulator_request('sms', data, 'failed')
return JsonResponse(response, status=400)
# Extract fields
to_phone = data['to']
message = data['message']
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# Log the request
logger.info(f"[SMS Simulator] Sending SMS to {to_phone}")
# Print formatted SMS to terminal
print(f"\n{'' + ''*68 + ''}")
print(f"{' ' * 15}📱 SMS SIMULATOR{' ' * 35}")
print(f"{''*68}")
print(f"║ Request #: {request_counter['sms']:<52}")
print(f"{''*68}")
print(f"║ To: {to_phone:<52}")
print(f"║ Time: {timestamp:<52}")
print(f"{''*68}")
print(f"║ Message:{' '*55}")
# Word wrap message
words = message.split()
line = ""
for word in words:
if len(line) + len(word) + 1 > 68:
print(f"{line:<68}")
line = "" + word
else:
if line == "":
line += word
else:
line += " " + word
# Print last line if not empty
if line != "":
print(f"{line:<68}")
print(f"{''*68}\n")
# Log success
logger.info(f"[SMS Simulator] SMS sent to {to_phone}: {message[:50]}...")
log_simulator_request('sms', data, 'sent')
response = {
'success': True,
'message': 'SMS sent successfully',
'data': {
'to': to_phone,
'message_length': len(message)
}
}
return JsonResponse(response, status=200)
except json.JSONDecodeError:
response = {
'success': False,
'error': 'Invalid JSON format'
}
log_simulator_request('sms', {}, 'failed')
return JsonResponse(response, status=400)
except Exception as e:
logger.error(f"[SMS Simulator] Error: {str(e)}")
response = {
'success': False,
'error': str(e)
}
log_simulator_request('sms', data if 'data' in locals() else {}, 'failed')
return JsonResponse(response, status=500)
@csrf_exempt
@require_http_methods(["GET"])
def health_check(request):
"""
Health check endpoint for simulator.
Returns simulator status and statistics.
"""
return JsonResponse({
'status': 'healthy',
'timestamp': datetime.now().isoformat(),
'statistics': {
'total_requests': request_counter['email'] + request_counter['sms'],
'email_requests': request_counter['email'],
'sms_requests': request_counter['sms']
},
'recent_requests': request_history[-5:] # Last 5 requests
}, status=200)
@csrf_exempt
@require_http_methods(["GET"])
def reset_simulator(request):
"""
Reset simulator statistics and history.
Clears request counter and history.
"""
global request_counter, request_history
request_counter = {'email': 0, 'sms': 0}
request_history = []
logger.info("[Simulator] Reset statistics and history")
return JsonResponse({
'success': True,
'message': 'Simulator reset successfully'
}, status=200)
from rest_framework.permissions import AllowAny
from rest_framework.decorators import permission_classes
@api_view(['POST'])
@permission_classes([AllowAny])
def his_events_handler(request):
"""
HIS Events API Endpoint
Receives patient journey events from HIS simulator:
- Creates or updates patient records
- Creates journey instances
- Processes stage completions
- Sends post-discharge surveys when journey is complete
Expected payload:
{
"events": [
{
"encounter_id": "ENC-2024-001",
"mrn": "MRN-12345",
"national_id": "1234567890",
"first_name": "Ahmed",
"last_name": "Mohammed",
"phone": "+966501234567",
"email": "patient@example.com",
"event_type": "OPD_STAGE_1_REGISTRATION",
"timestamp": "2024-01-20T10:30:00Z",
"visit_type": "opd",
"department": "Cardiology",
"hospital_code": "ALH-main"
}
]
}
"""
try:
# Validate request data
serializer = HISJourneyEventListSerializer(data=request.data)
if not serializer.is_valid():
return Response(
{'error': 'Invalid data', 'details': serializer.errors},
status=status.HTTP_400_BAD_REQUEST
)
events_data = serializer.validated_data['events']
# Process each event
results = []
survey_invitations_sent = []
for event_data in events_data:
result = process_his_event(event_data)
results.append(result)
# Track if survey was sent
if result.get('survey_sent'):
survey_invitations_sent.append({
'encounter_id': event_data['encounter_id'],
'survey_id': result['survey_id'],
'survey_url': result['survey_url']
})
return Response({
'success': True,
'message': f'Processed {len(events_data)} events successfully',
'results': results,
'surveys_sent': len(survey_invitations_sent),
'survey_details': survey_invitations_sent
}, status=status.HTTP_200_OK)
except Exception as e:
logger.error(f"[HIS Events Handler] Error: {str(e)}", exc_info=True)
return Response(
{'error': 'Internal server error', 'details': str(e)},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
def process_his_event(event_data):
"""
Process a single HIS journey event
Steps:
1. Get or create patient
2. Get or create journey instance
3. Find and update stage instance
4. Check if journey is complete and send survey if needed
"""
try:
# 1. Get or create patient
patient = get_or_create_patient(event_data)
# 2. Get or create journey instance
journey_instance = get_or_create_journey_instance(event_data, patient)
# 3. Find and update stage instance
stage_instance = update_stage_instance(journey_instance, event_data)
result = {
'encounter_id': event_data['encounter_id'],
'patient_id': str(patient.id) if patient else None,
'journey_id': str(journey_instance.id),
'stage_id': str(stage_instance.id) if stage_instance else None,
'stage_status': stage_instance.status if stage_instance else 'not_found',
'survey_sent': False
}
# 4. Check if journey is complete and send survey
if journey_instance.is_complete():
survey_result = send_post_discharge_survey(journey_instance, patient)
if survey_result:
result.update(survey_result)
return result
except Exception as e:
logger.error(f"[Process HIS Event] Error for encounter {event_data.get('encounter_id')}: {str(e)}", exc_info=True)
return {
'encounter_id': event_data.get('encounter_id'),
'error': str(e),
'success': False
}
def get_or_create_patient(event_data):
"""Get or create patient from event data"""
try:
patient, created = Patient.objects.get_or_create(
mrn=event_data['mrn'],
defaults={
'first_name': event_data['first_name'],
'last_name': event_data['last_name'],
'phone': event_data['phone'],
'email': event_data['email'],
'national_id': event_data.get('national_id', ''),
}
)
logger.info(f"{'Created' if created else 'Found'} patient {patient.mrn}: {patient.get_full_name()}")
return patient
except Exception as e:
logger.error(f"Error creating patient: {str(e)}")
raise
def get_or_create_journey_instance(event_data, patient):
"""Get or create journey instance for this encounter"""
try:
# Get hospital from event data or default to ALH-main
hospital_code = event_data.get('hospital_code', 'ALH-main')
hospital = Hospital.objects.filter(code=hospital_code).first()
if not hospital:
raise ValueError(f"Hospital with code '{hospital_code}' not found. Please run seed_journey_surveys command first.")
# Map visit_type to JourneyType
journey_type_map = {
'ems': JourneyType.EMS,
'inpatient': JourneyType.INPATIENT,
'opd': JourneyType.OPD
}
journey_type = journey_type_map.get(event_data['visit_type'])
if not journey_type:
raise ValueError(f"Invalid visit_type: {event_data['visit_type']}")
# Get journey template
journey_template = PatientJourneyTemplate.objects.filter(
hospital=hospital,
journey_type=journey_type,
is_active=True
).first()
if not journey_template:
raise ValueError(f"No active journey template found for {journey_type}")
# Get or create journey instance
journey_instance, created = PatientJourneyInstance.objects.get_or_create(
encounter_id=event_data['encounter_id'],
defaults={
'journey_template': journey_template,
'patient': patient,
'hospital': hospital,
'status': 'active'
}
)
# Create stage instances if this is a new journey
if created:
for stage_template in journey_template.stages.filter(is_active=True):
PatientJourneyStageInstance.objects.create(
journey_instance=journey_instance,
stage_template=stage_template,
status=StageStatus.PENDING
)
logger.info(f"Created new journey instance {journey_instance.id} with {journey_template.stages.count()} stages")
return journey_instance
except Exception as e:
logger.error(f"Error creating journey instance: {str(e)}")
raise
def update_stage_instance(journey_instance, event_data):
"""Find and update stage instance based on event_type"""
try:
# Find stage template by trigger_event_code
stage_template = journey_instance.journey_template.stages.filter(
trigger_event_code=event_data['event_type'],
is_active=True
).first()
if not stage_template:
logger.warning(f"No stage template found for event_type: {event_data['event_type']}")
return None
# Get or create stage instance
stage_instance, created = PatientJourneyStageInstance.objects.get_or_create(
journey_instance=journey_instance,
stage_template=stage_template,
defaults={
'status': StageStatus.PENDING
}
)
# Complete the stage
if stage_instance.status != StageStatus.COMPLETED:
from django.utils import timezone
stage_instance.status = StageStatus.COMPLETED
stage_instance.completed_at = timezone.now()
stage_instance.save()
logger.info(f"Completed stage {stage_template.name} for journey {journey_instance.encounter_id}")
else:
logger.info(f"Stage {stage_template.name} already completed for journey {journey_instance.encounter_id}")
return stage_instance
except Exception as e:
logger.error(f"Error updating stage instance: {str(e)}")
raise
def send_post_discharge_survey(journey_instance, patient):
"""
Send post-discharge survey to patient
Creates a survey instance and sends invitation via email and SMS
"""
try:
# Check if journey template has post-discharge survey enabled
journey_template = journey_instance.journey_template
if not journey_template.send_post_discharge_survey:
return None
# Check if survey already sent for this journey
existing_survey = SurveyInstance.objects.filter(
journey_instance=journey_instance
).first()
if existing_survey:
logger.info(f"Survey already sent for journey {journey_instance.encounter_id}")
return None
# Get survey template from journey template
# Use first stage's survey template as the comprehensive survey
first_stage = journey_template.stages.filter(is_active=True).order_by('order').first()
if not first_stage or not first_stage.survey_template:
logger.warning(f"No survey template found for journey {journey_instance.encounter_id}")
return None
survey_template = first_stage.survey_template
# Create survey instance
survey_instance = SurveyInstance.objects.create(
survey_template=survey_template,
patient=patient,
journey_instance=journey_instance,
hospital=journey_instance.hospital,
delivery_channel='email', # Primary channel is email
status='pending',
recipient_email=patient.email,
recipient_phone=patient.phone
)
logger.info(f"Created survey instance {survey_instance.id} for journey {journey_instance.encounter_id}")
# Send survey invitation via email
try:
email_log = NotificationService.send_survey_invitation(survey_instance, language='en')
logger.info(f"Survey invitation sent via email to {patient.email}")
except Exception as e:
logger.error(f"Error sending survey email: {str(e)}")
# Also send via SMS (as backup)
try:
sms_log = NotificationService.send_sms(
phone=patient.phone,
message=f"Your experience survey is ready: {survey_instance.get_survey_url()}",
related_object=survey_instance,
metadata={'survey_id': str(survey_instance.id)}
)
logger.info(f"Survey invitation sent via SMS to {patient.phone}")
except Exception as e:
logger.error(f"Error sending survey SMS: {str(e)}")
# Return survey details
return {
'survey_sent': True,
'survey_id': str(survey_instance.id),
'survey_url': survey_instance.get_survey_url(),
'delivery_channel': 'email_and_sms'
}
except Exception as e:
logger.error(f"Error sending post-discharge survey: {str(e)}", exc_info=True)
return {
'survey_sent': False,
'error': str(e)
}