Add session news/research fields and helper functions

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-06 00:21:25 -07:00
parent e28579f909
commit e46337a05a

View File

@@ -2,6 +2,9 @@
import uuid import uuid
import asyncio import asyncio
import base64
import threading
import traceback
from dataclasses import dataclass, field from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect, Request, Response from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect, Request, Response
@@ -10,6 +13,7 @@ from fastapi.responses import FileResponse
import json import json
import time import time
import httpx import httpx
import numpy as np
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel from pydantic import BaseModel
from typing import Optional from typing import Optional
@@ -20,6 +24,7 @@ from .services.transcription import transcribe_audio
from .services.llm import llm_service from .services.llm import llm_service
from .services.tts import generate_speech from .services.tts import generate_speech
from .services.audio import audio_service from .services.audio import audio_service
from .services.news import news_service, extract_keywords
app = FastAPI(title="AI Radio Show") app = FastAPI(title="AI Radio Show")
@@ -321,6 +326,9 @@ class Session:
self.active_real_caller: dict | None = None self.active_real_caller: dict | None = None
self.ai_respond_mode: str = "manual" # "manual" or "auto" self.ai_respond_mode: str = "manual" # "manual" or "auto"
self.auto_followup: bool = False self.auto_followup: bool = False
self.news_headlines: list = []
self.research_notes: dict[str, list] = {}
self._research_task: asyncio.Task | None = None
def start_call(self, caller_key: str): def start_call(self, caller_key: str):
self.current_caller_key = caller_key self.current_caller_key = caller_key
@@ -398,12 +406,87 @@ class Session:
self.active_real_caller = None self.active_real_caller = None
self.ai_respond_mode = "manual" self.ai_respond_mode = "manual"
self.auto_followup = False self.auto_followup = False
self.news_headlines = []
self.research_notes = {}
if self._research_task and not self._research_task.done():
self._research_task.cancel()
self._research_task = None
self.id = str(uuid.uuid4())[:8] self.id = str(uuid.uuid4())[:8]
print(f"[Session] Reset - new session ID: {self.id}") print(f"[Session] Reset - new session ID: {self.id}")
session = Session() session = Session()
caller_service = CallerService() caller_service = CallerService()
_ai_response_lock = asyncio.Lock() # Prevents concurrent AI responses
_session_epoch = 0 # Increments on hangup/call start — stale tasks check this
# --- News & Research Helpers ---
async def _fetch_session_headlines():
try:
session.news_headlines = await news_service.get_headlines()
print(f"[News] Loaded {len(session.news_headlines)} headlines for session")
except Exception as e:
print(f"[News] Failed to load headlines: {e}")
async def _background_research(text: str):
keywords = extract_keywords(text)
if not keywords:
return
query = " ".join(keywords)
if query.lower() in session.research_notes:
return
try:
results = await news_service.search_topic(query)
if results:
session.research_notes[query.lower()] = results
print(f"[Research] Found {len(results)} results for '{query}'")
except Exception as e:
print(f"[Research] Error: {e}")
def _build_news_context() -> tuple[str, str]:
news_context = ""
if session.news_headlines:
news_context = news_service.format_headlines_for_prompt(session.news_headlines[:6])
research_context = ""
if session.research_notes:
all_items = []
for items in session.research_notes.values():
all_items.extend(items)
seen = set()
unique = []
for item in all_items:
if item.title not in seen:
seen.add(item.title)
unique.append(item)
research_context = news_service.format_headlines_for_prompt(unique[:8])
return news_context, research_context
# --- Lifecycle ---
@app.on_event("shutdown")
async def shutdown():
"""Clean up resources on server shutdown"""
global _host_audio_task
print("[Server] Shutting down — cleaning up resources...")
# Stop host mic streaming
audio_service.stop_host_stream()
# Cancel host audio sender task
if _host_audio_task and not _host_audio_task.done():
_host_audio_task.cancel()
try:
await _host_audio_task
except (asyncio.CancelledError, Exception):
pass
_host_audio_task = None
# Disconnect all active callers
for caller_id in list(caller_service.active_calls.keys()):
caller_service.hangup(caller_id)
caller_service.reset()
print("[Server] Cleanup complete")
# --- Static Files --- # --- Static Files ---
@@ -427,12 +510,16 @@ async def signalwire_voice_webhook(request: Request):
call_sid = form.get("CallSid", "") call_sid = form.get("CallSid", "")
print(f"[SignalWire] Inbound call from {caller_phone} (CallSid: {call_sid})") print(f"[SignalWire] Inbound call from {caller_phone} (CallSid: {call_sid})")
ws_scheme = "wss" # Use dedicated stream URL (ngrok) if configured, otherwise derive from request
host = request.headers.get("host", "radioshow.macneilmediagroup.com") if settings.signalwire_stream_url:
stream_url = f"{ws_scheme}://{host}/api/signalwire/stream" stream_url = settings.signalwire_stream_url
else:
host = request.headers.get("host", "radioshow.macneilmediagroup.com")
stream_url = f"wss://{host}/api/signalwire/stream"
xml = f"""<?xml version="1.0" encoding="UTF-8"?> xml = f"""<?xml version="1.0" encoding="UTF-8"?>
<Response> <Response>
<Say voice="woman">You've reached Luke at the Roost. Hold tight, we'll get you on the air.</Say>
<Connect> <Connect>
<Stream url="{stream_url}" codec="L16@16000h"> <Stream url="{stream_url}" codec="L16@16000h">
<Parameter name="caller_phone" value="{caller_phone}"/> <Parameter name="caller_phone" value="{caller_phone}"/>
@@ -567,15 +654,19 @@ async def get_callers():
async def reset_session(): async def reset_session():
"""Reset session - all callers get fresh backgrounds""" """Reset session - all callers get fresh backgrounds"""
session.reset() session.reset()
_chat_updates.clear()
return {"status": "reset", "session_id": session.id} return {"status": "reset", "session_id": session.id}
@app.post("/api/call/{caller_key}") @app.post("/api/call/{caller_key}")
async def start_call(caller_key: str): async def start_call(caller_key: str):
"""Start a call with a caller""" """Start a call with a caller"""
global _session_epoch
if caller_key not in CALLER_BASES: if caller_key not in CALLER_BASES:
raise HTTPException(404, "Caller not found") raise HTTPException(404, "Caller not found")
_session_epoch += 1
audio_service.stop_caller_audio()
session.start_call(caller_key) session.start_call(caller_key)
caller = session.caller # This generates the background if needed caller = session.caller # This generates the background if needed
@@ -589,14 +680,22 @@ async def start_call(caller_key: str):
@app.post("/api/hangup") @app.post("/api/hangup")
async def hangup(): async def hangup():
"""Hang up current call""" """Hang up current call"""
global _session_epoch, _auto_respond_pending
_session_epoch += 1
# Stop any playing caller audio immediately # Stop any playing caller audio immediately
audio_service.stop_caller_audio() audio_service.stop_caller_audio()
# Cancel any pending auto-respond
if _auto_respond_pending and not _auto_respond_pending.done():
_auto_respond_pending.cancel()
_auto_respond_pending = None
_auto_respond_buffer.clear()
caller_name = session.caller["name"] if session.caller else None caller_name = session.caller["name"] if session.caller else None
session.end_call() session.end_call()
# Play hangup sound in background so response returns immediately # Play hangup sound in background so response returns immediately
import threading
hangup_sound = settings.sounds_dir / "hangup.wav" hangup_sound = settings.sounds_dir / "hangup.wav"
if hangup_sound.exists(): if hangup_sound.exists():
threading.Thread(target=audio_service.play_sfx, args=(str(hangup_sound),), daemon=True).start() threading.Thread(target=audio_service.play_sfx, args=(str(hangup_sound),), daemon=True).start()
@@ -668,23 +767,81 @@ def clean_for_tts(text: str) -> str:
return text.strip() return text.strip()
# --- Chat Broadcast (for real-time frontend updates) ---
_chat_updates: list[dict] = []
_CHAT_UPDATES_MAX = 500
def broadcast_chat(sender: str, text: str):
"""Add a chat message to the update queue for frontend polling"""
_chat_updates.append({"type": "chat", "sender": sender, "text": text, "id": len(_chat_updates)})
if len(_chat_updates) > _CHAT_UPDATES_MAX:
del _chat_updates[:_CHAT_UPDATES_MAX // 2]
def broadcast_event(event_type: str, data: dict = None):
"""Add a system event to the update queue for frontend polling"""
entry = {"type": event_type, "id": len(_chat_updates)}
if data:
entry.update(data)
_chat_updates.append(entry)
@app.get("/api/conversation/updates")
async def get_conversation_updates(since: int = 0):
"""Get new chat/event messages since a given index"""
return {"messages": _chat_updates[since:]}
def _normalize_messages_for_llm(messages: list[dict]) -> list[dict]:
"""Convert custom roles (real_caller:X, ai_caller:X) to standard LLM roles"""
normalized = []
for msg in messages:
role = msg["role"]
content = msg["content"]
if role.startswith("real_caller:"):
caller_label = role.split(":", 1)[1]
normalized.append({"role": "user", "content": f"[Real caller {caller_label}]: {content}"})
elif role.startswith("ai_caller:"):
normalized.append({"role": "assistant", "content": content})
elif role == "host":
normalized.append({"role": "user", "content": content})
else:
normalized.append(msg)
return normalized
@app.post("/api/chat") @app.post("/api/chat")
async def chat(request: ChatRequest): async def chat(request: ChatRequest):
"""Chat with current caller""" """Chat with current caller"""
if not session.caller: if not session.caller:
raise HTTPException(400, "No active call") raise HTTPException(400, "No active call")
epoch = _session_epoch
session.add_message("user", request.text) session.add_message("user", request.text)
# Include conversation summary and show history for context async with _ai_response_lock:
conversation_summary = session.get_conversation_summary() if _session_epoch != epoch:
show_history = session.get_show_history() raise HTTPException(409, "Call ended while waiting")
system_prompt = get_caller_prompt(session.caller, conversation_summary, show_history)
response = await llm_service.generate( # Stop any playing caller audio so responses don't overlap
messages=session.conversation[-10:], # Reduced history for speed audio_service.stop_caller_audio()
system_prompt=system_prompt
) # Include conversation summary and show history for context
conversation_summary = session.get_conversation_summary()
show_history = session.get_show_history()
system_prompt = get_caller_prompt(session.caller, conversation_summary, show_history)
messages = _normalize_messages_for_llm(session.conversation[-10:])
response = await llm_service.generate(
messages=messages,
system_prompt=system_prompt
)
# Discard if call changed while we were generating
if _session_epoch != epoch:
print(f"[Chat] Discarding stale response (epoch {epoch}{_session_epoch})")
raise HTTPException(409, "Call changed during response")
print(f"[Chat] Raw LLM: {response[:100] if response else '(empty)'}...") print(f"[Chat] Raw LLM: {response[:100] if response else '(empty)'}...")
@@ -710,19 +867,25 @@ async def chat(request: ChatRequest):
@app.post("/api/tts") @app.post("/api/tts")
async def text_to_speech(request: TTSRequest): async def text_to_speech(request: TTSRequest):
"""Generate and play speech on caller output device (non-blocking)""" """Generate and play speech on caller output device (non-blocking)"""
# Validate text is not empty
if not request.text or not request.text.strip(): if not request.text or not request.text.strip():
raise HTTPException(400, "Text cannot be empty") raise HTTPException(400, "Text cannot be empty")
# Phone filter disabled - always use "none" epoch = _session_epoch
audio_bytes = await generate_speech( audio_bytes = await generate_speech(
request.text, request.text,
request.voice_id, request.voice_id,
"none" "none"
) )
# Don't play if call changed during TTS generation
if _session_epoch != epoch:
return {"status": "discarded", "duration": 0}
# Stop any existing audio before playing new
audio_service.stop_caller_audio()
# Play in background thread - returns immediately, can be interrupted by hangup # Play in background thread - returns immediately, can be interrupted by hangup
import threading
thread = threading.Thread( thread = threading.Thread(
target=audio_service.play_caller_audio, target=audio_service.play_caller_audio,
args=(audio_bytes, 24000), args=(audio_bytes, 24000),
@@ -858,69 +1021,125 @@ async def signalwire_audio_stream(websocket: WebSocket):
try: try:
while True: while True:
raw = await websocket.receive_text() message = await websocket.receive()
msg = json.loads(raw)
if message.get("type") == "websocket.disconnect":
break
raw = message.get("text")
if not raw:
continue
try:
msg = json.loads(raw)
except json.JSONDecodeError:
continue
event = msg.get("event") event = msg.get("event")
if event == "start": if event == "start":
custom = msg.get("start", {}).get("customParameters", {}) custom = msg.get("start", {}).get("customParameters", {})
caller_phone = custom.get("caller_phone", "Unknown") caller_phone = custom.get("caller_phone", "Unknown")
call_sid = custom.get("call_sid", "") call_sid = custom.get("call_sid", "")
stream_sid = msg.get("start", {}).get("streamSid", "")
stream_started = True stream_started = True
print(f"[SignalWire WS] Stream started: {caller_phone} (CallSid: {call_sid})") print(f"[SignalWire WS] Stream started: {caller_phone} (CallSid: {call_sid}, StreamSid: {stream_sid})")
caller_service.add_to_queue(caller_id, caller_phone) caller_service.add_to_queue(caller_id, caller_phone)
caller_service.register_websocket(caller_id, websocket) caller_service.register_websocket(caller_id, websocket)
broadcast_event("caller_queued", {"phone": caller_phone})
broadcast_chat("System", f"{caller_phone} is waiting in the queue")
ring_sound = settings.sounds_dir / "phone_ring.wav"
if ring_sound.exists():
threading.Thread(target=audio_service.play_sfx, args=(str(ring_sound),), daemon=True).start()
if call_sid: if call_sid:
caller_service.register_call_sid(caller_id, call_sid) caller_service.register_call_sid(caller_id, call_sid)
if stream_sid:
caller_service.register_stream_sid(caller_id, stream_sid)
elif event == "media" and stream_started: elif event == "media" and stream_started:
import base64 try:
payload = msg.get("media", {}).get("payload", "") payload = msg.get("media", {}).get("payload", "")
if not payload: if not payload:
continue continue
pcm_data = base64.b64decode(payload) pcm_data = base64.b64decode(payload)
call_info = caller_service.active_calls.get(caller_id) call_info = caller_service.active_calls.get(caller_id)
if not call_info: if not call_info:
continue continue
audio_buffer.extend(pcm_data) audio_buffer.extend(pcm_data)
audio_service.route_real_caller_audio(pcm_data, SAMPLE_RATE) audio_service.route_real_caller_audio(pcm_data, SAMPLE_RATE)
if len(audio_buffer) >= chunk_samples * 2: if len(audio_buffer) >= chunk_samples * 2:
pcm_chunk = bytes(audio_buffer[:chunk_samples * 2]) pcm_chunk = bytes(audio_buffer[:chunk_samples * 2])
audio_buffer = audio_buffer[chunk_samples * 2:] audio_buffer = audio_buffer[chunk_samples * 2:]
asyncio.create_task( # Skip transcription if audio is silent
_handle_real_caller_transcription(caller_id, pcm_chunk, SAMPLE_RATE) audio_check = np.frombuffer(pcm_chunk, dtype=np.int16).astype(np.float32) / 32768.0
) if np.abs(audio_check).max() < 0.01:
continue
asyncio.create_task(
_safe_transcribe(caller_id, pcm_chunk, SAMPLE_RATE)
)
except Exception as e:
print(f"[SignalWire WS] Media frame error (non-fatal): {e}")
continue # Skip bad frame, don't disconnect caller
elif event == "stop": elif event == "stop":
print(f"[SignalWire WS] Stream stopped: {caller_phone}") print(f"[SignalWire WS] Stream stop event received: {caller_phone} (caller_id: {caller_id})")
break break
except WebSocketDisconnect: except WebSocketDisconnect:
print(f"[SignalWire WS] Disconnected: {caller_id} ({caller_phone})") on_air = caller_id in caller_service.active_calls
tts_active = caller_service.is_streaming_tts(caller_id)
started_at = caller_service.active_calls.get(caller_id, {}).get("started_at")
duration = f"{time.time() - started_at:.0f}s" if started_at else "n/a"
print(f"[SignalWire WS] DROPPED: {caller_id} ({caller_phone}) on_air={on_air} tts_active={tts_active} duration={duration}")
disconnect_reason = "dropped"
except Exception as e: except Exception as e:
print(f"[SignalWire WS] Error: {e}") print(f"[SignalWire WS] Error: {e}")
traceback.print_exc()
disconnect_reason = f"error: {e}"
else:
disconnect_reason = "clean"
finally: finally:
was_on_air = caller_id in caller_service.active_calls
caller_service.unregister_websocket(caller_id) caller_service.unregister_websocket(caller_id)
caller_service.unregister_call_sid(caller_id) caller_service.unregister_call_sid(caller_id)
caller_service.unregister_stream_sid(caller_id)
caller_service.remove_from_queue(caller_id) caller_service.remove_from_queue(caller_id)
if caller_id in caller_service.active_calls: if was_on_air:
caller_service.hangup(caller_id) caller_service.hangup(caller_id)
if session.active_real_caller and session.active_real_caller.get("caller_id") == caller_id: if session.active_real_caller and session.active_real_caller.get("caller_id") == caller_id:
session.active_real_caller = None session.active_real_caller = None
if len(caller_service.active_calls) == 0: if len(caller_service.active_calls) == 0:
audio_service.stop_host_stream() audio_service.stop_host_stream()
if audio_buffer: broadcast_event("caller_disconnected", {"phone": caller_phone, "reason": disconnect_reason})
broadcast_chat("System", f"{caller_phone} disconnected ({disconnect_reason})")
drop_sound = settings.sounds_dir / ("busy.wav" if disconnect_reason == "dropped" else "hangup.wav")
if drop_sound.exists():
threading.Thread(target=audio_service.play_sfx, args=(str(drop_sound),), daemon=True).start()
elif stream_started:
broadcast_chat("System", f"{caller_phone} left the queue")
if audio_buffer and caller_id in caller_service.active_calls:
asyncio.create_task( asyncio.create_task(
_handle_real_caller_transcription(caller_id, bytes(audio_buffer), SAMPLE_RATE) _safe_transcribe(caller_id, bytes(audio_buffer), SAMPLE_RATE)
) )
async def _safe_transcribe(caller_id: str, pcm_chunk: bytes, sample_rate: int):
"""Wrapper that catches transcription errors so they don't crash anything"""
try:
await _handle_real_caller_transcription(caller_id, pcm_chunk, sample_rate)
except Exception as e:
print(f"[Transcription] Error (non-fatal): {e}")
# --- Host Audio Broadcast --- # --- Host Audio Broadcast ---
_host_audio_queue: asyncio.Queue = None _host_audio_queue: asyncio.Queue = None
@@ -928,24 +1147,48 @@ _host_audio_task: asyncio.Task = None
async def _host_audio_sender(): async def _host_audio_sender():
"""Persistent task that drains audio queue and sends to callers""" """Persistent task that drains audio queue, batches frames, and sends to callers"""
while True: _send_count = [0]
try:
while True:
pcm_bytes = await _host_audio_queue.get() pcm_bytes = await _host_audio_queue.get()
# Skip host mic audio while TTS is streaming to avoid interleaving if caller_service.is_streaming_tts_any():
if caller_service.streaming_tts:
continue continue
# Drain all available frames and concatenate
chunks = [pcm_bytes]
while not _host_audio_queue.empty():
try:
extra = _host_audio_queue.get_nowait()
if not caller_service.is_streaming_tts_any():
chunks.append(extra)
except asyncio.QueueEmpty:
break
combined = b''.join(chunks)
t0 = time.time()
for caller_id in list(caller_service.active_calls.keys()): for caller_id in list(caller_service.active_calls.keys()):
try: try:
await caller_service.send_audio_to_caller(caller_id, pcm_bytes, 16000) await caller_service.send_audio_to_caller(caller_id, combined, 16000)
except Exception: except Exception:
pass pass
elapsed = time.time() - t0
_send_count[0] += 1
if _send_count[0] % 20 == 0:
qsize = _host_audio_queue.qsize()
audio_ms = len(combined) / 2 / 16000 * 1000
print(f"[HostAudio] send took {elapsed*1000:.0f}ms, {len(chunks)} chunks batched ({audio_ms:.0f}ms audio), queue: {qsize}")
except asyncio.CancelledError:
print("[HostAudio] Sender task cancelled")
except Exception as e:
print(f"[HostAudio] Sender task error: {e}")
def _start_host_audio_sender(): def _start_host_audio_sender():
"""Start the persistent host audio sender task""" """Start the persistent host audio sender task"""
global _host_audio_queue, _host_audio_task global _host_audio_queue, _host_audio_task
if _host_audio_queue is None: if _host_audio_queue is None:
_host_audio_queue = asyncio.Queue(maxsize=100) _host_audio_queue = asyncio.Queue(maxsize=50)
if _host_audio_task is None or _host_audio_task.done(): if _host_audio_task is None or _host_audio_task.done():
_host_audio_task = asyncio.create_task(_host_audio_sender()) _host_audio_task = asyncio.create_task(_host_audio_sender())
@@ -1003,8 +1246,14 @@ async def drop_from_queue(caller_id: str):
return {"status": "dropped"} return {"status": "dropped"}
_auto_respond_pending: asyncio.Task | None = None
_auto_respond_buffer: list[str] = []
async def _handle_real_caller_transcription(caller_id: str, pcm_data: bytes, sample_rate: int): async def _handle_real_caller_transcription(caller_id: str, pcm_data: bytes, sample_rate: int):
"""Transcribe a chunk of real caller audio and add to conversation""" """Transcribe a chunk of real caller audio and add to conversation"""
global _auto_respond_pending
call_info = caller_service.active_calls.get(caller_id) call_info = caller_service.active_calls.get(caller_id)
if not call_info: if not call_info:
return return
@@ -1016,59 +1265,103 @@ async def _handle_real_caller_transcription(caller_id: str, pcm_data: bytes, sam
caller_phone = call_info["phone"] caller_phone = call_info["phone"]
print(f"[Real Caller] {caller_phone}: {text}") print(f"[Real Caller] {caller_phone}: {text}")
# Add to conversation with real_caller role # Add to conversation and broadcast to frontend
session.add_message(f"real_caller:{caller_phone}", text) session.add_message(f"real_caller:{caller_phone}", text)
broadcast_chat(f"{caller_phone} (caller)", text)
# If AI auto-respond mode is on and an AI caller is active, check if AI should respond # If AI auto-respond mode is on and an AI caller is active, debounce auto-respond
if session.ai_respond_mode == "auto" and session.current_caller_key: if session.ai_respond_mode == "auto" and session.current_caller_key:
asyncio.create_task(_check_ai_auto_respond(text, caller_phone)) _auto_respond_buffer.append(text)
# Cancel any pending auto-respond timer and restart it
if _auto_respond_pending and not _auto_respond_pending.done():
_auto_respond_pending.cancel()
_auto_respond_pending = asyncio.create_task(_debounced_auto_respond(caller_phone))
async def _check_ai_auto_respond(real_caller_text: str, real_caller_phone: str): async def _debounced_auto_respond(caller_phone: str):
"""Check if AI caller should jump in, and generate response if so""" """Wait for caller to stop talking (4s pause), then trigger AI response"""
try:
await asyncio.sleep(4) # Wait 4 seconds of silence
except asyncio.CancelledError:
return # More speech came in, timer restarted
# Gather accumulated text
accumulated = " ".join(_auto_respond_buffer)
_auto_respond_buffer.clear()
if not accumulated.strip():
return
print(f"[Auto-Respond] Caller paused. Accumulated: {accumulated[:100]}...")
await _trigger_ai_auto_respond(accumulated)
async def _trigger_ai_auto_respond(accumulated_text: str):
"""Generate AI caller response to accumulated real caller speech"""
epoch = _session_epoch
if not session.caller: if not session.caller:
return return
if _ai_response_lock.locked():
return
# Cooldown check # Cooldown check
if not hasattr(session, '_last_ai_auto_respond'): if not hasattr(session, '_last_ai_auto_respond'):
session._last_ai_auto_respond = 0 session._last_ai_auto_respond = 0
if time.time() - session._last_ai_auto_respond < 10: if time.time() - session._last_ai_auto_respond < 5:
return return
ai_name = session.caller["name"] ai_name = session.caller["name"]
# Quick "should I respond?" check with minimal LLM call async with _ai_response_lock:
should_respond = await llm_service.generate( if _session_epoch != epoch:
messages=[{"role": "user", "content": f'Someone just said: "{real_caller_text}". Should {ai_name} jump in? Reply only YES or NO.'}], return # Call changed while waiting for lock
system_prompt=f"You're deciding if {ai_name} should respond to what was just said on a radio show. Say YES if it's interesting or relevant to them, NO if not.",
)
if "YES" not in should_respond.upper(): print(f"[Auto-Respond] {ai_name} is jumping in...")
session._last_ai_auto_respond = time.time()
audio_service.stop_caller_audio()
broadcast_event("ai_status", {"text": f"{ai_name} is thinking..."})
conversation_summary = session.get_conversation_summary()
show_history = session.get_show_history()
system_prompt = get_caller_prompt(session.caller, conversation_summary, show_history)
messages = _normalize_messages_for_llm(session.conversation[-10:])
response = await llm_service.generate(
messages=messages,
system_prompt=system_prompt,
)
# Discard if call changed during generation
if _session_epoch != epoch:
print(f"[Auto-Respond] Discarding stale response (epoch {epoch}{_session_epoch})")
broadcast_event("ai_done")
return return
print(f"[Auto-Respond] {ai_name} is jumping in...")
session._last_ai_auto_respond = time.time()
# Generate full response
conversation_summary = session.get_conversation_summary()
show_history = session.get_show_history()
system_prompt = get_caller_prompt(session.caller, conversation_summary, show_history)
response = await llm_service.generate(
messages=session.conversation[-10:],
system_prompt=system_prompt,
)
response = clean_for_tts(response) response = clean_for_tts(response)
response = ensure_complete_thought(response) response = ensure_complete_thought(response)
if not response or not response.strip(): if not response or not response.strip():
broadcast_event("ai_done")
return
# Final staleness check before playing audio
if _session_epoch != epoch:
broadcast_event("ai_done")
return return
session.add_message(f"ai_caller:{ai_name}", response) session.add_message(f"ai_caller:{ai_name}", response)
broadcast_chat(ai_name, response)
# Generate TTS and play broadcast_event("ai_status", {"text": f"{ai_name} is speaking..."})
audio_bytes = await generate_speech(response, session.caller["voice"], "none") audio_bytes = await generate_speech(response, session.caller["voice"], "none")
import threading # Don't play if call changed during TTS generation
if _session_epoch != epoch:
print(f"[Auto-Respond] Discarding stale TTS (epoch {epoch}{_session_epoch})")
broadcast_event("ai_done")
return
thread = threading.Thread( thread = threading.Thread(
target=audio_service.play_caller_audio, target=audio_service.play_caller_audio,
args=(audio_bytes, 24000), args=(audio_bytes, 24000),
@@ -1076,22 +1369,96 @@ async def _check_ai_auto_respond(real_caller_text: str, real_caller_phone: str):
) )
thread.start() thread.start()
# Also send to active real caller so they hear the AI broadcast_event("ai_done")
# Also stream to active real caller so they hear the AI
if session.active_real_caller: if session.active_real_caller:
caller_id = session.active_real_caller["caller_id"] caller_id = session.active_real_caller["caller_id"]
asyncio.create_task( asyncio.create_task(
caller_service.send_audio_to_caller(caller_id, audio_bytes, 24000) caller_service.stream_audio_to_caller(caller_id, audio_bytes, 24000)
) )
@app.post("/api/ai-respond")
async def ai_respond():
"""Trigger AI caller to respond based on current conversation"""
if not session.caller:
raise HTTPException(400, "No active AI caller")
epoch = _session_epoch
async with _ai_response_lock:
if _session_epoch != epoch:
raise HTTPException(409, "Call ended while waiting")
audio_service.stop_caller_audio()
conversation_summary = session.get_conversation_summary()
show_history = session.get_show_history()
system_prompt = get_caller_prompt(session.caller, conversation_summary, show_history)
messages = _normalize_messages_for_llm(session.conversation[-10:])
response = await llm_service.generate(
messages=messages,
system_prompt=system_prompt
)
if _session_epoch != epoch:
raise HTTPException(409, "Call changed during response")
response = clean_for_tts(response)
response = ensure_complete_thought(response)
if not response or not response.strip():
response = "Uh... sorry, what was that?"
ai_name = session.caller["name"]
session.add_message(f"ai_caller:{ai_name}", response)
# TTS
audio_bytes = await generate_speech(response, session.caller["voice"], "none")
if _session_epoch != epoch:
raise HTTPException(409, "Call changed during TTS")
thread = threading.Thread(
target=audio_service.play_caller_audio,
args=(audio_bytes, 24000),
daemon=True,
)
thread.start()
# Stream to real caller
if session.active_real_caller:
caller_id = session.active_real_caller["caller_id"]
asyncio.create_task(
caller_service.stream_audio_to_caller(caller_id, audio_bytes, 24000)
)
return {
"text": response,
"caller": ai_name,
"voice_id": session.caller["voice"]
}
# --- Follow-Up & Session Control Endpoints --- # --- Follow-Up & Session Control Endpoints ---
@app.post("/api/hangup/real") @app.post("/api/hangup/real")
async def hangup_real_caller(): async def hangup_real_caller():
"""Hang up on real caller — disconnect immediately, summarize in background""" """Hang up on real caller — disconnect immediately, summarize in background"""
global _session_epoch, _auto_respond_pending
if not session.active_real_caller: if not session.active_real_caller:
raise HTTPException(400, "No active real caller") raise HTTPException(400, "No active real caller")
_session_epoch += 1
# Cancel any pending auto-respond
if _auto_respond_pending and not _auto_respond_pending.done():
_auto_respond_pending.cancel()
_auto_respond_pending = None
_auto_respond_buffer.clear()
caller_id = session.active_real_caller["caller_id"] caller_id = session.active_real_caller["caller_id"]
caller_phone = session.active_real_caller["phone"] caller_phone = session.active_real_caller["phone"]
conversation_snapshot = list(session.conversation) conversation_snapshot = list(session.conversation)
@@ -1109,7 +1476,6 @@ async def hangup_real_caller():
session.active_real_caller = None session.active_real_caller = None
import threading
hangup_sound = settings.sounds_dir / "hangup.wav" hangup_sound = settings.sounds_dir / "hangup.wav"
if hangup_sound.exists(): if hangup_sound.exists():
threading.Thread(target=audio_service.play_sfx, args=(str(hangup_sound),), daemon=True).start() threading.Thread(target=audio_service.play_sfx, args=(str(hangup_sound),), daemon=True).start()