diff --git a/backend/main.py b/backend/main.py index edf3e03..32435ed 100644 --- a/backend/main.py +++ b/backend/main.py @@ -872,25 +872,38 @@ async def caller_audio_stream(websocket: WebSocket): # --- Host Audio Broadcast --- -async def _broadcast_host_audio(pcm_bytes: bytes): - """Send host mic audio to all active real callers""" - for caller_id in list(caller_service.active_calls.keys()): - try: - await caller_service.send_audio_to_caller(caller_id, pcm_bytes, 16000) - except Exception: - pass +_host_audio_queue: asyncio.Queue = None +_host_audio_task: asyncio.Task = None + + +async def _host_audio_sender(): + """Persistent task that drains audio queue and sends to callers""" + while True: + pcm_bytes = await _host_audio_queue.get() + for caller_id in list(caller_service.active_calls.keys()): + try: + await caller_service.send_audio_to_caller(caller_id, pcm_bytes, 16000) + except Exception: + pass + + +def _start_host_audio_sender(): + """Start the persistent host audio sender task""" + global _host_audio_queue, _host_audio_task + if _host_audio_queue is None: + _host_audio_queue = asyncio.Queue(maxsize=100) + if _host_audio_task is None or _host_audio_task.done(): + _host_audio_task = asyncio.create_task(_host_audio_sender()) def _host_audio_sync_callback(pcm_bytes: bytes): - """Sync wrapper to schedule async broadcast from audio thread""" + """Sync callback from audio thread — push to queue for async sending""" + if _host_audio_queue is None: + return try: - loop = asyncio.get_event_loop() - if loop.is_running(): - loop.call_soon_threadsafe( - asyncio.ensure_future, _broadcast_host_audio(pcm_bytes) - ) - except Exception: - pass + _host_audio_queue.put_nowait(pcm_bytes) + except asyncio.QueueFull: + pass # Drop frame rather than block # --- Queue Endpoints --- @@ -920,6 +933,7 @@ async def take_call_from_queue(caller_id: str): # Start host mic streaming if this is the first real caller if len(caller_service.active_calls) == 1: + _start_host_audio_sender() audio_service.start_host_stream(_host_audio_sync_callback) return { diff --git a/backend/services/audio.py b/backend/services/audio.py index cc18cb4..1d37499 100644 --- a/backend/services/audio.py +++ b/backend/services/audio.py @@ -350,10 +350,10 @@ class AudioService: self._live_caller_num_channels = num_channels self._live_caller_channel_idx = channel_idx - # Ring buffer: 3 seconds capacity, 150ms pre-buffer before playback starts + # Ring buffer: 3 seconds capacity, 80ms pre-buffer before playback starts ring_size = int(device_sr * 3) ring = np.zeros(ring_size, dtype=np.float32) - prebuffer_samples = int(device_sr * 0.15) + prebuffer_samples = int(device_sr * 0.08) # Mutable state shared between writer (main thread) and reader (audio callback) # CPython GIL makes individual int reads/writes atomic state = {"write_pos": 0, "read_pos": 0, "avail": 0, "started": False} @@ -459,6 +459,11 @@ class AudioService: record_channel = min(self.input_channel, max_channels) - 1 step = max(1, int(device_sr / 16000)) + # Buffer host mic to send ~60ms chunks instead of tiny 21ms ones + host_accum = [] + host_accum_samples = [0] + send_threshold = 960 # 60ms at 16kHz + def callback(indata, frames, time_info, status): # Capture for push-to-talk recording if active if self._recording: @@ -470,8 +475,16 @@ class AudioService: # Simple decimation to ~16kHz if step > 1: mono = mono[::step] - pcm = (mono * 32767).astype(np.int16).tobytes() - self._host_send_callback(pcm) + + host_accum.append(mono.copy()) + host_accum_samples[0] += len(mono) + + if host_accum_samples[0] >= send_threshold: + combined = np.concatenate(host_accum) + pcm = (combined * 32767).astype(np.int16).tobytes() + host_accum.clear() + host_accum_samples[0] = 0 + self._host_send_callback(pcm) self._host_device_sr = device_sr self._host_stream = sd.InputStream( diff --git a/frontend/call-in.html b/frontend/call-in.html index 77affa5..055cbbe 100644 --- a/frontend/call-in.html +++ b/frontend/call-in.html @@ -150,6 +150,6 @@ - + diff --git a/frontend/js/call-in.js b/frontend/js/call-in.js index fde133b..7293c37 100644 --- a/frontend/js/call-in.js +++ b/frontend/js/call-in.js @@ -84,7 +84,7 @@ class PlaybackProcessor extends AudioWorkletProcessor { this.readPos = 0; this.available = 0; this.started = false; - this.jitterMs = 150; // buffer 150ms before starting playback + this.jitterMs = 100; // buffer 100ms before starting playback this.jitterSamples = Math.floor(16000 * this.jitterMs / 1000); this.port.onmessage = (e) => {