diff --git a/config/derp.toml.example b/config/derp.toml.example index 45f2069..d4e2b9d 100644 --- a/config/derp.toml.example +++ b/config/derp.toml.example @@ -6,11 +6,19 @@ nick = "derp" user = "derp" realname = "derp IRC bot" password = "" +# sasl_user = "account" # SASL PLAIN username (optional) +# sasl_pass = "secret" # SASL PLAIN password (optional) [bot] prefix = "!" channels = ["#test"] plugins_dir = "plugins" +# rate_limit = 2.0 # Messages per second (default: 2.0) +# rate_burst = 5 # Burst capacity (default: 5) +# admins = [ # Hostmask patterns (fnmatch), IRCOPs auto-detected +# "*!~user@trusted.host", +# "ops!*@*.ops.net", +# ] [logging] level = "info" diff --git a/docker-compose.yml b/docker-compose.yml index 85995d1..85efcf6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,5 +8,4 @@ services: volumes: - ./config/derp.toml:/app/config/derp.toml:ro,Z - ./plugins:/app/plugins:ro,Z - - ./profile:/app/profile:Z - command: ["--verbose", "--cprofile", "/app/profile/derp.prof"] + command: ["--verbose"] diff --git a/docs/CHEATSHEET.md b/docs/CHEATSHEET.md index 4622a59..560b876 100644 --- a/docs/CHEATSHEET.md +++ b/docs/CHEATSHEET.md @@ -13,6 +13,24 @@ derp -v # Verbose/debug mode derp --cprofile # Profile to derp.prof ``` +## SASL Authentication + +```toml +# In config/derp.toml +[server] +sasl_user = "account" +sasl_pass = "password" +``` + +## Rate Limiting + +```toml +# In config/derp.toml (defaults shown) +[bot] +rate_limit = 2.0 # Messages per second +rate_burst = 5 # Burst capacity +``` + ## Container ```bash @@ -35,13 +53,28 @@ make logs # Follow logs !h # Shorthand (any unambiguous prefix works) ``` -## Plugin Management +## Admin + +``` +!whoami # Show your hostmask + admin status +!admins # Show admin patterns + detected opers (admin) +``` + +```toml +# config/derp.toml +[bot] +admins = ["*!~user@trusted.host", "ops!*@*.ops.net"] +``` + +IRC operators are auto-detected via WHO. Hostmask patterns use fnmatch. + +## Plugin Management (admin) ``` !plugins # List loaded plugins -!load # Hot-load a plugin -!reload # Reload a changed plugin -!unload # Remove a plugin +!load # Hot-load a plugin (admin) +!reload # Reload a changed plugin (admin) +!unload # Remove a plugin (admin) ``` ## OSINT diff --git a/docs/USAGE.md b/docs/USAGE.md index 2b502b3..c749372 100644 --- a/docs/USAGE.md +++ b/docs/USAGE.md @@ -33,11 +33,16 @@ nick = "derp" # Bot nickname user = "derp" # Username (ident) realname = "derp IRC bot" # Real name field password = "" # Server password (optional) +sasl_user = "" # SASL PLAIN username (optional) +sasl_pass = "" # SASL PLAIN password (optional) [bot] prefix = "!" # Command prefix character channels = ["#test"] # Channels to join on connect plugins_dir = "plugins" # Plugin directory path +rate_limit = 2.0 # Max messages per second (default: 2.0) +rate_burst = 5 # Burst capacity (default: 5) +admins = [] # Hostmask patterns (fnmatch), IRCOPs auto-detected [logging] level = "info" # Logging level: debug, info, warning, error @@ -55,9 +60,11 @@ level = "info" # Logging level: debug, info, warning, error | `!uptime` | Show how long the bot has been running | | `!echo ` | Echo back text (example plugin) | | `!cert [...]` | Lookup CT logs for up to 5 domains | -| `!load ` | Hot-load a plugin from the plugins directory | -| `!reload ` | Reload a plugin, picking up file changes | -| `!unload ` | Unload a plugin, removing its handlers | +| `!whoami` | Show your hostmask and admin status | +| `!load ` | Hot-load a plugin (admin) | +| `!reload ` | Reload a plugin (admin) | +| `!unload ` | Unload a plugin (admin) | +| `!admins` | Show admin patterns and detected opers (admin) | | `!plugins` | List loaded plugins with handler counts | | `!dns [type]` | DNS lookup (A, AAAA, MX, NS, TXT, CNAME, PTR, SOA) | | `!encode ` | Encode text (b64, hex, url, rot13) | @@ -114,6 +121,40 @@ broken.test -- error: timeout - crt.sh can be slow; the bot confirms receipt before querying - Live cert check runs only when expired CT entries exist +## Admin System + +Commands marked as `admin` require elevated permissions. Admin access is +granted via: + +1. **IRC operator status** -- detected automatically via `WHO` on channel join +2. **Hostmask patterns** -- configured in `[bot] admins`, fnmatch-style + +```toml +[bot] +admins = [ + "*!~user@trusted.host", + "ops!*@*.ops.net", +] +``` + +Empty by default -- only IRC operators get admin access unless patterns +are configured. + +| Command | Description | +|---------|-------------| +| `!whoami` | Show your hostmask and admin status | +| `!admins` | Show configured patterns and detected opers (admin) | + +Admin-restricted commands: `!load`, `!reload`, `!unload`, `!admins`. + +### Writing Admin Commands + +```python +@command("dangerous", help="Admin-only action", admin=True) +async def cmd_dangerous(bot, message): + ... +``` + ## Plugin Management Plugins can be loaded, unloaded, and reloaded at runtime without diff --git a/plugins/core.py b/plugins/core.py index d24b00f..85e4816 100644 --- a/plugins/core.py +++ b/plugins/core.py @@ -76,7 +76,7 @@ async def cmd_uptime(bot, message): await bot.reply(message, f"up {' '.join(parts)}") -@command("load", help="Hot-load a plugin: !load ") +@command("load", help="Hot-load a plugin: !load ", admin=True) async def cmd_load(bot, message): """Load a new plugin from the plugins directory.""" parts = message.text.split(None, 2) @@ -91,7 +91,7 @@ async def cmd_load(bot, message): await bot.reply(message, f"Failed to load plugin: {reason}") -@command("reload", help="Reload a plugin: !reload ") +@command("reload", help="Reload a plugin: !reload ", admin=True) async def cmd_reload(bot, message): """Unload and reload a plugin, picking up file changes.""" parts = message.text.split(None, 2) @@ -106,7 +106,7 @@ async def cmd_reload(bot, message): await bot.reply(message, f"Failed to reload plugin: {reason}") -@command("unload", help="Unload a plugin: !unload ") +@command("unload", help="Unload a plugin: !unload ", admin=True) async def cmd_unload(bot, message): """Unload a plugin, removing all its handlers.""" parts = message.text.split(None, 2) @@ -132,3 +132,34 @@ async def cmd_plugins(bot, message): counts[handler.plugin] += 1 parts = [f"{name} ({counts[name]})" for name in sorted(counts)] await bot.reply(message, f"Plugins: {', '.join(parts)}") + + +@command("whoami", help="Show your hostmask and admin status") +async def cmd_whoami(bot, message): + """Display the sender's hostmask and permission level.""" + prefix = message.prefix or "unknown" + is_admin = bot._is_admin(message) + is_oper = message.prefix in bot._opers if message.prefix else False + tags = [] + if is_admin: + tags.append("admin") + else: + tags.append("user") + if is_oper: + tags.append("IRCOP") + await bot.reply(message, f"{prefix} [{', '.join(tags)}]") + + +@command("admins", help="Show configured admin patterns and detected opers", admin=True) +async def cmd_admins(bot, message): + """Display admin hostmask patterns and known IRC operators.""" + parts = [] + if bot._admins: + parts.append(f"Patterns: {', '.join(bot._admins)}") + else: + parts.append("Patterns: (none)") + if bot._opers: + parts.append(f"Opers: {', '.join(sorted(bot._opers))}") + else: + parts.append("Opers: (none)") + await bot.reply(message, " | ".join(parts)) diff --git a/src/derp/bot.py b/src/derp/bot.py index df5545f..f7d4788 100644 --- a/src/derp/bot.py +++ b/src/derp/bot.py @@ -3,10 +3,14 @@ from __future__ import annotations import asyncio +import base64 +import fnmatch import logging import time +from datetime import datetime, timezone from pathlib import Path +from derp import __version__ from derp.irc import IRCConnection, Message, format_msg, parse from derp.plugin import Handler, PluginRegistry @@ -16,6 +20,30 @@ RECONNECT_DELAY = 30 _AMBIGUOUS = object() # sentinel for ambiguous prefix matches +class _TokenBucket: + """Token bucket rate limiter for outgoing messages.""" + + def __init__(self, rate: float, burst: int) -> None: + self._rate = rate + self._burst = burst + self._tokens = float(burst) + self._last = time.monotonic() + self._lock = asyncio.Lock() + + async def acquire(self) -> None: + """Wait until a token is available.""" + async with self._lock: + now = time.monotonic() + self._tokens = min(self._burst, self._tokens + (now - self._last) * self._rate) + self._last = now + if self._tokens < 1: + wait = (1 - self._tokens) / self._rate + await asyncio.sleep(wait) + self._tokens = 0 + else: + self._tokens -= 1 + + class Bot: """IRC bot: ties connection, config, and plugins together.""" @@ -33,6 +61,14 @@ class Bot: self._running = False self._started: float = time.monotonic() self._tasks: set[asyncio.Task] = set() + self._admins: list[str] = config.get("bot", {}).get("admins", []) + self._opers: set[str] = set() # hostmasks of known IRC operators + # Rate limiter: default 2 msg/sec, burst of 5 + rate_cfg = config.get("bot", {}) + self._bucket = _TokenBucket( + rate=rate_cfg.get("rate_limit", 2.0), + burst=rate_cfg.get("rate_burst", 5), + ) async def start(self) -> None: """Connect, register, join channels, and enter the main loop.""" @@ -56,13 +92,71 @@ class Bot: await self.conn.close() async def _register(self) -> None: - """Send NICK/USER registration to the server.""" + """Send NICK/USER registration, with optional SASL PLAIN.""" srv = self.config["server"] + sasl_user = srv.get("sasl_user", "") + sasl_pass = srv.get("sasl_pass", "") + + if sasl_user and sasl_pass: + await self._sasl_auth(sasl_user, sasl_pass) + if srv.get("password"): await self.conn.send(format_msg("PASS", srv["password"])) await self.conn.send(format_msg("NICK", self.nick)) await self.conn.send(format_msg("USER", srv["user"], "0", "*", srv["realname"])) + async def _sasl_auth(self, user: str, password: str) -> None: + """Perform SASL PLAIN authentication during registration.""" + await self.conn.send("CAP REQ :sasl") + + # Wait for CAP ACK or NAK + while True: + line = await self.conn.readline() + if line is None: + log.error("connection closed during SASL negotiation") + return + msg = parse(line) + if msg.command == "CAP" and len(msg.params) >= 3: + sub = msg.params[1].upper() + if sub == "ACK" and "sasl" in msg.params[-1].lower(): + break + if sub == "NAK": + log.warning("server rejected SASL capability") + await self.conn.send("CAP END") + return + + # Send AUTHENTICATE PLAIN + await self.conn.send("AUTHENTICATE PLAIN") + + # Wait for AUTHENTICATE + + while True: + line = await self.conn.readline() + if line is None: + return + msg = parse(line) + if msg.command == "AUTHENTICATE" and msg.params and msg.params[0] == "+": + break + + # Send credentials: base64(user\0user\0pass) + payload = f"{user}\x00{user}\x00{password}" + encoded = base64.b64encode(payload.encode("utf-8")).decode("ascii") + await self.conn.send(f"AUTHENTICATE {encoded}") + + # Wait for 903 (success) or 904 (failure) + while True: + line = await self.conn.readline() + if line is None: + return + msg = parse(line) + if msg.command == "903": + log.info("SASL authentication successful") + break + if msg.command in ("904", "905", "906"): + log.error("SASL authentication failed: %s", msg.params[-1] if msg.params else "") + break + + await self.conn.send("CAP END") + async def _loop(self) -> None: """Read and dispatch messages until disconnect.""" while self._running: @@ -80,11 +174,22 @@ class Bot: await self.conn.send(format_msg("PONG", msg.params[0] if msg.params else "")) return - # RPL_WELCOME (001) — join channels + # RPL_WELCOME (001) — join channels and WHO for oper detection if msg.command == "001": self.nick = msg.params[0] if msg.params else self.nick for channel in self.config["bot"]["channels"]: await self.join(channel) + await self.conn.send(format_msg("WHO", channel)) + + # RPL_WHOREPLY (352) — detect IRC operators + if msg.command == "352" and len(msg.params) >= 7: + _chan, user, host, _server, nick, flags = msg.params[1:7] + if "*" in flags: + self._opers.add(f"{nick}!{user}@{host}") + + # QUIT — remove departed nicks from oper set + if msg.command == "QUIT" and msg.prefix: + self._opers.discard(msg.prefix) # Nick already in use (433) — append underscore if msg.command == "433": @@ -92,6 +197,11 @@ class Bot: await self.conn.send(format_msg("NICK", self.nick)) return + # CTCP queries (PRIVMSG with \x01...\x01 wrapping) + if msg.command == "PRIVMSG" and msg.text and msg.text.startswith("\x01"): + self._spawn(self._handle_ctcp(msg), name="ctcp") + return + # Dispatch to event handlers (fire-and-forget) event_type = msg.command for handler in self.registry.events.get(event_type, []): @@ -101,6 +211,44 @@ class Bot: if msg.command == "PRIVMSG" and msg.text: self._dispatch_command(msg) + async def _handle_ctcp(self, msg: Message) -> None: + """Respond to CTCP VERSION, TIME, and PING queries.""" + text = msg.text.strip("\x01") + parts = text.split(None, 1) + ctcp_cmd = parts[0].upper() if parts else "" + ctcp_arg = parts[1] if len(parts) > 1 else "" + + if not msg.nick: + return + + if ctcp_cmd == "VERSION": + reply = f"\x01VERSION derp {__version__}\x01" + elif ctcp_cmd == "TIME": + now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC") + reply = f"\x01TIME {now}\x01" + elif ctcp_cmd == "PING": + reply = f"\x01PING {ctcp_arg}\x01" + else: + return + + await self.conn.send(format_msg("NOTICE", msg.nick, reply)) + log.debug("CTCP %s reply to %s", ctcp_cmd, msg.nick) + + def _is_admin(self, msg: Message) -> bool: + """Check if the message sender is a bot admin. + + Returns True if the sender is a known IRC operator or matches + a configured hostmask pattern (fnmatch-style). + """ + if not msg.prefix: + return False + if msg.prefix in self._opers: + return True + for pattern in self._admins: + if fnmatch.fnmatch(msg.prefix, pattern): + return True + return False + def _dispatch_command(self, msg: Message) -> None: """Check if a PRIVMSG is a bot command and spawn it.""" text = msg.text @@ -119,6 +267,11 @@ class Bot: name=f"cmd:{cmd_name}:ambiguous") return + if handler.admin and not self._is_admin(msg): + deny = f"Permission denied: {self.prefix}{cmd_name} requires admin" + self._spawn(self.reply(msg, deny), name=f"cmd:{cmd_name}:denied") + return + self._spawn(self._run_command(handler, cmd_name, msg), name=f"cmd:{cmd_name}") async def _run_command(self, handler: Handler, cmd_name: str, msg: Message) -> None: @@ -157,8 +310,9 @@ class Bot: # -- Public API for plugins -- async def send(self, target: str, text: str) -> None: - """Send a PRIVMSG to a target (channel or nick).""" + """Send a PRIVMSG to a target (channel or nick), rate-limited.""" for line in text.split("\n"): + await self._bucket.acquire() await self.conn.send(format_msg("PRIVMSG", target, line)) async def reply(self, msg: Message, text: str) -> None: diff --git a/src/derp/config.py b/src/derp/config.py index 5271db9..c484faf 100644 --- a/src/derp/config.py +++ b/src/derp/config.py @@ -14,11 +14,16 @@ DEFAULTS: dict = { "user": "derp", "realname": "derp IRC bot", "password": "", + "sasl_user": "", + "sasl_pass": "", }, "bot": { "prefix": "!", "channels": ["#test"], "plugins_dir": "plugins", + "rate_limit": 2.0, + "rate_burst": 5, + "admins": [], }, "logging": { "level": "info", diff --git a/src/derp/plugin.py b/src/derp/plugin.py index 3cc3ba6..6b20222 100644 --- a/src/derp/plugin.py +++ b/src/derp/plugin.py @@ -21,9 +21,10 @@ class Handler: callback: Callable help: str = "" plugin: str = "" + admin: bool = False -def command(name: str, help: str = "") -> Callable: +def command(name: str, help: str = "", admin: bool = False) -> Callable: """Decorator to register an async function as a bot command. Usage:: @@ -31,11 +32,16 @@ def command(name: str, help: str = "") -> Callable: @command("ping", help="Check if the bot is alive") async def cmd_ping(bot, message): await bot.reply(message, "pong") + + @command("reload", help="Reload a plugin", admin=True) + async def cmd_reload(bot, message): + ... """ def decorator(func: Callable) -> Callable: func._derp_command = name # type: ignore[attr-defined] func._derp_help = help # type: ignore[attr-defined] + func._derp_admin = admin # type: ignore[attr-defined] return func return decorator @@ -68,11 +74,13 @@ class PluginRegistry: self._paths: dict[str, Path] = {} def register_command(self, name: str, callback: Callable, help: str = "", - plugin: str = "") -> None: + plugin: str = "", admin: bool = False) -> None: """Register a command handler.""" if name in self.commands: log.warning("command '%s' already registered, overwriting", name) - self.commands[name] = Handler(name=name, callback=callback, help=help, plugin=plugin) + self.commands[name] = Handler( + name=name, callback=callback, help=help, plugin=plugin, admin=admin, + ) log.debug("registered command: %s (%s)", name, plugin) def register_event(self, event_type: str, callback: Callable, plugin: str = "") -> None: @@ -93,6 +101,7 @@ class PluginRegistry: obj._derp_command, obj, help=getattr(obj, "_derp_help", ""), plugin=plugin_name, + admin=getattr(obj, "_derp_admin", False), ) count += 1 if hasattr(obj, "_derp_event"): diff --git a/tests/test_plugin.py b/tests/test_plugin.py index 07f779f..e739f80 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -4,6 +4,7 @@ import textwrap from pathlib import Path from derp.bot import _AMBIGUOUS, Bot +from derp.irc import Message from derp.plugin import PluginRegistry, command, event @@ -33,6 +34,22 @@ class TestDecorators: assert handler._derp_event == "PRIVMSG" + def test_command_decorator_admin(self): + @command("secret", help="admin only", admin=True) + async def handler(bot, msg): + pass + + assert handler._derp_command == "secret" + assert handler._derp_admin is True + + def test_command_decorator_admin_default(self): + @command("public", help="everyone") + async def handler(bot, msg): + pass + + assert getattr(handler, "_derp_admin", False) is False + + class TestRegistry: """Test the plugin registry.""" @@ -46,6 +63,24 @@ class TestRegistry: assert "test" in registry.commands assert registry.commands["test"].help == "test help" + def test_register_command_admin(self): + registry = PluginRegistry() + + async def handler(bot, msg): + pass + + registry.register_command("secret", handler, help="admin", admin=True) + assert registry.commands["secret"].admin is True + + def test_register_command_admin_default(self): + registry = PluginRegistry() + + async def handler(bot, msg): + pass + + registry.register_command("public", handler, help="public") + assert registry.commands["public"].admin is False + def test_register_event(self): registry = PluginRegistry() @@ -150,6 +185,27 @@ class TestRegistry: count = registry.load_plugin(plugin_file) assert count == 3 + def test_load_plugin_admin_flag(self, tmp_path: Path): + plugin_code = textwrap.dedent("""\ + from derp.plugin import command + + @command("secret", help="Admin only", admin=True) + async def cmd_secret(bot, msg): + pass + + @command("public", help="Everyone") + async def cmd_public(bot, msg): + pass + """) + plugin_file = tmp_path / "mixed.py" + plugin_file.write_text(plugin_code) + + registry = PluginRegistry() + registry.load_plugin(plugin_file) + + assert registry.commands["secret"].admin is True + assert registry.commands["public"].admin is False + def test_load_plugin_stores_path(self, tmp_path: Path): plugin_file = tmp_path / "pathed.py" plugin_file.write_text(textwrap.dedent("""\ @@ -366,3 +422,57 @@ class TestPrefixMatch: handler = bot._resolve_command("v") assert handler is not None assert handler.name == "version" + + +class TestIsAdmin: + """Test admin permission checks.""" + + @staticmethod + def _make_bot(admins: list[str] | None = None, opers: set[str] | None = None) -> Bot: + """Create a Bot with optional admin patterns and oper set.""" + config = { + "server": {"host": "localhost", "port": 6667, "tls": False, + "nick": "test", "user": "test", "realname": "test"}, + "bot": {"prefix": "!", "channels": [], "plugins_dir": "plugins", + "admins": admins or []}, + } + bot = Bot(config, PluginRegistry()) + if opers: + bot._opers = opers + return bot + + @staticmethod + def _msg(prefix: str) -> Message: + """Create a minimal Message with a given prefix.""" + return Message(raw="", prefix=prefix, nick=prefix.split("!")[0], + command="PRIVMSG", params=["#test", "!test"]) + + def test_no_prefix_not_admin(self): + bot = self._make_bot() + msg = Message(raw="", prefix=None, nick=None, command="PRIVMSG", params=[]) + assert bot._is_admin(msg) is False + + def test_oper_is_admin(self): + bot = self._make_bot(opers={"alice!~alice@host"}) + msg = self._msg("alice!~alice@host") + assert bot._is_admin(msg) is True + + def test_hostmask_pattern_match(self): + bot = self._make_bot(admins=["*!~user@trusted.host"]) + msg = self._msg("bob!~user@trusted.host") + assert bot._is_admin(msg) is True + + def test_hostmask_pattern_no_match(self): + bot = self._make_bot(admins=["*!~user@trusted.host"]) + msg = self._msg("bob!~other@untrusted.host") + assert bot._is_admin(msg) is False + + def test_wildcard_pattern(self): + bot = self._make_bot(admins=["ops!*@*.ops.net"]) + msg = self._msg("ops!~ident@server.ops.net") + assert bot._is_admin(msg) is True + + def test_no_patterns_no_opers(self): + bot = self._make_bot() + msg = self._msg("nobody!~user@host") + assert bot._is_admin(msg) is False