Initial commit: AI Radio Show web application

- FastAPI backend with multiple TTS providers (Inworld, ElevenLabs, Kokoro, F5-TTS, etc.)
- Web frontend with caller management, music, and soundboard
- Whisper transcription integration
- OpenRouter/Ollama LLM support
- Castopod podcast publishing script

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-04 23:11:20 -07:00
commit 029ce6d689
25 changed files with 6817 additions and 0 deletions

View File

@@ -0,0 +1 @@
# Services package

479
backend/services/audio.py Normal file
View File

@@ -0,0 +1,479 @@
"""Server-side audio service for Loopback routing"""
import sounddevice as sd
import numpy as np
import threading
import queue
import json
from pathlib import Path
from typing import Optional, Callable
import wave
import time
# Settings file path
SETTINGS_FILE = Path(__file__).parent.parent.parent / "audio_settings.json"
class AudioService:
    """Manages audio I/O with multi-channel support for Loopback routing.

    One multi-channel output device carries three logical sources on
    separate 1-indexed channels — caller TTS, music, and sound effects —
    so an external router (e.g. Loopback) can mix them independently.
    Device/channel selections are persisted to SETTINGS_FILE.
    """

    def __init__(self):
        # Device configuration
        self.input_device: Optional[int] = None
        self.input_channel: int = 1  # 1-indexed channel
        self.output_device: Optional[int] = None  # Single output device (multi-channel)
        self.caller_channel: int = 1  # Channel for caller TTS
        self.music_channel: int = 2  # Channel for music
        self.sfx_channel: int = 3  # Channel for SFX
        self.phone_filter: bool = False  # Phone filter on caller voices

        # Recording state
        self._recording = False
        self._record_thread: Optional[threading.Thread] = None
        self._audio_queue: queue.Queue = queue.Queue()
        self._recorded_audio: list = []
        # Native sample rate of the capture device; updated by _record_worker.
        self._record_device_sr: int = 48000

        # Music playback state
        self._music_stream: Optional[sd.OutputStream] = None
        self._music_data: Optional[np.ndarray] = None
        self._music_resampled: Optional[np.ndarray] = None
        self._music_position: int = 0
        self._music_playing: bool = False
        self._music_volume: float = 0.3
        self._music_loop: bool = True

        # Caller playback state
        self._caller_stop_event = threading.Event()
        self._caller_thread: Optional[threading.Thread] = None

        # Sample rates
        self.input_sample_rate = 16000  # For Whisper
        self.output_sample_rate = 24000  # For TTS

        # Load saved settings
        self._load_settings()

    def _load_settings(self):
        """Load settings from disk (best-effort; missing file keeps defaults)."""
        if SETTINGS_FILE.exists():
            try:
                with open(SETTINGS_FILE) as f:
                    data = json.load(f)
                self.input_device = data.get("input_device")
                self.input_channel = data.get("input_channel", 1)
                self.output_device = data.get("output_device")
                self.caller_channel = data.get("caller_channel", 1)
                self.music_channel = data.get("music_channel", 2)
                self.sfx_channel = data.get("sfx_channel", 3)
                self.phone_filter = data.get("phone_filter", False)
                print(f"Loaded audio settings: output={self.output_device}, channels={self.caller_channel}/{self.music_channel}/{self.sfx_channel}, phone_filter={self.phone_filter}")
            except Exception as e:
                # Corrupt settings should not prevent startup.
                print(f"Failed to load audio settings: {e}")

    def _save_settings(self):
        """Save settings to disk as JSON."""
        try:
            data = {
                "input_device": self.input_device,
                "input_channel": self.input_channel,
                "output_device": self.output_device,
                "caller_channel": self.caller_channel,
                "music_channel": self.music_channel,
                "sfx_channel": self.sfx_channel,
                "phone_filter": self.phone_filter,
            }
            with open(SETTINGS_FILE, "w") as f:
                json.dump(data, f, indent=2)
            print(f"Saved audio settings")
        except Exception as e:
            print(f"Failed to save audio settings: {e}")

    def list_devices(self) -> list[dict]:
        """List all available audio devices (id, name, channel counts, default rate)."""
        devices = sd.query_devices()
        result = []
        for i, d in enumerate(devices):
            result.append({
                "id": i,
                "name": d["name"],
                "inputs": d["max_input_channels"],
                "outputs": d["max_output_channels"],
                "default_sr": d["default_samplerate"]
            })
        return result

    def set_devices(
        self,
        input_device: Optional[int] = None,
        input_channel: Optional[int] = None,
        output_device: Optional[int] = None,
        caller_channel: Optional[int] = None,
        music_channel: Optional[int] = None,
        sfx_channel: Optional[int] = None,
        phone_filter: Optional[bool] = None
    ):
        """Configure audio devices and channels.

        Only arguments that are not None are applied; the result is
        persisted immediately.  phone_filter uses an `is not None` check
        so it can be explicitly set to False.
        """
        if input_device is not None:
            self.input_device = input_device
        if input_channel is not None:
            self.input_channel = input_channel
        if output_device is not None:
            self.output_device = output_device
        if caller_channel is not None:
            self.caller_channel = caller_channel
        if music_channel is not None:
            self.music_channel = music_channel
        if sfx_channel is not None:
            self.sfx_channel = sfx_channel
        if phone_filter is not None:
            self.phone_filter = phone_filter
        # Persist to disk
        self._save_settings()

    def get_device_settings(self) -> dict:
        """Get current device configuration."""
        return {
            "input_device": self.input_device,
            "input_channel": self.input_channel,
            "output_device": self.output_device,
            "caller_channel": self.caller_channel,
            "music_channel": self.music_channel,
            "sfx_channel": self.sfx_channel,
            "phone_filter": self.phone_filter,
        }

    # --- Recording ---
    def start_recording(self) -> bool:
        """Start recording from input device.

        Returns:
            True if a new recording was started, False if already
            recording or no input device is configured.
        """
        if self._recording:
            return False
        if self.input_device is None:
            print("No input device configured")
            return False
        self._recording = True
        self._recorded_audio = []
        self._record_thread = threading.Thread(target=self._record_worker)
        self._record_thread.start()
        print(f"Recording started from device {self.input_device}")
        return True

    def stop_recording(self) -> bytes:
        """Stop recording and return audio data resampled to 16kHz for Whisper.

        Returns:
            16-bit signed little-endian mono PCM at 16kHz, or b"" when
            not recording / nothing was captured.
        """
        import librosa
        if not self._recording:
            return b""
        self._recording = False
        if self._record_thread:
            # Bounded join so a wedged stream can't hang the request.
            self._record_thread.join(timeout=2.0)
        if not self._recorded_audio:
            return b""
        # Combine all chunks
        audio = np.concatenate(self._recorded_audio)
        device_sr = getattr(self, '_record_device_sr', 48000)
        print(f"Recording stopped: {len(audio)} samples @ {device_sr}Hz ({len(audio)/device_sr:.2f}s)")
        # Resample to 16kHz for Whisper
        if device_sr != 16000:
            audio = librosa.resample(audio, orig_sr=device_sr, target_sr=16000)
            print(f"Resampled to 16kHz: {len(audio)} samples")
        # Convert to bytes (16-bit PCM)
        audio_int16 = (audio * 32767).astype(np.int16)
        return audio_int16.tobytes()

    def _record_worker(self):
        """Background thread: capture one channel at the device's native rate."""
        try:
            # Get device info
            device_info = sd.query_devices(self.input_device)
            max_channels = device_info['max_input_channels']
            device_sr = int(device_info['default_samplerate'])
            # Clamp to the device's channel count; convert 1-indexed -> 0-indexed.
            record_channel = min(self.input_channel, max_channels) - 1
            # Store device sample rate for later resampling
            self._record_device_sr = device_sr
            print(f"Recording from device {self.input_device} ch {self.input_channel} @ {device_sr}Hz")

            def callback(indata, frames, time_info, status):
                if status:
                    print(f"Record status: {status}")
                if self._recording:
                    # Copy: sounddevice reuses the indata buffer between calls.
                    self._recorded_audio.append(indata[:, record_channel].copy())

            with sd.InputStream(
                device=self.input_device,
                channels=max_channels,
                samplerate=device_sr,  # Use device's native rate
                dtype=np.float32,
                callback=callback,
                blocksize=1024
            ):
                # Keep the stream open until stop_recording() flips the flag.
                while self._recording:
                    time.sleep(0.05)
        except Exception as e:
            print(f"Recording error: {e}")
            self._recording = False

    # --- Caller TTS Playback ---
    def _apply_fade(self, audio: np.ndarray, sample_rate: int, fade_ms: int = 15) -> np.ndarray:
        """Apply fade-in and fade-out to avoid clicks.

        Mutates and returns `audio`; clips shorter than two fades are
        returned untouched.
        """
        fade_samples = int(sample_rate * fade_ms / 1000)
        if len(audio) < fade_samples * 2:
            return audio
        # Fade in
        fade_in = np.linspace(0, 1, fade_samples)
        audio[:fade_samples] *= fade_in
        # Fade out
        fade_out = np.linspace(1, 0, fade_samples)
        audio[-fade_samples:] *= fade_out
        return audio

    def play_caller_audio(self, audio_bytes: bytes, sample_rate: int = 24000):
        """Play caller TTS audio to specific channel of output device (interruptible).

        Args:
            audio_bytes: 16-bit signed mono PCM.
            sample_rate: Sample rate of audio_bytes (default 24kHz TTS output).
        """
        import librosa
        # Stop any existing caller audio
        self.stop_caller_audio()
        self._caller_stop_event.clear()
        # Convert bytes to numpy
        audio = np.frombuffer(audio_bytes, dtype=np.int16).astype(np.float32) / 32768.0
        if self.output_device is None:
            # Fallback path: default device, mono, blocking (not interruptible).
            print("No output device configured, using default")
            audio = self._apply_fade(audio, sample_rate)
            with sd.OutputStream(samplerate=sample_rate, channels=1, dtype=np.float32) as stream:
                stream.write(audio.reshape(-1, 1))
            return
        try:
            # Get device info and resample to device's native rate
            device_info = sd.query_devices(self.output_device)
            num_channels = device_info['max_output_channels']
            device_sr = int(device_info['default_samplerate'])
            channel_idx = min(self.caller_channel, num_channels) - 1
            # Resample if needed
            if sample_rate != device_sr:
                audio = librosa.resample(audio, orig_sr=sample_rate, target_sr=device_sr)
            # Apply fade to prevent clicks
            audio = self._apply_fade(audio, device_sr)
            # Create multi-channel output with audio only on target channel
            multi_ch = np.zeros((len(audio), num_channels), dtype=np.float32)
            multi_ch[:, channel_idx] = audio
            print(f"Playing caller audio to device {self.output_device} ch {self.caller_channel} @ {device_sr}Hz")
            # Play in chunks so we can interrupt
            chunk_size = int(device_sr * 0.1)  # 100ms chunks
            pos = 0
            with sd.OutputStream(
                device=self.output_device,
                samplerate=device_sr,
                channels=num_channels,
                dtype=np.float32
            ) as stream:
                while pos < len(multi_ch) and not self._caller_stop_event.is_set():
                    end = min(pos + chunk_size, len(multi_ch))
                    stream.write(multi_ch[pos:end])
                    pos = end
            if self._caller_stop_event.is_set():
                print("Caller audio stopped early")
            else:
                print(f"Played caller audio: {len(audio)/device_sr:.2f}s")
        except Exception as e:
            print(f"Caller playback error: {e}")

    def stop_caller_audio(self):
        """Stop any playing caller audio (signals the chunked-write loop)."""
        self._caller_stop_event.set()

    # --- Music Playback ---
    def load_music(self, file_path: str) -> bool:
        """Load a music file for playback (decoded to 24kHz mono float32).

        Returns:
            True on success, False if the file is missing or undecodable.
        """
        path = Path(file_path)
        if not path.exists():
            print(f"Music file not found: {file_path}")
            return False
        try:
            import librosa
            audio, sr = librosa.load(str(path), sr=self.output_sample_rate, mono=True)
            self._music_data = audio.astype(np.float32)
            self._music_position = 0
            print(f"Loaded music: {path.name} ({len(audio)/sr:.1f}s)")
            return True
        except Exception as e:
            print(f"Failed to load music: {e}")
            return False

    def play_music(self):
        """Start music playback to specific channel via a callback stream."""
        import librosa
        if self._music_data is None:
            print("No music loaded")
            return
        if self._music_playing:
            # Restart from the top rather than layering two streams.
            self.stop_music()
        self._music_playing = True
        self._music_position = 0
        if self.output_device is None:
            print("No output device configured, using default")
            num_channels = 2
            device = None
            device_sr = self.output_sample_rate
            channel_idx = 0
        else:
            device_info = sd.query_devices(self.output_device)
            num_channels = device_info['max_output_channels']
            device_sr = int(device_info['default_samplerate'])
            device = self.output_device
            channel_idx = min(self.music_channel, num_channels) - 1
        # Resample music to device sample rate if needed
        if self.output_sample_rate != device_sr:
            self._music_resampled = librosa.resample(
                self._music_data, orig_sr=self.output_sample_rate, target_sr=device_sr
            )
        else:
            # Copy so the fade below doesn't mutate the cached original.
            self._music_resampled = self._music_data.copy()
        # Apply fade-in at start of track
        fade_samples = int(device_sr * 0.015)  # 15ms fade
        if len(self._music_resampled) > fade_samples:
            fade_in = np.linspace(0, 1, fade_samples).astype(np.float32)
            self._music_resampled[:fade_samples] *= fade_in

        def callback(outdata, frames, time_info, status):
            # Runs on the audio thread: fill the target channel, zero the rest.
            outdata.fill(0)
            if not self._music_playing or self._music_resampled is None:
                return
            end_pos = self._music_position + frames
            if end_pos <= len(self._music_resampled):
                outdata[:, channel_idx] = self._music_resampled[self._music_position:end_pos] * self._music_volume
                self._music_position = end_pos
            else:
                # End of track: emit the tail, then either loop or stop.
                remaining = len(self._music_resampled) - self._music_position
                if remaining > 0:
                    outdata[:remaining, channel_idx] = self._music_resampled[self._music_position:] * self._music_volume
                if self._music_loop:
                    self._music_position = 0
                    wrap_frames = frames - remaining
                    if wrap_frames > 0:
                        outdata[remaining:, channel_idx] = self._music_resampled[:wrap_frames] * self._music_volume
                        self._music_position = wrap_frames
                else:
                    self._music_playing = False

        try:
            self._music_stream = sd.OutputStream(
                device=device,
                channels=num_channels,
                samplerate=device_sr,
                dtype=np.float32,
                callback=callback,
                blocksize=2048
            )
            self._music_stream.start()
            print(f"Music playback started on ch {self.music_channel} @ {device_sr}Hz")
        except Exception as e:
            print(f"Music playback error: {e}")
            self._music_playing = False

    def stop_music(self):
        """Stop music playback and release the output stream."""
        self._music_playing = False
        if self._music_stream:
            self._music_stream.stop()
            self._music_stream.close()
            self._music_stream = None
        self._music_position = 0
        print("Music stopped")

    def set_music_volume(self, volume: float):
        """Set music volume (clamped to 0.0 - 1.0)."""
        self._music_volume = max(0.0, min(1.0, volume))

    def is_music_playing(self) -> bool:
        """Check if music is currently playing."""
        return self._music_playing

    # --- SFX Playback ---
    def play_sfx(self, file_path: str):
        """Play a sound effect to specific channel using dedicated stream.

        Playback happens on a daemon thread so SFX never block the caller
        and never interrupt music/caller streams.
        """
        path = Path(file_path)
        if not path.exists():
            print(f"SFX file not found: {file_path}")
            return
        try:
            import librosa
            if self.output_device is None:
                audio, sr = librosa.load(str(path), sr=None, mono=True)
                audio = self._apply_fade(audio, sr)

                def play():
                    # Use a dedicated stream instead of sd.play()
                    with sd.OutputStream(samplerate=sr, channels=1, dtype=np.float32) as stream:
                        stream.write(audio.reshape(-1, 1))
            else:
                device_info = sd.query_devices(self.output_device)
                num_channels = device_info['max_output_channels']
                device_sr = int(device_info['default_samplerate'])
                channel_idx = min(self.sfx_channel, num_channels) - 1
                audio, _ = librosa.load(str(path), sr=device_sr, mono=True)
                audio = self._apply_fade(audio, device_sr)
                multi_ch = np.zeros((len(audio), num_channels), dtype=np.float32)
                multi_ch[:, channel_idx] = audio

                def play():
                    # Use dedicated stream to avoid interrupting other audio
                    with sd.OutputStream(
                        device=self.output_device,
                        samplerate=device_sr,
                        channels=num_channels,
                        dtype=np.float32
                    ) as stream:
                        stream.write(multi_ch)

            threading.Thread(target=play, daemon=True).start()
            print(f"Playing SFX: {path.name} on ch {self.sfx_channel}")
        except Exception as e:
            print(f"SFX playback error: {e}")
# Global instance — module-level singleton shared by the API routes.
audio_service = AudioService()

View File

@@ -0,0 +1,112 @@
"""Edge TTS service - free Microsoft TTS API"""
import asyncio
import io
import numpy as np
from typing import Optional
try:
import edge_tts
EDGE_TTS_AVAILABLE = True
except ImportError:
EDGE_TTS_AVAILABLE = False
class EdgeTTSService:
    """TTS using Microsoft Edge's free API.

    The edge-tts package streams MP3; this service converts it to raw
    16-bit PCM at 24kHz mono so all TTS providers share one audio format.
    """

    def __init__(self):
        self.sample_rate = 24000  # Edge TTS outputs 24kHz

    def is_available(self) -> bool:
        """Return True when the optional edge-tts package is installed."""
        return EDGE_TTS_AVAILABLE

    async def generate_speech(self, text: str, voice: str = "en-US-JennyNeural") -> bytes:
        """Generate speech from text using Edge TTS

        Args:
            text: Text to synthesize
            voice: Edge TTS voice name (e.g., "en-US-JennyNeural")

        Returns:
            Raw PCM audio bytes (16-bit signed int, 24kHz mono)

        Raises:
            RuntimeError: if edge-tts is not installed or no audio came back.
        """
        if not EDGE_TTS_AVAILABLE:
            raise RuntimeError("edge-tts not installed. Run: pip install edge-tts")
        communicate = edge_tts.Communicate(text, voice)
        # Collect MP3 audio data.  Accumulate chunks in a list and join
        # once: repeated bytes += is quadratic for long responses.
        mp3_chunks: list = []
        async for chunk in communicate.stream():
            if chunk['type'] == 'audio':
                mp3_chunks.append(chunk['data'])
        mp3_data = b''.join(mp3_chunks)
        if not mp3_data:
            raise RuntimeError("No audio generated")
        # Convert MP3 to PCM
        pcm_data = await self._mp3_to_pcm(mp3_data)
        return pcm_data

    async def _mp3_to_pcm(self, mp3_data: bytes) -> bytes:
        """Convert MP3 to raw PCM using pydub if installed, else ffmpeg.

        The blocking conversion runs in the default executor so the event
        loop stays responsive.
        """
        # get_running_loop() is the supported call from inside a coroutine;
        # get_event_loop() is deprecated in this context since Python 3.10.
        loop = asyncio.get_running_loop()

        def convert():
            try:
                # Try pydub first (more reliable)
                from pydub import AudioSegment
                audio = AudioSegment.from_mp3(io.BytesIO(mp3_data))
                # Convert to 24kHz mono 16-bit
                audio = audio.set_frame_rate(24000).set_channels(1).set_sample_width(2)
                return audio.raw_data
            except ImportError:
                # pydub not installed — fall through to raw ffmpeg.
                pass
            # Fallback to ffmpeg subprocess
            import subprocess
            process = subprocess.Popen(
                [
                    'ffmpeg', '-i', 'pipe:0',
                    '-f', 's16le',
                    '-acodec', 'pcm_s16le',
                    '-ar', '24000',
                    '-ac', '1',
                    'pipe:1'
                ],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
            pcm_data, stderr = process.communicate(input=mp3_data)
            if process.returncode != 0:
                raise RuntimeError(f"ffmpeg failed: {stderr.decode()}")
            return pcm_data

        return await loop.run_in_executor(None, convert)

    async def list_voices(self) -> list[dict]:
        """List available English Edge TTS voices (empty if not installed)."""
        if not EDGE_TTS_AVAILABLE:
            return []
        voices = await edge_tts.list_voices()
        return [
            {
                "id": v["ShortName"],
                "name": v["ShortName"].replace("Neural", ""),
                "gender": v["Gender"],
                "locale": v["Locale"],
            }
            for v in voices
            if v["Locale"].startswith("en-")
        ]
# Global instance — shared service object for the API layer.
edge_tts_service = EdgeTTSService()


def is_edge_tts_available() -> bool:
    """Convenience wrapper: availability of the shared Edge TTS service."""
    return edge_tts_service.is_available()

175
backend/services/llm.py Normal file
View File

@@ -0,0 +1,175 @@
"""LLM service with OpenRouter and Ollama support"""
import httpx
from typing import Optional
from ..config import settings
# Available OpenRouter models.  Exposed to clients via
# LLMService.get_settings() as "available_openrouter_models".
OPENROUTER_MODELS = [
    "anthropic/claude-3-haiku",
    "anthropic/claude-3.5-sonnet",
    "openai/gpt-4o-mini",
    "openai/gpt-4o",
    "google/gemini-flash-1.5",
    "google/gemini-pro-1.5",
    "meta-llama/llama-3.1-8b-instruct",
    "mistralai/mistral-7b-instruct",
]
class LLMService:
    """Abstraction layer for LLM providers.

    Routes chat generation to either OpenRouter or a local Ollama server
    based on `provider`.  Network failures return canned filler lines
    instead of raising, so callers always get a usable string.
    """

    def __init__(self):
        # Seed from application config; mutable at runtime via update_settings().
        self.provider = settings.llm_provider
        self.openrouter_model = settings.openrouter_model
        self.ollama_model = settings.ollama_model
        self.ollama_host = settings.ollama_host
        self.tts_provider = settings.tts_provider

    def update_settings(
        self,
        provider: Optional[str] = None,
        openrouter_model: Optional[str] = None,
        ollama_model: Optional[str] = None,
        ollama_host: Optional[str] = None,
        tts_provider: Optional[str] = None
    ):
        """Update LLM settings.

        Only truthy arguments are applied (None/"" leaves a field alone).
        Setting tts_provider also writes through to the global settings.
        """
        if provider:
            self.provider = provider
        if openrouter_model:
            self.openrouter_model = openrouter_model
        if ollama_model:
            self.ollama_model = ollama_model
        if ollama_host:
            self.ollama_host = ollama_host
        if tts_provider:
            self.tts_provider = tts_provider
            # Also update the global settings so TTS service picks it up
            settings.tts_provider = tts_provider

    async def get_ollama_models(self) -> list[str]:
        """Fetch available models from Ollama (empty list on any failure)."""
        try:
            async with httpx.AsyncClient(timeout=5.0) as client:
                response = await client.get(f"{self.ollama_host}/api/tags")
                response.raise_for_status()
                data = response.json()
                return [model["name"] for model in data.get("models", [])]
        except Exception as e:
            print(f"Failed to fetch Ollama models: {e}")
            return []

    def get_settings(self) -> dict:
        """Get current settings (sync version without Ollama models)."""
        return {
            "provider": self.provider,
            "openrouter_model": self.openrouter_model,
            "ollama_model": self.ollama_model,
            "ollama_host": self.ollama_host,
            "tts_provider": self.tts_provider,
            "available_openrouter_models": OPENROUTER_MODELS,
            "available_ollama_models": []  # Fetched separately
        }

    async def get_settings_async(self) -> dict:
        """Get current settings with Ollama models included."""
        ollama_models = await self.get_ollama_models()
        return {
            "provider": self.provider,
            "openrouter_model": self.openrouter_model,
            "ollama_model": self.ollama_model,
            "ollama_host": self.ollama_host,
            "tts_provider": self.tts_provider,
            "available_openrouter_models": OPENROUTER_MODELS,
            "available_ollama_models": ollama_models
        }

    async def generate(
        self,
        messages: list[dict],
        system_prompt: Optional[str] = None
    ) -> str:
        """
        Generate a response from the LLM.

        Args:
            messages: List of message dicts with 'role' and 'content'
            system_prompt: Optional system prompt to prepend

        Returns:
            Generated text response
        """
        if system_prompt:
            # Builds a new list; the caller's messages list is not mutated.
            messages = [{"role": "system", "content": system_prompt}] + messages
        if self.provider == "openrouter":
            return await self._call_openrouter(messages)
        else:
            return await self._call_ollama(messages)

    async def _call_openrouter(self, messages: list[dict]) -> str:
        """Call OpenRouter API with one retry on timeout."""
        for attempt in range(2):  # Try twice
            try:
                async with httpx.AsyncClient(timeout=30.0) as client:
                    response = await client.post(
                        "https://openrouter.ai/api/v1/chat/completions",
                        headers={
                            "Authorization": f"Bearer {settings.openrouter_api_key}",
                            "Content-Type": "application/json",
                        },
                        json={
                            "model": self.openrouter_model,
                            "messages": messages,
                            "max_tokens": 100,
                        },
                    )
                    response.raise_for_status()
                    data = response.json()
                    return data["choices"][0]["message"]["content"]
            except (httpx.TimeoutException, httpx.ReadTimeout):
                print(f"OpenRouter timeout (attempt {attempt + 1})")
                if attempt == 0:
                    continue  # Retry once
                # Second timeout: give up with an in-character filler line.
                return "Uh, sorry, I lost you there for a second. What was that?"
            except Exception as e:
                # Non-timeout errors are not retried.
                print(f"OpenRouter error: {e}")
                return "Yeah... I don't know, man."
        return "Uh, hold on a sec..."

    async def _call_ollama(self, messages: list[dict]) -> str:
        """Call Ollama API (no retry; filler line on failure)."""
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    f"{self.ollama_host}/api/chat",
                    json={
                        "model": self.ollama_model,
                        "messages": messages,
                        "stream": False,
                        "options": {
                            "num_predict": 100,  # Allow complete thoughts
                            "temperature": 0.8,  # Balanced creativity/coherence
                            "top_p": 0.9,  # Focused word choices
                            "repeat_penalty": 1.3,  # Avoid repetition
                            "top_k": 50,  # Reasonable token variety
                        },
                    },
                    timeout=30.0
                )
                response.raise_for_status()
                data = response.json()
                return data["message"]["content"]
        except httpx.TimeoutException:
            print("Ollama timeout")
            return "Uh, sorry, I lost you there for a second. What was that?"
        except Exception as e:
            print(f"Ollama error: {e}")
            return "Yeah... I don't know, man."
# Global instance — shared by API routes; reconfigured via update_settings().
llm_service = LLMService()

View File

@@ -0,0 +1,144 @@
"""Piper TTS service using sherpa-onnx for fast local voice synthesis"""
import asyncio
import numpy as np
from pathlib import Path
from typing import Optional
# Models directory — sherpa-onnx model folders live alongside the project root.
MODELS_DIR = Path(__file__).parent.parent.parent / "models" / "sherpa"

# Try to import sherpa-onnx; the service degrades gracefully when missing.
try:
    import sherpa_onnx
    SHERPA_AVAILABLE = True
except ImportError:
    SHERPA_AVAILABLE = False
    sherpa_onnx = None

# Available sherpa-onnx Piper models.  "dir" is the folder under MODELS_DIR,
# "model" the .onnx file inside it, "sample_rate" the model's native rate.
PIPER_MODELS = {
    "amy": {
        "dir": "vits-piper-en_US-amy-low",
        "model": "en_US-amy-low.onnx",
        "name": "Amy (US Female)",
        "sample_rate": 16000,
    },
    "joe": {
        "dir": "vits-piper-en_US-joe-medium",
        "model": "en_US-joe-medium.onnx",
        "name": "Joe (US Male)",
        "sample_rate": 22050,
    },
    "lessac": {
        "dir": "vits-piper-en_US-lessac-medium",
        "model": "en_US-lessac-medium.onnx",
        "name": "Lessac (US Female)",
        "sample_rate": 22050,
    },
    "alan": {
        "dir": "vits-piper-en_GB-alan-medium",
        "model": "en_GB-alan-medium.onnx",
        "name": "Alan (UK Male)",
        "sample_rate": 22050,
    },
}
class PiperTTSService:
    """Fast local TTS using sherpa-onnx with Piper models.

    Engines are built lazily per voice and cached so each ONNX session
    is only constructed once.  Output is normalized to 24kHz int16 PCM.
    """

    def __init__(self):
        self.output_sample_rate = 24000  # Our standard output rate
        # model_key -> sherpa_onnx.OfflineTts engine cache.
        # Fixed annotation: the original used builtin `any` (a function,
        # not a type); `object` is the correct "anything" annotation.
        self._tts_engines: dict[str, object] = {}

    def is_available(self) -> bool:
        """Check if sherpa-onnx is available"""
        return SHERPA_AVAILABLE

    def _get_engine(self, model_key: str):
        """Get or create a TTS engine for the given model.

        Returns:
            Tuple of (engine, model's native sample rate).

        Raises:
            ValueError: unknown model key.
            RuntimeError: model files are not present on disk.
        """
        if model_key in self._tts_engines:
            return self._tts_engines[model_key], PIPER_MODELS[model_key]["sample_rate"]
        if model_key not in PIPER_MODELS:
            raise ValueError(f"Unknown model: {model_key}")
        model_info = PIPER_MODELS[model_key]
        model_dir = MODELS_DIR / model_info["dir"]
        if not model_dir.exists():
            raise RuntimeError(f"Model not found: {model_dir}")
        config = sherpa_onnx.OfflineTtsConfig(
            model=sherpa_onnx.OfflineTtsModelConfig(
                vits=sherpa_onnx.OfflineTtsVitsModelConfig(
                    model=str(model_dir / model_info["model"]),
                    tokens=str(model_dir / "tokens.txt"),
                    data_dir=str(model_dir / "espeak-ng-data"),
                ),
                num_threads=2,
            ),
        )
        tts = sherpa_onnx.OfflineTts(config)
        self._tts_engines[model_key] = tts
        return tts, model_info["sample_rate"]

    async def generate_speech(self, text: str, model_key: str = "amy") -> bytes:
        """Generate speech from text using sherpa-onnx

        Args:
            text: Text to synthesize
            model_key: Model key (amy, joe, lessac, alan)

        Returns:
            Raw PCM audio bytes (16-bit signed int, 24kHz mono)

        Raises:
            RuntimeError: if sherpa-onnx is not installed.
        """
        if not SHERPA_AVAILABLE:
            raise RuntimeError("sherpa-onnx not installed. Run: pip install sherpa-onnx")
        # get_running_loop() is the supported call from inside a coroutine;
        # get_event_loop() is deprecated in this context since Python 3.10.
        loop = asyncio.get_running_loop()

        def run_tts():
            # Runs in the default executor: synthesis is CPU-bound.
            tts, model_sample_rate = self._get_engine(model_key)
            audio = tts.generate(text)
            samples = np.array(audio.samples, dtype=np.float32)
            # Resample to 24kHz if needed (linear interp is adequate for speech)
            if model_sample_rate != self.output_sample_rate:
                ratio = self.output_sample_rate / model_sample_rate
                new_length = int(len(samples) * ratio)
                samples = np.interp(
                    np.linspace(0, len(samples) - 1, new_length),
                    np.arange(len(samples)),
                    samples
                ).astype(np.float32)
            # Convert to int16
            audio_int16 = (samples * 32767).astype(np.int16)
            return audio_int16.tobytes()

        return await loop.run_in_executor(None, run_tts)

    def list_available_models(self) -> list[dict]:
        """List models whose files actually exist under MODELS_DIR."""
        available = []
        for key, info in PIPER_MODELS.items():
            model_dir = MODELS_DIR / info["dir"]
            if model_dir.exists():
                available.append({
                    "id": key,
                    "name": info["name"],
                    "sample_rate": info["sample_rate"],
                })
        return available
# Global instance — shared service object for the API layer.
piper_service = PiperTTSService()


def is_piper_available() -> bool:
    """Check if Piper (sherpa-onnx) is available"""
    return piper_service.is_available()

View File

@@ -0,0 +1,116 @@
"""Whisper transcription service"""
import tempfile
import numpy as np
from faster_whisper import WhisperModel
import librosa
# Global model instance (loaded once, lazily, by get_whisper_model)
_whisper_model = None
def get_whisper_model() -> WhisperModel:
    """Return the process-wide Whisper model, creating it on first call.

    The "tiny" model with int8 compute is chosen for speed; transcription
    elsewhere pairs it with beam_size=1 / best_of=1 for fastest inference.
    """
    global _whisper_model
    if _whisper_model is not None:
        return _whisper_model
    print("Loading Whisper tiny model for fast transcription...")
    # Use tiny model for speed - about 3-4x faster than base
    # beam_size=1 and best_of=1 for fastest inference
    _whisper_model = WhisperModel("tiny", device="cpu", compute_type="int8")
    print("Whisper model loaded")
    return _whisper_model
def decode_audio(audio_data: bytes, source_sample_rate: "int | None" = None) -> tuple[np.ndarray, int]:
    """
    Decode audio from various formats to numpy array.

    Args:
        audio_data: Raw audio bytes
        source_sample_rate: If provided, treat as raw PCM at this sample rate

    Returns:
        Tuple of (audio array as float32, sample rate)
    """
    # If sample rate is provided, assume raw PCM (from server-side recording)
    if source_sample_rate is not None:
        print(f"Decoding raw PCM at {source_sample_rate}Hz, {len(audio_data)} bytes")
        if len(audio_data) % 2 != 0:
            # int16 PCM needs an even byte count; pad a stray trailing byte.
            audio_data = audio_data + b'\x00'
        audio = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32) / 32768.0
        return audio, source_sample_rate
    print(f"First 20 bytes: {audio_data[:20].hex()}")
    # Try to decode with librosa first (handles webm, ogg, wav, mp3, etc via ffmpeg)
    try:
        import os
        with tempfile.NamedTemporaryFile(suffix='.webm', delete=False) as f:
            f.write(audio_data)
            temp_path = f.name
        try:
            audio, sample_rate = librosa.load(temp_path, sr=None, mono=True)
        finally:
            # Fix: the original only unlinked on success, leaking a temp
            # file into $TMPDIR every time librosa failed to decode.
            os.unlink(temp_path)
        print(f"Decoded with librosa: {len(audio)} samples at {sample_rate}Hz")
        return audio.astype(np.float32), sample_rate
    except Exception as e:
        print(f"librosa decode failed: {e}, trying raw PCM at 16kHz...")
    # Fall back to raw PCM (16-bit signed int, 16kHz mono - Whisper's rate)
    if len(audio_data) % 2 != 0:
        audio_data = audio_data + b'\x00'
    audio = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32) / 32768.0
    return audio, 16000
async def transcribe_audio(audio_data: bytes, source_sample_rate: "int | None" = None) -> str:
    """
    Transcribe audio data to text using Whisper.

    NOTE: declared async but the model call itself is blocking; it runs
    inline on the event loop thread.

    Args:
        audio_data: Audio bytes (webm, ogg, wav, or raw PCM)
        source_sample_rate: If provided, treat audio_data as raw PCM at this rate

    Returns:
        Transcribed text ("" when the audio is effectively silent)
    """
    model = get_whisper_model()
    print(f"Transcribing audio: {len(audio_data)} bytes")
    # Decode audio from whatever format
    audio, detected_sample_rate = decode_audio(audio_data, source_sample_rate)
    print(f"Audio samples: {len(audio)}, duration: {len(audio)/detected_sample_rate:.2f}s")
    print(f"Audio range: min={audio.min():.4f}, max={audio.max():.4f}")
    # Check if audio is too quiet — skip the model entirely for silence.
    if np.abs(audio).max() < 0.01:
        print("Warning: Audio appears to be silent or very quiet")
        return ""
    # Resample to 16kHz for Whisper
    if detected_sample_rate != 16000:
        audio_16k = librosa.resample(audio, orig_sr=detected_sample_rate, target_sr=16000)
        print(f"Resampled to {len(audio_16k)} samples at 16kHz")
    else:
        audio_16k = audio
    # Transcribe with speed optimizations
    segments, info = model.transcribe(
        audio_16k,
        beam_size=1,  # Faster, slightly less accurate
        best_of=1,
        language="en",  # Skip language detection
        vad_filter=True,  # Skip silence
    )
    # segments is a lazy generator; materialize before joining.
    segments_list = list(segments)
    text = " ".join([s.text for s in segments_list]).strip()
    print(f"Transcription result: '{text}' (language: {info.language}, prob: {info.language_probability:.2f})")
    return text

701
backend/services/tts.py Normal file
View File

@@ -0,0 +1,701 @@
"""TTS service with ElevenLabs, F5-TTS, MLX Kokoro, StyleTTS2, VITS, and Bark support"""
import os
import numpy as np
from scipy.signal import butter, filtfilt
from pathlib import Path
import tempfile
import torch
from ..config import settings
# Patch torch.load for compatibility with PyTorch 2.6+ (where weights_only
# defaults to True and breaks loading TTS checkpoints containing pickled
# objects).
_original_torch_load = torch.load
def _patched_torch_load(*args, **kwargs):
    """torch.load wrapper defaulting weights_only to False.

    Uses setdefault instead of unconditional assignment so a caller who
    explicitly passes weights_only=True keeps the safer behaviour —
    the original patch silently overrode it.
    """
    kwargs.setdefault('weights_only', False)
    return _original_torch_load(*args, **kwargs)
torch.load = _patched_torch_load
# Global clients / lazily-initialized provider singletons.  Each backend is
# loaded on first use and cached in these module-level slots.
_elevenlabs_client = None
_vits_tts = None
_bark_loaded = False
_kokoro_model = None
_styletts2_model = None
_f5tts_model = None
_chattts_model = None
_chattts_speakers = {}  # Cache for speaker embeddings
# Kokoro voice mapping - using highest-graded voices.
# Keys are ElevenLabs voice IDs so callers keep one ID across providers.
# Grades from https://huggingface.co/hexgrad/Kokoro-82M/blob/main/VOICES.md
KOKORO_VOICES = {
    # Male voices (best available are C+ grade)
    "VR6AewLTigWG4xSOukaG": "am_fenrir",   # Tony - deep/powerful (C+)
    "TxGEqnHWrfWFTfGW9XjX": "am_michael",  # Rick - solid male voice (C+)
    "pNInz6obpgDQGcFmaJgB": "am_puck",     # Dennis - anxious dad (C+)
    "ODq5zmih8GrVes37Dizd": "bm_george",   # Earl - older/distinguished British (C)
    "IKne3meq5aSn9XLyUdCD": "bm_fable",    # Marcus - young British (C)
    # Female voices (much better quality available)
    "jBpfuIE2acCO8z3wKNLl": "af_heart",    # Jasmine - best quality (A)
    "EXAVITQu4vr4xnSDxMaL": "af_bella",    # Megan - warm/friendly (A-)
    "21m00Tcm4TlvDq8ikWAM": "bf_emma",     # Tanya - professional British (B-)
    "XB0fDUnXU5powFXDhCwa": "af_nicole",   # Carla - Jersey mom (B-)
    "pFZP5JQG7iQjIQuC4Bku": "af_sarah",    # Brenda - overthinker (C+)
}

# Speed adjustments per voice (1.0 = normal, lower = slower/more natural)
# Slower speeds (0.85-0.95) generally sound more natural
KOKORO_SPEEDS = {
    # Male voices - slower speeds help with C+ grade voices
    "VR6AewLTigWG4xSOukaG": 0.9,   # Tony (am_fenrir) - deep voice, slower
    "TxGEqnHWrfWFTfGW9XjX": 0.92,  # Rick (am_michael) - solid pace
    "pNInz6obpgDQGcFmaJgB": 0.95,  # Dennis (am_puck) - anxious but not rushed
    "ODq5zmih8GrVes37Dizd": 0.85,  # Earl (bm_george) - older, slower British
    "IKne3meq5aSn9XLyUdCD": 0.95,  # Marcus (bm_fable) - young, natural
    # Female voices - A-grade voices can handle faster speeds
    "jBpfuIE2acCO8z3wKNLl": 0.95,  # Jasmine (af_heart) - best voice, natural pace
    "EXAVITQu4vr4xnSDxMaL": 0.95,  # Megan (af_bella) - warm
    "21m00Tcm4TlvDq8ikWAM": 0.9,   # Tanya (bf_emma) - professional British
    "XB0fDUnXU5powFXDhCwa": 0.95,  # Carla (af_nicole) - animated but clear
    "pFZP5JQG7iQjIQuC4Bku": 0.92,  # Brenda (af_sarah) - overthinker, measured
}
DEFAULT_KOKORO_VOICE = "af_heart"
DEFAULT_KOKORO_SPEED = 0.95

# VCTK speaker mapping - different voices for different callers
VITS_SPEAKERS = {
    # Male voices
    "VR6AewLTigWG4xSOukaG": "p226",  # Tony
    "TxGEqnHWrfWFTfGW9XjX": "p251",  # Rick
    "pNInz6obpgDQGcFmaJgB": "p245",  # Dennis
    "ODq5zmih8GrVes37Dizd": "p232",  # Earl
    "IKne3meq5aSn9XLyUdCD": "p252",  # Marcus
    # Female voices
    "jBpfuIE2acCO8z3wKNLl": "p225",  # Jasmine
    "EXAVITQu4vr4xnSDxMaL": "p228",  # Megan
    "21m00Tcm4TlvDq8ikWAM": "p229",  # Tanya
    "XB0fDUnXU5powFXDhCwa": "p231",  # Carla
    "pFZP5JQG7iQjIQuC4Bku": "p233",  # Brenda
}
DEFAULT_VITS_SPEAKER = "p225"

# Inworld voice mapping - maps ElevenLabs voice IDs to Inworld voices
# Full voice list from API: Alex, Ashley, Blake, Carter, Clive, Craig, Deborah,
# Dennis, Dominus, Edward, Elizabeth, Hades, Hana, Julia, Luna, Mark, Olivia,
# Pixie, Priya, Ronald, Sarah, Shaun, Theodore, Timothy, Wendy
INWORLD_VOICES = {
    # Male voices - each caller gets a unique voice matching their personality
    "VR6AewLTigWG4xSOukaG": "Edward",   # Tony - fast-talking, emphatic, streetwise
    "TxGEqnHWrfWFTfGW9XjX": "Shaun",    # Rick - friendly, dynamic, conversational
    "pNInz6obpgDQGcFmaJgB": "Alex",     # Dennis - energetic, expressive, mildly nasal
    "ODq5zmih8GrVes37Dizd": "Craig",    # Earl - older British, refined, articulate
    "IKne3meq5aSn9XLyUdCD": "Timothy",  # Marcus - lively, upbeat American
    # Female voices - each caller gets a unique voice matching their personality
    "jBpfuIE2acCO8z3wKNLl": "Hana",     # Jasmine - bright, expressive young female
    "EXAVITQu4vr4xnSDxMaL": "Ashley",   # Megan - warm, natural female
    "21m00Tcm4TlvDq8ikWAM": "Wendy",    # Tanya - posh, middle-aged British
    "XB0fDUnXU5powFXDhCwa": "Sarah",    # Carla - fast-talking, questioning tone
    "pFZP5JQG7iQjIQuC4Bku": "Deborah",  # Brenda - gentle, elegant
}
DEFAULT_INWORLD_VOICE = "Dennis"
def preprocess_text_for_kokoro(text: str) -> str:
    """
    Preprocess text to improve Kokoro prosody and naturalness.

    - Adds slight pauses via punctuation after common transition words
    - Expands common abbreviations for better pronunciation
    - Normalizes whitespace and spacing after sentence punctuation

    Args:
        text: Raw text to be spoken.

    Returns:
        The cleaned-up text, ready for TTS synthesis.
    """
    import re
    # Normalize whitespace (collapses runs of spaces/tabs/newlines)
    text = ' '.join(text.split())
    # Add comma pauses after common transition words (if no punctuation follows)
    transitions = [
        r'\b(Well)\s+(?=[A-Za-z])',
        r'\b(So)\s+(?=[A-Za-z])',
        r'\b(Now)\s+(?=[A-Za-z])',
        r'\b(Look)\s+(?=[A-Za-z])',
        r'\b(See)\s+(?=[A-Za-z])',
        r'\b(Anyway)\s+(?=[A-Za-z])',
        r'\b(Actually)\s+(?=[A-Za-z])',
        r'\b(Honestly)\s+(?=[A-Za-z])',
        r'\b(Basically)\s+(?=[A-Za-z])',
    ]
    for pattern in transitions:
        text = re.sub(pattern, r'\1, ', text)
    # Add pause after "I mean" at start of sentence
    text = re.sub(r'^(I mean)\s+', r'\1, ', text)
    text = re.sub(r'\.\s+(I mean)\s+', r'. \1, ', text)
    # Expand common abbreviations for better pronunciation.
    # BUGFIX: "w/o" must be expanded before "w/", otherwise the "w/" rule
    # fires first and turns "w/o" into "witho".
    # NOTE(review): "St." is always read as "Street"; "Saint" is ambiguous here.
    abbreviations = {
        r'\bDr\.': 'Doctor',
        r'\bMr\.': 'Mister',
        r'\bMrs\.': 'Missus',
        r'\bMs\.': 'Miss',
        r'\bSt\.': 'Street',
        r'\bAve\.': 'Avenue',
        r'\betc\.': 'etcetera',
        r'\bvs\.': 'versus',
        r'\bw/o\b': 'without',
        r'\bw/': 'with',
    }
    for abbr, expansion in abbreviations.items():
        text = re.sub(abbr, expansion, text, flags=re.IGNORECASE)
    # Add breath pause (comma) before conjunctions in long sentences
    text = re.sub(r'(\w{20,})\s+(and|but|or)\s+', r'\1, \2 ', text)
    # Ensure proper spacing after punctuation
    text = re.sub(r'([.!?])\s*([A-Z])', r'\1 \2', text)
    return text
# StyleTTS2 reference voice files (place .wav files in voices/ directory for voice cloning)
# Maps voice_id to reference audio filename - if file doesn't exist, uses default voice
STYLETTS2_VOICES = {
    # Male voices
    "VR6AewLTigWG4xSOukaG": "tony.wav",  # Tony
    "TxGEqnHWrfWFTfGW9XjX": "rick.wav",  # Rick
    "pNInz6obpgDQGcFmaJgB": "dennis.wav",  # Dennis
    "ODq5zmih8GrVes37Dizd": "earl.wav",  # Earl
    "IKne3meq5aSn9XLyUdCD": "marcus.wav",  # Marcus
    # Female voices
    "jBpfuIE2acCO8z3wKNLl": "jasmine.wav",  # Jasmine
    "EXAVITQu4vr4xnSDxMaL": "megan.wav",  # Megan
    "21m00Tcm4TlvDq8ikWAM": "tanya.wav",  # Tanya
    "XB0fDUnXU5powFXDhCwa": "carla.wav",  # Carla
    "pFZP5JQG7iQjIQuC4Bku": "brenda.wav",  # Brenda
}
# F5-TTS reference voices (same files as StyleTTS2, reuses voices/ directory)
# Requires: mono, 24kHz, 5-10 seconds, with transcript in .txt file
# (copy so later per-provider tweaks don't affect STYLETTS2_VOICES)
F5TTS_VOICES = STYLETTS2_VOICES.copy()
# ChatTTS speaker seeds - different seeds produce different voices
# These are used to generate consistent speaker embeddings
# (the seed is fed to torch.manual_seed before sampling a speaker)
CHATTTS_SEEDS = {
    # Male voices
    "VR6AewLTigWG4xSOukaG": 42,  # Tony - deep voice
    "TxGEqnHWrfWFTfGW9XjX": 123,  # Rick
    "pNInz6obpgDQGcFmaJgB": 456,  # Dennis
    "ODq5zmih8GrVes37Dizd": 789,  # Earl
    "IKne3meq5aSn9XLyUdCD": 1011,  # Marcus
    # Female voices
    "jBpfuIE2acCO8z3wKNLl": 2024,  # Jasmine
    "EXAVITQu4vr4xnSDxMaL": 3033,  # Megan
    "21m00Tcm4TlvDq8ikWAM": 4042,  # Tanya
    "XB0fDUnXU5powFXDhCwa": 5051,  # Carla
    "pFZP5JQG7iQjIQuC4Bku": 6060,  # Brenda
}
# Fallback seed for unmapped voice IDs (same speaker as Tony)
DEFAULT_CHATTTS_SEED = 42
def get_elevenlabs_client():
    """Return the shared ElevenLabs client, creating it lazily on first use."""
    global _elevenlabs_client
    if _elevenlabs_client is not None:
        return _elevenlabs_client
    # Deferred import so the dependency is only required when this provider is used
    from elevenlabs.client import ElevenLabs
    _elevenlabs_client = ElevenLabs(api_key=settings.elevenlabs_api_key)
    return _elevenlabs_client
def get_vits_tts():
    """Return the shared VITS VCTK TTS instance, loading it on first call."""
    global _vits_tts
    if _vits_tts is not None:
        return _vits_tts
    # Deferred import: Coqui TTS is heavy and only needed for this provider
    from TTS.api import TTS
    _vits_tts = TTS("tts_models/en/vctk/vits")
    return _vits_tts
def get_kokoro_model():
    """Return the cached Kokoro MLX model, loading it on first call."""
    global _kokoro_model
    if _kokoro_model is not None:
        return _kokoro_model
    from mlx_audio.tts.utils import load_model
    _kokoro_model = load_model(model_path='mlx-community/Kokoro-82M-bf16')
    print("Kokoro MLX model loaded")
    return _kokoro_model
def ensure_bark_loaded():
    """Ensure Bark models are loaded (idempotent); prefers Apple Silicon GPU."""
    global _bark_loaded
    if _bark_loaded:
        return
    # Small models trade some quality for much faster load/inference
    os.environ['SUNO_USE_SMALL_MODELS'] = '1'
    use_mps = torch.backends.mps.is_available()
    if use_mps:
        # Force Bark to use MPS (Apple Silicon GPU) instead of CPU offload
        os.environ['SUNO_OFFLOAD_CPU'] = '0'
        os.environ['SUNO_ENABLE_MPS'] = '1'
    from bark import preload_models
    preload_models()
    _bark_loaded = True
    print(f"Bark loaded on device: {'MPS' if use_mps else 'CPU'}")
def get_styletts2_model():
    """Return the cached StyleTTS2 model, loading it on first call."""
    global _styletts2_model
    if _styletts2_model is not None:
        return _styletts2_model
    from styletts2 import tts
    _styletts2_model = tts.StyleTTS2()
    print("StyleTTS2 model loaded")
    return _styletts2_model
def get_f5tts_generate():
    """Return the F5-TTS MLX generate function, importing it lazily."""
    global _f5tts_model
    if _f5tts_model is not None:
        return _f5tts_model
    # Disable tqdm progress bars to avoid BrokenPipeError in server context
    import os
    os.environ['HF_HUB_DISABLE_PROGRESS_BARS'] = '1'
    os.environ['TQDM_DISABLE'] = '1'
    from f5_tts_mlx.generate import generate
    _f5tts_model = generate
    print("F5-TTS MLX loaded")
    return _f5tts_model
def get_chattts_model():
    """Return the cached ChatTTS model, loading it on first call."""
    global _chattts_model
    if _chattts_model is not None:
        return _chattts_model
    import ChatTTS
    _chattts_model = ChatTTS.Chat()
    _chattts_model.load(compile=False)
    print("ChatTTS model loaded")
    return _chattts_model
def get_chattts_speaker(voice_id: str):
    """Return a consistent speaker embedding for *voice_id*, sampling it once.

    The per-voice seed makes the "random" speaker reproducible across runs.
    """
    global _chattts_speakers
    if voice_id in _chattts_speakers:
        return _chattts_speakers[voice_id]
    chat = get_chattts_model()
    seed = CHATTTS_SEEDS.get(voice_id, DEFAULT_CHATTTS_SEED)
    # Seed torch so sample_random_speaker is deterministic for this voice
    torch.manual_seed(seed)
    _chattts_speakers[voice_id] = chat.sample_random_speaker()
    print(f"[ChatTTS] Created speaker for voice {voice_id} with seed {seed}")
    return _chattts_speakers[voice_id]
def phone_filter(audio: np.ndarray, sample_rate: int = 24000, quality: str = "normal") -> np.ndarray:
    """Simulate a telephone line: band-pass, soft-clip distortion, optional static.

    Args:
        audio: Float audio samples; flattened to mono 1-D before filtering.
        sample_rate: Sample rate of *audio* in Hz.
        quality: "good", "normal", "bad" or "terrible"; unknown values fall
            back to "normal".

    Returns:
        The degraded signal as float32.
    """
    # (low cutoff Hz, high cutoff Hz, distortion drive, static noise level)
    presets = {
        "good": (200, 7000, 1.0, 0.0),
        "normal": (300, 3400, 1.5, 0.005),
        "bad": (400, 2800, 2.0, 0.015),
        "terrible": (500, 2200, 2.5, 0.03),
    }
    low_hz, high_hz, drive, noise_level = presets.get(quality, presets["normal"])
    samples = audio.flatten()
    # 4th-order Butterworth band-pass, zero-phase (filtfilt) to avoid phase smear
    nyquist = sample_rate / 2
    b, a = butter(4, [low_hz / nyquist, high_hz / nyquist], btype='band')
    shaped = filtfilt(b, a, samples)
    # Soft clipping for that overdriven-line sound
    shaped = np.tanh(shaped * drive) * 0.8
    if noise_level > 0:
        # Bursty static: white noise gated by a coarse (per-1000-sample) envelope
        static = np.random.normal(0, noise_level, len(shaped)).astype(np.float32)
        envelope = np.random.random(len(shaped) // 1000 + 1)
        envelope = np.repeat(envelope, 1000)[:len(shaped)]
        static *= (envelope > 0.7).astype(np.float32)
        shaped = shaped + static
    return shaped.astype(np.float32)
async def generate_speech_elevenlabs(text: str, voice_id: str) -> tuple[np.ndarray, int]:
    """Generate speech via the ElevenLabs API.

    Returns:
        (float32 mono samples scaled to [-1, 1), sample rate 24000)
    """
    client = get_elevenlabs_client()
    # Streamed 16-bit PCM at 24 kHz; collect every chunk before decoding
    pcm_chunks = client.text_to_speech.convert(
        voice_id=voice_id,
        text=text,
        model_id="eleven_v3",
        output_format="pcm_24000",
    )
    pcm = b"".join(pcm_chunks)
    samples = np.frombuffer(pcm, dtype=np.int16).astype(np.float32) / 32768.0
    return samples, 24000
async def generate_speech_kokoro(text: str, voice_id: str) -> tuple[np.ndarray, int]:
    """Generate speech using MLX Kokoro (fast, good quality, Apple Silicon optimized).

    Returns:
        (float32 mono samples, 24000)
    """
    import librosa
    from mlx_audio.tts.generate import generate_audio

    model = get_kokoro_model()
    voice = KOKORO_VOICES.get(voice_id, DEFAULT_KOKORO_VOICE)
    speed = KOKORO_SPEEDS.get(voice_id, DEFAULT_KOKORO_SPEED)

    # Clean up the text first for better prosody
    text = preprocess_text_for_kokoro(text)

    # Voice names are prefixed by accent: 'b*' = British, otherwise American
    lang_code = 'b' if voice.startswith('b') else 'a'

    with tempfile.TemporaryDirectory() as tmpdir:
        generate_audio(
            text,
            model=model,
            voice=voice,
            speed=speed,
            lang_code=lang_code,
            output_path=tmpdir,
            file_prefix='tts',
            verbose=False
        )
        # mlx_audio writes <prefix>_000.wav for the first (only) segment
        wav_path = Path(tmpdir) / 'tts_000.wav'
        if not wav_path.exists():
            raise RuntimeError("Kokoro failed to generate audio")
        samples, sr = librosa.load(str(wav_path), sr=None, mono=True)

    # Resample to the app-wide 24 kHz rate if needed
    if sr != 24000:
        samples = librosa.resample(samples, orig_sr=sr, target_sr=24000)
    return samples.astype(np.float32), 24000
async def generate_speech_vits(text: str, voice_id: str) -> tuple[np.ndarray, int]:
    """Generate speech using VITS VCTK (fast, multiple speakers).

    Returns:
        (float32 mono samples, 24000)
    """
    import librosa

    tts = get_vits_tts()
    speaker = VITS_SPEAKERS.get(voice_id, DEFAULT_VITS_SPEAKER)

    # Coqui TTS only writes to files, so synthesize into a temp wav
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
        wav_path = tmp.name
    try:
        tts.tts_to_file(text=text, file_path=wav_path, speaker=speaker)
        samples, sr = librosa.load(wav_path, sr=None, mono=True)
        if sr != 24000:
            samples = librosa.resample(samples, orig_sr=sr, target_sr=24000)
        return samples.astype(np.float32), 24000
    finally:
        Path(wav_path).unlink(missing_ok=True)
async def generate_speech_bark(text: str, voice_id: str) -> tuple[np.ndarray, int]:
    """Generate speech using Bark (slow but expressive, supports emotes like [laughs]).

    Note: voice_id is accepted for interface parity but Bark uses its default voice.

    Returns:
        (float32 mono samples, 24000)
    """
    import librosa
    from bark import SAMPLE_RATE, generate_audio

    ensure_bark_loaded()
    samples = generate_audio(text)

    # Bark output can exceed [-1, 1]; scale down to prevent clipping
    peak = np.abs(samples).max()
    if peak > 0.95:
        samples = samples * (0.95 / peak)

    # Resample from Bark's native rate to the app-wide 24 kHz if needed
    if SAMPLE_RATE != 24000:
        samples = librosa.resample(samples, orig_sr=SAMPLE_RATE, target_sr=24000)
    return samples.astype(np.float32), 24000
async def generate_speech_styletts2(text: str, voice_id: str) -> tuple[np.ndarray, int]:
    """Generate speech using StyleTTS2 (high quality, supports voice cloning).

    Uses a reference recording from voices/ for cloning when one exists on
    disk for this voice_id; otherwise falls back to the model's default voice.

    Returns:
        (float32 mono samples, 24000)
    """
    import librosa

    model = get_styletts2_model()

    # Resolve an optional reference recording for voice cloning
    voice_path = None
    voice_file = STYLETTS2_VOICES.get(voice_id)
    if voice_file:
        candidate = settings.base_dir / "voices" / voice_file
        if candidate.exists():
            voice_path = candidate

    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
        tmp_path = tmp.name
    try:
        if voice_path:
            print(f"[StyleTTS2] Using voice clone: {voice_path}")
            model.inference(
                text,
                target_voice_path=str(voice_path),
                output_wav_file=tmp_path,
                output_sample_rate=24000,
                diffusion_steps=5,  # Balance quality/speed
                alpha=0.3,  # More voice-like than text-like
                beta=0.7,  # Good prosody
            )
        else:
            print("[StyleTTS2] Using default voice")
            model.inference(
                text,
                output_wav_file=tmp_path,
                output_sample_rate=24000,
                diffusion_steps=5,
            )
        # Read back the synthesized wav
        samples, sr = librosa.load(tmp_path, sr=None, mono=True)
        if sr != 24000:
            samples = librosa.resample(samples, orig_sr=sr, target_sr=24000)
        return samples.astype(np.float32), 24000
    finally:
        Path(tmp_path).unlink(missing_ok=True)
async def generate_speech_f5tts(text: str, voice_id: str) -> tuple[np.ndarray, int]:
    """Generate speech using F5-TTS MLX (very natural, supports voice cloning).

    Voice cloning requires BOTH the reference wav and a sibling .txt transcript
    in the voices/ directory; otherwise the default voice is used.

    Returns:
        (float32 mono samples, 24000)
    """
    import librosa

    generate = get_f5tts_generate()

    # Resolve optional cloning inputs: reference audio + its transcript
    ref_audio_path = None
    ref_text = None
    voice_file = F5TTS_VOICES.get(voice_id)
    if voice_file:
        voice_path = settings.base_dir / "voices" / voice_file
        txt_path = voice_path.with_suffix('.txt')
        if voice_path.exists() and txt_path.exists():
            ref_audio_path = str(voice_path)
            ref_text = txt_path.read_text().strip()
            print(f"[F5-TTS] Using voice clone: {voice_path}")
    if not ref_audio_path:
        print("[F5-TTS] Using default voice")

    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
        tmp_path = tmp.name
    try:
        generate(
            generation_text=text,
            ref_audio_path=ref_audio_path,
            ref_audio_text=ref_text,
            steps=8,
            speed=1.0,
            output_path=tmp_path,
        )
        # Read back the synthesized wav
        samples, sr = librosa.load(tmp_path, sr=None, mono=True)
        if sr != 24000:
            samples = librosa.resample(samples, orig_sr=sr, target_sr=24000)
        return samples.astype(np.float32), 24000
    finally:
        Path(tmp_path).unlink(missing_ok=True)
async def generate_speech_chattts(text: str, voice_id: str) -> tuple[np.ndarray, int]:
    """Generate speech using ChatTTS (natural conversational speech, multiple speakers).

    Returns:
        (float32 mono samples, 24000)
    """
    import ChatTTS

    chat = get_chattts_model()

    # Guard against empty input - ChatTTS needs something to say
    text = text.strip() or "Hello."
    print(f"[ChatTTS] Generating speech for: {text[:50]}...")

    # Seed the RNG so each voice_id maps to a stable voice across calls
    torch.manual_seed(CHATTTS_SEEDS.get(voice_id, DEFAULT_CHATTTS_SEED))

    params_infer_code = ChatTTS.Chat.InferCodeParams(
        temperature=0.3,
        top_P=0.7,
        top_K=20,
    )
    # skip_refine_text avoids a narrow() error with this ChatTTS version
    wavs = chat.infer(
        [text],
        params_infer_code=params_infer_code,
        skip_refine_text=True,
    )
    if wavs is None or len(wavs) == 0:
        raise RuntimeError("ChatTTS failed to generate audio")

    samples = wavs[0]
    # Collapse any extra (channel) dimensions to 1-D
    if samples.ndim > 1:
        samples = samples.squeeze()

    # Scale down to prevent clipping
    peak = np.abs(samples).max()
    if peak > 0.95:
        samples = samples * (0.95 / peak)
    return samples.astype(np.float32), 24000
async def generate_speech_inworld(text: str, voice_id: str) -> tuple[np.ndarray, int]:
    """Generate speech using Inworld TTS API (high quality, natural voices).

    Args:
        text: Text to synthesize.
        voice_id: ElevenLabs-style voice ID; mapped to an Inworld voice name.

    Returns:
        (float32 mono samples, 24000) - resampled to 24 kHz to match other providers.

    Raises:
        RuntimeError: If the API key is missing or the API returns no audio.
        httpx.HTTPStatusError: If the API responds with an error status.
    """
    import httpx
    import base64
    import librosa
    voice = INWORLD_VOICES.get(voice_id, DEFAULT_INWORLD_VOICE)
    api_key = settings.inworld_api_key
    if not api_key:
        raise RuntimeError("INWORLD_API_KEY not set in environment")
    print(f"[Inworld TTS] Voice: {voice}, Text: {text[:50]}...")
    url = "https://api.inworld.ai/tts/v1/voice"
    headers = {
        "Content-Type": "application/json",
        # Inworld uses HTTP Basic auth with the raw API key
        "Authorization": f"Basic {api_key}",
    }
    payload = {
        "text": text,
        "voice_id": voice,
        "model_id": "inworld-tts-1.5-mini",
        "audio_config": {
            # Request uncompressed 16-bit PCM at 48 kHz
            "encoding": "LINEAR16",
            "sample_rate_hertz": 48000,
        },
    }
    async with httpx.AsyncClient(timeout=60.0) as client:
        response = await client.post(url, json=payload, headers=headers)
        response.raise_for_status()
        data = response.json()
    # Decode base64 audio
    audio_b64 = data.get("audioContent")
    if not audio_b64:
        raise RuntimeError("Inworld TTS returned no audio content")
    audio_bytes = base64.b64decode(audio_b64)
    # Parse audio using soundfile (handles WAV, MP3, etc.)
    import soundfile as sf
    import io
    # soundfile can read WAV, FLAC, OGG, and with ffmpeg: MP3
    # MP3 files start with ID3 tag or 0xff sync bytes
    try:
        audio, sr = sf.read(io.BytesIO(audio_bytes))
    except Exception as e:
        print(f"[Inworld TTS] soundfile failed: {e}, trying raw PCM")
        # Fallback to raw PCM
        # Drop a trailing odd byte so the buffer is a whole number of int16 samples
        if len(audio_bytes) % 2 != 0:
            audio_bytes = audio_bytes[:-1]
        audio = np.frombuffer(audio_bytes, dtype=np.int16).astype(np.float32) / 32768.0
        sr = 48000  # assume the LINEAR16 rate requested above
    # Resample to 24kHz to match other providers
    if sr != 24000:
        audio = librosa.resample(audio, orig_sr=sr, target_sr=24000)
    return audio.astype(np.float32), 24000
async def generate_speech(
    text: str,
    voice_id: str,
    phone_quality: str = "normal",
    apply_filter: bool = True
) -> bytes:
    """
    Generate speech from text using the configured TTS provider.

    Args:
        text: Text to speak
        voice_id: ElevenLabs voice ID (mapped to local voice if using local TTS)
        phone_quality: Quality of phone filter ("none" to disable)
        apply_filter: Whether to apply phone filter

    Returns:
        Raw PCM audio bytes (16-bit signed int, 24kHz)
    """
    # Provider name -> synthesis coroutine; all share the same signature
    synthesizers = {
        "kokoro": generate_speech_kokoro,
        "f5tts": generate_speech_f5tts,
        "inworld": generate_speech_inworld,
        "chattts": generate_speech_chattts,
        "styletts2": generate_speech_styletts2,
        "bark": generate_speech_bark,
        "vits": generate_speech_vits,
        "elevenlabs": generate_speech_elevenlabs,
    }
    provider = settings.tts_provider
    print(f"[TTS] Provider: {provider}, Text: {text[:50]}...")

    synthesize = synthesizers.get(provider)
    if synthesize is None:
        raise ValueError(f"Unknown TTS provider: {provider}")
    audio, sample_rate = await synthesize(text, voice_id)

    # Apply phone filter if requested.
    # Bark is always skipped - its output is already rough-sounding.
    if apply_filter and phone_quality not in ("none", "studio") and provider != "bark":
        audio = phone_filter(audio, sample_rate, phone_quality)

    # Float [-1, 1) -> 16-bit signed PCM bytes
    audio_int16 = (audio * 32768).clip(-32768, 32767).astype(np.int16)
    return audio_int16.tobytes()
# Voice IDs for cohost and announcer
COHOST_VOICE_ID = "nPczCjzI2devNBz1zQrb"  # Bobby, the sidekick
ANNOUNCER_VOICE_ID = "ErXwobaYiN019PkySvjV"  # Station announcer
async def generate_cohost_speech(text: str) -> bytes:
    """Generate speech for cohost Bobby (no phone filter).

    Returns raw 16-bit PCM bytes at 24 kHz, same format as generate_speech.
    """
    return await generate_speech(text, COHOST_VOICE_ID, apply_filter=False)
async def generate_announcer_speech(text: str) -> bytes:
    """Generate speech for announcer (no phone filter).

    Returns raw 16-bit PCM bytes at 24 kHz, same format as generate_speech.
    """
    return await generate_speech(text, ANNOUNCER_VOICE_ID, apply_filter=False)

200
backend/services/voices.py Normal file
View File

@@ -0,0 +1,200 @@
"""Voice configuration and TTS provider management"""
from dataclasses import dataclass
from typing import Optional
from enum import Enum
class TTSProvider(str, Enum):
    """Supported TTS backends; inherits str so values serialize directly."""
    ELEVENLABS = "elevenlabs"
    EDGE = "edge"  # Microsoft Edge TTS (free)
    PIPER = "piper"  # Local Piper via sherpa-onnx (free, fast)
@dataclass
class Voice:
    """Voice configuration"""
    id: str  # App-internal voice ID (e.g. "el_tony")
    name: str  # Human-readable display name
    provider: TTSProvider  # Which TTS backend synthesizes this voice
    provider_voice_id: str  # The actual ID used by the provider
    description: str = ""  # Short blurb for voice pickers
    language: str = "en"  # Language code
    gender: str = "neutral"  # "male" / "female" / "neutral"
# ElevenLabs voices
# provider_voice_id values are ElevenLabs voice IDs from that account's library.
ELEVENLABS_VOICES = [
    Voice("el_tony", "Tony (ElevenLabs)", TTSProvider.ELEVENLABS, "IKne3meq5aSn9XLyUdCD",
          "Male, New York accent, expressive", "en", "male"),
    Voice("el_jasmine", "Jasmine (ElevenLabs)", TTSProvider.ELEVENLABS, "FGY2WhTYpPnrIDTdsKH5",
          "Female, confident, direct", "en", "female"),
    Voice("el_rick", "Rick (ElevenLabs)", TTSProvider.ELEVENLABS, "JBFqnCBsd6RMkjVDRZzb",
          "Male, Texas accent, older", "en", "male"),
    Voice("el_megan", "Megan (ElevenLabs)", TTSProvider.ELEVENLABS, "XrExE9yKIg1WjnnlVkGX",
          "Female, young, casual", "en", "female"),
    Voice("el_dennis", "Dennis (ElevenLabs)", TTSProvider.ELEVENLABS, "cjVigY5qzO86Huf0OWal",
          "Male, middle-aged, anxious", "en", "male"),
    Voice("el_tanya", "Tanya (ElevenLabs)", TTSProvider.ELEVENLABS, "N2lVS1w4EtoT3dr4eOWO",
          "Female, Miami, sassy", "en", "female"),
    Voice("el_earl", "Earl (ElevenLabs)", TTSProvider.ELEVENLABS, "EXAVITQu4vr4xnSDxMaL",
          "Male, elderly, Southern", "en", "male"),
    Voice("el_carla", "Carla (ElevenLabs)", TTSProvider.ELEVENLABS, "CwhRBWXzGAHq8TQ4Fs17",
          "Female, Jersey, sharp", "en", "female"),
    Voice("el_marcus", "Marcus (ElevenLabs)", TTSProvider.ELEVENLABS, "bIHbv24MWmeRgasZH58o",
          "Male, young, urban", "en", "male"),
    Voice("el_brenda", "Brenda (ElevenLabs)", TTSProvider.ELEVENLABS, "Xb7hH8MSUJpSbSDYk0k2",
          "Female, middle-aged, worried", "en", "female"),
    Voice("el_jake", "Jake (ElevenLabs)", TTSProvider.ELEVENLABS, "SOYHLrjzK2X1ezoPC6cr",
          "Male, Boston, insecure", "en", "male"),
    Voice("el_diane", "Diane (ElevenLabs)", TTSProvider.ELEVENLABS, "cgSgspJ2msm6clMCkdW9",
          "Female, mature, conflicted", "en", "female"),
    Voice("el_bobby", "Bobby (ElevenLabs)", TTSProvider.ELEVENLABS, "nPczCjzI2devNBz1zQrb",
          "Male, sidekick, wisecracking", "en", "male"),
    Voice("el_announcer", "Announcer (ElevenLabs)", TTSProvider.ELEVENLABS, "ErXwobaYiN019PkySvjV",
          "Male, radio announcer", "en", "male"),
]
# Edge TTS voices (Microsoft, free)
# provider_voice_id values are Microsoft "*Neural" voice names.
EDGE_VOICES = [
    # US voices
    Voice("edge_jenny", "Jenny (Edge)", TTSProvider.EDGE, "en-US-JennyNeural",
          "Female, American, friendly", "en", "female"),
    Voice("edge_guy", "Guy (Edge)", TTSProvider.EDGE, "en-US-GuyNeural",
          "Male, American, casual", "en", "male"),
    Voice("edge_aria", "Aria (Edge)", TTSProvider.EDGE, "en-US-AriaNeural",
          "Female, American, professional", "en", "female"),
    Voice("edge_davis", "Davis (Edge)", TTSProvider.EDGE, "en-US-DavisNeural",
          "Male, American, calm", "en", "male"),
    Voice("edge_amber", "Amber (Edge)", TTSProvider.EDGE, "en-US-AmberNeural",
          "Female, American, warm", "en", "female"),
    Voice("edge_andrew", "Andrew (Edge)", TTSProvider.EDGE, "en-US-AndrewNeural",
          "Male, American, confident", "en", "male"),
    Voice("edge_ashley", "Ashley (Edge)", TTSProvider.EDGE, "en-US-AshleyNeural",
          "Female, American, cheerful", "en", "female"),
    Voice("edge_brian", "Brian (Edge)", TTSProvider.EDGE, "en-US-BrianNeural",
          "Male, American, narrator", "en", "male"),
    Voice("edge_christopher", "Christopher (Edge)", TTSProvider.EDGE, "en-US-ChristopherNeural",
          "Male, American, reliable", "en", "male"),
    Voice("edge_cora", "Cora (Edge)", TTSProvider.EDGE, "en-US-CoraNeural",
          "Female, American, older", "en", "female"),
    Voice("edge_elizabeth", "Elizabeth (Edge)", TTSProvider.EDGE, "en-US-ElizabethNeural",
          "Female, American, elegant", "en", "female"),
    Voice("edge_eric", "Eric (Edge)", TTSProvider.EDGE, "en-US-EricNeural",
          "Male, American, friendly", "en", "male"),
    Voice("edge_jacob", "Jacob (Edge)", TTSProvider.EDGE, "en-US-JacobNeural",
          "Male, American, young", "en", "male"),
    Voice("edge_michelle", "Michelle (Edge)", TTSProvider.EDGE, "en-US-MichelleNeural",
          "Female, American, clear", "en", "female"),
    Voice("edge_monica", "Monica (Edge)", TTSProvider.EDGE, "en-US-MonicaNeural",
          "Female, American, expressive", "en", "female"),
    Voice("edge_roger", "Roger (Edge)", TTSProvider.EDGE, "en-US-RogerNeural",
          "Male, American, mature", "en", "male"),
    Voice("edge_steffan", "Steffan (Edge)", TTSProvider.EDGE, "en-US-SteffanNeural",
          "Male, American, formal", "en", "male"),
    Voice("edge_tony", "Tony (Edge)", TTSProvider.EDGE, "en-US-TonyNeural",
          "Male, American, conversational", "en", "male"),
    # UK voices
    Voice("edge_sonia", "Sonia (Edge UK)", TTSProvider.EDGE, "en-GB-SoniaNeural",
          "Female, British, professional", "en", "female"),
    Voice("edge_ryan", "Ryan (Edge UK)", TTSProvider.EDGE, "en-GB-RyanNeural",
          "Male, British, clear", "en", "male"),
    Voice("edge_libby", "Libby (Edge UK)", TTSProvider.EDGE, "en-GB-LibbyNeural",
          "Female, British, warm", "en", "female"),
    Voice("edge_thomas", "Thomas (Edge UK)", TTSProvider.EDGE, "en-GB-ThomasNeural",
          "Male, British, friendly", "en", "male"),
    # Australian voices
    Voice("edge_natasha", "Natasha (Edge AU)", TTSProvider.EDGE, "en-AU-NatashaNeural",
          "Female, Australian, friendly", "en", "female"),
    Voice("edge_william", "William (Edge AU)", TTSProvider.EDGE, "en-AU-WilliamNeural",
          "Male, Australian, casual", "en", "male"),
]
# Piper voices (local, via sherpa-onnx)
# provider_voice_id values are short Piper model names resolved by the backend.
PIPER_VOICES = [
    Voice("piper_amy", "Amy (Piper)", TTSProvider.PIPER, "amy",
          "Female, American, clear", "en", "female"),
    Voice("piper_joe", "Joe (Piper)", TTSProvider.PIPER, "joe",
          "Male, American, natural", "en", "male"),
    Voice("piper_lessac", "Lessac (Piper)", TTSProvider.PIPER, "lessac",
          "Female, American, expressive", "en", "female"),
    Voice("piper_alan", "Alan (Piper)", TTSProvider.PIPER, "alan",
          "Male, British, clear", "en", "male"),
]
# All voices combined (order determines display order in voice pickers)
ALL_VOICES = ELEVENLABS_VOICES + EDGE_VOICES + PIPER_VOICES
# Voice lookup by ID
VOICES_BY_ID = {v.id: v for v in ALL_VOICES}
# Default voice assignments for callers (maps caller key to voice ID)
# Keys "1"-"0", "-", "=" are the keyboard keys used to trigger each caller.
DEFAULT_CALLER_VOICES = {
    "1": "el_tony",  # Tony from Staten Island
    "2": "el_jasmine",  # Jasmine from Atlanta
    "3": "el_rick",  # Rick from Texas
    "4": "el_megan",  # Megan from Portland
    "5": "el_dennis",  # Dennis from Long Island
    "6": "el_tanya",  # Tanya from Miami
    "7": "el_earl",  # Earl from Tennessee
    "8": "el_carla",  # Carla from Jersey
    "9": "el_marcus",  # Marcus from Detroit
    "0": "el_brenda",  # Brenda from Phoenix
    "-": "el_jake",  # Jake from Boston
    "=": "el_diane",  # Diane from Chicago
    "bobby": "el_bobby",
    "announcer": "el_announcer",
}
class VoiceManager:
    """Manages voice assignments and TTS provider selection."""

    def __init__(self):
        # Runtime-mutable copy of the default caller -> voice mapping
        self.caller_voices = DEFAULT_CALLER_VOICES.copy()

    def get_voice(self, voice_id: str) -> Optional[Voice]:
        """Look up a voice by its internal ID, or None if unknown."""
        return VOICES_BY_ID.get(voice_id)

    def get_caller_voice(self, caller_key: str) -> Voice:
        """Return the voice assigned to a caller, falling back to Tony."""
        assigned_id = self.caller_voices.get(caller_key, "el_tony")
        return VOICES_BY_ID.get(assigned_id, ELEVENLABS_VOICES[0])

    def set_caller_voice(self, caller_key: str, voice_id: str):
        """Assign a voice to a caller; unknown voice IDs are ignored."""
        if voice_id not in VOICES_BY_ID:
            return
        self.caller_voices[caller_key] = voice_id

    def get_all_voices(self) -> list[dict]:
        """Return all available voices as plain dicts for the API layer."""
        return [
            {
                "id": voice.id,
                "name": voice.name,
                "provider": voice.provider.value,
                "description": voice.description,
                "gender": voice.gender,
            }
            for voice in ALL_VOICES
        ]

    def get_voices_by_provider(self, provider: TTSProvider) -> list[Voice]:
        """Return every voice backed by the given provider."""
        return [voice for voice in ALL_VOICES if voice.provider == provider]

    def get_caller_voice_assignments(self) -> dict[str, str]:
        """Return a snapshot of the current caller -> voice ID mapping."""
        return dict(self.caller_voices)

    def set_caller_voice_assignments(self, assignments: dict[str, str]):
        """Apply multiple caller voice assignments; unknown voice IDs are skipped."""
        for caller_key, voice_id in assignments.items():
            if voice_id in VOICES_BY_ID:
                self.caller_voices[caller_key] = voice_id
# Global instance shared by the API routes
voice_manager = VoiceManager()