538 lines
17 KiB
Python

# import os
# import fitz # PyMuPDF
# import spacy
# import requests
from recruitment import models
from django.conf import settings
from datetime import datetime, timedelta, time, date
from django.utils import timezone
from .models import ScheduledInterview
from django.template.loader import render_to_string
from django.core.mail import send_mail
# nlp = spacy.load("en_core_web_sm")
# def extract_text_from_pdf(pdf_path):
# text = ""
# with fitz.open(pdf_path) as doc:
# for page in doc:
# text += page.get_text()
# return text
# def extract_summary_from_pdf(pdf_path):
# if not os.path.exists(pdf_path):
# return {'error': 'File not found'}
# text = extract_text_from_pdf(pdf_path)
# doc = nlp(text)
# summary = {
# 'name': doc.ents[0].text if doc.ents else '',
# 'skills': [chunk.text for chunk in doc.noun_chunks if len(chunk.text.split()) > 1],
# 'summary': text[:500]
# }
# return summary
import requests
from PyPDF2 import PdfReader
import os
import json
import logging
logger = logging.getLogger(__name__)

# OpenRouter configuration is read from the process environment so that no
# credential lives in source control.
# SECURITY: an API key used to be hard-coded on this line. It has been exposed
# through the repository history and must be revoked/rotated in OpenRouter.
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY", "")
# The model can be overridden per deployment; the default is the model this
# module has always used.
OPENROUTER_MODEL = os.environ.get(
    "OPENROUTER_MODEL", "qwen/qwen-2.5-72b-instruct:free"
)

if not OPENROUTER_API_KEY:
    logger.warning("OPENROUTER_API_KEY not set. Resume scoring will be skipped.")
def extract_text_from_pdf(file_path):
    """Return the text of every page of a PDF as one stripped string.

    Args:
        file_path: Filesystem path of the PDF file to read.

    Returns:
        str: The text of all pages concatenated in page order, with leading
        and trailing whitespace removed. A page whose extraction yields a
        falsy value contributes nothing.

    Raises:
        Exception: Any error raised while opening or parsing the file is
        logged (with traceback) and re-raised unchanged.
    """
    logger.debug("Extracting text from PDF: %s", file_path)
    try:
        with open(file_path, "rb") as pdf_file:
            reader = PdfReader(pdf_file)
            # Pages must be read while the file is still open; join() avoids
            # repeated string re-allocation on large documents.
            text = "".join(page.extract_text() or "" for page in reader.pages)
    except Exception:
        logger.exception("PDF extraction failed")
        raise
    return text.strip()
def score_resume_with_openrouter(prompt, timeout=60):
    """Send a prompt to the OpenRouter chat API and return the parsed JSON reply.

    Args:
        prompt (str): Text sent as the single user message.
        timeout (float): Seconds to wait for OpenRouter before giving up.

    Returns:
        The object decoded from the model's reply (Markdown code fences are
        stripped before decoding). An empty dict is returned when no API key
        is configured, when OpenRouter answers with a non-200 status, or when
        the response body is not JSON. When the body is JSON but the model's
        reply cannot be extracted or decoded, the raw OpenRouter response
        object is returned (the historical fallback of this function).

    Raises:
        requests.RequestException: On network failure or timeout.
    """
    if not OPENROUTER_API_KEY:
        # Honour the module-level warning: without a key, do not call the API.
        logger.warning("OPENROUTER_API_KEY not set. Resume scoring skipped.")
        return {}

    logger.debug("Calling OpenRouter model %s", OPENROUTER_MODEL)
    response = requests.post(
        url="https://openrouter.ai/api/v1/chat/completions",
        headers={
            "Authorization": f"Bearer {OPENROUTER_API_KEY}",
            "Content-Type": "application/json",
        },
        data=json.dumps({
            "model": OPENROUTER_MODEL,
            "messages": [{"role": "user", "content": prompt}],
        }),
        # Without a timeout a stalled connection would block the caller forever.
        timeout=timeout,
    )

    if response.status_code != 200:
        logger.error("OpenRouter returned HTTP %s", response.status_code)
        return {}

    try:
        envelope = response.json()
    except ValueError:
        logger.exception("OpenRouter response body is not valid JSON")
        return {}

    try:
        content = envelope["choices"][0]["message"]["content"]
        # Models frequently wrap JSON in ```json ... ``` fences.
        content = content.replace("```json", "").replace("```", "")
        return json.loads(content)
    except (KeyError, IndexError, TypeError, AttributeError, ValueError):
        logger.exception("Could not decode the model reply as JSON")
        # NOTE(review): returning the raw envelope preserves the previous
        # behaviour; confirm whether callers would rather receive {} here.
        return envelope
# print(f"rawraw_output)
# print(response)
# def match_resume_with_job_description(resume, job_description,prompt=""):
# resume_doc = nlp(resume)
# job_doc = nlp(job_description)
# similarity = resume_doc.similarity(job_doc)
# return similarity
def dashboard_callback(request, context):
    """Add job and candidate statistics to the dashboard context.

    Args:
        request: The current request; not used by this function.
        context: Mapping that is updated in place with the statistics.

    Returns:
        The same ``context`` object, extended with ``total_jobs``,
        ``total_candidates``, ``job_titles`` and ``job_app_counts`` (the last
        two are parallel lists, one entry per job).
    """
    job_total = models.Job.objects.count()
    candidate_total = models.Candidate.objects.count()

    titles = []
    applications_per_job = []
    for job in models.Job.objects.all():
        titles.append(job.title)
        # One count query per job on its `candidates` relation.
        applications_per_job.append(job.candidates.count())

    statistics = {
        "total_jobs": job_total,
        "total_candidates": candidate_total,
        "job_titles": titles,
        "job_app_counts": applications_per_job,
    }
    context.update(statistics)
    return context
def get_access_token():
    """Obtain an access token using server-to-server OAuth.

    Credentials are read from ``settings.ZOOM_CLIENT_ID``,
    ``settings.ZOOM_CLIENT_SECRET`` and ``settings.ZOOM_ACCOUNT_ID``.

    Returns:
        The ``access_token`` value of Zoom's token response (``None`` if the
        response does not contain that key).

    Raises:
        Exception: If Zoom answers with a status other than 200; the message
            carries the decoded error body, or the raw text if it is not JSON.
        requests.RequestException: On network failure or timeout.
    """
    client_id = settings.ZOOM_CLIENT_ID
    client_secret = settings.ZOOM_CLIENT_SECRET
    response = requests.post(
        "https://zoom.us/oauth/token",
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        data={
            "grant_type": "account_credentials",
            "account_id": settings.ZOOM_ACCOUNT_ID,
        },
        auth=(client_id, client_secret),
        # Never wait indefinitely on the token endpoint.
        timeout=10,
    )
    if response.status_code != 200:
        # An error page may not be JSON; fall back to the raw text so the real
        # failure is reported rather than a JSON decoding error.
        try:
            details = response.json()
        except ValueError:
            details = response.text
        raise Exception(f"Failed to obtain access token: {details}")
    return response.json().get("access_token")
def _format_zoom_start_time(start_time):
    """Return ``start_time`` in the string form sent to the Zoom API."""
    if not isinstance(start_time, datetime):
        # Already a string (or something the caller formatted): pass through.
        return start_time
    if start_time.tzinfo is None or start_time.utcoffset() is None:
        # Naive value: wall-clock time interpreted through the `timezone` field.
        return start_time.strftime("%Y-%m-%dT%H:%M:%S")
    # Local import: the module-level name `timezone` is django.utils.timezone.
    from datetime import timezone as dt_timezone
    return start_time.astimezone(dt_timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def create_zoom_meeting(topic, start_time, duration, timezone="UTC"):
    """
    Create a scheduled Zoom meeting using the Zoom API.

    Args:
        topic (str): The topic of the meeting.
        start_time (str | datetime): The start time of the meeting. A string is
            sent as-is (e.g. "2023-10-01T10:00:00Z"). A timezone-aware datetime
            is converted to UTC and sent with a trailing "Z". A naive datetime
            is sent as local wall-clock time to be read in ``timezone``.
        duration (int): The duration of the meeting in minutes.
        timezone (str): Timezone ID sent alongside ``start_time``. Defaults to
            "UTC", the value this function always used before.
            Note: inside this function the parameter shadows the module-level
            ``django.utils.timezone`` import.

    Returns:
        dict: ``{"status": "success", "message", "meeting_details",
        "zoom_gateway_response"}`` when Zoom answers 201, otherwise
        ``{"status": "error", "message", ...}``. Exceptions are never
        propagated; they are reported through the error dictionary.
    """
    try:
        meeting_details = {
            "topic": topic,
            "type": 2,  # 2 = scheduled meeting
            "start_time": _format_zoom_start_time(start_time),
            "duration": duration,
            "timezone": timezone,
            "settings": {
                "host_video": True,
                "participant_video": True,
                "join_before_host": True,
                "mute_upon_entry": False,
                "approval_type": 2,
                "audio": "both",
                "auto_recording": "none",
            },
        }
        access_token = get_access_token()
        response = requests.post(
            "https://api.zoom.us/v2/users/me/meetings",
            headers={
                "Authorization": f"Bearer {access_token}",
                "Content-Type": "application/json",
            },
            json=meeting_details,
            # Never wait indefinitely on the Zoom API.
            timeout=30,
        )

        if response.status_code != 201:
            # The error body may not be JSON; keep the raw text in that case.
            try:
                details = response.json()
            except ValueError:
                details = response.text
            return {
                "status": "error",
                "message": "Failed to create meeting.",
                "details": details,
            }

        meeting_data = response.json()
        return {
            "status": "success",
            "message": "Meeting created successfully.",
            "meeting_details": {
                "join_url": meeting_data['join_url'],
                "meeting_id": meeting_data['id'],
                # Read leniently so that a missing field does not turn a
                # meeting that was created into a reported failure.
                "password": meeting_data.get('password'),
                "host_email": meeting_data.get('host_email'),
            },
            "zoom_gateway_response": meeting_data,
        }
    except Exception as e:
        return {
            "status": "error",
            "message": str(e),
        }
def list_zoom_meetings(next_page_token=None):
    """
    Fetch one page of the meetings of the authenticated Zoom user ("me").

    Args:
        next_page_token (str, optional): Token returned by a previous call;
            pass it to fetch the following page. Defaults to None.

    Returns:
        dict: On HTTP 200, ``{"status": "success", "message", "meetings",
        "next_page_token"}`` where ``meetings`` is the list from this page
        only. Otherwise ``{"status": "error", "message", ...}``. Exceptions
        are never propagated; they are reported through the error dictionary.
    """
    try:
        access_token = get_access_token()
        user_id = 'me'
        params = {'next_page_token': next_page_token} if next_page_token else {}
        response = requests.get(
            f"https://api.zoom.us/v2/users/{user_id}/meetings",
            headers={
                "Authorization": f"Bearer {access_token}",
                "Content-Type": "application/json",
            },
            params=params,
            # Never wait indefinitely on the Zoom API.
            timeout=30,
        )

        if response.status_code != 200:
            # The error body may not be JSON; keep the raw text in that case.
            try:
                details = response.json()
            except ValueError:
                details = response.text
            return {
                "status": "error",
                "message": "Failed to retrieve meetings.",
                "details": details,
            }

        meetings_data = response.json()
        return {
            "status": "success",
            "message": "Meetings retrieved successfully.",
            "meetings": meetings_data.get("meetings", []),
            "next_page_token": meetings_data.get("next_page_token"),
        }
    except Exception as e:
        return {
            "status": "error",
            "message": str(e),
        }
def get_zoom_meeting_details(meeting_id):
    """
    Retrieve details of a specific meeting using the Zoom API.

    Args:
        meeting_id (str): The ID of the meeting to retrieve.

    Returns:
        dict: On HTTP 200, ``{"status": "success", "message",
        "meeting_details"}`` where ``meeting_details`` is Zoom's decoded
        response. Otherwise ``{"status": "error", "message", ...}``.
        Exceptions are never propagated; they are reported through the error
        dictionary.
    """
    try:
        access_token = get_access_token()
        response = requests.get(
            f"https://api.zoom.us/v2/meetings/{meeting_id}",
            headers={
                "Authorization": f"Bearer {access_token}",
                "Content-Type": "application/json",
            },
            # Never wait indefinitely on the Zoom API.
            timeout=30,
        )

        if response.status_code != 200:
            # The error body may not be JSON; keep the raw text in that case.
            try:
                details = response.json()
            except ValueError:
                details = response.text
            return {
                "status": "error",
                "message": "Failed to retrieve meeting details.",
                "details": details,
            }

        return {
            "status": "success",
            "message": "Meeting details retrieved successfully.",
            "meeting_details": response.json(),
        }
    except Exception as e:
        return {
            "status": "error",
            "message": str(e),
        }
def update_zoom_meeting(meeting_id, updated_data):
    """
    Update a Zoom meeting using the Zoom API.

    Args:
        meeting_id (str): The ID of the meeting to update.
        updated_data (dict): The fields to update (e.g., topic, start_time,
            duration); sent as the JSON body of the PATCH request.

    Returns:
        dict: ``{"status": "success", "message"}`` when Zoom answers 204.
        Otherwise ``{"status": "error", "message", "details"}`` where
        ``details`` is Zoom's error body. Exceptions are never propagated;
        they are reported as ``{"status": "error", "message"}``.
    """
    try:
        access_token = get_access_token()
        response = requests.patch(
            f"https://api.zoom.us/v2/meetings/{meeting_id}",
            headers={
                "Authorization": f"Bearer {access_token}",
                "Content-Type": "application/json",
            },
            json=updated_data,
            # Never wait indefinitely on the Zoom API.
            timeout=30,
        )

        if response.status_code == 204:
            return {
                "status": "success",
                "message": "Meeting updated successfully.",
            }

        # Return the error body to the caller (as the sibling Zoom helpers do)
        # instead of printing it; it may not be JSON, so fall back to text.
        try:
            details = response.json()
        except ValueError:
            details = response.text
        return {
            "status": "error",
            "message": "Failed to update meeting.",
            "details": details,
        }
    except Exception as e:
        return {
            "status": "error",
            "message": str(e),
        }
def delete_zoom_meeting(meeting_id):
    """
    Delete a Zoom meeting using the Zoom API.

    Args:
        meeting_id (str): The ID of the meeting to delete.

    Returns:
        dict: ``{"status": "success", "message"}`` when Zoom answers 204,
        otherwise ``{"status": "error", "message", ...}``. Exceptions are
        never propagated; they are reported through the error dictionary.
    """
    try:
        access_token = get_access_token()
        response = requests.delete(
            f"https://api.zoom.us/v2/meetings/{meeting_id}",
            headers={"Authorization": f"Bearer {access_token}"},
            # Never wait indefinitely on the Zoom API.
            timeout=30,
        )

        if response.status_code == 204:
            return {
                "status": "success",
                "message": "Meeting deleted successfully.",
            }

        # The error body may not be JSON; keep the raw text in that case.
        try:
            details = response.json()
        except ValueError:
            details = response.text
        return {
            "status": "error",
            "message": "Failed to delete meeting.",
            "details": details,
        }
    except Exception as e:
        return {
            "status": "error",
            "message": str(e),
        }
def schedule_interviews(schedule):
    """
    Schedule interviews for all candidates in the schedule based on the criteria.

    Candidates are paired with the generated time slots in order. For each
    pair a Zoom meeting is created, a ScheduledInterview record is stored and
    an invitation email is sent. A candidate whose Zoom meeting cannot be
    created is logged and skipped.

    Args:
        schedule: Schedule object providing ``candidates``, ``job`` and
            ``interview_duration``, plus the fields read by
            ``get_available_time_slots``.

    Returns:
        int: The number of interviews successfully scheduled (0 when the
        schedule has no candidates).

    Raises:
        ValueError: If there are fewer available slots than candidates.
    """
    candidates = list(schedule.candidates.all())
    if not candidates:
        return 0

    available_slots = get_available_time_slots(schedule)
    if len(available_slots) < len(candidates):
        raise ValueError(f"Not enough available slots. Required: {len(candidates)}, Available: {len(available_slots)}")

    # Local import: the module-level name `timezone` is django.utils.timezone.
    from datetime import timezone as dt_timezone

    scheduled_count = 0
    for candidate, slot in zip(candidates, available_slots):
        # Slots are wall-clock values in the current Django timezone. Zoom is
        # given an explicit UTC ("...Z") string, which is JSON-serialisable
        # and unambiguous whatever timezone the meeting is labelled with.
        local_start = timezone.make_aware(datetime.combine(slot['date'], slot['time']))
        start_time = local_start.astimezone(dt_timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

        meeting = create_zoom_meeting(
            topic=f"Interview for {schedule.job.title} - {candidate.name}",
            start_time=start_time,
            duration=schedule.interview_duration,
        )
        if meeting.get("status") != "success":
            logger.error(
                "Zoom meeting creation failed for candidate %s: %s",
                candidate.name,
                meeting.get("message"),
            )
            continue

        # NOTE(review): `meeting` is the result dict of create_zoom_meeting,
        # while send_interview_email reads `.zoom_meeting.join_url` and
        # `.zoom_meeting.meeting_id` as attributes. Confirm what the
        # `zoom_meeting` field expects; a model instance may need to be
        # created from meeting["meeting_details"] first.
        scheduled_interview = ScheduledInterview.objects.create(
            candidate=candidate,
            job=schedule.job,
            zoom_meeting=meeting,
            schedule=schedule,
            interview_date=slot['date'],
            interview_time=slot['time'],
        )
        send_interview_email(scheduled_interview)
        scheduled_count += 1

    return scheduled_count
def send_interview_email(scheduled_interview):
    """
    Email the candidate an invitation for their scheduled interview.

    The message is rendered from the ``interview_invitation`` text and HTML
    templates and sent from ``settings.DEFAULT_FROM_EMAIL`` to the candidate's
    email address. Delivery errors are not suppressed.

    Args:
        scheduled_interview: Interview record providing ``job``, ``candidate``,
            ``interview_date``, ``interview_time`` and ``zoom_meeting``.
    """
    job = scheduled_interview.job
    mail_subject = f"Interview Invitation for {job.title}"

    candidate = scheduled_interview.candidate
    template_context = {
        'candidate_name': candidate.name,
        'job_title': job.title,
        'company_name': job.company.name,
        'interview_date': scheduled_interview.interview_date,
        'interview_time': scheduled_interview.interview_time,
    }
    meeting = scheduled_interview.zoom_meeting
    template_context['join_url'] = meeting.join_url
    template_context['meeting_id'] = meeting.meeting_id

    # Plain-text body first, then the HTML alternative.
    plain_body = render_to_string('interviews/email/interview_invitation.txt', template_context)
    html_body = render_to_string('interviews/email/interview_invitation.html', template_context)

    send_mail(
        subject=mail_subject,
        message=plain_body,
        from_email=settings.DEFAULT_FROM_EMAIL,
        recipient_list=[candidate.email],
        html_message=html_body,
        fail_silently=False,
    )
def get_available_time_slots(schedule, breaks=None):
    """
    Generate a list of available time slots based on the schedule criteria.

    Every working day between ``schedule.start_date`` and
    ``schedule.end_date`` (both inclusive) is filled with back-to-back slots
    of ``interview_duration + buffer_time`` minutes, starting at
    ``schedule.start_time``. A slot is kept only if it ends no later than
    ``schedule.end_time`` and does not overlap any break.

    Args:
        schedule: Object providing ``start_date``, ``end_date``,
            ``working_days`` (iterable of values convertible to int, where
            0=Monday ... 6=Sunday), ``start_time``, ``end_time``,
            ``interview_duration`` and ``buffer_time`` (minutes).
        breaks: Optional iterable of objects with ``start_time`` and
            ``end_time`` attributes; slots overlapping a break are skipped.

    Returns:
        list[dict]: Dictionaries with 'date' and 'time' keys, in
        chronological order.

    Raises:
        ValueError: If ``interview_duration + buffer_time`` is not positive
            (slot generation could never advance).
    """
    slot_duration = timedelta(minutes=schedule.interview_duration + schedule.buffer_time)
    if slot_duration <= timedelta(0):
        raise ValueError("interview_duration + buffer_time must be greater than zero")

    working_days = {int(day) for day in schedule.working_days}
    breaks = breaks or ()

    slots = []
    current_date = schedule.start_date
    while current_date <= schedule.end_date:
        if current_date.weekday() in working_days:  # Monday is 0, Sunday is 6
            # Work with full datetimes: comparing bare `time` values wraps
            # around at midnight, which used to produce bogus slots and an
            # endless loop for late working hours.
            slot_start = datetime.combine(current_date, schedule.start_time)
            day_end = datetime.combine(current_date, schedule.end_time)
            while slot_start + slot_duration <= day_end:
                slot_end = slot_start + slot_duration
                # Both ends lie on `current_date`, so .time() cannot wrap here.
                start_clock = slot_start.time()
                end_clock = slot_end.time()
                overlaps_break = any(
                    start_clock < pause.end_time and end_clock > pause.start_time
                    for pause in breaks
                )
                if not overlaps_break:
                    slots.append({
                        'date': current_date,
                        'time': start_clock,
                    })
                slot_start = slot_end
        current_date += timedelta(days=1)

    return slots