206 lines
5.8 KiB
Python
206 lines
5.8 KiB
Python
"""Tests for AudioPipeline."""
|
|
|
|
import struct
|
|
|
|
from tuimble.audio import FRAME_SIZE, SAMPLE_RATE, AudioPipeline, _apply_gain
|
|
|
|
|
|
def test_default_construction():
    """No-argument construction uses module defaults and starts inactive."""
    pipeline = AudioPipeline()

    # Audio format falls back to the module-level constants.
    assert pipeline._sample_rate == SAMPLE_RATE
    assert pipeline._frame_size == FRAME_SIZE

    # No device is selected for either direction.
    assert pipeline._input_device is None
    assert pipeline._output_device is None

    # Neither capturing nor deafened right after construction.
    assert pipeline.capturing is False
    assert pipeline.deafened is False
|
|
|
|
|
|
def test_custom_construction():
    """Constructor keyword arguments are stored on the matching attributes."""
    pipeline = AudioPipeline(
        sample_rate=24000,
        frame_size=480,
        input_device=1,
        output_device=2,
    )

    assert pipeline._sample_rate == 24000
    assert pipeline._frame_size == 480
    assert pipeline._input_device == 1
    assert pipeline._output_device == 2
|
|
|
|
|
|
def test_capturing_toggle():
    """The capturing flag starts off and reflects each value assigned to it."""
    pipeline = AudioPipeline()
    assert pipeline.capturing is False

    # Switch on, then back off, reading the flag after every assignment.
    for state in (True, False):
        pipeline.capturing = state
        assert pipeline.capturing is state
|
|
|
|
|
|
def test_get_capture_frame_empty():
    """With nothing queued, get_capture_frame() returns None."""
    frame = AudioPipeline().get_capture_frame()
    assert frame is None
|
|
|
|
|
|
def test_get_capture_frame_returns_queued():
    """A frame put on the capture queue is returned once, then None."""
    pipeline = AudioPipeline()
    queued = b"\x01\x02\x03"
    pipeline._capture_queue.put(queued)

    # First read yields the queued bytes; the second finds the queue empty.
    assert pipeline.get_capture_frame() == queued
    assert pipeline.get_capture_frame() is None
|
|
|
|
|
|
def test_queue_playback_and_callback():
    """A full queued frame is copied unchanged into the output buffer."""
    pipeline = AudioPipeline()
    frame_bytes = FRAME_SIZE * 2  # 16-bit mono = 2 bytes per sample

    # Repeating 0..255 byte pattern, exactly one frame long, so that any
    # misplaced byte shows up in the comparison below.
    pcm = bytes(index % 256 for index in range(frame_bytes))
    assert len(pcm) == frame_bytes

    pipeline.queue_playback(pcm)

    out_buffer = bytearray(frame_bytes)
    pipeline._playback_callback(out_buffer, FRAME_SIZE, None, None)
    assert bytes(out_buffer) == pcm
|
|
|
|
|
|
def test_playback_callback_silence_on_empty():
    """With nothing queued, the callback fills the buffer with zeros."""
    pipeline = AudioPipeline()
    frame_bytes = FRAME_SIZE * 2

    # Start from a non-zero buffer so an untouched buffer cannot pass.
    out_buffer = bytearray(b"\xff") * frame_bytes
    pipeline._playback_callback(out_buffer, FRAME_SIZE, None, None)

    silence = bytearray(frame_bytes)  # all zeros
    assert out_buffer == silence
|
|
|
|
|
|
def test_playback_callback_short_pcm_pads_silence():
    """PCM shorter than the buffer is copied, and the rest is zero-filled."""
    pipeline = AudioPipeline()
    frame_bytes = FRAME_SIZE * 2
    short_len = 100

    short_pcm = b"\x42" * short_len
    pipeline.queue_playback(short_pcm)

    out_buffer = bytearray(frame_bytes)
    pipeline._playback_callback(out_buffer, FRAME_SIZE, None, None)

    # Head of the buffer carries the queued bytes, the tail is zeros.
    head = out_buffer[:short_len]
    tail = out_buffer[short_len:]
    assert head == bytearray(b"\x42" * short_len)
    assert tail == bytearray(frame_bytes - short_len)
|
|
|
|
|
|
def test_queue_playback_overflow_drops():
    """Queueing onto a full playback queue neither raises nor grows it."""
    pipeline = AudioPipeline(frame_size=FRAME_SIZE)
    playback_queue = pipeline._playback_queue

    # Fill the queue up to its declared capacity.
    for _ in range(playback_queue.maxsize):
        pipeline.queue_playback(b"\x00")

    # One more than capacity: this call must not raise.
    pipeline.queue_playback(b"\xff")

    assert playback_queue.qsize() == playback_queue.maxsize
|
|
|
|
|
|
def test_deafened_toggle():
    """The deafened flag starts off and reflects each value assigned to it."""
    pipeline = AudioPipeline()
    assert pipeline.deafened is False

    # Switch on, then back off, reading the flag after every assignment.
    for state in (True, False):
        pipeline.deafened = state
        assert pipeline.deafened is state
|
|
|
|
|
|
def test_queue_playback_discards_when_deafened():
    """PCM queued while deafened never reaches the playback queue."""
    pipeline = AudioPipeline()
    pipeline.deafened = True

    incoming = b"\x42" * 100
    pipeline.queue_playback(incoming)

    assert pipeline._playback_queue.qsize() == 0
|
|
|
|
|
|
def test_playback_callback_silence_when_deafened():
    """Deafened playback outputs zeros even if data was queued earlier."""
    pipeline = AudioPipeline()
    frame_bytes = FRAME_SIZE * 2

    # Queue a full frame first, and only then deafen.
    pipeline.queue_playback(b"\x42" * frame_bytes)
    pipeline.deafened = True

    # Start from a non-zero buffer so an untouched buffer cannot pass.
    out_buffer = bytearray(b"\xff") * frame_bytes
    pipeline._playback_callback(out_buffer, FRAME_SIZE, None, None)

    silence = bytearray(frame_bytes)  # all zeros
    assert out_buffer == silence
|
|
|
|
|
|
def test_stop_without_start():
    """Calling stop() on a pipeline that was never started must not raise."""
    AudioPipeline().stop()
|
|
|
|
|
|
# -- _apply_gain tests -------------------------------------------------------
|
|
|
|
|
|
def test_apply_gain_unity():
    """A gain of 1.0 leaves the PCM bytes unchanged."""
    # Includes both int16 extremes alongside ordinary values.
    samples = (100, -200, 32767, -32768)
    pcm = struct.pack("<4h", *samples)

    assert _apply_gain(pcm, 1.0) == pcm
|
|
|
|
|
|
def test_apply_gain_double():
    """A gain of 2.0 doubles each sample value."""
    pcm = struct.pack("<2h", 100, -100)

    scaled = _apply_gain(pcm, 2.0)

    assert struct.unpack("<2h", scaled) == (200, -200)
|
|
|
|
|
|
def test_apply_gain_clips():
    """Results outside the int16 range are clipped to its limits."""
    # 20000 * 2.0 is beyond int16 in both directions.
    pcm = struct.pack("<2h", 20000, -20000)

    scaled = _apply_gain(pcm, 2.0)

    assert struct.unpack("<2h", scaled) == (32767, -32768)
|
|
|
|
|
|
def test_apply_gain_zero():
    """A gain of 0.0 turns every sample into zero."""
    pcm = struct.pack("<2h", 1000, -1000)

    scaled = _apply_gain(pcm, 0.0)

    assert struct.unpack("<2h", scaled) == (0, 0)
|
|
|
|
|
|
def test_apply_gain_empty():
    """An empty buffer comes back empty."""
    scaled = _apply_gain(b"", 2.0)
    assert scaled == b""
|
|
|
|
|
|
# -- gain property tests ------------------------------------------------------
|
|
|
|
|
|
def test_gain_defaults():
    """Input and output gain both start at 1.0."""
    pipeline = AudioPipeline()

    assert pipeline.input_gain == 1.0
    assert pipeline.output_gain == 1.0
|
|
|
|
|
|
def test_gain_clamping():
    """Out-of-range gain assignments are clamped to 2.0 above and 0.0 below."""
    pipeline = AudioPipeline()

    # Too large an input gain reads back as 2.0.
    pipeline.input_gain = 3.0
    assert pipeline.input_gain == 2.0

    # A negative output gain reads back as 0.0.
    pipeline.output_gain = -1.0
    assert pipeline.output_gain == 0.0
|
|
|
|
|
|
def test_capture_callback_applies_input_gain():
    """Input gain is applied to PCM delivered through the capture callback."""
    ap = AudioPipeline()
    ap.capturing = True
    ap.input_gain = 0.5

    # Two little-endian int16 samples, passed as a 2-frame capture buffer.
    pcm = struct.pack("<2h", 1000, -1000)
    ap._capture_callback(pcm, 2, None, None)

    frame = ap.get_capture_frame()
    # get_capture_frame() returns None when nothing was queued; fail with a
    # clear message here rather than with a TypeError inside struct.unpack.
    assert frame is not None, "capture callback did not queue a frame"

    result = struct.unpack("<2h", frame)
    assert result == (500, -500)
|
|
|
|
|
|
def test_playback_callback_applies_output_gain():
    """Output gain scales queued PCM as it is written to the output buffer."""
    pipeline = AudioPipeline()
    pipeline.output_gain = 0.5

    # Two little-endian int16 samples; the output buffer is the same size.
    pipeline.queue_playback(struct.pack("<2h", 1000, -1000))

    out_buffer = bytearray(4)
    pipeline._playback_callback(out_buffer, 2, None, None)

    played = struct.unpack("<2h", bytes(out_buffer))
    assert played == (500, -500)
|