audio: add gain control to capture and playback

This commit is contained in:
Username
2026-02-24 14:14:52 +01:00
parent eb98165370
commit e9944c88eb
2 changed files with 115 additions and 2 deletions

View File

@@ -9,6 +9,7 @@ from __future__ import annotations
import logging
import queue
import struct
log = logging.getLogger(__name__)
@@ -18,6 +19,17 @@ FRAME_SIZE = 960 # 20ms at 48kHz
DTYPE = "int16"
def _apply_gain(pcm: bytes, gain: float) -> bytes:
"""Scale int16 PCM samples by gain factor with clipping."""
n = len(pcm) // 2
if n == 0:
return pcm
fmt = f"<{n}h"
samples = struct.unpack(fmt, pcm[: n * 2])
scaled = [max(-32768, min(32767, int(s * gain))) for s in samples]
return struct.pack(fmt, *scaled)
class AudioPipeline:
"""Manages audio input/output streams and Opus codec."""
@@ -40,6 +52,8 @@ class AudioPipeline:
self._output_stream = None
self._capturing = False
self._deafened = False
self._input_gain = 1.0
self._output_gain = 1.0
def start(self):
"""Open audio streams."""
@@ -93,13 +107,32 @@ class AudioPipeline:
def deafened(self, value: bool):
self._deafened = value
@property
def input_gain(self) -> float:
return self._input_gain
@input_gain.setter
def input_gain(self, value: float):
self._input_gain = max(0.0, min(2.0, value))
@property
def output_gain(self) -> float:
return self._output_gain
@output_gain.setter
def output_gain(self, value: float):
self._output_gain = max(0.0, min(2.0, value))
def _capture_callback(self, indata, frames, time_info, status):
"""Called by sounddevice when input data is available."""
if status:
log.warning("capture status: %s", status)
if self._capturing:
try:
self._capture_queue.put_nowait(bytes(indata))
pcm = bytes(indata)
if self._input_gain != 1.0:
pcm = _apply_gain(pcm, self._input_gain)
self._capture_queue.put_nowait(pcm)
except queue.Full:
pass
@@ -112,6 +145,8 @@ class AudioPipeline:
return
try:
pcm = self._playback_queue.get_nowait()
if self._output_gain != 1.0:
pcm = _apply_gain(pcm, self._output_gain)
n = min(len(pcm), len(outdata))
outdata[:n] = pcm[:n]
if n < len(outdata):

View File

@@ -1,6 +1,8 @@
"""Tests for AudioPipeline."""
from tuimble.audio import FRAME_SIZE, SAMPLE_RATE, AudioPipeline
import struct
from tuimble.audio import FRAME_SIZE, SAMPLE_RATE, AudioPipeline, _apply_gain
def test_default_construction():
@@ -125,3 +127,79 @@ def test_stop_without_start():
"""Stop on unstarted pipeline should not raise."""
ap = AudioPipeline()
ap.stop()
# -- _apply_gain tests -------------------------------------------------------
def test_apply_gain_unity():
"""Gain 1.0 returns identical samples."""
pcm = struct.pack("<4h", 100, -200, 32767, -32768)
assert _apply_gain(pcm, 1.0) == pcm
def test_apply_gain_double():
"""Gain 2.0 doubles sample values."""
pcm = struct.pack("<2h", 100, -100)
result = struct.unpack("<2h", _apply_gain(pcm, 2.0))
assert result == (200, -200)
def test_apply_gain_clips():
"""Values exceeding int16 range are clipped."""
pcm = struct.pack("<2h", 20000, -20000)
result = struct.unpack("<2h", _apply_gain(pcm, 2.0))
assert result == (32767, -32768)
def test_apply_gain_zero():
"""Gain 0.0 produces silence."""
pcm = struct.pack("<2h", 1000, -1000)
result = struct.unpack("<2h", _apply_gain(pcm, 0.0))
assert result == (0, 0)
def test_apply_gain_empty():
"""Empty buffer returns empty."""
assert _apply_gain(b"", 2.0) == b""
# -- gain property tests ------------------------------------------------------
def test_gain_defaults():
ap = AudioPipeline()
assert ap.input_gain == 1.0
assert ap.output_gain == 1.0
def test_gain_clamping():
ap = AudioPipeline()
ap.input_gain = 3.0
assert ap.input_gain == 2.0
ap.output_gain = -1.0
assert ap.output_gain == 0.0
def test_capture_callback_applies_input_gain():
"""Input gain is applied to captured PCM."""
ap = AudioPipeline()
ap.capturing = True
ap.input_gain = 0.5
pcm = struct.pack("<2h", 1000, -1000)
ap._capture_callback(pcm, 2, None, None)
frame = ap.get_capture_frame()
result = struct.unpack("<2h", frame)
assert result == (500, -500)
def test_playback_callback_applies_output_gain():
"""Output gain is applied during playback."""
ap = AudioPipeline()
ap.output_gain = 0.5
pcm = struct.pack("<2h", 1000, -1000)
ap.queue_playback(pcm)
outdata = bytearray(4)
ap._playback_callback(outdata, 2, None, None)
result = struct.unpack("<2h", bytes(outdata))
assert result == (500, -500)