369 lines
13 KiB
Python
369 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""Analyze WordPress MySQL dump from a .wpress extract.
|
|
|
|
Parses database.sql and outputs:
|
|
- pages.json : all published pages, posts, and events with title, slug, content, SEO meta
|
|
- design-system.json : colors, fonts from wp_options (Divi theme settings)
|
|
- site-info.json : domain, WP version, detected Divi version, plugin list
|
|
|
|
Usage:
|
|
python3 analyze_db.py <extract_dir> <output_data_dir>
|
|
|
|
extract_dir : path to wpress-extract/ (contains database.sql)
|
|
output_data_dir : where to write JSON output files (e.g. .planning/data/)
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SQL parsing helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Backslash followed by any single character (DOTALL so "\\\n" is covered too).
_SQL_ESCAPE_RE = re.compile(r"\\(.)", re.DOTALL)

# Escape sequences recognised inside MySQL string literals.
_SQL_ESCAPES = {
    "0": "\0",
    "b": "\b",
    "n": "\n",
    "r": "\r",
    "t": "\t",
    "Z": "\x1a",
    "\\": "\\",
    "'": "'",
    '"': '"',
}


def _unescape_sql(s: str) -> str:
    """Undo MySQL string escaping.

    The string is decoded in a single left-to-right pass so that each
    backslash is consumed exactly once.  (Chained ``str.replace`` calls
    mis-decode an escaped backslash followed by a letter: the dump text
    ``\\\\n`` -- a literal backslash then ``n`` -- would first collapse to
    ``\\n`` and then wrongly turn into a newline.)

    Args:
        s: Raw field text as it appears between the quotes in the dump.

    Returns:
        The decoded string.  Unrecognised escape sequences and a lone
        trailing backslash are left untouched.
    """
    def _decode(match: re.Match) -> str:
        # Fall back to the original two characters for unknown escapes.
        return _SQL_ESCAPES.get(match.group(1), match.group(0))

    return _SQL_ESCAPE_RE.sub(_decode, s)
|
|
|
|
|
|
# Marker stored in place of an unquoted SQL NULL so callers can tell it apart
# from the quoted string 'NULL'.  Must stay equal to the value load_table
# compares against.
_NULL_MARKER = "\x00NULL\x00"


def _parse_values_block(sql_block: str) -> list[list[str]]:
    """Extract rows from a multi-row INSERT VALUES block.

    A small state machine walks the text after the ``VALUES`` keyword so that
    commas and parentheses inside quoted strings do not split fields or rows.

    Field text is returned *raw*: backslash escape sequences inside quoted
    strings are kept as-is (the caller unescapes them), the surrounding quotes
    are removed, and an unquoted ``NULL`` is returned as ``"\\x00NULL\\x00"``.
    A doubled quote inside a string (``'it''s'``) yields a single quote
    character, and whitespace outside quotes is ignored.

    Args:
        sql_block: Text of one INSERT statement (anything before ``VALUES``
            is skipped).

    Returns:
        List of rows; each row is a list of raw string values.  A row that is
        not closed by ``)`` before the text ends is dropped.
    """
    rows: list[list[str]] = []

    values_kw = re.search(r"VALUES\s*", sql_block, re.IGNORECASE)
    if not values_kw:
        return rows
    rest = sql_block[values_kw.end():]
    n = len(rest)

    i = 0
    while i < n:
        # Jump to the opening parenthesis of the next row.
        i = rest.find("(", i)
        if i == -1:
            break
        i += 1

        row: list[str] = []
        field: list[str] = []
        quote_char = ""  # empty while outside a quoted string

        while i < n:
            c = rest[i]
            if quote_char:
                if c == "\\" and i + 1 < n:
                    # Keep the escape pair intact; it is decoded later.
                    field.append(c)
                    field.append(rest[i + 1])
                    i += 2
                elif c == quote_char:
                    if rest[i + 1:i + 2] == quote_char:
                        # SQL-standard doubled quote -> one literal quote.
                        field.append(c)
                        i += 2
                    else:
                        quote_char = ""
                        i += 1
                else:
                    field.append(c)
                    i += 1
            elif c in ("'", '"'):
                quote_char = c
                i += 1
            elif c == ",":
                row.append("".join(field))
                field = []
                i += 1
            elif c == ")":
                row.append("".join(field))
                rows.append(row)
                i += 1
                break
            elif rest.startswith("NULL", i):
                field.append(_NULL_MARKER)
                i += 4
            elif c.isspace():
                # Formatting whitespace between tokens is not part of a value.
                i += 1
            else:
                field.append(c)
                i += 1

    return rows
|
|
|
|
|
|
# Matches one complete quoted SQL string (backslash escapes honoured) or a
# bare ';'.  Scanning with it skips over string contents, so the first ';'
# it reports is a real statement terminator rather than data.
_SQL_STRING_OR_TERMINATOR_RE = re.compile(
    r"'[^'\\]*(?:\\.[^'\\]*)*'"      # single-quoted string
    r'|"[^"\\]*(?:\\.[^"\\]*)*"'     # double-quoted string
    r"|;",                           # statement terminator
    re.DOTALL,
)

# Type keywords that identify a column definition line inside CREATE TABLE.
# Matched as prefixes (e.g. "int" also covers "integer").
_COLUMN_TYPE_ALTERNATION = (
    "bigint|int|mediumint|smallint|tinyint|varchar|text|tinytext|mediumtext|longtext"
    "|char|datetime|date|timestamp|time|year|float|double|real|decimal|numeric"
    "|enum|set|json|bit|bool|binary|varbinary|blob|tinyblob|mediumblob|longblob"
)
_COLUMN_RE = re.compile(
    rf"`([^`]+)`\s+(?:{_COLUMN_TYPE_ALTERNATION})",
    re.IGNORECASE,
)


def _statement_end(sql_text: str, start: int) -> int:
    """Return the index just past the ';' that ends the statement at *start*.

    Semicolons inside quoted strings are ignored.  If no terminator is found
    the end of the text is returned.
    """
    for token in _SQL_STRING_OR_TERMINATOR_RE.finditer(sql_text, start):
        if token.group(0) == ";":
            return token.end()
    return len(sql_text)


def load_table(sql_text: str, table_name: str) -> list[dict]:
    """Return all rows for table_name as list of dicts.

    Column names are read, in order, from the table's ``CREATE TABLE``
    statement; every ``INSERT INTO `table_name` VALUES (...)`` statement is
    then parsed and its fields are paired with those columns by position.

    Args:
        sql_text: Full text of the SQL dump.
        table_name: Exact table name as it appears in the dump (with prefix).

    Returns:
        One dict per row mapping column name to the unescaped string value,
        or ``None`` for SQL NULL.  Missing trailing fields become ``""``.
        Returns ``[]`` when the table has no ``CREATE TABLE`` statement.
    """
    quoted_name = re.escape(table_name)

    # Find column definition
    create_re = re.compile(
        rf"CREATE TABLE `{quoted_name}`\s*\((.*?)\)\s*ENGINE",
        re.DOTALL | re.IGNORECASE,
    )
    created = create_re.search(sql_text)
    if not created:
        return []
    cols = _COLUMN_RE.findall(created.group(1))

    # Locate the start of each INSERT for this table; the end is found with a
    # quote-aware scan because row data may itself contain ");".
    insert_start_re = re.compile(
        rf"INSERT INTO `{quoted_name}`\s+VALUES\s*\(",
        re.IGNORECASE,
    )

    rows_out: list[dict] = []
    pos = 0
    while True:
        started = insert_start_re.search(sql_text, pos)
        if not started:
            break
        end = _statement_end(sql_text, started.start())
        for row in _parse_values_block(sql_text[started.start():end]):
            record: dict[str, Any] = {}
            for idx, col in enumerate(cols):
                raw = row[idx] if idx < len(row) else ""
                # "\x00NULL\x00" is the parser's marker for an unquoted NULL.
                record[col] = None if raw == "\x00NULL\x00" else _unescape_sql(raw)
            rows_out.append(record)
        # Resume after this statement so its string data is never re-scanned.
        pos = end
    return rows_out
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Divi version detection
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def detect_divi_version(sql_text: str) -> str:
    """Guess the Divi major version from marker strings in the dump.

    Checks, in order: the substring ``wp:divi/`` (reported as ``"5"``), the
    substring ``[et_pb_section`` (reported as ``"4"``), and finally a
    non-empty ``et_theme_builder_api_version`` option value (reported as
    ``"5"``).  Returns ``"unknown"`` when none of them is present.
    """
    substring_markers = (
        ("wp:divi/", "5"),
        ("[et_pb_section", "4"),
    )
    for needle, version in substring_markers:
        if needle in sql_text:
            return version

    # Check et_theme_builder version in options
    api_option = re.search(r"'et_theme_builder_api_version','([^']+)'", sql_text)
    return "5" if api_option else "unknown"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Options extraction
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def load_options(sql_text: str, prefix: str = "wp_") -> dict[str, str]:
    """Load the ``<prefix>options`` table as an option_name -> option_value map.

    Rows whose ``option_name`` is missing or empty are skipped; when a name
    occurs more than once the last row wins.
    """
    options: dict[str, str] = {}
    for record in load_table(sql_text, f"{prefix}options"):
        if not record.get("option_name"):
            continue
        options[record["option_name"]] = record["option_value"]
    return options
|
|
|
|
|
|
def _parse_php_serialized_pairs(raw: str) -> dict[str, str]:
    """Collect string->string entries from PHP-serialized text.

    The text is scanned for two consecutive serialized strings
    (``s:N:"key";s:M:"value"``); each such pair becomes one dict entry.
    Both the plain form and the SQL-dump form with backslash-escaped double
    quotes are recognised.  The declared lengths are not checked, and values
    containing a double quote are not matched.  Later pairs overwrite earlier
    ones with the same key.
    """
    # SQL dumps escape double-quotes as \\", giving patterns like:
    #   s:9:\\"body_font\\";s:7:\\"DM Sans\\";
    # Also handle unescaped form: s:9:"body_font";s:7:"DM Sans";
    pair_re = re.compile(
        r's:\d+:\\"([^"\\]+)\\";s:\d+:\\"([^"\\]*)\\"'  # SQL-escaped
        r'|s:\d+:"([^"]+)";s:\d+:"([^"]*)"',  # plain
    )

    found: dict[str, str] = {}
    for match in pair_re.finditer(raw):
        esc_key, esc_val, plain_key, plain_val = match.groups()
        if esc_key is None:
            found[plain_key] = plain_val
        else:
            found[esc_key] = esc_val
    return found
|
|
|
|
|
|
def extract_design_system(options: dict[str, str]) -> dict:
    """Build the design-system dict from the loaded options.

    Reads the ``et_divi`` option (falling back to ``et_divi_options``),
    extracts its string settings and copies the colour, font, font-size and
    line-height entries listed below under their design-system names.  When
    two Divi keys map to the same design key, the first one listed that is
    present wins.  ``site_url`` and ``site_name`` are always added from the
    ``siteurl`` and ``blogname`` options.
    """
    # (Divi option key, design-system key) in priority order.
    divi_to_design = (
        ("accent_color", "primary_color_dark"),
        ("link_color", "primary_color"),
        ("body_font", "body_font"),
        ("heading_font", "heading_font"),
        ("header_font", "heading_font"),  # Divi 4 alias
        ("body_font_size", "body_font_size"),
        ("body_line_height", "body_line_height"),
        ("heading_font_weight", "heading_font_weight"),
        ("header_text_size", "heading_font_size"),
        ("header_line_height", "heading_line_height"),
        ("header_color", "heading_color"),
        ("font_color", "body_color"),
        ("secondary_accent_color", "secondary_color"),
    )

    serialized = options.get("et_divi", "") or options.get("et_divi_options", "")
    # Parse PHP-serialized et_divi option (Divi 4 + 5 store settings here)
    settings = _parse_php_serialized_pairs(serialized) if serialized else {}

    design: dict[str, Any] = {}
    for divi_key, design_key in divi_to_design:
        if divi_key in settings and design_key not in design:
            design[design_key] = settings[divi_key]

    # Site info
    design["site_url"] = options.get("siteurl", "")
    design["site_name"] = options.get("blogname", "")
    return design
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Page extraction
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def extract_pages(sql_text: str, prefix: str = "wp_") -> list[dict]:
    """Return all published pages, posts and events with SEO meta.

    Loads ``<prefix>posts`` and ``<prefix>postmeta`` from the dump, keeps rows
    whose ``post_status`` is ``publish`` and whose ``post_type`` is ``page``,
    ``post`` or ``event``, and attaches each post's meta values.

    Args:
        sql_text: Full text of the SQL dump.
        prefix: Table prefix used inside the dump.

    Returns:
        One dict per kept post, sorted by ``menu_order``.  ``seo_*`` fields
        come from the ``rank_math_*`` meta keys; ``acf`` holds every other
        meta entry whose key does not start with ``_``, ``rank_math`` or
        ``et_``.
    """
    posts = load_table(sql_text, f"{prefix}posts")
    postmeta = load_table(sql_text, f"{prefix}postmeta")

    # Build postmeta lookup: post_id -> {meta_key: meta_value}
    meta_map: dict[str, dict[str, str]] = {}
    for row in postmeta:
        meta_key = row.get("meta_key", "")
        if meta_key is None:
            # NULL meta_key has no usable name and would break the
            # prefix-based filtering below.
            continue
        pid = str(row.get("post_id", ""))
        meta_map.setdefault(pid, {})[meta_key] = row.get("meta_value", "")

    excluded_prefixes = ("_", "rank_math", "et_")

    pages = []
    for p in posts:
        if p.get("post_status") != "publish":
            continue
        post_type = p.get("post_type", "")
        if post_type not in ("page", "post", "event"):
            continue

        pid = str(p.get("ID", ""))
        meta = meta_map.get(pid, {})

        pages.append({
            "id": pid,
            "post_type": post_type,
            "slug": p.get("post_name", ""),
            "title": p.get("post_title", ""),
            "status": p.get("post_status", ""),
            # Keep only the YYYY-MM-DD part; tolerate a NULL date column.
            "date": (p.get("post_date") or "")[:10],
            "modified": (p.get("post_modified") or "")[:10],
            "content_raw": p.get("post_content", ""),
            "excerpt": p.get("post_excerpt", ""),
            "parent_id": p.get("post_parent", "0"),
            "menu_order": p.get("menu_order", "0"),
            # Rank Math SEO fields
            "seo_title": meta.get("rank_math_title", ""),
            "seo_description": meta.get("rank_math_description", ""),
            "seo_keywords": meta.get("rank_math_focus_keyword", ""),
            "acf": {
                k: v for k, v in meta.items()
                if not k.startswith(excluded_prefixes)
            },
        })

    pages.sort(key=lambda x: int(x["menu_order"] or 0))
    return pages
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _write_json(path: Path, payload: Any) -> None:
    """Write *payload* to *path* as indented UTF-8 JSON."""
    # ensure_ascii=False emits raw non-ASCII characters, so the file encoding
    # must be pinned instead of depending on the platform locale.
    path.write_text(
        json.dumps(payload, indent=2, ensure_ascii=False),
        encoding="utf-8",
    )


def main():
    """Command-line entry point.

    Expects ``<extract_dir> <output_data_dir>`` in ``sys.argv``.  Reads
    ``database.sql`` (or, failing that, the first ``*.sql`` file found under
    the extract directory in sorted order) plus an optional ``package.json``,
    then writes ``design-system.json``, ``pages.json`` and ``site-info.json``
    into the output directory.  Exits with status 1 on bad usage or when no
    SQL file can be found.
    """
    if len(sys.argv) < 3:
        print(f"Usage: {sys.argv[0]} <extract_dir> <output_data_dir>")
        sys.exit(1)

    extract_dir = Path(sys.argv[1])
    out_dir = Path(sys.argv[2])
    out_dir.mkdir(parents=True, exist_ok=True)

    sql_file = extract_dir / "database.sql"
    if not sql_file.exists():
        # Search for it; sort so the choice does not depend on directory order.
        found = sorted(extract_dir.rglob("*.sql"))
        if not found:
            print(f"ERROR: No .sql file found under {extract_dir}")
            sys.exit(1)
        sql_file = found[0]
        print(f"Found SQL at: {sql_file}")

    print(f"Loading {sql_file} ({sql_file.stat().st_size / 1024 / 1024:.1f} MB)...")
    sql_text = sql_file.read_text(encoding="utf-8", errors="replace")

    # Detect Divi version
    divi_version = detect_divi_version(sql_text)
    print(f"Divi version detected: {divi_version}")

    # Load package.json metadata (optional)
    pkg = {}
    pkg_file = extract_dir / "package.json"
    if pkg_file.exists():
        pkg = json.loads(pkg_file.read_text(encoding="utf-8"))

    # AIOIM dumps use SERVMASK_PREFIX_ as a placeholder in the SQL file.
    # Detect which prefix the dump actually uses.
    runtime_prefix = pkg.get("Database", {}).get("Prefix", "wp_")
    if "SERVMASK_PREFIX_" in sql_text:
        sql_prefix = "SERVMASK_PREFIX_"
    else:
        sql_prefix = runtime_prefix
    print(f"SQL prefix: {sql_prefix!r} (runtime prefix: {runtime_prefix!r})")

    # Load wp_options
    options = load_options(sql_text, sql_prefix)
    print(f"Loaded {len(options)} options")

    wp_version = pkg.get("WordPress", {}).get("Version", "")
    plugins = pkg.get("Plugins", [])

    # Design system
    design = extract_design_system(options)
    design["divi_version"] = divi_version
    design["wp_version"] = wp_version
    design["plugins"] = plugins
    _write_json(out_dir / "design-system.json", design)
    print(f"Wrote design-system.json ({len(design)} keys)")

    # Pages
    pages = extract_pages(sql_text, sql_prefix)
    _write_json(out_dir / "pages.json", pages)
    print(f"Wrote pages.json ({len(pages)} pages/posts)")

    # Site info summary
    site_info = {
        "domain": pkg.get("SiteURL", options.get("siteurl", "")),
        "name": options.get("blogname", ""),
        "tagline": options.get("blogdescription", ""),
        "admin_email": options.get("admin_email", ""),
        "wp_version": wp_version,
        "divi_version": divi_version,
        "plugins": plugins,
        "prefix": runtime_prefix,
        "total_pages": sum(1 for p in pages if p["post_type"] == "page"),
        "total_posts": sum(1 for p in pages if p["post_type"] == "post"),
    }
    _write_json(out_dir / "site-info.json", site_info)
    print("Wrote site-info.json")

    print(f"\nDone. Output in: {out_dir}")
    print(f" pages.json : {len(pages)} entries")
    print(f" design-system.json: {len(design)} keys")
    print(" site-info.json : done")
|
|
|
|
|
|
# Run the analysis only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|