Add a channel_keys dict to NetworkConfig for storing per-channel keys. The keys are used when rejoining a channel after a KICK, can be supplied with the AUTOJOIN "+#channel key" syntax, are accepted by ADDNETWORK through the channel_keys= parameter, and are propagated on REHASH. Extract rehash() as a reusable async function so it can be reused for SIGHUP. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1095 lines
41 KiB
Python
1095 lines
41 KiB
Python
"""Tests for bouncer control commands."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import time
|
|
from pathlib import Path
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from bouncer import commands
|
|
from bouncer.network import State
|
|
|
|
|
|
def _make_network(name: str, state: State, nick: str = "testnick",
                  host: str | None = None, channels: set[str] | None = None,
                  topics: dict[str, str] | None = None,
                  channel_keys: dict[str, str] | None = None) -> MagicMock:
    """Build a ``MagicMock`` that stands in for a Network.

    ``name`` fills ``cfg.name`` and is also used to derive ``cfg.host``
    (``irc.<name>.chat``).  ``host`` only sets ``visible_host``; it does not
    affect ``cfg.host``.  ``channels`` is used both for the live ``channels``
    attribute and, copied into a list, for ``cfg.channels``.  ``connected``
    and ``ready`` are derived from ``state``.  The coroutine-style methods
    (``start``, ``stop``, ``send``, ``send_raw``, ``_nickserv_register``) are
    ``AsyncMock`` instances so tests can await and inspect them.
    """
    mock_net = MagicMock()

    # Static configuration lives on the ``cfg`` child mock.
    cfg = mock_net.cfg
    cfg.name = name
    cfg.host = f"irc.{name}.chat"
    cfg.port = 6697
    cfg.tls = True
    cfg.channels = [] if not channels else list(channels)
    cfg.channel_keys = {} if not channel_keys else dict(channel_keys)
    cfg.nick = nick
    cfg.password = None

    # Live connection attributes.
    mock_net.state = state
    mock_net.nick = nick
    mock_net.visible_host = host
    mock_net.channels = channels or set()
    mock_net.topics = topics or {}
    mock_net.names = {}
    mock_net._reconnect_attempt = 0
    is_offline = state in (State.DISCONNECTED, State.CONNECTING)
    mock_net.connected = not is_offline
    mock_net.ready = state == State.READY

    # Awaitable methods.
    for coro_name in ("start", "stop", "send", "send_raw", "_nickserv_register"):
        setattr(mock_net, coro_name, AsyncMock())

    return mock_net
|
|
|
|
|
|
def _make_router(*networks: MagicMock) -> MagicMock:
    """Build a ``MagicMock`` Router that knows about ``networks``.

    Networks are indexed by their ``cfg.name``.  ``get_network`` looks the
    name up in ``router.networks`` at call time and returns ``None`` for an
    unknown name.  ``backlog``, ``add_network`` and ``remove_network`` are
    ``AsyncMock`` objects; ``remove_network`` resolves to ``True``.
    """
    names = [net.cfg.name for net in networks]

    router = MagicMock()
    router.networks = dict(zip(names, networks))
    router.network_names.return_value = names

    def _lookup(name):
        # Late lookup so a test may swap ``router.networks`` afterwards.
        return router.networks.get(name)

    router.get_network = _lookup

    router.backlog = AsyncMock()
    router.add_network = AsyncMock()
    router.remove_network = AsyncMock(return_value=True)

    router.config = MagicMock()
    router.config.bouncer.cert_validity_days = 3650
    return router
|
|
|
|
|
|
def _make_client(nick: str = "testuser", *, connected_for: float = 120.0,
                 addr: tuple[str, int] = ("127.0.0.1", 54321)) -> MagicMock:
    """Build a ``MagicMock`` that stands in for a connected Client.

    Args:
        nick: Value stored on ``client.nick``.
        connected_for: How many seconds in the past ``_connected_at`` is
            placed.  Defaults to 120, the value previously hard-coded.
        addr: ``(host, port)`` pair stored on ``client._addr``.  Defaults to
            the previously hard-coded loopback address.

    Returns:
        The configured mock client.
    """
    client = MagicMock()
    client.nick = nick
    client._connected_at = time.time() - connected_for
    client._addr = addr
    return client
|
|
|
|
|
|
class TestHelp:
    """HELP output, including the fallback used for empty input."""

    @pytest.mark.asyncio
    async def test_help_lists_commands(self) -> None:
        """HELP answers with a ``[HELP]`` header and mentions known commands."""
        reply = await commands.dispatch("HELP", _make_router(), _make_client())
        assert reply[0] == "[HELP]"
        for keyword in ("STATUS", "UPTIME"):
            assert any(keyword in row for row in reply)

    @pytest.mark.asyncio
    async def test_empty_input_shows_help(self) -> None:
        """An empty command string is answered with the help text."""
        reply = await commands.dispatch("", _make_router(), _make_client())
        assert reply[0] == "[HELP]"
|
|
|
|
|
|
class TestStatus:
    """STATUS output for various network states."""

    @pytest.mark.asyncio
    async def test_status_no_networks(self) -> None:
        """Without networks, STATUS says none are configured."""
        reply = await commands.dispatch("STATUS", _make_router(), _make_client())
        assert reply[0] == "[STATUS]"
        assert "(no networks configured)" in reply[1]

    @pytest.mark.asyncio
    async def test_status_ready_network(self) -> None:
        """A ready network line shows state, nick and visible host."""
        libera = _make_network("libera", State.READY, nick="fabesune", host="user/fabesune")
        reply = await commands.dispatch("STATUS", _make_router(libera), _make_client())
        assert reply[0] == "[STATUS]"
        for fragment in ("ready", "fabesune", "user/fabesune"):
            assert fragment in reply[1]

    @pytest.mark.asyncio
    async def test_status_connecting_shows_attempt(self) -> None:
        """A connecting network line includes the reconnect attempt count."""
        hackint = _make_network("hackint", State.CONNECTING)
        hackint._reconnect_attempt = 3
        reply = await commands.dispatch("STATUS", _make_router(hackint), _make_client())
        for fragment in ("connecting", "attempt 3"):
            assert fragment in reply[1]

    @pytest.mark.asyncio
    async def test_status_case_insensitive(self) -> None:
        """The command name is matched regardless of case."""
        libera = _make_network("libera", State.READY, nick="testnick")
        reply = await commands.dispatch("status", _make_router(libera), _make_client())
        assert reply[0] == "[STATUS]"
|
|
|
|
|
|
class TestInfo:
    """INFO command: argument validation and per-network detail output."""

    @pytest.mark.asyncio
    async def test_info_missing_arg(self) -> None:
        """INFO without a network name prints usage."""
        reply = await commands.dispatch("INFO", _make_router(), _make_client())
        assert "Usage" in reply[0]

    @pytest.mark.asyncio
    async def test_info_unknown_network(self) -> None:
        """An unknown name is rejected and the known networks are listed."""
        known = _make_network("libera", State.READY)
        reply = await commands.dispatch("INFO fakenet", _make_router(known), _make_client())
        assert "Unknown network" in reply[0]
        assert "libera" in reply[1]

    @pytest.mark.asyncio
    async def test_info_valid_network(self) -> None:
        """INFO shows state, nick, channels and stored credential status."""
        libera = _make_network(
            "libera", State.READY, nick="fabesune",
            host="user/fabesune", channels={"#test", "#dev"},
        )
        router = _make_router(libera)
        stored_cred = (
            "libera", "fabesune", "test@mail.tm", "user/fabesune",
            1700000000.0, "verified", "",
        )
        router.backlog.list_nickserv_creds.return_value = [stored_cred]

        reply = await commands.dispatch("INFO libera", router, _make_client())

        assert reply[0] == "[INFO] libera"
        for fragment in ("ready", "fabesune"):
            assert any(fragment in row for row in reply)
        assert any("#dev" in row or "#test" in row for row in reply)
        assert any("verified" in row for row in reply)
|
|
|
|
|
|
class TestUptime:
    """UPTIME output, driven by ``commands.STARTUP_TIME``.

    The module-level ``STARTUP_TIME`` is overridden through the ``monkeypatch``
    fixture so the original value is restored after each test instead of
    leaking into whichever test runs next.
    """

    @pytest.mark.asyncio
    async def test_uptime(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """A start time 3661 seconds ago is rendered as 1h 1m 1s."""
        monkeypatch.setattr(commands, "STARTUP_TIME", time.time() - 3661)  # 1h 1m 1s
        router = _make_router()
        client = _make_client()
        lines = await commands.dispatch("UPTIME", router, client)
        assert lines[0].startswith("[UPTIME]")
        assert "1h" in lines[0]
        assert "1m" in lines[0]
        assert "1s" in lines[0]

    @pytest.mark.asyncio
    async def test_uptime_unknown(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """A start time of 0.0 is reported as unknown."""
        monkeypatch.setattr(commands, "STARTUP_TIME", 0.0)
        router = _make_router()
        client = _make_client()
        lines = await commands.dispatch("UPTIME", router, client)
        assert "unknown" in lines[0]
|
|
|
|
|
|
class TestNetworks:
    """NETWORKS listing."""

    @pytest.mark.asyncio
    async def test_networks_empty(self) -> None:
        """With no networks the listing contains a ``(none)`` placeholder."""
        reply = await commands.dispatch("NETWORKS", _make_router(), _make_client())
        assert reply[0] == "[NETWORKS]"
        assert "(none)" in reply[1]

    @pytest.mark.asyncio
    async def test_networks_lists_all(self) -> None:
        """Every network appears together with its state."""
        router = _make_router(
            _make_network("libera", State.READY),
            _make_network("oftc", State.CONNECTING),
        )
        reply = await commands.dispatch("NETWORKS", router, _make_client())
        assert reply[0] == "[NETWORKS]"
        body = reply[1:]
        for net_name, state_word in (("libera", "ready"), ("oftc", "connecting")):
            assert any(net_name in row and state_word in row for row in body)
|
|
|
|
|
|
class TestCreds:
    """CREDS listing of stored NickServ credentials."""

    @pytest.mark.asyncio
    async def test_creds_no_backlog(self) -> None:
        """Without a backlog the command reports it is not available."""
        router = _make_router()
        router.backlog = None
        reply = await commands.dispatch("CREDS", router, _make_client())
        assert "not available" in reply[0]

    @pytest.mark.asyncio
    async def test_creds_empty(self) -> None:
        """An empty credential list yields a 'no stored credentials' message."""
        router = _make_router()
        router.backlog.list_nickserv_creds.return_value = []
        reply = await commands.dispatch("CREDS", router, _make_client())
        assert "no stored credentials" in reply[0]

    @pytest.mark.asyncio
    async def test_creds_lists_entries(self) -> None:
        """Verified and pending entries are both shown, with the verify URL."""
        router = _make_router(_make_network("libera", State.READY))
        verified_row = (
            "libera", "fabesune", "test@mail.tm", "user/fabesune",
            1700000000.0, "verified", "",
        )
        pending_row = (
            "libera", "oldnick", "old@mail.tm", "old/host",
            1699000000.0, "pending", "https://example.com/verify/abc",
        )
        router.backlog.list_nickserv_creds.return_value = [verified_row, pending_row]

        reply = await commands.dispatch("CREDS libera", router, _make_client())

        assert reply[0] == "[CREDS]"
        expected_rows = (
            ("+", "fabesune", "verified"),
            ("~", "oldnick", "pending"),
        )
        for marker, nick, status in expected_rows:
            assert any(marker in row and nick in row and status in row for row in reply)
        assert any("verify: https://example.com/verify/abc" in row for row in reply)

    @pytest.mark.asyncio
    async def test_creds_unknown_network(self) -> None:
        """An unknown network name is rejected."""
        router = _make_router(_make_network("libera", State.READY))
        reply = await commands.dispatch("CREDS fakenet", router, _make_client())
        assert "Unknown network" in reply[0]
|
|
|
|
|
|
class TestConnect:
    """CONNECT command."""

    @pytest.mark.asyncio
    async def test_connect_missing_arg(self) -> None:
        """CONNECT without a network prints usage or asks for one."""
        reply = await commands.dispatch("CONNECT", _make_router(), _make_client())
        first = reply[0]
        assert "Usage" in first or "provide" in first

    @pytest.mark.asyncio
    async def test_connect_unknown_network(self) -> None:
        """An unknown network name is rejected."""
        reply = await commands.dispatch("CONNECT fakenet", _make_router(), _make_client())
        assert "Unknown network" in reply[0]

    @pytest.mark.asyncio
    async def test_connect_already_connected(self) -> None:
        """Connecting a READY network reports that it already is."""
        router = _make_router(_make_network("libera", State.READY))
        reply = await commands.dispatch("CONNECT libera", router, _make_client())
        assert "already" in reply[0]

    @pytest.mark.asyncio
    async def test_connect_disconnected(self) -> None:
        """Connecting a disconnected network reports that it is starting."""
        router = _make_router(_make_network("libera", State.DISCONNECTED))
        reply = await commands.dispatch("CONNECT libera", router, _make_client())
        for fragment in ("[CONNECT]", "starting"):
            assert fragment in reply[0]
|
|
|
|
|
|
class TestDisconnect:
    """DISCONNECT command."""

    @pytest.mark.asyncio
    async def test_disconnect_ready(self) -> None:
        """Disconnecting a READY network awaits its ``stop`` exactly once."""
        libera = _make_network("libera", State.READY)
        reply = await commands.dispatch("DISCONNECT libera", _make_router(libera), _make_client())
        assert "[DISCONNECT]" in reply[0]
        libera.stop.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_disconnect_already_disconnected(self) -> None:
        """A network that is already down is reported as such."""
        libera = _make_network("libera", State.DISCONNECTED)
        reply = await commands.dispatch("DISCONNECT libera", _make_router(libera), _make_client())
        assert "already disconnected" in reply[0]
|
|
|
|
|
|
class TestReconnect:
    """RECONNECT command."""

    @pytest.mark.asyncio
    async def test_reconnect(self) -> None:
        """Reconnecting a READY network awaits its ``stop`` exactly once."""
        libera = _make_network("libera", State.READY)
        reply = await commands.dispatch("RECONNECT libera", _make_router(libera), _make_client())
        assert "[RECONNECT]" in reply[0]
        libera.stop.assert_awaited_once()
|
|
|
|
|
|
class TestNick:
    """NICK command."""

    @pytest.mark.asyncio
    async def test_nick_missing_args(self) -> None:
        """NICK with no arguments prints usage."""
        reply = await commands.dispatch("NICK", _make_router(), _make_client())
        assert "Usage" in reply[0]

    @pytest.mark.asyncio
    async def test_nick_missing_nick(self) -> None:
        """NICK with a network but no new nick prints usage."""
        router = _make_router(_make_network("libera", State.READY))
        reply = await commands.dispatch("NICK libera", router, _make_client())
        assert "Usage" in reply[0]

    @pytest.mark.asyncio
    async def test_nick_change(self) -> None:
        """On a READY network the NICK line is sent through ``send_raw``."""
        libera = _make_network("libera", State.READY)
        reply = await commands.dispatch("NICK libera newnick", _make_router(libera), _make_client())
        assert "[NICK]" in reply[0]
        libera.send_raw.assert_awaited_once_with("NICK", "newnick")

    @pytest.mark.asyncio
    async def test_nick_not_connected(self) -> None:
        """On a disconnected network the command reports 'not connected'."""
        libera = _make_network("libera", State.DISCONNECTED)
        reply = await commands.dispatch("NICK libera newnick", _make_router(libera), _make_client())
        assert "not connected" in reply[0]
|
|
|
|
|
|
class TestRaw:
    """RAW command."""

    @pytest.mark.asyncio
    async def test_raw_missing_args(self) -> None:
        """RAW with no arguments prints usage."""
        reply = await commands.dispatch("RAW", _make_router(), _make_client())
        assert "Usage" in reply[0]

    @pytest.mark.asyncio
    async def test_raw_send(self) -> None:
        """On a READY network the raw line is passed to ``send`` once."""
        libera = _make_network("libera", State.READY)
        reply = await commands.dispatch(
            "RAW libera WHOIS testuser", _make_router(libera), _make_client(),
        )
        assert "[RAW]" in reply[0]
        libera.send.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_raw_not_connected(self) -> None:
        """On a disconnected network the command reports 'not connected'."""
        libera = _make_network("libera", State.DISCONNECTED)
        reply = await commands.dispatch(
            "RAW libera WHOIS testuser", _make_router(libera), _make_client(),
        )
        assert "not connected" in reply[0]
|
|
|
|
|
|
class TestChannels:
    """CHANNELS listing."""

    @pytest.mark.asyncio
    async def test_channels_all(self) -> None:
        """Without an argument, channels of the configured network are listed."""
        libera = _make_network("libera", State.READY, channels={"#test", "#dev"})
        reply = await commands.dispatch("CHANNELS", _make_router(libera), _make_client())
        assert reply[0] == "[CHANNELS]"
        for chan in ("#test", "#dev"):
            assert any(chan in row for row in reply)

    @pytest.mark.asyncio
    async def test_channels_specific_network(self) -> None:
        """Naming a network lists that network's channels."""
        libera = _make_network("libera", State.READY, channels={"#test"})
        reply = await commands.dispatch("CHANNELS libera", _make_router(libera), _make_client())
        assert reply[0] == "[CHANNELS]"
        assert any("#test" in row for row in reply)

    @pytest.mark.asyncio
    async def test_channels_unknown_network(self) -> None:
        """An unknown network name is rejected."""
        reply = await commands.dispatch("CHANNELS fakenet", _make_router(), _make_client())
        assert "Unknown network" in reply[0]

    @pytest.mark.asyncio
    async def test_channels_with_topics(self) -> None:
        """A known topic is included in the output."""
        libera = _make_network(
            "libera", State.READY, channels={"#test"},
            topics={"#test": "Welcome to test"},
        )
        reply = await commands.dispatch("CHANNELS libera", _make_router(libera), _make_client())
        assert any("Welcome to test" in row for row in reply)

    @pytest.mark.asyncio
    async def test_channels_empty(self) -> None:
        """A network without channels shows a ``(none)`` placeholder."""
        libera = _make_network("libera", State.READY)
        reply = await commands.dispatch("CHANNELS libera", _make_router(libera), _make_client())
        assert any("(none)" in row for row in reply)
|
|
|
|
|
|
class TestClients:
    """CLIENTS listing."""

    @pytest.mark.asyncio
    async def test_clients_empty(self) -> None:
        """With no attached clients the listing shows ``(none)``."""
        router = _make_router()
        router.clients = []
        reply = await commands.dispatch("CLIENTS", router, _make_client())
        assert reply[0] == "[CLIENTS]"
        assert "(none)" in reply[1]

    @pytest.mark.asyncio
    async def test_clients_lists_connected(self) -> None:
        """Each attached client's nick appears, along with 'connected'."""
        router = _make_router()
        router.clients = [_make_client("user1"), _make_client("user2")]
        reply = await commands.dispatch("CLIENTS", router, _make_client())
        assert reply[0] == "[CLIENTS]"
        for fragment in ("user1", "user2", "connected"):
            assert any(fragment in row for row in reply)
|
|
|
|
|
|
class TestBacklog:
    """BACKLOG statistics."""

    @pytest.mark.asyncio
    async def test_backlog_no_backlog(self) -> None:
        """Without a backlog the command reports it is not available."""
        router = _make_router()
        router.backlog = None
        reply = await commands.dispatch("BACKLOG", router, _make_client())
        assert "not available" in reply[0]

    @pytest.mark.asyncio
    async def test_backlog_stats(self) -> None:
        """Message counts get thousands separators and the DB size is in MB."""
        router = _make_router(_make_network("libera", State.READY))
        store = router.backlog
        store.stats.return_value = [("libera", 1500)]
        store.db_size.return_value = 2_097_152  # 2 MB
        reply = await commands.dispatch("BACKLOG", router, _make_client())
        assert reply[0] == "[BACKLOG]"
        for fragment in ("1,500", "2.0 MB"):
            assert any(fragment in row for row in reply)

    @pytest.mark.asyncio
    async def test_backlog_specific_network(self) -> None:
        """Naming a network shows that network's message count."""
        router = _make_router(_make_network("libera", State.READY))
        store = router.backlog
        store.stats.return_value = [("libera", 42)]
        store.db_size.return_value = 4096
        reply = await commands.dispatch("BACKLOG libera", router, _make_client())
        assert reply[0] == "[BACKLOG]"
        assert any("42" in row for row in reply)

    @pytest.mark.asyncio
    async def test_backlog_unknown_network(self) -> None:
        """An unknown network name is rejected."""
        reply = await commands.dispatch("BACKLOG fakenet", _make_router(), _make_client())
        assert "Unknown network" in reply[0]

    @pytest.mark.asyncio
    async def test_backlog_empty(self) -> None:
        """Empty statistics produce a 'no messages' line."""
        router = _make_router(_make_network("libera", State.READY))
        store = router.backlog
        store.stats.return_value = []
        store.db_size.return_value = 1024
        reply = await commands.dispatch("BACKLOG", router, _make_client())
        assert any("no messages" in row for row in reply)
|
|
|
|
|
|
class TestVersion:
    """VERSION command."""

    @pytest.mark.asyncio
    async def test_version(self) -> None:
        """The first line carries the tag, the product name and 'Python'."""
        reply = await commands.dispatch("VERSION", _make_router(), _make_client())
        banner = reply[0]
        for fragment in ("[VERSION]", "bouncer", "Python"):
            assert fragment in banner
|
|
|
|
|
|
class TestRehash:
    """REHASH command as reached through ``commands.dispatch``.

    ``commands.CONFIG_PATH`` is a module-level global.  Each test overrides it
    via the ``monkeypatch`` fixture so the previous value is restored on
    teardown and cannot influence tests that run afterwards.
    """

    @pytest.mark.asyncio
    async def test_rehash_no_config_path(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """With no config path set, REHASH reports that fact."""
        monkeypatch.setattr(commands, "CONFIG_PATH", None)
        router = _make_router()
        client = _make_client()
        lines = await commands.dispatch("REHASH", router, client)
        assert "config path not set" in lines[0]

    @pytest.mark.asyncio
    async def test_rehash_config_error(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """A config path that cannot be loaded yields a 'config error' line."""
        monkeypatch.setattr(commands, "CONFIG_PATH", Path("/nonexistent/config.toml"))
        router = _make_router()
        client = _make_client()
        lines = await commands.dispatch("REHASH", router, client)
        assert "config error" in lines[0]

    @pytest.mark.asyncio
    async def test_rehash_adds_and_removes(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """Networks missing from the new config are removed, new ones added."""
        from bouncer.config import BouncerConfig, Config, NetworkConfig, ProxyConfig

        old_net = _make_network("libera", State.READY)
        router = _make_router(old_net)
        router._notifier = MagicMock()
        router._farm = MagicMock()
        router._farm._cfg = BouncerConfig()
        router._farm.start = AsyncMock()
        router._farm.stop = AsyncMock()

        # The new config drops "libera" and introduces "oftc".
        new_cfg = Config(
            bouncer=BouncerConfig(),
            proxy=ProxyConfig(),
            networks={
                "oftc": NetworkConfig(name="oftc", host="irc.oftc.net", port=6697, tls=True),
            },
        )

        monkeypatch.setattr(commands, "CONFIG_PATH", Path("/tmp/test.toml"))
        # The loader is patched, so the path above is never read from disk.
        with patch("bouncer.config.load", return_value=new_cfg):
            client = _make_client()
            lines = await commands.dispatch("REHASH", router, client)

        assert lines[0] == "[REHASH]"
        assert any("removed: libera" in line for line in lines)
        assert any("added: oftc" in line for line in lines)
        router.remove_network.assert_awaited()
        router.add_network.assert_awaited()
|
|
|
|
|
|
class TestRehashFunction:
    """Tests that call ``bouncer.commands.rehash`` directly."""

    @staticmethod
    def _router_with_farm(net: MagicMock) -> MagicMock:
        """Return a mock router for ``net`` with notifier and farm stubs attached."""
        from bouncer.config import BouncerConfig

        router = _make_router(net)
        router._notifier = MagicMock()
        router._farm = MagicMock()
        router._farm._cfg = BouncerConfig()
        router._farm.start = AsyncMock()
        router._farm.stop = AsyncMock()
        return router

    @pytest.mark.asyncio
    async def test_rehash_function_directly(self) -> None:
        """``rehash`` reports removed and added networks."""
        from bouncer.commands import rehash
        from bouncer.config import BouncerConfig, Config, NetworkConfig, ProxyConfig

        old_net = _make_network("libera", State.READY)
        router = self._router_with_farm(old_net)

        # The new config drops "libera" and introduces "oftc".
        new_cfg = Config(
            bouncer=BouncerConfig(),
            proxy=ProxyConfig(),
            networks={
                "oftc": NetworkConfig(name="oftc", host="irc.oftc.net", port=6697, tls=True),
            },
        )

        # The loader is patched, so the path is never read from disk.
        with patch("bouncer.config.load", return_value=new_cfg):
            lines = await rehash(router, Path("/tmp/test.toml"))

        assert lines[0] == "[REHASH]"
        assert any("removed: libera" in line for line in lines)
        assert any("added: oftc" in line for line in lines)

    @pytest.mark.asyncio
    async def test_rehash_updates_bouncer_config(self) -> None:
        """``rehash`` stores the new config on the router and swaps the notifier."""
        from bouncer.commands import rehash
        from bouncer.config import BouncerConfig, Config, NetworkConfig, ProxyConfig

        net = _make_network("libera", State.READY)
        router = self._router_with_farm(net)
        # Keep a handle on the pre-rehash notifier so replacement can be checked.
        old_notifier = router._notifier

        new_cfg = Config(
            bouncer=BouncerConfig(notify_url="https://ntfy.sh/test"),
            proxy=ProxyConfig(),
            networks={
                "libera": NetworkConfig(name="libera", host="irc.libera.chat",
                                        port=6697, tls=True,
                                        channel_keys={"#secret": "key"}),
            },
        )

        with patch("bouncer.config.load", return_value=new_cfg):
            result = await rehash(router, Path("/tmp/test.toml"))

        assert result[0] == "[REHASH]"
        assert router.config == new_cfg
        # Notifier was replaced (new instance).  ``is not None`` alone cannot
        # fail because a MagicMock was installed beforehand, so identity is
        # compared against the pre-rehash object as well.
        # NOTE(review): assumes rehash() always builds a fresh notifier — confirm.
        assert router._notifier is not None
        assert router._notifier is not old_notifier

    @pytest.mark.asyncio
    async def test_rehash_propagates_channel_keys(self) -> None:
        """Channel keys from the reloaded config reach the existing network."""
        from bouncer.commands import rehash
        from bouncer.config import BouncerConfig, Config, NetworkConfig, ProxyConfig

        net = _make_network("libera", State.READY)
        # Connection settings match the new config below; only the channel
        # keys differ between old and new.
        net.cfg.host = "irc.libera.chat"
        net.cfg.port = 6697
        net.cfg.tls = True
        net.cfg.proxy_host = None
        net.cfg.proxy_port = None
        net.cfg.channel_keys = {}
        router = self._router_with_farm(net)

        new_cfg = Config(
            bouncer=BouncerConfig(),
            proxy=ProxyConfig(),
            networks={
                "libera": NetworkConfig(name="libera", host="irc.libera.chat",
                                        port=6697, tls=True,
                                        channel_keys={"#secret": "key123"}),
            },
        )

        with patch("bouncer.config.load", return_value=new_cfg):
            await rehash(router, Path("/tmp/test.toml"))

        assert net.cfg.channel_keys == {"#secret": "key123"}
|
|
|
|
|
|
class TestAddNetwork:
    """ADDNETWORK command."""

    @pytest.mark.asyncio
    async def test_addnetwork_missing_args(self) -> None:
        """ADDNETWORK with no arguments prints usage."""
        reply = await commands.dispatch("ADDNETWORK", _make_router(), _make_client())
        assert "Usage" in reply[0]

    @pytest.mark.asyncio
    async def test_addnetwork_missing_host(self) -> None:
        """Leaving out ``host=`` produces a 'Required' message."""
        reply = await commands.dispatch(
            "ADDNETWORK testnet port=6667", _make_router(), _make_client(),
        )
        assert "Required" in reply[0]

    @pytest.mark.asyncio
    async def test_addnetwork_already_exists(self) -> None:
        """Re-adding an existing network name is rejected."""
        router = _make_router(_make_network("libera", State.READY))
        reply = await commands.dispatch(
            "ADDNETWORK libera host=irc.libera.chat", router, _make_client(),
        )
        assert "already exists" in reply[0]

    @pytest.mark.asyncio
    async def test_addnetwork_success(self) -> None:
        """A complete spec is parsed into the config passed to ``add_network``."""
        router = _make_router()
        command = (
            "ADDNETWORK testnet host=irc.test.com port=6697 tls=yes nick=mynick channels=#a,#b"
        )
        reply = await commands.dispatch(command, router, _make_client())

        for fragment in ("[ADDNETWORK]", "testnet"):
            assert fragment in reply[0]
        router.add_network.assert_awaited_once()

        # The first positional argument is the network config that was built.
        built = router.add_network.call_args.args[0]
        assert (built.name, built.host, built.port) == ("testnet", "irc.test.com", 6697)
        assert built.tls is True
        assert built.nick == "mynick"
        assert built.channels == ["#a", "#b"]

    @pytest.mark.asyncio
    async def test_addnetwork_slash_in_name(self) -> None:
        """A name containing ``/`` is rejected."""
        reply = await commands.dispatch(
            "ADDNETWORK test/net host=h", _make_router(), _make_client(),
        )
        assert "must not contain" in reply[0]
|
|
|
|
|
|
class TestDelNetwork:
    """DELNETWORK command."""

    @pytest.mark.asyncio
    async def test_delnetwork_missing_arg(self) -> None:
        """DELNETWORK with no arguments prints usage."""
        reply = await commands.dispatch("DELNETWORK", _make_router(), _make_client())
        assert "Usage" in reply[0]

    @pytest.mark.asyncio
    async def test_delnetwork_unknown(self) -> None:
        """An unknown network name is rejected."""
        reply = await commands.dispatch("DELNETWORK fakenet", _make_router(), _make_client())
        assert "Unknown network" in reply[0]

    @pytest.mark.asyncio
    async def test_delnetwork_success(self) -> None:
        """A known network is removed through ``router.remove_network``."""
        router = _make_router(_make_network("libera", State.READY))
        reply = await commands.dispatch("DELNETWORK libera", router, _make_client())
        assert "[DELNETWORK]" in reply[0]
        router.remove_network.assert_awaited_once_with("libera")
|
|
|
|
|
|
class TestAutojoin:
    """AUTOJOIN command: adding and removing channels and their keys."""

    @pytest.mark.asyncio
    async def test_autojoin_missing_args(self) -> None:
        """AUTOJOIN with no arguments prints usage."""
        reply = await commands.dispatch("AUTOJOIN", _make_router(), _make_client())
        assert "Usage" in reply[0]

    @pytest.mark.asyncio
    async def test_autojoin_add(self) -> None:
        """``+#chan`` appends the channel to ``cfg.channels``."""
        libera = _make_network("libera", State.READY)
        libera.cfg.channels = ["#test"]
        reply = await commands.dispatch(
            "AUTOJOIN libera +#dev", _make_router(libera), _make_client(),
        )
        assert "[AUTOJOIN]" in reply[0]
        assert any("added: #dev" in row for row in reply)
        assert "#dev" in libera.cfg.channels

    @pytest.mark.asyncio
    async def test_autojoin_remove(self) -> None:
        """``-#chan`` drops the channel from ``cfg.channels``."""
        libera = _make_network("libera", State.READY, channels={"#test"})
        libera.cfg.channels = ["#test"]
        reply = await commands.dispatch(
            "AUTOJOIN libera -#test", _make_router(libera), _make_client(),
        )
        assert any("removed: #test" in row for row in reply)
        assert "#test" not in libera.cfg.channels

    @pytest.mark.asyncio
    async def test_autojoin_remove_missing(self) -> None:
        """Removing a channel that is not listed is reported."""
        libera = _make_network("libera", State.READY)
        libera.cfg.channels = []
        reply = await commands.dispatch(
            "AUTOJOIN libera -#missing", _make_router(libera), _make_client(),
        )
        assert any("not in autojoin" in row for row in reply)

    @pytest.mark.asyncio
    async def test_autojoin_with_key(self) -> None:
        """``+#chan key`` stores both the channel and its key."""
        libera = _make_network("libera", State.READY)
        libera.cfg.channels = []
        libera.cfg.channel_keys = {}
        reply = await commands.dispatch(
            "AUTOJOIN libera +#secret hunter2", _make_router(libera), _make_client(),
        )
        assert "[AUTOJOIN]" in reply[0]
        assert any("added: #secret" in row for row in reply)
        assert "#secret" in libera.cfg.channels
        assert libera.cfg.channel_keys["#secret"] == "hunter2"

    @pytest.mark.asyncio
    async def test_autojoin_remove_clears_key(self) -> None:
        """Removing a keyed channel also forgets its key."""
        libera = _make_network(
            "libera", State.READY,
            channels={"#secret"},
            channel_keys={"#secret": "hunter2"},
        )
        libera.cfg.channels = ["#secret"]
        libera.cfg.channel_keys = {"#secret": "hunter2"}
        reply = await commands.dispatch(
            "AUTOJOIN libera -#secret", _make_router(libera), _make_client(),
        )
        assert any("removed: #secret" in row for row in reply)
        assert "#secret" not in libera.cfg.channels
        assert "#secret" not in libera.cfg.channel_keys

    @pytest.mark.asyncio
    async def test_autojoin_invalid_spec(self) -> None:
        """A spec without a leading sign is rejected."""
        router = _make_router(_make_network("libera", State.READY))
        reply = await commands.dispatch("AUTOJOIN libera #test", router, _make_client())
        assert "must start with" in reply[0]
|
|
|
|
|
|
class TestIdentify:
    """IDENTIFY command."""

    @pytest.mark.asyncio
    async def test_identify_missing_arg(self) -> None:
        """IDENTIFY without a network prints usage or asks for one."""
        reply = await commands.dispatch("IDENTIFY", _make_router(), _make_client())
        first = reply[0]
        assert "Usage" in first or "provide" in first

    @pytest.mark.asyncio
    async def test_identify_not_connected(self) -> None:
        """On a disconnected network the command reports 'not connected'."""
        router = _make_router(_make_network("libera", State.DISCONNECTED))
        reply = await commands.dispatch("IDENTIFY libera", router, _make_client())
        assert "not connected" in reply[0]

    @pytest.mark.asyncio
    async def test_identify_no_creds(self) -> None:
        """When the backlog has no credentials, that is reported."""
        router = _make_router(_make_network("libera", State.READY))
        router.backlog.get_nickserv_creds_by_network.return_value = None
        reply = await commands.dispatch("IDENTIFY libera", router, _make_client())
        assert "No stored credentials" in reply[0]

    @pytest.mark.asyncio
    async def test_identify_success(self) -> None:
        """With matching nick, an IDENTIFY message is sent to NickServ."""
        libera = _make_network("libera", State.READY, nick="fabesune")
        router = _make_router(libera)
        router.backlog.get_nickserv_creds_by_network.return_value = ("fabesune", "secret123")
        reply = await commands.dispatch("IDENTIFY libera", router, _make_client())
        assert "[IDENTIFY]" in reply[0]
        libera.send_raw.assert_awaited_with("PRIVMSG", "NickServ", "IDENTIFY secret123")

    @pytest.mark.asyncio
    async def test_identify_nick_switch(self) -> None:
        """When the current nick differs, a NICK change is sent as well."""
        libera = _make_network("libera", State.READY, nick="randomnick")
        router = _make_router(libera)
        router.backlog.get_nickserv_creds_by_network.return_value = ("fabesune", "secret123")
        reply = await commands.dispatch("IDENTIFY libera", router, _make_client())
        assert any("switching nick" in row for row in reply)
        sent_args = [awaited.args for awaited in libera.send_raw.await_args_list]
        assert ("NICK", "fabesune") in sent_args
|
|
|
|
|
|
class TestRegister:
    """REGISTER command."""

    @pytest.mark.asyncio
    async def test_register_missing_arg(self) -> None:
        """REGISTER without a network prints usage or asks for one."""
        reply = await commands.dispatch("REGISTER", _make_router(), _make_client())
        first = reply[0]
        assert "Usage" in first or "provide" in first

    @pytest.mark.asyncio
    async def test_register_not_ready(self) -> None:
        """A network that is still connecting is reported as not ready."""
        router = _make_router(_make_network("libera", State.CONNECTING))
        reply = await commands.dispatch("REGISTER libera", router, _make_client())
        assert "not ready" in reply[0]

    @pytest.mark.asyncio
    async def test_register_success(self) -> None:
        """On a READY network the reply carries the ``[REGISTER]`` tag."""
        router = _make_router(_make_network("libera", State.READY))
        reply = await commands.dispatch("REGISTER libera", router, _make_client())
        assert "[REGISTER]" in reply[0]
|
|
|
|
|
|
class TestDropCreds:
    """DROPCREDS command."""

    @pytest.mark.asyncio
    async def test_dropcreds_no_backlog(self) -> None:
        """Without a backlog the command reports it is not available."""
        router = _make_router()
        router.backlog = None
        reply = await commands.dispatch("DROPCREDS libera", router, _make_client())
        assert "not available" in reply[0]

    @pytest.mark.asyncio
    async def test_dropcreds_missing_arg(self) -> None:
        """DROPCREDS with no arguments prints usage."""
        reply = await commands.dispatch("DROPCREDS", _make_router(), _make_client())
        assert "Usage" in reply[0]

    @pytest.mark.asyncio
    async def test_dropcreds_specific_nick(self) -> None:
        """Naming a nick deletes exactly that credential entry."""
        router = _make_router(_make_network("libera", State.READY))
        reply = await commands.dispatch("DROPCREDS libera fabesune", router, _make_client())
        assert "[DROPCREDS]" in reply[0]
        assert any("deleted: fabesune" in row for row in reply)
        router.backlog.delete_nickserv_creds.assert_awaited_once_with("libera", "fabesune")

    @pytest.mark.asyncio
    async def test_dropcreds_all(self) -> None:
        """Without a nick, every stored entry for the network is reported deleted."""
        router = _make_router(_make_network("libera", State.READY))
        router.backlog.list_nickserv_creds.return_value = [
            ("libera", "nick1", "a@b.c", "", 0.0, "verified", ""),
            ("libera", "nick2", "d@e.f", "", 0.0, "pending", ""),
        ]
        reply = await commands.dispatch("DROPCREDS libera", router, _make_client())
        for nick in ("nick1", "nick2"):
            assert any(f"deleted: {nick}" in row for row in reply)

    @pytest.mark.asyncio
    async def test_dropcreds_unknown_network(self) -> None:
        """An unknown network name is rejected."""
        reply = await commands.dispatch("DROPCREDS fakenet", _make_router(), _make_client())
        assert "Unknown network" in reply[0]
|
|
|
|
|
|
class TestGencert:
    """Tests for the GENCERT command as routed through ``commands.dispatch``.

    ``commands.DATA_DIR`` is a module-level global. Every test sets it via
    ``monkeypatch`` so the previous value is restored afterwards and no state
    leaks into other tests. Tests that need a real directory use ``tmp_path``
    rather than a hard-coded ``/tmp``.
    """

    @pytest.mark.asyncio
    async def test_gencert_no_data_dir(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """With no data directory configured the command reports "not available"."""
        monkeypatch.setattr(commands, "DATA_DIR", None)
        router = _make_router()
        client = _make_client()

        lines = await commands.dispatch("GENCERT libera", router, client)

        assert "not available" in lines[0]

    @pytest.mark.asyncio
    async def test_gencert_missing_arg(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path,
    ) -> None:
        """A bare GENCERT answers with a usage line."""
        monkeypatch.setattr(commands, "DATA_DIR", tmp_path)
        router = _make_router()
        client = _make_client()

        lines = await commands.dispatch("GENCERT", router, client)

        assert "Usage" in lines[0]

    @pytest.mark.asyncio
    async def test_gencert_unknown_network(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path,
    ) -> None:
        """A network the router does not know is rejected."""
        monkeypatch.setattr(commands, "DATA_DIR", tmp_path)
        router = _make_router()
        client = _make_client()

        lines = await commands.dispatch("GENCERT fakenet", router, client)

        assert "Unknown network" in lines[0]

    @pytest.mark.asyncio
    async def test_gencert_with_nick(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path,
    ) -> None:
        """An explicit nick is used for the cert and CERT ADD is sent to NickServ."""
        monkeypatch.setattr(commands, "DATA_DIR", tmp_path)
        net = _make_network("libera", State.READY, nick="fabesune")
        router = _make_router(net)
        client = _make_client()

        lines = await commands.dispatch("GENCERT libera testnick", router, client)

        assert "[GENCERT]" in lines[0]
        assert "testnick" in lines[0]
        assert any("fingerprint" in line for line in lines)
        # Should auto-send CERT ADD since network is ready
        net.send_raw.assert_awaited()
        calls = net.send_raw.await_args_list
        assert any(
            c.args[0] == "PRIVMSG" and c.args[1] == "NickServ"
            and "CERT ADD" in c.args[2]
            for c in calls
        )

    @pytest.mark.asyncio
    async def test_gencert_uses_current_nick(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path,
    ) -> None:
        """Without an explicit nick the network's current nick is used."""
        monkeypatch.setattr(commands, "DATA_DIR", tmp_path)
        net = _make_network("libera", State.READY, nick="fabesune")
        router = _make_router(net)
        client = _make_client()

        lines = await commands.dispatch("GENCERT libera", router, client)

        assert "[GENCERT]" in lines[0]
        assert "fabesune" in lines[0]

    @pytest.mark.asyncio
    async def test_gencert_not_ready(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path,
    ) -> None:
        """On a network that is still connecting the reply mentions the manual step."""
        monkeypatch.setattr(commands, "DATA_DIR", tmp_path)
        net = _make_network("libera", State.CONNECTING, nick="fabesune")
        router = _make_router(net)
        client = _make_client()

        lines = await commands.dispatch("GENCERT libera", router, client)

        assert "[GENCERT]" in lines[0]
        assert any("not ready" in line or "manually" in line for line in lines)

    @pytest.mark.asyncio
    async def test_gencert_no_nick(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path,
    ) -> None:
        """With nick "*" and no stored credentials there is no nick to use."""
        monkeypatch.setattr(commands, "DATA_DIR", tmp_path)
        net = _make_network("libera", State.READY, nick="*")
        router = _make_router(net)
        router.backlog.get_nickserv_creds_by_network.return_value = None
        client = _make_client()

        lines = await commands.dispatch("GENCERT libera", router, client)

        assert "No nick available" in lines[0]
|
|
|
|
|
|
class TestCertfp:
    """Tests for the CERTFP handler, called through ``_cmd_certfp_sync``.

    ``commands.DATA_DIR`` is a module-level global. Every test sets it via
    ``monkeypatch`` so the previous value is restored afterwards and no state
    leaks into other tests.
    """

    def test_certfp_no_data_dir(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """With no data directory configured the handler reports "not available"."""
        monkeypatch.setattr(commands, "DATA_DIR", None)
        router = _make_router()

        lines = _cmd_certfp_sync(router, None)

        assert "not available" in lines[0]

    def test_certfp_empty(self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
        """An empty data directory yields a "no certificates" reply."""
        monkeypatch.setattr(commands, "DATA_DIR", tmp_path)
        router = _make_router()

        lines = _cmd_certfp_sync(router, None)

        assert "no certificates" in lines[0]

    def test_certfp_lists_certs(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path,
    ) -> None:
        """A generated cert is listed with its network and nick."""
        from bouncer.cert import generate_cert

        monkeypatch.setattr(commands, "DATA_DIR", tmp_path)
        generate_cert(tmp_path, "libera", "fabesune")
        net = _make_network("libera", State.READY)
        router = _make_router(net)

        lines = _cmd_certfp_sync(router, None)

        assert lines[0] == "[CERTFP]"
        assert any("libera" in line and "fabesune" in line for line in lines)

    def test_certfp_filter_network(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path,
    ) -> None:
        """Passing a network name hides certs that belong to other networks."""
        from bouncer.cert import generate_cert

        monkeypatch.setattr(commands, "DATA_DIR", tmp_path)
        generate_cert(tmp_path, "libera", "nick1")
        generate_cert(tmp_path, "oftc", "nick2")
        libera = _make_network("libera", State.READY)
        oftc = _make_network("oftc", State.READY)
        router = _make_router(libera, oftc)

        lines = _cmd_certfp_sync(router, "libera")

        assert lines[0] == "[CERTFP]"
        assert any("nick1" in line for line in lines)
        assert not any("nick2" in line for line in lines)

    def test_certfp_unknown_network(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path,
    ) -> None:
        """A network the router does not know is rejected."""
        monkeypatch.setattr(commands, "DATA_DIR", tmp_path)
        router = _make_router()

        lines = _cmd_certfp_sync(router, "fakenet")

        assert "Unknown network" in lines[0]
|
|
|
|
|
|
class TestDelcert:
    """Tests for the DELCERT handler, called through ``_cmd_delcert_sync``.

    ``commands.DATA_DIR`` is a module-level global. Every test sets it via
    ``monkeypatch`` so the previous value is restored afterwards and no state
    leaks into other tests. Tests that need a real directory use ``tmp_path``
    rather than a hard-coded ``/tmp``.
    """

    def test_delcert_no_data_dir(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """With no data directory configured the handler reports "not available"."""
        monkeypatch.setattr(commands, "DATA_DIR", None)
        router = _make_router()

        lines = _cmd_delcert_sync(router, "libera")

        assert "not available" in lines[0]

    def test_delcert_missing_arg(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path,
    ) -> None:
        """An empty argument string answers with a usage line."""
        monkeypatch.setattr(commands, "DATA_DIR", tmp_path)
        router = _make_router()

        lines = _cmd_delcert_sync(router, "")

        assert "Usage" in lines[0]

    def test_delcert_unknown_network(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path,
    ) -> None:
        """A network the router does not know is rejected."""
        monkeypatch.setattr(commands, "DATA_DIR", tmp_path)
        router = _make_router()

        lines = _cmd_delcert_sync(router, "fakenet")

        assert "Unknown network" in lines[0]

    def test_delcert_removes_cert(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path,
    ) -> None:
        """Deleting an existing cert reports it and removes it from disk."""
        from bouncer.cert import generate_cert, has_cert

        monkeypatch.setattr(commands, "DATA_DIR", tmp_path)
        generate_cert(tmp_path, "libera", "testnick")
        # Precondition: the cert must exist before we try to delete it.
        assert has_cert(tmp_path, "libera", "testnick")
        net = _make_network("libera", State.READY, nick="testnick")
        router = _make_router(net)

        lines = _cmd_delcert_sync(router, "libera testnick")

        assert "[DELCERT]" in lines[0]
        assert "deleted" in lines[0]
        assert not has_cert(tmp_path, "libera", "testnick")

    def test_delcert_nonexistent(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path,
    ) -> None:
        """Deleting a cert that was never generated reports "no cert found"."""
        monkeypatch.setattr(commands, "DATA_DIR", tmp_path)
        net = _make_network("libera", State.READY, nick="testnick")
        router = _make_router(net)

        lines = _cmd_delcert_sync(router, "libera testnick")

        assert "no cert found" in lines[0]

    def test_delcert_uses_current_nick(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path,
    ) -> None:
        """Without an explicit nick the network's current nick is used."""
        from bouncer.cert import generate_cert, has_cert

        monkeypatch.setattr(commands, "DATA_DIR", tmp_path)
        generate_cert(tmp_path, "libera", "fabesune")
        net = _make_network("libera", State.READY, nick="fabesune")
        router = _make_router(net)

        lines = _cmd_delcert_sync(router, "libera")

        assert "deleted" in lines[0]
        assert not has_cert(tmp_path, "libera", "fabesune")
|
|
assert not has_cert(tmp_path, "libera", "fabesune")
|
|
|
|
|
|
def _cmd_certfp_sync(router: MagicMock, network_name: str | None) -> list[str]:
    """Invoke the non-async ``_cmd_certfp`` handler directly and return its reply lines."""
    handler = commands._cmd_certfp
    return handler(router, network_name)
|
|
|
|
|
|
def _cmd_delcert_sync(router: MagicMock, arg: str) -> list[str]:
    """Invoke the non-async ``_cmd_delcert`` handler directly and return its reply lines."""
    handler = commands._cmd_delcert
    return handler(router, arg)
|
|
|
|
|
|
class TestUnknownCommand:
    """Tests for how ``commands.dispatch`` answers an unrecognised command."""

    @pytest.mark.asyncio
    async def test_unknown_command(self) -> None:
        """An unrecognised command is reported and the next line mentions HELP."""
        reply = await commands.dispatch("FOOBAR", _make_router(), _make_client())

        first, second = reply[0], reply[1]
        assert "Unknown command" in first
        assert "HELP" in second
|