Block all non-ACTION CTCP/DCC messages sent from client to server (outbound), and add security logging when inbound CTCP/DCC is stripped. This is a hard boundary with no config toggle: DCC exposes the client's real IP, which defeats the stealth proxy architecture. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
983 lines
34 KiB
Python
983 lines
34 KiB
Python
"""Tests for message router."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from bouncer.config import BouncerConfig, Config, NetworkConfig, ProxyConfig
|
|
from bouncer.irc import IRCMessage
|
|
from bouncer.router import BACKLOG_COMMANDS, Router, _suppress
|
|
|
|
# -- helpers -----------------------------------------------------------------
|
|
|
|
def _net_cfg(name: str = "libera", host: str = "irc.libera.chat",
             port: int = 6697, tls: bool = True,
             proxy_host: str | None = None,
             proxy_port: int | None = None) -> NetworkConfig:
    """Build a ``NetworkConfig`` for tests.

    Every argument is forwarded unchanged as the keyword of the same name;
    the defaults describe a TLS connection to ``irc.libera.chat:6697`` with
    no per-network proxy override.
    """
    settings = {
        "name": name,
        "host": host,
        "port": port,
        "tls": tls,
        "proxy_host": proxy_host,
        "proxy_port": proxy_port,
    }
    return NetworkConfig(**settings)
|
|
|
|
|
|
def _config(*net_cfgs: NetworkConfig) -> Config:
    """Build a ``Config`` whose networks are keyed by each config's ``name``.

    When no network configs are passed, a single default ``"libera"``
    network (from ``_net_cfg``) is used. The global proxy is always
    ``127.0.0.1:1080`` and the bouncer section uses its defaults.
    """
    if net_cfgs:
        networks = {cfg.name: cfg for cfg in net_cfgs}
    else:
        networks = {"libera": _net_cfg("libera")}

    bouncer_section = BouncerConfig()
    proxy_section = ProxyConfig(host="127.0.0.1", port=1080)
    return Config(bouncer=bouncer_section, proxy=proxy_section, networks=networks)
|
|
|
|
|
|
def _backlog() -> AsyncMock:
    """Return an ``AsyncMock`` standing in for the backlog store.

    ``get_last_seen`` resolves to ``0``, ``replay`` resolves to an empty
    list, and ``mark_seen`` is a plain awaitable mock.
    """
    stubs = {
        "get_last_seen": AsyncMock(return_value=0),
        "replay": AsyncMock(return_value=[]),
        "mark_seen": AsyncMock(),
    }
    store = AsyncMock()
    for attr, stub in stubs.items():
        setattr(store, attr, stub)
    return store
|
|
|
|
|
|
def _mock_network(name: str = "libera", nick: str = "botnick",
                  connected: bool = True, channels: set[str] | None = None,
                  topics: dict[str, str] | None = None,
                  names: dict[str, set[str]] | None = None) -> MagicMock:
    """Return a ``MagicMock`` shaped like an upstream network connection.

    ``cfg.name``, ``nick`` and ``connected`` are set from the arguments.
    ``channels``, ``topics`` and ``names`` fall back to fresh empty
    containers when the argument is ``None`` or empty. ``send``, ``stop``
    and ``start`` are ``AsyncMock`` instances so they can be awaited.
    """
    upstream = MagicMock()
    upstream.cfg.name = name
    upstream.nick = nick
    upstream.connected = connected

    # Falsy arguments (None or an empty container) are replaced by a new
    # empty container rather than being stored as-is.
    upstream.channels = channels if channels else set()
    upstream.topics = topics if topics else {}
    upstream.names = names if names else {}

    for coroutine_attr in ("send", "stop", "start"):
        setattr(upstream, coroutine_attr, AsyncMock())
    return upstream
|
|
|
|
|
|
def _mock_client(nick: str = "testuser") -> MagicMock:
    """Return a ``MagicMock`` client with the given ``nick`` and a mock ``write``."""
    attached = MagicMock()
    attached.configure_mock(nick=nick, write=MagicMock())
    return attached
|
|
|
|
|
|
def _msg(command: str, params: list[str] | None = None,
         prefix: str | None = None, tags: dict | None = None) -> IRCMessage:
    """Build an ``IRCMessage`` for tests.

    Missing (or empty) ``params`` and ``tags`` are replaced by a fresh
    empty list / dict; ``prefix`` is passed through as given.
    """
    message_params = params if params else []
    message_tags = tags if tags else {}
    return IRCMessage(command=command, params=message_params,
                      prefix=prefix, tags=message_tags)
|
|
|
|
|
|
# -- _suppress ---------------------------------------------------------------
|
|
|
|
class TestSuppress:
    """Checks which messages ``_suppress`` reports as suppressed (``True``)."""

    def test_suppresses_welcome_numerics(self) -> None:
        """Numerics 001-005 are suppressed."""
        verdicts = [_suppress(_msg(code, ["nick", "text"]))
                    for code in ("001", "002", "003", "004", "005")]
        assert all(verdict is True for verdict in verdicts)

    def test_suppresses_motd(self) -> None:
        """MOTD numerics 375/372/376/422 are suppressed."""
        verdicts = [_suppress(_msg(code, ["nick", "text"]))
                    for code in ("375", "372", "376", "422")]
        assert all(verdict is True for verdict in verdicts)

    def test_suppresses_lusers(self) -> None:
        """LUSERS-style numerics are suppressed."""
        lusers_codes = ("250", "251", "252", "253", "254", "255", "265", "266")
        verdicts = [_suppress(_msg(code, ["nick", "text"])) for code in lusers_codes]
        assert all(verdict is True for verdict in verdicts)

    def test_suppresses_uid_and_visiblehost(self) -> None:
        """Numerics 042 and 396 are suppressed."""
        uid_line = _msg("042", ["nick", "UID"])
        host_line = _msg("396", ["nick", "host"])
        assert _suppress(uid_line) is True
        assert _suppress(host_line) is True

    def test_suppresses_nick_in_use(self) -> None:
        """Numeric 433 is suppressed."""
        in_use = _msg("433", ["*", "nick", "in use"])
        assert _suppress(in_use) is True

    def test_suppresses_server_notice(self) -> None:
        """A NOTICE whose prefix is a bare server name is suppressed."""
        assert _suppress(
            _msg("NOTICE", ["nick", "server message"], prefix="server.example.com")
        ) is True

    def test_passes_user_notice(self) -> None:
        """A NOTICE from a ``nick!ident@host`` prefix is not suppressed."""
        assert _suppress(
            _msg("NOTICE", ["nick", "hello"], prefix="user!ident@host")
        ) is False

    def test_suppresses_connection_notice_star(self) -> None:
        """A prefix-less NOTICE addressed to ``*`` is suppressed."""
        assert _suppress(_msg("NOTICE", ["*", "Looking up your hostname..."])) is True

    def test_suppresses_connection_notice_auth(self) -> None:
        """A prefix-less NOTICE addressed to ``AUTH`` is suppressed."""
        assert _suppress(_msg("NOTICE", ["AUTH", "*** Checking Ident"])) is True

    def test_suppresses_ctcp_reply_in_notice(self) -> None:
        """A CTCP reply carried in a user NOTICE is suppressed."""
        assert _suppress(
            _msg("NOTICE", ["nick", "\x01VERSION mIRC\x01"], prefix="user!i@h")
        ) is True

    def test_suppresses_ctcp_in_privmsg(self) -> None:
        """A CTCP VERSION request in a PRIVMSG is suppressed."""
        assert _suppress(
            _msg("PRIVMSG", ["nick", "\x01VERSION\x01"], prefix="user!i@h")
        ) is True

    def test_passes_action_in_privmsg(self) -> None:
        """A CTCP ACTION in a PRIVMSG is not suppressed."""
        assert _suppress(
            _msg("PRIVMSG", ["#ch", "\x01ACTION waves\x01"], prefix="user!i@h")
        ) is False

    def test_suppresses_user_mode(self) -> None:
        """A MODE aimed at a nick is suppressed."""
        assert _suppress(_msg("MODE", ["nick", "+i"])) is True

    def test_passes_channel_mode(self) -> None:
        """A MODE aimed at a channel is not suppressed."""
        assert _suppress(_msg("MODE", ["#channel", "+o", "nick"])) is False

    def test_passes_normal_privmsg(self) -> None:
        """An ordinary channel PRIVMSG is not suppressed."""
        assert _suppress(_msg("PRIVMSG", ["#test", "hello"], prefix="user!i@h")) is False

    def test_passes_join(self) -> None:
        """JOIN is not suppressed."""
        assert _suppress(_msg("JOIN", ["#test"], prefix="user!i@h")) is False

    def test_passes_part(self) -> None:
        """PART is not suppressed."""
        assert _suppress(_msg("PART", ["#test"], prefix="user!i@h")) is False

    def test_passes_kick(self) -> None:
        """KICK is not suppressed."""
        assert _suppress(
            _msg("KICK", ["#test", "nick", "reason"], prefix="op!i@h")
        ) is False

    def test_passes_topic(self) -> None:
        """TOPIC is not suppressed."""
        assert _suppress(
            _msg("TOPIC", ["#test", "new topic"], prefix="user!i@h")
        ) is False

    def test_suppresses_dcc_send(self) -> None:
        """An inbound DCC SEND offer is suppressed."""
        offer = "\x01DCC SEND file 3232235777 5000 1024\x01"
        assert _suppress(_msg("PRIVMSG", ["nick", offer], prefix="user!i@h")) is True

    def test_suppresses_dcc_chat(self) -> None:
        """An inbound DCC CHAT offer is suppressed."""
        offer = "\x01DCC CHAT chat 3232235777 5000\x01"
        assert _suppress(_msg("PRIVMSG", ["nick", offer], prefix="user!i@h")) is True
|
|
|
|
|
|
# -- Router._proxy_for ------------------------------------------------------
|
|
|
|
class TestProxyFor:
    """``Router._proxy_for``: global proxy versus per-network overrides."""

    @staticmethod
    def _resolve(**net_overrides):
        """Resolve the proxy for ``_net_cfg(**net_overrides)`` on a default router."""
        router = Router(_config(), _backlog())
        return router._proxy_for(_net_cfg(**net_overrides))

    def test_default_proxy(self) -> None:
        """Without overrides the global proxy host and port are used."""
        chosen = self._resolve()
        assert (chosen.host, chosen.port) == ("127.0.0.1", 1080)

    def test_per_network_proxy_override(self) -> None:
        """Both host and port come from the network when both are set."""
        chosen = self._resolve(proxy_host="10.0.0.1", proxy_port=9050)
        assert (chosen.host, chosen.port) == ("10.0.0.1", 9050)

    def test_per_network_proxy_inherits_port(self) -> None:
        """A host-only override keeps the port from the global config."""
        chosen = self._resolve(proxy_host="10.0.0.1")
        assert chosen.host == "10.0.0.1"
        assert chosen.port == 1080  # from global config
|
|
|
|
|
|
# -- Router init and network management --------------------------------------
|
|
|
|
class TestNetworkManagement:
    """Router bookkeeping: listing, looking up, adding and removing networks."""

    @staticmethod
    def _fresh_router():
        """Return a router built from the default test config and backlog mock."""
        return Router(_config(), _backlog())

    def test_network_names_empty(self) -> None:
        """A new router reports no network names."""
        assert self._fresh_router().network_names() == []

    def test_get_network_none(self) -> None:
        """Looking up an unregistered name yields ``None``."""
        assert self._fresh_router().get_network("nonexistent") is None

    def test_get_network_found(self) -> None:
        """A registered network is returned by identity."""
        router = self._fresh_router()
        registered = _mock_network("libera")
        router.networks["libera"] = registered
        assert router.get_network("libera") is registered

    def test_network_names(self) -> None:
        """All registered names are listed (order not asserted)."""
        router = self._fresh_router()
        for net_name in ("libera", "oftc"):
            router.networks[net_name] = _mock_network(net_name)
        assert sorted(router.network_names()) == ["libera", "oftc"]

    def test_get_own_nicks(self) -> None:
        """``get_own_nicks`` maps each network name to that network's nick."""
        router = self._fresh_router()
        router.networks["libera"] = _mock_network("libera", nick="lnick")
        router.networks["oftc"] = _mock_network("oftc", nick="onick")
        assert router.get_own_nicks() == {"libera": "lnick", "oftc": "onick"}

    @pytest.mark.asyncio
    async def test_add_network(self) -> None:
        """``add_network`` registers and returns the constructed ``Network``."""
        router = self._fresh_router()
        hackint_cfg = _net_cfg("hackint", host="irc.hackint.org")

        # Replace the Network class so no real connection is attempted.
        with patch("bouncer.router.Network") as network_cls:
            fake_net = MagicMock()
            fake_net.start = AsyncMock()
            network_cls.return_value = fake_net
            added = await router.add_network(hackint_cfg)

        assert "hackint" in router.networks
        assert added is fake_net

    @pytest.mark.asyncio
    async def test_remove_network(self) -> None:
        """Removing a known network stops it, drops it and returns ``True``."""
        router = self._fresh_router()
        doomed = _mock_network("libera")
        router.networks["libera"] = doomed

        removed = await router.remove_network("libera")

        assert removed is True
        assert "libera" not in router.networks
        doomed.stop.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_remove_network_not_found(self) -> None:
        """Removing an unknown network returns ``False``."""
        removed = await self._fresh_router().remove_network("nonexistent")
        assert removed is False
|
|
|
|
|
|
# -- Client attach/detach ---------------------------------------------------
|
|
|
|
class TestClientAttachDetach:
    """``attach_all`` / ``detach_all`` maintain ``router.clients``."""

    @staticmethod
    def _fresh_router():
        """Return a router built from the default test config and backlog mock."""
        return Router(_config(), _backlog())

    @pytest.mark.asyncio
    async def test_attach_adds_client(self) -> None:
        """An attached client appears in ``router.clients``."""
        router = self._fresh_router()
        newcomer = _mock_client()
        await router.attach_all(newcomer)
        assert newcomer in router.clients

    @pytest.mark.asyncio
    async def test_detach_removes_client(self) -> None:
        """Detaching a previously attached client removes it."""
        router = self._fresh_router()
        visitor = _mock_client()
        await router.attach_all(visitor)
        await router.detach_all(visitor)
        assert visitor not in router.clients

    @pytest.mark.asyncio
    async def test_detach_missing_client_no_error(self) -> None:
        """Detaching a client that was never attached must not raise."""
        router = self._fresh_router()
        stranger = _mock_client()
        await router.detach_all(stranger)  # should not raise

    @pytest.mark.asyncio
    async def test_multiple_clients(self) -> None:
        """Two clients can be attached; detaching one leaves the other."""
        router = self._fresh_router()
        first = _mock_client("user1")
        second = _mock_client("user2")
        await router.attach_all(first)
        await router.attach_all(second)
        assert len(router.clients) == 2

        await router.detach_all(first)
        assert len(router.clients) == 1
        assert second in router.clients
|
|
|
|
|
|
# -- stop_networks -----------------------------------------------------------
|
|
|
|
class TestStopNetworks:
    """``stop_networks`` stops every registered network."""

    @pytest.mark.asyncio
    async def test_stops_all(self) -> None:
        """Each network's ``stop`` is awaited exactly once."""
        router = Router(_config(), _backlog())
        libera = _mock_network("libera")
        oftc = _mock_network("oftc")
        router.networks = {"libera": libera, "oftc": oftc}

        await router.stop_networks()

        libera.stop.assert_awaited_once()
        oftc.stop.assert_awaited_once()
|
|
|
|
|
|
# -- route_client_message (outbound: client -> network) ----------------------
|
|
|
|
class TestRouteClientMessage:
    """Outbound routing: client messages with ``target/network`` suffixes.

    The tests check that the network suffix selects the upstream network and
    is stripped from forwarded params, that un-routable messages are dropped,
    and that outbound DCC is blocked while ACTION and plain text pass.
    """

    @staticmethod
    def _router_with(network: str = "libera", **net_kwargs):
        """Return ``(router, upstream)`` with one mock network registered."""
        router = Router(_config(), _backlog())
        upstream = _mock_network(network, **net_kwargs)
        router.networks[network] = upstream
        return router, upstream

    @staticmethod
    def _forwarded(upstream):
        """Return the message passed to the most recent ``upstream.send`` call."""
        return upstream.send.call_args[0][0]

    @pytest.mark.asyncio
    async def test_privmsg_to_channel(self) -> None:
        """A channel PRIVMSG is forwarded once with the suffix stripped."""
        router, upstream = self._router_with()

        await router.route_client_message(_msg("PRIVMSG", ["#test/libera", "hello"]))

        upstream.send.assert_awaited_once()
        forwarded = self._forwarded(upstream)
        assert forwarded.command == "PRIVMSG"
        assert forwarded.params == ["#test", "hello"]

    @pytest.mark.asyncio
    async def test_privmsg_to_nick(self) -> None:
        """A nick-targeted PRIVMSG has its suffix stripped too."""
        router, upstream = self._router_with()

        await router.route_client_message(_msg("PRIVMSG", ["user123/libera", "hi there"]))

        assert self._forwarded(upstream).params == ["user123", "hi there"]

    @pytest.mark.asyncio
    async def test_no_namespace_dropped(self) -> None:
        """A target without a network suffix is not forwarded."""
        router, upstream = self._router_with()

        await router.route_client_message(_msg("PRIVMSG", ["#test", "hello"]))
        upstream.send.assert_not_awaited()

    @pytest.mark.asyncio
    async def test_unknown_network_dropped(self) -> None:
        """A suffix naming an unregistered network is not forwarded."""
        router, upstream = self._router_with()

        await router.route_client_message(_msg("PRIVMSG", ["#test/fakenet", "hello"]))
        upstream.send.assert_not_awaited()

    @pytest.mark.asyncio
    async def test_disconnected_network_dropped(self) -> None:
        """Nothing is sent to a network whose ``connected`` flag is false."""
        router, upstream = self._router_with(connected=False)

        await router.route_client_message(_msg("PRIVMSG", ["#test/libera", "hello"]))
        upstream.send.assert_not_awaited()

    @pytest.mark.asyncio
    async def test_empty_params_ignored(self) -> None:
        """A PRIVMSG with no params must not raise."""
        router = Router(_config(), _backlog())
        bare = _msg("PRIVMSG")
        await router.route_client_message(bare)  # should not raise

    @pytest.mark.asyncio
    async def test_join_single_channel(self) -> None:
        """JOIN of one suffixed channel is forwarded without the suffix."""
        router, upstream = self._router_with()

        await router.route_client_message(_msg("JOIN", ["#dev/libera"]))

        forwarded = self._forwarded(upstream)
        assert forwarded.command == "JOIN"
        assert forwarded.params == ["#dev"]

    @pytest.mark.asyncio
    async def test_join_comma_separated_same_network(self) -> None:
        """Comma-separated channels on one network are sent as one list."""
        router, upstream = self._router_with()

        await router.route_client_message(_msg("JOIN", ["#a/libera,#b/libera"]))

        assert self._forwarded(upstream).params[0] == "#a,#b"

    @pytest.mark.asyncio
    async def test_join_comma_separated_multi_network(self) -> None:
        """Comma-separated channels on two networks are split per network."""
        router = Router(_config(), _backlog())
        libera = _mock_network("libera")
        oftc = _mock_network("oftc")
        router.networks = {"libera": libera, "oftc": oftc}

        await router.route_client_message(_msg("JOIN", ["#a/libera,#b/oftc"]))

        assert libera.send.await_count == 1
        assert oftc.send.await_count == 1
        to_libera = self._forwarded(libera)
        to_oftc = self._forwarded(oftc)
        assert to_libera.params[0] == "#a"
        assert to_oftc.params[0] == "#b"

    @pytest.mark.asyncio
    async def test_part(self) -> None:
        """PART keeps its reason and loses the channel suffix."""
        router, upstream = self._router_with()

        await router.route_client_message(_msg("PART", ["#dev/libera", "leaving"]))

        forwarded = self._forwarded(upstream)
        assert forwarded.command == "PART"
        assert forwarded.params == ["#dev", "leaving"]

    @pytest.mark.asyncio
    async def test_kick(self) -> None:
        """KICK strips the suffix from both the channel and the nick."""
        router, upstream = self._router_with()

        await router.route_client_message(
            _msg("KICK", ["#test/libera", "baduser/libera", "bye"])
        )

        forwarded = self._forwarded(upstream)
        assert forwarded.command == "KICK"
        assert forwarded.params == ["#test", "baduser", "bye"]

    @pytest.mark.asyncio
    async def test_kick_no_namespace_dropped(self) -> None:
        """KICK without any network suffix is not forwarded."""
        router, upstream = self._router_with()

        await router.route_client_message(_msg("KICK", ["#test", "baduser"]))
        upstream.send.assert_not_awaited()

    @pytest.mark.asyncio
    async def test_invite(self) -> None:
        """INVITE strips the suffix from both the nick and the channel."""
        router, upstream = self._router_with()

        await router.route_client_message(_msg("INVITE", ["user/libera", "#test/libera"]))

        forwarded = self._forwarded(upstream)
        assert forwarded.command == "INVITE"
        assert forwarded.params == ["user", "#test"]

    @pytest.mark.asyncio
    async def test_invite_network_from_either_param(self) -> None:
        """INVITE is routed even when only the nick carries the suffix."""
        router, upstream = self._router_with()

        # Network suffix only on the nick, not the channel
        await router.route_client_message(_msg("INVITE", ["user/libera", "#test"]))
        upstream.send.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_mode_channel(self) -> None:
        """Channel MODE forwards its remaining params untouched."""
        router, upstream = self._router_with()

        await router.route_client_message(_msg("MODE", ["#test/libera", "+o", "nick"]))

        assert self._forwarded(upstream).params == ["#test", "+o", "nick"]

    @pytest.mark.asyncio
    async def test_who(self) -> None:
        """WHO is forwarded with the suffix stripped."""
        router, upstream = self._router_with()

        await router.route_client_message(_msg("WHO", ["#test/libera"]))

        assert self._forwarded(upstream).params == ["#test"]

    @pytest.mark.asyncio
    async def test_notice(self) -> None:
        """NOTICE is forwarded with the suffix stripped."""
        router, upstream = self._router_with()

        await router.route_client_message(
            _msg("NOTICE", ["user/libera", "you've been warned"])
        )

        assert self._forwarded(upstream).params == ["user", "you've been warned"]

    @pytest.mark.asyncio
    async def test_topic(self) -> None:
        """TOPIC is forwarded with the suffix stripped."""
        router, upstream = self._router_with()

        await router.route_client_message(_msg("TOPIC", ["#test/libera", "new topic"]))

        assert self._forwarded(upstream).params == ["#test", "new topic"]

    @pytest.mark.asyncio
    async def test_preserves_tags(self) -> None:
        """Message tags survive routing unchanged."""
        router, upstream = self._router_with()

        tagged = _msg("PRIVMSG", ["#test/libera", "hi"], tags={"label": "abc"})
        await router.route_client_message(tagged)

        assert self._forwarded(upstream).tags == {"label": "abc"}

    @pytest.mark.asyncio
    async def test_preserves_prefix(self) -> None:
        """The message prefix survives routing unchanged."""
        router, upstream = self._router_with()

        prefixed = _msg("PRIVMSG", ["#test/libera", "hi"], prefix="me!u@h")
        await router.route_client_message(prefixed)

        assert self._forwarded(upstream).prefix == "me!u@h"

    @pytest.mark.asyncio
    async def test_blocks_outbound_dcc_send(self) -> None:
        """An outbound DCC SEND is never sent upstream."""
        router, upstream = self._router_with()

        offer = "\x01DCC SEND file 3232235777 5000 1024\x01"
        await router.route_client_message(_msg("PRIVMSG", ["user/libera", offer]))
        upstream.send.assert_not_awaited()

    @pytest.mark.asyncio
    async def test_blocks_outbound_dcc_chat(self) -> None:
        """An outbound DCC CHAT is never sent upstream."""
        router, upstream = self._router_with()

        offer = "\x01DCC CHAT chat 3232235777 5000\x01"
        await router.route_client_message(_msg("PRIVMSG", ["user/libera", offer]))
        upstream.send.assert_not_awaited()

    @pytest.mark.asyncio
    async def test_passes_outbound_action(self) -> None:
        """An outbound CTCP ACTION is still forwarded."""
        router, upstream = self._router_with()

        emote = "\x01ACTION waves\x01"
        await router.route_client_message(_msg("PRIVMSG", ["#test/libera", emote]))
        upstream.send.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_passes_outbound_normal_privmsg(self) -> None:
        """Plain outbound text is forwarded."""
        router, upstream = self._router_with()

        await router.route_client_message(
            _msg("PRIVMSG", ["#test/libera", "just a normal message"])
        )
        upstream.send.assert_awaited_once()
|
|
|
|
|
|
# -- _dispatch (inbound: network -> clients) ---------------------------------
|
|
|
|
class TestDispatch:
    """Inbound dispatch: messages from a network fan out to attached clients."""

    @staticmethod
    def _router_on_libera(**net_kwargs):
        """Return a router with one mock ``libera`` network registered."""
        router = Router(_config(), _backlog())
        router.networks["libera"] = _mock_network("libera", **net_kwargs)
        return router

    @staticmethod
    def _last_write(client):
        """Return the bytes passed to the client's most recent ``write`` call."""
        return client.write.call_args[0][0]

    @pytest.mark.asyncio
    async def test_delivers_to_all_clients(self) -> None:
        """Every attached client receives exactly one write."""
        router = self._router_on_libera(nick="bot")
        first = _mock_client("user1")
        second = _mock_client("user2")
        router.clients = [first, second]

        await router._dispatch(
            "libera", _msg("PRIVMSG", ["#test", "hello"], prefix="sender!u@h")
        )

        assert first.write.call_count == 1
        assert second.write.call_count == 1

    @pytest.mark.asyncio
    async def test_suppressed_not_delivered(self) -> None:
        """A suppressed numeric (001) is not written to clients."""
        router = self._router_on_libera()
        listener = _mock_client()
        router.clients = [listener]

        await router._dispatch("libera", _msg("001", ["nick", "Welcome"]))
        listener.write.assert_not_called()

    @pytest.mark.asyncio
    async def test_namespaces_channel(self) -> None:
        """The channel is written with a ``/libera`` suffix."""
        router = self._router_on_libera(nick="bot")
        listener = _mock_client("me")
        router.clients = [listener]

        await router._dispatch(
            "libera", _msg("PRIVMSG", ["#test", "hello"], prefix="user!i@h")
        )

        assert b"#test/libera" in self._last_write(listener)

    @pytest.mark.asyncio
    async def test_namespaces_prefix(self) -> None:
        """The sender nick is written with a ``/libera`` suffix."""
        router = self._router_on_libera(nick="bot")
        listener = _mock_client("me")
        router.clients = [listener]

        await router._dispatch(
            "libera", _msg("PRIVMSG", ["#test", "hello"], prefix="sender!i@h")
        )

        assert b"sender/libera" in self._last_write(listener)

    @pytest.mark.asyncio
    async def test_own_nick_rewritten_to_client_nick(self) -> None:
        """Our own upstream nick is shown as the client's nick, un-suffixed."""
        router = self._router_on_libera(nick="bot")
        listener = _mock_client("clientnick")
        router.clients = [listener]

        await router._dispatch(
            "libera", _msg("PRIVMSG", ["#test", "hello"], prefix="bot!i@h")
        )

        wire = self._last_write(listener)
        assert b"clientnick" in wire
        assert b"bot/libera" not in wire

    @pytest.mark.asyncio
    async def test_client_write_error_handled(self) -> None:
        """A client whose write raises does not block delivery to the next one."""
        router = self._router_on_libera(nick="bot")
        broken = _mock_client()
        broken.write.side_effect = ConnectionResetError
        healthy = _mock_client()
        router.clients = [broken, healthy]

        await router._dispatch(
            "libera", _msg("PRIVMSG", ["#test", "hello"], prefix="user!i@h")
        )

        # Bad client raised, but good client still received
        healthy.write.assert_called_once()

    @pytest.mark.asyncio
    async def test_no_clients_no_error(self) -> None:
        """Dispatching with no attached clients must not raise."""
        router = self._router_on_libera()
        router.clients = []

        inbound = _msg("PRIVMSG", ["#test", "hello"], prefix="user!i@h")
        await router._dispatch("libera", inbound)  # should not raise
|
|
|
|
|
|
# -- _on_network_status ------------------------------------------------------
|
|
|
|
class TestOnNetworkStatus:
    """``_on_network_status`` broadcasts a status line to every client."""

    def test_broadcasts_to_all_clients(self) -> None:
        """Each client gets one write containing ``[network] text`` and ``bouncer``."""
        router = Router(_config(), _backlog())
        first = _mock_client()
        second = _mock_client()
        router.clients = [first, second]

        router._on_network_status("libera", "connection stable")

        assert first.write.call_count == 1
        assert second.write.call_count == 1
        wire = first.write.call_args[0][0]
        assert b"[libera] connection stable" in wire
        assert b"bouncer" in wire  # prefix

    def test_client_error_does_not_propagate(self) -> None:
        """A failing client write neither raises nor stops the broadcast."""
        router = Router(_config(), _backlog())
        broken = _mock_client()
        broken.write.side_effect = ConnectionResetError
        healthy = _mock_client()
        router.clients = [broken, healthy]

        router._on_network_status("libera", "test")
        healthy.write.assert_called_once()
|
|
|
|
|
|
# -- _on_network_message (sync -> async bridge) -----------------------------
|
|
|
|
class TestOnNetworkMessage:
    """``_on_network_message`` hands the message to the async ``_dispatch``."""

    @pytest.mark.asyncio
    async def test_creates_dispatch_task(self) -> None:
        """After one loop iteration ``_dispatch`` was awaited with the same args."""
        router = Router(_config(), _backlog())
        router.networks["libera"] = _mock_network("libera")

        with patch.object(router, "_dispatch", new_callable=AsyncMock) as dispatch_spy:
            inbound = _msg("PRIVMSG", ["#test", "hi"], prefix="u!i@h")
            router._on_network_message("libera", inbound)
            # Let the task run
            await asyncio.sleep(0)

        dispatch_spy.assert_awaited_once_with("libera", inbound)
|
|
|
|
|
|
# -- _replay_backlog ---------------------------------------------------------
|
|
|
|
class TestReplayBacklog:
    """``_replay_backlog`` replays stored entries to a newly attached client."""

    @staticmethod
    def _entry(entry_id, command, target, content, sender):
        """Return a ``MagicMock`` backlog row with the given fields set."""
        row = MagicMock()
        row.id = entry_id
        row.command = command
        row.target = target
        row.content = content
        row.sender = sender
        return row

    @staticmethod
    def _router_for(store):
        """Return a router over *store* with a mock ``libera`` network (nick ``bot``)."""
        router = Router(_config(), store)
        router.networks["libera"] = _mock_network("libera", nick="bot")
        return router

    @pytest.mark.asyncio
    async def test_empty_backlog(self) -> None:
        """With nothing to replay, the client receives no writes."""
        store = _backlog()
        router = self._router_for(store)
        listener = _mock_client()

        await router._replay_backlog(listener, "libera")
        listener.write.assert_not_called()

    @pytest.mark.asyncio
    async def test_replays_messages(self) -> None:
        """Each entry is written, namespaced, and the last id is marked seen."""
        store = _backlog()
        first = self._entry(1, "PRIVMSG", "#test", "hello", "user!i@h")
        second = self._entry(2, "PRIVMSG", "#test", "world", "other!i@h")
        store.replay.return_value = [first, second]

        router = self._router_for(store)
        listener = _mock_client("me")

        await router._replay_backlog(listener, "libera")

        assert listener.write.call_count == 2
        # Verify namespaced
        opening_write = listener.write.call_args_list[0][0][0]
        assert b"#test/libera" in opening_write
        # Should mark last seen
        store.mark_seen.assert_awaited_once_with("libera", 2)

    @pytest.mark.asyncio
    async def test_replays_since_last_seen(self) -> None:
        """The stored last-seen id is passed to ``replay`` as ``since_id``."""
        store = _backlog()
        store.get_last_seen.return_value = 42

        router = self._router_for(store)
        listener = _mock_client()

        await router._replay_backlog(listener, "libera")
        store.replay.assert_awaited_once_with("libera", since_id=42)

    @pytest.mark.asyncio
    async def test_suppressed_skipped_during_replay(self) -> None:
        """A suppressed entry is not written but its id is still marked seen."""
        store = _backlog()
        hidden = self._entry(
            1, "NOTICE", "nick", "\x01VERSION mIRC\x01",
            "server.example.com",  # no '!' -> server notice
        )
        store.replay.return_value = [hidden]

        router = self._router_for(store)
        listener = _mock_client()

        await router._replay_backlog(listener, "libera")
        listener.write.assert_not_called()
        # Still marks seen even if all suppressed
        store.mark_seen.assert_awaited_once_with("libera", 1)

    @pytest.mark.asyncio
    async def test_client_error_stops_replay(self) -> None:
        """A write error on the first entry ends the replay early."""
        store = _backlog()
        first = self._entry(1, "PRIVMSG", "#test", "msg1", "user!i@h")
        second = self._entry(2, "PRIVMSG", "#test", "msg2", "user!i@h")
        store.replay.return_value = [first, second]

        router = self._router_for(store)
        listener = _mock_client()
        listener.write.side_effect = [ConnectionResetError, None]

        await router._replay_backlog(listener, "libera")
        # Should have stopped after first error
        assert listener.write.call_count == 1
|
|
|
|
|
|
# -- BACKLOG_COMMANDS constant -----------------------------------------------
|
|
|
|
class TestBacklogCommands:
    """Membership checks on the ``BACKLOG_COMMANDS`` constant."""

    def test_expected_commands(self) -> None:
        """PRIVMSG, NOTICE, TOPIC, KICK and MODE are all stored."""
        for stored in ("PRIVMSG", "NOTICE", "TOPIC", "KICK", "MODE"):
            assert stored in BACKLOG_COMMANDS

    def test_join_not_in_backlog(self) -> None:
        """JOIN, PART and QUIT are not stored."""
        for transient in ("JOIN", "PART", "QUIT"):
            assert transient not in BACKLOG_COMMANDS
|
|
|
|
|
|
# -- server-time tag injection -----------------------------------------------
|
|
|
|
class TestServerTimeDispatch:
    """``_dispatch`` handling of the ``time`` message tag."""

    @staticmethod
    def _router_with_listener():
        """Return a router with a mock ``libera`` network and one client ``me``."""
        router = Router(_config(), _backlog())
        router.networks["libera"] = _mock_network("libera", nick="bot")
        listener = _mock_client("me")
        router.clients = [listener]
        return router

    @pytest.mark.asyncio
    async def test_injects_time_tag_when_missing(self) -> None:
        """A message without a ``time`` tag gets one ending in ``Z`` with a ``T``."""
        router = self._router_with_listener()

        inbound = _msg("PRIVMSG", ["#test", "hello"], prefix="user!i@h")
        await router._dispatch("libera", inbound)

        assert "time" in inbound.tags
        # Verify ISO8601 format
        stamp = inbound.tags["time"]
        assert stamp.endswith("Z")
        assert "T" in stamp

    @pytest.mark.asyncio
    async def test_preserves_existing_time_tag(self) -> None:
        """An existing ``time`` tag is left as it was."""
        router = self._router_with_listener()

        upstream_stamp = "2025-01-15T12:00:00.000000Z"
        inbound = _msg("PRIVMSG", ["#test", "hello"], prefix="user!i@h",
                       tags={"time": upstream_stamp})
        await router._dispatch("libera", inbound)

        assert inbound.tags["time"] == upstream_stamp
|
|
|
|
|
|
class TestServerTimeReplay:
    """Replayed backlog entries carry a ``time`` tag on the wire."""

    @pytest.mark.asyncio
    async def test_replay_injects_timestamp(self) -> None:
        """The bytes written for a replayed entry contain ``time=``."""
        store = _backlog()
        row = MagicMock()
        row.id = 1
        row.command = "PRIVMSG"
        row.target = "#test"
        row.content = "hello"
        row.sender = "user!i@h"
        row.timestamp = 1705320000.0  # 2024-01-15T12:00:00Z
        store.replay.return_value = [row]

        router = Router(_config(), store)
        router.networks["libera"] = _mock_network("libera", nick="bot")
        listener = _mock_client("me")

        await router._replay_backlog(listener, "libera")

        wire = listener.write.call_args[0][0]
        # The time tag should be in the wire format
        assert b"time=" in wire
|
|
|
|
|
|
# -- push notifications ------------------------------------------------------
|
|
|
|
class TestNotifications:
    """Push notifications fired from ``_dispatch`` via ``router._notifier``."""

    @staticmethod
    def _router_with_notify_url(notify_url: str):
        """Return a router whose config has *notify_url* and a mock ``libera`` network."""
        cfg = _config()
        cfg.bouncer.notify_url = notify_url
        router = Router(cfg, _backlog())
        upstream = _mock_network("libera", nick="bot")
        router.networks["libera"] = upstream
        return router

    @pytest.mark.asyncio
    async def test_notification_triggered_on_pm_no_clients(self) -> None:
        """With no clients and ``should_notify`` true, ``send`` is awaited once."""
        router = self._router_with_notify_url("https://ntfy.sh/test")
        router.clients = []  # no clients

        with patch.object(router._notifier, "should_notify", return_value=True), \
                patch.object(router._notifier, "send", new_callable=AsyncMock) as send_spy:
            inbound = _msg("PRIVMSG", ["bot", "hello bot"], prefix="user!i@h")
            await router._dispatch("libera", inbound)
            # Let fire-and-forget task run
            await asyncio.sleep(0)

        send_spy.assert_awaited_once_with("libera", "user", "bot", "hello bot")

    @pytest.mark.asyncio
    async def test_notification_not_triggered_with_clients(self) -> None:
        """With a client attached, no notification is sent."""
        router = self._router_with_notify_url("https://ntfy.sh/test")
        listener = _mock_client()
        router.clients = [listener]

        with patch.object(router._notifier, "send", new_callable=AsyncMock) as send_spy:
            inbound = _msg("PRIVMSG", ["bot", "hello bot"], prefix="user!i@h")
            await router._dispatch("libera", inbound)
            await asyncio.sleep(0)

        send_spy.assert_not_awaited()

    @pytest.mark.asyncio
    async def test_notification_respects_should_notify(self) -> None:
        """When ``should_notify`` returns false, ``send`` is not awaited."""
        router = self._router_with_notify_url("https://ntfy.sh/test")
        router.clients = []

        with patch.object(router._notifier, "should_notify", return_value=False), \
                patch.object(router._notifier, "send", new_callable=AsyncMock) as send_spy:
            inbound = _msg("PRIVMSG", ["#channel", "random msg"], prefix="user!i@h")
            await router._dispatch("libera", inbound)
            await asyncio.sleep(0)

        send_spy.assert_not_awaited()

    @pytest.mark.asyncio
    async def test_notification_disabled_skips(self) -> None:
        """An empty ``notify_url`` means no notification is sent."""
        router = self._router_with_notify_url("")  # disabled
        router.clients = []

        with patch.object(router._notifier, "send", new_callable=AsyncMock) as send_spy:
            inbound = _msg("PRIVMSG", ["bot", "hello"], prefix="user!i@h")
            await router._dispatch("libera", inbound)
            await asyncio.sleep(0)

        send_spy.assert_not_awaited()
|