443 lines
18 KiB
Python
443 lines
18 KiB
Python
|
||
|
||
|
||
# def job_list(request):
|
||
# """Display list of all job postings"""
|
||
# jobs = JobPosting.objects.all().order_by('-created_at')
|
||
|
||
# # Filter by status if provided
|
||
# status = request.GET.get('status')
|
||
# if status:
|
||
# jobs = jobs.filter(status=status)
|
||
|
||
# paginator = Paginator(jobs, 10) # Show 10 jobs per page
|
||
# page_number = request.GET.get('page')
|
||
# page_obj = paginator.get_page(page_number)
|
||
|
||
# return render(request, 'jobs/job_list.html', {
|
||
# 'page_obj': page_obj,
|
||
# 'status_filter': status
|
||
# })
|
||
|
||
|
||
# def create_job(request):
|
||
# """Create a new job posting"""
|
||
# if request.method == 'POST':
|
||
# try:
|
||
# job = JobPosting()
|
||
# job.title = request.POST.get('title', '').strip()
|
||
# job.department = request.POST.get('department', '').strip()
|
||
# job.job_type = request.POST.get('job_type', 'FULL_TIME')
|
||
# job.workplace_type = request.POST.get('workplace_type', 'ON_SITE')
|
||
# job.location_city = request.POST.get('location_city', '').strip()
|
||
# job.location_state = request.POST.get('location_state', '').strip()
|
||
# job.location_country = request.POST.get('location_country', 'United States').strip()
|
||
# job.description = request.POST.get('description', '').strip()
|
||
# job.qualifications = request.POST.get('qualifications', '').strip()
|
||
# job.salary_range = request.POST.get('salary_range', '').strip()
|
||
# job.benefits = request.POST.get('benefits', '').strip()
|
||
# job.application_url = request.POST.get('application_url', '').strip()
|
||
# job.application_deadline = request.POST.get('application_deadline') or None
|
||
# job.application_instructions = request.POST.get('application_instructions', '').strip()
|
||
# # job.created_by = request.POST.get('created_by', request.user.get_full_name() or request.user.username)
|
||
# # Handle created_by without user authentication
|
||
# job.created_by = request.POST.get('created_by', '').strip()
|
||
# if not job.created_by:
|
||
# job.created_by = "University Administrator"
|
||
|
||
# job.position_number = request.POST.get('position_number', '').strip()
|
||
# job.reporting_to = request.POST.get('reporting_to', '').strip()
|
||
# job.start_date = request.POST.get('start_date') or None
|
||
|
||
# # Set initial status
|
||
# status = request.POST.get('status', 'DRAFT')
|
||
# if status in ['ACTIVE', 'CLOSED', 'ARCHIVED']:
|
||
# job.status = status
|
||
|
||
# job.save()
|
||
# messages.success(request, f'Job "{job.title}" created successfully!')
|
||
# return redirect('job_detail', job_id=job.id)
|
||
|
||
# except Exception as e:
|
||
# logger.error(f"Error creating job: {e}")
|
||
# messages.error(request, f'Error creating job: {e}')
|
||
|
||
# return render(request, 'jobs/create_job.html')
|
||
|
||
|
||
# def job_detail(request, job_id):
|
||
# """View details of a specific job"""
|
||
# job = get_object_or_404(JobPosting, id=job_id)
|
||
# return render(request, 'jobs/job_detail.html', {'job': job})
|
||
|
||
|
||
# def edit_job(request, job_id):
|
||
# """Edit an existing job posting"""
|
||
# job = get_object_or_404(JobPosting, id=job_id)
|
||
|
||
# if request.method == 'POST':
|
||
# try:
|
||
# job.title = request.POST.get('title', job.title).strip()
|
||
# job.department = request.POST.get('department', job.department).strip()
|
||
# job.job_type = request.POST.get('job_type', job.job_type)
|
||
# job.workplace_type = request.POST.get('workplace_type', job.workplace_type)
|
||
# job.location_city = request.POST.get('location_city', job.location_city).strip()
|
||
# job.location_state = request.POST.get('location_state', job.location_state).strip()
|
||
# job.location_country = request.POST.get('location_country', job.location_country).strip()
|
||
# job.description = request.POST.get('description', job.description).strip()
|
||
# job.qualifications = request.POST.get('qualifications', job.qualifications).strip()
|
||
# job.salary_range = request.POST.get('salary_range', job.salary_range).strip()
|
||
# job.benefits = request.POST.get('benefits', job.benefits).strip()
|
||
# job.application_url = request.POST.get('application_url', job.application_url).strip()
|
||
# job.application_deadline = request.POST.get('application_deadline') or job.application_deadline
|
||
# job.application_instructions = request.POST.get('application_instructions', job.application_instructions).strip()
|
||
# job.created_by = request.POST.get('created_by', job.created_by).strip()
|
||
# job.position_number = request.POST.get('position_number', job.position_number).strip()
|
||
# job.reporting_to = request.POST.get('reporting_to', job.reporting_to).strip()
|
||
# job.start_date = request.POST.get('start_date') or job.start_date
|
||
|
||
# # Update status
|
||
# status = request.POST.get('status')
|
||
# if status and status in dict(JobPosting.STATUS_CHOICES):
|
||
# job.status = status
|
||
|
||
# job.save()
|
||
# messages.success(request, f'Job "{job.title}" updated successfully!')
|
||
# return redirect('job_detail', job_id=job.id)
|
||
|
||
# except Exception as e:
|
||
# logger.error(f"Error updating job: {e}")
|
||
# messages.error(request, f'Error updating job: {e}')
|
||
|
||
# return render(request, 'jobs/edit_job.html', {'job': job})
|
||
|
||
|
||
# def post_to_linkedin(request, job_id):
|
||
# """Post a job to LinkedIn"""
|
||
# job = get_object_or_404(JobPosting, id=job_id)
|
||
|
||
# if request.method == 'POST':
|
||
# try:
|
||
# # Check if user is authenticated with LinkedIn
|
||
# if 'linkedin_access_token' not in request.session:
|
||
# messages.error(request, 'Please authenticate with LinkedIn first.')
|
||
# return redirect('linkedin_login')
|
||
|
||
# # Initialize LinkedIn service
|
||
# service = LinkedInService()
|
||
# service.access_token = request.session['linkedin_access_token']
|
||
|
||
# # Post to LinkedIn
|
||
# result = service.create_job_post(job)
|
||
|
||
# if result['success']:
|
||
# # Update job with LinkedIn info
|
||
# job.posted_to_linkedin = True
|
||
# job.linkedin_post_id = result['post_id']
|
||
# job.linkedin_post_url = result['post_url']
|
||
# job.linkedin_post_status = 'SUCCESS'
|
||
# job.linkedin_posted_at = timezone.now()
|
||
# job.save()
|
||
|
||
# messages.success(request, 'Job posted to LinkedIn successfully!')
|
||
# else:
|
||
# error_msg = result.get('error', 'Unknown error')
|
||
# job.linkedin_post_status = f'ERROR: {error_msg}'
|
||
# job.save()
|
||
# messages.error(request, f'Error posting to LinkedIn: {error_msg}')
|
||
|
||
# except Exception as e:
|
||
# logger.error(f"Error in post_to_linkedin: {e}")
|
||
# job.linkedin_post_status = f'ERROR: {str(e)}'
|
||
# job.save()
|
||
# messages.error(request, f'Error posting to LinkedIn: {e}')
|
||
|
||
# return redirect('job_detail', job_id=job.id)
|
||
|
||
# # LinkedIn Authentication Views
|
||
# def linkedin_login(request):
|
||
# """Redirect to LinkedIn OAuth"""
|
||
# service = LinkedInService()
|
||
# auth_url = service.get_auth_url()
|
||
# return redirect(auth_url)
|
||
|
||
# def linkedin_callback(request):
|
||
# """Handle LinkedIn OAuth callback"""
|
||
# code = request.GET.get('code')
|
||
# if not code:
|
||
# messages.error(request, 'No authorization code received from LinkedIn.')
|
||
# return redirect('job_list')
|
||
|
||
# try:
|
||
# service = LinkedInService()
|
||
# access_token = service.get_access_token(code)
|
||
# request.session['linkedin_access_token'] = access_token
|
||
# request.session['linkedin_authenticated'] = True
|
||
# messages.success(request, 'Successfully authenticated with LinkedIn!')
|
||
# except Exception as e:
|
||
# logger.error(f"LinkedIn authentication error: {e}")
|
||
# messages.error(request, f'LinkedIn authentication failed: {e}')
|
||
|
||
# return redirect('job_list')
|
||
|
||
# @csrf_exempt
|
||
# def api_post_job_to_linkedin(request, job_id):
|
||
# """API endpoint to post job to LinkedIn (for automated systems)"""
|
||
# if request.method != 'POST':
|
||
# return JsonResponse({'error': 'POST method required'}, status=405)
|
||
|
||
# try:
|
||
# job = JobPosting.objects.get(id=job_id)
|
||
|
||
# # Check if LinkedIn is authenticated
|
||
# if 'linkedin_access_token' not in request.session:
|
||
# return JsonResponse({'error': 'Not authenticated with LinkedIn'}, status=401)
|
||
|
||
# service = LinkedInService()
|
||
# service.access_token = request.session['linkedin_access_token']
|
||
# result = service.create_job_post(job)
|
||
|
||
# if result['success']:
|
||
# job.posted_to_linkedin = True
|
||
# job.linkedin_post_id = result['post_id']
|
||
# job.linkedin_post_url = result['post_url']
|
||
# job.linkedin_post_status = 'SUCCESS'
|
||
# job.linkedin_posted_at = timezone.now()
|
||
# job.save()
|
||
|
||
# return JsonResponse({
|
||
# 'success': True,
|
||
# 'linkedin_post_id': result['post_id'],
|
||
# 'linkedin_post_url': result['post_url']
|
||
# })
|
||
# else:
|
||
# return JsonResponse({
|
||
# 'error': result.get('error', 'Unknown error'),
|
||
# 'status_code': result.get('status_code', 500)
|
||
# }, status=400)
|
||
|
||
# except JobPosting.DoesNotExist:
|
||
# return JsonResponse({'error': 'Job not found'}, status=404)
|
||
# except Exception as e:
|
||
# logger.error(f"API error: {e}")
|
||
# return JsonResponse({'error': str(e)}, status=500)
|
||
|
||
# from .forms import JobPostingForm
|
||
# def form_check(request):
|
||
# form=JobPostingForm()
|
||
# return render(request,'jobs/new_form.html',{'form':form})
|
||
|
||
from .import forms
|
||
from .import models
|
||
from django.shortcuts import render, get_object_or_404, redirect
|
||
from django.views.decorators.http import require_POST
|
||
from django.core.paginator import Paginator
|
||
from django.utils import timezone
|
||
from django.contrib import messages
|
||
from .linkedin_service import LinkedInService
|
||
import json
|
||
import logging
|
||
logger=logging.getLogger(__name__)
|
||
|
||
|
||
def job_list(request):
|
||
"""Display the list of job postings order by creation date descending"""
|
||
jobs=models.JobPosting.objects.all().order_by('-created_at')
|
||
|
||
# Filter by status if provided
|
||
status=request.GET.get('status')
|
||
if status:
|
||
jobs=jobs.filter(status=status)
|
||
|
||
#pagination
|
||
paginator=Paginator(jobs,10) # Show 10 jobs per page
|
||
page_number=request.GET.get('page')
|
||
page_obj=paginator.get_page(page_number)
|
||
return render(request, 'jobs/job_list.html', {
|
||
'page_obj': page_obj,
|
||
'status_filter': status
|
||
})
|
||
|
||
|
||
|
||
def create_job(request):
|
||
"""Create a new job posting"""
|
||
if request.method=='POST':
|
||
form=forms.JobPostingForm(request.POST,is_anonymous_user=not request.user.is_authenticated) # pass the boolean value
|
||
#to check user is authenticated or not
|
||
if form.is_valid():
|
||
try:
|
||
job=form.save(commit=False)
|
||
if request.user.is_authenticated:
|
||
job.created_by=request.user.get_full_name() or request.user.username
|
||
else:
|
||
job.created_by=request.POST.get('created_by','').strip()
|
||
if not job.created_by:
|
||
job.created_by="University Administrator"
|
||
job.save()
|
||
messages.success(request,f'Job "{job.title}" created successfully!')
|
||
return redirect('jobs:job_list')
|
||
except Exception as e:
|
||
logger.error(f"Error creating job: {e}")
|
||
messages.error(request,f"Error creating job: {e}")
|
||
else:
|
||
messages.error(request, 'Please correct the errors below.')
|
||
else:
|
||
form=forms.JobPostingForm(is_anonymous_user=not request.user.is_authenticated)
|
||
return render(request,'jobs/create_job.html',{'form':form})
|
||
|
||
|
||
|
||
|
||
|
||
def edit_job(request,job_id):
|
||
"""Edit an existing job posting"""
|
||
if request.method=='POST':
|
||
job=get_object_or_404(models.JobPosting,pk=job_id)
|
||
form=forms.JobPostingForm(request.POST,instance=job,is_anonymous_user=not request.user.is_authenticated)
|
||
if form.is_valid():
|
||
try:
|
||
job=form.save(commit=False)
|
||
if request.user.is_authenticated:
|
||
job.created_by=request.user.get_full_name() or request.user.username
|
||
else:
|
||
job.created_by=request.POST.get('created_by','').strip()
|
||
if not job.created_by:
|
||
job.created_by="University Administrator"
|
||
job.save()
|
||
messages.success(request,f'Job "{job.title}" updated successfully!')
|
||
return redirect('jobs:job_list')
|
||
except Exception as e:
|
||
logger.error(f"Error updating job: {e}")
|
||
messages.error(request,f"Error updating job: {e}")
|
||
else:
|
||
messages.error(request, 'Please correct the errors below.')
|
||
else:
|
||
job=get_object_or_404(models.JobPosting,pk=job_id)
|
||
form=forms.JobPostingForm(instance=job,is_anonymous_user=not request.user.is_authenticated)
|
||
return render(request,'jobs/edit_job.html',{'form':form,'job_id':job_id})
|
||
|
||
def job_detail(request, job_id):
|
||
"""View details of a specific job"""
|
||
job = get_object_or_404(models.JobPosting, pk=job_id)
|
||
image_form=forms.PostImageUploadForm()
|
||
return render(request, 'jobs/job_detail.html', {'job': job,'image_form':image_form})
|
||
|
||
@require_POST
|
||
def create_jobpost_image(request,job_id):
|
||
"""Handle image upload for a job posting"""
|
||
job=get_object_or_404(models.JobPosting,pk=job_id)
|
||
form=forms.PostImageUploadForm(request.POST,request.FILES)
|
||
if form.is_valid():
|
||
try:
|
||
post_image=form.save(commit=False)
|
||
post_image.job_posting=job
|
||
post_image.save()
|
||
messages.success(request,'Image uploaded successfully!')
|
||
except Exception as e:
|
||
logger.error(f"Error uploading image: {e}")
|
||
messages.error(request,f'Error uploading image: {e}')
|
||
else:
|
||
messages.error(request,'Please correct the errors below.')
|
||
return redirect('jobs:job_detail',job_id=job.pk)
|
||
|
||
|
||
|
||
|
||
def post_to_linkedin(request,job_id):
|
||
"""Post a job to LinkedIn"""
|
||
job=get_object_or_404(models.JobPosting,pk=job_id)
|
||
if job.status!='ACTIVE':
|
||
messages.info(request,'Only active jobs can be posted to LinkedIn.')
|
||
return redirect('jobs:job_list')
|
||
|
||
if request.method=='POST':
|
||
try:
|
||
# Check if user is authenticated with LinkedIn
|
||
if 'linkedin_access_token' not in request.session:
|
||
messages.error(request,'Please authenticate with LinkedIn first.')
|
||
return redirect('linkedin_login')
|
||
|
||
# Clear previous LinkedIn data for re-posting
|
||
job.posted_to_linkedin=False
|
||
job.linkedin_post_id=''
|
||
job.linkedin_post_url=''
|
||
job.linkedin_post_status=''
|
||
job.linkedin_posted_at=None
|
||
job.save()
|
||
|
||
# Initialize LinkedIn service
|
||
service=LinkedInService()
|
||
service.access_token=request.session['linkedin_access_token']
|
||
|
||
# Post to LinkedIn
|
||
result=service.create_job_post(job)
|
||
if result['success']:
|
||
# Update job with LinkedIn info
|
||
job.posted_to_linkedin=True
|
||
job.linkedin_post_id=result['post_id']
|
||
job.linkedin_post_url=result['post_url']
|
||
job.linkedin_post_status='SUCCESS'
|
||
job.linkedin_posted_at=timezone.now()
|
||
job.save()
|
||
|
||
messages.success(request,'Job posted to LinkedIn successfully!')
|
||
else:
|
||
error_msg=result.get('error','Unknown error')
|
||
job.linkedin_post_status=f'ERROR: {error_msg}'
|
||
job.save()
|
||
messages.error(request,f'Error posting to LinkedIn: {error_msg}')
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error in post_to_linkedin: {e}")
|
||
job.linkedin_post_status = f'ERROR: {str(e)}'
|
||
job.save()
|
||
messages.error(request, f'Error posting to LinkedIn: {e}')
|
||
|
||
return redirect('jobs:job_detail', job_id=job.pk)
|
||
|
||
def linkedin_login(request):
|
||
"""Redirect to LinkedIn OAuth"""
|
||
service=LinkedInService()
|
||
auth_url=service.get_auth_url()
|
||
"""
|
||
It creates a special URL that:
|
||
Sends the user to LinkedIn to log in
|
||
Asks the user to grant your app permission to post on their behalf
|
||
Tells LinkedIn where to send the user back after they approve (your redirect_uri)
|
||
http://yoursite.com/linkedin/callback/?code=TEMPORARY_CODE_HERE
|
||
"""
|
||
return redirect(auth_url)
|
||
|
||
|
||
def linkedin_callback(request):
|
||
"""Handle LinkedIn OAuth callback"""
|
||
code=request.GET.get('code')
|
||
if not code:
|
||
messages.error(request,'No authorization code received from LinkedIn.')
|
||
return redirect('jobs:job_list')
|
||
|
||
try:
|
||
service=LinkedInService()
|
||
#get_access_token(code)->It makes a POST request to LinkedIn’s token endpoint with parameters
|
||
access_token=service.get_access_token(code)
|
||
request.session['linkedin_access_token']=access_token
|
||
request.session['linkedin_authenticated']=True
|
||
messages.success(request,'Successfully authenticated with LinkedIn!')
|
||
except Exception as e:
|
||
logger.error(f"LinkedIn authentication error: {e}")
|
||
messages.error(request,f'LinkedIn authentication failed: {e}')
|
||
|
||
return redirect('jobs:job_list')
|
||
|
||
|
||
|
||
|
||
#applicant views
|
||
def applicant_job_detail(request,job_id):
|
||
"""View job details for applicants"""
|
||
job=get_object_or_404(models.JobPosting,pk=job_id,status='ACTIVE')
|
||
return render(request,'jobs/applicant_job_detail.html',{'job':job})
|
||
|
||
|