From e46337a05ab40faa766ef1119e523d0486265b63 Mon Sep 17 00:00:00 2001 From: tcpsyn Date: Fri, 6 Feb 2026 00:21:25 -0700 Subject: [PATCH] Add session news/research fields and helper functions Co-Authored-By: Claude Opus 4.6 --- backend/main.py | 514 +++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 440 insertions(+), 74 deletions(-) diff --git a/backend/main.py b/backend/main.py index f891703..5d85a38 100644 --- a/backend/main.py +++ b/backend/main.py @@ -2,6 +2,9 @@ import uuid import asyncio +import base64 +import threading +import traceback from dataclasses import dataclass, field from pathlib import Path from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect, Request, Response @@ -10,6 +13,7 @@ from fastapi.responses import FileResponse import json import time import httpx +import numpy as np from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from typing import Optional @@ -20,6 +24,7 @@ from .services.transcription import transcribe_audio from .services.llm import llm_service from .services.tts import generate_speech from .services.audio import audio_service +from .services.news import news_service, extract_keywords app = FastAPI(title="AI Radio Show") @@ -321,6 +326,9 @@ class Session: self.active_real_caller: dict | None = None self.ai_respond_mode: str = "manual" # "manual" or "auto" self.auto_followup: bool = False + self.news_headlines: list = [] + self.research_notes: dict[str, list] = {} + self._research_task: asyncio.Task | None = None def start_call(self, caller_key: str): self.current_caller_key = caller_key @@ -398,12 +406,87 @@ class Session: self.active_real_caller = None self.ai_respond_mode = "manual" self.auto_followup = False + self.news_headlines = [] + self.research_notes = {} + if self._research_task and not self._research_task.done(): + self._research_task.cancel() + self._research_task = None self.id = str(uuid.uuid4())[:8] print(f"[Session] Reset - new session ID: {self.id}") session = Session() caller_service = CallerService() +_ai_response_lock = asyncio.Lock() # Prevents concurrent AI responses +_session_epoch = 0 # Increments on hangup/call start — stale tasks check this + + +# --- News & Research Helpers --- + +async def _fetch_session_headlines(): + try: + session.news_headlines = await news_service.get_headlines() + print(f"[News] Loaded {len(session.news_headlines)} headlines for session") + except Exception as e: + print(f"[News] Failed to load headlines: {e}") + + +async def _background_research(text: str): + keywords = extract_keywords(text) + if not keywords: + return + query = " ".join(keywords) + if query.lower() in session.research_notes: + return + try: + results = await news_service.search_topic(query) + if results: + session.research_notes[query.lower()] = results + print(f"[Research] Found {len(results)} results for '{query}'") + except Exception as e: + print(f"[Research] Error: {e}") + + +def _build_news_context() -> tuple[str, str]: + news_context = "" + if session.news_headlines: + news_context = news_service.format_headlines_for_prompt(session.news_headlines[:6]) + research_context = "" + if session.research_notes: + all_items = [] + for items in session.research_notes.values(): + all_items.extend(items) + seen = set() + unique = [] + for item in all_items: + if item.title not in seen: + seen.add(item.title) + unique.append(item) + research_context = news_service.format_headlines_for_prompt(unique[:8]) + return news_context, research_context + + +# --- Lifecycle --- +@app.on_event("shutdown") +async def shutdown(): + """Clean up resources on server shutdown""" + global _host_audio_task + print("[Server] Shutting down — cleaning up resources...") + # Stop host mic streaming + audio_service.stop_host_stream() + # Cancel host audio sender task + if _host_audio_task and not _host_audio_task.done(): + _host_audio_task.cancel() + try: + await _host_audio_task + except (asyncio.CancelledError, Exception): + pass + _host_audio_task = None + # Disconnect all active callers + for caller_id in list(caller_service.active_calls.keys()): + caller_service.hangup(caller_id) + caller_service.reset() + print("[Server] Cleanup complete") # --- Static Files --- @@ -427,12 +510,16 @@ async def signalwire_voice_webhook(request: Request): call_sid = form.get("CallSid", "") print(f"[SignalWire] Inbound call from {caller_phone} (CallSid: {call_sid})") - ws_scheme = "wss" - host = request.headers.get("host", "radioshow.macneilmediagroup.com") - stream_url = f"{ws_scheme}://{host}/api/signalwire/stream" + # Use dedicated stream URL (ngrok) if configured, otherwise derive from request + if settings.signalwire_stream_url: + stream_url = settings.signalwire_stream_url + else: + host = request.headers.get("host", "radioshow.macneilmediagroup.com") + stream_url = f"wss://{host}/api/signalwire/stream" xml = f""" + You've reached Luke at the Roost. Hold tight, we'll get you on the air. @@ -567,15 +654,19 @@ async def get_callers(): async def reset_session(): """Reset session - all callers get fresh backgrounds""" session.reset() + _chat_updates.clear() return {"status": "reset", "session_id": session.id} @app.post("/api/call/{caller_key}") async def start_call(caller_key: str): """Start a call with a caller""" + global _session_epoch if caller_key not in CALLER_BASES: raise HTTPException(404, "Caller not found") + _session_epoch += 1 + audio_service.stop_caller_audio() session.start_call(caller_key) caller = session.caller # This generates the background if needed @@ -589,14 +680,22 @@ async def start_call(caller_key: str): @app.post("/api/hangup") async def hangup(): """Hang up current call""" + global _session_epoch, _auto_respond_pending + _session_epoch += 1 + # Stop any playing caller audio immediately audio_service.stop_caller_audio() + # Cancel any pending auto-respond + if _auto_respond_pending and not _auto_respond_pending.done(): + _auto_respond_pending.cancel() + _auto_respond_pending = None + _auto_respond_buffer.clear() + caller_name = session.caller["name"] if session.caller else None session.end_call() # Play hangup sound in background so response returns immediately - import threading hangup_sound = settings.sounds_dir / "hangup.wav" if hangup_sound.exists(): threading.Thread(target=audio_service.play_sfx, args=(str(hangup_sound),), daemon=True).start() @@ -668,23 +767,81 @@ def clean_for_tts(text: str) -> str: return text.strip() +# --- Chat Broadcast (for real-time frontend updates) --- +_chat_updates: list[dict] = [] +_CHAT_UPDATES_MAX = 500 + + +def broadcast_chat(sender: str, text: str): + """Add a chat message to the update queue for frontend polling""" + _chat_updates.append({"type": "chat", "sender": sender, "text": text, "id": len(_chat_updates)}) + if len(_chat_updates) > _CHAT_UPDATES_MAX: + del _chat_updates[:_CHAT_UPDATES_MAX // 2] + + +def broadcast_event(event_type: str, data: dict = None): + """Add a system event to the update queue for frontend polling""" + entry = {"type": event_type, "id": len(_chat_updates)} + if data: + entry.update(data) + _chat_updates.append(entry) + + +@app.get("/api/conversation/updates") +async def get_conversation_updates(since: int = 0): + """Get new chat/event messages since a given index""" + return {"messages": _chat_updates[since:]} + + +def _normalize_messages_for_llm(messages: list[dict]) -> list[dict]: + """Convert custom roles (real_caller:X, ai_caller:X) to standard LLM roles""" + normalized = [] + for msg in messages: + role = msg["role"] + content = msg["content"] + if role.startswith("real_caller:"): + caller_label = role.split(":", 1)[1] + normalized.append({"role": "user", "content": f"[Real caller {caller_label}]: {content}"}) + elif role.startswith("ai_caller:"): + normalized.append({"role": "assistant", "content": content}) + elif role == "host": + normalized.append({"role": "user", "content": content}) + else: + normalized.append(msg) + return normalized + + @app.post("/api/chat") async def chat(request: ChatRequest): """Chat with current caller""" if not session.caller: raise HTTPException(400, "No active call") + epoch = _session_epoch session.add_message("user", request.text) - # Include conversation summary and show history for context - conversation_summary = session.get_conversation_summary() - show_history = session.get_show_history() - system_prompt = get_caller_prompt(session.caller, conversation_summary, show_history) + async with _ai_response_lock: + if _session_epoch != epoch: + raise HTTPException(409, "Call ended while waiting") - response = await llm_service.generate( - messages=session.conversation[-10:], # Reduced history for speed - system_prompt=system_prompt - ) + # Stop any playing caller audio so responses don't overlap + audio_service.stop_caller_audio() + + # Include conversation summary and show history for context + conversation_summary = session.get_conversation_summary() + show_history = session.get_show_history() + system_prompt = get_caller_prompt(session.caller, conversation_summary, show_history) + + messages = _normalize_messages_for_llm(session.conversation[-10:]) + response = await llm_service.generate( + messages=messages, + system_prompt=system_prompt + ) + + # Discard if call changed while we were generating + if _session_epoch != epoch: + print(f"[Chat] Discarding stale response (epoch {epoch} → {_session_epoch})") + raise HTTPException(409, "Call changed during response") print(f"[Chat] Raw LLM: {response[:100] if response else '(empty)'}...") @@ -710,19 +867,25 @@ async def chat(request: ChatRequest): @app.post("/api/tts") async def text_to_speech(request: TTSRequest): """Generate and play speech on caller output device (non-blocking)""" - # Validate text is not empty if not request.text or not request.text.strip(): raise HTTPException(400, "Text cannot be empty") - # Phone filter disabled - always use "none" + epoch = _session_epoch + audio_bytes = await generate_speech( request.text, request.voice_id, "none" ) + # Don't play if call changed during TTS generation + if _session_epoch != epoch: + return {"status": "discarded", "duration": 0} + + # Stop any existing audio before playing new + audio_service.stop_caller_audio() + # Play in background thread - returns immediately, can be interrupted by hangup - import threading thread = threading.Thread( target=audio_service.play_caller_audio, args=(audio_bytes, 24000), @@ -858,69 +1021,125 @@ async def signalwire_audio_stream(websocket: WebSocket): try: while True: - raw = await websocket.receive_text() - msg = json.loads(raw) + message = await websocket.receive() + + if message.get("type") == "websocket.disconnect": + break + + raw = message.get("text") + if not raw: + continue + + try: + msg = json.loads(raw) + except json.JSONDecodeError: + continue + event = msg.get("event") if event == "start": custom = msg.get("start", {}).get("customParameters", {}) caller_phone = custom.get("caller_phone", "Unknown") call_sid = custom.get("call_sid", "") + stream_sid = msg.get("start", {}).get("streamSid", "") stream_started = True - print(f"[SignalWire WS] Stream started: {caller_phone} (CallSid: {call_sid})") + print(f"[SignalWire WS] Stream started: {caller_phone} (CallSid: {call_sid}, StreamSid: {stream_sid})") caller_service.add_to_queue(caller_id, caller_phone) caller_service.register_websocket(caller_id, websocket) + broadcast_event("caller_queued", {"phone": caller_phone}) + broadcast_chat("System", f"{caller_phone} is waiting in the queue") + + ring_sound = settings.sounds_dir / "phone_ring.wav" + if ring_sound.exists(): + threading.Thread(target=audio_service.play_sfx, args=(str(ring_sound),), daemon=True).start() + if call_sid: caller_service.register_call_sid(caller_id, call_sid) + if stream_sid: + caller_service.register_stream_sid(caller_id, stream_sid) elif event == "media" and stream_started: - import base64 - payload = msg.get("media", {}).get("payload", "") - if not payload: - continue + try: + payload = msg.get("media", {}).get("payload", "") + if not payload: + continue - pcm_data = base64.b64decode(payload) + pcm_data = base64.b64decode(payload) - call_info = caller_service.active_calls.get(caller_id) - if not call_info: - continue + call_info = caller_service.active_calls.get(caller_id) + if not call_info: + continue - audio_buffer.extend(pcm_data) - audio_service.route_real_caller_audio(pcm_data, SAMPLE_RATE) + audio_buffer.extend(pcm_data) + audio_service.route_real_caller_audio(pcm_data, SAMPLE_RATE) - if len(audio_buffer) >= chunk_samples * 2: - pcm_chunk = bytes(audio_buffer[:chunk_samples * 2]) - audio_buffer = audio_buffer[chunk_samples * 2:] - asyncio.create_task( - _handle_real_caller_transcription(caller_id, pcm_chunk, SAMPLE_RATE) - ) + if len(audio_buffer) >= chunk_samples * 2: + pcm_chunk = bytes(audio_buffer[:chunk_samples * 2]) + audio_buffer = audio_buffer[chunk_samples * 2:] + # Skip transcription if audio is silent + audio_check = np.frombuffer(pcm_chunk, dtype=np.int16).astype(np.float32) / 32768.0 + if np.abs(audio_check).max() < 0.01: + continue + asyncio.create_task( + _safe_transcribe(caller_id, pcm_chunk, SAMPLE_RATE) + ) + except Exception as e: + print(f"[SignalWire WS] Media frame error (non-fatal): {e}") + continue # Skip bad frame, don't disconnect caller elif event == "stop": - print(f"[SignalWire WS] Stream stopped: {caller_phone}") + print(f"[SignalWire WS] Stream stop event received: {caller_phone} (caller_id: {caller_id})") break except WebSocketDisconnect: - print(f"[SignalWire WS] Disconnected: {caller_id} ({caller_phone})") + on_air = caller_id in caller_service.active_calls + tts_active = caller_service.is_streaming_tts(caller_id) + started_at = caller_service.active_calls.get(caller_id, {}).get("started_at") + duration = f"{time.time() - started_at:.0f}s" if started_at else "n/a" + print(f"[SignalWire WS] DROPPED: {caller_id} ({caller_phone}) on_air={on_air} tts_active={tts_active} duration={duration}") + disconnect_reason = "dropped" except Exception as e: print(f"[SignalWire WS] Error: {e}") + traceback.print_exc() + disconnect_reason = f"error: {e}" + else: + disconnect_reason = "clean" finally: + was_on_air = caller_id in caller_service.active_calls caller_service.unregister_websocket(caller_id) caller_service.unregister_call_sid(caller_id) + caller_service.unregister_stream_sid(caller_id) caller_service.remove_from_queue(caller_id) - if caller_id in caller_service.active_calls: + if was_on_air: caller_service.hangup(caller_id) if session.active_real_caller and session.active_real_caller.get("caller_id") == caller_id: session.active_real_caller = None if len(caller_service.active_calls) == 0: audio_service.stop_host_stream() - if audio_buffer: + broadcast_event("caller_disconnected", {"phone": caller_phone, "reason": disconnect_reason}) + broadcast_chat("System", f"{caller_phone} disconnected ({disconnect_reason})") + + drop_sound = settings.sounds_dir / ("busy.wav" if disconnect_reason == "dropped" else "hangup.wav") + if drop_sound.exists(): + threading.Thread(target=audio_service.play_sfx, args=(str(drop_sound),), daemon=True).start() + elif stream_started: + broadcast_chat("System", f"{caller_phone} left the queue") + if audio_buffer and caller_id in caller_service.active_calls: asyncio.create_task( - _handle_real_caller_transcription(caller_id, bytes(audio_buffer), SAMPLE_RATE) + _safe_transcribe(caller_id, bytes(audio_buffer), SAMPLE_RATE) ) +async def _safe_transcribe(caller_id: str, pcm_chunk: bytes, sample_rate: int): + """Wrapper that catches transcription errors so they don't crash anything""" + try: + await _handle_real_caller_transcription(caller_id, pcm_chunk, sample_rate) + except Exception as e: + print(f"[Transcription] Error (non-fatal): {e}") + + # --- Host Audio Broadcast --- _host_audio_queue: asyncio.Queue = None @@ -928,24 +1147,48 @@ _host_audio_task: asyncio.Task = None async def _host_audio_sender(): - """Persistent task that drains audio queue and sends to callers""" - while True: + """Persistent task that drains audio queue, batches frames, and sends to callers""" + _send_count = [0] + try: + while True: pcm_bytes = await _host_audio_queue.get() - # Skip host mic audio while TTS is streaming to avoid interleaving - if caller_service.streaming_tts: + if caller_service.is_streaming_tts_any(): continue + + # Drain all available frames and concatenate + chunks = [pcm_bytes] + while not _host_audio_queue.empty(): + try: + extra = _host_audio_queue.get_nowait() + if not caller_service.is_streaming_tts_any(): + chunks.append(extra) + except asyncio.QueueEmpty: + break + + combined = b''.join(chunks) + t0 = time.time() for caller_id in list(caller_service.active_calls.keys()): try: - await caller_service.send_audio_to_caller(caller_id, pcm_bytes, 16000) + await caller_service.send_audio_to_caller(caller_id, combined, 16000) except Exception: pass + elapsed = time.time() - t0 + _send_count[0] += 1 + if _send_count[0] % 20 == 0: + qsize = _host_audio_queue.qsize() + audio_ms = len(combined) / 2 / 16000 * 1000 + print(f"[HostAudio] send took {elapsed*1000:.0f}ms, {len(chunks)} chunks batched ({audio_ms:.0f}ms audio), queue: {qsize}") + except asyncio.CancelledError: + print("[HostAudio] Sender task cancelled") + except Exception as e: + print(f"[HostAudio] Sender task error: {e}") def _start_host_audio_sender(): """Start the persistent host audio sender task""" global _host_audio_queue, _host_audio_task if _host_audio_queue is None: - _host_audio_queue = asyncio.Queue(maxsize=100) + _host_audio_queue = asyncio.Queue(maxsize=50) if _host_audio_task is None or _host_audio_task.done(): _host_audio_task = asyncio.create_task(_host_audio_sender()) @@ -1003,8 +1246,14 @@ async def drop_from_queue(caller_id: str): return {"status": "dropped"} +_auto_respond_pending: asyncio.Task | None = None +_auto_respond_buffer: list[str] = [] + + async def _handle_real_caller_transcription(caller_id: str, pcm_data: bytes, sample_rate: int): """Transcribe a chunk of real caller audio and add to conversation""" + global _auto_respond_pending + call_info = caller_service.active_calls.get(caller_id) if not call_info: return @@ -1016,59 +1265,103 @@ async def _handle_real_caller_transcription(caller_id: str, pcm_data: bytes, sam caller_phone = call_info["phone"] print(f"[Real Caller] {caller_phone}: {text}") - # Add to conversation with real_caller role + # Add to conversation and broadcast to frontend session.add_message(f"real_caller:{caller_phone}", text) + broadcast_chat(f"{caller_phone} (caller)", text) - # If AI auto-respond mode is on and an AI caller is active, check if AI should respond + # If AI auto-respond mode is on and an AI caller is active, debounce auto-respond if session.ai_respond_mode == "auto" and session.current_caller_key: - asyncio.create_task(_check_ai_auto_respond(text, caller_phone)) + _auto_respond_buffer.append(text) + # Cancel any pending auto-respond timer and restart it + if _auto_respond_pending and not _auto_respond_pending.done(): + _auto_respond_pending.cancel() + _auto_respond_pending = asyncio.create_task(_debounced_auto_respond(caller_phone)) -async def _check_ai_auto_respond(real_caller_text: str, real_caller_phone: str): - """Check if AI caller should jump in, and generate response if so""" +async def _debounced_auto_respond(caller_phone: str): + """Wait for caller to stop talking (4s pause), then trigger AI response""" + try: + await asyncio.sleep(4) # Wait 4 seconds of silence + except asyncio.CancelledError: + return # More speech came in, timer restarted + + # Gather accumulated text + accumulated = " ".join(_auto_respond_buffer) + _auto_respond_buffer.clear() + + if not accumulated.strip(): + return + + print(f"[Auto-Respond] Caller paused. Accumulated: {accumulated[:100]}...") + await _trigger_ai_auto_respond(accumulated) + + +async def _trigger_ai_auto_respond(accumulated_text: str): + """Generate AI caller response to accumulated real caller speech""" + epoch = _session_epoch + if not session.caller: return + if _ai_response_lock.locked(): + return + # Cooldown check if not hasattr(session, '_last_ai_auto_respond'): session._last_ai_auto_respond = 0 - if time.time() - session._last_ai_auto_respond < 10: + if time.time() - session._last_ai_auto_respond < 5: return ai_name = session.caller["name"] - # Quick "should I respond?" check with minimal LLM call - should_respond = await llm_service.generate( - messages=[{"role": "user", "content": f'Someone just said: "{real_caller_text}". Should {ai_name} jump in? Reply only YES or NO.'}], - system_prompt=f"You're deciding if {ai_name} should respond to what was just said on a radio show. Say YES if it's interesting or relevant to them, NO if not.", - ) + async with _ai_response_lock: + if _session_epoch != epoch: + return # Call changed while waiting for lock - if "YES" not in should_respond.upper(): + print(f"[Auto-Respond] {ai_name} is jumping in...") + session._last_ai_auto_respond = time.time() + audio_service.stop_caller_audio() + broadcast_event("ai_status", {"text": f"{ai_name} is thinking..."}) + + conversation_summary = session.get_conversation_summary() + show_history = session.get_show_history() + system_prompt = get_caller_prompt(session.caller, conversation_summary, show_history) + + messages = _normalize_messages_for_llm(session.conversation[-10:]) + response = await llm_service.generate( + messages=messages, + system_prompt=system_prompt, + ) + + # Discard if call changed during generation + if _session_epoch != epoch: + print(f"[Auto-Respond] Discarding stale response (epoch {epoch} → {_session_epoch})") + broadcast_event("ai_done") return - print(f"[Auto-Respond] {ai_name} is jumping in...") - session._last_ai_auto_respond = time.time() - - # Generate full response - conversation_summary = session.get_conversation_summary() - show_history = session.get_show_history() - system_prompt = get_caller_prompt(session.caller, conversation_summary, show_history) - - response = await llm_service.generate( - messages=session.conversation[-10:], - system_prompt=system_prompt, - ) response = clean_for_tts(response) response = ensure_complete_thought(response) if not response or not response.strip(): + broadcast_event("ai_done") + return + + # Final staleness check before playing audio + if _session_epoch != epoch: + broadcast_event("ai_done") return session.add_message(f"ai_caller:{ai_name}", response) + broadcast_chat(ai_name, response) - # Generate TTS and play + broadcast_event("ai_status", {"text": f"{ai_name} is speaking..."}) audio_bytes = await generate_speech(response, session.caller["voice"], "none") - import threading + # Don't play if call changed during TTS generation + if _session_epoch != epoch: + print(f"[Auto-Respond] Discarding stale TTS (epoch {epoch} → {_session_epoch})") + broadcast_event("ai_done") + return + thread = threading.Thread( target=audio_service.play_caller_audio, args=(audio_bytes, 24000), @@ -1076,22 +1369,96 @@ async def _check_ai_auto_respond(real_caller_text: str, real_caller_phone: str): ) thread.start() - # Also send to active real caller so they hear the AI + broadcast_event("ai_done") + + # Also stream to active real caller so they hear the AI if session.active_real_caller: caller_id = session.active_real_caller["caller_id"] asyncio.create_task( - caller_service.send_audio_to_caller(caller_id, audio_bytes, 24000) + caller_service.stream_audio_to_caller(caller_id, audio_bytes, 24000) ) +@app.post("/api/ai-respond") +async def ai_respond(): + """Trigger AI caller to respond based on current conversation""" + if not session.caller: + raise HTTPException(400, "No active AI caller") + + epoch = _session_epoch + + async with _ai_response_lock: + if _session_epoch != epoch: + raise HTTPException(409, "Call ended while waiting") + + audio_service.stop_caller_audio() + + conversation_summary = session.get_conversation_summary() + show_history = session.get_show_history() + system_prompt = get_caller_prompt(session.caller, conversation_summary, show_history) + + messages = _normalize_messages_for_llm(session.conversation[-10:]) + response = await llm_service.generate( + messages=messages, + system_prompt=system_prompt + ) + + if _session_epoch != epoch: + raise HTTPException(409, "Call changed during response") + + response = clean_for_tts(response) + response = ensure_complete_thought(response) + + if not response or not response.strip(): + response = "Uh... sorry, what was that?" + + ai_name = session.caller["name"] + session.add_message(f"ai_caller:{ai_name}", response) + + # TTS + audio_bytes = await generate_speech(response, session.caller["voice"], "none") + + if _session_epoch != epoch: + raise HTTPException(409, "Call changed during TTS") + + thread = threading.Thread( + target=audio_service.play_caller_audio, + args=(audio_bytes, 24000), + daemon=True, + ) + thread.start() + + # Stream to real caller + if session.active_real_caller: + caller_id = session.active_real_caller["caller_id"] + asyncio.create_task( + caller_service.stream_audio_to_caller(caller_id, audio_bytes, 24000) + ) + + return { + "text": response, + "caller": ai_name, + "voice_id": session.caller["voice"] + } + + # --- Follow-Up & Session Control Endpoints --- @app.post("/api/hangup/real") async def hangup_real_caller(): """Hang up on real caller — disconnect immediately, summarize in background""" + global _session_epoch, _auto_respond_pending if not session.active_real_caller: raise HTTPException(400, "No active real caller") + _session_epoch += 1 + + # Cancel any pending auto-respond + if _auto_respond_pending and not _auto_respond_pending.done(): + _auto_respond_pending.cancel() + _auto_respond_pending = None + _auto_respond_buffer.clear() + caller_id = session.active_real_caller["caller_id"] caller_phone = session.active_real_caller["phone"] conversation_snapshot = list(session.conversation) @@ -1109,7 +1476,6 @@ async def hangup_real_caller(): session.active_real_caller = None - import threading hangup_sound = settings.sounds_dir / "hangup.wav" if hangup_sound.exists(): threading.Thread(target=audio_service.play_sfx, args=(str(hangup_sound),), daemon=True).start()