From 3961cfc9d42d9e4bb2ba7ae548538f42b535dcd2 Mon Sep 17 00:00:00 2001 From: tcpsyn Date: Thu, 5 Feb 2026 15:45:08 -0700 Subject: [PATCH] Rename TwilioService to CallerService, remove Twilio-specific audio encoding Co-Authored-By: Claude Opus 4.6 --- backend/main.py | 28 +++--- backend/services/caller_service.py | 147 ++++++++++++++++++++++++++++ backend/services/twilio_service.py | 148 ----------------------------- tests/test_caller_service.py | 102 ++++++++++++++++++++ tests/test_twilio_service.py | 103 -------------------- 5 files changed, 263 insertions(+), 265 deletions(-) create mode 100644 backend/services/caller_service.py delete mode 100644 backend/services/twilio_service.py create mode 100644 tests/test_caller_service.py delete mode 100644 tests/test_twilio_service.py diff --git a/backend/main.py b/backend/main.py index dbd6a48..dd230c8 100644 --- a/backend/main.py +++ b/backend/main.py @@ -17,7 +17,7 @@ from pydantic import BaseModel from typing import Optional from .config import settings -from .services.twilio_service import TwilioService +from .services.caller_service import CallerService from .services.transcription import transcribe_audio from .services.llm import llm_service from .services.tts import generate_speech @@ -405,7 +405,7 @@ class Session: session = Session() -twilio_service = TwilioService() +caller_service = CallerService() # --- Static Files --- @@ -672,7 +672,7 @@ async def text_to_speech(request: TTSRequest): if session.active_real_caller: call_sid = session.active_real_caller["call_sid"] asyncio.create_task( - twilio_service.send_audio_to_caller(call_sid, audio_bytes, 24000) + caller_service.send_audio_to_caller(call_sid, audio_bytes, 24000) ) return {"status": "playing", "duration": len(audio_bytes) / 2 / 24000} @@ -788,7 +788,7 @@ async def twilio_voice_webhook( From: str = Form(...), ): """Handle incoming Twilio call — greet and enqueue""" - twilio_service.add_to_queue(CallSid, From) + caller_service.add_to_queue(CallSid, From) response = VoiceResponse() response.say("You're calling Luke at the Roost. Hold tight, we'll get to you.", voice="alice") @@ -812,14 +812,14 @@ async def twilio_hold_music(): @app.get("/api/queue") async def get_call_queue(): """Get list of callers waiting in queue""" - return {"queue": twilio_service.get_queue()} + return {"queue": caller_service.get_queue()} @app.post("/api/queue/take/{call_sid}") async def take_call_from_queue(call_sid: str): """Take a caller off hold and put them on air""" try: - call_info = twilio_service.take_call(call_sid) + call_info = caller_service.take_call(call_sid) except ValueError as e: raise HTTPException(404, str(e)) @@ -851,7 +851,7 @@ async def take_call_from_queue(call_sid: str): @app.post("/api/queue/drop/{call_sid}") async def drop_from_queue(call_sid: str): """Drop a caller from the queue""" - twilio_service.remove_from_queue(call_sid) + caller_service.remove_from_queue(call_sid) # Hang up the Twilio call from twilio.rest import Client as TwilioClient @@ -889,9 +889,9 @@ async def twilio_media_stream(websocket: WebSocket): if event == "start": stream_sid = msg["start"]["streamSid"] call_sid = msg["start"]["callSid"] - twilio_service.register_websocket(call_sid, websocket) - if call_sid in twilio_service.active_calls: - twilio_service.active_calls[call_sid]["stream_sid"] = stream_sid + caller_service.register_websocket(call_sid, websocket) + if call_sid in caller_service.active_calls: + caller_service.active_calls[call_sid]["stream_sid"] = stream_sid print(f"[Twilio WS] Stream started: {stream_sid} for call {call_sid}") elif event == "media": @@ -902,7 +902,7 @@ async def twilio_media_stream(websocket: WebSocket): audio_buffer.extend(pcm_data) # Get channel for this caller - call_info = twilio_service.active_calls.get(call_sid) + call_info = caller_service.active_calls.get(call_sid) if call_info: channel = call_info["channel"] # Route PCM to the caller's dedicated Loopback channel @@ -928,7 +928,7 @@ async def twilio_media_stream(websocket: WebSocket): print(f"[Twilio WS] Error: {e}") finally: if call_sid: - twilio_service.unregister_websocket(call_sid) + caller_service.unregister_websocket(call_sid) # Transcribe any remaining audio if audio_buffer and call_sid: asyncio.create_task( @@ -938,7 +938,7 @@ async def twilio_media_stream(websocket: WebSocket): async def _handle_real_caller_transcription(call_sid: str, pcm_data: bytes, sample_rate: int): """Transcribe a chunk of real caller audio and add to conversation""" - call_info = twilio_service.active_calls.get(call_sid) + call_info = caller_service.active_calls.get(call_sid) if not call_info: return @@ -1040,7 +1040,7 @@ async def hangup_real_caller(): )) # End the Twilio call - twilio_service.hangup(call_sid) + caller_service.hangup(call_sid) from twilio.rest import Client as TwilioClient if settings.twilio_account_sid and settings.twilio_auth_token: diff --git a/backend/services/caller_service.py b/backend/services/caller_service.py new file mode 100644 index 0000000..9ac870c --- /dev/null +++ b/backend/services/caller_service.py @@ -0,0 +1,147 @@ +"""Browser caller queue and audio stream service""" + +import asyncio +import time +import threading +from typing import Optional + + +class CallerService: + """Manages browser caller queue, channel allocation, and WebSocket streams""" + + FIRST_REAL_CHANNEL = 3 + + def __init__(self): + self._queue: list[dict] = [] + self.active_calls: dict[str, dict] = {} + self._allocated_channels: set[int] = set() + self._caller_counter: int = 0 + self._lock = threading.Lock() + self._websockets: dict[str, any] = {} # caller_id -> WebSocket + + def add_to_queue(self, caller_id: str, name: str): + with self._lock: + self._queue.append({ + "caller_id": caller_id, + "name": name, + "queued_at": time.time(), + }) + print(f"[Caller] {name} added to queue (ID: {caller_id})") + + def remove_from_queue(self, caller_id: str): + with self._lock: + self._queue = [c for c in self._queue if c["caller_id"] != caller_id] + print(f"[Caller] {caller_id} removed from queue") + + def get_queue(self) -> list[dict]: + now = time.time() + with self._lock: + return [ + { + "caller_id": c["caller_id"], + "name": c["name"], + "wait_time": int(now - c["queued_at"]), + } + for c in self._queue + ] + + def allocate_channel(self) -> int: + with self._lock: + ch = self.FIRST_REAL_CHANNEL + while ch in self._allocated_channels: + ch += 1 + self._allocated_channels.add(ch) + return ch + + def release_channel(self, channel: int): + with self._lock: + self._allocated_channels.discard(channel) + + def take_call(self, caller_id: str) -> dict: + caller = None + with self._lock: + for c in self._queue: + if c["caller_id"] == caller_id: + caller = c + break + if caller: + self._queue = [c for c in self._queue if c["caller_id"] != caller_id] + + if not caller: + raise ValueError(f"Caller {caller_id} not in queue") + + channel = self.allocate_channel() + self._caller_counter += 1 + name = caller["name"] + + call_info = { + "caller_id": caller_id, + "name": name, + "channel": channel, + "started_at": time.time(), + } + self.active_calls[caller_id] = call_info + print(f"[Caller] {name} taken on air — channel {channel}") + return call_info + + def hangup(self, caller_id: str): + call_info = self.active_calls.pop(caller_id, None) + if call_info: + self.release_channel(call_info["channel"]) + print(f"[Caller] {call_info['name']} hung up — channel {call_info['channel']} released") + self._websockets.pop(caller_id, None) + + def reset(self): + with self._lock: + for call_info in self.active_calls.values(): + self._allocated_channels.discard(call_info["channel"]) + self._queue.clear() + self.active_calls.clear() + self._allocated_channels.clear() + self._caller_counter = 0 + self._websockets.clear() + print("[Caller] Service reset") + + def register_websocket(self, caller_id: str, websocket): + """Register a WebSocket for a caller""" + self._websockets[caller_id] = websocket + + def unregister_websocket(self, caller_id: str): + """Unregister a WebSocket""" + self._websockets.pop(caller_id, None) + + async def send_audio_to_caller(self, caller_id: str, pcm_data: bytes, sample_rate: int): + """Send audio to real caller via WebSocket binary frame""" + ws = self._websockets.get(caller_id) + if not ws: + return + + try: + if sample_rate != 16000: + import numpy as np + import librosa + audio = np.frombuffer(pcm_data, dtype=np.int16).astype(np.float32) / 32768.0 + audio = librosa.resample(audio, orig_sr=sample_rate, target_sr=16000) + pcm_data = (audio * 32767).astype(np.int16).tobytes() + await ws.send_bytes(pcm_data) + except Exception as e: + print(f"[Caller] Failed to send audio: {e}") + + async def notify_caller(self, caller_id: str, message: dict): + """Send JSON control message to caller""" + ws = self._websockets.get(caller_id) + if ws: + import json + await ws.send_text(json.dumps(message)) + + async def disconnect_caller(self, caller_id: str): + """Disconnect a caller's WebSocket""" + ws = self._websockets.get(caller_id) + if ws: + try: + import json + await ws.send_text(json.dumps({"status": "disconnected"})) + await ws.close() + except Exception: + pass + self._websockets.pop(caller_id, None) diff --git a/backend/services/twilio_service.py b/backend/services/twilio_service.py deleted file mode 100644 index 908c4b2..0000000 --- a/backend/services/twilio_service.py +++ /dev/null @@ -1,148 +0,0 @@ -"""Twilio call queue and media stream service""" - -import asyncio -import base64 -import audioop -import time -import threading -from typing import Optional - - -class TwilioService: - """Manages Twilio call queue, channel allocation, and media streams""" - - FIRST_REAL_CHANNEL = 3 - - def __init__(self): - self._queue: list[dict] = [] - self.active_calls: dict[str, dict] = {} - self._allocated_channels: set[int] = set() - self._caller_counter: int = 0 - self._lock = threading.Lock() - self._websockets: dict[str, any] = {} # call_sid -> WebSocket - - def add_to_queue(self, call_sid: str, phone: str): - with self._lock: - self._queue.append({ - "call_sid": call_sid, - "phone": phone, - "queued_at": time.time(), - }) - print(f"[Twilio] Caller {phone} added to queue (SID: {call_sid})") - - def remove_from_queue(self, call_sid: str): - with self._lock: - self._queue = [c for c in self._queue if c["call_sid"] != call_sid] - print(f"[Twilio] Caller {call_sid} removed from queue") - - def get_queue(self) -> list[dict]: - now = time.time() - with self._lock: - return [ - { - "call_sid": c["call_sid"], - "phone": c["phone"], - "wait_time": int(now - c["queued_at"]), - } - for c in self._queue - ] - - def allocate_channel(self) -> int: - with self._lock: - ch = self.FIRST_REAL_CHANNEL - while ch in self._allocated_channels: - ch += 1 - self._allocated_channels.add(ch) - return ch - - def release_channel(self, channel: int): - with self._lock: - self._allocated_channels.discard(channel) - - def take_call(self, call_sid: str) -> dict: - caller = None - with self._lock: - for c in self._queue: - if c["call_sid"] == call_sid: - caller = c - break - if caller: - self._queue = [c for c in self._queue if c["call_sid"] != call_sid] - - if not caller: - raise ValueError(f"Call {call_sid} not in queue") - - channel = self.allocate_channel() - self._caller_counter += 1 - name = f"Caller #{self._caller_counter}" - - call_info = { - "call_sid": call_sid, - "phone": caller["phone"], - "channel": channel, - "name": name, - "started_at": time.time(), - } - self.active_calls[call_sid] = call_info - print(f"[Twilio] {name} ({caller['phone']}) taken on air — channel {channel}") - return call_info - - def hangup(self, call_sid: str): - call_info = self.active_calls.pop(call_sid, None) - if call_info: - self.release_channel(call_info["channel"]) - print(f"[Twilio] {call_info['name']} hung up — channel {call_info['channel']} released") - self._websockets.pop(call_sid, None) - - def reset(self): - with self._lock: - for call_info in self.active_calls.values(): - self._allocated_channels.discard(call_info["channel"]) - self._queue.clear() - self.active_calls.clear() - self._allocated_channels.clear() - self._caller_counter = 0 - self._websockets.clear() - print("[Twilio] Service reset") - - def register_websocket(self, call_sid: str, websocket): - """Register a WebSocket for a call""" - self._websockets[call_sid] = websocket - - def unregister_websocket(self, call_sid: str): - """Unregister a WebSocket""" - self._websockets.pop(call_sid, None) - - async def send_audio_to_caller(self, call_sid: str, pcm_data: bytes, sample_rate: int): - """Send audio back to real caller via Twilio WebSocket""" - ws = self._websockets.get(call_sid) - if not ws: - return - - call_info = self.active_calls.get(call_sid) - if not call_info or "stream_sid" not in call_info: - return - - try: - # Resample to 8kHz if needed - if sample_rate != 8000: - import numpy as np - import librosa - audio = np.frombuffer(pcm_data, dtype=np.int16).astype(np.float32) / 32768.0 - audio = librosa.resample(audio, orig_sr=sample_rate, target_sr=8000) - pcm_data = (audio * 32767).astype(np.int16).tobytes() - - # Convert PCM to mulaw - mulaw_data = audioop.lin2ulaw(pcm_data, 2) - - # Send as Twilio media message - import json - await ws.send_text(json.dumps({ - "event": "media", - "streamSid": call_info["stream_sid"], - "media": { - "payload": base64.b64encode(mulaw_data).decode("ascii"), - }, - })) - except Exception as e: - print(f"[Twilio] Failed to send audio to caller: {e}") diff --git a/tests/test_caller_service.py b/tests/test_caller_service.py new file mode 100644 index 0000000..a6c244e --- /dev/null +++ b/tests/test_caller_service.py @@ -0,0 +1,102 @@ +import sys +sys.path.insert(0, "/Users/lukemacneil/ai-podcast") + +from backend.services.caller_service import CallerService + + +def test_queue_starts_empty(): + svc = CallerService() + assert svc.get_queue() == [] + + +def test_add_caller_to_queue(): + svc = CallerService() + svc.add_to_queue("abc123", "Dave") + q = svc.get_queue() + assert len(q) == 1 + assert q[0]["caller_id"] == "abc123" + assert q[0]["name"] == "Dave" + assert "wait_time" in q[0] + + +def test_remove_caller_from_queue(): + svc = CallerService() + svc.add_to_queue("abc123", "Dave") + svc.remove_from_queue("abc123") + assert svc.get_queue() == [] + + +def test_allocate_channel(): + svc = CallerService() + ch1 = svc.allocate_channel() + ch2 = svc.allocate_channel() + assert ch1 == 3 + assert ch2 == 4 + svc.release_channel(ch1) + ch3 = svc.allocate_channel() + assert ch3 == 3 + + +def test_take_call(): + svc = CallerService() + svc.add_to_queue("abc123", "Dave") + result = svc.take_call("abc123") + assert result["caller_id"] == "abc123" + assert result["channel"] >= 3 + assert svc.get_queue() == [] + assert svc.active_calls["abc123"]["channel"] == result["channel"] + + +def test_hangup_real_caller(): + svc = CallerService() + svc.add_to_queue("abc123", "Dave") + svc.take_call("abc123") + ch = svc.active_calls["abc123"]["channel"] + svc.hangup("abc123") + assert "abc123" not in svc.active_calls + assert ch not in svc._allocated_channels + + +def test_caller_counter_increments(): + svc = CallerService() + svc.add_to_queue("id1", "Dave") + svc.add_to_queue("id2", "Sarah") + r1 = svc.take_call("id1") + r2 = svc.take_call("id2") + assert r1["name"] == "Dave" + assert r2["name"] == "Sarah" + + +def test_register_and_unregister_websocket(): + svc = CallerService() + fake_ws = object() + svc.register_websocket("abc123", fake_ws) + assert svc._websockets["abc123"] is fake_ws + svc.unregister_websocket("abc123") + assert "abc123" not in svc._websockets + + +def test_hangup_clears_websocket(): + svc = CallerService() + svc.add_to_queue("abc123", "Dave") + svc.take_call("abc123") + svc.register_websocket("abc123", object()) + svc.hangup("abc123") + assert "abc123" not in svc._websockets + + +def test_reset_clears_websockets(): + svc = CallerService() + svc.register_websocket("id1", object()) + svc.register_websocket("id2", object()) + svc.reset() + assert svc._websockets == {} + + +def test_send_audio_no_websocket(): + """send_audio_to_caller returns silently when no WS registered""" + import asyncio + svc = CallerService() + asyncio.get_event_loop().run_until_complete( + svc.send_audio_to_caller("NONE", b"\x00" * 100, 16000) + ) diff --git a/tests/test_twilio_service.py b/tests/test_twilio_service.py deleted file mode 100644 index 3d37422..0000000 --- a/tests/test_twilio_service.py +++ /dev/null @@ -1,103 +0,0 @@ -import sys -sys.path.insert(0, "/Users/lukemacneil/ai-podcast") - -from backend.services.twilio_service import TwilioService - - -def test_queue_starts_empty(): - svc = TwilioService() - assert svc.get_queue() == [] - - -def test_add_caller_to_queue(): - svc = TwilioService() - svc.add_to_queue("CA123", "+15125550142") - q = svc.get_queue() - assert len(q) == 1 - assert q[0]["call_sid"] == "CA123" - assert q[0]["phone"] == "+15125550142" - assert "wait_time" in q[0] - - -def test_remove_caller_from_queue(): - svc = TwilioService() - svc.add_to_queue("CA123", "+15125550142") - svc.remove_from_queue("CA123") - assert svc.get_queue() == [] - - -def test_allocate_channel(): - svc = TwilioService() - ch1 = svc.allocate_channel() - ch2 = svc.allocate_channel() - assert ch1 == 3 - assert ch2 == 4 - svc.release_channel(ch1) - ch3 = svc.allocate_channel() - assert ch3 == 3 - - -def test_take_call(): - svc = TwilioService() - svc.add_to_queue("CA123", "+15125550142") - result = svc.take_call("CA123") - assert result["call_sid"] == "CA123" - assert result["channel"] >= 3 - assert svc.get_queue() == [] - assert svc.active_calls["CA123"]["channel"] == result["channel"] - - -def test_hangup_real_caller(): - svc = TwilioService() - svc.add_to_queue("CA123", "+15125550142") - svc.take_call("CA123") - ch = svc.active_calls["CA123"]["channel"] - svc.hangup("CA123") - assert "CA123" not in svc.active_calls - assert ch not in svc._allocated_channels - - -def test_caller_counter_increments(): - svc = TwilioService() - svc.add_to_queue("CA1", "+15125550001") - svc.add_to_queue("CA2", "+15125550002") - r1 = svc.take_call("CA1") - r2 = svc.take_call("CA2") - assert r1["name"] == "Caller #1" - assert r2["name"] == "Caller #2" - - -def test_register_and_unregister_websocket(): - svc = TwilioService() - fake_ws = object() - svc.register_websocket("CA123", fake_ws) - assert svc._websockets["CA123"] is fake_ws - svc.unregister_websocket("CA123") - assert "CA123" not in svc._websockets - - -def test_hangup_clears_websocket(): - svc = TwilioService() - svc.add_to_queue("CA123", "+15125550142") - svc.take_call("CA123") - svc.register_websocket("CA123", object()) - svc.hangup("CA123") - assert "CA123" not in svc._websockets - - -def test_reset_clears_websockets(): - svc = TwilioService() - svc.register_websocket("CA1", object()) - svc.register_websocket("CA2", object()) - svc.reset() - assert svc._websockets == {} - - -def test_send_audio_no_websocket(): - """send_audio_to_caller returns silently when no WS registered""" - import asyncio - svc = TwilioService() - # Should not raise - asyncio.get_event_loop().run_until_complete( - svc.send_audio_to_caller("CA_NONE", b"\x00" * 100, 8000) - )