Files
ai-podcast/backend/main.py
tcpsyn 73129374f4 Bake news context into caller backgrounds at pickup time
Instead of injecting research into every LLM call (which bloated prompts
and caused timeouts), do one quick SearXNG search when a caller is picked
up and add a relevant headline to their background. 3s timeout — if search
is slow, caller just doesn't reference news. Zero impact on live conversation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-06 21:47:46 -07:00

1983 lines
78 KiB
Python

"""AI Radio Show - Control Panel Backend"""
import uuid
import asyncio
import base64
import threading
import traceback
from dataclasses import dataclass, field
from pathlib import Path
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect, Request, Response
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
import json
import time
import httpx
import numpy as np
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional
from .config import settings
from .services.caller_service import CallerService
from .services.transcription import transcribe_audio
from .services.llm import llm_service
from .services.tts import generate_speech
from .services.audio import audio_service
from .services.news import news_service, extract_keywords, STOP_WORDS
app = FastAPI(title="AI Radio Show")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# --- Callers ---
# Base caller info (name, voice) - backgrounds generated dynamically per session
import random
MALE_NAMES = [
"Tony", "Rick", "Dennis", "Earl", "Marcus", "Keith", "Darnell", "Wayne",
"Greg", "Andre", "Ray", "Jerome", "Hector", "Travis", "Vince", "Leon",
"Dale", "Frank", "Terrence", "Bobby", "Cliff", "Nate", "Reggie", "Carl",
]
FEMALE_NAMES = [
"Jasmine", "Megan", "Tanya", "Carla", "Brenda", "Sheila", "Denise", "Tamika",
"Lorraine", "Crystal", "Angie", "Renee", "Monique", "Gina", "Patrice", "Deb",
"Shonda", "Marlene", "Yolanda", "Stacy", "Jackie", "Carmen", "Rita", "Val",
]
# Voice pools — ElevenLabs IDs mapped to Inworld voices in tts.py
MALE_VOICES = [
"VR6AewLTigWG4xSOukaG", # Edward
"TxGEqnHWrfWFTfGW9XjX", # Shaun
"pNInz6obpgDQGcFmaJgB", # Alex
"ODq5zmih8GrVes37Dizd", # Craig
"IKne3meq5aSn9XLyUdCD", # Timothy
]
FEMALE_VOICES = [
"jBpfuIE2acCO8z3wKNLl", # Hana
"EXAVITQu4vr4xnSDxMaL", # Ashley
"21m00Tcm4TlvDq8ikWAM", # Wendy
"XB0fDUnXU5powFXDhCwa", # Sarah
"pFZP5JQG7iQjIQuC4Bku", # Deborah
]
CALLER_BASES = {
"1": {"gender": "male", "age_range": (28, 62)},
"2": {"gender": "female", "age_range": (22, 55)},
"3": {"gender": "male", "age_range": (30, 65)},
"4": {"gender": "female", "age_range": (21, 45)},
"5": {"gender": "male", "age_range": (25, 58)},
"6": {"gender": "female", "age_range": (28, 52)},
"7": {"gender": "male", "age_range": (40, 72)},
"8": {"gender": "female", "age_range": (30, 60)},
"9": {"gender": "male", "age_range": (21, 38)},
"0": {"gender": "female", "age_range": (35, 65)},
}
def _randomize_callers():
"""Assign random names and voices to callers, unique per gender."""
num_m = sum(1 for c in CALLER_BASES.values() if c["gender"] == "male")
num_f = sum(1 for c in CALLER_BASES.values() if c["gender"] == "female")
males = random.sample(MALE_NAMES, num_m)
females = random.sample(FEMALE_NAMES, num_f)
m_voices = random.sample(MALE_VOICES, num_m)
f_voices = random.sample(FEMALE_VOICES, num_f)
mi, fi = 0, 0
for base in CALLER_BASES.values():
if base["gender"] == "male":
base["name"] = males[mi]
base["voice"] = m_voices[mi]
mi += 1
else:
base["name"] = females[fi]
base["voice"] = f_voices[fi]
fi += 1
_randomize_callers() # Initial assignment
# Background components for dynamic generation
JOBS_MALE = [
# Trades & blue collar
"runs a small HVAC business", "works as a long-haul trucker", "works construction",
"is a union electrician", "owns a small landscaping company", "drives for UPS",
"is a welder at a shipyard", "works as a diesel mechanic", "does roofing",
"is a plumber, runs his own crew", "works at a grain elevator", "is a ranch hand",
# Service & public
"is a paramedic", "is a cop, 12 years on the force", "is a firefighter",
"works as a hospital security guard", "is a corrections officer", "drives a city bus",
# Food & hospitality
"is a line cook at a decent restaurant", "runs a food truck", "manages a bar",
"works the night shift at a gas station", "delivers pizza, has for years",
# White collar & tech
"works IT for the city", "is an insurance adjuster, hates it", "is a bank teller",
"does accounting for a small firm", "sells cars at a dealership", "works in a call center",
"is a project manager at a mid-size company", "works in logistics",
# Creative & education
"is a high school football coach", "teaches middle school history",
"is a freelance photographer", "is a session musician", "is a tattoo artist",
"works at a brewery", "is a youth pastor", "does standup comedy on the side",
# Odd & specific
"works at a pawn shop", "is a repo man", "runs a junkyard", "is a locksmith",
"works overnight stocking shelves", "is a pest control guy", "drives a tow truck",
"is a bouncer at a club", "works at a cemetery", "is a crop duster pilot",
"manages a storage facility", "is a hunting guide", "works on an oil rig, two weeks on two off",
]
JOBS_FEMALE = [
# Healthcare
"works as an ER nurse", "is a dental hygienist", "is a vet tech",
"works in hospital billing", "is a home health aide", "is a phlebotomist",
"works as a traveling nurse", "is a midwife",
# Service & public
"works as a 911 dispatcher", "is a social worker", "works retail management",
"works as a bartender at a dive bar", "is a flight attendant",
"manages a restaurant", "works the front desk at a hotel",
# Education & office
"teaches kindergarten", "is a paralegal", "is an accountant at a small firm",
"works in HR", "is a court reporter", "does data entry from home",
"is a school bus driver", "works at the DMV",
# Creative & entrepreneurial
"is a hairstylist, owns her chair", "runs a small bakery",
"runs a daycare out of her home", "is a real estate agent",
"is a wedding planner", "does nails, has a loyal clientele",
"sells stuff on Etsy full time", "is a dog groomer",
# Odd & specific
"works at a truck stop diner", "is a bail bonds agent",
"works at a tribal casino", "manages a laundromat",
"works overnight at a group home", "is a park ranger",
"drives an ambulance", "works at a thrift store",
"is a taxidermist", "cleans houses, runs her own business",
"works at a gun range", "is a long-haul trucker",
"works the night shift at Waffle House", "is a funeral home director",
]
PROBLEMS = [
# Family drama
"hasn't talked to their father in years and just got a call that he's dying",
"found out they were adopted and doesn't know how to process it",
"is being pressured to take care of an aging parent who was never there for them",
"just discovered a family secret that changes everything they thought they knew",
"has a sibling who's destroying themselves and nobody will intervene",
"is estranged from their kids and it's killing them",
"found out their parent had a whole other family nobody knew about",
"is watching their parents' marriage fall apart after 40 years",
"their kid just got arrested and they don't know what to do",
"found out their teenager has been lying about where they go at night",
"their in-laws are trying to take over their life and their spouse won't say anything",
# Career and purpose
"woke up and realized they've been in the wrong career for 15 years",
"got passed over for a promotion they deserved and is questioning everything",
"has a dream they gave up on years ago and it's haunting them",
"is successful on paper but feels completely empty inside",
"hates their job but can't afford to leave and it's breaking them",
"just got fired and doesn't know who they are without their work",
"is being asked to do something unethical at work and doesn't know what to do",
"watches their boss take credit for everything and is losing their mind",
"started a business and it's failing and they've sunk everything into it",
"got a job offer across the country and their family doesn't want to move",
"is about to get laid off and hasn't told their spouse",
"found out a coworker making half the effort makes twice the money",
# Money and survival
"is drowning in debt and can't see a way out",
"just found out their spouse has been hiding massive credit card debt",
"lost their savings in a bad investment and is too ashamed to tell anyone",
"can't make rent and is about to be evicted",
"lent a family member a ton of money and they won't pay it back",
"is working three jobs and still barely making it",
"inherited money and it's tearing the family apart",
"their car broke down and they can't afford to fix it and need it for work",
# Health scares
"just got a diagnosis they weren't expecting and is processing it alone",
"has been ignoring symptoms because they're scared of what the doctor will say",
"someone they love just got diagnosed with something serious",
"had a health scare and it's making them rethink everything",
"is dealing with chronic pain and nobody seems to believe them",
"just found out they can't have kids",
# Mental health and inner struggles
"has been putting on a brave face but is barely holding it together",
"can't shake the feeling that their best years are behind them",
"keeps self-sabotaging every good thing in their life and doesn't know why",
"has been numb for months and is starting to scare themselves",
"feels like a fraud and is waiting to be found out",
"is exhausted from being the strong one for everyone else",
"has been having panic attacks and doesn't know what's triggering them",
"can't stop doom scrolling and it's making them miserable",
"hasn't left the house in weeks and is starting to wonder if something's wrong",
# Grief and loss
"lost someone close and hasn't really dealt with it",
"is grieving someone who's still alive but is no longer the person they knew",
"never got closure with someone who died and it's eating at them",
"is watching their best friend slowly die and doesn't know how to be there",
"their dog died and they're more wrecked than they thought they'd be",
"lost their house in a fire and is still processing it",
# Regrets and past mistakes
"made a choice years ago that changed everything and wonders what if",
"hurt someone badly and never apologized, and it haunts them",
"let the one that got away go and thinks about them constantly",
"gave up on something important to make someone else happy and resents it",
"was a bully growing up and is finally reckoning with it",
"got a DUI and it's ruining their life",
"ghosted someone who really cared about them and feels terrible about it",
# Relationships
"is falling out of love with their spouse and doesn't know what to do",
"married the wrong person and everyone knows it but them",
"feels invisible in their own relationship",
"is staying for the kids but dying inside",
"realized they don't actually like their partner as a person",
"found out their partner has been lying about something big",
"just found out their partner has a dating profile",
"is in love with two people and has to choose",
"their ex keeps showing up and they don't hate it",
"moved in with someone too fast and now they're trapped",
# Friendship and loneliness
"realized they don't have any real friends, just people who need things from them",
"had a falling out with their best friend and the silence is deafening",
"is surrounded by people but has never felt more alone",
"suspects a close friend is talking shit behind their back",
"all their friends are getting married and having kids and they feel left behind",
"their best friend started dating their ex and acts like it's no big deal",
# Neighbor and community drama
"is in a feud with their neighbor that's gotten way out of hand",
"found out something sketchy is going on next door and doesn't know if they should say something",
"got into it with someone at their kid's school and now it's a whole thing",
"someone at church said something that made them question their entire faith",
# Big life decisions
"is thinking about leaving everything behind and starting over somewhere new",
"has to make a choice that will hurt someone no matter what",
"has been offered an opportunity that would change everything but they're terrified",
"knows they need to end something but can't pull the trigger",
"is thinking about joining the military and their family is losing it",
"wants to go back to school but feels like it's too late",
# Addiction and bad habits
"is hiding how much they drink from everyone",
"can't stop gambling and is in deeper than anyone knows",
"is watching themselves become someone they don't recognize",
"just got out of rehab and doesn't know how to face everyone",
"found pills in their kid's room and doesn't know how to bring it up",
# Legal trouble
"is in the middle of a lawsuit and it's consuming their life",
"got caught doing something stupid and now there are consequences",
"is dealing with a custody battle that's destroying them",
"has a warrant they've been ignoring and it's getting worse",
# Attraction and affairs
"is attracted to someone they shouldn't be and it's getting harder to ignore",
"has been seeing {affair_person} on the side",
"caught feelings for someone at work and it's fucking everything up",
# Sexual/desire
"can't stop thinking about {fantasy_subject}",
"discovered something about their own desires that surprised them",
"is questioning their sexuality after something that happened recently",
# General late-night confessions
"can't sleep and has been thinking too much about their life choices",
"had a weird day and needs to process it with someone",
"has been keeping a secret that's eating them alive",
"finally ready to admit something they've never said out loud",
"saw something today that brought up a memory they'd buried",
"just realized they've become exactly like the parent they swore they'd never be",
]
PROBLEM_FILLS = {
"time": ["a few weeks", "months", "six months", "a year", "way too long"],
# Affairs (all adults)
"affair_person": ["their partner's best friend", "a coworker", "their ex", "a neighbor", "their boss", "their trainer", "someone they met online", "an old flame"],
# Fantasies and kinks (consensual adult stuff)
"fantasy_subject": ["a threesome", "being dominated", "dominating someone", "their partner with someone else", "a specific coworker", "group sex", "rough sex", "being watched", "exhibitionism"],
"kink": ["anal", "BDSM", "roleplay", "a threesome", "toys", "being tied up", "public sex", "swinging", "filming themselves", "bondage"],
# Secret behaviors (legal adult stuff)
"secret_behavior": ["hooking up with strangers", "sexting people online", "using dating apps behind their partner's back", "having an affair", "going to sex clubs", "watching way too much porn"],
"double_life": ["vanilla at home, freak elsewhere", "straight to their family, not so much in private", "married but on dating apps", "in a relationship but seeing other people"],
"hookup_person": ["their roommate", "a coworker", "their ex", "a friend's spouse", "a stranger from an app", "multiple people", "someone from the gym"],
# Discovery and identity (adult experiences)
"new_discovery": ["the same sex", "being submissive", "being dominant", "kink", "casual sex", "exhibitionism", "that they're bi"],
"unexpected_person": ["the same sex for the first time", "more than one person", "a complete stranger", "someone they never expected to be attracted to", "a friend"],
"sexuality_trigger": ["a specific hookup", "watching certain porn", "a drunk encounter", "realizing they're attracted to a friend", "an unexpected experience"],
"first_time": ["anal", "a threesome", "same-sex stuff", "BDSM", "an open relationship", "casual hookups", "being dominant", "being submissive"],
# Relationship issues
"partner_wants": ["an open relationship", "to bring someone else in", "things they're not sure about", "to watch them with someone else", "to try new things"],
"caught_doing": ["sexting someone", "on a dating app", "watching porn they'd never admit to", "flirting with someone else", "looking at someone's pics"],
# Attractions (appropriate adult scenarios)
"taboo_fantasy": ["someone they work with", "a friend's partner", "a specific scenario", "something they've never said out loud"],
"taboo_attraction": ["someone they work with", "a friend's partner", "their partner's friend", "someone they see all the time"],
}
INTERESTS = [
# Entertainment & media
"really into true crime podcasts", "watches a lot of reality TV", "big movie person",
"reads a lot", "into music, has opinions", "obsessed with a specific TV show right now",
"listens to comedy podcasts", "watches too much YouTube", "into anime, not ashamed",
"horror movie junkie", "knows way too much about celebrity gossip",
"has a podcast recommendation for everything", "rewatches the same comfort shows",
# Active & outdoors
"into fitness", "outdoorsy type", "runs marathons, won't shut up about it",
"just got into rock climbing", "hikes every weekend", "into camping and survival stuff",
"plays pickup basketball", "does yoga, takes it seriously", "trains martial arts",
"mountain bikes", "surfs when they can", "into fishing, finds it meditative",
# Social & lifestyle
"goes out a lot, active social life", "homebody, prefers staying in",
"into cooking and food", "follows sports", "works a lot, career focused",
"big into board games and game nights", "karaoke regular", "goes to live music a lot",
"volunteers at an animal shelter", "coaches youth sports", "into vintage cars",
"collects something weird", "goes to thrift stores religiously",
# Hobbies & creative
"plays guitar badly but loves it", "paints as a stress outlet", "writes poetry nobody reads",
"into woodworking", "builds stuff in their garage", "does pottery on weekends",
"into photography", "makes their own hot sauce", "brews beer at home",
"into gardening, talks to plants", "does stand-up at open mics",
"restores old furniture", "into model trains, doesn't care if it's nerdy",
# Tech & intellectual
"gamer", "very online, knows all the discourse", "into history, has random facts",
"reads about psychology and why people do what they do", "into true crime investigation stuff",
"follows space news", "into conspiracy theories but knows they're mostly bullshit",
"reads philosophy for fun", "into personal finance, tracks every dollar",
"amateur astronomer", "into maps and geography for no reason",
# Self & relationships
"listens to relationship podcasts", "has done therapy, believes in it",
"into self-improvement stuff", "follows dating advice content",
"into meditation, it actually helps", "journals every day",
"into astrology, half-believes it", "reads tarot for friends",
# Faith & spirituality
"goes to church, complicated feelings about it", "spiritual but not religious",
"into eastern philosophy", "grew up religious, still sorting it out",
# Sexually open (not the focus, but present)
"sex-positive, doesn't judge", "has experimented, open about it",
"comfortable with their body", "has stories if you ask",
]
QUIRKS = [
# Conversational style
"says 'honestly' and 'I mean' a lot", "trails off when thinking, then picks back up",
"laughs nervously when things get real", "very direct, doesn't sugarcoat",
"rambles a bit when nervous", "gets quiet when the topic hits close to home",
"deflects with humor when uncomfortable", "asks the host questions back",
"talks fast when excited", "pauses a lot, choosing words carefully",
"uses metaphors for everything", "tells stories instead of answering directly",
"interrupts themselves mid-thought", "whispers when saying something they shouldn't",
"repeats the last thing the host said while thinking", "says 'right?' after everything seeking validation",
# Energy & vibe
"high energy, talks with their hands even on the phone", "calm and measured, almost too calm",
"sounds tired but won't admit it", "clearly been crying but trying to hold it together",
"manic energy tonight, everything is hilarious or devastating", "stoned and philosophical",
"just got off a long shift and is running on fumes", "had a few drinks, more honest than usual",
"wired on coffee at 2am", "weirdly cheerful for someone with this problem",
# Personality
"self-aware about their own bullshit", "confessional, needed to tell someone",
"a little drunk and honest because of it", "can't believe they're saying this out loud",
"overshares and then apologizes for oversharing", "keeps circling back to the real issue",
"trying to convince themselves more than the host", "already knows what they should do, just needs to hear it",
"clearly rehearsed what to say but it's falling apart", "gets defensive when the host gets too close to the truth",
"laughs at their own pain as a coping mechanism", "proud but asking for help anyway",
"suspicious of advice but called anyway", "wants permission to do the thing they already decided to do",
# Openness about sex
"comfortable talking about sex when it comes up", "no shame about their desires",
"gets more explicit as they get comfortable", "treats sex like a normal topic",
"will share details if you ask", "surprisingly open once they start talking",
"has stories they've never told anyone", "testing how the host reacts before going deeper",
]
LOCATIONS_LOCAL = [
# Bootheel & immediate area (most common)
"in Lordsburg", "in Animas", "in Portal", "in Playas", "in Road Forks",
"in Deming", "in Silver City", "in San Simon", "in Safford",
"outside Lordsburg", "near Animas", "just outside Deming", "up in Silver City",
"out by Playas", "down near Portal", "off the highway near Road Forks",
"between Lordsburg and Deming", "south of Silver City", "out past San Simon",
"near the Peloncillo Mountains", "out on the flats near Animas",
# Wider NM
"in Las Cruces", "in Truth or Consequences", "in Socorro", "in Alamogordo",
"in Hatch", "in Columbus", "near the Gila", "in Reserve", "in Cliff",
"in Bayard", "in Hillsboro", "in Magdalena",
# Wider AZ
"in Tucson", "in Willcox", "in Douglas", "in Bisbee", "in Sierra Vista",
"in Benson", "in Globe", "in Clifton", "in Duncan", "in Tombstone",
"in Nogales", "in Green Valley", "outside Tucson",
]
LOCATIONS_OUT_OF_STATE = [
"in El Paso", "in Phoenix", "in Albuquerque", "in Denver",
"outside Dallas", "in Austin", "in the Bay Area", "in Chicago",
"in Nashville", "in Atlanta", "near Portland", "in Detroit",
"in Vegas", "in Salt Lake", "in Oklahoma City",
]
def pick_location() -> str:
if random.random() < 0.8:
return random.choice(LOCATIONS_LOCAL)
return random.choice(LOCATIONS_OUT_OF_STATE)
def generate_caller_background(base: dict) -> str:
"""Generate a unique background for a caller (sync, no research)"""
age = random.randint(*base["age_range"])
jobs = JOBS_MALE if base["gender"] == "male" else JOBS_FEMALE
job = random.choice(jobs)
location = pick_location()
# Generate problem with fills
problem_template = random.choice(PROBLEMS)
problem = problem_template
for key, options in PROBLEM_FILLS.items():
if "{" + key + "}" in problem:
problem = problem.replace("{" + key + "}", random.choice(options))
interest1, interest2 = random.sample(INTERESTS, 2)
quirk1, quirk2 = random.sample(QUIRKS, 2)
return f"""{age}, {job} {location}. {problem.capitalize()}. {interest1.capitalize()}, {interest2}. {quirk1.capitalize()}, {quirk2}."""
async def enrich_caller_background(background: str) -> str:
"""Search for a relevant news headline and bake it into the caller's background.
Called once at pickup time — never during live conversation."""
try:
# Extract a search query from the caller's problem
# e.g. "is thinking about quitting their job" -> search for job/career news
words = background.split(".")
# The problem is the second sentence
problem_text = words[1].strip() if len(words) > 1 else ""
if not problem_text:
return background
# Pull 2-3 meaningful words from the problem for a search
search_words = [w.lower() for w in problem_text.split()
if len(w) > 4 and w.lower() not in STOP_WORDS][:3]
if not search_words:
return background
query = " ".join(search_words)
async with asyncio.timeout(3):
results = await news_service.search_topic(query)
if results:
headline = results[0].title
background += f" Recently saw a headline that said '{headline}' and it's been on their mind."
print(f"[Research] Enriched caller with: {headline[:60]}...")
except TimeoutError:
pass # No enrichment, no problem
except Exception as e:
print(f"[Research] Enrichment failed: {e}")
return background
def get_caller_prompt(caller: dict, conversation_summary: str = "", show_history: str = "",
news_context: str = "", research_context: str = "") -> str:
"""Generate a natural system prompt for a caller"""
context = ""
if conversation_summary:
context = f"""
CONVERSATION SO FAR:
{conversation_summary}
Continue naturally. Don't repeat yourself.
"""
history = ""
if show_history:
history = f"\n{show_history}\n"
world_context = ""
if news_context or research_context:
parts = ["WHAT YOU'VE BEEN READING ABOUT LATELY:"]
if news_context:
parts.append(f"Headlines you noticed today:\n{news_context}")
if research_context:
parts.append(f"Stuff related to what you're talking about:\n{research_context}")
parts.append("Work these in IF they're relevant to what you're discussing. Don't force news into the conversation. You're a person who reads the news, not a news anchor.")
world_context = "\n".join(parts) + "\n"
return f"""You're {caller['name']}, calling a late-night radio show. You trust this host.
{caller['vibe']}
{history}{context}{world_context}
HOW TO TALK:
- Sound like a real person chatting, not writing. You've got a life, opinions, a day that happened.
- Be brief. Say what you need to say and stop. Think quick back-and-forth, not speeches.
- ALWAYS complete your thought. Never trail off or leave a sentence unfinished.
- Swear naturally if it fits: fuck, shit, damn, etc.
- Have opinions. Real people have takes — some good, some bad, some half-baked. Don't be neutral on everything.
- Reference your actual life — your job, where you live, people you know. Ground what you say in specifics.
- If something the host says connects to your experience, share that. If it doesn't, say so.
- It's late at night. You called for a reason. Let that weight come through naturally.
- You can disagree with the host. You can push back. You can change your mind mid-thought.
- Don't perform vulnerability — if you're guarded, be guarded. If you're open, be open. Match your vibe.
SPELLING FOR TEXT-TO-SPEECH (use proper spelling so TTS pronounces correctly):
- Write "you know" not "yanno" or "y'know"
- Write "kind of" not "kinda"
- Write "sort of" not "sorta"
- Write "going to" not "gonna"
- Write "want to" not "wanna"
- Write "I don't know" not "dunno"
- Use full words, not phonetic slang spellings
GOOD examples:
- "I mean, it started innocent enough, but now I can't stop thinking about her."
- "Honestly I don't even know why I'm calling. I just needed to tell someone."
- "Look, my marriage is fine on paper, but something's missing, you know?"
- "I saw that thing about [news topic] and it just made me think about my own situation."
- "My buddy at work was telling me about this, and I was like, that's exactly what I'm going through."
BAD examples (too formal/literary):
- "I have this confession about my obsession"
- "It has been consuming all of my thoughts"
- "According to recent reports..." (don't be a newscaster)
ABOUT SEX:
- You don't have to jump straight to explicit stuff
- But if the host asks or the conversation goes there, you're completely open about it
- When talking about sex, be direct and explicit - say exactly what happened, what you want, what turns you on
- No euphemisms when being explicit - use real words: fuck, cock, pussy, etc.
HARD LIMITS - NEVER mention or reference:
- Children, minors, or anyone under 18 in any sexual context
- Violence, harm, or non-consensual acts
- Illegal activities beyond normal adult behavior
- All sexual content must be between consenting adults only
OUTPUT: Spoken words only. No (actions), no *gestures*, no stage directions."""
# --- Session State ---
@dataclass
class CallRecord:
caller_type: str # "ai" or "real"
caller_name: str # "Tony" or "Caller #3"
summary: str # LLM-generated summary after hangup
transcript: list[dict] = field(default_factory=list)
class Session:
def __init__(self):
self.id = str(uuid.uuid4())[:8]
self.current_caller_key: str = None
self.conversation: list[dict] = []
self.caller_backgrounds: dict[str, str] = {} # Generated backgrounds for this session
self.call_history: list[CallRecord] = []
self.active_real_caller: dict | None = None
self.ai_respond_mode: str = "manual" # "manual" or "auto"
self.auto_followup: bool = False
self.news_headlines: list = []
self.research_notes: dict[str, list] = {}
self._research_task: asyncio.Task | None = None
def start_call(self, caller_key: str):
self.current_caller_key = caller_key
self.conversation = []
def end_call(self):
self.current_caller_key = None
self.conversation = []
def add_message(self, role: str, content: str):
self.conversation.append({"role": role, "content": content})
def get_caller_background(self, caller_key: str) -> str:
"""Get or generate background for a caller in this session"""
if caller_key not in self.caller_backgrounds:
base = CALLER_BASES.get(caller_key)
if base:
self.caller_backgrounds[caller_key] = generate_caller_background(base)
print(f"[Session {self.id}] Generated background for {base['name']}: {self.caller_backgrounds[caller_key][:100]}...")
return self.caller_backgrounds.get(caller_key, "")
def get_show_history(self) -> str:
"""Get formatted show history for AI caller prompts"""
if not self.call_history:
return ""
lines = ["EARLIER IN THE SHOW:"]
for record in self.call_history:
caller_type_label = "(real caller)" if record.caller_type == "real" else "(AI)"
lines.append(f"- {record.caller_name} {caller_type_label}: {record.summary}")
lines.append("You can reference these if it feels natural. Don't force it.")
return "\n".join(lines)
def get_conversation_summary(self) -> str:
"""Get a brief summary of conversation so far for context"""
if len(self.conversation) <= 2:
return ""
summary_parts = []
for msg in self.conversation[-6:]:
role = msg["role"]
if role == "user" or role == "host":
label = "Host"
elif role.startswith("real_caller:"):
label = role.split(":", 1)[1]
elif role.startswith("ai_caller:"):
label = role.split(":", 1)[1]
elif role == "assistant":
label = self.caller["name"] if self.caller else "Caller"
else:
label = role
content = msg["content"]
summary_parts.append(
f'{label}: "{content[:100]}..."' if len(content) > 100
else f'{label}: "{content}"'
)
return "\n".join(summary_parts)
@property
def caller(self) -> dict:
if self.current_caller_key:
base = CALLER_BASES.get(self.current_caller_key)
if base:
return {
"name": base["name"],
"voice": base["voice"],
"vibe": self.get_caller_background(self.current_caller_key)
}
return None
def reset(self):
"""Reset session - clears all caller backgrounds for fresh personalities"""
self.caller_backgrounds = {}
self.current_caller_key = None
self.conversation = []
self.call_history = []
self.active_real_caller = None
self.ai_respond_mode = "manual"
self.auto_followup = False
self.news_headlines = []
self.research_notes = {}
if self._research_task and not self._research_task.done():
self._research_task.cancel()
self._research_task = None
_randomize_callers()
self.id = str(uuid.uuid4())[:8]
names = [CALLER_BASES[k]["name"] for k in sorted(CALLER_BASES.keys())]
print(f"[Session] Reset - new session ID: {self.id}, callers: {', '.join(names)}")
session = Session()
caller_service = CallerService()
_ai_response_lock = asyncio.Lock() # Prevents concurrent AI responses
_session_epoch = 0 # Increments on hangup/call start — stale tasks check this
_show_on_air = False # Controls whether phone calls are accepted or get off-air message
# --- News & Research Helpers ---
async def _fetch_session_headlines():
try:
session.news_headlines = await news_service.get_headlines()
print(f"[News] Loaded {len(session.news_headlines)} headlines for session")
except Exception as e:
print(f"[News] Failed to load headlines: {e}")
async def _background_research(text: str):
keywords = extract_keywords(text)
if not keywords:
return
query = " ".join(keywords)
if query.lower() in session.research_notes:
return
try:
async with asyncio.timeout(8):
results = await news_service.search_topic(query)
if results:
session.research_notes[query.lower()] = results
print(f"[Research] Found {len(results)} results for '{query}'")
except TimeoutError:
print(f"[Research] Timed out for '{query}'")
except Exception as e:
print(f"[Research] Error: {e}")
def _build_news_context() -> tuple[str, str]:
"""Build context from cached news/research only — never does network calls."""
news_context = ""
if session.news_headlines:
news_context = news_service.format_headlines_for_prompt(session.news_headlines[:6])
research_context = ""
if session.research_notes:
all_items = []
for items in session.research_notes.values():
all_items.extend(items)
seen = set()
unique = []
for item in all_items:
if item.title not in seen:
seen.add(item.title)
unique.append(item)
research_context = news_service.format_headlines_for_prompt(unique[:8])
return news_context, research_context
# --- Lifecycle ---
@app.on_event("shutdown")
async def shutdown():
"""Clean up resources on server shutdown"""
global _host_audio_task
print("[Server] Shutting down — cleaning up resources...")
# Stop host mic streaming
audio_service.stop_host_stream()
# Cancel host audio sender task
if _host_audio_task and not _host_audio_task.done():
_host_audio_task.cancel()
try:
await _host_audio_task
except (asyncio.CancelledError, Exception):
pass
_host_audio_task = None
# Disconnect all active callers
for caller_id in list(caller_service.active_calls.keys()):
caller_service.hangup(caller_id)
caller_service.reset()
await news_service.close()
print("[Server] Cleanup complete")
# --- Static Files ---
frontend_dir = Path(__file__).parent.parent / "frontend"
app.mount("/css", StaticFiles(directory=frontend_dir / "css"), name="css")
app.mount("/js", StaticFiles(directory=frontend_dir / "js"), name="js")
@app.get("/")
async def index():
return FileResponse(frontend_dir / "index.html")
# --- On-Air Toggle ---
@app.post("/api/on-air")
async def set_on_air(state: dict):
"""Toggle whether the show is on air (accepting phone calls)"""
global _show_on_air
_show_on_air = bool(state.get("on_air", False))
print(f"[Show] On-air: {_show_on_air}")
return {"on_air": _show_on_air}
@app.get("/api/on-air")
async def get_on_air():
return {"on_air": _show_on_air}
# --- SignalWire Endpoints ---
@app.post("/api/signalwire/voice")
async def signalwire_voice_webhook(request: Request):
"""Handle inbound call from SignalWire — return XML to start bidirectional stream"""
form = await request.form()
caller_phone = form.get("From", "Unknown")
call_sid = form.get("CallSid", "")
print(f"[SignalWire] Inbound call from {caller_phone} (CallSid: {call_sid})")
if not _show_on_air:
print(f"[SignalWire] Show is off air — playing off-air message for {caller_phone}")
xml = """<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Say voice="woman">Luke at the Roost is off the air right now. Please call back during the show for your chance to talk to Luke. Thanks for calling!</Say>
<Hangup/>
</Response>"""
return Response(content=xml, media_type="application/xml")
# Use dedicated stream URL (ngrok) if configured, otherwise derive from request
if settings.signalwire_stream_url:
stream_url = settings.signalwire_stream_url
else:
host = request.headers.get("host", "radioshow.macneilmediagroup.com")
stream_url = f"wss://{host}/api/signalwire/stream"
xml = f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Say voice="woman">You've reached Luke at the Roost. Hold tight, we'll get you on the air.</Say>
<Connect>
<Stream url="{stream_url}" codec="L16@16000h">
<Parameter name="caller_phone" value="{caller_phone}"/>
<Parameter name="call_sid" value="{call_sid}"/>
</Stream>
</Connect>
</Response>"""
return Response(content=xml, media_type="application/xml")
async def _signalwire_end_call(call_sid: str):
"""End a phone call via SignalWire REST API"""
if not call_sid or not settings.signalwire_space:
return
try:
url = f"https://{settings.signalwire_space}/api/laml/2010-04-01/Accounts/{settings.signalwire_project_id}/Calls/{call_sid}"
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.post(
url,
data={"Status": "completed"},
auth=(settings.signalwire_project_id, settings.signalwire_token),
)
print(f"[SignalWire] End call {call_sid}: {response.status_code}")
except Exception as e:
print(f"[SignalWire] Failed to end call {call_sid}: {e}")
# --- Request Models ---
class ChatRequest(BaseModel):
text: str
class TTSRequest(BaseModel):
text: str
voice_id: str
phone_filter: bool = True
class AudioDeviceSettings(BaseModel):
input_device: Optional[int] = None
input_channel: Optional[int] = None
output_device: Optional[int] = None
caller_channel: Optional[int] = None
live_caller_channel: Optional[int] = None
music_channel: Optional[int] = None
sfx_channel: Optional[int] = None
phone_filter: Optional[bool] = None
class MusicRequest(BaseModel):
track: str
action: str # "play", "stop", "volume"
volume: Optional[float] = None
class SFXRequest(BaseModel):
sound: str
# --- Audio Device Endpoints ---
@app.get("/api/audio/devices")
async def list_audio_devices():
"""List all available audio devices"""
return {"devices": audio_service.list_devices()}
@app.get("/api/audio/settings")
async def get_audio_settings():
"""Get current audio device configuration"""
return audio_service.get_device_settings()
@app.post("/api/audio/settings")
async def set_audio_settings(settings: AudioDeviceSettings):
"""Configure audio devices and channels"""
audio_service.set_devices(
input_device=settings.input_device,
input_channel=settings.input_channel,
output_device=settings.output_device,
caller_channel=settings.caller_channel,
live_caller_channel=settings.live_caller_channel,
music_channel=settings.music_channel,
sfx_channel=settings.sfx_channel,
phone_filter=settings.phone_filter
)
return audio_service.get_device_settings()
# --- Recording Endpoints ---
@app.post("/api/record/start")
async def start_recording():
"""Start recording from configured input device"""
if audio_service.input_device is None:
raise HTTPException(400, "No input device configured. Set one in /api/audio/settings")
success = audio_service.start_recording()
if not success:
raise HTTPException(400, "Failed to start recording (already recording?)")
return {"status": "recording"}
@app.post("/api/record/stop")
async def stop_recording():
"""Stop recording and transcribe"""
audio_bytes = audio_service.stop_recording()
if len(audio_bytes) < 100:
return {"text": "", "status": "no_audio"}
# Transcribe the recorded audio (16kHz raw PCM from audio service)
text = await transcribe_audio(audio_bytes, source_sample_rate=16000)
return {"text": text, "status": "transcribed"}
# --- Caller Endpoints ---
@app.get("/api/callers")
async def get_callers():
"""Get list of available callers"""
return {
"callers": [
{"key": k, "name": v["name"]}
for k, v in CALLER_BASES.items()
],
"current": session.current_caller_key,
"session_id": session.id
}
@app.post("/api/session/reset")
async def reset_session():
"""Reset session - all callers get fresh backgrounds"""
session.reset()
_chat_updates.clear()
return {"status": "reset", "session_id": session.id}
@app.post("/api/call/{caller_key}")
async def start_call(caller_key: str):
"""Start a call with a caller"""
global _session_epoch
if caller_key not in CALLER_BASES:
raise HTTPException(404, "Caller not found")
_session_epoch += 1
audio_service.stop_caller_audio()
session.start_call(caller_key)
caller = session.caller # This generates the background if needed
# Enrich with a relevant news headline (3s timeout, won't block the show)
if caller_key in session.caller_backgrounds:
enriched = await enrich_caller_background(session.caller_backgrounds[caller_key])
session.caller_backgrounds[caller_key] = enriched
return {
"status": "connected",
"caller": caller["name"],
"background": caller["vibe"] # Send background so you can see who you're talking to
}
@app.post("/api/hangup")
async def hangup():
"""Hang up current call"""
global _session_epoch, _auto_respond_pending
_session_epoch += 1
# Stop any playing caller audio immediately
audio_service.stop_caller_audio()
# Cancel any pending auto-respond
if _auto_respond_pending and not _auto_respond_pending.done():
_auto_respond_pending.cancel()
_auto_respond_pending = None
_auto_respond_buffer.clear()
if session._research_task and not session._research_task.done():
session._research_task.cancel()
session._research_task = None
caller_name = session.caller["name"] if session.caller else None
session.end_call()
# Play hangup sound in background so response returns immediately
hangup_sound = settings.sounds_dir / "hangup.wav"
if hangup_sound.exists():
threading.Thread(target=audio_service.play_sfx, args=(str(hangup_sound),), daemon=True).start()
return {"status": "disconnected", "caller": caller_name}
# --- Chat & TTS Endpoints ---
import re
def ensure_complete_thought(text: str) -> str:
"""If text was cut off mid-sentence, trim to the last complete sentence."""
text = text.strip()
if not text:
return text
# Already ends with sentence-ending punctuation — good
if text[-1] in '.!?':
return text
# Cut off mid-sentence — find the last complete sentence
for i in range(len(text) - 1, -1, -1):
if text[i] in '.!?':
return text[:i + 1]
# No punctuation at all — just add a period
return text.rstrip(',;:— -') + '.'
def clean_for_tts(text: str) -> str:
"""Strip out non-speakable content and fix phonetic spellings for TTS"""
# Remove content in parentheses: (laughs), (pausing), (looking away), etc.
text = re.sub(r'\s*\([^)]*\)\s*', ' ', text)
# Remove content in asterisks: *laughs*, *sighs*, etc.
text = re.sub(r'\s*\*[^*]*\*\s*', ' ', text)
# Remove content in brackets: [laughs], [pause], etc. (only Bark uses these)
text = re.sub(r'\s*\[[^\]]*\]\s*', ' ', text)
# Remove content in angle brackets: <laughs>, <sigh>, etc.
text = re.sub(r'\s*<[^>]*>\s*', ' ', text)
# Remove "He/She sighs" style stage directions (full phrase)
text = re.sub(r'\b(He|She|I|They)\s+(sighs?|laughs?|pauses?|smiles?|chuckles?|grins?|nods?|shrugs?|frowns?)[^.]*\.\s*', '', text, flags=re.IGNORECASE)
# Remove standalone stage direction words only if they look like directions (with adverbs)
text = re.sub(r'\b(sighs?|laughs?|pauses?|chuckles?)\s+(heavily|softly|deeply|quietly|loudly|nervously|sadly)\b[.,]?\s*', '', text, flags=re.IGNORECASE)
# Remove quotes around the response if LLM wrapped it
text = re.sub(r'^["\']|["\']$', '', text.strip())
# Fix phonetic spellings for proper TTS pronunciation
text = re.sub(r"\by'know\b", "you know", text, flags=re.IGNORECASE)
text = re.sub(r"\byanno\b", "you know", text, flags=re.IGNORECASE)
text = re.sub(r"\byknow\b", "you know", text, flags=re.IGNORECASE)
text = re.sub(r"\bkinda\b", "kind of", text, flags=re.IGNORECASE)
text = re.sub(r"\bsorta\b", "sort of", text, flags=re.IGNORECASE)
text = re.sub(r"\bgonna\b", "going to", text, flags=re.IGNORECASE)
text = re.sub(r"\bwanna\b", "want to", text, flags=re.IGNORECASE)
text = re.sub(r"\bgotta\b", "got to", text, flags=re.IGNORECASE)
text = re.sub(r"\bdunno\b", "don't know", text, flags=re.IGNORECASE)
text = re.sub(r"\blemme\b", "let me", text, flags=re.IGNORECASE)
text = re.sub(r"\bcuz\b", "because", text, flags=re.IGNORECASE)
text = re.sub(r"\b'cause\b", "because", text, flags=re.IGNORECASE)
text = re.sub(r"\blotta\b", "lot of", text, flags=re.IGNORECASE)
text = re.sub(r"\boutta\b", "out of", text, flags=re.IGNORECASE)
text = re.sub(r"\bimma\b", "I'm going to", text, flags=re.IGNORECASE)
text = re.sub(r"\btryna\b", "trying to", text, flags=re.IGNORECASE)
# Clean up extra whitespace
text = re.sub(r'\s+', ' ', text)
# Fix spaces before punctuation
text = re.sub(r'\s+([.,!?])', r'\1', text)
# Remove orphaned punctuation at start
text = re.sub(r'^[.,]\s*', '', text)
return text.strip()
# --- Chat Broadcast (for real-time frontend updates) ---
_chat_updates: list[dict] = []
_CHAT_UPDATES_MAX = 500
def broadcast_chat(sender: str, text: str):
"""Add a chat message to the update queue for frontend polling"""
_chat_updates.append({"type": "chat", "sender": sender, "text": text, "id": len(_chat_updates)})
if len(_chat_updates) > _CHAT_UPDATES_MAX:
del _chat_updates[:_CHAT_UPDATES_MAX // 2]
def broadcast_event(event_type: str, data: dict = None):
"""Add a system event to the update queue for frontend polling"""
entry = {"type": event_type, "id": len(_chat_updates)}
if data:
entry.update(data)
_chat_updates.append(entry)
@app.get("/api/conversation/updates")
async def get_conversation_updates(since: int = 0):
"""Get new chat/event messages since a given index"""
return {"messages": _chat_updates[since:]}
def _normalize_messages_for_llm(messages: list[dict]) -> list[dict]:
"""Convert custom roles (real_caller:X, ai_caller:X) to standard LLM roles"""
normalized = []
for msg in messages:
role = msg["role"]
content = msg["content"]
if role.startswith("real_caller:"):
caller_label = role.split(":", 1)[1]
normalized.append({"role": "user", "content": f"[Real caller {caller_label}]: {content}"})
elif role.startswith("ai_caller:"):
normalized.append({"role": "assistant", "content": content})
elif role == "host":
normalized.append({"role": "user", "content": content})
else:
normalized.append(msg)
return normalized
@app.post("/api/chat")
async def chat(request: ChatRequest):
"""Chat with current caller"""
if not session.caller:
raise HTTPException(400, "No active call")
epoch = _session_epoch
session.add_message("user", request.text)
# session._research_task = asyncio.create_task(_background_research(request.text))
try:
async with asyncio.timeout(20):
async with _ai_response_lock:
if _session_epoch != epoch:
raise HTTPException(409, "Call ended while waiting")
# Stop any playing caller audio so responses don't overlap
audio_service.stop_caller_audio()
# Include conversation summary and show history for context
conversation_summary = session.get_conversation_summary()
show_history = session.get_show_history()
system_prompt = get_caller_prompt(session.caller, conversation_summary, show_history)
messages = _normalize_messages_for_llm(session.conversation[-10:])
response = await llm_service.generate(
messages=messages,
system_prompt=system_prompt
)
except TimeoutError:
caller_name = session.caller["name"] if session.caller else "Caller"
return {"text": "Uh... hold on, I lost my train of thought.", "caller": caller_name, "voice_id": session.caller["voice"] if session.caller else ""}
# Discard if call changed while we were generating
if _session_epoch != epoch:
print(f"[Chat] Discarding stale response (epoch {epoch}{_session_epoch})")
raise HTTPException(409, "Call changed during response")
print(f"[Chat] Raw LLM: {response[:100] if response else '(empty)'}...")
# Clean response for TTS (remove parenthetical actions, asterisks, etc.)
response = clean_for_tts(response)
response = ensure_complete_thought(response)
print(f"[Chat] Cleaned: {response[:100] if response else '(empty)'}...")
# Ensure we have a valid response
if not response or not response.strip():
response = "Uh... sorry, what was that?"
session.add_message("assistant", response)
return {
"text": response,
"caller": session.caller["name"],
"voice_id": session.caller["voice"]
}
@app.post("/api/tts")
async def text_to_speech(request: TTSRequest):
"""Generate and play speech on caller output device (non-blocking)"""
if not request.text or not request.text.strip():
raise HTTPException(400, "Text cannot be empty")
epoch = _session_epoch
audio_bytes = await generate_speech(
request.text,
request.voice_id,
"none"
)
# Don't play if call changed during TTS generation
if _session_epoch != epoch:
return {"status": "discarded", "duration": 0}
# Stop any existing audio before playing new
audio_service.stop_caller_audio()
# Play in background thread - returns immediately, can be interrupted by hangup
thread = threading.Thread(
target=audio_service.play_caller_audio,
args=(audio_bytes, 24000),
daemon=True
)
thread.start()
# Also stream to active real callers so they hear the AI
if session.active_real_caller:
caller_id = session.active_real_caller["caller_id"]
asyncio.create_task(
caller_service.stream_audio_to_caller(caller_id, audio_bytes, 24000)
)
return {"status": "playing", "duration": len(audio_bytes) / 2 / 24000}
@app.post("/api/tts/stop")
async def stop_tts():
"""Stop any playing caller audio"""
audio_service.stop_caller_audio()
return {"status": "stopped"}
# --- Music Endpoints ---
@app.get("/api/music")
async def get_music():
"""Get available music tracks"""
tracks = []
if settings.music_dir.exists():
for ext in ['*.wav', '*.mp3', '*.flac']:
for f in settings.music_dir.glob(ext):
tracks.append({
"name": f.stem,
"file": f.name,
"path": str(f)
})
return {
"tracks": tracks,
"playing": audio_service.is_music_playing()
}
@app.post("/api/music/play")
async def play_music(request: MusicRequest):
"""Load and play a music track"""
track_path = settings.music_dir / request.track
if not track_path.exists():
raise HTTPException(404, "Track not found")
audio_service.load_music(str(track_path))
audio_service.play_music()
return {"status": "playing", "track": request.track}
@app.post("/api/music/stop")
async def stop_music():
"""Stop music playback"""
audio_service.stop_music()
return {"status": "stopped"}
@app.post("/api/music/volume")
async def set_music_volume(request: MusicRequest):
"""Set music volume"""
if request.volume is not None:
audio_service.set_music_volume(request.volume)
return {"status": "ok", "volume": request.volume}
# --- Sound Effects Endpoints ---
@app.get("/api/sounds")
async def get_sounds():
"""Get available sound effects"""
sounds = []
if settings.sounds_dir.exists():
for f in settings.sounds_dir.glob('*.wav'):
sounds.append({
"name": f.stem,
"file": f.name,
"path": str(f)
})
return {"sounds": sounds}
@app.post("/api/sfx/play")
async def play_sfx(request: SFXRequest):
"""Play a sound effect"""
sound_path = settings.sounds_dir / request.sound
if not sound_path.exists():
raise HTTPException(404, "Sound not found")
audio_service.play_sfx(str(sound_path))
return {"status": "playing", "sound": request.sound}
# --- Ads Endpoints ---
@app.get("/api/ads")
async def get_ads():
"""Get available ad tracks"""
ad_list = []
if settings.ads_dir.exists():
for ext in ['*.wav', '*.mp3', '*.flac']:
for f in settings.ads_dir.glob(ext):
ad_list.append({
"name": f.stem,
"file": f.name,
"path": str(f)
})
return {"ads": ad_list}
@app.post("/api/ads/play")
async def play_ad(request: MusicRequest):
"""Play an ad on the music channel"""
ad_path = settings.ads_dir / request.track
if not ad_path.exists():
raise HTTPException(404, "Ad not found")
audio_service.load_music(str(ad_path))
audio_service.play_music()
return {"status": "playing", "track": request.track}
# --- LLM Settings Endpoints ---
@app.get("/api/settings")
async def get_settings():
"""Get LLM settings"""
return await llm_service.get_settings_async()
@app.post("/api/settings")
async def update_settings(data: dict):
"""Update LLM and TTS settings"""
llm_service.update_settings(
provider=data.get("provider"),
openrouter_model=data.get("openrouter_model"),
ollama_model=data.get("ollama_model"),
ollama_host=data.get("ollama_host"),
tts_provider=data.get("tts_provider")
)
return llm_service.get_settings()
@app.websocket("/api/signalwire/stream")
async def signalwire_audio_stream(websocket: WebSocket):
"""Handle SignalWire bidirectional audio stream"""
await websocket.accept()
caller_id = str(uuid.uuid4())[:8]
caller_phone = "Unknown"
call_sid = ""
audio_buffer = bytearray()
CHUNK_DURATION_S = 3
SAMPLE_RATE = 16000
chunk_samples = CHUNK_DURATION_S * SAMPLE_RATE
stream_started = False
try:
while True:
message = await websocket.receive()
if message.get("type") == "websocket.disconnect":
break
raw = message.get("text")
if not raw:
continue
try:
msg = json.loads(raw)
except json.JSONDecodeError:
continue
event = msg.get("event")
if event == "start":
custom = msg.get("start", {}).get("customParameters", {})
caller_phone = custom.get("caller_phone", "Unknown")
call_sid = custom.get("call_sid", "")
stream_sid = msg.get("start", {}).get("streamSid", "")
stream_started = True
print(f"[SignalWire WS] Stream started: {caller_phone} (CallSid: {call_sid}, StreamSid: {stream_sid})")
caller_service.add_to_queue(caller_id, caller_phone)
caller_service.register_websocket(caller_id, websocket)
broadcast_event("caller_queued", {"phone": caller_phone})
broadcast_chat("System", f"{caller_phone} is waiting in the queue")
ring_sound = settings.sounds_dir / "phone_ring.wav"
if ring_sound.exists():
threading.Thread(target=audio_service.play_sfx, args=(str(ring_sound),), daemon=True).start()
if call_sid:
caller_service.register_call_sid(caller_id, call_sid)
if stream_sid:
caller_service.register_stream_sid(caller_id, stream_sid)
elif event == "media" and stream_started:
try:
payload = msg.get("media", {}).get("payload", "")
if not payload:
continue
pcm_data = base64.b64decode(payload)
call_info = caller_service.active_calls.get(caller_id)
if not call_info:
continue
audio_buffer.extend(pcm_data)
audio_service.route_real_caller_audio(pcm_data, SAMPLE_RATE)
if len(audio_buffer) >= chunk_samples * 2:
pcm_chunk = bytes(audio_buffer[:chunk_samples * 2])
audio_buffer = audio_buffer[chunk_samples * 2:]
# Skip transcription if audio is silent
audio_check = np.frombuffer(pcm_chunk, dtype=np.int16).astype(np.float32) / 32768.0
if np.abs(audio_check).max() < 0.01:
continue
asyncio.create_task(
_safe_transcribe(caller_id, pcm_chunk, SAMPLE_RATE)
)
except Exception as e:
print(f"[SignalWire WS] Media frame error (non-fatal): {e}")
continue # Skip bad frame, don't disconnect caller
elif event == "stop":
print(f"[SignalWire WS] Stream stop event received: {caller_phone} (caller_id: {caller_id})")
break
except WebSocketDisconnect:
on_air = caller_id in caller_service.active_calls
tts_active = caller_service.is_streaming_tts(caller_id)
started_at = caller_service.active_calls.get(caller_id, {}).get("started_at")
duration = f"{time.time() - started_at:.0f}s" if started_at else "n/a"
print(f"[SignalWire WS] DROPPED: {caller_id} ({caller_phone}) on_air={on_air} tts_active={tts_active} duration={duration}")
disconnect_reason = "dropped"
except Exception as e:
print(f"[SignalWire WS] Error: {e}")
traceback.print_exc()
disconnect_reason = f"error: {e}"
else:
disconnect_reason = "clean"
finally:
was_on_air = caller_id in caller_service.active_calls
caller_service.unregister_websocket(caller_id)
caller_service.unregister_call_sid(caller_id)
caller_service.unregister_stream_sid(caller_id)
caller_service.remove_from_queue(caller_id)
if was_on_air:
caller_service.hangup(caller_id)
if session.active_real_caller and session.active_real_caller.get("caller_id") == caller_id:
session.active_real_caller = None
if len(caller_service.active_calls) == 0:
audio_service.stop_host_stream()
broadcast_event("caller_disconnected", {"phone": caller_phone, "reason": disconnect_reason})
broadcast_chat("System", f"{caller_phone} disconnected ({disconnect_reason})")
drop_sound = settings.sounds_dir / ("busy.wav" if disconnect_reason == "dropped" else "hangup.wav")
if drop_sound.exists():
threading.Thread(target=audio_service.play_sfx, args=(str(drop_sound),), daemon=True).start()
elif stream_started:
broadcast_chat("System", f"{caller_phone} left the queue")
if audio_buffer and caller_id in caller_service.active_calls:
asyncio.create_task(
_safe_transcribe(caller_id, bytes(audio_buffer), SAMPLE_RATE)
)
async def _safe_transcribe(caller_id: str, pcm_chunk: bytes, sample_rate: int):
"""Wrapper that catches transcription errors so they don't crash anything"""
try:
await _handle_real_caller_transcription(caller_id, pcm_chunk, sample_rate)
except Exception as e:
print(f"[Transcription] Error (non-fatal): {e}")
# --- Host Audio Broadcast ---
_host_audio_queue: asyncio.Queue = None
_host_audio_task: asyncio.Task = None
async def _host_audio_sender():
"""Persistent task that drains audio queue, batches frames, and sends to callers"""
_send_count = [0]
try:
while True:
pcm_bytes = await _host_audio_queue.get()
if caller_service.is_streaming_tts_any():
continue
# Drain all available frames and concatenate
chunks = [pcm_bytes]
while not _host_audio_queue.empty():
try:
extra = _host_audio_queue.get_nowait()
if not caller_service.is_streaming_tts_any():
chunks.append(extra)
except asyncio.QueueEmpty:
break
combined = b''.join(chunks)
t0 = time.time()
for caller_id in list(caller_service.active_calls.keys()):
try:
await caller_service.send_audio_to_caller(caller_id, combined, 16000)
except Exception:
pass
elapsed = time.time() - t0
_send_count[0] += 1
if _send_count[0] % 20 == 0:
qsize = _host_audio_queue.qsize()
audio_ms = len(combined) / 2 / 16000 * 1000
print(f"[HostAudio] send took {elapsed*1000:.0f}ms, {len(chunks)} chunks batched ({audio_ms:.0f}ms audio), queue: {qsize}")
except asyncio.CancelledError:
print("[HostAudio] Sender task cancelled")
except Exception as e:
print(f"[HostAudio] Sender task error: {e}")
def _start_host_audio_sender():
"""Start the persistent host audio sender task"""
global _host_audio_queue, _host_audio_task
if _host_audio_queue is None:
_host_audio_queue = asyncio.Queue(maxsize=50)
if _host_audio_task is None or _host_audio_task.done():
_host_audio_task = asyncio.create_task(_host_audio_sender())
def _host_audio_sync_callback(pcm_bytes: bytes):
"""Sync callback from audio thread — push to queue for async sending"""
if _host_audio_queue is None:
return
try:
_host_audio_queue.put_nowait(pcm_bytes)
except asyncio.QueueFull:
pass # Drop frame rather than block
# --- Queue Endpoints ---
@app.get("/api/queue")
async def get_call_queue():
"""Get list of callers waiting in queue"""
return {"queue": caller_service.get_queue()}
@app.post("/api/queue/take/{caller_id}")
async def take_call_from_queue(caller_id: str):
"""Take a caller off hold and put them on air"""
try:
call_info = caller_service.take_call(caller_id)
except ValueError as e:
raise HTTPException(404, str(e))
session.active_real_caller = {
"caller_id": call_info["caller_id"],
"channel": call_info["channel"],
"phone": call_info["phone"],
}
# Start host mic streaming if this is the first real caller
if len(caller_service.active_calls) == 1:
_start_host_audio_sender()
audio_service.start_host_stream(_host_audio_sync_callback)
return {
"status": "on_air",
"caller": call_info,
}
@app.post("/api/queue/drop/{caller_id}")
async def drop_from_queue(caller_id: str):
"""Drop a caller from the queue"""
call_sid = caller_service.get_call_sid(caller_id)
caller_service.remove_from_queue(caller_id)
if call_sid:
await _signalwire_end_call(call_sid)
return {"status": "dropped"}
_auto_respond_pending: asyncio.Task | None = None
_auto_respond_buffer: list[str] = []
async def _handle_real_caller_transcription(caller_id: str, pcm_data: bytes, sample_rate: int):
"""Transcribe a chunk of real caller audio and add to conversation"""
global _auto_respond_pending
call_info = caller_service.active_calls.get(caller_id)
if not call_info:
return
text = await transcribe_audio(pcm_data, source_sample_rate=sample_rate)
if not text or not text.strip():
return
caller_phone = call_info["phone"]
print(f"[Real Caller] {caller_phone}: {text}")
# Add to conversation and broadcast to frontend
session.add_message(f"real_caller:{caller_phone}", text)
broadcast_chat(f"{caller_phone} (caller)", text)
# If AI auto-respond mode is on and an AI caller is active, debounce auto-respond
if session.ai_respond_mode == "auto" and session.current_caller_key:
_auto_respond_buffer.append(text)
# Cancel any pending auto-respond timer and restart it
if _auto_respond_pending and not _auto_respond_pending.done():
_auto_respond_pending.cancel()
_auto_respond_pending = asyncio.create_task(_debounced_auto_respond(caller_phone))
async def _debounced_auto_respond(caller_phone: str):
"""Wait for caller to stop talking (4s pause), then trigger AI response"""
try:
await asyncio.sleep(4) # Wait 4 seconds of silence
except asyncio.CancelledError:
return # More speech came in, timer restarted
# Gather accumulated text
accumulated = " ".join(_auto_respond_buffer)
_auto_respond_buffer.clear()
if not accumulated.strip():
return
print(f"[Auto-Respond] Caller paused. Accumulated: {accumulated[:100]}...")
await _trigger_ai_auto_respond(accumulated)
async def _trigger_ai_auto_respond(accumulated_text: str):
"""Generate AI caller response to accumulated real caller speech"""
epoch = _session_epoch
if not session.caller:
return
if _ai_response_lock.locked():
return
# Cooldown check
if not hasattr(session, '_last_ai_auto_respond'):
session._last_ai_auto_respond = 0
if time.time() - session._last_ai_auto_respond < 5:
return
ai_name = session.caller["name"]
try:
async with asyncio.timeout(20):
async with _ai_response_lock:
if _session_epoch != epoch:
return # Call changed while waiting for lock
print(f"[Auto-Respond] {ai_name} is jumping in...")
session._last_ai_auto_respond = time.time()
audio_service.stop_caller_audio()
broadcast_event("ai_status", {"text": f"{ai_name} is thinking..."})
conversation_summary = session.get_conversation_summary()
show_history = session.get_show_history()
system_prompt = get_caller_prompt(session.caller, conversation_summary, show_history)
messages = _normalize_messages_for_llm(session.conversation[-10:])
response = await llm_service.generate(
messages=messages,
system_prompt=system_prompt,
)
except TimeoutError:
print(f"[Auto-Respond] Timed out for {ai_name}")
broadcast_event("ai_done")
return
# Discard if call changed during generation
if _session_epoch != epoch:
print(f"[Auto-Respond] Discarding stale response (epoch {epoch}{_session_epoch})")
broadcast_event("ai_done")
return
response = clean_for_tts(response)
response = ensure_complete_thought(response)
if not response or not response.strip():
broadcast_event("ai_done")
return
# Final staleness check before playing audio
if _session_epoch != epoch:
broadcast_event("ai_done")
return
session.add_message(f"ai_caller:{ai_name}", response)
broadcast_chat(ai_name, response)
broadcast_event("ai_status", {"text": f"{ai_name} is speaking..."})
audio_bytes = await generate_speech(response, session.caller["voice"], "none")
# Don't play if call changed during TTS generation
if _session_epoch != epoch:
print(f"[Auto-Respond] Discarding stale TTS (epoch {epoch}{_session_epoch})")
broadcast_event("ai_done")
return
thread = threading.Thread(
target=audio_service.play_caller_audio,
args=(audio_bytes, 24000),
daemon=True,
)
thread.start()
broadcast_event("ai_done")
# session._research_task = asyncio.create_task(_background_research(accumulated_text))
# Also stream to active real caller so they hear the AI
if session.active_real_caller:
caller_id = session.active_real_caller["caller_id"]
asyncio.create_task(
caller_service.stream_audio_to_caller(caller_id, audio_bytes, 24000)
)
@app.post("/api/ai-respond")
async def ai_respond():
"""Trigger AI caller to respond based on current conversation"""
if not session.caller:
raise HTTPException(400, "No active AI caller")
epoch = _session_epoch
try:
async with asyncio.timeout(20):
async with _ai_response_lock:
if _session_epoch != epoch:
raise HTTPException(409, "Call ended while waiting")
audio_service.stop_caller_audio()
conversation_summary = session.get_conversation_summary()
show_history = session.get_show_history()
system_prompt = get_caller_prompt(session.caller, conversation_summary, show_history)
messages = _normalize_messages_for_llm(session.conversation[-10:])
response = await llm_service.generate(
messages=messages,
system_prompt=system_prompt
)
except TimeoutError:
return {"text": "Uh... sorry, I spaced out for a second there.", "caller": session.caller["name"], "voice_id": session.caller["voice"]}
if _session_epoch != epoch:
raise HTTPException(409, "Call changed during response")
response = clean_for_tts(response)
response = ensure_complete_thought(response)
if not response or not response.strip():
response = "Uh... sorry, what was that?"
ai_name = session.caller["name"]
session.add_message(f"ai_caller:{ai_name}", response)
# TTS — outside the lock so other requests aren't blocked
audio_bytes = await generate_speech(response, session.caller["voice"], "none")
if _session_epoch != epoch:
raise HTTPException(409, "Call changed during TTS")
thread = threading.Thread(
target=audio_service.play_caller_audio,
args=(audio_bytes, 24000),
daemon=True,
)
thread.start()
# Stream to real caller
if session.active_real_caller:
caller_id = session.active_real_caller["caller_id"]
asyncio.create_task(
caller_service.stream_audio_to_caller(caller_id, audio_bytes, 24000)
)
return {
"text": response,
"caller": ai_name,
"voice_id": session.caller["voice"]
}
# --- Follow-Up & Session Control Endpoints ---
@app.post("/api/hangup/real")
async def hangup_real_caller():
"""Hang up on real caller — disconnect immediately, summarize in background"""
global _session_epoch, _auto_respond_pending
if not session.active_real_caller:
raise HTTPException(400, "No active real caller")
_session_epoch += 1
# Cancel any pending auto-respond
if _auto_respond_pending and not _auto_respond_pending.done():
_auto_respond_pending.cancel()
_auto_respond_pending = None
_auto_respond_buffer.clear()
if session._research_task and not session._research_task.done():
session._research_task.cancel()
session._research_task = None
caller_id = session.active_real_caller["caller_id"]
caller_phone = session.active_real_caller["phone"]
conversation_snapshot = list(session.conversation)
auto_followup_enabled = session.auto_followup
# End the phone call via SignalWire
call_sid = caller_service.get_call_sid(caller_id)
caller_service.hangup(caller_id)
if call_sid:
asyncio.create_task(_signalwire_end_call(call_sid))
# Stop host streaming if no more active callers
if len(caller_service.active_calls) == 0:
audio_service.stop_host_stream()
session.active_real_caller = None
hangup_sound = settings.sounds_dir / "hangup.wav"
if hangup_sound.exists():
threading.Thread(target=audio_service.play_sfx, args=(str(hangup_sound),), daemon=True).start()
asyncio.create_task(
_summarize_real_call(caller_phone, conversation_snapshot, auto_followup_enabled)
)
return {
"status": "disconnected",
"caller": caller_phone,
}
async def _summarize_real_call(caller_phone: str, conversation: list, auto_followup_enabled: bool):
"""Background task: summarize call and store in history"""
summary = ""
if conversation:
transcript_text = "\n".join(
f"{msg['role']}: {msg['content']}" for msg in conversation
)
summary = await llm_service.generate(
messages=[{"role": "user", "content": f"Summarize this radio show call in 1-2 sentences:\n{transcript_text}"}],
system_prompt="You summarize radio show conversations concisely. Focus on what the caller talked about and any emotional moments.",
)
session.call_history.append(CallRecord(
caller_type="real",
caller_name=caller_phone,
summary=summary,
transcript=conversation,
))
print(f"[Real Caller] {caller_phone} call summarized: {summary[:80]}...")
if auto_followup_enabled:
await _auto_followup(summary)
async def _auto_followup(last_call_summary: str):
"""Automatically pick an AI caller and connect them as follow-up"""
await asyncio.sleep(7) # Brief pause before follow-up
# Ask LLM to pick best AI caller for follow-up
caller_list = ", ".join(
f'{k}: {v["name"]} ({v["gender"]}, {v["age_range"][0]}-{v["age_range"][1]})'
for k, v in CALLER_BASES.items()
)
pick = await llm_service.generate(
messages=[{"role": "user", "content": f'A caller just talked about: "{last_call_summary}". Which AI caller should follow up? Available: {caller_list}. Reply with just the key number.'}],
system_prompt="Pick the most interesting AI caller to follow up on this topic. Just reply with the number key.",
)
# Extract key from response
match = re.search(r'\d+', pick)
if match:
caller_key = match.group()
if caller_key in CALLER_BASES:
session.start_call(caller_key)
print(f"[Auto Follow-Up] {CALLER_BASES[caller_key]['name']} is calling in about: {last_call_summary[:50]}...")
@app.post("/api/followup/generate")
async def generate_followup():
"""Generate an AI follow-up caller based on recent show history"""
if not session.call_history:
raise HTTPException(400, "No call history to follow up on")
last_record = session.call_history[-1]
await _auto_followup(last_record.summary)
return {
"status": "followup_triggered",
"based_on": last_record.caller_name,
}
@app.post("/api/session/ai-mode")
async def set_ai_mode(data: dict):
"""Set AI respond mode (manual or auto)"""
mode = data.get("mode", "manual")
session.ai_respond_mode = mode
print(f"[Session] AI respond mode: {mode}")
return {"mode": mode}
@app.post("/api/session/auto-followup")
async def set_auto_followup(data: dict):
"""Toggle auto follow-up"""
session.auto_followup = data.get("enabled", False)
print(f"[Session] Auto follow-up: {session.auto_followup}")
return {"enabled": session.auto_followup}
# --- Server Control Endpoints ---
import subprocess
from collections import deque
# In-memory log buffer
_log_buffer = deque(maxlen=500)
def add_log(message: str):
"""Add a message to the log buffer"""
import datetime
timestamp = datetime.datetime.now().strftime("%H:%M:%S")
_log_buffer.append(f"[{timestamp}] {message}")
# Override print to also log to buffer
import builtins
_original_print = builtins.print
def _logging_print(*args, **kwargs):
try:
_original_print(*args, **kwargs)
except (BrokenPipeError, OSError):
pass # Ignore broken pipe errors from traceback printing
try:
message = " ".join(str(a) for a in args)
if message.strip():
add_log(message)
except Exception:
pass # Don't let logging errors break the app
builtins.print = _logging_print
@app.get("/api/logs")
async def get_logs(lines: int = 100):
"""Get recent log lines"""
log_lines = list(_log_buffer)[-lines:]
return {"logs": log_lines}
@app.post("/api/server/restart")
async def restart_server():
"""Signal the server to restart (requires run.sh wrapper)"""
restart_flag = Path("/tmp/ai-radio-show.restart")
restart_flag.touch()
add_log("Restart signal sent - server will restart shortly")
return {"status": "restarting"}
@app.post("/api/server/stop")
async def stop_server():
"""Signal the server to stop (requires run.sh wrapper)"""
stop_flag = Path("/tmp/ai-radio-show.stop")
stop_flag.touch()
add_log("Stop signal sent - server will stop shortly")
return {"status": "stopping"}
@app.get("/api/server/status")
async def server_status():
"""Get server status info"""
return {
"status": "running",
"tts_provider": settings.tts_provider,
"llm_provider": llm_service.provider,
"session_id": session.id
}