Replace queue with ring buffer jitter absorption for live caller audio
- Server: 150ms pre-buffer ring buffer eliminates gaps from timing mismatches - Browser playback: 150ms jitter buffer (up from 80ms) for network jitter - Capture chunks: 960 samples/60ms (better network efficiency) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -55,7 +55,7 @@ class AudioService:
|
||||
|
||||
# Live caller routing state
|
||||
self._live_caller_stream: Optional[sd.OutputStream] = None
|
||||
self._live_caller_queue: Optional[queue.Queue] = None
|
||||
self._live_caller_write: Optional[Callable] = None
|
||||
|
||||
# Sample rates
|
||||
self.input_sample_rate = 16000 # For Whisper
|
||||
@@ -325,15 +325,13 @@ class AudioService:
|
||||
self._caller_stop_event.set()
|
||||
|
||||
def _start_live_caller_stream(self):
|
||||
"""Start persistent output stream for live caller audio"""
|
||||
"""Start persistent output stream with ring buffer jitter absorption"""
|
||||
if self._live_caller_stream is not None:
|
||||
return
|
||||
|
||||
if self.output_device is None:
|
||||
return
|
||||
|
||||
self._live_caller_queue = queue.Queue()
|
||||
|
||||
device_info = sd.query_devices(self.output_device)
|
||||
num_channels = device_info['max_output_channels']
|
||||
device_sr = int(device_info['default_samplerate'])
|
||||
@@ -343,22 +341,52 @@ class AudioService:
|
||||
self._live_caller_num_channels = num_channels
|
||||
self._live_caller_channel_idx = channel_idx
|
||||
|
||||
# Ring buffer: 3 seconds capacity, 150ms pre-buffer before playback starts
|
||||
ring_size = int(device_sr * 3)
|
||||
ring = np.zeros(ring_size, dtype=np.float32)
|
||||
prebuffer_samples = int(device_sr * 0.15)
|
||||
# Mutable state shared between writer (main thread) and reader (audio callback)
|
||||
# CPython GIL makes individual int reads/writes atomic
|
||||
state = {"write_pos": 0, "read_pos": 0, "avail": 0, "started": False}
|
||||
|
||||
def write_audio(data):
|
||||
n = len(data)
|
||||
wp = state["write_pos"]
|
||||
if wp + n <= ring_size:
|
||||
ring[wp:wp + n] = data
|
||||
else:
|
||||
first = ring_size - wp
|
||||
ring[wp:] = data[:first]
|
||||
ring[:n - first] = data[first:]
|
||||
state["write_pos"] = (wp + n) % ring_size
|
||||
state["avail"] += n
|
||||
|
||||
def callback(outdata, frames, time_info, status):
|
||||
outdata.fill(0)
|
||||
written = 0
|
||||
while written < frames:
|
||||
try:
|
||||
chunk = self._live_caller_queue.get_nowait()
|
||||
end = min(written + len(chunk), frames)
|
||||
count = end - written
|
||||
outdata[written:end, channel_idx] = chunk[:count]
|
||||
if count < len(chunk):
|
||||
# Put remainder back (rare)
|
||||
leftover = chunk[count:]
|
||||
self._live_caller_queue.put(leftover)
|
||||
written = end
|
||||
except Exception:
|
||||
break
|
||||
avail = state["avail"]
|
||||
|
||||
if not state["started"]:
|
||||
if avail >= prebuffer_samples:
|
||||
state["started"] = True
|
||||
else:
|
||||
return
|
||||
|
||||
if avail < frames:
|
||||
# Underrun — stop and re-buffer
|
||||
state["started"] = False
|
||||
return
|
||||
|
||||
rp = state["read_pos"]
|
||||
if rp + frames <= ring_size:
|
||||
outdata[:frames, channel_idx] = ring[rp:rp + frames]
|
||||
else:
|
||||
first = ring_size - rp
|
||||
outdata[:first, channel_idx] = ring[rp:]
|
||||
outdata[first:frames, channel_idx] = ring[:frames - first]
|
||||
state["read_pos"] = (rp + frames) % ring_size
|
||||
state["avail"] -= frames
|
||||
|
||||
self._live_caller_write = write_audio
|
||||
|
||||
self._live_caller_stream = sd.OutputStream(
|
||||
device=self.output_device,
|
||||
@@ -369,7 +397,7 @@ class AudioService:
|
||||
blocksize=1024,
|
||||
)
|
||||
self._live_caller_stream.start()
|
||||
print(f"[Audio] Live caller stream started on ch {self.live_caller_channel} @ {device_sr}Hz")
|
||||
print(f"[Audio] Live caller stream started on ch {self.live_caller_channel} @ {device_sr}Hz (prebuffer {prebuffer_samples} samples)")
|
||||
|
||||
def _stop_live_caller_stream(self):
|
||||
"""Stop persistent live caller output stream"""
|
||||
@@ -377,7 +405,7 @@ class AudioService:
|
||||
self._live_caller_stream.stop()
|
||||
self._live_caller_stream.close()
|
||||
self._live_caller_stream = None
|
||||
self._live_caller_queue = None
|
||||
self._live_caller_write = None
|
||||
print("[Audio] Live caller stream stopped")
|
||||
|
||||
def route_real_caller_audio(self, pcm_data: bytes, sample_rate: int):
|
||||
@@ -385,14 +413,12 @@ class AudioService:
|
||||
if self.output_device is None:
|
||||
return
|
||||
|
||||
# Ensure persistent stream is running
|
||||
if self._live_caller_stream is None:
|
||||
self._start_live_caller_stream()
|
||||
|
||||
try:
|
||||
audio = np.frombuffer(pcm_data, dtype=np.int16).astype(np.float32) / 32768.0
|
||||
|
||||
# Simple decimation/interpolation instead of librosa
|
||||
device_sr = self._live_caller_device_sr
|
||||
if sample_rate != device_sr:
|
||||
ratio = device_sr / sample_rate
|
||||
@@ -401,7 +427,8 @@ class AudioService:
|
||||
indices = np.clip(indices, 0, len(audio) - 1)
|
||||
audio = audio[indices]
|
||||
|
||||
self._live_caller_queue.put(audio)
|
||||
if self._live_caller_write:
|
||||
self._live_caller_write(audio)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Real caller audio routing error: {e}")
|
||||
|
||||
@@ -150,6 +150,6 @@
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<script src="/js/call-in.js?v=2"></script>
|
||||
<script src="/js/call-in.js?v=3"></script>
|
||||
</body>
|
||||
</html>
|
||||
|
||||
@@ -46,7 +46,7 @@ class CallerProcessor extends AudioWorkletProcessor {
|
||||
constructor() {
|
||||
super();
|
||||
this.buffer = [];
|
||||
this.targetSamples = 640; // 40ms at 16kHz — low latency
|
||||
this.targetSamples = 960; // 60ms at 16kHz
|
||||
}
|
||||
process(inputs) {
|
||||
const input = inputs[0][0];
|
||||
@@ -84,7 +84,7 @@ class PlaybackProcessor extends AudioWorkletProcessor {
|
||||
this.readPos = 0;
|
||||
this.available = 0;
|
||||
this.started = false;
|
||||
this.jitterMs = 80; // buffer 80ms before starting playback
|
||||
this.jitterMs = 150; // buffer 150ms before starting playback
|
||||
this.jitterSamples = Math.floor(16000 * this.jitterMs / 1000);
|
||||
|
||||
this.port.onmessage = (e) => {
|
||||
|
||||
Reference in New Issue
Block a user