"""Tests for the managed proxy pool.""" import json import time import pytest from s5p.config import ChainHop, PoolSourceConfig, ProxyPoolConfig from s5p.pool import ProxyEntry, ProxyPool class TestProxyEntry: """Test ProxyEntry defaults.""" def test_defaults(self): hop = ChainHop(proto="socks5", host="1.2.3.4", port=1080) entry = ProxyEntry(hop=hop) assert entry.alive is False assert entry.fails == 0 assert entry.tests == 0 class TestProxyPoolMerge: """Test proxy deduplication and merge.""" def test_merge_dedup(self): cfg = ProxyPoolConfig(sources=[]) pool = ProxyPool(cfg, [], timeout=10.0) proxies = [ ChainHop(proto="socks5", host="1.2.3.4", port=1080), ChainHop(proto="socks5", host="1.2.3.4", port=1080), ChainHop(proto="socks5", host="5.6.7.8", port=1080), ] pool._merge(proxies) assert pool.count == 2 def test_merge_updates_existing(self): cfg = ProxyPoolConfig(sources=[]) pool = ProxyPool(cfg, [], timeout=10.0) hop = ChainHop(proto="socks5", host="1.2.3.4", port=1080) pool._merge([hop]) first_seen = pool._proxies["socks5://1.2.3.4:1080"].last_seen # merge again -- last_seen should update time.sleep(0.01) pool._merge([hop]) assert pool._proxies["socks5://1.2.3.4:1080"].last_seen >= first_seen assert pool.count == 1 class TestProxyPoolGet: """Test proxy selection.""" def test_get_empty(self): import asyncio cfg = ProxyPoolConfig(sources=[]) pool = ProxyPool(cfg, [], timeout=10.0) result = asyncio.run(pool.get()) assert result is None def test_get_returns_alive(self): import asyncio cfg = ProxyPoolConfig(sources=[]) pool = ProxyPool(cfg, [], timeout=10.0) hop = ChainHop(proto="socks5", host="1.2.3.4", port=1080) pool._proxies["socks5://1.2.3.4:1080"] = ProxyEntry(hop=hop, alive=True) pool._rebuild_alive() result = asyncio.run(pool.get()) assert result is not None assert result.host == "1.2.3.4" class TestProxyPoolWeight: """Test weighted proxy selection.""" def test_recent_proxy_preferred(self): import asyncio from collections import Counter cfg = ProxyPoolConfig(sources=[]) pool = ProxyPool(cfg, [], timeout=10.0) now = time.time() fresh = ChainHop(proto="socks5", host="10.0.0.1", port=1080) stale = ChainHop(proto="socks5", host="10.0.0.2", port=1080) pool._proxies["socks5://10.0.0.1:1080"] = ProxyEntry( hop=fresh, alive=True, last_ok=now, ) pool._proxies["socks5://10.0.0.2:1080"] = ProxyEntry( hop=stale, alive=True, last_ok=now - 3600, ) pool._rebuild_alive() counts: Counter[str] = Counter() for _ in range(1000): hop = asyncio.run(pool.get()) counts[hop.host] += 1 assert counts["10.0.0.1"] > counts["10.0.0.2"] * 3 def test_weight_values(self): hop = ChainHop(proto="socks5", host="1.2.3.4", port=1080) now = 1000.0 # just tested entry = ProxyEntry(hop=hop, last_ok=now) assert ProxyPool._weight(entry, now) == pytest.approx(1.0) # 5 minutes ago entry.last_ok = now - 300 assert ProxyPool._weight(entry, now) == pytest.approx(0.5) # never tested entry.last_ok = 0 assert ProxyPool._weight(entry, now) == pytest.approx(0.01) def test_weight_failure_penalty(self): hop = ChainHop(proto="socks5", host="1.2.3.4", port=1080) now = 1000.0 # healthy proxy: weight ~1.0 entry = ProxyEntry(hop=hop, last_ok=now) assert ProxyPool._weight(entry, now) == pytest.approx(1.0) # just failed: weight drops to floor (fail_age=0 -> multiplier=0) entry.last_fail = now assert ProxyPool._weight(entry, now) == pytest.approx(0.01) # 30s after failure: base=1/(1+30/300)=0.909, penalty=30/60=0.5 -> ~0.45 assert ProxyPool._weight(entry, now + 30) == pytest.approx(0.4545, abs=0.05) # 60s after failure: penalty=1.0, only base decay -> 1/(1+60/300)=0.833 assert ProxyPool._weight(entry, now + 60) == pytest.approx(0.833, abs=0.05) def test_report_failure(self): cfg = ProxyPoolConfig(sources=[]) pool = ProxyPool(cfg, [], timeout=10.0) hop = ChainHop(proto="socks5", host="1.2.3.4", port=1080) pool._proxies["socks5://1.2.3.4:1080"] = ProxyEntry( hop=hop, alive=True, last_ok=time.time(), ) assert pool._proxies["socks5://1.2.3.4:1080"].last_fail == 0.0 pool.report_failure(hop) assert pool._proxies["socks5://1.2.3.4:1080"].last_fail > 0.0 def test_report_failure_unknown_proxy(self): cfg = ProxyPoolConfig(sources=[]) pool = ProxyPool(cfg, [], timeout=10.0) hop = ChainHop(proto="socks5", host="9.9.9.9", port=1080) pool.report_failure(hop) # should not raise class TestProxyPoolHealthTests: """Test selective health testing.""" def test_selective_keys(self): import asyncio from unittest.mock import AsyncMock, patch cfg = ProxyPoolConfig(sources=[]) pool = ProxyPool(cfg, [], timeout=10.0) now = time.time() hop_a = ChainHop(proto="socks5", host="10.0.0.1", port=1080) hop_b = ChainHop(proto="socks5", host="10.0.0.2", port=1080) pool._proxies["socks5://10.0.0.1:1080"] = ProxyEntry( hop=hop_a, alive=False, last_seen=now, ) pool._proxies["socks5://10.0.0.2:1080"] = ProxyEntry( hop=hop_b, alive=False, last_seen=now, ) # only test proxy A with patch.object(pool, "_test_proxy", new_callable=AsyncMock, return_value=True) as mock: asyncio.run(pool._run_health_tests(keys=["socks5://10.0.0.1:1080"])) # should only have been called for proxy A assert mock.call_count == 1 assert mock.call_args[0][0] == "socks5://10.0.0.1:1080" assert pool._proxies["socks5://10.0.0.1:1080"].alive is True # proxy B untouched assert pool._proxies["socks5://10.0.0.2:1080"].alive is False def test_chain_check_skips_on_failure(self): import asyncio from unittest.mock import AsyncMock, patch chain_hop = ChainHop(proto="socks5", host="127.0.0.1", port=9050) cfg = ProxyPoolConfig(sources=[]) pool = ProxyPool(cfg, [chain_hop], timeout=10.0) now = time.time() hop = ChainHop(proto="socks5", host="10.0.0.1", port=1080) pool._proxies["socks5://10.0.0.1:1080"] = ProxyEntry( hop=hop, alive=True, last_seen=now, last_ok=now, ) pool._rebuild_alive() # chain test fails -> proxy tests should be skipped with ( patch.object(pool, "_test_chain", new_callable=AsyncMock, return_value=False), patch.object(pool, "_test_proxy", new_callable=AsyncMock) as mock_proxy, ): asyncio.run(pool._run_health_tests()) mock_proxy.assert_not_called() # proxy should remain in its previous state (untouched) assert pool._proxies["socks5://10.0.0.1:1080"].alive is True def test_chain_check_passes(self): import asyncio from unittest.mock import AsyncMock, patch chain_hop = ChainHop(proto="socks5", host="127.0.0.1", port=9050) cfg = ProxyPoolConfig(sources=[]) pool = ProxyPool(cfg, [chain_hop], timeout=10.0) now = time.time() hop = ChainHop(proto="socks5", host="10.0.0.1", port=1080) pool._proxies["socks5://10.0.0.1:1080"] = ProxyEntry( hop=hop, alive=False, last_seen=now, ) # chain test passes -> proxy tests should run with ( patch.object(pool, "_test_chain", new_callable=AsyncMock, return_value=True), patch.object(pool, "_test_proxy", new_callable=AsyncMock, return_value=True), ): asyncio.run(pool._run_health_tests()) assert pool._proxies["socks5://10.0.0.1:1080"].alive is True def test_no_chain_skips_check(self): import asyncio from unittest.mock import AsyncMock, patch cfg = ProxyPoolConfig(sources=[]) pool = ProxyPool(cfg, [], timeout=10.0) # no static chain now = time.time() hop = ChainHop(proto="socks5", host="10.0.0.1", port=1080) pool._proxies["socks5://10.0.0.1:1080"] = ProxyEntry( hop=hop, alive=False, last_seen=now, ) # no chain -> _test_chain should not be called, proxy tests run with ( patch.object(pool, "_test_chain", new_callable=AsyncMock) as mock_chain, patch.object(pool, "_test_proxy", new_callable=AsyncMock, return_value=True), ): asyncio.run(pool._run_health_tests()) mock_chain.assert_not_called() assert pool._proxies["socks5://10.0.0.1:1080"].alive is True class TestProxyPoolStaleExpiry: """Test stale proxy eviction.""" def test_stale_dead_proxy_evicted(self): import asyncio from unittest.mock import AsyncMock, patch cfg = ProxyPoolConfig(sources=[], refresh=300) pool = ProxyPool(cfg, [], timeout=10.0) now = time.time() # stale + dead: should be evicted stale = ChainHop(proto="socks5", host="10.0.0.1", port=1080) pool._proxies["socks5://10.0.0.1:1080"] = ProxyEntry( hop=stale, alive=False, last_seen=now - 1200, ) # fresh + dead: should survive (recently seen, might recover) fresh_dead = ChainHop(proto="socks5", host="10.0.0.2", port=1080) pool._proxies["socks5://10.0.0.2:1080"] = ProxyEntry( hop=fresh_dead, alive=False, last_seen=now, ) with patch.object(pool, "_test_proxy", new_callable=AsyncMock, return_value=False): asyncio.run(pool._run_health_tests()) assert "socks5://10.0.0.1:1080" not in pool._proxies assert "socks5://10.0.0.2:1080" in pool._proxies def test_stale_alive_proxy_kept(self): import asyncio from unittest.mock import AsyncMock, patch cfg = ProxyPoolConfig(sources=[], refresh=300) pool = ProxyPool(cfg, [], timeout=10.0) now = time.time() # stale but alive: should survive (still passing health tests) hop = ChainHop(proto="socks5", host="10.0.0.1", port=1080) pool._proxies["socks5://10.0.0.1:1080"] = ProxyEntry( hop=hop, alive=True, last_seen=now - 1200, last_ok=now, ) with patch.object(pool, "_test_proxy", new_callable=AsyncMock, return_value=True): asyncio.run(pool._run_health_tests()) assert "socks5://10.0.0.1:1080" in pool._proxies class TestProxyPoolFetchFile: """Test file source parsing.""" def test_parse_file(self, tmp_path): proxy_file = tmp_path / "proxies.txt" proxy_file.write_text( "# comment\n" "socks5://1.2.3.4:1080\n" "socks5://user:pass@5.6.7.8:1080\n" "http://proxy.example.com:8080\n" "\n" " # another comment\n" ) cfg = ProxyPoolConfig(sources=[]) pool = ProxyPool(cfg, [], timeout=10.0) src = PoolSourceConfig(file=str(proxy_file)) result = pool._fetch_file_sync(src) assert len(result) == 3 assert result[0].proto == "socks5" assert result[0].host == "1.2.3.4" assert result[1].username == "user" assert result[1].password == "pass" assert result[2].proto == "http" def test_parse_file_with_proto_filter(self, tmp_path): proxy_file = tmp_path / "proxies.txt" proxy_file.write_text( "socks5://1.2.3.4:1080\n" "http://proxy.example.com:8080\n" ) cfg = ProxyPoolConfig(sources=[]) pool = ProxyPool(cfg, [], timeout=10.0) src = PoolSourceConfig(file=str(proxy_file), proto="socks5") result = pool._fetch_file_sync(src) assert len(result) == 1 assert result[0].proto == "socks5" def test_missing_file(self, tmp_path): cfg = ProxyPoolConfig(sources=[]) pool = ProxyPool(cfg, [], timeout=10.0) src = PoolSourceConfig(file=str(tmp_path / "nonexistent.txt")) result = pool._fetch_file_sync(src) assert result == [] class TestProxyPoolPersistence: """Test state save/load.""" def test_save_and_load(self, tmp_path): state_file = str(tmp_path / "pool.json") cfg = ProxyPoolConfig(sources=[], state_file=state_file) pool = ProxyPool(cfg, [], timeout=10.0) hop = ChainHop(proto="socks5", host="1.2.3.4", port=1080) pool._proxies["socks5://1.2.3.4:1080"] = ProxyEntry( hop=hop, alive=True, fails=0, tests=5, last_ok=1000.0, ) pool._rebuild_alive() pool._save_state() # load into a fresh pool pool2 = ProxyPool(cfg, [], timeout=10.0) pool2._load_state() assert pool2.count == 1 assert pool2.alive_count == 1 entry = pool2._proxies["socks5://1.2.3.4:1080"] assert entry.hop.host == "1.2.3.4" assert entry.tests == 5 assert entry.alive is True def test_corrupt_state(self, tmp_path): state_file = tmp_path / "pool.json" state_file.write_text("{invalid json") cfg = ProxyPoolConfig(sources=[], state_file=str(state_file)) pool = ProxyPool(cfg, [], timeout=10.0) pool._load_state() assert pool.count == 0 def test_missing_state(self, tmp_path): cfg = ProxyPoolConfig(sources=[], state_file=str(tmp_path / "missing.json")) pool = ProxyPool(cfg, [], timeout=10.0) pool._load_state() # should not raise assert pool.count == 0 def test_state_with_auth(self, tmp_path): state_file = str(tmp_path / "pool.json") cfg = ProxyPoolConfig(sources=[], state_file=state_file) pool = ProxyPool(cfg, [], timeout=10.0) hop = ChainHop( proto="socks5", host="1.2.3.4", port=1080, username="user", password="pass", ) pool._proxies["socks5://1.2.3.4:1080"] = ProxyEntry(hop=hop, alive=True) pool._save_state() pool2 = ProxyPool(cfg, [], timeout=10.0) pool2._load_state() entry = pool2._proxies["socks5://1.2.3.4:1080"] assert entry.hop.username == "user" assert entry.hop.password == "pass"