feat: add granular ACL tiers (trusted/oper/admin)

4-tier permission model: user < trusted < oper < admin.
Commands specify a required tier via tier= parameter.
Backward compatible: admin=True maps to tier="admin".

- TIERS constant and Handler.tier field in plugin.py
- _get_tier() method in bot.py with pattern matching
- _is_admin() preserved as thin wrapper
- operators/trusted config lists in config.py
- whoami shows tier, admins shows all configured tiers
- 32 test cases in test_acl.py
This commit is contained in:
user
2026-02-21 17:59:05 +01:00
parent 5bc59730c4
commit 2514aa777d
6 changed files with 480 additions and 35 deletions

View File

@@ -139,34 +139,33 @@ async def cmd_plugins(bot, message):
await bot.reply(message, f"Plugins: {', '.join(parts)}") await bot.reply(message, f"Plugins: {', '.join(parts)}")
@command("whoami", help="Show your hostmask and admin status") @command("whoami", help="Show your hostmask and permission tier")
async def cmd_whoami(bot, message): async def cmd_whoami(bot, message):
"""Display the sender's hostmask and permission level.""" """Display the sender's hostmask and permission level."""
prefix = message.prefix or "unknown" prefix = message.prefix or "unknown"
is_admin = bot._is_admin(message) tier = bot._get_tier(message)
is_oper = message.prefix in bot._opers if message.prefix else False tags = [tier]
tags = [] if message.prefix and message.prefix in bot._opers:
if is_admin:
tags.append("admin")
else:
tags.append("user")
if is_oper:
tags.append("IRCOP") tags.append("IRCOP")
await bot.reply(message, f"{prefix} [{', '.join(tags)}]") await bot.reply(message, f"{prefix} [{', '.join(tags)}]")
@command("admins", help="Show configured admin patterns and detected opers", admin=True) @command("admins", help="Show configured permission tiers and detected opers", admin=True)
async def cmd_admins(bot, message): async def cmd_admins(bot, message):
"""Display admin hostmask patterns and known IRC operators.""" """Display configured permission tiers and known IRC operators."""
parts = [] parts = []
if bot._admins: if bot._admins:
parts.append(f"Patterns: {', '.join(bot._admins)}") parts.append(f"Admin: {', '.join(bot._admins)}")
else: else:
parts.append("Patterns: (none)") parts.append("Admin: (none)")
if bot._operators:
parts.append(f"Oper: {', '.join(bot._operators)}")
if bot._trusted:
parts.append(f"Trusted: {', '.join(bot._trusted)}")
if bot._opers: if bot._opers:
parts.append(f"Opers: {', '.join(sorted(bot._opers))}") parts.append(f"IRCOPs: {', '.join(sorted(bot._opers))}")
else: else:
parts.append("Opers: (none)") parts.append("IRCOPs: (none)")
await bot.reply(message, " | ".join(parts)) await bot.reply(message, " | ".join(parts))

View File

@@ -13,7 +13,7 @@ from pathlib import Path
from derp import __version__ from derp import __version__
from derp.irc import _MAX_IRC_LINE, IRCConnection, Message, format_msg, parse from derp.irc import _MAX_IRC_LINE, IRCConnection, Message, format_msg, parse
from derp.plugin import Handler, PluginRegistry from derp.plugin import TIERS, Handler, PluginRegistry
from derp.state import StateStore from derp.state import StateStore
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -93,6 +93,8 @@ class Bot:
self._tasks: set[asyncio.Task] = set() self._tasks: set[asyncio.Task] = set()
self._reconnect_delay: float = 5.0 self._reconnect_delay: float = 5.0
self._admins: list[str] = config.get("bot", {}).get("admins", []) self._admins: list[str] = config.get("bot", {}).get("admins", [])
self._operators: list[str] = config.get("bot", {}).get("operators", [])
self._trusted: list[str] = config.get("bot", {}).get("trusted", [])
self._opers: set[str] = set() # hostmasks of known IRC operators self._opers: set[str] = set() # hostmasks of known IRC operators
self._caps: set[str] = set() # negotiated IRCv3 caps self._caps: set[str] = set() # negotiated IRCv3 caps
self._who_pending: dict[str, asyncio.Task] = {} # debounced WHO per channel self._who_pending: dict[str, asyncio.Task] = {} # debounced WHO per channel
@@ -359,20 +361,33 @@ class Bot:
return True return True
return plugin_name in allowed return plugin_name in allowed
def _get_tier(self, msg: Message) -> str:
"""Determine the permission tier of the message sender.
Checks (in priority order): IRC operator, admin pattern,
operator pattern, trusted pattern. Falls back to ``"user"``.
"""
if not msg.prefix:
return "user"
if msg.prefix in self._opers:
return "admin"
for pattern in self._admins:
if fnmatch.fnmatch(msg.prefix, pattern):
return "admin"
for pattern in self._operators:
if fnmatch.fnmatch(msg.prefix, pattern):
return "oper"
for pattern in self._trusted:
if fnmatch.fnmatch(msg.prefix, pattern):
return "trusted"
return "user"
def _is_admin(self, msg: Message) -> bool: def _is_admin(self, msg: Message) -> bool:
"""Check if the message sender is a bot admin. """Check if the message sender is a bot admin.
Returns True if the sender is a known IRC operator or matches Thin wrapper around ``_get_tier`` for backward compatibility.
a configured hostmask pattern (fnmatch-style).
""" """
if not msg.prefix: return self._get_tier(msg) == "admin"
return False
if msg.prefix in self._opers:
return True
for pattern in self._admins:
if fnmatch.fnmatch(msg.prefix, pattern):
return True
return False
def _dispatch_command(self, msg: Message) -> None: def _dispatch_command(self, msg: Message) -> None:
"""Check if a PRIVMSG is a bot command and spawn it.""" """Check if a PRIVMSG is a bot command and spawn it."""
@@ -396,10 +411,13 @@ class Bot:
if not self._plugin_allowed(handler.plugin, channel): if not self._plugin_allowed(handler.plugin, channel):
return return
if handler.admin and not self._is_admin(msg): required = handler.tier
deny = f"Permission denied: {self.prefix}{cmd_name} requires admin" if required != "user":
self._spawn(self.reply(msg, deny), name=f"cmd:{cmd_name}:denied") sender = self._get_tier(msg)
return if TIERS.index(sender) < TIERS.index(required):
deny = f"Permission denied: {self.prefix}{cmd_name} requires {required}"
self._spawn(self.reply(msg, deny), name=f"cmd:{cmd_name}:denied")
return
self._spawn(self._run_command(handler, cmd_name, msg), name=f"cmd:{cmd_name}") self._spawn(self._run_command(handler, cmd_name, msg), name=f"cmd:{cmd_name}")

View File

@@ -29,8 +29,16 @@ DEFAULTS: dict = {
"rate_burst": 5, "rate_burst": 5,
"paste_threshold": 4, "paste_threshold": 4,
"admins": [], "admins": [],
"operators": [],
"trusted": [],
}, },
"channels": {}, "channels": {},
"webhook": {
"enabled": False,
"host": "127.0.0.1",
"port": 8080,
"secret": "",
},
"logging": { "logging": {
"level": "info", "level": "info",
"format": "text", "format": "text",

View File

@@ -12,6 +12,8 @@ from typing import Any, Callable
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
TIERS: tuple[str, ...] = ("user", "trusted", "oper", "admin")
@dataclass(slots=True) @dataclass(slots=True)
class Handler: class Handler:
@@ -22,9 +24,10 @@ class Handler:
help: str = "" help: str = ""
plugin: str = "" plugin: str = ""
admin: bool = False admin: bool = False
tier: str = "user"
def command(name: str, help: str = "", admin: bool = False) -> Callable: def command(name: str, help: str = "", admin: bool = False, tier: str = "") -> Callable:
"""Decorator to register an async function as a bot command. """Decorator to register an async function as a bot command.
Usage:: Usage::
@@ -36,12 +39,17 @@ def command(name: str, help: str = "", admin: bool = False) -> Callable:
@command("reload", help="Reload a plugin", admin=True) @command("reload", help="Reload a plugin", admin=True)
async def cmd_reload(bot, message): async def cmd_reload(bot, message):
... ...
@command("trusted_cmd", help="Trusted-only", tier="trusted")
async def cmd_trusted(bot, message):
...
""" """
def decorator(func: Callable) -> Callable: def decorator(func: Callable) -> Callable:
func._derp_command = name # type: ignore[attr-defined] func._derp_command = name # type: ignore[attr-defined]
func._derp_help = help # type: ignore[attr-defined] func._derp_help = help # type: ignore[attr-defined]
func._derp_admin = admin # type: ignore[attr-defined] func._derp_admin = admin # type: ignore[attr-defined]
func._derp_tier = tier if tier else ("admin" if admin else "user") # type: ignore[attr-defined]
return func return func
return decorator return decorator
@@ -74,12 +82,14 @@ class PluginRegistry:
self._paths: dict[str, Path] = {} self._paths: dict[str, Path] = {}
def register_command(self, name: str, callback: Callable, help: str = "", def register_command(self, name: str, callback: Callable, help: str = "",
plugin: str = "", admin: bool = False) -> None: plugin: str = "", admin: bool = False,
tier: str = "user") -> None:
"""Register a command handler.""" """Register a command handler."""
if name in self.commands: if name in self.commands:
log.warning("command '%s' already registered, overwriting", name) log.warning("command '%s' already registered, overwriting", name)
self.commands[name] = Handler( self.commands[name] = Handler(
name=name, callback=callback, help=help, plugin=plugin, admin=admin, name=name, callback=callback, help=help, plugin=plugin,
admin=admin, tier=tier,
) )
log.debug("registered command: %s (%s)", name, plugin) log.debug("registered command: %s (%s)", name, plugin)
@@ -102,6 +112,7 @@ class PluginRegistry:
help=getattr(obj, "_derp_help", ""), help=getattr(obj, "_derp_help", ""),
plugin=plugin_name, plugin=plugin_name,
admin=getattr(obj, "_derp_admin", False), admin=getattr(obj, "_derp_admin", False),
tier=getattr(obj, "_derp_tier", "user"),
) )
count += 1 count += 1
if hasattr(obj, "_derp_event"): if hasattr(obj, "_derp_event"):

409
tests/test_acl.py Normal file
View File

@@ -0,0 +1,409 @@
"""Tests for the granular ACL tier system."""
import asyncio
from pathlib import Path
from derp.bot import Bot
from derp.config import DEFAULTS
from derp.irc import Message, parse
from derp.plugin import TIERS, PluginRegistry, command
# -- Helpers -----------------------------------------------------------------
class _MockConnection:
"""Minimal mock IRC connection for dispatch tests."""
def __init__(self) -> None:
self._queue: asyncio.Queue[str | None] = asyncio.Queue()
self.sent: list[str] = []
async def connect(self) -> None:
pass
async def close(self) -> None:
pass
async def send(self, line: str) -> None:
self.sent.append(line)
async def readline(self) -> str | None:
return await self._queue.get()
def inject(self, line: str) -> None:
self._queue.put_nowait(line)
def disconnect(self) -> None:
self._queue.put_nowait(None)
class _Harness:
"""Test fixture: Bot with mock connection and configurable tiers."""
def __init__(
self,
admins: list[str] | None = None,
operators: list[str] | None = None,
trusted: list[str] | None = None,
) -> None:
config: dict = {
"server": {
"host": "localhost", "port": 6667, "tls": False,
"nick": "test", "user": "test", "realname": "test bot",
},
"bot": {
"prefix": "!",
"channels": [],
"plugins_dir": "plugins",
"admins": admins or [],
"operators": operators or [],
"trusted": trusted or [],
"rate_limit": 100.0,
"rate_burst": 100,
},
}
self.registry = PluginRegistry()
self.bot = Bot(config, self.registry)
self.conn = _MockConnection()
self.bot.conn = self.conn # type: ignore[assignment]
self.registry.load_plugin(Path("plugins/core.py"))
def inject_registration(self) -> None:
self.conn.inject(":server CAP * LS :")
self.conn.inject(":server 001 test :Welcome")
def privmsg(self, nick: str, target: str, text: str,
user: str = "user", host: str = "host") -> None:
self.conn.inject(f":{nick}!{user}@{host} PRIVMSG {target} :{text}")
async def run(self) -> None:
self.conn.disconnect()
self.bot._running = True
await self.bot._connect_and_run()
if self.bot._tasks:
await asyncio.gather(*list(self.bot._tasks), return_exceptions=True)
def sent_privmsgs(self, target: str) -> list[str]:
results = []
for line in self.conn.sent:
msg = parse(line)
if msg.command == "PRIVMSG" and msg.target == target:
results.append(msg.text)
return results
def _msg(text: str, prefix: str = "nick!user@host") -> Message:
nick = prefix.split("!")[0] if "!" in prefix else prefix
return Message(
raw="", prefix=prefix, nick=nick,
command="PRIVMSG", params=["#test", text], tags={},
)
# ---------------------------------------------------------------------------
# TestTierConstants
# ---------------------------------------------------------------------------
class TestTierConstants:
def test_tier_order(self):
assert TIERS == ("user", "trusted", "oper", "admin")
def test_index_comparison(self):
assert TIERS.index("user") < TIERS.index("trusted")
assert TIERS.index("trusted") < TIERS.index("oper")
assert TIERS.index("oper") < TIERS.index("admin")
# ---------------------------------------------------------------------------
# TestGetTier
# ---------------------------------------------------------------------------
class TestGetTier:
def test_no_prefix(self):
h = _Harness()
msg = Message(raw="", prefix="", nick="", command="PRIVMSG",
params=["#test", "hi"], tags={})
assert h.bot._get_tier(msg) == "user"
def test_ircop(self):
h = _Harness()
h.bot._opers.add("op!root@server")
msg = _msg("test", prefix="op!root@server")
assert h.bot._get_tier(msg) == "admin"
def test_admin_pattern(self):
h = _Harness(admins=["*!*@admin.host"])
msg = _msg("test", prefix="alice!user@admin.host")
assert h.bot._get_tier(msg) == "admin"
def test_oper_pattern(self):
h = _Harness(operators=["*!*@oper.host"])
msg = _msg("test", prefix="bob!user@oper.host")
assert h.bot._get_tier(msg) == "oper"
def test_trusted_pattern(self):
h = _Harness(trusted=["*!*@trusted.host"])
msg = _msg("test", prefix="carol!user@trusted.host")
assert h.bot._get_tier(msg) == "trusted"
def test_no_match(self):
h = _Harness(admins=["*!*@admin.host"])
msg = _msg("test", prefix="nobody!user@random.host")
assert h.bot._get_tier(msg) == "user"
def test_priority_admin_over_oper(self):
"""Admin patterns take priority over operator patterns."""
h = _Harness(admins=["*!*@dual.host"], operators=["*!*@dual.host"])
msg = _msg("test", prefix="x!y@dual.host")
assert h.bot._get_tier(msg) == "admin"
def test_priority_oper_over_trusted(self):
"""Operator patterns take priority over trusted patterns."""
h = _Harness(operators=["*!*@dual.host"], trusted=["*!*@dual.host"])
msg = _msg("test", prefix="x!y@dual.host")
assert h.bot._get_tier(msg) == "oper"
def test_ircop_over_admin_pattern(self):
"""IRC operator status takes priority over admin hostmask pattern."""
h = _Harness(admins=["*!*@server"])
h.bot._opers.add("op!root@server")
msg = _msg("test", prefix="op!root@server")
# Both match, but ircop check comes first
assert h.bot._get_tier(msg) == "admin"
# ---------------------------------------------------------------------------
# TestIsAdminBackcompat
# ---------------------------------------------------------------------------
class TestIsAdminBackcompat:
def test_admin_true(self):
h = _Harness(admins=["*!*@admin.host"])
msg = _msg("test", prefix="a!b@admin.host")
assert h.bot._is_admin(msg) is True
def test_oper_false(self):
h = _Harness(operators=["*!*@oper.host"])
msg = _msg("test", prefix="a!b@oper.host")
assert h.bot._is_admin(msg) is False
def test_trusted_false(self):
h = _Harness(trusted=["*!*@trusted.host"])
msg = _msg("test", prefix="a!b@trusted.host")
assert h.bot._is_admin(msg) is False
# ---------------------------------------------------------------------------
# TestCommandDecorator
# ---------------------------------------------------------------------------
class TestCommandDecorator:
def test_admin_true_sets_tier(self):
@command("x", admin=True)
async def handler(bot, msg):
pass
assert handler._derp_tier == "admin"
def test_explicit_tier(self):
@command("x", tier="trusted")
async def handler(bot, msg):
pass
assert handler._derp_tier == "trusted"
def test_tier_overrides_admin(self):
@command("x", admin=True, tier="oper")
async def handler(bot, msg):
pass
assert handler._derp_tier == "oper"
def test_default_tier(self):
@command("x")
async def handler(bot, msg):
pass
assert handler._derp_tier == "user"
# ---------------------------------------------------------------------------
# TestDispatchWithTiers
# ---------------------------------------------------------------------------
class TestDispatchWithTiers:
def test_trusted_allowed_for_trusted_cmd(self):
"""Trusted user can run a trusted-tier command."""
h = _Harness(trusted=["*!user@host"])
@command("tcmd", tier="trusted")
async def cmd_tcmd(bot, message):
await bot.reply(message, "ok")
h.registry.register_command(
"tcmd", cmd_tcmd, tier="trusted", plugin="test",
)
h.inject_registration()
h.privmsg("nick", "#test", "!tcmd")
asyncio.run(h.run())
msgs = h.sent_privmsgs("#test")
assert any("ok" in m for m in msgs)
def test_admin_allowed_for_trusted_cmd(self):
"""Admin can run trusted-tier commands (higher tier)."""
h = _Harness(admins=["*!user@host"])
@command("tcmd", tier="trusted")
async def cmd_tcmd(bot, message):
await bot.reply(message, "ok")
h.registry.register_command(
"tcmd", cmd_tcmd, tier="trusted", plugin="test",
)
h.inject_registration()
h.privmsg("nick", "#test", "!tcmd")
asyncio.run(h.run())
msgs = h.sent_privmsgs("#test")
assert any("ok" in m for m in msgs)
def test_user_denied_for_trusted_cmd(self):
"""Regular user is denied a trusted-tier command."""
h = _Harness()
@command("tcmd", tier="trusted")
async def cmd_tcmd(bot, message):
await bot.reply(message, "ok")
h.registry.register_command(
"tcmd", cmd_tcmd, tier="trusted", plugin="test",
)
h.inject_registration()
h.privmsg("nick", "#test", "!tcmd")
asyncio.run(h.run())
msgs = h.sent_privmsgs("#test")
assert any("Permission denied" in m for m in msgs)
assert any("requires trusted" in m for m in msgs)
def test_backward_compat_admin_flag(self):
"""admin=True commands still work via tier='admin'."""
h = _Harness(admins=["*!user@host"])
h.inject_registration()
# cmd_admins is already registered via core plugin with admin=True
h.privmsg("nick", "#test", "!admins")
asyncio.run(h.run())
msgs = h.sent_privmsgs("#test")
assert any("Admin:" in m for m in msgs)
def test_denial_message_shows_tier(self):
"""Permission denial message includes the required tier name."""
h = _Harness()
@command("opcmd", tier="oper")
async def cmd_opcmd(bot, message):
await bot.reply(message, "ok")
h.registry.register_command(
"opcmd", cmd_opcmd, tier="oper", plugin="test",
)
h.inject_registration()
h.privmsg("nick", "#test", "!opcmd")
asyncio.run(h.run())
msgs = h.sent_privmsgs("#test")
assert any("requires oper" in m for m in msgs)
# ---------------------------------------------------------------------------
# TestConfigDefaults
# ---------------------------------------------------------------------------
class TestConfigDefaults:
def test_operators_in_defaults(self):
assert "operators" in DEFAULTS["bot"]
assert DEFAULTS["bot"]["operators"] == []
def test_trusted_in_defaults(self):
assert "trusted" in DEFAULTS["bot"]
assert DEFAULTS["bot"]["trusted"] == []
def test_webhook_in_defaults(self):
assert "webhook" in DEFAULTS
assert DEFAULTS["webhook"]["enabled"] is False
# ---------------------------------------------------------------------------
# TestWhoami
# ---------------------------------------------------------------------------
class TestWhoami:
def test_shows_admin(self):
h = _Harness(admins=["*!user@host"])
h.inject_registration()
h.privmsg("nick", "#test", "!whoami")
asyncio.run(h.run())
msgs = h.sent_privmsgs("#test")
assert any("admin" in m for m in msgs)
def test_shows_trusted(self):
h = _Harness(trusted=["*!user@host"])
h.inject_registration()
h.privmsg("nick", "#test", "!whoami")
asyncio.run(h.run())
msgs = h.sent_privmsgs("#test")
assert any("trusted" in m for m in msgs)
def test_shows_user(self):
h = _Harness()
h.inject_registration()
h.privmsg("nick", "#test", "!whoami")
asyncio.run(h.run())
msgs = h.sent_privmsgs("#test")
assert any("user" in m for m in msgs)
def test_shows_ircop_tag(self):
h = _Harness()
h.bot._opers.add("nick!user@host")
h.inject_registration()
h.privmsg("nick", "#test", "!whoami")
asyncio.run(h.run())
msgs = h.sent_privmsgs("#test")
assert any("IRCOP" in m for m in msgs)
# ---------------------------------------------------------------------------
# TestAdmins
# ---------------------------------------------------------------------------
class TestAdmins:
def test_shows_all_tiers(self):
h = _Harness(
admins=["*!*@admin.host"],
operators=["*!*@oper.host"],
trusted=["*!*@trusted.host"],
)
h.bot._opers.add("nick!user@host")
h.inject_registration()
# Must be admin to run !admins
h.privmsg("nick", "#test", "!admins", user="x", host="admin.host")
asyncio.run(h.run())
msgs = h.sent_privmsgs("#test")
combined = " ".join(msgs)
assert "Admin:" in combined
assert "Oper:" in combined
assert "Trusted:" in combined
assert "IRCOPs:" in combined
def test_omits_empty_tiers(self):
h = _Harness(admins=["*!user@host"])
h.inject_registration()
h.privmsg("nick", "#test", "!admins")
asyncio.run(h.run())
msgs = h.sent_privmsgs("#test")
combined = " ".join(msgs)
assert "Admin:" in combined
# No operators or trusted configured, so those sections shouldn't appear
assert "Oper:" not in combined
assert "Trusted:" not in combined

View File

@@ -274,7 +274,7 @@ class TestAdmin:
replies = h.sent_privmsgs("#test") replies = h.sent_privmsgs("#test")
assert not any("Permission denied" in r for r in replies) assert not any("Permission denied" in r for r in replies)
assert any("Patterns:" in r for r in replies) assert any("Admin:" in r for r in replies)
def test_oper_detection(self): def test_oper_detection(self):
"""IRC operator detected via WHO reply can use admin commands.""" """IRC operator detected via WHO reply can use admin commands."""
@@ -290,7 +290,7 @@ class TestAdmin:
replies = h.sent_privmsgs("#test") replies = h.sent_privmsgs("#test")
assert not any("Permission denied" in r for r in replies) assert not any("Permission denied" in r for r in replies)
assert any("Opers:" in r for r in replies) assert any("IRCOPs:" in r for r in replies)
def test_oper_detection_on_join(self): def test_oper_detection_on_join(self):
"""User joining a channel triggers debounced WHO for oper detection.""" """User joining a channel triggers debounced WHO for oper detection."""