diff --git a/plugins/core.py b/plugins/core.py index 318f04a..c2ca08b 100644 --- a/plugins/core.py +++ b/plugins/core.py @@ -139,34 +139,33 @@ async def cmd_plugins(bot, message): await bot.reply(message, f"Plugins: {', '.join(parts)}") -@command("whoami", help="Show your hostmask and admin status") +@command("whoami", help="Show your hostmask and permission tier") async def cmd_whoami(bot, message): """Display the sender's hostmask and permission level.""" prefix = message.prefix or "unknown" - is_admin = bot._is_admin(message) - is_oper = message.prefix in bot._opers if message.prefix else False - tags = [] - if is_admin: - tags.append("admin") - else: - tags.append("user") - if is_oper: + tier = bot._get_tier(message) + tags = [tier] + if message.prefix and message.prefix in bot._opers: tags.append("IRCOP") await bot.reply(message, f"{prefix} [{', '.join(tags)}]") -@command("admins", help="Show configured admin patterns and detected opers", admin=True) +@command("admins", help="Show configured permission tiers and detected opers", admin=True) async def cmd_admins(bot, message): - """Display admin hostmask patterns and known IRC operators.""" + """Display configured permission tiers and known IRC operators.""" parts = [] if bot._admins: - parts.append(f"Patterns: {', '.join(bot._admins)}") + parts.append(f"Admin: {', '.join(bot._admins)}") else: - parts.append("Patterns: (none)") + parts.append("Admin: (none)") + if bot._operators: + parts.append(f"Oper: {', '.join(bot._operators)}") + if bot._trusted: + parts.append(f"Trusted: {', '.join(bot._trusted)}") if bot._opers: - parts.append(f"Opers: {', '.join(sorted(bot._opers))}") + parts.append(f"IRCOPs: {', '.join(sorted(bot._opers))}") else: - parts.append("Opers: (none)") + parts.append("IRCOPs: (none)") await bot.reply(message, " | ".join(parts)) diff --git a/src/derp/bot.py b/src/derp/bot.py index 8f1a139..fcd1d61 100644 --- a/src/derp/bot.py +++ b/src/derp/bot.py @@ -13,7 +13,7 @@ from pathlib import Path from derp import __version__ from derp.irc import _MAX_IRC_LINE, IRCConnection, Message, format_msg, parse -from derp.plugin import Handler, PluginRegistry +from derp.plugin import TIERS, Handler, PluginRegistry from derp.state import StateStore log = logging.getLogger(__name__) @@ -93,6 +93,8 @@ class Bot: self._tasks: set[asyncio.Task] = set() self._reconnect_delay: float = 5.0 self._admins: list[str] = config.get("bot", {}).get("admins", []) + self._operators: list[str] = config.get("bot", {}).get("operators", []) + self._trusted: list[str] = config.get("bot", {}).get("trusted", []) self._opers: set[str] = set() # hostmasks of known IRC operators self._caps: set[str] = set() # negotiated IRCv3 caps self._who_pending: dict[str, asyncio.Task] = {} # debounced WHO per channel @@ -359,20 +361,33 @@ class Bot: return True return plugin_name in allowed + def _get_tier(self, msg: Message) -> str: + """Determine the permission tier of the message sender. + + Checks (in priority order): IRC operator, admin pattern, + operator pattern, trusted pattern. Falls back to ``"user"``. + """ + if not msg.prefix: + return "user" + if msg.prefix in self._opers: + return "admin" + for pattern in self._admins: + if fnmatch.fnmatch(msg.prefix, pattern): + return "admin" + for pattern in self._operators: + if fnmatch.fnmatch(msg.prefix, pattern): + return "oper" + for pattern in self._trusted: + if fnmatch.fnmatch(msg.prefix, pattern): + return "trusted" + return "user" + def _is_admin(self, msg: Message) -> bool: """Check if the message sender is a bot admin. - Returns True if the sender is a known IRC operator or matches - a configured hostmask pattern (fnmatch-style). + Thin wrapper around ``_get_tier`` for backward compatibility. """ - if not msg.prefix: - return False - if msg.prefix in self._opers: - return True - for pattern in self._admins: - if fnmatch.fnmatch(msg.prefix, pattern): - return True - return False + return self._get_tier(msg) == "admin" def _dispatch_command(self, msg: Message) -> None: """Check if a PRIVMSG is a bot command and spawn it.""" @@ -396,10 +411,13 @@ class Bot: if not self._plugin_allowed(handler.plugin, channel): return - if handler.admin and not self._is_admin(msg): - deny = f"Permission denied: {self.prefix}{cmd_name} requires admin" - self._spawn(self.reply(msg, deny), name=f"cmd:{cmd_name}:denied") - return + required = handler.tier + if required != "user": + sender = self._get_tier(msg) + if TIERS.index(sender) < TIERS.index(required): + deny = f"Permission denied: {self.prefix}{cmd_name} requires {required}" + self._spawn(self.reply(msg, deny), name=f"cmd:{cmd_name}:denied") + return self._spawn(self._run_command(handler, cmd_name, msg), name=f"cmd:{cmd_name}") diff --git a/src/derp/config.py b/src/derp/config.py index 565c908..120a604 100644 --- a/src/derp/config.py +++ b/src/derp/config.py @@ -29,8 +29,16 @@ DEFAULTS: dict = { "rate_burst": 5, "paste_threshold": 4, "admins": [], + "operators": [], + "trusted": [], }, "channels": {}, + "webhook": { + "enabled": False, + "host": "127.0.0.1", + "port": 8080, + "secret": "", + }, "logging": { "level": "info", "format": "text", diff --git a/src/derp/plugin.py b/src/derp/plugin.py index 6b20222..d602004 100644 --- a/src/derp/plugin.py +++ b/src/derp/plugin.py @@ -12,6 +12,8 @@ from typing import Any, Callable log = logging.getLogger(__name__) +TIERS: tuple[str, ...] = ("user", "trusted", "oper", "admin") + @dataclass(slots=True) class Handler: @@ -22,9 +24,10 @@ class Handler: help: str = "" plugin: str = "" admin: bool = False + tier: str = "user" -def command(name: str, help: str = "", admin: bool = False) -> Callable: +def command(name: str, help: str = "", admin: bool = False, tier: str = "") -> Callable: """Decorator to register an async function as a bot command. Usage:: @@ -36,12 +39,17 @@ def command(name: str, help: str = "", admin: bool = False) -> Callable: @command("reload", help="Reload a plugin", admin=True) async def cmd_reload(bot, message): ... + + @command("trusted_cmd", help="Trusted-only", tier="trusted") + async def cmd_trusted(bot, message): + ... """ def decorator(func: Callable) -> Callable: func._derp_command = name # type: ignore[attr-defined] func._derp_help = help # type: ignore[attr-defined] func._derp_admin = admin # type: ignore[attr-defined] + func._derp_tier = tier if tier else ("admin" if admin else "user") # type: ignore[attr-defined] return func return decorator @@ -74,12 +82,14 @@ class PluginRegistry: self._paths: dict[str, Path] = {} def register_command(self, name: str, callback: Callable, help: str = "", - plugin: str = "", admin: bool = False) -> None: + plugin: str = "", admin: bool = False, + tier: str = "user") -> None: """Register a command handler.""" if name in self.commands: log.warning("command '%s' already registered, overwriting", name) self.commands[name] = Handler( - name=name, callback=callback, help=help, plugin=plugin, admin=admin, + name=name, callback=callback, help=help, plugin=plugin, + admin=admin, tier=tier, ) log.debug("registered command: %s (%s)", name, plugin) @@ -102,6 +112,7 @@ class PluginRegistry: help=getattr(obj, "_derp_help", ""), plugin=plugin_name, admin=getattr(obj, "_derp_admin", False), + tier=getattr(obj, "_derp_tier", "user"), ) count += 1 if hasattr(obj, "_derp_event"): diff --git a/tests/test_acl.py b/tests/test_acl.py new file mode 100644 index 0000000..844a4f0 --- /dev/null +++ b/tests/test_acl.py @@ -0,0 +1,409 @@ +"""Tests for the granular ACL tier system.""" + +import asyncio +from pathlib import Path + +from derp.bot import Bot +from derp.config import DEFAULTS +from derp.irc import Message, parse +from derp.plugin import TIERS, PluginRegistry, command + +# -- Helpers ----------------------------------------------------------------- + + +class _MockConnection: + """Minimal mock IRC connection for dispatch tests.""" + + def __init__(self) -> None: + self._queue: asyncio.Queue[str | None] = asyncio.Queue() + self.sent: list[str] = [] + + async def connect(self) -> None: + pass + + async def close(self) -> None: + pass + + async def send(self, line: str) -> None: + self.sent.append(line) + + async def readline(self) -> str | None: + return await self._queue.get() + + def inject(self, line: str) -> None: + self._queue.put_nowait(line) + + def disconnect(self) -> None: + self._queue.put_nowait(None) + + +class _Harness: + """Test fixture: Bot with mock connection and configurable tiers.""" + + def __init__( + self, + admins: list[str] | None = None, + operators: list[str] | None = None, + trusted: list[str] | None = None, + ) -> None: + config: dict = { + "server": { + "host": "localhost", "port": 6667, "tls": False, + "nick": "test", "user": "test", "realname": "test bot", + }, + "bot": { + "prefix": "!", + "channels": [], + "plugins_dir": "plugins", + "admins": admins or [], + "operators": operators or [], + "trusted": trusted or [], + "rate_limit": 100.0, + "rate_burst": 100, + }, + } + self.registry = PluginRegistry() + self.bot = Bot(config, self.registry) + self.conn = _MockConnection() + self.bot.conn = self.conn # type: ignore[assignment] + self.registry.load_plugin(Path("plugins/core.py")) + + def inject_registration(self) -> None: + self.conn.inject(":server CAP * LS :") + self.conn.inject(":server 001 test :Welcome") + + def privmsg(self, nick: str, target: str, text: str, + user: str = "user", host: str = "host") -> None: + self.conn.inject(f":{nick}!{user}@{host} PRIVMSG {target} :{text}") + + async def run(self) -> None: + self.conn.disconnect() + self.bot._running = True + await self.bot._connect_and_run() + if self.bot._tasks: + await asyncio.gather(*list(self.bot._tasks), return_exceptions=True) + + def sent_privmsgs(self, target: str) -> list[str]: + results = [] + for line in self.conn.sent: + msg = parse(line) + if msg.command == "PRIVMSG" and msg.target == target: + results.append(msg.text) + return results + + +def _msg(text: str, prefix: str = "nick!user@host") -> Message: + nick = prefix.split("!")[0] if "!" in prefix else prefix + return Message( + raw="", prefix=prefix, nick=nick, + command="PRIVMSG", params=["#test", text], tags={}, + ) + + +# --------------------------------------------------------------------------- +# TestTierConstants +# --------------------------------------------------------------------------- + + +class TestTierConstants: + def test_tier_order(self): + assert TIERS == ("user", "trusted", "oper", "admin") + + def test_index_comparison(self): + assert TIERS.index("user") < TIERS.index("trusted") + assert TIERS.index("trusted") < TIERS.index("oper") + assert TIERS.index("oper") < TIERS.index("admin") + + +# --------------------------------------------------------------------------- +# TestGetTier +# --------------------------------------------------------------------------- + + +class TestGetTier: + def test_no_prefix(self): + h = _Harness() + msg = Message(raw="", prefix="", nick="", command="PRIVMSG", + params=["#test", "hi"], tags={}) + assert h.bot._get_tier(msg) == "user" + + def test_ircop(self): + h = _Harness() + h.bot._opers.add("op!root@server") + msg = _msg("test", prefix="op!root@server") + assert h.bot._get_tier(msg) == "admin" + + def test_admin_pattern(self): + h = _Harness(admins=["*!*@admin.host"]) + msg = _msg("test", prefix="alice!user@admin.host") + assert h.bot._get_tier(msg) == "admin" + + def test_oper_pattern(self): + h = _Harness(operators=["*!*@oper.host"]) + msg = _msg("test", prefix="bob!user@oper.host") + assert h.bot._get_tier(msg) == "oper" + + def test_trusted_pattern(self): + h = _Harness(trusted=["*!*@trusted.host"]) + msg = _msg("test", prefix="carol!user@trusted.host") + assert h.bot._get_tier(msg) == "trusted" + + def test_no_match(self): + h = _Harness(admins=["*!*@admin.host"]) + msg = _msg("test", prefix="nobody!user@random.host") + assert h.bot._get_tier(msg) == "user" + + def test_priority_admin_over_oper(self): + """Admin patterns take priority over operator patterns.""" + h = _Harness(admins=["*!*@dual.host"], operators=["*!*@dual.host"]) + msg = _msg("test", prefix="x!y@dual.host") + assert h.bot._get_tier(msg) == "admin" + + def test_priority_oper_over_trusted(self): + """Operator patterns take priority over trusted patterns.""" + h = _Harness(operators=["*!*@dual.host"], trusted=["*!*@dual.host"]) + msg = _msg("test", prefix="x!y@dual.host") + assert h.bot._get_tier(msg) == "oper" + + def test_ircop_over_admin_pattern(self): + """IRC operator status takes priority over admin hostmask pattern.""" + h = _Harness(admins=["*!*@server"]) + h.bot._opers.add("op!root@server") + msg = _msg("test", prefix="op!root@server") + # Both match, but ircop check comes first + assert h.bot._get_tier(msg) == "admin" + + +# --------------------------------------------------------------------------- +# TestIsAdminBackcompat +# --------------------------------------------------------------------------- + + +class TestIsAdminBackcompat: + def test_admin_true(self): + h = _Harness(admins=["*!*@admin.host"]) + msg = _msg("test", prefix="a!b@admin.host") + assert h.bot._is_admin(msg) is True + + def test_oper_false(self): + h = _Harness(operators=["*!*@oper.host"]) + msg = _msg("test", prefix="a!b@oper.host") + assert h.bot._is_admin(msg) is False + + def test_trusted_false(self): + h = _Harness(trusted=["*!*@trusted.host"]) + msg = _msg("test", prefix="a!b@trusted.host") + assert h.bot._is_admin(msg) is False + + +# --------------------------------------------------------------------------- +# TestCommandDecorator +# --------------------------------------------------------------------------- + + +class TestCommandDecorator: + def test_admin_true_sets_tier(self): + @command("x", admin=True) + async def handler(bot, msg): + pass + assert handler._derp_tier == "admin" + + def test_explicit_tier(self): + @command("x", tier="trusted") + async def handler(bot, msg): + pass + assert handler._derp_tier == "trusted" + + def test_tier_overrides_admin(self): + @command("x", admin=True, tier="oper") + async def handler(bot, msg): + pass + assert handler._derp_tier == "oper" + + def test_default_tier(self): + @command("x") + async def handler(bot, msg): + pass + assert handler._derp_tier == "user" + + +# --------------------------------------------------------------------------- +# TestDispatchWithTiers +# --------------------------------------------------------------------------- + + +class TestDispatchWithTiers: + def test_trusted_allowed_for_trusted_cmd(self): + """Trusted user can run a trusted-tier command.""" + h = _Harness(trusted=["*!user@host"]) + + @command("tcmd", tier="trusted") + async def cmd_tcmd(bot, message): + await bot.reply(message, "ok") + + h.registry.register_command( + "tcmd", cmd_tcmd, tier="trusted", plugin="test", + ) + h.inject_registration() + h.privmsg("nick", "#test", "!tcmd") + asyncio.run(h.run()) + msgs = h.sent_privmsgs("#test") + assert any("ok" in m for m in msgs) + + def test_admin_allowed_for_trusted_cmd(self): + """Admin can run trusted-tier commands (higher tier).""" + h = _Harness(admins=["*!user@host"]) + + @command("tcmd", tier="trusted") + async def cmd_tcmd(bot, message): + await bot.reply(message, "ok") + + h.registry.register_command( + "tcmd", cmd_tcmd, tier="trusted", plugin="test", + ) + h.inject_registration() + h.privmsg("nick", "#test", "!tcmd") + asyncio.run(h.run()) + msgs = h.sent_privmsgs("#test") + assert any("ok" in m for m in msgs) + + def test_user_denied_for_trusted_cmd(self): + """Regular user is denied a trusted-tier command.""" + h = _Harness() + + @command("tcmd", tier="trusted") + async def cmd_tcmd(bot, message): + await bot.reply(message, "ok") + + h.registry.register_command( + "tcmd", cmd_tcmd, tier="trusted", plugin="test", + ) + h.inject_registration() + h.privmsg("nick", "#test", "!tcmd") + asyncio.run(h.run()) + msgs = h.sent_privmsgs("#test") + assert any("Permission denied" in m for m in msgs) + assert any("requires trusted" in m for m in msgs) + + def test_backward_compat_admin_flag(self): + """admin=True commands still work via tier='admin'.""" + h = _Harness(admins=["*!user@host"]) + h.inject_registration() + # cmd_admins is already registered via core plugin with admin=True + h.privmsg("nick", "#test", "!admins") + asyncio.run(h.run()) + msgs = h.sent_privmsgs("#test") + assert any("Admin:" in m for m in msgs) + + def test_denial_message_shows_tier(self): + """Permission denial message includes the required tier name.""" + h = _Harness() + + @command("opcmd", tier="oper") + async def cmd_opcmd(bot, message): + await bot.reply(message, "ok") + + h.registry.register_command( + "opcmd", cmd_opcmd, tier="oper", plugin="test", + ) + h.inject_registration() + h.privmsg("nick", "#test", "!opcmd") + asyncio.run(h.run()) + msgs = h.sent_privmsgs("#test") + assert any("requires oper" in m for m in msgs) + + +# --------------------------------------------------------------------------- +# TestConfigDefaults +# --------------------------------------------------------------------------- + + +class TestConfigDefaults: + def test_operators_in_defaults(self): + assert "operators" in DEFAULTS["bot"] + assert DEFAULTS["bot"]["operators"] == [] + + def test_trusted_in_defaults(self): + assert "trusted" in DEFAULTS["bot"] + assert DEFAULTS["bot"]["trusted"] == [] + + def test_webhook_in_defaults(self): + assert "webhook" in DEFAULTS + assert DEFAULTS["webhook"]["enabled"] is False + + +# --------------------------------------------------------------------------- +# TestWhoami +# --------------------------------------------------------------------------- + + +class TestWhoami: + def test_shows_admin(self): + h = _Harness(admins=["*!user@host"]) + h.inject_registration() + h.privmsg("nick", "#test", "!whoami") + asyncio.run(h.run()) + msgs = h.sent_privmsgs("#test") + assert any("admin" in m for m in msgs) + + def test_shows_trusted(self): + h = _Harness(trusted=["*!user@host"]) + h.inject_registration() + h.privmsg("nick", "#test", "!whoami") + asyncio.run(h.run()) + msgs = h.sent_privmsgs("#test") + assert any("trusted" in m for m in msgs) + + def test_shows_user(self): + h = _Harness() + h.inject_registration() + h.privmsg("nick", "#test", "!whoami") + asyncio.run(h.run()) + msgs = h.sent_privmsgs("#test") + assert any("user" in m for m in msgs) + + def test_shows_ircop_tag(self): + h = _Harness() + h.bot._opers.add("nick!user@host") + h.inject_registration() + h.privmsg("nick", "#test", "!whoami") + asyncio.run(h.run()) + msgs = h.sent_privmsgs("#test") + assert any("IRCOP" in m for m in msgs) + + +# --------------------------------------------------------------------------- +# TestAdmins +# --------------------------------------------------------------------------- + + +class TestAdmins: + def test_shows_all_tiers(self): + h = _Harness( + admins=["*!*@admin.host"], + operators=["*!*@oper.host"], + trusted=["*!*@trusted.host"], + ) + h.bot._opers.add("nick!user@host") + h.inject_registration() + # Must be admin to run !admins + h.privmsg("nick", "#test", "!admins", user="x", host="admin.host") + asyncio.run(h.run()) + msgs = h.sent_privmsgs("#test") + combined = " ".join(msgs) + assert "Admin:" in combined + assert "Oper:" in combined + assert "Trusted:" in combined + assert "IRCOPs:" in combined + + def test_omits_empty_tiers(self): + h = _Harness(admins=["*!user@host"]) + h.inject_registration() + h.privmsg("nick", "#test", "!admins") + asyncio.run(h.run()) + msgs = h.sent_privmsgs("#test") + combined = " ".join(msgs) + assert "Admin:" in combined + # No operators or trusted configured, so those sections shouldn't appear + assert "Oper:" not in combined + assert "Trusted:" not in combined diff --git a/tests/test_integration.py b/tests/test_integration.py index 05c4682..76dc287 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -274,7 +274,7 @@ class TestAdmin: replies = h.sent_privmsgs("#test") assert not any("Permission denied" in r for r in replies) - assert any("Patterns:" in r for r in replies) + assert any("Admin:" in r for r in replies) def test_oper_detection(self): """IRC operator detected via WHO reply can use admin commands.""" @@ -290,7 +290,7 @@ class TestAdmin: replies = h.sent_privmsgs("#test") assert not any("Permission denied" in r for r in replies) - assert any("Opers:" in r for r in replies) + assert any("IRCOPs:" in r for r in replies) def test_oper_detection_on_join(self): """User joining a channel triggers debounced WHO for oper detection."""