#!/usr/bin/env python3
"""
ask-annie/ingest.py

Given a Vimeo URL and a chapter list (JSON file), produces structured
per-chapter knowledge chunks suitable for ingestion into knowledge-mcp.

Usage:
    python3 ingest.py --url <vimeo_url> --chapters chapters.json --out out/ [--frames]

Output:
    out/<video_id>/chunks.json      — array of chapter chunks
    out/<video_id>/frames/          — extracted frame images (if --frames)
    out/<video_id>/transcript.json  — full Whisper output (cached)

Dependencies (must be on PATH):
    yt-dlp, ffmpeg, whisper
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
|
|
# Title keywords marking demo-heavy chapters, i.e. the ones where pulling a
# frame out of the video adds real value. A chapter qualifies when its title
# contains any of these as a substring (compared case-insensitively).
DEMO_CHAPTER_KEYWORDS = [
    "transcoding", "s3", "file tracking",
    "workbench", "new capability", "preview",
    "demo", "ui", "setup",
]
|
|
|
|
|
|
def run(cmd, **kwargs):
    """Echo a command line to stdout, then execute it.

    Args:
        cmd: Sequence of string arguments; joined with spaces for the echo
            and passed as-is to ``subprocess.run``.
        **kwargs: Forwarded unchanged to ``subprocess.run``.

    Returns:
        The ``subprocess.CompletedProcess`` of the finished command.

    Raises:
        subprocess.CalledProcessError: If the command exits non-zero
            (``check=True`` is always passed).
    """
    command_line = " ".join(cmd)
    # Flush so the echo appears before any output from the child process.
    print(f" $ {command_line}", flush=True)
    return subprocess.run(cmd, check=True, **kwargs)
|
|
|
|
|
|
def seconds_to_hhmmss(s):
    """Format a duration in seconds as a zero-padded ``HH:MM:SS`` string.

    The input is passed through ``int()`` first, so any fractional part is
    dropped before formatting.
    """
    total = int(s)
    hours = total // 3600
    minutes = (total % 3600) // 60
    secs = total % 60
    return "{:02d}:{:02d}:{:02d}".format(hours, minutes, secs)
|
|
|
|
|
|
def parse_timestamp(ts_str):
    """Parse an ``M:SS`` or ``H:MM:SS`` timestamp into whole seconds.

    Args:
        ts_str: Timestamp text; surrounding whitespace is ignored.

    Returns:
        The offset in seconds as an ``int``.

    Raises:
        ValueError: If the text does not consist of two or three
            colon-separated fields, or if any field is not a non-negative
            integer. The message always names the full offending timestamp.
    """
    fields = ts_str.strip().split(":")
    if len(fields) not in (2, 3):
        raise ValueError(f"Cannot parse timestamp: {ts_str}")

    try:
        values = [int(field) for field in fields]
    except ValueError:
        # Replace int()'s generic message with one naming the whole timestamp.
        raise ValueError(f"Cannot parse timestamp: {ts_str}") from None

    # A negative field (e.g. "-1:30") would silently produce a bogus offset.
    if any(value < 0 for value in values):
        raise ValueError(f"Cannot parse timestamp: {ts_str}")

    # Fold the base-60 fields left to right: ((h * 60) + m) * 60 + s.
    total = 0
    for value in values:
        total = total * 60 + value
    return total
|
|
|
|
|
|
def download_audio(url, out_dir):
    """Download the audio track of ``url`` into ``out_dir`` via yt-dlp.

    Args:
        url: Video page URL handed to yt-dlp.
        out_dir: ``pathlib.Path`` of the directory that receives ``audio.<ext>``.

    Returns:
        Path of the downloaded audio file. ``audio.mp3`` is preferred because
        MP3 is the format requested from yt-dlp; otherwise the first
        (alphabetically) ``audio.*`` file that is not a ``.part`` leftover.

    Raises:
        subprocess.CalledProcessError: If yt-dlp exits non-zero (via ``run``).
        FileNotFoundError: If no usable ``audio.*`` file exists afterwards.
    """
    output_template = out_dir / "audio.%(ext)s"
    run([
        "yt-dlp",
        "--extract-audio",
        "--audio-format", "mp3",
        "--audio-quality", "3",  # ~128kbps, good enough for speech
        "-o", str(output_template),
        url,
    ])

    # MP3 was requested, so look for that exact file first instead of taking
    # whatever the directory listing happens to return first.
    expected = out_dir / "audio.mp3"
    if expected.exists():
        return expected

    # Fallback: any other audio.* file, but never an unfinished ".part"
    # download; sorted so the choice is deterministic.
    candidates = sorted(
        path for path in out_dir.glob("audio.*") if path.suffix != ".part"
    )
    if not candidates:
        raise FileNotFoundError("Audio download failed — no audio.* file found")
    return candidates[0]
|
|
|
|
|
|
def transcribe(audio_path, out_dir, model="medium"):
    """Transcribe ``audio_path`` with the Whisper CLI, caching the result.

    If ``out_dir/transcript.json`` already exists it is loaded and returned
    without running Whisper. Otherwise Whisper is run with JSON output and
    its result file is moved to ``transcript.json``.

    Args:
        audio_path: ``pathlib.Path`` of the audio file to transcribe.
        out_dir: ``pathlib.Path`` of the directory holding the cached transcript.
        model: Whisper model name passed to ``--model``.

    Returns:
        The parsed transcript JSON.

    Raises:
        subprocess.CalledProcessError: If Whisper exits non-zero (via ``run``).
        FileNotFoundError: If Whisper finished but its JSON output is missing.
    """
    transcript_path = out_dir / "transcript.json"

    if not transcript_path.exists():
        run([
            "whisper",
            str(audio_path),
            "--model", model,
            "--output_format", "json",
            "--output_dir", str(out_dir),
            "--language", "en",
            "--word_timestamps", "True",
        ])

        # Whisper names output after the input filename
        whisper_out = out_dir / (audio_path.stem + ".json")
        if not whisper_out.exists():
            raise FileNotFoundError(f"Expected Whisper output at {whisper_out}")
        # replace() overwrites an existing target on every platform, whereas
        # rename() raises on Windows if the target has appeared meanwhile.
        whisper_out.replace(transcript_path)
    else:
        print(f" [cache] Using existing transcript at {transcript_path}")

    # Decode explicitly as UTF-8 so the result does not depend on the
    # platform's locale default encoding.
    with open(transcript_path, encoding="utf-8") as transcript_file:
        return json.load(transcript_file)
|
|
|
|
|
|
def extract_frame(video_path, timestamp_sec, out_path):
    """Save a single frame of ``video_path`` at ``timestamp_sec`` to ``out_path``.

    Runs ffmpeg through ``run`` with both stdout and stderr discarded, so a
    failure surfaces only as the ``subprocess.CalledProcessError`` that
    ``run`` raises on a non-zero exit.
    """
    seek_args = ["-ss", str(timestamp_sec)]
    input_args = ["-i", str(video_path)]
    # One video frame, written at quality setting 2.
    output_args = ["-frames:v", "1", "-q:v", "2", str(out_path)]
    ffmpeg_cmd = ["ffmpeg", "-y", *seek_args, *input_args, *output_args]
    run(ffmpeg_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
|
|
|
|
|
def is_demo_chapter(title):
    """Return True if ``title`` contains any demo keyword, ignoring case.

    Keywords come from ``DEMO_CHAPTER_KEYWORDS`` and are matched as plain
    substrings of the lower-cased title.
    """
    # NOTE(review): substring matching means short keywords such as "ui" also
    # match inside longer words (e.g. "build") — confirm that is acceptable.
    lowered = title.lower()
    for keyword in DEMO_CHAPTER_KEYWORDS:
        if keyword in lowered:
            return True
    return False
|
|
|
|
|
|
def segments_in_window(segments, start_sec, end_sec):
    """Extract transcript text for a time window from Whisper segments.

    The window is treated as half-open, ``[start_sec, end_sec)``. A segment
    is included when it genuinely overlaps the window; a segment that merely
    touches a boundary (ends exactly at ``start_sec`` or starts exactly at
    ``end_sec``) is left to the neighbouring window, so back-to-back windows
    do not both pick it up. A segment that straddles a boundary still appears
    in both windows.

    Args:
        segments: Iterable of dicts with optional ``"start"``, ``"end"`` and
            ``"text"`` keys. ``start`` defaults to 0, ``end`` defaults to
            ``start``, and ``text`` defaults to the empty string.
        start_sec: Window start in seconds (inclusive).
        end_sec: Window end in seconds (exclusive).

    Returns:
        The stripped, non-empty segment texts joined by single spaces.
    """
    texts = []
    for seg in segments:
        seg_start = seg.get("start", 0)
        seg_end = seg.get("end", seg_start)

        # Starts at or after the window end: belongs to a later window.
        if seg_start >= end_sec:
            continue
        # Began before the window and finished by its start: belongs to an
        # earlier window. A zero-length segment sitting exactly on start_sec
        # is kept because it does not begin before the window.
        if seg_start < start_sec and seg_end <= start_sec:
            continue

        text = seg.get("text", "").strip()
        if text:  # skip blanks so the joined output has no doubled spaces
            texts.append(text)
    return " ".join(texts)
|
|
|
|
|
|
def describe_frame(image_path):
    """
    Stand-in for a real frame description.

    Returns a fixed-format note that names the image file and says the
    vision description is still pending. Meant to be replaced in a later
    pass by an actual Claude vision API call, or by emitting the image path
    for external processing.
    """
    filename = image_path.name
    return "[frame: {} — vision description pending]".format(filename)
|
|
|
|
|
|
def _video_id_from_url(url):
    """Return the last path component of ``url``, minus query and fragment."""
    # Drop "#fragment" and "?query" first so a URL such as ".../123/?share=x"
    # still yields "123" rather than an empty string.
    path = url.split("#", 1)[0].split("?", 1)[0]
    return path.rstrip("/").split("/")[-1]


def _cached_media(out_dir, stem):
    """Return a finished ``<stem>.*`` file in ``out_dir``, or None if absent."""
    for candidate in sorted(out_dir.glob(f"{stem}.*")):
        if candidate.suffix == ".part":
            print(f" [skip] Ignoring partial download: {candidate}")
            continue
        return candidate
    return None


def _download_video(url, out_dir):
    """Download a <=720p video stream with yt-dlp; return its path or None."""
    template = out_dir / "video.%(ext)s"
    try:
        run(["yt-dlp", "-f", "bestvideo[height<=720]", "-o", str(template), url])
    except subprocess.CalledProcessError as exc:
        # Frames are optional: report the failure and let the caller go on.
        print(f" WARNING: yt-dlp exited with status {exc.returncode}")
        return None
    return _cached_media(out_dir, "video")


def _build_chunks(chapters, segments, video_id, url, video_path, frames_dir):
    """Build one chunk dict per chapter, extracting demo frames if possible."""
    # Parse every timestamp up front so a malformed chapter list fails before
    # any frame extraction work is done.
    starts = [parse_timestamp(chapter["timestamp"]) for chapter in chapters]

    chunks = []
    for i, chapter in enumerate(chapters):
        start_sec = starts[i]
        # End = next chapter start. The last chapter runs to the end of the
        # transcript, so a final chapter longer than 10 minutes is not cut off.
        end_sec = starts[i + 1] if i + 1 < len(chapters) else float("inf")

        text = segments_in_window(segments, start_sec, end_sec)
        demo = is_demo_chapter(chapter["title"])

        frame_desc = None
        if demo and video_path is not None:
            frame_file = frames_dir / f"chapter_{i:02d}.jpg"
            print(f" Extracting frame for: {chapter['title']}")
            try:
                # Sample 5 seconds into the chapter rather than at its start.
                extract_frame(video_path, start_sec + 5, frame_file)
            except subprocess.CalledProcessError as exc:
                # One bad frame must not throw away the finished transcript.
                print(f" WARNING: ffmpeg exited with status {exc.returncode}; no frame for this chapter")
            else:
                # Guard against a clean exit that still wrote no image.
                if frame_file.exists():
                    frame_desc = describe_frame(frame_file)
                else:
                    print(f" WARNING: no frame written to {frame_file}")

        chunks.append({
            "video_id": video_id,
            "video_url": url,
            "chapter_index": i,
            "timestamp": chapter["timestamp"],
            "timestamp_sec": start_sec,
            "title": chapter["title"],
            "summary": chapter.get("summary", ""),
            "transcript": text,
            "is_demo": demo,
            "frame_description": frame_desc,
            "source": "ask-annie",
            "series": "ST Best Practices Q&A",
        })
        print(f" [{i:02d}] {chapter['timestamp']} — {chapter['title'][:60]} ({'demo' if demo else 'qa'})")
    return chunks


def main():
    """CLI entry point: download, transcribe, and chunk one session video.

    Steps:
        1.  Download the audio track (reused if already in the output dir).
        1b. With ``--frames``, download the video too; if that fails, frame
            extraction is skipped and the run continues.
        2.  Transcribe the audio with Whisper (cached as ``transcript.json``).
        3.  Build one chunk per chapter from the transcript segments.
        4.  Write ``<out>/<video_id>/chunks.json``.

    Exits through ``argparse`` with a usage error when no video ID can be
    derived from ``--url`` and ``--video-id`` was not given.
    """
    parser = argparse.ArgumentParser(description="Ingest Ask Annie Vimeo session")
    parser.add_argument("--url", required=True, help="Vimeo URL")
    parser.add_argument("--chapters", required=True, help="Path to chapters JSON file")
    parser.add_argument("--out", default="out", help="Output directory")
    parser.add_argument("--frames", action="store_true", help="Extract frames for demo chapters")
    parser.add_argument("--whisper-model", default="medium", help="Whisper model size")
    parser.add_argument("--video-id", default=None, help="Override video ID (extracted from URL if omitted)")
    args = parser.parse_args()

    # Derive video ID from URL
    video_id = args.video_id or _video_id_from_url(args.url)
    if not video_id:
        # An empty ID would make every video share the bare output directory.
        parser.error("could not derive a video ID from --url; pass --video-id")
    print(f"\n=== Ask Annie Ingest: {video_id} ===\n")

    out_dir = Path(args.out) / video_id
    frames_dir = out_dir / "frames"
    out_dir.mkdir(parents=True, exist_ok=True)
    if args.frames:
        frames_dir.mkdir(exist_ok=True)

    # Load chapters
    with open(args.chapters, encoding="utf-8") as chapters_file:
        chapters = json.load(chapters_file)
    print(f"Loaded {len(chapters)} chapters\n")

    # Step 1: Download audio
    print("=== Step 1: Download audio ===")
    audio_path = _cached_media(out_dir, "audio")
    if audio_path is not None:
        print(f" [cache] Found existing audio: {audio_path}")
    else:
        audio_path = download_audio(args.url, out_dir)
    print(f" Audio: {audio_path}\n")

    # Step 1b: Download video (only if --frames requested)
    video_path = None
    if args.frames:
        print("=== Step 1b: Download video (for frame extraction) ===")
        video_path = _cached_media(out_dir, "video")
        if video_path is not None:
            print(f" [cache] Found existing video: {video_path}")
        else:
            video_path = _download_video(args.url, out_dir)
            if video_path is None:
                print(" WARNING: Video download failed, skipping frame extraction")
        print()

    # Step 2: Transcribe
    print("=== Step 2: Transcribe ===")
    transcript = transcribe(audio_path, out_dir, model=args.whisper_model)
    segments = transcript.get("segments", [])
    print(f" Got {len(segments)} transcript segments\n")

    # Step 3: Build chunks (video_path stays None unless frames are usable)
    print("=== Step 3: Build chunks ===")
    chunks = _build_chunks(chapters, segments, video_id, args.url, video_path, frames_dir)

    # Step 4: Write output
    chunks_path = out_dir / "chunks.json"
    with open(chunks_path, "w", encoding="utf-8") as chunks_file:
        json.dump(chunks, chunks_file, indent=2)
    print(f"\n=== Done: {len(chunks)} chunks → {chunks_path} ===\n")
|
|
|
|
|
|
# Run the ingest CLI only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()
|