feat: add admin/owner permission system

Hostmask-based admin controls with automatic IRCOP detection via WHO.
Permission enforcement in the central dispatch path denies restricted
commands to non-admins. Includes !whoami and !admins commands, marks
load/reload/unload as admin-only.

Also lands previously-implemented SASL PLAIN auth, token-bucket rate
limiting, and CTCP VERSION/TIME/PING responses that were staged but
uncommitted.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
user
2026-02-15 02:24:56 +01:00
parent 36b21e2463
commit f96224afb1
9 changed files with 408 additions and 18 deletions

View File

@@ -6,11 +6,19 @@ nick = "derp"
user = "derp" user = "derp"
realname = "derp IRC bot" realname = "derp IRC bot"
password = "" password = ""
# sasl_user = "account" # SASL PLAIN username (optional)
# sasl_pass = "secret" # SASL PLAIN password (optional)
[bot] [bot]
prefix = "!" prefix = "!"
channels = ["#test"] channels = ["#test"]
plugins_dir = "plugins" plugins_dir = "plugins"
# rate_limit = 2.0 # Messages per second (default: 2.0)
# rate_burst = 5 # Burst capacity (default: 5)
# admins = [ # Hostmask patterns (fnmatch), IRCOPs auto-detected
# "*!~user@trusted.host",
# "ops!*@*.ops.net",
# ]
[logging] [logging]
level = "info" level = "info"

View File

@@ -8,5 +8,4 @@ services:
volumes: volumes:
- ./config/derp.toml:/app/config/derp.toml:ro,Z - ./config/derp.toml:/app/config/derp.toml:ro,Z
- ./plugins:/app/plugins:ro,Z - ./plugins:/app/plugins:ro,Z
- ./profile:/app/profile:Z command: ["--verbose"]
command: ["--verbose", "--cprofile", "/app/profile/derp.prof"]

View File

@@ -13,6 +13,24 @@ derp -v # Verbose/debug mode
derp --cprofile # Profile to derp.prof derp --cprofile # Profile to derp.prof
``` ```
## SASL Authentication
```toml
# In config/derp.toml
[server]
sasl_user = "account"
sasl_pass = "password"
```
## Rate Limiting
```toml
# In config/derp.toml (defaults shown)
[bot]
rate_limit = 2.0 # Messages per second
rate_burst = 5 # Burst capacity
```
## Container ## Container
```bash ```bash
@@ -35,13 +53,28 @@ make logs # Follow logs
!h # Shorthand (any unambiguous prefix works) !h # Shorthand (any unambiguous prefix works)
``` ```
## Plugin Management ## Admin
```
!whoami # Show your hostmask + admin status
!admins # Show admin patterns + detected opers (admin)
```
```toml
# config/derp.toml
[bot]
admins = ["*!~user@trusted.host", "ops!*@*.ops.net"]
```
IRC operators are auto-detected via WHO. Hostmask patterns use fnmatch.
## Plugin Management (admin)
``` ```
!plugins # List loaded plugins !plugins # List loaded plugins
!load <plugin> # Hot-load a plugin !load <plugin> # Hot-load a plugin (admin)
!reload <plugin> # Reload a changed plugin !reload <plugin> # Reload a changed plugin (admin)
!unload <plugin> # Remove a plugin !unload <plugin> # Remove a plugin (admin)
``` ```
## OSINT ## OSINT

View File

@@ -33,11 +33,16 @@ nick = "derp" # Bot nickname
user = "derp" # Username (ident) user = "derp" # Username (ident)
realname = "derp IRC bot" # Real name field realname = "derp IRC bot" # Real name field
password = "" # Server password (optional) password = "" # Server password (optional)
sasl_user = "" # SASL PLAIN username (optional)
sasl_pass = "" # SASL PLAIN password (optional)
[bot] [bot]
prefix = "!" # Command prefix character prefix = "!" # Command prefix character
channels = ["#test"] # Channels to join on connect channels = ["#test"] # Channels to join on connect
plugins_dir = "plugins" # Plugin directory path plugins_dir = "plugins" # Plugin directory path
rate_limit = 2.0 # Max messages per second (default: 2.0)
rate_burst = 5 # Burst capacity (default: 5)
admins = [] # Hostmask patterns (fnmatch), IRCOPs auto-detected
[logging] [logging]
level = "info" # Logging level: debug, info, warning, error level = "info" # Logging level: debug, info, warning, error
@@ -55,9 +60,11 @@ level = "info" # Logging level: debug, info, warning, error
| `!uptime` | Show how long the bot has been running | | `!uptime` | Show how long the bot has been running |
| `!echo <text>` | Echo back text (example plugin) | | `!echo <text>` | Echo back text (example plugin) |
| `!cert <domain> [...]` | Lookup CT logs for up to 5 domains | | `!cert <domain> [...]` | Lookup CT logs for up to 5 domains |
| `!load <plugin>` | Hot-load a plugin from the plugins directory | | `!whoami` | Show your hostmask and admin status |
| `!reload <plugin>` | Reload a plugin, picking up file changes | | `!load <plugin>` | Hot-load a plugin (admin) |
| `!unload <plugin>` | Unload a plugin, removing its handlers | | `!reload <plugin>` | Reload a plugin (admin) |
| `!unload <plugin>` | Unload a plugin (admin) |
| `!admins` | Show admin patterns and detected opers (admin) |
| `!plugins` | List loaded plugins with handler counts | | `!plugins` | List loaded plugins with handler counts |
| `!dns <target> [type]` | DNS lookup (A, AAAA, MX, NS, TXT, CNAME, PTR, SOA) | | `!dns <target> [type]` | DNS lookup (A, AAAA, MX, NS, TXT, CNAME, PTR, SOA) |
| `!encode <fmt> <text>` | Encode text (b64, hex, url, rot13) | | `!encode <fmt> <text>` | Encode text (b64, hex, url, rot13) |
@@ -114,6 +121,40 @@ broken.test -- error: timeout
- crt.sh can be slow; the bot confirms receipt before querying - crt.sh can be slow; the bot confirms receipt before querying
- Live cert check runs only when expired CT entries exist - Live cert check runs only when expired CT entries exist
## Admin System
Commands marked as `admin` require elevated permissions. Admin access is
granted via:
1. **IRC operator status** -- detected automatically via `WHO` on channel join
2. **Hostmask patterns** -- configured in `[bot] admins`, fnmatch-style
```toml
[bot]
admins = [
"*!~user@trusted.host",
"ops!*@*.ops.net",
]
```
Empty by default -- only IRC operators get admin access unless patterns
are configured.
| Command | Description |
|---------|-------------|
| `!whoami` | Show your hostmask and admin status |
| `!admins` | Show configured patterns and detected opers (admin) |
Admin-restricted commands: `!load`, `!reload`, `!unload`, `!admins`.
### Writing Admin Commands
```python
@command("dangerous", help="Admin-only action", admin=True)
async def cmd_dangerous(bot, message):
...
```
## Plugin Management ## Plugin Management
Plugins can be loaded, unloaded, and reloaded at runtime without Plugins can be loaded, unloaded, and reloaded at runtime without

View File

@@ -76,7 +76,7 @@ async def cmd_uptime(bot, message):
await bot.reply(message, f"up {' '.join(parts)}") await bot.reply(message, f"up {' '.join(parts)}")
@command("load", help="Hot-load a plugin: !load <name>") @command("load", help="Hot-load a plugin: !load <name>", admin=True)
async def cmd_load(bot, message): async def cmd_load(bot, message):
"""Load a new plugin from the plugins directory.""" """Load a new plugin from the plugins directory."""
parts = message.text.split(None, 2) parts = message.text.split(None, 2)
@@ -91,7 +91,7 @@ async def cmd_load(bot, message):
await bot.reply(message, f"Failed to load plugin: {reason}") await bot.reply(message, f"Failed to load plugin: {reason}")
@command("reload", help="Reload a plugin: !reload <name>") @command("reload", help="Reload a plugin: !reload <name>", admin=True)
async def cmd_reload(bot, message): async def cmd_reload(bot, message):
"""Unload and reload a plugin, picking up file changes.""" """Unload and reload a plugin, picking up file changes."""
parts = message.text.split(None, 2) parts = message.text.split(None, 2)
@@ -106,7 +106,7 @@ async def cmd_reload(bot, message):
await bot.reply(message, f"Failed to reload plugin: {reason}") await bot.reply(message, f"Failed to reload plugin: {reason}")
@command("unload", help="Unload a plugin: !unload <name>") @command("unload", help="Unload a plugin: !unload <name>", admin=True)
async def cmd_unload(bot, message): async def cmd_unload(bot, message):
"""Unload a plugin, removing all its handlers.""" """Unload a plugin, removing all its handlers."""
parts = message.text.split(None, 2) parts = message.text.split(None, 2)
@@ -132,3 +132,34 @@ async def cmd_plugins(bot, message):
counts[handler.plugin] += 1 counts[handler.plugin] += 1
parts = [f"{name} ({counts[name]})" for name in sorted(counts)] parts = [f"{name} ({counts[name]})" for name in sorted(counts)]
await bot.reply(message, f"Plugins: {', '.join(parts)}") await bot.reply(message, f"Plugins: {', '.join(parts)}")
@command("whoami", help="Show your hostmask and admin status")
async def cmd_whoami(bot, message):
"""Display the sender's hostmask and permission level."""
prefix = message.prefix or "unknown"
is_admin = bot._is_admin(message)
is_oper = message.prefix in bot._opers if message.prefix else False
tags = []
if is_admin:
tags.append("admin")
else:
tags.append("user")
if is_oper:
tags.append("IRCOP")
await bot.reply(message, f"{prefix} [{', '.join(tags)}]")
@command("admins", help="Show configured admin patterns and detected opers", admin=True)
async def cmd_admins(bot, message):
"""Display admin hostmask patterns and known IRC operators."""
parts = []
if bot._admins:
parts.append(f"Patterns: {', '.join(bot._admins)}")
else:
parts.append("Patterns: (none)")
if bot._opers:
parts.append(f"Opers: {', '.join(sorted(bot._opers))}")
else:
parts.append("Opers: (none)")
await bot.reply(message, " | ".join(parts))

View File

@@ -3,10 +3,14 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import base64
import fnmatch
import logging import logging
import time import time
from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from derp import __version__
from derp.irc import IRCConnection, Message, format_msg, parse from derp.irc import IRCConnection, Message, format_msg, parse
from derp.plugin import Handler, PluginRegistry from derp.plugin import Handler, PluginRegistry
@@ -16,6 +20,30 @@ RECONNECT_DELAY = 30
_AMBIGUOUS = object() # sentinel for ambiguous prefix matches _AMBIGUOUS = object() # sentinel for ambiguous prefix matches
class _TokenBucket:
"""Token bucket rate limiter for outgoing messages."""
def __init__(self, rate: float, burst: int) -> None:
self._rate = rate
self._burst = burst
self._tokens = float(burst)
self._last = time.monotonic()
self._lock = asyncio.Lock()
async def acquire(self) -> None:
"""Wait until a token is available."""
async with self._lock:
now = time.monotonic()
self._tokens = min(self._burst, self._tokens + (now - self._last) * self._rate)
self._last = now
if self._tokens < 1:
wait = (1 - self._tokens) / self._rate
await asyncio.sleep(wait)
self._tokens = 0
else:
self._tokens -= 1
class Bot: class Bot:
"""IRC bot: ties connection, config, and plugins together.""" """IRC bot: ties connection, config, and plugins together."""
@@ -33,6 +61,14 @@ class Bot:
self._running = False self._running = False
self._started: float = time.monotonic() self._started: float = time.monotonic()
self._tasks: set[asyncio.Task] = set() self._tasks: set[asyncio.Task] = set()
self._admins: list[str] = config.get("bot", {}).get("admins", [])
self._opers: set[str] = set() # hostmasks of known IRC operators
# Rate limiter: default 2 msg/sec, burst of 5
rate_cfg = config.get("bot", {})
self._bucket = _TokenBucket(
rate=rate_cfg.get("rate_limit", 2.0),
burst=rate_cfg.get("rate_burst", 5),
)
async def start(self) -> None: async def start(self) -> None:
"""Connect, register, join channels, and enter the main loop.""" """Connect, register, join channels, and enter the main loop."""
@@ -56,13 +92,71 @@ class Bot:
await self.conn.close() await self.conn.close()
async def _register(self) -> None: async def _register(self) -> None:
"""Send NICK/USER registration to the server.""" """Send NICK/USER registration, with optional SASL PLAIN."""
srv = self.config["server"] srv = self.config["server"]
sasl_user = srv.get("sasl_user", "")
sasl_pass = srv.get("sasl_pass", "")
if sasl_user and sasl_pass:
await self._sasl_auth(sasl_user, sasl_pass)
if srv.get("password"): if srv.get("password"):
await self.conn.send(format_msg("PASS", srv["password"])) await self.conn.send(format_msg("PASS", srv["password"]))
await self.conn.send(format_msg("NICK", self.nick)) await self.conn.send(format_msg("NICK", self.nick))
await self.conn.send(format_msg("USER", srv["user"], "0", "*", srv["realname"])) await self.conn.send(format_msg("USER", srv["user"], "0", "*", srv["realname"]))
async def _sasl_auth(self, user: str, password: str) -> None:
"""Perform SASL PLAIN authentication during registration."""
await self.conn.send("CAP REQ :sasl")
# Wait for CAP ACK or NAK
while True:
line = await self.conn.readline()
if line is None:
log.error("connection closed during SASL negotiation")
return
msg = parse(line)
if msg.command == "CAP" and len(msg.params) >= 3:
sub = msg.params[1].upper()
if sub == "ACK" and "sasl" in msg.params[-1].lower():
break
if sub == "NAK":
log.warning("server rejected SASL capability")
await self.conn.send("CAP END")
return
# Send AUTHENTICATE PLAIN
await self.conn.send("AUTHENTICATE PLAIN")
# Wait for AUTHENTICATE +
while True:
line = await self.conn.readline()
if line is None:
return
msg = parse(line)
if msg.command == "AUTHENTICATE" and msg.params and msg.params[0] == "+":
break
# Send credentials: base64(user\0user\0pass)
payload = f"{user}\x00{user}\x00{password}"
encoded = base64.b64encode(payload.encode("utf-8")).decode("ascii")
await self.conn.send(f"AUTHENTICATE {encoded}")
# Wait for 903 (success) or 904 (failure)
while True:
line = await self.conn.readline()
if line is None:
return
msg = parse(line)
if msg.command == "903":
log.info("SASL authentication successful")
break
if msg.command in ("904", "905", "906"):
log.error("SASL authentication failed: %s", msg.params[-1] if msg.params else "")
break
await self.conn.send("CAP END")
async def _loop(self) -> None: async def _loop(self) -> None:
"""Read and dispatch messages until disconnect.""" """Read and dispatch messages until disconnect."""
while self._running: while self._running:
@@ -80,11 +174,22 @@ class Bot:
await self.conn.send(format_msg("PONG", msg.params[0] if msg.params else "")) await self.conn.send(format_msg("PONG", msg.params[0] if msg.params else ""))
return return
# RPL_WELCOME (001) — join channels # RPL_WELCOME (001) — join channels and WHO for oper detection
if msg.command == "001": if msg.command == "001":
self.nick = msg.params[0] if msg.params else self.nick self.nick = msg.params[0] if msg.params else self.nick
for channel in self.config["bot"]["channels"]: for channel in self.config["bot"]["channels"]:
await self.join(channel) await self.join(channel)
await self.conn.send(format_msg("WHO", channel))
# RPL_WHOREPLY (352) — detect IRC operators
if msg.command == "352" and len(msg.params) >= 7:
_chan, user, host, _server, nick, flags = msg.params[1:7]
if "*" in flags:
self._opers.add(f"{nick}!{user}@{host}")
# QUIT — remove departed nicks from oper set
if msg.command == "QUIT" and msg.prefix:
self._opers.discard(msg.prefix)
# Nick already in use (433) — append underscore # Nick already in use (433) — append underscore
if msg.command == "433": if msg.command == "433":
@@ -92,6 +197,11 @@ class Bot:
await self.conn.send(format_msg("NICK", self.nick)) await self.conn.send(format_msg("NICK", self.nick))
return return
# CTCP queries (PRIVMSG with \x01...\x01 wrapping)
if msg.command == "PRIVMSG" and msg.text and msg.text.startswith("\x01"):
self._spawn(self._handle_ctcp(msg), name="ctcp")
return
# Dispatch to event handlers (fire-and-forget) # Dispatch to event handlers (fire-and-forget)
event_type = msg.command event_type = msg.command
for handler in self.registry.events.get(event_type, []): for handler in self.registry.events.get(event_type, []):
@@ -101,6 +211,44 @@ class Bot:
if msg.command == "PRIVMSG" and msg.text: if msg.command == "PRIVMSG" and msg.text:
self._dispatch_command(msg) self._dispatch_command(msg)
async def _handle_ctcp(self, msg: Message) -> None:
"""Respond to CTCP VERSION, TIME, and PING queries."""
text = msg.text.strip("\x01")
parts = text.split(None, 1)
ctcp_cmd = parts[0].upper() if parts else ""
ctcp_arg = parts[1] if len(parts) > 1 else ""
if not msg.nick:
return
if ctcp_cmd == "VERSION":
reply = f"\x01VERSION derp {__version__}\x01"
elif ctcp_cmd == "TIME":
now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
reply = f"\x01TIME {now}\x01"
elif ctcp_cmd == "PING":
reply = f"\x01PING {ctcp_arg}\x01"
else:
return
await self.conn.send(format_msg("NOTICE", msg.nick, reply))
log.debug("CTCP %s reply to %s", ctcp_cmd, msg.nick)
def _is_admin(self, msg: Message) -> bool:
"""Check if the message sender is a bot admin.
Returns True if the sender is a known IRC operator or matches
a configured hostmask pattern (fnmatch-style).
"""
if not msg.prefix:
return False
if msg.prefix in self._opers:
return True
for pattern in self._admins:
if fnmatch.fnmatch(msg.prefix, pattern):
return True
return False
def _dispatch_command(self, msg: Message) -> None: def _dispatch_command(self, msg: Message) -> None:
"""Check if a PRIVMSG is a bot command and spawn it.""" """Check if a PRIVMSG is a bot command and spawn it."""
text = msg.text text = msg.text
@@ -119,6 +267,11 @@ class Bot:
name=f"cmd:{cmd_name}:ambiguous") name=f"cmd:{cmd_name}:ambiguous")
return return
if handler.admin and not self._is_admin(msg):
deny = f"Permission denied: {self.prefix}{cmd_name} requires admin"
self._spawn(self.reply(msg, deny), name=f"cmd:{cmd_name}:denied")
return
self._spawn(self._run_command(handler, cmd_name, msg), name=f"cmd:{cmd_name}") self._spawn(self._run_command(handler, cmd_name, msg), name=f"cmd:{cmd_name}")
async def _run_command(self, handler: Handler, cmd_name: str, msg: Message) -> None: async def _run_command(self, handler: Handler, cmd_name: str, msg: Message) -> None:
@@ -157,8 +310,9 @@ class Bot:
# -- Public API for plugins -- # -- Public API for plugins --
async def send(self, target: str, text: str) -> None: async def send(self, target: str, text: str) -> None:
"""Send a PRIVMSG to a target (channel or nick).""" """Send a PRIVMSG to a target (channel or nick), rate-limited."""
for line in text.split("\n"): for line in text.split("\n"):
await self._bucket.acquire()
await self.conn.send(format_msg("PRIVMSG", target, line)) await self.conn.send(format_msg("PRIVMSG", target, line))
async def reply(self, msg: Message, text: str) -> None: async def reply(self, msg: Message, text: str) -> None:

View File

@@ -14,11 +14,16 @@ DEFAULTS: dict = {
"user": "derp", "user": "derp",
"realname": "derp IRC bot", "realname": "derp IRC bot",
"password": "", "password": "",
"sasl_user": "",
"sasl_pass": "",
}, },
"bot": { "bot": {
"prefix": "!", "prefix": "!",
"channels": ["#test"], "channels": ["#test"],
"plugins_dir": "plugins", "plugins_dir": "plugins",
"rate_limit": 2.0,
"rate_burst": 5,
"admins": [],
}, },
"logging": { "logging": {
"level": "info", "level": "info",

View File

@@ -21,9 +21,10 @@ class Handler:
callback: Callable callback: Callable
help: str = "" help: str = ""
plugin: str = "" plugin: str = ""
admin: bool = False
def command(name: str, help: str = "") -> Callable: def command(name: str, help: str = "", admin: bool = False) -> Callable:
"""Decorator to register an async function as a bot command. """Decorator to register an async function as a bot command.
Usage:: Usage::
@@ -31,11 +32,16 @@ def command(name: str, help: str = "") -> Callable:
@command("ping", help="Check if the bot is alive") @command("ping", help="Check if the bot is alive")
async def cmd_ping(bot, message): async def cmd_ping(bot, message):
await bot.reply(message, "pong") await bot.reply(message, "pong")
@command("reload", help="Reload a plugin", admin=True)
async def cmd_reload(bot, message):
...
""" """
def decorator(func: Callable) -> Callable: def decorator(func: Callable) -> Callable:
func._derp_command = name # type: ignore[attr-defined] func._derp_command = name # type: ignore[attr-defined]
func._derp_help = help # type: ignore[attr-defined] func._derp_help = help # type: ignore[attr-defined]
func._derp_admin = admin # type: ignore[attr-defined]
return func return func
return decorator return decorator
@@ -68,11 +74,13 @@ class PluginRegistry:
self._paths: dict[str, Path] = {} self._paths: dict[str, Path] = {}
def register_command(self, name: str, callback: Callable, help: str = "", def register_command(self, name: str, callback: Callable, help: str = "",
plugin: str = "") -> None: plugin: str = "", admin: bool = False) -> None:
"""Register a command handler.""" """Register a command handler."""
if name in self.commands: if name in self.commands:
log.warning("command '%s' already registered, overwriting", name) log.warning("command '%s' already registered, overwriting", name)
self.commands[name] = Handler(name=name, callback=callback, help=help, plugin=plugin) self.commands[name] = Handler(
name=name, callback=callback, help=help, plugin=plugin, admin=admin,
)
log.debug("registered command: %s (%s)", name, plugin) log.debug("registered command: %s (%s)", name, plugin)
def register_event(self, event_type: str, callback: Callable, plugin: str = "") -> None: def register_event(self, event_type: str, callback: Callable, plugin: str = "") -> None:
@@ -93,6 +101,7 @@ class PluginRegistry:
obj._derp_command, obj, obj._derp_command, obj,
help=getattr(obj, "_derp_help", ""), help=getattr(obj, "_derp_help", ""),
plugin=plugin_name, plugin=plugin_name,
admin=getattr(obj, "_derp_admin", False),
) )
count += 1 count += 1
if hasattr(obj, "_derp_event"): if hasattr(obj, "_derp_event"):

View File

@@ -4,6 +4,7 @@ import textwrap
from pathlib import Path from pathlib import Path
from derp.bot import _AMBIGUOUS, Bot from derp.bot import _AMBIGUOUS, Bot
from derp.irc import Message
from derp.plugin import PluginRegistry, command, event from derp.plugin import PluginRegistry, command, event
@@ -33,6 +34,22 @@ class TestDecorators:
assert handler._derp_event == "PRIVMSG" assert handler._derp_event == "PRIVMSG"
def test_command_decorator_admin(self):
@command("secret", help="admin only", admin=True)
async def handler(bot, msg):
pass
assert handler._derp_command == "secret"
assert handler._derp_admin is True
def test_command_decorator_admin_default(self):
@command("public", help="everyone")
async def handler(bot, msg):
pass
assert getattr(handler, "_derp_admin", False) is False
class TestRegistry: class TestRegistry:
"""Test the plugin registry.""" """Test the plugin registry."""
@@ -46,6 +63,24 @@ class TestRegistry:
assert "test" in registry.commands assert "test" in registry.commands
assert registry.commands["test"].help == "test help" assert registry.commands["test"].help == "test help"
def test_register_command_admin(self):
registry = PluginRegistry()
async def handler(bot, msg):
pass
registry.register_command("secret", handler, help="admin", admin=True)
assert registry.commands["secret"].admin is True
def test_register_command_admin_default(self):
registry = PluginRegistry()
async def handler(bot, msg):
pass
registry.register_command("public", handler, help="public")
assert registry.commands["public"].admin is False
def test_register_event(self): def test_register_event(self):
registry = PluginRegistry() registry = PluginRegistry()
@@ -150,6 +185,27 @@ class TestRegistry:
count = registry.load_plugin(plugin_file) count = registry.load_plugin(plugin_file)
assert count == 3 assert count == 3
def test_load_plugin_admin_flag(self, tmp_path: Path):
plugin_code = textwrap.dedent("""\
from derp.plugin import command
@command("secret", help="Admin only", admin=True)
async def cmd_secret(bot, msg):
pass
@command("public", help="Everyone")
async def cmd_public(bot, msg):
pass
""")
plugin_file = tmp_path / "mixed.py"
plugin_file.write_text(plugin_code)
registry = PluginRegistry()
registry.load_plugin(plugin_file)
assert registry.commands["secret"].admin is True
assert registry.commands["public"].admin is False
def test_load_plugin_stores_path(self, tmp_path: Path): def test_load_plugin_stores_path(self, tmp_path: Path):
plugin_file = tmp_path / "pathed.py" plugin_file = tmp_path / "pathed.py"
plugin_file.write_text(textwrap.dedent("""\ plugin_file.write_text(textwrap.dedent("""\
@@ -366,3 +422,57 @@ class TestPrefixMatch:
handler = bot._resolve_command("v") handler = bot._resolve_command("v")
assert handler is not None assert handler is not None
assert handler.name == "version" assert handler.name == "version"
class TestIsAdmin:
"""Test admin permission checks."""
@staticmethod
def _make_bot(admins: list[str] | None = None, opers: set[str] | None = None) -> Bot:
"""Create a Bot with optional admin patterns and oper set."""
config = {
"server": {"host": "localhost", "port": 6667, "tls": False,
"nick": "test", "user": "test", "realname": "test"},
"bot": {"prefix": "!", "channels": [], "plugins_dir": "plugins",
"admins": admins or []},
}
bot = Bot(config, PluginRegistry())
if opers:
bot._opers = opers
return bot
@staticmethod
def _msg(prefix: str) -> Message:
"""Create a minimal Message with a given prefix."""
return Message(raw="", prefix=prefix, nick=prefix.split("!")[0],
command="PRIVMSG", params=["#test", "!test"])
def test_no_prefix_not_admin(self):
bot = self._make_bot()
msg = Message(raw="", prefix=None, nick=None, command="PRIVMSG", params=[])
assert bot._is_admin(msg) is False
def test_oper_is_admin(self):
bot = self._make_bot(opers={"alice!~alice@host"})
msg = self._msg("alice!~alice@host")
assert bot._is_admin(msg) is True
def test_hostmask_pattern_match(self):
bot = self._make_bot(admins=["*!~user@trusted.host"])
msg = self._msg("bob!~user@trusted.host")
assert bot._is_admin(msg) is True
def test_hostmask_pattern_no_match(self):
bot = self._make_bot(admins=["*!~user@trusted.host"])
msg = self._msg("bob!~other@untrusted.host")
assert bot._is_admin(msg) is False
def test_wildcard_pattern(self):
bot = self._make_bot(admins=["ops!*@*.ops.net"])
msg = self._msg("ops!~ident@server.ops.net")
assert bot._is_admin(msg) is True
def test_no_patterns_no_opers(self):
bot = self._make_bot()
msg = self._msg("nobody!~user@host")
assert bot._is_admin(msg) is False