Files
ai-podcast/backend/services/caller_service.py
tcpsyn a94fc92647 Improve SignalWire streaming, randomize caller names, update frontend
- Add streamSid tracking and per-caller send locks for SignalWire
- Improve TTS streaming with real-time pacing and detailed logging
- Block host audio to caller during TTS playback
- Randomize caller names between sessions from name pools
- Update page title and show phone number in UI

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-06 01:56:05 -07:00

243 lines
9.2 KiB
Python

"""Phone caller queue and audio stream service"""
import asyncio
import base64
import json
import time
import threading
import numpy as np
from typing import Optional
class CallerService:
"""Manages phone caller queue, channel allocation, and WebSocket streams"""
FIRST_REAL_CHANNEL = 3
def __init__(self):
self._queue: list[dict] = []
self.active_calls: dict[str, dict] = {}
self._allocated_channels: set[int] = set()
self._caller_counter: int = 0
self._lock = threading.Lock()
self._websockets: dict[str, any] = {} # caller_id -> WebSocket
self._call_sids: dict[str, str] = {} # caller_id -> SignalWire callSid
self._stream_sids: dict[str, str] = {} # caller_id -> SignalWire streamSid
self._send_locks: dict[str, asyncio.Lock] = {} # per-caller send lock
self._streaming_tts: set[str] = set() # caller_ids currently receiving TTS
def _get_send_lock(self, caller_id: str) -> asyncio.Lock:
if caller_id not in self._send_locks:
self._send_locks[caller_id] = asyncio.Lock()
return self._send_locks[caller_id]
def is_streaming_tts(self, caller_id: str) -> bool:
return caller_id in self._streaming_tts
def is_streaming_tts_any(self) -> bool:
return len(self._streaming_tts) > 0
def add_to_queue(self, caller_id: str, phone: str):
with self._lock:
self._queue.append({
"caller_id": caller_id,
"phone": phone,
"queued_at": time.time(),
})
print(f"[Caller] {phone} added to queue (ID: {caller_id})")
def remove_from_queue(self, caller_id: str):
with self._lock:
self._queue = [c for c in self._queue if c["caller_id"] != caller_id]
print(f"[Caller] {caller_id} removed from queue")
def get_queue(self) -> list[dict]:
now = time.time()
with self._lock:
return [
{
"caller_id": c["caller_id"],
"phone": c["phone"],
"wait_time": int(now - c["queued_at"]),
}
for c in self._queue
]
def allocate_channel(self) -> int:
with self._lock:
ch = self.FIRST_REAL_CHANNEL
while ch in self._allocated_channels:
ch += 1
self._allocated_channels.add(ch)
return ch
def release_channel(self, channel: int):
with self._lock:
self._allocated_channels.discard(channel)
def take_call(self, caller_id: str) -> dict:
caller = None
with self._lock:
for c in self._queue:
if c["caller_id"] == caller_id:
caller = c
break
if caller:
self._queue = [c for c in self._queue if c["caller_id"] != caller_id]
if not caller:
raise ValueError(f"Caller {caller_id} not in queue")
channel = self.allocate_channel()
self._caller_counter += 1
phone = caller["phone"]
call_info = {
"caller_id": caller_id,
"phone": phone,
"channel": channel,
"started_at": time.time(),
}
self.active_calls[caller_id] = call_info
print(f"[Caller] {phone} taken on air — channel {channel}")
return call_info
def hangup(self, caller_id: str):
call_info = self.active_calls.pop(caller_id, None)
if call_info:
self.release_channel(call_info["channel"])
print(f"[Caller] {call_info['phone']} hung up — channel {call_info['channel']} released")
self._websockets.pop(caller_id, None)
self._call_sids.pop(caller_id, None)
self._stream_sids.pop(caller_id, None)
self._send_locks.pop(caller_id, None)
def reset(self):
with self._lock:
for call_info in self.active_calls.values():
self._allocated_channels.discard(call_info["channel"])
self._queue.clear()
self.active_calls.clear()
self._allocated_channels.clear()
self._caller_counter = 0
self._websockets.clear()
self._call_sids.clear()
self._stream_sids.clear()
self._send_locks.clear()
self._streaming_tts.clear()
print("[Caller] Service reset")
def register_websocket(self, caller_id: str, websocket):
"""Register a WebSocket for a caller"""
self._websockets[caller_id] = websocket
def unregister_websocket(self, caller_id: str):
"""Unregister a WebSocket"""
self._websockets.pop(caller_id, None)
async def send_audio_to_caller(self, caller_id: str, pcm_data: bytes, sample_rate: int):
"""Send small audio chunk to caller via SignalWire WebSocket.
Encodes L16 PCM as base64 JSON per SignalWire protocol.
"""
if caller_id in self._streaming_tts:
return # Don't send host audio during TTS streaming
ws = self._websockets.get(caller_id)
if not ws:
return
lock = self._get_send_lock(caller_id)
async with lock:
try:
if sample_rate != 16000:
audio = np.frombuffer(pcm_data, dtype=np.int16).astype(np.float32) / 32768.0
ratio = 16000 / sample_rate
out_len = int(len(audio) * ratio)
indices = (np.arange(out_len) / ratio).astype(int)
indices = np.clip(indices, 0, len(audio) - 1)
audio = audio[indices]
pcm_data = (audio * 32767).astype(np.int16).tobytes()
payload = base64.b64encode(pcm_data).decode('ascii')
stream_sid = self._stream_sids.get(caller_id, "")
await ws.send_text(json.dumps({
"event": "media",
"streamSid": stream_sid,
"media": {"payload": payload}
}))
except Exception as e:
print(f"[Caller] Failed to send audio: {e}")
async def stream_audio_to_caller(self, caller_id: str, pcm_data: bytes, sample_rate: int):
"""Stream large audio (TTS) to caller in real-time chunks via SignalWire WebSocket."""
ws = self._websockets.get(caller_id)
if not ws:
return
lock = self._get_send_lock(caller_id)
self._streaming_tts.add(caller_id)
chunks_sent = 0
try:
audio = np.frombuffer(pcm_data, dtype=np.int16).astype(np.float32) / 32768.0
if sample_rate != 16000:
ratio = 16000 / sample_rate
out_len = int(len(audio) * ratio)
indices = (np.arange(out_len) / ratio).astype(int)
indices = np.clip(indices, 0, len(audio) - 1)
audio = audio[indices]
total_chunks = (len(audio) + 959) // 960
duration_s = len(audio) / 16000
print(f"[Caller] TTS stream starting: {duration_s:.1f}s audio, {total_chunks} chunks")
chunk_samples = 960
chunk_duration = chunk_samples / 16000 # 60ms per chunk
for i in range(0, len(audio), chunk_samples):
if caller_id not in self._websockets:
print(f"[Caller] TTS stream aborted: caller {caller_id} disconnected at chunk {chunks_sent}/{total_chunks}")
break
t0 = time.time()
chunk = audio[i:i + chunk_samples]
pcm_chunk = (chunk * 32767).astype(np.int16).tobytes()
payload = base64.b64encode(pcm_chunk).decode('ascii')
stream_sid = self._stream_sids.get(caller_id, "")
async with lock:
await ws.send_text(json.dumps({
"event": "media",
"streamSid": stream_sid,
"media": {"payload": payload}
}))
chunks_sent += 1
# Sleep to match real-time playback rate
elapsed = time.time() - t0
sleep_time = max(0, chunk_duration - elapsed)
await asyncio.sleep(sleep_time)
print(f"[Caller] TTS stream finished: {chunks_sent}/{total_chunks} chunks sent")
except Exception as e:
print(f"[Caller] TTS stream failed at chunk {chunks_sent}: {e}")
finally:
self._streaming_tts.discard(caller_id)
def register_call_sid(self, caller_id: str, call_sid: str):
"""Track SignalWire callSid for a caller"""
self._call_sids[caller_id] = call_sid
def get_call_sid(self, caller_id: str) -> str | None:
"""Get SignalWire callSid for a caller"""
return self._call_sids.get(caller_id)
def unregister_call_sid(self, caller_id: str):
"""Remove callSid tracking"""
self._call_sids.pop(caller_id, None)
def register_stream_sid(self, caller_id: str, stream_sid: str):
"""Track SignalWire streamSid for a caller"""
self._stream_sids[caller_id] = stream_sid
def unregister_stream_sid(self, caller_id: str):
"""Remove streamSid tracking"""
self._stream_sids.pop(caller_id, None)