751 lines
22 KiB
Python
751 lines
22 KiB
Python
"""
|
|
Integrations Celery tasks for external system communication.

This module contains tasks for NPHIES (insurance), ZATCA (e-invoicing),
Lab, Radiology, and SMS/WhatsApp messaging integrations, plus an
integration health check.
|
|
"""
|
|
|
|
import logging
|
|
from typing import Dict, Optional, List
|
|
|
|
from celery import shared_task
|
|
from django.conf import settings
|
|
from django.utils import timezone
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ============================================================================
|
|
# NPHIES Integration Tasks (Saudi Insurance)
|
|
# ============================================================================
|
|
|
|
|
|
@shared_task(bind=True, max_retries=3)
def submit_nphies_eligibility(self, patient_id: str, insurance_id: str) -> Dict:
    """
    Check a patient's insurance eligibility with NPHIES (placeholder).

    The patient record is loaded and its MRN is logged. No NPHIES request is
    sent yet, so a fixed "eligible" payload is returned.

    Args:
        patient_id: ID of the patient to look up (documented as a UUID)
        insurance_id: Insurance policy ID (not used by the current body)

    Returns:
        dict: Placeholder eligibility response with ``status``, ``eligible``,
            ``coverage_type``, ``message`` and ``timestamp`` keys

    Raises:
        Whatever ``self.retry`` raises: any exception from the body is logged
        and handed to ``self.retry`` with a 300-second countdown.
    """
    try:
        from core.models import Patient

        patient = Patient.objects.get(id=patient_id)

        # TODO: Implement actual NPHIES API call
        # This is a placeholder for the integration
        logger.info(f"NPHIES eligibility check for patient {patient.mrn}")

        # Placeholder response
        eligibility = dict(
            status='success',
            eligible=True,
            coverage_type='FULL',
            message='Patient is eligible for coverage',
            timestamp=str(timezone.now()),
        )
        return eligibility

    except Exception as exc:
        logger.error(f"NPHIES eligibility check failed: {exc}")
        raise self.retry(exc=exc, countdown=300)
|
|
|
|
|
|
@shared_task(bind=True, max_retries=3)
def submit_nphies_prior_auth(self, appointment_id: str, services: list) -> Dict:
    """
    Request prior authorization from NPHIES for an appointment (placeholder).

    The appointment is fetched (with its patient) so that a missing record
    fails the task. No NPHIES request is sent yet; an "approved" payload
    echoing ``services`` is returned.

    Args:
        appointment_id: ID of the appointment (documented as a UUID)
        services: List of services requiring authorization

    Returns:
        dict: Placeholder authorization response with ``status``,
            ``authorization_number``, ``approved``, ``services``,
            ``valid_until`` and ``timestamp`` keys

    Raises:
        Whatever ``self.retry`` raises: any exception from the body is logged
        and handed to ``self.retry`` with a 300-second countdown.
    """
    try:
        from appointments.models import Appointment

        # Looked up only to confirm the appointment exists; the instance
        # itself is not needed by the placeholder below.
        Appointment.objects.select_related('patient').get(id=appointment_id)

        # TODO: Implement actual NPHIES API call
        logger.info(f"NPHIES prior auth for appointment {appointment_id}")

        # Placeholder response
        return {
            'status': 'success',
            'authorization_number': f"AUTH-{timezone.now().strftime('%Y%m%d%H%M%S')}",
            'approved': True,
            'services': services,
            'valid_until': str(timezone.now().date()),
            'timestamp': str(timezone.now()),
        }

    except Exception as exc:
        logger.error(f"NPHIES prior auth failed: {exc}")
        raise self.retry(exc=exc, countdown=300)
|
|
|
|
|
|
@shared_task(bind=True, max_retries=3)
def submit_nphies_claim(self, invoice_id: str) -> Dict:
    """
    Submit an insurance claim for an invoice to NPHIES (placeholder).

    The invoice is loaded together with its patient and appointment. No
    NPHIES request is sent yet; a "submitted" payload carrying the invoice
    total is returned.

    Args:
        invoice_id: ID of the invoice (documented as a UUID)

    Returns:
        dict: Placeholder claim response with ``status``, ``claim_number``,
            ``submitted``, ``amount_claimed`` and ``timestamp`` keys

    Raises:
        Whatever ``self.retry`` raises: any exception from the body is logged
        and handed to ``self.retry`` with a 300-second countdown.
    """
    try:
        from finance.models import Invoice

        invoice_qs = Invoice.objects.select_related('patient', 'appointment')
        invoice = invoice_qs.get(id=invoice_id)

        # TODO: Implement actual NPHIES API call
        logger.info(f"NPHIES claim submission for invoice {invoice.invoice_number}")

        # Placeholder response
        claim = dict(
            status='success',
            claim_number=f"CLM-{timezone.now().strftime('%Y%m%d%H%M%S')}",
            submitted=True,
            amount_claimed=str(invoice.total_amount),
            timestamp=str(timezone.now()),
        )
        return claim

    except Exception as exc:
        logger.error(f"NPHIES claim submission failed: {exc}")
        raise self.retry(exc=exc, countdown=300)
|
|
|
|
|
|
# ============================================================================
|
|
# ZATCA Integration Tasks (Saudi E-Invoicing)
|
|
# ============================================================================
|
|
|
|
|
|
@shared_task(bind=True, max_retries=3)
def submit_zatca_invoice(self, invoice_id: str) -> Dict:
    """
    Submit an e-invoice to ZATCA, the Saudi Tax Authority (placeholder).

    Only invoices whose status is ``'ISSUED'`` or ``'PAID'`` proceed; any
    other status is logged as a warning and reported as skipped without
    raising. No ZATCA request is sent yet.

    Args:
        invoice_id: ID of the invoice (documented as a UUID)

    Returns:
        dict: ``{'status': 'skipped', 'reason': ...}`` for invoices that are
            not ready, otherwise a placeholder response with ``status``,
            ``zatca_uuid``, ``invoice_hash``, ``qr_code``, ``submitted`` and
            ``timestamp`` keys

    Raises:
        Whatever ``self.retry`` raises: any exception from the body is logged
        and handed to ``self.retry`` with a 300-second countdown.
    """
    try:
        from finance.models import Invoice

        invoice = Invoice.objects.select_related('patient').get(id=invoice_id)

        # Only submit issued or paid invoices
        is_submittable = invoice.status in ['ISSUED', 'PAID']
        if not is_submittable:
            logger.warning(f"Invoice {invoice.invoice_number} not ready for ZATCA submission")
            return {'status': 'skipped', 'reason': 'Invoice not issued'}

        # TODO: Implement actual ZATCA API call
        # This should include:
        # 1. Generate XML invoice in ZATCA format
        # 2. Sign invoice with digital certificate
        # 3. Generate QR code
        # 4. Submit to ZATCA API
        # 5. Store ZATCA UUID and hash
        logger.info(f"ZATCA e-invoice submission for {invoice.invoice_number}")

        # Placeholder response
        return {
            'status': 'success',
            'zatca_uuid': f"ZATCA-{timezone.now().strftime('%Y%m%d%H%M%S')}",
            'invoice_hash': 'placeholder_hash',
            'qr_code': 'placeholder_qr_code_data',
            'submitted': True,
            'timestamp': str(timezone.now()),
        }

    except Exception as exc:
        logger.error(f"ZATCA submission failed: {exc}")
        raise self.retry(exc=exc, countdown=300)
|
|
|
|
|
|
@shared_task
def submit_pending_zatca_invoices() -> Dict:
    """
    Submit all pending invoices to ZATCA and report per-run statistics.

    Intended to run daily at 11:00 PM (the schedule itself is not defined in
    this function). Every invoice with status ``'ISSUED'`` or ``'PAID'`` is
    passed to ``submit_zatca_invoice``; there is no "already submitted"
    filter yet, so each run re-processes all such invoices.

    Each submission runs in-process by calling the task directly rather than
    via ``.delay(...).get()``. Waiting on a subtask result from inside a
    task is rejected by Celery by default and can deadlock the worker pool.
    When called directly, a failure inside ``submit_zatca_invoice``
    propagates as an exception and is counted under ``failed``.

    Returns:
        dict: Submission statistics with ``total`` (invoices selected),
            ``submitted``, ``failed`` and ``errors`` (one message per
            failed or skipped invoice)
    """
    from finance.models import Invoice

    # Get invoices that need ZATCA submission
    pending_invoices = Invoice.objects.filter(
        status__in=['ISSUED', 'PAID'],
        # Add field to track ZATCA submission status
        # zatca_submitted=False
    )

    stats = {
        'total': pending_invoices.count(),
        'submitted': 0,
        'failed': 0,
        'errors': []
    }

    for invoice in pending_invoices:
        try:
            # Direct call: executes in the current process, no message is
            # sent and nothing blocks on the result backend.
            result = submit_zatca_invoice(str(invoice.id))
            if result.get('status') == 'success':
                stats['submitted'] += 1
            else:
                stats['failed'] += 1
                stats['errors'].append(f"Invoice {invoice.invoice_number}: {result.get('reason')}")
        except Exception as exc:
            # Best-effort batch: record the failure and keep going.
            stats['failed'] += 1
            stats['errors'].append(f"Invoice {invoice.invoice_number}: {str(exc)}")

    logger.info(f"ZATCA batch submission: {stats['submitted']} submitted, {stats['failed']} failed")

    return stats
|
|
|
|
|
|
# ============================================================================
|
|
# Lab Integration Tasks
|
|
# ============================================================================
|
|
|
|
|
|
@shared_task(bind=True, max_retries=3)
def sync_lab_results(self) -> Dict:
    """
    Sync lab results from the external lab system (placeholder).

    Intended to run every 30 minutes (the schedule itself is not defined in
    this function). No lab API is contacted yet; the task logs start and
    completion and returns zeroed counters.

    Returns:
        dict: Sync statistics with ``status``, ``new_results``,
            ``updated_results``, ``errors`` and ``timestamp`` keys

    Raises:
        Whatever ``self.retry`` raises: any exception from the body is logged
        and handed to ``self.retry`` with a 600-second countdown.
    """
    try:
        # TODO: Implement actual lab API integration
        # This should:
        # 1. Connect to lab API
        # 2. Fetch new results since last sync
        # 3. Match results to patients by MRN or order ID
        # 4. Store results in database
        # 5. Notify providers of new results
        logger.info("Lab results sync started")

        # Placeholder response
        stats = dict(
            status='success',
            new_results=0,
            updated_results=0,
            errors=0,
            timestamp=str(timezone.now()),
        )

        logger.info(f"Lab results sync completed: {stats}")
        return stats

    except Exception as exc:
        logger.error(f"Lab results sync failed: {exc}")
        raise self.retry(exc=exc, countdown=600)
|
|
|
|
|
|
@shared_task(bind=True, max_retries=3)
def send_lab_order(self, order_id: str) -> Dict:
    """
    Send a lab order to the external lab system (placeholder).

    No order is loaded or transmitted yet; the task logs the order ID and
    returns a generated reference.

    Args:
        order_id: ID of the lab order (documented as a UUID)

    Returns:
        dict: Placeholder submission response with ``status``,
            ``lab_reference``, ``submitted``, ``estimated_completion`` and
            ``timestamp`` keys

    Raises:
        Whatever ``self.retry`` raises: any exception from the body is logged
        and handed to ``self.retry`` with a 300-second countdown.
    """
    try:
        # TODO: Implement actual lab order submission
        # This should:
        # 1. Get order details from database
        # 2. Format order in lab system format
        # 3. Submit to lab API
        # 4. Store lab reference number
        logger.info(f"Lab order submission: {order_id}")

        # Placeholder response
        return {
            'status': 'success',
            'lab_reference': f"LAB-{timezone.now().strftime('%Y%m%d%H%M%S')}",
            'submitted': True,
            'estimated_completion': str(timezone.now().date()),
            'timestamp': str(timezone.now()),
        }

    except Exception as exc:
        logger.error(f"Lab order submission failed: {exc}")
        raise self.retry(exc=exc, countdown=300)
|
|
|
|
|
|
# ============================================================================
|
|
# Radiology Integration Tasks
|
|
# ============================================================================
|
|
|
|
|
|
@shared_task(bind=True, max_retries=3)
def sync_radiology_results(self) -> Dict:
    """
    Sync radiology results from the PACS/RIS system (placeholder).

    Intended to run every 30 minutes (the schedule itself is not defined in
    this function). No PACS/RIS API is contacted yet; the task logs start and
    completion and returns zeroed counters.

    Returns:
        dict: Sync statistics with ``status``, ``new_studies``,
            ``new_reports``, ``errors`` and ``timestamp`` keys

    Raises:
        Whatever ``self.retry`` raises: any exception from the body is logged
        and handed to ``self.retry`` with a 600-second countdown.
    """
    try:
        # TODO: Implement actual radiology system integration
        # This should:
        # 1. Connect to PACS/RIS API
        # 2. Fetch new studies and reports
        # 3. Match to patients and orders
        # 4. Store results and images
        # 5. Notify providers of new results
        logger.info("Radiology results sync started")

        # Placeholder response
        stats = dict(
            status='success',
            new_studies=0,
            new_reports=0,
            errors=0,
            timestamp=str(timezone.now()),
        )

        logger.info(f"Radiology results sync completed: {stats}")
        return stats

    except Exception as exc:
        logger.error(f"Radiology results sync failed: {exc}")
        raise self.retry(exc=exc, countdown=600)
|
|
|
|
|
|
@shared_task(bind=True, max_retries=3)
def send_radiology_order(self, order_id: str) -> Dict:
    """
    Send a radiology order to the PACS/RIS system (placeholder).

    No order is loaded or transmitted yet; the task logs the order ID and
    returns a generated accession number.

    Args:
        order_id: ID of the radiology order (documented as a UUID)

    Returns:
        dict: Placeholder submission response with ``status``,
            ``accession_number``, ``submitted``, ``scheduled_date`` and
            ``timestamp`` keys

    Raises:
        Whatever ``self.retry`` raises: any exception from the body is logged
        and handed to ``self.retry`` with a 300-second countdown.
    """
    try:
        # TODO: Implement actual radiology order submission
        # This should:
        # 1. Get order details from database
        # 2. Format order in DICOM/HL7 format
        # 3. Submit to PACS/RIS
        # 4. Store accession number
        logger.info(f"Radiology order submission: {order_id}")

        # Placeholder response
        return {
            'status': 'success',
            'accession_number': f"RAD-{timezone.now().strftime('%Y%m%d%H%M%S')}",
            'submitted': True,
            'scheduled_date': str(timezone.now().date()),
            'timestamp': str(timezone.now()),
        }

    except Exception as exc:
        logger.error(f"Radiology order submission failed: {exc}")
        raise self.retry(exc=exc, countdown=300)
|
|
|
|
|
|
# ============================================================================
|
|
# SMS/WhatsApp Integration Tasks
|
|
# ============================================================================
|
|
|
|
|
|
@shared_task(bind=True, max_retries=3)
def send_sms_async(
    self,
    to: str,
    message: str,
    tenant_id: str,
    template_code: Optional[str] = None,
    context: Optional[Dict] = None
) -> Dict:
    """
    Send an SMS message asynchronously through ``MessagingService``.

    The template path is taken only when both ``template_code`` and
    ``context`` are truthy; otherwise ``message`` is sent as-is.

    Args:
        to: Recipient phone number
        message: Message body (used when not sending from a template)
        tenant_id: Tenant ID (documented as a UUID)
        template_code: Template code (optional)
        context: Template context variables (optional)

    Returns:
        dict: The service's send result, returned only when its
            ``'success'`` entry is truthy

    Raises:
        celery.exceptions.Retry: When the provider reports failure, or any
            other exception occurs, and the task is rescheduled with a
            300-second countdown. Once retries are exhausted ``self.retry``
            raises the underlying exception instead.
    """
    # Celery is already a dependency of this module; imported locally so the
    # block is self-contained.
    from celery.exceptions import Retry

    try:
        from integrations.messaging_service import MessagingService

        service = MessagingService()

        if template_code and context:
            result = service.send_from_template(
                template_code=template_code,
                recipient_phone=to,
                channel='SMS',
                context=context,
                tenant_id=tenant_id
            )
        else:
            result = service.send_message(
                to=to,
                message=message,
                channel='SMS',
                tenant_id=tenant_id
            )

        if not result['success']:
            logger.warning(f"SMS send failed: {result.get('error')}")
            # Retry on failure
            raise self.retry(exc=Exception(result.get('error')), countdown=300)

        logger.info(f"SMS sent successfully to {to}")
        return result

    except Retry:
        # The retry above has already been scheduled. Letting the generic
        # handler below see it would call self.retry a second time and
        # enqueue a duplicate send.
        raise

    except Exception as exc:
        logger.error(f"SMS async send failed: {exc}")
        raise self.retry(exc=exc, countdown=300)
|
|
|
|
|
|
@shared_task(bind=True, max_retries=3)
def send_whatsapp_async(
    self,
    to: str,
    message: str,
    tenant_id: str,
    template_code: Optional[str] = None,
    context: Optional[Dict] = None
) -> Dict:
    """
    Send a WhatsApp message asynchronously through ``MessagingService``.

    The template path is taken only when both ``template_code`` and
    ``context`` are truthy; otherwise ``message`` is sent as-is.

    Args:
        to: Recipient phone number
        message: Message body (used when not sending from a template)
        tenant_id: Tenant ID (documented as a UUID)
        template_code: Template code (optional)
        context: Template context variables (optional)

    Returns:
        dict: The service's send result, returned only when its
            ``'success'`` entry is truthy

    Raises:
        celery.exceptions.Retry: When the provider reports failure, or any
            other exception occurs, and the task is rescheduled with a
            300-second countdown. Once retries are exhausted ``self.retry``
            raises the underlying exception instead.
    """
    # Celery is already a dependency of this module; imported locally so the
    # block is self-contained.
    from celery.exceptions import Retry

    try:
        from integrations.messaging_service import MessagingService

        service = MessagingService()

        if template_code and context:
            result = service.send_from_template(
                template_code=template_code,
                recipient_phone=to,
                channel='WHATSAPP',
                context=context,
                tenant_id=tenant_id
            )
        else:
            result = service.send_message(
                to=to,
                message=message,
                channel='WHATSAPP',
                tenant_id=tenant_id
            )

        if not result['success']:
            logger.warning(f"WhatsApp send failed: {result.get('error')}")
            # Retry on failure
            raise self.retry(exc=Exception(result.get('error')), countdown=300)

        logger.info(f"WhatsApp sent successfully to {to}")
        return result

    except Retry:
        # The retry above has already been scheduled. Letting the generic
        # handler below see it would call self.retry a second time and
        # enqueue a duplicate send.
        raise

    except Exception as exc:
        logger.error(f"WhatsApp async send failed: {exc}")
        raise self.retry(exc=exc, countdown=300)
|
|
|
|
|
|
@shared_task
def update_message_statuses() -> Dict:
    """
    Refresh the delivery status of recently sent messages.

    Intended to run every 5 minutes (the schedule itself is not defined in
    this function). Selects messages created in the last 24 hours that are
    still ``QUEUED`` or ``SENT`` and have a provider message ID, then asks
    ``MessagingService.update_message_status`` to refresh each one. A failure
    on one message is logged and counted; it does not stop the loop.

    Returns:
        dict: Update statistics with ``total`` (messages selected),
            ``updated`` and ``errors`` counts
    """
    from datetime import timedelta

    from integrations.messaging_service import MessagingService
    from notifications.models import Message

    # Get messages sent in last 24 hours that aren't delivered/failed
    cutoff = timezone.now() - timedelta(hours=24)
    in_flight_states = [Message.Status.QUEUED, Message.Status.SENT]
    pending_messages = Message.objects.filter(
        created_at__gte=cutoff,
        status__in=in_flight_states,
        provider_message_id__isnull=False
    )

    service = MessagingService()
    stats = dict(total=pending_messages.count(), updated=0, errors=0)

    for message in pending_messages:
        try:
            result = service.update_message_status(str(message.id))
            was_updated = result['success'] and result.get('updated')
            if was_updated:
                stats['updated'] += 1
        except Exception as exc:
            logger.error(f"Failed to update message {message.id}: {exc}")
            stats['errors'] += 1

    logger.info(f"Message status update: {stats['updated']}/{stats['total']} updated")

    return stats
|
|
|
|
|
|
@shared_task
def retry_failed_messages() -> Dict:
    """
    Re-send recently failed messages that still have retries left.

    Intended to run every hour (the schedule itself is not defined in this
    function). Selects messages created in the last 24 hours with status
    ``FAILED`` and ``retry_count`` below 3, and passes each to
    ``MessagingService.retry_failed_message``. An exception on one message is
    logged and counted as failed; it does not stop the loop.

    Returns:
        dict: Retry statistics with ``total`` (messages selected),
            ``retried``, ``succeeded`` and ``failed`` counts
    """
    from datetime import timedelta

    from integrations.messaging_service import MessagingService
    from notifications.models import Message

    # Get failed messages from last 24 hours that can be retried
    cutoff = timezone.now() - timedelta(hours=24)
    failed_messages = Message.objects.filter(
        created_at__gte=cutoff,
        status=Message.Status.FAILED,
        retry_count__lt=3
    )

    service = MessagingService()
    stats = dict(total=failed_messages.count(), retried=0, succeeded=0, failed=0)

    for message in failed_messages:
        try:
            result = service.retry_failed_message(str(message.id))
            stats['retried'] += 1

            # Tally the attempt under whichever outcome the service reported.
            outcome = 'succeeded' if result['success'] else 'failed'
            stats[outcome] += 1

        except Exception as exc:
            logger.error(f"Failed to retry message {message.id}: {exc}")
            stats['failed'] += 1

    logger.info(f"Message retry: {stats['succeeded']}/{stats['retried']} succeeded")

    return stats
|
|
|
|
|
|
@shared_task
def cleanup_old_messages() -> Dict:
    """
    Delete message records older than 90 days to limit table growth.

    Intended to run daily at 2:00 AM (the schedule itself is not defined in
    this function). The reported count is taken just before the delete is
    issued.

    Returns:
        dict: Cleanup statistics with ``deleted`` (number of messages matched
            before deletion) and ``cutoff_date`` (date string of the cutoff)
    """
    from datetime import timedelta

    from notifications.models import Message

    retention = timedelta(days=90)
    cutoff = timezone.now() - retention

    expired = Message.objects.filter(created_at__lt=cutoff)
    count = expired.count()
    expired.delete()

    logger.info(f"Cleaned up {count} old messages")

    return dict(deleted=count, cutoff_date=str(cutoff.date()))
|
|
|
|
|
|
@shared_task
def send_bulk_sms(
    recipients: List[str],
    message: str,
    tenant_id: str
) -> Dict:
    """
    Send the same SMS to multiple recipients via ``MessagingService``.

    Args:
        recipients: List of phone numbers
        message: Message body
        tenant_id: Tenant ID (documented as a UUID)

    Returns:
        dict: Whatever ``send_bulk_messages`` returns; its ``sent`` and
            ``total`` entries are read for the log line
    """
    from integrations.messaging_service import MessagingService

    bulk_request = {
        'recipients': recipients,
        'message': message,
        'channel': 'SMS',
        'tenant_id': tenant_id,
    }
    result = MessagingService().send_bulk_messages(**bulk_request)

    logger.info(f"Bulk SMS: {result['sent']}/{result['total']} sent")

    return result
|
|
|
|
|
|
@shared_task
def send_bulk_whatsapp(
    recipients: List[str],
    message: str,
    tenant_id: str
) -> Dict:
    """
    Send the same WhatsApp message to multiple recipients via
    ``MessagingService``.

    Args:
        recipients: List of phone numbers
        message: Message body
        tenant_id: Tenant ID (documented as a UUID)

    Returns:
        dict: Whatever ``send_bulk_messages`` returns; its ``sent`` and
            ``total`` entries are read for the log line
    """
    from integrations.messaging_service import MessagingService

    bulk_request = {
        'recipients': recipients,
        'message': message,
        'channel': 'WHATSAPP',
        'tenant_id': tenant_id,
    }
    result = MessagingService().send_bulk_messages(**bulk_request)

    logger.info(f"Bulk WhatsApp: {result['sent']}/{result['total']} sent")

    return result
|
|
|
|
|
|
# ============================================================================
|
|
# Integration Health Check
|
|
# ============================================================================
|
|
|
|
|
|
@shared_task
def check_integration_health() -> Dict:
    """
    Report the health status of all external integrations.

    Only the SMS and WhatsApp entries are actually probed: each is marked
    ``'healthy'`` when ``ProviderFactory`` can create the provider and
    ``'error'`` (with the exception text) when it cannot. The NPHIES, ZATCA,
    lab and radiology entries always stay ``'unknown'`` because no checks are
    implemented for them yet. If ``ProviderFactory`` itself cannot be
    imported, the failure is logged and the messaging entries keep their
    current values.

    Returns:
        dict: One entry per integration, each holding ``status`` and
            ``last_check``; messaging entries also carry ``provider`` or
            ``error``
    """
    integration_names = ('nphies', 'zatca', 'lab', 'radiology', 'sms', 'whatsapp')
    health = {
        name: {'status': 'unknown', 'last_check': str(timezone.now())}
        for name in integration_names
    }

    # Check SMS/WhatsApp providers
    try:
        from integrations.sms_providers import ProviderFactory

        # Test SMS provider
        sms_entry = health['sms']
        try:
            # Only successful construction matters; the instance is discarded.
            ProviderFactory.create_sms_provider()
            sms_entry['status'] = 'healthy'
            sms_entry['provider'] = getattr(settings, 'SMS_PROVIDER', 'mock')
        except Exception as exc:
            sms_entry['status'] = 'error'
            sms_entry['error'] = str(exc)

        # Test WhatsApp provider
        whatsapp_entry = health['whatsapp']
        try:
            ProviderFactory.create_whatsapp_provider()
            whatsapp_entry['status'] = 'healthy'
            whatsapp_entry['provider'] = getattr(settings, 'WHATSAPP_PROVIDER', 'mock')
        except Exception as exc:
            whatsapp_entry['status'] = 'error'
            whatsapp_entry['error'] = str(exc)

    except Exception as exc:
        logger.error(f"Health check failed: {exc}")

    # TODO: Implement actual health checks for other integrations
    # This should ping each API and verify connectivity

    logger.info(f"Integration health check: {health}")

    return health
|