feat: PING watchdog, IRCv3 server-time, push notifications

PING watchdog sends PING after configurable silence interval and
disconnects on timeout, detecting stale connections that TCP alone
misses. IRCv3 server-time capability is requested on every connection;
timestamps are injected on dispatch and backlog replay for clients
that support message tags. Push notifications via ntfy or generic
webhook fire on highlights and PMs when no clients are attached,
with configurable cooldown and optional SOCKS5 routing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
user
2026-02-21 17:41:38 +01:00
parent 4dd817ea75
commit 0d762ced49
10 changed files with 1733 additions and 27 deletions

View File

@@ -4,8 +4,8 @@ from __future__ import annotations
import asyncio
import base64
import hashlib
import random
import time
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
@@ -13,13 +13,10 @@ import pytest
from bouncer.config import BouncerConfig, NetworkConfig, ProxyConfig
from bouncer.irc import IRCMessage, parse
from bouncer.network import (
Network,
State,
_BIGRAMS,
_GENERIC_IDENTS,
_GENERIC_REALNAMES,
_STARTERS,
_VOWELS,
Network,
State,
_markov_word,
_nick_for_host,
_password_for_host,
@@ -28,7 +25,6 @@ from bouncer.network import (
_seeded_markov,
)
# -- helpers -----------------------------------------------------------------
def _cfg(name: str = "testnet", host: str = "irc.test.net", port: int = 6697,
@@ -854,9 +850,11 @@ class TestConnect:
user_sent = any(b"USER " in c for c in calls)
assert nick_sent
assert user_sent
# Should NOT have sent CAP REQ sasl
cap_sent = any(b"CAP REQ" in c for c in calls)
assert not cap_sent
# Should have sent CAP REQ server-time but NOT CAP REQ sasl
cap_server_time = any(b"CAP REQ server-time" in c for c in calls)
cap_sasl = any(b"CAP REQ sasl" in c for c in calls)
assert cap_server_time
assert not cap_sasl
@pytest.mark.asyncio
async def test_connect_with_sasl_plain(self) -> None:
@@ -1201,3 +1199,206 @@ class TestReadLoop:
await net._read_loop()
assert cb.call_count == 2
# -- PING watchdog -----------------------------------------------------------
class TestPingWatchdog:
@pytest.mark.asyncio
async def test_timeout_triggers_disconnect(self) -> None:
"""Stale connection triggers disconnect + reconnect."""
net = _net(bouncer_cfg=_bouncer(ping_interval=0, ping_timeout=0))
net.state = State.READY
net._running = True
net._last_recv = time.monotonic() - 1000 # stale
writer = MagicMock()
writer.is_closing.return_value = False
writer.drain = AsyncMock()
net._writer = writer
with patch.object(net, "_disconnect", new_callable=AsyncMock) as mock_disc:
with patch.object(net, "_schedule_reconnect") as mock_recon:
await net._ping_watchdog()
mock_disc.assert_awaited_once()
mock_recon.assert_called_once()
@pytest.mark.asyncio
async def test_healthy_connection_stays_alive(self) -> None:
"""Fresh data during timeout window prevents disconnect."""
net = _net(bouncer_cfg=_bouncer(ping_interval=10, ping_timeout=5))
net.state = State.READY
net._running = True
writer = MagicMock()
writer.is_closing.return_value = False
writer.drain = AsyncMock()
net._writer = writer
# Control time progression: stale at first, then fresh after PING
clock = [100.0] # start at T=100
def fake_monotonic() -> float:
return clock[0]
original_sleep = asyncio.sleep
call_count = 0
async def fake_sleep(delay: float) -> None:
nonlocal call_count
call_count += 1
if call_count == 1:
# After interval sleep: time advanced, data is stale -> PING sent
clock[0] = 120.0
net._last_recv = 100.0 # stale (20s > interval=10)
elif call_count == 2:
# During timeout wait: simulate PONG received (fresh data)
clock[0] = 122.0
net._last_recv = 122.0 # fresh
elif call_count == 3:
# Next interval sleep: exit loop
net.state = State.DISCONNECTED
await original_sleep(0)
with patch("time.monotonic", side_effect=fake_monotonic):
with patch("asyncio.sleep", side_effect=fake_sleep):
with patch.object(net, "_disconnect", new_callable=AsyncMock) as mock_disc:
await net._ping_watchdog()
# PING was sent, but fresh data arrived -- no disconnect
ping_sent = any(b"PING" in c.args[0] for c in writer.write.call_args_list)
assert ping_sent
mock_disc.assert_not_awaited()
@pytest.mark.asyncio
async def test_watchdog_cancelled_on_disconnect(self) -> None:
net = _net()
net.state = State.READY
net._running = True
net._last_recv = time.monotonic()
task = MagicMock()
task.done.return_value = False
net._ping_task = task
await net._disconnect()
task.cancel.assert_called_once()
assert net._ping_task is None
@pytest.mark.asyncio
async def test_ping_task_started_in_go_ready(self) -> None:
net = _net(bouncer_cfg=_bouncer(probation_seconds=0, ping_interval=999))
net.state = State.PROBATION
net._running = True
net._sasl_complete.set()
writer = MagicMock()
writer.is_closing.return_value = False
writer.drain = AsyncMock()
net._writer = writer
bl = _mock_backlog()
net.backlog = bl
await net._go_ready()
assert net._ping_task is not None
# Cancel it so test doesn't leak
net._ping_task.cancel()
try:
await net._ping_task
except asyncio.CancelledError:
pass
@pytest.mark.asyncio
async def test_ping_task_in_stop(self) -> None:
"""stop() cancels the ping task."""
net = _net()
net._running = True
ping_task = MagicMock()
ping_task.done.return_value = False
net._ping_task = ping_task
await net.stop()
# cancel() is called in both stop() and _disconnect()
assert ping_task.cancel.call_count >= 1
# -- IRCv3 CAP negotiation (server-time) ------------------------------------
class TestCapServerTime:
@pytest.mark.asyncio
async def test_server_time_ack_sets_flag(self) -> None:
net = _net()
writer = MagicMock()
writer.is_closing.return_value = False
writer.drain = AsyncMock()
net._writer = writer
net._caps_pending = 1
await net._handle(_msg(":server CAP * ACK :server-time"))
assert net._server_time is True
@pytest.mark.asyncio
async def test_server_time_nak_handled(self) -> None:
net = _net()
writer = MagicMock()
writer.is_closing.return_value = False
writer.drain = AsyncMock()
net._writer = writer
net._caps_pending = 1
await net._handle(_msg(":server CAP * NAK :server-time"))
assert net._server_time is False
@pytest.mark.asyncio
async def test_combined_sasl_and_server_time(self) -> None:
"""Both caps requested: ACK server-time + ACK sasl."""
net = _net()
net._sasl_mechanism = "PLAIN"
net._sasl_nick = "nick"
net._sasl_pass = "pass"
net._caps_pending = 2
writer = MagicMock()
writer.is_closing.return_value = False
writer.drain = AsyncMock()
net._writer = writer
# ACK server-time first
await net._handle(_msg(":server CAP * ACK :server-time"))
assert net._server_time is True
# Should NOT have sent CAP END yet (SASL still pending)
cap_end_calls = [c for c in writer.write.call_args_list
if b"CAP END" in c.args[0]]
assert len(cap_end_calls) == 0
# ACK sasl starts AUTHENTICATE flow
await net._handle(_msg(":server CAP * ACK :sasl"))
writer.write.assert_called_with(b"AUTHENTICATE PLAIN\r\n")
# SASL success resolves the last cap
await net._handle(_msg(":server 903 nick :SASL authentication successful"))
cap_end_calls = [c for c in writer.write.call_args_list
if b"CAP END" in c.args[0]]
assert len(cap_end_calls) == 1
@pytest.mark.asyncio
async def test_cap_end_after_all_resolved(self) -> None:
"""CAP END sent only after all caps are resolved."""
net = _net()
writer = MagicMock()
writer.is_closing.return_value = False
writer.drain = AsyncMock()
net._writer = writer
net._caps_pending = 1 # only server-time, no SASL
await net._handle(_msg(":server CAP * ACK :server-time"))
cap_end_calls = [c for c in writer.write.call_args_list
if b"CAP END" in c.args[0]]
assert len(cap_end_calls) == 1
@pytest.mark.asyncio
async def test_server_time_property(self) -> None:
net = _net()
assert net.server_time is False
net._server_time = True
assert net.server_time is True

185
tests/test_notify.py Normal file
View File

@@ -0,0 +1,185 @@
"""Tests for push notification module."""
from __future__ import annotations
import time
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from bouncer.config import BouncerConfig, ProxyConfig
from bouncer.notify import Notifier
# -- helpers -----------------------------------------------------------------
def _cfg(**overrides: object) -> BouncerConfig:
defaults: dict[str, object] = {
"notify_url": "https://ntfy.sh/bouncer",
"notify_on_highlight": True,
"notify_on_privmsg": True,
"notify_cooldown": 60,
"notify_proxy": False,
}
defaults.update(overrides)
return BouncerConfig(**defaults) # type: ignore[arg-type]
def _proxy() -> ProxyConfig:
return ProxyConfig(host="127.0.0.1", port=1080)
def _notifier(**overrides: object) -> Notifier:
return Notifier(_cfg(**overrides), _proxy())
# -- enabled -----------------------------------------------------------------
class TestEnabled:
def test_enabled_with_url(self) -> None:
n = _notifier(notify_url="https://ntfy.sh/test")
assert n.enabled is True
def test_disabled_without_url(self) -> None:
n = _notifier(notify_url="")
assert n.enabled is False
# -- should_notify -----------------------------------------------------------
class TestShouldNotify:
def test_pm_triggers(self) -> None:
n = _notifier()
assert n.should_notify("sender", "mynick", "hello", "mynick") is True
def test_highlight_triggers(self) -> None:
n = _notifier()
assert n.should_notify("sender", "#channel", "hey mynick!", "mynick") is True
def test_highlight_case_insensitive(self) -> None:
n = _notifier()
assert n.should_notify("sender", "#channel", "hey MYNICK!", "mynick") is True
def test_normal_channel_msg_does_not_trigger(self) -> None:
n = _notifier()
assert n.should_notify("sender", "#channel", "hello world", "mynick") is False
def test_disabled_does_not_trigger(self) -> None:
n = _notifier(notify_url="")
assert n.should_notify("sender", "mynick", "hello", "mynick") is False
def test_pm_disabled(self) -> None:
n = _notifier(notify_on_privmsg=False)
assert n.should_notify("sender", "mynick", "hello", "mynick") is False
def test_highlight_disabled(self) -> None:
n = _notifier(notify_on_highlight=False)
assert n.should_notify("sender", "#channel", "hey mynick!", "mynick") is False
def test_cooldown_respected(self) -> None:
n = _notifier(notify_cooldown=60)
n._last_sent = time.monotonic() # just sent
assert n.should_notify("sender", "mynick", "hello", "mynick") is False
def test_cooldown_expired(self) -> None:
n = _notifier(notify_cooldown=60)
n._last_sent = time.monotonic() - 120 # expired
assert n.should_notify("sender", "mynick", "hello", "mynick") is True
def test_channel_prefixes(self) -> None:
"""Targets starting with #, &, +, ! are channels, not PMs."""
n = _notifier()
for prefix in ("#", "&", "+", "!"):
target = f"{prefix}channel"
assert n.should_notify("sender", target, "hello", "mynick") is False
# -- _is_ntfy ---------------------------------------------------------------
class TestIsNtfy:
def test_ntfy_sh(self) -> None:
n = _notifier(notify_url="https://ntfy.sh/mytopic")
assert n._is_ntfy() is True
def test_self_hosted_ntfy(self) -> None:
n = _notifier(notify_url="https://ntfy.example.com/mytopic")
assert n._is_ntfy() is True
def test_generic_webhook(self) -> None:
n = _notifier(notify_url="https://hooks.example.com/webhook")
assert n._is_ntfy() is False
# -- send --------------------------------------------------------------------
class TestSend:
@pytest.mark.asyncio
async def test_ntfy_sends_post(self) -> None:
n = _notifier(notify_url="https://ntfy.sh/bouncer")
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
mock_session = AsyncMock()
mock_session.post = MagicMock(return_value=mock_resp)
mock_session.__aenter__ = AsyncMock(return_value=mock_session)
mock_session.__aexit__ = AsyncMock(return_value=False)
with patch("bouncer.notify.aiohttp.ClientSession", return_value=mock_session):
await n.send("libera", "user", "#test", "hello world")
mock_session.post.assert_called_once()
call_kwargs = mock_session.post.call_args
assert call_kwargs[1]["data"] == b"hello world"
assert "Title" in call_kwargs[1]["headers"]
assert n._last_sent > 0
@pytest.mark.asyncio
async def test_webhook_sends_json(self) -> None:
n = _notifier(notify_url="https://hooks.example.com/webhook")
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
mock_session = AsyncMock()
mock_session.post = MagicMock(return_value=mock_resp)
mock_session.__aenter__ = AsyncMock(return_value=mock_session)
mock_session.__aexit__ = AsyncMock(return_value=False)
with patch("bouncer.notify.aiohttp.ClientSession", return_value=mock_session):
await n.send("libera", "user", "#test", "hello world")
call_kwargs = mock_session.post.call_args
assert call_kwargs[1]["json"]["network"] == "libera"
assert call_kwargs[1]["json"]["sender"] == "user"
@pytest.mark.asyncio
async def test_proxy_connector_used(self) -> None:
n = _notifier(notify_url="https://ntfy.sh/test", notify_proxy=True)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
mock_session = AsyncMock()
mock_session.post = MagicMock(return_value=mock_resp)
mock_session.__aenter__ = AsyncMock(return_value=mock_session)
mock_session.__aexit__ = AsyncMock(return_value=False)
with patch("bouncer.notify.aiohttp.ClientSession", return_value=mock_session):
with patch("bouncer.notify.Notifier._send_ntfy", new_callable=AsyncMock):
with patch("aiohttp_socks.ProxyConnector.from_url") as mock_proxy:
mock_proxy.return_value = MagicMock()
await n.send("libera", "user", "#test", "hello")
mock_proxy.assert_called_once_with("socks5://127.0.0.1:1080")
@pytest.mark.asyncio
async def test_send_error_does_not_raise(self) -> None:
"""Send errors are logged, not propagated."""
n = _notifier(notify_url="https://ntfy.sh/test")
with patch("bouncer.notify.aiohttp.ClientSession", side_effect=Exception("boom")):
await n.send("libera", "user", "#test", "hello") # should not raise

932
tests/test_router.py Normal file
View File

@@ -0,0 +1,932 @@
"""Tests for message router."""
from __future__ import annotations
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from bouncer.config import BouncerConfig, Config, NetworkConfig, ProxyConfig
from bouncer.irc import IRCMessage
from bouncer.router import BACKLOG_COMMANDS, Router, _suppress
# -- helpers -----------------------------------------------------------------
def _net_cfg(name: str = "libera", host: str = "irc.libera.chat",
port: int = 6697, tls: bool = True,
proxy_host: str | None = None,
proxy_port: int | None = None) -> NetworkConfig:
return NetworkConfig(
name=name, host=host, port=port, tls=tls,
proxy_host=proxy_host, proxy_port=proxy_port,
)
def _config(*net_cfgs: NetworkConfig) -> Config:
nets = {n.name: n for n in net_cfgs} if net_cfgs else {
"libera": _net_cfg("libera"),
}
return Config(
bouncer=BouncerConfig(),
proxy=ProxyConfig(host="127.0.0.1", port=1080),
networks=nets,
)
def _backlog() -> AsyncMock:
bl = AsyncMock()
bl.get_last_seen = AsyncMock(return_value=0)
bl.replay = AsyncMock(return_value=[])
bl.mark_seen = AsyncMock()
return bl
def _mock_network(name: str = "libera", nick: str = "botnick",
connected: bool = True, channels: set[str] | None = None,
topics: dict[str, str] | None = None,
names: dict[str, set[str]] | None = None) -> MagicMock:
net = MagicMock()
net.cfg.name = name
net.nick = nick
net.connected = connected
net.channels = channels or set()
net.topics = topics or {}
net.names = names or {}
net.send = AsyncMock()
net.stop = AsyncMock()
net.start = AsyncMock()
return net
def _mock_client(nick: str = "testuser") -> MagicMock:
client = MagicMock()
client.nick = nick
client.write = MagicMock()
return client
def _msg(command: str, params: list[str] | None = None,
prefix: str | None = None, tags: dict | None = None) -> IRCMessage:
return IRCMessage(
command=command,
params=params or [],
prefix=prefix,
tags=tags or {},
)
# -- _suppress ---------------------------------------------------------------
class TestSuppress:
def test_suppresses_welcome_numerics(self) -> None:
for num in ("001", "002", "003", "004", "005"):
assert _suppress(_msg(num, ["nick", "text"])) is True
def test_suppresses_motd(self) -> None:
for num in ("375", "372", "376", "422"):
assert _suppress(_msg(num, ["nick", "text"])) is True
def test_suppresses_lusers(self) -> None:
for num in ("250", "251", "252", "253", "254", "255", "265", "266"):
assert _suppress(_msg(num, ["nick", "text"])) is True
def test_suppresses_uid_and_visiblehost(self) -> None:
assert _suppress(_msg("042", ["nick", "UID"])) is True
assert _suppress(_msg("396", ["nick", "host"])) is True
def test_suppresses_nick_in_use(self) -> None:
assert _suppress(_msg("433", ["*", "nick", "in use"])) is True
def test_suppresses_server_notice(self) -> None:
msg = _msg("NOTICE", ["nick", "server message"], prefix="server.example.com")
assert _suppress(msg) is True
def test_passes_user_notice(self) -> None:
msg = _msg("NOTICE", ["nick", "hello"], prefix="user!ident@host")
assert _suppress(msg) is False
def test_suppresses_connection_notice_star(self) -> None:
msg = _msg("NOTICE", ["*", "Looking up your hostname..."])
assert _suppress(msg) is True
def test_suppresses_connection_notice_auth(self) -> None:
msg = _msg("NOTICE", ["AUTH", "*** Checking Ident"])
assert _suppress(msg) is True
def test_suppresses_ctcp_reply_in_notice(self) -> None:
msg = _msg("NOTICE", ["nick", "\x01VERSION mIRC\x01"], prefix="user!i@h")
assert _suppress(msg) is True
def test_suppresses_ctcp_in_privmsg(self) -> None:
msg = _msg("PRIVMSG", ["nick", "\x01VERSION\x01"], prefix="user!i@h")
assert _suppress(msg) is True
def test_passes_action_in_privmsg(self) -> None:
msg = _msg("PRIVMSG", ["#ch", "\x01ACTION waves\x01"], prefix="user!i@h")
assert _suppress(msg) is False
def test_suppresses_user_mode(self) -> None:
msg = _msg("MODE", ["nick", "+i"])
assert _suppress(msg) is True
def test_passes_channel_mode(self) -> None:
msg = _msg("MODE", ["#channel", "+o", "nick"])
assert _suppress(msg) is False
def test_passes_normal_privmsg(self) -> None:
msg = _msg("PRIVMSG", ["#test", "hello"], prefix="user!i@h")
assert _suppress(msg) is False
def test_passes_join(self) -> None:
msg = _msg("JOIN", ["#test"], prefix="user!i@h")
assert _suppress(msg) is False
def test_passes_part(self) -> None:
msg = _msg("PART", ["#test"], prefix="user!i@h")
assert _suppress(msg) is False
def test_passes_kick(self) -> None:
msg = _msg("KICK", ["#test", "nick", "reason"], prefix="op!i@h")
assert _suppress(msg) is False
def test_passes_topic(self) -> None:
msg = _msg("TOPIC", ["#test", "new topic"], prefix="user!i@h")
assert _suppress(msg) is False
# -- Router._proxy_for ------------------------------------------------------
class TestProxyFor:
def test_default_proxy(self) -> None:
cfg = _config()
router = Router(cfg, _backlog())
proxy = router._proxy_for(_net_cfg())
assert proxy.host == "127.0.0.1"
assert proxy.port == 1080
def test_per_network_proxy_override(self) -> None:
cfg = _config()
router = Router(cfg, _backlog())
net = _net_cfg(proxy_host="10.0.0.1", proxy_port=9050)
proxy = router._proxy_for(net)
assert proxy.host == "10.0.0.1"
assert proxy.port == 9050
def test_per_network_proxy_inherits_port(self) -> None:
cfg = _config()
router = Router(cfg, _backlog())
net = _net_cfg(proxy_host="10.0.0.1")
proxy = router._proxy_for(net)
assert proxy.host == "10.0.0.1"
assert proxy.port == 1080 # from global config
# -- Router init and network management --------------------------------------
class TestNetworkManagement:
def test_network_names_empty(self) -> None:
cfg = _config()
router = Router(cfg, _backlog())
assert router.network_names() == []
def test_get_network_none(self) -> None:
cfg = _config()
router = Router(cfg, _backlog())
assert router.get_network("nonexistent") is None
def test_get_network_found(self) -> None:
cfg = _config()
router = Router(cfg, _backlog())
net = _mock_network("libera")
router.networks["libera"] = net
assert router.get_network("libera") is net
def test_network_names(self) -> None:
cfg = _config()
router = Router(cfg, _backlog())
router.networks["libera"] = _mock_network("libera")
router.networks["oftc"] = _mock_network("oftc")
assert sorted(router.network_names()) == ["libera", "oftc"]
def test_get_own_nicks(self) -> None:
cfg = _config()
router = Router(cfg, _backlog())
router.networks["libera"] = _mock_network("libera", nick="lnick")
router.networks["oftc"] = _mock_network("oftc", nick="onick")
nicks = router.get_own_nicks()
assert nicks == {"libera": "lnick", "oftc": "onick"}
@pytest.mark.asyncio
async def test_add_network(self) -> None:
cfg = _config()
router = Router(cfg, _backlog())
net_cfg = _net_cfg("hackint", host="irc.hackint.org")
with patch("bouncer.router.Network") as MockNet:
mock_instance = MagicMock()
mock_instance.start = AsyncMock()
MockNet.return_value = mock_instance
result = await router.add_network(net_cfg)
assert "hackint" in router.networks
assert result is mock_instance
@pytest.mark.asyncio
async def test_remove_network(self) -> None:
cfg = _config()
router = Router(cfg, _backlog())
net = _mock_network("libera")
router.networks["libera"] = net
result = await router.remove_network("libera")
assert result is True
assert "libera" not in router.networks
net.stop.assert_awaited_once()
@pytest.mark.asyncio
async def test_remove_network_not_found(self) -> None:
cfg = _config()
router = Router(cfg, _backlog())
result = await router.remove_network("nonexistent")
assert result is False
# -- Client attach/detach ---------------------------------------------------
class TestClientAttachDetach:
@pytest.mark.asyncio
async def test_attach_adds_client(self) -> None:
router = Router(_config(), _backlog())
client = _mock_client()
await router.attach_all(client)
assert client in router.clients
@pytest.mark.asyncio
async def test_detach_removes_client(self) -> None:
router = Router(_config(), _backlog())
client = _mock_client()
await router.attach_all(client)
await router.detach_all(client)
assert client not in router.clients
@pytest.mark.asyncio
async def test_detach_missing_client_no_error(self) -> None:
router = Router(_config(), _backlog())
client = _mock_client()
await router.detach_all(client) # should not raise
@pytest.mark.asyncio
async def test_multiple_clients(self) -> None:
router = Router(_config(), _backlog())
c1 = _mock_client("user1")
c2 = _mock_client("user2")
await router.attach_all(c1)
await router.attach_all(c2)
assert len(router.clients) == 2
await router.detach_all(c1)
assert len(router.clients) == 1
assert c2 in router.clients
# -- stop_networks -----------------------------------------------------------
class TestStopNetworks:
@pytest.mark.asyncio
async def test_stops_all(self) -> None:
router = Router(_config(), _backlog())
n1 = _mock_network("libera")
n2 = _mock_network("oftc")
router.networks = {"libera": n1, "oftc": n2}
await router.stop_networks()
n1.stop.assert_awaited_once()
n2.stop.assert_awaited_once()
# -- route_client_message (outbound: client -> network) ----------------------
class TestRouteClientMessage:
@pytest.mark.asyncio
async def test_privmsg_to_channel(self) -> None:
router = Router(_config(), _backlog())
net = _mock_network("libera")
router.networks["libera"] = net
msg = _msg("PRIVMSG", ["#test/libera", "hello"])
await router.route_client_message(msg)
net.send.assert_awaited_once()
sent = net.send.call_args[0][0]
assert sent.command == "PRIVMSG"
assert sent.params == ["#test", "hello"]
@pytest.mark.asyncio
async def test_privmsg_to_nick(self) -> None:
router = Router(_config(), _backlog())
net = _mock_network("libera")
router.networks["libera"] = net
msg = _msg("PRIVMSG", ["user123/libera", "hi there"])
await router.route_client_message(msg)
sent = net.send.call_args[0][0]
assert sent.params == ["user123", "hi there"]
@pytest.mark.asyncio
async def test_no_namespace_dropped(self) -> None:
router = Router(_config(), _backlog())
net = _mock_network("libera")
router.networks["libera"] = net
msg = _msg("PRIVMSG", ["#test", "hello"])
await router.route_client_message(msg)
net.send.assert_not_awaited()
@pytest.mark.asyncio
async def test_unknown_network_dropped(self) -> None:
router = Router(_config(), _backlog())
net = _mock_network("libera")
router.networks["libera"] = net
msg = _msg("PRIVMSG", ["#test/fakenet", "hello"])
await router.route_client_message(msg)
net.send.assert_not_awaited()
@pytest.mark.asyncio
async def test_disconnected_network_dropped(self) -> None:
router = Router(_config(), _backlog())
net = _mock_network("libera", connected=False)
router.networks["libera"] = net
msg = _msg("PRIVMSG", ["#test/libera", "hello"])
await router.route_client_message(msg)
net.send.assert_not_awaited()
@pytest.mark.asyncio
async def test_empty_params_ignored(self) -> None:
router = Router(_config(), _backlog())
msg = _msg("PRIVMSG")
await router.route_client_message(msg) # should not raise
@pytest.mark.asyncio
async def test_join_single_channel(self) -> None:
router = Router(_config(), _backlog())
net = _mock_network("libera")
router.networks["libera"] = net
msg = _msg("JOIN", ["#dev/libera"])
await router.route_client_message(msg)
sent = net.send.call_args[0][0]
assert sent.command == "JOIN"
assert sent.params == ["#dev"]
@pytest.mark.asyncio
async def test_join_comma_separated_same_network(self) -> None:
router = Router(_config(), _backlog())
net = _mock_network("libera")
router.networks["libera"] = net
msg = _msg("JOIN", ["#a/libera,#b/libera"])
await router.route_client_message(msg)
sent = net.send.call_args[0][0]
assert sent.params[0] == "#a,#b"
@pytest.mark.asyncio
async def test_join_comma_separated_multi_network(self) -> None:
router = Router(_config(), _backlog())
n1 = _mock_network("libera")
n2 = _mock_network("oftc")
router.networks = {"libera": n1, "oftc": n2}
msg = _msg("JOIN", ["#a/libera,#b/oftc"])
await router.route_client_message(msg)
assert n1.send.await_count == 1
assert n2.send.await_count == 1
s1 = n1.send.call_args[0][0]
s2 = n2.send.call_args[0][0]
assert s1.params[0] == "#a"
assert s2.params[0] == "#b"
@pytest.mark.asyncio
async def test_part(self) -> None:
router = Router(_config(), _backlog())
net = _mock_network("libera")
router.networks["libera"] = net
msg = _msg("PART", ["#dev/libera", "leaving"])
await router.route_client_message(msg)
sent = net.send.call_args[0][0]
assert sent.command == "PART"
assert sent.params == ["#dev", "leaving"]
@pytest.mark.asyncio
async def test_kick(self) -> None:
router = Router(_config(), _backlog())
net = _mock_network("libera")
router.networks["libera"] = net
msg = _msg("KICK", ["#test/libera", "baduser/libera", "bye"])
await router.route_client_message(msg)
sent = net.send.call_args[0][0]
assert sent.command == "KICK"
assert sent.params == ["#test", "baduser", "bye"]
@pytest.mark.asyncio
async def test_kick_no_namespace_dropped(self) -> None:
router = Router(_config(), _backlog())
net = _mock_network("libera")
router.networks["libera"] = net
msg = _msg("KICK", ["#test", "baduser"])
await router.route_client_message(msg)
net.send.assert_not_awaited()
@pytest.mark.asyncio
async def test_invite(self) -> None:
router = Router(_config(), _backlog())
net = _mock_network("libera")
router.networks["libera"] = net
msg = _msg("INVITE", ["user/libera", "#test/libera"])
await router.route_client_message(msg)
sent = net.send.call_args[0][0]
assert sent.command == "INVITE"
assert sent.params == ["user", "#test"]
@pytest.mark.asyncio
async def test_invite_network_from_either_param(self) -> None:
router = Router(_config(), _backlog())
net = _mock_network("libera")
router.networks["libera"] = net
# Network suffix only on the nick, not the channel
msg = _msg("INVITE", ["user/libera", "#test"])
await router.route_client_message(msg)
net.send.assert_awaited_once()
@pytest.mark.asyncio
async def test_mode_channel(self) -> None:
router = Router(_config(), _backlog())
net = _mock_network("libera")
router.networks["libera"] = net
msg = _msg("MODE", ["#test/libera", "+o", "nick"])
await router.route_client_message(msg)
sent = net.send.call_args[0][0]
assert sent.params == ["#test", "+o", "nick"]
@pytest.mark.asyncio
async def test_who(self) -> None:
router = Router(_config(), _backlog())
net = _mock_network("libera")
router.networks["libera"] = net
msg = _msg("WHO", ["#test/libera"])
await router.route_client_message(msg)
sent = net.send.call_args[0][0]
assert sent.params == ["#test"]
@pytest.mark.asyncio
async def test_notice(self) -> None:
router = Router(_config(), _backlog())
net = _mock_network("libera")
router.networks["libera"] = net
msg = _msg("NOTICE", ["user/libera", "you've been warned"])
await router.route_client_message(msg)
sent = net.send.call_args[0][0]
assert sent.params == ["user", "you've been warned"]
@pytest.mark.asyncio
async def test_topic(self) -> None:
router = Router(_config(), _backlog())
net = _mock_network("libera")
router.networks["libera"] = net
msg = _msg("TOPIC", ["#test/libera", "new topic"])
await router.route_client_message(msg)
sent = net.send.call_args[0][0]
assert sent.params == ["#test", "new topic"]
@pytest.mark.asyncio
async def test_preserves_tags(self) -> None:
router = Router(_config(), _backlog())
net = _mock_network("libera")
router.networks["libera"] = net
msg = _msg("PRIVMSG", ["#test/libera", "hi"],
tags={"label": "abc"})
await router.route_client_message(msg)
sent = net.send.call_args[0][0]
assert sent.tags == {"label": "abc"}
@pytest.mark.asyncio
async def test_preserves_prefix(self) -> None:
router = Router(_config(), _backlog())
net = _mock_network("libera")
router.networks["libera"] = net
msg = _msg("PRIVMSG", ["#test/libera", "hi"], prefix="me!u@h")
await router.route_client_message(msg)
sent = net.send.call_args[0][0]
assert sent.prefix == "me!u@h"
# -- _dispatch (inbound: network -> clients) ---------------------------------
class TestDispatch:
@pytest.mark.asyncio
async def test_delivers_to_all_clients(self) -> None:
router = Router(_config(), _backlog())
net = _mock_network("libera", nick="bot")
router.networks["libera"] = net
c1 = _mock_client("user1")
c2 = _mock_client("user2")
router.clients = [c1, c2]
msg = _msg("PRIVMSG", ["#test", "hello"], prefix="sender!u@h")
await router._dispatch("libera", msg)
assert c1.write.call_count == 1
assert c2.write.call_count == 1
@pytest.mark.asyncio
async def test_suppressed_not_delivered(self) -> None:
router = Router(_config(), _backlog())
router.networks["libera"] = _mock_network("libera")
client = _mock_client()
router.clients = [client]
msg = _msg("001", ["nick", "Welcome"])
await router._dispatch("libera", msg)
client.write.assert_not_called()
@pytest.mark.asyncio
async def test_namespaces_channel(self) -> None:
router = Router(_config(), _backlog())
router.networks["libera"] = _mock_network("libera", nick="bot")
client = _mock_client("me")
router.clients = [client]
msg = _msg("PRIVMSG", ["#test", "hello"], prefix="user!i@h")
await router._dispatch("libera", msg)
written = client.write.call_args[0][0]
assert b"#test/libera" in written
@pytest.mark.asyncio
async def test_namespaces_prefix(self) -> None:
router = Router(_config(), _backlog())
router.networks["libera"] = _mock_network("libera", nick="bot")
client = _mock_client("me")
router.clients = [client]
msg = _msg("PRIVMSG", ["#test", "hello"], prefix="sender!i@h")
await router._dispatch("libera", msg)
written = client.write.call_args[0][0]
assert b"sender/libera" in written
@pytest.mark.asyncio
async def test_own_nick_rewritten_to_client_nick(self) -> None:
router = Router(_config(), _backlog())
router.networks["libera"] = _mock_network("libera", nick="bot")
client = _mock_client("clientnick")
router.clients = [client]
msg = _msg("PRIVMSG", ["#test", "hello"], prefix="bot!i@h")
await router._dispatch("libera", msg)
written = client.write.call_args[0][0]
assert b"clientnick" in written
assert b"bot/libera" not in written
@pytest.mark.asyncio
async def test_client_write_error_handled(self) -> None:
router = Router(_config(), _backlog())
router.networks["libera"] = _mock_network("libera", nick="bot")
bad_client = _mock_client()
bad_client.write.side_effect = ConnectionResetError
good_client = _mock_client()
router.clients = [bad_client, good_client]
msg = _msg("PRIVMSG", ["#test", "hello"], prefix="user!i@h")
await router._dispatch("libera", msg)
# Bad client raised, but good client still received
good_client.write.assert_called_once()
@pytest.mark.asyncio
async def test_no_clients_no_error(self) -> None:
router = Router(_config(), _backlog())
router.networks["libera"] = _mock_network("libera")
router.clients = []
msg = _msg("PRIVMSG", ["#test", "hello"], prefix="user!i@h")
await router._dispatch("libera", msg) # should not raise
# -- _on_network_status ------------------------------------------------------
class TestOnNetworkStatus:
def test_broadcasts_to_all_clients(self) -> None:
router = Router(_config(), _backlog())
c1 = _mock_client()
c2 = _mock_client()
router.clients = [c1, c2]
router._on_network_status("libera", "connection stable")
assert c1.write.call_count == 1
assert c2.write.call_count == 1
written = c1.write.call_args[0][0]
assert b"[libera] connection stable" in written
assert b"bouncer" in written # prefix
def test_client_error_does_not_propagate(self) -> None:
router = Router(_config(), _backlog())
bad = _mock_client()
bad.write.side_effect = ConnectionResetError
good = _mock_client()
router.clients = [bad, good]
router._on_network_status("libera", "test")
good.write.assert_called_once()
# -- _on_network_message (sync -> async bridge) -----------------------------
class TestOnNetworkMessage:
@pytest.mark.asyncio
async def test_creates_dispatch_task(self) -> None:
router = Router(_config(), _backlog())
router.networks["libera"] = _mock_network("libera")
with patch.object(router, "_dispatch", new_callable=AsyncMock) as mock_disp:
msg = _msg("PRIVMSG", ["#test", "hi"], prefix="u!i@h")
router._on_network_message("libera", msg)
# Let the task run
await asyncio.sleep(0)
mock_disp.assert_awaited_once_with("libera", msg)
# -- _replay_backlog ---------------------------------------------------------
class TestReplayBacklog:
@pytest.mark.asyncio
async def test_empty_backlog(self) -> None:
bl = _backlog()
router = Router(_config(), bl)
router.networks["libera"] = _mock_network("libera", nick="bot")
client = _mock_client()
await router._replay_backlog(client, "libera")
client.write.assert_not_called()
@pytest.mark.asyncio
async def test_replays_messages(self) -> None:
bl = _backlog()
entry1 = MagicMock()
entry1.id = 1
entry1.command = "PRIVMSG"
entry1.target = "#test"
entry1.content = "hello"
entry1.sender = "user!i@h"
entry2 = MagicMock()
entry2.id = 2
entry2.command = "PRIVMSG"
entry2.target = "#test"
entry2.content = "world"
entry2.sender = "other!i@h"
bl.replay.return_value = [entry1, entry2]
router = Router(_config(), bl)
router.networks["libera"] = _mock_network("libera", nick="bot")
client = _mock_client("me")
await router._replay_backlog(client, "libera")
assert client.write.call_count == 2
# Verify namespaced
first = client.write.call_args_list[0][0][0]
assert b"#test/libera" in first
# Should mark last seen
bl.mark_seen.assert_awaited_once_with("libera", 2)
@pytest.mark.asyncio
async def test_replays_since_last_seen(self) -> None:
bl = _backlog()
bl.get_last_seen.return_value = 42
router = Router(_config(), bl)
router.networks["libera"] = _mock_network("libera", nick="bot")
client = _mock_client()
await router._replay_backlog(client, "libera")
bl.replay.assert_awaited_once_with("libera", since_id=42)
@pytest.mark.asyncio
async def test_suppressed_skipped_during_replay(self) -> None:
bl = _backlog()
entry = MagicMock()
entry.id = 1
entry.command = "NOTICE"
entry.target = "nick"
entry.content = "\x01VERSION mIRC\x01"
entry.sender = "server.example.com" # no '!' -> server notice
bl.replay.return_value = [entry]
router = Router(_config(), bl)
router.networks["libera"] = _mock_network("libera", nick="bot")
client = _mock_client()
await router._replay_backlog(client, "libera")
client.write.assert_not_called()
# Still marks seen even if all suppressed
bl.mark_seen.assert_awaited_once_with("libera", 1)
@pytest.mark.asyncio
async def test_client_error_stops_replay(self) -> None:
bl = _backlog()
entry1 = MagicMock()
entry1.id = 1
entry1.command = "PRIVMSG"
entry1.target = "#test"
entry1.content = "msg1"
entry1.sender = "user!i@h"
entry2 = MagicMock()
entry2.id = 2
entry2.command = "PRIVMSG"
entry2.target = "#test"
entry2.content = "msg2"
entry2.sender = "user!i@h"
bl.replay.return_value = [entry1, entry2]
router = Router(_config(), bl)
router.networks["libera"] = _mock_network("libera", nick="bot")
client = _mock_client()
client.write.side_effect = [ConnectionResetError, None]
await router._replay_backlog(client, "libera")
# Should have stopped after first error
assert client.write.call_count == 1
# -- BACKLOG_COMMANDS constant -----------------------------------------------
class TestBacklogCommands:
def test_expected_commands(self) -> None:
assert "PRIVMSG" in BACKLOG_COMMANDS
assert "NOTICE" in BACKLOG_COMMANDS
assert "TOPIC" in BACKLOG_COMMANDS
assert "KICK" in BACKLOG_COMMANDS
assert "MODE" in BACKLOG_COMMANDS
def test_join_not_in_backlog(self) -> None:
assert "JOIN" not in BACKLOG_COMMANDS
assert "PART" not in BACKLOG_COMMANDS
assert "QUIT" not in BACKLOG_COMMANDS
# -- server-time tag injection -----------------------------------------------
class TestServerTimeDispatch:
@pytest.mark.asyncio
async def test_injects_time_tag_when_missing(self) -> None:
router = Router(_config(), _backlog())
router.networks["libera"] = _mock_network("libera", nick="bot")
client = _mock_client("me")
router.clients = [client]
msg = _msg("PRIVMSG", ["#test", "hello"], prefix="user!i@h")
await router._dispatch("libera", msg)
assert "time" in msg.tags
# Verify ISO8601 format
assert msg.tags["time"].endswith("Z")
assert "T" in msg.tags["time"]
@pytest.mark.asyncio
async def test_preserves_existing_time_tag(self) -> None:
router = Router(_config(), _backlog())
router.networks["libera"] = _mock_network("libera", nick="bot")
client = _mock_client("me")
router.clients = [client]
original_time = "2025-01-15T12:00:00.000000Z"
msg = _msg("PRIVMSG", ["#test", "hello"], prefix="user!i@h",
tags={"time": original_time})
await router._dispatch("libera", msg)
assert msg.tags["time"] == original_time
class TestServerTimeReplay:
@pytest.mark.asyncio
async def test_replay_injects_timestamp(self) -> None:
bl = _backlog()
entry = MagicMock()
entry.id = 1
entry.command = "PRIVMSG"
entry.target = "#test"
entry.content = "hello"
entry.sender = "user!i@h"
entry.timestamp = 1705320000.0 # 2024-01-15T12:00:00Z
bl.replay.return_value = [entry]
router = Router(_config(), bl)
router.networks["libera"] = _mock_network("libera", nick="bot")
client = _mock_client("me")
await router._replay_backlog(client, "libera")
written = client.write.call_args[0][0]
# The time tag should be in the wire format
assert b"time=" in written
# -- push notifications ------------------------------------------------------
class TestNotifications:
@pytest.mark.asyncio
async def test_notification_triggered_on_pm_no_clients(self) -> None:
cfg = _config()
cfg.bouncer.notify_url = "https://ntfy.sh/test"
router = Router(cfg, _backlog())
net = _mock_network("libera", nick="bot")
router.networks["libera"] = net
router.clients = [] # no clients
with patch.object(router._notifier, "should_notify", return_value=True):
with patch.object(router._notifier, "send", new_callable=AsyncMock) as mock_send:
msg = _msg("PRIVMSG", ["bot", "hello bot"], prefix="user!i@h")
await router._dispatch("libera", msg)
# Let fire-and-forget task run
await asyncio.sleep(0)
mock_send.assert_awaited_once_with("libera", "user", "bot", "hello bot")
@pytest.mark.asyncio
async def test_notification_not_triggered_with_clients(self) -> None:
cfg = _config()
cfg.bouncer.notify_url = "https://ntfy.sh/test"
router = Router(cfg, _backlog())
net = _mock_network("libera", nick="bot")
router.networks["libera"] = net
client = _mock_client()
router.clients = [client]
with patch.object(router._notifier, "send", new_callable=AsyncMock) as mock_send:
msg = _msg("PRIVMSG", ["bot", "hello bot"], prefix="user!i@h")
await router._dispatch("libera", msg)
await asyncio.sleep(0)
mock_send.assert_not_awaited()
@pytest.mark.asyncio
async def test_notification_respects_should_notify(self) -> None:
cfg = _config()
cfg.bouncer.notify_url = "https://ntfy.sh/test"
router = Router(cfg, _backlog())
net = _mock_network("libera", nick="bot")
router.networks["libera"] = net
router.clients = []
with patch.object(router._notifier, "should_notify", return_value=False):
with patch.object(router._notifier, "send", new_callable=AsyncMock) as mock_send:
msg = _msg("PRIVMSG", ["#channel", "random msg"], prefix="user!i@h")
await router._dispatch("libera", msg)
await asyncio.sleep(0)
mock_send.assert_not_awaited()
@pytest.mark.asyncio
async def test_notification_disabled_skips(self) -> None:
cfg = _config()
cfg.bouncer.notify_url = "" # disabled
router = Router(cfg, _backlog())
net = _mock_network("libera", nick="bot")
router.networks["libera"] = net
router.clients = []
with patch.object(router._notifier, "send", new_callable=AsyncMock) as mock_send:
msg = _msg("PRIVMSG", ["bot", "hello"], prefix="user!i@h")
await router._dispatch("libera", msg)
await asyncio.sleep(0)
mock_send.assert_not_awaited()