""" Cryptographic Service for ZATCA E-Invoice This module handles: - ECDSA key pair generation (secp256k1 curve) - Digital signature generation - Certificate signing request (CSR) generation - Signature verification """ import base64 import hashlib import logging from typing import Tuple, Optional from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.backends import default_backend from cryptography import x509 from cryptography.x509.oid import NameOID from datetime import datetime, timedelta logger = logging.getLogger(__name__) class CryptoService: """Service for cryptographic operations.""" @staticmethod def generate_key_pair() -> Tuple[bytes, bytes]: """ Generate ECDSA key pair using secp256k1 curve. Returns: Tuple[bytes, bytes]: (private_key_pem, public_key_pem) """ try: # Generate private key using secp256k1 curve private_key = ec.generate_private_key( ec.SECP256K1(), default_backend() ) # Serialize private key to PEM format private_pem = private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption() ) # Get public key public_key = private_key.public_key() # Serialize public key to PEM format public_pem = public_key.public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) logger.info("ECDSA key pair generated successfully") return private_pem, public_pem except Exception as e: logger.error(f"Error generating key pair: {e}") raise @staticmethod def sign_data(data: str, private_key_pem: bytes) -> str: """ Sign data using ECDSA private key. Args: data: Data to sign (usually invoice hash) private_key_pem: Private key in PEM format Returns: str: Base64 encoded signature """ try: # Load private key private_key = serialization.load_pem_private_key( private_key_pem, password=None, backend=default_backend() ) # Sign data signature = private_key.sign( data.encode('utf-8'), ec.ECDSA(hashes.SHA256()) ) # Encode signature in base64 signature_base64 = base64.b64encode(signature).decode('utf-8') logger.info("Data signed successfully") return signature_base64 except Exception as e: logger.error(f"Error signing data: {e}") raise @staticmethod def verify_signature(data: str, signature_base64: str, public_key_pem: bytes) -> bool: """ Verify ECDSA signature. Args: data: Original data signature_base64: Base64 encoded signature public_key_pem: Public key in PEM format Returns: bool: True if signature is valid """ try: # Load public key public_key = serialization.load_pem_public_key( public_key_pem, backend=default_backend() ) # Decode signature signature = base64.b64decode(signature_base64) # Verify signature public_key.verify( signature, data.encode('utf-8'), ec.ECDSA(hashes.SHA256()) ) logger.info("Signature verified successfully") return True except Exception as e: logger.error(f"Signature verification failed: {e}") return False @staticmethod def generate_csr( tenant, egs_serial_number: str, common_name: str, organization_unit: str = "", private_key_pem: bytes = None ) -> Tuple[str, bytes]: """ Generate Certificate Signing Request (CSR) for ZATCA onboarding. Args: tenant: Tenant instance egs_serial_number: EGS serial number (format: 1-Manufacturer|2-Model|3-Serial) common_name: Name or asset tracking number organization_unit: Branch name or TIN for VAT groups private_key_pem: Optional existing private key Returns: Tuple[str, bytes]: (csr_pem, private_key_pem) """ try: # Generate key pair if not provided if not private_key_pem: private_key_pem, _ = CryptoService.generate_key_pair() # Load private key private_key = serialization.load_pem_private_key( private_key_pem, password=None, backend=default_backend() ) # Build CSR subject subject_components = [ x509.NameAttribute(NameOID.COMMON_NAME, common_name), x509.NameAttribute(NameOID.SERIAL_NUMBER, egs_serial_number), x509.NameAttribute(NameOID.ORGANIZATION_NAME, tenant.name), x509.NameAttribute(NameOID.COUNTRY_NAME, "SA"), ] # Add organization identifier (VAT number) if tenant.vat_number: subject_components.append( x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, tenant.vat_number) ) # Add organization unit if provided if organization_unit: subject_components.append( x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, organization_unit) ) subject = x509.Name(subject_components) # Build CSR csr = x509.CertificateSigningRequestBuilder().subject_name( subject ).sign(private_key, hashes.SHA256(), default_backend()) # Serialize CSR to PEM csr_pem = csr.public_bytes(serialization.Encoding.PEM).decode('utf-8') logger.info(f"CSR generated for tenant {tenant.name}") return csr_pem, private_key_pem except Exception as e: logger.error(f"Error generating CSR: {e}") raise @staticmethod def extract_public_key_from_certificate(certificate_pem: str) -> bytes: """ Extract public key from X.509 certificate. Args: certificate_pem: Certificate in PEM format Returns: bytes: Public key in PEM format """ try: # Load certificate cert = x509.load_pem_x509_certificate( certificate_pem.encode('utf-8'), default_backend() ) # Extract public key public_key = cert.public_key() # Serialize to PEM public_pem = public_key.public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) return public_pem except Exception as e: logger.error(f"Error extracting public key: {e}") raise @staticmethod def hash_certificate(certificate_pem: str) -> str: """ Generate SHA-256 hash of certificate. Args: certificate_pem: Certificate in PEM format Returns: str: Base64 encoded hash """ try: # Hash certificate cert_hash = hashlib.sha256(certificate_pem.encode('utf-8')).digest() # Encode in base64 hash_base64 = base64.b64encode(cert_hash).decode('utf-8') return hash_base64 except Exception as e: logger.error(f"Error hashing certificate: {e}") raise class InvoiceSigningService: """Service for signing invoices.""" @staticmethod def sign_invoice(invoice, private_key_pem: bytes) -> str: """ Sign invoice using ECDSA. Args: invoice: Invoice instance private_key_pem: Private key in PEM format Returns: str: Base64 encoded signature """ try: # Get invoice hash invoice_hash = invoice.invoice_hash # Sign hash signature = CryptoService.sign_data(invoice_hash, private_key_pem) logger.info(f"Invoice {invoice.invoice_number} signed successfully") return signature except Exception as e: logger.error(f"Error signing invoice: {e}") raise @staticmethod def sign_simplified_invoice(invoice, csid_obj) -> bool: """ Sign simplified invoice with CSID. Args: invoice: Invoice instance csid_obj: CSID instance with private key Returns: bool: True if signing successful """ try: # For simplified invoices, we need to sign with the CSID private key # Note: In production, private key should be stored securely # Generate signature (placeholder - needs actual private key) signature = f"SIGNED_{invoice.invoice_hash[:32]}" # Update invoice invoice.cryptographic_stamp = signature invoice.save(update_fields=['cryptographic_stamp']) # Update CSID usage csid_obj.increment_usage() logger.info(f"Simplified invoice {invoice.invoice_number} signed") return True except Exception as e: logger.error(f"Error signing simplified invoice: {e}") return False class SecureKeyStorage: """Service for secure key storage.""" @staticmethod def encrypt_key(key_data: bytes, encryption_key: str) -> str: """ Encrypt private key for storage. Args: key_data: Key data to encrypt encryption_key: Encryption key Returns: str: Base64 encoded encrypted key """ try: from cryptography.fernet import Fernet # Create Fernet instance f = Fernet(encryption_key.encode()) # Encrypt key encrypted = f.encrypt(key_data) # Encode in base64 encrypted_base64 = base64.b64encode(encrypted).decode('utf-8') return encrypted_base64 except Exception as e: logger.error(f"Error encrypting key: {e}") raise @staticmethod def decrypt_key(encrypted_key_base64: str, encryption_key: str) -> bytes: """ Decrypt private key from storage. Args: encrypted_key_base64: Base64 encoded encrypted key encryption_key: Encryption key Returns: bytes: Decrypted key data """ try: from cryptography.fernet import Fernet # Create Fernet instance f = Fernet(encryption_key.encode()) # Decode from base64 encrypted = base64.b64decode(encrypted_key_base64) # Decrypt key decrypted = f.decrypt(encrypted) return decrypted except Exception as e: logger.error(f"Error decrypting key: {e}") raise