haikal/inventory/management/commands/generate_slugs.py
2025-06-22 13:25:54 +03:00

125 lines
4.5 KiB
Python

from inventory.models import *
from django.core.management.base import BaseCommand
from django.db import transaction, models
from django.utils.text import slugify
from django.db.models import Case, When, Value
class Command(BaseCommand):
help = "Generate slugs for model instances with proper empty value handling"
def add_arguments(self, parser):
parser.add_argument(
"--model",
type=str,
required=True,
help='Model name (format: "app_label.ModelName")',
)
parser.add_argument(
"--field",
type=str,
default="name",
help='Field to use as slug source (default: "name")',
)
parser.add_argument(
"--batch-size",
type=int,
default=1000,
help="Number of records to process at once (default: 1000)",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Test without actually saving changes",
)
parser.add_argument(
"--fill-empty",
action="store_true",
help="Fill empty slugs with model-ID when source field is empty",
)
def handle(self, *args, **options):
model = self.get_model(options["model"])
source_field = options["field"]
batch_size = options["batch_size"]
dry_run = options["dry_run"]
fill_empty = options["fill_empty"]
queryset = model.objects.filter(models.Q(slug__isnull=True) | models.Q(slug=""))
total_count = queryset.count()
processed = 0
empty_source = 0
self.stdout.write(
self.style.SUCCESS(
f"Generating slugs for {total_count} {model._meta.model_name} records "
f'using field "{source_field}" (batch size: {batch_size})'
)
)
with transaction.atomic():
if dry_run:
self.stdout.write(
self.style.WARNING("DRY RUN - No changes will be saved")
)
transaction.set_rollback(True)
for offset in range(0, total_count, batch_size):
batch = queryset[offset : offset + batch_size]
updates = []
for obj in batch:
source_value = getattr(obj, source_field, "")
if not source_value:
if fill_empty:
# Fallback to model-ID when source field is empty
new_slug = f"{model._meta.model_name.lower()}-{obj.pk}"
empty_source += 1
else:
self.stdout.write(
self.style.WARNING(
f"Skipping {obj} (empty {source_field})"
)
)
continue
else:
slug_base = slugify(str(source_value))[
:50
] # Ensure string and truncate
new_slug = f"{slug_base}-{obj.pk}" # Guaranteed unique
updates.append((obj.pk, new_slug))
processed += 1
if updates and not dry_run:
cases = [When(pk=pk, then=Value(slug)) for pk, slug in updates]
model.objects.filter(pk__in=[u[0] for u in updates]).update(
slug=Case(*cases, output_field=models.CharField())
)
self.stdout.write(
f"Processed batch {offset // batch_size + 1}: "
f"{min(offset + batch_size, total_count)}/{total_count}"
)
stats = [
f"Total processed: {processed}",
f"Records with empty source field: {empty_source}",
f"Skipped records: {total_count - processed - empty_source}",
]
self.stdout.write(self.style.SUCCESS("\n".join(stats)))
def get_model(self, model_path):
"""Get model class from 'app_label.ModelName' string"""
from django.apps import apps
try:
app_label, model_name = model_path.split(".")
return apps.get_model(app_label, model_name)
except ValueError:
raise self.style.ERROR('Model must be specified as "app_label.ModelName"')
except LookupError as e:
raise self.style.ERROR(f"Model not found: {e}")