Files
derp/tests/test_integration.py
user 073659607e feat: add multi-server support
Connect to multiple IRC servers concurrently from a single config file.
Plugins are loaded once and shared; per-server state is isolated via
separate SQLite databases and per-bot runtime state (bot._pstate).

- Add build_server_configs() for [servers.*] config layout
- Bot.__init__ gains name parameter, _pstate dict for plugin isolation
- cli.py runs multiple bots via asyncio.gather
- 9 stateful plugins migrated from module-level dicts to _ps(bot) pattern
- Backward compatible: legacy [server] config works unchanged

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 19:04:20 +01:00

382 lines
12 KiB
Python

"""Integration tests: full bot pipeline with mock IRC connection.
Replaces IRCConnection with a queue-based mock to test the entire
dispatch pipeline (connection -> registration -> dispatch -> response)
without network I/O.
"""
from __future__ import annotations
import asyncio
from pathlib import Path
from derp import __version__
from derp.bot import Bot
from derp.irc import parse
from derp.plugin import PluginRegistry
from derp.state import StateStore
# -- Mock connection --------------------------------------------------------
class _MockConnection:
    """In-memory stand-in for IRCConnection, backed by an asyncio.Queue.

    Tests queue incoming lines with :meth:`inject` / :meth:`disconnect`;
    every line the bot writes is recorded, in order, in ``sent``.
    """

    def __init__(self) -> None:
        self._connected = True
        self.sent: list[str] = []
        self._queue: asyncio.Queue[str | None] = asyncio.Queue()

    # -- Test-side controls ------------------------------------------------

    def inject(self, line: str) -> None:
        """Make ``line`` available to the bot's next readline()."""
        self._enqueue(line)

    def disconnect(self) -> None:
        """Queue the EOF marker (None) so the bot's read loop ends."""
        self._enqueue(None)

    def _enqueue(self, item: str | None) -> None:
        """Append an item to the read queue without waiting."""
        self._queue.put_nowait(item)

    # -- Connection interface used by the bot --------------------------------

    @property
    def connected(self) -> bool:
        """True until close() has been awaited."""
        return self._connected

    async def connect(self) -> None:
        """Do nothing: there is no real TCP socket to open."""
        return None

    async def send(self, line: str) -> None:
        """Record an outgoing line instead of writing it anywhere."""
        self.sent.append(line)

    async def readline(self) -> str | None:
        """Return the next queued line, or None once EOF was queued."""
        item = await self._queue.get()
        return item

    async def close(self) -> None:
        """Flag the connection as closed."""
        self._connected = False
# -- Test harness -----------------------------------------------------------
class _Harness:
    """Test fixture: a Bot wired to a _MockConnection, plus helpers.

    Typical use: construct, queue input with :meth:`inject_registration`
    / :meth:`privmsg` (or ``conn.inject`` directly), drive the bot with
    ``asyncio.run(h.run())``, then inspect ``conn.sent`` or the
    ``sent_*`` helpers.

    Attributes:
        registry: PluginRegistry shared with the bot; ``plugins/core.py``
            is loaded into it.
        bot: The Bot under test (named "test", nick "test").
        conn: The _MockConnection installed as ``bot.conn``.
    """

    def __init__(
        self,
        channels: list[str] | None = None,
        admins: list[str] | None = None,
        channel_config: dict | None = None,
        ircv3_caps: list[str] | None = None,
    ) -> None:
        """Build the bot config and swap in the mock connection and state.

        Args:
            channels: Value for ``bot.channels``; empty list when omitted.
            admins: Value for ``bot.admins``; empty list when omitted.
            channel_config: Optional top-level ``channels`` config section
                (only set when truthy).
            ircv3_caps: Optional ``server.ircv3_caps`` value; the key is
                left out of the config entirely when None.
        """
        server_cfg: dict = {
            "host": "localhost",
            "port": 6667,
            "tls": False,
            "nick": "test",
            "user": "test",
            "realname": "test bot",
        }
        if ircv3_caps is not None:
            server_cfg["ircv3_caps"] = ircv3_caps
        config: dict = {
            "server": server_cfg,
            "bot": {
                "prefix": "!",
                "channels": channels or [],
                "plugins_dir": "plugins",
                "admins": admins or [],
                # NOTE(review): presumably set high so the rate limiter
                # never throttles replies during tests - confirm.
                "rate_limit": 100.0,
                "rate_burst": 100,
            },
        }
        if channel_config:
            config["channels"] = channel_config
        self.registry = PluginRegistry()
        self.bot = Bot("test", config, self.registry)
        self.conn = _MockConnection()
        self.bot.conn = self.conn  # type: ignore[assignment]
        self.bot.state = StateStore(":memory:")
        # Relative path: presumably tests are run from the repository root.
        self.registry.load_plugin(Path("plugins/core.py"))

    def inject_registration(self, caps: str = "") -> None:
        """Queue minimal CAP LS + 001 handshake.

        If ``caps`` is non-empty the server offers those capabilities
        and an ACK response is queued so the bot can complete CAP REQ.
        """
        self.conn.inject(f":server CAP * LS :{caps}")
        if caps:
            self.conn.inject(f":server CAP * ACK :{caps}")
        self.conn.inject(":server 001 test :Welcome to IRC")

    def privmsg(
        self, nick: str, target: str, text: str,
        user: str = "user", host: str = "host",
    ) -> None:
        """Queue a PRIVMSG from ``nick!user@host`` to ``target``."""
        self.conn.inject(f":{nick}!{user}@{host} PRIVMSG {target} :{text}")

    async def run(self) -> None:
        """Run the bot until EOF, then drain spawned tasks.

        EOF is queued first, so it is read after every line injected so
        far. The state store is closed even when the bot run or the task
        drain raises, so a failing test does not leak it.
        """
        self.conn.disconnect()
        self.bot._running = True
        try:
            await self.bot._connect_and_run()
            if self.bot._tasks:
                # Snapshot the collection before awaiting its members.
                await asyncio.gather(
                    *list(self.bot._tasks), return_exceptions=True,
                )
        finally:
            self.bot.state.close()

    def _sent_text(self, command: str, target: str) -> list[str]:
        """Return the text of sent lines matching ``command`` and ``target``."""
        return [
            msg.text
            for msg in map(parse, self.conn.sent)
            if msg.command == command and msg.target == target
        ]

    def sent_privmsgs(self, target: str) -> list[str]:
        """Extract text from PRIVMSG lines sent to a target."""
        return self._sent_text("PRIVMSG", target)

    def sent_notices(self, target: str) -> list[str]:
        """Extract text from NOTICE lines sent to a target."""
        return self._sent_text("NOTICE", target)
# -- Registration -----------------------------------------------------------
class TestRegistration:
    """Registration handshake as observed on the mock connection."""

    def test_sends_cap_nick_user(self):
        """Bot opens with CAP LS 302 and sends CAP END, NICK and one USER."""
        harness = _Harness()
        harness.inject_registration()
        asyncio.run(harness.run())

        sent = harness.conn.sent
        assert sent[0] == "CAP LS 302"
        assert "CAP END" in sent
        assert "NICK :test" in sent
        user_lines = [line for line in sent if line.startswith("USER")]
        assert len(user_lines) == 1
        assert "test bot" in user_lines[0]

    def test_joins_channels_on_welcome(self):
        """After RPL_WELCOME (001) the bot JOINs and WHOs each channel."""
        harness = _Harness(channels=["#test"])
        harness.inject_registration()
        asyncio.run(harness.run())

        sent = harness.conn.sent
        assert "JOIN :#test" in sent
        assert "WHO :#test" in sent

    def test_cap_negotiation(self):
        """An offered, configured capability is requested before CAP END."""
        harness = _Harness(ircv3_caps=["multi-prefix"])
        harness.inject_registration(caps="multi-prefix")
        asyncio.run(harness.run())

        sent = harness.conn.sent
        request = "CAP REQ :multi-prefix"
        assert request in sent
        idx_req = sent.index(request)
        idx_end = sent.index("CAP END")
        assert idx_req < idx_end
# -- Ping/Pong -------------------------------------------------------------
class TestPingPong:
    """Server-initiated PING must be answered with PONG."""

    def test_server_ping(self):
        """A PING token from the server is echoed back in a PONG."""
        harness = _Harness()
        harness.inject_registration()
        harness.conn.inject("PING :token123")
        asyncio.run(harness.run())

        outgoing = harness.conn.sent
        assert "PONG :token123" in outgoing
# -- Command dispatch -------------------------------------------------------
class TestCommandDispatch:
    """Command parsing and plugin dispatch for channel messages."""

    @staticmethod
    def _replies_to(text: str) -> list[str]:
        """Send ``text`` to #test as user "nick" and return the bot's replies."""
        harness = _Harness(channels=["#test"])
        harness.inject_registration()
        harness.privmsg("nick", "#test", text)
        asyncio.run(harness.run())
        return harness.sent_privmsgs("#test")

    def test_ping_command(self):
        """!ping reaches the core ping handler, which answers "pong"."""
        assert "pong" in self._replies_to("!ping")

    def test_help_command(self):
        """!help answers with a single line naming the known commands."""
        replies = self._replies_to("!help")
        assert len(replies) == 1
        (listing,) = replies
        assert "help" in listing
        assert "ping" in listing

    def test_unknown_command_ignored(self):
        """A command nobody registered yields no reply at all."""
        assert self._replies_to("!nonexistent") == []

    def test_prefix_matching(self):
        """An unambiguous prefix ("!pi") dispatches to the full command."""
        assert "pong" in self._replies_to("!pi")
# -- Admin ------------------------------------------------------------------
class TestAdmin:
    """Test admin permission enforcement."""

    # RPL_WHOREPLY (352): bot chan user host server nick flags :hops realname
    # The "*" in the flags field ("H*") is the IRC-operator marker.
    _OPER_WHO_REPLY = (
        ":server 352 test #test oper operhost irc.server oper H* "
        ":0 Oper Name"
    )

    def test_admin_denied(self):
        """Non-admin user is denied admin commands."""
        h = _Harness(channels=["#test"])
        h.inject_registration()
        h.privmsg("nobody", "#test", "!admins")
        asyncio.run(h.run())
        replies = h.sent_privmsgs("#test")
        assert any("Permission denied" in r for r in replies)

    def test_admin_by_hostmask(self):
        """Configured hostmask pattern grants admin access."""
        h = _Harness(channels=["#test"], admins=["admin!*@*"])
        h.inject_registration()
        h.privmsg("admin", "#test", "!admins")
        asyncio.run(h.run())
        replies = h.sent_privmsgs("#test")
        assert not any("Permission denied" in r for r in replies)
        assert any("Admin:" in r for r in replies)

    def test_oper_detection(self):
        """IRC operator detected via WHO reply can use admin commands."""
        h = _Harness(channels=["#test"])
        h.inject_registration()
        h.conn.inject(self._OPER_WHO_REPLY)
        h.privmsg("oper", "#test", "!admins", user="oper", host="operhost")
        asyncio.run(h.run())
        replies = h.sent_privmsgs("#test")
        assert not any("Permission denied" in r for r in replies)
        assert any("IRCOPs:" in r for r in replies)

    def test_oper_detection_on_join(self):
        """User joining a channel triggers debounced WHO for oper detection."""
        h = _Harness(channels=["#test"])
        h.inject_registration()
        # User joins after the bot is connected
        h.conn.inject(":oper!oper@operhost JOIN #test")
        # Server responds to the debounced WHO #channel
        h.conn.inject(self._OPER_WHO_REPLY)
        h.privmsg("oper", "#test", "!admins", user="oper", host="operhost")
        asyncio.run(h.run())
        # Verify debounced WHO was sent for the channel (not the nick).
        # The 001 handler already sends one WHO for #test (see
        # test_joins_channels_on_welcome), so ">= 1" would pass without any
        # JOIN handling; a second WHO is what proves the JOIN triggered one.
        who_sent = [s for s in h.conn.sent if "WHO" in s and "#test" in s]
        assert len(who_sent) >= 2
        # Oper detected from the 352 reply
        replies = h.sent_privmsgs("#test")
        assert not any("Permission denied" in r for r in replies)
        assert any("oper!oper@operhost" in r for r in replies)

    def test_join_who_debounce(self):
        """Multiple rapid JOINs produce only one WHO per channel."""
        h = _Harness(channels=["#test"])
        h.inject_registration()
        # Simulate netsplit recovery: many users rejoin at once
        for nick in ("alice", "bob", "carol", "dave", "eve"):
            h.conn.inject(f":{nick}!{nick}@host JOIN #test")
        asyncio.run(h.run())
        # Only the initial connect WHO + one debounced WHO, not 5
        who_lines = [s for s in h.conn.sent
                     if "WHO" in s and "#test" in s]
        assert len(who_lines) == 2  # 001 connect + 1 debounced
# -- Channel filter ---------------------------------------------------------
class TestChannelFilter:
    """Per-channel plugin allow-listing applied during dispatch."""

    def test_command_allowed_unrestricted(self):
        """A channel without a plugin restriction answers !ping."""
        harness = _Harness(channels=["#open"])
        harness.inject_registration()
        harness.privmsg("nick", "#open", "!ping")
        asyncio.run(harness.run())

        replies = harness.sent_privmsgs("#open")
        assert "pong" in replies

    def test_command_denied_restricted(self):
        """A command from a plugin outside the channel's list gets no reply."""
        restrictions = {"#locked": {"plugins": ["core"]}}
        harness = _Harness(channels=["#locked"], channel_config=restrictions)
        harness.inject_registration()

        # Handler owned by a plugin that "#locked" does not list.
        async def _fakecmd(bot, msg):
            await bot.reply(msg, "should not appear")

        harness.registry.register_command(
            "fakecmd", _fakecmd, plugin="testplugin",
        )
        harness.privmsg("nick", "#locked", "!fakecmd")
        asyncio.run(harness.run())

        replies = harness.sent_privmsgs("#locked")
        assert replies == []
# -- CTCP -------------------------------------------------------------------
class TestCTCP:
    """CTCP queries addressed to the bot and the replies they produce."""

    def test_ctcp_version(self):
        """A CTCP VERSION query is answered with exactly one NOTICE."""
        harness = _Harness()
        harness.inject_registration()
        # CTCP payloads are wrapped in \x01 delimiters inside a PRIVMSG.
        harness.conn.inject(":nick!user@host PRIVMSG test :\x01VERSION\x01")
        asyncio.run(harness.run())

        notices = harness.sent_notices("nick")
        assert len(notices) == 1
        (reply,) = notices
        assert f"VERSION derp {__version__}" in reply