2025-11-02 14:35:35 +03:00

751 lines
22 KiB
Python

"""
Integrations Celery tasks for external system communication.
This module contains tasks for NPHIES (insurance), ZATCA (e-invoicing),
Lab, and Radiology integrations.
"""
import logging
from typing import Dict, Optional, List
from celery import shared_task
from django.conf import settings
from django.utils import timezone
logger = logging.getLogger(__name__)
# ============================================================================
# NPHIES Integration Tasks (Saudi Insurance)
# ============================================================================
@shared_task(bind=True, max_retries=3)
def submit_nphies_eligibility(self, patient_id: str, insurance_id: str) -> Dict:
"""
Submit eligibility check to NPHIES.
Args:
patient_id: UUID of the patient
insurance_id: Insurance policy ID
Returns:
dict: Eligibility response
"""
try:
from core.models import Patient
patient = Patient.objects.get(id=patient_id)
# TODO: Implement actual NPHIES API call
# This is a placeholder for the integration
logger.info(f"NPHIES eligibility check for patient {patient.mrn}")
# Placeholder response
response = {
'status': 'success',
'eligible': True,
'coverage_type': 'FULL',
'message': 'Patient is eligible for coverage',
'timestamp': str(timezone.now()),
}
return response
except Exception as exc:
logger.error(f"NPHIES eligibility check failed: {exc}")
raise self.retry(exc=exc, countdown=300)
@shared_task(bind=True, max_retries=3)
def submit_nphies_prior_auth(self, appointment_id: str, services: list) -> Dict:
"""
Submit prior authorization request to NPHIES.
Args:
appointment_id: UUID of the appointment
services: List of services requiring authorization
Returns:
dict: Prior authorization response
"""
try:
from appointments.models import Appointment
appointment = Appointment.objects.select_related('patient').get(id=appointment_id)
# TODO: Implement actual NPHIES API call
logger.info(f"NPHIES prior auth for appointment {appointment_id}")
# Placeholder response
response = {
'status': 'success',
'authorization_number': f"AUTH-{timezone.now().strftime('%Y%m%d%H%M%S')}",
'approved': True,
'services': services,
'valid_until': str(timezone.now().date()),
'timestamp': str(timezone.now()),
}
return response
except Exception as exc:
logger.error(f"NPHIES prior auth failed: {exc}")
raise self.retry(exc=exc, countdown=300)
@shared_task(bind=True, max_retries=3)
def submit_nphies_claim(self, invoice_id: str) -> Dict:
"""
Submit insurance claim to NPHIES.
Args:
invoice_id: UUID of the invoice
Returns:
dict: Claim submission response
"""
try:
from finance.models import Invoice
invoice = Invoice.objects.select_related('patient', 'appointment').get(id=invoice_id)
# TODO: Implement actual NPHIES API call
logger.info(f"NPHIES claim submission for invoice {invoice.invoice_number}")
# Placeholder response
response = {
'status': 'success',
'claim_number': f"CLM-{timezone.now().strftime('%Y%m%d%H%M%S')}",
'submitted': True,
'amount_claimed': str(invoice.total_amount),
'timestamp': str(timezone.now()),
}
return response
except Exception as exc:
logger.error(f"NPHIES claim submission failed: {exc}")
raise self.retry(exc=exc, countdown=300)
# ============================================================================
# ZATCA Integration Tasks (Saudi E-Invoicing)
# ============================================================================
@shared_task(bind=True, max_retries=3)
def submit_zatca_invoice(self, invoice_id: str) -> Dict:
"""
Submit e-invoice to ZATCA (Saudi Tax Authority).
Args:
invoice_id: UUID of the invoice
Returns:
dict: ZATCA submission response
"""
try:
from finance.models import Invoice
invoice = Invoice.objects.select_related('patient').get(id=invoice_id)
# Only submit issued or paid invoices
if invoice.status not in ['ISSUED', 'PAID']:
logger.warning(f"Invoice {invoice.invoice_number} not ready for ZATCA submission")
return {'status': 'skipped', 'reason': 'Invoice not issued'}
# TODO: Implement actual ZATCA API call
# This should include:
# 1. Generate XML invoice in ZATCA format
# 2. Sign invoice with digital certificate
# 3. Generate QR code
# 4. Submit to ZATCA API
# 5. Store ZATCA UUID and hash
logger.info(f"ZATCA e-invoice submission for {invoice.invoice_number}")
# Placeholder response
response = {
'status': 'success',
'zatca_uuid': f"ZATCA-{timezone.now().strftime('%Y%m%d%H%M%S')}",
'invoice_hash': 'placeholder_hash',
'qr_code': 'placeholder_qr_code_data',
'submitted': True,
'timestamp': str(timezone.now()),
}
return response
except Exception as exc:
logger.error(f"ZATCA submission failed: {exc}")
raise self.retry(exc=exc, countdown=300)
@shared_task
def submit_pending_zatca_invoices() -> Dict:
"""
Submit all pending invoices to ZATCA.
This task runs daily at 11:00 PM to submit invoices
that haven't been submitted to ZATCA yet.
Returns:
dict: Submission statistics
"""
from finance.models import Invoice
# Get invoices that need ZATCA submission
pending_invoices = Invoice.objects.filter(
status__in=['ISSUED', 'PAID'],
# Add field to track ZATCA submission status
# zatca_submitted=False
)
stats = {
'total': pending_invoices.count(),
'submitted': 0,
'failed': 0,
'errors': []
}
for invoice in pending_invoices:
try:
result = submit_zatca_invoice.delay(str(invoice.id)).get()
if result.get('status') == 'success':
stats['submitted'] += 1
else:
stats['failed'] += 1
stats['errors'].append(f"Invoice {invoice.invoice_number}: {result.get('reason')}")
except Exception as exc:
stats['failed'] += 1
stats['errors'].append(f"Invoice {invoice.invoice_number}: {str(exc)}")
logger.info(f"ZATCA batch submission: {stats['submitted']} submitted, {stats['failed']} failed")
return stats
# ============================================================================
# Lab Integration Tasks
# ============================================================================
@shared_task(bind=True, max_retries=3)
def sync_lab_results(self) -> Dict:
"""
Sync lab results from external lab system.
This task runs every 30 minutes to fetch new lab results.
Returns:
dict: Sync statistics
"""
try:
# TODO: Implement actual lab API integration
# This should:
# 1. Connect to lab API
# 2. Fetch new results since last sync
# 3. Match results to patients by MRN or order ID
# 4. Store results in database
# 5. Notify providers of new results
logger.info("Lab results sync started")
# Placeholder response
stats = {
'status': 'success',
'new_results': 0,
'updated_results': 0,
'errors': 0,
'timestamp': str(timezone.now()),
}
logger.info(f"Lab results sync completed: {stats}")
return stats
except Exception as exc:
logger.error(f"Lab results sync failed: {exc}")
raise self.retry(exc=exc, countdown=600)
@shared_task(bind=True, max_retries=3)
def send_lab_order(self, order_id: str) -> Dict:
"""
Send lab order to external lab system.
Args:
order_id: UUID of the lab order
Returns:
dict: Order submission response
"""
try:
# TODO: Implement actual lab order submission
# This should:
# 1. Get order details from database
# 2. Format order in lab system format
# 3. Submit to lab API
# 4. Store lab reference number
logger.info(f"Lab order submission: {order_id}")
# Placeholder response
response = {
'status': 'success',
'lab_reference': f"LAB-{timezone.now().strftime('%Y%m%d%H%M%S')}",
'submitted': True,
'estimated_completion': str(timezone.now().date()),
'timestamp': str(timezone.now()),
}
return response
except Exception as exc:
logger.error(f"Lab order submission failed: {exc}")
raise self.retry(exc=exc, countdown=300)
# ============================================================================
# Radiology Integration Tasks
# ============================================================================
@shared_task(bind=True, max_retries=3)
def sync_radiology_results(self) -> Dict:
"""
Sync radiology results from PACS/RIS system.
This task runs every 30 minutes to fetch new radiology results.
Returns:
dict: Sync statistics
"""
try:
# TODO: Implement actual radiology system integration
# This should:
# 1. Connect to PACS/RIS API
# 2. Fetch new studies and reports
# 3. Match to patients and orders
# 4. Store results and images
# 5. Notify providers of new results
logger.info("Radiology results sync started")
# Placeholder response
stats = {
'status': 'success',
'new_studies': 0,
'new_reports': 0,
'errors': 0,
'timestamp': str(timezone.now()),
}
logger.info(f"Radiology results sync completed: {stats}")
return stats
except Exception as exc:
logger.error(f"Radiology results sync failed: {exc}")
raise self.retry(exc=exc, countdown=600)
@shared_task(bind=True, max_retries=3)
def send_radiology_order(self, order_id: str) -> Dict:
"""
Send radiology order to PACS/RIS system.
Args:
order_id: UUID of the radiology order
Returns:
dict: Order submission response
"""
try:
# TODO: Implement actual radiology order submission
# This should:
# 1. Get order details from database
# 2. Format order in DICOM/HL7 format
# 3. Submit to PACS/RIS
# 4. Store accession number
logger.info(f"Radiology order submission: {order_id}")
# Placeholder response
response = {
'status': 'success',
'accession_number': f"RAD-{timezone.now().strftime('%Y%m%d%H%M%S')}",
'submitted': True,
'scheduled_date': str(timezone.now().date()),
'timestamp': str(timezone.now()),
}
return response
except Exception as exc:
logger.error(f"Radiology order submission failed: {exc}")
raise self.retry(exc=exc, countdown=300)
# ============================================================================
# SMS/WhatsApp Integration Tasks
# ============================================================================
@shared_task(bind=True, max_retries=3)
def send_sms_async(
self,
to: str,
message: str,
tenant_id: str,
template_code: Optional[str] = None,
context: Optional[Dict] = None
) -> Dict:
"""
Send SMS message asynchronously.
Args:
to: Recipient phone number
message: Message body (if not using template)
tenant_id: Tenant UUID
template_code: Template code (optional)
context: Template context variables (optional)
Returns:
dict: Send result
"""
try:
from integrations.messaging_service import MessagingService
service = MessagingService()
if template_code and context:
result = service.send_from_template(
template_code=template_code,
recipient_phone=to,
channel='SMS',
context=context,
tenant_id=tenant_id
)
else:
result = service.send_message(
to=to,
message=message,
channel='SMS',
tenant_id=tenant_id
)
if not result['success']:
logger.warning(f"SMS send failed: {result.get('error')}")
# Retry on failure
raise self.retry(exc=Exception(result.get('error')), countdown=300)
logger.info(f"SMS sent successfully to {to}")
return result
except Exception as exc:
logger.error(f"SMS async send failed: {exc}")
raise self.retry(exc=exc, countdown=300)
@shared_task(bind=True, max_retries=3)
def send_whatsapp_async(
self,
to: str,
message: str,
tenant_id: str,
template_code: Optional[str] = None,
context: Optional[Dict] = None
) -> Dict:
"""
Send WhatsApp message asynchronously.
Args:
to: Recipient phone number
message: Message body (if not using template)
tenant_id: Tenant UUID
template_code: Template code (optional)
context: Template context variables (optional)
Returns:
dict: Send result
"""
try:
from integrations.messaging_service import MessagingService
service = MessagingService()
if template_code and context:
result = service.send_from_template(
template_code=template_code,
recipient_phone=to,
channel='WHATSAPP',
context=context,
tenant_id=tenant_id
)
else:
result = service.send_message(
to=to,
message=message,
channel='WHATSAPP',
tenant_id=tenant_id
)
if not result['success']:
logger.warning(f"WhatsApp send failed: {result.get('error')}")
# Retry on failure
raise self.retry(exc=Exception(result.get('error')), countdown=300)
logger.info(f"WhatsApp sent successfully to {to}")
return result
except Exception as exc:
logger.error(f"WhatsApp async send failed: {exc}")
raise self.retry(exc=exc, countdown=300)
@shared_task
def update_message_statuses() -> Dict:
"""
Update status of all pending messages by checking with providers.
This task runs every 5 minutes to update message delivery status.
Returns:
dict: Update statistics
"""
from notifications.models import Message
from integrations.messaging_service import MessagingService
from datetime import timedelta
# Get messages sent in last 24 hours that aren't delivered/failed
cutoff = timezone.now() - timedelta(hours=24)
pending_messages = Message.objects.filter(
created_at__gte=cutoff,
status__in=[Message.Status.QUEUED, Message.Status.SENT],
provider_message_id__isnull=False
)
service = MessagingService()
stats = {
'total': pending_messages.count(),
'updated': 0,
'errors': 0
}
for message in pending_messages:
try:
result = service.update_message_status(str(message.id))
if result['success'] and result.get('updated'):
stats['updated'] += 1
except Exception as exc:
logger.error(f"Failed to update message {message.id}: {exc}")
stats['errors'] += 1
logger.info(f"Message status update: {stats['updated']}/{stats['total']} updated")
return stats
@shared_task
def retry_failed_messages() -> Dict:
"""
Retry sending failed messages that can be retried.
This task runs every hour to retry failed messages.
Returns:
dict: Retry statistics
"""
from notifications.models import Message
from integrations.messaging_service import MessagingService
from datetime import timedelta
# Get failed messages from last 24 hours that can be retried
cutoff = timezone.now() - timedelta(hours=24)
failed_messages = Message.objects.filter(
created_at__gte=cutoff,
status=Message.Status.FAILED,
retry_count__lt=3
)
service = MessagingService()
stats = {
'total': failed_messages.count(),
'retried': 0,
'succeeded': 0,
'failed': 0
}
for message in failed_messages:
try:
result = service.retry_failed_message(str(message.id))
stats['retried'] += 1
if result['success']:
stats['succeeded'] += 1
else:
stats['failed'] += 1
except Exception as exc:
logger.error(f"Failed to retry message {message.id}: {exc}")
stats['failed'] += 1
logger.info(f"Message retry: {stats['succeeded']}/{stats['retried']} succeeded")
return stats
@shared_task
def cleanup_old_messages() -> Dict:
"""
Clean up old message records to prevent database bloat.
Deletes messages older than 90 days.
This task runs daily at 2:00 AM.
Returns:
dict: Cleanup statistics
"""
from notifications.models import Message
from datetime import timedelta
cutoff = timezone.now() - timedelta(days=90)
old_messages = Message.objects.filter(created_at__lt=cutoff)
count = old_messages.count()
old_messages.delete()
logger.info(f"Cleaned up {count} old messages")
return {
'deleted': count,
'cutoff_date': str(cutoff.date())
}
@shared_task
def send_bulk_sms(
recipients: List[str],
message: str,
tenant_id: str
) -> Dict:
"""
Send SMS to multiple recipients.
Args:
recipients: List of phone numbers
message: Message body
tenant_id: Tenant UUID
Returns:
dict: Bulk send statistics
"""
from integrations.messaging_service import MessagingService
service = MessagingService()
result = service.send_bulk_messages(
recipients=recipients,
message=message,
channel='SMS',
tenant_id=tenant_id
)
logger.info(f"Bulk SMS: {result['sent']}/{result['total']} sent")
return result
@shared_task
def send_bulk_whatsapp(
recipients: List[str],
message: str,
tenant_id: str
) -> Dict:
"""
Send WhatsApp message to multiple recipients.
Args:
recipients: List of phone numbers
message: Message body
tenant_id: Tenant UUID
Returns:
dict: Bulk send statistics
"""
from integrations.messaging_service import MessagingService
service = MessagingService()
result = service.send_bulk_messages(
recipients=recipients,
message=message,
channel='WHATSAPP',
tenant_id=tenant_id
)
logger.info(f"Bulk WhatsApp: {result['sent']}/{result['total']} sent")
return result
# ============================================================================
# Integration Health Check
# ============================================================================
@shared_task
def check_integration_health() -> Dict:
"""
Check health status of all external integrations.
Returns:
dict: Health status of all integrations
"""
health = {
'nphies': {'status': 'unknown', 'last_check': str(timezone.now())},
'zatca': {'status': 'unknown', 'last_check': str(timezone.now())},
'lab': {'status': 'unknown', 'last_check': str(timezone.now())},
'radiology': {'status': 'unknown', 'last_check': str(timezone.now())},
'sms': {'status': 'unknown', 'last_check': str(timezone.now())},
'whatsapp': {'status': 'unknown', 'last_check': str(timezone.now())},
}
# Check SMS/WhatsApp providers
try:
from integrations.sms_providers import ProviderFactory
# Test SMS provider
try:
sms_provider = ProviderFactory.create_sms_provider()
health['sms']['status'] = 'healthy'
health['sms']['provider'] = getattr(settings, 'SMS_PROVIDER', 'mock')
except Exception as exc:
health['sms']['status'] = 'error'
health['sms']['error'] = str(exc)
# Test WhatsApp provider
try:
whatsapp_provider = ProviderFactory.create_whatsapp_provider()
health['whatsapp']['status'] = 'healthy'
health['whatsapp']['provider'] = getattr(settings, 'WHATSAPP_PROVIDER', 'mock')
except Exception as exc:
health['whatsapp']['status'] = 'error'
health['whatsapp']['error'] = str(exc)
except Exception as exc:
logger.error(f"Health check failed: {exc}")
# TODO: Implement actual health checks for other integrations
# This should ping each API and verify connectivity
logger.info(f"Integration health check: {health}")
return health