feat: add Mumble music playback with Opus streaming

ctypes libopus encoder (src/derp/opus.py), voice varint/packet builder
and stream_audio method on MumbleBot (src/derp/mumble.py), music plugin
with play/stop/skip/queue/np/volume commands (plugins/music.py).
Audio pipeline: yt-dlp|ffmpeg subprocess -> PCM -> Opus -> UDPTunnel.
67 new tests (1561 total).
This commit is contained in:
user
2026-02-21 21:42:28 +01:00
parent b074356ec6
commit 47b13c3f1f
6 changed files with 1232 additions and 1 deletions

View File

@@ -11,6 +11,7 @@ from derp.mumble import (
MSG_PING,
MSG_SERVER_SYNC,
MSG_TEXT_MESSAGE,
MSG_UDPTUNNEL,
MSG_USER_REMOVE,
MSG_USER_STATE,
MumbleBot,
@@ -20,15 +21,19 @@ from derp.mumble import (
_build_ping_payload,
_build_text_message_payload,
_build_version_payload,
_build_voice_packet,
_decode_fields,
_decode_varint,
_encode_field,
_encode_varint,
_encode_voice_varint,
_escape_html,
_field_int,
_field_ints,
_field_str,
_pack_msg,
_scale_pcm,
_shell_quote,
_strip_html,
_unpack_header,
)
@@ -912,3 +917,181 @@ class TestMumbleBotConfig:
}
bot = MumbleBot("test", config, PluginRegistry())
assert bot._proxy is False
# ---------------------------------------------------------------------------
# TestVoiceVarint
# ---------------------------------------------------------------------------
class TestVoiceVarint:
def test_zero(self):
assert _encode_voice_varint(0) == b"\x00"
def test_7bit_max(self):
assert _encode_voice_varint(127) == b"\x7f"
def test_7bit_small(self):
assert _encode_voice_varint(1) == b"\x01"
assert _encode_voice_varint(42) == b"\x2a"
def test_14bit_min(self):
# 128 = 0x80 -> prefix 10, value 128
result = _encode_voice_varint(128)
assert len(result) == 2
assert result[0] & 0xC0 == 0x80 # top 2 bits = 10
def test_14bit_max(self):
result = _encode_voice_varint(0x3FFF)
assert len(result) == 2
def test_21bit(self):
result = _encode_voice_varint(0x4000)
assert len(result) == 3
assert result[0] & 0xE0 == 0xC0 # top 3 bits = 110
def test_28bit(self):
result = _encode_voice_varint(0x200000)
assert len(result) == 4
assert result[0] & 0xF0 == 0xE0 # top 4 bits = 1110
def test_64bit(self):
result = _encode_voice_varint(0x10000000)
assert len(result) == 9
assert result[0] == 0xF0
def test_negative_raises(self):
import pytest
with pytest.raises(ValueError, match="non-negative"):
_encode_voice_varint(-1)
def test_14bit_roundtrip(self):
"""Value encoded and decoded back correctly (manual decode)."""
val = 300
data = _encode_voice_varint(val)
assert len(data) == 2
decoded = ((data[0] & 0x3F) << 8) | data[1]
assert decoded == val
def test_7bit_roundtrip(self):
for v in range(128):
data = _encode_voice_varint(v)
assert len(data) == 1
assert data[0] == v
# ---------------------------------------------------------------------------
# TestVoicePacket
# ---------------------------------------------------------------------------
class TestVoicePacket:
def test_header_byte(self):
pkt = _build_voice_packet(0, b"\xaa\xbb")
assert pkt[0] == 0x80 # type=4, target=0
def test_sequence_encoding(self):
pkt = _build_voice_packet(42, b"\x00")
# byte 0: header, byte 1: sequence=42 (7-bit)
assert pkt[1] == 42
def test_opus_data_present(self):
opus = b"\xde\xad\xbe\xef"
pkt = _build_voice_packet(0, opus)
assert pkt.endswith(opus)
def test_length_field(self):
opus = b"\x00" * 10
pkt = _build_voice_packet(0, opus)
# header(1) + seq(1, val=0) + length(1, val=10) + data(10) = 13
assert len(pkt) == 13
assert pkt[2] == 10 # length varint
def test_terminator_flag(self):
opus = b"\x00" * 5
pkt = _build_voice_packet(0, opus, last=True)
# length with bit 13 set: 5 | 0x2000 = 0x2005
# 0x2005 in 14-bit varint: 10_100000 00000101
length_bytes = _encode_voice_varint(5 | 0x2000)
assert length_bytes in pkt
def test_no_terminator_by_default(self):
opus = b"\x00" * 5
pkt = _build_voice_packet(0, opus, last=False)
# length=5, no bit 13
assert pkt[2] == 5
# ---------------------------------------------------------------------------
# TestPcmScaling
# ---------------------------------------------------------------------------
class TestPcmScaling:
def test_unity_volume(self):
import struct as _s
pcm = _s.pack("<hh", 1000, -1000)
result = _scale_pcm(pcm, 1.0)
assert result == pcm
def test_half_volume(self):
import struct as _s
pcm = _s.pack("<h", 1000)
result = _scale_pcm(pcm, 0.5)
samples = _s.unpack("<h", result)
assert samples[0] == 500
def test_clamp_positive(self):
import struct as _s
pcm = _s.pack("<h", 32767)
result = _scale_pcm(pcm, 2.0)
samples = _s.unpack("<h", result)
assert samples[0] == 32767
def test_clamp_negative(self):
import struct as _s
pcm = _s.pack("<h", -32768)
result = _scale_pcm(pcm, 2.0)
samples = _s.unpack("<h", result)
assert samples[0] == -32768
def test_zero_volume(self):
import struct as _s
pcm = _s.pack("<hh", 32767, -32768)
result = _scale_pcm(pcm, 0.0)
samples = _s.unpack("<hh", result)
assert samples == (0, 0)
def test_preserves_length(self):
pcm = b"\x00" * 1920 # 960 samples
result = _scale_pcm(pcm, 0.5)
assert len(result) == 1920
# ---------------------------------------------------------------------------
# TestShellQuote
# ---------------------------------------------------------------------------
class TestShellQuote:
def test_simple(self):
assert _shell_quote("hello") == "'hello'"
def test_single_quote(self):
assert _shell_quote("it's") == "'it'\\''s'"
def test_url(self):
url = "https://youtube.com/watch?v=abc&t=10"
quoted = _shell_quote(url)
assert quoted.startswith("'")
assert quoted.endswith("'")
# ---------------------------------------------------------------------------
# TestMsgUdpTunnel
# ---------------------------------------------------------------------------
class TestMsgUdpTunnel:
def test_constant(self):
assert MSG_UDPTUNNEL == 1

353
tests/test_music.py Normal file
View File

@@ -0,0 +1,353 @@
"""Tests for the music playback plugin."""
import asyncio
import importlib.util
import sys
from unittest.mock import AsyncMock, MagicMock, patch
# -- Load plugin module directly ---------------------------------------------
_spec = importlib.util.spec_from_file_location("music", "plugins/music.py")
_mod = importlib.util.module_from_spec(_spec)
sys.modules["music"] = _mod
_spec.loader.exec_module(_mod)
# -- Fakes -------------------------------------------------------------------
class _FakeState:
def __init__(self):
self._store: dict[str, dict[str, str]] = {}
def get(self, ns: str, key: str) -> str | None:
return self._store.get(ns, {}).get(key)
def set(self, ns: str, key: str, value: str) -> None:
self._store.setdefault(ns, {})[key] = value
def delete(self, ns: str, key: str) -> None:
self._store.get(ns, {}).pop(key, None)
def keys(self, ns: str) -> list[str]:
return list(self._store.get(ns, {}).keys())
class _FakeBot:
"""Minimal bot for music plugin testing."""
def __init__(self, *, mumble: bool = True):
self.sent: list[tuple[str, str]] = []
self.replied: list[str] = []
self.state = _FakeState()
self._pstate: dict = {}
self._tasks: set[asyncio.Task] = set()
if mumble:
self.stream_audio = AsyncMock()
async def send(self, target: str, text: str) -> None:
self.sent.append((target, text))
async def reply(self, message, text: str) -> None:
self.replied.append(text)
def _is_admin(self, message) -> bool:
return False
def _spawn(self, coro, *, name=None):
task = asyncio.ensure_future(coro)
self._tasks.add(task)
task.add_done_callback(self._tasks.discard)
return task
class _Msg:
"""Minimal message object."""
def __init__(self, text="!play url", nick="Alice", target="0",
is_channel=True):
self.text = text
self.nick = nick
self.target = target
self.is_channel = is_channel
self.prefix = nick
self.command = "PRIVMSG"
self.params = [target, text]
self.tags = {}
self.raw = {}
# ---------------------------------------------------------------------------
# TestMumbleGuard
# ---------------------------------------------------------------------------
class TestMumbleGuard:
def test_is_mumble_true(self):
bot = _FakeBot(mumble=True)
assert _mod._is_mumble(bot) is True
def test_is_mumble_false(self):
bot = _FakeBot(mumble=False)
assert _mod._is_mumble(bot) is False
def test_play_non_mumble(self):
bot = _FakeBot(mumble=False)
msg = _Msg(text="!play https://example.com")
asyncio.run(_mod.cmd_play(bot, msg))
assert any("Mumble-only" in r for r in bot.replied)
def test_stop_non_mumble_silent(self):
bot = _FakeBot(mumble=False)
msg = _Msg(text="!stop")
asyncio.run(_mod.cmd_stop(bot, msg))
assert bot.replied == []
def test_skip_non_mumble_silent(self):
bot = _FakeBot(mumble=False)
msg = _Msg(text="!skip")
asyncio.run(_mod.cmd_skip(bot, msg))
assert bot.replied == []
# ---------------------------------------------------------------------------
# TestPlayCommand
# ---------------------------------------------------------------------------
class TestPlayCommand:
def test_play_no_url(self):
bot = _FakeBot()
msg = _Msg(text="!play")
asyncio.run(_mod.cmd_play(bot, msg))
assert any("Usage" in r for r in bot.replied)
def test_play_queues_track(self):
bot = _FakeBot()
msg = _Msg(text="!play https://example.com/track")
with patch.object(_mod, "_resolve_title", return_value="Test Track"):
with patch.object(_mod, "_ensure_loop"):
asyncio.run(_mod.cmd_play(bot, msg))
assert any("Playing" in r for r in bot.replied)
ps = _mod._ps(bot)
assert len(ps["queue"]) == 1
assert ps["queue"][0].title == "Test Track"
def test_play_shows_queued_when_busy(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["current"] = _mod._Track(
url="x", title="Current", requester="Bob",
)
msg = _Msg(text="!play https://example.com/next")
with patch.object(_mod, "_resolve_title", return_value="Next Track"):
asyncio.run(_mod.cmd_play(bot, msg))
assert any("Queued" in r for r in bot.replied)
def test_play_queue_full(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["queue"] = [
_mod._Track(url="x", title="t", requester="a")
for _ in range(_mod._MAX_QUEUE)
]
msg = _Msg(text="!play https://example.com/overflow")
with patch.object(_mod, "_resolve_title", return_value="Overflow"):
asyncio.run(_mod.cmd_play(bot, msg))
assert any("full" in r.lower() for r in bot.replied)
# ---------------------------------------------------------------------------
# TestStopCommand
# ---------------------------------------------------------------------------
class TestStopCommand:
def test_stop_clears_queue(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["queue"] = [_mod._Track(url="x", title="t", requester="a")]
ps["current"] = _mod._Track(url="y", title="s", requester="b")
msg = _Msg(text="!stop")
asyncio.run(_mod.cmd_stop(bot, msg))
assert ps["queue"] == []
assert ps["current"] is None
assert any("Stopped" in r for r in bot.replied)
# ---------------------------------------------------------------------------
# TestSkipCommand
# ---------------------------------------------------------------------------
class TestSkipCommand:
def test_skip_nothing_playing(self):
bot = _FakeBot()
msg = _Msg(text="!skip")
asyncio.run(_mod.cmd_skip(bot, msg))
assert any("Nothing" in r for r in bot.replied)
def test_skip_with_queue(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["current"] = _mod._Track(url="a", title="First", requester="x")
ps["queue"] = [_mod._Track(url="b", title="Second", requester="y")]
# We need to mock the task
mock_task = MagicMock()
mock_task.done.return_value = False
ps["task"] = mock_task
msg = _Msg(text="!skip")
with patch.object(_mod, "_ensure_loop"):
asyncio.run(_mod.cmd_skip(bot, msg))
assert any("Skipped" in r for r in bot.replied)
mock_task.cancel.assert_called_once()
def test_skip_empty_queue(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["current"] = _mod._Track(url="a", title="Only", requester="x")
mock_task = MagicMock()
mock_task.done.return_value = False
ps["task"] = mock_task
msg = _Msg(text="!skip")
asyncio.run(_mod.cmd_skip(bot, msg))
assert any("empty" in r.lower() for r in bot.replied)
# ---------------------------------------------------------------------------
# TestQueueCommand
# ---------------------------------------------------------------------------
class TestQueueCommand:
def test_queue_empty(self):
bot = _FakeBot()
msg = _Msg(text="!queue")
asyncio.run(_mod.cmd_queue(bot, msg))
assert any("empty" in r.lower() for r in bot.replied)
def test_queue_with_tracks(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["current"] = _mod._Track(url="a", title="Now", requester="x")
ps["queue"] = [
_mod._Track(url="b", title="Next", requester="y"),
]
msg = _Msg(text="!queue")
asyncio.run(_mod.cmd_queue(bot, msg))
assert any("Now" in r for r in bot.replied)
assert any("Next" in r for r in bot.replied)
def test_queue_with_url_delegates(self):
bot = _FakeBot()
msg = _Msg(text="!queue https://example.com/track")
with patch.object(_mod, "_resolve_title", return_value="Title"):
with patch.object(_mod, "_ensure_loop"):
asyncio.run(_mod.cmd_queue(bot, msg))
# Should have called cmd_play logic
assert any("Playing" in r or "Queued" in r for r in bot.replied)
# ---------------------------------------------------------------------------
# TestNpCommand
# ---------------------------------------------------------------------------
class TestNpCommand:
def test_np_nothing(self):
bot = _FakeBot()
msg = _Msg(text="!np")
asyncio.run(_mod.cmd_np(bot, msg))
assert any("Nothing" in r for r in bot.replied)
def test_np_playing(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["current"] = _mod._Track(
url="x", title="Cool Song", requester="DJ",
)
msg = _Msg(text="!np")
asyncio.run(_mod.cmd_np(bot, msg))
assert any("Cool Song" in r for r in bot.replied)
assert any("DJ" in r for r in bot.replied)
# ---------------------------------------------------------------------------
# TestVolumeCommand
# ---------------------------------------------------------------------------
class TestVolumeCommand:
def test_volume_show(self):
bot = _FakeBot()
msg = _Msg(text="!volume")
asyncio.run(_mod.cmd_volume(bot, msg))
assert any("50%" in r for r in bot.replied)
def test_volume_set(self):
bot = _FakeBot()
msg = _Msg(text="!volume 75")
asyncio.run(_mod.cmd_volume(bot, msg))
ps = _mod._ps(bot)
assert ps["volume"] == 75
assert any("75%" in r for r in bot.replied)
def test_volume_out_of_range(self):
bot = _FakeBot()
msg = _Msg(text="!volume 150")
asyncio.run(_mod.cmd_volume(bot, msg))
assert any("0-100" in r for r in bot.replied)
def test_volume_negative(self):
bot = _FakeBot()
msg = _Msg(text="!volume -10")
asyncio.run(_mod.cmd_volume(bot, msg))
assert any("0-100" in r for r in bot.replied)
def test_volume_invalid(self):
bot = _FakeBot()
msg = _Msg(text="!volume abc")
asyncio.run(_mod.cmd_volume(bot, msg))
assert any("Usage" in r for r in bot.replied)
# ---------------------------------------------------------------------------
# TestPerBotState
# ---------------------------------------------------------------------------
class TestPerBotState:
def test_ps_initializes(self):
bot = _FakeBot()
ps = _mod._ps(bot)
assert ps["queue"] == []
assert ps["current"] is None
assert ps["volume"] == 50
def test_ps_stable_reference(self):
bot = _FakeBot()
ps1 = _mod._ps(bot)
ps2 = _mod._ps(bot)
assert ps1 is ps2
def test_ps_isolated_per_bot(self):
bot1 = _FakeBot()
bot2 = _FakeBot()
_mod._ps(bot1)["volume"] = 80
assert _mod._ps(bot2)["volume"] == 50
# ---------------------------------------------------------------------------
# TestHelpers
# ---------------------------------------------------------------------------
class TestMusicHelpers:
def test_truncate_short(self):
assert _mod._truncate("short") == "short"
def test_truncate_long(self):
long = "x" * 100
result = _mod._truncate(long)
assert len(result) == 80
assert result.endswith("...")

154
tests/test_opus.py Normal file
View File

@@ -0,0 +1,154 @@
"""Tests for the Opus ctypes wrapper."""
import math
import struct
from unittest.mock import MagicMock, patch
import pytest
from derp.opus import CHANNELS, FRAME_BYTES, FRAME_SIZE, SAMPLE_RATE
# -- Helpers -----------------------------------------------------------------
def _silence() -> bytes:
"""Generate one frame of silence (1920 bytes of zeros)."""
return b"\x00" * FRAME_BYTES
def _sine_frame(freq: float = 440.0) -> bytes:
"""Generate one 20ms frame of a sine wave at the given frequency."""
samples = []
for i in range(FRAME_SIZE):
t = i / SAMPLE_RATE
val = int(16000 * math.sin(2 * math.pi * freq * t))
samples.append(struct.pack("<h", max(-32768, min(32767, val))))
return b"".join(samples)
# -- Mock libopus for unit testing without system library --------------------
def _mock_lib():
"""Build a mock ctypes CDLL that simulates libopus calls."""
lib = MagicMock()
lib.opus_encoder_get_size.return_value = 256
lib.opus_encoder_init.return_value = 0
lib.opus_encoder_ctl.return_value = 0
lib.opus_encode.return_value = 10 # 10 bytes output
return lib
@pytest.fixture(autouse=True)
def _reset_lib_cache():
"""Reset the cached _lib before each test."""
import derp.opus as _mod
old = _mod._lib
_mod._lib = None
yield
_mod._lib = old
# ---------------------------------------------------------------------------
# TestOpusConstants
# ---------------------------------------------------------------------------
class TestOpusConstants:
def test_sample_rate(self):
assert SAMPLE_RATE == 48000
def test_channels(self):
assert CHANNELS == 1
def test_frame_size(self):
assert FRAME_SIZE == 960
def test_frame_bytes(self):
assert FRAME_BYTES == FRAME_SIZE * CHANNELS * 2
# ---------------------------------------------------------------------------
# TestOpusEncoder (mocked libopus)
# ---------------------------------------------------------------------------
class TestOpusEncoder:
def test_encode_silence(self):
"""Encoding silence produces bytes output."""
lib = _mock_lib()
with patch("derp.opus._load_lib", return_value=lib):
from derp.opus import OpusEncoder
enc = OpusEncoder()
result = enc.encode(_silence())
assert isinstance(result, bytes)
assert len(result) > 0
enc.close()
def test_encode_sine(self):
"""Encoding a sine wave produces bytes output."""
lib = _mock_lib()
with patch("derp.opus._load_lib", return_value=lib):
from derp.opus import OpusEncoder
enc = OpusEncoder()
result = enc.encode(_sine_frame())
assert isinstance(result, bytes)
enc.close()
def test_encode_wrong_size(self):
"""Passing wrong buffer size raises ValueError."""
lib = _mock_lib()
with patch("derp.opus._load_lib", return_value=lib):
from derp.opus import OpusEncoder
enc = OpusEncoder()
with pytest.raises(ValueError, match="expected 1920"):
enc.encode(b"\x00" * 100)
enc.close()
def test_encode_multi_frame(self):
"""Multiple sequential encodes work."""
lib = _mock_lib()
with patch("derp.opus._load_lib", return_value=lib):
from derp.opus import OpusEncoder
enc = OpusEncoder()
for _ in range(5):
result = enc.encode(_silence())
assert isinstance(result, bytes)
enc.close()
def test_custom_bitrate(self):
"""Custom bitrate is passed to opus_encoder_ctl."""
lib = _mock_lib()
with patch("derp.opus._load_lib", return_value=lib):
from derp.opus import OpusEncoder
enc = OpusEncoder(bitrate=96000)
assert lib.opus_encoder_ctl.called
enc.close()
def test_init_failure(self):
"""RuntimeError on encoder init failure."""
lib = _mock_lib()
lib.opus_encoder_init.return_value = -1
with patch("derp.opus._load_lib", return_value=lib):
from derp.opus import OpusEncoder
with pytest.raises(RuntimeError, match="opus_encoder_init"):
OpusEncoder()
def test_encode_failure(self):
"""RuntimeError on encode failure."""
lib = _mock_lib()
lib.opus_encode.return_value = -1
with patch("derp.opus._load_lib", return_value=lib):
from derp.opus import OpusEncoder
enc = OpusEncoder()
with pytest.raises(RuntimeError, match="opus_encode"):
enc.encode(_silence())
def test_close_clears_state(self):
"""close() sets internal state to None."""
lib = _mock_lib()
with patch("derp.opus._load_lib", return_value=lib):
from derp.opus import OpusEncoder
enc = OpusEncoder()
enc.close()
assert enc._state is None