Add RegistrationManager that periodically spawns ephemeral Network connections to register new NickServ accounts across all configured networks. Ephemeral connections reuse the existing registration lifecycle (random nick, email verification, captcha solving) with two new Network parameters: cred_network (redirect credential storage to the real network name) and ephemeral (suppress status broadcasts, skip SASL/IDENTIFY, go straight to REGISTER). - backlog: add count_verified_creds() query - config: farm_enabled, farm_interval, farm_max_accounts - network: cred_network/ephemeral params, credential ref redirection - farm: new module with sweep loop, per-network cooldown, stats - router: farm lifecycle integration, farm property - commands: FARM (status/trigger) and ACCOUNTS (list stored creds) - tests: 14 farm tests + 5 ephemeral/cred_network network tests Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1564 lines
52 KiB
Python
1564 lines
52 KiB
Python
"""Tests for network connection manager."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import base64
|
|
import random
|
|
import time
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from bouncer.config import BouncerConfig, NetworkConfig, ProxyConfig
|
|
from bouncer.irc import IRCMessage, parse
|
|
from bouncer.network import (
|
|
_STARTERS,
|
|
_VOWELS,
|
|
Network,
|
|
State,
|
|
_markov_word,
|
|
_nick_for_host,
|
|
_password_for_host,
|
|
_random_nick,
|
|
_rng_for_key,
|
|
_seeded_markov,
|
|
)
|
|
|
|
# -- helpers -----------------------------------------------------------------
|
|
|
|
def _cfg(name: str = "testnet", host: str = "irc.test.net", port: int = 6697,
|
|
tls: bool = True, nick: str = "", channels: list[str] | None = None,
|
|
password: str | None = None, auth_service: str = "nickserv") -> NetworkConfig:
|
|
return NetworkConfig(
|
|
name=name, host=host, port=port, tls=tls, nick=nick,
|
|
channels=channels or [], password=password, auth_service=auth_service,
|
|
)
|
|
|
|
|
|
def _proxy() -> ProxyConfig:
|
|
return ProxyConfig(host="127.0.0.1", port=1080)
|
|
|
|
|
|
def _bouncer(**overrides: object) -> BouncerConfig:
|
|
defaults: dict[str, object] = {
|
|
"probation_seconds": 1,
|
|
"nick_timeout": 1,
|
|
"rejoin_delay": 0,
|
|
"backoff_steps": [0],
|
|
}
|
|
defaults.update(overrides)
|
|
return BouncerConfig(**defaults) # type: ignore[arg-type]
|
|
|
|
|
|
def _net(cfg: NetworkConfig | None = None, backlog: AsyncMock | None = None,
|
|
bouncer_cfg: BouncerConfig | None = None,
|
|
on_message: MagicMock | None = None) -> Network:
|
|
return Network(
|
|
cfg=cfg or _cfg(),
|
|
proxy_cfg=_proxy(),
|
|
backlog=backlog,
|
|
on_message=on_message,
|
|
bouncer_cfg=bouncer_cfg or _bouncer(),
|
|
)
|
|
|
|
|
|
def _mock_backlog(**kw: object) -> AsyncMock:
|
|
bl = AsyncMock()
|
|
bl.get_nickserv_creds_by_network.return_value = kw.get("creds_by_network")
|
|
bl.get_nickserv_creds_by_host.return_value = kw.get("creds_by_host")
|
|
bl.get_pending_registration.return_value = kw.get("pending")
|
|
bl.save_nickserv_creds = AsyncMock()
|
|
bl.mark_nickserv_verified = AsyncMock()
|
|
return bl
|
|
|
|
|
|
def _msg(raw: str) -> IRCMessage:
|
|
"""Parse a raw IRC line into an IRCMessage."""
|
|
return parse(raw.encode())
|
|
|
|
|
|
# -- markov / nick generation -----------------------------------------------
|
|
|
|
class TestMarkovWord:
|
|
def test_length_bounds(self) -> None:
|
|
for _ in range(100):
|
|
word = _markov_word(4, 6)
|
|
assert 4 <= len(word) <= 6
|
|
|
|
def test_all_alpha(self) -> None:
|
|
for _ in range(50):
|
|
assert _markov_word(5, 8).isalpha()
|
|
|
|
def test_no_triple_consonants(self) -> None:
|
|
for _ in range(200):
|
|
word = _markov_word(5, 10)
|
|
consonant_run = 0
|
|
for ch in word:
|
|
if ch not in _VOWELS:
|
|
consonant_run += 1
|
|
else:
|
|
consonant_run = 0
|
|
assert consonant_run <= 2, f"triple consonant in {word!r}"
|
|
|
|
def test_starts_with_starter(self) -> None:
|
|
for _ in range(50):
|
|
word = _markov_word(3, 5)
|
|
assert word[0] in _STARTERS
|
|
|
|
|
|
class TestSeededMarkov:
|
|
def test_deterministic(self) -> None:
|
|
rng1 = random.Random(42)
|
|
rng2 = random.Random(42)
|
|
assert _seeded_markov(rng1, 5, 8) == _seeded_markov(rng2, 5, 8)
|
|
|
|
def test_different_seeds_differ(self) -> None:
|
|
results = {_seeded_markov(random.Random(i), 5, 8) for i in range(20)}
|
|
assert len(results) > 1
|
|
|
|
|
|
class TestRandomNick:
|
|
def test_length(self) -> None:
|
|
for _ in range(50):
|
|
nick = _random_nick()
|
|
assert 5 <= len(nick) <= 10 # 5-8 base + optional 0-2 digit suffix
|
|
|
|
def test_starts_alpha(self) -> None:
|
|
for _ in range(50):
|
|
assert _random_nick()[0].isalpha()
|
|
|
|
|
|
class TestNickForHost:
|
|
def test_deterministic(self) -> None:
|
|
assert _nick_for_host("example.com") == _nick_for_host("example.com")
|
|
|
|
def test_different_hosts_differ(self) -> None:
|
|
nicks = {_nick_for_host(f"host{i}.example.com") for i in range(10)}
|
|
assert len(nicks) > 1
|
|
|
|
|
|
class TestPasswordForHost:
|
|
def test_deterministic(self) -> None:
|
|
assert _password_for_host("h.com") == _password_for_host("h.com")
|
|
|
|
def test_length_16(self) -> None:
|
|
assert len(_password_for_host("any.host")) == 16
|
|
|
|
def test_hex_chars(self) -> None:
|
|
pw = _password_for_host("foo.bar")
|
|
assert all(c in "0123456789abcdef" for c in pw)
|
|
|
|
def test_different_from_nick(self) -> None:
|
|
"""Password and nick use different hash domains."""
|
|
host = "same.host"
|
|
pw = _password_for_host(host)
|
|
nick = _nick_for_host(host)
|
|
assert pw != nick
|
|
|
|
|
|
class TestRngForKey:
|
|
def test_deterministic(self) -> None:
|
|
a = _rng_for_key("test").random()
|
|
b = _rng_for_key("test").random()
|
|
assert a == b
|
|
|
|
def test_different_keys(self) -> None:
|
|
a = _rng_for_key("key1").random()
|
|
b = _rng_for_key("key2").random()
|
|
assert a != b
|
|
|
|
|
|
class TestEmailForHost:
|
|
@patch("bouncer.network._email_domains", return_value=["mail.tm", "mail.gw"])
|
|
def test_returns_email(self, _mock: MagicMock) -> None:
|
|
from bouncer.network import _email_for_host
|
|
email = _email_for_host("test.host")
|
|
assert email is not None
|
|
assert "@" in email
|
|
|
|
@patch("bouncer.network._email_domains", return_value=["mail.tm"])
|
|
def test_excludes_domain(self, _mock: MagicMock) -> None:
|
|
from bouncer.network import _email_for_host
|
|
result = _email_for_host("host", excluded={"mail.tm"})
|
|
assert result is None
|
|
|
|
@patch("bouncer.network._email_domains", return_value=[])
|
|
def test_no_domains_returns_none(self, _mock: MagicMock) -> None:
|
|
from bouncer.network import _email_for_host
|
|
assert _email_for_host("host") is None
|
|
|
|
|
|
# -- state properties --------------------------------------------------------
|
|
|
|
class TestStateProperties:
|
|
def test_initial_state(self) -> None:
|
|
net = _net()
|
|
assert net.state == State.DISCONNECTED
|
|
assert not net.connected
|
|
assert not net.registered
|
|
assert not net.ready
|
|
|
|
def test_connecting_not_connected(self) -> None:
|
|
net = _net()
|
|
net.state = State.CONNECTING
|
|
assert not net.connected
|
|
|
|
def test_registering_is_connected(self) -> None:
|
|
net = _net()
|
|
net.state = State.REGISTERING
|
|
assert net.connected
|
|
assert not net.registered
|
|
|
|
def test_probation_is_registered(self) -> None:
|
|
net = _net()
|
|
net.state = State.PROBATION
|
|
assert net.connected
|
|
assert net.registered
|
|
assert not net.ready
|
|
|
|
def test_ready(self) -> None:
|
|
net = _net()
|
|
net.state = State.READY
|
|
assert net.connected
|
|
assert net.registered
|
|
assert net.ready
|
|
|
|
|
|
class TestInit:
|
|
def test_defaults(self) -> None:
|
|
net = _net()
|
|
assert net.nick == "*"
|
|
assert net.channels == set()
|
|
assert net.topics == {}
|
|
assert net.names == {}
|
|
assert net.visible_host is None
|
|
|
|
def test_nick_from_config(self) -> None:
|
|
net = _net(cfg=_cfg(nick="mynick"))
|
|
assert net.nick == "mynick"
|
|
|
|
|
|
# -- send helpers ------------------------------------------------------------
|
|
|
|
class TestSend:
|
|
@pytest.mark.asyncio
|
|
async def test_send_writes_to_writer(self) -> None:
|
|
net = _net()
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
|
|
msg = IRCMessage(command="PRIVMSG", params=["#test", "hello world"])
|
|
await net.send(msg)
|
|
writer.write.assert_called_once_with(b"PRIVMSG #test :hello world\r\n")
|
|
writer.drain.assert_awaited_once()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_skips_closing_writer(self) -> None:
|
|
net = _net()
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = True
|
|
net._writer = writer
|
|
|
|
await net.send(IRCMessage(command="PING"))
|
|
writer.write.assert_not_called()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_skips_none_writer(self) -> None:
|
|
net = _net()
|
|
net._writer = None
|
|
# Should not raise
|
|
await net.send(IRCMessage(command="PING"))
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_raw(self) -> None:
|
|
net = _net()
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
|
|
await net.send_raw("JOIN", "#channel")
|
|
writer.write.assert_called_once_with(b"JOIN #channel\r\n")
|
|
|
|
|
|
# -- _handle: PING/PONG -----------------------------------------------------
|
|
|
|
class TestHandlePing:
|
|
@pytest.mark.asyncio
|
|
async def test_ping_pong(self) -> None:
|
|
net = _net()
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
|
|
await net._handle(_msg("PING :server.example.com"))
|
|
writer.write.assert_called_once_with(b"PONG server.example.com\r\n")
|
|
|
|
|
|
# -- _handle: ERROR ---------------------------------------------------------
|
|
|
|
class TestHandleError:
|
|
@pytest.mark.asyncio
|
|
async def test_error_sets_status(self) -> None:
|
|
status = MagicMock()
|
|
net = _net()
|
|
net.on_status = status
|
|
|
|
await net._handle(_msg(":server ERROR :K-lined"))
|
|
status.assert_called_once_with("testnet", "ERROR: K-lined")
|
|
|
|
|
|
# -- _handle: 001 (RPL_WELCOME) ---------------------------------------------
|
|
|
|
class TestHandleWelcome:
|
|
@pytest.mark.asyncio
|
|
async def test_sets_nick_from_params(self) -> None:
|
|
net = _net()
|
|
net.state = State.REGISTERING
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
|
|
await net._handle(_msg(":server 001 coolguy :Welcome to the network coolguy!user@host.example.com"))
|
|
assert net.nick == "coolguy"
|
|
assert net.state == State.PROBATION
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_extracts_visible_host(self) -> None:
|
|
net = _net()
|
|
net.state = State.REGISTERING
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
|
|
await net._handle(_msg(":server 001 nick :Welcome to the IRC Network nick!user@visible.host.com"))
|
|
assert net.visible_host == "visible.host.com"
|
|
|
|
|
|
# -- _handle: 396 (RPL_VISIBLEHOST) -----------------------------------------
|
|
|
|
class TestHandleVisibleHost:
|
|
@pytest.mark.asyncio
|
|
async def test_updates_visible_host(self) -> None:
|
|
net = _net()
|
|
await net._handle(_msg(":server 396 nick new.host.com :is now your displayed host"))
|
|
assert net.visible_host == "new.host.com"
|
|
|
|
|
|
# -- _handle: NICK ----------------------------------------------------------
|
|
|
|
class TestHandleNick:
|
|
@pytest.mark.asyncio
|
|
async def test_own_nick_change(self) -> None:
|
|
net = _net()
|
|
net.nick = "oldnick"
|
|
await net._handle(_msg(":oldnick!user@host NICK newnick"))
|
|
assert net.nick == "newnick"
|
|
assert net._nick_confirmed.is_set()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_other_nick_change_ignored(self) -> None:
|
|
net = _net()
|
|
net.nick = "mynick"
|
|
await net._handle(_msg(":othernick!user@host NICK somethingelse"))
|
|
assert net.nick == "mynick"
|
|
assert not net._nick_confirmed.is_set()
|
|
|
|
|
|
# -- _handle: JOIN/PART -----------------------------------------------------
|
|
|
|
class TestHandleJoinPart:
|
|
@pytest.mark.asyncio
|
|
async def test_own_join(self) -> None:
|
|
net = _net()
|
|
net.nick = "me"
|
|
await net._handle(_msg(":me!user@host JOIN #test"))
|
|
assert "#test" in net.channels
|
|
assert "#test" in net.names
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_other_join_ignored(self) -> None:
|
|
net = _net()
|
|
net.nick = "me"
|
|
await net._handle(_msg(":other!user@host JOIN #test"))
|
|
assert "#test" not in net.channels
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_own_part(self) -> None:
|
|
net = _net()
|
|
net.nick = "me"
|
|
net.channels = {"#test"}
|
|
net.names["#test"] = {"me", "other"}
|
|
net.topics["#test"] = "Topic"
|
|
|
|
await net._handle(_msg(":me!user@host PART #test"))
|
|
assert "#test" not in net.channels
|
|
assert "#test" not in net.names
|
|
assert "#test" not in net.topics
|
|
|
|
|
|
# -- _handle: 332 (RPL_TOPIC), 353 (RPL_NAMREPLY) --------------------------
|
|
|
|
class TestHandleTopicNames:
|
|
@pytest.mark.asyncio
|
|
async def test_topic(self) -> None:
|
|
net = _net()
|
|
await net._handle(_msg(":server 332 me #test :Welcome to the channel"))
|
|
assert net.topics["#test"] == "Welcome to the channel"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_namreply(self) -> None:
|
|
net = _net()
|
|
await net._handle(_msg(":server 353 me = #test :@op +voice regular"))
|
|
assert net.names["#test"] == {"@op", "+voice", "regular"}
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_namreply_accumulates(self) -> None:
|
|
net = _net()
|
|
await net._handle(_msg(":server 353 me = #test :nick1 nick2"))
|
|
await net._handle(_msg(":server 353 me = #test :nick3"))
|
|
assert net.names["#test"] == {"nick1", "nick2", "nick3"}
|
|
|
|
|
|
# -- _handle: 433 (ERR_NICKNAMEINUSE) ---------------------------------------
|
|
|
|
class TestHandleNickInUse:
|
|
@pytest.mark.asyncio
|
|
async def test_during_registration_picks_random(self) -> None:
|
|
net = _net()
|
|
net.state = State.REGISTERING
|
|
net.nick = "taken"
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
|
|
await net._handle(_msg(":server 433 * taken :Nickname is already in use"))
|
|
# Should have changed to a new random nick
|
|
assert net.nick != "taken"
|
|
assert writer.write.called
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_when_ready_appends_underscore(self) -> None:
|
|
net = _net()
|
|
net.state = State.READY
|
|
net.nick = "desired"
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
|
|
await net._handle(_msg(":server 433 * desired :Nickname is already in use"))
|
|
assert net.nick == "desired_"
|
|
writer.write.assert_called_once_with(b"NICK desired_\r\n")
|
|
|
|
|
|
# -- _handle: KICK ----------------------------------------------------------
|
|
|
|
class TestHandleKick:
|
|
@pytest.mark.asyncio
|
|
async def test_kicked_removes_channel(self) -> None:
|
|
net = _net(cfg=_cfg(channels=["#test"]))
|
|
net.state = State.READY
|
|
net._running = True
|
|
net.nick = "me"
|
|
net.channels = {"#test"}
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
|
|
await net._handle(_msg(":op!user@host KICK #test me :reason"))
|
|
assert "#test" not in net.channels
|
|
# Should rejoin (rejoin_delay=0)
|
|
writer.write.assert_called_with(b"JOIN #test\r\n")
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_kick_other_user_ignored(self) -> None:
|
|
net = _net()
|
|
net.nick = "me"
|
|
net.channels = {"#test"}
|
|
await net._handle(_msg(":op!user@host KICK #test other :reason"))
|
|
assert "#test" in net.channels
|
|
|
|
|
|
# -- _handle: NOTICE (hostname extraction) -----------------------------------
|
|
|
|
class TestHandleNotice:
|
|
@pytest.mark.asyncio
|
|
async def test_found_hostname_notice(self) -> None:
|
|
net = _net()
|
|
await net._handle(
|
|
_msg(":server NOTICE * :*** Found your hostname: some.isp.example.com")
|
|
)
|
|
assert net.visible_host == "some.isp.example.com"
|
|
|
|
|
|
# -- _handle: CAP (SASL negotiation) ----------------------------------------
|
|
|
|
class TestHandleCap:
|
|
@pytest.mark.asyncio
|
|
async def test_cap_ack_sasl_starts_authenticate(self) -> None:
|
|
net = _net()
|
|
net._sasl_mechanism = "PLAIN"
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
|
|
await net._handle(_msg(":server CAP * ACK :sasl"))
|
|
writer.write.assert_called_with(b"AUTHENTICATE PLAIN\r\n")
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cap_nak_sasl_ends_cap(self) -> None:
|
|
net = _net()
|
|
net._sasl_mechanism = "PLAIN"
|
|
net._sasl_nick = "nick"
|
|
net._sasl_pass = "pass"
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
|
|
await net._handle(_msg(":server CAP * NAK :sasl"))
|
|
writer.write.assert_called_with(b"CAP END\r\n")
|
|
assert net._sasl_mechanism == ""
|
|
assert net._sasl_nick == ""
|
|
|
|
|
|
# -- _handle: AUTHENTICATE --------------------------------------------------
|
|
|
|
class TestHandleAuthenticate:
|
|
@pytest.mark.asyncio
|
|
async def test_plain_credentials(self) -> None:
|
|
net = _net()
|
|
net._sasl_mechanism = "PLAIN"
|
|
net._sasl_nick = "mynick"
|
|
net._sasl_pass = "mypass"
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
|
|
await net._handle(_msg("AUTHENTICATE +"))
|
|
expected = base64.b64encode(b"mynick\0mynick\0mypass").decode()
|
|
writer.write.assert_called_with(f"AUTHENTICATE {expected}\r\n".encode())
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_external_sends_nick(self) -> None:
|
|
net = _net()
|
|
net._sasl_mechanism = "EXTERNAL"
|
|
net._sasl_nick = "extnick"
|
|
net._sasl_pass = "pass"
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
|
|
await net._handle(_msg("AUTHENTICATE +"))
|
|
expected = base64.b64encode(b"extnick").decode()
|
|
writer.write.assert_called_with(f"AUTHENTICATE {expected}\r\n".encode())
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_no_creds_aborts(self) -> None:
|
|
net = _net()
|
|
net._sasl_mechanism = "PLAIN"
|
|
net._sasl_nick = ""
|
|
net._sasl_pass = ""
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
|
|
await net._handle(_msg("AUTHENTICATE +"))
|
|
writer.write.assert_called_with(b"AUTHENTICATE *\r\n")
|
|
|
|
|
|
# -- _handle: 903 (RPL_SASLSUCCESS) ----------------------------------------
|
|
|
|
class TestHandleSaslSuccess:
|
|
@pytest.mark.asyncio
|
|
async def test_sasl_success_sets_event(self) -> None:
|
|
net = _net()
|
|
net._sasl_mechanism = "PLAIN"
|
|
net._sasl_nick = "nick"
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
|
|
await net._handle(_msg(":server 903 nick :SASL authentication successful"))
|
|
assert net._sasl_complete.is_set()
|
|
writer.write.assert_called_with(b"CAP END\r\n")
|
|
|
|
|
|
# -- _handle: 904/905 (SASL failure) ----------------------------------------
|
|
|
|
class TestHandleSaslFailure:
|
|
@pytest.mark.asyncio
|
|
async def test_external_falls_back_to_plain(self) -> None:
|
|
net = _net()
|
|
net._sasl_mechanism = "EXTERNAL"
|
|
net._sasl_nick = "nick"
|
|
net._sasl_pass = "pass"
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
|
|
await net._handle(_msg(":server 904 nick :SASL authentication failed"))
|
|
assert net._sasl_mechanism == "PLAIN"
|
|
writer.write.assert_called_with(b"AUTHENTICATE PLAIN\r\n")
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_plain_failure_ends_cap(self) -> None:
|
|
net = _net()
|
|
net._sasl_mechanism = "PLAIN"
|
|
net._sasl_nick = "nick"
|
|
net._sasl_pass = "pass"
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
|
|
await net._handle(_msg(":server 904 nick :SASL authentication failed"))
|
|
assert net._sasl_mechanism == ""
|
|
assert net._sasl_nick == ""
|
|
writer.write.assert_called_with(b"CAP END\r\n")
|
|
|
|
|
|
# -- _handle: 906/908 (SASL aborted/mechs) ----------------------------------
|
|
|
|
class TestHandleSaslAborted:
|
|
@pytest.mark.asyncio
|
|
async def test_906_ends_cap(self) -> None:
|
|
net = _net()
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
|
|
await net._handle(_msg(":server 906 nick :SASL authentication aborted"))
|
|
writer.write.assert_called_with(b"CAP END\r\n")
|
|
|
|
|
|
# -- message forwarding to router -------------------------------------------
|
|
|
|
class TestOnMessage:
|
|
@pytest.mark.asyncio
|
|
async def test_on_message_called_for_non_ping(self) -> None:
|
|
cb = MagicMock()
|
|
net = _net(on_message=cb)
|
|
await net._handle(_msg(":nick!user@host PRIVMSG #test :hello"))
|
|
cb.assert_called_once()
|
|
assert cb.call_args[0][0] == "testnet"
|
|
assert cb.call_args[0][1].command == "PRIVMSG"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_on_message_not_called_for_ping(self) -> None:
|
|
cb = MagicMock()
|
|
net = _net(on_message=cb)
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
|
|
await net._handle(_msg("PING :server"))
|
|
cb.assert_not_called()
|
|
|
|
|
|
# -- probation timer ---------------------------------------------------------
|
|
|
|
class TestProbation:
|
|
@pytest.mark.asyncio
|
|
async def test_enter_probation(self) -> None:
|
|
net = _net(bouncer_cfg=_bouncer(probation_seconds=0))
|
|
net.state = State.REGISTERING
|
|
net._running = True
|
|
# Mock _go_ready to prevent full NickServ flow
|
|
net._go_ready = AsyncMock()
|
|
|
|
await net._enter_probation()
|
|
assert net.state == State.PROBATION
|
|
assert net._probation_task is not None
|
|
|
|
# Wait for the probation timer to fire
|
|
await net._probation_task
|
|
net._go_ready.assert_awaited_once()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_probation_cancelled(self) -> None:
|
|
net = _net(bouncer_cfg=_bouncer(probation_seconds=60))
|
|
net.state = State.REGISTERING
|
|
net._running = True
|
|
net._go_ready = AsyncMock()
|
|
|
|
await net._enter_probation()
|
|
net._probation_task.cancel()
|
|
try:
|
|
await net._probation_task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
net._go_ready.assert_not_awaited()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_probation_resets_reconnect_counter(self) -> None:
|
|
net = _net(bouncer_cfg=_bouncer(probation_seconds=0))
|
|
net.state = State.REGISTERING
|
|
net._running = True
|
|
net._reconnect_attempt = 5
|
|
net._go_ready = AsyncMock()
|
|
|
|
await net._enter_probation()
|
|
await net._probation_task
|
|
assert net._reconnect_attempt == 0
|
|
|
|
|
|
# -- reconnection backoff ---------------------------------------------------
|
|
|
|
class TestReconnect:
|
|
@pytest.mark.asyncio
|
|
async def test_backoff_increments(self) -> None:
|
|
net = _net(bouncer_cfg=_bouncer(backoff_steps=[0, 0, 0]))
|
|
net._running = True
|
|
|
|
with patch.object(net, "_connect", new_callable=AsyncMock) as mock_connect:
|
|
assert net._reconnect_attempt == 0
|
|
net._schedule_reconnect()
|
|
await net._reconnect_task
|
|
assert net._reconnect_attempt == 1
|
|
mock_connect.assert_awaited_once()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_backoff_clamps_to_last_step(self) -> None:
|
|
steps = [1, 2, 5]
|
|
net = _net(bouncer_cfg=_bouncer(backoff_steps=steps))
|
|
net._running = True
|
|
net._reconnect_attempt = 100
|
|
|
|
with patch.object(net, "_connect", new_callable=AsyncMock):
|
|
net._schedule_reconnect()
|
|
# The delay should use the last step (5), but since we can't
|
|
# easily measure the sleep duration, verify the attempt incremented
|
|
await net._reconnect_task
|
|
assert net._reconnect_attempt == 101
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_reconnect_cancelled(self) -> None:
|
|
net = _net(bouncer_cfg=_bouncer(backoff_steps=[60]))
|
|
net._running = True
|
|
|
|
with patch.object(net, "_connect", new_callable=AsyncMock) as mock_connect:
|
|
net._schedule_reconnect()
|
|
net._reconnect_task.cancel()
|
|
try:
|
|
await net._reconnect_task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
mock_connect.assert_not_awaited()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_no_reconnect_when_stopped(self) -> None:
|
|
net = _net(bouncer_cfg=_bouncer(backoff_steps=[0]))
|
|
net._running = False
|
|
|
|
with patch.object(net, "_connect", new_callable=AsyncMock) as mock_connect:
|
|
net._schedule_reconnect()
|
|
await net._reconnect_task
|
|
mock_connect.assert_not_awaited()
|
|
|
|
|
|
# -- disconnect --------------------------------------------------------------
|
|
|
|
class TestDisconnect:
|
|
@pytest.mark.asyncio
|
|
async def test_disconnect_closes_writer(self) -> None:
|
|
net = _net()
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.wait_closed = AsyncMock()
|
|
net._writer = writer
|
|
net._reader = MagicMock()
|
|
|
|
await net._disconnect()
|
|
assert net.state == State.DISCONNECTED
|
|
writer.close.assert_called_once()
|
|
assert net._writer is None
|
|
assert net._reader is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_disconnect_cancels_probation(self) -> None:
|
|
net = _net()
|
|
task = MagicMock()
|
|
task.done.return_value = False
|
|
net._probation_task = task
|
|
|
|
await net._disconnect()
|
|
task.cancel.assert_called_once()
|
|
assert net._probation_task is None
|
|
|
|
|
|
# -- stop --------------------------------------------------------------------
|
|
|
|
class TestStop:
|
|
@pytest.mark.asyncio
|
|
async def test_stop_cancels_tasks(self) -> None:
|
|
net = _net()
|
|
net._running = True
|
|
read_task = MagicMock()
|
|
read_task.done.return_value = False
|
|
reconnect_task = MagicMock()
|
|
reconnect_task.done.return_value = False
|
|
net._read_task = read_task
|
|
net._reconnect_task = reconnect_task
|
|
|
|
await net.stop()
|
|
assert not net._running
|
|
read_task.cancel.assert_called_once()
|
|
reconnect_task.cancel.assert_called_once()
|
|
assert net.state == State.DISCONNECTED
|
|
|
|
|
|
# -- _connect ----------------------------------------------------------------
|
|
|
|
class TestConnect:
|
|
@pytest.mark.asyncio
|
|
async def test_connect_no_sasl(self) -> None:
|
|
"""Without stored creds, uses random nick and no SASL."""
|
|
net = _net()
|
|
reader = MagicMock()
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
|
|
with patch("bouncer.proxy.connect", new_callable=AsyncMock,
|
|
return_value=(reader, writer)):
|
|
# Prevent the read loop from actually running
|
|
with patch.object(net, "_read_loop", new_callable=AsyncMock):
|
|
await net._connect()
|
|
|
|
assert net.state == State.REGISTERING
|
|
# Should have sent NICK and USER
|
|
calls = [c.args[0] for c in writer.write.call_args_list]
|
|
nick_sent = any(b"NICK " in c for c in calls)
|
|
user_sent = any(b"USER " in c for c in calls)
|
|
assert nick_sent
|
|
assert user_sent
|
|
# Should have sent CAP REQ server-time but NOT CAP REQ sasl
|
|
cap_server_time = any(b"CAP REQ server-time" in c for c in calls)
|
|
cap_sasl = any(b"CAP REQ sasl" in c for c in calls)
|
|
assert cap_server_time
|
|
assert not cap_sasl
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_connect_with_sasl_plain(self) -> None:
|
|
"""With stored creds and no cert, uses SASL PLAIN."""
|
|
bl = _mock_backlog(creds_by_network=("registered_nick", "secret"))
|
|
net = _net(backlog=bl)
|
|
reader = MagicMock()
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
|
|
with patch("bouncer.proxy.connect", new_callable=AsyncMock,
|
|
return_value=(reader, writer)):
|
|
with patch.object(net, "_read_loop", new_callable=AsyncMock):
|
|
await net._connect()
|
|
|
|
assert net._sasl_mechanism == "PLAIN"
|
|
assert net._sasl_nick == "registered_nick"
|
|
assert net._connect_nick == "registered_nick"
|
|
calls = [c.args[0] for c in writer.write.call_args_list]
|
|
cap_sent = any(b"CAP REQ sasl" in c for c in calls)
|
|
assert cap_sent
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_connect_failure_schedules_reconnect(self) -> None:
|
|
"""Connection failure should schedule a reconnect."""
|
|
net = _net()
|
|
net._running = True
|
|
|
|
with patch("bouncer.proxy.connect", new_callable=AsyncMock,
|
|
side_effect=ConnectionRefusedError("refused")):
|
|
with patch.object(net, "_schedule_reconnect") as mock_sched:
|
|
await net._connect()
|
|
|
|
assert net.state == State.DISCONNECTED
|
|
mock_sched.assert_called_once()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_connect_sends_server_password(self) -> None:
|
|
"""If network has a password, sends PASS before NICK."""
|
|
net = _net(cfg=_cfg(password="serverpass"))
|
|
reader = MagicMock()
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
|
|
with patch("bouncer.proxy.connect", new_callable=AsyncMock,
|
|
return_value=(reader, writer)):
|
|
with patch.object(net, "_read_loop", new_callable=AsyncMock):
|
|
await net._connect()
|
|
|
|
calls = [c.args[0] for c in writer.write.call_args_list]
|
|
pass_sent = any(b"PASS serverpass" in c for c in calls)
|
|
assert pass_sent
|
|
|
|
|
|
# -- _go_ready ---------------------------------------------------------------
|
|
|
|
class TestGoReady:
|
|
@pytest.mark.asyncio
|
|
async def test_sasl_skips_nickserv(self) -> None:
|
|
"""When SASL already authenticated, skip NickServ flow."""
|
|
bl = _mock_backlog()
|
|
net = _net(backlog=bl, cfg=_cfg(auth_service="nickserv"))
|
|
net.state = State.PROBATION
|
|
net.nick = "autheduser"
|
|
net._sasl_complete.set()
|
|
|
|
await net._go_ready()
|
|
assert net.state == State.READY
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_auth_service_none_skips_auth(self) -> None:
|
|
"""When auth_service=none, no authentication attempted."""
|
|
status = MagicMock()
|
|
net = _net(cfg=_cfg(auth_service="none"))
|
|
net.on_status = status
|
|
net.state = State.PROBATION
|
|
net.nick = "anon"
|
|
|
|
await net._go_ready()
|
|
assert net.state == State.READY
|
|
status.assert_called_with("testnet", "ready as anon (no auth service)")
|
|
|
|
|
|
# -- NickServ response parsing -----------------------------------------------
|
|
|
|
class TestNickServMatchers:
|
|
def test_registration_confirmed(self) -> None:
|
|
net = _net()
|
|
assert net._registration_confirmed("a passcode has been sent to your email")
|
|
assert net._registration_confirmed("activation instructions have been sent")
|
|
assert not net._registration_confirmed("you are now identified")
|
|
|
|
def test_registration_immediate(self) -> None:
|
|
net = _net()
|
|
assert net._registration_immediate("nickname registered under your account")
|
|
assert not net._registration_immediate("nickname registered, email verification required")
|
|
|
|
def test_verification_succeeded(self) -> None:
|
|
net = _net()
|
|
assert net._verification_succeeded("your nick has been verified")
|
|
assert net._verification_succeeded("has now been activated for use")
|
|
assert net._verification_succeeded("you are now identified for this nick")
|
|
assert not net._verification_succeeded("verification pending")
|
|
|
|
|
|
# -- _handle_nickserv --------------------------------------------------------
|
|
|
|
class TestHandleNickserv:
|
|
@pytest.mark.asyncio
|
|
async def test_identify_success(self) -> None:
|
|
bl = _mock_backlog()
|
|
net = _net(backlog=bl)
|
|
net.nick = "mynick"
|
|
net.visible_host = "user/mynick"
|
|
net._nickserv_pending = "identify"
|
|
net._nickserv_password = "secret"
|
|
net._nickserv_done = asyncio.Event()
|
|
|
|
await net._handle_nickserv("You are now identified for mynick")
|
|
assert net._nickserv_pending == ""
|
|
assert net._nickserv_done.is_set()
|
|
bl.save_nickserv_creds.assert_awaited_once()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_identify_not_registered_triggers_register(self) -> None:
|
|
net = _net()
|
|
net._nickserv_pending = "identify"
|
|
net._nickserv_done = asyncio.Event()
|
|
|
|
with patch.object(net, "_nickserv_register", new_callable=AsyncMock) as mock_reg:
|
|
await net._handle_nickserv("mynick is not a registered nickname")
|
|
mock_reg.assert_awaited_once()
|
|
assert net._nickserv_pending == ""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_identify_wrong_password(self) -> None:
|
|
net = _net()
|
|
net._nickserv_pending = "identify"
|
|
net._nickserv_done = asyncio.Event()
|
|
|
|
await net._handle_nickserv("Invalid password for mynick")
|
|
assert net._nickserv_pending == ""
|
|
assert net._nickserv_done.is_set()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_register_email_sent(self) -> None:
|
|
bl = _mock_backlog()
|
|
net = _net(backlog=bl)
|
|
net.nick = "newnick"
|
|
net._nickserv_pending = "register"
|
|
net._nickserv_password = "pass"
|
|
net._nickserv_email = "test@mail.tm"
|
|
net._nickserv_done = asyncio.Event()
|
|
|
|
await net._handle_nickserv("A passcode has been sent to test@mail.tm")
|
|
assert net._nickserv_pending == "verify"
|
|
bl.save_nickserv_creds.assert_awaited_once()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_register_domain_rejected(self) -> None:
|
|
net = _net()
|
|
net._nickserv_pending = "register"
|
|
net._nickserv_email = "user@bad.domain"
|
|
net._nickserv_done = asyncio.Event()
|
|
|
|
with patch.object(net, "_nickserv_register", new_callable=AsyncMock):
|
|
await net._handle_nickserv("bad.domain do not accept email from that address")
|
|
assert "bad.domain" in net._rejected_email_domains
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_register_nick_taken(self) -> None:
|
|
net = _net()
|
|
net._nickserv_pending = "register"
|
|
net._nickserv_done = asyncio.Event()
|
|
|
|
await net._handle_nickserv("mynick is already registered by someone else")
|
|
assert net._nickserv_pending == ""
|
|
assert net._nickserv_done.is_set()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_register_rate_limited(self) -> None:
|
|
net = _net()
|
|
net._nickserv_pending = "register"
|
|
net._nickserv_done = asyncio.Event()
|
|
|
|
await net._handle_nickserv("You have sent too many registration requests")
|
|
assert net._nickserv_pending == ""
|
|
assert net._nickserv_done.is_set()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_verify_success(self) -> None:
|
|
bl = _mock_backlog()
|
|
net = _net(backlog=bl)
|
|
net.nick = "verified_nick"
|
|
net._nickserv_pending = "verify"
|
|
net._nickserv_password = "pass"
|
|
|
|
await net._handle_nickserv("verified_nick has been verified")
|
|
assert net._nickserv_pending == ""
|
|
bl.mark_nickserv_verified.assert_awaited_once()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_verify_success_signals_completion(self) -> None:
|
|
"""_on_verify_success must signal _nickserv_done."""
|
|
bl = _mock_backlog()
|
|
net = _net(backlog=bl)
|
|
net.nick = "verified_nick"
|
|
net._nickserv_pending = "verify"
|
|
net._nickserv_password = "pass"
|
|
net._nickserv_done = asyncio.Event()
|
|
|
|
await net._handle_nickserv("verified_nick has been verified")
|
|
assert net._nickserv_pending == ""
|
|
assert net._nickserv_done.is_set()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_registration_immediate_signals_completion(self) -> None:
|
|
"""Immediate registration (no email) must signal _nickserv_done."""
|
|
bl = _mock_backlog()
|
|
net = _net(backlog=bl)
|
|
net.nick = "fastnick"
|
|
net._nickserv_pending = "register"
|
|
net._nickserv_password = "pass"
|
|
net._nickserv_done = asyncio.Event()
|
|
|
|
await net._handle_nickserv("Nickname registered under your account")
|
|
assert net._nickserv_pending == ""
|
|
assert net._nickserv_done.is_set()
|
|
bl.mark_nickserv_verified.assert_awaited_once()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_verify_failure(self) -> None:
|
|
net = _net()
|
|
net._nickserv_pending = "verify"
|
|
status = MagicMock()
|
|
net.on_status = status
|
|
|
|
await net._handle_nickserv("Invalid verification code")
|
|
assert net._nickserv_pending == ""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_resume_pending_restores_verify_url(self) -> None:
|
|
"""Cross-session resume must restore the verify_url from DB."""
|
|
bl = _mock_backlog(
|
|
pending=("oldnick", "pass", "e@mail.tm", "host", "https://oftc/verify/abc"),
|
|
)
|
|
net = _net(backlog=bl)
|
|
net.state = State.READY
|
|
net._running = True
|
|
net.nick = "oldnick"
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
|
|
result = await net._resume_pending_verification()
|
|
assert result is True
|
|
assert net._verify_url == "https://oftc/verify/abc"
|
|
assert net._nickserv_email == "e@mail.tm"
|
|
assert net._nickserv_password == "pass"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_resume_pending_no_url(self) -> None:
|
|
"""Resume works when verify_url is empty."""
|
|
bl = _mock_backlog(
|
|
pending=("nick", "pass", "e@mail.tm", "host", ""),
|
|
)
|
|
net = _net(backlog=bl)
|
|
net.state = State.READY
|
|
net._running = True
|
|
net.nick = "nick"
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
|
|
result = await net._resume_pending_verification()
|
|
assert result is True
|
|
assert net._verify_url == ""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_late_registration_confirmation(self) -> None:
|
|
"""After timeout clears pending state, late confirmation still works."""
|
|
bl = _mock_backlog()
|
|
net = _net(backlog=bl)
|
|
net.nick = "latenick"
|
|
net._nickserv_pending = ""
|
|
net._nickserv_password = "pass"
|
|
net._nickserv_email = "late@mail.tm"
|
|
net._nickserv_done = asyncio.Event()
|
|
|
|
await net._handle_nickserv("A passcode has been sent to your email")
|
|
bl.save_nickserv_creds.assert_awaited_once()
|
|
|
|
|
|
# -- _handle_qbot -----------------------------------------------------------
|
|
|
|
class TestHandleQbot:
|
|
@pytest.mark.asyncio
|
|
async def test_qbot_auth_success(self) -> None:
|
|
net = _net(cfg=_cfg(nick="quser", auth_service="qbot"))
|
|
net.nick = "quser"
|
|
net._nickserv_pending = "qbot_auth"
|
|
net._nickserv_done = asyncio.Event()
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
|
|
await net._handle_qbot("You are now logged in as quser.")
|
|
assert net._nickserv_pending == ""
|
|
assert net._nickserv_done.is_set()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_qbot_auth_failure(self) -> None:
|
|
net = _net(cfg=_cfg(auth_service="qbot"))
|
|
net._nickserv_pending = "qbot_auth"
|
|
net._nickserv_done = asyncio.Event()
|
|
|
|
await net._handle_qbot("Incorrect password for quser")
|
|
assert net._nickserv_pending == ""
|
|
assert net._nickserv_done.is_set()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_qbot_ignores_when_not_pending(self) -> None:
|
|
net = _net()
|
|
net._nickserv_pending = ""
|
|
net._nickserv_done = asyncio.Event()
|
|
|
|
await net._handle_qbot("You are now logged in as quser.")
|
|
# Should not have set the event (wasn't waiting for qbot)
|
|
assert not net._nickserv_done.is_set()
|
|
|
|
|
|
# -- read loop ---------------------------------------------------------------
|
|
|
|
class TestReadLoop:
|
|
@pytest.mark.asyncio
|
|
async def test_processes_complete_lines(self) -> None:
|
|
"""Read loop should parse complete CRLF-terminated lines."""
|
|
net = _net()
|
|
net.state = State.REGISTERING
|
|
net._running = True
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
|
|
reader = AsyncMock()
|
|
reader.read = AsyncMock(side_effect=[
|
|
b"PING :test\r\n",
|
|
b"", # EOF
|
|
])
|
|
net._reader = reader
|
|
|
|
with patch.object(net, "_disconnect", new_callable=AsyncMock):
|
|
with patch.object(net, "_schedule_reconnect"):
|
|
await net._read_loop()
|
|
|
|
writer.write.assert_called_with(b"PONG test\r\n")
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_handles_partial_reads(self) -> None:
|
|
"""Read loop should buffer partial lines across reads."""
|
|
net = _net()
|
|
net.state = State.REGISTERING
|
|
net._running = True
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
|
|
reader = AsyncMock()
|
|
reader.read = AsyncMock(side_effect=[
|
|
b"PING :tes", # partial
|
|
b"t\r\n", # completion
|
|
b"", # EOF
|
|
])
|
|
net._reader = reader
|
|
|
|
with patch.object(net, "_disconnect", new_callable=AsyncMock):
|
|
with patch.object(net, "_schedule_reconnect"):
|
|
await net._read_loop()
|
|
|
|
writer.write.assert_called_with(b"PONG test\r\n")
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_multiple_lines_in_one_read(self) -> None:
|
|
"""Read loop should handle multiple lines in a single read."""
|
|
cb = MagicMock()
|
|
net = _net(on_message=cb)
|
|
net.state = State.READY
|
|
net._running = True
|
|
net.nick = "me"
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
|
|
reader = AsyncMock()
|
|
reader.read = AsyncMock(side_effect=[
|
|
b":a!u@h PRIVMSG #ch :msg1\r\n:b!u@h PRIVMSG #ch :msg2\r\n",
|
|
b"",
|
|
])
|
|
net._reader = reader
|
|
|
|
with patch.object(net, "_disconnect", new_callable=AsyncMock):
|
|
with patch.object(net, "_schedule_reconnect"):
|
|
await net._read_loop()
|
|
|
|
assert cb.call_count == 2
|
|
|
|
|
|
# -- PING watchdog -----------------------------------------------------------
|
|
|
|
class TestPingWatchdog:
|
|
@pytest.mark.asyncio
|
|
async def test_timeout_triggers_disconnect(self) -> None:
|
|
"""Stale connection triggers disconnect + reconnect."""
|
|
net = _net(bouncer_cfg=_bouncer(ping_interval=0, ping_timeout=0))
|
|
net.state = State.READY
|
|
net._running = True
|
|
net._last_recv = time.monotonic() - 1000 # stale
|
|
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
|
|
with patch.object(net, "_disconnect", new_callable=AsyncMock) as mock_disc:
|
|
with patch.object(net, "_schedule_reconnect") as mock_recon:
|
|
await net._ping_watchdog()
|
|
|
|
mock_disc.assert_awaited_once()
|
|
mock_recon.assert_called_once()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_healthy_connection_stays_alive(self) -> None:
|
|
"""Fresh data during timeout window prevents disconnect."""
|
|
net = _net(bouncer_cfg=_bouncer(ping_interval=10, ping_timeout=5))
|
|
net.state = State.READY
|
|
net._running = True
|
|
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
|
|
# Control time progression: stale at first, then fresh after PING
|
|
clock = [100.0] # start at T=100
|
|
|
|
def fake_monotonic() -> float:
|
|
return clock[0]
|
|
|
|
original_sleep = asyncio.sleep
|
|
call_count = 0
|
|
|
|
async def fake_sleep(delay: float) -> None:
|
|
nonlocal call_count
|
|
call_count += 1
|
|
if call_count == 1:
|
|
# After interval sleep: time advanced, data is stale -> PING sent
|
|
clock[0] = 120.0
|
|
net._last_recv = 100.0 # stale (20s > interval=10)
|
|
elif call_count == 2:
|
|
# During timeout wait: simulate PONG received (fresh data)
|
|
clock[0] = 122.0
|
|
net._last_recv = 122.0 # fresh
|
|
elif call_count == 3:
|
|
# Next interval sleep: exit loop
|
|
net.state = State.DISCONNECTED
|
|
await original_sleep(0)
|
|
|
|
with patch("time.monotonic", side_effect=fake_monotonic):
|
|
with patch("asyncio.sleep", side_effect=fake_sleep):
|
|
with patch.object(net, "_disconnect", new_callable=AsyncMock) as mock_disc:
|
|
await net._ping_watchdog()
|
|
|
|
# PING was sent, but fresh data arrived -- no disconnect
|
|
ping_sent = any(b"PING" in c.args[0] for c in writer.write.call_args_list)
|
|
assert ping_sent
|
|
mock_disc.assert_not_awaited()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_watchdog_cancelled_on_disconnect(self) -> None:
|
|
net = _net()
|
|
net.state = State.READY
|
|
net._running = True
|
|
net._last_recv = time.monotonic()
|
|
|
|
task = MagicMock()
|
|
task.done.return_value = False
|
|
net._ping_task = task
|
|
|
|
await net._disconnect()
|
|
task.cancel.assert_called_once()
|
|
assert net._ping_task is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_ping_task_started_in_go_ready(self) -> None:
|
|
net = _net(bouncer_cfg=_bouncer(probation_seconds=0, ping_interval=999))
|
|
net.state = State.PROBATION
|
|
net._running = True
|
|
net._sasl_complete.set()
|
|
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
bl = _mock_backlog()
|
|
net.backlog = bl
|
|
|
|
await net._go_ready()
|
|
assert net._ping_task is not None
|
|
# Cancel it so test doesn't leak
|
|
net._ping_task.cancel()
|
|
try:
|
|
await net._ping_task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_ping_task_in_stop(self) -> None:
|
|
"""stop() cancels the ping task."""
|
|
net = _net()
|
|
net._running = True
|
|
ping_task = MagicMock()
|
|
ping_task.done.return_value = False
|
|
net._ping_task = ping_task
|
|
|
|
await net.stop()
|
|
# cancel() is called in both stop() and _disconnect()
|
|
assert ping_task.cancel.call_count >= 1
|
|
|
|
|
|
# -- IRCv3 CAP negotiation (server-time) ------------------------------------
|
|
|
|
class TestCapServerTime:
|
|
@pytest.mark.asyncio
|
|
async def test_server_time_ack_sets_flag(self) -> None:
|
|
net = _net()
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
net._caps_pending = 1
|
|
|
|
await net._handle(_msg(":server CAP * ACK :server-time"))
|
|
assert net._server_time is True
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_server_time_nak_handled(self) -> None:
|
|
net = _net()
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
net._caps_pending = 1
|
|
|
|
await net._handle(_msg(":server CAP * NAK :server-time"))
|
|
assert net._server_time is False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_combined_sasl_and_server_time(self) -> None:
|
|
"""Both caps requested: ACK server-time + ACK sasl."""
|
|
net = _net()
|
|
net._sasl_mechanism = "PLAIN"
|
|
net._sasl_nick = "nick"
|
|
net._sasl_pass = "pass"
|
|
net._caps_pending = 2
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
|
|
# ACK server-time first
|
|
await net._handle(_msg(":server CAP * ACK :server-time"))
|
|
assert net._server_time is True
|
|
# Should NOT have sent CAP END yet (SASL still pending)
|
|
cap_end_calls = [c for c in writer.write.call_args_list
|
|
if b"CAP END" in c.args[0]]
|
|
assert len(cap_end_calls) == 0
|
|
|
|
# ACK sasl starts AUTHENTICATE flow
|
|
await net._handle(_msg(":server CAP * ACK :sasl"))
|
|
writer.write.assert_called_with(b"AUTHENTICATE PLAIN\r\n")
|
|
|
|
# SASL success resolves the last cap
|
|
await net._handle(_msg(":server 903 nick :SASL authentication successful"))
|
|
cap_end_calls = [c for c in writer.write.call_args_list
|
|
if b"CAP END" in c.args[0]]
|
|
assert len(cap_end_calls) == 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cap_end_after_all_resolved(self) -> None:
|
|
"""CAP END sent only after all caps are resolved."""
|
|
net = _net()
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
net._writer = writer
|
|
net._caps_pending = 1 # only server-time, no SASL
|
|
|
|
await net._handle(_msg(":server CAP * ACK :server-time"))
|
|
cap_end_calls = [c for c in writer.write.call_args_list
|
|
if b"CAP END" in c.args[0]]
|
|
assert len(cap_end_calls) == 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_server_time_property(self) -> None:
|
|
net = _net()
|
|
assert net.server_time is False
|
|
net._server_time = True
|
|
assert net.server_time is True
|
|
|
|
|
|
# -- cred_network + ephemeral -----------------------------------------------
|
|
|
|
class TestCredNetwork:
|
|
def test_defaults_to_cfg_name(self) -> None:
|
|
"""cred_network defaults to cfg.name when not overridden."""
|
|
net = _net()
|
|
assert net.cred_network == "testnet"
|
|
|
|
def test_override(self) -> None:
|
|
"""cred_network uses explicit value when provided."""
|
|
net = Network(
|
|
cfg=_cfg(name="_farm_libera"),
|
|
proxy_cfg=_proxy(),
|
|
bouncer_cfg=_bouncer(),
|
|
cred_network="libera",
|
|
)
|
|
assert net.cred_network == "libera"
|
|
|
|
|
|
class TestEphemeral:
|
|
def test_status_suppressed(self) -> None:
|
|
"""Ephemeral _status() logs but doesn't call on_status."""
|
|
status_cb = MagicMock()
|
|
net = Network(
|
|
cfg=_cfg(),
|
|
proxy_cfg=_proxy(),
|
|
bouncer_cfg=_bouncer(),
|
|
on_status=status_cb,
|
|
ephemeral=True,
|
|
)
|
|
net._status("test message")
|
|
status_cb.assert_not_called()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_go_ready_registers_directly(self) -> None:
|
|
"""Ephemeral _go_ready() calls _nickserv_register() directly."""
|
|
net = Network(
|
|
cfg=_cfg(),
|
|
proxy_cfg=_proxy(),
|
|
bouncer_cfg=_bouncer(),
|
|
ephemeral=True,
|
|
)
|
|
net.state = State.PROBATION
|
|
net._running = True
|
|
net.nick = "ephemeral_nick"
|
|
|
|
register_called = False
|
|
|
|
async def mock_register() -> None:
|
|
nonlocal register_called
|
|
register_called = True
|
|
# Signal done so _go_ready doesn't block forever
|
|
net._nickserv_done.set()
|
|
|
|
with patch.object(net, "_nickserv_register", side_effect=mock_register):
|
|
await net._go_ready()
|
|
|
|
assert register_called
|
|
assert net.state == State.READY
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_skips_sasl(self) -> None:
|
|
"""Ephemeral _connect() uses random nick, no SASL."""
|
|
bl = _mock_backlog(creds_by_network=("stored_nick", "secret"))
|
|
net = Network(
|
|
cfg=_cfg(),
|
|
proxy_cfg=_proxy(),
|
|
backlog=bl,
|
|
bouncer_cfg=_bouncer(),
|
|
ephemeral=True,
|
|
)
|
|
reader = MagicMock()
|
|
writer = MagicMock()
|
|
writer.is_closing.return_value = False
|
|
writer.drain = AsyncMock()
|
|
|
|
with patch("bouncer.proxy.connect", new_callable=AsyncMock,
|
|
return_value=(reader, writer)):
|
|
with patch.object(net, "_read_loop", new_callable=AsyncMock):
|
|
await net._connect()
|
|
|
|
# Should NOT have looked up creds (ephemeral skips SASL)
|
|
bl.get_nickserv_creds_by_network.assert_not_called()
|
|
assert net._sasl_mechanism == ""
|
|
# Should have sent NICK with random nick, not stored
|
|
calls = [c.args[0] for c in writer.write.call_args_list]
|
|
cap_sasl = any(b"CAP REQ sasl" in c for c in calls)
|
|
assert not cap_sasl
|