#!/usr/bin/env python3
|
|
"""
|
|
retranslate_lang.py
|
|
|
|
Retranslates ALL posts for a given language (ID > 42760) from their Spanish originals.
|
|
Uses chunk-based translation (~800 chars per chunk) to avoid model drift.
|
|
Sequential, single process.
|
|
|
|
Usage: python3 retranslate_lang.py fr
|
|
python3 retranslate_lang.py it
|
|
python3 retranslate_lang.py pt
|
|
"""
|
|
|
|
import html
import json
import os
import re
import sys
import time
import urllib.request

import pymysql
from langdetect import detect, LangDetectException, DetectorFactory

# Fixed seed so langdetect gives the same answer for the same text on every run.
DetectorFactory.seed = 0
|
|
|
|
# Chat-completions HTTP endpoint and model name used for every translation
# request.  Both can be overridden with an environment variable of the same
# name; the defaults are the values this script has always used.
JAN_URL = os.environ.get("JAN_URL", "http://172.19.128.1:1337/v1/chat/completions")
JAN_MODEL = os.environ.get("JAN_MODEL", "gemma-3-12b-it-Q4_K_M")

# Keyword arguments passed verbatim to pymysql.connect().  Host, port, user,
# password and database name can be supplied through WP_DB_* environment
# variables so credentials do not have to live in the source file; when a
# variable is unset the original hard-coded value is used.
DB = dict(
    host=os.environ.get("WP_DB_HOST", "172.18.0.2"),
    port=int(os.environ.get("WP_DB_PORT", "3306")),
    user=os.environ.get("WP_DB_USER", "wordpress_user"),
    password=os.environ.get("WP_DB_PASSWORD", "wordpress_pass"),
    database=os.environ.get("WP_DB_NAME", "wordpress_db"),
    charset="utf8mb4",
    cursorclass=pymysql.cursors.DictCursor,
)

# Per-language settings keyed by language slug: the language name inserted
# into the model prompts and the footer paragraph appended to each post.
LANG_CONFIG = {
    "en": {"name": "English", "footer": "<p><em>English version translated with AI</em></p>"},
    "fr": {"name": "French", "footer": "<p><em>Version française traduite par IA</em></p>"},
    "it": {"name": "Italian", "footer": "<p><em>Versione italiana tradotta con IA</em></p>"},
    "pt": {"name": "Portuguese", "footer": "<p><em>Versão portuguesa traduzida com IA</em></p>"},
}

# Target maximum number of characters (HTML included) per chunk sent to the model.
CHUNK_SIZE = 800
# NOTE(review): not referenced anywhere in the visible code.
MAX_RETRIES = 2
|
|
|
|
|
|
def strip_html(text):
    """Return *text* as plain text.

    Tags are removed, HTML entities are decoded and every run of whitespace
    is collapsed to a single space.  Falsy input (``None`` or ``""``) yields
    an empty string.
    """
    if not text:
        return ''
    # Each tag becomes a space so that words on either side of it stay apart.
    without_tags = re.sub(r'<[^>]+>', ' ', text)
    # Entities are decoded only after tag removal, so an escaped "&lt;b&gt;"
    # survives as literal text instead of being stripped as a tag.
    decoded = html.unescape(without_tags)
    collapsed = re.sub(r'\s+', ' ', decoded)
    return collapsed.strip()
|
|
|
|
|
|
def detect_lang(text, min_len=40):
    """Guess the language of *text* from the start of its plain-text form.

    Only the first 400 characters of the tag-stripped text are examined.

    Args:
        text: HTML or plain text to classify.
        min_len: minimum sample length; shorter samples are not classified.

    Returns:
        Whatever ``langdetect.detect`` reports for the sample, or ``None``
        when the sample is shorter than *min_len* or detection fails.
    """
    sample = strip_html(text)[:400].strip()
    if len(sample) < min_len:
        return None
    try:
        return detect(sample)
    except LangDetectException:
        # Only langdetect's own failure is swallowed; a bare ``except`` would
        # also hide KeyboardInterrupt and SystemExit.
        return None
|
|
|
|
|
|
def call_jan(messages, max_tokens=1200, temperature=0.2, timeout=150):
    """POST a chat-completion request to ``JAN_URL`` and return the reply text.

    Args:
        messages: list of ``{"role": ..., "content": ...}`` dicts, sent as-is.
        max_tokens: value of the ``max_tokens`` field in the request.
        temperature: value of the ``temperature`` field in the request.
        timeout: timeout in seconds passed to ``urlopen``.

    Returns:
        ``choices[0].message.content`` of the JSON response, stripped of
        surrounding whitespace.

    Errors raised by ``urlopen``, JSON decoding or the response lookup are
    not caught here.
    """
    body = {
        "model": JAN_MODEL,
        "messages": messages,
        "temperature": temperature,
        "max_tokens": max_tokens,
    }
    request = urllib.request.Request(
        JAN_URL,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json", "Authorization": "Bearer dummy"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        reply = json.loads(response.read())
    return reply["choices"][0]["message"]["content"].strip()
|
|
|
|
|
|
def fix_html_structure(content):
    """Repair recurring markup mistakes in translated HTML.

    The passes run in this fixed order:

    1. ``**text**`` becomes ``<p><strong>text</strong></p>``.
    2. Every non-empty line whose stripped form does not start with ``<`` is
       wrapped in ``<p>...</p>``.
    3. Two ``</p>`` separated only by whitespace collapse into one.
    4. A ``</p>`` is inserted before a blank line followed by ``<p`` when the
       character before the blank line is not ``>``.
    5. ``(n. <em>18).</em>`` loses its inner ``<em>``.
    6. ``<em>`` and ``</em>`` are counted.  With surplus closers, one
       ``</em></p>`` per surplus is reduced to ``</p>``.  With surplus
       openers, ``</em>`` is inserted before a blank line followed by ``<p``
       wherever an ``<em>`` reaches that point without a ``</em>``.
    """
    def _bold_to_paragraph(match):
        return '<p><strong>' + match.group(1).strip() + '</strong></p>'

    content = re.sub(r'\*\*(.+?)\*\*', _bold_to_paragraph, content)

    rebuilt = []
    for raw_line in content.split('\n'):
        stripped = raw_line.strip()
        is_bare_text = bool(stripped) and not stripped.startswith(('<', '<!--'))
        rebuilt.append('<p>' + stripped + '</p>' if is_bare_text else raw_line)
    content = '\n'.join(rebuilt)

    # Wrapping a line that already ended in </p> leaves a doubled closer.
    content = re.sub(r'</p>\s*</p>', '</p>', content)
    # Close a paragraph left open right before the next one begins.
    content = re.sub(r'([^>])\n\n(<p[> ])', r'\1</p>\n\n\2', content)
    # Drop the <em> reopened inside an already emphasised citation number.
    content = re.sub(r'\(n\.\s*<em>(\d+\)\.)</em>', r'(n. \1</em>', content)

    em_opened = len(re.findall(r'<em[ >]', content))
    em_closed = len(re.findall(r'</em>', content))
    surplus_closers = em_closed - em_opened
    if surplus_closers > 0:
        # One replacement per pass: removing a closer can expose another
        # '</em></p>' at the same position, which a single counted replace
        # would step over.
        for _ in range(surplus_closers):
            content = content.replace('</em></p>', '</p>', 1)
    elif surplus_closers < 0:
        content = re.sub(
            r'(<em>[^<]*(?:<(?!/em>)[^<]*)*)\n\n<p',
            r'\1</em>\n\n<p',
            content,
        )
    return content
|
|
|
|
|
|
def translate_chunk(chunk, lang_name, attempt=0):
    """Translate one Spanish HTML chunk into *lang_name* with ``call_jan``.

    Args:
        chunk: the Spanish HTML fragment to translate.
        lang_name: target language name inserted into the prompt.
        attempt: 0 selects the standard system prompt; 1 or more selects the
            stricter prompt that insists on the target language.

    Returns:
        The model's reply.  On the first attempt only, when the chunk has
        fewer than 40 plain-text characters and the reply's plain text equals
        the input's (case-insensitively), the chunk is sent once more with
        the stricter prompt and that second reply is returned.
    """
    standard_prompt = f"You are a professional translator. Translate the following Spanish text to {lang_name}. Preserve all HTML tags exactly. Return ONLY the translated text, no preamble, no explanation."
    strict_prompt = f"Translate from Spanish to {lang_name}. Your entire response must be in {lang_name}. Preserve HTML tags. Return ONLY the translation, nothing else."
    system_prompt = (standard_prompt, strict_prompt)[min(attempt, 1)]

    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": chunk},
    ]
    result = call_jan(conversation)

    source_text = strip_html(chunk).strip().lower()
    reply_text = strip_html(result).strip().lower()
    came_back_unchanged = len(source_text) < 40 and source_text == reply_text
    if came_back_unchanged and attempt == 0:
        return translate_chunk(chunk, lang_name, attempt=1)
    return result
|
|
|
|
|
|
def translate_title(es_title, lang_name):
    """Translate a post title from Spanish into *lang_name*, in capitals.

    Args:
        es_title: the Spanish title.
        lang_name: target language name inserted into the prompt.

    Returns:
        The model's reply with surrounding whitespace and quote characters
        removed.  *es_title* is returned unchanged when it is empty or blank
        (no request is made), when the request fails, when the reply is empty
        after clean-up, or when the reply equals the original title ignoring
        case.
    """
    # Nothing to translate; asking the model would only invite invented text.
    if not es_title or not es_title.strip():
        return es_title

    try:
        result = call_jan([
            {"role": "system", "content": "You are a translator. Respond ONLY with the translated text, nothing else."},
            {"role": "user", "content": f"Translate from Spanish to {lang_name}, ALL CAPS:\n\n{es_title}"}
        ], max_tokens=150, temperature=0.1, timeout=30)
    except Exception as exc:
        # Best effort: keep the Spanish title rather than failing the whole
        # post.  Ctrl-C and SystemExit are no longer swallowed.
        print(f" title translation failed, keeping original: {exc}", flush=True)
        return es_title

    result = result.strip().strip('"').strip("'")
    # An empty reply must not blank out the stored post title.
    if not result or result.upper() == es_title.upper():
        return es_title
    return result
|
|
|
|
|
|
def split_chunks(content):
    """Split HTML *content* into pieces of roughly ``CHUNK_SIZE`` characters.

    The text is cut after each closing ``</p>``, ``</li>``, ``</h1>``-``</h6>``
    or ``</blockquote>`` tag, and consecutive segments are packed together
    while the running length stays within ``CHUNK_SIZE``.  A single segment
    longer than ``CHUNK_SIZE`` is split again after sentence-ending
    punctuation and its sentences are re-joined with single spaces; a
    sentence that is itself longer than the limit is kept whole.

    Returns:
        The list of chunks, without those that contain no visible text once
        their HTML is stripped.
    """
    pieces = re.split(r'(</p>|</li>|</h[1-6]>|</blockquote>)', content)
    # With a capturing group re.split alternates text and closing tag, so each
    # pair (or the lone trailing text) forms one segment.
    segments = ["".join(pieces[start:start + 2]) for start in range(0, len(pieces), 2)]

    chunks = []
    buffer = ""
    for segment in segments:
        if len(buffer) + len(segment) <= CHUNK_SIZE:
            buffer += segment
            continue

        # The segment does not fit: flush what has been collected so far.
        if buffer:
            chunks.append(buffer)

        if len(segment) <= CHUNK_SIZE:
            buffer = segment
            continue

        # Oversized segment: pack it sentence by sentence.
        buffer = ""
        for sentence in re.split(r'(?<=[.!?])\s+', segment):
            if len(buffer) + len(sentence) > CHUNK_SIZE:
                if buffer:
                    chunks.append(buffer.strip())
                buffer = sentence + " "
            else:
                buffer += sentence + " "

    if buffer:
        chunks.append(buffer)
    return [chunk for chunk in chunks if strip_html(chunk).strip()]
|
|
|
|
|
|
def _translate_chunks(chunks, lang, lang_name):
    """Translate every chunk; return ``(translated_chunks, bad_count)``.

    A chunk whose detected language differs from *lang* is translated once
    more with the stricter prompt.  The retry is used when it is detected as
    *lang* or cannot be classified; otherwise the first result is kept and the
    chunk is counted as bad.  A chunk that raises is kept untranslated and
    also counted as bad.
    """
    translated = []
    chunk_bad = 0
    for i, chunk in enumerate(chunks):
        try:
            result = translate_chunk(chunk, lang_name, attempt=0)
            detected = detect_lang(result, min_len=40)

            if detected and detected != lang and len(strip_html(result)) >= 40:
                result2 = translate_chunk(chunk, lang_name, attempt=1)
                detected2 = detect_lang(result2, min_len=40)
                if detected2 == lang or detected2 is None:
                    result = result2
                else:
                    chunk_bad += 1
            translated.append(result)
        except Exception as e:
            print(f" chunk {i+1} ERROR: {e}", flush=True)
            translated.append(chunk)
            chunk_bad += 1
    return translated, chunk_bad


def _save_translation(post_id, title, content):
    """Write the translated title and content to ``wp_posts`` and commit.

    A fresh connection is opened for each write, as before; it is now closed
    even when the UPDATE or the commit raises.
    """
    conn = pymysql.connect(**DB)
    try:
        cur = conn.cursor()
        cur.execute("UPDATE wp_posts SET post_title=%s, post_content=%s WHERE ID=%s",
                    (title, content, post_id))
        conn.commit()
    finally:
        conn.close()


def main():
    """Retranslate every published post of the language given on the command line.

    ``sys.argv[1]`` must be a key of ``LANG_CONFIG``; otherwise a usage line
    is printed and the process exits with status 1.  For each matching post
    with ID > 42760 the Spanish original is looked up, translated chunk by
    chunk, repaired with ``fix_html_structure``, given the language footer and
    written back.  Progress and a final summary are printed to stdout.
    """
    if len(sys.argv) < 2 or sys.argv[1] not in LANG_CONFIG:
        print(f"Usage: python3 {sys.argv[0]} [fr|it|pt|en]")
        sys.exit(1)

    lang = sys.argv[1]
    lang_name = LANG_CONFIG[lang]["name"]
    footer = LANG_CONFIG[lang]["footer"]

    # Footers written by earlier runs: the legacy Spanish one plus every
    # configured language footer, so a language added to LANG_CONFIG is
    # covered without touching this list.
    old_footers = ["<p><em>Traducido con IA</em></p>"]
    old_footers += [cfg["footer"] for cfg in LANG_CONFIG.values()]

    done = errors = skipped = 0

    db = pymysql.connect(**DB)
    try:
        c = db.cursor()

        # Posts tagged with the requested language slug, together with the
        # description of their 'post_translations' term.
        c.execute("""
            SELECT DISTINCT p.ID, p.post_title,
                   ttg.description as group_desc
            FROM wp_posts p
            JOIN wp_term_relationships trl ON p.ID=trl.object_id
            JOIN wp_term_taxonomy ttl ON trl.term_taxonomy_id=ttl.term_taxonomy_id AND ttl.taxonomy='language'
            JOIN wp_terms t_lang ON ttl.term_id=t_lang.term_id AND t_lang.slug=%s
            JOIN wp_term_relationships trg ON p.ID=trg.object_id
            JOIN wp_term_taxonomy ttg ON trg.term_taxonomy_id=ttg.term_taxonomy_id AND ttg.taxonomy='post_translations'
            WHERE p.ID > 42760 AND p.post_type='post' AND p.post_status='publish'
            ORDER BY p.ID
        """, (lang,))
        posts = c.fetchall()
        print(f"Found {len(posts)} {lang_name} posts to retranslate\n", flush=True)

        for n, p in enumerate(posts, 1):
            post_id = p['ID']
            desc = p['group_desc'] or ''
            # The description is searched for the "es" entry and its integer
            # post ID (presumably a PHP-serialized map of language slug to
            # post ID - confirm against the plugin that writes it).
            m = re.search(r's:2:"es";i:(\d+);', desc)
            if not m:
                print(f"[{n}/{len(posts)}] {post_id} — SKIP (no ES original)", flush=True)
                skipped += 1
                continue

            es_id = int(m.group(1))
            c.execute("SELECT post_title, post_content FROM wp_posts WHERE ID=%s", (es_id,))
            es = c.fetchone()
            if not es or not es['post_content']:
                print(f"[{n}/{len(posts)}] {post_id} — SKIP (ES:{es_id} empty)", flush=True)
                skipped += 1
                continue

            es_title = es['post_title'] or ''
            es_content = es['post_content']
            plain_len = len(strip_html(es_content))
            chunks = split_chunks(es_content)

            print(f"\n[{n}/{len(posts)}] WP:{post_id} ← ES:{es_id} — {es_title[:50]}", flush=True)
            print(f" {plain_len} chars, {len(chunks)} chunks", flush=True)

            if plain_len < 50:
                print(" SKIP (too short)", flush=True)
                skipped += 1
                continue

            try:
                t0 = time.time()

                t_title = translate_title(es_title, lang_name)
                translated, chunk_bad = _translate_chunks(chunks, lang, lang_name)

                t_content = fix_html_structure("\n".join(translated))
                # Remove any old footer variants before adding the correct one
                for old in old_footers:
                    t_content = t_content.replace(old, "")
                t_content = t_content.rstrip() + "\n" + footer

                elapsed = time.time() - t0
                # An unclassifiable result (None) is not flagged as wrong.
                lang_ok = detect_lang(t_content, min_len=80) in (lang, None)
                status = "✓" if lang_ok else "⚠"
                bad_note = f" ({chunk_bad} chunks bad)" if chunk_bad else ""

                _save_translation(post_id, t_title, t_content)

                print(f" {status} {t_title[:60]} ({elapsed:.0f}s){bad_note}", flush=True)
                done += 1

            except Exception as e:
                # One failing post must not stop the batch.
                print(f" ✗ ERROR: {e}", flush=True)
                errors += 1
    finally:
        # Also release the read connection when the run is interrupted.
        db.close()

    print(f"\n{'='*50}")
    print(f"Done: {done} ✓ errors: {errors} ✗ skipped: {skipped}")
    print(f"Total: {len(posts)}")
|
|
|
|
|
|
# Run the retranslation only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|