1713 lines
70 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
import os
import json
import logging
import requests
# # import re
import os
import json
import logging
from django_q.tasks import async_task
# from .services.email_service import UnifiedEmailService
from .dto.email_dto import EmailConfig, BulkEmailConfig, EmailTemplate, EmailResult
from .email_templates import EmailTemplates
logger = logging.getLogger(__name__) # Commented out as it's not used in this file
from datetime import datetime
from django.db import transaction
from .utils import create_zoom_meeting
from recruitment.models import Application
from .linkedin_service import LinkedInService
from django.shortcuts import get_object_or_404
from .models import JobPosting
from django.utils import timezone
from django.template.loader import render_to_string
from .models import BulkInterviewTemplate, Interview, Message, ScheduledInterview
from django.contrib.auth import get_user_model
from .utils import get_setting
from pypdf import PdfReader
User = get_user_model()

# Add python-docx import for Word document processing.
# DOCX_AVAILABLE gates extract_text_from_word() below.
try:
    from docx import Document
    DOCX_AVAILABLE = True
except ImportError:
    DOCX_AVAILABLE = False
    logger = logging.getLogger(__name__)
    logger.warning(
        "python-docx not available. Word document processing will be disabled."
    )

logger = logging.getLogger(__name__)

# OpenRouter configuration is read from dynamic settings so it can be changed
# without a redeploy. ai_handler() re-reads these at call time; these
# module-level copies are only used for the startup warning below.
OPENROUTER_API_URL = get_setting("OPENROUTER_API_URL")
OPENROUTER_API_KEY = get_setting("OPENROUTER_API_KEY")
OPENROUTER_MODEL = get_setting("OPENROUTER_MODEL")

# NOTE(review): a hard-coded OpenRouter API key was previously committed here
# in a comment; it has been removed. Rotate that key if it was ever valid.
# Alternative models that have been tried:
# OPENROUTER_MODEL = 'qwen/qwen-2.5-72b-instruct'
# OPENROUTER_MODEL = 'qwen/qwen-2.5-7b-instruct'
# OPENROUTER_MODEL = 'openai/gpt-oss-20b'
# OPENROUTER_MODEL = 'mistralai/mistral-small-3.2-24b-instruct:free'
# Endpoint reference: https://openrouter.ai/api/v1/chat/completions

if not OPENROUTER_API_KEY:
    logger.warning("OPENROUTER_API_KEY not set. Resume scoring will be skipped.")
def extract_text_from_pdf(file_path):
    """Extract and return the concatenated text of every page in a PDF.

    Args:
        file_path: Filesystem path to a ``.pdf`` file.

    Returns:
        str: All extracted page text joined together, stripped of surrounding
        whitespace. Image-only pages (where ``extract_text()`` returns None)
        contribute nothing.

    Raises:
        Exception: Re-raises any pypdf error after logging it.
    """
    logger.debug("PDF text extraction: %s", file_path)
    pages = []
    try:
        with open(file_path, "rb") as f:
            reader = PdfReader(f)
            # extract_text() may return None for scanned/image-only pages.
            pages = [page.extract_text() or "" for page in reader.pages]
    except Exception as e:
        logger.error(f"PDF extraction failed: {e}")
        raise
    # join() avoids the quadratic cost of repeated string concatenation.
    return "".join(pages).strip()
def extract_text_from_word(file_path):
    """Extract text from a Word document (.docx).

    Collects text from body paragraphs, table cells (tab-separated within a
    row, newline per row), and section headers/footers (prefixed with
    "[HEADER] " / "[FOOTER] ").

    Args:
        file_path: Filesystem path to a ``.docx`` file.

    Returns:
        str: The combined document text, stripped of surrounding whitespace.

    Raises:
        ImportError: If python-docx is not installed.
        Exception: Re-raises any python-docx error after logging it.
    """
    if not DOCX_AVAILABLE:
        raise ImportError(
            "python-docx is not installed. Please install it with: pip install python-docx"
        )
    logger.debug("Word text extraction: %s", file_path)
    parts = []
    try:
        doc = Document(file_path)
        # Extract text from paragraphs
        for paragraph in doc.paragraphs:
            parts.append(paragraph.text + "\n")
        # Extract text from tables: cells tab-separated, one line per row
        for table in doc.tables:
            for row in table.rows:
                for cell in row.cells:
                    parts.append(cell.text + "\t")
                parts.append("\n")
        # Extract text from headers and footers of every section
        for section in doc.sections:
            if section.header:
                for paragraph in section.header.paragraphs:
                    parts.append("[HEADER] " + paragraph.text + "\n")
            if section.footer:
                for paragraph in section.footer.paragraphs:
                    parts.append("[FOOTER] " + paragraph.text + "\n")
    except Exception as e:
        logger.error(f"Word extraction failed: {e}")
        raise
    # join() avoids the quadratic cost of repeated string concatenation.
    return "".join(parts).strip()
def extract_text_from_document(file_path):
    """Extract text from a document, dispatching on its file extension.

    Args:
        file_path: Path to a ``.pdf`` or ``.docx`` file.

    Returns:
        str: The extracted text.

    Raises:
        FileNotFoundError: If *file_path* does not exist.
        ValueError: If the extension is neither ``.pdf`` nor ``.docx``.
    """
    # Guard clauses first: missing file, then unsupported extension.
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")
    extension = os.path.splitext(file_path)[1].lower()
    if extension not in (".pdf", ".docx"):
        raise ValueError(
            f"Unsupported file type: {extension}. Only .pdf and .docx files are supported."
        )
    if extension == ".pdf":
        return extract_text_from_pdf(file_path)
    return extract_text_from_word(file_path)
def format_job_description(pk):
    """Format a JobPosting's raw sections into HTML and build a LinkedIn post.

    Sends the posting's raw description/qualifications/benefits/application
    instructions to the LLM and persists the formatted HTML plus the generated
    LinkedIn post text back onto the model, marking it ``ai_parsed``.

    Args:
        pk: Primary key of the JobPosting to format.
    """
    job_posting = JobPosting.objects.get(pk=pk)
    prompt = f"""
You are a dual-purpose AI assistant specializing in content formatting and social media copywriting for job announcements.
**JOB POSTING DATA (Raw Input):**
---
**JOB DESCRIPTION:**
{job_posting.description}
**QUALIFICATIONS:**
{job_posting.qualifications}
**BENEFITS:**
{job_posting.benefits}
**APPLICATION INSTRUCTIONS:**
{job_posting.application_instructions}
**APPLICATION DEADLINE:**
{job_posting.application_deadline}
**HASHTAGS: for search and reach:**
{job_posting.hash_tags}
**APPLICATION URL: for career page only if it is provided**
{job_posting.application_url}
---
**TASK 1: HTML Formatting (Four Blocks)**
1. **Format the Job Description:** Organize and format the raw JOB DESCRIPTION data into clear, readable sections using `<h2>` headings and `<ul>`/`<li>` bullet points. Encapsulate the entire formatted block within a single `<div>`.
2. **Format the Qualifications:** Organize and format the raw QUALIFICATIONS data into clear, readable sections using `<h2>` headings and `<ul>`/`<li>` bullet points. Encapsulate the entire formatted block within a single `<div>`.
3. **Format the Benefits:** Organize and format the raw BENEFITS data into clear, readable sections using `<h2>` headings and `<ul>`/`<li>` bullet points. Encapsulate the entire formatted block within a single `<div>`.
4. **Format the Application Instructions:** Organize and format the raw APPLICATION INSTRUCTIONS data into clear, readable sections using `<h2>` headings and `<ul>`/`<li>` bullet points. Encapsulate the entire formatted block within a single `<div>`.
**TASK 2: LinkedIn Post Creation**
1. **Write the Post:** Create an engaging, professional, and concise LinkedIn post (maximum 1300 characters) summarizing the opportunity.
2. **Encourage Action:** The post must have a strong call-to-action (CTA) encouraging applications.
3. **Use Hashtags:** Integrate relevant industry, role, and company hashtags (including any provided in the raw input) naturally at the end of the post.
**STRICT JSON OUTPUT INSTRUCTIONS:**
Output a **single, valid JSON object** with **ONLY** the following five top-level key-value pairs.
* The values for the four `html_*` keys MUST be the complete, formatted HTML strings (including all tags).
* The value for `linkedin_post_data` MUST be the complete, final LinkedIn post as a single string (respecting the 1300 character limit above).
**Output Keys:**
1. `html_job_description`
2. `html_qualifications`
3. `html_benefits`
4. `html_application_instructions`
5. `linkedin_post_data`
**Do not include any other text, explanation, or markdown outside of the final JSON object.**
"""
    result = ai_handler(prompt)
    if result["status"] == "error":
        logger.error(f"AI handler returned error for job posting {job_posting.pk}")
        return
    data = result["data"]
    if isinstance(data, str):
        data = json.loads(data)
    job_posting.description = data.get("html_job_description")
    job_posting.qualifications = data.get("html_qualifications")
    job_posting.benefits = data.get("html_benefits")
    # Bug fix: this previously read "html_application_instruction" (missing
    # the trailing "s"), which never matched the key requested in the prompt.
    job_posting.application_instructions = data.get("html_application_instructions")
    job_posting.linkedin_post_formated_data = data.get("linkedin_post_data")
    job_posting.ai_parsed = True
    # Bug fix: "benefits" and "application_instructions" were missing from
    # update_fields, so those assignments were silently dropped on save.
    job_posting.save(
        update_fields=[
            "description",
            "qualifications",
            "benefits",
            "application_instructions",
            "linkedin_post_formated_data",
            "ai_parsed",
        ]
    )
def ai_handler(prompt):
    """Send *prompt* to the configured OpenRouter chat-completions endpoint.

    The model's reply is expected to be JSON (possibly wrapped in a
    ```json fenced block, which is stripped before parsing).

    Args:
        prompt: The full user prompt to send as a single chat message.

    Returns:
        dict: ``{"status": "success", "data": <parsed JSON>}`` on success, or
        ``{"status": "error", "data": <error detail>}`` on any failure
        (network error, non-200 response, or unparseable model output).
    """
    # Settings are re-read on every call so they can be changed at runtime.
    api_url = get_setting("OPENROUTER_API_URL")
    api_key = get_setting("OPENROUTER_API_KEY")
    model = get_setting("OPENROUTER_MODEL")
    logger.debug("Calling OpenRouter model %s", model)
    try:
        response = requests.post(
            url=api_url,
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            },
            # json= serializes the body and sets Content-Type in one step.
            json={
                "model": model,
                "messages": [{"role": "user", "content": prompt}],
            },
            # Bug fix: no timeout previously, so a stalled request could hang
            # the django-q worker indefinitely. LLM calls are slow; allow 120s.
            timeout=120,
        )
    except requests.RequestException as e:
        # Bug fix: connection/timeout errors previously propagated uncaught.
        logger.error(f"OpenRouter request failed: {e}")
        return {"status": "error", "data": str(e)}
    if response.status_code != 200:
        logger.error("OpenRouter returned HTTP %s", response.status_code)
        try:
            return {"status": "error", "data": response.json()}
        except ValueError:
            # Body was not JSON (e.g. an HTML error page).
            return {"status": "error", "data": response.text}
    try:
        content = response.json()["choices"][0]["message"]["content"]
        # Strip markdown code fences some models wrap around JSON output.
        content = content.replace("```json", "").replace("```", "")
        return {"status": "success", "data": json.loads(content)}
    except Exception as e:
        logger.error(f"Failed to parse OpenRouter response: {e}")
        return {"status": "error", "data": str(e)}
def safe_cast_to_float(value, default=0.0):
    """Safely convert a value (int, float, or string) to a float.

    Strings are stripped of everything except digits, the decimal point, and
    the minus sign before conversion (e.g. "6.5 years" -> 6.5, "-3" -> -3.0).

    Args:
        value: The value to convert.
        default: Returned when conversion is impossible (None, empty/garbage
            strings, unsupported types).

    Returns:
        float: The converted value, or *default*.
    """
    if isinstance(value, (int, float)):
        return float(value)
    if isinstance(value, str):
        # Bug fix: the previous pattern [^\d.] also stripped the minus sign,
        # so negative values like "-5" were silently converted to 5.0.
        cleaned_value = re.sub(r"[^\d.\-]", "", value)
        try:
            # Ensure we handle empty strings after cleaning
            return float(cleaned_value) if cleaned_value else default
        except ValueError:
            return default
    return default
# def handle_resume_parsing_and_scoring(pk):
# """
# Optimized Django-Q task to parse a resume, score the candidate against a job,
# and atomically save the results.
# """
# # --- 1. Robust Object Retrieval (Prevents looping on DoesNotExist) ---
# try:
# instance = Application.objects.get(pk=pk)
# except Application.DoesNotExist:
# # Exit gracefully if the candidate was deleted after the task was queued
# logger.warning(f"Candidate matching query does not exist for pk={pk}. Exiting task.")
# print(f"Candidate matching query does not exist for pk={pk}. Exiting task.")
# return
# logger.info(f"Scoring resume for candidate {pk}")
# print(f"Scoring resume for candidate {pk}")
# # --- 2. I/O and Initial Data Check ---
# try:
# file_path = instance.resume.path
# if not os.path.exists(file_path):
# logger.warning(f"Resume file not found: {file_path}")
# print(f"Resume file not found: {file_path}")
# # Consider marking the task as unsuccessful but don't re-queue
# return
# # Use the new unified document parser
# resume_text = extract_text_from_document(file_path)
# job_detail = f"{instance.job.description} {instance.job.qualifications}"
# except Exception as e:
# logger.error(f"Error during initial data retrieval/parsing for candidate {instance.pk}: {e}")
# print(f"Error during initial data retrieval/parsing for candidate {instance.pk}: {e}")
# return
# print(resume_text)
# # --- 3. Single, Combined LLM Prompt (Major Cost & Latency Optimization) ---
# prompt = f"""
# You are an expert AI system functioning as both a Resume Parser and a Technical Recruiter.
# Your task is to:
# 1. **PARSE**: Extract all key-value information from the provided RESUME TEXT into a clean JSON structure under the key 'resume_data', preserving the original text and it's formatting and dont add any extra text.
# 2. **SCORE**: Analyze the parsed data against the JOB CRITERIA and generate a comprehensive score and analysis under the key 'analysis_data'.
# **JOB CRITERIA:**
# {job_detail}
# **RESUME TEXT:**
# {resume_text}
# **STRICT JSON OUTPUT INSTRUCTIONS:**
# Output a single, valid JSON object with ONLY the following two top-level keys:
# 1. "resume_data": {{
# "full_name": "Full name of the candidate",
# "current_title": "Most recent or current job title",
# "location": "City and state",
# "contact": "Phone number and email",
# "linkedin": "LinkedIn profile URL",
# "github": "GitHub or portfolio URL",
# "summary": "Brief professional profile or summary (12 sentences)",
# "education": [{{
# "institution": "Institution name",
# "degree": "Degree name",
# "year": "Year of graduation" (if provided) or '',
# "gpa": "GPA (if provided)",
# "relevant_courses": ["list", "of", "courses"](if provided) or []
# }}],
# "skills": {{
# "category_1": ["skill_a", "skill_b"],
# "uncategorized": ["tool_x"]
# }},
# "experience": [{{
# "company": "Company name",
# "job_title": "Job Title",
# "location": "Location",
# "start_date": "YYYY-MM",
# "end_date": "YYYY-MM or Present",
# "key_achievements": ["concise bullet points"] (if provided) or []
# }}],
# "projects": [{{
# "name": "Project name",
# "year": "Year",
# "technologies_used": ["list", "of", "tech"] (if provided) or [],
# "brief_description": "description"
# }}]
# }}
# 2. "analysis_data": {{
# "match_score": "Integer Score 0-100",
# "strengths": "Brief summary of strengths",
# "weaknesses": "Brief summary of weaknesses",
# "years_of_experience": "Total years of experience (float, e.g., 6.5)",
# "criteria_checklist": List of job requirements if any {{ "Python": "Met", "AWS": "Not Met"}} only output the criteria_checklist in one of ('Met','Not Met') don't output any extra text,
# "category": "Most fitting professional field (e.g., Data Science), only output the category name and no other text example ('Software Development', 'correct') , ('Software Development and devops','wrong') ('Software Development / Backend Development','wrong')",
# "most_recent_job_title": "Candidate's most recent job title",
# "recommendation": "Detailed hiring recommendation narrative",
# "top_3_keywords": ["keyword1", "keyword2", "keyword3"],
# "job_fit_narrative": "Single, concise summary sentence",
# "language_fluency": ["language: fluency_level"],
# "screening_stage_rating": "Standardized rating (Highly Qualified, Qualified , Partially Qualified, Not Qualified)",
# "min_req_met_bool": "Boolean (true/false)",
# "soft_skills_score": "Integer Score 0-100 for inferred non-technical skills",
# "experience_industry_match": "Integer Score 0-100 for industry relevance",
# "seniority_level_match": "Integer Score 0-100 for alignment with JD's seniority level",
# "red_flags": ["List of any potential concerns (if any): e.g., 'Employment gap 1 year', 'Frequent job hopping', 'Missing required certification'"],
# "employment_stability_score": "Integer Score 0-100 (Higher is more stable/longer tenure) (if possible)",
# "transferable_skills_narrative": "A brief sentence describing the relevance of non-core experience (if applicable).",
# "cultural_fit_keywords": ["A list of 3-5 keywords extracted from the resume (if possible) (e.g., 'team-player', 'mentored', 'cross-functional')"]
# }}
# If a top-level key or its required fields are missing, set the field to null, an empty list, or an empty object as appropriate.
# Output only valid JSON—no markdown, no extra text.
# """
# try:
# result = ai_handler(prompt)
# if result['status'] == 'error':
# logger.error(f"AI handler returned error for candidate {instance.pk}")
# print(f"AI handler returned error for candidate {instance.pk}")
# return
# # Ensure the result is parsed as a Python dict (if ai_handler returns a JSON string)
# data = result['data']
# if isinstance(data, str):
# data = json.loads(data)
# print(data)
# # parsed_summary = data.get('parsed_data', {})
# # scoring_result = data.get('scoring_data', {})
# except Exception as e:
# logger.error(f"AI handler failed for candidate {instance.pk}: {e}")
# print(f"AI handler failed for candidate {instance.pk}: {e}")
# return
# # --- 4. Atomic Database Update (Ensures data integrity) ---
# with transaction.atomic():
# # Map JSON keys to model fields with appropriate defaults
# # update_map = {
# # 'match_score': ('match_score', 0),
# # 'years_of_experience': ('years_of_experience', 0.0),
# # 'soft_skills_score': ('soft_skills_score', 0),
# # 'experience_industry_match': ('experience_industry_match', 0),
# # 'min_req_met_bool': ('min_req_met_bool', False),
# # 'screening_stage_rating': ('screening_stage_rating', 'N/A'),
# # 'most_recent_job_title': ('most_recent_job_title', 'N/A'),
# # 'top_3_keywords': ('top_3_keywords', []),
# # 'strengths': ('strengths', ''),
# # 'weaknesses': ('weaknesses', ''),
# # 'job_fit_narrative': ('job_fit_narrative', ''),
# # 'recommendation': ('recommendation', ''),
# # 'criteria_checklist': ('criteria_checklist', {}),
# # 'language_fluency': ('language_fluency', []),
# # 'category': ('category', 'N/A'),
# # }
# # Apply scoring results to the instance
# # for model_field, (json_key, default_value) in update_map.items():
# # instance.ai_analysis_data[model_field] = scoring_result.get(json_key, default_value)
# # instance.set_field(model_field, scoring_result.get(json_key, default_value))
# # instance.set_field("match_score" , int(safe_cast_to_float(scoring_result.get('match_score', 0), default=0)))
# # instance.set_field("years_of_experience" , safe_cast_to_float(scoring_result.get('years_of_experience', 0.0)))
# # instance.set_field("soft_skills_score" , int(safe_cast_to_float(scoring_result.get('soft_skills_score', 0), default=0)))
# # instance.set_field("experience_industry_match" , int(safe_cast_to_float(scoring_result.get('experience_industry_match', 0), default=0)))
# # # Other Model Fields
# # instance.set_field("min_req_met_bool" , scoring_result.get('min_req_met_bool', False))
# # instance.set_field("screening_stage_rating" , scoring_result.get('screening_stage_rating', 'N/A'))
# # instance.set_field("category" , scoring_result.get('category', 'N/A'))
# # instance.set_field("most_recent_job_title" , scoring_result.get('most_recent_job_title', 'N/A'))
# # instance.set_field("top_3_keywords" , scoring_result.get('top_3_keywords', []))
# # instance.set_field("strengths" , scoring_result.get('strengths', ''))
# # instance.set_field("weaknesses" , scoring_result.get('weaknesses', ''))
# # instance.set_field("job_fit_narrative" , scoring_result.get('job_fit_narrative', ''))
# # instance.set_field("recommendation" , scoring_result.get('recommendation', ''))
# # instance.set_field("criteria_checklist" , scoring_result.get('criteria_checklist', {}))
# # instance.set_field("language_fluency" , scoring_result.get('language_fluency', []))
# # 2. Update the Full JSON Field (ai_analysis_data)
# if instance.ai_analysis_data is None:
# instance.ai_analysis_data = {}
# # Save both structured outputs into the single JSONField for completeness
# instance.ai_analysis_data = data
# # instance.ai_analysis_data['parsed_data'] = parsed_summary
# # instance.ai_analysis_data['scoring_data'] = scoring_result
# # Apply parsing results
# # instance.parsed_summary = json.dumps(parsed_summary)
# instance.is_resume_parsed = True
# instance.save(update_fields=['ai_analysis_data', 'is_resume_parsed'])
# logger.info(f"Successfully scored and saved analysis for candidate {instance.id}")
# print(f"Successfully scored and saved analysis for candidate {instance.id}")
def handle_resume_parsing_and_scoring(pk: int):
    """
    Optimized Django-Q task to parse a resume in English and Arabic, score the candidate,
    and atomically save the results.

    Pipeline:
      1. Load the Application row (exit quietly if it was deleted meanwhile).
      2. Extract plain text from the candidate's resume file (.pdf/.docx).
      3. Send one combined prompt to the LLM asking for four top-level JSON
         keys: resume_data_en, resume_data_ar, analysis_data_en, analysis_data_ar.
      4. Store the whole AI payload in ``Application.ai_analysis_data`` and set
         ``is_resume_parsed``, inside a single transaction.

    Args:
        pk: Primary key of the recruitment.Application to process.
    """
    # --- 1. Robust Object Retrieval (Prevents looping on DoesNotExist) ---
    try:
        # NOTE: Replace 'Application.objects.get' with your actual model manager call
        instance = Application.objects.get(pk=pk)
    except Application.DoesNotExist:
        # Exit gracefully if the candidate was deleted after the task was queued
        logger.warning(
            f"Candidate matching query does not exist for pk={pk}. Exiting task."
        )
        print(f"Candidate matching query does not exist for pk={pk}. Exiting task.")
        return
    logger.info(f"Scoring resume for candidate {pk}")
    print(f"Scoring resume for candidate {pk}")
    # --- 2. I/O and Initial Data Check ---
    try:
        # Assuming instance.resume is a Django FileField
        file_path = instance.resume.path
        if not os.path.exists(file_path):
            logger.warning(f"Resume file not found: {file_path}")
            print(f"Resume file not found: {file_path}")
            return
        # Use the new unified document parser
        resume_text = extract_text_from_document(file_path)
        # Description + qualifications together form the scoring criteria.
        job_detail = f"{instance.job.description} {instance.job.qualifications}"
    except Exception as e:
        logger.error(
            f"Error during initial data retrieval/parsing for candidate {instance.pk}: {e}"
        )
        print(
            f"Error during initial data retrieval/parsing for candidate {instance.pk}: {e}"
        )
        return
    print(resume_text)
    # --- 3. Single, Combined LLM Prompt (Major Cost & Latency Optimization) ---
    prompt = f"""
You are an expert AI system functioning as both a Resume Parser and a Technical Recruiter, capable of multi-language output.
Your task is to:
1. **PARSE (English)**: Extract all key-value information from the RESUME TEXT into a clean JSON structure under the key **'resume_data_en'**.
2. **PARSE (Arabic)**: Translate and output the exact same parsed data structure into Arabic under the key **'resume_data_ar'**. The keys must remain in English, but the values (names, titles, summaries, descriptions) must be in Arabic.
3. **SCORE (English)**: Analyze the data against the JOB CRITERIA and generate a comprehensive score and analysis under **'analysis_data_en'**, including an English narrative/recommendation.
4. **SCORE (Arabic)**: Output an identical analysis structure under **'analysis_data_ar'**, but ensure the narrative fields (**recommendation**, **job_fit_narrative**, **strengths**, **weaknesses**, **transferable_skills_narrative**) are translated into Arabic. All numerical and list fields (scores, checklist, keywords) must be identical to the English analysis.
**JOB CRITERIA:**
{job_detail}
**RESUME TEXT:**
{resume_text}
**STRICT JSON OUTPUT INSTRUCTIONS:**
You MUST output a single, valid JSON object.
This object MUST contain ONLY the following four top-level keys:
1. "resume_data_en"
2. "resume_data_ar"
3. "analysis_data_en"
4. "analysis_data_ar"
**ABSOLUTELY DO NOT use generic keys like "resume_data" or "analysis_data" at the top level.**
1. "resume_data_en": {{ /* English Parsed Data */
"full_name": "Full name of the candidate",
"current_title": "Most recent or current job title",
"location": "City and state",
"contact": "Phone number and email",
"linkedin": "LinkedIn profile URL",
"github": "GitHub or portfolio URL",
"summary": "Brief professional profile or summary (12 sentences)",
"education": [{{
"institution": "Institution name",
"degree": "Degree name",
"year": "Year of graduation" (if provided) or '',
"gpa": "GPA (if provided)",
"relevant_courses": ["list", "of", "courses"](if provided) or []
}}],
"skills": {{
"category_1": ["skill_a", "skill_b"],
"uncategorized": ["tool_x"]
}},
"experience": [{{
"company": "Company name",
"job_title": "Job Title",
"location": "Location",
"start_date": "YYYY-MM",
"end_date": "YYYY-MM or Present",
"key_achievements": ["concise bullet points"] (if provided) or []
}}],
"projects": [{{
"name": "Project name",
"year": "Year",
"technologies_used": ["list", "of", "tech"] (if provided) or [],
"brief_description": "description"
}}]
}}
2. "resume_data_ar": {{ /* Arabic Translated Parsed Data (Keys in English, Values in Arabic) */
"full_name": "الاسم الكامل للمرشح",
"current_title": "أحدث أو الحالي مسمى وظيفي",
"location": "المدينة والدولة",
"contact": "رقم الهاتف والبريد الإلكتروني",
"linkedin": "رابط ملف LinkedIn الشخصي",
"github": "رابط GitHub أو ملف الأعمال",
"summary": "ملف تعريفي مهني موجز أو ملخص (جملة واحدة أو جملتين)",
"education": [{{
"institution": "اسم المؤسسة",
"degree": "اسم الدرجة العلمية",
"year": "سنة التخرج (إذا توفرت) أو ''",
"gpa": "المعدل التراكمي (إذا توفر)",
"relevant_courses": ["قائمة", "بالدورات", "ذات", "الصلة"](إذا توفرت) أو []
}}],
"skills": {{
"category_1": ["مهارة_أ", "مهارة_ب"],
"uncategorized": ["أداة_س"]
}},
"experience": [{{
"company": "اسم الشركة",
"job_title": "المسمى الوظيفي",
"location": "الموقع",
"start_date": "السنة-الشهر (YYYY-MM)",
"end_date": "السنة-الشهر (YYYY-MM) أو Present",
"key_achievements": ["نقاط", "رئيسية", "موجزة", "للإنجازات"] (إذا توفرت) أو []
}}],
"projects": [{{
"name": "اسم المشروع",
"year": "السنة",
"technologies_used": ["قائمة", "بالتقنيات", "المستخدمة"] (إذا توفرت) أو [],
"brief_description": "وصف موجز"
}}]
}}
3. "analysis_data_en": {{ /* English Analysis and Narratives */
"match_score": "Integer Score 0-100",
"strengths": "Brief summary of strengths",
"weaknesses": "Brief summary of weaknesses",
"years_of_experience": "Total years of experience (float, e.g., 6.5)",
"criteria_checklist": List of job requirements if any {{ "Python": "Met", "AWS": "Not Met"}} only output the criteria_checklist in one of ('Met','Not Met') don't output any extra text,
"category": "Most fitting professional field (e.g., Data Science), only output the category name and no other text example ('Software Development', 'correct') , ('Software Development and devops','wrong') ('Software Development / Backend Development','wrong')",
"most_recent_job_title": "Candidate's most recent job title",
"recommendation": "Detailed hiring recommendation narrative",
"top_3_keywords": ["keyword1", "keyword2", "keyword3"],
"job_fit_narrative": "Single, concise summary sentence",
"language_fluency": ["language: fluency_level"],
"screening_stage_rating": "Standardized rating (Highly Qualified, Qualified , Partially Qualified, Not Qualified)",
"min_req_met_bool": "Boolean (true/false)",
"soft_skills_score": "Integer Score 0-100 for inferred non-technical skills",
"experience_industry_match": "Integer Score 0-100 for industry relevance",
"seniority_level_match": "Integer Score 0-100 for alignment with JD's seniority level",
"red_flags": ["List of any potential concerns (if any): e.g., 'Employment gap 1 year', 'Frequent job hopping', 'Missing required certification'"],
"employment_stability_score": "Integer Score 0-100 (Higher is more stable/longer tenure) (if possible)",
"transferable_skills_narrative": "A brief sentence describing the relevance of non-core experience (if applicable).",
"cultural_fit_keywords": ["A list of 3-5 keywords extracted from the resume (if possible) (e.g., 'team-player', 'mentored', 'cross-functional')"]
}}
4. "analysis_data_ar": {{ /* Identical Analysis structure, but with Arabic Translated Narratives */
"match_score": "Integer Score 0-100",
"strengths": "ملخص موجز لنقاط القوة",
"weaknesses": "ملخص موجز لنقاط الضعف",
"years_of_experience": "Total years of experience (float, e.g., 6.5)",
"criteria_checklist": List of job requirements if any {{ "Python": "Met", "AWS": "Not Met"}} only output the criteria_checklist in one of ('Met','Not Met') don't output any extra text,
"category": "Most fitting professional field (e.g., Data Science), only output the category name and no other text example ('Software Development', 'correct') , ('Software Development and devops','wrong') ('Software Development / Backend Development','wrong')",
"most_recent_job_title": "Candidate's most recent job title",
"recommendation": "سرد تفصيلي بتوصية التوظيف",
"top_3_keywords": ["keyword1", "keyword2", "keyword3"],
"job_fit_narrative": "جملة واحدة موجزة تلخص مدى ملاءمة الوظيفة",
"language_fluency": ["language: fluency_level"],
"screening_stage_rating": "Standardized rating (Highly Qualified, Qualified , Partially Qualified, Not Qualified)",
"min_req_met_bool": "Boolean (true/false)",
"soft_skills_score": "Integer Score 0-100 for inferred non-technical skills",
"experience_industry_match": "Integer Score 0-100 for industry relevance",
"seniority_level_match": "Integer Score 0-100 for alignment with JD's seniority level",
"red_flags": ["List of any potential concerns (if any): e.g., 'Employment gap 1 year', 'Frequent job hopping', 'Missing required certification'"],
"employment_stability_score": "Integer Score 0-100 (Higher is more stable/longer tenure) (if possible)",
"transferable_skills_narrative": "جملة موجزة تصف أهمية الخبرة غير الأساسية (إذا انطبقت).",
"cultural_fit_keywords": ["A list of 3-5 keywords extracted from the resume (if possible) (e.g., 'team-player', 'mentored', 'cross-functional')"]
}}
If a top-level key or its required fields are missing, set the field to null, an empty list, or an empty object as appropriate.
Be Clear and Direct Avoid overly indirect politeness which can add confusion.
Be strict,objective and concise and critical in your responses, and don't give inflated scores to weak candidates.
Output only valid JSON—no markdown, no extra text.
"""
    try:
        # Call the AI handler
        result = ai_handler(prompt)
        if result["status"] == "error":
            logger.error(f"AI handler returned error for candidate {instance.pk}")
            print(f"AI handler returned error for candidate {instance.pk}")
            return
        # Ensure the result is parsed as a Python dict
        data = result["data"]
        if isinstance(data, str):
            data = json.loads(data)
        print(data)
    except Exception as e:
        logger.error(f"AI handler failed for candidate {instance.pk}: {e}")
        print(f"AI handler failed for candidate {instance.pk}: {e}")
        return
    # --- 4. Atomic Database Update (Ensures data integrity) ---
    with transaction.atomic():
        # 2. Update the Full JSON Field (ai_analysis_data)
        if instance.ai_analysis_data is None:
            instance.ai_analysis_data = {}
        # Save all four structured outputs into the single JSONField
        instance.ai_analysis_data = data
        instance.is_resume_parsed = True
        # Save changes to the database
        # NOTE: If you extract individual fields (like match_score) to separate columns,
        # ensure those are handled here, using data.get('analysis_data_en', {}).get('match_score').
        instance.save(update_fields=["ai_analysis_data", "is_resume_parsed"])
        logger.info(
            f"Successfully scored and saved analysis (EN/AR) for candidate {instance.id}"
        )
        print(f"Successfully scored and saved analysis (EN/AR) for candidate {instance.id}")
from django.utils import timezone
def create_interview_and_meeting(schedule_id):
    """
    Synchronous task for a single interview slot, dispatched by django-q.

    Looks up the ScheduledInterview, creates the corresponding Zoom meeting,
    and stores the meeting details on the Interview row.

    Args:
        schedule_id: Primary key of the ScheduledInterview to process.

    Returns:
        bool: True on success, False if Zoom or any lookup failed.
    """
    try:
        schedule = ScheduledInterview.objects.get(pk=schedule_id)
        interview = schedule.interview
        logger.info(f"Processing schedule {schedule_id} with interview {interview.id}")
        logger.info(f"Interview topic: {interview.topic}")
        logger.info(f"Interview start_time: {interview.start_time}")
        logger.info(f"Interview duration: {interview.duration}")
        result = create_zoom_meeting(interview.topic, interview.start_time, interview.duration)
        # Guard clause: bail out early when the Zoom gateway reports failure.
        if result["status"] != "success":
            logger.error(f"Zoom API failed for {schedule.application.name}: {result['message']}")
            return False  # Task failed
        details = result["meeting_details"]
        interview.meeting_id = details["meeting_id"]
        interview.join_url = details["join_url"]
        interview.host_email = details["host_email"]
        interview.password = details["password"]
        interview.zoom_gateway_response = result["zoom_gateway_response"]
        interview.save()
        logger.info(f"Successfully scheduled interview for {schedule.application.name}")
        return True
    except Exception as e:
        # Catch any unexpected errors during database lookups or processing
        logger.error(f"Critical error scheduling interview: {e}")
        return False  # Task failed
def handle_zoom_webhook_event(payload):
    """
    Background task to process a Zoom webhook event and update the local Interview status.

    Handles: meeting.updated, meeting.started, meeting.ended, and
    meeting.deleted. Other events are acknowledged (True) with no changes.

    Args:
        payload: Decoded webhook body; Zoom nests the meeting object under
            ``payload["payload"]["object"]``.

    Returns:
        bool: True when the event was processed (or ignored), False on error.
    """
    event_type = payload.get("event")
    # Bug fix: .get() chains so a malformed payload cannot raise KeyError
    # outside the try/except below (the old code used payload["payload"]["object"]).
    object_data = payload.get("payload", {}).get("object", {})
    # Bug fix: str(None) == "None" is truthy, so the missing-ID guard never
    # fired with the old `str(object_data.get("id"))`.
    raw_id = object_data.get("id")
    meeting_id = str(raw_id) if raw_id is not None else ""
    if not meeting_id:
        logger.warning(f"Webhook received without a valid Meeting ID: {event_type}")
        return False
    try:
        meeting_instance = Interview.objects.filter(meeting_id=meeting_id).first()
        if event_type == "meeting.updated":
            logger.info(f"Zoom meeting updated: {meeting_id}")
            if meeting_instance:
                # Update key fields from the webhook payload, keeping the
                # current value for any field the payload omits.
                meeting_instance.topic = object_data.get(
                    "topic", meeting_instance.topic
                )
                meeting_instance.start_time = object_data.get(
                    "start_time", meeting_instance.start_time
                )
                meeting_instance.duration = object_data.get(
                    "duration", meeting_instance.duration
                )
                meeting_instance.timezone = object_data.get(
                    "timezone", meeting_instance.timezone
                )
                meeting_instance.status = object_data.get(
                    "status", meeting_instance.status
                )
                meeting_instance.save(
                    update_fields=[
                        "topic",
                        "start_time",
                        "duration",
                        "timezone",
                        "status",
                    ]
                )
        # --- Lifecycle events: mirror the last path segment as the status ---
        elif event_type in ("meeting.started", "meeting.ended", "meeting.deleted"):
            if meeting_instance:
                try:
                    meeting_instance.status = event_type.split(".")[-1]
                    meeting_instance.save(update_fields=["status"])
                except Exception as e:
                    logger.error(f"Failed to update Zoom meeting status: {e}")
        return True
    except Exception as e:
        logger.error(
            f"Failed to process Zoom webhook for {event_type} (ID: {meeting_id}): {e}",
            exc_info=True,
        )
        return False
def linkedin_post_task(job_slug, access_token):
    """
    Background task that publishes a job posting to LinkedIn.

    Runs the (long) LinkedIn API call and records the outcome on the
    JobPosting so the UI can surface the post status and URL.

    Args:
        job_slug (str): Slug of the JobPosting to publish.
        access_token (str): OAuth access token for the LinkedIn API.

    Returns:
        bool: True when the post was created successfully, False otherwise.
    """
    job = get_object_or_404(JobPosting, slug=job_slug)
    try:
        service = LinkedInService()
        service.access_token = access_token
        # Long-running API call.
        result = service.create_job_post(job)
        # Persist the final result on the job posting.
        # .get avoids a KeyError if the service returns an unexpected shape.
        if result.get("success"):
            job.posted_to_linkedin = True
            job.linkedin_post_id = result["post_id"]
            job.linkedin_post_url = result["post_url"]
            job.linkedin_post_status = "SUCCESS"  # fixed typo: was "SUCCESSS"
            job.linkedin_posted_at = timezone.now()
        else:
            error_msg = result.get("error", "Unknown API error")
            job.linkedin_post_status = "FAILED"
            logger.error(f"LinkedIn post failed for job {job_slug}: {error_msg}")
        job.save()
        return bool(result.get("success"))
    except Exception as e:
        logger.error(
            f"Critical error in LinkedIn task for job {job_slug}: {e}", exc_info=True
        )
        # Record the critical error on the job so it is visible in admin.
        job.linkedin_post_status = f"CRITICAL_ERROR: {str(e)}"
        job.save()
        return False
def form_close(job_id):
    """
    Deactivate a job posting and its associated application form.

    Args:
        job_id: Primary key of the JobPosting to close.
    """
    job = get_object_or_404(JobPosting, pk=job_id)
    job.is_active = False
    job.save()
    # Bug fix: the form deactivation was previously set but never saved
    # (only job.save() was called), so the template form stayed active.
    # Also guard against a missing template_form, consistent with
    # send_job_closed_notification.
    if job.template_form:
        job.template_form.is_active = False
        job.template_form.save(update_fields=["is_active"])
    # TODO:send email to admins
def sync_hired_candidates_task(job_slug):
    """
    Django-Q background task to sync hired candidates to the job's source.

    Posts the job's ``source_sync_data`` to the source's sync endpoint,
    records an IntegrationLog entry for success and failure, and keeps the
    source's ``sync_status`` field up to date.

    Args:
        job_slug (str): The slug of the job posting.

    Returns:
        dict: Sync result with "status" ("success"/"error") and details.
    """
    from .models import JobPosting, IntegrationLog

    logger.info(f"Starting background sync task for job: {job_slug}")
    # Guard the lookup so a bad slug produces a structured error result
    # instead of an unhandled exception.
    try:
        job = JobPosting.objects.get(slug=job_slug)
    except JobPosting.DoesNotExist:
        error_msg = f"Job not found for sync: {job_slug}"
        logger.error(error_msg)
        return {"status": "error", "message": error_msg}
    source = job.source
    if source.sync_status == "DISABLED":
        logger.warning(
            f"Source {source.name} is disabled. Aborting sync for job {job_slug}."
        )
        return {"status": "error", "message": "Source is disabled"}
    # Mark the source busy while the request is in flight.
    source.sync_status = "SYNCING"
    source.save(update_fields=["sync_status"])
    # Prepare and send the sync request
    try:
        request_data = {
            "internal_job_id": job.internal_job_id,
            "data": job.source_sync_data,
        }
        results = requests.post(
            url=source.sync_endpoint,
            headers=source.custom_headers,
            json=request_data,
            timeout=30,
        )
        if results.status_code == 200:
            IntegrationLog.objects.create(
                source=source,
                action=IntegrationLog.ActionChoices.SYNC,
                endpoint=source.sync_endpoint,
                method="POST",
                request_data=request_data,
                status_code=results.status_code,
                ip_address="127.0.0.1",
                user_agent="",
            )
            source.last_sync_at = timezone.now()
            source.sync_status = "SUCCESS"
            source.save(update_fields=["last_sync_at", "sync_status"])
            logger.info(
                f"Background sync completed for job {job_slug}: {results.status_code}"
            )
            # Return a plain dict (not the Response object) so the stored
            # task result matches the documented contract.
            return {
                "status": "success",
                "message": f"Synced job {job_slug} (HTTP {results.status_code})",
            }
        else:
            error_msg = (
                f"Source API returned status {results.status_code}: {results.text}"
            )
            logger.error(error_msg)
            IntegrationLog.objects.create(
                source=source,
                action=IntegrationLog.ActionChoices.ERROR,
                endpoint=source.sync_endpoint,
                method="POST",
                request_data={
                    "message": "Failed to sync hired candidates",
                    "internal_job_id": job.internal_job_id,
                },
                error_message=error_msg,
                status_code="ERROR",
                ip_address="127.0.0.1",
                user_agent="",
            )
            source.sync_status = "ERROR"
            source.save(update_fields=["sync_status"])
            return {"status": "error", "message": error_msg}
    except Exception as e:
        error_msg = f"Unexpected error during sync: {str(e)}"
        logger.error(error_msg, exc_info=True)
        IntegrationLog.objects.create(
            source=source,
            action=IntegrationLog.ActionChoices.ERROR,
            endpoint=source.sync_endpoint,
            method="POST",
            request_data={"status": "error"},
            error_message=error_msg,
            status_code="ERROR",
            ip_address="127.0.0.1",
            user_agent="",
        )
        source.sync_status = "ERROR"
        source.save(update_fields=["sync_status"])
        # Previously fell off the end returning None; return the error dict.
        return {"status": "error", "message": error_msg}
# def sync_candidate_to_source_task(candidate_id, source_id):
# """
# Django-Q background task to sync a single candidate to a specific source.
# Args:
# candidate_id (int): The ID of the candidate
# source_id (int): The ID of the source
# Returns:
# dict: Sync result for this specific candidate-source pair
# """
# from .candidate_sync_service import CandidateSyncService
# from .models import Application, Source, IntegrationLog
# logger.info(f"Starting sync task for candidate {candidate_id} to source {source_id}")
# try:
# # Get the candidate and source
# application = Application.objects.get(pk=candidate_id)
# source = Source.objects.get(pk=source_id)
# # Initialize sync service
# sync_service = CandidateSyncService()
# # Perform the sync operation
# result = sync_service.sync_candidate_to_source(application, source)
# # Log the operation
# IntegrationLog.objects.create(
# source=source,
# action=IntegrationLog.ActionChoices.SYNC,
# endpoint=source.sync_endpoint or "unknown",
# method=source.sync_method or "POST",
# request_data={"candidate_id": candidate_id, "application_name": application.name},
# response_data=result,
# status_code="SUCCESS" if result.get('success') else "ERROR",
# error_message=result.get('error') if not result.get('success') else None,
# ip_address="127.0.0.1",
# user_agent="Django-Q Background Task",
# processing_time=result.get('duration', 0)
# )
# logger.info(f"Sync completed for candidate {candidate_id} to source {source_id}: {result}")
# return result
# except Application.DoesNotExist:
# error_msg = f"Application not found: {candidate_id}"
# logger.error(error_msg)
# return {"success": False, "error": error_msg}
# except Source.DoesNotExist:
# error_msg = f"Source not found: {source_id}"
# logger.error(error_msg)
# return {"success": False, "error": error_msg}
# except Exception as e:
# error_msg = f"Unexpected error during sync: {str(e)}"
# logger.error(error_msg, exc_info=True)
# return {"success": False, "error": error_msg}
from django.conf import settings
from django.core.mail import EmailMultiAlternatives
from django.utils.html import strip_tags
# def _task_send_individual_email(subject, body_message, recipient, attachments,sender,job):
# """Internal helper to create and send a single email."""
# from_email = getattr(settings, 'DEFAULT_FROM_EMAIL', 'noreply@kaauh.edu.sa')
# is_html = '<' in body_message and '>' in body_message
# if is_html:
# plain_message = strip_tags(body_message)
# email_obj = EmailMultiAlternatives(subject=subject, body=plain_message, from_email=from_email, to=[recipient])
# email_obj.attach_alternative(body_message, "text/html")
# else:
# email_obj = EmailMultiAlternatives(subject=subject, body=body_message, from_email=from_email, to=[recipient])
# if attachments:
# for attachment in attachments:
# if isinstance(attachment, tuple) and len(attachment) == 3:
# filename, content, content_type = attachment
# email_obj.attach(filename, content, content_type)
# try:
# result=email_obj.send(fail_silently=False)
# if result==1 and sender and job: # job is none when email sent after message creation
# try:
# user=get_object_or_404(User,email=recipient)
# new_message = Message.objects.create(
# sender=sender,
# recipient=user,
# job=job,
# subject=subject,
# content=body_message, # Store the full HTML or plain content
# message_type='DIRECT',
# is_read=False, # It's just sent, not read yet
# )
# logger.info(f"Stored sent message ID {new_message.id} in DB.")
# except Exception as e:
# logger.error(f"Email sent to {recipient}, but failed to store in DB: {str(e)}")
# return result == 1
# except Exception as e:
# logger.error(f"Failed to send email to {recipient}: {str(e)}", exc_info=True)
def _task_send_individual_email(
    subject,
    body_message,
    recipient,
    attachments=None,
    sender=None,
    job=None,
    context=None,
):
    """
    Create and send a single email, optionally wrapped in the branded HTML
    template.

    When ``context`` is provided, ``emails/email_template.html`` is rendered
    around ``body_message`` and attached as the HTML alternative; otherwise
    the message is sent as plain text only.

    Args:
        subject (str): The email subject.
        body_message (str): The main content of the email.
        recipient (str): The recipient's email address.
        attachments (list, optional): List of (filename, content, mimetype)
            tuples; malformed entries are silently skipped.
        sender (User, optional): The User who initiated the send. Required
            (together with ``job``) for a Message row to be stored.
        job (JobPosting, optional): The associated job posting, if any.
        context (dict, optional): Extra context for the HTML template. The
            caller's dict is not mutated; a shallow copy is taken.

    Returns:
        bool: True if exactly one email was sent successfully, False otherwise.
    """
    from_email = getattr(settings, "DEFAULT_FROM_EMAIL", "noreply@kaauh.edu.sa")
    # --- 1. Template rendering ---
    html_content = None
    plain_message = body_message
    if context:
        # Work on a copy so .pop() does not mutate the caller's dict.
        extra = dict(context)
        base_context = {
            "subject": subject,
            # Fall back to the email address when no display name was given.
            "user_name": extra.pop("user_name", recipient),
            "email_message": body_message,
            "user_email": recipient,
            # rstrip avoids a double slash: STATIC_URL conventionally ends
            # with "/" already.
            "logo_url": extra.pop(
                "logo_url",
                settings.STATIC_URL.rstrip("/") + "/images/kaauh-logo.png",
            ),
            # Merge any other custom context variables.
            **extra,
        }
        try:
            html_content = render_to_string("emails/email_template.html", base_context)
            plain_message = strip_tags(html_content)
        except Exception as e:
            logger.error(
                f"Error rendering HTML template for {recipient}. Sending plain text instead. Error: {e}"
            )
            html_content = None
            plain_message = body_message  # fall back to the original body
    # --- 2. Create email object (plain text body, HTML as alternative) ---
    email_obj = EmailMultiAlternatives(
        subject=subject,
        body=plain_message,
        from_email=from_email,
        to=[recipient],
    )
    if html_content:
        email_obj.attach_alternative(html_content, "text/html")
    # --- 3. Attachments ---
    if attachments:
        for attachment in attachments:
            if isinstance(attachment, tuple) and len(attachment) == 3:
                filename, content, content_type = attachment
                email_obj.attach(filename, content, content_type)
    # --- 4. Send and (best effort) persist a Message record ---
    try:
        result = email_obj.send(fail_silently=False)
        if (
            result == 1 and sender and job
        ):  # job is None when email sent after message creation
            try:
                user = User.objects.get(email=recipient)
                new_message = Message.objects.create(
                    sender=sender,
                    recipient=user,
                    job=job,
                    subject=subject,
                    # Store the HTML if one was sent, otherwise the raw body.
                    content=html_content or body_message,
                    message_type="DIRECT",
                    is_read=False,
                )
                logger.info(
                    f"Stored sent message ID {new_message.id} for {recipient} in DB."
                )
            except Exception as e:
                # The email went out; a DB logging failure must not fail the task.
                logger.error(
                    f"Email sent successfully to {recipient}, but failed to store message in DB: {str(e)}"
                )
        return result == 1  # True only if exactly one message was sent
    except Exception as e:
        logger.error(f"Failed to send email to {recipient}: {str(e)}", exc_info=True)
        return False
# def send_bulk_email_task(
# subject,
# recipients,
# attachments=None,
# sender_user_id=None,
# job_id=None,
# hook="recruitment.tasks.email_success_hook",
# ):
# """
# Django-Q background task to send pre-formatted email to a list of recipients.,
# Receives arguments directly from the async_task call.
# """
# logger.info(f"Starting bulk email task for {len(recipients)} recipients")
# successful_sends = 0
# total_recipients = len(recipients)
# if not recipients:
# return {"success": False, "error": "No recipients provided to task."}
# sender = get_object_or_404(User, pk=sender_user_id)
# job = get_object_or_404(JobPosting, pk=job_id)
# # Since the async caller sends one task per recipient, total_recipients should be 1.
# for recipient_email in recipients:
# # The 'message' is the custom message specific to this recipient.
# r = _task_send_individual_email(
# subject, recipient_email, attachments, sender, job
# )
# print(f"Email send result for {recipient_email}: {r}")
# if r:
# successful_sends += 1
# print(f"successful_sends: {successful_sends} out of {total_recipients}")
# if successful_sends > 0:
# logger.info(
# f"Bulk email task completed successfully. Sent to {successful_sends}/{total_recipients} recipients."
# )
# return {
# "success": True,
# "recipients_count": successful_sends,
# "message": f"Sent successfully to {successful_sends} recipient(s).",
# }
# else:
# logger.error(f"Bulk email task failed: No emails were sent successfully.")
# return {
# "success": False,
# "error": "No emails were sent successfully in the background task.",
# }
def email_success_hook(task):
    """
    Django-Q completion hook: log the outcome of a finished task.

    Per the hook contract, it receives the completed Task object as its
    single positional argument.
    """
    if not task.success:
        logger.error(f"Task ID {task.id} failed. Error: {task.result}")
        return
    logger.info(f"Task ID {task.id} succeeded. Result: {task.result}")
import io
import zipfile
import os
from django.core.files.base import ContentFile
from django.conf import settings
from .models import Application, JobPosting  # Import your models
# Resume file extensions included when zipping applicants' CVs
# (compared case-insensitively in generate_and_save_cv_zip).
ALLOWED_EXTENSIONS = (".pdf", ".docx")
def generate_and_save_cv_zip(job_posting_id):
    """
    Generate a zip archive of all CVs for a job posting and save it on the job.

    Only resumes whose extension is in ALLOWED_EXTENSIONS are included;
    unreadable files are logged and skipped.

    Args:
        job_posting_id: Primary key of the JobPosting.

    Returns:
        str: Human-readable success message (stored as the task result).
    """
    job = JobPosting.objects.get(id=job_posting_id)
    entries = Application.objects.filter(job=job)
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
        for entry in entries:
            if not entry.resume:
                continue
            file_name = entry.resume.name.split("/")[-1]
            if file_name.lower().endswith(ALLOWED_EXTENSIONS):
                try:
                    with entry.resume.open("rb") as file_obj:
                        zf.writestr(file_name, file_obj.read())
                except Exception as e:
                    # Log and skip unreadable files instead of aborting the task
                    # (was a bare print, now goes through the module logger).
                    logger.error(f"Error processing file {file_name}: {e}")
                    continue
    # Save the generated zip buffer to the JobPosting model.
    zip_buffer.seek(0)
    # Filesystem-safe name: str(timezone.now()) contains spaces and colons,
    # which are invalid on some platforms, and job.title may contain slashes.
    timestamp = timezone.now().strftime("%Y%m%d_%H%M%S")
    safe_title = re.sub(r"[^\w\-]+", "_", job.title)
    zip_filename = f"all_cvs_for_{job.slug}_{safe_title}_{timestamp}.zip"
    # ContentFile wraps the bytes stream for saving into the FileField.
    job.cv_zip_file.save(zip_filename, ContentFile(zip_buffer.read()))
    job.zip_created = True  # BooleanField tracking zip completion
    job.save()
    return f"Successfully created zip for Job ID {job.slug} {job_posting_id}"
def send_one_day_reminder(job_id):
    """
    Send email reminder 1 day before job application deadline.

    Emails the job's assigned user, or every staff user when nobody is
    assigned. Does nothing when the job is no longer ACTIVE or when no
    recipients can be determined.

    Args:
        job_id: Primary key of the JobPosting.
    """
    try:
        job = JobPosting.objects.get(pk=job_id)
        # Only send if job is still active
        if job.status != "ACTIVE":
            logger.info(f"Job {job_id} is no longer active, skipping 1-day reminder")
            return
        # Get application count for display in the email body
        application_count = Application.objects.filter(job=job).count()
        # Determine recipients: prefer the assigned user
        recipients = []
        if job.assigned_to:
            recipients.append(job.assigned_to.email)
        # Staff users serve as a fallback (queryset is lazy; only evaluated
        # when no assignee exists)
        admin_users = User.objects.filter(is_staff=True)
        if not recipients:  # If no assigned user, send to all admins
            recipients = [admin.email for admin in admin_users]
        if not recipients:
            logger.warning(f"No recipients found for job {job_id} 1-day reminder")
            return
        # Create email content
        subject = f"Reminder: Job '{job.title}' closes tomorrow"
        html_message = f"""
        <html>
        <body>
        <h2>Job Closing Reminder</h2>
        <p><strong>Job Title:</strong> {job.title}</p>
        <p><strong>Application Deadline:</strong> {job.application_deadline.strftime("%B %d, %Y")}</p>
        <p><strong>Current Applications:</strong> {application_count}</p>
        <p><strong>Status:</strong> {job.get_status_display()}</p>
        <p>This job posting will close <strong>tomorrow</strong>. Please review any pending applications before the deadline.</p>
        <p><a href="/recruitment/jobs/{job.pk}/" style="background-color: #007cba; color: white; padding: 10px 20px; text-decoration: none; border-radius: 5px;">View Job Details</a></p>
        <hr>
        <p><small>This is an automated reminder from the KAAUH Recruitment System.</small></p>
        </body>
        </html>
        """
        # Send email to each recipient
        for recipient_email in recipients:
            # Positional Nones: no attachments, no sender, no job — so no
            # Message row is stored by the helper.
            _task_send_individual_email(
                subject, html_message, recipient_email, None, None, None
            )
        logger.info(
            f"Sent 1-day reminder for job {job_id} to {len(recipients)} recipients"
        )
    except JobPosting.DoesNotExist:
        logger.error(f"Job {job_id} not found for 1-day reminder")
    except Exception as e:
        logger.error(f"Error sending 1-day reminder for job {job_id}: {str(e)}")
def send_fifteen_minute_reminder(job_id):
    """
    Send final email reminder 15 minutes before job application deadline.

    Same recipient logic as send_one_day_reminder: the assigned user, or
    every staff user as a fallback. Skips when the job is no longer ACTIVE.

    Args:
        job_id: Primary key of the JobPosting.
    """
    try:
        job = JobPosting.objects.get(pk=job_id)
        # Only send if job is still active
        if job.status != "ACTIVE":
            logger.info(
                f"Job {job_id} is no longer active, skipping 15-minute reminder"
            )
            return
        # Get application count for display in the email body
        application_count = Application.objects.filter(job=job).count()
        # Determine recipients: prefer the assigned user
        recipients = []
        if job.assigned_to:
            recipients.append(job.assigned_to.email)
        # Staff users serve as a fallback when no assignee exists
        admin_users = User.objects.filter(is_staff=True)
        if not recipients:  # If no assigned user, send to all admins
            recipients = [admin.email for admin in admin_users]
        if not recipients:
            logger.warning(f"No recipients found for job {job_id} 15-minute reminder")
            return
        # Create email content
        subject = f"FINAL REMINDER: Job '{job.title}' closes in 15 minutes"
        html_message = f"""
        <html>
        <body>
        <h2 style="color: #d63384;">⚠️ FINAL REMINDER</h2>
        <p><strong>Job Title:</strong> {job.title}</p>
        <p><strong>Application Deadline:</strong> {job.application_deadline.strftime("%B %d, %Y at %I:%M %p")}</p>
        <p><strong>Current Applications:</strong> {application_count}</p>
        <p><strong>Status:</strong> {job.get_status_display()}</p>
        <p style="color: #d63384; font-weight: bold;">This job posting will close in <strong>15 minutes</strong>. This is your final reminder to review any pending applications.</p>
        <p><a href="/recruitment/jobs/{job.pk}/" style="background-color: #dc3545; color: white; padding: 10px 20px; text-decoration: none; border-radius: 5px;">View Job Details Now</a></p>
        <hr>
        <p><small>This is an automated final reminder from the KAAUH Recruitment System.</small></p>
        </body>
        </html>
        """
        # Send email to each recipient
        for recipient_email in recipients:
            # Positional Nones: no attachments, no sender, no job.
            _task_send_individual_email(
                subject, html_message, recipient_email, None, None, None
            )
        logger.info(
            f"Sent 15-minute reminder for job {job_id} to {len(recipients)} recipients"
        )
    except JobPosting.DoesNotExist:
        logger.error(f"Job {job_id} not found for 15-minute reminder")
    except Exception as e:
        logger.error(f"Error sending 15-minute reminder for job {job_id}: {str(e)}")
def send_job_closed_notification(job_id):
    """
    Send notification when job has closed and update job status.

    Side effects happen in order: the job is marked CLOSED and its form
    deactivated FIRST, then the notification emails are sent — so a later
    email failure does not leave the job open.

    Args:
        job_id: Primary key of the JobPosting.
    """
    try:
        job = JobPosting.objects.get(pk=job_id)
        # Only proceed if job is currently active (idempotency guard for
        # repeated scheduling)
        if job.status != "ACTIVE":
            logger.info(
                f"Job {job_id} is already not active, skipping closed notification"
            )
            return
        # Get final application count
        application_count = Application.objects.filter(job=job).count()
        # Update job status to closed
        job.status = "CLOSED"
        job.save(update_fields=["status"])
        # Also close the form template so no more submissions are accepted
        if job.template_form:
            job.template_form.is_active = False
            job.template_form.save(update_fields=["is_active"])
        # Determine recipients: prefer the assigned user
        recipients = []
        if job.assigned_to:
            recipients.append(job.assigned_to.email)
        # Staff users serve as a fallback when no assignee exists
        admin_users = User.objects.filter(is_staff=True)
        if not recipients:  # If no assigned user, send to all admins
            recipients = [admin.email for admin in admin_users]
        if not recipients:
            logger.warning(f"No recipients found for job {job_id} closed notification")
            return
        # Create email content
        subject = (
            f"Job '{job.title}' has closed - {application_count} applications received"
        )
        html_message = f"""
        <html>
        <body>
        <h2>Job Closed Notification</h2>
        <p><strong>Job Title:</strong> {job.title}</p>
        <p><strong>Application Deadline:</strong> {job.application_deadline.strftime("%B %d, %Y at %I:%M %p")}</p>
        <p><strong>Total Applications Received:</strong> <strong style="color: #28a745;">{application_count}</strong></p>
        <p><strong>Status:</strong> {job.get_status_display()}</p>
        <p>The job posting has been automatically closed and is no longer accepting applications.</p>
        <p><a href="/recruitment/jobs/{job.pk}/" style="background-color: #6c757d; color: white; padding: 10px 20px; text-decoration: none; border-radius: 5px;">View Job Details</a></p>
        <p><a href="/recruitment/applications/?job={job.pk}" style="background-color: #007cba; color: white; padding: 10px 20px; text-decoration: none; border-radius: 5px;">View Applications</a></p>
        <hr>
        <p><small>This is an automated notification from the KAAUH Recruitment System.</small></p>
        </body>
        </html>
        """
        # Send email to each recipient
        for recipient_email in recipients:
            # Positional Nones: no attachments, no sender, no job.
            _task_send_individual_email(
                subject, html_message, recipient_email, None, None, None
            )
        logger.info(
            f"Sent job closed notification for job {job_id} to {len(recipients)} recipients"
        )
    except JobPosting.DoesNotExist:
        logger.error(f"Job {job_id} not found for closed notification")
    except Exception as e:
        logger.error(
            f"Error sending job closed notification for job {job_id}: {str(e)}"
        )
def send_email_task(
    recipient_emails,
    subject: str,
    template_name: str,
    context: dict,
) -> str:
    """
    Django-Q task that delivers an email to a batch of recipients.

    Delegates to EmailService and returns a JSON-encoded summary string,
    which Django-Q stores as the task result for monitoring.
    """
    from .services.email_service import EmailService

    if not recipient_emails:
        return json.dumps({"status": "error", "message": "No recipients provided."})
    # Hand the batch off to the service layer.
    processed_count = EmailService().send_email_service(
        recipient_emails=recipient_emails,
        subject=subject,
        template_name=template_name,
        context=context,
    )
    summary = {
        "status": "success",
        "count": processed_count,
        "message": (
            f"Attempted to send email to {len(recipient_emails)} recipients. "
            f"Service reported processing {processed_count}."
        ),
    }
    return json.dumps(summary)
def generate_interview_questions(schedule_id: int) -> dict:
    """
    Generate AI-powered interview questions based on job requirements and
    candidate profile, and store them on the ScheduledInterview.

    Args:
        schedule_id (int): The ID of the scheduled interview.

    Returns:
        dict: {"status": "success"|"error", "message": ...}
    """
    from .models import ScheduledInterview

    try:
        # Get the scheduled interview with related data
        schedule = ScheduledInterview.objects.get(pk=schedule_id)
        application = schedule.application
        job = schedule.job
        logger.info(f"Generating interview questions for schedule {schedule_id}")
        # Prepare context for the AI prompt
        job_description = job.description or ""
        job_qualifications = job.qualifications or ""
        candidate_resume_text = ""
        # Extract candidate resume text if available and parsed
        if application.ai_analysis_data:
            resume_data_en = application.ai_analysis_data.get("resume_data_en", {})
            candidate_resume_text = f"""
        Candidate Name: {resume_data_en.get('full_name', 'N/A')}
        Current Title: {resume_data_en.get('current_title', 'N/A')}
        Summary: {resume_data_en.get('summary', 'N/A')}
        Skills: {resume_data_en.get('skills', {})}
        Experience: {resume_data_en.get('experience', [])}
        Education: {resume_data_en.get('education', [])}
        """
        # Create the AI prompt
        prompt = f"""
        You are an expert technical interviewer and hiring manager. Generate relevant interview questions based on the following information:
        JOB INFORMATION:
        Job Title: {job.title}
        Department: {job.department}
        Job Description: {job_description}
        Qualifications: {job_qualifications}
        CANDIDATE PROFILE:
        {candidate_resume_text}
        TASK:
        Generate 8-10 interview questions in english and arabic that are:
        1. Technical questions related to the job requirements
        2. Behavioral questions to assess soft skills and cultural fit
        3. Situational questions to evaluate problem-solving abilities
        4. Questions should be appropriate for the candidate's experience level
        For each question, specify:
        - Type: "technical", "behavioral", or "situational"
        - Difficulty: "easy", "medium", or "hard"
        - Category: A brief category name (e.g., "Python Programming", "Team Collaboration", "Problem Solving")
        - Question: The actual interview question
        OUTPUT FORMAT:
        Return a JSON object with the following structure:
        {{
            "questions": {{
            "en":[
                {{
                    "question_text": "The actual question text",
                    "question_type": "technical|behavioral|situational",
                    "difficulty_level": "easy|medium|hard",
                    "category": "Category name"
                }}
            ],
            "ar":[
                {{
                    "question_text": "The actual question text",
                    "question_type": "technical|behavioral|situational",
                    "difficulty_level": "easy|medium|hard",
                    "category": "Category name"
                }}
            ]}}
        }}
        Make questions specific to the job requirements and candidate background. Avoid generic questions.
        Output only valid JSON — no markdown, no extra text.
        """
        # Call AI handler
        result = ai_handler(prompt)
        if result["status"] == "error":
            logger.error(
                f"AI handler returned error for interview questions: {result['data']}"
            )
            return {"status": "error", "message": "Failed to generate questions"}
        # Parse AI response (the handler may return a JSON string or a dict)
        data = result["data"]
        if isinstance(data, str):
            data = json.loads(data)
        # Prompt requests a dict of {"en": [...], "ar": [...]}, so default
        # to an empty dict (falsy either way).
        questions = data.get("questions", {})
        if not questions:
            return {"status": "error", "message": "No questions generated"}
        # Store the generated questions directly (the old None-check that
        # assigned {} first was dead code — it was overwritten immediately).
        schedule.interview_questions = questions
        schedule.save(update_fields=["interview_questions"])
        logger.info(f"Successfully generated questions for schedule {schedule_id}")
        return {
            "status": "success",
            "message": "Generated interview questions",
        }
    except ScheduledInterview.DoesNotExist:
        error_msg = f"Scheduled interview with ID {schedule_id} not found"
        logger.error(error_msg)
        return {"status": "error", "message": error_msg}
    except Exception as e:
        error_msg = f"Error generating interview questions: {str(e)}"
        logger.error(error_msg, exc_info=True)
        return {"status": "error", "message": error_msg}
# def send_single_email_task(
# recipient_emails,
# subject: str,
# template_name: str,
# context: dict,
# ) -> str:
# """
# Django-Q task to send a bulk email asynchronously.
# """
# from .services.email_service import EmailService
# # if not recipient_emails:
# # return json.dumps({"status": "error", "message": "No recipients provided."})
# # service = EmailService()
# # # Execute the bulk sending method
# # processed_count = service.send_bulk_email(
# # recipient_emails=recipient_emails,
# # subject=subject,
# # template_name=template_name,
# # context=context,
# # )
# # The return value is stored in the result object for monitoring
# return json.dumps({
# "status": "success",
# "count": processed_count,
# "message": f"Attempted to send email to {len(recipient_emails)} recipients. Service reported processing {processed_count}."
# })