diff --git a/src/s5p/config.py b/src/s5p/config.py index 16fe35d..6e6a7ab 100644 --- a/src/s5p/config.py +++ b/src/s5p/config.py @@ -60,6 +60,7 @@ class ProxyPoolConfig: test_concurrency: int = 5 max_fails: int = 3 state_file: str = "" + report_url: str = "" @dataclass @@ -162,6 +163,7 @@ def load_config(path: str | Path) -> Config: test_concurrency=int(pp.get("test_concurrency", 5)), max_fails=int(pp.get("max_fails", 3)), state_file=pp.get("state_file", ""), + report_url=pp.get("report_url", ""), ) elif "proxy_source" in raw: # backward compat: convert legacy proxy_source to proxy_pool diff --git a/src/s5p/pool.py b/src/s5p/pool.py index 1c5d1ba..cf8e905 100644 --- a/src/s5p/pool.py +++ b/src/s5p/pool.py @@ -399,6 +399,42 @@ class ProxyPool: suffix, ) + # report evicted proxies to upstream API + if evict_keys and self._cfg.report_url: + dead = [k for k in evict_keys] + asyncio.ensure_future(self._report_dead(dead)) + + async def _report_dead(self, keys: list[str]) -> None: + """POST dead proxy list to report_url (fire-and-forget).""" + dead = [] + for key in keys: + # key format: proto://host:port + proto, _, addr = key.partition("://") + if addr: + dead.append({"proto": proto, "proxy": addr}) + + if not dead: + return + + loop = asyncio.get_running_loop() + try: + await loop.run_in_executor(None, self._report_sync, dead) + logger.info("pool: reported %d dead proxies to %s", len(dead), self._cfg.report_url) + except Exception as e: + logger.debug("pool: report failed: %s", e) + + def _report_sync(self, dead: list[dict[str, str]]) -> None: + """Synchronous POST to report_url (runs in executor).""" + payload = json.dumps({"dead": dead}).encode() + req = urllib.request.Request( + self._cfg.report_url, + data=payload, + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(req, timeout=10): + pass + def _rebuild_alive(self) -> None: """Rebuild the alive keys list from current state.""" self._alive_keys = [k for k, e in self._proxies.items() if e.alive] diff --git a/tests/test_pool.py b/tests/test_pool.py index 47d2fca..0cf7540 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -257,6 +257,70 @@ class TestProxyPoolHealthTests: assert pool._proxies["socks5://10.0.0.1:1080"].alive is True +class TestProxyPoolReport: + """Test dead proxy reporting.""" + + def test_report_called_on_eviction(self): + import asyncio + from unittest.mock import AsyncMock, patch + + cfg = ProxyPoolConfig(sources=[], report_url="http://api:8081/report", max_fails=1) + pool = ProxyPool(cfg, [], timeout=10.0) + + now = time.time() + hop = ChainHop(proto="socks5", host="10.0.0.1", port=1080) + pool._proxies["socks5://10.0.0.1:1080"] = ProxyEntry( + hop=hop, alive=False, last_seen=now, fails=0, + ) + + with ( + patch.object(pool, "_test_proxy", new_callable=AsyncMock, return_value=False), + patch.object(pool, "_report_dead", new_callable=AsyncMock) as mock_report, + ): + asyncio.run(pool._run_health_tests()) + # proxy should be evicted (fails=1 >= max_fails=1) + assert "socks5://10.0.0.1:1080" not in pool._proxies + mock_report.assert_called_once() + keys = mock_report.call_args[0][0] + assert "socks5://10.0.0.1:1080" in keys + + def test_report_not_called_without_url(self): + import asyncio + from unittest.mock import AsyncMock, patch + + cfg = ProxyPoolConfig(sources=[], max_fails=1) # no report_url + pool = ProxyPool(cfg, [], timeout=10.0) + + now = time.time() + hop = ChainHop(proto="socks5", host="10.0.0.1", port=1080) + pool._proxies["socks5://10.0.0.1:1080"] = ProxyEntry( + hop=hop, alive=False, last_seen=now, fails=0, + ) + + with ( + patch.object(pool, "_test_proxy", new_callable=AsyncMock, return_value=False), + patch.object(pool, "_report_dead", new_callable=AsyncMock) as mock_report, + ): + asyncio.run(pool._run_health_tests()) + mock_report.assert_not_called() + + def test_report_sync_payload(self): + from unittest.mock import MagicMock, patch + + cfg = ProxyPoolConfig(sources=[], report_url="http://api:8081/report") + pool = ProxyPool(cfg, [], timeout=10.0) + + dead = [{"proto": "socks5", "proxy": "10.0.0.1:1080"}] + with patch("s5p.pool.urllib.request.urlopen", new_callable=MagicMock) as mock_open: + mock_open.return_value.__enter__ = MagicMock() + mock_open.return_value.__exit__ = MagicMock(return_value=False) + pool._report_sync(dead) + req = mock_open.call_args[0][0] + assert req.method == "POST" + assert req.full_url == "http://api:8081/report" + assert b'"dead"' in req.data + + class TestProxyPoolStaleExpiry: """Test stale proxy eviction."""