"""Utility helpers for the recruitment app.

Covers resume text extraction and scoring through OpenRouter, the admin
dashboard context, thin wrappers around the Zoom REST API, interview
scheduling with invitation e-mails, and a small Markdown table renderer.
"""

import json
import logging
import os
from datetime import datetime, timedelta, time, date
from datetime import timezone as dt_timezone

import requests
from django.conf import settings
from django.core.mail import send_mail
from django.template.loader import render_to_string
from django.utils import timezone
from PyPDF2 import PdfReader

from recruitment import models
from .models import ScheduledInterview

# NOTE: an earlier, commented-out spaCy/PyMuPDF implementation of the PDF
# helpers used to live in this module; it is available in version control.

logger = logging.getLogger(__name__)

# SECURITY: the OpenRouter key used to be hard-coded here. Secrets must not be
# committed, so it is now read from Django settings or the environment. The
# previously committed key should be considered leaked and rotated.
OPENROUTER_API_KEY = (
    getattr(settings, "OPENROUTER_API_KEY", "")
    or os.environ.get("OPENROUTER_API_KEY", "")
)
OPENROUTER_MODEL = 'qwen/qwen-2.5-72b-instruct:free'
OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"
ZOOM_API_BASE = "https://api.zoom.us/v2"

# Without a timeout, ``requests`` can block a worker indefinitely.
OPENROUTER_TIMEOUT_SECONDS = 120
ZOOM_TIMEOUT_SECONDS = 30

if not OPENROUTER_API_KEY:
    logger.warning("OPENROUTER_API_KEY not set. Resume scoring will be skipped.")


def extract_text_from_pdf(file_path):
    """
    Extract the text of every page of a PDF file.

    Args:
        file_path (str): Path of the PDF file to read.

    Returns:
        str: The concatenated page text with surrounding whitespace stripped.

    Raises:
        Exception: Any error raised while opening or parsing the file is
            logged and re-raised unchanged.
    """
    logger.debug("Extracting text from PDF: %s", file_path)
    try:
        with open(file_path, "rb") as pdf_file:
            reader = PdfReader(pdf_file)
            # extract_text() may return None for a page; treat that as "".
            text = "".join(page.extract_text() or "" for page in reader.pages)
    except Exception as exc:
        logger.exception("PDF extraction failed: %s", exc)
        raise
    return text.strip()


def score_resume_with_openrouter(prompt):
    """
    Send a prompt to the OpenRouter chat-completions API and parse the reply.

    Args:
        prompt (str): The full prompt, sent as a single user message.

    Returns:
        dict: The JSON object parsed from the model's reply (Markdown code
        fences are stripped first). An empty dict is returned when the API key
        is missing, the request fails, the HTTP status is not 200, or the
        response carries no message content. When the reply text is not valid
        JSON, the raw API response is returned (behaviour kept from the
        original implementation).
    """
    if not OPENROUTER_API_KEY:
        logger.warning("Resume scoring skipped: OPENROUTER_API_KEY not set.")
        return {}

    logger.debug("Calling OpenRouter model %s", OPENROUTER_MODEL)
    try:
        response = requests.post(
            url=OPENROUTER_URL,
            headers={
                "Authorization": f"Bearer {OPENROUTER_API_KEY}",
                "Content-Type": "application/json",
            },
            data=json.dumps({
                "model": OPENROUTER_MODEL,
                "messages": [{"role": "user", "content": prompt}],
            }),
            timeout=OPENROUTER_TIMEOUT_SECONDS,
        )
    except requests.RequestException as exc:
        logger.error("OpenRouter request failed: %s", exc)
        return {}

    if response.status_code != 200:
        logger.error("OpenRouter returned HTTP %s", response.status_code)
        return {}

    try:
        raw_response = response.json()
        content = raw_response["choices"][0]['message']['content']
    except (KeyError, IndexError, TypeError, ValueError) as exc:
        # A 200 response may still lack "choices" or not be JSON at all.
        logger.error("Unexpected OpenRouter response shape: %s", exc)
        return {}

    try:
        # Models often wrap JSON in Markdown fences; remove them before parsing.
        content = content.replace("```json", "").replace("```", "")
        return json.loads(content)
    except (AttributeError, TypeError, ValueError) as exc:
        logger.warning("Could not parse model reply as JSON: %s", exc)
        return raw_response


def dashboard_callback(request, context):
    """
    Add job and candidate statistics to the dashboard template context.

    Args:
        request: The current request (unused here).
        context (dict): The context to extend; it is updated in place.

    Returns:
        dict: The same context, extended with ``total_jobs``,
        ``total_candidates``, ``job_titles`` and ``job_app_counts``.
    """
    total_jobs = models.Job.objects.count()
    total_candidates = models.Candidate.objects.count()

    jobs = models.Job.objects.all()
    job_titles = [job.title for job in jobs]
    job_app_counts = [job.candidates.count() for job in jobs]

    context.update({
        "total_jobs": total_jobs,
        "total_candidates": total_candidates,
        "job_titles": job_titles,
        "job_app_counts": job_app_counts,
    })
    return context


def _response_details(response):
    """Return the decoded JSON body of a response, or its raw text if not JSON."""
    try:
        return response.json()
    except ValueError:
        return response.text


def _zoom_headers():
    """Build the authorization headers shared by all Zoom API calls."""
    return {
        "Authorization": f"Bearer {get_access_token()}",
        "Content-Type": "application/json",
    }


def _format_zoom_start_time(start_time):
    """
    Convert a ``datetime`` into the string format the Zoom API expects.

    Strings are passed through untouched. Naive datetimes use the local-time
    format (interpreted by Zoom in the meeting's ``timezone`` field); aware
    datetimes are converted to UTC and use the ``Z``-suffixed format.
    """
    if not isinstance(start_time, datetime):
        return start_time
    if start_time.tzinfo is not None and start_time.utcoffset() is not None:
        return start_time.astimezone(dt_timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return start_time.strftime("%Y-%m-%dT%H:%M:%S")


def get_access_token():
    """
    Obtain an access token using server-to-server OAuth.

    Returns:
        str: The ``access_token`` value of the Zoom token response.

    Raises:
        Exception: If Zoom does not answer with HTTP 200.
    """
    auth_url = "https://zoom.us/oauth/token"
    headers = {
        "Content-Type": "application/x-www-form-urlencoded",
    }
    data = {
        "grant_type": "account_credentials",
        "account_id": settings.ZOOM_ACCOUNT_ID,
    }
    auth = (settings.ZOOM_CLIENT_ID, settings.ZOOM_CLIENT_SECRET)

    response = requests.post(
        auth_url,
        headers=headers,
        data=data,
        auth=auth,
        timeout=ZOOM_TIMEOUT_SECONDS,
    )

    if response.status_code == 200:
        return response.json().get("access_token")
    raise Exception(f"Failed to obtain access token: {_response_details(response)}")


def create_zoom_meeting(topic, start_time, duration, timezone="UTC"):
    """
    Create a Zoom meeting using the Zoom API.

    Args:
        topic (str): The topic of the meeting.
        start_time (str | datetime): The start time of the meeting, either as
            an ISO 8601 string (e.g., "2023-10-01T10:00:00Z") or as a
            ``datetime`` object, which is formatted automatically.
        duration (int): The duration of the meeting in minutes.
        timezone (str): Time zone name sent to Zoom. Defaults to "UTC".

    Returns:
        dict: A dictionary containing the meeting details if successful, or an
        error message if failed.
    """
    try:
        meeting_details = {
            "topic": topic,
            "type": 2,
            "start_time": _format_zoom_start_time(start_time),
            "duration": duration,
            "timezone": timezone,
            "settings": {
                "host_video": True,
                "participant_video": True,
                "join_before_host": True,
                "mute_upon_entry": False,
                "approval_type": 2,
                "audio": "both",
                "auto_recording": "none"
            }
        }

        response = requests.post(
            f"{ZOOM_API_BASE}/users/me/meetings",
            headers=_zoom_headers(),
            json=meeting_details,
            timeout=ZOOM_TIMEOUT_SECONDS,
        )

        if response.status_code == 201:
            meeting_data = response.json()
            return {
                "status": "success",
                "message": "Meeting created successfully.",
                "meeting_details": {
                    "join_url": meeting_data['join_url'],
                    "meeting_id": meeting_data['id'],
                    # Optional fields: do not report a created meeting as
                    # failed just because one of them is absent.
                    "password": meeting_data.get('password'),
                    "host_email": meeting_data.get('host_email')
                },
                "zoom_gateway_response": meeting_data
            }
        return {
            "status": "error",
            "message": "Failed to create meeting.",
            "details": _response_details(response)
        }
    except Exception as e:
        logger.exception("Zoom meeting creation failed")
        return {
            "status": "error",
            "message": str(e)
        }


def list_zoom_meetings(next_page_token=None):
    """
    List all meetings for a user using the Zoom API.

    Args:
        next_page_token (str, optional): The token for paginated results.
            Defaults to None.

    Returns:
        dict: A dictionary containing the list of meetings or an error message.
    """
    try:
        user_id = 'me'
        params = {}
        if next_page_token:
            params['next_page_token'] = next_page_token

        response = requests.get(
            f"{ZOOM_API_BASE}/users/{user_id}/meetings",
            headers=_zoom_headers(),
            params=params,
            timeout=ZOOM_TIMEOUT_SECONDS,
        )

        if response.status_code == 200:
            meetings_data = response.json()
            return {
                "status": "success",
                "message": "Meetings retrieved successfully.",
                "meetings": meetings_data.get("meetings", []),
                "next_page_token": meetings_data.get("next_page_token")
            }
        return {
            "status": "error",
            "message": "Failed to retrieve meetings.",
            "details": _response_details(response)
        }
    except Exception as e:
        logger.exception("Listing Zoom meetings failed")
        return {
            "status": "error",
            "message": str(e)
        }


def get_zoom_meeting_details(meeting_id):
    """
    Retrieve details of a specific meeting using the Zoom API.

    Args:
        meeting_id (str): The ID of the meeting to retrieve.

    Returns:
        dict: A dictionary containing the meeting details or an error message.
    """
    try:
        response = requests.get(
            f"{ZOOM_API_BASE}/meetings/{meeting_id}",
            headers=_zoom_headers(),
            timeout=ZOOM_TIMEOUT_SECONDS,
        )

        if response.status_code == 200:
            return {
                "status": "success",
                "message": "Meeting details retrieved successfully.",
                "meeting_details": response.json()
            }
        return {
            "status": "error",
            "message": "Failed to retrieve meeting details.",
            "details": _response_details(response)
        }
    except Exception as e:
        logger.exception("Fetching Zoom meeting %s failed", meeting_id)
        return {
            "status": "error",
            "message": str(e)
        }


def update_zoom_meeting(meeting_id, updated_data):
    """
    Update a Zoom meeting using the Zoom API.

    Args:
        meeting_id (str): The ID of the meeting to update.
        updated_data (dict): A dictionary containing the fields to update
            (e.g., topic, start_time, duration).

    Returns:
        dict: A dictionary indicating success, or an error message with the
        response details.
    """
    try:
        response = requests.patch(
            f"{ZOOM_API_BASE}/meetings/{meeting_id}",
            headers=_zoom_headers(),
            json=updated_data,
            timeout=ZOOM_TIMEOUT_SECONDS,
        )

        if response.status_code == 204:
            return {
                "status": "success",
                "message": "Meeting updated successfully."
            }
        # Report the failure details to the caller like the sibling helpers
        # do, instead of only printing them.
        return {
            "status": "error",
            "message": "Failed to update meeting.",
            "details": _response_details(response)
        }
    except Exception as e:
        logger.exception("Updating Zoom meeting %s failed", meeting_id)
        return {
            "status": "error",
            "message": str(e)
        }


def delete_zoom_meeting(meeting_id):
    """
    Delete a Zoom meeting using the Zoom API.

    Args:
        meeting_id (str): The ID of the meeting to delete.

    Returns:
        dict: A dictionary indicating success or failure.
    """
    try:
        response = requests.delete(
            f"{ZOOM_API_BASE}/meetings/{meeting_id}",
            headers=_zoom_headers(),
            timeout=ZOOM_TIMEOUT_SECONDS,
        )

        if response.status_code == 204:
            return {
                "status": "success",
                "message": "Meeting deleted successfully."
            }
        return {
            "status": "error",
            "message": "Failed to delete meeting.",
            "details": _response_details(response)
        }
    except Exception as e:
        logger.exception("Deleting Zoom meeting %s failed", meeting_id)
        return {
            "status": "error",
            "message": str(e)
        }


def schedule_interviews(schedule):
    """
    Schedule interviews for all candidates in the schedule based on the criteria.

    Each candidate is assigned the next available slot, a Zoom meeting is
    created for it, a ``ScheduledInterview`` record is stored and an invitation
    e-mail is sent. Candidates whose Zoom meeting cannot be created are logged
    and skipped.

    Args:
        schedule: The schedule object providing ``candidates``, ``job``,
            ``interview_duration`` and the slot criteria.

    Returns:
        int: The number of interviews successfully scheduled.

    Raises:
        ValueError: If there are fewer available slots than candidates.
    """
    candidates = list(schedule.candidates.all())
    logger.debug("Candidates to schedule: %s", candidates)
    if not candidates:
        return 0

    available_slots = get_available_time_slots(schedule)
    logger.debug("Available slots: %s", available_slots)
    if len(available_slots) < len(candidates):
        raise ValueError(f"Not enough available slots. Required: {len(candidates)}, Available: {len(available_slots)}")

    # Slots are naive local times, so tell Zoom which zone they are in.
    tz_name = timezone.get_current_timezone_name()

    scheduled_count = 0
    for candidate, slot in zip(candidates, available_slots):
        interview_datetime = datetime.combine(slot['date'], slot['time'])

        meeting_topic = f"Interview for {schedule.job.title} - {candidate.name}"
        meeting = create_zoom_meeting(
            topic=meeting_topic,
            start_time=interview_datetime,
            duration=schedule.interview_duration,
            timezone=tz_name
        )
        if meeting.get("status") != "success":
            logger.error(
                "Could not create Zoom meeting for %s: %s",
                candidate.name,
                meeting.get("message"),
            )
            continue

        # NOTE(review): send_interview_email reads `.join_url` / `.meeting_id`
        # attributes from `zoom_meeting`, so this field presumably expects a
        # model instance rather than the result dict - confirm against the
        # ScheduledInterview model.
        scheduled_interview = ScheduledInterview.objects.create(
            candidate=candidate,
            job=schedule.job,
            zoom_meeting=meeting,
            schedule=schedule,
            interview_date=slot['date'],
            interview_time=slot['time']
        )

        send_interview_email(scheduled_interview)
        scheduled_count += 1

    return scheduled_count


def send_interview_email(scheduled_interview):
    """
    Send an interview invitation email to the candidate.

    Renders the plain-text and HTML invitation templates and sends them to the
    candidate's e-mail address. Sending errors are not suppressed.

    Args:
        scheduled_interview: The scheduled interview whose candidate, job and
            Zoom meeting data fill the templates.
    """
    job = scheduled_interview.job
    candidate = scheduled_interview.candidate
    zoom_meeting = scheduled_interview.zoom_meeting

    subject = f"Interview Invitation for {job.title}"
    context = {
        'candidate_name': candidate.name,
        'job_title': job.title,
        'company_name': job.company.name,
        'interview_date': scheduled_interview.interview_date,
        'interview_time': scheduled_interview.interview_time,
        'join_url': zoom_meeting.join_url,
        'meeting_id': zoom_meeting.meeting_id,
    }

    text_message = render_to_string('interviews/email/interview_invitation.txt', context)
    html_message = render_to_string('interviews/email/interview_invitation.html', context)

    send_mail(
        subject=subject,
        message=text_message,
        from_email=settings.DEFAULT_FROM_EMAIL,
        recipient_list=[candidate.email],
        html_message=html_message,
        fail_silently=False,
    )


def get_available_time_slots(schedule, breaks=None):
    """
    Generate a list of available time slots based on the schedule criteria.

    Slots are laid out back to back on every working day between
    ``schedule.start_date`` and ``schedule.end_date`` (inclusive), starting at
    ``schedule.start_time``. Each slot lasts ``interview_duration +
    buffer_time`` minutes and must end no later than ``schedule.end_time``.
    Slots overlapping any entry of ``breaks`` are left out.

    Args:
        schedule: Object providing ``start_date``, ``end_date``,
            ``start_time``, ``end_time``, ``working_days`` (values convertible
            to int, 0=Monday ... 6=Sunday), ``interview_duration`` and
            ``buffer_time`` (both in minutes).
        breaks (iterable, optional): Objects with ``start_time`` and
            ``end_time`` attributes describing daily breaks.

    Returns:
        list[dict]: Dictionaries with 'date' and 'time' keys, in chronological
        order.

    Raises:
        ValueError: If the slot duration is not positive (the slot loop could
            never advance).
    """
    working_days_set = {int(day) for day in schedule.working_days}
    slot_duration = timedelta(minutes=schedule.interview_duration + schedule.buffer_time)
    if slot_duration <= timedelta(0):
        raise ValueError("Interview duration plus buffer time must be positive.")

    logger.debug(
        "Generating slots: days=%s, dates=%s..%s, hours=%s..%s, slot=%s, breaks=%s",
        working_days_set,
        schedule.start_date,
        schedule.end_date,
        schedule.start_time,
        schedule.end_time,
        slot_duration,
        breaks,
    )

    slots = []
    current_date = schedule.start_date
    while current_date <= schedule.end_date:
        if current_date.weekday() in working_days_set:  # Monday is 0
            # Compare full datetimes: comparing bare times wraps around at
            # midnight and would never terminate for late end times.
            day_end = datetime.combine(current_date, schedule.end_time)
            slot_start = datetime.combine(current_date, schedule.start_time)

            while slot_start + slot_duration <= day_end:
                slot_end = slot_start + slot_duration
                start_t = slot_start.time()
                end_t = slot_end.time()

                overlaps_break = any(
                    start_t < brk.end_time and end_t > brk.start_time
                    for brk in (breaks or ())
                )
                if not overlaps_break:
                    slots.append({
                        'date': current_date,
                        'time': start_t
                    })

                slot_start = slot_end

        current_date += timedelta(days=1)

    logger.debug("Total slots generated: %d", len(slots))
    return slots


def json_to_markdown_table(data_list):
    """
    Render a list of dictionaries as a Markdown table.

    The keys of the first dictionary define the columns; keys missing from
    later rows are rendered as empty cells.

    Args:
        data_list (list[dict]): The rows to render.

    Returns:
        str: The Markdown table, each line terminated by a newline, or an
        empty string when ``data_list`` is empty.
    """
    if not data_list:
        return ""

    headers = list(data_list[0].keys())
    lines = [
        "| " + " | ".join(str(header) for header in headers) + " |",
        "| " + " | ".join(["---"] * len(headers)) + " |",
    ]
    lines.extend(
        "| " + " | ".join(str(row.get(header, "")) for header in headers) + " |"
        for row in data_list
    )
    return "\n".join(lines) + "\n"