diff --git a/plugins/mumble_admin.py b/plugins/mumble_admin.py new file mode 100644 index 0000000..8d65bde --- /dev/null +++ b/plugins/mumble_admin.py @@ -0,0 +1,276 @@ +"""Plugin: Mumble server administration via chat commands.""" + +from __future__ import annotations + +import logging + +from derp.plugin import command + +_log = logging.getLogger(__name__) + + +# -- Helpers ----------------------------------------------------------------- + + +def _find_user(bot, name: str): + """Case-insensitive user lookup by name. Returns pymumble User or None.""" + mumble = getattr(bot, "_mumble", None) + if mumble is None: + return None + lower = name.lower() + for sid in list(mumble.users): + user = mumble.users[sid] + if user["name"].lower() == lower: + return user + return None + + +def _find_channel(bot, name: str): + """Case-insensitive channel lookup by name. Returns pymumble Channel or None.""" + mumble = getattr(bot, "_mumble", None) + if mumble is None: + return None + lower = name.lower() + for cid in list(mumble.channels): + chan = mumble.channels[cid] + if chan["name"].lower() == lower: + return chan + return None + + +def _channel_name(bot, channel_id: int) -> str: + """Resolve a channel ID to its name, or return the ID as string.""" + mumble = getattr(bot, "_mumble", None) + if mumble is None: + return str(channel_id) + chan = mumble.channels.get(channel_id) + if chan is None: + return str(channel_id) + return chan["name"] + + +# -- Sub-handlers ------------------------------------------------------------ + + +async def _sub_kick(bot, message, args: list[str]) -> None: + if not args: + await bot.reply(message, "Usage: !mu kick [reason]") + return + user = _find_user(bot, args[0]) + if user is None: + await bot.reply(message, f"User not found: {args[0]}") + return + reason = " ".join(args[1:]) if len(args) > 1 else "" + user.kick(reason) + await bot.reply(message, f"Kicked {user['name']}") + + +async def _sub_ban(bot, message, args: list[str]) -> None: + if not args: + await bot.reply(message, "Usage: !mu ban [reason]") + return + user = _find_user(bot, args[0]) + if user is None: + await bot.reply(message, f"User not found: {args[0]}") + return + reason = " ".join(args[1:]) if len(args) > 1 else "" + user.ban(reason) + await bot.reply(message, f"Banned {user['name']}") + + +async def _sub_mute(bot, message, args: list[str]) -> None: + if not args: + await bot.reply(message, "Usage: !mu mute ") + return + user = _find_user(bot, args[0]) + if user is None: + await bot.reply(message, f"User not found: {args[0]}") + return + user.mute() + await bot.reply(message, f"Muted {user['name']}") + + +async def _sub_unmute(bot, message, args: list[str]) -> None: + if not args: + await bot.reply(message, "Usage: !mu unmute ") + return + user = _find_user(bot, args[0]) + if user is None: + await bot.reply(message, f"User not found: {args[0]}") + return + user.unmute() + await bot.reply(message, f"Unmuted {user['name']}") + + +async def _sub_deafen(bot, message, args: list[str]) -> None: + if not args: + await bot.reply(message, "Usage: !mu deafen ") + return + user = _find_user(bot, args[0]) + if user is None: + await bot.reply(message, f"User not found: {args[0]}") + return + user.deafen() + await bot.reply(message, f"Deafened {user['name']}") + + +async def _sub_undeafen(bot, message, args: list[str]) -> None: + if not args: + await bot.reply(message, "Usage: !mu undeafen ") + return + user = _find_user(bot, args[0]) + if user is None: + await bot.reply(message, f"User not found: {args[0]}") + return + user.undeafen() + await bot.reply(message, f"Undeafened {user['name']}") + + +async def _sub_move(bot, message, args: list[str]) -> None: + if len(args) < 2: + await bot.reply(message, "Usage: !mu move ") + return + user = _find_user(bot, args[0]) + if user is None: + await bot.reply(message, f"User not found: {args[0]}") + return + chan = _find_channel(bot, " ".join(args[1:])) + if chan is None: + await bot.reply(message, f"Channel not found: {' '.join(args[1:])}") + return + user.move_in(chan["channel_id"]) + await bot.reply(message, f"Moved {user['name']} to {chan['name']}") + + +async def _sub_users(bot, message, args: list[str]) -> None: + mumble = getattr(bot, "_mumble", None) + if mumble is None: + return + bots = getattr(bot.registry, "_bots", {}) + lines: list[str] = [] + for sid in sorted(mumble.users): + user = mumble.users[sid] + name = user["name"] + flags: list[str] = [] + if name in bots: + flags.append("bot") + if user.get("mute") or user.get("self_mute"): + flags.append("muted") + if user.get("deaf") or user.get("self_deaf"): + flags.append("deaf") + chan = _channel_name(bot, user.get("channel_id", 0)) + tag = f" [{', '.join(flags)}]" if flags else "" + lines.append(f" {name} in {chan}{tag}") + header = f"Online: {len(lines)} user(s)" + await bot.reply(message, header + "\n" + "\n".join(lines)) + + +async def _sub_channels(bot, message, args: list[str]) -> None: + mumble = getattr(bot, "_mumble", None) + if mumble is None: + return + lines: list[str] = [] + for cid in sorted(mumble.channels): + chan = mumble.channels[cid] + name = chan["name"] + # Count users in this channel + count = sum( + 1 for sid in mumble.users + if mumble.users[sid].get("channel_id") == cid + ) + lines.append(f" {name} ({count})") + await bot.reply(message, "Channels:\n" + "\n".join(lines)) + + +async def _sub_mkchan(bot, message, args: list[str]) -> None: + if not args: + await bot.reply(message, "Usage: !mu mkchan [temp]") + return + mumble = getattr(bot, "_mumble", None) + if mumble is None: + return + name = args[0] + temp = len(args) > 1 and args[1].lower() in ("temp", "temporary", "true") + mumble.channels.new_channel(0, name, temporary=temp) + label = " (temporary)" if temp else "" + await bot.reply(message, f"Created channel: {name}{label}") + + +async def _sub_rmchan(bot, message, args: list[str]) -> None: + if not args: + await bot.reply(message, "Usage: !mu rmchan ") + return + chan = _find_channel(bot, " ".join(args)) + if chan is None: + await bot.reply(message, f"Channel not found: {' '.join(args)}") + return + name = chan["name"] + chan.remove() + await bot.reply(message, f"Removed channel: {name}") + + +async def _sub_rename(bot, message, args: list[str]) -> None: + if len(args) < 2: + await bot.reply(message, "Usage: !mu rename ") + return + chan = _find_channel(bot, args[0]) + if chan is None: + await bot.reply(message, f"Channel not found: {args[0]}") + return + old = chan["name"] + chan.rename_channel(args[1]) + await bot.reply(message, f"Renamed {old} to {args[1]}") + + +async def _sub_desc(bot, message, args: list[str]) -> None: + if len(args) < 2: + await bot.reply(message, "Usage: !mu desc ") + return + chan = _find_channel(bot, args[0]) + if chan is None: + await bot.reply(message, f"Channel not found: {args[0]}") + return + text = " ".join(args[1:]) + chan.set_channel_description(text) + await bot.reply(message, f"Set description for {chan['name']}") + + +# -- Dispatch table ---------------------------------------------------------- + + +_SUBS: dict[str, object] = { + "kick": _sub_kick, + "ban": _sub_ban, + "mute": _sub_mute, + "unmute": _sub_unmute, + "deafen": _sub_deafen, + "undeafen": _sub_undeafen, + "move": _sub_move, + "users": _sub_users, + "channels": _sub_channels, + "mkchan": _sub_mkchan, + "rmchan": _sub_rmchan, + "rename": _sub_rename, + "desc": _sub_desc, +} + +_USAGE = ( + "Usage: !mu [args]\n" + "Actions: kick, ban, mute, unmute, deafen, undeafen, move, " + "users, channels, mkchan, rmchan, rename, desc" +) + + +@command("mu", help="Mumble admin: !mu [args]", tier="admin") +async def cmd_mu(bot, message): + """Mumble server administration commands.""" + parts = message.text.split() + if len(parts) < 2: + await bot.reply(message, _USAGE) + return + sub = parts[1].lower() + handler = _SUBS.get(sub) + if handler is None: + await bot.reply(message, _USAGE) + return + await handler(bot, message, parts[2:]) diff --git a/tests/test_mumble_admin.py b/tests/test_mumble_admin.py new file mode 100644 index 0000000..6340641 --- /dev/null +++ b/tests/test_mumble_admin.py @@ -0,0 +1,498 @@ +"""Tests for the mumble_admin plugin.""" + +import asyncio +import importlib.util +from dataclasses import dataclass, field +from unittest.mock import MagicMock + +# -- Load plugin module directly --------------------------------------------- + +_spec = importlib.util.spec_from_file_location( + "mumble_admin", "plugins/mumble_admin.py", +) +_mod = importlib.util.module_from_spec(_spec) +_spec.loader.exec_module(_mod) + +cmd_mu = _mod.cmd_mu +_find_user = _mod._find_user +_find_channel = _mod._find_channel +_channel_name = _mod._channel_name + + +# -- Fakes ------------------------------------------------------------------- + + +@dataclass +class _FakeMessage: + text: str = "" + nick: str = "admin" + prefix: str = "admin" + target: str = "0" + is_channel: bool = True + params: list[str] = field(default_factory=list) + + +class _FakeRegistry: + _bots: dict = field(default_factory=dict) + + def __init__(self): + self._bots = {} + + +class _FakeBot: + def __init__(self, users=None, channels=None): + self.registry = _FakeRegistry() + self._mumble = MagicMock() + if users is not None: + self._mumble.users = users + else: + self._mumble.users = {} + if channels is not None: + self._mumble.channels = channels + self._replies: list[str] = [] + + async def reply(self, message, text): + self._replies.append(text) + + async def send(self, target, text): + self._replies.append(text) + + +def _make_user(name, channel_id=0, mute=False, deaf=False, + self_mute=False, self_deaf=False): + """Create a fake pymumble user (dict with methods).""" + u = MagicMock() + u.__getitem__ = lambda s, k: { + "name": name, + "channel_id": channel_id, + "mute": mute, + "deaf": deaf, + "self_mute": self_mute, + "self_deaf": self_deaf, + }[k] + u.get = lambda k, d=None: { + "name": name, + "channel_id": channel_id, + "mute": mute, + "deaf": deaf, + "self_mute": self_mute, + "self_deaf": self_deaf, + }.get(k, d) + return u + + +def _make_channel(name, channel_id=0, parent=0): + """Create a fake pymumble channel (dict with methods).""" + c = MagicMock() + c.__getitem__ = lambda s, k: { + "name": name, + "channel_id": channel_id, + "parent": parent, + }[k] + c.get = lambda k, d=None: { + "name": name, + "channel_id": channel_id, + "parent": parent, + }.get(k, d) + return c + + +# -- TestFindUser ------------------------------------------------------------ + + +class TestFindUser: + def test_case_insensitive(self): + alice = _make_user("Alice") + bot = _FakeBot(users={1: alice}) + assert _find_user(bot, "alice") is alice + assert _find_user(bot, "ALICE") is alice + assert _find_user(bot, "Alice") is alice + + def test_not_found(self): + bot = _FakeBot(users={1: _make_user("Alice")}) + assert _find_user(bot, "Bob") is None + + def test_no_mumble(self): + bot = _FakeBot() + bot._mumble = None + assert _find_user(bot, "anyone") is None + + +# -- TestFindChannel --------------------------------------------------------- + + +class TestFindChannel: + def test_case_insensitive(self): + lobby = _make_channel("Lobby", channel_id=0) + bot = _FakeBot(channels={0: lobby}) + assert _find_channel(bot, "lobby") is lobby + assert _find_channel(bot, "LOBBY") is lobby + + def test_not_found(self): + bot = _FakeBot(channels={0: _make_channel("Lobby")}) + assert _find_channel(bot, "AFK") is None + + def test_no_mumble(self): + bot = _FakeBot() + bot._mumble = None + assert _find_channel(bot, "any") is None + + +# -- TestChannelName --------------------------------------------------------- + + +class TestChannelName: + def test_resolves(self): + lobby = _make_channel("Lobby", channel_id=0) + bot = _FakeBot(channels={0: lobby}) + assert _channel_name(bot, 0) == "Lobby" + + def test_missing_returns_id(self): + bot = _FakeBot(channels={}) + assert _channel_name(bot, 42) == "42" + + def test_no_mumble(self): + bot = _FakeBot() + bot._mumble = None + assert _channel_name(bot, 5) == "5" + + +# -- TestDispatch ------------------------------------------------------------ + + +class TestDispatch: + def test_no_args_shows_usage(self): + bot = _FakeBot() + msg = _FakeMessage(text="!mu") + asyncio.run(cmd_mu(bot, msg)) + assert len(bot._replies) == 1 + assert "Usage" in bot._replies[0] + + def test_unknown_sub_shows_usage(self): + bot = _FakeBot() + msg = _FakeMessage(text="!mu bogus") + asyncio.run(cmd_mu(bot, msg)) + assert "Usage" in bot._replies[0] + + def test_valid_sub_routes(self): + alice = _make_user("Alice") + bot = _FakeBot(users={1: alice}) + msg = _FakeMessage(text="!mu kick Alice") + asyncio.run(cmd_mu(bot, msg)) + alice.kick.assert_called_once_with("") + assert "Kicked" in bot._replies[0] + + +# -- TestKick ---------------------------------------------------------------- + + +class TestKick: + def test_kick_user(self): + alice = _make_user("Alice") + bot = _FakeBot(users={1: alice}) + msg = _FakeMessage(text="!mu kick Alice") + asyncio.run(cmd_mu(bot, msg)) + alice.kick.assert_called_once_with("") + assert "Kicked Alice" in bot._replies[0] + + def test_kick_with_reason(self): + alice = _make_user("Alice") + bot = _FakeBot(users={1: alice}) + msg = _FakeMessage(text="!mu kick Alice being rude") + asyncio.run(cmd_mu(bot, msg)) + alice.kick.assert_called_once_with("being rude") + + def test_kick_user_not_found(self): + bot = _FakeBot(users={}) + msg = _FakeMessage(text="!mu kick Ghost") + asyncio.run(cmd_mu(bot, msg)) + assert "not found" in bot._replies[0].lower() + + def test_kick_no_args(self): + bot = _FakeBot() + msg = _FakeMessage(text="!mu kick") + asyncio.run(cmd_mu(bot, msg)) + assert "Usage" in bot._replies[0] + + +# -- TestBan ----------------------------------------------------------------- + + +class TestBan: + def test_ban_user(self): + bob = _make_user("Bob") + bot = _FakeBot(users={1: bob}) + msg = _FakeMessage(text="!mu ban Bob") + asyncio.run(cmd_mu(bot, msg)) + bob.ban.assert_called_once_with("") + assert "Banned Bob" in bot._replies[0] + + def test_ban_with_reason(self): + bob = _make_user("Bob") + bot = _FakeBot(users={1: bob}) + msg = _FakeMessage(text="!mu ban Bob spamming") + asyncio.run(cmd_mu(bot, msg)) + bob.ban.assert_called_once_with("spamming") + + def test_ban_user_not_found(self): + bot = _FakeBot(users={}) + msg = _FakeMessage(text="!mu ban Ghost") + asyncio.run(cmd_mu(bot, msg)) + assert "not found" in bot._replies[0].lower() + + +# -- TestMuteUnmute ---------------------------------------------------------- + + +class TestMuteUnmute: + def test_mute(self): + alice = _make_user("Alice") + bot = _FakeBot(users={1: alice}) + msg = _FakeMessage(text="!mu mute Alice") + asyncio.run(cmd_mu(bot, msg)) + alice.mute.assert_called_once() + assert "Muted" in bot._replies[0] + + def test_unmute(self): + alice = _make_user("Alice", mute=True) + bot = _FakeBot(users={1: alice}) + msg = _FakeMessage(text="!mu unmute Alice") + asyncio.run(cmd_mu(bot, msg)) + alice.unmute.assert_called_once() + assert "Unmuted" in bot._replies[0] + + def test_mute_not_found(self): + bot = _FakeBot(users={}) + msg = _FakeMessage(text="!mu mute Nobody") + asyncio.run(cmd_mu(bot, msg)) + assert "not found" in bot._replies[0].lower() + + def test_unmute_not_found(self): + bot = _FakeBot(users={}) + msg = _FakeMessage(text="!mu unmute Nobody") + asyncio.run(cmd_mu(bot, msg)) + assert "not found" in bot._replies[0].lower() + + +# -- TestDeafenUndeafen ------------------------------------------------------ + + +class TestDeafenUndeafen: + def test_deafen(self): + alice = _make_user("Alice") + bot = _FakeBot(users={1: alice}) + msg = _FakeMessage(text="!mu deafen Alice") + asyncio.run(cmd_mu(bot, msg)) + alice.deafen.assert_called_once() + assert "Deafened" in bot._replies[0] + + def test_undeafen(self): + alice = _make_user("Alice", deaf=True) + bot = _FakeBot(users={1: alice}) + msg = _FakeMessage(text="!mu undeafen Alice") + asyncio.run(cmd_mu(bot, msg)) + alice.undeafen.assert_called_once() + assert "Undeafened" in bot._replies[0] + + def test_deafen_not_found(self): + bot = _FakeBot(users={}) + msg = _FakeMessage(text="!mu deafen Nobody") + asyncio.run(cmd_mu(bot, msg)) + assert "not found" in bot._replies[0].lower() + + def test_undeafen_not_found(self): + bot = _FakeBot(users={}) + msg = _FakeMessage(text="!mu undeafen Nobody") + asyncio.run(cmd_mu(bot, msg)) + assert "not found" in bot._replies[0].lower() + + +# -- TestMove ---------------------------------------------------------------- + + +class TestMove: + def test_move_user(self): + alice = _make_user("Alice", channel_id=0) + afk = _make_channel("AFK", channel_id=5) + bot = _FakeBot(users={1: alice}, channels={0: _make_channel("Root"), 5: afk}) + msg = _FakeMessage(text="!mu move Alice AFK") + asyncio.run(cmd_mu(bot, msg)) + alice.move_in.assert_called_once_with(5) + assert "Moved Alice to AFK" in bot._replies[0] + + def test_move_user_not_found(self): + bot = _FakeBot(users={}, channels={5: _make_channel("AFK", channel_id=5)}) + msg = _FakeMessage(text="!mu move Ghost AFK") + asyncio.run(cmd_mu(bot, msg)) + assert "user not found" in bot._replies[0].lower() + + def test_move_channel_not_found(self): + alice = _make_user("Alice") + bot = _FakeBot(users={1: alice}, channels={}) + msg = _FakeMessage(text="!mu move Alice Nowhere") + asyncio.run(cmd_mu(bot, msg)) + assert "channel not found" in bot._replies[0].lower() + + def test_move_missing_args(self): + bot = _FakeBot() + msg = _FakeMessage(text="!mu move Alice") + asyncio.run(cmd_mu(bot, msg)) + assert "Usage" in bot._replies[0] + + +# -- TestUsers --------------------------------------------------------------- + + +class TestUsers: + def test_list_users(self): + alice = _make_user("Alice", channel_id=0) + bob = _make_user("Bob", channel_id=0, self_mute=True) + lobby = _make_channel("Lobby", channel_id=0) + bot = _FakeBot(users={1: alice, 2: bob}, channels={0: lobby}) + msg = _FakeMessage(text="!mu users") + asyncio.run(cmd_mu(bot, msg)) + reply = bot._replies[0] + assert "2 user(s)" in reply + assert "Alice" in reply + assert "Bob" in reply + assert "muted" in reply + + def test_list_with_bots(self): + alice = _make_user("Alice", channel_id=0) + derp = _make_user("derp", channel_id=0) + lobby = _make_channel("Lobby", channel_id=0) + bot = _FakeBot(users={1: alice, 2: derp}, channels={0: lobby}) + bot.registry._bots = {"derp": MagicMock()} + msg = _FakeMessage(text="!mu users") + asyncio.run(cmd_mu(bot, msg)) + reply = bot._replies[0] + assert "bot" in reply + assert "2 user(s)" in reply + + def test_deaf_flag(self): + alice = _make_user("Alice", channel_id=0, self_deaf=True) + lobby = _make_channel("Lobby", channel_id=0) + bot = _FakeBot(users={1: alice}, channels={0: lobby}) + msg = _FakeMessage(text="!mu users") + asyncio.run(cmd_mu(bot, msg)) + assert "deaf" in bot._replies[0] + + +# -- TestChannels ------------------------------------------------------------ + + +class TestChannels: + def test_list_channels(self): + lobby = _make_channel("Lobby", channel_id=0) + afk = _make_channel("AFK", channel_id=1) + alice = _make_user("Alice", channel_id=0) + bot = _FakeBot(users={1: alice}, channels={0: lobby, 1: afk}) + msg = _FakeMessage(text="!mu channels") + asyncio.run(cmd_mu(bot, msg)) + reply = bot._replies[0] + assert "Lobby (1)" in reply + assert "AFK (0)" in reply + + +# -- TestMkchan -------------------------------------------------------------- + + +class TestMkchan: + def test_create_channel(self): + bot = _FakeBot() + msg = _FakeMessage(text="!mu mkchan Gaming") + asyncio.run(cmd_mu(bot, msg)) + bot._mumble.channels.new_channel.assert_called_once_with( + 0, "Gaming", temporary=False, + ) + assert "Created" in bot._replies[0] + + def test_create_temp_channel(self): + bot = _FakeBot() + msg = _FakeMessage(text="!mu mkchan Gaming temp") + asyncio.run(cmd_mu(bot, msg)) + bot._mumble.channels.new_channel.assert_called_once_with( + 0, "Gaming", temporary=True, + ) + assert "temporary" in bot._replies[0] + + def test_missing_name(self): + bot = _FakeBot() + msg = _FakeMessage(text="!mu mkchan") + asyncio.run(cmd_mu(bot, msg)) + assert "Usage" in bot._replies[0] + + +# -- TestRmchan -------------------------------------------------------------- + + +class TestRmchan: + def test_remove_channel(self): + afk = _make_channel("AFK", channel_id=5) + bot = _FakeBot(channels={5: afk}) + msg = _FakeMessage(text="!mu rmchan AFK") + asyncio.run(cmd_mu(bot, msg)) + afk.remove.assert_called_once() + assert "Removed" in bot._replies[0] + + def test_channel_not_found(self): + bot = _FakeBot(channels={}) + msg = _FakeMessage(text="!mu rmchan Nowhere") + asyncio.run(cmd_mu(bot, msg)) + assert "not found" in bot._replies[0].lower() + + def test_missing_args(self): + bot = _FakeBot() + msg = _FakeMessage(text="!mu rmchan") + asyncio.run(cmd_mu(bot, msg)) + assert "Usage" in bot._replies[0] + + +# -- TestRename -------------------------------------------------------------- + + +class TestRename: + def test_rename_channel(self): + afk = _make_channel("AFK", channel_id=5) + bot = _FakeBot(channels={5: afk}) + msg = _FakeMessage(text="!mu rename AFK Chill") + asyncio.run(cmd_mu(bot, msg)) + afk.rename_channel.assert_called_once_with("Chill") + assert "Renamed" in bot._replies[0] + + def test_channel_not_found(self): + bot = _FakeBot(channels={}) + msg = _FakeMessage(text="!mu rename Nowhere New") + asyncio.run(cmd_mu(bot, msg)) + assert "not found" in bot._replies[0].lower() + + def test_missing_args(self): + bot = _FakeBot() + msg = _FakeMessage(text="!mu rename AFK") + asyncio.run(cmd_mu(bot, msg)) + assert "Usage" in bot._replies[0] + + +# -- TestDesc ---------------------------------------------------------------- + + +class TestDesc: + def test_set_description(self): + afk = _make_channel("AFK", channel_id=5) + bot = _FakeBot(channels={5: afk}) + msg = _FakeMessage(text="!mu desc AFK Away from keyboard") + asyncio.run(cmd_mu(bot, msg)) + afk.set_channel_description.assert_called_once_with("Away from keyboard") + assert "description" in bot._replies[0].lower() + + def test_channel_not_found(self): + bot = _FakeBot(channels={}) + msg = _FakeMessage(text="!mu desc Nowhere some text") + asyncio.run(cmd_mu(bot, msg)) + assert "not found" in bot._replies[0].lower() + + def test_missing_args(self): + bot = _FakeBot() + msg = _FakeMessage(text="!mu desc AFK") + asyncio.run(cmd_mu(bot, msg)) + assert "Usage" in bot._replies[0]