"""Tests for the Mumble adapter.""" import asyncio from unittest.mock import patch from derp.mumble import ( _WIRE_LEN, _WIRE_VARINT, MSG_CHANNEL_REMOVE, MSG_CHANNEL_STATE, MSG_PING, MSG_SERVER_SYNC, MSG_TEXT_MESSAGE, MSG_USER_REMOVE, MSG_USER_STATE, MumbleBot, MumbleMessage, _build_authenticate_payload, _build_mumble_message, _build_ping_payload, _build_text_message_payload, _build_version_payload, _decode_fields, _decode_varint, _encode_field, _encode_varint, _escape_html, _field_int, _field_ints, _field_str, _pack_msg, _strip_html, _unpack_header, ) from derp.plugin import PluginRegistry # -- Helpers ----------------------------------------------------------------- def _make_bot(admins=None, operators=None, trusted=None, prefix=None): """Create a MumbleBot with test config.""" config = { "mumble": { "enabled": True, "host": "127.0.0.1", "port": 64738, "username": "derp", "password": "", "tls_verify": False, "admins": admins or [], "operators": operators or [], "trusted": trusted or [], }, "bot": { "prefix": prefix or "!", "paste_threshold": 4, "plugins_dir": "plugins", "rate_limit": 2.0, "rate_burst": 5, }, } registry = PluginRegistry() bot = MumbleBot("mu-test", config, registry) return bot def _mu_msg(text="!ping", nick="Alice", prefix="Alice", target="0", is_channel=True): """Create a MumbleMessage for command testing.""" return MumbleMessage( raw={}, nick=nick, prefix=prefix, text=text, target=target, is_channel=is_channel, params=[target, text], ) # -- Test helpers for registering commands ----------------------------------- async def _echo_handler(bot, msg): """Simple command handler that echoes text.""" args = msg.text.split(None, 1) reply = args[1] if len(args) > 1 else "no args" await bot.reply(msg, reply) async def _admin_handler(bot, msg): """Admin-only command handler.""" await bot.reply(msg, "admin action done") # --------------------------------------------------------------------------- # TestProtobufCodec # --------------------------------------------------------------------------- class TestProtobufCodec: def test_encode_varint_zero(self): assert _encode_varint(0) == b"\x00" def test_encode_varint_small(self): assert _encode_varint(1) == b"\x01" assert _encode_varint(127) == b"\x7f" def test_encode_varint_two_byte(self): assert _encode_varint(128) == b"\x80\x01" assert _encode_varint(300) == b"\xac\x02" def test_encode_varint_large(self): # 16384 = 0x4000 encoded = _encode_varint(16384) val, _ = _decode_varint(encoded, 0) assert val == 16384 def test_decode_varint_zero(self): val, off = _decode_varint(b"\x00", 0) assert val == 0 assert off == 1 def test_decode_varint_small(self): val, off = _decode_varint(b"\x01", 0) assert val == 1 assert off == 1 def test_decode_varint_multi_byte(self): val, off = _decode_varint(b"\xac\x02", 0) assert val == 300 assert off == 2 def test_varint_roundtrip(self): for n in [0, 1, 127, 128, 300, 16384, 2**21, 2**28]: encoded = _encode_varint(n) decoded, _ = _decode_varint(encoded, 0) assert decoded == n, f"roundtrip failed for {n}" def test_encode_field_varint(self): # field 1, wire type 0, value 42 data = _encode_field(1, _WIRE_VARINT, 42) fields = _decode_fields(data) assert fields[1] == [42] def test_encode_field_string(self): data = _encode_field(5, _WIRE_LEN, "hello") fields = _decode_fields(data) assert fields[5] == [b"hello"] def test_encode_field_bytes(self): data = _encode_field(3, _WIRE_LEN, b"\x00\x01\x02") fields = _decode_fields(data) assert fields[3] == [b"\x00\x01\x02"] def test_decode_multiple_fields(self): data = ( _encode_field(1, _WIRE_VARINT, 10) + _encode_field(2, _WIRE_LEN, "test") + _encode_field(3, _WIRE_VARINT, 99) ) fields = _decode_fields(data) assert fields[1] == [10] assert fields[2] == [b"test"] assert fields[3] == [99] def test_decode_repeated_fields(self): data = ( _encode_field(3, _WIRE_VARINT, 1) + _encode_field(3, _WIRE_VARINT, 2) + _encode_field(3, _WIRE_VARINT, 3) ) fields = _decode_fields(data) assert fields[3] == [1, 2, 3] def test_pack_unpack_header(self): packed = _pack_msg(11, b"hello") msg_type, length = _unpack_header(packed[:6]) assert msg_type == 11 assert length == 5 assert packed[6:] == b"hello" def test_pack_empty_payload(self): packed = _pack_msg(3) assert len(packed) == 6 msg_type, length = _unpack_header(packed) assert msg_type == 3 assert length == 0 def test_field_str(self): fields = {5: [b"hello"]} assert _field_str(fields, 5) == "hello" assert _field_str(fields, 1) is None def test_field_int(self): fields = {1: [42]} assert _field_int(fields, 1) == 42 assert _field_int(fields, 2) is None def test_field_ints(self): fields = {3: [1, 2, 3]} assert _field_ints(fields, 3) == [1, 2, 3] assert _field_ints(fields, 9) == [] def test_decode_empty(self): fields = _decode_fields(b"") assert fields == {} # --------------------------------------------------------------------------- # TestMumbleMessage # --------------------------------------------------------------------------- class TestMumbleMessage: def test_defaults(self): msg = MumbleMessage(raw={}, nick=None, prefix=None, text=None, target=None) assert msg.is_channel is True assert msg.command == "PRIVMSG" assert msg.params == [] assert msg.tags == {} def test_custom_values(self): msg = MumbleMessage( raw={"field": 1}, nick="Alice", prefix="Alice", text="hello", target="0", is_channel=True, command="PRIVMSG", params=["0", "hello"], tags={"key": "val"}, ) assert msg.nick == "Alice" assert msg.prefix == "Alice" assert msg.text == "hello" assert msg.target == "0" assert msg.tags == {"key": "val"} def test_duck_type_compat(self): """MumbleMessage has the same attribute names as IRC Message.""" msg = _mu_msg() attrs = ["raw", "nick", "prefix", "text", "target", "is_channel", "command", "params", "tags"] for attr in attrs: assert hasattr(msg, attr), f"missing attribute: {attr}" def test_dm_message(self): msg = _mu_msg(target="dm", is_channel=False) assert msg.is_channel is False assert msg.target == "dm" def test_prefix_is_username(self): msg = _mu_msg(prefix="admin_user") assert msg.prefix == "admin_user" # --------------------------------------------------------------------------- # TestBuildMumbleMessage # --------------------------------------------------------------------------- class TestBuildMumbleMessage: def test_channel_message(self): fields = { 1: [42], # actor session 3: [5], # channel_id 5: [b"Hello"], # message HTML } users = {42: "Alice"} msg = _build_mumble_message(fields, users, our_session=1) assert msg is not None assert msg.nick == "Alice" assert msg.prefix == "Alice" assert msg.text == "Hello" assert msg.target == "5" assert msg.is_channel is True def test_dm_message(self): fields = { 1: [42], # actor session 2: [1], # target session (DM) 5: [b"secret"], # message } users = {42: "Bob"} msg = _build_mumble_message(fields, users, our_session=1) assert msg is not None assert msg.target == "dm" assert msg.is_channel is False def test_missing_sender(self): fields = { 3: [0], # channel_id 5: [b"anonymous"], # message } msg = _build_mumble_message(fields, {}, our_session=1) assert msg is not None assert msg.nick is None assert msg.prefix is None def test_missing_message(self): fields = { 1: [42], 3: [0], } msg = _build_mumble_message(fields, {42: "Alice"}, our_session=1) assert msg is None def test_html_stripped(self): fields = { 1: [1], 3: [0], 5: [b"click & go"], } msg = _build_mumble_message(fields, {1: "User"}, our_session=0) assert msg is not None assert msg.text == "click & go" def test_no_target(self): fields = { 1: [42], 5: [b"orphan message"], } msg = _build_mumble_message(fields, {42: "Alice"}, our_session=1) assert msg is not None assert msg.target is None assert msg.is_channel is True # --------------------------------------------------------------------------- # TestHtmlHelpers # --------------------------------------------------------------------------- class TestHtmlHelpers: def test_strip_html_simple(self): assert _strip_html("bold") == "bold" def test_strip_html_entities(self): assert _strip_html("& < > "") == '& < > "' def test_strip_html_nested(self): assert _strip_html("

hello world

") == "hello world" def test_strip_html_plain(self): assert _strip_html("no tags here") == "no tags here" def test_escape_html(self): assert _escape_html("