2025-08-12 13:33:25 +03:00

593 lines
23 KiB
Python

from datetime import timedelta, datetime

from django.db import transaction
from django.db.models import Q, Count, Avg, Sum
from django.utils import timezone
from django.utils.dateparse import parse_date
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework import filters
from rest_framework import viewsets, permissions, status
from rest_framework.decorators import action
from rest_framework.response import Response

from core.utils import AuditLogger

from ..models import (
    Employee, Department, Schedule, ScheduleAssignment,
    TimeEntry, PerformanceReview, TrainingRecord
)
from .serializers import (
    EmployeeSerializer, DepartmentSerializer, ScheduleSerializer,
    ScheduleAssignmentSerializer, TimeEntrySerializer, PerformanceReviewSerializer,
    TrainingRecordSerializer, HRStatsSerializer, ClockInOutSerializer,
    ScheduleCreateSerializer, PerformanceReviewCreateSerializer, TrainingRecordCreateSerializer
)
class BaseViewSet(viewsets.ModelViewSet):
    """Base ViewSet with authentication, filter backends and tenant scoping.

    Subclasses provide ``queryset`` and ``serializer_class``. Reads are
    limited to the requesting user's tenant when the user has one, and
    newly created objects are stamped with that tenant.
    """

    permission_classes = [permissions.IsAuthenticated]
    filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]

    def get_queryset(self):
        """Return the viewset's queryset, filtered by the user's tenant if set."""
        # Go through super() instead of reading ``self.queryset`` directly:
        # the class-level QuerySet is shared between requests, and DRF's
        # get_queryset() re-clones it so a result cache filled by one
        # request is never served to a later one.
        queryset = super().get_queryset()
        tenant = getattr(self.request.user, 'tenant', None)
        if tenant:
            return queryset.filter(tenant=tenant)
        return queryset

    def perform_create(self, serializer):
        """Save a new object, passing ``tenant`` when the user has that attribute."""
        user = self.request.user
        if hasattr(user, 'tenant'):
            serializer.save(tenant=user.tenant)
        else:
            serializer.save()
class DepartmentViewSet(BaseViewSet):
    """CRUD endpoints for Department, plus an ``active`` listing."""

    queryset = Department.objects.all()
    serializer_class = DepartmentSerializer
    filterset_fields = ['is_active', 'manager']
    search_fields = ['name', 'description', 'code', 'location']
    ordering_fields = ['name', 'code']
    ordering = ['name']

    @action(detail=False, methods=['get'])
    def active(self, request):
        """Return all departments whose ``is_active`` flag is true."""
        active_departments = self.get_queryset().filter(is_active=True)
        payload = self.get_serializer(active_departments, many=True).data
        return Response(payload)
class EmployeeViewSet(BaseViewSet):
    """CRUD endpoints for Employee plus listing and termination actions."""

    queryset = Employee.objects.all()
    serializer_class = EmployeeSerializer
    filterset_fields = [
        'department', 'employment_status', 'supervisor', 'is_active'
    ]
    search_fields = [
        'employee_number', 'user__first_name', 'user__last_name',
        'user__email', 'job_title'
    ]
    ordering_fields = ['employee_number', 'hire_date', 'user__last_name']
    ordering = ['user__last_name', 'user__first_name']

    @action(detail=False, methods=['get'])
    def active(self, request):
        """Return all active employees."""
        queryset = self.get_queryset().filter(is_active=True)
        serializer = self.get_serializer(queryset, many=True)
        return Response(serializer.data)

    @action(detail=False, methods=['get'])
    def by_department(self, request):
        """Return active employees, optionally narrowed by ``department_id``.

        Query params:
            department_id: when present and non-empty, only employees of
                that department are returned.
        """
        queryset = self.get_queryset().filter(is_active=True)
        department_id = request.query_params.get('department_id')
        if department_id:
            queryset = queryset.filter(department_id=department_id)
        serializer = self.get_serializer(queryset, many=True)
        return Response(serializer.data)

    @action(detail=True, methods=['post'])
    def terminate(self, request, pk=None):
        """Mark an employee as terminated and write an audit log entry.

        Request body:
            termination_date: required, ISO date (YYYY-MM-DD).
            reason: optional free text, recorded in the audit log only.

        Returns 400 when the date is missing or cannot be parsed.
        """
        employee = self.get_object()
        raw_date = request.data.get('termination_date')
        reason = request.data.get('reason', '')

        if not raw_date:
            return Response(
                {'error': 'Termination date is required'},
                status=status.HTTP_400_BAD_REQUEST
            )

        # Validate up front: an unparseable value would otherwise only blow
        # up inside save() and surface to the client as a 500.
        try:
            termination_date = parse_date(raw_date)
        except (TypeError, ValueError):
            termination_date = None
        if termination_date is None:
            return Response(
                {'error': 'Termination date must be a valid date (YYYY-MM-DD)'},
                status=status.HTTP_400_BAD_REQUEST
            )

        employee.employment_status = 'TERMINATED'
        employee.termination_date = termination_date
        employee.is_active = False
        employee.save()

        # Log the action
        AuditLogger.log_action(
            user=request.user,
            action='EMPLOYEE_TERMINATED',
            model='Employee',
            object_id=str(employee.employee_id),
            details={
                'employee_number': employee.employee_number,
                # Keep the logged value a plain string, as before.
                'termination_date': termination_date.isoformat(),
                'reason': reason
            }
        )
        return Response({'message': 'Employee terminated successfully'})
class ScheduleViewSet(BaseViewSet):
    """CRUD endpoints for Schedule plus bulk creation and publishing."""

    queryset = Schedule.objects.all()
    serializer_class = ScheduleSerializer
    filterset_fields = ['is_published', 'created_by']
    search_fields = ['name', 'description']
    ordering_fields = ['start_date', 'end_date', 'name']
    ordering = ['-start_date']

    @action(detail=False, methods=['post'])
    def create_schedule(self, request):
        """Create a schedule together with its assignments.

        The payload is validated by ``ScheduleCreateSerializer``. The
        schedule and all assignments are written in one transaction, so a
        failure on any assignment leaves nothing behind.

        Returns 400 on validation errors or when an assignment references
        an employee that does not exist (or belongs to another tenant).
        """
        serializer = ScheduleCreateSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        data = serializer.validated_data
        tenant = getattr(request.user, 'tenant', None)

        # Apply the same tenant rule as BaseViewSet.get_queryset so a user
        # cannot attach employees from another tenant to a schedule.
        employees = Employee.objects.all()
        if tenant:
            employees = employees.filter(tenant=tenant)

        try:
            with transaction.atomic():
                # Create schedule
                schedule = Schedule.objects.create(
                    name=data['name'],
                    description=data.get('description', ''),
                    start_date=data['start_date'],
                    end_date=data['end_date'],
                    notes=data.get('notes', ''),
                    created_by=request.user,
                    tenant=tenant
                )
                # Create assignments
                for assignment_data in data['assignments']:
                    employee = employees.get(id=assignment_data['employee_id'])
                    ScheduleAssignment.objects.create(
                        schedule=schedule,
                        employee=employee,
                        date=assignment_data['date'],
                        start_time=assignment_data['start_time'],
                        end_time=assignment_data['end_time'],
                        shift_type=assignment_data['shift_type'],
                        location=assignment_data.get('location', ''),
                        notes=assignment_data.get('notes', ''),
                        tenant=tenant
                    )
        except Employee.DoesNotExist:
            # Caught outside the atomic block so the rollback has already
            # happened by the time the error response is built.
            return Response(
                {'error': 'One or more assigned employees were not found'},
                status=status.HTTP_400_BAD_REQUEST
            )

        # Log the action
        AuditLogger.log_action(
            user=request.user,
            action='SCHEDULE_CREATED',
            model='Schedule',
            object_id=str(schedule.schedule_id),
            details={
                'schedule_name': schedule.name,
                'assignments_count': len(data['assignments'])
            }
        )
        return Response({
            'message': 'Schedule created successfully',
            'schedule': ScheduleSerializer(schedule).data
        })

    @action(detail=True, methods=['post'])
    def publish(self, request, pk=None):
        """Set ``is_published`` on a schedule and write an audit log entry."""
        schedule = self.get_object()
        schedule.is_published = True
        schedule.save()

        # Log the action
        AuditLogger.log_action(
            user=request.user,
            action='SCHEDULE_PUBLISHED',
            model='Schedule',
            object_id=str(schedule.schedule_id),
            details={'schedule_name': schedule.name}
        )
        return Response({'message': 'Schedule published successfully'})
class ScheduleAssignmentViewSet(BaseViewSet):
    """CRUD endpoints for ScheduleAssignment plus date/employee listings."""

    queryset = ScheduleAssignment.objects.all()
    serializer_class = ScheduleAssignmentSerializer
    filterset_fields = ['schedule', 'employee', 'date', 'shift_type']
    search_fields = ['employee__user__first_name', 'employee__user__last_name', 'location']
    ordering_fields = ['date', 'start_time']
    ordering = ['date', 'start_time']

    @action(detail=False, methods=['get'])
    def today(self, request):
        """Return assignments dated ``timezone.now().date()``."""
        current_date = timezone.now().date()
        todays_assignments = self.get_queryset().filter(date=current_date)
        return Response(self.get_serializer(todays_assignments, many=True).data)

    @action(detail=False, methods=['get'])
    def by_employee(self, request):
        """Return assignments filtered by employee and/or a date range.

        Query params:
            employee_id: optional; restricts results to that employee.
            start_date, end_date: optional; the range filter is applied
                only when both are supplied.
        """
        params = request.query_params
        employee_id = params.get('employee_id')
        range_start = params.get('start_date')
        range_end = params.get('end_date')

        assignments = self.get_queryset()
        if employee_id:
            assignments = assignments.filter(employee_id=employee_id)
        if range_start and range_end:
            assignments = assignments.filter(date__range=[range_start, range_end])

        return Response(self.get_serializer(assignments, many=True).data)
class TimeEntryViewSet(BaseViewSet):
    """CRUD endpoints for TimeEntry plus clocking and approval actions."""

    queryset = TimeEntry.objects.all()
    serializer_class = TimeEntrySerializer
    filterset_fields = ['employee', 'date', 'entry_type', 'approved_by']
    search_fields = ['employee__user__first_name', 'employee__user__last_name']
    ordering_fields = ['date', 'clock_in_time']
    ordering = ['-date', '-clock_in_time']

    # Maps each clock action to the TimeEntry field it timestamps.
    _CLOCK_ACTION_FIELDS = {
        'CLOCK_IN': 'clock_in_time',
        'CLOCK_OUT': 'clock_out_time',
        'BREAK_START': 'break_start_time',
        'BREAK_END': 'break_end_time',
    }

    @action(detail=False, methods=['post'])
    def clock_in_out(self, request):
        """Record a clock-in, clock-out, break-start or break-end event.

        The payload is validated by ``ClockInOutSerializer``. The event is
        stamped onto today's time entry for the employee, which is created
        if it does not exist yet.

        Returns 400 on validation errors or when the employee does not
        exist (or belongs to another tenant).
        """
        serializer = ClockInOutSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        data = serializer.validated_data
        tenant = getattr(request.user, 'tenant', None)

        # Apply the same tenant rule as BaseViewSet.get_queryset so entries
        # cannot be recorded against another tenant's employee.
        employees = Employee.objects.all()
        if tenant:
            employees = employees.filter(tenant=tenant)
        try:
            employee = employees.get(id=data['employee_id'])
        except Employee.DoesNotExist:
            return Response(
                {'error': 'Employee not found'},
                status=status.HTTP_400_BAD_REQUEST
            )

        # Named ``clock_action`` so the imported ``action`` decorator is not
        # shadowed inside this method.
        clock_action = data['action']
        notes = data.get('notes', '')

        # Take one timestamp so the date and time always describe the same
        # instant, even when the request lands on a midnight boundary.
        # NOTE(review): with USE_TZ enabled this is UTC, not local time.
        # Confirm that is the intended basis for attendance records.
        moment = timezone.now()
        today = moment.date()
        now = moment.time()

        # Get or create today's time entry
        time_entry, _created = TimeEntry.objects.get_or_create(
            employee=employee,
            date=today,
            defaults={
                'entry_type': 'REGULAR',
                'notes': notes,
                'tenant': tenant
            }
        )

        # Update based on action; an unrecognised action changes no field.
        field_name = self._CLOCK_ACTION_FIELDS.get(clock_action)
        if field_name is not None:
            setattr(time_entry, field_name, now)
        time_entry.save()

        # Log the action
        AuditLogger.log_action(
            user=request.user,
            action=f'TIME_{clock_action}',
            model='TimeEntry',
            object_id=str(time_entry.entry_id),
            details={
                'employee': employee.user.get_full_name(),
                'action': clock_action,
                'time': str(now)
            }
        )
        return Response({
            'message': f'{clock_action.replace("_", " ").title()} recorded successfully',
            'time_entry': TimeEntrySerializer(time_entry).data
        })

    @action(detail=False, methods=['get'])
    def pending_approval(self, request):
        """Return time entries that have no approver yet."""
        queryset = self.get_queryset().filter(approved_by__isnull=True)
        serializer = self.get_serializer(queryset, many=True)
        return Response(serializer.data)

    @action(detail=True, methods=['post'])
    def approve(self, request, pk=None):
        """Mark a time entry as approved by the requesting user."""
        time_entry = self.get_object()
        time_entry.approved_by = request.user
        time_entry.approved_date = timezone.now()
        time_entry.save()

        # Log the action
        AuditLogger.log_action(
            user=request.user,
            action='TIME_ENTRY_APPROVED',
            model='TimeEntry',
            object_id=str(time_entry.entry_id),
            details={
                'employee': time_entry.employee.user.get_full_name(),
                'date': str(time_entry.date)
            }
        )
        return Response({'message': 'Time entry approved successfully'})
class PerformanceReviewViewSet(BaseViewSet):
    """CRUD endpoints for PerformanceReview plus create/finalize actions."""

    queryset = PerformanceReview.objects.all()
    serializer_class = PerformanceReviewSerializer
    filterset_fields = ['employee', 'reviewer', 'review_type', 'is_final']
    search_fields = [
        'employee__user__first_name', 'employee__user__last_name',
        'reviewer__first_name', 'reviewer__last_name'
    ]
    ordering_fields = ['review_date', 'overall_rating']
    ordering = ['-review_date']

    @action(detail=False, methods=['post'])
    def create_review(self, request):
        """Create a performance review authored by the requesting user.

        The payload is validated by ``PerformanceReviewCreateSerializer``.
        The review date is set to the current date.

        Returns 400 on validation errors or when the employee does not
        exist (or belongs to another tenant).
        """
        serializer = PerformanceReviewCreateSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        data = serializer.validated_data
        tenant = getattr(request.user, 'tenant', None)

        # Apply the same tenant rule as BaseViewSet.get_queryset so reviews
        # cannot be filed against another tenant's employee.
        employees = Employee.objects.all()
        if tenant:
            employees = employees.filter(tenant=tenant)
        try:
            employee = employees.get(id=data['employee_id'])
        except Employee.DoesNotExist:
            return Response(
                {'error': 'Employee not found'},
                status=status.HTTP_400_BAD_REQUEST
            )

        # Create performance review
        review = PerformanceReview.objects.create(
            employee=employee,
            reviewer=request.user,
            review_period_start=data['review_period_start'],
            review_period_end=data['review_period_end'],
            review_date=timezone.now().date(),
            review_type=data['review_type'],
            overall_rating=data['overall_rating'],
            goals_achievements=data['goals_achievements'],
            strengths=data['strengths'],
            areas_for_improvement=data['areas_for_improvement'],
            development_plan=data['development_plan'],
            comments=data.get('comments', ''),
            tenant=tenant
        )

        # Log the action
        AuditLogger.log_action(
            user=request.user,
            action='PERFORMANCE_REVIEW_CREATED',
            model='PerformanceReview',
            object_id=str(review.review_id),
            details={
                'employee': employee.user.get_full_name(),
                'review_type': review.review_type,
                'overall_rating': review.overall_rating
            }
        )
        return Response({
            'message': 'Performance review created successfully',
            'review': PerformanceReviewSerializer(review).data
        })

    @action(detail=True, methods=['post'])
    def finalize(self, request, pk=None):
        """Set ``is_final`` on a review and write an audit log entry."""
        review = self.get_object()
        review.is_final = True
        review.save()

        # Log the action
        AuditLogger.log_action(
            user=request.user,
            action='PERFORMANCE_REVIEW_FINALIZED',
            model='PerformanceReview',
            object_id=str(review.review_id),
            details={
                'employee': review.employee.user.get_full_name(),
                'overall_rating': review.overall_rating
            }
        )
        return Response({'message': 'Performance review finalized successfully'})
class TrainingRecordViewSet(BaseViewSet):
    """CRUD endpoints for TrainingRecord plus create/complete/expiry actions."""

    queryset = TrainingRecord.objects.all()
    serializer_class = TrainingRecordSerializer
    filterset_fields = ['employee', 'training_type', 'status', 'provider']
    search_fields = [
        'employee__user__first_name', 'employee__user__last_name',
        'training_name', 'provider'
    ]
    ordering_fields = ['start_date', 'completion_date', 'expiration_date']
    ordering = ['-start_date']

    @action(detail=False, methods=['post'])
    def create_record(self, request):
        """Create a training record with status ``IN_PROGRESS``.

        The payload is validated by ``TrainingRecordCreateSerializer``.

        Returns 400 on validation errors or when the employee does not
        exist (or belongs to another tenant).
        """
        serializer = TrainingRecordCreateSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        data = serializer.validated_data
        tenant = getattr(request.user, 'tenant', None)

        # Apply the same tenant rule as BaseViewSet.get_queryset so records
        # cannot be created for another tenant's employee.
        employees = Employee.objects.all()
        if tenant:
            employees = employees.filter(tenant=tenant)
        try:
            employee = employees.get(id=data['employee_id'])
        except Employee.DoesNotExist:
            return Response(
                {'error': 'Employee not found'},
                status=status.HTTP_400_BAD_REQUEST
            )

        # Create training record
        record = TrainingRecord.objects.create(
            employee=employee,
            training_name=data['training_name'],
            training_type=data['training_type'],
            provider=data['provider'],
            start_date=data['start_date'],
            hours=data['hours'],
            cost=data.get('cost'),
            status='IN_PROGRESS',
            notes=data.get('notes', ''),
            tenant=tenant
        )

        # Log the action
        AuditLogger.log_action(
            user=request.user,
            action='TRAINING_RECORD_CREATED',
            model='TrainingRecord',
            object_id=str(record.record_id),
            details={
                'employee': employee.user.get_full_name(),
                'training_name': record.training_name,
                'training_type': record.training_type
            }
        )
        return Response({
            'message': 'Training record created successfully',
            'record': TrainingRecordSerializer(record).data
        })

    @action(detail=True, methods=['post'])
    def complete(self, request, pk=None):
        """Mark a training record as completed.

        Request body:
            completion_date: optional ISO date (YYYY-MM-DD); defaults to
                the current date when missing, null or empty.
            certificate_number: optional, defaults to an empty string.

        Returns 400 when ``completion_date`` is supplied but unparseable.
        """
        record = self.get_object()
        certificate_number = request.data.get('certificate_number', '')

        raw_date = request.data.get('completion_date')
        if raw_date:
            # Validate up front: an unparseable value would otherwise only
            # fail inside save() and surface to the client as a 500.
            try:
                completion_date = parse_date(raw_date)
            except (TypeError, ValueError):
                completion_date = None
            if completion_date is None:
                return Response(
                    {'error': 'Completion date must be a valid date (YYYY-MM-DD)'},
                    status=status.HTTP_400_BAD_REQUEST
                )
        else:
            # Missing, null and empty values all fall back to today.
            completion_date = timezone.now().date()

        record.status = 'COMPLETED'
        record.completion_date = completion_date
        record.certificate_number = certificate_number
        record.save()

        # Log the action
        AuditLogger.log_action(
            user=request.user,
            action='TRAINING_COMPLETED',
            model='TrainingRecord',
            object_id=str(record.record_id),
            details={
                'employee': record.employee.user.get_full_name(),
                'training_name': record.training_name,
                'completion_date': str(completion_date)
            }
        )
        return Response({'message': 'Training marked as completed'})

    @action(detail=False, methods=['get'])
    def expiring_soon(self, request):
        """Return completed training records expiring within 30 days."""
        # Compute "today" once so both bounds use the same reference date.
        today = timezone.now().date()
        queryset = self.get_queryset().filter(
            expiration_date__lte=today + timedelta(days=30),
            expiration_date__gte=today,
            status='COMPLETED'
        )
        serializer = self.get_serializer(queryset, many=True)
        return Response(serializer.data)
class HRStatsViewSet(viewsets.ViewSet):
    """Endpoints exposing aggregated HR figures."""

    permission_classes = [permissions.IsAuthenticated]

    @action(detail=False, methods=['get'])
    def dashboard(self, request):
        """Build the HR dashboard payload.

        All figures are restricted to the requesting user's tenant when the
        user has one. The result is passed through ``HRStatsSerializer``.
        """

        def percentage(part, whole):
            # Zero when there is nothing to divide by.
            return (part / whole * 100) if whole > 0 else 0

        user = request.user
        scope = {}
        if hasattr(user, 'tenant') and user.tenant:
            scope['tenant'] = user.tenant

        today = timezone.now().date()
        month_start = today.replace(day=1)

        # Employee statistics
        staff = Employee.objects.filter(**scope)
        headcount = staff.count()
        active_headcount = staff.filter(is_active=True).count()
        hired_this_month = staff.filter(hire_date__gte=month_start).count()
        terminated_this_month = staff.filter(
            termination_date__gte=month_start
        ).count()

        # Attendance statistics
        todays_entries = TimeEntry.objects.filter(**scope, date=today)
        scheduled_today = ScheduleAssignment.objects.filter(
            date=today,
            **scope
        ).count()
        clocked_in = todays_entries.filter(clock_in_time__isnull=False).count()
        attendance = percentage(clocked_in, scheduled_today)

        # Overtime calculation (mock): hours logged today beyond an assumed
        # standard 8-hour day per employee who clocked in.
        hours_logged = todays_entries.aggregate(
            total=Sum('hours_worked')
        )['total'] or 0
        overtime = max(0, hours_logged - clocked_in * 8)

        # Training completion rate
        trainings = TrainingRecord.objects.filter(**scope)
        training_total = trainings.count()
        training_done = trainings.filter(status='COMPLETED').count()
        training_rate = percentage(training_done, training_total)

        # Department breakdown (active employees only)
        department_rows = staff.filter(is_active=True).values(
            'department__name'
        ).annotate(count=Count('id')).order_by('-count')
        by_department = {}
        for row in department_rows:
            by_department[row['department__name'] or 'Unassigned'] = row['count']

        # Employment status breakdown
        status_rows = staff.values('employment_status').annotate(
            count=Count('id')
        ).order_by('-count')
        by_status = {}
        for row in status_rows:
            by_status[row['employment_status']] = row['count']

        stats = {
            'total_employees': headcount,
            'active_employees': active_headcount,
            'new_hires_this_month': hired_this_month,
            'terminations_this_month': terminated_this_month,
            'attendance_rate': round(attendance, 1),
            'overtime_hours': round(overtime, 1),
            'training_completion_rate': round(training_rate, 1),
            'department_breakdown': by_department,
            'employment_status_breakdown': by_status,
        }
        return Response(HRStatsSerializer(stats).data)