190 lines
5.3 KiB
Python
190 lines
5.3 KiB
Python
"""
|
|
CSID (Cryptographic Stamp Identifier) Management
|
|
|
|
This module handles:
|
|
- CSID storage and retrieval
|
|
- CSID renewal
|
|
- Private key management
|
|
- Certificate validation
|
|
"""
|
|
|
|
import logging
|
|
from datetime import timedelta
|
|
from typing import Optional, Tuple
|
|
|
|
from django.core.exceptions import ValidationError
|
|
from django.utils import timezone
|
|
|
|
# Import CSID model from finance.models to avoid duplicate model registration
|
|
from finance.models import CSID
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class CSIDManager:
|
|
"""Manager class for CSID operations."""
|
|
|
|
@staticmethod
|
|
def get_active_csid(tenant) -> Optional[CSID]:
|
|
"""
|
|
Get active production CSID for tenant.
|
|
|
|
Args:
|
|
tenant: Tenant instance
|
|
|
|
Returns:
|
|
Optional[CSID]: Active CSID or None
|
|
"""
|
|
try:
|
|
csid = CSID.objects.filter(
|
|
tenant=tenant,
|
|
csid_type=CSID.CSIDType.PRODUCTION,
|
|
status=CSID.Status.ACTIVE,
|
|
expiry_date__gt=timezone.now()
|
|
).order_by('-issue_date').first()
|
|
|
|
if csid:
|
|
logger.info(f"Active CSID found for tenant {tenant.name}: {csid.id}")
|
|
return csid
|
|
else:
|
|
logger.warning(f"No active CSID found for tenant {tenant.name}")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error retrieving active CSID: {e}")
|
|
return None
|
|
|
|
@staticmethod
|
|
def check_expiry_and_renew(tenant) -> Tuple[bool, str]:
|
|
"""
|
|
Check if CSID needs renewal and initiate if needed.
|
|
|
|
Args:
|
|
tenant: Tenant instance
|
|
|
|
Returns:
|
|
Tuple[bool, str]: (needs_renewal, message)
|
|
"""
|
|
csid = CSIDManager.get_active_csid(tenant)
|
|
|
|
if not csid:
|
|
return True, "No active CSID found. Please onboard your EGS unit."
|
|
|
|
if csid.needs_renewal:
|
|
return True, f"CSID expires in {csid.days_until_expiry} days. Please renew."
|
|
|
|
return False, "CSID is valid"
|
|
|
|
@staticmethod
|
|
def revoke_all_for_tenant(tenant, reason: str = ""):
|
|
"""
|
|
Revoke all CSIDs for a tenant.
|
|
|
|
Args:
|
|
tenant: Tenant instance
|
|
reason: Reason for revocation
|
|
"""
|
|
csids = CSID.objects.filter(
|
|
tenant=tenant,
|
|
status=CSID.Status.ACTIVE
|
|
)
|
|
|
|
count = 0
|
|
for csid in csids:
|
|
csid.revoke(reason)
|
|
count += 1
|
|
|
|
logger.info(f"Revoked {count} CSIDs for tenant {tenant.name}")
|
|
return count
|
|
|
|
|
|
class InvoiceCounterManager:
|
|
"""Manager for invoice counter operations."""
|
|
|
|
@staticmethod
|
|
def get_next_counter(tenant) -> int:
|
|
"""
|
|
Get next invoice counter value for tenant.
|
|
|
|
Args:
|
|
tenant: Tenant instance
|
|
|
|
Returns:
|
|
int: Next counter value
|
|
"""
|
|
from finance.models import Invoice
|
|
|
|
# Get the highest counter value for this tenant
|
|
last_invoice = Invoice.objects.filter(
|
|
tenant=tenant
|
|
).order_by('-invoice_counter').first()
|
|
|
|
if last_invoice:
|
|
next_counter = last_invoice.invoice_counter + 1
|
|
else:
|
|
next_counter = 1
|
|
|
|
logger.info(f"Next invoice counter for tenant {tenant.name}: {next_counter}")
|
|
return next_counter
|
|
|
|
@staticmethod
|
|
def get_previous_invoice_hash(tenant) -> str:
|
|
"""
|
|
Get hash of the last invoice for this tenant.
|
|
|
|
Args:
|
|
tenant: Tenant instance
|
|
|
|
Returns:
|
|
str: Previous invoice hash (empty string if no previous invoice)
|
|
"""
|
|
from finance.models import Invoice
|
|
|
|
last_invoice = Invoice.objects.filter(
|
|
tenant=tenant
|
|
).order_by('-invoice_counter').first()
|
|
|
|
if last_invoice and last_invoice.invoice_hash:
|
|
logger.info(f"Previous invoice hash retrieved for tenant {tenant.name}")
|
|
return last_invoice.invoice_hash
|
|
|
|
logger.info(f"No previous invoice found for tenant {tenant.name}")
|
|
return ""
|
|
|
|
@staticmethod
|
|
def validate_counter_sequence(tenant) -> Tuple[bool, list]:
|
|
"""
|
|
Validate that invoice counters are sequential.
|
|
|
|
Args:
|
|
tenant: Tenant instance
|
|
|
|
Returns:
|
|
Tuple[bool, list]: (is_valid, list of gaps)
|
|
"""
|
|
from finance.models import Invoice
|
|
|
|
invoices = Invoice.objects.filter(
|
|
tenant=tenant
|
|
).order_by('invoice_counter').values_list('invoice_counter', flat=True)
|
|
|
|
if not invoices:
|
|
return True, []
|
|
|
|
gaps = []
|
|
expected = 1
|
|
|
|
for counter in invoices:
|
|
if counter != expected:
|
|
gaps.append((expected, counter - 1))
|
|
expected = counter + 1
|
|
else:
|
|
expected += 1
|
|
|
|
is_valid = len(gaps) == 0
|
|
|
|
if not is_valid:
|
|
logger.warning(f"Invoice counter gaps found for tenant {tenant.name}: {gaps}")
|
|
|
|
return is_valid, gaps
|