307 lines
8.3 KiB
Python
307 lines
8.3 KiB
Python
"""
Core utility functions for the hospital management system.
"""
|
|
|
|
import uuid
|
|
import hashlib
|
|
import json
|
|
from datetime import datetime, timedelta
|
|
from django.conf import settings
|
|
from django.utils import timezone
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from .models import AuditLogEntry, SystemConfiguration
|
|
|
|
|
|
class AuditLogger:
    """
    Utility class for audit logging.

    Gathers the fields describing one auditable event and stores them as a
    single ``AuditLogEntry`` row.
    """

    # Event categories that flag an entry as relevant to each compliance regime.
    _HIPAA_CATEGORIES = ('PATIENT_DATA', 'CLINICAL_DATA', 'DATA_ACCESS')
    _GDPR_CATEGORIES = ('DATA_ACCESS', 'DATA_MODIFICATION', 'DATA_EXPORT')

    @staticmethod
    def log_event(
        tenant,
        event_type,
        event_category,
        action,
        description,
        user=None,
        content_object=None,
        changes=None,
        additional_data=None,
        patient_id=None,
        patient_mrn=None,
        risk_level='LOW',
        request=None
    ):
        """
        Log an audit event.

        Args:
            tenant: Tenant the event belongs to.
            event_type: Event type stored on the entry.
            event_category: Category stored on the entry; also drives the
                ``hipaa_relevant`` / ``gdpr_relevant`` flags.
            action: Action stored on the entry.
            description: Human-readable description of the event.
            user: Acting user. User fields are recorded only for a truthy,
                authenticated user.
            content_object: Model instance the event refers to, if any.
            changes: Mapping of changes; stored as ``{}`` when falsy.
            additional_data: Extra payload; stored as ``{}`` when falsy.
            patient_id: Patient identifier, recorded only when truthy.
            patient_mrn: Patient MRN, recorded only when truthy.
            risk_level: Risk level stored on the entry (default ``'LOW'``).
            request: HTTP request used to record session key, client IP and
                user agent.

        Returns:
            The created ``AuditLogEntry`` instance.
        """
        log_data = {
            'tenant': tenant,
            'event_type': event_type,
            'event_category': event_category,
            'action': action,
            'description': description,
            'risk_level': risk_level,
            'changes': changes or {},
            'additional_data': additional_data or {},
        }

        # User information. An anonymous user object is truthy but has no
        # ``email`` attribute, so it is skipped instead of crashing the caller.
        # Objects without ``is_authenticated`` are treated as before.
        if user and getattr(user, 'is_authenticated', True):
            log_data.update({
                'user': user,
                'user_email': user.email,
                'user_role': getattr(user, 'role', None),
            })

        # Request information. ``request.session`` is absent when no session
        # support is attached to the request, so read it defensively.
        if request:
            session = getattr(request, 'session', None)
            log_data.update({
                'session_key': getattr(session, 'session_key', None),
                'ip_address': get_client_ip(request),
                'user_agent': request.META.get('HTTP_USER_AGENT', ''),
            })

        # Content object information
        if content_object:
            log_data.update({
                'content_type': ContentType.objects.get_for_model(content_object),
                'object_id': content_object.pk,
                'object_repr': str(content_object),
            })

        # Patient context
        if patient_id:
            log_data['patient_id'] = patient_id
        if patient_mrn:
            log_data['patient_mrn'] = patient_mrn

        # Set compliance flags
        log_data['hipaa_relevant'] = event_category in AuditLogger._HIPAA_CATEGORIES
        log_data['gdpr_relevant'] = event_category in AuditLogger._GDPR_CATEGORIES

        return AuditLogEntry.objects.create(**log_data)
|
|
|
|
|
|
class ConfigurationManager:
    """
    Helpers for reading and writing ``SystemConfiguration`` entries.
    """

    @staticmethod
    def get_config(key, tenant=None, default=None):
        """
        Return the typed value of an active configuration entry.

        The entry for ``tenant`` is tried first. When ``tenant`` is truthy
        and has no entry, the entry stored with ``tenant=None`` is tried
        next. ``default`` is returned when no entry is found.
        """
        # Lookup order: the requested scope, then the global scope (only when
        # a tenant was actually given).
        scopes = [tenant]
        if tenant:
            scopes.append(None)

        for scope in scopes:
            try:
                entry = SystemConfiguration.objects.get(
                    key=key,
                    tenant=scope,
                    is_active=True
                )
                return entry.get_typed_value()
            except SystemConfiguration.DoesNotExist:
                continue

        return default

    @staticmethod
    def set_config(key, value, tenant=None, data_type='STRING', category='GENERAL', user=None):
        """
        Create or update the configuration entry for ``key`` and ``tenant``.

        The value is stored as ``str(value)``. ``category`` is applied only
        when the entry is created; an existing entry has its value, data
        type and ``updated_by`` overwritten and is then saved.

        Returns:
            The created or updated ``SystemConfiguration`` instance.
        """
        initial_fields = {
            'value': str(value),
            'data_type': data_type,
            'category': category,
            'updated_by': user,
        }
        entry, was_created = SystemConfiguration.objects.get_or_create(
            key=key,
            tenant=tenant,
            defaults=initial_fields
        )

        if was_created:
            return entry

        entry.value = str(value)
        entry.data_type = data_type
        entry.updated_by = user
        entry.save()
        return entry
|
|
|
|
|
|
class HL7Utility:
    """
    Utility class for HL7 message handling.
    """

    # strptime patterns accepted by parse_hl7_datetime, most specific first.
    _DATETIME_FORMATS = ('%Y%m%d%H%M%S', '%Y%m%d')

    @staticmethod
    def generate_message_id():
        """
        Generate a unique HL7 message ID.

        Returns:
            The first 20 hexadecimal characters of a random UUID4.
        """
        return uuid.uuid4().hex[:20]

    @staticmethod
    def format_datetime(dt=None):
        """
        Format datetime for HL7 messages.

        Args:
            dt: Datetime to format; ``timezone.now()`` is used when ``None``.

        Returns:
            The datetime rendered as ``YYYYMMDDHHMMSS``. No timezone
            conversion is applied before formatting.
        """
        if dt is None:
            dt = timezone.now()
        return dt.strftime('%Y%m%d%H%M%S')

    @staticmethod
    def parse_hl7_datetime(hl7_datetime):
        """
        Parse HL7 datetime string.

        Args:
            hl7_datetime: Text in ``YYYYMMDDHHMMSS`` or ``YYYYMMDD`` form.

        Returns:
            A naive ``datetime``, or ``None`` when the input matches neither
            format or is not a string (for example ``None``).
        """
        for fmt in HL7Utility._DATETIME_FORMATS:
            try:
                return datetime.strptime(hl7_datetime, fmt)
            except (TypeError, ValueError):
                # TypeError covers non-string input such as a missing field.
                continue
        return None
|
|
|
|
|
|
class DICOMUtility:
    """
    Utility class for DICOM handling.
    """

    # Root prefix shared by every UID generated here.
    _UID_ROOT = "1.2.826.0.1.3680043.8.498"

    @staticmethod
    def _generate_uid():
        """
        Build ``<root>.<epoch seconds>.<random 32-bit integer>``.

        DICOM UID components may contain digits only, so the random part is
        rendered as a decimal integer instead of hexadecimal text (which can
        contain the letters a-f). The result stays well under the 64
        character UID limit.
        """
        timestamp = int(timezone.now().timestamp())
        # Top 32 bits of a UUID4 are random; as an int they print in decimal
        # with no leading zeros.
        random_part = uuid.uuid4().int >> 96
        return f"{DICOMUtility._UID_ROOT}.{timestamp}.{random_part}"

    @staticmethod
    def generate_study_uid():
        """
        Generate a unique DICOM Study Instance UID.
        """
        return DICOMUtility._generate_uid()

    @staticmethod
    def generate_series_uid():
        """
        Generate a unique DICOM Series Instance UID.
        """
        return DICOMUtility._generate_uid()

    @staticmethod
    def generate_instance_uid():
        """
        Generate a unique DICOM SOP Instance UID.
        """
        return DICOMUtility._generate_uid()
|
|
|
|
|
|
class EncryptionUtility:
    """
    Placeholder helpers for protecting sensitive data.
    """

    @staticmethod
    def encrypt_data(data):
        """
        Return the SHA-256 hex digest of the configured key followed by ``data``.

        The key is read from ``settings.HOSPITAL_SETTINGS['ENCRYPTION_KEY']``
        and falls back to ``'default-key'``. The result is a one-way digest,
        so the input cannot be recovered from it.
        """
        # Simple digest for demo - use proper encryption in production
        secret = settings.HOSPITAL_SETTINGS.get('ENCRYPTION_KEY', 'default-key')
        hasher = hashlib.sha256()
        hasher.update(f"{secret}{data}".encode())
        return hasher.hexdigest()

    @staticmethod
    def decrypt_data(encrypted_data):
        """
        Return ``encrypted_data`` unchanged.

        This is a placeholder; no decryption is performed.
        """
        return encrypted_data
|
|
|
|
|
|
def get_client_ip(request):
    """
    Get the client IP address from request.

    The first entry of the ``X-Forwarded-For`` header is preferred, with
    surrounding whitespace removed. When the header is missing or its first
    entry is empty, ``REMOTE_ADDR`` is used instead.

    Args:
        request: Object exposing a ``META`` mapping of request headers.

    Returns:
        The IP address string, or ``None`` when neither source is present.
    """
    # NOTE(review): X-Forwarded-For is supplied by the client/proxy chain;
    # confirm it is set by a trusted proxy before relying on it for security.
    forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
    if forwarded_for:
        first_hop = forwarded_for.split(',')[0].strip()
        if first_hop:
            return first_hop
    return request.META.get('REMOTE_ADDR')
|
|
|
|
|
|
def generate_medical_record_number(tenant):
    """
    Generate a medical record number for a tenant.

    The number is built from up to three alphanumeric characters of the
    tenant name (upper-cased), the current epoch seconds, and six random
    hexadecimal characters. No database uniqueness check is performed here.

    Args:
        tenant: Object exposing a ``name`` string.

    Returns:
        The generated MRN string.
    """
    # Keep only alphanumeric characters so that names containing spaces or
    # punctuation (e.g. "St. Mary") still yield an MRN accepted by
    # validate_mrn, which requires an all-alphanumeric value.
    alnum_name = ''.join(ch for ch in tenant.name if ch.isalnum())
    prefix = alnum_name.upper()[:3]
    timestamp = int(timezone.now().timestamp())
    random_part = uuid.uuid4().hex[:6].upper()
    return f"{prefix}{timestamp}{random_part}"
|
|
|
|
|
|
def calculate_age(birth_date):
    """
    Calculate age from birth date.

    Args:
        birth_date: Object exposing ``year``, ``month`` and ``day``.

    Returns:
        The number of completed years between ``birth_date`` and today.
    """
    now = timezone.now()
    # With timezone support enabled ``now`` is an aware UTC datetime; convert
    # it to the current time zone so "today" is the local calendar date and
    # the age does not flip a day early or late around midnight.
    if timezone.is_aware(now):
        now = timezone.localtime(now)
    today = now.date()

    # Subtract one year when this year's birthday has not been reached yet.
    birthday_reached = (today.month, today.day) >= (birth_date.month, birth_date.day)
    return today.year - birth_date.year - (0 if birthday_reached else 1)
|
|
|
|
|
|
def format_phone_number(phone):
    """
    Format phone number for display.

    Ten-digit numbers become ``(AAA) BBB-CCCC``; eleven-digit numbers that
    start with ``1`` become ``+1 (AAA) BBB-CCCC``. Anything else, including
    ``None`` or an empty string, is returned unchanged.

    Args:
        phone: Phone number text, possibly containing punctuation.

    Returns:
        The formatted number, or ``phone`` itself when it cannot be formatted.
    """
    # A missing number has nothing to format; return it as-is instead of
    # failing while iterating over None.
    if not phone:
        return phone

    # Remove all non-digit characters
    digits = ''.join(filter(str.isdigit, phone))

    if len(digits) == 10:
        return f"({digits[:3]}) {digits[3:6]}-{digits[6:]}"
    if len(digits) == 11 and digits[0] == '1':
        return f"+1 ({digits[1:4]}) {digits[4:7]}-{digits[7:]}"
    return phone
|
|
|
|
|
|
def validate_mrn(mrn):
    """
    Check whether a medical record number has an acceptable format.

    A valid MRN is non-empty, between 6 and 20 characters long (inclusive),
    and consists of alphanumeric characters only.
    """
    # Basic validation - customize based on requirements
    return bool(mrn) and 6 <= len(mrn) <= 20 and mrn.isalnum()
|
|
|
|
|
|
def get_next_sequence_number(tenant, sequence_type):
    """
    Return the next number of the ``sequence_type`` counter for ``tenant``.

    The counter is read through ``ConfigurationManager.get_config`` under the
    key ``sequence_<sequence_type>`` (defaulting to 0), incremented by one,
    written back as an ``'INTEGER'`` entry, and returned.
    """
    # NOTE(review): the read and the write are separate calls with no locking
    # visible here - confirm whether concurrent callers can get duplicates.
    config_key = f"sequence_{sequence_type}"
    upcoming = ConfigurationManager.get_config(config_key, tenant, 0) + 1
    ConfigurationManager.set_config(config_key, upcoming, tenant, 'INTEGER')
    return upcoming
|
|
|