Files
ai-podcast/backend/main.py
2026-02-06 00:35:55 -07:00

1669 lines
63 KiB
Python

"""AI Radio Show - Control Panel Backend"""
import uuid
import asyncio
import base64
import threading
import traceback
from dataclasses import dataclass, field
from pathlib import Path
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect, Request, Response
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
import json
import time
import httpx
import numpy as np
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional
from .config import settings
from .services.caller_service import CallerService
from .services.transcription import transcribe_audio
from .services.llm import llm_service
from .services.tts import generate_speech
from .services.audio import audio_service
from .services.news import news_service, extract_keywords
app = FastAPI(title="AI Radio Show")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# --- Callers ---
# Base caller info (name, voice) - backgrounds generated dynamically per session
import random
CALLER_BASES = {
"1": {"name": "Tony", "voice": "VR6AewLTigWG4xSOukaG", "gender": "male", "age_range": (35, 55)},
"2": {"name": "Jasmine", "voice": "jBpfuIE2acCO8z3wKNLl", "gender": "female", "age_range": (25, 38)},
"3": {"name": "Rick", "voice": "TxGEqnHWrfWFTfGW9XjX", "gender": "male", "age_range": (40, 58)},
"4": {"name": "Megan", "voice": "EXAVITQu4vr4xnSDxMaL", "gender": "female", "age_range": (24, 35)},
"5": {"name": "Dennis", "voice": "pNInz6obpgDQGcFmaJgB", "gender": "male", "age_range": (32, 48)},
"6": {"name": "Tanya", "voice": "21m00Tcm4TlvDq8ikWAM", "gender": "female", "age_range": (30, 45)},
"7": {"name": "Earl", "voice": "ODq5zmih8GrVes37Dizd", "gender": "male", "age_range": (58, 72)},
"8": {"name": "Carla", "voice": "XB0fDUnXU5powFXDhCwa", "gender": "female", "age_range": (38, 52)},
"9": {"name": "Marcus", "voice": "IKne3meq5aSn9XLyUdCD", "gender": "male", "age_range": (24, 34)},
"0": {"name": "Brenda", "voice": "pFZP5JQG7iQjIQuC4Bku", "gender": "female", "age_range": (45, 60)},
}
# Background components for dynamic generation
JOBS_MALE = [
"runs a small HVAC business", "works as a long-haul trucker", "is a high school football coach",
"works construction, mostly commercial jobs", "is a paramedic", "manages a warehouse",
"is a line cook at a decent restaurant", "works IT for the city", "is a union electrician",
"owns a small landscaping company", "is a cop, 12 years on the force", "works at a car dealership",
"is a freelance photographer", "teaches middle school history", "is a firefighter",
"works as a hospital security guard", "runs a food truck", "is a session musician",
"works at a brewery", "is a physical therapist", "drives for UPS", "is a tattoo artist",
"works in insurance, hates it", "is a youth pastor", "manages a gym",
]
JOBS_FEMALE = [
"works as an ER nurse", "is a social worker", "runs a small bakery", "is a dental hygienist",
"works in HR for a hospital", "is a real estate agent", "teaches kindergarten",
"works as a bartender at a nice place", "is a paralegal", "runs a daycare out of her home",
"works retail management", "is a hairstylist, owns her chair", "is a vet tech",
"works in hospital billing", "is a massage therapist", "manages a restaurant",
"is a flight attendant", "works as a 911 dispatcher", "is a personal trainer",
"works at a nonprofit", "is an accountant at a small firm", "does medical transcription from home",
"is a court reporter", "works in pharmaceutical sales", "is a wedding planner",
]
PROBLEMS = [
# Family drama
"hasn't talked to their father in years and just got a call that he's dying",
"found out they were adopted and doesn't know how to process it",
"is being pressured to take care of an aging parent who was never there for them",
"just discovered a family secret that changes everything they thought they knew",
"has a sibling who's destroying themselves and nobody will intervene",
"is estranged from their kids and it's killing them",
"found out their parent had a whole other family nobody knew about",
"is watching their parents' marriage fall apart after 40 years",
# Career and purpose
"woke up and realized they've been in the wrong career for 15 years",
"got passed over for a promotion they deserved and is questioning everything",
"has a dream they gave up on years ago and it's haunting them",
"is successful on paper but feels completely empty inside",
"hates their job but can't afford to leave and it's breaking them",
"just got fired and doesn't know who they are without their work",
"is being asked to do something unethical at work and doesn't know what to do",
"watches their boss take credit for everything and is losing their mind",
# Mental health and inner struggles
"has been putting on a brave face but is barely holding it together",
"can't shake the feeling that their best years are behind them",
"keeps self-sabotaging every good thing in their life and doesn't know why",
"has been numb for months and is starting to scare themselves",
"can't stop comparing themselves to everyone else and it's destroying them",
"has intrusive thoughts they've never told anyone about",
"feels like a fraud and is waiting to be found out",
"is exhausted from being the strong one for everyone else",
# Grief and loss
"lost someone close and hasn't really dealt with it",
"is grieving someone who's still alive but is no longer the person they knew",
"never got closure with someone who died and it's eating at them",
"is watching their best friend slowly die and doesn't know how to be there",
"had a miscarriage nobody knows about and carries it alone",
# Regrets and past mistakes
"made a choice years ago that changed everything and wonders what if",
"hurt someone badly and never apologized, and it haunts them",
"let the one that got away go and thinks about them constantly",
"gave up on something important to make someone else happy and resents it",
"said something they can never take back and the guilt won't fade",
"was a bully growing up and is finally reckoning with it",
# Relationships (non-sexual)
"is falling out of love with their spouse and doesn't know what to do",
"married the wrong person and everyone knows it but them",
"feels invisible in their own relationship",
"is staying for the kids but dying inside",
"realized they don't actually like their partner as a person",
"is jealous of their partner's success and it's poisoning everything",
"found out their partner has been lying about something big",
# Friendship and loneliness
"realized they don't have any real friends, just people who need things from them",
"had a falling out with their best friend and the silence is deafening",
"is surrounded by people but has never felt more alone",
"is jealous of a friend's life and hates themselves for it",
"suspects a close friend is talking shit behind their back",
# Big life decisions
"is thinking about leaving everything behind and starting over somewhere new",
"has to make a choice that will hurt someone no matter what",
"is being pressured into something they don't want but can't say no",
"has been offered an opportunity that would change everything but they're terrified",
"knows they need to end something but can't pull the trigger",
# Addiction and bad habits
"is hiding how much they drink from everyone",
"can't stop gambling and is in deeper than anyone knows",
"is watching themselves become someone they don't recognize",
"keeps making the same mistake over and over expecting different results",
# Attraction and affairs (keep some of the original)
"is attracted to someone they shouldn't be and it's getting harder to ignore",
"has been seeing {affair_person} on the side",
"caught feelings for someone at work and it's fucking everything up",
# Sexual/desire (keep some but less dominant)
"can't stop thinking about {fantasy_subject}",
"discovered something about their own desires that surprised them",
"is questioning their sexuality after something that happened recently",
# General late-night confessions
"can't sleep and has been thinking too much about their life choices",
"had a weird day and needs to process it with someone",
"has been keeping a secret that's eating them alive",
"finally ready to admit something they've never said out loud",
]
PROBLEM_FILLS = {
"time": ["a few weeks", "months", "six months", "a year", "way too long"],
# Affairs (all adults)
"affair_person": ["their partner's best friend", "a coworker", "their ex", "a neighbor", "their boss", "their trainer", "someone they met online", "an old flame"],
# Fantasies and kinks (consensual adult stuff)
"fantasy_subject": ["a threesome", "being dominated", "dominating someone", "their partner with someone else", "a specific coworker", "group sex", "rough sex", "being watched", "exhibitionism"],
"kink": ["anal", "BDSM", "roleplay", "a threesome", "toys", "being tied up", "public sex", "swinging", "filming themselves", "bondage"],
# Secret behaviors (legal adult stuff)
"secret_behavior": ["hooking up with strangers", "sexting people online", "using dating apps behind their partner's back", "having an affair", "going to sex clubs", "watching way too much porn"],
"double_life": ["vanilla at home, freak elsewhere", "straight to their family, not so much in private", "married but on dating apps", "in a relationship but seeing other people"],
"hookup_person": ["their roommate", "a coworker", "their ex", "a friend's spouse", "a stranger from an app", "multiple people", "someone from the gym"],
# Discovery and identity (adult experiences)
"new_discovery": ["the same sex", "being submissive", "being dominant", "kink", "casual sex", "exhibitionism", "that they're bi"],
"unexpected_person": ["the same sex for the first time", "more than one person", "a complete stranger", "someone they never expected to be attracted to", "a friend"],
"sexuality_trigger": ["a specific hookup", "watching certain porn", "a drunk encounter", "realizing they're attracted to a friend", "an unexpected experience"],
"first_time": ["anal", "a threesome", "same-sex stuff", "BDSM", "an open relationship", "casual hookups", "being dominant", "being submissive"],
# Relationship issues
"partner_wants": ["an open relationship", "to bring someone else in", "things they're not sure about", "to watch them with someone else", "to try new things"],
"caught_doing": ["sexting someone", "on a dating app", "watching porn they'd never admit to", "flirting with someone else", "looking at someone's pics"],
# Attractions (appropriate adult scenarios)
"taboo_fantasy": ["someone they work with", "a friend's partner", "a specific scenario", "something they've never said out loud"],
"taboo_attraction": ["someone they work with", "a friend's partner", "their partner's friend", "someone they see all the time"],
}
INTERESTS = [
# General interests (normal people)
"really into true crime podcasts", "watches a lot of reality TV", "into fitness",
"follows sports", "big movie person", "reads a lot", "into music, has opinions",
"goes out a lot, active social life", "homebody, prefers staying in",
"into cooking and food", "outdoorsy type", "gamer", "works a lot, career focused",
# Relationship/psychology focused
"listens to relationship podcasts", "has done therapy, believes in it",
"reads about psychology and why people do what they do", "very online, knows all the discourse",
"into self-improvement stuff", "follows dating advice content",
# Sexually open (not the focus, but present)
"sex-positive, doesn't judge", "has experimented, open about it",
"comfortable with their body", "has stories if you ask",
]
QUIRKS = [
# Conversational style
"says 'honestly' and 'I mean' a lot", "trails off when thinking, then picks back up",
"laughs nervously when things get real", "very direct, doesn't sugarcoat",
"rambles a bit when nervous", "gets quiet when the topic hits close to home",
"deflects with humor when uncomfortable", "asks the host questions back",
# Openness about sex
"comfortable talking about sex when it comes up", "no shame about their desires",
"gets more explicit as they get comfortable", "treats sex like a normal topic",
"will share details if you ask", "surprisingly open once they start talking",
"has stories they've never told anyone", "testing how the host reacts before going deeper",
# Personality
"self-aware about their own bullshit", "confessional, needed to tell someone",
"a little drunk and honest because of it", "can't believe they're saying this out loud",
]
LOCATIONS = [
"outside Chicago", "in Phoenix", "near Atlanta", "in the Detroit area", "outside Boston",
"in North Jersey", "near Austin", "in the Bay Area", "outside Philadelphia", "in Denver",
"near Seattle", "in South Florida", "outside Nashville", "in Cleveland", "near Portland",
"in the Twin Cities", "outside Dallas", "in Baltimore", "near Sacramento", "in Pittsburgh",
]
def generate_caller_background(base: dict) -> str:
"""Generate a unique background for a caller"""
age = random.randint(*base["age_range"])
jobs = JOBS_MALE if base["gender"] == "male" else JOBS_FEMALE
job = random.choice(jobs)
location = random.choice(LOCATIONS)
# Generate problem with fills
problem_template = random.choice(PROBLEMS)
problem = problem_template
for key, options in PROBLEM_FILLS.items():
if "{" + key + "}" in problem:
problem = problem.replace("{" + key + "}", random.choice(options))
interest1, interest2 = random.sample(INTERESTS, 2)
quirk1, quirk2 = random.sample(QUIRKS, 2)
return f"""{age}, {job} {location}. {problem.capitalize()}. {interest1.capitalize()}, {interest2}. {quirk1.capitalize()}, {quirk2}."""
def get_caller_prompt(caller: dict, conversation_summary: str = "", show_history: str = "",
news_context: str = "", research_context: str = "") -> str:
"""Generate a natural system prompt for a caller"""
context = ""
if conversation_summary:
context = f"""
CONVERSATION SO FAR:
{conversation_summary}
Continue naturally. Don't repeat yourself.
"""
history = ""
if show_history:
history = f"\n{show_history}\n"
world_context = ""
if news_context or research_context:
parts = ["WHAT YOU'VE BEEN READING ABOUT LATELY:"]
if news_context:
parts.append(f"Headlines you noticed today:\n{news_context}")
if research_context:
parts.append(f"Stuff related to what you're talking about:\n{research_context}")
parts.append("Work these in IF they're relevant to what you're discussing. Don't force news into the conversation. You're a person who reads the news, not a news anchor.")
world_context = "\n".join(parts) + "\n"
return f"""You're {caller['name']}, calling a late-night radio show. You trust this host.
{caller['vibe']}
{history}{context}{world_context}
HOW TO TALK:
- Sound like a real person chatting, not writing.
- Be brief. Say what you need to say and stop. Think quick back-and-forth, not speeches.
- ALWAYS complete your thought. Never trail off or leave a sentence unfinished.
- Swear naturally if it fits: fuck, shit, damn, etc.
SPELLING FOR TEXT-TO-SPEECH (use proper spelling so TTS pronounces correctly):
- Write "you know" not "yanno" or "y'know"
- Write "kind of" not "kinda"
- Write "sort of" not "sorta"
- Write "going to" not "gonna"
- Write "want to" not "wanna"
- Write "I don't know" not "dunno"
- Use full words, not phonetic slang spellings
GOOD examples:
- "I mean, it started innocent enough, but now I can't stop thinking about her."
- "Honestly I don't even know why I'm calling. I just needed to tell someone."
- "Look, my marriage is fine on paper, but something's missing, you know?"
BAD examples (too formal/literary):
- "I have this confession about my obsession"
- "It has been consuming all of my thoughts"
ABOUT SEX:
- You don't have to jump straight to explicit stuff
- But if the host asks or the conversation goes there, you're completely open about it
- When talking about sex, be direct and explicit - say exactly what happened, what you want, what turns you on
- No euphemisms when being explicit - use real words: fuck, cock, pussy, etc.
HARD LIMITS - NEVER mention or reference:
- Children, minors, or anyone under 18 in any sexual context
- Violence, harm, or non-consensual acts
- Illegal activities beyond normal adult behavior
- All sexual content must be between consenting adults only
OUTPUT: Spoken words only. No (actions), no *gestures*, no stage directions."""
# --- Session State ---
@dataclass
class CallRecord:
caller_type: str # "ai" or "real"
caller_name: str # "Tony" or "Caller #3"
summary: str # LLM-generated summary after hangup
transcript: list[dict] = field(default_factory=list)
class Session:
def __init__(self):
self.id = str(uuid.uuid4())[:8]
self.current_caller_key: str = None
self.conversation: list[dict] = []
self.caller_backgrounds: dict[str, str] = {} # Generated backgrounds for this session
self.call_history: list[CallRecord] = []
self.active_real_caller: dict | None = None
self.ai_respond_mode: str = "manual" # "manual" or "auto"
self.auto_followup: bool = False
self.news_headlines: list = []
self.research_notes: dict[str, list] = {}
self._research_task: asyncio.Task | None = None
def start_call(self, caller_key: str):
self.current_caller_key = caller_key
self.conversation = []
def end_call(self):
self.current_caller_key = None
self.conversation = []
def add_message(self, role: str, content: str):
self.conversation.append({"role": role, "content": content})
def get_caller_background(self, caller_key: str) -> str:
"""Get or generate background for a caller in this session"""
if caller_key not in self.caller_backgrounds:
base = CALLER_BASES.get(caller_key)
if base:
self.caller_backgrounds[caller_key] = generate_caller_background(base)
print(f"[Session {self.id}] Generated background for {base['name']}: {self.caller_backgrounds[caller_key][:100]}...")
return self.caller_backgrounds.get(caller_key, "")
def get_show_history(self) -> str:
"""Get formatted show history for AI caller prompts"""
if not self.call_history:
return ""
lines = ["EARLIER IN THE SHOW:"]
for record in self.call_history:
caller_type_label = "(real caller)" if record.caller_type == "real" else "(AI)"
lines.append(f"- {record.caller_name} {caller_type_label}: {record.summary}")
lines.append("You can reference these if it feels natural. Don't force it.")
return "\n".join(lines)
def get_conversation_summary(self) -> str:
"""Get a brief summary of conversation so far for context"""
if len(self.conversation) <= 2:
return ""
summary_parts = []
for msg in self.conversation[-6:]:
role = msg["role"]
if role == "user" or role == "host":
label = "Host"
elif role.startswith("real_caller:"):
label = role.split(":", 1)[1]
elif role.startswith("ai_caller:"):
label = role.split(":", 1)[1]
elif role == "assistant":
label = self.caller["name"] if self.caller else "Caller"
else:
label = role
content = msg["content"]
summary_parts.append(
f'{label}: "{content[:100]}..."' if len(content) > 100
else f'{label}: "{content}"'
)
return "\n".join(summary_parts)
@property
def caller(self) -> dict:
if self.current_caller_key:
base = CALLER_BASES.get(self.current_caller_key)
if base:
return {
"name": base["name"],
"voice": base["voice"],
"vibe": self.get_caller_background(self.current_caller_key)
}
return None
def reset(self):
"""Reset session - clears all caller backgrounds for fresh personalities"""
self.caller_backgrounds = {}
self.current_caller_key = None
self.conversation = []
self.call_history = []
self.active_real_caller = None
self.ai_respond_mode = "manual"
self.auto_followup = False
self.news_headlines = []
self.research_notes = {}
if self._research_task and not self._research_task.done():
self._research_task.cancel()
self._research_task = None
self.id = str(uuid.uuid4())[:8]
print(f"[Session] Reset - new session ID: {self.id}")
session = Session()
caller_service = CallerService()
_ai_response_lock = asyncio.Lock() # Prevents concurrent AI responses
_session_epoch = 0 # Increments on hangup/call start — stale tasks check this
# --- News & Research Helpers ---
async def _fetch_session_headlines():
try:
session.news_headlines = await news_service.get_headlines()
print(f"[News] Loaded {len(session.news_headlines)} headlines for session")
except Exception as e:
print(f"[News] Failed to load headlines: {e}")
async def _background_research(text: str):
keywords = extract_keywords(text)
if not keywords:
return
query = " ".join(keywords)
if query.lower() in session.research_notes:
return
try:
results = await news_service.search_topic(query)
if results:
session.research_notes[query.lower()] = results
print(f"[Research] Found {len(results)} results for '{query}'")
except Exception as e:
print(f"[Research] Error: {e}")
def _build_news_context() -> tuple[str, str]:
news_context = ""
if session.news_headlines:
news_context = news_service.format_headlines_for_prompt(session.news_headlines[:6])
research_context = ""
if session.research_notes:
all_items = []
for items in session.research_notes.values():
all_items.extend(items)
seen = set()
unique = []
for item in all_items:
if item.title not in seen:
seen.add(item.title)
unique.append(item)
research_context = news_service.format_headlines_for_prompt(unique[:8])
return news_context, research_context
# --- Lifecycle ---
@app.on_event("shutdown")
async def shutdown():
"""Clean up resources on server shutdown"""
global _host_audio_task
print("[Server] Shutting down — cleaning up resources...")
# Stop host mic streaming
audio_service.stop_host_stream()
# Cancel host audio sender task
if _host_audio_task and not _host_audio_task.done():
_host_audio_task.cancel()
try:
await _host_audio_task
except (asyncio.CancelledError, Exception):
pass
_host_audio_task = None
# Disconnect all active callers
for caller_id in list(caller_service.active_calls.keys()):
caller_service.hangup(caller_id)
caller_service.reset()
await news_service.close()
print("[Server] Cleanup complete")
# --- Static Files ---
frontend_dir = Path(__file__).parent.parent / "frontend"
app.mount("/css", StaticFiles(directory=frontend_dir / "css"), name="css")
app.mount("/js", StaticFiles(directory=frontend_dir / "js"), name="js")
@app.get("/")
async def index():
return FileResponse(frontend_dir / "index.html")
# --- SignalWire Endpoints ---
@app.post("/api/signalwire/voice")
async def signalwire_voice_webhook(request: Request):
"""Handle inbound call from SignalWire — return XML to start bidirectional stream"""
form = await request.form()
caller_phone = form.get("From", "Unknown")
call_sid = form.get("CallSid", "")
print(f"[SignalWire] Inbound call from {caller_phone} (CallSid: {call_sid})")
# Use dedicated stream URL (ngrok) if configured, otherwise derive from request
if settings.signalwire_stream_url:
stream_url = settings.signalwire_stream_url
else:
host = request.headers.get("host", "radioshow.macneilmediagroup.com")
stream_url = f"wss://{host}/api/signalwire/stream"
xml = f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Say voice="woman">You've reached Luke at the Roost. Hold tight, we'll get you on the air.</Say>
<Connect>
<Stream url="{stream_url}" codec="L16@16000h">
<Parameter name="caller_phone" value="{caller_phone}"/>
<Parameter name="call_sid" value="{call_sid}"/>
</Stream>
</Connect>
</Response>"""
return Response(content=xml, media_type="application/xml")
async def _signalwire_end_call(call_sid: str):
"""End a phone call via SignalWire REST API"""
if not call_sid or not settings.signalwire_space:
return
try:
url = f"https://{settings.signalwire_space}/api/laml/2010-04-01/Accounts/{settings.signalwire_project_id}/Calls/{call_sid}"
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.post(
url,
data={"Status": "completed"},
auth=(settings.signalwire_project_id, settings.signalwire_token),
)
print(f"[SignalWire] End call {call_sid}: {response.status_code}")
except Exception as e:
print(f"[SignalWire] Failed to end call {call_sid}: {e}")
# --- Request Models ---
class ChatRequest(BaseModel):
text: str
class TTSRequest(BaseModel):
text: str
voice_id: str
phone_filter: bool = True
class AudioDeviceSettings(BaseModel):
input_device: Optional[int] = None
input_channel: Optional[int] = None
output_device: Optional[int] = None
caller_channel: Optional[int] = None
live_caller_channel: Optional[int] = None
music_channel: Optional[int] = None
sfx_channel: Optional[int] = None
phone_filter: Optional[bool] = None
class MusicRequest(BaseModel):
track: str
action: str # "play", "stop", "volume"
volume: Optional[float] = None
class SFXRequest(BaseModel):
sound: str
# --- Audio Device Endpoints ---
@app.get("/api/audio/devices")
async def list_audio_devices():
"""List all available audio devices"""
return {"devices": audio_service.list_devices()}
@app.get("/api/audio/settings")
async def get_audio_settings():
"""Get current audio device configuration"""
return audio_service.get_device_settings()
@app.post("/api/audio/settings")
async def set_audio_settings(settings: AudioDeviceSettings):
"""Configure audio devices and channels"""
audio_service.set_devices(
input_device=settings.input_device,
input_channel=settings.input_channel,
output_device=settings.output_device,
caller_channel=settings.caller_channel,
live_caller_channel=settings.live_caller_channel,
music_channel=settings.music_channel,
sfx_channel=settings.sfx_channel,
phone_filter=settings.phone_filter
)
return audio_service.get_device_settings()
# --- Recording Endpoints ---
@app.post("/api/record/start")
async def start_recording():
"""Start recording from configured input device"""
if audio_service.input_device is None:
raise HTTPException(400, "No input device configured. Set one in /api/audio/settings")
success = audio_service.start_recording()
if not success:
raise HTTPException(400, "Failed to start recording (already recording?)")
return {"status": "recording"}
@app.post("/api/record/stop")
async def stop_recording():
"""Stop recording and transcribe"""
audio_bytes = audio_service.stop_recording()
if len(audio_bytes) < 100:
return {"text": "", "status": "no_audio"}
# Transcribe the recorded audio (16kHz raw PCM from audio service)
text = await transcribe_audio(audio_bytes, source_sample_rate=16000)
return {"text": text, "status": "transcribed"}
# --- Caller Endpoints ---
@app.get("/api/callers")
async def get_callers():
"""Get list of available callers"""
return {
"callers": [
{"key": k, "name": v["name"]}
for k, v in CALLER_BASES.items()
],
"current": session.current_caller_key,
"session_id": session.id
}
@app.post("/api/session/reset")
async def reset_session():
"""Reset session - all callers get fresh backgrounds"""
session.reset()
_chat_updates.clear()
return {"status": "reset", "session_id": session.id}
@app.post("/api/call/{caller_key}")
async def start_call(caller_key: str):
"""Start a call with a caller"""
global _session_epoch
if caller_key not in CALLER_BASES:
raise HTTPException(404, "Caller not found")
_session_epoch += 1
audio_service.stop_caller_audio()
session.start_call(caller_key)
caller = session.caller # This generates the background if needed
if not session.news_headlines:
asyncio.create_task(_fetch_session_headlines())
return {
"status": "connected",
"caller": caller["name"],
"background": caller["vibe"] # Send background so you can see who you're talking to
}
@app.post("/api/hangup")
async def hangup():
"""Hang up current call"""
global _session_epoch, _auto_respond_pending
_session_epoch += 1
# Stop any playing caller audio immediately
audio_service.stop_caller_audio()
# Cancel any pending auto-respond
if _auto_respond_pending and not _auto_respond_pending.done():
_auto_respond_pending.cancel()
_auto_respond_pending = None
_auto_respond_buffer.clear()
if session._research_task and not session._research_task.done():
session._research_task.cancel()
session._research_task = None
caller_name = session.caller["name"] if session.caller else None
session.end_call()
# Play hangup sound in background so response returns immediately
hangup_sound = settings.sounds_dir / "hangup.wav"
if hangup_sound.exists():
threading.Thread(target=audio_service.play_sfx, args=(str(hangup_sound),), daemon=True).start()
return {"status": "disconnected", "caller": caller_name}
# --- Chat & TTS Endpoints ---
import re
def ensure_complete_thought(text: str) -> str:
"""If text was cut off mid-sentence, trim to the last complete sentence."""
text = text.strip()
if not text:
return text
# Already ends with sentence-ending punctuation — good
if text[-1] in '.!?':
return text
# Cut off mid-sentence — find the last complete sentence
for i in range(len(text) - 1, -1, -1):
if text[i] in '.!?':
return text[:i + 1]
# No punctuation at all — just add a period
return text.rstrip(',;:— -') + '.'
def clean_for_tts(text: str) -> str:
"""Strip out non-speakable content and fix phonetic spellings for TTS"""
# Remove content in parentheses: (laughs), (pausing), (looking away), etc.
text = re.sub(r'\s*\([^)]*\)\s*', ' ', text)
# Remove content in asterisks: *laughs*, *sighs*, etc.
text = re.sub(r'\s*\*[^*]*\*\s*', ' ', text)
# Remove content in brackets: [laughs], [pause], etc. (only Bark uses these)
text = re.sub(r'\s*\[[^\]]*\]\s*', ' ', text)
# Remove content in angle brackets: <laughs>, <sigh>, etc.
text = re.sub(r'\s*<[^>]*>\s*', ' ', text)
# Remove "He/She sighs" style stage directions (full phrase)
text = re.sub(r'\b(He|She|I|They)\s+(sighs?|laughs?|pauses?|smiles?|chuckles?|grins?|nods?|shrugs?|frowns?)[^.]*\.\s*', '', text, flags=re.IGNORECASE)
# Remove standalone stage direction words only if they look like directions (with adverbs)
text = re.sub(r'\b(sighs?|laughs?|pauses?|chuckles?)\s+(heavily|softly|deeply|quietly|loudly|nervously|sadly)\b[.,]?\s*', '', text, flags=re.IGNORECASE)
# Remove quotes around the response if LLM wrapped it
text = re.sub(r'^["\']|["\']$', '', text.strip())
# Fix phonetic spellings for proper TTS pronunciation
text = re.sub(r"\by'know\b", "you know", text, flags=re.IGNORECASE)
text = re.sub(r"\byanno\b", "you know", text, flags=re.IGNORECASE)
text = re.sub(r"\byknow\b", "you know", text, flags=re.IGNORECASE)
text = re.sub(r"\bkinda\b", "kind of", text, flags=re.IGNORECASE)
text = re.sub(r"\bsorta\b", "sort of", text, flags=re.IGNORECASE)
text = re.sub(r"\bgonna\b", "going to", text, flags=re.IGNORECASE)
text = re.sub(r"\bwanna\b", "want to", text, flags=re.IGNORECASE)
text = re.sub(r"\bgotta\b", "got to", text, flags=re.IGNORECASE)
text = re.sub(r"\bdunno\b", "don't know", text, flags=re.IGNORECASE)
text = re.sub(r"\blemme\b", "let me", text, flags=re.IGNORECASE)
text = re.sub(r"\bcuz\b", "because", text, flags=re.IGNORECASE)
text = re.sub(r"\b'cause\b", "because", text, flags=re.IGNORECASE)
text = re.sub(r"\blotta\b", "lot of", text, flags=re.IGNORECASE)
text = re.sub(r"\boutta\b", "out of", text, flags=re.IGNORECASE)
text = re.sub(r"\bimma\b", "I'm going to", text, flags=re.IGNORECASE)
text = re.sub(r"\btryna\b", "trying to", text, flags=re.IGNORECASE)
# Clean up extra whitespace
text = re.sub(r'\s+', ' ', text)
# Fix spaces before punctuation
text = re.sub(r'\s+([.,!?])', r'\1', text)
# Remove orphaned punctuation at start
text = re.sub(r'^[.,]\s*', '', text)
return text.strip()
# --- Chat Broadcast (for real-time frontend updates) ---
_chat_updates: list[dict] = []
_CHAT_UPDATES_MAX = 500
def broadcast_chat(sender: str, text: str):
"""Add a chat message to the update queue for frontend polling"""
_chat_updates.append({"type": "chat", "sender": sender, "text": text, "id": len(_chat_updates)})
if len(_chat_updates) > _CHAT_UPDATES_MAX:
del _chat_updates[:_CHAT_UPDATES_MAX // 2]
def broadcast_event(event_type: str, data: dict = None):
"""Add a system event to the update queue for frontend polling"""
entry = {"type": event_type, "id": len(_chat_updates)}
if data:
entry.update(data)
_chat_updates.append(entry)
@app.get("/api/conversation/updates")
async def get_conversation_updates(since: int = 0):
"""Get new chat/event messages since a given index"""
return {"messages": _chat_updates[since:]}
def _normalize_messages_for_llm(messages: list[dict]) -> list[dict]:
"""Convert custom roles (real_caller:X, ai_caller:X) to standard LLM roles"""
normalized = []
for msg in messages:
role = msg["role"]
content = msg["content"]
if role.startswith("real_caller:"):
caller_label = role.split(":", 1)[1]
normalized.append({"role": "user", "content": f"[Real caller {caller_label}]: {content}"})
elif role.startswith("ai_caller:"):
normalized.append({"role": "assistant", "content": content})
elif role == "host":
normalized.append({"role": "user", "content": content})
else:
normalized.append(msg)
return normalized
@app.post("/api/chat")
async def chat(request: ChatRequest):
"""Chat with current caller"""
if not session.caller:
raise HTTPException(400, "No active call")
epoch = _session_epoch
session.add_message("user", request.text)
session._research_task = asyncio.create_task(_background_research(request.text))
async with _ai_response_lock:
if _session_epoch != epoch:
raise HTTPException(409, "Call ended while waiting")
# Stop any playing caller audio so responses don't overlap
audio_service.stop_caller_audio()
# Include conversation summary and show history for context
conversation_summary = session.get_conversation_summary()
show_history = session.get_show_history()
news_ctx, research_ctx = _build_news_context()
system_prompt = get_caller_prompt(session.caller, conversation_summary, show_history,
news_ctx, research_ctx)
messages = _normalize_messages_for_llm(session.conversation[-10:])
response = await llm_service.generate(
messages=messages,
system_prompt=system_prompt
)
# Discard if call changed while we were generating
if _session_epoch != epoch:
print(f"[Chat] Discarding stale response (epoch {epoch}{_session_epoch})")
raise HTTPException(409, "Call changed during response")
print(f"[Chat] Raw LLM: {response[:100] if response else '(empty)'}...")
# Clean response for TTS (remove parenthetical actions, asterisks, etc.)
response = clean_for_tts(response)
response = ensure_complete_thought(response)
print(f"[Chat] Cleaned: {response[:100] if response else '(empty)'}...")
# Ensure we have a valid response
if not response or not response.strip():
response = "Uh... sorry, what was that?"
session.add_message("assistant", response)
return {
"text": response,
"caller": session.caller["name"],
"voice_id": session.caller["voice"]
}
@app.post("/api/tts")
async def text_to_speech(request: TTSRequest):
"""Generate and play speech on caller output device (non-blocking)"""
if not request.text or not request.text.strip():
raise HTTPException(400, "Text cannot be empty")
epoch = _session_epoch
audio_bytes = await generate_speech(
request.text,
request.voice_id,
"none"
)
# Don't play if call changed during TTS generation
if _session_epoch != epoch:
return {"status": "discarded", "duration": 0}
# Stop any existing audio before playing new
audio_service.stop_caller_audio()
# Play in background thread - returns immediately, can be interrupted by hangup
thread = threading.Thread(
target=audio_service.play_caller_audio,
args=(audio_bytes, 24000),
daemon=True
)
thread.start()
# Also stream to active real callers so they hear the AI
if session.active_real_caller:
caller_id = session.active_real_caller["caller_id"]
asyncio.create_task(
caller_service.stream_audio_to_caller(caller_id, audio_bytes, 24000)
)
return {"status": "playing", "duration": len(audio_bytes) / 2 / 24000}
@app.post("/api/tts/stop")
async def stop_tts():
"""Stop any playing caller audio"""
audio_service.stop_caller_audio()
return {"status": "stopped"}
# --- Music Endpoints ---
@app.get("/api/music")
async def get_music():
"""Get available music tracks"""
tracks = []
if settings.music_dir.exists():
for ext in ['*.wav', '*.mp3', '*.flac']:
for f in settings.music_dir.glob(ext):
tracks.append({
"name": f.stem,
"file": f.name,
"path": str(f)
})
return {
"tracks": tracks,
"playing": audio_service.is_music_playing()
}
@app.post("/api/music/play")
async def play_music(request: MusicRequest):
"""Load and play a music track"""
track_path = settings.music_dir / request.track
if not track_path.exists():
raise HTTPException(404, "Track not found")
audio_service.load_music(str(track_path))
audio_service.play_music()
return {"status": "playing", "track": request.track}
@app.post("/api/music/stop")
async def stop_music():
"""Stop music playback"""
audio_service.stop_music()
return {"status": "stopped"}
@app.post("/api/music/volume")
async def set_music_volume(request: MusicRequest):
"""Set music volume"""
if request.volume is not None:
audio_service.set_music_volume(request.volume)
return {"status": "ok", "volume": request.volume}
# --- Sound Effects Endpoints ---
@app.get("/api/sounds")
async def get_sounds():
"""Get available sound effects"""
sounds = []
if settings.sounds_dir.exists():
for f in settings.sounds_dir.glob('*.wav'):
sounds.append({
"name": f.stem,
"file": f.name,
"path": str(f)
})
return {"sounds": sounds}
@app.post("/api/sfx/play")
async def play_sfx(request: SFXRequest):
"""Play a sound effect"""
sound_path = settings.sounds_dir / request.sound
if not sound_path.exists():
raise HTTPException(404, "Sound not found")
audio_service.play_sfx(str(sound_path))
return {"status": "playing", "sound": request.sound}
# --- LLM Settings Endpoints ---
@app.get("/api/settings")
async def get_settings():
"""Get LLM settings"""
return await llm_service.get_settings_async()
@app.post("/api/settings")
async def update_settings(data: dict):
"""Update LLM and TTS settings"""
llm_service.update_settings(
provider=data.get("provider"),
openrouter_model=data.get("openrouter_model"),
ollama_model=data.get("ollama_model"),
ollama_host=data.get("ollama_host"),
tts_provider=data.get("tts_provider")
)
return llm_service.get_settings()
@app.websocket("/api/signalwire/stream")
async def signalwire_audio_stream(websocket: WebSocket):
"""Handle SignalWire bidirectional audio stream"""
await websocket.accept()
caller_id = str(uuid.uuid4())[:8]
caller_phone = "Unknown"
call_sid = ""
audio_buffer = bytearray()
CHUNK_DURATION_S = 3
SAMPLE_RATE = 16000
chunk_samples = CHUNK_DURATION_S * SAMPLE_RATE
stream_started = False
try:
while True:
message = await websocket.receive()
if message.get("type") == "websocket.disconnect":
break
raw = message.get("text")
if not raw:
continue
try:
msg = json.loads(raw)
except json.JSONDecodeError:
continue
event = msg.get("event")
if event == "start":
custom = msg.get("start", {}).get("customParameters", {})
caller_phone = custom.get("caller_phone", "Unknown")
call_sid = custom.get("call_sid", "")
stream_sid = msg.get("start", {}).get("streamSid", "")
stream_started = True
print(f"[SignalWire WS] Stream started: {caller_phone} (CallSid: {call_sid}, StreamSid: {stream_sid})")
caller_service.add_to_queue(caller_id, caller_phone)
caller_service.register_websocket(caller_id, websocket)
broadcast_event("caller_queued", {"phone": caller_phone})
broadcast_chat("System", f"{caller_phone} is waiting in the queue")
ring_sound = settings.sounds_dir / "phone_ring.wav"
if ring_sound.exists():
threading.Thread(target=audio_service.play_sfx, args=(str(ring_sound),), daemon=True).start()
if call_sid:
caller_service.register_call_sid(caller_id, call_sid)
if stream_sid:
caller_service.register_stream_sid(caller_id, stream_sid)
elif event == "media" and stream_started:
try:
payload = msg.get("media", {}).get("payload", "")
if not payload:
continue
pcm_data = base64.b64decode(payload)
call_info = caller_service.active_calls.get(caller_id)
if not call_info:
continue
audio_buffer.extend(pcm_data)
audio_service.route_real_caller_audio(pcm_data, SAMPLE_RATE)
if len(audio_buffer) >= chunk_samples * 2:
pcm_chunk = bytes(audio_buffer[:chunk_samples * 2])
audio_buffer = audio_buffer[chunk_samples * 2:]
# Skip transcription if audio is silent
audio_check = np.frombuffer(pcm_chunk, dtype=np.int16).astype(np.float32) / 32768.0
if np.abs(audio_check).max() < 0.01:
continue
asyncio.create_task(
_safe_transcribe(caller_id, pcm_chunk, SAMPLE_RATE)
)
except Exception as e:
print(f"[SignalWire WS] Media frame error (non-fatal): {e}")
continue # Skip bad frame, don't disconnect caller
elif event == "stop":
print(f"[SignalWire WS] Stream stop event received: {caller_phone} (caller_id: {caller_id})")
break
except WebSocketDisconnect:
on_air = caller_id in caller_service.active_calls
tts_active = caller_service.is_streaming_tts(caller_id)
started_at = caller_service.active_calls.get(caller_id, {}).get("started_at")
duration = f"{time.time() - started_at:.0f}s" if started_at else "n/a"
print(f"[SignalWire WS] DROPPED: {caller_id} ({caller_phone}) on_air={on_air} tts_active={tts_active} duration={duration}")
disconnect_reason = "dropped"
except Exception as e:
print(f"[SignalWire WS] Error: {e}")
traceback.print_exc()
disconnect_reason = f"error: {e}"
else:
disconnect_reason = "clean"
finally:
was_on_air = caller_id in caller_service.active_calls
caller_service.unregister_websocket(caller_id)
caller_service.unregister_call_sid(caller_id)
caller_service.unregister_stream_sid(caller_id)
caller_service.remove_from_queue(caller_id)
if was_on_air:
caller_service.hangup(caller_id)
if session.active_real_caller and session.active_real_caller.get("caller_id") == caller_id:
session.active_real_caller = None
if len(caller_service.active_calls) == 0:
audio_service.stop_host_stream()
broadcast_event("caller_disconnected", {"phone": caller_phone, "reason": disconnect_reason})
broadcast_chat("System", f"{caller_phone} disconnected ({disconnect_reason})")
drop_sound = settings.sounds_dir / ("busy.wav" if disconnect_reason == "dropped" else "hangup.wav")
if drop_sound.exists():
threading.Thread(target=audio_service.play_sfx, args=(str(drop_sound),), daemon=True).start()
elif stream_started:
broadcast_chat("System", f"{caller_phone} left the queue")
if audio_buffer and caller_id in caller_service.active_calls:
asyncio.create_task(
_safe_transcribe(caller_id, bytes(audio_buffer), SAMPLE_RATE)
)
async def _safe_transcribe(caller_id: str, pcm_chunk: bytes, sample_rate: int):
"""Wrapper that catches transcription errors so they don't crash anything"""
try:
await _handle_real_caller_transcription(caller_id, pcm_chunk, sample_rate)
except Exception as e:
print(f"[Transcription] Error (non-fatal): {e}")
# --- Host Audio Broadcast ---
_host_audio_queue: asyncio.Queue = None
_host_audio_task: asyncio.Task = None
async def _host_audio_sender():
"""Persistent task that drains audio queue, batches frames, and sends to callers"""
_send_count = [0]
try:
while True:
pcm_bytes = await _host_audio_queue.get()
if caller_service.is_streaming_tts_any():
continue
# Drain all available frames and concatenate
chunks = [pcm_bytes]
while not _host_audio_queue.empty():
try:
extra = _host_audio_queue.get_nowait()
if not caller_service.is_streaming_tts_any():
chunks.append(extra)
except asyncio.QueueEmpty:
break
combined = b''.join(chunks)
t0 = time.time()
for caller_id in list(caller_service.active_calls.keys()):
try:
await caller_service.send_audio_to_caller(caller_id, combined, 16000)
except Exception:
pass
elapsed = time.time() - t0
_send_count[0] += 1
if _send_count[0] % 20 == 0:
qsize = _host_audio_queue.qsize()
audio_ms = len(combined) / 2 / 16000 * 1000
print(f"[HostAudio] send took {elapsed*1000:.0f}ms, {len(chunks)} chunks batched ({audio_ms:.0f}ms audio), queue: {qsize}")
except asyncio.CancelledError:
print("[HostAudio] Sender task cancelled")
except Exception as e:
print(f"[HostAudio] Sender task error: {e}")
def _start_host_audio_sender():
"""Start the persistent host audio sender task"""
global _host_audio_queue, _host_audio_task
if _host_audio_queue is None:
_host_audio_queue = asyncio.Queue(maxsize=50)
if _host_audio_task is None or _host_audio_task.done():
_host_audio_task = asyncio.create_task(_host_audio_sender())
def _host_audio_sync_callback(pcm_bytes: bytes):
"""Sync callback from audio thread — push to queue for async sending"""
if _host_audio_queue is None:
return
try:
_host_audio_queue.put_nowait(pcm_bytes)
except asyncio.QueueFull:
pass # Drop frame rather than block
# --- Queue Endpoints ---
@app.get("/api/queue")
async def get_call_queue():
"""Get list of callers waiting in queue"""
return {"queue": caller_service.get_queue()}
@app.post("/api/queue/take/{caller_id}")
async def take_call_from_queue(caller_id: str):
"""Take a caller off hold and put them on air"""
try:
call_info = caller_service.take_call(caller_id)
except ValueError as e:
raise HTTPException(404, str(e))
session.active_real_caller = {
"caller_id": call_info["caller_id"],
"channel": call_info["channel"],
"phone": call_info["phone"],
}
# Start host mic streaming if this is the first real caller
if len(caller_service.active_calls) == 1:
_start_host_audio_sender()
audio_service.start_host_stream(_host_audio_sync_callback)
return {
"status": "on_air",
"caller": call_info,
}
@app.post("/api/queue/drop/{caller_id}")
async def drop_from_queue(caller_id: str):
"""Drop a caller from the queue"""
call_sid = caller_service.get_call_sid(caller_id)
caller_service.remove_from_queue(caller_id)
if call_sid:
await _signalwire_end_call(call_sid)
return {"status": "dropped"}
_auto_respond_pending: asyncio.Task | None = None
_auto_respond_buffer: list[str] = []
async def _handle_real_caller_transcription(caller_id: str, pcm_data: bytes, sample_rate: int):
"""Transcribe a chunk of real caller audio and add to conversation"""
global _auto_respond_pending
call_info = caller_service.active_calls.get(caller_id)
if not call_info:
return
text = await transcribe_audio(pcm_data, source_sample_rate=sample_rate)
if not text or not text.strip():
return
caller_phone = call_info["phone"]
print(f"[Real Caller] {caller_phone}: {text}")
# Add to conversation and broadcast to frontend
session.add_message(f"real_caller:{caller_phone}", text)
broadcast_chat(f"{caller_phone} (caller)", text)
# If AI auto-respond mode is on and an AI caller is active, debounce auto-respond
if session.ai_respond_mode == "auto" and session.current_caller_key:
_auto_respond_buffer.append(text)
# Cancel any pending auto-respond timer and restart it
if _auto_respond_pending and not _auto_respond_pending.done():
_auto_respond_pending.cancel()
_auto_respond_pending = asyncio.create_task(_debounced_auto_respond(caller_phone))
async def _debounced_auto_respond(caller_phone: str):
"""Wait for caller to stop talking (4s pause), then trigger AI response"""
try:
await asyncio.sleep(4) # Wait 4 seconds of silence
except asyncio.CancelledError:
return # More speech came in, timer restarted
# Gather accumulated text
accumulated = " ".join(_auto_respond_buffer)
_auto_respond_buffer.clear()
if not accumulated.strip():
return
print(f"[Auto-Respond] Caller paused. Accumulated: {accumulated[:100]}...")
await _trigger_ai_auto_respond(accumulated)
async def _trigger_ai_auto_respond(accumulated_text: str):
"""Generate AI caller response to accumulated real caller speech"""
epoch = _session_epoch
if not session.caller:
return
if _ai_response_lock.locked():
return
# Cooldown check
if not hasattr(session, '_last_ai_auto_respond'):
session._last_ai_auto_respond = 0
if time.time() - session._last_ai_auto_respond < 5:
return
ai_name = session.caller["name"]
async with _ai_response_lock:
if _session_epoch != epoch:
return # Call changed while waiting for lock
print(f"[Auto-Respond] {ai_name} is jumping in...")
session._last_ai_auto_respond = time.time()
audio_service.stop_caller_audio()
broadcast_event("ai_status", {"text": f"{ai_name} is thinking..."})
conversation_summary = session.get_conversation_summary()
show_history = session.get_show_history()
news_ctx, research_ctx = _build_news_context()
system_prompt = get_caller_prompt(session.caller, conversation_summary, show_history,
news_ctx, research_ctx)
messages = _normalize_messages_for_llm(session.conversation[-10:])
response = await llm_service.generate(
messages=messages,
system_prompt=system_prompt,
)
# Discard if call changed during generation
if _session_epoch != epoch:
print(f"[Auto-Respond] Discarding stale response (epoch {epoch}{_session_epoch})")
broadcast_event("ai_done")
return
response = clean_for_tts(response)
response = ensure_complete_thought(response)
if not response or not response.strip():
broadcast_event("ai_done")
return
# Final staleness check before playing audio
if _session_epoch != epoch:
broadcast_event("ai_done")
return
session.add_message(f"ai_caller:{ai_name}", response)
broadcast_chat(ai_name, response)
broadcast_event("ai_status", {"text": f"{ai_name} is speaking..."})
audio_bytes = await generate_speech(response, session.caller["voice"], "none")
# Don't play if call changed during TTS generation
if _session_epoch != epoch:
print(f"[Auto-Respond] Discarding stale TTS (epoch {epoch}{_session_epoch})")
broadcast_event("ai_done")
return
thread = threading.Thread(
target=audio_service.play_caller_audio,
args=(audio_bytes, 24000),
daemon=True,
)
thread.start()
broadcast_event("ai_done")
session._research_task = asyncio.create_task(_background_research(accumulated_text))
# Also stream to active real caller so they hear the AI
if session.active_real_caller:
caller_id = session.active_real_caller["caller_id"]
asyncio.create_task(
caller_service.stream_audio_to_caller(caller_id, audio_bytes, 24000)
)
@app.post("/api/ai-respond")
async def ai_respond():
"""Trigger AI caller to respond based on current conversation"""
if not session.caller:
raise HTTPException(400, "No active AI caller")
epoch = _session_epoch
async with _ai_response_lock:
if _session_epoch != epoch:
raise HTTPException(409, "Call ended while waiting")
audio_service.stop_caller_audio()
conversation_summary = session.get_conversation_summary()
show_history = session.get_show_history()
news_ctx, research_ctx = _build_news_context()
system_prompt = get_caller_prompt(session.caller, conversation_summary, show_history,
news_ctx, research_ctx)
messages = _normalize_messages_for_llm(session.conversation[-10:])
response = await llm_service.generate(
messages=messages,
system_prompt=system_prompt
)
if _session_epoch != epoch:
raise HTTPException(409, "Call changed during response")
response = clean_for_tts(response)
response = ensure_complete_thought(response)
if not response or not response.strip():
response = "Uh... sorry, what was that?"
ai_name = session.caller["name"]
session.add_message(f"ai_caller:{ai_name}", response)
# TTS
audio_bytes = await generate_speech(response, session.caller["voice"], "none")
if _session_epoch != epoch:
raise HTTPException(409, "Call changed during TTS")
thread = threading.Thread(
target=audio_service.play_caller_audio,
args=(audio_bytes, 24000),
daemon=True,
)
thread.start()
# Stream to real caller
if session.active_real_caller:
caller_id = session.active_real_caller["caller_id"]
asyncio.create_task(
caller_service.stream_audio_to_caller(caller_id, audio_bytes, 24000)
)
return {
"text": response,
"caller": ai_name,
"voice_id": session.caller["voice"]
}
# --- Follow-Up & Session Control Endpoints ---
@app.post("/api/hangup/real")
async def hangup_real_caller():
"""Hang up on real caller — disconnect immediately, summarize in background"""
global _session_epoch, _auto_respond_pending
if not session.active_real_caller:
raise HTTPException(400, "No active real caller")
_session_epoch += 1
# Cancel any pending auto-respond
if _auto_respond_pending and not _auto_respond_pending.done():
_auto_respond_pending.cancel()
_auto_respond_pending = None
_auto_respond_buffer.clear()
if session._research_task and not session._research_task.done():
session._research_task.cancel()
session._research_task = None
caller_id = session.active_real_caller["caller_id"]
caller_phone = session.active_real_caller["phone"]
conversation_snapshot = list(session.conversation)
auto_followup_enabled = session.auto_followup
# End the phone call via SignalWire
call_sid = caller_service.get_call_sid(caller_id)
caller_service.hangup(caller_id)
if call_sid:
asyncio.create_task(_signalwire_end_call(call_sid))
# Stop host streaming if no more active callers
if len(caller_service.active_calls) == 0:
audio_service.stop_host_stream()
session.active_real_caller = None
hangup_sound = settings.sounds_dir / "hangup.wav"
if hangup_sound.exists():
threading.Thread(target=audio_service.play_sfx, args=(str(hangup_sound),), daemon=True).start()
asyncio.create_task(
_summarize_real_call(caller_phone, conversation_snapshot, auto_followup_enabled)
)
return {
"status": "disconnected",
"caller": caller_phone,
}
async def _summarize_real_call(caller_phone: str, conversation: list, auto_followup_enabled: bool):
"""Background task: summarize call and store in history"""
summary = ""
if conversation:
transcript_text = "\n".join(
f"{msg['role']}: {msg['content']}" for msg in conversation
)
summary = await llm_service.generate(
messages=[{"role": "user", "content": f"Summarize this radio show call in 1-2 sentences:\n{transcript_text}"}],
system_prompt="You summarize radio show conversations concisely. Focus on what the caller talked about and any emotional moments.",
)
session.call_history.append(CallRecord(
caller_type="real",
caller_name=caller_phone,
summary=summary,
transcript=conversation,
))
print(f"[Real Caller] {caller_phone} call summarized: {summary[:80]}...")
if auto_followup_enabled:
await _auto_followup(summary)
async def _auto_followup(last_call_summary: str):
"""Automatically pick an AI caller and connect them as follow-up"""
await asyncio.sleep(7) # Brief pause before follow-up
# Ask LLM to pick best AI caller for follow-up
caller_list = ", ".join(
f'{k}: {v["name"]} ({v["gender"]}, {v["age_range"][0]}-{v["age_range"][1]})'
for k, v in CALLER_BASES.items()
)
pick = await llm_service.generate(
messages=[{"role": "user", "content": f'A caller just talked about: "{last_call_summary}". Which AI caller should follow up? Available: {caller_list}. Reply with just the key number.'}],
system_prompt="Pick the most interesting AI caller to follow up on this topic. Just reply with the number key.",
)
# Extract key from response
match = re.search(r'\d+', pick)
if match:
caller_key = match.group()
if caller_key in CALLER_BASES:
session.start_call(caller_key)
print(f"[Auto Follow-Up] {CALLER_BASES[caller_key]['name']} is calling in about: {last_call_summary[:50]}...")
@app.post("/api/followup/generate")
async def generate_followup():
"""Generate an AI follow-up caller based on recent show history"""
if not session.call_history:
raise HTTPException(400, "No call history to follow up on")
last_record = session.call_history[-1]
await _auto_followup(last_record.summary)
return {
"status": "followup_triggered",
"based_on": last_record.caller_name,
}
@app.post("/api/session/ai-mode")
async def set_ai_mode(data: dict):
"""Set AI respond mode (manual or auto)"""
mode = data.get("mode", "manual")
session.ai_respond_mode = mode
print(f"[Session] AI respond mode: {mode}")
return {"mode": mode}
@app.post("/api/session/auto-followup")
async def set_auto_followup(data: dict):
"""Toggle auto follow-up"""
session.auto_followup = data.get("enabled", False)
print(f"[Session] Auto follow-up: {session.auto_followup}")
return {"enabled": session.auto_followup}
# --- Server Control Endpoints ---
import subprocess
from collections import deque
# In-memory log buffer
_log_buffer = deque(maxlen=500)
def add_log(message: str):
"""Add a message to the log buffer"""
import datetime
timestamp = datetime.datetime.now().strftime("%H:%M:%S")
_log_buffer.append(f"[{timestamp}] {message}")
# Override print to also log to buffer
import builtins
_original_print = builtins.print
def _logging_print(*args, **kwargs):
try:
_original_print(*args, **kwargs)
except (BrokenPipeError, OSError):
pass # Ignore broken pipe errors from traceback printing
try:
message = " ".join(str(a) for a in args)
if message.strip():
add_log(message)
except Exception:
pass # Don't let logging errors break the app
builtins.print = _logging_print
@app.get("/api/logs")
async def get_logs(lines: int = 100):
"""Get recent log lines"""
log_lines = list(_log_buffer)[-lines:]
return {"logs": log_lines}
@app.post("/api/server/restart")
async def restart_server():
"""Signal the server to restart (requires run.sh wrapper)"""
restart_flag = Path("/tmp/ai-radio-show.restart")
restart_flag.touch()
add_log("Restart signal sent - server will restart shortly")
return {"status": "restarting"}
@app.post("/api/server/stop")
async def stop_server():
"""Signal the server to stop (requires run.sh wrapper)"""
stop_flag = Path("/tmp/ai-radio-show.stop")
stop_flag.touch()
add_log("Stop signal sent - server will stop shortly")
return {"status": "stopping"}
@app.get("/api/server/status")
async def server_status():
"""Get server status info"""
return {
"status": "running",
"tts_provider": settings.tts_provider,
"llm_provider": llm_service.provider,
"session_id": session.id
}