diff --git a/backend/main.py b/backend/main.py
index b27ad28..d42132a 100644
--- a/backend/main.py
+++ b/backend/main.py
@@ -25,6 +25,7 @@ from .services.llm import llm_service
from .services.tts import generate_speech
from .services.audio import audio_service
from .services.news import news_service, extract_keywords, STOP_WORDS
+from .services.regulars import regular_caller_service
app = FastAPI(title="AI Radio Show")
@@ -115,7 +116,8 @@ CALLER_BASES = {
def _randomize_callers():
- """Assign random names and voices to callers, unique per gender."""
+ """Assign random names and voices to callers, unique per gender.
+ Overrides 2-3 slots with returning regulars when available."""
num_m = sum(1 for c in CALLER_BASES.values() if c["gender"] == "male")
num_f = sum(1 for c in CALLER_BASES.values() if c["gender"] == "female")
males = random.sample(MALE_NAMES, num_m)
@@ -125,6 +127,8 @@ def _randomize_callers():
f_voices = random.sample(female_pool, min(num_f, len(female_pool)))
mi, fi = 0, 0
for base in CALLER_BASES.values():
+ base["returning"] = False
+ base["regular_id"] = None
if base["gender"] == "male":
base["name"] = males[mi]
base["voice"] = m_voices[mi]
@@ -134,6 +138,32 @@ def _randomize_callers():
base["voice"] = f_voices[fi]
fi += 1
+ # Override 2-3 random slots with returning callers
+ try:
+ returning = regular_caller_service.get_returning_callers(random.randint(2, 3))
+ if returning:
+ keys_by_gender = {"male": [], "female": []}
+ for k, v in CALLER_BASES.items():
+ keys_by_gender[v["gender"]].append(k)
+
+ for regular in returning:
+ gender = regular["gender"]
+ candidates = keys_by_gender.get(gender, [])
+ if not candidates:
+ continue
+ key = random.choice(candidates)
+ candidates.remove(key)
+ base = CALLER_BASES[key]
+ base["name"] = regular["name"]
+ base["returning"] = True
+ base["regular_id"] = regular["id"]
+ # Keep the randomly assigned voice — regulars sound different each time
+ if returning:
+ names = [r["name"] for r in returning]
+ print(f"[Regulars] Injected returning callers: {', '.join(names)}")
+ except Exception as e:
+ print(f"[Regulars] Failed to inject returning callers: {e}")
+
_randomize_callers() # Initial assignment
# Background components for dynamic generation
@@ -1239,10 +1269,68 @@ def pick_location() -> str:
return random.choice(LOCATIONS_OUT_OF_STATE)
def _generate_returning_caller_background(base: dict) -> str:
    """Generate a persona background string for a returning regular caller.

    Looks up the regular's stored identity (gender/age/job/location/traits)
    by ``base["regular_id"]``, layers on freshly randomized personality
    details, and appends up to the last three previous-call summaries so the
    caller can reference earlier appearances. Falls back to the normal
    one-off generator if the regular record cannot be found.

    Args:
        base: caller slot dict from CALLER_BASES; must carry "regular_id".

    Returns:
        A prompt-ready background string.
    """
    regular_id = base.get("regular_id")
    regulars = regular_caller_service.get_regulars()
    # Linear scan is fine: the regulars roster is expected to be small.
    regular = next((r for r in regulars if r["id"] == regular_id), None)
    if not regular:
        # Stale/unknown id — degrade gracefully to a fresh one-off persona.
        return generate_caller_background(base)

    gender = regular["gender"]
    age = regular["age"]
    job = regular["job"]
    location = regular["location"]
    traits = regular.get("personality_traits", [])

    # Build previous calls section (last 3 summaries, oldest first)
    prev_calls = regular.get("call_history", [])
    prev_section = ""
    if prev_calls:
        lines = [f"- {c['summary']}" for c in prev_calls[-3:]]
        prev_section = "\nPREVIOUS CALLS:\n" + "\n".join(lines)
        prev_section += "\nYou're calling back with an update — something has changed since last time. Reference your previous call(s) naturally."

    # Reuse standard personality layers so regulars still vary call-to-call
    interest1, interest2 = random.sample(INTERESTS, 2)
    quirk1, quirk2 = random.sample(QUIRKS, 2)
    people_pool = PEOPLE_MALE if gender == "male" else PEOPLE_FEMALE
    person1, person2 = random.sample(people_pool, 2)
    tic1, tic2 = random.sample(VERBAL_TICS, 2)
    arc = random.choice(EMOTIONAL_ARCS)
    vehicle = random.choice(VEHICLES)
    having = random.choice(HAVING_RIGHT_NOW)

    # Live "world" context shared with first-time caller generation
    time_ctx = _get_time_context()
    moon = _get_moon_phase()
    season_ctx = _get_seasonal_context()

    trait_str = ", ".join(traits) if traits else "a regular caller"

    # NOTE(review): location is interpolated with no separator after job —
    # assumes stored locations look like "in <city>"; confirm add_regular()
    # callers always persist that form.
    parts = [
        f"{age}, {job} {location}. Returning caller — {trait_str}.",
        f"{interest1.capitalize()}, {interest2}.",
        f"{quirk1.capitalize()}, {quirk2}.",
        f"\nRIGHT NOW: {time_ctx} Moon: {moon}.",
        f"\nSEASON: {season_ctx}",
        f"\nPEOPLE IN THEIR LIFE: {person1.capitalize()}. {person2.capitalize()}. Use their names when talking about them.",
        f"\nDRIVES: {vehicle.capitalize()}.",
        f"\nHAVING RIGHT NOW: {having}",
        f"\nVERBAL HABITS: Tends to say \"{tic1}\" and \"{tic2}\" — use these naturally in conversation.",
        f"\nEMOTIONAL ARC: {arc}",
        f"\nRELATIONSHIP TO THE SHOW: Has called before. Comfortable on air. Knows Luke a bit. Might reference their last call.",
        prev_section,
    ]

    # First three fragments form the intro sentence run (space-joined);
    # the remaining sections already embed their own leading newlines.
    return " ".join(parts[:3]) + "".join(parts[3:])
+
+
def generate_caller_background(base: dict) -> str:
"""Generate a unique background for a caller (sync, no research).
~30% of callers are 'topic callers' who call about something interesting
instead of a personal problem. Includes full personality layers for realism."""
+ if base.get("returning") and base.get("regular_id"):
+ return _generate_returning_caller_background(base)
gender = base["gender"]
age = random.randint(*base["age_range"])
jobs = JOBS_MALE if gender == "male" else JOBS_FEMALE
@@ -1491,8 +1579,58 @@ async def enrich_caller_background(background: str) -> str:
return background
def detect_host_mood(messages: list[dict]) -> str:
    """Analyze recent host messages to detect mood signals for caller adaptation.

    Scans the last five host turns (roles "user"/"host") for five heuristics:
    terse replies, pushback, supportiveness, joking, and probing questions.

    Args:
        messages: conversation dicts with "role" and (usually) "content".

    Returns:
        A prompt fragment headed "EMOTIONAL READ ON THE HOST" carrying at most
        two signals, or "" when there are no host messages or nothing stands out.
    """
    # Robustness: a message missing "content" (or with content=None) counts as
    # an empty utterance instead of raising KeyError mid-show.
    host_msgs = [m.get("content") or "" for m in messages if m.get("role") in ("user", "host")][-5:]
    if not host_msgs:
        return ""

    signals = []

    # Check average word count — short responses suggest dismissiveness
    avg_words = sum(len(m.split()) for m in host_msgs) / len(host_msgs)
    if avg_words < 8:
        signals.append("The host is giving short responses — they might be losing interest, testing you, or waiting for you to bring something real. Don't ramble. Get to the point or change the subject.")

    # Pushback patterns
    pushback_phrases = ["i don't think", "that's not", "come on", "really?", "i disagree",
                        "that doesn't", "are you sure", "i don't buy", "no way", "but that's",
                        "hold on", "wait a minute", "let's be honest"]
    pushback_count = sum(1 for m in host_msgs for p in pushback_phrases if p in m.lower())
    if pushback_count >= 2:
        signals.append("The host is pushing back — they're challenging you. Don't fold immediately. Defend your position or concede specifically, not generically.")

    # Supportive patterns
    supportive_phrases = ["i hear you", "that makes sense", "i get it", "that's real",
                          "i feel you", "you're right", "absolutely", "exactly", "good for you",
                          "i respect that", "that took guts", "i'm glad you"]
    supportive_count = sum(1 for m in host_msgs for p in supportive_phrases if p in m.lower())
    if supportive_count >= 2:
        signals.append("The host is being supportive — they're with you. You can go deeper. Share something you've been holding back.")

    # Joking patterns ("no way" intentionally overlaps pushback — it reads both ways)
    joke_indicators = ["haha", "lmao", "lol", "that's hilarious", "no way", "you're killing me",
                       "shut up", "get out", "are you serious", "you're joking"]
    joke_count = sum(1 for m in host_msgs for p in joke_indicators if p in m.lower())
    if joke_count >= 2:
        signals.append("The host is in a playful mood — joking around. You can joke back, lean into the humor, but you can also use it as a door to something real.")

    # Probing — lots of questions across the recent window
    question_count = sum(m.count("?") for m in host_msgs)
    if question_count >= 3:
        signals.append("The host is asking a lot of questions — they're digging. Give them real answers. Don't deflect.")

    if not signals:
        return ""

    # Cap at 2 signals so the prompt stays focused
    signals = signals[:2]
    return "\nEMOTIONAL READ ON THE HOST:\n" + "\n".join(f"- {s}" for s in signals) + "\n"
+
+
def get_caller_prompt(caller: dict, conversation_summary: str = "", show_history: str = "",
- news_context: str = "", research_context: str = "") -> str:
+ news_context: str = "", research_context: str = "",
+ emotional_read: str = "") -> str:
"""Generate a natural system prompt for a caller"""
context = ""
if conversation_summary:
@@ -1519,7 +1657,7 @@ Continue naturally. Don't repeat yourself.
return f"""You're {caller['name']}, calling a late-night radio show called "Luke at the Roost." It's late. You trust this host.
{caller['vibe']}
-{history}{context}{world_context}
+{history}{context}{world_context}{emotional_read}
HOW TO TALK:
- Sound like a real person on the phone, not an essay. This is a conversation, not a monologue.
- VARY YOUR LENGTH. Sometimes one sentence. Sometimes two or three. Match the moment.
@@ -1607,6 +1745,8 @@ class CallRecord:
caller_name: str # "Tony" or "Caller #3"
summary: str # LLM-generated summary after hangup
transcript: list[dict] = field(default_factory=list)
+ started_at: float = 0.0
+ ended_at: float = 0.0
class Session:
@@ -1616,6 +1756,7 @@ class Session:
self.conversation: list[dict] = []
self.caller_backgrounds: dict[str, str] = {} # Generated backgrounds for this session
self.call_history: list[CallRecord] = []
+ self._call_started_at: float = 0.0
self.active_real_caller: dict | None = None
self.ai_respond_mode: str = "manual" # "manual" or "auto"
self.auto_followup: bool = False
@@ -1626,13 +1767,14 @@ class Session:
def start_call(self, caller_key: str):
self.current_caller_key = caller_key
self.conversation = []
+ self._call_started_at = time.time()
def end_call(self):
self.current_caller_key = None
self.conversation = []
def add_message(self, role: str, content: str):
- self.conversation.append({"role": role, "content": content})
+ self.conversation.append({"role": role, "content": content, "timestamp": time.time()})
def get_caller_background(self, caller_key: str) -> str:
"""Get or generate background for a caller in this session"""
@@ -1977,7 +2119,7 @@ async def get_callers():
"""Get list of available callers"""
return {
"callers": [
- {"key": k, "name": v["name"]}
+ {"key": k, "name": v["name"], "returning": v.get("returning", False)}
for k, v in CALLER_BASES.items()
],
"current": session.current_caller_key,
@@ -1985,6 +2127,12 @@ async def get_callers():
}
@app.get("/api/regulars")
async def get_regulars():
    """Return every persisted regular caller known to the regulars service."""
    roster = regular_caller_service.get_regulars()
    return {"regulars": roster}
+
+
@app.post("/api/session/reset")
async def reset_session():
"""Reset session - all callers get fresh backgrounds"""
@@ -2037,6 +2185,9 @@ async def hangup():
session._research_task = None
caller_name = session.caller["name"] if session.caller else None
+ caller_key = session.current_caller_key
+ conversation_snapshot = list(session.conversation)
+ call_started = getattr(session, '_call_started_at', 0.0)
session.end_call()
# Play hangup sound in background so response returns immediately
@@ -2044,9 +2195,74 @@ async def hangup():
if hangup_sound.exists():
threading.Thread(target=audio_service.play_sfx, args=(str(hangup_sound),), daemon=True).start()
+ # Generate summary for AI caller in background
+ if caller_name and conversation_snapshot:
+ asyncio.create_task(_summarize_ai_call(caller_key, caller_name, conversation_snapshot, call_started))
+
return {"status": "disconnected", "caller": caller_name}
async def _summarize_ai_call(caller_key: str, caller_name: str, conversation: list[dict], started_at: float):
    """Background task: summarize an AI caller conversation and store it in history.

    Appends a CallRecord to the session, then either updates a returning
    regular's call history or (with 20% probability, for substantial calls)
    promotes a first-time caller into the regulars roster.

    Args:
        caller_key: key into CALLER_BASES for the slot that was on air.
        caller_name: display name used in the record and promotion.
        conversation: snapshot of the call transcript at hangup time.
        started_at: epoch seconds when the call began (0.0 if unknown).
    """
    ended_at = time.time()
    summary = ""
    if conversation:
        transcript_text = "\n".join(
            f"{msg['role']}: {msg['content']}" for msg in conversation
        )
        try:
            summary = await llm_service.generate(
                messages=[{"role": "user", "content": f"Summarize this radio show call in 1-2 sentences:\n{transcript_text}"}],
                system_prompt="You summarize radio show conversations concisely. Focus on what the caller talked about and any emotional moments.",
            )
        except Exception as e:
            print(f"[AI Summary] Failed to generate summary: {e}")
            summary = f"{caller_name} called in."

    session.call_history.append(CallRecord(
        caller_type="ai",
        caller_name=caller_name,
        summary=summary,
        transcript=conversation,
        started_at=started_at,
        ended_at=ended_at,
    ))
    print(f"[AI Summary] {caller_name} call summarized: {summary[:80]}...")

    # Returning caller promotion/update logic — best-effort, never fatal
    try:
        base = CALLER_BASES.get(caller_key) if caller_key else None
        if base and summary:
            if base.get("returning") and base.get("regular_id"):
                # Update existing regular's call history
                regular_caller_service.update_after_call(base["regular_id"], summary)
            elif len(conversation) >= 6 and random.random() < 0.20:
                # 20% chance to promote a first-timer with 6+ messages
                bg = session.caller_backgrounds.get(caller_key, "")
                traits = []
                for label in ["QUIRK", "STRONG OPINION", "SECRET SIDE", "FOOD OPINION"]:
                    for line in bg.split("\n"):
                        if label in line:
                            traits.append(line.split(":", 1)[-1].strip()[:80])
                            break
                # Extract job and location from the background's first sentence,
                # which looks like "34, night-shift plumber in Tucson."
                first_line = bg.split(".")[0] if bg else ""
                parts = first_line.split(",", 1)
                job_loc = parts[1].strip() if len(parts) > 1 else ""
                # BUG FIX: the old code rsplit() into a *list* but then tested
                # isinstance(..., tuple), so a matching "<job> in <city>" always
                # stored location "unknown" (losing the real city) and an
                # unstripped job, while the no-match case stored "in unknown".
                if " in " in job_loc:
                    job_name, loc_name = job_loc.rsplit(" in ", 1)
                    job_name = job_name.strip()
                    location = "in " + loc_name.strip()
                else:
                    job_name = job_loc
                    location = "unknown"
                regular_caller_service.add_regular(
                    name=caller_name,
                    gender=base.get("gender", "male"),
                    age=random.randint(*base.get("age_range", (30, 50))),
                    job=job_name,
                    location=location,
                    personality_traits=traits[:4],
                    first_call_summary=summary,
                )
    except Exception as e:
        print(f"[Regulars] Promotion logic error: {e}")
+
+
# --- Chat & TTS Endpoints ---
import re
@@ -2174,7 +2390,8 @@ async def chat(request: ChatRequest):
conversation_summary = session.get_conversation_summary()
show_history = session.get_show_history()
- system_prompt = get_caller_prompt(session.caller, conversation_summary, show_history)
+ mood = detect_host_mood(session.conversation)
+ system_prompt = get_caller_prompt(session.caller, conversation_summary, show_history, emotional_read=mood)
messages = _normalize_messages_for_llm(session.conversation[-10:])
response = await llm_service.generate(
@@ -2276,13 +2493,16 @@ async def get_music():
@app.post("/api/music/play")
async def play_music(request: MusicRequest):
- """Load and play a music track"""
+ """Load and play a music track, crossfading if already playing"""
track_path = settings.music_dir / request.track
if not track_path.exists():
raise HTTPException(404, "Track not found")
- audio_service.load_music(str(track_path))
- audio_service.play_music()
+ if audio_service.is_music_playing():
+ audio_service.crossfade_to(str(track_path))
+ else:
+ audio_service.load_music(str(track_path))
+ audio_service.play_music()
return {"status": "playing", "track": request.track}
@@ -2352,6 +2572,9 @@ async def play_ad(request: MusicRequest):
if not ad_path.exists():
raise HTTPException(404, "Ad not found")
+ if audio_service._music_playing:
+ audio_service.stop_music(fade_duration=1.0)
+ await asyncio.sleep(1.1)
audio_service.play_ad(str(ad_path))
return {"status": "playing", "track": request.track}
@@ -2393,6 +2616,126 @@ async def update_settings(data: dict):
return llm_service.get_settings()
+# --- Caller Screening ---
+
+SCREENING_PROMPT = """You are a friendly, brief phone screener for "Luke at the Roost" radio show.
+Your job: Get the caller's first name and what they want to talk about. That's it.
+
+Rules:
+- Be warm but brief (1-2 sentences per response)
+- First ask their name, then ask what they want to talk about
+- After you have both, say something like "Great, sit tight and we'll get you on with Luke!"
+- Never pretend to be Luke or the host
+- Keep it casual and conversational
+- If they're hard to understand, ask them to repeat"""
+
+_screening_audio_buffers: dict[str, bytearray] = {}
+
+
async def _start_screening_greeting(caller_id: str):
    """Kick off the screening conversation for a freshly queued caller.

    Waits briefly for the media stream to settle, records the canned greeting
    in the screening transcript, then streams its TTS audio to the caller.
    """
    # Give the SignalWire stream a moment to stabilize before speaking.
    await asyncio.sleep(2)

    if caller_service._websockets.get(caller_id) is None:
        # Caller dropped off while we were waiting — nothing to greet.
        return

    greeting = "Hey there! Thanks for calling Luke at the Roost. What's your name?"
    caller_service.start_screening(caller_id)
    caller_service.update_screening(caller_id, screener_text=greeting)

    # Best-effort TTS: a synthesis/streaming failure is logged, not raised.
    try:
        speech = await generate_speech(greeting, "Sarah", "none")
        if speech:
            await caller_service.stream_audio_to_caller(caller_id, speech, 24000)
    except Exception as e:
        print(f"[Screening] Greeting TTS failed: {e}")
+
+
async def _handle_screening_audio(caller_id: str, pcm_data: bytes, sample_rate: int):
    """Process one audio chunk from a queued caller for the screening conversation.

    Pipeline per chunk: transcribe -> append to screening transcript ->
    generate a screener reply -> (after 2+ caller turns) try to extract
    name/topic as JSON and complete screening -> TTS the reply back.
    All failures are logged and swallowed so one bad chunk never kills the
    caller's stream.

    Args:
        caller_id: queue identity of the caller.
        pcm_data: raw 16-bit PCM chunk from the phone stream.
        sample_rate: sample rate of pcm_data in Hz.
    """
    state = caller_service.get_screening_state(caller_id)
    if not state or state["status"] == "complete":
        return

    # Skip if TTS is currently streaming to this caller — otherwise the
    # screener would transcribe (and answer) its own voice.
    if caller_service.is_streaming_tts(caller_id):
        return

    # Transcribe caller speech
    try:
        text = await transcribe_audio(pcm_data, source_sample_rate=sample_rate)
    except Exception as e:
        print(f"[Screening] Transcription failed: {e}")
        return

    if not text or not text.strip():
        return

    print(f"[Screening] Caller {caller_id}: {text}")
    caller_service.update_screening(caller_id, caller_text=text)

    # Build conversation for LLM (screener turns -> assistant role)
    messages = []
    for msg in state["conversation"]:
        role = "assistant" if msg["role"] == "screener" else "user"
        messages.append({"role": role, "content": msg["content"]})

    # Generate screener response
    try:
        response = await llm_service.generate(
            messages=messages,
            system_prompt=SCREENING_PROMPT
        )
    except Exception as e:
        print(f"[Screening] LLM failed: {e}")
        return

    if not response or not response.strip():
        return

    response = response.strip()
    print(f"[Screening] Screener → {caller_id}: {response}")
    caller_service.update_screening(caller_id, screener_text=response)

    # After 2+ caller responses, try to extract name and topic.
    # NOTE(review): assumes update_screening() increments response_count on
    # caller turns — confirm in CallerService.
    if state["response_count"] >= 2:
        try:
            # chr(10) works around the no-backslash-in-f-string restriction
            # (pre-3.12) for the embedded join.
            extract_prompt = f"""From this screening conversation, extract the caller's name and topic.
Conversation:
{chr(10).join(f'{m["role"]}: {m["content"]}' for m in state["conversation"])}

Respond with ONLY JSON: {{"name": "their first name or null", "topic": "brief topic or null"}}"""
            extract = await llm_service.generate(
                messages=[{"role": "user", "content": extract_prompt}],
                system_prompt="You extract structured data from conversations. Respond with only valid JSON."
            )
            # Grab the first flat {...} object; nested braces aren't expected here.
            json_match = re.search(r'\{[^}]+\}', extract)
            if json_match:
                info = json.loads(json_match.group())
                if info.get("name"):
                    caller_service.update_screening(caller_id, caller_name=info["name"])
                if info.get("topic"):
                    caller_service.update_screening(caller_id, topic=info["topic"])
                if info.get("name") and info.get("topic"):
                    caller_service.end_screening(caller_id)
                    broadcast_event("screening_complete", {
                        "caller_id": caller_id,
                        "name": info["name"],
                        "topic": info["topic"]
                    })
        except Exception as e:
            print(f"[Screening] Extract failed: {e}")

    # TTS the screener response back to the caller (24 kHz PCM)
    try:
        audio_bytes = await generate_speech(response, "Sarah", "none")
        if audio_bytes:
            await caller_service.stream_audio_to_caller(caller_id, audio_bytes, 24000)
    except Exception as e:
        print(f"[Screening] Response TTS failed: {e}")
+
+
@app.websocket("/api/signalwire/stream")
async def signalwire_audio_stream(websocket: WebSocket):
"""Handle SignalWire bidirectional audio stream"""
@@ -2402,6 +2745,7 @@ async def signalwire_audio_stream(websocket: WebSocket):
caller_phone = "Unknown"
call_sid = ""
audio_buffer = bytearray()
+ screening_buffer = bytearray()
CHUNK_DURATION_S = 3
SAMPLE_RATE = 16000
chunk_samples = CHUNK_DURATION_S * SAMPLE_RATE
@@ -2448,6 +2792,9 @@ async def signalwire_audio_stream(websocket: WebSocket):
if stream_sid:
caller_service.register_stream_sid(caller_id, stream_sid)
+ # Start screening conversation
+ asyncio.create_task(_start_screening_greeting(caller_id))
+
elif event == "media" and stream_started:
try:
payload = msg.get("media", {}).get("payload", "")
@@ -2458,6 +2805,16 @@ async def signalwire_audio_stream(websocket: WebSocket):
call_info = caller_service.active_calls.get(caller_id)
if not call_info:
+ # Caller is queued, not on air — route to screening
+ screening_buffer.extend(pcm_data)
+ if len(screening_buffer) >= chunk_samples * 2:
+ pcm_chunk = bytes(screening_buffer[:chunk_samples * 2])
+ screening_buffer = screening_buffer[chunk_samples * 2:]
+ audio_check = np.frombuffer(pcm_chunk, dtype=np.int16).astype(np.float32) / 32768.0
+ if np.abs(audio_check).max() >= 0.01:
+ asyncio.create_task(
+ _handle_screening_audio(caller_id, pcm_chunk, SAMPLE_RATE)
+ )
continue
audio_buffer.extend(pcm_data)
@@ -2713,7 +3070,8 @@ async def _trigger_ai_auto_respond(accumulated_text: str):
conversation_summary = session.get_conversation_summary()
show_history = session.get_show_history()
- system_prompt = get_caller_prompt(session.caller, conversation_summary, show_history)
+ mood = detect_host_mood(session.conversation)
+ system_prompt = get_caller_prompt(session.caller, conversation_summary, show_history, emotional_read=mood)
messages = _normalize_messages_for_llm(session.conversation[-10:])
response = await llm_service.generate(
@@ -2785,7 +3143,8 @@ async def ai_respond():
conversation_summary = session.get_conversation_summary()
show_history = session.get_show_history()
- system_prompt = get_caller_prompt(session.caller, conversation_summary, show_history)
+ mood = detect_host_mood(session.conversation)
+ system_prompt = get_caller_prompt(session.caller, conversation_summary, show_history, emotional_read=mood)
messages = _normalize_messages_for_llm(session.conversation[-10:])
response = await llm_service.generate(
@@ -2856,6 +3215,7 @@ async def hangup_real_caller():
caller_id = session.active_real_caller["caller_id"]
caller_phone = session.active_real_caller["phone"]
conversation_snapshot = list(session.conversation)
+ call_started = getattr(session, '_call_started_at', 0.0)
auto_followup_enabled = session.auto_followup
# End the phone call via SignalWire
@@ -2875,7 +3235,7 @@ async def hangup_real_caller():
threading.Thread(target=audio_service.play_sfx, args=(str(hangup_sound),), daemon=True).start()
asyncio.create_task(
- _summarize_real_call(caller_phone, conversation_snapshot, auto_followup_enabled)
+ _summarize_real_call(caller_phone, conversation_snapshot, call_started, auto_followup_enabled)
)
return {
@@ -2884,8 +3244,9 @@ async def hangup_real_caller():
}
-async def _summarize_real_call(caller_phone: str, conversation: list, auto_followup_enabled: bool):
+async def _summarize_real_call(caller_phone: str, conversation: list, started_at: float, auto_followup_enabled: bool):
"""Background task: summarize call and store in history"""
+ ended_at = time.time()
summary = ""
if conversation:
transcript_text = "\n".join(
@@ -2901,6 +3262,8 @@ async def _summarize_real_call(caller_phone: str, conversation: list, auto_follo
caller_name=caller_phone,
summary=summary,
transcript=conversation,
+ started_at=started_at,
+ ended_at=ended_at,
))
print(f"[Real Caller] {caller_phone} call summarized: {summary[:80]}...")
@@ -2963,6 +3326,70 @@ async def set_auto_followup(data: dict):
return {"enabled": session.auto_followup}
+# --- Transcript & Chapter Export ---
+
@app.get("/api/session/export")
async def export_session():
    """Export the session transcript with speaker labels and chapter markers.

    Builds a plain-text transcript where each line is "[MM:SS] SPEAKER: text"
    and a podcast-style chapter list derived from call start times.

    Returns:
        dict with session_id, transcript, chapters, and call_count.

    Raises:
        HTTPException(400) when the session has no recorded calls.
    """
    if not session.call_history:
        raise HTTPException(400, "No calls in this session to export")

    # Earliest call start anchors all offsets; fall back to "now" when no
    # record carries a start time (older records default started_at to 0.0).
    session_start = min(
        (r.started_at for r in session.call_history if r.started_at > 0),
        default=time.time()
    )

    transcript_lines = []
    chapters = []

    for i, record in enumerate(session.call_history):
        # Chapter marker at this call's offset from the session start
        offset_seconds = max(0, record.started_at - session_start) if record.started_at > 0 else 0
        chapter_title = f"{record.caller_name}"
        if record.summary:
            # First sentence of the summary keeps chapter titles short
            short_summary = record.summary.split(".")[0].strip()
            if short_summary:
                chapter_title += f" \u2014 {short_summary}"
        chapters.append({"startTime": round(offset_seconds), "title": chapter_title})

        # Separator between calls
        if i > 0:
            transcript_lines.append("")
        transcript_lines.append(f"--- Call {i + 1}: {record.caller_name} ---")
        transcript_lines.append("")

        # Transcript lines with timestamps; messages without a timestamp
        # inherit the call's start offset
        for msg in record.transcript:
            msg_offset = msg.get("timestamp", 0) - session_start if msg.get("timestamp") else offset_seconds
            if msg_offset < 0:
                msg_offset = 0
            mins = int(msg_offset // 60)
            secs = int(msg_offset % 60)

            role = msg.get("role", "")
            if role in ("user", "host"):
                speaker = "HOST"
            elif role.startswith(("real_caller:", "ai_caller:")):
                # Both prefixed roles embed the speaker name after the colon
                # (merged from two identical branches in the original).
                speaker = role.split(":", 1)[1].upper()
            elif role == "assistant":
                speaker = record.caller_name.upper()
            else:
                speaker = role.upper()

            # .get guards against malformed messages missing "content"
            transcript_lines.append(f"[{mins:02d}:{secs:02d}] {speaker}: {msg.get('content', '')}")

    return {
        "session_id": session.id,
        "transcript": "\n".join(transcript_lines),
        "chapters": chapters,
        "call_count": len(session.call_history),
    }
+
+
# --- Server Control Endpoints ---
import subprocess
diff --git a/backend/services/audio.py b/backend/services/audio.py
index a2c668c..996d893 100644
--- a/backend/services/audio.py
+++ b/backend/services/audio.py
@@ -53,6 +53,14 @@ class AudioService:
self._music_volume: float = 0.3
self._music_loop: bool = True
+ # Music crossfade state
+ self._crossfade_active: bool = False
+ self._crossfade_old_data: Optional[np.ndarray] = None
+ self._crossfade_old_position: int = 0
+ self._crossfade_progress: float = 0.0
+ self._crossfade_samples: int = 0
+ self._crossfade_step: float = 0.0
+
# Caller playback state
self._caller_stop_event = threading.Event()
self._caller_thread: Optional[threading.Thread] = None
@@ -578,6 +586,55 @@ class AudioService:
print(f"Failed to load music: {e}")
return False
    def crossfade_to(self, file_path: str, duration: float = 3.0):
        """Crossfade from the current music track to a new one.

        Loads and resamples the new track, then swaps it in as the active
        buffer while handing the previous buffer (and its play position) to
        the crossfade state that the output callback blends from.

        Args:
            file_path: path to the new audio file.
            duration: crossfade length in seconds (default 3.0).

        NOTE(review): the state swap below is not guarded against the audio
        callback thread reading mid-swap — confirm the callback tolerates a
        torn crossfade state or add a lock.
        """
        import librosa

        # Nothing playing yet: behave like a plain load+play, no fade needed.
        if not self._music_playing or self._music_resampled is None:
            if self.load_music(file_path):
                self.play_music()
            return

        # Load the new track
        path = Path(file_path)
        if not path.exists():
            print(f"Music file not found: {file_path}")
            return

        try:
            # sr is unused: librosa already resamples to output_sample_rate here.
            audio, sr = librosa.load(str(path), sr=self.output_sample_rate, mono=True)
            new_data = audio.astype(np.float32)
        except Exception as e:
            print(f"Failed to load music for crossfade: {e}")
            return

        # Get device sample rate for resampling
        if self.output_device is not None:
            device_info = sd.query_devices(self.output_device)
            device_sr = int(device_info['default_samplerate'])
        else:
            device_sr = self.output_sample_rate

        if self.output_sample_rate != device_sr:
            new_resampled = librosa.resample(new_data, orig_sr=self.output_sample_rate, target_sr=device_sr)
        else:
            # Copy so the crossfade's "old" buffer can never alias the new one.
            new_resampled = new_data.copy()

        # Swap: current becomes old (keeps playing from its position),
        # new becomes current (starts at 0)
        self._crossfade_old_data = self._music_resampled
        self._crossfade_old_position = self._music_position
        self._music_resampled = new_resampled
        self._music_data = new_data
        self._music_position = 0

        # Configure crossfade timing: progress advances per output sample
        self._crossfade_samples = int(device_sr * duration)
        self._crossfade_progress = 0.0
        self._crossfade_step = 1.0 / self._crossfade_samples if self._crossfade_samples > 0 else 1.0
        self._crossfade_active = True

        print(f"Crossfading to {path.name} over {duration}s")
+
def play_music(self):
"""Start music playback to specific channel"""
import librosa
@@ -625,24 +682,54 @@ class AudioService:
if not self._music_playing or self._music_resampled is None:
return
+ # Read new track samples
end_pos = self._music_position + frames
-
if end_pos <= len(self._music_resampled):
- outdata[:, channel_idx] = self._music_resampled[self._music_position:end_pos] * self._music_volume
+ new_samples = self._music_resampled[self._music_position:end_pos].copy()
self._music_position = end_pos
else:
remaining = len(self._music_resampled) - self._music_position
+ new_samples = np.zeros(frames, dtype=np.float32)
if remaining > 0:
- outdata[:remaining, channel_idx] = self._music_resampled[self._music_position:] * self._music_volume
-
+ new_samples[:remaining] = self._music_resampled[self._music_position:]
if self._music_loop:
- self._music_position = 0
wrap_frames = frames - remaining
if wrap_frames > 0:
- outdata[remaining:, channel_idx] = self._music_resampled[:wrap_frames] * self._music_volume
+ new_samples[remaining:] = self._music_resampled[:wrap_frames]
self._music_position = wrap_frames
else:
- self._music_playing = False
+ self._music_position = len(self._music_resampled)
+ if remaining <= 0:
+ self._music_playing = False
+
+ if self._crossfade_active and self._crossfade_old_data is not None:
+ # Read old track samples
+ old_end = self._crossfade_old_position + frames
+ if old_end <= len(self._crossfade_old_data):
+ old_samples = self._crossfade_old_data[self._crossfade_old_position:old_end]
+ self._crossfade_old_position = old_end
+ else:
+ old_remaining = len(self._crossfade_old_data) - self._crossfade_old_position
+ old_samples = np.zeros(frames, dtype=np.float32)
+ if old_remaining > 0:
+ old_samples[:old_remaining] = self._crossfade_old_data[self._crossfade_old_position:]
+ self._crossfade_old_position = len(self._crossfade_old_data)
+
+ # Compute fade curves for this chunk
+ start_progress = self._crossfade_progress
+ end_progress = min(1.0, start_progress + self._crossfade_step * frames)
+ fade_in = np.linspace(start_progress, end_progress, frames, dtype=np.float32)
+ fade_out = 1.0 - fade_in
+
+ outdata[:, channel_idx] = (old_samples * fade_out + new_samples * fade_in) * self._music_volume
+ self._crossfade_progress = end_progress
+
+ if self._crossfade_progress >= 1.0:
+ self._crossfade_active = False
+ self._crossfade_old_data = None
+ print("Crossfade complete")
+ else:
+ outdata[:, channel_idx] = new_samples * self._music_volume
try:
self._music_stream = sd.OutputStream(
@@ -659,15 +746,48 @@ class AudioService:
print(f"Music playback error: {e}")
self._music_playing = False
- def stop_music(self):
- """Stop music playback"""
- self._music_playing = False
- if self._music_stream:
+ def stop_music(self, fade_duration: float = 2.0):
+ """Stop music playback with fade out"""
+ if not self._music_playing or not self._music_stream:
+ self._music_playing = False
+ if self._music_stream:
+ self._music_stream.stop()
+ self._music_stream.close()
+ self._music_stream = None
+ self._music_position = 0
+ return
+
+ if fade_duration <= 0:
+ self._music_playing = False
self._music_stream.stop()
self._music_stream.close()
self._music_stream = None
- self._music_position = 0
- print("Music stopped")
+ self._music_position = 0
+ print("Music stopped")
+ return
+
+ import threading
+ original_volume = self._music_volume
+ steps = 20
+ step_time = fade_duration / steps
+
+ def _fade():
+ for i in range(steps):
+ if not self._music_playing:
+ break
+ self._music_volume = original_volume * (1 - (i + 1) / steps)
+ import time
+ time.sleep(step_time)
+ self._music_playing = False
+ if self._music_stream:
+ self._music_stream.stop()
+ self._music_stream.close()
+ self._music_stream = None
+ self._music_position = 0
+ self._music_volume = original_volume
+ print("Music faded out and stopped")
+
+ threading.Thread(target=_fade, daemon=True).start()
def play_ad(self, file_path: str):
"""Load and play an ad file once (no loop) on the ad channel"""
diff --git a/backend/services/caller_service.py b/backend/services/caller_service.py
index 12870a0..266560f 100644
--- a/backend/services/caller_service.py
+++ b/backend/services/caller_service.py
@@ -25,6 +25,7 @@ class CallerService:
self._stream_sids: dict[str, str] = {} # caller_id -> SignalWire streamSid
self._send_locks: dict[str, asyncio.Lock] = {} # per-caller send lock
self._streaming_tts: set[str] = set() # caller_ids currently receiving TTS
+ self._screening_state: dict[str, dict] = {} # caller_id -> screening conversation
def _get_send_lock(self, caller_id: str) -> asyncio.Lock:
if caller_id not in self._send_locks:
@@ -51,18 +52,6 @@ class CallerService:
self._queue = [c for c in self._queue if c["caller_id"] != caller_id]
print(f"[Caller] {caller_id} removed from queue")
- def get_queue(self) -> list[dict]:
- now = time.time()
- with self._lock:
- return [
- {
- "caller_id": c["caller_id"],
- "phone": c["phone"],
- "wait_time": int(now - c["queued_at"]),
- }
- for c in self._queue
- ]
-
def allocate_channel(self) -> int:
with self._lock:
ch = self.FIRST_REAL_CHANNEL
@@ -111,6 +100,7 @@ class CallerService:
self._call_sids.pop(caller_id, None)
self._stream_sids.pop(caller_id, None)
self._send_locks.pop(caller_id, None)
+ self._screening_state.pop(caller_id, None)
def reset(self):
with self._lock:
@@ -125,8 +115,72 @@ class CallerService:
self._stream_sids.clear()
self._send_locks.clear()
self._streaming_tts.clear()
+ self._screening_state.clear()
print("[Caller] Service reset")
+ # --- Screening ---
+
+ def start_screening(self, caller_id: str):
+ """Initialize screening state for a queued caller"""
+ self._screening_state[caller_id] = {
+ "conversation": [],
+ "caller_name": None,
+ "topic": None,
+ "status": "screening", # screening, complete
+ "response_count": 0,
+ }
+ print(f"[Screening] Started for {caller_id}")
+
+ def get_screening_state(self, caller_id: str) -> Optional[dict]:
+ return self._screening_state.get(caller_id)
+
+ def update_screening(self, caller_id: str, caller_text: str = None,
+ screener_text: str = None, caller_name: str = None,
+ topic: str = None):
+ """Update screening conversation and extracted info"""
+ state = self._screening_state.get(caller_id)
+ if not state:
+ return
+ if caller_text:
+ state["conversation"].append({"role": "caller", "content": caller_text})
+ state["response_count"] += 1
+ if screener_text:
+ state["conversation"].append({"role": "screener", "content": screener_text})
+ if caller_name:
+ state["caller_name"] = caller_name
+ if topic:
+ state["topic"] = topic
+
+ def end_screening(self, caller_id: str):
+ """Mark screening as complete"""
+ state = self._screening_state.get(caller_id)
+ if state:
+ state["status"] = "complete"
+ print(f"[Screening] Complete for {caller_id}: name={state.get('caller_name')}, topic={state.get('topic')}")
+
+ def get_queue(self) -> list[dict]:
+ """Get queue with screening info enrichment"""
+ now = time.time()
+ with self._lock:
+ result = []
+ for c in self._queue:
+ entry = {
+ "caller_id": c["caller_id"],
+ "phone": c["phone"],
+ "wait_time": int(now - c["queued_at"]),
+ }
+ screening = self._screening_state.get(c["caller_id"])
+ if screening:
+ entry["screening_status"] = screening["status"]
+ entry["caller_name"] = screening.get("caller_name")
+ entry["screening_summary"] = screening.get("topic")
+ else:
+ entry["screening_status"] = None
+ entry["caller_name"] = None
+ entry["screening_summary"] = None
+ result.append(entry)
+ return result
+
def register_websocket(self, caller_id: str, websocket):
"""Register a WebSocket for a caller"""
self._websockets[caller_id] = websocket
diff --git a/backend/services/regulars.py b/backend/services/regulars.py
new file mode 100644
index 0000000..1533f90
--- /dev/null
+++ b/backend/services/regulars.py
@@ -0,0 +1,95 @@
+"""Returning caller persistence service"""
+
+import json
+import time
+import uuid
+from pathlib import Path
+from typing import Optional
+
+DATA_FILE = Path(__file__).parent.parent.parent / "data" / "regulars.json"
+MAX_REGULARS = 12
+
+
+class RegularCallerService:
+ """Manages persistent 'regular' callers who return across sessions"""
+
+ def __init__(self):
+ self._regulars: list[dict] = []
+ self._load()
+
+ def _load(self):
+ if DATA_FILE.exists():
+ try:
+ with open(DATA_FILE) as f:
+ data = json.load(f)
+ self._regulars = data.get("regulars", [])
+ print(f"[Regulars] Loaded {len(self._regulars)} regular callers")
+ except Exception as e:
+ print(f"[Regulars] Failed to load: {e}")
+ self._regulars = []
+
+ def _save(self):
+ try:
+ DATA_FILE.parent.mkdir(parents=True, exist_ok=True)
+ with open(DATA_FILE, "w") as f:
+ json.dump({"regulars": self._regulars}, f, indent=2)
+ except Exception as e:
+ print(f"[Regulars] Failed to save: {e}")
+
+ def get_regulars(self) -> list[dict]:
+ return list(self._regulars)
+
+ def get_returning_callers(self, count: int = 2) -> list[dict]:
+ """Get up to `count` regulars for returning caller slots"""
+ import random
+ if not self._regulars:
+ return []
+ available = [r for r in self._regulars if len(r.get("call_history", [])) > 0]
+ if not available:
+ return []
+ return random.sample(available, min(count, len(available)))
+
+ def add_regular(self, name: str, gender: str, age: int, job: str,
+ location: str, personality_traits: list[str],
+ first_call_summary: str) -> dict:
+ """Promote a first-time caller to regular"""
+ # Retire oldest if at cap
+ if len(self._regulars) >= MAX_REGULARS:
+ self._regulars.sort(key=lambda r: r.get("last_call", 0))
+ retired = self._regulars.pop(0)
+ print(f"[Regulars] Retired {retired['name']} to make room")
+
+ regular = {
+ "id": str(uuid.uuid4())[:8],
+ "name": name,
+ "gender": gender,
+ "age": age,
+ "job": job,
+ "location": location,
+ "personality_traits": personality_traits,
+ "call_history": [
+ {"summary": first_call_summary, "timestamp": time.time()}
+ ],
+ "last_call": time.time(),
+ "created_at": time.time(),
+ }
+ self._regulars.append(regular)
+ self._save()
+ print(f"[Regulars] Promoted {name} to regular (total: {len(self._regulars)})")
+ return regular
+
+ def update_after_call(self, regular_id: str, call_summary: str):
+ """Update a regular's history after a returning call"""
+ for regular in self._regulars:
+ if regular["id"] == regular_id:
+ regular.setdefault("call_history", []).append(
+ {"summary": call_summary, "timestamp": time.time()}
+ )
+ regular["last_call"] = time.time()
+ self._save()
+ print(f"[Regulars] Updated {regular['name']} call history ({len(regular['call_history'])} calls)")
+ return
+ print(f"[Regulars] Regular {regular_id} not found for update")
+
+
+regular_caller_service = RegularCallerService()
diff --git a/data/regulars.json b/data/regulars.json
new file mode 100644
index 0000000..e1b5fd1
--- /dev/null
+++ b/data/regulars.json
@@ -0,0 +1 @@
+{"regulars": []}
diff --git a/docs/plans/2026-02-05-real-callers-design.md b/docs/plans/2026-02-05-real-callers-design.md
new file mode 100644
index 0000000..1b25cf5
--- /dev/null
+++ b/docs/plans/2026-02-05-real-callers-design.md
@@ -0,0 +1,189 @@
+# Real Callers + AI Follow-Up Design
+
+## Overview
+
+Add real phone callers to the AI Radio Show via Twilio, alongside existing AI callers. Real callers dial a phone number, wait in a hold queue, and get taken on air by the host. Three-way conversations between host, real caller, and AI caller are supported. AI follow-up callers automatically reference what real callers said.
+
+## Requirements
+
+- Real callers connect via Twilio phone number
+- Full-duplex audio — host and caller talk simultaneously, talk over each other
+- Each real caller gets their own dedicated audio channel for recording
+- Three-way calls: host + real caller + AI caller all live at once
+- AI caller can respond manually (host-triggered) or automatically (listens and decides when to jump in)
+- AI follow-up callers reference real caller conversations via show history
+- Auto follow-up mode: system picks an AI caller and connects them after a real call
+- Simple hold queue — callers wait with hold music, host sees list and picks who goes on air
+- Twilio webhooks exposed via Cloudflare tunnel
+
+## Architecture
+
+### Audio Routing (Loopback Channels)
+
+```
+Ch 1: Host mic (existing)
+Ch 2: AI callers / TTS (existing)
+Ch 3+: Real callers (dynamically assigned per call)
+Ch N-1: Music (existing)
+Ch N: SFX (existing)
+```
+
+### Call Flow — Real Caller
+
+```
+Caller dials Twilio number
+ → Twilio POST /api/twilio/voice
+ → TwiML response: greeting + enqueue with hold music
+ → Caller waits in hold queue
+ → Host sees caller in dashboard queue panel
+ → Host clicks "Take Call"
+ → POST /api/queue/take/{call_sid}
+ → Twilio opens WebSocket to /api/twilio/stream
+ → Bidirectional audio:
+ Caller audio → decode mulaw → dedicated Loopback channel
+ Host audio + AI TTS → encode mulaw → Twilio → caller hears both
+ → Real-time Whisper transcription of caller audio
+ → Host hangs up → call summarized → stored in show history
+```
+
+### Three-Way Call Flow
+
+```
+Host mic ──────→ Ch 1 (recording)
+ → Twilio outbound (real caller hears you)
+ → Whisper transcription (AI gets your words)
+
+Real caller ──→ Ch 3+ (recording, dedicated channel)
+ → Whisper transcription (AI gets their words)
+ → Host headphones
+
+AI TTS ───────→ Ch 2 (recording)
+ → Twilio outbound (real caller hears AI)
+ → Host headphones (already works)
+```
+
+Conversation history becomes three-party with role labels: `host`, `real_caller:{name}`, `ai_caller:{name}`.
+
+### AI Auto-Respond Mode
+
+When toggled on, after each real caller transcription chunk:
+
+1. Lightweight LLM call ("should I respond?" — use fast model like Haiku)
+2. If YES → full response generated → TTS → plays on AI channel + streams to Twilio
+3. Cooldown (~10s) prevents rapid-fire
+4. Host can override with mute button
+
+### AI Follow-Up System
+
+After a real caller hangs up:
+
+1. Full transcript (host + real caller + any AI) summarized by LLM
+2. Summary stored in `session.call_history`
+3. Next AI caller's system prompt includes show history:
+ ```
+ EARLIER IN THE SHOW:
+ - Dave (real caller) called about his wife leaving after 12 years.
+ He got emotional about his kids.
+ - Jasmine called about her boss hitting on her at work.
+ You can reference these if it feels natural. Don't force it.
+ ```
+
+**Host-triggered (default):** Click any AI caller as normal. They already have show context.
+
+**Auto mode:** After real caller hangs up, system waits ~5-10s, picks a fitting AI caller via short LLM call, biases their background generation toward the topic, auto-connects.
+
+## Backend Changes
+
+### New Module: `backend/services/twilio_service.py`
+
+Manages Twilio integration:
+- WebSocket handler for Media Streams (decode/encode mulaw 8kHz ↔ PCM)
+- Call queue state (waiting callers, SIDs, timestamps, assigned channels)
+- Channel pool management (allocate/release Loopback channels for real callers)
+- Outbound audio mixing (host + AI TTS → mulaw → Twilio)
+- Methods: `take_call()`, `hangup_real_caller()`, `get_queue()`, `send_audio_to_caller()`
+
+### New Endpoints
+
+```python
+# Twilio webhooks
+POST /api/twilio/voice # Incoming call → TwiML (greet + enqueue)
+POST /api/twilio/hold-music # Hold music TwiML for waiting callers
+WS /api/twilio/stream # Media Streams WebSocket (bidirectional audio)
+
+# Host controls
+GET /api/queue # List waiting callers (number, wait time)
+POST /api/queue/take/{call_sid} # Dequeue caller → start media stream
+POST /api/queue/drop/{call_sid} # Drop caller from queue
+
+# AI follow-up
+POST /api/followup/generate # Summarize last real call, trigger AI follow-up
+```
+
+### Session Model Changes
+
+```python
+class CallRecord:
+ caller_type: str # "ai" or "real"
+ caller_name: str # "Tony" or "Caller #3"
+ summary: str # LLM-generated summary after hangup
+ transcript: list[dict] # Full conversation [{role, content}]
+
+class Session:
+ # Existing fields...
+ call_history: list[CallRecord] # All calls this episode
+ active_real_caller: dict | None # {call_sid, phone, channel, name}
+ active_ai_caller: str | None # Caller key
+ ai_respond_mode: str # "manual" or "auto"
+ auto_followup: bool # Auto-generate AI follow-up after real calls
+```
+
+Three-party conversation history uses roles: `host`, `real_caller:{name}`, `ai_caller:{name}`.
+
+### AI Caller Prompt Changes
+
+`get_caller_prompt()` extended to include:
+- Show history from `session.call_history`
+- Current real caller context (if three-way call active)
+- Instructions for referencing real callers naturally
+
+## Frontend Changes
+
+### New: Call Queue Panel
+
+Between callers section and chat. Shows waiting real callers with phone number and wait time. "Take Call" and "Drop" buttons per caller. Polls `/api/queue` every few seconds.
+
+### Modified: Active Call Indicator
+
+Shows real caller and AI caller simultaneously when both active:
+- Real caller: name, channel number, call duration, hang up button
+- AI caller: name, Manual/Auto toggle, "Let [name] respond" button (manual mode)
+- Auto Follow-Up checkbox
+
+### Modified: Chat Log
+
+Three-party with visual distinction:
+- Host messages: existing style
+- Real caller: labeled "Dave (caller)", distinct color
+- AI caller: labeled "Tony (AI)", distinct color
+
+### Modified: Caller Grid
+
+When real caller is active, clicking an AI caller adds them as third party instead of starting fresh call. Indicator shows which AI callers have been on the show this session.
+
+## Dependencies
+
+- `twilio` Python package (for TwiML generation, REST API)
+- Twilio account with phone number (~$1.15/mo + per-minute)
+- Cloudflare tunnel for exposing webhook endpoints
+- `audioop` or equivalent for mulaw encode/decode (stdlib but deprecated since Python 3.11 and removed in 3.13 — pin Python ≤3.12 or use a replacement such as `audioop-lts`)
+
+## Configuration
+
+New env vars in `.env`:
+```
+TWILIO_ACCOUNT_SID=...
+TWILIO_AUTH_TOKEN=...
+TWILIO_PHONE_NUMBER=+1...
+TWILIO_WEBHOOK_BASE_URL=https://your-tunnel.cloudflare.com
+```
diff --git a/docs/plans/2026-02-05-real-callers-implementation.md b/docs/plans/2026-02-05-real-callers-implementation.md
new file mode 100644
index 0000000..d42f7da
--- /dev/null
+++ b/docs/plans/2026-02-05-real-callers-implementation.md
@@ -0,0 +1,1829 @@
+# Real Callers + AI Follow-Up Implementation Plan
+
+> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
+
+**Goal:** Add Twilio phone call-in support with hold queue, three-way calls (host + real caller + AI), AI auto-respond mode, and AI follow-up callers that reference real caller conversations.
+
+**Architecture:** Twilio Media Streams deliver real caller audio via WebSocket. Audio is decoded from mulaw 8kHz, routed to a dedicated Loopback channel, and transcribed in real-time. Host + AI TTS audio is mixed and streamed back to the caller. Session model tracks multi-party conversations and show history for AI follow-up context.
+
+**Tech Stack:** Python/FastAPI, Twilio (twilio package + Media Streams WebSocket), sounddevice, faster-whisper, existing LLM/TTS services, vanilla JS frontend.
+
+**Design doc:** `docs/plans/2026-02-05-real-callers-design.md`
+
+---
+
+## Task 1: Config and Dependencies
+
+**Files:**
+- Modify: `backend/config.py`
+- Modify: `.env`
+
+**Step 1: Install twilio package**
+
+```bash
+pip install twilio
+```
+
+**Step 2: Add Twilio settings to config**
+
+In `backend/config.py`, add to the `Settings` class after the existing API key fields:
+
+```python
+# Twilio Settings
+twilio_account_sid: str = os.getenv("TWILIO_ACCOUNT_SID", "")
+twilio_auth_token: str = os.getenv("TWILIO_AUTH_TOKEN", "")
+twilio_phone_number: str = os.getenv("TWILIO_PHONE_NUMBER", "")
+twilio_webhook_base_url: str = os.getenv("TWILIO_WEBHOOK_BASE_URL", "")
+```
+
+**Step 3: Add placeholder env vars to `.env`**
+
+```
+TWILIO_ACCOUNT_SID=
+TWILIO_AUTH_TOKEN=
+TWILIO_PHONE_NUMBER=
+TWILIO_WEBHOOK_BASE_URL=
+```
+
+**Step 4: Verify server starts**
+
+```bash
+cd /Users/lukemacneil/ai-podcast && python -m uvicorn backend.main:app --reload --host 0.0.0.0 --port 8000
+```
+
+Expected: Server starts without errors.
+
+**Step 5: Commit**
+
+```bash
+git add backend/config.py .env
+git commit -m "Add Twilio config and dependencies"
+```
+
+---
+
+## Task 2: Session Model — Multi-Party Calls and Show History
+
+**Files:**
+- Modify: `backend/main.py` (Session class, lines 296-356)
+- Create: `tests/test_session.py`
+
+**Step 1: Write tests for new session model**
+
+Create `tests/test_session.py`:
+
+```python
+import sys
+sys.path.insert(0, "/Users/lukemacneil/ai-podcast")
+
+from backend.main import Session, CallRecord
+
+
+def test_call_record_creation():
+ record = CallRecord(
+ caller_type="real",
+ caller_name="Dave",
+ summary="Called about his wife leaving",
+ transcript=[{"role": "host", "content": "What happened?"}],
+ )
+ assert record.caller_type == "real"
+ assert record.caller_name == "Dave"
+
+
+def test_session_call_history():
+ s = Session()
+ assert s.call_history == []
+ record = CallRecord(
+ caller_type="ai", caller_name="Tony",
+ summary="Talked about gambling", transcript=[],
+ )
+ s.call_history.append(record)
+ assert len(s.call_history) == 1
+
+
+def test_session_active_real_caller():
+ s = Session()
+ assert s.active_real_caller is None
+ s.active_real_caller = {
+ "call_sid": "CA123", "phone": "+15125550142",
+ "channel": 3, "name": "Caller #1",
+ }
+ assert s.active_real_caller["channel"] == 3
+
+
+def test_session_three_party_conversation():
+ s = Session()
+ s.start_call("1") # AI caller Tony
+ s.add_message("host", "Hey Tony")
+ s.add_message("ai_caller:Tony", "What's up man")
+ s.add_message("real_caller:Dave", "Yeah I agree with Tony")
+ assert len(s.conversation) == 3
+ assert s.conversation[2]["role"] == "real_caller:Dave"
+
+
+def test_session_get_show_history_summary():
+ s = Session()
+ s.call_history.append(CallRecord(
+ caller_type="real", caller_name="Dave",
+ summary="Called about his wife leaving after 12 years",
+ transcript=[],
+ ))
+ s.call_history.append(CallRecord(
+ caller_type="ai", caller_name="Jasmine",
+ summary="Talked about her boss hitting on her",
+ transcript=[],
+ ))
+ summary = s.get_show_history()
+ assert "Dave" in summary
+ assert "Jasmine" in summary
+
+
+def test_session_reset_clears_history():
+ s = Session()
+ s.call_history.append(CallRecord(
+ caller_type="real", caller_name="Dave",
+ summary="test", transcript=[],
+ ))
+ s.active_real_caller = {"call_sid": "CA123"}
+ s.ai_respond_mode = "auto"
+ s.reset()
+ assert s.call_history == []
+ assert s.active_real_caller is None
+ assert s.ai_respond_mode == "manual"
+
+
+def test_session_conversation_summary_three_party():
+ s = Session()
+ s.start_call("1")
+ s.add_message("host", "Tell me what happened")
+ s.add_message("real_caller:Dave", "She just left man")
+ s.add_message("ai_caller:Tony", "Same thing happened to me")
+ summary = s.get_conversation_summary()
+ assert "Dave" in summary
+ assert "Tony" in summary
+```
+
+**Step 2: Run tests to verify they fail**
+
+```bash
+cd /Users/lukemacneil/ai-podcast && python -m pytest tests/test_session.py -v
+```
+
+Expected: Failures — `CallRecord` doesn't exist, new fields missing.
+
+**Step 3: Implement CallRecord and extend Session**
+
+In `backend/main.py`, add `CallRecord` dataclass above the `Session` class:
+
+```python
+from dataclasses import dataclass, field
+
+@dataclass
+class CallRecord:
+ caller_type: str # "ai" or "real"
+ caller_name: str # "Tony" or "Caller #3"
+ summary: str # LLM-generated summary after hangup
+ transcript: list[dict] = field(default_factory=list)
+```
+
+Extend `Session.__init__` to add:
+
+```python
+self.call_history: list[CallRecord] = []
+self.active_real_caller: dict | None = None
+self.ai_respond_mode: str = "manual" # "manual" or "auto"
+self.auto_followup: bool = False
+```
+
+Add `get_show_history()` method to Session:
+
+```python
+def get_show_history(self) -> str:
+ """Get formatted show history for AI caller prompts"""
+ if not self.call_history:
+ return ""
+ lines = ["EARLIER IN THE SHOW:"]
+ for record in self.call_history:
+ caller_type_label = "(real caller)" if record.caller_type == "real" else "(AI)"
+ lines.append(f"- {record.caller_name} {caller_type_label}: {record.summary}")
+ lines.append("You can reference these if it feels natural. Don't force it.")
+ return "\n".join(lines)
+```
+
+Update `get_conversation_summary()` to handle three-party roles — replace the role label logic:
+
+```python
+def get_conversation_summary(self) -> str:
+ if len(self.conversation) <= 2:
+ return ""
+ summary_parts = []
+ for msg in self.conversation[-6:]:
+ role = msg["role"]
+ if role == "user" or role == "host":
+ label = "Host"
+ elif role.startswith("real_caller:"):
+ label = role.split(":", 1)[1]
+ elif role.startswith("ai_caller:"):
+ label = role.split(":", 1)[1]
+ elif role == "assistant":
+ label = self.caller["name"] if self.caller else "Caller"
+ else:
+ label = role
+ content = msg["content"]
+ summary_parts.append(
+ f'{label}: "{content[:100]}..."' if len(content) > 100
+ else f'{label}: "{content}"'
+ )
+ return "\n".join(summary_parts)
+```
+
+Update `reset()` to clear new fields:
+
+```python
+def reset(self):
+ self.caller_backgrounds = {}
+ self.current_caller_key = None
+ self.conversation = []
+ self.call_history = []
+ self.active_real_caller = None
+ self.ai_respond_mode = "manual"
+ self.auto_followup = False
+ self.id = str(uuid.uuid4())[:8]
+ print(f"[Session] Reset - new session ID: {self.id}")
+```
+
+**Step 4: Run tests to verify they pass**
+
+```bash
+cd /Users/lukemacneil/ai-podcast && python -m pytest tests/test_session.py -v
+```
+
+Expected: All PASS.
+
+**Step 5: Commit**
+
+```bash
+git add backend/main.py tests/test_session.py
+git commit -m "Add CallRecord model and multi-party session support"
+```
+
+---
+
+## Task 3: Twilio Call Queue Service
+
+**Files:**
+- Create: `backend/services/twilio_service.py`
+- Create: `tests/test_twilio_service.py`
+
+**Step 1: Write tests for call queue**
+
+Create `tests/test_twilio_service.py`:
+
+```python
+import sys
+sys.path.insert(0, "/Users/lukemacneil/ai-podcast")
+
+from backend.services.twilio_service import TwilioService
+
+
+def test_queue_starts_empty():
+ svc = TwilioService()
+ assert svc.get_queue() == []
+
+
+def test_add_caller_to_queue():
+ svc = TwilioService()
+ svc.add_to_queue("CA123", "+15125550142")
+ q = svc.get_queue()
+ assert len(q) == 1
+ assert q[0]["call_sid"] == "CA123"
+ assert q[0]["phone"] == "+15125550142"
+ assert "wait_time" in q[0]
+
+
+def test_remove_caller_from_queue():
+ svc = TwilioService()
+ svc.add_to_queue("CA123", "+15125550142")
+ svc.remove_from_queue("CA123")
+ assert svc.get_queue() == []
+
+
+def test_allocate_channel():
+ svc = TwilioService()
+ ch1 = svc.allocate_channel()
+ ch2 = svc.allocate_channel()
+ assert ch1 == 3 # First real caller channel
+ assert ch2 == 4
+ svc.release_channel(ch1)
+ ch3 = svc.allocate_channel()
+ assert ch3 == 3 # Reuses released channel
+
+
+def test_take_call():
+ svc = TwilioService()
+ svc.add_to_queue("CA123", "+15125550142")
+ result = svc.take_call("CA123")
+ assert result["call_sid"] == "CA123"
+ assert result["channel"] >= 3
+ assert svc.get_queue() == [] # Removed from queue
+ assert svc.active_calls["CA123"]["channel"] == result["channel"]
+
+
+def test_hangup_real_caller():
+ svc = TwilioService()
+ svc.add_to_queue("CA123", "+15125550142")
+ svc.take_call("CA123")
+ ch = svc.active_calls["CA123"]["channel"]
+ svc.hangup("CA123")
+ assert "CA123" not in svc.active_calls
+ # Channel is released back to pool
+ assert ch not in svc._allocated_channels
+
+
+def test_caller_counter_increments():
+ svc = TwilioService()
+ svc.add_to_queue("CA1", "+15125550001")
+ svc.add_to_queue("CA2", "+15125550002")
+ r1 = svc.take_call("CA1")
+ r2 = svc.take_call("CA2")
+ assert r1["name"] == "Caller #1"
+ assert r2["name"] == "Caller #2"
+```
+
+**Step 2: Run tests to verify they fail**
+
+```bash
+cd /Users/lukemacneil/ai-podcast && python -m pytest tests/test_twilio_service.py -v
+```
+
+Expected: ImportError — module doesn't exist.
+
+**Step 3: Implement TwilioService**
+
+Create `backend/services/twilio_service.py`:
+
+```python
+"""Twilio call queue and media stream service"""
+
+import time
+import threading
+from typing import Optional
+
+
+class TwilioService:
+ """Manages Twilio call queue, channel allocation, and media streams"""
+
+ # Real caller channels start at 3 (1=host, 2=AI callers)
+ FIRST_REAL_CHANNEL = 3
+
+ def __init__(self):
+ self._queue: list[dict] = [] # Waiting callers
+ self.active_calls: dict[str, dict] = {} # call_sid -> {phone, channel, name, stream}
+ self._allocated_channels: set[int] = set()
+ self._caller_counter: int = 0
+ self._lock = threading.Lock()
+
+ def add_to_queue(self, call_sid: str, phone: str):
+ """Add incoming caller to hold queue"""
+ with self._lock:
+ self._queue.append({
+ "call_sid": call_sid,
+ "phone": phone,
+ "queued_at": time.time(),
+ })
+ print(f"[Twilio] Caller {phone} added to queue (SID: {call_sid})")
+
+ def remove_from_queue(self, call_sid: str):
+ """Remove caller from queue without taking them"""
+ with self._lock:
+ self._queue = [c for c in self._queue if c["call_sid"] != call_sid]
+ print(f"[Twilio] Caller {call_sid} removed from queue")
+
+ def get_queue(self) -> list[dict]:
+ """Get current queue with wait times"""
+ now = time.time()
+ with self._lock:
+ return [
+ {
+ "call_sid": c["call_sid"],
+ "phone": c["phone"],
+ "wait_time": int(now - c["queued_at"]),
+ }
+ for c in self._queue
+ ]
+
+ def allocate_channel(self) -> int:
+ """Allocate the next available Loopback channel for a real caller"""
+ with self._lock:
+ ch = self.FIRST_REAL_CHANNEL
+ while ch in self._allocated_channels:
+ ch += 1
+ self._allocated_channels.add(ch)
+ return ch
+
+ def release_channel(self, channel: int):
+ """Release a channel back to the pool"""
+ with self._lock:
+ self._allocated_channels.discard(channel)
+
+ def take_call(self, call_sid: str) -> dict:
+ """Take a caller off hold — allocate channel and mark active"""
+ # Find in queue
+ caller = None
+ with self._lock:
+ for c in self._queue:
+ if c["call_sid"] == call_sid:
+ caller = c
+ break
+ if caller:
+ self._queue = [c for c in self._queue if c["call_sid"] != call_sid]
+
+ if not caller:
+ raise ValueError(f"Call {call_sid} not in queue")
+
+ channel = self.allocate_channel()
+ self._caller_counter += 1
+ name = f"Caller #{self._caller_counter}"
+
+ call_info = {
+ "call_sid": call_sid,
+ "phone": caller["phone"],
+ "channel": channel,
+ "name": name,
+ "started_at": time.time(),
+ }
+ self.active_calls[call_sid] = call_info
+ print(f"[Twilio] {name} ({caller['phone']}) taken on air — channel {channel}")
+ return call_info
+
+ def hangup(self, call_sid: str):
+ """Hang up on a real caller — release channel"""
+ call_info = self.active_calls.pop(call_sid, None)
+ if call_info:
+ self.release_channel(call_info["channel"])
+ print(f"[Twilio] {call_info['name']} hung up — channel {call_info['channel']} released")
+
+ def reset(self):
+ """Reset all state"""
+ with self._lock:
+ for call_info in self.active_calls.values():
+ self._allocated_channels.discard(call_info["channel"])
+ self._queue.clear()
+ self.active_calls.clear()
+ self._allocated_channels.clear()
+ self._caller_counter = 0
+ print("[Twilio] Service reset")
+```
+
+**Step 4: Run tests to verify they pass**
+
+```bash
+cd /Users/lukemacneil/ai-podcast && python -m pytest tests/test_twilio_service.py -v
+```
+
+Expected: All PASS.
+
+**Step 5: Commit**
+
+```bash
+git add backend/services/twilio_service.py tests/test_twilio_service.py
+git commit -m "Add Twilio call queue service with channel allocation"
+```
+
+---
+
+## Task 4: Twilio Webhook Endpoints
+
+**Files:**
+- Modify: `backend/main.py`
+
+**Step 1: Add Twilio webhook imports and service instance**
+
+At the top of `backend/main.py`, add:
+
+```python
+from twilio.twiml.voice_response import VoiceResponse
+from .services.twilio_service import TwilioService
+```
+
+After `session = Session()`, add:
+
+```python
+twilio_service = TwilioService()
+```
+
+**Step 2: Add the voice webhook endpoint**
+
+This is what Twilio calls when someone dials your number:
+
+```python
+from fastapi import Form
+
+@app.post("/api/twilio/voice")
+async def twilio_voice_webhook(
+ CallSid: str = Form(...),
+ From: str = Form(...),
+):
+ """Handle incoming Twilio call — greet and enqueue"""
+ twilio_service.add_to_queue(CallSid, From)
+
+ response = VoiceResponse()
+ response.say("You're calling Luke at the Roost. Hold tight, we'll get to you.", voice="alice")
+ response.enqueue(
+ "radio_show",
+ wait_url="/api/twilio/hold-music",
+ wait_url_method="POST",
+ )
+ return Response(content=str(response), media_type="application/xml")
+```
+
+**Step 3: Add hold music endpoint**
+
+```python
+@app.post("/api/twilio/hold-music")
+async def twilio_hold_music():
+ """Serve hold music TwiML for queued callers"""
+ response = VoiceResponse()
+ # Play hold music in a loop — Twilio will re-request this URL periodically
+ music_files = list(settings.music_dir.glob("*.mp3")) + list(settings.music_dir.glob("*.wav"))
+ if music_files:
+ # Use first available track via public URL
+ response.say("Please hold, you'll be on air shortly.", voice="alice")
+ response.pause(length=30)
+ else:
+ response.say("Please hold.", voice="alice")
+ response.pause(length=30)
+ return Response(content=str(response), media_type="application/xml")
+```
+
+**Step 4: Add queue management endpoints**
+
+```python
+@app.get("/api/queue")
+async def get_call_queue():
+ """Get list of callers waiting in queue"""
+ return {"queue": twilio_service.get_queue()}
+
+
+@app.post("/api/queue/take/{call_sid}")
+async def take_call_from_queue(call_sid: str):
+ """Take a caller off hold and put them on air"""
+ try:
+ call_info = twilio_service.take_call(call_sid)
+ except ValueError as e:
+ raise HTTPException(404, str(e))
+
+ session.active_real_caller = {
+ "call_sid": call_info["call_sid"],
+ "phone": call_info["phone"],
+ "channel": call_info["channel"],
+ "name": call_info["name"],
+ }
+
+ # Connect Twilio media stream by updating the call
+ # This redirects the call from the queue to a media stream
+ from twilio.rest import Client as TwilioClient
+ if settings.twilio_account_sid and settings.twilio_auth_token:
+ client = TwilioClient(settings.twilio_account_sid, settings.twilio_auth_token)
+ twiml = VoiceResponse()
+ connect = twiml.connect()
+ connect.stream(
+ url=f"wss://{settings.twilio_webhook_base_url.replace('https://', '')}/api/twilio/stream",
+ name=call_sid,
+ )
+ client.calls(call_sid).update(twiml=str(twiml))
+
+ return {
+ "status": "on_air",
+ "caller": call_info,
+ }
+
+
+@app.post("/api/queue/drop/{call_sid}")
+async def drop_from_queue(call_sid: str):
+ """Drop a caller from the queue"""
+ twilio_service.remove_from_queue(call_sid)
+
+ # Hang up the Twilio call
+ from twilio.rest import Client as TwilioClient
+ if settings.twilio_account_sid and settings.twilio_auth_token:
+ try:
+ client = TwilioClient(settings.twilio_account_sid, settings.twilio_auth_token)
+ client.calls(call_sid).update(status="completed")
+ except Exception as e:
+ print(f"[Twilio] Failed to end call {call_sid}: {e}")
+
+ return {"status": "dropped"}
+```
+
+**Step 5: Add Response import**
+
+```python
+from fastapi.responses import FileResponse, Response
+```
+
+(Modify the existing `FileResponse` import line to include `Response`.)
+
+**Step 6: Verify server starts**
+
+```bash
+cd /Users/lukemacneil/ai-podcast && python -c "from backend.main import app; print('OK')"
+```
+
+Expected: `OK`
+
+**Step 7: Commit**
+
+```bash
+git add backend/main.py
+git commit -m "Add Twilio webhook and queue management endpoints"
+```
+
+---
+
+## Task 5: WebSocket Media Stream Handler
+
+**Files:**
+- Modify: `backend/main.py`
+- Modify: `backend/services/twilio_service.py`
+- Modify: `backend/services/audio.py`
+
+This is the core of real caller audio — bidirectional streaming via Twilio Media Streams.
+
+**Step 1: Add WebSocket endpoint to main.py**
+
+```python
+from fastapi import WebSocket, WebSocketDisconnect
+import json
+import base64
+import audioop
+import asyncio
+import struct
+
+@app.websocket("/api/twilio/stream")
+async def twilio_media_stream(websocket: WebSocket):
+ """Handle Twilio Media Streams WebSocket — bidirectional audio"""
+ await websocket.accept()
+ print("[Twilio WS] Media stream connected")
+
+ call_sid = None
+ stream_sid = None
+ audio_buffer = bytearray()
+ CHUNK_DURATION_S = 3 # Transcribe every 3 seconds of audio
+ MULAW_SAMPLE_RATE = 8000
+ chunk_samples = CHUNK_DURATION_S * MULAW_SAMPLE_RATE
+
+ try:
+ while True:
+ data = await websocket.receive_text()
+ msg = json.loads(data)
+ event = msg.get("event")
+
+ if event == "start":
+ stream_sid = msg["start"]["streamSid"]
+ call_sid = msg["start"]["callSid"]
+ print(f"[Twilio WS] Stream started: {stream_sid} for call {call_sid}")
+
+ elif event == "media":
+ # Decode mulaw audio from base64
+ payload = base64.b64decode(msg["media"]["payload"])
+ # Convert mulaw to 16-bit PCM
+ pcm_data = audioop.ulaw2lin(payload, 2)
+ audio_buffer.extend(pcm_data)
+
+ # Get channel for this caller
+ call_info = twilio_service.active_calls.get(call_sid)
+ if call_info:
+ channel = call_info["channel"]
+ # Route PCM to the caller's dedicated Loopback channel
+ audio_service.route_real_caller_audio(pcm_data, channel, MULAW_SAMPLE_RATE)
+
+ # When we have enough audio, transcribe
+ if len(audio_buffer) >= chunk_samples * 2: # 2 bytes per sample
+ pcm_chunk = bytes(audio_buffer[:chunk_samples * 2])
+ audio_buffer = audio_buffer[chunk_samples * 2:]
+
+ # Transcribe in background
+ asyncio.create_task(
+ _handle_real_caller_transcription(call_sid, pcm_chunk, MULAW_SAMPLE_RATE)
+ )
+
+ elif event == "stop":
+ print(f"[Twilio WS] Stream stopped: {stream_sid}")
+ break
+
+ except WebSocketDisconnect:
+ print(f"[Twilio WS] Disconnected: {call_sid}")
+ except Exception as e:
+ print(f"[Twilio WS] Error: {e}")
+ finally:
+ # Transcribe any remaining audio
+ if audio_buffer and call_sid:
+ asyncio.create_task(
+ _handle_real_caller_transcription(call_sid, bytes(audio_buffer), MULAW_SAMPLE_RATE)
+ )
+
+
+async def _handle_real_caller_transcription(call_sid: str, pcm_data: bytes, sample_rate: int):
+ """Transcribe a chunk of real caller audio and add to conversation"""
+ call_info = twilio_service.active_calls.get(call_sid)
+ if not call_info:
+ return
+
+ text = await transcribe_audio(pcm_data, source_sample_rate=sample_rate)
+ if not text or not text.strip():
+ return
+
+ caller_name = call_info["name"]
+ print(f"[Real Caller] {caller_name}: {text}")
+
+ # Add to conversation with real_caller role
+ session.add_message(f"real_caller:{caller_name}", text)
+
+ # If AI auto-respond mode is on and an AI caller is active, check if AI should respond
+ if session.ai_respond_mode == "auto" and session.current_caller_key:
+ asyncio.create_task(_check_ai_auto_respond(text, caller_name))
+
+
+async def _check_ai_auto_respond(real_caller_text: str, real_caller_name: str):
+ """Check if AI caller should jump in, and generate response if so"""
+ if not session.caller:
+ return
+
+ # Cooldown check
+ if hasattr(session, '_last_ai_auto_respond') and \
+ time.time() - session._last_ai_auto_respond < 10:
+ return
+
+ ai_name = session.caller["name"]
+
+ # Quick "should I respond?" check with minimal LLM call
+ should_respond = await llm_service.generate(
+ messages=[{"role": "user", "content": f'Someone just said: "{real_caller_text}". Should {ai_name} jump in? Reply only YES or NO.'}],
+ system_prompt=f"You're deciding if {ai_name} should respond to what was just said on a radio show. Say YES if it's interesting or relevant to them, NO if not.",
+ )
+
+ if "YES" not in should_respond.upper():
+ return
+
+ print(f"[Auto-Respond] {ai_name} is jumping in...")
+ session._last_ai_auto_respond = time.time()
+
+ # Generate full response
+ conversation_summary = session.get_conversation_summary()
+ show_history = session.get_show_history()
+ system_prompt = get_caller_prompt(session.caller, conversation_summary)
+ if show_history:
+ system_prompt += f"\n\n{show_history}"
+
+ response = await llm_service.generate(
+ messages=session.conversation[-10:],
+ system_prompt=system_prompt,
+ )
+ response = clean_for_tts(response)
+ if not response or not response.strip():
+ return
+
+ session.add_message(f"ai_caller:{ai_name}", response)
+
+ # Generate TTS and play
+ audio_bytes = await generate_speech(response, session.caller["voice"], "none")
+
+ import threading
+ thread = threading.Thread(
+ target=audio_service.play_caller_audio,
+ args=(audio_bytes, 24000),
+ daemon=True,
+ )
+ thread.start()
+
+ # Also send to Twilio so real caller hears the AI
+ # (handled in Task 6 - outbound audio mixing)
+```
+
+**Step 2: Add `route_real_caller_audio` to AudioService**
+
+In `backend/services/audio.py`, add this method to `AudioService`:
+
+```python
+def route_real_caller_audio(self, pcm_data: bytes, channel: int, sample_rate: int):
+ """Route real caller PCM audio to a specific Loopback channel"""
+ import librosa
+
+ if self.output_device is None:
+ return
+
+ try:
+ # Convert bytes to float32
+ audio = np.frombuffer(pcm_data, dtype=np.int16).astype(np.float32) / 32768.0
+
+ device_info = sd.query_devices(self.output_device)
+ num_channels = device_info['max_output_channels']
+ device_sr = int(device_info['default_samplerate'])
+ channel_idx = min(channel, num_channels) - 1
+
+ # Resample from Twilio's 8kHz to device sample rate
+ if sample_rate != device_sr:
+ audio = librosa.resample(audio, orig_sr=sample_rate, target_sr=device_sr)
+
+ # Create multi-channel output
+ multi_ch = np.zeros((len(audio), num_channels), dtype=np.float32)
+ multi_ch[:, channel_idx] = audio
+
+ # Write to output device (non-blocking, small chunks)
+ with sd.OutputStream(
+ device=self.output_device,
+ samplerate=device_sr,
+ channels=num_channels,
+ dtype=np.float32,
+ ) as stream:
+ stream.write(multi_ch)
+
+ except Exception as e:
+ print(f"Real caller audio routing error: {e}")
+```
+
+**Step 3: Add `import time` at the top of `main.py`** (if not already present)
+
+**Step 4: Verify server starts**
+
+```bash
+cd /Users/lukemacneil/ai-podcast && python -c "from backend.main import app; print('OK')"
+```
+
+Expected: `OK`
+
+**Step 5: Commit**
+
+```bash
+git add backend/main.py backend/services/audio.py
+git commit -m "Add Twilio WebSocket media stream handler with real-time transcription"
+```
+
+---
+
+## Task 6: Outbound Audio to Real Caller (Host + AI TTS)
+
+**Files:**
+- Modify: `backend/services/twilio_service.py`
+- Modify: `backend/main.py`
+
+The real caller needs to hear the host's voice and the AI caller's TTS through the phone.
+
+**Step 1: Add WebSocket registry to TwilioService**
+
+In `backend/services/twilio_service.py`, add:
+
+```python
+import asyncio
+import base64
+import audioop
+
+class TwilioService:
+ def __init__(self):
+ # ... existing init ...
+ self._websockets: dict[str, any] = {} # call_sid -> WebSocket
+
+ def register_websocket(self, call_sid: str, websocket):
+ """Register a WebSocket for a call"""
+ self._websockets[call_sid] = websocket
+
+ def unregister_websocket(self, call_sid: str):
+ """Unregister a WebSocket"""
+ self._websockets.pop(call_sid, None)
+
+ async def send_audio_to_caller(self, call_sid: str, pcm_data: bytes, sample_rate: int):
+ """Send audio back to real caller via Twilio WebSocket"""
+ ws = self._websockets.get(call_sid)
+ if not ws:
+ return
+
+ call_info = self.active_calls.get(call_sid)
+ if not call_info or "stream_sid" not in call_info:
+ return
+
+ try:
+ # Resample to 8kHz if needed
+ if sample_rate != 8000:
+ import numpy as np
+ import librosa
+ audio = np.frombuffer(pcm_data, dtype=np.int16).astype(np.float32) / 32768.0
+ audio = librosa.resample(audio, orig_sr=sample_rate, target_sr=8000)
+ pcm_data = (audio * 32767).astype(np.int16).tobytes()
+
+ # Convert PCM to mulaw
+ mulaw_data = audioop.lin2ulaw(pcm_data, 2)
+
+ # Send as Twilio media message
+ import json
+ await ws.send_text(json.dumps({
+ "event": "media",
+ "streamSid": call_info["stream_sid"],
+ "media": {
+ "payload": base64.b64encode(mulaw_data).decode("ascii"),
+ },
+ }))
+ except Exception as e:
+ print(f"[Twilio] Failed to send audio to caller: {e}")
+```
+
+**Step 2: Update WebSocket handler in main.py to register/unregister**
+
+In the `twilio_media_stream` function, replace the existing `event == "start"` block with the following (note it re-includes the existing lines — do not add it after the old block, or the assignments will be duplicated):
+
+```python
+if event == "start":
+ stream_sid = msg["start"]["streamSid"]
+ call_sid = msg["start"]["callSid"]
+ twilio_service.register_websocket(call_sid, websocket)
+ if call_sid in twilio_service.active_calls:
+ twilio_service.active_calls[call_sid]["stream_sid"] = stream_sid
+ print(f"[Twilio WS] Stream started: {stream_sid} for call {call_sid}")
+```
+
+In the `finally` block, add:
+
+```python
+finally:
+ if call_sid:
+ twilio_service.unregister_websocket(call_sid)
+```
+
+**Step 3: Send AI TTS audio to real caller**
+
+In the `/api/tts` endpoint, after starting the playback thread, add code to also stream to any active real callers:
+
+```python
+# Also send to active real callers so they hear the AI
+if session.active_real_caller:
+ call_sid = session.active_real_caller["call_sid"]
+ asyncio.create_task(
+ twilio_service.send_audio_to_caller(call_sid, audio_bytes, 24000)
+ )
+```
+
+**Step 4: Commit**
+
+```bash
+git add backend/main.py backend/services/twilio_service.py
+git commit -m "Add outbound audio streaming to real callers"
+```
+
+---
+
+## Task 7: AI Follow-Up System
+
+**Files:**
+- Modify: `backend/main.py`
+- Create: `tests/test_followup.py`
+
+**Step 1: Write tests**
+
+Create `tests/test_followup.py`:
+
+```python
+import sys
+sys.path.insert(0, "/Users/lukemacneil/ai-podcast")
+
+from backend.main import Session, CallRecord, get_caller_prompt
+
+
+def test_caller_prompt_includes_show_history():
+ s = Session()
+ s.call_history.append(CallRecord(
+ caller_type="real", caller_name="Dave",
+ summary="Called about his wife leaving after 12 years",
+ transcript=[],
+ ))
+
+ # Simulate an active AI caller
+ s.start_call("1") # Tony
+ caller = s.caller
+ prompt = get_caller_prompt(caller, "", s.get_show_history())
+ assert "Dave" in prompt
+ assert "wife leaving" in prompt
+ assert "EARLIER IN THE SHOW" in prompt
+```
+
+**Step 2: Update `get_caller_prompt` to accept show history**
+
+In `backend/main.py`, modify `get_caller_prompt` signature and body:
+
+```python
+def get_caller_prompt(caller: dict, conversation_summary: str = "", show_history: str = "") -> str:
+ context = ""
+ if conversation_summary:
+ context = f"""
+CONVERSATION SO FAR:
+{conversation_summary}
+Continue naturally. Don't repeat yourself.
+"""
+ history = ""
+ if show_history:
+ history = f"\n{show_history}\n"
+
+ return f"""You're {caller['name']}, calling a late-night radio show. You trust this host.
+
+{caller['vibe']}
+{history}{context}
+HOW TO TALK:
+... # rest of the existing prompt unchanged
+"""
+```
+
+**Step 3: Update `/api/chat` to include show history**
+
+In the `/api/chat` endpoint:
+
+```python
+@app.post("/api/chat")
+async def chat(request: ChatRequest):
+ if not session.caller:
+ raise HTTPException(400, "No active call")
+
+ session.add_message("user", request.text)
+
+ conversation_summary = session.get_conversation_summary()
+ show_history = session.get_show_history()
+ system_prompt = get_caller_prompt(session.caller, conversation_summary, show_history)
+
+ # ... rest unchanged
+```
+
+**Step 4: Add hangup endpoint for real callers with summarization**
+
+```python
+@app.post("/api/hangup/real")
+async def hangup_real_caller():
+ """Hang up on real caller — summarize call and store in history"""
+ if not session.active_real_caller:
+ raise HTTPException(400, "No active real caller")
+
+ call_sid = session.active_real_caller["call_sid"]
+ caller_name = session.active_real_caller["name"]
+
+ # Summarize the conversation
+ summary = ""
+ if session.conversation:
+ transcript_text = "\n".join(
+ f"{msg['role']}: {msg['content']}" for msg in session.conversation
+ )
+ summary = await llm_service.generate(
+ messages=[{"role": "user", "content": f"Summarize this radio show call in 1-2 sentences:\n{transcript_text}"}],
+ system_prompt="You summarize radio show conversations concisely. Focus on what the caller talked about and any emotional moments.",
+ )
+
+ # Store in call history
+ session.call_history.append(CallRecord(
+ caller_type="real",
+ caller_name=caller_name,
+ summary=summary,
+ transcript=list(session.conversation),
+ ))
+
+ # Clean up
+ twilio_service.hangup(call_sid)
+
+ # End the Twilio call
+ from twilio.rest import Client as TwilioClient
+ if settings.twilio_account_sid and settings.twilio_auth_token:
+ try:
+ client = TwilioClient(settings.twilio_account_sid, settings.twilio_auth_token)
+ client.calls(call_sid).update(status="completed")
+ except Exception as e:
+ print(f"[Twilio] Failed to end call: {e}")
+
+ session.active_real_caller = None
+ # Don't clear conversation — AI follow-up might reference it
+ # Conversation gets cleared when next call starts
+
+ # Play hangup sound
+ hangup_sound = settings.sounds_dir / "hangup.wav"
+ if hangup_sound.exists():
+ audio_service.play_sfx(str(hangup_sound))
+
+ # Auto follow-up?
+ auto_followup_triggered = False
+ if session.auto_followup:
+ auto_followup_triggered = True
+ asyncio.create_task(_auto_followup(summary))
+
+ return {
+ "status": "disconnected",
+ "caller": caller_name,
+ "summary": summary,
+ "auto_followup": auto_followup_triggered,
+ }
+
+
+async def _auto_followup(last_call_summary: str):
+ """Automatically pick an AI caller and connect them as follow-up"""
+ await asyncio.sleep(7) # Brief pause before follow-up
+
+ # Ask LLM to pick best AI caller for follow-up
+ caller_list = ", ".join(
+ f'{k}: {v["name"]} ({v["gender"]}, {v["age_range"][0]}-{v["age_range"][1]})'
+ for k, v in CALLER_BASES.items()
+ )
+ pick = await llm_service.generate(
+ messages=[{"role": "user", "content": f'A caller just talked about: "{last_call_summary}". Which AI caller should follow up? Available: {caller_list}. Reply with just the key number.'}],
+ system_prompt="Pick the most interesting AI caller to follow up on this topic. Just reply with the number key.",
+ )
+
+ # Extract key from response
+ import re
+ match = re.search(r'\d+', pick)
+ if match:
+ caller_key = match.group()
+ if caller_key in CALLER_BASES:
+ session.start_call(caller_key)
+ print(f"[Auto Follow-Up] {CALLER_BASES[caller_key]['name']} is calling in about: {last_call_summary[:50]}...")
+```
+
+**Step 5: Add manual follow-up endpoint**
+
+```python
+@app.post("/api/followup/generate")
+async def generate_followup():
+ """Generate an AI follow-up caller based on recent show history"""
+ if not session.call_history:
+ raise HTTPException(400, "No call history to follow up on")
+
+ last_record = session.call_history[-1]
+ await _auto_followup(last_record.summary)
+
+ return {
+ "status": "followup_triggered",
+ "based_on": last_record.caller_name,
+ }
+```
+
+**Step 6: Run tests**
+
+```bash
+cd /Users/lukemacneil/ai-podcast && python -m pytest tests/test_followup.py -v
+```
+
+Expected: All PASS.
+
+**Step 7: Commit**
+
+```bash
+git add backend/main.py tests/test_followup.py
+git commit -m "Add AI follow-up system with call summarization and show history"
+```
+
+---
+
+## Task 8: Frontend — Call Queue Panel
+
+**Files:**
+- Modify: `frontend/index.html`
+- Modify: `frontend/js/app.js`
+- Modify: `frontend/css/style.css`
+
+**Step 1: Add queue panel HTML**
+
+In `frontend/index.html`, after the callers section (the callers container element, around line 27) and before the chat section, add:
+
+```html
+
+Incoming Calls
+