Files
derp/tests/test_integration.py
user fd8f72c3cc fix: detect oper status when users join channels
Previously the bot only sent WHO on connect (001), so users joining
after the initial scan were never checked for oper status. Now sends
WHO <nick> on every JOIN event to detect opers mid-session.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 12:53:12 +01:00

367 lines
12 KiB
Python

"""Integration tests: full bot pipeline with mock IRC connection.
Replaces IRCConnection with a queue-based mock to test the entire
dispatch pipeline (connection -> registration -> dispatch -> response)
without network I/O.
"""
from __future__ import annotations
import asyncio
from pathlib import Path
from derp import __version__
from derp.bot import Bot
from derp.irc import parse
from derp.plugin import PluginRegistry
from derp.state import StateStore
# -- Mock connection --------------------------------------------------------
class _MockConnection:
"""Drop-in replacement for IRCConnection using asyncio.Queue."""
def __init__(self) -> None:
self._queue: asyncio.Queue[str | None] = asyncio.Queue()
self.sent: list[str] = []
self._connected = True
async def connect(self) -> None:
"""No-op: no real TCP."""
async def close(self) -> None:
"""Mark connection as closed."""
self._connected = False
async def send(self, line: str) -> None:
"""Capture a sent line."""
self.sent.append(line)
async def readline(self) -> str | None:
"""Read next queued line, or None for EOF."""
return await self._queue.get()
@property
def connected(self) -> bool:
"""Whether the connection is open."""
return self._connected
def inject(self, line: str) -> None:
"""Queue a raw IRC line for the bot to read."""
self._queue.put_nowait(line)
def disconnect(self) -> None:
"""Queue EOF to end the bot's read loop."""
self._queue.put_nowait(None)
# -- Test harness -----------------------------------------------------------
class _Harness:
"""Test fixture: creates a Bot with _MockConnection and helpers."""
def __init__(
self,
channels: list[str] | None = None,
admins: list[str] | None = None,
channel_config: dict | None = None,
ircv3_caps: list[str] | None = None,
) -> None:
server_cfg: dict = {
"host": "localhost",
"port": 6667,
"tls": False,
"nick": "test",
"user": "test",
"realname": "test bot",
}
if ircv3_caps is not None:
server_cfg["ircv3_caps"] = ircv3_caps
config: dict = {
"server": server_cfg,
"bot": {
"prefix": "!",
"channels": channels or [],
"plugins_dir": "plugins",
"admins": admins or [],
"rate_limit": 100.0,
"rate_burst": 100,
},
}
if channel_config:
config["channels"] = channel_config
self.registry = PluginRegistry()
self.bot = Bot(config, self.registry)
self.conn = _MockConnection()
self.bot.conn = self.conn # type: ignore[assignment]
self.bot.state = StateStore(":memory:")
self.registry.load_plugin(Path("plugins/core.py"))
def inject_registration(self, caps: str = "") -> None:
"""Queue minimal CAP LS + 001 handshake.
If ``caps`` is non-empty the server offers those capabilities
and an ACK response is queued so the bot can complete CAP REQ.
"""
self.conn.inject(f":server CAP * LS :{caps}")
if caps:
self.conn.inject(f":server CAP * ACK :{caps}")
self.conn.inject(":server 001 test :Welcome to IRC")
def privmsg(
self, nick: str, target: str, text: str,
user: str = "user", host: str = "host",
) -> None:
"""Queue a PRIVMSG from a user."""
self.conn.inject(f":{nick}!{user}@{host} PRIVMSG {target} :{text}")
async def run(self) -> None:
"""Run the bot until EOF, then drain spawned tasks."""
self.conn.disconnect()
self.bot._running = True
await self.bot._connect_and_run()
if self.bot._tasks:
await asyncio.gather(*list(self.bot._tasks), return_exceptions=True)
self.bot.state.close()
def sent_privmsgs(self, target: str) -> list[str]:
"""Extract text from PRIVMSG lines sent to a target."""
results = []
for line in self.conn.sent:
msg = parse(line)
if msg.command == "PRIVMSG" and msg.target == target:
results.append(msg.text)
return results
def sent_notices(self, target: str) -> list[str]:
"""Extract text from NOTICE lines sent to a target."""
results = []
for line in self.conn.sent:
msg = parse(line)
if msg.command == "NOTICE" and msg.target == target:
results.append(msg.text)
return results
# -- Registration -----------------------------------------------------------
class TestRegistration:
"""Test IRC registration handshake."""
def test_sends_cap_nick_user(self):
"""Bot sends CAP LS, CAP END, NICK, USER in order."""
h = _Harness()
h.inject_registration()
asyncio.run(h.run())
assert h.conn.sent[0] == "CAP LS 302"
assert "CAP END" in h.conn.sent
assert "NICK :test" in h.conn.sent
user_lines = [s for s in h.conn.sent if s.startswith("USER")]
assert len(user_lines) == 1
assert "test bot" in user_lines[0]
def test_joins_channels_on_welcome(self):
"""Bot joins configured channels after RPL_WELCOME (001)."""
h = _Harness(channels=["#test"])
h.inject_registration()
asyncio.run(h.run())
assert "JOIN :#test" in h.conn.sent
assert "WHO :#test" in h.conn.sent
def test_cap_negotiation(self):
"""Bot requests and acknowledges IRCv3 capabilities."""
h = _Harness(ircv3_caps=["multi-prefix"])
h.inject_registration(caps="multi-prefix")
asyncio.run(h.run())
assert "CAP REQ :multi-prefix" in h.conn.sent
idx_req = h.conn.sent.index("CAP REQ :multi-prefix")
idx_end = h.conn.sent.index("CAP END")
assert idx_req < idx_end
# -- Ping/Pong -------------------------------------------------------------
class TestPingPong:
"""Test server PING/PONG handling."""
def test_server_ping(self):
"""Bot responds to server PING with matching PONG."""
h = _Harness()
h.inject_registration()
h.conn.inject("PING :token123")
asyncio.run(h.run())
assert "PONG :token123" in h.conn.sent
# -- Command dispatch -------------------------------------------------------
class TestCommandDispatch:
"""Test command parsing and plugin dispatch."""
def test_ping_command(self):
"""!ping triggers the core ping handler."""
h = _Harness(channels=["#test"])
h.inject_registration()
h.privmsg("nick", "#test", "!ping")
asyncio.run(h.run())
assert "pong" in h.sent_privmsgs("#test")
def test_help_command(self):
"""!help lists available commands."""
h = _Harness(channels=["#test"])
h.inject_registration()
h.privmsg("nick", "#test", "!help")
asyncio.run(h.run())
replies = h.sent_privmsgs("#test")
assert len(replies) == 1
assert "Commands:" in replies[0]
assert "!ping" in replies[0]
def test_unknown_command_ignored(self):
"""Unknown commands produce no reply."""
h = _Harness(channels=["#test"])
h.inject_registration()
h.privmsg("nick", "#test", "!nonexistent")
asyncio.run(h.run())
assert h.sent_privmsgs("#test") == []
def test_prefix_matching(self):
"""Unambiguous prefix resolves to the full command."""
h = _Harness(channels=["#test"])
h.inject_registration()
h.privmsg("nick", "#test", "!pi")
asyncio.run(h.run())
assert "pong" in h.sent_privmsgs("#test")
# -- Admin ------------------------------------------------------------------
class TestAdmin:
"""Test admin permission enforcement."""
def test_admin_denied(self):
"""Non-admin user is denied admin commands."""
h = _Harness(channels=["#test"])
h.inject_registration()
h.privmsg("nobody", "#test", "!admins")
asyncio.run(h.run())
replies = h.sent_privmsgs("#test")
assert any("Permission denied" in r for r in replies)
def test_admin_by_hostmask(self):
"""Configured hostmask pattern grants admin access."""
h = _Harness(channels=["#test"], admins=["admin!*@*"])
h.inject_registration()
h.privmsg("admin", "#test", "!admins")
asyncio.run(h.run())
replies = h.sent_privmsgs("#test")
assert not any("Permission denied" in r for r in replies)
assert any("Patterns:" in r for r in replies)
def test_oper_detection(self):
"""IRC operator detected via WHO reply can use admin commands."""
h = _Harness(channels=["#test"])
h.inject_registration()
# RPL_WHOREPLY (352): bot chan user host server nick flags :hops realname
h.conn.inject(
":server 352 test #test oper operhost irc.server oper H* "
":0 Oper Name"
)
h.privmsg("oper", "#test", "!admins", user="oper", host="operhost")
asyncio.run(h.run())
replies = h.sent_privmsgs("#test")
assert not any("Permission denied" in r for r in replies)
assert any("Opers:" in r for r in replies)
def test_oper_detection_on_join(self):
"""User joining a channel triggers WHO; oper detected mid-session."""
h = _Harness(channels=["#test"])
h.inject_registration()
# User joins the channel after the bot is already connected
h.conn.inject(":oper!oper@operhost JOIN #test")
# Server responds to the WHO triggered by the JOIN
h.conn.inject(
":server 352 test #test oper operhost irc.server oper H* "
":0 Oper Name"
)
h.privmsg("oper", "#test", "!admins", user="oper", host="operhost")
asyncio.run(h.run())
# Verify WHO was sent for the joining nick
who_sent = [s for s in h.conn.sent if "WHO" in s and "oper" in s]
assert len(who_sent) >= 1
replies = h.sent_privmsgs("#test")
assert not any("Permission denied" in r for r in replies)
assert any("oper!oper@operhost" in r for r in replies)
# -- Channel filter ---------------------------------------------------------
class TestChannelFilter:
"""Test per-channel plugin allow/deny in dispatch."""
def test_command_allowed_unrestricted(self):
"""Commands work in channels with no plugin restrictions."""
h = _Harness(channels=["#open"])
h.inject_registration()
h.privmsg("nick", "#open", "!ping")
asyncio.run(h.run())
assert "pong" in h.sent_privmsgs("#open")
def test_command_denied_restricted(self):
"""Non-allowed plugin commands are silently ignored."""
h = _Harness(
channels=["#locked"],
channel_config={"#locked": {"plugins": ["core"]}},
)
h.inject_registration()
# Register a command in a non-core plugin
async def _fakecmd(bot, msg):
await bot.reply(msg, "should not appear")
h.registry.register_command("fakecmd", _fakecmd, plugin="testplugin")
h.privmsg("nick", "#locked", "!fakecmd")
asyncio.run(h.run())
assert h.sent_privmsgs("#locked") == []
# -- CTCP -------------------------------------------------------------------
class TestCTCP:
"""Test CTCP query/response handling."""
def test_ctcp_version(self):
"""Bot responds to CTCP VERSION with a NOTICE."""
h = _Harness()
h.inject_registration()
h.conn.inject(":nick!user@host PRIVMSG test :\x01VERSION\x01")
asyncio.run(h.run())
notices = h.sent_notices("nick")
assert len(notices) == 1
assert f"VERSION derp {__version__}" in notices[0]