2025-08-12 13:33:25 +03:00

307 lines
8.3 KiB
Python

"""
Core utility functions for the hospital management system.
"""
import uuid
import hashlib
import json
from datetime import datetime, timedelta
from django.conf import settings
from django.utils import timezone
from django.contrib.contenttypes.models import ContentType
from .models import AuditLogEntry, SystemConfiguration
class AuditLogger:
"""
Utility class for audit logging.
"""
@staticmethod
def log_event(
tenant,
event_type,
event_category,
action,
description,
user=None,
content_object=None,
changes=None,
additional_data=None,
patient_id=None,
patient_mrn=None,
risk_level='LOW',
request=None
):
"""
Log an audit event.
"""
log_data = {
'tenant': tenant,
'event_type': event_type,
'event_category': event_category,
'action': action,
'description': description,
'risk_level': risk_level,
'changes': changes or {},
'additional_data': additional_data or {},
}
# User information
if user:
log_data.update({
'user': user,
'user_email': user.email,
'user_role': getattr(user, 'role', None),
})
# Request information
if request:
log_data.update({
'session_key': request.session.session_key,
'ip_address': get_client_ip(request),
'user_agent': request.META.get('HTTP_USER_AGENT', ''),
})
# Content object information
if content_object:
log_data.update({
'content_type': ContentType.objects.get_for_model(content_object),
'object_id': content_object.pk,
'object_repr': str(content_object),
})
# Patient context
if patient_id:
log_data['patient_id'] = patient_id
if patient_mrn:
log_data['patient_mrn'] = patient_mrn
# Set compliance flags
log_data['hipaa_relevant'] = event_category in [
'PATIENT_DATA', 'CLINICAL_DATA', 'DATA_ACCESS'
]
log_data['gdpr_relevant'] = event_category in [
'DATA_ACCESS', 'DATA_MODIFICATION', 'DATA_EXPORT'
]
return AuditLogEntry.objects.create(**log_data)
class ConfigurationManager:
"""
Utility class for managing system configurations.
"""
@staticmethod
def get_config(key, tenant=None, default=None):
"""
Get a configuration value.
"""
try:
config = SystemConfiguration.objects.get(
key=key,
tenant=tenant,
is_active=True
)
return config.get_typed_value()
except SystemConfiguration.DoesNotExist:
# Try global configuration if tenant-specific not found
if tenant:
try:
config = SystemConfiguration.objects.get(
key=key,
tenant=None,
is_active=True
)
return config.get_typed_value()
except SystemConfiguration.DoesNotExist:
pass
return default
@staticmethod
def set_config(key, value, tenant=None, data_type='STRING', category='GENERAL', user=None):
"""
Set a configuration value.
"""
config, created = SystemConfiguration.objects.get_or_create(
key=key,
tenant=tenant,
defaults={
'value': str(value),
'data_type': data_type,
'category': category,
'updated_by': user,
}
)
if not created:
config.value = str(value)
config.data_type = data_type
config.updated_by = user
config.save()
return config
class HL7Utility:
"""
Utility class for HL7 message handling.
"""
@staticmethod
def generate_message_id():
"""
Generate a unique HL7 message ID.
"""
return str(uuid.uuid4()).replace('-', '')[:20]
@staticmethod
def format_datetime(dt=None):
"""
Format datetime for HL7 messages.
"""
if dt is None:
dt = timezone.now()
return dt.strftime('%Y%m%d%H%M%S')
@staticmethod
def parse_hl7_datetime(hl7_datetime):
"""
Parse HL7 datetime string.
"""
try:
return datetime.strptime(hl7_datetime, '%Y%m%d%H%M%S')
except ValueError:
try:
return datetime.strptime(hl7_datetime, '%Y%m%d')
except ValueError:
return None
class DICOMUtility:
"""
Utility class for DICOM handling.
"""
@staticmethod
def generate_study_uid():
"""
Generate a unique DICOM Study Instance UID.
"""
# Use hospital OID prefix + timestamp + random
timestamp = int(timezone.now().timestamp())
random_part = uuid.uuid4().hex[:8]
return f"1.2.826.0.1.3680043.8.498.{timestamp}.{random_part}"
@staticmethod
def generate_series_uid():
"""
Generate a unique DICOM Series Instance UID.
"""
timestamp = int(timezone.now().timestamp())
random_part = uuid.uuid4().hex[:8]
return f"1.2.826.0.1.3680043.8.498.{timestamp}.{random_part}"
@staticmethod
def generate_instance_uid():
"""
Generate a unique DICOM SOP Instance UID.
"""
timestamp = int(timezone.now().timestamp())
random_part = uuid.uuid4().hex[:8]
return f"1.2.826.0.1.3680043.8.498.{timestamp}.{random_part}"
class EncryptionUtility:
"""
Utility class for data encryption.
"""
@staticmethod
def encrypt_data(data):
"""
Encrypt sensitive data.
"""
# Simple encryption for demo - use proper encryption in production
key = settings.HOSPITAL_SETTINGS.get('ENCRYPTION_KEY', 'default-key')
return hashlib.sha256(f"{key}{data}".encode()).hexdigest()
@staticmethod
def decrypt_data(encrypted_data):
"""
Decrypt sensitive data.
"""
# This is a placeholder - implement proper decryption
return encrypted_data
def get_client_ip(request):
"""
Get the client IP address from request.
"""
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0]
else:
ip = request.META.get('REMOTE_ADDR')
return ip
def generate_medical_record_number(tenant):
"""
Generate a unique medical record number for a tenant.
"""
# Use tenant prefix + timestamp + random
prefix = tenant.name[:3].upper()
timestamp = int(timezone.now().timestamp())
random_part = uuid.uuid4().hex[:6].upper()
return f"{prefix}{timestamp}{random_part}"
def calculate_age(birth_date):
"""
Calculate age from birth date.
"""
today = timezone.now().date()
return today.year - birth_date.year - ((today.month, today.day) < (birth_date.month, birth_date.day))
def format_phone_number(phone):
"""
Format phone number for display.
"""
# Remove all non-digit characters
digits = ''.join(filter(str.isdigit, phone))
if len(digits) == 10:
return f"({digits[:3]}) {digits[3:6]}-{digits[6:]}"
elif len(digits) == 11 and digits[0] == '1':
return f"+1 ({digits[1:4]}) {digits[4:7]}-{digits[7:]}"
else:
return phone
def validate_mrn(mrn):
"""
Validate medical record number format.
"""
# Basic validation - customize based on requirements
if not mrn:
return False
if len(mrn) < 6 or len(mrn) > 20:
return False
return mrn.isalnum()
def get_next_sequence_number(tenant, sequence_type):
"""
Get the next sequence number for various entity types.
"""
key = f"sequence_{sequence_type}"
current = ConfigurationManager.get_config(key, tenant, 0)
next_number = current + 1
ConfigurationManager.set_config(key, next_number, tenant, 'INTEGER')
return next_number