import re
import os
import json
import logging
import requests
from PyPDF2 import PdfReader
from datetime import datetime

from django.conf import settings
from django.core.mail import EmailMultiAlternatives
from django.db import transaction
from django.shortcuts import get_object_or_404
from django.utils import timezone
from django.utils.html import strip_tags

from recruitment.models import Application
from .linkedin_service import LinkedInService
from .models import InterviewSchedule, JobPosting, ScheduledInterview, ZoomMeetingDetails
from .utils import create_zoom_meeting

logger = logging.getLogger(__name__)

# Optional dependency: python-docx is required for Word (.docx) processing.
try:
    from docx import Document
    DOCX_AVAILABLE = True
except ImportError:
    DOCX_AVAILABLE = False
    logger.warning("python-docx not available. Word document processing will be disabled.")

# Read the API key from the environment rather than hardcoding a secret in source control.
OPENROUTER_API_KEY = os.environ.get('OPENROUTER_API_KEY', '')
OPENROUTER_MODEL = 'x-ai/grok-code-fast-1'
# Alternative models:
# OPENROUTER_MODEL = 'openai/gpt-oss-20b:free'
# OPENROUTER_MODEL = 'openai/gpt-oss-20b'
# OPENROUTER_MODEL = 'mistralai/mistral-small-3.2-24b-instruct:free'

# Alternative Gemini client (unused; key redacted from source control):
# from google import genai
# client = genai.Client(api_key=os.environ.get("GEMINI_API_KEY", ""))
# def google_ai(text):
#     response = client.models.generate_content(
#         model="gemini-2.5-flash", contents=text
#     )
#     return response

if not OPENROUTER_API_KEY:
    logger.warning("OPENROUTER_API_KEY not set. Resume scoring will be skipped.")

def extract_text_from_pdf(file_path):
    """Extract text from PDF files."""
    logger.debug("PDF text extraction")
    text = ""
    try:
        with open(file_path, "rb") as f:
            reader = PdfReader(f)
            for page in reader.pages:
                text += (page.extract_text() or "")
    except Exception as e:
        logger.error(f"PDF extraction failed: {e}")
        raise
    return text.strip()

def extract_text_from_word(file_path):
    """Extract text from Word documents (.docx)."""
    if not DOCX_AVAILABLE:
        raise ImportError("python-docx is not installed. Please install it with: pip install python-docx")
    logger.debug("Word text extraction")
    text = ""
    try:
        doc = Document(file_path)
        # Extract text from paragraphs
        for paragraph in doc.paragraphs:
            text += paragraph.text + "\n"
        # Extract text from tables (cells tab-separated, one row per line)
        for table in doc.tables:
            for row in table.rows:
                for cell in row.cells:
                    text += cell.text + "\t"
                text += "\n"
        # Extract text from headers and footers
        for section in doc.sections:
            if section.header:
                for paragraph in section.header.paragraphs:
                    text += "[HEADER] " + paragraph.text + "\n"
            if section.footer:
                for paragraph in section.footer.paragraphs:
                    text += "[FOOTER] " + paragraph.text + "\n"
    except Exception as e:
        logger.error(f"Word extraction failed: {e}")
        raise
    return text.strip()

def extract_text_from_document(file_path):
    """Extract text from documents based on file type."""
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")
    file_ext = os.path.splitext(file_path)[1].lower()
    if file_ext == '.pdf':
        return extract_text_from_pdf(file_path)
    elif file_ext == '.docx':
        return extract_text_from_word(file_path)
    else:
        raise ValueError(f"Unsupported file type: {file_ext}. Only .pdf and .docx files are supported.")

def format_job_description(pk):
    job_posting = JobPosting.objects.get(pk=pk)
    prompt = f"""
You are a dual-purpose AI assistant specializing in content formatting and social media copywriting for job announcements.
**JOB POSTING DATA (Raw Input):**
---
**JOB DESCRIPTION:**
{job_posting.description}
**QUALIFICATIONS:**
{job_posting.qualifications}
**BENEFITS:**
{job_posting.benefits}
**APPLICATION INSTRUCTIONS:**
{job_posting.application_instructions}
**APPLICATION DEADLINE:**
{job_posting.application_deadline}
**HASHTAGS (for search and reach):**
{job_posting.hash_tags}
**APPLICATION URL (career page; include only if provided):**
{job_posting.application_url}
---
**TASK 1: HTML Formatting (Four Blocks)**
1. **Format the Job Description:** Organize the raw JOB DESCRIPTION data into clear, readable sections using `<h2>` headings and `<ul>`/`<li>` bullet points. Encapsulate the entire formatted block within a single `<div>`.
2. **Format the Qualifications:** Organize the raw QUALIFICATIONS data the same way, within a single `<div>`.
3. **Format the Benefits:** Organize the raw BENEFITS data the same way, within a single `<div>`.
4. **Format the Application Instructions:** Organize the raw APPLICATION INSTRUCTIONS data the same way, within a single `<div>`.
**TASK 2: LinkedIn Post Creation**
1. **Write the Post:** Create an engaging, professional, and concise LinkedIn post (maximum 1300 characters) summarizing the opportunity.
2. **Encourage Action:** The post must have a strong call-to-action (CTA) encouraging applications.
3. **Use Hashtags:** Integrate relevant industry, role, and company hashtags (including any provided in the raw input) naturally at the end of the post.
**STRICT JSON OUTPUT INSTRUCTIONS:**
Output a **single, valid JSON object** with **ONLY** the following five top-level key-value pairs.
* The values for the four `html_*` keys MUST be the complete, formatted HTML strings (including all tags).
* The value for `linkedin_post_data` MUST be the complete, final LinkedIn post as a single string (maximum 1300 characters).
**Output Keys:**
1. `html_job_description`
2. `html_qualifications`
3. `html_benefits`
4. `html_application_instructions`
5. `linkedin_post_data`
**Do not include any other text, explanation, or markdown outside of the final JSON object.**
"""
    result = ai_handler(prompt)
    if result['status'] == 'error':
        logger.error(f"AI handler returned error for job posting {job_posting.pk}")
        return
    data = result['data']
    if isinstance(data, str):
        data = json.loads(data)
    job_posting.description = data.get('html_job_description')
    job_posting.qualifications = data.get('html_qualifications')
    job_posting.benefits = data.get('html_benefits')
    job_posting.application_instructions = data.get('html_application_instructions')
    job_posting.linkedin_post_formated_data = data.get('linkedin_post_data')
    job_posting.save(update_fields=[
        'description', 'qualifications', 'benefits',
        'application_instructions', 'linkedin_post_formated_data',
    ])

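# Expected model output for format_job_description (keys per the prompt above;
# values illustrative):
#
#     {
#         "html_job_description": "<div><h2>About the Role</h2><ul><li>...</li></ul></div>",
#         "html_qualifications": "<div><h2>Qualifications</h2><ul><li>...</li></ul></div>",
#         "html_benefits": "<div><h2>Benefits</h2><ul><li>...</li></ul></div>",
#         "html_application_instructions": "<div><h2>How to Apply</h2><ul><li>...</li></ul></div>",
#         "linkedin_post_data": "We're hiring! ... #Hiring #Careers"
#     }
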
def ai_handler(prompt):
    """Send a prompt to OpenRouter and return the model's reply parsed as JSON."""
    logger.debug("model call")
    response = requests.post(
        url="https://openrouter.ai/api/v1/chat/completions",
        headers={
            "Authorization": f"Bearer {OPENROUTER_API_KEY}",
            "Content-Type": "application/json",
        },
        json={
            "model": OPENROUTER_MODEL,
            "messages": [{"role": "user", "content": prompt}],
        },
        timeout=120,
    )
    logger.debug(response.status_code)
    if response.status_code == 200:
        res = response.json()
        content = res["choices"][0]['message']['content']
        try:
            # Strip markdown code fences before parsing the JSON payload.
            content = content.replace("```json", "").replace("```", "")
            data = json.loads(content)
            logger.debug("success response")
            return {"status": "success", "data": data}
        except Exception as e:
            logger.error(f"Failed to parse model output as JSON: {e}")
            return {"status": "error", "data": str(e)}
    else:
        logger.error("error response from OpenRouter")
        return {"status": "error", "data": response.json()}

def safe_cast_to_float(value, default=0.0):
    """Safely converts a value (int, float, or string) to a float."""
    if isinstance(value, (int, float)):
        return float(value)
    if isinstance(value, str):
        # Remove non-numeric characters except the decimal point
        # (note: this also drops any minus sign, so negatives become positive).
        cleaned_value = re.sub(r'[^\d.]', '', value)
        try:
            # Ensure we handle empty strings after cleaning
            return float(cleaned_value) if cleaned_value else default
        except ValueError:
            return default
    return default

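# Examples (derived from the logic above):
#
#     safe_cast_to_float(6.5)        -> 6.5
#     safe_cast_to_float("6.5 yrs")  -> 6.5   # non-numeric characters stripped
#     safe_cast_to_float("N/A")      -> 0.0   # falls back to the default
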
def handle_resume_parsing_and_scoring(pk):
    """
    Optimized Django-Q task to parse a resume, score the candidate against a job,
    and atomically save the results.
    """
    # --- 1. Robust Object Retrieval (Prevents looping on DoesNotExist) ---
    try:
        instance = Application.objects.get(pk=pk)
    except Application.DoesNotExist:
        # Exit gracefully if the candidate was deleted after the task was queued
        logger.warning(f"Candidate matching query does not exist for pk={pk}. Exiting task.")
        return
    logger.info(f"Scoring resume for candidate {pk}")
    # --- 2. I/O and Initial Data Check ---
    try:
        file_path = instance.resume.path
        if not os.path.exists(file_path):
            logger.warning(f"Resume file not found: {file_path}")
            # Mark the task as unsuccessful but don't re-queue.
            return
        # Use the unified document parser
        resume_text = extract_text_from_document(file_path)
        job_detail = f"{instance.job.description} {instance.job.qualifications}"
    except Exception as e:
        logger.error(f"Error during initial data retrieval/parsing for candidate {instance.pk}: {e}")
        return
    logger.debug(resume_text)
    # --- 3. Single, Combined LLM Prompt (Major Cost & Latency Optimization) ---
    prompt = f"""
You are an expert AI system functioning as both a Resume Parser and a Technical Recruiter.
Your task is to:
1. **PARSE**: Extract all key-value information from the provided RESUME TEXT into a clean JSON structure under the key 'resume_data', preserving the original text and its formatting, and don't add any extra text.
2. **SCORE**: Analyze the parsed data against the JOB CRITERIA and generate a comprehensive score and analysis under the key 'analysis_data'.
**JOB CRITERIA:**
{job_detail}
**RESUME TEXT:**
{resume_text}
**STRICT JSON OUTPUT INSTRUCTIONS:**
Output a single, valid JSON object with ONLY the following two top-level keys:
1. "resume_data": {{
    "full_name": "Full name of the candidate",
    "current_title": "Most recent or current job title",
    "location": "City and state",
    "contact": "Phone number and email",
    "linkedin": "LinkedIn profile URL",
    "github": "GitHub or portfolio URL",
    "summary": "Brief professional profile or summary (1-2 sentences)",
    "education": [{{
        "institution": "Institution name",
        "degree": "Degree name",
        "year": "Year of graduation (if provided) or ''",
        "gpa": "GPA (if provided)",
        "relevant_courses": ["list", "of", "courses"] (if provided) or []
    }}],
    "skills": {{
        "category_1": ["skill_a", "skill_b"],
        "uncategorized": ["tool_x"]
    }},
    "experience": [{{
        "company": "Company name",
        "job_title": "Job Title",
        "location": "Location",
        "start_date": "YYYY-MM",
        "end_date": "YYYY-MM or Present",
        "key_achievements": ["concise bullet points"] (if provided) or []
    }}],
    "projects": [{{
        "name": "Project name",
        "year": "Year",
        "technologies_used": ["list", "of", "tech"] (if provided) or [],
        "brief_description": "description"
    }}]
}}
2. "analysis_data": {{
    "match_score": "Integer Score 0-100",
    "strengths": "Brief summary of strengths",
    "weaknesses": "Brief summary of weaknesses",
    "years_of_experience": "Total years of experience (float, e.g., 6.5)",
    "criteria_checklist": "Object mapping each job requirement (if any) to 'Met' or 'Not Met', e.g. {{'Python': 'Met', 'AWS': 'Not Met'}}; use only one of ('Met', 'Not Met') per requirement, with no extra text",
    "category": "Most fitting professional field (e.g., 'Data Science'). Output only the category name, no other text. Example: 'Software Development' is correct; 'Software Development and DevOps' or 'Software Development / Backend Development' are wrong.",
    "most_recent_job_title": "Candidate's most recent job title",
    "recommendation": "Detailed hiring recommendation narrative",
    "top_3_keywords": ["keyword1", "keyword2", "keyword3"],
    "job_fit_narrative": "Single, concise summary sentence",
    "language_fluency": ["language: fluency_level"],
    "screening_stage_rating": "Standardized rating (Highly Qualified, Qualified, Partially Qualified, Not Qualified)",
    "min_req_met_bool": "Boolean (true/false)",
    "soft_skills_score": "Integer Score 0-100 for inferred non-technical skills",
    "experience_industry_match": "Integer Score 0-100 for industry relevance",
    "seniority_level_match": "Integer Score 0-100 for alignment with the JD's seniority level",
    "red_flags": ["List of any potential concerns (if any): e.g., 'Employment gap 1 year', 'Frequent job hopping', 'Missing required certification'"],
    "employment_stability_score": "Integer Score 0-100 (higher is more stable/longer tenure) (if possible)",
    "transferable_skills_narrative": "A brief sentence describing the relevance of non-core experience (if applicable).",
    "cultural_fit_keywords": ["A list of 3-5 keywords extracted from the resume (if possible), e.g., 'team-player', 'mentored', 'cross-functional'"]
}}
If a top-level key or its required fields are missing, set the field to null, an empty list, or an empty object as appropriate.
Output only valid JSON—no markdown, no extra text.
"""
    try:
        result = ai_handler(prompt)
        if result['status'] == 'error':
            logger.error(f"AI handler returned error for candidate {instance.pk}")
            return
        # Ensure the result is a Python dict (ai_handler may return a JSON string)
        data = result['data']
        if isinstance(data, str):
            data = json.loads(data)
    except Exception as e:
        logger.error(f"AI handler failed for candidate {instance.pk}: {e}")
        return
    # --- 4. Atomic Database Update (Ensures data integrity) ---
    with transaction.atomic():
        # Save both structured outputs ('resume_data' and 'analysis_data')
        # into the single JSONField for completeness.
        instance.ai_analysis_data = data
        instance.is_resume_parsed = True
        instance.save(update_fields=['ai_analysis_data', 'is_resume_parsed'])
    logger.info(f"Successfully scored and saved analysis for candidate {instance.id}")

def create_interview_and_meeting(
    candidate_id,
    job_id,
    schedule_id,
    slot_date,
    slot_time,
    duration
):
    """
    Synchronous task for a single interview slot, dispatched by django-q.
    """
    try:
        candidate = Application.objects.get(pk=candidate_id)
        job = JobPosting.objects.get(pk=job_id)
        schedule = InterviewSchedule.objects.get(pk=schedule_id)
        interview_datetime = timezone.make_aware(datetime.combine(slot_date, slot_time))
        meeting_topic = f"Interview for {job.title} - {candidate.name}"
        # 1. External API Call (Slow)
        result = create_zoom_meeting(meeting_topic, interview_datetime, duration)
        if result["status"] == "success":
            # 2. Database Writes (Slow)
            zoom_meeting = ZoomMeetingDetails.objects.create(
                topic=meeting_topic,
                start_time=interview_datetime,
                duration=duration,
                meeting_id=result["meeting_details"]["meeting_id"],
                details_url=result["meeting_details"]["join_url"],
                zoom_gateway_response=result["zoom_gateway_response"],
                host_email=result["meeting_details"]["host_email"],
                password=result["meeting_details"]["password"],
                location_type="Remote"
            )
            ScheduledInterview.objects.create(
                application=candidate,
                job=job,
                interview_location=zoom_meeting,
                schedule=schedule,
                interview_date=slot_date,
                interview_time=slot_time
            )
            # Log success or use the Django-Q result system for monitoring
            logger.info(f"Successfully scheduled interview for {candidate.name}")
            return True  # Task succeeded
        else:
            # Handle Zoom API failure (e.g., log it or notify an administrator)
            logger.error(f"Zoom API failed for {candidate.name}: {result['message']}")
            return False  # Task failed
    except Exception as e:
        # Catch any unexpected errors during database lookups or processing
        logger.error(f"Critical error scheduling interview: {e}")
        return False  # Task failed

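# Dispatch sketch (hypothetical IDs; slot_date and slot_time must be
# datetime.date and datetime.time so datetime.combine() works):
#
#     from datetime import date, time
#     from django_q.tasks import async_task
#     async_task(
#         'recruitment.tasks.create_interview_and_meeting',
#         candidate_id=42, job_id=7, schedule_id=3,
#         slot_date=date(2025, 12, 1), slot_time=time(10, 30), duration=45,
#     )
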
def handle_zoom_webhook_event(payload):
    """
    Background task to process a Zoom webhook event and update the local ZoomMeeting status.
    It handles: updated, started, ended, and deleted events.
    """
    event_type = payload.get('event')
    object_data = payload.get('payload', {}).get('object', {})
    # Zoom often uses a long 'id' for the scheduled meeting and sometimes a 'uuid'.
    # We rely on the unique 'id' that maps to the ZoomMeetingDetails.meeting_id field.
    meeting_id_zoom = object_data.get('id')
    if not meeting_id_zoom:
        logger.warning(f"Webhook received without a valid Meeting ID: {event_type}")
        return False
    meeting_id_zoom = str(meeting_id_zoom)
    try:
        # Use filter().first() to avoid exceptions if the meeting doesn't exist yet,
        # and to simplify the logic flow.
        meeting_instance = ZoomMeetingDetails.objects.filter(meeting_id=meeting_id_zoom).first()
        # --- 1. Update Events ---
        if event_type == 'meeting.updated':
            if meeting_instance:
                # Update key fields from the webhook payload; fall back to the
                # current values when a field is absent from the payload.
                meeting_instance.topic = object_data.get('topic', meeting_instance.topic)
                meeting_instance.start_time = object_data.get('start_time', meeting_instance.start_time)
                meeting_instance.duration = object_data.get('duration', meeting_instance.duration)
                meeting_instance.timezone = object_data.get('timezone', meeting_instance.timezone)
                meeting_instance.status = object_data.get('status', meeting_instance.status)
                meeting_instance.save(update_fields=['topic', 'start_time', 'duration', 'timezone', 'status'])
        # --- 2. Status Change Events (Start/End) ---
        elif event_type == 'meeting.started':
            if meeting_instance:
                meeting_instance.status = 'started'
                meeting_instance.save(update_fields=['status'])
        elif event_type == 'meeting.ended':
            if meeting_instance:
                meeting_instance.status = 'ended'
                meeting_instance.save(update_fields=['status'])
        # --- 3. Deletion Event (User Action) ---
        elif event_type == 'meeting.deleted':
            if meeting_instance:
                try:
                    meeting_instance.status = 'cancelled'
                    meeting_instance.save(update_fields=['status'])
                except Exception as e:
                    logger.error(f"Failed to mark Zoom meeting as cancelled: {e}")
        return True
    except Exception as e:
        logger.error(f"Failed to process Zoom webhook for {event_type} (ID: {meeting_id_zoom}): {e}", exc_info=True)
        return False

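# Minimal payload shape this handler expects (field names follow Zoom's meeting
# webhook events; the values here are illustrative):
#
#     {
#         "event": "meeting.updated",
#         "payload": {
#             "object": {
#                 "id": 987654321,
#                 "topic": "Interview for Backend Engineer - Jane Doe",
#                 "start_time": "2025-12-01T10:30:00Z",
#                 "duration": 45,
#                 "timezone": "Asia/Riyadh"
#             }
#         }
#     }
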
def linkedin_post_task(job_slug, access_token):
    """Background task that publishes a job posting to LinkedIn."""
    job = get_object_or_404(JobPosting, slug=job_slug)
    try:
        service = LinkedInService()
        service.access_token = access_token
        # Long-running task
        result = service.create_job_post(job)
        # Update the JobPosting object with the final result
        if result['success']:
            job.posted_to_linkedin = True
            job.linkedin_post_id = result['post_id']
            job.linkedin_post_url = result['post_url']
            job.linkedin_post_status = 'SUCCESS'
            job.linkedin_posted_at = timezone.now()
        else:
            error_msg = result.get('error', "Unknown API error")
            job.linkedin_post_status = 'FAILED'
            logger.error(f"LinkedIn post failed for job {job_slug}: {error_msg}")
        job.save()
        return result['success']
    except Exception as e:
        logger.error(f"Critical error in LinkedIn task for job {job_slug}: {e}", exc_info=True)
        # Update job status with the critical error
        job.linkedin_post_status = f"CRITICAL_ERROR: {str(e)}"
        job.save()
        return False

def form_close(job_id):
    job = get_object_or_404(JobPosting, pk=job_id)
    job.is_active = False
    job.template_form.is_active = False
    job.template_form.save()  # Persist the related form's state; job.save() alone won't.
    job.save()

def sync_hired_candidates_task(job_slug):
    """
    Django-Q background task to sync hired candidates to all configured sources.
    Args:
        job_slug (str): The slug of the job posting
    Returns:
        dict: Sync results with status and details
    """
    from .candidate_sync_service import CandidateSyncService
    from .models import JobPosting, IntegrationLog
    logger.info(f"Starting background sync task for job: {job_slug}")
    try:
        # Get the job posting
        job = JobPosting.objects.get(slug=job_slug)
        # Initialize sync service
        sync_service = CandidateSyncService()
        # Perform the sync operation
        results = sync_service.sync_hired_candidates_to_all_sources(job)
        logger.info(f"Background sync completed for job {job_slug}: {results}")
        return results
    except JobPosting.DoesNotExist:
        error_msg = f"Job posting not found: {job_slug}"
        logger.error(error_msg)
        # Log the error
        IntegrationLog.objects.create(
            source=None,
            action=IntegrationLog.ActionChoices.ERROR,
            endpoint="multi_source_sync",
            method="BACKGROUND_TASK",
            request_data={"job_slug": job_slug},
            error_message=error_msg,
            status_code="ERROR",
            ip_address="127.0.0.1",
            user_agent="Django-Q Background Task"
        )
        return {"status": "error", "message": error_msg}
    except Exception as e:
        error_msg = f"Unexpected error during sync: {str(e)}"
        logger.error(error_msg, exc_info=True)
        # Log the error
        IntegrationLog.objects.create(
            source=None,
            action=IntegrationLog.ActionChoices.ERROR,
            endpoint="multi_source_sync",
            method="BACKGROUND_TASK",
            request_data={"job_slug": job_slug},
            error_message=error_msg,
            status_code="ERROR",
            ip_address="127.0.0.1",
            user_agent="Django-Q Background Task"
        )
        return {"status": "error", "message": error_msg}

def sync_candidate_to_source_task(candidate_id, source_id):
    """
    Django-Q background task to sync a single candidate to a specific source.
    Args:
        candidate_id (int): The ID of the candidate
        source_id (int): The ID of the source
    Returns:
        dict: Sync result for this specific candidate-source pair
    """
    from .candidate_sync_service import CandidateSyncService
    from .models import Application, Source, IntegrationLog
    logger.info(f"Starting sync task for candidate {candidate_id} to source {source_id}")
    try:
        # Get the candidate and source
        application = Application.objects.get(pk=candidate_id)
        source = Source.objects.get(pk=source_id)
        # Initialize sync service
        sync_service = CandidateSyncService()
        # Perform the sync operation
        result = sync_service.sync_candidate_to_source(application, source)
        # Log the operation
        IntegrationLog.objects.create(
            source=source,
            action=IntegrationLog.ActionChoices.SYNC,
            endpoint=source.sync_endpoint or "unknown",
            method=source.sync_method or "POST",
            request_data={"candidate_id": candidate_id, "application_name": application.name},
            response_data=result,
            status_code="SUCCESS" if result.get('success') else "ERROR",
            error_message=result.get('error') if not result.get('success') else None,
            ip_address="127.0.0.1",
            user_agent="Django-Q Background Task",
            processing_time=result.get('duration', 0)
        )
        logger.info(f"Sync completed for candidate {candidate_id} to source {source_id}: {result}")
        return result
    except Application.DoesNotExist:
        error_msg = f"Application not found: {candidate_id}"
        logger.error(error_msg)
        return {"success": False, "error": error_msg}
    except Source.DoesNotExist:
        error_msg = f"Source not found: {source_id}"
        logger.error(error_msg)
        return {"success": False, "error": error_msg}
    except Exception as e:
        error_msg = f"Unexpected error during sync: {str(e)}"
        logger.error(error_msg, exc_info=True)
        return {"success": False, "error": error_msg}

def _task_send_individual_email(subject, body_message, recipient, attachments):
    """Internal helper to create and send a single email."""
    from_email = getattr(settings, 'DEFAULT_FROM_EMAIL', 'noreply@kaauh.edu.sa')
    # Crude HTML detection: treat the body as HTML if it contains angle brackets.
    is_html = '<' in body_message and '>' in body_message
    if is_html:
        plain_message = strip_tags(body_message)
        email_obj = EmailMultiAlternatives(subject=subject, body=plain_message, from_email=from_email, to=[recipient])
        email_obj.attach_alternative(body_message, "text/html")
    else:
        email_obj = EmailMultiAlternatives(subject=subject, body=body_message, from_email=from_email, to=[recipient])
    if attachments:
        for attachment in attachments:
            # Each attachment is expected as a (filename, content, content_type) tuple.
            if isinstance(attachment, tuple) and len(attachment) == 3:
                filename, content, content_type = attachment
                email_obj.attach(filename, content, content_type)
    try:
        email_obj.send(fail_silently=False)
        return True
    except Exception as e:
        logger.error(f"Task failed to send email to {recipient}: {str(e)}", exc_info=True)
        return False

def send_bulk_email_task(subject, message, recipient_list, attachments=None, hook='recruitment.tasks.email_success_hook'):
    """
    Django-Q background task to send a pre-formatted email to a list of recipients.
    Receives arguments directly from the async_task call.
    """
    logger.info(f"Starting bulk email task for {len(recipient_list)} recipients")
    successful_sends = 0
    total_recipients = len(recipient_list)
    if not recipient_list:
        return {'success': False, 'error': 'No recipients provided to task.'}
    # Since the async caller sends one task per recipient, total_recipients is usually 1.
    for recipient in recipient_list:
        # The 'message' is the custom message specific to this recipient.
        if _task_send_individual_email(subject, message, recipient, attachments):
            successful_sends += 1
    if successful_sends > 0:
        logger.info(f"Bulk email task completed successfully. Sent to {successful_sends}/{total_recipients} recipients.")
        return {
            'success': True,
            'recipients_count': successful_sends,
            'message': f"Sent successfully to {successful_sends} recipient(s)."
        }
    else:
        logger.error("Bulk email task failed: No emails were sent successfully.")
        return {'success': False, 'error': "No emails were sent successfully in the background task."}

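# Dispatch sketch (assuming django-q; the recipient and attachment values are
# hypothetical, and 'hook' is django-q's standard completion-hook option):
#
#     from django_q.tasks import async_task
#     async_task(
#         'recruitment.tasks.send_bulk_email_task',
#         "Interview Invitation",
#         "<p>Dear candidate, ...</p>",
#         ["candidate@example.com"],
#         attachments=[("offer.pdf", pdf_bytes, "application/pdf")],
#         hook='recruitment.tasks.email_success_hook',
#     )
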
def email_success_hook(task):
    """
    The success hook must accept the Task object as the first and only required positional argument.
    """
    if task.success:
        logger.info(f"Task ID {task.id} succeeded. Result: {task.result}")
    else:
        logger.error(f"Task ID {task.id} failed. Error: {task.result}")