From a149c3eb3978415f94f55674b81e8e664d9a56db Mon Sep 17 00:00:00 2001 From: Conan Scott Date: Wed, 4 Mar 2026 10:14:41 +0000 Subject: [PATCH] Add scripts/run_tests.py --- scripts/run_tests.py | 321 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 321 insertions(+) create mode 100644 scripts/run_tests.py diff --git a/scripts/run_tests.py b/scripts/run_tests.py new file mode 100644 index 0000000..0078204 --- /dev/null +++ b/scripts/run_tests.py @@ -0,0 +1,321 @@ +#!/usr/bin/env python3 +""" +run_tests.py — Automated test runner for ST AR flow validation. + +Uploads each fixture via SFTP, waits for routing to complete, +verifies destination artifacts, and reports results. + +Usage: + python scripts/run_tests.py --spec specs/compress-pgp-passthrough.yaml --env .env + python scripts/run_tests.py --env .env --fixture compress-txt + python scripts/run_tests.py --env .env --verbose + +Exit codes: 0 = all pass, 1 = one or more fail, 2 = environment/config error + +Output: JSON results to stdout, human summary to stderr. +""" + +import argparse +import json +import os +import sys +import time +import subprocess +import ssl +import base64 +import urllib.request +import urllib.error +from pathlib import Path +from datetime import datetime, timezone + +FIXTURES_DIR = Path(__file__).parent.parent / "fixtures" +MACHINE_CONTEXT = Path(__file__).parent.parent / "MACHINE.md" + +# Canonical test cases — mirrors MACHINE.md test case registry +TEST_CASES = [ + { + "id": "compress-txt", + "fixture": "test.txt", + "expected_filename": "test.txt.zip", + "verify": "suffix_and_larger", + "suffix": ".zip", + "notes": "Compress step: output must be larger than input (ZIP header overhead)" + }, + { + "id": "pgp-encrypt-md", + "fixture": "test.md", + "expected_filename": "test.md.pgp", + "verify": "suffix_and_magic", + "suffix": ".pgp", + "magic_byte": 0xC1, + "notes": "PgpEncryption step: binary PGP output, first byte must be 0xC1" + }, + { + "id": "passthrough-csv", + "fixture": "test.csv", + "expected_filename": "test.csv", + "verify": "exact_match", + "notes": "No matching step: filename and size must be unchanged" + }, + { + "id": "passthrough-pdf", + "fixture": "test.pdf", + "expected_filename": "test.pdf", + "verify": "exact_match", + "notes": "No matching step: filename and size must be unchanged" + }, + { + "id": "passthrough-jpg", + "fixture": "test.jpg", + "expected_filename": "test.jpg", + "verify": "exact_match", + "notes": "No matching step: filename and size must be unchanged" + }, +] + +def load_env(env_file): + env = {} + path = Path(env_file) + if path.exists(): + with open(path) as f: + for line in f: + line = line.strip() + if line and not line.startswith("#") and "=" in line: + k, v = line.split("=", 1) + env[k.strip()] = v.strip().strip('"') + # Merge with OS env (OS takes precedence) + merged = {**env, **os.environ} + return merged + +def require_env(env, *keys): + missing = [k for k in keys if not env.get(k)] + if missing: + print(f"Missing required env vars: {', '.join(missing)}", file=sys.stderr) + print("See README.md for the full list of required variables.", file=sys.stderr) + sys.exit(2) + +def sftp_upload(env, local_path, verbose=False): + """Upload a file to ST via SFTP. Returns True on success.""" + host = env["ST_SFTP_HOST"] + port = env.get("ST_SFTP_PORT", "8022") + user = env["ST_SFTP_USER"] + password = env["ST_SFTP_PASS"] + remote_dir = env["ST_SFTP_UPLOAD_DIR"] + filename = Path(local_path).name + remote_path = f"{remote_dir}/{filename}" + + # Use sshpass + sftp + cmd = [ + "sshpass", "-p", password, + "sftp", "-P", port, + "-o", "StrictHostKeyChecking=no", + "-o", "BatchMode=no", + f"{user}@{host}" + ] + batch = f"put {local_path} {remote_path}\nquit\n" + if verbose: + print(f" SFTP: {local_path} → {user}@{host}:{remote_path}", file=sys.stderr) + result = subprocess.run(cmd, input=batch.encode(), capture_output=True, timeout=30) + return result.returncode == 0 + +def ssh_list_destination(env, verbose=False): + """List files in partner destination dir. Returns dict of {filename: size_bytes}.""" + host = env["PARTNER_SSH_HOST"] + port = env.get("PARTNER_SSH_PORT", "22") + user = env["PARTNER_SSH_USER"] + key = env.get("PARTNER_SSH_KEY", "") + dest_dir = env["PARTNER_DEST_DIR"] + + ssh_args = [ + "ssh", "-p", port, + "-o", "StrictHostKeyChecking=no", + ] + if key: + ssh_args += ["-i", key] + ssh_args += [f"{user}@{host}", f"ls -la {dest_dir}/"] + + if verbose: + print(f" SSH list: {user}@{host}:{dest_dir}", file=sys.stderr) + result = subprocess.run(ssh_args, capture_output=True, timeout=15) + if result.returncode != 0: + return {} + files = {} + for line in result.stdout.decode().splitlines(): + parts = line.split() + if len(parts) >= 9 and not line.startswith("total") and parts[0][0] != "d": + size = int(parts[4]) + fname = parts[8] + files[fname] = size + return files + +def ssh_read_bytes(env, filename, n=4, verbose=False): + """Read first n bytes of a file on the partner host. Returns bytes or None.""" + host = env["PARTNER_SSH_HOST"] + port = env.get("PARTNER_SSH_PORT", "22") + user = env["PARTNER_SSH_USER"] + key = env.get("PARTNER_SSH_KEY", "") + dest_dir = env["PARTNER_DEST_DIR"] + + ssh_args = ["ssh", "-p", port, "-o", "StrictHostKeyChecking=no"] + if key: + ssh_args += ["-i", key] + ssh_args += [f"{user}@{host}", f"dd if={dest_dir}/{filename} bs=1 count={n} 2>/dev/null | xxd -p"] + + result = subprocess.run(ssh_args, capture_output=True, timeout=10) + if result.returncode != 0: + return None + try: + hex_str = result.stdout.decode().strip().replace("\n", "") + return bytes.fromhex(hex_str) + except Exception: + return None + +def ssh_delete_files(env, filenames, verbose=False): + """Remove test artifacts from destination after run.""" + if not filenames: + return + host = env["PARTNER_SSH_HOST"] + port = env.get("PARTNER_SSH_PORT", "22") + user = env["PARTNER_SSH_USER"] + key = env.get("PARTNER_SSH_KEY", "") + dest_dir = env["PARTNER_DEST_DIR"] + names = " ".join(f"{dest_dir}/{f}" for f in filenames) + ssh_args = ["ssh", "-p", port, "-o", "StrictHostKeyChecking=no"] + if key: + ssh_args += ["-i", key] + ssh_args += [f"{user}@{host}", f"rm -f {names}"] + subprocess.run(ssh_args, capture_output=True, timeout=10) + +def verify_result(tc, dest_files, env, verbose): + expected = tc["expected_filename"] + fixture_path = FIXTURES_DIR / tc["fixture"] + fixture_size = fixture_path.stat().st_size + + if expected not in dest_files: + return False, f"Expected '{expected}' not found in destination. Present: {list(dest_files.keys())}" + + dest_size = dest_files[expected] + method = tc["verify"] + + if method == "exact_match": + if dest_size != fixture_size: + return False, f"Size mismatch: expected {fixture_size}B, got {dest_size}B" + return True, f"Pass-through confirmed: '{expected}' ({dest_size}B)" + + elif method == "suffix_and_larger": + if not expected.endswith(tc["suffix"]): + return False, f"Filename does not end with '{tc['suffix']}'" + if dest_size <= fixture_size: + return False, f"Compressed file ({dest_size}B) not larger than input ({fixture_size}B) — compression may have failed" + return True, f"Compressed: '{expected}' ({fixture_size}B → {dest_size}B)" + + elif method == "suffix_and_magic": + if not expected.endswith(tc["suffix"]): + return False, f"Filename does not end with '{tc['suffix']}'" + magic = tc.get("magic_byte") + if magic is not None: + first_bytes = ssh_read_bytes(env, expected, 1, verbose) + if first_bytes is None: + return False, "Could not read destination file bytes for magic check" + if first_bytes[0] != magic: + return False, f"Magic byte mismatch: expected 0x{magic:02X}, got 0x{first_bytes[0]:02X}" + return True, f"PGP encrypted: '{expected}' ({dest_size}B, magic byte OK)" + + return False, f"Unknown verify method: {method}" + +def run_tests(test_cases, env, verbose=False): + results = [] + timeout = int(env.get("ROUTING_TIMEOUT_SEC", "30")) + uploaded = [] + + for tc in test_cases: + fixture_path = FIXTURES_DIR / tc["fixture"] + if not fixture_path.exists(): + results.append({"id": tc["id"], "status": "ERROR", "message": f"Fixture not found: {fixture_path}"}) + continue + + print(f"\n [{tc['id']}] Uploading {tc['fixture']}...", file=sys.stderr) + ok = sftp_upload(env, str(fixture_path), verbose) + if not ok: + results.append({"id": tc["id"], "status": "ERROR", "message": "SFTP upload failed"}) + continue + uploaded.append(tc) + + # Wait for routing + if uploaded: + print(f"\n Waiting {timeout}s for routing...", file=sys.stderr) + time.sleep(timeout) + + # Verify all + dest_files = ssh_list_destination(env, verbose) + if verbose: + print(f" Destination files: {dest_files}", file=sys.stderr) + + artifacts_to_clean = [] + for tc in uploaded: + passed, message = verify_result(tc, dest_files, env, verbose) + status = "PASS" if passed else "FAIL" + results.append({ + "id": tc["id"], + "fixture": tc["fixture"], + "expected": tc["expected_filename"], + "status": status, + "message": message + }) + artifacts_to_clean.append(tc["expected_filename"]) + icon = "✅" if passed else "❌" + print(f" {icon} {tc['id']}: {message}", file=sys.stderr) + + # Cleanup destination + ssh_delete_files(env, artifacts_to_clean, verbose) + + return results + +def main(): + parser = argparse.ArgumentParser(description="Run ST AR flow test suite") + parser.add_argument("--spec", help="Flow spec YAML (used for context only)") + parser.add_argument("--env", default=".env", help="Env file path") + parser.add_argument("--fixture", help="Run only this fixture id") + parser.add_argument("--verbose", action="store_true") + parser.add_argument("--no-cleanup", action="store_true", help="Leave destination artifacts after run") + args = parser.parse_args() + + env = load_env(args.env) + require_env(env, + "ST_SFTP_HOST", "ST_SFTP_USER", "ST_SFTP_PASS", "ST_SFTP_UPLOAD_DIR", + "PARTNER_SSH_HOST", "PARTNER_SSH_USER", "PARTNER_DEST_DIR" + ) + + cases = TEST_CASES + if args.fixture: + cases = [tc for tc in TEST_CASES if tc["id"] == args.fixture] + if not cases: + print(f"Unknown fixture id: {args.fixture}. Valid: {[t['id'] for t in TEST_CASES]}", file=sys.stderr) + sys.exit(2) + + print(f"\nST Flow Test Run — {datetime.now(timezone.utc).isoformat()}", file=sys.stderr) + print(f"Running {len(cases)} test(s)\n", file=sys.stderr) + + results = run_tests(cases, env, args.verbose) + + passed = sum(1 for r in results if r["status"] == "PASS") + failed = sum(1 for r in results if r["status"] == "FAIL") + errors = sum(1 for r in results if r["status"] == "ERROR") + + print(f"\n{'='*40}", file=sys.stderr) + print(f"Results: {passed} passed, {failed} failed, {errors} errors", file=sys.stderr) + + output = { + "run_at": datetime.now(timezone.utc).isoformat(), + "total": len(results), + "passed": passed, + "failed": failed, + "errors": errors, + "results": results + } + print(json.dumps(output, indent=2)) + + sys.exit(0 if failed == 0 and errors == 0 else 1) + +if __name__ == "__main__": + main()