Files
ai-podcast/make_clips.py
tcpsyn 0c2201fab5 Fix Remotion render error reporting and harden clip timestamps
- Show full stderr (head + tail) instead of truncating to last 500 chars
- Add --timeout=60000 and --log=verbose to Remotion render command
- Clamp word timestamps to [0, duration] to prevent negative/OOB values

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-12 19:22:03 -06:00

1636 lines
60 KiB
Python
Executable File

#!/usr/bin/env python3
"""Extract the best short-form clips from a podcast episode.
Two-pass pipeline (default):
1. Fast Whisper model (base) transcribes full episode for clip identification
2. LLM selects best moments
3. Quality Whisper model (large-v3) re-transcribes only selected clips for precise timestamps
Usage:
python make_clips.py ~/Desktop/episode12.mp3 --count 3
python make_clips.py ~/Desktop/episode12.mp3 --transcript website/transcripts/episode-12-love-lies-and-loyalty.txt
python make_clips.py ~/Desktop/episode12.mp3 --fast-model small --quality-model large-v3
python make_clips.py ~/Desktop/episode12.mp3 --single-pass # skip two-pass, use quality model only
"""
import argparse
import json
import os
import re
import subprocess
import sys
import tempfile
import xml.etree.ElementTree as ET
from pathlib import Path
import requests
from dotenv import load_dotenv
load_dotenv(Path(__file__).parent / ".env")
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
RSS_FEED_URL = "https://podcast.macneilmediagroup.com/@LukeAtTheRoost/feed.xml"
EPISODE_CACHE_DIR = Path(__file__).parent / "clips" / ".episode-cache"
WHISPER_MODEL_FAST = "distil-large-v3"
WHISPER_MODEL_QUALITY = "distil-large-v3"
COVER_ART = Path(__file__).parent / "website" / "images" / "cover.png"
REMOTION_DIR = Path(__file__).parent / "remotion-demo"
# Fonts
FONT_BOLD = "/Library/Fonts/Montserrat-ExtraBold.ttf"
FONT_MEDIUM = "/Library/Fonts/Montserrat-Medium.ttf"
FONT_SEMIBOLD = "/Library/Fonts/Montserrat-SemiBold.ttf"
# Video dimensions (9:16 vertical)
WIDTH = 1080
HEIGHT = 1920
def _build_whisper_prompt(labeled_transcript: str) -> str:
"""Build an initial_prompt for Whisper from the labeled transcript.
Whisper's initial_prompt conditions the model to recognize specific names
and vocabulary. We extract speaker names and the first few lines of dialog.
"""
prompt_parts = ["Luke at the Roost podcast. Host: Luke."]
if labeled_transcript:
# Extract speaker names
names = set(re.findall(r'^([A-Z][A-Z\s\'-]+?):', labeled_transcript, re.MULTILINE))
caller_names = [n.strip().title() for n in names if n.strip() != "LUKE"]
if caller_names:
prompt_parts.append(f"Callers: {', '.join(caller_names)}.")
# First ~500 chars of transcript as context (stripped of labels)
stripped = re.sub(r'^[A-Z][A-Z\s\'-]+?:\s*', '', labeled_transcript[:800], flags=re.MULTILINE)
stripped = re.sub(r'\n+', ' ', stripped).strip()[:500]
if stripped:
prompt_parts.append(stripped)
return " ".join(prompt_parts)
def transcribe_with_timestamps(audio_path: str, whisper_model: str = None,
labeled_transcript: str = "") -> list[dict]:
"""Transcribe audio with word-level timestamps using mlx-whisper (Apple Silicon GPU).
Returns list of segments: [{start, end, text, words: [{word, start, end}]}]
"""
model_name = whisper_model or WHISPER_MODEL_QUALITY
cache_path = Path(audio_path).with_suffix(f".whisper_cache_{model_name}.json")
if cache_path.exists():
print(f" Using cached Whisper output ({model_name})")
with open(cache_path) as f:
return json.load(f)
try:
import mlx_whisper
except ImportError:
print("Error: mlx-whisper not installed. Run: pip install mlx-whisper")
sys.exit(1)
MODEL_HF_REPOS = {
"distil-large-v3": "mlx-community/distil-whisper-large-v3",
"large-v3": "mlx-community/whisper-large-v3-mlx",
"medium": "mlx-community/whisper-medium-mlx",
"small": "mlx-community/whisper-small-mlx",
"base": "mlx-community/whisper-base-mlx",
}
hf_repo = MODEL_HF_REPOS.get(model_name, f"mlx-community/whisper-{model_name}-mlx")
initial_prompt = _build_whisper_prompt(labeled_transcript)
print(f" Model: {model_name} (MLX GPU)")
if labeled_transcript:
print(f" Prompt: {initial_prompt[:100]}...")
result = mlx_whisper.transcribe(
audio_path,
path_or_hf_repo=hf_repo,
language="en",
word_timestamps=True,
initial_prompt=initial_prompt,
)
segments = []
for seg in result.get("segments", []):
words = []
for w in seg.get("words", []):
words.append({
"word": w["word"].strip(),
"start": round(w["start"], 3),
"end": round(w["end"], 3),
})
segments.append({
"start": round(seg["start"], 3),
"end": round(seg["end"], 3),
"text": seg["text"].strip(),
"words": words,
})
duration = segments[-1]["end"] if segments else 0
print(f" Transcribed {duration:.1f}s ({len(segments)} segments)")
with open(cache_path, "w") as f:
json.dump(segments, f)
print(f" Cached to {cache_path}")
return segments
def refine_clip_timestamps(audio_path: str, clips: list[dict],
quality_model: str, labeled_transcript: str = "",
) -> dict[int, list[dict]]:
"""Re-transcribe just the selected clip ranges with mlx-whisper (GPU).
Extracts each clip segment, runs the quality model on it, and returns
refined segments with word-level timestamps mapped back to the original timeline.
Returns: {clip_index: [segments]} keyed by clip index
"""
try:
import mlx_whisper
except ImportError:
print("Error: mlx-whisper not installed. Run: pip install mlx-whisper")
sys.exit(1)
MODEL_HF_REPOS = {
"distil-large-v3": "mlx-community/distil-whisper-large-v3",
"large-v3": "mlx-community/whisper-large-v3-mlx",
"medium": "mlx-community/whisper-medium-mlx",
"small": "mlx-community/whisper-small-mlx",
"base": "mlx-community/whisper-base-mlx",
}
hf_repo = MODEL_HF_REPOS.get(quality_model, f"mlx-community/whisper-{quality_model}-mlx")
print(f" Refinement model: {quality_model} (MLX GPU)")
initial_prompt = _build_whisper_prompt(labeled_transcript)
refined = {}
with tempfile.TemporaryDirectory() as tmp:
for i, clip in enumerate(clips):
pad = 3.0
seg_start = max(0, clip["start_time"] - pad)
seg_end = clip["end_time"] + pad
cache_key = f"{Path(audio_path).stem}_clip{i}_{seg_start:.1f}-{seg_end:.1f}"
cache_path = Path(audio_path).parent / f".whisper_refine_{quality_model}_{cache_key}.json"
if cache_path.exists():
print(f" Clip {i+1}: Using cached refinement")
with open(cache_path) as f:
refined[i] = json.load(f)
continue
seg_path = os.path.join(tmp, f"segment_{i}.wav")
cmd = [
"ffmpeg", "-y", "-ss", str(seg_start), "-t", str(seg_end - seg_start),
"-i", audio_path, "-ar", "16000", "-ac", "1", seg_path,
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f" Clip {i+1}: Failed to extract segment")
refined[i] = []
continue
mlx_result = mlx_whisper.transcribe(
seg_path,
path_or_hf_repo=hf_repo,
language="en",
word_timestamps=True,
initial_prompt=initial_prompt,
)
segments = []
for seg_data in mlx_result.get("segments", []):
text = seg_data["text"].strip()
words = []
for w in seg_data.get("words", []):
words.append({
"word": w["word"].strip(),
"start": round(w["start"] + seg_start, 3),
"end": round(w["end"] + seg_start, 3),
})
segments.append({
"start": round(seg_data["start"] + seg_start, 3),
"end": round(seg_data["end"] + seg_start, 3),
"text": text,
"words": words,
})
refined[i] = segments
seg_duration = segments[-1]["end"] - segments[0]["start"] if segments else 0
print(f" Clip {i+1}: Refined {seg_duration:.1f}s → {len(segments)} segments")
with open(cache_path, "w") as f:
json.dump(segments, f)
return refined
def get_transcript_text(segments: list[dict]) -> str:
"""Build timestamped transcript text for the LLM."""
lines = []
for seg in segments:
mins = int(seg["start"] // 60)
secs = int(seg["start"] % 60)
lines.append(f"[{mins:02d}:{secs:02d}] {seg['text']}")
return "\n".join(lines)
def select_clips_with_llm(transcript_text: str, labeled_transcript: str,
chapters_json: str | None, count: int) -> list[dict]:
"""Ask LLM to pick the best clip-worthy moments."""
if not OPENROUTER_API_KEY:
print("Error: OPENROUTER_API_KEY not set in .env")
sys.exit(1)
chapters_context = ""
if chapters_json:
chapters_context = f"\nCHAPTERS:\n{chapters_json}\n"
labeled_context = ""
if labeled_transcript:
# Truncate if too long — LLM needs the gist, not every word
if len(labeled_transcript) > 12000:
labeled_context = f"\nSPEAKER-LABELED TRANSCRIPT (truncated):\n{labeled_transcript[:12000]}...\n"
else:
labeled_context = f"\nSPEAKER-LABELED TRANSCRIPT:\n{labeled_transcript}\n"
prompt = f"""You are selecting the {count} best moments from a podcast episode for short-form video clips (TikTok/YouTube Shorts/Reels).
Each clip should be 30-60 seconds long and contain a single compelling moment — a funny exchange, an emotional beat, a surprising take, or an interesting story.
TIMESTAMPED TRANSCRIPT:
{transcript_text}
{chapters_context}{labeled_context}
Pick the {count} best moments. For each, return:
- title: A catchy, short title for the clip (max 8 words)
- start_time: Start timestamp in seconds (float). Start a few seconds before the key moment for context.
- end_time: End timestamp in seconds (float). 30-60 seconds after start_time.
- caption_text: The key quote or line that makes this moment clip-worthy (1-2 sentences max)
IMPORTANT:
- Use the timestamps from the transcript to set precise start/end times
- Ensure clips don't overlap
- Prefer moments with back-and-forth dialog over monologues
- Avoid intro/outro segments
Respond with ONLY a JSON array, no markdown or explanation:
[{{"title": "...", "start_time": 0.0, "end_time": 0.0, "caption_text": "..."}}]"""
response = requests.post(
"https://openrouter.ai/api/v1/chat/completions",
headers={
"Authorization": f"Bearer {OPENROUTER_API_KEY}",
"Content-Type": "application/json",
},
json={
"model": "anthropic/claude-sonnet-4-5",
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 2048,
"temperature": 0.3,
},
)
if response.status_code != 200:
print(f"Error from OpenRouter: {response.text}")
sys.exit(1)
content = response.json()["choices"][0]["message"]["content"].strip()
if content.startswith("```"):
content = re.sub(r"^```(?:json)?\n?", "", content)
content = re.sub(r"\n?```$", "", content)
try:
clips = json.loads(content)
except json.JSONDecodeError as e:
print(f"Error parsing LLM response: {e}")
print(f"Response was: {content[:500]}")
sys.exit(1)
# Validate and clamp durations
validated = []
for clip in clips:
duration = clip["end_time"] - clip["start_time"]
if duration < 15:
clip["end_time"] = clip["start_time"] + 30
elif duration > 75:
clip["end_time"] = clip["start_time"] + 60
validated.append(clip)
return validated
def generate_social_metadata(clips: list[dict], labeled_transcript: str,
episode_number: int | None) -> list[dict]:
"""Generate social media descriptions and hashtags for each clip."""
if not OPENROUTER_API_KEY:
print("Error: OPENROUTER_API_KEY not set in .env")
sys.exit(1)
clips_summary = "\n".join(
f'{i+1}. "{c["title"]}"{c["caption_text"]}'
for i, c in enumerate(clips)
)
episode_context = f"This is Episode {episode_number} of " if episode_number else "This is an episode of "
prompt = f"""{episode_context}the "Luke at the Roost" podcast — a late-night call-in show where AI-generated callers share stories, confessions, and hot takes with host Luke.
Here are {len(clips)} clips selected from this episode:
{clips_summary}
For each clip, generate:
1. description: A short, engaging description for social media (1-2 sentences, hook the viewer, conversational tone). Do NOT include hashtags in the description.
2. hashtags: An array of 5-8 hashtags. Always include #lukeattheroost and #podcast. Add topic-relevant and trending-style tags.
Respond with ONLY a JSON array matching the clip order:
[{{"description": "...", "hashtags": ["#tag1", "#tag2", ...]}}]"""
response = requests.post(
"https://openrouter.ai/api/v1/chat/completions",
headers={
"Authorization": f"Bearer {OPENROUTER_API_KEY}",
"Content-Type": "application/json",
},
json={
"model": "anthropic/claude-sonnet-4-5",
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 2048,
"temperature": 0.7,
},
)
if response.status_code != 200:
print(f"Error from OpenRouter: {response.text}")
return clips
content = response.json()["choices"][0]["message"]["content"].strip()
if content.startswith("```"):
content = re.sub(r"^```(?:json)?\n?", "", content)
content = re.sub(r"\n?```$", "", content)
try:
metadata = json.loads(content)
except json.JSONDecodeError as e:
print(f"Error parsing social metadata: {e}")
return clips
for i, clip in enumerate(clips):
if i < len(metadata):
clip["description"] = metadata[i].get("description", "")
clip["hashtags"] = metadata[i].get("hashtags", [])
return clips
def snap_to_sentences(clips: list[dict], segments: list[dict]) -> list[dict]:
"""Snap clip start/end times to sentence boundaries.
Uses Whisper segment boundaries and punctuation to find the nearest
sentence start/end so clips don't begin or end mid-sentence.
"""
# Build a list of sentence boundary timestamps from Whisper segments.
# A sentence boundary is: the start of a segment, or a word right after .?!
sentence_starts = []
sentence_ends = []
for seg in segments:
sentence_starts.append(seg["start"])
sentence_ends.append(seg["end"])
# Also find sentence breaks within segments using word punctuation
words = seg.get("words", [])
for i, w in enumerate(words):
if w["word"].rstrip().endswith(('.', '?', '!')):
sentence_ends.append(w["end"])
if i + 1 < len(words):
sentence_starts.append(words[i + 1]["start"])
sentence_starts.sort()
sentence_ends.sort()
for clip in clips:
original_start = clip["start_time"]
original_end = clip["end_time"]
# Find nearest sentence start at or before the clip start
# Look up to 5s back for a sentence boundary
best_start = original_start
best_start_dist = float('inf')
for s in sentence_starts:
dist = abs(s - original_start)
if dist < best_start_dist and s <= original_start + 1:
best_start = s
best_start_dist = dist
if s > original_start + 1:
break
# Find nearest sentence end at or after the clip end
# Look up to 5s forward for a sentence boundary
best_end = original_end
best_end_dist = float('inf')
for e in sentence_ends:
if e < original_end - 5:
continue
dist = abs(e - original_end)
if dist < best_end_dist:
best_end = e
best_end_dist = dist
if e > original_end + 5:
break
# Make sure we didn't create a clip that's too short or too long
duration = best_end - best_start
if duration < 20:
# Too short — extend end to next sentence boundary
for e in sentence_ends:
if e > best_start + 25:
best_end = e
break
elif duration > 75:
# Too long — pull end back
for e in reversed(sentence_ends):
if best_start + 30 <= e <= best_start + 65:
best_end = e
break
clip["start_time"] = best_start
clip["end_time"] = best_end
return clips
def get_words_in_range(segments: list[dict], start: float, end: float) -> list[dict]:
"""Extract word-level timestamps for a time range from Whisper segments."""
words = []
for seg in segments:
if seg["end"] < start or seg["start"] > end:
continue
for w in seg.get("words", []):
if w["start"] >= start - 0.5 and w["end"] <= end + 0.5:
words.append(w)
return words
def _edit_distance(a: str, b: str) -> int:
"""Levenshtein edit distance between two strings."""
if abs(len(a) - len(b)) > 5:
return max(len(a), len(b))
prev = list(range(len(b) + 1))
for i in range(1, len(a) + 1):
curr = [i] + [0] * len(b)
for j in range(1, len(b) + 1):
cost = 0 if a[i - 1] == b[j - 1] else 1
curr[j] = min(curr[j - 1] + 1, prev[j] + 1, prev[j - 1] + cost)
prev = curr
return prev[len(b)]
def _word_score(a: str, b: str) -> int:
"""Alignment score: +2 exact, +1 fuzzy (edit dist ≤2), -1 mismatch."""
if a == b:
return 2
if len(a) >= 3 and len(b) >= 3 and _edit_distance(a, b) <= 2:
return 1
return -1
def _align_sequences(whisper_words: list[str],
labeled_words: list[str]) -> list[tuple[int | None, int | None]]:
"""Needleman-Wunsch DP alignment between whisper and labeled word sequences.
Returns list of (whisper_idx, labeled_idx) pairs where None = gap.
"""
n = len(whisper_words)
m = len(labeled_words)
GAP = -1
# Build score matrix
score = [[0] * (m + 1) for _ in range(n + 1)]
for i in range(1, n + 1):
score[i][0] = score[i - 1][0] + GAP
for j in range(1, m + 1):
score[0][j] = score[0][j - 1] + GAP
for i in range(1, n + 1):
for j in range(1, m + 1):
match = score[i - 1][j - 1] + _word_score(whisper_words[i - 1], labeled_words[j - 1])
delete = score[i - 1][j] + GAP
insert = score[i][j - 1] + GAP
score[i][j] = max(match, delete, insert)
# Traceback
pairs = []
i, j = n, m
while i > 0 or j > 0:
if i > 0 and j > 0 and score[i][j] == score[i - 1][j - 1] + _word_score(whisper_words[i - 1], labeled_words[j - 1]):
pairs.append((i - 1, j - 1))
i -= 1
j -= 1
elif i > 0 and score[i][j] == score[i - 1][j] + GAP:
pairs.append((i - 1, None))
i -= 1
else:
pairs.append((None, j - 1))
j -= 1
pairs.reverse()
return pairs
def _parse_full_transcript(labeled_transcript: str) -> list[dict]:
"""Parse entire labeled transcript into flat word list with speaker metadata.
Returns list of {word: str, clean: str, speaker: str} for every word.
"""
result = []
for m in re.finditer(r'^([A-Z][A-Z\s\'-]+?):\s*(.+?)(?=\n[A-Z][A-Z\s\'-]+?:|\n\n|\Z)',
labeled_transcript, re.MULTILINE | re.DOTALL):
speaker = m.group(1).strip()
text = m.group(2)
for w in text.split():
original = w.strip()
clean = re.sub(r"[^\w']", '', original.lower())
if clean:
result.append({"word": original, "clean": clean, "speaker": speaker})
return result
def _find_transcript_region(labeled_words: list[dict], whisper_words: list[str],
) -> tuple[int, int] | None:
"""Find the region of labeled_words that best matches the whisper words.
Uses multi-anchor matching: tries phrases from start, middle, and end
of the whisper words to find a consensus region.
"""
if not whisper_words or not labeled_words:
return None
labeled_clean = [w["clean"] for w in labeled_words]
n_labeled = len(labeled_clean)
def find_phrase(phrase_words: list[str], search_start: int = 0,
search_end: int | None = None) -> int | None:
"""Find a phrase in labeled_clean, return index of first word or None."""
if search_end is None:
search_end = n_labeled
plen = len(phrase_words)
for i in range(search_start, min(search_end, n_labeled - plen + 1)):
match = True
for k in range(plen):
if _word_score(phrase_words[k], labeled_clean[i + k]) < 1:
match = False
break
if match:
return i
return None
# Try anchors from different positions in the whisper words
anchors = []
n_whisper = len(whisper_words)
anchor_positions = [0, n_whisper // 2, max(0, n_whisper - 5)]
# Deduplicate positions
anchor_positions = sorted(set(anchor_positions))
for pos in anchor_positions:
for phrase_len in [5, 4, 3]:
phrase = whisper_words[pos:pos + phrase_len]
if len(phrase) < 3:
continue
idx = find_phrase(phrase)
if idx is not None:
# Estimate region start based on anchor's position in whisper
region_start = max(0, idx - pos)
anchors.append(region_start)
break
if not anchors:
return None
# Use median anchor as region start for robustness
anchors.sort()
region_start = anchors[len(anchors) // 2]
# Region extends to cover all whisper words plus margin
margin = max(20, n_whisper // 4)
region_start = max(0, region_start - margin)
region_end = min(n_labeled, region_start + n_whisper + 2 * margin)
return (region_start, region_end)
def add_speaker_labels(words: list[dict], labeled_transcript: str,
start_time: float, end_time: float,
segments: list[dict]) -> list[dict]:
"""Replace Whisper text with labeled transcript text, keeping Whisper timestamps.
The labeled transcript is the source of truth for TEXT. Whisper is only used
for TIMESTAMPS. Uses DP alignment to map between the two, then rebuilds the
word list from the labeled transcript with interpolated timestamps for any
words Whisper missed.
"""
if not labeled_transcript or not words:
return words
all_labeled = _parse_full_transcript(labeled_transcript)
if not all_labeled:
return words
whisper_clean = []
for w in words:
clean = re.sub(r"[^\w']", '', w["word"].lower())
whisper_clean.append(clean if clean else w["word"].lower())
region = _find_transcript_region(all_labeled, whisper_clean)
if region is None:
return words
region_start, region_end = region
region_words = all_labeled[region_start:region_end]
region_clean = [w["clean"] for w in region_words]
pairs = _align_sequences(whisper_clean, region_clean)
# Build mapping: labeled_idx -> whisper_idx (for timestamp lookup)
labeled_to_whisper = {}
for w_idx, l_idx in pairs:
if w_idx is not None and l_idx is not None:
score = _word_score(whisper_clean[w_idx], region_clean[l_idx])
if score > 0:
labeled_to_whisper[l_idx] = w_idx
# Find the range of labeled words that actually overlap with this clip
# Use only labeled indices that have a whisper match to determine boundaries
matched_labeled_indices = sorted(labeled_to_whisper.keys())
if not matched_labeled_indices:
return words
first_labeled = matched_labeled_indices[0]
last_labeled = matched_labeled_indices[-1]
# Build output from labeled transcript words with whisper timestamps
result = []
corrections = 0
for l_idx in range(first_labeled, last_labeled + 1):
labeled_word = region_words[l_idx]
word_text = re.sub(r'[^\w\s\'-]', '', labeled_word["word"]).strip()
if not word_text:
continue
if l_idx in labeled_to_whisper:
w_idx = labeled_to_whisper[l_idx]
ts_start = words[w_idx]["start"]
ts_end = words[w_idx]["end"]
if word_text.lower() != whisper_clean[w_idx]:
corrections += 1
else:
# Interpolate timestamp from neighbors
ts_start, ts_end = _interpolate_timestamp(l_idx, labeled_to_whisper, words)
result.append({
"word": word_text,
"start": ts_start,
"end": ts_end,
"speaker": labeled_word["speaker"],
})
if corrections:
print(f" Corrected {corrections} words from labeled transcript")
if len(result) != len(words):
print(f" Word count: {len(words)} (whisper) -> {len(result)} (labeled)")
return result
def _interpolate_speaker(idx: int, matched: dict, n_words: int) -> str | None:
"""Find speaker from nearest matched neighbor."""
for dist in range(1, n_words):
before = idx - dist
after = idx + dist
if before >= 0 and before in matched:
return matched[before][0]["speaker"]
if after < n_words and after in matched:
return matched[after][0]["speaker"]
return None
def _interpolate_timestamp(labeled_idx: int, labeled_to_whisper: dict,
words: list[dict]) -> tuple[float, float]:
"""Interpolate timestamp for a labeled word with no direct whisper match.
Finds the nearest matched neighbors before and after, then linearly
interpolates based on position.
"""
before_l = after_l = None
for dist in range(1, len(labeled_to_whisper) + 10):
if before_l is None and (labeled_idx - dist) in labeled_to_whisper:
before_l = labeled_idx - dist
if after_l is None and (labeled_idx + dist) in labeled_to_whisper:
after_l = labeled_idx + dist
if before_l is not None and after_l is not None:
break
if before_l is not None and after_l is not None:
w_before = words[labeled_to_whisper[before_l]]
w_after = words[labeled_to_whisper[after_l]]
span = after_l - before_l
frac = (labeled_idx - before_l) / span
start = w_before["end"] + frac * (w_after["start"] - w_before["end"])
duration = (w_after["start"] - w_before["end"]) / span
return start, start + max(duration, 0.1)
elif before_l is not None:
w = words[labeled_to_whisper[before_l]]
offset = (labeled_idx - before_l) * 0.3
return w["end"] + offset, w["end"] + offset + 0.3
elif after_l is not None:
w = words[labeled_to_whisper[after_l]]
offset = (after_l - labeled_idx) * 0.3
return w["start"] - offset - 0.3, w["start"] - offset
else:
return 0.0, 0.3
def polish_clip_words(words: list[dict], labeled_transcript: str = "") -> list[dict]:
"""Use LLM to add punctuation and fix capitalization.
The word text is already correct (from the labeled transcript). This step
only adds sentence punctuation and proper capitalization.
"""
if not words or not OPENROUTER_API_KEY:
return words
raw_text = " ".join(w["word"] for w in words)
prompt = f"""Add punctuation and capitalization to this podcast transcript excerpt so it reads as proper sentences.
RULES:
- Keep the EXACT same number of words in the EXACT same order
- The words themselves are already correct — do NOT change any word's spelling
- Only add punctuation (periods, commas, question marks, exclamation marks) and fix capitalization
- Do NOT add, remove, merge, or reorder words
- Contractions count as one word (don't = 1 word)
- Return ONLY the corrected text, nothing else
RAW TEXT ({len(words)} words):
{raw_text}"""
try:
response = requests.post(
"https://openrouter.ai/api/v1/chat/completions",
headers={
"Authorization": f"Bearer {OPENROUTER_API_KEY}",
"Content-Type": "application/json",
},
json={
"model": "anthropic/claude-sonnet-4-5",
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 2048,
"temperature": 0,
},
timeout=30,
)
if response.status_code != 200:
print(f" Polish failed ({response.status_code}), using raw text")
return words
polished = response.json()["choices"][0]["message"]["content"].strip()
polished_words = polished.split()
if len(polished_words) != len(words):
print(f" Polish word count mismatch ({len(polished_words)} vs {len(words)}), using raw text")
return words
changes = 0
for i, pw in enumerate(polished_words):
if pw != words[i]["word"]:
changes += 1
words[i]["word"] = pw
if changes:
print(f" Polished {changes} words")
except Exception as e:
print(f" Polish error: {e}")
return words
def group_words_into_lines(words: list[dict], clip_start: float,
clip_duration: float) -> list[dict]:
"""Group words into timed caption lines for rendering.
Splits at speaker changes so each line has a single, correct speaker label.
Returns list of: {start, end, speaker, words: [{word, highlighted}]}
"""
if not words:
return []
# First split at speaker boundaries, then group into display lines
speaker_groups = []
current_group = []
current_speaker = words[0].get("speaker", "")
for w in words:
speaker = w.get("speaker", "")
if speaker and speaker != current_speaker and current_group:
speaker_groups.append((current_speaker, current_group))
current_group = []
current_speaker = speaker
current_group.append(w)
if current_group:
speaker_groups.append((current_speaker, current_group))
# Now group each speaker's words into display lines (5-7 words)
raw_lines = []
for speaker, group_words in speaker_groups:
current_line = []
for w in group_words:
current_line.append(w)
if len(current_line) >= 6 or w["word"].rstrip().endswith(('.', '?', '!', ',')):
if len(current_line) >= 3:
raw_lines.append((speaker, current_line))
current_line = []
if current_line:
if raw_lines and len(current_line) < 3 and raw_lines[-1][0] == speaker:
raw_lines[-1] = (speaker, raw_lines[-1][1] + current_line)
else:
raw_lines.append((speaker, current_line))
lines = []
for speaker, line_words in raw_lines:
line_start = line_words[0]["start"] - clip_start
line_end = line_words[-1]["end"] - clip_start
if line_start < 0:
line_start = 0
if line_end > clip_duration:
line_end = clip_duration
if line_end <= line_start:
continue
lines.append({
"start": line_start,
"end": line_end,
"speaker": speaker,
"words": line_words,
})
return lines
def extract_clip_audio(audio_path: str, start: float, end: float,
output_path: str) -> bool:
"""Extract audio clip with fade in/out."""
duration = end - start
fade_in = 0.3
fade_out = 0.5
af = f"afade=t=in:d={fade_in},afade=t=out:st={duration - fade_out}:d={fade_out}"
cmd = [
"ffmpeg", "-y",
"-ss", str(start),
"-t", str(duration),
"-i", audio_path,
"-af", af,
"-ab", "192k",
output_path,
]
result = subprocess.run(cmd, capture_output=True, text=True)
return result.returncode == 0
def generate_background_image(episode_number: int, clip_title: str,
output_path: str) -> bool:
"""Generate 9:16 vertical background with blurred/cropped cover art."""
from PIL import Image, ImageDraw, ImageFilter, ImageFont
if not COVER_ART.exists():
print(f" Warning: Cover art not found at {COVER_ART}")
# Create solid dark background fallback
img = Image.new("RGB", (WIDTH, HEIGHT), (20, 15, 30))
img.save(output_path)
return True
cover = Image.open(COVER_ART).convert("RGB")
# Scale cover to fill 1080x1920 (crop to fit)
cover_ratio = cover.width / cover.height
target_ratio = WIDTH / HEIGHT
if cover_ratio > target_ratio:
new_h = HEIGHT
new_w = int(HEIGHT * cover_ratio)
else:
new_w = WIDTH
new_h = int(WIDTH / cover_ratio)
cover = cover.resize((new_w, new_h), Image.LANCZOS)
# Center crop
left = (new_w - WIDTH) // 2
top = (new_h - HEIGHT) // 2
cover = cover.crop((left, top, left + WIDTH, top + HEIGHT))
# Heavy blur + darken for background
bg = cover.filter(ImageFilter.GaussianBlur(radius=30))
from PIL import ImageEnhance
bg = ImageEnhance.Brightness(bg).enhance(0.3)
# Place sharp cover art centered, sized to ~60% width
art_size = int(WIDTH * 0.6)
art = Image.open(COVER_ART).convert("RGB")
art = art.resize((art_size, art_size), Image.LANCZOS)
# Add rounded shadow effect (just darken behind)
art_x = (WIDTH - art_size) // 2
art_y = int(HEIGHT * 0.18)
bg.paste(art, (art_x, art_y))
# Draw text overlays
draw = ImageDraw.Draw(bg)
try:
font_ep = ImageFont.truetype(FONT_BOLD, 42)
font_title = ImageFont.truetype(FONT_BOLD, 56)
font_url = ImageFont.truetype(FONT_SEMIBOLD, 32)
except OSError:
font_ep = ImageFont.truetype("/Library/Fonts/Arial Unicode.ttf", 42)
font_title = ImageFont.truetype("/Library/Fonts/Arial Unicode.ttf", 56)
font_url = ImageFont.truetype("/Library/Fonts/Arial Unicode.ttf", 32)
margin = 60
# Episode label at top
ep_text = f"EPISODE {episode_number}" if episode_number else "LUKE AT THE ROOST"
draw.text((margin, 80), ep_text, font=font_ep, fill=(255, 200, 80))
# Clip title below episode label
# Word wrap the title
import textwrap
wrapped_title = textwrap.fill(clip_title, width=22)
draw.text((margin, 140), wrapped_title, font=font_title, fill=(255, 255, 255))
# Watermark at bottom
url_text = "lukeattheroost.com"
bbox = draw.textbbox((0, 0), url_text, font=font_url)
url_w = bbox[2] - bbox[0]
draw.text(((WIDTH - url_w) // 2, HEIGHT - 80), url_text,
font=font_url, fill=(255, 200, 80, 200))
bg.save(output_path, "PNG")
return True
def generate_caption_frames(bg_path: str, caption_lines: list[dict],
clip_start: float, duration: float,
tmp_dir: Path, fps: int = 10) -> str:
"""Generate caption frame PNGs and a concat file for ffmpeg.
Uses a low FPS (10) since the background is static — only captions change.
Returns path to the concat file.
"""
from PIL import Image, ImageDraw, ImageFont
bg = Image.open(bg_path).convert("RGB")
try:
font_caption = ImageFont.truetype(FONT_BOLD, 52)
font_speaker = ImageFont.truetype(FONT_SEMIBOLD, 40)
except OSError:
font_caption = ImageFont.truetype("/Library/Fonts/Arial Unicode.ttf", 52)
font_speaker = ImageFont.truetype("/Library/Fonts/Arial Unicode.ttf", 40)
frames_dir = tmp_dir / "frames"
frames_dir.mkdir(exist_ok=True)
n_frames = int(duration * fps)
frame_duration = 1.0 / fps
concat_lines = []
prev_state = None # (line_idx, highlighted_word_idx) — only reuse when both match
prev_frame_path = None
for frame_num in range(n_frames):
t = frame_num * frame_duration
# Find active caption line
active_idx = -1
active_line = None
for i, line in enumerate(caption_lines):
if line["start"] <= t <= line["end"]:
active_idx = i
active_line = line
break
# Find which word is currently highlighted
highlight_idx = -1
if active_line:
for wi, w in enumerate(active_line["words"]):
word_rel_start = w["start"] - clip_start
word_rel_end = w["end"] - clip_start
if word_rel_start <= t <= word_rel_end:
highlight_idx = wi
break
if highlight_idx == -1:
# Between words — highlight the last spoken word
for wi in range(len(active_line["words"]) - 1, -1, -1):
if t > active_line["words"][wi]["end"] - clip_start:
highlight_idx = wi
break
# Reuse previous frame only if same line AND same highlighted word
state = (active_idx, highlight_idx)
if state == prev_state and prev_frame_path:
concat_lines.append(f"file '{prev_frame_path}'")
concat_lines.append(f"duration {frame_duration:.4f}")
continue
frame = bg.copy()
if active_line:
draw = ImageDraw.Draw(frame)
margin = 60
caption_y = int(HEIGHT * 0.78)
# Speaker label
if active_line.get("speaker"):
for dx, dy in [(-1, -1), (-1, 1), (1, -1), (1, 1)]:
draw.text((margin + dx, caption_y - 55 + dy),
active_line["speaker"],
font=font_speaker, fill=(0, 0, 0))
draw.text((margin, caption_y - 55), active_line["speaker"],
font=font_speaker, fill=(255, 200, 80))
# Caption text — all words visible, current word highlighted yellow
x = margin
y = caption_y
for wi, w in enumerate(active_line["words"]):
word_text = w["word"] + " "
if wi == highlight_idx:
color = (255, 200, 80) # Yellow — currently spoken
elif wi < highlight_idx or (highlight_idx == -1 and t > w["end"] - clip_start):
color = (255, 255, 255) # White — already spoken
else:
color = (180, 180, 180) # Gray — upcoming
bbox = draw.textbbox((0, 0), word_text, font=font_caption)
w_width = bbox[2] - bbox[0]
# Wrap line
if x + w_width > WIDTH - margin:
x = margin
y += 65
# Outline
for dx, dy in [(-2, -2), (-2, 2), (2, -2), (2, 2)]:
draw.text((x + dx, y + dy), w["word"],
font=font_caption, fill=(0, 0, 0))
draw.text((x, y), w["word"], font=font_caption, fill=color)
x += w_width
frame_path = str(frames_dir / f"frame_{frame_num:05d}.png")
frame.save(frame_path, "PNG")
concat_lines.append(f"file '{frame_path}'")
concat_lines.append(f"duration {frame_duration:.4f}")
prev_state = state
prev_frame_path = frame_path
# Final frame needs duration too
if prev_frame_path:
concat_lines.append(f"file '{prev_frame_path}'")
concat_lines.append(f"duration {frame_duration:.4f}")
concat_path = str(tmp_dir / "concat.txt")
with open(concat_path, "w") as f:
f.write("\n".join(concat_lines))
return concat_path
def generate_clip_video(audio_path: str, background_path: str,
caption_lines: list[dict], clip_start: float,
output_path: str, duration: float,
tmp_dir: Path) -> bool:
"""Generate clip video with burned-in captions using Pillow + ffmpeg."""
if caption_lines:
# Generate frames with captions
concat_path = generate_caption_frames(
background_path, caption_lines, clip_start, duration, tmp_dir
)
cmd = [
"ffmpeg", "-y",
"-f", "concat", "-safe", "0", "-i", concat_path,
"-i", audio_path,
"-c:v", "libx264", "-preset", "medium", "-crf", "23",
"-pix_fmt", "yuv420p",
"-c:a", "aac", "-b:a", "192k",
"-t", str(duration),
"-shortest",
"-r", "30",
output_path,
]
else:
# No captions — just static image + audio
cmd = [
"ffmpeg", "-y",
"-loop", "1", "-i", background_path,
"-i", audio_path,
"-c:v", "libx264", "-preset", "medium", "-crf", "23",
"-pix_fmt", "yuv420p",
"-c:a", "aac", "-b:a", "192k",
"-t", str(duration),
"-shortest",
"-r", "30",
output_path,
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f" ffmpeg error: {result.stderr[-300:]}")
return False
return True
def generate_clip_video_remotion(
audio_path: str,
caption_lines: list[dict],
clip_start: float,
clip_title: str,
episode_number: int | None,
output_path: str,
duration: float,
) -> bool:
"""Generate clip video using Remotion (animated captions, waveform, dynamic background)."""
if not REMOTION_DIR.exists():
print(f" Remotion project not found at {REMOTION_DIR}")
return False
# Copy assets to Remotion public/ dir
public_dir = REMOTION_DIR / "public"
public_dir.mkdir(exist_ok=True)
# Copy audio
audio_dest = public_dir / "clip-audio.mp3"
import shutil
shutil.copy2(audio_path, audio_dest)
# Copy cover art
cover_dest = public_dir / "cover.png"
if COVER_ART.exists() and (not cover_dest.exists()
or cover_dest.stat().st_mtime < COVER_ART.stat().st_mtime):
shutil.copy2(COVER_ART, cover_dest)
# Build caption data for Remotion — convert word timestamps to clip-relative
remotion_lines = []
for line in caption_lines:
remotion_words = []
for w in line["words"]:
w_start = max(0, round(w["start"] - clip_start, 3))
w_end = min(round(duration, 3), round(w["end"] - clip_start, 3))
if w_end <= w_start:
w_end = w_start + 0.1
remotion_words.append({
"word": w["word"].strip(),
"start": w_start,
"end": w_end,
})
remotion_lines.append({
"start": round(line["start"], 3),
"end": round(line["end"], 3),
"speaker": line.get("speaker", ""),
"words": remotion_words,
})
episode_label = f"EPISODE {episode_number}" if episode_number else "LUKE AT THE ROOST"
props = {
"captionLines": remotion_lines,
"clipTitle": clip_title,
"episodeLabel": episode_label,
"durationSeconds": round(duration + 0.5, 1), # small padding
"audioFile": "clip-audio.mp3",
"coverFile": "cover.png",
}
# Write props to temp file
props_path = REMOTION_DIR / "render-props.json"
with open(props_path, "w") as f:
json.dump(props, f)
cmd = [
"npx", "remotion", "render",
"src/index.ts", "PodcastClipDemo",
f"--props={props_path}",
"--timeout=60000",
"--log=verbose",
output_path,
]
result = subprocess.run(cmd, capture_output=True, text=True, cwd=str(REMOTION_DIR))
props_path.unlink(missing_ok=True)
if result.returncode != 0:
stderr = result.stderr
# Show head (error message) and tail (stack trace) of stderr
if len(stderr) > 1000:
print(f" Remotion error (first 500 chars): {stderr[:500]}")
print(f" ... (last 500 chars): {stderr[-500:]}")
else:
print(f" Remotion error: {stderr}")
return False
return True
def slugify(text: str) -> str:
"""Convert text to URL-friendly slug."""
slug = re.sub(r'[^a-z0-9]+', '-', text.lower()).strip('-')
return slug[:50]
def detect_episode_number(audio_path: str) -> int | None:
"""Try to detect episode number from filename."""
name = Path(audio_path).stem
m = re.search(r'(?:episode|ep|podcast)[-_]?(\d+)', name, re.IGNORECASE)
if m:
return int(m.group(1))
m = re.search(r'(\d+)', name)
if m:
return int(m.group(1))
return None
def fetch_episodes() -> list[dict]:
"""Fetch episode list from Castopod RSS feed."""
print("Fetching episodes from Castopod...")
try:
resp = requests.get(RSS_FEED_URL, timeout=15)
resp.raise_for_status()
except requests.RequestException as e:
print(f"Error fetching RSS feed: {e}")
sys.exit(1)
root = ET.fromstring(resp.content)
ns = {"itunes": "http://www.itunes.com/dtds/podcast-1.0.dtd"}
episodes = []
for item in root.findall(".//item"):
title = item.findtext("title", "")
enclosure = item.find("enclosure")
audio_url = enclosure.get("url", "") if enclosure is not None else ""
duration = item.findtext("itunes:duration", "", ns)
ep_num = item.findtext("itunes:episode", "", ns)
pub_date = item.findtext("pubDate", "")
if not audio_url:
continue
episodes.append({
"title": title,
"audio_url": audio_url,
"duration": duration,
"episode_number": int(ep_num) if ep_num and ep_num.isdigit() else None,
"pub_date": pub_date,
})
return episodes
def pick_episode(episodes: list[dict]) -> dict:
"""Display episode list and let user pick one."""
if not episodes:
print("No episodes found.")
sys.exit(1)
# Sort by episode number (episodes without numbers go to the end)
episodes.sort(key=lambda e: (e["episode_number"] is None, e["episode_number"] or 0))
print(f"\nFound {len(episodes)} episodes:\n")
for ep in episodes:
num = ep['episode_number']
label = f"Ep{num}" if num else " "
dur = ep['duration'] or "?"
display_num = f"{num:>2}" if num else " ?"
print(f" {display_num}. [{label:>4}] {ep['title']} ({dur})")
print()
while True:
try:
choice = input("Select episode number (or 'q' to quit): ").strip()
if choice.lower() == 'q':
sys.exit(0)
num = int(choice)
# Match by episode number first
match = next((ep for ep in episodes if ep["episode_number"] == num), None)
if match:
return match
print(f" No episode #{num} found. Episodes: {', '.join(str(e['episode_number']) for e in episodes if e['episode_number'])}")
except (ValueError, EOFError):
print(" Enter an episode number")
def download_episode(episode: dict) -> Path:
"""Download episode audio, using a cache to avoid re-downloading."""
EPISODE_CACHE_DIR.mkdir(parents=True, exist_ok=True)
# Build a filename from episode number or title slug
if episode["episode_number"]:
filename = f"episode-{episode['episode_number']}.mp3"
else:
filename = slugify(episode["title"]) + ".mp3"
cached = EPISODE_CACHE_DIR / filename
if cached.exists():
size_mb = cached.stat().st_size / (1024 * 1024)
print(f"Using cached: {cached.name} ({size_mb:.1f} MB)")
return cached
print(f"Downloading: {episode['title']}...")
try:
resp = requests.get(episode["audio_url"], stream=True, timeout=30)
resp.raise_for_status()
total = int(resp.headers.get("content-length", 0))
downloaded = 0
with open(cached, "wb") as f:
for chunk in resp.iter_content(chunk_size=1024 * 1024):
f.write(chunk)
downloaded += len(chunk)
if total:
pct = downloaded / total * 100
print(f"\r {downloaded / (1024*1024):.1f} / {total / (1024*1024):.1f} MB ({pct:.0f}%)", end="", flush=True)
else:
print(f"\r {downloaded / (1024*1024):.1f} MB", end="", flush=True)
print()
except requests.RequestException as e:
if cached.exists():
cached.unlink()
print(f"\nError downloading episode: {e}")
sys.exit(1)
size_mb = cached.stat().st_size / (1024 * 1024)
print(f"Saved: {cached.name} ({size_mb:.1f} MB)")
return cached
def main():
parser = argparse.ArgumentParser(description="Extract short-form clips from podcast episodes")
parser.add_argument("audio_file", nargs="?", help="Path to episode MP3 (optional if using --pick)")
parser.add_argument("--pick", "-p", action="store_true",
help="Pick an episode from Castopod to clip")
parser.add_argument("--transcript", "-t", help="Path to labeled transcript (.txt)")
parser.add_argument("--chapters", "-c", help="Path to chapters JSON")
parser.add_argument("--count", "-n", type=int, default=3, help="Number of clips to extract (default: 3)")
parser.add_argument("--episode-number", "-e", type=int, help="Episode number (auto-detected from filename)")
parser.add_argument("--output-dir", "-o", help="Output directory (default: clips/episode-N/)")
parser.add_argument("--audio-only", action="store_true", help="Only extract audio clips, skip video")
parser.add_argument("--fast-model", default=WHISPER_MODEL_FAST,
help=f"Fast Whisper model for clip identification (default: {WHISPER_MODEL_FAST})")
parser.add_argument("--quality-model", default=WHISPER_MODEL_QUALITY,
help=f"Quality Whisper model for clip refinement (default: {WHISPER_MODEL_QUALITY})")
parser.add_argument("--single-pass", action="store_true",
help="Use quality model for everything (slower, no two-pass)")
parser.add_argument("--legacy-video", action="store_true",
help="Use old PIL+ffmpeg video renderer instead of Remotion")
args = parser.parse_args()
# Default to --pick when no audio file provided
if not args.audio_file and not args.pick:
args.pick = True
if args.pick:
episodes = fetch_episodes()
selected = pick_episode(episodes)
audio_path = download_episode(selected)
episode_number = selected["episode_number"] or args.episode_number
else:
audio_path = Path(args.audio_file).expanduser().resolve()
if not audio_path.exists():
print(f"Error: Audio file not found: {audio_path}")
sys.exit(1)
episode_number = None
# Detect episode number
if not args.pick:
episode_number = args.episode_number or detect_episode_number(str(audio_path))
if args.episode_number:
episode_number = args.episode_number
# Resolve output directory
if args.output_dir:
output_dir = Path(args.output_dir)
elif episode_number:
output_dir = Path(__file__).parent / "clips" / f"episode-{episode_number}"
else:
output_dir = Path(__file__).parent / "clips" / audio_path.stem
output_dir.mkdir(parents=True, exist_ok=True)
print(f"Clip extraction: {audio_path.name}")
if episode_number:
print(f"Episode: {episode_number}")
print(f"Output: {output_dir}")
print(f"Clips requested: {args.count}")
# Step 1: Load labeled transcript (needed to prime Whisper with names)
print(f"\n[1] Loading labeled transcript...")
labeled_transcript = ""
if args.transcript:
transcript_path = Path(args.transcript).expanduser().resolve()
if transcript_path.exists():
labeled_transcript = transcript_path.read_text()
print(f" Loaded: {transcript_path.name} ({len(labeled_transcript)} chars)")
else:
print(f" Warning: Transcript not found: {transcript_path}")
else:
# Auto-detect from website/transcripts/
transcripts_dir = Path(__file__).parent / "website" / "transcripts"
if episode_number and transcripts_dir.exists():
for f in transcripts_dir.iterdir():
if f.suffix == ".txt" and f"episode-{episode_number}" in f.name:
labeled_transcript = f.read_text()
print(f" Auto-detected: {f.name}")
break
if not labeled_transcript:
print(" No labeled transcript found (names may be inaccurate)")
# Step 2: Fast transcription for clip identification
two_pass = not args.single_pass and args.fast_model != args.quality_model
if two_pass:
print(f"\n[2/7] Fast transcription for clip identification ({args.fast_model})...")
else:
print(f"\n[2/6] Transcribing with word-level timestamps ({args.quality_model})...")
identify_model = args.fast_model if two_pass else args.quality_model
segments = transcribe_with_timestamps(
str(audio_path), identify_model, labeled_transcript
)
# Build timestamped transcript for LLM
transcript_text = get_transcript_text(segments)
# Load chapters if provided
chapters_json = None
if args.chapters:
chapters_path = Path(args.chapters).expanduser().resolve()
if chapters_path.exists():
with open(chapters_path) as f:
chapters_json = f.read()
print(f" Chapters loaded: {chapters_path.name}")
# Step 3: LLM selects best moments
step_total = 7 if two_pass else 6
print(f"\n[3/{step_total}] Selecting {args.count} best moments with LLM...")
clips = select_clips_with_llm(transcript_text, labeled_transcript,
chapters_json, args.count)
# Snap to sentence boundaries so clips don't start/end mid-sentence
clips = snap_to_sentences(clips, segments)
for i, clip in enumerate(clips):
duration = clip["end_time"] - clip["start_time"]
print(f" Clip {i+1}: \"{clip['title']}\" "
f"({clip['start_time']:.1f}s - {clip['end_time']:.1f}s, {duration:.0f}s)")
print(f" \"{clip['caption_text']}\"")
# Generate social media metadata
meta_step = 4
print(f"\n[{meta_step}/{step_total}] Generating social media descriptions...")
clips = generate_social_metadata(clips, labeled_transcript, episode_number)
for i, clip in enumerate(clips):
if "description" in clip:
print(f" Clip {i+1}: {clip['description'][:80]}...")
print(f" {' '.join(clip.get('hashtags', []))}")
# Step 5: Refine clip timestamps with quality model (two-pass only)
refined = {}
if two_pass:
print(f"\n[5/{step_total}] Refining clips with {args.quality_model}...")
refined = refine_clip_timestamps(
str(audio_path), clips, args.quality_model, labeled_transcript
)
# Re-snap to sentence boundaries using refined segments
for i, clip in enumerate(clips):
if i in refined and refined[i]:
clip_segments = refined[i]
clips[i:i+1] = snap_to_sentences([clip], clip_segments)
# Step N: Extract audio clips
extract_step = 6 if two_pass else 5
print(f"\n[{extract_step}/{step_total}] Extracting audio clips...")
for i, clip in enumerate(clips):
slug = slugify(clip["title"])
mp3_path = output_dir / f"clip-{i+1}-{slug}.mp3"
if extract_clip_audio(str(audio_path), clip["start_time"], clip["end_time"],
str(mp3_path)):
print(f" Clip {i+1} audio: {mp3_path.name}")
else:
print(f" Error extracting clip {i+1} audio")
video_step = 7 if two_pass else 6
if args.audio_only:
print(f"\n[{video_step}/{step_total}] Skipped video generation (--audio-only)")
print(f"\nDone! {len(clips)} audio clips saved to {output_dir}")
return
# Step N: Generate video clips
use_remotion = REMOTION_DIR.exists() and not args.legacy_video
renderer = "Remotion" if use_remotion else "PIL+ffmpeg"
print(f"\n[{video_step}/{step_total}] Generating video clips ({renderer})...")
with tempfile.TemporaryDirectory() as tmp:
tmp_dir = Path(tmp)
for i, clip in enumerate(clips):
slug = slugify(clip["title"])
mp3_path = output_dir / f"clip-{i+1}-{slug}.mp3"
mp4_path = output_dir / f"clip-{i+1}-{slug}.mp4"
duration = clip["end_time"] - clip["start_time"]
print(f" Clip {i+1}: Generating video...")
# Get word timestamps — use refined segments if available
word_source = refined[i] if (two_pass and i in refined and refined[i]) else segments
clip_words = get_words_in_range(word_source, clip["start_time"], clip["end_time"])
# Add speaker labels
clip_words = add_speaker_labels(clip_words, labeled_transcript,
clip["start_time"], clip["end_time"],
word_source)
# Polish text with LLM (fix punctuation, capitalization, mishearings)
clip_words = polish_clip_words(clip_words, labeled_transcript)
# Group words into timed caption lines
caption_lines = group_words_into_lines(
clip_words, clip["start_time"], duration
)
if use_remotion:
if generate_clip_video_remotion(
str(mp3_path), caption_lines, clip["start_time"],
clip["title"], episode_number, str(mp4_path), duration
):
file_size = mp4_path.stat().st_size / (1024 * 1024)
print(f" Clip {i+1} video: {mp4_path.name} ({file_size:.1f} MB)")
else:
print(f" Error generating clip {i+1} video (Remotion)")
else:
# Legacy PIL+ffmpeg renderer
bg_path = str(tmp_dir / f"bg_{i}.png")
generate_background_image(episode_number, clip["title"], bg_path)
clip_tmp = tmp_dir / f"clip_{i}"
clip_tmp.mkdir(exist_ok=True)
if generate_clip_video(str(mp3_path), bg_path, caption_lines,
clip["start_time"], str(mp4_path),
duration, clip_tmp):
file_size = mp4_path.stat().st_size / (1024 * 1024)
print(f" Clip {i+1} video: {mp4_path.name} ({file_size:.1f} MB)")
else:
print(f" Error generating clip {i+1} video")
# Save clips metadata for social upload
metadata_path = output_dir / "clips-metadata.json"
metadata = []
for i, clip in enumerate(clips):
slug = slugify(clip["title"])
metadata.append({
"title": clip["title"],
"clip_file": f"clip-{i+1}-{slug}.mp4",
"audio_file": f"clip-{i+1}-{slug}.mp3",
"caption_text": clip.get("caption_text", ""),
"description": clip.get("description", ""),
"hashtags": clip.get("hashtags", []),
"start_time": clip["start_time"],
"end_time": clip["end_time"],
"duration": round(clip["end_time"] - clip["start_time"], 1),
"episode_number": episode_number,
})
with open(metadata_path, "w") as f:
json.dump(metadata, f, indent=2)
print(f"\nSocial metadata: {metadata_path}")
# Summary
print(f"\nDone! {len(clips)} clips saved to {output_dir}")
for i, clip in enumerate(clips):
slug = slugify(clip["title"])
mp4 = output_dir / f"clip-{i+1}-{slug}.mp4"
mp3 = output_dir / f"clip-{i+1}-{slug}.mp3"
print(f" {i+1}. \"{clip['title']}\"")
if mp4.exists():
print(f" Video: {mp4}")
if mp3.exists():
print(f" Audio: {mp3}")
if __name__ == "__main__":
main()