#!/usr/bin/env python3 """Analyze WordPress MySQL dump from a .wpress extract. Parses database.sql and outputs: - pages.json : all published pages with title, slug, content, SEO meta - design-system.json : colors, fonts from wp_options (Divi theme settings) - site-info.json : domain, WP version, detected Divi version, plugin list Usage: python3 analyze_db.py extract_dir : path to wpress-extract/ (contains database.sql) output_data_dir : where to write JSON output files (e.g. .planning/data/) """ from __future__ import annotations import json import os import re import sys from pathlib import Path from typing import Any # --------------------------------------------------------------------------- # SQL parsing helpers # --------------------------------------------------------------------------- def _unescape_sql(s: str) -> str: """Undo MySQL string escaping.""" return (s .replace("\\'", "'") .replace('\\"', '"') .replace("\\\\", "\\") .replace("\\n", "\n") .replace("\\r", "\r") .replace("\\t", "\t") .replace("\\0", "\0")) def _parse_values_block(sql_block: str) -> list[list[str]]: """Extract rows from a multi-row INSERT VALUES block. Handles commas inside quoted strings via a simple state machine. Returns list of rows; each row is a list of raw string values. """ rows: list[list[str]] = [] # Find VALUES section m = re.search(r"VALUES\s*", sql_block, re.IGNORECASE) if not m: return rows rest = sql_block[m.end():] i = 0 n = len(rest) while i < n: # Skip to '(' while i < n and rest[i] != '(': i += 1 if i >= n: break i += 1 # skip '(' row: list[str] = [] field = [] in_quote = False quote_char = '' while i < n: c = rest[i] if not in_quote: if c in ("'", '"'): in_quote = True quote_char = c i += 1 continue elif c == ',' : row.append("".join(field)) field = [] i += 1 continue elif c == ')': row.append("".join(field)) field = [] rows.append(row) i += 1 break elif c == 'N' and rest[i:i+4] == 'NULL': field.append('\x00NULL\x00') i += 4 continue else: field.append(c) i += 1 else: if c == '\\' and i + 1 < n: field.append(c) field.append(rest[i + 1]) i += 2 continue elif c == quote_char: in_quote = False i += 1 continue else: field.append(c) i += 1 return rows def load_table(sql_text: str, table_name: str) -> list[dict]: """Return all rows for table_name as list of dicts.""" # Find column definition col_re = re.compile( rf"CREATE TABLE `{re.escape(table_name)}`\s*\((.*?)\)\s*ENGINE", re.DOTALL | re.IGNORECASE, ) m = col_re.search(sql_text) if not m: return [] col_block = m.group(1) cols = re.findall(r"`([^`]+)`\s+(?:bigint|int|mediumint|smallint|tinyint|varchar|text|mediumtext|longtext|char|datetime|date|float|double|decimal|enum|set|blob|mediumblob|longblob)", col_block, re.IGNORECASE) # Find INSERT blocks for this table insert_re = re.compile( rf"INSERT INTO `{re.escape(table_name)}`\s+VALUES\s*\(.+?\);", re.DOTALL | re.IGNORECASE, ) rows_out: list[dict] = [] for block in insert_re.finditer(sql_text): parsed = _parse_values_block(block.group(0)) for row in parsed: d: dict[str, Any] = {} for idx, col in enumerate(cols): val = row[idx] if idx < len(row) else "" if val == "\x00NULL\x00": d[col] = None else: d[col] = _unescape_sql(val) rows_out.append(d) return rows_out # --------------------------------------------------------------------------- # Divi version detection # --------------------------------------------------------------------------- def detect_divi_version(sql_text: str) -> str: if "wp:divi/" in sql_text: return "5" if "[et_pb_section" in sql_text: return "4" # Check et_theme_builder version in options m = re.search(r"'et_theme_builder_api_version','([^']+)'", sql_text) if m: return "5" return "unknown" # --------------------------------------------------------------------------- # Options extraction # --------------------------------------------------------------------------- def load_options(sql_text: str, prefix: str = "wp_") -> dict[str, str]: table = f"{prefix}options" rows = load_table(sql_text, table) return {r["option_name"]: r["option_value"] for r in rows if r.get("option_name")} def _parse_php_serialized_pairs(raw: str) -> dict[str, str]: """Extract key/value string pairs from a PHP-serialized array. Handles both escaped (SQL-dump) and unescaped forms. Only returns s->s pairs (string key, string value). """ result: dict[str, str] = {} # SQL dumps escape double-quotes as \\", giving patterns like: # s:9:\\"body_font\\";s:7:\\"DM Sans\\"; # Also handle unescaped form: s:9:"body_font";s:7:"DM Sans"; pat = re.compile( r's:\d+:\\"([^"\\]+)\\";s:\d+:\\"([^"\\]*)\\"' # SQL-escaped r'|s:\d+:"([^"]+)";s:\d+:"([^"]*)"', # plain ) for m in pat.finditer(raw): if m.group(1) is not None: k, v = m.group(1), m.group(2) else: k, v = m.group(3), m.group(4) result[k] = v return result def extract_design_system(options: dict[str, str]) -> dict: """Pull Divi theme colors, fonts, and spacing from wp_options.""" raw = options.get("et_divi", "") or options.get("et_divi_options", "") design: dict[str, Any] = {} # Parse PHP-serialized et_divi option (Divi 4 + 5 store settings here) if raw: pairs = _parse_php_serialized_pairs(raw) # Map Divi option keys to design-system keys key_map = { "accent_color": "primary_color_dark", "link_color": "primary_color", "body_font": "body_font", "heading_font": "heading_font", "header_font": "heading_font", # Divi 4 alias "body_font_size": "body_font_size", "body_line_height": "body_line_height", "heading_font_weight": "heading_font_weight", "header_text_size": "heading_font_size", "header_line_height": "heading_line_height", "header_color": "heading_color", "font_color": "body_color", "secondary_accent_color": "secondary_color", } for divi_key, design_key in key_map.items(): if divi_key in pairs: design.setdefault(design_key, pairs[divi_key]) # Site info design["site_url"] = options.get("siteurl", "") design["site_name"] = options.get("blogname", "") return design # --------------------------------------------------------------------------- # Page extraction # --------------------------------------------------------------------------- def extract_pages(sql_text: str, prefix: str = "wp_") -> list[dict]: """Return all published pages and posts with SEO meta.""" posts = load_table(sql_text, f"{prefix}posts") postmeta = load_table(sql_text, f"{prefix}postmeta") # Build postmeta lookup: post_id -> {meta_key: meta_value} meta_map: dict[str, dict[str, str]] = {} for row in postmeta: pid = str(row.get("post_id", "")) meta_map.setdefault(pid, {})[row.get("meta_key", "")] = row.get("meta_value", "") pages = [] for p in posts: if p.get("post_status") not in ("publish",): continue post_type = p.get("post_type", "") if post_type not in ("page", "post", "event"): continue pid = str(p.get("ID", "")) meta = meta_map.get(pid, {}) # Rank Math SEO fields rm_title = meta.get("rank_math_title", "") rm_desc = meta.get("rank_math_description", "") rm_focus = meta.get("rank_math_focus_keyword", "") entry = { "id": pid, "post_type": post_type, "slug": p.get("post_name", ""), "title": p.get("post_title", ""), "status": p.get("post_status", ""), "date": p.get("post_date", "")[:10], "modified": p.get("post_modified", "")[:10], "content_raw": p.get("post_content", ""), "excerpt": p.get("post_excerpt", ""), "parent_id": p.get("post_parent", "0"), "menu_order": p.get("menu_order", "0"), "seo_title": rm_title, "seo_description": rm_desc, "seo_keywords": rm_focus, "acf": {k: v for k, v in meta.items() if not k.startswith("_") and not k.startswith("rank_math") and not k.startswith("et_")}, } pages.append(entry) pages.sort(key=lambda x: int(x["menu_order"] or 0)) return pages # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def main(): if len(sys.argv) < 3: print(f"Usage: {sys.argv[0]} ") sys.exit(1) extract_dir = Path(sys.argv[1]) out_dir = Path(sys.argv[2]) out_dir.mkdir(parents=True, exist_ok=True) sql_file = extract_dir / "database.sql" if not sql_file.exists(): # Search for it found = list(extract_dir.rglob("*.sql")) if not found: print(f"ERROR: No .sql file found under {extract_dir}") sys.exit(1) sql_file = found[0] print(f"Found SQL at: {sql_file}") print(f"Loading {sql_file} ({sql_file.stat().st_size / 1024 / 1024:.1f} MB)...") sql_text = sql_file.read_text(encoding="utf-8", errors="replace") # Detect Divi version divi_version = detect_divi_version(sql_text) print(f"Divi version detected: {divi_version}") # Load wp_options pkg = {} pkg_file = extract_dir / "package.json" if pkg_file.exists(): pkg = json.loads(pkg_file.read_text()) # AIOIM dumps use SERVMASK_PREFIX_ as a placeholder in the SQL file. # Detect which prefix the dump actually uses. if "SERVMASK_PREFIX_" in sql_text: sql_prefix = "SERVMASK_PREFIX_" else: sql_prefix = pkg.get("Database", {}).get("Prefix", "wp_") runtime_prefix = pkg.get("Database", {}).get("Prefix", "wp_") print(f"SQL prefix: {sql_prefix!r} (runtime prefix: {runtime_prefix!r})") options = load_options(sql_text, sql_prefix) print(f"Loaded {len(options)} options") # Design system design = extract_design_system(options) design["divi_version"] = divi_version design["wp_version"] = pkg.get("WordPress", {}).get("Version", "") design["plugins"] = pkg.get("Plugins", []) (out_dir / "design-system.json").write_text(json.dumps(design, indent=2, ensure_ascii=False)) print(f"Wrote design-system.json ({len(design)} keys)") # Pages pages = extract_pages(sql_text, sql_prefix) (out_dir / "pages.json").write_text(json.dumps(pages, indent=2, ensure_ascii=False)) print(f"Wrote pages.json ({len(pages)} pages/posts)") # Site info summary site_info = { "domain": pkg.get("SiteURL", options.get("siteurl", "")), "name": options.get("blogname", ""), "tagline": options.get("blogdescription", ""), "admin_email": options.get("admin_email", ""), "wp_version": pkg.get("WordPress", {}).get("Version", ""), "divi_version": divi_version, "plugins": pkg.get("Plugins", []), "prefix": runtime_prefix, "total_pages": len([p for p in pages if p["post_type"] == "page"]), "total_posts": len([p for p in pages if p["post_type"] == "post"]), } (out_dir / "site-info.json").write_text(json.dumps(site_info, indent=2, ensure_ascii=False)) print(f"Wrote site-info.json") print(f"\nDone. Output in: {out_dir}") print(f" pages.json : {len(pages)} entries") print(f" design-system.json: {len(design)} keys") print(f" site-info.json : done") if __name__ == "__main__": main()