"""Tests for the plugin system.""" import asyncio import textwrap from pathlib import Path from derp.bot import _AMBIGUOUS, Bot, _split_utf8 from derp.irc import Message from derp.plugin import PluginRegistry, command, event from derp.state import StateStore class TestDecorators: """Test that decorators mark functions correctly.""" def test_command_decorator(self): @command("test", help="a test command") async def handler(bot, msg): pass assert handler._derp_command == "test" assert handler._derp_help == "a test command" def test_event_decorator(self): @event("join") async def handler(bot, msg): pass assert handler._derp_event == "JOIN" def test_event_decorator_case(self): @event("PRIVMSG") async def handler(bot, msg): pass assert handler._derp_event == "PRIVMSG" def test_command_decorator_aliases(self): @command("skip", help="skip track", aliases=["next", "s"]) async def handler(bot, msg): pass assert handler._derp_aliases == ["next", "s"] def test_command_decorator_aliases_default(self): @command("ping", help="ping") async def handler(bot, msg): pass assert handler._derp_aliases == [] def test_command_decorator_admin(self): @command("secret", help="admin only", admin=True) async def handler(bot, msg): pass assert handler._derp_command == "secret" assert handler._derp_admin is True def test_command_decorator_admin_default(self): @command("public", help="everyone") async def handler(bot, msg): pass assert getattr(handler, "_derp_admin", False) is False class TestRegistry: """Test the plugin registry.""" def test_register_command(self): registry = PluginRegistry() async def handler(bot, msg): pass registry.register_command("test", handler, help="test help") assert "test" in registry.commands assert registry.commands["test"].help == "test help" def test_register_command_admin(self): registry = PluginRegistry() async def handler(bot, msg): pass registry.register_command("secret", handler, help="admin", admin=True) assert registry.commands["secret"].admin is True def test_register_command_admin_default(self): registry = PluginRegistry() async def handler(bot, msg): pass registry.register_command("public", handler, help="public") assert registry.commands["public"].admin is False def test_register_event(self): registry = PluginRegistry() async def handler(bot, msg): pass registry.register_event("PRIVMSG", handler) assert "PRIVMSG" in registry.events assert len(registry.events["PRIVMSG"]) == 1 def test_multiple_event_handlers(self): registry = PluginRegistry() async def handler_a(bot, msg): pass async def handler_b(bot, msg): pass registry.register_event("JOIN", handler_a) registry.register_event("JOIN", handler_b) assert len(registry.events["JOIN"]) == 2 def test_load_plugin_file(self, tmp_path: Path): plugin_code = textwrap.dedent("""\ from derp.plugin import command, event @command("greet", help="Say hello") async def cmd_greet(bot, msg): pass @event("JOIN") async def on_join(bot, msg): pass """) plugin_file = tmp_path / "greet.py" plugin_file.write_text(plugin_code) registry = PluginRegistry() registry.load_plugin(plugin_file) assert "greet" in registry.commands assert "JOIN" in registry.events def test_load_directory(self, tmp_path: Path): for name in ("a.py", "b.py"): (tmp_path / name).write_text(textwrap.dedent(f"""\ from derp.plugin import command @command("{name[0]}", help="{name}") async def cmd(bot, msg): pass """)) registry = PluginRegistry() registry.load_directory(tmp_path) assert "a" in registry.commands assert "b" in registry.commands def test_skip_underscore_files(self, tmp_path: Path): (tmp_path / "__init__.py").write_text("") (tmp_path / "_private.py").write_text("x = 1\n") (tmp_path / "valid.py").write_text(textwrap.dedent("""\ from derp.plugin import command @command("valid") async def cmd(bot, msg): pass """)) registry = PluginRegistry() registry.load_directory(tmp_path) assert "valid" in registry.commands assert len(registry.commands) == 1 def test_load_missing_directory(self, tmp_path: Path): registry = PluginRegistry() registry.load_directory(tmp_path / "nonexistent") assert len(registry.commands) == 0 def test_load_plugin_returns_count(self, tmp_path: Path): plugin_file = tmp_path / "counting.py" plugin_file.write_text(textwrap.dedent("""\ from derp.plugin import command, event @command("one", help="first") async def cmd_one(bot, msg): pass @command("two", help="second") async def cmd_two(bot, msg): pass @event("JOIN") async def on_join(bot, msg): pass """)) registry = PluginRegistry() count = registry.load_plugin(plugin_file) assert count == 3 def test_load_plugin_admin_flag(self, tmp_path: Path): plugin_code = textwrap.dedent("""\ from derp.plugin import command @command("secret", help="Admin only", admin=True) async def cmd_secret(bot, msg): pass @command("public", help="Everyone") async def cmd_public(bot, msg): pass """) plugin_file = tmp_path / "mixed.py" plugin_file.write_text(plugin_code) registry = PluginRegistry() registry.load_plugin(plugin_file) assert registry.commands["secret"].admin is True assert registry.commands["public"].admin is False def test_load_plugin_aliases(self, tmp_path: Path): plugin_file = tmp_path / "aliased.py" plugin_file.write_text(textwrap.dedent("""\ from derp.plugin import command @command("skip", help="Skip track", aliases=["next", "s"]) async def cmd_skip(bot, msg): pass """)) registry = PluginRegistry() count = registry.load_plugin(plugin_file) assert count == 3 # primary + 2 aliases assert "skip" in registry.commands assert "next" in registry.commands assert "s" in registry.commands # Aliases point to the same callback assert registry.commands["next"].callback is registry.commands["skip"].callback assert registry.commands["s"].callback is registry.commands["skip"].callback # Alias help text references the primary command assert registry.commands["next"].help == "alias for !skip" def test_unload_removes_aliases(self, tmp_path: Path): plugin_file = tmp_path / "aliased.py" plugin_file.write_text(textwrap.dedent("""\ from derp.plugin import command @command("skip", help="Skip track", aliases=["next"]) async def cmd_skip(bot, msg): pass """)) registry = PluginRegistry() registry.load_plugin(plugin_file) assert "next" in registry.commands registry.unload_plugin("aliased") assert "skip" not in registry.commands assert "next" not in registry.commands def test_load_plugin_stores_path(self, tmp_path: Path): plugin_file = tmp_path / "pathed.py" plugin_file.write_text(textwrap.dedent("""\ from derp.plugin import command @command("pathed") async def cmd(bot, msg): pass """)) registry = PluginRegistry() registry.load_plugin(plugin_file) assert "pathed" in registry._paths assert registry._paths["pathed"] == plugin_file.resolve() class TestHotReload: """Test hot-load, unload, and reload of plugins.""" def _write_plugin(self, tmp_path: Path, name: str, code: str) -> Path: """Write a plugin file and return its path.""" path = tmp_path / f"{name}.py" path.write_text(textwrap.dedent(code)) return path def test_unload_removes_commands(self, tmp_path: Path): registry = PluginRegistry() self._write_plugin(tmp_path, "greet", """\ from derp.plugin import command @command("hello") async def cmd(bot, msg): pass """) registry.load_plugin(tmp_path / "greet.py") assert "hello" in registry.commands result = registry.unload_plugin("greet") assert result is True assert "hello" not in registry.commands assert "greet" not in registry._modules assert "greet" not in registry._paths def test_unload_removes_events(self, tmp_path: Path): registry = PluginRegistry() self._write_plugin(tmp_path, "evplug", """\ from derp.plugin import event @event("JOIN") async def on_join(bot, msg): pass """) registry.load_plugin(tmp_path / "evplug.py") assert "JOIN" in registry.events assert len(registry.events["JOIN"]) == 1 registry.unload_plugin("evplug") assert "JOIN" not in registry.events def test_unload_unknown_returns_false(self): registry = PluginRegistry() assert registry.unload_plugin("nonexistent") is False def test_unload_core_refused(self, tmp_path: Path): registry = PluginRegistry() self._write_plugin(tmp_path, "core", """\ from derp.plugin import command @command("ping") async def cmd(bot, msg): pass """) registry.load_plugin(tmp_path / "core.py") assert "ping" in registry.commands result = registry.unload_plugin("core") assert result is False assert "ping" in registry.commands def test_reload_plugin(self, tmp_path: Path): registry = PluginRegistry() path = self._write_plugin(tmp_path, "mutable", """\ from derp.plugin import command @command("old") async def cmd(bot, msg): pass """) registry.load_plugin(path) assert "old" in registry.commands # Rewrite the file with a different command path.write_text(textwrap.dedent("""\ from derp.plugin import command @command("new") async def cmd(bot, msg): pass """)) ok, reason = registry.reload_plugin("mutable") assert ok is True assert "old" not in registry.commands assert "new" in registry.commands def test_reload_unknown_returns_failure(self): registry = PluginRegistry() ok, reason = registry.reload_plugin("ghost") assert ok is False assert "not loaded" in reason def test_reload_core_allowed(self, tmp_path: Path): registry = PluginRegistry() self._write_plugin(tmp_path, "core", """\ from derp.plugin import command @command("ping") async def cmd(bot, msg): pass """) registry.load_plugin(tmp_path / "core.py") ok, _ = registry.reload_plugin("core") assert ok is True assert "ping" in registry.commands def test_unload_preserves_other_plugins(self, tmp_path: Path): registry = PluginRegistry() self._write_plugin(tmp_path, "keep", """\ from derp.plugin import command, event @command("keeper") async def cmd(bot, msg): pass @event("JOIN") async def on_join(bot, msg): pass """) self._write_plugin(tmp_path, "drop", """\ from derp.plugin import command, event @command("dropper") async def cmd(bot, msg): pass @event("JOIN") async def on_join(bot, msg): pass """) registry.load_directory(tmp_path) assert "keeper" in registry.commands assert "dropper" in registry.commands assert len(registry.events["JOIN"]) == 2 registry.unload_plugin("drop") assert "keeper" in registry.commands assert "dropper" not in registry.commands assert len(registry.events["JOIN"]) == 1 class TestPrefixMatch: """Test shorthand / prefix matching for commands.""" @staticmethod def _make_bot(commands: list[str]) -> Bot: """Create a minimal Bot with the given command names registered.""" config = { "server": {"host": "localhost", "port": 6667, "tls": False, "nick": "test", "user": "test", "realname": "test"}, "bot": {"prefix": "!", "channels": [], "plugins_dir": "plugins"}, } registry = PluginRegistry() for name in commands: async def _noop(bot, msg): pass registry.register_command(name, _noop, plugin="test") return Bot("test", config, registry) def test_exact_match(self): bot = self._make_bot(["ping", "pong", "plugins"]) handler = bot._resolve_command("ping") assert handler is not None assert handler.name == "ping" def test_unique_prefix(self): bot = self._make_bot(["ping", "help", "version"]) handler = bot._resolve_command("h") assert handler is not None assert handler.name == "help" def test_unique_prefix_partial(self): bot = self._make_bot(["ping", "plugins", "help"]) handler = bot._resolve_command("pi") assert handler is not None assert handler.name == "ping" def test_ambiguous_prefix(self): bot = self._make_bot(["ping", "plugins", "help"]) result = bot._resolve_command("p") assert result is _AMBIGUOUS def test_no_match(self): bot = self._make_bot(["ping", "help"]) assert bot._resolve_command("z") is None def test_exact_match_beats_prefix(self): bot = self._make_bot(["help", "helper"]) handler = bot._resolve_command("help") assert handler is not None assert handler.name == "help" def test_single_char_unique(self): bot = self._make_bot(["version", "help", "ping"]) handler = bot._resolve_command("v") assert handler is not None assert handler.name == "version" class TestIsAdmin: """Test admin permission checks.""" @staticmethod def _make_bot(admins: list[str] | None = None, opers: set[str] | None = None) -> Bot: """Create a Bot with optional admin patterns and oper set.""" config = { "server": {"host": "localhost", "port": 6667, "tls": False, "nick": "test", "user": "test", "realname": "test"}, "bot": {"prefix": "!", "channels": [], "plugins_dir": "plugins", "admins": admins or []}, } bot = Bot("test", config, PluginRegistry()) if opers: bot._opers = opers return bot @staticmethod def _msg(prefix: str) -> Message: """Create a minimal Message with a given prefix.""" return Message(raw="", prefix=prefix, nick=prefix.split("!")[0], command="PRIVMSG", params=["#test", "!test"], tags={}) def test_no_prefix_not_admin(self): bot = self._make_bot() msg = Message(raw="", prefix=None, nick=None, command="PRIVMSG", params=[], tags={}) assert bot._is_admin(msg) is False def test_oper_is_admin(self): bot = self._make_bot(opers={"alice!~alice@host"}) msg = self._msg("alice!~alice@host") assert bot._is_admin(msg) is True def test_hostmask_pattern_match(self): bot = self._make_bot(admins=["*!~user@trusted.host"]) msg = self._msg("bob!~user@trusted.host") assert bot._is_admin(msg) is True def test_hostmask_pattern_no_match(self): bot = self._make_bot(admins=["*!~user@trusted.host"]) msg = self._msg("bob!~other@untrusted.host") assert bot._is_admin(msg) is False def test_wildcard_pattern(self): bot = self._make_bot(admins=["ops!*@*.ops.net"]) msg = self._msg("ops!~ident@server.ops.net") assert bot._is_admin(msg) is True def test_no_patterns_no_opers(self): bot = self._make_bot() msg = self._msg("nobody!~user@host") assert bot._is_admin(msg) is False class TestStateStore: """Test the SQLite key-value state store.""" def test_get_missing_returns_default(self, tmp_path: Path): store = StateStore(tmp_path / "state.db") assert store.get("plug", "key") is None assert store.get("plug", "key", "fallback") == "fallback" def test_set_and_get(self, tmp_path: Path): store = StateStore(tmp_path / "state.db") store.set("plug", "color", "blue") assert store.get("plug", "color") == "blue" def test_set_overwrite(self, tmp_path: Path): store = StateStore(tmp_path / "state.db") store.set("plug", "val", "one") store.set("plug", "val", "two") assert store.get("plug", "val") == "two" def test_namespace_isolation(self, tmp_path: Path): store = StateStore(tmp_path / "state.db") store.set("alpha", "key", "a") store.set("beta", "key", "b") assert store.get("alpha", "key") == "a" assert store.get("beta", "key") == "b" def test_delete_existing(self, tmp_path: Path): store = StateStore(tmp_path / "state.db") store.set("plug", "key", "val") assert store.delete("plug", "key") is True assert store.get("plug", "key") is None def test_delete_missing(self, tmp_path: Path): store = StateStore(tmp_path / "state.db") assert store.delete("plug", "ghost") is False def test_keys(self, tmp_path: Path): store = StateStore(tmp_path / "state.db") store.set("plug", "b", "2") store.set("plug", "a", "1") store.set("other", "c", "3") assert store.keys("plug") == ["a", "b"] assert store.keys("other") == ["c"] assert store.keys("empty") == [] def test_clear(self, tmp_path: Path): store = StateStore(tmp_path / "state.db") store.set("plug", "a", "1") store.set("plug", "b", "2") store.set("other", "c", "3") count = store.clear("plug") assert count == 2 assert store.keys("plug") == [] assert store.keys("other") == ["c"] def test_clear_empty(self, tmp_path: Path): store = StateStore(tmp_path / "state.db") assert store.clear("empty") == 0 def test_close_and_reopen(self, tmp_path: Path): db_path = tmp_path / "state.db" store = StateStore(db_path) store.set("plug", "persist", "yes") store.close() store2 = StateStore(db_path) assert store2.get("plug", "persist") == "yes" class _FakeConnection: """Minimal IRCConnection stand-in that captures sent lines.""" def __init__(self): self.sent: list[str] = [] async def send(self, line: str) -> None: self.sent.append(line) def _make_test_bot() -> Bot: """Create a Bot with a FakeConnection for testing the public API.""" config = { "server": {"host": "localhost", "port": 6667, "tls": False, "nick": "test", "user": "test", "realname": "test"}, "bot": {"prefix": "!", "channels": [], "plugins_dir": "plugins"}, } bot = Bot("test", config, PluginRegistry()) bot.conn = _FakeConnection() # type: ignore[assignment] return bot class TestBotAPI: """Test Bot public API methods via FakeConnection.""" def test_send(self): bot = _make_test_bot() asyncio.run(bot.send("#ch", "hello world")) assert bot.conn.sent == ["PRIVMSG #ch :hello world"] def test_send_single_word(self): bot = _make_test_bot() asyncio.run(bot.send("#ch", "hello")) assert bot.conn.sent == ["PRIVMSG #ch hello"] def test_send_multiline(self): bot = _make_test_bot() asyncio.run(bot.send("#ch", "line one\nline two")) assert len(bot.conn.sent) == 2 assert bot.conn.sent[0] == "PRIVMSG #ch :line one" assert bot.conn.sent[1] == "PRIVMSG #ch :line two" def test_reply_channel(self): bot = _make_test_bot() msg = Message(raw="", prefix="nick!u@h", nick="nick", command="PRIVMSG", params=["#ch", "hi"], tags={}) asyncio.run(bot.reply(msg, "hello there")) assert bot.conn.sent == ["PRIVMSG #ch :hello there"] def test_reply_pm(self): bot = _make_test_bot() msg = Message(raw="", prefix="nick!u@h", nick="nick", command="PRIVMSG", params=["testbot", "hi"], tags={}) asyncio.run(bot.reply(msg, "hello there")) assert bot.conn.sent == ["PRIVMSG nick :hello there"] def test_join(self): bot = _make_test_bot() asyncio.run(bot.join("#test")) assert bot.conn.sent == ["JOIN :#test"] def test_kick(self): bot = _make_test_bot() asyncio.run(bot.kick("#ch", "baduser", "bad behavior")) assert bot.conn.sent == ["KICK #ch baduser :bad behavior"] def test_mode(self): bot = _make_test_bot() asyncio.run(bot.mode("#ch", "+o", "nick")) assert bot.conn.sent == ["MODE #ch +o nick"] def test_set_topic(self): bot = _make_test_bot() asyncio.run(bot.set_topic("#ch", "new topic")) assert bot.conn.sent == ["TOPIC #ch :new topic"] class TestChannelFilter: """Test per-channel plugin allow/deny.""" @staticmethod def _make_bot(channels_cfg: dict | None = None) -> Bot: """Create a Bot with optional per-channel config.""" config = { "server": {"host": "localhost", "port": 6667, "tls": False, "nick": "test", "user": "test", "realname": "test"}, "bot": {"prefix": "!", "channels": [], "plugins_dir": "plugins"}, "channels": channels_cfg or {}, } return Bot("test", config, PluginRegistry()) def test_core_always_allowed(self): bot = self._make_bot({"#locked": {"plugins": ["core"]}}) assert bot._plugin_allowed("core", "#locked") is True def test_listed_plugin_allowed(self): bot = self._make_bot({"#ops": {"plugins": ["core", "dns"]}}) assert bot._plugin_allowed("dns", "#ops") is True def test_unlisted_plugin_denied(self): bot = self._make_bot({"#ops": {"plugins": ["core", "dns"]}}) assert bot._plugin_allowed("encode", "#ops") is False def test_unconfigured_channel_allows_all(self): bot = self._make_bot({"#locked": {"plugins": ["core"]}}) assert bot._plugin_allowed("encode", "#open") is True def test_no_channels_config_allows_all(self): bot = self._make_bot() assert bot._plugin_allowed("anything", "#test") is True def test_pm_always_allowed(self): bot = self._make_bot({"#locked": {"plugins": ["core"]}}) assert bot._plugin_allowed("encode", "someone") is True def test_none_channel_allowed(self): bot = self._make_bot({"#locked": {"plugins": ["core"]}}) assert bot._plugin_allowed("encode", None) is True def test_channel_without_plugins_key(self): bot = self._make_bot({"#other": {"some_setting": True}}) assert bot._plugin_allowed("encode", "#other") is True def test_ampersand_channel(self): bot = self._make_bot({"&local": {"plugins": ["core", "dns"]}}) assert bot._plugin_allowed("dns", "&local") is True assert bot._plugin_allowed("encode", "&local") is False class TestAliasDispatch: """Test alias fallback in _dispatch_command.""" @staticmethod def _make_bot_with_alias(alias_name: str, target_cmd: str) -> tuple[Bot, list]: """Create a Bot with a command and an alias pointing to it.""" config = { "server": {"host": "localhost", "port": 6667, "tls": False, "nick": "test", "user": "test", "realname": "test"}, "bot": {"prefix": "!", "channels": [], "plugins_dir": "plugins"}, } registry = PluginRegistry() called = [] async def _handler(bot, msg): called.append(msg.text) registry.register_command(target_cmd, _handler, plugin="test") bot = Bot("test", config, registry) bot.conn = _FakeConnection() bot.state.set("alias", alias_name, target_cmd) return bot, called def test_alias_resolves_command(self): """An alias triggers the target command handler.""" bot, called = self._make_bot_with_alias("s", "skip") msg = Message(raw="", prefix="nick!u@h", nick="nick", command="PRIVMSG", params=["#ch", "!s"], tags={}) async def _run(): bot._dispatch_command(msg) await asyncio.sleep(0.05) # let spawned task run asyncio.run(_run()) assert len(called) == 1 def test_alias_ignored_when_command_exists(self): """Direct command match takes priority over alias.""" bot, called = self._make_bot_with_alias("skip", "stop") # "skip" is both a real command and an alias to "stop"; real wins msg = Message(raw="", prefix="nick!u@h", nick="nick", command="PRIVMSG", params=["#ch", "!skip"], tags={}) async def _run(): bot._dispatch_command(msg) await asyncio.sleep(0.05) asyncio.run(_run()) assert len(called) == 1 # Handler was the "skip" handler, not "stop" def test_no_alias_no_crash(self): """Unknown command with no alias silently returns.""" config = { "server": {"host": "localhost", "port": 6667, "tls": False, "nick": "test", "user": "test", "realname": "test"}, "bot": {"prefix": "!", "channels": [], "plugins_dir": "plugins"}, } bot = Bot("test", config, PluginRegistry()) bot.conn = _FakeConnection() msg = Message(raw="", prefix="nick!u@h", nick="nick", command="PRIVMSG", params=["#ch", "!nonexistent"], tags={}) bot._dispatch_command(msg) # should not raise class TestSplitUtf8: """Test UTF-8 safe message splitting.""" def test_short_text(self): assert _split_utf8("hello", 100) == ["hello"] def test_ascii_split(self): text = "a" * 20 chunks = _split_utf8(text, 10) assert len(chunks) == 2 assert all(len(c.encode("utf-8")) <= 10 for c in chunks) assert "".join(chunks) == text def test_multibyte_boundary(self): # Each char is 3 bytes in UTF-8 text = "\u00e9" * 10 # e-acute, 2 bytes each -> 20 bytes total chunks = _split_utf8(text, 7) recombined = "".join(chunks) assert recombined == text for chunk in chunks: assert len(chunk.encode("utf-8")) <= 7 def test_empty(self): assert _split_utf8("", 10) == [""] def test_exact_fit(self): text = "abcde" assert _split_utf8(text, 5) == ["abcde"]