diff --git a/backend/main.py b/backend/main.py index 3f9e243..f891703 100644 --- a/backend/main.py +++ b/backend/main.py @@ -4,11 +4,12 @@ import uuid import asyncio from dataclasses import dataclass, field from pathlib import Path -from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect +from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect, Request, Response from fastapi.staticfiles import StaticFiles from fastapi.responses import FileResponse import json import time +import httpx from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from typing import Optional @@ -416,9 +417,48 @@ async def index(): return FileResponse(frontend_dir / "index.html") -@app.get("/call-in") -async def call_in_page(): - return FileResponse(frontend_dir / "call-in.html") +# --- SignalWire Endpoints --- + +@app.post("/api/signalwire/voice") +async def signalwire_voice_webhook(request: Request): + """Handle inbound call from SignalWire — return XML to start bidirectional stream""" + form = await request.form() + caller_phone = form.get("From", "Unknown") + call_sid = form.get("CallSid", "") + print(f"[SignalWire] Inbound call from {caller_phone} (CallSid: {call_sid})") + + ws_scheme = "wss" + host = request.headers.get("host", "radioshow.macneilmediagroup.com") + stream_url = f"{ws_scheme}://{host}/api/signalwire/stream" + + xml = f""" + + + + + + + +""" + + return Response(content=xml, media_type="application/xml") + + +async def _signalwire_end_call(call_sid: str): + """End a phone call via SignalWire REST API""" + if not call_sid or not settings.signalwire_space: + return + try: + url = f"https://{settings.signalwire_space}/api/laml/2010-04-01/Accounts/{settings.signalwire_project_id}/Calls/{call_sid}" + async with httpx.AsyncClient(timeout=10.0) as client: + response = await client.post( + url, + data={"Status": "completed"}, + auth=(settings.signalwire_project_id, settings.signalwire_token), + ) + print(f"[SignalWire] End call {call_sid}: {response.status_code}") + except Exception as e: + print(f"[SignalWire] Failed to end call {call_sid}: {e}") # --- Request Models --- @@ -802,60 +842,54 @@ async def update_settings(data: dict): return llm_service.get_settings() -# --- Browser Caller WebSocket --- - -@app.websocket("/api/caller/stream") -async def caller_audio_stream(websocket: WebSocket): - """Handle browser caller WebSocket — bidirectional audio""" +@app.websocket("/api/signalwire/stream") +async def signalwire_audio_stream(websocket: WebSocket): + """Handle SignalWire bidirectional audio stream""" await websocket.accept() caller_id = str(uuid.uuid4())[:8] - caller_name = "Anonymous" + caller_phone = "Unknown" + call_sid = "" audio_buffer = bytearray() CHUNK_DURATION_S = 3 SAMPLE_RATE = 16000 chunk_samples = CHUNK_DURATION_S * SAMPLE_RATE + stream_started = False try: - # Wait for join message - join_data = await websocket.receive_text() - join_msg = json.loads(join_data) - if join_msg.get("type") == "join": - caller_name = join_msg.get("name", "Anonymous").strip() or "Anonymous" - - # Add to queue - caller_service.add_to_queue(caller_id, caller_name) - caller_service.register_websocket(caller_id, websocket) - - # Notify caller they're queued - queue = caller_service.get_queue() - position = next((i+1 for i, c in enumerate(queue) if c["caller_id"] == caller_id), 0) - await websocket.send_text(json.dumps({ - "status": "queued", - "caller_id": caller_id, - "position": position, - })) - - # Main loop — handle both text and binary messages while True: - message = await websocket.receive() + raw = await websocket.receive_text() + msg = json.loads(raw) + event = msg.get("event") - if message.get("type") == "websocket.disconnect": - break + if event == "start": + custom = msg.get("start", {}).get("customParameters", {}) + caller_phone = custom.get("caller_phone", "Unknown") + call_sid = custom.get("call_sid", "") + + stream_started = True + print(f"[SignalWire WS] Stream started: {caller_phone} (CallSid: {call_sid})") + + caller_service.add_to_queue(caller_id, caller_phone) + caller_service.register_websocket(caller_id, websocket) + if call_sid: + caller_service.register_call_sid(caller_id, call_sid) + + elif event == "media" and stream_started: + import base64 + payload = msg.get("media", {}).get("payload", "") + if not payload: + continue + + pcm_data = base64.b64decode(payload) - if "bytes" in message and message["bytes"]: - # Binary audio data — only process if caller is on air call_info = caller_service.active_calls.get(caller_id) if not call_info: - continue # Still in queue, ignore audio + continue - pcm_data = message["bytes"] audio_buffer.extend(pcm_data) - - # Route to configured live caller Loopback channel audio_service.route_real_caller_audio(pcm_data, SAMPLE_RATE) - # Transcribe when we have enough audio if len(audio_buffer) >= chunk_samples * 2: pcm_chunk = bytes(audio_buffer[:chunk_samples * 2]) audio_buffer = audio_buffer[chunk_samples * 2:] @@ -863,24 +897,24 @@ async def caller_audio_stream(websocket: WebSocket): _handle_real_caller_transcription(caller_id, pcm_chunk, SAMPLE_RATE) ) - elif "text" in message and message["text"]: - # Control messages (future use) - pass + elif event == "stop": + print(f"[SignalWire WS] Stream stopped: {caller_phone}") + break except WebSocketDisconnect: - print(f"[Caller WS] Disconnected: {caller_id} ({caller_name})") + print(f"[SignalWire WS] Disconnected: {caller_id} ({caller_phone})") except Exception as e: - print(f"[Caller WS] Error: {e}") + print(f"[SignalWire WS] Error: {e}") finally: caller_service.unregister_websocket(caller_id) - # If still in queue, remove + caller_service.unregister_call_sid(caller_id) caller_service.remove_from_queue(caller_id) - # If on air, clean up if caller_id in caller_service.active_calls: caller_service.hangup(caller_id) if session.active_real_caller and session.active_real_caller.get("caller_id") == caller_id: session.active_real_caller = None - # Transcribe remaining audio + if len(caller_service.active_calls) == 0: + audio_service.stop_host_stream() if audio_buffer: asyncio.create_task( _handle_real_caller_transcription(caller_id, bytes(audio_buffer), SAMPLE_RATE) @@ -945,12 +979,9 @@ async def take_call_from_queue(caller_id: str): session.active_real_caller = { "caller_id": call_info["caller_id"], "channel": call_info["channel"], - "name": call_info["name"], + "phone": call_info["phone"], } - # Notify caller they're on air via WebSocket - await caller_service.notify_caller(caller_id, {"status": "on_air", "channel": call_info["channel"]}) - # Start host mic streaming if this is the first real caller if len(caller_service.active_calls) == 1: _start_host_audio_sender() @@ -965,8 +996,10 @@ async def take_call_from_queue(caller_id: str): @app.post("/api/queue/drop/{caller_id}") async def drop_from_queue(caller_id: str): """Drop a caller from the queue""" + call_sid = caller_service.get_call_sid(caller_id) caller_service.remove_from_queue(caller_id) - await caller_service.disconnect_caller(caller_id) + if call_sid: + await _signalwire_end_call(call_sid) return {"status": "dropped"} @@ -980,18 +1013,18 @@ async def _handle_real_caller_transcription(caller_id: str, pcm_data: bytes, sam if not text or not text.strip(): return - caller_name = call_info["name"] - print(f"[Real Caller] {caller_name}: {text}") + caller_phone = call_info["phone"] + print(f"[Real Caller] {caller_phone}: {text}") # Add to conversation with real_caller role - session.add_message(f"real_caller:{caller_name}", text) + session.add_message(f"real_caller:{caller_phone}", text) # If AI auto-respond mode is on and an AI caller is active, check if AI should respond if session.ai_respond_mode == "auto" and session.current_caller_key: - asyncio.create_task(_check_ai_auto_respond(text, caller_name)) + asyncio.create_task(_check_ai_auto_respond(text, caller_phone)) -async def _check_ai_auto_respond(real_caller_text: str, real_caller_name: str): +async def _check_ai_auto_respond(real_caller_text: str, real_caller_phone: str): """Check if AI caller should jump in, and generate response if so""" if not session.caller: return @@ -1060,13 +1093,15 @@ async def hangup_real_caller(): raise HTTPException(400, "No active real caller") caller_id = session.active_real_caller["caller_id"] - caller_name = session.active_real_caller["name"] + caller_phone = session.active_real_caller["phone"] conversation_snapshot = list(session.conversation) auto_followup_enabled = session.auto_followup - # Disconnect the caller immediately + # End the phone call via SignalWire + call_sid = caller_service.get_call_sid(caller_id) caller_service.hangup(caller_id) - await caller_service.disconnect_caller(caller_id) + if call_sid: + asyncio.create_task(_signalwire_end_call(call_sid)) # Stop host streaming if no more active callers if len(caller_service.active_calls) == 0: @@ -1074,24 +1109,22 @@ async def hangup_real_caller(): session.active_real_caller = None - # Play hangup sound in background import threading hangup_sound = settings.sounds_dir / "hangup.wav" if hangup_sound.exists(): threading.Thread(target=audio_service.play_sfx, args=(str(hangup_sound),), daemon=True).start() - # Summarize and store history in background asyncio.create_task( - _summarize_real_call(caller_name, conversation_snapshot, auto_followup_enabled) + _summarize_real_call(caller_phone, conversation_snapshot, auto_followup_enabled) ) return { "status": "disconnected", - "caller": caller_name, + "caller": caller_phone, } -async def _summarize_real_call(caller_name: str, conversation: list, auto_followup_enabled: bool): +async def _summarize_real_call(caller_phone: str, conversation: list, auto_followup_enabled: bool): """Background task: summarize call and store in history""" summary = "" if conversation: @@ -1105,11 +1138,11 @@ async def _summarize_real_call(caller_name: str, conversation: list, auto_follow session.call_history.append(CallRecord( caller_type="real", - caller_name=caller_name, + caller_name=caller_phone, summary=summary, transcript=conversation, )) - print(f"[Real Caller] {caller_name} call summarized: {summary[:80]}...") + print(f"[Real Caller] {caller_phone} call summarized: {summary[:80]}...") if auto_followup_enabled: await _auto_followup(summary)