Files
st-flow-tests/scripts/st_env_snapshot.py

186 lines
6.9 KiB
Python

#!/usr/bin/env python3
"""
st_env_snapshot.py — Query live ST environment for pre-flight context injection.
Usage:
python scripts/st_env_snapshot.py --host HOST --port 444 --user admin --pass PASSWORD
python scripts/st_env_snapshot.py --host HOST --user admin --pass PASSWORD --output yaml
Output: JSON or YAML snapshot of accounts, partner accounts, partner sites, certificates, applications, and subscriptions.
Use this output to validate flow spec prerequisites before deploying.
"""
import argparse
import json
import sys
import urllib.request
import urllib.error
import ssl
import base64
def api_get(base_url, path, auth):
    """Issue one GET request and return the decoded JSON body.

    Args:
        base_url: URL prefix that ``path`` is appended to verbatim.
        path: Request path (including any query string).
        auth: Already base64-encoded ``user:password`` value for the
            ``Authorization: Basic`` header.

    Returns:
        The parsed JSON payload on success. On any failure during the
        request or while decoding the body, a dict of the form
        ``{"error": <message>, "path": path}`` is returned instead of
        raising.
    """
    # NOTE(review): hostname checking and certificate verification are both
    # switched off, so the Basic credentials are sent to whatever endpoint
    # answers. Presumably intended for self-signed test hosts -- confirm
    # before pointing this at anything sensitive.
    tls = ssl.create_default_context()
    tls.check_hostname = False
    tls.verify_mode = ssl.CERT_NONE

    request = urllib.request.Request(
        f"{base_url}{path}",
        headers={
            "Authorization": f"Basic {auth}",
            "Accept": "application/json",
        },
    )

    try:
        with urllib.request.urlopen(request, context=tls, timeout=10) as response:
            return json.load(response)
    except Exception as exc:
        # HTTP-level failures get a status-code summary; everything else
        # (connection errors, timeouts, invalid JSON) is reported as-is.
        if isinstance(exc, urllib.error.HTTPError):
            detail = f"HTTP {exc.code}: {exc.reason}"
        else:
            detail = str(exc)
        return {"error": detail, "path": path}
def _result_items(raw, path):
    """Return the list of record dicts carried by one ``api_get`` response.

    ``raw`` may be a bare list or a dict holding the records under
    ``"result"``. An error dict (has ``"error"`` but no ``"result"``) is
    reported on stderr and treated as "no records", so a failed query is
    visible to the operator instead of silently looking like an empty
    environment. Anything that is not a list of dicts is ignored.
    """
    if isinstance(raw, dict):
        if "error" in raw and "result" not in raw:
            print(
                f"Warning: query {raw.get('path', path)} failed: {raw['error']}",
                file=sys.stderr,
            )
            return []
        raw = raw.get("result")
    if not isinstance(raw, list):
        return []
    return [item for item in raw if isinstance(item, dict)]


def snapshot(host, port, user, password):
    """Collect a summary of the live environment from the v2.0 REST API.

    Six GET queries are issued in order (individual accounts, partner
    accounts, transfer sites, certificates, AdvancedRouting applications,
    AdvancedRouting subscriptions) and a few fields of each record are kept.

    Args:
        host: Server host name or address.
        port: Server port (used verbatim in the URL).
        user: User name for HTTP Basic authentication.
        password: Password for HTTP Basic authentication.

    Returns:
        A dict with the keys ``host``, ``accounts``, ``partner_accounts``,
        ``partner_sites``, ``certificates``, ``applications`` and
        ``subscriptions``; every key except ``host`` maps to a list of
        dicts. A query that fails contributes an empty list and a warning
        on stderr.
    """
    base = f"https://{host}:{port}/api/v2.0"
    auth = base64.b64encode(f"{user}:{password}".encode()).decode()

    def fetch(path):
        return _result_items(api_get(base, path, auth), path)

    # NOTE(review): each query is a single request with a fixed ``limit``;
    # no follow-up pages are requested, so larger environments may be
    # truncated -- confirm against the server's paging behaviour.

    # Accounts
    accts = [
        {
            "name": a.get("name"),
            "type": a.get("type"),
            "status": "locked" if a.get("locked") else "active",
        }
        for a in fetch("/accounts?type=individual&limit=100")
    ]

    # Partner accounts
    partners = [
        {"name": a.get("name"), "type": "partner"}
        for a in fetch("/accounts?type=partner&limit=100")
    ]

    # Partner sites (transfer sites)
    sites = [
        {
            "name": s.get("name"),
            "type": s.get("type"),
            "partner": s.get("partnerAccount"),
            "protocol": s.get("protocol"),
        }
        for s in fetch("/transfers/sites?limit=100")
    ]

    # Certificates (every certificate returned is kept; no type filter is
    # applied here)
    certs = [
        {
            "name": c.get("name"),
            "type": c.get("type"),
            "usage": c.get("usage"),
            "account": c.get("account"),
            "accessLevel": c.get("accessLevel"),
        }
        for c in fetch("/certificates?limit=200")
    ]

    # AR Applications
    apps = [
        {"name": a.get("name"), "type": a.get("type")}
        for a in fetch("/applications?type=AdvancedRouting&limit=100")
    ]

    # Subscriptions
    subs = [
        {
            "account": s.get("account"),
            "folder": s.get("folder"),
            "application": s.get("application"),
        }
        for s in fetch("/subscriptions?type=AdvancedRouting&limit=100")
    ]

    return {
        "host": host,
        "accounts": accts,
        "partner_accounts": partners,
        "partner_sites": sites,
        "certificates": certs,
        "applications": apps,
        "subscriptions": subs,
    }
def validate_spec_prerequisites(snapshot_data, prereqs):
    """Check that all spec prerequisites exist in the snapshot.

    Args:
        snapshot_data: Dict with optional ``accounts``, ``partner_accounts``,
            ``partner_sites`` and ``certificates`` lists of dicts, each
            entry identified by its ``"name"``.
        prereqs: Dict with the same four optional sections listing the
            entries the spec requires. Each required entry must have a
            ``"name"``; certificate entries may also give ``accessLevel``
            and/or ``account`` to be compared with the live certificate.
            ``None`` (e.g. an empty ``prerequisites:`` key in YAML) or a
            null section is treated as "nothing required".

    Returns:
        A list of human-readable problem descriptions, in the order
        accounts, partner accounts, partner sites, certificates. Empty when
        every prerequisite is satisfied.
    """
    snapshot_data = snapshot_data or {}
    prereqs = prereqs or {}
    missing = []

    # The first three sections are plain existence checks by name.
    existence_checks = (
        ("accounts", "Account"),
        ("partner_accounts", "Partner account"),
        ("partner_sites", "Partner site"),
    )
    for section, label in existence_checks:
        present = {
            entry.get("name") for entry in snapshot_data.get(section) or []
        }
        for wanted in prereqs.get(section) or []:
            if wanted["name"] not in present:
                missing.append(f"{label} '{wanted['name']}' not found")

    # NOTE(review): certificates are indexed by name only; if two live
    # certificates share a name the last one wins -- confirm names are
    # unique across accounts.
    cert_map = {
        cert.get("name"): cert
        for cert in snapshot_data.get("certificates") or []
    }
    for wanted in prereqs.get("certificates") or []:
        name = wanted["name"]
        live = cert_map.get(name)
        if live is None:
            missing.append(f"Certificate '{name}' not found")
            continue
        # Attribute constraints are only enforced when the spec states them.
        if wanted.get("accessLevel") and live.get("accessLevel") != wanted["accessLevel"]:
            missing.append(
                f"Certificate '{name}' has accessLevel={live.get('accessLevel')}, "
                f"expected {wanted['accessLevel']} (routing engine requires PUBLIC)"
            )
        if wanted.get("account") and live.get("account") != wanted["account"]:
            missing.append(
                f"Certificate '{name}' owned by account='{live.get('account')}', "
                f"expected '{wanted['account']}'"
            )
    return missing
def main():
    """Command-line entry point.

    Parses the arguments, takes a snapshot of the environment, optionally
    validates a flow spec's ``prerequisites`` section against it, and prints
    the snapshot to stdout as JSON or YAML.

    Exit status: ``2`` (via ``sys.exit``) when ``--validate-spec`` finds
    missing prerequisites, in which case the snapshot is not printed.
    Diagnostics go to stderr. If PyYAML is not installed, spec validation is
    skipped with a warning and YAML output falls back to JSON.
    """
    parser = argparse.ArgumentParser(description="Snapshot ST environment for pre-flight validation")
    parser.add_argument("--host", required=True)
    parser.add_argument("--port", default="444")
    parser.add_argument("--user", required=True)
    # NOTE(review): a password passed on the command line is typically
    # visible in shell history / process listings -- consider an
    # environment variable or prompt if that matters here.
    parser.add_argument("--pass", dest="password", required=True)
    parser.add_argument("--output", choices=["json", "yaml"], default="json")
    parser.add_argument("--validate-spec", help="Path to flow spec YAML to validate prerequisites")
    args = parser.parse_args()

    data = snapshot(args.host, args.port, args.user, args.password)

    if args.validate_spec:
        # Only the import is guarded, so an ImportError raised anywhere else
        # is not mistaken for "PyYAML missing".
        try:
            import yaml
        except ImportError:
            print("Warning: PyYAML not installed, skipping spec validation", file=sys.stderr)
        else:
            # Binary mode lets PyYAML detect the file encoding itself rather
            # than depending on the locale's default text encoding.
            with open(args.validate_spec, "rb") as f:
                # safe_load returns None for an empty document.
                spec = yaml.safe_load(f) or {}
            # "prerequisites:" with no value loads as None; treat as empty.
            prereqs = spec.get("prerequisites") or {}
            missing = validate_spec_prerequisites(data, prereqs)
            if missing:
                print("PRE-FLIGHT FAILED — missing prerequisites:", file=sys.stderr)
                for m in missing:
                    print(f"{m}", file=sys.stderr)
                sys.exit(2)
            print("PRE-FLIGHT PASSED — all prerequisites present", file=sys.stderr)

    if args.output == "yaml":
        try:
            import yaml
        except ImportError:
            print("PyYAML not installed, falling back to JSON", file=sys.stderr)
            print(json.dumps(data, indent=2))
        else:
            print(yaml.dump(data, default_flow_style=False))
    else:
        print(json.dumps(data, indent=2))
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()