910 lines
28 KiB
Python
910 lines
28 KiB
Python
"""
|
|
Utility functions for recruitment app
|
|
"""
|
|
|
|
from recruitment import models
|
|
from django.conf import settings
|
|
from datetime import datetime, timedelta
|
|
from django.utils import timezone
|
|
from .models import ScheduledInterview
|
|
import os
|
|
import json
|
|
import logging
|
|
import requests
|
|
from PyPDF2 import PdfReader
|
|
from django.conf import settings
|
|
from .models import Settings, Application
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def get_setting(key, default=None):
    """
    Look up a configuration value by key.

    Resolution order: the ``Settings`` table, then the process environment,
    then ``default``.

    Args:
        key (str): The setting key to retrieve
        default: Value returned when neither the database nor the
            environment provides one

    Returns:
        The ``value`` of the matching ``Settings`` row, otherwise the
        environment variable named ``key``, otherwise ``default``
    """
    try:
        return Settings.objects.get(key=key).value
    except Exception:
        # Deliberately broad: a missing row and any other lookup failure
        # are treated the same way and fall through to the environment.
        pass

    from_environment = os.getenv(key)
    return default if from_environment is None else from_environment
|
|
|
|
|
|
def set_setting(key, value, name):
    """
    Create or update a setting in the database.

    The row is identified by ``key`` alone; ``value`` and ``name`` are
    written to it. An existing row for ``key`` is overwritten, a missing
    one is created.

    Args:
        key (str): The setting key
        value: The setting value
        name: Value stored in the setting's ``name`` field

    Returns:
        Settings: The created or updated setting object
    """
    # Log the key only: setting values can be credentials.
    logger.debug("Saving setting %s", key)

    # Only ``key`` may be a lookup argument. Passing value/name as lookups
    # would search for a row that already holds the new value and try to
    # insert a second row whenever the value changes.
    setting, _created = Settings.objects.update_or_create(
        key=key,
        defaults={"value": value, "name": name},
    )
    return setting
|
|
|
|
|
|
def get_zoom_config():
    """
    Collect the Zoom-related settings into one dictionary.

    Returns:
        dict: Maps each Zoom setting key to the value ``get_setting``
        returns for it
    """
    zoom_keys = (
        "ZOOM_ACCOUNT_ID",
        "ZOOM_CLIENT_ID",
        "ZOOM_CLIENT_SECRET",
        "ZOOM_WEBHOOK_API_KEY",
        "SECRET_TOKEN",
    )
    return {setting_key: get_setting(setting_key) for setting_key in zoom_keys}
|
|
|
|
|
|
def get_linkedin_config():
    """
    Collect the LinkedIn-related settings into one dictionary.

    Returns:
        dict: Maps each LinkedIn setting key to the value ``get_setting``
        returns for it
    """
    linkedin_keys = (
        "LINKEDIN_CLIENT_ID",
        "LINKEDIN_CLIENT_SECRET",
        "LINKEDIN_REDIRECT_URI",
    )
    return {
        setting_key: get_setting(setting_key) for setting_key in linkedin_keys
    }
|
|
|
|
|
|
def get_applications_from_request(request):
    """
    Return the Application rows whose ids were posted as "candidate_ids".

    Note: this module defines a second function with the same name further
    down; at import time that later definition replaces this one.

    Returns:
        A queryset filtered by the posted ids, or an empty queryset when
        no "candidate_ids" values were posted
    """
    selected_ids = request.POST.getlist("candidate_ids")
    if not selected_ids:
        return Application.objects.none()
    return Application.objects.filter(id__in=selected_ids)
|
|
|
|
|
|
def schedule_interviews(schedule, applications):
    """
    Create one ScheduledInterview per application, pairing the n-th
    application with the n-th available slot of ``schedule``.

    Applications beyond the number of available slots are skipped.

    Note: this module defines a second function with the same name further
    down; at import time that later definition replaces this one.

    Returns:
        list: The ScheduledInterview objects that were created
    """
    from .models import ScheduledInterview

    open_slots = get_available_time_slots(schedule)
    slot_count = len(open_slots)
    created = []

    for position, application in enumerate(applications):
        if position >= slot_count:
            # No slot left for this application.
            continue
        chosen = open_slots[position]
        created.append(
            ScheduledInterview.objects.create(
                application=application,
                job=schedule.job,
                interview_date=chosen["date"],
                interview_time=chosen["time"],
                status="scheduled",
            )
        )

    return created
|
|
|
|
|
|
def get_available_time_slots(schedule):
    """
    Collect the interview slots for every working day between
    ``schedule.start_date`` and ``schedule.end_date`` (both inclusive).

    A day counts as a working day when the string form of its weekday
    number (Monday is 0) is found in ``schedule.working_days``.

    Note: this module defines a second function with the same name further
    down; at import time that later definition replaces this one.

    Returns:
        list: The per-day results of ``_calculate_day_slots`` in date order
    """
    from datetime import timedelta

    collected = []
    one_day = timedelta(days=1)
    day = schedule.start_date

    while day <= schedule.end_date:
        if str(day.weekday()) in schedule.working_days:
            collected.extend(_calculate_day_slots(schedule, day))
        day += one_day

    return collected
|
|
|
|
|
|
def _calculate_day_slots(schedule, date):
    """
    Calculate the interview start times available on one day.

    Slots start at ``schedule.start_time`` and repeat every
    ``interview_duration + buffer_time`` minutes for as long as a whole
    interview still ends by ``schedule.end_time``. When both break times
    are set and the break is a valid interval, a slot whose interview
    would overlap the break is not offered and scheduling resumes at the
    end of the break.

    Args:
        schedule: Object providing start_time, end_time, break_start_time,
            break_end_time, interview_duration and buffer_time (minutes)
        date: The day to build slots for

    Returns:
        list: Dicts with "date" (the given day) and "time" (slot start)

    Raises:
        ValueError: If interview_duration + buffer_time is not positive,
            because the slot cursor could then never advance
    """
    from datetime import datetime, timedelta

    interview_length = timedelta(minutes=schedule.interview_duration)
    step = timedelta(
        minutes=schedule.interview_duration + schedule.buffer_time
    )
    if step <= timedelta(0):
        raise ValueError(
            "interview_duration + buffer_time must be greater than zero"
        )

    cursor = datetime.combine(date, schedule.start_time)
    day_end = datetime.combine(date, schedule.end_time)

    break_start = None
    break_end = None
    if schedule.break_start_time and schedule.break_end_time:
        break_start = datetime.combine(date, schedule.break_start_time)
        break_end = datetime.combine(date, schedule.break_end_time)
    # An inverted or empty break is ignored, so jumping to break_end below
    # always moves the cursor forward.
    has_break = break_start is not None and break_start < break_end

    slots = []
    while cursor + interview_length <= day_end:
        overlaps_break = (
            has_break
            and cursor < break_end
            and (
                cursor >= break_start
                or cursor + interview_length > break_start
            )
        )
        if overlaps_break:
            # Also covers interviews that start before the break but would
            # still be running when it begins.
            cursor = break_end
            continue

        slots.append({"date": date, "time": cursor.time()})
        cursor += step

    return slots
|
|
|
|
|
|
def json_to_markdown_table(data):
    """
    Convert JSON-like data to a markdown table.

    Note: this module defines a second function with the same name further
    down; at import time that later definition replaces this one.

    Args:
        data: One of
            - a list of dicts: columns come from the first dict's keys
            - a list of other values: a single "Value" column
            - a dict: a two-column "Key" / "Value" table
            - anything else: returned as ``str(data)``

    Returns:
        str: The markdown table, ``""`` for empty input, or ``str(data)``
        when no table can be built (scalar input, or no columns)
    """
    if not data:
        return ""

    def _cell(value):
        # A raw "|" inside a cell would be read as a column separator.
        return str(value).replace("|", "\\|")

    if isinstance(data, list):
        first_item = data[0]
        if isinstance(first_item, dict):
            columns = list(first_item.keys())
            # Keys are stringified so non-string keys do not break join().
            headers = [_cell(column) for column in columns]

            def _row(item):
                if isinstance(item, dict):
                    return [_cell(item.get(column, "")) for column in columns]
                # A non-dict entry in a list of dicts: keep its text in the
                # first column rather than failing on item.get().
                return [_cell(item)] + [""] * (len(columns) - 1)

            rows = [_row(item) for item in data]
        else:
            headers = ["Value"]
            rows = [[_cell(item)] for item in data]
    elif isinstance(data, dict):
        headers = ["Key", "Value"]
        rows = [[_cell(key), _cell(value)] for key, value in data.items()]
    else:
        return str(data)

    if not headers or not rows:
        return str(data)

    lines = [
        "| " + " | ".join(headers) + " |",
        "| " + " | ".join(["---"] * len(headers)) + " |",
    ]
    lines.extend("| " + " | ".join(row) + " |" for row in rows)
    return "\n".join(lines) + "\n"
|
|
|
|
|
|
def initialize_default_settings():
    """
    Create the default (empty) integration settings in the database.

    One ``Settings`` row with an empty value is created for each Zoom,
    LinkedIn and OpenRouter key, with ``name`` set to the integration it
    belongs to. Keys that already exist are left untouched, so running
    this again does not overwrite values configured in the meantime.
    """
    # Keys grouped by the name stored with them. Keeping the name next to
    # its keys avoids a parallel list that must stay in index order.
    default_groups = {
        "ZOOM": (
            "ZOOM_ACCOUNT_ID",
            "ZOOM_CLIENT_ID",
            "ZOOM_CLIENT_SECRET",
            "ZOOM_WEBHOOK_API_KEY",
            "SECRET_TOKEN",
        ),
        "LINKEDIN": (
            "LINKEDIN_CLIENT_ID",
            "LINKEDIN_CLIENT_SECRET",
            "LINKEDIN_REDIRECT_URI",
        ),
        "OPENROUTER": (
            "OPENROUTER_API_URL",
            "OPENROUTER_API_KEY",
            "OPENROUTER_MODEL",
        ),
    }

    for group_name, keys in default_groups.items():
        for key in keys:
            # Create only when missing; an existing row keeps its value.
            Settings.objects.get_or_create(
                key=key,
                defaults={"value": "", "name": group_name},
            )
|
|
|
|
|
|
#####################################
|
|
|
|
|
|
def extract_text_from_pdf(file_path):
    """
    Extract the text of every page of a PDF file.

    Args:
        file_path: Path of the PDF file to read

    Returns:
        str: The text of all pages concatenated in page order, with
        leading and trailing whitespace stripped. A page for which no
        text is extracted contributes nothing.

    Raises:
        Exception: Any error raised while opening or parsing the file is
            logged and re-raised unchanged
    """
    logger.debug("Extracting text from PDF: %s", file_path)
    try:
        with open(file_path, "rb") as pdf_file:
            reader = PdfReader(pdf_file)
            # Pages must be read while the file is still open.
            page_texts = [page.extract_text() or "" for page in reader.pages]
    except Exception as exc:
        logger.exception("PDF extraction failed: %s", exc)
        raise
    return "".join(page_texts).strip()
|
|
|
|
|
|
def score_resume_with_openrouter(prompt):
    """
    Send ``prompt`` to the configured OpenRouter endpoint and parse the
    reply as JSON.

    The endpoint URL, API key and model are read with ``get_setting``
    ("OPENROUTER_API_URL", "OPENROUTER_API_KEY", "OPENROUTER_MODEL").

    Args:
        prompt (str): Text sent as a single user message

    Returns:
        On HTTP 200: the value decoded from the first choice's message
        content after removing "```json" / "```" fence markers. If that
        content cannot be decoded, the raw response payload is returned
        instead. On any other status code: an empty dict.

    Raises:
        requests.RequestException: On connection failure or timeout
        KeyError, IndexError: If a 200 response does not contain
            ``choices[0].message.content``
    """
    api_url = get_setting("OPENROUTER_API_URL")
    api_key = get_setting("OPENROUTER_API_KEY")
    model = get_setting("OPENROUTER_MODEL")

    response = requests.post(
        url=api_url,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        data=json.dumps(
            {
                "model": model,
                "messages": [{"role": "user", "content": prompt}],
            },
        ),
        # (connect, read) seconds. Without a timeout a stalled connection
        # would block the caller indefinitely; the read limit is generous
        # because model completions can be slow.
        timeout=(10, 120),
    )

    if response.status_code != 200:
        logger.error(
            "OpenRouter request failed with status %s", response.status_code
        )
        return {}

    payload = response.json()
    content = payload["choices"][0]["message"]["content"]
    try:
        return json.loads(content.replace("```json", "").replace("```", ""))
    except Exception as exc:
        logger.warning("Could not parse OpenRouter reply as JSON: %s", exc)
        # Existing contract: fall back to the raw response payload.
        return payload
|
|
# print(f"rawraw_output)
|
|
# print(response)
|
|
|
|
|
|
# def match_resume_with_job_description(resume, job_description,prompt=""):
|
|
# resume_doc = nlp(resume)
|
|
# job_doc = nlp(job_description)
|
|
# similarity = resume_doc.similarity(job_doc)
|
|
# return similarity
|
|
|
|
|
|
def dashboard_callback(request, context):
    """
    Add job and candidate statistics to a dashboard context.

    Adds "total_jobs", "total_candidates", "job_titles" and
    "job_app_counts" (one ``job.candidates.count()`` per job, in the same
    order as the titles) via ``context.update``.

    Args:
        request: Not used by this function
        context: Object updated in place and returned

    Returns:
        The same ``context`` object
    """
    job_total = models.Job.objects.count()
    candidate_total = models.Candidate.objects.count()

    titles = []
    application_counts = []
    for job in models.Job.objects.all():
        titles.append(job.title)
        application_counts.append(job.candidates.count())

    stats = {
        "total_jobs": job_total,
        "total_candidates": candidate_total,
        "job_titles": titles,
        "job_app_counts": application_counts,
    }
    context.update(stats)
    return context
|
|
|
|
|
|
def get_access_token():
    """
    Obtain a Zoom access token using server-to-server OAuth.

    Reads ZOOM_ACCOUNT_ID, ZOOM_CLIENT_ID, ZOOM_CLIENT_SECRET and
    ZOOM_AUTH_URL with ``get_setting`` and POSTs an "account_credentials"
    grant to the auth URL using HTTP basic auth.

    Returns:
        The "access_token" value of the JSON response (``None`` if the
        response has no such key)

    Raises:
        Exception: If the response status is not 200; the message carries
            the response body
        requests.RequestException: On connection failure or timeout
    """
    account_id = get_setting("ZOOM_ACCOUNT_ID")
    client_id = get_setting("ZOOM_CLIENT_ID")
    client_secret = get_setting("ZOOM_CLIENT_SECRET")
    auth_url = get_setting("ZOOM_AUTH_URL")

    response = requests.post(
        auth_url,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        data={
            "grant_type": "account_credentials",
            "account_id": account_id,
        },
        auth=(client_id, client_secret),
        # Without a timeout a stalled connection blocks the caller forever.
        timeout=30,
    )

    if response.status_code == 200:
        return response.json().get("access_token")

    # The error body is not guaranteed to be JSON; fall back to raw text
    # so the real failure is not masked by a decode error.
    try:
        error_body = response.json()
    except ValueError:
        error_body = response.text
    raise Exception(f"Failed to obtain access token: {error_body}")
|
|
|
|
|
|
def create_zoom_meeting(topic, start_time, duration, timezone="Asia/Riyadh"):
    """
    Create a scheduled Zoom meeting using the Zoom API.

    Args:
        topic (str): The topic of the meeting.
        start_time (datetime): Start of the meeting. It is sent formatted
            as "%Y-%m-%dT%H:%M:%S" with no UTC designator, together with
            ``timezone``.
        duration (int): The duration of the meeting in minutes.
        timezone (str): Value sent as the meeting's "timezone" field.
            Defaults to "Asia/Riyadh", the value previously hard-coded.

    Returns:
        dict: On HTTP 201, ``{"status": "success", "message": ...,
        "meeting_details": {join_url, meeting_id, password, host_email},
        "zoom_gateway_response": <full payload>}``. Otherwise
        ``{"status": "error", "message": ...}``, with "details" when Zoom
        returned an error body. Exceptions raised inside the function are
        reported as an error dict rather than propagated.
    """
    try:
        access_token = get_access_token()

        zoom_start_time = start_time.strftime("%Y-%m-%dT%H:%M:%S")
        logger.info(zoom_start_time)

        meeting_details = {
            "topic": topic,
            "type": 2,
            "start_time": zoom_start_time,
            "duration": duration,
            "timezone": timezone,
            "settings": {
                "host_video": True,
                "participant_video": True,
                "join_before_host": True,
                "mute_upon_entry": False,
                "approval_type": 2,
                "audio": "both",
                "auto_recording": "none",
            },
        }

        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }
        meeting_url = get_setting("ZOOM_MEETING_URL")

        response = requests.post(
            meeting_url,
            headers=headers,
            json=meeting_details,
            # Without a timeout a stalled connection blocks the caller.
            timeout=30,
        )

        if response.status_code != 201:
            return {
                "status": "error",
                "message": "Failed to create meeting.",
                "details": response.json(),
            }

        meeting_data = response.json()
        # Log the id only: the full payload contains the meeting password.
        logger.info("Created Zoom meeting %s", meeting_data.get("id"))
        return {
            "status": "success",
            "message": "Meeting created successfully.",
            "meeting_details": {
                "join_url": meeting_data["join_url"],
                "meeting_id": meeting_data["id"],
                "password": meeting_data["password"],
                "host_email": meeting_data["host_email"],
            },
            "zoom_gateway_response": meeting_data,
        }

    except Exception as e:
        return {"status": "error", "message": str(e)}
|
|
|
|
|
|
def list_zoom_meetings(next_page_token=None):
    """
    List the meetings of the "me" user using the Zoom API.

    Args:
        next_page_token (str, optional): The token for paginated results.
            Sent as a query parameter only when truthy. Defaults to None.

    Returns:
        dict: On HTTP 200, ``{"status": "success", "message": ...,
        "meetings": [...], "next_page_token": ...}``. Otherwise
        ``{"status": "error", "message": ...}``, with "details" when Zoom
        returned an error body. Exceptions raised inside the function are
        reported as an error dict rather than propagated.
    """
    try:
        access_token = get_access_token()
        user_id = "me"

        params = {}
        if next_page_token:
            params["next_page_token"] = next_page_token

        response = requests.get(
            f"https://api.zoom.us/v2/users/{user_id}/meetings",
            headers={
                "Authorization": f"Bearer {access_token}",
                "Content-Type": "application/json",
            },
            params=params,
            # Without a timeout a stalled connection blocks the caller.
            timeout=30,
        )

        if response.status_code != 200:
            return {
                "status": "error",
                "message": "Failed to retrieve meetings.",
                "details": response.json(),
            }

        meetings_data = response.json()
        return {
            "status": "success",
            "message": "Meetings retrieved successfully.",
            "meetings": meetings_data.get("meetings", []),
            "next_page_token": meetings_data.get("next_page_token"),
        }

    except Exception as e:
        return {"status": "error", "message": str(e)}
|
|
|
|
|
|
def get_zoom_meeting_details(meeting_id):
    """
    Retrieve details of a specific meeting using the Zoom API.

    Args:
        meeting_id (str): The ID of the meeting to retrieve.

    Returns:
        dict: On HTTP 200, ``{"status": "success", "message": ...,
        "meeting_details": <payload>}`` where the known datetime fields
        present in the payload are re-serialized as ISO format strings
        (a trailing "Z" becomes "+00:00"). A field that cannot be parsed
        keeps its original value. Otherwise ``{"status": "error",
        "message": ...}``, with "details" when Zoom returned an error
        body. Exceptions raised inside the function are reported as an
        error dict rather than propagated.
    """
    datetime_fields = (
        "start_time",
        "created_at",
        "updated_at",
        "password_changed_at",
        "host_join_before_start_time",
        "audio_recording_start",
        "recording_files_end",
    )

    try:
        access_token = get_access_token()

        response = requests.get(
            f"https://api.zoom.us/v2/meetings/{meeting_id}",
            headers={
                "Authorization": f"Bearer {access_token}",
                "Content-Type": "application/json",
            },
            # Without a timeout a stalled connection blocks the caller.
            timeout=30,
        )

        if response.status_code != 200:
            return {
                "status": "error",
                "message": "Failed to retrieve meeting details.",
                "details": response.json(),
            }

        meeting_data = response.json()
        for field_name in datetime_fields:
            if field_name not in meeting_data or meeting_data[field_name] is None:
                continue
            try:
                # Round-trip through datetime for a consistent string
                # format; fromisoformat is given "+00:00" instead of "Z".
                parsed = datetime.fromisoformat(
                    meeting_data[field_name].replace("Z", "+00:00")
                )
            except (AttributeError, ValueError, TypeError) as e:
                # AttributeError covers non-string values: one odd field
                # should not fail the whole lookup. The original value is
                # kept.
                logger.warning(
                    f"Could not parse or re-serialize datetime field '{field_name}' "
                    f"for meeting {meeting_id}: {e}. Original value: '{meeting_data[field_name]}'"
                )
                continue
            meeting_data[field_name] = parsed.isoformat()

        return {
            "status": "success",
            "message": "Meeting details retrieved successfully.",
            "meeting_details": meeting_data,
        }

    except Exception as e:
        return {"status": "error", "message": str(e)}
|
|
|
|
|
|
def update_zoom_meeting(meeting_id, updated_data):
    """
    Update a Zoom meeting using the Zoom API.

    Args:
        meeting_id (str): The ID of the meeting to update.
        updated_data (dict): The fields to update (e.g., topic,
            start_time, duration); sent as the JSON body of a PATCH.

    Returns:
        dict: ``{"status": "success", "message": ...}`` on HTTP 204.
        Otherwise ``{"status": "error", "message": ..., "details": ...}``
        where "details" is the decoded error body, or its raw text when it
        is not JSON. Exceptions raised inside the function are reported as
        ``{"status": "error", "message": str(e)}``.
    """
    try:
        access_token = get_access_token()
        response = requests.patch(
            f"https://api.zoom.us/v2/meetings/{meeting_id}/",
            headers={
                "Authorization": f"Bearer {access_token}",
                "Content-Type": "application/json",
            },
            json=updated_data,
            # Without a timeout a stalled connection blocks the caller.
            timeout=30,
        )

        if response.status_code == 204:
            return {"status": "success", "message": "Meeting updated successfully."}

        # The error body is not guaranteed to be JSON.
        try:
            details = response.json()
        except ValueError:
            details = response.text
        logger.warning(
            "Zoom meeting %s update failed with status %s: %s",
            meeting_id,
            response.status_code,
            details,
        )
        return {
            "status": "error",
            "message": "Failed to update meeting.",
            "details": details,
        }

    except Exception as e:
        return {"status": "error", "message": str(e)}
|
|
|
|
|
|
def delete_zoom_meeting(meeting_id):
    """
    Delete a Zoom meeting using the Zoom API.

    Args:
        meeting_id (str): The ID of the meeting to delete.

    Returns:
        dict: ``{"status": "success", "message": ...}`` on HTTP 204.
        Otherwise ``{"status": "error", "message": ...}``, with "details"
        when Zoom returned an error body. Exceptions raised inside the
        function are reported as an error dict rather than propagated.
    """
    try:
        access_token = get_access_token()
        response = requests.delete(
            f"https://api.zoom.us/v2/meetings/{meeting_id}",
            headers={"Authorization": f"Bearer {access_token}"},
            # Without a timeout a stalled connection blocks the caller.
            timeout=30,
        )

        if response.status_code == 204:
            return {"status": "success", "message": "Meeting deleted successfully."}

        return {
            "status": "error",
            "message": "Failed to delete meeting.",
            "details": response.json(),
        }

    except Exception as e:
        return {"status": "error", "message": str(e)}
|
|
|
|
|
|
def schedule_interviews(schedule):
    """
    Schedule an interview for every candidate attached to ``schedule``.

    Candidates are paired, in order, with the available time slots. For
    each pair a Zoom meeting is requested, a ScheduledInterview row is
    created and an invitation email is sent.

    Returns:
        int: The number of interviews scheduled (0 when the schedule has
        no candidates)

    Raises:
        ValueError: If there are fewer available slots than candidates
    """
    pending = list(schedule.candidates.all())
    if not pending:
        return 0

    open_slots = get_available_time_slots(schedule)
    if len(open_slots) < len(pending):
        raise ValueError(
            f"Not enough available slots. Required: {len(pending)}, Available: {len(open_slots)}"
        )

    booked = 0
    # The check above guarantees a slot for every candidate, so zip()
    # pairs each candidate with the slot at the same position.
    for candidate, slot in zip(pending, open_slots):
        starts_at = datetime.combine(slot["date"], slot["time"])

        # NOTE(review): this call only succeeds if create_zoom_meeting
        # accepts a ``timezone`` keyword - confirm against its signature.
        meeting = create_zoom_meeting(
            topic=f"Interview for {schedule.job.title} - {candidate.name}",
            start_time=starts_at,
            duration=schedule.interview_duration,
            timezone=timezone.get_current_timezone_name(),
        )

        # NOTE(review): create_zoom_meeting returns a result dict; confirm
        # that is what the ``zoom_meeting`` field expects.
        interview = ScheduledInterview.objects.create(
            candidate=candidate,
            job=schedule.job,
            zoom_meeting=meeting,
            schedule=schedule,
            interview_date=slot["date"],
            interview_time=slot["time"],
        )

        # Set on the in-memory object only; nothing here saves it.
        candidate.interview_date = starts_at

        send_interview_email(interview)
        booked += 1

    return booked
|
|
|
|
|
|
def send_interview_email(scheduled_interview):
    """
    Email an interview invitation to the interview's candidate through
    the unified email service.

    The Zoom join URL and meeting id are included when the interview has
    a ``zoom_meeting``; otherwise both are sent as empty strings.

    Returns:
        bool: The ``success`` flag of the send result, or False when any
        exception is raised (the exception is logged, not propagated)
    """
    try:
        from .services.email_service import UnifiedEmailService
        from .dto.email_dto import EmailConfig, EmailTemplate, EmailPriority

        service = UnifiedEmailService()
        recipient = scheduled_interview.candidate
        job = scheduled_interview.job

        zoom_meeting = scheduled_interview.zoom_meeting
        if zoom_meeting:
            join_url = zoom_meeting.join_url
            meeting_id = zoom_meeting.meeting_id
        else:
            join_url = ""
            meeting_id = ""

        meeting_info = {
            "topic": f"Interview for {job.title}",
            "date_time": scheduled_interview.interview_date,
            "duration": "60 minutes",
            "join_url": join_url,
            "meeting_id": meeting_id,
        }
        context = service.template_manager.build_interview_context(
            recipient, job, meeting_info
        )

        invitation = EmailTemplate.INTERVIEW_INVITATION_ALT
        subject = service.template_manager.get_subject_line(invitation, context)
        config = EmailConfig(
            to_email=recipient.email,
            subject=subject,
            template_name=invitation.value,
            context=context,
            priority=EmailPriority.HIGH,
        )

        result = service.send_email(config)
        if not result.success:
            logger.error(f"Failed to send interview invitation: {result.message}")
        else:
            logger.info(
                f"Interview invitation sent to {scheduled_interview.candidate.email}"
            )
        return result.success

    except Exception as e:
        logger.error(f"Error in send_interview_email: {str(e)}", exc_info=True)
        return False
|
|
|
|
|
|
def get_available_time_slots(schedule):
    """
    Generate the available interview slots for a schedule.

    For every date from ``schedule.start_date`` to ``schedule.end_date``
    (inclusive) whose weekday number (Monday is 0) is listed in
    ``schedule.working_days``, slots start at ``schedule.start_time`` and
    repeat every ``interview_duration + buffer_time`` minutes for as long
    as a whole slot ends by ``schedule.end_time`` on the same day. A slot
    that overlaps an entry of ``schedule.breaks`` is left out.

    Args:
        schedule: Object providing start_date, end_date, working_days,
            start_time, end_time, interview_duration, buffer_time and,
            optionally, ``breaks``: a list of dicts whose "start_time" and
            "end_time" are "%H:%M:%S" strings

    Returns:
        list: Dicts with 'date' and 'time' keys, in chronological order

    Raises:
        ValueError: If interview_duration + buffer_time is not positive,
            because slot generation could then never advance
    """
    slot_duration = timedelta(
        minutes=schedule.interview_duration + schedule.buffer_time
    )
    if slot_duration <= timedelta(0):
        raise ValueError(
            "interview_duration + buffer_time must be greater than zero"
        )

    working_days = {int(day) for day in schedule.working_days}

    # Parse the breaks once instead of once per candidate slot.
    break_windows = []
    for break_data in getattr(schedule, "breaks", None) or []:
        try:
            break_windows.append(
                (
                    datetime.strptime(break_data["start_time"], "%H:%M:%S").time(),
                    datetime.strptime(break_data["end_time"], "%H:%M:%S").time(),
                )
            )
        except (ValueError, KeyError) as exc:
            # Malformed breaks are still ignored, but no longer silently.
            logger.warning("Ignoring malformed break %r: %s", break_data, exc)

    slots = []
    one_day = timedelta(days=1)
    current_date = schedule.start_date

    while current_date <= schedule.end_date:
        if current_date.weekday() in working_days:
            # Compare full datetimes, not bare times: a slot that runs
            # past midnight must count as "after end_time" instead of
            # wrapping around to a small time of day.
            day_end = datetime.combine(current_date, schedule.end_time)
            windows = [
                (
                    datetime.combine(current_date, break_start),
                    datetime.combine(current_date, break_end),
                )
                for break_start, break_end in break_windows
            ]

            slot_start = datetime.combine(current_date, schedule.start_time)
            while slot_start + slot_duration <= day_end:
                slot_end = slot_start + slot_duration
                in_break = any(
                    slot_start < break_end and slot_end > break_start
                    for break_start, break_end in windows
                )
                if not in_break:
                    slots.append({"date": current_date, "time": slot_start.time()})
                slot_start = slot_end

        current_date += one_day

    return slots
|
|
|
|
|
|
def json_to_markdown_table(data_list):
    """
    Render a list of dicts as a markdown table.

    The columns are the keys of the first dict, in their order. Every
    dict contributes one row; a key missing from a row gives an empty
    cell. Keys and values are converted with ``str`` and any "|" in them
    is escaped so it cannot be read as a column separator.

    Args:
        data_list (list): Dicts to render

    Returns:
        str: Header row, separator row and one line per dict, each ending
        in a newline; ``""`` when ``data_list`` is empty
    """
    if not data_list:
        return ""

    def _cell(value):
        return str(value).replace("|", "\\|")

    columns = list(data_list[0].keys())
    lines = [
        "| " + " | ".join(_cell(column) for column in columns) + " |",
        "| " + " | ".join(["---"] * len(columns)) + " |",
    ]
    lines.extend(
        "| " + " | ".join(_cell(row.get(column, "")) for column in columns) + " |"
        for row in data_list
    )
    return "\n".join(lines) + "\n"
|
|
|
|
|
|
def get_applications_from_request(request):
    """
    Yield one result per POST field, using each field *name* as an
    Application primary key.

    This is a generator. For every field in ``request.POST`` it yields
    the matching Application, or ``None`` (after logging the error) when
    the lookup raises - for example for fields whose name is not an
    Application id.
    """
    for field_name, _field_value in request.POST.items():
        try:
            yield models.Application.objects.get(pk=field_name)
        except Exception as lookup_error:
            logger.error(lookup_error)
            yield None
|
|
|
|
|
|
def update_meeting(instance, updated_data):
    """
    Push ``updated_data`` to Zoom for ``instance`` and, when possible,
    refresh the instance from Zoom's view of the meeting.

    After a successful update the meeting details are fetched again. If
    that fetch succeeds, topic, duration, details_url, password, status
    and zoom_gateway_response are copied onto ``instance`` and it is
    saved. If the fetch fails, the instance is left as it is and only a
    warning is logged; the update is still reported as successful.

    Returns:
        dict: ``{"status": "success" | "error", "message": ...}``
    """
    update_result = update_zoom_meeting(instance.meeting_id, updated_data)

    if update_result["status"] == "success":
        fetched = get_zoom_meeting_details(instance.meeting_id)
        fetch_status = fetched["status"]

        if fetch_status == "success":
            zoom_details = fetched["meeting_details"]

            # Keep the current value when Zoom omits a field; "status" is
            # the exception and is overwritten even when absent.
            instance.topic = zoom_details.get("topic", instance.topic)
            instance.duration = zoom_details.get("duration", instance.duration)
            instance.details_url = zoom_details.get("join_url", instance.details_url)
            instance.password = zoom_details.get("password", instance.password)
            instance.status = zoom_details.get("status")

            # Store the full response
            instance.zoom_gateway_response = fetched.get("meeting_details")
            instance.save()
            logger.info(f"Successfully updated Zoom meeting {instance.meeting_id}.")
            return {
                "status": "success",
                "message": "Zoom meeting updated successfully.",
            }

        if fetch_status == "error":
            logger.warning(
                f"Successfully updated Zoom meeting {instance.meeting_id}, but failed to fetch updated details. "
                f"Error: {fetched.get('message', 'Unknown error')}"
            )
            return {
                "status": "success",
                "message": "Zoom meeting updated successfully.",
            }

    logger.warning(
        f"Failed to update Zoom meeting {instance.meeting_id}. Error: {update_result.get('message', 'Unknown error')}"
    )
    return {
        "status": "error",
        "message": update_result.get("message", "Zoom meeting update failed."),
    }
|
|
|
|
|
|
def generate_random_password():
    """
    Return a random 12-character password of ASCII letters and digits.

    Characters are drawn with the ``secrets`` module because the
    ``random`` module is not intended for security-sensitive values such
    as passwords.

    Returns:
        str: The generated password
    """
    import secrets
    import string

    alphabet = string.ascii_letters + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(12))
|