diff --git a/src/s5p/pool.py b/src/s5p/pool.py index 47b5002..0fb44ce 100644 --- a/src/s5p/pool.py +++ b/src/s5p/pool.py @@ -267,6 +267,40 @@ class ProxyPool: except OSError: pass + async def _test_chain(self) -> bool: + """Test the static chain without any pool proxy.""" + if not self._chain: + return True + + parsed = urlparse(self._cfg.test_url) + host = parsed.hostname or "httpbin.org" + port = parsed.port or 80 + path = parsed.path or "/" + + try: + reader, writer = await build_chain( + self._chain, host, port, timeout=self._cfg.test_timeout, + ) + except (ProtoError, TimeoutError, ConnectionError, OSError, EOFError): + return False + + try: + request = f"GET {path} HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\n\r\n" + writer.write(request.encode()) + await writer.drain() + + line = await asyncio.wait_for(reader.readline(), timeout=self._cfg.test_timeout) + parts = line.decode("utf-8", errors="replace").split(None, 2) + return len(parts) >= 2 and parts[1].startswith("2") + except (TimeoutError, ConnectionError, OSError, EOFError): + return False + finally: + try: + writer.close() + await writer.wait_closed() + except OSError: + pass + async def _run_health_tests(self, keys: list[str] | None = None) -> None: """Test proxies with bounded concurrency. @@ -276,6 +310,13 @@ class ProxyPool: if not self._proxies: return + # pre-flight: verify the static chain is reachable + if self._chain: + chain_ok = await self._test_chain() + if not chain_ok: + logger.warning("pool: static chain unreachable, skipping proxy tests") + return + target = ( [(k, self._proxies[k]) for k in keys if k in self._proxies] if keys is not None diff --git a/tests/test_pool.py b/tests/test_pool.py index 3451090..47d2fca 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -184,6 +184,78 @@ class TestProxyPoolHealthTests: # proxy B untouched assert pool._proxies["socks5://10.0.0.2:1080"].alive is False + def test_chain_check_skips_on_failure(self): + import asyncio + from unittest.mock import AsyncMock, patch + + chain_hop = ChainHop(proto="socks5", host="127.0.0.1", port=9050) + cfg = ProxyPoolConfig(sources=[]) + pool = ProxyPool(cfg, [chain_hop], timeout=10.0) + + now = time.time() + hop = ChainHop(proto="socks5", host="10.0.0.1", port=1080) + pool._proxies["socks5://10.0.0.1:1080"] = ProxyEntry( + hop=hop, alive=True, last_seen=now, last_ok=now, + ) + pool._rebuild_alive() + + # chain test fails -> proxy tests should be skipped + with ( + patch.object(pool, "_test_chain", new_callable=AsyncMock, return_value=False), + patch.object(pool, "_test_proxy", new_callable=AsyncMock) as mock_proxy, + ): + asyncio.run(pool._run_health_tests()) + mock_proxy.assert_not_called() + + # proxy should remain in its previous state (untouched) + assert pool._proxies["socks5://10.0.0.1:1080"].alive is True + + def test_chain_check_passes(self): + import asyncio + from unittest.mock import AsyncMock, patch + + chain_hop = ChainHop(proto="socks5", host="127.0.0.1", port=9050) + cfg = ProxyPoolConfig(sources=[]) + pool = ProxyPool(cfg, [chain_hop], timeout=10.0) + + now = time.time() + hop = ChainHop(proto="socks5", host="10.0.0.1", port=1080) + pool._proxies["socks5://10.0.0.1:1080"] = ProxyEntry( + hop=hop, alive=False, last_seen=now, + ) + + # chain test passes -> proxy tests should run + with ( + patch.object(pool, "_test_chain", new_callable=AsyncMock, return_value=True), + patch.object(pool, "_test_proxy", new_callable=AsyncMock, return_value=True), + ): + asyncio.run(pool._run_health_tests()) + + assert pool._proxies["socks5://10.0.0.1:1080"].alive is True + + def test_no_chain_skips_check(self): + import asyncio + from unittest.mock import AsyncMock, patch + + cfg = ProxyPoolConfig(sources=[]) + pool = ProxyPool(cfg, [], timeout=10.0) # no static chain + + now = time.time() + hop = ChainHop(proto="socks5", host="10.0.0.1", port=1080) + pool._proxies["socks5://10.0.0.1:1080"] = ProxyEntry( + hop=hop, alive=False, last_seen=now, + ) + + # no chain -> _test_chain should not be called, proxy tests run + with ( + patch.object(pool, "_test_chain", new_callable=AsyncMock) as mock_chain, + patch.object(pool, "_test_proxy", new_callable=AsyncMock, return_value=True), + ): + asyncio.run(pool._run_health_tests()) + mock_chain.assert_not_called() + + assert pool._proxies["socks5://10.0.0.1:1080"].alive is True + class TestProxyPoolStaleExpiry: """Test stale proxy eviction."""