Marwan Alwali 263292f6be update
2025-11-04 00:50:06 +03:00

5074 lines
176 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
HR app views with comprehensive CRUD operations following healthcare best practices.
"""
from django.shortcuts import render, get_object_or_404, redirect
from django.contrib.auth.decorators import login_required
from django.views.decorators.http import require_POST, require_GET, require_http_methods
from django.template.loader import render_to_string
from django.contrib.auth.mixins import LoginRequiredMixin
from django.contrib import messages
from django.views.generic import (
ListView, DetailView, CreateView, UpdateView, DeleteView, TemplateView
)
from django.urls import reverse_lazy, reverse
from django.http import JsonResponse, HttpResponse
from django.db.models import Q, Count, Avg, Sum
from django.utils import timezone
from django.core.paginator import Paginator
from django.db import transaction
from datetime import datetime, timedelta, date
import json
from accounts.models import User
from appointments.utils import get_tenant_from_request
from .models import *
from .forms import *
from core.utils import AuditLogger
class HRDashboardView(LoginRequiredMixin, TemplateView):
"""
Main HR dashboard with comprehensive statistics and recent activity.
"""
template_name = 'hr/dashboard.html'
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
# Basic statistics
context.update({
'total_employees': Employee.objects.filter(tenant=self.request.user.tenant).count(),
'active_employees': Employee.objects.filter(
tenant=self.request.user.tenant,
employment_status='ACTIVE'
).count(),
'departments': Department.objects.filter(tenant=self.request.user.tenant),
'pending_reviews': PerformanceReview.objects.filter(
employee__tenant=self.request.user.tenant,
status='PENDING'
).count(),
})
# Recent activity
context.update({
'recent_hires': Employee.objects.filter(
tenant=self.request.user.tenant,
hire_date__gte=timezone.now().date() - timedelta(days=30)
).order_by('-hire_date')[:5],
'recent_reviews': PerformanceReview.objects.filter(
employee__tenant=self.request.user.tenant
).order_by('-review_date')[:5],
'recent_training': TrainingRecord.objects.filter(
employee__tenant=self.request.user.tenant
).order_by('-completion_date')[:5],
})
# Attendance statistics
today = timezone.now().date()
context.update({
'employees_clocked_in': TimeEntry.objects.filter(
employee__tenant=self.request.user.tenant,
clock_in_time__date=today,
clock_out_time__isnull=True
).count(),
'total_hours_today': TimeEntry.objects.filter(
employee__tenant=self.request.user.tenant,
clock_in_time__date=today,
clock_out_time__isnull=False
).aggregate(
total=Sum('total_hours')
)['total'] or 0,
})
return context
class EmployeeListView(LoginRequiredMixin, ListView):
"""
List all employees with filtering and search capabilities.
"""
model = Employee
template_name = 'hr/employees/employee_list.html'
context_object_name = 'employees'
paginate_by = 20
def get_queryset(self):
queryset = Employee.objects.filter(tenant=self.request.user.tenant)
# Search functionality
search = self.request.GET.get('search')
if search:
queryset = queryset.filter(
Q(first_name__icontains=search) |
Q(last_name__icontains=search) |
Q(employee_id__icontains=search) |
Q(email__icontains=search)
)
# Filter by department
department = self.request.GET.get('department')
if department:
queryset = queryset.filter(department_id=department)
# Filter by employment status
status = self.request.GET.get('status')
if status:
queryset = queryset.filter(employment_status=status)
# Filter by position
position = self.request.GET.get('position')
if position:
queryset = queryset.filter(job_title__icontains=position)
return queryset.select_related('department').order_by('last_name', 'first_name')
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context['departments'] = Department.objects.filter(
tenant=self.request.user.tenant
).order_by('name')
context['search'] = self.request.GET.get('search', '')
context['selected_department'] = self.request.GET.get('department', '')
context['selected_status'] = self.request.GET.get('status', '')
context['selected_position'] = self.request.GET.get('position', '')
return context
class EmployeeDetailView(LoginRequiredMixin, DetailView):
"""
Display detailed information about a specific employee.
"""
model = Employee
template_name = 'hr/employees/employee_detail.html'
context_object_name = 'employee'
def get_queryset(self):
return Employee.objects.filter(tenant=self.request.user.tenant)
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
employee = self.get_object()
# Recent time entries
context['recent_time_entries'] = TimeEntry.objects.filter(
employee=employee
).order_by('-clock_in_time')[:10]
# Recent performance reviews
context['recent_reviews'] = PerformanceReview.objects.filter(
employee=employee
).order_by('-review_date')[:5]
# Training records
context['training_records'] = TrainingRecord.objects.filter(
employee=employee
).order_by('-completion_date')[:10]
# Schedule assignments
context['current_schedules'] = Schedule.objects.filter(
employee=employee,
effective_date__lte=timezone.now().date(),
end_date__gte=timezone.now().date() if Schedule.end_date else True
)
return context
class EmployeeCreateView(LoginRequiredMixin, CreateView):
"""
Create a new employee record.
"""
model = Employee
form_class = EmployeeForm
template_name = 'hr/employees/employee_form.html'
success_url = reverse_lazy('hr:employee_list')
def form_valid(self, form):
form.instance.tenant = self.request.user.tenant
messages.success(self.request, 'Employee created successfully.')
return super().form_valid(form)
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
return kwargs
class EmployeeUpdateView(LoginRequiredMixin, UpdateView):
"""
Update an existing employee record.
"""
model = Employee
form_class = EmployeeForm
template_name = 'hr/employees/employee_form.html'
def get_queryset(self):
return Employee.objects.filter(tenant=self.request.user.tenant)
def get_success_url(self):
return reverse('hr:employee_detail', kwargs={'pk': self.object.pk})
def form_valid(self, form):
messages.success(self.request, 'Employee updated successfully.')
return super().form_valid(form)
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
return kwargs
class EmployeeDeleteView(LoginRequiredMixin, DeleteView):
"""
Soft delete an employee record (healthcare compliance).
"""
model = Employee
template_name = 'hr/employees/employee_confirm_delete.html'
success_url = reverse_lazy('hr:employee_list')
def get_queryset(self):
return Employee.objects.filter(tenant=self.request.user.tenant)
def delete(self, request, *args, **kwargs):
# Soft delete - mark as inactive instead of actual deletion
employee = self.get_object()
employee.employment_status = 'INACTIVE'
employee.termination_date = timezone.now().date()
employee.save()
messages.success(request, f'Employee {employee.get_full_name()} has been deactivated.')
return redirect(self.success_url)
class DepartmentListView(LoginRequiredMixin, ListView):
"""
List all departments with employee counts.
"""
model = Department
template_name = 'hr/departments/department_list.html'
context_object_name = 'departments'
paginate_by = 20
def get_queryset(self):
queryset = Department.objects.filter(tenant=self.request.user.tenant)
# Search functionality
search = self.request.GET.get('search')
if search:
queryset = queryset.filter(
Q(name__icontains=search) |
Q(department_code__icontains=search) |
Q(description__icontains=search)
)
return queryset
class DepartmentDetailView(LoginRequiredMixin, DetailView):
"""
Display detailed information about a specific department.
"""
model = Department
template_name = 'hr/departments/department_detail.html'
context_object_name = 'department'
def get_queryset(self):
return Department.objects.filter(tenant=self.request.user.tenant)
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
department = self.get_object()
# Department employees
context['employees'] = Employee.objects.filter(
department=department
).order_by('last_name', 'first_name')
# Department statistics
context['active_employees'] = Employee.objects.filter(
department=department,
employment_status='ACTIVE'
).count()
return context
class DepartmentCreateView(LoginRequiredMixin, CreateView):
"""
Create a new department.
"""
model = Department
form_class = DepartmentForm
template_name = 'hr/departments/department_form.html'
success_url = reverse_lazy('hr:department_list')
def form_valid(self, form):
form.instance.tenant = self.request.user.tenant
messages.success(self.request, 'Department created successfully.')
return super().form_valid(form)
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
return kwargs
class DepartmentUpdateView(LoginRequiredMixin, UpdateView):
"""
Update an existing department.
"""
model = Department
form_class = DepartmentForm
template_name = 'hr/departments/department_form.html'
def get_queryset(self):
return Department.objects.filter(tenant=self.request.user.tenant)
def get_success_url(self):
return reverse('hr:department_detail', kwargs={'pk': self.object.pk})
def form_valid(self, form):
messages.success(self.request, 'Department updated successfully.')
return super().form_valid(form)
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
return kwargs
class DepartmentDeleteView(LoginRequiredMixin, DeleteView):
"""
Delete a department (only if no employees assigned).
"""
model = Department
template_name = 'hr/departments/department_confirm_delete.html'
success_url = reverse_lazy('hr:department_list')
def get_queryset(self):
return Department.objects.filter(tenant=self.request.user.tenant)
def delete(self, request, *args, **kwargs):
department = self.get_object()
# Check if department has employees
if Employee.objects.filter(department=department).exists():
messages.error(request, 'Cannot delete department with assigned employees.')
return redirect('hr:department_detail', pk=department.pk)
messages.success(request, f'Department {department.name} deleted successfully.')
return super().delete(request, *args, **kwargs)
class ScheduleListView(LoginRequiredMixin, ListView):
"""
List all schedules with filtering capabilities.
"""
model = Schedule
template_name = 'hr/schedules/schedule_list.html'
context_object_name = 'schedules'
paginate_by = 20
def get_queryset(self):
queryset = Schedule.objects.filter(employee__tenant=self.request.user.tenant)
# Filter by status
is_active = self.request.GET.get('is_active')
if is_active:
queryset = queryset.filter(is_active=(is_active == 'true'))
# Filter by date range
effective_date = self.request.GET.get('effective_date')
end_date = self.request.GET.get('end_date')
if effective_date:
queryset = queryset.filter(effective_date__gte=effective_date)
if end_date:
queryset = queryset.filter(end_date__lte=end_date)
return queryset.select_related('employee').order_by('-effective_date')
class ScheduleDetailView(LoginRequiredMixin, DetailView):
"""
Display detailed information about a specific schedule.
"""
model = Schedule
template_name = 'hr/schedules/schedule_detail.html'
context_object_name = 'schedule'
def get_queryset(self):
return Schedule.objects.filter(employee__tenant=self.request.user.tenant)
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
schedule = self.get_object()
# Schedule assignments
context['assignments'] = ScheduleAssignment.objects.filter(
schedule=schedule
).order_by('assignment_date', 'start_time')
return context
class ScheduleCreateView(LoginRequiredMixin, CreateView):
"""
Create a new schedule.
"""
model = Schedule
form_class = ScheduleForm
template_name = 'hr/schedules/schedule_form.html'
success_url = reverse_lazy('hr:schedule_list')
def form_valid(self, form):
form.instance.created_by = self.request.user
messages.success(self.request, 'Schedule created successfully.')
return super().form_valid(form)
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
return kwargs
class ScheduleUpdateView(LoginRequiredMixin, UpdateView):
"""
Update an existing schedule (limited after publication).
"""
model = Schedule
form_class = ScheduleForm
template_name = 'hr/schedules/schedule_form.html'
def get_queryset(self):
return Schedule.objects.filter(employee__tenant=self.request.user.tenant)
def get_success_url(self):
return reverse('hr:schedule_detail', kwargs={'pk': self.object.pk})
def form_valid(self, form):
messages.success(self.request, 'Schedule updated successfully.')
return super().form_valid(form)
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
return kwargs
class ScheduleAssignmentListView(LoginRequiredMixin, ListView):
"""
List all schedule assignments with filtering capabilities.
"""
model = ScheduleAssignment
template_name = 'hr/assignments/schedule_assignment_list.html'
context_object_name = 'assignments'
paginate_by = 20
def get_queryset(self):
queryset = ScheduleAssignment.objects.filter(
schedule__employee__tenant=self.request.user.tenant
)
# Filter by employee
employee = self.request.GET.get('employee')
if employee:
queryset = queryset.filter(schedule__employee_id=employee)
# Filter by date range
start_date = self.request.GET.get('start_date')
end_date = self.request.GET.get('end_date')
if start_date:
queryset = queryset.filter(assignment_date__gte=start_date)
if end_date:
queryset = queryset.filter(assignment_date__lte=end_date)
# Filter by status
status = self.request.GET.get('status')
if status:
queryset = queryset.filter(status=status)
return queryset.select_related('schedule__employee', 'department').order_by('assignment_date', 'start_time')
class ScheduleAssignmentCreateView(LoginRequiredMixin, CreateView):
"""
Create a new schedule assignment.
"""
model = ScheduleAssignment
form_class = ScheduleAssignmentForm
template_name = 'hr/assignments/schedule_assignment_form.html'
success_url = reverse_lazy('hr:schedule_assignment_list')
def form_valid(self, form):
messages.success(self.request, 'Schedule assignment created successfully.')
return super().form_valid(form)
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
return kwargs
class ScheduleAssignmentUpdateView(LoginRequiredMixin, UpdateView):
"""
Update an existing schedule assignment.
"""
model = ScheduleAssignment
form_class = ScheduleAssignmentForm
template_name = 'hr/assignments/schedule_assignment_form.html'
success_url = reverse_lazy('hr:schedule_assignment_list')
def get_queryset(self):
return ScheduleAssignment.objects.filter(schedule__employee__tenant=self.request.user.tenant)
def form_valid(self, form):
messages.success(self.request, 'Schedule assignment updated successfully.')
return super().form_valid(form)
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
return kwargs
class ScheduleAssignmentDeleteView(LoginRequiredMixin, DeleteView):
model = ScheduleAssignment
template_name = 'hr/assignments/schedule_assignment_confirm_delete.html'
success_url = reverse_lazy('hr:schedule_assignment_list')
def get_queryset(self):
queryset = ScheduleAssignment.objects.filter(schedule__employee__tenant=self.request.user.tenant)
employee = self.request.GET.get('employee')
if employee:
queryset = queryset.filter(schedule__employee_id=employee)
class TimeEntryListView(LoginRequiredMixin, ListView):
"""
List time entries with filtering capabilities.
"""
model = TimeEntry
template_name = 'hr/time_entries/time_entry_list.html'
context_object_name = 'time_entries'
paginate_by = 20
def get_queryset(self):
queryset = TimeEntry.objects.filter(
employee__tenant=self.request.user.tenant
).select_related('employee')
# Filter by employee
employee = self.request.GET.get('employee')
if employee:
queryset = queryset.filter(employee_id=employee)
# Filter by date range
start_date = self.request.GET.get('start_date')
end_date = self.request.GET.get('end_date')
if start_date:
queryset = queryset.filter(work_date__gte=start_date)
if end_date:
queryset = queryset.filter(work_date__lte=end_date)
# Filter by approval status
status = self.request.GET.get('status')
if status:
queryset = queryset.filter(status=status)
return queryset.order_by('-work_date')
class TimeEntryDetailView(LoginRequiredMixin, DetailView):
"""
Display detailed information about a specific time entry.
"""
model = TimeEntry
template_name = 'hr/time_entries/time_entry_detail.html'
context_object_name = 'time_entry'
def get_queryset(self):
return TimeEntry.objects.filter(employee__tenant=self.request.user.tenant)
class TimeEntryCreateView(LoginRequiredMixin, CreateView):
"""
Create a new time entry (manual entry).
"""
model = TimeEntry
form_class = TimeEntryForm
template_name = 'hr/time_entries/time_entry_form.html'
success_url = reverse_lazy('hr:time_entry_list')
def form_valid(self, form):
messages.success(self.request, 'Time entry created successfully.')
return super().form_valid(form)
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
return kwargs
class TimeEntryUpdateView(LoginRequiredMixin, UpdateView):
"""
Update an existing time entry (limited fields).
"""
model = TimeEntry
form_class = TimeEntryForm
template_name = 'hr/time_entries/time_entry_form.html'
success_url = reverse_lazy('hr:time_entry_list')
def get_queryset(self):
return TimeEntry.objects.filter(employee__tenant=self.request.user.tenant)
def form_valid(self, form):
# Restrict updates for approved entries
if self.object.status == 'APPROVED' and not self.request.user.is_superuser:
messages.error(self.request, 'Cannot modify approved time entries.')
return redirect('hr:time_entry_detail', pk=self.object.pk)
messages.success(self.request, 'Time entry updated successfully.')
return super().form_valid(form)
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
return kwargs
class PerformanceReviewListView(LoginRequiredMixin, ListView):
"""
List performance reviews with filtering capabilities.
"""
model = PerformanceReview
template_name = 'hr/reviews/performance_review_list.html'
context_object_name = 'reviews'
paginate_by = 20
def get_queryset(self):
queryset = PerformanceReview.objects.filter(
employee__tenant=self.request.user.tenant
).select_related('employee', 'reviewer')
# Filter by employee
employee = self.request.GET.get('employee')
if employee:
queryset = queryset.filter(employee_id=employee)
# Filter by status
status = self.request.GET.get('status')
if status:
queryset = queryset.filter(status=status)
# Filter by review type
review_type = self.request.GET.get('review_type')
if review_type:
queryset = queryset.filter(review_type=review_type)
return queryset.order_by('-review_date')
class PerformanceReviewDetailView(LoginRequiredMixin, DetailView):
"""
Display detailed information about a specific performance review.
"""
model = PerformanceReview
template_name = 'hr/reviews/performance_review_detail.html'
context_object_name = 'review'
def get_queryset(self):
# Eager load employee + department + supervisor to reduce queries
return (PerformanceReview.objects
.select_related('employee', 'employee__department', 'employee__supervisor', 'reviewer')
.filter(employee__tenant=self.request.user.tenant))
def get_object(self, queryset=None):
queryset = queryset or self.get_queryset()
return get_object_or_404(queryset, pk=self.kwargs.get('pk') or self.kwargs.get('id'))
@staticmethod
def _split_lines(text):
if not text:
return []
parts = []
for line in str(text).replace('\r', '').split('\n'):
for piece in line.split(';'):
piece = piece.strip()
if piece:
parts.append(piece)
return parts
def get_context_data(self, **kwargs):
ctx = super().get_context_data(**kwargs)
review = ctx['review']
# Build categories from competency_ratings JSON: { "Teamwork": 4.0, ... }
# Make a list of dicts the template can iterate through.
categories = []
ratings = review.competency_ratings or {}
# Keep a stable order (by key) to avoid chart jitter
for name in sorted(ratings.keys()):
try:
score = float(ratings[name])
except (TypeError, ValueError):
score = 0.0
categories.append({'name': name, 'score': score, 'comments': ''})
# Previous reviews (same employee, exclude current)
previous_reviews = (
PerformanceReview.objects
.filter(employee=review.employee)
.exclude(pk=review.pk)
.order_by('-review_date')[:5]
.select_related('employee')
)
# Strengths / AFI lists for bullet rendering
strengths_list = self._split_lines(review.strengths)
afi_list = self._split_lines(review.areas_for_improvement)
# Goals blocks as lists for nicer display
goals_achieved_list = self._split_lines(review.goals_achieved)
goals_not_achieved_list = self._split_lines(review.goals_not_achieved)
future_goals_list = self._split_lines(review.future_goals)
ctx.update({
'categories': categories,
'previous_reviews': previous_reviews,
'review_strengths_list': strengths_list,
'review_afi_list': afi_list,
'goals_achieved_list': goals_achieved_list,
'goals_not_achieved_list': goals_not_achieved_list,
'future_goals_list': future_goals_list,
})
# convenience on the review object (template already expects these list props sometimes)
setattr(review, 'strengths_list', strengths_list)
setattr(review, 'areas_for_improvement_list', afi_list)
return ctx
class PerformanceReviewCreateView(LoginRequiredMixin, CreateView):
"""
Create a new performance review.
"""
model = PerformanceReview
form_class = PerformanceReviewForm
template_name = 'hr/reviews/performance_review_form.html'
success_url = reverse_lazy('hr:performance_review_list')
def form_valid(self, form):
form.instance.reviewer = self.request.user
messages.success(self.request, 'Performance review created successfully.')
return super().form_valid(form)
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
return kwargs
class PerformanceReviewUpdateView(LoginRequiredMixin, UpdateView):
"""
Update an existing performance review.
"""
model = PerformanceReview
form_class = PerformanceReviewForm
template_name = 'hr/reviews/performance_review_form.html'
def get_queryset(self):
return PerformanceReview.objects.filter(employee__tenant=self.request.user.tenant)
def get_success_url(self):
return reverse('hr:performance_review_detail', kwargs={'pk': self.object.pk})
def form_valid(self, form):
messages.success(self.request, 'Performance review updated successfully.')
return super().form_valid(form)
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
return kwargs
class PerformanceReviewDeleteView(LoginRequiredMixin, DeleteView):
"""
Delete a performance review (only if not finalized).
"""
model = PerformanceReview
template_name = 'hr/reviews/performance_review_confirm_delete.html'
context_object_name = 'review'
success_url = reverse_lazy('hr:performance_review_list')
def get_queryset(self):
return PerformanceReview.objects.filter(employee__tenant=self.request.user.tenant)
def delete(self, request, *args, **kwargs):
review = self.get_object()
# Check if review is finalized
if review.status == 'COMPLETED':
messages.error(request, 'Cannot delete completed performance reviews.')
return redirect('hr:performance_review_detail', pk=review.pk)
messages.success(request, 'Performance review deleted successfully.')
return super().delete(request, *args, **kwargs)
class TrainingManagementView(LoginRequiredMixin, ListView):
model = TrainingRecord
template_name = 'hr/training/training_management.html'
context_object_name = 'training_records'
paginate_by = 20
def get_queryset(self):
tenant = self.request.user.tenant
qs = (TrainingRecord.objects
.filter(employee__tenant=tenant)
.select_related('employee', 'employee__department')
.order_by( '-completion_date'))
# optional GET filters (works with the templates inputs)
if emp := self.request.GET.get('employee'):
qs = qs.filter(employee_id=emp)
if ttype := self.request.GET.get('training_type'):
qs = qs.filter(training_type=ttype)
if status := self.request.GET.get('status'):
qs = qs.filter(status=status)
return qs
def get_context_data(self, **kwargs):
ctx = super().get_context_data(**kwargs)
tenant = self.request.user.tenant
today = timezone.now().date()
base = TrainingRecord.objects.filter(employee__tenant=tenant)
total_records = base.count()
completed_trainings = base.filter(status='COMPLETED').count()
pending_trainings = base.filter(status__in=['SCHEDULED', 'IN_PROGRESS']).count()
overdue_trainings = base.filter(expiry_date__lt=today, expiry_date__isnull=False).exclude(
status='COMPLETED').count()
# compliance rate = “not expired” among trainings that have an expiry_date
with_expiry = base.filter(expiry_date__isnull=False)
valid_now = with_expiry.filter(expiry_date__gte=today).count()
compliance_rate = round((valid_now / with_expiry.count()) * 100, 1) if with_expiry.exists() else 100.0
# expiring soon = within 30 days
expiring_soon_count = with_expiry.filter(expiry_date__gte=today,
expiry_date__lte=today + timedelta(days=30)).count()
# department compliance (simple example: percent of non-expired per dept among those with expiry)
dept_rows = []
departments = Department.objects.filter(tenant=tenant).order_by('name')
for d in departments:
d_qs = with_expiry.filter(employee__department=d)
if not d_qs.exists():
rate = 100
else:
ok = d_qs.filter(expiry_date__gte=today).count()
rate = round((ok / d_qs.count()) * 100)
color = 'success' if rate >= 90 else 'warning' if rate >= 70 else 'danger'
dept_rows.append({'name': d.name, 'compliance_rate': rate, 'compliance_color': color})
# “compliance alerts” demo (overdue/expiring soon)
alerts = []
for tr in base.select_related('employee'):
if tr.expiry_date:
if tr.expiry_date < today:
alerts.append({
'id': tr.id,
'employee': tr.employee,
'requirement': tr.training_name,
'due_date': tr.expiry_date,
'priority_color': 'danger',
'urgency_color': 'danger',
'get_priority_display': 'Overdue',
})
elif today <= tr.expiry_date <= today + timedelta(days=30):
alerts.append({
'id': tr.id,
'employee': tr.employee,
'requirement': tr.training_name,
'due_date': tr.expiry_date,
'priority_color': 'warning',
'urgency_color': 'warning',
'get_priority_display': 'Expiring Soon',
})
ctx.update({
'total_records': total_records,
'completed_trainings': completed_trainings,
'pending_trainings': pending_trainings,
'overdue_trainings': overdue_trainings,
'departments': departments,
'compliance_rate': compliance_rate,
'expiring_soon_count': expiring_soon_count,
'department_compliance': dept_rows,
'compliance_alerts': alerts,
})
return ctx
class TrainingRecordListView(LoginRequiredMixin, ListView):
"""
List training records with filtering capabilities.
"""
model = TrainingRecord
template_name = 'hr/training/training_record_list.html'
context_object_name = 'training_records'
paginate_by = 20
def get_queryset(self):
queryset = TrainingRecord.objects.filter(
employee__tenant=self.request.user.tenant
).select_related('employee')
# Filter by employee
employee = self.request.GET.get('employee')
if employee:
queryset = queryset.filter(employee_id=employee)
# Filter by training type
training_type = self.request.GET.get('training_type')
if training_type:
queryset = queryset.filter(training_type=training_type)
# Filter by completion status
status = self.request.GET.get('status')
if status:
queryset = queryset.filter(status=status)
return queryset.order_by('-completion_date')
class TrainingRecordDetailView(LoginRequiredMixin, DetailView):
"""
Display detailed information about a specific training record.
"""
model = TrainingRecord
template_name = 'hr/training/training_record_detail.html'
context_object_name = 'record'
def get_queryset(self):
tenant = self.request.user.tenant
return TrainingRecord.objects.filter(employee__tenant=tenant)
class TrainingRecordCreateView(LoginRequiredMixin, CreateView):
"""
Create a new training record.
"""
model = TrainingRecord
form_class = TrainingRecordForm
template_name = 'hr/training/training_record_form.html'
success_url = reverse_lazy('hr:training_record_list')
def form_valid(self, form):
form.instance.created_by = self.request.user
messages.success(self.request, 'Training record created successfully.')
return super().form_valid(form)
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
return kwargs
class TrainingRecordUpdateView(LoginRequiredMixin, UpdateView):
"""
Update an existing training record.
"""
model = TrainingRecord
form_class = TrainingRecordForm
template_name = 'hr/training/training_record_form.html'
def get_queryset(self):
return TrainingRecord.objects.filter(employee__tenant=self.request.user.tenant)
def get_success_url(self):
return reverse('hr:training_record_detail', kwargs={'pk': self.object.pk})
def form_valid(self, form):
messages.success(self.request, 'Training record updated successfully.')
return super().form_valid(form)
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
return kwargs
class TrainingRecordDeleteView(LoginRequiredMixin, DeleteView):
"""
Delete a training record.
"""
model = TrainingRecord
template_name = 'hr/training/training_record_confirm_delete.html'
success_url = reverse_lazy('hr:training_record_list')
context_object_name = 'record'
def get_queryset(self):
return TrainingRecord.objects.filter(employee__tenant=self.request.user.tenant)
def delete(self, request, *args, **kwargs):
messages.success(request, 'Training record deleted successfully.')
return super().delete(request, *args, **kwargs)
class TrainingProgramListView(LoginRequiredMixin, ListView):
"""List all training programs."""
model = TrainingPrograms
template_name = 'hr/training/program_list.html'
context_object_name = 'programs'
paginate_by = 20
def get_queryset(self):
queryset = TrainingPrograms.objects.filter(tenant=self.request.user.tenant)
# Search functionality
search = self.request.GET.get('search')
if search:
queryset = queryset.filter(
Q(name__icontains=search) |
Q(description__icontains=search) |
Q(program_provider__icontains=search)
)
# Filter by program type
program_type = self.request.GET.get('program_type')
if program_type:
queryset = queryset.filter(program_type=program_type)
# Filter by certification status
is_certified = self.request.GET.get('is_certified')
if is_certified:
queryset = queryset.filter(is_certified=(is_certified == 'true'))
return queryset.select_related('instructor').order_by('name')
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context['program_types'] = TrainingPrograms.TrainingType.choices
context['search'] = self.request.GET.get('search', '')
context['selected_type'] = self.request.GET.get('program_type', '')
context['selected_certified'] = self.request.GET.get('is_certified', '')
return context
class TrainingProgramDetailView(LoginRequiredMixin, DetailView):
"""Display detailed information about a training program."""
model = TrainingPrograms
template_name = 'hr/training/program_detail.html'
context_object_name = 'program'
def get_queryset(self):
return TrainingPrograms.objects.filter(tenant=self.request.user.tenant)
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
program = self.get_object()
# Get program modules
context['modules'] = ProgramModule.objects.filter(program=program).order_by('order')
# Get prerequisites
context['prerequisites'] = ProgramPrerequisite.objects.filter(
program=program
).select_related('required_program')
# Get upcoming sessions
context['upcoming_sessions'] = TrainingSession.objects.filter(
program=program,
start_at__gte=timezone.now()
).order_by('start_at')[:5]
# Get recent completions
context['recent_completions'] = TrainingRecord.objects.filter(
program=program,
status='COMPLETED'
).select_related('employee').order_by('-completion_date')[:10]
# Statistics
context['total_enrollments'] = TrainingRecord.objects.filter(program=program).count()
context['completion_rate'] = 0
if context['total_enrollments'] > 0:
completed = TrainingRecord.objects.filter(program=program, status='COMPLETED').count()
context['completion_rate'] = round((completed / context['total_enrollments']) * 100, 1)
return context
class TrainingProgramCreateView(LoginRequiredMixin, CreateView):
"""Create a new training program."""
model = TrainingPrograms
form_class = TrainingProgramForm
template_name = 'hr/training/program_form.html'
def form_valid(self, form):
form.instance.tenant = self.request.user.tenant
form.instance.created_by = self.request.user.employee_profile
messages.success(self.request, 'Training program created successfully.')
return super().form_valid(form)
def get_success_url(self):
return reverse('hr:training_program_detail', kwargs={'pk': self.object.pk})
class TrainingProgramUpdateView(LoginRequiredMixin, UpdateView):
"""Update an existing training program."""
model = TrainingPrograms
template_name = 'hr/training/program_form.html'
form_class = TrainingProgramForm
def get_queryset(self):
return TrainingPrograms.objects.filter(tenant=self.request.user.tenant)
def form_valid(self, form):
messages.success(self.request, 'Training program updated successfully.')
return super().form_valid(form)
def get_success_url(self):
return reverse('hr:training_program_detail', kwargs={'pk': self.object.pk})
class TrainingSessionListView(LoginRequiredMixin, ListView):
"""List all training sessions."""
model = TrainingSession
template_name = 'hr/training/session_list.html'
context_object_name = 'sessions'
paginate_by = 20
def get_queryset(self):
queryset = TrainingSession.objects.filter(
program__tenant=self.request.user.tenant
).select_related('program', 'instructor')
# Filter by program
program_id = self.request.GET.get('program')
if program_id:
queryset = queryset.filter(program_id=program_id)
# Filter by delivery method
delivery_method = self.request.GET.get('delivery_method')
if delivery_method:
queryset = queryset.filter(delivery_method=delivery_method)
# Filter by date range
start_date = self.request.GET.get('start_date')
if start_date:
queryset = queryset.filter(start_at__date__gte=start_date)
end_date = self.request.GET.get('end_date')
if end_date:
queryset = queryset.filter(start_at__date__lte=end_date)
return queryset.order_by('start_at')
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context['programs'] = TrainingPrograms.objects.filter(
tenant=self.request.user.tenant
).order_by('name')
context['delivery_methods'] = TrainingSession.TrainingDelivery.choices
return context
class TrainingSessionDetailView(LoginRequiredMixin, DetailView):
"""Display detailed information about a training session."""
model = TrainingSession
template_name = 'hr/training/session_detail.html'
context_object_name = 'session'
def get_queryset(self):
return TrainingSession.objects.filter(
program__tenant=self.request.user.tenant
).select_related('program', 'instructor')
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
session = self.get_object()
# Get enrollments
context['enrollments'] = TrainingRecord.objects.filter(
session=session
).select_related('employee').order_by('enrolled_at')
# Get attendance records
context['attendance_records'] = TrainingAttendance.objects.filter(
enrollment__session=session
).select_related('enrollment__employee')
# Statistics
context['total_enrolled'] = context['enrollments'].count()
context['capacity_percentage'] = 0
if session.capacity > 0:
context['capacity_percentage'] = round(
(context['total_enrolled'] / session.capacity) * 100, 1
)
return context
class TrainingSessionCreateView(LoginRequiredMixin, CreateView):
"""Create a new training session."""
model = TrainingSession
form_class = TrainingSessionForm
template_name = 'hr/training/session_form.html'
def form_valid(self, form):
form.instance.created_by = self.request.user.employee_profile
messages.success(self.request, 'Training session created successfully.')
return super().form_valid(form)
def get_success_url(self):
return reverse('hr:training_session_detail', kwargs={'pk': self.object.pk})
class TrainingSessionUpdateView(LoginRequiredMixin, UpdateView):
"""Update an existing training session."""
model = TrainingSession
form_class = TrainingSessionForm
template_name = 'hr/training/session_form.html'
def get_queryset(self):
return TrainingSession.objects.filter(program__tenant=self.request.user.tenant)
def form_valid(self, form):
messages.success(self.request, 'Training session updated successfully.')
return super().form_valid(form)
def get_success_url(self):
return reverse('hr:training_session_detail', kwargs={'pk': self.object.pk})
class TrainingCertificateListView(LoginRequiredMixin, ListView):
"""List all training certificates."""
model = TrainingCertificates
template_name = 'hr/training/certificate_list.html'
context_object_name = 'certificates'
paginate_by = 20
def get_queryset(self):
queryset = TrainingCertificates.objects.filter(
employee__tenant=self.request.user.tenant
).select_related('employee', 'program', 'enrollment')
# Filter by employee
employee_id = self.request.GET.get('employee')
if employee_id:
queryset = queryset.filter(employee_id=employee_id)
# Filter by program
program_id = self.request.GET.get('program')
if program_id:
queryset = queryset.filter(program_id=program_id)
# Filter by expiry status
expiry_status = self.request.GET.get('expiry_status')
today = timezone.now().date()
if expiry_status == 'valid':
queryset = queryset.filter(
Q(expiry_date__isnull=True) | Q(expiry_date__gt=today)
)
elif expiry_status == 'expiring':
queryset = queryset.filter(
expiry_date__gt=today,
expiry_date__lte=today + timedelta(days=30)
)
elif expiry_status == 'expired':
queryset = queryset.filter(expiry_date__lt=today)
return queryset.order_by('-issued_date')
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context['employees'] = Employee.objects.filter(
tenant=self.request.user.tenant,
employment_status='ACTIVE'
).order_by('last_name', 'first_name')
context['programs'] = TrainingPrograms.objects.filter(
tenant=self.request.user.tenant,
is_certified=True
).order_by('name')
return context
class TrainingCertificateDetailView(LoginRequiredMixin, DetailView):
"""Display detailed information about a training certificate."""
model = TrainingCertificates
template_name = 'hr/training/certificate_detail.html'
context_object_name = 'certificate'
def get_queryset(self):
return TrainingCertificates.objects.filter(
employee__tenant=self.request.user.tenant
).select_related('employee', 'program', 'enrollment', 'signed_by')
class LeaveRequestListView(LoginRequiredMixin, ListView):
"""
List all leave requests with filtering capabilities.
"""
model = LeaveRequest
template_name = 'hr/leave/leave_request_list.html'
context_object_name = 'leave_requests'
paginate_by = 20
def get_queryset(self):
queryset = LeaveRequest.objects.filter(
employee__tenant=self.request.user.tenant
).select_related('employee', 'leave_type', 'current_approver')
# Filter by employee
employee_id = self.request.GET.get('employee')
if employee_id:
queryset = queryset.filter(employee_id=employee_id)
# Filter by leave type
leave_type_id = self.request.GET.get('leave_type')
if leave_type_id:
queryset = queryset.filter(leave_type_id=leave_type_id)
# Filter by status
status = self.request.GET.get('status')
if status:
queryset = queryset.filter(status=status)
# Filter by date range
start_date = self.request.GET.get('start_date')
if start_date:
queryset = queryset.filter(start_date__gte=start_date)
end_date = self.request.GET.get('end_date')
if end_date:
queryset = queryset.filter(end_date__lte=end_date)
# Search
search = self.request.GET.get('search')
if search:
queryset = queryset.filter(
Q(employee__first_name__icontains=search) |
Q(employee__last_name__icontains=search) |
Q(reason__icontains=search)
)
return queryset.order_by('-created_at')
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context['employees'] = Employee.objects.filter(
tenant=self.request.user.tenant,
employment_status='ACTIVE'
).order_by('last_name', 'first_name')
context['leave_types'] = LeaveType.objects.filter(
tenant=self.request.user.tenant,
is_active=True
).order_by('name')
context['statuses'] = LeaveRequest.RequestStatus.choices
return context
class LeaveRequestDetailView(LoginRequiredMixin, DetailView):
"""
Display detailed information about a leave request.
"""
model = LeaveRequest
template_name = 'hr/leave/leave_request_detail.html'
context_object_name = 'leave_request'
def get_queryset(self):
return LeaveRequest.objects.filter(
employee__tenant=self.request.user.tenant
).select_related(
'employee', 'leave_type', 'current_approver',
'final_approver', 'cancelled_by'
)
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
leave_request = self.get_object()
# Get approval history
context['approvals'] = LeaveApproval.objects.filter(
leave_request=leave_request
).select_related('approver', 'delegated_by').order_by('level')
# Get employee's leave balance
try:
context['balance'] = LeaveBalance.objects.get(
employee=leave_request.employee,
leave_type=leave_request.leave_type,
year=leave_request.start_date.year
)
except LeaveBalance.DoesNotExist:
context['balance'] = None
# Check if current user can approve
if hasattr(self.request.user, 'employee_profile'):
context['can_approve'] = (
leave_request.current_approver == self.request.user.employee_profile and
leave_request.status == 'PENDING'
)
else:
context['can_approve'] = False
return context
class LeaveRequestCreateView(LoginRequiredMixin, CreateView):
"""
Create a new leave request.
"""
model = LeaveRequest
form_class = LeaveRequestForm
template_name = 'hr/leave/leave_request_form.html'
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
if hasattr(self.request.user, 'employee_profile'):
kwargs['employee'] = self.request.user.employee_profile
return kwargs
def form_valid(self, form):
if hasattr(self.request.user, 'employee_profile'):
form.instance.employee = self.request.user.employee_profile
form.instance.created_by = self.request.user
form.instance.status = 'PENDING'
form.instance.submitted_at = timezone.now()
messages.success(self.request, 'Leave request submitted successfully.')
return super().form_valid(form)
else:
messages.error(self.request, 'You must have an employee profile to request leave.')
return redirect('hr:leave_request_list')
def get_success_url(self):
return reverse('hr:leave_request_detail', kwargs={'pk': self.object.pk})
class LeaveRequestUpdateView(LoginRequiredMixin, UpdateView):
"""
Update an existing leave request (only if in DRAFT status).
"""
model = LeaveRequest
form_class = LeaveRequestForm
template_name = 'hr/leave/leave_request_form.html'
def get_queryset(self):
return LeaveRequest.objects.filter(
employee__tenant=self.request.user.tenant,
status='DRAFT'
)
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
if hasattr(self.request.user, 'employee_profile'):
kwargs['employee'] = self.request.user.employee_profile
return kwargs
def form_valid(self, form):
messages.success(self.request, 'Leave request updated successfully.')
return super().form_valid(form)
def get_success_url(self):
return reverse('hr:leave_request_detail', kwargs={'pk': self.object.pk})
class SalaryListView(LoginRequiredMixin, ListView):
"""
List all salary records with filtering capabilities.
"""
model = SalaryInformation
template_name = 'hr/salary/salary_list.html'
context_object_name = 'salary_records'
paginate_by = 20
def get_queryset(self):
queryset = SalaryInformation.objects.filter(
employee__tenant=self.request.user.tenant
).select_related('employee', 'employee__department')
# Filter by employee
employee_id = self.request.GET.get('employee')
if employee_id:
queryset = queryset.filter(employee_id=employee_id)
# Filter by currency
currency = self.request.GET.get('currency')
if currency:
queryset = queryset.filter(currency=currency)
# Filter by payment frequency
payment_frequency = self.request.GET.get('payment_frequency')
if payment_frequency:
queryset = queryset.filter(payment_frequency=payment_frequency)
# Filter by active status
is_active = self.request.GET.get('is_active')
if is_active:
queryset = queryset.filter(is_active=(is_active == 'true'))
# Search
search = self.request.GET.get('search')
if search:
queryset = queryset.filter(
Q(employee__first_name__icontains=search) |
Q(employee__last_name__icontains=search) |
Q(employee__employee_id__icontains=search)
)
return queryset.order_by('-effective_date', 'employee__last_name')
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context['employees'] = Employee.objects.filter(
tenant=self.request.user.tenant,
employment_status='ACTIVE'
).order_by('last_name', 'first_name')
context['currencies'] = SalaryInformation.Currency.choices
context['payment_frequencies'] = SalaryInformation.PaymentFrequency.choices
return context
class SalaryCreateView(LoginRequiredMixin, CreateView):
"""
Create a new salary record.
"""
model = SalaryInformation
form_class = SalaryInformationForm
template_name = 'hr/salary/salary_form.html'
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
return kwargs
def form_valid(self, form):
form.instance.tenant = self.request.user.tenant
form.instance.created_by = self.request.user
messages.success(self.request, 'Salary record created successfully.')
return super().form_valid(form)
def get_success_url(self):
return reverse('hr:salary_detail', kwargs={'pk': self.object.pk})
class SalaryDetailView(LoginRequiredMixin, DetailView):
"""
Display detailed information about a salary record.
"""
model = SalaryInformation
template_name = 'hr/salary/salary_detail.html'
context_object_name = 'salary'
def get_queryset(self):
return SalaryInformation.objects.filter(
employee__tenant=self.request.user.tenant
).select_related('employee', 'employee__department', 'created_by')
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
salary = self.get_object()
# Get salary adjustment history
context['adjustments'] = SalaryAdjustment.objects.filter(
Q(previous_salary=salary) | Q(new_salary=salary)
).select_related('employee', 'approved_by').order_by('-created_at')
# Get employee's salary history
context['salary_history'] = SalaryInformation.objects.filter(
employee=salary.employee,
employee__tenant=self.request.user.tenant
).order_by('-effective_date')[:10]
return context
class SalaryUpdateView(LoginRequiredMixin, UpdateView):
"""
Update an existing salary record.
"""
model = SalaryInformation
form_class = SalaryInformationForm
template_name = 'hr/salary/salary_form.html'
def get_queryset(self):
return SalaryInformation.objects.filter(employee__tenant=self.request.user.tenant)
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
return kwargs
def form_valid(self, form):
messages.success(self.request, 'Salary record updated successfully.')
return super().form_valid(form)
def get_success_url(self):
return reverse('hr:salary_detail', kwargs={'pk': self.object.pk})
class SalaryDeleteView(LoginRequiredMixin, DeleteView):
"""
Delete a salary record (soft delete by marking as inactive).
"""
model = SalaryInformation
template_name = 'hr/salary/salary_confirm_delete.html'
success_url = reverse_lazy('hr:salary_list')
def get_queryset(self):
return SalaryInformation.objects.filter(tenant=self.request.user.tenant)
def delete(self, request, *args, **kwargs):
salary = self.get_object()
# Soft delete - mark as inactive
salary.is_active = False
salary.end_date = timezone.now().date()
salary.save()
messages.success(request, f'Salary record for {salary.employee.get_full_name()} has been deactivated.')
return redirect(self.success_url)
class LeaveBalanceListView(LoginRequiredMixin, ListView):
"""
List leave balances for all employees.
"""
model = LeaveBalance
template_name = 'hr/leave/leave_balance_list.html'
context_object_name = 'balances'
paginate_by = 20
def get_queryset(self):
current_year = date.today().year
queryset = LeaveBalance.objects.filter(
employee__tenant=self.request.user.tenant,
year=current_year
).select_related('employee', 'leave_type')
# Filter by employee
employee_id = self.request.GET.get('employee')
if employee_id:
queryset = queryset.filter(employee_id=employee_id)
# Filter by leave type
leave_type_id = self.request.GET.get('leave_type')
if leave_type_id:
queryset = queryset.filter(leave_type_id=leave_type_id)
return queryset.order_by('employee__last_name', 'employee__first_name', 'leave_type__name')
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context['employees'] = Employee.objects.filter(
tenant=self.request.user.tenant,
employment_status='ACTIVE'
).order_by('last_name', 'first_name')
context['leave_types'] = LeaveType.objects.filter(
tenant=self.request.user.tenant,
is_active=True
).order_by('name')
context['current_year'] = date.today().year
return context
class LeaveDelegateListView(LoginRequiredMixin, ListView):
"""
List all leave delegations.
"""
model = LeaveDelegate
template_name = 'hr/leave/leave_delegate_list.html'
context_object_name = 'delegations'
paginate_by = 20
def get_queryset(self):
queryset = LeaveDelegate.objects.filter(
delegator__tenant=self.request.user.tenant
).select_related('delegator', 'delegate')
# Filter by active status
is_active = self.request.GET.get('is_active')
if is_active:
queryset = queryset.filter(is_active=(is_active == 'true'))
# Filter by current status
show_current = self.request.GET.get('show_current')
if show_current == 'true':
today = date.today()
queryset = queryset.filter(
is_active=True,
start_date__lte=today,
end_date__gte=today
)
return queryset.order_by('-start_date')
class LeaveDelegateCreateView(LoginRequiredMixin, CreateView):
"""
Create a new leave delegation.
"""
model = LeaveDelegate
form_class = LeaveDelegateForm
template_name = 'hr/leave/leave_delegate_form.html'
success_url = reverse_lazy('hr:leave_delegate_list')
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
if hasattr(self.request.user, 'employee_profile'):
kwargs['delegator'] = self.request.user.employee_profile
return kwargs
def form_valid(self, form):
if hasattr(self.request.user, 'employee_profile'):
form.instance.delegator = self.request.user.employee_profile
form.instance.created_by = self.request.user
messages.success(self.request, 'Leave delegation created successfully.')
return super().form_valid(form)
else:
messages.error(request, 'You must have an employee profile to create delegations.')
return redirect('hr:leave_delegate_list')
class SalaryAdjustmentCreateView(LoginRequiredMixin, CreateView):
"""
Create a new salary adjustment.
"""
model = SalaryAdjustment
form_class = SalaryAdjustmentForm
template_name = 'hr/salary/salary_adjustment_form.html'
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
return kwargs
def form_valid(self, form):
form.instance.tenant = self.request.user.tenant
form.instance.created_by = self.request.user
# Auto-approve if user has permission
if self.request.user.has_perm('hr.approve_salaryadjustment'):
form.instance.approved_by = self.request.user
form.instance.approval_date = timezone.now()
messages.success(self.request, 'Salary adjustment created successfully.')
return super().form_valid(form)
def get_success_url(self):
return reverse('hr:salary_adjustment_list')
class SalaryAdjustmentListView(LoginRequiredMixin, ListView):
"""
List all salary adjustments with filtering capabilities.
"""
model = SalaryAdjustment
template_name = 'hr/salary/salary_adjustment_list.html'
context_object_name = 'adjustments'
paginate_by = 20
def get_queryset(self):
queryset = SalaryAdjustment.objects.filter(
tenant=self.request.user.tenant
).select_related('employee', 'previous_salary', 'new_salary', 'approved_by')
# Filter by employee
employee_id = self.request.GET.get('employee')
if employee_id:
queryset = queryset.filter(employee_id=employee_id)
# Filter by adjustment type
adjustment_type = self.request.GET.get('adjustment_type')
if adjustment_type:
queryset = queryset.filter(adjustment_type=adjustment_type)
# Filter by approval status
is_approved = self.request.GET.get('is_approved')
if is_approved == 'true':
queryset = queryset.filter(approved_by__isnull=False)
elif is_approved == 'false':
queryset = queryset.filter(approved_by__isnull=True)
# Filter by date range
start_date = self.request.GET.get('start_date')
if start_date:
queryset = queryset.filter(adjustment_date__gte=start_date)
end_date = self.request.GET.get('end_date')
if end_date:
queryset = queryset.filter(adjustment_date__lte=end_date)
return queryset.order_by('-adjustment_date')
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context['employees'] = Employee.objects.filter(
tenant=self.request.user.tenant,
employment_status='ACTIVE'
).order_by('last_name', 'first_name')
context['adjustment_types'] = SalaryAdjustment.AdjustmentType.choices
return context
class DocumentRequestListView(LoginRequiredMixin, ListView):
"""
List all document requests with filtering capabilities.
"""
model = DocumentRequest
template_name = 'hr/documents/document_request_list.html'
context_object_name = 'document_requests'
paginate_by = 20
def get_queryset(self):
tenant = self.request.user.tenant
queryset = DocumentRequest.objects.filter(
employee__tenant=tenant
).select_related('employee', 'employee__department', 'processed_by')
# Filter by employee
employee_id = self.request.GET.get('employee')
if employee_id:
queryset = queryset.filter(employee_id=employee_id)
# Filter by document type
document_type = self.request.GET.get('document_type')
if document_type:
queryset = queryset.filter(document_type=document_type)
# Filter by status
status = self.request.GET.get('status')
if status:
queryset = queryset.filter(status=status)
# Filter by language
language = self.request.GET.get('language')
if language:
queryset = queryset.filter(language=language)
# Filter by delivery method
delivery_method = self.request.GET.get('delivery_method')
if delivery_method:
queryset = queryset.filter(delivery_method=delivery_method)
# Filter by urgent
is_urgent = self.request.GET.get('is_urgent')
if is_urgent == 'true':
queryset = queryset.filter(
required_by_date__lte=timezone.now().date() + timedelta(days=3)
)
# Search
search = self.request.GET.get('search')
if search:
queryset = queryset.filter(
Q(employee__first_name__icontains=search) |
Q(employee__last_name__icontains=search) |
Q(document_number__icontains=search) |
Q(purpose__icontains=search)
)
return queryset.order_by('-created_at')
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context['employees'] = Employee.objects.filter(
tenant=self.request.user.tenant,
employment_status='ACTIVE'
).order_by('last_name', 'first_name')
context['document_types'] = DocumentRequest.DocumentType.choices
context['statuses'] = DocumentRequest.RequestStatus.choices
context['languages'] = DocumentRequest.Language.choices
context['delivery_methods'] = DocumentRequest.DeliveryMethod.choices
return context
class DocumentRequestCreateView(LoginRequiredMixin, CreateView):
"""
Create a new document request.
"""
model = DocumentRequest
form_class = DocumentRequestForm
template_name = 'hr/documents/document_request_form.html'
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
if hasattr(self.request.user, 'employee_profile'):
kwargs['employee'] = self.request.user.employee_profile
return kwargs
def form_valid(self, form):
form.instance.tenant = self.request.user.tenant
# Set employee if not already set
if not form.instance.employee and hasattr(self.request.user, 'employee_profile'):
form.instance.employee = self.request.user.employee_profile
form.instance.created_by = self.request.user
form.instance.status = 'PENDING'
messages.success(self.request, 'Document request submitted successfully.')
return super().form_valid(form)
def get_success_url(self):
return reverse('hr:document_request_detail', kwargs={'pk': self.object.pk})
class DocumentRequestDetailView(LoginRequiredMixin, DetailView):
"""
Display detailed information about a document request.
"""
model = DocumentRequest
template_name = 'hr/documents/document_request_detail.html'
context_object_name = 'document_request'
def get_queryset(self):
tenant = self.request.user.tenant
return DocumentRequest.objects.filter(
employee__tenant=tenant
).select_related('employee', 'employee__department', 'processed_by', 'created_by')
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
document_request = self.get_object()
# Check if current user can process
context['can_process'] = (
self.request.user.has_perm('hr.change_documentrequest') and
document_request.status in ['PENDING', 'IN_PROGRESS']
)
# Check if current user can update (only if draft or pending and is owner)
context['can_update'] = (
hasattr(self.request.user, 'employee_profile') and
document_request.employee == self.request.user.employee_profile and
document_request.status in ['DRAFT', 'PENDING']
)
# Check if current user can cancel
context['can_cancel'] = (
hasattr(self.request.user, 'employee_profile') and
document_request.employee == self.request.user.employee_profile and
document_request.status in ['PENDING', 'IN_PROGRESS']
)
return context
class DocumentRequestUpdateView(LoginRequiredMixin, UpdateView):
"""
Update an existing document request (only if in DRAFT or PENDING status).
"""
model = DocumentRequest
form_class = DocumentRequestForm
template_name = 'hr/documents/document_request_form.html'
def get_queryset(self):
tenant = self.request.user.tenant
# Only allow updates for draft or pending requests by the owner
queryset = DocumentRequest.objects.filter(
employee__tenant=tenant,
status__in=['DRAFT', 'PENDING']
)
if hasattr(self.request.user, 'employee_profile'):
queryset = queryset.filter(employee=self.request.user.employee_profile)
return queryset
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
if hasattr(self.request.user, 'employee_profile'):
kwargs['employee'] = self.request.user.employee_profile
return kwargs
def form_valid(self, form):
messages.success(self.request, 'Document request updated successfully.')
return super().form_valid(form)
def get_success_url(self):
return reverse('hr:document_request_detail', kwargs={'pk': self.object.pk})
class DocumentTemplateListView(LoginRequiredMixin, ListView):
"""
List all document templates (admin only).
"""
model = DocumentTemplate
template_name = 'hr/documents/document_template_list.html'
context_object_name = 'templates'
paginate_by = 20
def dispatch(self, request, *args, **kwargs):
if not request.user.has_perm('hr.view_documenttemplate'):
messages.error(request, 'You do not have permission to view document templates.')
return redirect('hr:dashboard')
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
tenant = self.request.user.tenant
queryset = DocumentTemplate.objects.filter(
tenant=tenant
)
# Filter by document type
document_type = self.request.GET.get('document_type')
if document_type:
queryset = queryset.filter(document_type=document_type)
# Filter by language
language = self.request.GET.get('language')
if language:
queryset = queryset.filter(language=language)
# Filter by active status
is_active = self.request.GET.get('is_active')
if is_active:
queryset = queryset.filter(is_active=(is_active == 'true'))
# Filter by default status
is_default = self.request.GET.get('is_default')
if is_default:
queryset = queryset.filter(is_default=(is_default == 'true'))
return queryset.order_by('document_type', 'language', '-is_default')
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context['document_types'] = DocumentRequest.DocumentType.choices
context['languages'] = DocumentRequest.Language.choices
return context
class DocumentTemplateCreateView(LoginRequiredMixin, CreateView):
"""
Create a new document template (admin only).
"""
model = DocumentTemplate
form_class = DocumentTemplateForm
template_name = 'hr/documents/document_template_form.html'
def dispatch(self, request, *args, **kwargs):
if not request.user.has_perm('hr.add_documenttemplate'):
messages.error(request, 'You do not have permission to create document templates.')
return redirect('hr:document_template_list')
return super().dispatch(request, *args, **kwargs)
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
return kwargs
def form_valid(self, form):
form.instance.tenant = self.request.user.tenant
form.instance.created_by = self.request.user
messages.success(self.request, 'Document template created successfully.')
return super().form_valid(form)
def get_success_url(self):
return reverse('hr:document_template_detail', kwargs={'pk': self.object.pk})
class DocumentTemplateDetailView(LoginRequiredMixin, DetailView):
"""
Display detailed information about a document template.
"""
model = DocumentTemplate
template_name = 'hr/documents/document_template_detail.html'
context_object_name = 'template'
def dispatch(self, request, *args, **kwargs):
if not request.user.has_perm('hr.view_documenttemplate'):
messages.error(request, 'You do not have permission to view document templates.')
return redirect('hr:dashboard')
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
tenant = self.request.user.tenant
return DocumentTemplate.objects.filter(
tenant=tenant
).select_related('created_by')
def get_context_data(self, **kwargs):
tenant = self.request.user.tenant
context = super().get_context_data(**kwargs)
template = self.get_object()
# Get recent document requests using this template
context['recent_requests'] = DocumentRequest.objects.filter(
employee__tenant=tenant,
document_type=template.document_type,
language=template.language
).select_related('employee').order_by('-created_at')[:10]
return context
class DocumentTemplateUpdateView(LoginRequiredMixin, UpdateView):
"""
Update an existing document template (admin only).
"""
model = DocumentTemplate
form_class = DocumentTemplateForm
template_name = 'hr/documents/document_template_form.html'
def dispatch(self, request, *args, **kwargs):
if not request.user.has_perm('hr.change_documenttemplate'):
messages.error(request, 'You do not have permission to update document templates.')
return redirect('hr:document_template_list')
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
return DocumentTemplate.objects.filter(tenant=self.request.user.tenant)
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
return kwargs
def form_valid(self, form):
messages.success(self.request, 'Document template updated successfully.')
return super().form_valid(form)
def get_success_url(self):
return reverse('hr:document_template_detail', kwargs={'pk': self.object.pk})
class DocumentTemplateDeleteView(LoginRequiredMixin, DeleteView):
"""
Delete a document template (admin only).
"""
model = DocumentTemplate
template_name = 'hr/documents/document_template_confirm_delete.html'
success_url = reverse_lazy('hr:document_template_list')
def dispatch(self, request, *args, **kwargs):
if not request.user.has_perm('hr.delete_documenttemplate'):
messages.error(request, 'You do not have permission to delete document templates.')
return redirect('hr:document_template_list')
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
tenant = self.request.user.tenant
return DocumentTemplate.objects.filter(tenant=tenant)
def delete(self, request, *args, **kwargs):
template = self.get_object()
# Check if template is being used
active_requests = DocumentRequest.objects.filter(
tenant=request.user.tenant,
document_type=template.document_type,
language=template.language,
status__in=['PENDING', 'IN_PROGRESS']
).count()
if active_requests > 0:
messages.error(
request,
f'Cannot delete template. There are {active_requests} active document requests using this template.'
)
return redirect('hr:document_template_detail', pk=template.pk)
messages.success(request, f'Document template "{template.name}" deleted successfully.')
return super().delete(request, *args, **kwargs)
@login_required
def complete_performance_review(request, review_id):
review = get_object_or_404(PerformanceReview, pk=review_id)
review.status = 'COMPLETED'
review.completed_by = request.user
review.save()
messages.success(request, 'Performance review completed successfully.')
return redirect('hr:performance_review_detail', pk=review.pk)
@login_required
def hr_stats(request):
"""
Return HR statistics for dashboard updates.
"""
context = {
'total_employees': Employee.objects.filter(tenant=request.user.tenant).count(),
'active_employees': Employee.objects.filter(
tenant=request.user.tenant,
employment_status='ACTIVE'
).count(),
'total_departments': Department.objects.filter(tenant=request.user.tenant).count(),
'pending_reviews': PerformanceReview.objects.filter(
employee__tenant=request.user.tenant,
status='IN_PROGRESS'
).count(),
'employees_clocked_in': TimeEntry.objects.filter(
employee__tenant=request.user.tenant,
clock_in_time__date=timezone.now().date(),
clock_out_time__isnull=True
).count(),
}
return render(request, 'hr/partials/hr_stats.html', context)
@login_required
def employee_search(request):
"""
Search employees for HTMX updates.
"""
search = request.GET.get('search', '')
employees = Employee.objects.filter(
tenant=request.user.tenant
)
if search:
employees = employees.filter(
Q(first_name__icontains=search) |
Q(last_name__icontains=search) |
Q(employee_id__icontains=search)
)
employees = employees.select_related('department')[:20]
return render(request, 'hr/partials/employee_list.html', {
'employees': employees
})
@login_required
def attendance_summary(request):
"""
Return attendance summary for dashboard updates.
"""
today = timezone.now().date()
tenant = request.user.tenant
# Total active employees
total_active_employees = Employee.objects.filter(
tenant=tenant,
employment_status='ACTIVE'
).count()
# Employees clocked in today (still working)
clocked_in_entries = TimeEntry.objects.filter(
employee__tenant=tenant,
clock_in_time__date=today,
clock_out_time__isnull=True
).select_related('employee', 'employee__department')
employees_clocked_in = clocked_in_entries.count()
# Late arrivals (clocked in after 9:00 AM)
late_today = TimeEntry.objects.filter(
employee__tenant=tenant,
clock_in_time__date=today,
clock_in_time__time__gt=time(9, 0), # Assuming 9 AM is late
clock_out_time__isnull=True
).count()
# Absent today (active employees not clocked in)
absent_today = total_active_employees - employees_clocked_in
# Total hours today (completed shifts)
total_hours_today = TimeEntry.objects.filter(
employee__tenant=tenant,
clock_in_time__date=today,
clock_out_time__isnull=False
).aggregate(
total=Sum('total_hours')
)['total']
# Current employees (detailed list for display)
current_employees = clocked_in_entries.select_related('employee', 'employee__department').order_by(
'-clock_in_time')
context = {
'employees_clocked_in': employees_clocked_in,
'absent_today': absent_today,
'late_today': late_today,
'total_hours_today': total_hours_today,
'current_employees': current_employees,
'total_active_employees': total_active_employees,
}
return render(request, 'hr/partials/attendance_summary.html', context)
@require_GET
def clock_controls(request, employee_id):
"""Return the clock controls partial for today's state (HTMX GET)."""
employee = get_object_or_404(Employee, id=employee_id, tenant=request.user.tenant)
today = timezone.localdate()
# Prefer the open entry for today; otherwise the last entry today (finished)
time_entry = (
TimeEntry.objects
.filter(employee=employee, work_date=today)
.order_by('clock_out_time', '-clock_in_time') # open first (clock_out_time NULL), else latest finished
.first()
)
return render(request, 'hr/partials/clock_controls.html', {
'employee': employee,
'time_entry': time_entry,
})
def _render_controls(request, employee, time_entry):
html = render_to_string('hr/partials/clock_controls.html',
{'employee': employee, 'time_entry': time_entry},
request=request)
return HttpResponse(html)
@require_POST
def clock_in(request, employee_id):
employee = get_object_or_404(Employee, id=employee_id, tenant=request.user.tenant)
today = timezone.localdate()
open_entry = TimeEntry.objects.filter(
employee=employee, work_date=today, clock_out_time__isnull=True
).first()
if open_entry:
return _render_controls(request, employee, open_entry)
time_entry = TimeEntry.objects.create(
employee=employee, work_date=today, clock_in_time=timezone.now(), status='DRAFT'
)
if request.headers.get('HX-Request'):
return _render_controls(request, employee, time_entry)
return JsonResponse({'success': True, 'time_entry_id': time_entry.id})
@require_POST
def clock_out(request, employee_id):
employee = get_object_or_404(Employee, id=employee_id, tenant=request.user.tenant)
today = timezone.localdate()
time_entry = TimeEntry.objects.filter(
employee=employee, work_date=today, clock_out_time__isnull=True
).first()
if not time_entry:
# Re-render to default state (will show "Clock In")
if request.headers.get('HX-Request'):
return _render_controls(request, employee, None)
return JsonResponse({'success': False, 'message': 'No active clock-in found.'}, status=400)
time_entry.clock_out_time = timezone.now()
time_entry.save()
if request.headers.get('HX-Request'):
return _render_controls(request, employee, time_entry)
return JsonResponse({'success': True, 'time_entry_id': time_entry.id})
@login_required
def approve_time_entry(request, entry_id):
"""
Approve a time entry.
"""
time_entry = get_object_or_404(
TimeEntry,
id=entry_id,
employee__tenant=request.user.tenant
)
if request.method == 'POST':
# Check if entry is complete
if not time_entry.clock_out_time:
messages.error(request, 'Cannot approve incomplete time entry.')
return redirect('hr:time_entry_detail', pk=time_entry.id)
# Update time entry
time_entry.status = 'APPROVED'
time_entry.approved_by = request.user
time_entry.approval_date = timezone.now()
time_entry.save()
messages.success(request, 'Time entry approved successfully.')
# Redirect based on source
next_url = request.POST.get('next', reverse('hr:time_entry_detail', kwargs={'pk': time_entry.id}))
return redirect(next_url)
return render(request, 'hr/time_entries/time_entry_approve.html', {
'time_entry': time_entry
})
@login_required
def publish_schedule(request, schedule_id):
"""
Publish a schedule.
"""
schedule = get_object_or_404(
Schedule,
id=schedule_id,
employee__tenant=request.user.tenant
)
if request.method == 'POST':
# Check if schedule has assignments
if not ScheduleAssignment.objects.filter(schedule=schedule).exists():
messages.error(request, 'Cannot publish empty schedule.')
return redirect('hr:schedule_detail', pk=schedule.id)
# Update schedule
schedule.approved_by = request.user
schedule.approval_date = timezone.now()
schedule.save()
messages.success(request, 'Schedule published successfully.')
return redirect('hr:schedule_detail', pk=schedule.id)
return render(request, 'hr/schedules/schedule_publish.html', {
'schedule': schedule
})
@login_required
def api_employee_list(request):
"""
API endpoint for employee list.
"""
employees = Employee.objects.filter(
tenant=request.user.tenant
).values('employee_id', 'first_name', 'father_name', 'grandfather_name', 'last_name', 'job_title')
return JsonResponse({'employees': list(employees)})
@login_required
def api_department_list(request):
"""
API endpoint for department list.
"""
tenant = request.user.tenant
departments = Department.objects.filter(
tenant=tenant
).values('id', 'name', 'department_code')
return JsonResponse({'departments': list(departments)})
# def department_tree(request):
# """
# HTMX view for department tree structure.
# """
# from django.db.models import Count, Q
# tenant = get_tenant_from_request(request)
# departments = Department.objects.filter(
# tenant=tenant,
# parent_department=None
# ).prefetch_related('sub_departments').annotate(
# employee_count=Count(
# 'employee',
# filter=Q(employee__employment_status='ACTIVE'),
# distinct=True
# )
# ).order_by('name')
#
# return render(request, 'hr/departments/department_tree.html', {
# 'departments': departments
# })
#
#
# @login_required
# def department_children(request, department_id):
# """
# HTMX endpoint to load department children dynamically.
# """
# tenant = request.user.tenant
# try:
# parent_department = get_object_or_404(
# Department,
# department_id=department_id,
# tenant=tenant
# )
#
# # Get children departments
# children = Department.objects.filter(
# tenant=tenant,
# parent_department=parent_department
# ).prefetch_related('sub_departments').order_by('name')
#
# # Calculate level for children (parent level + 1)
# # Get parent level from request or calculate from hierarchy
# parent_level = int(request.GET.get('level', 0))
# child_level = parent_level + 1
#
# # Annotate employee count for each child
# from django.db.models import Count, Q
# children = children.annotate(
# employee_count=Count(
# 'employee',
# filter=Q(employee__employment_status='ACTIVE'),
# distinct=True
# )
# )
#
# # Render children using the node partial
# from django.template.loader import render_to_string
# html_content = ""
# for child in children:
# html_content += render_to_string(
# 'hr/departments/partials/_department_node.html',
# {
# 'department': child,
# 'level': child_level
# },
# request=request
# )
#
# from django.http import HttpResponse
# return HttpResponse(html_content)
#
# except Department.DoesNotExist:
# from django.http import HttpResponse
# return HttpResponse(
# '<div class="alert alert-danger m-2">'
# '<i class="fa fa-exclamation-triangle"></i> Department not found'
# '</div>',
# status=404
# )
# except Exception as e:
# from django.http import HttpResponse
# return HttpResponse(
# '<div class="alert alert-danger m-2">'
# '<i class="fa fa-exclamation-triangle"></i> Error loading children: ' + str(e) +
# '</div>',
# status=500
# )
@login_required
def department_tree(request):
tenant = request.user.tenant
roots = (
Department.objects
.filter(tenant=tenant, parent_department=None)
.select_related('department_head', 'parent_department')
.prefetch_related('sub_departments')
.annotate(
employee_count=Count(
'employees',
filter=Q(employees__employment_status='ACTIVE'),
distinct=True
),
child_count=Count('sub_departments', distinct=True)
)
.order_by('name')
)
return render(request, 'hr/departments/department_tree.html', {'departments': roots, 'level': 0})
@login_required
def department_children(request, department_id):
tenant = request.user.tenant
parent = get_object_or_404(Department, tenant=tenant, department_id=department_id)
level = int(request.GET.get('level', 0)) + 1
children = (
Department.objects
.filter(tenant=tenant, parent_department=parent)
.select_related('department_head', 'parent_department')
.prefetch_related('sub_departments')
.annotate(
employee_count=Count(
'employees',
filter=Q(employees__employment_status='ACTIVE'),
distinct=True
),
child_count=Count('sub_departments', distinct=True)
)
.order_by('name')
)
# One render with a loop inside the partial
return render(request, 'hr/departments/partials/_department_children.html', {
'children': children,
'level': level
})
@login_required
def activate_department(request, pk):
"""
Activate a department.
"""
department = get_object_or_404(Department, department_id=pk)
department.is_active = True
department.save()
messages.success(request, f'Department "{department.name}" has been activated.')
return redirect('hr:department_detail', pk=pk)
@login_required
def deactivate_department(request, pk):
"""
Deactivate a department.
"""
department = get_object_or_404(Department, department_id=pk)
department.is_active = False
department.save()
messages.success(request, f'Department "{department.name}" has been deactivated.')
return redirect('hr:department_detail', pk=pk)
@login_required
def department_search(request):
"""
AJAX search for departments.
"""
query = request.GET.get('q', '')
departments = []
if query:
departments = Department.objects.filter(
tenant=request.user.tenant,
name__icontains=query
).values('department_id', 'name', 'department_type')[:10]
return JsonResponse({'departments': list(departments)})
@login_required
def bulk_activate_departments(request):
"""
Bulk activate departments.
"""
if request.method == 'POST':
department_ids = request.POST.getlist('department_ids')
count = Department.objects.filter(
tenant=request.user.tenant,
department_id__in=department_ids
).update(is_active=True)
messages.success(request, f'{count} departments have been activated.')
return redirect('hr:department_list')
@login_required
def bulk_deactivate_departments(request):
"""
Bulk deactivate departments.
"""
if request.method == 'POST':
department_ids = request.POST.getlist('department_ids')
count = Department.objects.filter(
tenant=request.user.tenant,
department_id__in=department_ids
).update(is_active=False)
messages.success(request, f'{count} departments have been deactivated.')
return redirect('hr:department_list')
@login_required
def get_department_hierarchy(request):
"""
Get department hierarchy as JSON.
"""
departments = Department.objects.filter(
tenant=request.user.tenant,
is_active=True
).select_related('parent')
def build_tree(parent=None):
children = []
for dept in departments:
if dept.parent == parent:
children.append({
'id': str(dept.department_id),
'name': dept.name,
'type': dept.department_type,
'children': build_tree(dept)
})
return children
hierarchy = build_tree()
return JsonResponse({'hierarchy': hierarchy})
@login_required
def assign_department_head(request, pk):
"""
Assign a department head to a department.
"""
tenant = request.user.tenant
department = get_object_or_404(Department, pk=pk, tenant=tenant)
if request.method == 'POST':
user_id = request.POST.get('user_id')
if user_id:
try:
user = User.objects.get(id=user_id, tenant=tenant)
employee = Employee.objects.get(user=user)
# Remove current department head if exists
if department.department_head:
old_head = department.department_head
AuditLogger.log_event(
tenant=tenant,
request=request,
event_type='UPDATE',
event_category='SYSTEM_ADMINISTRATION',
action=f'Removed department head from {department.name}',
description=f'Removed {old_head.get_full_name()} as head of {department.name}',
content_object=department,
additional_data={
'old_department_head_id': old_head.id,
'old_department_head_name': old_head.get_full_name()
}
)
# Assign new department head
department.department_head = employee
department.save()
# Log the assignment
AuditLogger.log_event(
tenant=tenant,
request=request,
event_type='UPDATE',
event_category='SYSTEM_ADMINISTRATION',
action=f'Assigned department head to {department.name}',
description=f'Assigned {user.get_full_name()} as head of {department.name}',
content_object=department,
additional_data={
'new_department_head_id': user.id,
'new_department_head_name': user.get_full_name()
}
)
messages.success(
request,
f'{user.get_full_name()} has been assigned as head of {department.name}.'
)
return redirect('hr:department_detail', pk=department.pk)
except User.DoesNotExist:
messages.error(request, 'Selected user not found.')
else:
# Remove department head
if department.department_head:
old_head = department.department_head
department.department_head = None
department.save()
# Log the removal
AuditLogger.log_event(
tenant=tenant,
request=request,
event_type='UPDATE',
event_category='SYSTEM_ADMINISTRATION',
action=f'Removed department head from {department.name}',
description=f'Removed {old_head.get_full_name()} as head of {department.name}',
content_object=department,
additional_data={
'removed_department_head_id': old_head.id,
'removed_department_head_name': old_head.get_full_name()
}
)
messages.success(
request,
f'Department head has been removed from {department.name}.'
)
else:
messages.info(request, 'No department head was assigned.')
return redirect('hr:department_detail', pk=department.pk)
# Get eligible users (staff members who can be department heads)
eligible_users = User.objects.filter(
tenant=tenant,
is_active=True,
is_staff=True
).exclude(
id=department.department_head.id if department.department_head else None
).order_by('first_name', 'last_name')
context = {
'department': department,
'eligible_users': eligible_users,
'current_head': department.department_head,
}
return render(request, 'hr/departments/assign_department_head.html', context)
@login_required
def mark_time_entry_paid(request, pk):
"""
Mark a time entry as paid.
"""
time_entry = get_object_or_404(
TimeEntry,
pk=pk,
tenant=request.user.tenant
)
if request.method == 'POST':
if not request.user.has_perm('hr.change_timeentry'):
messages.error(request, "You don't have permission to mark time entries as paid.")
return redirect('hr:time_entry_detail', pk=time_entry.pk)
time_entry.is_paid = True
time_entry.payment_date = timezone.now().date()
time_entry.save()
messages.success(
request,
f"Time entry for {time_entry.employee.get_full_name()} on {time_entry.entry_date} marked as paid."
)
redirect_url = request.POST.get('next', reverse('hr:time_entry_detail', kwargs={'pk': time_entry.pk}))
return redirect(redirect_url)
return redirect('hr:time_entry_detail', pk=time_entry.pk)
@login_required
def bulk_approve_time_entries(request):
"""
Bulk approve time entries.
"""
if not request.user.has_perm('hr.approve_timeentry'):
messages.error(request, "You don't have permission to approve time entries.")
return redirect('hr:time_entry_list')
if request.method == 'POST':
entry_ids = request.POST.getlist('entry_ids')
if not entry_ids:
messages.warning(request, "No time entries selected for approval.")
return redirect('hr:time_entry_list')
# Get entries that belong to this tenant
entries = TimeEntry.objects.filter(
tenant=request.user.tenant,
pk__in=entry_ids,
is_approved=False
)
# Update all entries
updated_count = entries.update(
is_approved=True,
approved_by=request.user,
approval_date=timezone.now()
)
messages.success(request, f"{updated_count} time entries approved successfully.")
redirect_url = request.POST.get('next', reverse('hr:time_entry_list'))
return redirect(redirect_url)
return redirect('hr:time_entry_list')
@login_required
def bulk_mark_time_entries_paid(request):
"""
Bulk mark time entries as paid.
"""
if not request.user.has_perm('hr.change_timeentry'):
messages.error(request, "You don't have permission to mark time entries as paid.")
return redirect('hr:time_entry_list')
if request.method == 'POST':
entry_ids = request.POST.getlist('entry_ids')
if not entry_ids:
messages.warning(request, "No time entries selected for payment.")
return redirect('hr:time_entry_list')
# Get entries that belong to this tenant
entries = TimeEntry.objects.filter(
tenant=request.user.tenant,
pk__in=entry_ids,
is_approved=True,
is_paid=False
)
# Update all entries
updated_count = entries.update(
is_paid=True,
payment_date=timezone.now().date()
)
messages.success(request, f"{updated_count} time entries marked as paid successfully.")
redirect_url = request.POST.get('next', reverse('hr:time_entry_list'))
return redirect(redirect_url)
return redirect('hr:time_entry_list')
@login_required
def acknowledge_review(request, pk):
"""
Acknowledge a performance review.
"""
review = get_object_or_404(
PerformanceReview,
pk=pk,
tenant=request.user.tenant
)
# Check if the user is the employee being reviewed
if hasattr(request.user, 'employee_profile') and request.user.employee_profile == review.employee:
if request.method == 'POST':
review.is_acknowledged = True
review.acknowledgment_date = timezone.now()
review.save()
messages.success(request, "You have acknowledged this performance review.")
redirect_url = request.POST.get('next', reverse('hr:performance_review_detail', kwargs={'pk': review.pk}))
return redirect(redirect_url)
else:
messages.error(request, "Only the employee being reviewed can acknowledge this review.")
return redirect('hr:performance_review_detail', pk=review.pk)
@login_required
def complete_review(request, pk):
"""
Mark a performance review as completed.
"""
review = get_object_or_404(
PerformanceReview,
pk=pk,
tenant=request.user.tenant
)
if request.method == 'POST':
if not request.user.has_perm('hr.change_performancereview'):
messages.error(request, "You don't have permission to complete reviews.")
return redirect('hr:performance_review_detail', pk=review.pk)
# Check if reviewer is the one completing the review
if review.reviewer != request.user:
messages.warning(request, "Only the assigned reviewer should complete this review.")
# Update review status
review.status = 'COMPLETED'
review.completion_date = timezone.now().date()
review.save()
# Update employee's last and next review dates
employee = review.employee
employee.last_review_date = review.completion_date
employee.next_review_date = review.next_review_date
employee.save()
messages.success(
request,
f"Performance review for {review.employee.get_full_name()} marked as completed."
)
redirect_url = request.POST.get('next', reverse('hr:performance_review_detail', kwargs={'pk': review.pk}))
return redirect(redirect_url)
return redirect('hr:performance_review_detail', pk=review.pk)
@login_required
def employee_schedule(request, employee_id):
"""
View an employee's schedule.
"""
employee = get_object_or_404(
Employee,
pk=employee_id,
tenant=request.user.tenant
)
# Get current schedule
current_schedule = Schedule.objects.filter(
employee=employee,
is_current=True,
is_active=True
).first()
# Get upcoming assignments
today = timezone.now().date()
upcoming_assignments = ScheduleAssignment.objects.filter(
schedule__employee=employee,
assignment_date__gte=today
).order_by('assignment_date', 'start_time')[:14] # Next 2 weeks
context = {
'employee': employee,
'current_schedule': current_schedule,
'upcoming_assignments': upcoming_assignments,
}
return render(request, 'hr/employee_schedule.html', context)
@login_required
def enroll_employee(request, session_id):
"""Enroll an employee in a training session."""
session = get_object_or_404(
TrainingSession,
pk=session_id,
program__tenant=request.user.tenant
)
if request.method == 'POST':
employee_id = request.POST.get('employee_id')
employee = get_object_or_404(
Employee,
pk=employee_id,
tenant=request.user.tenant
)
# Check if already enrolled
existing_enrollment = TrainingRecord.objects.filter(
employee=employee,
session=session
).first()
if existing_enrollment:
messages.warning(request, f'{employee.get_full_name()} is already enrolled in this session.')
else:
# Check capacity
current_enrollments = TrainingRecord.objects.filter(session=session).count()
status = 'SCHEDULED'
if session.capacity and current_enrollments >= session.capacity:
status = 'WAITLISTED'
# Create enrollment
TrainingRecord.objects.create(
employee=employee,
program=session.program,
session=session,
status=status,
created_by=request.user
)
if status == 'WAITLISTED':
messages.info(request, f'{employee.get_full_name()} has been added to the waitlist.')
else:
messages.success(request, f'{employee.get_full_name()} has been enrolled successfully.')
return redirect('hr:training_session_detail', pk=session.pk)
@login_required
def mark_attendance(request, enrollment_id):
"""Mark attendance for a training enrollment."""
enrollment = get_object_or_404(
TrainingRecord,
pk=enrollment_id,
employee__tenant=request.user.tenant
)
if request.method == 'POST':
status = request.POST.get('status', 'PRESENT')
notes = request.POST.get('notes', '')
# Create or update attendance record
attendance, created = TrainingAttendance.objects.get_or_create(
enrollment=enrollment,
defaults={
'status': status,
'notes': notes,
'checked_in_at': timezone.now() if status in ['PRESENT', 'LATE'] else None
}
)
if not created:
attendance.status = status
attendance.notes = notes
if status in ['PRESENT', 'LATE'] and not attendance.checked_in_at:
attendance.checked_in_at = timezone.now()
attendance.save()
messages.success(request, f'Attendance marked for {enrollment.employee.get_full_name()}.')
return redirect('hr:training_session_detail', pk=enrollment.session.pk)
@login_required
def issue_certificate(request, enrollment_id):
"""Issue a certificate for a completed training enrollment."""
enrollment = get_object_or_404(
TrainingRecord,
pk=enrollment_id,
employee__tenant=request.user.tenant,
status='COMPLETED',
passed=True,
program__is_certified=True
)
# Check if certificate already exists
existing_certificate = TrainingCertificates.objects.filter(enrollment=enrollment).first()
if existing_certificate:
messages.warning(request, 'Certificate already exists for this enrollment.')
return redirect('hr:training_certificate_detail', pk=existing_certificate.pk)
if request.method == 'POST':
certificate_name = request.POST.get('certificate_name', f'{enrollment.program.name} Certificate')
certification_body = request.POST.get('certification_body', 'Hospital Training Department')
# Calculate expiry date
expiry_date = None
if enrollment.program.validity_days:
expiry_date = enrollment.completion_date + timedelta(days=enrollment.program.validity_days)
# Generate certificate number
certificate_number = f"CERT-{enrollment.program.tenant.id}-{timezone.now().strftime('%Y%m%d')}-{enrollment.id}"
# Create certificate
certificate = TrainingCertificates.objects.create(
program=enrollment.program,
employee=enrollment.employee,
enrollment=enrollment,
certificate_name=certificate_name,
certificate_number=certificate_number,
certification_body=certification_body,
expiry_date=expiry_date,
created_by=request.user.employee_profile,
signed_by=request.user.employee_profile
)
messages.success(request, f'Certificate issued successfully for {enrollment.employee.get_full_name()}.')
return redirect('hr:training_certificate_detail', pk=certificate.pk)
context = {
'enrollment': enrollment,
'suggested_name': f'{enrollment.program.name} Certificate',
'suggested_body': 'Hospital Training Department'
}
return render(request, 'hr/training/issue_certificate.html', context)
@login_required
def training_analytics(request):
"""Display training analytics and reports."""
tenant = request.user.tenant
today = timezone.now().date()
# Basic statistics
total_programs = TrainingPrograms.objects.filter(tenant=tenant).count()
total_sessions = TrainingSession.objects.filter(program__tenant=tenant).count()
total_enrollments = TrainingRecord.objects.filter(employee__tenant=tenant).count()
total_certificates = TrainingCertificates.objects.filter(employee__tenant=tenant).count()
# Completion rates by program type
program_stats = []
for program_type, display_name in TrainingPrograms.TrainingType.choices:
programs = TrainingPrograms.objects.filter(tenant=tenant, program_type=program_type)
if programs.exists():
total_enrollments_type = TrainingRecord.objects.filter(program__in=programs).count()
completed_enrollments = TrainingRecord.objects.filter(
program__in=programs, status='COMPLETED'
).count()
completion_rate = 0
if total_enrollments_type > 0:
completion_rate = round((completed_enrollments / total_enrollments_type) * 100, 1)
program_stats.append({
'type': display_name,
'total_programs': programs.count(),
'total_enrollments': total_enrollments_type,
'completion_rate': completion_rate
})
# Expiring certificates
expiring_certificates = TrainingCertificates.objects.filter(
employee__tenant=tenant,
expiry_date__gte=today,
expiry_date__lte=today + timedelta(days=30)
).select_related('employee', 'program').order_by('expiry_date')
# Training compliance by department
department_compliance = []
departments = Department.objects.filter(tenant=tenant)
for dept in departments:
dept_employees = Employee.objects.filter(department=dept, employment_status='ACTIVE')
if dept_employees.exists():
total_required = dept_employees.count() * 5 # Assume 5 mandatory trainings per employee
completed_trainings = TrainingRecord.objects.filter(
employee__in=dept_employees,
status='COMPLETED',
program__program_type='MANDATORY'
).count()
compliance_rate = round((completed_trainings / total_required) * 100, 1) if total_required > 0 else 0
department_compliance.append({
'department': dept.name,
'employees': dept_employees.count(),
'compliance_rate': compliance_rate
})
context = {
'total_programs': total_programs,
'total_sessions': total_sessions,
'total_enrollments': total_enrollments,
'total_certificates': total_certificates,
'program_stats': program_stats,
'expiring_certificates': expiring_certificates,
'department_compliance': department_compliance,
}
return render(request, 'hr/training/analytics.html', context)
@login_required
def employee_training_transcript(request, employee_id):
"""Display an employee's complete training transcript."""
employee = get_object_or_404(
Employee,
pk=employee_id,
tenant=request.user.tenant
)
# Get all training records
training_records = TrainingRecord.objects.filter(
employee=employee
).select_related('program', 'session').order_by('-completion_date', '-enrolled_at')
# Get all certificates
certificates = TrainingCertificates.objects.filter(
employee=employee
).select_related('program').order_by('-issued_date')
# Calculate statistics
total_hours = sum(record.credits_earned for record in training_records if record.credits_earned)
completed_trainings = training_records.filter(status='COMPLETED').count()
context = {
'employee': employee,
'training_records': training_records,
'certificates': certificates,
'total_hours': total_hours,
'completed_trainings': completed_trainings,
}
return render(request, 'hr/training/employee_transcript.html', context)
@login_required
def training_record_mark_complete(request, record_id):
"""Mark a training record as complete."""
record = get_object_or_404(
TrainingRecord,
pk=record_id,
employee__tenant=request.user.tenant
)
if request.method == 'POST':
# Update record status
record.status = 'COMPLETED'
record.completion_date = timezone.now().date()
record.passed = True
# Set credits earned if not already set
if not record.credits_earned:
if record.session and record.session.hours_override:
record.credits_earned = record.session.hours_override
elif record.program:
record.credits_earned = record.program.duration_hours
# Get score from form if provided
score = request.POST.get('score')
if score:
try:
record.score = float(score)
record.passed = record.score >= 70 # Assuming 70% is passing
except (ValueError, TypeError):
pass
# Add completion notes
notes = request.POST.get('notes', '')
if notes:
existing_notes = record.notes or ''
record.notes = f"{existing_notes}\n\nCompleted: {notes}".strip()
record.save()
# Auto-issue certificate if program is certified and employee passed
if (record.program and record.program.is_certified and record.passed and
not TrainingCertificates.objects.filter(enrollment=record).exists()):
# Calculate expiry date
expiry_date = None
if record.program.validity_days:
expiry_date = record.completion_date + timedelta(days=record.program.validity_days)
# Generate certificate number
certificate_number = f"CERT-{record.program.tenant.id}-{timezone.now().strftime('%Y%m%d')}-{record.id}"
# Create certificate
TrainingCertificates.objects.create(
program=record.program,
employee=record.employee,
enrollment=record,
certificate_name=f'{record.program.name} Certificate',
certificate_number=certificate_number,
certification_body='Hospital Training Department',
expiry_date=expiry_date,
created_by=request.user.employee_profile if hasattr(request.user, 'employee_profile') else None
)
messages.success(request, f'Training record marked as complete and certificate issued for {record.employee.get_full_name()}.')
else:
messages.success(request, f'Training record marked as complete for {record.employee.get_full_name()}.')
return redirect('hr:training_record_detail', pk=record.pk)
context = {
'record': record,
'can_issue_certificate': (record.program and record.program.is_certified and
not TrainingCertificates.objects.filter(enrollment=record).exists())
}
return render(request, 'hr/training/mark_complete.html', context)
@login_required
def training_record_renew(request, record_id):
"""Renew an expired training record by creating a new enrollment."""
original_record = get_object_or_404(
TrainingRecord,
pk=record_id,
employee__tenant=request.user.tenant
)
if request.method == 'POST':
# Create new training record based on the original
new_record = TrainingRecord.objects.create(
employee=original_record.employee,
program=original_record.program,
session=None, # Will need to be assigned to a new session
status='SCHEDULED',
created_by=request.user
)
# Add renewal note
new_record.notes = f"Renewal of training record #{original_record.id} from {original_record.completion_date}"
new_record.save()
messages.success(
request,
f'Training renewal created for {original_record.employee.get_full_name()}. '
f'Please assign to an appropriate session.'
)
return redirect('hr:training_record_detail', pk=new_record.pk)
context = {
'record': original_record,
'available_sessions': TrainingSession.objects.filter(
program=original_record.program,
start_at__gte=timezone.now()
).order_by('start_at')
}
return render(request, 'hr/training/renew_record.html', context)
@login_required
def training_record_mark_expired(request, record_id):
"""Mark a training record as expired."""
record = get_object_or_404(
TrainingRecord,
pk=record_id,
employee__tenant=request.user.tenant
)
if request.method == 'POST':
# Update record status
record.status = 'FAILED' # Using FAILED status to indicate expired/invalid
# Add expiry note
expiry_reason = request.POST.get('reason', 'Marked as expired')
existing_notes = record.notes or ''
record.notes = f"{existing_notes}\n\nExpired: {expiry_reason} on {timezone.now().date()}".strip()
record.save()
# Mark related certificate as expired if exists
certificate = TrainingCertificates.objects.filter(enrollment=record).first()
if certificate:
certificate.expiry_date = timezone.now().date()
certificate.save()
messages.success(request, f'Training record marked as expired for {record.employee.get_full_name()}.')
return redirect('hr:training_record_detail', pk=record.pk)
context = {'record': record}
return render(request, 'hr/training/mark_expired.html', context)
@login_required
def training_record_archive(request, record_id):
"""Archive a training record (soft delete)."""
record = get_object_or_404(
TrainingRecord,
pk=record_id,
employee__tenant=request.user.tenant
)
if request.method == 'POST':
# Add archive note instead of deleting
archive_reason = request.POST.get('reason', 'Archived by user')
existing_notes = record.notes or ''
record.notes = f"{existing_notes}\n\nArchived: {archive_reason} on {timezone.now().date()}".strip()
# Change status to indicate archived
record.status = 'CANCELLED'
record.save()
messages.success(request, f'Training record archived for {record.employee.get_full_name()}.')
return redirect('hr:training_record_list')
context = {'record': record}
return render(request, 'hr/training/archive_record.html', context)
@login_required
def training_record_duplicate(request, record_id):
"""Duplicate a training record for the same or different employee."""
original_record = get_object_or_404(
TrainingRecord,
pk=record_id,
employee__tenant=request.user.tenant
)
if request.method == 'POST':
# Get target employee (default to same employee)
employee_id = request.POST.get('employee_id', original_record.employee.id)
target_employee = get_object_or_404(
Employee,
pk=employee_id,
tenant=request.user.tenant
)
# Check if employee already has this training
existing_record = TrainingRecord.objects.filter(
employee=target_employee,
program=original_record.program,
status__in=['SCHEDULED', 'IN_PROGRESS', 'COMPLETED']
).first()
if existing_record:
messages.warning(
request,
f'{target_employee.get_full_name()} already has an active record for this training program.'
)
return redirect('hr:training_record_detail', pk=original_record.pk)
# Create duplicate record
new_record = TrainingRecord.objects.create(
employee=target_employee,
program=original_record.program,
session=None, # Will need to be assigned to a session
status='SCHEDULED',
created_by=request.user
)
# Add duplication note
new_record.notes = f"Duplicated from training record #{original_record.id} for {original_record.employee.get_full_name()}"
new_record.save()
messages.success(
request,
f'Training record duplicated for {target_employee.get_full_name()}. '
f'Please assign to an appropriate session.'
)
return redirect('hr:training_record_detail', pk=new_record.pk)
context = {
'record': original_record,
'employees': Employee.objects.filter(
tenant=request.user.tenant,
employment_status='ACTIVE'
).order_by('last_name', 'first_name'),
'available_sessions': TrainingSession.objects.filter(
program=original_record.program,
start_at__gte=timezone.now()
).order_by('start_at')
}
return render(request, 'hr/training/duplicate_record.html', context)
@require_http_methods(["GET"])
def get_program_sessions(request):
"""
AJAX endpoint to get sessions for a specific training program.
"""
program_id = request.GET.get('program_id')
if not program_id:
return JsonResponse({'sessions': []})
try:
program = TrainingPrograms.objects.get(
id=program_id,
tenant=request.user.tenant
)
sessions = TrainingSession.objects.filter(
program=program,
start_at__gte=timezone.now()
).order_by('start_at')
sessions_data = []
for session in sessions:
sessions_data.append({
'id': session.id,
'title': session.title or session.program.name,
'start_at': session.start_at.strftime('%Y-%m-%d %H:%M'),
'end_at': session.end_at.strftime('%Y-%m-%d %H:%M'),
'location': session.location or '',
'delivery_method': session.get_delivery_method_display(),
'capacity': session.capacity or 0,
'enrolled_count': session.enrollments.count()
})
return JsonResponse({'sessions': sessions_data})
except TrainingPrograms.DoesNotExist:
return JsonResponse({'sessions': []})
@require_http_methods(["GET"])
def get_program_details(request):
"""
AJAX endpoint to get details for a specific training program.
"""
program_id = request.GET.get('program_id')
if not program_id:
return JsonResponse({'error': 'Program ID required'}, status=400)
try:
program = TrainingPrograms.objects.get(
id=program_id,
tenant=request.user.tenant
)
program_data = {
'id': program.id,
'name': program.name,
'description': program.description,
'program_type': program.program_type,
'duration_hours': float(program.duration_hours),
'cost': float(program.cost),
'is_certified': program.is_certified,
'validity_days': program.validity_days,
}
return JsonResponse(program_data)
except TrainingPrograms.DoesNotExist:
return JsonResponse({'error': 'Program not found'}, status=404)
@login_required
@require_http_methods(["GET"])
def check_time_entry_conflicts(request):
"""
AJAX endpoint to check for conflicting time entries.
"""
employee_id = request.GET.get('employee_id')
date_str = request.GET.get('date')
start_time_str = request.GET.get('start_time')
end_time_str = request.GET.get('end_time')
exclude_id = request.GET.get('exclude_id')
if not all([employee_id, date_str, start_time_str, end_time_str]):
return JsonResponse({'error': 'Missing required parameters'}, status=400)
try:
employee = Employee.objects.get(id=employee_id, tenant=request.user.tenant)
date = timezone.datetime.fromisoformat(date_str).date()
start_time = timezone.datetime.fromisoformat(f"{date_str}T{start_time_str}").time()
end_time = timezone.datetime.fromisoformat(f"{date_str}T{end_time_str}").time()
# Convert to datetime for comparison
start_datetime = timezone.datetime.combine(date, start_time)
end_datetime = timezone.datetime.combine(date, end_time)
# Check for overlapping time entries
conflicting_entries = TimeEntry.objects.filter(
employee=employee,
work_date=date
).exclude(
clock_out_time__isnull=True
)
if exclude_id:
conflicting_entries = conflicting_entries.exclude(id=exclude_id)
conflicts = []
for entry in conflicting_entries:
if entry.clock_in_time and entry.clock_out_time:
# Check for time overlap
if (entry.clock_in_time < end_datetime and entry.clock_out_time > start_datetime):
conflicts.append({
'id': entry.id,
'start_time': entry.clock_in_time.strftime('%H:%M'),
'end_time': entry.clock_out_time.strftime('%H:%M'),
'entry_type': entry.get_entry_type_display(),
'status': entry.get_status_display()
})
if conflicts:
message = f"Found {len(conflicts)} conflicting time entr{'y' if len(conflicts) == 1 else 'ies'} for this time period."
return JsonResponse({
'conflicts': True,
'message': message,
'conflicting_entries': conflicts
})
else:
return JsonResponse({
'conflicts': False,
'message': 'No conflicts found.'
})
except (Employee.DoesNotExist, ValueError) as e:
return JsonResponse({'error': 'Invalid parameters'}, status=400)
@login_required
@require_http_methods(["GET"])
def get_employee_schedule_assignments(request):
"""
AJAX endpoint to get schedule assignments for an employee on a specific date.
"""
employee_id = request.GET.get('employee_id')
date_str = request.GET.get('date')
if not all([employee_id, date_str]):
return JsonResponse({'error': 'Missing required parameters'}, status=400)
try:
employee = Employee.objects.get(id=employee_id, tenant=request.user.tenant)
date = timezone.datetime.fromisoformat(date_str).date()
# Get schedule assignments for this employee on this date
assignments = ScheduleAssignment.objects.filter(
schedule__employee=employee,
assignment_date=date,
status__in=['SCHEDULED', 'CONFIRMED']
).select_related('schedule', 'department').order_by('start_time')
assignments_data = []
for assignment in assignments:
assignments_data.append({
'id': assignment.id,
'shift_type': assignment.get_shift_type_display(),
'start_time': assignment.start_time.strftime('%H:%M'),
'end_time': assignment.end_time.strftime('%H:%M'),
'department': assignment.department.name if assignment.department else '',
'location': assignment.location or '',
'status': assignment.get_status_display(),
'hours': float(assignment.total_hours)
})
return JsonResponse({
'assignments': assignments_data,
'count': len(assignments_data)
})
except Employee.DoesNotExist:
return JsonResponse({'error': 'Employee not found'}, status=404)
except ValueError:
return JsonResponse({'error': 'Invalid date format'}, status=400)
@login_required
@require_http_methods(["GET"])
def get_schedule_assignment_details(request):
"""
AJAX endpoint to get details of a specific schedule assignment.
"""
assignment_id = request.GET.get('assignment_id')
if not assignment_id:
return JsonResponse({'error': 'Assignment ID required'}, status=400)
try:
assignment = ScheduleAssignment.objects.get(
id=assignment_id,
schedule__employee__tenant=request.user.tenant
)
assignment_data = {
'id': assignment.id,
'shift_type': assignment.get_shift_type_display(),
'start_time': assignment.start_time.strftime('%H:%M'),
'end_time': assignment.end_time.strftime('%H:%M'),
'department': assignment.department.name if assignment.department else '',
'location': assignment.location or '',
'status': assignment.get_status_display(),
'hours': float(assignment.total_hours),
'break_minutes': assignment.break_minutes,
'lunch_minutes': assignment.lunch_minutes,
'notes': assignment.notes or '',
'assignment_date': assignment.assignment_date.strftime('%Y-%m-%d'),
'schedule_name': assignment.schedule.name,
'employee_name': assignment.schedule.employee.get_full_name()
}
return JsonResponse(assignment_data)
except ScheduleAssignment.DoesNotExist:
return JsonResponse({'error': 'Schedule assignment not found'}, status=404)
@login_required
def export_assignments(request):
"""
Export schedule assignments in various formats (PDF, Excel, CSV).
"""
from django.http import HttpResponse
import csv
import io
from datetime import datetime
# Get the format from query parameters
export_format = request.GET.get('format', 'csv').lower()
# Get filtered queryset based on the same filters as the list view
queryset = ScheduleAssignment.objects.filter(
schedule__employee__tenant=request.user.tenant
).select_related('schedule__employee', 'department', 'schedule')
# Apply filters
employee = request.GET.get('employee')
if employee:
queryset = queryset.filter(schedule__employee_id=employee)
department = request.GET.get('department')
if department:
queryset = queryset.filter(department_id=department)
shift_type = request.GET.get('shift_type')
if shift_type:
queryset = queryset.filter(shift_type=shift_type)
date_range = request.GET.get('date_range')
if date_range and ' - ' in date_range:
start_date, end_date = date_range.split(' - ')
queryset = queryset.filter(
assignment_date__gte=start_date,
assignment_date__lte=end_date
)
search = request.GET.get('search')
if search:
queryset = queryset.filter(
Q(schedule__employee__first_name__icontains=search) |
Q(schedule__employee__last_name__icontains=search) |
Q(schedule__name__icontains=search)
)
queryset = queryset.order_by('assignment_date', 'start_time')
if export_format == 'csv':
response = HttpResponse(content_type='text/csv')
response['Content-Disposition'] = f'attachment; filename="schedule_assignments_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv"'
writer = csv.writer(response)
writer.writerow([
'Employee ID', 'Employee Name', 'Department', 'Schedule',
'Date', 'Start Time', 'End Time', 'Shift Type', 'Status', 'Hours'
])
for assignment in queryset:
writer.writerow([
assignment.schedule.employee.employee_id,
assignment.schedule.employee.get_full_name(),
assignment.department.name if assignment.department else '',
assignment.schedule.name,
assignment.assignment_date.strftime('%Y-%m-%d'),
assignment.start_time.strftime('%H:%M'),
assignment.end_time.strftime('%H:%M'),
assignment.get_shift_type_display(),
assignment.get_status_display(),
f"{assignment.total_hours:.2f}"
])
return response
elif export_format == 'excel':
try:
import openpyxl
from openpyxl.utils import get_column_letter
from openpyxl.styles import Font, PatternFill
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "Schedule Assignments"
# Headers
headers = [
'Employee ID', 'Employee Name', 'Department', 'Schedule',
'Date', 'Start Time', 'End Time', 'Shift Type', 'Status', 'Hours'
]
for col, header in enumerate(headers, 1):
cell = ws.cell(row=1, column=col, value=header)
cell.font = Font(bold=True)
cell.fill = PatternFill(start_color="CCCCCC", end_color="CCCCCC", fill_type="solid")
# Data
for row, assignment in enumerate(queryset, 2):
ws.cell(row=row, column=1, value=assignment.schedule.employee.employee_id)
ws.cell(row=row, column=2, value=assignment.schedule.employee.get_full_name())
ws.cell(row=row, column=3, value=assignment.department.name if assignment.department else '')
ws.cell(row=row, column=4, value=assignment.schedule.name)
ws.cell(row=row, column=5, value=assignment.assignment_date.strftime('%Y-%m-%d'))
ws.cell(row=row, column=6, value=assignment.start_time.strftime('%H:%M'))
ws.cell(row=row, column=7, value=assignment.end_time.strftime('%H:%M'))
ws.cell(row=row, column=8, value=assignment.get_shift_type_display())
ws.cell(row=row, column=9, value=assignment.get_status_display())
ws.cell(row=row, column=10, value=float(assignment.total_hours))
# Auto-adjust column widths
for col in range(1, len(headers) + 1):
ws.column_dimensions[get_column_letter(col)].auto_size = True
response = HttpResponse(
content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
)
response['Content-Disposition'] = f'attachment; filename="schedule_assignments_{datetime.now().strftime("%Y%m%d_%H%M%S")}.xlsx"'
wb.save(response)
return response
except ImportError:
messages.error(request, 'Excel export requires openpyxl package.')
return redirect('hr:schedule_assignment_list')
elif export_format == 'pdf':
try:
from reportlab.lib import colors
from reportlab.lib.pagesizes import letter, A4
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch
response = HttpResponse(content_type='application/pdf')
response['Content-Disposition'] = f'attachment; filename="schedule_assignments_{datetime.now().strftime("%Y%m%d_%H%M%S")}.pdf"'
doc = SimpleDocTemplate(response, pagesize=A4)
elements = []
styles = getSampleStyleSheet()
# Title
title_style = ParagraphStyle(
'CustomTitle',
parent=styles['Heading1'],
fontSize=16,
spaceAfter=30,
alignment=1 # Center alignment
)
title = Paragraph("Schedule Assignments Report", title_style)
elements.append(title)
elements.append(Spacer(1, 12))
# Table data
data = [['Employee', 'Department', 'Date', 'Time', 'Shift', 'Status']]
for assignment in queryset:
data.append([
assignment.schedule.employee.get_full_name(),
assignment.department.name if assignment.department else '',
assignment.assignment_date.strftime('%m/%d/%Y'),
f"{assignment.start_time.strftime('%H:%M')}-{assignment.end_time.strftime('%H:%M')}",
assignment.get_shift_type_display(),
assignment.get_status_display()
])
# Create table
table = Table(data)
table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.grey),
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
('FONTSIZE', (0, 0), (-1, 0), 10),
('BOTTOMPADDING', (0, 0), (-1, 0), 12),
('BACKGROUND', (0, 1), (-1, -1), colors.beige),
('FONTNAME', (0, 1), (-1, -1), 'Helvetica'),
('FONTSIZE', (0, 1), (-1, -1), 8),
('GRID', (0, 0), (-1, -1), 1, colors.black)
]))
elements.append(table)
doc.build(elements)
return response
except ImportError:
messages.error(request, 'PDF export requires reportlab package.')
return redirect('hr:schedule_assignment_list')
else:
messages.error(request, 'Invalid export format.')
return redirect('hr:schedule_assignment_list')
@login_required
def my_profile(request):
"""
Employee self-service profile dashboard.
"""
if not hasattr(request.user, 'employee_profile'):
messages.error(request, 'You must have an employee profile to access this page.')
return redirect('hr:dashboard')
employee = request.user.employee_profile
current_year = date.today().year
# Leave balance
leave_balances = LeaveBalance.objects.filter(
employee=employee,
year=current_year
).select_related('leave_type')
total_leave_balance = sum(balance.available for balance in leave_balances)
# Pending requests
pending_requests_count = LeaveRequest.objects.filter(
employee=employee,
status='PENDING'
).count()
# Completed training
completed_training_count = TrainingRecord.objects.filter(
employee=employee,
status='COMPLETED'
).count()
# Check if manager
is_manager = Employee.objects.filter(
supervisor=employee,
employment_status='ACTIVE'
).exists()
# Pending approvals for managers
pending_approvals_count = 0
if is_manager:
pending_approvals_count = LeaveRequest.objects.filter(
current_approver=employee,
status='PENDING'
).count()
# Recent activity (simplified)
recent_activities = []
# Recent leave requests
recent_leaves = LeaveRequest.objects.filter(
employee=employee
).order_by('-created_at')[:5]
for leave in recent_leaves:
recent_activities.append({
'description': f"Leave request ({leave.leave_type.name}) - {leave.get_status_display()}",
'created_at': leave.created_at
})
context = {
'employee': employee,
'total_leave_balance': total_leave_balance,
'pending_requests_count': pending_requests_count,
'completed_training_count': completed_training_count,
'is_manager': is_manager,
'pending_approvals_count': pending_approvals_count,
'recent_activities': recent_activities[:10]
}
return render(request, 'hr/self_service/my_profile.html', context)
@login_required
def my_profile_edit(request):
"""
Edit employee profile.
"""
if not hasattr(request.user, 'employee_profile'):
messages.error(request, 'You must have an employee profile.')
return redirect('hr:dashboard')
employee = request.user.employee_profile
if request.method == 'POST':
form = EmployeeForm(request.POST, request.FILES, instance=employee, user=request.user)
if form.is_valid():
form.save()
messages.success(request, 'Profile updated successfully.')
return redirect('hr:my_profile')
else:
form = EmployeeForm(instance=employee, user=request.user)
context = {'form': form, 'employee': employee}
return render(request, 'hr/employees/employee_form.html', context)
@login_required
def clock_in_out(request):
"""
Clock in/out interface for employees.
"""
if not hasattr(request.user, 'employee_profile'):
messages.error(request, 'You must have an employee profile.')
return redirect('hr:dashboard')
employee = request.user.employee_profile
today = timezone.now().date()
# Get today's time entry
current_entry = TimeEntry.objects.filter(
employee=employee,
work_date=today
).order_by('-clock_in_time').first()
# Calculate hours worked
hours_worked = 0
if current_entry and current_entry.clock_in_time and not current_entry.clock_out_time:
time_diff = timezone.now() - current_entry.clock_in_time
hours_worked = time_diff.total_seconds() / 3600
# Recent entries
recent_entries = TimeEntry.objects.filter(
employee=employee
).order_by('-work_date')[:7]
context = {
'employee': employee,
'current_entry': current_entry,
'hours_worked': hours_worked,
'recent_entries': recent_entries
}
return render(request, 'hr/self_service/clock_in_out.html', context)
@login_required
@require_POST
def clock_in(request):
"""
Clock in action.
"""
if not hasattr(request.user, 'employee_profile'):
return JsonResponse({'success': False, 'message': 'No employee profile'}, status=400)
employee = request.user.employee_profile
today = timezone.now().date()
# Check if already clocked in
existing_entry = TimeEntry.objects.filter(
employee=employee,
work_date=today,
clock_out_time__isnull=True
).first()
if existing_entry:
messages.warning(request, 'You are already clocked in.')
return redirect('hr:clock_in_out')
# Create new time entry
TimeEntry.objects.create(
employee=employee,
work_date=today,
clock_in_time=timezone.now(),
status='DRAFT',
department=employee.department
)
messages.success(request, 'Clocked in successfully.')
return redirect('hr:clock_in_out')
@login_required
@require_POST
def clock_out(request):
"""
Clock out action.
"""
if not hasattr(request.user, 'employee_profile'):
return JsonResponse({'success': False, 'message': 'No employee profile'}, status=400)
employee = request.user.employee_profile
today = timezone.now().date()
# Find open time entry
time_entry = TimeEntry.objects.filter(
employee=employee,
work_date=today,
clock_out_time__isnull=True
).first()
if not time_entry:
messages.error(request, 'No active clock-in found.')
return redirect('hr:clock_in_out')
# Update time entry
time_entry.clock_out_time = timezone.now()
time_entry.save()
messages.success(request, 'Clocked out successfully.')
return redirect('hr:clock_in_out')
@login_required
@require_POST
def start_break(request):
"""Start break."""
if not hasattr(request.user, 'employee_profile'):
return redirect('hr:clock_in_out')
employee = request.user.employee_profile
today = timezone.now().date()
time_entry = TimeEntry.objects.filter(
employee=employee,
work_date=today,
clock_out_time__isnull=True
).first()
if time_entry:
time_entry.break_start_time = timezone.now()
time_entry.save()
messages.success(request, 'Break started.')
return redirect('hr:clock_in_out')
@login_required
@require_POST
def end_break(request):
"""End break."""
if not hasattr(request.user, 'employee_profile'):
return redirect('hr:clock_in_out')
employee = request.user.employee_profile
today = timezone.now().date()
time_entry = TimeEntry.objects.filter(
employee=employee,
work_date=today,
clock_out_time__isnull=True
).first()
if time_entry:
time_entry.break_end_time = timezone.now()
time_entry.save()
messages.success(request, 'Break ended.')
return redirect('hr:clock_in_out')
@login_required
@require_POST
def start_lunch(request):
"""Start lunch."""
if not hasattr(request.user, 'employee_profile'):
return redirect('hr:clock_in_out')
employee = request.user.employee_profile
today = timezone.now().date()
time_entry = TimeEntry.objects.filter(
employee=employee,
work_date=today,
clock_out_time__isnull=True
).first()
if time_entry:
time_entry.lunch_start_time = timezone.now()
time_entry.save()
messages.success(request, 'Lunch started.')
return redirect('hr:clock_in_out')
@login_required
@require_POST
def end_lunch(request):
"""End lunch."""
if not hasattr(request.user, 'employee_profile'):
return redirect('hr:clock_in_out')
employee = request.user.employee_profile
today = timezone.now().date()
time_entry = TimeEntry.objects.filter(
employee=employee,
work_date=today,
clock_out_time__isnull=True
).first()
if time_entry:
time_entry.lunch_end_time = timezone.now()
time_entry.save()
messages.success(request, 'Lunch ended.')
return redirect('hr:clock_in_out')
@login_required
def my_timesheet(request):
"""
Employee timesheet view.
"""
if not hasattr(request.user, 'employee_profile'):
messages.error(request, 'You must have an employee profile.')
return redirect('hr:dashboard')
employee = request.user.employee_profile
# Get week parameter or default to current week
week_offset = int(request.GET.get('week', 0))
today = date.today()
start_date = today - timedelta(days=today.weekday()) + timedelta(weeks=week_offset)
end_date = start_date + timedelta(days=6)
# Get time entries for the week
time_entries = TimeEntry.objects.filter(
employee=employee,
work_date__gte=start_date,
work_date__lte=end_date
).order_by('work_date')
# Calculate totals
total_hours = sum(entry.total_hours for entry in time_entries)
regular_hours = sum(entry.regular_hours for entry in time_entries)
overtime_hours = sum(entry.overtime_hours for entry in time_entries)
days_worked = time_entries.filter(clock_out_time__isnull=False).count()
context = {
'employee': employee,
'time_entries': time_entries,
'start_date': start_date,
'end_date': end_date,
'total_hours': total_hours,
'regular_hours': regular_hours,
'overtime_hours': overtime_hours,
'total_regular': regular_hours,
'total_overtime': overtime_hours,
'days_worked': days_worked,
'prev_week': week_offset - 1,
'next_week': week_offset + 1,
'period_type': 'week'
}
return render(request, 'hr/self_service/my_timesheet.html', context)
@login_required
def my_leave_dashboard(request):
"""
Employee leave dashboard.
"""
if not hasattr(request.user, 'employee_profile'):
messages.error(request, 'You must have an employee profile.')
return redirect('hr:dashboard')
employee = request.user.employee_profile
current_year = date.today().year
today = date.today()
# Leave balances
leave_balances = LeaveBalance.objects.filter(
employee=employee,
year=current_year
).select_related('leave_type')
# Leave requests
leave_requests = LeaveRequest.objects.filter(
employee=employee
).select_related('leave_type').order_by('-created_at')[:10]
# Upcoming leaves
upcoming_leaves = LeaveRequest.objects.filter(
employee=employee,
status='APPROVED',
start_date__gte=today
).select_related('leave_type').order_by('start_date')[:5]
# Calculate totals
total_entitled = sum(balance.total_entitled for balance in leave_balances)
total_used = sum(balance.used for balance in leave_balances)
total_pending = sum(balance.pending for balance in leave_balances)
total_available = sum(balance.available for balance in leave_balances)
context = {
'employee': employee,
'leave_balances': leave_balances,
'leave_requests': leave_requests,
'upcoming_leaves': upcoming_leaves,
'total_entitled': total_entitled,
'total_used': total_used,
'total_pending': total_pending,
'total_available': total_available
}
return render(request, 'hr/self_service/my_leave_dashboard.html', context)
@login_required
def pending_approvals(request):
"""
Manager pending approvals dashboard.
"""
if not hasattr(request.user, 'employee_profile'):
messages.error(request, 'You must have an employee profile.')
return redirect('hr:dashboard')
employee = request.user.employee_profile
today = date.today()
# Check if user is a manager
team_members = Employee.objects.filter(
supervisor=employee,
employment_status='ACTIVE'
)
if not team_members.exists():
messages.warning(request, 'You do not have any team members to manage.')
return redirect('hr:my_profile')
# Pending leave requests
pending_leave_requests = LeaveRequest.objects.filter(
current_approver=employee,
status='PENDING'
).select_related('employee', 'leave_type').order_by('submitted_at')
# Pending timesheets
pending_timesheets = TimeEntry.objects.filter(
employee__in=team_members,
status='SUBMITTED'
).select_related('employee').order_by('work_date')
# Approved today
approved_today_count = LeaveRequest.objects.filter(
final_approver=employee,
approved_at__date=today
).count()
context = {
'employee': employee,
'pending_leave_requests': pending_leave_requests,
'pending_timesheets': pending_timesheets,
'team_members': team_members,
'pending_leave_count': pending_leave_requests.count(),
'pending_timesheet_count': pending_timesheets.count(),
'team_count': team_members.count(),
'approved_today_count': approved_today_count
}
return render(request, 'hr/self_service/pending_approvals.html', context)
@login_required
@transaction.atomic
def approve_leave_request(request, pk):
"""
Approve a leave request.
"""
leave_request = get_object_or_404(
LeaveRequest,
pk=pk,
employee__tenant=request.user.tenant,
status='PENDING'
)
# Check if user can approve
if not hasattr(request.user, 'employee_profile'):
messages.error(request, 'You must have an employee profile to approve leave requests.')
return redirect('hr:leave_request_detail', pk=pk)
if leave_request.current_approver != request.user.employee_profile:
messages.error(request, 'You are not authorized to approve this leave request.')
return redirect('hr:leave_request_detail', pk=pk)
if request.method == 'POST':
form = LeaveApprovalForm(request.POST)
if form.is_valid():
# Create approval record
approval = form.save(commit=False)
approval.leave_request = leave_request
approval.approver = request.user.employee_profile
approval.level = 1 # Simple single-level approval for now
approval.action_date = timezone.now()
approval.save()
if approval.action == 'APPROVED':
# Update leave request
leave_request.status = 'APPROVED'
leave_request.final_approver = request.user.employee_profile
leave_request.approved_at = timezone.now()
leave_request.save()
# Update leave balance
try:
balance = LeaveBalance.objects.get(
employee=leave_request.employee,
leave_type=leave_request.leave_type,
year=leave_request.start_date.year
)
balance.used += leave_request.total_days
balance.pending -= leave_request.total_days
balance.save()
except LeaveBalance.DoesNotExist:
pass
messages.success(request, 'Leave request approved successfully.')
else:
# Rejected
leave_request.status = 'REJECTED'
leave_request.rejected_at = timezone.now()
leave_request.rejection_reason = approval.comments
leave_request.save()
# Update leave balance (remove from pending)
try:
balance = LeaveBalance.objects.get(
employee=leave_request.employee,
leave_type=leave_request.leave_type,
year=leave_request.start_date.year
)
balance.pending -= leave_request.total_days
balance.save()
except LeaveBalance.DoesNotExist:
pass
messages.success(request, 'Leave request rejected.')
return redirect('hr:leave_request_detail', pk=pk)
else:
form = LeaveApprovalForm()
context = {
'leave_request': leave_request,
'form': form
}
return render(request, 'hr/leave/leave_request_approve.html', context)
@login_required
@transaction.atomic
def cancel_leave_request(request, pk):
"""
Cancel a leave request.
"""
leave_request = get_object_or_404(
LeaveRequest,
pk=pk,
employee__tenant=request.user.tenant
)
# Check if user can cancel
if not hasattr(request.user, 'employee_profile'):
messages.error(request, 'You must have an employee profile to cancel leave requests.')
return redirect('hr:leave_request_detail', pk=pk)
if leave_request.employee != request.user.employee_profile:
messages.error(request, 'You can only cancel your own leave requests.')
return redirect('hr:leave_request_detail', pk=pk)
if not leave_request.can_cancel:
messages.error(request, 'This leave request cannot be cancelled.')
return redirect('hr:leave_request_detail', pk=pk)
if request.method == 'POST':
form = LeaveCancellationForm(request.POST, leave_request=leave_request)
if form.is_valid():
leave_request.status = 'CANCELLED'
leave_request.cancelled_at = timezone.now()
leave_request.cancelled_by = request.user
leave_request.cancellation_reason = form.cleaned_data['cancellation_reason']
leave_request.save()
# Update leave balance
try:
balance = LeaveBalance.objects.get(
employee=leave_request.employee,
leave_type=leave_request.leave_type,
year=leave_request.start_date.year
)
if leave_request.status == 'APPROVED':
# Return used days
balance.used -= leave_request.total_days
else:
# Remove from pending
balance.pending -= leave_request.total_days
balance.save()
except LeaveBalance.DoesNotExist:
pass
messages.success(request, 'Leave request cancelled successfully.')
return redirect('hr:leave_request_detail', pk=pk)
else:
form = LeaveCancellationForm(leave_request=leave_request)
context = {
'leave_request': leave_request,
'form': form
}
return render(request, 'hr/leave/leave_request_cancel.html', context)
@login_required
def employee_leave_balance(request, employee_id):
"""
View leave balance for a specific employee.
"""
employee = get_object_or_404(
Employee,
pk=employee_id,
tenant=request.user.tenant
)
current_year = date.today().year
balances = LeaveBalance.objects.filter(
employee=employee,
year=current_year
).select_related('leave_type').order_by('leave_type__name')
# Get leave history
leave_history = LeaveRequest.objects.filter(
employee=employee,
status__in=['APPROVED', 'COMPLETED']
).select_related('leave_type').order_by('-start_date')[:10]
context = {
'employee': employee,
'balances': balances,
'leave_history': leave_history,
'current_year': current_year
}
return render(request, 'hr/leave/employee_leave_balance.html', context)
@login_required
def deactivate_delegation(request, pk):
"""
Deactivate a leave delegation.
"""
delegation = get_object_or_404(
LeaveDelegate,
pk=pk,
delegator__tenant=request.user.tenant
)
# Check if user can deactivate
if not hasattr(request.user, 'employee_profile'):
messages.error(request, 'You must have an employee profile.')
return redirect('hr:leave_delegate_list')
if delegation.delegator != request.user.employee_profile:
messages.error(request, 'You can only deactivate your own delegations.')
return redirect('hr:leave_delegate_list')
if request.method == 'POST':
delegation.is_active = False
delegation.save()
messages.success(request, 'Delegation deactivated successfully.')
return redirect('hr:leave_delegate_list')
context = {'delegation': delegation}
return render(request, 'hr/leave/leave_delegate_deactivate.html', context)
@login_required
def leave_calendar(request):
"""
Display leave calendar view.
"""
# Get date range from query params or default to current month
year = int(request.GET.get('year', date.today().year))
month = int(request.GET.get('month', date.today().month))
# Get all approved leave requests for the month
start_date = date(year, month, 1)
if month == 12:
end_date = date(year + 1, 1, 1)
else:
end_date = date(year, month + 1, 1)
leave_requests = LeaveRequest.objects.filter(
employee__tenant=request.user.tenant,
status='APPROVED',
start_date__lt=end_date,
end_date__gte=start_date
).select_related('employee', 'leave_type')
context = {
'year': year,
'month': month,
'leave_requests': leave_requests,
'current_date': date.today()
}
return render(request, 'hr/leave/leave_calendar.html', context)
@login_required
def leave_dashboard(request):
"""
Leave management dashboard.
"""
tenant = request.user.tenant
today = date.today()
current_year = today.year
# Statistics
total_requests = LeaveRequest.objects.filter(
employee__tenant=tenant,
start_date__year=current_year
).count()
pending_requests = LeaveRequest.objects.filter(
employee__tenant=tenant,
status='PENDING'
).count()
approved_requests = LeaveRequest.objects.filter(
employee__tenant=tenant,
status='APPROVED',
start_date__year=current_year
).count()
# Requests requiring approval (for current user)
my_pending_approvals = 0
if hasattr(request.user, 'employee_profile'):
my_pending_approvals = LeaveRequest.objects.filter(
current_approver=request.user.employee_profile,
status='PENDING'
).count()
# Recent requests
recent_requests = LeaveRequest.objects.filter(
employee__tenant=tenant
).select_related('employee', 'leave_type').order_by('-created_at')[:10]
# Upcoming leaves
upcoming_leaves = LeaveRequest.objects.filter(
employee__tenant=tenant,
status='APPROVED',
start_date__gte=today
).select_related('employee', 'leave_type').order_by('start_date')[:10]
# Current leaves (employees on leave today)
current_leaves = LeaveRequest.objects.filter(
employee__tenant=tenant,
status='APPROVED',
start_date__lte=today,
end_date__gte=today
).select_related('employee', 'leave_type')
context = {
'total_requests': total_requests,
'pending_requests': pending_requests,
'approved_requests': approved_requests,
'my_pending_approvals': my_pending_approvals,
'recent_requests': recent_requests,
'upcoming_leaves': upcoming_leaves,
'current_leaves': current_leaves,
'current_year': current_year
}
return render(request, 'hr/leave/leave_dashboard.html', context)
@login_required
def salary_history(request, employee_id):
"""
View complete salary history for an employee.
"""
tenant = request.user.tenant
employee = get_object_or_404(
Employee,
pk=employee_id,
tenant=request.user.tenant
)
# Get all salary records
salary_records = SalaryInformation.objects.filter(
employee=employee,
employee__tenant=tenant
).order_by('-effective_date')
# Get all salary adjustments
adjustments = SalaryAdjustment.objects.filter(
employee=employee,
employee__tenant=tenant
).select_related('previous_salary', 'new_salary', 'approved_by').order_by('-created_at')
# Calculate statistics
current_salary = salary_records.filter(is_active=True).first()
total_adjustments = adjustments.count()
total_increase = sum(adj.amount_change for adj in adjustments if adj.amount_change > 0)
context = {
'employee': employee,
'salary_records': salary_records,
'adjustments': adjustments,
'current_salary': current_salary,
'total_adjustments': total_adjustments,
'total_increase': total_increase
}
return render(request, 'hr/salary/salary_history.html', context)
@login_required
@transaction.atomic
def cancel_document_request(request, pk):
"""
Cancel a document request.
"""
tenant = request.user.tenant
document_request = get_object_or_404(
DocumentRequest,
pk=pk,
employee__tenant=tenant
)
# Check if user can cancel
if not hasattr(request.user, 'employee_profile'):
messages.error(request, 'You must have an employee profile.')
return redirect('hr:document_request_detail', pk=pk)
if document_request.employee != request.user.employee_profile:
messages.error(request, 'You can only cancel your own document requests.')
return redirect('hr:document_request_detail', pk=pk)
if document_request.status not in ['PENDING', 'IN_PROGRESS']:
messages.error(request, 'This document request cannot be cancelled.')
return redirect('hr:document_request_detail', pk=pk)
if request.method == 'POST':
cancellation_reason = request.POST.get('cancellation_reason', '')
document_request.status = 'CANCELLED'
document_request.notes = f"{document_request.notes or ''}\n\nCancelled: {cancellation_reason}".strip()
document_request.save()
messages.success(request, 'Document request cancelled successfully.')
return redirect('hr:document_request_detail', pk=pk)
context = {'document_request': document_request}
return render(request, 'hr/documents/document_request_cancel.html', context)
@login_required
@transaction.atomic
def process_document_request(request, pk):
"""
Process a document request (HR staff only).
"""
tenant = request.user.tenant
document_request = get_object_or_404(
DocumentRequest,
pk=pk,
employee__tenant=tenant
)
# Check permissions
if not request.user.has_perm('hr.change_documentrequest'):
messages.error(request, 'You do not have permission to process document requests.')
return redirect('hr:document_request_detail', pk=pk)
if document_request.status not in ['PENDING', 'IN_PROGRESS']:
messages.error(request, 'This document request cannot be processed.')
return redirect('hr:document_request_detail', pk=pk)
if request.method == 'POST':
action = request.POST.get('action')
notes = request.POST.get('notes', '')
if action == 'start':
document_request.status = 'IN_PROGRESS'
document_request.processed_by = request.user
document_request.notes = f"{document_request.notes or ''}\n\nProcessing started: {notes}".strip()
messages.success(request, 'Document request processing started.')
elif action == 'ready':
document_request.status = 'READY'
document_request.processed_by = request.user
document_request.processed_at = timezone.now()
document_request.notes = f"{document_request.notes or ''}\n\nMarked as ready: {notes}".strip()
messages.success(request, 'Document request marked as ready.')
elif action == 'reject':
rejection_reason = request.POST.get('rejection_reason', '')
document_request.status = 'REJECTED'
document_request.processed_by = request.user
document_request.notes = f"{document_request.notes or ''}\n\nRejected: {rejection_reason}".strip()
messages.success(request, 'Document request rejected.')
document_request.save()
return redirect('hr:document_request_detail', pk=pk)
context = {'document_request': document_request}
return render(request, 'hr/documents/document_request_process.html', context)
@login_required
def generate_document(request, pk):
"""
Generate document PDF from template.
"""
tenant = request.user.tenant
document_request = get_object_or_404(
DocumentRequest,
pk=pk,
employee__tenant=tenant
)
# Check permissions
if not request.user.has_perm('hr.change_documentrequest'):
messages.error(request, 'You do not have permission to generate documents.')
return redirect('hr:document_request_detail', pk=pk)
if document_request.status not in ['IN_PROGRESS', 'READY']:
messages.error(request, 'Document must be in progress or ready to generate.')
return redirect('hr:document_request_detail', pk=pk)
# Find appropriate template
template = DocumentTemplate.objects.filter(
tenant=tenant,
document_type=document_request.document_type,
language=document_request.language,
is_active=True
).first()
if not template:
messages.error(request, 'No template found for this document type and language.')
return redirect('hr:document_request_detail', pk=pk)
# Prepare context for template rendering
employee = document_request.employee
context_data = {
'employee_name': employee.get_full_name(),
'employee_id': employee.employee_id,
'job_title': employee.job_title,
'department': employee.department.name if employee.department else '',
'hire_date': employee.hire_date.strftime('%Y-%m-%d') if employee.hire_date else '',
'nationality': employee.nationality or '',
'document_date': timezone.now().date().strftime('%Y-%m-%d'),
'document_number': document_request.document_number,
'purpose': document_request.purpose,
'addressee': document_request.addressee or '',
}
# Add salary information if requested
if document_request.include_salary:
salary = SalaryInformation.objects.filter(
employee=employee,
tenant=tenant,
is_active=True
).first()
if salary:
context_data.update({
'basic_salary': f"{salary.basic_salary:,.2f}",
'total_salary': f"{salary.total_salary:,.2f}",
'currency': salary.get_currency_display(),
})
# Render template content
from django.template import Template, Context
template_obj = Template(template.content)
rendered_content = template_obj.render(Context(context_data))
# Generate PDF (simplified - in production, use proper PDF library)
try:
from django.http import HttpResponse
from reportlab.lib.pagesizes import letter, A4
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.lib.units import inch
import io
buffer = io.BytesIO()
doc = SimpleDocTemplate(buffer, pagesize=A4)
elements = []
styles = getSampleStyleSheet()
# Add content
for line in rendered_content.split('\n'):
if line.strip():
elements.append(Paragraph(line, styles['Normal']))
elements.append(Spacer(1, 0.2 * inch))
doc.build(elements)
# Save PDF to document request
from django.core.files.base import ContentFile
pdf_content = buffer.getvalue()
buffer.close()
filename = f"{document_request.document_number}.pdf"
document_request.generated_document.save(filename, ContentFile(pdf_content))
document_request.status = 'READY'
document_request.processed_at = timezone.now()
document_request.save()
messages.success(request, 'Document generated successfully.')
return redirect('hr:document_request_detail', pk=pk)
except ImportError:
messages.error(request, 'PDF generation requires reportlab package.')
return redirect('hr:document_request_detail', pk=pk)
@login_required
def download_document(request, pk):
"""
Download generated document PDF.
"""
tenant = request.user.tenant
document_request = get_object_or_404(
DocumentRequest,
pk=pk,
employee__tenant=tenant
)
# Check permissions - employee can download their own, HR can download all
if hasattr(request.user, 'employee_profile'):
if (document_request.employee != request.user.employee_profile and
not request.user.has_perm('hr.view_documentrequest')):
messages.error(request, 'You do not have permission to download this document.')
return redirect('hr:document_request_detail', pk=pk)
if not document_request.generated_document:
messages.error(request, 'No document has been generated yet.')
return redirect('hr:document_request_detail', pk=pk)
# Serve the file
from django.http import FileResponse
response = FileResponse(document_request.generated_document.open('rb'))
response['Content-Type'] = 'application/pdf'
response['Content-Disposition'] = f'attachment; filename="{document_request.document_number}.pdf"'
# Mark as delivered if not already
if document_request.status == 'READY':
document_request.status = 'DELIVERED'
document_request.delivered_at = timezone.now()
document_request.save()
return response
@login_required
def my_document_requests(request):
"""
Employee's own document requests.
"""
if not hasattr(request.user, 'employee_profile'):
messages.error(request, 'You must have an employee profile.')
return redirect('hr:dashboard')
tenant = request.user.tenant
employee = request.user.employee_profile
# Get all document requests for this employee
document_requests = DocumentRequest.objects.filter(
employee=employee,
employee__tenant=tenant
).order_by('-created_at')
# Apply filters
status = request.GET.get('status')
if status:
document_requests = document_requests.filter(status=status)
document_type = request.GET.get('document_type')
if document_type:
document_requests = document_requests.filter(document_type=document_type)
# Paginate
from django.core.paginator import Paginator
paginator = Paginator(document_requests, 20)
page_number = request.GET.get('page')
page_obj = paginator.get_page(page_number)
context = {
'employee': employee,
'document_requests': page_obj,
'statuses': DocumentRequest.RequestStatus.choices,
'document_types': DocumentRequest.DocumentType.choices
}
return render(request, 'hr/documents/my_document_requests.html', context)
@login_required
@transaction.atomic
def approve_document_request(request, pk):
"""
Approve a document request (if template requires approval).
"""
tenant = request.user.tenant
document_request = get_object_or_404(
DocumentRequest,
pk=pk,
employee__tenant=tenant
)
# Check permissions
if not request.user.has_perm('hr.change_documentrequest'):
messages.error(request, 'You do not have permission to approve document requests.')
return redirect('hr:document_request_detail', pk=pk)
# Check if template requires approval
template = DocumentTemplate.objects.filter(
tenant=request.user.tenant,
document_type=document_request.document_type,
language=document_request.language,
is_active=True
).first()
if not template or not template.requires_approval:
messages.error(request, 'This document type does not require approval.')
return redirect('hr:document_request_detail', pk=pk)
if document_request.status != 'PENDING':
messages.error(request, 'Only pending requests can be approved.')
return redirect('hr:document_request_detail', pk=pk)
if request.method == 'POST':
action = request.POST.get('action')
notes = request.POST.get('notes', '')
if action == 'approve':
document_request.status = 'IN_PROGRESS'
document_request.notes = f"{document_request.notes or ''}\n\nApproved by {request.user.get_full_name()}: {notes}".strip()
messages.success(request, 'Document request approved.')
elif action == 'reject':
document_request.status = 'REJECTED'
document_request.notes = f"{document_request.notes or ''}\n\nRejected by {request.user.get_full_name()}: {notes}".strip()
messages.success(request, 'Document request rejected.')
document_request.save()
return redirect('hr:document_request_detail', pk=pk)
context = {
'document_request': document_request,
'template': template
}
return render(request, 'hr/documents/document_request_approve.html', context)