`.
**TASK 2: LinkedIn Post Creation**
1. **Write the Post:** Create an engaging, professional, and concise LinkedIn post (maximum 1300 characters) summarizing the opportunity.
2. **Encourage Action:** The post must have a strong call-to-action (CTA) encouraging applications.
3. **Use Hashtags:** Integrate relevant industry, role, and company hashtags (including any provided in the raw input) naturally at the end of the post.
**STRICT JSON OUTPUT INSTRUCTIONS:**
    Output a **single, valid JSON object** with **ONLY** the following five top-level key-value pairs.
    * The values for `html_job_description`, `html_qualifications`, `html_benefits`, and `html_application_instructions` MUST be the complete, formatted HTML strings (including all tags).
    * The value for `linkedin_post_data` MUST be the complete, final LinkedIn post as a single string within the 1300-character limit above.
    **Output Keys:**
    1. `html_job_description`
    2. `html_qualifications`
    3. `html_benefits`
    4. `html_application_instructions`
    5. `linkedin_post_data`
**Do not include any other text, explanation, or markdown outside of the final JSON object.**
"""
result = ai_handler(prompt)
print(f"REsults: {result}")
if result['status'] == 'error':
logger.error(f"AI handler returned error for candidate {job_posting.pk}")
print(f"AI handler returned error for candidate {job_posting.pk}")
return
data = result['data']
if isinstance(data, str):
data = json.loads(data)
print(data)
    job_posting.description = data.get('html_job_description')
    job_posting.qualifications = data.get('html_qualifications')
    job_posting.benefits = data.get('html_benefits')
    job_posting.application_instructions = data.get('html_application_instructions')
    job_posting.linkedin_post_formated_data = data.get('linkedin_post_data')
    job_posting.ai_parsed = True
    job_posting.save(update_fields=['description', 'qualifications', 'benefits', 'application_instructions', 'linkedin_post_formated_data', 'ai_parsed'])
def ai_handler(prompt):
print("model call")
    response = requests.post(
        url=OPENROUTER_API_URL,
        headers={
            "Authorization": f"Bearer {OPENROUTER_API_KEY}",
            "Content-Type": "application/json",
        },
        json={
            "model": OPENROUTER_MODEL,
            "messages": [{"role": "user", "content": prompt}],
        },
        timeout=120,  # avoid hanging the worker indefinitely on a stalled request
    )
res = {}
print(response.status_code)
if response.status_code == 200:
res = response.json()
print(res)
content = res["choices"][0]['message']['content']
try:
# print(content)
content = content.replace("```json","").replace("```","")
res = json.loads(content)
print("success response")
return {"status": "success", "data": res}
except Exception as e:
print(e)
return {"status": "error", "data": str(e)}
else:
print("error response")
return {"status": "error", "data": response.json()}
def safe_cast_to_float(value, default=0.0):
"""Safely converts a value (int, float, or string) to a float."""
if isinstance(value, (int, float)):
return float(value)
if isinstance(value, str):
# Remove non-numeric characters except the decimal point
cleaned_value = re.sub(r'[^\d.]', '', value)
try:
# Ensure we handle empty strings after cleaning
return float(cleaned_value) if cleaned_value else default
except ValueError:
return default
return default
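# Worked examples for safe_cast_to_float (illustrative, following the regex above):
#   safe_cast_to_float("SAR 12,500.50")  -> 12500.5  # currency symbol and comma stripped
#   safe_cast_to_float("N/A")            -> 0.0      # nothing numeric survives cleaning
#   safe_cast_to_float(7)                -> 7.0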
def handle_resume_parsing_and_scoring(pk: int):
"""
Optimized Django-Q task to parse a resume in English and Arabic, score the candidate,
and atomically save the results.
"""
# --- 1. Robust Object Retrieval (Prevents looping on DoesNotExist) ---
try:
instance = Application.objects.get(pk=pk)
except Application.DoesNotExist:
# Exit gracefully if the candidate was deleted after the task was queued
logger.warning(f"Candidate matching query does not exist for pk={pk}. Exiting task.")
print(f"Candidate matching query does not exist for pk={pk}. Exiting task.")
return
logger.info(f"Scoring resume for candidate {pk}")
print(f"Scoring resume for candidate {pk}")
# --- 2. I/O and Initial Data Check ---
try:
# Assuming instance.resume is a Django FileField
file_path = instance.resume.path
if not os.path.exists(file_path):
logger.warning(f"Resume file not found: {file_path}")
print(f"Resume file not found: {file_path}")
return
# Use the new unified document parser
resume_text = extract_text_from_document(file_path)
job_detail = f"{instance.job.description} {instance.job.qualifications}"
except Exception as e:
logger.error(f"Error during initial data retrieval/parsing for candidate {instance.pk}: {e}")
print(f"Error during initial data retrieval/parsing for candidate {instance.pk}: {e}")
return
print(resume_text)
# --- 3. Single, Combined LLM Prompt (Major Cost & Latency Optimization) ---
prompt = f"""
You are an expert AI system functioning as both a Resume Parser and a Technical Recruiter, capable of multi-language output.
Your task is to:
1. **PARSE (English)**: Extract all key-value information from the RESUME TEXT into a clean JSON structure under the key **'resume_data_en'**.
2. **PARSE (Arabic)**: Translate and output the exact same parsed data structure into Arabic under the key **'resume_data_ar'**. The keys must remain in English, but the values (names, titles, summaries, descriptions) must be in Arabic.
3. **SCORE (English)**: Analyze the data against the JOB CRITERIA and generate a comprehensive score and analysis under **'analysis_data_en'**, including an English narrative/recommendation.
4. **SCORE (Arabic)**: Output an identical analysis structure under **'analysis_data_ar'**, but ensure the narrative fields (**recommendation**, **job_fit_narrative**, **strengths**, **weaknesses**, **transferable_skills_narrative**) are translated into Arabic. All numerical and list fields (scores, checklist, keywords) must be identical to the English analysis.
**JOB CRITERIA:**
{job_detail}
**RESUME TEXT:**
{resume_text}
**STRICT JSON OUTPUT INSTRUCTIONS:**
You MUST output a single, valid JSON object.
This object MUST contain ONLY the following four top-level keys:
1. "resume_data_en"
2. "resume_data_ar"
3. "analysis_data_en"
4. "analysis_data_ar"
**ABSOLUTELY DO NOT use generic keys like "resume_data" or "analysis_data" at the top level.**
1. "resume_data_en": {{ /* English Parsed Data */
"full_name": "Full name of the candidate",
"current_title": "Most recent or current job title",
"location": "City and state",
"contact": "Phone number and email",
"linkedin": "LinkedIn profile URL",
"github": "GitHub or portfolio URL",
"summary": "Brief professional profile or summary (1–2 sentences)",
"education": [{{
"institution": "Institution name",
"degree": "Degree name",
"year": "Year of graduation" (if provided) or '',
"gpa": "GPA (if provided)",
"relevant_courses": ["list", "of", "courses"](if provided) or []
}}],
"skills": {{
"category_1": ["skill_a", "skill_b"],
"uncategorized": ["tool_x"]
}},
"experience": [{{
"company": "Company name",
"job_title": "Job Title",
"location": "Location",
"start_date": "YYYY-MM",
"end_date": "YYYY-MM or Present",
"key_achievements": ["concise bullet points"] (if provided) or []
}}],
"projects": [{{
"name": "Project name",
"year": "Year",
"technologies_used": ["list", "of", "tech"] (if provided) or [],
"brief_description": "description"
}}]
}}
2. "resume_data_ar": {{ /* Arabic Translated Parsed Data (Keys in English, Values in Arabic) */
"full_name": "الاسم الكامل للمرشح",
"current_title": "أحدث أو الحالي مسمى وظيفي",
"location": "المدينة والدولة",
"contact": "رقم الهاتف والبريد الإلكتروني",
"linkedin": "رابط ملف LinkedIn الشخصي",
"github": "رابط GitHub أو ملف الأعمال",
"summary": "ملف تعريفي مهني موجز أو ملخص (جملة واحدة أو جملتين)",
"education": [{{
"institution": "اسم المؤسسة",
"degree": "اسم الدرجة العلمية",
"year": "سنة التخرج (إذا توفرت) أو ''",
"gpa": "المعدل التراكمي (إذا توفر)",
"relevant_courses": ["قائمة", "بالدورات", "ذات", "الصلة"](إذا توفرت) أو []
}}],
"skills": {{
"category_1": ["مهارة_أ", "مهارة_ب"],
"uncategorized": ["أداة_س"]
}},
"experience": [{{
"company": "اسم الشركة",
"job_title": "المسمى الوظيفي",
"location": "الموقع",
"start_date": "السنة-الشهر (YYYY-MM)",
"end_date": "السنة-الشهر (YYYY-MM) أو Present",
"key_achievements": ["نقاط", "رئيسية", "موجزة", "للإنجازات"] (إذا توفرت) أو []
}}],
"projects": [{{
"name": "اسم المشروع",
"year": "السنة",
"technologies_used": ["قائمة", "بالتقنيات", "المستخدمة"] (إذا توفرت) أو [],
"brief_description": "وصف موجز"
}}]
}}
3. "analysis_data_en": {{ /* English Analysis and Narratives */
"match_score": "Integer Score 0-100",
"strengths": "Brief summary of strengths",
"weaknesses": "Brief summary of weaknesses",
"years_of_experience": "Total years of experience (float, e.g., 6.5)",
"criteria_checklist": List of job requirements if any {{ "Python": "Met", "AWS": "Not Met"}} only output the criteria_checklist in one of ('Met','Not Met') don't output any extra text,
"category": "Most fitting professional field (e.g., Data Science), only output the category name and no other text example ('Software Development', 'correct') , ('Software Development and devops','wrong') ('Software Development / Backend Development','wrong')",
"most_recent_job_title": "Candidate's most recent job title",
"recommendation": "Detailed hiring recommendation narrative",
"top_3_keywords": ["keyword1", "keyword2", "keyword3"],
"job_fit_narrative": "Single, concise summary sentence",
"language_fluency": ["language: fluency_level"],
"screening_stage_rating": "Standardized rating (Highly Qualified, Qualified , Partially Qualified, Not Qualified)",
"min_req_met_bool": "Boolean (true/false)",
"soft_skills_score": "Integer Score 0-100 for inferred non-technical skills",
"experience_industry_match": "Integer Score 0-100 for industry relevance",
"seniority_level_match": "Integer Score 0-100 for alignment with JD's seniority level",
"red_flags": ["List of any potential concerns (if any): e.g., 'Employment gap 1 year', 'Frequent job hopping', 'Missing required certification'"],
"employment_stability_score": "Integer Score 0-100 (Higher is more stable/longer tenure) (if possible)",
"transferable_skills_narrative": "A brief sentence describing the relevance of non-core experience (if applicable).",
"cultural_fit_keywords": ["A list of 3-5 keywords extracted from the resume (if possible) (e.g., 'team-player', 'mentored', 'cross-functional')"]
}}
4. "analysis_data_ar": {{ /* Identical Analysis structure, but with Arabic Translated Narratives */
"match_score": "Integer Score 0-100",
"strengths": "ملخص موجز لنقاط القوة",
"weaknesses": "ملخص موجز لنقاط الضعف",
"years_of_experience": "Total years of experience (float, e.g., 6.5)",
"criteria_checklist": List of job requirements if any {{ "Python": "Met", "AWS": "Not Met"}} only output the criteria_checklist in one of ('Met','Not Met') don't output any extra text,
"category": "Most fitting professional field (e.g., Data Science), only output the category name and no other text example ('Software Development', 'correct') , ('Software Development and devops','wrong') ('Software Development / Backend Development','wrong')",
"most_recent_job_title": "Candidate's most recent job title",
"recommendation": "سرد تفصيلي بتوصية التوظيف",
"top_3_keywords": ["keyword1", "keyword2", "keyword3"],
"job_fit_narrative": "جملة واحدة موجزة تلخص مدى ملاءمة الوظيفة",
"language_fluency": ["language: fluency_level"],
"screening_stage_rating": "Standardized rating (Highly Qualified, Qualified , Partially Qualified, Not Qualified)",
"min_req_met_bool": "Boolean (true/false)",
"soft_skills_score": "Integer Score 0-100 for inferred non-technical skills",
"experience_industry_match": "Integer Score 0-100 for industry relevance",
"seniority_level_match": "Integer Score 0-100 for alignment with JD's seniority level",
"red_flags": ["List of any potential concerns (if any): e.g., 'Employment gap 1 year', 'Frequent job hopping', 'Missing required certification'"],
"employment_stability_score": "Integer Score 0-100 (Higher is more stable/longer tenure) (if possible)",
"transferable_skills_narrative": "جملة موجزة تصف أهمية الخبرة غير الأساسية (إذا انطبقت).",
"cultural_fit_keywords": ["A list of 3-5 keywords extracted from the resume (if possible) (e.g., 'team-player', 'mentored', 'cross-functional')"]
}}
If a top-level key or its required fields are missing, set the field to null, an empty list, or an empty object as appropriate.
    Be clear and direct; avoid overly indirect politeness, which can add confusion.
    Be strict, objective, concise, and critical in your responses; do not give inflated scores to weak candidates.
Output only valid JSON—no markdown, no extra text.
"""
try:
# Call the AI handler
result = ai_handler(prompt)
if result['status'] == 'error':
logger.error(f"AI handler returned error for candidate {instance.pk}")
print(f"AI handler returned error for candidate {instance.pk}")
return
# Ensure the result is parsed as a Python dict
data = result['data']
if isinstance(data, str):
data = json.loads(data)
print(data)
except Exception as e:
logger.error(f"AI handler failed for candidate {instance.pk}: {e}")
print(f"AI handler failed for candidate {instance.pk}: {e}")
return
# --- 4. Atomic Database Update (Ensures data integrity) ---
with transaction.atomic():
        # Update the full JSON field (ai_analysis_data)
if instance.ai_analysis_data is None:
instance.ai_analysis_data = {}
# Save all four structured outputs into the single JSONField
instance.ai_analysis_data = data
instance.is_resume_parsed = True
# Save changes to the database
# NOTE: If you extract individual fields (like match_score) to separate columns,
# ensure those are handled here, using data.get('analysis_data_en', {}).get('match_score').
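        # A minimal sketch of that extraction (assumes Application has a `match_score` column):
        # analysis_en = data.get('analysis_data_en') or {}
        # instance.match_score = int(safe_cast_to_float(analysis_en.get('match_score'), default=0))
        # ...and add 'match_score' to the update_fields list below.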
instance.save(update_fields=['ai_analysis_data', 'is_resume_parsed'])
logger.info(f"Successfully scored and saved analysis (EN/AR) for candidate {instance.id}")
print(f"Successfully scored and saved analysis (EN/AR) for candidate {instance.id}")
from django.utils import timezone
def create_interview_and_meeting(schedule_id):
"""
Synchronous task for a single interview slot, dispatched by django-q.
"""
try:
schedule = ScheduledInterview.objects.get(pk=schedule_id)
interview = schedule.interview
print("creating zoooooooooooooooooooooooooooooooooooooom meeting")
result = create_zoom_meeting(interview.topic, interview.start_time, interview.duration)
if result["status"] == "success":
interview.meeting_id = result["meeting_details"]["meeting_id"]
interview.details_url = result["meeting_details"]["join_url"]
interview.zoom_gateway_response = result["zoom_gateway_response"]
interview.host_email = result["meeting_details"]["host_email"]
interview.password = result["meeting_details"]["password"]
interview.save()
logger.info(f"Successfully scheduled interview for {Application.name}")
return True
else:
# Handle Zoom API failure (e.g., log it or notify administrator)
logger.error(f"Zoom API failed for {Application.name}: {result['message']}")
return False # Task failed
except Exception as e:
# Catch any unexpected errors during database lookups or processing
logger.error(f"Critical error scheduling interview: {e}")
return False # Task failed
def handle_zoom_webhook_event(payload):
"""
Background task to process a Zoom webhook event and update the local ZoomMeeting status.
    It handles: updated, started, ended, and deleted events.
"""
event_type = payload.get('event')
    object_data = payload.get('payload', {}).get('object', {})
# Zoom often uses a long 'id' for the scheduled meeting and sometimes a 'uuid'.
# We rely on the unique 'id' that maps to your ZoomMeeting.meeting_id field.
    meeting_id_zoom = object_data.get('id')
    if not meeting_id_zoom:
        logger.warning(f"Webhook received without a valid Meeting ID: {event_type}")
        return False
    meeting_id_zoom = str(meeting_id_zoom)
try:
# Use filter().first() to avoid exceptions if the meeting doesn't exist yet,
# and to simplify the logic flow.
meeting_instance = ''#TODO:update #ZoomMeetingDetails.objects.filter(meeting_id=meeting_id_zoom).first()
print(meeting_instance)
# --- 1. Creation and Update Events ---
if event_type == 'meeting.updated':
if meeting_instance:
# Update key fields from the webhook payload
meeting_instance.topic = object_data.get('topic', meeting_instance.topic)
# Check for and update status and time details
# Only update time fields if they are in the payload
print(object_data)
meeting_instance.start_time = object_data.get('start_time', meeting_instance.start_time)
meeting_instance.duration = object_data.get('duration', meeting_instance.duration)
meeting_instance.timezone = object_data.get('timezone', meeting_instance.timezone)
meeting_instance.status = object_data.get('status', meeting_instance.status)
meeting_instance.save(update_fields=['topic', 'start_time', 'duration', 'timezone', 'status'])
# --- 2. Status Change Events (Start/End) ---
elif event_type == 'meeting.started':
if meeting_instance:
meeting_instance.status = 'started'
meeting_instance.save(update_fields=['status'])
elif event_type == 'meeting.ended':
if meeting_instance:
meeting_instance.status = 'ended'
meeting_instance.save(update_fields=['status'])
# --- 3. Deletion Event (User Action) ---
elif event_type == 'meeting.deleted':
if meeting_instance:
try:
meeting_instance.status = 'cancelled'
meeting_instance.save(update_fields=['status'])
except Exception as e:
logger.error(f"Failed to mark Zoom meeting as cancelled: {e}")
return True
except Exception as e:
logger.error(f"Failed to process Zoom webhook for {event_type} (ID: {meeting_id_zoom}): {e}", exc_info=True)
return False
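# Abbreviated shape of the payload this handler expects (illustrative, not a full Zoom event):
# {
#     "event": "meeting.ended",
#     "payload": {
#         "object": {"id": 123456789, "topic": "...", "start_time": "...", "duration": 30}
#     }
# }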
def linkedin_post_task(job_slug, access_token):
    # Background task for LinkedIn posting
    job = get_object_or_404(JobPosting, slug=job_slug)
    try:
        service = LinkedInService()
        service.access_token = access_token
        # long-running task
        result = service.create_job_post(job)
        # update the JobPosting object with the final result
        if result['success']:
            job.posted_to_linkedin = True
            job.linkedin_post_id = result['post_id']
            job.linkedin_post_url = result['post_url']
            job.linkedin_post_status = 'SUCCESS'
            job.linkedin_posted_at = timezone.now()
        else:
            error_msg = result.get('error', "Unknown API error")
            job.linkedin_post_status = 'FAILED'
            logger.error(f"LinkedIn post failed for job {job_slug}: {error_msg}")
        job.save()
return result['success']
except Exception as e:
logger.error(f"Critical error in LinkedIn task for job {job_slug}: {e}", exc_info=True)
# Update job status with the critical error
job.linkedin_post_status = f"CRITICAL_ERROR: {str(e)}"
job.save()
return False
def form_close(job_id):
    job = get_object_or_404(JobPosting, pk=job_id)
    job.is_active = False
    job.save(update_fields=['is_active'])
    if job.template_form:
        job.template_form.is_active = False
        job.template_form.save(update_fields=['is_active'])
    # TODO: send email to admins
def sync_hired_candidates_task(job_slug):
"""
Django-Q background task to sync hired candidates to all configured sources.
Args:
job_slug (str): The slug of the job posting
Returns:
dict: Sync results with status and details
"""
from .models import JobPosting, IntegrationLog
logger.info(f"Starting background sync task for job: {job_slug}")
job = JobPosting.objects.get(slug=job_slug)
source = job.source
if source.sync_status == "DISABLED":
logger.warning(f"Source {source.name} is disabled. Aborting sync for job {job_slug}.")
return {"status": "error", "message": "Source is disabled"}
source.sync_status = "SYNCING"
source.save(update_fields=['sync_status'])
# Prepare and send the sync request
try:
request_data = {"internal_job_id": job.internal_job_id, "data": job.source_sync_data}
results = requests.post(
url=source.sync_endpoint,
headers=source.custom_headers,
json=request_data,
timeout=30
)
# response_data = results.json()
if results.status_code == 200:
IntegrationLog.objects.create(
source=source,
action=IntegrationLog.ActionChoices.SYNC,
endpoint=source.sync_endpoint,
method="POST",
request_data=request_data,
status_code=results.status_code,
ip_address="127.0.0.1",
user_agent="",
)
source.last_sync_at = timezone.now()
source.sync_status = "SUCCESS"
source.save(update_fields=['last_sync_at', 'sync_status'])
logger.info(f"Background sync completed for job {job_slug}: {results}")
return results
else:
error_msg = f"Source API returned status {results.status_code}: {results.text}"
logger.error(error_msg)
IntegrationLog.objects.create(
source=source,
action=IntegrationLog.ActionChoices.ERROR,
endpoint=source.sync_endpoint,
method="POST",
request_data={"message": "Failed to sync hired candidates", "internal_job_id": job.internal_job_id},
error_message=error_msg,
status_code="ERROR",
ip_address="127.0.0.1",
user_agent=""
)
source.sync_status = "ERROR"
source.save(update_fields=['sync_status'])
return {"status": "error", "message": error_msg}
except Exception as e:
error_msg = f"Unexpected error during sync: {str(e)}"
logger.error(error_msg, exc_info=True)
IntegrationLog.objects.create(
source=source,
action=IntegrationLog.ActionChoices.ERROR,
endpoint=source.sync_endpoint,
method="POST",
request_data={"status": "error"},
error_message=error_msg,
status_code="ERROR",
ip_address="127.0.0.1",
user_agent=""
)
source.sync_status = "ERROR"
source.save(update_fields=['sync_status'])
# def sync_candidate_to_source_task(candidate_id, source_id):
# """
# Django-Q background task to sync a single candidate to a specific source.
# Args:
# candidate_id (int): The ID of the candidate
# source_id (int): The ID of the source
# Returns:
# dict: Sync result for this specific candidate-source pair
# """
# from .candidate_sync_service import CandidateSyncService
# from .models import Application, Source, IntegrationLog
# logger.info(f"Starting sync task for candidate {candidate_id} to source {source_id}")
# try:
# # Get the candidate and source
# application = Application.objects.get(pk=candidate_id)
# source = Source.objects.get(pk=source_id)
# # Initialize sync service
# sync_service = CandidateSyncService()
# # Perform the sync operation
# result = sync_service.sync_candidate_to_source(application, source)
# # Log the operation
# IntegrationLog.objects.create(
# source=source,
# action=IntegrationLog.ActionChoices.SYNC,
# endpoint=source.sync_endpoint or "unknown",
# method=source.sync_method or "POST",
# request_data={"candidate_id": candidate_id, "application_name": application.name},
# response_data=result,
# status_code="SUCCESS" if result.get('success') else "ERROR",
# error_message=result.get('error') if not result.get('success') else None,
# ip_address="127.0.0.1",
# user_agent="Django-Q Background Task",
# processing_time=result.get('duration', 0)
# )
# logger.info(f"Sync completed for candidate {candidate_id} to source {source_id}: {result}")
# return result
# except Application.DoesNotExist:
# error_msg = f"Application not found: {candidate_id}"
# logger.error(error_msg)
# return {"success": False, "error": error_msg}
# except Source.DoesNotExist:
# error_msg = f"Source not found: {source_id}"
# logger.error(error_msg)
# return {"success": False, "error": error_msg}
# except Exception as e:
# error_msg = f"Unexpected error during sync: {str(e)}"
# logger.error(error_msg, exc_info=True)
# return {"success": False, "error": error_msg}
from django.conf import settings
from django.core.mail import EmailMultiAlternatives
from django.utils.html import strip_tags
def _task_send_individual_email(subject, body_message, recipient, attachments, sender, job):
"""Internal helper to create and send a single email."""
from_email = getattr(settings, 'DEFAULT_FROM_EMAIL', 'noreply@kaauh.edu.sa')
is_html = '<' in body_message and '>' in body_message
if is_html:
plain_message = strip_tags(body_message)
email_obj = EmailMultiAlternatives(subject=subject, body=plain_message, from_email=from_email, to=[recipient])
email_obj.attach_alternative(body_message, "text/html")
else:
email_obj = EmailMultiAlternatives(subject=subject, body=body_message, from_email=from_email, to=[recipient])
if attachments:
for attachment in attachments:
if isinstance(attachment, tuple) and len(attachment) == 3:
filename, content, content_type = attachment
email_obj.attach(filename, content, content_type)
try:
        result = email_obj.send(fail_silently=False)
        if result == 1 and sender and job:  # job is None when the email is sent after message creation
            try:
                user = get_object_or_404(User, email=recipient)
new_message = Message.objects.create(
sender=sender,
recipient=user,
job=job,
subject=subject,
content=body_message, # Store the full HTML or plain content
message_type='DIRECT',
is_read=False, # It's just sent, not read yet
)
logger.info(f"Stored sent message ID {new_message.id} in DB.")
except Exception as e:
logger.error(f"Email sent to {recipient}, but failed to store in DB: {str(e)}")
return result == 1
    except Exception as e:
        logger.error(f"Failed to send email to {recipient}: {str(e)}", exc_info=True)
        return False
def send_bulk_email_task(subject, customized_sends, attachments=None, sender_user_id=None, job_id=None, hook='recruitment.tasks.email_success_hook'):
    """
    Django-Q background task to send pre-formatted emails to a list of recipients.
    Receives its arguments directly from the async_task call.
    """
    logger.info(f"Starting bulk email task for {len(customized_sends)} recipients")
    successful_sends = 0
    total_recipients = len(customized_sends)
    if not customized_sends:
        return {'success': False, 'error': 'No recipients provided to task.'}
    sender = User.objects.filter(pk=sender_user_id).first() if sender_user_id else None
    job = JobPosting.objects.filter(pk=job_id).first() if job_id else None
# Since the async caller sends one task per recipient, total_recipients should be 1.
for recipient_email, custom_message in customized_sends:
# The 'message' is the custom message specific to this recipient.
        r = _task_send_individual_email(subject, custom_message, recipient_email, attachments, sender, job)
print(f"Email send result for {recipient_email}: {r}")
if r:
successful_sends += 1
print(f"successful_sends: {successful_sends} out of {total_recipients}")
if successful_sends > 0:
logger.info(f"Bulk email task completed successfully. Sent to {successful_sends}/{total_recipients} recipients.")
return {
'success': True,
'recipients_count': successful_sends,
'message': f"Sent successfully to {successful_sends} recipient(s)."
}
else:
logger.error(f"Bulk email task failed: No emails were sent successfully.")
return {'success': False, 'error': "No emails were sent successfully in the background task."}
def email_success_hook(task):
"""
The success hook must accept the Task object as the first and only required positional argument.
"""
if task.success:
logger.info(f"Task ID {task.id} succeeded. Result: {task.result}")
else:
logger.error(f"Task ID {task.id} failed. Error: {task.result}")
import io
import zipfile
import os
from django.core.files.base import ContentFile
from django.conf import settings
from .models import Application, JobPosting
ALLOWED_EXTENSIONS = (".pdf", ".docx")
def generate_and_save_cv_zip(job_posting_id):
"""
Generates a zip file of all CVs for a job posting and saves it to the job model.
"""
job = JobPosting.objects.get(id=job_posting_id)
entries = Application.objects.filter(job=job)
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
for entry in entries:
if not entry.resume:
continue
file_name = entry.resume.name.split("/")[-1]
file_name_lower = file_name.lower()
if file_name_lower.endswith(ALLOWED_EXTENSIONS):
try:
with entry.resume.open("rb") as file_obj:
file_content = file_obj.read()
zf.writestr(file_name, file_content)
except Exception as e:
# Log the error using Django's logging system if set up
print(f"Error processing file {file_name}: {e}")
continue
    # Save the generated zip buffer to the JobPosting model
    zip_buffer.seek(0)
    timestamp = timezone.now().strftime('%Y%m%d_%H%M%S')
    zip_filename = f"all_cvs_for_{job.slug}_{job.title}_{timestamp}.zip"
# Use ContentFile to save the bytes stream into the FileField
job.cv_zip_file.save(zip_filename, ContentFile(zip_buffer.read()))
job.zip_created = True # Assuming you added a BooleanField for tracking completion
job.save()
return f"Successfully created zip for Job ID {job.slug} {job_posting_id}"
def send_one_day_reminder(job_id):
"""
Send email reminder 1 day before job application deadline.
"""
try:
job = JobPosting.objects.get(pk=job_id)
# Only send if job is still active
if job.status != 'ACTIVE':
logger.info(f"Job {job_id} is no longer active, skipping 1-day reminder")
return
# Get application count
application_count = Application.objects.filter(job=job).count()
# Determine recipients
recipients = []
if job.assigned_to:
recipients.append(job.assigned_to.email)
# Add admin users as fallback or additional recipients
admin_users = User.objects.filter(is_staff=True)
if not recipients: # If no assigned user, send to all admins
recipients = [admin.email for admin in admin_users]
if not recipients:
logger.warning(f"No recipients found for job {job_id} 1-day reminder")
return
# Create email content
subject = f"Reminder: Job '{job.title}' closes tomorrow"
html_message = f"""
Job Closing Reminder
Job Title: {job.title}
Application Deadline: {job.application_deadline.strftime('%B %d, %Y')}
Current Applications: {application_count}
Status: {job.get_status_display()}
This job posting will close tomorrow. Please review any pending applications before the deadline.
View Job Details
This is an automated reminder from the KAAUH Recruitment System.
"""
# Send email to each recipient
for recipient_email in recipients:
_task_send_individual_email(subject, html_message, recipient_email, None, None, None)
logger.info(f"Sent 1-day reminder for job {job_id} to {len(recipients)} recipients")
except JobPosting.DoesNotExist:
logger.error(f"Job {job_id} not found for 1-day reminder")
except Exception as e:
logger.error(f"Error sending 1-day reminder for job {job_id}: {str(e)}")
def send_fifteen_minute_reminder(job_id):
"""
Send final email reminder 15 minutes before job application deadline.
"""
try:
job = JobPosting.objects.get(pk=job_id)
# Only send if job is still active
if job.status != 'ACTIVE':
logger.info(f"Job {job_id} is no longer active, skipping 15-minute reminder")
return
# Get application count
application_count = Application.objects.filter(job=job).count()
# Determine recipients
recipients = []
if job.assigned_to:
recipients.append(job.assigned_to.email)
# Add admin users as fallback or additional recipients
admin_users = User.objects.filter(is_staff=True)
if not recipients: # If no assigned user, send to all admins
recipients = [admin.email for admin in admin_users]
if not recipients:
logger.warning(f"No recipients found for job {job_id} 15-minute reminder")
return
# Create email content
subject = f"FINAL REMINDER: Job '{job.title}' closes in 15 minutes"
html_message = f"""
⚠️ FINAL REMINDER
Job Title: {job.title}
Application Deadline: {job.application_deadline.strftime('%B %d, %Y at %I:%M %p')}
Current Applications: {application_count}
Status: {job.get_status_display()}
This job posting will close in 15 minutes. This is your final reminder to review any pending applications.
View Job Details Now
This is an automated final reminder from the KAAUH Recruitment System.
"""
# Send email to each recipient
for recipient_email in recipients:
_task_send_individual_email(subject, html_message, recipient_email, None, None, None)
logger.info(f"Sent 15-minute reminder for job {job_id} to {len(recipients)} recipients")
except JobPosting.DoesNotExist:
logger.error(f"Job {job_id} not found for 15-minute reminder")
except Exception as e:
logger.error(f"Error sending 15-minute reminder for job {job_id}: {str(e)}")
def send_job_closed_notification(job_id):
"""
Send notification when job has closed and update job status.
"""
try:
job = JobPosting.objects.get(pk=job_id)
# Only proceed if job is currently active
if job.status != 'ACTIVE':
logger.info(f"Job {job_id} is already not active, skipping closed notification")
return
# Get final application count
application_count = Application.objects.filter(job=job).count()
# Update job status to closed
job.status = 'CLOSED'
job.save(update_fields=['status'])
# Also close the form template
if job.template_form:
job.template_form.is_active = False
job.template_form.save(update_fields=['is_active'])
# Determine recipients
recipients = []
if job.assigned_to:
recipients.append(job.assigned_to.email)
# Add admin users as fallback or additional recipients
admin_users = User.objects.filter(is_staff=True)
if not recipients: # If no assigned user, send to all admins
recipients = [admin.email for admin in admin_users]
if not recipients:
logger.warning(f"No recipients found for job {job_id} closed notification")
return
# Create email content
subject = f"Job '{job.title}' has closed - {application_count} applications received"
html_message = f"""
Job Closed Notification
Job Title: {job.title}
Application Deadline: {job.application_deadline.strftime('%B %d, %Y at %I:%M %p')}
Total Applications Received: {application_count}
Status: {job.get_status_display()}
The job posting has been automatically closed and is no longer accepting applications.
View Job Details
View Applications
This is an automated notification from the KAAUH Recruitment System.
"""
# Send email to each recipient
for recipient_email in recipients:
_task_send_individual_email(subject, html_message, recipient_email, None, None, None)
logger.info(f"Sent job closed notification for job {job_id} to {len(recipients)} recipients")
except JobPosting.DoesNotExist:
logger.error(f"Job {job_id} not found for closed notification")
except Exception as e:
logger.error(f"Error sending job closed notification for job {job_id}: {str(e)}")