haikal/inventory/management/commands/generate_slugs.py
2025-05-18 20:43:33 +03:00

120 lines
4.4 KiB
Python

from inventory.models import *
from django.core.management.base import BaseCommand
from django.db import transaction, models
from django.utils.text import slugify
from django.db.models import Case, When, Value
class Command(BaseCommand):
help = 'Generate slugs for model instances with proper empty value handling'
def add_arguments(self, parser):
parser.add_argument(
'--model',
type=str,
required=True,
help='Model name (format: "app_label.ModelName")'
)
parser.add_argument(
'--field',
type=str,
default='name',
help='Field to use as slug source (default: "name")'
)
parser.add_argument(
'--batch-size',
type=int,
default=1000,
help='Number of records to process at once (default: 1000)'
)
parser.add_argument(
'--dry-run',
action='store_true',
help='Test without actually saving changes'
)
parser.add_argument(
'--fill-empty',
action='store_true',
help='Fill empty slugs with model-ID when source field is empty'
)
def handle(self, *args, **options):
model = self.get_model(options['model'])
source_field = options['field']
batch_size = options['batch_size']
dry_run = options['dry_run']
fill_empty = options['fill_empty']
queryset = model.objects.filter(models.Q(slug__isnull=True) | models.Q(slug=''))
total_count = queryset.count()
processed = 0
empty_source = 0
self.stdout.write(
self.style.SUCCESS(
f'Generating slugs for {total_count} {model._meta.model_name} records '
f'using field "{source_field}" (batch size: {batch_size})'
)
)
with transaction.atomic():
if dry_run:
self.stdout.write(self.style.WARNING('DRY RUN - No changes will be saved'))
transaction.set_rollback(True)
for offset in range(0, total_count, batch_size):
batch = queryset[offset:offset + batch_size]
updates = []
for obj in batch:
source_value = getattr(obj, source_field, '')
if not source_value:
if fill_empty:
# Fallback to model-ID when source field is empty
new_slug = f"{model._meta.model_name.lower()}-{obj.pk}"
empty_source += 1
else:
self.stdout.write(
self.style.WARNING(
f'Skipping {obj} (empty {source_field})'
)
)
continue
else:
slug_base = slugify(str(source_value))[:50] # Ensure string and truncate
new_slug = f"{slug_base}-{obj.pk}" # Guaranteed unique
updates.append((obj.pk, new_slug))
processed += 1
if updates and not dry_run:
cases = [When(pk=pk, then=Value(slug)) for pk, slug in updates]
model.objects.filter(pk__in=[u[0] for u in updates]).update(
slug=Case(*cases, output_field=models.CharField())
)
self.stdout.write(
f'Processed batch {offset//batch_size + 1}: '
f'{min(offset + batch_size, total_count)}/{total_count}'
)
stats = [
f"Total processed: {processed}",
f"Records with empty source field: {empty_source}",
f"Skipped records: {total_count - processed - empty_source}"
]
self.stdout.write(
self.style.SUCCESS('\n'.join(stats))
)
def get_model(self, model_path):
"""Get model class from 'app_label.ModelName' string"""
from django.apps import apps
try:
app_label, model_name = model_path.split('.')
return apps.get_model(app_label, model_name)
except ValueError:
raise self.style.ERROR('Model must be specified as "app_label.ModelName"')
except LookupError as e:
raise self.style.ERROR(f'Model not found: {e}')