Marwan Alwali ab2c4a36c5 update
2025-10-02 10:13:03 +03:00

773 lines
25 KiB
Python

"""
Integration app models for external system integrations.
"""
import uuid
import json
from datetime import datetime, timedelta
from decimal import Decimal
from django.db import models
from django.contrib.auth import get_user_model
from django.core.validators import MinValueValidator, MaxValueValidator
from django.utils import timezone
from core.models import Tenant
from django.conf import settings
class ExternalSystem(models.Model):
    """
    External system configuration for integrations.

    One row describes a remote system registered for a tenant: its type,
    how to reach it, how to authenticate against it, timeout/retry settings,
    health-check state and usage counters. ``name`` is unique per tenant and
    rows are removed together with their tenant (``on_delete=CASCADE``).
    """

    class SystemType(models.TextChoices):
        # Stored value, human-readable label.
        EHR = 'EHR', 'Electronic Health Record'
        HIS = 'HIS', 'Hospital Information System'
        LIS = 'LIS', 'Laboratory Information System'
        RIS = 'RIS', 'Radiology Information System'
        PACS = 'PACS', 'Picture Archiving System'
        PHARMACY = 'PHARMACY', 'Pharmacy Management System'
        BILLING = 'BILLING', 'Billing System'
        INSURANCE = 'INSURANCE', 'Insurance System'
        GOVERNMENT = 'GOVERNMENT', 'Government System'
        VENDOR = 'VENDOR', 'Vendor System'
        API = 'API', 'API Service'
        DATABASE = 'DATABASE', 'Database System'
        FILE = 'FILE', 'File System'
        FTP = 'FTP', 'FTP Server'
        SFTP = 'SFTP', 'SFTP Server'
        CLOUD = 'CLOUD', 'Cloud Service'
        IOT = 'IOT', 'IoT Device'
        MONITORING = 'MONITORING', 'Monitoring System'
        ANALYTICS = 'ANALYTICS', 'Analytics Platform'
        OTHER = 'OTHER', 'Other System'

    class AuthenticationType(models.TextChoices):
        # Stored values are upper-case; defaults must use these members.
        NONE = 'NONE', 'No Authentication'
        BASIC = 'BASIC', 'Basic Authentication'
        BEARER = 'BEARER', 'Bearer Token'
        API_KEY = 'API_KEY', 'API Key'
        OAUTH2 = 'OAUTH2', 'OAuth 2.0'
        CERTIFICATE = 'CERTIFICATE', 'Client Certificate'
        CUSTOM = 'CUSTOM', 'Custom Authentication'

    system_id = models.UUIDField(
        primary_key=True,
        default=uuid.uuid4,
        editable=False
    )
    tenant = models.ForeignKey(
        Tenant,
        on_delete=models.CASCADE,
        related_name='external_systems'
    )
    name = models.CharField(max_length=200)
    description = models.TextField(blank=True)
    system_type = models.CharField(
        max_length=20,
        choices=SystemType.choices,
    )
    vendor = models.CharField(max_length=200, blank=True)
    version = models.CharField(max_length=100, blank=True)

    # Connection details
    base_url = models.URLField(blank=True)
    host = models.CharField(max_length=255, blank=True)
    port = models.PositiveIntegerField(
        null=True,
        blank=True,
        # Restrict to the valid TCP/UDP port range.
        validators=[MinValueValidator(1), MaxValueValidator(65535)]
    )
    database_name = models.CharField(max_length=200, blank=True)

    # Authentication
    authentication_type = models.CharField(
        max_length=20,
        choices=AuthenticationType.choices,
        # Was the literal 'none', which is not one of the (upper-case)
        # choice values and therefore failed model/form validation.
        # NOTE(review): rows already saved with the old lower-case default
        # may need a data migration to 'NONE' - confirm against the database.
        default=AuthenticationType.NONE,
    )
    authentication_config = models.JSONField(default=dict, blank=True)

    # Configuration
    configuration = models.JSONField(default=dict, blank=True)
    timeout_seconds = models.PositiveIntegerField(default=30)
    retry_attempts = models.PositiveIntegerField(default=3)
    retry_delay_seconds = models.PositiveIntegerField(default=5)

    # Status
    is_active = models.BooleanField(default=True)
    is_healthy = models.BooleanField(default=False)
    last_health_check = models.DateTimeField(null=True, blank=True)
    health_check_interval = models.PositiveIntegerField(
        default=300,  # 5 minutes
        help_text="Health check interval in seconds"
    )

    # Usage tracking
    connection_count = models.PositiveIntegerField(default=0)
    success_count = models.PositiveIntegerField(default=0)
    failure_count = models.PositiveIntegerField(default=0)
    last_used_at = models.DateTimeField(null=True, blank=True)

    # Metadata
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    created_by = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name='created_external_systems'
    )

    class Meta:
        db_table = 'integration_external_system'
        indexes = [
            models.Index(fields=['tenant', 'system_type']),
            models.Index(fields=['tenant', 'is_active']),
            models.Index(fields=['tenant', 'is_healthy']),
            models.Index(fields=['last_health_check']),
        ]
        unique_together = [['tenant', 'name']]

    def __str__(self):
        """Return ``"<name> (<system type label>)"``."""
        return f"{self.name} ({self.get_system_type_display()})"

    @property
    def success_rate(self):
        """
        Calculate success rate percentage.

        Returns ``success_count / connection_count * 100`` as a float, or
        ``0.0`` when no connections have been recorded (avoids dividing by
        zero).
        """
        total = self.connection_count
        if total == 0:
            return 0.0
        return (self.success_count / total) * 100

    @property
    def is_due_for_health_check(self):
        """
        Check if system is due for health check.

        Returns ``True`` when no health check has been recorded yet, or when
        at least ``health_check_interval`` seconds have passed since
        ``last_health_check``.
        """
        if not self.last_health_check:
            return True
        next_check = self.last_health_check + timedelta(seconds=self.health_check_interval)
        return timezone.now() >= next_check
class IntegrationEndpoint(models.Model):
    """
    A single callable/receivable endpoint exposed by an external system.

    Stores the endpoint's type, path, HTTP method and data direction, its
    header/parameter/mapping/schema configuration, and execution counters.
    ``name`` is unique within its external system; endpoints are deleted
    together with that system.
    """

    class EndpointType(models.TextChoices):
        # Transport / protocol family of the endpoint.
        REST_API = 'REST_API', 'REST API'
        SOAP = 'SOAP', 'SOAP Web Service'
        HL7 = 'HL7', 'HL7 Interface'
        DICOM = 'DICOM', 'DICOM Service'
        FTP = 'FTP', 'FTP Transfer'
        SFTP = 'SFTP', 'SFTP Transfer'
        DATABASE = 'DATABASE', 'Database Query'
        FILE = 'FILE', 'File Processing'
        WEBHOOK = 'WEBHOOK', 'Webhook'
        QUEUE = 'QUEUE', 'Message Queue'
        EMAIL = 'EMAIL', 'Email'
        CUSTOM = 'CUSTOM', 'Custom Integration'

    class HttpMethod(models.TextChoices):
        # Value and label are identical for HTTP verbs.
        GET = 'GET', 'GET'
        POST = 'POST', 'POST'
        PUT = 'PUT', 'PUT'
        PATCH = 'PATCH', 'PATCH'
        DELETE = 'DELETE', 'DELETE'
        HEAD = 'HEAD', 'HEAD'
        OPTIONS = 'OPTIONS', 'OPTIONS'

    class Direction(models.TextChoices):
        INBOUND = 'INBOUND', 'Inbound'
        OUTBOUND = 'OUTBOUND', 'Outbound'
        BIDIRECTIONAL = 'BIDIRECTIONAL', 'Bidirectional'

    # --- Identity -----------------------------------------------------------
    endpoint_id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    external_system = models.ForeignKey(
        ExternalSystem,
        related_name='endpoints',
        on_delete=models.CASCADE,
    )
    name = models.CharField(max_length=200)
    description = models.TextField(blank=True)
    endpoint_type = models.CharField(choices=EndpointType.choices, max_length=20)

    # --- Endpoint details ---------------------------------------------------
    path = models.CharField(max_length=500, blank=True)
    method = models.CharField(
        choices=HttpMethod.choices,
        default=HttpMethod.GET,
        max_length=10,
    )
    direction = models.CharField(
        choices=Direction.choices,
        default=Direction.OUTBOUND,
        max_length=20,
    )

    # --- Configuration ------------------------------------------------------
    headers = models.JSONField(default=dict, blank=True)
    parameters = models.JSONField(default=dict, blank=True)
    request_format = models.CharField(max_length=50, default='json')
    response_format = models.CharField(max_length=50, default='json')

    # --- Data mapping -------------------------------------------------------
    request_mapping = models.JSONField(default=dict, blank=True)
    response_mapping = models.JSONField(default=dict, blank=True)

    # --- Validation ---------------------------------------------------------
    request_schema = models.JSONField(default=dict, blank=True)
    response_schema = models.JSONField(default=dict, blank=True)

    # --- Status -------------------------------------------------------------
    is_active = models.BooleanField(default=True)

    # --- Usage tracking -----------------------------------------------------
    execution_count = models.PositiveIntegerField(default=0)
    success_count = models.PositiveIntegerField(default=0)
    failure_count = models.PositiveIntegerField(default=0)
    last_executed_at = models.DateTimeField(null=True, blank=True)

    # --- Metadata -----------------------------------------------------------
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    created_by = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        related_name='created_integration_endpoints',
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
    )

    class Meta:
        db_table = 'integration_endpoint'
        indexes = [
            models.Index(fields=['external_system', 'endpoint_type']),
            models.Index(fields=['external_system', 'is_active']),
            models.Index(fields=['direction']),
            models.Index(fields=['last_executed_at']),
        ]
        unique_together = [['external_system', 'name']]

    def __str__(self):
        """Return ``"<external system name> - <endpoint name>"``."""
        system_name = self.external_system.name
        return f"{system_name} - {self.name}"

    @property
    def success_rate(self):
        """
        Percentage of executions that succeeded.

        Yields ``0.0`` when ``execution_count`` is zero so the division is
        never attempted on an unused endpoint.
        """
        runs = self.execution_count
        return 0.0 if runs == 0 else (self.success_count / runs) * 100
class DataMapping(models.Model):
    """
    Field-level mapping rule attached to an integration endpoint.

    Describes how one source field maps to one target field, which
    transformation (if any) is applied, the validation rules involved, and
    usage counters. ``name`` is unique within its endpoint; mappings are
    deleted together with that endpoint.
    """

    class MappingType(models.TextChoices):
        FIELD = 'field', 'Field Mapping'
        VALUE = 'value', 'Value Mapping'
        TRANSFORMATION = 'transformation', 'Data Transformation'
        VALIDATION = 'validation', 'Data Validation'
        ENRICHMENT = 'enrichment', 'Data Enrichment'
        FILTERING = 'filtering', 'Data Filtering'

    class TransformationType(models.TextChoices):
        NONE = 'none', 'No Transformation'
        FORMAT = 'format', 'Format Conversion'
        CALCULATION = 'calculation', 'Calculation'
        LOOKUP = 'lookup', 'Lookup Table'
        CONCATENATION = 'concatenation', 'Concatenation'
        SPLIT = 'split', 'Split Field'
        DEFAULT = 'default', 'Default Value'
        CONDITIONAL = 'conditional', 'Conditional Logic'
        CUSTOM = 'custom', 'Custom Function'

    # --- Identity -----------------------------------------------------------
    mapping_id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    endpoint = models.ForeignKey(
        IntegrationEndpoint,
        related_name='data_mappings',
        on_delete=models.CASCADE,
    )
    name = models.CharField(max_length=200)
    description = models.TextField(blank=True)
    mapping_type = models.CharField(choices=MappingType.choices, max_length=20)

    # --- Source configuration -----------------------------------------------
    source_field = models.CharField(max_length=500)
    source_format = models.CharField(max_length=100, blank=True)
    source_validation = models.JSONField(default=dict, blank=True)

    # --- Target configuration -----------------------------------------------
    target_field = models.CharField(max_length=500)
    target_format = models.CharField(max_length=100, blank=True)
    target_validation = models.JSONField(default=dict, blank=True)

    # --- Transformation -----------------------------------------------------
    transformation_type = models.CharField(
        choices=TransformationType.choices,
        default=TransformationType.NONE,
        max_length=20,
    )
    transformation_config = models.JSONField(default=dict, blank=True)

    # --- Validation rules ---------------------------------------------------
    is_required = models.BooleanField(default=False)
    validation_rules = models.JSONField(default=dict, blank=True)
    default_value = models.TextField(blank=True)

    # --- Status -------------------------------------------------------------
    is_active = models.BooleanField(default=True)

    # --- Usage tracking -----------------------------------------------------
    usage_count = models.PositiveIntegerField(default=0)
    success_count = models.PositiveIntegerField(default=0)
    failure_count = models.PositiveIntegerField(default=0)
    last_used_at = models.DateTimeField(null=True, blank=True)

    # --- Metadata -----------------------------------------------------------
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    created_by = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        related_name='created_data_mappings',
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
    )

    class Meta:
        db_table = 'integration_data_mapping'
        indexes = [
            models.Index(fields=['endpoint', 'mapping_type']),
            models.Index(fields=['endpoint', 'is_active']),
            models.Index(fields=['source_field']),
            models.Index(fields=['target_field']),
        ]
        unique_together = [['endpoint', 'name']]

    def __str__(self):
        """Return ``"<endpoint name> - <mapping name>"``."""
        endpoint_name = self.endpoint.name
        return f"{endpoint_name} - {self.name}"

    @property
    def success_rate(self):
        """
        Percentage of uses that succeeded.

        Yields ``0.0`` when ``usage_count`` is zero so the division is never
        attempted on an unused mapping.
        """
        uses = self.usage_count
        return 0.0 if uses == 0 else (self.success_count / uses) * 100
class IntegrationExecution(models.Model):
    """
    Record of one run of an integration endpoint.

    Captures how the run was started, its status and timestamps, the
    request/response payloads and sizes, timing metrics, error information
    and external tracking identifiers. Rows are deleted together with their
    endpoint.
    """

    class ExecutionType(models.TextChoices):
        MANUAL = 'manual', 'Manual Execution'
        SCHEDULED = 'scheduled', 'Scheduled Execution'
        TRIGGERED = 'triggered', 'Event Triggered'
        WEBHOOK = 'webhook', 'Webhook Triggered'
        API = 'api', 'API Call'
        BATCH = 'batch', 'Batch Processing'
        REAL_TIME = 'real_time', 'Real-time Processing'

    class ExecutionStatus(models.TextChoices):
        PENDING = 'pending', 'Pending'
        RUNNING = 'running', 'Running'
        COMPLETED = 'completed', 'Completed'
        FAILED = 'failed', 'Failed'
        CANCELLED = 'cancelled', 'Cancelled'
        TIMEOUT = 'timeout', 'Timeout'
        RETRY = 'retry', 'Retrying'

    # --- Identity -----------------------------------------------------------
    execution_id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    endpoint = models.ForeignKey(
        IntegrationEndpoint,
        related_name='executions',
        on_delete=models.CASCADE,
    )
    execution_type = models.CharField(choices=ExecutionType.choices, max_length=20)

    # --- Execution details --------------------------------------------------
    status = models.CharField(
        choices=ExecutionStatus.choices,
        default=ExecutionStatus.PENDING,
        max_length=20,
    )
    # Set once, when the row is first created.
    started_at = models.DateTimeField(auto_now_add=True)
    completed_at = models.DateTimeField(null=True, blank=True)

    # --- Request/Response ---------------------------------------------------
    request_data = models.JSONField(default=dict, blank=True)
    response_data = models.JSONField(default=dict, blank=True)
    request_size_bytes = models.PositiveIntegerField(default=0)
    response_size_bytes = models.PositiveIntegerField(default=0)

    # --- Performance metrics ------------------------------------------------
    processing_time_ms = models.PositiveIntegerField(null=True, blank=True)
    network_time_ms = models.PositiveIntegerField(null=True, blank=True)

    # --- Error handling -----------------------------------------------------
    error_message = models.TextField(blank=True)
    error_details = models.JSONField(default=dict, blank=True)
    retry_count = models.PositiveIntegerField(default=0)

    # --- External tracking --------------------------------------------------
    external_id = models.CharField(max_length=200, blank=True)
    correlation_id = models.CharField(max_length=200, blank=True)

    # --- Metadata -----------------------------------------------------------
    triggered_by = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        related_name='triggered_integrations',
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
    )
    metadata = models.JSONField(default=dict, blank=True)

    class Meta:
        db_table = 'integration_execution'
        indexes = [
            models.Index(fields=['endpoint', 'status']),
            models.Index(fields=['endpoint', 'started_at']),
            models.Index(fields=['execution_type']),
            models.Index(fields=['correlation_id']),
            models.Index(fields=['external_id']),
        ]

    def __str__(self):
        """Return ``"<endpoint name> - <execution UUID>"``."""
        endpoint_name = self.endpoint.name
        return f"{endpoint_name} - {self.execution_id}"

    @property
    def duration(self):
        """
        Elapsed time between start and completion.

        Returns ``completed_at - started_at``, or ``None`` while either
        timestamp is still unset.
        """
        finished, began = self.completed_at, self.started_at
        if not (finished and began):
            return None
        return finished - began

    @property
    def is_successful(self):
        """Whether ``status`` equals the ``'completed'`` value."""
        outcome = self.status
        return outcome == 'completed'
class WebhookEndpoint(models.Model):
    """
    Webhook endpoint configuration for receiving external data.

    Stores the URL path the webhook is served under, the HTTP methods it
    accepts, how incoming requests are authenticated, optional data mapping
    and rate limits, and request counters. ``url_path`` is globally unique
    and ``name`` is unique within its external system; webhooks are deleted
    together with that system.
    """

    class AuthenticationType(models.TextChoices):
        NONE = 'none', 'No Authentication'
        BASIC = 'basic', 'Basic Authentication'
        BEARER = 'bearer', 'Bearer Token'
        API_KEY = 'api_key', 'API Key'
        SIGNATURE = 'signature', 'Signature Verification'
        IP_WHITELIST = 'ip_whitelist', 'IP Whitelist'
        CUSTOM = 'custom', 'Custom Authentication'

    webhook_id = models.UUIDField(
        primary_key=True,
        default=uuid.uuid4,
        editable=False
    )
    external_system = models.ForeignKey(
        ExternalSystem,
        on_delete=models.CASCADE,
        related_name='webhooks'
    )
    name = models.CharField(max_length=200)
    description = models.TextField(blank=True)

    # Webhook configuration
    url_path = models.CharField(
        max_length=500,
        unique=True,
        help_text="URL path for webhook endpoint"
    )
    allowed_methods = models.JSONField(
        default=list,
        help_text="Allowed HTTP methods"
    )

    # Authentication
    authentication_type = models.CharField(
        max_length=20,
        choices=AuthenticationType.choices,
        default=AuthenticationType.NONE,
    )
    authentication_config = models.JSONField(default=dict, blank=True)

    # Processing configuration
    data_mapping = models.ForeignKey(
        DataMapping,
        # Keep the webhook when its mapping is deleted; just clear the link.
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name='webhooks'
    )
    processing_config = models.JSONField(default=dict, blank=True)

    # Rate limiting
    rate_limit_per_minute = models.PositiveIntegerField(
        null=True,
        blank=True,
        help_text="Maximum requests per minute"
    )
    rate_limit_per_hour = models.PositiveIntegerField(
        null=True,
        blank=True,
        help_text="Maximum requests per hour"
    )

    # Status
    is_active = models.BooleanField(default=True)

    # Usage tracking
    request_count = models.PositiveIntegerField(default=0)
    success_count = models.PositiveIntegerField(default=0)
    failure_count = models.PositiveIntegerField(default=0)
    last_request_at = models.DateTimeField(null=True, blank=True)

    # Metadata
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    created_by = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name='created_webhooks'
    )

    class Meta:
        db_table = 'integration_webhook_endpoint'
        indexes = [
            models.Index(fields=['external_system', 'is_active']),
            models.Index(fields=['url_path']),
            models.Index(fields=['last_request_at']),
        ]
        unique_together = [['external_system', 'name']]

    def __str__(self):
        """Return ``"<external system name> - <webhook name>"``."""
        return f"{self.external_system.name} - {self.name}"

    @property
    def success_rate(self):
        """
        Calculate success rate percentage.

        Returns ``success_count / request_count * 100`` as a float, or
        ``0.0`` when no requests have been recorded (avoids dividing by
        zero).
        """
        total = self.request_count
        if total == 0:
            return 0.0
        return (self.success_count / total) * 100

    @property
    def full_url(self):
        """
        Get full webhook URL.

        Returns the site-relative URL ``/api/webhooks/<url_path>``. Leading
        slashes on ``url_path`` are dropped so a stored value such as
        ``"/orders"`` does not produce ``/api/webhooks//orders``.
        """
        # This would be constructed based on the application's base URL
        relative_path = self.url_path.lstrip('/')
        return f"/api/webhooks/{relative_path}"
class WebhookExecution(models.Model):
    """
    Record of one request received by a webhook endpoint.

    Captures the incoming request (method, headers, query parameters,
    payload), client information, processing status and timing, the response
    that was produced, and any error information. Rows are deleted together
    with their webhook.
    """

    class IntegrationStatus(models.TextChoices):
        RECEIVED = 'received', 'Received'
        PROCESSING = 'processing', 'Processing'
        COMPLETED = 'completed', 'Completed'
        FAILED = 'failed', 'Failed'
        REJECTED = 'rejected', 'Rejected'
        TIMEOUT = 'timeout', 'Timeout'

    # --- Identity -----------------------------------------------------------
    execution_id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    webhook = models.ForeignKey(
        WebhookEndpoint,
        related_name='executions',
        on_delete=models.CASCADE,
    )

    # --- Request details ----------------------------------------------------
    method = models.CharField(max_length=10)
    headers = models.JSONField(default=dict, blank=True)
    query_params = models.JSONField(default=dict, blank=True)
    payload = models.JSONField(default=dict, blank=True)
    payload_size_bytes = models.PositiveIntegerField(default=0)

    # --- Client information -------------------------------------------------
    client_ip = models.GenericIPAddressField(null=True, blank=True)
    user_agent = models.TextField(blank=True)

    # --- Processing ---------------------------------------------------------
    status = models.CharField(
        choices=IntegrationStatus.choices,
        default=IntegrationStatus.RECEIVED,
        max_length=20,
    )
    # Set once, when the row is first created.
    received_at = models.DateTimeField(auto_now_add=True)
    processed_at = models.DateTimeField(null=True, blank=True)
    processing_time_ms = models.PositiveIntegerField(null=True, blank=True)

    # --- Response -----------------------------------------------------------
    response_status = models.PositiveIntegerField(default=200)
    response_data = models.JSONField(default=dict, blank=True)

    # --- Error handling -----------------------------------------------------
    error_message = models.TextField(blank=True)
    error_details = models.JSONField(default=dict, blank=True)

    # --- External tracking --------------------------------------------------
    external_id = models.CharField(max_length=200, blank=True)
    correlation_id = models.CharField(max_length=200, blank=True)

    # --- Metadata -----------------------------------------------------------
    metadata = models.JSONField(default=dict, blank=True)

    class Meta:
        db_table = 'integration_webhook_execution'
        indexes = [
            models.Index(fields=['webhook', 'status']),
            models.Index(fields=['webhook', 'received_at']),
            models.Index(fields=['client_ip']),
            models.Index(fields=['correlation_id']),
            models.Index(fields=['external_id']),
        ]

    def __str__(self):
        """Return ``"<webhook name> - <execution UUID>"``."""
        webhook_name = self.webhook.name
        return f"{webhook_name} - {self.execution_id}"

    @property
    def duration(self):
        """
        Elapsed time between receipt and processing.

        Returns ``processed_at - received_at``, or ``None`` while either
        timestamp is still unset.
        """
        finished, arrived = self.processed_at, self.received_at
        if not (finished and arrived):
            return None
        return finished - arrived

    @property
    def is_successful(self):
        """Whether ``status`` equals the ``'completed'`` value."""
        outcome = self.status
        return outcome == 'completed'
class IntegrationLog(models.Model):
    """
    Log entry describing integration activity.

    An entry carries a level, a category, a message and optional structured
    details. It may optionally reference an external system, an endpoint
    and/or an execution (all three links are nullable) and the user involved.
    """

    class LogLevel(models.TextChoices):
        DEBUG = 'debug', 'Debug'
        INFO = 'info', 'Info'
        WARNING = 'warning', 'Warning'
        ERROR = 'error', 'Error'
        CRITICAL = 'critical', 'Critical'

    class LogCategory(models.TextChoices):
        CONNECTION = 'connection', 'Connection'
        AUTHENTICATION = 'authentication', 'Authentication'
        DATA_TRANSFER = 'data_transfer', 'Data Transfer'
        TRANSFORMATION = 'transformation', 'Data Transformation'
        VALIDATION = 'validation', 'Data Validation'
        ERROR = 'error', 'Error'
        PERFORMANCE = 'performance', 'Performance'
        SECURITY = 'security', 'Security'
        AUDIT = 'audit', 'Audit'
        SYSTEM = 'system', 'System'

    # --- Identity and optional links ---------------------------------------
    log_id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    external_system = models.ForeignKey(
        ExternalSystem,
        related_name='logs',
        on_delete=models.CASCADE,
        null=True,
        blank=True,
    )
    endpoint = models.ForeignKey(
        IntegrationEndpoint,
        related_name='logs',
        on_delete=models.CASCADE,
        null=True,
        blank=True,
    )
    execution = models.ForeignKey(
        IntegrationExecution,
        related_name='logs',
        on_delete=models.CASCADE,
        null=True,
        blank=True,
    )

    # --- Log details --------------------------------------------------------
    level = models.CharField(choices=LogLevel.choices, max_length=20)
    category = models.CharField(choices=LogCategory.choices, max_length=20)
    message = models.TextField()
    details = models.JSONField(default=dict, blank=True)

    # --- Context ------------------------------------------------------------
    correlation_id = models.CharField(max_length=200, blank=True)
    user = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        related_name='integration_logs',
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
    )

    # --- Metadata -----------------------------------------------------------
    # Set once, when the row is first created.
    timestamp = models.DateTimeField(auto_now_add=True)
    metadata = models.JSONField(default=dict, blank=True)

    class Meta:
        db_table = 'integration_log'
        indexes = [
            models.Index(fields=['external_system', 'level']),
            models.Index(fields=['endpoint', 'level']),
            models.Index(fields=['execution']),
            models.Index(fields=['level', 'timestamp']),
            models.Index(fields=['category', 'timestamp']),
            models.Index(fields=['correlation_id']),
        ]

    def __str__(self):
        """
        Return ``"<system> - <level label>: <message preview>"``.

        The system part falls back to ``"System"`` when no external system is
        linked, and the message is cut to its first 50 characters.
        """
        source = self.external_system
        system_name = source.name if source else "System"
        preview = self.message[:50]
        return f"{system_name} - {self.get_level_display()}: {preview}"