feat: add per-hop pool references in listener chains

Allow listeners to mix named pools in a single chain using pool:name
syntax. Bare "pool" continues to use the listener's default pool.
Replaces pool_hops field with pool_seq list; pool_hops is now a
backward-compatible property. Each hop draws from its own pool and
failure reporting targets the correct source pool.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
user
2026-02-20 17:50:17 +01:00
parent a1c238d4a1
commit ef0d8f347b
9 changed files with 275 additions and 66 deletions

View File

@@ -93,6 +93,7 @@ proxy_pools:
max_fails: 3 max_fails: 3
# Multi-listener: each port gets a chain depth and pool assignment # Multi-listener: each port gets a chain depth and pool assignment
# Use "pool" for listener default, "pool:name" for explicit pool per hop
listeners: listeners:
- listen: 0.0.0.0:1080 - listen: 0.0.0.0:1080
pool: clean pool: clean
@@ -104,16 +105,11 @@ listeners:
pool: clean pool: clean
chain: chain:
- socks5://127.0.0.1:9050 - socks5://127.0.0.1:9050
- pool # Tor + 1 clean proxy - pool:clean # per-hop: clean pool
- pool:mitm # per-hop: mitm pool
- listen: 0.0.0.0:1082 - listen: 0.0.0.0:1082
chain: chain:
- socks5://127.0.0.1:9050 # Tor only - socks5://127.0.0.1:9050 # Tor only
- listen: 0.0.0.0:1083
pool: mitm
chain:
- socks5://127.0.0.1:9050
- pool # Tor + 2 MITM proxies
- pool
# Singular proxy_pool: still works (becomes pool "default") # Singular proxy_pool: still works (becomes pool "default")
@@ -150,9 +146,8 @@ Options:
``` ```
:1080 Client -> s5p -> Tor -> [clean] -> [clean] -> Dest (2 clean hops) :1080 Client -> s5p -> Tor -> [clean] -> [clean] -> Dest (2 clean hops)
:1081 Client -> s5p -> Tor -> [clean] -> Dest (1 clean hop) :1081 Client -> s5p -> Tor -> [clean] -> [mitm] -> Dest (mixed pools)
:1082 Client -> s5p -> Tor -> Dest (Tor only) :1082 Client -> s5p -> Tor -> Dest (Tor only)
:1083 Client -> s5p -> Tor -> [mitm] -> [mitm] -> Dest (2 MITM hops)
``` ```
s5p connects to Hop1 via TCP, negotiates the hop protocol (SOCKS5/4/HTTP), s5p connects to Hop1 via TCP, negotiates the hop protocol (SOCKS5/4/HTTP),

View File

@@ -87,30 +87,35 @@ chain:
# a random alive proxy from the named pool (or "default" if unnamed). # a random alive proxy from the named pool (or "default" if unnamed).
# Multiple "pool" entries = multiple pool hops (deeper chaining). # Multiple "pool" entries = multiple pool hops (deeper chaining).
# #
# Per-hop pool references: use "pool:name" to draw from a specific pool
# at that hop position. Bare "pool" uses the listener's "pool:" default.
# This lets a single listener mix pools in one chain.
#
# listeners: # listeners:
# - listen: 0.0.0.0:1080 # - listen: 0.0.0.0:1080
# pool: clean # draw from "clean" pool # pool: clean # default for bare "pool"
# chain: # chain:
# - socks5://127.0.0.1:9050 # - socks5://127.0.0.1:9050
# - pool # Tor + 2 clean pool proxies # - pool:clean # explicit: from clean pool
# - pool # - pool:mitm # explicit: from mitm pool
# #
# - listen: 0.0.0.0:1081 # - listen: 0.0.0.0:1081
# pool: clean # pool: clean
# chain: # chain:
# - socks5://127.0.0.1:9050 # - socks5://127.0.0.1:9050
# - pool # Tor + 1 clean pool proxy # - pool # bare: uses default "clean"
# - pool
# #
# - listen: 0.0.0.0:1082 # - listen: 0.0.0.0:1082
# chain: # chain:
# - socks5://127.0.0.1:9050 # Tor only (no pool hops) # - socks5://127.0.0.1:9050 # Tor only (no pool hops)
# #
# - listen: 0.0.0.0:1083 # - listen: 0.0.0.0:1083
# pool: mitm # draw from "mitm" pool # pool: clean
# chain: # chain:
# - socks5://127.0.0.1:9050 # - socks5://127.0.0.1:9050
# - pool # Tor + 2 MITM pool proxies # - pool # bare "pool" = clean
# - pool # - pool:mitm # explicit = mitm
# #
# When using "listeners:", the top-level "listen" and "chain" keys are ignored. # When using "listeners:", the top-level "listen" and "chain" keys are ignored.
# If "listeners:" is absent, the old format is used (single listener). # If "listeners:" is absent, the old format is used (single listener).

View File

@@ -43,7 +43,7 @@ cp config/example.yaml config/s5p.yaml # create live config (gitignored)
```yaml ```yaml
listeners: listeners:
- listen: 0.0.0.0:1080 - listen: 0.0.0.0:1080
pool: clean # named pool assignment pool: clean # default for bare "pool"
chain: chain:
- socks5://127.0.0.1:9050 - socks5://127.0.0.1:9050
- pool # Tor + 2 clean hops - pool # Tor + 2 clean hops
@@ -52,18 +52,21 @@ listeners:
pool: clean pool: clean
chain: chain:
- socks5://127.0.0.1:9050 - socks5://127.0.0.1:9050
- pool # Tor + 1 clean hop - pool:clean # per-hop: explicit clean
- pool:mitm # per-hop: explicit mitm
- listen: 0.0.0.0:1082 - listen: 0.0.0.0:1082
chain: chain:
- socks5://127.0.0.1:9050 # Tor only - socks5://127.0.0.1:9050 # Tor only
- listen: 0.0.0.0:1083 - listen: 0.0.0.0:1083
pool: mitm # MITM-capable proxies pool: clean
chain: chain:
- socks5://127.0.0.1:9050 - socks5://127.0.0.1:9050
- pool - pool # bare = clean (default)
- pool - pool:mitm # explicit = mitm
``` ```
Per-hop pool: `pool` = listener default, `pool:name` = explicit pool.
## Multi-Tor Round-Robin (config) ## Multi-Tor Round-Robin (config)
```yaml ```yaml

View File

@@ -177,6 +177,36 @@ listeners:
- pool - pool
``` ```
### Per-hop pool references
Use `pool:name` to draw from a specific named pool at that hop position.
Bare `pool` uses the listener's `pool:` default. This lets a single listener
mix pools in one chain.
```yaml
listeners:
- listen: 0.0.0.0:1080
pool: clean # default for bare "pool"
chain:
- socks5://10.200.1.13:9050
- pool:clean # explicit: from clean pool
- pool:mitm # explicit: from mitm pool
- listen: 0.0.0.0:1081
pool: clean
chain:
- socks5://10.200.1.13:9050
- pool # bare: uses default "clean"
- pool:mitm # explicit: from mitm pool
```
| Syntax | Resolves to |
|--------|-------------|
| `pool` | Listener's `pool:` value, or `"default"` if unset |
| `pool:name` | Named pool `name` (case-sensitive) |
| `pool:` | Same as bare `pool` (empty name = default) |
| `Pool:name` | Prefix is case-insensitive; name is case-sensitive |
The `pool` keyword in a chain means "append a random alive proxy from the The `pool` keyword in a chain means "append a random alive proxy from the
assigned pool". Multiple `pool` entries = multiple pool hops (deeper chaining). assigned pool". Multiple `pool` entries = multiple pool hops (deeper chaining).

View File

@@ -88,6 +88,7 @@ def _handle_status(ctx: dict) -> tuple[int, dict]:
"chain": [str(h) for h in lc.chain], "chain": [str(h) for h in lc.chain],
"pool_hops": lc.pool_hops, "pool_hops": lc.pool_hops,
**({"pool": lc.pool_name} if lc.pool_name else {}), **({"pool": lc.pool_name} if lc.pool_name else {}),
**({"pool_seq": lc.pool_seq} if len(set(lc.pool_seq)) > 1 else {}),
"latency": metrics.get_listener_latency( "latency": metrics.get_listener_latency(
f"{lc.listen_host}:{lc.listen_port}" f"{lc.listen_host}:{lc.listen_port}"
).stats(), ).stats(),
@@ -166,6 +167,7 @@ def _handle_config(ctx: dict) -> tuple[int, dict]:
"chain": [str(h) for h in lc.chain], "chain": [str(h) for h in lc.chain],
"pool_hops": lc.pool_hops, "pool_hops": lc.pool_hops,
**({"pool": lc.pool_name} if lc.pool_name else {}), **({"pool": lc.pool_name} if lc.pool_name else {}),
**({"pool_seq": lc.pool_seq} if len(set(lc.pool_seq)) > 1 else {}),
} }
for lc in config.listeners for lc in config.listeners
], ],

View File

@@ -85,9 +85,14 @@ class ListenerConfig:
listen_host: str = "127.0.0.1" listen_host: str = "127.0.0.1"
listen_port: int = 1080 listen_port: int = 1080
chain: list[ChainHop] = field(default_factory=list) chain: list[ChainHop] = field(default_factory=list)
pool_hops: int = 0 pool_seq: list[str] = field(default_factory=list)
pool_name: str = "" pool_name: str = ""
@property
def pool_hops(self) -> int:
"""Number of pool hops (backward compat)."""
return len(self.pool_seq)
@dataclass @dataclass
class Config: class Config:
@@ -306,11 +311,20 @@ def load_config(path: str | Path) -> Config:
lc.pool_name = entry["pool"] lc.pool_name = entry["pool"]
chain_raw = entry.get("chain", []) chain_raw = entry.get("chain", [])
for item in chain_raw: for item in chain_raw:
if isinstance(item, str) and item.lower() == "pool": if isinstance(item, str):
lc.pool_hops += 1 lower = item.lower()
elif isinstance(item, str): if lower == "pool" or lower.startswith("pool:"):
_, _, name = item.partition(":")
lc.pool_seq.append(name if name else (lc.pool_name or "default"))
else:
lc.chain.append(parse_proxy_url(item)) lc.chain.append(parse_proxy_url(item))
elif isinstance(item, dict): elif isinstance(item, dict):
# YAML parses "pool:" and "pool: name" as dicts
pool_key = next((k for k in item if k.lower() == "pool"), None)
if pool_key is not None and len(item) == 1:
name = item[pool_key]
lc.pool_seq.append(name if name else (lc.pool_name or "default"))
else:
lc.chain.append( lc.chain.append(
ChainHop( ChainHop(
proto=item.get("proto", "socks5"), proto=item.get("proto", "socks5"),
@@ -330,7 +344,7 @@ def load_config(path: str | Path) -> Config:
) )
# legacy behavior: if proxy_pool configured, auto-append 1 pool hop # legacy behavior: if proxy_pool configured, auto-append 1 pool hop
if config.proxy_pool and config.proxy_pool.sources: if config.proxy_pool and config.proxy_pool.sources:
lc.pool_hops = 1 lc.pool_seq = ["default"]
config.listeners.append(lc) config.listeners.append(lc)
return config return config

View File

@@ -76,7 +76,7 @@ async def _handle_client(
listener: ListenerConfig, listener: ListenerConfig,
timeout: float, timeout: float,
retries: int, retries: int,
proxy_pool: ProxyPool | None = None, pool_seq: list[ProxyPool] | None = None,
metrics: Metrics | None = None, metrics: Metrics | None = None,
first_hop_pool: FirstHopPool | None = None, first_hop_pool: FirstHopPool | None = None,
tor_rr: _RoundRobin | None = None, tor_rr: _RoundRobin | None = None,
@@ -119,7 +119,7 @@ async def _handle_client(
logger.info("[%s] connect %s:%d", tag, target_host, target_port) logger.info("[%s] connect %s:%d", tag, target_host, target_port)
# -- build chain (with retry) -- # -- build chain (with retry) --
attempts = retries if proxy_pool and listener.pool_hops > 0 else 1 attempts = retries if pool_seq else 1
last_err: Exception | None = None last_err: Exception | None = None
for attempt in range(attempts): for attempt in range(attempts):
@@ -131,15 +131,15 @@ async def _handle_client(
if hop_pools: if hop_pools:
fhp = hop_pools.get((node.host, node.port)) fhp = hop_pools.get((node.host, node.port))
pool_hops: list[ChainHop] = [] pool_hops: list[tuple[ChainHop, ProxyPool]] = []
if proxy_pool and listener.pool_hops > 0: if pool_seq:
for _ in range(listener.pool_hops): for pp in pool_seq:
hop = await proxy_pool.get() hop = await pp.get()
if hop: if hop:
pool_hops.append(hop) pool_hops.append((hop, pp))
effective_chain.append(hop) effective_chain.append(hop)
if pool_hops: if pool_hops:
logger.debug("[%s] +pool %s", tag, " ".join(str(h) for h in pool_hops)) logger.debug("[%s] +pool %s", tag, " ".join(str(h) for h, _ in pool_hops))
try: try:
t0 = time.monotonic() t0 = time.monotonic()
@@ -157,9 +157,9 @@ async def _handle_client(
break break
except (ProtoError, TimeoutError, ConnectionError, OSError) as e: except (ProtoError, TimeoutError, ConnectionError, OSError) as e:
last_err = e last_err = e
if pool_hops and proxy_pool: if pool_hops:
for hop in pool_hops: for hop, pp in pool_hops:
proxy_pool.report_failure(hop) pp.report_failure(hop)
if metrics: if metrics:
metrics.retries += 1 metrics.retries += 1
if attempt + 1 < attempts: if attempt + 1 < attempts:
@@ -295,17 +295,17 @@ async def serve(config: Config) -> None:
await pool.start() await pool.start()
proxy_pools["default"] = pool proxy_pools["default"] = pool
def _pool_for(lc: ListenerConfig) -> ProxyPool | None: def _pools_for(lc: ListenerConfig) -> list[ProxyPool]:
"""Resolve the proxy pool for a listener.""" """Resolve the ordered list of proxy pools for a listener."""
if lc.pool_hops <= 0: result: list[ProxyPool] = []
return None for name in lc.pool_seq:
name = lc.pool_name or "default"
if name not in proxy_pools: if name not in proxy_pools:
raise RuntimeError( raise RuntimeError(
f"listener {lc.listen_host}:{lc.listen_port} " f"listener {lc.listen_host}:{lc.listen_port} "
f"references unknown pool {name!r}" f"references unknown pool {name!r}"
) )
return proxy_pools[name] result.append(proxy_pools[name])
return result
# -- per-unique first-hop connection pools -------------------------------- # -- per-unique first-hop connection pools --------------------------------
hop_pools: dict[tuple[str, int], FirstHopPool] = {} hop_pools: dict[tuple[str, int], FirstHopPool] = {}
@@ -361,17 +361,17 @@ async def serve(config: Config) -> None:
servers: list[asyncio.Server] = [] servers: list[asyncio.Server] = []
for lc in listeners: for lc in listeners:
hp = _hop_pool_for(lc) hp = _hop_pool_for(lc)
lc_pool = _pool_for(lc) lc_pools = _pools_for(lc)
async def on_client( async def on_client(
r: asyncio.StreamReader, w: asyncio.StreamWriter, r: asyncio.StreamReader, w: asyncio.StreamWriter,
_lc: ListenerConfig = lc, _hp: FirstHopPool | None = hp, _lc: ListenerConfig = lc, _hp: FirstHopPool | None = hp,
_pool: ProxyPool | None = lc_pool, _pools: list[ProxyPool] = lc_pools,
) -> None: ) -> None:
async with sem: async with sem:
await _handle_client( await _handle_client(
r, w, _lc, config.timeout, config.retries, r, w, _lc, config.timeout, config.retries,
_pool, metrics, _hp, tor_rr, hop_pools, _pools, metrics, _hp, tor_rr, hop_pools,
) )
srv = await asyncio.start_server(on_client, lc.listen_host, lc.listen_port) srv = await asyncio.start_server(on_client, lc.listen_host, lc.listen_port)
@@ -380,9 +380,15 @@ async def serve(config: Config) -> None:
addr = f"{lc.listen_host}:{lc.listen_port}" addr = f"{lc.listen_host}:{lc.listen_port}"
chain_desc = " -> ".join(str(h) for h in lc.chain) if lc.chain else "direct" chain_desc = " -> ".join(str(h) for h in lc.chain) if lc.chain else "direct"
nhops = lc.pool_hops nhops = lc.pool_hops
pool_desc = f" + {nhops} pool hop{'s' if nhops != 1 else ''}" if nhops else "" pool_desc = ""
if lc_pool and lc_pool.name != "default": if nhops:
pool_desc += f" [{lc_pool.name}]" distinct = list(dict.fromkeys(lc.pool_seq))
if len(distinct) == 1:
pool_desc = f" + {nhops} pool hop{'s' if nhops != 1 else ''}"
if distinct[0] != "default":
pool_desc += f" [{distinct[0]}]"
else:
pool_desc = f" + pool [{' -> '.join(lc.pool_seq)}]"
logger.info("listener %s chain: %s%s", addr, chain_desc, pool_desc) logger.info("listener %s chain: %s%s", addr, chain_desc, pool_desc)
logger.info("max_connections=%d", config.max_connections) logger.info("max_connections=%d", config.max_connections)

View File

@@ -132,7 +132,7 @@ class TestHandleStatus:
ListenerConfig( ListenerConfig(
listen_host="0.0.0.0", listen_port=1081, listen_host="0.0.0.0", listen_port=1081,
chain=[ChainHop("socks5", "127.0.0.1", 9050)], chain=[ChainHop("socks5", "127.0.0.1", 9050)],
pool_hops=1, pool_seq=["default"],
), ),
], ],
) )
@@ -170,6 +170,56 @@ class TestHandleStatusPools:
assert body["pools"]["mitm"] == {"alive": 3, "total": 8} assert body["pools"]["mitm"] == {"alive": 3, "total": 8}
class TestHandleStatusMultiPool:
"""Test pool_seq appears in /status only for multi-pool listeners."""
def test_single_pool_no_pool_seq(self):
"""Single-pool listener: no pool_seq in response."""
config = Config(
listeners=[
ListenerConfig(
listen_host="0.0.0.0", listen_port=1080,
chain=[ChainHop("socks5", "127.0.0.1", 9050)],
pool_seq=["clean", "clean"], pool_name="clean",
),
],
)
ctx = _make_ctx(config=config)
_, body = _handle_status(ctx)
assert "pool_seq" not in body["listeners"][0]
def test_multi_pool_has_pool_seq(self):
"""Multi-pool listener: pool_seq appears in response."""
config = Config(
listeners=[
ListenerConfig(
listen_host="0.0.0.0", listen_port=1080,
chain=[ChainHop("socks5", "127.0.0.1", 9050)],
pool_seq=["clean", "mitm"], pool_name="clean",
),
],
)
ctx = _make_ctx(config=config)
_, body = _handle_status(ctx)
assert body["listeners"][0]["pool_seq"] == ["clean", "mitm"]
assert body["listeners"][0]["pool_hops"] == 2
def test_multi_pool_in_config(self):
"""Multi-pool listener: pool_seq appears in /config response."""
config = Config(
listeners=[
ListenerConfig(
listen_host="0.0.0.0", listen_port=1080,
chain=[ChainHop("socks5", "127.0.0.1", 9050)],
pool_seq=["clean", "mitm"], pool_name="clean",
),
],
)
ctx = _make_ctx(config=config)
_, body = _handle_config(ctx)
assert body["listeners"][0]["pool_seq"] == ["clean", "mitm"]
class TestHandleMetrics: class TestHandleMetrics:
"""Test GET /metrics handler.""" """Test GET /metrics handler."""
@@ -318,7 +368,7 @@ class TestHandleConfig:
proxy_pool=pp, proxy_pool=pp,
listeners=[ListenerConfig( listeners=[ListenerConfig(
chain=[ChainHop("socks5", "127.0.0.1", 9050)], chain=[ChainHop("socks5", "127.0.0.1", 9050)],
pool_hops=1, pool_seq=["default"],
)], )],
) )
ctx = _make_ctx(config=config) ctx = _make_ctx(config=config)
@@ -345,7 +395,7 @@ class TestHandleConfig:
listeners=[ListenerConfig( listeners=[ListenerConfig(
listen_host="0.0.0.0", listen_port=1080, listen_host="0.0.0.0", listen_port=1080,
chain=[ChainHop("socks5", "127.0.0.1", 9050)], chain=[ChainHop("socks5", "127.0.0.1", 9050)],
pool_hops=2, pool_name="clean", pool_seq=["clean", "clean"], pool_name="clean",
)], )],
) )
ctx = _make_ctx(config=config) ctx = _make_ctx(config=config)

View File

@@ -407,6 +407,110 @@ class TestListenerConfig:
assert c.listeners[0].chain == [] assert c.listeners[0].chain == []
class TestPoolSeq:
"""Test per-hop pool references (pool:name syntax)."""
def test_bare_pool_uses_default_name(self, tmp_path):
"""Bare `pool` + `pool: clean` -> pool_seq=["clean"]."""
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"listeners:\n"
" - listen: 1080\n"
" pool: clean\n"
" chain:\n"
" - pool\n"
)
c = load_config(cfg_file)
assert c.listeners[0].pool_seq == ["clean"]
def test_bare_pool_no_pool_name(self, tmp_path):
"""Bare `pool` with no `pool:` key -> pool_seq=["default"]."""
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"listeners:\n"
" - listen: 1080\n"
" chain:\n"
" - pool\n"
)
c = load_config(cfg_file)
assert c.listeners[0].pool_seq == ["default"]
def test_pool_colon_name(self, tmp_path):
"""`pool:clean, pool:mitm` -> pool_seq=["clean", "mitm"]."""
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"listeners:\n"
" - listen: 1080\n"
" chain:\n"
" - pool:clean\n"
" - pool:mitm\n"
)
c = load_config(cfg_file)
assert c.listeners[0].pool_seq == ["clean", "mitm"]
def test_mixed_bare_and_named(self, tmp_path):
"""Bare `pool` + `pool:mitm` with `pool: clean` -> ["clean", "mitm"]."""
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"listeners:\n"
" - listen: 1080\n"
" pool: clean\n"
" chain:\n"
" - pool\n"
" - pool:mitm\n"
)
c = load_config(cfg_file)
assert c.listeners[0].pool_seq == ["clean", "mitm"]
def test_pool_colon_case_insensitive_prefix(self, tmp_path):
"""`Pool:MyPool` -> pool_seq=["MyPool"] (prefix case-insensitive)."""
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"listeners:\n"
" - listen: 1080\n"
" chain:\n"
" - Pool:MyPool\n"
)
c = load_config(cfg_file)
assert c.listeners[0].pool_seq == ["MyPool"]
def test_pool_colon_empty_is_bare(self, tmp_path):
"""`pool:` (empty name) -> treated as bare pool."""
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"listeners:\n"
" - listen: 1080\n"
" pool: clean\n"
" chain:\n"
" - pool:\n"
)
c = load_config(cfg_file)
assert c.listeners[0].pool_seq == ["clean"]
def test_backward_compat_pool_hops_property(self):
"""pool_hops property returns len(pool_seq)."""
lc = ListenerConfig(pool_seq=["clean", "mitm"])
assert lc.pool_hops == 2
lc2 = ListenerConfig()
assert lc2.pool_hops == 0
def test_legacy_auto_append(self, tmp_path):
"""Singular `proxy_pool:` -> pool_seq=["default"]."""
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"listen: 0.0.0.0:1080\n"
"chain:\n"
" - socks5://127.0.0.1:9050\n"
"proxy_pool:\n"
" sources:\n"
" - url: http://api:8081/proxies\n"
)
c = load_config(cfg_file)
lc = c.listeners[0]
assert lc.pool_seq == ["default"]
assert lc.pool_hops == 1
class TestListenerBackwardCompat: class TestListenerBackwardCompat:
"""Test backward-compatible single listener from old format.""" """Test backward-compatible single listener from old format."""