#!/usr/bin/env python3
"""
ask-annie/ingest.py

Given a Vimeo URL and a chapter list (JSON file), produces structured
per-chapter knowledge chunks suitable for ingestion into knowledge-mcp.

Usage:
    python3 ingest.py --url <URL> --chapters chapters.json --out out/ [--frames]

Output:
    out/<video_id>/chunks.json     — array of chapter chunks
    out/<video_id>/frames/         — extracted frame images (if --frames)
    out/<video_id>/transcript.json — full Whisper output (cached)

Dependencies (must be on PATH): yt-dlp, ffmpeg, whisper
"""

import argparse
import json
import os
import subprocess
import sys
import tempfile
from pathlib import Path

# Chapters where frame extraction adds real value (demo-heavy sections).
# Identified by matching chapter title substrings (case-insensitive).
DEMO_CHAPTER_KEYWORDS = [
    "transcoding",
    "s3",
    "file tracking",
    "workbench",
    "new capability",
    "preview",
    "demo",
    "ui",
    "setup",
]


def run(cmd, **kwargs):
    """Run *cmd* (argv list), echoing it first; raises CalledProcessError on failure."""
    print(f" $ {' '.join(cmd)}", flush=True)
    return subprocess.run(cmd, check=True, **kwargs)


def seconds_to_hhmmss(s):
    """Format a duration in seconds as zero-padded 'HH:MM:SS'."""
    s = int(s)
    h, rem = divmod(s, 3600)
    m, sec = divmod(rem, 60)
    return f"{h:02d}:{m:02d}:{sec:02d}"


def parse_timestamp(ts_str):
    """Parse 'M:SS' or 'H:MM:SS' to seconds.

    Raises:
        ValueError: if the string has any other shape or non-numeric parts.
    """
    parts = [int(p) for p in ts_str.strip().split(":")]
    if len(parts) == 2:
        return parts[0] * 60 + parts[1]
    if len(parts) == 3:
        return parts[0] * 3600 + parts[1] * 60 + parts[2]
    raise ValueError(f"Cannot parse timestamp: {ts_str}")


def download_audio(url, out_dir):
    """Download *url* as mp3 via yt-dlp into *out_dir*; return the audio Path.

    Raises:
        FileNotFoundError: if yt-dlp produced no audio.* file.
    """
    audio_path = out_dir / "audio.%(ext)s"
    run([
        "yt-dlp",
        "--extract-audio",
        "--audio-format", "mp3",
        "--audio-quality", "3",  # ~128kbps, good enough for speech
        "-o", str(audio_path),
        url,
    ])
    # yt-dlp fills in the %(ext)s template; find whatever it actually wrote.
    downloaded = next(out_dir.glob("audio.*"), None)
    if downloaded is None:
        raise FileNotFoundError("Audio download failed — no audio.* file found")
    return downloaded


def transcribe(audio_path, out_dir, model="medium"):
    """Transcribe *audio_path* with Whisper, caching the result.

    The Whisper JSON is normalized to out_dir/transcript.json so that
    re-runs skip the (slow) transcription step entirely.

    Returns:
        dict: parsed Whisper output (expects a 'segments' key downstream).

    Raises:
        FileNotFoundError: if Whisper did not produce the expected JSON.
    """
    transcript_path = out_dir / "transcript.json"
    if transcript_path.exists():
        print(f" [cache] Using existing transcript at {transcript_path}")
    else:
        run([
            "whisper", str(audio_path),
            "--model", model,
            "--output_format", "json",
            "--output_dir", str(out_dir),
            "--language", "en",
            "--word_timestamps", "True",
        ])
        # Whisper names output after the input filename
        whisper_out = out_dir / (audio_path.stem + ".json")
        if not whisper_out.exists():
            raise FileNotFoundError(f"Expected Whisper output at {whisper_out}")
        whisper_out.rename(transcript_path)
    # Single load path for both the cache hit and the fresh transcription.
    with open(transcript_path) as f:
        return json.load(f)


def extract_frame(video_path, timestamp_sec, out_path):
    """Grab one frame from *video_path* at *timestamp_sec* into *out_path*."""
    # -ss before -i = fast input seek; -q:v 2 = high JPEG quality.
    run([
        "ffmpeg", "-y",
        "-ss", str(timestamp_sec),
        "-i", str(video_path),
        "-frames:v", "1",
        "-q:v", "2",
        str(out_path),
    ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)


def is_demo_chapter(title):
    """Return True if *title* contains any demo keyword (case-insensitive)."""
    t = title.lower()
    return any(kw in t for kw in DEMO_CHAPTER_KEYWORDS)


def segments_in_window(segments, start_sec, end_sec):
    """Extract transcript text for a time window from Whisper segments.

    A segment is included when it overlaps [start_sec, end_sec] at all
    (inclusive at both ends), so a segment sitting exactly on a chapter
    boundary may appear in two adjacent chapters.
    """
    texts = []
    for seg in segments:
        seg_start = seg.get("start", 0)
        seg_end = seg.get("end", seg_start)
        # Include segment if it overlaps with the window
        if seg_end >= start_sec and seg_start <= end_sec:
            texts.append(seg["text"].strip())
    return " ".join(texts)


def describe_frame(image_path):
    """
    Placeholder: returns a note that frame description needs Claude vision.
    In a later pass, replace this with an actual API call or output the
    image path for external processing.
    """
    return f"[frame: {image_path.name} — vision description pending]"


def main():
    parser = argparse.ArgumentParser(description="Ingest Ask Annie Vimeo session")
    parser.add_argument("--url", required=True, help="Vimeo URL")
    parser.add_argument("--chapters", required=True, help="Path to chapters JSON file")
    parser.add_argument("--out", default="out", help="Output directory")
    parser.add_argument("--frames", action="store_true", help="Extract frames for demo chapters")
    parser.add_argument("--whisper-model", default="medium", help="Whisper model size")
    parser.add_argument("--video-id", default=None, help="Override video ID (extracted from URL if omitted)")
    args = parser.parse_args()

    # Derive video ID from URL: last path component, query string stripped.
    video_id = args.video_id or args.url.rstrip("/").split("/")[-1].split("?")[0]
    print(f"\n=== Ask Annie Ingest: {video_id} ===\n")

    out_dir = Path(args.out) / video_id
    frames_dir = out_dir / "frames"
    out_dir.mkdir(parents=True, exist_ok=True)
    if args.frames:
        frames_dir.mkdir(exist_ok=True)

    # Load chapters
    with open(args.chapters) as chapters_file:
        chapters = json.load(chapters_file)
    print(f"Loaded {len(chapters)} chapters\n")

    # Step 1: Download audio (reuse a previous run's download if present)
    print("=== Step 1: Download audio ===")
    audio_path = next(out_dir.glob("audio.*"), None)
    if audio_path is not None:
        print(f" [cache] Found existing audio: {audio_path}")
    else:
        audio_path = download_audio(args.url, out_dir)
    print(f" Audio: {audio_path}\n")

    # Step 1b: Download video (only if --frames requested)
    video_path = None
    if args.frames:
        print("=== Step 1b: Download video (for frame extraction) ===")
        existing = list(out_dir.glob("video.*"))
        if existing:
            video_path = existing[0]
            print(f" [cache] Found existing video: {video_path}")
        else:
            vid_out = out_dir / "video.%(ext)s"
            run(["yt-dlp", "-f", "bestvideo[height<=720]", "-o", str(vid_out), args.url])
            matches = list(out_dir.glob("video.*"))
            if not matches:
                # Best-effort: frames are optional enrichment, so degrade
                # gracefully instead of aborting the whole ingest.
                print(" WARNING: Video download failed, skipping frame extraction")
                args.frames = False
            else:
                video_path = matches[0]
        print()

    # Step 2: Transcribe
    print("=== Step 2: Transcribe ===")
    transcript = transcribe(audio_path, out_dir, model=args.whisper_model)
    segments = transcript.get("segments", [])
    print(f" Got {len(segments)} transcript segments\n")

    # Step 3: Build chunks
    print("=== Step 3: Build chunks ===")
    chunks = []
    for i, chapter in enumerate(chapters):
        start_sec = parse_timestamp(chapter["timestamp"])
        # End = next chapter start, or +10min for last
        if i + 1 < len(chapters):
            end_sec = parse_timestamp(chapters[i + 1]["timestamp"])
        else:
            end_sec = start_sec + 600

        text = segments_in_window(segments, start_sec, end_sec)
        demo = is_demo_chapter(chapter["title"])

        frame_desc = None
        if args.frames and demo and video_path:
            frame_file = frames_dir / f"chapter_{i:02d}.jpg"
            print(f" Extracting frame for: {chapter['title']}")
            # +5s skips past the chapter boundary so the frame shows content.
            extract_frame(video_path, start_sec + 5, frame_file)
            frame_desc = describe_frame(frame_file)

        chunk = {
            "video_id": video_id,
            "video_url": args.url,
            "chapter_index": i,
            "timestamp": chapter["timestamp"],
            "timestamp_sec": start_sec,
            "title": chapter["title"],
            "summary": chapter.get("summary", ""),
            "transcript": text,
            "is_demo": demo,
            "frame_description": frame_desc,
            "source": "ask-annie",
            "series": "ST Best Practices Q&A",
        }
        chunks.append(chunk)
        print(f" [{i:02d}] {chapter['timestamp']} — {chapter['title'][:60]} ({'demo' if demo else 'qa'})")

    # Step 4: Write output
    chunks_path = out_dir / "chunks.json"
    with open(chunks_path, "w") as out_file:
        json.dump(chunks, out_file, indent=2)

    print(f"\n=== Done: {len(chunks)} chunks → {chunks_path} ===\n")


if __name__ == "__main__":
    main()