- make_clips.py: Extract best moments from episodes as short-form clips (9:16 vertical MP4 with captions for TikTok/Shorts/Reels) - deploy_stats_cron.sh: Deploy podcast_stats.py to NAS as Docker container running hourly with auto-restart - podcast_stats.py: Add _find_ytdlp() for Docker compatibility, auto-detect local Docker for Castopod DB queries - publish_episode.py: Upgrade Whisper model from base to large-v3 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1089 lines
40 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Extract the best short-form clips from a podcast episode.
|
|
|
|
Two-pass pipeline (default):
|
|
1. Fast Whisper model (base) transcribes full episode for clip identification
|
|
2. LLM selects best moments
|
|
3. Quality Whisper model (large-v3) re-transcribes only selected clips for precise timestamps
|
|
|
|
Usage:
|
|
python make_clips.py ~/Desktop/episode12.mp3 --count 3
|
|
python make_clips.py ~/Desktop/episode12.mp3 --transcript website/transcripts/episode-12-love-lies-and-loyalty.txt
|
|
python make_clips.py ~/Desktop/episode12.mp3 --fast-model small --quality-model large-v3
|
|
python make_clips.py ~/Desktop/episode12.mp3 --single-pass # skip two-pass, use quality model only
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
import requests
|
|
from dotenv import load_dotenv
|
|
|
|
load_dotenv(Path(__file__).parent / ".env")
|
|
|
|
# API key for the clip-selection LLM call via OpenRouter (loaded from .env above).
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")

# Two-pass Whisper setup: the fast model finds clip candidates across the whole
# episode; the quality model re-transcribes only the selected ranges.
WHISPER_MODEL_FAST = "base"

WHISPER_MODEL_QUALITY = "large-v3"

# Podcast cover art used to build the 9:16 video background.
COVER_ART = Path(__file__).parent / "website" / "images" / "cover.png"

# Fonts (absolute /Library/Fonts paths — assumes a macOS host; TODO confirm elsewhere)
FONT_BOLD = "/Library/Fonts/Montserrat-ExtraBold.ttf"

FONT_MEDIUM = "/Library/Fonts/Montserrat-Medium.ttf"

FONT_SEMIBOLD = "/Library/Fonts/Montserrat-SemiBold.ttf"

# Video dimensions (9:16 vertical)
WIDTH = 1080

HEIGHT = 1920
|
|
|
|
|
|
def _build_whisper_prompt(labeled_transcript: str) -> str:
|
|
"""Build an initial_prompt for Whisper from the labeled transcript.
|
|
|
|
Whisper's initial_prompt conditions the model to recognize specific names
|
|
and vocabulary. We extract speaker names and the first few lines of dialog.
|
|
"""
|
|
prompt_parts = ["Luke at the Roost podcast. Host: Luke."]
|
|
|
|
if labeled_transcript:
|
|
# Extract speaker names
|
|
names = set(re.findall(r'^([A-Z][A-Z\s\'-]+?):', labeled_transcript, re.MULTILINE))
|
|
caller_names = [n.strip().title() for n in names if n.strip() != "LUKE"]
|
|
if caller_names:
|
|
prompt_parts.append(f"Callers: {', '.join(caller_names)}.")
|
|
|
|
# First ~500 chars of transcript as context (stripped of labels)
|
|
stripped = re.sub(r'^[A-Z][A-Z\s\'-]+?:\s*', '', labeled_transcript[:800], flags=re.MULTILINE)
|
|
stripped = re.sub(r'\n+', ' ', stripped).strip()[:500]
|
|
if stripped:
|
|
prompt_parts.append(stripped)
|
|
|
|
return " ".join(prompt_parts)
|
|
|
|
|
|
def transcribe_with_timestamps(audio_path: str, whisper_model: str | None = None,
                               labeled_transcript: str = "") -> list[dict]:
    """Transcribe audio with word-level timestamps using faster-whisper.

    Results are cached in a JSON file next to the audio (one cache per model
    name), so repeated runs skip the expensive transcription step entirely.

    Args:
        audio_path: Path to the episode audio file.
        whisper_model: faster-whisper model name; falls back to
            WHISPER_MODEL_QUALITY when None/empty.
        labeled_transcript: Optional speaker-labeled transcript, used only to
            build an initial_prompt (name/vocabulary conditioning).

    Returns list of segments: [{start, end, text, words: [{word, start, end}]}]
    """
    model_name = whisper_model or WHISPER_MODEL_QUALITY
    # Cache lives beside the audio file, keyed by model name.
    cache_path = Path(audio_path).with_suffix(f".whisper_cache_{model_name}.json")
    if cache_path.exists():
        print(f" Using cached Whisper output ({model_name})")
        with open(cache_path) as f:
            return json.load(f)

    # Import lazily so the script can run cached-only without the package.
    try:
        from faster_whisper import WhisperModel
    except ImportError:
        print("Error: faster-whisper not installed. Run: pip install faster-whisper")
        sys.exit(1)

    initial_prompt = _build_whisper_prompt(labeled_transcript)
    print(f" Model: {model_name}")
    if labeled_transcript:
        print(f" Prompt: {initial_prompt[:100]}...")
    model = WhisperModel(model_name, compute_type="float32")
    segments_iter, info = model.transcribe(
        audio_path,
        word_timestamps=True,
        initial_prompt=initial_prompt,
        language="en",
        beam_size=5,
        vad_filter=True,  # voice-activity-detection filter (library option)
    )

    # Materialize the generator into plain dicts so the result is
    # JSON-serializable for the cache below.
    segments = []
    for seg in segments_iter:
        words = []
        if seg.words:
            for w in seg.words:
                words.append({
                    "word": w.word.strip(),
                    "start": round(w.start, 3),
                    "end": round(w.end, 3),
                })
        segments.append({
            "start": round(seg.start, 3),
            "end": round(seg.end, 3),
            "text": seg.text.strip(),
            "words": words,
        })

    print(f" Transcribed {info.duration:.1f}s ({len(segments)} segments)")

    with open(cache_path, "w") as f:
        json.dump(segments, f)
    print(f" Cached to {cache_path}")

    return segments
|
|
|
|
|
|
def refine_clip_timestamps(audio_path: str, clips: list[dict],
                           quality_model: str, labeled_transcript: str = "",
                           ) -> dict[int, list[dict]]:
    """Re-transcribe just the selected clip ranges with a high-quality model.

    Extracts each clip segment, runs the quality model on it, and returns
    refined segments with timestamps mapped back to the original timeline.

    Args:
        audio_path: Path to the full episode audio.
        clips: Clip dicts with "start_time"/"end_time" in episode seconds.
        quality_model: faster-whisper model name used for refinement.
        labeled_transcript: Optional speaker-labeled transcript for the
            initial_prompt.

    Returns: {clip_index: [segments]} keyed by clip index
    """
    # Import lazily so a fully-cached run doesn't need the package loaded early.
    try:
        from faster_whisper import WhisperModel
    except ImportError:
        print("Error: faster-whisper not installed. Run: pip install faster-whisper")
        sys.exit(1)

    initial_prompt = _build_whisper_prompt(labeled_transcript)
    print(f" Refinement model: {quality_model}")

    model = None  # Lazy-load so we skip if all cached
    refined = {}

    with tempfile.TemporaryDirectory() as tmp:
        for i, clip in enumerate(clips):
            # Add padding around clip for context (Whisper does better with some lead-in)
            pad = 3.0
            seg_start = max(0, clip["start_time"] - pad)
            seg_end = clip["end_time"] + pad

            # Check cache first. Cache key includes the padded range, so
            # changing a clip's times automatically invalidates its cache.
            cache_key = f"{Path(audio_path).stem}_clip{i}_{seg_start:.1f}-{seg_end:.1f}"
            cache_path = Path(audio_path).parent / f".whisper_refine_{quality_model}_{cache_key}.json"
            if cache_path.exists():
                print(f" Clip {i+1}: Using cached refinement")
                with open(cache_path) as f:
                    refined[i] = json.load(f)
                continue

            # Extract clip segment to temp WAV (16 kHz mono)
            seg_path = os.path.join(tmp, f"segment_{i}.wav")
            cmd = [
                "ffmpeg", "-y", "-ss", str(seg_start), "-t", str(seg_end - seg_start),
                "-i", audio_path, "-ar", "16000", "-ac", "1", seg_path,
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode != 0:
                # Best-effort: record an empty list so callers can fall back.
                print(f" Clip {i+1}: Failed to extract segment")
                refined[i] = []
                continue

            # Lazy-load model on first non-cached clip
            if model is None:
                model = WhisperModel(quality_model, compute_type="float32")

            segments_iter, info = model.transcribe(
                seg_path,
                word_timestamps=True,
                initial_prompt=initial_prompt,
                language="en",
                beam_size=5,
                vad_filter=True,
            )

            # Collect segments and offset timestamps back to original timeline
            # (the temp WAV starts at seg_start in episode time).
            segments = []
            for seg in segments_iter:
                words = []
                if seg.words:
                    for w in seg.words:
                        words.append({
                            "word": w.word.strip(),
                            "start": round(w.start + seg_start, 3),
                            "end": round(w.end + seg_start, 3),
                        })
                segments.append({
                    "start": round(seg.start + seg_start, 3),
                    "end": round(seg.end + seg_start, 3),
                    "text": seg.text.strip(),
                    "words": words,
                })

            refined[i] = segments
            print(f" Clip {i+1}: Refined {info.duration:.1f}s → {len(segments)} segments")

            with open(cache_path, "w") as f:
                json.dump(segments, f)

    return refined
|
|
|
|
|
|
def get_transcript_text(segments: list[dict]) -> str:
    """Render Whisper segments as "[MM:SS] text" lines for the LLM prompt."""
    def _render(seg: dict) -> str:
        total_secs = int(seg["start"])
        return f"[{total_secs // 60:02d}:{total_secs % 60:02d}] {seg['text']}"

    return "\n".join(_render(seg) for seg in segments)
|
|
|
|
|
|
def select_clips_with_llm(transcript_text: str, labeled_transcript: str,
                          chapters_json: str | None, count: int) -> list[dict]:
    """Ask LLM to pick the best clip-worthy moments.

    Args:
        transcript_text: Timestamped "[MM:SS] ..." transcript of the episode.
        labeled_transcript: Optional speaker-labeled transcript for context.
        chapters_json: Optional chapters JSON string for extra context.
        count: Number of clips to request.

    Returns:
        List of clip dicts {title, start_time, end_time, caption_text} with
        durations clamped to roughly 30-60 seconds.

    Exits the process on missing API key, non-200 response, or unparsable JSON.
    """
    if not OPENROUTER_API_KEY:
        print("Error: OPENROUTER_API_KEY not set in .env")
        sys.exit(1)

    chapters_context = ""
    if chapters_json:
        chapters_context = f"\nCHAPTERS:\n{chapters_json}\n"

    labeled_context = ""
    if labeled_transcript:
        # Truncate if too long — LLM needs the gist, not every word
        if len(labeled_transcript) > 12000:
            labeled_context = f"\nSPEAKER-LABELED TRANSCRIPT (truncated):\n{labeled_transcript[:12000]}...\n"
        else:
            labeled_context = f"\nSPEAKER-LABELED TRANSCRIPT:\n{labeled_transcript}\n"

    prompt = f"""You are selecting the {count} best moments from a podcast episode for short-form video clips (TikTok/YouTube Shorts/Reels).

Each clip should be 30-60 seconds long and contain a single compelling moment — a funny exchange, an emotional beat, a surprising take, or an interesting story.

TIMESTAMPED TRANSCRIPT:
{transcript_text}
{chapters_context}{labeled_context}
Pick the {count} best moments. For each, return:
- title: A catchy, short title for the clip (max 8 words)
- start_time: Start timestamp in seconds (float). Start a few seconds before the key moment for context.
- end_time: End timestamp in seconds (float). 30-60 seconds after start_time.
- caption_text: The key quote or line that makes this moment clip-worthy (1-2 sentences max)

IMPORTANT:
- Use the timestamps from the transcript to set precise start/end times
- Ensure clips don't overlap
- Prefer moments with back-and-forth dialog over monologues
- Avoid intro/outro segments

Respond with ONLY a JSON array, no markdown or explanation:
[{{"title": "...", "start_time": 0.0, "end_time": 0.0, "caption_text": "..."}}]"""

    response = requests.post(
        "https://openrouter.ai/api/v1/chat/completions",
        headers={
            "Authorization": f"Bearer {OPENROUTER_API_KEY}",
            "Content-Type": "application/json",
        },
        json={
            "model": "anthropic/claude-3.5-sonnet",
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": 2048,
            "temperature": 0.3,
        },
        # Bug fix: requests has no default timeout, so a stalled connection
        # would hang the pipeline forever. 120s allows for slow generations.
        timeout=120,
    )

    if response.status_code != 200:
        print(f"Error from OpenRouter: {response.text}")
        sys.exit(1)

    content = response.json()["choices"][0]["message"]["content"].strip()
    # Strip a markdown code fence if the model added one despite instructions.
    if content.startswith("```"):
        content = re.sub(r"^```(?:json)?\n?", "", content)
        content = re.sub(r"\n?```$", "", content)

    try:
        clips = json.loads(content)
    except json.JSONDecodeError as e:
        print(f"Error parsing LLM response: {e}")
        print(f"Response was: {content[:500]}")
        sys.exit(1)

    # Validate and clamp durations: too short -> force 30s, too long -> 60s.
    validated = []
    for clip in clips:
        duration = clip["end_time"] - clip["start_time"]
        if duration < 15:
            clip["end_time"] = clip["start_time"] + 30
        elif duration > 75:
            clip["end_time"] = clip["start_time"] + 60
        validated.append(clip)

    return validated
|
|
|
|
|
|
def snap_to_sentences(clips: list[dict], segments: list[dict]) -> list[dict]:
    """Snap clip start/end times to sentence boundaries.

    Uses Whisper segment boundaries and punctuation to find the nearest
    sentence start/end so clips don't begin or end mid-sentence.

    Args:
        clips: Clip dicts with "start_time"/"end_time"; mutated in place.
        segments: Whisper segments with optional word-level timestamps.

    Returns:
        The same clip list with snapped, duration-clamped boundaries.
    """
    # Build a list of sentence boundary timestamps from Whisper segments.
    # A sentence boundary is: the start of a segment, or a word right after .?!
    sentence_starts = []
    sentence_ends = []

    for seg in segments:
        sentence_starts.append(seg["start"])
        sentence_ends.append(seg["end"])

        # Also find sentence breaks within segments using word punctuation
        words = seg.get("words", [])
        for i, w in enumerate(words):
            if w["word"].rstrip().endswith(('.', '?', '!')):
                sentence_ends.append(w["end"])
                if i + 1 < len(words):
                    sentence_starts.append(words[i + 1]["start"])

    sentence_starts.sort()
    sentence_ends.sort()

    for clip in clips:
        original_start = clip["start_time"]
        original_end = clip["end_time"]

        # Find nearest sentence start at or before the clip start,
        # looking up to 5s back for a boundary.
        # Bug fix: the 5s look-back limit was documented but never enforced
        # (the end-boundary search below DOES enforce its window), so a
        # boundary minutes earlier could yank the clip start far away.
        best_start = original_start
        best_start_dist = float('inf')
        for s in sentence_starts:
            if s < original_start - 5:
                continue
            dist = abs(s - original_start)
            if dist < best_start_dist and s <= original_start + 1:
                best_start = s
                best_start_dist = dist
            if s > original_start + 1:
                break  # list is sorted — nothing later can qualify

        # Find nearest sentence end at or after the clip end
        # Look up to 5s forward for a sentence boundary
        best_end = original_end
        best_end_dist = float('inf')
        for e in sentence_ends:
            if e < original_end - 5:
                continue
            dist = abs(e - original_end)
            if dist < best_end_dist:
                best_end = e
                best_end_dist = dist
            if e > original_end + 5:
                break

        # Make sure we didn't create a clip that's too short or too long
        duration = best_end - best_start
        if duration < 20:
            # Too short — extend end to next sentence boundary
            for e in sentence_ends:
                if e > best_start + 25:
                    best_end = e
                    break
        elif duration > 75:
            # Too long — pull end back
            for e in reversed(sentence_ends):
                if best_start + 30 <= e <= best_start + 65:
                    best_end = e
                    break

        clip["start_time"] = best_start
        clip["end_time"] = best_end

    return clips
|
|
|
|
|
|
def get_words_in_range(segments: list[dict], start: float, end: float) -> list[dict]:
    """Collect word-timestamp dicts falling within [start, end] (±0.5s slack)."""
    lo, hi = start - 0.5, end + 0.5
    return [
        word
        for seg in segments
        if not (seg["end"] < start or seg["start"] > end)
        for word in seg.get("words", [])
        if lo <= word["start"] and word["end"] <= hi
    ]
|
|
|
|
|
|
def _words_similar(a: str, b: str, max_dist: int = 2) -> bool:
|
|
"""Check if two words are within edit distance max_dist (Levenshtein)."""
|
|
if abs(len(a) - len(b)) > max_dist:
|
|
return False
|
|
# Simple DP edit distance, bounded
|
|
prev = list(range(len(b) + 1))
|
|
for i in range(1, len(a) + 1):
|
|
curr = [i] + [0] * len(b)
|
|
for j in range(1, len(b) + 1):
|
|
cost = 0 if a[i - 1] == b[j - 1] else 1
|
|
curr[j] = min(curr[j - 1] + 1, prev[j] + 1, prev[j - 1] + cost)
|
|
prev = curr
|
|
return prev[len(b)] <= max_dist
|
|
|
|
|
|
def _find_labeled_section(labeled_transcript: str, range_text: str) -> str | None:
    """Find the section of labeled transcript matching a Whisper text range.

    Normalizes both texts (labels removed, lowercased, punctuation stripped)
    and searches for progressively shorter anchor phrases taken from several
    positions of the Whisper text. Returns a generous window of the ORIGINAL
    labeled transcript around the match, or None when nothing matches.
    """
    # Strip speaker labels and punctuation from labeled transcript for matching
    labeled_stripped = re.sub(r'^[A-Z][A-Z\s\'-]+?:\s*', '', labeled_transcript, flags=re.MULTILINE)
    labeled_clean = re.sub(r'[^\w\s]', '', labeled_stripped.lower())
    labeled_clean = re.sub(r'\s+', ' ', labeled_clean)

    whisper_clean = re.sub(r'[^\w\s]', '', range_text.lower())
    whisper_clean = re.sub(r'\s+', ' ', whisper_clean)
    whisper_words_list = whisper_clean.split()

    # Try progressively shorter phrases from different positions
    for phrase_len in [10, 7, 5, 3]:
        for start_offset in [0, len(whisper_words_list) // 3, len(whisper_words_list) // 2]:
            words_slice = whisper_words_list[start_offset:start_offset + phrase_len]
            phrase = " ".join(words_slice)
            if len(phrase) < 8:
                continue  # too short to be a distinctive anchor
            pos = labeled_clean.find(phrase)
            if pos != -1:
                # Map back to original transcript — find first word near this position.
                # NOTE(review): `pos` indexes the CLEANED text while the find below
                # runs on the ORIGINAL, so `pos - 300` is only a heuristic offset;
                # the 400/600-char context margins below absorb the slack.
                match_pos = labeled_transcript.lower().find(
                    words_slice[0], max(0, pos - 300))
                if match_pos == -1:
                    match_pos = max(0, pos)
                else:
                    # Back up roughly start_offset words (~6 chars/word heuristic).
                    match_pos = max(0, match_pos - start_offset * 6)

                context_start = max(0, match_pos - 400)
                context_end = min(len(labeled_transcript), match_pos + len(range_text) + 600)
                return labeled_transcript[context_start:context_end]

    return None
|
|
|
|
|
|
def _parse_labeled_words(labeled_section: str) -> list[tuple[str, str, str]]:
|
|
"""Parse speaker-labeled text into (original_word, clean_lower, speaker) tuples."""
|
|
result = []
|
|
for m in re.finditer(r'^([A-Z][A-Z\s\'-]+?):\s*(.+?)(?=\n[A-Z][A-Z\s\'-]+?:|\n\n|\Z)',
|
|
labeled_section, re.MULTILINE | re.DOTALL):
|
|
speaker = m.group(1).strip()
|
|
text = m.group(2)
|
|
for w in text.split():
|
|
original = w.strip()
|
|
clean = re.sub(r"[^\w']", '', original.lower())
|
|
if clean:
|
|
result.append((original, clean, speaker))
|
|
return result
|
|
|
|
|
|
def add_speaker_labels(words: list[dict], labeled_transcript: str,
                       start_time: float, end_time: float,
                       segments: list[dict]) -> list[dict]:
    """Add speaker labels AND correct word text using labeled transcript.

    Uses Whisper only for timestamps. Takes text from the labeled transcript,
    which has correct names and spelling. Aligns using greedy forward matching
    with edit-distance fuzzy matching.

    Args:
        words: Word dicts ({word, start, end}) for the clip range; mutated in
            place (adds a "speaker" key and may replace "word").
        labeled_transcript: Episode transcript with "SPEAKER:" line labels.
        start_time: Clip range start in the episode timeline (seconds).
        end_time: Clip range end (seconds).
        segments: Full Whisper segments, used to pull the raw text for the range.

    Returns:
        The same word list (also mutated in place).
    """
    if not labeled_transcript or not words:
        return words

    # Get the raw Whisper text for this time range
    range_text = ""
    for seg in segments:
        if seg["end"] < start_time or seg["start"] > end_time:
            continue
        range_text += " " + seg["text"]
    range_text = range_text.strip()

    # Find matching section in labeled transcript
    labeled_section = _find_labeled_section(labeled_transcript, range_text)
    if not labeled_section:
        return words  # no reliable anchor found — leave words unlabeled

    labeled_words_flat = _parse_labeled_words(labeled_section)
    if not labeled_words_flat:
        return words

    # Greedy forward alignment: for each Whisper word, find best match
    # in labeled words within a lookahead window
    labeled_idx = 0
    current_speaker = labeled_words_flat[0][2]
    corrections = 0

    for word_entry in words:
        whisper_clean = re.sub(r"[^\w']", '', word_entry["word"].lower())
        if not whisper_clean:
            # Pure-punctuation token: keep the current speaker, skip alignment.
            word_entry["speaker"] = current_speaker
            continue

        # Search forward for best match
        best_idx = None
        best_score = 0  # 2 = exact, 1 = fuzzy
        window = min(labeled_idx + 12, len(labeled_words_flat))

        for j in range(labeled_idx, window):
            labeled_clean = labeled_words_flat[j][1]

            if labeled_clean == whisper_clean:
                best_idx = j
                best_score = 2
                break

            # Fuzzy match only between words long enough to be distinctive.
            if len(whisper_clean) >= 3 and len(labeled_clean) >= 3:
                if _words_similar(whisper_clean, labeled_clean):
                    if best_score < 1:
                        best_idx = j
                        best_score = 1
                    # Don't break — keep looking for exact match

        if best_idx is not None:
            original_word, _, speaker = labeled_words_flat[best_idx]
            current_speaker = speaker

            # Replace Whisper's word with correct version
            corrected = re.sub(r'[^\w\s\'-]', '', original_word)
            if corrected and corrected.lower() != whisper_clean:
                word_entry["word"] = corrected
                corrections += 1
            elif corrected:
                word_entry["word"] = corrected

            labeled_idx = best_idx + 1
        else:
            # No match — advance labeled pointer by 1 to stay roughly in sync
            if labeled_idx < len(labeled_words_flat):
                labeled_idx += 1

        word_entry["speaker"] = current_speaker

    if corrections:
        print(f" Corrected {corrections} words from labeled transcript")

    return words
|
|
|
|
|
|
def group_words_into_lines(words: list[dict], clip_start: float,
                           clip_duration: float) -> list[dict]:
    """Group words into timed caption lines for rendering.

    Lines break after ~6 words or at clause punctuation, but never contain
    fewer than 3 words; a short trailing remainder is folded into the last
    line. Times are converted to clip-relative seconds and clamped to
    [0, clip_duration].

    Returns list of: {start, end, speaker, words: [...]}
    """
    if not words:
        return []

    # Chunk words into display lines.
    chunks: list[list[dict]] = []
    pending: list[dict] = []
    for entry in words:
        pending.append(entry)
        ends_clause = entry["word"].rstrip().endswith(('.', '?', '!', ','))
        if (len(pending) >= 6 or ends_clause) and len(pending) >= 3:
            chunks.append(pending)
            pending = []
    if pending:
        if chunks and len(pending) < 3:
            # Too short to stand alone — fold into the previous line.
            chunks[-1].extend(pending)
        else:
            chunks.append(pending)

    timed_lines = []
    for chunk in chunks:
        rel_start = max(chunk[0]["start"] - clip_start, 0)
        rel_end = min(chunk[-1]["end"] - clip_start, clip_duration)
        if rel_end <= rel_start:
            continue  # line falls entirely outside the clip window

        timed_lines.append({
            "start": rel_start,
            "end": rel_end,
            "speaker": chunk[0].get("speaker", ""),
            "words": chunk,
        })

    return timed_lines
|
|
|
|
|
|
def extract_clip_audio(audio_path: str, start: float, end: float,
                       output_path: str) -> bool:
    """Extract audio clip with fade in/out.

    Cuts [start, end) from audio_path via ffmpeg, applying a 0.3s fade-in and
    a 0.5s fade-out, encoded at 192 kbps. Returns True when ffmpeg succeeds.
    """
    length = end - start
    fade_in = 0.3
    fade_out = 0.5

    # Fade-out must start fade_out seconds before the clip ends.
    fade_filter = f"afade=t=in:d={fade_in},afade=t=out:st={length - fade_out}:d={fade_out}"
    command = [
        "ffmpeg", "-y",
        "-ss", str(start),
        "-t", str(length),
        "-i", audio_path,
        "-af", fade_filter,
        "-ab", "192k",
        output_path,
    ]

    completed = subprocess.run(command, capture_output=True, text=True)
    return completed.returncode == 0
|
|
|
|
|
|
def generate_background_image(episode_number: int, clip_title: str,
                              output_path: str) -> bool:
    """Generate 9:16 vertical background with blurred/cropped cover art.

    Layout: heavily blurred + darkened cover fills the frame; the sharp cover
    sits at ~60% width near the top; episode label, wrapped clip title, and a
    site watermark are drawn on top. Falls back to a solid dark background
    when the cover art file is missing.

    Args:
        episode_number: Episode number for the top label (falsy -> show name).
        clip_title: Clip title, word-wrapped at 22 chars/line.
        output_path: Destination PNG path.

    Returns:
        Always True (the fallback path also succeeds).
    """
    from PIL import Image, ImageDraw, ImageFilter, ImageFont

    if not COVER_ART.exists():
        print(f" Warning: Cover art not found at {COVER_ART}")
        # Create solid dark background fallback
        img = Image.new("RGB", (WIDTH, HEIGHT), (20, 15, 30))
        img.save(output_path)
        return True

    cover = Image.open(COVER_ART).convert("RGB")

    # Scale cover to fill 1080x1920 (crop to fit)
    cover_ratio = cover.width / cover.height
    target_ratio = WIDTH / HEIGHT

    if cover_ratio > target_ratio:
        # Wider than target: match height, let width overflow for the crop.
        new_h = HEIGHT
        new_w = int(HEIGHT * cover_ratio)
    else:
        new_w = WIDTH
        new_h = int(WIDTH / cover_ratio)

    cover = cover.resize((new_w, new_h), Image.LANCZOS)

    # Center crop
    left = (new_w - WIDTH) // 2
    top = (new_h - HEIGHT) // 2
    cover = cover.crop((left, top, left + WIDTH, top + HEIGHT))

    # Heavy blur + darken for background
    bg = cover.filter(ImageFilter.GaussianBlur(radius=30))
    from PIL import ImageEnhance
    bg = ImageEnhance.Brightness(bg).enhance(0.3)

    # Place sharp cover art centered, sized to ~60% width
    art_size = int(WIDTH * 0.6)
    art = Image.open(COVER_ART).convert("RGB")
    art = art.resize((art_size, art_size), Image.LANCZOS)

    # Add rounded shadow effect (just darken behind)
    art_x = (WIDTH - art_size) // 2
    art_y = int(HEIGHT * 0.18)
    bg.paste(art, (art_x, art_y))

    # Draw text overlays
    draw = ImageDraw.Draw(bg)

    try:
        font_ep = ImageFont.truetype(FONT_BOLD, 42)
        font_title = ImageFont.truetype(FONT_BOLD, 56)
        font_url = ImageFont.truetype(FONT_SEMIBOLD, 32)
    except OSError:
        # Montserrat not installed — fall back to a standard system font.
        font_ep = ImageFont.truetype("/Library/Fonts/Arial Unicode.ttf", 42)
        font_title = ImageFont.truetype("/Library/Fonts/Arial Unicode.ttf", 56)
        font_url = ImageFont.truetype("/Library/Fonts/Arial Unicode.ttf", 32)

    margin = 60

    # Episode label at top
    ep_text = f"EPISODE {episode_number}" if episode_number else "LUKE AT THE ROOST"
    draw.text((margin, 80), ep_text, font=font_ep, fill=(255, 200, 80))

    # Clip title below episode label
    # Word wrap the title
    import textwrap
    wrapped_title = textwrap.fill(clip_title, width=22)
    draw.text((margin, 140), wrapped_title, font=font_title, fill=(255, 255, 255))

    # Watermark at bottom, horizontally centered via measured text width.
    url_text = "lukeattheroost.com"
    bbox = draw.textbbox((0, 0), url_text, font=font_url)
    url_w = bbox[2] - bbox[0]
    # NOTE(review): bg is RGB, so the 4th (alpha) component of this fill
    # likely has no effect — confirm intended translucency.
    draw.text(((WIDTH - url_w) // 2, HEIGHT - 80), url_text,
              font=font_url, fill=(255, 200, 80, 200))

    bg.save(output_path, "PNG")
    return True
|
|
|
|
|
|
def generate_caption_frames(bg_path: str, caption_lines: list[dict],
                            clip_start: float, duration: float,
                            tmp_dir: Path, fps: int = 10) -> str:
    """Generate caption frame PNGs and a concat file for ffmpeg.

    Uses a low FPS (10) since the background is static — only captions change.
    Frames are deduplicated: a new PNG is rendered only when the visible state
    (active line, highlighted word) changes; otherwise the previous frame's
    path is repeated in the concat list.

    Args:
        bg_path: Pre-rendered background PNG.
        caption_lines: Output of group_words_into_lines (clip-relative times).
        clip_start: Clip start in episode time, used to convert absolute word
            times to clip-relative.
        duration: Clip duration in seconds.
        tmp_dir: Working directory; frames/ and concat.txt are created here.
        fps: Caption frame rate.

    Returns path to the concat file.
    """
    from PIL import Image, ImageDraw, ImageFont

    bg = Image.open(bg_path).convert("RGB")

    try:
        font_caption = ImageFont.truetype(FONT_BOLD, 52)
        font_speaker = ImageFont.truetype(FONT_SEMIBOLD, 40)
    except OSError:
        # Montserrat not installed — fall back to a standard system font.
        font_caption = ImageFont.truetype("/Library/Fonts/Arial Unicode.ttf", 52)
        font_speaker = ImageFont.truetype("/Library/Fonts/Arial Unicode.ttf", 40)

    frames_dir = tmp_dir / "frames"
    frames_dir.mkdir(exist_ok=True)

    n_frames = int(duration * fps)
    frame_duration = 1.0 / fps

    concat_lines = []

    prev_state = None  # (line_idx, highlighted_word_idx) — only reuse when both match
    prev_frame_path = None

    for frame_num in range(n_frames):
        t = frame_num * frame_duration

        # Find active caption line
        active_idx = -1
        active_line = None
        for i, line in enumerate(caption_lines):
            if line["start"] <= t <= line["end"]:
                active_idx = i
                active_line = line
                break

        # Find which word is currently highlighted
        highlight_idx = -1
        if active_line:
            for wi, w in enumerate(active_line["words"]):
                # Word times are absolute (episode timeline); t is clip-relative.
                word_rel_start = w["start"] - clip_start
                word_rel_end = w["end"] - clip_start
                if word_rel_start <= t <= word_rel_end:
                    highlight_idx = wi
                    break
            if highlight_idx == -1:
                # Between words — highlight the last spoken word
                for wi in range(len(active_line["words"]) - 1, -1, -1):
                    if t > active_line["words"][wi]["end"] - clip_start:
                        highlight_idx = wi
                        break

        # Reuse previous frame only if same line AND same highlighted word
        state = (active_idx, highlight_idx)
        if state == prev_state and prev_frame_path:
            concat_lines.append(f"file '{prev_frame_path}'")
            concat_lines.append(f"duration {frame_duration:.4f}")
            continue

        frame = bg.copy()

        if active_line:
            draw = ImageDraw.Draw(frame)
            margin = 60
            caption_y = int(HEIGHT * 0.78)

            # Speaker label (four offset black copies fake a text outline)
            if active_line.get("speaker"):
                for dx, dy in [(-1, -1), (-1, 1), (1, -1), (1, 1)]:
                    draw.text((margin + dx, caption_y - 55 + dy),
                              active_line["speaker"],
                              font=font_speaker, fill=(0, 0, 0))
                draw.text((margin, caption_y - 55), active_line["speaker"],
                          font=font_speaker, fill=(255, 200, 80))

            # Caption text — all words visible, current word highlighted yellow
            x = margin
            y = caption_y
            for wi, w in enumerate(active_line["words"]):
                # Trailing space is included in the measured advance width.
                word_text = w["word"] + " "

                if wi == highlight_idx:
                    color = (255, 200, 80)  # Yellow — currently spoken
                elif wi < highlight_idx or (highlight_idx == -1 and t > w["end"] - clip_start):
                    color = (255, 255, 255)  # White — already spoken
                else:
                    color = (180, 180, 180)  # Gray — upcoming

                bbox = draw.textbbox((0, 0), word_text, font=font_caption)
                w_width = bbox[2] - bbox[0]

                # Wrap line
                if x + w_width > WIDTH - margin:
                    x = margin
                    y += 65

                # Outline
                for dx, dy in [(-2, -2), (-2, 2), (2, -2), (2, 2)]:
                    draw.text((x + dx, y + dy), w["word"],
                              font=font_caption, fill=(0, 0, 0))

                draw.text((x, y), w["word"], font=font_caption, fill=color)
                x += w_width

        frame_path = str(frames_dir / f"frame_{frame_num:05d}.png")
        frame.save(frame_path, "PNG")

        concat_lines.append(f"file '{frame_path}'")
        concat_lines.append(f"duration {frame_duration:.4f}")

        prev_state = state
        prev_frame_path = frame_path

    # Final frame needs duration too
    if prev_frame_path:
        concat_lines.append(f"file '{prev_frame_path}'")
        concat_lines.append(f"duration {frame_duration:.4f}")

    concat_path = str(tmp_dir / "concat.txt")
    with open(concat_path, "w") as f:
        f.write("\n".join(concat_lines))

    return concat_path
|
|
|
|
|
|
def generate_clip_video(audio_path: str, background_path: str,
                        caption_lines: list[dict], clip_start: float,
                        output_path: str, duration: float,
                        tmp_dir: Path) -> bool:
    """Generate clip video with burned-in captions using Pillow + ffmpeg.

    With captions, per-frame PNGs are rendered and fed to ffmpeg via the
    concat demuxer; without captions, the static background image is looped.
    Audio is muxed in either case.

    Args:
        audio_path: Pre-extracted clip audio file.
        background_path: Background PNG from generate_background_image.
        caption_lines: Timed caption lines (may be empty).
        clip_start: Clip start in episode time (for word-time conversion).
        output_path: Destination MP4 path.
        duration: Clip duration in seconds.
        tmp_dir: Working directory for caption frames.

    Returns:
        True when ffmpeg succeeds, False otherwise (logs the error tail).
    """
    # Refactor: the two code paths previously duplicated the whole ffmpeg
    # command; only the video-input arguments actually differ.
    if caption_lines:
        # Generate frames with captions, fed via the concat demuxer.
        concat_path = generate_caption_frames(
            background_path, caption_lines, clip_start, duration, tmp_dir
        )
        video_input = ["-f", "concat", "-safe", "0", "-i", concat_path]
    else:
        # No captions — just static image + audio
        video_input = ["-loop", "1", "-i", background_path]

    # Shared encode settings: H.264 video + AAC audio, capped to the clip
    # duration, output at 30 fps.
    cmd = [
        "ffmpeg", "-y",
        *video_input,
        "-i", audio_path,
        "-c:v", "libx264", "-preset", "medium", "-crf", "23",
        "-pix_fmt", "yuv420p",
        "-c:a", "aac", "-b:a", "192k",
        "-t", str(duration),
        "-shortest",
        "-r", "30",
        output_path,
    ]

    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        print(f" ffmpeg error: {result.stderr[-300:]}")
        return False
    return True
|
|
|
|
|
|
def slugify(text: str) -> str:
    """Convert text to URL-friendly slug (lowercase, dash-separated, <=50 chars)."""
    # Collapse every run of non-alphanumerics to a single dash, then trim
    # edge dashes and cap the length.
    dashed = re.sub(r'[^a-z0-9]+', '-', text.lower())
    return dashed.strip('-')[:50]
|
|
|
|
|
|
def detect_episode_number(audio_path: str) -> int | None:
|
|
"""Try to detect episode number from filename."""
|
|
name = Path(audio_path).stem
|
|
m = re.search(r'(?:episode|ep|podcast)[-_]?(\d+)', name, re.IGNORECASE)
|
|
if m:
|
|
return int(m.group(1))
|
|
m = re.search(r'(\d+)', name)
|
|
if m:
|
|
return int(m.group(1))
|
|
return None
|
|
|
|
|
|
def main():
    """CLI entry point: transcribe, select, extract, and render clips.

    Pipeline (two-pass by default):
      1. Load labeled transcript (primes Whisper with speaker names).
      2. Fast Whisper pass for clip identification.
      3. LLM picks the best moments; snap to sentence boundaries.
      4. Quality Whisper pass re-transcribes only the chosen clips (two-pass).
      5. Extract audio clips with ffmpeg.
      6. Render 9:16 caption videos (unless --audio-only).
    """
    parser = argparse.ArgumentParser(description="Extract short-form clips from podcast episodes")
    parser.add_argument("audio_file", help="Path to episode MP3")
    parser.add_argument("--transcript", "-t", help="Path to labeled transcript (.txt)")
    parser.add_argument("--chapters", "-c", help="Path to chapters JSON")
    parser.add_argument("--count", "-n", type=int, default=3, help="Number of clips to extract (default: 3)")
    parser.add_argument("--episode-number", "-e", type=int, help="Episode number (auto-detected from filename)")
    parser.add_argument("--output-dir", "-o", help="Output directory (default: clips/episode-N/)")
    parser.add_argument("--audio-only", action="store_true", help="Only extract audio clips, skip video")
    parser.add_argument("--fast-model", default=WHISPER_MODEL_FAST,
                        help=f"Fast Whisper model for clip identification (default: {WHISPER_MODEL_FAST})")
    parser.add_argument("--quality-model", default=WHISPER_MODEL_QUALITY,
                        help=f"Quality Whisper model for clip refinement (default: {WHISPER_MODEL_QUALITY})")
    parser.add_argument("--single-pass", action="store_true",
                        help="Use quality model for everything (slower, no two-pass)")
    args = parser.parse_args()

    audio_path = Path(args.audio_file).expanduser().resolve()
    if not audio_path.exists():
        print(f"Error: Audio file not found: {audio_path}")
        sys.exit(1)

    # Detect episode number (explicit flag wins over filename heuristic)
    episode_number = args.episode_number or detect_episode_number(str(audio_path))

    # Resolve output directory
    if args.output_dir:
        output_dir = Path(args.output_dir)
    elif episode_number:
        output_dir = Path(__file__).parent / "clips" / f"episode-{episode_number}"
    else:
        output_dir = Path(__file__).parent / "clips" / audio_path.stem
    output_dir.mkdir(parents=True, exist_ok=True)

    print(f"Clip extraction: {audio_path.name}")
    if episode_number:
        print(f"Episode: {episode_number}")
    print(f"Output: {output_dir}")
    print(f"Clips requested: {args.count}")

    # Step 1: Load labeled transcript (needed to prime Whisper with names)
    print(f"\n[1] Loading labeled transcript...")
    labeled_transcript = ""
    if args.transcript:
        transcript_path = Path(args.transcript).expanduser().resolve()
        if transcript_path.exists():
            labeled_transcript = transcript_path.read_text()
            print(f" Loaded: {transcript_path.name} ({len(labeled_transcript)} chars)")
        else:
            print(f" Warning: Transcript not found: {transcript_path}")
    else:
        # Auto-detect from website/transcripts/
        transcripts_dir = Path(__file__).parent / "website" / "transcripts"
        if episode_number and transcripts_dir.exists():
            # Require a non-digit after the number so "episode-1" does not
            # falsely match "episode-12-..." (a plain substring check did).
            ep_pattern = re.compile(rf"episode-{episode_number}(?!\d)")
            for f in transcripts_dir.iterdir():
                if f.suffix == ".txt" and ep_pattern.search(f.name):
                    labeled_transcript = f.read_text()
                    print(f" Auto-detected: {f.name}")
                    break
    if not labeled_transcript:
        print(" No labeled transcript found (names may be inaccurate)")

    # Step 2: Fast transcription for clip identification.
    # Two-pass only makes sense when the fast and quality models differ.
    two_pass = not args.single_pass and args.fast_model != args.quality_model
    if two_pass:
        print(f"\n[2/6] Fast transcription for clip identification ({args.fast_model})...")
    else:
        print(f"\n[2/5] Transcribing with word-level timestamps ({args.quality_model})...")
    identify_model = args.fast_model if two_pass else args.quality_model
    segments = transcribe_with_timestamps(
        str(audio_path), identify_model, labeled_transcript
    )

    # Build timestamped transcript for LLM
    transcript_text = get_transcript_text(segments)

    # Load chapters if provided
    chapters_json = None
    if args.chapters:
        chapters_path = Path(args.chapters).expanduser().resolve()
        if chapters_path.exists():
            with open(chapters_path) as f:
                chapters_json = f.read()
            print(f" Chapters loaded: {chapters_path.name}")

    # Step 3: LLM selects best moments
    step_total = 6 if two_pass else 5
    print(f"\n[3/{step_total}] Selecting {args.count} best moments with LLM...")
    clips = select_clips_with_llm(transcript_text, labeled_transcript,
                                  chapters_json, args.count)

    # Snap to sentence boundaries so clips don't start/end mid-sentence
    clips = snap_to_sentences(clips, segments)

    for i, clip in enumerate(clips):
        duration = clip["end_time"] - clip["start_time"]
        print(f" Clip {i+1}: \"{clip['title']}\" "
              f"({clip['start_time']:.1f}s - {clip['end_time']:.1f}s, {duration:.0f}s)")
        print(f" \"{clip['caption_text']}\"")

    # Step 4: Refine clip timestamps with quality model (two-pass only)
    refined = {}
    if two_pass:
        print(f"\n[4/{step_total}] Refining clips with {args.quality_model}...")
        refined = refine_clip_timestamps(
            str(audio_path), clips, args.quality_model, labeled_transcript
        )
        # Re-snap to sentence boundaries using refined segments.
        # Slice assignment replaces one clip with one clip, so the list
        # length is stable while iterating.
        for i, clip in enumerate(clips):
            if i in refined and refined[i]:
                clip_segments = refined[i]
                clips[i:i+1] = snap_to_sentences([clip], clip_segments)

    # Step N: Extract audio clips
    extract_step = 5 if two_pass else 4
    print(f"\n[{extract_step}/{step_total}] Extracting audio clips...")
    for i, clip in enumerate(clips):
        slug = slugify(clip["title"])
        mp3_path = output_dir / f"clip-{i+1}-{slug}.mp3"

        if extract_clip_audio(str(audio_path), clip["start_time"], clip["end_time"],
                              str(mp3_path)):
            print(f" Clip {i+1} audio: {mp3_path.name}")
        else:
            print(f" Error extracting clip {i+1} audio")

    video_step = 6 if two_pass else 5
    if args.audio_only:
        print(f"\n[{video_step}/{step_total}] Skipped video generation (--audio-only)")
        print(f"\nDone! {len(clips)} audio clips saved to {output_dir}")
        return

    # Step N: Generate video clips
    print(f"\n[{video_step}/{step_total}] Generating video clips...")

    with tempfile.TemporaryDirectory() as tmp:
        tmp_dir = Path(tmp)

        for i, clip in enumerate(clips):
            slug = slugify(clip["title"])
            mp3_path = output_dir / f"clip-{i+1}-{slug}.mp3"
            mp4_path = output_dir / f"clip-{i+1}-{slug}.mp4"
            duration = clip["end_time"] - clip["start_time"]

            print(f" Clip {i+1}: Generating video...")

            # Generate background image
            bg_path = str(tmp_dir / f"bg_{i}.png")
            generate_background_image(episode_number, clip["title"], bg_path)

            # Get word timestamps — use refined segments if available
            word_source = refined[i] if (two_pass and i in refined and refined[i]) else segments
            clip_words = get_words_in_range(word_source, clip["start_time"], clip["end_time"])

            # Add speaker labels
            clip_words = add_speaker_labels(clip_words, labeled_transcript,
                                            clip["start_time"], clip["end_time"],
                                            word_source)

            # Group words into timed caption lines
            caption_lines = group_words_into_lines(
                clip_words, clip["start_time"], duration
            )

            # Use a per-clip temp dir for frames
            clip_tmp = tmp_dir / f"clip_{i}"
            clip_tmp.mkdir(exist_ok=True)

            # Composite video
            if generate_clip_video(str(mp3_path), bg_path, caption_lines,
                                   clip["start_time"], str(mp4_path),
                                   duration, clip_tmp):
                file_size = mp4_path.stat().st_size / (1024 * 1024)
                print(f" Clip {i+1} video: {mp4_path.name} ({file_size:.1f} MB)")
            else:
                print(f" Error generating clip {i+1} video")

    # Summary
    print(f"\nDone! {len(clips)} clips saved to {output_dir}")
    for i, clip in enumerate(clips):
        slug = slugify(clip["title"])
        mp4 = output_dir / f"clip-{i+1}-{slug}.mp4"
        mp3 = output_dir / f"clip-{i+1}-{slug}.mp3"
        print(f" {i+1}. \"{clip['title']}\"")
        if mp4.exists():
            print(f" Video: {mp4}")
        if mp3.exists():
            print(f" Audio: {mp3}")
|
|
|
|
|
|
# Script entry point — only run the pipeline when executed directly,
# not when imported as a module.
if __name__ == "__main__":
    main()
|