Add direct YouTube upload to publish pipeline, publish ep14

Bypass flaky YouTube RSS ingestion by converting MP3+cover to MP4
and uploading via YouTube Data API. Videos are auto-added to the
podcast playlist. Includes yt_auth.py for token management.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-17 15:07:16 -07:00
parent 2b3551cada
commit b1bd4ed365
4 changed files with 602 additions and 1 deletions

View File

@@ -62,6 +62,15 @@ OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
WHISPER_MODEL = "distil-large-v3"
# YouTube
YT_CLIENT_SECRETS = Path(__file__).parent / "youtube_client_secrets.json"
YT_TOKEN_FILE = Path(__file__).parent / "youtube_token.json"
YT_SCOPES = [
"https://www.googleapis.com/auth/youtube.upload",
"https://www.googleapis.com/auth/youtube",
]
YT_PODCAST_PLAYLIST = "PLGq4uZyNV1yYH_rcitTTPVysPbC6-7pe-"
# Postiz (social media posting)
POSTIZ_URL = "https://social.lukeattheroost.com"
POSTIZ_JWT_SECRET = "9d499bab97b303506af6ae18b29a60e6b5a0b1049177f533232ad14dd9729814"
@@ -847,6 +856,136 @@ def post_to_social(metadata: dict, episode_slug: str, image_path: str = None):
print(f" Posted to {posted}/{len(POSTIZ_INTEGRATIONS)} channels")
def get_youtube_service():
"""Authenticate with YouTube API. Reuses saved token."""
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from googleapiclient.discovery import build as yt_build
creds = None
if YT_TOKEN_FILE.exists():
creds = Credentials.from_authorized_user_file(str(YT_TOKEN_FILE), YT_SCOPES)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
with open(YT_TOKEN_FILE, "w") as f:
f.write(creds.to_json())
else:
print(" Warning: YouTube token missing or invalid. Run: python yt_auth.py")
return None
return yt_build("youtube", "v3", credentials=creds)
def upload_to_youtube(audio_path: str, metadata: dict, chapters: list,
episode_slug: str) -> str | None:
"""Convert audio to video with cover art, upload to YouTube, add to podcast playlist."""
import time as _time
import random
from googleapiclient.http import MediaFileUpload
from googleapiclient.errors import HttpError
youtube = get_youtube_service()
if not youtube:
return None
cover_art = Path(__file__).parent / "website" / "images" / "cover.png"
video_path = Path(audio_path).with_suffix(".yt.mp4")
# Convert MP3 + cover art to MP4
print(" Converting audio to video...")
result = subprocess.run([
"ffmpeg", "-y", "-loop", "1",
"-i", str(cover_art), "-i", audio_path,
"-c:v", "libx264", "-tune", "stillimage",
"-c:a", "aac", "-b:a", "192k",
"-pix_fmt", "yuv420p", "-shortest",
"-movflags", "+faststart", str(video_path)
], capture_output=True, text=True, timeout=600)
if result.returncode != 0:
print(f" Warning: ffmpeg failed: {result.stderr[-200:]}")
return None
# Build chapter timestamps for description
chapter_lines = []
for ch in chapters:
t = int(ch["startTime"])
h, m, s = t // 3600, (t % 3600) // 60, t % 60
ts = f"{h}:{m:02d}:{s:02d}" if h > 0 else f"{m}:{s:02d}"
chapter_lines.append(f"{ts} {ch['title']}")
episode_url = f"https://lukeattheroost.com/episode.html?slug={episode_slug}"
description = (
f"{metadata['description']}\n\n"
+ "\n".join(chapter_lines) + "\n\n"
f"Listen on your favorite podcast app: {episode_url}\n\n"
f"#podcast #LukeAtTheRoost #talkradio #callinshow"
)
body = {
"snippet": {
"title": metadata["title"][:100],
"description": description,
"tags": ["podcast", "Luke at the Roost", "talk radio", "call-in show",
"talk show", "comedy"],
"categoryId": "22",
},
"status": {
"privacyStatus": "public",
"selfDeclaredMadeForKids": False,
},
}
media = MediaFileUpload(
str(video_path), mimetype="video/mp4",
chunksize=5 * 1024 * 1024, resumable=True,
)
request = youtube.videos().insert(part="snippet,status", body=body, media_body=media)
file_mb = video_path.stat().st_size / 1_000_000
print(f" Uploading to YouTube ({file_mb:.0f} MB)...")
response = None
retry = 0
while response is None:
try:
status, response = request.next_chunk()
if status:
pct = int(status.progress() * 100)
if pct % 25 == 0:
print(f" Upload {pct}%...")
except HttpError as e:
if e.resp.status in (500, 502, 503, 504) and retry < 5:
retry += 1
wait = random.random() * (2 ** retry)
print(f" Retrying in {wait:.1f}s...")
_time.sleep(wait)
else:
print(f" YouTube API error: {e}")
video_path.unlink(missing_ok=True)
return None
video_id = response["id"]
video_path.unlink(missing_ok=True)
# Add to podcast playlist
try:
youtube.playlistItems().insert(part="snippet", body={
"snippet": {
"playlistId": YT_PODCAST_PLAYLIST,
"resourceId": {"kind": "youtube#video", "videoId": video_id},
}
}).execute()
print(f" Added to podcast playlist")
except HttpError as e:
print(f" Warning: Could not add to playlist: {e}")
print(f" Add manually in YouTube Studio")
print(f" https://youtube.com/watch?v={video_id}")
return video_id
def get_next_episode_number() -> int:
"""Get the next episode number from Castopod."""
headers = get_auth_header()
@@ -1041,7 +1180,13 @@ def main():
else:
print(f" Warning: Website deploy failed: {deploy_result.stderr[:200]}")
# Step 5.5: Generate social image and post
# Step 5.5: Upload to YouTube
print("[5.5] Uploading to YouTube...")
yt_video_id = upload_to_youtube(
str(audio_path), metadata, metadata["chapters"], episode["slug"]
)
# Step 5.7: Generate social image and post
social_image_path = str(audio_path.with_suffix(".social.jpg"))
generate_social_image(episode_number, metadata["description"], social_image_path)
post_to_social(metadata, episode["slug"], social_image_path)
@@ -1051,10 +1196,14 @@ def main():
print("=" * 50)
print(f"Episode URL: {CASTOPOD_URL}/@{PODCAST_HANDLE}/episodes/{episode['slug']}")
print(f"RSS Feed: {CASTOPOD_URL}/@{PODCAST_HANDLE}/feed.xml")
if yt_video_id:
print(f"YouTube: https://youtube.com/watch?v={yt_video_id}")
print("=" * 50)
if not chapters_uploaded:
print("\nNote: Chapters upload failed. Add manually via Castopod admin UI")
print(f" Chapters file: {chapters_path}")
if not yt_video_id:
print("\nNote: YouTube upload failed. Run 'python yt_auth.py' if token expired")
if __name__ == "__main__":