import re
import os
import json
import logging
from datetime import datetime

import requests
from PyPDF2 import PdfReader

from django.db import transaction
from django.shortcuts import get_object_or_404
from django.utils import timezone

from recruitment.models import Candidate

from .linkedin_service import LinkedInService
from .models import InterviewSchedule, JobPosting, ScheduledInterview, ZoomMeeting
from .utils import create_zoom_meeting

logger = logging.getLogger(__name__)

# python-docx is optional; Word document processing is disabled without it.
try:
    from docx import Document
    DOCX_AVAILABLE = True
except ImportError:
    DOCX_AVAILABLE = False
    logger.warning("python-docx not available. Word document processing will be disabled.")

# Read the API key from the environment; hardcoding secrets in source leaks them.
OPENROUTER_API_KEY = os.environ.get('OPENROUTER_API_KEY', '')
OPENROUTER_MODEL = 'qwen/qwen-2.5-72b-instruct:free'
# OPENROUTER_MODEL = 'openai/gpt-oss-20b:free'
# OPENROUTER_MODEL = 'openai/gpt-oss-20b'
# OPENROUTER_MODEL = 'mistralai/mistral-small-3.2-24b-instruct:free'

if not OPENROUTER_API_KEY:
    logger.warning("OPENROUTER_API_KEY not set. Resume scoring will be skipped.")


def extract_text_from_pdf(file_path):
    """Extract text from a PDF file."""
    text = ""
    try:
        with open(file_path, "rb") as f:
            reader = PdfReader(f)
            for page in reader.pages:
                text += (page.extract_text() or "")
    except Exception as e:
        logger.error(f"PDF extraction failed: {e}")
        raise
    return text.strip()


def extract_text_from_word(file_path):
    """Extract text from a Word document (.docx), including tables, headers, and footers."""
    if not DOCX_AVAILABLE:
        raise ImportError("python-docx is not installed. Please install it with: pip install python-docx")
    text = ""
    try:
        doc = Document(file_path)

        # Body paragraphs
        for paragraph in doc.paragraphs:
            text += paragraph.text + "\n"

        # Tables: tab-separate cells, newline-separate rows
        for table in doc.tables:
            for row in table.rows:
                for cell in row.cells:
                    text += cell.text + "\t"
                text += "\n"

        # Headers and footers of each section
        for section in doc.sections:
            if section.header:
                for paragraph in section.header.paragraphs:
                    text += "[HEADER] " + paragraph.text + "\n"
            if section.footer:
                for paragraph in section.footer.paragraphs:
                    text += "[FOOTER] " + paragraph.text + "\n"
    except Exception as e:
        logger.error(f"Word extraction failed: {e}")
        raise
    return text.strip()


def extract_text_from_document(file_path):
    """Extract text from a document, dispatching on file extension."""
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    file_ext = os.path.splitext(file_path)[1].lower()
    if file_ext == '.pdf':
        return extract_text_from_pdf(file_path)
    elif file_ext == '.docx':
        return extract_text_from_word(file_path)
    else:
        raise ValueError(f"Unsupported file type: {file_ext}. Only .pdf and .docx files are supported.")
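
# Usage sketch for the unified extractor (illustrative only; the path below is
# hypothetical). Both branches raise on failure, so callers such as
# handle_reume_parsing_and_scoring wrap the call in try/except:
#
#     text = extract_text_from_document("/tmp/resume.pdf")   # -> PyPDF2 branch
#     text = extract_text_from_document("/tmp/resume.docx")  # -> python-docx branch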

def format_job_description(pk):
    """Format a raw job description/qualifications pair into clean HTML via the LLM."""
    job_posting = JobPosting.objects.get(pk=pk)
    logger.debug(f"Formatting job posting {job_posting.pk}")

    prompt = f"""
    Organize and format this unformatted job description and its qualifications into
    clear, readable sections using headings and bullet points, then convert the
    formatted job description and qualifications into two blocks of HTML.

    **JOB DESCRIPTION:**
    {job_posting.description}

    **QUALIFICATIONS:**
    {job_posting.qualifications}

    **STRICT JSON OUTPUT INSTRUCTIONS:**
    Output a single, valid JSON object with ONLY the following two top-level keys:
    'job_description': 'HTML containing the formatted job description',
    'job_qualifications': 'HTML containing the formatted job qualifications',
    Do not include any other text except for the JSON output.
    """

    result = ai_handler(prompt)
    if result['status'] == 'error':
        logger.error(f"AI handler returned error for job posting {job_posting.pk}")
        return

    # ai_handler normally returns a dict, but tolerate a JSON string as well.
    data = result['data']
    if isinstance(data, str):
        data = json.loads(data)

    job_posting.description = data.get('job_description')
    job_posting.qualifications = data.get('job_qualifications')
    job_posting.save(update_fields=['description', 'qualifications'])


def ai_handler(prompt):
    """Send a prompt to OpenRouter and parse the model's reply as JSON."""
    response = requests.post(
        url="https://openrouter.ai/api/v1/chat/completions",
        headers={
            "Authorization": f"Bearer {OPENROUTER_API_KEY}",
            "Content-Type": "application/json",
        },
        data=json.dumps({
            "model": OPENROUTER_MODEL,
            "messages": [{"role": "user", "content": prompt}],
        }),
    )

    logger.debug(f"OpenRouter status code: {response.status_code}")
    if response.status_code == 200:
        res = response.json()
        content = res["choices"][0]['message']['content']
        try:
            # Strip the markdown code fences the model sometimes wraps around JSON.
            content = content.replace("```json", "").replace("```", "")
            return {"status": "success", "data": json.loads(content)}
        except Exception as e:
            logger.error(f"Failed to parse model output as JSON: {e}")
            return {"status": "error", "data": str(e)}
    else:
        logger.error(f"OpenRouter request failed with status {response.status_code}")
        return {"status": "error", "data": response.json()}


def safe_cast_to_float(value, default=0.0):
    """Safely convert a value (int, float, or string) to a float."""
    if isinstance(value, (int, float)):
        return float(value)
    if isinstance(value, str):
        # Strip non-numeric characters except the decimal point (e.g. "6.5 years" -> "6.5").
        cleaned_value = re.sub(r'[^\d.]', '', value)
        try:
            # Handle strings that are empty after cleaning.
            return float(cleaned_value) if cleaned_value else default
        except ValueError:
            return default
    return default
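
# A more defensive variant of the fence-stripping done in ai_handler, shown as
# an illustrative sketch (it is not wired in anywhere): instead of blindly
# deleting "```" markers, pull the first JSON object out of the reply.
def _extract_json_object(content):
    """Best-effort extraction of the first JSON object embedded in model output."""
    match = re.search(r'\{.*\}', content, re.DOTALL)
    if not match:
        raise ValueError("No JSON object found in model output")
    return json.loads(match.group(0))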

def handle_reume_parsing_and_scoring(pk):
    """
    Django-Q task: parse a resume, score the candidate against a job,
    and atomically save the results.
    """
    # --- 1. Robust object retrieval (prevents looping on DoesNotExist) ---
    try:
        instance = Candidate.objects.get(pk=pk)
    except Candidate.DoesNotExist:
        # Exit gracefully if the candidate was deleted after the task was queued.
        logger.warning(f"Candidate matching query does not exist for pk={pk}. Exiting task.")
        return

    logger.info(f"Scoring resume for candidate {pk}")

    # --- 2. I/O and initial data check ---
    try:
        file_path = instance.resume.path
        if not os.path.exists(file_path):
            # Mark the task as unsuccessful, but don't re-queue it.
            logger.warning(f"Resume file not found: {file_path}")
            return

        # Unified document parser (.pdf or .docx).
        resume_text = extract_text_from_document(file_path)
        job_detail = f"{instance.job.description} {instance.job.qualifications}"
    except Exception as e:
        logger.error(f"Error during initial data retrieval/parsing for candidate {instance.pk}: {e}")
        return

    # --- 3. Single, combined LLM prompt (major cost & latency optimization) ---
    prompt = f"""
    You are an expert AI system functioning as both a Resume Parser and a Technical Recruiter.

    Your task is to:
    1. **PARSE**: Extract all key-value information from the provided RESUME TEXT into a clean JSON structure under the key 'resume_data', preserving the original text and its formatting; don't add any extra text.
    2. **SCORE**: Analyze the parsed data against the JOB CRITERIA and generate a comprehensive score and analysis under the key 'analysis_data'.

    **JOB CRITERIA:**
    {job_detail}

    **RESUME TEXT:**
    {resume_text}

    **STRICT JSON OUTPUT INSTRUCTIONS:**
    Output a single, valid JSON object with ONLY the following two top-level keys:

    1. "resume_data": {{
        "full_name": "Full name of the candidate",
        "current_title": "Most recent or current job title",
        "location": "City and state",
        "contact": "Phone number and email",
        "linkedin": "LinkedIn profile URL",
        "github": "GitHub or portfolio URL",
        "summary": "Brief professional profile or summary (1-2 sentences)",
        "education": [{{
            "institution": "Institution name",
            "degree": "Degree name",
            "year": "Year of graduation (if provided) or ''",
            "gpa": "GPA (if provided)",
            "relevant_courses": ["list", "of", "courses"] (if provided) or []
        }}],
        "skills": {{
            "category_1": ["skill_a", "skill_b"],
            "uncategorized": ["tool_x"]
        }},
        "experience": [{{
            "company": "Company name",
            "job_title": "Job Title",
            "location": "Location",
            "start_date": "YYYY-MM",
            "end_date": "YYYY-MM or Present",
            "key_achievements": ["concise bullet points"] (if provided) or []
        }}],
        "projects": [{{
            "name": "Project name",
            "year": "Year",
            "technologies_used": ["list", "of", "tech"] (if provided) or [],
            "brief_description": "description"
        }}]
    }}

    2. "analysis_data": {{
        "match_score": "Integer Score 0-100",
        "strengths": "Brief summary of strengths",
        "weaknesses": "Brief summary of weaknesses",
        "years_of_experience": "Total years of experience (float, e.g., 6.5)",
        "criteria_checklist": {{"Python": "Met", "AWS": "Not Mentioned"}},
        "category": "Most fitting professional field (e.g., Data Science). Output only a single category name and no other text: 'Software Development' is correct; 'Software Development and devops' is wrong; 'Software Development / Backend Development' is wrong",
        "most_recent_job_title": "Candidate's most recent job title",
        "recommendation": "Detailed hiring recommendation narrative",
        "top_3_keywords": ["keyword1", "keyword2", "keyword3"],
        "job_fit_narrative": "Single, concise summary sentence",
        "language_fluency": ["language: fluency_level"],
        "screening_stage_rating": "Standardized rating (Highly Qualified, Qualified, Partially Qualified, Not Qualified)",
        "min_req_met_bool": "Boolean (true/false)",
        "soft_skills_score": "Integer Score 0-100 for inferred non-technical skills",
        "experience_industry_match": "Integer Score 0-100 for industry relevance",
        "seniority_level_match": "Integer Score 0-100 for alignment with the JD's seniority level",
        "red_flags": ["List of any potential concerns (if any): e.g., 'Employment gap 1 year', 'Frequent job hopping', 'Missing required certification'"],
        "employment_stability_score": "Integer Score 0-100 (higher is more stable/longer tenure) (if possible)",
        "transferable_skills_narrative": "A brief sentence describing the relevance of non-core experience (if applicable).",
        "cultural_fit_keywords": ["A list of 3-5 keywords extracted from the resume (if possible) (e.g., 'team-player', 'mentored', 'cross-functional')"]
    }}
    If a top-level key or its required fields are missing, set the field to null, an empty list, or an empty object as appropriate.
    Output only valid JSON: no markdown, no extra text.
    """

    try:
        result = ai_handler(prompt)
        if result['status'] == 'error':
            logger.error(f"AI handler returned error for candidate {instance.pk}")
            return

        # ai_handler normally returns a dict, but tolerate a JSON string as well.
        data = result['data']
        if isinstance(data, str):
            data = json.loads(data)
    except Exception as e:
        logger.error(f"AI handler failed for candidate {instance.pk}: {e}")
        return

    # --- 4. Atomic database update (ensures data integrity) ---
    with transaction.atomic():
        # NOTE: an earlier revision copied each scoring field onto the model one
        # by one (match_score, years_of_experience, soft_skills_score, ...) via
        # safe_cast_to_float; the full AI payload now lives in ai_analysis_data.
        # Save both structured outputs ('resume_data' and 'analysis_data')
        # into the single JSONField for completeness.
        instance.ai_analysis_data = data
        instance.is_resume_parsed = True
        instance.save(update_fields=['ai_analysis_data', 'is_resume_parsed'])

    logger.info(f"Successfully scored and saved analysis for candidate {instance.id}")


def create_interview_and_meeting(candidate_id, job_id, schedule_id, slot_date, slot_time, duration):
    """
    Synchronous task for a single interview slot, dispatched by django-q.
    """
    try:
        candidate = Candidate.objects.get(pk=candidate_id)
        job = JobPosting.objects.get(pk=job_id)
        schedule = InterviewSchedule.objects.get(pk=schedule_id)

        interview_datetime = datetime.combine(slot_date, slot_time)
        meeting_topic = f"Interview for {job.title} - {candidate.name}"

        # 1. External API call (slow)
        result = create_zoom_meeting(meeting_topic, interview_datetime, duration)

        if result["status"] == "success":
            # 2. Database writes
            zoom_meeting = ZoomMeeting.objects.create(
                topic=meeting_topic,
                start_time=interview_datetime,
                duration=duration,
                meeting_id=result["meeting_details"]["meeting_id"],
                join_url=result["meeting_details"]["join_url"],
                zoom_gateway_response=result["zoom_gateway_response"],
                host_email=result["meeting_details"]["host_email"],
                password=result["meeting_details"]["password"],
            )
            ScheduledInterview.objects.create(
                candidate=candidate,
                job=job,
                zoom_meeting=zoom_meeting,
                schedule=schedule,
                interview_date=slot_date,
                interview_time=slot_time,
            )
            # Log success; django-q's result system can also be used for monitoring.
            logger.info(f"Successfully scheduled interview for {candidate.name}")
            return True  # Task succeeded
        else:
            # Zoom API failure: log it (or notify an administrator).
            logger.error(f"Zoom API failed for {candidate.name}: {result['message']}")
            return False  # Task failed

    except Exception as e:
        # Catch any unexpected errors during database lookups or processing.
        logger.error(f"Critical error scheduling interview: {e}")
        return False  # Task failed
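
# Dispatch sketch: because create_interview_and_meeting is a django-q task, a
# view or signal handler would normally enqueue it rather than call it inline.
# The dotted path below is an assumption; adjust it to this app's real label.
#
#     from django_q.tasks import async_task
#     async_task(
#         "recruitment.tasks.create_interview_and_meeting",
#         candidate.pk, job.pk, schedule.pk, slot_date, slot_time, 30,
#     )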

def handle_zoom_webhook_event(payload):
    """
    Background task to process a Zoom webhook event and update the local
    ZoomMeeting status. It handles: updated, started, ended, and deleted events.
    """
    event_type = payload.get('event')
    object_data = payload.get('payload', {}).get('object', {})

    # Zoom often uses a long 'id' for the scheduled meeting and sometimes a 'uuid'.
    # We rely on the unique 'id' that maps to the ZoomMeeting.meeting_id field.
    # (Check before casting: str(None) is truthy, so the original check never fired.)
    meeting_id_zoom = object_data.get('id')
    if not meeting_id_zoom:
        logger.warning(f"Webhook received without a valid Meeting ID: {event_type}")
        return False
    meeting_id_zoom = str(meeting_id_zoom)

    try:
        # filter().first() avoids exceptions if the meeting doesn't exist yet
        # and simplifies the logic flow.
        meeting_instance = ZoomMeeting.objects.filter(meeting_id=meeting_id_zoom).first()

        # --- 1. Update events ---
        if event_type == 'meeting.updated':
            if meeting_instance:
                # Update key fields from the webhook payload, falling back to
                # the current values for anything the payload omits.
                meeting_instance.topic = object_data.get('topic', meeting_instance.topic)
                meeting_instance.start_time = object_data.get('start_time', meeting_instance.start_time)
                meeting_instance.duration = object_data.get('duration', meeting_instance.duration)
                meeting_instance.timezone = object_data.get('timezone', meeting_instance.timezone)
                meeting_instance.status = object_data.get('status', meeting_instance.status)
                meeting_instance.save(update_fields=['topic', 'start_time', 'duration', 'timezone', 'status'])

        # --- 2. Status change events (start/end) ---
        elif event_type == 'meeting.started':
            if meeting_instance:
                meeting_instance.status = 'started'
                meeting_instance.save(update_fields=['status'])
        elif event_type == 'meeting.ended':
            if meeting_instance:
                meeting_instance.status = 'ended'
                meeting_instance.save(update_fields=['status'])

        # --- 3. Deletion event (user action) ---
        elif event_type == 'meeting.deleted':
            if meeting_instance:
                try:
                    meeting_instance.status = 'cancelled'
                    meeting_instance.save(update_fields=['status'])
                except Exception as e:
                    logger.error(f"Failed to mark Zoom meeting as cancelled: {e}")

        return True
    except Exception as e:
        logger.error(f"Failed to process Zoom webhook for {event_type} (ID: {meeting_id_zoom}): {e}", exc_info=True)
        return False


def linkedin_post_task(job_slug, access_token):
    """Background task that publishes a JobPosting to LinkedIn."""
    job = get_object_or_404(JobPosting, slug=job_slug)
    try:
        service = LinkedInService()
        service.access_token = access_token

        # Long-running external call.
        result = service.create_job_post(job)

        # Update the JobPosting object with the final result.
        if result['success']:
            job.posted_to_linkedin = True
            job.linkedin_post_id = result['post_id']
            job.linkedin_post_url = result['post_url']
            job.linkedin_post_status = 'SUCCESS'
            job.linkedin_posted_at = timezone.now()
        else:
            error_msg = result.get('error', "Unknown API error")
            job.linkedin_post_status = 'FAILED'
            logger.error(f"LinkedIn post failed for job {job_slug}: {error_msg}")
        job.save()
        return result['success']
    except Exception as e:
        logger.error(f"Critical error in LinkedIn task for job {job_slug}: {e}", exc_info=True)
        # Record the critical error on the job itself.
        job.linkedin_post_status = f"CRITICAL_ERROR: {str(e)}"
        job.save()
        return False
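
# handle_zoom_webhook_event trusts its payload, so the view receiving the
# webhook should verify Zoom's signature first. The sketch below follows Zoom's
# documented scheme (x-zm-signature = "v0=" + HMAC-SHA256 of
# "v0:{x-zm-request-timestamp}:{raw body}"). The ZOOM_WEBHOOK_SECRET_TOKEN
# environment variable name is an assumption, and nothing here calls this yet.
def verify_zoom_webhook_signature(raw_body, timestamp, signature, secret_token=None):
    """Return True if a Zoom webhook signature matches the raw request body."""
    import hashlib
    import hmac

    # Assumed env var; substitute however this project stores the secret token.
    secret_token = secret_token or os.environ.get('ZOOM_WEBHOOK_SECRET_TOKEN', '')
    message = f"v0:{timestamp}:{raw_body}"
    expected = "v0=" + hmac.new(
        secret_token.encode(), message.encode(), hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(expected, signature)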