- make_clips.py: Extract best moments from episodes as short-form clips (9:16 vertical MP4 with captions for TikTok/Shorts/Reels) - deploy_stats_cron.sh: Deploy podcast_stats.py to NAS as Docker container running hourly with auto-restart - podcast_stats.py: Add _find_ytdlp() for Docker compatibility, auto-detect local Docker for Castopod DB queries - publish_episode.py: Upgrade Whisper model from base to large-v3 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
437 lines
15 KiB
Python
437 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Podcast Stats — Aggregate reviews, comments, likes, and analytics from all platforms.
|
|
|
|
Usage:
|
|
python podcast_stats.py # All platforms
|
|
python podcast_stats.py --youtube # YouTube only
|
|
python podcast_stats.py --apple # Apple Podcasts only
|
|
python podcast_stats.py --spotify # Spotify only
|
|
python podcast_stats.py --castopod # Castopod downloads only
|
|
python podcast_stats.py --comments # Include full YouTube comments
|
|
python podcast_stats.py --json # Output as JSON
|
|
python podcast_stats.py --json --upload # Output JSON and upload to BunnyCDN
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
|
|
import requests
|
|
|
|
# --- Platform identifiers ---------------------------------------------------
# Playlist scanned by gather_youtube().
YOUTUBE_PLAYLIST = "PLGq4uZyNV1yYH_rcitTTPVysPbC6-7pe-"
# Podcast id and storefront codes interpolated into the iTunes review feed URL.
APPLE_PODCAST_ID = "1875205848"
APPLE_STOREFRONTS = ["us", "gb", "ca", "au"]
# Show id used for the Spotify oEmbed lookup and the public show page.
SPOTIFY_SHOW_ID = "0ZrpMigG1fo0CCN7F4YmuF"

# --- Castopod database access -----------------------------------------------
# ssh target/port used by _run_db_query() when no local docker binary exists.
NAS_SSH = "luke@mmgnas-10g"
NAS_SSH_PORT = "8001"
# Docker binary path on the NAS (also probed locally) and the DB container name.
DOCKER_BIN = "/share/CACHEDEV1_DATA/.qpkg/container-station/bin/docker"
CASTOPOD_DB_CONTAINER = "castopod-mariadb-1"

# --- BunnyCDN ---------------------------------------------------------------
BUNNY_STORAGE_ZONE = "lukeattheroost"
BUNNY_STORAGE_REGION = "la"
# Credentials can be supplied through the environment so they do not have to
# live in the source tree. The literals remain only as a backward-compatible
# fallback.
# TODO(security): rotate these keys and drop the embedded defaults.
BUNNY_STORAGE_KEY = os.environ.get(
    "BUNNY_STORAGE_KEY",
    "92749cd3-85df-4cff-938fe35eb994-30f8-4cf2",
)
BUNNY_ACCOUNT_KEY = os.environ.get(
    "BUNNY_ACCOUNT_KEY",
    "2865f279-297b-431a-ad18-0ccf1f8e4fa8cf636cea-3222-415a-84ed-56ee195c0530",
)
|
|
|
|
|
|
def _find_ytdlp():
    """Return the yt-dlp executable to invoke.

    Preference order: ``venv/bin/yt-dlp`` next to this file, then whatever
    ``shutil.which`` resolves on PATH, then the bare command name.
    """
    import shutil

    script_dir = os.path.dirname(os.path.abspath(__file__))
    bundled = os.path.join(script_dir, "venv", "bin", "yt-dlp")
    if os.path.exists(bundled):
        return bundled
    # shutil.which() yields None when nothing is found, so fall through to the
    # plain name and let the caller's subprocess call deal with a missing binary.
    return shutil.which("yt-dlp") or "yt-dlp"
|
|
|
|
|
|
def _apple_label(entry, *keys, default=""):
    """Return the string at ``entry[k1][k2]...["label"]``, or *default*.

    Any missing key, non-dict intermediate node, or non-string label yields
    *default* instead of raising.
    """
    node = entry
    for key in (*keys, "label"):
        if not isinstance(node, dict):
            return default
        node = node.get(key)
    return node if isinstance(node, str) else default


def gather_apple_reviews():
    """Collect Apple Podcasts customer reviews across all configured storefronts.

    Fetches the iTunes customer-review JSON feed for each storefront in
    APPLE_STOREFRONTS. A storefront that fails to load or parse is skipped.

    Returns:
        dict with
        - "avg_rating": mean rating over every collected review, rounded to
          one decimal, or None when there are no reviews;
        - "review_count": number of collected reviews;
        - "reviews": the first ten collected reviews (storefront order), each
          a dict with author, title, content, rating, date, storefront.
    """
    all_reviews = []
    seen_ids = set()

    for storefront in APPLE_STOREFRONTS:
        url = f"https://itunes.apple.com/{storefront}/rss/customerreviews/id={APPLE_PODCAST_ID}/sortby=mostrecent/json"
        try:
            resp = requests.get(url, timeout=15)
            if resp.status_code != 200:
                continue
            data = resp.json()
        except Exception:
            # Deliberately broad: one unreachable or malformed storefront
            # must not abort the remaining ones.
            continue

        if not isinstance(data, dict):
            continue
        feed = data.get("feed")
        entries = feed.get("entry") if isinstance(feed, dict) else None
        if not entries:
            continue
        # NOTE(review): a feed holding exactly one entry appears to deliver it
        # as a bare object rather than a one-element list — confirm against a
        # live feed. Normalising here keeps the loop from iterating dict keys.
        if isinstance(entries, dict):
            entries = [entries]

        for entry in entries:
            if not isinstance(entry, dict):
                continue
            # Entries that carry a name but no rating are not reviews.
            if "im:name" in entry and "im:rating" not in entry:
                continue

            review_id = _apple_label(entry, "id")
            # Only de-duplicate on a real id; an empty id would otherwise
            # collapse every id-less review into the first one seen.
            if review_id:
                if review_id in seen_ids:
                    continue
                seen_ids.add(review_id)

            try:
                rating = int(_apple_label(entry, "im:rating", default="0"))
            except ValueError:
                rating = 0

            updated = _apple_label(entry, "updated")

            all_reviews.append({
                "author": _apple_label(entry, "author", "name", default="Unknown"),
                "title": _apple_label(entry, "title"),
                "content": _apple_label(entry, "content"),
                "rating": rating,
                # Keep only the YYYY-MM-DD prefix of the timestamp.
                "date": updated[:10] if updated else "",
                "storefront": storefront.upper(),
            })

    avg_rating = (
        round(sum(r["rating"] for r in all_reviews) / len(all_reviews), 1)
        if all_reviews
        else None
    )
    return {
        "avg_rating": avg_rating,
        "review_count": len(all_reviews),
        "reviews": all_reviews[:10],
    }
|
|
|
|
|
|
def gather_spotify():
    """Look up the show's title and (if exposed) rating on Spotify.

    The title comes from the oEmbed endpoint; the rating is scraped from the
    public show page with two regex attempts. Everything is best effort: any
    exception leaves whatever was gathered so far and returns it.

    Returns:
        dict with "show_title", "rating" (float or None) and "url".
    """
    show_url = f"https://open.spotify.com/show/{SPOTIFY_SHOW_ID}"
    info = {"show_title": None, "rating": None, "url": show_url}

    try:
        oembed = requests.get(
            f"https://open.spotify.com/oembed?url={show_url}", timeout=15
        )
        if oembed.status_code == 200:
            info["show_title"] = oembed.json().get("title")

        page = requests.get(
            show_url,
            timeout=15,
            headers={"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)"},
        )
        html = page.text

        # Structured-data style rating first, then a looser "rating ... N / 5" form.
        found = re.search(r'"ratingValue"\s*:\s*"?([\d.]+)"?', html)
        if not found:
            found = re.search(r'rating["\s:]*(\d+\.?\d*)\s*/\s*5', html, re.IGNORECASE)
        if found:
            info["rating"] = float(found.group(1))
    except Exception:
        pass

    return info
|
|
|
|
|
|
def _fetch_youtube_video(ytdlp, vid, include_comments):
    """Run yt-dlp for one video id and return its metadata dict, or None on any failure."""
    cmd = [ytdlp, "--dump-json"]
    if include_comments:
        cmd.append("--write-comments")
    cmd += ["--no-download", f"https://www.youtube.com/watch?v={vid}"]
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=90)
    except (OSError, subprocess.TimeoutExpired):
        return None
    if proc.returncode != 0:
        return None
    try:
        data = json.loads(proc.stdout)
    except json.JSONDecodeError:
        return None
    return data if isinstance(data, dict) else None


def _fetch_youtube_subscribers(ytdlp, playlist_url):
    """Read channel_follower_count from the playlist's first item; None if unavailable."""
    try:
        proc = subprocess.run(
            [ytdlp, "--dump-json", "--no-download", "--playlist-items", "1", playlist_url],
            capture_output=True, text=True, timeout=30,
        )
        if proc.returncode != 0:
            return None
        return json.loads(proc.stdout).get("channel_follower_count")
    except Exception:
        # Best effort: the subscriber count is optional.
        return None


def gather_youtube(include_comments=False):
    """Aggregate view/like/comment counts for every video in YOUTUBE_PLAYLIST.

    Lists the playlist with yt-dlp, then queries each video individually.
    Videos whose query fails, times out, or returns unparsable output are
    skipped. If the playlist itself cannot be listed, the zeroed default
    result is returned.

    Args:
        include_comments: when true, yt-dlp is asked for comments and up to
            five of them (text truncated to 200 characters) are attached to
            each video under "top_comments".

    Returns:
        dict with "total_views", "total_likes", "total_comments",
        "subscribers" (int or None) and "videos" (list of per-video dicts
        with title, views, likes, comments, date).
    """
    result = {
        "total_views": 0,
        "total_likes": 0,
        "total_comments": 0,
        "subscribers": None,
        "videos": [],
    }

    # Resolve the executable once instead of once per video.
    ytdlp = _find_ytdlp()
    playlist_url = f"https://www.youtube.com/playlist?list={YOUTUBE_PLAYLIST}"

    try:
        proc = subprocess.run(
            [ytdlp, "--dump-json", "--flat-playlist", playlist_url],
            capture_output=True, text=True, timeout=60,
        )
    except (OSError, subprocess.TimeoutExpired):
        return result
    if proc.returncode != 0:
        return result

    # --flat-playlist emits one JSON object per line.
    video_ids = []
    for line in proc.stdout.strip().split("\n"):
        if not line:
            continue
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue
        if not isinstance(entry, dict):
            continue
        vid = entry.get("id") or (entry.get("url") or "").split("=")[-1]
        if vid:
            video_ids.append(vid)

    if not video_ids:
        return result

    total_views = 0
    total_likes = 0
    total_comments = 0
    subscribers = None
    videos = []

    for vid in video_ids:
        vdata = _fetch_youtube_video(ytdlp, vid, include_comments)
        if vdata is None:
            continue

        # yt-dlp may report counts as null; treat those as zero.
        views = vdata.get("view_count") or 0
        likes = vdata.get("like_count") or 0
        comment_count = vdata.get("comment_count") or 0
        upload_date = vdata.get("upload_date") or ""
        if upload_date:
            # YYYYMMDD -> YYYY-MM-DD
            upload_date = f"{upload_date[:4]}-{upload_date[4:6]}-{upload_date[6:]}"

        # The per-video metadata can carry the channel's follower count, which
        # saves a separate yt-dlp invocation afterwards.
        if subscribers is None:
            subscribers = vdata.get("channel_follower_count")

        video = {
            "title": vdata.get("title", "Unknown"),
            "views": views,
            "likes": likes,
            "comments": comment_count,
            "date": upload_date,
        }
        if include_comments:
            # Previously these were collected and then discarded.
            video["top_comments"] = [
                {
                    "author": c.get("author", "Unknown"),
                    "text": (c.get("text") or "")[:200],
                    "time": c.get("time_text", ""),
                    "likes": c.get("like_count") or 0,
                }
                for c in (vdata.get("comments") or [])[:5]
                if isinstance(c, dict)
            ]

        total_views += views
        total_likes += likes
        total_comments += comment_count
        videos.append(video)

    # Fall back to the dedicated playlist query only if no video supplied it.
    if videos and subscribers is None:
        subscribers = _fetch_youtube_subscribers(ytdlp, playlist_url)

    result["subscribers"] = subscribers
    result["total_views"] = total_views
    result["total_likes"] = total_likes
    result["total_comments"] = total_comments
    result["videos"] = videos
    return result
|
|
|
|
|
|
def _run_db_query(sql):
    """Run *sql* through the mysql client inside the Castopod DB container.

    If a docker binary exists locally (DOCKER_BIN or a standard path), the
    query runs via ``docker exec`` directly. Otherwise the same command is
    executed on the NAS over ssh. The SQL text is fed on stdin.

    The DB password may be overridden with the CASTOPOD_DB_PASSWORD
    environment variable; the embedded value is a backward-compatible default.
    TODO(security): rotate the password and drop the embedded default.

    Args:
        sql: SQL statement(s) to execute.

    Returns:
        ``(stdout, None)`` on success, or ``(None, error_text)`` on failure.
        A non-zero exit status is only treated as failure when the command
        produced no stdout.
    """
    import shlex

    password = os.environ.get("CASTOPOD_DB_PASSWORD", "BYtbFfk3ndeVabb26xb0UyKU")
    # Single definition of the client invocation, shared by both transports.
    mysql_cmd = ["mysql", "-u", "castopod", f"-p{password}", "castopod", "-N"]

    # If running on NAS (docker socket available), exec directly
    docker_bin = next(
        (
            path
            for path in (DOCKER_BIN, "/usr/bin/docker", "/usr/local/bin/docker")
            if os.path.exists(path)
        ),
        None,
    )

    if docker_bin:
        cmd = [docker_bin, "exec", "-i", CASTOPOD_DB_CONTAINER, *mysql_cmd]
    else:
        # ssh hands the remote command to a shell, so quote every word.
        remote_words = [DOCKER_BIN, "exec", "-i", CASTOPOD_DB_CONTAINER, *mysql_cmd]
        remote_cmd = " ".join(shlex.quote(word) for word in remote_words)
        cmd = ["ssh", "-p", NAS_SSH_PORT, NAS_SSH, remote_cmd]

    try:
        proc = subprocess.run(cmd, input=sql, capture_output=True, text=True, timeout=30)
    except subprocess.TimeoutExpired:
        return None, "timeout"
    except Exception as e:
        return None, str(e)

    stderr = proc.stderr.strip()
    stdout = proc.stdout.strip()
    if proc.returncode != 0 and not stdout:
        return None, stderr
    return stdout, None
|
|
|
|
|
|
def gather_castopod():
    """Read download and listener totals plus per-episode downloads from Castopod.

    Issues two queries through _run_db_query(): one for the podcast-wide sums
    and one for the per-episode hit counts (newest first). If a query reports
    an error or returns nothing, whatever has been filled in so far is returned.

    Returns:
        dict with "total_downloads", "unique_listeners" and "episodes"
        (list of dicts with title, downloads, date).
    """
    stats = {"total_downloads": 0, "unique_listeners": 0, "episodes": []}

    totals_sql = (
        "SELECT p.title, "
        "(SELECT SUM(hits) FROM cp_analytics_podcasts WHERE podcast_id = p.id), "
        "(SELECT SUM(unique_listeners) FROM cp_analytics_podcasts WHERE podcast_id = p.id) "
        "FROM cp_podcasts p WHERE p.handle = 'LukeAtTheRoost' LIMIT 1;"
    )
    episodes_sql = (
        "SELECT e.title, e.slug, COALESCE(SUM(ae.hits), 0), e.published_at "
        "FROM cp_episodes e LEFT JOIN cp_analytics_podcasts_by_episode ae ON ae.episode_id = e.id "
        "WHERE e.podcast_id = (SELECT id FROM cp_podcasts WHERE handle = 'LukeAtTheRoost') "
        "GROUP BY e.id ORDER BY e.published_at DESC;"
    )

    def _sum_field(text):
        # An empty field or the literal NULL counts as zero.
        return int(text) if text and text != "NULL" else 0

    summary, failure = _run_db_query(totals_sql)
    if failure or not summary:
        return stats

    # Columns are tab-separated: title, total hits, unique listeners.
    fields = summary.split("\t")
    if len(fields) >= 2:
        stats["total_downloads"] = _sum_field(fields[1])
    if len(fields) >= 3:
        stats["unique_listeners"] = _sum_field(fields[2])

    listing, failure = _run_db_query(episodes_sql)
    if failure or not listing:
        return stats

    # Columns: title, slug, hits, published_at. Shorter rows are ignored.
    rows = (row.split("\t") for row in listing.strip().split("\n"))
    stats["episodes"].extend(
        {
            "title": cols[0],
            "downloads": int(cols[2]) if cols[2] else 0,
            "date": cols[3][:10] if cols[3] else "",
        }
        for cols in rows
        if len(cols) >= 4
    )

    return stats
|
|
|
|
|
|
def print_apple(data):
    """Print the Apple Podcasts section: average rating, then one line per review.

    Args:
        data: dict with "reviews", "avg_rating" and "review_count", as
            produced by gather_apple_reviews().
    """
    print("\n⭐ APPLE PODCASTS")
    print("─" * 40)

    reviews = data["reviews"]
    if not reviews:
        print(" No reviews found")
        return

    print(f" Rating: {data['avg_rating']}/5 ({data['review_count']} reviews)")
    print()
    for review in reviews:
        filled = "★" * review["rating"]
        hollow = "☆" * (5 - review["rating"])
        stars = filled + hollow
        print(f" {stars} \"{review['title']}\" — {review['author']} ({review['date']}, {review['storefront']})")

        # Skip the body when it is empty or merely repeats the title.
        if not review["content"] or review["content"] == review["title"]:
            continue
        body = review["content"]
        ellipsis = "..." if len(body) > 120 else ""
        print(f" {body[:120]}{ellipsis}")
|
|
|
|
|
|
def print_spotify(data):
    """Print the Spotify section: show title (if known), rating line, and link.

    Args:
        data: dict with "show_title", "rating" and "url", as produced by
            gather_spotify().
    """
    print("\n🎵 SPOTIFY")
    print("─" * 40)

    title = data["show_title"]
    if title:
        print(f" Show: {title}")

    rating = data["rating"]
    if not rating:
        print(" Rating: Not publicly available (Spotify hides ratings from web)")
    else:
        print(f" Rating: {rating}/5")

    print(f" Link: {data['url']}")
|
|
|
|
|
|
def print_youtube(data):
    """Print the YouTube section: a totals line, then two lines per video.

    Args:
        data: dict with "subscribers", "total_views", "total_likes",
            "total_comments" and "videos", as produced by gather_youtube().
    """
    print("\n📺 YOUTUBE")
    print("─" * 40)

    # The subscriber suffix is omitted when the count is missing or zero.
    subscriber_suffix = ""
    if data["subscribers"]:
        subscriber_suffix = f" | Subscribers: {data['subscribers']:,}"
    totals = f" Total views: {data['total_views']:,} | Likes: {data['total_likes']:,} | Comments: {data['total_comments']:,}"
    print(totals + subscriber_suffix)
    print()

    for video in data["videos"]:
        print(f" {video['title']}")
        print(f" {video['views']:,} views, {video['likes']:,} likes, {video['comments']:,} comments — {video['date']}")
|
|
|
|
|
|
def print_castopod(data):
    """Print the Castopod section: overall totals, then one line per episode.

    Args:
        data: dict with "total_downloads", "unique_listeners" and
            "episodes", as produced by gather_castopod().
    """
    print("\n📊 DOWNLOADS (Castopod)")
    print("─" * 40)
    print(f" Total downloads: {data['total_downloads']:,} | Unique listeners: {data['unique_listeners']:,}")

    episodes = data["episodes"]
    if not episodes:
        return

    # A blank line separates the totals from the episode list.
    print()
    for episode in episodes:
        print(f" {episode['title']} — {episode['downloads']:,} downloads ({episode['date']})")
|
|
|
|
|
|
def upload_to_bunnycdn(json_data):
    """Upload the stats JSON to BunnyCDN storage and purge the cached copy.

    Args:
        json_data: the serialized JSON document (str, or already-encoded bytes).

    Raises:
        requests.HTTPError: if the storage upload returns an error status.
        requests.RequestException: on network failure of either request.

    A purge request that completes with an error status does not raise. It is
    reported on stderr instead, because the upload itself has already succeeded.
    """
    storage_url = f"https://{BUNNY_STORAGE_REGION}.storage.bunnycdn.com/{BUNNY_STORAGE_ZONE}/stats.json"

    # Encode explicitly. The JSON is produced with ensure_ascii=False, and a
    # str body is otherwise encoded by the HTTP stack with a codec that
    # depends on the installed library versions, which can fail on characters
    # outside Latin-1.
    body = json_data.encode("utf-8") if isinstance(json_data, str) else json_data

    resp = requests.put(
        storage_url,
        data=body,
        headers={
            "AccessKey": BUNNY_STORAGE_KEY,
            "Content-Type": "application/json",
        },
        timeout=30,
    )
    resp.raise_for_status()

    purge_url = "https://api.bunny.net/purge"
    purge_resp = requests.post(
        purge_url,
        params={"url": "https://cdn.lukeattheroost.com/stats.json"},
        headers={"AccessKey": BUNNY_ACCOUNT_KEY},
        timeout=15,
    )
    # Only claim the cache was purged when the purge call actually succeeded.
    if purge_resp.ok:
        print("Uploaded stats.json to BunnyCDN and purged cache", file=sys.stderr)
    else:
        print(
            f"Uploaded stats.json to BunnyCDN but cache purge failed (HTTP {purge_resp.status_code})",
            file=sys.stderr,
        )
|
|
|
|
|
|
def main():
    """Parse the command line, gather the selected platforms, and emit the report.

    With no platform flag every platform is gathered. ``--json`` prints a JSON
    document (optionally uploaded with ``--upload``); otherwise a text report
    is printed. ``--upload`` without ``--json`` exits with status 1.
    """
    cli = argparse.ArgumentParser(description="Podcast analytics aggregator")
    simple_flags = (
        ("--youtube", "YouTube only"),
        ("--apple", "Apple Podcasts only"),
        ("--spotify", "Spotify only"),
        ("--castopod", "Castopod only"),
        ("--comments", "Include YouTube comments"),
    )
    for flag, description in simple_flags:
        cli.add_argument(flag, action="store_true", help=description)
    cli.add_argument("--json", dest="json_output", action="store_true", help="Output as JSON")
    cli.add_argument("--upload", action="store_true", help="Upload JSON to BunnyCDN (requires --json)")
    opts = cli.parse_args()

    if opts.upload and not opts.json_output:
        print("Error: --upload requires --json", file=sys.stderr)
        sys.exit(1)

    # Gathering and printing both follow this order.
    requested = {
        "castopod": opts.castopod,
        "apple": opts.apple,
        "spotify": opts.spotify,
        "youtube": opts.youtube,
    }
    gather_everything = not any(requested.values())

    collectors = {
        "castopod": gather_castopod,
        "apple": gather_apple_reviews,
        "spotify": gather_spotify,
        # Comments are fetched for --comments and also whenever --youtube is given.
        "youtube": lambda: gather_youtube(include_comments=opts.comments or opts.youtube),
    }
    results = {}
    for platform, collect in collectors.items():
        if gather_everything or requested[platform]:
            results[platform] = collect()

    if opts.json_output:
        payload = {"updated_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")}
        payload.update(results)
        rendered = json.dumps(payload, indent=2, ensure_ascii=False)
        print(rendered)
        if opts.upload:
            upload_to_bunnycdn(rendered)
        return

    rule = "=" * 45
    print(rule)
    print(" PODCAST STATS: Luke at the Roost")
    print(rule)
    printers = (
        ("castopod", print_castopod),
        ("apple", print_apple),
        ("spotify", print_spotify),
        ("youtube", print_youtube),
    )
    for platform, show in printers:
        if platform in results:
            show(results[platform])
    print()
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|