1761 lines
66 KiB
Python
1761 lines
66 KiB
Python
import json
|
||
import requests
|
||
from rich import print
|
||
|
||
from django.template.loader import render_to_string
|
||
from django.views.decorators.csrf import csrf_exempt
|
||
from django.views.decorators.http import require_http_methods
|
||
from django.http import JsonResponse
|
||
from datetime import datetime,time,timedelta
|
||
from django.views import View
|
||
from django.db.models import Q
|
||
from django.urls import reverse
|
||
from django.conf import settings
|
||
from django.utils import timezone
|
||
from .forms import (
|
||
CandidateExamDateForm,
|
||
ZoomMeetingForm,
|
||
JobPostingForm,
|
||
FormTemplateForm,
|
||
InterviewScheduleForm,JobPostingStatusForm,
|
||
BreakTimeFormSet,
|
||
JobPostingImageForm
|
||
)
|
||
from rest_framework import viewsets
|
||
from django.contrib import messages
|
||
from django.core.paginator import Paginator
|
||
from .linkedin_service import LinkedInService
|
||
from .serializers import JobPostingSerializer, CandidateSerializer
|
||
from django.shortcuts import get_object_or_404, render, redirect
|
||
from django.views.generic import CreateView, UpdateView, DetailView, ListView
|
||
from .utils import (
|
||
create_zoom_meeting,
|
||
delete_zoom_meeting,
|
||
update_zoom_meeting,
|
||
get_zoom_meeting_details,
|
||
schedule_interviews,
|
||
get_available_time_slots,
|
||
)
|
||
from django.views.decorators.csrf import ensure_csrf_cookie
|
||
from django.views.decorators.http import require_POST
|
||
from .models import (
|
||
FormTemplate,
|
||
FormStage,
|
||
FormField,
|
||
FieldResponse,
|
||
FormSubmission,
|
||
InterviewSchedule,
|
||
BreakTime,
|
||
ZoomMeeting,
|
||
Candidate,
|
||
JobPosting,
|
||
ScheduledInterview
|
||
)
|
||
import logging
|
||
from datastar_py.django import (
|
||
DatastarResponse,
|
||
ServerSentEventGenerator as SSE,
|
||
read_signals,
|
||
)
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class JobPostingViewSet(viewsets.ModelViewSet):
    """REST endpoint set for ``JobPosting`` records.

    All behaviour comes from DRF's ``ModelViewSet``; this class only wires
    in the serializer and the queryset it operates on.
    """

    serializer_class = JobPostingSerializer
    queryset = JobPosting.objects.all()
|
||
|
||
|
||
class CandidateViewSet(viewsets.ModelViewSet):
    """REST endpoint set for ``Candidate`` records.

    All behaviour comes from DRF's ``ModelViewSet``; this class only wires
    in the serializer and the queryset it operates on.
    """

    serializer_class = CandidateSerializer
    queryset = Candidate.objects.all()
|
||
|
||
|
||
class ZoomMeetingCreateView(CreateView):
    """Create a meeting through the Zoom gateway and store it locally.

    Django validates the form first; ``form_valid`` then calls
    ``create_zoom_meeting`` and only saves the local ``ZoomMeeting`` row
    when the gateway reports success.
    """

    model = ZoomMeeting
    template_name = "meetings/create_meeting.html"
    form_class = ZoomMeetingForm
    success_url = "/"

    def form_valid(self, form):
        """Create the meeting on Zoom, then persist the returned details.

        Args:
            form: The already-validated ``ZoomMeetingForm``.

        Returns:
            A redirect response: back to ``/create-meeting/`` when the start
            time is in the past, otherwise to ``/``. The outcome is reported
            to the user through the messages framework.
        """
        instance = form.save(commit=False)
        try:
            if instance.start_time < timezone.now():
                messages.error(self.request, "Start time must be in the future.")
                # redirect() has no HTTP-status parameter; an extra ``status``
                # keyword is silently ignored for plain URL targets, so it is
                # not passed here.
                return redirect("/create-meeting/")

            result = create_zoom_meeting(
                instance.topic, instance.start_time, instance.duration
            )
            # Log only the status: the full payload carries the meeting password.
            logger.debug("create_zoom_meeting returned status %r", result["status"])

            if result["status"] != "success":
                messages.error(self.request, result["message"])
                return redirect("/")

            details = result["meeting_details"]
            gateway_response = result["zoom_gateway_response"]
            instance.meeting_id = details["meeting_id"]
            instance.join_url = details["join_url"]
            instance.host_email = details["host_email"]
            instance.password = details["password"]
            instance.status = gateway_response["status"]
            instance.zoom_gateway_response = gateway_response
            instance.save()
            messages.success(self.request, result["message"])
            return redirect("/")
        except Exception:
            # Previously swallowed silently: keep the redirect, but leave a
            # traceback in the log and tell the user that nothing was saved.
            logger.exception("Failed to create Zoom meeting %r", instance.topic)
            messages.error(
                self.request,
                "An unexpected error occurred while creating the meeting.",
            )
            return redirect("/")
|
||
|
||
|
||
class ZoomMeetingListView(ListView):
    """Paginated list of Zoom meetings, newest first, with optional search."""

    model = ZoomMeeting
    context_object_name = "meetings"
    template_name = "meetings/list_meetings.html"
    paginate_by = 10

    def get_queryset(self):
        """Return meetings ordered by descending start time.

        When the ``search`` GET parameter is non-empty, only meetings whose
        topic or meeting id contains it (case-insensitively) are kept.
        """
        meetings = super().get_queryset().order_by("-start_time")
        term = self.request.GET.get("search", "")
        if not term:
            return meetings

        topic_match = Q(topic__icontains=term)
        meeting_id_match = Q(meeting_id__icontains=term)
        return meetings.filter(topic_match | meeting_id_match)

    def get_context_data(self, **kwargs):
        """Add the current search term to the context as ``search_query``."""
        context = super().get_context_data(**kwargs)
        context.update(search_query=self.request.GET.get("search", ""))
        return context
|
||
|
||
|
||
class ZoomMeetingDetailsView(DetailView):
    """Detail page for one ``ZoomMeeting``, exposed to the template as ``meeting``."""

    context_object_name = "meeting"
    template_name = "meetings/meeting_details.html"
    model = ZoomMeeting
|
||
|
||
|
||
class ZoomMeetingUpdateView(UpdateView):
    """Edit a Zoom meeting and keep the local record in sync with Zoom."""

    model = ZoomMeeting
    form_class = ZoomMeetingForm
    context_object_name = "meeting"
    template_name = "meetings/update_meeting.html"
    success_url = "/"

    def form_valid(self, form):
        """Push the edited meeting to Zoom, then refresh the local copy.

        Args:
            form: The already-validated ``ZoomMeetingForm``.

        Returns:
            A redirect response: back to the update page when the start time
            is in the past, otherwise to the meeting's detail page. The
            outcome is reported through the messages framework.
        """
        from datetime import timezone as dt_timezone

        instance = form.save(commit=False)

        # Validate before building the payload or contacting Zoom.
        if instance.start_time < timezone.now():
            messages.error(self.request, "Start time must be in the future.")
            # redirect() has no HTTP-status parameter; a ``status`` keyword is
            # silently ignored for plain URL targets, so it is not passed.
            return redirect(f"/update-meeting/{instance.pk}/")

        # An aware datetime's isoformat() already ends in an offset
        # ("+00:00"), so appending "Z" produced a malformed timestamp.
        # Normalise to UTC and format explicitly instead.
        start_time = instance.start_time
        if timezone.is_aware(start_time):
            start_time = start_time.astimezone(dt_timezone.utc)
        updated_data = {
            "topic": instance.topic,
            "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
            "duration": instance.duration,
        }

        result = update_zoom_meeting(instance.meeting_id, updated_data)
        details_url = reverse("meeting_details", kwargs={"slug": instance.slug})

        if result["status"] != "success":
            messages.error(self.request, result["message"])
            return redirect(details_url)

        # Fetch the latest details from Zoom after the successful update.
        details_result = get_zoom_meeting_details(instance.meeting_id)
        if details_result["status"] == "success":
            zoom_details = details_result["meeting_details"]
            # Prefer Zoom's values; keep the form's value when a key is absent.
            instance.topic = zoom_details.get("topic", instance.topic)
            instance.duration = zoom_details.get("duration", instance.duration)
            instance.join_url = zoom_details.get("join_url", instance.join_url)
            instance.password = zoom_details.get("password", instance.password)
            instance.status = zoom_details.get("status", instance.status)
            instance.zoom_gateway_response = zoom_details  # store full response
            instance.save()
            messages.success(
                self.request, result["message"] + " Local data updated from Zoom."
            )
        else:
            # The remote update succeeded, so still save the form data locally.
            logger.warning(
                "Successfully updated Zoom meeting %s, but failed to fetch "
                "updated details. Error: %s",
                instance.meeting_id,
                details_result.get("message", "Unknown error"),
            )
            instance.save()
            messages.success(
                self.request,
                result["message"] + " (Note: Could not refresh local data from Zoom.)",
            )
        return redirect(details_url)
|
||
|
||
|
||
def ZoomMeetingDeleteView(request, pk):
    """Delete a meeting on Zoom and, if that succeeds, delete it locally.

    Args:
        request: The incoming HTTP request.
        pk: Primary key of the ``ZoomMeeting`` to delete.

    Returns:
        A redirect to ``/`` in every case; the outcome is reported through
        the messages framework. Responds with 404 when no meeting has ``pk``.
    """
    # NOTE(review): this runs for any HTTP method, including GET. Consider
    # restricting it to POST once callers are confirmed to submit a form.
    meeting = get_object_or_404(ZoomMeeting, pk=pk)
    try:
        result = delete_zoom_meeting(meeting.meeting_id)
        if result["status"] == "success":
            # Only drop the local row once Zoom confirms the remote deletion.
            meeting.delete()
            messages.success(request, result["message"])
        else:
            messages.error(request, result["message"])
    except Exception as e:
        # Keep the user-facing message, but also record the traceback so the
        # failure can be diagnosed (sibling views log their errors too).
        logger.exception("Failed to delete Zoom meeting %s", meeting.meeting_id)
        messages.error(request, str(e))
    return redirect("/")
|
||
|
||
|
||
# Job Posting
|
||
# def job_list(request):
|
||
# """Display the list of job postings order by creation date descending"""
|
||
# jobs=JobPosting.objects.all().order_by('-created_at')
|
||
|
||
# # Filter by status if provided
|
||
# print(f"the request is: {request} ")
|
||
# status=request.GET.get('status')
|
||
# print(f"DEBUG: Status filter received: {status}")
|
||
# if status:
|
||
# jobs=jobs.filter(status=status)
|
||
|
||
# #pagination
|
||
# paginator=Paginator(jobs,10) # Show 10 jobs per page
|
||
# page_number=request.GET.get('page')
|
||
# page_obj=paginator.get_page(page_number)
|
||
# return render(request, 'jobs/job_list.html', {
|
||
# 'page_obj': page_obj,
|
||
# 'status_filter': status
|
||
# })
|
||
|
||
|
||
def create_job(request):
    """Show the job creation form and create a ``JobPosting`` on POST.

    On a valid POST the job is saved, its ``application_url`` is set to the
    absolute URL of the candidate-facing detail page, and the user is sent
    to ``job_list``. In every other case the form page is rendered.
    """
    is_anonymous = not request.user.is_authenticated

    if request.method != "POST":
        form = JobPostingForm(is_anonymous_user=is_anonymous)
        return render(request, "jobs/create_job.html", {"form": form})

    form = JobPostingForm(request.POST, is_anonymous_user=is_anonymous)
    if not form.is_valid():
        messages.error(request, f"Please correct the errors below.{form.errors}")
        return render(request, "jobs/create_job.html", {"form": form})

    try:
        job = form.save(commit=False)
        if is_anonymous:
            # Anonymous users may name themselves; fall back to a generic label.
            submitted_name = request.POST.get("created_by", "").strip()
            job.created_by = submitted_name or "University Administrator"
        else:
            job.created_by = request.user.get_full_name() or request.user.username

        # First save so the slug used in the URL below is available.
        job.save()
        relative_url = reverse("job_detail_candidate", kwargs={"slug": job.slug})
        job.application_url = request.build_absolute_uri(relative_url)
        job.save()

        messages.success(request, f'Job "{job.title}" created successfully!')
        return redirect("job_list")
    except Exception as e:
        logger.error(f"Error creating job: {e}")
        messages.error(request, f"Error creating job: {e}")

    # Saving failed: show the form again with the submitted data.
    return render(request, "jobs/create_job.html", {"form": form})
|
||
|
||
|
||
def edit_job(request, slug):
    """Show the edit form for a job and apply the changes on POST.

    Responds with 404 when no job has ``slug``. On a valid POST the job is
    saved and the user is sent to ``job_list``; otherwise the edit page is
    rendered with the bound or initial form.
    """
    job = get_object_or_404(JobPosting, slug=slug)
    is_anonymous = not request.user.is_authenticated

    def edit_page(bound_form):
        # Single place that renders the page for every non-redirect outcome.
        return render(request, "jobs/edit_job.html", {"form": bound_form, "job": job})

    if request.method != "POST":
        return edit_page(JobPostingForm(instance=job, is_anonymous_user=is_anonymous))

    form = JobPostingForm(request.POST, instance=job, is_anonymous_user=is_anonymous)
    if not form.is_valid():
        messages.error(request, "Please correct the errors below.")
        return edit_page(form)

    try:
        job = form.save(commit=False)
        # NOTE(review): editing reassigns ``created_by`` to the editor;
        # confirm this is intended rather than keeping the original creator.
        if is_anonymous:
            submitted_name = request.POST.get("created_by", "").strip()
            job.created_by = submitted_name or "University Administrator"
        else:
            job.created_by = request.user.get_full_name() or request.user.username
        job.save()
        messages.success(request, f'Job "{job.title}" updated successfully!')
        return redirect("job_list")
    except Exception as e:
        logger.error(f"Error updating job: {e}")
        messages.error(request, f"Error updating job: {e}")

    return edit_page(form)
|
||
|
||
|
||
def job_detail(request, slug):
    """Staff-facing job page: applicants, stage counts and status update.

    GET renders the page. POST is treated as a status-form submission: on
    success the user is redirected back to this page, on validation failure
    the page is re-rendered with the bound form.
    """
    job = get_object_or_404(JobPosting, slug=slug)

    # All candidates for this job, most recent first.
    applicants = job.candidates.all().order_by("-created_at")

    # Summary statistics per stage.
    context = {
        "job": job,
        "applicants": applicants,
        "total_applicants": applicants.count(),
        "applied_count": applicants.filter(stage="Applied").count(),
        "interview_count": applicants.filter(stage="Interview").count(),
        "offer_count": applicants.filter(stage="Offer").count(),
    }

    if request.method == "POST":
        status_form = JobPostingStatusForm(request.POST, instance=job)
        if status_form.is_valid():
            status_form.save()
            messages.success(
                request,
                f"Status for '{job.title}' updated to '{job.get_status_display()}' successfully!",
            )
            return redirect("job_detail", slug=slug)
        messages.error(request, "Failed to update status due to validation errors.")
    else:
        status_form = JobPostingStatusForm(instance=job)

    context["status_form"] = status_form
    context["image_upload_form"] = JobPostingImageForm(instance=job)
    return render(request, "jobs/job_detail.html", context)
|
||
|
||
def job_image_upload(request, slug):
    """Attach an uploaded image to a job, then return to the job page.

    Only POST does any work; every request, whatever its method or outcome,
    ends with a redirect to ``job_detail`` for the job.
    """
    job = get_object_or_404(JobPosting, slug=slug)

    if request.method == "POST":
        upload_form = JobPostingImageForm(request.POST, request.FILES)
        if upload_form.is_valid():
            # NOTE(review): the form is built without ``instance`` and the
            # saved object gets ``.job`` assigned, which presumes the form's
            # model has a ``job`` relation; confirm against the form class.
            uploaded = upload_form.save(commit=False)
            uploaded.job = job
            uploaded.save()
            messages.success(request, f"Image uploaded successfully for {job.title}.")
        else:
            messages.error(
                request,
                "Image upload failed: Please ensure a valid image file was selected.",
            )

    return redirect("job_detail", slug=job.slug)
|
||
|
||
|
||
def kaauh_career(request):
    """Render the careers page listing jobs that can be applied to.

    A job is listed when its status is ``ACTIVE`` and its form template is
    active; the template is fetched in the same query via ``select_related``.
    """
    open_jobs = JobPosting.objects.select_related("form_template")
    open_jobs = open_jobs.filter(status="ACTIVE", form_template__is_active=True)
    return render(request, "jobs/career.html", {"active_jobs": open_jobs})
|
||
|
||
|
||
|
||
# job detail facing the candidate:
|
||
def job_detail_candidate(request, slug):
    """Render the candidate-facing detail page for the job with ``slug`` (404 if absent)."""
    context = {"job": get_object_or_404(JobPosting, slug=slug)}
    return render(request, "jobs/job_detail_candidate.html", context)
|
||
|
||
|
||
def post_to_linkedin(request, slug):
    """Publish an active job to LinkedIn using the session's access token.

    Flow:
      * a job that is not ``ACTIVE`` redirects to ``job_list``;
      * a non-POST request redirects straight to the job page;
      * a POST without a LinkedIn token in the session redirects to
        ``linkedin_login``;
      * otherwise the previous LinkedIn data on the job is cleared, the
        post is attempted, the result is recorded on the job, and the user
        returns to the job page.
    """
    job = get_object_or_404(JobPosting, slug=slug)

    if job.status != "ACTIVE":
        messages.info(request, "Only active jobs can be posted to LinkedIn.")
        return redirect("job_list")

    if request.method != "POST":
        return redirect("job_detail", slug=job.slug)

    try:
        # The user must have completed the LinkedIn OAuth flow first.
        if "linkedin_access_token" not in request.session:
            messages.error(request, "Please authenticate with LinkedIn first.")
            return redirect("linkedin_login")

        # Reset any earlier LinkedIn data so this counts as a fresh post.
        job.posted_to_linkedin = False
        job.linkedin_post_id = ""
        job.linkedin_post_url = ""
        job.linkedin_post_status = ""
        job.linkedin_posted_at = None
        job.save()

        linkedin = LinkedInService()
        linkedin.access_token = request.session["linkedin_access_token"]
        outcome = linkedin.create_job_post(job)

        if not outcome["success"]:
            error_msg = outcome.get("error", "Unknown error")
            job.linkedin_post_status = f"ERROR: {error_msg}"
            job.save()
            messages.error(request, f"Error posting to LinkedIn: {error_msg}")
        else:
            # Record where and when the post was published.
            job.posted_to_linkedin = True
            job.linkedin_post_id = outcome["post_id"]
            job.linkedin_post_url = outcome["post_url"]
            job.linkedin_post_status = "SUCCESS"
            job.linkedin_posted_at = timezone.now()
            job.save()
            messages.success(request, "Job posted to LinkedIn successfully!")
    except Exception as e:
        logger.error(f"Error in post_to_linkedin: {e}")
        job.linkedin_post_status = f"ERROR: {str(e)}"
        job.save()
        messages.error(request, f"Error posting to LinkedIn: {e}")

    return redirect("job_detail", slug=job.slug)
|
||
|
||
|
||
def linkedin_login(request):
    """Redirect the user to LinkedIn's OAuth authorization page.

    The URL comes from ``LinkedInService.get_auth_url()``. Per the original
    author's notes, that URL sends the user to LinkedIn to log in, asks them
    to let the app post on their behalf, and tells LinkedIn where to send
    them back afterwards (the redirect_uri), e.g.
    ``http://yoursite.com/linkedin/callback/?code=TEMPORARY_CODE_HERE``.
    """
    return redirect(LinkedInService().get_auth_url())
|
||
|
||
|
||
def linkedin_callback(request):
    """Handle the LinkedIn OAuth callback and store the access token.

    Reads ``code`` from the query string, exchanges it for an access token
    through ``LinkedInService.get_access_token`` and stores the token in the
    session. Always ends with a redirect to ``job_list``; success or failure
    is reported through the messages framework.
    """
    auth_code = request.GET.get("code")

    if auth_code:
        try:
            # Exchange the temporary code for an access token.
            token = LinkedInService().get_access_token(auth_code)
            request.session["linkedin_access_token"] = token
            request.session["linkedin_authenticated"] = True
            # NOTE(review): this mutates Django settings at runtime, which
            # affects the whole process rather than just this user's session.
            # Verify what reads this flag before relying on it.
            settings.LINKEDIN_IS_CONNECTED = True
            messages.success(request, "Successfully authenticated with LinkedIn!")
        except Exception as e:
            logger.error(f"LinkedIn authentication error: {e}")
            messages.error(request, f"LinkedIn authentication failed: {e}")
    else:
        messages.error(request, "No authorization code received from LinkedIn.")

    return redirect("job_list")
|
||
|
||
|
||
# applicant views
|
||
def applicant_job_detail(request, slug):
    """Render the applicant view of a job; 404 unless the job exists and is ``ACTIVE``."""
    active_job = get_object_or_404(JobPosting, slug=slug, status="ACTIVE")
    return render(request, "jobs/applicant_job_detail.html", {"job": active_job})
|
||
|
||
def application_success(request, slug):
    """Render the application confirmation page for the job with ``slug`` (404 if absent)."""
    context = {"job": get_object_or_404(JobPosting, slug=slug)}
    return render(request, "jobs/application_success.html", context)
|
||
|
||
|
||
|
||
|
||
|
||
# Form Preview Views
|
||
# from django.http import JsonResponse
|
||
# from django.views.decorators.csrf import csrf_exempt
|
||
# from django.core.paginator import Paginator
|
||
# from django.contrib.auth.decorators import login_required
|
||
# import json
|
||
|
||
# def form_list(request):
|
||
# """Display list of all available forms"""
|
||
# forms = Form.objects.filter(is_active=True).order_by('-created_at')
|
||
|
||
# # Pagination
|
||
# paginator = Paginator(forms, 12)
|
||
# page_number = request.GET.get('page')
|
||
# page_obj = paginator.get_page(page_number)
|
||
|
||
# return render(request, 'forms/form_list.html', {
|
||
# 'page_obj': page_obj
|
||
# })
|
||
|
||
# def form_preview(request, form_id):
|
||
# """Display form preview for end users"""
|
||
# form = get_object_or_404(Form, id=form_id, is_active=True)
|
||
|
||
# # Get submission count for analytics
|
||
# submission_count = form.submissions.count()
|
||
|
||
# return render(request, 'forms/form_preview.html', {
|
||
# 'form': form,
|
||
# 'submission_count': submission_count,
|
||
# 'is_embed': request.GET.get('embed', 'false') == 'true'
|
||
# })
|
||
|
||
# @csrf_exempt
|
||
# def form_submit(request, form_id):
|
||
# """Handle form submission via AJAX"""
|
||
# if request.method != 'POST':
|
||
# return JsonResponse({'success': False, 'error': 'Only POST method allowed'}, status=405)
|
||
|
||
# form = get_object_or_404(Form, id=form_id, is_active=True)
|
||
|
||
# try:
|
||
# # Parse form data
|
||
# submission_data = {}
|
||
# files = {}
|
||
|
||
# # Process regular form fields
|
||
# for key, value in request.POST.items():
|
||
# if key != 'csrfmiddlewaretoken':
|
||
# submission_data[key] = value
|
||
|
||
# # Process file uploads
|
||
# for key, file in request.FILES.items():
|
||
# if file:
|
||
# files[key] = file
|
||
|
||
# # Create form submission
|
||
# submission = FormSubmission.objects.create(
|
||
# form=form,
|
||
# submission_data=submission_data,
|
||
# ip_address=request.META.get('REMOTE_ADDR'),
|
||
# user_agent=request.META.get('HTTP_USER_AGENT', '')
|
||
# )
|
||
|
||
# # Handle file uploads
|
||
# for field_id, file in files.items():
|
||
# UploadedFile.objects.create(
|
||
# submission=submission,
|
||
# field_id=field_id,
|
||
# file=file,
|
||
# original_filename=file.name
|
||
# )
|
||
|
||
# # TODO: Send email notification if configured
|
||
|
||
# return JsonResponse({
|
||
# 'success': True,
|
||
# 'message': 'Form submitted successfully!',
|
||
# 'submission_id': submission.id
|
||
# })
|
||
|
||
# except Exception as e:
|
||
# logger.error(f"Error submitting form {form_id}: {e}")
|
||
# return JsonResponse({
|
||
# 'success': False,
|
||
# 'error': 'An error occurred while submitting the form. Please try again.'
|
||
# }, status=500)
|
||
|
||
# def form_embed(request, form_id):
|
||
# """Display embeddable version of form"""
|
||
# form = get_object_or_404(Form, id=form_id, is_active=True)
|
||
|
||
# return render(request, 'forms/form_embed.html', {
|
||
# 'form': form,
|
||
# 'is_embed': True
|
||
# })
|
||
|
||
# @login_required
|
||
# def save_form_builder(request):
|
||
# """Save form from builder to database"""
|
||
# if request.method != 'POST':
|
||
# return JsonResponse({'success': False, 'error': 'Only POST method allowed'}, status=405)
|
||
|
||
# try:
|
||
# data = json.loads(request.body)
|
||
# form_data = data.get('form', {})
|
||
|
||
# # Check if this is an update or create
|
||
# form_id = data.get('form_id')
|
||
|
||
# if form_id:
|
||
# # Update existing form
|
||
# form = Form.objects.get(id=form_id, created_by=request.user)
|
||
# form.title = form_data.get('title', 'Untitled Form')
|
||
# form.description = form_data.get('description', '')
|
||
# form.structure = form_data
|
||
# form.save()
|
||
# else:
|
||
# # Create new form
|
||
# form = Form.objects.create(
|
||
# title=form_data.get('title', 'Untitled Form'),
|
||
# description=form_data.get('description', ''),
|
||
# structure=form_data,
|
||
# created_by=request.user
|
||
# )
|
||
|
||
# return JsonResponse({
|
||
# 'success': True,
|
||
# 'form_id': form.id,
|
||
# 'message': 'Form saved successfully!'
|
||
# })
|
||
|
||
# except json.JSONDecodeError:
|
||
# return JsonResponse({
|
||
# 'success': False,
|
||
# 'error': 'Invalid JSON data'
|
||
# }, status=400)
|
||
# except Exception as e:
|
||
# logger.error(f"Error saving form: {e}")
|
||
# return JsonResponse({
|
||
# 'success': False,
|
||
# 'error': 'An error occurred while saving the form'
|
||
# }, status=500)
|
||
|
||
# @login_required
|
||
# def load_form(request, form_id):
|
||
# """Load form data for editing in builder"""
|
||
# try:
|
||
# form = get_object_or_404(Form, id=form_id, created_by=request.user)
|
||
|
||
# return JsonResponse({
|
||
# 'success': True,
|
||
# 'form': {
|
||
# 'id': form.id,
|
||
# 'title': form.title,
|
||
# 'description': form.description,
|
||
# 'structure': form.structure
|
||
# }
|
||
# })
|
||
|
||
# except Exception as e:
|
||
# logger.error(f"Error loading form {form_id}: {e}")
|
||
# return JsonResponse({
|
||
# 'success': False,
|
||
# 'error': 'An error occurred while loading the form'
|
||
# }, status=500)
|
||
|
||
# @csrf_exempt
|
||
# def update_form_builder(request, form_id):
|
||
# """Update existing form from builder"""
|
||
# if request.method != 'POST':
|
||
# return JsonResponse({'success': False, 'error': 'Only POST method allowed'}, status=405)
|
||
|
||
# try:
|
||
# form = get_object_or_404(Form, id=form_id)
|
||
|
||
# # Check if user has permission to edit this form
|
||
# if form.created_by != request.user:
|
||
# return JsonResponse({
|
||
# 'success': False,
|
||
# 'error': 'You do not have permission to edit this form'
|
||
# }, status=403)
|
||
|
||
# data = json.loads(request.body)
|
||
# form_data = data.get('form', {})
|
||
|
||
# # Update form
|
||
# form.title = form_data.get('title', 'Untitled Form')
|
||
# form.description = form_data.get('description', '')
|
||
# form.structure = form_data
|
||
# form.save()
|
||
|
||
# return JsonResponse({
|
||
# 'success': True,
|
||
# 'form_id': form.id,
|
||
# 'message': 'Form updated successfully!'
|
||
# })
|
||
|
||
# except json.JSONDecodeError:
|
||
# return JsonResponse({
|
||
# 'success': False,
|
||
# 'error': 'Invalid JSON data'
|
||
# }, status=400)
|
||
# except Exception as e:
|
||
# logger.error(f"Error updating form {form_id}: {e}")
|
||
# return JsonResponse({
|
||
# 'success': False,
|
||
# 'error': 'An error occurred while updating the form'
|
||
# }, status=500)
|
||
|
||
# def edit_form(request, form_id):
|
||
# """Display form edit page"""
|
||
# form = get_object_or_404(Form, id=form_id)
|
||
|
||
# # Check if user has permission to edit this form
|
||
# if form.created_by != request.user:
|
||
# messages.error(request, 'You do not have permission to edit this form.')
|
||
# return redirect('form_list')
|
||
|
||
# return render(request, 'forms/edit_form.html', {
|
||
# 'form': form
|
||
# })
|
||
|
||
# def form_submissions(request, form_id):
|
||
# """View submissions for a specific form"""
|
||
# form = get_object_or_404(Form, id=form_id, created_by=request.user)
|
||
# submissions = form.submissions.all().order_by('-submitted_at')
|
||
|
||
# # Pagination
|
||
#
|
||
|
||
|
||
@ensure_csrf_cookie
def form_builder(request, template_id=None):
    """Render the form builder, optionally preloaded with a template.

    Without ``template_id`` the builder opens with an empty context. With
    one, the template must belong to the requesting user (404 otherwise)
    and its id and name are passed to the page.
    """
    if not template_id:
        return render(request, "forms/form_builder.html", {})

    owned_template = get_object_or_404(
        FormTemplate, id=template_id, created_by=request.user
    )
    context = {
        "template_id": owned_template.id,
        "template_name": owned_template.name,
    }
    return render(request, "forms/form_builder.html", context)
|
||
|
||
|
||
@csrf_exempt
@require_http_methods(["POST"])
def save_form_template(request):
    """Create a form template, or replace the stages of an existing one.

    Expects a JSON body with ``name``, ``stages`` (each having ``name`` and
    an optional ``fields`` list) and, for updates, ``template_id``. On update
    the existing stages are deleted and rebuilt from the payload.

    Returns:
        ``JsonResponse`` with ``success``/``template_id``/``message`` on
        success; ``{"success": False, "error": ...}`` with status 404 when
        the template to update is not found for this user, or status 400
        for any other failure (malformed JSON, missing keys, database
        errors).
    """
    # NOTE(review): this state-changing endpoint is CSRF-exempt while relying
    # on ``request.user``; confirm the exemption is really required.
    from django.db import transaction
    from django.http import Http404

    try:
        data = json.loads(request.body)
        template_name = data.get("name", "Untitled Form")
        stages_data = data.get("stages", [])
        template_id = data.get("template_id")

        # One transaction: a bad payload halfway through must not leave the
        # template with its old stages deleted and only some new ones created.
        with transaction.atomic():
            if template_id:
                template = get_object_or_404(
                    FormTemplate, id=template_id, created_by=request.user
                )
                template.name = template_name
                template.save()
                # Drop the existing stages; they are rebuilt below.
                template.stages.all().delete()
            else:
                template = FormTemplate.objects.create(
                    name=template_name, created_by=request.user
                )

            for stage_order, stage_data in enumerate(stages_data):
                stage = FormStage.objects.create(
                    template=template,
                    name=stage_data["name"],
                    order=stage_order,
                    is_predefined=stage_data.get("predefined", False),
                )

                # A stage without a ``fields`` key is treated as empty.
                for field_order, field_data in enumerate(stage_data.get("fields") or []):
                    options = field_data.get("options", [])
                    if not isinstance(options, list):
                        options = []

                    FormField.objects.create(
                        stage=stage,
                        label=field_data.get("label", ""),
                        field_type=field_data.get("type", "text"),
                        placeholder=field_data.get("placeholder", ""),
                        required=field_data.get("required", False),
                        order=field_order,
                        is_predefined=field_data.get("predefined", False),
                        options=options,
                        file_types=field_data.get("fileTypes", ""),
                        max_file_size=field_data.get("maxFileSize", 5),
                    )
    except Http404 as e:
        # Http404 is an Exception subclass; without this clause a missing
        # template was reported as a 400.
        return JsonResponse({"success": False, "error": str(e)}, status=404)
    except Exception as e:
        logger.warning("Failed to save form template: %s", e)
        return JsonResponse({"success": False, "error": str(e)}, status=400)

    return JsonResponse(
        {
            "success": True,
            "template_id": template.id,
            "message": "Form template saved successfully!",
        }
    )
|
||
|
||
|
||
@require_http_methods(["GET"])
def load_form_template(request, template_id):
    """Return one of the user's form templates, with stages and fields, as JSON.

    Responds with 404 when the template does not exist or does not belong
    to the requesting user.

    Returns:
        ``JsonResponse`` of the form ``{"success": True, "template": {...}}``
        where ``template`` holds ``id``, ``name``, ``description``,
        ``is_active``, ``job`` (the related job's id or ``None``) and
        ``stages``.
    """
    template = get_object_or_404(FormTemplate, id=template_id, created_by=request.user)

    # Prefetch the fields so they load in one extra query, not one per stage.
    stages = [
        {
            "id": stage.id,
            "name": stage.name,
            "predefined": stage.is_predefined,
            "fields": [
                {
                    "id": field.id,
                    "type": field.field_type,
                    "label": field.label,
                    "placeholder": field.placeholder,
                    "required": field.required,
                    "options": field.options,
                    "fileTypes": field.file_types,
                    "maxFileSize": field.max_file_size,
                    "predefined": field.is_predefined,
                }
                for field in stage.fields.all()
            ],
        }
        for stage in template.stages.prefetch_related("fields")
    ]

    return JsonResponse(
        {
            "success": True,
            "template": {
                "id": template.id,
                "name": template.name,
                "description": template.description,
                "is_active": template.is_active,
                # ``job_id`` is already None when no job is attached, so the
                # related job need not be fetched just to test for it.
                "job": template.job_id,
                "stages": stages,
            },
        }
    )
|
||
|
||
|
||
def form_templates_list(request):
    """Render a paginated, searchable list of the user's form templates.

    The ``q`` GET parameter filters by name or description
    (case-insensitive). The page also gets an unbound ``FormTemplateForm``
    whose ``job`` choices are limited to jobs without a form template.
    """
    query = request.GET.get("q", "")

    owned = FormTemplate.objects.filter(created_by=request.user)
    if query:
        in_name = Q(name__icontains=query)
        in_description = Q(description__icontains=query)
        owned = owned.filter(in_name | in_description)

    # Newest first, 10 templates per page.
    pages = Paginator(owned.order_by("-created_at"), 10)
    current_page = pages.get_page(request.GET.get("page"))

    new_template_form = FormTemplateForm()
    new_template_form.fields["job"].queryset = JobPosting.objects.filter(
        form_template__isnull=True
    )

    return render(
        request,
        "forms/form_templates_list.html",
        {"templates": current_page, "query": query, "form": new_template_form},
    )
|
||
|
||
|
||
def create_form_template(request):
    """Show the template creation form and create the template on POST.

    A valid POST saves the template with the requesting user as its creator
    and redirects to ``form_templates_list``. Any other request, or an
    invalid POST, renders the creation page.
    """
    page = "forms/create_form_template.html"

    if request.method != "POST":
        return render(request, page, {"form": FormTemplateForm()})

    form = FormTemplateForm(request.POST)
    if not form.is_valid():
        # Re-render with the bound form so its errors can be displayed.
        return render(request, page, {"form": form})

    new_template = form.save(commit=False)
    new_template.created_by = request.user
    new_template.save()
    messages.success(
        request, f'Form template "{new_template.name}" created successfully!'
    )
    return redirect("form_templates_list")
|
||
|
||
|
||
@require_http_methods(["GET"])
def list_form_templates(request):
    """Return the requesting user's form templates as JSON (GET only).

    Each entry carries ``id``, ``name``, ``description``, ``created_at``
    and ``updated_at``.
    """
    columns = ("id", "name", "description", "created_at", "updated_at")
    rows = FormTemplate.objects.filter(created_by=request.user).values(*columns)
    return JsonResponse({"success": True, "templates": list(rows)})
|
||
|
||
|
||
@require_http_methods(["DELETE"])
def delete_form_template(request, template_id):
    """Delete one of the requesting user's form templates (DELETE only).

    Responds with 404 when the template does not exist or belongs to
    another user; otherwise returns a JSON success payload.
    """
    get_object_or_404(
        FormTemplate, id=template_id, created_by=request.user
    ).delete()
    payload = {"success": True, "message": "Form template deleted successfully!"}
    return JsonResponse(payload)
|
||
|
||
|
||
def form_wizard_view(request, template_id):
    """Render an active form template as a step-by-step wizard.

    Responds with 404 when no active template has ``template_id``.

    The context carries ``template_id`` and ``job_id``, the linked job's
    ``internal_job_id``, or ``None`` when the template has no job.
    """
    template = get_object_or_404(FormTemplate, id=template_id, is_active=True)

    # A template may exist without a job (load_form_template allows for
    # that), so do not dereference the relation unconditionally.
    job = template.job
    job_id = job.internal_job_id if job else None

    return render(
        request,
        "forms/form_wizard.html",
        {"template_id": template_id, "job_id": job_id},
    )
|
||
|
||
|
||
_FIELD_KEY_PREFIX = "field_"


def _template_field_for_key(template, key):
    """Return the template's FormField addressed by a ``field_<id>`` key, or None.

    ``None`` is returned when the key does not carry the prefix or when no
    field with that id belongs to ``template``.
    """
    if not key.startswith(_FIELD_KEY_PREFIX):
        return None
    # Strip only the leading prefix; str.replace() would also remove any later
    # occurrence of "field_" inside the key.
    field_pk = key[len(_FIELD_KEY_PREFIX):]
    try:
        return FormField.objects.get(id=field_pk, stage__template=template)
    except FormField.DoesNotExist:
        return None


@require_POST
def submit_form(request, template_id):
    """Store a submitted application form and create the matching candidate.

    Every POST value and uploaded file whose key looks like ``field_<id>`` is
    saved as a ``FieldResponse`` on a new ``FormSubmission``. The responses
    labelled "First Name", "Last Name", "Email Address", "Phone Number",
    "Address" and "Resume Upload" are then used to fill in the submission's
    applicant details and to create a ``Candidate`` for the template's job.

    Non-POST requests never reach this body: ``@require_POST`` rejects them
    with a 405 response first.

    Args:
        request: The incoming POST request (form data and optional files).
        template_id: Primary key of the form template being submitted.

    Returns:
        A redirect to ``application_success`` when the candidate was created.
        Otherwise a JSON success payload containing ``submission_id`` (the
        submission is kept even if candidate creation failed), or a JSON
        error payload with status 400 when storing the submission failed.
    """
    try:
        # NOTE: Http404 is an Exception subclass, so a missing template is
        # reported through the JSON 400 handler below rather than as a 404.
        template = get_object_or_404(FormTemplate, id=template_id)
        submission = FormSubmission.objects.create(template=template)

        # Plain form values.
        for key, value in request.POST.items():
            field = _template_field_for_key(template, key)
            if field is not None:
                FieldResponse.objects.create(
                    submission=submission,
                    field=field,
                    value=value if value else None,
                )

        # Uploaded files.
        for key, uploaded_file in request.FILES.items():
            field = _template_field_for_key(template, key)
            if field is not None:
                FieldResponse.objects.create(
                    submission=submission,
                    field=field,
                    uploaded_file=uploaded_file,
                )

        try:
            responses = submission.responses
            first_name = responses.get(field__label="First Name")
            last_name = responses.get(field__label="Last Name")
            email = responses.get(field__label="Email Address")
            phone = responses.get(field__label="Phone Number")
            address = responses.get(field__label="Address")
            resume = responses.get(field__label="Resume Upload")

            submission.applicant_name = (
                f"{first_name.display_value} {last_name.display_value}"
            )
            submission.applicant_email = email.display_value
            submission.save()

            Candidate.objects.create(
                first_name=first_name.display_value,
                last_name=last_name.display_value,
                email=email.display_value,
                phone=phone.display_value,
                address=address.display_value,
                resume=resume.get_file if resume.is_file else None,
                job=submission.template.job,
            )
            return redirect("application_success")
        except Exception as exc:
            # Best effort: the submission itself is already stored, so a
            # failure here is logged (with traceback) and the JSON success
            # response below is still returned.
            logger.exception("Candidate creation failed,%s", exc)

        return JsonResponse(
            {
                "success": True,
                "message": "Form submitted successfully!",
                "submission_id": submission.id,
            }
        )
    except Exception as exc:
        return JsonResponse({"success": False, "error": str(exc)}, status=400)
|
||
|
||
|
||
def form_template_submissions_list(request, slug):
    """Render a paginated list of the submissions made for one form template.

    Args:
        request: The incoming request; the ``page`` query parameter selects
            the page to display.
        slug: Slug of the form template; a 404 is raised if none matches.

    Returns:
        HttpResponse: ``forms/form_template_submissions_list.html`` rendered
        with the template and the requested page (10 submissions per page,
        newest first).
    """
    form_template = get_object_or_404(FormTemplate, slug=slug)
    newest_first = FormSubmission.objects.filter(template=form_template).order_by(
        "-submitted_at"
    )

    # Ten submissions per page.
    current_page = Paginator(newest_first, 10).get_page(request.GET.get("page"))

    context = {"template": form_template, "page_obj": current_page}
    return render(request, "forms/form_template_submissions_list.html", context)
|
||
|
||
|
||
def form_template_all_submissions(request, template_id):
    """Render every submission of a form template as a paginated table.

    Args:
        request: The incoming request; the ``page`` query parameter selects
            the page to display.
        template_id: Primary key of the form template; a 404 is raised if it
            does not exist.

    Returns:
        HttpResponse: ``forms/form_template_all_submissions.html`` rendered
        with the template, the requested page of submissions (10 per page,
        newest first) and the template's fields ordered by stage order and
        then field order.
    """
    template = get_object_or_404(FormTemplate, id=template_id)

    submissions = FormSubmission.objects.filter(template=template).order_by(
        "-submitted_at"
    )

    # Fields of this template, ordered by their stage and then their own order.
    fields = (
        FormField.objects.filter(stage__template=template)
        .select_related("stage")
        .order_by("stage__order", "order")
    )

    paginator = Paginator(submissions, 10)  # 10 submissions per page
    page_obj = paginator.get_page(request.GET.get("page"))

    return render(
        request,
        "forms/form_template_all_submissions.html",
        {
            "template": template,
            "page_obj": page_obj,
            "fields": fields,
        },
    )
|
||
|
||
|
||
def form_submission_details(request, template_id, slug):
    """Render the detail page of a single form submission.

    Args:
        request: The incoming request.
        template_id: Primary key of the form template the submission belongs
            to; a 404 is raised if it does not exist.
        slug: Slug of the submission; it must belong to that template.

    Returns:
        HttpResponse: ``forms/form_submission_details.html`` rendered with the
        template, the submission, the template's stages (by ``order``), all
        responses (by field order) and a per-stage grouping of the responses.
    """
    form_template = get_object_or_404(FormTemplate, id=template_id)
    submission = get_object_or_404(FormSubmission, slug=slug, template=form_template)

    ordered_stages = form_template.stages.prefetch_related("fields").order_by("order")
    ordered_responses = submission.responses.select_related("field").order_by(
        "field__order"
    )

    # Map each stage id to the stage and the subset of responses given for it.
    responses_by_stage = {
        stage.id: {
            "stage": stage,
            "responses": ordered_responses.filter(field__stage=stage),
        }
        for stage in ordered_stages
    }

    context = {
        "template": form_template,
        "submission": submission,
        "stages": ordered_stages,
        "responses": ordered_responses,
        "stage_responses": responses_by_stage,
    }
    return render(request, "forms/form_submission_details.html", context)
|
||
|
||
def _confirm_interview_schedule(request, job, slug):
    """Persist the previewed schedule from the session and book the interviews.

    One Zoom meeting and one ``ScheduledInterview`` are created per candidate,
    pairing candidates with the available slots in order.
    """
    schedule_data = request.session.get("interview_schedule_data")
    if not schedule_data:
        messages.error(request, "Session expired. Please try again.")
        return redirect("schedule_interviews", slug=slug)

    schedule = InterviewSchedule.objects.create(
        job=job,
        created_by=request.user,
        start_date=datetime.fromisoformat(schedule_data["start_date"]).date(),
        end_date=datetime.fromisoformat(schedule_data["end_date"]).date(),
        working_days=schedule_data["working_days"],
        start_time=time.fromisoformat(schedule_data["start_time"]),
        end_time=time.fromisoformat(schedule_data["end_time"]),
        interview_duration=schedule_data["interview_duration"],
        buffer_time=schedule_data["buffer_time"],
        breaks=schedule_data["breaks"],
    )

    candidates = Candidate.objects.filter(id__in=schedule_data["candidate_ids"])
    schedule.candidates.set(candidates)

    available_slots = get_available_time_slots(schedule)

    scheduled_count = 0
    # zip() stops at the shorter sequence, so candidates beyond the last
    # available slot are left unscheduled.
    for candidate, slot in zip(candidates, available_slots):
        interview_datetime = datetime.combine(slot["date"], slot["time"])
        meeting_topic = f"Interview for {job.title} - {candidate.name}"
        # NOTE(review): the "Z" suffix labels a naive datetime as UTC -
        # confirm the slot times really are UTC.
        start_time = interview_datetime.isoformat() + "Z"

        # Exactly one API call per candidate; a second call would create a
        # duplicate meeting on the Zoom side.
        result = create_zoom_meeting(
            topic=meeting_topic,
            start_time=start_time,
            duration=schedule.interview_duration,
        )
        if result["status"] != "success":
            continue

        zoom_meeting = ZoomMeeting.objects.create(
            topic=meeting_topic,
            start_time=interview_datetime,
            duration=schedule.interview_duration,
            meeting_id=result["meeting_details"]["meeting_id"],
            join_url=result["meeting_details"]["join_url"],
            zoom_gateway_response=result["zoom_gateway_response"],
        )
        ScheduledInterview.objects.create(
            candidate=candidate,
            job=job,
            zoom_meeting=zoom_meeting,
            schedule=schedule,
            interview_date=slot["date"],
            interview_time=slot["time"],
        )
        scheduled_count += 1

    messages.success(request, f"Successfully scheduled {scheduled_count} interviews.")

    # The preview data has been consumed; drop it from the session.
    request.session.pop("interview_schedule_data", None)

    return redirect("job_detail", slug=slug)


def schedule_interviews_view(request, slug):
    """Two-step view for scheduling interviews for a job's candidates.

    Step 1 (POST without ``confirm_schedule``): validate the schedule form and
    the break formset, compute the available slots, store the schedule
    parameters in the session and render a preview that pairs each candidate
    with a slot.

    Step 2 (POST with ``confirm_schedule``): read the parameters back from the
    session, create the ``InterviewSchedule`` and, per candidate, a Zoom
    meeting and a ``ScheduledInterview``.

    A GET request, or a POST with invalid data, renders the schedule form.

    Args:
        request: The incoming request.
        slug: Slug of the job posting; a 404 is raised if it does not exist.

    Returns:
        HttpResponse: The schedule form page, the preview page, or a redirect
        (to ``job_detail`` after confirmation, or back to
        ``schedule_interviews`` when the session data is missing).
    """
    job = get_object_or_404(JobPosting, slug=slug)

    if request.method == "POST":
        if "confirm_schedule" in request.POST:
            return _confirm_interview_schedule(request, job, slug)

        form = InterviewScheduleForm(slug, request.POST)
        break_formset = BreakTimeFormSet(request.POST)

        if form.is_valid() and break_formset.is_valid():
            candidates = form.cleaned_data["candidates"]
            start_date = form.cleaned_data["start_date"]
            end_date = form.cleaned_data["end_date"]
            working_days = form.cleaned_data["working_days"]
            start_time = form.cleaned_data["start_time"]
            end_time = form.cleaned_data["end_time"]
            interview_duration = form.cleaned_data["interview_duration"]
            buffer_time = form.cleaned_data["buffer_time"]

            # Breaks are kept as "HH:MM:SS" strings so they can be stored in
            # the session alongside the other schedule parameters.
            breaks = [
                {
                    "start_time": break_form.cleaned_data["start_time"].strftime(
                        "%H:%M:%S"
                    ),
                    "end_time": break_form.cleaned_data["end_time"].strftime(
                        "%H:%M:%S"
                    ),
                }
                for break_form in break_formset
                if break_form.cleaned_data
                and not break_form.cleaned_data.get("DELETE")
            ]

            # Unsaved schedule object, used only to compute the slots.
            temp_schedule = InterviewSchedule(
                job=job,
                start_date=start_date,
                end_date=end_date,
                working_days=working_days,
                start_time=start_time,
                end_time=end_time,
                interview_duration=interview_duration,
                buffer_time=buffer_time,
                breaks=breaks,
            )
            available_slots = get_available_time_slots(temp_schedule)

            if len(available_slots) < len(candidates):
                messages.error(
                    request,
                    f"Not enough available slots. Required: {len(candidates)}, Available: {len(available_slots)}",
                )
                return render(
                    request,
                    "interviews/schedule_interviews.html",
                    {"form": form, "break_formset": break_formset, "job": job},
                )

            # Pair candidates with slots in order for the preview table.
            preview_schedule = [
                {"candidate": candidate, "date": slot["date"], "time": slot["time"]}
                for candidate, slot in zip(candidates, available_slots)
            ]

            # Keep the parameters for the confirmation request.
            request.session["interview_schedule_data"] = {
                "start_date": start_date.isoformat(),
                "end_date": end_date.isoformat(),
                "working_days": working_days,
                "start_time": start_time.isoformat(),
                "end_time": end_time.isoformat(),
                "interview_duration": interview_duration,
                "buffer_time": buffer_time,
                "candidate_ids": [c.id for c in candidates],
                "breaks": breaks,
            }

            return render(
                request,
                "interviews/preview_schedule.html",
                {
                    "job": job,
                    "schedule": preview_schedule,
                    "start_date": start_date,
                    "end_date": end_date,
                    "working_days": working_days,
                    "start_time": start_time,
                    "end_time": end_time,
                    "breaks": breaks,
                    "interview_duration": interview_duration,
                    "buffer_time": buffer_time,
                },
            )
    else:
        form = InterviewScheduleForm(slug=slug)
        break_formset = BreakTimeFormSet()

    return render(
        request,
        "interviews/schedule_interviews.html",
        {"form": form, "break_formset": break_formset, "job": job},
    )
|
||
# def schedule_interviews_view(request, slug):
|
||
# job = get_object_or_404(JobPosting, slug=slug)
|
||
|
||
# if request.method == "POST":
|
||
# form = InterviewScheduleForm(slug, request.POST)
|
||
# break_formset = BreakTimeFormSet(request.POST)
|
||
|
||
# # Check if this is a confirmation request
|
||
# if "confirm_schedule" in request.POST:
|
||
# # Get the schedule data from session
|
||
# schedule_data = request.session.get("interview_schedule_data")
|
||
# if not schedule_data:
|
||
# messages.error(request, "Session expired. Please try again.")
|
||
# return redirect("schedule_interviews", slug=slug)
|
||
|
||
# # Create the interview schedule
|
||
# schedule = InterviewSchedule.objects.create(
|
||
# job=job,
|
||
# created_by=request.user,
|
||
# start_date=datetime.fromisoformat(schedule_data["start_date"]).date(),
|
||
# end_date=datetime.fromisoformat(schedule_data["end_date"]).date(),
|
||
# working_days=schedule_data["working_days"],
|
||
# start_time=time.fromisoformat(schedule_data["start_time"]),
|
||
# end_time=time.fromisoformat(schedule_data["end_time"]),
|
||
# interview_duration=schedule_data["interview_duration"],
|
||
# buffer_time=schedule_data["buffer_time"],
|
||
# breaks=schedule_data["breaks"], # Direct assignment for JSON field
|
||
# )
|
||
|
||
# # Add candidates to the schedule
|
||
# candidates = Candidate.objects.filter(id__in=schedule_data["candidate_ids"])
|
||
# schedule.candidates.set(candidates)
|
||
|
||
# # Schedule the interviews
|
||
# try:
|
||
# scheduled_count = schedule_interviews(schedule)
|
||
# messages.success(
|
||
# request, f"Successfully scheduled {scheduled_count} interviews."
|
||
# )
|
||
# # Clear the session data
|
||
# if "interview_schedule_data" in request.session:
|
||
# del request.session["interview_schedule_data"]
|
||
# return redirect("job_detail", slug=slug)
|
||
# except Exception as e:
|
||
# messages.error(request, f"Error scheduling interviews: {str(e)}")
|
||
# return redirect("schedule_interviews", slug=slug)
|
||
|
||
# # This is the initial form submission
|
||
# if form.is_valid() and break_formset.is_valid():
|
||
# # Get the form data
|
||
# candidates = form.cleaned_data["candidates"]
|
||
# start_date = form.cleaned_data["start_date"]
|
||
# end_date = form.cleaned_data["end_date"]
|
||
# working_days = form.cleaned_data["working_days"]
|
||
# start_time = form.cleaned_data["start_time"]
|
||
# end_time = form.cleaned_data["end_time"]
|
||
# interview_duration = form.cleaned_data["interview_duration"]
|
||
# buffer_time = form.cleaned_data["buffer_time"]
|
||
|
||
# # Process break times
|
||
# breaks = []
|
||
# for break_form in break_formset:
|
||
# if break_form.cleaned_data and not break_form.cleaned_data.get(
|
||
# "DELETE"
|
||
# ):
|
||
# breaks.append(
|
||
# {
|
||
# "start_time": break_form.cleaned_data[
|
||
# "start_time"
|
||
# ].strftime("%H:%M:%S"),
|
||
# "end_time": break_form.cleaned_data["end_time"].strftime("%H:%M:%S"),
|
||
# }
|
||
# )
|
||
|
||
# # Create a temporary schedule object (not saved to DB)
|
||
# temp_schedule = InterviewSchedule(
|
||
# job=job,
|
||
# start_date=start_date,
|
||
# end_date=end_date,
|
||
# working_days=working_days,
|
||
# start_time=start_time,
|
||
# end_time=end_time,
|
||
# interview_duration=interview_duration,
|
||
# buffer_time=buffer_time,
|
||
# breaks=breaks, # Direct assignment for JSON field
|
||
# )
|
||
|
||
# # Get available slots
|
||
# available_slots = get_available_time_slots(temp_schedule)
|
||
|
||
# if len(available_slots) < len(candidates):
|
||
# messages.error(
|
||
# request,
|
||
# f"Not enough available slots. Required: {len(candidates)}, Available: {len(available_slots)}",
|
||
# )
|
||
# return render(
|
||
# request,
|
||
# "interviews/schedule_interviews.html",
|
||
# {"form": form, "break_formset": break_formset, "job": job},
|
||
# )
|
||
|
||
# # Create a preview schedule
|
||
# preview_schedule = []
|
||
# for i, candidate in enumerate(candidates):
|
||
# slot = available_slots[i]
|
||
# preview_schedule.append(
|
||
# {"candidate": candidate, "date": slot["date"], "time": slot["time"]}
|
||
# )
|
||
|
||
# # Save the form data to session for later use
|
||
# schedule_data = {
|
||
# "start_date": start_date.isoformat(),
|
||
# "end_date": end_date.isoformat(),
|
||
# "working_days": working_days,
|
||
# "start_time": start_time.isoformat(),
|
||
# "end_time": end_time.isoformat(),
|
||
# "interview_duration": interview_duration,
|
||
# "buffer_time": buffer_time,
|
||
# "candidate_ids": [c.id for c in candidates],
|
||
# "breaks": breaks,
|
||
# }
|
||
# request.session["interview_schedule_data"] = schedule_data
|
||
|
||
# # Render the preview page
|
||
# return render(
|
||
# request,
|
||
# "interviews/preview_schedule.html",
|
||
# {
|
||
# "job": job,
|
||
# "schedule": preview_schedule,
|
||
# "start_date": start_date,
|
||
# "end_date": end_date,
|
||
# "working_days": working_days,
|
||
# "start_time": start_time,
|
||
# "end_time": end_time,
|
||
# "breaks": breaks,
|
||
# "interview_duration": interview_duration,
|
||
# "buffer_time": buffer_time,
|
||
# },
|
||
# )
|
||
# else:
|
||
# form = InterviewScheduleForm(slug=slug)
|
||
# break_formset = BreakTimeFormSet()
|
||
|
||
# return render(
|
||
# request,
|
||
# "interviews/schedule_interviews.html",
|
||
# {"form": form, "break_formset": break_formset, "job": job},
|
||
# )
|
||
|
||
def candidate_screening_view(request, slug):
    """Render the screening page for a job's candidates in the "Applied" stage.

    Args:
        request: The incoming request.
        slug: Slug of the job posting; a 404 is raised if it does not exist.

    Returns:
        HttpResponse: ``recruitment/candidate_screening_view.html`` rendered
        with the job and its "Applied" candidates, highest ``match_score``
        first.
    """
    # NOTE: the commented-out tier categorisation / bulk stage-update handling
    # that used to live here was removed as dead code, together with an unused
    # per-stage grouping; recover it from version control if it is needed.
    job = get_object_or_404(JobPosting, slug=slug)

    # Only candidates still in the "Applied" stage, best match first.
    candidates = job.candidates.filter(stage="Applied").order_by("-match_score")

    context = {
        "job": job,
        "candidates": candidates,
    }
    return render(request, "recruitment/candidate_screening_view.html", context)
|
||
|
||
|
||
def get_candidates_from_request(request):
    """Yield the ``Candidate`` for every POST key that is a candidate pk.

    Each key of ``request.POST`` is tried as a primary key. Keys that do not
    resolve to a candidate (for example ``csrfmiddlewaretoken``) are logged
    and skipped, so consumers only ever receive real ``Candidate`` instances
    and never ``None``.

    Args:
        request: The incoming request whose POST keys are candidate pks.

    Yields:
        Candidate: One instance per POST key that matches an existing row.
    """
    # Only the keys matter; the submitted values are ignored.
    for key in request.POST:
        try:
            candidate = Candidate.objects.get(pk=key)
        except Exception as exc:
            # Covers both "no such candidate" and keys that cannot be
            # converted to a primary key at all.
            logger.warning("Skipping POST key %r: %s", key, exc)
            continue
        yield candidate
|
||
def candidate_exam_view(request, slug):
    """Render the exam page listing a job's candidates in the "Exam" stage.

    Args:
        request: The incoming request.
        slug: Slug of the job posting; a 404 is raised if it does not exist.

    Returns:
        HttpResponse: ``recruitment/candidate_exam_view.html`` rendered with
        the job and its "Exam" candidates, highest ``match_score`` first.
    """
    job = get_object_or_404(JobPosting, slug=slug)
    exam_candidates = job.candidates.filter(stage="Exam").order_by("-match_score")
    context = {"job": job, "candidates": exam_candidates}
    return render(request, "recruitment/candidate_exam_view.html", context)
|
||
|
||
def update_candidate_exam_status(request, slug):
    """Show or process the exam form (``CandidateExamDateForm``) for one candidate.

    Args:
        request: The incoming request.
        slug: Slug of the candidate; a 404 is raised if it does not exist.

    Returns:
        HttpResponse: On a valid POST, a redirect to ``candidate_exam_view``
        for the candidate's job. Otherwise
        ``includes/candidate_exam_status_form.html`` rendered with the
        candidate and the form (carrying validation errors after an invalid
        POST).
    """
    candidate = get_object_or_404(Candidate, slug=slug)

    if request.method == "POST":
        form = CandidateExamDateForm(request.POST, instance=candidate)
        if form.is_valid():
            form.save()
            return redirect("candidate_exam_view", slug=candidate.job.slug)
    else:
        # Unbound form: passing the (empty) POST data on a GET would bind the
        # form and render validation errors instead of the candidate's
        # current values.
        form = CandidateExamDateForm(instance=candidate)

    return render(
        request,
        "includes/candidate_exam_status_form.html",
        {"candidate": candidate, "form": form},
    )
|
||
def bulk_update_candidate_exam_status(request, slug):
    """Mark the candidates named in the POST data as passed or failed.

    The outcome is read from the ``status`` request header: ``"pass"`` sets
    ``exam_status`` to "Passed" and moves the candidate to the "Interview"
    stage; any other non-empty value sets ``exam_status`` to "Failed". When
    the header is missing nothing is changed.

    Args:
        request: The incoming request; its POST keys identify the candidates.
        slug: Slug of the job posting; a 404 is raised if it does not exist.

    Returns:
        HttpResponseRedirect: To ``candidate_exam_view`` for the job.
    """
    job = get_object_or_404(JobPosting, slug=slug)
    status = request.headers.get("status")

    if status:
        passed = status == "pass"
        for candidate in get_candidates_from_request(request):
            if candidate is None:
                # POST key that did not resolve to a candidate.
                continue
            try:
                if passed:
                    candidate.exam_status = "Passed"
                    candidate.stage = "Interview"
                else:
                    candidate.exam_status = "Failed"
                candidate.save()
            except Exception:
                # Best effort: one failing candidate must not abort the batch.
                logger.exception(
                    "Failed to update exam status for candidate %s", candidate.pk
                )
        messages.success(request, "Updated exam status selected candidates")

    return redirect("candidate_exam_view", slug=job.slug)
|
||
|
||
def candidate_criteria_view_htmx(request, pk):
    """Render the modal body partial for one candidate.

    Args:
        request: The incoming request.
        pk: Primary key of the candidate; a 404 is raised if it does not exist.

    Returns:
        HttpResponse: ``includes/candidate_modal_body.html`` rendered with the
        candidate.
    """
    selected = get_object_or_404(Candidate, pk=pk)
    context = {"candidate": selected}
    return render(request, "includes/candidate_modal_body.html", context)
|
||
|
||
|
||
def candidate_set_exam_date(request, slug):
    """Stamp a candidate's exam date with the current time.

    Args:
        request: The incoming request.
        slug: Slug of the candidate; a 404 is raised if it does not exist.

    Returns:
        HttpResponseRedirect: To ``candidate_screening_view`` for the
        candidate's job, after queuing a success message.
    """
    target = get_object_or_404(Candidate, slug=slug)
    target.exam_date = timezone.now()
    target.save()

    confirmation = f"Set exam date for {target.name} to {target.exam_date}"
    messages.success(request, confirmation)

    return redirect("candidate_screening_view", slug=target.job.slug)
|
||
|
||
def bulk_candidate_move_to_exam(request):
    """Move every candidate named in the POST data to the "Exam" stage.

    Each candidate gets ``stage="Exam"``, ``applicant_status="Candidate"`` and
    an ``exam_date`` of the current time.

    Args:
        request: The incoming request; its POST keys identify the candidates.

    Returns:
        HttpResponseRedirect: To ``candidate_screening_view`` for the job of
        the last candidate that was moved. When no candidate could be
        resolved, a warning is queued and the user is sent back to the
        referring page (or ``/`` if there is none).
    """
    job_slug = None
    for candidate in get_candidates_from_request(request):
        if candidate is None:
            # POST key that did not resolve to a candidate.
            continue
        candidate.stage = "Exam"
        candidate.applicant_status = "Candidate"
        candidate.exam_date = timezone.now()
        candidate.save()
        job_slug = candidate.job.slug

    if job_slug is None:
        # Nothing was moved, so there is no job to redirect to.
        messages.warning(request, "No valid candidates were selected.")
        return redirect(request.META.get("HTTP_REFERER", "/"))

    messages.success(request, "Candidates Moved to Exam stage")
    return redirect("candidate_screening_view", slug=job_slug)
|
||
# def response():
|
||
# yield SSE.patch_elements("","")
|
||
# yield SSE.execute_script("console.log('hello world');")
|
||
# return DatastarResponse(response())
|
||
|
||
def candidate_interview_view(request, slug):
    """Render the interview page for a job.

    Args:
        request: The incoming request.
        slug: Slug of the job posting; a 404 is raised if it does not exist.

    Returns:
        HttpResponse: ``recruitment/candidate_interview_view.html`` rendered
        with the job and all of its candidates.
    """
    job = get_object_or_404(JobPosting, slug=slug)

    if "Datastar-Request" in request.headers:
        # The Datastar branch only records which candidates were posted;
        # routed through the logger instead of stdout.
        for candidate in get_candidates_from_request(request):
            logger.debug("Datastar request candidate: %s", candidate)

    # NOTE(review): every candidate of the job is listed, not only those in
    # the "Interview" stage - confirm this is intended.
    context = {"job": job, "candidates": job.candidates.all()}
    return render(request, "recruitment/candidate_interview_view.html", context)
|
||
|
||
|
||
|
||
|
||
def interview_calendar_view(request, slug):
    """Render the interview calendar for a job.

    Every ``ScheduledInterview`` of the job is converted into a calendar event
    dictionary with title, start/end, detail URL, colour and extended
    properties.

    Args:
        request: The incoming request; ``request.path`` is used as the prefix
            of each event's detail URL.
        slug: Slug of the job posting; a 404 is raised if it does not exist.

    Returns:
        HttpResponse: ``recruitment/interview_calendar.html`` rendered with
        the job, the list of events and the default calendar colour.
    """
    default_color = '#00636e'
    status_colors = {
        'confirmed': '#00a86b',  # green
        'cancelled': '#e74c3c',  # red
        'completed': '#95a5a6',  # gray
    }

    job = get_object_or_404(JobPosting, slug=slug)
    scheduled_interviews = ScheduledInterview.objects.filter(
        job=job
    ).select_related('candidate', 'zoom_meeting')

    events = []
    for interview in scheduled_interviews:
        meeting = interview.zoom_meeting
        start_datetime = datetime.combine(
            interview.interview_date,
            interview.interview_time,
        )
        # Fall back to 60 minutes when no Zoom meeting is attached.
        duration = meeting.duration if meeting else 60
        end_datetime = start_datetime + timedelta(minutes=duration)

        events.append({
            'title': f"Interview: {interview.candidate.name}",
            'start': start_datetime.isoformat(),
            'end': end_datetime.isoformat(),
            'url': f"{request.path}interview/{interview.id}/",
            'color': status_colors.get(interview.status, default_color),
            'extendedProps': {
                'candidate': interview.candidate.name,
                'email': interview.candidate.email,
                'status': interview.status,
                'meeting_id': meeting.meeting_id if meeting else None,
                'join_url': meeting.join_url if meeting else None,
            },
        })

    context = {
        'job': job,
        'events': events,
        'calendar_color': default_color,
    }
    return render(request, 'recruitment/interview_calendar.html', context)
|
||
|
||
def interview_detail_view(request, slug, interview_id):
    """Render the detail page of one scheduled interview of a job.

    Args:
        request: The incoming request.
        slug: Slug of the job posting; a 404 is raised if it does not exist.
        interview_id: Primary key of the scheduled interview; it must belong
            to that job, otherwise a 404 is raised.

    Returns:
        HttpResponse: ``recruitment/interview_detail.html`` rendered with the
        job and the interview.
    """
    posting = get_object_or_404(JobPosting, slug=slug)
    booked_interview = get_object_or_404(
        ScheduledInterview, id=interview_id, job=posting
    )
    return render(
        request,
        'recruitment/interview_detail.html',
        {'job': posting, 'interview': booked_interview},
    )
|