Add RAG ingestion script

This commit is contained in:
2026-03-24 03:28:59 +00:00
parent eff844cde5
commit c8450345e4

130
ingest_to_rag.py Normal file
View File

@@ -0,0 +1,130 @@
#!/usr/bin/env python3
"""
ingest_to_rag.py
Push ask-annie chunks.json into the knowledge-mcp RAG notebook.
Usage:
python3 ingest_to_rag.py --chunks out/1020102626/chunks.json \
--notebook securetransport-md \
--mcporter /path/to/mcporter.json
"""
import argparse
import json
import subprocess
import sys
import time
from pathlib import Path
def mcporter_call(config_path, tool, args_dict):
    """Call an MCP tool via the mcporter CLI and return its parsed JSON output.

    Args:
        config_path: Path to the mcporter JSON config file.
        tool: Fully-qualified tool name, e.g. "knowledge-mcp.add_source".
        args_dict: Tool arguments; JSON-serialized and passed via --args.

    Returns:
        The tool's stdout parsed as JSON.

    Raises:
        RuntimeError: If mcporter exits non-zero or emits non-JSON output.
    """
    cmd = [
        "mcporter", "call", tool,
        "--config", config_path,
        "--args", json.dumps(args_dict),
        "--output", "json",
    ]
    # List argv (shell=False) so tool/arg values can't be shell-injected.
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"mcporter error: {result.stderr.strip()}")
    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError as e:
        # Chain the cause so the original parse error/position is preserved
        # in the traceback instead of being silently discarded.
        raise RuntimeError(f"mcporter bad output: {result.stdout!r}") from e
def chunk_to_markdown(chunk):
    """Render one chunk dict as a markdown document ready for ingestion.

    Emits a title heading, a metadata header (series, video link, type),
    the summary section, and — when present and non-empty — a transcript
    section. Sections are separated by blank lines; the result ends with
    a trailing newline.
    """
    kind_label = "Demo/walkthrough" if chunk["is_demo"] else "Q&A"
    parts = [
        f"# {chunk['title']}",
        "",
        f"**Series:** {chunk['series']}",
        f"**Video:** [{chunk['video_id']}]({chunk['video_url']}) @ {chunk['timestamp']}",
        f"**Type:** {kind_label}",
        "",
        "## Summary",
        "",
        chunk["summary"],
        "",
    ]
    transcript = chunk.get("transcript")
    if transcript:
        parts.extend(["## Transcript", "", transcript, ""])
    return "\n".join(parts)
def main():
    """CLI entry point: load chunks.json and push each chunk into the RAG notebook.

    Iterates the chunk list, formats each chunk as markdown, and ingests it
    via knowledge-mcp's add_source tool. Supports --dry-run (print only) and
    --skip N (resume after a partial run). Exits with status 1 if any chunk
    failed to ingest so shell callers / cron can detect partial failure.
    """
    parser = argparse.ArgumentParser(description="Ingest Ask Annie chunks into knowledge-mcp RAG")
    parser.add_argument("--chunks", required=True, help="Path to chunks.json")
    parser.add_argument("--notebook", default="securetransport-md", help="knowledge-mcp notebook ID")
    parser.add_argument("--mcporter", default="/home/node/.openclaw/workspace/config/mcporter.json",
                        help="Path to mcporter config")
    parser.add_argument("--dry-run", action="store_true", help="Print what would be ingested, don't send")
    parser.add_argument("--skip", type=int, default=0, help="Skip first N chunks (resume)")
    args = parser.parse_args()

    with open(args.chunks) as f:
        chunks = json.load(f)

    print(f"Loaded {len(chunks)} chunks from {args.chunks}")
    print(f"Notebook: {args.notebook}")
    print(f"Dry run: {args.dry_run}")
    if args.skip:
        print(f"Skipping first {args.skip} chunks")
    print()

    success = 0
    failed = 0
    for i, chunk in enumerate(chunks):
        if i < args.skip:
            continue
        content = chunk_to_markdown(chunk)
        metadata = {
            "source": "ask-annie",
            "series": chunk["series"],
            "video_id": chunk["video_id"],
            "video_url": chunk["video_url"],
            "chapter_index": chunk["chapter_index"],
            "timestamp": chunk["timestamp"],
            "title": chunk["title"],
            "is_demo": chunk["is_demo"],
        }
        # BUGFIX: separate timestamp and title with a space (they were
        # printed fused together, e.g. "0:42Intro to transfers").
        print(f"[{i:02d}/{len(chunks)-1}] {chunk['timestamp']} {chunk['title'][:55]}")
        if args.dry_run:
            print(f"  Would ingest {len(content)} chars")
            continue
        try:
            # Return value intentionally ignored; success is "no exception".
            mcporter_call(
                args.mcporter,
                "knowledge-mcp.add_source",
                {
                    "notebook_id": args.notebook,
                    "content": content,
                    "metadata": metadata,
                }
            )
            print(f"  ✅ ok")
            success += 1
        except Exception as e:
            print(f"  ❌ failed: {e}")
            failed += 1
        # Brief pause to avoid hammering the API
        time.sleep(0.5)

    print(f"\n=== Done: {success} ingested, {failed} failed ===")
    # Signal partial failure to the caller (and finally use the sys import).
    if failed:
        sys.exit(1)


if __name__ == "__main__":
    main()