feat: add dead proxy reporting to source API
When report_url is configured, POST evicted proxy list as JSON after
each health test cycle. Fire-and-forget: failures are logged at debug
level. Payload format: {"dead": [{"proto": "socks5", "proxy": "host:port"}]}.
This commit is contained in:
@@ -60,6 +60,7 @@ class ProxyPoolConfig:
|
||||
test_concurrency: int = 5
|
||||
max_fails: int = 3
|
||||
state_file: str = ""
|
||||
report_url: str = ""
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -162,6 +163,7 @@ def load_config(path: str | Path) -> Config:
|
||||
test_concurrency=int(pp.get("test_concurrency", 5)),
|
||||
max_fails=int(pp.get("max_fails", 3)),
|
||||
state_file=pp.get("state_file", ""),
|
||||
report_url=pp.get("report_url", ""),
|
||||
)
|
||||
elif "proxy_source" in raw:
|
||||
# backward compat: convert legacy proxy_source to proxy_pool
|
||||
|
||||
@@ -399,6 +399,42 @@ class ProxyPool:
|
||||
suffix,
|
||||
)
|
||||
|
||||
# report evicted proxies to upstream API
|
||||
if evict_keys and self._cfg.report_url:
|
||||
dead = [k for k in evict_keys]
|
||||
asyncio.ensure_future(self._report_dead(dead))
|
||||
|
||||
async def _report_dead(self, keys: list[str]) -> None:
|
||||
"""POST dead proxy list to report_url (fire-and-forget)."""
|
||||
dead = []
|
||||
for key in keys:
|
||||
# key format: proto://host:port
|
||||
proto, _, addr = key.partition("://")
|
||||
if addr:
|
||||
dead.append({"proto": proto, "proxy": addr})
|
||||
|
||||
if not dead:
|
||||
return
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
try:
|
||||
await loop.run_in_executor(None, self._report_sync, dead)
|
||||
logger.info("pool: reported %d dead proxies to %s", len(dead), self._cfg.report_url)
|
||||
except Exception as e:
|
||||
logger.debug("pool: report failed: %s", e)
|
||||
|
||||
def _report_sync(self, dead: list[dict[str, str]]) -> None:
|
||||
"""Synchronous POST to report_url (runs in executor)."""
|
||||
payload = json.dumps({"dead": dead}).encode()
|
||||
req = urllib.request.Request(
|
||||
self._cfg.report_url,
|
||||
data=payload,
|
||||
headers={"Content-Type": "application/json"},
|
||||
method="POST",
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=10):
|
||||
pass
|
||||
|
||||
def _rebuild_alive(self) -> None:
|
||||
"""Rebuild the alive keys list from current state."""
|
||||
self._alive_keys = [k for k, e in self._proxies.items() if e.alive]
|
||||
|
||||
@@ -257,6 +257,70 @@ class TestProxyPoolHealthTests:
|
||||
assert pool._proxies["socks5://10.0.0.1:1080"].alive is True
|
||||
|
||||
|
||||
class TestProxyPoolReport:
|
||||
"""Test dead proxy reporting."""
|
||||
|
||||
def test_report_called_on_eviction(self):
|
||||
import asyncio
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
cfg = ProxyPoolConfig(sources=[], report_url="http://api:8081/report", max_fails=1)
|
||||
pool = ProxyPool(cfg, [], timeout=10.0)
|
||||
|
||||
now = time.time()
|
||||
hop = ChainHop(proto="socks5", host="10.0.0.1", port=1080)
|
||||
pool._proxies["socks5://10.0.0.1:1080"] = ProxyEntry(
|
||||
hop=hop, alive=False, last_seen=now, fails=0,
|
||||
)
|
||||
|
||||
with (
|
||||
patch.object(pool, "_test_proxy", new_callable=AsyncMock, return_value=False),
|
||||
patch.object(pool, "_report_dead", new_callable=AsyncMock) as mock_report,
|
||||
):
|
||||
asyncio.run(pool._run_health_tests())
|
||||
# proxy should be evicted (fails=1 >= max_fails=1)
|
||||
assert "socks5://10.0.0.1:1080" not in pool._proxies
|
||||
mock_report.assert_called_once()
|
||||
keys = mock_report.call_args[0][0]
|
||||
assert "socks5://10.0.0.1:1080" in keys
|
||||
|
||||
def test_report_not_called_without_url(self):
|
||||
import asyncio
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
cfg = ProxyPoolConfig(sources=[], max_fails=1) # no report_url
|
||||
pool = ProxyPool(cfg, [], timeout=10.0)
|
||||
|
||||
now = time.time()
|
||||
hop = ChainHop(proto="socks5", host="10.0.0.1", port=1080)
|
||||
pool._proxies["socks5://10.0.0.1:1080"] = ProxyEntry(
|
||||
hop=hop, alive=False, last_seen=now, fails=0,
|
||||
)
|
||||
|
||||
with (
|
||||
patch.object(pool, "_test_proxy", new_callable=AsyncMock, return_value=False),
|
||||
patch.object(pool, "_report_dead", new_callable=AsyncMock) as mock_report,
|
||||
):
|
||||
asyncio.run(pool._run_health_tests())
|
||||
mock_report.assert_not_called()
|
||||
|
||||
def test_report_sync_payload(self):
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
cfg = ProxyPoolConfig(sources=[], report_url="http://api:8081/report")
|
||||
pool = ProxyPool(cfg, [], timeout=10.0)
|
||||
|
||||
dead = [{"proto": "socks5", "proxy": "10.0.0.1:1080"}]
|
||||
with patch("s5p.pool.urllib.request.urlopen", new_callable=MagicMock) as mock_open:
|
||||
mock_open.return_value.__enter__ = MagicMock()
|
||||
mock_open.return_value.__exit__ = MagicMock(return_value=False)
|
||||
pool._report_sync(dead)
|
||||
req = mock_open.call_args[0][0]
|
||||
assert req.method == "POST"
|
||||
assert req.full_url == "http://api:8081/report"
|
||||
assert b'"dead"' in req.data
|
||||
|
||||
|
||||
class TestProxyPoolStaleExpiry:
|
||||
"""Test stale proxy eviction."""
|
||||
|
||||
|
||||
Reference in New Issue
Block a user