refactor: primitives-only client, drop create_ar_flow factory

This commit is contained in:
2026-03-04 17:36:02 +00:00
parent 55ad444c04
commit e827c18433

View File

@@ -1,10 +1,9 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
st_client.py — SecureTransport AR Flow Client Library st_client.py — SecureTransport REST API Primitives
Abstracts the ST REST API quirks so callers work with typed objects, Thin, correct bindings to the ST v2.0 REST API.
not raw JSON payloads. All known field requirements, mandatory defaults, No flow architecture opinions here — composition is the caller's responsibility.
and undocumented formats are encoded here.
Usage: Usage:
import sys import sys
@@ -12,28 +11,37 @@ Usage:
from st_client import STClient, CompressStep, PgpEncryptStep, SendToPartnerStep from st_client import STClient, CompressStep, PgpEncryptStep, SendToPartnerStep
st = STClient("192.168.0.245", "admin", "openclaw") st = STClient("192.168.0.245", "admin", "openclaw")
flow = st.create_ar_flow( snap = st.snapshot()
name="floweng",
account="conan", app_id = st.create_application("myflow-ar")
folder="/floweng", sub_id = st.create_subscription("conan", "/myflow", "myflow-ar")
steps=[ simple_id = st.create_simple_route("myflow-simple", steps=[
PgpEncryptStep(filter="*.jpg", key="test-pgp-public", key_owner="conan"), PgpEncryptStep(filter="*.jpg", key="my-pgp-cert", key_owner="conan"),
CompressStep(filter="*.txt"), CompressStep(filter="*.txt"),
SendToPartnerStep(site="clawdbox-partner-site-1771557836"), SendToPartnerStep(site="my-site", target_account="partner"),
] ])
) template_id = st.create_template_route("myflow-template", simple_id)
st.delete_ar_flow("floweng") composite_id = st.create_composite_route("myflow-composite", template_id)
st.link_subscription(composite_id, sub_id)
Known API quirks encoded here (so you don't rediscover them):
- TEMPLATE route: must use data= not json= to avoid newline parse issue in conditionType
- conditionType must be explicit on every route and step; null causes NPE at runtime
- SendToPartner: transferSiteExpression requires #!#CVD#!# suffix for LIST type
- PgpEncryption: encryptKeyExpressionType only accepts ALIAS or EXPRESSION_WILDCARD
- PgpEncryption: cert must have accessLevel=PUBLIC and account=<owner> or routing engine can't see it
- Compress: singleArchiveEnabled=null treated as true by UI; always set explicitly
- Compress: compressionType "0" on PgpEncryptStep disables PGP-level compression (not a bug)
- SendToPartner: target_account controls whose sites are used — must match site owner's account
- POST /applications returns empty body on success; follow-up GET required for ID
""" """
import sys import sys
import json import json
import time
import io
import logging import logging
sys.path.insert(0, '/home/node/.openclaw/site-packages') sys.path.insert(0, '/home/node/.openclaw/site-packages')
import requests import requests
import paramiko
import urllib3 import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
@@ -42,7 +50,7 @@ log = logging.getLogger("st_client")
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Step definitions # Step definitions — encode all field quirks, expose only meaningful params
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
class CompressStep: class CompressStep:
@@ -55,7 +63,7 @@ class CompressStep:
self.on_failure = on_failure self.on_failure = on_failure
def to_api(self): def to_api(self):
payload = { return {
"type": "Compress", "type": "Compress",
"status": "ENABLED", "status": "ENABLED",
"conditionType": "ALWAYS", "conditionType": "ALWAYS",
@@ -64,22 +72,22 @@ class CompressStep:
"fileFilterExpressionType": "GLOB", "fileFilterExpressionType": "GLOB",
"usePrecedingStepFiles": False, "usePrecedingStepFiles": False,
"compressionType": "ZIP", "compressionType": "ZIP",
# Empirically validated values: STORE FASTEST FAST NORMAL GOOD BETTER BEST # Validated values: STORE FASTEST FAST NORMAL GOOD BETTER BEST
"compressionLevel": self.level, "compressionLevel": self.level,
# null is treated as true by the UI — always set explicitly
"singleArchiveEnabled": self.single_archive, "singleArchiveEnabled": self.single_archive,
"zipPassword": "", "zipPassword": "",
} }
return payload
class PgpEncryptStep: class PgpEncryptStep:
"""PGP-encrypt matching files. Non-matching files pass through.""" """PGP-encrypt matching files. Cert must be PUBLIC and owned by key_owner."""
def __init__(self, filter="*", key=None, key_owner=None, def __init__(self, filter="*", key=None, key_owner=None,
ascii_armour=False, on_failure="PROCEED"): ascii_armour=False, on_failure="PROCEED"):
if not key: if not key:
raise ValueError("PgpEncryptStep requires key (cert alias name)") raise ValueError("PgpEncryptStep requires key (cert alias name)")
if not key_owner: if not key_owner:
raise ValueError("PgpEncryptStep requires key_owner (account name)") raise ValueError("PgpEncryptStep requires key_owner (account name that owns the cert)")
self.filter = filter self.filter = filter
self.key = key self.key = key
self.key_owner = key_owner self.key_owner = key_owner
@@ -96,27 +104,36 @@ class PgpEncryptStep:
"fileFilterExpressionType": "GLOB", "fileFilterExpressionType": "GLOB",
"usePrecedingStepFiles": False, "usePrecedingStepFiles": False,
# ALIAS = use cert name directly; EXPRESSION_WILDCARD = EL expression # ALIAS = use cert name directly; EXPRESSION_WILDCARD = EL expression
# No other values accepted — will silently fail with PROCEED
"encryptKeyExpression": self.key, "encryptKeyExpression": self.key,
"encryptKeyExpressionType": "ALIAS", "encryptKeyExpressionType": "ALIAS",
"encryptKeyOwnerExpression": self.key_owner, "encryptKeyOwnerExpression": self.key_owner,
"encryptKeyOwnerExpressionType": "NAME", "encryptKeyOwnerExpressionType": "NAME",
# "0" disables PGP-level compression (use Compress step instead) # "0" disables PGP-level compression (ZIP handled by CompressStep)
"compressionType": "0", "compressionType": "0",
"useAsciiArmour": self.ascii_armour, "useAsciiArmour": self.ascii_armour,
} }
class SendToPartnerStep: class SendToPartnerStep:
"""Deliver all files in route payload to a partner transfer site.""" """Deliver files to a partner transfer site.
def __init__(self, site=None, filter="*", on_failure="FAIL"):
target_account: the account that owns the transfer site. The AR flow runs
in the inbound account's context — if the site belongs to a different account
(e.g. a dedicated partner account), set target_account to that account name.
Omit if the site is owned by the same account running the flow.
"""
def __init__(self, site=None, filter="*", target_account=None,
on_failure="FAIL"):
if not site: if not site:
raise ValueError("SendToPartnerStep requires site (transfer site name)") raise ValueError("SendToPartnerStep requires site (transfer site name)")
self.site = site self.site = site
self.filter = filter self.filter = filter
self.target_account = target_account
self.on_failure = on_failure self.on_failure = on_failure
def to_api(self): def to_api(self):
return { payload = {
"type": "SendToPartner", "type": "SendToPartner",
"status": "ENABLED", "status": "ENABLED",
"conditionType": "ALWAYS", "conditionType": "ALWAYS",
@@ -124,98 +141,68 @@ class SendToPartnerStep:
"fileFilterExpression": self.filter, "fileFilterExpression": self.filter,
"fileFilterExpressionType": "GLOB", "fileFilterExpressionType": "GLOB",
"usePrecedingStepFiles": False, "usePrecedingStepFiles": False,
# API requires LIST type with #!#CVD#!# suffix — not documented publicly # LIST type requires #!#CVD#!# suffix — not in public docs
"transferSiteExpression": f"{self.site}#!#CVD#!#", "transferSiteExpression": f"{self.site}#!#CVD#!#",
"transferSiteExpressionType": "LIST", "transferSiteExpressionType": "LIST",
} }
if self.target_account:
payload["targetAccountExpression"] = self.target_account
payload["targetAccountExpressionType"] = "NAME"
return payload
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# AR Flow result object # Main client — primitives only
# ---------------------------------------------------------------------------
class ARFlow:
"""Holds IDs for all objects in a created AR flow."""
def __init__(self, name, app_id, sub_id, simple_id, template_id, composite_id):
self.name = name
self.app_id = app_id
self.sub_id = sub_id
self.simple_id = simple_id
self.template_id = template_id
self.composite_id = composite_id
def __repr__(self):
return (f"ARFlow({self.name!r}, app={self.app_id}, sub={self.sub_id}, "
f"simple={self.simple_id}, template={self.template_id}, "
f"composite={self.composite_id})")
# ---------------------------------------------------------------------------
# Main client
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
class STClient: class STClient:
""" """
SecureTransport REST API client. SecureTransport REST API client — primitive operations only.
Handles auth, TLS, empty-body responses, and the AR object graph. Handles auth, TLS quirks, and empty-body responses.
All API quirks are encoded here — callers work with typed objects. Callers are responsible for object ordering and composition.
""" """
def __init__(self, host, user, password, port=444, verify=False): def __init__(self, host, user, password, port=444, verify=False):
self.base = f"https://{host}:{port}/api/v2.0" self.base = f"https://{host}:{port}/api/v2.0"
self.base_v14 = f"https://{host}:{port}/api/v1.4"
self.session = requests.Session() self.session = requests.Session()
self.session.auth = (user, password) self.session.auth = (user, password)
self.session.verify = verify self.session.verify = verify
self.session.headers.update({"Accept": "application/json", self.session.headers.update({"Accept": "application/json",
"Content-Type": "application/json"}) "Content-Type": "application/json"})
self._host = host
self._port = port
# -- Low-level helpers -------------------------------------------------- # -- Low-level HTTP helpers ---------------------------------------------
def _get(self, path, v14=False): def _get(self, path):
base = self.base_v14 if v14 else self.base r = self.session.get(f"{self.base}{path}")
r = self.session.get(f"{base}{path}")
r.raise_for_status() r.raise_for_status()
return r.json() return r.json()
def _post(self, path, body): def _post(self, path, body):
r = self.session.post(f"{self.base}{path}", json=body) r = self.session.post(f"{self.base}{path}", json=body)
if r.status_code not in (200, 201, 204): if r.status_code not in (200, 201, 204):
try: raise RuntimeError(f"POST {path}{r.status_code}: {_err(r)}")
detail = r.json() return r.json() if r.content.strip() else {}
except Exception:
detail = r.text def _post_raw(self, path, raw_str):
raise RuntimeError(f"POST {path} failed {r.status_code}: {detail}") """POST with pre-serialised string body — avoids requests re-serialising."""
# Some endpoints return empty body on success r = self.session.post(f"{self.base}{path}", data=raw_str)
if r.content.strip(): if r.status_code not in (200, 201, 204):
return r.json() raise RuntimeError(f"POST {path}{r.status_code}: {_err(r)}")
return {} return r.json() if r.content.strip() else {}
def _put(self, path, body): def _put(self, path, body):
r = self.session.put(f"{self.base}{path}", json=body) r = self.session.put(f"{self.base}{path}", json=body)
if r.status_code not in (200, 201, 204): if r.status_code not in (200, 201, 204):
try: raise RuntimeError(f"PUT {path}{r.status_code}: {_err(r)}")
detail = r.json() return r.json() if r.content.strip() else {}
except Exception:
detail = r.text
raise RuntimeError(f"PUT {path} failed {r.status_code}: {detail}")
if r.content.strip():
return r.json()
return {}
def _delete(self, path): def _delete(self, path):
r = self.session.delete(f"{self.base}{path}") r = self.session.delete(f"{self.base}{path}")
if r.status_code not in (200, 204): if r.status_code not in (200, 204):
try: raise RuntimeError(f"DELETE {path}{r.status_code}: {_err(r)}")
detail = r.json()
except Exception:
detail = r.text
raise RuntimeError(f"DELETE {path} failed {r.status_code}: {detail}")
def _find_by_name(self, path, name): def _find(self, path, name):
"""Return first result with matching name, or None.""" """Return first result with matching name, or None."""
data = self._get(f"{path}?name={requests.utils.quote(name)}&limit=10") data = self._get(f"{path}?name={requests.utils.quote(name)}&limit=10")
for item in data.get("result", []): for item in data.get("result", []):
@@ -223,281 +210,242 @@ class STClient:
return item return item
return None return None
# -- Snapshot ----------------------------------------------------------- # -- Introspection ------------------------------------------------------
def snapshot(self): def snapshot(self):
"""Return a summary of the live ST environment.""" """Live environment summary — run before every build."""
accounts_raw = self._get("/accounts?limit=200").get("result", []) accounts = self._get("/accounts?limit=200").get("result", [])
sites_raw = self._get("/sites?limit=100").get("result", []) sites = self._get("/sites?limit=100").get("result", [])
certs_raw = self._get("/certificates?limit=200").get("result", []) certs = self._get("/certificates?limit=200").get("result", [])
apps_raw = self._get("/applications?limit=100").get("result", []) apps = self._get("/applications?limit=100").get("result", [])
subs_raw = self._get("/subscriptions?limit=100").get("result", []) subs = self._get("/subscriptions?limit=100").get("result", [])
return { return {
"accounts": [ "accounts": [
{"name": a["name"], "type": a.get("type"), {"name": a["name"], "type": a.get("type"),
"locked": a.get("user", {}).get("locked", False)} "locked": a.get("user", {}).get("locked", False)}
for a in accounts_raw if a.get("type") in ("user",) for a in accounts if a.get("type") == "user"
], ],
"sites": [ "sites": [
{"name": s["name"], "protocol": s.get("protocol"), {"name": s["name"], "account": s.get("account"),
"protocol": s.get("protocol"),
"host": s.get("host"), "port": s.get("port")} "host": s.get("host"), "port": s.get("port")}
for s in sites_raw for s in sites
], ],
"certificates": [ "certificates": [
{"name": c["name"], "type": c.get("type"), {"name": c["name"], "type": c.get("type"),
"usage": c.get("usage"), "account": c.get("account"), "usage": c.get("usage"), "account": c.get("account"),
"accessLevel": c.get("accessLevel")} "accessLevel": c.get("accessLevel")}
for c in certs_raw for c in certs if c.get("type") in ("pgp", "ssh")
if c.get("type") in ("pgp", "ssh")
], ],
"ar_applications": [ "ar_applications": [
{"name": a["name"], "id": a["id"]} {"name": a["name"], "id": a["id"]}
for a in apps_raw for a in apps if a.get("type") == "AdvancedRouting"
if a.get("type") == "AdvancedRouting"
], ],
"subscriptions": [ "subscriptions": [
{"id": s["id"], "account": s.get("account"), {"id": s["id"], "account": s.get("account"),
"folder": s.get("folder"), "application": s.get("application")} "folder": s.get("folder"), "application": s.get("application")}
for s in subs_raw for s in subs
], ],
} }
# -- AR Flow lifecycle -------------------------------------------------- def get_transfer_logs(self, limit=20, offset=0):
def create_ar_flow(self, name, account, folder, steps,
pta_trigger=True):
""" """
Create a complete AR flow: Application → Subscription → Fetch transfer log entries. Correct endpoint: /logs/transfers
SIMPLE route → TEMPLATE route → COMPOSITE route → link. (not /transfers, /filetracking, or /logs/files — all 404).
Returns raw result list.
Returns an ARFlow object with all created IDs.
Raises on any step failure — no partial state left unchecked.
""" """
app_name = f"{name}-ar" data = self._get(f"/logs/transfers?limit={limit}&offset={offset}")
simple_name = f"{name}-simple" return data.get("result", [])
template_name = f"{name}-template"
composite_name = f"{name}-composite"
log.info(f"Creating AR Application: {app_name}") def get_transfer_detail(self, url_representation):
existing_app = self._find_by_name("/applications", app_name) """Fetch full transfer detail including subtransmission step breakdown."""
if existing_app: return self._get(f"/logs/transfers/{url_representation}")
log.info(f" Application already exists: {existing_app['id']}")
app_id = existing_app["id"]
else:
self._post("/applications", {"name": app_name, "type": "AdvancedRouting"})
app_id = self._find_by_name("/applications", app_name)["id"]
log.info(f" app_id={app_id}")
log.info(f"Creating Subscription: account={account} folder={folder}") # -- Applications -------------------------------------------------------
existing_sub = None
def create_application(self, name, app_type="AdvancedRouting"):
"""Create an AR application. Returns the application ID."""
existing = self._find("/applications", name)
if existing:
log.info(f"Application '{name}' already exists: {existing['id']}")
return existing["id"]
self._post("/applications", {"name": name, "type": app_type})
app = self._find("/applications", name)
log.info(f"Created application '{name}': {app['id']}")
return app["id"]
def delete_application(self, name):
"""Delete an application by name."""
app = self._find("/applications", name)
if app:
self._delete(f"/applications/{app['id']}")
log.info(f"Deleted application '{name}'")
# -- Subscriptions ------------------------------------------------------
def create_subscription(self, account, folder, app_name):
"""
Create an AR subscription linking account+folder to an application.
The PTA block below is required for client SFTP upload to trigger routing.
triggerFileOption='fail' is standard; submitFilenamePatternExpression
only affects server-initiated pulls, not client uploads.
Returns the subscription ID.
"""
subs = self._get(f"/subscriptions?account={account}&limit=100").get("result", []) subs = self._get(f"/subscriptions?account={account}&limit=100").get("result", [])
for s in subs: for s in subs:
if s.get("folder") == folder and s.get("application") == app_name: if s.get("folder") == folder and s.get("application") == app_name:
existing_sub = s log.info(f"Subscription '{account}{folder}{app_name}' already exists: {s['id']}")
break return s["id"]
if existing_sub:
log.info(f" Subscription already exists: {existing_sub['id']}")
sub_id = existing_sub["id"]
else:
pta = {
"ptaOnSuccessDoInAdvancedRoutingWildcardPull": pta_trigger,
"submitFilenamePatternExpression": "*",
"submitFilterType": "FILENAME_PATTERN",
"triggerFileOption": "fail",
"triggerOnSuccessfulWildcardPull": pta_trigger,
}
self._post("/subscriptions", { self._post("/subscriptions", {
"type": "AdvancedRouting", "type": "AdvancedRouting",
"folder": folder, "folder": folder,
"account": account, "account": account,
"application": app_name, "application": app_name,
"postTransmissionActions": pta, "postTransmissionActions": {
"ptaOnSuccessDoInAdvancedRoutingWildcardPull": True,
"submitFilenamePatternExpression": "*",
"submitFilterType": "FILENAME_PATTERN",
"triggerFileOption": "fail",
"triggerOnSuccessfulWildcardPull": True,
},
}) })
subs = self._get(f"/subscriptions?account={account}&limit=100").get("result", []) subs = self._get(f"/subscriptions?account={account}&limit=100").get("result", [])
sub_id = next(s["id"] for s in subs sub = next(s for s in subs
if s.get("folder") == folder and s.get("application") == app_name) if s.get("folder") == folder and s.get("application") == app_name)
log.info(f" sub_id={sub_id}") log.info(f"Created subscription '{account}{folder}{app_name}': {sub['id']}")
return sub["id"]
log.info(f"Creating SIMPLE route: {simple_name}") def delete_subscription(self, sub_id):
existing_simple = self._find_by_name("/routes", simple_name) """Delete a subscription by ID."""
if existing_simple: self._delete(f"/subscriptions/{sub_id}")
log.info(f" SIMPLE already exists: {existing_simple['id']}") log.info(f"Deleted subscription {sub_id}")
simple_id = existing_simple["id"]
else: # -- Routes -------------------------------------------------------------
step_payloads = [s.to_api() for s in steps]
self._post("/routes", { def create_simple_route(self, name, steps, condition_type="ALWAYS", condition=None):
"name": simple_name, """
Create a SIMPLE route with the given steps.
Steps are auto-chained in array order.
Returns the route ID.
condition_type: ALWAYS | EL
condition: EL expression string (only when condition_type=EL)
e.g. "${transfer.target.matches('.*\\\\.txt')}"
Note: double-escape the dot in Java regex within Python strings.
"""
existing = self._find("/routes", name)
if existing:
log.info(f"SIMPLE route '{name}' already exists: {existing['id']}")
return existing["id"]
body = {
"name": name,
"type": "SIMPLE", "type": "SIMPLE",
"conditionType": "ALWAYS", "conditionType": condition_type,
"steps": step_payloads, "steps": [s.to_api() for s in steps],
}) }
simple_id = self._find_by_name("/routes", simple_name)["id"] if condition:
log.info(f" simple_id={simple_id}") body["condition"] = condition
self._post("/routes", body)
route = self._find("/routes", name)
log.info(f"Created SIMPLE route '{name}': {route['id']}")
return route["id"]
log.info(f"Creating TEMPLATE route: {template_name}") def create_template_route(self, name, simple_route_id,
existing_template = self._find_by_name("/routes", template_name) condition_type="MATCH_ALL"):
if existing_template: """
log.info(f" TEMPLATE already exists: {existing_template['id']}") Create a TEMPLATE route that executes a SIMPLE route.
template_id = existing_template["id"] Returns the route ID.
else:
# TEMPLATE conditionType quirk: must be in payload but API rejects Quirk: must be sent as pre-serialised string (data= not json=) to
# it as "unsupported parameter" in multiline payloads. Send as a avoid requests adding newlines that break the conditionType parser.
# pre-serialised string to avoid requests library re-serialising with """
# newlines. Use data= not json= to control the wire format exactly. existing = self._find("/routes", name)
template_payload = json.dumps({ if existing:
"name": template_name, log.info(f"TEMPLATE route '{name}' already exists: {existing['id']}")
return existing["id"]
payload = json.dumps({
"name": name,
"type": "TEMPLATE", "type": "TEMPLATE",
"conditionType": "MATCH_ALL", "conditionType": condition_type,
"steps": [{"type": "ExecuteRoute", "status": "ENABLED", "steps": [{
"executeRoute": simple_id, "autostart": False}] "type": "ExecuteRoute",
"status": "ENABLED",
"executeRoute": simple_route_id,
"autostart": False,
}]
}) })
r = self.session.post(f"{self.base}/routes", data=template_payload) self._post_raw("/routes", payload)
if r.status_code not in (200, 201, 204): route = self._find("/routes", name)
raise RuntimeError( log.info(f"Created TEMPLATE route '{name}': {route['id']}")
f"POST TEMPLATE failed {r.status_code}: {r.text}") return route["id"]
template_id = self._find_by_name("/routes", template_name)["id"]
log.info(f" template_id={template_id}")
log.info(f"Creating COMPOSITE route: {composite_name}") def create_composite_route(self, name, template_id,
existing_composite = self._find_by_name("/routes", composite_name) condition_type="MATCH_ALL"):
if existing_composite: """
log.info(f" COMPOSITE already exists: {existing_composite['id']}") Create a COMPOSITE route bound to a TEMPLATE.
composite_id = existing_composite["id"] Returns the route ID.
else: Subscriptions are linked separately via link_subscription().
"""
existing = self._find("/routes", name)
if existing:
log.info(f"COMPOSITE route '{name}' already exists: {existing['id']}")
return existing["id"]
self._post("/routes", { self._post("/routes", {
"name": composite_name, "name": name,
"type": "COMPOSITE", "type": "COMPOSITE",
"conditionType": "MATCH_ALL", "conditionType": condition_type,
"routeTemplate": template_id, "routeTemplate": template_id,
"steps": [], "steps": [],
}) })
composite_id = self._find_by_name("/routes", composite_name)["id"] route = self._find("/routes", name)
log.info(f" composite_id={composite_id}") log.info(f"Created COMPOSITE route '{name}': {route['id']}")
return route["id"]
log.info(f"Linking COMPOSITE to subscription") def link_subscription(self, composite_id, sub_id):
comp_full = self._get(f"/routes/{composite_id}") """Link a subscription to a COMPOSITE route."""
if sub_id not in comp_full.get("subscriptions", []): full = self._get(f"/routes/{composite_id}")
comp_full["subscriptions"] = [sub_id] current = full.get("subscriptions", [])
for field in ("id", "metadata", "steps", "routeTemplateName", "account"): if sub_id in current:
comp_full.pop(field, None) log.info(f"Subscription {sub_id} already linked to composite {composite_id}")
self._put(f"/routes/{composite_id}", comp_full) return
log.info(f" Linked.") full["subscriptions"] = current + [sub_id]
else: # Remove read-only fields that cause PUT rejection
log.info(f" Already linked.")
flow = ARFlow(name, app_id, sub_id, simple_id, template_id, composite_id)
log.info(f"Flow created: {flow}")
return flow
def delete_ar_flow(self, name):
"""
Delete a complete AR flow by name prefix.
Deletion order: unlink COMPOSITE → delete COMPOSITE →
delete TEMPLATE → delete SIMPLE → delete subscription → delete application.
"""
composite_name = f"{name}-composite"
template_name = f"{name}-template"
simple_name = f"{name}-simple"
app_name = f"{name}-ar"
# 1. Unlink and delete COMPOSITE
composite = self._find_by_name("/routes", composite_name)
if composite:
cid = composite["id"]
full = self._get(f"/routes/{cid}")
if full.get("subscriptions"):
full["subscriptions"] = []
for field in ("id", "metadata", "steps", "routeTemplateName", "account"): for field in ("id", "metadata", "steps", "routeTemplateName", "account"):
full.pop(field, None) full.pop(field, None)
self._put(f"/routes/{cid}", full) self._put(f"/routes/{composite_id}", full)
log.info(f"Unlinked COMPOSITE {cid}") log.info(f"Linked subscription {sub_id} → composite {composite_id}")
self._delete(f"/routes/{cid}")
log.info(f"Deleted COMPOSITE {cid}")
# 2. Delete TEMPLATE def delete_route(self, route_id):
template = self._find_by_name("/routes", template_name) """Delete a route by ID."""
if template: self._delete(f"/routes/{route_id}")
self._delete(f"/routes/{template['id']}") log.info(f"Deleted route {route_id}")
log.info(f"Deleted TEMPLATE {template['id']}")
# 3. Delete SIMPLE def get_route(self, name):
simple = self._find_by_name("/routes", simple_name) """Fetch a route by name. Returns full object or None."""
if simple: return self._find("/routes", name)
self._delete(f"/routes/{simple['id']}")
log.info(f"Deleted SIMPLE {simple['id']}")
# 4. Delete subscription def update_route(self, route_id, updates):
subs = self._get("/subscriptions?limit=100").get("result", [])
for s in subs:
if s.get("application") == app_name:
self._delete(f"/subscriptions/{s['id']}")
log.info(f"Deleted subscription {s['id']}")
# 5. Delete application
app = self._find_by_name("/applications", app_name)
if app:
self._delete(f"/applications/{app['id']}")
log.info(f"Deleted application {app['id']}")
log.info(f"Flow '{name}' deleted.")
# -- TM health check ----------------------------------------------------
def verify_routing(self, sftp_host, sftp_port, sftp_user, sftp_password,
upload_folder, dest_host, dest_port, dest_user, dest_key,
dest_folder, probe_filename="tm-probe.txt",
timeout=30):
""" """
Upload a probe file via SFTP, wait for routing to deliver it, Apply a dict of field updates to a route via GET → merge → PUT.
verify arrival at destination. Returns True on success. Handles read-only field stripping automatically.
Raises TMHealthError if TM does not process within timeout.
""" """
probe_content = f"tm-probe {time.time()}\n".encode() full = self._get(f"/routes/{route_id}")
full.update(updates)
for field in ("id", "metadata", "steps", "routeTemplateName", "account"):
full.pop(field, None)
return self._put(f"/routes/{route_id}", full)
log.info(f"Uploading probe: {upload_folder}/{probe_filename}")
transport = paramiko.Transport((sftp_host, sftp_port)) # ---------------------------------------------------------------------------
transport.connect(username=sftp_user, password=sftp_password) # Helpers
sftp = paramiko.SFTPClient.from_transport(transport) # ---------------------------------------------------------------------------
def _err(response):
try: try:
sftp.chdir(upload_folder) return response.json()
except IOError: except Exception:
sftp.mkdir(upload_folder) return response.text
sftp.chdir(upload_folder)
sftp.putfo(io.BytesIO(probe_content), probe_filename)
sftp.close()
transport.close()
log.info(" Probe uploaded.")
log.info(f"Waiting for delivery to {dest_folder}/{probe_filename}")
deadline = time.time() + timeout
key = paramiko.RSAKey.from_private_key_file(dest_key) if dest_key else None
while time.time() < deadline:
time.sleep(3)
try:
t2 = paramiko.Transport((dest_host, dest_port))
if key:
t2.connect(username=dest_user, pkey=key)
else:
t2.connect(username=dest_user)
s2 = paramiko.SFTPClient.from_transport(t2)
files = s2.listdir(dest_folder)
s2.close(); t2.close()
if probe_filename in files:
log.info(f" Probe arrived. TM healthy.")
return True
except Exception as e:
log.debug(f" Destination check: {e}")
raise TMHealthError(
f"Probe '{probe_filename}' not found in '{dest_folder}' after {timeout}s. "
f"TM may not be running or subscription not registered. "
f"Try: ST Admin UI → Server Configuration → Transaction Manager → Restart"
)
class TMHealthError(Exception):
pass
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -514,18 +462,26 @@ if __name__ == "__main__":
parser.add_argument("--user", default="admin") parser.add_argument("--user", default="admin")
parser.add_argument("--pass", dest="password", default="openclaw") parser.add_argument("--pass", dest="password", default="openclaw")
sub = parser.add_subparsers(dest="cmd") sub = parser.add_subparsers(dest="cmd")
sub.add_parser("snapshot") sub.add_parser("snapshot")
log_p = sub.add_parser("logs")
d = sub.add_parser("delete-flow") log_p.add_argument("--limit", type=int, default=20)
d.add_argument("name") log_p.add_argument("--offset", type=int, default=0)
args = parser.parse_args() args = parser.parse_args()
st = STClient(args.host, args.user, args.password, args.port) st = STClient(args.host, args.user, args.password, args.port)
if args.cmd == "snapshot": if args.cmd == "snapshot":
print(json.dumps(st.snapshot(), indent=2)) print(json.dumps(st.snapshot(), indent=2))
elif args.cmd == "delete-flow": elif args.cmd == "logs":
st.delete_ar_flow(args.name) entries = st.get_transfer_logs(limit=args.limit, offset=args.offset)
for e in entries:
print(json.dumps({
"time": e.get("startTime"),
"account": e.get("account"),
"file": e.get("filename"),
"status": e.get("status"),
"dir": e.get("remoteDir"),
"app": e.get("application"),
}))
else: else:
parser.print_help() parser.print_help()