From 47b13c3f1f98f01e3b57eaf2274da262658192e1 Mon Sep 17 00:00:00 2001 From: user Date: Sat, 21 Feb 2026 21:42:28 +0100 Subject: [PATCH] feat: add Mumble music playback with Opus streaming ctypes libopus encoder (src/derp/opus.py), voice varint/packet builder and stream_audio method on MumbleBot (src/derp/mumble.py), music plugin with play/stop/skip/queue/np/volume commands (plugins/music.py). Audio pipeline: yt-dlp|ffmpeg subprocess -> PCM -> Opus -> UDPTunnel. 67 new tests (1561 total). --- plugins/music.py | 283 ++++++++++++++++++++++++++++++++++ src/derp/mumble.py | 160 +++++++++++++++++++- src/derp/opus.py | 100 ++++++++++++ tests/test_mumble.py | 183 ++++++++++++++++++++++ tests/test_music.py | 353 +++++++++++++++++++++++++++++++++++++++++++ tests/test_opus.py | 154 +++++++++++++++++++ 6 files changed, 1232 insertions(+), 1 deletion(-) create mode 100644 plugins/music.py create mode 100644 src/derp/opus.py create mode 100644 tests/test_music.py create mode 100644 tests/test_opus.py diff --git a/plugins/music.py b/plugins/music.py new file mode 100644 index 0000000..fcfbe40 --- /dev/null +++ b/plugins/music.py @@ -0,0 +1,283 @@ +"""Plugin: music playback for Mumble voice channels.""" + +from __future__ import annotations + +import asyncio +import logging +import subprocess +from dataclasses import dataclass + +from derp.plugin import command + +log = logging.getLogger(__name__) + +_MAX_QUEUE = 50 +_MAX_TITLE_LEN = 80 + + +@dataclass(slots=True) +class _Track: + url: str + title: str + requester: str + + +# -- Per-bot runtime state --------------------------------------------------- + + +def _ps(bot): + """Per-bot plugin runtime state.""" + return bot._pstate.setdefault("music", { + "queue": [], + "current": None, + "volume": 50, + "task": None, + "done_event": None, + }) + + +# -- Helpers ----------------------------------------------------------------- + + +def _is_mumble(bot) -> bool: + """Check if bot supports voice streaming.""" + return hasattr(bot, "stream_audio") + + +def _truncate(text: str, max_len: int = _MAX_TITLE_LEN) -> str: + """Truncate text with ellipsis if needed.""" + if len(text) <= max_len: + return text + return text[: max_len - 3].rstrip() + "..." + + +def _resolve_title(url: str) -> str: + """Resolve track title via yt-dlp. Blocking, run in executor.""" + try: + result = subprocess.run( + ["yt-dlp", "--get-title", "--no-warnings", url], + capture_output=True, text=True, timeout=15, + ) + title = result.stdout.strip() + return title if title else url + except Exception: + return url + + +# -- Play loop --------------------------------------------------------------- + + +async def _play_loop(bot) -> None: + """Pop tracks from queue and stream them sequentially.""" + ps = _ps(bot) + try: + while ps["queue"]: + track = ps["queue"].pop(0) + ps["current"] = track + + done = asyncio.Event() + ps["done_event"] = done + + volume = ps["volume"] / 100.0 + try: + await bot.stream_audio( + track.url, volume=volume, on_done=done, + ) + except asyncio.CancelledError: + raise + except Exception: + log.exception("music: stream error for %s", track.url) + + await done.wait() + except asyncio.CancelledError: + pass + finally: + ps["current"] = None + ps["done_event"] = None + ps["task"] = None + + +def _ensure_loop(bot) -> None: + """Start the play loop if not already running.""" + ps = _ps(bot) + task = ps.get("task") + if task and not task.done(): + return + ps["task"] = bot._spawn(_play_loop(bot), name="music-play-loop") + + +# -- Commands ---------------------------------------------------------------- + + +@command("play", help="Music: !play ") +async def cmd_play(bot, message): + """Play a URL or add to queue if already playing. + + Usage: + !play Play audio from URL (YouTube, SoundCloud, etc.) + """ + if not _is_mumble(bot): + await bot.reply(message, "Music playback is Mumble-only") + return + + parts = message.text.split(None, 1) + if len(parts) < 2: + await bot.reply(message, "Usage: !play ") + return + + url = parts[1].strip() + ps = _ps(bot) + + if len(ps["queue"]) >= _MAX_QUEUE: + await bot.reply(message, f"Queue full ({_MAX_QUEUE} tracks)") + return + + loop = asyncio.get_running_loop() + title = await loop.run_in_executor(None, _resolve_title, url) + track = _Track(url=url, title=title, requester=message.nick or "?") + + ps["queue"].append(track) + + if ps["current"] is not None: + pos = len(ps["queue"]) + await bot.reply( + message, + f"Queued #{pos}: {_truncate(title)}", + ) + else: + await bot.reply(message, f"Playing: {_truncate(title)}") + _ensure_loop(bot) + + +@command("stop", help="Music: !stop") +async def cmd_stop(bot, message): + """Stop playback and clear queue.""" + if not _is_mumble(bot): + return + + ps = _ps(bot) + ps["queue"].clear() + + task = ps.get("task") + if task and not task.done(): + task.cancel() + ps["current"] = None + ps["task"] = None + ps["done_event"] = None + + await bot.reply(message, "Stopped") + + +@command("skip", help="Music: !skip") +async def cmd_skip(bot, message): + """Skip current track, advance to next in queue.""" + if not _is_mumble(bot): + return + + ps = _ps(bot) + if ps["current"] is None: + await bot.reply(message, "Nothing playing") + return + + task = ps.get("task") + if task and not task.done(): + task.cancel() + + skipped = ps["current"] + ps["current"] = None + ps["task"] = None + + if ps["queue"]: + _ensure_loop(bot) + await bot.reply( + message, + f"Skipped: {_truncate(skipped.title)}", + ) + else: + await bot.reply(message, "Skipped, queue empty") + + +@command("queue", help="Music: !queue [url]") +async def cmd_queue(bot, message): + """Show queue or add a URL. + + Usage: + !queue Show current queue + !queue Add URL to queue (alias for !play) + """ + if not _is_mumble(bot): + return + + parts = message.text.split(None, 1) + if len(parts) >= 2: + # Alias for !play + await cmd_play(bot, message) + return + + ps = _ps(bot) + lines = [] + if ps["current"]: + lines.append( + f"Now: {_truncate(ps['current'].title)}" + f" [{ps['current'].requester}]" + ) + if ps["queue"]: + for i, track in enumerate(ps["queue"], 1): + lines.append( + f" {i}. {_truncate(track.title)} [{track.requester}]" + ) + else: + if not ps["current"]: + lines.append("Queue empty") + + for line in lines: + await bot.reply(message, line) + + +@command("np", help="Music: !np") +async def cmd_np(bot, message): + """Show now-playing track.""" + if not _is_mumble(bot): + return + + ps = _ps(bot) + if ps["current"] is None: + await bot.reply(message, "Nothing playing") + return + + track = ps["current"] + await bot.reply( + message, + f"Now playing: {_truncate(track.title)} [{track.requester}]", + ) + + +@command("volume", help="Music: !volume [0-100]") +async def cmd_volume(bot, message): + """Get or set playback volume. + + Usage: + !volume Show current volume + !volume <0-100> Set volume (applies on next track) + """ + if not _is_mumble(bot): + return + + ps = _ps(bot) + parts = message.text.split(None, 1) + if len(parts) < 2: + await bot.reply(message, f"Volume: {ps['volume']}%") + return + + try: + val = int(parts[1]) + except ValueError: + await bot.reply(message, "Usage: !volume <0-100>") + return + + if val < 0 or val > 100: + await bot.reply(message, "Volume must be 0-100") + return + + ps["volume"] = val + await bot.reply(message, f"Volume set to {val}%") diff --git a/src/derp/mumble.py b/src/derp/mumble.py index 939e066..0676c55 100644 --- a/src/derp/mumble.py +++ b/src/derp/mumble.py @@ -1,7 +1,8 @@ -"""Mumble adapter: TLS/TCP over SOCKS5, protobuf control channel (text only).""" +"""Mumble adapter: TLS/TCP over SOCKS5, protobuf control channel + voice.""" from __future__ import annotations +import array import asyncio import html import logging @@ -25,6 +26,7 @@ _AMBIGUOUS = object() # sentinel for ambiguous prefix matches # -- Mumble message types ---------------------------------------------------- MSG_VERSION = 0 +MSG_UDPTUNNEL = 1 MSG_AUTHENTICATE = 2 MSG_PING = 3 MSG_SERVER_SYNC = 5 @@ -148,6 +150,85 @@ def _escape_html(text: str) -> str: return html.escape(text, quote=False) +def _shell_quote(s: str) -> str: + """Quote a string for safe shell interpolation.""" + return "'" + s.replace("'", "'\\''") + "'" + + +# -- Mumble voice helpers ---------------------------------------------------- + + +def _encode_voice_varint(value: int) -> bytes: + """Encode an integer using Mumble's voice varint format. + + NOT the same as protobuf varint. Mumble voice varints use a prefix + code based on leading bits: + 0xxxxxxx -- 7-bit (0-127) + 10xxxxxx yyyyyyyy -- 14-bit + 110xxxxx yyyyyyyy yyyyyyyy -- 21-bit + 1110xxxx yyyyyyyy yyyyyyyy yyyyyyyy -- 28-bit + 11110000 + 8 bytes -- 64-bit + """ + if value < 0: + raise ValueError("voice varint must be non-negative") + if value < 0x80: + return bytes([value]) + if value < 0x4000: + return bytes([0x80 | (value >> 8), value & 0xFF]) + if value < 0x200000: + return bytes([ + 0xC0 | (value >> 16), + (value >> 8) & 0xFF, + value & 0xFF, + ]) + if value < 0x10000000: + return bytes([ + 0xE0 | (value >> 24), + (value >> 16) & 0xFF, + (value >> 8) & 0xFF, + value & 0xFF, + ]) + # 64-bit fallback + return b"\xf0" + value.to_bytes(8, "big") + + +def _build_voice_packet( + sequence: int, + opus_data: bytes, + *, + last: bool = False, +) -> bytes: + """Build a Mumble voice packet for client-to-server Opus audio. + + Format (client-to-server, no session ID): + 1 byte : header (type=4 << 5 | target=0 -> 0x80) + varint : sequence number (increments by 1 per frame) + varint : opus frame length (bit 13 = terminator on last) + N bytes : raw opus data + """ + header = bytes([0x80]) # type=4 (Opus), target=0 + seq = _encode_voice_varint(sequence) + length = len(opus_data) + if last: + length |= 0x2000 # bit 13 = terminator flag + size = _encode_voice_varint(length) + return header + seq + size + opus_data + + +def _scale_pcm(data: bytes, volume: float) -> bytes: + """Scale s16le PCM samples by a volume factor, clamped to [-32768, 32767].""" + samples = array.array("h") + samples.frombytes(data) + for i in range(len(samples)): + val = int(samples[i] * volume) + if val > 32767: + val = 32767 + elif val < -32768: + val = -32768 + samples[i] = val + return samples.tobytes() + + # -- MumbleMessage ----------------------------------------------------------- @@ -646,6 +727,83 @@ class MumbleBot: """Send an action as italic HTML text.""" await self._send_html(target, f"{_escape_html(text)}") + # -- Voice streaming ----------------------------------------------------- + + async def _send_voice_packet(self, packet: bytes) -> None: + """Send a voice packet via UDPTunnel (msg type 1).""" + await self._send_msg(MSG_UDPTUNNEL, packet) + + async def stream_audio( + self, + url: str, + *, + volume: float = 0.5, + on_done=None, + ) -> None: + """Stream audio from URL through yt-dlp|ffmpeg to voice channel. + + Pipeline: + yt-dlp -o - -f bestaudio + | ffmpeg -i pipe:0 -f s16le -ar 48000 -ac 1 pipe:1 + + Reads 1920 bytes (20ms frames), scales volume, encodes Opus, + wraps in voice packets, sends at 20ms intervals. Sets terminator + flag on the last frame. + + Args: + url: Audio URL (YouTube, SoundCloud, etc.) + volume: Volume scale factor (0.0 to 1.0). + on_done: Optional asyncio.Event to set when playback ends. + """ + from derp.opus import FRAME_BYTES, OpusEncoder + + proc = await asyncio.create_subprocess_exec( + "sh", "-c", + f"yt-dlp -o - -f bestaudio --no-warnings {_shell_quote(url)}" + f" | ffmpeg -i pipe:0 -f s16le -ar 48000 -ac 1" + f" -loglevel error pipe:1", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.DEVNULL, + ) + + encoder = OpusEncoder() + sequence = 0 + try: + while True: + pcm = await proc.stdout.read(FRAME_BYTES) + if not pcm: + break + if len(pcm) < FRAME_BYTES: + pcm += b"\x00" * (FRAME_BYTES - len(pcm)) + + if volume != 1.0: + pcm = _scale_pcm(pcm, volume) + + opus_data = encoder.encode(pcm) + pkt = _build_voice_packet(sequence, opus_data) + await self._send_voice_packet(pkt) + sequence += 1 + + # Pace at 20ms per frame + await asyncio.sleep(0.02) + + # Send terminator frame (silence) + silence = b"\x00" * FRAME_BYTES + opus_data = encoder.encode(silence) + pkt = _build_voice_packet(sequence, opus_data, last=True) + await self._send_voice_packet(pkt) + except asyncio.CancelledError: + pass + finally: + encoder.close() + try: + proc.kill() + except ProcessLookupError: + pass + await proc.wait() + if on_done is not None: + on_done.set() + async def shorten_url(self, url: str) -> str: """Shorten a URL via FlaskPaste. Returns original on failure.""" fp = self.registry._modules.get("flaskpaste") diff --git a/src/derp/opus.py b/src/derp/opus.py new file mode 100644 index 0000000..f751d17 --- /dev/null +++ b/src/derp/opus.py @@ -0,0 +1,100 @@ +"""Minimal ctypes wrapper around system libopus for encoding only.""" + +from __future__ import annotations + +import ctypes +import ctypes.util +from ctypes import POINTER, c_char_p, c_int, c_int32 + +SAMPLE_RATE = 48000 +CHANNELS = 1 +FRAME_SIZE = 960 # 20ms at 48kHz mono +FRAME_BYTES = 1920 # FRAME_SIZE * CHANNELS * 2 (s16le) + +_APPLICATION_AUDIO = 2049 + +_OPUS_SET_BITRATE_REQUEST = 4002 +_OPUS_OK = 0 + +_lib: ctypes.CDLL | None = None + + +def _load_lib() -> ctypes.CDLL: + """Find and load libopus, cached after first call.""" + global _lib + if _lib is not None: + return _lib + + path = ctypes.util.find_library("opus") + if path is None: + path = "libopus.so.0" + + lib = ctypes.cdll.LoadLibrary(path) + + lib.opus_encoder_get_size.argtypes = [c_int] + lib.opus_encoder_get_size.restype = c_int + + lib.opus_encoder_init.argtypes = [c_char_p, c_int32, c_int, c_int] + lib.opus_encoder_init.restype = c_int + + lib.opus_encode.argtypes = [ + c_char_p, # encoder state + c_char_p, # pcm input + c_int, # frame_size (samples per channel) + POINTER(ctypes.c_ubyte), # output buffer + c_int32, # max output bytes + ] + lib.opus_encode.restype = c_int + + lib.opus_encoder_ctl.argtypes = [c_char_p, c_int] + lib.opus_encoder_ctl.restype = c_int + + _lib = lib + return lib + + +class OpusEncoder: + """Opus encoder for 48kHz mono s16le PCM -> Opus frames.""" + + def __init__(self, bitrate: int = 64000) -> None: + lib = _load_lib() + size = lib.opus_encoder_get_size(CHANNELS) + self._state = ctypes.create_string_buffer(size) + rc = lib.opus_encoder_init( + self._state, SAMPLE_RATE, CHANNELS, _APPLICATION_AUDIO, + ) + if rc != _OPUS_OK: + raise RuntimeError(f"opus_encoder_init failed: {rc}") + + rc = lib.opus_encoder_ctl( + self._state, _OPUS_SET_BITRATE_REQUEST, c_int32(bitrate), + ) + if rc != _OPUS_OK: + raise RuntimeError(f"opus_encoder_ctl set bitrate failed: {rc}") + + self._lib = lib + self._out = (ctypes.c_ubyte * 4000)() + + def encode(self, pcm: bytes) -> bytes: + """Encode one 20ms frame of s16le PCM to an Opus packet. + + Args: + pcm: Exactly 1920 bytes (960 samples, 48kHz mono s16le). + + Returns: + Opus-encoded frame bytes. + """ + if len(pcm) != FRAME_BYTES: + raise ValueError( + f"expected {FRAME_BYTES} bytes, got {len(pcm)}" + ) + n = self._lib.opus_encode( + self._state, pcm, FRAME_SIZE, self._out, len(self._out), + ) + if n < 0: + raise RuntimeError(f"opus_encode failed: {n}") + return bytes(self._out[:n]) + + def close(self) -> None: + """Release encoder state.""" + self._state = None diff --git a/tests/test_mumble.py b/tests/test_mumble.py index f177ab6..e69dec6 100644 --- a/tests/test_mumble.py +++ b/tests/test_mumble.py @@ -11,6 +11,7 @@ from derp.mumble import ( MSG_PING, MSG_SERVER_SYNC, MSG_TEXT_MESSAGE, + MSG_UDPTUNNEL, MSG_USER_REMOVE, MSG_USER_STATE, MumbleBot, @@ -20,15 +21,19 @@ from derp.mumble import ( _build_ping_payload, _build_text_message_payload, _build_version_payload, + _build_voice_packet, _decode_fields, _decode_varint, _encode_field, _encode_varint, + _encode_voice_varint, _escape_html, _field_int, _field_ints, _field_str, _pack_msg, + _scale_pcm, + _shell_quote, _strip_html, _unpack_header, ) @@ -912,3 +917,181 @@ class TestMumbleBotConfig: } bot = MumbleBot("test", config, PluginRegistry()) assert bot._proxy is False + + +# --------------------------------------------------------------------------- +# TestVoiceVarint +# --------------------------------------------------------------------------- + + +class TestVoiceVarint: + def test_zero(self): + assert _encode_voice_varint(0) == b"\x00" + + def test_7bit_max(self): + assert _encode_voice_varint(127) == b"\x7f" + + def test_7bit_small(self): + assert _encode_voice_varint(1) == b"\x01" + assert _encode_voice_varint(42) == b"\x2a" + + def test_14bit_min(self): + # 128 = 0x80 -> prefix 10, value 128 + result = _encode_voice_varint(128) + assert len(result) == 2 + assert result[0] & 0xC0 == 0x80 # top 2 bits = 10 + + def test_14bit_max(self): + result = _encode_voice_varint(0x3FFF) + assert len(result) == 2 + + def test_21bit(self): + result = _encode_voice_varint(0x4000) + assert len(result) == 3 + assert result[0] & 0xE0 == 0xC0 # top 3 bits = 110 + + def test_28bit(self): + result = _encode_voice_varint(0x200000) + assert len(result) == 4 + assert result[0] & 0xF0 == 0xE0 # top 4 bits = 1110 + + def test_64bit(self): + result = _encode_voice_varint(0x10000000) + assert len(result) == 9 + assert result[0] == 0xF0 + + def test_negative_raises(self): + import pytest + with pytest.raises(ValueError, match="non-negative"): + _encode_voice_varint(-1) + + def test_14bit_roundtrip(self): + """Value encoded and decoded back correctly (manual decode).""" + val = 300 + data = _encode_voice_varint(val) + assert len(data) == 2 + decoded = ((data[0] & 0x3F) << 8) | data[1] + assert decoded == val + + def test_7bit_roundtrip(self): + for v in range(128): + data = _encode_voice_varint(v) + assert len(data) == 1 + assert data[0] == v + + +# --------------------------------------------------------------------------- +# TestVoicePacket +# --------------------------------------------------------------------------- + + +class TestVoicePacket: + def test_header_byte(self): + pkt = _build_voice_packet(0, b"\xaa\xbb") + assert pkt[0] == 0x80 # type=4, target=0 + + def test_sequence_encoding(self): + pkt = _build_voice_packet(42, b"\x00") + # byte 0: header, byte 1: sequence=42 (7-bit) + assert pkt[1] == 42 + + def test_opus_data_present(self): + opus = b"\xde\xad\xbe\xef" + pkt = _build_voice_packet(0, opus) + assert pkt.endswith(opus) + + def test_length_field(self): + opus = b"\x00" * 10 + pkt = _build_voice_packet(0, opus) + # header(1) + seq(1, val=0) + length(1, val=10) + data(10) = 13 + assert len(pkt) == 13 + assert pkt[2] == 10 # length varint + + def test_terminator_flag(self): + opus = b"\x00" * 5 + pkt = _build_voice_packet(0, opus, last=True) + # length with bit 13 set: 5 | 0x2000 = 0x2005 + # 0x2005 in 14-bit varint: 10_100000 00000101 + length_bytes = _encode_voice_varint(5 | 0x2000) + assert length_bytes in pkt + + def test_no_terminator_by_default(self): + opus = b"\x00" * 5 + pkt = _build_voice_packet(0, opus, last=False) + # length=5, no bit 13 + assert pkt[2] == 5 + + +# --------------------------------------------------------------------------- +# TestPcmScaling +# --------------------------------------------------------------------------- + + +class TestPcmScaling: + def test_unity_volume(self): + import struct as _s + pcm = _s.pack(" str | None: + return self._store.get(ns, {}).get(key) + + def set(self, ns: str, key: str, value: str) -> None: + self._store.setdefault(ns, {})[key] = value + + def delete(self, ns: str, key: str) -> None: + self._store.get(ns, {}).pop(key, None) + + def keys(self, ns: str) -> list[str]: + return list(self._store.get(ns, {}).keys()) + + +class _FakeBot: + """Minimal bot for music plugin testing.""" + + def __init__(self, *, mumble: bool = True): + self.sent: list[tuple[str, str]] = [] + self.replied: list[str] = [] + self.state = _FakeState() + self._pstate: dict = {} + self._tasks: set[asyncio.Task] = set() + if mumble: + self.stream_audio = AsyncMock() + + async def send(self, target: str, text: str) -> None: + self.sent.append((target, text)) + + async def reply(self, message, text: str) -> None: + self.replied.append(text) + + def _is_admin(self, message) -> bool: + return False + + def _spawn(self, coro, *, name=None): + task = asyncio.ensure_future(coro) + self._tasks.add(task) + task.add_done_callback(self._tasks.discard) + return task + + +class _Msg: + """Minimal message object.""" + + def __init__(self, text="!play url", nick="Alice", target="0", + is_channel=True): + self.text = text + self.nick = nick + self.target = target + self.is_channel = is_channel + self.prefix = nick + self.command = "PRIVMSG" + self.params = [target, text] + self.tags = {} + self.raw = {} + + +# --------------------------------------------------------------------------- +# TestMumbleGuard +# --------------------------------------------------------------------------- + + +class TestMumbleGuard: + def test_is_mumble_true(self): + bot = _FakeBot(mumble=True) + assert _mod._is_mumble(bot) is True + + def test_is_mumble_false(self): + bot = _FakeBot(mumble=False) + assert _mod._is_mumble(bot) is False + + def test_play_non_mumble(self): + bot = _FakeBot(mumble=False) + msg = _Msg(text="!play https://example.com") + asyncio.run(_mod.cmd_play(bot, msg)) + assert any("Mumble-only" in r for r in bot.replied) + + def test_stop_non_mumble_silent(self): + bot = _FakeBot(mumble=False) + msg = _Msg(text="!stop") + asyncio.run(_mod.cmd_stop(bot, msg)) + assert bot.replied == [] + + def test_skip_non_mumble_silent(self): + bot = _FakeBot(mumble=False) + msg = _Msg(text="!skip") + asyncio.run(_mod.cmd_skip(bot, msg)) + assert bot.replied == [] + + +# --------------------------------------------------------------------------- +# TestPlayCommand +# --------------------------------------------------------------------------- + + +class TestPlayCommand: + def test_play_no_url(self): + bot = _FakeBot() + msg = _Msg(text="!play") + asyncio.run(_mod.cmd_play(bot, msg)) + assert any("Usage" in r for r in bot.replied) + + def test_play_queues_track(self): + bot = _FakeBot() + msg = _Msg(text="!play https://example.com/track") + with patch.object(_mod, "_resolve_title", return_value="Test Track"): + with patch.object(_mod, "_ensure_loop"): + asyncio.run(_mod.cmd_play(bot, msg)) + assert any("Playing" in r for r in bot.replied) + ps = _mod._ps(bot) + assert len(ps["queue"]) == 1 + assert ps["queue"][0].title == "Test Track" + + def test_play_shows_queued_when_busy(self): + bot = _FakeBot() + ps = _mod._ps(bot) + ps["current"] = _mod._Track( + url="x", title="Current", requester="Bob", + ) + msg = _Msg(text="!play https://example.com/next") + with patch.object(_mod, "_resolve_title", return_value="Next Track"): + asyncio.run(_mod.cmd_play(bot, msg)) + assert any("Queued" in r for r in bot.replied) + + def test_play_queue_full(self): + bot = _FakeBot() + ps = _mod._ps(bot) + ps["queue"] = [ + _mod._Track(url="x", title="t", requester="a") + for _ in range(_mod._MAX_QUEUE) + ] + msg = _Msg(text="!play https://example.com/overflow") + with patch.object(_mod, "_resolve_title", return_value="Overflow"): + asyncio.run(_mod.cmd_play(bot, msg)) + assert any("full" in r.lower() for r in bot.replied) + + +# --------------------------------------------------------------------------- +# TestStopCommand +# --------------------------------------------------------------------------- + + +class TestStopCommand: + def test_stop_clears_queue(self): + bot = _FakeBot() + ps = _mod._ps(bot) + ps["queue"] = [_mod._Track(url="x", title="t", requester="a")] + ps["current"] = _mod._Track(url="y", title="s", requester="b") + msg = _Msg(text="!stop") + asyncio.run(_mod.cmd_stop(bot, msg)) + assert ps["queue"] == [] + assert ps["current"] is None + assert any("Stopped" in r for r in bot.replied) + + +# --------------------------------------------------------------------------- +# TestSkipCommand +# --------------------------------------------------------------------------- + + +class TestSkipCommand: + def test_skip_nothing_playing(self): + bot = _FakeBot() + msg = _Msg(text="!skip") + asyncio.run(_mod.cmd_skip(bot, msg)) + assert any("Nothing" in r for r in bot.replied) + + def test_skip_with_queue(self): + bot = _FakeBot() + ps = _mod._ps(bot) + ps["current"] = _mod._Track(url="a", title="First", requester="x") + ps["queue"] = [_mod._Track(url="b", title="Second", requester="y")] + # We need to mock the task + mock_task = MagicMock() + mock_task.done.return_value = False + ps["task"] = mock_task + msg = _Msg(text="!skip") + with patch.object(_mod, "_ensure_loop"): + asyncio.run(_mod.cmd_skip(bot, msg)) + assert any("Skipped" in r for r in bot.replied) + mock_task.cancel.assert_called_once() + + def test_skip_empty_queue(self): + bot = _FakeBot() + ps = _mod._ps(bot) + ps["current"] = _mod._Track(url="a", title="Only", requester="x") + mock_task = MagicMock() + mock_task.done.return_value = False + ps["task"] = mock_task + msg = _Msg(text="!skip") + asyncio.run(_mod.cmd_skip(bot, msg)) + assert any("empty" in r.lower() for r in bot.replied) + + +# --------------------------------------------------------------------------- +# TestQueueCommand +# --------------------------------------------------------------------------- + + +class TestQueueCommand: + def test_queue_empty(self): + bot = _FakeBot() + msg = _Msg(text="!queue") + asyncio.run(_mod.cmd_queue(bot, msg)) + assert any("empty" in r.lower() for r in bot.replied) + + def test_queue_with_tracks(self): + bot = _FakeBot() + ps = _mod._ps(bot) + ps["current"] = _mod._Track(url="a", title="Now", requester="x") + ps["queue"] = [ + _mod._Track(url="b", title="Next", requester="y"), + ] + msg = _Msg(text="!queue") + asyncio.run(_mod.cmd_queue(bot, msg)) + assert any("Now" in r for r in bot.replied) + assert any("Next" in r for r in bot.replied) + + def test_queue_with_url_delegates(self): + bot = _FakeBot() + msg = _Msg(text="!queue https://example.com/track") + with patch.object(_mod, "_resolve_title", return_value="Title"): + with patch.object(_mod, "_ensure_loop"): + asyncio.run(_mod.cmd_queue(bot, msg)) + # Should have called cmd_play logic + assert any("Playing" in r or "Queued" in r for r in bot.replied) + + +# --------------------------------------------------------------------------- +# TestNpCommand +# --------------------------------------------------------------------------- + + +class TestNpCommand: + def test_np_nothing(self): + bot = _FakeBot() + msg = _Msg(text="!np") + asyncio.run(_mod.cmd_np(bot, msg)) + assert any("Nothing" in r for r in bot.replied) + + def test_np_playing(self): + bot = _FakeBot() + ps = _mod._ps(bot) + ps["current"] = _mod._Track( + url="x", title="Cool Song", requester="DJ", + ) + msg = _Msg(text="!np") + asyncio.run(_mod.cmd_np(bot, msg)) + assert any("Cool Song" in r for r in bot.replied) + assert any("DJ" in r for r in bot.replied) + + +# --------------------------------------------------------------------------- +# TestVolumeCommand +# --------------------------------------------------------------------------- + + +class TestVolumeCommand: + def test_volume_show(self): + bot = _FakeBot() + msg = _Msg(text="!volume") + asyncio.run(_mod.cmd_volume(bot, msg)) + assert any("50%" in r for r in bot.replied) + + def test_volume_set(self): + bot = _FakeBot() + msg = _Msg(text="!volume 75") + asyncio.run(_mod.cmd_volume(bot, msg)) + ps = _mod._ps(bot) + assert ps["volume"] == 75 + assert any("75%" in r for r in bot.replied) + + def test_volume_out_of_range(self): + bot = _FakeBot() + msg = _Msg(text="!volume 150") + asyncio.run(_mod.cmd_volume(bot, msg)) + assert any("0-100" in r for r in bot.replied) + + def test_volume_negative(self): + bot = _FakeBot() + msg = _Msg(text="!volume -10") + asyncio.run(_mod.cmd_volume(bot, msg)) + assert any("0-100" in r for r in bot.replied) + + def test_volume_invalid(self): + bot = _FakeBot() + msg = _Msg(text="!volume abc") + asyncio.run(_mod.cmd_volume(bot, msg)) + assert any("Usage" in r for r in bot.replied) + + +# --------------------------------------------------------------------------- +# TestPerBotState +# --------------------------------------------------------------------------- + + +class TestPerBotState: + def test_ps_initializes(self): + bot = _FakeBot() + ps = _mod._ps(bot) + assert ps["queue"] == [] + assert ps["current"] is None + assert ps["volume"] == 50 + + def test_ps_stable_reference(self): + bot = _FakeBot() + ps1 = _mod._ps(bot) + ps2 = _mod._ps(bot) + assert ps1 is ps2 + + def test_ps_isolated_per_bot(self): + bot1 = _FakeBot() + bot2 = _FakeBot() + _mod._ps(bot1)["volume"] = 80 + assert _mod._ps(bot2)["volume"] == 50 + + +# --------------------------------------------------------------------------- +# TestHelpers +# --------------------------------------------------------------------------- + + +class TestMusicHelpers: + def test_truncate_short(self): + assert _mod._truncate("short") == "short" + + def test_truncate_long(self): + long = "x" * 100 + result = _mod._truncate(long) + assert len(result) == 80 + assert result.endswith("...") diff --git a/tests/test_opus.py b/tests/test_opus.py new file mode 100644 index 0000000..6020d06 --- /dev/null +++ b/tests/test_opus.py @@ -0,0 +1,154 @@ +"""Tests for the Opus ctypes wrapper.""" + +import math +import struct +from unittest.mock import MagicMock, patch + +import pytest + +from derp.opus import CHANNELS, FRAME_BYTES, FRAME_SIZE, SAMPLE_RATE + +# -- Helpers ----------------------------------------------------------------- + + +def _silence() -> bytes: + """Generate one frame of silence (1920 bytes of zeros).""" + return b"\x00" * FRAME_BYTES + + +def _sine_frame(freq: float = 440.0) -> bytes: + """Generate one 20ms frame of a sine wave at the given frequency.""" + samples = [] + for i in range(FRAME_SIZE): + t = i / SAMPLE_RATE + val = int(16000 * math.sin(2 * math.pi * freq * t)) + samples.append(struct.pack(" 0 + enc.close() + + def test_encode_sine(self): + """Encoding a sine wave produces bytes output.""" + lib = _mock_lib() + with patch("derp.opus._load_lib", return_value=lib): + from derp.opus import OpusEncoder + enc = OpusEncoder() + result = enc.encode(_sine_frame()) + assert isinstance(result, bytes) + enc.close() + + def test_encode_wrong_size(self): + """Passing wrong buffer size raises ValueError.""" + lib = _mock_lib() + with patch("derp.opus._load_lib", return_value=lib): + from derp.opus import OpusEncoder + enc = OpusEncoder() + with pytest.raises(ValueError, match="expected 1920"): + enc.encode(b"\x00" * 100) + enc.close() + + def test_encode_multi_frame(self): + """Multiple sequential encodes work.""" + lib = _mock_lib() + with patch("derp.opus._load_lib", return_value=lib): + from derp.opus import OpusEncoder + enc = OpusEncoder() + for _ in range(5): + result = enc.encode(_silence()) + assert isinstance(result, bytes) + enc.close() + + def test_custom_bitrate(self): + """Custom bitrate is passed to opus_encoder_ctl.""" + lib = _mock_lib() + with patch("derp.opus._load_lib", return_value=lib): + from derp.opus import OpusEncoder + enc = OpusEncoder(bitrate=96000) + assert lib.opus_encoder_ctl.called + enc.close() + + def test_init_failure(self): + """RuntimeError on encoder init failure.""" + lib = _mock_lib() + lib.opus_encoder_init.return_value = -1 + with patch("derp.opus._load_lib", return_value=lib): + from derp.opus import OpusEncoder + with pytest.raises(RuntimeError, match="opus_encoder_init"): + OpusEncoder() + + def test_encode_failure(self): + """RuntimeError on encode failure.""" + lib = _mock_lib() + lib.opus_encode.return_value = -1 + with patch("derp.opus._load_lib", return_value=lib): + from derp.opus import OpusEncoder + enc = OpusEncoder() + with pytest.raises(RuntimeError, match="opus_encode"): + enc.encode(_silence()) + + def test_close_clears_state(self): + """close() sets internal state to None.""" + lib = _mock_lib() + with patch("derp.opus._load_lib", return_value=lib): + from derp.opus import OpusEncoder + enc = OpusEncoder() + enc.close() + assert enc._state is None