From c82420ddad86b0b0ad0378e3454588e25a60a342 Mon Sep 17 00:00:00 2001 From: tcpsyn Date: Thu, 5 Feb 2026 13:39:02 -0700 Subject: [PATCH] Add outbound audio streaming to real callers Co-Authored-By: Claude Opus 4.6 --- backend/main.py | 12 ++++++++ backend/services/twilio_service.py | 48 ++++++++++++++++++++++++++++++ tests/test_twilio_service.py | 36 ++++++++++++++++++++++ 3 files changed, 96 insertions(+) diff --git a/backend/main.py b/backend/main.py index d343de2..62e24e2 100644 --- a/backend/main.py +++ b/backend/main.py @@ -663,6 +663,13 @@ async def text_to_speech(request: TTSRequest): ) thread.start() + # Also send to active real callers so they hear the AI + if session.active_real_caller: + call_sid = session.active_real_caller["call_sid"] + asyncio.create_task( + twilio_service.send_audio_to_caller(call_sid, audio_bytes, 24000) + ) + return {"status": "playing", "duration": len(audio_bytes) / 2 / 24000} @@ -877,6 +884,9 @@ async def twilio_media_stream(websocket: WebSocket): if event == "start": stream_sid = msg["start"]["streamSid"] call_sid = msg["start"]["callSid"] + twilio_service.register_websocket(call_sid, websocket) + if call_sid in twilio_service.active_calls: + twilio_service.active_calls[call_sid]["stream_sid"] = stream_sid print(f"[Twilio WS] Stream started: {stream_sid} for call {call_sid}") elif event == "media": @@ -912,6 +922,8 @@ async def twilio_media_stream(websocket: WebSocket): except Exception as e: print(f"[Twilio WS] Error: {e}") finally: + if call_sid: + twilio_service.unregister_websocket(call_sid) # Transcribe any remaining audio if audio_buffer and call_sid: asyncio.create_task( diff --git a/backend/services/twilio_service.py b/backend/services/twilio_service.py index 185a807..908c4b2 100644 --- a/backend/services/twilio_service.py +++ b/backend/services/twilio_service.py @@ -1,5 +1,8 @@ """Twilio call queue and media stream service""" +import asyncio +import base64 +import audioop import time import threading from typing import Optional @@ -16,6 +19,7 @@ class TwilioService: self._allocated_channels: set[int] = set() self._caller_counter: int = 0 self._lock = threading.Lock() + self._websockets: dict[str, any] = {} # call_sid -> WebSocket def add_to_queue(self, call_sid: str, phone: str): with self._lock: @@ -88,6 +92,7 @@ class TwilioService: if call_info: self.release_channel(call_info["channel"]) print(f"[Twilio] {call_info['name']} hung up — channel {call_info['channel']} released") + self._websockets.pop(call_sid, None) def reset(self): with self._lock: @@ -97,4 +102,47 @@ class TwilioService: self.active_calls.clear() self._allocated_channels.clear() self._caller_counter = 0 + self._websockets.clear() print("[Twilio] Service reset") + + def register_websocket(self, call_sid: str, websocket): + """Register a WebSocket for a call""" + self._websockets[call_sid] = websocket + + def unregister_websocket(self, call_sid: str): + """Unregister a WebSocket""" + self._websockets.pop(call_sid, None) + + async def send_audio_to_caller(self, call_sid: str, pcm_data: bytes, sample_rate: int): + """Send audio back to real caller via Twilio WebSocket""" + ws = self._websockets.get(call_sid) + if not ws: + return + + call_info = self.active_calls.get(call_sid) + if not call_info or "stream_sid" not in call_info: + return + + try: + # Resample to 8kHz if needed + if sample_rate != 8000: + import numpy as np + import librosa + audio = np.frombuffer(pcm_data, dtype=np.int16).astype(np.float32) / 32768.0 + audio = librosa.resample(audio, orig_sr=sample_rate, target_sr=8000) + pcm_data = (audio * 32767).astype(np.int16).tobytes() + + # Convert PCM to mulaw + mulaw_data = audioop.lin2ulaw(pcm_data, 2) + + # Send as Twilio media message + import json + await ws.send_text(json.dumps({ + "event": "media", + "streamSid": call_info["stream_sid"], + "media": { + "payload": base64.b64encode(mulaw_data).decode("ascii"), + }, + })) + except Exception as e: + print(f"[Twilio] Failed to send audio to caller: {e}") diff --git a/tests/test_twilio_service.py b/tests/test_twilio_service.py index 6070c15..3d37422 100644 --- a/tests/test_twilio_service.py +++ b/tests/test_twilio_service.py @@ -65,3 +65,39 @@ def test_caller_counter_increments(): r2 = svc.take_call("CA2") assert r1["name"] == "Caller #1" assert r2["name"] == "Caller #2" + + +def test_register_and_unregister_websocket(): + svc = TwilioService() + fake_ws = object() + svc.register_websocket("CA123", fake_ws) + assert svc._websockets["CA123"] is fake_ws + svc.unregister_websocket("CA123") + assert "CA123" not in svc._websockets + + +def test_hangup_clears_websocket(): + svc = TwilioService() + svc.add_to_queue("CA123", "+15125550142") + svc.take_call("CA123") + svc.register_websocket("CA123", object()) + svc.hangup("CA123") + assert "CA123" not in svc._websockets + + +def test_reset_clears_websockets(): + svc = TwilioService() + svc.register_websocket("CA1", object()) + svc.register_websocket("CA2", object()) + svc.reset() + assert svc._websockets == {} + + +def test_send_audio_no_websocket(): + """send_audio_to_caller returns silently when no WS registered""" + import asyncio + svc = TwilioService() + # Should not raise + asyncio.get_event_loop().run_until_complete( + svc.send_audio_to_caller("CA_NONE", b"\x00" * 100, 8000) + )