feat: replace HTTP health check with TLS handshake
Replace _http_check (an HTTP GET to httpbin.org) with _tls_check, which performs a TLS handshake through the proxy chain. Multiple targets (Google, Cloudflare, Amazon), rotated round-robin, eliminate the single point of failure. A TLS handshake is lighter, faster, and harder to block than an HTTP request.

- Add test_targets config field (replaces test_url)
- Backward compat: a legacy test_url has its hostname extracted automatically
- Add ssl.create_default_context() and a round-robin index to ProxyPool
- Update docs (example.yaml, USAGE.md, CHEATSHEET.md)
This commit is contained in:
@@ -31,7 +31,10 @@ chain:
|
||||
# - file: /etc/s5p/proxies.txt # text file, one proxy URL per line
|
||||
# refresh: 300 # re-fetch sources interval (seconds)
|
||||
# test_interval: 120 # health test cycle interval (seconds)
|
||||
# test_url: http://httpbin.org/ip # URL for health checks
|
||||
# test_targets: # TLS handshake targets (round-robin)
|
||||
# - www.google.com
|
||||
# - www.cloudflare.com
|
||||
# - www.amazon.com
|
||||
# test_timeout: 15 # per-test timeout (seconds)
|
||||
# test_concurrency: 5 # parallel health tests
|
||||
# max_fails: 3 # consecutive fails before eviction
|
||||
|
||||
@@ -57,6 +57,10 @@ proxy_pool:
|
||||
- file: /etc/s5p/proxies.txt
|
||||
refresh: 300 # re-fetch interval
|
||||
test_interval: 120 # health test cycle
|
||||
test_targets: # TLS handshake targets (round-robin)
|
||||
- www.google.com
|
||||
- www.cloudflare.com
|
||||
- www.amazon.com
|
||||
max_fails: 3 # evict after N fails
|
||||
report_url: "" # POST dead proxies (optional)
|
||||
```
|
||||
|
||||
@@ -59,7 +59,10 @@ proxy_pool:
|
||||
- file: /etc/s5p/proxies.txt
|
||||
refresh: 300
|
||||
test_interval: 120
|
||||
test_url: http://httpbin.org/ip
|
||||
test_targets: # TLS handshake targets (round-robin)
|
||||
- www.google.com
|
||||
- www.cloudflare.com
|
||||
- www.amazon.com
|
||||
test_timeout: 15
|
||||
test_concurrency: 5
|
||||
max_fails: 3
|
||||
@@ -107,7 +110,10 @@ proxy_pool:
|
||||
- file: /etc/s5p/proxies.txt # text file, one proxy URL per line
|
||||
refresh: 300 # re-fetch sources every 300 seconds
|
||||
test_interval: 120 # health test cycle every 120 seconds
|
||||
test_url: http://httpbin.org/ip # URL for health checks
|
||||
test_targets: # TLS handshake targets (round-robin)
|
||||
- www.google.com
|
||||
- www.cloudflare.com
|
||||
- www.amazon.com
|
||||
test_timeout: 15 # per-test timeout (seconds)
|
||||
test_concurrency: 5 # parallel health tests
|
||||
max_fails: 3 # evict after N consecutive failures
|
||||
@@ -134,8 +140,9 @@ http://proxy.example.com:8080
|
||||
### Health testing
|
||||
|
||||
Each cycle tests all proxies through the full chain (static chain + proxy)
|
||||
by sending an HTTP GET to `test_url`. Proxies are marked alive on `200` response.
|
||||
After `max_fails` consecutive failures, a proxy is evicted.
|
||||
by performing a TLS handshake against one of the `test_targets` (rotated
|
||||
round-robin). A successful handshake marks the proxy alive. After `max_fails`
|
||||
consecutive failures, a proxy is evicted.
|
||||
|
||||
Before each health test cycle, the static chain is tested without any pool
|
||||
proxy. If the chain itself is unreachable (e.g., Tor is down), proxy tests
|
||||
|
||||
@@ -45,13 +45,26 @@ class ProxyPoolConfig:
|
||||
sources: list[PoolSourceConfig] = field(default_factory=list)
|
||||
refresh: float = 300.0
|
||||
test_interval: float = 120.0
|
||||
test_url: str = "http://httpbin.org/ip"
|
||||
test_url: str = "" # deprecated, kept for backward compat
|
||||
test_targets: list[str] = field(default_factory=lambda: [
|
||||
"www.google.com",
|
||||
"www.cloudflare.com",
|
||||
"www.amazon.com",
|
||||
])
|
||||
test_timeout: float = 15.0
|
||||
test_concurrency: int = 5
|
||||
max_fails: int = 3
|
||||
state_file: str = ""
|
||||
report_url: str = ""
|
||||
|
||||
def __post_init__(self) -> None:
    """Backward compat: derive ``test_targets`` from a legacy ``test_url``.

    The conversion only happens when ``test_url`` is non-empty and
    ``test_targets`` still equals the built-in default list; any other
    ``test_targets`` value is left untouched. Because the check is an
    equality comparison, a caller that explicitly passes the default list
    together with ``test_url`` is treated the same as one that passed no
    targets at all.

    A ``test_url`` written without a scheme (``httpbin.org/ip`` or
    ``httpbin.org:80/ip``) has no hostname according to ``urlparse``; in
    that case the value is re-parsed as a network location so the host is
    still picked up.
    """
    defaults = ["www.google.com", "www.cloudflare.com", "www.amazon.com"]
    if not self.test_url or self.test_targets != defaults:
        return

    host = urlparse(self.test_url).hostname
    if not host and "://" not in self.test_url:
        # Scheme-less legacy value: prefix "//" so urlparse treats the
        # leading component as the network location instead of a path.
        bare = self.test_url.strip().lstrip("/")
        try:
            host = urlparse("//" + bare).hostname
        except ValueError:
            # The fallback must never raise where the plain parse did not.
            host = None

    if host:
        self.test_targets = [host]
|
||||
|
||||
|
||||
@dataclass
|
||||
class TorConfig:
|
||||
@@ -199,17 +212,20 @@ def load_config(path: str | Path) -> Config:
|
||||
limit=src.get("limit", 1000),
|
||||
)
|
||||
)
|
||||
config.proxy_pool = ProxyPoolConfig(
|
||||
sources=sources,
|
||||
refresh=float(pool_raw.get("refresh", 300)),
|
||||
test_interval=float(pool_raw.get("test_interval", 120)),
|
||||
test_url=pool_raw.get("test_url", "http://httpbin.org/ip"),
|
||||
test_timeout=float(pool_raw.get("test_timeout", 15)),
|
||||
test_concurrency=int(pool_raw.get("test_concurrency", 5)),
|
||||
max_fails=int(pool_raw.get("max_fails", 3)),
|
||||
state_file=pool_raw.get("state_file", ""),
|
||||
report_url=pool_raw.get("report_url", ""),
|
||||
)
|
||||
kwargs: dict = {
|
||||
"sources": sources,
|
||||
"refresh": float(pool_raw.get("refresh", 300)),
|
||||
"test_interval": float(pool_raw.get("test_interval", 120)),
|
||||
"test_url": pool_raw.get("test_url", ""),
|
||||
"test_timeout": float(pool_raw.get("test_timeout", 15)),
|
||||
"test_concurrency": int(pool_raw.get("test_concurrency", 5)),
|
||||
"max_fails": int(pool_raw.get("max_fails", 3)),
|
||||
"state_file": pool_raw.get("state_file", ""),
|
||||
"report_url": pool_raw.get("report_url", ""),
|
||||
}
|
||||
if "test_targets" in pool_raw:
|
||||
kwargs["test_targets"] = list(pool_raw["test_targets"])
|
||||
config.proxy_pool = ProxyPoolConfig(**kwargs)
|
||||
elif "proxy_source" in raw:
|
||||
# backward compat: convert legacy proxy_source to proxy_pool
|
||||
src_raw = raw["proxy_source"]
|
||||
|
||||
@@ -7,10 +7,11 @@ import json
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import ssl
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from urllib.parse import urlencode, urlparse
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from .config import ChainHop, PoolSourceConfig, ProxyPoolConfig, parse_api_proxies, parse_proxy_url
|
||||
from .http import http_get_json, http_post_json
|
||||
@@ -62,6 +63,8 @@ class ProxyPool:
|
||||
self._tasks: list[asyncio.Task] = []
|
||||
self._stop = asyncio.Event()
|
||||
self._state_path = self._resolve_state_path()
|
||||
self._ssl_ctx = ssl.create_default_context()
|
||||
self._target_idx = 0
|
||||
|
||||
# -- public interface ----------------------------------------------------
|
||||
|
||||
@@ -234,48 +237,52 @@ class ProxyPool:
|
||||
|
||||
# -- health testing ------------------------------------------------------
|
||||
|
||||
async def _http_check(self, chain: list[ChainHop]) -> bool:
|
||||
"""Send an HTTP GET through *chain* and return True on 2xx."""
|
||||
parsed = urlparse(self._cfg.test_url)
|
||||
host = parsed.hostname or "httpbin.org"
|
||||
port = parsed.port or 80
|
||||
path = parsed.path or "/"
|
||||
async def _tls_check(self, chain: list[ChainHop]) -> bool:
    """Perform a TLS handshake through *chain* and report whether it succeeded.

    The target host is picked from ``self._cfg.test_targets`` in round-robin
    order; ``self._target_idx`` advances on every call that has at least one
    target. A tunnel to ``host:443`` is opened with ``build_chain`` and then
    upgraded with ``loop.start_tls`` using ``self._ssl_ctx``.

    Args:
        chain: Hops to tunnel through before reaching the target.

    Returns:
        True when the handshake completes within ``test_timeout``; False when
        no targets are configured, the tunnel cannot be built, or the
        handshake fails or times out.
    """
    targets = self._cfg.test_targets
    if not targets:
        return False

    # Pick and advance without an await in between, so every call takes the
    # next slot in the rotation.
    host = targets[self._target_idx % len(targets)]
    self._target_idx += 1
    timeout = self._cfg.test_timeout

    try:
        _reader, writer = await build_chain(chain, host, 443, timeout=timeout)
    except (
        ProtoError, asyncio.TimeoutError, TimeoutError,
        ConnectionError, OSError, EOFError,
    ):
        return False

    try:
        transport = writer.transport
        loop = asyncio.get_running_loop()
        tls_transport = await asyncio.wait_for(
            loop.start_tls(
                transport, transport.get_protocol(), self._ssl_ctx,
                server_hostname=host,
            ),
            timeout=timeout,
        )
        # NOTE(review): start_tls is annotated as possibly returning None;
        # guard so a finished handshake is not reported as a crash.
        if tls_transport is not None:
            tls_transport.close()
        return True
    except (
        # asyncio.TimeoutError is a distinct class before Python 3.11 and an
        # alias of the builtin TimeoutError from 3.11 on; list both.
        ssl.SSLError, asyncio.TimeoutError, TimeoutError,
        ConnectionError, OSError, EOFError,
    ):
        return False
    finally:
        # Best-effort cleanup of the underlying tunnel; never let it mask the
        # result or outlive the configured test timeout.
        try:
            if not writer.is_closing():
                writer.close()
                await asyncio.wait_for(writer.wait_closed(), timeout=timeout)
        except (asyncio.TimeoutError, TimeoutError, OSError):
            pass
|
||||
|
||||
async def _test_proxy(self, key: str, entry: ProxyEntry) -> bool:
    """Probe one pool proxy with a TLS handshake through the full chain.

    Records the attempt on *entry* (``last_test`` timestamp and ``tests``
    counter) before probing. The probe runs through the static chain with
    the entry's hop appended. *key* is not read here.
    """
    entry.last_test = time.time()
    entry.tests += 1

    full_chain = self._chain + [entry.hop]
    alive = await self._tls_check(full_chain)
    return alive
|
||||
|
||||
async def _test_chain(self) -> bool:
    """Probe the static chain on its own, with no pool proxy appended.

    An empty static chain is reported as healthy without running a probe.
    """
    if self._chain:
        return await self._tls_check(self._chain)
    return True
|
||||
|
||||
async def _run_health_tests(self, keys: list[str] | None = None) -> None:
|
||||
"""Test proxies with bounded concurrency.
|
||||
|
||||
@@ -175,3 +175,40 @@ class TestConfig:
|
||||
cfg_file.write_text("listen: 1080\n")
|
||||
c = load_config(cfg_file)
|
||||
assert c.tor is None
|
||||
|
||||
def test_proxy_pool_test_targets(self, tmp_path):
    """Explicit ``test_targets`` load verbatim and ``test_url`` stays empty."""
    yaml_lines = [
        "proxy_pool:\n",
        " sources: []\n",
        " test_targets:\n",
        " - host-a.example.com\n",
        " - host-b.example.com\n",
    ]
    path = tmp_path / "test.yaml"
    path.write_text("".join(yaml_lines))

    pool_cfg = load_config(path).proxy_pool

    assert pool_cfg is not None
    expected = ["host-a.example.com", "host-b.example.com"]
    assert pool_cfg.test_targets == expected
    assert pool_cfg.test_url == ""
|
||||
|
||||
def test_proxy_pool_legacy_test_url(self, tmp_path):
    """A config with only the legacy ``test_url`` yields its hostname as target."""
    yaml_lines = [
        "proxy_pool:\n",
        " sources: []\n",
        " test_url: http://httpbin.org/ip\n",
    ]
    path = tmp_path / "test.yaml"
    path.write_text("".join(yaml_lines))

    pool_cfg = load_config(path).proxy_pool

    assert pool_cfg is not None
    assert pool_cfg.test_targets == ["httpbin.org"]
|
||||
|
||||
def test_proxy_pool_defaults(self, tmp_path):
    """With neither key present, the built-in target list is used."""
    path = tmp_path / "test.yaml"
    path.write_text("".join(["proxy_pool:\n", " sources: []\n"]))

    pool_cfg = load_config(path).proxy_pool

    assert pool_cfg is not None
    builtin_targets = ["www.google.com", "www.cloudflare.com", "www.amazon.com"]
    assert pool_cfg.test_targets == builtin_targets
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
"""Tests for the managed proxy pool."""
|
||||
|
||||
import asyncio
|
||||
import ssl
|
||||
import time
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -468,3 +471,118 @@ class TestProxyPoolPersistence:
|
||||
entry = pool2._proxies["socks5://1.2.3.4:1080"]
|
||||
assert entry.hop.username == "user"
|
||||
assert entry.hop.password == "pass"
|
||||
|
||||
|
||||
class TestTlsCheck:
    """Exercise ``ProxyPool._tls_check`` with the network layer mocked out."""

    def _make_pool(self, **kwargs):
        """Build a pool with no sources and an empty static chain."""
        cfg = ProxyPoolConfig(sources=[], **kwargs)
        return ProxyPool(cfg, [], timeout=10.0)

    @staticmethod
    def _make_writer():
        """Return ``(writer, transport, protocol)`` mocks standing in for a stream writer."""
        protocol = MagicMock()
        transport = MagicMock()
        transport.get_protocol.return_value = protocol

        writer = MagicMock()
        writer.transport = transport
        writer.is_closing.return_value = False
        # wait_closed() is awaited during cleanup; a plain MagicMock is not
        # awaitable and would raise TypeError out of the finally block.
        writer.wait_closed = AsyncMock()
        return writer, transport, protocol

    def test_success(self):
        """A completed handshake returns True and tears the connection down."""
        pool = self._make_pool(test_targets=["www.example.com"])
        writer, transport, protocol = self._make_writer()
        tls_transport = MagicMock()

        loop = MagicMock()
        loop.start_tls = AsyncMock(return_value=tls_transport)

        with (
            patch(
                "s5p.pool.build_chain", new_callable=AsyncMock,
                return_value=(MagicMock(), writer),
            ),
            patch("asyncio.get_running_loop", return_value=loop),
        ):
            result = asyncio.run(pool._tls_check([]))

        assert result is True
        loop.start_tls.assert_called_once_with(
            transport, protocol, pool._ssl_ctx,
            server_hostname="www.example.com",
        )
        tls_transport.close.assert_called_once()
        writer.close.assert_called_once()
        writer.wait_closed.assert_awaited_once()

    def test_build_chain_failure(self):
        """A tunnel that cannot be built is reported as a failed check."""
        pool = self._make_pool(test_targets=["www.example.com"])

        with patch(
            "s5p.pool.build_chain", new_callable=AsyncMock,
            side_effect=ConnectionError("refused"),
        ):
            result = asyncio.run(pool._tls_check([]))

        assert result is False

    def test_handshake_failure(self):
        """An SSL error during the handshake yields False and still closes the writer."""
        pool = self._make_pool(test_targets=["www.example.com"])
        writer, _transport, _protocol = self._make_writer()

        loop = MagicMock()
        loop.start_tls = AsyncMock(side_effect=ssl.SSLError("handshake failed"))

        with (
            patch(
                "s5p.pool.build_chain", new_callable=AsyncMock,
                return_value=(MagicMock(), writer),
            ),
            patch("asyncio.get_running_loop", return_value=loop),
        ):
            result = asyncio.run(pool._tls_check([]))

        assert result is False
        writer.close.assert_called_once()

    def test_round_robin_rotation(self):
        """Successive checks walk the target list in order and wrap around."""
        targets = ["host-a.example.com", "host-b.example.com", "host-c.example.com"]
        pool = self._make_pool(test_targets=targets)

        selected: list[str] = []

        async def fake_build_chain(chain, host, port, timeout=None):
            # Record the chosen host, then fail fast so no handshake is attempted.
            selected.append(host)
            raise ConnectionError("test")

        with patch("s5p.pool.build_chain", side_effect=fake_build_chain):
            for _ in range(6):
                asyncio.run(pool._tls_check([]))

        assert selected == targets * 2

    def test_empty_targets(self):
        """With no targets configured the check fails without touching the network."""
        pool = self._make_pool(test_targets=[])
        result = asyncio.run(pool._tls_check([]))
        assert result is False
|
||||
|
||||
|
||||
class TestProxyPoolConfigCompat:
    """Check how a legacy ``test_url`` interacts with ``test_targets``."""

    def test_legacy_test_url_converts(self):
        """A lone legacy URL collapses to its hostname."""
        legacy_cfg = ProxyPoolConfig(test_url="http://httpbin.org/ip")
        expected = ["httpbin.org"]
        assert legacy_cfg.test_targets == expected

    def test_explicit_test_targets_wins(self):
        """Explicit targets take precedence over a legacy URL."""
        options = {
            "test_url": "http://httpbin.org/ip",
            "test_targets": ["custom.example.com"],
        }
        mixed_cfg = ProxyPoolConfig(**options)
        assert mixed_cfg.test_targets == ["custom.example.com"]

    def test_defaults_when_neither_set(self):
        """Without either option, the built-in targets and an empty URL apply."""
        default_cfg = ProxyPoolConfig()
        builtin_targets = ["www.google.com", "www.cloudflare.com", "www.amazon.com"]
        assert default_cfg.test_targets == builtin_targets
        assert default_cfg.test_url == ""
|
||||
|
||||
Reference in New Issue
Block a user