Add listener email system with IMAP polling, TTS playback, and show awareness

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-16 05:22:56 -07:00
parent f0271e61df
commit d85a8d4511
11 changed files with 335 additions and 15 deletions

View File

@@ -22,6 +22,11 @@ class Settings(BaseSettings):
signalwire_phone: str = os.getenv("SIGNALWIRE_PHONE", "")
signalwire_stream_url: str = os.getenv("SIGNALWIRE_STREAM_URL", "")
# Email (IMAP)
submissions_imap_host: str = os.getenv("SUBMISSIONS_IMAP_HOST", "")
submissions_imap_user: str = os.getenv("SUBMISSIONS_IMAP_USER", "")
submissions_imap_pass: str = os.getenv("SUBMISSIONS_IMAP_PASS", "")
# LLM Settings
llm_provider: str = "openrouter" # "openrouter" or "ollama"
openrouter_model: str = "anthropic/claude-sonnet-4-5"

View File

@@ -2717,15 +2717,22 @@ class Session:
def get_show_history(self) -> str:
"""Get formatted show history for AI caller prompts.
Randomly picks one previous caller to have a strong reaction to."""
if not self.call_history:
if not self.call_history and not any(e.read_on_air for e in _listener_emails):
return ""
lines = ["EARLIER IN THE SHOW:"]
for record in self.call_history:
caller_type_label = "(real caller)" if record.caller_type == "real" else "(AI)"
lines.append(f"- {record.caller_name} {caller_type_label}: {record.summary}")
# Include emails that were read on the show
read_emails = [e for e in _listener_emails if e.read_on_air]
for em in read_emails:
sender_name = em.sender.split("<")[0].strip().strip('"') if "<" in em.sender else "a listener"
preview = em.body[:150] if len(em.body) > 150 else em.body
lines.append(f"- A listener email from {sender_name} was read on air: \"{em.subject}\"{preview}")
# 20% chance to have a strong reaction to a previous caller
if random.random() < 0.20:
if self.call_history and random.random() < 0.20:
target = random.choice(self.call_history)
reaction = random.choice(SHOW_HISTORY_REACTIONS)
lines.append(f"\nYOU HEARD {target.caller_name.upper()} EARLIER and you {reaction}. Mention it if it comes up.")
@@ -3092,7 +3099,9 @@ async def _sync_signalwire_voicemails():
async def startup():
"""Pre-generate caller backgrounds on server start"""
_load_voicemails()
_load_emails()
asyncio.create_task(_sync_signalwire_voicemails())
asyncio.create_task(_poll_imap_emails())
restored = _load_checkpoint()
if not restored:
asyncio.create_task(_pregenerate_backgrounds())
@@ -3418,6 +3427,196 @@ async def delete_voicemail(vm_id: str):
return {"status": "deleted"}
# --- Listener Emails ---
EMAILS_META = Path(__file__).parent.parent / "data" / "emails.json"
@dataclass
class ListenerEmail:
id: str
sender: str
subject: str
body: str
timestamp: float
read_on_air: bool = False
_listener_emails: list[ListenerEmail] = []
def _load_emails():
global _listener_emails
if EMAILS_META.exists():
try:
with open(EMAILS_META) as f:
data = json.load(f)
_listener_emails = [
ListenerEmail(
id=e["id"], sender=e["sender"], subject=e["subject"],
body=e["body"], timestamp=e["timestamp"],
read_on_air=e.get("read_on_air", False),
)
for e in data.get("emails", [])
]
print(f"[Email] Loaded {len(_listener_emails)} emails")
except Exception as e:
print(f"[Email] Failed to load: {e}")
_listener_emails = []
def _save_emails():
try:
EMAILS_META.parent.mkdir(parents=True, exist_ok=True)
data = {
"emails": [
{
"id": e.id, "sender": e.sender, "subject": e.subject,
"body": e.body, "timestamp": e.timestamp,
"read_on_air": e.read_on_air,
}
for e in _listener_emails
],
}
with open(EMAILS_META, "w") as f:
json.dump(data, f, indent=2)
except Exception as exc:
print(f"[Email] Failed to save: {exc}")
async def _poll_imap_emails():
"""Background task: poll IMAP every 30s for new listener emails"""
import imaplib
import email as email_lib
from email.header import decode_header
host = settings.submissions_imap_host
user = settings.submissions_imap_user
passwd = settings.submissions_imap_pass
if not host or not user or not passwd:
print("[Email] IMAP not configured, skipping email polling")
return
while True:
try:
mail = imaplib.IMAP4_SSL(host, 993)
mail.login(user, passwd)
mail.select("INBOX")
_, msg_nums = mail.search(None, "UNSEEN")
if msg_nums[0]:
for num in msg_nums[0].split():
_, msg_data = mail.fetch(num, "(RFC822)")
raw = msg_data[0][1]
msg = email_lib.message_from_bytes(raw)
# Decode sender
from_raw = msg.get("From", "Unknown")
# Decode subject
subj_raw = msg.get("Subject", "(no subject)")
decoded_parts = decode_header(subj_raw)
subject = ""
for part, charset in decoded_parts:
if isinstance(part, bytes):
subject += part.decode(charset or "utf-8", errors="replace")
else:
subject += part
# Extract plain text body
body = ""
if msg.is_multipart():
for part in msg.walk():
if part.get_content_type() == "text/plain":
payload = part.get_payload(decode=True)
if payload:
charset = part.get_content_charset() or "utf-8"
body = payload.decode(charset, errors="replace")
break
else:
payload = msg.get_payload(decode=True)
if payload:
charset = msg.get_content_charset() or "utf-8"
body = payload.decode(charset, errors="replace")
body = body.strip()
if not body:
continue
# Parse timestamp from email Date header
from email.utils import parsedate_to_datetime
try:
ts = parsedate_to_datetime(msg.get("Date", "")).timestamp()
except Exception:
ts = time.time()
em = ListenerEmail(
id=str(uuid.uuid4())[:8],
sender=from_raw,
subject=subject,
body=body,
timestamp=ts,
)
_listener_emails.append(em)
print(f"[Email] New email from {from_raw}: {subject[:50]}")
# Mark as SEEN (already done by fetch with UNSEEN filter)
mail.store(num, "+FLAGS", "\\Seen")
_save_emails()
mail.logout()
except Exception as exc:
print(f"[Email] IMAP poll error: {exc}")
await asyncio.sleep(30)
@app.get("/api/emails")
async def list_emails():
return [
{
"id": e.id, "sender": e.sender, "subject": e.subject,
"body": e.body, "timestamp": e.timestamp,
"read_on_air": e.read_on_air,
}
for e in sorted(_listener_emails, key=lambda e: e.timestamp, reverse=True)
]
@app.post("/api/email/{email_id}/play-on-air")
async def play_email_on_air(email_id: str):
em = next((e for e in _listener_emails if e.id == email_id), None)
if not em:
raise HTTPException(status_code=404, detail="Email not found")
# Extract display name, fall back to just "a listener"
sender_name = em.sender.split("<")[0].strip().strip('"') if "<" in em.sender else "a listener"
intro = f"This email is from {sender_name}. Subject: {em.subject}."
full_text = f"{intro}\n\n{em.body}"
async def _generate_and_play():
try:
audio_bytes = await generate_speech(full_text, "Alex", phone_quality="none", apply_filter=False)
audio_service.play_caller_audio(audio_bytes, 24000)
except Exception as exc:
print(f"[Email] TTS playback error: {exc}")
asyncio.create_task(_generate_and_play())
em.read_on_air = True
_save_emails()
return {"status": "playing"}
@app.delete("/api/email/{email_id}")
async def delete_email(email_id: str):
em = next((e for e in _listener_emails if e.id == email_id), None)
if not em:
raise HTTPException(status_code=404, detail="Email not found")
_listener_emails.remove(em)
_save_emails()
return {"status": "deleted"}
async def _signalwire_end_call(call_sid: str):
"""End a phone call via SignalWire REST API"""
if not call_sid or not settings.signalwire_space: