411 lines
14 KiB
Python
411 lines
14 KiB
Python
"""
|
|
Tenant Settings Service Layer
|
|
|
|
Provides type-safe retrieval, validation, and management of tenant settings.
|
|
"""
|
|
|
|
import re
|
|
from typing import Any, Dict, List, Optional
|
|
from django.core.cache import cache
|
|
from django.core.exceptions import ValidationError
|
|
from django.db import transaction
|
|
from cryptography.fernet import Fernet
|
|
from django.conf import settings
|
|
|
|
from .models import Tenant, TenantSetting, SettingTemplate, User
|
|
|
|
|
|
class TenantSettingsService:
    """
    Service for managing tenant settings with type safety and validation.

    An instance is bound to one tenant. Reads go through the Django cache
    (one entry per tenant and setting key); writes validate the value against
    the matching ``SettingTemplate`` and then invalidate that cache entry.
    """

    # Cache timeout in seconds (5 minutes)
    CACHE_TIMEOUT = 300

    # Marker written by export_settings() in place of secret values and
    # skipped by import_settings().
    ENCRYPTED_PLACEHOLDER = '***ENCRYPTED***'

    # String spellings that must be stored as boolean false. Any other
    # non-empty string keeps its plain truthiness (i.e. is stored as true).
    _FALSE_STRINGS = ('', 'false', '0', 'no', 'off')

    def __init__(self, tenant: "Tenant"):
        """
        Initialize the service for a specific tenant.

        Args:
            tenant: The tenant instance
        """
        self.tenant = tenant
        # Fernet cipher, created lazily by _get_cipher().
        self._cipher = None

    # ------------------------------------------------------------------
    # Pure helpers (no database / cache access)
    # ------------------------------------------------------------------

    @staticmethod
    def _is_blank(value: Any) -> bool:
        """
        Tell whether ``value`` counts as "no value supplied".

        Numbers and booleans are never blank: ``0`` and ``False`` are real
        values for INTEGER / BOOLEAN settings and must not trip the
        "required" check. Everything else falls back to truthiness
        (``None``, ``''``, empty containers are blank).
        """
        if isinstance(value, (bool, int, float)):
            return False
        return not value

    @classmethod
    def _coerce_bool(cls, value: Any) -> bool:
        """
        Convert an incoming value to a real boolean.

        ``bool("false")`` is ``True`` in Python, so textual false spellings
        (as produced by forms or JSON imports) are recognised explicitly.
        """
        if isinstance(value, str):
            return value.strip().lower() not in cls._FALSE_STRINGS
        return bool(value)

    @staticmethod
    def _derive_fernet_key(secret_key: str) -> bytes:
        """
        Derive a Fernet key (url-safe base64 of exactly 32 bytes) from a secret.

        The secret is encoded *before* truncation so that multi-byte
        characters cannot push the raw key past 32 bytes. Secrets shorter
        than 32 bytes are right-padded with ``b'0'``.
        """
        from base64 import urlsafe_b64encode

        raw_key = secret_key.encode()[:32].ljust(32, b'0')
        return urlsafe_b64encode(raw_key)

    def _get_cache_key(self, setting_key: str) -> str:
        """Generate cache key for a setting."""
        return f"tenant_setting:{self.tenant.id}:{setting_key}"

    def _get_cipher(self):
        """Get or create Fernet cipher for encryption/decryption."""
        if self._cipher is None:
            # Use Django SECRET_KEY as encryption key (in production, use a dedicated key)
            self._cipher = Fernet(self._derive_fernet_key(settings.SECRET_KEY))
        return self._cipher

    # ------------------------------------------------------------------
    # Reading / writing single settings
    # ------------------------------------------------------------------

    def get_setting(self, key: str, default: Any = None) -> Any:
        """
        Retrieve a setting value with type conversion.

        Lookup order: cache, then the tenant's stored value, then the
        template's default value, then ``default``. Only stored,
        non-encrypted values are written to the cache.

        Args:
            key: The setting key
            default: Default value if setting doesn't exist

        Returns:
            The setting value converted to appropriate type
        """
        # Check cache first
        cache_key = self._get_cache_key(key)
        cached_value = cache.get(cache_key)
        if cached_value is not None:
            return cached_value

        try:
            template = SettingTemplate.objects.get(key=key, is_active=True)
        except SettingTemplate.DoesNotExist:
            return default

        try:
            tenant_setting = TenantSetting.objects.select_related('template').get(
                tenant=self.tenant,
                template=template,
            )
        except TenantSetting.DoesNotExist:
            # Nothing stored for this tenant: fall back to the template default.
            if template.default_value:
                return self._convert_value(template.default_value, template.data_type)
            return default

        # Get typed value
        is_secret = template.data_type == SettingTemplate.DataType.ENCRYPTED
        if is_secret:
            value = self._decrypt_value(tenant_setting.encrypted_value)
        elif template.data_type == SettingTemplate.DataType.FILE:
            value = tenant_setting.file_value
        else:
            value = tenant_setting.get_typed_value()

        # Decrypted secrets are deliberately not cached: putting the plaintext
        # into the cache backend would defeat encrypting it in the database.
        if not is_secret:
            cache.set(cache_key, value, self.CACHE_TIMEOUT)
        return value

    def set_setting(self, key: str, value: Any, user: Optional["User"] = None) -> "TenantSetting":
        """
        Set a setting value with validation.

        Args:
            key: The setting key
            value: The value to set
            user: The user making the change (for audit)

        Returns:
            The created or updated TenantSetting instance

        Raises:
            ValidationError: If the template does not exist or validation fails
        """
        try:
            template = SettingTemplate.objects.get(key=key, is_active=True)
        except SettingTemplate.DoesNotExist:
            raise ValidationError(f"Setting template '{key}' does not exist")

        # Validate the value
        self._validate_value(value, template)

        # Get or create tenant setting
        tenant_setting, _created = TenantSetting.objects.get_or_create(
            tenant=self.tenant,
            template=template,
            defaults={'updated_by': user},
        )

        # None means "cleared"; never persist the literal text "None".
        text_value = '' if value is None else str(value)

        # Set the value based on data type
        data_type = template.data_type
        if data_type == SettingTemplate.DataType.ENCRYPTED:
            tenant_setting.encrypted_value = self._encrypt_value(text_value)
            tenant_setting.value = ''  # Clear plain text value
        elif data_type == SettingTemplate.DataType.FILE:
            tenant_setting.file_value = value
            tenant_setting.value = ''
        elif data_type == SettingTemplate.DataType.BOOLEAN:
            tenant_setting.value = str(self._coerce_bool(value)).lower()
        else:
            tenant_setting.value = text_value

        tenant_setting.updated_by = user
        tenant_setting.save()

        # Invalidate cache
        cache.delete(self._get_cache_key(key))

        return tenant_setting

    # ------------------------------------------------------------------
    # Bulk reads
    # ------------------------------------------------------------------

    def get_category_settings(self, category: str) -> Dict[str, Any]:
        """
        Get all settings for a specific category.

        Args:
            category: The category name

        Returns:
            Dictionary of setting key -> value
        """
        templates = SettingTemplate.objects.filter(
            category=category,
            is_active=True,
        ).order_by('order')

        return {template.key: self.get_setting(template.key) for template in templates}

    def get_all_settings(self) -> Dict[str, Dict[str, Any]]:
        """
        Get all settings organized by category.

        Returns:
            Dictionary of category -> {setting_key -> value}
        """
        return {
            category_value: self.get_category_settings(category_value)
            for category_value, _category_label in SettingTemplate.Category.choices
        }

    def validate_required_settings(self) -> bool:
        """
        Check if all required settings have values.

        Returns:
            True if all required settings are set, False otherwise
        """
        return not self.get_missing_required_settings()

    def get_missing_required_settings(self) -> List["SettingTemplate"]:
        """
        Get list of required settings that are missing values.

        Uses two queries in total (templates, then this tenant's rows for
        those templates) instead of one lookup per template.

        Returns:
            List of SettingTemplate instances for missing required settings
        """
        required_templates = list(
            SettingTemplate.objects.filter(is_required=True, is_active=True)
        )

        stored_by_template = {
            tenant_setting.template.pk: tenant_setting
            for tenant_setting in TenantSetting.objects.filter(
                tenant=self.tenant,
                template__in=required_templates,
            ).select_related('template')
        }

        missing = []
        for template in required_templates:
            tenant_setting = stored_by_template.get(template.pk)
            if tenant_setting is None:
                missing.append(template)
                continue

            # Check if value is actually set (each data type has its own column)
            if template.data_type == SettingTemplate.DataType.FILE:
                stored = tenant_setting.file_value
            elif template.data_type == SettingTemplate.DataType.ENCRYPTED:
                stored = tenant_setting.encrypted_value
            else:
                stored = tenant_setting.value

            if not stored:
                missing.append(template)

        return missing

    # ------------------------------------------------------------------
    # Import / export
    # ------------------------------------------------------------------

    def export_settings(self) -> Dict[str, Any]:
        """
        Export all settings as a JSON-friendly dictionary.

        Encrypted values are replaced by ``ENCRYPTED_PLACEHOLDER``; file
        values are exported as ``str(file_value)`` or ``None`` when unset.

        Returns:
            Dictionary of settings
        """
        settings_dict = {}
        tenant_settings = TenantSetting.objects.filter(
            tenant=self.tenant
        ).select_related('template')

        for ts in tenant_settings:
            data_type = ts.template.data_type
            if data_type == SettingTemplate.DataType.ENCRYPTED:
                # Skip encrypted values for security
                settings_dict[ts.template.key] = self.ENCRYPTED_PLACEHOLDER
            elif data_type == SettingTemplate.DataType.FILE:
                settings_dict[ts.template.key] = str(ts.file_value) if ts.file_value else None
            else:
                settings_dict[ts.template.key] = ts.get_typed_value()

        return settings_dict

    def import_settings(self, data: Dict[str, Any], user: Optional["User"] = None) -> int:
        """
        Import settings from a dictionary inside a single transaction.

        Entries whose value is ``ENCRYPTED_PLACEHOLDER`` are skipped. Entries
        that fail validation are logged and skipped; they do not abort the
        import and are not counted.

        Args:
            data: Dictionary of setting_key -> value
            user: The user performing the import

        Returns:
            Number of settings imported
        """
        import logging

        logger = logging.getLogger(__name__)

        count = 0
        with transaction.atomic():
            for key, value in data.items():
                # Skip encrypted placeholders
                if value == self.ENCRYPTED_PLACEHOLDER:
                    continue

                try:
                    self.set_setting(key, value, user)
                except ValidationError as exc:
                    # Log error but continue with other settings
                    logger.warning("Error importing setting %s: %s", key, exc)
                else:
                    count += 1

        return count

    # ------------------------------------------------------------------
    # Validation / conversion
    # ------------------------------------------------------------------

    def _validate_value(self, value: Any, template: "SettingTemplate") -> None:
        """
        Validate a value against template rules.

        Args:
            value: The value to validate
            template: The setting template

        Raises:
            ValidationError: If validation fails
        """
        label = template.label_en

        # Blank values: an error when required, otherwise nothing to validate.
        if self._is_blank(value):
            if template.is_required:
                raise ValidationError(f"{label} is required")
            return

        # Type-specific validation
        data_type = template.data_type
        if data_type == SettingTemplate.DataType.INTEGER:
            try:
                int(value)
            except (ValueError, TypeError) as exc:
                raise ValidationError(f"{label} must be an integer") from exc

        elif data_type == SettingTemplate.DataType.EMAIL:
            from django.core.validators import validate_email

            try:
                validate_email(value)
            except ValidationError as exc:
                raise ValidationError(f"{label} must be a valid email") from exc

        elif data_type == SettingTemplate.DataType.URL:
            from django.core.validators import URLValidator

            try:
                URLValidator()(value)
            except ValidationError as exc:
                raise ValidationError(f"{label} must be a valid URL") from exc

        elif data_type == SettingTemplate.DataType.CHOICE:
            if template.choices:
                valid_values = [choice['value'] for choice in template.choices]
                if value not in valid_values:
                    # str() each entry so non-string choices cannot break the message.
                    allowed = ', '.join(str(item) for item in valid_values)
                    raise ValidationError(f"{label} must be one of: {allowed}")

        # Regex validation (re.match: anchored at the start of the value only)
        if template.validation_regex:
            if not re.match(template.validation_regex, str(value)):
                raise ValidationError(f"{label} does not match required format")

    def _convert_value(self, value: str, data_type: str) -> Any:
        """
        Convert string value to appropriate type.

        BOOLEAN accepts 'true', '1' and 'yes' (case-insensitive) as True;
        INTEGER returns None when the text is not a valid integer; every
        other data type is returned unchanged.
        """
        if data_type == SettingTemplate.DataType.BOOLEAN:
            return value.lower() in ('true', '1', 'yes')

        if data_type == SettingTemplate.DataType.INTEGER:
            try:
                return int(value)
            except (ValueError, TypeError):
                return None

        return value

    # ------------------------------------------------------------------
    # Encryption
    # ------------------------------------------------------------------

    def _encrypt_value(self, value: str) -> bytes:
        """Encrypt a sensitive value."""
        cipher = self._get_cipher()
        return cipher.encrypt(value.encode())

    def _decrypt_value(self, encrypted_value: bytes) -> str:
        """
        Decrypt a sensitive value.

        Returns an empty string when nothing is stored or when the token
        cannot be decrypted (e.g. wrong key or corrupted data); the failure
        is logged without the value.
        """
        if not encrypted_value:
            return ''

        import logging

        from cryptography.fernet import InvalidToken

        # Fernet only accepts bytes/str. Binary columns may hand back a
        # memoryview on some database backends (assumption - the field type
        # is not visible from this module), so normalise it first.
        if isinstance(encrypted_value, memoryview):
            encrypted_value = bytes(encrypted_value)

        cipher = self._get_cipher()
        try:
            return cipher.decrypt(encrypted_value).decode()
        except (InvalidToken, TypeError, ValueError):
            logging.getLogger(__name__).warning(
                "Could not decrypt a setting value for tenant %s", self.tenant.id
            )
            return ''

    # ------------------------------------------------------------------
    # Cache maintenance / convenience accessors
    # ------------------------------------------------------------------

    def clear_cache(self) -> None:
        """
        Clear all cached settings for this tenant.

        Keys of inactive templates are cleared as well, because get_setting()
        consults the cache before checking whether a template is active.
        """
        setting_keys = SettingTemplate.objects.values_list('key', flat=True)
        cache.delete_many([self._get_cache_key(setting_key) for setting_key in setting_keys])

    def get_email_configuration(self) -> Dict[str, Any]:
        """
        Get email configuration for this tenant.

        Returns:
            Dictionary with email configuration:
            {
                'backend': 'console' or 'smtp',
                'host': SMTP host,
                'port': SMTP port,
                'username': SMTP username,
                'password': SMTP password (decrypted),
                'use_tls': Boolean,
                'from_email': From email address,
                'from_name': From name,
            }
        """
        return {
            'backend': self.get_setting('email_backend', 'console'),
            'host': self.get_setting('email_host', 'localhost'),
            'port': self.get_setting('email_port', 587),
            'username': self.get_setting('email_host_user', ''),
            'password': self.get_setting('email_host_password', ''),
            'use_tls': self.get_setting('email_use_tls', True),
            'from_email': self.get_setting('email_from_address', 'noreply@agdarcentre.sa'),
            'from_name': self.get_setting('email_from_name', self.tenant.name),
        }
|
|
|
|
|
|
def get_tenant_settings_service(tenant: Tenant) -> TenantSettingsService:
    """
    Build a settings service bound to the given tenant.

    Args:
        tenant: The tenant whose settings the service will manage

    Returns:
        A new TenantSettingsService for ``tenant``
    """
    service = TenantSettingsService(tenant)
    return service
|