From e9944c88ebd9f53c07fb22bc8f970f70ab3aaa78 Mon Sep 17 00:00:00 2001 From: Username Date: Tue, 24 Feb 2026 14:14:52 +0100 Subject: [PATCH] audio: add gain control to capture and playback --- src/tuimble/audio.py | 37 +++++++++++++++++++- tests/test_audio.py | 80 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 115 insertions(+), 2 deletions(-) diff --git a/src/tuimble/audio.py b/src/tuimble/audio.py index 1715d71..0de932f 100644 --- a/src/tuimble/audio.py +++ b/src/tuimble/audio.py @@ -9,6 +9,7 @@ from __future__ import annotations import logging import queue +import struct log = logging.getLogger(__name__) @@ -18,6 +19,17 @@ FRAME_SIZE = 960 # 20ms at 48kHz DTYPE = "int16" +def _apply_gain(pcm: bytes, gain: float) -> bytes: + """Scale int16 PCM samples by gain factor with clipping.""" + n = len(pcm) // 2 + if n == 0: + return pcm + fmt = f"<{n}h" + samples = struct.unpack(fmt, pcm[: n * 2]) + scaled = [max(-32768, min(32767, int(s * gain))) for s in samples] + return struct.pack(fmt, *scaled) + + class AudioPipeline: """Manages audio input/output streams and Opus codec.""" @@ -40,6 +52,8 @@ class AudioPipeline: self._output_stream = None self._capturing = False self._deafened = False + self._input_gain = 1.0 + self._output_gain = 1.0 def start(self): """Open audio streams.""" @@ -93,13 +107,32 @@ class AudioPipeline: def deafened(self, value: bool): self._deafened = value + @property + def input_gain(self) -> float: + return self._input_gain + + @input_gain.setter + def input_gain(self, value: float): + self._input_gain = max(0.0, min(2.0, value)) + + @property + def output_gain(self) -> float: + return self._output_gain + + @output_gain.setter + def output_gain(self, value: float): + self._output_gain = max(0.0, min(2.0, value)) + def _capture_callback(self, indata, frames, time_info, status): """Called by sounddevice when input data is available.""" if status: log.warning("capture status: %s", status) if self._capturing: try: - self._capture_queue.put_nowait(bytes(indata)) + pcm = bytes(indata) + if self._input_gain != 1.0: + pcm = _apply_gain(pcm, self._input_gain) + self._capture_queue.put_nowait(pcm) except queue.Full: pass @@ -112,6 +145,8 @@ class AudioPipeline: return try: pcm = self._playback_queue.get_nowait() + if self._output_gain != 1.0: + pcm = _apply_gain(pcm, self._output_gain) n = min(len(pcm), len(outdata)) outdata[:n] = pcm[:n] if n < len(outdata): diff --git a/tests/test_audio.py b/tests/test_audio.py index b93d359..433340a 100644 --- a/tests/test_audio.py +++ b/tests/test_audio.py @@ -1,6 +1,8 @@ """Tests for AudioPipeline.""" -from tuimble.audio import FRAME_SIZE, SAMPLE_RATE, AudioPipeline +import struct + +from tuimble.audio import FRAME_SIZE, SAMPLE_RATE, AudioPipeline, _apply_gain def test_default_construction(): @@ -125,3 +127,79 @@ def test_stop_without_start(): """Stop on unstarted pipeline should not raise.""" ap = AudioPipeline() ap.stop() + + +# -- _apply_gain tests ------------------------------------------------------- + + +def test_apply_gain_unity(): + """Gain 1.0 returns identical samples.""" + pcm = struct.pack("<4h", 100, -200, 32767, -32768) + assert _apply_gain(pcm, 1.0) == pcm + + +def test_apply_gain_double(): + """Gain 2.0 doubles sample values.""" + pcm = struct.pack("<2h", 100, -100) + result = struct.unpack("<2h", _apply_gain(pcm, 2.0)) + assert result == (200, -200) + + +def test_apply_gain_clips(): + """Values exceeding int16 range are clipped.""" + pcm = struct.pack("<2h", 20000, -20000) + result = struct.unpack("<2h", _apply_gain(pcm, 2.0)) + assert result == (32767, -32768) + + +def test_apply_gain_zero(): + """Gain 0.0 produces silence.""" + pcm = struct.pack("<2h", 1000, -1000) + result = struct.unpack("<2h", _apply_gain(pcm, 0.0)) + assert result == (0, 0) + + +def test_apply_gain_empty(): + """Empty buffer returns empty.""" + assert _apply_gain(b"", 2.0) == b"" + + +# -- gain property tests ------------------------------------------------------ + + +def test_gain_defaults(): + ap = AudioPipeline() + assert ap.input_gain == 1.0 + assert ap.output_gain == 1.0 + + +def test_gain_clamping(): + ap = AudioPipeline() + ap.input_gain = 3.0 + assert ap.input_gain == 2.0 + ap.output_gain = -1.0 + assert ap.output_gain == 0.0 + + +def test_capture_callback_applies_input_gain(): + """Input gain is applied to captured PCM.""" + ap = AudioPipeline() + ap.capturing = True + ap.input_gain = 0.5 + pcm = struct.pack("<2h", 1000, -1000) + ap._capture_callback(pcm, 2, None, None) + frame = ap.get_capture_frame() + result = struct.unpack("<2h", frame) + assert result == (500, -500) + + +def test_playback_callback_applies_output_gain(): + """Output gain is applied during playback.""" + ap = AudioPipeline() + ap.output_gain = 0.5 + pcm = struct.pack("<2h", 1000, -1000) + ap.queue_playback(pcm) + outdata = bytearray(4) + ap._playback_callback(outdata, 2, None, None) + result = struct.unpack("<2h", bytes(outdata)) + assert result == (500, -500)