# agdar/finance/crypto_service.py
# 2025-11-02 14:35:35 +03:00
#
# 398 lines
# 12 KiB
# Python

"""
Cryptographic Service for ZATCA E-Invoice
This module handles:
- ECDSA key pair generation (secp256k1 curve)
- Digital signature generation
- Certificate signing request (CSR) generation
- Signature verification
"""
import base64
import hashlib
import logging
from typing import Tuple, Optional
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.backends import default_backend
from cryptography import x509
from cryptography.x509.oid import NameOID
from datetime import datetime, timedelta
# Module-level logger; it is named after this module (``__name__``) and is
# shared by every service class defined below.
logger = logging.getLogger(__name__)
class CryptoService:
    """
    Stateless ECDSA / X.509 helpers for e-invoice signing.

    Every method is a ``@staticmethod``. Keys are exchanged as PEM, and
    signatures and digests are exchanged as Base64 text.
    """

    @staticmethod
    def generate_key_pair() -> Tuple[bytes, bytes]:
        """
        Create a new ECDSA key pair on the secp256k1 curve.

        Returns:
            Tuple[bytes, bytes]: ``(private_key_pem, public_key_pem)``. The
            private key is serialized as unencrypted PKCS#8 PEM and the
            public key as SubjectPublicKeyInfo PEM.

        Raises:
            Exception: any failure is logged and then re-raised unchanged.
        """
        try:
            key = ec.generate_private_key(ec.SECP256K1(), default_backend())

            # NOTE: the private key is serialized WITHOUT encryption; the
            # caller is responsible for protecting the returned bytes.
            private_pem = key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption(),
            )
            public_pem = key.public_key().public_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PublicFormat.SubjectPublicKeyInfo,
            )

            logger.info("ECDSA key pair generated successfully")
            return private_pem, public_pem
        except Exception as e:
            logger.error(f"Error generating key pair: {e}")
            raise

    @staticmethod
    def sign_data(data: str, private_key_pem: bytes) -> str:
        """
        Produce an ECDSA (SHA-256) signature over ``data``.

        Args:
            data: Text to sign (usually the invoice hash); it is UTF-8
                encoded before signing.
            private_key_pem: Unencrypted private key in PEM format.

        Returns:
            str: The signature, Base64 encoded.

        Raises:
            Exception: any failure is logged and then re-raised unchanged.
        """
        try:
            signing_key = serialization.load_pem_private_key(
                private_key_pem, password=None, backend=default_backend()
            )
            raw_signature = signing_key.sign(
                data.encode('utf-8'), ec.ECDSA(hashes.SHA256())
            )
            encoded_signature = base64.b64encode(raw_signature).decode('utf-8')

            logger.info("Data signed successfully")
            return encoded_signature
        except Exception as e:
            logger.error(f"Error signing data: {e}")
            raise

    @staticmethod
    def verify_signature(data: str, signature_base64: str, public_key_pem: bytes) -> bool:
        """
        Check an ECDSA (SHA-256) signature against ``data``.

        Args:
            data: The original text; it is UTF-8 encoded before checking.
            signature_base64: Signature to check, Base64 encoded.
            public_key_pem: Public key in PEM format.

        Returns:
            bool: ``True`` when the signature verifies. ``False`` on ANY
            failure, which covers an invalid signature as well as an
            unreadable key or other error; this method never raises.
        """
        try:
            verifying_key = serialization.load_pem_public_key(
                public_key_pem, backend=default_backend()
            )
            raw_signature = base64.b64decode(signature_base64)

            # verify() returns nothing on success and raises on mismatch.
            verifying_key.verify(
                raw_signature, data.encode('utf-8'), ec.ECDSA(hashes.SHA256())
            )

            logger.info("Signature verified successfully")
            return True
        except Exception as e:
            logger.error(f"Signature verification failed: {e}")
            return False

    @staticmethod
    def generate_csr(
        tenant,
        egs_serial_number: str,
        common_name: str,
        organization_unit: str = "",
        private_key_pem: bytes = None
    ) -> Tuple[str, bytes]:
        """
        Build a Certificate Signing Request (CSR) for ZATCA onboarding.

        The subject carries, in this order: common name, serial number,
        organization name (``tenant.name``) and country ``"SA"``. When set,
        ``tenant.vat_number`` and then ``organization_unit`` are each
        appended as an organizational-unit attribute.

        Args:
            tenant: Tenant instance; ``name`` and ``vat_number`` are read.
            egs_serial_number: EGS serial number
                (format: 1-Manufacturer|2-Model|3-Serial).
            common_name: Name or asset tracking number.
            organization_unit: Branch name or TIN for VAT groups; skipped
                when empty.
            private_key_pem: Existing private key in PEM format. When it is
                falsy, a new key pair is generated.

        Returns:
            Tuple[str, bytes]: ``(csr_pem, private_key_pem)``; the key is the
            one that signed the CSR (the supplied key or the new one).

        Raises:
            Exception: any failure is logged and then re-raised unchanged.
        """
        try:
            if not private_key_pem:
                private_key_pem, _ = CryptoService.generate_key_pair()

            signing_key = serialization.load_pem_private_key(
                private_key_pem, password=None, backend=default_backend()
            )

            name_attributes = [
                x509.NameAttribute(NameOID.COMMON_NAME, common_name),
                x509.NameAttribute(NameOID.SERIAL_NUMBER, egs_serial_number),
                x509.NameAttribute(NameOID.ORGANIZATION_NAME, tenant.name),
                x509.NameAttribute(NameOID.COUNTRY_NAME, "SA"),
            ]

            # The VAT number is carried as an organizational-unit attribute.
            if tenant.vat_number:
                name_attributes.append(
                    x509.NameAttribute(
                        NameOID.ORGANIZATIONAL_UNIT_NAME, tenant.vat_number
                    )
                )

            # The optional branch / group unit is a second OU attribute.
            if organization_unit:
                name_attributes.append(
                    x509.NameAttribute(
                        NameOID.ORGANIZATIONAL_UNIT_NAME, organization_unit
                    )
                )

            subject = x509.Name(name_attributes)
            builder = x509.CertificateSigningRequestBuilder().subject_name(subject)
            request = builder.sign(signing_key, hashes.SHA256(), default_backend())
            csr_pem = request.public_bytes(serialization.Encoding.PEM).decode('utf-8')

            logger.info(f"CSR generated for tenant {tenant.name}")
            return csr_pem, private_key_pem
        except Exception as e:
            logger.error(f"Error generating CSR: {e}")
            raise

    @staticmethod
    def extract_public_key_from_certificate(certificate_pem: str) -> bytes:
        """
        Return the public key embedded in an X.509 certificate.

        Args:
            certificate_pem: Certificate in PEM format (text).

        Returns:
            bytes: The public key as SubjectPublicKeyInfo PEM.

        Raises:
            Exception: any failure is logged and then re-raised unchanged.
        """
        try:
            certificate = x509.load_pem_x509_certificate(
                certificate_pem.encode('utf-8'), default_backend()
            )
            return certificate.public_key().public_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PublicFormat.SubjectPublicKeyInfo,
            )
        except Exception as e:
            logger.error(f"Error extracting public key: {e}")
            raise

    @staticmethod
    def hash_certificate(certificate_pem: str) -> str:
        """
        Compute the SHA-256 digest of a certificate's PEM text.

        The digest is taken over the UTF-8 bytes of ``certificate_pem``
        exactly as passed in; the certificate is not parsed or normalized.

        Args:
            certificate_pem: Certificate in PEM format (text).

        Returns:
            str: The 32-byte digest, Base64 encoded.

        Raises:
            Exception: any failure is logged and then re-raised unchanged.
        """
        try:
            digest = hashlib.sha256(certificate_pem.encode('utf-8')).digest()
            return base64.b64encode(digest).decode('utf-8')
        except Exception as e:
            logger.error(f"Error hashing certificate: {e}")
            raise
class InvoiceSigningService:
    """Invoice-level signing helpers layered on ``CryptoService``."""

    @staticmethod
    def sign_invoice(invoice, private_key_pem: bytes) -> str:
        """
        Sign an invoice's hash with ECDSA.

        Args:
            invoice: Invoice instance; ``invoice_hash`` is signed and
                ``invoice_number`` is used for logging.
            private_key_pem: Private key in PEM format.

        Returns:
            str: The signature, Base64 encoded.

        Raises:
            Exception: any failure is logged and then re-raised unchanged.
        """
        try:
            hash_to_sign = invoice.invoice_hash
            encoded_signature = CryptoService.sign_data(hash_to_sign, private_key_pem)

            logger.info(f"Invoice {invoice.invoice_number} signed successfully")
            return encoded_signature
        except Exception as e:
            logger.error(f"Error signing invoice: {e}")
            raise

    @staticmethod
    def sign_simplified_invoice(invoice, csid_obj) -> bool:
        """
        Stamp a simplified invoice and record one use of the CSID.

        NOTE(review): the stamp written here is a PLACEHOLDER, namely the
        text ``SIGNED_`` followed by the first 32 characters of the invoice
        hash. No private key is involved, so it is not a real signature.
        Replace it with a genuine signature made with the CSID private key
        before relying on it.

        Args:
            invoice: Invoice instance; ``cryptographic_stamp`` is set and
                saved via ``save(update_fields=['cryptographic_stamp'])``.
            csid_obj: CSID instance; only ``increment_usage()`` is called.

        Returns:
            bool: ``True`` when every step completed, ``False`` when any step
            raised (the error is logged, never propagated).
        """
        try:
            # Placeholder stamp: see the NOTE in the docstring.
            invoice.cryptographic_stamp = f"SIGNED_{invoice.invoice_hash[:32]}"

            # Persist only the stamp column.
            invoice.save(update_fields=['cryptographic_stamp'])

            # The usage counter is bumped only after the invoice was saved.
            csid_obj.increment_usage()

            logger.info(f"Simplified invoice {invoice.invoice_number} signed")
            return True
        except Exception as e:
            logger.error(f"Error signing simplified invoice: {e}")
            return False
class SecureKeyStorage:
    """Fernet-based encryption helpers for storing private keys at rest."""

    @staticmethod
    def encrypt_key(key_data: bytes, encryption_key: str) -> str:
        """
        Encrypt key material for storage.

        Args:
            key_data: Raw key bytes to protect.
            encryption_key: Fernet key as text; it is ``.encode()``-d and
                handed to ``Fernet``, so Fernet's key-format rules apply.

        Returns:
            str: The Fernet token wrapped in one more layer of Base64.

        Raises:
            Exception: any failure is logged and then re-raised unchanged.
        """
        try:
            from cryptography.fernet import Fernet

            cipher = Fernet(encryption_key.encode())
            token = cipher.encrypt(key_data)

            # The token is Base64-wrapped again; decrypt_key() reverses this.
            return base64.b64encode(token).decode('utf-8')
        except Exception as e:
            logger.error(f"Error encrypting key: {e}")
            raise

    @staticmethod
    def decrypt_key(encrypted_key_base64: str, encryption_key: str) -> bytes:
        """
        Recover key material produced by ``encrypt_key``.

        Args:
            encrypted_key_base64: Base64 text as returned by ``encrypt_key``.
            encryption_key: The same Fernet key (text) used to encrypt.

        Returns:
            bytes: The decrypted key data.

        Raises:
            Exception: any failure is logged and then re-raised unchanged.
        """
        try:
            from cryptography.fernet import Fernet

            cipher = Fernet(encryption_key.encode())

            # Strip the outer Base64 layer first, then decrypt the token.
            token = base64.b64decode(encrypted_key_base64)
            return cipher.decrypt(token)
        except Exception as e:
            logger.error(f"Error decrypting key: {e}")
            raise