` bullet points. Encapsulate the entire formatted block within a single ``.
**TASK 2: LinkedIn Post Creation**
1. **Write the Post:** Create an engaging, professional, and concise LinkedIn post (maximum 1300 characters) summarizing the opportunity.
2. **Encourage Action:** The post must have a strong call-to-action (CTA) encouraging applications.
3. **Use Hashtags:** Integrate relevant industry, role, and company hashtags (including any provided in the raw input) naturally at the end of the post.
**STRICT JSON OUTPUT INSTRUCTIONS:**
Output a **single, valid JSON object** with **ONLY** the following three top-level key-value pairs.
* The values for `html_job_description` and `html_qualifications` MUST be the complete, formatted HTML strings (including all tags).
* The value for `linkedin_post` MUST be the complete, final LinkedIn post as a single string not greater than 3000 characters.
**Output Keys:**
1. `html_job_description`
2. `html_qualifications`
3 `html_job_requirements`
4. `linkedin_post_data`
**Do not include any other text, explanation, or markdown outside of the final JSON object.**
"""
result = ai_handler(prompt)
print(f"REsults: {result}")
if result['status'] == 'error':
logger.error(f"AI handler returned error for candidate {job_posting.pk}")
print(f"AI handler returned error for candidate {job_posting.pk}")
return
data = result['data']
if isinstance(data, str):
data = json.loads(data)
print(data)
job_posting.description = data.get('html_job_description')
job_posting.qualifications = data.get('html_qualifications')
job_posting.linkedin_post_formated_data=data.get('linkedin_post_data')
job_posting.save(update_fields=['description', 'qualifications','linkedin_post_formated_data'])
def ai_handler(prompt):
print("model call")
response = requests.post(
url="https://openrouter.ai/api/v1/chat/completions",
headers={
"Authorization": f"Bearer {OPENROUTER_API_KEY}",
"Content-Type": "application/json",
},
data=json.dumps({
"model": OPENROUTER_MODEL,
"messages": [{"role": "user", "content": prompt}],
},
)
)
res = {}
print(response.status_code)
if response.status_code == 200:
res = response.json()
print(res)
content = res["choices"][0]['message']['content']
try:
# print(content)
content = content.replace("```json","").replace("```","")
res = json.loads(content)
print("success response")
return {"status": "success", "data": res}
except Exception as e:
print(e)
return {"status": "error", "data": str(e)}
else:
print("error response")
return {"status": "error", "data": response.json()}
def safe_cast_to_float(value, default=0.0):
"""Safely converts a value (int, float, or string) to a float."""
if isinstance(value, (int, float)):
return float(value)
if isinstance(value, str):
# Remove non-numeric characters except the decimal point
cleaned_value = re.sub(r'[^\d.]', '', value)
try:
# Ensure we handle empty strings after cleaning
return float(cleaned_value) if cleaned_value else default
except ValueError:
return default
return default
def handle_reume_parsing_and_scoring(pk):
"""
Optimized Django-Q task to parse a resume, score the candidate against a job,
and atomically save the results.
"""
# --- 1. Robust Object Retrieval (Prevents looping on DoesNotExist) ---
try:
instance = Candidate.objects.get(pk=pk)
except Candidate.DoesNotExist:
# Exit gracefully if the candidate was deleted after the task was queued
logger.warning(f"Candidate matching query does not exist for pk={pk}. Exiting task.")
print(f"Candidate matching query does not exist for pk={pk}. Exiting task.")
return
logger.info(f"Scoring resume for candidate {pk}")
print(f"Scoring resume for candidate {pk}")
# --- 2. I/O and Initial Data Check ---
try:
file_path = instance.resume.path
if not os.path.exists(file_path):
logger.warning(f"Resume file not found: {file_path}")
print(f"Resume file not found: {file_path}")
# Consider marking the task as unsuccessful but don't re-queue
return
# Use the new unified document parser
resume_text = extract_text_from_document(file_path)
job_detail = f"{instance.job.description} {instance.job.qualifications}"
except Exception as e:
logger.error(f"Error during initial data retrieval/parsing for candidate {instance.pk}: {e}")
print(f"Error during initial data retrieval/parsing for candidate {instance.pk}: {e}")
return
print(resume_text)
# --- 3. Single, Combined LLM Prompt (Major Cost & Latency Optimization) ---
prompt = f"""
You are an expert AI system functioning as both a Resume Parser and a Technical Recruiter.
Your task is to:
1. **PARSE**: Extract all key-value information from the provided RESUME TEXT into a clean JSON structure under the key 'resume_data', preserving the original text and it's formatting and dont add any extra text.
2. **SCORE**: Analyze the parsed data against the JOB CRITERIA and generate a comprehensive score and analysis under the key 'analysis_data'.
**JOB CRITERIA:**
{job_detail}
**RESUME TEXT:**
{resume_text}
**STRICT JSON OUTPUT INSTRUCTIONS:**
Output a single, valid JSON object with ONLY the following two top-level keys:
1. "resume_data": {{
"full_name": "Full name of the candidate",
"current_title": "Most recent or current job title",
"location": "City and state",
"contact": "Phone number and email",
"linkedin": "LinkedIn profile URL",
"github": "GitHub or portfolio URL",
"summary": "Brief professional profile or summary (1–2 sentences)",
"education": [{{
"institution": "Institution name",
"degree": "Degree name",
"year": "Year of graduation" (if provided) or '',
"gpa": "GPA (if provided)",
"relevant_courses": ["list", "of", "courses"](if provided) or []
}}],
"skills": {{
"category_1": ["skill_a", "skill_b"],
"uncategorized": ["tool_x"]
}},
"experience": [{{
"company": "Company name",
"job_title": "Job Title",
"location": "Location",
"start_date": "YYYY-MM",
"end_date": "YYYY-MM or Present",
"key_achievements": ["concise bullet points"] (if provided) or []
}}],
"projects": [{{
"name": "Project name",
"year": "Year",
"technologies_used": ["list", "of", "tech"] (if provided) or [],
"brief_description": "description"
}}]
}}
2. "analysis_data": {{
"match_score": "Integer Score 0-100",
"strengths": "Brief summary of strengths",
"weaknesses": "Brief summary of weaknesses",
"years_of_experience": "Total years of experience (float, e.g., 6.5)",
"criteria_checklist": List of job requirements if any {{ "Python": "Met", "AWS": "Not Met"}} only output the criteria_checklist in one of ('Met','Not Met') don't output any extra text,
"category": "Most fitting professional field (e.g., Data Science), only output the category name and no other text example ('Software Development', 'correct') , ('Software Development and devops','wrong') ('Software Development / Backend Development','wrong')",
"most_recent_job_title": "Candidate's most recent job title",
"recommendation": "Detailed hiring recommendation narrative",
"top_3_keywords": ["keyword1", "keyword2", "keyword3"],
"job_fit_narrative": "Single, concise summary sentence",
"language_fluency": ["language: fluency_level"],
"screening_stage_rating": "Standardized rating (Highly Qualified, Qualified , Partially Qualified, Not Qualified)",
"min_req_met_bool": "Boolean (true/false)",
"soft_skills_score": "Integer Score 0-100 for inferred non-technical skills",
"experience_industry_match": "Integer Score 0-100 for industry relevance",
"seniority_level_match": "Integer Score 0-100 for alignment with JD's seniority level",
"red_flags": ["List of any potential concerns (if any): e.g., 'Employment gap 1 year', 'Frequent job hopping', 'Missing required certification'"],
"employment_stability_score": "Integer Score 0-100 (Higher is more stable/longer tenure) (if possible)",
"transferable_skills_narrative": "A brief sentence describing the relevance of non-core experience (if applicable).",
"cultural_fit_keywords": ["A list of 3-5 keywords extracted from the resume (if possible) (e.g., 'team-player', 'mentored', 'cross-functional')"]
}}
If a top-level key or its required fields are missing, set the field to null, an empty list, or an empty object as appropriate.
Output only valid JSON—no markdown, no extra text.
"""
try:
result = ai_handler(prompt)
if result['status'] == 'error':
logger.error(f"AI handler returned error for candidate {instance.pk}")
print(f"AI handler returned error for candidate {instance.pk}")
return
# Ensure the result is parsed as a Python dict (if ai_handler returns a JSON string)
data = result['data']
if isinstance(data, str):
data = json.loads(data)
print(data)
# parsed_summary = data.get('parsed_data', {})
# scoring_result = data.get('scoring_data', {})
except Exception as e:
logger.error(f"AI handler failed for candidate {instance.pk}: {e}")
print(f"AI handler failed for candidate {instance.pk}: {e}")
return
# --- 4. Atomic Database Update (Ensures data integrity) ---
with transaction.atomic():
# Map JSON keys to model fields with appropriate defaults
# update_map = {
# 'match_score': ('match_score', 0),
# 'years_of_experience': ('years_of_experience', 0.0),
# 'soft_skills_score': ('soft_skills_score', 0),
# 'experience_industry_match': ('experience_industry_match', 0),
# 'min_req_met_bool': ('min_req_met_bool', False),
# 'screening_stage_rating': ('screening_stage_rating', 'N/A'),
# 'most_recent_job_title': ('most_recent_job_title', 'N/A'),
# 'top_3_keywords': ('top_3_keywords', []),
# 'strengths': ('strengths', ''),
# 'weaknesses': ('weaknesses', ''),
# 'job_fit_narrative': ('job_fit_narrative', ''),
# 'recommendation': ('recommendation', ''),
# 'criteria_checklist': ('criteria_checklist', {}),
# 'language_fluency': ('language_fluency', []),
# 'category': ('category', 'N/A'),
# }
# Apply scoring results to the instance
# for model_field, (json_key, default_value) in update_map.items():
# instance.ai_analysis_data[model_field] = scoring_result.get(json_key, default_value)
# instance.set_field(model_field, scoring_result.get(json_key, default_value))
# instance.set_field("match_score" , int(safe_cast_to_float(scoring_result.get('match_score', 0), default=0)))
# instance.set_field("years_of_experience" , safe_cast_to_float(scoring_result.get('years_of_experience', 0.0)))
# instance.set_field("soft_skills_score" , int(safe_cast_to_float(scoring_result.get('soft_skills_score', 0), default=0)))
# instance.set_field("experience_industry_match" , int(safe_cast_to_float(scoring_result.get('experience_industry_match', 0), default=0)))
# # Other Model Fields
# instance.set_field("min_req_met_bool" , scoring_result.get('min_req_met_bool', False))
# instance.set_field("screening_stage_rating" , scoring_result.get('screening_stage_rating', 'N/A'))
# instance.set_field("category" , scoring_result.get('category', 'N/A'))
# instance.set_field("most_recent_job_title" , scoring_result.get('most_recent_job_title', 'N/A'))
# instance.set_field("top_3_keywords" , scoring_result.get('top_3_keywords', []))
# instance.set_field("strengths" , scoring_result.get('strengths', ''))
# instance.set_field("weaknesses" , scoring_result.get('weaknesses', ''))
# instance.set_field("job_fit_narrative" , scoring_result.get('job_fit_narrative', ''))
# instance.set_field("recommendation" , scoring_result.get('recommendation', ''))
# instance.set_field("criteria_checklist" , scoring_result.get('criteria_checklist', {}))
# instance.set_field("language_fluency" , scoring_result.get('language_fluency', []))
# 2. Update the Full JSON Field (ai_analysis_data)
if instance.ai_analysis_data is None:
instance.ai_analysis_data = {}
# Save both structured outputs into the single JSONField for completeness
instance.ai_analysis_data = data
# instance.ai_analysis_data['parsed_data'] = parsed_summary
# instance.ai_analysis_data['scoring_data'] = scoring_result
# Apply parsing results
# instance.parsed_summary = json.dumps(parsed_summary)
instance.is_resume_parsed = True
instance.save(update_fields=['ai_analysis_data', 'is_resume_parsed'])
logger.info(f"Successfully scored and saved analysis for candidate {instance.id}")
print(f"Successfully scored and saved analysis for candidate {instance.id}")
def create_interview_and_meeting(
candidate_id,
job_id,
schedule_id,
slot_date,
slot_time,
duration
):
"""
Synchronous task for a single interview slot, dispatched by django-q.
"""
try:
candidate = Candidate.objects.get(pk=candidate_id)
job = JobPosting.objects.get(pk=job_id)
schedule = InterviewSchedule.objects.get(pk=schedule_id)
interview_datetime = datetime.combine(slot_date, slot_time)
meeting_topic = f"Interview for {job.title} - {candidate.name}"
# 1. External API Call (Slow)
result = create_zoom_meeting(meeting_topic, interview_datetime, duration)
if result["status"] == "success":
# 2. Database Writes (Slow)
zoom_meeting = ZoomMeeting.objects.create(
topic=meeting_topic,
start_time=interview_datetime,
duration=duration,
meeting_id=result["meeting_details"]["meeting_id"],
join_url=result["meeting_details"]["join_url"],
zoom_gateway_response=result["zoom_gateway_response"],
host_email=result["meeting_details"]["host_email"],
password=result["meeting_details"]["password"]
)
ScheduledInterview.objects.create(
candidate=candidate,
job=job,
zoom_meeting=zoom_meeting,
schedule=schedule,
interview_date=slot_date,
interview_time=slot_time
)
# Log success or use Django-Q result system for monitoring
logger.info(f"Successfully scheduled interview for {candidate.name}")
return True # Task succeeded
else:
# Handle Zoom API failure (e.g., log it or notify administrator)
logger.error(f"Zoom API failed for {candidate.name}: {result['message']}")
return False # Task failed
except Exception as e:
# Catch any unexpected errors during database lookups or processing
logger.error(f"Critical error scheduling interview: {e}")
return False # Task failed
def handle_zoom_webhook_event(payload):
"""
Background task to process a Zoom webhook event and update the local ZoomMeeting status.
It handles: created, updated, started, ended, and deleted events.
"""
event_type = payload.get('event')
object_data = payload['payload']['object']
# Zoom often uses a long 'id' for the scheduled meeting and sometimes a 'uuid'.
# We rely on the unique 'id' that maps to your ZoomMeeting.meeting_id field.
meeting_id_zoom = str(object_data.get('id'))
print(meeting_id_zoom)
if not meeting_id_zoom:
logger.warning(f"Webhook received without a valid Meeting ID: {event_type}")
return False
try:
# Use filter().first() to avoid exceptions if the meeting doesn't exist yet,
# and to simplify the logic flow.
meeting_instance = ZoomMeeting.objects.filter(meeting_id=meeting_id_zoom).first()
print(meeting_instance)
# --- 1. Creation and Update Events ---
if event_type == 'meeting.updated':
if meeting_instance:
# Update key fields from the webhook payload
meeting_instance.topic = object_data.get('topic', meeting_instance.topic)
# Check for and update status and time details
# if event_type == 'meeting.created':
# meeting_instance.status = 'scheduled'
# elif event_type == 'meeting.updated':
# Only update time fields if they are in the payload
print(object_data)
meeting_instance.start_time = object_data.get('start_time', meeting_instance.start_time)
meeting_instance.duration = object_data.get('duration', meeting_instance.duration)
meeting_instance.timezone = object_data.get('timezone', meeting_instance.timezone)
meeting_instance.status = object_data.get('status', meeting_instance.status)
meeting_instance.save(update_fields=['topic', 'start_time', 'duration', 'timezone', 'status'])
# --- 2. Status Change Events (Start/End) ---
elif event_type == 'meeting.started':
if meeting_instance:
meeting_instance.status = 'started'
meeting_instance.save(update_fields=['status'])
elif event_type == 'meeting.ended':
if meeting_instance:
meeting_instance.status = 'ended'
meeting_instance.save(update_fields=['status'])
# --- 3. Deletion Event (User Action) ---
elif event_type == 'meeting.deleted':
if meeting_instance:
try:
meeting_instance.status = 'cancelled'
meeting_instance.save(update_fields=['status'])
except Exception as e:
logger.error(f"Failed to mark Zoom meeting as cancelled: {e}")
return True
except Exception as e:
logger.error(f"Failed to process Zoom webhook for {event_type} (ID: {meeting_id_zoom}): {e}", exc_info=True)
return False
def linkedin_post_task(job_slug, access_token):
# for linked post background tasks
job=get_object_or_404(JobPosting,slug=job_slug)
try:
service=LinkedInService()
service.access_token=access_token
# long running task
result=service.create_job_post(job)
#update the jobposting object with the final result
if result['success']:
job.posted_to_linkedin=True
job.linkedin_post_id=result['post_id']
job.linkedin_post_url=result['post_url']
job.linkedin_post_status='SUCCESSS'
job.linkedin_posted_at=timezone.now()
else:
error_msg=result.get('error',"Unknown API error")
job.linkedin_post_status = 'FAILED'
logger.error(f"LinkedIn post failed for job {job_slug}: {error_msg}")
job.save()
return result['success']
except Exception as e:
logger.error(f"Critical error in LinkedIn task for job {job_slug}: {e}", exc_info=True)
# Update job status with the critical error
job.linkedin_post_status = f"CRITICAL_ERROR: {str(e)}"
job.save()
return False
def form_close(job_id):
job = get_object_or_404(JobPosting, pk=job_id)
job.is_active = False
job.template_form.is_active = False
job.save()