"""Tests for background account farming.""" from __future__ import annotations import asyncio from unittest.mock import AsyncMock, MagicMock, patch import pytest from bouncer.config import BouncerConfig, NetworkConfig, ProxyConfig from bouncer.farm import FarmStats, RegistrationManager # -- helpers ----------------------------------------------------------------- def _bouncer(**overrides: object) -> BouncerConfig: defaults: dict[str, object] = { "farm_enabled": True, "farm_interval": 3600, "farm_max_accounts": 10, "probation_seconds": 1, "backoff_steps": [0], } defaults.update(overrides) return BouncerConfig(**defaults) # type: ignore[arg-type] def _net_cfg(name: str = "testnet", auth_service: str = "nickserv") -> NetworkConfig: return NetworkConfig( name=name, host="irc.test.net", port=6697, tls=True, auth_service=auth_service, ) def _proxy_resolver(net_cfg: NetworkConfig) -> ProxyConfig: return ProxyConfig(host="127.0.0.1", port=1080) def _mock_backlog(verified_count: int = 0) -> AsyncMock: bl = AsyncMock() bl.count_verified_creds = AsyncMock(return_value=verified_count) return bl def _manager( networks: dict[str, NetworkConfig] | None = None, backlog: AsyncMock | None = None, **bouncer_kw: object, ) -> RegistrationManager: nets = networks or {"testnet": _net_cfg()} return RegistrationManager( bouncer_cfg=_bouncer(**bouncer_kw), networks=nets, proxy_resolver=_proxy_resolver, backlog=backlog or _mock_backlog(), ) # -- tests ------------------------------------------------------------------- class TestFarmDisabled: @pytest.mark.asyncio async def test_start_noop_when_disabled(self) -> None: """start() is a no-op when farm_enabled=False.""" mgr = _manager(farm_enabled=False) await mgr.start() assert mgr._loop_task is None @pytest.mark.asyncio async def test_stop_safe_when_not_started(self) -> None: mgr = _manager(farm_enabled=False) await mgr.stop() # should not raise class TestFarmSkipsNonNickserv: @pytest.mark.asyncio async def test_skips_qbot(self) -> None: """Networks with auth_service='qbot' are skipped.""" nets = {"quake": _net_cfg("quake", auth_service="qbot")} mgr = _manager(networks=nets) await mgr._maybe_spawn("quake", nets["quake"]) assert "quake" not in mgr._active @pytest.mark.asyncio async def test_skips_none(self) -> None: """Networks with auth_service='none' are skipped.""" nets = {"anon": _net_cfg("anon", auth_service="none")} mgr = _manager(networks=nets) await mgr._maybe_spawn("anon", nets["anon"]) assert "anon" not in mgr._active class TestFarmMaxAccounts: @pytest.mark.asyncio async def test_skips_when_at_max(self) -> None: """No spawn when verified count >= farm_max_accounts.""" bl = _mock_backlog(verified_count=10) mgr = _manager(backlog=bl, farm_max_accounts=10) net_cfg = _net_cfg() await mgr._maybe_spawn("testnet", net_cfg) assert "testnet" not in mgr._active @pytest.mark.asyncio async def test_spawns_below_max(self) -> None: """Spawn when verified count < farm_max_accounts.""" bl = _mock_backlog(verified_count=5) mgr = _manager(backlog=bl, farm_max_accounts=10) net_cfg = _net_cfg() with patch.object(mgr, "_spawn_ephemeral") as mock_spawn: await mgr._maybe_spawn("testnet", net_cfg) mock_spawn.assert_called_once_with("testnet", net_cfg) class TestFarmInterval: @pytest.mark.asyncio async def test_respects_cooldown(self) -> None: """Cooldown enforced between attempts.""" import time bl = _mock_backlog(verified_count=0) mgr = _manager(backlog=bl, farm_interval=3600) # Simulate recent attempt mgr._stats["testnet"] = FarmStats( attempts=1, last_attempt=time.time(), ) net_cfg = _net_cfg() with patch.object(mgr, "_spawn_ephemeral") as mock_spawn: await mgr._maybe_spawn("testnet", net_cfg) mock_spawn.assert_not_called() class TestFarmSpawnEphemeral: @pytest.mark.asyncio async def test_creates_correct_config(self) -> None: """Ephemeral Network gets correct config (cred_network, ephemeral, no channels).""" bl = _mock_backlog() mgr = _manager(backlog=bl) net_cfg = _net_cfg() with patch("bouncer.farm.Network") as MockNetwork: mock_eph = MagicMock() mock_eph._nickserv_done = asyncio.Event() mock_eph.start = AsyncMock() mock_eph.stop = AsyncMock() MockNetwork.return_value = mock_eph mgr._spawn_ephemeral("testnet", net_cfg) # Verify Network was constructed with correct params call_kwargs = MockNetwork.call_args[1] assert call_kwargs["cred_network"] == "testnet" assert call_kwargs["ephemeral"] is True assert call_kwargs["on_message"] is None assert call_kwargs["on_status"] is None eph_cfg = MockNetwork.call_args[1]["cfg"] assert eph_cfg.name == "_farm_testnet" assert eph_cfg.channels == [] assert eph_cfg.host == "irc.test.net" assert "testnet" in mgr._active # Cleanup spawned task mgr._active["testnet"].cancel() try: await mgr._active["testnet"] except asyncio.CancelledError: pass class TestFarmOneAtATime: @pytest.mark.asyncio async def test_blocks_second_spawn(self) -> None: """Second spawn blocked while first is active.""" bl = _mock_backlog(verified_count=0) mgr = _manager(backlog=bl) net_cfg = _net_cfg() # Simulate an active task mgr._active["testnet"] = asyncio.create_task(asyncio.sleep(999)) try: with patch.object(mgr, "_spawn_ephemeral") as mock_spawn: await mgr._maybe_spawn("testnet", net_cfg) mock_spawn.assert_not_called() finally: mgr._active["testnet"].cancel() try: await mgr._active["testnet"] except asyncio.CancelledError: pass class TestFarmCleanup: @pytest.mark.asyncio async def test_stop_cancels_active(self) -> None: """All active ephemerals stopped on stop().""" mgr = _manager() task = asyncio.create_task(asyncio.sleep(999)) mgr._active["testnet"] = task mgr._loop_task = asyncio.create_task(asyncio.sleep(999)) await mgr.stop() assert task.cancelled() assert not mgr._active assert mgr._loop_task is None class TestFarmStatsTracking: @pytest.mark.asyncio async def test_stats_updated_on_success(self) -> None: """FarmStats updated on success.""" bl = AsyncMock() # Before: 0 verified, after: 1 verified bl.count_verified_creds = AsyncMock(side_effect=[0, 1]) mgr = _manager(backlog=bl) mock_eph = MagicMock() done_event = asyncio.Event() done_event.set() mock_eph._nickserv_done = done_event mock_eph.start = AsyncMock() mock_eph.stop = AsyncMock() await mgr._run_ephemeral("testnet", mock_eph) stats = mgr._stats["testnet"] assert stats.successes == 1 assert stats.last_success > 0 @pytest.mark.asyncio async def test_stats_updated_on_failure(self) -> None: """FarmStats updated on failure.""" bl = AsyncMock() # Before: 0, after: still 0 bl.count_verified_creds = AsyncMock(side_effect=[0, 0]) mgr = _manager(backlog=bl) mock_eph = MagicMock() done_event = asyncio.Event() done_event.set() mock_eph._nickserv_done = done_event mock_eph.start = AsyncMock() mock_eph.stop = AsyncMock() await mgr._run_ephemeral("testnet", mock_eph) stats = mgr._stats["testnet"] assert stats.failures == 1 assert stats.last_error == "no new verified account" class TestFarmManualTrigger: @pytest.mark.asyncio async def test_trigger_bypasses_cooldown(self) -> None: """trigger() bypasses cooldown for a specific network.""" bl = _mock_backlog() mgr = _manager(backlog=bl) import time mgr._stats["testnet"] = FarmStats( attempts=1, last_attempt=time.time(), ) with patch("bouncer.farm.Network") as MockNetwork: mock_eph = MagicMock() mock_eph._nickserv_done = asyncio.Event() mock_eph.start = AsyncMock() mock_eph.stop = AsyncMock() MockNetwork.return_value = mock_eph result = mgr.trigger("testnet") assert result is True assert "testnet" in mgr._active # Cleanup mgr._active["testnet"].cancel() try: await mgr._active["testnet"] except asyncio.CancelledError: pass def test_trigger_unknown_network(self) -> None: """trigger() returns False for unknown network.""" mgr = _manager() assert mgr.trigger("nonexistent") is False @pytest.mark.asyncio async def test_trigger_already_active(self) -> None: """trigger() returns False when ephemeral already active.""" mgr = _manager() mgr._active["testnet"] = asyncio.create_task(asyncio.sleep(999)) try: assert mgr.trigger("testnet") is False finally: mgr._active["testnet"].cancel() try: await mgr._active["testnet"] except asyncio.CancelledError: pass