#!/usr/bin/env python3
"""
ask-annie/ingest.py

Given a Vimeo URL and a chapter list (JSON file), produces structured
per-chapter knowledge chunks suitable for ingestion into knowledge-mcp.

Usage:
    python3 ingest.py --url <vimeo-url> --chapters chapters/XXXX.json --out out/ [--frames]

Output:
    out/<video-id>/chunks.json      — array of chapter chunks
    out/<video-id>/frames/          — extracted frame images (if --frames)
    out/<video-id>/transcript.json  — full reassembled transcript (cached)
    out/<video-id>/audio_chunks/    — per-chapter audio splits (cached)

Dependencies (must be on PATH): yt-dlp, ffmpeg
(No third-party Python packages needed — ASR upload uses stdlib urllib.)
"""

import argparse
import json
import math
import os
import subprocess
import sys
import time
import urllib.request
import uuid
from pathlib import Path

# Fish Audio speech-to-text endpoint.
FISH_AUDIO_ASR_URL = "https://api.fish.audio/v1/asr"

# Chapters where frame extraction adds real value (demo-heavy sections).
# Matched case-insensitively as substrings of the chapter title.
DEMO_CHAPTER_KEYWORDS = [
    "transcoding",
    "s3",
    "file tracking",
    "workbench",
    "new capability",
    "preview",
    "demo",
    "ui",
    "setup",
]

# Target chunk size in seconds — split audio into groups of chapters
# totalling no more than this before sending to ASR.
# 10 min = 600s keeps each chunk well under Fish Audio's size limit.
CHUNK_MAX_SECONDS = 600  # 10 min per ASR upload


def run(cmd, **kwargs):
    """Run a subprocess, echoing the command line first.

    Raises subprocess.CalledProcessError on non-zero exit (check=True).
    """
    print(f" $ {' '.join(str(c) for c in cmd)}", flush=True)
    result = subprocess.run(cmd, check=True, **kwargs)
    return result


def parse_timestamp(ts_str):
    """Parse 'M:SS' or 'H:MM:SS' to seconds.

    Raises ValueError for any other format (including a bare seconds value).
    """
    parts = ts_str.strip().split(":")
    parts = [int(p) for p in parts]
    if len(parts) == 2:
        return parts[0] * 60 + parts[1]
    elif len(parts) == 3:
        return parts[0] * 3600 + parts[1] * 60 + parts[2]
    raise ValueError(f"Cannot parse timestamp: {ts_str}")


def download_audio(url, out_dir):
    """Download the audio track of *url* as MP3 into *out_dir* via yt-dlp.

    Returns the Path of the downloaded audio.* file.
    Raises FileNotFoundError if yt-dlp produced no complete output file.
    """
    audio_path = out_dir / "audio.%(ext)s"
    run([
        "yt-dlp",
        "--extract-audio",
        "--audio-format", "mp3",
        "--audio-quality", "3",
        "-o", str(audio_path),
        url,
    ])
    # Ignore *.part leftovers from interrupted downloads.
    matches = [f for f in out_dir.glob("audio.*") if f.suffix != ".part"]
    if not matches:
        raise FileNotFoundError("Audio download failed — no audio.* file found")
    return matches[0]


def get_audio_duration(audio_path):
    """Return audio duration in seconds using ffprobe."""
    result = subprocess.run(
        ["ffprobe", "-v", "quiet", "-print_format", "json",
         "-show_format", str(audio_path)],
        capture_output=True, text=True, check=True,
    )
    info = json.loads(result.stdout)
    return float(info["format"]["duration"])


def split_audio_chunk(audio_path, start_sec, end_sec, out_path):
    """Extract [start_sec, end_sec] from audio_path into out_path using ffmpeg.

    Uses stream copy (-c copy) for speed; cut points may land on the nearest
    frame boundary rather than the exact second, which is fine for ASR input.
    """
    run([
        "ffmpeg", "-y",
        "-ss", str(start_sec),
        "-to", str(end_sec),
        "-i", str(audio_path),
        "-c", "copy",
        str(out_path),
    ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)


def transcribe_chunk_fish(chunk_path, api_key, start_offset_sec):
    """Send one audio chunk to Fish Audio ASR.

    Returns a list of {"start", "end", "text"} segments with timestamps
    shifted by *start_offset_sec* so they line up with the full recording.
    Uses stdlib urllib + manual multipart encoding (no requests dep).
    """
    print(f" → Fish Audio ASR: {chunk_path.name} (offset +{start_offset_sec}s)", flush=True)

    # Build multipart/form-data manually.
    boundary = uuid.uuid4().hex
    with open(chunk_path, "rb") as f:
        audio_data = f.read()

    def field(name, value):
        # One text form field, CRLF-delimited per the multipart spec.
        return (
            f"--{boundary}\r\n"
            f'Content-Disposition: form-data; name="{name}"\r\n\r\n'
            f"{value}\r\n"
        ).encode()

    body = (
        field("language", "en")
        + field("ignore_timestamps", "false")
        + f"--{boundary}\r\n".encode()
        + f'Content-Disposition: form-data; name="audio"; filename="{chunk_path.name}"\r\n'.encode()
        + b"Content-Type: audio/mpeg\r\n\r\n"
        + audio_data
        + f"\r\n--{boundary}--\r\n".encode()
    )

    req = urllib.request.Request(
        FISH_AUDIO_ASR_URL,
        data=body,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": f"multipart/form-data; boundary={boundary}",
        },
        method="POST",
    )
    # urlopen raises HTTPError on non-2xx responses, which propagates up.
    with urllib.request.urlopen(req, timeout=300) as resp:
        data = json.loads(resp.read().decode())

    # Adjust timestamps by the chunk's start offset.
    segments = []
    for seg in data.get("segments", []):
        segments.append({
            "start": seg["start"] + start_offset_sec,
            "end": seg["end"] + start_offset_sec,
            "text": seg["text"],
        })

    # If Fish returns flat text with no segments, synthesise one segment.
    if not segments and data.get("text"):
        segments.append({
            "start": start_offset_sec,
            "end": start_offset_sec + 60,  # nominal 60 s — true extent unknown
            "text": data["text"],
        })
    return segments


def group_chapters_into_chunks(chapters, total_duration_sec):
    """Group chapters into batches whose audio spans <= CHUNK_MAX_SECONDS.

    A single chapter longer than the limit still forms its own (oversized)
    group — chapters are never split mid-way.
    Returns a list of (start_sec, end_sec, [chapter_indices]).
    """
    groups = []
    current_indices = []
    current_start = None
    for i, chapter in enumerate(chapters):
        ch_start = parse_timestamp(chapter["timestamp"])
        # A chapter ends where the next one begins (or at end of audio).
        if i + 1 < len(chapters):
            ch_end = parse_timestamp(chapters[i + 1]["timestamp"])
        else:
            ch_end = total_duration_sec

        if current_start is None:
            current_start = ch_start

        # Would adding this chapter exceed the max chunk size?
        if (ch_end - current_start) > CHUNK_MAX_SECONDS and current_indices:
            # Flush current group; it ends where this chapter starts.
            prev_end = parse_timestamp(chapters[current_indices[-1] + 1]["timestamp"]) \
                if current_indices[-1] + 1 < len(chapters) else total_duration_sec
            groups.append((current_start, prev_end, list(current_indices)))
            current_indices = [i]
            current_start = ch_start
        else:
            current_indices.append(i)

    if current_indices:
        groups.append((current_start, total_duration_sec, current_indices))
    return groups


def transcribe_with_fish(audio_path, out_dir, chapters, api_key):
    """Transcribe the whole recording via Fish Audio ASR.

    Splits audio by chapter groups, transcribes each group, and reassembles
    a unified segment list. Both per-chunk ASR results and the final
    transcript are cached on disk, so re-runs are cheap.
    Returns {"segments": [...]}.
    """
    transcript_path = out_dir / "transcript.json"
    if transcript_path.exists():
        print(f" [cache] Using existing transcript at {transcript_path}")
        with open(transcript_path) as f:
            return json.load(f)

    chunks_dir = out_dir / "audio_chunks"
    chunks_dir.mkdir(exist_ok=True)

    total_duration = get_audio_duration(audio_path)
    print(f" Audio duration: {total_duration:.0f}s ({total_duration/60:.1f} min)")

    groups = group_chapters_into_chunks(chapters, total_duration)
    print(f" Split into {len(groups)} chunk(s) for ASR\n")

    all_segments = []
    for g_idx, (g_start, g_end, ch_indices) in enumerate(groups):
        chunk_path = chunks_dir / f"chunk_{g_idx:02d}.mp3"
        cache_path = chunks_dir / f"chunk_{g_idx:02d}.json"
        ch_titles = [chapters[i]["title"][:40] for i in ch_indices]
        print(f" Chunk {g_idx}: {g_start:.0f}s–{g_end:.0f}s ({len(ch_indices)} chapters: {ch_titles[0]}…)")

        if cache_path.exists():
            print(f" [cache] Using cached ASR result")
            with open(cache_path) as f:
                segments = json.load(f)
        else:
            # Extract audio chunk (skip if a previous run already cut it).
            if not chunk_path.exists():
                split_audio_chunk(audio_path, g_start, g_end, chunk_path)
            # Send to Fish Audio.
            segments = transcribe_chunk_fish(chunk_path, api_key, g_start)
            # Cache the result so a crash doesn't lose paid ASR work.
            with open(cache_path, "w") as f:
                json.dump(segments, f, indent=2)
            # Brief pause between API calls.
            if g_idx < len(groups) - 1:
                time.sleep(1)

        all_segments.extend(segments)
        print(f" Got {len(segments)} segments")

    # Save full reassembled transcript.
    transcript = {"segments": all_segments}
    with open(transcript_path, "w") as f:
        json.dump(transcript, f, indent=2)
    print(f"\n Total segments: {len(all_segments)}")
    return transcript


def extract_frame(video_path, timestamp_sec, out_path):
    """Grab a single high-quality JPEG frame at *timestamp_sec*."""
    run([
        "ffmpeg", "-y",
        "-ss", str(timestamp_sec),
        "-i", str(video_path),
        "-frames:v", "1",
        "-q:v", "2",
        str(out_path),
    ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)


def is_demo_chapter(title):
    """True if the chapter title contains any demo-heavy keyword."""
    t = title.lower()
    return any(kw in t for kw in DEMO_CHAPTER_KEYWORDS)


def segments_in_window(segments, start_sec, end_sec):
    """Join the text of all segments overlapping [start_sec, end_sec].

    Overlap is inclusive, so a segment straddling a chapter boundary
    contributes to both adjacent chapters.
    """
    texts = []
    for seg in segments:
        seg_start = seg.get("start", 0)
        seg_end = seg.get("end", seg_start)
        if seg_end >= start_sec and seg_start <= end_sec:
            texts.append(seg["text"].strip())
    return " ".join(texts)


def main():
    parser = argparse.ArgumentParser(description="Ingest Ask Annie Vimeo session")
    parser.add_argument("--url", required=True, help="Vimeo URL")
    parser.add_argument("--chapters", required=True, help="Path to chapters JSON file")
    parser.add_argument("--out", default="out", help="Output directory")
    parser.add_argument("--frames", action="store_true",
                        help="Extract frames for demo chapters")
    parser.add_argument("--fish-api-key", default=os.environ.get("FISH_API_KEY", ""),
                        help="Fish Audio API key (or set FISH_API_KEY env var)")
    parser.add_argument("--video-id", default=None,
                        help="Override video ID (extracted from URL if omitted)")
    args = parser.parse_args()

    if not args.fish_api_key:
        print("ERROR: Fish Audio API key required. Pass --fish-api-key or set FISH_API_KEY env var.")
        sys.exit(1)

    # e.g. https://vimeo.com/123456789?h=abc -> "123456789"
    video_id = args.video_id or args.url.rstrip("/").split("/")[-1].split("?")[0]
    print(f"\n=== Ask Annie Ingest: {video_id} ===\n")

    out_dir = Path(args.out) / video_id
    frames_dir = out_dir / "frames"
    out_dir.mkdir(parents=True, exist_ok=True)
    if args.frames:
        frames_dir.mkdir(exist_ok=True)

    # Load chapters.
    with open(args.chapters) as f:
        chapters = json.load(f)
    print(f"Loaded {len(chapters)} chapters\n")

    # Step 1: Download audio (reuse a previous complete download if present).
    print("=== Step 1: Download audio ===")
    audio_path = None
    for f in out_dir.glob("audio.*"):
        if f.suffix == ".part":
            print(f" [skip] Ignoring partial download: {f}")
            continue
        print(f" [cache] Found existing audio: {f}")
        audio_path = f
        break
    if audio_path is None:
        audio_path = download_audio(args.url, out_dir)
    print(f" Audio: {audio_path}\n")

    # Step 1b: Download video (only if --frames requested).
    video_path = None
    if args.frames:
        print("=== Step 1b: Download video (for frame extraction) ===")
        existing = [f for f in out_dir.glob("video.*") if f.suffix != ".part"]
        if existing:
            video_path = existing[0]
            print(f" [cache] Found existing video: {video_path}")
        else:
            vid_out = out_dir / "video.%(ext)s"
            run(["yt-dlp", "-f", "bestvideo[height<=720]", "-o", str(vid_out), args.url])
            matches = [f for f in out_dir.glob("video.*") if f.suffix != ".part"]
            if not matches:
                # Best-effort: frames are optional, so degrade instead of aborting.
                print(" WARNING: Video download failed, skipping frame extraction")
                args.frames = False
            else:
                video_path = matches[0]
        print()

    # Step 2: Transcribe via Fish Audio.
    print("=== Step 2: Transcribe via Fish Audio ASR ===")
    transcript = transcribe_with_fish(audio_path, out_dir, chapters, args.fish_api_key)
    segments = transcript.get("segments", [])
    print(f" Got {len(segments)} transcript segments\n")

    # FIX: the last chapter's end used to be a hard-coded start+600s guess,
    # which could truncate or over-extend it. Use the real audio duration
    # when ffprobe can provide it; fall back to the old guess otherwise.
    try:
        total_duration = get_audio_duration(audio_path)
    except (subprocess.CalledProcessError, FileNotFoundError, KeyError, ValueError):
        total_duration = None

    # Step 3: Build chunks.
    print("=== Step 3: Build chunks ===")
    chunks = []
    for i, chapter in enumerate(chapters):
        start_sec = parse_timestamp(chapter["timestamp"])
        if i + 1 < len(chapters):
            end_sec = parse_timestamp(chapters[i + 1]["timestamp"])
        elif total_duration is not None:
            end_sec = total_duration
        else:
            end_sec = start_sec + 600  # fallback: assume at most 10 minutes

        text = segments_in_window(segments, start_sec, end_sec)
        demo = is_demo_chapter(chapter["title"])

        frame_desc = None
        if args.frames and demo and video_path:
            frame_file = frames_dir / f"chapter_{i:02d}.jpg"
            print(f" Extracting frame for: {chapter['title']}")
            # +5s skips the chapter's opening transition slide/cut.
            extract_frame(video_path, start_sec + 5, frame_file)
            frame_desc = f"[frame: {frame_file.name} — vision description pending]"

        chunk = {
            "video_id": video_id,
            "video_url": args.url,
            "chapter_index": i,
            "timestamp": chapter["timestamp"],
            "timestamp_sec": start_sec,
            "title": chapter["title"],
            "summary": chapter.get("summary", ""),
            "transcript": text,
            "is_demo": demo,
            "frame_description": frame_desc,
            "source": "ask-annie",
            "series": "ST Best Practices Q&A",
        }
        chunks.append(chunk)
        print(f" [{i:02d}] {chapter['timestamp']} — {chapter['title'][:60]} ({'demo' if demo else 'qa'})")

    # Step 4: Write output.
    chunks_path = out_dir / "chunks.json"
    with open(chunks_path, "w") as f:
        json.dump(chunks, f, indent=2)
    print(f"\n=== Done: {len(chunks)} chunks → {chunks_path} ===\n")


if __name__ == "__main__":
    main()