agdar/finance/csid_manager.py
2025-11-02 14:35:35 +03:00

190 lines
5.3 KiB
Python

"""
CSID (Cryptographic Stamp Identifier) Management
This module handles:
- CSID storage and retrieval
- CSID renewal
- Private key management
- Certificate validation
"""
import logging
from datetime import timedelta
from typing import Optional, Tuple
from django.core.exceptions import ValidationError
from django.utils import timezone
# Import CSID model from finance.models to avoid duplicate model registration
from finance.models import CSID
# Module-level logger, named after this module's import path via __name__.
logger = logging.getLogger(__name__)
class CSIDManager:
    """Manager class for CSID operations.

    Every method is a ``@staticmethod``; the class only serves as a namespace
    for CSID lookup, renewal checks and revocation.
    """

    @staticmethod
    def get_active_csid(tenant) -> Optional[CSID]:
        """
        Get active production CSID for tenant.

        Considers CSIDs of type PRODUCTION with status ACTIVE whose
        ``expiry_date`` is later than the current time, and picks the one
        with the most recent ``issue_date``.

        Args:
            tenant: Tenant instance

        Returns:
            Optional[CSID]: Active CSID, or None when nothing matches or when
            the lookup raised (the error is logged, never re-raised).
        """
        try:
            csid = (
                CSID.objects.filter(
                    tenant=tenant,
                    csid_type=CSID.CSIDType.PRODUCTION,
                    status=CSID.Status.ACTIVE,
                    expiry_date__gt=timezone.now(),
                )
                .order_by('-issue_date')
                .first()
            )
            if not csid:
                logger.warning(f"No active CSID found for tenant {tenant.name}")
                return None
            logger.info(f"Active CSID found for tenant {tenant.name}: {csid.id}")
            return csid
        except Exception as e:
            # Best-effort lookup: callers rely on None instead of an exception.
            # logger.exception logs at ERROR level like before, but also
            # records the traceback so the failure can be diagnosed.
            logger.exception("Error retrieving active CSID: %s", e)
            return None

    @staticmethod
    def check_expiry_and_renew(tenant) -> Tuple[bool, str]:
        """
        Check whether the tenant's CSID needs renewal.

        This method only reports the state; it does not start a renewal
        itself. Renewal is considered necessary when no active CSID can be
        retrieved, or when the active CSID's ``needs_renewal`` attribute is
        truthy.

        Args:
            tenant: Tenant instance

        Returns:
            Tuple[bool, str]: (needs_renewal, message)
        """
        csid = CSIDManager.get_active_csid(tenant)
        if not csid:
            return True, "No active CSID found. Please onboard your EGS unit."
        if csid.needs_renewal:
            return True, f"CSID expires in {csid.days_until_expiry} days. Please renew."
        return False, "CSID is valid"

    @staticmethod
    def revoke_all_for_tenant(tenant, reason: str = "") -> int:
        """
        Revoke every CSID of a tenant whose status is ACTIVE.

        CSIDs in any other status are not touched. Each matching CSID is
        revoked individually through its own ``revoke`` method.

        Args:
            tenant: Tenant instance
            reason: Reason for revocation, passed on to ``CSID.revoke``

        Returns:
            int: Number of CSIDs that were revoked
        """
        active_csids = CSID.objects.filter(
            tenant=tenant,
            status=CSID.Status.ACTIVE,
        )
        count = 0
        for csid in active_csids:
            csid.revoke(reason)
            count += 1
        logger.info(f"Revoked {count} CSIDs for tenant {tenant.name}")
        return count
class InvoiceCounterManager:
    """Manager for invoice counter operations.

    Every public method is a ``@staticmethod`` that reads the tenant's
    invoices; none of them writes to the database.
    """

    @staticmethod
    def _latest_invoice(tenant):
        """Return the tenant's invoice with the highest counter, or None."""
        # Imported lazily, as in the rest of this module's invoice helpers.
        from finance.models import Invoice

        return (
            Invoice.objects.filter(tenant=tenant)
            .order_by('-invoice_counter')
            .first()
        )

    @staticmethod
    def _find_gaps(counters) -> Tuple[list, list]:
        """
        Scan counters sorted in ascending order for missing values.

        Args:
            counters: Iterable of integer counters, ascending

        Returns:
            Tuple[list, list]: ``(gaps, out_of_sequence)`` where ``gaps`` is a
            list of inclusive ``(first_missing, last_missing)`` ranges and
            ``out_of_sequence`` lists counters that are duplicates or lie
            below the first expected value (1).
        """
        gaps = []
        out_of_sequence = []
        expected = 1
        for counter in counters:
            if counter < expected:
                # Duplicate (or a value below 1): nothing is missing here, so
                # it must not be reported as an inverted (start > end) gap.
                out_of_sequence.append(counter)
                continue
            if counter > expected:
                gaps.append((expected, counter - 1))
            expected = counter + 1
        return gaps, out_of_sequence

    @staticmethod
    def get_next_counter(tenant) -> int:
        """
        Get next invoice counter value for tenant.

        The value is the highest existing counter plus one, or 1 when the
        tenant has no invoices. The value is only computed, not reserved:
        no lock is taken and nothing is written here.

        Args:
            tenant: Tenant instance

        Returns:
            int: Next counter value
        """
        last_invoice = InvoiceCounterManager._latest_invoice(tenant)
        next_counter = last_invoice.invoice_counter + 1 if last_invoice else 1
        logger.info(f"Next invoice counter for tenant {tenant.name}: {next_counter}")
        return next_counter

    @staticmethod
    def get_previous_invoice_hash(tenant) -> str:
        """
        Get hash of the last invoice for this tenant.

        "Last" means the invoice with the highest ``invoice_counter``.

        Args:
            tenant: Tenant instance

        Returns:
            str: Previous invoice hash (empty string if there is no previous
            invoice or if that invoice has no hash stored)
        """
        last_invoice = InvoiceCounterManager._latest_invoice(tenant)
        if not last_invoice:
            logger.info(f"No previous invoice found for tenant {tenant.name}")
            return ""
        if not last_invoice.invoice_hash:
            # An invoice exists but carries no hash; say so instead of
            # claiming that no invoice was found.
            logger.warning(
                f"Previous invoice for tenant {tenant.name} has no hash stored"
            )
            return ""
        logger.info(f"Previous invoice hash retrieved for tenant {tenant.name}")
        return last_invoice.invoice_hash

    @staticmethod
    def validate_counter_sequence(tenant) -> Tuple[bool, list]:
        """
        Validate that invoice counters are sequential.

        The sequence is valid when the counters, in ascending order, are
        exactly 1, 2, 3, ... with no value missing and none repeated. A
        tenant without invoices is valid.

        Args:
            tenant: Tenant instance

        Returns:
            Tuple[bool, list]: (is_valid, list of gaps), each gap being an
            inclusive ``(first_missing, last_missing)`` tuple. Duplicated
            counters make the result invalid but are not listed as gaps;
            they are logged instead.
        """
        from finance.models import Invoice

        counters = list(
            Invoice.objects.filter(tenant=tenant)
            .order_by('invoice_counter')
            .values_list('invoice_counter', flat=True)
        )
        gaps, out_of_sequence = InvoiceCounterManager._find_gaps(counters)
        if gaps:
            logger.warning(f"Invoice counter gaps found for tenant {tenant.name}: {gaps}")
        if out_of_sequence:
            logger.warning(
                f"Duplicate or out-of-range invoice counters found for tenant "
                f"{tenant.name}: {out_of_sequence}"
            )
        is_valid = not gaps and not out_of_sequence
        return is_valid, gaps