From e827c184331183bf80c409ad781aec7088367b2f Mon Sep 17 00:00:00 2001 From: Conan Scott Date: Wed, 4 Mar 2026 17:36:02 +0000 Subject: [PATCH] refactor: primitives-only client, drop create_ar_flow factory --- st_client.py | 590 ++++++++++++++++++++++++--------------------------- 1 file changed, 273 insertions(+), 317 deletions(-) diff --git a/st_client.py b/st_client.py index e5cb432..37eb09b 100644 --- a/st_client.py +++ b/st_client.py @@ -1,10 +1,9 @@ #!/usr/bin/env python3 """ -st_client.py — SecureTransport AR Flow Client Library +st_client.py — SecureTransport REST API Primitives -Abstracts the ST REST API quirks so callers work with typed objects, -not raw JSON payloads. All known field requirements, mandatory defaults, -and undocumented formats are encoded here. +Thin, correct bindings to the ST v2.0 REST API. +No flow architecture opinions here — composition is the caller's responsibility. Usage: import sys @@ -12,28 +11,37 @@ Usage: from st_client import STClient, CompressStep, PgpEncryptStep, SendToPartnerStep st = STClient("192.168.0.245", "admin", "openclaw") - flow = st.create_ar_flow( - name="floweng", - account="conan", - folder="/floweng", - steps=[ - PgpEncryptStep(filter="*.jpg", key="test-pgp-public", key_owner="conan"), - CompressStep(filter="*.txt"), - SendToPartnerStep(site="clawdbox-partner-site-1771557836"), - ] - ) - st.delete_ar_flow("floweng") + snap = st.snapshot() + + app_id = st.create_application("myflow-ar") + sub_id = st.create_subscription("conan", "/myflow", "myflow-ar") + simple_id = st.create_simple_route("myflow-simple", steps=[ + PgpEncryptStep(filter="*.jpg", key="my-pgp-cert", key_owner="conan"), + CompressStep(filter="*.txt"), + SendToPartnerStep(site="my-site", target_account="partner"), + ]) + template_id = st.create_template_route("myflow-template", simple_id) + composite_id = st.create_composite_route("myflow-composite", template_id) + st.link_subscription(composite_id, sub_id) + +Known API quirks encoded here (so you don't rediscover them): +- TEMPLATE route: must use data= not json= to avoid newline parse issue in conditionType +- conditionType must be explicit on every route and step; null causes NPE at runtime +- SendToPartner: transferSiteExpression requires #!#CVD#!# suffix for LIST type +- PgpEncryption: encryptKeyExpressionType only accepts ALIAS or EXPRESSION_WILDCARD +- PgpEncryption: cert must have accessLevel=PUBLIC and account= or routing engine can't see it +- Compress: singleArchiveEnabled=null treated as true by UI; always set explicitly +- Compress: compressionType "0" on PgpEncryptStep disables PGP-level compression (not a bug) +- SendToPartner: target_account controls whose sites are used — must match site owner's account +- POST /applications returns empty body on success; follow-up GET required for ID """ import sys import json -import time -import io import logging sys.path.insert(0, '/home/node/.openclaw/site-packages') import requests -import paramiko import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) @@ -42,7 +50,7 @@ log = logging.getLogger("st_client") # --------------------------------------------------------------------------- -# Step definitions +# Step definitions — encode all field quirks, expose only meaningful params # --------------------------------------------------------------------------- class CompressStep: @@ -55,7 +63,7 @@ class CompressStep: self.on_failure = on_failure def to_api(self): - payload = { + return { "type": "Compress", "status": "ENABLED", "conditionType": "ALWAYS", @@ -64,22 +72,22 @@ class CompressStep: "fileFilterExpressionType": "GLOB", "usePrecedingStepFiles": False, "compressionType": "ZIP", - # Empirically validated values: STORE FASTEST FAST NORMAL GOOD BETTER BEST + # Validated values: STORE FASTEST FAST NORMAL GOOD BETTER BEST "compressionLevel": self.level, + # null is treated as true by the UI — always set explicitly "singleArchiveEnabled": self.single_archive, "zipPassword": "", } - return payload class PgpEncryptStep: - """PGP-encrypt matching files. Non-matching files pass through.""" + """PGP-encrypt matching files. Cert must be PUBLIC and owned by key_owner.""" def __init__(self, filter="*", key=None, key_owner=None, ascii_armour=False, on_failure="PROCEED"): if not key: raise ValueError("PgpEncryptStep requires key (cert alias name)") if not key_owner: - raise ValueError("PgpEncryptStep requires key_owner (account name)") + raise ValueError("PgpEncryptStep requires key_owner (account name that owns the cert)") self.filter = filter self.key = key self.key_owner = key_owner @@ -96,27 +104,36 @@ class PgpEncryptStep: "fileFilterExpressionType": "GLOB", "usePrecedingStepFiles": False, # ALIAS = use cert name directly; EXPRESSION_WILDCARD = EL expression + # No other values accepted — will silently fail with PROCEED "encryptKeyExpression": self.key, "encryptKeyExpressionType": "ALIAS", "encryptKeyOwnerExpression": self.key_owner, "encryptKeyOwnerExpressionType": "NAME", - # "0" disables PGP-level compression (use Compress step instead) + # "0" disables PGP-level compression (ZIP handled by CompressStep) "compressionType": "0", "useAsciiArmour": self.ascii_armour, } class SendToPartnerStep: - """Deliver all files in route payload to a partner transfer site.""" - def __init__(self, site=None, filter="*", on_failure="FAIL"): + """Deliver files to a partner transfer site. + + target_account: the account that owns the transfer site. The AR flow runs + in the inbound account's context — if the site belongs to a different account + (e.g. a dedicated partner account), set target_account to that account name. + Omit if the site is owned by the same account running the flow. + """ + def __init__(self, site=None, filter="*", target_account=None, + on_failure="FAIL"): if not site: raise ValueError("SendToPartnerStep requires site (transfer site name)") self.site = site self.filter = filter + self.target_account = target_account self.on_failure = on_failure def to_api(self): - return { + payload = { "type": "SendToPartner", "status": "ENABLED", "conditionType": "ALWAYS", @@ -124,98 +141,68 @@ class SendToPartnerStep: "fileFilterExpression": self.filter, "fileFilterExpressionType": "GLOB", "usePrecedingStepFiles": False, - # API requires LIST type with #!#CVD#!# suffix — not documented publicly + # LIST type requires #!#CVD#!# suffix — not in public docs "transferSiteExpression": f"{self.site}#!#CVD#!#", "transferSiteExpressionType": "LIST", } + if self.target_account: + payload["targetAccountExpression"] = self.target_account + payload["targetAccountExpressionType"] = "NAME" + return payload # --------------------------------------------------------------------------- -# AR Flow result object -# --------------------------------------------------------------------------- - -class ARFlow: - """Holds IDs for all objects in a created AR flow.""" - def __init__(self, name, app_id, sub_id, simple_id, template_id, composite_id): - self.name = name - self.app_id = app_id - self.sub_id = sub_id - self.simple_id = simple_id - self.template_id = template_id - self.composite_id = composite_id - - def __repr__(self): - return (f"ARFlow({self.name!r}, app={self.app_id}, sub={self.sub_id}, " - f"simple={self.simple_id}, template={self.template_id}, " - f"composite={self.composite_id})") - - -# --------------------------------------------------------------------------- -# Main client +# Main client — primitives only # --------------------------------------------------------------------------- class STClient: """ - SecureTransport REST API client. + SecureTransport REST API client — primitive operations only. - Handles auth, TLS, empty-body responses, and the AR object graph. - All API quirks are encoded here — callers work with typed objects. + Handles auth, TLS quirks, and empty-body responses. + Callers are responsible for object ordering and composition. """ def __init__(self, host, user, password, port=444, verify=False): self.base = f"https://{host}:{port}/api/v2.0" - self.base_v14 = f"https://{host}:{port}/api/v1.4" self.session = requests.Session() self.session.auth = (user, password) self.session.verify = verify self.session.headers.update({"Accept": "application/json", "Content-Type": "application/json"}) - self._host = host - self._port = port - # -- Low-level helpers -------------------------------------------------- + # -- Low-level HTTP helpers --------------------------------------------- - def _get(self, path, v14=False): - base = self.base_v14 if v14 else self.base - r = self.session.get(f"{base}{path}") + def _get(self, path): + r = self.session.get(f"{self.base}{path}") r.raise_for_status() return r.json() def _post(self, path, body): r = self.session.post(f"{self.base}{path}", json=body) if r.status_code not in (200, 201, 204): - try: - detail = r.json() - except Exception: - detail = r.text - raise RuntimeError(f"POST {path} failed {r.status_code}: {detail}") - # Some endpoints return empty body on success - if r.content.strip(): - return r.json() - return {} + raise RuntimeError(f"POST {path} → {r.status_code}: {_err(r)}") + return r.json() if r.content.strip() else {} + + def _post_raw(self, path, raw_str): + """POST with pre-serialised string body — avoids requests re-serialising.""" + r = self.session.post(f"{self.base}{path}", data=raw_str) + if r.status_code not in (200, 201, 204): + raise RuntimeError(f"POST {path} → {r.status_code}: {_err(r)}") + return r.json() if r.content.strip() else {} def _put(self, path, body): r = self.session.put(f"{self.base}{path}", json=body) if r.status_code not in (200, 201, 204): - try: - detail = r.json() - except Exception: - detail = r.text - raise RuntimeError(f"PUT {path} failed {r.status_code}: {detail}") - if r.content.strip(): - return r.json() - return {} + raise RuntimeError(f"PUT {path} → {r.status_code}: {_err(r)}") + return r.json() if r.content.strip() else {} def _delete(self, path): r = self.session.delete(f"{self.base}{path}") if r.status_code not in (200, 204): - try: - detail = r.json() - except Exception: - detail = r.text - raise RuntimeError(f"DELETE {path} failed {r.status_code}: {detail}") + raise RuntimeError(f"DELETE {path} → {r.status_code}: {_err(r)}") - def _find_by_name(self, path, name): + def _find(self, path, name): """Return first result with matching name, or None.""" data = self._get(f"{path}?name={requests.utils.quote(name)}&limit=10") for item in data.get("result", []): @@ -223,281 +210,242 @@ class STClient: return item return None - # -- Snapshot ----------------------------------------------------------- + # -- Introspection ------------------------------------------------------ def snapshot(self): - """Return a summary of the live ST environment.""" - accounts_raw = self._get("/accounts?limit=200").get("result", []) - sites_raw = self._get("/sites?limit=100").get("result", []) - certs_raw = self._get("/certificates?limit=200").get("result", []) - apps_raw = self._get("/applications?limit=100").get("result", []) - subs_raw = self._get("/subscriptions?limit=100").get("result", []) - + """Live environment summary — run before every build.""" + accounts = self._get("/accounts?limit=200").get("result", []) + sites = self._get("/sites?limit=100").get("result", []) + certs = self._get("/certificates?limit=200").get("result", []) + apps = self._get("/applications?limit=100").get("result", []) + subs = self._get("/subscriptions?limit=100").get("result", []) return { "accounts": [ {"name": a["name"], "type": a.get("type"), "locked": a.get("user", {}).get("locked", False)} - for a in accounts_raw if a.get("type") in ("user",) + for a in accounts if a.get("type") == "user" ], "sites": [ - {"name": s["name"], "protocol": s.get("protocol"), + {"name": s["name"], "account": s.get("account"), + "protocol": s.get("protocol"), "host": s.get("host"), "port": s.get("port")} - for s in sites_raw + for s in sites ], "certificates": [ {"name": c["name"], "type": c.get("type"), "usage": c.get("usage"), "account": c.get("account"), "accessLevel": c.get("accessLevel")} - for c in certs_raw - if c.get("type") in ("pgp", "ssh") + for c in certs if c.get("type") in ("pgp", "ssh") ], "ar_applications": [ {"name": a["name"], "id": a["id"]} - for a in apps_raw - if a.get("type") == "AdvancedRouting" + for a in apps if a.get("type") == "AdvancedRouting" ], "subscriptions": [ {"id": s["id"], "account": s.get("account"), "folder": s.get("folder"), "application": s.get("application")} - for s in subs_raw + for s in subs ], } - # -- AR Flow lifecycle -------------------------------------------------- - - def create_ar_flow(self, name, account, folder, steps, - pta_trigger=True): + def get_transfer_logs(self, limit=20, offset=0): """ - Create a complete AR flow: Application → Subscription → - SIMPLE route → TEMPLATE route → COMPOSITE route → link. - - Returns an ARFlow object with all created IDs. - Raises on any step failure — no partial state left unchecked. + Fetch transfer log entries. Correct endpoint: /logs/transfers + (not /transfers, /filetracking, or /logs/files — all 404). + Returns raw result list. """ - app_name = f"{name}-ar" - simple_name = f"{name}-simple" - template_name = f"{name}-template" - composite_name = f"{name}-composite" + data = self._get(f"/logs/transfers?limit={limit}&offset={offset}") + return data.get("result", []) - log.info(f"Creating AR Application: {app_name}") - existing_app = self._find_by_name("/applications", app_name) - if existing_app: - log.info(f" Application already exists: {existing_app['id']}") - app_id = existing_app["id"] - else: - self._post("/applications", {"name": app_name, "type": "AdvancedRouting"}) - app_id = self._find_by_name("/applications", app_name)["id"] - log.info(f" app_id={app_id}") + def get_transfer_detail(self, url_representation): + """Fetch full transfer detail including subtransmission step breakdown.""" + return self._get(f"/logs/transfers/{url_representation}") - log.info(f"Creating Subscription: account={account} folder={folder}") - existing_sub = None + # -- Applications ------------------------------------------------------- + + def create_application(self, name, app_type="AdvancedRouting"): + """Create an AR application. Returns the application ID.""" + existing = self._find("/applications", name) + if existing: + log.info(f"Application '{name}' already exists: {existing['id']}") + return existing["id"] + self._post("/applications", {"name": name, "type": app_type}) + app = self._find("/applications", name) + log.info(f"Created application '{name}': {app['id']}") + return app["id"] + + def delete_application(self, name): + """Delete an application by name.""" + app = self._find("/applications", name) + if app: + self._delete(f"/applications/{app['id']}") + log.info(f"Deleted application '{name}'") + + # -- Subscriptions ------------------------------------------------------ + + def create_subscription(self, account, folder, app_name): + """ + Create an AR subscription linking account+folder to an application. + + The PTA block below is required for client SFTP upload to trigger routing. + triggerFileOption='fail' is standard; submitFilenamePatternExpression + only affects server-initiated pulls, not client uploads. + Returns the subscription ID. + """ subs = self._get(f"/subscriptions?account={account}&limit=100").get("result", []) for s in subs: if s.get("folder") == folder and s.get("application") == app_name: - existing_sub = s - break - if existing_sub: - log.info(f" Subscription already exists: {existing_sub['id']}") - sub_id = existing_sub["id"] - else: - pta = { - "ptaOnSuccessDoInAdvancedRoutingWildcardPull": pta_trigger, + log.info(f"Subscription '{account}{folder}→{app_name}' already exists: {s['id']}") + return s["id"] + self._post("/subscriptions", { + "type": "AdvancedRouting", + "folder": folder, + "account": account, + "application": app_name, + "postTransmissionActions": { + "ptaOnSuccessDoInAdvancedRoutingWildcardPull": True, "submitFilenamePatternExpression": "*", "submitFilterType": "FILENAME_PATTERN", "triggerFileOption": "fail", - "triggerOnSuccessfulWildcardPull": pta_trigger, - } - self._post("/subscriptions", { - "type": "AdvancedRouting", - "folder": folder, - "account": account, - "application": app_name, - "postTransmissionActions": pta, - }) - subs = self._get(f"/subscriptions?account={account}&limit=100").get("result", []) - sub_id = next(s["id"] for s in subs - if s.get("folder") == folder and s.get("application") == app_name) - log.info(f" sub_id={sub_id}") + "triggerOnSuccessfulWildcardPull": True, + }, + }) + subs = self._get(f"/subscriptions?account={account}&limit=100").get("result", []) + sub = next(s for s in subs + if s.get("folder") == folder and s.get("application") == app_name) + log.info(f"Created subscription '{account}{folder}→{app_name}': {sub['id']}") + return sub["id"] - log.info(f"Creating SIMPLE route: {simple_name}") - existing_simple = self._find_by_name("/routes", simple_name) - if existing_simple: - log.info(f" SIMPLE already exists: {existing_simple['id']}") - simple_id = existing_simple["id"] - else: - step_payloads = [s.to_api() for s in steps] - self._post("/routes", { - "name": simple_name, - "type": "SIMPLE", - "conditionType": "ALWAYS", - "steps": step_payloads, - }) - simple_id = self._find_by_name("/routes", simple_name)["id"] - log.info(f" simple_id={simple_id}") + def delete_subscription(self, sub_id): + """Delete a subscription by ID.""" + self._delete(f"/subscriptions/{sub_id}") + log.info(f"Deleted subscription {sub_id}") - log.info(f"Creating TEMPLATE route: {template_name}") - existing_template = self._find_by_name("/routes", template_name) - if existing_template: - log.info(f" TEMPLATE already exists: {existing_template['id']}") - template_id = existing_template["id"] - else: - # TEMPLATE conditionType quirk: must be in payload but API rejects - # it as "unsupported parameter" in multiline payloads. Send as a - # pre-serialised string to avoid requests library re-serialising with - # newlines. Use data= not json= to control the wire format exactly. - template_payload = json.dumps({ - "name": template_name, - "type": "TEMPLATE", - "conditionType": "MATCH_ALL", - "steps": [{"type": "ExecuteRoute", "status": "ENABLED", - "executeRoute": simple_id, "autostart": False}] - }) - r = self.session.post(f"{self.base}/routes", data=template_payload) - if r.status_code not in (200, 201, 204): - raise RuntimeError( - f"POST TEMPLATE failed {r.status_code}: {r.text}") - template_id = self._find_by_name("/routes", template_name)["id"] - log.info(f" template_id={template_id}") + # -- Routes ------------------------------------------------------------- - log.info(f"Creating COMPOSITE route: {composite_name}") - existing_composite = self._find_by_name("/routes", composite_name) - if existing_composite: - log.info(f" COMPOSITE already exists: {existing_composite['id']}") - composite_id = existing_composite["id"] - else: - self._post("/routes", { - "name": composite_name, - "type": "COMPOSITE", - "conditionType": "MATCH_ALL", - "routeTemplate": template_id, - "steps": [], - }) - composite_id = self._find_by_name("/routes", composite_name)["id"] - log.info(f" composite_id={composite_id}") - - log.info(f"Linking COMPOSITE to subscription") - comp_full = self._get(f"/routes/{composite_id}") - if sub_id not in comp_full.get("subscriptions", []): - comp_full["subscriptions"] = [sub_id] - for field in ("id", "metadata", "steps", "routeTemplateName", "account"): - comp_full.pop(field, None) - self._put(f"/routes/{composite_id}", comp_full) - log.info(f" Linked.") - else: - log.info(f" Already linked.") - - flow = ARFlow(name, app_id, sub_id, simple_id, template_id, composite_id) - log.info(f"Flow created: {flow}") - return flow - - def delete_ar_flow(self, name): + def create_simple_route(self, name, steps, condition_type="ALWAYS", condition=None): """ - Delete a complete AR flow by name prefix. - Deletion order: unlink COMPOSITE → delete COMPOSITE → - delete TEMPLATE → delete SIMPLE → delete subscription → delete application. + Create a SIMPLE route with the given steps. + Steps are auto-chained in array order. + Returns the route ID. + + condition_type: ALWAYS | EL + condition: EL expression string (only when condition_type=EL) + e.g. "${transfer.target.matches('.*\\\\.txt')}" + Note: double-escape the dot in Java regex within Python strings. """ - composite_name = f"{name}-composite" - template_name = f"{name}-template" - simple_name = f"{name}-simple" - app_name = f"{name}-ar" + existing = self._find("/routes", name) + if existing: + log.info(f"SIMPLE route '{name}' already exists: {existing['id']}") + return existing["id"] + body = { + "name": name, + "type": "SIMPLE", + "conditionType": condition_type, + "steps": [s.to_api() for s in steps], + } + if condition: + body["condition"] = condition + self._post("/routes", body) + route = self._find("/routes", name) + log.info(f"Created SIMPLE route '{name}': {route['id']}") + return route["id"] - # 1. Unlink and delete COMPOSITE - composite = self._find_by_name("/routes", composite_name) - if composite: - cid = composite["id"] - full = self._get(f"/routes/{cid}") - if full.get("subscriptions"): - full["subscriptions"] = [] - for field in ("id", "metadata", "steps", "routeTemplateName", "account"): - full.pop(field, None) - self._put(f"/routes/{cid}", full) - log.info(f"Unlinked COMPOSITE {cid}") - self._delete(f"/routes/{cid}") - log.info(f"Deleted COMPOSITE {cid}") - - # 2. Delete TEMPLATE - template = self._find_by_name("/routes", template_name) - if template: - self._delete(f"/routes/{template['id']}") - log.info(f"Deleted TEMPLATE {template['id']}") - - # 3. Delete SIMPLE - simple = self._find_by_name("/routes", simple_name) - if simple: - self._delete(f"/routes/{simple['id']}") - log.info(f"Deleted SIMPLE {simple['id']}") - - # 4. Delete subscription - subs = self._get("/subscriptions?limit=100").get("result", []) - for s in subs: - if s.get("application") == app_name: - self._delete(f"/subscriptions/{s['id']}") - log.info(f"Deleted subscription {s['id']}") - - # 5. Delete application - app = self._find_by_name("/applications", app_name) - if app: - self._delete(f"/applications/{app['id']}") - log.info(f"Deleted application {app['id']}") - - log.info(f"Flow '{name}' deleted.") - - # -- TM health check ---------------------------------------------------- - - def verify_routing(self, sftp_host, sftp_port, sftp_user, sftp_password, - upload_folder, dest_host, dest_port, dest_user, dest_key, - dest_folder, probe_filename="tm-probe.txt", - timeout=30): + def create_template_route(self, name, simple_route_id, + condition_type="MATCH_ALL"): """ - Upload a probe file via SFTP, wait for routing to deliver it, - verify arrival at destination. Returns True on success. + Create a TEMPLATE route that executes a SIMPLE route. + Returns the route ID. - Raises TMHealthError if TM does not process within timeout. + Quirk: must be sent as pre-serialised string (data= not json=) to + avoid requests adding newlines that break the conditionType parser. """ - probe_content = f"tm-probe {time.time()}\n".encode() + existing = self._find("/routes", name) + if existing: + log.info(f"TEMPLATE route '{name}' already exists: {existing['id']}") + return existing["id"] + payload = json.dumps({ + "name": name, + "type": "TEMPLATE", + "conditionType": condition_type, + "steps": [{ + "type": "ExecuteRoute", + "status": "ENABLED", + "executeRoute": simple_route_id, + "autostart": False, + }] + }) + self._post_raw("/routes", payload) + route = self._find("/routes", name) + log.info(f"Created TEMPLATE route '{name}': {route['id']}") + return route["id"] - log.info(f"Uploading probe: {upload_folder}/{probe_filename}") - transport = paramiko.Transport((sftp_host, sftp_port)) - transport.connect(username=sftp_user, password=sftp_password) - sftp = paramiko.SFTPClient.from_transport(transport) - try: - sftp.chdir(upload_folder) - except IOError: - sftp.mkdir(upload_folder) - sftp.chdir(upload_folder) - sftp.putfo(io.BytesIO(probe_content), probe_filename) - sftp.close() - transport.close() - log.info(" Probe uploaded.") + def create_composite_route(self, name, template_id, + condition_type="MATCH_ALL"): + """ + Create a COMPOSITE route bound to a TEMPLATE. + Returns the route ID. + Subscriptions are linked separately via link_subscription(). + """ + existing = self._find("/routes", name) + if existing: + log.info(f"COMPOSITE route '{name}' already exists: {existing['id']}") + return existing["id"] + self._post("/routes", { + "name": name, + "type": "COMPOSITE", + "conditionType": condition_type, + "routeTemplate": template_id, + "steps": [], + }) + route = self._find("/routes", name) + log.info(f"Created COMPOSITE route '{name}': {route['id']}") + return route["id"] - log.info(f"Waiting for delivery to {dest_folder}/{probe_filename}") - deadline = time.time() + timeout - key = paramiko.RSAKey.from_private_key_file(dest_key) if dest_key else None - while time.time() < deadline: - time.sleep(3) - try: - t2 = paramiko.Transport((dest_host, dest_port)) - if key: - t2.connect(username=dest_user, pkey=key) - else: - t2.connect(username=dest_user) - s2 = paramiko.SFTPClient.from_transport(t2) - files = s2.listdir(dest_folder) - s2.close(); t2.close() - if probe_filename in files: - log.info(f" Probe arrived. TM healthy.") - return True - except Exception as e: - log.debug(f" Destination check: {e}") - raise TMHealthError( - f"Probe '{probe_filename}' not found in '{dest_folder}' after {timeout}s. " - f"TM may not be running or subscription not registered. " - f"Try: ST Admin UI → Server Configuration → Transaction Manager → Restart" - ) + def link_subscription(self, composite_id, sub_id): + """Link a subscription to a COMPOSITE route.""" + full = self._get(f"/routes/{composite_id}") + current = full.get("subscriptions", []) + if sub_id in current: + log.info(f"Subscription {sub_id} already linked to composite {composite_id}") + return + full["subscriptions"] = current + [sub_id] + # Remove read-only fields that cause PUT rejection + for field in ("id", "metadata", "steps", "routeTemplateName", "account"): + full.pop(field, None) + self._put(f"/routes/{composite_id}", full) + log.info(f"Linked subscription {sub_id} → composite {composite_id}") + + def delete_route(self, route_id): + """Delete a route by ID.""" + self._delete(f"/routes/{route_id}") + log.info(f"Deleted route {route_id}") + + def get_route(self, name): + """Fetch a route by name. Returns full object or None.""" + return self._find("/routes", name) + + def update_route(self, route_id, updates): + """ + Apply a dict of field updates to a route via GET → merge → PUT. + Handles read-only field stripping automatically. + """ + full = self._get(f"/routes/{route_id}") + full.update(updates) + for field in ("id", "metadata", "steps", "routeTemplateName", "account"): + full.pop(field, None) + return self._put(f"/routes/{route_id}", full) -class TMHealthError(Exception): - pass +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _err(response): + try: + return response.json() + except Exception: + return response.text # --------------------------------------------------------------------------- @@ -514,18 +462,26 @@ if __name__ == "__main__": parser.add_argument("--user", default="admin") parser.add_argument("--pass", dest="password", default="openclaw") sub = parser.add_subparsers(dest="cmd") - sub.add_parser("snapshot") - - d = sub.add_parser("delete-flow") - d.add_argument("name") + log_p = sub.add_parser("logs") + log_p.add_argument("--limit", type=int, default=20) + log_p.add_argument("--offset", type=int, default=0) args = parser.parse_args() st = STClient(args.host, args.user, args.password, args.port) if args.cmd == "snapshot": print(json.dumps(st.snapshot(), indent=2)) - elif args.cmd == "delete-flow": - st.delete_ar_flow(args.name) + elif args.cmd == "logs": + entries = st.get_transfer_logs(limit=args.limit, offset=args.offset) + for e in entries: + print(json.dumps({ + "time": e.get("startTime"), + "account": e.get("account"), + "file": e.get("filename"), + "status": e.get("status"), + "dir": e.get("remoteDir"), + "app": e.get("application"), + })) else: parser.print_help()