215 lines
8.0 KiB
Python
215 lines
8.0 KiB
Python
"""
|
|
Integrations Celery tasks
|
|
|
|
This module contains the core event processing logic that:
|
|
1. Processes inbound events from external systems
|
|
2. Finds matching journey instances
|
|
3. Completes journey stages
|
|
4. Triggers survey creation
|
|
"""
|
|
import logging
|
|
|
|
from celery import shared_task
|
|
from django.db import transaction
|
|
from django.utils import timezone
|
|
|
|
logger = logging.getLogger('apps.integrations')
|
|
|
|
|
|
@shared_task(bind=True, max_retries=3)
def process_inbound_event(self, event_id):
    """
    Process an inbound integration event.

    This is the core event processing task that:
    1. Finds the journey instance by encounter_id
    2. Finds the matching stage by trigger_event_code
    3. Completes the stage
    4. Queues post-discharge survey creation if configured
    5. Logs audit events

    Args:
        event_id: UUID of the InboundEvent to process

    Returns:
        dict: Processing result. ``status`` is one of:
            - ``'processed'``: stage completed; also carries
              ``stage_completed`` and ``journey_completion``.
            - ``'ignored'``: no journey instance or no open stage matched;
              ``reason`` explains which.
            - ``'failed'``: ``stage_instance.complete()`` returned a falsy
              value; ``reason`` carries the message.
            - ``'error'``: no InboundEvent exists with ``event_id``.

    Raises:
        celery.exceptions.Retry: on any other exception the event is marked
            failed (best effort) and the task is retried with a countdown of
            ``60 * (retries + 1)`` seconds, up to ``max_retries``.
    """
    # Imported inside the task body, as in the original code
    # (presumably to avoid import cycles between apps -- TODO confirm).
    from apps.core.services import create_audit_log
    from apps.integrations.models import InboundEvent
    from apps.journeys.models import PatientJourneyInstance, PatientJourneyStageInstance, StageStatus
    from apps.organizations.models import Department, Staff

    # Bound before the ``try`` so the generic error handler can tell whether
    # the event was ever loaded, instead of hitting a NameError.
    event = None

    try:
        # Get the event
        event = InboundEvent.objects.get(id=event_id)
        event.mark_processing()

        logger.info(f"Processing event {event.id}: {event.event_code} for encounter {event.encounter_id}")

        # Find journey instance by encounter_id
        try:
            journey_instance = PatientJourneyInstance.objects.select_related(
                'journey_template', 'patient', 'hospital'
            ).get(encounter_id=event.encounter_id)
        except PatientJourneyInstance.DoesNotExist:
            error_msg = f"No journey instance found for encounter {event.encounter_id}"
            logger.warning(error_msg)
            event.mark_ignored(error_msg)
            return {'status': 'ignored', 'reason': error_msg}

        # Find the first still-open stage whose template is triggered by this
        # event code. A single ``first()`` replaces the previous
        # ``exists()`` + ``first()`` pair (one query instead of two).
        stage_instance = journey_instance.stage_instances.filter(
            stage_template__trigger_event_code=event.event_code,
            status__in=[StageStatus.PENDING, StageStatus.IN_PROGRESS]
        ).select_related('stage_template').first()

        if stage_instance is None:
            error_msg = f"No pending stage found with trigger {event.event_code}"
            logger.warning(error_msg)
            event.mark_ignored(error_msg)
            return {'status': 'ignored', 'reason': error_msg}

        # Resolve optional staff and department references from the event.
        # A missing record is only logged; the stage is still completed.
        staff = None
        department = None

        if event.physician_license:
            try:
                staff = Staff.objects.get(
                    license_number=event.physician_license,
                    hospital=journey_instance.hospital
                )
            except Staff.DoesNotExist:
                logger.warning(f"Staff member not found with license: {event.physician_license}")

        if event.department_code:
            try:
                department = Department.objects.get(
                    code=event.department_code,
                    hospital=journey_instance.hospital
                )
            except Department.DoesNotExist:
                logger.warning(f"Department not found: {event.department_code}")

        # Complete the stage and record the follow-up state changes together.
        with transaction.atomic():
            success = stage_instance.complete(
                event=event,
                staff=staff,
                department=department,
                metadata=event.payload_json
            )

            if not success:
                error_msg = "Failed to complete stage"
                event.mark_failed(error_msg)
                return {'status': 'failed', 'reason': error_msg}

            stage_name = stage_instance.stage_template.name
            journey_template = journey_instance.journey_template

            # Log stage completion
            create_audit_log(
                event_type='stage_completed',
                description=f"Stage {stage_name} completed for encounter {event.encounter_id}",
                content_object=stage_instance,
                metadata={
                    'event_code': event.event_code,
                    'stage_name': stage_name,
                    'journey_type': journey_template.journey_type
                }
            )

            # Check if this is a discharge event
            if event.event_code.upper() == 'PATIENT_DISCHARGED':
                logger.info(f"Discharge event received for encounter {event.encounter_id}")

                # Mark journey as completed
                journey_instance.status = 'completed'
                journey_instance.completed_at = timezone.now()
                journey_instance.save()

                # Check if post-discharge survey is enabled
                if journey_template.send_post_discharge_survey:
                    logger.info(
                        f"Post-discharge survey enabled for journey {journey_instance.id}. "
                        f"Will send in {journey_template.post_discharge_survey_delay_hours} hour(s)"
                    )

                    # Queue post-discharge survey creation task with delay
                    from apps.surveys.tasks import create_post_discharge_survey
                    delay_hours = journey_template.post_discharge_survey_delay_hours
                    delay_seconds = delay_hours * 3600

                    create_post_discharge_survey.apply_async(
                        args=[str(journey_instance.id)],
                        countdown=delay_seconds
                    )

                    logger.info(
                        f"Queued post-discharge survey for journey {journey_instance.id} "
                        f"(delay: {delay_hours}h)"
                    )
                else:
                    logger.info(
                        f"Post-discharge survey disabled for journey {journey_instance.id}"
                    )

            # Mark event as processed
            event.mark_processed()

            logger.info(
                f"Successfully processed event {event.id}: "
                f"Completed stage {stage_name}"
            )

            return {
                'status': 'processed',
                'stage_completed': stage_name,
                'journey_completion': journey_instance.get_completion_percentage()
            }

    except InboundEvent.DoesNotExist:
        # Nothing to retry: the event row does not exist.
        error_msg = f"Event {event_id} not found"
        logger.error(error_msg)
        return {'status': 'error', 'reason': error_msg}

    except Exception as e:
        error_msg = f"Error processing event: {str(e)}"
        logger.error(error_msg, exc_info=True)

        # Best-effort status update. ``event`` is still None when the lookup
        # itself failed (e.g. a database error), so there is nothing to mark.
        if event is not None:
            try:
                event.mark_failed(error_msg)
            except Exception:
                # Do not let a failed status update mask the original error
                # or prevent the retry below; just record it.
                logger.exception(f"Could not mark event {event_id} as failed")

        # Retry the task with a linearly increasing delay (60s, 120s, 180s).
        raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))
|
|
|
|
|
|
@shared_task
def process_pending_events():
    """
    Dispatch pending inbound events to ``process_inbound_event``.

    Selects up to 100 events whose status is ``'pending'``, oldest
    ``received_at`` first, and queues one ``process_inbound_event`` task
    per event. The events themselves are not modified here.

    The original notes state that this task runs every minute, configured
    in config/celery.py.

    Returns:
        dict: ``{'queued': <number of events dispatched in this run>}``
    """
    from apps.integrations.models import InboundEvent

    # Cap on how many events a single run will dispatch.
    batch_limit = 100

    oldest_pending = InboundEvent.objects.filter(
        status='pending'
    ).order_by('received_at')[:batch_limit]

    # NOTE(review): status is not changed here, so an event that a worker has
    # not picked up yet may be selected again on the next run -- confirm
    # whether process_inbound_event tolerates duplicate dispatch.
    queued = 0
    for queued, pending_event in enumerate(oldest_pending, start=1):
        # Hand each event to its own task, identified by its id as a string.
        process_inbound_event.delay(str(pending_event.id))

    if queued:
        logger.info(f"Queued {queued} pending events for processing")

    return {'queued': queued}
|