feat: add static chain health check before pool tests
Test the static chain (without pool proxy) before running pool health tests. If the chain itself is unreachable, skip proxy testing and log a clear warning. Prevents false mass-failure when the issue is upstream (e.g., Tor is down), not the exit proxies.
This commit is contained in:
@@ -267,6 +267,40 @@ class ProxyPool:
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
async def _test_chain(self) -> bool:
|
||||
"""Test the static chain without any pool proxy."""
|
||||
if not self._chain:
|
||||
return True
|
||||
|
||||
parsed = urlparse(self._cfg.test_url)
|
||||
host = parsed.hostname or "httpbin.org"
|
||||
port = parsed.port or 80
|
||||
path = parsed.path or "/"
|
||||
|
||||
try:
|
||||
reader, writer = await build_chain(
|
||||
self._chain, host, port, timeout=self._cfg.test_timeout,
|
||||
)
|
||||
except (ProtoError, TimeoutError, ConnectionError, OSError, EOFError):
|
||||
return False
|
||||
|
||||
try:
|
||||
request = f"GET {path} HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\n\r\n"
|
||||
writer.write(request.encode())
|
||||
await writer.drain()
|
||||
|
||||
line = await asyncio.wait_for(reader.readline(), timeout=self._cfg.test_timeout)
|
||||
parts = line.decode("utf-8", errors="replace").split(None, 2)
|
||||
return len(parts) >= 2 and parts[1].startswith("2")
|
||||
except (TimeoutError, ConnectionError, OSError, EOFError):
|
||||
return False
|
||||
finally:
|
||||
try:
|
||||
writer.close()
|
||||
await writer.wait_closed()
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
async def _run_health_tests(self, keys: list[str] | None = None) -> None:
|
||||
"""Test proxies with bounded concurrency.
|
||||
|
||||
@@ -276,6 +310,13 @@ class ProxyPool:
|
||||
if not self._proxies:
|
||||
return
|
||||
|
||||
# pre-flight: verify the static chain is reachable
|
||||
if self._chain:
|
||||
chain_ok = await self._test_chain()
|
||||
if not chain_ok:
|
||||
logger.warning("pool: static chain unreachable, skipping proxy tests")
|
||||
return
|
||||
|
||||
target = (
|
||||
[(k, self._proxies[k]) for k in keys if k in self._proxies]
|
||||
if keys is not None
|
||||
|
||||
@@ -184,6 +184,78 @@ class TestProxyPoolHealthTests:
|
||||
# proxy B untouched
|
||||
assert pool._proxies["socks5://10.0.0.2:1080"].alive is False
|
||||
|
||||
def test_chain_check_skips_on_failure(self):
|
||||
import asyncio
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
chain_hop = ChainHop(proto="socks5", host="127.0.0.1", port=9050)
|
||||
cfg = ProxyPoolConfig(sources=[])
|
||||
pool = ProxyPool(cfg, [chain_hop], timeout=10.0)
|
||||
|
||||
now = time.time()
|
||||
hop = ChainHop(proto="socks5", host="10.0.0.1", port=1080)
|
||||
pool._proxies["socks5://10.0.0.1:1080"] = ProxyEntry(
|
||||
hop=hop, alive=True, last_seen=now, last_ok=now,
|
||||
)
|
||||
pool._rebuild_alive()
|
||||
|
||||
# chain test fails -> proxy tests should be skipped
|
||||
with (
|
||||
patch.object(pool, "_test_chain", new_callable=AsyncMock, return_value=False),
|
||||
patch.object(pool, "_test_proxy", new_callable=AsyncMock) as mock_proxy,
|
||||
):
|
||||
asyncio.run(pool._run_health_tests())
|
||||
mock_proxy.assert_not_called()
|
||||
|
||||
# proxy should remain in its previous state (untouched)
|
||||
assert pool._proxies["socks5://10.0.0.1:1080"].alive is True
|
||||
|
||||
def test_chain_check_passes(self):
|
||||
import asyncio
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
chain_hop = ChainHop(proto="socks5", host="127.0.0.1", port=9050)
|
||||
cfg = ProxyPoolConfig(sources=[])
|
||||
pool = ProxyPool(cfg, [chain_hop], timeout=10.0)
|
||||
|
||||
now = time.time()
|
||||
hop = ChainHop(proto="socks5", host="10.0.0.1", port=1080)
|
||||
pool._proxies["socks5://10.0.0.1:1080"] = ProxyEntry(
|
||||
hop=hop, alive=False, last_seen=now,
|
||||
)
|
||||
|
||||
# chain test passes -> proxy tests should run
|
||||
with (
|
||||
patch.object(pool, "_test_chain", new_callable=AsyncMock, return_value=True),
|
||||
patch.object(pool, "_test_proxy", new_callable=AsyncMock, return_value=True),
|
||||
):
|
||||
asyncio.run(pool._run_health_tests())
|
||||
|
||||
assert pool._proxies["socks5://10.0.0.1:1080"].alive is True
|
||||
|
||||
def test_no_chain_skips_check(self):
|
||||
import asyncio
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
cfg = ProxyPoolConfig(sources=[])
|
||||
pool = ProxyPool(cfg, [], timeout=10.0) # no static chain
|
||||
|
||||
now = time.time()
|
||||
hop = ChainHop(proto="socks5", host="10.0.0.1", port=1080)
|
||||
pool._proxies["socks5://10.0.0.1:1080"] = ProxyEntry(
|
||||
hop=hop, alive=False, last_seen=now,
|
||||
)
|
||||
|
||||
# no chain -> _test_chain should not be called, proxy tests run
|
||||
with (
|
||||
patch.object(pool, "_test_chain", new_callable=AsyncMock) as mock_chain,
|
||||
patch.object(pool, "_test_proxy", new_callable=AsyncMock, return_value=True),
|
||||
):
|
||||
asyncio.run(pool._run_health_tests())
|
||||
mock_chain.assert_not_called()
|
||||
|
||||
assert pool._proxies["socks5://10.0.0.1:1080"].alive is True
|
||||
|
||||
|
||||
class TestProxyPoolStaleExpiry:
|
||||
"""Test stale proxy eviction."""
|
||||
|
||||
Reference in New Issue
Block a user