# import os # import fitz # PyMuPDF # import spacy # import requests from recruitment import models from django.conf import settings from datetime import datetime, timedelta, time, date from django.utils import timezone from .models import ScheduledInterview from django.template.loader import render_to_string from django.core.mail import send_mail # nlp = spacy.load("en_core_web_sm") # def extract_text_from_pdf(pdf_path): # text = "" # with fitz.open(pdf_path) as doc: # for page in doc: # text += page.get_text() # return text # def extract_summary_from_pdf(pdf_path): # if not os.path.exists(pdf_path): # return {'error': 'File not found'} # text = extract_text_from_pdf(pdf_path) # doc = nlp(text) # summary = { # 'name': doc.ents[0].text if doc.ents else '', # 'skills': [chunk.text for chunk in doc.noun_chunks if len(chunk.text.split()) > 1], # 'summary': text[:500] # } # return summary import requests from PyPDF2 import PdfReader import os import json import logging logger = logging.getLogger(__name__) OPENROUTER_API_KEY ='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' OPENROUTER_MODEL = 'qwen/qwen-2.5-72b-instruct:free' if not OPENROUTER_API_KEY: logger.warning("OPENROUTER_API_KEY not set. Resume scoring will be skipped.") def extract_text_from_pdf(file_path): print("text extraction") text = "" try: with open(file_path, "rb") as f: reader = PdfReader(f) for page in reader.pages: text += (page.extract_text() or "") except Exception as e: logger.error(f"PDF extraction failed: {e}") raise return text.strip() def score_resume_with_openrouter(prompt): print("model call") response = requests.post( url="https://openrouter.ai/api/v1/chat/completions", headers={ "Authorization": f"Bearer {OPENROUTER_API_KEY}", "Content-Type": "application/json", }, data=json.dumps({ "model": OPENROUTER_MODEL, "messages": [{"role": "user", "content": prompt}], }, ) ) # print(response.status_code) # print(response.json()) res = {} print(response.status_code) if response.status_code == 200: res = response.json() content = res["choices"][0]['message']['content'] try: content = content.replace("```json","").replace("```","") res = json.loads(content) except Exception as e: print(e) # res = raw_output["choices"][0]["message"]["content"] else: print("error response") return res # print(f"rawraw_output) # print(response) # def match_resume_with_job_description(resume, job_description,prompt=""): # resume_doc = nlp(resume) # job_doc = nlp(job_description) # similarity = resume_doc.similarity(job_doc) # return similarity def dashboard_callback(request, context): total_jobs = models.Job.objects.count() total_candidates = models.Candidate.objects.count() jobs = models.Job.objects.all() job_titles = [job.title for job in jobs] job_app_counts = [job.candidates.count() for job in jobs] context.update({ "total_jobs": total_jobs, "total_candidates": total_candidates, "job_titles": job_titles, "job_app_counts": job_app_counts, }) return context def get_access_token(): """Obtain an access token using server-to-server OAuth.""" client_id = settings.ZOOM_CLIENT_ID client_secret = settings.ZOOM_CLIENT_SECRET auth_url = "https://zoom.us/oauth/token" headers = { "Content-Type": "application/x-www-form-urlencoded", } data = { "grant_type": "account_credentials", "account_id": settings.ZOOM_ACCOUNT_ID, } auth = (client_id, client_secret) response = requests.post(auth_url, headers=headers, data=data, auth=auth) if response.status_code == 200: return response.json().get("access_token") else: raise Exception(f"Failed to obtain access token: {response.json()}") def create_zoom_meeting(topic, start_time, duration): """ Create a Zoom meeting using the Zoom API. Args: topic (str): The topic of the meeting. start_time (str): The start time of the meeting in ISO 8601 format (e.g., "2023-10-01T10:00:00Z"). duration (int): The duration of the meeting in minutes. Returns: dict: A dictionary containing the meeting details if successful, or an error message if failed. """ try: access_token = get_access_token() meeting_details = { "topic": topic, "type": 2, "start_time": start_time.isoformat() + "Z", "duration": duration, "timezone": "UTC", "settings": { "host_video": True, "participant_video": True, "join_before_host": True, "mute_upon_entry": False, "approval_type": 2, "audio": "both", "auto_recording": "none" } } # Make API request to Zoom to create the meeting headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json" } response = requests.post( "https://api.zoom.us/v2/users/me/meetings", headers=headers, json=meeting_details ) # Check response status if response.status_code == 201: meeting_data = response.json() return { "status": "success", "message": "Meeting created successfully.", "meeting_details": { "join_url": meeting_data['join_url'], "meeting_id": meeting_data['id'], "password": meeting_data['password'], "host_email": meeting_data['host_email'] }, "zoom_gateway_response": meeting_data } else: return { "status": "error", "message": "Failed to create meeting.", "details": response.json() } except Exception as e: return { "status": "error", "message": str(e) } def list_zoom_meetings(next_page_token=None): """ List all meetings for a user using the Zoom API. Args: next_page_token (str, optional): The token for paginated results. Defaults to None. Returns: dict: A dictionary containing the list of meetings or an error message. """ try: access_token = get_access_token() user_id = 'me' params = {} if next_page_token: params['next_page_token'] = next_page_token headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json" } response = requests.get( f"https://api.zoom.us/v2/users/{user_id}/meetings", headers=headers, params=params ) if response.status_code == 200: meetings_data = response.json() return { "status": "success", "message": "Meetings retrieved successfully.", "meetings": meetings_data.get("meetings", []), "next_page_token": meetings_data.get("next_page_token") } else: return { "status": "error", "message": "Failed to retrieve meetings.", "details": response.json() } except Exception as e: return { "status": "error", "message": str(e) } def get_zoom_meeting_details(meeting_id): """ Retrieve details of a specific meeting using the Zoom API. Args: meeting_id (str): The ID of the meeting to retrieve. Returns: dict: A dictionary containing the meeting details or an error message. Date/datetime fields in 'meeting_details' will be ISO format strings. """ try: access_token = get_access_token() headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json" } response = requests.get( f"https://api.zoom.us/v2/meetings/{meeting_id}", headers=headers ) if response.status_code == 200: meeting_data = response.json() datetime_fields = [ 'start_time', 'created_at', 'updated_at', 'password_changed_at', 'host_join_before_start_time', 'audio_recording_start', 'recording_files_end' # Add any other known datetime fields ] for field_name in datetime_fields: if field_name in meeting_data and meeting_data[field_name] is not None: try: # Convert ISO 8601 string to datetime object, then back to ISO string # This ensures consistent string format, handling 'Z' for UTC dt_obj = datetime.fromisoformat(meeting_data[field_name].replace('Z', '+00:00')) meeting_data[field_name] = dt_obj.isoformat() except (ValueError, TypeError) as e: logger.warning( f"Could not parse or re-serialize datetime field '{field_name}' " f"for meeting {meeting_id}: {e}. Original value: '{meeting_data[field_name]}'" ) # Keep original string if re-serialization fails, or set to None # meeting_data[field_name] = None return { "status": "success", "message": "Meeting details retrieved successfully.", "meeting_details": meeting_data } else: return { "status": "error", "message": "Failed to retrieve meeting details.", "details": response.json() } except Exception as e: return { "status": "error", "message": str(e) } def update_zoom_meeting(meeting_id, updated_data): """ Update a Zoom meeting using the Zoom API. Args: meeting_id (str): The ID of the meeting to update. updated_data (dict): A dictionary containing the fields to update (e.g., topic, start_time, duration). Returns: dict: A dictionary containing the updated meeting details or an error message. """ try: access_token = get_access_token() headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json" } response = requests.patch( f"https://api.zoom.us/v2/meetings/{meeting_id}/", headers=headers, json=updated_data ) print(response.status_code) if response.status_code == 204: return { "status": "success", "message": "Meeting updated successfully." } else: print(response.json()) return { "status": "error", "message": "Failed to update meeting.", } except Exception as e: return { "status": "error", "message": str(e) } def delete_zoom_meeting(meeting_id): """ Delete a Zoom meeting using the Zoom API. Args: meeting_id (str): The ID of the meeting to delete. Returns: dict: A dictionary indicating success or failure. """ try: access_token = get_access_token() headers = { "Authorization": f"Bearer {access_token}" } response = requests.delete( f"https://api.zoom.us/v2/meetings/{meeting_id}", headers=headers ) if response.status_code == 204: return { "status": "success", "message": "Meeting deleted successfully." } else: return { "status": "error", "message": "Failed to delete meeting.", "details": response.json() } except Exception as e: return { "status": "error", "message": str(e) } def schedule_interviews(schedule): """ Schedule interviews for all candidates in the schedule based on the criteria. Returns the number of interviews successfully scheduled. """ candidates = list(schedule.candidates.all()) if not candidates: return 0 # Calculate available time slots available_slots = get_available_time_slots(schedule) if len(available_slots) < len(candidates): raise ValueError(f"Not enough available slots. Required: {len(candidates)}, Available: {len(available_slots)}") # Schedule interviews scheduled_count = 0 for i, candidate in enumerate(candidates): slot = available_slots[i] interview_datetime = datetime.combine(slot['date'], slot['time']) # Create Zoom meeting meeting_topic = f"Interview for {schedule.job.title} - {candidate.name}" meeting = create_zoom_meeting( topic=meeting_topic, start_time=interview_datetime, duration=schedule.interview_duration, timezone=timezone.get_current_timezone_name() ) # Create scheduled interview record scheduled_interview = ScheduledInterview.objects.create( candidate=candidate, job=schedule.job, zoom_meeting=meeting, schedule=schedule, interview_date=slot['date'], interview_time=slot['time'] ) candidate.interview_date=interview_datetime # Send email to candidate send_interview_email(scheduled_interview) scheduled_count += 1 return scheduled_count def send_interview_email(scheduled_interview): """ Send an interview invitation email to the candidate. """ subject = f"Interview Invitation for {scheduled_interview.job.title}" context = { 'candidate_name': scheduled_interview.candidate.name, 'job_title': scheduled_interview.job.title, 'company_name': scheduled_interview.job.company.name, 'interview_date': scheduled_interview.interview_date, 'interview_time': scheduled_interview.interview_time, 'join_url': scheduled_interview.zoom_meeting.join_url, 'meeting_id': scheduled_interview.zoom_meeting.meeting_id, } # Render email templates text_message = render_to_string('interviews/email/interview_invitation.txt', context) html_message = render_to_string('interviews/email/interview_invitation.html', context) # Send email send_mail( subject=subject, message=text_message, from_email=settings.DEFAULT_FROM_EMAIL, recipient_list=[scheduled_interview.candidate.email], html_message=html_message, fail_silently=False, ) def get_available_time_slots(schedule): """ Generate a list of available time slots based on the schedule criteria. Returns a list of dictionaries with 'date' and 'time' keys. """ slots = [] current_date = schedule.start_date end_date = schedule.end_date # Convert working days to a set for quick lookup working_days_set = set(int(day) for day in schedule.working_days) # Parse times start_time = schedule.start_time end_time = schedule.end_time # Calculate slot duration (interview duration + buffer time) slot_duration = timedelta(minutes=schedule.interview_duration + schedule.buffer_time) # Get breaks from the schedule breaks = schedule.breaks if hasattr(schedule, 'breaks') and schedule.breaks else [] while current_date <= end_date: # Check if current day is a working day weekday = current_date.weekday() # Monday is 0, Sunday is 6 if weekday in working_days_set: # Generate slots for this day current_time = start_time while True: # Calculate the end time of this slot slot_end_time = (datetime.combine(current_date, current_time) + slot_duration).time() # Check if the slot fits within the working hours if slot_end_time > end_time: break # Check if slot conflicts with any break time conflict_with_break = False for break_data in breaks: # Parse break times try: break_start = datetime.strptime(break_data['start_time'], '%H:%M:%S').time() break_end = datetime.strptime(break_data['end_time'], '%H:%M:%S').time() # Check if the slot overlaps with this break time if not (current_time >= break_end or slot_end_time <= break_start): conflict_with_break = True break except (ValueError, KeyError) as e: continue if not conflict_with_break: # Add this slot to available slots slots.append({ 'date': current_date, 'time': current_time }) # Move to next slot current_datetime = datetime.combine(current_date, current_time) + slot_duration current_time = current_datetime.time() # Move to next day current_date += timedelta(days=1) return slots def json_to_markdown_table(data_list): if not data_list: return "" headers = data_list[0].keys() markdown = "| " + " | ".join(headers) + " |\n" markdown += "| " + " | ".join(["---"] * len(headers)) + " |\n" for row in data_list: values = [str(row.get(header, "")) for header in headers] markdown += "| " + " | ".join(values) + " |\n" return markdown def get_applications_from_request(request): for c in request.POST.items(): try: yield models.Application.objects.get(pk=c[0]) except Exception as e: logger.error(e) yield None def update_meeting(instance, updated_data): result = update_zoom_meeting(instance.meeting_id, updated_data) if result["status"] == "success": # Fetch the latest details from Zoom after successful update details_result = get_zoom_meeting_details(instance.meeting_id) if details_result["status"] == "success": zoom_details = details_result["meeting_details"] # Update instance with fetched details instance.topic = zoom_details.get("topic", instance.topic) instance.duration = zoom_details.get("duration", instance.duration) # instance.details_url = zoom_details.get("join_url", instance.details_url) instance.password = zoom_details.get("password", instance.password) # Corrected status assignment: instance.status, not instance.password instance.status = zoom_details.get("status") instance.zoom_gateway_response = details_result.get("meeting_details") # Store full response instance.save() logger.info(f"Successfully updated Zoom meeting {instance.meeting_id}.") return {"status": "success", "message": "Zoom meeting updated successfully."} elif details_result["status"] == "error": # If fetching details fails, save with form data and log a warning logger.warning( f"Successfully updated Zoom meeting {instance.meeting_id}, but failed to fetch updated details. " f"Error: {details_result.get('message', 'Unknown error')}" ) return {"status": "success", "message": "Zoom meeting updated successfully."} logger.warning(f"Failed to update Zoom meeting {instance.meeting_id}. Error: {result.get('message', 'Unknown error')}") return {"status": "error", "message": result.get("message", "Zoom meeting update failed.")}