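"""
Django management command: translate untranslated (and optionally fuzzy)
entries in a gettext .po catalog using the Z.ai GLM API via the OpenAI SDK.
"""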

import os
import json
import time
import polib
from concurrent.futures import ThreadPoolExecutor, as_completed
from django.core.management.base import BaseCommand
from django.conf import settings
from openai import OpenAI, APIConnectionError, RateLimitError
# Get the API key from Django settings or the environment (never hard-code secrets)
API_KEY = getattr(settings, 'ZAI_API_KEY', os.environ.get('ZAI_API_KEY'))
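# Example setup (shell), assuming the environment-variable route:
#   export ZAI_API_KEY="<your-key>"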


class Command(BaseCommand):
    help = 'Translate or fix fuzzy entries in a .po file using Z.ai (GLM) via the OpenAI SDK'

    def add_arguments(self, parser):
        parser.add_argument('po_file_path', type=str, help='Path to the .po file')
        parser.add_argument('--lang', type=str, help='Target language (e.g. "Chinese", "French")', required=True)
        parser.add_argument('--batch-size', type=int, default=10, help='Entries per API call (default: 10)')
        parser.add_argument('--workers', type=int, default=3, help='Concurrent threads (default: 3)')
        parser.add_argument('--model', type=str, default="glm-4.6", help='Model version (default: glm-4.6)')
        parser.add_argument('--fix-fuzzy', action='store_true', help='Include entries marked as fuzzy')
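
    # Example invocations (assuming this file is saved as
    # <app>/management/commands/translate_po.py; Django derives the command
    # name "translate_po" from the filename):
    #   python manage.py translate_po locale/fr/LC_MESSAGES/django.po --lang French
    #   python manage.py translate_po locale/de/LC_MESSAGES/django.po --lang German --fix-fuzzy --workers 5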

    def handle(self, *args, **options):
        if not API_KEY:
            self.stderr.write(self.style.ERROR("Error: ZAI_API_KEY not found in settings or environment."))
            return

        # 1. Initialize the client (Z.ai exposes an OpenAI-compatible endpoint)
        client = OpenAI(
            api_key=API_KEY,
            # base_url="https://api.z.ai/api/paas/v4/"  # alternative endpoint
            base_url="https://api.z.ai/api/coding/paas/v4",
        )

        file_path = options['po_file_path']
        target_lang = options['lang']
        batch_size = options['batch_size']
        max_workers = options['workers']
        model_name = options['model']
        fix_fuzzy = options['fix_fuzzy']

        # 2. Load PO file
        self.stdout.write(f"Loading {file_path}...")
        try:
            po = polib.pofile(file_path)
        except Exception as e:
            self.stderr.write(self.style.ERROR(f"Could not load file: {e}"))
            return

        # 3. Filter entries: untranslated ones, plus fuzzy ones when --fix-fuzzy is set
        entries_to_process = []
        for entry in po:
            if entry.obsolete:
                continue
            if not entry.msgstr.strip() or (fix_fuzzy and 'fuzzy' in entry.flags):
                entries_to_process.append(entry)

        total = len(entries_to_process)
        self.stdout.write(self.style.SUCCESS(f"Found {total} entries to process."))
        if total == 0:
            return

        # 4. Batch processing logic
        def chunked(iterable, n):
            for i in range(0, len(iterable), n):
                yield iterable[i:i + n]
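        # Illustration: chunked(['a', 'b', 'c', 'd', 'e'], 2) yields
        # ['a', 'b'], then ['c', 'd'], then ['e'].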

        batches = list(chunked(entries_to_process, batch_size))
        self.stdout.write(f"Processing {len(batches)} batches with model {model_name}...")

        # 5. Worker function with retry logic
        def process_batch(batch_entries):
            texts = [e.msgid for e in batch_entries]
            system_prompt = (
                "You are a professional localization expert for a Django software project. "
                "You will receive a JSON list of English strings. "
                "Translate them accurately. "
                "IMPORTANT Rules:\n"
                "1. Return ONLY a JSON list of strings.\n"
                "2. Preserve all Python variables (e.g. %(count)s, {name}, %s) exactly.\n"
                "3. Do not translate HTML tags.\n"
                "4. Do not explain, just return the JSON."
            )
            user_prompt = (
                f"Translate these texts into {target_lang}:\n"
                f"{json.dumps(texts, ensure_ascii=False)}"
            )
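            # Illustrative round trip (hypothetical): sending ["Hello %(name)s!"] with
            # --lang French should return ["Bonjour %(name)s !"] - same length, same
            # order, placeholders verbatim.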

            # Simple retry loop for rate limits and transient connection errors
            attempts = 0
            max_retries = 3
            while attempts < max_retries:
                try:
                    completion = client.chat.completions.create(
                        model=model_name,
                        messages=[
                            {"role": "system", "content": system_prompt},
                            {"role": "user", "content": user_prompt},
                        ],
                        temperature=0.1,
                    )
                    content = completion.choices[0].message.content
                    # Strip markdown code fences the model may wrap around the JSON
                    content = content.replace('```json', '').replace('```', '').strip()
                    translations = json.loads(content)
                    if len(translations) != len(batch_entries):
                        return False, f"Mismatch: sent {len(batch_entries)}, got {len(translations)}"
                    # Update entries in order and clear the fuzzy flag
                    for entry, trans in zip(batch_entries, translations):
                        entry.msgstr = trans
                        if 'fuzzy' in entry.flags:
                            entry.flags.remove('fuzzy')
                    return True, "Success"
                except (RateLimitError, APIConnectionError) as e:
                    attempts += 1
                    if attempts == max_retries:  # give up without a pointless final sleep
                        return False, f"API Error after retries: {e}"
                    time.sleep(2 ** attempts)  # Exponential backoff: 2s, 4s...
                except json.JSONDecodeError:
                    return False, "AI returned invalid JSON"
                except Exception as e:
                    return False, str(e)
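
        # Optional hardening (a sketch, not wired into process_batch above):
        # rule 2 of the system prompt can also be checked locally, since a model
        # may still drop or mangle a placeholder. `placeholders_intact` is a
        # hypothetical helper, not part of polib or the OpenAI SDK.
        import re  # local import so the sketch stays self-contained

        def placeholders_intact(source, translated):
            # Collect %(name)s-style, bare %s/%d/%f, and {name}-style placeholders
            pattern = re.compile(r'%\([^)]+\)[sdf]|%[sdf]|\{[^{}]*\}')
            return sorted(pattern.findall(source)) == sorted(pattern.findall(translated))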

        # 6. Execution & incremental saving
        success_count = 0
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_batch = {executor.submit(process_batch, batch): batch for batch in batches}
            for i, future in enumerate(as_completed(future_to_batch)):
                batch = future_to_batch[future]
                success, msg = future.result()
                if success:
                    success_count += len(batch)
                    self.stdout.write(self.style.SUCCESS(f"Batch {i+1}/{len(batches)} done."))
                else:
                    self.stderr.write(self.style.WARNING(f"Batch {i+1} failed: {msg}"))
                # Save every 5 batches so progress survives a crash
                if (i + 1) % 5 == 0:
                    po.save()
                    self.stdout.write(f"--- Auto-saved at batch {i+1} ---")

        # Final save
        po.save()
        self.stdout.write(self.style.SUCCESS(f"\nComplete! Translated {success_count}/{total} entries."))
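
# After translating, compile the catalog so Django picks up the new strings
# (standard Django workflow):
#   python manage.py compilemessages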