232 lines
8.5 KiB
Python
232 lines
8.5 KiB
Python
from django.db.models import Avg, Sum, Max, Min, ForeignKey, OneToOneField
|
||
import inspect
|
||
from django.db import models
|
||
from django.utils.translation import gettext_lazy as _
|
||
|
||
|
||
def _localized_keys(language):
|
||
if language == 'ar':
|
||
return {
|
||
'type': 'نوع', 'model': 'النموذج', 'count': 'العدد', 'filters': 'الفلاتر_المطبقة',
|
||
'error': 'خطأ', 'chart_type': 'نوع_الرسم_البياني', 'labels': 'التسميات', 'data': 'البيانات',
|
||
'visualization_data': 'بيانات_الرسم_البياني', 'field': 'الحقل', 'value': 'القيمة',
|
||
'statistic_type': 'نوع_الإحصاء', 'results': 'النتائج', 'title': 'العنوان'
|
||
}
|
||
else:
|
||
return {
|
||
'type': 'type', 'model': 'model', 'count': 'count', 'filters': 'filters_applied',
|
||
'error': 'error', 'chart_type': 'chart_type', 'labels': 'labels', 'data': 'data',
|
||
'visualization_data': 'visualization_data', 'field': 'field', 'value': 'value',
|
||
'statistic_type': 'statistic_type', 'results': 'results', 'title': 'title'
|
||
}
|
||
|
||
|
||
def generate_count_insight(models, query_params, dealer_id=None, language='ar'):
|
||
keys = _localized_keys(language)
|
||
results = []
|
||
|
||
for model in models:
|
||
try:
|
||
queryset = model.objects.all()
|
||
|
||
if dealer_id:
|
||
if hasattr(model, 'dealer_id'):
|
||
queryset = queryset.filter(dealer_id=dealer_id)
|
||
elif hasattr(model, 'dealer'):
|
||
queryset = queryset.filter(dealer=dealer_id)
|
||
|
||
filters = {}
|
||
for key, value in query_params.items():
|
||
if key not in ['field', 'operation'] and hasattr(model, key):
|
||
try:
|
||
field = model._meta.get_field(key)
|
||
if isinstance(field, models.IntegerField):
|
||
value = int(value)
|
||
elif isinstance(field, models.BooleanField):
|
||
value = value.lower() in ('true', '1', 'yes')
|
||
except Exception:
|
||
pass
|
||
filters[key] = value
|
||
|
||
if filters:
|
||
queryset = queryset.filter(**filters)
|
||
|
||
results.append({
|
||
keys['model']: model.__name__,
|
||
keys['count']: queryset.count(),
|
||
keys['filters']: filters
|
||
})
|
||
|
||
except Exception as e:
|
||
results.append({
|
||
keys['model']: model.__name__,
|
||
keys['error']: str(e)
|
||
})
|
||
|
||
return {
|
||
'type': keys['type'] + '_analysis',
|
||
keys['results']: results,
|
||
keys['visualization_data']: {
|
||
keys['chart_type']: 'bar',
|
||
keys['labels']: [r[keys['model']] for r in results if keys['count'] in r],
|
||
keys['data']: [r[keys['count']] for r in results if keys['count'] in r]
|
||
}
|
||
}
|
||
|
||
|
||
def generate_statistics_insight(models, query_params, dealer_id=None, language='ar'):
|
||
keys = _localized_keys(language)
|
||
results = []
|
||
field = query_params.get('field')
|
||
operation = query_params.get('operation', 'average')
|
||
|
||
for model in models:
|
||
try:
|
||
if not field or not hasattr(model, field):
|
||
continue
|
||
|
||
queryset = model.objects.all()
|
||
if dealer_id:
|
||
if hasattr(model, 'dealer_id'):
|
||
queryset = queryset.filter(dealer_id=dealer_id)
|
||
elif hasattr(model, 'dealer'):
|
||
queryset = queryset.filter(dealer=dealer_id)
|
||
|
||
filters = {}
|
||
for k, v in query_params.items():
|
||
if k not in ['field', 'operation'] and hasattr(model, k):
|
||
filters[k] = v
|
||
|
||
if filters:
|
||
queryset = queryset.filter(**filters)
|
||
|
||
stat_map = {
|
||
'average': Avg,
|
||
'sum': Sum,
|
||
'max': Max,
|
||
'min': Min
|
||
}
|
||
|
||
if operation in stat_map:
|
||
agg = queryset.aggregate(val=stat_map[operation](field))['val']
|
||
value = agg
|
||
else:
|
||
value = queryset.count()
|
||
|
||
results.append({
|
||
keys['model']: model.__name__,
|
||
keys['field']: field,
|
||
keys['statistic_type']: operation,
|
||
keys['value']: value,
|
||
keys['filters']: filters
|
||
})
|
||
|
||
except Exception as e:
|
||
results.append({keys['model']: model.__name__, keys['error']: str(e)})
|
||
|
||
return {
|
||
'type': keys['type'] + '_analysis',
|
||
keys['results']: results,
|
||
keys['visualization_data']: {
|
||
keys['chart_type']: 'bar',
|
||
keys['labels']: [f"{r[keys['model']]}.{r[keys['field']]}" for r in results if keys['value'] in r],
|
||
keys['data']: [r[keys['value']] for r in results if keys['value'] in r],
|
||
keys['title']: f"{operation} of {field}" if language != 'ar' else f"{field} ({operation})"
|
||
}
|
||
}
|
||
|
||
|
||
def generate_recommendations(model_classes, analysis_type, language='ar'):
|
||
recs = []
|
||
for model in model_classes:
|
||
for field in model._meta.fields:
|
||
if isinstance(field, ForeignKey) and not field.db_index:
|
||
msg = f"أضف db_index=True إلى {model.__name__}.{field.name}" if language == 'ar' else f"Add db_index=True to {model.__name__}.{field.name}"
|
||
recs.append(msg)
|
||
if isinstance(field, models.CharField) and not field.db_index and field.name in ['name', 'title', 'description', 'text']:
|
||
msg = f"فكر في فهرسة الحقل النصي {model.__name__}.{field.name}" if language == 'ar' else f"Consider indexing the text field {model.__name__}.{field.name}"
|
||
recs.append(msg)
|
||
return recs[:5]
|
||
|
||
|
||
def generate_model_insight(model, dealer_id=None, language='ar'):
|
||
keys = _localized_keys(language)
|
||
fields_info = [
|
||
{
|
||
'name': f.name,
|
||
'type': f.__class__.__name__,
|
||
'null': f.null,
|
||
'blank': f.blank,
|
||
'unique': f.unique,
|
||
'pk': f.primary_key
|
||
} for f in model._meta.fields
|
||
]
|
||
|
||
try:
|
||
qs = model.objects.all()
|
||
if dealer_id:
|
||
if hasattr(model, 'dealer_id'):
|
||
qs = qs.filter(dealer_id=dealer_id)
|
||
elif hasattr(model, 'dealer'):
|
||
qs = qs.filter(dealer=dealer_id)
|
||
count = qs.count()
|
||
except Exception:
|
||
count = "error"
|
||
|
||
return {
|
||
'type': keys['type'] + '_analysis',
|
||
keys['model']: model.__name__,
|
||
'fields': fields_info,
|
||
'count': count
|
||
}
|
||
|
||
|
||
def generate_relationship_insight(models, query_params=None, dealer_id=None, language='ar'):
|
||
from_ = "من" if language == 'ar' else "from"
|
||
to_ = "إلى" if language == 'ar' else "to"
|
||
rel_type = "نوع" if language == 'ar' else "type"
|
||
relationships = []
|
||
|
||
for model in models:
|
||
for field in model._meta.fields:
|
||
if isinstance(field, (ForeignKey, OneToOneField)):
|
||
relationships.append({
|
||
from_: model.__name__,
|
||
to_: field.related_model.__name__,
|
||
rel_type: field.__class__.__name__
|
||
})
|
||
for field in model._meta.many_to_many:
|
||
relationships.append({
|
||
from_: model.__name__,
|
||
to_: field.related_model.__name__,
|
||
rel_type: 'ManyToManyField'
|
||
})
|
||
|
||
return {
|
||
'type': 'تحليل_العلاقات' if language == 'ar' else 'relationship_analysis',
|
||
'relationships': relationships
|
||
}
|
||
|
||
|
||
def generate_performance_insight(models, query_params=None, dealer_id=None, language='ar'):
|
||
issues = []
|
||
for model in models:
|
||
for field in model._meta.fields:
|
||
if isinstance(field, ForeignKey) and not field.db_index:
|
||
issues.append({
|
||
'model': model.__name__,
|
||
'field': field.name,
|
||
'issue': 'Missing index on ForeignKey'
|
||
})
|
||
if isinstance(field, models.CharField) and not field.db_index and field.name in ['name', 'title']:
|
||
issues.append({
|
||
'model': model.__name__,
|
||
'field': field.name,
|
||
'issue': 'Unindexed CharField used in filtering'
|
||
})
|
||
|
||
return {
|
||
'type': 'تحليل_الأداء' if language == 'ar' else 'performance_analysis',
|
||
'issues': issues
|
||
}
|