"""
CSID (Cryptographic Stamp Identifier) Management

This module handles:
- CSID storage and retrieval
- CSID renewal
- Private key management
- Certificate validation
"""

import logging
from datetime import timedelta
from typing import List, Optional, Tuple

from django.core.exceptions import ValidationError
from django.utils import timezone

# Import CSID model from finance.models to avoid duplicate model registration
from finance.models import CSID

logger = logging.getLogger(__name__)


class CSIDManager:
    """Manager class for CSID operations."""

    @staticmethod
    def get_active_csid(tenant) -> Optional[CSID]:
        """
        Get the active production CSID for a tenant.

        Selects CSIDs that are of type PRODUCTION, have status ACTIVE and
        whose ``expiry_date`` is still in the future, and returns the one
        with the most recent ``issue_date``.

        Args:
            tenant: Tenant instance

        Returns:
            Optional[CSID]: Active CSID, or None when none matches or when
            the lookup itself fails (the failure is logged, not raised).
        """
        try:
            csid = (
                CSID.objects.filter(
                    tenant=tenant,
                    csid_type=CSID.CSIDType.PRODUCTION,
                    status=CSID.Status.ACTIVE,
                    expiry_date__gt=timezone.now(),
                )
                .order_by('-issue_date')
                .first()
            )

            if csid:
                logger.info(
                    "Active CSID found for tenant %s: %s", tenant.name, csid.id
                )
                return csid

            logger.warning("No active CSID found for tenant %s", tenant.name)
            return None
        except Exception as e:
            # Deliberate best-effort: callers treat any failure as "no CSID".
            # logger.exception keeps the traceback so the real cause is not
            # lost behind the None return value.
            logger.exception("Error retrieving active CSID: %s", e)
            return None

    @staticmethod
    def check_expiry_and_renew(tenant) -> Tuple[bool, str]:
        """
        Check whether the tenant's CSID needs renewal.

        This method only reports the status; it does not start a renewal
        itself.

        Args:
            tenant: Tenant instance

        Returns:
            Tuple[bool, str]: (needs_renewal, message)
        """
        csid = CSIDManager.get_active_csid(tenant)

        if not csid:
            # Also reached when get_active_csid swallowed a lookup error.
            return True, "No active CSID found. Please onboard your EGS unit."

        if csid.needs_renewal:
            return True, f"CSID expires in {csid.days_until_expiry} days. Please renew."

        return False, "CSID is valid"

    @staticmethod
    def revoke_all_for_tenant(tenant, reason: str = "") -> int:
        """
        Revoke all CSIDs of a tenant that currently have status ACTIVE.

        Args:
            tenant: Tenant instance
            reason: Reason for revocation

        Returns:
            int: Number of CSIDs on which ``revoke`` was called
        """
        csids = CSID.objects.filter(
            tenant=tenant,
            status=CSID.Status.ACTIVE,
        )

        count = 0
        for csid in csids:
            # Revoke one by one so the model's own revoke() logic runs.
            csid.revoke(reason)
            count += 1

        logger.info("Revoked %s CSIDs for tenant %s", count, tenant.name)
        return count


class InvoiceCounterManager:
    """Manager for invoice counter operations."""

    @staticmethod
    def get_next_counter(tenant) -> int:
        """
        Get the next invoice counter value for a tenant.

        Args:
            tenant: Tenant instance

        Returns:
            int: Highest existing ``invoice_counter`` plus one, or 1 when the
            tenant has no invoices yet.
        """
        from finance.models import Invoice

        # NOTE(review): this is a read-then-increment with no locking visible
        # here; confirm that callers serialize invoice creation per tenant.
        last_invoice = (
            Invoice.objects.filter(tenant=tenant)
            .order_by('-invoice_counter')
            .first()
        )

        if last_invoice:
            next_counter = last_invoice.invoice_counter + 1
        else:
            next_counter = 1

        logger.info(
            "Next invoice counter for tenant %s: %s", tenant.name, next_counter
        )
        return next_counter

    @staticmethod
    def get_previous_invoice_hash(tenant) -> str:
        """
        Get the hash of the tenant's invoice with the highest counter.

        Args:
            tenant: Tenant instance

        Returns:
            str: Previous invoice hash (empty string if there is no previous
            invoice or its hash is empty)
        """
        from finance.models import Invoice

        last_invoice = (
            Invoice.objects.filter(tenant=tenant)
            .order_by('-invoice_counter')
            .first()
        )

        if last_invoice and last_invoice.invoice_hash:
            logger.info(
                "Previous invoice hash retrieved for tenant %s", tenant.name
            )
            return last_invoice.invoice_hash

        logger.info("No previous invoice found for tenant %s", tenant.name)
        return ""

    @staticmethod
    def validate_counter_sequence(tenant) -> Tuple[bool, list]:
        """
        Validate that the tenant's invoice counters run 1, 2, 3, ... without
        gaps or repeats.

        Args:
            tenant: Tenant instance

        Returns:
            Tuple[bool, list]: (is_valid, list of gaps). Each gap is an
            inclusive ``(first_missing, last_missing)`` pair. ``is_valid`` is
            also False when a counter is repeated or lies below the expected
            value; such counters are logged but are not reported as gaps.
        """
        from finance.models import Invoice

        # Materialize once: the values are needed for the emptiness check
        # and for the scan below.
        counters = list(
            Invoice.objects.filter(tenant=tenant)
            .order_by('invoice_counter')
            .values_list('invoice_counter', flat=True)
        )

        if not counters:
            return True, []

        gaps: List[Tuple[int, int]] = []
        out_of_sequence: List[int] = []
        expected = 1

        for counter in counters:
            if counter > expected:
                # Counters expected..counter-1 are missing.
                gaps.append((expected, counter - 1))
            elif counter < expected:
                # Input is sorted ascending, so this is a repeated counter or
                # one below 1. It is not a gap: recording it as one would
                # produce an inverted range such as (2, 0).
                out_of_sequence.append(counter)
            expected = counter + 1

        if gaps:
            logger.warning(
                "Invoice counter gaps found for tenant %s: %s", tenant.name, gaps
            )
        if out_of_sequence:
            logger.warning(
                "Duplicate or out-of-range invoice counters found for tenant %s: %s",
                tenant.name,
                out_of_sequence,
            )

        is_valid = not gaps and not out_of_sequence
        return is_valid, gaps