From ef0d8f347b3be39893dfb86e6f32cc7b7df0b74a Mon Sep 17 00:00:00 2001 From: user Date: Fri, 20 Feb 2026 17:50:17 +0100 Subject: [PATCH] feat: add per-hop pool references in listener chains Allow listeners to mix named pools in a single chain using pool:name syntax. Bare "pool" continues to use the listener's default pool. Replaces pool_hops field with pool_seq list; pool_hops is now a backward-compatible property. Each hop draws from its own pool and failure reporting targets the correct source pool. Co-Authored-By: Claude Opus 4.6 --- README.md | 13 ++---- config/example.yaml | 19 +++++--- docs/CHEATSHEET.md | 13 +++--- docs/USAGE.md | 30 +++++++++++++ src/s5p/api.py | 2 + src/s5p/config.py | 42 +++++++++++------ src/s5p/server.py | 62 ++++++++++++++------------ tests/test_api.py | 56 +++++++++++++++++++++-- tests/test_config.py | 104 +++++++++++++++++++++++++++++++++++++++++++ 9 files changed, 275 insertions(+), 66 deletions(-) diff --git a/README.md b/README.md index 4eab144..f6eb5e7 100644 --- a/README.md +++ b/README.md @@ -93,6 +93,7 @@ proxy_pools: max_fails: 3 # Multi-listener: each port gets a chain depth and pool assignment +# Use "pool" for listener default, "pool:name" for explicit pool per hop listeners: - listen: 0.0.0.0:1080 pool: clean @@ -104,16 +105,11 @@ listeners: pool: clean chain: - socks5://127.0.0.1:9050 - - pool # Tor + 1 clean proxy + - pool:clean # per-hop: clean pool + - pool:mitm # per-hop: mitm pool - listen: 0.0.0.0:1082 chain: - socks5://127.0.0.1:9050 # Tor only - - listen: 0.0.0.0:1083 - pool: mitm - chain: - - socks5://127.0.0.1:9050 - - pool # Tor + 2 MITM proxies - - pool # Singular proxy_pool: still works (becomes pool "default") @@ -150,9 +146,8 @@ Options: ``` :1080 Client -> s5p -> Tor -> [clean] -> [clean] -> Dest (2 clean hops) -:1081 Client -> s5p -> Tor -> [clean] -> Dest (1 clean hop) +:1081 Client -> s5p -> Tor -> [clean] -> [mitm] -> Dest (mixed pools) :1082 Client -> s5p -> Tor -> Dest (Tor only) -:1083 Client -> s5p -> Tor -> [mitm] -> [mitm] -> Dest (2 MITM hops) ``` s5p connects to Hop1 via TCP, negotiates the hop protocol (SOCKS5/4/HTTP), diff --git a/config/example.yaml b/config/example.yaml index 5b72b45..34d7fa6 100644 --- a/config/example.yaml +++ b/config/example.yaml @@ -87,30 +87,35 @@ chain: # a random alive proxy from the named pool (or "default" if unnamed). # Multiple "pool" entries = multiple pool hops (deeper chaining). # +# Per-hop pool references: use "pool:name" to draw from a specific pool +# at that hop position. Bare "pool" uses the listener's "pool:" default. +# This lets a single listener mix pools in one chain. +# # listeners: # - listen: 0.0.0.0:1080 -# pool: clean # draw from "clean" pool +# pool: clean # default for bare "pool" # chain: # - socks5://127.0.0.1:9050 -# - pool # Tor + 2 clean pool proxies -# - pool +# - pool:clean # explicit: from clean pool +# - pool:mitm # explicit: from mitm pool # # - listen: 0.0.0.0:1081 # pool: clean # chain: # - socks5://127.0.0.1:9050 -# - pool # Tor + 1 clean pool proxy +# - pool # bare: uses default "clean" +# - pool # # - listen: 0.0.0.0:1082 # chain: # - socks5://127.0.0.1:9050 # Tor only (no pool hops) # # - listen: 0.0.0.0:1083 -# pool: mitm # draw from "mitm" pool +# pool: clean # chain: # - socks5://127.0.0.1:9050 -# - pool # Tor + 2 MITM pool proxies -# - pool +# - pool # bare "pool" = clean +# - pool:mitm # explicit = mitm # # When using "listeners:", the top-level "listen" and "chain" keys are ignored. # If "listeners:" is absent, the old format is used (single listener). diff --git a/docs/CHEATSHEET.md b/docs/CHEATSHEET.md index f9dc461..9b8f298 100644 --- a/docs/CHEATSHEET.md +++ b/docs/CHEATSHEET.md @@ -43,7 +43,7 @@ cp config/example.yaml config/s5p.yaml # create live config (gitignored) ```yaml listeners: - listen: 0.0.0.0:1080 - pool: clean # named pool assignment + pool: clean # default for bare "pool" chain: - socks5://127.0.0.1:9050 - pool # Tor + 2 clean hops @@ -52,18 +52,21 @@ listeners: pool: clean chain: - socks5://127.0.0.1:9050 - - pool # Tor + 1 clean hop + - pool:clean # per-hop: explicit clean + - pool:mitm # per-hop: explicit mitm - listen: 0.0.0.0:1082 chain: - socks5://127.0.0.1:9050 # Tor only - listen: 0.0.0.0:1083 - pool: mitm # MITM-capable proxies + pool: clean chain: - socks5://127.0.0.1:9050 - - pool - - pool + - pool # bare = clean (default) + - pool:mitm # explicit = mitm ``` +Per-hop pool: `pool` = listener default, `pool:name` = explicit pool. + ## Multi-Tor Round-Robin (config) ```yaml diff --git a/docs/USAGE.md b/docs/USAGE.md index ca31ed7..469d2d9 100644 --- a/docs/USAGE.md +++ b/docs/USAGE.md @@ -177,6 +177,36 @@ listeners: - pool ``` +### Per-hop pool references + +Use `pool:name` to draw from a specific named pool at that hop position. +Bare `pool` uses the listener's `pool:` default. This lets a single listener +mix pools in one chain. + +```yaml +listeners: + - listen: 0.0.0.0:1080 + pool: clean # default for bare "pool" + chain: + - socks5://10.200.1.13:9050 + - pool:clean # explicit: from clean pool + - pool:mitm # explicit: from mitm pool + + - listen: 0.0.0.0:1081 + pool: clean + chain: + - socks5://10.200.1.13:9050 + - pool # bare: uses default "clean" + - pool:mitm # explicit: from mitm pool +``` + +| Syntax | Resolves to | +|--------|-------------| +| `pool` | Listener's `pool:` value, or `"default"` if unset | +| `pool:name` | Named pool `name` (case-sensitive) | +| `pool:` | Same as bare `pool` (empty name = default) | +| `Pool:name` | Prefix is case-insensitive; name is case-sensitive | + The `pool` keyword in a chain means "append a random alive proxy from the assigned pool". Multiple `pool` entries = multiple pool hops (deeper chaining). diff --git a/src/s5p/api.py b/src/s5p/api.py index 19d0113..3001a84 100644 --- a/src/s5p/api.py +++ b/src/s5p/api.py @@ -88,6 +88,7 @@ def _handle_status(ctx: dict) -> tuple[int, dict]: "chain": [str(h) for h in lc.chain], "pool_hops": lc.pool_hops, **({"pool": lc.pool_name} if lc.pool_name else {}), + **({"pool_seq": lc.pool_seq} if len(set(lc.pool_seq)) > 1 else {}), "latency": metrics.get_listener_latency( f"{lc.listen_host}:{lc.listen_port}" ).stats(), @@ -166,6 +167,7 @@ def _handle_config(ctx: dict) -> tuple[int, dict]: "chain": [str(h) for h in lc.chain], "pool_hops": lc.pool_hops, **({"pool": lc.pool_name} if lc.pool_name else {}), + **({"pool_seq": lc.pool_seq} if len(set(lc.pool_seq)) > 1 else {}), } for lc in config.listeners ], diff --git a/src/s5p/config.py b/src/s5p/config.py index 9c7c0c1..1fad28d 100644 --- a/src/s5p/config.py +++ b/src/s5p/config.py @@ -85,9 +85,14 @@ class ListenerConfig: listen_host: str = "127.0.0.1" listen_port: int = 1080 chain: list[ChainHop] = field(default_factory=list) - pool_hops: int = 0 + pool_seq: list[str] = field(default_factory=list) pool_name: str = "" + @property + def pool_hops(self) -> int: + """Number of pool hops (backward compat).""" + return len(self.pool_seq) + @dataclass class Config: @@ -306,20 +311,29 @@ def load_config(path: str | Path) -> Config: lc.pool_name = entry["pool"] chain_raw = entry.get("chain", []) for item in chain_raw: - if isinstance(item, str) and item.lower() == "pool": - lc.pool_hops += 1 - elif isinstance(item, str): - lc.chain.append(parse_proxy_url(item)) + if isinstance(item, str): + lower = item.lower() + if lower == "pool" or lower.startswith("pool:"): + _, _, name = item.partition(":") + lc.pool_seq.append(name if name else (lc.pool_name or "default")) + else: + lc.chain.append(parse_proxy_url(item)) elif isinstance(item, dict): - lc.chain.append( - ChainHop( - proto=item.get("proto", "socks5"), - host=item["host"], - port=int(item["port"]), - username=item.get("username"), - password=item.get("password"), + # YAML parses "pool:" and "pool: name" as dicts + pool_key = next((k for k in item if k.lower() == "pool"), None) + if pool_key is not None and len(item) == 1: + name = item[pool_key] + lc.pool_seq.append(name if name else (lc.pool_name or "default")) + else: + lc.chain.append( + ChainHop( + proto=item.get("proto", "socks5"), + host=item["host"], + port=int(item["port"]), + username=item.get("username"), + password=item.get("password"), + ) ) - ) config.listeners.append(lc) else: # backward compat: build single listener from top-level fields @@ -330,7 +344,7 @@ def load_config(path: str | Path) -> Config: ) # legacy behavior: if proxy_pool configured, auto-append 1 pool hop if config.proxy_pool and config.proxy_pool.sources: - lc.pool_hops = 1 + lc.pool_seq = ["default"] config.listeners.append(lc) return config diff --git a/src/s5p/server.py b/src/s5p/server.py index 0046559..14fcbef 100644 --- a/src/s5p/server.py +++ b/src/s5p/server.py @@ -76,7 +76,7 @@ async def _handle_client( listener: ListenerConfig, timeout: float, retries: int, - proxy_pool: ProxyPool | None = None, + pool_seq: list[ProxyPool] | None = None, metrics: Metrics | None = None, first_hop_pool: FirstHopPool | None = None, tor_rr: _RoundRobin | None = None, @@ -119,7 +119,7 @@ async def _handle_client( logger.info("[%s] connect %s:%d", tag, target_host, target_port) # -- build chain (with retry) -- - attempts = retries if proxy_pool and listener.pool_hops > 0 else 1 + attempts = retries if pool_seq else 1 last_err: Exception | None = None for attempt in range(attempts): @@ -131,15 +131,15 @@ async def _handle_client( if hop_pools: fhp = hop_pools.get((node.host, node.port)) - pool_hops: list[ChainHop] = [] - if proxy_pool and listener.pool_hops > 0: - for _ in range(listener.pool_hops): - hop = await proxy_pool.get() + pool_hops: list[tuple[ChainHop, ProxyPool]] = [] + if pool_seq: + for pp in pool_seq: + hop = await pp.get() if hop: - pool_hops.append(hop) + pool_hops.append((hop, pp)) effective_chain.append(hop) if pool_hops: - logger.debug("[%s] +pool %s", tag, " ".join(str(h) for h in pool_hops)) + logger.debug("[%s] +pool %s", tag, " ".join(str(h) for h, _ in pool_hops)) try: t0 = time.monotonic() @@ -157,9 +157,9 @@ async def _handle_client( break except (ProtoError, TimeoutError, ConnectionError, OSError) as e: last_err = e - if pool_hops and proxy_pool: - for hop in pool_hops: - proxy_pool.report_failure(hop) + if pool_hops: + for hop, pp in pool_hops: + pp.report_failure(hop) if metrics: metrics.retries += 1 if attempt + 1 < attempts: @@ -295,17 +295,17 @@ async def serve(config: Config) -> None: await pool.start() proxy_pools["default"] = pool - def _pool_for(lc: ListenerConfig) -> ProxyPool | None: - """Resolve the proxy pool for a listener.""" - if lc.pool_hops <= 0: - return None - name = lc.pool_name or "default" - if name not in proxy_pools: - raise RuntimeError( - f"listener {lc.listen_host}:{lc.listen_port} " - f"references unknown pool {name!r}" - ) - return proxy_pools[name] + def _pools_for(lc: ListenerConfig) -> list[ProxyPool]: + """Resolve the ordered list of proxy pools for a listener.""" + result: list[ProxyPool] = [] + for name in lc.pool_seq: + if name not in proxy_pools: + raise RuntimeError( + f"listener {lc.listen_host}:{lc.listen_port} " + f"references unknown pool {name!r}" + ) + result.append(proxy_pools[name]) + return result # -- per-unique first-hop connection pools -------------------------------- hop_pools: dict[tuple[str, int], FirstHopPool] = {} @@ -361,17 +361,17 @@ async def serve(config: Config) -> None: servers: list[asyncio.Server] = [] for lc in listeners: hp = _hop_pool_for(lc) - lc_pool = _pool_for(lc) + lc_pools = _pools_for(lc) async def on_client( r: asyncio.StreamReader, w: asyncio.StreamWriter, _lc: ListenerConfig = lc, _hp: FirstHopPool | None = hp, - _pool: ProxyPool | None = lc_pool, + _pools: list[ProxyPool] = lc_pools, ) -> None: async with sem: await _handle_client( r, w, _lc, config.timeout, config.retries, - _pool, metrics, _hp, tor_rr, hop_pools, + _pools, metrics, _hp, tor_rr, hop_pools, ) srv = await asyncio.start_server(on_client, lc.listen_host, lc.listen_port) @@ -380,9 +380,15 @@ async def serve(config: Config) -> None: addr = f"{lc.listen_host}:{lc.listen_port}" chain_desc = " -> ".join(str(h) for h in lc.chain) if lc.chain else "direct" nhops = lc.pool_hops - pool_desc = f" + {nhops} pool hop{'s' if nhops != 1 else ''}" if nhops else "" - if lc_pool and lc_pool.name != "default": - pool_desc += f" [{lc_pool.name}]" + pool_desc = "" + if nhops: + distinct = list(dict.fromkeys(lc.pool_seq)) + if len(distinct) == 1: + pool_desc = f" + {nhops} pool hop{'s' if nhops != 1 else ''}" + if distinct[0] != "default": + pool_desc += f" [{distinct[0]}]" + else: + pool_desc = f" + pool [{' -> '.join(lc.pool_seq)}]" logger.info("listener %s chain: %s%s", addr, chain_desc, pool_desc) logger.info("max_connections=%d", config.max_connections) diff --git a/tests/test_api.py b/tests/test_api.py index e48a821..0e121b8 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -132,7 +132,7 @@ class TestHandleStatus: ListenerConfig( listen_host="0.0.0.0", listen_port=1081, chain=[ChainHop("socks5", "127.0.0.1", 9050)], - pool_hops=1, + pool_seq=["default"], ), ], ) @@ -170,6 +170,56 @@ class TestHandleStatusPools: assert body["pools"]["mitm"] == {"alive": 3, "total": 8} +class TestHandleStatusMultiPool: + """Test pool_seq appears in /status only for multi-pool listeners.""" + + def test_single_pool_no_pool_seq(self): + """Single-pool listener: no pool_seq in response.""" + config = Config( + listeners=[ + ListenerConfig( + listen_host="0.0.0.0", listen_port=1080, + chain=[ChainHop("socks5", "127.0.0.1", 9050)], + pool_seq=["clean", "clean"], pool_name="clean", + ), + ], + ) + ctx = _make_ctx(config=config) + _, body = _handle_status(ctx) + assert "pool_seq" not in body["listeners"][0] + + def test_multi_pool_has_pool_seq(self): + """Multi-pool listener: pool_seq appears in response.""" + config = Config( + listeners=[ + ListenerConfig( + listen_host="0.0.0.0", listen_port=1080, + chain=[ChainHop("socks5", "127.0.0.1", 9050)], + pool_seq=["clean", "mitm"], pool_name="clean", + ), + ], + ) + ctx = _make_ctx(config=config) + _, body = _handle_status(ctx) + assert body["listeners"][0]["pool_seq"] == ["clean", "mitm"] + assert body["listeners"][0]["pool_hops"] == 2 + + def test_multi_pool_in_config(self): + """Multi-pool listener: pool_seq appears in /config response.""" + config = Config( + listeners=[ + ListenerConfig( + listen_host="0.0.0.0", listen_port=1080, + chain=[ChainHop("socks5", "127.0.0.1", 9050)], + pool_seq=["clean", "mitm"], pool_name="clean", + ), + ], + ) + ctx = _make_ctx(config=config) + _, body = _handle_config(ctx) + assert body["listeners"][0]["pool_seq"] == ["clean", "mitm"] + + class TestHandleMetrics: """Test GET /metrics handler.""" @@ -318,7 +368,7 @@ class TestHandleConfig: proxy_pool=pp, listeners=[ListenerConfig( chain=[ChainHop("socks5", "127.0.0.1", 9050)], - pool_hops=1, + pool_seq=["default"], )], ) ctx = _make_ctx(config=config) @@ -345,7 +395,7 @@ class TestHandleConfig: listeners=[ListenerConfig( listen_host="0.0.0.0", listen_port=1080, chain=[ChainHop("socks5", "127.0.0.1", 9050)], - pool_hops=2, pool_name="clean", + pool_seq=["clean", "clean"], pool_name="clean", )], ) ctx = _make_ctx(config=config) diff --git a/tests/test_config.py b/tests/test_config.py index eccf721..eccff43 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -407,6 +407,110 @@ class TestListenerConfig: assert c.listeners[0].chain == [] +class TestPoolSeq: + """Test per-hop pool references (pool:name syntax).""" + + def test_bare_pool_uses_default_name(self, tmp_path): + """Bare `pool` + `pool: clean` -> pool_seq=["clean"].""" + cfg_file = tmp_path / "test.yaml" + cfg_file.write_text( + "listeners:\n" + " - listen: 1080\n" + " pool: clean\n" + " chain:\n" + " - pool\n" + ) + c = load_config(cfg_file) + assert c.listeners[0].pool_seq == ["clean"] + + def test_bare_pool_no_pool_name(self, tmp_path): + """Bare `pool` with no `pool:` key -> pool_seq=["default"].""" + cfg_file = tmp_path / "test.yaml" + cfg_file.write_text( + "listeners:\n" + " - listen: 1080\n" + " chain:\n" + " - pool\n" + ) + c = load_config(cfg_file) + assert c.listeners[0].pool_seq == ["default"] + + def test_pool_colon_name(self, tmp_path): + """`pool:clean, pool:mitm` -> pool_seq=["clean", "mitm"].""" + cfg_file = tmp_path / "test.yaml" + cfg_file.write_text( + "listeners:\n" + " - listen: 1080\n" + " chain:\n" + " - pool:clean\n" + " - pool:mitm\n" + ) + c = load_config(cfg_file) + assert c.listeners[0].pool_seq == ["clean", "mitm"] + + def test_mixed_bare_and_named(self, tmp_path): + """Bare `pool` + `pool:mitm` with `pool: clean` -> ["clean", "mitm"].""" + cfg_file = tmp_path / "test.yaml" + cfg_file.write_text( + "listeners:\n" + " - listen: 1080\n" + " pool: clean\n" + " chain:\n" + " - pool\n" + " - pool:mitm\n" + ) + c = load_config(cfg_file) + assert c.listeners[0].pool_seq == ["clean", "mitm"] + + def test_pool_colon_case_insensitive_prefix(self, tmp_path): + """`Pool:MyPool` -> pool_seq=["MyPool"] (prefix case-insensitive).""" + cfg_file = tmp_path / "test.yaml" + cfg_file.write_text( + "listeners:\n" + " - listen: 1080\n" + " chain:\n" + " - Pool:MyPool\n" + ) + c = load_config(cfg_file) + assert c.listeners[0].pool_seq == ["MyPool"] + + def test_pool_colon_empty_is_bare(self, tmp_path): + """`pool:` (empty name) -> treated as bare pool.""" + cfg_file = tmp_path / "test.yaml" + cfg_file.write_text( + "listeners:\n" + " - listen: 1080\n" + " pool: clean\n" + " chain:\n" + " - pool:\n" + ) + c = load_config(cfg_file) + assert c.listeners[0].pool_seq == ["clean"] + + def test_backward_compat_pool_hops_property(self): + """pool_hops property returns len(pool_seq).""" + lc = ListenerConfig(pool_seq=["clean", "mitm"]) + assert lc.pool_hops == 2 + lc2 = ListenerConfig() + assert lc2.pool_hops == 0 + + def test_legacy_auto_append(self, tmp_path): + """Singular `proxy_pool:` -> pool_seq=["default"].""" + cfg_file = tmp_path / "test.yaml" + cfg_file.write_text( + "listen: 0.0.0.0:1080\n" + "chain:\n" + " - socks5://127.0.0.1:9050\n" + "proxy_pool:\n" + " sources:\n" + " - url: http://api:8081/proxies\n" + ) + c = load_config(cfg_file) + lc = c.listeners[0] + assert lc.pool_seq == ["default"] + assert lc.pool_hops == 1 + + class TestListenerBackwardCompat: """Test backward-compatible single listener from old format."""