Files
derp/tests/test_mumble.py
user ab924444de fix: survive mumble disconnects without restarting audio stream
Guard stream_audio with _is_audio_ready() so that PCM frames are
dropped (not crashed on) when pymumble recreates SoundOutput with
encoder=None during reconnect. The ffmpeg pipeline stays alive,
position tracking remains accurate, and audio feeding resumes once
the codec is negotiated. Listeners hear brief silence instead of
a 30+ second restart with URL re-resolution.

Also adds chat messages to _auto_resume so users see what the bot
intends ("Resuming 'X' at M:SS in a moment" / "...aborted").
2026-02-22 02:41:44 +01:00

740 lines
23 KiB
Python

"""Tests for the Mumble adapter."""
import asyncio
import struct
from unittest.mock import AsyncMock, MagicMock, patch
from derp.mumble import (
MumbleBot,
MumbleMessage,
_escape_html,
_scale_pcm,
_scale_pcm_ramp,
_shell_quote,
_strip_html,
)
from derp.plugin import PluginRegistry
# -- Helpers -----------------------------------------------------------------
def _make_bot(admins=None, operators=None, trusted=None, prefix=None):
    """Build a MumbleBot backed by a fresh PluginRegistry and a test config.

    Falsy permission lists become empty lists; a falsy ``prefix`` becomes
    ``"!"``.
    """
    mumble_section = {
        "enabled": True,
        "host": "127.0.0.1",
        "port": 64738,
        "username": "derp",
        "password": "",
        "admins": admins or [],
        "operators": operators or [],
        "trusted": trusted or [],
    }
    bot_section = {
        "prefix": prefix or "!",
        "paste_threshold": 4,
        "plugins_dir": "plugins",
        "rate_limit": 2.0,
        "rate_burst": 5,
    }
    return MumbleBot(
        "mu-test",
        {"mumble": mumble_section, "bot": bot_section},
        PluginRegistry(),
    )
def _mu_msg(text="!ping", nick="Alice", prefix="Alice",
            target="0", is_channel=True):
    """Build a MumbleMessage for command tests.

    ``raw`` is an empty dict and ``params`` is ``[target, text]``.
    """
    fields = {
        "raw": {},
        "nick": nick,
        "prefix": prefix,
        "text": text,
        "target": target,
        "is_channel": is_channel,
        "params": [target, text],
    }
    return MumbleMessage(**fields)
# -- Test helpers for registering commands -----------------------------------
async def _echo_handler(bot, msg):
"""Simple command handler that echoes text."""
args = msg.text.split(None, 1)
reply = args[1] if len(args) > 1 else "no args"
await bot.reply(msg, reply)
async def _admin_handler(bot, msg):
"""Admin-only command handler."""
await bot.reply(msg, "admin action done")
# ---------------------------------------------------------------------------
# TestMumbleMessage
# ---------------------------------------------------------------------------
class TestMumbleMessage:
    """Construction and attribute checks for MumbleMessage."""

    def test_defaults(self):
        """Fields not passed to the constructor take their defaults."""
        blank = MumbleMessage(
            raw={}, nick=None, prefix=None, text=None, target=None,
        )
        assert blank.is_channel is True
        assert blank.command == "PRIVMSG"
        assert blank.params == []
        assert blank.tags == {}

    def test_custom_values(self):
        """Explicitly supplied fields are stored as given."""
        supplied = {
            "raw": {"field": 1},
            "nick": "Alice",
            "prefix": "Alice",
            "text": "hello",
            "target": "0",
            "is_channel": True,
            "command": "PRIVMSG",
            "params": ["0", "hello"],
            "tags": {"key": "val"},
        }
        built = MumbleMessage(**supplied)
        assert built.nick == "Alice"
        assert built.prefix == "Alice"
        assert built.text == "hello"
        assert built.target == "0"
        assert built.tags == {"key": "val"}

    def test_duck_type_compat(self):
        """MumbleMessage has the same attribute names as IRC Message."""
        message = _mu_msg()
        required = (
            "raw", "nick", "prefix", "text", "target",
            "is_channel", "command", "params", "tags",
        )
        for attr in required:
            assert hasattr(message, attr), f"missing attribute: {attr}"

    def test_dm_message(self):
        """A direct message carries is_channel False and the given target."""
        direct = _mu_msg(target="dm", is_channel=False)
        assert direct.is_channel is False
        assert direct.target == "dm"

    def test_prefix_is_username(self):
        """The prefix passed in is exposed unchanged."""
        assert _mu_msg(prefix="admin_user").prefix == "admin_user"
# ---------------------------------------------------------------------------
# TestHtmlHelpers
# ---------------------------------------------------------------------------
class TestHtmlHelpers:
    """Checks for the _strip_html and _escape_html helpers."""

    def test_strip_html_simple(self):
        """A single pair of tags is removed."""
        stripped = _strip_html("<b>bold</b>")
        assert stripped == "bold"

    def test_strip_html_entities(self):
        """Named entities are decoded to their characters."""
        stripped = _strip_html("&amp; &lt; &gt; &quot;")
        assert stripped == '& < > "'

    def test_strip_html_nested(self):
        """Nested tags are removed while the text between them is kept."""
        stripped = _strip_html("<p><b>hello</b> <i>world</i></p>")
        assert stripped == "hello world"

    def test_strip_html_plain(self):
        """Text without markup is returned unchanged."""
        plain = "no tags here"
        assert _strip_html(plain) == "no tags here"

    def test_escape_html(self):
        """Angle brackets are escaped; the single quotes here are not."""
        escaped = _escape_html("<script>alert('xss')")
        assert escaped == "&lt;script&gt;alert('xss')"

    def test_escape_html_ampersand(self):
        """A bare ampersand becomes an entity."""
        escaped = _escape_html("a & b")
        assert escaped == "a &amp; b"
# ---------------------------------------------------------------------------
# TestMumbleBotReply
# ---------------------------------------------------------------------------
class TestMumbleBotReply:
    """Outgoing-message helpers: send, reply, long_reply and action."""

    @staticmethod
    def _capture_html(bot, make_coro):
        """Run ``make_coro()`` with ``bot._send_html`` replaced by a recorder.

        Returns the recorded ``(target, html_text)`` pairs in call order.
        """
        recorded: list[tuple[str, str]] = []

        async def _fake_send_html(target, html_text):
            recorded.append((target, html_text))

        with patch.object(bot, "_send_html", side_effect=_fake_send_html):
            asyncio.run(make_coro())
        return recorded

    @staticmethod
    def _capture_send(bot, make_coro, with_target=True):
        """Run ``make_coro()`` with ``bot.send`` replaced by a recorder.

        Records ``(target, text)`` pairs, or only ``text`` when
        ``with_target`` is false.
        """
        recorded = []

        async def _fake_send(target, text):
            recorded.append((target, text) if with_target else text)

        with patch.object(bot, "send", side_effect=_fake_send):
            asyncio.run(make_coro())
        return recorded

    def test_send_calls_send_html(self):
        """send forwards target and text to _send_html."""
        bot = _make_bot()
        calls = self._capture_html(bot, lambda: bot.send("5", "hello"))
        assert calls == [("5", "hello")]

    def test_send_escapes_html(self):
        """send escapes markup before handing it to _send_html."""
        bot = _make_bot()
        calls = self._capture_html(
            bot, lambda: bot.send("0", "<script>alert('xss')"))
        assert "&lt;script&gt;" in calls[0][1]

    def test_reply_sends_to_target(self):
        """reply uses the message's target for a channel message."""
        bot = _make_bot()
        incoming = _mu_msg(target="5")
        calls = self._capture_send(bot, lambda: bot.reply(incoming, "pong"))
        assert calls == [("5", "pong")]

    def test_reply_dm_fallback(self):
        """reply to a non-channel message is sent to target "0"."""
        bot = _make_bot()
        incoming = _mu_msg(target="dm", is_channel=False)
        calls = self._capture_send(
            bot, lambda: bot.reply(incoming, "dm reply"))
        assert calls == [("0", "dm reply")]

    def test_long_reply_under_threshold(self):
        """Three lines are sent one by one, in order."""
        bot = _make_bot()
        incoming = _mu_msg()
        texts = self._capture_send(
            bot,
            lambda: bot.long_reply(incoming, ["a", "b", "c"]),
            with_target=False,
        )
        assert texts == ["a", "b", "c"]

    def test_long_reply_over_threshold_no_paste(self):
        """Five lines are still sent one by one, in order."""
        bot = _make_bot()
        incoming = _mu_msg()
        texts = self._capture_send(
            bot,
            lambda: bot.long_reply(incoming, ["a", "b", "c", "d", "e"]),
            with_target=False,
        )
        assert texts == ["a", "b", "c", "d", "e"]

    def test_long_reply_empty(self):
        """An empty line list results in no send call."""
        bot = _make_bot()
        incoming = _mu_msg()
        with patch.object(bot, "send") as send_mock:
            asyncio.run(bot.long_reply(incoming, []))
            send_mock.assert_not_called()

    def test_action_format(self):
        """action wraps the text in <i> tags."""
        bot = _make_bot()
        calls = self._capture_html(
            bot, lambda: bot.action("0", "does a thing"))
        assert calls == [("0", "<i>does a thing</i>")]

    def test_action_escapes_content(self):
        """action escapes markup inside the <i> wrapper."""
        bot = _make_bot()
        calls = self._capture_html(bot, lambda: bot.action("0", "<script>"))
        assert calls == [("0", "<i>&lt;script&gt;</i>")]
# ---------------------------------------------------------------------------
# TestMumbleBotDispatch
# ---------------------------------------------------------------------------
class TestMumbleBotDispatch:
    """Command lookup, prefix matching and tier checks in _dispatch_command."""

    @staticmethod
    def _dispatch_collect(bot, msg):
        """Dispatch ``msg`` with ``bot.send`` recorded; return the sent texts."""
        texts: list[str] = []

        async def _fake_send(target, text):
            texts.append(text)

        with patch.object(bot, "send", side_effect=_fake_send):
            asyncio.run(bot._dispatch_command(msg))
        return texts

    @staticmethod
    def _dispatch_expect_silence(bot, msg):
        """Dispatch ``msg`` and assert that ``bot.send`` was never called."""
        with patch.object(bot, "send") as send_mock:
            asyncio.run(bot._dispatch_command(msg))
            send_mock.assert_not_called()

    def test_dispatch_known_command(self):
        """A registered command runs and its reply is sent."""
        bot = _make_bot()
        bot.registry.register_command(
            "echo", _echo_handler, help="echo", plugin="test")
        texts = self._dispatch_collect(bot, _mu_msg(text="!echo world"))
        assert texts == ["world"]

    def test_dispatch_unknown_command(self):
        """An unregistered command produces no output."""
        bot = _make_bot()
        self._dispatch_expect_silence(bot, _mu_msg(text="!nonexistent"))

    def test_dispatch_no_prefix(self):
        """Text without the command prefix produces no output."""
        bot = _make_bot()
        self._dispatch_expect_silence(bot, _mu_msg(text="just a message"))

    def test_dispatch_empty_text(self):
        """Empty text produces no output."""
        bot = _make_bot()
        self._dispatch_expect_silence(bot, _mu_msg(text=""))

    def test_dispatch_none_text(self):
        """A message whose text is None produces no output."""
        bot = _make_bot()
        incoming = _mu_msg()
        incoming.text = None
        self._dispatch_expect_silence(bot, incoming)

    def test_dispatch_ambiguous(self):
        """A prefix matching two commands yields one "Ambiguous" reply."""
        bot = _make_bot()
        bot.registry.register_command("ping", _echo_handler, plugin="test")
        bot.registry.register_command("plugins", _echo_handler, plugin="test")
        texts = self._dispatch_collect(bot, _mu_msg(text="!p"))
        assert len(texts) == 1
        assert "Ambiguous" in texts[0]

    def test_dispatch_tier_denied(self):
        """A sender below the required tier gets "Permission denied"."""
        bot = _make_bot()
        bot.registry.register_command(
            "secret", _admin_handler, plugin="test", tier="admin")
        texts = self._dispatch_collect(
            bot, _mu_msg(text="!secret", prefix="nobody"))
        assert len(texts) == 1
        assert "Permission denied" in texts[0]

    def test_dispatch_tier_allowed(self):
        """A configured admin may run an admin-tier command."""
        bot = _make_bot(admins=["Alice"])
        bot.registry.register_command(
            "secret", _admin_handler, plugin="test", tier="admin")
        texts = self._dispatch_collect(
            bot, _mu_msg(text="!secret", prefix="Alice"))
        assert texts == ["admin action done"]

    def test_dispatch_prefix_match(self):
        """An unambiguous command abbreviation resolves to the command."""
        bot = _make_bot()
        bot.registry.register_command("echo", _echo_handler, plugin="test")
        texts = self._dispatch_collect(bot, _mu_msg(text="!ec hello"))
        assert texts == ["hello"]
# ---------------------------------------------------------------------------
# TestMumbleBotTier
# ---------------------------------------------------------------------------
class TestMumbleBotTier:
    """Permission tier resolution from the message prefix."""

    def test_admin_tier(self):
        """A prefix listed under admins resolves to "admin"."""
        bot = _make_bot(admins=["AdminUser"])
        assert bot._get_tier(_mu_msg(prefix="AdminUser")) == "admin"

    def test_oper_tier(self):
        """A prefix listed under operators resolves to "oper"."""
        bot = _make_bot(operators=["OperUser"])
        assert bot._get_tier(_mu_msg(prefix="OperUser")) == "oper"

    def test_trusted_tier(self):
        """A prefix listed under trusted resolves to "trusted"."""
        bot = _make_bot(trusted=["TrustedUser"])
        assert bot._get_tier(_mu_msg(prefix="TrustedUser")) == "trusted"

    def test_user_tier_default(self):
        """An unlisted prefix resolves to "user"."""
        bot = _make_bot()
        assert bot._get_tier(_mu_msg(prefix="RandomUser")) == "user"

    def test_no_prefix(self):
        """A message whose prefix is None resolves to "user"."""
        bot = _make_bot(admins=["Admin"])
        anonymous = _mu_msg()
        anonymous.prefix = None
        assert bot._get_tier(anonymous) == "user"

    def test_is_admin_true(self):
        """_is_admin is True for a configured admin."""
        bot = _make_bot(admins=["Admin"])
        assert bot._is_admin(_mu_msg(prefix="Admin")) is True

    def test_is_admin_false(self):
        """_is_admin is False when no admins are configured."""
        bot = _make_bot()
        assert bot._is_admin(_mu_msg(prefix="Nobody")) is False

    def test_priority_order(self):
        """Admin takes priority over oper and trusted."""
        everywhere = ["User"]
        bot = _make_bot(
            admins=list(everywhere),
            operators=list(everywhere),
            trusted=list(everywhere),
        )
        assert bot._get_tier(_mu_msg(prefix="User")) == "admin"
# ---------------------------------------------------------------------------
# TestMumbleBotNoOps
# ---------------------------------------------------------------------------
class TestMumbleBotNoOps:
    """IRC-style calls that must complete without raising, plus quit."""

    def test_join_noop(self):
        """join can be awaited without raising."""
        asyncio.run(_make_bot().join("#channel"))

    def test_part_noop(self):
        """part can be awaited without raising."""
        asyncio.run(_make_bot().part("#channel", "reason"))

    def test_kick_noop(self):
        """kick can be awaited without raising."""
        asyncio.run(_make_bot().kick("#channel", "nick", "reason"))

    def test_mode_noop(self):
        """mode can be awaited without raising."""
        asyncio.run(_make_bot().mode("#channel", "+o", "nick"))

    def test_set_topic_noop(self):
        """set_topic can be awaited without raising."""
        asyncio.run(_make_bot().set_topic("#channel", "new topic"))

    def test_quit_stops(self):
        """quit clears the _running flag."""
        bot = _make_bot()
        bot._running = True
        asyncio.run(bot.quit())
        assert bot._running is False
# ---------------------------------------------------------------------------
# TestPluginManagement
# ---------------------------------------------------------------------------
class TestPluginManagement:
    """Failure paths of load_plugin, unload_plugin and reload_plugin."""

    def test_load_plugin_not_found(self):
        """Loading an unknown plugin fails with a "not found" message."""
        succeeded, detail = _make_bot().load_plugin("nonexistent_xyz")
        assert succeeded is False
        assert "not found" in detail

    def test_load_plugin_already_loaded(self):
        """Loading a name already in the registry's modules is refused."""
        bot = _make_bot()
        bot.registry._modules["test"] = object()
        succeeded, detail = bot.load_plugin("test")
        assert succeeded is False
        assert "already loaded" in detail

    def test_unload_core_refused(self):
        """The core plugin cannot be unloaded."""
        succeeded, detail = _make_bot().unload_plugin("core")
        assert succeeded is False
        assert "cannot unload core" in detail

    def test_unload_not_loaded(self):
        """Unloading a plugin that is not loaded fails."""
        succeeded, detail = _make_bot().unload_plugin("nonexistent")
        assert succeeded is False
        assert "not loaded" in detail

    def test_reload_delegates(self):
        """Reloading a plugin that is not loaded reports "not loaded"."""
        succeeded, detail = _make_bot().reload_plugin("nonexistent")
        assert succeeded is False
        assert "not loaded" in detail
# ---------------------------------------------------------------------------
# TestMumbleBotConfig
# ---------------------------------------------------------------------------
class TestMumbleBotConfig:
    """How MumbleBot reads prefix, admins, port and nick from its config."""

    @staticmethod
    def _raw_config(mumble_prefix=None):
        """Return a config dict; the mumble section gets a prefix only if given."""
        mumble_section = {
            "enabled": True,
            "host": "127.0.0.1",
            "port": 64738,
            "username": "derp",
            "password": "",
        }
        # Insert the optional prefix before the permission lists so the key
        # order matches a hand-written config that places it there.
        if mumble_prefix is not None:
            mumble_section["prefix"] = mumble_prefix
        mumble_section.update({
            "admins": [],
            "operators": [],
            "trusted": [],
        })
        return {
            "mumble": mumble_section,
            "bot": {"prefix": "!", "rate_limit": 2.0, "rate_burst": 5},
        }

    def test_prefix_from_mumble_section(self):
        """A prefix in the mumble section wins over the bot section."""
        bot = MumbleBot("test", self._raw_config("/"), PluginRegistry())
        assert bot.prefix == "/"

    def test_prefix_falls_back_to_bot(self):
        """Without a mumble prefix, the bot section's prefix is used."""
        bot = MumbleBot("test", self._raw_config(), PluginRegistry())
        assert bot.prefix == "!"

    def test_admins_coerced_to_str(self):
        """Non-string admin entries are stored as strings."""
        assert _make_bot(admins=[111, 222])._admins == ["111", "222"]

    def test_default_port(self):
        """The configured port is exposed as _port."""
        assert _make_bot()._port == 64738

    def test_nick_from_username(self):
        """nick mirrors the configured username."""
        assert _make_bot().nick == "derp"
# ---------------------------------------------------------------------------
# TestPcmScaling
# ---------------------------------------------------------------------------
class TestPcmScaling:
    """_scale_pcm on little-endian signed 16-bit sample buffers."""

    def test_unity_volume(self):
        """Volume 1.0 returns the input bytes unchanged."""
        original = struct.pack("<hh", 1000, -1000)
        assert _scale_pcm(original, 1.0) == original

    def test_half_volume(self):
        """Volume 0.5 halves the sample value."""
        scaled = _scale_pcm(struct.pack("<h", 1000), 0.5)
        decoded = struct.unpack("<h", scaled)
        assert decoded[0] == 500

    def test_clamp_positive(self):
        """A doubled maximum sample stays at 32767."""
        scaled = _scale_pcm(struct.pack("<h", 32767), 2.0)
        decoded = struct.unpack("<h", scaled)
        assert decoded[0] == 32767

    def test_clamp_negative(self):
        """A doubled minimum sample stays at -32768."""
        scaled = _scale_pcm(struct.pack("<h", -32768), 2.0)
        decoded = struct.unpack("<h", scaled)
        assert decoded[0] == -32768

    def test_zero_volume(self):
        """Volume 0.0 turns every sample into 0."""
        scaled = _scale_pcm(struct.pack("<hh", 32767, -32768), 0.0)
        assert struct.unpack("<hh", scaled) == (0, 0)

    def test_preserves_length(self):
        """The output has as many bytes as the input."""
        silence = b"\x00" * 1920
        assert len(_scale_pcm(silence, 0.5)) == 1920
# ---------------------------------------------------------------------------
# TestShellQuote
# ---------------------------------------------------------------------------
class TestShellQuote:
    """_shell_quote wraps its argument in single quotes."""

    def test_simple(self):
        """A plain word is wrapped in single quotes."""
        quoted = _shell_quote("hello")
        assert quoted == "'hello'"

    def test_single_quote(self):
        """An embedded single quote is rewritten as '\\''."""
        quoted = _shell_quote("it's")
        assert quoted == "'it'\\''s'"

    def test_url(self):
        """A URL containing & and ? comes back starting and ending with '."""
        wrapped = _shell_quote("https://youtube.com/watch?v=abc&t=10")
        assert wrapped.startswith("'")
        assert wrapped.endswith("'")
# ---------------------------------------------------------------------------
# TestPcmRamping
# ---------------------------------------------------------------------------
class TestPcmRamping:
    """_scale_pcm_ramp: per-sample volume interpolation across a buffer."""

    def test_flat_when_equal(self):
        """When vol_start == vol_end, behaves like _scale_pcm."""
        buf = struct.pack("<hh", 1000, -1000)
        ramped = _scale_pcm_ramp(buf, 0.5, 0.5)
        flat = _scale_pcm(buf, 0.5)
        assert ramped == flat

    def test_linear_interpolation(self):
        """Volume ramps linearly from start to end across samples."""
        buf = struct.pack("<hhhh", 10000, 10000, 10000, 10000)
        ramped = _scale_pcm_ramp(buf, 0.0, 1.0)
        # Expected per-sample volumes: 0.0, 0.25, 0.5, 0.75.
        assert struct.unpack("<hhhh", ramped) == (0, 2500, 5000, 7500)

    def test_clamp_positive(self):
        """Ramping up with loud samples clamps to 32767."""
        ramped = _scale_pcm_ramp(struct.pack("<h", 32767), 2.0, 2.0)
        decoded = struct.unpack("<h", ramped)
        assert decoded[0] == 32767

    def test_clamp_negative(self):
        """Ramping up with negative samples clamps to -32768."""
        ramped = _scale_pcm_ramp(struct.pack("<h", -32768), 2.0, 2.0)
        decoded = struct.unpack("<h", ramped)
        assert decoded[0] == -32768

    def test_preserves_length(self):
        """Output length equals input length."""
        silence = b"\x00" * 1920
        assert len(_scale_pcm_ramp(silence, 0.0, 1.0)) == 1920

    def test_empty_data(self):
        """Empty input returns empty output."""
        assert _scale_pcm_ramp(b"", 0.0, 1.0) == b""

    def test_reverse_direction(self):
        """Volume ramps down from start to end."""
        buf = struct.pack("<hhhh", 10000, 10000, 10000, 10000)
        ramped = _scale_pcm_ramp(buf, 1.0, 0.0)
        # Expected per-sample volumes: 1.0, 0.75, 0.5, 0.25.
        assert struct.unpack("<hhhh", ramped) == (10000, 7500, 5000, 2500)
# ---------------------------------------------------------------------------
# TestIsAudioReady
# ---------------------------------------------------------------------------
class TestIsAudioReady:
    """_is_audio_ready for each state of the mumble/sound_output/encoder chain."""

    def test_no_mumble_object(self):
        """Not ready when _mumble is None."""
        bot = _make_bot()
        bot._mumble = None
        assert bot._is_audio_ready() is False

    def test_no_sound_output(self):
        """Not ready when sound_output is None."""
        bot = _make_bot()
        client = MagicMock()
        client.sound_output = None
        bot._mumble = client
        assert bot._is_audio_ready() is False

    def test_no_encoder(self):
        """Not ready when the sound output's encoder is None."""
        bot = _make_bot()
        client = MagicMock()
        client.sound_output.encoder = None
        bot._mumble = client
        assert bot._is_audio_ready() is False

    def test_ready(self):
        """Ready when an encoder object is present."""
        bot = _make_bot()
        client = MagicMock()
        client.sound_output.encoder = MagicMock()
        bot._mumble = client
        assert bot._is_audio_ready() is True

    def test_attribute_error_handled(self):
        """Not ready (and no exception) when sound_output is missing."""
        bot = _make_bot()
        client = MagicMock()
        # Deleting a MagicMock attribute makes later access raise
        # AttributeError.
        del client.sound_output
        bot._mumble = client
        assert bot._is_audio_ready() is False
# ---------------------------------------------------------------------------
# TestStreamAudioDisconnect
# ---------------------------------------------------------------------------
class TestStreamAudioDisconnect:
    """stream_audio behaviour when audio readiness drops mid-stream."""

    def test_stream_survives_disconnect(self):
        """stream_audio keeps ffmpeg alive when connection drops mid-stream."""
        bot = _make_bot()
        bot._mumble = MagicMock()
        sound_output = bot._mumble.sound_output
        sound_output.encoder = MagicMock()
        sound_output.get_buffer_size.return_value = 0.0

        pcm_frame = b"\x00" * 1920
        # Shared mutable state for the fakes: frames handed out so far and
        # whether the fake connection still reports as audio-ready.
        state = {"reads": 0, "online": True}

        async def _fake_read(n):
            # Hand out five frames, then signal end of stream.
            if state["reads"] >= 5:
                return b""
            state["reads"] += 1
            # From the fourth read onwards the connection counts as dropped.
            if state["reads"] > 3:
                state["online"] = False
            return pcm_frame

        def _ready():
            return state["online"]

        proc = MagicMock()
        proc.stdout.read = _fake_read
        proc.stderr.read = AsyncMock(return_value=b"")
        proc.wait = AsyncMock(return_value=0)
        proc.kill = MagicMock()

        progress = [0]

        async def _run():
            ready_patch = patch.object(
                bot, "_is_audio_ready", side_effect=_ready)
            spawn_patch = patch(
                "asyncio.create_subprocess_exec", return_value=proc)
            with ready_patch, spawn_patch:
                await bot.stream_audio(
                    "http://example.com/audio",
                    volume=0.5,
                    progress=progress,
                )

        asyncio.run(_run())

        # Progress counts every frame read, connected or not.
        assert progress[0] == 5
        # Only the three frames read while connected reached sound_output.
        assert bot._mumble.sound_output.add_sound.call_count == 3