#!/usr/bin/env python3 """Extract the best short-form clips from a podcast episode. Two-pass pipeline (default): 1. Fast Whisper model (base) transcribes full episode for clip identification 2. LLM selects best moments 3. Quality Whisper model (large-v3) re-transcribes only selected clips for precise timestamps Usage: python make_clips.py ~/Desktop/episode12.mp3 --count 3 python make_clips.py ~/Desktop/episode12.mp3 --transcript website/transcripts/episode-12-love-lies-and-loyalty.txt python make_clips.py ~/Desktop/episode12.mp3 --fast-model small --quality-model large-v3 python make_clips.py ~/Desktop/episode12.mp3 --single-pass # skip two-pass, use quality model only """ import argparse import json import os import re import subprocess import sys import tempfile from pathlib import Path import requests from dotenv import load_dotenv load_dotenv(Path(__file__).parent / ".env") OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY") WHISPER_MODEL_FAST = "base" WHISPER_MODEL_QUALITY = "large-v3" COVER_ART = Path(__file__).parent / "website" / "images" / "cover.png" # Fonts FONT_BOLD = "/Library/Fonts/Montserrat-ExtraBold.ttf" FONT_MEDIUM = "/Library/Fonts/Montserrat-Medium.ttf" FONT_SEMIBOLD = "/Library/Fonts/Montserrat-SemiBold.ttf" # Video dimensions (9:16 vertical) WIDTH = 1080 HEIGHT = 1920 def _build_whisper_prompt(labeled_transcript: str) -> str: """Build an initial_prompt for Whisper from the labeled transcript. Whisper's initial_prompt conditions the model to recognize specific names and vocabulary. We extract speaker names and the first few lines of dialog. """ prompt_parts = ["Luke at the Roost podcast. Host: Luke."] if labeled_transcript: # Extract speaker names names = set(re.findall(r'^([A-Z][A-Z\s\'-]+?):', labeled_transcript, re.MULTILINE)) caller_names = [n.strip().title() for n in names if n.strip() != "LUKE"] if caller_names: prompt_parts.append(f"Callers: {', '.join(caller_names)}.") # First ~500 chars of transcript as context (stripped of labels) stripped = re.sub(r'^[A-Z][A-Z\s\'-]+?:\s*', '', labeled_transcript[:800], flags=re.MULTILINE) stripped = re.sub(r'\n+', ' ', stripped).strip()[:500] if stripped: prompt_parts.append(stripped) return " ".join(prompt_parts) def transcribe_with_timestamps(audio_path: str, whisper_model: str = None, labeled_transcript: str = "") -> list[dict]: """Transcribe audio with word-level timestamps using faster-whisper. Returns list of segments: [{start, end, text, words: [{word, start, end}]}] """ model_name = whisper_model or WHISPER_MODEL_QUALITY cache_path = Path(audio_path).with_suffix(f".whisper_cache_{model_name}.json") if cache_path.exists(): print(f" Using cached Whisper output ({model_name})") with open(cache_path) as f: return json.load(f) try: from faster_whisper import WhisperModel except ImportError: print("Error: faster-whisper not installed. Run: pip install faster-whisper") sys.exit(1) initial_prompt = _build_whisper_prompt(labeled_transcript) print(f" Model: {model_name}") if labeled_transcript: print(f" Prompt: {initial_prompt[:100]}...") model = WhisperModel(model_name, compute_type="float32") segments_iter, info = model.transcribe( audio_path, word_timestamps=True, initial_prompt=initial_prompt, language="en", beam_size=5, vad_filter=True, ) segments = [] for seg in segments_iter: words = [] if seg.words: for w in seg.words: words.append({ "word": w.word.strip(), "start": round(w.start, 3), "end": round(w.end, 3), }) segments.append({ "start": round(seg.start, 3), "end": round(seg.end, 3), "text": seg.text.strip(), "words": words, }) print(f" Transcribed {info.duration:.1f}s ({len(segments)} segments)") with open(cache_path, "w") as f: json.dump(segments, f) print(f" Cached to {cache_path}") return segments def refine_clip_timestamps(audio_path: str, clips: list[dict], quality_model: str, labeled_transcript: str = "", ) -> dict[int, list[dict]]: """Re-transcribe just the selected clip ranges with a high-quality model. Extracts each clip segment, runs the quality model on it, and returns refined segments with timestamps mapped back to the original timeline. Returns: {clip_index: [segments]} keyed by clip index """ try: from faster_whisper import WhisperModel except ImportError: print("Error: faster-whisper not installed. Run: pip install faster-whisper") sys.exit(1) initial_prompt = _build_whisper_prompt(labeled_transcript) print(f" Refinement model: {quality_model}") model = None # Lazy-load so we skip if all cached refined = {} with tempfile.TemporaryDirectory() as tmp: for i, clip in enumerate(clips): # Add padding around clip for context (Whisper does better with some lead-in) pad = 3.0 seg_start = max(0, clip["start_time"] - pad) seg_end = clip["end_time"] + pad # Check cache first cache_key = f"{Path(audio_path).stem}_clip{i}_{seg_start:.1f}-{seg_end:.1f}" cache_path = Path(audio_path).parent / f".whisper_refine_{quality_model}_{cache_key}.json" if cache_path.exists(): print(f" Clip {i+1}: Using cached refinement") with open(cache_path) as f: refined[i] = json.load(f) continue # Extract clip segment to temp WAV seg_path = os.path.join(tmp, f"segment_{i}.wav") cmd = [ "ffmpeg", "-y", "-ss", str(seg_start), "-t", str(seg_end - seg_start), "-i", audio_path, "-ar", "16000", "-ac", "1", seg_path, ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: print(f" Clip {i+1}: Failed to extract segment") refined[i] = [] continue # Lazy-load model on first non-cached clip if model is None: model = WhisperModel(quality_model, compute_type="float32") segments_iter, info = model.transcribe( seg_path, word_timestamps=True, initial_prompt=initial_prompt, language="en", beam_size=5, vad_filter=True, ) # Collect segments and offset timestamps back to original timeline segments = [] for seg in segments_iter: words = [] if seg.words: for w in seg.words: words.append({ "word": w.word.strip(), "start": round(w.start + seg_start, 3), "end": round(w.end + seg_start, 3), }) segments.append({ "start": round(seg.start + seg_start, 3), "end": round(seg.end + seg_start, 3), "text": seg.text.strip(), "words": words, }) refined[i] = segments print(f" Clip {i+1}: Refined {info.duration:.1f}s → {len(segments)} segments") with open(cache_path, "w") as f: json.dump(segments, f) return refined def get_transcript_text(segments: list[dict]) -> str: """Build timestamped transcript text for the LLM.""" lines = [] for seg in segments: mins = int(seg["start"] // 60) secs = int(seg["start"] % 60) lines.append(f"[{mins:02d}:{secs:02d}] {seg['text']}") return "\n".join(lines) def select_clips_with_llm(transcript_text: str, labeled_transcript: str, chapters_json: str | None, count: int) -> list[dict]: """Ask LLM to pick the best clip-worthy moments.""" if not OPENROUTER_API_KEY: print("Error: OPENROUTER_API_KEY not set in .env") sys.exit(1) chapters_context = "" if chapters_json: chapters_context = f"\nCHAPTERS:\n{chapters_json}\n" labeled_context = "" if labeled_transcript: # Truncate if too long — LLM needs the gist, not every word if len(labeled_transcript) > 12000: labeled_context = f"\nSPEAKER-LABELED TRANSCRIPT (truncated):\n{labeled_transcript[:12000]}...\n" else: labeled_context = f"\nSPEAKER-LABELED TRANSCRIPT:\n{labeled_transcript}\n" prompt = f"""You are selecting the {count} best moments from a podcast episode for short-form video clips (TikTok/YouTube Shorts/Reels). Each clip should be 30-60 seconds long and contain a single compelling moment — a funny exchange, an emotional beat, a surprising take, or an interesting story. TIMESTAMPED TRANSCRIPT: {transcript_text} {chapters_context}{labeled_context} Pick the {count} best moments. For each, return: - title: A catchy, short title for the clip (max 8 words) - start_time: Start timestamp in seconds (float). Start a few seconds before the key moment for context. - end_time: End timestamp in seconds (float). 30-60 seconds after start_time. - caption_text: The key quote or line that makes this moment clip-worthy (1-2 sentences max) IMPORTANT: - Use the timestamps from the transcript to set precise start/end times - Ensure clips don't overlap - Prefer moments with back-and-forth dialog over monologues - Avoid intro/outro segments Respond with ONLY a JSON array, no markdown or explanation: [{{"title": "...", "start_time": 0.0, "end_time": 0.0, "caption_text": "..."}}]""" response = requests.post( "https://openrouter.ai/api/v1/chat/completions", headers={ "Authorization": f"Bearer {OPENROUTER_API_KEY}", "Content-Type": "application/json", }, json={ "model": "anthropic/claude-3.5-sonnet", "messages": [{"role": "user", "content": prompt}], "max_tokens": 2048, "temperature": 0.3, }, ) if response.status_code != 200: print(f"Error from OpenRouter: {response.text}") sys.exit(1) content = response.json()["choices"][0]["message"]["content"].strip() if content.startswith("```"): content = re.sub(r"^```(?:json)?\n?", "", content) content = re.sub(r"\n?```$", "", content) try: clips = json.loads(content) except json.JSONDecodeError as e: print(f"Error parsing LLM response: {e}") print(f"Response was: {content[:500]}") sys.exit(1) # Validate and clamp durations validated = [] for clip in clips: duration = clip["end_time"] - clip["start_time"] if duration < 15: clip["end_time"] = clip["start_time"] + 30 elif duration > 75: clip["end_time"] = clip["start_time"] + 60 validated.append(clip) return validated def snap_to_sentences(clips: list[dict], segments: list[dict]) -> list[dict]: """Snap clip start/end times to sentence boundaries. Uses Whisper segment boundaries and punctuation to find the nearest sentence start/end so clips don't begin or end mid-sentence. """ # Build a list of sentence boundary timestamps from Whisper segments. # A sentence boundary is: the start of a segment, or a word right after .?! sentence_starts = [] sentence_ends = [] for seg in segments: sentence_starts.append(seg["start"]) sentence_ends.append(seg["end"]) # Also find sentence breaks within segments using word punctuation words = seg.get("words", []) for i, w in enumerate(words): if w["word"].rstrip().endswith(('.', '?', '!')): sentence_ends.append(w["end"]) if i + 1 < len(words): sentence_starts.append(words[i + 1]["start"]) sentence_starts.sort() sentence_ends.sort() for clip in clips: original_start = clip["start_time"] original_end = clip["end_time"] # Find nearest sentence start at or before the clip start # Look up to 5s back for a sentence boundary best_start = original_start best_start_dist = float('inf') for s in sentence_starts: dist = abs(s - original_start) if dist < best_start_dist and s <= original_start + 1: best_start = s best_start_dist = dist if s > original_start + 1: break # Find nearest sentence end at or after the clip end # Look up to 5s forward for a sentence boundary best_end = original_end best_end_dist = float('inf') for e in sentence_ends: if e < original_end - 5: continue dist = abs(e - original_end) if dist < best_end_dist: best_end = e best_end_dist = dist if e > original_end + 5: break # Make sure we didn't create a clip that's too short or too long duration = best_end - best_start if duration < 20: # Too short — extend end to next sentence boundary for e in sentence_ends: if e > best_start + 25: best_end = e break elif duration > 75: # Too long — pull end back for e in reversed(sentence_ends): if best_start + 30 <= e <= best_start + 65: best_end = e break clip["start_time"] = best_start clip["end_time"] = best_end return clips def get_words_in_range(segments: list[dict], start: float, end: float) -> list[dict]: """Extract word-level timestamps for a time range from Whisper segments.""" words = [] for seg in segments: if seg["end"] < start or seg["start"] > end: continue for w in seg.get("words", []): if w["start"] >= start - 0.5 and w["end"] <= end + 0.5: words.append(w) return words def _words_similar(a: str, b: str, max_dist: int = 2) -> bool: """Check if two words are within edit distance max_dist (Levenshtein).""" if abs(len(a) - len(b)) > max_dist: return False # Simple DP edit distance, bounded prev = list(range(len(b) + 1)) for i in range(1, len(a) + 1): curr = [i] + [0] * len(b) for j in range(1, len(b) + 1): cost = 0 if a[i - 1] == b[j - 1] else 1 curr[j] = min(curr[j - 1] + 1, prev[j] + 1, prev[j - 1] + cost) prev = curr return prev[len(b)] <= max_dist def _find_labeled_section(labeled_transcript: str, range_text: str) -> str | None: """Find the section of labeled transcript matching a Whisper text range.""" # Strip speaker labels and punctuation from labeled transcript for matching labeled_stripped = re.sub(r'^[A-Z][A-Z\s\'-]+?:\s*', '', labeled_transcript, flags=re.MULTILINE) labeled_clean = re.sub(r'[^\w\s]', '', labeled_stripped.lower()) labeled_clean = re.sub(r'\s+', ' ', labeled_clean) whisper_clean = re.sub(r'[^\w\s]', '', range_text.lower()) whisper_clean = re.sub(r'\s+', ' ', whisper_clean) whisper_words_list = whisper_clean.split() # Try progressively shorter phrases from different positions for phrase_len in [10, 7, 5, 3]: for start_offset in [0, len(whisper_words_list) // 3, len(whisper_words_list) // 2]: words_slice = whisper_words_list[start_offset:start_offset + phrase_len] phrase = " ".join(words_slice) if len(phrase) < 8: continue pos = labeled_clean.find(phrase) if pos != -1: # Map back to original transcript — find first word near this position match_pos = labeled_transcript.lower().find( words_slice[0], max(0, pos - 300)) if match_pos == -1: match_pos = max(0, pos) else: match_pos = max(0, match_pos - start_offset * 6) context_start = max(0, match_pos - 400) context_end = min(len(labeled_transcript), match_pos + len(range_text) + 600) return labeled_transcript[context_start:context_end] return None def _parse_labeled_words(labeled_section: str) -> list[tuple[str, str, str]]: """Parse speaker-labeled text into (original_word, clean_lower, speaker) tuples.""" result = [] for m in re.finditer(r'^([A-Z][A-Z\s\'-]+?):\s*(.+?)(?=\n[A-Z][A-Z\s\'-]+?:|\n\n|\Z)', labeled_section, re.MULTILINE | re.DOTALL): speaker = m.group(1).strip() text = m.group(2) for w in text.split(): original = w.strip() clean = re.sub(r"[^\w']", '', original.lower()) if clean: result.append((original, clean, speaker)) return result def add_speaker_labels(words: list[dict], labeled_transcript: str, start_time: float, end_time: float, segments: list[dict]) -> list[dict]: """Add speaker labels AND correct word text using labeled transcript. Uses Whisper only for timestamps. Takes text from the labeled transcript, which has correct names and spelling. Aligns using greedy forward matching with edit-distance fuzzy matching. """ if not labeled_transcript or not words: return words # Get the raw Whisper text for this time range range_text = "" for seg in segments: if seg["end"] < start_time or seg["start"] > end_time: continue range_text += " " + seg["text"] range_text = range_text.strip() # Find matching section in labeled transcript labeled_section = _find_labeled_section(labeled_transcript, range_text) if not labeled_section: return words labeled_words_flat = _parse_labeled_words(labeled_section) if not labeled_words_flat: return words # Greedy forward alignment: for each Whisper word, find best match # in labeled words within a lookahead window labeled_idx = 0 current_speaker = labeled_words_flat[0][2] corrections = 0 for word_entry in words: whisper_clean = re.sub(r"[^\w']", '', word_entry["word"].lower()) if not whisper_clean: word_entry["speaker"] = current_speaker continue # Search forward for best match best_idx = None best_score = 0 # 2 = exact, 1 = fuzzy window = min(labeled_idx + 12, len(labeled_words_flat)) for j in range(labeled_idx, window): labeled_clean = labeled_words_flat[j][1] if labeled_clean == whisper_clean: best_idx = j best_score = 2 break if len(whisper_clean) >= 3 and len(labeled_clean) >= 3: if _words_similar(whisper_clean, labeled_clean): if best_score < 1: best_idx = j best_score = 1 # Don't break — keep looking for exact match if best_idx is not None: original_word, _, speaker = labeled_words_flat[best_idx] current_speaker = speaker # Replace Whisper's word with correct version corrected = re.sub(r'[^\w\s\'-]', '', original_word) if corrected and corrected.lower() != whisper_clean: word_entry["word"] = corrected corrections += 1 elif corrected: word_entry["word"] = corrected labeled_idx = best_idx + 1 else: # No match — advance labeled pointer by 1 to stay roughly in sync if labeled_idx < len(labeled_words_flat): labeled_idx += 1 word_entry["speaker"] = current_speaker if corrections: print(f" Corrected {corrections} words from labeled transcript") return words def group_words_into_lines(words: list[dict], clip_start: float, clip_duration: float) -> list[dict]: """Group words into timed caption lines for rendering. Returns list of: {start, end, speaker, words: [{word, highlighted}]} """ if not words: return [] # Group words into display lines (5-7 words per line) raw_lines = [] current_line = [] for w in words: current_line.append(w) if len(current_line) >= 6 or w["word"].rstrip().endswith(('.', '?', '!', ',')): if len(current_line) >= 3: raw_lines.append(current_line) current_line = [] if current_line: if raw_lines and len(current_line) < 3: raw_lines[-1].extend(current_line) else: raw_lines.append(current_line) lines = [] for line_words in raw_lines: line_start = line_words[0]["start"] - clip_start line_end = line_words[-1]["end"] - clip_start if line_start < 0: line_start = 0 if line_end > clip_duration: line_end = clip_duration if line_end <= line_start: continue lines.append({ "start": line_start, "end": line_end, "speaker": line_words[0].get("speaker", ""), "words": line_words, }) return lines def extract_clip_audio(audio_path: str, start: float, end: float, output_path: str) -> bool: """Extract audio clip with fade in/out.""" duration = end - start fade_in = 0.3 fade_out = 0.5 af = f"afade=t=in:d={fade_in},afade=t=out:st={duration - fade_out}:d={fade_out}" cmd = [ "ffmpeg", "-y", "-ss", str(start), "-t", str(duration), "-i", audio_path, "-af", af, "-ab", "192k", output_path, ] result = subprocess.run(cmd, capture_output=True, text=True) return result.returncode == 0 def generate_background_image(episode_number: int, clip_title: str, output_path: str) -> bool: """Generate 9:16 vertical background with blurred/cropped cover art.""" from PIL import Image, ImageDraw, ImageFilter, ImageFont if not COVER_ART.exists(): print(f" Warning: Cover art not found at {COVER_ART}") # Create solid dark background fallback img = Image.new("RGB", (WIDTH, HEIGHT), (20, 15, 30)) img.save(output_path) return True cover = Image.open(COVER_ART).convert("RGB") # Scale cover to fill 1080x1920 (crop to fit) cover_ratio = cover.width / cover.height target_ratio = WIDTH / HEIGHT if cover_ratio > target_ratio: new_h = HEIGHT new_w = int(HEIGHT * cover_ratio) else: new_w = WIDTH new_h = int(WIDTH / cover_ratio) cover = cover.resize((new_w, new_h), Image.LANCZOS) # Center crop left = (new_w - WIDTH) // 2 top = (new_h - HEIGHT) // 2 cover = cover.crop((left, top, left + WIDTH, top + HEIGHT)) # Heavy blur + darken for background bg = cover.filter(ImageFilter.GaussianBlur(radius=30)) from PIL import ImageEnhance bg = ImageEnhance.Brightness(bg).enhance(0.3) # Place sharp cover art centered, sized to ~60% width art_size = int(WIDTH * 0.6) art = Image.open(COVER_ART).convert("RGB") art = art.resize((art_size, art_size), Image.LANCZOS) # Add rounded shadow effect (just darken behind) art_x = (WIDTH - art_size) // 2 art_y = int(HEIGHT * 0.18) bg.paste(art, (art_x, art_y)) # Draw text overlays draw = ImageDraw.Draw(bg) try: font_ep = ImageFont.truetype(FONT_BOLD, 42) font_title = ImageFont.truetype(FONT_BOLD, 56) font_url = ImageFont.truetype(FONT_SEMIBOLD, 32) except OSError: font_ep = ImageFont.truetype("/Library/Fonts/Arial Unicode.ttf", 42) font_title = ImageFont.truetype("/Library/Fonts/Arial Unicode.ttf", 56) font_url = ImageFont.truetype("/Library/Fonts/Arial Unicode.ttf", 32) margin = 60 # Episode label at top ep_text = f"EPISODE {episode_number}" if episode_number else "LUKE AT THE ROOST" draw.text((margin, 80), ep_text, font=font_ep, fill=(255, 200, 80)) # Clip title below episode label # Word wrap the title import textwrap wrapped_title = textwrap.fill(clip_title, width=22) draw.text((margin, 140), wrapped_title, font=font_title, fill=(255, 255, 255)) # Watermark at bottom url_text = "lukeattheroost.com" bbox = draw.textbbox((0, 0), url_text, font=font_url) url_w = bbox[2] - bbox[0] draw.text(((WIDTH - url_w) // 2, HEIGHT - 80), url_text, font=font_url, fill=(255, 200, 80, 200)) bg.save(output_path, "PNG") return True def generate_caption_frames(bg_path: str, caption_lines: list[dict], clip_start: float, duration: float, tmp_dir: Path, fps: int = 10) -> str: """Generate caption frame PNGs and a concat file for ffmpeg. Uses a low FPS (10) since the background is static — only captions change. Returns path to the concat file. """ from PIL import Image, ImageDraw, ImageFont bg = Image.open(bg_path).convert("RGB") try: font_caption = ImageFont.truetype(FONT_BOLD, 52) font_speaker = ImageFont.truetype(FONT_SEMIBOLD, 40) except OSError: font_caption = ImageFont.truetype("/Library/Fonts/Arial Unicode.ttf", 52) font_speaker = ImageFont.truetype("/Library/Fonts/Arial Unicode.ttf", 40) frames_dir = tmp_dir / "frames" frames_dir.mkdir(exist_ok=True) n_frames = int(duration * fps) frame_duration = 1.0 / fps concat_lines = [] prev_state = None # (line_idx, highlighted_word_idx) — only reuse when both match prev_frame_path = None for frame_num in range(n_frames): t = frame_num * frame_duration # Find active caption line active_idx = -1 active_line = None for i, line in enumerate(caption_lines): if line["start"] <= t <= line["end"]: active_idx = i active_line = line break # Find which word is currently highlighted highlight_idx = -1 if active_line: for wi, w in enumerate(active_line["words"]): word_rel_start = w["start"] - clip_start word_rel_end = w["end"] - clip_start if word_rel_start <= t <= word_rel_end: highlight_idx = wi break if highlight_idx == -1: # Between words — highlight the last spoken word for wi in range(len(active_line["words"]) - 1, -1, -1): if t > active_line["words"][wi]["end"] - clip_start: highlight_idx = wi break # Reuse previous frame only if same line AND same highlighted word state = (active_idx, highlight_idx) if state == prev_state and prev_frame_path: concat_lines.append(f"file '{prev_frame_path}'") concat_lines.append(f"duration {frame_duration:.4f}") continue frame = bg.copy() if active_line: draw = ImageDraw.Draw(frame) margin = 60 caption_y = int(HEIGHT * 0.78) # Speaker label if active_line.get("speaker"): for dx, dy in [(-1, -1), (-1, 1), (1, -1), (1, 1)]: draw.text((margin + dx, caption_y - 55 + dy), active_line["speaker"], font=font_speaker, fill=(0, 0, 0)) draw.text((margin, caption_y - 55), active_line["speaker"], font=font_speaker, fill=(255, 200, 80)) # Caption text — all words visible, current word highlighted yellow x = margin y = caption_y for wi, w in enumerate(active_line["words"]): word_text = w["word"] + " " if wi == highlight_idx: color = (255, 200, 80) # Yellow — currently spoken elif wi < highlight_idx or (highlight_idx == -1 and t > w["end"] - clip_start): color = (255, 255, 255) # White — already spoken else: color = (180, 180, 180) # Gray — upcoming bbox = draw.textbbox((0, 0), word_text, font=font_caption) w_width = bbox[2] - bbox[0] # Wrap line if x + w_width > WIDTH - margin: x = margin y += 65 # Outline for dx, dy in [(-2, -2), (-2, 2), (2, -2), (2, 2)]: draw.text((x + dx, y + dy), w["word"], font=font_caption, fill=(0, 0, 0)) draw.text((x, y), w["word"], font=font_caption, fill=color) x += w_width frame_path = str(frames_dir / f"frame_{frame_num:05d}.png") frame.save(frame_path, "PNG") concat_lines.append(f"file '{frame_path}'") concat_lines.append(f"duration {frame_duration:.4f}") prev_state = state prev_frame_path = frame_path # Final frame needs duration too if prev_frame_path: concat_lines.append(f"file '{prev_frame_path}'") concat_lines.append(f"duration {frame_duration:.4f}") concat_path = str(tmp_dir / "concat.txt") with open(concat_path, "w") as f: f.write("\n".join(concat_lines)) return concat_path def generate_clip_video(audio_path: str, background_path: str, caption_lines: list[dict], clip_start: float, output_path: str, duration: float, tmp_dir: Path) -> bool: """Generate clip video with burned-in captions using Pillow + ffmpeg.""" if caption_lines: # Generate frames with captions concat_path = generate_caption_frames( background_path, caption_lines, clip_start, duration, tmp_dir ) cmd = [ "ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", concat_path, "-i", audio_path, "-c:v", "libx264", "-preset", "medium", "-crf", "23", "-pix_fmt", "yuv420p", "-c:a", "aac", "-b:a", "192k", "-t", str(duration), "-shortest", "-r", "30", output_path, ] else: # No captions — just static image + audio cmd = [ "ffmpeg", "-y", "-loop", "1", "-i", background_path, "-i", audio_path, "-c:v", "libx264", "-preset", "medium", "-crf", "23", "-pix_fmt", "yuv420p", "-c:a", "aac", "-b:a", "192k", "-t", str(duration), "-shortest", "-r", "30", output_path, ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: print(f" ffmpeg error: {result.stderr[-300:]}") return False return True def slugify(text: str) -> str: """Convert text to URL-friendly slug.""" slug = re.sub(r'[^a-z0-9]+', '-', text.lower()).strip('-') return slug[:50] def detect_episode_number(audio_path: str) -> int | None: """Try to detect episode number from filename.""" name = Path(audio_path).stem m = re.search(r'(?:episode|ep|podcast)[-_]?(\d+)', name, re.IGNORECASE) if m: return int(m.group(1)) m = re.search(r'(\d+)', name) if m: return int(m.group(1)) return None def main(): parser = argparse.ArgumentParser(description="Extract short-form clips from podcast episodes") parser.add_argument("audio_file", help="Path to episode MP3") parser.add_argument("--transcript", "-t", help="Path to labeled transcript (.txt)") parser.add_argument("--chapters", "-c", help="Path to chapters JSON") parser.add_argument("--count", "-n", type=int, default=3, help="Number of clips to extract (default: 3)") parser.add_argument("--episode-number", "-e", type=int, help="Episode number (auto-detected from filename)") parser.add_argument("--output-dir", "-o", help="Output directory (default: clips/episode-N/)") parser.add_argument("--audio-only", action="store_true", help="Only extract audio clips, skip video") parser.add_argument("--fast-model", default=WHISPER_MODEL_FAST, help=f"Fast Whisper model for clip identification (default: {WHISPER_MODEL_FAST})") parser.add_argument("--quality-model", default=WHISPER_MODEL_QUALITY, help=f"Quality Whisper model for clip refinement (default: {WHISPER_MODEL_QUALITY})") parser.add_argument("--single-pass", action="store_true", help="Use quality model for everything (slower, no two-pass)") args = parser.parse_args() audio_path = Path(args.audio_file).expanduser().resolve() if not audio_path.exists(): print(f"Error: Audio file not found: {audio_path}") sys.exit(1) # Detect episode number episode_number = args.episode_number or detect_episode_number(str(audio_path)) # Resolve output directory if args.output_dir: output_dir = Path(args.output_dir) elif episode_number: output_dir = Path(__file__).parent / "clips" / f"episode-{episode_number}" else: output_dir = Path(__file__).parent / "clips" / audio_path.stem output_dir.mkdir(parents=True, exist_ok=True) print(f"Clip extraction: {audio_path.name}") if episode_number: print(f"Episode: {episode_number}") print(f"Output: {output_dir}") print(f"Clips requested: {args.count}") # Step 1: Load labeled transcript (needed to prime Whisper with names) print(f"\n[1] Loading labeled transcript...") labeled_transcript = "" if args.transcript: transcript_path = Path(args.transcript).expanduser().resolve() if transcript_path.exists(): labeled_transcript = transcript_path.read_text() print(f" Loaded: {transcript_path.name} ({len(labeled_transcript)} chars)") else: print(f" Warning: Transcript not found: {transcript_path}") else: # Auto-detect from website/transcripts/ transcripts_dir = Path(__file__).parent / "website" / "transcripts" if episode_number and transcripts_dir.exists(): for f in transcripts_dir.iterdir(): if f.suffix == ".txt" and f"episode-{episode_number}" in f.name: labeled_transcript = f.read_text() print(f" Auto-detected: {f.name}") break if not labeled_transcript: print(" No labeled transcript found (names may be inaccurate)") # Step 2: Fast transcription for clip identification two_pass = not args.single_pass and args.fast_model != args.quality_model if two_pass: print(f"\n[2/6] Fast transcription for clip identification ({args.fast_model})...") else: print(f"\n[2/5] Transcribing with word-level timestamps ({args.quality_model})...") identify_model = args.fast_model if two_pass else args.quality_model segments = transcribe_with_timestamps( str(audio_path), identify_model, labeled_transcript ) # Build timestamped transcript for LLM transcript_text = get_transcript_text(segments) # Load chapters if provided chapters_json = None if args.chapters: chapters_path = Path(args.chapters).expanduser().resolve() if chapters_path.exists(): with open(chapters_path) as f: chapters_json = f.read() print(f" Chapters loaded: {chapters_path.name}") # Step 3: LLM selects best moments step_total = 6 if two_pass else 5 print(f"\n[3/{step_total}] Selecting {args.count} best moments with LLM...") clips = select_clips_with_llm(transcript_text, labeled_transcript, chapters_json, args.count) # Snap to sentence boundaries so clips don't start/end mid-sentence clips = snap_to_sentences(clips, segments) for i, clip in enumerate(clips): duration = clip["end_time"] - clip["start_time"] print(f" Clip {i+1}: \"{clip['title']}\" " f"({clip['start_time']:.1f}s - {clip['end_time']:.1f}s, {duration:.0f}s)") print(f" \"{clip['caption_text']}\"") # Step 4: Refine clip timestamps with quality model (two-pass only) refined = {} if two_pass: print(f"\n[4/{step_total}] Refining clips with {args.quality_model}...") refined = refine_clip_timestamps( str(audio_path), clips, args.quality_model, labeled_transcript ) # Re-snap to sentence boundaries using refined segments for i, clip in enumerate(clips): if i in refined and refined[i]: clip_segments = refined[i] clips[i:i+1] = snap_to_sentences([clip], clip_segments) # Step N: Extract audio clips extract_step = 5 if two_pass else 4 print(f"\n[{extract_step}/{step_total}] Extracting audio clips...") for i, clip in enumerate(clips): slug = slugify(clip["title"]) mp3_path = output_dir / f"clip-{i+1}-{slug}.mp3" if extract_clip_audio(str(audio_path), clip["start_time"], clip["end_time"], str(mp3_path)): print(f" Clip {i+1} audio: {mp3_path.name}") else: print(f" Error extracting clip {i+1} audio") video_step = 6 if two_pass else 5 if args.audio_only: print(f"\n[{video_step}/{step_total}] Skipped video generation (--audio-only)") print(f"\nDone! {len(clips)} audio clips saved to {output_dir}") return # Step N: Generate video clips print(f"\n[{video_step}/{step_total}] Generating video clips...") with tempfile.TemporaryDirectory() as tmp: tmp_dir = Path(tmp) for i, clip in enumerate(clips): slug = slugify(clip["title"]) mp3_path = output_dir / f"clip-{i+1}-{slug}.mp3" mp4_path = output_dir / f"clip-{i+1}-{slug}.mp4" duration = clip["end_time"] - clip["start_time"] print(f" Clip {i+1}: Generating video...") # Generate background image bg_path = str(tmp_dir / f"bg_{i}.png") generate_background_image(episode_number, clip["title"], bg_path) # Get word timestamps — use refined segments if available word_source = refined[i] if (two_pass and i in refined and refined[i]) else segments clip_words = get_words_in_range(word_source, clip["start_time"], clip["end_time"]) # Add speaker labels clip_words = add_speaker_labels(clip_words, labeled_transcript, clip["start_time"], clip["end_time"], word_source) # Group words into timed caption lines caption_lines = group_words_into_lines( clip_words, clip["start_time"], duration ) # Use a per-clip temp dir for frames clip_tmp = tmp_dir / f"clip_{i}" clip_tmp.mkdir(exist_ok=True) # Composite video if generate_clip_video(str(mp3_path), bg_path, caption_lines, clip["start_time"], str(mp4_path), duration, clip_tmp): file_size = mp4_path.stat().st_size / (1024 * 1024) print(f" Clip {i+1} video: {mp4_path.name} ({file_size:.1f} MB)") else: print(f" Error generating clip {i+1} video") # Summary print(f"\nDone! {len(clips)} clips saved to {output_dir}") for i, clip in enumerate(clips): slug = slugify(clip["title"]) mp4 = output_dir / f"clip-{i+1}-{slug}.mp4" mp3 = output_dir / f"clip-{i+1}-{slug}.mp3" print(f" {i+1}. \"{clip['title']}\"") if mp4.exists(): print(f" Video: {mp4}") if mp3.exists(): print(f" Audio: {mp3}") if __name__ == "__main__": main()