From 55ad444c0404394cddc860918e210282620611fc Mon Sep 17 00:00:00 2001 From: Conan Scott Date: Wed, 4 Mar 2026 15:47:56 +0000 Subject: [PATCH] =?UTF-8?q?Add=20st=5Fclient.py=20=E2=80=94=20typed=20AR?= =?UTF-8?q?=20flow=20client=20library?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- st_client.py | 531 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 531 insertions(+) create mode 100644 st_client.py diff --git a/st_client.py b/st_client.py new file mode 100644 index 0000000..e5cb432 --- /dev/null +++ b/st_client.py @@ -0,0 +1,531 @@ +#!/usr/bin/env python3 +""" +st_client.py — SecureTransport AR Flow Client Library + +Abstracts the ST REST API quirks so callers work with typed objects, +not raw JSON payloads. All known field requirements, mandatory defaults, +and undocumented formats are encoded here. + +Usage: + import sys + sys.path.insert(0, '/home/node/.openclaw/site-packages') + from st_client import STClient, CompressStep, PgpEncryptStep, SendToPartnerStep + + st = STClient("192.168.0.245", "admin", "openclaw") + flow = st.create_ar_flow( + name="floweng", + account="conan", + folder="/floweng", + steps=[ + PgpEncryptStep(filter="*.jpg", key="test-pgp-public", key_owner="conan"), + CompressStep(filter="*.txt"), + SendToPartnerStep(site="clawdbox-partner-site-1771557836"), + ] + ) + st.delete_ar_flow("floweng") +""" + +import sys +import json +import time +import io +import logging + +sys.path.insert(0, '/home/node/.openclaw/site-packages') +import requests +import paramiko +import urllib3 + +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + +log = logging.getLogger("st_client") + + +# --------------------------------------------------------------------------- +# Step definitions +# --------------------------------------------------------------------------- + +class CompressStep: + """Compress matching files to ZIP. Non-matching files pass through.""" + def __init__(self, filter="*", level="NORMAL", single_archive=False, + on_failure="PROCEED"): + self.filter = filter + self.level = level + self.single_archive = single_archive + self.on_failure = on_failure + + def to_api(self): + payload = { + "type": "Compress", + "status": "ENABLED", + "conditionType": "ALWAYS", + "actionOnStepFailure": self.on_failure, + "fileFilterExpression": self.filter, + "fileFilterExpressionType": "GLOB", + "usePrecedingStepFiles": False, + "compressionType": "ZIP", + # Empirically validated values: STORE FASTEST FAST NORMAL GOOD BETTER BEST + "compressionLevel": self.level, + "singleArchiveEnabled": self.single_archive, + "zipPassword": "", + } + return payload + + +class PgpEncryptStep: + """PGP-encrypt matching files. Non-matching files pass through.""" + def __init__(self, filter="*", key=None, key_owner=None, + ascii_armour=False, on_failure="PROCEED"): + if not key: + raise ValueError("PgpEncryptStep requires key (cert alias name)") + if not key_owner: + raise ValueError("PgpEncryptStep requires key_owner (account name)") + self.filter = filter + self.key = key + self.key_owner = key_owner + self.ascii_armour = ascii_armour + self.on_failure = on_failure + + def to_api(self): + return { + "type": "PgpEncryption", + "status": "ENABLED", + "conditionType": "ALWAYS", + "actionOnStepFailure": self.on_failure, + "fileFilterExpression": self.filter, + "fileFilterExpressionType": "GLOB", + "usePrecedingStepFiles": False, + # ALIAS = use cert name directly; EXPRESSION_WILDCARD = EL expression + "encryptKeyExpression": self.key, + "encryptKeyExpressionType": "ALIAS", + "encryptKeyOwnerExpression": self.key_owner, + "encryptKeyOwnerExpressionType": "NAME", + # "0" disables PGP-level compression (use Compress step instead) + "compressionType": "0", + "useAsciiArmour": self.ascii_armour, + } + + +class SendToPartnerStep: + """Deliver all files in route payload to a partner transfer site.""" + def __init__(self, site=None, filter="*", on_failure="FAIL"): + if not site: + raise ValueError("SendToPartnerStep requires site (transfer site name)") + self.site = site + self.filter = filter + self.on_failure = on_failure + + def to_api(self): + return { + "type": "SendToPartner", + "status": "ENABLED", + "conditionType": "ALWAYS", + "actionOnStepFailure": self.on_failure, + "fileFilterExpression": self.filter, + "fileFilterExpressionType": "GLOB", + "usePrecedingStepFiles": False, + # API requires LIST type with #!#CVD#!# suffix — not documented publicly + "transferSiteExpression": f"{self.site}#!#CVD#!#", + "transferSiteExpressionType": "LIST", + } + + +# --------------------------------------------------------------------------- +# AR Flow result object +# --------------------------------------------------------------------------- + +class ARFlow: + """Holds IDs for all objects in a created AR flow.""" + def __init__(self, name, app_id, sub_id, simple_id, template_id, composite_id): + self.name = name + self.app_id = app_id + self.sub_id = sub_id + self.simple_id = simple_id + self.template_id = template_id + self.composite_id = composite_id + + def __repr__(self): + return (f"ARFlow({self.name!r}, app={self.app_id}, sub={self.sub_id}, " + f"simple={self.simple_id}, template={self.template_id}, " + f"composite={self.composite_id})") + + +# --------------------------------------------------------------------------- +# Main client +# --------------------------------------------------------------------------- + +class STClient: + """ + SecureTransport REST API client. + + Handles auth, TLS, empty-body responses, and the AR object graph. + All API quirks are encoded here — callers work with typed objects. + """ + + def __init__(self, host, user, password, port=444, verify=False): + self.base = f"https://{host}:{port}/api/v2.0" + self.base_v14 = f"https://{host}:{port}/api/v1.4" + self.session = requests.Session() + self.session.auth = (user, password) + self.session.verify = verify + self.session.headers.update({"Accept": "application/json", + "Content-Type": "application/json"}) + self._host = host + self._port = port + + # -- Low-level helpers -------------------------------------------------- + + def _get(self, path, v14=False): + base = self.base_v14 if v14 else self.base + r = self.session.get(f"{base}{path}") + r.raise_for_status() + return r.json() + + def _post(self, path, body): + r = self.session.post(f"{self.base}{path}", json=body) + if r.status_code not in (200, 201, 204): + try: + detail = r.json() + except Exception: + detail = r.text + raise RuntimeError(f"POST {path} failed {r.status_code}: {detail}") + # Some endpoints return empty body on success + if r.content.strip(): + return r.json() + return {} + + def _put(self, path, body): + r = self.session.put(f"{self.base}{path}", json=body) + if r.status_code not in (200, 201, 204): + try: + detail = r.json() + except Exception: + detail = r.text + raise RuntimeError(f"PUT {path} failed {r.status_code}: {detail}") + if r.content.strip(): + return r.json() + return {} + + def _delete(self, path): + r = self.session.delete(f"{self.base}{path}") + if r.status_code not in (200, 204): + try: + detail = r.json() + except Exception: + detail = r.text + raise RuntimeError(f"DELETE {path} failed {r.status_code}: {detail}") + + def _find_by_name(self, path, name): + """Return first result with matching name, or None.""" + data = self._get(f"{path}?name={requests.utils.quote(name)}&limit=10") + for item in data.get("result", []): + if item.get("name") == name: + return item + return None + + # -- Snapshot ----------------------------------------------------------- + + def snapshot(self): + """Return a summary of the live ST environment.""" + accounts_raw = self._get("/accounts?limit=200").get("result", []) + sites_raw = self._get("/sites?limit=100").get("result", []) + certs_raw = self._get("/certificates?limit=200").get("result", []) + apps_raw = self._get("/applications?limit=100").get("result", []) + subs_raw = self._get("/subscriptions?limit=100").get("result", []) + + return { + "accounts": [ + {"name": a["name"], "type": a.get("type"), + "locked": a.get("user", {}).get("locked", False)} + for a in accounts_raw if a.get("type") in ("user",) + ], + "sites": [ + {"name": s["name"], "protocol": s.get("protocol"), + "host": s.get("host"), "port": s.get("port")} + for s in sites_raw + ], + "certificates": [ + {"name": c["name"], "type": c.get("type"), + "usage": c.get("usage"), "account": c.get("account"), + "accessLevel": c.get("accessLevel")} + for c in certs_raw + if c.get("type") in ("pgp", "ssh") + ], + "ar_applications": [ + {"name": a["name"], "id": a["id"]} + for a in apps_raw + if a.get("type") == "AdvancedRouting" + ], + "subscriptions": [ + {"id": s["id"], "account": s.get("account"), + "folder": s.get("folder"), "application": s.get("application")} + for s in subs_raw + ], + } + + # -- AR Flow lifecycle -------------------------------------------------- + + def create_ar_flow(self, name, account, folder, steps, + pta_trigger=True): + """ + Create a complete AR flow: Application → Subscription → + SIMPLE route → TEMPLATE route → COMPOSITE route → link. + + Returns an ARFlow object with all created IDs. + Raises on any step failure — no partial state left unchecked. + """ + app_name = f"{name}-ar" + simple_name = f"{name}-simple" + template_name = f"{name}-template" + composite_name = f"{name}-composite" + + log.info(f"Creating AR Application: {app_name}") + existing_app = self._find_by_name("/applications", app_name) + if existing_app: + log.info(f" Application already exists: {existing_app['id']}") + app_id = existing_app["id"] + else: + self._post("/applications", {"name": app_name, "type": "AdvancedRouting"}) + app_id = self._find_by_name("/applications", app_name)["id"] + log.info(f" app_id={app_id}") + + log.info(f"Creating Subscription: account={account} folder={folder}") + existing_sub = None + subs = self._get(f"/subscriptions?account={account}&limit=100").get("result", []) + for s in subs: + if s.get("folder") == folder and s.get("application") == app_name: + existing_sub = s + break + if existing_sub: + log.info(f" Subscription already exists: {existing_sub['id']}") + sub_id = existing_sub["id"] + else: + pta = { + "ptaOnSuccessDoInAdvancedRoutingWildcardPull": pta_trigger, + "submitFilenamePatternExpression": "*", + "submitFilterType": "FILENAME_PATTERN", + "triggerFileOption": "fail", + "triggerOnSuccessfulWildcardPull": pta_trigger, + } + self._post("/subscriptions", { + "type": "AdvancedRouting", + "folder": folder, + "account": account, + "application": app_name, + "postTransmissionActions": pta, + }) + subs = self._get(f"/subscriptions?account={account}&limit=100").get("result", []) + sub_id = next(s["id"] for s in subs + if s.get("folder") == folder and s.get("application") == app_name) + log.info(f" sub_id={sub_id}") + + log.info(f"Creating SIMPLE route: {simple_name}") + existing_simple = self._find_by_name("/routes", simple_name) + if existing_simple: + log.info(f" SIMPLE already exists: {existing_simple['id']}") + simple_id = existing_simple["id"] + else: + step_payloads = [s.to_api() for s in steps] + self._post("/routes", { + "name": simple_name, + "type": "SIMPLE", + "conditionType": "ALWAYS", + "steps": step_payloads, + }) + simple_id = self._find_by_name("/routes", simple_name)["id"] + log.info(f" simple_id={simple_id}") + + log.info(f"Creating TEMPLATE route: {template_name}") + existing_template = self._find_by_name("/routes", template_name) + if existing_template: + log.info(f" TEMPLATE already exists: {existing_template['id']}") + template_id = existing_template["id"] + else: + # TEMPLATE conditionType quirk: must be in payload but API rejects + # it as "unsupported parameter" in multiline payloads. Send as a + # pre-serialised string to avoid requests library re-serialising with + # newlines. Use data= not json= to control the wire format exactly. + template_payload = json.dumps({ + "name": template_name, + "type": "TEMPLATE", + "conditionType": "MATCH_ALL", + "steps": [{"type": "ExecuteRoute", "status": "ENABLED", + "executeRoute": simple_id, "autostart": False}] + }) + r = self.session.post(f"{self.base}/routes", data=template_payload) + if r.status_code not in (200, 201, 204): + raise RuntimeError( + f"POST TEMPLATE failed {r.status_code}: {r.text}") + template_id = self._find_by_name("/routes", template_name)["id"] + log.info(f" template_id={template_id}") + + log.info(f"Creating COMPOSITE route: {composite_name}") + existing_composite = self._find_by_name("/routes", composite_name) + if existing_composite: + log.info(f" COMPOSITE already exists: {existing_composite['id']}") + composite_id = existing_composite["id"] + else: + self._post("/routes", { + "name": composite_name, + "type": "COMPOSITE", + "conditionType": "MATCH_ALL", + "routeTemplate": template_id, + "steps": [], + }) + composite_id = self._find_by_name("/routes", composite_name)["id"] + log.info(f" composite_id={composite_id}") + + log.info(f"Linking COMPOSITE to subscription") + comp_full = self._get(f"/routes/{composite_id}") + if sub_id not in comp_full.get("subscriptions", []): + comp_full["subscriptions"] = [sub_id] + for field in ("id", "metadata", "steps", "routeTemplateName", "account"): + comp_full.pop(field, None) + self._put(f"/routes/{composite_id}", comp_full) + log.info(f" Linked.") + else: + log.info(f" Already linked.") + + flow = ARFlow(name, app_id, sub_id, simple_id, template_id, composite_id) + log.info(f"Flow created: {flow}") + return flow + + def delete_ar_flow(self, name): + """ + Delete a complete AR flow by name prefix. + Deletion order: unlink COMPOSITE → delete COMPOSITE → + delete TEMPLATE → delete SIMPLE → delete subscription → delete application. + """ + composite_name = f"{name}-composite" + template_name = f"{name}-template" + simple_name = f"{name}-simple" + app_name = f"{name}-ar" + + # 1. Unlink and delete COMPOSITE + composite = self._find_by_name("/routes", composite_name) + if composite: + cid = composite["id"] + full = self._get(f"/routes/{cid}") + if full.get("subscriptions"): + full["subscriptions"] = [] + for field in ("id", "metadata", "steps", "routeTemplateName", "account"): + full.pop(field, None) + self._put(f"/routes/{cid}", full) + log.info(f"Unlinked COMPOSITE {cid}") + self._delete(f"/routes/{cid}") + log.info(f"Deleted COMPOSITE {cid}") + + # 2. Delete TEMPLATE + template = self._find_by_name("/routes", template_name) + if template: + self._delete(f"/routes/{template['id']}") + log.info(f"Deleted TEMPLATE {template['id']}") + + # 3. Delete SIMPLE + simple = self._find_by_name("/routes", simple_name) + if simple: + self._delete(f"/routes/{simple['id']}") + log.info(f"Deleted SIMPLE {simple['id']}") + + # 4. Delete subscription + subs = self._get("/subscriptions?limit=100").get("result", []) + for s in subs: + if s.get("application") == app_name: + self._delete(f"/subscriptions/{s['id']}") + log.info(f"Deleted subscription {s['id']}") + + # 5. Delete application + app = self._find_by_name("/applications", app_name) + if app: + self._delete(f"/applications/{app['id']}") + log.info(f"Deleted application {app['id']}") + + log.info(f"Flow '{name}' deleted.") + + # -- TM health check ---------------------------------------------------- + + def verify_routing(self, sftp_host, sftp_port, sftp_user, sftp_password, + upload_folder, dest_host, dest_port, dest_user, dest_key, + dest_folder, probe_filename="tm-probe.txt", + timeout=30): + """ + Upload a probe file via SFTP, wait for routing to deliver it, + verify arrival at destination. Returns True on success. + + Raises TMHealthError if TM does not process within timeout. + """ + probe_content = f"tm-probe {time.time()}\n".encode() + + log.info(f"Uploading probe: {upload_folder}/{probe_filename}") + transport = paramiko.Transport((sftp_host, sftp_port)) + transport.connect(username=sftp_user, password=sftp_password) + sftp = paramiko.SFTPClient.from_transport(transport) + try: + sftp.chdir(upload_folder) + except IOError: + sftp.mkdir(upload_folder) + sftp.chdir(upload_folder) + sftp.putfo(io.BytesIO(probe_content), probe_filename) + sftp.close() + transport.close() + log.info(" Probe uploaded.") + + log.info(f"Waiting for delivery to {dest_folder}/{probe_filename}") + deadline = time.time() + timeout + key = paramiko.RSAKey.from_private_key_file(dest_key) if dest_key else None + while time.time() < deadline: + time.sleep(3) + try: + t2 = paramiko.Transport((dest_host, dest_port)) + if key: + t2.connect(username=dest_user, pkey=key) + else: + t2.connect(username=dest_user) + s2 = paramiko.SFTPClient.from_transport(t2) + files = s2.listdir(dest_folder) + s2.close(); t2.close() + if probe_filename in files: + log.info(f" Probe arrived. TM healthy.") + return True + except Exception as e: + log.debug(f" Destination check: {e}") + raise TMHealthError( + f"Probe '{probe_filename}' not found in '{dest_folder}' after {timeout}s. " + f"TM may not be running or subscription not registered. " + f"Try: ST Admin UI → Server Configuration → Transaction Manager → Restart" + ) + + +class TMHealthError(Exception): + pass + + +# --------------------------------------------------------------------------- +# CLI entry point +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + import argparse + logging.basicConfig(level=logging.INFO, format="%(message)s") + + parser = argparse.ArgumentParser(description="ST Client CLI") + parser.add_argument("--host", required=True) + parser.add_argument("--port", type=int, default=444) + parser.add_argument("--user", default="admin") + parser.add_argument("--pass", dest="password", default="openclaw") + sub = parser.add_subparsers(dest="cmd") + + sub.add_parser("snapshot") + + d = sub.add_parser("delete-flow") + d.add_argument("name") + + args = parser.parse_args() + st = STClient(args.host, args.user, args.password, args.port) + + if args.cmd == "snapshot": + print(json.dumps(st.snapshot(), indent=2)) + elif args.cmd == "delete-flow": + st.delete_ar_flow(args.name) + else: + parser.print_help()