Reduce live caller latency and improve reliability
- Replace per-callback async task spawning with persistent queue-based sender
- Buffer host mic to 60ms chunks (was 21ms) to reduce WebSocket frame rate
- Reduce server ring buffer prebuffer from 150ms to 80ms
- Reduce browser playback jitter buffer from 150ms to 100ms

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -872,8 +872,14 @@ async def caller_audio_stream(websocket: WebSocket):
|
|||||||
|
|
||||||
# --- Host Audio Broadcast ---
|
# --- Host Audio Broadcast ---
|
||||||
|
|
||||||
async def _broadcast_host_audio(pcm_bytes: bytes):
|
_host_audio_queue: asyncio.Queue = None
|
||||||
"""Send host mic audio to all active real callers"""
|
_host_audio_task: asyncio.Task = None
|
||||||
|
|
||||||
|
|
||||||
|
async def _host_audio_sender():
|
||||||
|
"""Persistent task that drains audio queue and sends to callers"""
|
||||||
|
while True:
|
||||||
|
pcm_bytes = await _host_audio_queue.get()
|
||||||
for caller_id in list(caller_service.active_calls.keys()):
|
for caller_id in list(caller_service.active_calls.keys()):
|
||||||
try:
|
try:
|
||||||
await caller_service.send_audio_to_caller(caller_id, pcm_bytes, 16000)
|
await caller_service.send_audio_to_caller(caller_id, pcm_bytes, 16000)
|
||||||
@@ -881,16 +887,23 @@ async def _broadcast_host_audio(pcm_bytes: bytes):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def _start_host_audio_sender():
|
||||||
|
"""Start the persistent host audio sender task"""
|
||||||
|
global _host_audio_queue, _host_audio_task
|
||||||
|
if _host_audio_queue is None:
|
||||||
|
_host_audio_queue = asyncio.Queue(maxsize=100)
|
||||||
|
if _host_audio_task is None or _host_audio_task.done():
|
||||||
|
_host_audio_task = asyncio.create_task(_host_audio_sender())
|
||||||
|
|
||||||
|
|
||||||
def _host_audio_sync_callback(pcm_bytes: bytes):
|
def _host_audio_sync_callback(pcm_bytes: bytes):
|
||||||
"""Sync wrapper to schedule async broadcast from audio thread"""
|
"""Sync callback from audio thread — push to queue for async sending"""
|
||||||
|
if _host_audio_queue is None:
|
||||||
|
return
|
||||||
try:
|
try:
|
||||||
loop = asyncio.get_event_loop()
|
_host_audio_queue.put_nowait(pcm_bytes)
|
||||||
if loop.is_running():
|
except asyncio.QueueFull:
|
||||||
loop.call_soon_threadsafe(
|
pass # Drop frame rather than block
|
||||||
asyncio.ensure_future, _broadcast_host_audio(pcm_bytes)
|
|
||||||
)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
# --- Queue Endpoints ---
|
# --- Queue Endpoints ---
|
||||||
@@ -920,6 +933,7 @@ async def take_call_from_queue(caller_id: str):
|
|||||||
|
|
||||||
# Start host mic streaming if this is the first real caller
|
# Start host mic streaming if this is the first real caller
|
||||||
if len(caller_service.active_calls) == 1:
|
if len(caller_service.active_calls) == 1:
|
||||||
|
_start_host_audio_sender()
|
||||||
audio_service.start_host_stream(_host_audio_sync_callback)
|
audio_service.start_host_stream(_host_audio_sync_callback)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
|
|||||||
@@ -350,10 +350,10 @@ class AudioService:
|
|||||||
self._live_caller_num_channels = num_channels
|
self._live_caller_num_channels = num_channels
|
||||||
self._live_caller_channel_idx = channel_idx
|
self._live_caller_channel_idx = channel_idx
|
||||||
|
|
||||||
# Ring buffer: 3 seconds capacity, 150ms pre-buffer before playback starts
|
# Ring buffer: 3 seconds capacity, 80ms pre-buffer before playback starts
|
||||||
ring_size = int(device_sr * 3)
|
ring_size = int(device_sr * 3)
|
||||||
ring = np.zeros(ring_size, dtype=np.float32)
|
ring = np.zeros(ring_size, dtype=np.float32)
|
||||||
prebuffer_samples = int(device_sr * 0.15)
|
prebuffer_samples = int(device_sr * 0.08)
|
||||||
# Mutable state shared between writer (main thread) and reader (audio callback)
|
# Mutable state shared between writer (main thread) and reader (audio callback)
|
||||||
# CPython GIL makes individual int reads/writes atomic
|
# CPython GIL makes individual int reads/writes atomic
|
||||||
state = {"write_pos": 0, "read_pos": 0, "avail": 0, "started": False}
|
state = {"write_pos": 0, "read_pos": 0, "avail": 0, "started": False}
|
||||||
@@ -459,6 +459,11 @@ class AudioService:
|
|||||||
record_channel = min(self.input_channel, max_channels) - 1
|
record_channel = min(self.input_channel, max_channels) - 1
|
||||||
step = max(1, int(device_sr / 16000))
|
step = max(1, int(device_sr / 16000))
|
||||||
|
|
||||||
|
# Buffer host mic to send ~60ms chunks instead of tiny 21ms ones
|
||||||
|
host_accum = []
|
||||||
|
host_accum_samples = [0]
|
||||||
|
send_threshold = 960 # 60ms at 16kHz
|
||||||
|
|
||||||
def callback(indata, frames, time_info, status):
|
def callback(indata, frames, time_info, status):
|
||||||
# Capture for push-to-talk recording if active
|
# Capture for push-to-talk recording if active
|
||||||
if self._recording:
|
if self._recording:
|
||||||
@@ -470,7 +475,15 @@ class AudioService:
|
|||||||
# Simple decimation to ~16kHz
|
# Simple decimation to ~16kHz
|
||||||
if step > 1:
|
if step > 1:
|
||||||
mono = mono[::step]
|
mono = mono[::step]
|
||||||
pcm = (mono * 32767).astype(np.int16).tobytes()
|
|
||||||
|
host_accum.append(mono.copy())
|
||||||
|
host_accum_samples[0] += len(mono)
|
||||||
|
|
||||||
|
if host_accum_samples[0] >= send_threshold:
|
||||||
|
combined = np.concatenate(host_accum)
|
||||||
|
pcm = (combined * 32767).astype(np.int16).tobytes()
|
||||||
|
host_accum.clear()
|
||||||
|
host_accum_samples[0] = 0
|
||||||
self._host_send_callback(pcm)
|
self._host_send_callback(pcm)
|
||||||
|
|
||||||
self._host_device_sr = device_sr
|
self._host_device_sr = device_sr
|
||||||
|
|||||||
@@ -150,6 +150,6 @@
|
|||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
<script src="/js/call-in.js?v=3"></script>
|
<script src="/js/call-in.js?v=4"></script>
|
||||||
</body>
|
</body>
|
||||||
</html>
|
</html>
|
||||||
|
|||||||
@@ -84,7 +84,7 @@ class PlaybackProcessor extends AudioWorkletProcessor {
|
|||||||
this.readPos = 0;
|
this.readPos = 0;
|
||||||
this.available = 0;
|
this.available = 0;
|
||||||
this.started = false;
|
this.started = false;
|
||||||
this.jitterMs = 150; // buffer 150ms before starting playback
|
this.jitterMs = 100; // buffer 100ms before starting playback
|
||||||
this.jitterSamples = Math.floor(16000 * this.jitterMs / 1000);
|
this.jitterSamples = Math.floor(16000 * this.jitterMs / 1000);
|
||||||
|
|
||||||
this.port.onmessage = (e) => {
|
this.port.onmessage = (e) => {
|
||||||
|
|||||||
Reference in New Issue
Block a user