#!/usr/bin/env python3 """ run_tests.py — Automated test runner for ST AR flow validation. Uploads each fixture via SFTP, waits for routing to complete, verifies destination artifacts, and reports results. Usage: python scripts/run_tests.py --spec specs/compress-pgp-passthrough.yaml --env .env python scripts/run_tests.py --env .env --fixture compress-txt python scripts/run_tests.py --env .env --verbose Exit codes: 0 = all pass, 1 = one or more fail, 2 = environment/config error Output: JSON results to stdout, human summary to stderr. """ import argparse import json import os import sys import time import subprocess import ssl import base64 import urllib.request import urllib.error from pathlib import Path from datetime import datetime, timezone FIXTURES_DIR = Path(__file__).parent.parent / "fixtures" MACHINE_CONTEXT = Path(__file__).parent.parent / "MACHINE.md" # Canonical test cases — mirrors MACHINE.md test case registry TEST_CASES = [ { "id": "compress-txt", "fixture": "test.txt", "expected_filename": "test.txt.zip", "verify": "suffix_and_larger", "suffix": ".zip", "notes": "Compress step: output must be larger than input (ZIP header overhead)" }, { "id": "pgp-encrypt-md", "fixture": "test.md", "expected_filename": "test.md.pgp", "verify": "suffix_and_magic", "suffix": ".pgp", "magic_byte": 0xC1, "notes": "PgpEncryption step: binary PGP output, first byte must be 0xC1" }, { "id": "passthrough-csv", "fixture": "test.csv", "expected_filename": "test.csv", "verify": "exact_match", "notes": "No matching step: filename and size must be unchanged" }, { "id": "passthrough-pdf", "fixture": "test.pdf", "expected_filename": "test.pdf", "verify": "exact_match", "notes": "No matching step: filename and size must be unchanged" }, { "id": "passthrough-jpg", "fixture": "test.jpg", "expected_filename": "test.jpg", "verify": "exact_match", "notes": "No matching step: filename and size must be unchanged" }, ] def load_env(env_file): env = {} path = Path(env_file) if path.exists(): with open(path) as f: for line in f: line = line.strip() if line and not line.startswith("#") and "=" in line: k, v = line.split("=", 1) env[k.strip()] = v.strip().strip('"') # Merge with OS env (OS takes precedence) merged = {**env, **os.environ} return merged def require_env(env, *keys): missing = [k for k in keys if not env.get(k)] if missing: print(f"Missing required env vars: {', '.join(missing)}", file=sys.stderr) print("See README.md for the full list of required variables.", file=sys.stderr) sys.exit(2) def sftp_upload(env, local_path, verbose=False): """Upload a file to ST via SFTP. Returns True on success.""" host = env["ST_SFTP_HOST"] port = env.get("ST_SFTP_PORT", "8022") user = env["ST_SFTP_USER"] password = env["ST_SFTP_PASS"] remote_dir = env["ST_SFTP_UPLOAD_DIR"] filename = Path(local_path).name remote_path = f"{remote_dir}/{filename}" # Use sshpass + sftp cmd = [ "sshpass", "-p", password, "sftp", "-P", port, "-o", "StrictHostKeyChecking=no", "-o", "BatchMode=no", f"{user}@{host}" ] batch = f"put {local_path} {remote_path}\nquit\n" if verbose: print(f" SFTP: {local_path} → {user}@{host}:{remote_path}", file=sys.stderr) result = subprocess.run(cmd, input=batch.encode(), capture_output=True, timeout=30) return result.returncode == 0 def ssh_list_destination(env, verbose=False): """List files in partner destination dir. Returns dict of {filename: size_bytes}.""" host = env["PARTNER_SSH_HOST"] port = env.get("PARTNER_SSH_PORT", "22") user = env["PARTNER_SSH_USER"] key = env.get("PARTNER_SSH_KEY", "") dest_dir = env["PARTNER_DEST_DIR"] ssh_args = [ "ssh", "-p", port, "-o", "StrictHostKeyChecking=no", ] if key: ssh_args += ["-i", key] ssh_args += [f"{user}@{host}", f"ls -la {dest_dir}/"] if verbose: print(f" SSH list: {user}@{host}:{dest_dir}", file=sys.stderr) result = subprocess.run(ssh_args, capture_output=True, timeout=15) if result.returncode != 0: return {} files = {} for line in result.stdout.decode().splitlines(): parts = line.split() if len(parts) >= 9 and not line.startswith("total") and parts[0][0] != "d": size = int(parts[4]) fname = parts[8] files[fname] = size return files def ssh_read_bytes(env, filename, n=4, verbose=False): """Read first n bytes of a file on the partner host. Returns bytes or None.""" host = env["PARTNER_SSH_HOST"] port = env.get("PARTNER_SSH_PORT", "22") user = env["PARTNER_SSH_USER"] key = env.get("PARTNER_SSH_KEY", "") dest_dir = env["PARTNER_DEST_DIR"] ssh_args = ["ssh", "-p", port, "-o", "StrictHostKeyChecking=no"] if key: ssh_args += ["-i", key] ssh_args += [f"{user}@{host}", f"dd if={dest_dir}/{filename} bs=1 count={n} 2>/dev/null | xxd -p"] result = subprocess.run(ssh_args, capture_output=True, timeout=10) if result.returncode != 0: return None try: hex_str = result.stdout.decode().strip().replace("\n", "") return bytes.fromhex(hex_str) except Exception: return None def ssh_delete_files(env, filenames, verbose=False): """Remove test artifacts from destination after run.""" if not filenames: return host = env["PARTNER_SSH_HOST"] port = env.get("PARTNER_SSH_PORT", "22") user = env["PARTNER_SSH_USER"] key = env.get("PARTNER_SSH_KEY", "") dest_dir = env["PARTNER_DEST_DIR"] names = " ".join(f"{dest_dir}/{f}" for f in filenames) ssh_args = ["ssh", "-p", port, "-o", "StrictHostKeyChecking=no"] if key: ssh_args += ["-i", key] ssh_args += [f"{user}@{host}", f"rm -f {names}"] subprocess.run(ssh_args, capture_output=True, timeout=10) def verify_result(tc, dest_files, env, verbose): expected = tc["expected_filename"] fixture_path = FIXTURES_DIR / tc["fixture"] fixture_size = fixture_path.stat().st_size if expected not in dest_files: return False, f"Expected '{expected}' not found in destination. Present: {list(dest_files.keys())}" dest_size = dest_files[expected] method = tc["verify"] if method == "exact_match": if dest_size != fixture_size: return False, f"Size mismatch: expected {fixture_size}B, got {dest_size}B" return True, f"Pass-through confirmed: '{expected}' ({dest_size}B)" elif method == "suffix_and_larger": if not expected.endswith(tc["suffix"]): return False, f"Filename does not end with '{tc['suffix']}'" if dest_size <= fixture_size: return False, f"Compressed file ({dest_size}B) not larger than input ({fixture_size}B) — compression may have failed" return True, f"Compressed: '{expected}' ({fixture_size}B → {dest_size}B)" elif method == "suffix_and_magic": if not expected.endswith(tc["suffix"]): return False, f"Filename does not end with '{tc['suffix']}'" magic = tc.get("magic_byte") if magic is not None: first_bytes = ssh_read_bytes(env, expected, 1, verbose) if first_bytes is None: return False, "Could not read destination file bytes for magic check" if first_bytes[0] != magic: return False, f"Magic byte mismatch: expected 0x{magic:02X}, got 0x{first_bytes[0]:02X}" return True, f"PGP encrypted: '{expected}' ({dest_size}B, magic byte OK)" return False, f"Unknown verify method: {method}" def run_tests(test_cases, env, verbose=False): results = [] timeout = int(env.get("ROUTING_TIMEOUT_SEC", "30")) uploaded = [] for tc in test_cases: fixture_path = FIXTURES_DIR / tc["fixture"] if not fixture_path.exists(): results.append({"id": tc["id"], "status": "ERROR", "message": f"Fixture not found: {fixture_path}"}) continue print(f"\n [{tc['id']}] Uploading {tc['fixture']}...", file=sys.stderr) ok = sftp_upload(env, str(fixture_path), verbose) if not ok: results.append({"id": tc["id"], "status": "ERROR", "message": "SFTP upload failed"}) continue uploaded.append(tc) # Wait for routing if uploaded: print(f"\n Waiting {timeout}s for routing...", file=sys.stderr) time.sleep(timeout) # Verify all dest_files = ssh_list_destination(env, verbose) if verbose: print(f" Destination files: {dest_files}", file=sys.stderr) artifacts_to_clean = [] for tc in uploaded: passed, message = verify_result(tc, dest_files, env, verbose) status = "PASS" if passed else "FAIL" results.append({ "id": tc["id"], "fixture": tc["fixture"], "expected": tc["expected_filename"], "status": status, "message": message }) artifacts_to_clean.append(tc["expected_filename"]) icon = "✅" if passed else "❌" print(f" {icon} {tc['id']}: {message}", file=sys.stderr) # Cleanup destination ssh_delete_files(env, artifacts_to_clean, verbose) return results def main(): parser = argparse.ArgumentParser(description="Run ST AR flow test suite") parser.add_argument("--spec", help="Flow spec YAML (used for context only)") parser.add_argument("--env", default=".env", help="Env file path") parser.add_argument("--fixture", help="Run only this fixture id") parser.add_argument("--verbose", action="store_true") parser.add_argument("--no-cleanup", action="store_true", help="Leave destination artifacts after run") args = parser.parse_args() env = load_env(args.env) require_env(env, "ST_SFTP_HOST", "ST_SFTP_USER", "ST_SFTP_PASS", "ST_SFTP_UPLOAD_DIR", "PARTNER_SSH_HOST", "PARTNER_SSH_USER", "PARTNER_DEST_DIR" ) cases = TEST_CASES if args.fixture: cases = [tc for tc in TEST_CASES if tc["id"] == args.fixture] if not cases: print(f"Unknown fixture id: {args.fixture}. Valid: {[t['id'] for t in TEST_CASES]}", file=sys.stderr) sys.exit(2) print(f"\nST Flow Test Run — {datetime.now(timezone.utc).isoformat()}", file=sys.stderr) print(f"Running {len(cases)} test(s)\n", file=sys.stderr) results = run_tests(cases, env, args.verbose) passed = sum(1 for r in results if r["status"] == "PASS") failed = sum(1 for r in results if r["status"] == "FAIL") errors = sum(1 for r in results if r["status"] == "ERROR") print(f"\n{'='*40}", file=sys.stderr) print(f"Results: {passed} passed, {failed} failed, {errors} errors", file=sys.stderr) output = { "run_at": datetime.now(timezone.utc).isoformat(), "total": len(results), "passed": passed, "failed": failed, "errors": errors, "results": results } print(json.dumps(output, indent=2)) sys.exit(0 if failed == 0 and errors == 0 else 1) if __name__ == "__main__": main()