Files
feadulta/scripts/retranslate_lang.py

276 lines
11 KiB
Python

#!/usr/bin/env python3
"""
retranslate_lang.py
Retranslates ALL posts for a given language (ID > 42760) from their Spanish originals.
Uses chunk-based translation (~800 chars per chunk) to avoid model drift.
Sequential, single process.
Usage:
    python3 retranslate_lang.py fr
    python3 retranslate_lang.py it
    python3 retranslate_lang.py pt
    python3 retranslate_lang.py en
"""
import pymysql, json, re, html, urllib.request, time, sys
from langdetect import detect, LangDetectException, DetectorFactory
# Fix langdetect's random seed so detection results are reproducible between
# runs (the langdetect README documents detection as non-deterministic otherwise).
DetectorFactory.seed = 0
# Chat-completions endpoint and model name sent with every translation request.
JAN_URL = "http://172.19.128.1:1337/v1/chat/completions"
JAN_MODEL = "gemma-3-12b-it-Q4_K_M"
# Keyword arguments for pymysql.connect(); DictCursor makes rows come back as
# dicts keyed by column name.
# NOTE(review): host and credentials are hard-coded here — consider reading
# them from the environment.
DB = dict(host='172.18.0.2', port=3306, user='wordpress_user',
password='wordpress_pass', database='wordpress_db', charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor)
# Supported target languages, keyed by the language slug given on the command
# line: "name" is the language name inserted into the prompts, "footer" is the
# HTML notice appended to every translated post.
LANG_CONFIG = {
"en": {"name": "English", "footer": "<p><em>English version translated with AI</em></p>"},
"fr": {"name": "French", "footer": "<p><em>Version française traduite par IA</em></p>"},
"it": {"name": "Italian", "footer": "<p><em>Versione italiana tradotta con IA</em></p>"},
"pt": {"name": "Portuguese", "footer": "<p><em>Versão portuguesa traduzida com IA</em></p>"},
}
# Upper bound, in characters of raw HTML, that split_chunks() aims for per chunk.
CHUNK_SIZE = 800
# NOTE(review): not referenced by any function in the code shown here — confirm
# whether it is still needed.
MAX_RETRIES = 2
def strip_html(text):
    """Reduce an HTML fragment to plain text.

    Every tag is replaced by a space, HTML entities are decoded, and runs of
    whitespace are collapsed to a single space. Leading and trailing
    whitespace is removed. A falsy input (None or '') yields ''.
    """
    if not text:
        return ''
    # Tags are removed before entities are decoded, so an escaped "&lt;" in
    # the text is never mistaken for markup.
    without_tags = re.sub(r'<[^>]+>', ' ', text)
    decoded = html.unescape(without_tags)
    collapsed = re.sub(r'\s+', ' ', decoded)
    return collapsed.strip()
def detect_lang(text, min_len=40):
    """Guess the language of an HTML fragment with langdetect.

    Only the first 400 characters of the tag-stripped text are examined.

    Args:
        text: HTML or plain text to classify.
        min_len: minimum number of plain-text characters required; shorter
            samples are not classified.

    Returns:
        The code returned by langdetect's ``detect`` for the sample, or None
        when the sample is shorter than ``min_len`` or langdetect cannot
        classify it.
    """
    sample = strip_html(text)[:400].strip()
    if len(sample) < min_len:
        return None
    try:
        return detect(sample)
    except LangDetectException:
        # Only langdetect's own failure is treated as "unknown". A bare
        # except here would also swallow KeyboardInterrupt and SystemExit.
        return None
def call_jan(messages, max_tokens=1200, temperature=0.2, timeout=150):
    """Send a chat-completion request to JAN_URL and return the reply text.

    Args:
        messages: list of ``{"role": ..., "content": ...}`` dicts.
        max_tokens: value of the request's ``max_tokens`` field.
        temperature: value of the request's ``temperature`` field.
        timeout: socket timeout in seconds passed to ``urlopen``.

    Returns:
        The stripped ``content`` of the first choice's message.

    Errors from urllib, JSON decoding, or an unexpected response shape are
    not handled here and propagate to the caller.
    """
    body = {
        "model": JAN_MODEL,
        "messages": messages,
        "temperature": temperature,
        "max_tokens": max_tokens,
    }
    request = urllib.request.Request(
        JAN_URL,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json", "Authorization": "Bearer dummy"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        reply = json.loads(response.read())
    first_choice = reply["choices"][0]
    return first_choice["message"]["content"].strip()
def fix_html_structure(content):
    """Repair typical markup mistakes in model-produced HTML.

    The passes run in this order:
      1. Markdown ``**bold**`` becomes ``<p><strong>bold</strong></p>``.
      2. Any non-empty line that does not start with ``<`` is wrapped in ``<p>``.
      3. Doubled ``</p>`` closers are merged into one.
      4. A ``</p>`` is inserted where text is followed by a blank line and a new ``<p``.
      5. A stray ``<em>`` inside a ``(n. <em>18).</em>`` reference is removed.
      6. ``<em>``/``</em>`` counts are compared and a best-effort fix is applied.

    Returns the repaired string.
    """
    def _bold_to_paragraph(match):
        return '<p><strong>' + match.group(1).strip() + '</strong></p>'

    def _wrap_if_bare(line):
        text = line.strip()
        # Anything starting with '<' (tags as well as '<!--' comments) and
        # blank lines are passed through untouched.
        if not text or text.startswith('<'):
            return line
        return '<p>' + text + '</p>'

    # 1. **text** → <p><strong>text</strong></p>
    content = re.sub(r'\*\*(.+?)\*\*', _bold_to_paragraph, content)

    # 2. Bare text lines → <p>…</p>
    content = '\n'.join(_wrap_if_bare(line) for line in content.split('\n'))

    # 3. Collapse doubled closing tags
    content = re.sub(r'</p>\s*</p>', '</p>', content)

    # 4. Close a paragraph left open before "\n\n<p"
    content = re.sub(r'([^>])\n\n(<p[> ])', r'\1</p>\n\n\2', content)

    # 5. <em>"..."(n. <em>18).</em> → <em>"..."(n. 18).</em>
    content = re.sub(r'\(n\.\s*<em>(\d+\)\.)</em>', r'(n. \1</em>', content)

    # 6. Balance <em> tags
    surplus_closers = len(re.findall(r'</em>', content)) - len(re.findall(r'<em[ >]', content))
    if surplus_closers > 0:
        # Drop one "</em>" in front of a "</p>" per surplus closer. This is
        # done one replacement at a time on purpose: each removal rescans
        # from the start of the string.
        for _ in range(surplus_closers):
            content = content.replace('</em></p>', '</p>', 1)
    elif surplus_closers < 0:
        # Insert the missing closer before the blank line that precedes the next <p
        content = re.sub(r'(<em>[^<]*(?:<(?!/em>)[^<]*)*)\n\n<p', r'\1</em>\n\n<p', content)
    return content
def translate_chunk(chunk, lang_name, attempt=0):
    """Translate one Spanish HTML chunk into ``lang_name`` via call_jan.

    ``attempt`` selects the system prompt: 0 uses the standard wording, 1 or
    more uses a stricter wording that insists on the target language.

    On the first attempt only, a chunk whose plain text is shorter than 40
    characters and comes back unchanged (case-insensitively) is sent once
    more with the stricter prompt.

    Returns the model's reply text; call_jan errors propagate.
    """
    system_prompts = (
        f"You are a professional translator. Translate the following Spanish text to {lang_name}. Preserve all HTML tags exactly. Return ONLY the translated text, no preamble, no explanation.",
        f"Translate from Spanish to {lang_name}. Your entire response must be in {lang_name}. Preserve HTML tags. Return ONLY the translation, nothing else.",
    )
    conversation = [
        {"role": "system", "content": system_prompts[min(attempt, 1)]},
        {"role": "user", "content": chunk},
    ]
    reply = call_jan(conversation)

    if attempt != 0:
        return reply

    # Short chunks: an output identical to the input means the model did not
    # translate, so try once more with the stricter prompt.
    source_text = strip_html(chunk).strip().lower()
    reply_text = strip_html(reply).strip().lower()
    came_back_unchanged = len(source_text) < 40 and source_text == reply_text
    if came_back_unchanged:
        return translate_chunk(chunk, lang_name, attempt=1)
    return reply
def translate_title(es_title, lang_name):
    """Translate a Spanish post title into ``lang_name``, in capitals.

    Translation is best-effort and the Spanish title is returned unchanged when:
      * the title is empty or whitespace only (the model is not called, so it
        cannot invent a title from nothing);
      * the model's answer equals the original, ignoring case;
      * the request fails for any ordinary reason.

    Args:
        es_title: the Spanish title.
        lang_name: target language name inserted into the prompt.

    Returns:
        The model's answer with surrounding whitespace and quotes removed, or
        ``es_title`` in the fallback cases above.
    """
    if not es_title or not es_title.strip():
        return es_title
    try:
        result = call_jan([
            {"role": "system", "content": "You are a translator. Respond ONLY with the translated text, nothing else."},
            {"role": "user", "content": f"Translate from Spanish to {lang_name}, ALL CAPS:\n\n{es_title}"}
        ], max_tokens=150, temperature=0.1, timeout=30)
        # Models tend to wrap short answers in quotes; peel them off.
        result = result.strip().strip('"').strip("'")
        if result.upper() == es_title.upper():
            return es_title
        return result
    except Exception:
        # Deliberate best-effort fallback. ``Exception`` rather than a bare
        # except, so Ctrl-C and SystemExit still stop the script.
        return es_title
def split_chunks(content):
    """Split HTML into chunks of roughly CHUNK_SIZE characters.

    The text is cut after closing ``</p>``, ``</li>``, ``</h1>``-``</h6>`` and
    ``</blockquote>`` tags, and consecutive segments are packed together
    while they fit in CHUNK_SIZE. A single segment larger than CHUNK_SIZE is
    split further at sentence ends (``.``, ``!``, ``?`` followed by
    whitespace) and its sentences are re-joined with single spaces. A sentence
    that is itself longer than CHUNK_SIZE is kept whole.

    Returns the list of chunks, without those that have no visible text.
    """
    pieces = re.split(r'(</p>|</li>|</h[1-6]>|</blockquote>)', content)
    # re.split with a capture group alternates text, closer, text, …; pair
    # each text with the closer that follows it ("" for the trailing text).
    texts = pieces[0::2]
    closers = pieces[1::2] + [""]

    chunks = []
    buffer = ""
    for text, closer in zip(texts, closers):
        segment = text + closer
        if len(buffer) + len(segment) <= CHUNK_SIZE:
            buffer += segment
            continue

        # The segment does not fit: flush what has been collected so far.
        if buffer:
            chunks.append(buffer)

        if len(segment) <= CHUNK_SIZE:
            buffer = segment
            continue

        # Oversized segment: pack it sentence by sentence.
        buffer = ""
        for sentence in re.split(r'(?<=[.!?])\s+', segment):
            if len(buffer) + len(sentence) > CHUNK_SIZE:
                if buffer:
                    chunks.append(buffer.strip())
                buffer = sentence + " "
            else:
                buffer += sentence + " "

    if buffer:
        chunks.append(buffer)
    return [chunk for chunk in chunks if strip_html(chunk).strip()]
def _translate_chunks(chunks, lang, lang_name):
    """Translate each chunk in order; return (translated_chunks, bad_count).

    A chunk whose detected language differs from ``lang`` is retried once
    with the stricter prompt. If the retry is still detected as another
    language, the first result is kept and the chunk is counted as bad. A
    chunk whose translation raises is kept in Spanish and counted as bad.
    """
    translated = []
    bad = 0
    for index, chunk in enumerate(chunks, 1):
        try:
            result = translate_chunk(chunk, lang_name, attempt=0)
            detected = detect_lang(result, min_len=40)
            if detected and detected != lang and len(strip_html(result)) >= 40:
                retry = translate_chunk(chunk, lang_name, attempt=1)
                if detect_lang(retry, min_len=40) in (lang, None):
                    result = retry
                else:
                    bad += 1
            translated.append(result)
        except Exception as e:
            print(f" chunk {index} ERROR: {e}", flush=True)
            translated.append(chunk)
            bad += 1
    return translated, bad


def main():
    """Retranslate every published post of one language from its Spanish original.

    The language slug is read from ``sys.argv[1]`` and must be a key of
    LANG_CONFIG; otherwise a usage line is printed and the script exits with
    status 1.

    For each published post with ID > 42760 in that language, the Spanish
    post ID is read from the serialized ``post_translations`` description.
    The Spanish title and content are translated, the HTML is repaired, the
    language footer is appended, and ``post_title``/``post_content`` of the
    target post are overwritten. Posts with no Spanish original, empty
    content, or fewer than 50 plain-text characters are skipped. A failure
    on one post is reported and counted, and the run continues.
    """
    if len(sys.argv) < 2 or sys.argv[1] not in LANG_CONFIG:
        print(f"Usage: python3 {sys.argv[0]} [fr|it|pt|en]")
        sys.exit(1)
    lang = sys.argv[1]
    lang_name = LANG_CONFIG[lang]["name"]
    footer = LANG_CONFIG[lang]["footer"]
    # Footers that may already be present in a translation: the legacy
    # Spanish one plus every configured language footer. Built once from
    # LANG_CONFIG so the two can never drift apart.
    old_footers = ["<p><em>Traducido con IA</em></p>"]
    old_footers += [cfg["footer"] for cfg in LANG_CONFIG.values()]

    posts = []
    done = errors = skipped = 0
    db = pymysql.connect(**DB)
    try:
        c = db.cursor()
        c.execute("""
            SELECT DISTINCT p.ID, p.post_title,
            ttg.description as group_desc
            FROM wp_posts p
            JOIN wp_term_relationships trl ON p.ID=trl.object_id
            JOIN wp_term_taxonomy ttl ON trl.term_taxonomy_id=ttl.term_taxonomy_id AND ttl.taxonomy='language'
            JOIN wp_terms t_lang ON ttl.term_id=t_lang.term_id AND t_lang.slug=%s
            JOIN wp_term_relationships trg ON p.ID=trg.object_id
            JOIN wp_term_taxonomy ttg ON trg.term_taxonomy_id=ttg.term_taxonomy_id AND ttg.taxonomy='post_translations'
            WHERE p.ID > 42760 AND p.post_type='post' AND p.post_status='publish'
            ORDER BY p.ID
        """, (lang,))
        posts = c.fetchall()
        print(f"Found {len(posts)} {lang_name} posts to retranslate\n", flush=True)

        for n, p in enumerate(posts, 1):
            post_id = p['ID']
            desc = p['group_desc'] or ''
            # The description is a PHP-serialized map of language → post ID;
            # pull out the entry for "es".
            m = re.search(r's:2:"es";i:(\d+);', desc)
            if not m:
                print(f"[{n}/{len(posts)}] {post_id} — SKIP (no ES original)", flush=True)
                skipped += 1
                continue
            es_id = int(m.group(1))
            c.execute("SELECT post_title, post_content FROM wp_posts WHERE ID=%s", (es_id,))
            es = c.fetchone()
            if not es or not es['post_content']:
                print(f"[{n}/{len(posts)}] {post_id} — SKIP (ES:{es_id} empty)", flush=True)
                skipped += 1
                continue
            es_title = es['post_title'] or ''
            es_content = es['post_content']
            plain_len = len(strip_html(es_content))
            chunks = split_chunks(es_content)
            # A space now separates the ES id from the title; previously the
            # two ran together in the log line.
            print(f"\n[{n}/{len(posts)}] WP:{post_id} ← ES:{es_id} {es_title[:50]}", flush=True)
            print(f" {plain_len} chars, {len(chunks)} chunks", flush=True)
            if plain_len < 50:
                print(f" SKIP (too short)", flush=True)
                skipped += 1
                continue
            try:
                t0 = time.time()
                t_title = translate_title(es_title, lang_name)
                translated, chunk_bad = _translate_chunks(chunks, lang, lang_name)
                t_content = fix_html_structure("\n".join(translated))
                # Remove any old footer variants before adding the correct one
                for old in old_footers:
                    t_content = t_content.replace(old, "")
                t_content = t_content.rstrip() + "\n" + footer
                elapsed = time.time() - t0
                lang_ok = detect_lang(t_content, min_len=80) in (lang, None)
                status = "✓" if lang_ok else "⚠"
                bad_note = f" ({chunk_bad} chunks bad)" if chunk_bad else ""
                # The write uses its own short-lived connection.
                # NOTE(review): presumably so the update never relies on a
                # connection that sat idle during a long translation —
                # confirm. It is closed in ``finally`` so a failed UPDATE
                # does not leak it.
                db2 = pymysql.connect(**DB)
                try:
                    c2 = db2.cursor()
                    c2.execute("UPDATE wp_posts SET post_title=%s, post_content=%s WHERE ID=%s",
                               (t_title, t_content, post_id))
                    db2.commit()
                finally:
                    db2.close()
                print(f" {status} {t_title[:60]} ({elapsed:.0f}s){bad_note}", flush=True)
                done += 1
            except Exception as e:
                print(f" ✗ ERROR: {e}", flush=True)
                errors += 1
    finally:
        # Release the read connection even if a query or Ctrl-C aborts the run.
        db.close()
    print(f"\n{'='*50}")
    print(f"Done: {done} ✓ errors: {errors} ✗ skipped: {skipped}")
    print(f"Total: {len(posts)}")


if __name__ == "__main__":
    main()