Files
st-flow-tests/scripts/run_tests.py
2026-03-04 10:14:41 +00:00

322 lines
11 KiB
Python

#!/usr/bin/env python3
"""
run_tests.py — Automated test runner for ST AR flow validation.
Uploads each fixture via SFTP, waits for routing to complete,
verifies destination artifacts, and reports results.
Usage:
python scripts/run_tests.py --spec specs/compress-pgp-passthrough.yaml --env .env
python scripts/run_tests.py --env .env --fixture compress-txt
python scripts/run_tests.py --env .env --verbose
Exit codes: 0 = all pass, 1 = one or more fail, 2 = environment/config error
Output: JSON results to stdout, human summary to stderr.
"""
import argparse
import json
import os
import sys
import time
import subprocess
import ssl
import base64
import urllib.request
import urllib.error
from pathlib import Path
from datetime import datetime, timezone
FIXTURES_DIR = Path(__file__).parent.parent / "fixtures"
MACHINE_CONTEXT = Path(__file__).parent.parent / "MACHINE.md"
# Canonical test cases — mirrors MACHINE.md test case registry
TEST_CASES = [
{
"id": "compress-txt",
"fixture": "test.txt",
"expected_filename": "test.txt.zip",
"verify": "suffix_and_larger",
"suffix": ".zip",
"notes": "Compress step: output must be larger than input (ZIP header overhead)"
},
{
"id": "pgp-encrypt-md",
"fixture": "test.md",
"expected_filename": "test.md.pgp",
"verify": "suffix_and_magic",
"suffix": ".pgp",
"magic_byte": 0xC1,
"notes": "PgpEncryption step: binary PGP output, first byte must be 0xC1"
},
{
"id": "passthrough-csv",
"fixture": "test.csv",
"expected_filename": "test.csv",
"verify": "exact_match",
"notes": "No matching step: filename and size must be unchanged"
},
{
"id": "passthrough-pdf",
"fixture": "test.pdf",
"expected_filename": "test.pdf",
"verify": "exact_match",
"notes": "No matching step: filename and size must be unchanged"
},
{
"id": "passthrough-jpg",
"fixture": "test.jpg",
"expected_filename": "test.jpg",
"verify": "exact_match",
"notes": "No matching step: filename and size must be unchanged"
},
]
def load_env(env_file):
env = {}
path = Path(env_file)
if path.exists():
with open(path) as f:
for line in f:
line = line.strip()
if line and not line.startswith("#") and "=" in line:
k, v = line.split("=", 1)
env[k.strip()] = v.strip().strip('"')
# Merge with OS env (OS takes precedence)
merged = {**env, **os.environ}
return merged
def require_env(env, *keys):
missing = [k for k in keys if not env.get(k)]
if missing:
print(f"Missing required env vars: {', '.join(missing)}", file=sys.stderr)
print("See README.md for the full list of required variables.", file=sys.stderr)
sys.exit(2)
def sftp_upload(env, local_path, verbose=False):
"""Upload a file to ST via SFTP. Returns True on success."""
host = env["ST_SFTP_HOST"]
port = env.get("ST_SFTP_PORT", "8022")
user = env["ST_SFTP_USER"]
password = env["ST_SFTP_PASS"]
remote_dir = env["ST_SFTP_UPLOAD_DIR"]
filename = Path(local_path).name
remote_path = f"{remote_dir}/{filename}"
# Use sshpass + sftp
cmd = [
"sshpass", "-p", password,
"sftp", "-P", port,
"-o", "StrictHostKeyChecking=no",
"-o", "BatchMode=no",
f"{user}@{host}"
]
batch = f"put {local_path} {remote_path}\nquit\n"
if verbose:
print(f" SFTP: {local_path}{user}@{host}:{remote_path}", file=sys.stderr)
result = subprocess.run(cmd, input=batch.encode(), capture_output=True, timeout=30)
return result.returncode == 0
def ssh_list_destination(env, verbose=False):
"""List files in partner destination dir. Returns dict of {filename: size_bytes}."""
host = env["PARTNER_SSH_HOST"]
port = env.get("PARTNER_SSH_PORT", "22")
user = env["PARTNER_SSH_USER"]
key = env.get("PARTNER_SSH_KEY", "")
dest_dir = env["PARTNER_DEST_DIR"]
ssh_args = [
"ssh", "-p", port,
"-o", "StrictHostKeyChecking=no",
]
if key:
ssh_args += ["-i", key]
ssh_args += [f"{user}@{host}", f"ls -la {dest_dir}/"]
if verbose:
print(f" SSH list: {user}@{host}:{dest_dir}", file=sys.stderr)
result = subprocess.run(ssh_args, capture_output=True, timeout=15)
if result.returncode != 0:
return {}
files = {}
for line in result.stdout.decode().splitlines():
parts = line.split()
if len(parts) >= 9 and not line.startswith("total") and parts[0][0] != "d":
size = int(parts[4])
fname = parts[8]
files[fname] = size
return files
def ssh_read_bytes(env, filename, n=4, verbose=False):
"""Read first n bytes of a file on the partner host. Returns bytes or None."""
host = env["PARTNER_SSH_HOST"]
port = env.get("PARTNER_SSH_PORT", "22")
user = env["PARTNER_SSH_USER"]
key = env.get("PARTNER_SSH_KEY", "")
dest_dir = env["PARTNER_DEST_DIR"]
ssh_args = ["ssh", "-p", port, "-o", "StrictHostKeyChecking=no"]
if key:
ssh_args += ["-i", key]
ssh_args += [f"{user}@{host}", f"dd if={dest_dir}/{filename} bs=1 count={n} 2>/dev/null | xxd -p"]
result = subprocess.run(ssh_args, capture_output=True, timeout=10)
if result.returncode != 0:
return None
try:
hex_str = result.stdout.decode().strip().replace("\n", "")
return bytes.fromhex(hex_str)
except Exception:
return None
def ssh_delete_files(env, filenames, verbose=False):
"""Remove test artifacts from destination after run."""
if not filenames:
return
host = env["PARTNER_SSH_HOST"]
port = env.get("PARTNER_SSH_PORT", "22")
user = env["PARTNER_SSH_USER"]
key = env.get("PARTNER_SSH_KEY", "")
dest_dir = env["PARTNER_DEST_DIR"]
names = " ".join(f"{dest_dir}/{f}" for f in filenames)
ssh_args = ["ssh", "-p", port, "-o", "StrictHostKeyChecking=no"]
if key:
ssh_args += ["-i", key]
ssh_args += [f"{user}@{host}", f"rm -f {names}"]
subprocess.run(ssh_args, capture_output=True, timeout=10)
def verify_result(tc, dest_files, env, verbose):
expected = tc["expected_filename"]
fixture_path = FIXTURES_DIR / tc["fixture"]
fixture_size = fixture_path.stat().st_size
if expected not in dest_files:
return False, f"Expected '{expected}' not found in destination. Present: {list(dest_files.keys())}"
dest_size = dest_files[expected]
method = tc["verify"]
if method == "exact_match":
if dest_size != fixture_size:
return False, f"Size mismatch: expected {fixture_size}B, got {dest_size}B"
return True, f"Pass-through confirmed: '{expected}' ({dest_size}B)"
elif method == "suffix_and_larger":
if not expected.endswith(tc["suffix"]):
return False, f"Filename does not end with '{tc['suffix']}'"
if dest_size <= fixture_size:
return False, f"Compressed file ({dest_size}B) not larger than input ({fixture_size}B) — compression may have failed"
return True, f"Compressed: '{expected}' ({fixture_size}B → {dest_size}B)"
elif method == "suffix_and_magic":
if not expected.endswith(tc["suffix"]):
return False, f"Filename does not end with '{tc['suffix']}'"
magic = tc.get("magic_byte")
if magic is not None:
first_bytes = ssh_read_bytes(env, expected, 1, verbose)
if first_bytes is None:
return False, "Could not read destination file bytes for magic check"
if first_bytes[0] != magic:
return False, f"Magic byte mismatch: expected 0x{magic:02X}, got 0x{first_bytes[0]:02X}"
return True, f"PGP encrypted: '{expected}' ({dest_size}B, magic byte OK)"
return False, f"Unknown verify method: {method}"
def run_tests(test_cases, env, verbose=False):
results = []
timeout = int(env.get("ROUTING_TIMEOUT_SEC", "30"))
uploaded = []
for tc in test_cases:
fixture_path = FIXTURES_DIR / tc["fixture"]
if not fixture_path.exists():
results.append({"id": tc["id"], "status": "ERROR", "message": f"Fixture not found: {fixture_path}"})
continue
print(f"\n [{tc['id']}] Uploading {tc['fixture']}...", file=sys.stderr)
ok = sftp_upload(env, str(fixture_path), verbose)
if not ok:
results.append({"id": tc["id"], "status": "ERROR", "message": "SFTP upload failed"})
continue
uploaded.append(tc)
# Wait for routing
if uploaded:
print(f"\n Waiting {timeout}s for routing...", file=sys.stderr)
time.sleep(timeout)
# Verify all
dest_files = ssh_list_destination(env, verbose)
if verbose:
print(f" Destination files: {dest_files}", file=sys.stderr)
artifacts_to_clean = []
for tc in uploaded:
passed, message = verify_result(tc, dest_files, env, verbose)
status = "PASS" if passed else "FAIL"
results.append({
"id": tc["id"],
"fixture": tc["fixture"],
"expected": tc["expected_filename"],
"status": status,
"message": message
})
artifacts_to_clean.append(tc["expected_filename"])
icon = "" if passed else ""
print(f" {icon} {tc['id']}: {message}", file=sys.stderr)
# Cleanup destination
ssh_delete_files(env, artifacts_to_clean, verbose)
return results
def main():
parser = argparse.ArgumentParser(description="Run ST AR flow test suite")
parser.add_argument("--spec", help="Flow spec YAML (used for context only)")
parser.add_argument("--env", default=".env", help="Env file path")
parser.add_argument("--fixture", help="Run only this fixture id")
parser.add_argument("--verbose", action="store_true")
parser.add_argument("--no-cleanup", action="store_true", help="Leave destination artifacts after run")
args = parser.parse_args()
env = load_env(args.env)
require_env(env,
"ST_SFTP_HOST", "ST_SFTP_USER", "ST_SFTP_PASS", "ST_SFTP_UPLOAD_DIR",
"PARTNER_SSH_HOST", "PARTNER_SSH_USER", "PARTNER_DEST_DIR"
)
cases = TEST_CASES
if args.fixture:
cases = [tc for tc in TEST_CASES if tc["id"] == args.fixture]
if not cases:
print(f"Unknown fixture id: {args.fixture}. Valid: {[t['id'] for t in TEST_CASES]}", file=sys.stderr)
sys.exit(2)
print(f"\nST Flow Test Run — {datetime.now(timezone.utc).isoformat()}", file=sys.stderr)
print(f"Running {len(cases)} test(s)\n", file=sys.stderr)
results = run_tests(cases, env, args.verbose)
passed = sum(1 for r in results if r["status"] == "PASS")
failed = sum(1 for r in results if r["status"] == "FAIL")
errors = sum(1 for r in results if r["status"] == "ERROR")
print(f"\n{'='*40}", file=sys.stderr)
print(f"Results: {passed} passed, {failed} failed, {errors} errors", file=sys.stderr)
output = {
"run_at": datetime.now(timezone.utc).isoformat(),
"total": len(results),
"passed": passed,
"failed": failed,
"errors": errors,
"results": results
}
print(json.dumps(output, indent=2))
sys.exit(0 if failed == 0 and errors == 0 else 1)
if __name__ == "__main__":
main()