Files
ai-podcast/make_clips.py
tcpsyn b02616bc44 Add clip generator, hourly stats cron, and transcription improvements
- make_clips.py: Extract best moments from episodes as short-form clips
  (9:16 vertical MP4 with captions for TikTok/Shorts/Reels)
- deploy_stats_cron.sh: Deploy podcast_stats.py to NAS as Docker container
  running hourly with auto-restart
- podcast_stats.py: Add _find_ytdlp() for Docker compatibility, auto-detect
  local Docker for Castopod DB queries
- publish_episode.py: Upgrade Whisper model from base to large-v3

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 01:30:33 -07:00

1089 lines
40 KiB
Python
Executable File

#!/usr/bin/env python3
"""Extract the best short-form clips from a podcast episode.
Two-pass pipeline (default):
1. Fast Whisper model (base) transcribes full episode for clip identification
2. LLM selects best moments
3. Quality Whisper model (large-v3) re-transcribes only selected clips for precise timestamps
Usage:
python make_clips.py ~/Desktop/episode12.mp3 --count 3
python make_clips.py ~/Desktop/episode12.mp3 --transcript website/transcripts/episode-12-love-lies-and-loyalty.txt
python make_clips.py ~/Desktop/episode12.mp3 --fast-model small --quality-model large-v3
python make_clips.py ~/Desktop/episode12.mp3 --single-pass # skip two-pass, use quality model only
"""
# Standard library
import argparse
import json
import os
import re
import subprocess
import sys
import tempfile
from pathlib import Path

# Third-party
import requests
from dotenv import load_dotenv

# Load API keys/config from the .env file that sits next to this script.
load_dotenv(Path(__file__).parent / ".env")
# API key for the OpenRouter clip-selection LLM call (loaded from .env above).
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
# Two-pass defaults: a fast model finds candidate clips, a high-quality
# model re-transcribes only the chosen ranges for precise timestamps.
WHISPER_MODEL_FAST = "base"
WHISPER_MODEL_QUALITY = "large-v3"
# Episode cover art used to build the clip video background.
COVER_ART = Path(__file__).parent / "website" / "images" / "cover.png"
# Fonts (macOS paths — presumably installed on the render machine; TODO confirm)
FONT_BOLD = "/Library/Fonts/Montserrat-ExtraBold.ttf"
FONT_MEDIUM = "/Library/Fonts/Montserrat-Medium.ttf"
FONT_SEMIBOLD = "/Library/Fonts/Montserrat-SemiBold.ttf"
# Video dimensions (9:16 vertical — TikTok/Shorts/Reels)
WIDTH = 1080
HEIGHT = 1920
def _build_whisper_prompt(labeled_transcript: str) -> str:
"""Build an initial_prompt for Whisper from the labeled transcript.
Whisper's initial_prompt conditions the model to recognize specific names
and vocabulary. We extract speaker names and the first few lines of dialog.
"""
prompt_parts = ["Luke at the Roost podcast. Host: Luke."]
if labeled_transcript:
# Extract speaker names
names = set(re.findall(r'^([A-Z][A-Z\s\'-]+?):', labeled_transcript, re.MULTILINE))
caller_names = [n.strip().title() for n in names if n.strip() != "LUKE"]
if caller_names:
prompt_parts.append(f"Callers: {', '.join(caller_names)}.")
# First ~500 chars of transcript as context (stripped of labels)
stripped = re.sub(r'^[A-Z][A-Z\s\'-]+?:\s*', '', labeled_transcript[:800], flags=re.MULTILINE)
stripped = re.sub(r'\n+', ' ', stripped).strip()[:500]
if stripped:
prompt_parts.append(stripped)
return " ".join(prompt_parts)
def transcribe_with_timestamps(audio_path: str, whisper_model: str = None,
                               labeled_transcript: str = "") -> list[dict]:
    """Transcribe audio with word-level timestamps using faster-whisper.

    Results are cached in a JSON file next to the audio (keyed by model
    name) so repeat runs skip the expensive transcription pass entirely.

    Returns list of segments: [{start, end, text, words: [{word, start, end}]}]
    """
    model_name = whisper_model or WHISPER_MODEL_QUALITY
    # e.g. episode12.whisper_cache_base.json alongside the audio file
    cache_path = Path(audio_path).with_suffix(f".whisper_cache_{model_name}.json")
    if cache_path.exists():
        print(f" Using cached Whisper output ({model_name})")
        with open(cache_path) as f:
            return json.load(f)
    try:
        from faster_whisper import WhisperModel
    except ImportError:
        print("Error: faster-whisper not installed. Run: pip install faster-whisper")
        sys.exit(1)
    # Prime the decoder with show/caller names so proper nouns come out right.
    initial_prompt = _build_whisper_prompt(labeled_transcript)
    print(f" Model: {model_name}")
    if labeled_transcript:
        print(f" Prompt: {initial_prompt[:100]}...")
    model = WhisperModel(model_name, compute_type="float32")
    segments_iter, info = model.transcribe(
        audio_path,
        word_timestamps=True,
        initial_prompt=initial_prompt,
        language="en",
        beam_size=5,
        vad_filter=True,
    )
    # Materialize the lazy segment iterator into plain JSON-serializable dicts.
    segments = []
    for seg in segments_iter:
        words = []
        if seg.words:
            for w in seg.words:
                words.append({
                    "word": w.word.strip(),
                    "start": round(w.start, 3),
                    "end": round(w.end, 3),
                })
        segments.append({
            "start": round(seg.start, 3),
            "end": round(seg.end, 3),
            "text": seg.text.strip(),
            "words": words,
        })
    print(f" Transcribed {info.duration:.1f}s ({len(segments)} segments)")
    with open(cache_path, "w") as f:
        json.dump(segments, f)
    print(f" Cached to {cache_path}")
    return segments
def refine_clip_timestamps(audio_path: str, clips: list[dict],
                           quality_model: str, labeled_transcript: str = "",
                           ) -> dict[int, list[dict]]:
    """Re-transcribe just the selected clip ranges with a high-quality model.

    Extracts each clip segment (plus padding) to a temp WAV via ffmpeg, runs
    the quality model on it, and returns refined segments with timestamps
    mapped back to the original episode timeline. Per-clip results are
    cached as hidden JSON files next to the audio.

    Returns: {clip_index: [segments]} keyed by clip index
    """
    try:
        from faster_whisper import WhisperModel
    except ImportError:
        print("Error: faster-whisper not installed. Run: pip install faster-whisper")
        sys.exit(1)
    initial_prompt = _build_whisper_prompt(labeled_transcript)
    print(f" Refinement model: {quality_model}")
    model = None  # Lazy-load so we skip if all cached
    refined = {}
    with tempfile.TemporaryDirectory() as tmp:
        for i, clip in enumerate(clips):
            # Add padding around clip for context (Whisper does better with some lead-in)
            pad = 3.0
            seg_start = max(0, clip["start_time"] - pad)
            seg_end = clip["end_time"] + pad
            # Check cache first — keyed by source stem, clip index, and range.
            cache_key = f"{Path(audio_path).stem}_clip{i}_{seg_start:.1f}-{seg_end:.1f}"
            cache_path = Path(audio_path).parent / f".whisper_refine_{quality_model}_{cache_key}.json"
            if cache_path.exists():
                print(f" Clip {i+1}: Using cached refinement")
                with open(cache_path) as f:
                    refined[i] = json.load(f)
                continue
            # Extract clip segment to temp WAV (16 kHz mono, what Whisper wants)
            seg_path = os.path.join(tmp, f"segment_{i}.wav")
            cmd = [
                "ffmpeg", "-y", "-ss", str(seg_start), "-t", str(seg_end - seg_start),
                "-i", audio_path, "-ar", "16000", "-ac", "1", seg_path,
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode != 0:
                # Best-effort: an empty list marks this clip as unrefined.
                print(f" Clip {i+1}: Failed to extract segment")
                refined[i] = []
                continue
            # Lazy-load model on first non-cached clip
            if model is None:
                model = WhisperModel(quality_model, compute_type="float32")
            segments_iter, info = model.transcribe(
                seg_path,
                word_timestamps=True,
                initial_prompt=initial_prompt,
                language="en",
                beam_size=5,
                vad_filter=True,
            )
            # Collect segments and offset timestamps back to original timeline
            segments = []
            for seg in segments_iter:
                words = []
                if seg.words:
                    for w in seg.words:
                        words.append({
                            "word": w.word.strip(),
                            "start": round(w.start + seg_start, 3),
                            "end": round(w.end + seg_start, 3),
                        })
                segments.append({
                    "start": round(seg.start + seg_start, 3),
                    "end": round(seg.end + seg_start, 3),
                    "text": seg.text.strip(),
                    "words": words,
                })
            refined[i] = segments
            print(f" Clip {i+1}: Refined {info.duration:.1f}s → {len(segments)} segments")
            with open(cache_path, "w") as f:
                json.dump(segments, f)
    return refined
def get_transcript_text(segments: list[dict]) -> str:
    """Render segments as "[MM:SS] text" lines for the LLM prompt."""
    return "\n".join(
        f"[{int(seg['start'] // 60):02d}:{int(seg['start'] % 60):02d}] {seg['text']}"
        for seg in segments
    )
def select_clips_with_llm(transcript_text: str, labeled_transcript: str,
                          chapters_json: str | None, count: int) -> list[dict]:
    """Ask LLM to pick the best clip-worthy moments.

    Sends the timestamped transcript (plus optional chapters and
    speaker-labeled transcript context) to OpenRouter and parses the JSON
    array it returns. Exits the process on a missing API key, a non-200
    response, or an unparseable reply.

    Returns a list of clip dicts: {title, start_time, end_time, caption_text},
    with durations clamped into a roughly 30-75s range.
    """
    if not OPENROUTER_API_KEY:
        print("Error: OPENROUTER_API_KEY not set in .env")
        sys.exit(1)
    chapters_context = ""
    if chapters_json:
        chapters_context = f"\nCHAPTERS:\n{chapters_json}\n"
    labeled_context = ""
    if labeled_transcript:
        # Truncate if too long — LLM needs the gist, not every word
        if len(labeled_transcript) > 12000:
            labeled_context = f"\nSPEAKER-LABELED TRANSCRIPT (truncated):\n{labeled_transcript[:12000]}...\n"
        else:
            labeled_context = f"\nSPEAKER-LABELED TRANSCRIPT:\n{labeled_transcript}\n"
    prompt = f"""You are selecting the {count} best moments from a podcast episode for short-form video clips (TikTok/YouTube Shorts/Reels).
Each clip should be 30-60 seconds long and contain a single compelling moment — a funny exchange, an emotional beat, a surprising take, or an interesting story.
TIMESTAMPED TRANSCRIPT:
{transcript_text}
{chapters_context}{labeled_context}
Pick the {count} best moments. For each, return:
- title: A catchy, short title for the clip (max 8 words)
- start_time: Start timestamp in seconds (float). Start a few seconds before the key moment for context.
- end_time: End timestamp in seconds (float). 30-60 seconds after start_time.
- caption_text: The key quote or line that makes this moment clip-worthy (1-2 sentences max)
IMPORTANT:
- Use the timestamps from the transcript to set precise start/end times
- Ensure clips don't overlap
- Prefer moments with back-and-forth dialog over monologues
- Avoid intro/outro segments
Respond with ONLY a JSON array, no markdown or explanation:
[{{"title": "...", "start_time": 0.0, "end_time": 0.0, "caption_text": "..."}}]"""
    response = requests.post(
        "https://openrouter.ai/api/v1/chat/completions",
        headers={
            "Authorization": f"Bearer {OPENROUTER_API_KEY}",
            "Content-Type": "application/json",
        },
        json={
            "model": "anthropic/claude-3.5-sonnet",
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": 2048,
            "temperature": 0.3,
        },
        # Without a timeout, a stalled connection hangs the whole pipeline.
        timeout=120,
    )
    if response.status_code != 200:
        print(f"Error from OpenRouter: {response.text}")
        sys.exit(1)
    content = response.json()["choices"][0]["message"]["content"].strip()
    # Strip a ```json ... ``` fence if the model added one despite instructions.
    if content.startswith("```"):
        content = re.sub(r"^```(?:json)?\n?", "", content)
        content = re.sub(r"\n?```$", "", content)
    try:
        clips = json.loads(content)
    except json.JSONDecodeError as e:
        print(f"Error parsing LLM response: {e}")
        print(f"Response was: {content[:500]}")
        sys.exit(1)
    # Validate and clamp durations: too-short clips get a 30s floor,
    # too-long clips get cut back to 60s.
    validated = []
    for clip in clips:
        duration = clip["end_time"] - clip["start_time"]
        if duration < 15:
            clip["end_time"] = clip["start_time"] + 30
        elif duration > 75:
            clip["end_time"] = clip["start_time"] + 60
        validated.append(clip)
    return validated
def snap_to_sentences(clips: list[dict], segments: list[dict]) -> list[dict]:
    """Snap clip start/end times to sentence boundaries (mutates clips).

    Sentence boundaries come from Whisper segment edges plus any word ending
    in terminal punctuation (. ? !). Each clip start is pulled to the closest
    sentence start no more than 1s past it, and each end to the closest
    sentence end no earlier than 5s before it; afterwards the duration is
    nudged back into a sane range if snapping made it extreme.
    """
    starts: list[float] = []
    ends: list[float] = []
    for seg in segments:
        starts.append(seg["start"])
        ends.append(seg["end"])
        # Mid-segment breaks: a word ending in .?! closes a sentence and the
        # following word (if any) opens the next one.
        seg_words = seg.get("words", [])
        for idx, word in enumerate(seg_words):
            if word["word"].rstrip().endswith(('.', '?', '!')):
                ends.append(word["end"])
                if idx + 1 < len(seg_words):
                    starts.append(seg_words[idx + 1]["start"])
    starts.sort()
    ends.sort()
    for clip in clips:
        want_start = clip["start_time"]
        want_end = clip["end_time"]
        # Closest sentence start that is at most 1s after the requested start.
        start_candidates = [s for s in starts if s <= want_start + 1]
        new_start = (min(start_candidates, key=lambda s: abs(s - want_start))
                     if start_candidates else want_start)
        # Closest sentence end that is at most 5s before the requested end.
        end_candidates = [e for e in ends if e >= want_end - 5]
        new_end = (min(end_candidates, key=lambda e: abs(e - want_end))
                   if end_candidates else want_end)
        span = new_end - new_start
        if span < 20:
            # Too short after snapping — extend to the next boundary past +25s.
            for e in ends:
                if e > new_start + 25:
                    new_end = e
                    break
        elif span > 75:
            # Too long — pull the end back into the 30-65s band if possible.
            for e in reversed(ends):
                if new_start + 30 <= e <= new_start + 65:
                    new_end = e
                    break
        clip["start_time"] = new_start
        clip["end_time"] = new_end
    return clips
def get_words_in_range(segments: list[dict], start: float, end: float) -> list[dict]:
    """Collect word dicts whose timestamps fall inside [start, end] (±0.5s slack)."""
    collected = []
    for seg in segments:
        # Skip segments that lie entirely outside the window.
        if seg["end"] < start or seg["start"] > end:
            continue
        collected.extend(
            w for w in seg.get("words", [])
            if w["start"] >= start - 0.5 and w["end"] <= end + 0.5
        )
    return collected
def _words_similar(a: str, b: str, max_dist: int = 2) -> bool:
"""Check if two words are within edit distance max_dist (Levenshtein)."""
if abs(len(a) - len(b)) > max_dist:
return False
# Simple DP edit distance, bounded
prev = list(range(len(b) + 1))
for i in range(1, len(a) + 1):
curr = [i] + [0] * len(b)
for j in range(1, len(b) + 1):
cost = 0 if a[i - 1] == b[j - 1] else 1
curr[j] = min(curr[j - 1] + 1, prev[j] + 1, prev[j - 1] + cost)
prev = curr
return prev[len(b)] <= max_dist
def _find_labeled_section(labeled_transcript: str, range_text: str) -> str | None:
    """Find the section of labeled transcript matching a Whisper text range.

    Normalizes both texts (lowercase, punctuation removed, whitespace
    collapsed), then searches for progressively shorter phrases taken from
    several offsets of the Whisper text. On a hit, returns a generous slice
    of the *original* labeled transcript around the approximate match
    position; returns None when nothing matches.
    """
    # Strip speaker labels and punctuation from labeled transcript for matching
    labeled_stripped = re.sub(r'^[A-Z][A-Z\s\'-]+?:\s*', '', labeled_transcript, flags=re.MULTILINE)
    labeled_clean = re.sub(r'[^\w\s]', '', labeled_stripped.lower())
    labeled_clean = re.sub(r'\s+', ' ', labeled_clean)
    whisper_clean = re.sub(r'[^\w\s]', '', range_text.lower())
    whisper_clean = re.sub(r'\s+', ' ', whisper_clean)
    whisper_words_list = whisper_clean.split()
    # Try progressively shorter phrases from different positions
    for phrase_len in [10, 7, 5, 3]:
        for start_offset in [0, len(whisper_words_list) // 3, len(whisper_words_list) // 2]:
            words_slice = whisper_words_list[start_offset:start_offset + phrase_len]
            phrase = " ".join(words_slice)
            if len(phrase) < 8:
                # Too short to be a reliable anchor — skip.
                continue
            pos = labeled_clean.find(phrase)
            if pos != -1:
                # Map back to original transcript — find first word near this position.
                # NOTE(review): pos indexes the *cleaned* text, so the offsets below are
                # approximate; the generous ±400/+600 context window is what makes that
                # acceptable.
                match_pos = labeled_transcript.lower().find(
                    words_slice[0], max(0, pos - 300))
                if match_pos == -1:
                    match_pos = max(0, pos)
                else:
                    # Back up roughly start_offset words (~6 chars per word).
                    match_pos = max(0, match_pos - start_offset * 6)
                context_start = max(0, match_pos - 400)
                context_end = min(len(labeled_transcript), match_pos + len(range_text) + 600)
                return labeled_transcript[context_start:context_end]
    return None
def _parse_labeled_words(labeled_section: str) -> list[tuple[str, str, str]]:
"""Parse speaker-labeled text into (original_word, clean_lower, speaker) tuples."""
result = []
for m in re.finditer(r'^([A-Z][A-Z\s\'-]+?):\s*(.+?)(?=\n[A-Z][A-Z\s\'-]+?:|\n\n|\Z)',
labeled_section, re.MULTILINE | re.DOTALL):
speaker = m.group(1).strip()
text = m.group(2)
for w in text.split():
original = w.strip()
clean = re.sub(r"[^\w']", '', original.lower())
if clean:
result.append((original, clean, speaker))
return result
def add_speaker_labels(words: list[dict], labeled_transcript: str,
                       start_time: float, end_time: float,
                       segments: list[dict]) -> list[dict]:
    """Add speaker labels AND correct word text using labeled transcript.

    Uses Whisper only for timestamps. Takes text from the labeled transcript,
    which has correct names and spelling. Aligns using greedy forward matching
    with edit-distance fuzzy matching. Mutates the word dicts in place
    (adds "speaker", may replace "word") and returns the same list.
    """
    if not labeled_transcript or not words:
        return words
    # Get the raw Whisper text for this time range
    range_text = ""
    for seg in segments:
        if seg["end"] < start_time or seg["start"] > end_time:
            continue
        range_text += " " + seg["text"]
    range_text = range_text.strip()
    # Find matching section in labeled transcript
    labeled_section = _find_labeled_section(labeled_transcript, range_text)
    if not labeled_section:
        return words
    labeled_words_flat = _parse_labeled_words(labeled_section)
    if not labeled_words_flat:
        return words
    # Greedy forward alignment: for each Whisper word, find best match
    # in labeled words within a lookahead window
    labeled_idx = 0
    current_speaker = labeled_words_flat[0][2]
    corrections = 0
    for word_entry in words:
        whisper_clean = re.sub(r"[^\w']", '', word_entry["word"].lower())
        if not whisper_clean:
            # Pure punctuation — just tag with the current speaker.
            word_entry["speaker"] = current_speaker
            continue
        # Search forward for best match within a 12-word lookahead window.
        best_idx = None
        best_score = 0  # 2 = exact, 1 = fuzzy
        window = min(labeled_idx + 12, len(labeled_words_flat))
        for j in range(labeled_idx, window):
            labeled_clean = labeled_words_flat[j][1]
            if labeled_clean == whisper_clean:
                best_idx = j
                best_score = 2
                break
            # Only fuzzy-match words long enough for edit distance to mean much.
            if len(whisper_clean) >= 3 and len(labeled_clean) >= 3:
                if _words_similar(whisper_clean, labeled_clean):
                    if best_score < 1:
                        best_idx = j
                        best_score = 1
                    # Don't break — keep looking for exact match
        if best_idx is not None:
            original_word, _, speaker = labeled_words_flat[best_idx]
            current_speaker = speaker
            # Replace Whisper's word with correct version (punctuation trimmed,
            # apostrophes/hyphens kept).
            corrected = re.sub(r'[^\w\s\'-]', '', original_word)
            if corrected and corrected.lower() != whisper_clean:
                word_entry["word"] = corrected
                corrections += 1
            elif corrected:
                word_entry["word"] = corrected
            labeled_idx = best_idx + 1
        else:
            # No match — advance labeled pointer by 1 to stay roughly in sync
            if labeled_idx < len(labeled_words_flat):
                labeled_idx += 1
        word_entry["speaker"] = current_speaker
    if corrections:
        print(f" Corrected {corrections} words from labeled transcript")
    return words
def group_words_into_lines(words: list[dict], clip_start: float,
                           clip_duration: float) -> list[dict]:
    """Bucket words into timed caption lines for rendering.

    Lines break on punctuation (. ? ! ,) or at 6 words, but never with fewer
    than 3 words; a short trailing remainder merges into the previous line.
    Timestamps are re-based relative to the clip start and clamped to the
    clip duration.

    Returns list of: {start, end, speaker, words: [{word, ...}]}
    """
    if not words:
        return []
    grouped: list[list[dict]] = []
    pending: list[dict] = []
    for entry in words:
        pending.append(entry)
        ends_phrase = entry["word"].rstrip().endswith(('.', '?', '!', ','))
        if (len(pending) >= 6 or ends_phrase) and len(pending) >= 3:
            grouped.append(pending)
            pending = []
    if pending:
        # Merge a too-short remainder into the previous line when possible.
        if grouped and len(pending) < 3:
            grouped[-1].extend(pending)
        else:
            grouped.append(pending)
    result = []
    for group in grouped:
        rel_start = max(group[0]["start"] - clip_start, 0)
        rel_end = min(group[-1]["end"] - clip_start, clip_duration)
        if rel_end <= rel_start:
            continue  # line lies entirely outside the clip window
        result.append({
            "start": rel_start,
            "end": rel_end,
            "speaker": group[0].get("speaker", ""),
            "words": group,
        })
    return result
def extract_clip_audio(audio_path: str, start: float, end: float,
                       output_path: str) -> bool:
    """Cut [start, end] from the episode audio with a short fade in/out.

    Returns True when ffmpeg exits cleanly, False otherwise.
    """
    duration = end - start
    fade_in, fade_out = 0.3, 0.5
    # Fade in at the top, fade out ending exactly at the clip boundary.
    filters = f"afade=t=in:d={fade_in},afade=t=out:st={duration - fade_out}:d={fade_out}"
    command = [
        "ffmpeg", "-y",
        "-ss", str(start),
        "-t", str(duration),
        "-i", audio_path,
        "-af", filters,
        "-ab", "192k",
        output_path,
    ]
    completed = subprocess.run(command, capture_output=True, text=True)
    return completed.returncode == 0
def generate_background_image(episode_number: int, clip_title: str,
                              output_path: str) -> bool:
    """Generate 9:16 vertical background with blurred/cropped cover art.

    Layers: heavily blurred + darkened cover as the backdrop, the sharp
    cover centered in the upper portion, episode label + wrapped clip title
    at the top, and a site watermark at the bottom. Falls back to a solid
    dark background when the cover art is missing. Always returns True.
    """
    from PIL import Image, ImageDraw, ImageFilter, ImageFont
    if not COVER_ART.exists():
        print(f" Warning: Cover art not found at {COVER_ART}")
        # Create solid dark background fallback
        img = Image.new("RGB", (WIDTH, HEIGHT), (20, 15, 30))
        img.save(output_path)
        return True
    cover = Image.open(COVER_ART).convert("RGB")
    # Scale cover to fill 1080x1920 (crop to fit)
    cover_ratio = cover.width / cover.height
    target_ratio = WIDTH / HEIGHT
    if cover_ratio > target_ratio:
        new_h = HEIGHT
        new_w = int(HEIGHT * cover_ratio)
    else:
        new_w = WIDTH
        new_h = int(WIDTH / cover_ratio)
    cover = cover.resize((new_w, new_h), Image.LANCZOS)
    # Center crop
    left = (new_w - WIDTH) // 2
    top = (new_h - HEIGHT) // 2
    cover = cover.crop((left, top, left + WIDTH, top + HEIGHT))
    # Heavy blur + darken for background
    bg = cover.filter(ImageFilter.GaussianBlur(radius=30))
    from PIL import ImageEnhance
    bg = ImageEnhance.Brightness(bg).enhance(0.3)
    # Place sharp cover art centered, sized to ~60% width
    art_size = int(WIDTH * 0.6)
    art = Image.open(COVER_ART).convert("RGB")
    art = art.resize((art_size, art_size), Image.LANCZOS)
    # Add rounded shadow effect (just darken behind)
    art_x = (WIDTH - art_size) // 2
    art_y = int(HEIGHT * 0.18)
    bg.paste(art, (art_x, art_y))
    # Draw text overlays
    draw = ImageDraw.Draw(bg)
    try:
        font_ep = ImageFont.truetype(FONT_BOLD, 42)
        font_title = ImageFont.truetype(FONT_BOLD, 56)
        font_url = ImageFont.truetype(FONT_SEMIBOLD, 32)
    except OSError:
        # Montserrat missing — fall back to a font bundled with macOS.
        font_ep = ImageFont.truetype("/Library/Fonts/Arial Unicode.ttf", 42)
        font_title = ImageFont.truetype("/Library/Fonts/Arial Unicode.ttf", 56)
        font_url = ImageFont.truetype("/Library/Fonts/Arial Unicode.ttf", 32)
    margin = 60
    # Episode label at top
    ep_text = f"EPISODE {episode_number}" if episode_number else "LUKE AT THE ROOST"
    draw.text((margin, 80), ep_text, font=font_ep, fill=(255, 200, 80))
    # Clip title below episode label
    # Word wrap the title
    import textwrap
    wrapped_title = textwrap.fill(clip_title, width=22)
    draw.text((margin, 140), wrapped_title, font=font_title, fill=(255, 255, 255))
    # Watermark at bottom, centered horizontally
    url_text = "lukeattheroost.com"
    bbox = draw.textbbox((0, 0), url_text, font=font_url)
    url_w = bbox[2] - bbox[0]
    # NOTE(review): bg is RGB, so the alpha in this 4-tuple fill is presumably
    # ignored by Pillow — confirm against the ImageDraw docs.
    draw.text(((WIDTH - url_w) // 2, HEIGHT - 80), url_text,
              font=font_url, fill=(255, 200, 80, 200))
    bg.save(output_path, "PNG")
    return True
def generate_caption_frames(bg_path: str, caption_lines: list[dict],
                            clip_start: float, duration: float,
                            tmp_dir: Path, fps: int = 10) -> str:
    """Generate caption frame PNGs and a concat file for ffmpeg.

    Uses a low FPS (10) since the background is static — only captions change.
    Frames are deduplicated: when neither the active line nor the highlighted
    word changed since the previous frame, the concat file re-references the
    previous PNG instead of rendering a new one.

    Returns path to the concat file.
    """
    from PIL import Image, ImageDraw, ImageFont
    bg = Image.open(bg_path).convert("RGB")
    try:
        font_caption = ImageFont.truetype(FONT_BOLD, 52)
        font_speaker = ImageFont.truetype(FONT_SEMIBOLD, 40)
    except OSError:
        # Montserrat missing — fall back to a font bundled with macOS.
        font_caption = ImageFont.truetype("/Library/Fonts/Arial Unicode.ttf", 52)
        font_speaker = ImageFont.truetype("/Library/Fonts/Arial Unicode.ttf", 40)
    frames_dir = tmp_dir / "frames"
    frames_dir.mkdir(exist_ok=True)
    n_frames = int(duration * fps)
    frame_duration = 1.0 / fps
    concat_lines = []
    prev_state = None  # (line_idx, highlighted_word_idx) — only reuse when both match
    prev_frame_path = None
    for frame_num in range(n_frames):
        t = frame_num * frame_duration
        # Find active caption line
        active_idx = -1
        active_line = None
        for i, line in enumerate(caption_lines):
            if line["start"] <= t <= line["end"]:
                active_idx = i
                active_line = line
                break
        # Find which word is currently highlighted
        highlight_idx = -1
        if active_line:
            for wi, w in enumerate(active_line["words"]):
                # Word times are absolute; captions run on clip-relative time.
                word_rel_start = w["start"] - clip_start
                word_rel_end = w["end"] - clip_start
                if word_rel_start <= t <= word_rel_end:
                    highlight_idx = wi
                    break
            if highlight_idx == -1:
                # Between words — highlight the last spoken word
                for wi in range(len(active_line["words"]) - 1, -1, -1):
                    if t > active_line["words"][wi]["end"] - clip_start:
                        highlight_idx = wi
                        break
        # Reuse previous frame only if same line AND same highlighted word
        state = (active_idx, highlight_idx)
        if state == prev_state and prev_frame_path:
            concat_lines.append(f"file '{prev_frame_path}'")
            concat_lines.append(f"duration {frame_duration:.4f}")
            continue
        frame = bg.copy()
        if active_line:
            draw = ImageDraw.Draw(frame)
            margin = 60
            caption_y = int(HEIGHT * 0.78)
            # Speaker label (black 1px outline, then gold text on top)
            if active_line.get("speaker"):
                for dx, dy in [(-1, -1), (-1, 1), (1, -1), (1, 1)]:
                    draw.text((margin + dx, caption_y - 55 + dy),
                              active_line["speaker"],
                              font=font_speaker, fill=(0, 0, 0))
                draw.text((margin, caption_y - 55), active_line["speaker"],
                          font=font_speaker, fill=(255, 200, 80))
            # Caption text — all words visible, current word highlighted yellow
            x = margin
            y = caption_y
            for wi, w in enumerate(active_line["words"]):
                word_text = w["word"] + " "
                if wi == highlight_idx:
                    color = (255, 200, 80)  # Yellow — currently spoken
                elif wi < highlight_idx or (highlight_idx == -1 and t > w["end"] - clip_start):
                    color = (255, 255, 255)  # White — already spoken
                else:
                    color = (180, 180, 180)  # Gray — upcoming
                # Measure with the trailing space so word spacing is consistent.
                bbox = draw.textbbox((0, 0), word_text, font=font_caption)
                w_width = bbox[2] - bbox[0]
                # Wrap line
                if x + w_width > WIDTH - margin:
                    x = margin
                    y += 65
                # Outline (2px black, four diagonal offsets)
                for dx, dy in [(-2, -2), (-2, 2), (2, -2), (2, 2)]:
                    draw.text((x + dx, y + dy), w["word"],
                              font=font_caption, fill=(0, 0, 0))
                draw.text((x, y), w["word"], font=font_caption, fill=color)
                x += w_width
        frame_path = str(frames_dir / f"frame_{frame_num:05d}.png")
        frame.save(frame_path, "PNG")
        concat_lines.append(f"file '{frame_path}'")
        concat_lines.append(f"duration {frame_duration:.4f}")
        prev_state = state
        prev_frame_path = frame_path
    # Final frame needs duration too
    if prev_frame_path:
        concat_lines.append(f"file '{prev_frame_path}'")
        concat_lines.append(f"duration {frame_duration:.4f}")
    concat_path = str(tmp_dir / "concat.txt")
    with open(concat_path, "w") as f:
        f.write("\n".join(concat_lines))
    return concat_path
def generate_clip_video(audio_path: str, background_path: str,
                        caption_lines: list[dict], clip_start: float,
                        output_path: str, duration: float,
                        tmp_dir: Path) -> bool:
    """Generate clip video with burned-in captions using Pillow + ffmpeg.

    With caption lines, renders per-frame caption PNGs and feeds them to
    ffmpeg via the concat demuxer; without captions, loops the static
    background image. Returns True when ffmpeg succeeds.
    """
    if caption_lines:
        concat_path = generate_caption_frames(
            background_path, caption_lines, clip_start, duration, tmp_dir
        )
        video_input = ["-f", "concat", "-safe", "0", "-i", concat_path]
    else:
        # No captions — just static image + audio
        video_input = ["-loop", "1", "-i", background_path]
    cmd = [
        "ffmpeg", "-y",
        *video_input,
        "-i", audio_path,
        "-c:v", "libx264", "-preset", "medium", "-crf", "23",
        "-pix_fmt", "yuv420p",
        "-c:a", "aac", "-b:a", "192k",
        "-t", str(duration),
        "-shortest",
        "-r", "30",
        output_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        print(f" ffmpeg error: {result.stderr[-300:]}")
        return False
    return True
def slugify(text: str) -> str:
    """Lowercase, collapse non-alphanumeric runs to '-', trim, cap at 50 chars."""
    hyphenated = re.sub(r'[^a-z0-9]+', '-', text.lower())
    return hyphenated.strip('-')[:50]
def detect_episode_number(audio_path: str) -> int | None:
"""Try to detect episode number from filename."""
name = Path(audio_path).stem
m = re.search(r'(?:episode|ep|podcast)[-_]?(\d+)', name, re.IGNORECASE)
if m:
return int(m.group(1))
m = re.search(r'(\d+)', name)
if m:
return int(m.group(1))
return None
def main():
    """CLI entry point: transcribe, pick, refine, cut, and render clips."""
    parser = argparse.ArgumentParser(description="Extract short-form clips from podcast episodes")
    parser.add_argument("audio_file", help="Path to episode MP3")
    parser.add_argument("--transcript", "-t", help="Path to labeled transcript (.txt)")
    parser.add_argument("--chapters", "-c", help="Path to chapters JSON")
    parser.add_argument("--count", "-n", type=int, default=3, help="Number of clips to extract (default: 3)")
    parser.add_argument("--episode-number", "-e", type=int, help="Episode number (auto-detected from filename)")
    parser.add_argument("--output-dir", "-o", help="Output directory (default: clips/episode-N/)")
    parser.add_argument("--audio-only", action="store_true", help="Only extract audio clips, skip video")
    parser.add_argument("--fast-model", default=WHISPER_MODEL_FAST,
                        help=f"Fast Whisper model for clip identification (default: {WHISPER_MODEL_FAST})")
    parser.add_argument("--quality-model", default=WHISPER_MODEL_QUALITY,
                        help=f"Quality Whisper model for clip refinement (default: {WHISPER_MODEL_QUALITY})")
    parser.add_argument("--single-pass", action="store_true",
                        help="Use quality model for everything (slower, no two-pass)")
    args = parser.parse_args()
    audio_path = Path(args.audio_file).expanduser().resolve()
    if not audio_path.exists():
        print(f"Error: Audio file not found: {audio_path}")
        sys.exit(1)
    # Detect episode number
    episode_number = args.episode_number or detect_episode_number(str(audio_path))
    # Resolve output directory
    if args.output_dir:
        output_dir = Path(args.output_dir)
    elif episode_number:
        output_dir = Path(__file__).parent / "clips" / f"episode-{episode_number}"
    else:
        output_dir = Path(__file__).parent / "clips" / audio_path.stem
    output_dir.mkdir(parents=True, exist_ok=True)
    print(f"Clip extraction: {audio_path.name}")
    if episode_number:
        print(f"Episode: {episode_number}")
    print(f"Output: {output_dir}")
    print(f"Clips requested: {args.count}")
    # Step 1: Load labeled transcript (needed to prime Whisper with names)
    print(f"\n[1] Loading labeled transcript...")
    labeled_transcript = ""
    if args.transcript:
        transcript_path = Path(args.transcript).expanduser().resolve()
        if transcript_path.exists():
            labeled_transcript = transcript_path.read_text()
            print(f" Loaded: {transcript_path.name} ({len(labeled_transcript)} chars)")
        else:
            print(f" Warning: Transcript not found: {transcript_path}")
    else:
        # Auto-detect from website/transcripts/
        transcripts_dir = Path(__file__).parent / "website" / "transcripts"
        if episode_number and transcripts_dir.exists():
            for f in transcripts_dir.iterdir():
                if f.suffix == ".txt" and f"episode-{episode_number}" in f.name:
                    labeled_transcript = f.read_text()
                    print(f" Auto-detected: {f.name}")
                    break
    if not labeled_transcript:
        print(" No labeled transcript found (names may be inaccurate)")
    # Step 2: transcription for clip identification. Two-pass only makes sense
    # when the fast and quality models actually differ.
    two_pass = not args.single_pass and args.fast_model != args.quality_model
    if two_pass:
        print(f"\n[2/6] Fast transcription for clip identification ({args.fast_model})...")
    else:
        print(f"\n[2/5] Transcribing with word-level timestamps ({args.quality_model})...")
    identify_model = args.fast_model if two_pass else args.quality_model
    segments = transcribe_with_timestamps(
        str(audio_path), identify_model, labeled_transcript
    )
    # Build timestamped transcript for LLM
    transcript_text = get_transcript_text(segments)
    # Load chapters if provided
    chapters_json = None
    if args.chapters:
        chapters_path = Path(args.chapters).expanduser().resolve()
        if chapters_path.exists():
            with open(chapters_path) as f:
                chapters_json = f.read()
            print(f" Chapters loaded: {chapters_path.name}")
    # Step 3: LLM selects best moments
    step_total = 6 if two_pass else 5
    print(f"\n[3/{step_total}] Selecting {args.count} best moments with LLM...")
    clips = select_clips_with_llm(transcript_text, labeled_transcript,
                                  chapters_json, args.count)
    # Snap to sentence boundaries so clips don't start/end mid-sentence
    clips = snap_to_sentences(clips, segments)
    for i, clip in enumerate(clips):
        duration = clip["end_time"] - clip["start_time"]
        print(f" Clip {i+1}: \"{clip['title']}\" "
              f"({clip['start_time']:.1f}s - {clip['end_time']:.1f}s, {duration:.0f}s)")
        print(f" \"{clip['caption_text']}\"")
    # Step 4: Refine clip timestamps with quality model (two-pass only)
    refined = {}
    if two_pass:
        print(f"\n[4/{step_total}] Refining clips with {args.quality_model}...")
        refined = refine_clip_timestamps(
            str(audio_path), clips, args.quality_model, labeled_transcript
        )
        # Re-snap to sentence boundaries using refined segments
        for i, clip in enumerate(clips):
            if i in refined and refined[i]:
                clip_segments = refined[i]
                clips[i:i+1] = snap_to_sentences([clip], clip_segments)
    # Step N: Extract audio clips
    extract_step = 5 if two_pass else 4
    print(f"\n[{extract_step}/{step_total}] Extracting audio clips...")
    for i, clip in enumerate(clips):
        slug = slugify(clip["title"])
        mp3_path = output_dir / f"clip-{i+1}-{slug}.mp3"
        if extract_clip_audio(str(audio_path), clip["start_time"], clip["end_time"],
                              str(mp3_path)):
            print(f" Clip {i+1} audio: {mp3_path.name}")
        else:
            print(f" Error extracting clip {i+1} audio")
    video_step = 6 if two_pass else 5
    if args.audio_only:
        print(f"\n[{video_step}/{step_total}] Skipped video generation (--audio-only)")
        print(f"\nDone! {len(clips)} audio clips saved to {output_dir}")
        return
    # Step N: Generate video clips
    print(f"\n[{video_step}/{step_total}] Generating video clips...")
    with tempfile.TemporaryDirectory() as tmp:
        tmp_dir = Path(tmp)
        for i, clip in enumerate(clips):
            slug = slugify(clip["title"])
            mp3_path = output_dir / f"clip-{i+1}-{slug}.mp3"
            mp4_path = output_dir / f"clip-{i+1}-{slug}.mp4"
            duration = clip["end_time"] - clip["start_time"]
            print(f" Clip {i+1}: Generating video...")
            # Generate background image
            bg_path = str(tmp_dir / f"bg_{i}.png")
            generate_background_image(episode_number, clip["title"], bg_path)
            # Get word timestamps — use refined segments if available
            word_source = refined[i] if (two_pass and i in refined and refined[i]) else segments
            clip_words = get_words_in_range(word_source, clip["start_time"], clip["end_time"])
            # Add speaker labels
            clip_words = add_speaker_labels(clip_words, labeled_transcript,
                                            clip["start_time"], clip["end_time"],
                                            word_source)
            # Group words into timed caption lines
            caption_lines = group_words_into_lines(
                clip_words, clip["start_time"], duration
            )
            # Use a per-clip temp dir for frames
            clip_tmp = tmp_dir / f"clip_{i}"
            clip_tmp.mkdir(exist_ok=True)
            # Composite video
            if generate_clip_video(str(mp3_path), bg_path, caption_lines,
                                   clip["start_time"], str(mp4_path),
                                   duration, clip_tmp):
                file_size = mp4_path.stat().st_size / (1024 * 1024)
                print(f" Clip {i+1} video: {mp4_path.name} ({file_size:.1f} MB)")
            else:
                print(f" Error generating clip {i+1} video")
    # Summary
    print(f"\nDone! {len(clips)} clips saved to {output_dir}")
    for i, clip in enumerate(clips):
        slug = slugify(clip["title"])
        mp4 = output_dir / f"clip-{i+1}-{slug}.mp4"
        mp3 = output_dir / f"clip-{i+1}-{slug}.mp3"
        print(f" {i+1}. \"{clip['title']}\"")
        if mp4.exists():
            print(f" Video: {mp4}")
        if mp3.exists():
            print(f" Audio: {mp3}")


if __name__ == "__main__":
    main()