import hashlib
import json
import logging
import re
import uuid
from datetime import timedelta

from django.apps import apps
from django.conf import settings
from django.contrib.auth.mixins import LoginRequiredMixin
from django.db import models
from django.http import JsonResponse
from django.shortcuts import render
from django.utils import timezone
from django.utils.decorators import method_decorator
from django.views import View
from django.views.decorators.csrf import csrf_exempt

from inventory import models as inventory_models
from inventory.utils import get_user_type

from .models import AnalysisCache
from .services.analysis_service import (
    generate_count_insight,
    generate_model_insight,
    generate_performance_insight,
    generate_recommendations,
    generate_relationship_insight,
    generate_statistics_insight,
)
from .services.cache_service import CacheService
from .services.llm_service import get_llm_chain
from .utils.response_formatter import format_response

logger = logging.getLogger(__name__)


# NOTE(review): this view is CSRF-exempt and does not use LoginRequiredMixin
# (imported above but unused) -- confirm that unauthenticated access is intended.
@method_decorator(csrf_exempt, name='dispatch')
class ModelAnalystView(View):
    """
    View for handling model analysis requests and rendering the chatbot interface.

    This view provides both GET and POST methods:
    - GET: Renders the chatbot interface
    - POST: Processes analysis requests and returns JSON responses

    The view includes caching, permission checking, and multilingual
    (English / Arabic) messages.
    """

    # Configuration settings (can be moved to Django settings)
    CACHE_DURATION = getattr(settings, 'ANALYSIS_CACHE_DURATION', 3600)
    DEFAULT_LANGUAGE = getattr(settings, 'DEFAULT_LANGUAGE', 'en')

    def get(self, request, *args, **kwargs):
        """
        Render the chatbot interface.

        :param request: The HTTP request
        :return: Rendered chatbot.html template
        """
        context = {
            'dark_mode': request.session.get('dark_mode', False)
        }
        return render(request, "haikalbot/chatbot.html", context)

    def post(self, request, *args, **kwargs):
        """
        Process analysis requests and return JSON responses.

        The request body is expected to be a JSON object with a ``prompt``
        key and an optional ``language`` key (defaults to
        ``DEFAULT_LANGUAGE``).

        :param request: The HTTP request containing the prompt
        :return: JsonResponse with analysis results, or an error payload with
            status 400 (bad body / missing prompt), 403 (permission denied)
            or 500 (unexpected failure)
        """
        # Bind the language before the try block: the except handlers below
        # localise their message with it, and the body may fail to parse
        # before the requested language is known.
        language = self.DEFAULT_LANGUAGE
        try:
            # Parse request data
            data = json.loads(request.body)
            if not isinstance(data, dict):
                # Valid JSON that is not an object carries no prompt; fall
                # through to the "prompt is required" validation below.
                data = {}
            prompt = data.get('prompt')
            language = data.get('language', self.DEFAULT_LANGUAGE)
            dealer = get_user_type(request)

            # Validate request
            if not prompt:
                error_msg = "الاستعلام مطلوب" if language == 'ar' else "Prompt is required"
                return self._error_response(error_msg, 400)

            # A missing dealer is treated as "no permission" rather than
            # crashing on attribute access.
            dealer_id = getattr(dealer, 'id', None)
            if dealer_id is None or not self._check_permissions(dealer_id):
                error_msg = "تم رفض الإذن" if language == 'ar' else "Permission denied"
                return self._error_response(error_msg, 403)

            # Check cache
            cache_service = CacheService()
            prompt_hash = cache_service.generate_hash(prompt, dealer_id, language)
            cached_result = cache_service.get_cached_result(prompt_hash, request.user, dealer_id)
            if cached_result:
                return JsonResponse(cached_result)

            # Process prompt and generate insights
            insights = self._process_prompt(prompt, dealer, language)

            # Cache results
            cache_service.cache_result(
                prompt_hash,
                insights,
                request.user,
                dealer_id,
                self.CACHE_DURATION
            )

            return JsonResponse(insights)
        except (json.JSONDecodeError, UnicodeDecodeError):
            # UnicodeDecodeError: json.loads() decodes a bytes body itself and
            # raises this (not JSONDecodeError) for undecodable input.
            error_msg = "بيانات JSON غير صالحة في نص الطلب" if language == 'ar' else "Invalid JSON in request body"
            return self._error_response(error_msg, 400)
        except Exception as e:
            logger.exception("Error processing model analysis request")
            # NOTE(review): the raw exception text is returned to the client;
            # consider a generic message if internal details must not leak.
            error_msg = f"حدث خطأ: {str(e)}" if language == 'ar' else f"An error occurred: {str(e)}"
            return self._error_response(error_msg, 500)

    def _error_response(self, message, status):
        """
        Create a standardized error response.

        :param message: Error message
        :param status: HTTP status code
        :return: JsonResponse with error details
        """
        return JsonResponse({"status": "error", "message": message}, status=status)

    def _check_permissions(self, dealer_id):
        """
        Check if the dealer has permissions to access the analysis.

        The check passes when a ``Dealer`` row with the given id exists.

        :param dealer_id: ID of the dealer
        :return: True if dealer has permissions, False otherwise (including
            when the lookup itself raises)
        """
        try:
            return inventory_models.Dealer.objects.filter(id=dealer_id).exists()
        except Exception:
            logger.exception("Error checking permissions")
            return False

    def _process_prompt(self, prompt, dealer, language):
        """
        Process the prompt and generate insights.

        :param prompt: User's prompt text
        :param dealer: Dealer object (may be falsy, in which case no dealer
            filtering is applied)
        :param language: Language code (e.g., 'en', 'ar')
        :return: Dictionary with analysis results
        """
        # Initialize response structure
        response = format_response(
            prompt=prompt,
            language=language,
            request_id=str(uuid.uuid4()),
            timestamp=timezone.now().isoformat()
        )

        # Get LLM chain for prompt analysis
        chain = get_llm_chain(language=language)

        # Parse prompt using LLM
        if chain:
            try:
                result = chain.invoke({"prompt": prompt})
                # Pull the outermost {...} span out of the LLM output.
                json_match = re.search(r'({.*})', result.replace('\n', ' '), re.DOTALL)
                result = json.loads(json_match.group(1)) if json_match else {}
            except Exception as e:
                # Best effort: any LLM/parsing failure falls back to the
                # default ("general") analysis below.
                logger.error("LLM error fallback: %s", e)
                result = {}
        else:
            result = {}

        # Extract analysis parameters
        analysis_type = result.get('analysis_type', 'general')
        target_models = result.get('target_models', [])
        query_params = result.get('query_params', {})

        # Get models to analyze
        all_models = list(apps.get_models())
        models_to_analyze = self._filter_models(all_models, target_models)
        if dealer:
            models_to_analyze = self._filter_by_dealer(models_to_analyze, dealer.id)

        # Select analysis method based on type
        analysis_method = {
            'count': generate_count_insight,
            'relationship': generate_relationship_insight,
            'performance': generate_performance_insight,
            'statistics': generate_statistics_insight
        }.get(analysis_type, self._generate_model_insight_all)

        # Generate insights
        insights = analysis_method(models_to_analyze, query_params, dealer.id if dealer else None, language)

        # Add insights to response
        insights_key = "التحليلات" if language == 'ar' else "insights"
        if isinstance(insights, list):
            response[insights_key].extend(insights)
        else:
            response[insights_key].append(insights)

        # Generate recommendations
        recommendations = generate_recommendations(models_to_analyze, analysis_type, language)
        if recommendations:
            recs_key = "التوصيات" if language == 'ar' else "recommendations"
            response[recs_key] = recommendations

        # Add plain text summary for response
        summary_lines = []
        for insight in response[insights_key]:
            if isinstance(insight, dict):
                summary_lines.append(insight.get('type', 'Insight'))
            else:
                summary_lines.append(str(insight))

        response['response'] = "\n".join(summary_lines)

        return response

    def _filter_models(self, all_models, target_models):
        """
        Filter models based on target model names.

        Matching is case-insensitive on the model class name.

        :param all_models: List of all available models
        :param target_models: List of target model names (a single name
            given as a bare string is also accepted)
        :return: Filtered list of models; ``all_models`` unchanged when no
            targets are given
        """
        if not target_models:
            return all_models
        if isinstance(target_models, str):
            # Avoid iterating a bare string character by character.
            target_models = [target_models]
        # Build the lower-cased lookup once instead of once per model.
        wanted = {name.lower() for name in target_models}
        return [m for m in all_models if m.__name__.lower() in wanted]

    def _filter_by_dealer(self, models, dealer_id):
        """
        Filter models that are relevant to the dealer.

        A model is kept when it declares a field named ``dealer`` or
        ``dealer_id``. ``dealer_id`` itself is not used for the filtering.

        :param models: List of models
        :param dealer_id: ID of the dealer
        :return: Filtered list of models, or the unfiltered list when no
            model has a dealer field
        """
        dealer_models = [
            m for m in models
            if any(f.name in ('dealer', 'dealer_id') for f in m._meta.fields)
        ]
        return dealer_models if dealer_models else models

    def _generate_model_insight_all(self, models, query_params, dealer_id, language):
        """
        Generate insights for all models.

        Has the same signature as the other analysis functions so it can act
        as the default entry in the dispatch table; ``query_params`` is not
        used here.

        :param models: List of models
        :param query_params: Query parameters
        :param dealer_id: ID of the dealer
        :param language: Language code
        :return: List of insights
        """
        return [generate_model_insight(m, dealer_id, language) for m in models]