246 lines
8.7 KiB
Python
246 lines
8.7 KiB
Python
from django.shortcuts import render, get_object_or_404, redirect
|
|
from django.views.generic import ListView, CreateView, UpdateView, DetailView, DeleteView
|
|
from django.contrib.auth.mixins import LoginRequiredMixin, UserPassesTestMixin
|
|
from django.urls import reverse_lazy
|
|
from django.contrib import messages
|
|
from django.http import JsonResponse
|
|
from django.db import models
|
|
from .models import Source, IntegrationLog
|
|
from .forms import SourceForm, generate_api_key, generate_api_secret
|
|
from .decorators import login_required, staff_user_required
|
|
|
|
class SourceListView(LoginRequiredMixin, UserPassesTestMixin, ListView):
    """Paginated list of sources with an optional text search (staff only)."""

    model = Source
    template_name = 'recruitment/source_list.html'
    context_object_name = 'sources'
    paginate_by = 10

    def test_func(self):
        """Grant access only when the requesting user is staff."""
        user = self.request.user
        return user.is_staff

    def get_queryset(self):
        """Return sources ordered by name, narrowed by the ``search`` GET parameter."""
        sources = super().get_queryset().order_by('name')

        term = self.request.GET.get('search', '')
        if not term:
            return sources

        # A source matches when the term occurs, case-insensitively, in any
        # one of its name, source type or description.
        matches_term = (
            models.Q(name__icontains=term)
            | models.Q(source_type__icontains=term)
            | models.Q(description__icontains=term)
        )
        return sources.filter(matches_term)

    def get_context_data(self, **kwargs):
        """Expose the submitted search term to the template as ``search_query``."""
        context = super().get_context_data(**kwargs)
        context.update(search_query=self.request.GET.get('search', ''))
        return context
|
|
|
|
class SourceCreateView(LoginRequiredMixin, UserPassesTestMixin, CreateView):
    """Create a new source (staff only), optionally generating API credentials."""

    model = Source
    form_class = SourceForm
    template_name = 'recruitment/source_form.html'
    success_url = reverse_lazy('source_list')

    def test_func(self):
        """Grant access only when the requesting user is staff."""
        return self.request.user.is_staff

    def form_valid(self, form):
        """Save the new source, then log key generation and flash a success message.

        ``created_by`` is set to the user's full name, falling back to the
        username. When the form's ``generate_keys`` value is ``'true'`` a fresh
        API key and secret are assigned before saving.

        Returns the redirect response produced by the parent ``form_valid``.
        """
        user = self.request.user
        form.instance.created_by = user.get_full_name() or user.username

        keys_requested = form.cleaned_data.get('generate_keys') == 'true'
        if keys_requested:
            form.instance.api_key = generate_api_key()
            form.instance.api_secret = generate_api_secret()

        # Save first: the log row references the source through a relation,
        # and Django refuses to save a row that points at an unsaved object.
        response = super().form_valid(form)

        # NOTE(review): the original nesting was lost; logging is assumed to be
        # conditional on key generation, as its "Log the key generation"
        # comment suggests. Confirm.
        if keys_requested:
            IntegrationLog.objects.create(
                source=self.object,
                action=IntegrationLog.ActionChoices.CREATE,
                endpoint='/api/sources/',
                method='POST',
                request_data={'name': self.object.name},
                ip_address=self.request.META.get('REMOTE_ADDR'),
                user_agent=self.request.META.get('HTTP_USER_AGENT', ''),
            )

        messages.success(self.request, f'Source "{form.instance.name}" created successfully!')
        return response

    def get_context_data(self, **kwargs):
        """Add the page title and the ``generate_keys`` GET flag to the context."""
        context = super().get_context_data(**kwargs)
        context['title'] = 'Create New Source'
        context['generate_keys'] = self.request.GET.get('generate_keys', 'false')
        return context
|
|
|
|
class SourceDetailView(LoginRequiredMixin, UserPassesTestMixin, DetailView):
    """Detail page for one source: masked credentials plus recent logs (staff only)."""

    model = Source
    template_name = 'recruitment/source_detail.html'
    context_object_name = 'source'

    def test_func(self):
        """Grant access only when the requesting user is staff."""
        user = self.request.user
        return user.is_staff

    def get_context_data(self, **kwargs):
        """Add masked API credentials and the ten newest integration logs."""
        context = super().get_context_data(**kwargs)
        source = self.object

        # Only a short prefix of each credential is shown; the rest is
        # replaced by a fixed run of asterisks.
        context['masked_api_key'] = (
            (source.api_key[:8] + '*' * 24) if source.api_key else 'Not generated'
        )
        context['masked_api_secret'] = (
            (source.api_secret[:12] + '*' * 52) if source.api_secret else 'Not generated'
        )

        logs_for_source = IntegrationLog.objects.filter(source=source)
        context['recent_logs'] = logs_for_source.order_by('-created_at')[:10]

        return context
|
|
|
|
class SourceUpdateView(LoginRequiredMixin, UserPassesTestMixin, UpdateView):
    """Edit an existing source (staff only), optionally regenerating its API keys."""

    model = Source
    form_class = SourceForm
    template_name = 'recruitment/source_form.html'
    success_url = reverse_lazy('source_list')

    def test_func(self):
        """Grant access only when the requesting user is staff."""
        user = self.request.user
        return user.is_staff

    def get_context_data(self, **kwargs):
        """Add the page title and the ``generate_keys`` GET flag to the context."""
        context = super().get_context_data(**kwargs)
        context.update(
            title=f'Edit Source: {self.object.name}',
            generate_keys=self.request.GET.get('generate_keys', 'false'),
        )
        return context

    def _regenerate_keys(self, form):
        """Assign fresh credentials, log the regeneration and flash a message."""
        instance = form.instance
        instance.api_key = generate_api_key()
        instance.api_secret = generate_api_secret()

        meta = self.request.META
        IntegrationLog.objects.create(
            source=self.object,
            action=IntegrationLog.ActionChoices.CREATE,
            endpoint=f'/api/sources/{self.object.pk}/',
            method='PUT',
            request_data={'name': instance.name, 'regenerated_keys': True},
            ip_address=meta.get('REMOTE_ADDR'),
            user_agent=meta.get('HTTP_USER_AGENT', ''),
        )

        messages.success(self.request, 'New API keys generated successfully!')

    def form_valid(self, form):
        """Save the edited source, regenerating keys first when requested."""
        wants_new_keys = form.cleaned_data.get('generate_keys') == 'true'
        if wants_new_keys:
            self._regenerate_keys(form)

        response = super().form_valid(form)
        messages.success(self.request, f'Source "{form.instance.name}" updated successfully!')
        return response
|
|
|
|
class SourceDeleteView(LoginRequiredMixin, UserPassesTestMixin, DeleteView):
    """Delete a source (staff only), logging the deletion and flashing a message."""

    model = Source
    template_name = 'recruitment/source_confirm_delete.html'
    success_url = reverse_lazy('source_list')

    def test_func(self):
        """Grant access only when the requesting user is staff."""
        return self.request.user.is_staff

    def _record_deletion(self):
        """Create the log entry and queue the success message for ``self.object``.

        Must run before the source is deleted so its pk and name are available.
        """
        source = self.object

        # NOTE(review): if IntegrationLog.source cascades on delete, this entry
        # is removed together with the source. Confirm the relation's on_delete.
        IntegrationLog.objects.create(
            source=source,
            action=IntegrationLog.ActionChoices.SYNC,  # Using SYNC for deletion
            endpoint=f'/api/sources/{source.pk}/',
            method='DELETE',
            request_data={'name': source.name},
            ip_address=self.request.META.get('REMOTE_ADDR'),
            user_agent=self.request.META.get('HTTP_USER_AGENT', ''),
        )

        messages.success(self.request, f'Source "{source.name}" deleted successfully!')

    def form_valid(self, form):
        """Handle a confirmed POST on Django >= 4.0.

        From Django 4.0 on, ``DeleteView`` routes POST through ``form_valid()``
        and no longer calls ``delete()``, so the logging has to live here as
        well or it is silently skipped.
        """
        self._record_deletion()
        return super().form_valid(form)

    def delete(self, request, *args, **kwargs):
        """Handle POST on Django < 4.0 and HTTP DELETE requests on any version."""
        self.object = self.get_object()
        self._record_deletion()
        return super().delete(request, *args, **kwargs)
|
|
|
|
@login_required
@staff_user_required
def generate_api_keys_view(request, pk):
    """Generate a new API key and secret for the source identified by ``pk``.

    Responses:
        * 403 JSON when the user is not staff.
        * 404 JSON when no source has the given primary key.
        * 405 JSON for any method other than POST.
        * Redirect to ``source_detail`` after the new credentials are saved.
    """
    if not request.user.is_staff:
        return JsonResponse({'error': 'Permission denied'}, status=403)

    # Look the source up directly: get_object_or_404 raises Http404 rather
    # than Source.DoesNotExist, which left the JSON 404 branch unreachable.
    try:
        source = Source.objects.get(pk=pk)
    except Source.DoesNotExist:
        return JsonResponse({'error': 'Source not found'}, status=404)

    if request.method != 'POST':
        return JsonResponse({'error': 'Invalid request method'}, status=405)

    source.api_key = generate_api_key()
    source.api_secret = generate_api_secret()
    source.save()

    return redirect('source_detail', pk=source.pk)
|
|
|
|
@login_required
@staff_user_required
def toggle_source_status_view(request, pk):
    """Flip the ``is_active`` flag of the source identified by ``pk``.

    Responses:
        * 403 JSON when the user is not staff.
        * 404 JSON when no source has the given primary key.
        * 405 JSON for any method other than POST.
        * Redirect to ``source_detail`` after the new status is saved.
    """
    if not request.user.is_staff:
        return JsonResponse({'error': 'Permission denied'}, status=403)

    # Look the source up directly: get_object_or_404 raises Http404 rather
    # than Source.DoesNotExist, which left the JSON 404 branch unreachable.
    try:
        source = Source.objects.get(pk=pk)
    except Source.DoesNotExist:
        return JsonResponse({'error': 'Source not found'}, status=404)

    # Without this guard a non-POST request fell off the end of the view and
    # returned None, which Django rejects with a ValueError.
    if request.method != 'POST':
        return JsonResponse({'error': 'Invalid request method'}, status=405)

    source.is_active = not source.is_active
    source.save()

    return redirect('source_detail', pk=source.pk)
|
|
@login_required
def copy_to_clipboard_view(request):
    """Render the copy-to-clipboard snippet for the POSTed ``text`` value.

    Any method other than POST gets a 405 JSON error.
    """
    if request.method != 'POST':
        return JsonResponse({'error': 'Invalid request method'}, status=405)

    snippet_context = {'text': request.POST.get('text', '')}
    return render(request, 'includes/copy_to_clipboard.html', snippet_context)
|