# kaauh_ats/recruitment/views_frontend.py
#
# 861 lines
# 31 KiB
# Python

import json
import csv
from datetime import datetime
from django.shortcuts import render, get_object_or_404,redirect
from django.contrib import messages
from django.http import JsonResponse, HttpResponse
from django.db.models.fields.json import KeyTextTransform
from recruitment.utils import json_to_markdown_table
from django.db.models import Count, Avg, F, FloatField
from django.db.models.functions import Cast
from . import models
from django.utils.translation import get_language
from . import forms
from django.contrib.auth.decorators import login_required
import ast
from django.template.loader import render_to_string
from django.utils.text import slugify
# from .dashboard import get_dashboard_data
from django.contrib.auth.mixins import LoginRequiredMixin
from django.contrib.messages.views import SuccessMessageMixin
from django.views.generic import ListView, CreateView, UpdateView, DeleteView, DetailView
# JobForm removed - using JobPostingForm instead
from django.urls import reverse_lazy
from django.db.models import Q, Count, Avg
from django.db.models import FloatField
from datastar_py.django import (
DatastarResponse,
ServerSentEventGenerator as SSE,
read_signals,
)
# from rich import print
from rich.markdown import CodeBlock
class JobListView(LoginRequiredMixin, ListView):
    """Paginated list of job postings with text search and status filtering."""

    model = models.JobPosting
    template_name = 'jobs/job_list.html'
    context_object_name = 'jobs'
    paginate_by = 10

    def get_queryset(self):
        """Return job postings, newest first, narrowed by the GET parameters.

        ``search`` is matched case-insensitively against title, description
        and department; ``status`` is an exact match.  Users who are not
        staff are additionally restricted to ``status='Published'``.
        """
        params = self.request.GET
        postings = super().get_queryset().order_by('-created_at')

        term = params.get('search', '')
        if term:
            text_match = (
                Q(title__icontains=term)
                | Q(description__icontains=term)
                | Q(department__icontains=term)
            )
            postings = postings.filter(text_match)

        # Non-staff users only ever see published postings.
        if not self.request.user.is_staff:
            postings = postings.filter(status='Published')

        requested_status = params.get('status')
        if requested_status:
            postings = postings.filter(status=requested_status)
        return postings

    def get_context_data(self, **kwargs):
        """Add the current search term and the active language code to the context."""
        context = super().get_context_data(**kwargs)
        context.update(
            search_query=self.request.GET.get('search', ''),
            lang=get_language(),
        )
        return context
class JobCreateView(LoginRequiredMixin, SuccessMessageMixin, CreateView):
    """Create a job posting with ``JobPostingForm``, then return to the job list."""

    # What is created and which form edits it.
    model = models.JobPosting
    form_class = forms.JobPostingForm

    # Presentation.
    template_name = 'jobs/create_job.html'

    # Outcome of a successful save.
    success_message = 'Job created successfully.'
    success_url = reverse_lazy('job_list')
class JobUpdateView(LoginRequiredMixin, SuccessMessageMixin, UpdateView):
    """Edit a job posting, looked up by the ``slug`` URL keyword, with ``JobPostingForm``."""

    # What is edited, how it is found in the URL, and which form edits it.
    model = models.JobPosting
    slug_url_kwarg = 'slug'
    form_class = forms.JobPostingForm

    # Presentation.
    template_name = 'jobs/edit_job.html'

    # Outcome of a successful save.
    success_message = 'Job updated successfully.'
    success_url = reverse_lazy('job_list')
class JobDeleteView(LoginRequiredMixin, SuccessMessageMixin, DeleteView):
    """Delete a job posting, looked up by the ``slug`` URL keyword."""

    # What is deleted and how it is found in the URL.
    model = models.JobPosting
    slug_url_kwarg = 'slug'

    # Confirmation is rendered from the delete-modal partial.
    template_name = 'jobs/partials/delete_modal.html'

    # Outcome of a successful deletion.
    success_message = 'Job deleted successfully.'
    success_url = reverse_lazy('job_list')
class JobCandidatesListView(LoginRequiredMixin, ListView):
    """Paginated list of the candidates who applied to one job posting."""

    model = models.Candidate
    template_name = 'jobs/job_candidates_list.html'
    context_object_name = 'candidates'
    paginate_by = 10

    def get_queryset(self):
        """Return the job's candidates, newest first, narrowed by GET filters.

        The job is resolved from the ``slug`` URL keyword (404 when unknown)
        and remembered on ``self.job`` for ``get_context_data``.  Users who
        are not staff always receive an empty queryset.
        """
        self.job = get_object_or_404(models.JobPosting, slug=self.kwargs['slug'])

        # Non-staff users never see candidates, whatever filters they pass.
        if not self.request.user.is_staff:
            return models.Candidate.objects.none()

        params = self.request.GET
        applicants = models.Candidate.objects.filter(job=self.job)

        wanted_stage = params.get('stage')
        if wanted_stage:
            applicants = applicants.filter(stage=wanted_stage)

        term = params.get('search', '')
        if term:
            text_match = (
                Q(first_name__icontains=term)
                | Q(last_name__icontains=term)
                | Q(email__icontains=term)
                | Q(phone__icontains=term)
                | Q(stage__icontains=term)
            )
            applicants = applicants.filter(text_match)

        return applicants.order_by('-created_at')

    def get_context_data(self, **kwargs):
        """Add the current search term and the resolved job to the context."""
        context = super().get_context_data(**kwargs)
        context.update(
            search_query=self.request.GET.get('search', ''),
            job=getattr(self, 'job', None),
        )
        return context
class CandidateListView(LoginRequiredMixin, ListView):
    """Paginated list of all candidates with search, job and stage filters."""

    model = models.Candidate
    template_name = 'recruitment/candidate_list.html'
    context_object_name = 'candidates'
    paginate_by = 100

    def get_queryset(self):
        """Return candidates, newest first, narrowed by the GET parameters.

        ``search`` matches names, email, phone, stage and job title
        case-insensitively; ``job`` is a job slug; ``stage`` is an exact
        match.  Users who are not staff always receive an empty queryset.
        """
        # Non-staff users never see candidates, whatever filters they pass.
        if not self.request.user.is_staff:
            return models.Candidate.objects.none()

        params = self.request.GET
        people = super().get_queryset()

        term = params.get('search', '')
        if term:
            text_match = (
                Q(first_name__icontains=term)
                | Q(last_name__icontains=term)
                | Q(email__icontains=term)
                | Q(phone__icontains=term)
                | Q(stage__icontains=term)
                | Q(job__title__icontains=term)
            )
            people = people.filter(text_match)

        job_slug = params.get('job', '')
        if job_slug:
            people = people.filter(job__slug=job_slug)

        wanted_stage = params.get('stage', '')
        if wanted_stage:
            people = people.filter(stage=wanted_stage)

        return people.order_by('-created_at')

    def get_context_data(self, **kwargs):
        """Add the active filter values and the list of jobs to choose from."""
        params = self.request.GET
        context = super().get_context_data(**kwargs)
        context.update(
            search_query=params.get('search', ''),
            job_filter=params.get('job', ''),
            stage_filter=params.get('stage', ''),
            available_jobs=models.JobPosting.objects.order_by('created_at').distinct(),
        )
        return context
class CandidateCreateView(LoginRequiredMixin, SuccessMessageMixin, CreateView):
    """Create a candidate, optionally pre-bound to the job named in the URL."""

    model = models.Candidate
    form_class = forms.CandidateForm
    template_name = 'recruitment/candidate_create.html'
    success_url = reverse_lazy('candidate_list')
    success_message = 'Candidate created successfully.'

    def _job_from_url(self):
        """Return the job for the ``slug`` URL keyword, or ``None`` if the URL has none.

        Raises ``Http404`` (via ``get_object_or_404``) when a slug is present
        but matches no job posting.
        """
        if 'slug' not in self.kwargs:
            return None
        return get_object_or_404(models.JobPosting, slug=self.kwargs['slug'])

    def get_initial(self):
        """Pre-select the URL's job in the form, when the URL names one."""
        initial = super().get_initial()
        url_job = self._job_from_url()
        if url_job is not None:
            initial['job'] = url_job
        return initial

    def form_valid(self, form):
        """Force the saved candidate onto the URL's job, when the URL names one."""
        url_job = self._job_from_url()
        if url_job is not None:
            form.instance.job = url_job
        return super().form_valid(form)
class CandidateUpdateView(LoginRequiredMixin, SuccessMessageMixin, UpdateView):
    """Edit a candidate, looked up by the ``slug`` URL keyword, with ``CandidateForm``."""

    # What is edited, how it is found in the URL, and which form edits it.
    model = models.Candidate
    slug_url_kwarg = 'slug'
    form_class = forms.CandidateForm

    # Presentation.
    template_name = 'recruitment/candidate_update.html'

    # Outcome of a successful save.
    success_message = 'Candidate updated successfully.'
    success_url = reverse_lazy('candidate_list')
class CandidateDeleteView(LoginRequiredMixin, SuccessMessageMixin, DeleteView):
    """Delete a candidate, looked up by the ``slug`` URL keyword."""

    # What is deleted and how it is found in the URL.
    model = models.Candidate
    slug_url_kwarg = 'slug'

    # Presentation.
    template_name = 'recruitment/candidate_delete.html'

    # Outcome of a successful deletion.
    success_message = 'Candidate deleted successfully.'
    success_url = reverse_lazy('candidate_list')
# def job_detail(request, slug):
# job = get_object_or_404(models.JobPosting, slug=slug, status='Published')
# form = forms.CandidateForm()
# return render(request, 'jobs/job_detail.html', {'job': job, 'form': form})
@login_required
def training_list(request):
    """Render every training material, newest first."""
    newest_first = models.TrainingMaterial.objects.order_by('-created_at')
    context = {'materials': newest_first}
    return render(request, 'recruitment/training_list.html', context)
@login_required
def candidate_detail(request, slug):
    """Render the detail page for the candidate identified by ``slug``.

    The candidate's ``parsed_summary`` is decoded with ``ast.literal_eval``;
    when it is missing or not a valid Python literal an empty dict is used
    instead.  Staff users additionally receive an unbound
    ``CandidateStageForm`` as ``stage_form``; other users get ``None``.

    Raises ``Http404`` (via ``get_object_or_404``) when no candidate matches.
    """
    candidate = get_object_or_404(models.Candidate, slug=slug)

    try:
        parsed = ast.literal_eval(candidate.parsed_summary)
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
        # These are the errors literal_eval raises for malformed, empty or
        # non-string input; anything else is a real bug and should surface.
        parsed = {}

    # Only staff may change a candidate's stage, so only they get the form.
    stage_form = forms.CandidateStageForm() if request.user.is_staff else None

    return render(request, 'recruitment/candidate_detail.html', {
        'candidate': candidate,
        'parsed': parsed,
        'stage_form': stage_form,
    })
@login_required
def candidate_resume_template_view(request, slug):
    """Display the formatted resume template for a candidate.

    Users who are not staff get an error message and are redirected to the
    candidate list.  Raises ``Http404`` (via ``get_object_or_404``) when no
    candidate matches ``slug``.
    """
    # Imported locally: this module does not import a translation alias at
    # the top, so the bare ``_`` used here previously raised NameError.
    from django.utils.translation import gettext as _

    candidate = get_object_or_404(models.Candidate, slug=slug)
    if not request.user.is_staff:
        messages.error(request, _("You don't have permission to view this page."))
        return redirect('candidate_list')
    return render(request, 'recruitment/candidate_resume_template.html', {
        'candidate': candidate
    })
@login_required
def candidate_update_stage(request, slug):
    """Handle HTMX stage update requests for the candidate identified by ``slug``.

    Only staff users may change the stage; this mirrors ``candidate_detail``,
    which only hands the stage form to staff.  Only POST requests are
    processed.  Every path ends with a redirect back to the candidate detail
    page, carrying a success or error message.

    Raises ``Http404`` (via ``get_object_or_404``) when no candidate matches.
    """
    candidate = get_object_or_404(models.Candidate, slug=slug)
    detail_redirect = redirect("candidate_detail", slug=candidate.slug)

    # Enforce on the server what the UI already implies by hiding the form.
    if not request.user.is_staff:
        messages.error(request, "You don't have permission to update the candidate stage.")
        return detail_redirect

    # A non-POST request carries no form data; there is nothing to update.
    if request.method != "POST":
        return detail_redirect

    form = forms.CandidateStageForm(request.POST, instance=candidate)
    if form.is_valid():
        candidate.stage = form.cleaned_data['stage']
        candidate.save(update_fields=['stage'])
        messages.success(request, "Candidate Stage Updated")
    else:
        # Previously an invalid submission was dropped without any feedback.
        messages.error(request, "Candidate stage could not be updated.")
    return detail_redirect
class TrainingListView(LoginRequiredMixin, ListView):
    """Paginated list of the training materials created by the current staff user."""

    model = models.TrainingMaterial
    template_name = 'recruitment/training_list.html'
    context_object_name = 'materials'
    paginate_by = 10

    def get_queryset(self):
        """Return the requesting user's own materials, newest first.

        ``search`` matches the title case-insensitively.  Users who are not
        staff always receive an empty queryset.
        """
        user = self.request.user
        # Non-staff users never see training materials.
        if not user.is_staff:
            return models.TrainingMaterial.objects.none()

        materials = super().get_queryset()
        term = self.request.GET.get('search', '')
        if term:
            materials = materials.filter(Q(title__icontains=term))

        own_materials = materials.filter(created_by=user)
        return own_materials.order_by('-created_at')

    def get_context_data(self, **kwargs):
        """Add the current search term to the template context."""
        context = super().get_context_data(**kwargs)
        context.update(search_query=self.request.GET.get('search', ''))
        return context
class TrainingCreateView(LoginRequiredMixin, SuccessMessageMixin, CreateView):
    """Create a training material owned by the requesting user."""

    # What is created and which form edits it.
    model = models.TrainingMaterial
    form_class = forms.TrainingMaterialForm

    # Presentation.
    template_name = 'recruitment/training_create.html'

    # Outcome of a successful save.
    success_message = 'Training material created successfully.'
    success_url = reverse_lazy('training_list')

    def form_valid(self, form):
        """Record the requesting user as ``created_by`` before the normal save."""
        author = self.request.user
        form.instance.created_by = author
        return super().form_valid(form)
class TrainingUpdateView(LoginRequiredMixin, SuccessMessageMixin, UpdateView):
    """Edit a training material, looked up by the ``slug`` URL keyword."""

    # What is edited, how it is found in the URL, and which form edits it.
    model = models.TrainingMaterial
    slug_url_kwarg = 'slug'
    form_class = forms.TrainingMaterialForm

    # Presentation.
    template_name = 'recruitment/training_update.html'

    # Outcome of a successful save.
    success_message = 'Training material updated successfully.'
    success_url = reverse_lazy('training_list')
class TrainingDetailView(LoginRequiredMixin, DetailView):
    """Show one training material, looked up by the ``slug`` URL keyword."""

    # What is shown and how it is found in the URL.
    model = models.TrainingMaterial
    slug_url_kwarg = 'slug'

    # The object is exposed to the template as ``material``.
    context_object_name = 'material'
    template_name = 'recruitment/training_detail.html'
class TrainingDeleteView(LoginRequiredMixin, SuccessMessageMixin, DeleteView):
    """Delete a training material, then return to the training list."""

    # What is deleted.
    model = models.TrainingMaterial

    # Presentation.
    template_name = 'recruitment/training_delete.html'

    # Outcome of a successful deletion.
    success_message = 'Training material deleted successfully.'
    success_url = reverse_lazy('training_list')
@login_required
def dashboard_view(request):
    """Render the recruitment dashboard with global and per-job metrics.

    Global metrics: job and candidate totals, applications per job, the
    average AI match score, and the share of scored and high-potential
    (score >= 75) candidates.

    Per-job funnel: the job is chosen by the ``selected_job_id`` GET
    parameter (matched against ``internal_job_id``).  When the parameter is
    absent or matches no job, the first job by ``internal_job_id`` is used.
    When no job exists at all, the funnel counts are zero and ``my_job`` is
    ``None`` instead of the view failing.
    """
    # --- Applications per job -------------------------------------------
    jobs_with_counts = models.JobPosting.objects.annotate(
        candidate_count=Count('candidates')
    ).order_by('-candidate_count')
    total_jobs = jobs_with_counts.count()
    total_candidates = models.Candidate.objects.count()
    job_titles = [job.title for job in jobs_with_counts]
    job_app_counts = [job.candidate_count for job in jobs_with_counts]
    average_applications = round(
        jobs_with_counts.aggregate(avg_apps=Avg('candidate_count'))['avg_apps'] or 0,
        2,
    )

    # --- Candidate quality metrics --------------------------------------
    # The score is read from the JSON path
    # ai_analysis_data -> scoring_data -> match_score, extracted as text and
    # cast to float so that AVG() and numeric comparisons work on it.
    candidates_with_score = models.Candidate.objects.filter(
        is_resume_parsed=True  # only candidates that have been parsed/scored
    ).annotate(
        score_as_text=KeyTextTransform(
            'match_score',
            KeyTextTransform('scoring_data', F('ai_analysis_data'))
        )
    ).annotate(
        sortable_score=Cast('score_as_text', output_field=FloatField())
    )

    avg_match_score_result = candidates_with_score.aggregate(
        avg_score=Avg('sortable_score')
    )['avg_score']
    avg_match_score = round(avg_match_score_result or 0, 1)

    total_scored = candidates_with_score.count()
    high_potential_count = candidates_with_score.filter(
        sortable_score__gte=75
    ).count()

    # Ratios are percentages of ALL candidates; guard the empty-table case.
    if total_candidates > 0:
        scored_ratio = round((total_scored / total_candidates) * 100, 1)
        high_potential_ratio = round((high_potential_count / total_candidates) * 100, 1)
    else:
        scored_ratio = 0
        high_potential_ratio = 0

    # --- Per-job funnel --------------------------------------------------
    jobs = models.JobPosting.objects.all().order_by('internal_job_id')
    selected_job_id = request.GET.get('selected_job_id', '')
    candidate_stage = ['APPLIED', 'EXAM', 'INTERVIEW', 'OFFER']

    # filter().first() instead of get(): an unknown id must not raise
    # DoesNotExist (HTTP 500); fall back to the default job instead.
    job = None
    if selected_job_id:
        job = jobs.filter(internal_job_id=selected_job_id).first()
    if job is None:
        job = jobs.first()

    if job is not None:
        candidates_count = [
            job.screening_candidates_count,
            job.exam_candidates_count,
            job.interview_candidates_count,
            job.offer_candidates_count,
        ]
        all_candidates_count = job.all_candidates_count
    else:
        # No job postings exist yet: show an empty funnel.
        candidates_count = [0, 0, 0, 0]
        all_candidates_count = 0

    context = {
        'total_jobs': total_jobs,
        'total_candidates': total_candidates,
        'average_applications': average_applications,
        # Chart data
        'job_titles': json.dumps(job_titles),
        'job_app_counts': json.dumps(job_app_counts),
        # Quality metrics
        'avg_match_score': avg_match_score,
        'high_potential_count': high_potential_count,
        'high_potential_ratio': high_potential_ratio,
        'scored_ratio': scored_ratio,
        # Per-job funnel
        'current_job_id': selected_job_id,
        'jobs': jobs,
        'all_candidates_count': all_candidates_count,
        'candidate_stage': json.dumps(candidate_stage),
        'candidates_count': json.dumps(candidates_count),
        'my_job': job,
    }
    return render(request, 'recruitment/dashboard.html', context)
@login_required
def candidate_offer_view(request, slug):
    """View for candidates in the Offer stage of the job identified by ``slug``.

    An optional ``search`` GET parameter narrows the list by first name,
    last name, email or phone (case-insensitive).  Results are ordered
    newest first.  Raises ``Http404`` when no job matches.
    """
    job = get_object_or_404(models.JobPosting, slug=slug)
    term = request.GET.get('search', '')

    # Start from the job's own offer-stage collection.
    offered = job.offer_candidates
    if term:
        text_match = (
            Q(first_name__icontains=term)
            | Q(last_name__icontains=term)
            | Q(email__icontains=term)
            | Q(phone__icontains=term)
        )
        offered = offered.filter(text_match)

    return render(request, 'recruitment/candidate_offer_view.html', {
        'job': job,
        'candidates': offered.order_by('-created_at'),
        'search_query': term,
        'current_stage': 'Offer',
    })
@login_required
def candidate_hired_view(request, slug):
    """View for hired candidates of the job identified by ``slug``.

    "Hired" means ``offer_status == 'Accepted'``.  An optional ``search``
    GET parameter narrows the list by first name, last name, email or phone
    (case-insensitive).  Results are ordered newest first.  Raises
    ``Http404`` when no job matches.
    """
    job = get_object_or_404(models.JobPosting, slug=slug)
    term = request.GET.get('search', '')

    hired = job.candidates.filter(offer_status='Accepted')
    if term:
        text_match = (
            Q(first_name__icontains=term)
            | Q(last_name__icontains=term)
            | Q(email__icontains=term)
            | Q(phone__icontains=term)
        )
        hired = hired.filter(text_match)

    return render(request, 'recruitment/candidate_hired_view.html', {
        'job': job,
        'candidates': hired.order_by('-created_at'),
        'search_query': term,
        'current_stage': 'Hired',
    })
@login_required
def update_candidate_status(request, job_slug, candidate_slug, stage_type, status):
    """Handle exam/interview/offer status updates for one candidate of one job.

    POST: stores ``status`` in the ``<stage_type>_status`` field, stamps
    ``<stage_type>_date`` with the current time, and renders the matching
    results partial.
    Any other method: renders the matching update form.
    An unrecognised ``stage_type`` redirects to the candidate detail page for
    every method.  Previously a non-POST request with an unknown stage fell
    off the end of the function and returned ``None``, which Django rejects.

    Raises ``Http404`` when the job does not exist or the candidate does not
    belong to it.
    """
    from django.utils import timezone

    # stage_type -> (template rendered after POST, template for the form)
    stage_templates = {
        'exam': (
            'recruitment/partials/exam-results.html',
            "includes/candidate_update_exam_form.html",
        ),
        'interview': (
            'recruitment/partials/interview-results.html',
            "includes/candidate_update_interview_form.html",
        ),
        'offer': (
            'recruitment/partials/offer-results.html',
            "includes/candidate_update_offer_form.html",
        ),
    }

    job = get_object_or_404(models.JobPosting, slug=job_slug)
    candidate = get_object_or_404(models.Candidate, slug=candidate_slug, job=job)

    if stage_type not in stage_templates:
        return redirect('candidate_detail', candidate.slug)

    results_template, form_template = stage_templates[stage_type]
    context = {'candidate': candidate, 'job': job}

    if request.method != "POST":
        return render(request, form_template, context)

    # The model names its columns "<stage>_status" / "<stage>_date".
    status_field = f'{stage_type}_status'
    date_field = f'{stage_type}_date'
    setattr(candidate, status_field, status)
    setattr(candidate, date_field, timezone.now())
    candidate.save(update_fields=[status_field, date_field])
    return render(request, results_template, context)
# Stage configuration for CSV export.
# Each stage maps to:
#   'filter'  - keyword arguments selecting that stage's candidates,
#   'fields'  - the export columns, in output order,
#   'headers' - the header text for each column (same order and length as 'fields').
STAGE_CONFIG = {
    'screening': {
        'filter': {'stage': 'Applied'},
        'fields': [
            'name', 'email', 'phone', 'created_at',
            'stage', 'ai_score', 'years_experience', 'screening_rating',
            'professional_category', 'top_skills', 'strengths', 'weaknesses',
        ],
        'headers': [
            'Name', 'Email', 'Phone', 'Application Date',
            'Screening Status', 'Match Score', 'Years Experience', 'Screening Rating',
            'Professional Category', 'Top 3 Skills', 'Strengths', 'Weaknesses',
        ],
    },
    'exam': {
        'filter': {'stage': 'Exam'},
        'fields': [
            'name', 'email', 'phone', 'created_at',
            'exam_status', 'exam_date',
            'ai_score', 'years_experience', 'screening_rating',
        ],
        'headers': [
            'Name', 'Email', 'Phone', 'Application Date',
            'Exam Status', 'Exam Date',
            'Match Score', 'Years Experience', 'Screening Rating',
        ],
    },
    'interview': {
        'filter': {'stage': 'Interview'},
        'fields': [
            'name', 'email', 'phone', 'created_at',
            'interview_status', 'interview_date',
            'ai_score', 'years_experience', 'professional_category', 'top_skills',
        ],
        'headers': [
            'Name', 'Email', 'Phone', 'Application Date',
            'Interview Status', 'Interview Date',
            'Match Score', 'Years Experience', 'Professional Category', 'Top 3 Skills',
        ],
    },
    'offer': {
        'filter': {'stage': 'Offer'},
        'fields': [
            'name', 'email', 'phone', 'created_at',
            'offer_status', 'offer_date',
            'ai_score', 'years_experience', 'professional_category',
        ],
        'headers': [
            'Name', 'Email', 'Phone', 'Application Date',
            'Offer Status', 'Offer Date',
            'Match Score', 'Years Experience', 'Professional Category',
        ],
    },
    'hired': {
        # "Hired" is defined by an accepted offer rather than by a stage value.
        'filter': {'offer_status': 'Accepted'},
        'fields': [
            'name', 'email', 'phone', 'created_at',
            'offer_date',
            'ai_score', 'years_experience', 'professional_category', 'join_date',
        ],
        'headers': [
            'Name', 'Email', 'Phone', 'Application Date',
            'Hire Date',
            'Match Score', 'Years Experience', 'Professional Category', 'Join Date',
        ],
    },
}
# strftime pattern for each date-like export column.
_EXPORT_DATE_FORMATS = {
    'created_at': '%Y-%m-%d %H:%M',
    'exam_date': '%Y-%m-%d %H:%M',
    'interview_date': '%Y-%m-%d %H:%M',
    'offer_date': '%Y-%m-%d %H:%M',
    'join_date': '%Y-%m-%d',
}

# Columns copied from the candidate exactly as stored.
_EXPORT_IDENTITY_FIELDS = ('name', 'email', 'phone')

# Columns copied from the candidate with empty values rendered as ''.
_EXPORT_TEXT_FIELDS = ('stage', 'exam_status', 'interview_status', 'offer_status')

# Export column -> (candidate attribute, formatter or None for "as is").
# Failures while reading or formatting these are tolerated (best effort).
_EXPORT_DERIVED_FIELDS = {
    'ai_score': ('match_score', lambda score: f"{score}%"),
    'years_experience': ('years_of_experience', lambda years: f"{years}"),
    'screening_rating': ('screening_stage_rating', None),
    'professional_category': ('professional_category', None),
    'top_skills': ('top_3_keywords', ', '.join),
    'strengths': ('strengths', None),
    'weaknesses': ('weaknesses', None),
}


def _export_field_value(candidate, field):
    """Return the CSV cell for one export column of one candidate."""
    if field in _EXPORT_IDENTITY_FIELDS:
        return getattr(candidate, field)

    if field in _EXPORT_DATE_FORMATS:
        moment = getattr(candidate, field)
        return moment.strftime(_EXPORT_DATE_FORMATS[field]) if moment else ''

    if field in _EXPORT_TEXT_FIELDS:
        return getattr(candidate, field) or ''

    if field in _EXPORT_DERIVED_FIELDS:
        attribute, formatter = _EXPORT_DERIVED_FIELDS[field]
        try:
            value = getattr(candidate, attribute)
            if not value:
                return ''
            return formatter(value) if formatter else value
        except Exception:
            # Deliberate best effort: one unreadable value must not abort the
            # whole export.  Narrowed from a bare ``except`` so that
            # KeyboardInterrupt / SystemExit are no longer swallowed.
            return ''

    # Unknown column: fall back to a plain attribute lookup.
    return getattr(candidate, field, '')


@login_required
def export_candidates_csv(request, job_slug, stage):
    """Export candidates for a specific stage as CSV.

    ``stage`` must be a key of ``STAGE_CONFIG``; otherwise an error message
    is queued and the user is redirected to the job detail page.  The
    stage's ``filter`` selects the candidates and its ``fields``/``headers``
    define the columns; ``Job Title`` and ``Department`` columns are appended
    to every row.  An optional ``search`` GET parameter narrows the export
    by first name, last name, email or phone (case-insensitive).

    Returns a ``text/csv`` attachment that starts with a UTF-8 BOM.
    Raises ``Http404`` when no job matches ``job_slug``.
    """
    job = get_object_or_404(models.JobPosting, slug=job_slug)

    if stage not in STAGE_CONFIG:
        messages.error(request, "Invalid stage specified for export.")
        return redirect('job_detail', job.slug)
    config = STAGE_CONFIG[stage]

    # Every stage, including 'hired', is selected purely by its configured filter.
    candidates = job.candidates.filter(**config['filter'])

    search_query = request.GET.get('search', '')
    if search_query:
        candidates = candidates.filter(
            Q(first_name__icontains=search_query) |
            Q(last_name__icontains=search_query) |
            Q(email__icontains=search_query) |
            Q(phone__icontains=search_query)
        )
    candidates = candidates.order_by('-created_at')

    response = HttpResponse(content_type='text/csv')
    filename = f"{slugify(job.title)}_{stage}_{datetime.now().strftime('%Y-%m-%d')}.csv"
    response['Content-Disposition'] = f'attachment; filename="{filename}"'
    # UTF-8 BOM so that Excel detects the encoding.
    response.write('\ufeff')

    writer = csv.writer(response)
    writer.writerow([*config['headers'], 'Job Title', 'Department'])

    # The job columns are identical for every row; compute them once.
    job_columns = [job.title, job.department or '']
    for candidate in candidates:
        row = [_export_field_value(candidate, field) for field in config['fields']]
        writer.writerow(row + job_columns)
    return response
# NOTE: no job_detail view is defined in this module.
# The job_detail view is handled by a function-based view in recruitment.views.
@login_required
def sync_hired_candidates(request, job_slug):
    """Queue a Django-Q background task that syncs a job's hired candidates.

    Only POST is accepted; any other method gets a JSON error with status
    405.  On POST the job must exist (404 otherwise); the task is queued in
    the group ``sync_job_<job_slug>`` with a 300-second timeout, and its id
    is returned as JSON.  A failure to queue yields a JSON error with
    status 500.
    """
    from django_q.tasks import async_task
    from .tasks import sync_hired_candidates_task

    if request.method != 'POST':
        return JsonResponse({
            'status': 'error',
            'message': 'Only POST requests are allowed'
        }, status=405)

    # Looked up only to return 404 for an unknown job; the task gets the slug.
    get_object_or_404(models.JobPosting, slug=job_slug)

    try:
        queued_id = async_task(
            sync_hired_candidates_task,
            job_slug,
            group=f"sync_job_{job_slug}",
            timeout=300  # 5 minutes timeout
        )
    except Exception as exc:
        return JsonResponse({
            'status': 'error',
            'message': f'Failed to queue sync task: {str(exc)}'
        }, status=500)

    # Answer immediately; the caller uses task_id to track progress.
    return JsonResponse({
        'status': 'queued',
        'message': 'Sync task has been queued for background processing',
        'task_id': queued_id
    })
@login_required
def test_source_connection(request, source_id):
    """Test the connection to an external source and report the result as JSON.

    Only POST is accepted; any other method gets a JSON error with status
    405.  On POST the source must exist (404 otherwise).  The outcome of
    ``CandidateSyncService.test_source_connection`` is returned under
    ``result``; an exception from the service yields a JSON error with
    status 500.
    """
    from .candidate_sync_service import CandidateSyncService

    if request.method != 'POST':
        return JsonResponse({
            'status': 'error',
            'message': 'Only POST requests are allowed'
        }, status=405)

    source = get_object_or_404(models.Source, id=source_id)

    try:
        outcome = CandidateSyncService().test_source_connection(source)
        return JsonResponse({
            'status': 'success',
            'result': outcome
        })
    except Exception as exc:
        return JsonResponse({
            'status': 'error',
            'message': f'Connection test failed: {str(exc)}'
        }, status=500)
@login_required
def sync_task_status(request, task_id):
    """Check the status of a sync task and report it as JSON.

    Looks the task up in Django-Q's ``Task`` table by id and maps it to one
    of ``completed``, ``failed``, ``running`` or ``pending``.  An unknown id
    yields a JSON error with status 404; any other failure (including a
    task result that cannot be serialised to JSON) yields status 500.

    ``success``, ``started`` and ``stopped`` are model *fields* on ``Task``
    (a boolean and two datetimes), not methods.  The previous code called
    them, which raised ``TypeError`` and turned every lookup of an existing
    task into the 500 branch.
    """
    from django_q.models import Task

    try:
        task = Task.objects.get(id=task_id)

        if task.success:
            status = 'completed'
            message = 'Sync completed successfully'
            result = task.result
        elif task.stopped:
            status = 'failed'
            message = 'Sync task failed or was stopped'
            result = task.result
        elif task.started:
            # NOTE(review): Django-Q appears to store Task rows only once a
            # task has finished, so this branch and the next may never be
            # reached in practice - confirm against the installed version.
            status = 'running'
            message = 'Sync is currently running'
            result = None
        else:
            status = 'pending'
            message = 'Sync task is queued and waiting to start'
            result = None

        return JsonResponse({
            'status': status,
            'message': message,
            'result': result,
            'task_id': task_id,
            # Datetimes are serialised as ISO 8601 strings by JsonResponse.
            'started': task.started,
            'stopped': task.stopped,
            'success': task.success
        })
    except Task.DoesNotExist:
        return JsonResponse({
            'status': 'error',
            'message': 'Task not found'
        }, status=404)
    except Exception as e:
        return JsonResponse({
            'status': 'error',
            'message': f'Failed to check task status: {str(e)}'
        }, status=500)
@login_required
def sync_history(request, job_slug=None):
    """Render sync logs and the 20 most recently started sync tasks.

    With ``job_slug`` the job must exist (404 otherwise) and only logs whose
    ``request_data`` carries that slug are listed; without it every SYNC log
    is listed.  Logs are ordered newest first.  The task list always covers
    every group whose name starts with ``sync_job_``.
    """
    from .models import IntegrationLog
    from django_q.models import Task

    job = None
    log_filters = {'action': IntegrationLog.ActionChoices.SYNC}
    if job_slug:
        # Narrow the logs to one job, and expose that job to the template.
        job = get_object_or_404(models.JobPosting, slug=job_slug)
        log_filters['request_data__job_slug'] = job_slug

    logs = IntegrationLog.objects.filter(**log_filters).order_by('-created_at')

    sync_tasks = Task.objects.filter(group__startswith='sync_job_')
    recent_tasks = sync_tasks.order_by('-started')[:20]

    return render(request, 'recruitment/sync_history.html', {
        'logs': logs,
        'recent_tasks': recent_tasks,
        'job': job,
    })