Add batch_transcribe.py
This commit is contained in:
149
batch_transcribe.py
Normal file
149
batch_transcribe.py
Normal file
@@ -0,0 +1,149 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
ask-annie/batch_transcribe.py
|
||||
|
||||
Batch download + transcribe all sessions in ask-annie-videos-list.txt
|
||||
Skips any session that already has both transcript.json and transcript.txt cached.
|
||||
|
||||
Usage:
|
||||
DEEPGRAM_API_KEY=xxx python3 batch_transcribe.py [--out ./out] [--delay 45]
|
||||
|
||||
After this completes, git add/commit/push the out/ directory, then ping Clawd
|
||||
to generate chapters for each session.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import http.client
|
||||
import json
|
||||
import os
|
||||
import ssl
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def run(cmd, **kwargs):
    """Echo a command line, then execute it and hand back the result.

    Args:
        cmd: Sequence of program arguments. Each item is passed through
            ``str()`` for the echoed line only; ``subprocess.run`` receives
            the sequence unchanged.
        **kwargs: Forwarded verbatim to ``subprocess.run``.

    Returns:
        The ``subprocess.CompletedProcess`` returned by ``subprocess.run``.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status (``check=True`` is always passed).
    """
    rendered = " ".join(map(str, cmd))
    print(f" $ {rendered}", flush=True)
    return subprocess.run(cmd, check=True, **kwargs)
|
||||
|
||||
|
||||
def download_audio(url, out_dir):
    """Download the audio of *url* into *out_dir* via yt-dlp and return its path.

    yt-dlp is invoked with ``--extract-audio --audio-format mp3`` and the
    output template ``audio.%(ext)s`` inside *out_dir*.

    Args:
        url: Video URL handed to yt-dlp.
        out_dir: ``pathlib.Path`` of the directory that receives the file.

    Returns:
        Path of the downloaded audio file: ``audio.mp3`` when present,
        otherwise the alphabetically first ``audio.*`` file that is not a
        ``.part`` file.

    Raises:
        subprocess.CalledProcessError: If yt-dlp exits non-zero (raised by
            ``run``).
        FileNotFoundError: If no finished ``audio.*`` file exists afterwards.
    """
    output_template = out_dir / "audio.%(ext)s"
    run(["yt-dlp", "--extract-audio", "--audio-format", "mp3",
         "--audio-quality", "3", "-o", str(output_template), url])

    # MP3 was requested, so prefer that file over any other audio.* leftover.
    expected = out_dir / "audio.mp3"
    if expected.is_file():
        return expected

    # Fallback: glob order is arbitrary, so sort to make the choice repeatable.
    candidates = sorted(
        p for p in out_dir.glob("audio.*") if p.suffix != ".part"
    )
    if not candidates:
        raise FileNotFoundError("Audio download failed")
    return candidates[0]
|
||||
|
||||
|
||||
def transcribe_deepgram(audio_path, api_key):
    """Upload an audio file to Deepgram's ``/v1/listen`` endpoint and return the parsed JSON.

    The whole file is read into memory and sent in a single POST with the
    query string ``model=nova-3&language=en&punctuate=true&utterances=true&
    smart_format=true``.

    Args:
        audio_path: ``pathlib.Path`` of the audio file to upload.
        api_key: Deepgram API key, sent as ``Authorization: Token <key>``.

    Returns:
        The response body decoded with ``json.loads``.

    Raises:
        RuntimeError: If the HTTP status is not 200; the message carries the
            status and the first 200 characters of the response body.
        OSError: On file or network failures (including the 600 s timeout).
        ValueError: If a 200 response body is not valid JSON.
    """
    print(f" Sending {audio_path.stat().st_size/1024/1024:.1f}MB to Deepgram...", flush=True)
    with open(audio_path, "rb") as f:
        audio_bytes = f.read()

    params = "?model=nova-3&language=en&punctuate=true&utterances=true&smart_format=true"
    ctx = ssl.create_default_context()
    conn = http.client.HTTPSConnection("api.deepgram.com", timeout=600, context=ctx)
    try:
        conn.request("POST", f"/v1/listen{params}", body=audio_bytes,
                     headers={"Authorization": f"Token {api_key}", "Content-Type": "audio/mp3"})
        resp = conn.getresponse()
        status = resp.status
        resp_body = resp.read()
    finally:
        # Release the socket even when the request or the read fails.
        conn.close()

    print(f" HTTP {status}", flush=True)
    if status != 200:
        # Decode leniently so an undecodable error page cannot mask the HTTP error.
        excerpt = resp_body.decode("utf-8", errors="replace")[:200]
        raise RuntimeError(f"Deepgram error {status}: {excerpt}")
    return json.loads(resp_body)
|
||||
|
||||
|
||||
def _load_cached_transcript(transcript_path):
    """Return the parsed JSON object stored at *transcript_path*, or None if missing or unusable."""
    try:
        with open(transcript_path, encoding="utf-8") as f:
            cached = json.load(f)
    except (OSError, ValueError):
        # Missing, unreadable, or truncated/invalid JSON: treat as "no cache".
        return None
    return cached if isinstance(cached, dict) else None


def _render_transcript_text(video_id, url, raw):
    """Build the transcript.txt content; returns (text, duration, utterance_count)."""
    utterances = raw.get("results", {}).get("utterances", [])
    duration = raw.get("metadata", {}).get("duration", 0)
    lines = [
        f"# Transcript: {video_id}\n",
        f"# URL: {url}\n",
        f"# Duration: {duration:.0f}s ({duration/60:.1f} min)\n\n",
    ]
    for u in utterances:
        m, s = divmod(int(u["start"]), 60)
        lines.append(f"[{m}:{s:02d}] {u['transcript']}\n")
    return "".join(lines), duration, len(utterances)


def process_video(url, out_base, api_key):
    """Download, transcribe and store one video under ``out_base/<video_id>/``.

    The video id is the last path segment of *url* with any query string
    removed. Two files are produced: ``transcript.json`` (the raw Deepgram
    response) and ``transcript.txt`` (a header plus one ``[m:ss] text`` line
    per utterance).

    Work that is already on disk is reused:
      * both transcript files exist -> nothing is done;
      * only a readable ``transcript.json`` exists -> ``transcript.txt`` is
        regenerated from it without downloading or calling Deepgram;
      * an ``audio.*`` file (not ``.part``) exists -> it is transcribed
        without downloading again.

    Args:
        url: Video URL.
        out_base: ``pathlib.Path`` of the base output directory.
        api_key: Deepgram API key forwarded to ``transcribe_deepgram``.

    Returns:
        The tuple ``(True, video_id)``.

    Raises:
        Whatever ``download_audio`` / ``transcribe_deepgram`` raise, or
        ``KeyError`` if an utterance lacks ``start`` / ``transcript``.
    """
    # NOTE(review): a URL whose id lives in the query string (e.g.
    # ".../watch?v=ID") yields the same id for every video — confirm the list
    # only contains path-style URLs.
    video_id = url.rstrip("/").split("/")[-1].split("?")[0]
    out_dir = out_base / video_id
    out_dir.mkdir(parents=True, exist_ok=True)

    transcript_path = out_dir / "transcript.json"
    txt_path = out_dir / "transcript.txt"

    if transcript_path.exists() and txt_path.exists():
        print(f" [skip] {video_id} — transcript already exists")
        return True, video_id

    # Reuse an existing Deepgram response rather than paying for it again.
    raw = _load_cached_transcript(transcript_path)
    if raw is not None:
        print(f" [cache] transcript: {transcript_path.name}")
    else:
        # Download audio (sorted so the cached pick does not depend on glob order)
        audio_path = None
        for f in sorted(out_dir.glob("audio.*")):
            if f.suffix != ".part":
                print(f" [cache] audio: {f.name}")
                audio_path = f
                break
        if audio_path is None:
            print(" Downloading audio...")
            audio_path = download_audio(url, out_dir)

        # Transcribe
        raw = transcribe_deepgram(audio_path, api_key)
        with open(transcript_path, "w", encoding="utf-8") as f:
            json.dump(raw, f, indent=2)

    # Render fully in memory first so a malformed utterance cannot leave a
    # partial transcript.txt that a later run would treat as finished.
    text, duration, utterance_count = _render_transcript_text(video_id, url, raw)
    with open(txt_path, "w", encoding="utf-8") as f:
        f.write(text)

    print(f" ✅ {video_id} — {duration/60:.1f} min, {utterance_count} utterances")
    return True, video_id
|
||||
|
||||
|
||||
def main():
    """Command-line entry point: transcribe every URL listed in the videos file.

    Options:
        --videos            Path of the URL list (default: ask-annie-videos-list.txt).
                            Blank lines and lines whose first non-blank
                            character is ``#`` are ignored.
        --out               Base output directory (default: out).
        --delay             Seconds to wait between videos (default: 45).
        --deepgram-api-key  API key; defaults to the DEEPGRAM_API_KEY
                            environment variable.

    Each URL is passed to ``process_video``. A failure is reported and
    recorded but does not stop the batch. Exits with status 1 when no API key
    is available.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--videos", default="ask-annie-videos-list.txt")
    parser.add_argument("--out", default="out")
    parser.add_argument("--delay", type=int, default=45,
                        help="Seconds to wait between videos (default: 45)")
    parser.add_argument("--deepgram-api-key", default=os.environ.get("DEEPGRAM_API_KEY", ""))
    args = parser.parse_args()

    if not args.deepgram_api_key:
        print("ERROR: Set DEEPGRAM_API_KEY or pass --deepgram-api-key")
        sys.exit(1)

    with open(args.videos, encoding="utf-8") as f:
        # Strip before testing for '#', so indented comment lines are skipped too.
        stripped = (line.strip() for line in f)
        urls = [u for u in stripped if u and not u.startswith("#")]

    out_base = Path(args.out)
    print(f"=== Batch transcribe: {len(urls)} videos ===\n")

    done, failed = [], []
    for i, url in enumerate(urls):
        video_id = url.rstrip("/").split("/")[-1].split("?")[0]
        print(f"\n[{i+1}/{len(urls)}] {video_id}")
        print(f" URL: {url}")

        # Recorded before processing: a session with both files on disk needs
        # no network access, so there is nothing to pace afterwards.
        session_dir = out_base / video_id
        already_cached = all(
            (session_dir / name).exists()
            for name in ("transcript.json", "transcript.txt")
        )

        try:
            _, vid = process_video(url, out_base, args.deepgram_api_key)
        except Exception as e:
            # Top-level boundary: report, remember, and continue with the next video.
            print(f" ❌ FAILED: {e}")
            failed.append((video_id, str(e)))
        else:
            done.append(vid)

        # Delay between videos (skip after last, and after fully cached sessions)
        if i < len(urls) - 1 and not already_cached:
            print(f" Waiting {args.delay}s before next video...")
            time.sleep(args.delay)

    print(f"\n=== Done: {len(done)} ok, {len(failed)} failed ===")
    if failed:
        for vid, err in failed:
            print(f" ❌ {vid}: {err}")
    print("\nNext: git add out/ && git commit -m 'batch transcripts' && git push")
    print("Then ping Clawd to generate chapters for each session.")
||||
|
||||
|
||||
# Run the batch only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user