Covers PitchShifter passthrough, frequency shift direction, output length preservation, semitone clamping. AudioPipeline tests verify pitch property defaults, get/set, clamping, and dequeue integration.
272 lines
7.5 KiB
Python
272 lines
7.5 KiB
Python
"""Tests for AudioPipeline."""
|
|
|
|
import struct
|
|
|
|
import numpy as np
|
|
|
|
from tuimble.audio import FRAME_SIZE, AudioPipeline, _apply_gain
|
|
|
|
|
|
def test_default_construction():
    """A pipeline built with no arguments is idle, undeafened, at unity gain."""
    pipeline = AudioPipeline()
    # Boolean flags are checked by identity, matching the strictness intended.
    assert pipeline.capturing is False
    assert pipeline.deafened is False
    # Both gain stages start at 1.0.
    assert pipeline.input_gain == 1.0
    assert pipeline.output_gain == 1.0
|
|
|
|
|
|
def test_custom_construction():
    """Explicit constructor arguments are accepted; no frame is pending."""
    options = {
        "sample_rate": 24000,
        "frame_size": 480,
        "input_device": 1,
        "output_device": 2,
    }
    pipeline = AudioPipeline(**options)
    # Verified via public behavior only: nothing has been captured yet.
    assert pipeline.get_capture_frame() is None
|
|
|
|
|
|
def test_capturing_toggle():
    """The capturing flag starts False and reflects each value assigned to it."""
    pipeline = AudioPipeline()
    assert pipeline.capturing is False
    # Switch on, then off again, reading the flag back after every write.
    for state in (True, False):
        pipeline.capturing = state
        assert pipeline.capturing is state
|
|
|
|
|
|
def test_get_capture_frame_empty():
    """get_capture_frame returns None on a freshly constructed pipeline."""
    frame = AudioPipeline().get_capture_frame()
    assert frame is None
|
|
|
|
|
|
def test_capture_and_retrieve():
    """A frame fed to the capture callback is handed back exactly once."""
    pipeline = AudioPipeline()
    pipeline.capturing = True
    captured = b"\x01\x02\x03\x04"
    pipeline._capture_callback(captured, 2, None, None)

    # First read yields the captured bytes.
    first = pipeline.get_capture_frame()
    assert first == captured
    # Second read finds nothing left.
    second = pipeline.get_capture_frame()
    assert second is None
|
|
|
|
|
|
def test_queue_playback_and_callback():
    """Queued PCM is written unchanged into the playback callback's buffer."""
    pipeline = AudioPipeline()
    frame_bytes = FRAME_SIZE * 2  # 16-bit mono = 2 bytes per sample

    # Build a repeating 0..255 ramp that is exactly one frame long.
    whole_ramps, remainder = divmod(frame_bytes, 256)
    pcm = bytes(range(256)) * whole_ramps + bytes(range(remainder))
    assert len(pcm) == frame_bytes

    pipeline.queue_playback(pcm)

    out_buffer = bytearray(frame_bytes)
    pipeline._playback_callback(out_buffer, FRAME_SIZE, None, None)
    assert bytes(out_buffer) == pcm
|
|
|
|
|
|
def test_playback_callback_silence_on_empty():
    """With nothing queued, the playback callback fills the buffer with zeros."""
    pipeline = AudioPipeline()
    frame_bytes = FRAME_SIZE * 2
    # Pre-fill with 0xff so leftover bytes would be detected.
    out_buffer = bytearray(b"\xff" * frame_bytes)
    pipeline._playback_callback(out_buffer, FRAME_SIZE, None, None)
    silence = bytearray(frame_bytes)  # all zeros
    assert out_buffer == silence
|
|
|
|
|
|
def test_playback_callback_short_pcm_pads_silence():
    """A queued chunk shorter than the buffer is followed by zero padding."""
    pipeline = AudioPipeline()
    frame_bytes = FRAME_SIZE * 2
    short_len = 100
    short_pcm = b"\x42" * 100
    pipeline.queue_playback(short_pcm)

    out_buffer = bytearray(frame_bytes)
    pipeline._playback_callback(out_buffer, FRAME_SIZE, None, None)

    # Leading bytes carry the queued data; the remainder must be silence.
    head, tail = out_buffer[:short_len], out_buffer[short_len:]
    assert head == bytearray(b"\x42" * 100)
    assert tail == bytearray(frame_bytes - short_len)
|
|
|
|
|
|
def test_queue_playback_overflow_drops():
    """Data queued after the queue is full is dropped without raising."""
    capacity = 50  # maxsize=50
    pipeline = AudioPipeline(frame_size=FRAME_SIZE)
    # Non-zero marker bytes so queued frames are distinguishable from silence.
    marker = b"\x42\x42"
    for _ in range(capacity):
        pipeline.queue_playback(marker)
    # Must not raise; expected to be discarded because the queue is full.
    pipeline.queue_playback(b"\xff\xff")

    def plays_audio():
        """Run one playback callback and report whether it produced non-silence."""
        out_buffer = bytearray(2)
        pipeline._playback_callback(out_buffer, 1, None, None)
        return out_buffer != bytearray(2)

    # Drain more times than the queue can hold and count the audible frames.
    played = sum(plays_audio() for _ in range(60))
    assert played == 50
|
|
|
|
|
|
def test_deafened_toggle():
    """The deafened flag starts False and reflects each value assigned to it."""
    pipeline = AudioPipeline()
    assert pipeline.deafened is False
    # Switch on, then off again, reading the flag back after every write.
    for state in (True, False):
        pipeline.deafened = state
        assert pipeline.deafened is state
|
|
|
|
|
|
def test_queue_playback_discards_when_deafened():
    """PCM queued while deafened never reaches the playback output."""
    pipeline = AudioPipeline()
    pipeline.deafened = True
    incoming = b"\x42" * 100
    pipeline.queue_playback(incoming)

    # The callback should find nothing to play and leave the buffer zeroed.
    out_buffer = bytearray(200)
    pipeline._playback_callback(out_buffer, 100, None, None)
    silence = bytearray(200)
    assert out_buffer == silence
|
|
|
|
|
|
def test_playback_callback_silence_when_deafened():
    """Deafening after data was queued still yields an all-zero output buffer."""
    pipeline = AudioPipeline()
    frame_bytes = FRAME_SIZE * 2
    # Queue a full frame first, then deafen, so data is already pending.
    pipeline.queue_playback(b"\x42" * frame_bytes)
    pipeline.deafened = True

    # Pre-fill with 0xff so leftover bytes would be detected.
    out_buffer = bytearray(b"\xff" * frame_bytes)
    pipeline._playback_callback(out_buffer, FRAME_SIZE, None, None)
    silence = bytearray(frame_bytes)  # all zeros
    assert out_buffer == silence
|
|
|
|
|
|
def test_stop_without_start():
    """Calling stop() on a pipeline that was never started must not raise."""
    AudioPipeline().stop()
|
|
|
|
|
|
def test_stop_drains_queues():
    """After stop(), neither the capture nor the playback side holds data."""
    pipeline = AudioPipeline()
    pipeline.capturing = True
    # Put one frame on each side before stopping.
    pipeline._capture_callback(b"\x00\x00", 1, None, None)
    pipeline.queue_playback(b"\x00\x00")

    pipeline.stop()

    assert pipeline.get_capture_frame() is None
    # Playback side is checked indirectly: the callback produces silence.
    out_buffer = bytearray(2)
    pipeline._playback_callback(out_buffer, 1, None, None)
    assert out_buffer == bytearray(2)
|
|
|
|
|
|
# -- _apply_gain tests -------------------------------------------------------
|
|
|
|
|
|
def test_apply_gain_unity():
    """A gain of 1.0 leaves the PCM bytes untouched, including int16 extremes."""
    samples = (100, -200, 32767, -32768)
    pcm = struct.pack("<4h", *samples)
    scaled = _apply_gain(pcm, 1.0)
    assert scaled == pcm
|
|
|
|
|
|
def test_apply_gain_double():
    """A gain of 2.0 doubles each sample when the result fits in int16."""
    pcm = struct.pack("<2h", 100, -100)
    scaled = _apply_gain(pcm, 2.0)
    samples = struct.unpack("<2h", scaled)
    assert samples == (200, -200)
|
|
|
|
|
|
def test_apply_gain_clips():
    """Scaled samples outside the int16 range are clipped to its limits."""
    pcm = struct.pack("<2h", 20000, -20000)
    scaled = _apply_gain(pcm, 2.0)
    samples = struct.unpack("<2h", scaled)
    # 40000 and -40000 do not fit in int16, so both ends clip.
    assert samples == (32767, -32768)
|
|
|
|
|
|
def test_apply_gain_zero():
    """A gain of 0.0 turns every sample into zero."""
    pcm = struct.pack("<2h", 1000, -1000)
    scaled = _apply_gain(pcm, 0.0)
    samples = struct.unpack("<2h", scaled)
    assert samples == (0, 0)
|
|
|
|
|
|
def test_apply_gain_empty():
    """An empty input buffer comes back as an empty bytes object."""
    scaled = _apply_gain(b"", 2.0)
    assert scaled == b""
|
|
|
|
|
|
# -- gain property tests ------------------------------------------------------
|
|
|
|
|
|
def test_gain_defaults():
    """Input and output gain both read 1.0 on a new pipeline."""
    pipeline = AudioPipeline()
    assert pipeline.input_gain == 1.0
    assert pipeline.output_gain == 1.0
|
|
|
|
|
|
def test_gain_clamping():
    """Out-of-range gain assignments read back as the nearest allowed bound."""
    pipeline = AudioPipeline()

    # Too large: input gain reads back as 2.0.
    pipeline.input_gain = 3.0
    assert pipeline.input_gain == 2.0

    # Negative: output gain reads back as 0.0.
    pipeline.output_gain = -1.0
    assert pipeline.output_gain == 0.0
|
|
|
|
|
|
def test_capture_callback_applies_input_gain():
    """Captured samples come back scaled by the configured input gain."""
    pipeline = AudioPipeline()
    pipeline.capturing = True
    pipeline.input_gain = 0.5

    raw = struct.pack("<2h", 1000, -1000)
    pipeline._capture_callback(raw, 2, None, None)

    dequeued = pipeline.get_capture_frame()
    samples = struct.unpack("<2h", dequeued)
    # Half gain halves each sample.
    assert samples == (500, -500)
|
|
|
|
|
|
def test_playback_callback_applies_output_gain():
    """Samples written by the playback callback are scaled by the output gain."""
    pipeline = AudioPipeline()
    pipeline.output_gain = 0.5

    raw = struct.pack("<2h", 1000, -1000)
    pipeline.queue_playback(raw)

    out_buffer = bytearray(4)
    pipeline._playback_callback(out_buffer, 2, None, None)

    samples = struct.unpack("<2h", bytes(out_buffer))
    # Half gain halves each sample.
    assert samples == (500, -500)
|
|
|
|
|
|
# -- pitch property tests ----------------------------------------------------
|
|
|
|
|
|
def test_pitch_default():
    """The pitch property reads 0.0 on a new pipeline."""
    pipeline = AudioPipeline()
    assert pipeline.pitch == 0.0
|
|
|
|
|
|
def test_pitch_set_and_get():
    """In-range pitch values, positive and negative, read back unchanged."""
    pipeline = AudioPipeline()
    for value in (5.0, -3.0):
        pipeline.pitch = value
        assert pipeline.pitch == value
|
|
|
|
|
|
def test_pitch_clamping():
    """Pitch values beyond +/-12 read back as 12.0 and -12.0 respectively."""
    pipeline = AudioPipeline()
    # (assigned value, value expected on read-back)
    cases = ((20.0, 12.0), (-20.0, -12.0))
    for requested, expected in cases:
        pipeline.pitch = requested
        assert pipeline.pitch == expected
|
|
|
|
|
|
def test_pitch_applied_on_dequeue():
    """A non-zero pitch alters the dequeued frame while keeping its length.

    Pitch shifting is expected to run in get_capture_frame, not the callback.
    """
    pipeline = AudioPipeline()
    pipeline.capturing = True
    pipeline.pitch = 4.0

    # One frame of a 440 Hz sine at amplitude 16000, timed at 48000 samples/s.
    seconds = np.arange(FRAME_SIZE) / 48000.0
    wave = np.sin(2 * np.pi * 440.0 * seconds) * 16000
    pcm = wave.astype(np.int16).tobytes()

    pipeline._capture_callback(pcm, FRAME_SIZE, None, None)
    dequeued = pipeline.get_capture_frame()

    assert dequeued is not None
    # Same byte length as the input, but different content.
    assert len(dequeued) == len(pcm)
    assert dequeued != pcm
|