From 051790136e32cc088f3f769a54f2c0ec47cee249 Mon Sep 17 00:00:00 2001 From: tcpsyn Date: Thu, 5 Feb 2026 17:40:35 -0700 Subject: [PATCH] Update CallerService for SignalWire protocol Co-Authored-By: Claude Opus 4.6 --- backend/services/caller_service.py | 77 ++++++++++++++++-------------- 1 file changed, 42 insertions(+), 35 deletions(-) diff --git a/backend/services/caller_service.py b/backend/services/caller_service.py index ac06e69..836bd71 100644 --- a/backend/services/caller_service.py +++ b/backend/services/caller_service.py @@ -1,4 +1,4 @@ -"""Browser caller queue and audio stream service""" +"""Phone caller queue and audio stream service""" import asyncio import time @@ -8,7 +8,7 @@ from typing import Optional class CallerService: - """Manages browser caller queue, channel allocation, and WebSocket streams""" + """Manages phone caller queue, channel allocation, and WebSocket streams""" FIRST_REAL_CHANNEL = 3 @@ -19,16 +19,17 @@ class CallerService: self._caller_counter: int = 0 self._lock = threading.Lock() self._websockets: dict[str, any] = {} # caller_id -> WebSocket + self._call_sids: dict[str, str] = {} # caller_id -> SignalWire callSid self.streaming_tts: bool = False # True while TTS audio is being streamed - def add_to_queue(self, caller_id: str, name: str): + def add_to_queue(self, caller_id: str, phone: str): with self._lock: self._queue.append({ "caller_id": caller_id, - "name": name, + "phone": phone, "queued_at": time.time(), }) - print(f"[Caller] {name} added to queue (ID: {caller_id})") + print(f"[Caller] {phone} added to queue (ID: {caller_id})") def remove_from_queue(self, caller_id: str): with self._lock: @@ -41,7 +42,7 @@ class CallerService: return [ { "caller_id": c["caller_id"], - "name": c["name"], + "phone": c["phone"], "wait_time": int(now - c["queued_at"]), } for c in self._queue @@ -74,24 +75,25 @@ class CallerService: channel = self.allocate_channel() self._caller_counter += 1 - name = caller["name"] + phone = caller["phone"] call_info = { "caller_id": caller_id, - "name": name, + "phone": phone, "channel": channel, "started_at": time.time(), } self.active_calls[caller_id] = call_info - print(f"[Caller] {name} taken on air — channel {channel}") + print(f"[Caller] {phone} taken on air — channel {channel}") return call_info def hangup(self, caller_id: str): call_info = self.active_calls.pop(caller_id, None) if call_info: self.release_channel(call_info["channel"]) - print(f"[Caller] {call_info['name']} hung up — channel {call_info['channel']} released") + print(f"[Caller] {call_info['phone']} hung up — channel {call_info['channel']} released") self._websockets.pop(caller_id, None) + self._call_sids.pop(caller_id, None) def reset(self): with self._lock: @@ -102,6 +104,7 @@ class CallerService: self._allocated_channels.clear() self._caller_counter = 0 self._websockets.clear() + self._call_sids.clear() print("[Caller] Service reset") def register_websocket(self, caller_id: str, websocket): @@ -113,15 +116,15 @@ class CallerService: self._websockets.pop(caller_id, None) async def send_audio_to_caller(self, caller_id: str, pcm_data: bytes, sample_rate: int): - """Send small audio chunk to real caller via WebSocket binary frame. - For short chunks (host mic, ≤960 samples), sends immediately. - For large chunks (TTS), use stream_audio_to_caller instead. + """Send small audio chunk to caller via SignalWire WebSocket. + Encodes L16 PCM as base64 JSON per SignalWire protocol. """ ws = self._websockets.get(caller_id) if not ws: return try: + import base64 if sample_rate != 16000: audio = np.frombuffer(pcm_data, dtype=np.int16).astype(np.float32) / 32768.0 ratio = 16000 / sample_rate @@ -130,18 +133,26 @@ class CallerService: indices = np.clip(indices, 0, len(audio) - 1) audio = audio[indices] pcm_data = (audio * 32767).astype(np.int16).tobytes() - await ws.send_bytes(pcm_data) + + payload = base64.b64encode(pcm_data).decode('ascii') + import json + await ws.send_text(json.dumps({ + "event": "media", + "media": {"payload": payload} + })) except Exception as e: print(f"[Caller] Failed to send audio: {e}") async def stream_audio_to_caller(self, caller_id: str, pcm_data: bytes, sample_rate: int): - """Stream large audio (TTS) to caller in real-time chunks to avoid buffer overflow.""" + """Stream large audio (TTS) to caller in real-time chunks via SignalWire WebSocket.""" ws = self._websockets.get(caller_id) if not ws: return self.streaming_tts = True try: + import base64 + import json audio = np.frombuffer(pcm_data, dtype=np.int16).astype(np.float32) / 32768.0 if sample_rate != 16000: ratio = 16000 / sample_rate @@ -150,36 +161,32 @@ class CallerService: indices = np.clip(indices, 0, len(audio) - 1) audio = audio[indices] - # Send in 60ms chunks at real-time rate chunk_samples = 960 for i in range(0, len(audio), chunk_samples): if caller_id not in self._websockets: break chunk = audio[i:i + chunk_samples] pcm_chunk = (chunk * 32767).astype(np.int16).tobytes() - await ws.send_bytes(pcm_chunk) - await asyncio.sleep(0.055) # ~60ms, slightly under to stay ahead + payload = base64.b64encode(pcm_chunk).decode('ascii') + await ws.send_text(json.dumps({ + "event": "media", + "media": {"payload": payload} + })) + await asyncio.sleep(0.055) except Exception as e: print(f"[Caller] Failed to stream audio: {e}") finally: self.streaming_tts = False - async def notify_caller(self, caller_id: str, message: dict): - """Send JSON control message to caller""" - ws = self._websockets.get(caller_id) - if ws: - import json - await ws.send_text(json.dumps(message)) + def register_call_sid(self, caller_id: str, call_sid: str): + """Track SignalWire callSid for a caller""" + self._call_sids[caller_id] = call_sid - async def disconnect_caller(self, caller_id: str): - """Disconnect a caller's WebSocket""" - ws = self._websockets.get(caller_id) - if ws: - try: - import json - await ws.send_text(json.dumps({"status": "disconnected"})) - await ws.close() - except Exception: - pass - self._websockets.pop(caller_id, None) + def get_call_sid(self, caller_id: str) -> str | None: + """Get SignalWire callSid for a caller""" + return self._call_sids.get(caller_id) + + def unregister_call_sid(self, caller_id: str): + """Remove callSid tracking""" + self._call_sids.pop(caller_id, None)