Add scripts/st_deploy.py

This commit is contained in:
2026-03-04 10:14:41 +00:00
parent 5434657f24
commit 1f728cd8af

285
scripts/st_deploy.py Normal file
View File

@@ -0,0 +1,285 @@
#!/usr/bin/env python3
"""
st_deploy.py — Idempotent deploy of an ST AR flow from a declarative YAML spec.
Usage:
python scripts/st_deploy.py --spec specs/compress-pgp-passthrough.yaml \\
--host 192.168.0.245 --port 444 --user admin --pass PASSWORD
python scripts/st_deploy.py --spec specs/... --host ... --dry-run
Exit codes:
0 = success
1 = deploy error
2 = validation/prerequisite error
"""
import argparse
import json
import sys
import ssl
import base64
import urllib.request
import urllib.error
try:
import yaml
except ImportError:
print("PyYAML required: pip install pyyaml", file=sys.stderr)
sys.exit(2)
from st_env_snapshot import snapshot, validate_spec_prerequisites
CTX = ssl.create_default_context()
CTX.check_hostname = False
CTX.verify_mode = ssl.CERT_NONE
def api(base_url, method, path, auth, body=None):
url = f"{base_url}{path}"
data = json.dumps(body).encode() if body is not None else None
req = urllib.request.Request(url, data=data, method=method)
req.add_header("Authorization", f"Basic {auth}")
req.add_header("Accept", "application/json")
if data:
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, context=CTX, timeout=15) as r:
text = r.read()
return json.loads(text) if text.strip() else {}
except urllib.error.HTTPError as e:
body_text = e.read().decode(errors="replace")
raise RuntimeError(f"HTTP {e.code} {method} {path}: {body_text[:300]}")
def get_or_none(base, path, auth, name_field="name", name_value=None):
"""Return first matching item by name, or None."""
try:
result = api(base, "GET", path, auth)
items = result if isinstance(result, list) else result.get("result", [])
if name_value:
for item in items:
if item.get(name_field) == name_value:
return item
return None
except Exception:
return None
def ensure_application(base, auth, spec, dry_run):
app_spec = spec["application"]
name = app_spec["name"]
existing = get_or_none(base, f"/applications?name={name}", auth, "name", name)
if existing:
print(f" [OK] Application '{name}' exists (id: {existing.get('id')})")
return existing
if dry_run:
print(f" [DRY] Would CREATE application '{name}'")
return {"id": "DRY_RUN", "name": name}
result = api(base, "POST", "/applications", auth, {"name": name, "type": "AdvancedRouting"})
created = api(base, "GET", f"/applications?name={name}", auth)
items = created if isinstance(created, list) else created.get("result", [])
obj = next((x for x in items if x["name"] == name), {"name": name})
print(f" [CREATED] Application '{name}' (id: {obj.get('id')})")
return obj
def ensure_subscription(base, auth, spec, dry_run):
sub_spec = spec["subscription"]
account = sub_spec["account"]
folder = sub_spec["folder"]
app_name = sub_spec["application"]
existing = get_or_none(base, f"/subscriptions?account={account}", auth, "folder", folder)
if existing:
print(f" [OK] Subscription exists (account={account}, folder={folder}, id: {existing.get('id')})")
return existing
if dry_run:
print(f" [DRY] Would CREATE subscription (account={account}, folder={folder})")
return {"id": "DRY_RUN"}
pta = sub_spec.get("post_transmission_actions", {
"ptaOnSuccessDoInAdvancedRoutingWildcardPull": True,
"submitFilenamePatternExpression": "*",
"submitFilterType": "FILENAME_PATTERN",
"triggerFileOption": "fail",
"triggerOnSuccessfulWildcardPull": True
})
payload = {
"type": "AdvancedRouting",
"folder": folder,
"account": account,
"application": app_name,
"postTransmissionActions": {
"ptaOnSuccessDoInAdvancedRoutingWildcardPull": pta.get("ptaOnSuccessDoInAdvancedRoutingWildcardPull", True),
"submitFilenamePatternExpression": pta.get("submitFilenamePatternExpression", "*"),
"submitFilterType": pta.get("submitFilterType", "FILENAME_PATTERN"),
"triggerFileOption": pta.get("triggerFileOption", "fail"),
"triggerOnSuccessfulWildcardPull": pta.get("triggerOnSuccessfulWildcardPull", True)
}
}
result = api(base, "POST", "/subscriptions", auth, payload)
sub_id = result.get("id", "?")
print(f" [CREATED] Subscription (account={account}, folder={folder}, id={sub_id})")
return result
def build_step(step_spec):
"""Convert spec step dict to ST API step payload. Omit id and precedingStep."""
step_type = step_spec["type"]
base = {
"type": step_type,
"status": "ENABLED",
"autostart": False,
"conditionType": step_spec.get("conditionType", "ALWAYS"),
"condition": step_spec.get("condition"),
"actionOnStepFailure": step_spec.get("actionOnStepFailure", "PROCEED"),
"fileFilterExpression": step_spec.get("fileFilterExpression", "*"),
"fileFilterExpressionType": step_spec.get("fileFilterExpressionType", "GLOB"),
"usePrecedingStepFiles": step_spec.get("usePrecedingStepFiles", False),
}
if step_type == "Compress":
base.update({
"singleArchiveEnabled": step_spec.get("singleArchiveEnabled", False),
"compressionMode": step_spec.get("compressionMode", "ZIP"),
})
elif step_type == "PgpEncryption":
base.update({
"encryptKeyExpression": step_spec["encryptKeyExpression"],
"encryptKeyExpressionType": step_spec.get("encryptKeyExpressionType", "ALIAS"),
"encryptKeyOwnerExpression": step_spec["encryptKeyOwnerExpression"],
"encryptKeyOwnerExpressionType": step_spec.get("encryptKeyOwnerExpressionType", "NAME"),
"compressionType": step_spec.get("compressionType", "0"),
"useAsciiArmour": step_spec.get("useAsciiArmour", False),
"signKeyExpression": step_spec.get("signKeyExpression"),
"signKeyExpressionType": step_spec.get("signKeyExpressionType"),
"signKeyOwnerExpression": step_spec.get("signKeyOwnerExpression"),
"signKeyOwnerExpressionType": step_spec.get("signKeyOwnerExpressionType"),
})
elif step_type == "SendToPartner":
base.update({
"partnerName": step_spec["partnerName"],
"partnerSite": step_spec["partnerSite"],
})
elif step_type == "ExternalScript":
base.update({
"scriptPath": step_spec["scriptPath"],
"outputFileExpression": step_spec.get("outputFileExpression"),
})
return base
def ensure_simple_route(base, auth, spec, dry_run):
route_spec = spec["routes"]["simple"]
name = route_spec["name"]
existing = get_or_none(base, f"/routes?name={name}", auth, "name", name)
if existing:
print(f" [OK] SIMPLE route '{name}' exists (id: {existing.get('id')})")
return existing
if dry_run:
print(f" [DRY] Would CREATE SIMPLE route '{name}' with {len(route_spec['steps'])} steps")
return {"id": "DRY_RUN", "name": name}
steps = [build_step(s) for s in route_spec.get("steps", [])]
payload = {
"name": name,
"type": "SIMPLE",
"conditionType": route_spec.get("conditionType", "ALWAYS"),
"condition": route_spec.get("condition"),
"steps": steps
}
result = api(base, "POST", "/routes", auth, payload)
created = api(base, "GET", f"/routes?name={name}", auth)
items = created if isinstance(created, list) else created.get("result", [])
obj = next((x for x in items if x["name"] == name), {"name": name})
print(f" [CREATED] SIMPLE route '{name}' (id: {obj.get('id')})")
return obj
def ensure_template_route(base, auth, spec, simple_id, dry_run):
route_spec = spec["routes"]["template"]
name = route_spec["name"]
existing = get_or_none(base, f"/routes?name={name}", auth, "name", name)
if existing:
print(f" [OK] TEMPLATE route '{name}' exists (id: {existing.get('id')})")
return existing
if dry_run:
print(f" [DRY] Would CREATE TEMPLATE route '{name}'")
return {"id": "DRY_RUN", "name": name}
payload = {
"name": name,
"type": "TEMPLATE",
"conditionType": "MATCH_ALL",
"steps": [{"type": "ExecuteRoute", "executeRoute": simple_id, "status": "ENABLED", "autostart": False}]
}
result = api(base, "POST", "/routes", auth, payload)
created = api(base, "GET", f"/routes?name={name}", auth)
items = created if isinstance(created, list) else created.get("result", [])
obj = next((x for x in items if x["name"] == name), {"name": name})
print(f" [CREATED] TEMPLATE route '{name}' (id: {obj.get('id')})")
return obj
def ensure_composite_route(base, auth, spec, template_id, sub_id, dry_run):
route_spec = spec["routes"]["composite"]
name = route_spec["name"]
existing = get_or_none(base, f"/routes?name={name}", auth, "name", name)
if existing:
print(f" [OK] COMPOSITE route '{name}' exists (id: {existing.get('id')})")
return existing
if dry_run:
print(f" [DRY] Would CREATE COMPOSITE route '{name}' and link to subscription")
return {"id": "DRY_RUN", "name": name}
payload = {
"name": name,
"type": "COMPOSITE",
"conditionType": "MATCH_ALL",
"routeTemplate": template_id,
"steps": []
}
result = api(base, "POST", "/routes", auth, payload)
created = api(base, "GET", f"/routes?name={name}", auth)
items = created if isinstance(created, list) else created.get("result", [])
obj = next((x for x in items if x["name"] == name), {"name": name})
comp_id = obj.get("id")
print(f" [CREATED] COMPOSITE route '{name}' (id: {comp_id})")
# Link to subscription
link_payload = dict(obj)
link_payload["subscriptions"] = [sub_id]
api(base, "PUT", f"/routes/{comp_id}", auth, link_payload)
print(f" [LINKED] COMPOSITE route → subscription {sub_id}")
return obj
def main():
parser = argparse.ArgumentParser(description="Idempotent deploy of ST AR flow from spec YAML")
parser.add_argument("--spec", required=True, help="Path to flow spec YAML")
parser.add_argument("--host", required=True)
parser.add_argument("--port", default="444")
parser.add_argument("--user", required=True)
parser.add_argument("--pass", dest="password", required=True)
parser.add_argument("--dry-run", action="store_true", help="Show planned actions without executing")
args = parser.parse_args()
with open(args.spec) as f:
spec = yaml.safe_load(f)
base = f"https://{args.host}:{args.port}/api/v2.0"
auth = base64.b64encode(f"{args.user}:{args.password}".encode()).decode()
print(f"\n{'[DRY RUN] ' if args.dry_run else ''}Deploying spec: {spec['meta']['name']}")
print(f" {spec['meta'].get('description','')}\n")
# Pre-flight: validate prerequisites
print("Pre-flight check...")
env_data = snapshot(args.host, args.port, args.user, args.password)
prereqs = spec.get("prerequisites", {})
missing = validate_spec_prerequisites(env_data, prereqs)
if missing:
print("PRE-FLIGHT FAILED:")
for m in missing:
print(f"{m}")
sys.exit(2)
print(" Pre-flight passed\n")
print("Deploying...")
try:
app = ensure_application(base, auth, spec, args.dry_run)
sub = ensure_subscription(base, auth, spec, args.dry_run)
simple = ensure_simple_route(base, auth, spec, args.dry_run)
template = ensure_template_route(base, auth, spec, simple["id"], args.dry_run)
composite = ensure_composite_route(base, auth, spec, template["id"], sub["id"], args.dry_run)
print(f"\n{'[DRY RUN] ' if args.dry_run else ''}Deploy complete.")
except RuntimeError as e:
print(f"\nDeploy error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()