From bf140a77b787b032f0ae64139e215b8b2f3a55c3 Mon Sep 17 00:00:00 2001
From: tcpsyn
Date: Thu, 5 Feb 2026 15:49:49 -0700
Subject: [PATCH] Add browser caller WebSocket handler with PCM audio streaming

Co-Authored-By: Claude Opus 4.6
---
 backend/main.py | 117 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 117 insertions(+)

diff --git a/backend/main.py b/backend/main.py
index e7622d2..1982e9a 100644
--- a/backend/main.py
+++ b/backend/main.py
@@ -777,6 +777,123 @@ async def update_settings(data: dict):
     return llm_service.get_settings()
 
 
+# --- Browser Caller WebSocket ---
+
+# Strong references to in-flight transcription tasks. The event loop keeps
+# only weak references to tasks, so a fire-and-forget task could otherwise be
+# garbage-collected before it finishes.
+_caller_transcription_tasks = set()
+
+
+def _spawn_caller_transcription(caller_id: str, pcm: bytes, sample_rate: int) -> None:
+    """Schedule transcription of one PCM chunk without awaiting it."""
+    task = asyncio.create_task(
+        _handle_real_caller_transcription(caller_id, pcm, sample_rate)
+    )
+    _caller_transcription_tasks.add(task)
+    # Drop the reference once the task is done so the set does not grow.
+    task.add_done_callback(_caller_transcription_tasks.discard)
+
+
+@app.websocket("/api/caller/stream")
+async def caller_audio_stream(websocket: WebSocket):
+    """Handle a browser caller WebSocket: queue the caller, then stream audio.
+
+    Protocol, as implemented here:
+      1. The first frame must be JSON text. An object with "type": "join" may
+         carry a "name"; any other valid JSON joins as "Anonymous".
+      2. The server replies with status "queued", the generated caller_id and
+         the caller's 1-based queue position (0 if not found in the queue).
+      3. Binary frames are then treated as PCM audio. They are ignored until
+         the caller has an entry in caller_service.active_calls; after that
+         each frame is routed to the call's channel and buffered so it can be
+         transcribed in 3-second chunks. Text frames are currently ignored.
+
+    However the connection ends, the caller is unregistered, removed from the
+    queue, hung up if on air, and any leftover audio is sent to transcription.
+    """
+    await websocket.accept()
+
+    caller_id = str(uuid.uuid4())[:8]
+    caller_name = "Anonymous"
+    audio_buffer = bytearray()
+    CHUNK_DURATION_S = 3
+    SAMPLE_RATE = 16000
+    BYTES_PER_SAMPLE = 2  # chunk sizing assumes 2 bytes per sample
+    chunk_bytes = CHUNK_DURATION_S * SAMPLE_RATE * BYTES_PER_SAMPLE
+
+    try:
+        # Wait for join message
+        join_msg = json.loads(await websocket.receive_text())
+        if isinstance(join_msg, dict) and join_msg.get("type") == "join":
+            # A missing, null or non-string name falls back to "Anonymous"
+            # instead of raising and dropping the connection.
+            name = join_msg.get("name")
+            if isinstance(name, str):
+                caller_name = name.strip() or "Anonymous"
+
+        # Add to queue
+        caller_service.add_to_queue(caller_id, caller_name)
+        caller_service.register_websocket(caller_id, websocket)
+
+        # Notify caller they're queued
+        queue = caller_service.get_queue()
+        position = next((i+1 for i, c in enumerate(queue) if c["caller_id"] == caller_id), 0)
+        await websocket.send_text(json.dumps({
+            "status": "queued",
+            "caller_id": caller_id,
+            "position": position,
+        }))
+
+        # Main loop — handle both text and binary messages
+        while True:
+            message = await websocket.receive()
+
+            if message.get("type") == "websocket.disconnect":
+                break
+
+            pcm_data = message.get("bytes")
+            if not pcm_data:
+                continue  # Text frames: control messages (future use)
+
+            # Binary audio data — only process if caller is on air
+            call_info = caller_service.active_calls.get(caller_id)
+            if not call_info:
+                continue  # Still in queue, ignore audio
+
+            audio_buffer.extend(pcm_data)
+
+            # Route to Loopback channel
+            channel = call_info["channel"]
+            audio_service.route_real_caller_audio(pcm_data, channel, SAMPLE_RATE)
+
+            # Transcribe every complete chunk. A loop (not a single check) so a
+            # frame larger than one chunk cannot leave a growing backlog.
+            while len(audio_buffer) >= chunk_bytes:
+                pcm_chunk = bytes(audio_buffer[:chunk_bytes])
+                del audio_buffer[:chunk_bytes]  # in place, no buffer copy
+                _spawn_caller_transcription(caller_id, pcm_chunk, SAMPLE_RATE)
+
+    except WebSocketDisconnect:
+        print(f"[Caller WS] Disconnected: {caller_id} ({caller_name})")
+    except Exception as e:
+        # Top-level boundary: log and fall through to cleanup.
+        print(f"[Caller WS] Error: {e}")
+    finally:
+        caller_service.unregister_websocket(caller_id)
+        # If still in queue, remove
+        caller_service.remove_from_queue(caller_id)
+        # If on air, clean up
+        if caller_id in caller_service.active_calls:
+            caller_service.hangup(caller_id)
+            if session.active_real_caller and session.active_real_caller.get("caller_id") == caller_id:
+                session.active_real_caller = None
+        # Transcribe remaining audio, minus a dangling partial-sample byte
+        tail_bytes = len(audio_buffer) - len(audio_buffer) % BYTES_PER_SAMPLE
+        if tail_bytes:
+            _spawn_caller_transcription(caller_id, bytes(audio_buffer[:tail_bytes]), SAMPLE_RATE)
+
+
 # --- Queue Endpoints ---
 
 @app.get("/api/queue")