agdar/core/settings_service.py
2025-11-02 14:35:35 +03:00

411 lines
14 KiB
Python

"""
Tenant Settings Service Layer
Provides type-safe retrieval, validation, and management of tenant settings.
"""
import re
from typing import Any, Dict, List, Optional
from django.core.cache import cache
from django.core.exceptions import ValidationError
from django.db import transaction
from cryptography.fernet import Fernet
from django.conf import settings
from .models import Tenant, TenantSetting, SettingTemplate, User
class TenantSettingsService:
"""
Service for managing tenant settings with type safety and validation.
"""
# Cache timeout in seconds (5 minutes)
CACHE_TIMEOUT = 300
def __init__(self, tenant: Tenant):
"""
Initialize the service for a specific tenant.
Args:
tenant: The tenant instance
"""
self.tenant = tenant
self._cipher = None
def _get_cache_key(self, setting_key: str) -> str:
"""Generate cache key for a setting."""
return f"tenant_setting:{self.tenant.id}:{setting_key}"
def _get_cipher(self):
"""Get or create Fernet cipher for encryption/decryption."""
if self._cipher is None:
# Use Django SECRET_KEY as encryption key (in production, use a dedicated key)
key = settings.SECRET_KEY[:32].encode().ljust(32, b'0')
from base64 import urlsafe_b64encode
fernet_key = urlsafe_b64encode(key)
self._cipher = Fernet(fernet_key)
return self._cipher
def get_setting(self, key: str, default: Any = None) -> Any:
"""
Retrieve a setting value with type conversion.
Args:
key: The setting key
default: Default value if setting doesn't exist
Returns:
The setting value converted to appropriate type
"""
# Check cache first
cache_key = self._get_cache_key(key)
cached_value = cache.get(cache_key)
if cached_value is not None:
return cached_value
try:
template = SettingTemplate.objects.get(key=key, is_active=True)
tenant_setting = TenantSetting.objects.select_related('template').get(
tenant=self.tenant,
template=template
)
# Get typed value
if template.data_type == SettingTemplate.DataType.ENCRYPTED:
value = self._decrypt_value(tenant_setting.encrypted_value)
elif template.data_type == SettingTemplate.DataType.FILE:
value = tenant_setting.file_value
else:
value = tenant_setting.get_typed_value()
# Cache the value
cache.set(cache_key, value, self.CACHE_TIMEOUT)
return value
except (SettingTemplate.DoesNotExist, TenantSetting.DoesNotExist):
# Return default value or template default
try:
template = SettingTemplate.objects.get(key=key, is_active=True)
if template.default_value:
return self._convert_value(template.default_value, template.data_type)
except SettingTemplate.DoesNotExist:
pass
return default
def set_setting(self, key: str, value: Any, user: Optional[User] = None) -> TenantSetting:
"""
Set a setting value with validation.
Args:
key: The setting key
value: The value to set
user: The user making the change (for audit)
Returns:
The created or updated TenantSetting instance
Raises:
ValidationError: If validation fails
"""
try:
template = SettingTemplate.objects.get(key=key, is_active=True)
except SettingTemplate.DoesNotExist:
raise ValidationError(f"Setting template '{key}' does not exist")
# Validate the value
self._validate_value(value, template)
# Get or create tenant setting
tenant_setting, created = TenantSetting.objects.get_or_create(
tenant=self.tenant,
template=template,
defaults={'updated_by': user}
)
# Set the value based on data type
if template.data_type == SettingTemplate.DataType.ENCRYPTED:
tenant_setting.encrypted_value = self._encrypt_value(str(value))
tenant_setting.value = '' # Clear plain text value
elif template.data_type == SettingTemplate.DataType.FILE:
tenant_setting.file_value = value
tenant_setting.value = ''
elif template.data_type == SettingTemplate.DataType.BOOLEAN:
tenant_setting.value = str(bool(value)).lower()
else:
tenant_setting.value = str(value)
tenant_setting.updated_by = user
tenant_setting.save()
# Invalidate cache
cache_key = self._get_cache_key(key)
cache.delete(cache_key)
return tenant_setting
def get_category_settings(self, category: str) -> Dict[str, Any]:
"""
Get all settings for a specific category.
Args:
category: The category name
Returns:
Dictionary of setting key -> value
"""
templates = SettingTemplate.objects.filter(
category=category,
is_active=True
).order_by('order')
result = {}
for template in templates:
result[template.key] = self.get_setting(template.key)
return result
def get_all_settings(self) -> Dict[str, Dict[str, Any]]:
"""
Get all settings organized by category.
Returns:
Dictionary of category -> {setting_key -> value}
"""
categories = SettingTemplate.Category.choices
result = {}
for category_value, category_label in categories:
result[category_value] = self.get_category_settings(category_value)
return result
def validate_required_settings(self) -> bool:
"""
Check if all required settings have values.
Returns:
True if all required settings are set, False otherwise
"""
missing = self.get_missing_required_settings()
return len(missing) == 0
def get_missing_required_settings(self) -> List[SettingTemplate]:
"""
Get list of required settings that are missing values.
Returns:
List of SettingTemplate instances for missing required settings
"""
required_templates = SettingTemplate.objects.filter(
is_required=True,
is_active=True
)
missing = []
for template in required_templates:
try:
tenant_setting = TenantSetting.objects.get(
tenant=self.tenant,
template=template
)
# Check if value is actually set
if template.data_type == SettingTemplate.DataType.FILE:
if not tenant_setting.file_value:
missing.append(template)
elif template.data_type == SettingTemplate.DataType.ENCRYPTED:
if not tenant_setting.encrypted_value:
missing.append(template)
else:
if not tenant_setting.value:
missing.append(template)
except TenantSetting.DoesNotExist:
missing.append(template)
return missing
def export_settings(self) -> Dict[str, Any]:
"""
Export all settings as JSON (excluding encrypted values).
Returns:
Dictionary of settings
"""
settings_dict = {}
tenant_settings = TenantSetting.objects.filter(
tenant=self.tenant
).select_related('template')
for ts in tenant_settings:
# Skip encrypted values for security
if ts.template.data_type == SettingTemplate.DataType.ENCRYPTED:
settings_dict[ts.template.key] = '***ENCRYPTED***'
elif ts.template.data_type == SettingTemplate.DataType.FILE:
settings_dict[ts.template.key] = str(ts.file_value) if ts.file_value else None
else:
settings_dict[ts.template.key] = ts.get_typed_value()
return settings_dict
@transaction.atomic
def import_settings(self, data: Dict[str, Any], user: Optional[User] = None) -> int:
"""
Import settings from a dictionary.
Args:
data: Dictionary of setting_key -> value
user: The user performing the import
Returns:
Number of settings imported
Raises:
ValidationError: If validation fails
"""
count = 0
for key, value in data.items():
# Skip encrypted placeholders
if value == '***ENCRYPTED***':
continue
try:
self.set_setting(key, value, user)
count += 1
except ValidationError as e:
# Log error but continue with other settings
print(f"Error importing setting {key}: {e}")
return count
def _validate_value(self, value: Any, template: SettingTemplate) -> None:
"""
Validate a value against template rules.
Args:
value: The value to validate
template: The setting template
Raises:
ValidationError: If validation fails
"""
# Check required
if template.is_required and not value:
raise ValidationError(f"{template.label_en} is required")
# Skip validation if value is empty and not required
if not value and not template.is_required:
return
# Type-specific validation
if template.data_type == SettingTemplate.DataType.INTEGER:
try:
int(value)
except (ValueError, TypeError):
raise ValidationError(f"{template.label_en} must be an integer")
elif template.data_type == SettingTemplate.DataType.EMAIL:
from django.core.validators import validate_email
try:
validate_email(value)
except ValidationError:
raise ValidationError(f"{template.label_en} must be a valid email")
elif template.data_type == SettingTemplate.DataType.URL:
from django.core.validators import URLValidator
validator = URLValidator()
try:
validator(value)
except ValidationError:
raise ValidationError(f"{template.label_en} must be a valid URL")
elif template.data_type == SettingTemplate.DataType.CHOICE:
if template.choices:
valid_values = [choice['value'] for choice in template.choices]
if value not in valid_values:
raise ValidationError(
f"{template.label_en} must be one of: {', '.join(valid_values)}"
)
# Regex validation
if template.validation_regex and value:
if not re.match(template.validation_regex, str(value)):
raise ValidationError(
f"{template.label_en} does not match required format"
)
def _convert_value(self, value: str, data_type: str) -> Any:
"""Convert string value to appropriate type."""
if data_type == SettingTemplate.DataType.BOOLEAN:
return value.lower() in ('true', '1', 'yes')
elif data_type == SettingTemplate.DataType.INTEGER:
try:
return int(value)
except (ValueError, TypeError):
return None
else:
return value
def _encrypt_value(self, value: str) -> bytes:
"""Encrypt a sensitive value."""
cipher = self._get_cipher()
return cipher.encrypt(value.encode())
def _decrypt_value(self, encrypted_value: bytes) -> str:
"""Decrypt a sensitive value."""
if not encrypted_value:
return ''
cipher = self._get_cipher()
try:
return cipher.decrypt(encrypted_value).decode()
except Exception:
return ''
def clear_cache(self) -> None:
"""Clear all cached settings for this tenant."""
templates = SettingTemplate.objects.filter(is_active=True)
for template in templates:
cache_key = self._get_cache_key(template.key)
cache.delete(cache_key)
def get_email_configuration(self) -> Dict[str, Any]:
"""
Get email configuration for this tenant.
Returns:
Dictionary with email configuration:
{
'backend': 'console' or 'smtp',
'host': SMTP host,
'port': SMTP port,
'username': SMTP username,
'password': SMTP password (decrypted),
'use_tls': Boolean,
'from_email': From email address,
'from_name': From name,
}
"""
return {
'backend': self.get_setting('email_backend', 'console'),
'host': self.get_setting('email_host', 'localhost'),
'port': self.get_setting('email_port', 587),
'username': self.get_setting('email_host_user', ''),
'password': self.get_setting('email_host_password', ''),
'use_tls': self.get_setting('email_use_tls', True),
'from_email': self.get_setting('email_from_address', 'noreply@agdarcentre.sa'),
'from_name': self.get_setting('email_from_name', self.tenant.name),
}
def get_tenant_settings_service(tenant: Tenant) -> TenantSettingsService:
"""
Factory function to get a settings service for a tenant.
Args:
tenant: The tenant instance
Returns:
TenantSettingsService instance
"""
return TenantSettingsService(tenant)