#!/usr/bin/env python3
"""
st_client.py — SecureTransport REST API Primitives

Thin, correct bindings to the ST v2.0 REST API.
No flow architecture opinions here — composition is the caller's responsibility.

Usage:
    import sys
    sys.path.insert(0, '/home/node/.openclaw/site-packages')
    from st_client import STClient, CompressStep, PgpEncryptStep, SendToPartnerStep

    st = STClient("192.168.0.245", "admin", "openclaw")
    snap = st.snapshot()

    app_id = st.create_application("myflow-ar")
    sub_id = st.create_subscription("conan", "/myflow", "myflow-ar")
    simple_id = st.create_simple_route("myflow-simple", steps=[
        PgpEncryptStep(filter="*.jpg", key="my-pgp-cert", key_owner="conan"),
        CompressStep(filter="*.txt"),
        SendToPartnerStep(site="my-site", target_account="partner"),
    ])
    template_id = st.create_template_route("myflow-template", simple_id)
    composite_id = st.create_composite_route("myflow-composite", template_id)
    st.link_subscription(composite_id, sub_id)

Known API quirks encoded here (so you don't rediscover them):
  - TEMPLATE route: must use data= not json= to avoid newline parse issue in conditionType
  - conditionType must be explicit on every route and step; null causes NPE at runtime
  - SendToPartner: transferSiteExpression requires #!#CVD#!# suffix for LIST type
  - PgpEncryption: encryptKeyExpressionType only accepts ALIAS or EXPRESSION_WILDCARD
  - PgpEncryption: cert must have accessLevel=PUBLIC and account= or routing engine can't see it
  - PgpEncryption: compressionType "0" disables PGP-level compression (not a bug)
  - Compress: singleArchiveEnabled=null treated as true by UI; always set explicitly
  - SendToPartner: target_account controls whose sites are used — must match site owner's account
  - POST /applications returns empty body on success; follow-up GET required for ID
"""

import sys
import json
import logging

sys.path.insert(0, '/home/node/.openclaw/site-packages')

import requests
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

log = logging.getLogger("st_client")


# ---------------------------------------------------------------------------
# Step definitions — encode all field quirks, expose only meaningful params
# ---------------------------------------------------------------------------

def _step_payload(step_type, on_failure, file_filter):
    """Return the fields shared by every step payload built in this module.

    conditionType is always set explicitly (see the module docstring: a null
    conditionType causes an NPE at runtime).
    """
    return {
        "type": step_type,
        "status": "ENABLED",
        "conditionType": "ALWAYS",
        "actionOnStepFailure": on_failure,
        "fileFilterExpression": file_filter,
        "fileFilterExpressionType": "GLOB",
        "usePrecedingStepFiles": False,
    }


class CompressStep:
    """Compress matching files to ZIP. Non-matching files pass through."""

    def __init__(self, filter="*", level="NORMAL", single_archive=False,
                 archive_name=None, on_failure="PROCEED"):
        """Store the step settings.

        filter:         glob selecting the files to compress
        level:          compression level (see to_api for validated values)
        single_archive: bundle all matching files into one archive
        archive_name:   archive file name; required when single_archive=True
                        (not enforced here — it is simply omitted when empty)
        on_failure:     value sent as actionOnStepFailure
        """
        self.filter = filter
        self.level = level
        self.single_archive = single_archive
        self.archive_name = archive_name  # Required when single_archive=True
        self.on_failure = on_failure

    def to_api(self):
        """Return the step as the dict the routes endpoint expects."""
        payload = _step_payload("Compress", self.on_failure, self.filter)
        payload.update({
            "compressionType": "ZIP",
            # Validated values: STORE FASTEST FAST NORMAL GOOD BETTER BEST
            "compressionLevel": self.level,
            # null is treated as true by the UI — always set explicitly
            "singleArchiveEnabled": self.single_archive,
            "zipPassword": "",
        })
        if self.single_archive and self.archive_name:
            payload["singleArchiveName"] = self.archive_name
        return payload


class PgpEncryptStep:
    """PGP-encrypt matching files. Cert must be PUBLIC and owned by key_owner."""

    def __init__(self, filter="*", key=None, key_owner=None,
                 ascii_armour=False, on_failure="PROCEED"):
        """Store the step settings.

        Raises ValueError when key or key_owner is missing/empty.
        """
        if not key:
            raise ValueError("PgpEncryptStep requires key (cert alias name)")
        if not key_owner:
            raise ValueError("PgpEncryptStep requires key_owner (account name that owns the cert)")
        self.filter = filter
        self.key = key
        self.key_owner = key_owner
        self.ascii_armour = ascii_armour
        self.on_failure = on_failure

    def to_api(self):
        """Return the step as the dict the routes endpoint expects."""
        payload = _step_payload("PgpEncryption", self.on_failure, self.filter)
        payload.update({
            # ALIAS = use cert name directly; EXPRESSION_WILDCARD = EL expression
            # No other values accepted — will silently fail with PROCEED
            "encryptKeyExpression": self.key,
            "encryptKeyExpressionType": "ALIAS",
            "encryptKeyOwnerExpression": self.key_owner,
            "encryptKeyOwnerExpressionType": "NAME",
            # "0" disables PGP-level compression (ZIP handled by CompressStep)
            "compressionType": "0",
            "useAsciiArmour": self.ascii_armour,
        })
        return payload


class SendToPartnerStep:
    """Deliver files to a partner transfer site.

    target_account: the account that owns the transfer site.
      The AR flow runs in the inbound account's context — if the site belongs
      to a different account (e.g. a dedicated partner account), set
      target_account to that account name. Omit if the site is owned by the
      same account running the flow.
    """

    def __init__(self, site=None, filter="*", target_account=None, on_failure="FAIL"):
        """Store the step settings.

        Raises ValueError when site is missing/empty.
        """
        if not site:
            raise ValueError("SendToPartnerStep requires site (transfer site name)")
        self.site = site
        self.filter = filter
        self.target_account = target_account
        self.on_failure = on_failure

    def to_api(self):
        """Return the step as the dict the routes endpoint expects."""
        payload = _step_payload("SendToPartner", self.on_failure, self.filter)
        payload.update({
            # LIST type requires #!#CVD#!# suffix — not in public docs
            "transferSiteExpression": f"{self.site}#!#CVD#!#",
            "transferSiteExpressionType": "LIST",
        })
        if self.target_account:
            payload["targetAccountExpression"] = self.target_account
            payload["targetAccountExpressionType"] = "NAME"
        return payload


# ---------------------------------------------------------------------------
# Main client — primitives only
# ---------------------------------------------------------------------------

class STClient:
    """
    SecureTransport REST API client — primitive operations only.
    Handles auth, TLS quirks, and empty-body responses.
    Callers are responsible for object ordering and composition.
    """

    # Status codes treated as success by the write helpers.
    _OK_WRITE = (200, 201, 204)
    _OK_DELETE = (200, 204)

    # Read-only fields that cause PUT rejection; stripped before every route PUT.
    _READ_ONLY_ROUTE_FIELDS = ("id", "metadata", "steps", "routeTemplateName", "account")

    def __init__(self, host, user, password, port=444, verify=False, timeout=None):
        """Build the base URL and an authenticated session (no request is sent).

        verify:  passed to the session's TLS verification setting.
        timeout: optional per-request timeout handed to requests; the default
                 None keeps requests' own behaviour (no timeout).
        """
        self.base = f"https://{host}:{port}/api/v2.0"
        self.timeout = timeout
        self.session = requests.Session()
        self.session.auth = (user, password)
        self.session.verify = verify
        self.session.headers.update({"Accept": "application/json",
                                     "Content-Type": "application/json"})

    # -- Low-level HTTP helpers ---------------------------------------------

    def _get(self, path):
        """GET path and return the decoded JSON body; HTTP errors raise via raise_for_status."""
        r = self.session.get(f"{self.base}{path}", timeout=self.timeout)
        r.raise_for_status()
        return r.json()

    def _send(self, method, path, ok_statuses, **kwargs):
        """Issue a write request; return decoded JSON, or {} for an empty body.

        Raises RuntimeError when the status code is not in ok_statuses.
        """
        r = self.session.request(method, f"{self.base}{path}",
                                 timeout=self.timeout, **kwargs)
        if r.status_code not in ok_statuses:
            raise RuntimeError(f"{method} {path} → {r.status_code}: {_err(r)}")
        return r.json() if r.content.strip() else {}

    def _post(self, path, body):
        """POST body as JSON."""
        return self._send("POST", path, self._OK_WRITE, json=body)

    def _post_raw(self, path, raw_str):
        """POST with pre-serialised string body — avoids requests re-serialising."""
        return self._send("POST", path, self._OK_WRITE, data=raw_str)

    def _put(self, path, body):
        """PUT body as JSON."""
        return self._send("PUT", path, self._OK_WRITE, json=body)

    def _delete(self, path):
        """DELETE path; returns None."""
        self._send("DELETE", path, self._OK_DELETE)

    def _find(self, path, name):
        """Return first result with matching name, or None."""
        data = self._get(f"{path}?name={requests.utils.quote(name)}&limit=10")
        for item in data.get("result", []):
            if item.get("name") == name:
                return item
        return None

    def _find_created(self, path, name, kind):
        """Look up an object that was just POSTed; raise RuntimeError if it is not visible."""
        item = self._find(path, name)
        if item is None:
            raise RuntimeError(
                f"{kind} '{name}' was posted but could not be found via GET {path}")
        return item

    # -- Introspection ------------------------------------------------------

    def snapshot(self):
        """Live environment summary — run before every build."""
        accounts = self._get("/accounts?limit=200").get("result", [])
        sites = self._get("/sites?limit=100").get("result", [])
        certs = self._get("/certificates?limit=200").get("result", [])
        apps = self._get("/applications?limit=100").get("result", [])
        subs = self._get("/subscriptions?limit=100").get("result", [])

        return {
            "accounts": [
                {"name": a["name"], "type": a.get("type"),
                 "locked": a.get("user", {}).get("locked", False)}
                for a in accounts if a.get("type") == "user"
            ],
            "sites": [
                {"name": s["name"], "account": s.get("account"),
                 "protocol": s.get("protocol"), "host": s.get("host"),
                 "port": s.get("port")}
                for s in sites
            ],
            "certificates": [
                {"name": c["name"], "type": c.get("type"), "usage": c.get("usage"),
                 "account": c.get("account"), "accessLevel": c.get("accessLevel")}
                for c in certs if c.get("type") in ("pgp", "ssh")
            ],
            "ar_applications": [
                {"name": a["name"], "id": a["id"]}
                for a in apps if a.get("type") == "AdvancedRouting"
            ],
            "subscriptions": [
                {"id": s["id"], "account": s.get("account"),
                 "folder": s.get("folder"), "application": s.get("application")}
                for s in subs
            ],
        }

    def get_transfer_logs(self, limit=20, offset=0):
        """
        Fetch transfer log entries.
        Correct endpoint: /logs/transfers (not /transfers, /filetracking, or /logs/files — all 404).
        Returns raw result list.
        """
        data = self._get(f"/logs/transfers?limit={limit}&offset={offset}")
        return data.get("result", [])

    def get_transfer_detail(self, url_representation):
        """Fetch full transfer detail including subtransmission step breakdown."""
        return self._get(f"/logs/transfers/{url_representation}")

    # -- Applications -------------------------------------------------------

    def create_application(self, name, app_type="AdvancedRouting"):
        """Create an AR application. Returns the application ID.

        Returns the existing ID when an application with this name is already
        present. Raises RuntimeError if the POST is rejected or the new
        application cannot be found afterwards.
        """
        existing = self._find("/applications", name)
        if existing:
            log.info("Application '%s' already exists: %s", name, existing["id"])
            return existing["id"]
        # POST returns an empty body on success, so the ID needs a follow-up GET.
        self._post("/applications", {"name": name, "type": app_type})
        app = self._find_created("/applications", name, "Application")
        log.info("Created application '%s': %s", name, app["id"])
        return app["id"]

    def delete_application(self, name):
        """Delete an application by name."""
        app = self._find("/applications", name)
        if app:
            self._delete(f"/applications/{app['id']}")
            log.info("Deleted application '%s'", name)

    # -- Subscriptions ------------------------------------------------------

    def _find_subscription(self, account, folder, app_name):
        """Return the account's subscription matching folder and application, or None."""
        # The account name is user data: encode it so spaces/&/# cannot corrupt the query.
        query = f"/subscriptions?account={requests.utils.quote(account, safe='')}&limit=100"
        for s in self._get(query).get("result", []):
            if s.get("folder") == folder and s.get("application") == app_name:
                return s
        return None

    def create_subscription(self, account, folder, app_name):
        """
        Create an AR subscription linking account+folder to an application.

        The PTA block below is required for client SFTP upload to trigger routing.
        triggerFileOption='fail' is standard; submitFilenamePatternExpression
        only affects server-initiated pulls, not client uploads.

        Returns the subscription ID (the existing one if already present).
        Raises RuntimeError if the POST is rejected or the new subscription
        cannot be found afterwards.
        """
        existing = self._find_subscription(account, folder, app_name)
        if existing:
            log.info("Subscription '%s%s→%s' already exists: %s",
                     account, folder, app_name, existing["id"])
            return existing["id"]

        self._post("/subscriptions", {
            "type": "AdvancedRouting",
            "folder": folder,
            "account": account,
            "application": app_name,
            "postTransmissionActions": {
                "ptaOnSuccessDoInAdvancedRoutingWildcardPull": True,
                "submitFilenamePatternExpression": "*",
                "submitFilterType": "FILENAME_PATTERN",
                "triggerFileOption": "fail",
                "triggerOnSuccessfulWildcardPull": True,
            },
        })

        sub = self._find_subscription(account, folder, app_name)
        if sub is None:
            raise RuntimeError(
                f"Subscription '{account}{folder}→{app_name}' was posted but "
                "could not be found via GET /subscriptions")
        log.info("Created subscription '%s%s→%s': %s",
                 account, folder, app_name, sub["id"])
        return sub["id"]

    def delete_subscription(self, sub_id):
        """Delete a subscription by ID."""
        self._delete(f"/subscriptions/{sub_id}")
        log.info("Deleted subscription %s", sub_id)

    # -- Routes -------------------------------------------------------------

    def _ensure_route(self, name, kind, create):
        """Return the ID of route `name`, calling create() first if it does not exist.

        kind is the route type label used in log and error messages. create is
        only invoked when no route with that name is found.
        """
        existing = self._find("/routes", name)
        if existing:
            log.info("%s route '%s' already exists: %s", kind, name, existing["id"])
            return existing["id"]
        create()
        route = self._find_created("/routes", name, f"{kind} route")
        log.info("Created %s route '%s': %s", kind, name, route["id"])
        return route["id"]

    def create_simple_route(self, name, steps, condition_type="ALWAYS", condition=None):
        """
        Create a SIMPLE route with the given steps.
        Steps are auto-chained in array order.
        Returns the route ID.

        condition_type: ALWAYS | EL
        condition: EL expression string (only when condition_type=EL)
                   e.g. "${transfer.target.matches('.*\\\\.txt')}"
                   Note: double-escape the dot in Java regex within Python strings.
        """
        def post_route():
            body = {
                "name": name,
                "type": "SIMPLE",
                "conditionType": condition_type,
                "steps": [s.to_api() for s in steps],
            }
            if condition:
                body["condition"] = condition
            self._post("/routes", body)

        return self._ensure_route(name, "SIMPLE", post_route)

    def create_template_route(self, name, simple_route_id, condition_type="MATCH_ALL"):
        """
        Create a TEMPLATE route that executes a SIMPLE route.
        Returns the route ID.

        Quirk: must be sent as pre-serialised string (data= not json=) to avoid
        requests adding newlines that break the conditionType parser.
        """
        def post_route():
            payload = json.dumps({
                "name": name,
                "type": "TEMPLATE",
                "conditionType": condition_type,
                "steps": [{
                    "type": "ExecuteRoute",
                    "status": "ENABLED",
                    "executeRoute": simple_route_id,
                    "autostart": False,
                }]
            })
            self._post_raw("/routes", payload)

        return self._ensure_route(name, "TEMPLATE", post_route)

    def create_composite_route(self, name, template_id, condition_type="MATCH_ALL"):
        """
        Create a COMPOSITE route bound to a TEMPLATE.
        Returns the route ID.
        Subscriptions are linked separately via link_subscription().
        """
        def post_route():
            self._post("/routes", {
                "name": name,
                "type": "COMPOSITE",
                "conditionType": condition_type,
                "routeTemplate": template_id,
                "steps": [],
            })

        return self._ensure_route(name, "COMPOSITE", post_route)

    def link_subscription(self, composite_id, sub_id):
        """Link a subscription to a COMPOSITE route (no-op if already linked)."""
        full = self._get(f"/routes/{composite_id}")
        # `or []` also covers a route that reports "subscriptions": null.
        current = full.get("subscriptions") or []
        if sub_id in current:
            log.info("Subscription %s already linked to composite %s", sub_id, composite_id)
            return
        full["subscriptions"] = current + [sub_id]
        # Remove read-only fields that cause PUT rejection
        for field in self._READ_ONLY_ROUTE_FIELDS:
            full.pop(field, None)
        self._put(f"/routes/{composite_id}", full)
        log.info("Linked subscription %s → composite %s", sub_id, composite_id)

    def delete_route(self, route_id):
        """Delete a route by ID."""
        self._delete(f"/routes/{route_id}")
        log.info("Deleted route %s", route_id)

    def get_route(self, name):
        """Fetch a route by name. Returns full object or None."""
        return self._find("/routes", name)

    def update_route(self, route_id, updates):
        """
        Apply a dict of field updates to a route via GET → merge → PUT.
        Handles read-only field stripping automatically.

        Note: stripping happens after the merge, so any of the read-only keys
        (including "steps") supplied in `updates` is dropped from the PUT body.
        Returns the decoded PUT response ({} when the body is empty).
        """
        full = self._get(f"/routes/{route_id}")
        full.update(updates)
        for field in self._READ_ONLY_ROUTE_FIELDS:
            full.pop(field, None)
        return self._put(f"/routes/{route_id}", full)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _err(response):
    """Return the response's JSON body for error messages, falling back to its text."""
    try:
        return response.json()
    except Exception:
        return response.text


# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------

def _main():
    """Parse command-line arguments and run the `snapshot` or `logs` subcommand."""
    import argparse

    logging.basicConfig(level=logging.INFO, format="%(message)s")

    parser = argparse.ArgumentParser(description="ST Client CLI")
    parser.add_argument("--host", required=True)
    parser.add_argument("--port", type=int, default=444)
    parser.add_argument("--user", default="admin")
    parser.add_argument("--pass", dest="password", default="openclaw")

    sub = parser.add_subparsers(dest="cmd")
    sub.add_parser("snapshot")
    log_p = sub.add_parser("logs")
    log_p.add_argument("--limit", type=int, default=20)
    log_p.add_argument("--offset", type=int, default=0)

    args = parser.parse_args()
    st = STClient(args.host, args.user, args.password, args.port)

    if args.cmd == "snapshot":
        print(json.dumps(st.snapshot(), indent=2))
    elif args.cmd == "logs":
        entries = st.get_transfer_logs(limit=args.limit, offset=args.offset)
        for e in entries:
            print(json.dumps({
                "time": e.get("startTime"),
                "account": e.get("account"),
                "file": e.get("filename"),
                "status": e.get("status"),
                "dir": e.get("remoteDir"),
                "app": e.get("application"),
            }))
    else:
        parser.print_help()


if __name__ == "__main__":
    _main()