Clip pipeline improvements, direct YouTube upload, hero redesign, how-it-works updates

- make_clips: migrate refine_clip_timestamps to mlx-whisper, add LLM caption
  polishing, fix speaker label reversal in grouped caption lines
- upload_clips: interactive episode/clip/platform menus, direct YouTube Shorts
  upload via Data API v3 (bypasses Postiz), direct Bluesky upload
- Website hero: centered layout with left-column cover art on desktop, compact
  text links instead of pill buttons, scaled up typography
- How-it-works: move anatomy section above diagram, update stats (320 names,
  189+ personality layers, 20 towns, 570+ topics, 1400+ scenarios), add
  drunk/high/unhinged callers, voicemails, MLX Whisper GPU, LLM-polished captions
- All footers: add System Status link, remove Ko-fi branding
- .gitignore: YouTube OAuth credential files

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-16 04:06:23 -07:00
parent 3164a70e48
commit f0271e61df
9 changed files with 591 additions and 266 deletions

238
upload_clips.py Normal file → Executable file
View File

@@ -1,12 +1,11 @@
#!/usr/bin/env python3
"""Upload podcast clips to social media via Postiz (and direct Bluesky via atproto).
"""Upload podcast clips to social media (direct YouTube & Bluesky, Postiz for others).
Usage:
python upload_clips.py clips/episode-12/
python upload_clips.py clips/episode-12/ --clip 1
python upload_clips.py clips/episode-12/ --platforms ig,yt
python upload_clips.py clips/episode-12/ --schedule "2026-02-16T10:00:00"
python upload_clips.py clips/episode-12/ --yes # skip confirmation
python upload_clips.py # interactive: pick episode, clips, platforms
python upload_clips.py clips/episode-12/ # pick clips and platforms interactively
python upload_clips.py clips/episode-12/ --clip 1 --platforms ig,yt
python upload_clips.py clips/episode-12/ --yes # skip all prompts, upload everything
"""
import argparse
@@ -27,6 +26,9 @@ POSTIZ_URL = os.getenv("POSTIZ_URL", "https://social.lukeattheroost.com")
BSKY_HANDLE = os.getenv("BSKY_HANDLE", "lukeattheroost.bsky.social")
BSKY_APP_PASSWORD = os.getenv("BSKY_APP_PASSWORD")
YT_CLIENT_SECRETS = Path(__file__).parent / "youtube_client_secrets.json"
YT_TOKEN_FILE = Path(__file__).parent / "youtube_token.json"
PLATFORM_ALIASES = {
"ig": "instagram", "insta": "instagram", "instagram": "instagram",
"yt": "youtube", "youtube": "youtube",
@@ -214,6 +216,106 @@ def post_to_bluesky(clip: dict, clip_file: Path) -> bool:
return True
def get_youtube_service():
"""Authenticate with YouTube API. First run opens a browser, then reuses saved token."""
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from googleapiclient.discovery import build as yt_build
scopes = ["https://www.googleapis.com/auth/youtube.upload"]
creds = None
if YT_TOKEN_FILE.exists():
creds = Credentials.from_authorized_user_file(str(YT_TOKEN_FILE), scopes)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
if not YT_CLIENT_SECRETS.exists():
print(" Error: youtube_client_secrets.json not found")
print(" Download OAuth2 Desktop App credentials from Google Cloud Console")
return None
flow = InstalledAppFlow.from_client_secrets_file(str(YT_CLIENT_SECRETS), scopes)
creds = flow.run_local_server(port=8090)
with open(YT_TOKEN_FILE, "w") as f:
f.write(creds.to_json())
return yt_build("youtube", "v3", credentials=creds)
def post_to_youtube(clip: dict, clip_file: Path) -> bool:
"""Upload a clip directly to YouTube Shorts via the Data API."""
import time
import random
from googleapiclient.http import MediaFileUpload
from googleapiclient.errors import HttpError
youtube = get_youtube_service()
if not youtube:
return False
title = clip["title"]
if "#Shorts" not in title:
title = f"{title} #Shorts"
description = build_content(clip, "youtube")
if "#Shorts" not in description:
description += "\n\n#Shorts"
tags = [h.lstrip("#") for h in clip.get("hashtags", [])]
if "Shorts" not in tags:
tags.insert(0, "Shorts")
body = {
"snippet": {
"title": title[:100],
"description": description,
"tags": tags,
"categoryId": "24", # Entertainment
},
"status": {
"privacyStatus": "public",
"selfDeclaredMadeForKids": False,
},
}
media = MediaFileUpload(
str(clip_file),
mimetype="video/mp4",
chunksize=256 * 1024,
resumable=True,
)
request = youtube.videos().insert(part="snippet,status", body=body, media_body=media)
file_size = clip_file.stat().st_size / 1_000_000
print(f" Uploading video ({file_size:.1f} MB)...")
response = None
retry = 0
while response is None:
try:
status, response = request.next_chunk()
if status:
print(f" Upload {int(status.progress() * 100)}%...")
except HttpError as e:
if e.resp.status in (500, 502, 503, 504) and retry < 5:
retry += 1
wait = random.random() * (2 ** retry)
print(f" Retrying in {wait:.1f}s...")
time.sleep(wait)
else:
print(f" YouTube API error: {e}")
return False
video_id = response["id"]
print(f" https://youtube.com/shorts/{video_id}")
return True
def create_post(integration_id: str, content: str, media: dict,
settings: dict, schedule: str | None = None) -> dict:
from datetime import datetime, timezone
@@ -253,7 +355,7 @@ def create_post(integration_id: str, content: str, media: dict,
def main():
valid_names = sorted(set(PLATFORM_ALIASES.keys()))
parser = argparse.ArgumentParser(description="Upload podcast clips to social media via Postiz")
parser.add_argument("clips_dir", help="Path to clips directory (e.g. clips/episode-12/)")
parser.add_argument("clips_dir", nargs="?", help="Path to clips directory (e.g. clips/episode-12/). If omitted, shows a picker.")
parser.add_argument("--clip", "-c", type=int, help="Upload only clip N (1-indexed)")
parser.add_argument("--platforms", "-p",
help=f"Comma-separated platforms ({','.join(ALL_PLATFORMS)}). Default: all")
@@ -266,6 +368,75 @@ def main():
print("Error: POSTIZ_API_KEY not set in .env")
sys.exit(1)
# Resolve clips directory — pick interactively if not provided
if args.clips_dir:
clips_dir = Path(args.clips_dir).expanduser().resolve()
else:
clips_root = Path(__file__).parent / "clips"
episode_dirs = sorted(
[d for d in clips_root.iterdir()
if d.is_dir() and not d.name.startswith(".") and (d / "clips-metadata.json").exists()],
key=lambda d: d.name,
)
if not episode_dirs:
print("No clip directories found in clips/. Run make_clips.py first.")
sys.exit(1)
print("\nAvailable episodes:\n")
for i, d in enumerate(episode_dirs):
with open(d / "clips-metadata.json") as f:
meta = json.load(f)
print(f" {i+1}. {d.name} ({len(meta)} clip{'s' if len(meta) != 1 else ''})")
print()
while True:
try:
choice = input("Which episode? ").strip()
idx = int(choice) - 1
if 0 <= idx < len(episode_dirs):
clips_dir = episode_dirs[idx]
break
print(f" Enter 1-{len(episode_dirs)}")
except (ValueError, EOFError):
print(f" Enter an episode number")
metadata_path = clips_dir / "clips-metadata.json"
if not metadata_path.exists():
print(f"Error: No clips-metadata.json found in {clips_dir}")
print("Run make_clips.py first to generate clips and metadata.")
sys.exit(1)
with open(metadata_path) as f:
clips = json.load(f)
# Pick clips
if args.clip:
if args.clip < 1 or args.clip > len(clips):
print(f"Error: Clip {args.clip} not found (have {len(clips)} clips)")
sys.exit(1)
clips = [clips[args.clip - 1]]
elif not args.yes:
print(f"\nFound {len(clips)} clip(s):\n")
for i, clip in enumerate(clips):
desc = clip.get('description', clip.get('caption_text', ''))
if len(desc) > 70:
desc = desc[:desc.rfind(' ', 0, 70)] + '...'
print(f" {i+1}. \"{clip['title']}\" ({clip['duration']:.0f}s)")
print(f" {desc}")
print(f"\n a. All clips")
print()
while True:
choice = input("Which clips? (e.g. 1,3 or a for all): ").strip().lower()
if choice in ('a', 'all'):
break
try:
indices = [int(x.strip()) for x in choice.split(",")]
if all(1 <= x <= len(clips) for x in indices):
clips = [clips[x - 1] for x in indices]
break
print(f" Invalid selection. Enter 1-{len(clips)}, comma-separated, or 'a' for all.")
except (ValueError, EOFError):
print(f" Enter clip numbers (e.g. 1,3) or 'a' for all")
# Pick platforms
if args.platforms:
requested = []
for p in args.platforms.split(","):
@@ -276,28 +447,29 @@ def main():
sys.exit(1)
requested.append(PLATFORM_ALIASES[p])
target_platforms = list(dict.fromkeys(requested))
elif not args.yes:
print(f"\nPlatforms:\n")
for i, p in enumerate(ALL_PLATFORMS):
print(f" {i+1}. {PLATFORM_DISPLAY[p]}")
print(f"\n a. All platforms (default)")
print()
choice = input("Which platforms? (e.g. 1,3,5 or a for all) [a]: ").strip().lower()
if choice and choice not in ('a', 'all'):
try:
indices = [int(x.strip()) for x in choice.split(",")]
target_platforms = [ALL_PLATFORMS[x - 1] for x in indices if 1 <= x <= len(ALL_PLATFORMS)]
if not target_platforms:
target_platforms = ALL_PLATFORMS[:]
except (ValueError, IndexError):
target_platforms = ALL_PLATFORMS[:]
else:
target_platforms = ALL_PLATFORMS[:]
else:
target_platforms = ALL_PLATFORMS[:]
clips_dir = Path(args.clips_dir).expanduser().resolve()
metadata_path = clips_dir / "clips-metadata.json"
if not metadata_path.exists():
print(f"Error: No clips-metadata.json found in {clips_dir}")
print("Run make_clips.py first to generate clips and metadata.")
sys.exit(1)
with open(metadata_path) as f:
clips = json.load(f)
if args.clip:
if args.clip < 1 or args.clip > len(clips):
print(f"Error: Clip {args.clip} not found (have {len(clips)} clips)")
sys.exit(1)
clips = [clips[args.clip - 1]]
DIRECT_PLATFORMS = {"bluesky", "youtube"}
needs_postiz = not args.dry_run and any(
p != "bluesky" for p in target_platforms)
p not in DIRECT_PLATFORMS for p in target_platforms)
if needs_postiz:
print("Fetching connected accounts from Postiz...")
integrations = fetch_integrations()
@@ -312,6 +484,12 @@ def main():
else:
print("Warning: BSKY_APP_PASSWORD not set in .env, skipping Bluesky")
continue
if platform == "youtube":
if YT_CLIENT_SECRETS.exists() or YT_TOKEN_FILE.exists() or args.dry_run:
active_platforms[platform] = {"name": "YouTube Shorts", "_direct": True}
else:
print("Warning: youtube_client_secrets.json not found, skipping YouTube")
continue
if args.dry_run:
active_platforms[platform] = {"name": PLATFORM_DISPLAY[platform]}
continue
@@ -384,6 +562,16 @@ def main():
else:
print(f" {display}: Failed")
if "youtube" in active_platforms:
print(f" Posting to YouTube Shorts (direct)...")
try:
if post_to_youtube(clip, clip_file):
print(f" YouTube: Posted!")
else:
print(f" YouTube: Failed")
except Exception as e:
print(f" YouTube: Failed — {e}")
if "bluesky" in active_platforms:
print(f" Posting to Bluesky (direct)...")
try: