feat: add bypass rules, weighted pool selection, integration tests

Per-listener bypass rules skip the chain for local/private destinations
(CIDR, exact IP/hostname, domain suffix). Weighted multi-candidate pool
selection biases toward pools with more alive proxies. End-to-end
integration tests validate the full client->s5p->hop->target path using
mock SOCKS5 proxies.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
user
2026-02-20 19:58:12 +01:00
parent ef0d8f347b
commit c191942712
11 changed files with 745 additions and 69 deletions

View File

@@ -93,7 +93,8 @@ proxy_pools:
max_fails: 3
# Multi-listener: each port gets a chain depth and pool assignment
# Use "pool" for listener default, "pool:name" for explicit pool per hop
# Use "pool" for listener default, "pool:name" for explicit pool per hop,
# or [pool:a, pool:b] for random choice from candidates per connection
listeners:
- listen: 0.0.0.0:1080
pool: clean
@@ -102,11 +103,10 @@ listeners:
- pool # Tor + 2 clean proxies
- pool
- listen: 0.0.0.0:1081
pool: clean
chain:
- socks5://127.0.0.1:9050
- pool:clean # per-hop: clean pool
- pool:mitm # per-hop: mitm pool
- [pool:clean, pool:mitm] # random choice per connection
- [pool:clean, pool:mitm] # independent random choice
- listen: 0.0.0.0:1082
chain:
- socks5://127.0.0.1:9050 # Tor only
@@ -145,9 +145,9 @@ Options:
## How Chaining Works
```
:1080 Client -> s5p -> Tor -> [clean] -> [clean] -> Dest (2 clean hops)
:1081 Client -> s5p -> Tor -> [clean] -> [mitm] -> Dest (mixed pools)
:1082 Client -> s5p -> Tor -> Dest (Tor only)
:1080 Client -> s5p -> Tor -> [clean] -> [clean] -> Dest (2 clean hops)
:1081 Client -> s5p -> Tor -> [clean|mitm] -> [clean|mitm] -> Dest (random)
:1082 Client -> s5p -> Tor -> Dest (Tor only)
```
s5p connects to Hop1 via TCP, negotiates the hop protocol (SOCKS5/4/HTTP),

View File

@@ -91,13 +91,24 @@ chain:
# at that hop position. Bare "pool" uses the listener's "pool:" default.
# This lets a single listener mix pools in one chain.
#
# Multi-candidate hops: use a YAML list to randomly pick from a set of
# pools at each hop. On each connection, one pool is chosen per hop.
#
# listeners:
# - listen: 0.0.0.0:1080
# pool: clean # default for bare "pool"
# bypass: # skip chain for these destinations
# - 127.0.0.0/8 # loopback
# - 10.0.0.0/8 # RFC 1918
# - 192.168.0.0/16 # RFC 1918
# - 172.16.0.0/12 # RFC 1918
# - fc00::/7 # IPv6 ULA
# - localhost # exact hostname
# - .local # domain suffix
# chain:
# - socks5://127.0.0.1:9050
# - pool:clean # explicit: from clean pool
# - pool:mitm # explicit: from mitm pool
# - [pool:clean, pool:mitm] # random choice per connection
# - [pool:clean, pool:mitm] # independent random choice
#
# - listen: 0.0.0.0:1081
# pool: clean

View File

@@ -58,14 +58,37 @@ listeners:
chain:
- socks5://127.0.0.1:9050 # Tor only
- listen: 0.0.0.0:1083
pool: clean
chain:
- socks5://127.0.0.1:9050
- pool # bare = clean (default)
- pool:mitm # explicit = mitm
- [pool:clean, pool:mitm] # random choice per connection
- [pool:clean, pool:mitm] # independent random choice
```
Per-hop pool: `pool` = listener default, `pool:name` = explicit pool.
Per-hop pool: `pool` = listener default, `pool:name` = explicit pool,
`[pool:a, pool:b]` = random choice from candidates.
## Bypass Rules (config)
```yaml
listeners:
- listen: 0.0.0.0:1080
bypass:
- 127.0.0.0/8 # CIDR
- 10.0.0.0/8 # CIDR
- 192.168.0.0/16 # CIDR
- localhost # exact hostname
- .local # domain suffix
chain:
- socks5://127.0.0.1:9050
- pool
```
| Pattern | Type | Matches |
|---------|------|---------|
| `10.0.0.0/8` | CIDR | IPs in network |
| `127.0.0.1` | Exact IP | That IP only |
| `localhost` | Exact host | String equal |
| `.local` | Suffix | `*.local` and `local` |
## Multi-Tor Round-Robin (config)

View File

@@ -206,10 +206,28 @@ listeners:
| `pool:name` | Named pool `name` (case-sensitive) |
| `pool:` | Same as bare `pool` (empty name = default) |
| `Pool:name` | Prefix is case-insensitive; name is case-sensitive |
| `[pool:a, pool:b]` | Random choice from candidates `a` or `b` per connection |
The `pool` keyword in a chain means "append a random alive proxy from the
assigned pool". Multiple `pool` entries = multiple pool hops (deeper chaining).
### Multi-candidate pool hops
Use a YAML list to randomly pick from a set of candidate pools at each hop.
On each connection, one candidate is chosen at random per hop (independently).
```yaml
listeners:
- listen: 0.0.0.0:1080
chain:
- socks5://10.200.1.13:9050
- [pool:clean, pool:mitm] # hop 1: random choice
- [pool:clean, pool:mitm] # hop 2: random choice
```
Single-element pool references (`pool`, `pool:name`) and multi-candidate
lists can be mixed freely in the same chain. All existing syntax is unchanged.
When `pool:` is omitted on a listener with pool hops, it defaults to
`"default"`. A listener referencing an unknown pool name causes a fatal
error at startup. Listeners without pool hops ignore the `pool:` key.
@@ -224,6 +242,41 @@ error at startup. Listeners without pool hops ignore the `pool:` key.
| FirstHopPool | per unique first hop | Listeners with same first hop share it |
| Chain + pool_hops | per listener | Each listener has its own chain depth |
## Bypass Rules
Per-listener rules to skip the chain for specific destinations. When a target
matches a bypass rule, s5p connects directly (no chain, no pool hops).
```yaml
listeners:
- listen: 0.0.0.0:1080
bypass:
- 127.0.0.0/8 # CIDR: loopback
- 10.0.0.0/8 # CIDR: RFC 1918
- 192.168.0.0/16 # CIDR: RFC 1918
- fc00::/7 # CIDR: IPv6 ULA
- localhost # exact hostname
- .local # domain suffix (matches *.local and local)
chain:
- socks5://127.0.0.1:9050
- pool
```
### Rule syntax
| Pattern | Type | Matches |
|---------|------|---------|
| `10.0.0.0/8` | CIDR | Any IP in the network |
| `127.0.0.1` | Exact IP | That IP only |
| `localhost` | Exact hostname | String-equal match |
| `.local` | Domain suffix | `*.local` and `local` itself |
CIDR rules only match IP addresses, not hostnames. Domain suffix rules only
match hostnames, not IPs. Exact rules match both (string compare for hostnames,
parsed compare for IPs).
When bypass is active, retries are disabled (direct connections are not retried).
### Backward compatibility
When no `listeners:` key is present, the old `listen`/`chain` format creates

View File

@@ -88,7 +88,7 @@ def _handle_status(ctx: dict) -> tuple[int, dict]:
"chain": [str(h) for h in lc.chain],
"pool_hops": lc.pool_hops,
**({"pool": lc.pool_name} if lc.pool_name else {}),
**({"pool_seq": lc.pool_seq} if len(set(lc.pool_seq)) > 1 else {}),
**({"pool_seq": lc.pool_seq} if len({n for c in lc.pool_seq for n in c}) > 1 else {}),
"latency": metrics.get_listener_latency(
f"{lc.listen_host}:{lc.listen_port}"
).stats(),
@@ -167,7 +167,7 @@ def _handle_config(ctx: dict) -> tuple[int, dict]:
"chain": [str(h) for h in lc.chain],
"pool_hops": lc.pool_hops,
**({"pool": lc.pool_name} if lc.pool_name else {}),
**({"pool_seq": lc.pool_seq} if len(set(lc.pool_seq)) > 1 else {}),
**({"pool_seq": lc.pool_seq} if len({n for c in lc.pool_seq for n in c}) > 1 else {}),
}
for lc in config.listeners
],

View File

@@ -85,8 +85,9 @@ class ListenerConfig:
listen_host: str = "127.0.0.1"
listen_port: int = 1080
chain: list[ChainHop] = field(default_factory=list)
pool_seq: list[str] = field(default_factory=list)
pool_seq: list[list[str]] = field(default_factory=list)
pool_name: str = ""
bypass: list[str] = field(default_factory=list)
@property
def pool_hops(self) -> int:
@@ -193,6 +194,21 @@ def _parse_pool_config(pool_raw: dict) -> ProxyPoolConfig:
return ProxyPoolConfig(**kwargs)
def _parse_pool_ref(item: str, default: str) -> str:
"""Resolve a pool reference string to a pool name.
``pool`` or ``pool:`` -> *default*; ``pool:name`` -> ``name``.
The ``pool`` prefix is matched case-insensitively.
"""
lower = item.lower()
if lower == "pool" or lower == "pool:":
return default
if lower.startswith("pool:"):
_, _, name = item.partition(":")
return name if name else default
raise ValueError(f"not a pool reference: {item!r}")
def load_config(path: str | Path) -> Config:
"""Load configuration from a YAML file."""
path = Path(path)
@@ -307,15 +323,17 @@ def load_config(path: str | Path) -> Config:
lc.listen_port = int(port_str)
elif isinstance(listen, (str, int)) and listen:
lc.listen_port = int(listen)
if "bypass" in entry:
lc.bypass = list(entry["bypass"])
if "pool" in entry:
lc.pool_name = entry["pool"]
default_pool = lc.pool_name or "default"
chain_raw = entry.get("chain", [])
for item in chain_raw:
if isinstance(item, str):
lower = item.lower()
if lower == "pool" or lower.startswith("pool:"):
_, _, name = item.partition(":")
lc.pool_seq.append(name if name else (lc.pool_name or "default"))
lc.pool_seq.append([_parse_pool_ref(item, default_pool)])
else:
lc.chain.append(parse_proxy_url(item))
elif isinstance(item, dict):
@@ -323,7 +341,7 @@ def load_config(path: str | Path) -> Config:
pool_key = next((k for k in item if k.lower() == "pool"), None)
if pool_key is not None and len(item) == 1:
name = item[pool_key]
lc.pool_seq.append(name if name else (lc.pool_name or "default"))
lc.pool_seq.append([name if name else default_pool])
else:
lc.chain.append(
ChainHop(
@@ -334,6 +352,10 @@ def load_config(path: str | Path) -> Config:
password=item.get("password"),
)
)
elif isinstance(item, list):
# multi-candidate hop: [pool:clean, pool:mitm]
candidates = [_parse_pool_ref(str(el), default_pool) for el in item]
lc.pool_seq.append(candidates)
config.listeners.append(lc)
else:
# backward compat: build single listener from top-level fields
@@ -344,7 +366,7 @@ def load_config(path: str | Path) -> Config:
)
# legacy behavior: if proxy_pool configured, auto-append 1 pool hop
if config.proxy_pool and config.proxy_pool.sources:
lc.pool_seq = ["default"]
lc.pool_seq = [["default"]]
config.listeners.append(lc)
return config

View File

@@ -3,7 +3,9 @@
from __future__ import annotations
import asyncio
import ipaddress
import logging
import random
import signal
import struct
import time
@@ -70,13 +72,44 @@ def _socks5_reply(rep: int) -> bytes:
return struct.pack("!BBB", 0x05, rep, 0x00) + b"\x01\x00\x00\x00\x00\x00\x00"
def _bypass_match(rules: list[str], host: str) -> bool:
"""Check if host matches any bypass rule (CIDR, suffix, or exact)."""
addr = None
try:
addr = ipaddress.ip_address(host)
except ValueError:
pass
for rule in rules:
if "/" in rule:
if addr is not None:
try:
if addr in ipaddress.ip_network(rule, strict=False):
return True
except ValueError:
pass
elif rule.startswith("."):
if addr is None and (host.endswith(rule) or host == rule[1:]):
return True
else:
if addr is not None:
try:
if addr == ipaddress.ip_address(rule):
return True
except ValueError:
pass
if host == rule:
return True
return False
async def _handle_client(
client_reader: asyncio.StreamReader,
client_writer: asyncio.StreamWriter,
listener: ListenerConfig,
timeout: float,
retries: int,
pool_seq: list[ProxyPool] | None = None,
pool_seq: list[list[ProxyPool]] | None = None,
metrics: Metrics | None = None,
first_hop_pool: FirstHopPool | None = None,
tor_rr: _RoundRobin | None = None,
@@ -118,22 +151,33 @@ async def _handle_client(
target_host, target_port = await read_socks5_address(client_reader)
logger.info("[%s] connect %s:%d", tag, target_host, target_port)
# -- bypass check --
bypass = bool(listener.bypass and _bypass_match(listener.bypass, target_host))
if bypass:
logger.debug("[%s] bypass %s:%d", tag, target_host, target_port)
# -- build chain (with retry) --
attempts = retries if pool_seq else 1
attempts = retries if pool_seq and not bypass else 1
last_err: Exception | None = None
for attempt in range(attempts):
effective_chain = list(listener.chain)
fhp = first_hop_pool
if tor_rr and effective_chain:
node = tor_rr.next()
effective_chain[0] = node
if hop_pools:
fhp = hop_pools.get((node.host, node.port))
if bypass:
effective_chain: list[ChainHop] = []
fhp = None
else:
effective_chain = list(listener.chain)
fhp = first_hop_pool
if tor_rr and effective_chain:
node = tor_rr.next()
effective_chain[0] = node
if hop_pools:
fhp = hop_pools.get((node.host, node.port))
pool_hops: list[tuple[ChainHop, ProxyPool]] = []
if pool_seq:
for pp in pool_seq:
if pool_seq and not bypass:
for candidates in pool_seq:
weights = [max(pp.alive_count, 1) for pp in candidates]
pp = random.choices(candidates, weights=weights)[0]
hop = await pp.get()
if hop:
pool_hops.append((hop, pp))
@@ -295,16 +339,19 @@ async def serve(config: Config) -> None:
await pool.start()
proxy_pools["default"] = pool
def _pools_for(lc: ListenerConfig) -> list[ProxyPool]:
"""Resolve the ordered list of proxy pools for a listener."""
result: list[ProxyPool] = []
for name in lc.pool_seq:
if name not in proxy_pools:
raise RuntimeError(
f"listener {lc.listen_host}:{lc.listen_port} "
f"references unknown pool {name!r}"
)
result.append(proxy_pools[name])
def _pools_for(lc: ListenerConfig) -> list[list[ProxyPool]]:
"""Resolve the ordered list of candidate proxy pools for a listener."""
result: list[list[ProxyPool]] = []
for candidates in lc.pool_seq:
resolved: list[ProxyPool] = []
for name in candidates:
if name not in proxy_pools:
raise RuntimeError(
f"listener {lc.listen_host}:{lc.listen_port} "
f"references unknown pool {name!r}"
)
resolved.append(proxy_pools[name])
result.append(resolved)
return result
# -- per-unique first-hop connection pools --------------------------------
@@ -366,7 +413,7 @@ async def serve(config: Config) -> None:
async def on_client(
r: asyncio.StreamReader, w: asyncio.StreamWriter,
_lc: ListenerConfig = lc, _hp: FirstHopPool | None = hp,
_pools: list[ProxyPool] = lc_pools,
_pools: list[list[ProxyPool]] = lc_pools,
) -> None:
async with sem:
await _handle_client(
@@ -382,14 +429,17 @@ async def serve(config: Config) -> None:
nhops = lc.pool_hops
pool_desc = ""
if nhops:
distinct = list(dict.fromkeys(lc.pool_seq))
if len(distinct) == 1:
all_names = {n for cands in lc.pool_seq for n in cands}
hop_labels = ["|".join(cands) for cands in lc.pool_seq]
if len(all_names) == 1:
name = next(iter(all_names))
pool_desc = f" + {nhops} pool hop{'s' if nhops != 1 else ''}"
if distinct[0] != "default":
pool_desc += f" [{distinct[0]}]"
if name != "default":
pool_desc += f" [{name}]"
else:
pool_desc = f" + pool [{' -> '.join(lc.pool_seq)}]"
logger.info("listener %s chain: %s%s", addr, chain_desc, pool_desc)
pool_desc = f" + pool [{' -> '.join(hop_labels)}]"
bypass_desc = f" bypass: {len(lc.bypass)} rules" if lc.bypass else ""
logger.info("listener %s chain: %s%s%s", addr, chain_desc, pool_desc, bypass_desc)
logger.info("max_connections=%d", config.max_connections)

138
tests/conftest.py Normal file
View File

@@ -0,0 +1,138 @@
"""Shared helpers for integration tests."""
from __future__ import annotations
import asyncio
import socket
import struct
from s5p.proto import encode_address, read_socks5_address
def free_port() -> int:
"""Return an available TCP port."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("127.0.0.1", 0))
return s.getsockname()[1]
# -- echo server -------------------------------------------------------------
async def _echo_handler(
reader: asyncio.StreamReader, writer: asyncio.StreamWriter,
) -> None:
"""Echo back everything received, then close."""
try:
while True:
data = await reader.read(65536)
if not data:
break
writer.write(data)
await writer.drain()
except (ConnectionError, asyncio.CancelledError):
pass
finally:
writer.close()
await writer.wait_closed()
async def start_echo_server() -> tuple[str, int, asyncio.Server]:
"""Start a TCP echo server. Returns (host, port, server)."""
host = "127.0.0.1"
port = free_port()
srv = await asyncio.start_server(_echo_handler, host, port)
await srv.start_serving()
return host, port, srv
# -- mock SOCKS5 proxy -------------------------------------------------------
async def _mock_socks5_handler(
reader: asyncio.StreamReader, writer: asyncio.StreamWriter,
) -> None:
"""Minimal SOCKS5 proxy: greeting, CONNECT, relay."""
remote_writer = None
try:
# greeting
header = await reader.readexactly(2)
if header[0] != 0x05:
return
await reader.readexactly(header[1]) # skip methods
writer.write(b"\x05\x00")
await writer.drain()
# connect request
req = await reader.readexactly(3)
if req[0] != 0x05 or req[1] != 0x01:
return
target_host, target_port = await read_socks5_address(reader)
# connect to actual target
try:
remote_reader, remote_writer = await asyncio.wait_for(
asyncio.open_connection(target_host, target_port),
timeout=5.0,
)
except (OSError, TimeoutError):
# connection refused reply
reply = struct.pack("!BBB", 0x05, 0x05, 0x00)
reply += b"\x01\x00\x00\x00\x00\x00\x00"
writer.write(reply)
await writer.drain()
return
# success reply
atyp, addr_bytes = encode_address(target_host)
reply = struct.pack("!BBB", 0x05, 0x00, 0x00)
reply += bytes([atyp]) + addr_bytes + struct.pack("!H", target_port)
writer.write(reply)
await writer.drain()
# relay both directions (close dst on EOF so peer sees shutdown)
async def _fwd(src: asyncio.StreamReader, dst: asyncio.StreamWriter) -> None:
try:
while True:
data = await src.read(65536)
if not data:
break
dst.write(data)
await dst.drain()
except (ConnectionError, asyncio.CancelledError):
pass
finally:
try:
dst.close()
await dst.wait_closed()
except OSError:
pass
await asyncio.gather(
_fwd(reader, remote_writer),
_fwd(remote_reader, writer),
)
except (ConnectionError, asyncio.IncompleteReadError, asyncio.CancelledError):
pass
finally:
if remote_writer:
remote_writer.close()
try:
await remote_writer.wait_closed()
except OSError:
pass
writer.close()
try:
await writer.wait_closed()
except OSError:
pass
async def start_mock_socks5() -> tuple[str, int, asyncio.Server]:
"""Start a mock SOCKS5 proxy. Returns (host, port, server)."""
host = "127.0.0.1"
port = free_port()
srv = await asyncio.start_server(_mock_socks5_handler, host, port)
await srv.start_serving()
return host, port, srv

View File

@@ -132,7 +132,7 @@ class TestHandleStatus:
ListenerConfig(
listen_host="0.0.0.0", listen_port=1081,
chain=[ChainHop("socks5", "127.0.0.1", 9050)],
pool_seq=["default"],
pool_seq=[["default"]],
),
],
)
@@ -180,7 +180,7 @@ class TestHandleStatusMultiPool:
ListenerConfig(
listen_host="0.0.0.0", listen_port=1080,
chain=[ChainHop("socks5", "127.0.0.1", 9050)],
pool_seq=["clean", "clean"], pool_name="clean",
pool_seq=[["clean"], ["clean"]], pool_name="clean",
),
],
)
@@ -195,13 +195,13 @@ class TestHandleStatusMultiPool:
ListenerConfig(
listen_host="0.0.0.0", listen_port=1080,
chain=[ChainHop("socks5", "127.0.0.1", 9050)],
pool_seq=["clean", "mitm"], pool_name="clean",
pool_seq=[["clean"], ["mitm"]], pool_name="clean",
),
],
)
ctx = _make_ctx(config=config)
_, body = _handle_status(ctx)
assert body["listeners"][0]["pool_seq"] == ["clean", "mitm"]
assert body["listeners"][0]["pool_seq"] == [["clean"], ["mitm"]]
assert body["listeners"][0]["pool_hops"] == 2
def test_multi_pool_in_config(self):
@@ -211,13 +211,13 @@ class TestHandleStatusMultiPool:
ListenerConfig(
listen_host="0.0.0.0", listen_port=1080,
chain=[ChainHop("socks5", "127.0.0.1", 9050)],
pool_seq=["clean", "mitm"], pool_name="clean",
pool_seq=[["clean"], ["mitm"]], pool_name="clean",
),
],
)
ctx = _make_ctx(config=config)
_, body = _handle_config(ctx)
assert body["listeners"][0]["pool_seq"] == ["clean", "mitm"]
assert body["listeners"][0]["pool_seq"] == [["clean"], ["mitm"]]
class TestHandleMetrics:
@@ -395,7 +395,7 @@ class TestHandleConfig:
listeners=[ListenerConfig(
listen_host="0.0.0.0", listen_port=1080,
chain=[ChainHop("socks5", "127.0.0.1", 9050)],
pool_seq=["clean", "clean"], pool_name="clean",
pool_seq=[["clean"], ["clean"]], pool_name="clean",
)],
)
ctx = _make_ctx(config=config)

View File

@@ -10,6 +10,7 @@ from s5p.config import (
parse_api_proxies,
parse_proxy_url,
)
from s5p.server import _bypass_match
class TestParseProxyUrl:
@@ -411,7 +412,7 @@ class TestPoolSeq:
"""Test per-hop pool references (pool:name syntax)."""
def test_bare_pool_uses_default_name(self, tmp_path):
"""Bare `pool` + `pool: clean` -> pool_seq=["clean"]."""
"""Bare `pool` + `pool: clean` -> pool_seq=[["clean"]]."""
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"listeners:\n"
@@ -421,10 +422,10 @@ class TestPoolSeq:
" - pool\n"
)
c = load_config(cfg_file)
assert c.listeners[0].pool_seq == ["clean"]
assert c.listeners[0].pool_seq == [["clean"]]
def test_bare_pool_no_pool_name(self, tmp_path):
"""Bare `pool` with no `pool:` key -> pool_seq=["default"]."""
"""Bare `pool` with no `pool:` key -> pool_seq=[["default"]]."""
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"listeners:\n"
@@ -433,10 +434,10 @@ class TestPoolSeq:
" - pool\n"
)
c = load_config(cfg_file)
assert c.listeners[0].pool_seq == ["default"]
assert c.listeners[0].pool_seq == [["default"]]
def test_pool_colon_name(self, tmp_path):
"""`pool:clean, pool:mitm` -> pool_seq=["clean", "mitm"]."""
"""`pool:clean, pool:mitm` -> pool_seq=[["clean"], ["mitm"]]."""
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"listeners:\n"
@@ -446,10 +447,10 @@ class TestPoolSeq:
" - pool:mitm\n"
)
c = load_config(cfg_file)
assert c.listeners[0].pool_seq == ["clean", "mitm"]
assert c.listeners[0].pool_seq == [["clean"], ["mitm"]]
def test_mixed_bare_and_named(self, tmp_path):
"""Bare `pool` + `pool:mitm` with `pool: clean` -> ["clean", "mitm"]."""
"""Bare `pool` + `pool:mitm` with `pool: clean` -> [["clean"], ["mitm"]]."""
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"listeners:\n"
@@ -460,10 +461,10 @@ class TestPoolSeq:
" - pool:mitm\n"
)
c = load_config(cfg_file)
assert c.listeners[0].pool_seq == ["clean", "mitm"]
assert c.listeners[0].pool_seq == [["clean"], ["mitm"]]
def test_pool_colon_case_insensitive_prefix(self, tmp_path):
"""`Pool:MyPool` -> pool_seq=["MyPool"] (prefix case-insensitive)."""
"""`Pool:MyPool` -> pool_seq=[["MyPool"]] (prefix case-insensitive)."""
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"listeners:\n"
@@ -472,7 +473,7 @@ class TestPoolSeq:
" - Pool:MyPool\n"
)
c = load_config(cfg_file)
assert c.listeners[0].pool_seq == ["MyPool"]
assert c.listeners[0].pool_seq == [["MyPool"]]
def test_pool_colon_empty_is_bare(self, tmp_path):
"""`pool:` (empty name) -> treated as bare pool."""
@@ -485,17 +486,17 @@ class TestPoolSeq:
" - pool:\n"
)
c = load_config(cfg_file)
assert c.listeners[0].pool_seq == ["clean"]
assert c.listeners[0].pool_seq == [["clean"]]
def test_backward_compat_pool_hops_property(self):
"""pool_hops property returns len(pool_seq)."""
lc = ListenerConfig(pool_seq=["clean", "mitm"])
lc = ListenerConfig(pool_seq=[["clean"], ["mitm"]])
assert lc.pool_hops == 2
lc2 = ListenerConfig()
assert lc2.pool_hops == 0
def test_legacy_auto_append(self, tmp_path):
"""Singular `proxy_pool:` -> pool_seq=["default"]."""
"""Singular `proxy_pool:` -> pool_seq=[["default"]]."""
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"listen: 0.0.0.0:1080\n"
@@ -507,9 +508,26 @@ class TestPoolSeq:
)
c = load_config(cfg_file)
lc = c.listeners[0]
assert lc.pool_seq == ["default"]
assert lc.pool_seq == [["default"]]
assert lc.pool_hops == 1
def test_list_candidates(self, tmp_path):
"""List in chain -> multi-candidate hop."""
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"listeners:\n"
" - listen: 1080\n"
" chain:\n"
" - socks5://tor:9050\n"
" - [pool:clean, pool:mitm]\n"
" - [pool:clean, pool:mitm]\n"
)
c = load_config(cfg_file)
lc = c.listeners[0]
assert len(lc.chain) == 1
assert lc.pool_hops == 2
assert lc.pool_seq == [["clean", "mitm"], ["clean", "mitm"]]
class TestListenerBackwardCompat:
"""Test backward-compatible single listener from old format."""
@@ -573,3 +591,79 @@ class TestListenerPoolCompat:
lc = c.listeners[0]
# explicit listeners: no auto pool_hops
assert lc.pool_hops == 0
class TestBypassConfig:
"""Test bypass rules in listener config."""
def test_bypass_from_yaml(self, tmp_path):
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"listeners:\n"
" - listen: 1080\n"
" bypass:\n"
" - 127.0.0.0/8\n"
" - 192.168.0.0/16\n"
" - localhost\n"
" - .local\n"
" chain:\n"
" - socks5://127.0.0.1:9050\n"
)
c = load_config(cfg_file)
lc = c.listeners[0]
assert lc.bypass == ["127.0.0.0/8", "192.168.0.0/16", "localhost", ".local"]
def test_bypass_empty_default(self):
lc = ListenerConfig()
assert lc.bypass == []
def test_bypass_absent_from_yaml(self, tmp_path):
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"listeners:\n"
" - listen: 1080\n"
" chain:\n"
" - socks5://127.0.0.1:9050\n"
)
c = load_config(cfg_file)
assert c.listeners[0].bypass == []
class TestBypassMatch:
"""Test _bypass_match function."""
def test_cidr_ipv4(self):
assert _bypass_match(["10.0.0.0/8"], "10.1.2.3") is True
assert _bypass_match(["10.0.0.0/8"], "11.0.0.1") is False
def test_cidr_ipv6(self):
assert _bypass_match(["fc00::/7"], "fd00::1") is True
assert _bypass_match(["fc00::/7"], "2001:db8::1") is False
def test_exact_ip(self):
assert _bypass_match(["127.0.0.1"], "127.0.0.1") is True
assert _bypass_match(["127.0.0.1"], "127.0.0.2") is False
def test_exact_hostname(self):
assert _bypass_match(["localhost"], "localhost") is True
assert _bypass_match(["localhost"], "otherhost") is False
def test_domain_suffix(self):
assert _bypass_match([".local"], "myhost.local") is True
assert _bypass_match([".local"], "local") is True
assert _bypass_match([".local"], "notlocal") is False
assert _bypass_match([".example.com"], "api.example.com") is True
assert _bypass_match([".example.com"], "example.com") is True
def test_multiple_rules(self):
rules = ["10.0.0.0/8", "192.168.0.0/16", ".local"]
assert _bypass_match(rules, "10.1.2.3") is True
assert _bypass_match(rules, "192.168.1.1") is True
assert _bypass_match(rules, "host.local") is True
assert _bypass_match(rules, "8.8.8.8") is False
def test_empty_rules(self):
assert _bypass_match([], "anything") is False
def test_hostname_not_matched_by_cidr(self):
assert _bypass_match(["10.0.0.0/8"], "example.com") is False

285
tests/test_integration.py Normal file
View File

@@ -0,0 +1,285 @@
"""End-to-end integration tests with mock SOCKS5 proxies."""
from __future__ import annotations
import asyncio
import struct
from s5p.config import ChainHop, ListenerConfig
from s5p.proto import encode_address
from s5p.server import _handle_client
from .conftest import free_port, start_echo_server, start_mock_socks5
# -- helpers -----------------------------------------------------------------
async def _socks5_connect(
host: str, port: int, target_host: str, target_port: int,
) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
"""Connect as a SOCKS5 client, perform greeting + CONNECT."""
reader, writer = await asyncio.open_connection(host, port)
# greeting: version 5, 1 method (no-auth)
writer.write(b"\x05\x01\x00")
await writer.drain()
resp = await reader.readexactly(2)
assert resp == b"\x05\x00", f"greeting failed: {resp!r}"
# connect request
atyp, addr_bytes = encode_address(target_host)
writer.write(
struct.pack("!BBB", 0x05, 0x01, 0x00)
+ bytes([atyp])
+ addr_bytes
+ struct.pack("!H", target_port)
)
await writer.drain()
# read reply
rep_header = await reader.readexactly(3)
atyp_resp = (await reader.readexactly(1))[0]
if atyp_resp == 0x01:
await reader.readexactly(4)
elif atyp_resp == 0x03:
length = (await reader.readexactly(1))[0]
await reader.readexactly(length)
elif atyp_resp == 0x04:
await reader.readexactly(16)
await reader.readexactly(2) # port
if rep_header[1] != 0x00:
writer.close()
await writer.wait_closed()
raise ConnectionError(f"SOCKS5 reply={rep_header[1]:#x}")
return reader, writer
async def _close_server(srv: asyncio.Server) -> None:
"""Close a server and wait."""
srv.close()
await srv.wait_closed()
# -- tests -------------------------------------------------------------------
class TestDirectNoChain:
"""Client -> s5p -> echo (empty chain)."""
def test_echo(self):
async def _run():
servers = []
try:
echo_host, echo_port, echo_srv = await start_echo_server()
servers.append(echo_srv)
listener = ListenerConfig(listen_host="127.0.0.1", listen_port=free_port())
s5p_srv = await asyncio.start_server(
lambda r, w: _handle_client(r, w, listener, timeout=5.0, retries=1),
listener.listen_host, listener.listen_port,
)
servers.append(s5p_srv)
await s5p_srv.start_serving()
reader, writer = await _socks5_connect(
listener.listen_host, listener.listen_port, echo_host, echo_port,
)
writer.write(b"hello direct")
await writer.drain()
data = await asyncio.wait_for(reader.read(4096), timeout=2.0)
assert data == b"hello direct"
writer.close()
await writer.wait_closed()
finally:
for s in servers:
await _close_server(s)
asyncio.run(_run())
class TestSingleHop:
"""Client -> s5p -> mock socks5 -> echo."""
def test_echo_through_one_hop(self):
async def _run():
servers = []
try:
echo_host, echo_port, echo_srv = await start_echo_server()
servers.append(echo_srv)
mock_host, mock_port, mock_srv = await start_mock_socks5()
servers.append(mock_srv)
listener = ListenerConfig(
listen_host="127.0.0.1",
listen_port=free_port(),
chain=[ChainHop(proto="socks5", host=mock_host, port=mock_port)],
)
s5p_srv = await asyncio.start_server(
lambda r, w: _handle_client(r, w, listener, timeout=5.0, retries=1),
listener.listen_host, listener.listen_port,
)
servers.append(s5p_srv)
await s5p_srv.start_serving()
reader, writer = await _socks5_connect(
listener.listen_host, listener.listen_port, echo_host, echo_port,
)
writer.write(b"hello one hop")
await writer.drain()
data = await asyncio.wait_for(reader.read(4096), timeout=2.0)
assert data == b"hello one hop"
writer.close()
await writer.wait_closed()
finally:
for s in servers:
await _close_server(s)
asyncio.run(_run())
class TestTwoHops:
"""Client -> s5p -> mock1 -> mock2 -> echo."""
def test_echo_through_two_hops(self):
async def _run():
servers = []
try:
echo_host, echo_port, echo_srv = await start_echo_server()
servers.append(echo_srv)
m1_host, m1_port, m1_srv = await start_mock_socks5()
servers.append(m1_srv)
m2_host, m2_port, m2_srv = await start_mock_socks5()
servers.append(m2_srv)
listener = ListenerConfig(
listen_host="127.0.0.1",
listen_port=free_port(),
chain=[
ChainHop(proto="socks5", host=m1_host, port=m1_port),
ChainHop(proto="socks5", host=m2_host, port=m2_port),
],
)
s5p_srv = await asyncio.start_server(
lambda r, w: _handle_client(r, w, listener, timeout=5.0, retries=1),
listener.listen_host, listener.listen_port,
)
servers.append(s5p_srv)
await s5p_srv.start_serving()
reader, writer = await _socks5_connect(
listener.listen_host, listener.listen_port, echo_host, echo_port,
)
writer.write(b"hello two hops")
await writer.drain()
data = await asyncio.wait_for(reader.read(4096), timeout=2.0)
assert data == b"hello two hops"
writer.close()
await writer.wait_closed()
finally:
for s in servers:
await _close_server(s)
asyncio.run(_run())
class TestConnectionRefused:
"""Dead hop returns SOCKS5 error to client."""
def test_refused(self):
async def _run():
servers = []
try:
# use a port with nothing listening
dead_port = free_port()
listener = ListenerConfig(
listen_host="127.0.0.1",
listen_port=free_port(),
chain=[ChainHop(proto="socks5", host="127.0.0.1", port=dead_port)],
)
s5p_srv = await asyncio.start_server(
lambda r, w: _handle_client(r, w, listener, timeout=3.0, retries=1),
listener.listen_host, listener.listen_port,
)
servers.append(s5p_srv)
await s5p_srv.start_serving()
reader, writer = await asyncio.open_connection(
listener.listen_host, listener.listen_port,
)
# greeting
writer.write(b"\x05\x01\x00")
await writer.drain()
resp = await reader.readexactly(2)
assert resp == b"\x05\x00"
# connect to a dummy target
atyp, addr_bytes = encode_address("127.0.0.1")
writer.write(
struct.pack("!BBB", 0x05, 0x01, 0x00)
+ bytes([atyp])
+ addr_bytes
+ struct.pack("!H", 9999)
)
await writer.drain()
# should get error reply (non-zero rep field)
rep = await asyncio.wait_for(reader.read(4096), timeout=5.0)
assert len(rep) >= 3
assert rep[1] != 0x00, "expected non-zero SOCKS5 reply code"
writer.close()
await writer.wait_closed()
finally:
for s in servers:
await _close_server(s)
asyncio.run(_run())
class TestBypassDirectConnect:
"""Target matches bypass rule -> chain skipped, direct connect to echo."""
def test_bypass_skips_chain(self):
async def _run():
servers = []
try:
echo_host, echo_port, echo_srv = await start_echo_server()
servers.append(echo_srv)
# dead hop -- would fail if bypass didn't skip it
dead_port = free_port()
listener = ListenerConfig(
listen_host="127.0.0.1",
listen_port=free_port(),
chain=[ChainHop(proto="socks5", host="127.0.0.1", port=dead_port)],
bypass=["127.0.0.0/8"],
)
s5p_srv = await asyncio.start_server(
lambda r, w: _handle_client(r, w, listener, timeout=5.0, retries=1),
listener.listen_host, listener.listen_port,
)
servers.append(s5p_srv)
await s5p_srv.start_serving()
reader, writer = await _socks5_connect(
listener.listen_host, listener.listen_port, echo_host, echo_port,
)
writer.write(b"hello bypass")
await writer.drain()
data = await asyncio.wait_for(reader.read(4096), timeout=2.0)
assert data == b"hello bypass"
writer.close()
await writer.wait_closed()
finally:
for s in servers:
await _close_server(s)
asyncio.run(_run())