Files
s5p/tests/test_pool.py
user e1403a67fc feat: add stale proxy expiry based on last_seen TTL
Evict proxies not returned by sources for >3 refresh cycles and not
currently alive. Cleans up proxies removed upstream faster than waiting
for max_fails consecutive health test failures.
2026-02-15 15:54:17 +01:00

305 lines
10 KiB
Python

"""Tests for the managed proxy pool."""
import json
import time
import pytest
from s5p.config import ChainHop, PoolSourceConfig, ProxyPoolConfig
from s5p.pool import ProxyEntry, ProxyPool
class TestProxyEntry:
    """Check the field values of a freshly constructed ProxyEntry."""

    def test_defaults(self):
        """An entry built from only a hop is not alive and has zeroed counters."""
        fresh_entry = ProxyEntry(
            hop=ChainHop(proto="socks5", host="1.2.3.4", port=1080),
        )
        # Identity check: alive must be the bool False, not merely falsy.
        assert fresh_entry.alive is False
        assert fresh_entry.tests == 0
        assert fresh_entry.fails == 0
class TestProxyPoolMerge:
    """Test proxy deduplication and merge."""

    def test_merge_dedup(self):
        """Merging a list that repeats a hop stores each distinct proxy once."""
        cfg = ProxyPoolConfig(sources=[])
        pool = ProxyPool(cfg, [], timeout=10.0)
        proxies = [
            ChainHop(proto="socks5", host="1.2.3.4", port=1080),
            ChainHop(proto="socks5", host="1.2.3.4", port=1080),
            ChainHop(proto="socks5", host="5.6.7.8", port=1080),
        ]
        pool._merge(proxies)
        assert pool.count == 2

    def test_merge_updates_existing(self):
        """Re-merging a known hop refreshes last_seen without adding an entry."""
        cfg = ProxyPoolConfig(sources=[])
        pool = ProxyPool(cfg, [], timeout=10.0)
        hop = ChainHop(proto="socks5", host="1.2.3.4", port=1080)
        key = "socks5://1.2.3.4:1080"
        pool._merge([hop])
        first_seen = pool._proxies[key].last_seen
        # Backdate the stored timestamp rather than sleeping. If the second
        # merge leaves last_seen untouched it stays below first_seen and the
        # assertion fails; comparing against an unmodified value with ">="
        # would pass even when nothing was updated.
        pool._proxies[key].last_seen = first_seen - 60.0
        # merge again -- last_seen should update
        pool._merge([hop])
        assert pool._proxies[key].last_seen >= first_seen
        assert pool.count == 1
class TestProxyPoolGet:
    """Exercise ProxyPool.get() on an empty pool and on one with a live proxy."""

    def test_get_empty(self):
        """get() yields None when the pool holds no proxies."""
        import asyncio

        empty_pool = ProxyPool(ProxyPoolConfig(sources=[]), [], timeout=10.0)
        picked = asyncio.run(empty_pool.get())
        assert picked is None

    def test_get_returns_alive(self):
        """get() hands back the hop of the only alive entry."""
        import asyncio

        pool = ProxyPool(ProxyPoolConfig(sources=[]), [], timeout=10.0)
        live_hop = ChainHop(proto="socks5", host="1.2.3.4", port=1080)
        # Insert the entry directly, then rebuild the alive view so get() sees it.
        pool._proxies["socks5://1.2.3.4:1080"] = ProxyEntry(
            hop=live_hop,
            alive=True,
        )
        pool._rebuild_alive()
        picked = asyncio.run(pool.get())
        assert picked is not None
        assert picked.host == "1.2.3.4"
class TestProxyPoolWeight:
    """Test weighted proxy selection."""

    def test_recent_proxy_preferred(self):
        """A proxy verified just now is drawn far more often than an hour-old one."""
        import asyncio
        from collections import Counter

        cfg = ProxyPoolConfig(sources=[])
        pool = ProxyPool(cfg, [], timeout=10.0)
        now = time.time()
        fresh = ChainHop(proto="socks5", host="10.0.0.1", port=1080)
        stale = ChainHop(proto="socks5", host="10.0.0.2", port=1080)
        pool._proxies["socks5://10.0.0.1:1080"] = ProxyEntry(
            hop=fresh, alive=True, last_ok=now,
        )
        pool._proxies["socks5://10.0.0.2:1080"] = ProxyEntry(
            hop=stale, alive=True, last_ok=now - 3600,
        )
        pool._rebuild_alive()

        async def draw(samples):
            """Tally the host returned by each of *samples* pool.get() calls."""
            tally = Counter()
            for _ in range(samples):
                hop = await pool.get()
                tally[hop.host] += 1
            return tally

        # Take every sample inside a single event loop; calling asyncio.run()
        # once per draw would create and tear down 1000 loops.
        counts = asyncio.run(draw(1000))
        assert counts["10.0.0.1"] > counts["10.0.0.2"] * 3

    def test_weight_values(self):
        """_weight() gives 1.0 at last_ok == now, 0.5 at 300s, 0.01 at last_ok == 0."""
        hop = ChainHop(proto="socks5", host="1.2.3.4", port=1080)
        now = 1000.0
        # just tested
        entry = ProxyEntry(hop=hop, last_ok=now)
        assert ProxyPool._weight(entry, now) == pytest.approx(1.0)
        # 5 minutes ago
        entry.last_ok = now - 300
        assert ProxyPool._weight(entry, now) == pytest.approx(0.5)
        # never tested
        entry.last_ok = 0
        assert ProxyPool._weight(entry, now) == pytest.approx(0.01)

    def test_weight_failure_penalty(self):
        """A recorded failure drops the weight to 0.01, recovering by 60s later."""
        hop = ChainHop(proto="socks5", host="1.2.3.4", port=1080)
        now = 1000.0
        # healthy proxy: weight ~1.0
        entry = ProxyEntry(hop=hop, last_ok=now)
        assert ProxyPool._weight(entry, now) == pytest.approx(1.0)
        # just failed: weight drops to floor (fail_age=0 -> multiplier=0)
        entry.last_fail = now
        assert ProxyPool._weight(entry, now) == pytest.approx(0.01)
        # 30s after failure: base=1/(1+30/300)=0.909, penalty=30/60=0.5 -> ~0.45
        assert ProxyPool._weight(entry, now + 30) == pytest.approx(0.4545, abs=0.05)
        # 60s after failure: penalty=1.0, only base decay -> 1/(1+60/300)=0.833
        assert ProxyPool._weight(entry, now + 60) == pytest.approx(0.833, abs=0.05)

    def test_report_failure(self):
        """report_failure() moves last_fail off 0.0 for a proxy in the pool."""
        cfg = ProxyPoolConfig(sources=[])
        pool = ProxyPool(cfg, [], timeout=10.0)
        hop = ChainHop(proto="socks5", host="1.2.3.4", port=1080)
        pool._proxies["socks5://1.2.3.4:1080"] = ProxyEntry(
            hop=hop, alive=True, last_ok=time.time(),
        )
        assert pool._proxies["socks5://1.2.3.4:1080"].last_fail == 0.0
        pool.report_failure(hop)
        assert pool._proxies["socks5://1.2.3.4:1080"].last_fail > 0.0

    def test_report_failure_unknown_proxy(self):
        """report_failure() on a hop the pool does not hold must not raise."""
        cfg = ProxyPoolConfig(sources=[])
        pool = ProxyPool(cfg, [], timeout=10.0)
        hop = ChainHop(proto="socks5", host="9.9.9.9", port=1080)
        pool.report_failure(hop)  # should not raise
class TestProxyPoolStaleExpiry:
    """Check which entries remain in the pool after a health-test pass."""

    def test_stale_dead_proxy_evicted(self):
        """A dead entry last seen 1200s ago is removed; a dead one seen now stays."""
        import asyncio
        from unittest.mock import AsyncMock, patch

        pool = ProxyPool(
            ProxyPoolConfig(sources=[], refresh=300),
            [],
            timeout=10.0,
        )
        current = time.time()
        old_key = "socks5://10.0.0.1:1080"
        recent_key = "socks5://10.0.0.2:1080"

        # Not alive and last seen long ago: expected to be dropped.
        pool._proxies[old_key] = ProxyEntry(
            hop=ChainHop(proto="socks5", host="10.0.0.1", port=1080),
            alive=False,
            last_seen=current - 1200,
        )
        # Not alive but seen just now: expected to be retained.
        pool._proxies[recent_key] = ProxyEntry(
            hop=ChainHop(proto="socks5", host="10.0.0.2", port=1080),
            alive=False,
            last_seen=current,
        )

        # Every probe reports failure, so neither entry can turn alive.
        with patch.object(pool, "_test_proxy", new_callable=AsyncMock, return_value=False):
            asyncio.run(pool._run_health_tests())

        assert old_key not in pool._proxies
        assert recent_key in pool._proxies

    def test_stale_alive_proxy_kept(self):
        """An entry last seen 1200s ago is retained while its probe succeeds."""
        import asyncio
        from unittest.mock import AsyncMock, patch

        pool = ProxyPool(
            ProxyPoolConfig(sources=[], refresh=300),
            [],
            timeout=10.0,
        )
        current = time.time()
        key = "socks5://10.0.0.1:1080"

        # Old last_seen, but alive with a current last_ok.
        pool._proxies[key] = ProxyEntry(
            hop=ChainHop(proto="socks5", host="10.0.0.1", port=1080),
            alive=True,
            last_seen=current - 1200,
            last_ok=current,
        )

        # Every probe reports success.
        with patch.object(pool, "_test_proxy", new_callable=AsyncMock, return_value=True):
            asyncio.run(pool._run_health_tests())

        assert key in pool._proxies
class TestProxyPoolFetchFile:
    """Check how _fetch_file_sync() turns a proxy list file into hops."""

    def test_parse_file(self, tmp_path):
        """Comment and blank lines are skipped; three proxy URLs become three hops."""
        listing = tmp_path / "proxies.txt"
        listing.write_text(
            "# comment\n"
            "socks5://1.2.3.4:1080\n"
            "socks5://user:pass@5.6.7.8:1080\n"
            "http://proxy.example.com:8080\n"
            "\n"
            " # another comment\n"
        )
        pool = ProxyPool(ProxyPoolConfig(sources=[]), [], timeout=10.0)
        hops = pool._fetch_file_sync(PoolSourceConfig(file=str(listing)))

        assert len(hops) == 3
        plain, with_auth, http_hop = hops[0], hops[1], hops[2]
        assert plain.proto == "socks5"
        assert plain.host == "1.2.3.4"
        assert with_auth.username == "user"
        assert with_auth.password == "pass"
        assert http_hop.proto == "http"

    def test_parse_file_with_proto_filter(self, tmp_path):
        """With proto="socks5" on the source, only the socks5 line is returned."""
        listing = tmp_path / "proxies.txt"
        listing.write_text(
            "socks5://1.2.3.4:1080\n"
            "http://proxy.example.com:8080\n"
        )
        pool = ProxyPool(ProxyPoolConfig(sources=[]), [], timeout=10.0)
        source = PoolSourceConfig(file=str(listing), proto="socks5")
        hops = pool._fetch_file_sync(source)

        assert len(hops) == 1
        assert hops[0].proto == "socks5"

    def test_missing_file(self, tmp_path):
        """A source pointing at a nonexistent path produces an empty list."""
        absent = tmp_path / "nonexistent.txt"
        pool = ProxyPool(ProxyPoolConfig(sources=[]), [], timeout=10.0)
        hops = pool._fetch_file_sync(PoolSourceConfig(file=str(absent)))
        assert hops == []
class TestProxyPoolPersistence:
    """Round-trip pool state through the configured state file."""

    def test_save_and_load(self, tmp_path):
        """State saved by one pool is restored by a second pool on the same file."""
        config = ProxyPoolConfig(
            sources=[],
            state_file=str(tmp_path / "pool.json"),
        )
        key = "socks5://1.2.3.4:1080"

        writer = ProxyPool(config, [], timeout=10.0)
        writer._proxies[key] = ProxyEntry(
            hop=ChainHop(proto="socks5", host="1.2.3.4", port=1080),
            alive=True,
            fails=0,
            tests=5,
            last_ok=1000.0,
        )
        writer._rebuild_alive()
        writer._save_state()

        # A second pool sharing the config reads the file back.
        reader = ProxyPool(config, [], timeout=10.0)
        reader._load_state()
        assert reader.count == 1
        assert reader.alive_count == 1
        restored = reader._proxies[key]
        assert restored.hop.host == "1.2.3.4"
        assert restored.tests == 5
        assert restored.alive is True

    def test_corrupt_state(self, tmp_path):
        """Loading a file that is not valid JSON leaves the pool empty."""
        broken = tmp_path / "pool.json"
        broken.write_text("{invalid json")
        config = ProxyPoolConfig(sources=[], state_file=str(broken))
        pool = ProxyPool(config, [], timeout=10.0)
        pool._load_state()
        assert pool.count == 0

    def test_missing_state(self, tmp_path):
        """Loading when the state file does not exist leaves the pool empty."""
        config = ProxyPoolConfig(
            sources=[],
            state_file=str(tmp_path / "missing.json"),
        )
        pool = ProxyPool(config, [], timeout=10.0)
        pool._load_state()  # should not raise
        assert pool.count == 0

    def test_state_with_auth(self, tmp_path):
        """Username and password on a hop survive a save/load cycle."""
        config = ProxyPoolConfig(
            sources=[],
            state_file=str(tmp_path / "pool.json"),
        )
        key = "socks5://1.2.3.4:1080"
        authed_hop = ChainHop(
            proto="socks5",
            host="1.2.3.4",
            port=1080,
            username="user",
            password="pass",
        )

        writer = ProxyPool(config, [], timeout=10.0)
        writer._proxies[key] = ProxyEntry(hop=authed_hop, alive=True)
        writer._save_state()

        reader = ProxyPool(config, [], timeout=10.0)
        reader._load_state()
        restored = reader._proxies[key]
        assert restored.hop.username == "user"
        assert restored.hop.password == "pass"