Switch transcription to Fish Audio ASR with chapter-aligned chunking

This commit is contained in:
2026-03-24 00:27:45 +00:00
parent 9bc7ea9468
commit ce3ef32778

237
ingest.py
View File

@@ -6,28 +6,33 @@ Given a Vimeo URL and a chapter list (JSON file), produces structured
per-chapter knowledge chunks suitable for ingestion into knowledge-mcp. per-chapter knowledge chunks suitable for ingestion into knowledge-mcp.
Usage: Usage:
python3 ingest.py --url <vimeo_url> --chapters chapters.json --out out/ [--frames] python3 ingest.py --url <vimeo_url> --chapters chapters/XXXX.json --out out/ [--frames]
Output: Output:
out/<video_id>/chunks.json — array of chapter chunks out/<video_id>/chunks.json — array of chapter chunks
out/<video_id>/frames/ — extracted frame images (if --frames) out/<video_id>/frames/ — extracted frame images (if --frames)
out/<video_id>/transcript.json — full Whisper output (cached) out/<video_id>/transcript.json — full reassembled transcript (cached)
out/<video_id>/audio_chunks/ — per-chapter audio splits (cached)
Dependencies (must be on PATH): Dependencies (must be on PATH):
yt-dlp, ffmpeg, whisper yt-dlp, ffmpeg
pip install requests
""" """
import argparse import argparse
import json import json
import math
import os import os
import subprocess import subprocess
import sys import sys
import tempfile import time
import requests
from pathlib import Path from pathlib import Path
FISH_AUDIO_ASR_URL = "https://api.fish.audio/v1/asr"
# Chapters where frame extraction adds real value (demo-heavy sections). # Chapters where frame extraction adds real value (demo-heavy sections).
# Identified by matching chapter title substrings (case-insensitive).
DEMO_CHAPTER_KEYWORDS = [ DEMO_CHAPTER_KEYWORDS = [
"transcoding", "transcoding",
"s3", "s3",
@@ -40,20 +45,18 @@ DEMO_CHAPTER_KEYWORDS = [
"setup", "setup",
] ]
# Target chunk size in seconds — split audio into groups of chapters
# totalling no more than this before sending to ASR.
# 10 min = 600s keeps each chunk well under Fish Audio's size limit.
# A single chapter longer than this is still sent alone as one oversize chunk.
CHUNK_MAX_SECONDS = 600
def run(cmd, **kwargs):
    """Echo then execute an external command.

    Args:
        cmd: Command as an argument list. Non-string elements (Path, int)
            are stringified for the echoed line; subprocess itself accepts
            PathLike arguments as-is.
        **kwargs: Forwarded to subprocess.run (e.g. stdout/stderr redirects).

    Returns:
        The CompletedProcess from subprocess.run.

    Raises:
        subprocess.CalledProcessError: If the command exits non-zero
            (check=True is always passed).
    """
    print(f" $ {' '.join(str(c) for c in cmd)}", flush=True)
    return subprocess.run(cmd, check=True, **kwargs)
def seconds_to_hhmmss(s):
    """Format a second count as a zero-padded HH:MM:SS string."""
    total = int(s)
    minutes, seconds = divmod(total, 60)
    hours, minutes = divmod(minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
def parse_timestamp(ts_str): def parse_timestamp(ts_str):
"""Parse 'M:SS' or 'H:MM:SS' to seconds.""" """Parse 'M:SS' or 'H:MM:SS' to seconds."""
parts = ts_str.strip().split(":") parts = ts_str.strip().split(":")
@@ -71,42 +74,170 @@ def download_audio(url, out_dir):
"yt-dlp", "yt-dlp",
"--extract-audio", "--extract-audio",
"--audio-format", "mp3", "--audio-format", "mp3",
"--audio-quality", "3", # ~128kbps, good enough for speech "--audio-quality", "3",
"-o", str(audio_path), "-o", str(audio_path),
url, url,
]) ])
# Find the downloaded file matches = [f for f in out_dir.glob("audio.*") if f.suffix != ".part"]
matches = list(out_dir.glob("audio.*"))
if not matches: if not matches:
raise FileNotFoundError("Audio download failed — no audio.* file found") raise FileNotFoundError("Audio download failed — no audio.* file found")
return matches[0] return matches[0]
def get_audio_duration(audio_path):
    """Return audio duration in seconds using ffprobe.

    Args:
        audio_path: Path to the audio file (str or Path).

    Returns:
        Duration in seconds as a float, taken from the container format
        metadata in ffprobe's JSON output.

    Raises:
        subprocess.CalledProcessError: If ffprobe exits non-zero.
        KeyError: If ffprobe's JSON has no format/duration field.
    """
    result = subprocess.run(
        ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", str(audio_path)],
        capture_output=True, text=True, check=True
    )
    info = json.loads(result.stdout)
    return float(info["format"]["duration"])
def split_audio_chunk(audio_path, start_sec, end_sec, out_path):
    """Extract the [start_sec, end_sec) span of audio_path into out_path.

    Uses ffmpeg input-side seeking (-ss before -i) with stream copy, so no
    re-encode happens. The span length is passed as a duration with -t,
    which every ffmpeg release accepts as an input option — unlike -to,
    which older builds reject before -i and which has version-dependent
    semantics combined with input seeking.

    Args:
        audio_path: Source audio file.
        start_sec: Span start, in seconds from the start of the source.
        end_sec: Span end, in seconds from the start of the source.
        out_path: Destination file; overwritten if present (-y).
    """
    run([
        "ffmpeg", "-y",
        "-ss", str(start_sec),
        "-t", str(end_sec - start_sec),
        "-i", str(audio_path),
        "-c", "copy",
        str(out_path),
    ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
def transcribe_chunk_fish(chunk_path, api_key, start_offset_sec):
    """
    Send a chunk to Fish Audio ASR. Returns a list of segments with
    timestamps adjusted by start_offset_sec.
    """
    print(f" → Fish Audio ASR: {chunk_path.name} (offset +{start_offset_sec}s)", flush=True)
    with open(chunk_path, "rb") as audio_file:
        response = requests.post(
            FISH_AUDIO_ASR_URL,
            headers={"Authorization": f"Bearer {api_key}"},
            files={"audio": (chunk_path.name, audio_file, "audio/mpeg")},
            data={"language": "en", "ignore_timestamps": "false"},
            timeout=120,
        )
        response.raise_for_status()
    payload = response.json()

    # Shift every returned timestamp into whole-audio coordinates.
    shifted = [
        {
            "start": seg["start"] + start_offset_sec,
            "end": seg["end"] + start_offset_sec,
            "text": seg["text"],
        }
        for seg in payload.get("segments", [])
    ]
    # Flat-text response with no per-segment timing: wrap the whole text in
    # one synthetic segment so downstream time-window lookups still find it.
    if not shifted and payload.get("text"):
        shifted = [{
            "start": start_offset_sec,
            "end": start_offset_sec + 60,
            "text": payload["text"],
        }]
    return shifted
def group_chapters_into_chunks(chapters, total_duration_sec):
    """
    Group chapters into batches where each batch's audio is <= CHUNK_MAX_SECONDS.
    Returns list of (start_sec, end_sec, [chapter_indices]).
    """
    # Precompute each chapter's [start, end): end is the next chapter's
    # start, or the end of the audio for the final chapter.
    starts = [parse_timestamp(ch["timestamp"]) for ch in chapters]
    ends = starts[1:] + [total_duration_sec]

    groups = []
    batch = []
    batch_start = None
    batch_end = None
    for idx, (ch_start, ch_end) in enumerate(zip(starts, ends)):
        if batch_start is None:
            batch_start = ch_start
        if batch and (ch_end - batch_start) > CHUNK_MAX_SECONDS:
            # This chapter would overflow the batch — flush what we have
            # (ending where the previous chapter ended) and start anew.
            groups.append((batch_start, batch_end, batch))
            batch = [idx]
            batch_start = ch_start
        else:
            batch.append(idx)
        batch_end = ch_end
    if batch:
        # Final batch always runs to the end of the audio.
        groups.append((batch_start, total_duration_sec, batch))
    return groups
def transcribe_with_fish(audio_path, out_dir, chapters, api_key):
    """
    Split audio by chapter groups, transcribe each with Fish Audio ASR,
    reassemble into a unified segment list. Caches per-chunk results.

    Args:
        audio_path: Path to the full downloaded audio file.
        out_dir: Per-video output directory; transcript.json and
            audio_chunks/ live here.
        chapters: List of chapter dicts with "timestamp" and "title" keys.
        api_key: Fish Audio API key.

    Returns:
        Dict with a "segments" key: list of {start, end, text} dicts in
        whole-audio time coordinates.
    """
    transcript_path = out_dir / "transcript.json"
    if transcript_path.exists():
        print(f" [cache] Using existing transcript at {transcript_path}")
        with open(transcript_path) as f:
            return json.load(f)

    chunks_dir = out_dir / "audio_chunks"
    chunks_dir.mkdir(exist_ok=True)

    total_duration = get_audio_duration(audio_path)
    print(f" Audio duration: {total_duration:.0f}s ({total_duration/60:.1f} min)")

    groups = group_chapters_into_chunks(chapters, total_duration)
    print(f" Split into {len(groups)} chunk(s) for ASR\n")

    all_segments = []
    for g_idx, (g_start, g_end, ch_indices) in enumerate(groups):
        chunk_path = chunks_dir / f"chunk_{g_idx:02d}.mp3"
        cache_path = chunks_dir / f"chunk_{g_idx:02d}.json"
        ch_titles = [chapters[i]["title"][:40] for i in ch_indices]
        print(f" Chunk {g_idx}: {g_start:.0f}s–{g_end:.0f}s ({len(ch_indices)} chapters: {ch_titles[0]}…)")
        if cache_path.exists():
            print(f" [cache] Using cached ASR result")
            with open(cache_path) as f:
                segments = json.load(f)
        else:
            # Extract audio chunk (kept on disk so a failed ASR call can
            # be retried without re-splitting).
            if not chunk_path.exists():
                split_audio_chunk(audio_path, g_start, g_end, chunk_path)
            # Send to Fish Audio
            segments = transcribe_chunk_fish(chunk_path, api_key, g_start)
            # Cache the result
            with open(cache_path, "w") as f:
                json.dump(segments, f, indent=2)
            # Brief pause between API calls
            if g_idx < len(groups) - 1:
                time.sleep(1)
        all_segments.extend(segments)
        print(f" Got {len(segments)} segments")

    # Save full reassembled transcript
    transcript = {"segments": all_segments}
    with open(transcript_path, "w") as f:
        json.dump(transcript, f, indent=2)
    print(f"\n Total segments: {len(all_segments)}")
    return transcript
def extract_frame(video_path, timestamp_sec, out_path): def extract_frame(video_path, timestamp_sec, out_path):
@@ -126,37 +257,32 @@ def is_demo_chapter(title):
def segments_in_window(segments, start_sec, end_sec):
    """Extract transcript text for a time window from segments."""
    collected = []
    for seg in segments:
        begin = seg.get("start", 0)
        finish = seg.get("end", begin)
        # Keep any segment whose span intersects [start_sec, end_sec].
        overlaps = not (finish < start_sec or begin > end_sec)
        if overlaps:
            collected.append(seg["text"].strip())
    return " ".join(collected)
def describe_frame(image_path):
    """
    Placeholder: returns a note that frame description needs Claude vision.
    In a later pass, replace this with an actual API call or output the
    image path for external processing.
    """
    # Only the filename is surfaced for now; vision integration comes later.
    return "[frame: " + image_path.name + " — vision description pending]"
def main(): def main():
parser = argparse.ArgumentParser(description="Ingest Ask Annie Vimeo session") parser = argparse.ArgumentParser(description="Ingest Ask Annie Vimeo session")
parser.add_argument("--url", required=True, help="Vimeo URL") parser.add_argument("--url", required=True, help="Vimeo URL")
parser.add_argument("--chapters", required=True, help="Path to chapters JSON file") parser.add_argument("--chapters", required=True, help="Path to chapters JSON file")
parser.add_argument("--out", default="out", help="Output directory") parser.add_argument("--out", default="out", help="Output directory")
parser.add_argument("--frames", action="store_true", help="Extract frames for demo chapters") parser.add_argument("--frames", action="store_true", help="Extract frames for demo chapters")
parser.add_argument("--whisper-model", default="medium", help="Whisper model size") parser.add_argument("--fish-api-key", default=os.environ.get("FISH_API_KEY", ""),
parser.add_argument("--video-id", default=None, help="Override video ID (extracted from URL if omitted)") help="Fish Audio API key (or set FISH_API_KEY env var)")
parser.add_argument("--video-id", default=None,
help="Override video ID (extracted from URL if omitted)")
args = parser.parse_args() args = parser.parse_args()
# Derive video ID from URL if not args.fish_api_key:
print("ERROR: Fish Audio API key required. Pass --fish-api-key or set FISH_API_KEY env var.")
sys.exit(1)
video_id = args.video_id or args.url.rstrip("/").split("/")[-1].split("?")[0] video_id = args.video_id or args.url.rstrip("/").split("/")[-1].split("?")[0]
print(f"\n=== Ask Annie Ingest: {video_id} ===\n") print(f"\n=== Ask Annie Ingest: {video_id} ===\n")
@@ -189,14 +315,14 @@ def main():
video_path = None video_path = None
if args.frames: if args.frames:
print("=== Step 1b: Download video (for frame extraction) ===") print("=== Step 1b: Download video (for frame extraction) ===")
existing = list(out_dir.glob("video.*")) existing = [f for f in out_dir.glob("video.*") if f.suffix != ".part"]
if existing: if existing:
video_path = existing[0] video_path = existing[0]
print(f" [cache] Found existing video: {video_path}") print(f" [cache] Found existing video: {video_path}")
else: else:
vid_out = out_dir / "video.%(ext)s" vid_out = out_dir / "video.%(ext)s"
run(["yt-dlp", "-f", "bestvideo[height<=720]", "-o", str(vid_out), args.url]) run(["yt-dlp", "-f", "bestvideo[height<=720]", "-o", str(vid_out), args.url])
matches = list(out_dir.glob("video.*")) matches = [f for f in out_dir.glob("video.*") if f.suffix != ".part"]
if not matches: if not matches:
print(" WARNING: Video download failed, skipping frame extraction") print(" WARNING: Video download failed, skipping frame extraction")
args.frames = False args.frames = False
@@ -204,9 +330,9 @@ def main():
video_path = matches[0] video_path = matches[0]
print() print()
# Step 2: Transcribe # Step 2: Transcribe via Fish Audio
print("=== Step 2: Transcribe ===") print("=== Step 2: Transcribe via Fish Audio ASR ===")
transcript = transcribe(audio_path, out_dir, model=args.whisper_model) transcript = transcribe_with_fish(audio_path, out_dir, chapters, args.fish_api_key)
segments = transcript.get("segments", []) segments = transcript.get("segments", [])
print(f" Got {len(segments)} transcript segments\n") print(f" Got {len(segments)} transcript segments\n")
@@ -215,7 +341,6 @@ def main():
chunks = [] chunks = []
for i, chapter in enumerate(chapters): for i, chapter in enumerate(chapters):
start_sec = parse_timestamp(chapter["timestamp"]) start_sec = parse_timestamp(chapter["timestamp"])
# End = next chapter start, or +10min for last
if i + 1 < len(chapters): if i + 1 < len(chapters):
end_sec = parse_timestamp(chapters[i + 1]["timestamp"]) end_sec = parse_timestamp(chapters[i + 1]["timestamp"])
else: else:
@@ -229,7 +354,7 @@ def main():
frame_file = frames_dir / f"chapter_{i:02d}.jpg" frame_file = frames_dir / f"chapter_{i:02d}.jpg"
print(f" Extracting frame for: {chapter['title']}") print(f" Extracting frame for: {chapter['title']}")
extract_frame(video_path, start_sec + 5, frame_file) extract_frame(video_path, start_sec + 5, frame_file)
frame_desc = describe_frame(frame_file) frame_desc = f"[frame: {frame_file.name} — vision description pending]"
chunk = { chunk = {
"video_id": video_id, "video_id": video_id,