866 lines
27 KiB
Python
866 lines
27 KiB
Python
"""
|
|
Utility functions for recruitment app
|
|
"""
|
|
from recruitment import models
|
|
from django.conf import settings
|
|
from datetime import datetime, timedelta, time, date
|
|
from django.utils import timezone
|
|
from .models import ScheduledInterview
|
|
from django.template.loader import render_to_string
|
|
from django.core.mail import send_mail
|
|
import random
|
|
import os
|
|
import json
|
|
import logging
|
|
import requests
|
|
from PyPDF2 import PdfReader
|
|
from django.conf import settings
|
|
from .models import Settings, Application
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
def get_setting(key, default=None):
    """
    Look up a configuration value by key.

    Resolution order: the ``Settings`` table, then the process environment,
    then ``default``.

    Args:
        key (str): Name of the setting; also used as the environment variable name.
        default: Value returned when neither the database nor the environment
            provides the key.

    Returns:
        The ``value`` of the matching ``Settings`` row, otherwise the
        environment variable's value, otherwise ``default``.
    """
    try:
        return Settings.objects.get(key=key).value
    except Exception:
        # A missing row (Settings.DoesNotExist) and any other lookup failure
        # are treated alike: the environment is consulted next.
        from_env = os.getenv(key)

    return default if from_env is None else from_env
|
|
|
|
|
|
def set_setting(key, value):
    """
    Create or overwrite a setting row in the database.

    Args:
        key (str): The setting key.
        value: The setting value; it is converted with ``str()`` before being stored.

    Returns:
        Settings: The created or updated setting object.
    """
    stored_value = str(value)
    record, _created = Settings.objects.update_or_create(
        key=key,
        defaults={'value': stored_value},
    )
    return record
|
|
|
|
|
|
def get_zoom_config():
    """
    Collect every Zoom-related setting.

    Returns:
        dict: Maps each Zoom setting name to the value resolved by ``get_setting``
        (``None`` when a setting is not configured anywhere).
    """
    zoom_keys = (
        'ZOOM_ACCOUNT_ID',
        'ZOOM_CLIENT_ID',
        'ZOOM_CLIENT_SECRET',
        'ZOOM_WEBHOOK_API_KEY',
        'SECRET_TOKEN',
    )
    return {name: get_setting(name) for name in zoom_keys}
|
|
|
|
|
|
def get_linkedin_config():
    """
    Collect every LinkedIn-related setting.

    Returns:
        dict: Maps each LinkedIn setting name to the value resolved by
        ``get_setting`` (``None`` when a setting is not configured anywhere).
    """
    linkedin_keys = (
        'LINKEDIN_CLIENT_ID',
        'LINKEDIN_CLIENT_SECRET',
        'LINKEDIN_REDIRECT_URI',
    )
    return {name: get_setting(name) for name in linkedin_keys}
|
|
|
|
|
|
def get_applications_from_request(request):
    """
    Resolve the ``candidate_ids`` values posted in ``request`` to Application rows.

    Args:
        request: Request object whose ``POST`` data carries ``candidate_ids``.

    Returns:
        QuerySet: Applications whose id is among the posted values, or an empty
        queryset when no ids were posted.
    """
    # NOTE(review): a later definition with the same name in this module
    # rebinds it at import time - confirm which one callers are meant to get.
    posted_ids = request.POST.getlist("candidate_ids")
    if not posted_ids:
        return Application.objects.none()
    return Application.objects.filter(id__in=posted_ids)
|
|
|
|
|
|
def schedule_interviews(schedule, applications):
    """
    Create one ScheduledInterview per application, using the schedule's open slots.

    Applications are paired with slots in order; applications beyond the
    number of available slots are left without an interview.

    Args:
        schedule: Schedule template passed to ``get_available_time_slots``; its
            ``job`` is stored on every interview.
        applications: Iterable of applications to schedule.

    Returns:
        list: The ScheduledInterview objects that were created.
    """
    from .models import ScheduledInterview

    open_slots = get_available_time_slots(schedule)

    created = []
    # zip() stops at the shorter input, so surplus applications are skipped.
    for application, slot in zip(applications, open_slots):
        created.append(
            ScheduledInterview.objects.create(
                application=application,
                job=schedule.job,
                interview_date=slot['date'],
                interview_time=slot['time'],
                status='scheduled',
            )
        )
    return created
|
|
|
|
|
|
def get_available_time_slots(schedule):
    """
    Collect the interview slots for every working day of the schedule.

    Walks day by day from ``schedule.start_date`` to ``schedule.end_date``
    (inclusive) and gathers the slots of each day whose weekday number is
    listed in ``schedule.working_days``.

    Returns:
        list: Slot dicts as produced by ``_calculate_day_slots``, in date order.
    """
    from datetime import timedelta

    one_day = timedelta(days=1)
    collected = []
    day = schedule.start_date

    while day <= schedule.end_date:
        # working_days is matched against the weekday number rendered as text.
        if str(day.weekday()) in schedule.working_days:
            collected += _calculate_day_slots(schedule, day)
        day = day + one_day

    return collected
|
|
|
|
|
|
def _calculate_day_slots(schedule, date):
    """
    Calculate the interview start times available on a single day.

    Slots start at ``schedule.start_time``. Each interview lasts
    ``schedule.interview_duration`` minutes and the next slot begins
    ``schedule.buffer_time`` minutes after it ends. An interview must finish
    by ``schedule.end_time`` and must not overlap the optional break window
    (``schedule.break_start_time`` to ``schedule.break_end_time``); a slot that
    would start inside the break or run into it is moved to the break's end.

    Args:
        schedule: Object exposing ``start_time``, ``end_time``,
            ``interview_duration``, ``buffer_time``, ``break_start_time`` and
            ``break_end_time``.
        date: The calendar day to generate slots for.

    Returns:
        list[dict]: One ``{'date': date, 'time': <start time>}`` entry per slot,
        in chronological order.

    Raises:
        ValueError: If ``interview_duration + buffer_time`` is not positive,
            because the slot cursor could then never advance.
    """
    from datetime import datetime, timedelta

    interview_length = timedelta(minutes=schedule.interview_duration)
    step = timedelta(minutes=schedule.interview_duration + schedule.buffer_time)
    if step <= timedelta(0):
        raise ValueError("interview_duration + buffer_time must be greater than zero")

    cursor = datetime.combine(date, schedule.start_time)
    day_end = datetime.combine(date, schedule.end_time)

    # Only honour a well-formed break; an inverted window is ignored so that
    # jumping to its end can never move the cursor backwards.
    break_start = break_end = None
    if schedule.break_start_time and schedule.break_end_time:
        candidate_start = datetime.combine(date, schedule.break_start_time)
        candidate_end = datetime.combine(date, schedule.break_end_time)
        if candidate_start < candidate_end:
            break_start, break_end = candidate_start, candidate_end

    slots = []
    while cursor + interview_length <= day_end:
        if break_start is not None:
            starts_in_break = break_start <= cursor < break_end
            runs_into_break = cursor < break_start < cursor + interview_length
            if starts_in_break or runs_into_break:
                # Resume right after the break instead of booking over it.
                cursor = break_end
                continue

        slots.append({'date': date, 'time': cursor.time()})
        cursor += step

    return slots
|
|
|
|
|
|
def json_to_markdown_table(data):
    """
    Render JSON-like data as a Markdown table.

    Supported shapes:

    * list of dicts - one column per key of the first item, one row per item
      (keys missing from later items render as empty cells)
    * list of other values - a single ``Value`` column
    * dict - two columns, ``Key`` and ``Value``
    * any other value - returned as ``str(data)`` without a table

    Header and cell text is converted with ``str()`` and ``|`` characters are
    escaped so they cannot break the column layout.

    Args:
        data: The decoded JSON value to render.

    Returns:
        str: The Markdown table (every line newline-terminated), ``""`` for
        falsy input, or ``str(data)`` when no table can be built.
    """
    # NOTE(review): a later definition with the same name in this module
    # rebinds it at import time - confirm which one callers are meant to get.
    if not data:
        return ""

    if isinstance(data, list):
        first_item = data[0]
        if isinstance(first_item, dict):
            keys = list(first_item)
            # Keys may be non-strings (e.g. ints); stringify them for the header.
            headers = [str(key) for key in keys]
            rows = [[str(item.get(key, '')) for key in keys] for item in data]
        else:
            headers = ['Value']
            rows = [[str(item)] for item in data]
    elif isinstance(data, dict):
        headers = ['Key', 'Value']
        rows = [[str(key), str(value)] for key, value in data.items()]
    else:
        return str(data)

    # E.g. a list of empty dicts yields no columns; fall back to plain text.
    if not headers or not rows:
        return str(data)

    def _line(cells):
        return "| " + " | ".join(cell.replace("|", "\\|") for cell in cells) + " |\n"

    lines = [_line(headers), "| " + " | ".join(["---"] * len(headers)) + " |\n"]
    lines.extend(_line(row) for row in rows)
    return "".join(lines)
|
|
|
|
|
|
def initialize_default_settings():
    """
    Copy Zoom and LinkedIn values from the Django settings module into the database.

    Each known key is read from ``settings`` (missing attributes count as
    blank) and stored through ``set_setting``; blank values are skipped.
    Intended as a one-off migration of existing configuration.
    """
    zoom_keys = (
        'ZOOM_ACCOUNT_ID',
        'ZOOM_CLIENT_ID',
        'ZOOM_CLIENT_SECRET',
        'ZOOM_WEBHOOK_API_KEY',
        'SECRET_TOKEN',
    )
    linkedin_keys = (
        'LINKEDIN_CLIENT_ID',
        'LINKEDIN_CLIENT_SECRET',
        'LINKEDIN_REDIRECT_URI',
    )

    configured = {
        name: getattr(settings, name, '')
        for name in zoom_keys + linkedin_keys
    }

    for name, configured_value in configured.items():
        if not configured_value:
            continue  # nothing to migrate for this key
        set_setting(name, configured_value)
|
|
|
|
|
|
|
|
#####################################
|
|
|
|
|
|
def extract_text_from_pdf(file_path):
    """
    Return the text of every page of a PDF, concatenated and stripped.

    Args:
        file_path: Path of the PDF file; it is opened in binary mode.

    Returns:
        str: Page texts joined in page order (pages yielding no text
        contribute nothing), with surrounding whitespace removed.

    Raises:
        Exception: Any error raised while opening or parsing the file is
            logged and re-raised unchanged.
    """
    print("text extraction")
    try:
        with open(file_path, "rb") as pdf_file:
            pages = PdfReader(pdf_file).pages
            combined = "".join((page.extract_text() or "") for page in pages)
    except Exception as e:
        logger.error(f"PDF extraction failed: {e}")
        raise
    return combined.strip()
|
|
|
|
def score_resume_with_openrouter(prompt):
    """
    Send ``prompt`` to the configured OpenRouter chat endpoint and parse the reply.

    The endpoint URL, API key and model name are read through ``get_setting``.

    Args:
        prompt (str): User message sent as the only chat message.

    Returns:
        * the JSON value decoded from the first choice's message content
          (Markdown code fences are stripped first), or
        * the full decoded API response when that content is not valid JSON, or
        * ``{}`` when the HTTP status is not 200.
    """
    print("model call")
    api_url = get_setting('OPENROUTER_API_URL')
    api_key = get_setting('OPENROUTER_API_KEY')
    model_name = get_setting('OPENROUTER_MODEL')

    payload = json.dumps({
        "model": model_name,
        "messages": [{"role": "user", "content": prompt}],
    })
    request_headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    # NOTE(review): no timeout is passed, so this call can block indefinitely.
    response = requests.post(url=api_url, headers=request_headers, data=payload)

    if response.status_code != 200:
        print("error response")
        return {}

    envelope = response.json()
    content = envelope["choices"][0]['message']['content']
    try:
        # The model may wrap its JSON answer in a fenced code block.
        return json.loads(content.replace("```json", "").replace("```", ""))
    except Exception as e:
        print(e)
        return envelope
|
|
# print(f"rawraw_output)
|
|
# print(response)
|
|
|
|
|
|
|
|
# def match_resume_with_job_description(resume, job_description,prompt=""):
|
|
# resume_doc = nlp(resume)
|
|
# job_doc = nlp(job_description)
|
|
# similarity = resume_doc.similarity(job_doc)
|
|
# return similarity
|
|
|
|
def dashboard_callback(request, context):
    """
    Add job and candidate statistics to the dashboard template context.

    Args:
        request: The current request (not used here).
        context (dict): Context dictionary to extend in place.

    Returns:
        dict: The same ``context``, now containing ``total_jobs``,
        ``total_candidates``, ``job_titles`` and ``job_app_counts`` (the last
        two are parallel lists, one entry per job).
    """
    job_total = models.Job.objects.count()
    candidate_total = models.Candidate.objects.count()

    all_jobs = list(models.Job.objects.all())
    titles = [job.title for job in all_jobs]
    # One count query per job.
    applicant_counts = [job.candidates.count() for job in all_jobs]

    context.update(
        total_jobs=job_total,
        total_candidates=candidate_total,
        job_titles=titles,
        job_app_counts=applicant_counts,
    )
    return context
|
|
|
|
|
|
|
|
|
|
def get_access_token():
    """
    Obtain a Zoom access token using server-to-server OAuth.

    Account id, client credentials and the token URL are read through
    ``get_setting``.

    Returns:
        The ``access_token`` value of the token response.

    Raises:
        Exception: If the token endpoint answers with a status other than 200.
    """
    account_id = get_setting("ZOOM_ACCOUNT_ID")
    client_id = get_setting("ZOOM_CLIENT_ID")
    client_secret = get_setting("ZOOM_CLIENT_SECRET")
    auth_url = get_setting("ZOOM_AUTH_URL")

    # The client id/secret pair is sent as HTTP basic auth.
    response = requests.post(
        auth_url,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        data={"grant_type": "account_credentials", "account_id": account_id},
        auth=(client_id, client_secret),
    )

    if response.status_code != 200:
        raise Exception(f"Failed to obtain access token: {response.json()}")
    return response.json().get("access_token")
|
|
|
|
def _format_zoom_start_time(start_time, timezone):
    """Render ``start_time`` as the string sent in the meeting's ``start_time`` field."""
    if isinstance(start_time, str):
        # Already formatted by the caller; pass through untouched.
        return start_time
    if start_time.utcoffset() is not None:
        # Aware datetime: normalise to UTC so the trailing "Z" is truthful.
        from datetime import timezone as dt_timezone
        return start_time.astimezone(dt_timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    stamp = start_time.isoformat(timespec="seconds")
    # A naive value is UTC only when the meeting timezone is UTC; otherwise it
    # is sent as local wall-clock time to be interpreted in ``timezone``.
    return stamp + "Z" if timezone == "UTC" else stamp


def create_zoom_meeting(topic, start_time, duration, timezone="UTC"):
    """
    Create a scheduled Zoom meeting using the Zoom API.

    Args:
        topic (str): The topic of the meeting.
        start_time (datetime | str): Meeting start. A string is sent as-is
            (e.g. "2023-10-01T10:00:00Z"); an aware datetime is converted to
            UTC; a naive datetime is treated as wall-clock time in ``timezone``.
        duration (int): The duration of the meeting in minutes.
        timezone (str): Timezone name sent with the meeting. Defaults to
            "UTC", which keeps the previous behaviour of suffixing naive
            datetimes with "Z".

    Returns:
        dict: ``{"status": "success", ...}`` with ``meeting_details`` and the raw
        ``zoom_gateway_response`` when Zoom answers 201; otherwise
        ``{"status": "error", "message": ...}`` (plus ``details`` when Zoom
        returned an error body). Exceptions are reported the same way rather
        than raised.
    """
    try:
        access_token = get_access_token()

        meeting_details = {
            "topic": topic,
            "type": 2,
            "start_time": _format_zoom_start_time(start_time, timezone),
            "duration": duration,
            "timezone": timezone,
            "settings": {
                "host_video": True,
                "participant_video": True,
                "join_before_host": True,
                "mute_upon_entry": False,
                "approval_type": 2,
                "audio": "both",
                "auto_recording": "none"
            }
        }

        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json"
        }
        ZOOM_MEETING_URL = get_setting('ZOOM_MEETING_URL')
        response = requests.post(
            ZOOM_MEETING_URL,
            headers=headers,
            json=meeting_details
        )

        if response.status_code != 201:
            return {
                "status": "error",
                "message": "Failed to create meeting.",
                "details": response.json()
            }

        meeting_data = response.json()
        return {
            "status": "success",
            "message": "Meeting created successfully.",
            "meeting_details": {
                "join_url": meeting_data['join_url'],
                "meeting_id": meeting_data['id'],
                "password": meeting_data['password'],
                "host_email": meeting_data['host_email']
            },
            "zoom_gateway_response": meeting_data
        }

    except Exception as e:
        return {
            "status": "error",
            "message": str(e)
        }
|
|
|
|
|
|
def list_zoom_meetings(next_page_token=None):
    """
    List the meetings of the authenticated Zoom user ("me").

    Args:
        next_page_token (str, optional): Pagination token from a previous
            call; omitted from the request when empty. Defaults to None.

    Returns:
        dict: On HTTP 200, ``status`` "success" with ``meetings`` and
        ``next_page_token``; otherwise ``status`` "error" with a ``message``
        (and ``details`` when Zoom returned an error body). Exceptions are
        reported as an error dict rather than raised.
    """
    try:
        token = get_access_token()
        user_id = 'me'
        query = {'next_page_token': next_page_token} if next_page_token else {}

        response = requests.get(
            f"https://api.zoom.us/v2/users/{user_id}/meetings",
            headers={
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
            },
            params=query,
        )

        if response.status_code != 200:
            return {
                "status": "error",
                "message": "Failed to retrieve meetings.",
                "details": response.json(),
            }

        payload = response.json()
        return {
            "status": "success",
            "message": "Meetings retrieved successfully.",
            "meetings": payload.get("meetings", []),
            "next_page_token": payload.get("next_page_token"),
        }
    except Exception as e:
        return {"status": "error", "message": str(e)}
|
|
|
|
|
|
def get_zoom_meeting_details(meeting_id):
    """
    Fetch one meeting from the Zoom API.

    Args:
        meeting_id (str): The ID of the meeting to retrieve.

    Returns:
        dict: On HTTP 200, ``status`` "success" with ``meeting_details`` (the
        decoded response, whose known timestamp fields are re-serialised as
        ISO format strings); otherwise ``status`` "error" with a ``message``
        (and ``details`` when Zoom returned an error body). Exceptions are
        reported as an error dict rather than raised.
    """
    # Response fields that may hold ISO 8601 timestamps.
    timestamp_fields = (
        'start_time', 'created_at', 'updated_at',
        'password_changed_at', 'host_join_before_start_time',
        'audio_recording_start', 'recording_files_end',
    )

    try:
        token = get_access_token()
        response = requests.get(
            f"https://api.zoom.us/v2/meetings/{meeting_id}",
            headers={
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
            },
        )

        if response.status_code != 200:
            return {
                "status": "error",
                "message": "Failed to retrieve meeting details.",
                "details": response.json(),
            }

        meeting_data = response.json()
        for field_name in timestamp_fields:
            if field_name not in meeting_data or meeting_data[field_name] is None:
                continue
            try:
                # Parse and re-emit so every timestamp uses one format;
                # a trailing 'Z' is rewritten as an explicit UTC offset first.
                parsed = datetime.fromisoformat(meeting_data[field_name].replace('Z', '+00:00'))
                meeting_data[field_name] = parsed.isoformat()
            except (ValueError, TypeError) as e:
                # The original value is kept when it cannot be parsed.
                logger.warning(
                    f"Could not parse or re-serialize datetime field '{field_name}' "
                    f"for meeting {meeting_id}: {e}. Original value: '{meeting_data[field_name]}'"
                )

        return {
            "status": "success",
            "message": "Meeting details retrieved successfully.",
            "meeting_details": meeting_data,
        }
    except Exception as e:
        return {"status": "error", "message": str(e)}
|
|
|
|
|
|
def update_zoom_meeting(meeting_id, updated_data):
    """
    Update a Zoom meeting using the Zoom API.

    Args:
        meeting_id (str): The ID of the meeting to update.
        updated_data (dict): Fields to update (e.g., topic, start_time,
            duration); sent as the JSON body of the PATCH request.

    Returns:
        dict: ``status`` "success" when Zoom answers 204; otherwise ``status``
        "error" with a ``message`` and, when Zoom responded, ``details``
        holding the decoded error body (or its raw text if it is not JSON).
        Exceptions are reported as an error dict rather than raised.
    """
    try:
        access_token = get_access_token()
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json"
        }
        response = requests.patch(
            f"https://api.zoom.us/v2/meetings/{meeting_id}/",
            headers=headers,
            json=updated_data
        )

        if response.status_code == 204:
            return {
                "status": "success",
                "message": "Meeting updated successfully."
            }

        # An error body is not guaranteed to be JSON; fall back to the raw
        # text so the real failure is not replaced by a parse error.
        try:
            details = response.json()
        except ValueError:
            details = response.text

        logger.warning(
            "Zoom meeting %s update failed with HTTP %s: %s",
            meeting_id, response.status_code, details,
        )
        return {
            "status": "error",
            "message": "Failed to update meeting.",
            "details": details
        }

    except Exception as e:
        return {
            "status": "error",
            "message": str(e)
        }
|
|
|
|
|
|
def delete_zoom_meeting(meeting_id):
    """
    Delete a Zoom meeting using the Zoom API.

    Args:
        meeting_id (str): The ID of the meeting to delete.

    Returns:
        dict: ``status`` "success" when Zoom answers 204; otherwise ``status``
        "error" with a ``message`` (and ``details`` when Zoom returned an
        error body). Exceptions are reported as an error dict rather than raised.
    """
    try:
        token = get_access_token()
        response = requests.delete(
            f"https://api.zoom.us/v2/meetings/{meeting_id}",
            headers={"Authorization": f"Bearer {token}"},
        )

        if response.status_code != 204:
            return {
                "status": "error",
                "message": "Failed to delete meeting.",
                "details": response.json(),
            }

        return {
            "status": "success",
            "message": "Meeting deleted successfully.",
        }
    except Exception as e:
        return {"status": "error", "message": str(e)}
|
|
|
|
def schedule_interviews(schedule):
    """
    Schedule interviews for all candidates attached to the schedule.

    Each candidate is paired with the next available slot. For every pair a
    Zoom meeting is created, a ScheduledInterview row is stored, the
    candidate's ``interview_date`` is saved and an invitation email is sent.
    Candidates whose Zoom meeting cannot be created are logged and skipped.

    Args:
        schedule: Schedule providing ``candidates``, ``job``,
            ``interview_duration`` and the slot criteria.

    Returns:
        int: The number of interviews successfully scheduled.

    Raises:
        ValueError: If there are fewer available slots than candidates.
    """
    from datetime import timezone as dt_timezone

    candidates = list(schedule.candidates.all())
    if not candidates:
        return 0

    available_slots = get_available_time_slots(schedule)

    if len(available_slots) < len(candidates):
        raise ValueError(f"Not enough available slots. Required: {len(candidates)}, Available: {len(available_slots)}")

    scheduled_count = 0
    for candidate, slot in zip(candidates, available_slots):
        interview_datetime = datetime.combine(slot['date'], slot['time'])

        # create_zoom_meeting's default handling appends "Z" (UTC) to a naive
        # datetime, so hand it naive UTC.
        # Assumes slot times are wall-clock times in the active Django
        # timezone, as the previous timezone= argument implied - TODO confirm.
        start_utc = (
            timezone.make_aware(interview_datetime)
            .astimezone(dt_timezone.utc)
            .replace(tzinfo=None)
        )

        meeting_topic = f"Interview for {schedule.job.title} - {candidate.name}"
        meeting = create_zoom_meeting(
            topic=meeting_topic,
            start_time=start_utc,
            duration=schedule.interview_duration,
        )
        if meeting.get("status") != "success":
            logger.error(
                "Zoom meeting creation failed for candidate %s: %s",
                candidate.pk, meeting.get("message"),
            )
            continue

        # NOTE(review): create_zoom_meeting returns a plain dict, while
        # send_interview_email reads .join_url / .meeting_id from
        # scheduled_interview.zoom_meeting - confirm what this field expects.
        scheduled_interview = ScheduledInterview.objects.create(
            candidate=candidate,
            job=schedule.job,
            zoom_meeting=meeting,
            schedule=schedule,
            interview_date=slot['date'],
            interview_time=slot['time']
        )

        # Persist the assignment; without save() it was lost with the instance.
        candidate.interview_date = interview_datetime
        candidate.save()

        send_interview_email(scheduled_interview)

        scheduled_count += 1

    return scheduled_count
|
|
|
|
def send_interview_email(scheduled_interview):
    """
    Email the interview invitation to the interview's candidate.

    Renders the plain-text and HTML invitation templates with the candidate,
    job, company, date/time and Zoom join details, then sends the message from
    ``settings.DEFAULT_FROM_EMAIL``. Delivery errors are not suppressed.

    Args:
        scheduled_interview: Interview exposing ``job``, ``candidate``,
            ``zoom_meeting``, ``interview_date`` and ``interview_time``.
    """
    job = scheduled_interview.job
    candidate = scheduled_interview.candidate
    meeting = scheduled_interview.zoom_meeting

    template_context = {
        'candidate_name': candidate.name,
        'job_title': job.title,
        'company_name': job.company.name,
        'interview_date': scheduled_interview.interview_date,
        'interview_time': scheduled_interview.interview_time,
        'join_url': meeting.join_url,
        'meeting_id': meeting.meeting_id,
    }

    plain_body = render_to_string('interviews/email/interview_invitation.txt', template_context)
    html_body = render_to_string('interviews/email/interview_invitation.html', template_context)

    send_mail(
        subject=f"Interview Invitation for {job.title}",
        message=plain_body,
        from_email=settings.DEFAULT_FROM_EMAIL,
        recipient_list=[candidate.email],
        html_message=html_body,
        fail_silently=False,
    )
|
|
|
|
def get_available_time_slots(schedule):
    """
    Generate the available interview slots for a schedule.

    For every working day between ``schedule.start_date`` and
    ``schedule.end_date`` (inclusive), slots of
    ``interview_duration + buffer_time`` minutes are laid out back to back
    from ``schedule.start_time``. A slot is kept when it ends no later than
    ``schedule.end_time`` on the same day and does not overlap any entry of
    ``schedule.breaks``.

    Args:
        schedule: Object exposing ``start_date``, ``end_date``,
            ``working_days`` (weekday numbers, Monday is 0), ``start_time``,
            ``end_time``, ``interview_duration``, ``buffer_time`` and
            optionally ``breaks`` (dicts with ``start_time`` / ``end_time``
            strings in ``HH:MM:SS`` form).

    Returns:
        list[dict]: ``{'date': ..., 'time': ...}`` entries in chronological order.

    Raises:
        ValueError: If ``interview_duration + buffer_time`` is not positive,
            because the slot cursor could then never advance.
    """
    slot_duration = timedelta(minutes=schedule.interview_duration + schedule.buffer_time)
    if slot_duration <= timedelta(0):
        raise ValueError("interview_duration + buffer_time must be greater than zero")

    working_days_set = {int(day) for day in schedule.working_days}

    # Parse the break windows once instead of once per candidate slot.
    break_windows = []
    for break_data in (getattr(schedule, 'breaks', None) or []):
        try:
            break_windows.append((
                datetime.strptime(break_data['start_time'], '%H:%M:%S').time(),
                datetime.strptime(break_data['end_time'], '%H:%M:%S').time(),
            ))
        except (ValueError, KeyError) as exc:
            # A malformed break is ignored, but no longer silently.
            logger.warning("Ignoring malformed break %r: %s", break_data, exc)

    slots = []
    one_day = timedelta(days=1)
    end_date = schedule.end_date
    current_date = schedule.start_date

    while current_date <= end_date:
        if current_date.weekday() in working_days_set:
            # Compare full datetimes: comparing bare times wraps at midnight
            # and could emit bogus slots or never terminate.
            day_end = datetime.combine(current_date, schedule.end_time)
            slot_start = datetime.combine(current_date, schedule.start_time)

            while slot_start + slot_duration <= day_end:
                slot_end = slot_start + slot_duration
                overlaps_break = any(
                    slot_start < datetime.combine(current_date, break_end)
                    and slot_end > datetime.combine(current_date, break_start)
                    for break_start, break_end in break_windows
                )
                if not overlaps_break:
                    slots.append({'date': current_date, 'time': slot_start.time()})
                slot_start = slot_end

        current_date += one_day

    return slots
|
|
|
|
|
|
def json_to_markdown_table(data_list):
    """
    Render a list of dicts as a Markdown table.

    Columns come from the keys of the first item; keys missing from later
    items render as empty cells. Header and cell values are converted with
    ``str()`` and ``|`` characters are escaped so they cannot break the
    column layout.

    Args:
        data_list (list[dict]): Rows to render.

    Returns:
        str: The Markdown table (every line newline-terminated), or ``""``
        when ``data_list`` is empty.
    """
    if not data_list:
        return ""

    keys = list(data_list[0].keys())

    def _format_row(cells):
        escaped = (str(cell).replace("|", "\\|") for cell in cells)
        return "| " + " | ".join(escaped) + " |\n"

    lines = [
        _format_row(keys),
        "| " + " | ".join(["---"] * len(keys)) + " |\n",
    ]
    lines.extend(
        _format_row(row.get(key, "") for key in keys)
        for row in data_list
    )
    return "".join(lines)
|
|
|
|
|
|
def get_applications_from_request(request):
    """
    Yield one result per field posted in ``request``.

    Every POST field *name* is used as an Application primary key. The
    matching Application is yielded; when the lookup (or the consumer's
    handling of the yielded value) raises, the error is logged and ``None``
    is yielded instead.

    Args:
        request: Request object whose ``POST`` data is iterated.

    Yields:
        Application or None.
    """
    # NOTE(review): non-id fields in the POST data also go through the lookup
    # and produce None entries - confirm callers filter those out.
    for field_name, _value in request.POST.items():
        try:
            yield models.Application.objects.get(pk=field_name)
        except Exception as e:
            logger.error(e)
            yield None
|
|
|
|
|
|
|
|
def update_meeting(instance, updated_data):
    """
    Push changes to Zoom and refresh the local meeting record from Zoom's answer.

    After a successful update the meeting details are fetched again; when that
    succeeds, ``topic``, ``duration``, ``details_url``, ``password``, ``status``
    and ``zoom_gateway_response`` are copied onto ``instance`` and it is saved.
    When only the refresh fails, the update is still reported as successful
    and ``instance`` is left unsaved.

    Args:
        instance: Local meeting object exposing ``meeting_id`` and the fields above.
        updated_data (dict): Fields forwarded to ``update_zoom_meeting``.

    Returns:
        dict: ``{"status": "success" | "error", "message": ...}``.
    """
    update_result = update_zoom_meeting(instance.meeting_id, updated_data)

    if update_result["status"] == "success":
        details_result = get_zoom_meeting_details(instance.meeting_id)
        details_status = details_result["status"]

        if details_status == "success":
            zoom_details = details_result["meeting_details"]

            # (local attribute, Zoom field); the current value is kept when
            # Zoom omits the field.
            synced_fields = (
                ("topic", "topic"),
                ("duration", "duration"),
                ("details_url", "join_url"),
                ("password", "password"),
            )
            for attr_name, zoom_key in synced_fields:
                setattr(instance, attr_name, zoom_details.get(zoom_key, getattr(instance, attr_name)))
            # status has no fallback: it becomes None when Zoom omits it.
            instance.status = zoom_details.get("status")

            instance.zoom_gateway_response = details_result.get("meeting_details")
            instance.save()
            logger.info(f"Successfully updated Zoom meeting {instance.meeting_id}.")
            return {"status": "success", "message": "Zoom meeting updated successfully."}

        if details_status == "error":
            logger.warning(
                f"Successfully updated Zoom meeting {instance.meeting_id}, but failed to fetch updated details. "
                f"Error: {details_result.get('message', 'Unknown error')}"
            )
            return {"status": "success", "message": "Zoom meeting updated successfully."}

    logger.warning(f"Failed to update Zoom meeting {instance.meeting_id}. Error: {update_result.get('message', 'Unknown error')}")
    return {"status": "error", "message": update_result.get("message", "Zoom meeting update failed.")}
|
|
|
|
|
|
|
|
|
|
|
|
def generate_random_password():
    """
    Generate a random 12-character password of ASCII letters and digits.

    Characters are drawn with the ``secrets`` module, which is intended for
    security-sensitive values, instead of the predictable ``random`` generator.

    Returns:
        str: The generated password.
    """
    import secrets
    import string

    alphabet = string.ascii_letters + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(12))