Files
2026-06-09 18:31:59 +02:00

369 lines
13 KiB
Python

#!/usr/bin/env python3
"""Analyze WordPress MySQL dump from a .wpress extract.
Parses database.sql and outputs:
- pages.json : all published pages with title, slug, content, SEO meta
- design-system.json : colors, fonts from wp_options (Divi theme settings)
- site-info.json : domain, WP version, detected Divi version, plugin list
Usage:
python3 analyze_db.py <extract_dir> <output_data_dir>
extract_dir : path to wpress-extract/ (contains database.sql)
output_data_dir : where to write JSON output files (e.g. .planning/data/)
"""
from __future__ import annotations
import json
import os
import re
import sys
from pathlib import Path
from typing import Any
# ---------------------------------------------------------------------------
# SQL parsing helpers
# ---------------------------------------------------------------------------
def _unescape_sql(s: str) -> str:
"""Undo MySQL string escaping."""
return (s
.replace("\\'", "'")
.replace('\\"', '"')
.replace("\\\\", "\\")
.replace("\\n", "\n")
.replace("\\r", "\r")
.replace("\\t", "\t")
.replace("\\0", "\0"))
def _parse_values_block(sql_block: str) -> list[list[str]]:
"""Extract rows from a multi-row INSERT VALUES block.
Handles commas inside quoted strings via a simple state machine.
Returns list of rows; each row is a list of raw string values.
"""
rows: list[list[str]] = []
# Find VALUES section
m = re.search(r"VALUES\s*", sql_block, re.IGNORECASE)
if not m:
return rows
rest = sql_block[m.end():]
i = 0
n = len(rest)
while i < n:
# Skip to '('
while i < n and rest[i] != '(':
i += 1
if i >= n:
break
i += 1 # skip '('
row: list[str] = []
field = []
in_quote = False
quote_char = ''
while i < n:
c = rest[i]
if not in_quote:
if c in ("'", '"'):
in_quote = True
quote_char = c
i += 1
continue
elif c == ',' :
row.append("".join(field))
field = []
i += 1
continue
elif c == ')':
row.append("".join(field))
field = []
rows.append(row)
i += 1
break
elif c == 'N' and rest[i:i+4] == 'NULL':
field.append('\x00NULL\x00')
i += 4
continue
else:
field.append(c)
i += 1
else:
if c == '\\' and i + 1 < n:
field.append(c)
field.append(rest[i + 1])
i += 2
continue
elif c == quote_char:
in_quote = False
i += 1
continue
else:
field.append(c)
i += 1
return rows
# Column type keywords recognised in CREATE TABLE column definitions. A
# column whose type is missing here would be skipped, shifting every later
# column's values onto the wrong name, so the list errs on the generous side.
_COLUMN_TYPES = (
    "bigint", "int", "mediumint", "smallint", "tinyint",
    "varchar", "varbinary", "binary", "char",
    "tinytext", "text", "mediumtext", "longtext",
    "datetime", "date", "timestamp", "time", "year",
    "float", "double", "decimal", "numeric", "real", "bit", "bool",
    "enum", "set", "json",
    "tinyblob", "blob", "mediumblob", "longblob",
)
_COLUMN_DEF_RE = re.compile(
    r"`([^`]+)`\s+(?:" + "|".join(_COLUMN_TYPES) + r")",
    re.IGNORECASE,
)
_STMT_QUOTE_OR_END_RE = re.compile(r"""['";]""")
_STMT_IN_QUOTE_RE = {
    "'": re.compile(r"[\\']"),
    '"': re.compile(r'[\\"]'),
}


def _find_statement_end(sql_text: str, start: int) -> int:
    """Return the index just past the ';' that ends the statement at *start*.

    Semicolons inside single- or double-quoted strings (with backslash
    escapes) are skipped. Returns ``len(sql_text)`` when the statement is
    never terminated.
    """
    pos = start
    n = len(sql_text)
    while pos < n:
        m = _STMT_QUOTE_OR_END_RE.search(sql_text, pos)
        if m is None:
            return n
        if m.group() == ";":
            return m.end()
        # Entered a quoted string: hop between backslashes and quote
        # characters until the matching closing quote is found.
        closer = _STMT_IN_QUOTE_RE[m.group()]
        pos = m.end()
        while True:
            q = closer.search(sql_text, pos)
            if q is None:
                return n
            if q.group() == "\\":
                pos = q.end() + 1  # skip the escaped character
            else:
                pos = q.end()
                break
    return n


def load_table(sql_text: str, table_name: str) -> list[dict]:
    """Return all rows for *table_name* as a list of dicts.

    Column names come from the table's ``CREATE TABLE`` statement and are
    paired positionally with the values of every
    ``INSERT INTO `table` VALUES (...)`` statement. Statement boundaries are
    found with a quote-aware scan, so a ``);`` inside string data (for
    example inline JavaScript in post content) does not truncate the
    statement.

    Args:
        sql_text: Full text of the SQL dump.
        table_name: Exact table name, including prefix.

    Returns:
        One dict per row mapping column name to the unescaped string value,
        or ``None`` for SQL NULL. Columns missing from a short row map to
        ``""``. Returns ``[]`` when no ``CREATE TABLE`` is found.
    """
    quoted_name = re.escape(table_name)
    create_re = re.compile(
        rf"CREATE TABLE\s+(?:IF NOT EXISTS\s+)?`{quoted_name}`\s*\((.*?)\)\s*ENGINE",
        re.DOTALL | re.IGNORECASE,
    )
    m = create_re.search(sql_text)
    if not m:
        return []
    cols = _COLUMN_DEF_RE.findall(m.group(1))

    header_re = re.compile(
        rf"INSERT INTO `{quoted_name}`\s+VALUES\s*\(",
        re.IGNORECASE,
    )
    rows_out: list[dict] = []
    pos = 0
    while True:
        header = header_re.search(sql_text, pos)
        if header is None:
            break
        # Start scanning at the first '(' so the header itself is skipped.
        end = _find_statement_end(sql_text, header.end() - 1)
        for row in _parse_values_block(sql_text[header.start():end]):
            record: dict = {}
            for idx, col in enumerate(cols):
                val = row[idx] if idx < len(row) else ""
                record[col] = None if val == "\x00NULL\x00" else _unescape_sql(val)
            rows_out.append(record)
        # Resume after this statement so INSERT-like text inside its string
        # values is never mistaken for a new statement.
        pos = end
    return rows_out
# ---------------------------------------------------------------------------
# Divi version detection
# ---------------------------------------------------------------------------
def detect_divi_version(sql_text: str) -> str:
    """Guess the Divi major version from markers found in the dump.

    Checks, in order: the ``wp:divi/`` block marker ("5"), the
    ``[et_pb_section`` shortcode ("4"), then an
    ``et_theme_builder_api_version`` option row ("5").

    Returns:
        "5", "4", or "unknown" when none of the markers is present.
    """
    content_markers = (
        ("wp:divi/", "5"),
        ("[et_pb_section", "4"),
    )
    for marker, version in content_markers:
        if marker in sql_text:
            return version
    # Fall back to the theme-builder API version stored in the options table.
    has_api_option = re.search(
        r"'et_theme_builder_api_version','([^']+)'", sql_text
    ) is not None
    return "5" if has_api_option else "unknown"
# ---------------------------------------------------------------------------
# Options extraction
# ---------------------------------------------------------------------------
def load_options(sql_text: str, prefix: str = "wp_") -> dict[str, str]:
    """Return the ``<prefix>options`` table as an option_name -> option_value map.

    Rows whose ``option_name`` is empty or missing are skipped; when a name
    occurs more than once the last row wins.
    """
    options: dict[str, str] = {}
    for record in load_table(sql_text, f"{prefix}options"):
        if not record.get("option_name"):
            continue
        options[record["option_name"]] = record["option_value"]
    return options
def _parse_php_serialized_pairs(raw: str) -> dict[str, str]:
"""Extract key/value string pairs from a PHP-serialized array.
Handles both escaped (SQL-dump) and unescaped forms.
Only returns s->s pairs (string key, string value).
"""
result: dict[str, str] = {}
# SQL dumps escape double-quotes as \\", giving patterns like:
# s:9:\\"body_font\\";s:7:\\"DM Sans\\";
# Also handle unescaped form: s:9:"body_font";s:7:"DM Sans";
pat = re.compile(
r's:\d+:\\"([^"\\]+)\\";s:\d+:\\"([^"\\]*)\\"' # SQL-escaped
r'|s:\d+:"([^"]+)";s:\d+:"([^"]*)"', # plain
)
for m in pat.finditer(raw):
if m.group(1) is not None:
k, v = m.group(1), m.group(2)
else:
k, v = m.group(3), m.group(4)
result[k] = v
return result
def extract_design_system(options: dict[str, str]) -> dict:
    """Build the design-system dict from Divi theme settings in the options.

    Reads the serialized ``et_divi`` option (falling back to
    ``et_divi_options``), maps known Divi keys for colors, fonts and font
    metrics to design-system keys, and adds ``site_url`` / ``site_name``
    from the ``siteurl`` / ``blogname`` options. When two Divi keys map to
    the same design key, the one listed first wins.
    """
    # Ordered (divi_key, design_key) pairs; order decides alias precedence.
    divi_to_design = (
        ("accent_color", "primary_color_dark"),
        ("link_color", "primary_color"),
        ("body_font", "body_font"),
        ("heading_font", "heading_font"),
        ("header_font", "heading_font"),  # Divi 4 alias
        ("body_font_size", "body_font_size"),
        ("body_line_height", "body_line_height"),
        ("heading_font_weight", "heading_font_weight"),
        ("header_text_size", "heading_font_size"),
        ("header_line_height", "heading_line_height"),
        ("header_color", "heading_color"),
        ("font_color", "body_color"),
        ("secondary_accent_color", "secondary_color"),
    )
    design: dict = {}
    serialized = options.get("et_divi", "") or options.get("et_divi_options", "")
    if serialized:
        divi_settings = _parse_php_serialized_pairs(serialized)
        for divi_key, design_key in divi_to_design:
            if divi_key not in divi_settings:
                continue
            if design_key not in design:
                design[design_key] = divi_settings[divi_key]
    # Basic site identity travels with the design data.
    design["site_url"] = options.get("siteurl", "")
    design["site_name"] = options.get("blogname", "")
    return design
# ---------------------------------------------------------------------------
# Page extraction
# ---------------------------------------------------------------------------
def _menu_order_key(entry: dict) -> int:
    """Sort key: the entry's menu_order as an int, 0 if blank or non-numeric."""
    try:
        return int(entry["menu_order"] or 0)
    except (TypeError, ValueError):
        return 0


def extract_pages(sql_text: str, prefix: str = "wp_") -> list[dict]:
    """Return all published pages, posts and events with their SEO meta.

    Args:
        sql_text: Full text of the SQL dump.
        prefix: Table prefix used in the dump.

    Returns:
        One dict per row of ``<prefix>posts`` whose status is ``publish``
        and whose type is ``page``, ``post`` or ``event``, sorted by
        ``menu_order`` (stable, so dump order is kept within equal values).
        Each entry carries the Rank Math title/description/focus keyword and
        an ``acf`` dict holding every other post meta value whose key does
        not start with ``_``, ``rank_math`` or ``et_``.
    """
    posts = load_table(sql_text, f"{prefix}posts")
    postmeta = load_table(sql_text, f"{prefix}postmeta")

    # Build postmeta lookup: post_id -> {meta_key: meta_value}
    meta_map: dict[str, dict[str, str]] = {}
    for row in postmeta:
        meta_key = row.get("meta_key", "")
        if meta_key is None:
            # meta_key is a nullable column; a NULL key cannot be matched by
            # name and would break the string prefix filter below.
            continue
        pid = str(row.get("post_id", ""))
        meta_map.setdefault(pid, {})[meta_key] = row.get("meta_value", "")

    hidden_prefixes = ("_", "rank_math", "et_")
    pages = []
    for p in posts:
        if p.get("post_status") != "publish":
            continue
        post_type = p.get("post_type", "")
        if post_type not in ("page", "post", "event"):
            continue
        pid = str(p.get("ID", ""))
        meta = meta_map.get(pid, {})
        entry = {
            "id": pid,
            "post_type": post_type,
            "slug": p.get("post_name", ""),
            "title": p.get("post_title", ""),
            "status": p.get("post_status", ""),
            # `or ""` guards against NULL dates, which load as None.
            "date": (p.get("post_date") or "")[:10],
            "modified": (p.get("post_modified") or "")[:10],
            "content_raw": p.get("post_content", ""),
            "excerpt": p.get("post_excerpt", ""),
            "parent_id": p.get("post_parent", "0"),
            "menu_order": p.get("menu_order", "0"),
            # Rank Math SEO fields; a NULL meta value is reported as "".
            "seo_title": meta.get("rank_math_title") or "",
            "seo_description": meta.get("rank_math_description") or "",
            "seo_keywords": meta.get("rank_math_focus_keyword") or "",
            "acf": {
                k: v for k, v in meta.items()
                if not k.startswith(hidden_prefixes)
            },
        }
        pages.append(entry)
    pages.sort(key=_menu_order_key)
    return pages
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def _write_json(path: Path, payload: Any) -> None:
    """Write *payload* to *path* as indented, UTF-8 encoded JSON."""
    # ensure_ascii=False emits raw non-ASCII characters, so the encoding must
    # be pinned; the platform default may be unable to represent them.
    path.write_text(
        json.dumps(payload, indent=2, ensure_ascii=False),
        encoding="utf-8",
    )


def main() -> None:
    """Command-line entry point.

    Reads ``<extract_dir>/database.sql`` (or, failing that, the first
    ``*.sql`` file found below ``extract_dir`` in sorted path order) plus an
    optional ``package.json``, then writes ``design-system.json``,
    ``pages.json`` and ``site-info.json`` into ``<output_data_dir>``.

    Exits with status 1 when arguments are missing or no SQL file is found.
    """
    if len(sys.argv) < 3:
        print(f"Usage: {sys.argv[0]} <extract_dir> <output_data_dir>")
        sys.exit(1)
    extract_dir = Path(sys.argv[1])
    out_dir = Path(sys.argv[2])
    out_dir.mkdir(parents=True, exist_ok=True)

    sql_file = extract_dir / "database.sql"
    if not sql_file.exists():
        # Search for it; sort so the choice does not depend on filesystem order.
        found = sorted(extract_dir.rglob("*.sql"))
        if not found:
            print(f"ERROR: No .sql file found under {extract_dir}")
            sys.exit(1)
        sql_file = found[0]
        print(f"Found SQL at: {sql_file}")
    print(f"Loading {sql_file} ({sql_file.stat().st_size / 1024 / 1024:.1f} MB)...")
    sql_text = sql_file.read_text(encoding="utf-8", errors="replace")

    # Detect Divi version
    divi_version = detect_divi_version(sql_text)
    print(f"Divi version detected: {divi_version}")

    # Optional archive metadata (site URL, WP version, plugins, table prefix)
    pkg = {}
    pkg_file = extract_dir / "package.json"
    if pkg_file.exists():
        pkg = json.loads(pkg_file.read_text(encoding="utf-8"))

    # AIOIM dumps use SERVMASK_PREFIX_ as a placeholder in the SQL file.
    # Detect which prefix the dump actually uses.
    runtime_prefix = pkg.get("Database", {}).get("Prefix", "wp_")
    if "SERVMASK_PREFIX_" in sql_text:
        sql_prefix = "SERVMASK_PREFIX_"
    else:
        sql_prefix = runtime_prefix
    print(f"SQL prefix: {sql_prefix!r} (runtime prefix: {runtime_prefix!r})")

    # Load wp_options
    options = load_options(sql_text, sql_prefix)
    print(f"Loaded {len(options)} options")

    wp_version = pkg.get("WordPress", {}).get("Version", "")
    plugins = pkg.get("Plugins", [])

    # Design system
    design = extract_design_system(options)
    design["divi_version"] = divi_version
    design["wp_version"] = wp_version
    design["plugins"] = plugins
    _write_json(out_dir / "design-system.json", design)
    print(f"Wrote design-system.json ({len(design)} keys)")

    # Pages
    pages = extract_pages(sql_text, sql_prefix)
    _write_json(out_dir / "pages.json", pages)
    print(f"Wrote pages.json ({len(pages)} pages/posts)")

    # Site info summary
    site_info = {
        "domain": pkg.get("SiteURL", options.get("siteurl", "")),
        "name": options.get("blogname", ""),
        "tagline": options.get("blogdescription", ""),
        "admin_email": options.get("admin_email", ""),
        "wp_version": wp_version,
        "divi_version": divi_version,
        "plugins": plugins,
        "prefix": runtime_prefix,
        "total_pages": sum(1 for p in pages if p["post_type"] == "page"),
        "total_posts": sum(1 for p in pages if p["post_type"] == "post"),
    }
    _write_json(out_dir / "site-info.json", site_info)
    print("Wrote site-info.json")

    print(f"\nDone. Output in: {out_dir}")
    print(f" pages.json : {len(pages)} entries")
    print(f" design-system.json: {len(design)} keys")
    print(" site-info.json : done")
# Run the analysis only when executed as a script, not when imported.
if __name__ == "__main__":
    main()