diff --git a/deploy_stats_cron.sh b/deploy_stats_cron.sh
new file mode 100755
index 0000000..991fb09
--- /dev/null
+++ b/deploy_stats_cron.sh
@@ -0,0 +1,73 @@
+#!/bin/bash
+# Deploy podcast_stats.py to NAS as a long-running Docker container that updates hourly.
+#
+# Usage: ./deploy_stats_cron.sh
+
+set -e
+
+NAS_HOST="mmgnas-10g"
+NAS_USER="luke"
+NAS_PORT="8001"
+# QNAP Container Station docker binary (not on the default ssh PATH)
+DOCKER_BIN="/share/CACHEDEV1_DATA/.qpkg/container-station/bin/docker"
+DEPLOY_DIR="/share/CACHEDEV1_DATA/podcast-stats"
+CONTAINER_NAME="podcast-stats"
+
+echo "Deploying podcast stats to NAS..."
+
+# Create deploy dir and copy files
+ssh -p "$NAS_PORT" "$NAS_USER@$NAS_HOST" "mkdir -p $DEPLOY_DIR"
+scp -P "$NAS_PORT" podcast_stats.py "$NAS_USER@$NAS_HOST:$DEPLOY_DIR/podcast_stats.py"
+
+# Create Dockerfile locally, then copy it over (NAS /tmp is tiny)
+TMPFILE=$(mktemp)
+cat > "$TMPFILE" << 'DOCKERFILE'
+FROM python:3.11-slim
+RUN apt-get update && apt-get install -y --no-install-recommends curl && rm -rf /var/lib/apt/lists/* \
+    && curl -fsSL https://download.docker.com/linux/static/stable/x86_64/docker-27.5.1.tgz | tar xz --strip-components=1 -C /usr/local/bin docker/docker \
+    && apt-get purge -y curl && apt-get autoremove -y
+RUN pip install --no-cache-dir requests yt-dlp
+COPY podcast_stats.py /app/podcast_stats.py
+COPY run_loop.sh /app/run_loop.sh
+RUN chmod +x /app/run_loop.sh
+WORKDIR /app
+CMD ["/app/run_loop.sh"]
+DOCKERFILE
+scp -P "$NAS_PORT" "$TMPFILE" "$NAS_USER@$NAS_HOST:$DEPLOY_DIR/Dockerfile"
+rm "$TMPFILE"
+
+# Create the loop script
+TMPFILE=$(mktemp)
+cat > "$TMPFILE" << 'LOOPSCRIPT'
+#!/bin/sh
+echo "podcast-stats: starting hourly loop"
+while true; do
+  echo "$(date -u '+%Y-%m-%dT%H:%M:%SZ') Running stats update..."
+  python podcast_stats.py --json --upload 2>&1 || echo "  ...failed, will retry next hour"
+  echo "Sleeping 1 hour..."
+  sleep 3600
+done
+LOOPSCRIPT
+scp -P "$NAS_PORT" "$TMPFILE" "$NAS_USER@$NAS_HOST:$DEPLOY_DIR/run_loop.sh"
+rm "$TMPFILE"
+
+echo "Building Docker image on NAS..."
+# TMPDIR redirect keeps docker build scratch space off the tiny NAS /tmp
+ssh -p "$NAS_PORT" "$NAS_USER@$NAS_HOST" \
+    "TMPDIR=$DEPLOY_DIR $DOCKER_BIN build -t $CONTAINER_NAME $DEPLOY_DIR"
+
+# Stop old container if running
+ssh -p "$NAS_PORT" "$NAS_USER@$NAS_HOST" \
+    "$DOCKER_BIN rm -f $CONTAINER_NAME 2>/dev/null || true"
+
+# Run as a daemon with auto-restart (survives reboots)
+echo "Starting container..."
+ssh -p "$NAS_PORT" "$NAS_USER@$NAS_HOST" \
+    "$DOCKER_BIN run -d --name $CONTAINER_NAME --restart unless-stopped --network host -v /var/run/docker.sock:/var/run/docker.sock $CONTAINER_NAME"
+
+echo "Verifying..."
+sleep 3
+ssh -p "$NAS_PORT" "$NAS_USER@$NAS_HOST" \
+    "$DOCKER_BIN logs $CONTAINER_NAME 2>&1 | tail -5"
+
+echo ""
+echo "Done! Container runs hourly in a loop with --restart unless-stopped."
+echo "  Logs: ssh -p $NAS_PORT $NAS_USER@$NAS_HOST '$DOCKER_BIN logs -f $CONTAINER_NAME'"
diff --git a/make_clips.py b/make_clips.py
new file mode 100755
index 0000000..862d2cd
--- /dev/null
+++ b/make_clips.py
@@ -0,0 +1,1088 @@
+#!/usr/bin/env python3
+"""Extract the best short-form clips from a podcast episode.
+
+Two-pass pipeline (default):
+  1. Fast Whisper model (base) transcribes full episode for clip identification
+  2. LLM selects best moments
+  3.
Quality Whisper model (large-v3) re-transcribes only selected clips for precise timestamps + +Usage: + python make_clips.py ~/Desktop/episode12.mp3 --count 3 + python make_clips.py ~/Desktop/episode12.mp3 --transcript website/transcripts/episode-12-love-lies-and-loyalty.txt + python make_clips.py ~/Desktop/episode12.mp3 --fast-model small --quality-model large-v3 + python make_clips.py ~/Desktop/episode12.mp3 --single-pass # skip two-pass, use quality model only +""" + +import argparse +import json +import os +import re +import subprocess +import sys +import tempfile +from pathlib import Path + +import requests +from dotenv import load_dotenv + +load_dotenv(Path(__file__).parent / ".env") + +OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY") +WHISPER_MODEL_FAST = "base" +WHISPER_MODEL_QUALITY = "large-v3" +COVER_ART = Path(__file__).parent / "website" / "images" / "cover.png" + +# Fonts +FONT_BOLD = "/Library/Fonts/Montserrat-ExtraBold.ttf" +FONT_MEDIUM = "/Library/Fonts/Montserrat-Medium.ttf" +FONT_SEMIBOLD = "/Library/Fonts/Montserrat-SemiBold.ttf" + +# Video dimensions (9:16 vertical) +WIDTH = 1080 +HEIGHT = 1920 + + +def _build_whisper_prompt(labeled_transcript: str) -> str: + """Build an initial_prompt for Whisper from the labeled transcript. + + Whisper's initial_prompt conditions the model to recognize specific names + and vocabulary. We extract speaker names and the first few lines of dialog. + """ + prompt_parts = ["Luke at the Roost podcast. 
Host: Luke."] + + if labeled_transcript: + # Extract speaker names + names = set(re.findall(r'^([A-Z][A-Z\s\'-]+?):', labeled_transcript, re.MULTILINE)) + caller_names = [n.strip().title() for n in names if n.strip() != "LUKE"] + if caller_names: + prompt_parts.append(f"Callers: {', '.join(caller_names)}.") + + # First ~500 chars of transcript as context (stripped of labels) + stripped = re.sub(r'^[A-Z][A-Z\s\'-]+?:\s*', '', labeled_transcript[:800], flags=re.MULTILINE) + stripped = re.sub(r'\n+', ' ', stripped).strip()[:500] + if stripped: + prompt_parts.append(stripped) + + return " ".join(prompt_parts) + + +def transcribe_with_timestamps(audio_path: str, whisper_model: str = None, + labeled_transcript: str = "") -> list[dict]: + """Transcribe audio with word-level timestamps using faster-whisper. + + Returns list of segments: [{start, end, text, words: [{word, start, end}]}] + """ + model_name = whisper_model or WHISPER_MODEL_QUALITY + cache_path = Path(audio_path).with_suffix(f".whisper_cache_{model_name}.json") + if cache_path.exists(): + print(f" Using cached Whisper output ({model_name})") + with open(cache_path) as f: + return json.load(f) + + try: + from faster_whisper import WhisperModel + except ImportError: + print("Error: faster-whisper not installed. 
Run: pip install faster-whisper") + sys.exit(1) + + initial_prompt = _build_whisper_prompt(labeled_transcript) + print(f" Model: {model_name}") + if labeled_transcript: + print(f" Prompt: {initial_prompt[:100]}...") + model = WhisperModel(model_name, compute_type="float32") + segments_iter, info = model.transcribe( + audio_path, + word_timestamps=True, + initial_prompt=initial_prompt, + language="en", + beam_size=5, + vad_filter=True, + ) + + segments = [] + for seg in segments_iter: + words = [] + if seg.words: + for w in seg.words: + words.append({ + "word": w.word.strip(), + "start": round(w.start, 3), + "end": round(w.end, 3), + }) + segments.append({ + "start": round(seg.start, 3), + "end": round(seg.end, 3), + "text": seg.text.strip(), + "words": words, + }) + + print(f" Transcribed {info.duration:.1f}s ({len(segments)} segments)") + + with open(cache_path, "w") as f: + json.dump(segments, f) + print(f" Cached to {cache_path}") + + return segments + + +def refine_clip_timestamps(audio_path: str, clips: list[dict], + quality_model: str, labeled_transcript: str = "", + ) -> dict[int, list[dict]]: + """Re-transcribe just the selected clip ranges with a high-quality model. + + Extracts each clip segment, runs the quality model on it, and returns + refined segments with timestamps mapped back to the original timeline. + + Returns: {clip_index: [segments]} keyed by clip index + """ + try: + from faster_whisper import WhisperModel + except ImportError: + print("Error: faster-whisper not installed. 
Run: pip install faster-whisper") + sys.exit(1) + + initial_prompt = _build_whisper_prompt(labeled_transcript) + print(f" Refinement model: {quality_model}") + + model = None # Lazy-load so we skip if all cached + refined = {} + + with tempfile.TemporaryDirectory() as tmp: + for i, clip in enumerate(clips): + # Add padding around clip for context (Whisper does better with some lead-in) + pad = 3.0 + seg_start = max(0, clip["start_time"] - pad) + seg_end = clip["end_time"] + pad + + # Check cache first + cache_key = f"{Path(audio_path).stem}_clip{i}_{seg_start:.1f}-{seg_end:.1f}" + cache_path = Path(audio_path).parent / f".whisper_refine_{quality_model}_{cache_key}.json" + if cache_path.exists(): + print(f" Clip {i+1}: Using cached refinement") + with open(cache_path) as f: + refined[i] = json.load(f) + continue + + # Extract clip segment to temp WAV + seg_path = os.path.join(tmp, f"segment_{i}.wav") + cmd = [ + "ffmpeg", "-y", "-ss", str(seg_start), "-t", str(seg_end - seg_start), + "-i", audio_path, "-ar", "16000", "-ac", "1", seg_path, + ] + result = subprocess.run(cmd, capture_output=True, text=True) + if result.returncode != 0: + print(f" Clip {i+1}: Failed to extract segment") + refined[i] = [] + continue + + # Lazy-load model on first non-cached clip + if model is None: + model = WhisperModel(quality_model, compute_type="float32") + + segments_iter, info = model.transcribe( + seg_path, + word_timestamps=True, + initial_prompt=initial_prompt, + language="en", + beam_size=5, + vad_filter=True, + ) + + # Collect segments and offset timestamps back to original timeline + segments = [] + for seg in segments_iter: + words = [] + if seg.words: + for w in seg.words: + words.append({ + "word": w.word.strip(), + "start": round(w.start + seg_start, 3), + "end": round(w.end + seg_start, 3), + }) + segments.append({ + "start": round(seg.start + seg_start, 3), + "end": round(seg.end + seg_start, 3), + "text": seg.text.strip(), + "words": words, + }) + + refined[i] = 
segments + print(f" Clip {i+1}: Refined {info.duration:.1f}s → {len(segments)} segments") + + with open(cache_path, "w") as f: + json.dump(segments, f) + + return refined + + +def get_transcript_text(segments: list[dict]) -> str: + """Build timestamped transcript text for the LLM.""" + lines = [] + for seg in segments: + mins = int(seg["start"] // 60) + secs = int(seg["start"] % 60) + lines.append(f"[{mins:02d}:{secs:02d}] {seg['text']}") + return "\n".join(lines) + + +def select_clips_with_llm(transcript_text: str, labeled_transcript: str, + chapters_json: str | None, count: int) -> list[dict]: + """Ask LLM to pick the best clip-worthy moments.""" + if not OPENROUTER_API_KEY: + print("Error: OPENROUTER_API_KEY not set in .env") + sys.exit(1) + + chapters_context = "" + if chapters_json: + chapters_context = f"\nCHAPTERS:\n{chapters_json}\n" + + labeled_context = "" + if labeled_transcript: + # Truncate if too long — LLM needs the gist, not every word + if len(labeled_transcript) > 12000: + labeled_context = f"\nSPEAKER-LABELED TRANSCRIPT (truncated):\n{labeled_transcript[:12000]}...\n" + else: + labeled_context = f"\nSPEAKER-LABELED TRANSCRIPT:\n{labeled_transcript}\n" + + prompt = f"""You are selecting the {count} best moments from a podcast episode for short-form video clips (TikTok/YouTube Shorts/Reels). + +Each clip should be 30-60 seconds long and contain a single compelling moment — a funny exchange, an emotional beat, a surprising take, or an interesting story. + +TIMESTAMPED TRANSCRIPT: +{transcript_text} +{chapters_context}{labeled_context} +Pick the {count} best moments. For each, return: +- title: A catchy, short title for the clip (max 8 words) +- start_time: Start timestamp in seconds (float). Start a few seconds before the key moment for context. +- end_time: End timestamp in seconds (float). 30-60 seconds after start_time. 
+- caption_text: The key quote or line that makes this moment clip-worthy (1-2 sentences max) + +IMPORTANT: +- Use the timestamps from the transcript to set precise start/end times +- Ensure clips don't overlap +- Prefer moments with back-and-forth dialog over monologues +- Avoid intro/outro segments + +Respond with ONLY a JSON array, no markdown or explanation: +[{{"title": "...", "start_time": 0.0, "end_time": 0.0, "caption_text": "..."}}]""" + + response = requests.post( + "https://openrouter.ai/api/v1/chat/completions", + headers={ + "Authorization": f"Bearer {OPENROUTER_API_KEY}", + "Content-Type": "application/json", + }, + json={ + "model": "anthropic/claude-3.5-sonnet", + "messages": [{"role": "user", "content": prompt}], + "max_tokens": 2048, + "temperature": 0.3, + }, + ) + + if response.status_code != 200: + print(f"Error from OpenRouter: {response.text}") + sys.exit(1) + + content = response.json()["choices"][0]["message"]["content"].strip() + if content.startswith("```"): + content = re.sub(r"^```(?:json)?\n?", "", content) + content = re.sub(r"\n?```$", "", content) + + try: + clips = json.loads(content) + except json.JSONDecodeError as e: + print(f"Error parsing LLM response: {e}") + print(f"Response was: {content[:500]}") + sys.exit(1) + + # Validate and clamp durations + validated = [] + for clip in clips: + duration = clip["end_time"] - clip["start_time"] + if duration < 15: + clip["end_time"] = clip["start_time"] + 30 + elif duration > 75: + clip["end_time"] = clip["start_time"] + 60 + validated.append(clip) + + return validated + + +def snap_to_sentences(clips: list[dict], segments: list[dict]) -> list[dict]: + """Snap clip start/end times to sentence boundaries. + + Uses Whisper segment boundaries and punctuation to find the nearest + sentence start/end so clips don't begin or end mid-sentence. + """ + # Build a list of sentence boundary timestamps from Whisper segments. 
+ # A sentence boundary is: the start of a segment, or a word right after .?! + sentence_starts = [] + sentence_ends = [] + + for seg in segments: + sentence_starts.append(seg["start"]) + sentence_ends.append(seg["end"]) + + # Also find sentence breaks within segments using word punctuation + words = seg.get("words", []) + for i, w in enumerate(words): + if w["word"].rstrip().endswith(('.', '?', '!')): + sentence_ends.append(w["end"]) + if i + 1 < len(words): + sentence_starts.append(words[i + 1]["start"]) + + sentence_starts.sort() + sentence_ends.sort() + + for clip in clips: + original_start = clip["start_time"] + original_end = clip["end_time"] + + # Find nearest sentence start at or before the clip start + # Look up to 5s back for a sentence boundary + best_start = original_start + best_start_dist = float('inf') + for s in sentence_starts: + dist = abs(s - original_start) + if dist < best_start_dist and s <= original_start + 1: + best_start = s + best_start_dist = dist + if s > original_start + 1: + break + + # Find nearest sentence end at or after the clip end + # Look up to 5s forward for a sentence boundary + best_end = original_end + best_end_dist = float('inf') + for e in sentence_ends: + if e < original_end - 5: + continue + dist = abs(e - original_end) + if dist < best_end_dist: + best_end = e + best_end_dist = dist + if e > original_end + 5: + break + + # Make sure we didn't create a clip that's too short or too long + duration = best_end - best_start + if duration < 20: + # Too short — extend end to next sentence boundary + for e in sentence_ends: + if e > best_start + 25: + best_end = e + break + elif duration > 75: + # Too long — pull end back + for e in reversed(sentence_ends): + if best_start + 30 <= e <= best_start + 65: + best_end = e + break + + clip["start_time"] = best_start + clip["end_time"] = best_end + + return clips + + +def get_words_in_range(segments: list[dict], start: float, end: float) -> list[dict]: + """Extract word-level 
timestamps for a time range from Whisper segments.""" + words = [] + for seg in segments: + if seg["end"] < start or seg["start"] > end: + continue + for w in seg.get("words", []): + if w["start"] >= start - 0.5 and w["end"] <= end + 0.5: + words.append(w) + return words + + +def _words_similar(a: str, b: str, max_dist: int = 2) -> bool: + """Check if two words are within edit distance max_dist (Levenshtein).""" + if abs(len(a) - len(b)) > max_dist: + return False + # Simple DP edit distance, bounded + prev = list(range(len(b) + 1)) + for i in range(1, len(a) + 1): + curr = [i] + [0] * len(b) + for j in range(1, len(b) + 1): + cost = 0 if a[i - 1] == b[j - 1] else 1 + curr[j] = min(curr[j - 1] + 1, prev[j] + 1, prev[j - 1] + cost) + prev = curr + return prev[len(b)] <= max_dist + + +def _find_labeled_section(labeled_transcript: str, range_text: str) -> str | None: + """Find the section of labeled transcript matching a Whisper text range.""" + # Strip speaker labels and punctuation from labeled transcript for matching + labeled_stripped = re.sub(r'^[A-Z][A-Z\s\'-]+?:\s*', '', labeled_transcript, flags=re.MULTILINE) + labeled_clean = re.sub(r'[^\w\s]', '', labeled_stripped.lower()) + labeled_clean = re.sub(r'\s+', ' ', labeled_clean) + + whisper_clean = re.sub(r'[^\w\s]', '', range_text.lower()) + whisper_clean = re.sub(r'\s+', ' ', whisper_clean) + whisper_words_list = whisper_clean.split() + + # Try progressively shorter phrases from different positions + for phrase_len in [10, 7, 5, 3]: + for start_offset in [0, len(whisper_words_list) // 3, len(whisper_words_list) // 2]: + words_slice = whisper_words_list[start_offset:start_offset + phrase_len] + phrase = " ".join(words_slice) + if len(phrase) < 8: + continue + pos = labeled_clean.find(phrase) + if pos != -1: + # Map back to original transcript — find first word near this position + match_pos = labeled_transcript.lower().find( + words_slice[0], max(0, pos - 300)) + if match_pos == -1: + match_pos = max(0, pos) + 
else: + match_pos = max(0, match_pos - start_offset * 6) + + context_start = max(0, match_pos - 400) + context_end = min(len(labeled_transcript), match_pos + len(range_text) + 600) + return labeled_transcript[context_start:context_end] + + return None + + +def _parse_labeled_words(labeled_section: str) -> list[tuple[str, str, str]]: + """Parse speaker-labeled text into (original_word, clean_lower, speaker) tuples.""" + result = [] + for m in re.finditer(r'^([A-Z][A-Z\s\'-]+?):\s*(.+?)(?=\n[A-Z][A-Z\s\'-]+?:|\n\n|\Z)', + labeled_section, re.MULTILINE | re.DOTALL): + speaker = m.group(1).strip() + text = m.group(2) + for w in text.split(): + original = w.strip() + clean = re.sub(r"[^\w']", '', original.lower()) + if clean: + result.append((original, clean, speaker)) + return result + + +def add_speaker_labels(words: list[dict], labeled_transcript: str, + start_time: float, end_time: float, + segments: list[dict]) -> list[dict]: + """Add speaker labels AND correct word text using labeled transcript. + + Uses Whisper only for timestamps. Takes text from the labeled transcript, + which has correct names and spelling. Aligns using greedy forward matching + with edit-distance fuzzy matching. 
+ """ + if not labeled_transcript or not words: + return words + + # Get the raw Whisper text for this time range + range_text = "" + for seg in segments: + if seg["end"] < start_time or seg["start"] > end_time: + continue + range_text += " " + seg["text"] + range_text = range_text.strip() + + # Find matching section in labeled transcript + labeled_section = _find_labeled_section(labeled_transcript, range_text) + if not labeled_section: + return words + + labeled_words_flat = _parse_labeled_words(labeled_section) + if not labeled_words_flat: + return words + + # Greedy forward alignment: for each Whisper word, find best match + # in labeled words within a lookahead window + labeled_idx = 0 + current_speaker = labeled_words_flat[0][2] + corrections = 0 + + for word_entry in words: + whisper_clean = re.sub(r"[^\w']", '', word_entry["word"].lower()) + if not whisper_clean: + word_entry["speaker"] = current_speaker + continue + + # Search forward for best match + best_idx = None + best_score = 0 # 2 = exact, 1 = fuzzy + window = min(labeled_idx + 12, len(labeled_words_flat)) + + for j in range(labeled_idx, window): + labeled_clean = labeled_words_flat[j][1] + + if labeled_clean == whisper_clean: + best_idx = j + best_score = 2 + break + + if len(whisper_clean) >= 3 and len(labeled_clean) >= 3: + if _words_similar(whisper_clean, labeled_clean): + if best_score < 1: + best_idx = j + best_score = 1 + # Don't break — keep looking for exact match + + if best_idx is not None: + original_word, _, speaker = labeled_words_flat[best_idx] + current_speaker = speaker + + # Replace Whisper's word with correct version + corrected = re.sub(r'[^\w\s\'-]', '', original_word) + if corrected and corrected.lower() != whisper_clean: + word_entry["word"] = corrected + corrections += 1 + elif corrected: + word_entry["word"] = corrected + + labeled_idx = best_idx + 1 + else: + # No match — advance labeled pointer by 1 to stay roughly in sync + if labeled_idx < len(labeled_words_flat): + 
labeled_idx += 1 + + word_entry["speaker"] = current_speaker + + if corrections: + print(f" Corrected {corrections} words from labeled transcript") + + return words + + +def group_words_into_lines(words: list[dict], clip_start: float, + clip_duration: float) -> list[dict]: + """Group words into timed caption lines for rendering. + + Returns list of: {start, end, speaker, words: [{word, highlighted}]} + """ + if not words: + return [] + + # Group words into display lines (5-7 words per line) + raw_lines = [] + current_line = [] + for w in words: + current_line.append(w) + if len(current_line) >= 6 or w["word"].rstrip().endswith(('.', '?', '!', ',')): + if len(current_line) >= 3: + raw_lines.append(current_line) + current_line = [] + if current_line: + if raw_lines and len(current_line) < 3: + raw_lines[-1].extend(current_line) + else: + raw_lines.append(current_line) + + lines = [] + for line_words in raw_lines: + line_start = line_words[0]["start"] - clip_start + line_end = line_words[-1]["end"] - clip_start + + if line_start < 0: + line_start = 0 + if line_end > clip_duration: + line_end = clip_duration + if line_end <= line_start: + continue + + lines.append({ + "start": line_start, + "end": line_end, + "speaker": line_words[0].get("speaker", ""), + "words": line_words, + }) + + return lines + + +def extract_clip_audio(audio_path: str, start: float, end: float, + output_path: str) -> bool: + """Extract audio clip with fade in/out.""" + duration = end - start + fade_in = 0.3 + fade_out = 0.5 + + af = f"afade=t=in:d={fade_in},afade=t=out:st={duration - fade_out}:d={fade_out}" + cmd = [ + "ffmpeg", "-y", + "-ss", str(start), + "-t", str(duration), + "-i", audio_path, + "-af", af, + "-ab", "192k", + output_path, + ] + + result = subprocess.run(cmd, capture_output=True, text=True) + return result.returncode == 0 + + +def generate_background_image(episode_number: int, clip_title: str, + output_path: str) -> bool: + """Generate 9:16 vertical background with 
blurred/cropped cover art.""" + from PIL import Image, ImageDraw, ImageFilter, ImageFont + + if not COVER_ART.exists(): + print(f" Warning: Cover art not found at {COVER_ART}") + # Create solid dark background fallback + img = Image.new("RGB", (WIDTH, HEIGHT), (20, 15, 30)) + img.save(output_path) + return True + + cover = Image.open(COVER_ART).convert("RGB") + + # Scale cover to fill 1080x1920 (crop to fit) + cover_ratio = cover.width / cover.height + target_ratio = WIDTH / HEIGHT + + if cover_ratio > target_ratio: + new_h = HEIGHT + new_w = int(HEIGHT * cover_ratio) + else: + new_w = WIDTH + new_h = int(WIDTH / cover_ratio) + + cover = cover.resize((new_w, new_h), Image.LANCZOS) + + # Center crop + left = (new_w - WIDTH) // 2 + top = (new_h - HEIGHT) // 2 + cover = cover.crop((left, top, left + WIDTH, top + HEIGHT)) + + # Heavy blur + darken for background + bg = cover.filter(ImageFilter.GaussianBlur(radius=30)) + from PIL import ImageEnhance + bg = ImageEnhance.Brightness(bg).enhance(0.3) + + # Place sharp cover art centered, sized to ~60% width + art_size = int(WIDTH * 0.6) + art = Image.open(COVER_ART).convert("RGB") + art = art.resize((art_size, art_size), Image.LANCZOS) + + # Add rounded shadow effect (just darken behind) + art_x = (WIDTH - art_size) // 2 + art_y = int(HEIGHT * 0.18) + bg.paste(art, (art_x, art_y)) + + # Draw text overlays + draw = ImageDraw.Draw(bg) + + try: + font_ep = ImageFont.truetype(FONT_BOLD, 42) + font_title = ImageFont.truetype(FONT_BOLD, 56) + font_url = ImageFont.truetype(FONT_SEMIBOLD, 32) + except OSError: + font_ep = ImageFont.truetype("/Library/Fonts/Arial Unicode.ttf", 42) + font_title = ImageFont.truetype("/Library/Fonts/Arial Unicode.ttf", 56) + font_url = ImageFont.truetype("/Library/Fonts/Arial Unicode.ttf", 32) + + margin = 60 + + # Episode label at top + ep_text = f"EPISODE {episode_number}" if episode_number else "LUKE AT THE ROOST" + draw.text((margin, 80), ep_text, font=font_ep, fill=(255, 200, 80)) + + # Clip title 
below episode label + # Word wrap the title + import textwrap + wrapped_title = textwrap.fill(clip_title, width=22) + draw.text((margin, 140), wrapped_title, font=font_title, fill=(255, 255, 255)) + + # Watermark at bottom + url_text = "lukeattheroost.com" + bbox = draw.textbbox((0, 0), url_text, font=font_url) + url_w = bbox[2] - bbox[0] + draw.text(((WIDTH - url_w) // 2, HEIGHT - 80), url_text, + font=font_url, fill=(255, 200, 80, 200)) + + bg.save(output_path, "PNG") + return True + + +def generate_caption_frames(bg_path: str, caption_lines: list[dict], + clip_start: float, duration: float, + tmp_dir: Path, fps: int = 10) -> str: + """Generate caption frame PNGs and a concat file for ffmpeg. + + Uses a low FPS (10) since the background is static — only captions change. + Returns path to the concat file. + """ + from PIL import Image, ImageDraw, ImageFont + + bg = Image.open(bg_path).convert("RGB") + + try: + font_caption = ImageFont.truetype(FONT_BOLD, 52) + font_speaker = ImageFont.truetype(FONT_SEMIBOLD, 40) + except OSError: + font_caption = ImageFont.truetype("/Library/Fonts/Arial Unicode.ttf", 52) + font_speaker = ImageFont.truetype("/Library/Fonts/Arial Unicode.ttf", 40) + + frames_dir = tmp_dir / "frames" + frames_dir.mkdir(exist_ok=True) + + n_frames = int(duration * fps) + frame_duration = 1.0 / fps + + concat_lines = [] + + prev_state = None # (line_idx, highlighted_word_idx) — only reuse when both match + prev_frame_path = None + + for frame_num in range(n_frames): + t = frame_num * frame_duration + + # Find active caption line + active_idx = -1 + active_line = None + for i, line in enumerate(caption_lines): + if line["start"] <= t <= line["end"]: + active_idx = i + active_line = line + break + + # Find which word is currently highlighted + highlight_idx = -1 + if active_line: + for wi, w in enumerate(active_line["words"]): + word_rel_start = w["start"] - clip_start + word_rel_end = w["end"] - clip_start + if word_rel_start <= t <= word_rel_end: + 
highlight_idx = wi + break + if highlight_idx == -1: + # Between words — highlight the last spoken word + for wi in range(len(active_line["words"]) - 1, -1, -1): + if t > active_line["words"][wi]["end"] - clip_start: + highlight_idx = wi + break + + # Reuse previous frame only if same line AND same highlighted word + state = (active_idx, highlight_idx) + if state == prev_state and prev_frame_path: + concat_lines.append(f"file '{prev_frame_path}'") + concat_lines.append(f"duration {frame_duration:.4f}") + continue + + frame = bg.copy() + + if active_line: + draw = ImageDraw.Draw(frame) + margin = 60 + caption_y = int(HEIGHT * 0.78) + + # Speaker label + if active_line.get("speaker"): + for dx, dy in [(-1, -1), (-1, 1), (1, -1), (1, 1)]: + draw.text((margin + dx, caption_y - 55 + dy), + active_line["speaker"], + font=font_speaker, fill=(0, 0, 0)) + draw.text((margin, caption_y - 55), active_line["speaker"], + font=font_speaker, fill=(255, 200, 80)) + + # Caption text — all words visible, current word highlighted yellow + x = margin + y = caption_y + for wi, w in enumerate(active_line["words"]): + word_text = w["word"] + " " + + if wi == highlight_idx: + color = (255, 200, 80) # Yellow — currently spoken + elif wi < highlight_idx or (highlight_idx == -1 and t > w["end"] - clip_start): + color = (255, 255, 255) # White — already spoken + else: + color = (180, 180, 180) # Gray — upcoming + + bbox = draw.textbbox((0, 0), word_text, font=font_caption) + w_width = bbox[2] - bbox[0] + + # Wrap line + if x + w_width > WIDTH - margin: + x = margin + y += 65 + + # Outline + for dx, dy in [(-2, -2), (-2, 2), (2, -2), (2, 2)]: + draw.text((x + dx, y + dy), w["word"], + font=font_caption, fill=(0, 0, 0)) + + draw.text((x, y), w["word"], font=font_caption, fill=color) + x += w_width + + frame_path = str(frames_dir / f"frame_{frame_num:05d}.png") + frame.save(frame_path, "PNG") + + concat_lines.append(f"file '{frame_path}'") + concat_lines.append(f"duration {frame_duration:.4f}") + 
+ prev_state = state + prev_frame_path = frame_path + + # Final frame needs duration too + if prev_frame_path: + concat_lines.append(f"file '{prev_frame_path}'") + concat_lines.append(f"duration {frame_duration:.4f}") + + concat_path = str(tmp_dir / "concat.txt") + with open(concat_path, "w") as f: + f.write("\n".join(concat_lines)) + + return concat_path + + +def generate_clip_video(audio_path: str, background_path: str, + caption_lines: list[dict], clip_start: float, + output_path: str, duration: float, + tmp_dir: Path) -> bool: + """Generate clip video with burned-in captions using Pillow + ffmpeg.""" + if caption_lines: + # Generate frames with captions + concat_path = generate_caption_frames( + background_path, caption_lines, clip_start, duration, tmp_dir + ) + + cmd = [ + "ffmpeg", "-y", + "-f", "concat", "-safe", "0", "-i", concat_path, + "-i", audio_path, + "-c:v", "libx264", "-preset", "medium", "-crf", "23", + "-pix_fmt", "yuv420p", + "-c:a", "aac", "-b:a", "192k", + "-t", str(duration), + "-shortest", + "-r", "30", + output_path, + ] + else: + # No captions — just static image + audio + cmd = [ + "ffmpeg", "-y", + "-loop", "1", "-i", background_path, + "-i", audio_path, + "-c:v", "libx264", "-preset", "medium", "-crf", "23", + "-pix_fmt", "yuv420p", + "-c:a", "aac", "-b:a", "192k", + "-t", str(duration), + "-shortest", + "-r", "30", + output_path, + ] + + result = subprocess.run(cmd, capture_output=True, text=True) + if result.returncode != 0: + print(f" ffmpeg error: {result.stderr[-300:]}") + return False + return True + + +def slugify(text: str) -> str: + """Convert text to URL-friendly slug.""" + slug = re.sub(r'[^a-z0-9]+', '-', text.lower()).strip('-') + return slug[:50] + + +def detect_episode_number(audio_path: str) -> int | None: + """Try to detect episode number from filename.""" + name = Path(audio_path).stem + m = re.search(r'(?:episode|ep|podcast)[-_]?(\d+)', name, re.IGNORECASE) + if m: + return int(m.group(1)) + m = re.search(r'(\d+)', 
name) + if m: + return int(m.group(1)) + return None + + +def main(): + parser = argparse.ArgumentParser(description="Extract short-form clips from podcast episodes") + parser.add_argument("audio_file", help="Path to episode MP3") + parser.add_argument("--transcript", "-t", help="Path to labeled transcript (.txt)") + parser.add_argument("--chapters", "-c", help="Path to chapters JSON") + parser.add_argument("--count", "-n", type=int, default=3, help="Number of clips to extract (default: 3)") + parser.add_argument("--episode-number", "-e", type=int, help="Episode number (auto-detected from filename)") + parser.add_argument("--output-dir", "-o", help="Output directory (default: clips/episode-N/)") + parser.add_argument("--audio-only", action="store_true", help="Only extract audio clips, skip video") + parser.add_argument("--fast-model", default=WHISPER_MODEL_FAST, + help=f"Fast Whisper model for clip identification (default: {WHISPER_MODEL_FAST})") + parser.add_argument("--quality-model", default=WHISPER_MODEL_QUALITY, + help=f"Quality Whisper model for clip refinement (default: {WHISPER_MODEL_QUALITY})") + parser.add_argument("--single-pass", action="store_true", + help="Use quality model for everything (slower, no two-pass)") + args = parser.parse_args() + + audio_path = Path(args.audio_file).expanduser().resolve() + if not audio_path.exists(): + print(f"Error: Audio file not found: {audio_path}") + sys.exit(1) + + # Detect episode number + episode_number = args.episode_number or detect_episode_number(str(audio_path)) + + # Resolve output directory + if args.output_dir: + output_dir = Path(args.output_dir) + elif episode_number: + output_dir = Path(__file__).parent / "clips" / f"episode-{episode_number}" + else: + output_dir = Path(__file__).parent / "clips" / audio_path.stem + output_dir.mkdir(parents=True, exist_ok=True) + + print(f"Clip extraction: {audio_path.name}") + if episode_number: + print(f"Episode: {episode_number}") + print(f"Output: {output_dir}") + 
print(f"Clips requested: {args.count}") + + # Step 1: Load labeled transcript (needed to prime Whisper with names) + print(f"\n[1] Loading labeled transcript...") + labeled_transcript = "" + if args.transcript: + transcript_path = Path(args.transcript).expanduser().resolve() + if transcript_path.exists(): + labeled_transcript = transcript_path.read_text() + print(f" Loaded: {transcript_path.name} ({len(labeled_transcript)} chars)") + else: + print(f" Warning: Transcript not found: {transcript_path}") + else: + # Auto-detect from website/transcripts/ + transcripts_dir = Path(__file__).parent / "website" / "transcripts" + if episode_number and transcripts_dir.exists(): + for f in transcripts_dir.iterdir(): + if f.suffix == ".txt" and f"episode-{episode_number}" in f.name: + labeled_transcript = f.read_text() + print(f" Auto-detected: {f.name}") + break + if not labeled_transcript: + print(" No labeled transcript found (names may be inaccurate)") + + # Step 2: Fast transcription for clip identification + two_pass = not args.single_pass and args.fast_model != args.quality_model + if two_pass: + print(f"\n[2/6] Fast transcription for clip identification ({args.fast_model})...") + else: + print(f"\n[2/5] Transcribing with word-level timestamps ({args.quality_model})...") + identify_model = args.fast_model if two_pass else args.quality_model + segments = transcribe_with_timestamps( + str(audio_path), identify_model, labeled_transcript + ) + + # Build timestamped transcript for LLM + transcript_text = get_transcript_text(segments) + + # Load chapters if provided + chapters_json = None + if args.chapters: + chapters_path = Path(args.chapters).expanduser().resolve() + if chapters_path.exists(): + with open(chapters_path) as f: + chapters_json = f.read() + print(f" Chapters loaded: {chapters_path.name}") + + # Step 3: LLM selects best moments + step_total = 6 if two_pass else 5 + print(f"\n[3/{step_total}] Selecting {args.count} best moments with LLM...") + clips = 
select_clips_with_llm(transcript_text, labeled_transcript, + chapters_json, args.count) + + # Snap to sentence boundaries so clips don't start/end mid-sentence + clips = snap_to_sentences(clips, segments) + + for i, clip in enumerate(clips): + duration = clip["end_time"] - clip["start_time"] + print(f" Clip {i+1}: \"{clip['title']}\" " + f"({clip['start_time']:.1f}s - {clip['end_time']:.1f}s, {duration:.0f}s)") + print(f" \"{clip['caption_text']}\"") + + # Step 4: Refine clip timestamps with quality model (two-pass only) + refined = {} + if two_pass: + print(f"\n[4/{step_total}] Refining clips with {args.quality_model}...") + refined = refine_clip_timestamps( + str(audio_path), clips, args.quality_model, labeled_transcript + ) + # Re-snap to sentence boundaries using refined segments + for i, clip in enumerate(clips): + if i in refined and refined[i]: + clip_segments = refined[i] + clips[i:i+1] = snap_to_sentences([clip], clip_segments) + + # Step N: Extract audio clips + extract_step = 5 if two_pass else 4 + print(f"\n[{extract_step}/{step_total}] Extracting audio clips...") + for i, clip in enumerate(clips): + slug = slugify(clip["title"]) + mp3_path = output_dir / f"clip-{i+1}-{slug}.mp3" + + if extract_clip_audio(str(audio_path), clip["start_time"], clip["end_time"], + str(mp3_path)): + print(f" Clip {i+1} audio: {mp3_path.name}") + else: + print(f" Error extracting clip {i+1} audio") + + video_step = 6 if two_pass else 5 + if args.audio_only: + print(f"\n[{video_step}/{step_total}] Skipped video generation (--audio-only)") + print(f"\nDone! 
{len(clips)} audio clips saved to {output_dir}") + return + + # Step N: Generate video clips + print(f"\n[{video_step}/{step_total}] Generating video clips...") + + with tempfile.TemporaryDirectory() as tmp: + tmp_dir = Path(tmp) + + for i, clip in enumerate(clips): + slug = slugify(clip["title"]) + mp3_path = output_dir / f"clip-{i+1}-{slug}.mp3" + mp4_path = output_dir / f"clip-{i+1}-{slug}.mp4" + duration = clip["end_time"] - clip["start_time"] + + print(f" Clip {i+1}: Generating video...") + + # Generate background image + bg_path = str(tmp_dir / f"bg_{i}.png") + generate_background_image(episode_number, clip["title"], bg_path) + + # Get word timestamps — use refined segments if available + word_source = refined[i] if (two_pass and i in refined and refined[i]) else segments + clip_words = get_words_in_range(word_source, clip["start_time"], clip["end_time"]) + + # Add speaker labels + clip_words = add_speaker_labels(clip_words, labeled_transcript, + clip["start_time"], clip["end_time"], + word_source) + + # Group words into timed caption lines + caption_lines = group_words_into_lines( + clip_words, clip["start_time"], duration + ) + + # Use a per-clip temp dir for frames + clip_tmp = tmp_dir / f"clip_{i}" + clip_tmp.mkdir(exist_ok=True) + + # Composite video + if generate_clip_video(str(mp3_path), bg_path, caption_lines, + clip["start_time"], str(mp4_path), + duration, clip_tmp): + file_size = mp4_path.stat().st_size / (1024 * 1024) + print(f" Clip {i+1} video: {mp4_path.name} ({file_size:.1f} MB)") + else: + print(f" Error generating clip {i+1} video") + + # Summary + print(f"\nDone! {len(clips)} clips saved to {output_dir}") + for i, clip in enumerate(clips): + slug = slugify(clip["title"]) + mp4 = output_dir / f"clip-{i+1}-{slug}.mp4" + mp3 = output_dir / f"clip-{i+1}-{slug}.mp3" + print(f" {i+1}. 
\"{clip['title']}\"") + if mp4.exists(): + print(f" Video: {mp4}") + if mp3.exists(): + print(f" Audio: {mp3}") + + +if __name__ == "__main__": + main() diff --git a/podcast_stats.py b/podcast_stats.py index c5191da..1ff5fc3 100644 --- a/podcast_stats.py +++ b/podcast_stats.py @@ -38,6 +38,18 @@ BUNNY_STORAGE_REGION = "la" BUNNY_ACCOUNT_KEY = "2865f279-297b-431a-ad18-0ccf1f8e4fa8cf636cea-3222-415a-84ed-56ee195c0530" +def _find_ytdlp(): + """Find yt-dlp: check local venv first, then fall back to PATH.""" + import shutil + venv_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "venv", "bin", "yt-dlp") + if os.path.exists(venv_path): + return venv_path + path_bin = shutil.which("yt-dlp") + if path_bin: + return path_bin + return "yt-dlp" + + def gather_apple_reviews(): all_reviews = [] seen_ids = set() @@ -129,7 +141,7 @@ def gather_youtube(include_comments=False): try: proc = subprocess.run( - [os.path.join(os.path.dirname(os.path.abspath(__file__)), "venv", "bin", "yt-dlp"), "--dump-json", "--flat-playlist", + [_find_ytdlp(), "--dump-json", "--flat-playlist", f"https://www.youtube.com/playlist?list={YOUTUBE_PLAYLIST}"], capture_output=True, text=True, timeout=60 ) @@ -160,7 +172,7 @@ def gather_youtube(include_comments=False): for vid in video_ids: try: - cmd = [os.path.join(os.path.dirname(os.path.abspath(__file__)), "venv", "bin", "yt-dlp"), "--dump-json", "--no-download", f"https://www.youtube.com/watch?v={vid}"] + cmd = [_find_ytdlp(), "--dump-json", "--no-download", f"https://www.youtube.com/watch?v={vid}"] if include_comments: cmd.insert(2, "--write-comments") vr = subprocess.run(cmd, capture_output=True, text=True, timeout=90) @@ -204,7 +216,7 @@ def gather_youtube(include_comments=False): if videos: try: vr = subprocess.run( - [os.path.join(os.path.dirname(os.path.abspath(__file__)), "venv", "bin", "yt-dlp"), "--dump-json", "--no-download", "--playlist-items", "1", + [_find_ytdlp(), "--dump-json", "--no-download", "--playlist-items", "1", 
f"https://www.youtube.com/playlist?list={YOUTUBE_PLAYLIST}"], capture_output=True, text=True, timeout=30 ) @@ -224,10 +236,21 @@ def gather_youtube(include_comments=False): def _run_db_query(sql): - cmd = [ - "ssh", "-p", NAS_SSH_PORT, NAS_SSH, - f"{DOCKER_BIN} exec -i {CASTOPOD_DB_CONTAINER} mysql -u castopod -pBYtbFfk3ndeVabb26xb0UyKU castopod -N" - ] + # If running on NAS (docker socket available), exec directly + docker_bin = None + for path in [DOCKER_BIN, "/usr/bin/docker", "/usr/local/bin/docker"]: + if os.path.exists(path): + docker_bin = path + break + + if docker_bin: + cmd = [docker_bin, "exec", "-i", CASTOPOD_DB_CONTAINER, + "mysql", "-u", "castopod", "-pBYtbFfk3ndeVabb26xb0UyKU", "castopod", "-N"] + else: + cmd = [ + "ssh", "-p", NAS_SSH_PORT, NAS_SSH, + f"{DOCKER_BIN} exec -i {CASTOPOD_DB_CONTAINER} mysql -u castopod -pBYtbFfk3ndeVabb26xb0UyKU castopod -N" + ] try: proc = subprocess.run(cmd, input=sql, capture_output=True, text=True, timeout=30) stderr = proc.stderr.strip() @@ -236,7 +259,7 @@ def _run_db_query(sql): return None, stderr return stdout, None except subprocess.TimeoutExpired: - return None, "SSH timeout" + return None, "timeout" except Exception as e: return None, str(e) diff --git a/publish_episode.py b/publish_episode.py index a1c746d..8109037 100755 --- a/publish_episode.py +++ b/publish_episode.py @@ -60,7 +60,7 @@ PODCAST_ID = 1 PODCAST_HANDLE = "LukeAtTheRoost" OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY") -WHISPER_MODEL = "base" # Options: tiny, base, small, medium, large +WHISPER_MODEL = "large-v3" # Postiz (social media posting) POSTIZ_URL = "https://social.lukeattheroost.com"