Files
ask-annie/ingest.py

394 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
ask-annie/ingest.py
Given a Vimeo URL and a chapter list (JSON file), produces structured
per-chapter knowledge chunks suitable for ingestion into knowledge-mcp.
Usage:
python3 ingest.py --url <vimeo_url> --chapters chapters/XXXX.json --out out/ [--frames]
Output:
out/<video_id>/chunks.json — array of chapter chunks
out/<video_id>/frames/ — extracted frame images (if --frames)
out/<video_id>/transcript.json — full reassembled transcript (cached)
out/<video_id>/audio_chunks/ — per-chapter audio splits (cached)
Dependencies (must be on PATH):
yt-dlp, ffmpeg
pip install requests
"""
import argparse
import json
import math
import os
import subprocess
import sys
import time
from pathlib import Path
# Chapters where frame extraction adds real value (demo-heavy sections).
# Each entry is matched case-insensitively as a substring of the chapter
# title (see is_demo_chapter).
DEMO_CHAPTER_KEYWORDS = [
    "transcoding",
    "s3",
    "file tracking",
    "workbench",
    "new capability",
    "preview",
    "demo",
    "ui",
    "setup",
]

# Target chunk size in seconds — split audio into groups of chapters
# totalling no more than this before sending to ASR.
# 10 min = 600s keeps each chunk well under Fish Audio's size limit.
CHUNK_MAX_SECONDS = 600
def run(cmd, **kwargs):
    """Echo *cmd* to stdout, then execute it via subprocess.run(check=True).

    Extra keyword arguments are forwarded to subprocess.run unchanged;
    the CompletedProcess is returned.
    """
    printable = " ".join(str(token) for token in cmd)
    print(f" $ {printable}", flush=True)
    return subprocess.run(cmd, check=True, **kwargs)
def parse_timestamp(ts_str):
    """Convert a 'M:SS' or 'H:MM:SS' string to a total number of seconds.

    Raises ValueError for any other field count (or non-numeric fields).
    """
    fields = [int(field) for field in ts_str.strip().split(":")]
    if len(fields) not in (2, 3):
        raise ValueError(f"Cannot parse timestamp: {ts_str}")
    # Horner-style fold: works identically for M:SS and H:MM:SS.
    total = 0
    for value in fields:
        total = total * 60 + value
    return total
def download_audio(url, out_dir):
    """Download the URL's audio track as mp3 into out_dir via yt-dlp.

    Returns the path of the downloaded file; raises FileNotFoundError
    if no completed audio.* file appears afterwards.
    """
    template = out_dir / "audio.%(ext)s"
    cmd = [
        "yt-dlp",
        "--extract-audio",
        "--audio-format", "mp3",
        "--audio-quality", "3",
        "-o", str(template),
        url,
    ]
    run(cmd)
    # Ignore in-progress .part files left behind by yt-dlp.
    finished = [p for p in out_dir.glob("audio.*") if p.suffix != ".part"]
    if not finished:
        raise FileNotFoundError("Audio download failed — no audio.* file found")
    return finished[0]
def get_audio_duration(audio_path):
    """Return audio duration in seconds, as reported by ffprobe's JSON output."""
    proc = subprocess.run(
        ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", str(audio_path)],
        capture_output=True, text=True, check=True,
    )
    metadata = json.loads(proc.stdout)
    return float(metadata["format"]["duration"])
def split_audio_chunk(audio_path, start_sec, end_sec, out_path):
    """Copy the [start_sec, end_sec] span of audio_path into out_path.

    Uses stream copy (no re-encode); ffmpeg output is suppressed.
    """
    cmd = [
        "ffmpeg", "-y",
        "-ss", str(start_sec),
        "-to", str(end_sec),
        "-i", str(audio_path),
        "-c", "copy",
        str(out_path),
    ]
    run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
def transcribe_chunk_fish(chunk_path, api_key, start_offset_sec):
    """POST one audio chunk to the Fish Audio ASR HTTP endpoint.

    Returns a list of {"start", "end", "text"} segments whose timestamps
    are shifted by start_offset_sec (the chunk's position in the full
    recording). Fish Audio reports timestamps in milliseconds; they are
    converted to seconds here. If the response carries no segments but
    does include a full text, one fallback segment is synthesized.

    Raises RuntimeError on any non-200 response.
    """
    import httpx
    print(f" → Fish Audio ASR: {chunk_path.name} (offset +{start_offset_sec}s)", flush=True)
    payload = chunk_path.read_bytes()
    print(f" Sending {len(payload)/1024/1024:.1f}MB to Fish Audio...", flush=True)
    with httpx.Client(timeout=300) as client:
        response = client.post(
            "https://api.fish.audio/v1/asr",
            headers={"Authorization": f"Bearer {api_key}"},
            files={"audio": (chunk_path.name, payload, "audio/mpeg")},
            data={"language": "en", "ignore_timestamps": "false"},
        )
    print(f" HTTP {response.status_code}", flush=True)
    if response.status_code != 200:
        raise RuntimeError(f"Fish Audio ASR error {response.status_code}: {response.text!r}")
    body = response.json()
    # Milliseconds → seconds, offset into the full-audio timeline.
    results = [
        {
            "start": seg["start"] / 1000.0 + start_offset_sec,
            "end": seg["end"] / 1000.0 + start_offset_sec,
            "text": seg["text"],
        }
        for seg in body.get("segments", [])
    ]
    if not results and body.get("text"):
        # No per-segment timing came back — treat the whole text as one
        # block with a nominal 60s span.
        results.append({
            "start": start_offset_sec,
            "end": start_offset_sec + 60,
            "text": body["text"],
        })
    return results
def group_chapters_into_chunks(chapters, total_duration_sec):
    """Partition chapters into consecutive batches for ASR.

    Each batch's audio span stays within CHUNK_MAX_SECONDS where possible
    (a single over-long chapter still gets its own batch). Returns a list
    of (start_sec, end_sec, [chapter_indices]) tuples.
    """
    def start_of(idx):
        # Chapter start, parsed from its "timestamp" field.
        return parse_timestamp(chapters[idx]["timestamp"])

    def end_of(idx):
        # A chapter ends where the next one begins; the last runs to EOF.
        return start_of(idx + 1) if idx + 1 < len(chapters) else total_duration_sec

    batches = []
    pending = []
    batch_start = None
    for idx, _chapter in enumerate(chapters):
        if batch_start is None:
            batch_start = start_of(idx)
        if (end_of(idx) - batch_start) > CHUNK_MAX_SECONDS and pending:
            # Adding this chapter would overflow: close the running batch
            # at the end of its last chapter and start a new one here.
            batches.append((batch_start, end_of(pending[-1]), list(pending)))
            pending = [idx]
            batch_start = start_of(idx)
        else:
            pending.append(idx)
    if pending:
        batches.append((batch_start, total_duration_sec, pending))
    return batches
def transcribe_with_fish(audio_path, out_dir, chapters, api_key):
    """Transcribe the full audio via Fish Audio ASR, one chapter-group at a time.

    Splits the audio into chapter-group chunks, sends each to the ASR
    endpoint, and stitches the results into {"segments": [...]}. Both the
    per-chunk ASR responses and the final transcript are cached on disk
    and reused on later runs.
    """
    transcript_path = out_dir / "transcript.json"
    if transcript_path.exists():
        print(f" [cache] Using existing transcript at {transcript_path}")
        with open(transcript_path) as fh:
            return json.load(fh)

    chunks_dir = out_dir / "audio_chunks"
    chunks_dir.mkdir(exist_ok=True)
    total_duration = get_audio_duration(audio_path)
    print(f" Audio duration: {total_duration:.0f}s ({total_duration/60:.1f} min)")
    groups = group_chapters_into_chunks(chapters, total_duration)
    print(f" Split into {len(groups)} chunk(s) for ASR\n")

    stitched = []
    for g_idx, (g_start, g_end, ch_indices) in enumerate(groups):
        chunk_path = chunks_dir / f"chunk_{g_idx:02d}.mp3"
        cache_path = chunks_dir / f"chunk_{g_idx:02d}.json"
        ch_titles = [chapters[i]["title"][:40] for i in ch_indices]
        print(f" Chunk {g_idx}: {g_start:.0f}s{g_end:.0f}s ({len(ch_indices)} chapters: {ch_titles[0]}…)")
        if cache_path.exists():
            print(" [cache] Using cached ASR result")
            with open(cache_path) as fh:
                segments = json.load(fh)
        else:
            # Cut the audio span once; reuse the mp3 if it already exists.
            if not chunk_path.exists():
                split_audio_chunk(audio_path, g_start, g_end, chunk_path)
            segments = transcribe_chunk_fish(chunk_path, api_key, g_start)
            with open(cache_path, "w") as fh:
                json.dump(segments, fh, indent=2)
            # Brief pause between API calls (skip after the final chunk).
            if g_idx < len(groups) - 1:
                time.sleep(1)
        stitched.extend(segments)
        print(f" Got {len(segments)} segments")

    transcript = {"segments": stitched}
    with open(transcript_path, "w") as fh:
        json.dump(transcript, fh, indent=2)
    print(f"\n Total segments: {len(stitched)}")
    return transcript
def extract_frame(video_path, timestamp_sec, out_path):
    """Grab a single high-quality (-q:v 2) frame at timestamp_sec into out_path."""
    cmd = [
        "ffmpeg", "-y",
        "-ss", str(timestamp_sec),
        "-i", str(video_path),
        "-frames:v", "1",
        "-q:v", "2",
        str(out_path),
    ]
    run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
def is_demo_chapter(title):
    """Return True when the title contains any demo-heavy keyword (case-insensitive)."""
    lowered = title.lower()
    for keyword in DEMO_CHAPTER_KEYWORDS:
        if keyword in lowered:
            return True
    return False
def segments_in_window(segments, start_sec, end_sec):
    """Join (space-separated) the stripped text of every segment that
    overlaps the inclusive time window [start_sec, end_sec].

    Segments missing "start" default to 0; missing "end" defaults to the
    segment's start.
    """
    selected = [
        seg["text"].strip()
        for seg in segments
        if seg.get("end", seg.get("start", 0)) >= start_sec
        and seg.get("start", 0) <= end_sec
    ]
    return " ".join(selected)
def main():
    """CLI entry point: download, transcribe, build chapter chunks, write JSON.

    Pipeline: (1) download audio via yt-dlp, (1b) optionally download video
    for frame extraction, (2) transcribe via Fish Audio ASR, (3) assemble
    one knowledge chunk per chapter, (4) write chunks.json. Exits with
    status 1 when no Fish Audio API key is available.
    """
    parser = argparse.ArgumentParser(description="Ingest Ask Annie Vimeo session")
    parser.add_argument("--url", required=True, help="Vimeo URL")
    parser.add_argument("--chapters", required=True, help="Path to chapters JSON file")
    parser.add_argument("--out", default="out", help="Output directory")
    parser.add_argument("--frames", action="store_true", help="Extract frames for demo chapters")
    parser.add_argument("--fish-api-key", default=os.environ.get("FISH_API_KEY", ""),
                        help="Fish Audio API key (or set FISH_API_KEY env var)")
    parser.add_argument("--video-id", default=None,
                        help="Override video ID (extracted from URL if omitted)")
    args = parser.parse_args()

    if not args.fish_api_key:
        print("ERROR: Fish Audio API key required. Pass --fish-api-key or set FISH_API_KEY env var.")
        sys.exit(1)

    # Default video ID: last path component of the URL, query string stripped.
    video_id = args.video_id or args.url.rstrip("/").split("/")[-1].split("?")[0]
    print(f"\n=== Ask Annie Ingest: {video_id} ===\n")
    out_dir = Path(args.out) / video_id
    frames_dir = out_dir / "frames"
    out_dir.mkdir(parents=True, exist_ok=True)
    if args.frames:
        frames_dir.mkdir(exist_ok=True)

    # Load chapters (expected: list of {"timestamp", "title", ...} dicts).
    with open(args.chapters) as f:
        chapters = json.load(f)
    print(f"Loaded {len(chapters)} chapters\n")

    # Step 1: Download audio — reuse a finished download when present,
    # skipping yt-dlp's in-progress .part files.
    print("=== Step 1: Download audio ===")
    audio_path = None
    for f in out_dir.glob("audio.*"):
        if f.suffix == ".part":
            print(f" [skip] Ignoring partial download: {f}")
            continue
        print(f" [cache] Found existing audio: {f}")
        audio_path = f
        break
    if audio_path is None:
        audio_path = download_audio(args.url, out_dir)
    print(f" Audio: {audio_path}\n")

    # Step 1b: Download video (only if --frames requested)
    video_path = None
    if args.frames:
        print("=== Step 1b: Download video (for frame extraction) ===")
        existing = [f for f in out_dir.glob("video.*") if f.suffix != ".part"]
        if existing:
            video_path = existing[0]
            print(f" [cache] Found existing video: {video_path}")
        else:
            vid_out = out_dir / "video.%(ext)s"
            run(["yt-dlp", "-f", "bestvideo[height<=720]", "-o", str(vid_out), args.url])
            matches = [f for f in out_dir.glob("video.*") if f.suffix != ".part"]
            if not matches:
                # Degrade gracefully: continue without frame extraction.
                print(" WARNING: Video download failed, skipping frame extraction")
                args.frames = False
            else:
                video_path = matches[0]
        print()

    # Step 2: Transcribe via Fish Audio
    print("=== Step 2: Transcribe via Fish Audio ASR ===")
    transcript = transcribe_with_fish(audio_path, out_dir, chapters, args.fish_api_key)
    segments = transcript.get("segments", [])
    print(f" Got {len(segments)} transcript segments\n")

    # Step 3: Build one chunk per chapter.
    print("=== Step 3: Build chunks ===")
    chunks = []
    for i, chapter in enumerate(chapters):
        start_sec = parse_timestamp(chapter["timestamp"])
        if i + 1 < len(chapters):
            end_sec = parse_timestamp(chapters[i + 1]["timestamp"])
        else:
            # Last chapter has no successor — assume a 600s (10 min) window.
            end_sec = start_sec + 600
        text = segments_in_window(segments, start_sec, end_sec)
        demo = is_demo_chapter(chapter["title"])
        frame_desc = None
        if args.frames and demo and video_path:
            frame_file = frames_dir / f"chapter_{i:02d}.jpg"
            print(f" Extracting frame for: {chapter['title']}")
            # Capture 5s into the chapter (presumably past the transition).
            extract_frame(video_path, start_sec + 5, frame_file)
            # Placeholder — a vision model is expected to fill this in later.
            frame_desc = f"[frame: {frame_file.name} — vision description pending]"
        chunk = {
            "video_id": video_id,
            "video_url": args.url,
            "chapter_index": i,
            "timestamp": chapter["timestamp"],
            "timestamp_sec": start_sec,
            "title": chapter["title"],
            "summary": chapter.get("summary", ""),
            "transcript": text,
            "is_demo": demo,
            "frame_description": frame_desc,
            "source": "ask-annie",
            "series": "ST Best Practices Q&A",
        }
        chunks.append(chunk)
        print(f" [{i:02d}] {chapter['timestamp']}{chapter['title'][:60]} ({'demo' if demo else 'qa'})")

    # Step 4: Write output
    chunks_path = out_dir / "chunks.json"
    with open(chunks_path, "w") as f:
        json.dump(chunks, f, indent=2)
    print(f"\n=== Done: {len(chunks)} chunks → {chunks_path} ===\n")


if __name__ == "__main__":
    main()