diff --git a/config/example.yaml b/config/example.yaml index a73b822..f64f802 100644 --- a/config/example.yaml +++ b/config/example.yaml @@ -50,6 +50,15 @@ chain: # cookie_file: "" # CookieAuthentication file path # newnym_interval: 0 # periodic NEWNYM (seconds, 0 = manual only) +# Multi-Tor round-robin -- distribute traffic across multiple Tor nodes. +# When present, the first hop in each listener's chain is replaced at +# connection time by round-robin selection from this list. +# tor_nodes: +# - socks5://10.200.1.1:9050 +# - socks5://10.200.1.254:9050 +# - socks5://10.200.1.250:9050 +# - socks5://10.200.1.13:9050 + # Multi-listener mode -- each listener gets its own address and chain. # The "pool" keyword in a chain appends a random alive proxy from the pool. # Multiple "pool" entries = multiple pool hops (deeper chaining). diff --git a/docs/CHEATSHEET.md b/docs/CHEATSHEET.md index bd97f55..af2696c 100644 --- a/docs/CHEATSHEET.md +++ b/docs/CHEATSHEET.md @@ -56,6 +56,16 @@ listeners: - socks5://127.0.0.1:9050 # Tor only ``` +## Multi-Tor Round-Robin (config) + +```yaml +tor_nodes: # overrides first hop in all listeners + - socks5://10.200.1.1:9050 + - socks5://10.200.1.254:9050 + - socks5://10.200.1.250:9050 + - socks5://10.200.1.13:9050 +``` + ## Performance Tuning (config) ```yaml diff --git a/docs/USAGE.md b/docs/USAGE.md index 834ca98..85ac066 100644 --- a/docs/USAGE.md +++ b/docs/USAGE.md @@ -82,6 +82,36 @@ proxy_pool: state_file: "" # empty = ~/.cache/s5p/pool.json ``` +## Multi-Tor Round-Robin + +Distribute traffic across multiple Tor nodes instead of funneling everything +through a single one. When `tor_nodes` is configured, the first hop in each +listener's chain is replaced at connection time by round-robin selection. +Health tests also rotate across all nodes. + +```yaml +tor_nodes: + - socks5://10.200.1.1:9050 + - socks5://10.200.1.254:9050 + - socks5://10.200.1.250:9050 + - socks5://10.200.1.13:9050 +``` + +When `tor_nodes` is absent, listeners use their configured first hop as before. +When present, `tor_nodes` overrides the first hop everywhere. + +If `pool_size > 0`, pre-warmed connection pools are created for all nodes +automatically. + +### API + +`tor_nodes` appears in both `/config` and `/status` responses: + +```bash +curl -s http://127.0.0.1:1090/config | jq '.tor_nodes' +curl -s http://127.0.0.1:1090/status | jq '.tor_nodes' +``` + ## Multi-Listener Mode Run multiple listeners on different ports, each with a different number diff --git a/src/s5p/api.py b/src/s5p/api.py index cdf16ad..66fff3e 100644 --- a/src/s5p/api.py +++ b/src/s5p/api.py @@ -70,6 +70,8 @@ def _handle_status(ctx: dict) -> tuple[int, dict]: if pool: data["pool"] = {"alive": pool.alive_count, "total": pool.count} config = ctx.get("config") + if config and config.tor_nodes: + data["tor_nodes"] = [str(n) for n in config.tor_nodes] if config: data["listeners"] = [ { @@ -136,6 +138,8 @@ def _handle_config(ctx: dict) -> tuple[int, dict]: for lc in config.listeners ], } + if config.tor_nodes: + data["tor_nodes"] = [str(n) for n in config.tor_nodes] if config.proxy_pool: pp = config.proxy_pool sources = [] diff --git a/src/s5p/config.py b/src/s5p/config.py index ea5e645..8858c24 100644 --- a/src/s5p/config.py +++ b/src/s5p/config.py @@ -105,6 +105,7 @@ class Config: api_port: int = 0 proxy_pool: ProxyPoolConfig | None = None tor: TorConfig | None = None + tor_nodes: list[ChainHop] = field(default_factory=list) config_file: str = "" @@ -267,6 +268,10 @@ def load_config(path: str | Path) -> Config: newnym_interval=float(tor_raw.get("newnym_interval", 0)), ) + # -- tor_nodes ------------------------------------------------------- + if "tor_nodes" in raw: + config.tor_nodes = [parse_proxy_url(u) for u in raw["tor_nodes"]] + # -- listeners ------------------------------------------------------- if "listeners" in raw: for entry in raw["listeners"]: diff --git a/src/s5p/pool.py b/src/s5p/pool.py index 3ebd1e5..bf9a5ee 100644 --- a/src/s5p/pool.py +++ b/src/s5p/pool.py @@ -54,9 +54,12 @@ class ProxyPool: cfg: ProxyPoolConfig, chain: list[ChainHop], timeout: float, + chain_nodes: list[ChainHop] | None = None, ) -> None: self._cfg = cfg self._chain = list(chain) + self._chain_nodes = chain_nodes or [] + self._chain_idx = 0 self._timeout = timeout self._proxies: dict[str, ProxyEntry] = {} self._alive_keys: list[str] = [] @@ -66,6 +69,15 @@ class ProxyPool: self._ssl_ctx = ssl.create_default_context() self._target_idx = 0 + def _effective_chain(self) -> list[ChainHop]: + """Return chain with first hop rotated across tor_nodes (if configured).""" + if not self._chain_nodes or not self._chain: + return self._chain + chain = list(self._chain) + chain[0] = self._chain_nodes[self._chain_idx % len(self._chain_nodes)] + self._chain_idx += 1 + return chain + # -- public interface ---------------------------------------------------- async def start(self) -> None: @@ -268,13 +280,13 @@ class ProxyPool: """Test a single proxy via TLS handshake through the full chain.""" entry.last_test = time.time() entry.tests += 1 - return await self._tls_check(self._chain + [entry.hop]) + return await self._tls_check(self._effective_chain() + [entry.hop]) async def _test_chain(self) -> bool: """Test the static chain without any pool proxy.""" if not self._chain: return True - return await self._tls_check(self._chain) + return await self._tls_check(self._effective_chain()) async def _run_health_tests(self, keys: list[str] | None = None) -> None: """Test proxies with bounded concurrency. diff --git a/src/s5p/server.py b/src/s5p/server.py index 641e644..1fe1e9c 100644 --- a/src/s5p/server.py +++ b/src/s5p/server.py @@ -21,6 +21,19 @@ logger = logging.getLogger("s5p") BUFFER_SIZE = 65536 +class _RoundRobin: + """Simple round-robin selector (single-threaded asyncio, no lock).""" + + def __init__(self, items: list[ChainHop]) -> None: + self._items = items + self._idx = 0 + + def next(self) -> ChainHop: + item = self._items[self._idx % len(self._items)] + self._idx += 1 + return item + + # -- relay ------------------------------------------------------------------- @@ -66,6 +79,8 @@ async def _handle_client( proxy_pool: ProxyPool | None = None, metrics: Metrics | None = None, first_hop_pool: FirstHopPool | None = None, + tor_rr: _RoundRobin | None = None, + hop_pools: dict[tuple[str, int], FirstHopPool] | None = None, ) -> None: """Handle a single SOCKS5 client connection.""" peer = client_writer.get_extra_info("peername") @@ -109,6 +124,13 @@ async def _handle_client( for attempt in range(attempts): effective_chain = list(listener.chain) + fhp = first_hop_pool + if tor_rr and effective_chain: + node = tor_rr.next() + effective_chain[0] = node + if hop_pools: + fhp = hop_pools.get((node.host, node.port)) + pool_hops: list[ChainHop] = [] if proxy_pool and listener.pool_hops > 0: for _ in range(listener.pool_hops): @@ -123,7 +145,7 @@ async def _handle_client( t0 = time.monotonic() remote_reader, remote_writer = await build_chain( effective_chain, target_host, target_port, - timeout=timeout, first_hop_pool=first_hop_pool, + timeout=timeout, first_hop_pool=fhp, ) dt = time.monotonic() - t0 logger.debug("[%s] chain up in %.0fms", tag, dt * 1000) @@ -238,12 +260,22 @@ async def serve(config: Config) -> None: metrics = Metrics() listeners = config.listeners + # -- tor_nodes round-robin ----------------------------------------------- + tor_rr: _RoundRobin | None = None + if config.tor_nodes: + tor_rr = _RoundRobin(config.tor_nodes) + nodes = ", ".join(str(n) for n in config.tor_nodes) + logger.info("tor_nodes: %s (round-robin)", nodes) + # -- shared proxy pool --------------------------------------------------- proxy_pool: ProxyPool | None = None if config.proxy_pool and config.proxy_pool.sources: # use first listener's chain as base chain for pool health tests base_chain = listeners[0].chain if listeners else config.chain - proxy_pool = ProxyPool(config.proxy_pool, base_chain, config.timeout) + proxy_pool = ProxyPool( + config.proxy_pool, base_chain, config.timeout, + chain_nodes=config.tor_nodes or None, + ) await proxy_pool.start() # -- per-unique first-hop connection pools -------------------------------- @@ -260,6 +292,17 @@ async def serve(config: Config) -> None: ) await hp.start() hop_pools[key] = hp + # create pools for all tor_nodes + if config.tor_nodes: + for node in config.tor_nodes: + key = (node.host, node.port) + if key not in hop_pools: + hp = FirstHopPool( + node, size=config.pool_size, + max_idle=config.pool_max_idle, + ) + await hp.start() + hop_pools[key] = hp def _hop_pool_for(lc: ListenerConfig) -> FirstHopPool | None: if not lc.chain: @@ -297,7 +340,7 @@ async def serve(config: Config) -> None: async with sem: await _handle_client( r, w, _lc, config.timeout, config.retries, - proxy_pool, metrics, _hp, + proxy_pool, metrics, _hp, tor_rr, hop_pools, ) srv = await asyncio.start_server(on_client, lc.listen_host, lc.listen_port) diff --git a/tests/test_api.py b/tests/test_api.py index 8ff4027..fce8b22 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -255,6 +255,51 @@ class TestHandleConfig: assert body["proxy_pool"]["sources"][0]["url"] == "http://api:8081/proxies" assert body["listeners"][0]["pool_hops"] == 1 + def test_with_tor_nodes(self): + config = Config( + tor_nodes=[ + ChainHop("socks5", "10.200.1.1", 9050), + ChainHop("socks5", "10.200.1.13", 9050), + ], + listeners=[ListenerConfig()], + ) + ctx = _make_ctx(config=config) + _, body = _handle_config(ctx) + assert body["tor_nodes"] == [ + "socks5://10.200.1.1:9050", + "socks5://10.200.1.13:9050", + ] + + def test_no_tor_nodes(self): + config = Config(listeners=[ListenerConfig()]) + ctx = _make_ctx(config=config) + _, body = _handle_config(ctx) + assert "tor_nodes" not in body + + +class TestHandleStatusTorNodes: + """Test tor_nodes in GET /status response.""" + + def test_tor_nodes_in_status(self): + config = Config( + tor_nodes=[ + ChainHop("socks5", "10.200.1.1", 9050), + ChainHop("socks5", "10.200.1.13", 9050), + ], + listeners=[ListenerConfig()], + ) + ctx = _make_ctx(config=config) + _, body = _handle_status(ctx) + assert body["tor_nodes"] == [ + "socks5://10.200.1.1:9050", + "socks5://10.200.1.13:9050", + ] + + def test_no_tor_nodes_in_status(self): + ctx = _make_ctx() + _, body = _handle_status(ctx) + assert "tor_nodes" not in body + # -- routing ----------------------------------------------------------------- diff --git a/tests/test_config.py b/tests/test_config.py index 5de923d..e621b3c 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -135,6 +135,7 @@ class TestConfig: assert c.max_connections == 256 assert c.pool_size == 0 assert c.pool_max_idle == 30.0 + assert c.tor_nodes == [] def test_max_connections_from_yaml(self, tmp_path): cfg_file = tmp_path / "test.yaml" @@ -221,6 +222,31 @@ class TestConfig: ] +class TestTorNodes: + """Test tor_nodes config parsing.""" + + def test_tor_nodes_from_yaml(self, tmp_path): + cfg_file = tmp_path / "test.yaml" + cfg_file.write_text( + "tor_nodes:\n" + " - socks5://10.200.1.1:9050\n" + " - socks5://10.200.1.254:9050\n" + " - socks5://10.200.1.13:9050\n" + ) + c = load_config(cfg_file) + assert len(c.tor_nodes) == 3 + assert c.tor_nodes[0].host == "10.200.1.1" + assert c.tor_nodes[0].port == 9050 + assert c.tor_nodes[1].host == "10.200.1.254" + assert c.tor_nodes[2].host == "10.200.1.13" + + def test_no_tor_nodes(self, tmp_path): + cfg_file = tmp_path / "test.yaml" + cfg_file.write_text("listen: 1080\n") + c = load_config(cfg_file) + assert c.tor_nodes == [] + + class TestListenerConfig: """Test multi-listener config parsing.""" diff --git a/tests/test_pool.py b/tests/test_pool.py index 66eb4d2..6b78e5d 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -22,6 +22,38 @@ class TestProxyEntry: assert entry.tests == 0 +class TestEffectiveChain: + """Test chain_nodes round-robin in pool health tests.""" + + def test_no_nodes_returns_original(self): + cfg = ProxyPoolConfig(sources=[]) + chain = [ChainHop(proto="socks5", host="10.0.0.1", port=9050)] + pool = ProxyPool(cfg, chain, timeout=10.0) + assert pool._effective_chain() == chain + + def test_round_robin_across_nodes(self): + cfg = ProxyPoolConfig(sources=[]) + chain = [ChainHop(proto="socks5", host="original", port=9050)] + nodes = [ + ChainHop(proto="socks5", host="node-a", port=9050), + ChainHop(proto="socks5", host="node-b", port=9050), + ChainHop(proto="socks5", host="node-c", port=9050), + ] + pool = ProxyPool(cfg, chain, timeout=10.0, chain_nodes=nodes) + + hosts = [pool._effective_chain()[0].host for _ in range(6)] + assert hosts == [ + "node-a", "node-b", "node-c", + "node-a", "node-b", "node-c", + ] + + def test_empty_chain_no_replacement(self): + cfg = ProxyPoolConfig(sources=[]) + nodes = [ChainHop(proto="socks5", host="node-a", port=9050)] + pool = ProxyPool(cfg, [], timeout=10.0, chain_nodes=nodes) + assert pool._effective_chain() == [] + + class TestProxyPoolMerge: """Test proxy deduplication and merge."""