agdar/finance/zatca_tasks.py
2025-11-02 14:35:35 +03:00

561 lines
18 KiB
Python

"""
Celery tasks for ZATCA e-invoice operations.
This module handles:
- Automated invoice submission
- Retry logic for failed submissions
- CSID renewal reminders
- Compliance monitoring
"""
import logging
from datetime import datetime, timedelta
from typing import Dict
from celery import shared_task
from django.utils import timezone
logger = logging.getLogger(__name__)
@shared_task(bind=True, max_retries=5, default_retry_delay=300)
def submit_invoice_to_zatca(self, invoice_id: str, use_sandbox: bool = True):
"""
Submit invoice to ZATCA for clearance or reporting.
Args:
invoice_id: Invoice UUID
use_sandbox: Whether to use sandbox environment
Returns:
Dict: Submission result
"""
from finance.models import Invoice
from finance.zatca_service import ZATCAService
from finance.csid_manager import CSIDManager
try:
# Get invoice
invoice = Invoice.objects.get(id=invoice_id)
# Get active CSID
csid_obj = CSIDManager.get_active_csid(invoice.tenant)
if not csid_obj:
logger.error(f"No active CSID found for invoice {invoice.invoice_number}")
return {
'success': False,
'error': 'No active CSID found. Please onboard your EGS unit.'
}
# Initialize ZATCA service
zatca = ZATCAService(use_sandbox=use_sandbox)
# Determine submission type based on invoice type
if invoice.invoice_type in ['STANDARD', 'STANDARD_DEBIT', 'STANDARD_CREDIT']:
# Submit for clearance (B2B)
success, response = zatca.submit_for_clearance(
invoice=invoice,
csid=csid_obj.certificate,
secret=csid_obj.secret
)
submission_type = 'CLEARANCE'
else:
# Submit for reporting (B2C)
success, response = zatca.submit_for_reporting(
invoice=invoice,
csid=csid_obj.certificate,
secret=csid_obj.secret
)
submission_type = 'REPORTING'
# Update invoice with response
if success:
invoice.zatca_status = 'CLEARED' if submission_type == 'CLEARANCE' else 'REPORTED'
invoice.zatca_submission_date = timezone.now()
invoice.zatca_response = response
# Update CSID usage
csid_obj.increment_usage()
# If clearance, update XML with ZATCA stamp
if submission_type == 'CLEARANCE' and 'clearedInvoice' in response:
invoice.xml_content = response['clearedInvoice']
invoice.save()
logger.info(
f"Invoice {invoice.invoice_number} {submission_type.lower()} successful"
)
return {
'success': True,
'submission_type': submission_type,
'response': response
}
else:
# Handle failure
invoice.zatca_status = 'FAILED'
invoice.zatca_response = response
invoice.save()
# Retry if it's a temporary error
if response.get('status_code') in [500, 503, 504]:
logger.warning(
f"Temporary error for invoice {invoice.invoice_number}. "
f"Retrying... (Attempt {self.request.retries + 1}/5)"
)
raise self.retry(exc=Exception(f"ZATCA server error: {response}"))
logger.error(
f"Invoice {invoice.invoice_number} {submission_type.lower()} failed: {response}"
)
return {
'success': False,
'submission_type': submission_type,
'error': response
}
except Invoice.DoesNotExist:
logger.error(f"Invoice {invoice_id} not found")
return {'success': False, 'error': 'Invoice not found'}
except Exception as e:
logger.error(f"Error submitting invoice {invoice_id}: {e}")
# Retry on exception
try:
raise self.retry(exc=e)
except self.MaxRetriesExceededError:
logger.error(f"Max retries exceeded for invoice {invoice_id}")
return {'success': False, 'error': str(e)}
@shared_task
def batch_submit_pending_invoices(tenant_id: str, use_sandbox: bool = True):
"""
Submit all pending invoices for a tenant.
Args:
tenant_id: Tenant UUID
use_sandbox: Whether to use sandbox environment
Returns:
Dict: Batch submission results
"""
from finance.models import Invoice
from core.models import Tenant
try:
tenant = Tenant.objects.get(id=tenant_id)
# Get pending invoices (issued but not submitted to ZATCA)
pending_invoices = Invoice.objects.filter(
tenant=tenant,
status=Invoice.Status.ISSUED,
zatca_status__in=['', 'FAILED']
)
results = {
'total': pending_invoices.count(),
'submitted': 0,
'failed': 0,
'errors': []
}
for invoice in pending_invoices:
# Submit asynchronously
result = submit_invoice_to_zatca.delay(str(invoice.id), use_sandbox)
# Track results
if result.get('success'):
results['submitted'] += 1
else:
results['failed'] += 1
results['errors'].append({
'invoice': invoice.invoice_number,
'error': result.get('error')
})
logger.info(
f"Batch submission for tenant {tenant.name}: "
f"{results['submitted']} submitted, {results['failed']} failed"
)
return results
except Exception as e:
logger.error(f"Error in batch submission: {e}")
return {'success': False, 'error': str(e)}
@shared_task
def check_csid_expiry():
"""
Check all CSIDs for expiry and send renewal reminders.
This task should run daily.
"""
from finance.models import CSID
from notifications.models import Notification
try:
# Get CSIDs expiring within 30 days
expiring_soon = CSID.objects.filter(
status=CSID.Status.ACTIVE,
expiry_date__lte=timezone.now() + timedelta(days=30),
expiry_date__gt=timezone.now()
)
for csid in expiring_soon:
days_left = csid.days_until_expiry
# Create notification for tenant admin
Notification.objects.create(
tenant=csid.tenant,
title=f"CSID Renewal Required",
message=f"CSID '{csid.common_name}' expires in {days_left} days. Please renew to continue e-invoicing.",
notification_type='WARNING',
priority='HIGH'
)
logger.info(f"Renewal reminder sent for CSID {csid.id} ({days_left} days left)")
# Mark expired CSIDs
expired = CSID.objects.filter(
status=CSID.Status.ACTIVE,
expiry_date__lte=timezone.now()
)
expired_count = expired.update(status=CSID.Status.EXPIRED)
logger.info(f"Marked {expired_count} CSIDs as expired")
return {
'expiring_soon': expiring_soon.count(),
'expired': expired_count
}
except Exception as e:
logger.error(f"Error checking CSID expiry: {e}")
return {'success': False, 'error': str(e)}
@shared_task
def validate_invoice_sequence(tenant_id: str):
"""
Validate invoice counter sequence for a tenant.
Args:
tenant_id: Tenant UUID
Returns:
Dict: Validation results
"""
from finance.csid_manager import InvoiceCounterManager
from core.models import Tenant
try:
tenant = Tenant.objects.get(id=tenant_id)
# Validate sequence
is_valid, gaps = InvoiceCounterManager.validate_counter_sequence(tenant)
if not is_valid:
logger.warning(
f"Invoice sequence validation failed for tenant {tenant.name}. "
f"Gaps found: {gaps}"
)
# Create notification
from notifications.models import Notification
Notification.objects.create(
tenant=tenant,
title="Invoice Sequence Gap Detected",
message=f"Gaps detected in invoice sequence: {gaps}. Please investigate.",
notification_type='WARNING',
priority='HIGH'
)
return {
'tenant': tenant.name,
'is_valid': is_valid,
'gaps': gaps
}
except Exception as e:
logger.error(f"Error validating invoice sequence: {e}")
return {'success': False, 'error': str(e)}
@shared_task
def retry_failed_submissions(tenant_id: str = None, use_sandbox: bool = True):
"""
Retry failed ZATCA submissions.
Args:
tenant_id: Optional tenant UUID (if None, retry for all tenants)
use_sandbox: Whether to use sandbox environment
Returns:
Dict: Retry results
"""
from finance.models import Invoice
from core.models import Tenant
try:
# Get failed invoices
failed_invoices = Invoice.objects.filter(
zatca_status='FAILED',
status=Invoice.Status.ISSUED
)
if tenant_id:
tenant = Tenant.objects.get(id=tenant_id)
failed_invoices = failed_invoices.filter(tenant=tenant)
results = {
'total': failed_invoices.count(),
'retried': 0,
'still_failed': 0
}
for invoice in failed_invoices:
# Retry submission
result = submit_invoice_to_zatca.delay(str(invoice.id), use_sandbox)
if result.get('success'):
results['retried'] += 1
else:
results['still_failed'] += 1
logger.info(f"Retry results: {results}")
return results
except Exception as e:
logger.error(f"Error retrying failed submissions: {e}")
return {'success': False, 'error': str(e)}
@shared_task
def generate_compliance_report(tenant_id: str, start_date: str, end_date: str):
"""
Generate ZATCA compliance report for a tenant.
Args:
tenant_id: Tenant UUID
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
Returns:
Dict: Compliance report
"""
from finance.models import Invoice
from core.models import Tenant
from datetime import datetime
try:
tenant = Tenant.objects.get(id=tenant_id)
start = datetime.strptime(start_date, '%Y-%m-%d').date()
end = datetime.strptime(end_date, '%Y-%m-%d').date()
# Get invoices for period
invoices = Invoice.objects.filter(
tenant=tenant,
issue_date__gte=start,
issue_date__lte=end
)
report = {
'tenant': tenant.name,
'period': f"{start_date} to {end_date}",
'total_invoices': invoices.count(),
'by_type': {},
'by_status': {},
'zatca_submission': {
'cleared': invoices.filter(zatca_status='CLEARED').count(),
'reported': invoices.filter(zatca_status='REPORTED').count(),
'failed': invoices.filter(zatca_status='FAILED').count(),
'pending': invoices.filter(zatca_status='').count(),
},
'compliance_issues': []
}
# Count by invoice type
for inv_type, _ in Invoice.InvoiceType.choices:
count = invoices.filter(invoice_type=inv_type).count()
if count > 0:
report['by_type'][inv_type] = count
# Count by status
for status, _ in Invoice.Status.choices:
count = invoices.filter(status=status).count()
if count > 0:
report['by_status'][status] = count
# Check for compliance issues
# 1. Invoices not submitted to ZATCA
not_submitted = invoices.filter(
status=Invoice.Status.ISSUED,
zatca_status=''
)
if not_submitted.exists():
report['compliance_issues'].append({
'issue': 'Invoices not submitted to ZATCA',
'count': not_submitted.count(),
'invoices': list(not_submitted.values_list('invoice_number', flat=True)[:10])
})
# 2. Failed submissions
failed = invoices.filter(zatca_status='FAILED')
if failed.exists():
report['compliance_issues'].append({
'issue': 'Failed ZATCA submissions',
'count': failed.count(),
'invoices': list(failed.values_list('invoice_number', flat=True)[:10])
})
# 3. Simplified invoices not reported within 24 hours
late_reporting = invoices.filter(
invoice_type__in=['SIMPLIFIED', 'SIMPLIFIED_DEBIT', 'SIMPLIFIED_CREDIT'],
zatca_status='',
created_at__lt=timezone.now() - timedelta(hours=24)
)
if late_reporting.exists():
report['compliance_issues'].append({
'issue': 'Simplified invoices not reported within 24 hours',
'count': late_reporting.count(),
'invoices': list(late_reporting.values_list('invoice_number', flat=True)[:10])
})
logger.info(f"Compliance report generated for tenant {tenant.name}")
return report
except Exception as e:
logger.error(f"Error generating compliance report: {e}")
return {'success': False, 'error': str(e)}
@shared_task
def auto_submit_simplified_invoices():
"""
Automatically submit simplified invoices that haven't been reported.
This task should run every hour to ensure 24-hour reporting compliance.
"""
from finance.models import Invoice
try:
# Get simplified invoices that need reporting
# (issued but not yet reported to ZATCA)
pending = Invoice.objects.filter(
invoice_type__in=['SIMPLIFIED', 'SIMPLIFIED_DEBIT', 'SIMPLIFIED_CREDIT'],
status=Invoice.Status.ISSUED,
zatca_status__in=['', 'FAILED']
)
results = {
'total': pending.count(),
'submitted': 0,
'failed': 0
}
for invoice in pending:
# Check if within 24 hours
hours_since_issue = (timezone.now() - invoice.created_at).total_seconds() / 3600
if hours_since_issue < 24:
# Submit for reporting
result = submit_invoice_to_zatca.delay(str(invoice.id))
if result.get('success'):
results['submitted'] += 1
else:
results['failed'] += 1
logger.info(f"Auto-submit results: {results}")
return results
except Exception as e:
logger.error(f"Error in auto-submit task: {e}")
return {'success': False, 'error': str(e)}
@shared_task
def monitor_zatca_compliance():
"""
Monitor overall ZATCA compliance across all tenants.
This task should run daily.
"""
from finance.models import Invoice, CSID
from core.models import Tenant
try:
report = {
'timestamp': timezone.now().isoformat(),
'tenants': [],
'summary': {
'total_tenants': 0,
'compliant_tenants': 0,
'non_compliant_tenants': 0,
'total_invoices_24h': 0,
'submitted_24h': 0,
'failed_24h': 0
}
}
# Check each tenant
for tenant in Tenant.objects.all():
# Check CSID status
active_csid = CSID.objects.filter(
tenant=tenant,
status=CSID.Status.ACTIVE
).first()
# Get invoices from last 24 hours
recent_invoices = Invoice.objects.filter(
tenant=tenant,
created_at__gte=timezone.now() - timedelta(hours=24)
)
submitted = recent_invoices.filter(
zatca_status__in=['CLEARED', 'REPORTED']
).count()
failed = recent_invoices.filter(zatca_status='FAILED').count()
tenant_report = {
'name': tenant.name,
'has_active_csid': active_csid is not None,
'csid_expires_in_days': active_csid.days_until_expiry if active_csid else None,
'invoices_24h': recent_invoices.count(),
'submitted_24h': submitted,
'failed_24h': failed,
'compliance_rate': (submitted / recent_invoices.count() * 100) if recent_invoices.count() > 0 else 100
}
report['tenants'].append(tenant_report)
# Update summary
report['summary']['total_tenants'] += 1
report['summary']['total_invoices_24h'] += recent_invoices.count()
report['summary']['submitted_24h'] += submitted
report['summary']['failed_24h'] += failed
if tenant_report['compliance_rate'] >= 95:
report['summary']['compliant_tenants'] += 1
else:
report['summary']['non_compliant_tenants'] += 1
logger.info(f"ZATCA compliance monitoring completed: {report['summary']}")
return report
except Exception as e:
logger.error(f"Error in compliance monitoring: {e}")
return {'success': False, 'error': str(e)}