feat: dynamic health test concurrency
Auto-scale test concurrency to ~10% of proxy count, capped by test_concurrency config ceiling (default raised from 5 to 25). Prevents saturating upstream Tor when pool size varies. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -36,7 +36,7 @@ chain:
|
||||
# - www.cloudflare.com
|
||||
# - www.amazon.com
|
||||
# test_timeout: 15 # per-test timeout (seconds)
|
||||
# test_concurrency: 5 # parallel health tests
|
||||
# test_concurrency: 25 # max parallel tests (auto-scales to ~10% of pool)
|
||||
# max_fails: 3 # consecutive fails before eviction
|
||||
# state_file: "" # empty = ~/.cache/s5p/pool.json
|
||||
# report_url: "" # POST dead proxies here (optional)
|
||||
|
||||
@@ -79,6 +79,7 @@ proxy_pool:
|
||||
- www.google.com
|
||||
- www.cloudflare.com
|
||||
- www.amazon.com
|
||||
test_concurrency: 25 # max parallel tests (auto-scales to ~10% of pool)
|
||||
max_fails: 3 # evict after N fails
|
||||
report_url: "" # POST dead proxies (optional)
|
||||
```
|
||||
|
||||
@@ -77,7 +77,7 @@ proxy_pool:
|
||||
- www.cloudflare.com
|
||||
- www.amazon.com
|
||||
test_timeout: 15
|
||||
test_concurrency: 5
|
||||
test_concurrency: 25 # max parallel tests (auto-scales to ~10% of pool)
|
||||
max_fails: 3
|
||||
state_file: "" # empty = ~/.cache/s5p/pool.json
|
||||
```
|
||||
@@ -180,7 +180,7 @@ proxy_pool:
|
||||
- www.cloudflare.com
|
||||
- www.amazon.com
|
||||
test_timeout: 15 # per-test timeout (seconds)
|
||||
test_concurrency: 5 # parallel health tests
|
||||
test_concurrency: 25 # max parallel tests (auto-scales to ~10% of pool)
|
||||
max_fails: 3 # evict after N consecutive failures
|
||||
state_file: "" # empty = ~/.cache/s5p/pool.json
|
||||
report_url: "" # POST dead proxies here (optional)
|
||||
@@ -209,6 +209,10 @@ by performing a TLS handshake against one of the `test_targets` (rotated
|
||||
round-robin). A successful handshake marks the proxy alive. After `max_fails`
|
||||
consecutive failures, a proxy is evicted.
|
||||
|
||||
Concurrency auto-scales to ~10% of the proxy count, capped by
|
||||
`test_concurrency` (default 25, minimum 3). For example, a pool of 73 proxies
|
||||
tests 7 at a time rather than saturating the upstream Tor node.
|
||||
|
||||
Before each health test cycle, the static chain is tested without any pool
|
||||
proxy. If the chain itself is unreachable (e.g., Tor is down), proxy tests
|
||||
are skipped entirely and a warning is logged. This prevents false mass-failure
|
||||
|
||||
@@ -52,7 +52,7 @@ class ProxyPoolConfig:
|
||||
"www.amazon.com",
|
||||
])
|
||||
test_timeout: float = 15.0
|
||||
test_concurrency: int = 5
|
||||
test_concurrency: int = 25
|
||||
max_fails: int = 3
|
||||
state_file: str = ""
|
||||
report_url: str = ""
|
||||
@@ -229,7 +229,7 @@ def load_config(path: str | Path) -> Config:
|
||||
"test_interval": float(pool_raw.get("test_interval", 120)),
|
||||
"test_url": pool_raw.get("test_url", ""),
|
||||
"test_timeout": float(pool_raw.get("test_timeout", 15)),
|
||||
"test_concurrency": int(pool_raw.get("test_concurrency", 5)),
|
||||
"test_concurrency": int(pool_raw.get("test_concurrency", 25)),
|
||||
"max_fails": int(pool_raw.get("max_fails", 3)),
|
||||
"state_file": pool_raw.get("state_file", ""),
|
||||
"report_url": pool_raw.get("report_url", ""),
|
||||
|
||||
@@ -300,7 +300,9 @@ class ProxyPool:
|
||||
if not target:
|
||||
return
|
||||
|
||||
sem = asyncio.Semaphore(self._cfg.test_concurrency)
|
||||
effective = max(3, min(len(target) // 10, self._cfg.test_concurrency))
|
||||
sem = asyncio.Semaphore(effective)
|
||||
logger.debug("pool: testing %d proxies (concurrency=%d)", len(target), effective)
|
||||
results: dict[str, bool] = {}
|
||||
|
||||
async def _test(key: str, entry: ProxyEntry) -> None:
|
||||
|
||||
@@ -155,6 +155,102 @@ class TestProxyPoolWeight:
|
||||
pool.report_failure(hop) # should not raise
|
||||
|
||||
|
||||
class TestDynamicConcurrency:
|
||||
"""Test dynamic health test concurrency scaling."""
|
||||
|
||||
def test_scales_to_ten_percent(self):
|
||||
import asyncio
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
cfg = ProxyPoolConfig(sources=[], test_concurrency=25)
|
||||
pool = ProxyPool(cfg, [], timeout=10.0)
|
||||
|
||||
now = time.time()
|
||||
# add 100 proxies -> effective concurrency = max(3, min(100//10, 25)) = 10
|
||||
for i in range(100):
|
||||
hop = ChainHop(proto="socks5", host=f"10.0.{i // 256}.{i % 256}", port=1080)
|
||||
key = f"socks5://10.0.{i // 256}.{i % 256}:1080"
|
||||
pool._proxies[key] = ProxyEntry(hop=hop, alive=False, last_seen=now)
|
||||
|
||||
captured = {}
|
||||
|
||||
original_semaphore = asyncio.Semaphore
|
||||
|
||||
def capture_semaphore(value):
|
||||
captured["concurrency"] = value
|
||||
return original_semaphore(value)
|
||||
|
||||
with (
|
||||
patch.object(pool, "_test_proxy", new_callable=AsyncMock, return_value=True),
|
||||
patch("s5p.pool.asyncio.Semaphore", side_effect=capture_semaphore),
|
||||
):
|
||||
asyncio.run(pool._run_health_tests())
|
||||
|
||||
assert captured["concurrency"] == 10
|
||||
|
||||
def test_minimum_of_three(self):
|
||||
import asyncio
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
cfg = ProxyPoolConfig(sources=[], test_concurrency=25)
|
||||
pool = ProxyPool(cfg, [], timeout=10.0)
|
||||
|
||||
now = time.time()
|
||||
# 5 proxies -> 5//10=0, but min is 3
|
||||
for i in range(5):
|
||||
hop = ChainHop(proto="socks5", host=f"10.0.0.{i}", port=1080)
|
||||
pool._proxies[f"socks5://10.0.0.{i}:1080"] = ProxyEntry(
|
||||
hop=hop, alive=False, last_seen=now,
|
||||
)
|
||||
|
||||
captured = {}
|
||||
|
||||
original_semaphore = asyncio.Semaphore
|
||||
|
||||
def capture_semaphore(value):
|
||||
captured["concurrency"] = value
|
||||
return original_semaphore(value)
|
||||
|
||||
with (
|
||||
patch.object(pool, "_test_proxy", new_callable=AsyncMock, return_value=True),
|
||||
patch("s5p.pool.asyncio.Semaphore", side_effect=capture_semaphore),
|
||||
):
|
||||
asyncio.run(pool._run_health_tests())
|
||||
|
||||
assert captured["concurrency"] == 3
|
||||
|
||||
def test_capped_by_config(self):
|
||||
import asyncio
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
cfg = ProxyPoolConfig(sources=[], test_concurrency=5)
|
||||
pool = ProxyPool(cfg, [], timeout=10.0)
|
||||
|
||||
now = time.time()
|
||||
# 1000 proxies -> 1000//10=100, capped at 5
|
||||
for i in range(1000):
|
||||
h = f"10.{i // 65536}.{(i // 256) % 256}.{i % 256}"
|
||||
hop = ChainHop(proto="socks5", host=h, port=1080)
|
||||
key = str(hop)
|
||||
pool._proxies[key] = ProxyEntry(hop=hop, alive=False, last_seen=now)
|
||||
|
||||
captured = {}
|
||||
|
||||
original_semaphore = asyncio.Semaphore
|
||||
|
||||
def capture_semaphore(value):
|
||||
captured["concurrency"] = value
|
||||
return original_semaphore(value)
|
||||
|
||||
with (
|
||||
patch.object(pool, "_test_proxy", new_callable=AsyncMock, return_value=True),
|
||||
patch("s5p.pool.asyncio.Semaphore", side_effect=capture_semaphore),
|
||||
):
|
||||
asyncio.run(pool._run_health_tests())
|
||||
|
||||
assert captured["concurrency"] == 5
|
||||
|
||||
|
||||
class TestProxyPoolHealthTests:
|
||||
"""Test selective health testing."""
|
||||
|
||||
|
||||
Reference in New Issue
Block a user