From 5434657f240205fd2e4b45daf216d5542e835e1b Mon Sep 17 00:00:00 2001 From: Conan Scott Date: Wed, 4 Mar 2026 10:14:40 +0000 Subject: [PATCH] Add scripts/st_env_snapshot.py --- scripts/st_env_snapshot.py | 185 +++++++++++++++++++++++++++++++++++++ 1 file changed, 185 insertions(+) create mode 100644 scripts/st_env_snapshot.py diff --git a/scripts/st_env_snapshot.py b/scripts/st_env_snapshot.py new file mode 100644 index 0000000..972fc39 --- /dev/null +++ b/scripts/st_env_snapshot.py @@ -0,0 +1,185 @@ +#!/usr/bin/env python3 +""" +st_env_snapshot.py — Query live ST environment for pre-flight context injection. + +Usage: + python scripts/st_env_snapshot.py --host HOST --port 444 --user admin --pass PASSWORD + python scripts/st_env_snapshot.py --host HOST --user admin --pass PASSWORD --output yaml + +Output: JSON or YAML snapshot of accounts, partner sites, certificates, applications. +Use this output to validate flow spec prerequisites before deploying. +""" + +import argparse +import json +import sys +import urllib.request +import urllib.error +import ssl +import base64 + +def api_get(base_url, path, auth): + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + req = urllib.request.Request(f"{base_url}{path}") + req.add_header("Authorization", f"Basic {auth}") + req.add_header("Accept", "application/json") + try: + with urllib.request.urlopen(req, context=ctx, timeout=10) as r: + return json.loads(r.read()) + except urllib.error.HTTPError as e: + return {"error": f"HTTP {e.code}: {e.reason}", "path": path} + except Exception as e: + return {"error": str(e), "path": path} + +def snapshot(host, port, user, password): + base = f"https://{host}:{port}/api/v2.0" + auth = base64.b64encode(f"{user}:{password}".encode()).decode() + + # Accounts + accts_raw = api_get(base, "/accounts?type=individual&limit=100", auth) + accts = [] + for a in accts_raw if isinstance(accts_raw, list) else accts_raw.get("result", []): + accts.append({ + "name": a.get("name"), + "type": a.get("type"), + "status": "locked" if a.get("locked") else "active" + }) + + # Partner accounts + partners_raw = api_get(base, "/accounts?type=partner&limit=100", auth) + partners = [] + for a in partners_raw if isinstance(partners_raw, list) else partners_raw.get("result", []): + partners.append({"name": a.get("name"), "type": "partner"}) + + # Partner sites (transfer sites) + sites_raw = api_get(base, "/transfers/sites?limit=100", auth) + sites = [] + for s in sites_raw if isinstance(sites_raw, list) else sites_raw.get("result", []): + sites.append({ + "name": s.get("name"), + "type": s.get("type"), + "partner": s.get("partnerAccount"), + "protocol": s.get("protocol") + }) + + # Certificates (filter to PGP and SSH) + certs_raw = api_get(base, "/certificates?limit=200", auth) + certs = [] + for c in certs_raw if isinstance(certs_raw, list) else certs_raw.get("result", []): + certs.append({ + "name": c.get("name"), + "type": c.get("type"), + "usage": c.get("usage"), + "account": c.get("account"), + "accessLevel": c.get("accessLevel") + }) + + # AR Applications + apps_raw = api_get(base, "/applications?type=AdvancedRouting&limit=100", auth) + apps = [] + for a in apps_raw if isinstance(apps_raw, list) else apps_raw.get("result", []): + apps.append({"name": a.get("name"), "type": a.get("type")}) + + # Subscriptions + subs_raw = api_get(base, "/subscriptions?type=AdvancedRouting&limit=100", auth) + subs = [] + for s in subs_raw if isinstance(subs_raw, list) else subs_raw.get("result", []): + subs.append({ + "account": s.get("account"), + "folder": s.get("folder"), + "application": s.get("application") + }) + + return { + "host": host, + "accounts": accts, + "partner_accounts": partners, + "partner_sites": sites, + "certificates": certs, + "applications": apps, + "subscriptions": subs + } + +def validate_spec_prerequisites(snapshot_data, prereqs): + """Check that all spec prerequisites exist in the snapshot. Returns list of missing items.""" + missing = [] + + acct_names = {a["name"] for a in snapshot_data.get("accounts", [])} + partner_names = {a["name"] for a in snapshot_data.get("partner_accounts", [])} + site_names = {s["name"] for s in snapshot_data.get("partner_sites", [])} + cert_map = {c["name"]: c for c in snapshot_data.get("certificates", [])} + + for a in prereqs.get("accounts", []): + if a["name"] not in acct_names: + missing.append(f"Account '{a['name']}' not found") + + for p in prereqs.get("partner_accounts", []): + if p["name"] not in partner_names: + missing.append(f"Partner account '{p['name']}' not found") + + for s in prereqs.get("partner_sites", []): + if s["name"] not in site_names: + missing.append(f"Partner site '{s['name']}' not found") + + for c in prereqs.get("certificates", []): + name = c["name"] + if name not in cert_map: + missing.append(f"Certificate '{name}' not found") + continue + live = cert_map[name] + if c.get("accessLevel") and live.get("accessLevel") != c["accessLevel"]: + missing.append( + f"Certificate '{name}' has accessLevel={live.get('accessLevel')}, " + f"expected {c['accessLevel']} (routing engine requires PUBLIC)" + ) + if c.get("account") and live.get("account") != c["account"]: + missing.append( + f"Certificate '{name}' owned by account='{live.get('account')}', " + f"expected '{c['account']}'" + ) + + return missing + +def main(): + parser = argparse.ArgumentParser(description="Snapshot ST environment for pre-flight validation") + parser.add_argument("--host", required=True) + parser.add_argument("--port", default="444") + parser.add_argument("--user", required=True) + parser.add_argument("--pass", dest="password", required=True) + parser.add_argument("--output", choices=["json", "yaml"], default="json") + parser.add_argument("--validate-spec", help="Path to flow spec YAML to validate prerequisites") + args = parser.parse_args() + + data = snapshot(args.host, args.port, args.user, args.password) + + if args.validate_spec: + try: + import yaml + with open(args.validate_spec) as f: + spec = yaml.safe_load(f) + prereqs = spec.get("prerequisites", {}) + missing = validate_spec_prerequisites(data, prereqs) + if missing: + print("PRE-FLIGHT FAILED — missing prerequisites:", file=sys.stderr) + for m in missing: + print(f" ✗ {m}", file=sys.stderr) + sys.exit(2) + else: + print("PRE-FLIGHT PASSED — all prerequisites present", file=sys.stderr) + except ImportError: + print("Warning: PyYAML not installed, skipping spec validation", file=sys.stderr) + + if args.output == "yaml": + try: + import yaml + print(yaml.dump(data, default_flow_style=False)) + except ImportError: + print("PyYAML not installed, falling back to JSON", file=sys.stderr) + print(json.dumps(data, indent=2)) + else: + print(json.dumps(data, indent=2)) + +if __name__ == "__main__": + main()