From 0816a7f0cbca41bd0cee61bd22aaa5dbc4bcb22b Mon Sep 17 00:00:00 2001 From: user Date: Sun, 15 Feb 2026 15:59:26 +0100 Subject: [PATCH] feat: add static chain health check before pool tests Test the static chain (without pool proxy) before running pool health tests. If the chain itself is unreachable, skip proxy testing and log a clear warning. Prevents false mass-failure when the issue is upstream (e.g., Tor is down), not the exit proxies. --- src/s5p/pool.py | 41 ++++++++++++++++++++++++++ tests/test_pool.py | 72 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 113 insertions(+) diff --git a/src/s5p/pool.py b/src/s5p/pool.py index 47b5002..0fb44ce 100644 --- a/src/s5p/pool.py +++ b/src/s5p/pool.py @@ -267,6 +267,40 @@ class ProxyPool: except OSError: pass + async def _test_chain(self) -> bool: + """Test the static chain without any pool proxy.""" + if not self._chain: + return True + + parsed = urlparse(self._cfg.test_url) + host = parsed.hostname or "httpbin.org" + port = parsed.port or 80 + path = parsed.path or "/" + + try: + reader, writer = await build_chain( + self._chain, host, port, timeout=self._cfg.test_timeout, + ) + except (ProtoError, TimeoutError, ConnectionError, OSError, EOFError): + return False + + try: + request = f"GET {path} HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\n\r\n" + writer.write(request.encode()) + await writer.drain() + + line = await asyncio.wait_for(reader.readline(), timeout=self._cfg.test_timeout) + parts = line.decode("utf-8", errors="replace").split(None, 2) + return len(parts) >= 2 and parts[1].startswith("2") + except (TimeoutError, ConnectionError, OSError, EOFError): + return False + finally: + try: + writer.close() + await writer.wait_closed() + except OSError: + pass + async def _run_health_tests(self, keys: list[str] | None = None) -> None: """Test proxies with bounded concurrency. @@ -276,6 +310,13 @@ class ProxyPool: if not self._proxies: return + # pre-flight: verify the static chain is reachable + if self._chain: + chain_ok = await self._test_chain() + if not chain_ok: + logger.warning("pool: static chain unreachable, skipping proxy tests") + return + target = ( [(k, self._proxies[k]) for k in keys if k in self._proxies] if keys is not None diff --git a/tests/test_pool.py b/tests/test_pool.py index 3451090..47d2fca 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -184,6 +184,78 @@ class TestProxyPoolHealthTests: # proxy B untouched assert pool._proxies["socks5://10.0.0.2:1080"].alive is False + def test_chain_check_skips_on_failure(self): + import asyncio + from unittest.mock import AsyncMock, patch + + chain_hop = ChainHop(proto="socks5", host="127.0.0.1", port=9050) + cfg = ProxyPoolConfig(sources=[]) + pool = ProxyPool(cfg, [chain_hop], timeout=10.0) + + now = time.time() + hop = ChainHop(proto="socks5", host="10.0.0.1", port=1080) + pool._proxies["socks5://10.0.0.1:1080"] = ProxyEntry( + hop=hop, alive=True, last_seen=now, last_ok=now, + ) + pool._rebuild_alive() + + # chain test fails -> proxy tests should be skipped + with ( + patch.object(pool, "_test_chain", new_callable=AsyncMock, return_value=False), + patch.object(pool, "_test_proxy", new_callable=AsyncMock) as mock_proxy, + ): + asyncio.run(pool._run_health_tests()) + mock_proxy.assert_not_called() + + # proxy should remain in its previous state (untouched) + assert pool._proxies["socks5://10.0.0.1:1080"].alive is True + + def test_chain_check_passes(self): + import asyncio + from unittest.mock import AsyncMock, patch + + chain_hop = ChainHop(proto="socks5", host="127.0.0.1", port=9050) + cfg = ProxyPoolConfig(sources=[]) + pool = ProxyPool(cfg, [chain_hop], timeout=10.0) + + now = time.time() + hop = ChainHop(proto="socks5", host="10.0.0.1", port=1080) + pool._proxies["socks5://10.0.0.1:1080"] = ProxyEntry( + hop=hop, alive=False, last_seen=now, + ) + + # chain test passes -> proxy tests should run + with ( + patch.object(pool, "_test_chain", new_callable=AsyncMock, return_value=True), + patch.object(pool, "_test_proxy", new_callable=AsyncMock, return_value=True), + ): + asyncio.run(pool._run_health_tests()) + + assert pool._proxies["socks5://10.0.0.1:1080"].alive is True + + def test_no_chain_skips_check(self): + import asyncio + from unittest.mock import AsyncMock, patch + + cfg = ProxyPoolConfig(sources=[]) + pool = ProxyPool(cfg, [], timeout=10.0) # no static chain + + now = time.time() + hop = ChainHop(proto="socks5", host="10.0.0.1", port=1080) + pool._proxies["socks5://10.0.0.1:1080"] = ProxyEntry( + hop=hop, alive=False, last_seen=now, + ) + + # no chain -> _test_chain should not be called, proxy tests run + with ( + patch.object(pool, "_test_chain", new_callable=AsyncMock) as mock_chain, + patch.object(pool, "_test_proxy", new_callable=AsyncMock, return_value=True), + ): + asyncio.run(pool._run_health_tests()) + mock_chain.assert_not_called() + + assert pool._proxies["socks5://10.0.0.1:1080"].alive is True + class TestProxyPoolStaleExpiry: """Test stale proxy eviction."""