diff --git a/ingest.py b/ingest.py
index 98ae754..e6df956 100644
--- a/ingest.py
+++ b/ingest.py
@@ -6,28 +6,33 @@
 Given a Vimeo URL and a chapter list (JSON file), produces structured
 per-chapter knowledge chunks suitable for ingestion into knowledge-mcp.
 
 Usage:
-    python3 ingest.py --url <URL> --chapters chapters.json --out out/ [--frames]
+    python3 ingest.py --url <URL> --chapters chapters/XXXX.json --out out/ [--frames]
 
 Output:
-    out/<video_id>/chunks.json — array of chapter chunks
-    out/<video_id>/frames/ — extracted frame images (if --frames)
-    out/<video_id>/transcript.json — full Whisper output (cached)
+    out/<video_id>/chunks.json — array of chapter chunks
+    out/<video_id>/frames/ — extracted frame images (if --frames)
+    out/<video_id>/transcript.json — full reassembled transcript (cached)
+    out/<video_id>/audio_chunks/ — per-chapter audio splits (cached)
 
 Dependencies (must be on PATH):
-    yt-dlp, ffmpeg, whisper
+    yt-dlp, ffmpeg
+    pip install requests
 """
 
 import argparse
 import json
+import math
 import os
 import subprocess
 import sys
-import tempfile
+import time
+import requests
 from pathlib import Path
 
+FISH_AUDIO_ASR_URL = "https://api.fish.audio/v1/asr"
+
 # Chapters where frame extraction adds real value (demo-heavy sections).
-# Identified by matching chapter title substrings (case-insensitive).
 DEMO_CHAPTER_KEYWORDS = [
     "transcoding",
     "s3",
@@ -40,20 +45,18 @@ DEMO_CHAPTER_KEYWORDS = [
     "setup",
 ]
 
+# Target chunk size in seconds — split audio into groups of chapters
+# totalling no more than this before sending to ASR.
+# 10 min = 600s keeps each chunk well under Fish Audio's size limit.
+CHUNK_MAX_SECONDS = 600
+
 
 def run(cmd, **kwargs):
-    print(f"  $ {' '.join(cmd)}", flush=True)
+    print(f"  $ {' '.join(str(c) for c in cmd)}", flush=True)
     result = subprocess.run(cmd, check=True, **kwargs)
     return result
 
 
-def seconds_to_hhmmss(s):
-    s = int(s)
-    h, rem = divmod(s, 3600)
-    m, sec = divmod(rem, 60)
-    return f"{h:02d}:{m:02d}:{sec:02d}"
-
-
 def parse_timestamp(ts_str):
     """Parse 'M:SS' or 'H:MM:SS' to seconds."""
     parts = ts_str.strip().split(":")
@@ -71,42 +74,170 @@ def download_audio(url, out_dir):
         "yt-dlp",
         "--extract-audio",
         "--audio-format", "mp3",
-        "--audio-quality", "3",  # ~128kbps, good enough for speech
+        "--audio-quality", "3",
         "-o", str(audio_path),
         url,
     ])
 
-    # Find the downloaded file
-    matches = list(out_dir.glob("audio.*"))
+    matches = [f for f in out_dir.glob("audio.*") if f.suffix != ".part"]
     if not matches:
         raise FileNotFoundError("Audio download failed — no audio.* file found")
     return matches[0]
 
 
-def transcribe(audio_path, out_dir, model="medium"):
+def get_audio_duration(audio_path):
+    """Return audio duration in seconds using ffprobe."""
+    result = subprocess.run(
+        ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", str(audio_path)],
+        capture_output=True, text=True, check=True
+    )
+    info = json.loads(result.stdout)
+    return float(info["format"]["duration"])
+
+
+def split_audio_chunk(audio_path, start_sec, end_sec, out_path):
+    """Extract a segment from audio_path into out_path using ffmpeg."""
+    run([
+        "ffmpeg", "-y",
+        "-ss", str(start_sec),
+        "-to", str(end_sec),
+        "-i", str(audio_path),
+        "-c", "copy",
+        str(out_path),
+    ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+
+
+def transcribe_chunk_fish(chunk_path, api_key, start_offset_sec):
+    """
+    Send a chunk to Fish Audio ASR. Returns a list of segments with
+    timestamps adjusted by start_offset_sec.
+    """
+    print(f"  → Fish Audio ASR: {chunk_path.name} (offset +{start_offset_sec}s)", flush=True)
+    with open(chunk_path, "rb") as f:
+        resp = requests.post(
+            FISH_AUDIO_ASR_URL,
+            headers={"Authorization": f"Bearer {api_key}"},
+            files={"audio": (chunk_path.name, f, "audio/mpeg")},
+            data={"language": "en", "ignore_timestamps": "false"},
+            timeout=120,
+        )
+    resp.raise_for_status()
+    data = resp.json()
+
+    # Adjust timestamps by the chunk's start offset
+    segments = []
+    for seg in data.get("segments", []):
+        segments.append({
+            "start": seg["start"] + start_offset_sec,
+            "end": seg["end"] + start_offset_sec,
+            "text": seg["text"],
+        })
+
+    # If Fish returns flat text with no segments, synthesise one segment
+    if not segments and data.get("text"):
+        segments.append({
+            "start": start_offset_sec,
+            "end": start_offset_sec + 60,
+            "text": data["text"],
+        })
+
+    return segments
+
+
+def group_chapters_into_chunks(chapters, total_duration_sec):
+    """
+    Group chapters into batches where each batch's audio is <= CHUNK_MAX_SECONDS.
+    Returns list of (start_sec, end_sec, [chapter_indices]).
+    """
+    groups = []
+    current_indices = []
+    current_start = None
+
+    for i, chapter in enumerate(chapters):
+        ch_start = parse_timestamp(chapter["timestamp"])
+        if i + 1 < len(chapters):
+            ch_end = parse_timestamp(chapters[i + 1]["timestamp"])
+        else:
+            ch_end = total_duration_sec
+
+        if current_start is None:
+            current_start = ch_start
+
+        # Would adding this chapter exceed the max chunk size?
+        if (ch_end - current_start) > CHUNK_MAX_SECONDS and current_indices:
+            # Flush current group
+            prev_end = parse_timestamp(chapters[current_indices[-1] + 1]["timestamp"]) \
+                if current_indices[-1] + 1 < len(chapters) else total_duration_sec
+            groups.append((current_start, prev_end, list(current_indices)))
+            current_indices = [i]
+            current_start = ch_start
+        else:
+            current_indices.append(i)
+
+    if current_indices:
+        groups.append((current_start, total_duration_sec, current_indices))
+
+    return groups
+
+
+def transcribe_with_fish(audio_path, out_dir, chapters, api_key):
+    """
+    Split audio by chapter groups, transcribe each with Fish Audio ASR,
+    reassemble into a unified segment list. Caches per-chunk results.
+    """
     transcript_path = out_dir / "transcript.json"
     if transcript_path.exists():
         print(f"  [cache] Using existing transcript at {transcript_path}")
         with open(transcript_path) as f:
             return json.load(f)
 
-    run([
-        "whisper",
-        str(audio_path),
-        "--model", model,
-        "--output_format", "json",
-        "--output_dir", str(out_dir),
-        "--language", "en",
-        "--word_timestamps", "True",
-    ])
+    chunks_dir = out_dir / "audio_chunks"
+    chunks_dir.mkdir(exist_ok=True)
 
-    # Whisper names output after the input filename
-    whisper_out = out_dir / (audio_path.stem + ".json")
-    if not whisper_out.exists():
-        raise FileNotFoundError(f"Expected Whisper output at {whisper_out}")
-    whisper_out.rename(transcript_path)
+    total_duration = get_audio_duration(audio_path)
+    print(f"  Audio duration: {total_duration:.0f}s ({total_duration/60:.1f} min)")
 
-    with open(transcript_path) as f:
-        return json.load(f)
+    groups = group_chapters_into_chunks(chapters, total_duration)
+    print(f"  Split into {len(groups)} chunk(s) for ASR\n")
+
+    all_segments = []
+
+    for g_idx, (g_start, g_end, ch_indices) in enumerate(groups):
+        chunk_path = chunks_dir / f"chunk_{g_idx:02d}.mp3"
+        cache_path = chunks_dir / f"chunk_{g_idx:02d}.json"
+
+        ch_titles = [chapters[i]["title"][:40] for i in ch_indices]
+        print(f"  Chunk {g_idx}: {g_start:.0f}s–{g_end:.0f}s "
+              f"({len(ch_indices)} chapters: {ch_titles[0]}…)")
+
+        if cache_path.exists():
+            print(f"    [cache] Using cached ASR result")
+            with open(cache_path) as f:
+                segments = json.load(f)
+        else:
+            # Extract audio chunk
+            if not chunk_path.exists():
+                split_audio_chunk(audio_path, g_start, g_end, chunk_path)
+
+            # Send to Fish Audio
+            segments = transcribe_chunk_fish(chunk_path, api_key, g_start)
+
+            # Cache the result
+            with open(cache_path, "w") as f:
+                json.dump(segments, f, indent=2)
+
+            # Brief pause between API calls
+            if g_idx < len(groups) - 1:
+                time.sleep(1)
+
+        all_segments.extend(segments)
+        print(f"    Got {len(segments)} segments")
+
+    # Save full reassembled transcript
+    transcript = {"segments": all_segments}
+    with open(transcript_path, "w") as f:
+        json.dump(transcript, f, indent=2)
+
+    print(f"\n  Total segments: {len(all_segments)}")
+    return transcript
 
 
 def extract_frame(video_path, timestamp_sec, out_path):
@@ -126,37 +257,32 @@ def is_demo_chapter(title):
 
 
 def segments_in_window(segments, start_sec, end_sec):
-    """Extract transcript text for a time window from Whisper segments."""
+    """Extract transcript text for a time window from segments."""
     texts = []
     for seg in segments:
         seg_start = seg.get("start", 0)
         seg_end = seg.get("end", seg_start)
-        # Include segment if it overlaps with the window
        if seg_end >= start_sec and seg_start <= end_sec:
             texts.append(seg["text"].strip())
     return " ".join(texts)
 
 
-def describe_frame(image_path):
-    """
-    Placeholder: returns a note that frame description needs Claude vision.
-    In a later pass, replace this with an actual API call or output the
-    image path for external processing.
-    """
-    return f"[frame: {image_path.name} — vision description pending]"
-
-
 def main():
     parser = argparse.ArgumentParser(description="Ingest Ask Annie Vimeo session")
     parser.add_argument("--url", required=True, help="Vimeo URL")
     parser.add_argument("--chapters", required=True, help="Path to chapters JSON file")
     parser.add_argument("--out", default="out", help="Output directory")
     parser.add_argument("--frames", action="store_true", help="Extract frames for demo chapters")
-    parser.add_argument("--whisper-model", default="medium", help="Whisper model size")
-    parser.add_argument("--video-id", default=None, help="Override video ID (extracted from URL if omitted)")
+    parser.add_argument("--fish-api-key", default=os.environ.get("FISH_API_KEY", ""),
+                        help="Fish Audio API key (or set FISH_API_KEY env var)")
+    parser.add_argument("--video-id", default=None,
+                        help="Override video ID (extracted from URL if omitted)")
     args = parser.parse_args()
 
-    # Derive video ID from URL
+    if not args.fish_api_key:
+        print("ERROR: Fish Audio API key required. "
+              "Pass --fish-api-key or set FISH_API_KEY env var.")
+        sys.exit(1)
+
     video_id = args.video_id or args.url.rstrip("/").split("/")[-1].split("?")[0]
     print(f"\n=== Ask Annie Ingest: {video_id} ===\n")
 
@@ -189,14 +315,14 @@ def main():
     video_path = None
     if args.frames:
         print("=== Step 1b: Download video (for frame extraction) ===")
-        existing = list(out_dir.glob("video.*"))
+        existing = [f for f in out_dir.glob("video.*") if f.suffix != ".part"]
         if existing:
             video_path = existing[0]
             print(f"  [cache] Found existing video: {video_path}")
         else:
             vid_out = out_dir / "video.%(ext)s"
             run(["yt-dlp", "-f", "bestvideo[height<=720]", "-o", str(vid_out), args.url])
-            matches = list(out_dir.glob("video.*"))
+            matches = [f for f in out_dir.glob("video.*") if f.suffix != ".part"]
             if not matches:
                 print("  WARNING: Video download failed, skipping frame extraction")
                 args.frames = False
@@ -204,9 +330,9 @@ def main():
                 video_path = matches[0]
     print()
 
-    # Step 2: Transcribe
-    print("=== Step 2: Transcribe ===")
-    transcript = transcribe(audio_path, out_dir, model=args.whisper_model)
+    # Step 2: Transcribe via Fish Audio
+    print("=== Step 2: Transcribe via Fish Audio ASR ===")
+    transcript = transcribe_with_fish(audio_path, out_dir, chapters, args.fish_api_key)
     segments = transcript.get("segments", [])
     print(f"  Got {len(segments)} transcript segments\n")
 
@@ -215,7 +341,6 @@ def main():
     chunks = []
     for i, chapter in enumerate(chapters):
         start_sec = parse_timestamp(chapter["timestamp"])
-        # End = next chapter start, or +10min for last
         if i + 1 < len(chapters):
             end_sec = parse_timestamp(chapters[i + 1]["timestamp"])
         else:
@@ -229,7 +354,7 @@ def main():
             frame_file = frames_dir / f"chapter_{i:02d}.jpg"
             print(f"  Extracting frame for: {chapter['title']}")
             extract_frame(video_path, start_sec + 5, frame_file)
-            frame_desc = describe_frame(frame_file)
+            frame_desc = f"[frame: {frame_file.name} — vision description pending]"
 
         chunk = {
             "video_id": video_id,