diff --git a/config/example.yaml b/config/example.yaml index 906f547..a73b822 100644 --- a/config/example.yaml +++ b/config/example.yaml @@ -36,7 +36,7 @@ chain: # - www.cloudflare.com # - www.amazon.com # test_timeout: 15 # per-test timeout (seconds) -# test_concurrency: 5 # parallel health tests +# test_concurrency: 25 # max parallel tests (auto-scales to ~10% of pool) # max_fails: 3 # consecutive fails before eviction # state_file: "" # empty = ~/.cache/s5p/pool.json # report_url: "" # POST dead proxies here (optional) diff --git a/docs/CHEATSHEET.md b/docs/CHEATSHEET.md index ef49d6f..bd97f55 100644 --- a/docs/CHEATSHEET.md +++ b/docs/CHEATSHEET.md @@ -79,6 +79,7 @@ proxy_pool: - www.google.com - www.cloudflare.com - www.amazon.com + test_concurrency: 25 # max parallel tests (auto-scales to ~10% of pool) max_fails: 3 # evict after N fails report_url: "" # POST dead proxies (optional) ``` diff --git a/docs/USAGE.md b/docs/USAGE.md index 1ec4040..834ca98 100644 --- a/docs/USAGE.md +++ b/docs/USAGE.md @@ -77,7 +77,7 @@ proxy_pool: - www.cloudflare.com - www.amazon.com test_timeout: 15 - test_concurrency: 5 + test_concurrency: 25 # max parallel tests (auto-scales to ~10% of pool) max_fails: 3 state_file: "" # empty = ~/.cache/s5p/pool.json ``` @@ -180,7 +180,7 @@ proxy_pool: - www.cloudflare.com - www.amazon.com test_timeout: 15 # per-test timeout (seconds) - test_concurrency: 5 # parallel health tests + test_concurrency: 25 # max parallel tests (auto-scales to ~10% of pool) max_fails: 3 # evict after N consecutive failures state_file: "" # empty = ~/.cache/s5p/pool.json report_url: "" # POST dead proxies here (optional) @@ -209,6 +209,10 @@ by performing a TLS handshake against one of the `test_targets` (rotated round-robin). A successful handshake marks the proxy alive. After `max_fails` consecutive failures, a proxy is evicted. +Concurrency auto-scales to ~10% of the proxy count, capped by +`test_concurrency` (default 25, minimum 3). 
For example, a pool of 73 proxies +tests 7 at a time rather than saturating the upstream Tor node. + Before each health test cycle, the static chain is tested without any pool proxy. If the chain itself is unreachable (e.g., Tor is down), proxy tests are skipped entirely and a warning is logged. This prevents false mass-failure diff --git a/src/s5p/config.py b/src/s5p/config.py index c2fd85c..ea5e645 100644 --- a/src/s5p/config.py +++ b/src/s5p/config.py @@ -52,7 +52,7 @@ class ProxyPoolConfig: "www.amazon.com", ]) test_timeout: float = 15.0 - test_concurrency: int = 5 + test_concurrency: int = 25 max_fails: int = 3 state_file: str = "" report_url: str = "" @@ -229,7 +229,7 @@ def load_config(path: str | Path) -> Config: "test_interval": float(pool_raw.get("test_interval", 120)), "test_url": pool_raw.get("test_url", ""), "test_timeout": float(pool_raw.get("test_timeout", 15)), - "test_concurrency": int(pool_raw.get("test_concurrency", 5)), + "test_concurrency": int(pool_raw.get("test_concurrency", 25)), "max_fails": int(pool_raw.get("max_fails", 3)), "state_file": pool_raw.get("state_file", ""), "report_url": pool_raw.get("report_url", ""), diff --git a/src/s5p/pool.py b/src/s5p/pool.py index 24487ae..3ebd1e5 100644 --- a/src/s5p/pool.py +++ b/src/s5p/pool.py @@ -300,7 +300,10 @@ class ProxyPool: if not target: return - sem = asyncio.Semaphore(self._cfg.test_concurrency) + # ~10% of pool (floor 3), capped by config; >= 1 so Semaphore(0) can't stall + effective = max(1, min(self._cfg.test_concurrency, max(3, len(target) // 10))) + sem = asyncio.Semaphore(effective) + logger.debug("pool: testing %d proxies (concurrency=%d)", len(target), effective) results: dict[str, bool] = {} async def _test(key: str, entry: ProxyEntry) -> None: diff --git a/tests/test_pool.py b/tests/test_pool.py index 6ed0bed..66eb4d2 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -155,6 +155,102 @@ class TestProxyPoolWeight: pool.report_failure(hop) # should not raise +class TestDynamicConcurrency: + """Test dynamic health test concurrency scaling.""" + + def 
test_scales_to_ten_percent(self): + import asyncio + from unittest.mock import AsyncMock, patch + + cfg = ProxyPoolConfig(sources=[], test_concurrency=25) + pool = ProxyPool(cfg, [], timeout=10.0) + + now = time.time() + # add 100 proxies -> effective concurrency = min(25, max(3, 100//10)) = 10 + for i in range(100): + hop = ChainHop(proto="socks5", host=f"10.0.{i // 256}.{i % 256}", port=1080) + key = f"socks5://10.0.{i // 256}.{i % 256}:1080" + pool._proxies[key] = ProxyEntry(hop=hop, alive=False, last_seen=now) + + captured = {} + + original_semaphore = asyncio.Semaphore + + def capture_semaphore(value): + captured["concurrency"] = value + return original_semaphore(value) + + with ( + patch.object(pool, "_test_proxy", new_callable=AsyncMock, return_value=True), + patch("s5p.pool.asyncio.Semaphore", side_effect=capture_semaphore), + ): + asyncio.run(pool._run_health_tests()) + + assert captured["concurrency"] == 10 + + def test_minimum_of_three(self): + import asyncio + from unittest.mock import AsyncMock, patch + + cfg = ProxyPoolConfig(sources=[], test_concurrency=25) + pool = ProxyPool(cfg, [], timeout=10.0) + + now = time.time() + # 5 proxies -> 5//10=0, but min is 3 + for i in range(5): + hop = ChainHop(proto="socks5", host=f"10.0.0.{i}", port=1080) + pool._proxies[f"socks5://10.0.0.{i}:1080"] = ProxyEntry( + hop=hop, alive=False, last_seen=now, + ) + + captured = {} + + original_semaphore = asyncio.Semaphore + + def capture_semaphore(value): + captured["concurrency"] = value + return original_semaphore(value) + + with ( + patch.object(pool, "_test_proxy", new_callable=AsyncMock, return_value=True), + patch("s5p.pool.asyncio.Semaphore", side_effect=capture_semaphore), + ): + asyncio.run(pool._run_health_tests()) + + assert captured["concurrency"] == 3 + + def test_capped_by_config(self): + import asyncio + from unittest.mock import AsyncMock, patch + + cfg = ProxyPoolConfig(sources=[], test_concurrency=5) + pool = ProxyPool(cfg, [], timeout=10.0) + + now = 
time.time() + # 1000 proxies -> 1000//10=100, capped at 5 + for i in range(1000): + h = f"10.{i // 65536}.{(i // 256) % 256}.{i % 256}" + hop = ChainHop(proto="socks5", host=h, port=1080) + key = str(hop) + pool._proxies[key] = ProxyEntry(hop=hop, alive=False, last_seen=now) + + captured = {} + + original_semaphore = asyncio.Semaphore + + def capture_semaphore(value): + captured["concurrency"] = value + return original_semaphore(value) + + with ( + patch.object(pool, "_test_proxy", new_callable=AsyncMock, return_value=True), + patch("s5p.pool.asyncio.Semaphore", side_effect=capture_semaphore), + ): + asyncio.run(pool._run_health_tests()) + + assert captured["concurrency"] == 5 + + class TestProxyPoolHealthTests: """Test selective health testing."""