#!/usr/bin/env python3
"""
Import into the local WordPress the delta visible on production Joomla, using public HTML.

Source file: feadulta/scripts/import_public_joomla_delta.py

Contingency path for when there is no SSH/DB access to production. Preserves the
Joomla IDs in `_fgj2wp_old_k2_id` and `_fgj2wp_old_content_id`, extracted from the URLs.
Dry-run by default. Use `--apply` to write to the local database.
"""
import argparse
import html
import re
import subprocess
import sys
import unicodedata
from dataclasses import dataclass, field
from typing import Optional
from urllib.parse import urljoin
import pymysql
# Production site coordinates. fetch() pins HOST to ORIGIN_IP via `curl --resolve`.
ORIGIN_IP = "134.0.10.170"
HOST = "www.feadulta.com"
BASE = f"https://{HOST}"

# Credentials conn() uses for the local WordPress database.
WP_DB_USER = "wordpress_user"
WP_DB_PASS = "wordpress_pass"
WP_DB_NAME = "wordpress_db"

# term_id values; load_terms() maps them to term_taxonomy_id rows of the
# 'category' taxonomy.
TERM_FEADULTA = 71
TERM_CARTA_SEMANA = 6
TERM_CARTAS_OTRAS = 21
TERM_CARTA_PASADA = 22
TERM_INDICE_MULTIMEDIA = 26
TERM_VIDEOS = 58
TERM_LECTURA = 1645
TERM_COMENTARIO_EDITORIAL = 1646
TERM_COMENTARIO = 1647
TERM_EUCARISTIA = 1648
TERM_MULTIMEDIA = 1649
TERM_ARTICULOS = 1650

# Category label emitted by links_by_section() -> term_id to attach.
SECTION_TO_TERM = {
    "lectura": TERM_LECTURA,
    "comentario_editorial": TERM_COMENTARIO_EDITORIAL,
    "comentario": TERM_COMENTARIO,
    "articulo": TERM_ARTICULOS,
    "eucaristia": TERM_EUCARISTIA,
    "multimedia": TERM_MULTIMEDIA,
}

# Letters ("cartas") that build_items() fetches and scans for linked items:
# Joomla content ID, public path, post date and category term_ids.
CARTAS = [
    {
        "content_id": 9136,
        "url": "/es/ayuda/otras-semanas/9136-uno-y-trino.html",
        "date": "2026-05-28 00:00:00",
        "cats": [TERM_CARTAS_OTRAS, TERM_FEADULTA],
    },
    {
        "content_id": 9143,
        "url": "/es/ayuda/semana-pasada/9143-20-anos-de-fe-adulta.html",
        "date": "2026-06-06 00:00:00",
        "cats": [TERM_CARTAS_OTRAS, TERM_CARTA_PASADA, TERM_FEADULTA],
    },
    {
        "content_id": 9150,
        "url": "/es/ayuda/esta-semana/9150-la-puerta-pequena.html",
        "date": "2026-06-13 00:00:00",
        "cats": [TERM_CARTA_SEMANA, TERM_CARTAS_OTRAS, TERM_FEADULTA],
    },
]
@dataclass
class Item:
    """One Joomla entry discovered on the public site and queued for import."""

    # "k2" or "content"; insert_item() uses it to pick the legacy-ID meta key.
    kind: str
    # Numeric Joomla ID (taken from CARTAS or parsed from the item URL).
    source_id: int
    # Path or URL the item page is fetched from.
    url: str
    title: str = ""
    # HTML fragment stored as the post content.
    content: str = ""
    slug: str = ""
    # Written to the post date columns by insert_item().
    date: str = "2026-06-13 00:00:00"
    author_name: Optional[str] = None
    # Category term_ids to attach to the post.
    term_ids: set[int] = field(default_factory=set)
    # source_id of the carta that linked to this item (set for k2 items).
    carta_source_id: Optional[int] = None
def wp_ip() -> str:
    """Return an IP address of the local ``wordpress-mysql`` Docker container.

    The address is read with ``docker inspect``. When the container is attached
    to several networks the first reported address is returned.

    Raises:
        subprocess.CalledProcessError: if ``docker inspect`` exits non-zero.
        RuntimeError: if the container reports no IP address at all.
    """
    result = subprocess.run(
        [
            "docker",
            "inspect",
            "wordpress-mysql",
            "--format",
            # The trailing space keeps addresses apart when the container is on
            # more than one network; without it they are glued into one token.
            "{{range .NetworkSettings.Networks}}{{.IPAddress}} {{end}}",
        ],
        capture_output=True,
        text=True,
        check=True,
    )
    addresses = result.stdout.split()
    if not addresses:
        raise RuntimeError("docker inspect reported no IP address for wordpress-mysql")
    return addresses[0]
def conn():
    """Open a pymysql connection to the local WordPress database.

    The host comes from wp_ip(); rows are returned as dicts and autocommit is
    off, so the caller decides when to commit or roll back.
    """
    settings = {
        "host": wp_ip(),
        "user": WP_DB_USER,
        "password": WP_DB_PASS,
        "database": WP_DB_NAME,
        "charset": "utf8mb4",
        "autocommit": False,
        "cursorclass": pymysql.cursors.DictCursor,
    }
    return pymysql.connect(**settings)
def normalize(text: str) -> str:
    """Return ``text`` without diacritics, whitespace-collapsed and lowercased."""
    decomposed = unicodedata.normalize("NFKD", text)
    # After NFKD the accents are separate combining characters; drop them.
    base_chars = "".join(ch for ch in decomposed if unicodedata.combining(ch) == 0)
    collapsed = re.sub(r"\s+", " ", base_chars)
    return collapsed.strip().lower()
def slug_from_url(path: str) -> str:
    """Return the slug of a Joomla URL: last path segment minus ID and ``.html``.

    Example: ``/es/ayuda/9136-uno-y-trino.html`` -> ``uno-y-trino``.

    The query string and fragment are removed before the path is split, so a
    ``/`` inside a query value or a trailing ``#anchor`` cannot leak into the slug.
    """
    bare = re.split(r"[?#]", path, maxsplit=1)[0]
    name = bare.rstrip("/").rsplit("/", 1)[-1]
    name = re.sub(r"^\d+-", "", name)
    return re.sub(r"\.html$", "", name)
def id_from_url(path: str) -> Optional[int]:
    """Return the numeric ID of the first ``/<digits>-<slug>`` segment, or None."""
    found = re.search(r"/(\d+)-[^/?#]+(?:\.html)?", path)
    if found is None:
        return None
    return int(found[1])
def fetch(path: str) -> str:
    """Download ``path`` from the production site with curl and return the body.

    The request resolves HOST to ORIGIN_IP (``--resolve``), skips TLS
    verification (``-k``), follows redirects (``-L``) and is limited to 12
    seconds. Each request is logged to stderr as ``FETCH <path>``.

    Raises:
        subprocess.CalledProcessError: if curl exits non-zero. Because of
            ``--fail`` this includes HTTP responses with status >= 400, so an
            error page is never returned as if it were the requested document.
    """
    url = urljoin(BASE, path)
    print(f"FETCH {path}", file=sys.stderr, flush=True)
    result = subprocess.run(
        [
            "curl",
            "--resolve",
            f"{HOST}:443:{ORIGIN_IP}",
            "-k",
            "-L",
            # Without --fail curl exits 0 on 4xx/5xx and the error page would be
            # parsed (and possibly imported) like a real article.
            "--fail",
            "--max-time",
            "12",
            "-A",
            "Mozilla/5.0 Codex Feadulta delta importer",
            "-sS",
            url,
        ],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout
def clean_fragment(fragment: str) -> str:
    """Sanitise an HTML fragment scraped from Joomla.

    Steps, in order:
      * remove ``<script>`` and ``<form>`` elements;
      * cut ``?tmpl=component...`` off href values;
      * convert CRLF line endings to LF;
      * point ``/images/...`` src/href values at the WordPress uploads URL when
        the file exists in the local uploads directory, otherwise leave them;
      * strip surrounding whitespace.
    """
    # Imported locally so this function does not depend on the file-level imports.
    from os.path import isfile

    fragment = re.sub(r"<script\b.*?</script>", "", fragment, flags=re.I | re.S)
    fragment = re.sub(r"<form\b.*?</form>", "", fragment, flags=re.I | re.S)
    fragment = re.sub(r"\s+href=\"([^\"]*)\?tmpl=component[^\"]*\"", r' href="\1"', fragment)
    fragment = fragment.replace("\r\n", "\n")

    # Joomla image paths -> WP uploads when the file exists locally.
    def repl(m):
        attr, path = m.group(1), m.group(2)
        local = f"/home/rafa/joomla-migration/wordpress/wp-content/uploads/{path}"
        # isfile() reports False instead of raising for unreadable or malformed
        # paths, so no subprocess and no blanket exception handler are needed.
        if isfile(local):
            return f'{attr}="/fea/wp-content/uploads/{path}"'
        return f'{attr}="/images/{path}"'

    fragment = re.sub(r'(src|href)="/images/([^"]+)"', repl, fragment)
    return fragment.strip()
def extract_title_and_content(doc: str) -> tuple[str, str]:
    """Return ``(title, content)`` scraped from a Joomla page.

    Each value comes from the first pattern that matches, tried in priority
    order. The title is tag-stripped and HTML-unescaped; the content goes
    through clean_fragment(). A value with no matching pattern is ``""``.
    """
    flags = re.I | re.S
    title_patterns = (
        r'<h2 class="fa-postheader">\s*(.*?)\s*</h2>',
        r'<h2 class="itemTitle">\s*(.*?)\s*</h2>',
        r'<meta property="og:title" content="([^"]+)"',
        r"<title>\s*(.*?)\s*</title>",
    )
    body_patterns = (
        r'<div class="fa-article">\s*(.*?)\s*</div>\s*</div>\s*<div class="cleared"',
        r'<div class="itemFullText">\s*(.*?)\s*</div>',
        r'<div class="fa-article">\s*(.*?)\s*</div>',
    )

    def first_hit(patterns):
        # Return the match of the first pattern found in doc, else None.
        for pattern in patterns:
            hit = re.search(pattern, doc, flags)
            if hit:
                return hit
        return None

    title = ""
    title_hit = first_hit(title_patterns)
    if title_hit:
        title = html.unescape(re.sub(r"<.*?>", "", title_hit.group(1))).strip()

    content = ""
    body_hit = first_hit(body_patterns)
    if body_hit:
        content = clean_fragment(body_hit.group(1))
    return title, content
def extract_author(doc: str) -> Optional[str]:
    """Return the author from ``<meta name="author">`` or ``<a rel="author">``.

    The meta tag takes precedence. Returns None when neither is present.
    """
    meta = re.search(r'<meta name="author" content="([^"]+)"', doc, re.I)
    if meta is not None:
        return html.unescape(meta.group(1)).strip()

    anchor = re.search(r'<a rel="author"[^>]*>\s*(.*?)\s*</a>', doc, re.I | re.S)
    if anchor is None:
        return None
    # The anchor may wrap the name in further tags; keep only the text.
    inner_text = re.sub(r"<.*?>", "", anchor.group(1))
    return html.unescape(inner_text).strip()
def iter_paragraphs(content: str):
    """Yield the inner HTML of every ``<p>...</p>`` element, in document order."""
    paragraph = re.compile(r"<p\b[^>]*>(.*?)</p>", flags=re.I | re.S)
    yield from (hit.group(1) for hit in paragraph.finditer(content))
def links_by_section(carta: Item) -> list[tuple[str, str, str, Optional[str]]]:
    """Collect the links of a carta, tagged by the section they appear in.

    Paragraphs are scanned in order. A paragraph whose normalized text contains
    a known heading switches the current section and is itself skipped; links
    found before the first heading are ignored. Inside the "evangelio" section
    the first link is tagged ``lectura``, the second ``comentario_editorial``
    and any later one ``comentario``; elsewhere the tag is the section name.

    Returns:
        A list of ``(href, category, link_text, author)`` tuples, where
        ``author`` is the link text before the first ``:``, or None when the
        text has no colon.
    """
    # Checked in this order; the first heading found in a paragraph wins.
    headings = (
        ("evangelio y comentarios al evangelio", "evangelio"),
        ("articulos seleccionados para la semana", "articulo"),
        ("eucaristias mas participativas", "eucaristia"),
        ("material multimedia", "multimedia"),
    )
    evangelio_cats = ("lectura", "comentario_editorial")

    current = None
    evangelio_seen = 0
    collected = []
    for paragraph in iter_paragraphs(carta.content):
        flat = normalize(re.sub(r"<.*?>", " ", html.unescape(paragraph)))
        heading = next((name for marker, name in headings if marker in flat), None)
        if heading is not None:
            current = heading
            if heading == "evangelio":
                # A new gospel block restarts the positional tagging.
                evangelio_seen = 0
            continue
        if not current:
            continue

        anchors = re.findall(r'<a\b[^>]*href="([^"]+)"[^>]*>(.*?)</a>', paragraph, flags=re.I | re.S)
        for raw_href, raw_text in anchors:
            href = html.unescape(raw_href)
            label = html.unescape(re.sub(r"<.*?>", " ", raw_text))
            label = re.sub(r"\s+", " ", label).strip()
            if current == "evangelio":
                if evangelio_seen < len(evangelio_cats):
                    cat = evangelio_cats[evangelio_seen]
                else:
                    cat = "comentario"
                evangelio_seen += 1
            else:
                cat = current
            head, colon, _rest = label.partition(":")
            author = head.strip() if colon else None
            collected.append((href, cat, label, author))
    return collected
def load_existing(c, meta_key: str) -> set[int]:
    """Return the values stored in wp_postmeta under ``meta_key``, as integers.

    Values are cast to UNSIGNED by the database; NULL results are skipped.
    """
    query = "SELECT CAST(meta_value AS UNSIGNED) id FROM wp_postmeta WHERE meta_key=%s"
    with c.cursor() as cur:
        cur.execute(query, (meta_key,))
        rows = cur.fetchall()

    found = set()
    for row in rows:
        value = row["id"]
        if value is not None:
            found.add(int(value))
    return found
def max_existing(ids: set[int]) -> int:
    """Return the largest ID in ``ids``, or 0 when the set is empty."""
    return max(ids, default=0)
def load_terms(c) -> dict[int, int]:
    """Map each known category term_id to its term_taxonomy_id.

    Only rows of the 'category' taxonomy are considered; term IDs that have no
    such row are simply absent from the result.
    """
    wanted = [
        TERM_FEADULTA,
        TERM_CARTA_SEMANA,
        TERM_CARTAS_OTRAS,
        TERM_CARTA_PASADA,
        TERM_INDICE_MULTIMEDIA,
        TERM_VIDEOS,
        TERM_LECTURA,
        TERM_COMENTARIO_EDITORIAL,
        TERM_COMENTARIO,
        TERM_EUCARISTIA,
        TERM_MULTIMEDIA,
        TERM_ARTICULOS,
    ]
    # One %s placeholder per ID; the values themselves are bound by the driver.
    placeholders = ",".join(["%s"] * len(wanted))
    sql = (
        "SELECT term_id, term_taxonomy_id FROM wp_term_taxonomy "
        "WHERE taxonomy='category' AND term_id IN (%s)" % placeholders
    )
    with c.cursor() as cur:
        cur.execute(sql, wanted)
        rows = cur.fetchall()

    mapping = {}
    for row in rows:
        mapping[int(row["term_id"])] = int(row["term_taxonomy_id"])
    return mapping
def load_lang_es(c) -> Optional[int]:
    """Return the term_taxonomy_id of the 'language' term with slug 'es', or None."""
    sql = (
        "SELECT tt.term_taxonomy_id FROM wp_terms t "
        "JOIN wp_term_taxonomy tt ON tt.term_id=t.term_id "
        "WHERE tt.taxonomy='language' AND t.slug='es' LIMIT 1"
    )
    with c.cursor() as cur:
        cur.execute(sql)
        row = cur.fetchone()
    if not row:
        return None
    return int(row["term_taxonomy_id"])
def load_authors(c) -> dict[str, int]:
    """Build a lookup from normalized display name and login to WordPress user ID.

    Both names of every user are registered; when two entries normalize to the
    same key, the one processed last wins.
    """
    with c.cursor() as cur:
        cur.execute("SELECT ID, display_name, user_login FROM wp_users")
        rows = cur.fetchall()

    lookup = {}
    for row in rows:
        for column in ("display_name", "user_login"):
            lookup[normalize(row[column])] = int(row["ID"])
    return lookup
def resolve_author(author_map: dict[str, int], name: Optional[str]) -> int:
    """Return the WordPress user ID for ``name``, falling back to user ID 1.

    Resolution order:
      1. exact match of the normalized name in ``author_map``;
      2. first entry whose key contains the name or is contained in it;
      3. fallback ID 1.

    Empty strings never take part in the substring step: ``"" in s`` is true
    for every ``s``, so a blank key (e.g. a user without display name) or a
    name that normalizes to nothing would otherwise match an arbitrary user.
    """
    if not name:
        return 1
    n = normalize(name)
    if not n:
        return 1
    if n in author_map:
        return author_map[n]
    for key, uid in author_map.items():
        if not key:
            continue
        if n in key or key in n:
            return uid
    return 1
def build_items(c) -> list[Item]:
    """Discover the Joomla items that are not yet present in WordPress.

    Phase 1 fetches every carta in CARTAS and walks its sectioned links:
      * ``/buscadoravanzado/item/`` links become "k2" items, accumulating one
        category term per mention;
      * ``/indice-multimedia/`` and ``/videos/`` links become "content" items.
    An entry is kept only when its ID is above the highest ID already imported
    for its kind and is not itself already imported.

    Phase 2 fetches the page of every item that still lacks a title or content
    and fills in title, content and (if not already known) author.

    Returns:
        The new items sorted by ``(date, kind, source_id)``.
    """
    known_k2 = load_existing(c, "_fgj2wp_old_k2_id")
    known_content = load_existing(c, "_fgj2wp_old_content_id")
    k2_ceiling = max_existing(known_k2)
    content_ceiling = max_existing(known_content)
    print(
        f"WP existentes: K2={len(known_k2)} max={k2_ceiling} "
        f"content={len(known_content)} max={content_ceiling}"
    )

    # Keyed by (kind, source_id) so an item linked from several cartas is created once.
    discovered: dict[tuple[str, int], Item] = {}
    for spec in CARTAS:
        page = fetch(spec["url"])
        carta_title, carta_body = extract_title_and_content(page)
        carta = Item(
            kind="content",
            source_id=spec["content_id"],
            url=spec["url"],
            title=carta_title,
            content=carta_body,
            slug=slug_from_url(spec["url"]),
            date=spec["date"],
            term_ids=set(spec["cats"]),
        )
        if carta.source_id > content_ceiling and carta.source_id not in known_content:
            discovered[(carta.kind, carta.source_id)] = carta

        # Links are scanned even when the carta itself is already imported.
        for href, cat_name, _text, author in links_by_section(carta):
            if "/buscadoravanzado/item/" in href:
                sid = id_from_url(href)
                if not sid or sid <= k2_ceiling or sid in known_k2:
                    continue
                key = ("k2", sid)
                if key not in discovered:
                    discovered[key] = Item(
                        kind="k2",
                        source_id=sid,
                        url=href,
                        slug=slug_from_url(href),
                        date=carta.date,
                        author_name=author,
                        term_ids={TERM_FEADULTA},
                        carta_source_id=carta.source_id,
                    )
                # Every mention contributes its section's category.
                discovered[key].term_ids.add(SECTION_TO_TERM[cat_name])
            elif "/indice-multimedia/" in href or "/videos/" in href:
                sid = id_from_url(href)
                if not sid or sid <= content_ceiling or sid in known_content:
                    continue
                is_video = "/videos/" in href
                key = ("content", sid)
                if key not in discovered:
                    discovered[key] = Item(
                        kind="content",
                        source_id=sid,
                        url=href,
                        slug=slug_from_url(href),
                        date=carta.date,
                        term_ids={TERM_MULTIMEDIA, TERM_VIDEOS if is_video else TERM_INDICE_MULTIMEDIA},
                    )

    # Fetch item pages after discovery.
    for item in discovered.values():
        if item.title and item.content:
            continue
        page = fetch(item.url)
        title, content = extract_title_and_content(page)
        author = extract_author(page)
        # Fall back to a title derived from the slug when the page has none.
        item.title = title or item.slug.replace("-", " ").title()
        item.content = content
        if author and not item.author_name:
            item.author_name = author

    return sorted(discovered.values(), key=lambda x: (x.date, x.kind, x.source_id))
def insert_item(c, item: Item, term_to_tt: dict[int, int], lang_es_tt: Optional[int], author_map: dict[str, int], dry_run: bool) -> Optional[int]:
    """Insert ``item`` as a published post with its meta and term relationships.

    In dry-run mode nothing is written: a ``[DRY]`` summary line is printed and
    None is returned. Otherwise the function inserts, without committing:
      * the wp_posts row (the item date is used for all four date columns);
      * a postmeta row holding the legacy Joomla ID, keyed by item kind;
      * a postmeta row ``Idioma`` = ``'1'``;
      * one term relationship per category that has a known term_taxonomy_id,
        plus the Spanish language term when available.

    Returns:
        The new post ID, or None in dry-run mode.
    """
    author_id = resolve_author(author_map, item.author_name)
    if dry_run:
        print(
            f"[DRY] {item.kind:7s} {item.source_id:5d} "
            f"terms={sorted(item.term_ids)} author={author_id} {item.title[:70]}"
        )
        return None

    if item.kind == "k2":
        meta_key = "_fgj2wp_old_k2_id"
    else:
        meta_key = "_fgj2wp_old_content_id"

    # Categories in ascending term_id order, then the language term; entries
    # without a term_taxonomy_id are skipped when inserting.
    tt_ids = [term_to_tt.get(term_id) for term_id in sorted(item.term_ids)]
    tt_ids.append(lang_es_tt)

    with c.cursor() as cur:
        cur.execute(
            """
INSERT INTO wp_posts
(post_author, post_date, post_date_gmt, post_content, post_title,
post_excerpt, post_status, comment_status, ping_status, post_name,
post_type, post_modified, post_modified_gmt, comment_count,
to_ping, pinged, post_content_filtered)
VALUES
(%s,%s,%s,%s,%s,'','publish','open','open',%s,
'post',%s,%s,0,'','','')
""",
            (author_id, item.date, item.date, item.content, item.title, item.slug, item.date, item.date),
        )
        post_id = cur.lastrowid
        cur.execute(
            "INSERT INTO wp_postmeta (post_id, meta_key, meta_value) VALUES (%s,%s,%s)",
            (post_id, meta_key, str(item.source_id)),
        )
        cur.execute(
            "INSERT INTO wp_postmeta (post_id, meta_key, meta_value) VALUES (%s,'Idioma','1')",
            (post_id,),
        )
        for tt in tt_ids:
            if tt:
                cur.execute(
                    "INSERT IGNORE INTO wp_term_relationships (object_id, term_taxonomy_id) VALUES (%s,%s)",
                    (post_id, tt),
                )
    return post_id
def refresh_counts(c, term_to_tt: dict[int, int], lang_es_tt: Optional[int]):
    """Recompute ``wp_term_taxonomy.count`` for the taxonomies touched by the import.

    The count of each listed term_taxonomy_id is set to its number of rows in
    wp_term_relationships. Nothing is executed when there are no IDs to update,
    because an empty ``IN ()`` list is a SQL syntax error.
    """
    ttids = list(term_to_tt.values())
    if lang_es_tt:
        ttids.append(lang_es_tt)
    if not ttids:
        return
    placeholders = ",".join(["%s"] * len(ttids))
    with c.cursor() as cur:
        cur.execute(
            "UPDATE wp_term_taxonomy tt SET count = ("
            "SELECT COUNT(*) FROM wp_term_relationships tr "
            "WHERE tr.term_taxonomy_id=tt.term_taxonomy_id"
            ") WHERE tt.term_taxonomy_id IN (%s)" % placeholders,
            ttids,
        )
def main():
    """Command-line entry point: detect the delta and import it.

    Runs as a dry-run unless ``--apply`` is given. With ``--apply`` the posts
    are inserted, each k2 item is linked through ``_carta_id`` to the carta it
    was found in (when that carta was inserted in this run), term counts are
    refreshed and the transaction is committed. Any exception rolls the
    transaction back and is re-raised; the connection is always closed.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--apply", action="store_true", help="escribe en WordPress local")
    dry_run = not parser.parse_args().apply

    c = conn()
    try:
        term_to_tt = load_terms(c)
        lang_es_tt = load_lang_es(c)
        author_map = load_authors(c)
        items = build_items(c)

        print(f"Items nuevos detectados: {len(items)}")
        k2_total = sum(1 for entry in items if entry.kind == "k2")
        content_total = sum(1 for entry in items if entry.kind == "content")
        print(" K2:", k2_total, "content:", content_total)

        # (kind, source_id) -> new WordPress post ID; stays empty in dry-run.
        source_to_wp = {}
        for item in items:
            wp_id = insert_item(c, item, term_to_tt, lang_es_tt, author_map, dry_run)
            if wp_id:
                source_to_wp[(item.kind, item.source_id)] = wp_id

        if dry_run:
            c.rollback()
            print("Dry-run: sin cambios.")
        else:
            with c.cursor() as cur:
                for item in items:
                    if item.kind == "k2" and item.carta_source_id:
                        wp_id = source_to_wp.get(("k2", item.source_id))
                        carta_wp_id = source_to_wp.get(("content", item.carta_source_id))
                        if wp_id and carta_wp_id:
                            cur.execute(
                                "INSERT IGNORE INTO wp_postmeta (post_id, meta_key, meta_value) VALUES (%s,'_carta_id',%s)",
                                (wp_id, str(carta_wp_id)),
                            )
            refresh_counts(c, term_to_tt, lang_es_tt)
            c.commit()
            print("Import commit OK.")
    except Exception:
        c.rollback()
        raise
    finally:
        c.close()


if __name__ == "__main__":
    main()