From 4d97ea9099e47959ec95be69c2f9f18f62b04820 Mon Sep 17 00:00:00 2001 From: tcpsyn Date: Thu, 5 Feb 2026 16:37:50 -0700 Subject: [PATCH] Replace queue with ring buffer jitter absorption for live caller audio - Server: 150ms pre-buffer ring buffer eliminates gaps from timing mismatches - Browser playback: 150ms jitter buffer (up from 80ms) for network jitter - Capture chunks: 960 samples/60ms (better network efficiency) Co-Authored-By: Claude Opus 4.6 --- backend/services/audio.py | 73 +++++++++++++++++++++++++++------------ frontend/call-in.html | 2 +- frontend/js/call-in.js | 4 +-- 3 files changed, 53 insertions(+), 26 deletions(-) diff --git a/backend/services/audio.py b/backend/services/audio.py index 05dbb0e..14aa934 100644 --- a/backend/services/audio.py +++ b/backend/services/audio.py @@ -55,7 +55,7 @@ class AudioService: # Live caller routing state self._live_caller_stream: Optional[sd.OutputStream] = None - self._live_caller_queue: Optional[queue.Queue] = None + self._live_caller_write: Optional[Callable] = None # Sample rates self.input_sample_rate = 16000 # For Whisper @@ -325,15 +325,13 @@ class AudioService: self._caller_stop_event.set() def _start_live_caller_stream(self): - """Start persistent output stream for live caller audio""" + """Start persistent output stream with ring buffer jitter absorption""" if self._live_caller_stream is not None: return if self.output_device is None: return - self._live_caller_queue = queue.Queue() - device_info = sd.query_devices(self.output_device) num_channels = device_info['max_output_channels'] device_sr = int(device_info['default_samplerate']) @@ -343,22 +341,52 @@ class AudioService: self._live_caller_num_channels = num_channels self._live_caller_channel_idx = channel_idx + # Ring buffer: 3 seconds capacity, 150ms pre-buffer before playback starts + ring_size = int(device_sr * 3) + ring = np.zeros(ring_size, dtype=np.float32) + prebuffer_samples = int(device_sr * 0.15) + # Mutable state shared between writer 
(main thread) and reader (audio callback)
+        # NOTE(review): single int reads/writes are atomic under the GIL, but the
+        # "avail" += / -= updates below are read-modify-write and CAN interleave between the writer thread and the audio callback — consider a threading.Lock or separate monotonic write/read counters
+        state = {"write_pos": 0, "read_pos": 0, "avail": 0, "started": False}
+
+        def write_audio(data):
+            n = len(data)
+            wp = state["write_pos"]
+            if wp + n <= ring_size:
+                ring[wp:wp + n] = data
+            else:
+                first = ring_size - wp
+                ring[wp:] = data[:first]
+                ring[:n - first] = data[first:]
+            state["write_pos"] = (wp + n) % ring_size
+            state["avail"] += n
+
         def callback(outdata, frames, time_info, status):
             outdata.fill(0)
-            written = 0
-            while written < frames:
-                try:
-                    chunk = self._live_caller_queue.get_nowait()
-                    end = min(written + len(chunk), frames)
-                    count = end - written
-                    outdata[written:end, channel_idx] = chunk[:count]
-                    if count < len(chunk):
-                        # Put remainder back (rare)
-                        leftover = chunk[count:]
-                        self._live_caller_queue.put(leftover)
-                    written = end
-                except Exception:
-                    break
+            avail = state["avail"]
+
+            if not state["started"]:
+                if avail >= prebuffer_samples:
+                    state["started"] = True
+                else:
+                    return
+
+            if avail < frames:
+                # Underrun — stop and re-buffer
+                state["started"] = False
+                return
+
+            rp = state["read_pos"]
+            if rp + frames <= ring_size:
+                outdata[:frames, channel_idx] = ring[rp:rp + frames]
+            else:
+                first = ring_size - rp
+                outdata[:first, channel_idx] = ring[rp:]
+                outdata[first:frames, channel_idx] = ring[:frames - first]
+            state["read_pos"] = (rp + frames) % ring_size
+            state["avail"] -= frames
+
+        self._live_caller_write = write_audio

         self._live_caller_stream = sd.OutputStream(
             device=self.output_device,
@@ -369,7 +397,7 @@ class AudioService:
             blocksize=1024,
         )
         self._live_caller_stream.start()
-        print(f"[Audio] Live caller stream started on ch {self.live_caller_channel} @ {device_sr}Hz")
+        print(f"[Audio] Live caller stream started on ch {self.live_caller_channel} @ {device_sr}Hz (prebuffer {prebuffer_samples} samples)")

     def _stop_live_caller_stream(self):
         """Stop persistent live caller output stream"""
@@ -377,7 +405,7 @@ class
AudioService: self._live_caller_stream.stop() self._live_caller_stream.close() self._live_caller_stream = None - self._live_caller_queue = None + self._live_caller_write = None print("[Audio] Live caller stream stopped") def route_real_caller_audio(self, pcm_data: bytes, sample_rate: int): @@ -385,14 +413,12 @@ class AudioService: if self.output_device is None: return - # Ensure persistent stream is running if self._live_caller_stream is None: self._start_live_caller_stream() try: audio = np.frombuffer(pcm_data, dtype=np.int16).astype(np.float32) / 32768.0 - # Simple decimation/interpolation instead of librosa device_sr = self._live_caller_device_sr if sample_rate != device_sr: ratio = device_sr / sample_rate @@ -401,7 +427,8 @@ class AudioService: indices = np.clip(indices, 0, len(audio) - 1) audio = audio[indices] - self._live_caller_queue.put(audio) + if self._live_caller_write: + self._live_caller_write(audio) except Exception as e: print(f"Real caller audio routing error: {e}") diff --git a/frontend/call-in.html b/frontend/call-in.html index aa72937..77affa5 100644 --- a/frontend/call-in.html +++ b/frontend/call-in.html @@ -150,6 +150,6 @@ - + diff --git a/frontend/js/call-in.js b/frontend/js/call-in.js index a7c4b76..fde133b 100644 --- a/frontend/js/call-in.js +++ b/frontend/js/call-in.js @@ -46,7 +46,7 @@ class CallerProcessor extends AudioWorkletProcessor { constructor() { super(); this.buffer = []; - this.targetSamples = 640; // 40ms at 16kHz — low latency + this.targetSamples = 960; // 60ms at 16kHz } process(inputs) { const input = inputs[0][0]; @@ -84,7 +84,7 @@ class PlaybackProcessor extends AudioWorkletProcessor { this.readPos = 0; this.available = 0; this.started = false; - this.jitterMs = 80; // buffer 80ms before starting playback + this.jitterMs = 150; // buffer 150ms before starting playback this.jitterSamples = Math.floor(16000 * this.jitterMs / 1000); this.port.onmessage = (e) => {