Files
st-flow-tests/st_client.py

532 lines
21 KiB
Python

#!/usr/bin/env python3
"""
st_client.py — SecureTransport AR Flow Client Library
Abstracts the ST REST API quirks so callers work with typed objects,
not raw JSON payloads. All known field requirements, mandatory defaults,
and undocumented formats are encoded here.
Usage:
    import sys
    sys.path.insert(0, '/home/node/.openclaw/site-packages')
    from st_client import STClient, CompressStep, PgpEncryptStep, SendToPartnerStep

    st = STClient("192.168.0.245", "admin", "openclaw")
    flow = st.create_ar_flow(
        name="floweng",
        account="conan",
        folder="/floweng",
        steps=[
            PgpEncryptStep(filter="*.jpg", key="test-pgp-public", key_owner="conan"),
            CompressStep(filter="*.txt"),
            SendToPartnerStep(site="clawdbox-partner-site-1771557836"),
        ]
    )
    st.delete_ar_flow("floweng")
"""
import sys
import json
import time
import io
import logging
sys.path.insert(0, '/home/node/.openclaw/site-packages')
import requests
import paramiko
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
log = logging.getLogger("st_client")
# ---------------------------------------------------------------------------
# Step definitions
# ---------------------------------------------------------------------------
class CompressStep:
    """Route step that ZIP-compresses the files selected by a glob filter.

    Attributes mirror the constructor arguments:
        filter: Glob expression sent as ``fileFilterExpression``.
        level: Value sent as ``compressionLevel``.
        single_archive: Value sent as ``singleArchiveEnabled``.
        on_failure: Value sent as ``actionOnStepFailure``.
    """

    def __init__(self, filter="*", level="NORMAL", single_archive=False,
                 on_failure="PROCEED"):
        self.filter, self.level = filter, level
        self.single_archive, self.on_failure = single_archive, on_failure

    def to_api(self):
        """Return the REST payload dict describing this step."""
        # Fields every Compress step sends unchanged.
        body = {
            "type": "Compress",
            "status": "ENABLED",
            "conditionType": "ALWAYS",
        }
        body["actionOnStepFailure"] = self.on_failure
        body["fileFilterExpression"] = self.filter
        body["fileFilterExpressionType"] = "GLOB"
        body["usePrecedingStepFiles"] = False
        body["compressionType"] = "ZIP"
        # Empirically validated values: STORE FASTEST FAST NORMAL GOOD BETTER BEST
        body["compressionLevel"] = self.level
        body["singleArchiveEnabled"] = self.single_archive
        body["zipPassword"] = ""
        return body
class PgpEncryptStep:
    """Route step that PGP-encrypts the files selected by a glob filter.

    ``key`` and ``key_owner`` are mandatory; a falsy value for either raises
    ``ValueError`` at construction time.
    """

    def __init__(self, filter="*", key=None, key_owner=None,
                 ascii_armour=False, on_failure="PROCEED"):
        # Validate in a fixed order so a missing key is reported first.
        required = (
            (key, "PgpEncryptStep requires key (cert alias name)"),
            (key_owner, "PgpEncryptStep requires key_owner (account name)"),
        )
        for value, complaint in required:
            if not value:
                raise ValueError(complaint)
        self.filter = filter
        self.key = key
        self.key_owner = key_owner
        self.ascii_armour = ascii_armour
        self.on_failure = on_failure

    def to_api(self):
        """Return the REST payload dict describing this step."""
        body = dict(
            type="PgpEncryption",
            status="ENABLED",
            conditionType="ALWAYS",
            actionOnStepFailure=self.on_failure,
            fileFilterExpression=self.filter,
            fileFilterExpressionType="GLOB",
            usePrecedingStepFiles=False,
        )
        # ALIAS = use cert name directly; EXPRESSION_WILDCARD = EL expression
        body["encryptKeyExpression"] = self.key
        body["encryptKeyExpressionType"] = "ALIAS"
        body["encryptKeyOwnerExpression"] = self.key_owner
        body["encryptKeyOwnerExpressionType"] = "NAME"
        # "0" disables PGP-level compression (use Compress step instead)
        body["compressionType"] = "0"
        body["useAsciiArmour"] = self.ascii_armour
        return body
class SendToPartnerStep:
    """Route step that sends the files selected by a glob filter to a transfer site.

    ``site`` is mandatory; a falsy value raises ``ValueError``.
    """

    # API requires LIST type with #!#CVD#!# suffix — not documented publicly
    _SITE_SUFFIX = "#!#CVD#!#"

    def __init__(self, site=None, filter="*", on_failure="FAIL"):
        if not site:
            raise ValueError("SendToPartnerStep requires site (transfer site name)")
        self.site, self.filter, self.on_failure = site, filter, on_failure

    def to_api(self):
        """Return the REST payload dict describing this step."""
        site_expression = "{}{}".format(self.site, self._SITE_SUFFIX)
        body = {
            "type": "SendToPartner",
            "status": "ENABLED",
            "conditionType": "ALWAYS",
        }
        body.update(
            actionOnStepFailure=self.on_failure,
            fileFilterExpression=self.filter,
            fileFilterExpressionType="GLOB",
            usePrecedingStepFiles=False,
            transferSiteExpression=site_expression,
            transferSiteExpressionType="LIST",
        )
        return body
# ---------------------------------------------------------------------------
# AR Flow result object
# ---------------------------------------------------------------------------
class ARFlow:
    """Record of the object IDs that make up one AR flow.

    Stores the flow name plus the IDs of its application, subscription,
    SIMPLE route, TEMPLATE route and COMPOSITE route.
    """

    def __init__(self, name, app_id, sub_id, simple_id, template_id, composite_id):
        self.name = name
        self.app_id, self.sub_id = app_id, sub_id
        self.simple_id, self.template_id = simple_id, template_id
        self.composite_id = composite_id

    def __repr__(self):
        # Label/value pairs in the order they appear in the rendered text.
        labelled = (
            ("app", self.app_id),
            ("sub", self.sub_id),
            ("simple", self.simple_id),
            ("template", self.template_id),
            ("composite", self.composite_id),
        )
        details = ", ".join("{}={}".format(label, value) for label, value in labelled)
        return "ARFlow({!r}, {})".format(self.name, details)
# ---------------------------------------------------------------------------
# Main client
# ---------------------------------------------------------------------------
class STClient:
    """
    SecureTransport REST API client.

    Handles auth, TLS, empty-body responses, and the AR object graph.
    All API quirks are encoded here — callers work with typed objects.
    """

    # HTTP status codes the write helpers treat as success.
    _WRITE_OK = (200, 201, 204)
    _DELETE_OK = (200, 204)
    # Fields removed from a fetched route before it is PUT back.
    _ROUTE_PUT_STRIP_FIELDS = ("id", "metadata", "steps", "routeTemplateName", "account")

    def __init__(self, host, user, password, port=444, verify=False):
        """
        Prepare a ``requests`` session for ``https://host:port``.

        Args:
            host: Hostname or IP of the REST endpoint.
            user: Username for the session's basic-auth tuple.
            password: Password for the session's basic-auth tuple.
            port: HTTPS port of the REST endpoint.
            verify: Stored as the session's ``verify`` setting; the default
                ``False`` turns TLS certificate verification off.
        """
        self.base = f"https://{host}:{port}/api/v2.0"
        self.base_v14 = f"https://{host}:{port}/api/v1.4"
        self.session = requests.Session()
        self.session.auth = (user, password)
        self.session.verify = verify
        self.session.headers.update({"Accept": "application/json",
                                     "Content-Type": "application/json"})
        self._host = host
        self._port = port

    # -- Low-level helpers --------------------------------------------------
    @staticmethod
    def _raise_for_failure(verb, path, response, ok_codes):
        """Raise RuntimeError unless ``response.status_code`` is in ``ok_codes``.

        The error detail is the decoded JSON body when it parses, otherwise
        the raw response text.
        """
        if response.status_code in ok_codes:
            return
        try:
            detail = response.json()
        except ValueError:
            # Body is not JSON; fall back to the raw text.
            detail = response.text
        raise RuntimeError(f"{verb} {path} failed {response.status_code}: {detail}")

    @staticmethod
    def _json_or_empty(response):
        """Return the decoded JSON body, or ``{}`` when the body is blank."""
        # Some endpoints return empty body on success
        if response.content.strip():
            return response.json()
        return {}

    def _get(self, path, v14=False):
        """GET ``path`` (v1.4 base when ``v14`` is true) and return the JSON body.

        Non-success statuses propagate whatever ``raise_for_status`` raises.
        """
        base = self.base_v14 if v14 else self.base
        r = self.session.get(f"{base}{path}")
        r.raise_for_status()
        return r.json()

    def _post(self, path, body):
        """POST ``body`` as JSON; return the JSON reply or ``{}`` if empty.

        Raises:
            RuntimeError: status code is not 200, 201 or 204.
        """
        r = self.session.post(f"{self.base}{path}", json=body)
        self._raise_for_failure("POST", path, r, self._WRITE_OK)
        return self._json_or_empty(r)

    def _put(self, path, body):
        """PUT ``body`` as JSON; return the JSON reply or ``{}`` if empty.

        Raises:
            RuntimeError: status code is not 200, 201 or 204.
        """
        r = self.session.put(f"{self.base}{path}", json=body)
        self._raise_for_failure("PUT", path, r, self._WRITE_OK)
        return self._json_or_empty(r)

    def _delete(self, path):
        """DELETE ``path``.

        Raises:
            RuntimeError: status code is not 200 or 204.
        """
        r = self.session.delete(f"{self.base}{path}")
        self._raise_for_failure("DELETE", path, r, self._DELETE_OK)

    def _find_by_name(self, path, name):
        """Return first result with matching name, or None."""
        data = self._get(f"{path}?name={requests.utils.quote(name)}&limit=10")
        for item in data.get("result", []):
            if item.get("name") == name:
                return item
        return None

    def _require_id(self, path, name, kind):
        """Return the ID of the object called ``name`` under ``path``.

        Used right after a create call, where a missing object means the
        creation did not take effect.

        Raises:
            RuntimeError: no object with that exact name is found.
        """
        item = self._find_by_name(path, name)
        if item is None:
            raise RuntimeError(
                f"{kind} {name!r} not found via GET {path} after creation")
        return item["id"]

    def _find_subscription(self, account, folder, app_name):
        """Return the account's subscription matching folder and application, or None."""
        # Quote the account the same way _find_by_name quotes names.
        query = f"/subscriptions?account={requests.utils.quote(account)}&limit=100"
        for s in self._get(query).get("result", []):
            if s.get("folder") == folder and s.get("application") == app_name:
                return s
        return None

    # -- Snapshot -----------------------------------------------------------
    def snapshot(self):
        """Return a summary of the live ST environment.

        The result is a dict with keys ``accounts`` (type ``user`` only),
        ``sites``, ``certificates`` (types ``pgp`` and ``ssh`` only),
        ``ar_applications`` (type ``AdvancedRouting`` only) and
        ``subscriptions``. Each listing is fetched with a fixed ``limit``,
        so only entries returned within that limit are included.
        """
        accounts_raw = self._get("/accounts?limit=200").get("result", [])
        sites_raw = self._get("/sites?limit=100").get("result", [])
        certs_raw = self._get("/certificates?limit=200").get("result", [])
        apps_raw = self._get("/applications?limit=100").get("result", [])
        subs_raw = self._get("/subscriptions?limit=100").get("result", [])
        return {
            "accounts": [
                {"name": a["name"], "type": a.get("type"),
                 "locked": a.get("user", {}).get("locked", False)}
                for a in accounts_raw if a.get("type") in ("user",)
            ],
            "sites": [
                {"name": s["name"], "protocol": s.get("protocol"),
                 "host": s.get("host"), "port": s.get("port")}
                for s in sites_raw
            ],
            "certificates": [
                {"name": c["name"], "type": c.get("type"),
                 "usage": c.get("usage"), "account": c.get("account"),
                 "accessLevel": c.get("accessLevel")}
                for c in certs_raw
                if c.get("type") in ("pgp", "ssh")
            ],
            "ar_applications": [
                {"name": a["name"], "id": a["id"]}
                for a in apps_raw
                if a.get("type") == "AdvancedRouting"
            ],
            "subscriptions": [
                {"id": s["id"], "account": s.get("account"),
                 "folder": s.get("folder"), "application": s.get("application")}
                for s in subs_raw
            ],
        }

    # -- AR Flow lifecycle --------------------------------------------------
    def create_ar_flow(self, name, account, folder, steps,
                       pta_trigger=True):
        """
        Create a complete AR flow: Application → Subscription →
        SIMPLE route → TEMPLATE route → COMPOSITE route → link.

        Each object is looked up first and reused when it already exists, so
        ``steps`` is only applied when the SIMPLE route has to be created.

        Args:
            name: Flow name; objects are named ``<name>-ar``, ``<name>-simple``,
                ``<name>-template`` and ``<name>-composite``.
            account: Account the subscription belongs to.
            folder: Subscription folder.
            steps: Iterable of step objects exposing ``to_api()``.
            pta_trigger: Value sent for both wildcard-pull trigger flags of
                the subscription's post-transmission actions.

        Returns:
            An ARFlow object with all created (or reused) IDs.

        Raises:
            RuntimeError: a create/update call is rejected, or an object
                cannot be found again right after it was created.
        """
        app_name = f"{name}-ar"
        simple_name = f"{name}-simple"
        template_name = f"{name}-template"
        composite_name = f"{name}-composite"

        log.info(f"Creating AR Application: {app_name}")
        existing_app = self._find_by_name("/applications", app_name)
        if existing_app:
            log.info(f" Application already exists: {existing_app['id']}")
            app_id = existing_app["id"]
        else:
            self._post("/applications", {"name": app_name, "type": "AdvancedRouting"})
            # The ID is re-read by name rather than taken from the POST reply.
            app_id = self._require_id("/applications", app_name, "Application")
        log.info(f" app_id={app_id}")

        log.info(f"Creating Subscription: account={account} folder={folder}")
        existing_sub = self._find_subscription(account, folder, app_name)
        if existing_sub:
            log.info(f" Subscription already exists: {existing_sub['id']}")
            sub_id = existing_sub["id"]
        else:
            pta = {
                "ptaOnSuccessDoInAdvancedRoutingWildcardPull": pta_trigger,
                "submitFilenamePatternExpression": "*",
                "submitFilterType": "FILENAME_PATTERN",
                "triggerFileOption": "fail",
                "triggerOnSuccessfulWildcardPull": pta_trigger,
            }
            self._post("/subscriptions", {
                "type": "AdvancedRouting",
                "folder": folder,
                "account": account,
                "application": app_name,
                "postTransmissionActions": pta,
            })
            created_sub = self._find_subscription(account, folder, app_name)
            if created_sub is None:
                raise RuntimeError(
                    f"Subscription for account={account!r} folder={folder!r} "
                    f"application={app_name!r} not found after creation")
            sub_id = created_sub["id"]
        log.info(f" sub_id={sub_id}")

        log.info(f"Creating SIMPLE route: {simple_name}")
        existing_simple = self._find_by_name("/routes", simple_name)
        if existing_simple:
            log.info(f" SIMPLE already exists: {existing_simple['id']}")
            simple_id = existing_simple["id"]
        else:
            step_payloads = [s.to_api() for s in steps]
            self._post("/routes", {
                "name": simple_name,
                "type": "SIMPLE",
                "conditionType": "ALWAYS",
                "steps": step_payloads,
            })
            simple_id = self._require_id("/routes", simple_name, "SIMPLE route")
        log.info(f" simple_id={simple_id}")

        log.info(f"Creating TEMPLATE route: {template_name}")
        existing_template = self._find_by_name("/routes", template_name)
        if existing_template:
            log.info(f" TEMPLATE already exists: {existing_template['id']}")
            template_id = existing_template["id"]
        else:
            # TEMPLATE conditionType quirk: must be in payload but API rejects
            # it as "unsupported parameter" in multiline payloads. Send as a
            # pre-serialised string to avoid requests library re-serialising with
            # newlines. Use data= not json= to control the wire format exactly.
            template_payload = json.dumps({
                "name": template_name,
                "type": "TEMPLATE",
                "conditionType": "MATCH_ALL",
                "steps": [{"type": "ExecuteRoute", "status": "ENABLED",
                           "executeRoute": simple_id, "autostart": False}]
            })
            r = self.session.post(f"{self.base}/routes", data=template_payload)
            if r.status_code not in self._WRITE_OK:
                raise RuntimeError(
                    f"POST TEMPLATE failed {r.status_code}: {r.text}")
            template_id = self._require_id("/routes", template_name, "TEMPLATE route")
        log.info(f" template_id={template_id}")

        log.info(f"Creating COMPOSITE route: {composite_name}")
        existing_composite = self._find_by_name("/routes", composite_name)
        if existing_composite:
            log.info(f" COMPOSITE already exists: {existing_composite['id']}")
            composite_id = existing_composite["id"]
        else:
            self._post("/routes", {
                "name": composite_name,
                "type": "COMPOSITE",
                "conditionType": "MATCH_ALL",
                "routeTemplate": template_id,
                "steps": [],
            })
            composite_id = self._require_id("/routes", composite_name, "COMPOSITE route")
        log.info(f" composite_id={composite_id}")

        log.info("Linking COMPOSITE to subscription")
        comp_full = self._get(f"/routes/{composite_id}")
        if sub_id not in comp_full.get("subscriptions", []):
            # NOTE(review): this replaces any subscriptions already linked to
            # the COMPOSITE with just this one — confirm that is intended.
            comp_full["subscriptions"] = [sub_id]
            for field in self._ROUTE_PUT_STRIP_FIELDS:
                comp_full.pop(field, None)
            self._put(f"/routes/{composite_id}", comp_full)
            log.info(" Linked.")
        else:
            log.info(" Already linked.")

        flow = ARFlow(name, app_id, sub_id, simple_id, template_id, composite_id)
        log.info(f"Flow created: {flow}")
        return flow

    def delete_ar_flow(self, name):
        """
        Delete a complete AR flow by name prefix.

        Deletion order: unlink COMPOSITE → delete COMPOSITE →
        delete TEMPLATE → delete SIMPLE → delete subscription → delete application.
        Objects that are not found are skipped.

        Raises:
            RuntimeError: an unlink or delete call is rejected.
        """
        composite_name = f"{name}-composite"
        template_name = f"{name}-template"
        simple_name = f"{name}-simple"
        app_name = f"{name}-ar"

        # 1. Unlink and delete COMPOSITE
        composite = self._find_by_name("/routes", composite_name)
        if composite:
            cid = composite["id"]
            full = self._get(f"/routes/{cid}")
            if full.get("subscriptions"):
                full["subscriptions"] = []
                for field in self._ROUTE_PUT_STRIP_FIELDS:
                    full.pop(field, None)
                self._put(f"/routes/{cid}", full)
                log.info(f"Unlinked COMPOSITE {cid}")
            self._delete(f"/routes/{cid}")
            log.info(f"Deleted COMPOSITE {cid}")

        # 2. Delete TEMPLATE
        template = self._find_by_name("/routes", template_name)
        if template:
            self._delete(f"/routes/{template['id']}")
            log.info(f"Deleted TEMPLATE {template['id']}")

        # 3. Delete SIMPLE
        simple = self._find_by_name("/routes", simple_name)
        if simple:
            self._delete(f"/routes/{simple['id']}")
            log.info(f"Deleted SIMPLE {simple['id']}")

        # 4. Delete subscription
        # Only the first 100 subscriptions returned are inspected.
        subs = self._get("/subscriptions?limit=100").get("result", [])
        for s in subs:
            if s.get("application") == app_name:
                self._delete(f"/subscriptions/{s['id']}")
                log.info(f"Deleted subscription {s['id']}")

        # 5. Delete application
        app = self._find_by_name("/applications", app_name)
        if app:
            self._delete(f"/applications/{app['id']}")
            log.info(f"Deleted application {app['id']}")
        log.info(f"Flow '{name}' deleted.")

    # -- TM health check ----------------------------------------------------
    @staticmethod
    def _list_remote_dir(host, port, user, key, folder):
        """Return the SFTP listing of ``folder``, always closing the connection."""
        transport = paramiko.Transport((host, port))
        try:
            if key:
                transport.connect(username=user, pkey=key)
            else:
                transport.connect(username=user)
            sftp = paramiko.SFTPClient.from_transport(transport)
            try:
                return sftp.listdir(folder)
            finally:
                sftp.close()
        finally:
            transport.close()

    def verify_routing(self, sftp_host, sftp_port, sftp_user, sftp_password,
                       upload_folder, dest_host, dest_port, dest_user, dest_key,
                       dest_folder, probe_filename="tm-probe.txt",
                       timeout=30):
        """
        Upload a probe file via SFTP, wait for routing to deliver it,
        verify arrival at destination.

        Args:
            sftp_host, sftp_port, sftp_user, sftp_password: Password-auth
                SFTP endpoint the probe is uploaded to.
            upload_folder: Upload directory; created when ``chdir`` fails.
            dest_host, dest_port, dest_user: SFTP endpoint polled for the probe.
            dest_key: Path to a private key file loaded as an RSA key, or a
                falsy value to connect without a key.
            dest_folder: Directory listed on the destination.
            probe_filename: Name of the probe file.
            timeout: Seconds to keep polling (checked every 3 seconds).

        Returns:
            True once the probe file name appears in ``dest_folder``.

        Raises:
            TMHealthError: the probe does not arrive within ``timeout``.
        """
        probe_content = f"tm-probe {time.time()}\n".encode()
        log.info(f"Uploading probe: {upload_folder}/{probe_filename}")
        transport = paramiko.Transport((sftp_host, sftp_port))
        try:
            transport.connect(username=sftp_user, password=sftp_password)
            sftp = paramiko.SFTPClient.from_transport(transport)
            try:
                try:
                    sftp.chdir(upload_folder)
                except IOError:
                    sftp.mkdir(upload_folder)
                    sftp.chdir(upload_folder)
                sftp.putfo(io.BytesIO(probe_content), probe_filename)
            finally:
                sftp.close()
        finally:
            # Close the connection even when the upload fails part-way.
            transport.close()
        log.info(" Probe uploaded.")

        log.info(f"Waiting for delivery to {dest_folder}/{probe_filename}")
        # Monotonic clock: the deadline must not move with wall-clock changes.
        deadline = time.monotonic() + timeout
        key = paramiko.RSAKey.from_private_key_file(dest_key) if dest_key else None
        while time.monotonic() < deadline:
            time.sleep(3)
            try:
                files = self._list_remote_dir(
                    dest_host, dest_port, dest_user, key, dest_folder)
            except Exception as e:
                # Best effort: a failed check is logged and retried.
                log.debug(f" Destination check: {e}")
                continue
            if probe_filename in files:
                log.info(" Probe arrived. TM healthy.")
                return True
        raise TMHealthError(
            f"Probe '{probe_filename}' not found in '{dest_folder}' after {timeout}s. "
            f"TM may not be running or subscription not registered. "
            f"Try: ST Admin UI → Server Configuration → Transaction Manager → Restart"
        )
class TMHealthError(Exception):
    """Raised when a routing probe file is not found at the destination in time."""
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import argparse

    # Plain-message logging so the library's log output reads as CLI output.
    logging.basicConfig(level=logging.INFO, format="%(message)s")

    cli = argparse.ArgumentParser(description="ST Client CLI")
    # Connection options shared by every sub-command.
    cli.add_argument("--host", required=True)
    cli.add_argument("--port", type=int, default=444)
    cli.add_argument("--user", default="admin")
    cli.add_argument("--pass", dest="password", default="openclaw")

    # Sub-commands: "snapshot" takes no arguments, "delete-flow" takes a name.
    commands = cli.add_subparsers(dest="cmd")
    commands.add_parser("snapshot")
    commands.add_parser("delete-flow").add_argument("name")

    opts = cli.parse_args()
    client = STClient(opts.host, opts.user, opts.password, opts.port)

    if opts.cmd == "delete-flow":
        client.delete_ar_flow(opts.name)
    elif opts.cmd == "snapshot":
        print(json.dumps(client.snapshot(), indent=2))
    else:
        # No sub-command given: show usage.
        cli.print_help()