561 lines
24 KiB
Python
561 lines
24 KiB
Python
import re
|
||
import os
|
||
import json
|
||
import logging
|
||
import requests
|
||
from PyPDF2 import PdfReader
|
||
from datetime import datetime
|
||
from django.db import transaction
|
||
from .utils import create_zoom_meeting
|
||
from recruitment.models import Candidate
|
||
from . linkedin_service import LinkedInService
|
||
from django.shortcuts import get_object_or_404
|
||
from . models import JobPosting
|
||
from django.utils import timezone
|
||
from . models import InterviewSchedule,ScheduledInterview,ZoomMeeting
|
||
|
||
# Optional dependency: python-docx is required only for .docx resume parsing.
try:
    from docx import Document
    DOCX_AVAILABLE = True
except ImportError:
    DOCX_AVAILABLE = False
    logging.getLogger(__name__).warning(
        "python-docx not available. Word document processing will be disabled."
    )

logger = logging.getLogger(__name__)

# SECURITY FIX: the OpenRouter API key was previously hard-coded in source
# control. Read it from the environment instead (and rotate the leaked key).
OPENROUTER_API_KEY = os.environ.get('OPENROUTER_API_KEY', '')
OPENROUTER_MODEL = 'qwen/qwen-2.5-72b-instruct:free'

if not OPENROUTER_API_KEY:
    logger.warning("OPENROUTER_API_KEY not set. Resume scoring will be skipped.")
|
||
|
||
def extract_text_from_pdf(file_path):
    """Extract and return the concatenated text of every page in a PDF.

    Args:
        file_path: Filesystem path to the PDF file.

    Returns:
        str: Extracted text, stripped of surrounding whitespace. Pages with
        no extractable text (e.g. scanned images) contribute nothing.

    Raises:
        Exception: Re-raises any PyPDF2/IO error after logging it.
    """
    # Fixed: replaced a stray debug print() with proper logging, and the
    # quadratic ``text +=`` accumulation with a join over page parts.
    logger.debug("PDF text extraction: %s", file_path)
    pages = []
    try:
        with open(file_path, "rb") as f:
            reader = PdfReader(f)
            for page in reader.pages:
                # extract_text() may return None for image-only pages.
                pages.append(page.extract_text() or "")
    except Exception as e:
        logger.error(f"PDF extraction failed: {e}")
        raise
    return "".join(pages).strip()
|
||
|
||
def extract_text_from_word(file_path):
    """Extract text from a Word (.docx) document.

    Collects body paragraphs, table cells (tab-separated within a row),
    and section headers/footers (prefixed with "[HEADER] " / "[FOOTER] ").

    Args:
        file_path: Filesystem path to the .docx file.

    Returns:
        str: Extracted text, stripped of surrounding whitespace.

    Raises:
        ImportError: If python-docx is not installed.
        Exception: Re-raises any python-docx parsing error after logging it.
    """
    if not DOCX_AVAILABLE:
        raise ImportError("python-docx is not installed. Please install it with: pip install python-docx")

    # Fixed: replaced a stray debug print() with proper logging, and the
    # quadratic ``text +=`` accumulation with a join over collected parts.
    logger.debug("Word text extraction: %s", file_path)
    parts = []
    try:
        doc = Document(file_path)

        # Body paragraphs.
        for paragraph in doc.paragraphs:
            parts.append(paragraph.text + "\n")

        # Table cells: tab between cells, newline after each row.
        for table in doc.tables:
            for row in table.rows:
                for cell in row.cells:
                    parts.append(cell.text + "\t")
                parts.append("\n")

        # Headers and footers, tagged so downstream consumers can tell
        # them apart from body text.
        for section in doc.sections:
            if section.header:
                for paragraph in section.header.paragraphs:
                    parts.append("[HEADER] " + paragraph.text + "\n")
            if section.footer:
                for paragraph in section.footer.paragraphs:
                    parts.append("[FOOTER] " + paragraph.text + "\n")

    except Exception as e:
        logger.error(f"Word extraction failed: {e}")
        raise
    return "".join(parts).strip()
|
||
|
||
def extract_text_from_document(file_path):
|
||
"""Extract text from documents based on file type"""
|
||
if not os.path.exists(file_path):
|
||
raise FileNotFoundError(f"File not found: {file_path}")
|
||
|
||
file_ext = os.path.splitext(file_path)[1].lower()
|
||
|
||
if file_ext == '.pdf':
|
||
return extract_text_from_pdf(file_path)
|
||
elif file_ext == '.docx':
|
||
return extract_text_from_word(file_path)
|
||
else:
|
||
raise ValueError(f"Unsupported file type: {file_ext}. Only .pdf and .docx files are supported.")
|
||
|
||
def format_job_description(pk):
    """Reformat a JobPosting's description and qualifications into HTML.

    Asks the LLM to return two formatted HTML blocks as JSON, then persists
    them. Leaves the posting untouched when the AI call fails or a field is
    missing from the response.

    Args:
        pk: Primary key of the JobPosting to reformat.
    """
    job_posting = JobPosting.objects.get(pk=pk)

    prompt = f"""
    Can you please organize and format this unformatted job description and qualifications into clear, readable sections using headings and bullet points?
    Format the Content: You need to convert the clear, formatted job description and qualifications into a 2 blocks of HTML code.
    **JOB DESCRIPTION:**
    {job_posting.description}

    **QUALIFICATIONS:**
    {job_posting.qualifications}

    **STRICT JSON OUTPUT INSTRUCTIONS:**
    Output a single, valid JSON object with ONLY the following two top-level keys:

    'job_description': 'A HTML containing the formatted job description',
    'job_qualifications': 'A HTML containing the formatted job qualifications',


    Do not include any other text except for the JSON output.
    """
    result = ai_handler(prompt)

    if result['status'] == 'error':
        # BUG FIX: the previous message incorrectly referred to a "candidate";
        # this task operates on a job posting.
        logger.error(f"AI handler returned error for job posting {job_posting.pk}")
        return

    data = result['data']
    if isinstance(data, str):
        # ai_handler normally returns parsed JSON, but tolerate raw strings.
        data = json.loads(data)
    logger.debug("Formatted job description payload: %s", data)

    # BUG FIX: only overwrite fields the model actually returned, instead of
    # clobbering existing content with None.
    job_posting.description = data.get('job_description') or job_posting.description
    job_posting.qualifications = data.get('job_qualifications') or job_posting.qualifications
    job_posting.save(update_fields=['description', 'qualifications'])
|
||
|
||
|
||
def ai_handler(prompt):
    """Send a prompt to the OpenRouter chat-completions API and parse the reply.

    Args:
        prompt: The user-role prompt text to send to the model.

    Returns:
        dict: ``{"status": "success", "data": <parsed JSON from the model>}``
        on success, or ``{"status": "error", "data": <error detail>}`` on any
        network, HTTP, or JSON-parsing failure.
    """
    logger.debug("model call")
    try:
        # BUG FIX: the request previously had no timeout, so a stalled
        # connection could hang the background worker indefinitely.
        response = requests.post(
            url="https://openrouter.ai/api/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {OPENROUTER_API_KEY}",
                "Content-Type": "application/json",
            },
            data=json.dumps({
                "model": OPENROUTER_MODEL,
                "messages": [{"role": "user", "content": prompt}],
            }),
            timeout=120,
        )
    except requests.RequestException as e:
        # BUG FIX: network failures previously propagated and crashed the task.
        logger.error(f"OpenRouter request failed: {e}")
        return {"status": "error", "data": str(e)}

    logger.debug("OpenRouter status code: %s", response.status_code)
    if response.status_code != 200:
        logger.error("OpenRouter returned a non-200 response")
        return {"status": "error", "data": response.json()}

    content = response.json()["choices"][0]['message']['content']
    try:
        # The model sometimes wraps its JSON in markdown fences despite the
        # prompt instructions; strip them before parsing.
        content = content.replace("```json","").replace("```","")
        parsed = json.loads(content)
        logger.debug("success response")
        return {"status": "success", "data": parsed}
    except Exception as e:
        logger.error(f"Failed to parse model output as JSON: {e}")
        return {"status": "error", "data": str(e)}
|
||
|
||
|
||
def safe_cast_to_float(value, default=0.0):
    """Safely convert a value (int, float, or string) to a float.

    Strings are stripped of non-numeric characters (currency symbols, commas,
    units, '%') before conversion. A leading minus sign in the original
    string is preserved.

    Args:
        value: The value to convert.
        default: Returned when conversion is impossible.

    Returns:
        float: The converted value, or ``default``.
    """
    if isinstance(value, (int, float)):
        return float(value)
    if isinstance(value, str):
        # Remove everything except digits and the decimal point.
        cleaned_value = re.sub(r'[^\d.]', '', value)
        # Handle strings that are empty after cleaning (e.g. "abc").
        if not cleaned_value:
            return default
        try:
            result = float(cleaned_value)
        except ValueError:
            # e.g. multiple decimal points survive cleaning ("1.2.3").
            return default
        # BUG FIX: the cleaning regex dropped the minus sign, silently
        # turning negative inputs like "-5" into positive numbers.
        return -result if value.lstrip().startswith('-') else result
    return default
|
||
|
||
def handle_reume_parsing_and_scoring(pk):
    """Django-Q task: parse a candidate's resume, score it against the job,
    and atomically persist the AI analysis.

    Uses a single combined parse+score LLM call (cost/latency optimization)
    and exits quietly — without re-queueing — when the candidate is gone,
    the resume file is missing, or the AI call fails.

    Args:
        pk: Primary key of the Candidate to process.
    """
    # --- 1. Robust object retrieval (prevents looping on DoesNotExist) ---
    try:
        instance = Candidate.objects.get(pk=pk)
    except Candidate.DoesNotExist:
        # Exit gracefully if the candidate was deleted after the task was queued.
        logger.warning(f"Candidate matching query does not exist for pk={pk}. Exiting task.")
        return

    logger.info(f"Scoring resume for candidate {pk}")

    # --- 2. I/O and initial data check ---
    try:
        file_path = instance.resume.path
        if not os.path.exists(file_path):
            # Mark the task as unsuccessful but don't re-queue.
            logger.warning(f"Resume file not found: {file_path}")
            return

        # Unified document parser handles both .pdf and .docx.
        resume_text = extract_text_from_document(file_path)
        job_detail = f"{instance.job.description} {instance.job.qualifications}"
    except Exception as e:
        logger.error(f"Error during initial data retrieval/parsing for candidate {instance.pk}: {e}")
        return

    # --- 3. Single combined LLM call (parse + score in one prompt) ---
    prompt = _build_resume_scoring_prompt(job_detail, resume_text)

    try:
        result = ai_handler(prompt)
        if result['status'] == 'error':
            logger.error(f"AI handler returned error for candidate {instance.pk}")
            return
        # ai_handler normally returns a parsed dict, but tolerate raw strings.
        data = result['data']
        if isinstance(data, str):
            data = json.loads(data)
    except Exception as e:
        logger.error(f"AI handler failed for candidate {instance.pk}: {e}")
        return

    # --- 4. Atomic database update (ensures data integrity) ---
    with transaction.atomic():
        # Store both structured outputs ('resume_data' + 'analysis_data')
        # in the single JSONField for completeness.
        instance.ai_analysis_data = data
        instance.is_resume_parsed = True
        instance.save(update_fields=['ai_analysis_data', 'is_resume_parsed'])

    logger.info(f"Successfully scored and saved analysis for candidate {instance.id}")


def _build_resume_scoring_prompt(job_detail, resume_text):
    """Build the combined parse+score prompt sent to the LLM."""
    return f"""
    You are an expert AI system functioning as both a Resume Parser and a Technical Recruiter.

    Your task is to:
    1. **PARSE**: Extract all key-value information from the provided RESUME TEXT into a clean JSON structure under the key 'resume_data', preserving the original text and it's formatting and dont add any extra text.
    2. **SCORE**: Analyze the parsed data against the JOB CRITERIA and generate a comprehensive score and analysis under the key 'analysis_data'.

    **JOB CRITERIA:**
    {job_detail}

    **RESUME TEXT:**
    {resume_text}

    **STRICT JSON OUTPUT INSTRUCTIONS:**
    Output a single, valid JSON object with ONLY the following two top-level keys:


    1. "resume_data": {{
        "full_name": "Full name of the candidate",
        "current_title": "Most recent or current job title",
        "location": "City and state",
        "contact": "Phone number and email",
        "linkedin": "LinkedIn profile URL",
        "github": "GitHub or portfolio URL",
        "summary": "Brief professional profile or summary (1–2 sentences)",
        "education": [{{
            "institution": "Institution name",
            "degree": "Degree name",
            "year": "Year of graduation" (if provided) or '',
            "gpa": "GPA (if provided)",
            "relevant_courses": ["list", "of", "courses"](if provided) or []
        }}],
        "skills": {{
            "category_1": ["skill_a", "skill_b"],
            "uncategorized": ["tool_x"]
        }},
        "experience": [{{
            "company": "Company name",
            "job_title": "Job Title",
            "location": "Location",
            "start_date": "YYYY-MM",
            "end_date": "YYYY-MM or Present",
            "key_achievements": ["concise bullet points"] (if provided) or []
        }}],
        "projects": [{{
            "name": "Project name",
            "year": "Year",
            "technologies_used": ["list", "of", "tech"] (if provided) or [],
            "brief_description": "description"
        }}]
    }}

    2. "analysis_data": {{
        "match_score": "Integer Score 0-100",
        "strengths": "Brief summary of strengths",
        "weaknesses": "Brief summary of weaknesses",
        "years_of_experience": "Total years of experience (float, e.g., 6.5)",
        "criteria_checklist": List of job requirements if any {{ "Python": "Met", "AWS": "Not Met"}} only output the criteria_checklist in one of ('Met','Not Met') don't output any extra text,
        "category": "Most fitting professional field (e.g., Data Science), only output the category name and no other text example ('Software Development', 'correct') , ('Software Development and devops','wrong') ('Software Development / Backend Development','wrong')",
        "most_recent_job_title": "Candidate's most recent job title",
        "recommendation": "Detailed hiring recommendation narrative",
        "top_3_keywords": ["keyword1", "keyword2", "keyword3"],
        "job_fit_narrative": "Single, concise summary sentence",
        "language_fluency": ["language: fluency_level"],
        "screening_stage_rating": "Standardized rating (Highly Qualified, Qualified , Partially Qualified, Not Qualified)",
        "min_req_met_bool": "Boolean (true/false)",
        "soft_skills_score": "Integer Score 0-100 for inferred non-technical skills",
        "experience_industry_match": "Integer Score 0-100 for industry relevance",
        "seniority_level_match": "Integer Score 0-100 for alignment with JD's seniority level",
        "red_flags": ["List of any potential concerns (if any): e.g., 'Employment gap 1 year', 'Frequent job hopping', 'Missing required certification'"],
        "employment_stability_score": "Integer Score 0-100 (Higher is more stable/longer tenure) (if possible)",
        "transferable_skills_narrative": "A brief sentence describing the relevance of non-core experience (if applicable).",
        "cultural_fit_keywords": ["A list of 3-5 keywords extracted from the resume (if possible) (e.g., 'team-player', 'mentored', 'cross-functional')"]
    }}

    If a top-level key or its required fields are missing, set the field to null, an empty list, or an empty object as appropriate.

    Output only valid JSON—no markdown, no extra text.
    """
|
||
|
||
def create_interview_and_meeting(
    candidate_id,
    job_id,
    schedule_id,
    slot_date,
    slot_time,
    duration
):
    """
    Synchronous django-q task that books a single interview slot.

    Looks up the candidate, job, and schedule, creates a Zoom meeting via
    the external API, then records both the ZoomMeeting and the
    ScheduledInterview rows.

    Returns:
        bool: True when the interview was scheduled, False on any failure.
    """
    try:
        interviewee = Candidate.objects.get(pk=candidate_id)
        posting = JobPosting.objects.get(pk=job_id)
        slot_schedule = InterviewSchedule.objects.get(pk=schedule_id)

        starts_at = datetime.combine(slot_date, slot_time)
        topic = f"Interview for {posting.title} - {interviewee.name}"

        # Slow external API call to Zoom.
        zoom_result = create_zoom_meeting(topic, starts_at, duration)

        if zoom_result["status"] != "success":
            # Zoom API failure: log it and report the task as failed.
            logger.error(f"Zoom API failed for {interviewee.name}: {zoom_result['message']}")
            return False

        # Persist the meeting, then link it to the interview slot.
        details = zoom_result["meeting_details"]
        meeting_row = ZoomMeeting.objects.create(
            topic=topic,
            start_time=starts_at,
            duration=duration,
            meeting_id=details["meeting_id"],
            join_url=details["join_url"],
            zoom_gateway_response=zoom_result["zoom_gateway_response"],
            host_email=details["host_email"],
            password=details["password"]
        )
        ScheduledInterview.objects.create(
            candidate=interviewee,
            job=posting,
            zoom_meeting=meeting_row,
            schedule=slot_schedule,
            interview_date=slot_date,
            interview_time=slot_time
        )
        logger.info(f"Successfully scheduled interview for {interviewee.name}")
        return True

    except Exception as e:
        # Unexpected failure during lookups or persistence.
        logger.error(f"Critical error scheduling interview: {e}")
        return False
|
||
|
||
|
||
def handle_zoom_webhook_event(payload):
    """
    Background task: process a Zoom webhook event and update the local
    ZoomMeeting status.

    Handles: updated, started, ended, and deleted events.

    Args:
        payload: The deserialized webhook body from Zoom.

    Returns:
        bool: True when the event was processed (even if no matching local
        meeting exists), False on a malformed payload or processing error.
    """
    event_type = payload.get('event')
    # BUG FIX: payload['payload']['object'] raised an uncaught KeyError on
    # malformed payloads; use chained .get() with safe defaults instead.
    object_data = payload.get('payload', {}).get('object', {})

    # Zoom uses a long 'id' for the scheduled meeting (and sometimes a
    # 'uuid'); the 'id' maps to ZoomMeeting.meeting_id.
    raw_meeting_id = object_data.get('id')
    if raw_meeting_id is None:
        # BUG FIX: the old code did str(object_data.get('id')) first, so a
        # missing id became the truthy string "None" and this guard never fired.
        logger.warning(f"Webhook received without a valid Meeting ID: {event_type}")
        return False
    meeting_id_zoom = str(raw_meeting_id)
    logger.debug("Zoom webhook %s for meeting %s", event_type, meeting_id_zoom)

    try:
        # filter().first() avoids an exception if the meeting doesn't exist
        # locally yet, and simplifies the logic flow.
        meeting_instance = ZoomMeeting.objects.filter(meeting_id=meeting_id_zoom).first()

        # --- 1. Update events ---
        if event_type == 'meeting.updated':
            if meeting_instance:
                # Only update fields present in the payload; fall back to
                # the stored values otherwise.
                meeting_instance.topic = object_data.get('topic', meeting_instance.topic)
                meeting_instance.start_time = object_data.get('start_time', meeting_instance.start_time)
                meeting_instance.duration = object_data.get('duration', meeting_instance.duration)
                meeting_instance.timezone = object_data.get('timezone', meeting_instance.timezone)
                meeting_instance.status = object_data.get('status', meeting_instance.status)
                meeting_instance.save(update_fields=['topic', 'start_time', 'duration', 'timezone', 'status'])

        # --- 2. Status change events (start/end) ---
        elif event_type == 'meeting.started':
            if meeting_instance:
                meeting_instance.status = 'started'
                meeting_instance.save(update_fields=['status'])

        elif event_type == 'meeting.ended':
            if meeting_instance:
                meeting_instance.status = 'ended'
                meeting_instance.save(update_fields=['status'])

        # --- 3. Deletion event (user action) ---
        elif event_type == 'meeting.deleted':
            if meeting_instance:
                try:
                    meeting_instance.status = 'cancelled'
                    meeting_instance.save(update_fields=['status'])
                except Exception as e:
                    logger.error(f"Failed to mark Zoom meeting as cancelled: {e}")

        return True

    except Exception as e:
        logger.error(f"Failed to process Zoom webhook for {event_type} (ID: {meeting_id_zoom}): {e}", exc_info=True)
        return False
|
||
|
||
def linkedin_post_task(job_slug, access_token):
    """Background task: publish a job posting to LinkedIn and record the result.

    Args:
        job_slug: Slug of the JobPosting to publish.
        access_token: OAuth access token for the LinkedIn API.

    Returns:
        bool: True when the post succeeded, False otherwise.
    """
    job = get_object_or_404(JobPosting, slug=job_slug)

    try:
        service = LinkedInService()
        service.access_token = access_token
        # Long-running external API call.
        result = service.create_job_post(job)

        # Update the JobPosting object with the final result.
        if result['success']:
            job.posted_to_linkedin = True
            job.linkedin_post_id = result['post_id']
            job.linkedin_post_url = result['post_url']
            # BUG FIX: status was previously stored as the misspelled
            # 'SUCCESSS'. NOTE(review): any consumer matching on the old
            # typo'd value must be updated alongside this change.
            job.linkedin_post_status = 'SUCCESS'
            job.linkedin_posted_at = timezone.now()
        else:
            error_msg = result.get('error', "Unknown API error")
            job.linkedin_post_status = 'FAILED'
            logger.error(f"LinkedIn post failed for job {job_slug}: {error_msg}")
        job.save()
        return result['success']

    except Exception as e:
        logger.error(f"Critical error in LinkedIn task for job {job_slug}: {e}", exc_info=True)
        # Record the critical error on the job status so it is visible in admin.
        job.linkedin_post_status = f"CRITICAL_ERROR: {str(e)}"
        job.save()
        return False
|
||
|
||
|
||
def form_close(job_id):
    """Deactivate a job posting and its application template form.

    Args:
        job_id: Primary key of the JobPosting to close.
    """
    job = get_object_or_404(JobPosting, pk=job_id)
    job.is_active = False
    job.save()
    # BUG FIX: template_form.is_active was set but never persisted — only
    # job.save() ran, so the related form's change was silently discarded.
    if job.template_form is not None:
        job.template_form.is_active = False
        job.template_form.save(update_fields=['is_active'])