Files
ai-podcast/analyze_gaps.py
luke 5d8ab57e20 Show theme feature, Irish music genre, strip silence overhaul
- Add show theme UI in header bar + backend API (inject into caller prompts)
- Add Irish genre category for music dropdown
- Strip silence: RMS-based speaker detection (fixes Devon not being identified)
- Strip silence: Devon-specific 3s threshold for interjections
- Strip silence: sparse track item handling in shift logic
- Strip silence: music lead-in preservation after silence removal
- Strip silence: no max gap limit (IDENT/AD regions protect breaks)
- Add analyze_gaps.py tool for per-show threshold analysis

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-18 03:30:15 -06:00

261 lines
9.6 KiB
Python

#!/usr/bin/env python3
"""Analyze silence gaps in podcast stems to find optimal strip-silence thresholds.
Usage: python analyze_gaps.py recordings/2026-03-17_235137/
"""
import sys
import numpy as np
import soundfile as sf
from pathlib import Path
BLOCK_SEC = 0.1
SILENCE_DB = -30
THRESHOLD = 10 ** (SILENCE_DB / 20)
MIN_VOICE_SEC = 0.3
def load_stem(path: Path) -> tuple[np.ndarray, int]:
audio, sr = sf.read(path, dtype="float32")
if audio.ndim > 1:
audio = audio[:, 0]
return audio, sr
def compute_rms_blocks(audio: np.ndarray, sr: int) -> np.ndarray:
block_samples = int(sr * BLOCK_SEC)
n_blocks = len(audio) // block_samples
if n_blocks == 0:
return np.array([0.0])
trimmed = audio[:n_blocks * block_samples].reshape(n_blocks, block_samples)
return np.sqrt(np.mean(trimmed ** 2, axis=1))
def compute_peak_blocks(audio: np.ndarray, sr: int) -> np.ndarray:
block_samples = int(sr * BLOCK_SEC)
n_blocks = len(audio) // block_samples
if n_blocks == 0:
return np.array([0.0])
trimmed = audio[:n_blocks * block_samples].reshape(n_blocks, block_samples)
return np.max(np.abs(trimmed), axis=1)
def analyze(stems_dir: Path):
stems_dir = Path(stems_dir)
voice_stems = {}
for name in ["host", "devon", "caller"]:
path = stems_dir / f"{name}.wav"
if path.exists():
print(f"Loading {name}...", end=" ", flush=True)
audio, sr = load_stem(path)
voice_stems[name] = audio
print(f"{len(audio)/sr:.0f}s @ {sr}Hz")
if not voice_stems:
print("No voice stems found")
return
sr_val = sr
duration = max(len(a) for a in voice_stems.values()) / sr_val
print(f"\nTotal duration: {duration/60:.1f} min")
# Compute per-track RMS and peak blocks
track_rms = {}
track_peak = {}
for name, audio in voice_stems.items():
track_rms[name] = compute_rms_blocks(audio, sr_val)
track_peak[name] = compute_peak_blocks(audio, sr_val)
n_blocks = min(len(v) for v in track_peak.values())
# Detect gaps using same logic as Lua script (RMS for speaker ID, peak for silence)
min_voice_blocks = int(MIN_VOICE_SEC / BLOCK_SEC)
track_names = list(voice_stems.keys())
gaps = []
in_silence = False
silence_start = 0
track_before = None
last_active = None
voice_run = 0
voice_run_track = None
for i in range(n_blocks):
# Peak for silence detection
best_peak = max(track_peak[name][i] for name in track_names)
# RMS for speaker identification
best_rms = 0
best_track = None
for name in track_names:
r = track_rms[name][i]
if r > best_rms:
best_rms = r
best_track = name
all_silent = best_peak < THRESHOLD
if not all_silent:
last_active = best_track
if in_silence:
if all_silent:
voice_run = 0
voice_run_track = None
else:
if voice_run == 0:
voice_run_track = best_track
voice_run += 1
if voice_run >= min_voice_blocks:
voice_start_block = i - (voice_run - 1)
gap_start = silence_start * BLOCK_SEC
gap_end = voice_start_block * BLOCK_SEC
dur = gap_end - gap_start
if dur >= 0.5: # log gaps >= 0.5s
gaps.append({
"start": gap_start,
"end": gap_end,
"dur": dur,
"before": track_before or "?",
"after": voice_run_track or "?",
})
in_silence = False
voice_run = 0
voice_run_track = None
else:
if all_silent:
in_silence = True
silence_start = i
track_before = last_active
voice_run = 0
voice_run_track = None
# Trailing silence
if in_silence:
dur = (n_blocks - silence_start) * BLOCK_SEC
if dur >= 0.5:
gaps.append({
"start": silence_start * BLOCK_SEC,
"end": n_blocks * BLOCK_SEC,
"dur": dur,
"before": track_before or "?",
"after": "end",
})
if not gaps:
print("No gaps detected")
return
# Categorize gaps
categories = {
"host_self": [], # Host -> Host
"host_to_caller": [], # Host -> Caller (TTS latency)
"caller_to_host": [], # Caller -> Host
"host_to_devon": [], # Host -> Devon (TTS latency)
"devon_to_host": [], # Devon -> Host
"caller_to_devon": [],# Caller -> Devon (interjection)
"devon_to_caller": [],# Devon -> Caller
"other": [],
}
for g in gaps:
b, a = g["before"], g["after"]
if b == "host" and a == "host":
categories["host_self"].append(g)
elif b == "host" and a == "caller":
categories["host_to_caller"].append(g)
elif b == "caller" and a == "host":
categories["caller_to_host"].append(g)
elif b == "host" and a == "devon":
categories["host_to_devon"].append(g)
elif b == "devon" and a == "host":
categories["devon_to_host"].append(g)
elif b == "caller" and a == "devon":
categories["caller_to_devon"].append(g)
elif b == "devon" and a == "caller":
categories["devon_to_caller"].append(g)
else:
categories["other"].append(g)
# Print results
print(f"\n{'='*70}")
print(f"GAP ANALYSIS — {len(gaps)} gaps detected")
print(f"{'='*70}")
total_silence = sum(g["dur"] for g in gaps)
print(f"Total silence: {total_silence:.0f}s ({total_silence/60:.1f} min)")
print(f"Content after removal: ~{(duration - total_silence)/60:.1f} min")
for cat_name, cat_gaps in sorted(categories.items(), key=lambda x: -len(x[1])):
if not cat_gaps:
continue
durs = sorted([g["dur"] for g in cat_gaps])
print(f"\n--- {cat_name} ({len(cat_gaps)} gaps) ---")
print(f" Range: {durs[0]:.1f}s - {durs[-1]:.1f}s")
print(f" Median: {np.median(durs):.1f}s Mean: {np.mean(durs):.1f}s")
if len(durs) >= 5:
print(f" P25: {np.percentile(durs, 25):.1f}s P75: {np.percentile(durs, 75):.1f}s")
# Histogram
brackets = [(0, 1), (1, 2), (2, 3), (3, 5), (5, 8), (8, 12), (12, 18), (18, 30), (30, 60), (60, 999)]
print(f" Distribution:")
for lo, hi in brackets:
count = sum(1 for d in durs if lo <= d < hi)
if count > 0:
bar = "#" * count
label = f"{lo}-{hi}s" if hi < 999 else f"{lo}s+"
print(f" {label:>8s}: {bar} ({count})")
# Find natural clusters and suggest thresholds
print(f"\n{'='*70}")
print("SUGGESTED THRESHOLDS")
print(f"{'='*70}")
# For each Devon-involved category, find the gap between interjection and TTS gaps
devon_gaps = categories["host_to_devon"] + categories["devon_to_host"] + categories["caller_to_devon"] + categories["devon_to_caller"]
if devon_gaps:
devon_durs = sorted([g["dur"] for g in devon_gaps])
# Look for a natural break between short (interjection) and long (TTS) gaps
short = [d for d in devon_durs if d < 5]
long = [d for d in devon_durs if d >= 5]
if short and long:
suggested = (max(short) + min(long)) / 2
print(f"Devon threshold: {suggested:.1f}s (short gaps: {len(short)} up to {max(short):.1f}s, long gaps: {len(long)} from {min(long):.1f}s)")
elif short:
print(f"Devon threshold: {max(short) + 1:.1f}s (all gaps are short, max {max(short):.1f}s)")
else:
print(f"Devon threshold: 3.0s (all gaps are long, min {min(long):.1f}s)")
caller_gaps = categories["host_to_caller"] + categories["caller_to_host"]
if caller_gaps:
caller_durs = sorted([g["dur"] for g in caller_gaps])
short = [d for d in caller_durs if d < 5]
long = [d for d in caller_durs if d >= 5]
if short and long:
suggested = (max(short) + min(long)) / 2
print(f"Caller transition threshold: {suggested:.1f}s (short: {len(short)} up to {max(short):.1f}s, long: {len(long)} from {min(long):.1f}s)")
elif long:
print(f"Caller transition threshold: {min(long) - 1:.1f}s (all gaps >= {min(long):.1f}s)")
host_self = categories["host_self"]
if host_self:
host_durs = sorted([g["dur"] for g in host_self])
short = [d for d in host_durs if d < 5]
long = [d for d in host_durs if d >= 5]
if short and long:
suggested = (max(short) + min(long)) / 2
print(f"Same-speaker threshold: {suggested:.1f}s (short: {len(short)} up to {max(short):.1f}s, long: {len(long)} from {min(long):.1f}s)")
elif long:
print(f"Same-speaker threshold: {min(long) - 1:.1f}s (all gaps >= {min(long):.1f}s)")
all_durs = sorted([g["dur"] for g in gaps])
would_cut = [d for d in all_durs if d >= 3.0]
print(f"\nWith current thresholds (Devon=3s, others=6s):")
print(f" Would cut: ~{len(would_cut)} gaps, ~{sum(would_cut):.0f}s ({sum(would_cut)/60:.1f} min)")
print(f" Result: ~{(duration - sum(would_cut))/60:.1f} min")
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Usage: python analyze_gaps.py <stems_dir>")
sys.exit(1)
analyze(Path(sys.argv[1]))