#!/usr/bin/env python3
"""
ingest_to_rag.py

Push ask-annie chunks.json into the knowledge-mcp RAG notebook.

Usage:
    python3 ingest_to_rag.py --chunks out/1020102626/chunks.json \
        --notebook securetransport-md \
        --mcporter /path/to/mcporter.json
"""
import argparse
import json
import subprocess
import sys
import time
from pathlib import Path


def mcporter_call(config_path, tool, args_dict):
    """Call an MCP tool via the mcporter CLI and return its parsed JSON output.

    Args:
        config_path: Path to the mcporter JSON config file.
        tool: Fully-qualified tool name (e.g. "knowledge-mcp.add_source").
        args_dict: Tool arguments; serialized to JSON for --args.

    Raises:
        RuntimeError: If mcporter exits non-zero or prints non-JSON output.
    """
    # List-form argv (shell=False) avoids any shell-quoting issues with the JSON blob.
    cmd = [
        "mcporter", "call", tool,
        "--config", config_path,
        "--args", json.dumps(args_dict),
        "--output", "json",
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"mcporter error: {result.stderr.strip()}")
    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError as e:
        # Chain the cause so the original parse error is preserved in tracebacks.
        raise RuntimeError(f"mcporter bad output: {result.stdout!r}") from e


def chunk_to_markdown(chunk):
    """Format a chunk dict as a markdown document for ingestion.

    Expects keys: title, series, video_id, video_url, timestamp, is_demo,
    summary; 'transcript' is optional and appended as its own section.
    """
    lines = [
        f"# {chunk['title']}",
        "",
        f"**Series:** {chunk['series']}",
        f"**Video:** [{chunk['video_id']}]({chunk['video_url']}) @ {chunk['timestamp']}",
        f"**Type:** {'Demo/walkthrough' if chunk['is_demo'] else 'Q&A'}",
        "",
        "## Summary",
        "",
        chunk['summary'],
        "",
    ]
    if chunk.get('transcript'):
        lines += [
            "## Transcript",
            "",
            chunk['transcript'],
            "",
        ]
    return "\n".join(lines)


def main():
    """Parse CLI args, load chunks.json, and ingest each chunk via mcporter."""
    parser = argparse.ArgumentParser(description="Ingest Ask Annie chunks into knowledge-mcp RAG")
    parser.add_argument("--chunks", required=True, help="Path to chunks.json")
    parser.add_argument("--notebook", default="securetransport-md",
                        help="knowledge-mcp notebook ID")
    parser.add_argument("--mcporter",
                        default="/home/node/.openclaw/workspace/config/mcporter.json",
                        help="Path to mcporter config")
    parser.add_argument("--dry-run", action="store_true",
                        help="Print what would be ingested, don't send")
    parser.add_argument("--skip", type=int, default=0,
                        help="Skip first N chunks (resume)")
    args = parser.parse_args()

    with open(args.chunks, encoding="utf-8") as f:
        chunks = json.load(f)

    print(f"Loaded {len(chunks)} chunks from {args.chunks}")
    print(f"Notebook: {args.notebook}")
    print(f"Dry run: {args.dry_run}")
    if args.skip:
        print(f"Skipping first {args.skip} chunks")
    print()

    success = 0
    failed = 0
    for i, chunk in enumerate(chunks):
        if i < args.skip:
            continue

        content = chunk_to_markdown(chunk)
        metadata = {
            "source": "ask-annie",
            "series": chunk["series"],
            "video_id": chunk["video_id"],
            "video_url": chunk["video_url"],
            "chapter_index": chunk["chapter_index"],
            "timestamp": chunk["timestamp"],
            "title": chunk["title"],
            "is_demo": chunk["is_demo"],
        }

        print(f"[{i:02d}/{len(chunks)-1}] {chunk['timestamp']} — {chunk['title'][:55]}")
        if args.dry_run:
            print(f"  Would ingest {len(content)} chars")
            continue

        try:
            # Return payload is not needed; success is signaled by no exception.
            mcporter_call(
                args.mcporter,
                "knowledge-mcp.add_source",
                {
                    "notebook_id": args.notebook,
                    "content": content,
                    "metadata": metadata,
                },
            )
            print(f"  ✅ ok")
            success += 1
        except Exception as e:
            # Best-effort per-chunk ingestion: log, count, and keep going so a
            # single bad chunk doesn't abort the run (resume with --skip).
            print(f"  ❌ failed: {e}")
            failed += 1

        # Brief pause to avoid hammering the API
        time.sleep(0.5)

    print(f"\n=== Done: {success} ingested, {failed} failed ===")


if __name__ == "__main__":
    main()