HH/apps/integrations/tasks.py

195 lines
6.9 KiB
Python

"""
Integrations Celery tasks
This module contains the core event processing logic that:
1. Processes inbound events from external systems
2. Finds matching journey instances
3. Completes journey stages
4. Triggers survey creation
"""
import logging
from celery import shared_task
from django.db import transaction
logger = logging.getLogger('apps.integrations')
@shared_task(bind=True, max_retries=3)
def process_inbound_event(self, event_id):
    """
    Process an inbound integration event.

    This is the core event processing task that:
    1. Finds the journey instance by encounter_id
    2. Finds the matching stage by trigger_event_code
    3. Completes the stage
    4. Queues survey creation if the stage template is configured for it
    5. Logs audit events

    Args:
        event_id: UUID of the InboundEvent to process

    Returns:
        dict: Processing result. ``status`` is one of:
            - ``'processed'``: stage completed; also carries
              ``stage_completed`` and ``journey_completion``
            - ``'ignored'``: no journey instance or no open stage matched;
              ``reason`` explains which
            - ``'failed'``: ``stage_instance.complete()`` returned a falsy
              value; ``reason`` is set
            - ``'error'``: no InboundEvent exists with ``event_id``;
              ``reason`` is set

    Raises:
        Whatever ``self.retry(...)`` raises: any unexpected exception is
        logged, the event is marked failed on a best-effort basis, and the
        task is retried with a countdown of 60s * (retries so far + 1).
    """
    # Function-scope imports, as in the original (presumably to avoid
    # circular imports between apps at module load time -- confirm).
    from apps.core.services import create_audit_log
    from apps.integrations.models import InboundEvent
    from apps.journeys.models import PatientJourneyInstance, StageStatus
    from apps.organizations.models import Department, Staff

    # Bound before the try block so the generic error handler can tell
    # whether the event was ever loaded (the lookup itself may raise).
    event = None

    try:
        # Get the event
        event = InboundEvent.objects.get(id=event_id)
        event.mark_processing()

        logger.info(f"Processing event {event.id}: {event.event_code} for encounter {event.encounter_id}")

        # Find journey instance by encounter_id
        try:
            journey_instance = PatientJourneyInstance.objects.select_related(
                'journey_template', 'patient', 'hospital'
            ).get(encounter_id=event.encounter_id)
        except PatientJourneyInstance.DoesNotExist:
            error_msg = f"No journey instance found for encounter {event.encounter_id}"
            logger.warning(error_msg)
            event.mark_ignored(error_msg)
            return {'status': 'ignored', 'reason': error_msg}

        # Find the first open stage whose template is triggered by this event
        # code. A single .first() replaces the previous exists() + first()
        # pair: one query instead of two, and no window in which the row can
        # vanish between the check and the fetch.
        stage_instance = journey_instance.stage_instances.filter(
            stage_template__trigger_event_code=event.event_code,
            status__in=[StageStatus.PENDING, StageStatus.IN_PROGRESS]
        ).select_related('stage_template').first()

        if stage_instance is None:
            error_msg = f"No pending stage found with trigger {event.event_code}"
            logger.warning(error_msg)
            event.mark_ignored(error_msg)
            return {'status': 'ignored', 'reason': error_msg}

        # Resolve staff and department from the event. Both are optional:
        # a missing record is logged and processing continues with None.
        staff = None
        department = None

        if event.physician_license:
            try:
                staff = Staff.objects.get(
                    license_number=event.physician_license,
                    hospital=journey_instance.hospital
                )
            except Staff.DoesNotExist:
                logger.warning(f"Staff member not found with license: {event.physician_license}")

        if event.department_code:
            try:
                department = Department.objects.get(
                    code=event.department_code,
                    hospital=journey_instance.hospital
                )
            except Department.DoesNotExist:
                logger.warning(f"Department not found: {event.department_code}")

        # Complete the stage
        with transaction.atomic():
            success = stage_instance.complete(
                event=event,
                staff=staff,
                department=department,
                metadata=event.payload_json
            )

            if not success:
                error_msg = "Failed to complete stage"
                event.mark_failed(error_msg)
                return {'status': 'failed', 'reason': error_msg}

            stage_template = stage_instance.stage_template

            # Log stage completion
            create_audit_log(
                event_type='stage_completed',
                description=f"Stage {stage_template.name} completed for encounter {event.encounter_id}",
                content_object=stage_instance,
                metadata={
                    'event_code': event.event_code,
                    'stage_name': stage_template.name,
                    'journey_type': journey_instance.journey_template.journey_type
                }
            )

            # Check if survey should be sent
            if stage_template.auto_send_survey and stage_template.survey_template:
                # Queue survey creation task with delay
                from apps.surveys.tasks import create_and_send_survey

                delay_seconds = stage_template.survey_delay_hours * 3600

                logger.info(
                    f"Queuing survey for stage {stage_template.name} "
                    f"(delay: {stage_template.survey_delay_hours}h)"
                )

                # NOTE(review): this enqueue happens inside the atomic block,
                # i.e. before the stage completion is committed. Consider
                # transaction.on_commit() if survey_delay_hours can be 0 --
                # confirm against the surveys task's expectations.
                create_and_send_survey.apply_async(
                    args=[str(stage_instance.id)],
                    countdown=delay_seconds
                )

            # Mark event as processed
            event.mark_processed()

            logger.info(
                f"Successfully processed event {event.id}: "
                f"Completed stage {stage_template.name}"
            )

            return {
                'status': 'processed',
                'stage_completed': stage_template.name,
                'journey_completion': journey_instance.get_completion_percentage()
            }

    except InboundEvent.DoesNotExist:
        error_msg = f"Event {event_id} not found"
        logger.error(error_msg)
        return {'status': 'error', 'reason': error_msg}

    except Exception as exc:
        error_msg = f"Error processing event: {exc}"
        logger.error(error_msg, exc_info=True)

        # Best effort: record the failure on the event, but never let a
        # problem here mask the original error or prevent the retry.
        if event is not None:
            try:
                event.mark_failed(error_msg)
            except Exception:
                logger.warning(
                    "Could not mark event %s as failed", event_id, exc_info=True
                )

        # Retry the task with a linearly growing delay
        raise self.retry(exc=exc, countdown=60 * (self.request.retries + 1))
@shared_task
def process_pending_events():
    """
    Queue pending inbound events for individual processing.

    Meant to run as a periodic task (every minute, per the schedule said to
    be configured in config/celery.py). Each run selects at most 100 events
    whose status is 'pending', oldest ``received_at`` first, and hands each
    one to ``process_inbound_event`` as its own task.

    NOTE(review): nothing in this function changes an event's status, so an
    event still waiting in the queue when the next run fires appears to be
    eligible for queuing again -- confirm whether that is acceptable.

    Returns:
        dict: ``{'queued': <number of events queued in this run>}``
    """
    from apps.integrations.models import InboundEvent

    # Oldest first, capped at 100 events per run.
    batch = InboundEvent.objects.filter(status='pending').order_by('received_at')[:100]

    # enumerate() leaves the running total in queued_total; it stays 0 when
    # the batch is empty because the loop body never executes.
    queued_total = 0
    for queued_total, pending in enumerate(batch, start=1):
        process_inbound_event.delay(str(pending.id))

    if queued_total:
        logger.info(f"Queued {queued_total} pending events for processing")

    return {'queued': queued_total}