Ep13 publish, MLX whisper, voicemail system, hero redesign, massive topic expansion
- Switch whisper transcription from faster-whisper (CPU) to lightning-whisper-mlx (GPU) - Fix word_timestamps hanging, use ffprobe for accurate duration - Add Cloudflare Pages Worker for SignalWire voicemail fallback when server offline - Add voicemail sync on startup, delete tracking, save feature - Add /feed RSS proxy to _worker.js (was broken by worker taking over routing) - Redesign website hero section: ghost buttons, compact phone, plain text links - Rewrite caller prompts for faster point-getting and host-following - Expand TOPIC_CALLIN from ~250 to 547 entries across 34 categories - Add new categories: biology, psychology, engineering, math, geology, animals, work, money, books, movies, relationships, health, language, true crime, drunk/high/unhinged callers - Remove bad Inworld voices (Pixie, Dominus), reduce repeat caller frequency - Add audio monitor device routing, uvicorn --reload-dir fix - Publish episode 13 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
542
make_clips.py
542
make_clips.py
@@ -20,6 +20,7 @@ import re
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import xml.etree.ElementTree as ET
|
||||
from pathlib import Path
|
||||
|
||||
import requests
|
||||
@@ -28,6 +29,8 @@ from dotenv import load_dotenv
|
||||
load_dotenv(Path(__file__).parent / ".env")
|
||||
|
||||
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
|
||||
RSS_FEED_URL = "https://podcast.macneilmediagroup.com/@LukeAtTheRoost/feed.xml"
|
||||
EPISODE_CACHE_DIR = Path(__file__).parent / "clips" / ".episode-cache"
|
||||
WHISPER_MODEL_FAST = "base"
|
||||
WHISPER_MODEL_QUALITY = "large-v3"
|
||||
COVER_ART = Path(__file__).parent / "website" / "images" / "cover.png"
|
||||
@@ -273,7 +276,7 @@ Respond with ONLY a JSON array, no markdown or explanation:
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
json={
|
||||
"model": "anthropic/claude-3.5-sonnet",
|
||||
"model": "anthropic/claude-sonnet-4-5",
|
||||
"messages": [{"role": "user", "content": prompt}],
|
||||
"max_tokens": 2048,
|
||||
"temperature": 0.3,
|
||||
@@ -309,6 +312,70 @@ Respond with ONLY a JSON array, no markdown or explanation:
|
||||
return validated
|
||||
|
||||
|
||||
def generate_social_metadata(clips: list[dict], labeled_transcript: str,
                             episode_number: int | None) -> list[dict]:
    """Generate social media descriptions and hashtags for each clip.

    Sends the clip titles/captions to OpenRouter and asks for a short
    description plus 5-8 hashtags per clip.  Enrichment is best-effort:
    on any API, network, or parsing failure the clips are returned
    unmodified.

    Args:
        clips: Clip dicts with at least "title" and "caption_text".
        labeled_transcript: Full labeled transcript (currently unused;
            kept for interface symmetry with the other LLM helpers).
        episode_number: Episode number for prompt context, or None.

    Returns:
        The same clip list, with "description" and "hashtags" keys added
        to each clip on success.
    """
    if not OPENROUTER_API_KEY:
        print("Error: OPENROUTER_API_KEY not set in .env")
        sys.exit(1)

    clips_summary = "\n".join(
        f'{i+1}. "{c["title"]}" — {c["caption_text"]}'
        for i, c in enumerate(clips)
    )

    episode_context = f"This is Episode {episode_number} of " if episode_number else "This is an episode of "

    prompt = f"""{episode_context}the "Luke at the Roost" podcast — a late-night call-in show where AI-generated callers share stories, confessions, and hot takes with host Luke.

Here are {len(clips)} clips selected from this episode:

{clips_summary}

For each clip, generate:
1. description: A short, engaging description for social media (1-2 sentences, hook the viewer, conversational tone). Do NOT include hashtags in the description.
2. hashtags: An array of 5-8 hashtags. Always include #lukeattheroost and #podcast. Add topic-relevant and trending-style tags.

Respond with ONLY a JSON array matching the clip order:
[{{"description": "...", "hashtags": ["#tag1", "#tag2", ...]}}]"""

    try:
        # Timeout prevents an indefinite hang; other requests calls in this
        # file use 15-30s, LLM completions need longer.
        response = requests.post(
            "https://openrouter.ai/api/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {OPENROUTER_API_KEY}",
                "Content-Type": "application/json",
            },
            json={
                "model": "anthropic/claude-sonnet-4-5",
                "messages": [{"role": "user", "content": prompt}],
                "max_tokens": 2048,
                "temperature": 0.7,
            },
            timeout=120,
        )
    except requests.RequestException as e:
        # Network failure: social metadata is optional, keep going.
        print(f"Error calling OpenRouter: {e}")
        return clips

    if response.status_code != 200:
        print(f"Error from OpenRouter: {response.text}")
        return clips

    content = response.json()["choices"][0]["message"]["content"].strip()
    # Strip optional markdown code fences the model sometimes adds.
    if content.startswith("```"):
        content = re.sub(r"^```(?:json)?\n?", "", content)
        content = re.sub(r"\n?```$", "", content)

    try:
        metadata = json.loads(content)
    except json.JSONDecodeError as e:
        print(f"Error parsing social metadata: {e}")
        return clips

    if not isinstance(metadata, list):
        # Model ignored the "JSON array" instruction; don't index into it.
        print("Error: social metadata response is not a JSON array")
        return clips

    for i, clip in enumerate(clips):
        if i < len(metadata):
            clip["description"] = metadata[i].get("description", "")
            clip["hashtags"] = metadata[i].get("hashtags", [])

    return clips
|
||||
|
||||
|
||||
def snap_to_sentences(clips: list[dict], segments: list[dict]) -> list[dict]:
|
||||
"""Snap clip start/end times to sentence boundaries.
|
||||
|
||||
@@ -398,11 +465,10 @@ def get_words_in_range(segments: list[dict], start: float, end: float) -> list[d
|
||||
return words
|
||||
|
||||
|
||||
def _words_similar(a: str, b: str, max_dist: int = 2) -> bool:
|
||||
"""Check if two words are within edit distance max_dist (Levenshtein)."""
|
||||
if abs(len(a) - len(b)) > max_dist:
|
||||
return False
|
||||
# Simple DP edit distance, bounded
|
||||
def _edit_distance(a: str, b: str) -> int:
|
||||
"""Levenshtein edit distance between two strings."""
|
||||
if abs(len(a) - len(b)) > 5:
|
||||
return max(len(a), len(b))
|
||||
prev = list(range(len(b) + 1))
|
||||
for i in range(1, len(a) + 1):
|
||||
curr = [i] + [0] * len(b)
|
||||
@@ -410,139 +476,204 @@ def _words_similar(a: str, b: str, max_dist: int = 2) -> bool:
|
||||
cost = 0 if a[i - 1] == b[j - 1] else 1
|
||||
curr[j] = min(curr[j - 1] + 1, prev[j] + 1, prev[j - 1] + cost)
|
||||
prev = curr
|
||||
return prev[len(b)] <= max_dist
|
||||
return prev[len(b)]
|
||||
|
||||
|
||||
def _find_labeled_section(labeled_transcript: str, range_text: str) -> str | None:
|
||||
"""Find the section of labeled transcript matching a Whisper text range."""
|
||||
# Strip speaker labels and punctuation from labeled transcript for matching
|
||||
labeled_stripped = re.sub(r'^[A-Z][A-Z\s\'-]+?:\s*', '', labeled_transcript, flags=re.MULTILINE)
|
||||
labeled_clean = re.sub(r'[^\w\s]', '', labeled_stripped.lower())
|
||||
labeled_clean = re.sub(r'\s+', ' ', labeled_clean)
|
||||
|
||||
whisper_clean = re.sub(r'[^\w\s]', '', range_text.lower())
|
||||
whisper_clean = re.sub(r'\s+', ' ', whisper_clean)
|
||||
whisper_words_list = whisper_clean.split()
|
||||
|
||||
# Try progressively shorter phrases from different positions
|
||||
for phrase_len in [10, 7, 5, 3]:
|
||||
for start_offset in [0, len(whisper_words_list) // 3, len(whisper_words_list) // 2]:
|
||||
words_slice = whisper_words_list[start_offset:start_offset + phrase_len]
|
||||
phrase = " ".join(words_slice)
|
||||
if len(phrase) < 8:
|
||||
continue
|
||||
pos = labeled_clean.find(phrase)
|
||||
if pos != -1:
|
||||
# Map back to original transcript — find first word near this position
|
||||
match_pos = labeled_transcript.lower().find(
|
||||
words_slice[0], max(0, pos - 300))
|
||||
if match_pos == -1:
|
||||
match_pos = max(0, pos)
|
||||
else:
|
||||
match_pos = max(0, match_pos - start_offset * 6)
|
||||
|
||||
context_start = max(0, match_pos - 400)
|
||||
context_end = min(len(labeled_transcript), match_pos + len(range_text) + 600)
|
||||
return labeled_transcript[context_start:context_end]
|
||||
|
||||
return None
|
||||
def _word_score(a: str, b: str) -> int:
|
||||
"""Alignment score: +2 exact, +1 fuzzy (edit dist ≤2), -1 mismatch."""
|
||||
if a == b:
|
||||
return 2
|
||||
if len(a) >= 3 and len(b) >= 3 and _edit_distance(a, b) <= 2:
|
||||
return 1
|
||||
return -1
|
||||
|
||||
|
||||
def _parse_labeled_words(labeled_section: str) -> list[tuple[str, str, str]]:
|
||||
"""Parse speaker-labeled text into (original_word, clean_lower, speaker) tuples."""
|
||||
def _align_sequences(whisper_words: list[str],
                     labeled_words: list[str]) -> list[tuple[int | None, int | None]]:
    """Needleman-Wunsch DP alignment between whisper and labeled word sequences.

    Returns list of (whisper_idx, labeled_idx) pairs where None = gap.
    Pairing scores come from _word_score (+2 exact, +1 fuzzy, -1 mismatch);
    gaps cost -1 each.  Pairs are returned in sequence order.
    """
    n = len(whisper_words)
    m = len(labeled_words)
    GAP = -1  # linear gap penalty for an unmatched word on either side

    # Build score matrix: score[i][j] = best alignment score for the first
    # i whisper words vs the first j labeled words.  Row/column 0 are the
    # all-gaps base cases.
    score = [[0] * (m + 1) for _ in range(n + 1)]
    for i in range(1, n + 1):
        score[i][0] = score[i - 1][0] + GAP
    for j in range(1, m + 1):
        score[0][j] = score[0][j - 1] + GAP

    for i in range(1, n + 1):
        for j in range(1, m + 1):
            match = score[i - 1][j - 1] + _word_score(whisper_words[i - 1], labeled_words[j - 1])
            delete = score[i - 1][j] + GAP
            insert = score[i][j - 1] + GAP
            score[i][j] = max(match, delete, insert)

    # Traceback: walk from the bottom-right corner back to the origin,
    # preferring diagonal (match) moves on ties, then whisper-side gaps.
    pairs = []
    i, j = n, m
    while i > 0 or j > 0:
        if i > 0 and j > 0 and score[i][j] == score[i - 1][j - 1] + _word_score(whisper_words[i - 1], labeled_words[j - 1]):
            pairs.append((i - 1, j - 1))
            i -= 1
            j -= 1
        elif i > 0 and score[i][j] == score[i - 1][j] + GAP:
            # Whisper word aligned to a gap (word missing from transcript).
            pairs.append((i - 1, None))
            i -= 1
        else:
            # Labeled word aligned to a gap (word missing from Whisper).
            pairs.append((None, j - 1))
            j -= 1

    # Traceback produced pairs in reverse; restore forward order.
    pairs.reverse()
    return pairs
|
||||
|
||||
|
||||
def _parse_full_transcript(labeled_transcript: str) -> list[dict]:
|
||||
"""Parse entire labeled transcript into flat word list with speaker metadata.
|
||||
|
||||
Returns list of {word: str, clean: str, speaker: str} for every word.
|
||||
"""
|
||||
result = []
|
||||
for m in re.finditer(r'^([A-Z][A-Z\s\'-]+?):\s*(.+?)(?=\n[A-Z][A-Z\s\'-]+?:|\n\n|\Z)',
|
||||
labeled_section, re.MULTILINE | re.DOTALL):
|
||||
labeled_transcript, re.MULTILINE | re.DOTALL):
|
||||
speaker = m.group(1).strip()
|
||||
text = m.group(2)
|
||||
for w in text.split():
|
||||
original = w.strip()
|
||||
clean = re.sub(r"[^\w']", '', original.lower())
|
||||
if clean:
|
||||
result.append((original, clean, speaker))
|
||||
result.append({"word": original, "clean": clean, "speaker": speaker})
|
||||
return result
|
||||
|
||||
|
||||
def _find_transcript_region(labeled_words: list[dict], whisper_words: list[str],
                            ) -> tuple[int, int] | None:
    """Locate the slice of labeled_words that best matches the whisper words.

    Anchors phrases taken from the start, middle, and end of the whisper
    word list in the transcript and uses the median implied region start
    for robustness.  Returns (start, end) indices into labeled_words, or
    None when no anchor phrase can be found.
    """
    if not whisper_words or not labeled_words:
        return None

    clean_seq = [entry["clean"] for entry in labeled_words]
    total = len(clean_seq)

    def locate(phrase: list[str]) -> int | None:
        """Return the first index where phrase fuzzy-matches clean_seq."""
        span = len(phrase)
        for start in range(0, total - span + 1):
            if all(_word_score(phrase[k], clean_seq[start + k]) >= 1
                   for k in range(span)):
                return start
        return None

    count = len(whisper_words)
    starts = []
    # Try anchors at the start, middle, and near the end (deduplicated),
    # with progressively shorter phrases at each position.
    for pos in sorted({0, count // 2, max(0, count - 5)}):
        for width in (5, 4, 3):
            candidate = whisper_words[pos:pos + width]
            if len(candidate) < 3:
                continue
            hit = locate(candidate)
            if hit is not None:
                # Back out the region start implied by this anchor's
                # position within the whisper words.
                starts.append(max(0, hit - pos))
                break

    if not starts:
        return None

    # Median anchor is robust to a single spurious hit.
    starts.sort()
    begin = starts[len(starts) // 2]

    # Pad the region so it covers all whisper words plus a safety margin.
    pad = max(20, count // 4)
    begin = max(0, begin - pad)
    finish = min(total, begin + count + 2 * pad)

    return (begin, finish)
|
||||
|
||||
|
||||
def add_speaker_labels(words: list[dict], labeled_transcript: str,
|
||||
start_time: float, end_time: float,
|
||||
segments: list[dict]) -> list[dict]:
|
||||
"""Add speaker labels AND correct word text using labeled transcript.
|
||||
|
||||
Uses Whisper only for timestamps. Takes text from the labeled transcript,
|
||||
which has correct names and spelling. Aligns using greedy forward matching
|
||||
with edit-distance fuzzy matching.
|
||||
Uses Needleman-Wunsch DP alignment to match Whisper words to the labeled
|
||||
transcript. This handles insertions/deletions gracefully — one missed word
|
||||
becomes a single gap instead of cascading failures.
|
||||
"""
|
||||
if not labeled_transcript or not words:
|
||||
return words
|
||||
|
||||
# Get the raw Whisper text for this time range
|
||||
range_text = ""
|
||||
for seg in segments:
|
||||
if seg["end"] < start_time or seg["start"] > end_time:
|
||||
continue
|
||||
range_text += " " + seg["text"]
|
||||
range_text = range_text.strip()
|
||||
|
||||
# Find matching section in labeled transcript
|
||||
labeled_section = _find_labeled_section(labeled_transcript, range_text)
|
||||
if not labeled_section:
|
||||
# Parse full transcript into flat word list
|
||||
all_labeled = _parse_full_transcript(labeled_transcript)
|
||||
if not all_labeled:
|
||||
return words
|
||||
|
||||
labeled_words_flat = _parse_labeled_words(labeled_section)
|
||||
if not labeled_words_flat:
|
||||
# Build whisper clean word list
|
||||
whisper_clean = []
|
||||
for w in words:
|
||||
clean = re.sub(r"[^\w']", '', w["word"].lower())
|
||||
whisper_clean.append(clean if clean else w["word"].lower())
|
||||
|
||||
# Find the matching region in the transcript
|
||||
region = _find_transcript_region(all_labeled, whisper_clean)
|
||||
if region is None:
|
||||
return words
|
||||
|
||||
# Greedy forward alignment: for each Whisper word, find best match
|
||||
# in labeled words within a lookahead window
|
||||
labeled_idx = 0
|
||||
current_speaker = labeled_words_flat[0][2]
|
||||
region_start, region_end = region
|
||||
region_words = all_labeled[region_start:region_end]
|
||||
region_clean = [w["clean"] for w in region_words]
|
||||
|
||||
# Run DP alignment
|
||||
pairs = _align_sequences(whisper_clean, region_clean)
|
||||
|
||||
# Build speaker assignments from aligned pairs
|
||||
# matched[whisper_idx] = (labeled_word_dict, score)
|
||||
matched = {}
|
||||
for w_idx, l_idx in pairs:
|
||||
if w_idx is not None and l_idx is not None:
|
||||
score = _word_score(whisper_clean[w_idx], region_clean[l_idx])
|
||||
if score > 0:
|
||||
matched[w_idx] = (region_words[l_idx], score)
|
||||
|
||||
# Apply matches and interpolate speakers for gaps
|
||||
corrections = 0
|
||||
for i, word_entry in enumerate(words):
|
||||
if i in matched:
|
||||
labeled_word, score = matched[i]
|
||||
word_entry["speaker"] = labeled_word["speaker"]
|
||||
|
||||
for word_entry in words:
|
||||
whisper_clean = re.sub(r"[^\w']", '', word_entry["word"].lower())
|
||||
if not whisper_clean:
|
||||
word_entry["speaker"] = current_speaker
|
||||
continue
|
||||
|
||||
# Search forward for best match
|
||||
best_idx = None
|
||||
best_score = 0 # 2 = exact, 1 = fuzzy
|
||||
window = min(labeled_idx + 12, len(labeled_words_flat))
|
||||
|
||||
for j in range(labeled_idx, window):
|
||||
labeled_clean = labeled_words_flat[j][1]
|
||||
|
||||
if labeled_clean == whisper_clean:
|
||||
best_idx = j
|
||||
best_score = 2
|
||||
break
|
||||
|
||||
if len(whisper_clean) >= 3 and len(labeled_clean) >= 3:
|
||||
if _words_similar(whisper_clean, labeled_clean):
|
||||
if best_score < 1:
|
||||
best_idx = j
|
||||
best_score = 1
|
||||
# Don't break — keep looking for exact match
|
||||
|
||||
if best_idx is not None:
|
||||
original_word, _, speaker = labeled_words_flat[best_idx]
|
||||
current_speaker = speaker
|
||||
|
||||
# Replace Whisper's word with correct version
|
||||
corrected = re.sub(r'[^\w\s\'-]', '', original_word)
|
||||
if corrected and corrected.lower() != whisper_clean:
|
||||
# Replace text only on confident matches
|
||||
corrected = re.sub(r'[^\w\s\'-]', '', labeled_word["word"])
|
||||
if corrected:
|
||||
if corrected.lower() != whisper_clean[i]:
|
||||
corrections += 1
|
||||
word_entry["word"] = corrected
|
||||
corrections += 1
|
||||
elif corrected:
|
||||
word_entry["word"] = corrected
|
||||
|
||||
labeled_idx = best_idx + 1
|
||||
else:
|
||||
# No match — advance labeled pointer by 1 to stay roughly in sync
|
||||
if labeled_idx < len(labeled_words_flat):
|
||||
labeled_idx += 1
|
||||
|
||||
word_entry["speaker"] = current_speaker
|
||||
# Interpolate speaker from nearest matched neighbor
|
||||
speaker = _interpolate_speaker(i, matched, len(words))
|
||||
if speaker:
|
||||
word_entry["speaker"] = speaker
|
||||
|
||||
if corrections:
|
||||
print(f" Corrected {corrections} words from labeled transcript")
|
||||
@@ -550,6 +681,19 @@ def add_speaker_labels(words: list[dict], labeled_transcript: str,
|
||||
return words
|
||||
|
||||
|
||||
def _interpolate_speaker(idx: int, matched: dict, n_words: int) -> str | None:
|
||||
"""Find speaker from nearest matched neighbor."""
|
||||
# Search outward from idx
|
||||
for dist in range(1, n_words):
|
||||
before = idx - dist
|
||||
after = idx + dist
|
||||
if before >= 0 and before in matched:
|
||||
return matched[before][0]["speaker"]
|
||||
if after < n_words and after in matched:
|
||||
return matched[after][0]["speaker"]
|
||||
return None
|
||||
|
||||
|
||||
def group_words_into_lines(words: list[dict], clip_start: float,
|
||||
clip_duration: float) -> list[dict]:
|
||||
"""Group words into timed caption lines for rendering.
|
||||
@@ -894,9 +1038,123 @@ def detect_episode_number(audio_path: str) -> int | None:
|
||||
return None
|
||||
|
||||
|
||||
def fetch_episodes() -> list[dict]:
    """Fetch episode list from Castopod RSS feed.

    Exits the program with an error message if the feed cannot be
    fetched or parsed (previously a malformed feed raised an unhandled
    ParseError traceback, inconsistent with the handled fetch errors).

    Returns:
        List of dicts with title, audio_url, duration, episode_number
        (int or None), and pub_date.  Items without an enclosure URL
        are skipped.
    """
    print("Fetching episodes from Castopod...")
    try:
        resp = requests.get(RSS_FEED_URL, timeout=15)
        resp.raise_for_status()
    except requests.RequestException as e:
        print(f"Error fetching RSS feed: {e}")
        sys.exit(1)

    try:
        root = ET.fromstring(resp.content)
    except ET.ParseError as e:
        print(f"Error parsing RSS feed: {e}")
        sys.exit(1)

    ns = {"itunes": "http://www.itunes.com/dtds/podcast-1.0.dtd"}
    episodes = []

    for item in root.findall(".//item"):
        title = item.findtext("title", "")
        enclosure = item.find("enclosure")
        audio_url = enclosure.get("url", "") if enclosure is not None else ""
        duration = item.findtext("itunes:duration", "", ns)
        ep_num = item.findtext("itunes:episode", "", ns)
        pub_date = item.findtext("pubDate", "")

        # Entries without audio are unusable for clipping.
        if not audio_url:
            continue

        episodes.append({
            "title": title,
            "audio_url": audio_url,
            "duration": duration,
            "episode_number": int(ep_num) if ep_num and ep_num.isdigit() else None,
            "pub_date": pub_date,
        })

    return episodes
|
||||
|
||||
|
||||
def pick_episode(episodes: list[dict]) -> dict:
    """Display episode list and let user pick one.

    Exits when the list is empty, the user enters 'q', or stdin hits EOF.

    Returns:
        The chosen episode dict.
    """
    if not episodes:
        print("No episodes found.")
        sys.exit(1)

    # Sort by episode number (episodes without numbers go to the end)
    episodes.sort(key=lambda e: (e["episode_number"] is None, e["episode_number"] or 0))

    print(f"\nFound {len(episodes)} episodes:\n")
    for ep in episodes:
        num = ep['episode_number']
        label = f"Ep{num}" if num else " "
        dur = ep['duration'] or "?"
        display_num = f"{num:>2}" if num else " ?"
        print(f" {display_num}. [{label:>4}] {ep['title']} ({dur})")

    print()
    while True:
        try:
            choice = input("Select episode number (or 'q' to quit): ").strip()
        except EOFError:
            # BUG FIX: EOFError was previously caught together with
            # ValueError and re-prompted, but input() raises EOFError on
            # every subsequent call once stdin is closed, producing an
            # infinite loop.  Exit instead.
            sys.exit(1)
        if choice.lower() == 'q':
            sys.exit(0)
        try:
            num = int(choice)
        except ValueError:
            print(" Enter an episode number")
            continue
        # Match by episode number first
        match = next((ep for ep in episodes if ep["episode_number"] == num), None)
        if match:
            return match
        print(f" No episode #{num} found. Episodes: {', '.join(str(e['episode_number']) for e in episodes if e['episode_number'])}")
|
||||
|
||||
|
||||
def download_episode(episode: dict) -> Path:
    """Download episode audio, using a cache to avoid re-downloading.

    Streams the file to EPISODE_CACHE_DIR with a progress line; on a
    request failure the partial file is removed and the program exits.

    Returns:
        Path to the cached MP3.
    """
    EPISODE_CACHE_DIR.mkdir(parents=True, exist_ok=True)

    # Build a filename from episode number or title slug
    if episode["episode_number"]:
        filename = f"episode-{episode['episode_number']}.mp3"
    else:
        filename = slugify(episode["title"]) + ".mp3"

    cached = EPISODE_CACHE_DIR / filename
    if cached.exists():
        size_mb = cached.stat().st_size / (1024 * 1024)
        print(f"Using cached: {cached.name} ({size_mb:.1f} MB)")
        return cached

    print(f"Downloading: {episode['title']}...")
    try:
        # FIX: use the response as a context manager so the streaming
        # connection is always released (the original never closed it).
        with requests.get(episode["audio_url"], stream=True, timeout=30) as resp:
            resp.raise_for_status()
            total = int(resp.headers.get("content-length", 0))
            downloaded = 0
            with open(cached, "wb") as f:
                for chunk in resp.iter_content(chunk_size=1024 * 1024):
                    f.write(chunk)
                    downloaded += len(chunk)
                    if total:
                        pct = downloaded / total * 100
                        print(f"\r {downloaded / (1024*1024):.1f} / {total / (1024*1024):.1f} MB ({pct:.0f}%)", end="", flush=True)
                    else:
                        print(f"\r {downloaded / (1024*1024):.1f} MB", end="", flush=True)
        print()
    except requests.RequestException as e:
        # Remove the partial download so the cache never serves it.
        if cached.exists():
            cached.unlink()
        print(f"\nError downloading episode: {e}")
        sys.exit(1)

    size_mb = cached.stat().st_size / (1024 * 1024)
    print(f"Saved: {cached.name} ({size_mb:.1f} MB)")
    return cached
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Extract short-form clips from podcast episodes")
|
||||
parser.add_argument("audio_file", help="Path to episode MP3")
|
||||
parser.add_argument("audio_file", nargs="?", help="Path to episode MP3 (optional if using --pick)")
|
||||
parser.add_argument("--pick", "-p", action="store_true",
|
||||
help="Pick an episode from Castopod to clip")
|
||||
parser.add_argument("--transcript", "-t", help="Path to labeled transcript (.txt)")
|
||||
parser.add_argument("--chapters", "-c", help="Path to chapters JSON")
|
||||
parser.add_argument("--count", "-n", type=int, default=3, help="Number of clips to extract (default: 3)")
|
||||
@@ -911,13 +1169,27 @@ def main():
|
||||
help="Use quality model for everything (slower, no two-pass)")
|
||||
args = parser.parse_args()
|
||||
|
||||
audio_path = Path(args.audio_file).expanduser().resolve()
|
||||
if not audio_path.exists():
|
||||
print(f"Error: Audio file not found: {audio_path}")
|
||||
sys.exit(1)
|
||||
# Default to --pick when no audio file provided
|
||||
if not args.audio_file and not args.pick:
|
||||
args.pick = True
|
||||
|
||||
if args.pick:
|
||||
episodes = fetch_episodes()
|
||||
selected = pick_episode(episodes)
|
||||
audio_path = download_episode(selected)
|
||||
episode_number = selected["episode_number"] or args.episode_number
|
||||
else:
|
||||
audio_path = Path(args.audio_file).expanduser().resolve()
|
||||
if not audio_path.exists():
|
||||
print(f"Error: Audio file not found: {audio_path}")
|
||||
sys.exit(1)
|
||||
episode_number = None
|
||||
|
||||
# Detect episode number
|
||||
episode_number = args.episode_number or detect_episode_number(str(audio_path))
|
||||
if not args.pick:
|
||||
episode_number = args.episode_number or detect_episode_number(str(audio_path))
|
||||
if args.episode_number:
|
||||
episode_number = args.episode_number
|
||||
|
||||
# Resolve output directory
|
||||
if args.output_dir:
|
||||
@@ -959,9 +1231,9 @@ def main():
|
||||
# Step 2: Fast transcription for clip identification
|
||||
two_pass = not args.single_pass and args.fast_model != args.quality_model
|
||||
if two_pass:
|
||||
print(f"\n[2/6] Fast transcription for clip identification ({args.fast_model})...")
|
||||
print(f"\n[2/7] Fast transcription for clip identification ({args.fast_model})...")
|
||||
else:
|
||||
print(f"\n[2/5] Transcribing with word-level timestamps ({args.quality_model})...")
|
||||
print(f"\n[2/6] Transcribing with word-level timestamps ({args.quality_model})...")
|
||||
identify_model = args.fast_model if two_pass else args.quality_model
|
||||
segments = transcribe_with_timestamps(
|
||||
str(audio_path), identify_model, labeled_transcript
|
||||
@@ -980,7 +1252,7 @@ def main():
|
||||
print(f" Chapters loaded: {chapters_path.name}")
|
||||
|
||||
# Step 3: LLM selects best moments
|
||||
step_total = 6 if two_pass else 5
|
||||
step_total = 7 if two_pass else 6
|
||||
print(f"\n[3/{step_total}] Selecting {args.count} best moments with LLM...")
|
||||
clips = select_clips_with_llm(transcript_text, labeled_transcript,
|
||||
chapters_json, args.count)
|
||||
@@ -994,10 +1266,19 @@ def main():
|
||||
f"({clip['start_time']:.1f}s - {clip['end_time']:.1f}s, {duration:.0f}s)")
|
||||
print(f" \"{clip['caption_text']}\"")
|
||||
|
||||
# Step 4: Refine clip timestamps with quality model (two-pass only)
|
||||
# Generate social media metadata
|
||||
meta_step = 4
|
||||
print(f"\n[{meta_step}/{step_total}] Generating social media descriptions...")
|
||||
clips = generate_social_metadata(clips, labeled_transcript, episode_number)
|
||||
for i, clip in enumerate(clips):
|
||||
if "description" in clip:
|
||||
print(f" Clip {i+1}: {clip['description'][:80]}...")
|
||||
print(f" {' '.join(clip.get('hashtags', []))}")
|
||||
|
||||
# Step 5: Refine clip timestamps with quality model (two-pass only)
|
||||
refined = {}
|
||||
if two_pass:
|
||||
print(f"\n[4/{step_total}] Refining clips with {args.quality_model}...")
|
||||
print(f"\n[5/{step_total}] Refining clips with {args.quality_model}...")
|
||||
refined = refine_clip_timestamps(
|
||||
str(audio_path), clips, args.quality_model, labeled_transcript
|
||||
)
|
||||
@@ -1008,7 +1289,7 @@ def main():
|
||||
clips[i:i+1] = snap_to_sentences([clip], clip_segments)
|
||||
|
||||
# Step N: Extract audio clips
|
||||
extract_step = 5 if two_pass else 4
|
||||
extract_step = 6 if two_pass else 5
|
||||
print(f"\n[{extract_step}/{step_total}] Extracting audio clips...")
|
||||
for i, clip in enumerate(clips):
|
||||
slug = slugify(clip["title"])
|
||||
@@ -1020,7 +1301,7 @@ def main():
|
||||
else:
|
||||
print(f" Error extracting clip {i+1} audio")
|
||||
|
||||
video_step = 6 if two_pass else 5
|
||||
video_step = 7 if two_pass else 6
|
||||
if args.audio_only:
|
||||
print(f"\n[{video_step}/{step_total}] Skipped video generation (--audio-only)")
|
||||
print(f"\nDone! {len(clips)} audio clips saved to {output_dir}")
|
||||
@@ -1071,6 +1352,27 @@ def main():
|
||||
else:
|
||||
print(f" Error generating clip {i+1} video")
|
||||
|
||||
# Save clips metadata for social upload
|
||||
metadata_path = output_dir / "clips-metadata.json"
|
||||
metadata = []
|
||||
for i, clip in enumerate(clips):
|
||||
slug = slugify(clip["title"])
|
||||
metadata.append({
|
||||
"title": clip["title"],
|
||||
"clip_file": f"clip-{i+1}-{slug}.mp4",
|
||||
"audio_file": f"clip-{i+1}-{slug}.mp3",
|
||||
"caption_text": clip.get("caption_text", ""),
|
||||
"description": clip.get("description", ""),
|
||||
"hashtags": clip.get("hashtags", []),
|
||||
"start_time": clip["start_time"],
|
||||
"end_time": clip["end_time"],
|
||||
"duration": round(clip["end_time"] - clip["start_time"], 1),
|
||||
"episode_number": episode_number,
|
||||
})
|
||||
with open(metadata_path, "w") as f:
|
||||
json.dump(metadata, f, indent=2)
|
||||
print(f"\nSocial metadata: {metadata_path}")
|
||||
|
||||
# Summary
|
||||
print(f"\nDone! {len(clips)} clips saved to {output_dir}")
|
||||
for i, clip in enumerate(clips):
|
||||
|
||||
Reference in New Issue
Block a user