Files
derp/tests/test_plugin.py
user 8129b79cdb test: add config, format_msg, and Bot API tests
New test_config.py: merge, load, resolve_config tests.
Extend test_irc.py: format_msg edge cases (colon, empty, multi-param).
Extend test_plugin.py: Bot API via FakeConnection, _split_utf8 tests.
Test count: 92 -> 120.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 03:27:18 +01:00

656 lines
21 KiB
Python

"""Tests for the plugin system."""
import asyncio
import textwrap
from pathlib import Path
from derp.bot import _AMBIGUOUS, Bot, _split_utf8
from derp.irc import Message
from derp.plugin import PluginRegistry, command, event
from derp.state import StateStore
class TestDecorators:
"""Test that decorators mark functions correctly."""
def test_command_decorator(self):
@command("test", help="a test command")
async def handler(bot, msg):
pass
assert handler._derp_command == "test"
assert handler._derp_help == "a test command"
def test_event_decorator(self):
@event("join")
async def handler(bot, msg):
pass
assert handler._derp_event == "JOIN"
def test_event_decorator_case(self):
@event("PRIVMSG")
async def handler(bot, msg):
pass
assert handler._derp_event == "PRIVMSG"
def test_command_decorator_admin(self):
@command("secret", help="admin only", admin=True)
async def handler(bot, msg):
pass
assert handler._derp_command == "secret"
assert handler._derp_admin is True
def test_command_decorator_admin_default(self):
@command("public", help="everyone")
async def handler(bot, msg):
pass
assert getattr(handler, "_derp_admin", False) is False
class TestRegistry:
"""Test the plugin registry."""
def test_register_command(self):
registry = PluginRegistry()
async def handler(bot, msg):
pass
registry.register_command("test", handler, help="test help")
assert "test" in registry.commands
assert registry.commands["test"].help == "test help"
def test_register_command_admin(self):
registry = PluginRegistry()
async def handler(bot, msg):
pass
registry.register_command("secret", handler, help="admin", admin=True)
assert registry.commands["secret"].admin is True
def test_register_command_admin_default(self):
registry = PluginRegistry()
async def handler(bot, msg):
pass
registry.register_command("public", handler, help="public")
assert registry.commands["public"].admin is False
def test_register_event(self):
registry = PluginRegistry()
async def handler(bot, msg):
pass
registry.register_event("PRIVMSG", handler)
assert "PRIVMSG" in registry.events
assert len(registry.events["PRIVMSG"]) == 1
def test_multiple_event_handlers(self):
registry = PluginRegistry()
async def handler_a(bot, msg):
pass
async def handler_b(bot, msg):
pass
registry.register_event("JOIN", handler_a)
registry.register_event("JOIN", handler_b)
assert len(registry.events["JOIN"]) == 2
def test_load_plugin_file(self, tmp_path: Path):
plugin_code = textwrap.dedent("""\
from derp.plugin import command, event
@command("greet", help="Say hello")
async def cmd_greet(bot, msg):
pass
@event("JOIN")
async def on_join(bot, msg):
pass
""")
plugin_file = tmp_path / "greet.py"
plugin_file.write_text(plugin_code)
registry = PluginRegistry()
registry.load_plugin(plugin_file)
assert "greet" in registry.commands
assert "JOIN" in registry.events
def test_load_directory(self, tmp_path: Path):
for name in ("a.py", "b.py"):
(tmp_path / name).write_text(textwrap.dedent(f"""\
from derp.plugin import command
@command("{name[0]}", help="{name}")
async def cmd(bot, msg):
pass
"""))
registry = PluginRegistry()
registry.load_directory(tmp_path)
assert "a" in registry.commands
assert "b" in registry.commands
def test_skip_underscore_files(self, tmp_path: Path):
(tmp_path / "__init__.py").write_text("")
(tmp_path / "_private.py").write_text("x = 1\n")
(tmp_path / "valid.py").write_text(textwrap.dedent("""\
from derp.plugin import command
@command("valid")
async def cmd(bot, msg):
pass
"""))
registry = PluginRegistry()
registry.load_directory(tmp_path)
assert "valid" in registry.commands
assert len(registry.commands) == 1
def test_load_missing_directory(self, tmp_path: Path):
registry = PluginRegistry()
registry.load_directory(tmp_path / "nonexistent")
assert len(registry.commands) == 0
def test_load_plugin_returns_count(self, tmp_path: Path):
plugin_file = tmp_path / "counting.py"
plugin_file.write_text(textwrap.dedent("""\
from derp.plugin import command, event
@command("one", help="first")
async def cmd_one(bot, msg):
pass
@command("two", help="second")
async def cmd_two(bot, msg):
pass
@event("JOIN")
async def on_join(bot, msg):
pass
"""))
registry = PluginRegistry()
count = registry.load_plugin(plugin_file)
assert count == 3
def test_load_plugin_admin_flag(self, tmp_path: Path):
plugin_code = textwrap.dedent("""\
from derp.plugin import command
@command("secret", help="Admin only", admin=True)
async def cmd_secret(bot, msg):
pass
@command("public", help="Everyone")
async def cmd_public(bot, msg):
pass
""")
plugin_file = tmp_path / "mixed.py"
plugin_file.write_text(plugin_code)
registry = PluginRegistry()
registry.load_plugin(plugin_file)
assert registry.commands["secret"].admin is True
assert registry.commands["public"].admin is False
def test_load_plugin_stores_path(self, tmp_path: Path):
plugin_file = tmp_path / "pathed.py"
plugin_file.write_text(textwrap.dedent("""\
from derp.plugin import command
@command("pathed")
async def cmd(bot, msg):
pass
"""))
registry = PluginRegistry()
registry.load_plugin(plugin_file)
assert "pathed" in registry._paths
assert registry._paths["pathed"] == plugin_file.resolve()
class TestHotReload:
"""Test hot-load, unload, and reload of plugins."""
def _write_plugin(self, tmp_path: Path, name: str, code: str) -> Path:
"""Write a plugin file and return its path."""
path = tmp_path / f"{name}.py"
path.write_text(textwrap.dedent(code))
return path
def test_unload_removes_commands(self, tmp_path: Path):
registry = PluginRegistry()
self._write_plugin(tmp_path, "greet", """\
from derp.plugin import command
@command("hello")
async def cmd(bot, msg):
pass
""")
registry.load_plugin(tmp_path / "greet.py")
assert "hello" in registry.commands
result = registry.unload_plugin("greet")
assert result is True
assert "hello" not in registry.commands
assert "greet" not in registry._modules
assert "greet" not in registry._paths
def test_unload_removes_events(self, tmp_path: Path):
registry = PluginRegistry()
self._write_plugin(tmp_path, "evplug", """\
from derp.plugin import event
@event("JOIN")
async def on_join(bot, msg):
pass
""")
registry.load_plugin(tmp_path / "evplug.py")
assert "JOIN" in registry.events
assert len(registry.events["JOIN"]) == 1
registry.unload_plugin("evplug")
assert "JOIN" not in registry.events
def test_unload_unknown_returns_false(self):
registry = PluginRegistry()
assert registry.unload_plugin("nonexistent") is False
def test_unload_core_refused(self, tmp_path: Path):
registry = PluginRegistry()
self._write_plugin(tmp_path, "core", """\
from derp.plugin import command
@command("ping")
async def cmd(bot, msg):
pass
""")
registry.load_plugin(tmp_path / "core.py")
assert "ping" in registry.commands
result = registry.unload_plugin("core")
assert result is False
assert "ping" in registry.commands
def test_reload_plugin(self, tmp_path: Path):
registry = PluginRegistry()
path = self._write_plugin(tmp_path, "mutable", """\
from derp.plugin import command
@command("old")
async def cmd(bot, msg):
pass
""")
registry.load_plugin(path)
assert "old" in registry.commands
# Rewrite the file with a different command
path.write_text(textwrap.dedent("""\
from derp.plugin import command
@command("new")
async def cmd(bot, msg):
pass
"""))
ok, reason = registry.reload_plugin("mutable")
assert ok is True
assert "old" not in registry.commands
assert "new" in registry.commands
def test_reload_unknown_returns_failure(self):
registry = PluginRegistry()
ok, reason = registry.reload_plugin("ghost")
assert ok is False
assert "not loaded" in reason
def test_reload_core_allowed(self, tmp_path: Path):
registry = PluginRegistry()
self._write_plugin(tmp_path, "core", """\
from derp.plugin import command
@command("ping")
async def cmd(bot, msg):
pass
""")
registry.load_plugin(tmp_path / "core.py")
ok, _ = registry.reload_plugin("core")
assert ok is True
assert "ping" in registry.commands
def test_unload_preserves_other_plugins(self, tmp_path: Path):
registry = PluginRegistry()
self._write_plugin(tmp_path, "keep", """\
from derp.plugin import command, event
@command("keeper")
async def cmd(bot, msg):
pass
@event("JOIN")
async def on_join(bot, msg):
pass
""")
self._write_plugin(tmp_path, "drop", """\
from derp.plugin import command, event
@command("dropper")
async def cmd(bot, msg):
pass
@event("JOIN")
async def on_join(bot, msg):
pass
""")
registry.load_directory(tmp_path)
assert "keeper" in registry.commands
assert "dropper" in registry.commands
assert len(registry.events["JOIN"]) == 2
registry.unload_plugin("drop")
assert "keeper" in registry.commands
assert "dropper" not in registry.commands
assert len(registry.events["JOIN"]) == 1
class TestPrefixMatch:
"""Test shorthand / prefix matching for commands."""
@staticmethod
def _make_bot(commands: list[str]) -> Bot:
"""Create a minimal Bot with the given command names registered."""
config = {
"server": {"host": "localhost", "port": 6667, "tls": False,
"nick": "test", "user": "test", "realname": "test"},
"bot": {"prefix": "!", "channels": [], "plugins_dir": "plugins"},
}
registry = PluginRegistry()
for name in commands:
async def _noop(bot, msg):
pass
registry.register_command(name, _noop, plugin="test")
return Bot(config, registry)
def test_exact_match(self):
bot = self._make_bot(["ping", "pong", "plugins"])
handler = bot._resolve_command("ping")
assert handler is not None
assert handler.name == "ping"
def test_unique_prefix(self):
bot = self._make_bot(["ping", "help", "version"])
handler = bot._resolve_command("h")
assert handler is not None
assert handler.name == "help"
def test_unique_prefix_partial(self):
bot = self._make_bot(["ping", "plugins", "help"])
handler = bot._resolve_command("pi")
assert handler is not None
assert handler.name == "ping"
def test_ambiguous_prefix(self):
bot = self._make_bot(["ping", "plugins", "help"])
result = bot._resolve_command("p")
assert result is _AMBIGUOUS
def test_no_match(self):
bot = self._make_bot(["ping", "help"])
assert bot._resolve_command("z") is None
def test_exact_match_beats_prefix(self):
bot = self._make_bot(["help", "helper"])
handler = bot._resolve_command("help")
assert handler is not None
assert handler.name == "help"
def test_single_char_unique(self):
bot = self._make_bot(["version", "help", "ping"])
handler = bot._resolve_command("v")
assert handler is not None
assert handler.name == "version"
class TestIsAdmin:
"""Test admin permission checks."""
@staticmethod
def _make_bot(admins: list[str] | None = None, opers: set[str] | None = None) -> Bot:
"""Create a Bot with optional admin patterns and oper set."""
config = {
"server": {"host": "localhost", "port": 6667, "tls": False,
"nick": "test", "user": "test", "realname": "test"},
"bot": {"prefix": "!", "channels": [], "plugins_dir": "plugins",
"admins": admins or []},
}
bot = Bot(config, PluginRegistry())
if opers:
bot._opers = opers
return bot
@staticmethod
def _msg(prefix: str) -> Message:
"""Create a minimal Message with a given prefix."""
return Message(raw="", prefix=prefix, nick=prefix.split("!")[0],
command="PRIVMSG", params=["#test", "!test"], tags={})
def test_no_prefix_not_admin(self):
bot = self._make_bot()
msg = Message(raw="", prefix=None, nick=None, command="PRIVMSG", params=[], tags={})
assert bot._is_admin(msg) is False
def test_oper_is_admin(self):
bot = self._make_bot(opers={"alice!~alice@host"})
msg = self._msg("alice!~alice@host")
assert bot._is_admin(msg) is True
def test_hostmask_pattern_match(self):
bot = self._make_bot(admins=["*!~user@trusted.host"])
msg = self._msg("bob!~user@trusted.host")
assert bot._is_admin(msg) is True
def test_hostmask_pattern_no_match(self):
bot = self._make_bot(admins=["*!~user@trusted.host"])
msg = self._msg("bob!~other@untrusted.host")
assert bot._is_admin(msg) is False
def test_wildcard_pattern(self):
bot = self._make_bot(admins=["ops!*@*.ops.net"])
msg = self._msg("ops!~ident@server.ops.net")
assert bot._is_admin(msg) is True
def test_no_patterns_no_opers(self):
bot = self._make_bot()
msg = self._msg("nobody!~user@host")
assert bot._is_admin(msg) is False
class TestStateStore:
"""Test the SQLite key-value state store."""
def test_get_missing_returns_default(self, tmp_path: Path):
store = StateStore(tmp_path / "state.db")
assert store.get("plug", "key") is None
assert store.get("plug", "key", "fallback") == "fallback"
def test_set_and_get(self, tmp_path: Path):
store = StateStore(tmp_path / "state.db")
store.set("plug", "color", "blue")
assert store.get("plug", "color") == "blue"
def test_set_overwrite(self, tmp_path: Path):
store = StateStore(tmp_path / "state.db")
store.set("plug", "val", "one")
store.set("plug", "val", "two")
assert store.get("plug", "val") == "two"
def test_namespace_isolation(self, tmp_path: Path):
store = StateStore(tmp_path / "state.db")
store.set("alpha", "key", "a")
store.set("beta", "key", "b")
assert store.get("alpha", "key") == "a"
assert store.get("beta", "key") == "b"
def test_delete_existing(self, tmp_path: Path):
store = StateStore(tmp_path / "state.db")
store.set("plug", "key", "val")
assert store.delete("plug", "key") is True
assert store.get("plug", "key") is None
def test_delete_missing(self, tmp_path: Path):
store = StateStore(tmp_path / "state.db")
assert store.delete("plug", "ghost") is False
def test_keys(self, tmp_path: Path):
store = StateStore(tmp_path / "state.db")
store.set("plug", "b", "2")
store.set("plug", "a", "1")
store.set("other", "c", "3")
assert store.keys("plug") == ["a", "b"]
assert store.keys("other") == ["c"]
assert store.keys("empty") == []
def test_clear(self, tmp_path: Path):
store = StateStore(tmp_path / "state.db")
store.set("plug", "a", "1")
store.set("plug", "b", "2")
store.set("other", "c", "3")
count = store.clear("plug")
assert count == 2
assert store.keys("plug") == []
assert store.keys("other") == ["c"]
def test_clear_empty(self, tmp_path: Path):
store = StateStore(tmp_path / "state.db")
assert store.clear("empty") == 0
def test_close_and_reopen(self, tmp_path: Path):
db_path = tmp_path / "state.db"
store = StateStore(db_path)
store.set("plug", "persist", "yes")
store.close()
store2 = StateStore(db_path)
assert store2.get("plug", "persist") == "yes"
class _FakeConnection:
"""Minimal IRCConnection stand-in that captures sent lines."""
def __init__(self):
self.sent: list[str] = []
async def send(self, line: str) -> None:
self.sent.append(line)
def _make_test_bot() -> Bot:
"""Create a Bot with a FakeConnection for testing the public API."""
config = {
"server": {"host": "localhost", "port": 6667, "tls": False,
"nick": "test", "user": "test", "realname": "test"},
"bot": {"prefix": "!", "channels": [], "plugins_dir": "plugins"},
}
bot = Bot(config, PluginRegistry())
bot.conn = _FakeConnection() # type: ignore[assignment]
return bot
class TestBotAPI:
"""Test Bot public API methods via FakeConnection."""
def test_send(self):
bot = _make_test_bot()
asyncio.run(bot.send("#ch", "hello world"))
assert bot.conn.sent == ["PRIVMSG #ch :hello world"]
def test_send_single_word(self):
bot = _make_test_bot()
asyncio.run(bot.send("#ch", "hello"))
assert bot.conn.sent == ["PRIVMSG #ch hello"]
def test_send_multiline(self):
bot = _make_test_bot()
asyncio.run(bot.send("#ch", "line one\nline two"))
assert len(bot.conn.sent) == 2
assert bot.conn.sent[0] == "PRIVMSG #ch :line one"
assert bot.conn.sent[1] == "PRIVMSG #ch :line two"
def test_reply_channel(self):
bot = _make_test_bot()
msg = Message(raw="", prefix="nick!u@h", nick="nick",
command="PRIVMSG", params=["#ch", "hi"], tags={})
asyncio.run(bot.reply(msg, "hello there"))
assert bot.conn.sent == ["PRIVMSG #ch :hello there"]
def test_reply_pm(self):
bot = _make_test_bot()
msg = Message(raw="", prefix="nick!u@h", nick="nick",
command="PRIVMSG", params=["testbot", "hi"], tags={})
asyncio.run(bot.reply(msg, "hello there"))
assert bot.conn.sent == ["PRIVMSG nick :hello there"]
def test_join(self):
bot = _make_test_bot()
asyncio.run(bot.join("#test"))
assert bot.conn.sent == ["JOIN :#test"]
def test_kick(self):
bot = _make_test_bot()
asyncio.run(bot.kick("#ch", "baduser", "bad behavior"))
assert bot.conn.sent == ["KICK #ch baduser :bad behavior"]
def test_mode(self):
bot = _make_test_bot()
asyncio.run(bot.mode("#ch", "+o", "nick"))
assert bot.conn.sent == ["MODE #ch +o nick"]
def test_set_topic(self):
bot = _make_test_bot()
asyncio.run(bot.set_topic("#ch", "new topic"))
assert bot.conn.sent == ["TOPIC #ch :new topic"]
class TestSplitUtf8:
"""Test UTF-8 safe message splitting."""
def test_short_text(self):
assert _split_utf8("hello", 100) == ["hello"]
def test_ascii_split(self):
text = "a" * 20
chunks = _split_utf8(text, 10)
assert len(chunks) == 2
assert all(len(c.encode("utf-8")) <= 10 for c in chunks)
assert "".join(chunks) == text
def test_multibyte_boundary(self):
# Each char is 3 bytes in UTF-8
text = "\u00e9" * 10 # e-acute, 2 bytes each -> 20 bytes total
chunks = _split_utf8(text, 7)
recombined = "".join(chunks)
assert recombined == text
for chunk in chunks:
assert len(chunk.encode("utf-8")) <= 7
def test_empty(self):
assert _split_utf8("", 10) == [""]
def test_exact_fit(self):
text = "abcde"
assert _split_utf8(text, 5) == ["abcde"]