diff --git a/docs/CHEATSHEET.md b/docs/CHEATSHEET.md index 0d10374..df9167b 100644 --- a/docs/CHEATSHEET.md +++ b/docs/CHEATSHEET.md @@ -18,9 +18,15 @@ derp -v # Verbose/debug mode !ping # Pong !help # List commands !help # Command help +!help # Plugin description + commands !version # Bot version !echo # Echo text back !cert # CT log lookup (max 5 domains) +!load # Hot-load a plugin +!reload # Reload a changed plugin +!unload # Remove a plugin +!plugins # List loaded plugins +!h # Shorthand (any unambiguous prefix works) ``` ## Plugin Template diff --git a/docs/USAGE.md b/docs/USAGE.md index 1a5610d..3c46a2a 100644 --- a/docs/USAGE.md +++ b/docs/USAGE.md @@ -49,9 +49,26 @@ level = "info" # Logging level: debug, info, warning, error | `!ping` | Bot responds with "pong" | | `!help` | List all available commands | | `!help ` | Show help for a specific command | +| `!help ` | Show plugin description and its commands | | `!version` | Show bot version | | `!echo ` | Echo back text (example plugin) | | `!cert [...]` | Lookup CT logs for up to 5 domains | +| `!load ` | Hot-load a plugin from the plugins directory | +| `!reload ` | Reload a plugin, picking up file changes | +| `!unload ` | Unload a plugin, removing its handlers | +| `!plugins` | List loaded plugins with handler counts | + +### Command Shorthand + +Commands can be abbreviated to any unambiguous prefix: + +``` +!h -> !help (unique match) +!pi -> !ping (unique match) +!p -> error: ambiguous (ping, plugins) +``` + +Exact matches always take priority over prefix matches. ### `!cert` -- Certificate Transparency Lookup @@ -76,6 +93,21 @@ broken.test -- error: timeout - crt.sh can be slow; the bot confirms receipt before querying - Live cert check runs only when expired CT entries exist +## Plugin Management + +Plugins can be loaded, unloaded, and reloaded at runtime without +restarting the bot. + +``` +!load crtsh # Hot-load a new plugin from plugins/ +!reload crtsh # Reload a changed plugin +!unload crtsh # Remove a plugin and all its handlers +!plugins # List loaded plugins with handler counts +``` + +The `core` plugin cannot be unloaded (prevents losing `!load`/`!reload`), +but it can be reloaded. + ## Writing Plugins Create a `.py` file in the `plugins/` directory: diff --git a/plugins/core.py b/plugins/core.py index 25d0d3a..8bf868d 100644 --- a/plugins/core.py +++ b/plugins/core.py @@ -1,4 +1,6 @@ -"""Core plugin: ping, help, version.""" +"""Core plugin: ping, help, version, plugin management.""" + +from collections import Counter from derp import __version__ from derp.plugin import command @@ -10,22 +12,37 @@ async def cmd_ping(bot, message): await bot.reply(message, "pong") -@command("help", help="List commands or show command help") +@command("help", help="List commands or show command/plugin help") async def cmd_help(bot, message): - """Show available commands or help for a specific command. + """Show available commands, or help for a specific command or plugin. - Usage: !help [command] + Usage: !help [command|plugin] """ parts = message.text.split(None, 2) if len(parts) > 1: - # Help for a specific command - cmd_name = parts[1].lower().lstrip(bot.prefix) - handler = bot.registry.commands.get(cmd_name) + name = parts[1].lower().lstrip(bot.prefix) + + # Check command first + handler = bot.registry.commands.get(name) if handler: help_text = handler.help or "No help available." - await bot.reply(message, f"{bot.prefix}{cmd_name} -- {help_text}") - else: - await bot.reply(message, f"Unknown command: {cmd_name}") + await bot.reply(message, f"{bot.prefix}{name} -- {help_text}") + return + + # Check plugin + module = bot.registry._modules.get(name) + if module: + desc = (getattr(module, "__doc__", "") or "").split("\n")[0].strip() + cmds = sorted( + k for k, v in bot.registry.commands.items() if v.plugin == name + ) + lines = [f"{name} -- {desc}" if desc else name] + if cmds: + lines.append(f"Commands: {', '.join(bot.prefix + c for c in cmds)}") + await bot.reply(message, " | ".join(lines)) + return + + await bot.reply(message, f"Unknown command or plugin: {name}") return # List all commands @@ -37,3 +54,61 @@ async def cmd_help(bot, message): async def cmd_version(bot, message): """Report the running version.""" await bot.reply(message, f"derp {__version__}") + + +@command("load", help="Hot-load a plugin: !load ") +async def cmd_load(bot, message): + """Load a new plugin from the plugins directory.""" + parts = message.text.split(None, 2) + if len(parts) < 2: + await bot.reply(message, "Usage: !load ") + return + name = parts[1].lower() + ok, reason = bot.load_plugin(name) + if ok: + await bot.reply(message, f"Loaded plugin: {name} ({reason})") + else: + await bot.reply(message, f"Failed to load plugin: {reason}") + + +@command("reload", help="Reload a plugin: !reload ") +async def cmd_reload(bot, message): + """Unload and reload a plugin, picking up file changes.""" + parts = message.text.split(None, 2) + if len(parts) < 2: + await bot.reply(message, "Usage: !reload ") + return + name = parts[1].lower() + ok, reason = bot.reload_plugin(name) + if ok: + await bot.reply(message, f"Reloaded plugin: {name}") + else: + await bot.reply(message, f"Failed to reload plugin: {reason}") + + +@command("unload", help="Unload a plugin: !unload ") +async def cmd_unload(bot, message): + """Unload a plugin, removing all its handlers.""" + parts = message.text.split(None, 2) + if len(parts) < 2: + await bot.reply(message, "Usage: !unload ") + return + name = parts[1].lower() + ok, reason = bot.unload_plugin(name) + if ok: + await bot.reply(message, f"Unloaded plugin: {name}") + else: + await bot.reply(message, f"Failed to unload plugin: {reason}") + + +@command("plugins", help="List loaded plugins") +async def cmd_plugins(bot, message): + """List all loaded plugins with handler counts.""" + counts: Counter[str] = Counter() + for handler in bot.registry.commands.values(): + counts[handler.plugin] += 1 + for handlers in bot.registry.events.values(): + for handler in handlers: + counts[handler.plugin] += 1 + parts = [f"{name} ({counts[name]})" for name in sorted(counts)] + await bot.reply(message, f"Plugins: {', '.join(parts)}") diff --git a/src/derp/bot.py b/src/derp/bot.py index cde2a18..d81cad5 100644 --- a/src/derp/bot.py +++ b/src/derp/bot.py @@ -7,11 +7,12 @@ import logging from pathlib import Path from derp.irc import IRCConnection, Message, format_msg, parse -from derp.plugin import PluginRegistry +from derp.plugin import Handler, PluginRegistry log = logging.getLogger(__name__) RECONNECT_DELAY = 30 +_AMBIGUOUS = object() # sentinel for ambiguous prefix matches class Bot: @@ -108,15 +109,39 @@ class Bot: parts = text[len(self.prefix):].split(None, 1) cmd_name = parts[0].lower() if parts else "" - handler = self.registry.commands.get(cmd_name) + handler = self._resolve_command(cmd_name) if handler is None: return + if handler is _AMBIGUOUS: + matches = [k for k in self.registry.commands if k.startswith(cmd_name)] + names = ", ".join(self.prefix + m for m in sorted(matches)) + await self.reply(msg, f"Ambiguous command '{self.prefix}{cmd_name}': {names}") + return try: await handler.callback(self, msg) except Exception: log.exception("error in command handler '%s'", cmd_name) + def _resolve_command(self, name: str) -> Handler | None: + """Resolve a command name, supporting unambiguous prefix matching. + + Returns the Handler on exact or unique prefix match, the sentinel + ``_AMBIGUOUS`` if multiple commands match, or None if nothing matches. + """ + # Exact match takes priority + handler = self.registry.commands.get(name) + if handler is not None: + return handler + + # Prefix match + matches = [v for k, v in self.registry.commands.items() if k.startswith(name)] + if len(matches) == 1: + return matches[0] + if len(matches) > 1: + return _AMBIGUOUS # type: ignore[return-value] + return None + # -- Public API for plugins -- async def send(self, target: str, text: str) -> None: @@ -158,3 +183,32 @@ class Bot: plugins_dir = self.config["bot"].get("plugins_dir", "plugins") path = Path(plugins_dir) self.registry.load_directory(path) + + @property + def plugins_dir(self) -> Path: + """Resolved path to the plugins directory.""" + return Path(self.config["bot"].get("plugins_dir", "plugins")) + + def load_plugin(self, name: str) -> tuple[bool, str]: + """Hot-load a new plugin by name from the plugins directory.""" + if name in self.registry._modules: + return False, f"plugin already loaded: {name}" + path = self.plugins_dir / f"{name}.py" + if not path.is_file(): + return False, f"{name}.py not found" + count = self.registry.load_plugin(path) + if count < 0: + return False, f"failed to load {name}" + return True, f"{count} handlers" + + def reload_plugin(self, name: str) -> tuple[bool, str]: + """Reload a plugin, picking up any file changes.""" + return self.registry.reload_plugin(name) + + def unload_plugin(self, name: str) -> tuple[bool, str]: + """Unload a plugin, removing all its handlers.""" + if self.registry.unload_plugin(name): + return True, "" + if name == "core": + return False, "cannot unload core" + return False, f"plugin not loaded: {name}" diff --git a/src/derp/plugin.py b/src/derp/plugin.py index 82ca262..3cc3ba6 100644 --- a/src/derp/plugin.py +++ b/src/derp/plugin.py @@ -2,9 +2,10 @@ from __future__ import annotations -import importlib.util import inspect import logging +import sys +import types from dataclasses import dataclass from pathlib import Path from typing import Any, Callable @@ -64,6 +65,7 @@ class PluginRegistry: self.commands: dict[str, Handler] = {} self.events: dict[str, list[Handler]] = {} self._modules: dict[str, Any] = {} + self._paths: dict[str, Path] = {} def register_command(self, name: str, callback: Callable, help: str = "", plugin: str = "") -> None: @@ -98,23 +100,30 @@ class PluginRegistry: count += 1 return count - def load_plugin(self, path: Path) -> None: - """Load a single plugin from a .py file.""" + def load_plugin(self, path: Path) -> int: + """Load a single plugin from a .py file. + + Returns the number of handlers registered, or -1 on failure. + """ plugin_name = path.stem if plugin_name.startswith("_"): - return + return -1 try: - spec = importlib.util.spec_from_file_location(f"derp.plugins.{plugin_name}", path) - if spec is None or spec.loader is None: - log.error("failed to create spec for %s", path) - return - module = importlib.util.module_from_spec(spec) - spec.loader.exec_module(module) + mod_name = f"derp.plugins.{plugin_name}" + source = path.read_text(encoding="utf-8") + code = compile(source, str(path), "exec") + module = types.ModuleType(mod_name) + module.__file__ = str(path) + sys.modules[mod_name] = module + exec(code, module.__dict__) # noqa: S102 count = self._scan_module(module, plugin_name) self._modules[plugin_name] = module + self._paths[plugin_name] = path.resolve() log.info("loaded plugin: %s (%d handlers)", plugin_name, count) + return count except Exception: log.exception("failed to load plugin: %s", path) + return -1 def load_directory(self, dir_path: Path) -> None: """Load all .py plugin files from a directory.""" @@ -123,3 +132,73 @@ class PluginRegistry: return for path in sorted(dir_path.glob("*.py")): self.load_plugin(path) + + def unload_plugin(self, name: str) -> bool: + """Unload a plugin, removing all its handlers. + + Returns True if the plugin was found and unloaded, False otherwise. + Refuses to unload the ``core`` plugin. + """ + if name == "core": + log.warning("refusing to unload core plugin") + return False + if name not in self._modules: + return False + + # Remove commands belonging to this plugin + self.commands = { + k: v for k, v in self.commands.items() if v.plugin != name + } + + # Remove event handlers belonging to this plugin + for event_type in list(self.events): + self.events[event_type] = [ + h for h in self.events[event_type] if h.plugin != name + ] + if not self.events[event_type]: + del self.events[event_type] + + # Clean up module references + mod_name = f"derp.plugins.{name}" + sys.modules.pop(mod_name, None) + del self._modules[name] + del self._paths[name] + log.info("unloaded plugin: %s", name) + return True + + def reload_plugin(self, name: str) -> tuple[bool, str]: + """Reload a plugin from its original path. + + Returns ``(True, "")`` on success, ``(False, reason)`` on failure. + """ + if name not in self._paths: + return False, f"plugin not loaded: {name}" + + path = self._paths[name] + + # Core can be reloaded (unload guard is bypassed) + if name == "core": + self._force_unload(name) + else: + self.unload_plugin(name) + + count = self.load_plugin(path) + if count < 0: + return False, f"failed to load {name}" + return True, "" + + def _force_unload(self, name: str) -> None: + """Unload a plugin without the core guard (used by reload).""" + self.commands = { + k: v for k, v in self.commands.items() if v.plugin != name + } + for event_type in list(self.events): + self.events[event_type] = [ + h for h in self.events[event_type] if h.plugin != name + ] + if not self.events[event_type]: + del self.events[event_type] + mod_name = f"derp.plugins.{name}" + sys.modules.pop(mod_name, None) + self._modules.pop(name, None) + self._paths.pop(name, None) diff --git a/tests/test_plugin.py b/tests/test_plugin.py index a43b564..07f779f 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -3,6 +3,7 @@ import textwrap from pathlib import Path +from derp.bot import _AMBIGUOUS, Bot from derp.plugin import PluginRegistry, command, event @@ -126,3 +127,242 @@ class TestRegistry: registry = PluginRegistry() registry.load_directory(tmp_path / "nonexistent") assert len(registry.commands) == 0 + + def test_load_plugin_returns_count(self, tmp_path: Path): + plugin_file = tmp_path / "counting.py" + plugin_file.write_text(textwrap.dedent("""\ + from derp.plugin import command, event + + @command("one", help="first") + async def cmd_one(bot, msg): + pass + + @command("two", help="second") + async def cmd_two(bot, msg): + pass + + @event("JOIN") + async def on_join(bot, msg): + pass + """)) + + registry = PluginRegistry() + count = registry.load_plugin(plugin_file) + assert count == 3 + + def test_load_plugin_stores_path(self, tmp_path: Path): + plugin_file = tmp_path / "pathed.py" + plugin_file.write_text(textwrap.dedent("""\ + from derp.plugin import command + + @command("pathed") + async def cmd(bot, msg): + pass + """)) + + registry = PluginRegistry() + registry.load_plugin(plugin_file) + assert "pathed" in registry._paths + assert registry._paths["pathed"] == plugin_file.resolve() + + +class TestHotReload: + """Test hot-load, unload, and reload of plugins.""" + + def _write_plugin(self, tmp_path: Path, name: str, code: str) -> Path: + """Write a plugin file and return its path.""" + path = tmp_path / f"{name}.py" + path.write_text(textwrap.dedent(code)) + return path + + def test_unload_removes_commands(self, tmp_path: Path): + registry = PluginRegistry() + self._write_plugin(tmp_path, "greet", """\ + from derp.plugin import command + + @command("hello") + async def cmd(bot, msg): + pass + """) + registry.load_plugin(tmp_path / "greet.py") + assert "hello" in registry.commands + + result = registry.unload_plugin("greet") + assert result is True + assert "hello" not in registry.commands + assert "greet" not in registry._modules + assert "greet" not in registry._paths + + def test_unload_removes_events(self, tmp_path: Path): + registry = PluginRegistry() + self._write_plugin(tmp_path, "evplug", """\ + from derp.plugin import event + + @event("JOIN") + async def on_join(bot, msg): + pass + """) + registry.load_plugin(tmp_path / "evplug.py") + assert "JOIN" in registry.events + assert len(registry.events["JOIN"]) == 1 + + registry.unload_plugin("evplug") + assert "JOIN" not in registry.events + + def test_unload_unknown_returns_false(self): + registry = PluginRegistry() + assert registry.unload_plugin("nonexistent") is False + + def test_unload_core_refused(self, tmp_path: Path): + registry = PluginRegistry() + self._write_plugin(tmp_path, "core", """\ + from derp.plugin import command + + @command("ping") + async def cmd(bot, msg): + pass + """) + registry.load_plugin(tmp_path / "core.py") + assert "ping" in registry.commands + + result = registry.unload_plugin("core") + assert result is False + assert "ping" in registry.commands + + def test_reload_plugin(self, tmp_path: Path): + registry = PluginRegistry() + path = self._write_plugin(tmp_path, "mutable", """\ + from derp.plugin import command + + @command("old") + async def cmd(bot, msg): + pass + """) + registry.load_plugin(path) + assert "old" in registry.commands + + # Rewrite the file with a different command + path.write_text(textwrap.dedent("""\ + from derp.plugin import command + + @command("new") + async def cmd(bot, msg): + pass + """)) + + ok, reason = registry.reload_plugin("mutable") + assert ok is True + assert "old" not in registry.commands + assert "new" in registry.commands + + def test_reload_unknown_returns_failure(self): + registry = PluginRegistry() + ok, reason = registry.reload_plugin("ghost") + assert ok is False + assert "not loaded" in reason + + def test_reload_core_allowed(self, tmp_path: Path): + registry = PluginRegistry() + self._write_plugin(tmp_path, "core", """\ + from derp.plugin import command + + @command("ping") + async def cmd(bot, msg): + pass + """) + registry.load_plugin(tmp_path / "core.py") + ok, _ = registry.reload_plugin("core") + assert ok is True + assert "ping" in registry.commands + + def test_unload_preserves_other_plugins(self, tmp_path: Path): + registry = PluginRegistry() + self._write_plugin(tmp_path, "keep", """\ + from derp.plugin import command, event + + @command("keeper") + async def cmd(bot, msg): + pass + + @event("JOIN") + async def on_join(bot, msg): + pass + """) + self._write_plugin(tmp_path, "drop", """\ + from derp.plugin import command, event + + @command("dropper") + async def cmd(bot, msg): + pass + + @event("JOIN") + async def on_join(bot, msg): + pass + """) + registry.load_directory(tmp_path) + assert "keeper" in registry.commands + assert "dropper" in registry.commands + assert len(registry.events["JOIN"]) == 2 + + registry.unload_plugin("drop") + assert "keeper" in registry.commands + assert "dropper" not in registry.commands + assert len(registry.events["JOIN"]) == 1 + + +class TestPrefixMatch: + """Test shorthand / prefix matching for commands.""" + + @staticmethod + def _make_bot(commands: list[str]) -> Bot: + """Create a minimal Bot with the given command names registered.""" + config = { + "server": {"host": "localhost", "port": 6667, "tls": False, + "nick": "test", "user": "test", "realname": "test"}, + "bot": {"prefix": "!", "channels": [], "plugins_dir": "plugins"}, + } + registry = PluginRegistry() + for name in commands: + async def _noop(bot, msg): + pass + registry.register_command(name, _noop, plugin="test") + return Bot(config, registry) + + def test_exact_match(self): + bot = self._make_bot(["ping", "pong", "plugins"]) + handler = bot._resolve_command("ping") + assert handler is not None + assert handler.name == "ping" + + def test_unique_prefix(self): + bot = self._make_bot(["ping", "help", "version"]) + handler = bot._resolve_command("h") + assert handler is not None + assert handler.name == "help" + + def test_unique_prefix_partial(self): + bot = self._make_bot(["ping", "plugins", "help"]) + handler = bot._resolve_command("pi") + assert handler is not None + assert handler.name == "ping" + + def test_ambiguous_prefix(self): + bot = self._make_bot(["ping", "plugins", "help"]) + result = bot._resolve_command("p") + assert result is _AMBIGUOUS + + def test_no_match(self): + bot = self._make_bot(["ping", "help"]) + assert bot._resolve_command("z") is None + + def test_exact_match_beats_prefix(self): + bot = self._make_bot(["help", "helper"]) + handler = bot._resolve_command("help") + assert handler is not None + assert handler.name == "help" + + def test_single_char_unique(self): + bot = self._make_bot(["version", "help", "ping"]) + handler = bot._resolve_command("v") + assert handler is not None + assert handler.name == "version"