TestChannelFilter: allowed/denied/PM/no-config/core-exempt/ampersand. TestChannelConfig: TOML loading, defaults. TestJsonFormatter: fields, exception, unicode, single-line, timestamp format.
708 lines
23 KiB
Python
708 lines
23 KiB
Python
"""Tests for the plugin system."""
|
|
|
|
import asyncio
|
|
import textwrap
|
|
from pathlib import Path
|
|
|
|
from derp.bot import _AMBIGUOUS, Bot, _split_utf8
|
|
from derp.irc import Message
|
|
from derp.plugin import PluginRegistry, command, event
|
|
from derp.state import StateStore
|
|
|
|
|
|
class TestDecorators:
|
|
"""Test that decorators mark functions correctly."""
|
|
|
|
def test_command_decorator(self):
|
|
@command("test", help="a test command")
|
|
async def handler(bot, msg):
|
|
pass
|
|
|
|
assert handler._derp_command == "test"
|
|
assert handler._derp_help == "a test command"
|
|
|
|
def test_event_decorator(self):
|
|
@event("join")
|
|
async def handler(bot, msg):
|
|
pass
|
|
|
|
assert handler._derp_event == "JOIN"
|
|
|
|
def test_event_decorator_case(self):
|
|
@event("PRIVMSG")
|
|
async def handler(bot, msg):
|
|
pass
|
|
|
|
assert handler._derp_event == "PRIVMSG"
|
|
|
|
|
|
def test_command_decorator_admin(self):
|
|
@command("secret", help="admin only", admin=True)
|
|
async def handler(bot, msg):
|
|
pass
|
|
|
|
assert handler._derp_command == "secret"
|
|
assert handler._derp_admin is True
|
|
|
|
def test_command_decorator_admin_default(self):
|
|
@command("public", help="everyone")
|
|
async def handler(bot, msg):
|
|
pass
|
|
|
|
assert getattr(handler, "_derp_admin", False) is False
|
|
|
|
|
|
class TestRegistry:
|
|
"""Test the plugin registry."""
|
|
|
|
def test_register_command(self):
|
|
registry = PluginRegistry()
|
|
|
|
async def handler(bot, msg):
|
|
pass
|
|
|
|
registry.register_command("test", handler, help="test help")
|
|
assert "test" in registry.commands
|
|
assert registry.commands["test"].help == "test help"
|
|
|
|
def test_register_command_admin(self):
|
|
registry = PluginRegistry()
|
|
|
|
async def handler(bot, msg):
|
|
pass
|
|
|
|
registry.register_command("secret", handler, help="admin", admin=True)
|
|
assert registry.commands["secret"].admin is True
|
|
|
|
def test_register_command_admin_default(self):
|
|
registry = PluginRegistry()
|
|
|
|
async def handler(bot, msg):
|
|
pass
|
|
|
|
registry.register_command("public", handler, help="public")
|
|
assert registry.commands["public"].admin is False
|
|
|
|
def test_register_event(self):
|
|
registry = PluginRegistry()
|
|
|
|
async def handler(bot, msg):
|
|
pass
|
|
|
|
registry.register_event("PRIVMSG", handler)
|
|
assert "PRIVMSG" in registry.events
|
|
assert len(registry.events["PRIVMSG"]) == 1
|
|
|
|
def test_multiple_event_handlers(self):
|
|
registry = PluginRegistry()
|
|
|
|
async def handler_a(bot, msg):
|
|
pass
|
|
|
|
async def handler_b(bot, msg):
|
|
pass
|
|
|
|
registry.register_event("JOIN", handler_a)
|
|
registry.register_event("JOIN", handler_b)
|
|
assert len(registry.events["JOIN"]) == 2
|
|
|
|
def test_load_plugin_file(self, tmp_path: Path):
|
|
plugin_code = textwrap.dedent("""\
|
|
from derp.plugin import command, event
|
|
|
|
@command("greet", help="Say hello")
|
|
async def cmd_greet(bot, msg):
|
|
pass
|
|
|
|
@event("JOIN")
|
|
async def on_join(bot, msg):
|
|
pass
|
|
""")
|
|
plugin_file = tmp_path / "greet.py"
|
|
plugin_file.write_text(plugin_code)
|
|
|
|
registry = PluginRegistry()
|
|
registry.load_plugin(plugin_file)
|
|
|
|
assert "greet" in registry.commands
|
|
assert "JOIN" in registry.events
|
|
|
|
def test_load_directory(self, tmp_path: Path):
|
|
for name in ("a.py", "b.py"):
|
|
(tmp_path / name).write_text(textwrap.dedent(f"""\
|
|
from derp.plugin import command
|
|
|
|
@command("{name[0]}", help="{name}")
|
|
async def cmd(bot, msg):
|
|
pass
|
|
"""))
|
|
|
|
registry = PluginRegistry()
|
|
registry.load_directory(tmp_path)
|
|
|
|
assert "a" in registry.commands
|
|
assert "b" in registry.commands
|
|
|
|
def test_skip_underscore_files(self, tmp_path: Path):
|
|
(tmp_path / "__init__.py").write_text("")
|
|
(tmp_path / "_private.py").write_text("x = 1\n")
|
|
(tmp_path / "valid.py").write_text(textwrap.dedent("""\
|
|
from derp.plugin import command
|
|
|
|
@command("valid")
|
|
async def cmd(bot, msg):
|
|
pass
|
|
"""))
|
|
|
|
registry = PluginRegistry()
|
|
registry.load_directory(tmp_path)
|
|
|
|
assert "valid" in registry.commands
|
|
assert len(registry.commands) == 1
|
|
|
|
def test_load_missing_directory(self, tmp_path: Path):
|
|
registry = PluginRegistry()
|
|
registry.load_directory(tmp_path / "nonexistent")
|
|
assert len(registry.commands) == 0
|
|
|
|
def test_load_plugin_returns_count(self, tmp_path: Path):
|
|
plugin_file = tmp_path / "counting.py"
|
|
plugin_file.write_text(textwrap.dedent("""\
|
|
from derp.plugin import command, event
|
|
|
|
@command("one", help="first")
|
|
async def cmd_one(bot, msg):
|
|
pass
|
|
|
|
@command("two", help="second")
|
|
async def cmd_two(bot, msg):
|
|
pass
|
|
|
|
@event("JOIN")
|
|
async def on_join(bot, msg):
|
|
pass
|
|
"""))
|
|
|
|
registry = PluginRegistry()
|
|
count = registry.load_plugin(plugin_file)
|
|
assert count == 3
|
|
|
|
def test_load_plugin_admin_flag(self, tmp_path: Path):
|
|
plugin_code = textwrap.dedent("""\
|
|
from derp.plugin import command
|
|
|
|
@command("secret", help="Admin only", admin=True)
|
|
async def cmd_secret(bot, msg):
|
|
pass
|
|
|
|
@command("public", help="Everyone")
|
|
async def cmd_public(bot, msg):
|
|
pass
|
|
""")
|
|
plugin_file = tmp_path / "mixed.py"
|
|
plugin_file.write_text(plugin_code)
|
|
|
|
registry = PluginRegistry()
|
|
registry.load_plugin(plugin_file)
|
|
|
|
assert registry.commands["secret"].admin is True
|
|
assert registry.commands["public"].admin is False
|
|
|
|
def test_load_plugin_stores_path(self, tmp_path: Path):
|
|
plugin_file = tmp_path / "pathed.py"
|
|
plugin_file.write_text(textwrap.dedent("""\
|
|
from derp.plugin import command
|
|
|
|
@command("pathed")
|
|
async def cmd(bot, msg):
|
|
pass
|
|
"""))
|
|
|
|
registry = PluginRegistry()
|
|
registry.load_plugin(plugin_file)
|
|
assert "pathed" in registry._paths
|
|
assert registry._paths["pathed"] == plugin_file.resolve()
|
|
|
|
|
|
class TestHotReload:
|
|
"""Test hot-load, unload, and reload of plugins."""
|
|
|
|
def _write_plugin(self, tmp_path: Path, name: str, code: str) -> Path:
|
|
"""Write a plugin file and return its path."""
|
|
path = tmp_path / f"{name}.py"
|
|
path.write_text(textwrap.dedent(code))
|
|
return path
|
|
|
|
def test_unload_removes_commands(self, tmp_path: Path):
|
|
registry = PluginRegistry()
|
|
self._write_plugin(tmp_path, "greet", """\
|
|
from derp.plugin import command
|
|
|
|
@command("hello")
|
|
async def cmd(bot, msg):
|
|
pass
|
|
""")
|
|
registry.load_plugin(tmp_path / "greet.py")
|
|
assert "hello" in registry.commands
|
|
|
|
result = registry.unload_plugin("greet")
|
|
assert result is True
|
|
assert "hello" not in registry.commands
|
|
assert "greet" not in registry._modules
|
|
assert "greet" not in registry._paths
|
|
|
|
def test_unload_removes_events(self, tmp_path: Path):
|
|
registry = PluginRegistry()
|
|
self._write_plugin(tmp_path, "evplug", """\
|
|
from derp.plugin import event
|
|
|
|
@event("JOIN")
|
|
async def on_join(bot, msg):
|
|
pass
|
|
""")
|
|
registry.load_plugin(tmp_path / "evplug.py")
|
|
assert "JOIN" in registry.events
|
|
assert len(registry.events["JOIN"]) == 1
|
|
|
|
registry.unload_plugin("evplug")
|
|
assert "JOIN" not in registry.events
|
|
|
|
def test_unload_unknown_returns_false(self):
|
|
registry = PluginRegistry()
|
|
assert registry.unload_plugin("nonexistent") is False
|
|
|
|
def test_unload_core_refused(self, tmp_path: Path):
|
|
registry = PluginRegistry()
|
|
self._write_plugin(tmp_path, "core", """\
|
|
from derp.plugin import command
|
|
|
|
@command("ping")
|
|
async def cmd(bot, msg):
|
|
pass
|
|
""")
|
|
registry.load_plugin(tmp_path / "core.py")
|
|
assert "ping" in registry.commands
|
|
|
|
result = registry.unload_plugin("core")
|
|
assert result is False
|
|
assert "ping" in registry.commands
|
|
|
|
def test_reload_plugin(self, tmp_path: Path):
|
|
registry = PluginRegistry()
|
|
path = self._write_plugin(tmp_path, "mutable", """\
|
|
from derp.plugin import command
|
|
|
|
@command("old")
|
|
async def cmd(bot, msg):
|
|
pass
|
|
""")
|
|
registry.load_plugin(path)
|
|
assert "old" in registry.commands
|
|
|
|
# Rewrite the file with a different command
|
|
path.write_text(textwrap.dedent("""\
|
|
from derp.plugin import command
|
|
|
|
@command("new")
|
|
async def cmd(bot, msg):
|
|
pass
|
|
"""))
|
|
|
|
ok, reason = registry.reload_plugin("mutable")
|
|
assert ok is True
|
|
assert "old" not in registry.commands
|
|
assert "new" in registry.commands
|
|
|
|
def test_reload_unknown_returns_failure(self):
|
|
registry = PluginRegistry()
|
|
ok, reason = registry.reload_plugin("ghost")
|
|
assert ok is False
|
|
assert "not loaded" in reason
|
|
|
|
def test_reload_core_allowed(self, tmp_path: Path):
|
|
registry = PluginRegistry()
|
|
self._write_plugin(tmp_path, "core", """\
|
|
from derp.plugin import command
|
|
|
|
@command("ping")
|
|
async def cmd(bot, msg):
|
|
pass
|
|
""")
|
|
registry.load_plugin(tmp_path / "core.py")
|
|
ok, _ = registry.reload_plugin("core")
|
|
assert ok is True
|
|
assert "ping" in registry.commands
|
|
|
|
def test_unload_preserves_other_plugins(self, tmp_path: Path):
|
|
registry = PluginRegistry()
|
|
self._write_plugin(tmp_path, "keep", """\
|
|
from derp.plugin import command, event
|
|
|
|
@command("keeper")
|
|
async def cmd(bot, msg):
|
|
pass
|
|
|
|
@event("JOIN")
|
|
async def on_join(bot, msg):
|
|
pass
|
|
""")
|
|
self._write_plugin(tmp_path, "drop", """\
|
|
from derp.plugin import command, event
|
|
|
|
@command("dropper")
|
|
async def cmd(bot, msg):
|
|
pass
|
|
|
|
@event("JOIN")
|
|
async def on_join(bot, msg):
|
|
pass
|
|
""")
|
|
registry.load_directory(tmp_path)
|
|
assert "keeper" in registry.commands
|
|
assert "dropper" in registry.commands
|
|
assert len(registry.events["JOIN"]) == 2
|
|
|
|
registry.unload_plugin("drop")
|
|
assert "keeper" in registry.commands
|
|
assert "dropper" not in registry.commands
|
|
assert len(registry.events["JOIN"]) == 1
|
|
|
|
|
|
class TestPrefixMatch:
|
|
"""Test shorthand / prefix matching for commands."""
|
|
|
|
@staticmethod
|
|
def _make_bot(commands: list[str]) -> Bot:
|
|
"""Create a minimal Bot with the given command names registered."""
|
|
config = {
|
|
"server": {"host": "localhost", "port": 6667, "tls": False,
|
|
"nick": "test", "user": "test", "realname": "test"},
|
|
"bot": {"prefix": "!", "channels": [], "plugins_dir": "plugins"},
|
|
}
|
|
registry = PluginRegistry()
|
|
for name in commands:
|
|
async def _noop(bot, msg):
|
|
pass
|
|
registry.register_command(name, _noop, plugin="test")
|
|
return Bot(config, registry)
|
|
|
|
def test_exact_match(self):
|
|
bot = self._make_bot(["ping", "pong", "plugins"])
|
|
handler = bot._resolve_command("ping")
|
|
assert handler is not None
|
|
assert handler.name == "ping"
|
|
|
|
def test_unique_prefix(self):
|
|
bot = self._make_bot(["ping", "help", "version"])
|
|
handler = bot._resolve_command("h")
|
|
assert handler is not None
|
|
assert handler.name == "help"
|
|
|
|
def test_unique_prefix_partial(self):
|
|
bot = self._make_bot(["ping", "plugins", "help"])
|
|
handler = bot._resolve_command("pi")
|
|
assert handler is not None
|
|
assert handler.name == "ping"
|
|
|
|
def test_ambiguous_prefix(self):
|
|
bot = self._make_bot(["ping", "plugins", "help"])
|
|
result = bot._resolve_command("p")
|
|
assert result is _AMBIGUOUS
|
|
|
|
def test_no_match(self):
|
|
bot = self._make_bot(["ping", "help"])
|
|
assert bot._resolve_command("z") is None
|
|
|
|
def test_exact_match_beats_prefix(self):
|
|
bot = self._make_bot(["help", "helper"])
|
|
handler = bot._resolve_command("help")
|
|
assert handler is not None
|
|
assert handler.name == "help"
|
|
|
|
def test_single_char_unique(self):
|
|
bot = self._make_bot(["version", "help", "ping"])
|
|
handler = bot._resolve_command("v")
|
|
assert handler is not None
|
|
assert handler.name == "version"
|
|
|
|
|
|
class TestIsAdmin:
|
|
"""Test admin permission checks."""
|
|
|
|
@staticmethod
|
|
def _make_bot(admins: list[str] | None = None, opers: set[str] | None = None) -> Bot:
|
|
"""Create a Bot with optional admin patterns and oper set."""
|
|
config = {
|
|
"server": {"host": "localhost", "port": 6667, "tls": False,
|
|
"nick": "test", "user": "test", "realname": "test"},
|
|
"bot": {"prefix": "!", "channels": [], "plugins_dir": "plugins",
|
|
"admins": admins or []},
|
|
}
|
|
bot = Bot(config, PluginRegistry())
|
|
if opers:
|
|
bot._opers = opers
|
|
return bot
|
|
|
|
@staticmethod
|
|
def _msg(prefix: str) -> Message:
|
|
"""Create a minimal Message with a given prefix."""
|
|
return Message(raw="", prefix=prefix, nick=prefix.split("!")[0],
|
|
command="PRIVMSG", params=["#test", "!test"], tags={})
|
|
|
|
def test_no_prefix_not_admin(self):
|
|
bot = self._make_bot()
|
|
msg = Message(raw="", prefix=None, nick=None, command="PRIVMSG", params=[], tags={})
|
|
assert bot._is_admin(msg) is False
|
|
|
|
def test_oper_is_admin(self):
|
|
bot = self._make_bot(opers={"alice!~alice@host"})
|
|
msg = self._msg("alice!~alice@host")
|
|
assert bot._is_admin(msg) is True
|
|
|
|
def test_hostmask_pattern_match(self):
|
|
bot = self._make_bot(admins=["*!~user@trusted.host"])
|
|
msg = self._msg("bob!~user@trusted.host")
|
|
assert bot._is_admin(msg) is True
|
|
|
|
def test_hostmask_pattern_no_match(self):
|
|
bot = self._make_bot(admins=["*!~user@trusted.host"])
|
|
msg = self._msg("bob!~other@untrusted.host")
|
|
assert bot._is_admin(msg) is False
|
|
|
|
def test_wildcard_pattern(self):
|
|
bot = self._make_bot(admins=["ops!*@*.ops.net"])
|
|
msg = self._msg("ops!~ident@server.ops.net")
|
|
assert bot._is_admin(msg) is True
|
|
|
|
def test_no_patterns_no_opers(self):
|
|
bot = self._make_bot()
|
|
msg = self._msg("nobody!~user@host")
|
|
assert bot._is_admin(msg) is False
|
|
|
|
|
|
class TestStateStore:
|
|
"""Test the SQLite key-value state store."""
|
|
|
|
def test_get_missing_returns_default(self, tmp_path: Path):
|
|
store = StateStore(tmp_path / "state.db")
|
|
assert store.get("plug", "key") is None
|
|
assert store.get("plug", "key", "fallback") == "fallback"
|
|
|
|
def test_set_and_get(self, tmp_path: Path):
|
|
store = StateStore(tmp_path / "state.db")
|
|
store.set("plug", "color", "blue")
|
|
assert store.get("plug", "color") == "blue"
|
|
|
|
def test_set_overwrite(self, tmp_path: Path):
|
|
store = StateStore(tmp_path / "state.db")
|
|
store.set("plug", "val", "one")
|
|
store.set("plug", "val", "two")
|
|
assert store.get("plug", "val") == "two"
|
|
|
|
def test_namespace_isolation(self, tmp_path: Path):
|
|
store = StateStore(tmp_path / "state.db")
|
|
store.set("alpha", "key", "a")
|
|
store.set("beta", "key", "b")
|
|
assert store.get("alpha", "key") == "a"
|
|
assert store.get("beta", "key") == "b"
|
|
|
|
def test_delete_existing(self, tmp_path: Path):
|
|
store = StateStore(tmp_path / "state.db")
|
|
store.set("plug", "key", "val")
|
|
assert store.delete("plug", "key") is True
|
|
assert store.get("plug", "key") is None
|
|
|
|
def test_delete_missing(self, tmp_path: Path):
|
|
store = StateStore(tmp_path / "state.db")
|
|
assert store.delete("plug", "ghost") is False
|
|
|
|
def test_keys(self, tmp_path: Path):
|
|
store = StateStore(tmp_path / "state.db")
|
|
store.set("plug", "b", "2")
|
|
store.set("plug", "a", "1")
|
|
store.set("other", "c", "3")
|
|
assert store.keys("plug") == ["a", "b"]
|
|
assert store.keys("other") == ["c"]
|
|
assert store.keys("empty") == []
|
|
|
|
def test_clear(self, tmp_path: Path):
|
|
store = StateStore(tmp_path / "state.db")
|
|
store.set("plug", "a", "1")
|
|
store.set("plug", "b", "2")
|
|
store.set("other", "c", "3")
|
|
count = store.clear("plug")
|
|
assert count == 2
|
|
assert store.keys("plug") == []
|
|
assert store.keys("other") == ["c"]
|
|
|
|
def test_clear_empty(self, tmp_path: Path):
|
|
store = StateStore(tmp_path / "state.db")
|
|
assert store.clear("empty") == 0
|
|
|
|
def test_close_and_reopen(self, tmp_path: Path):
|
|
db_path = tmp_path / "state.db"
|
|
store = StateStore(db_path)
|
|
store.set("plug", "persist", "yes")
|
|
store.close()
|
|
store2 = StateStore(db_path)
|
|
assert store2.get("plug", "persist") == "yes"
|
|
|
|
|
|
class _FakeConnection:
|
|
"""Minimal IRCConnection stand-in that captures sent lines."""
|
|
|
|
def __init__(self):
|
|
self.sent: list[str] = []
|
|
|
|
async def send(self, line: str) -> None:
|
|
self.sent.append(line)
|
|
|
|
|
|
def _make_test_bot() -> Bot:
|
|
"""Create a Bot with a FakeConnection for testing the public API."""
|
|
config = {
|
|
"server": {"host": "localhost", "port": 6667, "tls": False,
|
|
"nick": "test", "user": "test", "realname": "test"},
|
|
"bot": {"prefix": "!", "channels": [], "plugins_dir": "plugins"},
|
|
}
|
|
bot = Bot(config, PluginRegistry())
|
|
bot.conn = _FakeConnection() # type: ignore[assignment]
|
|
return bot
|
|
|
|
|
|
class TestBotAPI:
|
|
"""Test Bot public API methods via FakeConnection."""
|
|
|
|
def test_send(self):
|
|
bot = _make_test_bot()
|
|
asyncio.run(bot.send("#ch", "hello world"))
|
|
assert bot.conn.sent == ["PRIVMSG #ch :hello world"]
|
|
|
|
def test_send_single_word(self):
|
|
bot = _make_test_bot()
|
|
asyncio.run(bot.send("#ch", "hello"))
|
|
assert bot.conn.sent == ["PRIVMSG #ch hello"]
|
|
|
|
def test_send_multiline(self):
|
|
bot = _make_test_bot()
|
|
asyncio.run(bot.send("#ch", "line one\nline two"))
|
|
assert len(bot.conn.sent) == 2
|
|
assert bot.conn.sent[0] == "PRIVMSG #ch :line one"
|
|
assert bot.conn.sent[1] == "PRIVMSG #ch :line two"
|
|
|
|
def test_reply_channel(self):
|
|
bot = _make_test_bot()
|
|
msg = Message(raw="", prefix="nick!u@h", nick="nick",
|
|
command="PRIVMSG", params=["#ch", "hi"], tags={})
|
|
asyncio.run(bot.reply(msg, "hello there"))
|
|
assert bot.conn.sent == ["PRIVMSG #ch :hello there"]
|
|
|
|
def test_reply_pm(self):
|
|
bot = _make_test_bot()
|
|
msg = Message(raw="", prefix="nick!u@h", nick="nick",
|
|
command="PRIVMSG", params=["testbot", "hi"], tags={})
|
|
asyncio.run(bot.reply(msg, "hello there"))
|
|
assert bot.conn.sent == ["PRIVMSG nick :hello there"]
|
|
|
|
def test_join(self):
|
|
bot = _make_test_bot()
|
|
asyncio.run(bot.join("#test"))
|
|
assert bot.conn.sent == ["JOIN :#test"]
|
|
|
|
def test_kick(self):
|
|
bot = _make_test_bot()
|
|
asyncio.run(bot.kick("#ch", "baduser", "bad behavior"))
|
|
assert bot.conn.sent == ["KICK #ch baduser :bad behavior"]
|
|
|
|
def test_mode(self):
|
|
bot = _make_test_bot()
|
|
asyncio.run(bot.mode("#ch", "+o", "nick"))
|
|
assert bot.conn.sent == ["MODE #ch +o nick"]
|
|
|
|
def test_set_topic(self):
|
|
bot = _make_test_bot()
|
|
asyncio.run(bot.set_topic("#ch", "new topic"))
|
|
assert bot.conn.sent == ["TOPIC #ch :new topic"]
|
|
|
|
|
|
class TestChannelFilter:
|
|
"""Test per-channel plugin allow/deny."""
|
|
|
|
@staticmethod
|
|
def _make_bot(channels_cfg: dict | None = None) -> Bot:
|
|
"""Create a Bot with optional per-channel config."""
|
|
config = {
|
|
"server": {"host": "localhost", "port": 6667, "tls": False,
|
|
"nick": "test", "user": "test", "realname": "test"},
|
|
"bot": {"prefix": "!", "channels": [], "plugins_dir": "plugins"},
|
|
"channels": channels_cfg or {},
|
|
}
|
|
return Bot(config, PluginRegistry())
|
|
|
|
def test_core_always_allowed(self):
|
|
bot = self._make_bot({"#locked": {"plugins": ["core"]}})
|
|
assert bot._plugin_allowed("core", "#locked") is True
|
|
|
|
def test_listed_plugin_allowed(self):
|
|
bot = self._make_bot({"#ops": {"plugins": ["core", "dns"]}})
|
|
assert bot._plugin_allowed("dns", "#ops") is True
|
|
|
|
def test_unlisted_plugin_denied(self):
|
|
bot = self._make_bot({"#ops": {"plugins": ["core", "dns"]}})
|
|
assert bot._plugin_allowed("encode", "#ops") is False
|
|
|
|
def test_unconfigured_channel_allows_all(self):
|
|
bot = self._make_bot({"#locked": {"plugins": ["core"]}})
|
|
assert bot._plugin_allowed("encode", "#open") is True
|
|
|
|
def test_no_channels_config_allows_all(self):
|
|
bot = self._make_bot()
|
|
assert bot._plugin_allowed("anything", "#test") is True
|
|
|
|
def test_pm_always_allowed(self):
|
|
bot = self._make_bot({"#locked": {"plugins": ["core"]}})
|
|
assert bot._plugin_allowed("encode", "someone") is True
|
|
|
|
def test_none_channel_allowed(self):
|
|
bot = self._make_bot({"#locked": {"plugins": ["core"]}})
|
|
assert bot._plugin_allowed("encode", None) is True
|
|
|
|
def test_channel_without_plugins_key(self):
|
|
bot = self._make_bot({"#other": {"some_setting": True}})
|
|
assert bot._plugin_allowed("encode", "#other") is True
|
|
|
|
def test_ampersand_channel(self):
|
|
bot = self._make_bot({"&local": {"plugins": ["core", "dns"]}})
|
|
assert bot._plugin_allowed("dns", "&local") is True
|
|
assert bot._plugin_allowed("encode", "&local") is False
|
|
|
|
|
|
class TestSplitUtf8:
|
|
"""Test UTF-8 safe message splitting."""
|
|
|
|
def test_short_text(self):
|
|
assert _split_utf8("hello", 100) == ["hello"]
|
|
|
|
def test_ascii_split(self):
|
|
text = "a" * 20
|
|
chunks = _split_utf8(text, 10)
|
|
assert len(chunks) == 2
|
|
assert all(len(c.encode("utf-8")) <= 10 for c in chunks)
|
|
assert "".join(chunks) == text
|
|
|
|
def test_multibyte_boundary(self):
|
|
# Each char is 3 bytes in UTF-8
|
|
text = "\u00e9" * 10 # e-acute, 2 bytes each -> 20 bytes total
|
|
chunks = _split_utf8(text, 7)
|
|
recombined = "".join(chunks)
|
|
assert recombined == text
|
|
for chunk in chunks:
|
|
assert len(chunk.encode("utf-8")) <= 7
|
|
|
|
def test_empty(self):
|
|
assert _split_utf8("", 10) == [""]
|
|
|
|
def test_exact_fit(self):
|
|
text = "abcde"
|
|
assert _split_utf8(text, 5) == ["abcde"]
|