Add browser caller WebSocket handler with PCM audio streaming
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -777,6 +777,92 @@ async def update_settings(data: dict):
|
||||
return llm_service.get_settings()
|
||||
|
||||
|
||||
# --- Browser Caller WebSocket ---
|
||||
|
||||
@app.websocket("/api/caller/stream")
|
||||
async def caller_audio_stream(websocket: WebSocket):
|
||||
"""Handle browser caller WebSocket — bidirectional audio"""
|
||||
await websocket.accept()
|
||||
|
||||
caller_id = str(uuid.uuid4())[:8]
|
||||
caller_name = "Anonymous"
|
||||
audio_buffer = bytearray()
|
||||
CHUNK_DURATION_S = 3
|
||||
SAMPLE_RATE = 16000
|
||||
chunk_samples = CHUNK_DURATION_S * SAMPLE_RATE
|
||||
|
||||
try:
|
||||
# Wait for join message
|
||||
join_data = await websocket.receive_text()
|
||||
join_msg = json.loads(join_data)
|
||||
if join_msg.get("type") == "join":
|
||||
caller_name = join_msg.get("name", "Anonymous").strip() or "Anonymous"
|
||||
|
||||
# Add to queue
|
||||
caller_service.add_to_queue(caller_id, caller_name)
|
||||
caller_service.register_websocket(caller_id, websocket)
|
||||
|
||||
# Notify caller they're queued
|
||||
queue = caller_service.get_queue()
|
||||
position = next((i+1 for i, c in enumerate(queue) if c["caller_id"] == caller_id), 0)
|
||||
await websocket.send_text(json.dumps({
|
||||
"status": "queued",
|
||||
"caller_id": caller_id,
|
||||
"position": position,
|
||||
}))
|
||||
|
||||
# Main loop — handle both text and binary messages
|
||||
while True:
|
||||
message = await websocket.receive()
|
||||
|
||||
if message.get("type") == "websocket.disconnect":
|
||||
break
|
||||
|
||||
if "bytes" in message and message["bytes"]:
|
||||
# Binary audio data — only process if caller is on air
|
||||
call_info = caller_service.active_calls.get(caller_id)
|
||||
if not call_info:
|
||||
continue # Still in queue, ignore audio
|
||||
|
||||
pcm_data = message["bytes"]
|
||||
audio_buffer.extend(pcm_data)
|
||||
|
||||
# Route to Loopback channel
|
||||
channel = call_info["channel"]
|
||||
audio_service.route_real_caller_audio(pcm_data, channel, SAMPLE_RATE)
|
||||
|
||||
# Transcribe when we have enough audio
|
||||
if len(audio_buffer) >= chunk_samples * 2:
|
||||
pcm_chunk = bytes(audio_buffer[:chunk_samples * 2])
|
||||
audio_buffer = audio_buffer[chunk_samples * 2:]
|
||||
asyncio.create_task(
|
||||
_handle_real_caller_transcription(caller_id, pcm_chunk, SAMPLE_RATE)
|
||||
)
|
||||
|
||||
elif "text" in message and message["text"]:
|
||||
# Control messages (future use)
|
||||
pass
|
||||
|
||||
except WebSocketDisconnect:
|
||||
print(f"[Caller WS] Disconnected: {caller_id} ({caller_name})")
|
||||
except Exception as e:
|
||||
print(f"[Caller WS] Error: {e}")
|
||||
finally:
|
||||
caller_service.unregister_websocket(caller_id)
|
||||
# If still in queue, remove
|
||||
caller_service.remove_from_queue(caller_id)
|
||||
# If on air, clean up
|
||||
if caller_id in caller_service.active_calls:
|
||||
caller_service.hangup(caller_id)
|
||||
if session.active_real_caller and session.active_real_caller.get("caller_id") == caller_id:
|
||||
session.active_real_caller = None
|
||||
# Transcribe remaining audio
|
||||
if audio_buffer:
|
||||
asyncio.create_task(
|
||||
_handle_real_caller_transcription(caller_id, bytes(audio_buffer), SAMPLE_RATE)
|
||||
)
|
||||
|
||||
|
||||
# --- Queue Endpoints ---
|
||||
|
||||
@app.get("/api/queue")
|
||||
|
||||
Reference in New Issue
Block a user