feat: add per-proxy backoff after connection failure
Track last_fail timestamp on ProxyEntry. When a connection attempt fails in server.py, report_failure() records the time. The selection weight multiplies by min(fail_age/60, 1.0), ramping back from floor over 60s. Prevents wasting retries on proxies that just failed.
This commit is contained in:
@@ -33,6 +33,7 @@ class ProxyEntry:
|
|||||||
last_seen: float = 0.0
|
last_seen: float = 0.0
|
||||||
last_ok: float = 0.0
|
last_ok: float = 0.0
|
||||||
last_test: float = 0.0
|
last_test: float = 0.0
|
||||||
|
last_fail: float = 0.0
|
||||||
fails: int = 0
|
fails: int = 0
|
||||||
tests: int = 0
|
tests: int = 0
|
||||||
alive: bool = False
|
alive: bool = False
|
||||||
@@ -96,13 +97,25 @@ class ProxyPool:
|
|||||||
key = random.choices(self._alive_keys, weights=weights)[0]
|
key = random.choices(self._alive_keys, weights=weights)[0]
|
||||||
return self._proxies[key].hop
|
return self._proxies[key].hop
|
||||||
|
|
||||||
|
def report_failure(self, hop: ChainHop) -> None:
|
||||||
|
"""Record a connection-time failure for a proxy."""
|
||||||
|
key = f"{hop.proto}://{hop.host}:{hop.port}"
|
||||||
|
entry = self._proxies.get(key)
|
||||||
|
if entry:
|
||||||
|
entry.last_fail = time.time()
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _weight(entry: ProxyEntry, now: float) -> float:
|
def _weight(entry: ProxyEntry, now: float) -> float:
|
||||||
"""Compute selection weight from recency of last success."""
|
"""Compute selection weight from recency and failure backoff."""
|
||||||
if entry.last_ok <= 0:
|
if entry.last_ok <= 0:
|
||||||
return 0.01
|
return 0.01
|
||||||
age = now - entry.last_ok
|
age = now - entry.last_ok
|
||||||
return 1.0 / (1.0 + age / 300.0)
|
w = 1.0 / (1.0 + age / 300.0)
|
||||||
|
# penalize proxies that failed since their last health-test success
|
||||||
|
if entry.last_fail >= entry.last_ok:
|
||||||
|
fail_age = now - entry.last_fail
|
||||||
|
w *= min(fail_age / 60.0, 1.0) # ramp back over 60s
|
||||||
|
return max(w, 0.01)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def count(self) -> int:
|
def count(self) -> int:
|
||||||
@@ -355,6 +368,7 @@ class ProxyPool:
|
|||||||
hop=hop,
|
hop=hop,
|
||||||
last_seen=entry.get("last_seen", 0.0),
|
last_seen=entry.get("last_seen", 0.0),
|
||||||
last_ok=entry.get("last_ok", 0.0),
|
last_ok=entry.get("last_ok", 0.0),
|
||||||
|
last_fail=entry.get("last_fail", 0.0),
|
||||||
fails=entry.get("fails", 0),
|
fails=entry.get("fails", 0),
|
||||||
tests=entry.get("tests", 0),
|
tests=entry.get("tests", 0),
|
||||||
alive=entry.get("alive", False),
|
alive=entry.get("alive", False),
|
||||||
@@ -383,6 +397,7 @@ class ProxyPool:
|
|||||||
"password": entry.hop.password,
|
"password": entry.hop.password,
|
||||||
"last_seen": entry.last_seen,
|
"last_seen": entry.last_seen,
|
||||||
"last_ok": entry.last_ok,
|
"last_ok": entry.last_ok,
|
||||||
|
"last_fail": entry.last_fail,
|
||||||
"fails": entry.fails,
|
"fails": entry.fails,
|
||||||
"tests": entry.tests,
|
"tests": entry.tests,
|
||||||
"alive": entry.alive,
|
"alive": entry.alive,
|
||||||
|
|||||||
@@ -103,11 +103,12 @@ async def _handle_client(
|
|||||||
|
|
||||||
for attempt in range(attempts):
|
for attempt in range(attempts):
|
||||||
effective_chain = list(config.chain)
|
effective_chain = list(config.chain)
|
||||||
|
pool_hop = None
|
||||||
if proxy_pool:
|
if proxy_pool:
|
||||||
hop = await proxy_pool.get()
|
pool_hop = await proxy_pool.get()
|
||||||
if hop:
|
if pool_hop:
|
||||||
effective_chain.append(hop)
|
effective_chain.append(pool_hop)
|
||||||
logger.debug("[%s] +proxy %s", tag, hop)
|
logger.debug("[%s] +proxy %s", tag, pool_hop)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
t0 = time.monotonic()
|
t0 = time.monotonic()
|
||||||
@@ -119,6 +120,8 @@ async def _handle_client(
|
|||||||
break
|
break
|
||||||
except (ProtoError, asyncio.TimeoutError, ConnectionError, OSError) as e:
|
except (ProtoError, asyncio.TimeoutError, ConnectionError, OSError) as e:
|
||||||
last_err = e
|
last_err = e
|
||||||
|
if pool_hop and isinstance(proxy_pool, ProxyPool):
|
||||||
|
proxy_pool.report_failure(pool_hop)
|
||||||
if metrics:
|
if metrics:
|
||||||
metrics.retries += 1
|
metrics.retries += 1
|
||||||
if attempt + 1 < attempts:
|
if attempt + 1 < attempts:
|
||||||
|
|||||||
@@ -116,6 +116,42 @@ class TestProxyPoolWeight:
|
|||||||
entry.last_ok = 0
|
entry.last_ok = 0
|
||||||
assert ProxyPool._weight(entry, now) == pytest.approx(0.01)
|
assert ProxyPool._weight(entry, now) == pytest.approx(0.01)
|
||||||
|
|
||||||
|
def test_weight_failure_penalty(self):
|
||||||
|
hop = ChainHop(proto="socks5", host="1.2.3.4", port=1080)
|
||||||
|
now = 1000.0
|
||||||
|
|
||||||
|
# healthy proxy: weight ~1.0
|
||||||
|
entry = ProxyEntry(hop=hop, last_ok=now)
|
||||||
|
assert ProxyPool._weight(entry, now) == pytest.approx(1.0)
|
||||||
|
|
||||||
|
# just failed: weight drops to floor (fail_age=0 -> multiplier=0)
|
||||||
|
entry.last_fail = now
|
||||||
|
assert ProxyPool._weight(entry, now) == pytest.approx(0.01)
|
||||||
|
|
||||||
|
# 30s after failure: base=1/(1+30/300)=0.909, penalty=30/60=0.5 -> ~0.45
|
||||||
|
assert ProxyPool._weight(entry, now + 30) == pytest.approx(0.4545, abs=0.05)
|
||||||
|
|
||||||
|
# 60s after failure: penalty=1.0, only base decay -> 1/(1+60/300)=0.833
|
||||||
|
assert ProxyPool._weight(entry, now + 60) == pytest.approx(0.833, abs=0.05)
|
||||||
|
|
||||||
|
def test_report_failure(self):
|
||||||
|
cfg = ProxyPoolConfig(sources=[])
|
||||||
|
pool = ProxyPool(cfg, [], timeout=10.0)
|
||||||
|
hop = ChainHop(proto="socks5", host="1.2.3.4", port=1080)
|
||||||
|
pool._proxies["socks5://1.2.3.4:1080"] = ProxyEntry(
|
||||||
|
hop=hop, alive=True, last_ok=time.time(),
|
||||||
|
)
|
||||||
|
|
||||||
|
assert pool._proxies["socks5://1.2.3.4:1080"].last_fail == 0.0
|
||||||
|
pool.report_failure(hop)
|
||||||
|
assert pool._proxies["socks5://1.2.3.4:1080"].last_fail > 0.0
|
||||||
|
|
||||||
|
def test_report_failure_unknown_proxy(self):
|
||||||
|
cfg = ProxyPoolConfig(sources=[])
|
||||||
|
pool = ProxyPool(cfg, [], timeout=10.0)
|
||||||
|
hop = ChainHop(proto="socks5", host="9.9.9.9", port=1080)
|
||||||
|
pool.report_failure(hop) # should not raise
|
||||||
|
|
||||||
|
|
||||||
class TestProxyPoolFetchFile:
|
class TestProxyPoolFetchFile:
|
||||||
"""Test file source parsing."""
|
"""Test file source parsing."""
|
||||||
|
|||||||
Reference in New Issue
Block a user