ctypes libopus encoder (src/derp/opus.py), voice varint/packet builder and stream_audio method on MumbleBot (src/derp/mumble.py), music plugin with play/stop/skip/queue/np/volume commands (plugins/music.py). Audio pipeline: yt-dlp|ffmpeg subprocess -> PCM -> Opus -> UDPTunnel. 67 new tests (1561 total).
1098 lines
34 KiB
Python
1098 lines
34 KiB
Python
"""Tests for the Mumble adapter."""
|
|
|
|
import asyncio
|
|
from unittest.mock import patch
|
|
|
|
from derp.mumble import (
|
|
_WIRE_LEN,
|
|
_WIRE_VARINT,
|
|
MSG_CHANNEL_REMOVE,
|
|
MSG_CHANNEL_STATE,
|
|
MSG_PING,
|
|
MSG_SERVER_SYNC,
|
|
MSG_TEXT_MESSAGE,
|
|
MSG_UDPTUNNEL,
|
|
MSG_USER_REMOVE,
|
|
MSG_USER_STATE,
|
|
MumbleBot,
|
|
MumbleMessage,
|
|
_build_authenticate_payload,
|
|
_build_mumble_message,
|
|
_build_ping_payload,
|
|
_build_text_message_payload,
|
|
_build_version_payload,
|
|
_build_voice_packet,
|
|
_decode_fields,
|
|
_decode_varint,
|
|
_encode_field,
|
|
_encode_varint,
|
|
_encode_voice_varint,
|
|
_escape_html,
|
|
_field_int,
|
|
_field_ints,
|
|
_field_str,
|
|
_pack_msg,
|
|
_scale_pcm,
|
|
_shell_quote,
|
|
_strip_html,
|
|
_unpack_header,
|
|
)
|
|
from derp.plugin import PluginRegistry
|
|
|
|
# -- Helpers -----------------------------------------------------------------
|
|
|
|
|
|
def _make_bot(admins=None, operators=None, trusted=None, prefix=None):
    """Create a MumbleBot with test config.

    Args:
        admins: Entries for the ``mumble.admins`` list (empty when falsy).
        operators: Entries for the ``mumble.operators`` list (empty when falsy).
        trusted: Entries for the ``mumble.trusted`` list (empty when falsy).
        prefix: Value stored under ``bot.prefix`` (``"!"`` when falsy).

    Returns:
        A ``MumbleBot`` named ``"mu-test"`` backed by a fresh ``PluginRegistry``.
    """
    config = {
        "mumble": {
            "enabled": True,
            "host": "127.0.0.1",
            "port": 64738,
            "username": "derp",
            "password": "",
            "tls_verify": False,
            "admins": admins or [],
            "operators": operators or [],
            "trusted": trusted or [],
        },
        "bot": {
            "prefix": prefix or "!",
            "paste_threshold": 4,
            "plugins_dir": "plugins",
            "rate_limit": 2.0,
            "rate_burst": 5,
        },
    }
    registry = PluginRegistry()
    bot = MumbleBot("mu-test", config, registry)
    return bot
|
|
|
|
|
|
def _mu_msg(text="!ping", nick="Alice", prefix="Alice",
            target="0", is_channel=True):
    """Create a MumbleMessage for command testing.

    Args:
        text: Message body.
        nick: Sender nick.
        prefix: Sender prefix; the tier tests look this value up.
        target: Target identifier as a string.
        is_channel: Whether the message is a channel message.

    Returns:
        A ``MumbleMessage`` with an empty ``raw`` dict and
        ``params == [target, text]``.
    """
    return MumbleMessage(
        raw={}, nick=nick, prefix=prefix, text=text, target=target,
        is_channel=is_channel,
        params=[target, text],
    )
|
|
|
|
|
|
# -- Test helpers for registering commands -----------------------------------
|
|
|
|
|
|
async def _echo_handler(bot, msg):
|
|
"""Simple command handler that echoes text."""
|
|
args = msg.text.split(None, 1)
|
|
reply = args[1] if len(args) > 1 else "no args"
|
|
await bot.reply(msg, reply)
|
|
|
|
|
|
async def _admin_handler(bot, msg):
|
|
"""Admin-only command handler."""
|
|
await bot.reply(msg, "admin action done")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestProtobufCodec
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestProtobufCodec:
    """Varint, field encode/decode and message-header helper tests."""

    def test_encode_varint_zero(self):
        assert _encode_varint(0) == b"\x00"

    def test_encode_varint_small(self):
        assert _encode_varint(1) == b"\x01"
        assert _encode_varint(127) == b"\x7f"

    def test_encode_varint_two_byte(self):
        assert _encode_varint(128) == b"\x80\x01"
        assert _encode_varint(300) == b"\xac\x02"

    def test_encode_varint_large(self):
        # 16384 = 0x4000
        encoded = _encode_varint(16384)
        val, _ = _decode_varint(encoded, 0)
        assert val == 16384

    def test_decode_varint_zero(self):
        val, off = _decode_varint(b"\x00", 0)
        assert val == 0
        assert off == 1

    def test_decode_varint_small(self):
        val, off = _decode_varint(b"\x01", 0)
        assert val == 1
        assert off == 1

    def test_decode_varint_multi_byte(self):
        val, off = _decode_varint(b"\xac\x02", 0)
        assert val == 300
        assert off == 2

    def test_varint_roundtrip(self):
        for n in [0, 1, 127, 128, 300, 16384, 2**21, 2**28]:
            encoded = _encode_varint(n)
            decoded, _ = _decode_varint(encoded, 0)
            assert decoded == n, f"roundtrip failed for {n}"

    def test_encode_field_varint(self):
        # field 1, wire type 0, value 42
        data = _encode_field(1, _WIRE_VARINT, 42)
        fields = _decode_fields(data)
        assert fields[1] == [42]

    def test_encode_field_string(self):
        # A str value comes back from the decoder as bytes.
        data = _encode_field(5, _WIRE_LEN, "hello")
        fields = _decode_fields(data)
        assert fields[5] == [b"hello"]

    def test_encode_field_bytes(self):
        data = _encode_field(3, _WIRE_LEN, b"\x00\x01\x02")
        fields = _decode_fields(data)
        assert fields[3] == [b"\x00\x01\x02"]

    def test_decode_multiple_fields(self):
        data = (
            _encode_field(1, _WIRE_VARINT, 10)
            + _encode_field(2, _WIRE_LEN, "test")
            + _encode_field(3, _WIRE_VARINT, 99)
        )
        fields = _decode_fields(data)
        assert fields[1] == [10]
        assert fields[2] == [b"test"]
        assert fields[3] == [99]

    def test_decode_repeated_fields(self):
        # Repeated occurrences of one field number accumulate in order.
        data = (
            _encode_field(3, _WIRE_VARINT, 1)
            + _encode_field(3, _WIRE_VARINT, 2)
            + _encode_field(3, _WIRE_VARINT, 3)
        )
        fields = _decode_fields(data)
        assert fields[3] == [1, 2, 3]

    def test_pack_unpack_header(self):
        # The first 6 bytes are the header; the payload follows.
        packed = _pack_msg(11, b"hello")
        msg_type, length = _unpack_header(packed[:6])
        assert msg_type == 11
        assert length == 5
        assert packed[6:] == b"hello"

    def test_pack_empty_payload(self):
        packed = _pack_msg(3)
        assert len(packed) == 6
        msg_type, length = _unpack_header(packed)
        assert msg_type == 3
        assert length == 0

    def test_field_str(self):
        fields = {5: [b"hello"]}
        assert _field_str(fields, 5) == "hello"
        assert _field_str(fields, 1) is None

    def test_field_int(self):
        fields = {1: [42]}
        assert _field_int(fields, 1) == 42
        assert _field_int(fields, 2) is None

    def test_field_ints(self):
        fields = {3: [1, 2, 3]}
        assert _field_ints(fields, 3) == [1, 2, 3]
        assert _field_ints(fields, 9) == []

    def test_decode_empty(self):
        fields = _decode_fields(b"")
        assert fields == {}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestMumbleMessage
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestMumbleMessage:
    """Construction defaults and attribute surface of ``MumbleMessage``."""

    def test_defaults(self):
        msg = MumbleMessage(raw={}, nick=None, prefix=None, text=None,
                            target=None)
        assert msg.is_channel is True
        assert msg.command == "PRIVMSG"
        assert msg.params == []
        assert msg.tags == {}

    def test_custom_values(self):
        msg = MumbleMessage(
            raw={"field": 1}, nick="Alice", prefix="Alice",
            text="hello", target="0", is_channel=True,
            command="PRIVMSG", params=["0", "hello"],
            tags={"key": "val"},
        )
        assert msg.nick == "Alice"
        assert msg.prefix == "Alice"
        assert msg.text == "hello"
        assert msg.target == "0"
        assert msg.tags == {"key": "val"}

    def test_duck_type_compat(self):
        """MumbleMessage has the same attribute names as IRC Message."""
        msg = _mu_msg()
        attrs = ["raw", "nick", "prefix", "text", "target",
                 "is_channel", "command", "params", "tags"]
        for attr in attrs:
            assert hasattr(msg, attr), f"missing attribute: {attr}"

    def test_dm_message(self):
        msg = _mu_msg(target="dm", is_channel=False)
        assert msg.is_channel is False
        assert msg.target == "dm"

    def test_prefix_is_username(self):
        msg = _mu_msg(prefix="admin_user")
        assert msg.prefix == "admin_user"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestBuildMumbleMessage
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestBuildMumbleMessage:
    """``_build_mumble_message``: decoded TextMessage fields -> MumbleMessage."""

    def test_channel_message(self):
        fields = {
            1: [42],  # actor session
            3: [5],  # channel_id
            5: [b"<b>Hello</b>"],  # message HTML
        }
        users = {42: "Alice"}
        msg = _build_mumble_message(fields, users, our_session=1)
        assert msg is not None
        assert msg.nick == "Alice"
        assert msg.prefix == "Alice"
        assert msg.text == "Hello"
        assert msg.target == "5"
        assert msg.is_channel is True

    def test_dm_message(self):
        fields = {
            1: [42],  # actor session
            2: [1],  # target session (DM)
            5: [b"secret"],  # message
        }
        users = {42: "Bob"}
        msg = _build_mumble_message(fields, users, our_session=1)
        assert msg is not None
        assert msg.target == "dm"
        assert msg.is_channel is False

    def test_missing_sender(self):
        fields = {
            3: [0],  # channel_id
            5: [b"anonymous"],  # message
        }
        msg = _build_mumble_message(fields, {}, our_session=1)
        assert msg is not None
        assert msg.nick is None
        assert msg.prefix is None

    def test_missing_message(self):
        # No field 5 (message body) means no message is built.
        fields = {
            1: [42],
            3: [0],
        }
        msg = _build_mumble_message(fields, {42: "Alice"}, our_session=1)
        assert msg is None

    def test_html_stripped(self):
        fields = {
            1: [1],
            3: [0],
            5: [b"<a href='link'>click & go</a>"],
        }
        msg = _build_mumble_message(fields, {1: "User"}, our_session=0)
        assert msg is not None
        assert msg.text == "click & go"

    def test_no_target(self):
        # Neither a channel_id nor a target session is present.
        fields = {
            1: [42],
            5: [b"orphan message"],
        }
        msg = _build_mumble_message(fields, {42: "Alice"}, our_session=1)
        assert msg is not None
        assert msg.target is None
        assert msg.is_channel is True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestHtmlHelpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestHtmlHelpers:
    """``_strip_html`` (HTML -> plain text) and ``_escape_html`` (text -> HTML)."""

    def test_strip_html_simple(self):
        assert _strip_html("<b>bold</b>") == "bold"

    def test_strip_html_entities(self):
        # Input must be written as entities; the stripped result is the
        # decoded characters.
        assert _strip_html("&amp; &lt; &gt; &quot;") == '& < > "'

    def test_strip_html_nested(self):
        assert _strip_html("<p><b>hello</b> <i>world</i></p>") == "hello world"

    def test_strip_html_plain(self):
        assert _strip_html("no tags here") == "no tags here"

    def test_escape_html(self):
        escaped = _escape_html("<script>alert('xss')")
        # Angle brackets must be entity-encoded. Whether single quotes are
        # encoded as well is deliberately not pinned here.
        assert escaped.startswith("&lt;script&gt;alert(")
        assert "<" not in escaped
        assert ">" not in escaped

    def test_escape_html_ampersand(self):
        assert _escape_html("a & b") == "a &amp; b"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestMumbleBotReply
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestMumbleBotReply:
    """Outgoing text: ``send``, ``reply``, ``long_reply`` and ``action``.

    The transport (``_send_msg`` / ``_send_html`` / ``send``) is patched with
    a recording coroutine so no network connection is needed.
    """

    def test_send_builds_text_message(self):
        bot = _make_bot()
        sent: list[tuple[int, bytes]] = []

        async def _fake_send_msg(msg_type, payload=b""):
            sent.append((msg_type, payload))

        with patch.object(bot, "_send_msg", side_effect=_fake_send_msg):
            asyncio.run(bot.send("5", "hello"))
        assert len(sent) == 1
        assert sent[0][0] == MSG_TEXT_MESSAGE
        # Verify payload contains the message
        fields = _decode_fields(sent[0][1])
        text = _field_str(fields, 5)
        assert text == "hello"
        assert _field_ints(fields, 3) == [5]

    def test_send_escapes_html(self):
        bot = _make_bot()
        sent: list[tuple[int, bytes]] = []

        async def _fake_send_msg(msg_type, payload=b""):
            sent.append((msg_type, payload))

        with patch.object(bot, "_send_msg", side_effect=_fake_send_msg):
            asyncio.run(bot.send("0", "<script>alert('xss')"))
        fields = _decode_fields(sent[0][1])
        text = _field_str(fields, 5)
        # The raw tag must be gone and its entity-encoded form present.
        assert "<script>" not in text
        assert "&lt;script&gt;" in text

    def test_reply_sends_to_target(self):
        bot = _make_bot()
        msg = _mu_msg(target="5")
        sent: list[tuple[str, str]] = []

        async def _fake_send(target, text):
            sent.append((target, text))

        with patch.object(bot, "send", side_effect=_fake_send):
            asyncio.run(bot.reply(msg, "pong"))
        assert sent == [("5", "pong")]

    def test_reply_dm_fallback(self):
        bot = _make_bot()
        msg = _mu_msg(target="dm", is_channel=False)
        sent: list[tuple[str, str]] = []

        async def _fake_send(target, text):
            sent.append((target, text))

        with patch.object(bot, "send", side_effect=_fake_send):
            asyncio.run(bot.reply(msg, "dm reply"))
        # DM falls back to root channel "0"
        assert sent == [("0", "dm reply")]

    def test_long_reply_under_threshold(self):
        bot = _make_bot()
        msg = _mu_msg()
        sent: list[str] = []

        async def _fake_send(target, text):
            sent.append(text)

        with patch.object(bot, "send", side_effect=_fake_send):
            asyncio.run(bot.long_reply(msg, ["a", "b", "c"]))
        assert sent == ["a", "b", "c"]

    def test_long_reply_over_threshold_no_paste(self):
        # Five lines exceed the paste_threshold of 4 set by _make_bot; every
        # line is still expected to be sent individually.
        bot = _make_bot()
        msg = _mu_msg()
        sent: list[str] = []

        async def _fake_send(target, text):
            sent.append(text)

        with patch.object(bot, "send", side_effect=_fake_send):
            asyncio.run(bot.long_reply(msg, ["a", "b", "c", "d", "e"]))
        assert sent == ["a", "b", "c", "d", "e"]

    def test_long_reply_empty(self):
        bot = _make_bot()
        msg = _mu_msg()
        with patch.object(bot, "send") as mock_send:
            asyncio.run(bot.long_reply(msg, []))
            mock_send.assert_not_called()

    def test_action_format(self):
        bot = _make_bot()
        sent: list[tuple[str, str]] = []

        async def _fake_send_html(target, html_text):
            sent.append((target, html_text))

        with patch.object(bot, "_send_html", side_effect=_fake_send_html):
            asyncio.run(bot.action("0", "does a thing"))
        assert sent == [("0", "<i>does a thing</i>")]

    def test_action_escapes_content(self):
        bot = _make_bot()
        sent: list[tuple[str, str]] = []

        async def _fake_send_html(target, html_text):
            sent.append((target, html_text))

        with patch.object(bot, "_send_html", side_effect=_fake_send_html):
            asyncio.run(bot.action("0", "<script>"))
        # Only the <i> wrapper is markup; the action text is escaped.
        assert sent == [("0", "<i>&lt;script&gt;</i>")]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestMumbleBotDispatch
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestMumbleBotDispatch:
    """Command lookup, prefix matching and tier checks in ``_dispatch_command``."""

    def test_dispatch_known_command(self):
        bot = _make_bot()
        bot.registry.register_command(
            "echo", _echo_handler, help="echo", plugin="test")
        msg = _mu_msg(text="!echo world")
        sent: list[str] = []

        async def _fake_send(target, text):
            sent.append(text)

        with patch.object(bot, "send", side_effect=_fake_send):
            asyncio.run(bot._dispatch_command(msg))
        assert sent == ["world"]

    def test_dispatch_unknown_command(self):
        bot = _make_bot()
        msg = _mu_msg(text="!nonexistent")
        with patch.object(bot, "send") as mock_send:
            asyncio.run(bot._dispatch_command(msg))
            mock_send.assert_not_called()

    def test_dispatch_no_prefix(self):
        bot = _make_bot()
        msg = _mu_msg(text="just a message")
        with patch.object(bot, "send") as mock_send:
            asyncio.run(bot._dispatch_command(msg))
            mock_send.assert_not_called()

    def test_dispatch_empty_text(self):
        bot = _make_bot()
        msg = _mu_msg(text="")
        with patch.object(bot, "send") as mock_send:
            asyncio.run(bot._dispatch_command(msg))
            mock_send.assert_not_called()

    def test_dispatch_none_text(self):
        bot = _make_bot()
        msg = _mu_msg()
        msg.text = None
        with patch.object(bot, "send") as mock_send:
            asyncio.run(bot._dispatch_command(msg))
            mock_send.assert_not_called()

    def test_dispatch_ambiguous(self):
        # "p" is a prefix of both registered commands.
        bot = _make_bot()
        bot.registry.register_command("ping", _echo_handler, plugin="test")
        bot.registry.register_command("plugins", _echo_handler, plugin="test")
        msg = _mu_msg(text="!p")
        sent: list[str] = []

        async def _fake_send(target, text):
            sent.append(text)

        with patch.object(bot, "send", side_effect=_fake_send):
            asyncio.run(bot._dispatch_command(msg))
        assert len(sent) == 1
        assert "Ambiguous" in sent[0]

    def test_dispatch_tier_denied(self):
        bot = _make_bot()
        bot.registry.register_command(
            "secret", _admin_handler, plugin="test", tier="admin")
        msg = _mu_msg(text="!secret", prefix="nobody")
        sent: list[str] = []

        async def _fake_send(target, text):
            sent.append(text)

        with patch.object(bot, "send", side_effect=_fake_send):
            asyncio.run(bot._dispatch_command(msg))
        assert len(sent) == 1
        assert "Permission denied" in sent[0]

    def test_dispatch_tier_allowed(self):
        bot = _make_bot(admins=["Alice"])
        bot.registry.register_command(
            "secret", _admin_handler, plugin="test", tier="admin")
        msg = _mu_msg(text="!secret", prefix="Alice")
        sent: list[str] = []

        async def _fake_send(target, text):
            sent.append(text)

        with patch.object(bot, "send", side_effect=_fake_send):
            asyncio.run(bot._dispatch_command(msg))
        assert sent == ["admin action done"]

    def test_dispatch_prefix_match(self):
        # "ec" is an unambiguous prefix of "echo".
        bot = _make_bot()
        bot.registry.register_command("echo", _echo_handler, plugin="test")
        msg = _mu_msg(text="!ec hello")
        sent: list[str] = []

        async def _fake_send(target, text):
            sent.append(text)

        with patch.object(bot, "send", side_effect=_fake_send):
            asyncio.run(bot._dispatch_command(msg))
        assert sent == ["hello"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestMumbleBotTier
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestMumbleBotTier:
    """Permission tier resolution from the message prefix."""

    def test_admin_tier(self):
        bot = _make_bot(admins=["AdminUser"])
        msg = _mu_msg(prefix="AdminUser")
        assert bot._get_tier(msg) == "admin"

    def test_oper_tier(self):
        bot = _make_bot(operators=["OperUser"])
        msg = _mu_msg(prefix="OperUser")
        assert bot._get_tier(msg) == "oper"

    def test_trusted_tier(self):
        bot = _make_bot(trusted=["TrustedUser"])
        msg = _mu_msg(prefix="TrustedUser")
        assert bot._get_tier(msg) == "trusted"

    def test_user_tier_default(self):
        bot = _make_bot()
        msg = _mu_msg(prefix="RandomUser")
        assert bot._get_tier(msg) == "user"

    def test_no_prefix(self):
        bot = _make_bot(admins=["Admin"])
        msg = _mu_msg()
        msg.prefix = None
        assert bot._get_tier(msg) == "user"

    def test_is_admin_true(self):
        bot = _make_bot(admins=["Admin"])
        msg = _mu_msg(prefix="Admin")
        assert bot._is_admin(msg) is True

    def test_is_admin_false(self):
        bot = _make_bot()
        msg = _mu_msg(prefix="Nobody")
        assert bot._is_admin(msg) is False

    def test_priority_order(self):
        """Admin takes priority over oper and trusted."""
        bot = _make_bot(admins=["User"], operators=["User"], trusted=["User"])
        msg = _mu_msg(prefix="User")
        assert bot._get_tier(msg) == "admin"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestMumbleBotNoOps
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestMumbleBotNoOps:
    """Channel-management calls that must complete without a connection.

    The ``*_noop`` tests have no assertions; they pass as long as the call
    does not raise.
    """

    def test_join_noop(self):
        bot = _make_bot()
        asyncio.run(bot.join("#channel"))

    def test_part_noop(self):
        bot = _make_bot()
        asyncio.run(bot.part("#channel", "reason"))

    def test_kick_noop(self):
        bot = _make_bot()
        asyncio.run(bot.kick("#channel", "nick", "reason"))

    def test_mode_noop(self):
        bot = _make_bot()
        asyncio.run(bot.mode("#channel", "+o", "nick"))

    def test_set_topic_noop(self):
        bot = _make_bot()
        asyncio.run(bot.set_topic("#channel", "new topic"))

    def test_quit_stops(self):
        bot = _make_bot()
        bot._running = True
        asyncio.run(bot.quit())
        assert bot._running is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestMumbleBotProtocol
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestMumbleBotProtocol:
    """Payload builders and ``_handle`` state updates for control messages."""

    def test_version_payload_structure(self):
        payload = _build_version_payload()
        fields = _decode_fields(payload)
        # field 1: version_v1 (uint32)
        assert _field_int(fields, 1) == (1 << 16) | (5 << 8)
        # field 2: release (string)
        assert _field_str(fields, 2) == "derp 1.5.0"
        # field 3: os (string)
        assert _field_str(fields, 3) == "Linux"

    def test_authenticate_payload_structure(self):
        payload = _build_authenticate_payload("testuser", "testpass")
        fields = _decode_fields(payload)
        assert _field_str(fields, 1) == "testuser"
        assert _field_str(fields, 2) == "testpass"
        # field 5: opus (bool=1)
        assert _field_int(fields, 5) == 1

    def test_authenticate_no_password(self):
        payload = _build_authenticate_payload("testuser", "")
        fields = _decode_fields(payload)
        assert _field_str(fields, 1) == "testuser"
        assert 2 not in fields  # no password field

    def test_ping_payload_roundtrip(self):
        payload = _build_ping_payload(123456789)
        fields = _decode_fields(payload)
        assert _field_int(fields, 1) == 123456789

    def test_text_message_payload(self):
        payload = _build_text_message_payload(
            channel_ids=[5], message="<b>hello</b>",
        )
        fields = _decode_fields(payload)
        assert _field_str(fields, 5) == "<b>hello</b>"
        assert _field_ints(fields, 3) == [5]

    def test_text_message_multiple_channels(self):
        payload = _build_text_message_payload(
            channel_ids=[1, 2, 3], message="broadcast",
        )
        fields = _decode_fields(payload)
        assert _field_ints(fields, 3) == [1, 2, 3]

    def test_handle_ping_echoes(self):
        bot = _make_bot()
        sent: list[tuple[int, bytes]] = []

        async def _fake_send_msg(msg_type, payload=b""):
            sent.append((msg_type, payload))

        ping_payload = _build_ping_payload(42)
        with patch.object(bot, "_send_msg", side_effect=_fake_send_msg):
            asyncio.run(bot._handle(MSG_PING, ping_payload))
        assert len(sent) == 1
        assert sent[0][0] == MSG_PING
        fields = _decode_fields(sent[0][1])
        assert _field_int(fields, 1) == 42

    def test_handle_server_sync(self):
        bot = _make_bot()
        payload = (
            _encode_field(1, _WIRE_VARINT, 99)
            + _encode_field(3, _WIRE_LEN, "Welcome!")
        )
        asyncio.run(bot._handle(MSG_SERVER_SYNC, payload))
        assert bot._session == 99

    def test_handle_channel_state(self):
        bot = _make_bot()
        payload = (
            _encode_field(1, _WIRE_VARINT, 5)
            + _encode_field(3, _WIRE_LEN, "General")
        )
        asyncio.run(bot._handle(MSG_CHANNEL_STATE, payload))
        assert bot._channels[5] == "General"

    def test_handle_channel_remove(self):
        bot = _make_bot()
        bot._channels[5] = "General"
        payload = _encode_field(1, _WIRE_VARINT, 5)
        asyncio.run(bot._handle(MSG_CHANNEL_REMOVE, payload))
        assert 5 not in bot._channels

    def test_handle_user_state(self):
        bot = _make_bot()
        payload = (
            _encode_field(1, _WIRE_VARINT, 10)
            + _encode_field(3, _WIRE_LEN, "Alice")
            + _encode_field(5, _WIRE_VARINT, 0)
        )
        asyncio.run(bot._handle(MSG_USER_STATE, payload))
        assert bot._users[10] == "Alice"
        assert bot._user_channels[10] == 0

    def test_handle_user_state_channel_change(self):
        bot = _make_bot()
        bot._users[10] = "Alice"
        bot._user_channels[10] = 0
        # User moves to channel 5
        payload = (
            _encode_field(1, _WIRE_VARINT, 10)
            + _encode_field(5, _WIRE_VARINT, 5)
        )
        asyncio.run(bot._handle(MSG_USER_STATE, payload))
        assert bot._users[10] == "Alice"  # name unchanged
        assert bot._user_channels[10] == 5

    def test_handle_user_remove(self):
        bot = _make_bot()
        bot._users[10] = "Alice"
        bot._user_channels[10] = 0
        payload = _encode_field(1, _WIRE_VARINT, 10)
        asyncio.run(bot._handle(MSG_USER_REMOVE, payload))
        assert 10 not in bot._users
        assert 10 not in bot._user_channels

    def test_handle_text_message_dispatch(self):
        bot = _make_bot()
        bot._users[42] = "Alice"
        bot.registry.register_command(
            "echo", _echo_handler, help="echo", plugin="test")
        payload = (
            _encode_field(1, _WIRE_VARINT, 42)  # actor
            + _encode_field(3, _WIRE_VARINT, 0)  # channel_id
            + _encode_field(5, _WIRE_LEN, "!echo test")  # message
        )
        sent: list[str] = []

        async def _fake_send(target, text):
            sent.append(text)

        with patch.object(bot, "send", side_effect=_fake_send):
            asyncio.run(bot._handle(MSG_TEXT_MESSAGE, payload))
        assert sent == ["test"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestPluginManagement
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestPluginManagement:
    """Failure paths of ``load_plugin`` / ``unload_plugin`` / ``reload_plugin``.

    Each call returns an ``(ok, message)`` pair; only refusals are covered.
    """

    def test_load_plugin_not_found(self):
        bot = _make_bot()
        ok, msg = bot.load_plugin("nonexistent_xyz")
        assert ok is False
        assert "not found" in msg

    def test_load_plugin_already_loaded(self):
        bot = _make_bot()
        # Any object in the registry's module table marks the name as loaded.
        bot.registry._modules["test"] = object()
        ok, msg = bot.load_plugin("test")
        assert ok is False
        assert "already loaded" in msg

    def test_unload_core_refused(self):
        bot = _make_bot()
        ok, msg = bot.unload_plugin("core")
        assert ok is False
        assert "cannot unload core" in msg

    def test_unload_not_loaded(self):
        bot = _make_bot()
        ok, msg = bot.unload_plugin("nonexistent")
        assert ok is False
        assert "not loaded" in msg

    def test_reload_delegates(self):
        bot = _make_bot()
        ok, msg = bot.reload_plugin("nonexistent")
        assert ok is False
        assert "not loaded" in msg
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestMumbleBotConfig
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestMumbleBotConfig:
    """How ``MumbleBot`` reads its settings from the config dict."""

    def test_prefix_from_mumble_section(self):
        # A prefix under "mumble" wins over the one under "bot".
        config = {
            "mumble": {
                "enabled": True,
                "host": "127.0.0.1",
                "port": 64738,
                "username": "derp",
                "password": "",
                "tls_verify": False,
                "prefix": "/",
                "admins": [],
                "operators": [],
                "trusted": [],
            },
            "bot": {"prefix": "!", "rate_limit": 2.0, "rate_burst": 5},
        }
        bot = MumbleBot("test", config, PluginRegistry())
        assert bot.prefix == "/"

    def test_prefix_falls_back_to_bot(self):
        config = {
            "mumble": {
                "enabled": True,
                "host": "127.0.0.1",
                "port": 64738,
                "username": "derp",
                "password": "",
                "tls_verify": False,
                "admins": [],
                "operators": [],
                "trusted": [],
            },
            "bot": {"prefix": "!", "rate_limit": 2.0, "rate_burst": 5},
        }
        bot = MumbleBot("test", config, PluginRegistry())
        assert bot.prefix == "!"

    def test_admins_coerced_to_str(self):
        bot = _make_bot(admins=[111, 222])
        assert bot._admins == ["111", "222"]

    def test_default_port(self):
        bot = _make_bot()
        assert bot._port == 64738

    def test_tls_verify_default(self):
        bot = _make_bot()
        assert bot._tls_verify is False

    def test_nick_from_username(self):
        bot = _make_bot()
        assert bot.nick == "derp"

    def test_proxy_default_true(self):
        # _make_bot's config has no "proxy" key.
        bot = _make_bot()
        assert bot._proxy is True

    def test_proxy_disabled(self):
        config = {
            "mumble": {
                "enabled": True,
                "host": "127.0.0.1",
                "port": 64738,
                "username": "derp",
                "password": "",
                "tls_verify": False,
                "proxy": False,
                "admins": [],
                "operators": [],
                "trusted": [],
            },
            "bot": {"prefix": "!", "rate_limit": 2.0, "rate_burst": 5},
        }
        bot = MumbleBot("test", config, PluginRegistry())
        assert bot._proxy is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestVoiceVarint
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestVoiceVarint:
    """``_encode_voice_varint``: prefix-tagged varint used in voice packets.

    The leading bits of the first byte select the encoded width; each test
    checks the total length and, where relevant, that prefix.
    """

    def test_zero(self):
        assert _encode_voice_varint(0) == b"\x00"

    def test_7bit_max(self):
        assert _encode_voice_varint(127) == b"\x7f"

    def test_7bit_small(self):
        assert _encode_voice_varint(1) == b"\x01"
        assert _encode_voice_varint(42) == b"\x2a"

    def test_14bit_min(self):
        # 128 = 0x80 -> prefix 10, value 128
        result = _encode_voice_varint(128)
        assert len(result) == 2
        assert result[0] & 0xC0 == 0x80  # top 2 bits = 10

    def test_14bit_max(self):
        result = _encode_voice_varint(0x3FFF)
        assert len(result) == 2

    def test_21bit(self):
        result = _encode_voice_varint(0x4000)
        assert len(result) == 3
        assert result[0] & 0xE0 == 0xC0  # top 3 bits = 110

    def test_28bit(self):
        result = _encode_voice_varint(0x200000)
        assert len(result) == 4
        assert result[0] & 0xF0 == 0xE0  # top 4 bits = 1110

    def test_64bit(self):
        # First value past the 28-bit form: marker byte plus 8 value bytes.
        result = _encode_voice_varint(0x10000000)
        assert len(result) == 9
        assert result[0] == 0xF0

    def test_negative_raises(self):
        import pytest
        with pytest.raises(ValueError, match="non-negative"):
            _encode_voice_varint(-1)

    def test_14bit_roundtrip(self):
        """Value encoded and decoded back correctly (manual decode)."""
        val = 300
        data = _encode_voice_varint(val)
        assert len(data) == 2
        decoded = ((data[0] & 0x3F) << 8) | data[1]
        assert decoded == val

    def test_7bit_roundtrip(self):
        for v in range(128):
            data = _encode_voice_varint(v)
            assert len(data) == 1
            assert data[0] == v
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestVoicePacket
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestVoicePacket:
    """``_build_voice_packet``: header byte, sequence, length varint, payload."""

    def test_header_byte(self):
        pkt = _build_voice_packet(0, b"\xaa\xbb")
        assert pkt[0] == 0x80  # type=4, target=0

    def test_sequence_encoding(self):
        pkt = _build_voice_packet(42, b"\x00")
        # byte 0: header, byte 1: sequence=42 (7-bit)
        assert pkt[1] == 42

    def test_opus_data_present(self):
        opus = b"\xde\xad\xbe\xef"
        pkt = _build_voice_packet(0, opus)
        assert pkt.endswith(opus)

    def test_length_field(self):
        opus = b"\x00" * 10
        pkt = _build_voice_packet(0, opus)
        # header(1) + seq(1, val=0) + length(1, val=10) + data(10) = 13
        assert len(pkt) == 13
        assert pkt[2] == 10  # length varint

    def test_terminator_flag(self):
        opus = b"\x00" * 5
        pkt = _build_voice_packet(0, opus, last=True)
        # length with bit 13 set: 5 | 0x2000 = 0x2005
        # 0x2005 in 14-bit varint: 10_100000 00000101
        length_bytes = _encode_voice_varint(5 | 0x2000)
        assert length_bytes in pkt

    def test_no_terminator_by_default(self):
        opus = b"\x00" * 5
        pkt = _build_voice_packet(0, opus, last=False)
        # length=5, no bit 13
        assert pkt[2] == 5
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestPcmScaling
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestPcmScaling:
    """``_scale_pcm``: volume scaling of little-endian signed 16-bit samples."""

    def test_unity_volume(self):
        import struct as _s
        pcm = _s.pack("<hh", 1000, -1000)
        result = _scale_pcm(pcm, 1.0)
        assert result == pcm

    def test_half_volume(self):
        import struct as _s
        pcm = _s.pack("<h", 1000)
        result = _scale_pcm(pcm, 0.5)
        samples = _s.unpack("<h", result)
        assert samples[0] == 500

    def test_clamp_positive(self):
        # Doubling the maximum sample must saturate, not wrap around.
        import struct as _s
        pcm = _s.pack("<h", 32767)
        result = _scale_pcm(pcm, 2.0)
        samples = _s.unpack("<h", result)
        assert samples[0] == 32767

    def test_clamp_negative(self):
        import struct as _s
        pcm = _s.pack("<h", -32768)
        result = _scale_pcm(pcm, 2.0)
        samples = _s.unpack("<h", result)
        assert samples[0] == -32768

    def test_zero_volume(self):
        import struct as _s
        pcm = _s.pack("<hh", 32767, -32768)
        result = _scale_pcm(pcm, 0.0)
        samples = _s.unpack("<hh", result)
        assert samples == (0, 0)

    def test_preserves_length(self):
        pcm = b"\x00" * 1920  # 960 samples
        result = _scale_pcm(pcm, 0.5)
        assert len(result) == 1920
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestShellQuote
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestShellQuote:
    """``_shell_quote``: single-quote wrapping for shell arguments."""

    def test_simple(self):
        assert _shell_quote("hello") == "'hello'"

    def test_single_quote(self):
        # An embedded quote closes the string, adds \' and reopens it.
        assert _shell_quote("it's") == "'it'\\''s'"

    def test_url(self):
        url = "https://youtube.com/watch?v=abc&t=10"
        quoted = _shell_quote(url)
        assert quoted.startswith("'")
        assert quoted.endswith("'")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestMsgUdpTunnel
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestMsgUdpTunnel:
    """Pins the numeric value of the ``MSG_UDPTUNNEL`` message type."""

    def test_constant(self):
        assert MSG_UDPTUNNEL == 1
|