kaauh_ats/recruitment/tasks/email_tasks.py
2025-12-14 12:47:27 +03:00

307 lines
11 KiB
Python

"""
Background email tasks for Django-Q integration.
"""
import logging
from typing import Dict, Any
from django_q.tasks import async_task
from .services.email_service import UnifiedEmailService
from .dto.email_dto import EmailConfig, BulkEmailConfig, EmailTemplate, EmailResult
from .email_templates import EmailTemplates
logger = logging.getLogger(__name__)
def send_email_task(email_config_dict: Dict[str, Any]) -> Dict[str, Any]:
    """
    Background task for sending individual emails.

    Rebuilds an EmailConfig from its dictionary form, attaches the sender and
    job objects when their IDs are supplied and can be found, and sends the
    message through UnifiedEmailService.

    Args:
        email_config_dict: Dictionary representation of EmailConfig.
            "to_email" and "subject" are required. "template_name",
            "context", "html_content", "attachments", "priority"
            (defaults to "normal"), "cc_emails", "bcc_emails", "reply_to",
            "sender_id" and "job_id" are optional.

    Returns:
        Dict with task result. When the service call completes it carries
        "success", "message", "recipient_count" and "error_details" copied
        from the service result. When any exception is raised along the way
        it carries "success" (False), "message" and "error_details"; the
        exception is logged and not re-raised.
    """
    try:
        # EmailPriority is not part of the module-level import from
        # .dto.email_dto, so it is imported here (the same way
        # send_interview_email_task does). Without it the priority lookup
        # below raises NameError and every task run reports failure.
        from .dto.email_dto import EmailPriority

        # Reconstruct EmailConfig from dictionary
        config = EmailConfig(
            to_email=email_config_dict["to_email"],
            subject=email_config_dict["subject"],
            template_name=email_config_dict.get("template_name"),
            context=email_config_dict.get("context", {}),
            html_content=email_config_dict.get("html_content"),
            attachments=email_config_dict.get("attachments", []),
            priority=EmailPriority(email_config_dict.get("priority", "normal")),
            cc_emails=email_config_dict.get("cc_emails", []),
            bcc_emails=email_config_dict.get("bcc_emails", []),
            reply_to=email_config_dict.get("reply_to"),
        )

        # Attach the sender object if an ID was provided. A missing user is
        # only logged: the email is still sent without a sender attached.
        sender_id = email_config_dict.get("sender_id")
        if sender_id:
            from django.contrib.auth import get_user_model

            User = get_user_model()
            try:
                config.sender = User.objects.get(id=sender_id)
            except User.DoesNotExist:
                logger.warning("Sender user %s not found", sender_id)

        # Attach the job object if an ID was provided; same best-effort
        # policy as for the sender.
        job_id = email_config_dict.get("job_id")
        if job_id:
            from .models import JobPosting

            try:
                config.job = JobPosting.objects.get(id=job_id)
            except JobPosting.DoesNotExist:
                logger.warning("Job %s not found", job_id)

        # Send email using unified service
        service = UnifiedEmailService()
        result = service.send_email(config)
        return {
            "success": result.success,
            "message": result.message,
            "recipient_count": result.recipient_count,
            "error_details": result.error_details,
        }
    except Exception as e:
        # Task boundary: report the failure as a result dict instead of
        # letting the exception escape the task function.
        error_msg = f"Background email task failed: {str(e)}"
        logger.error(error_msg, exc_info=True)
        return {"success": False, "message": error_msg, "error_details": str(e)}
def send_bulk_email_task(*args, **kwargs) -> Dict[str, Any]:
    """
    Background task for sending bulk emails.

    Supports both old parameter format and new BulkEmailConfig format for
    backward compatibility.

    New format: exactly one positional argument that is a dictionary with
    "subject" and "recipients_data" (required) and "template_name",
    "attachments", "priority" (defaults to "normal"), "sender_id" and
    "job_id" (optional).

    Old format: keyword arguments "subject" and "customized_sends"
    (required) plus "attachments", "sender_user_id" and "job_id" (optional).
    Each entry of "customized_sends" is either a dictionary with "email",
    "name" and "personalization" keys or a bare email address. Positional
    arguments are not read in this format.

    Args:
        *args: A single BulkEmailConfig dictionary (new format)
        **kwargs: Individual parameters (old format)

    Returns:
        Dict with task result. When the service call completes it carries
        "success", "message", "recipient_count" and "error_details" copied
        from the service result. An old-format call without a subject or
        recipients returns "success" (False) and "message" only. When any
        exception is raised it carries "success" (False), "message" and
        "error_details"; the exception is logged and not re-raised.
    """
    try:
        # EmailPriority is not part of the module-level import from
        # .dto.email_dto, so it is imported here (the same way
        # send_interview_email_task does). Without it both branches below
        # raise NameError and every task run reports failure.
        from .dto.email_dto import EmailPriority

        # Handle both old format and new BulkEmailConfig format
        if len(args) == 1 and isinstance(args[0], dict):
            # New format: BulkEmailConfig dictionary
            bulk_config_dict = args[0]
            config = BulkEmailConfig(
                subject=bulk_config_dict["subject"],
                template_name=bulk_config_dict.get("template_name"),
                recipients_data=bulk_config_dict["recipients_data"],
                attachments=bulk_config_dict.get("attachments", []),
                priority=EmailPriority(bulk_config_dict.get("priority", "normal")),
                async_send=False,  # Force sync processing in background
            )
            sender_id = bulk_config_dict.get("sender_id")
            job_id = bulk_config_dict.get("job_id")
        else:
            # Old format: individual parameters
            subject = kwargs.get("subject")
            customized_sends = kwargs.get("customized_sends", [])
            attachments = kwargs.get("attachments")
            sender_id = kwargs.get("sender_user_id")
            job_id = kwargs.get("job_id")
            if not subject or not customized_sends:
                return {"success": False, "message": "Missing required parameters"}

            # Convert old format to BulkEmailConfig
            recipients_data = []
            for send_data in customized_sends:
                if isinstance(send_data, dict):
                    email = send_data.get("email")
                    # Default display name is the local part of the address.
                    # `or ""` keeps an explicit None email from raising
                    # TypeError and failing the whole batch; split() already
                    # returns the full string when there is no "@".
                    default_name = (email or "").split("@")[0]
                    recipients_data.append(
                        {
                            "email": email,
                            "name": send_data.get("name", default_name),
                            "personalization": send_data.get("personalization", {}),
                        }
                    )
                else:
                    # Handle legacy format where customized_sends might be list of emails
                    recipients_data.append(
                        {
                            "email": send_data,
                            "name": send_data.split("@")[0]
                            if "@" in send_data
                            else send_data,
                        }
                    )

            config = BulkEmailConfig(
                subject=subject,
                recipients_data=recipients_data,
                attachments=attachments or [],
                priority=EmailPriority.NORMAL,
                async_send=False,  # Force sync processing in background
            )

        # Attach sender and job objects if IDs were provided (either format).
        # A missing row is only logged: the emails are still sent without it.
        if sender_id:
            from django.contrib.auth import get_user_model

            User = get_user_model()
            try:
                config.sender = User.objects.get(id=sender_id)
            except User.DoesNotExist:
                logger.warning("Sender user %s not found", sender_id)
        if job_id:
            from .models import JobPosting

            try:
                config.job = JobPosting.objects.get(id=job_id)
            except JobPosting.DoesNotExist:
                logger.warning("Job %s not found", job_id)

        # Send bulk emails using unified service
        service = UnifiedEmailService()
        result = service.send_bulk_emails(config)
        return {
            "success": result.success,
            "message": result.message,
            "recipient_count": result.recipient_count,
            "error_details": result.error_details,
        }
    except Exception as e:
        # Task boundary: report the failure as a result dict instead of
        # letting the exception escape the task function.
        error_msg = f"Background bulk email task failed: {str(e)}"
        logger.error(error_msg, exc_info=True)
        return {"success": False, "message": error_msg, "error_details": str(e)}
def send_interview_email_task(interview_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Background task that emails an interview invitation to the candidate.

    Args:
        interview_data: Dictionary with interview details; its
            "interview_id" entry selects the ScheduledInterview to use.

    Returns:
        Dict with task result: "success", "message", "recipient_count" and
        "error_details" from the email service plus "interview_id". If
        anything raises, the dict holds "success" (False), "message",
        "error_details" and "interview_id" instead.
    """
    try:
        from .models import ScheduledInterview
        from .dto.email_dto import EmailConfig, EmailTemplate, EmailPriority

        # Resolve the interview row; a missing or unknown ID is an error.
        interview_id = interview_data.get("interview_id")
        if not interview_id:
            raise ValueError("interview_id is required")
        try:
            interview = ScheduledInterview.objects.get(id=interview_id)
        except ScheduledInterview.DoesNotExist:
            raise ValueError(f"Interview {interview_id} not found")

        service = UnifiedEmailService()

        # Gather the template context inputs.
        build_context = service.template_manager.build_interview_context
        candidate = interview.candidate
        job = interview.job
        topic = f"Interview for {job.title}"
        scheduled_for = interview.interview_date
        zoom = interview.zoom_meeting
        if zoom:
            join_url = zoom.join_url
            meeting_id = zoom.meeting_id
        else:
            # No meeting attached: both fields are sent as empty strings.
            join_url = ""
            meeting_id = ""
        meeting_details = {
            "topic": topic,
            "date_time": scheduled_for,
            "duration": "60 minutes",
            "join_url": join_url,
            "meeting_id": meeting_id,
        }
        context = build_context(candidate, job, meeting_details)

        # Assemble the message configuration.
        recipient = candidate.email
        invitation = EmailTemplate.INTERVIEW_INVITATION_ALT
        subject_line = service.template_manager.get_subject_line(invitation, context)
        config = EmailConfig(
            to_email=recipient,
            subject=subject_line,
            template_name=invitation.value,
            context=context,
            priority=EmailPriority.HIGH,
        )

        # Deliver and report.
        outcome = service.send_email(config)
        report = {
            "success": outcome.success,
            "message": outcome.message,
            "recipient_count": outcome.recipient_count,
            "error_details": outcome.error_details,
            "interview_id": interview_id,
        }
        return report
    except Exception as exc:
        error_msg = f"Interview email task failed: {str(exc)}"
        logger.error(error_msg, exc_info=True)
        failure = {
            "success": False,
            "message": error_msg,
            "error_details": str(exc),
            "interview_id": interview_data.get("interview_id"),
        }
        return failure
def email_success_hook(task):
    """
    Hook that logs the outcome of a finished email task.

    Logs at error level when the task did not succeed, otherwise at info
    level; both messages include the task id and its result.

    Args:
        task: Django-Q task object
    """
    if not task.success:
        logger.error(f"Email task {task.id} failed: {task.result}")
        return
    logger.info(f"Email task {task.id} completed successfully: {task.result}")
def email_failure_hook(task):
    """
    Hook that records an email task failure in the log.

    Args:
        task: Django-Q task object
    """
    failure_message = f"Email task {task.id} failed after retries: {task.result}"
    logger.error(failure_message)
    # Extension point for further failure handling, e.g. notifying an admin
    # or forwarding the failure to an external system.