From 1f728cd8af11aa205d1dac1f90ba3e87d91aa93e Mon Sep 17 00:00:00 2001 From: Conan Scott Date: Wed, 4 Mar 2026 10:14:41 +0000 Subject: [PATCH] Add scripts/st_deploy.py --- scripts/st_deploy.py | 285 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 285 insertions(+) create mode 100644 scripts/st_deploy.py diff --git a/scripts/st_deploy.py b/scripts/st_deploy.py new file mode 100644 index 0000000..b566c79 --- /dev/null +++ b/scripts/st_deploy.py @@ -0,0 +1,285 @@ +#!/usr/bin/env python3 +""" +st_deploy.py — Idempotent deploy of an ST AR flow from a declarative YAML spec. + +Usage: + python scripts/st_deploy.py --spec specs/compress-pgp-passthrough.yaml \\ + --host 192.168.0.245 --port 444 --user admin --pass PASSWORD + python scripts/st_deploy.py --spec specs/... --host ... --dry-run + +Exit codes: + 0 = success + 1 = deploy error + 2 = validation/prerequisite error +""" + +import argparse +import json +import sys +import ssl +import base64 +import urllib.request +import urllib.error + +try: + import yaml +except ImportError: + print("PyYAML required: pip install pyyaml", file=sys.stderr) + sys.exit(2) + +from st_env_snapshot import snapshot, validate_spec_prerequisites + +CTX = ssl.create_default_context() +CTX.check_hostname = False +CTX.verify_mode = ssl.CERT_NONE + +def api(base_url, method, path, auth, body=None): + url = f"{base_url}{path}" + data = json.dumps(body).encode() if body is not None else None + req = urllib.request.Request(url, data=data, method=method) + req.add_header("Authorization", f"Basic {auth}") + req.add_header("Accept", "application/json") + if data: + req.add_header("Content-Type", "application/json") + try: + with urllib.request.urlopen(req, context=CTX, timeout=15) as r: + text = r.read() + return json.loads(text) if text.strip() else {} + except urllib.error.HTTPError as e: + body_text = e.read().decode(errors="replace") + raise RuntimeError(f"HTTP {e.code} {method} {path}: {body_text[:300]}") + +def get_or_none(base, path, auth, name_field="name", name_value=None): + """Return first matching item by name, or None.""" + try: + result = api(base, "GET", path, auth) + items = result if isinstance(result, list) else result.get("result", []) + if name_value: + for item in items: + if item.get(name_field) == name_value: + return item + return None + except Exception: + return None + +def ensure_application(base, auth, spec, dry_run): + app_spec = spec["application"] + name = app_spec["name"] + existing = get_or_none(base, f"/applications?name={name}", auth, "name", name) + if existing: + print(f" [OK] Application '{name}' exists (id: {existing.get('id')})") + return existing + if dry_run: + print(f" [DRY] Would CREATE application '{name}'") + return {"id": "DRY_RUN", "name": name} + result = api(base, "POST", "/applications", auth, {"name": name, "type": "AdvancedRouting"}) + created = api(base, "GET", f"/applications?name={name}", auth) + items = created if isinstance(created, list) else created.get("result", []) + obj = next((x for x in items if x["name"] == name), {"name": name}) + print(f" [CREATED] Application '{name}' (id: {obj.get('id')})") + return obj + +def ensure_subscription(base, auth, spec, dry_run): + sub_spec = spec["subscription"] + account = sub_spec["account"] + folder = sub_spec["folder"] + app_name = sub_spec["application"] + existing = get_or_none(base, f"/subscriptions?account={account}", auth, "folder", folder) + if existing: + print(f" [OK] Subscription exists (account={account}, folder={folder}, id: {existing.get('id')})") + return existing + if dry_run: + print(f" [DRY] Would CREATE subscription (account={account}, folder={folder})") + return {"id": "DRY_RUN"} + pta = sub_spec.get("post_transmission_actions", { + "ptaOnSuccessDoInAdvancedRoutingWildcardPull": True, + "submitFilenamePatternExpression": "*", + "submitFilterType": "FILENAME_PATTERN", + "triggerFileOption": "fail", + "triggerOnSuccessfulWildcardPull": True + }) + payload = { + "type": "AdvancedRouting", + "folder": folder, + "account": account, + "application": app_name, + "postTransmissionActions": { + "ptaOnSuccessDoInAdvancedRoutingWildcardPull": pta.get("ptaOnSuccessDoInAdvancedRoutingWildcardPull", True), + "submitFilenamePatternExpression": pta.get("submitFilenamePatternExpression", "*"), + "submitFilterType": pta.get("submitFilterType", "FILENAME_PATTERN"), + "triggerFileOption": pta.get("triggerFileOption", "fail"), + "triggerOnSuccessfulWildcardPull": pta.get("triggerOnSuccessfulWildcardPull", True) + } + } + result = api(base, "POST", "/subscriptions", auth, payload) + sub_id = result.get("id", "?") + print(f" [CREATED] Subscription (account={account}, folder={folder}, id={sub_id})") + return result + +def build_step(step_spec): + """Convert spec step dict to ST API step payload. Omit id and precedingStep.""" + step_type = step_spec["type"] + base = { + "type": step_type, + "status": "ENABLED", + "autostart": False, + "conditionType": step_spec.get("conditionType", "ALWAYS"), + "condition": step_spec.get("condition"), + "actionOnStepFailure": step_spec.get("actionOnStepFailure", "PROCEED"), + "fileFilterExpression": step_spec.get("fileFilterExpression", "*"), + "fileFilterExpressionType": step_spec.get("fileFilterExpressionType", "GLOB"), + "usePrecedingStepFiles": step_spec.get("usePrecedingStepFiles", False), + } + if step_type == "Compress": + base.update({ + "singleArchiveEnabled": step_spec.get("singleArchiveEnabled", False), + "compressionMode": step_spec.get("compressionMode", "ZIP"), + }) + elif step_type == "PgpEncryption": + base.update({ + "encryptKeyExpression": step_spec["encryptKeyExpression"], + "encryptKeyExpressionType": step_spec.get("encryptKeyExpressionType", "ALIAS"), + "encryptKeyOwnerExpression": step_spec["encryptKeyOwnerExpression"], + "encryptKeyOwnerExpressionType": step_spec.get("encryptKeyOwnerExpressionType", "NAME"), + "compressionType": step_spec.get("compressionType", "0"), + "useAsciiArmour": step_spec.get("useAsciiArmour", False), + "signKeyExpression": step_spec.get("signKeyExpression"), + "signKeyExpressionType": step_spec.get("signKeyExpressionType"), + "signKeyOwnerExpression": step_spec.get("signKeyOwnerExpression"), + "signKeyOwnerExpressionType": step_spec.get("signKeyOwnerExpressionType"), + }) + elif step_type == "SendToPartner": + base.update({ + "partnerName": step_spec["partnerName"], + "partnerSite": step_spec["partnerSite"], + }) + elif step_type == "ExternalScript": + base.update({ + "scriptPath": step_spec["scriptPath"], + "outputFileExpression": step_spec.get("outputFileExpression"), + }) + return base + +def ensure_simple_route(base, auth, spec, dry_run): + route_spec = spec["routes"]["simple"] + name = route_spec["name"] + existing = get_or_none(base, f"/routes?name={name}", auth, "name", name) + if existing: + print(f" [OK] SIMPLE route '{name}' exists (id: {existing.get('id')})") + return existing + if dry_run: + print(f" [DRY] Would CREATE SIMPLE route '{name}' with {len(route_spec['steps'])} steps") + return {"id": "DRY_RUN", "name": name} + steps = [build_step(s) for s in route_spec.get("steps", [])] + payload = { + "name": name, + "type": "SIMPLE", + "conditionType": route_spec.get("conditionType", "ALWAYS"), + "condition": route_spec.get("condition"), + "steps": steps + } + result = api(base, "POST", "/routes", auth, payload) + created = api(base, "GET", f"/routes?name={name}", auth) + items = created if isinstance(created, list) else created.get("result", []) + obj = next((x for x in items if x["name"] == name), {"name": name}) + print(f" [CREATED] SIMPLE route '{name}' (id: {obj.get('id')})") + return obj + +def ensure_template_route(base, auth, spec, simple_id, dry_run): + route_spec = spec["routes"]["template"] + name = route_spec["name"] + existing = get_or_none(base, f"/routes?name={name}", auth, "name", name) + if existing: + print(f" [OK] TEMPLATE route '{name}' exists (id: {existing.get('id')})") + return existing + if dry_run: + print(f" [DRY] Would CREATE TEMPLATE route '{name}'") + return {"id": "DRY_RUN", "name": name} + payload = { + "name": name, + "type": "TEMPLATE", + "conditionType": "MATCH_ALL", + "steps": [{"type": "ExecuteRoute", "executeRoute": simple_id, "status": "ENABLED", "autostart": False}] + } + result = api(base, "POST", "/routes", auth, payload) + created = api(base, "GET", f"/routes?name={name}", auth) + items = created if isinstance(created, list) else created.get("result", []) + obj = next((x for x in items if x["name"] == name), {"name": name}) + print(f" [CREATED] TEMPLATE route '{name}' (id: {obj.get('id')})") + return obj + +def ensure_composite_route(base, auth, spec, template_id, sub_id, dry_run): + route_spec = spec["routes"]["composite"] + name = route_spec["name"] + existing = get_or_none(base, f"/routes?name={name}", auth, "name", name) + if existing: + print(f" [OK] COMPOSITE route '{name}' exists (id: {existing.get('id')})") + return existing + if dry_run: + print(f" [DRY] Would CREATE COMPOSITE route '{name}' and link to subscription") + return {"id": "DRY_RUN", "name": name} + payload = { + "name": name, + "type": "COMPOSITE", + "conditionType": "MATCH_ALL", + "routeTemplate": template_id, + "steps": [] + } + result = api(base, "POST", "/routes", auth, payload) + created = api(base, "GET", f"/routes?name={name}", auth) + items = created if isinstance(created, list) else created.get("result", []) + obj = next((x for x in items if x["name"] == name), {"name": name}) + comp_id = obj.get("id") + print(f" [CREATED] COMPOSITE route '{name}' (id: {comp_id})") + # Link to subscription + link_payload = dict(obj) + link_payload["subscriptions"] = [sub_id] + api(base, "PUT", f"/routes/{comp_id}", auth, link_payload) + print(f" [LINKED] COMPOSITE route → subscription {sub_id}") + return obj + +def main(): + parser = argparse.ArgumentParser(description="Idempotent deploy of ST AR flow from spec YAML") + parser.add_argument("--spec", required=True, help="Path to flow spec YAML") + parser.add_argument("--host", required=True) + parser.add_argument("--port", default="444") + parser.add_argument("--user", required=True) + parser.add_argument("--pass", dest="password", required=True) + parser.add_argument("--dry-run", action="store_true", help="Show planned actions without executing") + args = parser.parse_args() + + with open(args.spec) as f: + spec = yaml.safe_load(f) + + base = f"https://{args.host}:{args.port}/api/v2.0" + auth = base64.b64encode(f"{args.user}:{args.password}".encode()).decode() + + print(f"\n{'[DRY RUN] ' if args.dry_run else ''}Deploying spec: {spec['meta']['name']}") + print(f" {spec['meta'].get('description','')}\n") + + # Pre-flight: validate prerequisites + print("Pre-flight check...") + env_data = snapshot(args.host, args.port, args.user, args.password) + prereqs = spec.get("prerequisites", {}) + missing = validate_spec_prerequisites(env_data, prereqs) + if missing: + print("PRE-FLIGHT FAILED:") + for m in missing: + print(f" ✗ {m}") + sys.exit(2) + print(" Pre-flight passed\n") + + print("Deploying...") + try: + app = ensure_application(base, auth, spec, args.dry_run) + sub = ensure_subscription(base, auth, spec, args.dry_run) + simple = ensure_simple_route(base, auth, spec, args.dry_run) + template = ensure_template_route(base, auth, spec, simple["id"], args.dry_run) + composite = ensure_composite_route(base, auth, spec, template["id"], sub["id"], args.dry_run) + print(f"\n{'[DRY RUN] ' if args.dry_run else ''}Deploy complete.") + except RuntimeError as e: + print(f"\nDeploy error: {e}", file=sys.stderr) + sys.exit(1) + +if __name__ == "__main__": + main()