recent updates

This commit is contained in:
2026-06-09 18:31:59 +02:00
parent 398b94965c
commit 94f7a1f72a
42 changed files with 8686 additions and 0 deletions
@@ -0,0 +1,368 @@
#!/usr/bin/env python3
"""Analyze WordPress MySQL dump from a .wpress extract.
Parses database.sql and outputs:
- pages.json : all published pages with title, slug, content, SEO meta
- design-system.json : colors, fonts from wp_options (Divi theme settings)
- site-info.json : domain, WP version, detected Divi version, plugin list
Usage:
python3 analyze_db.py <extract_dir> <output_data_dir>
extract_dir : path to wpress-extract/ (contains database.sql)
output_data_dir : where to write JSON output files (e.g. .planning/data/)
"""
from __future__ import annotations
import json
import os
import re
import sys
from pathlib import Path
from typing import Any
# ---------------------------------------------------------------------------
# SQL parsing helpers
# ---------------------------------------------------------------------------
def _unescape_sql(s: str) -> str:
"""Undo MySQL string escaping."""
return (s
.replace("\\'", "'")
.replace('\\"', '"')
.replace("\\\\", "\\")
.replace("\\n", "\n")
.replace("\\r", "\r")
.replace("\\t", "\t")
.replace("\\0", "\0"))
def _parse_values_block(sql_block: str) -> list[list[str]]:
"""Extract rows from a multi-row INSERT VALUES block.
Handles commas inside quoted strings via a simple state machine.
Returns list of rows; each row is a list of raw string values.
"""
rows: list[list[str]] = []
# Find VALUES section
m = re.search(r"VALUES\s*", sql_block, re.IGNORECASE)
if not m:
return rows
rest = sql_block[m.end():]
i = 0
n = len(rest)
while i < n:
# Skip to '('
while i < n and rest[i] != '(':
i += 1
if i >= n:
break
i += 1 # skip '('
row: list[str] = []
field = []
in_quote = False
quote_char = ''
while i < n:
c = rest[i]
if not in_quote:
if c in ("'", '"'):
in_quote = True
quote_char = c
i += 1
continue
elif c == ',' :
row.append("".join(field))
field = []
i += 1
continue
elif c == ')':
row.append("".join(field))
field = []
rows.append(row)
i += 1
break
elif c == 'N' and rest[i:i+4] == 'NULL':
field.append('\x00NULL\x00')
i += 4
continue
else:
field.append(c)
i += 1
else:
if c == '\\' and i + 1 < n:
field.append(c)
field.append(rest[i + 1])
i += 2
continue
elif c == quote_char:
in_quote = False
i += 1
continue
else:
field.append(c)
i += 1
return rows
def load_table(sql_text: str, table_name: str) -> list[dict]:
"""Return all rows for table_name as list of dicts."""
# Find column definition
col_re = re.compile(
rf"CREATE TABLE `{re.escape(table_name)}`\s*\((.*?)\)\s*ENGINE",
re.DOTALL | re.IGNORECASE,
)
m = col_re.search(sql_text)
if not m:
return []
col_block = m.group(1)
cols = re.findall(r"`([^`]+)`\s+(?:bigint|int|mediumint|smallint|tinyint|varchar|text|mediumtext|longtext|char|datetime|date|float|double|decimal|enum|set|blob|mediumblob|longblob)", col_block, re.IGNORECASE)
# Find INSERT blocks for this table
insert_re = re.compile(
rf"INSERT INTO `{re.escape(table_name)}`\s+VALUES\s*\(.+?\);",
re.DOTALL | re.IGNORECASE,
)
rows_out: list[dict] = []
for block in insert_re.finditer(sql_text):
parsed = _parse_values_block(block.group(0))
for row in parsed:
d: dict[str, Any] = {}
for idx, col in enumerate(cols):
val = row[idx] if idx < len(row) else ""
if val == "\x00NULL\x00":
d[col] = None
else:
d[col] = _unescape_sql(val)
rows_out.append(d)
return rows_out
# ---------------------------------------------------------------------------
# Divi version detection
# ---------------------------------------------------------------------------
def detect_divi_version(sql_text: str) -> str:
    """Guess the Divi major version from markers found anywhere in the dump.

    Checks, in order: Divi 5 block comments (``wp:divi/``), Divi 4 shortcodes
    (``[et_pb_section``), then an ``et_theme_builder_api_version`` option row,
    which is also reported as ``"5"``.

    Returns:
        ``"5"``, ``"4"`` or ``"unknown"``.
    """
    for marker, version in (("wp:divi/", "5"), ("[et_pb_section", "4")):
        if marker in sql_text:
            return version
    # Last resort: the theme-builder API version option in the options table.
    api_row = re.search(r"'et_theme_builder_api_version','([^']+)'", sql_text)
    return "5" if api_row else "unknown"
# ---------------------------------------------------------------------------
# Options extraction
# ---------------------------------------------------------------------------
def load_options(sql_text: str, prefix: str = "wp_") -> dict[str, str]:
    """Return the ``<prefix>options`` table as an ``option_name -> option_value`` map.

    Rows whose ``option_name`` is missing or empty are skipped.
    """
    options: dict[str, str] = {}
    for record in load_table(sql_text, f"{prefix}options"):
        if record.get("option_name"):
            options[record["option_name"]] = record["option_value"]
    return options
def _parse_php_serialized_pairs(raw: str) -> dict[str, str]:
"""Extract key/value string pairs from a PHP-serialized array.
Handles both escaped (SQL-dump) and unescaped forms.
Only returns s->s pairs (string key, string value).
"""
result: dict[str, str] = {}
# SQL dumps escape double-quotes as \\", giving patterns like:
# s:9:\\"body_font\\";s:7:\\"DM Sans\\";
# Also handle unescaped form: s:9:"body_font";s:7:"DM Sans";
pat = re.compile(
r's:\d+:\\"([^"\\]+)\\";s:\d+:\\"([^"\\]*)\\"' # SQL-escaped
r'|s:\d+:"([^"]+)";s:\d+:"([^"]*)"', # plain
)
for m in pat.finditer(raw):
if m.group(1) is not None:
k, v = m.group(1), m.group(2)
else:
k, v = m.group(3), m.group(4)
result[k] = v
return result
def extract_design_system(options: dict[str, str]) -> dict:
    """Build the design-system dict from the options map.

    Reads the PHP-serialized ``et_divi`` option (falling back to
    ``et_divi_options``) and copies the Divi settings listed below under
    their design-system names. When two Divi keys map to the same name, the
    one listed first wins. ``site_url`` and ``site_name`` are always present.
    """
    divi_blob = options.get("et_divi", "") or options.get("et_divi_options", "")
    design: dict[str, Any] = {}

    if divi_blob:
        settings = _parse_php_serialized_pairs(divi_blob)
        # (Divi option key, design-system key), in priority order.
        divi_to_design = (
            ("accent_color", "primary_color_dark"),
            ("link_color", "primary_color"),
            ("body_font", "body_font"),
            ("heading_font", "heading_font"),
            ("header_font", "heading_font"),  # Divi 4 alias
            ("body_font_size", "body_font_size"),
            ("body_line_height", "body_line_height"),
            ("heading_font_weight", "heading_font_weight"),
            ("header_text_size", "heading_font_size"),
            ("header_line_height", "heading_line_height"),
            ("header_color", "heading_color"),
            ("font_color", "body_color"),
            ("secondary_accent_color", "secondary_color"),
        )
        for source_key, target_key in divi_to_design:
            if source_key in settings and target_key not in design:
                design[target_key] = settings[source_key]

    # Site info
    design["site_url"] = options.get("siteurl", "")
    design["site_name"] = options.get("blogname", "")
    return design
# ---------------------------------------------------------------------------
# Page extraction
# ---------------------------------------------------------------------------
def extract_pages(sql_text: str, prefix: str = "wp_") -> list[dict]:
    """Return all published pages, posts and events with their SEO meta.

    Args:
        sql_text: Full text of the SQL dump.
        prefix: Table prefix used in the dump.

    Returns:
        One dict per published row of type ``page``, ``post`` or ``event``,
        sorted by ``menu_order``. ``acf`` holds every remaining meta key that
        does not start with ``_``, ``rank_math`` or ``et_``.
    """
    posts = load_table(sql_text, f"{prefix}posts")
    postmeta = load_table(sql_text, f"{prefix}postmeta")

    # Build postmeta lookup: post_id -> {meta_key: meta_value}
    meta_map: dict[str, dict[str, str]] = {}
    for row in postmeta:
        meta_key = row.get("meta_key", "")
        if meta_key is None:
            # A NULL meta_key in the dump; nothing to file the value under.
            continue
        pid = str(row.get("post_id", ""))
        meta_map.setdefault(pid, {})[meta_key] = row.get("meta_value", "")

    def _menu_order(entry: dict) -> int:
        # Missing or non-numeric menu_order sorts as 0 instead of raising.
        try:
            return int(entry["menu_order"] or 0)
        except (TypeError, ValueError):
            return 0

    excluded_prefixes = ("_", "rank_math", "et_")
    pages = []
    for p in posts:
        if p.get("post_status") != "publish":
            continue
        post_type = p.get("post_type", "")
        if post_type not in ("page", "post", "event"):
            continue
        pid = str(p.get("ID", ""))
        meta = meta_map.get(pid, {})
        pages.append({
            "id": pid,
            "post_type": post_type,
            "slug": p.get("post_name", ""),
            "title": p.get("post_title", ""),
            "status": p.get("post_status", ""),
            # "or ''" guards against a NULL date, which would not be sliceable.
            "date": (p.get("post_date") or "")[:10],
            "modified": (p.get("post_modified") or "")[:10],
            "content_raw": p.get("post_content", ""),
            "excerpt": p.get("post_excerpt", ""),
            "parent_id": p.get("post_parent", "0"),
            "menu_order": p.get("menu_order", "0"),
            # Rank Math SEO fields
            "seo_title": meta.get("rank_math_title", ""),
            "seo_description": meta.get("rank_math_description", ""),
            "seo_keywords": meta.get("rank_math_focus_keyword", ""),
            "acf": {
                k: v for k, v in meta.items()
                if not k.startswith(excluded_prefixes)
            },
        })
    pages.sort(key=_menu_order)
    return pages
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def _write_json(path: Path, payload: Any) -> None:
    """Write *payload* to *path* as indented JSON, always encoded as UTF-8.

    The JSON is produced with ``ensure_ascii=False``, so the file encoding
    must not depend on the platform's default locale.
    """
    path.write_text(
        json.dumps(payload, indent=2, ensure_ascii=False), encoding="utf-8"
    )


def main():
    """Command-line entry point: analyse the dump and write the JSON outputs.

    Expects ``<extract_dir> <output_data_dir>`` in ``sys.argv``. Writes
    ``design-system.json``, ``pages.json`` and ``site-info.json`` into the
    output directory. Exits with status 1 on bad usage or when no ``.sql``
    file can be found.
    """
    if len(sys.argv) < 3:
        print(f"Usage: {sys.argv[0]} <extract_dir> <output_data_dir>")
        sys.exit(1)
    extract_dir = Path(sys.argv[1])
    out_dir = Path(sys.argv[2])
    out_dir.mkdir(parents=True, exist_ok=True)

    sql_file = extract_dir / "database.sql"
    if not sql_file.exists():
        # Search for it; sorted so the choice is the same on every run.
        found = sorted(extract_dir.rglob("*.sql"))
        if not found:
            print(f"ERROR: No .sql file found under {extract_dir}")
            sys.exit(1)
        sql_file = found[0]
        print(f"Found SQL at: {sql_file}")
    print(f"Loading {sql_file} ({sql_file.stat().st_size / 1024 / 1024:.1f} MB)...")
    sql_text = sql_file.read_text(encoding="utf-8", errors="replace")

    # Detect Divi version
    divi_version = detect_divi_version(sql_text)
    print(f"Divi version detected: {divi_version}")

    # Load wp_options
    pkg = {}
    pkg_file = extract_dir / "package.json"
    if pkg_file.exists():
        pkg = json.loads(pkg_file.read_text(encoding="utf-8"))
    # AIOIM dumps use SERVMASK_PREFIX_ as a placeholder in the SQL file.
    # Detect which prefix the dump actually uses.
    runtime_prefix = pkg.get("Database", {}).get("Prefix", "wp_")
    if "SERVMASK_PREFIX_" in sql_text:
        sql_prefix = "SERVMASK_PREFIX_"
    else:
        sql_prefix = runtime_prefix
    print(f"SQL prefix: {sql_prefix!r} (runtime prefix: {runtime_prefix!r})")
    options = load_options(sql_text, sql_prefix)
    print(f"Loaded {len(options)} options")

    # Design system
    design = extract_design_system(options)
    design["divi_version"] = divi_version
    design["wp_version"] = pkg.get("WordPress", {}).get("Version", "")
    design["plugins"] = pkg.get("Plugins", [])
    _write_json(out_dir / "design-system.json", design)
    print(f"Wrote design-system.json ({len(design)} keys)")

    # Pages
    pages = extract_pages(sql_text, sql_prefix)
    _write_json(out_dir / "pages.json", pages)
    print(f"Wrote pages.json ({len(pages)} pages/posts)")

    # Site info summary
    site_info = {
        "domain": pkg.get("SiteURL", options.get("siteurl", "")),
        "name": options.get("blogname", ""),
        "tagline": options.get("blogdescription", ""),
        "admin_email": options.get("admin_email", ""),
        "wp_version": pkg.get("WordPress", {}).get("Version", ""),
        "divi_version": divi_version,
        "plugins": pkg.get("Plugins", []),
        "prefix": runtime_prefix,
        "total_pages": sum(1 for p in pages if p["post_type"] == "page"),
        "total_posts": sum(1 for p in pages if p["post_type"] == "post"),
    }
    _write_json(out_dir / "site-info.json", site_info)
    print("Wrote site-info.json")
    print(f"\nDone. Output in: {out_dir}")
    print(f" pages.json : {len(pages)} entries")
    print(f" design-system.json: {len(design)} keys")
    print(" site-info.json : done")


if __name__ == "__main__":
    main()
@@ -0,0 +1,271 @@
#!/usr/bin/env python3
"""Extract content from Divi 5 block markup in pages.json.
Reads .planning/data/pages.json (produced by analyze_db.py) and for each page
parses the `content_raw` Divi 5 block structure into a clean per-page JSON
under .planning/data/content/{slug}.json.
Usage:
python3 extract_divi5.py <pages_json> <output_dir>
pages_json : path to .planning/data/pages.json
output_dir : directory to write {slug}.json files (created if missing)
"""
from __future__ import annotations
import json
import re
import sys
from pathlib import Path
from html.parser import HTMLParser
# ---------------------------------------------------------------------------
# HTML inner-text extractor
# ---------------------------------------------------------------------------
class _TextExtractor(HTMLParser):
def __init__(self):
super().__init__()
self.parts: list[str] = []
def handle_data(self, data: str):
self.parts.append(data)
def get_text(self) -> str:
return " ".join(self.parts).strip()
def _text(html: str) -> str:
p = _TextExtractor()
p.feed(html)
return p.get_text()
# ---------------------------------------------------------------------------
# Divi block parsing
# ---------------------------------------------------------------------------
# Matches an opening block comment: <!-- wp:divi/MODULE {JSON} -->
# group(1) is the block name ("divi/MODULE"); group(2) is everything between
# the name and the end of the comment, i.e. the JSON attrs (possibly empty)
# and, for a self-closing block written as "... /-->", a trailing "/".
# The "--?>" tail also accepts a comment that ends in "->".
_BLOCK_OPEN = re.compile(r"<!--\s*wp:(divi/[a-z0-9_-]+)\s*(.*?)--?>", re.DOTALL)
# Matches a closing block comment: <!-- /wp:divi/MODULE -->
# group(1) is the block name.
_BLOCK_CLOSE = re.compile(r"<!--\s*/wp:(divi/[a-z0-9_-]+)\s*-->")
# Builder-generated tokens: et_pb_*, divi-*-* and d5_* (case-insensitive).
# The pattern matches the token wherever it appears, not only inside class="".
_ET_CLASS = re.compile(r"\b(et_pb_[a-z0-9_-]+|divi-[a-z0-9_-]+-[a-z0-9_-]+|d5_[a-z0-9_-]+)\b", re.IGNORECASE)
# Builder data attributes with a double-quoted value, including the leading
# whitespace: data-et-*, data-builder-*, data-module-id-*, data-module-class-*,
# data-d5-*.
# NOTE(review): a "-suffix" is required after the listed prefix, so a bare
# data-module-id="..." is not matched; confirm that is intended.
_ET_ATTR = re.compile(r'\s+data-(?:et|builder|module-id|module-class|d5)-[a-z0-9_-]+\s*=\s*"[^"]*"', re.IGNORECASE)
# A class attribute whose value is empty or whitespace-only.
_EMPTY_CL = re.compile(r'\s+class="\s*"')
def _clean(html: str) -> str:
    """Remove Divi-specific noise from an HTML fragment.

    Drops block comments, builder data attributes, builder class tokens and
    any class attribute left empty, in that order. Runs of three or more
    newlines are then collapsed to two and the result is stripped.
    """
    cleaned = html
    for noise in (_BLOCK_OPEN, _BLOCK_CLOSE, _ET_ATTR, _ET_CLASS, _EMPTY_CL):
        cleaned = noise.sub("", cleaned)
    return re.sub(r"\n{3,}", "\n\n", cleaned).strip()
def _parse_attrs(raw_json: str) -> dict:
"""Parse the JSON attrs blob from a block comment (may be empty)."""
raw_json = raw_json.strip()
if not raw_json:
return {}
try:
return json.loads(raw_json)
except Exception:
return {}
def _extract_inner(content: str, block_type: str) -> str:
"""Return the raw inner HTML of the first matching block."""
open_pat = re.compile(rf"<!--\s*wp:{re.escape(block_type)}[^>]*-->", re.DOTALL)
close_pat = re.compile(rf"<!--\s*/wp:{re.escape(block_type)}\s*-->")
m = open_pat.search(content)
if not m:
return ""
start = m.end()
m2 = close_pat.search(content, start)
end = m2.start() if m2 else len(content)
return content[start:end]
def _bg_color(attrs: dict) -> str:
"""Extract background colour from Divi 5 attrs dict."""
bg = attrs.get("backgroundColor", {})
if isinstance(bg, dict):
return bg.get("value", bg.get("color", ""))
return str(bg) if bg else ""
def _section_type(bg: str) -> str:
"""Classify section by background colour."""
dark_colors = {"#0f5f53", "#1a3a34", "#0d4d42"}
brand_colors = {"#1a8a7a", "#20a090"}
light_colors = {"#f5f5f5", "#fafafa", "#f0f0f0", "#efefef"}
bg_lower = bg.lower().strip()
if bg_lower in dark_colors:
return "dark"
if bg_lower in brand_colors:
return "brand"
if bg_lower in light_colors:
return "light"
if bg_lower in ("#ffffff", "#fff", ""):
return "white"
return "custom"
# ---------------------------------------------------------------------------
# Section/module extraction
# ---------------------------------------------------------------------------
def _extract_modules(section_html: str) -> list[dict]:
    """Walk the block comments inside a section and extract module data.

    Every opening block comment yields one module dict, in document order
    (container blocks such as rows and columns are reported too). A block's
    inner HTML runs up to the next closing comment of the same type; a
    self-closing block (``<!-- wp:divi/x {...} /-->``) has no inner HTML.

    Args:
        section_html: Raw markup between a section's opening and closing
            comments.

    Returns:
        A list of dicts, each with a ``module`` key (the block name without
        the ``divi/`` prefix) plus fields that depend on the module type.
    """
    modules: list[dict] = []
    close_patterns: dict[str, re.Pattern] = {}  # one compiled regex per type
    content = section_html
    for m in _BLOCK_OPEN.finditer(content):
        block_type = m.group(1)  # e.g. "divi/text"
        raw_attrs = m.group(2).rstrip()
        self_closing = raw_attrs.endswith("/")
        if self_closing:
            # Drop the marker so the remainder is plain JSON.
            raw_attrs = raw_attrs[:-1]
        attrs = _parse_attrs(raw_attrs)

        if self_closing:
            # No closing comment exists; searching for one would swallow
            # the following blocks.
            inner_html = ""
        else:
            close_pat = close_patterns.get(block_type)
            if close_pat is None:
                close_pat = re.compile(
                    rf"<!--\s*/wp:{re.escape(block_type)}\s*-->"
                )
                close_patterns[block_type] = close_pat
            inner_start = m.end()
            close_m = close_pat.search(content, inner_start)
            inner_end = close_m.start() if close_m else len(content)
            inner_html = content[inner_start:inner_end]

        clean_inner = _clean(inner_html)
        module_type = block_type.split("/")[-1]  # "text", "button", "image", etc.
        mod: dict = {"module": module_type}
        if module_type == "text":
            mod["html"] = clean_inner
            mod["text"] = _text(clean_inner)
        elif module_type in ("button", "cta"):
            mod["text"] = attrs.get("buttonText", _text(clean_inner))
            mod["url"] = attrs.get("buttonUrl", attrs.get("url", "#"))
        elif module_type == "image":
            mod["src"] = attrs.get("src", attrs.get("url", ""))
            mod["alt"] = attrs.get("altText", attrs.get("alt", ""))
            mod["caption"] = attrs.get("caption", "")
        elif module_type == "blurb":
            mod["title"] = attrs.get("title", "")
            mod["icon"] = attrs.get("iconName", "")
            mod["html"] = clean_inner
            mod["text"] = _text(clean_inner)
        elif module_type == "testimonial":
            mod["quote"] = attrs.get("content", _text(clean_inner))
            mod["author"] = attrs.get("authorName", "")
            mod["company"] = attrs.get("authorJobTitle", "")
        elif module_type == "video":
            mod["src"] = attrs.get("src", "")
            mod["poster"] = attrs.get("poster", attrs.get("image", ""))
        elif module_type in ("accordion", "toggle"):
            items = re.findall(
                r"<dt[^>]*>(.*?)</dt>\s*<dd[^>]*>(.*?)</dd>", clean_inner, re.DOTALL
            )
            mod["items"] = [{"q": q.strip(), "a": a.strip()} for q, a in items]
        elif module_type == "contact_form":
            mod["form_id"] = attrs.get("formId", "")
            mod["note"] = "REPLACE with AM vanilla form — see 08-forms.md"
        else:
            mod["html"] = clean_inner
            mod["attrs"] = attrs
        modules.append(mod)
    return modules
def parse_page_content(content_raw: str) -> list[dict]:
    """Split Divi 5 block markup into one dict per section.

    Each section dict carries its ``role``, ``section_type`` (from the
    background colour), ``background_color``, raw ``attrs`` and extracted
    ``modules``. The role is ``"hero"`` when the first module is a
    ``fullwidth_header`` or ``text`` module whose HTML contains ``<h1``, and
    ``"content"`` otherwise.
    """
    opener = re.compile(r"<!--\s*wp:divi/section(.*?)-->", re.DOTALL)
    closer = re.compile(r"<!--\s*/wp:divi/section\s*-->")
    total = len(content_raw)

    sections: list[dict] = []
    for opened in opener.finditer(content_raw):
        attrs = _parse_attrs(opened.group(1).strip())
        body_start = opened.end()
        closed = closer.search(content_raw, body_start)
        body_end = closed.start() if closed else total
        background = _bg_color(attrs)
        kind = _section_type(background)
        modules = _extract_modules(content_raw[body_start:body_end])

        # Determine semantic role from first module
        lead = modules[0] if modules else None
        is_hero = (
            lead is not None
            and lead["module"] in ("fullwidth_header", "text")
            and "<h1" in lead.get("html", "")
        )
        sections.append({
            "role": "hero" if is_hero else "content",
            "section_type": kind,
            "background_color": background,
            "attrs": attrs,
            "modules": modules,
        })
    return sections
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    """Command-line entry point: write one ``{slug}.json`` per page.

    Expects ``<pages_json> <output_dir>`` in ``sys.argv``. Pages without a
    slug are written as ``page-<id>.json``. Exits with status 1 on bad usage.
    """
    if len(sys.argv) < 3:
        print(f"Usage: {sys.argv[0]} <pages_json> <output_dir>")
        sys.exit(1)
    pages_path = Path(sys.argv[1])
    out_dir = Path(sys.argv[2])
    out_dir.mkdir(parents=True, exist_ok=True)
    pages = json.loads(pages_path.read_text(encoding="utf-8"))
    print(f"Processing {len(pages)} pages...")
    for page in pages:
        slug = page.get("slug") or f"page-{page['id']}"
        content = page.get("content_raw", "")
        sections = parse_page_content(content) if content.strip() else []
        output = {
            "id": page["id"],
            "slug": slug,
            "title": page["title"],
            "post_type": page["post_type"],
            "seo_title": page.get("seo_title", ""),
            "seo_description": page.get("seo_description", ""),
            "seo_keywords": page.get("seo_keywords", ""),
            "acf": page.get("acf", {}),
            "date": page.get("date", ""),
            "modified": page.get("modified", ""),
            "sections": sections,
            "section_count": len(sections),
        }
        out_file = out_dir / f"{slug}.json"
        # The JSON keeps non-ASCII characters, so write it as UTF-8 explicitly
        # (matching how pages.json is read above) instead of relying on the
        # platform default encoding.
        out_file.write_text(
            json.dumps(output, indent=2, ensure_ascii=False), encoding="utf-8"
        )
        print(f" {slug}.json ({len(sections)} sections)")
    print(f"\nDone. {len(pages)} content files in {out_dir}")


if __name__ == "__main__":
    main()
@@ -0,0 +1,99 @@
#!/usr/bin/env python3
"""
extract_nav.py — Extract WordPress navigation menus from database.sql dump.
Outputs nav.json: [{label, href, display_order, is_cta}]
Usage: python3 extract_nav.py <wpress-extract-dir> <output-data-dir>
"""
import sys, re, json, os
CTA_KEYWORDS = {'book', 'get started', 'contact', 'sign up', 'register', 'join', 'buy', 'shop'}

# Column order of the stock WordPress tables, used when the dump carries no
# CREATE TABLE statement (and the INSERT names no columns) to read them from.
_DEFAULT_COLUMNS = {
    'posts': [
        'ID', 'post_author', 'post_date', 'post_date_gmt', 'post_content',
        'post_title', 'post_excerpt', 'post_status', 'comment_status',
        'ping_status', 'post_password', 'post_name', 'to_ping', 'pinged',
        'post_modified', 'post_modified_gmt', 'post_content_filtered',
        'post_parent', 'guid', 'menu_order', 'post_type', 'post_mime_type',
        'comment_count',
    ],
    'postmeta': ['meta_id', 'post_id', 'meta_key', 'meta_value'],
    'options': ['option_id', 'option_name', 'option_value', 'autoload'],
}

# MySQL escape letters that decode to something other than the letter itself.
_SQL_ESCAPES = {
    '0': '\0', 'n': '\n', 'r': '\r', 't': '\t', 'b': '\b', 'Z': '\x1a',
    '%': '\\%', '_': '\\_',
}

# A run of characters inside a quoted string that needs no special handling.
_PLAIN_RUN_RE = re.compile(r"[^'\\]+")


def _parse_rows(sql, pos):
    """Parse the ``(v, ...),(v, ...);`` tuples that start at or after *pos*.

    Returns ``(rows, end)``: *rows* is a list of value lists (decoded strings,
    ``None`` for an unquoted NULL) and *end* is the index just past the
    terminating ``;`` (or ``len(sql)`` if there is none).
    """
    rows = []
    n = len(sql)
    while pos < n:
        ch = sql[pos]
        if ch == ';':
            return rows, pos + 1
        pos += 1
        if ch != '(':
            continue
        row, buf, quoted = [], [], False
        while pos < n:
            ch = sql[pos]
            if ch == "'":
                quoted = True
                pos += 1
                while pos < n:
                    plain = _PLAIN_RUN_RE.match(sql, pos)
                    if plain:
                        buf.append(plain.group())
                        pos = plain.end()
                        continue
                    ch = sql[pos]
                    if ch == '\\' and pos + 1 < n:
                        nxt = sql[pos + 1]
                        buf.append(_SQL_ESCAPES.get(nxt, nxt))
                        pos += 2
                    elif ch == "'":
                        if sql[pos + 1:pos + 2] == "'":
                            buf.append("'")  # doubled quote
                            pos += 2
                        else:
                            pos += 1
                            break
                    else:  # lone backslash at end of input
                        buf.append(ch)
                        pos += 1
            elif ch in ',)':
                text = ''.join(buf)
                is_null = not quoted and text.upper() == 'NULL'
                row.append(None if is_null else text)
                buf, quoted = [], False
                pos += 1
                if ch == ')':
                    rows.append(row)
                    break
            elif ch.isspace():
                pos += 1
            else:
                buf.append(ch)
                pos += 1
    return rows, pos


def _table_rows(sql, table, fallback_columns):
    """Yield every row inserted into *table* as a ``{column: value}`` dict."""
    name = re.escape(table)
    columns = fallback_columns
    created = re.search(
        r"CREATE TABLE (?:IF NOT EXISTS )?`%s`\s*\((.*?)\)\s*ENGINE" % name,
        sql, re.DOTALL | re.IGNORECASE,
    )
    if created:
        # A column definition is a back-quoted name at the start of the list
        # or right after a comma, followed by its type.
        declared = re.findall(r"(?:\A|,)\s*`([^`]+)`\s+\w", created.group(1))
        if declared:
            columns = declared
    insert_re = re.compile(
        r"INSERT INTO `%s`\s*(\([^)]*\)\s*)?VALUES" % name, re.IGNORECASE
    )
    pos = 0
    while True:
        found = insert_re.search(sql, pos)
        if not found:
            return
        names = re.findall(r"`([^`]+)`", found.group(1)) if found.group(1) else columns
        rows, pos = _parse_rows(sql, found.end())
        for row in rows:
            yield dict(zip(names, row))


def _to_int(value):
    """Return *value* as an int, or 0 when it is missing or not numeric."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return 0


def _bare_host(host):
    """Lower-case *host* and drop a leading ``www.``."""
    host = host.lower()
    return host[4:] if host.startswith('www.') else host


def _relative_href(href, site_host):
    """Make *href* site-relative when it points at the site itself.

    Absolute URLs on another host are kept as they are when *site_host* is
    known; when it is unknown every absolute URL is made relative. ``mailto:``
    and ``tel:`` links are returned unchanged. Anything else gets a leading
    ``/`` (an empty href becomes ``/``).
    """
    absolute = re.match(r'https?://([^/?#]+)', href, re.IGNORECASE)
    if absolute:
        if site_host and _bare_host(absolute.group(1)) != _bare_host(site_host):
            return href
        href = href[absolute.end():]
    elif href.lower().startswith(('mailto:', 'tel:')):
        return href
    return href if href.startswith('/') else '/' + href


def extract_nav(extract_dir: str, data_dir: str):
    """Extract published navigation menu items and write ``nav.json``.

    Reads ``<extract_dir>/database.sql``, collects every published
    ``nav_menu_item`` post, resolves its label and link, and writes
    ``<data_dir>/nav.json`` as a list of
    ``{label, href, display_order, is_cta}`` dicts sorted by menu order.
    Items from all menus end up in the same list. Exits with status 1 when
    the dump is missing.

    Label and link resolution:
      * label: the item's own ``post_title``, else the title of the post it
        links to (``_menu_item_object_id``);
      * href: ``_menu_item_url`` when set; else, for ``post_type`` items,
        ``/<slug>/`` of the linked post. This assumes flat pretty permalinks,
        so nested pages need review. Otherwise ``/``.
    """
    sql_path = os.path.join(extract_dir, 'database.sql')
    if not os.path.exists(sql_path):
        print(f"ERROR: {sql_path} not found", file=sys.stderr)
        sys.exit(1)
    with open(sql_path, encoding='utf-8', errors='replace') as f:
        sql = f.read()

    # Detect table prefix
    prefix_match = re.search(r"INSERT INTO `(\w+)options`", sql)
    prefix = prefix_match.group(1) if prefix_match else 'wp_'

    # Site host, used to tell internal links from external ones.
    site_host = ''
    for row in _table_rows(sql, prefix + 'options', _DEFAULT_COLUMNS['options']):
        if row.get('option_name') in ('home', 'siteurl'):
            host_match = re.match(r'https?://([^/?#]+)', row.get('option_value') or '')
            if host_match:
                site_host = host_match.group(1)
                break

    # Menu items, plus title/slug of every other post so that items linking
    # to a page can borrow its label and slug.
    nav_posts = {}
    linked = {}
    for row in _table_rows(sql, prefix + 'posts', _DEFAULT_COLUMNS['posts']):
        post_id = row.get('ID')
        if post_id is None:
            continue
        if row.get('post_type') != 'nav_menu_item':
            linked[post_id] = (row.get('post_title') or '', row.get('post_name') or '')
        elif row.get('post_status') == 'publish':
            nav_posts[post_id] = {
                'label': row.get('post_title') or '',
                'href': '',
                'menu_order': _to_int(row.get('menu_order')),
            }

    for row in _table_rows(sql, prefix + 'postmeta', _DEFAULT_COLUMNS['postmeta']):
        item = nav_posts.get(row.get('post_id'))
        if item is None:
            continue
        key = row.get('meta_key')
        value = row.get('meta_value') or ''
        if key == '_menu_item_url' and value:
            item['href'] = value
        elif key == '_menu_item_object_id':
            item['object_id'] = value
        elif key == '_menu_item_type':
            item['type'] = value
        elif key == '_menu_item_menu_order' and value.isdigit():
            item['menu_order'] = int(value)

    items = []
    ordered = sorted(nav_posts.values(), key=lambda entry: entry['menu_order'])
    for idx, item in enumerate(ordered):
        linked_title, linked_slug = linked.get(item.get('object_id'), ('', ''))
        label = (item['label'] or linked_title).strip()
        if not label:
            continue
        href = item['href'].strip()
        if not href and item.get('type') == 'post_type' and linked_slug:
            href = '/%s/' % linked_slug
        href = _relative_href(href, site_host)
        is_cta = 1 if any(kw in label.lower() for kw in CTA_KEYWORDS) else 0
        items.append({
            'label': label,
            'href': href,
            'display_order': idx + 1,
            'is_cta': is_cta
        })

    os.makedirs(data_dir, exist_ok=True)
    out_path = os.path.join(data_dir, 'nav.json')
    with open(out_path, 'w', encoding='utf-8') as f:
        json.dump(items, f, indent=2, ensure_ascii=False)
    print(f"nav.json: {len(items)} items → {out_path}")
    for item in items:
        print(f" {'[CTA]' if item['is_cta'] else ' '} {item['label']} → {item['href']}")


if __name__ == '__main__':
    if len(sys.argv) != 3:
        print("Usage: python3 extract_nav.py <wpress-extract-dir> <output-data-dir>")
        sys.exit(1)
    extract_nav(sys.argv[1], sys.argv[2])
@@ -0,0 +1,110 @@
#!/usr/bin/env python3
"""Extract All-in-One WP Migration .wpress archive.
Usage:
python3 extract_wpress.py <path/to/file.wpress> <output/directory>
The .wpress format is a sequential binary archive with 4377-byte headers:
255 bytes filename (null-padded)
14 bytes file size in bytes (ASCII digits, null-padded)
12 bytes mtime unix timestamp (ASCII digits, null-padded)
4096 bytes relative path (null-padded)
Followed immediately by the raw file bytes, then the next header.
"""
import os
import sys
import argparse
from pathlib import Path
# Layout of one .wpress entry header. The four fixed-width fields appear in
# the order NAME, SIZE, MTIME, PATH, and their lengths add up to HEADER_SIZE
# (255 + 14 + 12 + 4096 = 4377 bytes).
HEADER_SIZE = 4377
NAME_LEN = 255  # file name, null-padded
SIZE_LEN = 14  # file size in bytes, ASCII digits, null-padded
MTIME_LEN = 12  # modification time (unix timestamp), ASCII digits, null-padded
PATH_LEN = 4096  # relative directory path, null-padded
def _parse_int(b: bytes) -> int:
s = b.split(b"\x00", 1)[0].decode(errors="replace").strip()
return int(s) if s else 0
def _parse_str(b: bytes) -> str:
return b.split(b"\x00", 1)[0].decode(errors="replace")
def extract(wpress_path: str, out_dir: str, verbose: bool = True) -> dict:
    """Extract every entry of a .wpress archive below *out_dir*.

    Args:
        wpress_path: Path to the archive.
        out_dir: Destination directory (created if missing).
        verbose: Print a progress line every 200 files.

    Returns:
        A summary dict with ``files``, ``bytes`` (bytes actually written),
        ``mb``, ``skipped`` and ``out_dir``.

    Entries are skipped (and counted in ``skipped``) when they have no file
    name or when their path/name would resolve to a location outside
    *out_dir*, so an archive containing ``../`` components cannot write
    elsewhere on disk. A file cut short by a truncated archive is reported on
    stderr.
    """
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    root = out.resolve()
    count = 0
    total_bytes = 0
    skipped = 0
    with open(wpress_path, "rb") as f:
        while True:
            header = f.read(HEADER_SIZE)
            # A short read means end of file; an all-zero header is the
            # end-of-archive marker.
            if len(header) < HEADER_SIZE or header == b"\x00" * HEADER_SIZE:
                break
            size_end = NAME_LEN + SIZE_LEN
            mtime_end = size_end + MTIME_LEN
            name = _parse_str(header[0:NAME_LEN])
            size = _parse_int(header[NAME_LEN:size_end])
            mtime = _parse_int(header[size_end:mtime_end])
            path = _parse_str(header[mtime_end:mtime_end + PATH_LEN])

            # Treat the stored path as relative, then verify the normalised
            # target really is inside the output directory.
            dest_file = (root / path.lstrip("/\\") / name).resolve()
            if not name or root not in dest_file.parents:
                skipped += 1
                f.seek(size, 1)
                continue

            dest_file.parent.mkdir(parents=True, exist_ok=True)
            written = 0
            with open(dest_file, "wb") as o:
                while written < size:
                    chunk = f.read(min(65536, size - written))
                    if not chunk:
                        break
                    o.write(chunk)
                    written += len(chunk)
            if written < size:
                print(
                    f"WARNING: archive truncated; {dest_file} has {written} of {size} bytes",
                    file=sys.stderr,
                )
            try:
                if mtime > 0:
                    os.utime(dest_file, (mtime, mtime))
            except (OSError, OverflowError, ValueError):
                # Restoring the timestamp is best-effort only.
                pass
            count += 1
            total_bytes += written
            if verbose and count % 200 == 0:
                print(f" [{count} files | {total_bytes / 1024 / 1024:.1f} MB extracted]", flush=True)
    result = {
        "files": count,
        "bytes": total_bytes,
        "mb": round(total_bytes / 1024 / 1024, 1),
        "skipped": skipped,
        "out_dir": str(out),
    }
    print(f"DONE: {count} files | {result['mb']} MB -> {out_dir} (skipped {skipped})")
    return result
def main():
    """Parse the command line and extract the given archive."""
    parser = argparse.ArgumentParser(description="Extract .wpress archive")
    parser.add_argument("wpress", help="Path to .wpress file")
    parser.add_argument("outdir", help="Destination directory")
    parser.add_argument("-q", "--quiet", action="store_true", help="Suppress progress output")
    options = parser.parse_args()
    show_progress = not options.quiet
    extract(options.wpress, options.outdir, verbose=show_progress)


if __name__ == "__main__":
    main()
@@ -0,0 +1,149 @@
#!/usr/bin/env python3
"""
migrate.py — AM Stack A migration launcher.
Points at a .wpress file and runs all extraction phases automatically.
Phases 7+ require human/agent review of staged seed_databases.py.
Usage:
python3 migrate.py --wpress /path/to/backup.wpress --domain example.com [--project /path/to/project]
Output:
Runs phases 0-6, then prints agent breadcrumbs for phases 7-11.
"""
import argparse, os, sys, subprocess, json
# SOPS is the directory two levels up from this file (the parent of the
# directory that contains it); the helper scripts are looked up in
# SOPS/scripts.
SOPS = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
SCRIPTS = os.path.join(SOPS, 'scripts')
def run(cmd: list, label: str) -> bool:
    """Run *cmd* as a subprocess, echoing its status under *label*.

    The child's output is not captured. Returns ``True`` when the command
    exits with status 0 and ``False`` otherwise.
    """
    print(f"\n[{label}] Running: {' '.join(cmd)}")
    exit_code = subprocess.run(cmd, capture_output=False).returncode
    succeeded = exit_code == 0
    if succeeded:
        print(f"[{label}] OK")
    else:
        print(f"[{label}] FAILED (exit {exit_code})")
    return succeeded
def phase_header(n: int, title: str):
    """Print a banner announcing phase *n*.

    The number and title are separated by ``": "``; without a separator the
    banner reads e.g. "Phase 0Setup".
    """
    rule = '=' * 60
    print(f"\n{rule}")
    print(f" Phase {n}: {title}")
    print(rule)
def main():
    """Run the automated extraction phases, then print the manual next steps.

    Parses ``--wpress``, ``--domain`` and optional ``--project``, creates the
    project directory layout, and runs the helper scripts in order. Phases 1
    and 2 abort the pipeline on failure; later phases only print a warning.
    """
    parser = argparse.ArgumentParser(description='AM Stack A migration launcher')
    parser.add_argument('--wpress', required=True, help='Path to .wpress backup file')
    parser.add_argument('--domain', required=True, help='Target domain (e.g. example.com)')
    parser.add_argument('--project', help='Project directory (default: ~/arisingmedia-websites/{domain})')
    args = parser.parse_args()
    wpress = os.path.abspath(args.wpress)
    domain = args.domain
    project = args.project or os.path.expanduser(f'~/arisingmedia-websites/{domain}')
    extract_dir = os.path.join(project, '.planning', 'wpress-extract')
    data_dir = os.path.join(project, '.planning', 'data')
    content_dir = os.path.join(data_dir, 'content')
    # Run the helpers with the interpreter that is running this script, so
    # they see the same Python version and environment.
    python = sys.executable or 'python3'

    if not os.path.exists(wpress):
        print(f"ERROR: .wpress file not found: {wpress}")
        sys.exit(1)
    print("\nAM Stack A Migration Pipeline")
    print(f" Domain: {domain}")
    print(f" Project: {project}")
    print(f" Archive: {wpress}")

    # Phase 0 — Setup
    phase_header(0, 'Setup')
    for d in [extract_dir, data_dir, content_dir,
              os.path.join(project, 'assets', 'images'),
              os.path.join(project, 'build'),
              os.path.join(project, 'src', 'api', 'data'),
              os.path.join(project, 'src', 'api', 'templates'),
              os.path.join(project, 'src', 'api', 'components')]:
        os.makedirs(d, exist_ok=True)
        print(f" mkdir {d}")

    # Phase 1 — Extract
    phase_header(1, 'Extract .wpress archive')
    if not run([python, os.path.join(SCRIPTS, 'extract_wpress.py'), wpress, extract_dir], 'Phase 1'):
        sys.exit(1)

    # Phase 2 — DB Analysis
    phase_header(2, 'Database analysis')
    if not run([python, os.path.join(SCRIPTS, 'analyze_db.py'), extract_dir, data_dir], 'Phase 2'):
        sys.exit(1)

    # Detect Divi version
    site_info_path = os.path.join(data_dir, 'site-info.json')
    divi_version = '5'
    if os.path.exists(site_info_path):
        with open(site_info_path, encoding='utf-8') as f:
            info = json.load(f)
        divi_version = str(info.get('divi_version', 5))
    print(f" Divi version detected: {divi_version}")

    # Phase 3 — Nav extraction
    phase_header(3, 'Extract navigation menus')
    if not run([python, os.path.join(SCRIPTS, 'extract_nav.py'), extract_dir, data_dir], 'Phase 3 (nav)'):
        print(f" WARNING: nav extraction had errors — review {data_dir}")

    # Phase 3 — Content extraction
    extract_script = f'extract_divi{divi_version}.py'
    if not os.path.exists(os.path.join(SCRIPTS, extract_script)):
        # The reported version may be "unknown" (no Divi markers found), or
        # there may be no extractor for it. Fall back to the Divi 5 extractor
        # so the per-page JSON files are still produced.
        print(f" WARNING: {extract_script} not found — falling back to extract_divi5.py")
        extract_script = 'extract_divi5.py'
        divi_version = '5'
    pages_json = os.path.join(data_dir, 'pages.json')
    if not run([python, os.path.join(SCRIPTS, extract_script), pages_json, content_dir], f'Phase 3 (divi{divi_version})'):
        print(f" WARNING: content extraction had errors — review {content_dir}")

    # Phase 5 — Media
    phase_header(5, 'Extract and convert media')
    run([python, os.path.join(SCRIPTS, 'extract_media.py'), extract_dir, data_dir,
         os.path.join(project, 'assets', 'images')], 'Phase 5')

    # Phase 6 — Stage seed_databases.py
    phase_header(6, 'Stage seed_databases.py skeleton')
    seed_path = os.path.join(project, 'build', 'seed_databases.py')
    # Check if stage_seed.py exists
    stage_script = os.path.join(SCRIPTS, 'stage_seed.py')
    if os.path.exists(stage_script):
        run([python, stage_script, data_dir, seed_path, '--domain', domain], 'Phase 6')
    else:
        print(" WARNING: stage_seed.py not found — seed_databases.py must be written manually")
        print(" Reference: /home/sirdrez/arisingmedia-websites/vibrantyou.yoga/build/seed_databases.py")

    # Print agent breadcrumbs for remaining phases
    print(f"\n{'='*60}")
    print(" EXTRACTION COMPLETE — Manual/Agent phases follow")
    print(f"{'='*60}")
    print(f"""
Phases 0-6 complete. Staged content is at:
{data_dir}/content/ ← extracted page sections (JSON)
{data_dir}/nav.json ← navigation items
{data_dir}/media-manifest.json ← image URL mappings
{seed_path} ← seed_databases.py skeleton
Next steps (see 10-agent-breadcrumbs.md for full detail):
Phase 7 — REVIEW seed_databases.py
Open: {seed_path}
For each page: verify sections_json has correct section types
Replace em-dashes. Remove Divi shortcode residue. Review nav items.
Phase 8 — RUN seed_databases.py
cd {project} && python3 build/seed_databases.py
Verify: output shows all counts > 0
Phase 9 — SCAFFOLD PHP templates
Copy from reference: vibrantyou.yoga/src/api/
Update brand name and colors in _header.php + _footer.php
Phase 10 — BUILD
cd {project} && docker compose build --no-cache && docker compose up -d
Verify: curl -I http://localhost:PORT/
Phase 11 — QA
bash {SOPS}/../tools/verify-protection.sh http://localhost:PORT
Lighthouse in Firefox
Reference: {SOPS}/wp-divi-pipeline-to-am-stack/10-agent-breadcrumbs.md
""")


if __name__ == '__main__':
    main()
@@ -0,0 +1,175 @@
#!/usr/bin/env bash
# run_pipeline.sh — AM WP+Divi to HTML pipeline master script
# Usage: bash run_pipeline.sh <domain>
# Example: bash run_pipeline.sh vibrantyou.yoga
set -euo pipefail

# The only argument is the site domain; it selects the project directory.
DOMAIN="${1:-}"
if [ -z "$DOMAIN" ]; then
  echo "Usage: $0 <domain>"
  echo " Example: $0 vibrantyou.yoga"
  exit 1
fi

PROJECT="/home/sirdrez/arisingmedia-websites/$DOMAIN"
SOPS="/home/sirdrez/arisingmedia-websites/.am-webdesign-sops"
SCRIPTS="$SOPS/wp-divi-pipeline/scripts"

# Pick the first .wpress archive (glob order is sorted, like `ls`).
# A plain glob loop is used instead of `$(ls … | head -1)`: with
# `set -e -o pipefail` a failing `ls` (no archive present) would terminate the
# script right at the assignment, before the error message below is printed.
WPRESS=""
for candidate in "$PROJECT/.planning/"*.wpress; do
  if [ -f "$candidate" ]; then
    WPRESS="$candidate"
    break
  fi
done
if [ -z "$WPRESS" ]; then
  echo "ERROR: No .wpress file found in $PROJECT/.planning/"
  exit 1
fi

echo "================================================"
echo " AM WP+Divi Pipeline"
echo " Domain: $DOMAIN"
# Quote the path so archive names containing spaces are reported intact.
echo " Archive: $(basename "$WPRESS")"
echo "================================================"
echo ""
# ---------------------------------------------------------------------------
# Phase 0 — Directory structure
# ---------------------------------------------------------------------------
echo "[Phase 0] Creating directory structure..."
mkdir -p "$PROJECT"/{src/{about,services,contact,blog,classes,components,assets/{css,js,images,svg,fonts}},build,infra,api}
# NOTE: a brace group with a single element ("{content}") is NOT expanded by
# bash and would create a directory literally named "{content}"; spell the
# path out so Phase 3 finds .planning/data/content/.
mkdir -p "$PROJECT/.planning"/{data/content,scripts,wpress-extract}
echo " OK: directories created"
echo ""
# ---------------------------------------------------------------------------
# Phase 1 — Extract .wpress archive
# ---------------------------------------------------------------------------
# database.sql acts as the "already extracted" marker: when it is present the
# extraction step is not repeated.
EXTRACT_DIR="$PROJECT/.planning/wpress-extract"
if [ ! -f "$EXTRACT_DIR/database.sql" ]; then
  echo "[Phase 1] Extracting archive (this may take a few minutes)..."
  python3 "$SCRIPTS/extract_wpress.py" "$WPRESS" "$EXTRACT_DIR"
  echo " OK: extraction complete"
else
  echo "[Phase 1] Archive already extracted — skipping"
  echo " Found: $EXTRACT_DIR/database.sql"
fi
echo ""
# ---------------------------------------------------------------------------
# Phase 2 — Database analysis
# ---------------------------------------------------------------------------
DATA_DIR="$PROJECT/.planning/data"
echo "[Phase 2] Analyzing database..."
python3 "$SCRIPTS/analyze_db.py" "$EXTRACT_DIR" "$DATA_DIR"
# Count the entries in pages.json. The path is handed over as an argument
# rather than pasted into the Python source, so quotes or backslashes in the
# path cannot break the snippet. Any failure falls back to a count of 0.
PAGE_COUNT=$(python3 -c 'import json, sys; print(len(json.load(open(sys.argv[1], encoding="utf-8"))))' "$DATA_DIR/pages.json" 2>/dev/null || echo 0)
echo " OK: $PAGE_COUNT pages extracted"
echo ""
# ---------------------------------------------------------------------------
# Phase 3 — Content extraction (Divi 5)
# ---------------------------------------------------------------------------
# extract_divi5.py receives the page list and the output directory for the
# per-page content JSON files.
printf '%s\n' "[Phase 3] Extracting Divi 5 content..."
python3 "$SCRIPTS/extract_divi5.py" "$DATA_DIR/pages.json" "$DATA_DIR/content/"
printf '%s\n' " OK: content JSON files written"
printf '\n'
# ---------------------------------------------------------------------------
# Phase 4 — Design system (manual step)
# ---------------------------------------------------------------------------
# Nothing is executed here; the operator is only pointed at the input file,
# the stylesheet to write, and the SOP describing the step.
cat <<EOF
[Phase 4] Design system (MANUAL STEP REQUIRED)
 Read: $DATA_DIR/design-system.json
 Write: $PROJECT/src/assets/css/main.css
 Ref: $SOPS/wp-divi-pipeline/04-design-system-extraction.md

EOF
# ---------------------------------------------------------------------------
# Phase 5 — Media migration
# ---------------------------------------------------------------------------
UPLOADS_DIR="$EXTRACT_DIR/uploads"
IMAGES_DIR="$PROJECT/src/assets/images"
if [ -d "$UPLOADS_DIR" ]; then
  echo "[Phase 5] Migrating media..."
  mkdir -p "$IMAGES_DIR"

  # Catalog originals (skip WP-generated size variants such as name-300x200.jpg).
  # grep exits 1 when it selects no lines; with `set -o pipefail` that would
  # abort the script for a site without original images, so status 1 is
  # accepted and only a genuine grep error (status > 1) propagates.
  find "$UPLOADS_DIR" -type f \( -name "*.jpg" -o -name "*.jpeg" -o -name "*.png" -o -name "*.gif" -o -name "*.webp" \) \
    | { grep -v -E "\-[0-9]+x[0-9]+\.(jpg|jpeg|png|webp|gif)$" || [ "$?" -eq 1 ]; } \
    | sort > "$DATA_DIR/media-originals.txt"
  MEDIA_COUNT=$(wc -l < "$DATA_DIR/media-originals.txt")
  echo " Found: $MEDIA_COUNT original images"

  # Copy to src/assets/images/ (flat: files sharing a basename overwrite each other).
  while IFS= read -r src_img; do
    fname=$(basename "$src_img")
    cp "$src_img" "$IMAGES_DIR/$fname"
  done < "$DATA_DIR/media-originals.txt"

  # Convert to WebP if cwebp available. Files are addressed by full path so
  # the script's working directory is never changed.
  if command -v cwebp &>/dev/null; then
    echo " Converting to WebP..."
    for img in "$IMAGES_DIR"/*.jpg "$IMAGES_DIR"/*.jpeg "$IMAGES_DIR"/*.png; do
      [ -f "$img" ] || continue
      # The source image is removed only after a successful conversion.
      cwebp -q 82 "$img" -o "${img%.*}.webp" 2>/dev/null && rm "$img"
    done
    # Count with a glob loop: `ls *.webp | wc -l` fails under pipefail when
    # no .webp file exists.
    WEBP_COUNT=0
    for webp in "$IMAGES_DIR"/*.webp; do
      if [ -f "$webp" ]; then
        WEBP_COUNT=$((WEBP_COUNT + 1))
      fi
    done
    echo " WebP files: $WEBP_COUNT"
  else
    echo " WARN: cwebp not found — images copied as-is (convert manually)"
  fi
  echo " OK: media migrated to $IMAGES_DIR"
else
  echo "[Phase 5] No uploads/ directory found — skipping media migration"
fi
echo ""
# ---------------------------------------------------------------------------
# Phase 6 — HTML build (manual step)
# ---------------------------------------------------------------------------
# Phases 6-9 execute nothing; each heredoc prints the instructions for one
# manual phase, with $SOPS / $PROJECT / $DOMAIN expanded in the text.
cat <<EOF
[Phase 6] HTML Build (MANUAL STEP REQUIRED)
 Ref: $SOPS/wp-divi-pipeline/05-content-migration.md
 Build order:
 1. src/assets/css/main.css
 2. src/assets/css/components.css
 3. src/components/header.html
 4. src/components/footer.html
 5. src/assets/js/components.js
 6. src/assets/js/main.js
 7. src/index.html (home — design system anchor)
 8. Remaining pages

EOF
# ---------------------------------------------------------------------------
# Phase 7 — SEO audit
# ---------------------------------------------------------------------------
cat <<EOF
[Phase 7] SEO audit (run after HTML build):
 grep -rL '<title>' $PROJECT/src --include='*.html' | grep -v _template
 grep -rL 'canonical' $PROJECT/src --include='*.html' | grep -v _template
 grep -rL 'ld+json' $PROJECT/src --include='*.html' | grep -v _template
 grep -r '{{' $PROJECT/src --include='*.html'

EOF
# ---------------------------------------------------------------------------
# Phase 8 — Infra
# ---------------------------------------------------------------------------
cat <<EOF
[Phase 8] Infra setup:
 Copy Dockerfile + docker-compose.yml from vibrantyoucoaching.com
 Update server_name in infra/nginx.conf to: $DOMAIN
 Run: docker compose up -d --build

EOF
# ---------------------------------------------------------------------------
# Phase 9 — Protection check
# ---------------------------------------------------------------------------
cat <<EOF
[Phase 9] After deploy, run:
 bash $SOPS/tools/verify-protection.sh https://$DOMAIN

================================================
 Pipeline setup complete.
 Phases 0-3 + 5 executed automatically.
 Phases 4, 6, 7, 8, 9 require manual steps.
 See $SOPS/wp-divi-pipeline/ for all SOPs.
================================================
EOF
@@ -0,0 +1,574 @@
#!/usr/bin/env python3
"""
stage_seed.py — Phase 6 of WP/Divi → Stack A migration pipeline.
Reads extracted JSON from prior pipeline run and generates a seed_databases.py
skeleton for the target project. Human/agent reviews [FILL] markers and fills
gaps before running the seeder.
Usage:
python3 stage_seed.py <data_dir> <seed_path> --domain <domain> [--force]
Example:
python3 stage_seed.py /path/to/.planning/data build/seed_databases.py --domain example.com
"""
import argparse
import json
import os
import re
from datetime import datetime
def slugify(text):
    """Return a URL-safe slug for *text*.

    The text is lowercased, every run of characters outside ``a-z`` and
    ``0-9`` becomes a single hyphen, and leading/trailing hyphens are removed.
    """
    hyphenated = re.sub(r'[^a-z0-9]+', '-', text.lower())
    return hyphenated.strip('-')
def infer_template(slug):
    """Map a page slug (case-insensitive) to its template name.

    Slugs without a dedicated template fall back to ``'static'``.
    """
    template_for = {
        'home': 'home',
        'classes': 'classes',
        'class': 'classes',
        'schedule': 'schedule',
        'glossary': 'glossary',
        'blog': 'blog',
        'posts': 'blog',
        'articles': 'blog',
    }
    return template_for.get(slug.lower(), 'static')
def load_json_file(path):
    """Load and return the JSON document stored at *path*.

    Returns ``None`` when the file does not exist, cannot be read, or does
    not contain valid UTF-8 JSON; in the latter cases a warning is printed.
    Callers must therefore treat ``None`` as "no data".
    """
    if not os.path.exists(path):
        return None
    try:
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"Warning: Failed to load {path}: {e}")
        return None
def _py_literal(value, indent=4):
    """Render JSON-derived *value* as Python source text.

    ``json.dumps`` output is not valid Python when the data contains
    ``null``, ``true`` or ``false``; ``pprint.pformat`` writes ``None``,
    ``True`` and ``False`` instead. Continuation lines get *indent* extra
    spaces so the literal sits inside a generated function body.
    """
    import pprint

    text = pprint.pformat(value, indent=4, width=100, sort_dicts=False)
    # pformat escapes newlines inside strings, so every real newline here is
    # structural and lies inside brackets, where re-indenting is harmless.
    return text.replace('\n', '\n' + ' ' * indent)


def _build_page_rows(pages, domain):
    """Return one seed-row dict for every published page in *pages*."""
    template_by_slug = {
        'home': 'home',
        'classes': 'classes',
        'schedule': 'schedule',
        'glossary': 'glossary',
        'blog': 'blog',
    }
    fallback_date = datetime.now().isoformat()
    rows = []
    for page in pages:
        if page.get('status') != 'publish' or page.get('post_type') != 'page':
            continue
        # ``or`` also replaces JSON nulls and empty strings, which
        # ``dict.get(key, default)`` would let through.
        slug = page.get('slug') or ''
        title = page.get('title') or '[FILL] Title needed'
        meta_desc = page.get('seo_description') or f"[FILL] Meta description for {slug}"
        # The home page lives at the site root, every other page under its slug.
        canonical = f"https://{domain}/{slug}/" if slug != 'home' else f"https://{domain}/"
        rows.append({
            'slug': slug,
            'template': template_by_slug.get(slug, 'static'),
            'title': title,
            'meta_description': meta_desc,
            'canonical_url': canonical,
            'hero_h1': f"[FILL] {title}",
            'sections_json': '[]',
            'updated_at': page.get('date') or fallback_date,
        })
    return rows


def generate_seed_script(data_dir, domain, design_system, pages, glossary, nav):
    """Generate the source text of a ``seed_databases.py`` skeleton.

    Args:
        data_dir: Directory the JSON was read from (only echoed in the header).
        domain: Site domain used to build canonical URLs.
        design_system: Accepted for interface compatibility; currently unused.
        pages: List of page dicts; only entries with ``status == 'publish'``
            and ``post_type == 'page'`` are emitted.
        glossary: List of glossary items, or a falsy value for an empty stub.
        nav: List of navigation items, or a falsy value for a ``[FILL]`` stub.

    Returns:
        The complete Python source of the seeder as a string. Embedded data
        is written as Python literals, so JSON null/true/false values stay
        valid, and the generated ``seed_nav`` clears its table before
        inserting so the seeder can be re-run without duplicating rows.
    """
    now = datetime.now().isoformat()
    pages_literal = _py_literal(_build_page_rows(pages, domain))

    script = f'''#!/usr/bin/env python3
"""
seed_databases.py — generated by stage_seed.py on {now}
Source: {data_dir}
Domain: {domain}

EDIT THIS FILE then run: python3 build/seed_databases.py
Content marked [FILL] needs human/agent review before seeding.
"""
import sqlite3
import json
import os
from datetime import datetime

DB_DIR = os.path.join(os.path.dirname(__file__), '..', 'src', 'api', 'data')
os.makedirs(DB_DIR, exist_ok=True)


def slugify(text):
    """Convert text to URL-safe slug."""
    import re
    return re.sub(r'[^a-z0-9]+', '-', text.lower()).strip('-')


def seed_pages():
    """Create pages.sqlite and populate with published pages."""
    db_path = os.path.join(DB_DIR, 'pages.sqlite')
    conn = sqlite3.connect(db_path)
    c = conn.cursor()
    c.execute("""
        CREATE TABLE IF NOT EXISTS pages (
            id INTEGER PRIMARY KEY,
            slug TEXT UNIQUE NOT NULL,
            template TEXT NOT NULL,
            title TEXT NOT NULL,
            meta_description TEXT,
            canonical_url TEXT,
            og_image TEXT,
            schema_json TEXT,
            hero_eyebrow TEXT,
            hero_h1 TEXT,
            hero_lead TEXT,
            sections_json TEXT,
            updated_at TEXT
        )
    """)
    pages_data = {pages_literal}
    for page in pages_data:
        c.execute("""
            INSERT OR REPLACE INTO pages
            (slug, template, title, meta_description, canonical_url, hero_h1, sections_json, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            page['slug'],
            page['template'],
            page['title'],
            page['meta_description'],
            page['canonical_url'],
            page['hero_h1'],
            page['sections_json'],
            page['updated_at']
        ))
    conn.commit()
    conn.close()
    print(f"✓ pages.sqlite created with {{len(pages_data)}} pages")


def seed_nav():
    """Create nav.sqlite and populate navigation items."""
    db_path = os.path.join(DB_DIR, 'nav.sqlite')
    conn = sqlite3.connect(db_path)
    c = conn.cursor()
    c.execute("""
        CREATE TABLE IF NOT EXISTS nav_items (
            id INTEGER PRIMARY KEY,
            label TEXT NOT NULL,
            href TEXT NOT NULL,
            display_order INTEGER DEFAULT 0,
            is_cta INTEGER DEFAULT 0
        )
    """)
'''

    # seed_nav body: real rows when nav.json was found, otherwise a stub.
    if nav:
        nav_literal = _py_literal(nav)
        script += f'''
    nav_items = {nav_literal}
    # Start from an empty table so re-running the seeder does not duplicate items.
    c.execute("DELETE FROM nav_items")
    for item in nav_items:
        c.execute("""
            INSERT INTO nav_items (label, href, display_order, is_cta)
            VALUES (?, ?, ?, ?)
        """, (item['label'], item['href'], item.get('display_order', 0), item.get('is_cta', 0)))
    conn.commit()
    conn.close()
    print(f"✓ nav.sqlite created with {{len(nav_items)}} nav items")
'''
    else:
        script += '''
    # [FILL] nav.json not found — add navigation items manually
    # Example:
    # nav_items = [
    #     {"label": "Home", "href": "/", "display_order": 1, "is_cta": 0},
    #     {"label": "Classes", "href": "/classes", "display_order": 2, "is_cta": 0},
    #     {"label": "Schedule", "href": "/schedule", "display_order": 3, "is_cta": 0},
    #     {"label": "Get Started", "href": "/contact", "display_order": 4, "is_cta": 1},
    # ]
    # Then uncomment and insert rows
    conn.commit()
    conn.close()
    print("✓ nav.sqlite created (empty — [FILL] navigation items)")
'''

    # seed_glossary: the table definition is shared, only the body differs.
    if glossary:
        glossary_doc = 'Create glossary.sqlite and populate terms.'
    else:
        glossary_doc = 'Create glossary.sqlite (empty — no glossary.json found).'
    script += f'''

def seed_glossary():
    """{glossary_doc}"""
    db_path = os.path.join(DB_DIR, 'glossary.sqlite')
    conn = sqlite3.connect(db_path)
    c = conn.cursor()
    c.execute("""
        CREATE TABLE IF NOT EXISTS terms (
            id INTEGER PRIMARY KEY,
            slug TEXT UNIQUE NOT NULL,
            term TEXT NOT NULL,
            pronunciation TEXT,
            definition TEXT NOT NULL,
            category TEXT NOT NULL,
            level TEXT NOT NULL,
            display_order INTEGER DEFAULT 0
        )
    """)
'''
    if glossary:
        glossary_literal = _py_literal(glossary)
        script += f'''    glossary_items = {glossary_literal}
    for idx, item in enumerate(glossary_items):
        fields = item.get('fields', {{}})
        term = fields.get('sanskrit_name', '[FILL] Term needed')
        slug = slugify(term)
        pronunciation = fields.get('pronunciation', '')
        definition = fields.get('definition', '[FILL] Definition needed')
        category = fields.get('category', 'yoga')
        level = fields.get('level', 'beginner')
        c.execute("""
            INSERT OR REPLACE INTO terms
            (slug, term, pronunciation, definition, category, level, display_order)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (slug, term, pronunciation, definition, category, level, idx))
    conn.commit()
    conn.close()
    print(f"✓ glossary.sqlite created with {{len(glossary_items)}} terms")
'''
    else:
        script += '''    conn.commit()
    conn.close()
    print("✓ glossary.sqlite created (empty)")
'''

    # Remaining databases are always emitted as empty stubs with [FILL] hints.
    script += '''

def seed_testimonials():
    """Create testimonials.sqlite (empty stub)."""
    db_path = os.path.join(DB_DIR, 'testimonials.sqlite')
    conn = sqlite3.connect(db_path)
    c = conn.cursor()
    c.execute("""
        CREATE TABLE IF NOT EXISTS testimonials (
            id INTEGER PRIMARY KEY,
            quote TEXT NOT NULL,
            author_name TEXT NOT NULL,
            author_role TEXT,
            is_featured INTEGER DEFAULT 0
        )
    """)
    # [FILL] Add testimonials extracted from Divi testimonial modules or client-provided
    # rows = [
    #     {"quote": "...", "author_name": "...", "author_role": "...", "is_featured": 0},
    # ]
    conn.commit()
    conn.close()
    print("✓ testimonials.sqlite created (empty — [FILL] add testimonials)")


def seed_blog():
    """Create blog.sqlite (empty stub)."""
    db_path = os.path.join(DB_DIR, 'blog.sqlite')
    conn = sqlite3.connect(db_path)
    c = conn.cursor()
    c.execute("""
        CREATE TABLE IF NOT EXISTS posts (
            id INTEGER PRIMARY KEY,
            slug TEXT UNIQUE NOT NULL,
            title TEXT NOT NULL,
            excerpt TEXT,
            content TEXT,
            author TEXT,
            published_at TEXT,
            is_featured INTEGER DEFAULT 0
        )
    """)
    # [FILL] Add blog posts extracted from WP posts table
    # rows = [
    #     {"slug": "...", "title": "...", "excerpt": "...", "content": "...", "author": "...", "published_at": "..."},
    # ]
    conn.commit()
    conn.close()
    print("✓ blog.sqlite created (empty — [FILL] add blog posts)")


def seed_videos():
    """Create videos.sqlite (empty stub)."""
    db_path = os.path.join(DB_DIR, 'videos.sqlite')
    conn = sqlite3.connect(db_path)
    c = conn.cursor()
    c.execute("""
        CREATE TABLE IF NOT EXISTS videos (
            id INTEGER PRIMARY KEY,
            slug TEXT UNIQUE NOT NULL,
            title TEXT NOT NULL,
            duration TEXT,
            embed_url TEXT,
            thumbnail TEXT,
            category TEXT,
            level TEXT,
            is_free INTEGER DEFAULT 1
        )
    """)
    # [FILL] Add on-demand video entries if site has video content
    # rows = [
    #     {"slug": "...", "title": "...", "duration": "12:34", "embed_url": "...", "category": "...", "level": "..."},
    # ]
    conn.commit()
    conn.close()
    print("✓ videos.sqlite created (empty — [FILL] add videos)")


def seed_events():
    """Create events.sqlite (empty stub)."""
    db_path = os.path.join(DB_DIR, 'events.sqlite')
    conn = sqlite3.connect(db_path)
    c = conn.cursor()
    c.execute("""
        CREATE TABLE IF NOT EXISTS events (
            id INTEGER PRIMARY KEY,
            slug TEXT UNIQUE NOT NULL,
            title TEXT NOT NULL,
            event_date TEXT,
            time_cet TEXT,
            format TEXT,
            capacity INTEGER,
            price_eur REAL,
            status TEXT DEFAULT 'open'
        )
    """)
    # [FILL] Add workshop/event entries
    # rows = [
    #     {"slug": "...", "title": "...", "event_date": "2026-06-15", "time_cet": "10:00", "format": "online", "capacity": 20, "price_eur": 29.99},
    # ]
    conn.commit()
    conn.close()
    print("✓ events.sqlite created (empty — [FILL] add events)")


def seed_schedule():
    """Create schedule.sqlite (empty stub)."""
    db_path = os.path.join(DB_DIR, 'schedule.sqlite')
    conn = sqlite3.connect(db_path)
    c = conn.cursor()
    c.execute("""
        CREATE TABLE IF NOT EXISTS classes (
            id INTEGER PRIMARY KEY,
            day_of_week TEXT NOT NULL,
            day_order INTEGER NOT NULL,
            time_cet TEXT NOT NULL,
            class_name TEXT NOT NULL,
            level TEXT NOT NULL,
            format TEXT NOT NULL,
            duration_min INTEGER NOT NULL,
            badge_variant TEXT DEFAULT ''
        )
    """)
    # [FILL] Add recurring class schedule rows
    # rows = [
    #     {"day_of_week": "Monday", "day_order": 1, "time_cet": "10:00", "class_name": "Hatha Yoga", "level": "beginner", "format": "online", "duration_min": 60, "badge_variant": "featured"},
    # ]
    conn.commit()
    conn.close()
    print("✓ schedule.sqlite created (empty — [FILL] add class schedule)")


def seed_instructors():
    """Create instructors.sqlite (empty stub)."""
    db_path = os.path.join(DB_DIR, 'instructors.sqlite')
    conn = sqlite3.connect(db_path)
    c = conn.cursor()
    c.execute("""
        CREATE TABLE IF NOT EXISTS instructors (
            id INTEGER PRIMARY KEY,
            slug TEXT UNIQUE NOT NULL,
            name TEXT NOT NULL,
            title TEXT,
            bio TEXT,
            certifications TEXT,
            image TEXT,
            is_primary INTEGER DEFAULT 0
        )
    """)
    # [FILL] Add instructor rows
    # rows = [
    #     {"slug": "alice-johnson", "name": "Alice Johnson", "title": "Lead Instructor", "bio": "...", "certifications": "...", "is_primary": 1},
    # ]
    conn.commit()
    conn.close()
    print("✓ instructors.sqlite created (empty — [FILL] add instructors)")


def seed_packages():
    """Create packages.sqlite (empty stub)."""
    db_path = os.path.join(DB_DIR, 'packages.sqlite')
    conn = sqlite3.connect(db_path)
    c = conn.cursor()
    c.execute("""
        CREATE TABLE IF NOT EXISTS packages (
            id INTEGER PRIMARY KEY,
            slug TEXT UNIQUE NOT NULL,
            name TEXT NOT NULL,
            price_eur REAL,
            sessions_count INTEGER,
            validity_days INTEGER,
            is_featured INTEGER DEFAULT 0
        )
    """)
    # [FILL] Add class pack/package options
    # rows = [
    #     {"slug": "starter", "name": "Starter Pack", "price_eur": 49.99, "sessions_count": 5, "validity_days": 30, "is_featured": 0},
    #     {"slug": "unlimited", "name": "Unlimited Monthly", "price_eur": 99.99, "sessions_count": None, "validity_days": 30, "is_featured": 1},
    # ]
    conn.commit()
    conn.close()
    print("✓ packages.sqlite created (empty — [FILL] add packages)")


if __name__ == '__main__':
    seed_pages()
    seed_nav()
    seed_glossary()
    seed_testimonials()
    seed_blog()
    seed_videos()
    seed_events()
    seed_schedule()
    seed_instructors()
    seed_packages()
    print("\\nSeeding complete. Review [FILL] markers before running in production.")
'''
    return script
def main():
    """Command-line entry point: read the extracted JSON and write the seeder.

    Returns:
        ``0`` on success, ``1`` when the data directory is missing, the
        output file already exists without ``--force``, or ``pages.json`` is
        missing, invalid, empty, or not a list.
    """
    parser = argparse.ArgumentParser(
        description='Generate seed_databases.py from extracted WP/Divi JSON data'
    )
    parser.add_argument('data_dir', help='Path to extracted data directory (.planning/data/)')
    parser.add_argument('seed_path', help='Output path for seed_databases.py')
    parser.add_argument('--domain', required=True, help='Domain name (e.g., example.com)')
    parser.add_argument('--force', action='store_true', help='Overwrite existing seed_databases.py')
    args = parser.parse_args()

    # Validate inputs
    if not os.path.isdir(args.data_dir):
        print(f"Error: data_dir not found: {args.data_dir}")
        return 1
    if os.path.exists(args.seed_path) and not args.force:
        print(f"Error: seed_databases.py already exists at {args.seed_path}")
        print("Use --force to overwrite")
        return 1

    # pages.json is required and must be a list of page dicts; the other
    # files are optional and come back as None when absent.
    pages = load_json_file(os.path.join(args.data_dir, 'pages.json'))
    if not pages or not isinstance(pages, list):
        print("Error: pages.json not found or invalid")
        return 1
    design_system = load_json_file(os.path.join(args.data_dir, 'design-system.json'))
    glossary = load_json_file(os.path.join(args.data_dir, 'glossary.json'))
    nav = load_json_file(os.path.join(args.data_dir, 'nav.json'))

    # Generate script
    script_content = generate_seed_script(
        args.data_dir,
        args.domain,
        design_system,
        pages,
        glossary,
        nav
    )

    # Write output. dirname() is '' for a bare file name, and os.makedirs('')
    # raises, so the parent is only created when there is one.
    out_dir = os.path.dirname(args.seed_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    # The generated source contains non-ASCII characters (e.g. "✓"); write it
    # as UTF-8 regardless of the locale's default encoding.
    with open(args.seed_path, 'w', encoding='utf-8') as f:
        f.write(script_content)
    # Make executable
    os.chmod(args.seed_path, 0o755)

    published = sum(
        1 for p in pages
        if p.get('status') == 'publish' and p.get('post_type') == 'page'
    )
    print(f"✓ Generated: {args.seed_path}")
    print(f" Pages: {published}")
    print(f" Glossary terms: {len(glossary) if glossary else 0}")
    print(f" Nav items: {len(nav) if nav else 0}")
    print("\nNext: Review [FILL] markers, then run: python3 " + args.seed_path)
    return 0
if __name__ == '__main__':
    # ``exit`` is a helper installed by the ``site`` module for interactive
    # sessions and may be absent (e.g. ``python -S``); raising SystemExit
    # propagates main()'s return code as the process exit status everywhere.
    raise SystemExit(main())