feat: multi-Tor round-robin via tor_nodes config

New top-level tor_nodes list distributes traffic across multiple Tor
SOCKS proxies. First hop is replaced at connection time by round-robin
selection; health tests also rotate across all nodes. FirstHopPools
are created for each node when pool_size > 0.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
user
2026-02-18 10:12:58 +01:00
parent b3966c9a9f
commit 288bd95f62
10 changed files with 221 additions and 5 deletions
+9
View File
@@ -50,6 +50,15 @@ chain:
# cookie_file: "" # CookieAuthentication file path
# newnym_interval: 0 # periodic NEWNYM (seconds, 0 = manual only)
# Multi-Tor round-robin -- distribute traffic across multiple Tor nodes.
# When present, the first hop in each listener's chain is replaced at
# connection time by round-robin selection from this list.
# tor_nodes:
# - socks5://10.200.1.1:9050
# - socks5://10.200.1.254:9050
# - socks5://10.200.1.250:9050
# - socks5://10.200.1.13:9050
# Multi-listener mode -- each listener gets its own address and chain.
# The "pool" keyword in a chain appends a random alive proxy from the pool.
# Multiple "pool" entries = multiple pool hops (deeper chaining).
+10
View File
@@ -56,6 +56,16 @@ listeners:
- socks5://127.0.0.1:9050 # Tor only
```
## Multi-Tor Round-Robin (config)
```yaml
tor_nodes: # overrides first hop in all listeners
- socks5://10.200.1.1:9050
- socks5://10.200.1.254:9050
- socks5://10.200.1.250:9050
- socks5://10.200.1.13:9050
```
## Performance Tuning (config)
```yaml
+30
View File
@@ -82,6 +82,36 @@ proxy_pool:
state_file: "" # empty = ~/.cache/s5p/pool.json
```
## Multi-Tor Round-Robin
Distribute traffic across multiple Tor nodes instead of funneling everything
through a single one. When `tor_nodes` is configured, the first hop in each
listener's chain is replaced at connection time by round-robin selection.
Health tests also rotate across all nodes.
```yaml
tor_nodes:
- socks5://10.200.1.1:9050
- socks5://10.200.1.254:9050
- socks5://10.200.1.250:9050
- socks5://10.200.1.13:9050
```
When `tor_nodes` is absent, listeners use their configured first hop as before.
When present, `tor_nodes` overrides the first hop everywhere.
If `pool_size > 0`, pre-warmed connection pools are created for all nodes
automatically.
### API
`tor_nodes` appears in both `/config` and `/status` responses:
```bash
curl -s http://127.0.0.1:1090/config | jq '.tor_nodes'
curl -s http://127.0.0.1:1090/status | jq '.tor_nodes'
```
## Multi-Listener Mode
Run multiple listeners on different ports, each with a different number
+4
View File
@@ -70,6 +70,8 @@ def _handle_status(ctx: dict) -> tuple[int, dict]:
if pool:
data["pool"] = {"alive": pool.alive_count, "total": pool.count}
config = ctx.get("config")
if config and config.tor_nodes:
data["tor_nodes"] = [str(n) for n in config.tor_nodes]
if config:
data["listeners"] = [
{
@@ -136,6 +138,8 @@ def _handle_config(ctx: dict) -> tuple[int, dict]:
for lc in config.listeners
],
}
if config.tor_nodes:
data["tor_nodes"] = [str(n) for n in config.tor_nodes]
if config.proxy_pool:
pp = config.proxy_pool
sources = []
+5
View File
@@ -105,6 +105,7 @@ class Config:
api_port: int = 0
proxy_pool: ProxyPoolConfig | None = None
tor: TorConfig | None = None
tor_nodes: list[ChainHop] = field(default_factory=list)
config_file: str = ""
@@ -267,6 +268,10 @@ def load_config(path: str | Path) -> Config:
newnym_interval=float(tor_raw.get("newnym_interval", 0)),
)
# -- tor_nodes -------------------------------------------------------
if "tor_nodes" in raw:
config.tor_nodes = [parse_proxy_url(u) for u in raw["tor_nodes"]]
# -- listeners -------------------------------------------------------
if "listeners" in raw:
for entry in raw["listeners"]:
+14 -2
View File
@@ -54,9 +54,12 @@ class ProxyPool:
cfg: ProxyPoolConfig,
chain: list[ChainHop],
timeout: float,
chain_nodes: list[ChainHop] | None = None,
) -> None:
self._cfg = cfg
self._chain = list(chain)
self._chain_nodes = chain_nodes or []
self._chain_idx = 0
self._timeout = timeout
self._proxies: dict[str, ProxyEntry] = {}
self._alive_keys: list[str] = []
@@ -66,6 +69,15 @@ class ProxyPool:
self._ssl_ctx = ssl.create_default_context()
self._target_idx = 0
def _effective_chain(self) -> list[ChainHop]:
"""Return chain with first hop rotated across tor_nodes (if configured)."""
if not self._chain_nodes or not self._chain:
return self._chain
chain = list(self._chain)
chain[0] = self._chain_nodes[self._chain_idx % len(self._chain_nodes)]
self._chain_idx += 1
return chain
# -- public interface ----------------------------------------------------
async def start(self) -> None:
@@ -268,13 +280,13 @@ class ProxyPool:
"""Test a single proxy via TLS handshake through the full chain."""
entry.last_test = time.time()
entry.tests += 1
return await self._tls_check(self._chain + [entry.hop])
return await self._tls_check(self._effective_chain() + [entry.hop])
async def _test_chain(self) -> bool:
"""Test the static chain without any pool proxy."""
if not self._chain:
return True
return await self._tls_check(self._chain)
return await self._tls_check(self._effective_chain())
async def _run_health_tests(self, keys: list[str] | None = None) -> None:
"""Test proxies with bounded concurrency.
+46 -3
View File
@@ -21,6 +21,19 @@ logger = logging.getLogger("s5p")
BUFFER_SIZE = 65536
class _RoundRobin:
"""Simple round-robin selector (single-threaded asyncio, no lock)."""
def __init__(self, items: list[ChainHop]) -> None:
self._items = items
self._idx = 0
def next(self) -> ChainHop:
item = self._items[self._idx % len(self._items)]
self._idx += 1
return item
# -- relay -------------------------------------------------------------------
@@ -66,6 +79,8 @@ async def _handle_client(
proxy_pool: ProxyPool | None = None,
metrics: Metrics | None = None,
first_hop_pool: FirstHopPool | None = None,
tor_rr: _RoundRobin | None = None,
hop_pools: dict[tuple[str, int], FirstHopPool] | None = None,
) -> None:
"""Handle a single SOCKS5 client connection."""
peer = client_writer.get_extra_info("peername")
@@ -109,6 +124,13 @@ async def _handle_client(
for attempt in range(attempts):
effective_chain = list(listener.chain)
fhp = first_hop_pool
if tor_rr and effective_chain:
node = tor_rr.next()
effective_chain[0] = node
if hop_pools:
fhp = hop_pools.get((node.host, node.port))
pool_hops: list[ChainHop] = []
if proxy_pool and listener.pool_hops > 0:
for _ in range(listener.pool_hops):
@@ -123,7 +145,7 @@ async def _handle_client(
t0 = time.monotonic()
remote_reader, remote_writer = await build_chain(
effective_chain, target_host, target_port,
timeout=timeout, first_hop_pool=first_hop_pool,
timeout=timeout, first_hop_pool=fhp,
)
dt = time.monotonic() - t0
logger.debug("[%s] chain up in %.0fms", tag, dt * 1000)
@@ -238,12 +260,22 @@ async def serve(config: Config) -> None:
metrics = Metrics()
listeners = config.listeners
# -- tor_nodes round-robin -----------------------------------------------
tor_rr: _RoundRobin | None = None
if config.tor_nodes:
tor_rr = _RoundRobin(config.tor_nodes)
nodes = ", ".join(str(n) for n in config.tor_nodes)
logger.info("tor_nodes: %s (round-robin)", nodes)
# -- shared proxy pool ---------------------------------------------------
proxy_pool: ProxyPool | None = None
if config.proxy_pool and config.proxy_pool.sources:
# use first listener's chain as base chain for pool health tests
base_chain = listeners[0].chain if listeners else config.chain
proxy_pool = ProxyPool(config.proxy_pool, base_chain, config.timeout)
proxy_pool = ProxyPool(
config.proxy_pool, base_chain, config.timeout,
chain_nodes=config.tor_nodes or None,
)
await proxy_pool.start()
# -- per-unique first-hop connection pools --------------------------------
@@ -260,6 +292,17 @@ async def serve(config: Config) -> None:
)
await hp.start()
hop_pools[key] = hp
# create pools for all tor_nodes
if config.tor_nodes:
for node in config.tor_nodes:
key = (node.host, node.port)
if key not in hop_pools:
hp = FirstHopPool(
node, size=config.pool_size,
max_idle=config.pool_max_idle,
)
await hp.start()
hop_pools[key] = hp
def _hop_pool_for(lc: ListenerConfig) -> FirstHopPool | None:
if not lc.chain:
@@ -297,7 +340,7 @@ async def serve(config: Config) -> None:
async with sem:
await _handle_client(
r, w, _lc, config.timeout, config.retries,
proxy_pool, metrics, _hp,
proxy_pool, metrics, _hp, tor_rr, hop_pools,
)
srv = await asyncio.start_server(on_client, lc.listen_host, lc.listen_port)
+45
View File
@@ -255,6 +255,51 @@ class TestHandleConfig:
assert body["proxy_pool"]["sources"][0]["url"] == "http://api:8081/proxies"
assert body["listeners"][0]["pool_hops"] == 1
def test_with_tor_nodes(self):
config = Config(
tor_nodes=[
ChainHop("socks5", "10.200.1.1", 9050),
ChainHop("socks5", "10.200.1.13", 9050),
],
listeners=[ListenerConfig()],
)
ctx = _make_ctx(config=config)
_, body = _handle_config(ctx)
assert body["tor_nodes"] == [
"socks5://10.200.1.1:9050",
"socks5://10.200.1.13:9050",
]
def test_no_tor_nodes(self):
config = Config(listeners=[ListenerConfig()])
ctx = _make_ctx(config=config)
_, body = _handle_config(ctx)
assert "tor_nodes" not in body
class TestHandleStatusTorNodes:
"""Test tor_nodes in GET /status response."""
def test_tor_nodes_in_status(self):
config = Config(
tor_nodes=[
ChainHop("socks5", "10.200.1.1", 9050),
ChainHop("socks5", "10.200.1.13", 9050),
],
listeners=[ListenerConfig()],
)
ctx = _make_ctx(config=config)
_, body = _handle_status(ctx)
assert body["tor_nodes"] == [
"socks5://10.200.1.1:9050",
"socks5://10.200.1.13:9050",
]
def test_no_tor_nodes_in_status(self):
ctx = _make_ctx()
_, body = _handle_status(ctx)
assert "tor_nodes" not in body
# -- routing -----------------------------------------------------------------
+26
View File
@@ -135,6 +135,7 @@ class TestConfig:
assert c.max_connections == 256
assert c.pool_size == 0
assert c.pool_max_idle == 30.0
assert c.tor_nodes == []
def test_max_connections_from_yaml(self, tmp_path):
cfg_file = tmp_path / "test.yaml"
@@ -221,6 +222,31 @@ class TestConfig:
]
class TestTorNodes:
"""Test tor_nodes config parsing."""
def test_tor_nodes_from_yaml(self, tmp_path):
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"tor_nodes:\n"
" - socks5://10.200.1.1:9050\n"
" - socks5://10.200.1.254:9050\n"
" - socks5://10.200.1.13:9050\n"
)
c = load_config(cfg_file)
assert len(c.tor_nodes) == 3
assert c.tor_nodes[0].host == "10.200.1.1"
assert c.tor_nodes[0].port == 9050
assert c.tor_nodes[1].host == "10.200.1.254"
assert c.tor_nodes[2].host == "10.200.1.13"
def test_no_tor_nodes(self, tmp_path):
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text("listen: 1080\n")
c = load_config(cfg_file)
assert c.tor_nodes == []
class TestListenerConfig:
"""Test multi-listener config parsing."""
+32
View File
@@ -22,6 +22,38 @@ class TestProxyEntry:
assert entry.tests == 0
class TestEffectiveChain:
"""Test chain_nodes round-robin in pool health tests."""
def test_no_nodes_returns_original(self):
cfg = ProxyPoolConfig(sources=[])
chain = [ChainHop(proto="socks5", host="10.0.0.1", port=9050)]
pool = ProxyPool(cfg, chain, timeout=10.0)
assert pool._effective_chain() == chain
def test_round_robin_across_nodes(self):
cfg = ProxyPoolConfig(sources=[])
chain = [ChainHop(proto="socks5", host="original", port=9050)]
nodes = [
ChainHop(proto="socks5", host="node-a", port=9050),
ChainHop(proto="socks5", host="node-b", port=9050),
ChainHop(proto="socks5", host="node-c", port=9050),
]
pool = ProxyPool(cfg, chain, timeout=10.0, chain_nodes=nodes)
hosts = [pool._effective_chain()[0].host for _ in range(6)]
assert hosts == [
"node-a", "node-b", "node-c",
"node-a", "node-b", "node-c",
]
def test_empty_chain_no_replacement(self):
cfg = ProxyPoolConfig(sources=[])
nodes = [ChainHop(proto="socks5", host="node-a", port=9050)]
pool = ProxyPool(cfg, [], timeout=10.0, chain_nodes=nodes)
assert pool._effective_chain() == []
class TestProxyPoolMerge:
"""Test proxy deduplication and merge."""