#!/usr/bin/env python3
"""
st_deploy.py — Idempotent deploy of an ST AR flow from a declarative YAML spec.

Usage:
    python scripts/st_deploy.py --spec specs/compress-pgp-passthrough.yaml \\
        --host 192.168.0.245 --port 444 --user admin --pass PASSWORD

    python scripts/st_deploy.py --spec specs/... --host ... --dry-run

Exit codes:
    0 = success
    1 = deploy error
    2 = validation/prerequisite error
"""

import argparse
import base64
import json
import ssl
import sys
import urllib.error
import urllib.parse
import urllib.request

try:
    import yaml
except ImportError:
    print("PyYAML required: pip install pyyaml", file=sys.stderr)
    sys.exit(2)

from st_env_snapshot import snapshot, validate_spec_prerequisites

# NOTE(review): hostname and certificate verification are switched off for
# every request made through api(). Basic-auth credentials are therefore sent
# to an unauthenticated peer. Left unchanged because callers may rely on it
# (e.g. self-signed server certificates) — confirm and tighten if possible.
CTX = ssl.create_default_context()
CTX.check_hostname = False
CTX.verify_mode = ssl.CERT_NONE

# Defaults applied to any post-transmission-action key the spec leaves out.
_PTA_DEFAULTS = {
    "ptaOnSuccessDoInAdvancedRoutingWildcardPull": True,
    "submitFilenamePatternExpression": "*",
    "submitFilterType": "FILENAME_PATTERN",
    "triggerFileOption": "fail",
    "triggerOnSuccessfulWildcardPull": True,
}


class ApiError(RuntimeError):
    """HTTP error response from the REST API.

    Subclasses RuntimeError so existing ``except RuntimeError`` handlers keep
    working; ``status`` holds the HTTP status code.
    """

    def __init__(self, status, message):
        super().__init__(message)
        self.status = status


def _with_query(path, **params):
    """Return *path* with *params* appended as a percent-encoded query string."""
    # quote_via=quote encodes spaces as %20 rather than '+'.
    query = urllib.parse.urlencode(params, quote_via=urllib.parse.quote)
    return f"{path}?{query}"


def _as_items(result):
    """Normalise a list response: accept a bare list or a {"result": [...]} wrapper."""
    if isinstance(result, list):
        return result
    if isinstance(result, dict):
        return result.get("result") or []
    return []


def _fetch_created(base, auth, collection, name):
    """Look up a just-created object by name; fall back to {"name": name} if not found."""
    listing = api(base, "GET", _with_query(collection, name=name), auth)
    for item in _as_items(listing):
        if isinstance(item, dict) and item.get("name") == name:
            return item
    return {"name": name}


def _require_id(obj, what):
    """Return obj["id"], raising RuntimeError (a deploy error) when it is absent."""
    obj_id = obj.get("id") if isinstance(obj, dict) else None
    if obj_id is None:
        raise RuntimeError(f"{what} has no id in the API response")
    return obj_id


def api(base_url, method, path, auth, body=None):
    """Send one JSON request and return the decoded response.

    Args:
        base_url: URL prefix that *path* is appended to.
        method: HTTP method name.
        path: Resource path, including any query string.
        auth: Base64-encoded ``user:password`` for the Basic Authorization header.
        body: Optional object sent as the JSON request body.

    Returns:
        The decoded JSON response, or ``{}`` when the response body is blank.

    Raises:
        ApiError: The server answered with an HTTP error status.
        RuntimeError: The request could not be completed (connection failure,
            timeout) or the response body was not valid JSON.
    """
    url = f"{base_url}{path}"
    data = json.dumps(body).encode() if body is not None else None
    req = urllib.request.Request(url, data=data, method=method)
    req.add_header("Authorization", f"Basic {auth}")
    req.add_header("Accept", "application/json")
    if data is not None:
        req.add_header("Content-Type", "application/json")
    try:
        with urllib.request.urlopen(req, context=CTX, timeout=15) as r:
            text = r.read()
    except urllib.error.HTTPError as e:
        body_text = e.read().decode(errors="replace")
        raise ApiError(
            e.code, f"HTTP {e.code} {method} {path}: {body_text[:300]}"
        ) from e
    except OSError as e:
        # URLError and socket timeouts are OSError subclasses. Convert them so
        # main() reports a deploy error instead of dying with a traceback.
        raise RuntimeError(f"{method} {path} failed: {e}") from e
    if not text.strip():
        return {}
    try:
        return json.loads(text)
    except ValueError as e:
        raise RuntimeError(f"{method} {path}: response is not valid JSON: {e}") from e


def get_or_none(base, path, auth, name_field="name", name_value=None):
    """Return the first listed item whose *name_field* equals *name_value*, or None.

    None is also returned when *name_value* is empty or the server answers
    HTTP 404. Any other failure is raised: treating a failed lookup as
    "not present" would make the caller create a duplicate object.

    Raises:
        RuntimeError: The lookup request failed for a reason other than HTTP 404.
    """
    if not name_value:
        return None
    try:
        result = api(base, "GET", path, auth)
    except ApiError as e:
        if e.status == 404:
            return None
        raise
    for item in _as_items(result):
        if isinstance(item, dict) and item.get(name_field) == name_value:
            return item
    return None


def ensure_application(base, auth, spec, dry_run):
    """Return the application named in spec["application"], creating it if absent.

    In dry-run mode nothing is created and a placeholder with id "DRY_RUN"
    is returned instead.
    """
    name = spec["application"]["name"]
    existing = get_or_none(
        base, _with_query("/applications", name=name), auth, "name", name
    )
    if existing:
        print(f" [OK] Application '{name}' exists (id: {existing.get('id')})")
        return existing
    if dry_run:
        print(f" [DRY] Would CREATE application '{name}'")
        return {"id": "DRY_RUN", "name": name}

    api(base, "POST", "/applications", auth,
        {"name": name, "type": "AdvancedRouting"})
    obj = _fetch_created(base, auth, "/applications", name)
    print(f" [CREATED] Application '{name}' (id: {obj.get('id')})")
    return obj


def ensure_subscription(base, auth, spec, dry_run):
    """Return the subscription for spec["subscription"], creating it if absent.

    An existing subscription is matched by folder among the subscriptions
    listed for the account. In dry-run mode a placeholder with id "DRY_RUN"
    is returned.
    """
    sub_spec = spec["subscription"]
    account = sub_spec["account"]
    folder = sub_spec["folder"]
    app_name = sub_spec["application"]
    lookup_path = _with_query("/subscriptions", account=account)

    existing = get_or_none(base, lookup_path, auth, "folder", folder)
    if existing:
        print(f" [OK] Subscription exists (account={account}, folder={folder}, id: {existing.get('id')})")
        return existing
    if dry_run:
        print(f" [DRY] Would CREATE subscription (account={account}, folder={folder})")
        return {"id": "DRY_RUN"}

    pta = sub_spec.get("post_transmission_actions") or {}
    payload = {
        "type": "AdvancedRouting",
        "folder": folder,
        "account": account,
        "application": app_name,
        "postTransmissionActions": {
            key: pta.get(key, default) for key, default in _PTA_DEFAULTS.items()
        },
    }
    result = api(base, "POST", "/subscriptions", auth, payload)
    if not isinstance(result, dict) or "id" not in result:
        # NOTE(review): the create response may not carry the new object
        # (api() returns {} for a blank body) — look it up so the caller gets
        # an id to link the composite route to.
        found = get_or_none(base, lookup_path, auth, "folder", folder)
        result = found or (result if isinstance(result, dict) else {})
    sub_id = result.get("id", "?")
    print(f" [CREATED] Subscription (account={account}, folder={folder}, id={sub_id})")
    return result


def build_step(step_spec):
    """Convert spec step dict to ST API step payload. Omit id and precedingStep.

    Raises:
        KeyError: *step_spec* lacks "type" or a field that its step type
            requires (e.g. "partnerName" for SendToPartner).
    """
    step_type = step_spec["type"]
    payload = {
        "type": step_type,
        "status": "ENABLED",
        "autostart": False,
        "conditionType": step_spec.get("conditionType", "ALWAYS"),
        "condition": step_spec.get("condition"),
        "actionOnStepFailure": step_spec.get("actionOnStepFailure", "PROCEED"),
        "fileFilterExpression": step_spec.get("fileFilterExpression", "*"),
        "fileFilterExpressionType": step_spec.get("fileFilterExpressionType", "GLOB"),
        "usePrecedingStepFiles": step_spec.get("usePrecedingStepFiles", False),
    }
    # Step types not listed below get only the common fields above.
    if step_type == "Compress":
        payload.update({
            "singleArchiveEnabled": step_spec.get("singleArchiveEnabled", False),
            "compressionMode": step_spec.get("compressionMode", "ZIP"),
        })
    elif step_type == "PgpEncryption":
        payload.update({
            "encryptKeyExpression": step_spec["encryptKeyExpression"],
            "encryptKeyExpressionType": step_spec.get("encryptKeyExpressionType", "ALIAS"),
            "encryptKeyOwnerExpression": step_spec["encryptKeyOwnerExpression"],
            "encryptKeyOwnerExpressionType": step_spec.get("encryptKeyOwnerExpressionType", "NAME"),
            "compressionType": step_spec.get("compressionType", "0"),
            "useAsciiArmour": step_spec.get("useAsciiArmour", False),
            "signKeyExpression": step_spec.get("signKeyExpression"),
            "signKeyExpressionType": step_spec.get("signKeyExpressionType"),
            "signKeyOwnerExpression": step_spec.get("signKeyOwnerExpression"),
            "signKeyOwnerExpressionType": step_spec.get("signKeyOwnerExpressionType"),
        })
    elif step_type == "SendToPartner":
        payload.update({
            "partnerName": step_spec["partnerName"],
            "partnerSite": step_spec["partnerSite"],
        })
    elif step_type == "ExternalScript":
        payload.update({
            "scriptPath": step_spec["scriptPath"],
            "outputFileExpression": step_spec.get("outputFileExpression"),
        })
    return payload


def ensure_simple_route(base, auth, spec, dry_run):
    """Return the SIMPLE route from spec["routes"]["simple"], creating it if absent.

    In dry-run mode a placeholder with id "DRY_RUN" is returned.
    """
    route_spec = spec["routes"]["simple"]
    name = route_spec["name"]
    # Read steps the same way for dry-run and real runs so both accept the
    # same specs.
    step_specs = route_spec.get("steps") or []

    existing = get_or_none(base, _with_query("/routes", name=name), auth, "name", name)
    if existing:
        print(f" [OK] SIMPLE route '{name}' exists (id: {existing.get('id')})")
        return existing
    if dry_run:
        print(f" [DRY] Would CREATE SIMPLE route '{name}' with {len(step_specs)} steps")
        return {"id": "DRY_RUN", "name": name}

    payload = {
        "name": name,
        "type": "SIMPLE",
        "conditionType": route_spec.get("conditionType", "ALWAYS"),
        "condition": route_spec.get("condition"),
        "steps": [build_step(s) for s in step_specs],
    }
    api(base, "POST", "/routes", auth, payload)
    obj = _fetch_created(base, auth, "/routes", name)
    print(f" [CREATED] SIMPLE route '{name}' (id: {obj.get('id')})")
    return obj


def ensure_template_route(base, auth, spec, simple_id, dry_run):
    """Return the TEMPLATE route from spec["routes"]["template"], creating it if absent.

    A newly created template has a single ExecuteRoute step that points at
    *simple_id*. In dry-run mode a placeholder with id "DRY_RUN" is returned.
    """
    name = spec["routes"]["template"]["name"]
    existing = get_or_none(base, _with_query("/routes", name=name), auth, "name", name)
    if existing:
        print(f" [OK] TEMPLATE route '{name}' exists (id: {existing.get('id')})")
        return existing
    if dry_run:
        print(f" [DRY] Would CREATE TEMPLATE route '{name}'")
        return {"id": "DRY_RUN", "name": name}

    payload = {
        "name": name,
        "type": "TEMPLATE",
        "conditionType": "MATCH_ALL",
        "steps": [{
            "type": "ExecuteRoute",
            "executeRoute": simple_id,
            "status": "ENABLED",
            "autostart": False,
        }],
    }
    api(base, "POST", "/routes", auth, payload)
    obj = _fetch_created(base, auth, "/routes", name)
    print(f" [CREATED] TEMPLATE route '{name}' (id: {obj.get('id')})")
    return obj


def ensure_composite_route(base, auth, spec, template_id, sub_id, dry_run):
    """Return the COMPOSITE route from spec["routes"]["composite"], creating it if absent.

    A newly created route references *template_id* and is then linked to
    subscription *sub_id* with a follow-up PUT. In dry-run mode a placeholder
    with id "DRY_RUN" is returned.

    Raises:
        RuntimeError: The route was created but could not be found again, so
            it cannot be linked to the subscription.
    """
    name = spec["routes"]["composite"]["name"]
    existing = get_or_none(base, _with_query("/routes", name=name), auth, "name", name)
    if existing:
        # NOTE(review): an existing route is returned without checking that it
        # is linked to sub_id — confirm whether re-linking is needed here.
        print(f" [OK] COMPOSITE route '{name}' exists (id: {existing.get('id')})")
        return existing
    if dry_run:
        print(f" [DRY] Would CREATE COMPOSITE route '{name}' and link to subscription")
        return {"id": "DRY_RUN", "name": name}

    payload = {
        "name": name,
        "type": "COMPOSITE",
        "conditionType": "MATCH_ALL",
        "routeTemplate": template_id,
        "steps": [],
    }
    api(base, "POST", "/routes", auth, payload)
    obj = _fetch_created(base, auth, "/routes", name)
    comp_id = obj.get("id")
    if comp_id is None:
        # Without an id the link request would target /routes/None.
        raise RuntimeError(
            f"COMPOSITE route '{name}' was created but could not be found by name; "
            f"cannot link it to subscription {sub_id}"
        )
    print(f" [CREATED] COMPOSITE route '{name}' (id: {comp_id})")

    # Link to subscription: resend the fetched route with the subscription set.
    link_payload = dict(obj)
    link_payload["subscriptions"] = [sub_id]
    api(base, "PUT", f"/routes/{comp_id}", auth, link_payload)
    print(f" [LINKED] COMPOSITE route → subscription {sub_id}")
    return obj


def main():
    """Parse arguments, run the pre-flight check, then deploy the flow.

    Exits with status 1 on a deploy error and 2 when the spec cannot be
    loaded, lacks a required key, or fails the prerequisite check.
    """
    parser = argparse.ArgumentParser(description="Idempotent deploy of ST AR flow from spec YAML")
    parser.add_argument("--spec", required=True, help="Path to flow spec YAML")
    parser.add_argument("--host", required=True)
    parser.add_argument("--port", default="444")
    parser.add_argument("--user", required=True)
    parser.add_argument("--pass", dest="password", required=True)
    parser.add_argument("--dry-run", action="store_true", help="Show planned actions without executing")
    args = parser.parse_args()

    try:
        with open(args.spec, encoding="utf-8") as f:
            spec = yaml.safe_load(f)
    except (OSError, yaml.YAMLError) as e:
        print(f"Cannot load spec '{args.spec}': {e}", file=sys.stderr)
        sys.exit(2)
    if not isinstance(spec, dict):
        print(f"Spec '{args.spec}' must be a YAML mapping", file=sys.stderr)
        sys.exit(2)

    base = f"https://{args.host}:{args.port}/api/v2.0"
    auth = base64.b64encode(f"{args.user}:{args.password}".encode()).decode()
    prefix = "[DRY RUN] " if args.dry_run else ""

    try:
        meta = spec["meta"]
        print(f"\n{prefix}Deploying spec: {meta['name']}")
        print(f" {meta.get('description','')}\n")
    except KeyError as e:
        print(f"Spec error: missing required key {e}", file=sys.stderr)
        sys.exit(2)

    # Pre-flight: validate prerequisites
    print("Pre-flight check...")
    env_data = snapshot(args.host, args.port, args.user, args.password)
    prereqs = spec.get("prerequisites", {})
    missing = validate_spec_prerequisites(env_data, prereqs)
    if missing:
        print("PRE-FLIGHT FAILED:")
        for m in missing:
            print(f" ✗ {m}")
        sys.exit(2)
    print(" Pre-flight passed\n")

    print("Deploying...")
    try:
        ensure_application(base, auth, spec, args.dry_run)
        sub = ensure_subscription(base, auth, spec, args.dry_run)
        simple = ensure_simple_route(base, auth, spec, args.dry_run)
        template = ensure_template_route(
            base, auth, spec, _require_id(simple, "SIMPLE route"), args.dry_run
        )
        ensure_composite_route(
            base, auth, spec,
            _require_id(template, "TEMPLATE route"),
            _require_id(sub, "Subscription"),
            args.dry_run,
        )
        print(f"\n{prefix}Deploy complete.")
    except RuntimeError as e:
        print(f"\nDeploy error: {e}", file=sys.stderr)
        sys.exit(1)
    except KeyError as e:
        # The ensure_* functions and build_step index the spec directly.
        print(f"\nSpec error: missing required key {e}", file=sys.stderr)
        sys.exit(2)


if __name__ == "__main__":
    main()