#!/usr/bin/env python3 """ st_env_snapshot.py — Query live ST environment for pre-flight context injection. Usage: python scripts/st_env_snapshot.py --host HOST --port 444 --user admin --pass PASSWORD python scripts/st_env_snapshot.py --host HOST --user admin --pass PASSWORD --output yaml Output: JSON or YAML snapshot of accounts, partner sites, certificates, applications. Use this output to validate flow spec prerequisites before deploying. """ import argparse import json import sys import urllib.request import urllib.error import ssl import base64 def api_get(base_url, path, auth): ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE req = urllib.request.Request(f"{base_url}{path}") req.add_header("Authorization", f"Basic {auth}") req.add_header("Accept", "application/json") try: with urllib.request.urlopen(req, context=ctx, timeout=10) as r: return json.loads(r.read()) except urllib.error.HTTPError as e: return {"error": f"HTTP {e.code}: {e.reason}", "path": path} except Exception as e: return {"error": str(e), "path": path} def snapshot(host, port, user, password): base = f"https://{host}:{port}/api/v2.0" auth = base64.b64encode(f"{user}:{password}".encode()).decode() # Accounts — no type filter (type=individual is invalid; all accounts returned, categorised client-side) accts_raw = api_get(base, "/accounts?limit=200", auth) accts = [] partners = [] for a in accts_raw if isinstance(accts_raw, list) else accts_raw.get("result", []): locked = a.get("user", {}).get("locked", False) if isinstance(a.get("user"), dict) else False entry = { "name": a.get("name"), "type": a.get("type"), "status": "locked" if locked else "active" } # Partner accounts have routingMode set and no transferType=N pattern; use name heuristic or type field # In ST v5, partner accounts show type=user but have a distinct homeFolderAccessLevel pattern. # Most reliable: include all non-service accounts and let caller filter. if a.get("type") in ("user",): accts.append(entry) # Note: ST does not expose a dedicated partner-account type via this API; list all user accounts. # Partner sites — correct endpoint is /sites (not /transfers/sites) sites_raw = api_get(base, "/sites?limit=100", auth) sites = [] for s in sites_raw if isinstance(sites_raw, list) else sites_raw.get("result", []): sites.append({ "name": s.get("name"), "type": s.get("type"), "partner": s.get("partner"), "protocol": s.get("protocol"), "host": s.get("host"), "port": s.get("port") }) # Certificates (filter to PGP and SSH) certs_raw = api_get(base, "/certificates?limit=200", auth) certs = [] for c in certs_raw if isinstance(certs_raw, list) else certs_raw.get("result", []): certs.append({ "name": c.get("name"), "type": c.get("type"), "usage": c.get("usage"), "account": c.get("account"), "accessLevel": c.get("accessLevel") }) # AR Applications apps_raw = api_get(base, "/applications?type=AdvancedRouting&limit=100", auth) apps = [] for a in apps_raw if isinstance(apps_raw, list) else apps_raw.get("result", []): apps.append({"name": a.get("name"), "type": a.get("type")}) # Subscriptions subs_raw = api_get(base, "/subscriptions?type=AdvancedRouting&limit=100", auth) subs = [] for s in subs_raw if isinstance(subs_raw, list) else subs_raw.get("result", []): subs.append({ "account": s.get("account"), "folder": s.get("folder"), "application": s.get("application") }) return { "host": host, "accounts": accts, "partner_sites": sites, "certificates": certs, "applications": apps, "subscriptions": subs } def validate_spec_prerequisites(snapshot_data, prereqs): """Check that all spec prerequisites exist in the snapshot. Returns list of missing items.""" missing = [] acct_names = {a["name"] for a in snapshot_data.get("accounts", [])} # partner_accounts merged into accounts in v2; check against acct_names acct_names_all = acct_names site_names = {s["name"] for s in snapshot_data.get("partner_sites", [])} cert_map = {c["name"]: c for c in snapshot_data.get("certificates", [])} for a in prereqs.get("accounts", []): if a["name"] not in acct_names: missing.append(f"Account '{a['name']}' not found") for p in prereqs.get("partner_accounts", []): if p["name"] not in acct_names: missing.append(f"Partner account '{p['name']}' not found") for s in prereqs.get("partner_sites", []): if s["name"] not in site_names: missing.append(f"Partner site '{s['name']}' not found") for c in prereqs.get("certificates", []): name = c["name"] if name not in cert_map: missing.append(f"Certificate '{name}' not found") continue live = cert_map[name] if c.get("accessLevel") and live.get("accessLevel") != c["accessLevel"]: missing.append( f"Certificate '{name}' has accessLevel={live.get('accessLevel')}, " f"expected {c['accessLevel']} (routing engine requires PUBLIC)" ) if c.get("account") and live.get("account") != c["account"]: missing.append( f"Certificate '{name}' owned by account='{live.get('account')}', " f"expected '{c['account']}'" ) return missing def main(): parser = argparse.ArgumentParser(description="Snapshot ST environment for pre-flight validation") parser.add_argument("--host", required=True) parser.add_argument("--port", default="444") parser.add_argument("--user", required=True) parser.add_argument("--pass", dest="password", required=True) parser.add_argument("--output", choices=["json", "yaml"], default="json") parser.add_argument("--validate-spec", help="Path to flow spec YAML to validate prerequisites") args = parser.parse_args() data = snapshot(args.host, args.port, args.user, args.password) if args.validate_spec: try: import yaml with open(args.validate_spec) as f: spec = yaml.safe_load(f) prereqs = spec.get("prerequisites", {}) missing = validate_spec_prerequisites(data, prereqs) if missing: print("PRE-FLIGHT FAILED — missing prerequisites:", file=sys.stderr) for m in missing: print(f" ✗ {m}", file=sys.stderr) sys.exit(2) else: print("PRE-FLIGHT PASSED — all prerequisites present", file=sys.stderr) except ImportError: print("Warning: PyYAML not installed, skipping spec validation", file=sys.stderr) if args.output == "yaml": try: import yaml print(yaml.dump(data, default_flow_style=False)) except ImportError: print("PyYAML not installed, falling back to JSON", file=sys.stderr) print(json.dumps(data, indent=2)) else: print(json.dumps(data, indent=2)) if __name__ == "__main__": main()