"""
Cryptographic Service for ZATCA E-Invoice

This module handles:
- ECDSA key pair generation (secp256k1 curve)
- Digital signature generation
- Certificate signing request (CSR) generation
- Signature verification
"""
|
|
|
|
import base64
import hashlib
import logging
from datetime import datetime, timedelta
from typing import Tuple, Optional

from cryptography import x509
from cryptography.exceptions import InvalidSignature
from cryptography.fernet import Fernet, InvalidToken
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.x509.oid import NameOID
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class CryptoService:
    """Stateless helpers for the ECDSA / X.509 operations used by e-invoicing.

    Every method is a ``@staticmethod``; the class only serves as a namespace.
    Failures are logged through the module-level ``logger`` and then re-raised,
    except in :meth:`verify_signature`, which reports failure by returning
    ``False``.
    """

    @staticmethod
    def _load_private_key(private_key_pem: bytes):
        """Deserialize an unencrypted PEM private key into a key object."""
        return serialization.load_pem_private_key(
            private_key_pem,
            password=None,
            backend=default_backend(),
        )

    @staticmethod
    def _public_key_to_pem(public_key) -> bytes:
        """Serialize a public key object as SubjectPublicKeyInfo PEM bytes."""
        return public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )

    @staticmethod
    def generate_key_pair() -> Tuple[bytes, bytes]:
        """
        Generate ECDSA key pair using secp256k1 curve.

        The private key is serialized as unencrypted PKCS#8 PEM, so callers
        are responsible for protecting it at rest.

        Returns:
            Tuple[bytes, bytes]: (private_key_pem, public_key_pem)

        Raises:
            Exception: Any error from key generation or serialization is
                logged and re-raised unchanged.
        """
        try:
            private_key = ec.generate_private_key(
                ec.SECP256K1(),
                default_backend(),
            )

            private_pem = private_key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption(),
            )
            public_pem = CryptoService._public_key_to_pem(private_key.public_key())

            logger.info("ECDSA key pair generated successfully")
            return private_pem, public_pem

        except Exception as e:
            logger.error("Error generating key pair: %s", e)
            raise

    @staticmethod
    def sign_data(data: str, private_key_pem: bytes) -> str:
        """
        Sign data using ECDSA private key.

        The UTF-8 encoding of ``data`` is signed with ECDSA over SHA-256.

        Args:
            data: Data to sign (usually invoice hash)
            private_key_pem: Unencrypted private key in PEM format

        Returns:
            str: Base64 encoded signature

        Raises:
            TypeError: If the PEM does not contain an elliptic-curve key.
            Exception: Any other loading/signing error is logged and
                re-raised unchanged.
        """
        try:
            private_key = CryptoService._load_private_key(private_key_pem)

            # A non-EC key (RSA, Ed25519, ...) has a different ``sign``
            # signature and would otherwise fail with an opaque arity error.
            if not isinstance(private_key, ec.EllipticCurvePrivateKey):
                raise TypeError(
                    "sign_data requires an elliptic-curve private key, got "
                    f"{type(private_key).__name__}"
                )

            signature = private_key.sign(
                data.encode('utf-8'),
                ec.ECDSA(hashes.SHA256()),
            )
            signature_base64 = base64.b64encode(signature).decode('utf-8')

            logger.info("Data signed successfully")
            return signature_base64

        except Exception as e:
            logger.error("Error signing data: %s", e)
            raise

    @staticmethod
    def verify_signature(data: str, signature_base64: str, public_key_pem: bytes) -> bool:
        """
        Verify ECDSA signature.

        This method never raises: a signature mismatch, an unparsable key and
        malformed base64 all result in ``False``.

        Args:
            data: Original data (its UTF-8 encoding is what gets verified)
            signature_base64: Base64 encoded signature
            public_key_pem: Public key in PEM format

        Returns:
            bool: True if signature is valid, False otherwise
        """
        try:
            public_key = serialization.load_pem_public_key(
                public_key_pem,
                backend=default_backend(),
            )
            signature = base64.b64decode(signature_base64)

            public_key.verify(
                signature,
                data.encode('utf-8'),
                ec.ECDSA(hashes.SHA256()),
            )

        except InvalidSignature:
            # InvalidSignature carries no message, so formatting it would log
            # an empty reason; state the cause explicitly instead.
            logger.warning(
                "Signature verification failed: signature does not match data"
            )
            return False

        except Exception as e:
            # Malformed key, bad base64, wrong key type, non-str data, ...
            logger.error("Signature verification failed: %s", e)
            return False

        logger.info("Signature verified successfully")
        return True

    @staticmethod
    def generate_csr(
        tenant,
        egs_serial_number: str,
        common_name: str,
        organization_unit: str = "",
        private_key_pem: Optional[bytes] = None
    ) -> Tuple[str, bytes]:
        """
        Generate Certificate Signing Request (CSR) for ZATCA onboarding.

        The subject is built, in order, from: common name, serial number,
        organization name (``tenant.name``), country ``SA``, then optional
        organizational-unit entries for ``tenant.vat_number`` and
        ``organization_unit``. The CSR is signed with SHA-256.

        Args:
            tenant: Tenant instance (``name`` and ``vat_number`` are read)
            egs_serial_number: EGS serial number (format: 1-Manufacturer|2-Model|3-Serial)
            common_name: Name or asset tracking number
            organization_unit: Branch name or TIN for VAT groups
            private_key_pem: Optional existing private key; a new key pair is
                generated when omitted or empty

        Returns:
            Tuple[str, bytes]: (csr_pem, private_key_pem) where
            ``private_key_pem`` is the key that signed the CSR

        Raises:
            Exception: Any key loading / CSR building error is logged and
                re-raised unchanged.
        """
        try:
            if not private_key_pem:
                private_key_pem, _ = CryptoService.generate_key_pair()

            private_key = CryptoService._load_private_key(private_key_pem)

            subject_components = [
                x509.NameAttribute(NameOID.COMMON_NAME, common_name),
                x509.NameAttribute(NameOID.SERIAL_NUMBER, egs_serial_number),
                x509.NameAttribute(NameOID.ORGANIZATION_NAME, tenant.name),
                x509.NameAttribute(NameOID.COUNTRY_NAME, "SA"),
            ]

            # NOTE(review): the VAT number is emitted as an OU attribute, so a
            # tenant with both a VAT number and an organization unit gets two
            # OU entries. Confirm this matches the ZATCA CSR profile before
            # changing it; altering the subject changes the issued CSR.
            if tenant.vat_number:
                subject_components.append(
                    x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, tenant.vat_number)
                )

            if organization_unit:
                subject_components.append(
                    x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, organization_unit)
                )

            subject = x509.Name(subject_components)

            csr = x509.CertificateSigningRequestBuilder().subject_name(
                subject
            ).sign(private_key, hashes.SHA256(), default_backend())

            csr_pem = csr.public_bytes(serialization.Encoding.PEM).decode('utf-8')

            logger.info("CSR generated for tenant %s", tenant.name)
            return csr_pem, private_key_pem

        except Exception as e:
            logger.error("Error generating CSR: %s", e)
            raise

    @staticmethod
    def extract_public_key_from_certificate(certificate_pem: str) -> bytes:
        """
        Extract public key from X.509 certificate.

        Args:
            certificate_pem: Certificate in PEM format

        Returns:
            bytes: Public key in PEM format (SubjectPublicKeyInfo)

        Raises:
            Exception: Any parsing/serialization error is logged and
                re-raised unchanged.
        """
        try:
            cert = x509.load_pem_x509_certificate(
                certificate_pem.encode('utf-8'),
                default_backend(),
            )
            return CryptoService._public_key_to_pem(cert.public_key())

        except Exception as e:
            logger.error("Error extracting public key: %s", e)
            raise

    @staticmethod
    def hash_certificate(certificate_pem: str) -> str:
        """
        Generate SHA-256 hash of certificate.

        The digest is computed over the UTF-8 bytes of the string exactly as
        given (PEM armour and whitespace included), not over the decoded DER.

        Args:
            certificate_pem: Certificate in PEM format

        Returns:
            str: Base64 encoded hash

        Raises:
            Exception: Any error (e.g. a non-string argument) is logged and
                re-raised unchanged.
        """
        try:
            cert_hash = hashlib.sha256(certificate_pem.encode('utf-8')).digest()
            return base64.b64encode(cert_hash).decode('utf-8')

        except Exception as e:
            logger.error("Error hashing certificate: %s", e)
            raise
|
|
|
|
|
|
class InvoiceSigningService:
    """Service for signing invoices."""

    @staticmethod
    def sign_invoice(invoice, private_key_pem: bytes) -> str:
        """
        Sign invoice using ECDSA.

        Delegates to ``CryptoService.sign_data`` with ``invoice.invoice_hash``.

        Args:
            invoice: Invoice instance (``invoice_hash`` and ``invoice_number``
                are read)
            private_key_pem: Private key in PEM format

        Returns:
            str: Base64 encoded signature

        Raises:
            ValueError: If the invoice has no hash to sign.
            Exception: Any signing error is logged and re-raised unchanged.
        """
        try:
            invoice_hash = invoice.invoice_hash

            # Signing an empty/None hash would yield a signature over nothing
            # (or an opaque AttributeError); fail loudly instead.
            if not invoice_hash:
                raise ValueError(
                    f"Invoice {invoice.invoice_number} has no invoice_hash to sign"
                )

            signature = CryptoService.sign_data(invoice_hash, private_key_pem)

            logger.info("Invoice %s signed successfully", invoice.invoice_number)
            return signature

        except Exception as e:
            logger.error("Error signing invoice: %s", e)
            raise

    @staticmethod
    def sign_simplified_invoice(invoice, csid_obj) -> bool:
        """
        Sign simplified invoice with CSID.

        WARNING: the stamp written here is a placeholder string
        (``"SIGNED_"`` + first 32 characters of the invoice hash), not a
        cryptographic signature. No private key is used.

        On success the stamp is stored in ``invoice.cryptographic_stamp``,
        persisted with ``invoice.save(update_fields=['cryptographic_stamp'])``
        and ``csid_obj.increment_usage()`` is called. This method never
        raises; every failure is logged and reported as ``False``.

        Args:
            invoice: Invoice instance
            csid_obj: CSID instance with private key

        Returns:
            bool: True if signing successful, False if the invoice has no
            hash or any step failed
        """
        try:
            invoice_hash = invoice.invoice_hash

            # Without a hash the stamp would degenerate to the bare prefix
            # "SIGNED_"; do not persist that or count it as CSID usage.
            if not invoice_hash:
                logger.error(
                    "Error signing simplified invoice: invoice %s has no invoice_hash",
                    invoice.invoice_number,
                )
                return False

            # TODO: replace this placeholder with a real signature produced
            # from the CSID private key (which must be stored securely).
            signature = f"SIGNED_{invoice_hash[:32]}"

            invoice.cryptographic_stamp = signature
            invoice.save(update_fields=['cryptographic_stamp'])

            csid_obj.increment_usage()

            logger.info("Simplified invoice %s signed", invoice.invoice_number)
            return True

        except Exception as e:
            logger.error("Error signing simplified invoice: %s", e)
            return False
|
|
|
|
|
|
class SecureKeyStorage:
    """Service for secure key storage.

    Keys are encrypted with Fernet and the resulting token is wrapped in one
    further layer of standard base64 before being returned as text.
    """

    @staticmethod
    def _build_fernet(encryption_key: str):
        """Create the Fernet cipher for ``encryption_key`` (a Fernet key as text)."""
        return Fernet(encryption_key.encode())

    @staticmethod
    def encrypt_key(key_data: bytes, encryption_key: str) -> str:
        """
        Encrypt private key for storage.

        Args:
            key_data: Key data to encrypt
            encryption_key: Encryption key (text form of a Fernet key)

        Returns:
            str: Base64 encoded encrypted key

        Raises:
            Exception: Any error (invalid Fernet key, wrong ``key_data`` type,
                ...) is logged and re-raised unchanged.
        """
        try:
            token = SecureKeyStorage._build_fernet(encryption_key).encrypt(key_data)

            # Fernet tokens are already URL-safe base64; the extra layer is
            # kept because decrypt_key expects it.
            return base64.b64encode(token).decode('utf-8')

        except Exception as e:
            logger.error("Error encrypting key: %s", e)
            raise

    @staticmethod
    def decrypt_key(encrypted_key_base64: str, encryption_key: str) -> bytes:
        """
        Decrypt private key from storage.

        Args:
            encrypted_key_base64: Base64 encoded encrypted key, as produced
                by ``encrypt_key``
            encryption_key: Encryption key (text form of a Fernet key)

        Returns:
            bytes: Decrypted key data

        Raises:
            cryptography.fernet.InvalidToken: If the token cannot be
                authenticated (wrong key, corrupted or tampered data).
            Exception: Any other error (invalid Fernet key, malformed base64)
                is logged and re-raised unchanged.
        """
        try:
            fernet = SecureKeyStorage._build_fernet(encryption_key)
            token = base64.b64decode(encrypted_key_base64)
            return fernet.decrypt(token)

        except InvalidToken:
            # InvalidToken has no message, so formatting it would log an
            # empty reason; spell out what it means.
            logger.error(
                "Error decrypting key: invalid token "
                "(wrong encryption key or corrupted/tampered data)"
            )
            raise

        except Exception as e:
            logger.error("Error decrypting key: %s", e)
            raise
|