#!/usr/bin/env python3
"""
AI-Powered PO File Translator using OpenRouter API

This command translates .po files using the OpenRouter API directly,
matching the project's AI service architecture. Optimized for Arabic translation
with healthcare/medical context.

Usage:
    python manage.py translate_po locale/ar/LC_MESSAGES/django.po --lang Arabic
    python manage.py translate_po locale/ar/LC_MESSAGES/django.po --lang Arabic --model google/gemini-2.5-flash-lite
    python manage.py translate_po locale/ar/LC_MESSAGES/django.po --lang Arabic --dry-run
"""

import json
import re
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Optional, Tuple

import httpx
import polib
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError

from apps.core.ai_service import AIService


class Command(BaseCommand):
    """Translate the untranslated entries of a gettext ``.po`` file with an LLM.

    Entries are grouped into batches, each batch is sent to the model through
    ``AIService._openrouter_completion`` from a thread pool, and the returned
    translations are written back into the catalogue. The catalogue is saved
    unless ``--dry-run`` is given.
    """

    help = "Translate .po file entries using OpenRouter API (optimized for Arabic)"

    # Spellings of the target language that switch on the Arabic prompt/validation.
    _ARABIC_LANGS = ("arabic", "ar")
    # Any character from the Unicode ranges the command treats as Arabic script.
    _ARABIC_CHARS = re.compile(r"[\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF\uFB50-\uFDFF\uFE70-\uFEFF]")
    # Runs of whitespace that do not contain a newline.
    _INLINE_SPACE = re.compile(r"[^\S\n]+")
    # Number of API attempts per batch before giving up.
    _MAX_RETRIES = 3
    # The catalogue is saved after every N successfully translated batches.
    _AUTOSAVE_EVERY = 3

    def add_arguments(self, parser):
        """Register the command-line options of the command."""
        parser.add_argument("po_file_path", type=str, help="Path to the .po file")
        parser.add_argument("--lang", type=str, default="Arabic", help="Target language (default: Arabic)")
        parser.add_argument("--batch-size", type=int, default=5, help="Entries per API call (default: 5)")
        parser.add_argument("--workers", type=int, default=2, help="Concurrent threads (default: 2)")
        parser.add_argument(
            "--model",
            type=str,
            default=None,
            help="Model to use (default: from settings.AI_MODEL or AIService.DEFAULT_MODEL)",
        )
        parser.add_argument("--temperature", type=float, default=0.2, help="Temperature for translation (default: 0.2)")
        parser.add_argument("--fix-fuzzy", action="store_true", help="Include entries marked as fuzzy")
        parser.add_argument("--dry-run", action="store_true", help="Preview translations without saving")
        parser.add_argument("--skip-validation", action="store_true", help="Skip Arabic text validation")
        parser.add_argument(
            "--context", type=str, default="healthcare", help="Context for translation (default: healthcare)"
        )

    def handle(self, *args, **options):
        """Run the command: load the catalogue, translate it and save the result."""
        self.setup_config(options)

        po = self.load_po_file(options["po_file_path"])
        # Compare against None explicitly: a POFile is a list, so a valid
        # catalogue without entries is falsy and must not look like a load error.
        if po is None:
            return

        entries_to_process = self.filter_entries(po, options["fix_fuzzy"])
        total = len(entries_to_process)

        if total == 0:
            self.stdout.write(self.style.SUCCESS("No entries to translate."))
            return

        self.stdout.write(self.style.SUCCESS(f"Found {total} entries to translate to {self.target_lang}"))
        self.stdout.write(f"Using model: {self.model}")
        self.stdout.write(f"Batch size: {self.batch_size}, Workers: {self.workers}")

        if options["dry_run"]:
            self.stdout.write(self.style.WARNING("DRY RUN MODE - No changes will be saved"))

        success_count = self.process_batches(entries_to_process, po, options)

        if not options["dry_run"]:
            po.save()
            self.stdout.write(self.style.SUCCESS(f"\n✓ Saved {options['po_file_path']}"))

        self.stdout.write(self.style.SUCCESS(f"\nComplete! Translated {success_count}/{total} entries successfully."))

    def setup_config(self, options: Dict[str, Any]) -> None:
        """Copy the parsed options onto the instance and resolve the model name.

        Raises:
            CommandError: if ``--batch-size`` or ``--workers`` is smaller than 1.
        """
        # Reject values that would otherwise surface later as an opaque
        # ValueError from range() or ThreadPoolExecutor.
        if options["batch_size"] < 1:
            raise CommandError("--batch-size must be at least 1")
        if options["workers"] < 1:
            raise CommandError("--workers must be at least 1")

        # Stored on the instance; nothing else in this command reads it.
        self.api_key = AIService._get_api_key()
        # Precedence: --model, then settings.AI_MODEL, then the service default.
        self.model = options["model"] or getattr(settings, "AI_MODEL", None) or AIService.DEFAULT_MODEL
        self.model = AIService._strip_model_prefix(self.model)
        self.temperature = options["temperature"]
        self.batch_size = options["batch_size"]
        self.workers = options["workers"]
        self.target_lang = options["lang"]
        self.context = options["context"]
        self.skip_validation = options["skip_validation"]
        self.dry_run = options["dry_run"]

    def load_po_file(self, file_path: str) -> Optional[polib.POFile]:
        """Load the catalogue at ``file_path`` and print a short summary.

        Returns:
            The parsed catalogue, or ``None`` (after writing an error) when it
            cannot be loaded.
        """
        self.stdout.write(f"Loading {file_path}...")
        try:
            po = polib.pofile(file_path)
        except Exception as e:  # command boundary: report instead of a traceback
            self.stderr.write(self.style.ERROR(f"Could not load file: {e}"))
            return None

        translated = sum(1 for e in po if self._has_translation(e) and "fuzzy" not in e.flags)
        self.stdout.write(f" Total entries: {len(po)}")
        self.stdout.write(f" Already translated: {translated}")
        return po

    @staticmethod
    def _has_translation(entry: polib.POEntry) -> bool:
        """Return True when ``entry`` already carries a non-blank translation.

        Plural entries keep their translations in ``msgstr_plural`` while
        ``msgstr`` stays empty, so they count as translated only when every
        plural slot is filled.
        """
        if entry.msgid_plural:
            forms = entry.msgstr_plural
            return bool(forms) and all(value.strip() for value in forms.values())
        return bool(entry.msgstr.strip())

    def filter_entries(self, po: polib.POFile, fix_fuzzy: bool) -> List[polib.POEntry]:
        """Return the non-obsolete entries that need a translation.

        An entry qualifies when it has no translation yet, or when ``fix_fuzzy``
        is set and the entry is flagged fuzzy.
        """
        return [
            entry
            for entry in po
            if not entry.obsolete
            and (not self._has_translation(entry) or (fix_fuzzy and "fuzzy" in entry.flags))
        ]

    def process_batches(self, entries: List[polib.POEntry], po: polib.POFile, options: Dict[str, Any]) -> int:
        """Translate ``entries`` in batches on a thread pool.

        Returns:
            The number of entries belonging to batches that succeeded.
        """
        batches = list(self.chunked(entries, self.batch_size))
        total_batches = len(batches)
        success_count = 0
        succeeded_batches = 0

        with ThreadPoolExecutor(max_workers=self.workers) as executor:
            future_to_batch = {
                executor.submit(self.process_batch, batch, i + 1, total_batches): batch
                for i, batch in enumerate(batches)
            }

            for future in as_completed(future_to_batch):
                batch = future_to_batch[future]
                try:
                    success, msg = future.result()
                except Exception as e:
                    self.stderr.write(self.style.ERROR(f"Batch error: {e}"))
                    continue

                if not success:
                    self.stderr.write(self.style.WARNING(f"Batch failed: {msg}"))
                    continue

                success_count += len(batch)
                succeeded_batches += 1

                # Save only when another group of batches has actually been
                # translated; counting batches (not entries) keeps the cadence
                # right for a short final batch and avoids saving on failures.
                # NOTE: worker threads may still be updating other entries
                # while this save runs.
                if not self.dry_run and succeeded_batches % self._AUTOSAVE_EVERY == 0:
                    po.save()
                    self.stdout.write(" Auto-saved progress...")

        return success_count

    @staticmethod
    def _entry_payload(entry: polib.POEntry) -> List[Dict[str, str]]:
        """Return the request items for ``entry``.

        A plural entry yields two items (singular, then plural) so that the
        model can keep answering with a flat array of strings.
        """
        context = entry.msgctxt or ""
        if entry.msgid_plural:
            return [
                {"text": entry.msgid, "context": context, "form": "singular"},
                {"text": entry.msgid_plural, "context": context, "form": "plural"},
            ]
        return [{"text": entry.msgid, "context": context}]

    def _apply_translation(self, entry: polib.POEntry, forms: List[str]) -> None:
        """Store the translated ``forms`` on ``entry`` and update its fuzzy flag.

        ``forms`` holds one string for a regular entry and ``[singular, plural]``
        for a plural entry.
        """
        needs_review = False
        if not entry.msgid_plural:
            entry.msgstr = forms[0]
        else:
            singular, plural = forms
            # Fill the slots the catalogue already declares; fall back to the
            # two-form layout when the entry has none.
            slots = sorted(entry.msgstr_plural) or [0, 1]
            if len(slots) == 2:
                entry.msgstr_plural[slots[0]] = singular
                entry.msgstr_plural[slots[1]] = plural
            else:
                # With a single slot the plural wording serves every count.
                # With more than two slots the two English forms cannot be
                # mapped reliably (the slot for n == 1 depends on the
                # language's Plural-Forms rule), so store a draft and leave it
                # for a translator.
                for slot in slots:
                    entry.msgstr_plural[slot] = plural
                needs_review = len(slots) > 2

        is_fuzzy = "fuzzy" in entry.flags
        if needs_review:
            if not is_fuzzy:
                entry.flags.append("fuzzy")
            self.stderr.write(self.style.WARNING(f"Plural forms need manual review: {entry.msgid[:50]}..."))
        elif is_fuzzy:
            entry.flags.remove("fuzzy")

    def process_batch(self, batch_entries: List[polib.POEntry], batch_num: int, total_batches: int) -> Tuple[bool, str]:
        """Translate one batch and write the results onto its entries.

        HTTP/transport errors are retried with exponential backoff; any other
        failure ends the batch immediately.

        Returns:
            ``(True, "Success")`` or ``(False, reason)``.
        """
        texts: List[Dict[str, str]] = []
        for entry in batch_entries:
            texts.extend(self._entry_payload(entry))

        messages = [
            {"role": "system", "content": self.build_system_prompt()},
            {"role": "user", "content": self.build_user_prompt(texts)},
        ]

        max_retries = self._MAX_RETRIES
        for attempt in range(max_retries):
            try:
                content = AIService._openrouter_completion(
                    model=self.model,
                    messages=messages,
                    temperature=self.temperature,
                    max_tokens=2000,
                    timeout=60,
                )

                translations = self.parse_response(content, len(texts))
                if not translations:
                    return False, "Failed to parse translations"

                cursor = 0
                for entry in batch_entries:
                    # Plural entries consumed two request items, regular ones one.
                    width = 2 if entry.msgid_plural else 1
                    forms = translations[cursor : cursor + width]
                    cursor += width

                    if self.dry_run:
                        sources = [entry.msgid, entry.msgid_plural][:width]
                        for source, trans in zip(sources, forms):
                            self.stdout.write(f" [DRY-RUN] {source[:50]}... -> {trans[:50]}...")
                    else:
                        self._apply_translation(entry, forms)

                self.stdout.write(f" Batch {batch_num}/{total_batches} ✓")
                return True, "Success"

            except (httpx.HTTPStatusError, httpx.RequestError, httpx.TimeoutException) as e:
                # Give up before sleeping: there is nothing left to wait for
                # after the final attempt.
                if attempt == max_retries - 1:
                    return False, f"API error after retries: {e}"
                wait_time = 2 ** (attempt + 1)
                self.stderr.write(self.style.WARNING(f" Retry {attempt + 1}/{max_retries} after {wait_time}s: {e}"))
                time.sleep(wait_time)
            except Exception as e:
                return False, f"Unexpected error: {e}"

        return False, "Max retries exceeded"

    def build_system_prompt(self) -> str:
        """Return the system prompt; Arabic targets get extra style requirements."""
        lines = [
            f"You are a professional translator specializing in {self.context} software localization.",
            "",
            "CRITICAL RULES:",
            f"1. Translate from English to {self.target_lang}",
            "2. Return ONLY a JSON array of translated strings",
            "3. Preserve ALL variables exactly: %(name)s, {variable}, %s, etc.",
            "4. Preserve HTML tags (<strong>, <a>, etc.) - do not translate them",
            "5. Preserve newlines (\\n) and formatting",
            "6. Maintain the same array length and order as input",
            "",
        ]

        if self.target_lang.lower() in self._ARABIC_LANGS:
            lines += [
                "",
                "ARABIC TRANSLATION REQUIREMENTS:",
                "- Use Modern Standard Arabic (الفصحى) - formal, professional",
                "- Use healthcare/medical terminology appropriate for Saudi Arabian hospitals",
                "- Maintain professional, respectful tone suitable for patient experience management",
                "- Ensure grammatical correctness and natural flow",
                "- Do not use colloquial Arabic (العامية)",
                "- Numbers and dates should follow Arabic conventions where appropriate",
                "- Keep English technical terms if no standard Arabic equivalent exists (e.g., API, URL)",
                "",
            ]

        return "\n".join(lines)

    def build_user_prompt(self, texts: List[Dict]) -> str:
        """Return the user prompt embedding ``texts`` as a JSON array."""
        payload = json.dumps(texts, ensure_ascii=False, indent=2)
        lines = [
            f"Translate these texts to {self.target_lang}.",
            "",
            "Input format: JSON array with text and optional context",
            "Output format: JSON array of translated strings only",
            "",
            "Texts to translate:",
            payload,
            "",
            "Return ONLY the JSON array of translations.",
        ]
        return "\n".join(lines)

    def parse_response(self, content: str, expected_count: int) -> Optional[List[str]]:
        """Parse the model reply into a list of ``expected_count`` strings.

        A surrounding Markdown code fence is removed before decoding.

        Returns:
            The translations (passed through ``validate_arabic`` for Arabic
            targets unless validation is skipped), or ``None`` after writing an
            error when the reply is empty, is not valid JSON, is not an array,
            has the wrong length or contains non-string items.
        """
        if not isinstance(content, str) or not content.strip():
            self.stderr.write(self.style.ERROR("Empty response from model"))
            return None

        content = content.strip()
        if content.startswith("```json"):
            content = content[7:]
        elif content.startswith("```"):
            content = content[3:]
        if content.endswith("```"):
            content = content[:-3]
        content = content.strip()

        try:
            translations = json.loads(content)
        except json.JSONDecodeError as e:
            self.stderr.write(self.style.ERROR(f"JSON parse error: {e}"))
            return None

        if not isinstance(translations, list):
            self.stderr.write(self.style.ERROR("Response is not a JSON array"))
            return None

        if len(translations) != expected_count:
            self.stderr.write(
                self.style.ERROR(f"Count mismatch: expected {expected_count}, got {len(translations)}")
            )
            return None

        # Items are later sliced and stored as msgstr values, so anything but
        # a string (null, number, nested object) must be rejected here.
        if not all(isinstance(item, str) for item in translations):
            self.stderr.write(self.style.ERROR("Response contains non-string items"))
            return None

        if not self.skip_validation and self.target_lang.lower() in self._ARABIC_LANGS:
            translations = [self.validate_arabic(item) for item in translations]

        return translations

    def validate_arabic(self, text: str) -> str:
        """Warn when ``text`` has no Arabic characters and tidy its spacing.

        Runs of spaces/tabs are collapsed to a single space and spaces at both
        ends are trimmed. Newlines are kept, because the prompt asks the model
        to preserve them.
        """
        if not text:
            return text

        if not self._ARABIC_CHARS.search(text):
            self.stderr.write(self.style.WARNING(f"Translation may not be in Arabic: {text[:50]}..."))

        return self._INLINE_SPACE.sub(" ", text).strip(" ")

    @staticmethod
    def chunked(iterable: List, n: int):
        """Yield consecutive slices of ``iterable`` holding at most ``n`` items.

        Raises:
            ValueError: if ``n`` is smaller than 1.
        """
        if n < 1:
            raise ValueError("chunk size must be at least 1")
        for start in range(0, len(iterable), n):
            yield iterable[start : start + n]