Improve SignalWire streaming, randomize caller names, update frontend
- Add streamSid tracking and per-caller send locks for SignalWire
- Improve TTS streaming with real-time pacing and detailed logging
- Block host audio to caller during TTS playback
- Randomize caller names between sessions from name pools
- Update page title and show phone number in UI

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,6 +1,8 @@
|
||||
"""Phone caller queue and audio stream service"""
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import json
|
||||
import time
|
||||
import threading
|
||||
import numpy as np
|
||||
@@ -20,7 +22,20 @@ class CallerService:
|
||||
self._lock = threading.Lock()
|
||||
self._websockets: dict[str, any] = {} # caller_id -> WebSocket
|
||||
self._call_sids: dict[str, str] = {} # caller_id -> SignalWire callSid
|
||||
self.streaming_tts: bool = False # True while TTS audio is being streamed
|
||||
self._stream_sids: dict[str, str] = {} # caller_id -> SignalWire streamSid
|
||||
self._send_locks: dict[str, asyncio.Lock] = {} # per-caller send lock
|
||||
self._streaming_tts: set[str] = set() # caller_ids currently receiving TTS
|
||||
|
||||
def _get_send_lock(self, caller_id: str) -> asyncio.Lock:
    """Return the send lock for ``caller_id``, creating it on first use.

    The lock is stored in ``self._send_locks`` so that later calls with the
    same ``caller_id`` get the same ``asyncio.Lock`` object back.
    """
    send_lock = self._send_locks.get(caller_id)
    if send_lock is None:
        # First request for this caller: create the lock and remember it.
        send_lock = asyncio.Lock()
        self._send_locks[caller_id] = send_lock
    return send_lock
|
||||
|
||||
def is_streaming_tts(self, caller_id: str) -> bool:
    """Return True when ``caller_id`` is in the ``_streaming_tts`` set."""
    active_callers = self._streaming_tts
    return caller_id in active_callers
|
||||
|
||||
def is_streaming_tts_any(self) -> bool:
    """Return True when the ``_streaming_tts`` set holds at least one caller."""
    # A non-empty collection is truthy, so bool() matches ``len(...) > 0``.
    return bool(self._streaming_tts)
|
||||
|
||||
def add_to_queue(self, caller_id: str, phone: str):
|
||||
with self._lock:
|
||||
@@ -94,6 +109,8 @@ class CallerService:
|
||||
print(f"[Caller] {call_info['phone']} hung up — channel {call_info['channel']} released")
|
||||
self._websockets.pop(caller_id, None)
|
||||
self._call_sids.pop(caller_id, None)
|
||||
self._stream_sids.pop(caller_id, None)
|
||||
self._send_locks.pop(caller_id, None)
|
||||
|
||||
def reset(self):
|
||||
with self._lock:
|
||||
@@ -105,6 +122,9 @@ class CallerService:
|
||||
self._caller_counter = 0
|
||||
self._websockets.clear()
|
||||
self._call_sids.clear()
|
||||
self._stream_sids.clear()
|
||||
self._send_locks.clear()
|
||||
self._streaming_tts.clear()
|
||||
print("[Caller] Service reset")
|
||||
|
||||
def register_websocket(self, caller_id: str, websocket):
|
||||
@@ -119,29 +139,34 @@ class CallerService:
|
||||
"""Send small audio chunk to caller via SignalWire WebSocket.
|
||||
Encodes L16 PCM as base64 JSON per SignalWire protocol.
|
||||
"""
|
||||
if caller_id in self._streaming_tts:
|
||||
return # Don't send host audio during TTS streaming
|
||||
|
||||
ws = self._websockets.get(caller_id)
|
||||
if not ws:
|
||||
return
|
||||
|
||||
try:
|
||||
import base64
|
||||
if sample_rate != 16000:
|
||||
audio = np.frombuffer(pcm_data, dtype=np.int16).astype(np.float32) / 32768.0
|
||||
ratio = 16000 / sample_rate
|
||||
out_len = int(len(audio) * ratio)
|
||||
indices = (np.arange(out_len) / ratio).astype(int)
|
||||
indices = np.clip(indices, 0, len(audio) - 1)
|
||||
audio = audio[indices]
|
||||
pcm_data = (audio * 32767).astype(np.int16).tobytes()
|
||||
lock = self._get_send_lock(caller_id)
|
||||
async with lock:
|
||||
try:
|
||||
if sample_rate != 16000:
|
||||
audio = np.frombuffer(pcm_data, dtype=np.int16).astype(np.float32) / 32768.0
|
||||
ratio = 16000 / sample_rate
|
||||
out_len = int(len(audio) * ratio)
|
||||
indices = (np.arange(out_len) / ratio).astype(int)
|
||||
indices = np.clip(indices, 0, len(audio) - 1)
|
||||
audio = audio[indices]
|
||||
pcm_data = (audio * 32767).astype(np.int16).tobytes()
|
||||
|
||||
payload = base64.b64encode(pcm_data).decode('ascii')
|
||||
import json
|
||||
await ws.send_text(json.dumps({
|
||||
"event": "media",
|
||||
"media": {"payload": payload}
|
||||
}))
|
||||
except Exception as e:
|
||||
print(f"[Caller] Failed to send audio: {e}")
|
||||
payload = base64.b64encode(pcm_data).decode('ascii')
|
||||
stream_sid = self._stream_sids.get(caller_id, "")
|
||||
await ws.send_text(json.dumps({
|
||||
"event": "media",
|
||||
"streamSid": stream_sid,
|
||||
"media": {"payload": payload}
|
||||
}))
|
||||
except Exception as e:
|
||||
print(f"[Caller] Failed to send audio: {e}")
|
||||
|
||||
async def stream_audio_to_caller(self, caller_id: str, pcm_data: bytes, sample_rate: int):
|
||||
"""Stream large audio (TTS) to caller in real-time chunks via SignalWire WebSocket."""
|
||||
@@ -149,10 +174,10 @@ class CallerService:
|
||||
if not ws:
|
||||
return
|
||||
|
||||
self.streaming_tts = True
|
||||
lock = self._get_send_lock(caller_id)
|
||||
self._streaming_tts.add(caller_id)
|
||||
chunks_sent = 0
|
||||
try:
|
||||
import base64
|
||||
import json
|
||||
audio = np.frombuffer(pcm_data, dtype=np.int16).astype(np.float32) / 32768.0
|
||||
if sample_rate != 16000:
|
||||
ratio = 16000 / sample_rate
|
||||
@@ -161,23 +186,40 @@ class CallerService:
|
||||
indices = np.clip(indices, 0, len(audio) - 1)
|
||||
audio = audio[indices]
|
||||
|
||||
total_chunks = (len(audio) + 959) // 960
|
||||
duration_s = len(audio) / 16000
|
||||
print(f"[Caller] TTS stream starting: {duration_s:.1f}s audio, {total_chunks} chunks")
|
||||
|
||||
chunk_samples = 960
|
||||
chunk_duration = chunk_samples / 16000 # 60ms per chunk
|
||||
|
||||
for i in range(0, len(audio), chunk_samples):
|
||||
if caller_id not in self._websockets:
|
||||
print(f"[Caller] TTS stream aborted: caller {caller_id} disconnected at chunk {chunks_sent}/{total_chunks}")
|
||||
break
|
||||
t0 = time.time()
|
||||
chunk = audio[i:i + chunk_samples]
|
||||
pcm_chunk = (chunk * 32767).astype(np.int16).tobytes()
|
||||
payload = base64.b64encode(pcm_chunk).decode('ascii')
|
||||
await ws.send_text(json.dumps({
|
||||
"event": "media",
|
||||
"media": {"payload": payload}
|
||||
}))
|
||||
await asyncio.sleep(0.055)
|
||||
stream_sid = self._stream_sids.get(caller_id, "")
|
||||
async with lock:
|
||||
await ws.send_text(json.dumps({
|
||||
"event": "media",
|
||||
"streamSid": stream_sid,
|
||||
"media": {"payload": payload}
|
||||
}))
|
||||
chunks_sent += 1
|
||||
# Sleep to match real-time playback rate
|
||||
elapsed = time.time() - t0
|
||||
sleep_time = max(0, chunk_duration - elapsed)
|
||||
await asyncio.sleep(sleep_time)
|
||||
|
||||
print(f"[Caller] TTS stream finished: {chunks_sent}/{total_chunks} chunks sent")
|
||||
|
||||
except Exception as e:
|
||||
print(f"[Caller] Failed to stream audio: {e}")
|
||||
print(f"[Caller] TTS stream failed at chunk {chunks_sent}: {e}")
|
||||
finally:
|
||||
self.streaming_tts = False
|
||||
self._streaming_tts.discard(caller_id)
|
||||
|
||||
def register_call_sid(self, caller_id: str, call_sid: str):
|
||||
"""Track SignalWire callSid for a caller"""
|
||||
@@ -190,3 +232,11 @@ class CallerService:
|
||||
def unregister_call_sid(self, caller_id: str):
    """Drop the stored callSid for ``caller_id``.

    Does nothing when no callSid is stored for that caller.
    """
    # pop() with a default never raises for an unknown caller_id.
    self._call_sids.pop(caller_id, None)
|
||||
|
||||
def register_stream_sid(self, caller_id: str, stream_sid: str):
    """Store ``stream_sid`` as the SignalWire streamSid for ``caller_id``.

    Any value already stored for the same caller is overwritten.
    """
    sid_by_caller = self._stream_sids
    sid_by_caller[caller_id] = stream_sid
|
||||
|
||||
def unregister_stream_sid(self, caller_id: str):
    """Drop the stored streamSid for ``caller_id``.

    Does nothing when no streamSid is stored for that caller.
    """
    # pop() with a default never raises for an unknown caller_id.
    self._stream_sids.pop(caller_id, None)
|
||||
|
||||
Reference in New Issue
Block a user