Update CallerService for SignalWire protocol

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-05 17:40:35 -07:00
parent c22818bfec
commit 051790136e

View File

@@ -1,4 +1,4 @@
"""Browser caller queue and audio stream service""" """Phone caller queue and audio stream service"""
import asyncio import asyncio
import time import time
@@ -8,7 +8,7 @@ from typing import Optional
class CallerService: class CallerService:
"""Manages browser caller queue, channel allocation, and WebSocket streams""" """Manages phone caller queue, channel allocation, and WebSocket streams"""
FIRST_REAL_CHANNEL = 3 FIRST_REAL_CHANNEL = 3
@@ -19,16 +19,17 @@ class CallerService:
self._caller_counter: int = 0 self._caller_counter: int = 0
self._lock = threading.Lock() self._lock = threading.Lock()
self._websockets: dict[str, any] = {} # caller_id -> WebSocket self._websockets: dict[str, any] = {} # caller_id -> WebSocket
self._call_sids: dict[str, str] = {} # caller_id -> SignalWire callSid
self.streaming_tts: bool = False # True while TTS audio is being streamed self.streaming_tts: bool = False # True while TTS audio is being streamed
def add_to_queue(self, caller_id: str, name: str): def add_to_queue(self, caller_id: str, phone: str):
with self._lock: with self._lock:
self._queue.append({ self._queue.append({
"caller_id": caller_id, "caller_id": caller_id,
"name": name, "phone": phone,
"queued_at": time.time(), "queued_at": time.time(),
}) })
print(f"[Caller] {name} added to queue (ID: {caller_id})") print(f"[Caller] {phone} added to queue (ID: {caller_id})")
def remove_from_queue(self, caller_id: str): def remove_from_queue(self, caller_id: str):
with self._lock: with self._lock:
@@ -41,7 +42,7 @@ class CallerService:
return [ return [
{ {
"caller_id": c["caller_id"], "caller_id": c["caller_id"],
"name": c["name"], "phone": c["phone"],
"wait_time": int(now - c["queued_at"]), "wait_time": int(now - c["queued_at"]),
} }
for c in self._queue for c in self._queue
@@ -74,24 +75,25 @@ class CallerService:
channel = self.allocate_channel() channel = self.allocate_channel()
self._caller_counter += 1 self._caller_counter += 1
name = caller["name"] phone = caller["phone"]
call_info = { call_info = {
"caller_id": caller_id, "caller_id": caller_id,
"name": name, "phone": phone,
"channel": channel, "channel": channel,
"started_at": time.time(), "started_at": time.time(),
} }
self.active_calls[caller_id] = call_info self.active_calls[caller_id] = call_info
print(f"[Caller] {name} taken on air — channel {channel}") print(f"[Caller] {phone} taken on air — channel {channel}")
return call_info return call_info
def hangup(self, caller_id: str): def hangup(self, caller_id: str):
call_info = self.active_calls.pop(caller_id, None) call_info = self.active_calls.pop(caller_id, None)
if call_info: if call_info:
self.release_channel(call_info["channel"]) self.release_channel(call_info["channel"])
print(f"[Caller] {call_info['name']} hung up — channel {call_info['channel']} released") print(f"[Caller] {call_info['phone']} hung up — channel {call_info['channel']} released")
self._websockets.pop(caller_id, None) self._websockets.pop(caller_id, None)
self._call_sids.pop(caller_id, None)
def reset(self): def reset(self):
with self._lock: with self._lock:
@@ -102,6 +104,7 @@ class CallerService:
self._allocated_channels.clear() self._allocated_channels.clear()
self._caller_counter = 0 self._caller_counter = 0
self._websockets.clear() self._websockets.clear()
self._call_sids.clear()
print("[Caller] Service reset") print("[Caller] Service reset")
def register_websocket(self, caller_id: str, websocket): def register_websocket(self, caller_id: str, websocket):
@@ -113,15 +116,15 @@ class CallerService:
self._websockets.pop(caller_id, None) self._websockets.pop(caller_id, None)
async def send_audio_to_caller(self, caller_id: str, pcm_data: bytes, sample_rate: int): async def send_audio_to_caller(self, caller_id: str, pcm_data: bytes, sample_rate: int):
"""Send small audio chunk to real caller via WebSocket binary frame. """Send small audio chunk to caller via SignalWire WebSocket.
For short chunks (host mic, ≤960 samples), sends immediately. Encodes L16 PCM as base64 JSON per SignalWire protocol.
For large chunks (TTS), use stream_audio_to_caller instead.
""" """
ws = self._websockets.get(caller_id) ws = self._websockets.get(caller_id)
if not ws: if not ws:
return return
try: try:
import base64
if sample_rate != 16000: if sample_rate != 16000:
audio = np.frombuffer(pcm_data, dtype=np.int16).astype(np.float32) / 32768.0 audio = np.frombuffer(pcm_data, dtype=np.int16).astype(np.float32) / 32768.0
ratio = 16000 / sample_rate ratio = 16000 / sample_rate
@@ -130,18 +133,26 @@ class CallerService:
indices = np.clip(indices, 0, len(audio) - 1) indices = np.clip(indices, 0, len(audio) - 1)
audio = audio[indices] audio = audio[indices]
pcm_data = (audio * 32767).astype(np.int16).tobytes() pcm_data = (audio * 32767).astype(np.int16).tobytes()
await ws.send_bytes(pcm_data)
payload = base64.b64encode(pcm_data).decode('ascii')
import json
await ws.send_text(json.dumps({
"event": "media",
"media": {"payload": payload}
}))
except Exception as e: except Exception as e:
print(f"[Caller] Failed to send audio: {e}") print(f"[Caller] Failed to send audio: {e}")
async def stream_audio_to_caller(self, caller_id: str, pcm_data: bytes, sample_rate: int): async def stream_audio_to_caller(self, caller_id: str, pcm_data: bytes, sample_rate: int):
"""Stream large audio (TTS) to caller in real-time chunks to avoid buffer overflow.""" """Stream large audio (TTS) to caller in real-time chunks via SignalWire WebSocket."""
ws = self._websockets.get(caller_id) ws = self._websockets.get(caller_id)
if not ws: if not ws:
return return
self.streaming_tts = True self.streaming_tts = True
try: try:
import base64
import json
audio = np.frombuffer(pcm_data, dtype=np.int16).astype(np.float32) / 32768.0 audio = np.frombuffer(pcm_data, dtype=np.int16).astype(np.float32) / 32768.0
if sample_rate != 16000: if sample_rate != 16000:
ratio = 16000 / sample_rate ratio = 16000 / sample_rate
@@ -150,36 +161,32 @@ class CallerService:
indices = np.clip(indices, 0, len(audio) - 1) indices = np.clip(indices, 0, len(audio) - 1)
audio = audio[indices] audio = audio[indices]
# Send in 60ms chunks at real-time rate
chunk_samples = 960 chunk_samples = 960
for i in range(0, len(audio), chunk_samples): for i in range(0, len(audio), chunk_samples):
if caller_id not in self._websockets: if caller_id not in self._websockets:
break break
chunk = audio[i:i + chunk_samples] chunk = audio[i:i + chunk_samples]
pcm_chunk = (chunk * 32767).astype(np.int16).tobytes() pcm_chunk = (chunk * 32767).astype(np.int16).tobytes()
await ws.send_bytes(pcm_chunk) payload = base64.b64encode(pcm_chunk).decode('ascii')
await asyncio.sleep(0.055) # ~60ms, slightly under to stay ahead await ws.send_text(json.dumps({
"event": "media",
"media": {"payload": payload}
}))
await asyncio.sleep(0.055)
except Exception as e: except Exception as e:
print(f"[Caller] Failed to stream audio: {e}") print(f"[Caller] Failed to stream audio: {e}")
finally: finally:
self.streaming_tts = False self.streaming_tts = False
async def notify_caller(self, caller_id: str, message: dict): def register_call_sid(self, caller_id: str, call_sid: str):
"""Send JSON control message to caller""" """Track SignalWire callSid for a caller"""
ws = self._websockets.get(caller_id) self._call_sids[caller_id] = call_sid
if ws:
import json
await ws.send_text(json.dumps(message))
async def disconnect_caller(self, caller_id: str): def get_call_sid(self, caller_id: str) -> str | None:
"""Disconnect a caller's WebSocket""" """Get SignalWire callSid for a caller"""
ws = self._websockets.get(caller_id) return self._call_sids.get(caller_id)
if ws:
try: def unregister_call_sid(self, caller_id: str):
import json """Remove callSid tracking"""
await ws.send_text(json.dumps({"status": "disconnected"})) self._call_sids.pop(caller_id, None)
await ws.close()
except Exception:
pass
self._websockets.pop(caller_id, None)