feat: add fast warm start with deferred full health test
On warm start (state has alive proxies), only quick-test the previously-alive subset before serving. Full health test runs in background. Cold start behavior unchanged (test all before serving). Reduces startup blocking from minutes to seconds on warm restarts.
This commit is contained in:
@@ -69,9 +69,24 @@ class ProxyPool:
|
|||||||
async def start(self) -> None:
|
async def start(self) -> None:
|
||||||
"""Load state, fetch sources, run initial health test, start loops."""
|
"""Load state, fetch sources, run initial health test, start loops."""
|
||||||
self._load_state()
|
self._load_state()
|
||||||
|
warm_keys = list(self._alive_keys)
|
||||||
await self._fetch_all_sources()
|
await self._fetch_all_sources()
|
||||||
await self._run_health_tests()
|
|
||||||
self._save_state()
|
if warm_keys:
|
||||||
|
# warm start: quick-test previously-alive proxies first
|
||||||
|
valid_keys = [k for k in warm_keys if k in self._proxies]
|
||||||
|
if valid_keys:
|
||||||
|
await self._run_health_tests(keys=valid_keys)
|
||||||
|
self._save_state()
|
||||||
|
self._tasks.append(asyncio.create_task(self._deferred_full_test()))
|
||||||
|
else:
|
||||||
|
await self._run_health_tests()
|
||||||
|
self._save_state()
|
||||||
|
else:
|
||||||
|
# cold start: test everything before serving
|
||||||
|
await self._run_health_tests()
|
||||||
|
self._save_state()
|
||||||
|
|
||||||
self._tasks.append(asyncio.create_task(self._refresh_loop()))
|
self._tasks.append(asyncio.create_task(self._refresh_loop()))
|
||||||
self._tasks.append(asyncio.create_task(self._health_loop()))
|
self._tasks.append(asyncio.create_task(self._health_loop()))
|
||||||
|
|
||||||
@@ -252,11 +267,23 @@ class ProxyPool:
|
|||||||
except OSError:
|
except OSError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
async def _run_health_tests(self) -> None:
|
async def _run_health_tests(self, keys: list[str] | None = None) -> None:
|
||||||
"""Test all proxies with bounded concurrency."""
|
"""Test proxies with bounded concurrency.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
keys: Subset of proxy keys to test. None tests all.
|
||||||
|
"""
|
||||||
if not self._proxies:
|
if not self._proxies:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
target = (
|
||||||
|
[(k, self._proxies[k]) for k in keys if k in self._proxies]
|
||||||
|
if keys is not None
|
||||||
|
else list(self._proxies.items())
|
||||||
|
)
|
||||||
|
if not target:
|
||||||
|
return
|
||||||
|
|
||||||
sem = asyncio.Semaphore(self._cfg.test_concurrency)
|
sem = asyncio.Semaphore(self._cfg.test_concurrency)
|
||||||
results: dict[str, bool] = {}
|
results: dict[str, bool] = {}
|
||||||
|
|
||||||
@@ -267,7 +294,7 @@ class ProxyPool:
|
|||||||
except Exception:
|
except Exception:
|
||||||
results[key] = False
|
results[key] = False
|
||||||
|
|
||||||
tasks = [_test(k, e) for k, e in list(self._proxies.items())]
|
tasks = [_test(k, e) for k, e in target]
|
||||||
await asyncio.gather(*tasks)
|
await asyncio.gather(*tasks)
|
||||||
|
|
||||||
total = len(results)
|
total = len(results)
|
||||||
@@ -330,6 +357,11 @@ class ProxyPool:
|
|||||||
|
|
||||||
# -- background loops ----------------------------------------------------
|
# -- background loops ----------------------------------------------------
|
||||||
|
|
||||||
|
async def _deferred_full_test(self) -> None:
|
||||||
|
"""Run a full health test in background after warm start."""
|
||||||
|
await self._run_health_tests()
|
||||||
|
self._save_state()
|
||||||
|
|
||||||
async def _refresh_loop(self) -> None:
|
async def _refresh_loop(self) -> None:
|
||||||
"""Periodically re-fetch sources and merge."""
|
"""Periodically re-fetch sources and merge."""
|
||||||
while not self._stop.is_set():
|
while not self._stop.is_set():
|
||||||
|
|||||||
@@ -153,6 +153,38 @@ class TestProxyPoolWeight:
|
|||||||
pool.report_failure(hop) # should not raise
|
pool.report_failure(hop) # should not raise
|
||||||
|
|
||||||
|
|
||||||
|
class TestProxyPoolHealthTests:
|
||||||
|
"""Test selective health testing."""
|
||||||
|
|
||||||
|
def test_selective_keys(self):
|
||||||
|
import asyncio
|
||||||
|
from unittest.mock import AsyncMock, patch
|
||||||
|
|
||||||
|
cfg = ProxyPoolConfig(sources=[])
|
||||||
|
pool = ProxyPool(cfg, [], timeout=10.0)
|
||||||
|
|
||||||
|
now = time.time()
|
||||||
|
hop_a = ChainHop(proto="socks5", host="10.0.0.1", port=1080)
|
||||||
|
hop_b = ChainHop(proto="socks5", host="10.0.0.2", port=1080)
|
||||||
|
pool._proxies["socks5://10.0.0.1:1080"] = ProxyEntry(
|
||||||
|
hop=hop_a, alive=False, last_seen=now,
|
||||||
|
)
|
||||||
|
pool._proxies["socks5://10.0.0.2:1080"] = ProxyEntry(
|
||||||
|
hop=hop_b, alive=False, last_seen=now,
|
||||||
|
)
|
||||||
|
|
||||||
|
# only test proxy A
|
||||||
|
with patch.object(pool, "_test_proxy", new_callable=AsyncMock, return_value=True) as mock:
|
||||||
|
asyncio.run(pool._run_health_tests(keys=["socks5://10.0.0.1:1080"]))
|
||||||
|
# should only have been called for proxy A
|
||||||
|
assert mock.call_count == 1
|
||||||
|
assert mock.call_args[0][0] == "socks5://10.0.0.1:1080"
|
||||||
|
|
||||||
|
assert pool._proxies["socks5://10.0.0.1:1080"].alive is True
|
||||||
|
# proxy B untouched
|
||||||
|
assert pool._proxies["socks5://10.0.0.2:1080"].alive is False
|
||||||
|
|
||||||
|
|
||||||
class TestProxyPoolStaleExpiry:
|
class TestProxyPoolStaleExpiry:
|
||||||
"""Test stale proxy eviction."""
|
"""Test stale proxy eviction."""
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user