2025-10-05 12:19:45 +03:00

578 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import requests
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from django.http import JsonResponse
from recruitment.models import FormSubmission,Form,UploadedFile
from datetime import datetime
from django.views import View
from django.db.models import Q
from django.urls import reverse
from django.utils import timezone
from .forms import ZoomMeetingForm,JobPostingForm
from rest_framework import viewsets
from django.contrib import messages
from django.core.paginator import Paginator
from .linkedin_service import LinkedInService
from .models import ZoomMeeting, Job, Candidate, JobPosting
from .serializers import JobPostingSerializer, CandidateSerializer
from django.shortcuts import get_object_or_404, render, redirect
from django.views.generic import CreateView,UpdateView,DetailView,ListView
from .utils import create_zoom_meeting, delete_zoom_meeting, update_zoom_meeting
import logging
logger=logging.getLogger(__name__)
class JobPostingViewSet(viewsets.ModelViewSet):
    """Model viewset over every JobPosting row, serialized with JobPostingSerializer."""

    serializer_class = JobPostingSerializer
    queryset = JobPosting.objects.all()
class CandidateViewSet(viewsets.ModelViewSet):
    """Model viewset over every Candidate row, serialized with CandidateSerializer."""

    serializer_class = CandidateSerializer
    queryset = Candidate.objects.all()
class ZoomMeetingCreateView(CreateView):
    """Create a ZoomMeeting by scheduling it through ``create_zoom_meeting``.

    The local row is saved only when the helper reports
    ``status == "success"``; every outcome ends in a redirect plus a flash
    message.
    """

    model = ZoomMeeting
    template_name = 'meetings/create_meeting.html'
    form_class = ZoomMeetingForm
    success_url = '/'

    def form_valid(self, form):
        """Schedule the meeting remotely and persist it on success.

        Args:
            form: The validated ZoomMeetingForm.

        Returns:
            A redirect to ``/create-meeting/`` when the start time is not in
            the future, otherwise a redirect to ``/``.
        """
        instance = form.save(commit=False)
        try:
            if instance.start_time < timezone.now():
                messages.error(self.request, "Start time must be in the future.")
                # redirect() forwards extra keyword arguments to URL reversing
                # rather than to the response, so no ``status=`` is passed:
                # the response is always a plain 302.
                return redirect('/create-meeting/')

            # NOTE(review): for a timezone-aware start_time, isoformat()
            # already ends with a UTC offset, so appending "Z" may produce a
            # malformed timestamp -- confirm what create_zoom_meeting expects.
            start_time = instance.start_time.isoformat() + "Z"
            result = create_zoom_meeting(instance.topic, start_time, instance.duration)

            if result["status"] == "success":
                details = result['meeting_details']
                instance.meeting_id = details['meeting_id']
                instance.join_url = details['join_url']
                instance.host_email = details['host_email']
                instance.zoom_gateway_response = result['zoom_gateway_response']
                instance.save()
                messages.success(self.request, result["message"])
            else:
                messages.error(self.request, result["message"])
        except Exception:
            # Do not fail silently: keep the traceback in the log and tell the
            # user that nothing was created.
            logger.exception("Unexpected error while creating Zoom meeting")
            messages.error(
                self.request,
                "An unexpected error occurred while creating the meeting.",
            )
        return redirect('/')
class ZoomMeetingListView(ListView):
    """Paginated list of Zoom meetings, latest start time first, with optional search."""

    model = ZoomMeeting
    context_object_name = 'meetings'
    template_name = 'meetings/list_meetings.html'
    paginate_by = 10

    def get_queryset(self):
        """Return meetings ordered by ``-start_time``, filtered by ``?search=`` if given."""
        meetings = super().get_queryset().order_by('-start_time')
        term = self.request.GET.get('search', '')
        if not term:
            return meetings

        # Case-insensitive substring match on any of the three fields.
        matches_term = (
            Q(topic__icontains=term)
            | Q(meeting_id__icontains=term)
            | Q(host_email__icontains=term)
        )
        return meetings.filter(matches_term)

    def get_context_data(self, **kwargs):
        """Expose the current search term to the template as ``search_query``."""
        context = super().get_context_data(**kwargs)
        context.update(search_query=self.request.GET.get('search', ''))
        return context
class ZoomMeetingDetailsView(DetailView):
    """Detail page for a single ZoomMeeting, exposed to the template as ``meeting``."""

    model = ZoomMeeting
    context_object_name = 'meeting'
    template_name = 'meetings/meeting_details.html'
class ZoomMeetingUpdateView(UpdateView):
    """Edit a ZoomMeeting, pushing the change through ``update_zoom_meeting`` first.

    The local row is saved only when the helper reports
    ``status == "success"``.
    """

    model = ZoomMeeting
    form_class = ZoomMeetingForm
    context_object_name = 'meeting'
    template_name = 'meetings/update_meeting.html'
    success_url = '/'

    def form_valid(self, form):
        """Send the new topic/start time/duration to the helper, then save locally.

        Args:
            form: The validated ZoomMeetingForm.

        Returns:
            A redirect back to the update page when the start time is not in
            the future, otherwise a redirect to the ``meeting_details`` page.
        """
        instance = form.save(commit=False)

        if instance.start_time < timezone.now():
            messages.error(self.request, "Start time must be in the future.")
            # redirect() forwards extra keyword arguments to URL reversing
            # rather than to the response, so no ``status=`` is passed.
            return redirect(f'/update-meeting/{instance.pk}/')

        # NOTE(review): for a timezone-aware start_time, isoformat() already
        # ends with a UTC offset, so appending "Z" may produce a malformed
        # timestamp -- confirm what update_zoom_meeting expects.
        updated_data = {
            'topic': instance.topic,
            'start_time': instance.start_time.isoformat() + "Z",
            'duration': instance.duration,
        }
        details_url = reverse('meeting_details', kwargs={'pk': instance.pk})

        try:
            result = update_zoom_meeting(instance.meeting_id, updated_data)
        except Exception:
            # Mirror the create/delete views: a failing helper call becomes a
            # logged error and a flash message instead of an unhandled 500.
            logger.exception("Unexpected error while updating Zoom meeting %s", instance.meeting_id)
            messages.error(
                self.request,
                "An unexpected error occurred while updating the meeting.",
            )
            return redirect(details_url)

        if result["status"] == "success":
            instance.save()
            messages.success(self.request, result["message"])
        else:
            messages.error(self.request, result["message"])
        return redirect(details_url)
def ZoomMeetingDeleteView(request, pk):
    """Delete a meeting through ``delete_zoom_meeting`` and then locally.

    The local row is removed only when the helper reports
    ``status == "success"``. Always redirects to ``/`` with a flash message;
    responds 404 when no ZoomMeeting has the given ``pk``.

    NOTE(review): the deletion runs for any HTTP method, including GET --
    confirm whether it should be restricted to POST.
    """
    meeting = get_object_or_404(ZoomMeeting, pk=pk)
    meeting_id = meeting.meeting_id
    try:
        result = delete_zoom_meeting(meeting_id)
        if result["status"] == "success":
            meeting.delete()
            messages.success(request, result["message"])
        else:
            messages.error(request, result["message"])
    except Exception as e:
        # Keep the traceback in the log; the flash message only carries str(e).
        logger.exception("Unexpected error while deleting Zoom meeting %s", meeting_id)
        messages.error(request, str(e))
    return redirect('/')
#Job Posting
def job_list(request):
    """List job postings, newest first, optionally narrowed by ``?status=``.

    Results are paginated 10 per page; the page comes from ``?page=``.
    """
    status = request.GET.get('status')

    postings = JobPosting.objects.all().order_by('-created_at')
    if status:
        postings = postings.filter(status=status)

    page_obj = Paginator(postings, 10).get_page(request.GET.get('page'))
    context = {
        'page_obj': page_obj,
        'status_filter': status,
    }
    return render(request, 'jobs/job_list.html', context)
def create_job(request):
    """Create a new job posting.

    GET renders an empty JobPostingForm. POST validates the form, fills in
    ``created_by`` and saves; on success it redirects to ``job_list``,
    otherwise it re-renders ``jobs/create_job.html`` with the bound form and
    an error message.
    """
    is_anonymous = not request.user.is_authenticated

    if request.method == 'POST':
        form = JobPostingForm(request.POST, is_anonymous_user=is_anonymous)
        if form.is_valid():
            try:
                job = form.save(commit=False)
                if request.user.is_authenticated:
                    job.created_by = request.user.get_full_name() or request.user.username
                else:
                    job.created_by = request.POST.get('created_by', '').strip()
                # Fall back to a generic author when no name is available.
                if not job.created_by:
                    job.created_by = "University Administrator"
                job.save()
                messages.success(request, f'Job "{job.title}" created successfully!')
                return redirect('job_list')
            except Exception as e:
                # logger.exception records the traceback, not just the message.
                logger.exception("Error creating job: %s", e)
                messages.error(request, f"Error creating job: {e}")
        else:
            # NOTE(review): form.errors is interpolated into the flash message
            # as-is -- confirm this is intended.
            messages.error(request, f'Please correct the errors below.{form.errors}')
    else:
        form = JobPostingForm(is_anonymous_user=is_anonymous)

    return render(request, 'jobs/create_job.html', {'form': form})
def edit_job(request, slug):
    """Edit an existing job posting identified by ``slug``.

    GET renders the form pre-filled from the job. POST validates and saves;
    on success it redirects to ``job_list``, otherwise it re-renders
    ``jobs/edit_job.html`` with the bound form and an error message.
    Responds 404 when no JobPosting has the given slug.
    """
    # The same lookup is needed for both GET and POST, so do it once.
    job = get_object_or_404(JobPosting, slug=slug)
    is_anonymous = not request.user.is_authenticated

    if request.method == 'POST':
        form = JobPostingForm(request.POST, instance=job, is_anonymous_user=is_anonymous)
        if form.is_valid():
            try:
                job = form.save(commit=False)
                # NOTE(review): created_by is overwritten with the current
                # editor on every update -- confirm this is intended.
                if request.user.is_authenticated:
                    job.created_by = request.user.get_full_name() or request.user.username
                else:
                    job.created_by = request.POST.get('created_by', '').strip()
                # Fall back to a generic author when no name is available.
                if not job.created_by:
                    job.created_by = "University Administrator"
                job.save()
                messages.success(request, f'Job "{job.title}" updated successfully!')
                return redirect('job_list')
            except Exception as e:
                # logger.exception records the traceback, not just the message.
                logger.exception("Error updating job: %s", e)
                messages.error(request, f"Error updating job: {e}")
        else:
            messages.error(request, 'Please correct the errors below.')
    else:
        form = JobPostingForm(instance=job, is_anonymous_user=is_anonymous)

    return render(request, 'jobs/edit_job.html', {'form': form, 'job': job})
def job_detail(request, slug):
    """Show one job posting with its candidates and per-stage counts.

    Responds 404 when no JobPosting has the given slug.
    """
    job = get_object_or_404(JobPosting, slug=slug)

    # Most recently created candidates first.
    candidates = job.candidates.all().order_by('-created_at')

    def count_in_stage(stage_name):
        """Count this job's candidates currently in ``stage_name``."""
        return candidates.filter(stage=stage_name).count()

    context = {
        'job': job,
        'candidates': candidates,
        'total_candidates': candidates.count(),
        'applied_count': count_in_stage('Applied'),
        'interview_count': count_in_stage('Interview'),
        'offer_count': count_in_stage('Offer'),
    }
    return render(request, 'jobs/job_detail.html', context)
def post_to_linkedin(request, slug):
    """Post a job to LinkedIn.

    Only jobs whose status is ``'ACTIVE'`` are eligible; others redirect to
    ``job_list``. On POST the stored LinkedIn fields are reset, the job is
    posted through LinkedInService using the access token kept in the
    session, and the outcome is written back to the job. Any other method
    just redirects to the job detail page.
    """
    job = get_object_or_404(JobPosting, slug=slug)
    if job.status != 'ACTIVE':
        messages.info(request, 'Only active jobs can be posted to LinkedIn.')
        return redirect('job_list')

    if request.method == 'POST':
        try:
            # Check if user is authenticated with LinkedIn
            if 'linkedin_access_token' not in request.session:
                messages.error(request, 'Please authenticate with LinkedIn first.')
                return redirect('linkedin_login')

            # Clear previous LinkedIn data for re-posting
            job.posted_to_linkedin = False
            job.linkedin_post_id = ''
            job.linkedin_post_url = ''
            job.linkedin_post_status = ''
            job.linkedin_posted_at = None
            job.save()

            # Initialize LinkedIn service
            service = LinkedInService()
            service.access_token = request.session['linkedin_access_token']

            # Post to LinkedIn
            result = service.create_job_post(job)
            if result['success']:
                # Update job with LinkedIn info
                job.posted_to_linkedin = True
                job.linkedin_post_id = result['post_id']
                job.linkedin_post_url = result['post_url']
                job.linkedin_post_status = 'SUCCESS'
                job.linkedin_posted_at = timezone.now()
                job.save()
                messages.success(request, 'Job posted to LinkedIn successfully!')
            else:
                error_msg = result.get('error', 'Unknown error')
                job.linkedin_post_status = f'ERROR: {error_msg}'
                job.save()
                messages.error(request, f'Error posting to LinkedIn: {error_msg}')
        except Exception as e:
            # logger.exception records the traceback, not just the message.
            logger.exception("Error in post_to_linkedin: %s", e)
            job.linkedin_post_status = f'ERROR: {str(e)}'
            job.save()
            messages.error(request, f'Error posting to LinkedIn: {e}')

    return redirect('job_detail', slug=job.slug)
def linkedin_login(request):
    """Redirect to LinkedIn OAuth.

    The URL obtained from ``LinkedInService.get_auth_url()`` is a special URL
    that:
      * sends the user to LinkedIn to log in,
      * asks the user to grant the app permission to post on their behalf,
      * tells LinkedIn where to send the user back after they approve
        (the redirect_uri), e.g.
        http://yoursite.com/linkedin/callback/?code=TEMPORARY_CODE_HERE
    """
    linkedin = LinkedInService()
    return redirect(linkedin.get_auth_url())
def linkedin_callback(request):
    """Handle LinkedIn OAuth callback.

    Reads the ``code`` query parameter, exchanges it for an access token via
    ``LinkedInService.get_access_token`` and stores the token in the session.
    Always redirects to ``job_list`` with a flash message describing the
    outcome.
    """
    code = request.GET.get('code')
    if not code:
        messages.error(request, 'No authorization code received from LinkedIn.')
        return redirect('job_list')

    try:
        service = LinkedInService()
        # Exchange the temporary authorization code for an access token.
        access_token = service.get_access_token(code)
        request.session['linkedin_access_token'] = access_token
        request.session['linkedin_authenticated'] = True
        messages.success(request, 'Successfully authenticated with LinkedIn!')
    except Exception as e:
        # logger.exception records the traceback, not just the message.
        logger.exception("LinkedIn authentication error: %s", e)
        messages.error(request, f'LinkedIn authentication failed: {e}')
    return redirect('job_list')
#applicant views
def applicant_job_detail(request, slug):
    """Render the applicant-facing page for a job; 404 unless its status is 'ACTIVE'."""
    active_job = get_object_or_404(JobPosting, slug=slug, status='ACTIVE')
    context = {'job': active_job}
    return render(request, 'jobs/applicant_job_detail.html', context)
def form_builder(request):
    """Render the form builder page."""
    template_name = 'form_builder.html'
    return render(request, template_name)
# Form Preview Views
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.core.paginator import Paginator
from django.contrib.auth.decorators import login_required
import json
def form_list(request):
    """List active forms, newest first, 12 per page (page taken from ``?page=``)."""
    active_forms = Form.objects.filter(is_active=True).order_by('-created_at')
    page_obj = Paginator(active_forms, 12).get_page(request.GET.get('page'))
    context = {'page_obj': page_obj}
    return render(request, 'forms/form_list.html', context)
def form_preview(request, form_id):
    """Render the end-user preview of an active form.

    The context carries the form, its number of submissions, and ``is_embed``,
    which is true only when the query string has ``embed=true``.
    """
    form = get_object_or_404(Form, id=form_id, is_active=True)
    embed_flag = request.GET.get('embed', 'false')
    context = {
        'form': form,
        'submission_count': form.submissions.count(),
        'is_embed': embed_flag == 'true',
    }
    return render(request, 'forms/form_preview.html', context)
@csrf_exempt
def form_submit(request, form_id):
    """Handle form submission via AJAX.

    Accepts POST only (405 otherwise) for an active form (404 otherwise).
    Stores the posted fields as one FormSubmission and each uploaded file as
    an UploadedFile linked to it.

    Returns:
        JsonResponse with ``success`` and either ``submission_id`` or an
        ``error`` message (status 500 on unexpected failure).
    """
    # Local import: ``transaction`` is not among this module's top-level imports.
    from django.db import transaction

    if request.method != 'POST':
        return JsonResponse({'success': False, 'error': 'Only POST method allowed'}, status=405)

    form = get_object_or_404(Form, id=form_id, is_active=True)

    try:
        # Collect regular fields. lists() keeps every value posted under a
        # key (e.g. checkbox groups); items() would keep only the last one.
        # Single-valued fields are still stored as plain strings.
        submission_data = {}
        for key, values in request.POST.lists():
            if key == 'csrfmiddlewaretoken':
                continue
            submission_data[key] = values[0] if len(values) == 1 else values

        # Collect every uploaded file, including several under one field name.
        uploads = [
            (field_id, upload)
            for field_id, field_files in request.FILES.lists()
            for upload in field_files
            if upload
        ]

        # Create the submission and its file rows together so a failure while
        # recording a file does not leave a half-recorded submission behind.
        with transaction.atomic():
            submission = FormSubmission.objects.create(
                form=form,
                submission_data=submission_data,
                ip_address=request.META.get('REMOTE_ADDR'),
                user_agent=request.META.get('HTTP_USER_AGENT', ''),
            )
            for field_id, upload in uploads:
                UploadedFile.objects.create(
                    submission=submission,
                    field_id=field_id,
                    file=upload,
                    original_filename=upload.name,
                )

        # TODO: Send email notification if configured
        return JsonResponse({
            'success': True,
            'message': 'Form submitted successfully!',
            'submission_id': submission.id,
        })
    except Exception as e:
        # logger.exception records the traceback, not just the message.
        logger.exception("Error submitting form %s: %s", form_id, e)
        return JsonResponse({
            'success': False,
            'error': 'An error occurred while submitting the form. Please try again.',
        }, status=500)
def form_embed(request, form_id):
    """Render the embeddable version of an active form (404 if missing or inactive)."""
    embedded_form = get_object_or_404(Form, id=form_id, is_active=True)
    context = {'form': embedded_form, 'is_embed': True}
    return render(request, 'forms/form_embed.html', context)
@login_required
def save_form_builder(request):
    """Save form from builder to database.

    Expects a JSON object body with a ``form`` object and an optional
    ``form_id``. With ``form_id`` the current user's existing form is
    updated; without it a new form owned by the current user is created.

    Returns:
        JsonResponse with ``success`` and ``form_id`` on success; status 405
        for non-POST, 400 for malformed JSON, 404 when ``form_id`` does not
        match a form owned by the user, 500 on unexpected failure.
    """
    if request.method != 'POST':
        return JsonResponse({'success': False, 'error': 'Only POST method allowed'}, status=405)

    invalid_json = {'success': False, 'error': 'Invalid JSON data'}
    try:
        data = json.loads(request.body)
    except (json.JSONDecodeError, UnicodeDecodeError):
        return JsonResponse(invalid_json, status=400)

    # Valid JSON that is not an object (a list, a number, "form": null, ...)
    # is a client error too, not a server failure.
    form_data = data.get('form', {}) if isinstance(data, dict) else None
    if not isinstance(form_data, dict):
        return JsonResponse(invalid_json, status=400)

    form_id = data.get('form_id')
    try:
        if form_id:
            # Update existing form (must belong to the current user)
            form = Form.objects.get(id=form_id, created_by=request.user)
            form.title = form_data.get('title', 'Untitled Form')
            form.description = form_data.get('description', '')
            form.structure = form_data
            form.save()
        else:
            # Create new form
            form = Form.objects.create(
                title=form_data.get('title', 'Untitled Form'),
                description=form_data.get('description', ''),
                structure=form_data,
                created_by=request.user,
            )
    except Form.DoesNotExist:
        # An unknown or foreign form_id is "not found", not a server error.
        return JsonResponse({'success': False, 'error': 'Form not found'}, status=404)
    except Exception as e:
        # logger.exception records the traceback, not just the message.
        logger.exception("Error saving form: %s", e)
        return JsonResponse({
            'success': False,
            'error': 'An error occurred while saving the form',
        }, status=500)

    return JsonResponse({
        'success': True,
        'form_id': form.id,
        'message': 'Form saved successfully!',
    })
@login_required
def load_form(request, form_id):
    """Load form data for editing in builder.

    Returns:
        JsonResponse with the form's id, title, description and structure;
        status 404 when no form with this id is owned by the current user,
        500 on unexpected failure.
    """
    # Local import: Http404 is not among this module's top-level imports.
    from django.http import Http404

    try:
        form = get_object_or_404(Form, id=form_id, created_by=request.user)
        return JsonResponse({
            'success': True,
            'form': {
                'id': form.id,
                'title': form.title,
                'description': form.description,
                'structure': form.structure,
            },
        })
    except Http404:
        # Handled before the generic branch so a missing form is reported as
        # 404 rather than being swallowed into a 500.
        return JsonResponse({'success': False, 'error': 'Form not found'}, status=404)
    except Exception as e:
        # logger.exception records the traceback, not just the message.
        logger.exception("Error loading form %s: %s", form_id, e)
        return JsonResponse({
            'success': False,
            'error': 'An error occurred while loading the form',
        }, status=500)
@csrf_exempt
def update_form_builder(request, form_id):
    """Update existing form from builder.

    Expects a JSON object body with a ``form`` object. Only the user stored
    in ``created_by`` may update the form.

    Returns:
        JsonResponse with ``success`` and ``form_id`` on success; status 405
        for non-POST, 404 for an unknown form, 403 for a non-owner, 400 for
        malformed JSON, 500 on unexpected failure.

    NOTE(review): this state-changing, owner-only endpoint is CSRF-exempt --
    confirm that is intended.
    """
    # Local import: Http404 is not among this module's top-level imports.
    from django.http import Http404

    if request.method != 'POST':
        return JsonResponse({'success': False, 'error': 'Only POST method allowed'}, status=405)

    try:
        form = get_object_or_404(Form, id=form_id)

        # Check if user has permission to edit this form
        if form.created_by != request.user:
            return JsonResponse({
                'success': False,
                'error': 'You do not have permission to edit this form',
            }, status=403)

        data = json.loads(request.body)
        # Valid JSON that is not an object (a list, "form": null, ...) is a
        # client error, not a server failure.
        form_data = data.get('form', {}) if isinstance(data, dict) else None
        if not isinstance(form_data, dict):
            return JsonResponse({'success': False, 'error': 'Invalid JSON data'}, status=400)

        # Update form
        form.title = form_data.get('title', 'Untitled Form')
        form.description = form_data.get('description', '')
        form.structure = form_data
        form.save()

        return JsonResponse({
            'success': True,
            'form_id': form.id,
            'message': 'Form updated successfully!',
        })
    except Http404:
        # Handled before the generic branch so a missing form is reported as
        # 404 rather than being swallowed into a 500.
        return JsonResponse({'success': False, 'error': 'Form not found'}, status=404)
    except json.JSONDecodeError:
        return JsonResponse({
            'success': False,
            'error': 'Invalid JSON data',
        }, status=400)
    except Exception as e:
        # logger.exception records the traceback, not just the message.
        logger.exception("Error updating form %s: %s", form_id, e)
        return JsonResponse({
            'success': False,
            'error': 'An error occurred while updating the form',
        }, status=500)
def edit_form(request, form_id):
    """Render the edit page for a form, but only for the user who created it.

    Anyone else is sent back to ``form_list`` with an error message; an
    unknown ``form_id`` responds 404.
    """
    target_form = get_object_or_404(Form, id=form_id)

    # Guard clause: only the creator may open the editor.
    if target_form.created_by != request.user:
        messages.error(request, 'You do not have permission to edit this form.')
        return redirect('form_list')

    context = {'form': target_form}
    return render(request, 'forms/edit_form.html', context)
@login_required
def form_submissions(request, form_id):
    """View submissions for a specific form owned by the current user.

    Submissions are listed newest first, 20 per page (page from ``?page=``).
    Responds 404 when no form with this id is owned by the current user.

    Login is required because the lookup filters on
    ``created_by=request.user``, which is only meaningful for an
    authenticated user; anonymous visitors are sent to the login page
    instead of reaching that query.
    """
    form = get_object_or_404(Form, id=form_id, created_by=request.user)
    submissions = form.submissions.all().order_by('-submitted_at')

    # Pagination
    paginator = Paginator(submissions, 20)
    page_obj = paginator.get_page(request.GET.get('page'))

    return render(request, 'forms/form_submissions.html', {
        'form': form,
        'page_obj': page_obj,
    })