2025-08-12 13:33:25 +03:00

508 lines
20 KiB
Python

from rest_framework import viewsets, permissions, status
from rest_framework.decorators import action
from rest_framework.response import Response
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework import filters
from django.db.models import Q, Count, Sum, F
from django.utils import timezone
from datetime import timedelta
from decimal import Decimal
from ..models import (
InventoryItem, InventoryStock, InventoryLocation,
PurchaseOrder, PurchaseOrderItem, Supplier
)
from .serializers import (
InventoryItemSerializer, InventoryStockSerializer, InventoryLocationSerializer,
PurchaseOrderSerializer, PurchaseOrderItemSerializer, SupplierSerializer,
InventoryStatsSerializer, StockAdjustmentSerializer, StockTransferSerializer,
PurchaseOrderCreateSerializer, ReceiveOrderSerializer
)
from core.utils import AuditLogger
class BaseViewSet(viewsets.ModelViewSet):
"""Base ViewSet with common functionality"""
permission_classes = [permissions.IsAuthenticated]
filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
def get_queryset(self):
# Filter by tenant if user has one
if hasattr(self.request.user, 'tenant') and self.request.user.tenant:
return self.queryset.filter(tenant=self.request.user.tenant)
return self.queryset
def perform_create(self, serializer):
if hasattr(self.request.user, 'tenant'):
serializer.save(tenant=self.request.user.tenant)
else:
serializer.save()
class SupplierViewSet(BaseViewSet):
"""ViewSet for Supplier model"""
queryset = Supplier.objects.all()
serializer_class = SupplierSerializer
filterset_fields = ['is_active', 'city', 'state', 'country']
search_fields = ['name', 'contact_person', 'email', 'phone']
ordering_fields = ['name', 'created_at']
ordering = ['name']
@action(detail=False, methods=['get'])
def active(self, request):
"""Get active suppliers"""
queryset = self.get_queryset().filter(is_active=True)
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
class InventoryLocationViewSet(BaseViewSet):
"""ViewSet for InventoryLocation model"""
queryset = InventoryLocation.objects.all()
serializer_class = InventoryLocationSerializer
filterset_fields = ['location_type', 'building', 'floor', 'is_active']
search_fields = ['name', 'description', 'room', 'zone']
ordering_fields = ['name', 'building', 'floor']
ordering = ['building', 'floor', 'name']
@action(detail=False, methods=['get'])
def active(self, request):
"""Get active locations"""
queryset = self.get_queryset().filter(is_active=True)
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
class InventoryItemViewSet(BaseViewSet):
"""ViewSet for InventoryItem model"""
queryset = InventoryItem.objects.all()
serializer_class = InventoryItemSerializer
filterset_fields = [
'category', 'manufacturer', 'is_active', 'requires_expiry_tracking',
'requires_lot_tracking', 'is_controlled_substance'
]
search_fields = ['name', 'description', 'sku', 'barcode', 'model_number']
ordering_fields = ['name', 'category', 'unit_cost']
ordering = ['name']
@action(detail=False, methods=['get'])
def active(self, request):
"""Get active items"""
queryset = self.get_queryset().filter(is_active=True)
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
@action(detail=False, methods=['get'])
def by_category(self, request):
"""Get items by category"""
category = request.query_params.get('category')
if category:
queryset = self.get_queryset().filter(category=category, is_active=True)
else:
queryset = self.get_queryset().filter(is_active=True)
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
class InventoryStockViewSet(BaseViewSet):
"""ViewSet for InventoryStock model"""
queryset = InventoryStock.objects.all()
serializer_class = InventoryStockSerializer
filterset_fields = ['item', 'location']
search_fields = [
'item__name', 'item__sku', 'location__name', 'lot_number'
]
ordering_fields = ['item__name', 'quantity_on_hand', 'expiration_date']
ordering = ['item__name']
@action(detail=False, methods=['get'])
def low_stock(self, request):
"""Get low stock items"""
queryset = self.get_queryset().filter(
quantity_on_hand__lte=F('item__reorder_level')
)
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
@action(detail=False, methods=['get'])
def expired(self, request):
"""Get expired items"""
today = timezone.now().date()
queryset = self.get_queryset().filter(expiration_date__lt=today)
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
@action(detail=False, methods=['get'])
def expiring_soon(self, request):
"""Get items expiring within 30 days"""
today = timezone.now().date()
thirty_days = today + timedelta(days=30)
queryset = self.get_queryset().filter(
expiration_date__lte=thirty_days,
expiration_date__gte=today
)
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
@action(detail=True, methods=['post'])
def adjust(self, request, pk=None):
"""Adjust stock quantity"""
stock = self.get_object()
serializer = StockAdjustmentSerializer(data=request.data)
if serializer.is_valid():
adjustment_type = serializer.validated_data['adjustment_type']
quantity = serializer.validated_data['quantity']
reason = serializer.validated_data['reason']
notes = serializer.validated_data.get('notes', '')
old_quantity = stock.quantity_on_hand
if adjustment_type == 'ADD':
stock.quantity_on_hand += quantity
elif adjustment_type == 'SUBTRACT':
stock.quantity_on_hand = max(0, stock.quantity_on_hand - quantity)
elif adjustment_type == 'SET':
stock.quantity_on_hand = quantity
stock.save()
# Log the action
AuditLogger.log_action(
user=request.user,
action='STOCK_ADJUSTED',
model='InventoryStock',
object_id=str(stock.stock_id),
details={
'item_name': stock.item.name,
'location': stock.location.name,
'old_quantity': old_quantity,
'new_quantity': stock.quantity_on_hand,
'adjustment_type': adjustment_type,
'reason': reason
}
)
return Response({'message': 'Stock adjusted successfully'})
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@action(detail=True, methods=['post'])
def transfer(self, request, pk=None):
"""Transfer stock to another location"""
from_stock = self.get_object()
serializer = StockTransferSerializer(data=request.data)
if serializer.is_valid():
to_location = InventoryLocation.objects.get(
id=serializer.validated_data['to_location_id']
)
quantity = serializer.validated_data['quantity']
reason = serializer.validated_data['reason']
notes = serializer.validated_data.get('notes', '')
if from_stock.quantity_available < quantity:
return Response(
{'error': 'Insufficient available quantity'},
status=status.HTTP_400_BAD_REQUEST
)
# Reduce from source
from_stock.quantity_on_hand -= quantity
from_stock.save()
# Add to destination (create or update)
to_stock, created = InventoryStock.objects.get_or_create(
item=from_stock.item,
location=to_location,
lot_number=from_stock.lot_number,
expiration_date=from_stock.expiration_date,
defaults={
'quantity_on_hand': quantity,
'unit_cost': from_stock.unit_cost,
'tenant': getattr(request.user, 'tenant', None)
}
)
if not created:
to_stock.quantity_on_hand += quantity
to_stock.save()
# Log the action
AuditLogger.log_action(
user=request.user,
action='STOCK_TRANSFERRED',
model='InventoryStock',
object_id=str(from_stock.stock_id),
details={
'item_name': from_stock.item.name,
'from_location': from_stock.location.name,
'to_location': to_location.name,
'quantity': quantity,
'reason': reason
}
)
return Response({'message': 'Stock transferred successfully'})
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class PurchaseOrderViewSet(BaseViewSet):
"""ViewSet for PurchaseOrder model"""
queryset = PurchaseOrder.objects.all()
serializer_class = PurchaseOrderSerializer
filterset_fields = ['status', 'supplier', 'created_by']
search_fields = ['order_number', 'supplier__name', 'notes']
ordering_fields = ['order_date', 'expected_delivery_date', 'grand_total']
ordering = ['-order_date']
@action(detail=False, methods=['post'])
def create_order(self, request):
"""Create a purchase order"""
serializer = PurchaseOrderCreateSerializer(data=request.data)
if serializer.is_valid():
supplier = Supplier.objects.get(id=serializer.validated_data['supplier_id'])
# Create purchase order
order = PurchaseOrder.objects.create(
supplier=supplier,
order_date=timezone.now().date(),
expected_delivery_date=serializer.validated_data['expected_delivery_date'],
status='PENDING',
notes=serializer.validated_data.get('notes', ''),
created_by=request.user,
tenant=getattr(request.user, 'tenant', None)
)
# Create order items
total_amount = Decimal('0.00')
for item_data in serializer.validated_data['items']:
item = InventoryItem.objects.get(id=item_data['item_id'])
line_total = Decimal(str(item_data['quantity'])) * Decimal(str(item_data['unit_cost']))
PurchaseOrderItem.objects.create(
purchase_order=order,
item=item,
quantity_ordered=item_data['quantity'],
unit_cost=item_data['unit_cost'],
notes=item_data.get('notes', ''),
tenant=getattr(request.user, 'tenant', None)
)
total_amount += line_total
# Update order totals
order.total_amount = total_amount
order.grand_total = total_amount + (order.tax_amount or 0) + (order.shipping_cost or 0)
order.save()
# Log the action
AuditLogger.log_action(
user=request.user,
action='PURCHASE_ORDER_CREATED',
model='PurchaseOrder',
object_id=str(order.order_id),
details={
'order_number': order.order_number,
'supplier': supplier.name,
'total_amount': float(total_amount)
}
)
return Response({
'message': 'Purchase order created successfully',
'order': PurchaseOrderSerializer(order).data
})
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@action(detail=False, methods=['get'])
def pending(self, request):
"""Get pending orders"""
queryset = self.get_queryset().filter(status='PENDING')
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
@action(detail=True, methods=['post'])
def receive(self, request, pk=None):
"""Receive a purchase order"""
order = self.get_object()
serializer = ReceiveOrderSerializer(data=request.data)
if serializer.is_valid():
notes = serializer.validated_data.get('notes', '')
# Process received items
for item_data in serializer.validated_data['items']:
order_item = PurchaseOrderItem.objects.get(
purchase_order=order,
id=item_data['item_id']
)
quantity_received = item_data['quantity_received']
order_item.quantity_received += quantity_received
order_item.save()
# Update inventory stock
# For simplicity, assume default location
default_location = InventoryLocation.objects.filter(
location_type='WAREHOUSE'
).first()
if default_location:
stock, created = InventoryStock.objects.get_or_create(
item=order_item.item,
location=default_location,
defaults={
'quantity_on_hand': quantity_received,
'unit_cost': order_item.unit_cost,
'tenant': getattr(request.user, 'tenant', None)
}
)
if not created:
stock.quantity_on_hand += quantity_received
stock.save()
# Update order status
order.status = 'RECEIVED'
order.actual_delivery_date = timezone.now().date()
order.notes = f"{order.notes}\nReceived: {notes}" if notes else order.notes
order.save()
# Log the action
AuditLogger.log_action(
user=request.user,
action='PURCHASE_ORDER_RECEIVED',
model='PurchaseOrder',
object_id=str(order.order_id),
details={
'order_number': order.order_number,
'supplier': order.supplier.name
}
)
return Response({'message': 'Purchase order received successfully'})
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@action(detail=True, methods=['post'])
def cancel(self, request, pk=None):
"""Cancel a purchase order"""
order = self.get_object()
reason = request.data.get('reason', '')
order.status = 'CANCELLED'
order.notes = f"{order.notes}\nCancelled: {reason}"
order.save()
# Log the action
AuditLogger.log_action(
user=request.user,
action='PURCHASE_ORDER_CANCELLED',
model='PurchaseOrder',
object_id=str(order.order_id),
details={
'order_number': order.order_number,
'reason': reason
}
)
return Response({'message': 'Purchase order cancelled successfully'})
class PurchaseOrderItemViewSet(viewsets.ReadOnlyModelViewSet):
"""ViewSet for PurchaseOrderItem model (read-only)"""
queryset = PurchaseOrderItem.objects.all()
serializer_class = PurchaseOrderItemSerializer
permission_classes = [permissions.IsAuthenticated]
filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
filterset_fields = ['purchase_order', 'item']
search_fields = ['item__name', 'item__sku']
ordering_fields = ['item__name', 'quantity_ordered']
ordering = ['item__name']
def get_queryset(self):
if hasattr(self.request.user, 'tenant') and self.request.user.tenant:
return self.queryset.filter(tenant=self.request.user.tenant)
return self.queryset
class InventoryStatsViewSet(viewsets.ViewSet):
"""ViewSet for inventory statistics"""
permission_classes = [permissions.IsAuthenticated]
@action(detail=False, methods=['get'])
def dashboard(self, request):
"""Get inventory dashboard statistics"""
tenant_filter = {}
if hasattr(request.user, 'tenant') and request.user.tenant:
tenant_filter['tenant'] = request.user.tenant
today = timezone.now().date()
thirty_days = today + timedelta(days=30)
# Item statistics
items = InventoryItem.objects.filter(**tenant_filter)
total_items = items.count()
# Stock statistics
stocks = InventoryStock.objects.filter(**tenant_filter)
low_stock_items = stocks.filter(
quantity_on_hand__lte=F('item__reorder_level')
).count()
expired_items = stocks.filter(expiration_date__lt=today).count()
expiring_soon = stocks.filter(
expiration_date__lte=thirty_days,
expiration_date__gte=today
).count()
# Total inventory value
total_value = stocks.aggregate(
total=Sum(F('quantity_on_hand') * F('unit_cost'))
)['total'] or 0
# Purchase order statistics
orders = PurchaseOrder.objects.filter(**tenant_filter)
pending_orders = orders.filter(status='PENDING').count()
received_today = orders.filter(
actual_delivery_date=today
).count()
# Category breakdown
category_breakdown = items.values('category').annotate(
count=Count('id')
).order_by('-count')
# Location utilization (mock data)
locations = InventoryLocation.objects.filter(**tenant_filter, is_active=True)
location_utilization = {}
for location in locations:
# Calculate utilization based on stock levels
location_stocks = stocks.filter(location=location)
total_items_in_location = location_stocks.aggregate(
total=Sum('quantity_on_hand')
)['total'] or 0
# Assume capacity utilization (mock calculation)
utilization = min((total_items_in_location / (location.capacity or 1000)) * 100, 100)
location_utilization[location.name] = round(utilization, 1)
stats = {
'total_items': total_items,
'low_stock_items': low_stock_items,
'expired_items': expired_items,
'expiring_soon': expiring_soon,
'total_value': total_value,
'pending_orders': pending_orders,
'received_today': received_today,
'category_breakdown': {item['category']: item['count'] for item in category_breakdown},
'location_utilization': location_utilization
}
serializer = InventoryStatsSerializer(stats)
return Response(serializer.data)