diff --git a/src/s5p/pool.py b/src/s5p/pool.py index f1d25d4..cc3b8fd 100644 --- a/src/s5p/pool.py +++ b/src/s5p/pool.py @@ -33,6 +33,7 @@ class ProxyEntry: last_seen: float = 0.0 last_ok: float = 0.0 last_test: float = 0.0 + last_fail: float = 0.0 fails: int = 0 tests: int = 0 alive: bool = False @@ -96,13 +97,25 @@ class ProxyPool: key = random.choices(self._alive_keys, weights=weights)[0] return self._proxies[key].hop + def report_failure(self, hop: ChainHop) -> None: + """Record a connection-time failure for a proxy.""" + key = f"{hop.proto}://{hop.host}:{hop.port}" + entry = self._proxies.get(key) + if entry: + entry.last_fail = time.time() + @staticmethod def _weight(entry: ProxyEntry, now: float) -> float: - """Compute selection weight from recency of last success.""" + """Compute selection weight from recency and failure backoff.""" if entry.last_ok <= 0: return 0.01 age = now - entry.last_ok - return 1.0 / (1.0 + age / 300.0) + w = 1.0 / (1.0 + age / 300.0) + # penalize proxies that failed since their last health-test success + if entry.last_fail >= entry.last_ok: + fail_age = now - entry.last_fail + w *= min(fail_age / 60.0, 1.0) # ramp back over 60s + return max(w, 0.01) @property def count(self) -> int: @@ -355,6 +368,7 @@ class ProxyPool: hop=hop, last_seen=entry.get("last_seen", 0.0), last_ok=entry.get("last_ok", 0.0), + last_fail=entry.get("last_fail", 0.0), fails=entry.get("fails", 0), tests=entry.get("tests", 0), alive=entry.get("alive", False), @@ -383,6 +397,7 @@ class ProxyPool: "password": entry.hop.password, "last_seen": entry.last_seen, "last_ok": entry.last_ok, + "last_fail": entry.last_fail, "fails": entry.fails, "tests": entry.tests, "alive": entry.alive, diff --git a/src/s5p/server.py b/src/s5p/server.py index 5f243ff..406d6d5 100644 --- a/src/s5p/server.py +++ b/src/s5p/server.py @@ -103,11 +103,12 @@ async def _handle_client( for attempt in range(attempts): effective_chain = list(config.chain) + pool_hop = None if proxy_pool: - hop = await proxy_pool.get() - if hop: - effective_chain.append(hop) - logger.debug("[%s] +proxy %s", tag, hop) + pool_hop = await proxy_pool.get() + if pool_hop: + effective_chain.append(pool_hop) + logger.debug("[%s] +proxy %s", tag, pool_hop) try: t0 = time.monotonic() @@ -119,6 +120,8 @@ async def _handle_client( break except (ProtoError, asyncio.TimeoutError, ConnectionError, OSError) as e: last_err = e + if pool_hop and isinstance(proxy_pool, ProxyPool): + proxy_pool.report_failure(pool_hop) if metrics: metrics.retries += 1 if attempt + 1 < attempts: diff --git a/tests/test_pool.py b/tests/test_pool.py index 131ed59..97b0e39 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -116,6 +116,42 @@ class TestProxyPoolWeight: entry.last_ok = 0 assert ProxyPool._weight(entry, now) == pytest.approx(0.01) + def test_weight_failure_penalty(self): + hop = ChainHop(proto="socks5", host="1.2.3.4", port=1080) + now = 1000.0 + + # healthy proxy: weight ~1.0 + entry = ProxyEntry(hop=hop, last_ok=now) + assert ProxyPool._weight(entry, now) == pytest.approx(1.0) + + # just failed: weight drops to floor (fail_age=0 -> multiplier=0) + entry.last_fail = now + assert ProxyPool._weight(entry, now) == pytest.approx(0.01) + + # 30s after failure: base=1/(1+30/300)=0.909, penalty=30/60=0.5 -> ~0.45 + assert ProxyPool._weight(entry, now + 30) == pytest.approx(0.4545, abs=0.05) + + # 60s after failure: penalty=1.0, only base decay -> 1/(1+60/300)=0.833 + assert ProxyPool._weight(entry, now + 60) == pytest.approx(0.833, abs=0.05) + + def test_report_failure(self): + cfg = ProxyPoolConfig(sources=[]) + pool = ProxyPool(cfg, [], timeout=10.0) + hop = ChainHop(proto="socks5", host="1.2.3.4", port=1080) + pool._proxies["socks5://1.2.3.4:1080"] = ProxyEntry( + hop=hop, alive=True, last_ok=time.time(), + ) + + assert pool._proxies["socks5://1.2.3.4:1080"].last_fail == 0.0 + pool.report_failure(hop) + assert pool._proxies["socks5://1.2.3.4:1080"].last_fail > 0.0 + + def test_report_failure_unknown_proxy(self): + cfg = ProxyPoolConfig(sources=[]) + pool = ProxyPool(cfg, [], timeout=10.0) + hop = ChainHop(proto="socks5", host="9.9.9.9", port=1080) + pool.report_failure(hop) # should not raise + class TestProxyPoolFetchFile: """Test file source parsing."""