diff --git a/README.md b/README.md index f6eb5e7..9cb6980 100644 --- a/README.md +++ b/README.md @@ -93,7 +93,8 @@ proxy_pools: max_fails: 3 # Multi-listener: each port gets a chain depth and pool assignment -# Use "pool" for listener default, "pool:name" for explicit pool per hop +# Use "pool" for listener default, "pool:name" for explicit pool per hop, +# or [pool:a, pool:b] for random choice from candidates per connection listeners: - listen: 0.0.0.0:1080 pool: clean @@ -102,11 +103,10 @@ listeners: - pool # Tor + 2 clean proxies - pool - listen: 0.0.0.0:1081 - pool: clean chain: - socks5://127.0.0.1:9050 - - pool:clean # per-hop: clean pool - - pool:mitm # per-hop: mitm pool + - [pool:clean, pool:mitm] # random choice per connection + - [pool:clean, pool:mitm] # independent random choice - listen: 0.0.0.0:1082 chain: - socks5://127.0.0.1:9050 # Tor only @@ -145,9 +145,9 @@ Options: ## How Chaining Works ``` -:1080 Client -> s5p -> Tor -> [clean] -> [clean] -> Dest (2 clean hops) -:1081 Client -> s5p -> Tor -> [clean] -> [mitm] -> Dest (mixed pools) -:1082 Client -> s5p -> Tor -> Dest (Tor only) +:1080 Client -> s5p -> Tor -> [clean] -> [clean] -> Dest (2 clean hops) +:1081 Client -> s5p -> Tor -> [clean|mitm] -> [clean|mitm] -> Dest (random) +:1082 Client -> s5p -> Tor -> Dest (Tor only) ``` s5p connects to Hop1 via TCP, negotiates the hop protocol (SOCKS5/4/HTTP), diff --git a/config/example.yaml b/config/example.yaml index 34d7fa6..76bf175 100644 --- a/config/example.yaml +++ b/config/example.yaml @@ -91,13 +91,24 @@ chain: # at that hop position. Bare "pool" uses the listener's "pool:" default. # This lets a single listener mix pools in one chain. # +# Multi-candidate hops: use a YAML list to randomly pick from a set of +# pools at each hop. On each connection, one pool is chosen per hop. +# # listeners: # - listen: 0.0.0.0:1080 # pool: clean # default for bare "pool" +# bypass: # skip chain for these destinations +# - 127.0.0.0/8 # loopback +# - 10.0.0.0/8 # RFC 1918 +# - 192.168.0.0/16 # RFC 1918 +# - 172.16.0.0/12 # RFC 1918 +# - fc00::/7 # IPv6 ULA +# - localhost # exact hostname +# - .local # domain suffix # chain: # - socks5://127.0.0.1:9050 -# - pool:clean # explicit: from clean pool -# - pool:mitm # explicit: from mitm pool +# - [pool:clean, pool:mitm] # random choice per connection +# - [pool:clean, pool:mitm] # independent random choice # # - listen: 0.0.0.0:1081 # pool: clean diff --git a/docs/CHEATSHEET.md b/docs/CHEATSHEET.md index 9b8f298..27a7d07 100644 --- a/docs/CHEATSHEET.md +++ b/docs/CHEATSHEET.md @@ -58,14 +58,37 @@ listeners: chain: - socks5://127.0.0.1:9050 # Tor only - listen: 0.0.0.0:1083 - pool: clean chain: - socks5://127.0.0.1:9050 - - pool # bare = clean (default) - - pool:mitm # explicit = mitm + - [pool:clean, pool:mitm] # random choice per connection + - [pool:clean, pool:mitm] # independent random choice ``` -Per-hop pool: `pool` = listener default, `pool:name` = explicit pool. +Per-hop pool: `pool` = listener default, `pool:name` = explicit pool, +`[pool:a, pool:b]` = random choice from candidates. + +## Bypass Rules (config) + +```yaml +listeners: + - listen: 0.0.0.0:1080 + bypass: + - 127.0.0.0/8 # CIDR + - 10.0.0.0/8 # CIDR + - 192.168.0.0/16 # CIDR + - localhost # exact hostname + - .local # domain suffix + chain: + - socks5://127.0.0.1:9050 + - pool +``` + +| Pattern | Type | Matches | +|---------|------|---------| +| `10.0.0.0/8` | CIDR | IPs in network | +| `127.0.0.1` | Exact IP | That IP only | +| `localhost` | Exact host | String equal | +| `.local` | Suffix | `*.local` and `local` | ## Multi-Tor Round-Robin (config) diff --git a/docs/USAGE.md b/docs/USAGE.md index 469d2d9..a93b6d2 100644 --- a/docs/USAGE.md +++ b/docs/USAGE.md @@ -206,10 +206,28 @@ listeners: | `pool:name` | Named pool `name` (case-sensitive) | | `pool:` | Same as bare `pool` (empty name = default) | | `Pool:name` | Prefix is case-insensitive; name is case-sensitive | +| `[pool:a, pool:b]` | Random choice from candidates `a` or `b` per connection | The `pool` keyword in a chain means "append a random alive proxy from the assigned pool". Multiple `pool` entries = multiple pool hops (deeper chaining). +### Multi-candidate pool hops + +Use a YAML list to randomly pick from a set of candidate pools at each hop. +On each connection, one candidate is chosen at random per hop (independently). + +```yaml +listeners: + - listen: 0.0.0.0:1080 + chain: + - socks5://10.200.1.13:9050 + - [pool:clean, pool:mitm] # hop 1: random choice + - [pool:clean, pool:mitm] # hop 2: random choice +``` + +Single-element pool references (`pool`, `pool:name`) and multi-candidate +lists can be mixed freely in the same chain. All existing syntax is unchanged. + When `pool:` is omitted on a listener with pool hops, it defaults to `"default"`. A listener referencing an unknown pool name causes a fatal error at startup. Listeners without pool hops ignore the `pool:` key. @@ -224,6 +242,41 @@ error at startup. Listeners without pool hops ignore the `pool:` key. | FirstHopPool | per unique first hop | Listeners with same first hop share it | | Chain + pool_hops | per listener | Each listener has its own chain depth | +## Bypass Rules + +Per-listener rules to skip the chain for specific destinations. When a target +matches a bypass rule, s5p connects directly (no chain, no pool hops). + +```yaml +listeners: + - listen: 0.0.0.0:1080 + bypass: + - 127.0.0.0/8 # CIDR: loopback + - 10.0.0.0/8 # CIDR: RFC 1918 + - 192.168.0.0/16 # CIDR: RFC 1918 + - fc00::/7 # CIDR: IPv6 ULA + - localhost # exact hostname + - .local # domain suffix (matches *.local and local) + chain: + - socks5://127.0.0.1:9050 + - pool +``` + +### Rule syntax + +| Pattern | Type | Matches | +|---------|------|---------| +| `10.0.0.0/8` | CIDR | Any IP in the network | +| `127.0.0.1` | Exact IP | That IP only | +| `localhost` | Exact hostname | String-equal match | +| `.local` | Domain suffix | `*.local` and `local` itself | + +CIDR rules only match IP addresses, not hostnames. Domain suffix rules only +match hostnames, not IPs. Exact rules match both (string compare for hostnames, +parsed compare for IPs). + +When bypass is active, retries are disabled (direct connections are not retried). + ### Backward compatibility When no `listeners:` key is present, the old `listen`/`chain` format creates diff --git a/src/s5p/api.py b/src/s5p/api.py index 3001a84..12abca2 100644 --- a/src/s5p/api.py +++ b/src/s5p/api.py @@ -88,7 +88,7 @@ def _handle_status(ctx: dict) -> tuple[int, dict]: "chain": [str(h) for h in lc.chain], "pool_hops": lc.pool_hops, **({"pool": lc.pool_name} if lc.pool_name else {}), - **({"pool_seq": lc.pool_seq} if len(set(lc.pool_seq)) > 1 else {}), + **({"pool_seq": lc.pool_seq} if len({n for c in lc.pool_seq for n in c}) > 1 else {}), "latency": metrics.get_listener_latency( f"{lc.listen_host}:{lc.listen_port}" ).stats(), @@ -167,7 +167,7 @@ def _handle_config(ctx: dict) -> tuple[int, dict]: "chain": [str(h) for h in lc.chain], "pool_hops": lc.pool_hops, **({"pool": lc.pool_name} if lc.pool_name else {}), - **({"pool_seq": lc.pool_seq} if len(set(lc.pool_seq)) > 1 else {}), + **({"pool_seq": lc.pool_seq} if len({n for c in lc.pool_seq for n in c}) > 1 else {}), } for lc in config.listeners ], diff --git a/src/s5p/config.py b/src/s5p/config.py index 1fad28d..f457a5a 100644 --- a/src/s5p/config.py +++ b/src/s5p/config.py @@ -85,8 +85,9 @@ class ListenerConfig: listen_host: str = "127.0.0.1" listen_port: int = 1080 chain: list[ChainHop] = field(default_factory=list) - pool_seq: list[str] = field(default_factory=list) + pool_seq: list[list[str]] = field(default_factory=list) pool_name: str = "" + bypass: list[str] = field(default_factory=list) @property def pool_hops(self) -> int: @@ -193,6 +194,21 @@ def _parse_pool_config(pool_raw: dict) -> ProxyPoolConfig: return ProxyPoolConfig(**kwargs) +def _parse_pool_ref(item: str, default: str) -> str: + """Resolve a pool reference string to a pool name. + + ``pool`` or ``pool:`` -> *default*; ``pool:name`` -> ``name``. + The ``pool`` prefix is matched case-insensitively. + """ + lower = item.lower() + if lower == "pool" or lower == "pool:": + return default + if lower.startswith("pool:"): + _, _, name = item.partition(":") + return name if name else default + raise ValueError(f"not a pool reference: {item!r}") + + def load_config(path: str | Path) -> Config: """Load configuration from a YAML file.""" path = Path(path) @@ -307,15 +323,17 @@ def load_config(path: str | Path) -> Config: lc.listen_port = int(port_str) elif isinstance(listen, (str, int)) and listen: lc.listen_port = int(listen) + if "bypass" in entry: + lc.bypass = list(entry["bypass"]) if "pool" in entry: lc.pool_name = entry["pool"] + default_pool = lc.pool_name or "default" chain_raw = entry.get("chain", []) for item in chain_raw: if isinstance(item, str): lower = item.lower() if lower == "pool" or lower.startswith("pool:"): - _, _, name = item.partition(":") - lc.pool_seq.append(name if name else (lc.pool_name or "default")) + lc.pool_seq.append([_parse_pool_ref(item, default_pool)]) else: lc.chain.append(parse_proxy_url(item)) elif isinstance(item, dict): @@ -323,7 +341,7 @@ def load_config(path: str | Path) -> Config: pool_key = next((k for k in item if k.lower() == "pool"), None) if pool_key is not None and len(item) == 1: name = item[pool_key] - lc.pool_seq.append(name if name else (lc.pool_name or "default")) + lc.pool_seq.append([name if name else default_pool]) else: lc.chain.append( ChainHop( @@ -334,6 +352,10 @@ def load_config(path: str | Path) -> Config: password=item.get("password"), ) ) + elif isinstance(item, list): + # multi-candidate hop: [pool:clean, pool:mitm] + candidates = [_parse_pool_ref(str(el), default_pool) for el in item] + lc.pool_seq.append(candidates) config.listeners.append(lc) else: # backward compat: build single listener from top-level fields @@ -344,7 +366,7 @@ def load_config(path: str | Path) -> Config: ) # legacy behavior: if proxy_pool configured, auto-append 1 pool hop if config.proxy_pool and config.proxy_pool.sources: - lc.pool_seq = ["default"] + lc.pool_seq = [["default"]] config.listeners.append(lc) return config diff --git a/src/s5p/server.py b/src/s5p/server.py index 14fcbef..5fb152e 100644 --- a/src/s5p/server.py +++ b/src/s5p/server.py @@ -3,7 +3,9 @@ from __future__ import annotations import asyncio +import ipaddress import logging +import random import signal import struct import time @@ -70,13 +72,44 @@ def _socks5_reply(rep: int) -> bytes: return struct.pack("!BBB", 0x05, rep, 0x00) + b"\x01\x00\x00\x00\x00\x00\x00" +def _bypass_match(rules: list[str], host: str) -> bool: + """Check if host matches any bypass rule (CIDR, suffix, or exact).""" + addr = None + try: + addr = ipaddress.ip_address(host) + except ValueError: + pass + + for rule in rules: + if "/" in rule: + if addr is not None: + try: + if addr in ipaddress.ip_network(rule, strict=False): + return True + except ValueError: + pass + elif rule.startswith("."): + if addr is None and (host.endswith(rule) or host == rule[1:]): + return True + else: + if addr is not None: + try: + if addr == ipaddress.ip_address(rule): + return True + except ValueError: + pass + if host == rule: + return True + return False + + async def _handle_client( client_reader: asyncio.StreamReader, client_writer: asyncio.StreamWriter, listener: ListenerConfig, timeout: float, retries: int, - pool_seq: list[ProxyPool] | None = None, + pool_seq: list[list[ProxyPool]] | None = None, metrics: Metrics | None = None, first_hop_pool: FirstHopPool | None = None, tor_rr: _RoundRobin | None = None, @@ -118,22 +151,33 @@ async def _handle_client( target_host, target_port = await read_socks5_address(client_reader) logger.info("[%s] connect %s:%d", tag, target_host, target_port) + # -- bypass check -- + bypass = bool(listener.bypass and _bypass_match(listener.bypass, target_host)) + if bypass: + logger.debug("[%s] bypass %s:%d", tag, target_host, target_port) + # -- build chain (with retry) -- - attempts = retries if pool_seq else 1 + attempts = retries if pool_seq and not bypass else 1 last_err: Exception | None = None for attempt in range(attempts): - effective_chain = list(listener.chain) - fhp = first_hop_pool - if tor_rr and effective_chain: - node = tor_rr.next() - effective_chain[0] = node - if hop_pools: - fhp = hop_pools.get((node.host, node.port)) + if bypass: + effective_chain: list[ChainHop] = [] + fhp = None + else: + effective_chain = list(listener.chain) + fhp = first_hop_pool + if tor_rr and effective_chain: + node = tor_rr.next() + effective_chain[0] = node + if hop_pools: + fhp = hop_pools.get((node.host, node.port)) pool_hops: list[tuple[ChainHop, ProxyPool]] = [] - if pool_seq: - for pp in pool_seq: + if pool_seq and not bypass: + for candidates in pool_seq: + weights = [max(pp.alive_count, 1) for pp in candidates] + pp = random.choices(candidates, weights=weights)[0] hop = await pp.get() if hop: pool_hops.append((hop, pp)) @@ -295,16 +339,19 @@ async def serve(config: Config) -> None: await pool.start() proxy_pools["default"] = pool - def _pools_for(lc: ListenerConfig) -> list[ProxyPool]: - """Resolve the ordered list of proxy pools for a listener.""" - result: list[ProxyPool] = [] - for name in lc.pool_seq: - if name not in proxy_pools: - raise RuntimeError( - f"listener {lc.listen_host}:{lc.listen_port} " - f"references unknown pool {name!r}" - ) - result.append(proxy_pools[name]) + def _pools_for(lc: ListenerConfig) -> list[list[ProxyPool]]: + """Resolve the ordered list of candidate proxy pools for a listener.""" + result: list[list[ProxyPool]] = [] + for candidates in lc.pool_seq: + resolved: list[ProxyPool] = [] + for name in candidates: + if name not in proxy_pools: + raise RuntimeError( + f"listener {lc.listen_host}:{lc.listen_port} " + f"references unknown pool {name!r}" + ) + resolved.append(proxy_pools[name]) + result.append(resolved) return result # -- per-unique first-hop connection pools -------------------------------- @@ -366,7 +413,7 @@ async def serve(config: Config) -> None: async def on_client( r: asyncio.StreamReader, w: asyncio.StreamWriter, _lc: ListenerConfig = lc, _hp: FirstHopPool | None = hp, - _pools: list[ProxyPool] = lc_pools, + _pools: list[list[ProxyPool]] = lc_pools, ) -> None: async with sem: await _handle_client( @@ -382,14 +429,17 @@ async def serve(config: Config) -> None: nhops = lc.pool_hops pool_desc = "" if nhops: - distinct = list(dict.fromkeys(lc.pool_seq)) - if len(distinct) == 1: + all_names = {n for cands in lc.pool_seq for n in cands} + hop_labels = ["|".join(cands) for cands in lc.pool_seq] + if len(all_names) == 1: + name = next(iter(all_names)) pool_desc = f" + {nhops} pool hop{'s' if nhops != 1 else ''}" - if distinct[0] != "default": - pool_desc += f" [{distinct[0]}]" + if name != "default": + pool_desc += f" [{name}]" else: - pool_desc = f" + pool [{' -> '.join(lc.pool_seq)}]" - logger.info("listener %s chain: %s%s", addr, chain_desc, pool_desc) + pool_desc = f" + pool [{' -> '.join(hop_labels)}]" + bypass_desc = f" bypass: {len(lc.bypass)} rules" if lc.bypass else "" + logger.info("listener %s chain: %s%s%s", addr, chain_desc, pool_desc, bypass_desc) logger.info("max_connections=%d", config.max_connections) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..8128501 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,138 @@ +"""Shared helpers for integration tests.""" + +from __future__ import annotations + +import asyncio +import socket +import struct + +from s5p.proto import encode_address, read_socks5_address + + +def free_port() -> int: + """Return an available TCP port.""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("127.0.0.1", 0)) + return s.getsockname()[1] + + +# -- echo server ------------------------------------------------------------- + + +async def _echo_handler( + reader: asyncio.StreamReader, writer: asyncio.StreamWriter, +) -> None: + """Echo back everything received, then close.""" + try: + while True: + data = await reader.read(65536) + if not data: + break + writer.write(data) + await writer.drain() + except (ConnectionError, asyncio.CancelledError): + pass + finally: + writer.close() + await writer.wait_closed() + + +async def start_echo_server() -> tuple[str, int, asyncio.Server]: + """Start a TCP echo server. Returns (host, port, server).""" + host = "127.0.0.1" + port = free_port() + srv = await asyncio.start_server(_echo_handler, host, port) + await srv.start_serving() + return host, port, srv + + +# -- mock SOCKS5 proxy ------------------------------------------------------- + + +async def _mock_socks5_handler( + reader: asyncio.StreamReader, writer: asyncio.StreamWriter, +) -> None: + """Minimal SOCKS5 proxy: greeting, CONNECT, relay.""" + remote_writer = None + try: + # greeting + header = await reader.readexactly(2) + if header[0] != 0x05: + return + await reader.readexactly(header[1]) # skip methods + writer.write(b"\x05\x00") + await writer.drain() + + # connect request + req = await reader.readexactly(3) + if req[0] != 0x05 or req[1] != 0x01: + return + + target_host, target_port = await read_socks5_address(reader) + + # connect to actual target + try: + remote_reader, remote_writer = await asyncio.wait_for( + asyncio.open_connection(target_host, target_port), + timeout=5.0, + ) + except (OSError, TimeoutError): + # connection refused reply + reply = struct.pack("!BBB", 0x05, 0x05, 0x00) + reply += b"\x01\x00\x00\x00\x00\x00\x00" + writer.write(reply) + await writer.drain() + return + + # success reply + atyp, addr_bytes = encode_address(target_host) + reply = struct.pack("!BBB", 0x05, 0x00, 0x00) + reply += bytes([atyp]) + addr_bytes + struct.pack("!H", target_port) + writer.write(reply) + await writer.drain() + + # relay both directions (close dst on EOF so peer sees shutdown) + async def _fwd(src: asyncio.StreamReader, dst: asyncio.StreamWriter) -> None: + try: + while True: + data = await src.read(65536) + if not data: + break + dst.write(data) + await dst.drain() + except (ConnectionError, asyncio.CancelledError): + pass + finally: + try: + dst.close() + await dst.wait_closed() + except OSError: + pass + + await asyncio.gather( + _fwd(reader, remote_writer), + _fwd(remote_reader, writer), + ) + except (ConnectionError, asyncio.IncompleteReadError, asyncio.CancelledError): + pass + finally: + if remote_writer: + remote_writer.close() + try: + await remote_writer.wait_closed() + except OSError: + pass + writer.close() + try: + await writer.wait_closed() + except OSError: + pass + + +async def start_mock_socks5() -> tuple[str, int, asyncio.Server]: + """Start a mock SOCKS5 proxy. Returns (host, port, server).""" + host = "127.0.0.1" + port = free_port() + srv = await asyncio.start_server(_mock_socks5_handler, host, port) + await srv.start_serving() + return host, port, srv diff --git a/tests/test_api.py b/tests/test_api.py index 0e121b8..9ff38fd 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -132,7 +132,7 @@ class TestHandleStatus: ListenerConfig( listen_host="0.0.0.0", listen_port=1081, chain=[ChainHop("socks5", "127.0.0.1", 9050)], - pool_seq=["default"], + pool_seq=[["default"]], ), ], ) @@ -180,7 +180,7 @@ class TestHandleStatusMultiPool: ListenerConfig( listen_host="0.0.0.0", listen_port=1080, chain=[ChainHop("socks5", "127.0.0.1", 9050)], - pool_seq=["clean", "clean"], pool_name="clean", + pool_seq=[["clean"], ["clean"]], pool_name="clean", ), ], ) @@ -195,13 +195,13 @@ class TestHandleStatusMultiPool: ListenerConfig( listen_host="0.0.0.0", listen_port=1080, chain=[ChainHop("socks5", "127.0.0.1", 9050)], - pool_seq=["clean", "mitm"], pool_name="clean", + pool_seq=[["clean"], ["mitm"]], pool_name="clean", ), ], ) ctx = _make_ctx(config=config) _, body = _handle_status(ctx) - assert body["listeners"][0]["pool_seq"] == ["clean", "mitm"] + assert body["listeners"][0]["pool_seq"] == [["clean"], ["mitm"]] assert body["listeners"][0]["pool_hops"] == 2 def test_multi_pool_in_config(self): @@ -211,13 +211,13 @@ class TestHandleStatusMultiPool: ListenerConfig( listen_host="0.0.0.0", listen_port=1080, chain=[ChainHop("socks5", "127.0.0.1", 9050)], - pool_seq=["clean", "mitm"], pool_name="clean", + pool_seq=[["clean"], ["mitm"]], pool_name="clean", ), ], ) ctx = _make_ctx(config=config) _, body = _handle_config(ctx) - assert body["listeners"][0]["pool_seq"] == ["clean", "mitm"] + assert body["listeners"][0]["pool_seq"] == [["clean"], ["mitm"]] class TestHandleMetrics: @@ -395,7 +395,7 @@ class TestHandleConfig: listeners=[ListenerConfig( listen_host="0.0.0.0", listen_port=1080, chain=[ChainHop("socks5", "127.0.0.1", 9050)], - pool_seq=["clean", "clean"], pool_name="clean", + pool_seq=[["clean"], ["clean"]], pool_name="clean", )], ) ctx = _make_ctx(config=config) diff --git a/tests/test_config.py b/tests/test_config.py index eccff43..534df67 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -10,6 +10,7 @@ from s5p.config import ( parse_api_proxies, parse_proxy_url, ) +from s5p.server import _bypass_match class TestParseProxyUrl: @@ -411,7 +412,7 @@ class TestPoolSeq: """Test per-hop pool references (pool:name syntax).""" def test_bare_pool_uses_default_name(self, tmp_path): - """Bare `pool` + `pool: clean` -> pool_seq=["clean"].""" + """Bare `pool` + `pool: clean` -> pool_seq=[["clean"]].""" cfg_file = tmp_path / "test.yaml" cfg_file.write_text( "listeners:\n" @@ -421,10 +422,10 @@ class TestPoolSeq: " - pool\n" ) c = load_config(cfg_file) - assert c.listeners[0].pool_seq == ["clean"] + assert c.listeners[0].pool_seq == [["clean"]] def test_bare_pool_no_pool_name(self, tmp_path): - """Bare `pool` with no `pool:` key -> pool_seq=["default"].""" + """Bare `pool` with no `pool:` key -> pool_seq=[["default"]].""" cfg_file = tmp_path / "test.yaml" cfg_file.write_text( "listeners:\n" @@ -433,10 +434,10 @@ class TestPoolSeq: " - pool\n" ) c = load_config(cfg_file) - assert c.listeners[0].pool_seq == ["default"] + assert c.listeners[0].pool_seq == [["default"]] def test_pool_colon_name(self, tmp_path): - """`pool:clean, pool:mitm` -> pool_seq=["clean", "mitm"].""" + """`pool:clean, pool:mitm` -> pool_seq=[["clean"], ["mitm"]].""" cfg_file = tmp_path / "test.yaml" cfg_file.write_text( "listeners:\n" @@ -446,10 +447,10 @@ class TestPoolSeq: " - pool:mitm\n" ) c = load_config(cfg_file) - assert c.listeners[0].pool_seq == ["clean", "mitm"] + assert c.listeners[0].pool_seq == [["clean"], ["mitm"]] def test_mixed_bare_and_named(self, tmp_path): - """Bare `pool` + `pool:mitm` with `pool: clean` -> ["clean", "mitm"].""" + """Bare `pool` + `pool:mitm` with `pool: clean` -> [["clean"], ["mitm"]].""" cfg_file = tmp_path / "test.yaml" cfg_file.write_text( "listeners:\n" @@ -460,10 +461,10 @@ class TestPoolSeq: " - pool:mitm\n" ) c = load_config(cfg_file) - assert c.listeners[0].pool_seq == ["clean", "mitm"] + assert c.listeners[0].pool_seq == [["clean"], ["mitm"]] def test_pool_colon_case_insensitive_prefix(self, tmp_path): - """`Pool:MyPool` -> pool_seq=["MyPool"] (prefix case-insensitive).""" + """`Pool:MyPool` -> pool_seq=[["MyPool"]] (prefix case-insensitive).""" cfg_file = tmp_path / "test.yaml" cfg_file.write_text( "listeners:\n" @@ -472,7 +473,7 @@ class TestPoolSeq: " - Pool:MyPool\n" ) c = load_config(cfg_file) - assert c.listeners[0].pool_seq == ["MyPool"] + assert c.listeners[0].pool_seq == [["MyPool"]] def test_pool_colon_empty_is_bare(self, tmp_path): """`pool:` (empty name) -> treated as bare pool.""" @@ -485,17 +486,17 @@ class TestPoolSeq: " - pool:\n" ) c = load_config(cfg_file) - assert c.listeners[0].pool_seq == ["clean"] + assert c.listeners[0].pool_seq == [["clean"]] def test_backward_compat_pool_hops_property(self): """pool_hops property returns len(pool_seq).""" - lc = ListenerConfig(pool_seq=["clean", "mitm"]) + lc = ListenerConfig(pool_seq=[["clean"], ["mitm"]]) assert lc.pool_hops == 2 lc2 = ListenerConfig() assert lc2.pool_hops == 0 def test_legacy_auto_append(self, tmp_path): - """Singular `proxy_pool:` -> pool_seq=["default"].""" + """Singular `proxy_pool:` -> pool_seq=[["default"]].""" cfg_file = tmp_path / "test.yaml" cfg_file.write_text( "listen: 0.0.0.0:1080\n" @@ -507,9 +508,26 @@ class TestPoolSeq: ) c = load_config(cfg_file) lc = c.listeners[0] - assert lc.pool_seq == ["default"] + assert lc.pool_seq == [["default"]] assert lc.pool_hops == 1 + def test_list_candidates(self, tmp_path): + """List in chain -> multi-candidate hop.""" + cfg_file = tmp_path / "test.yaml" + cfg_file.write_text( + "listeners:\n" + " - listen: 1080\n" + " chain:\n" + " - socks5://tor:9050\n" + " - [pool:clean, pool:mitm]\n" + " - [pool:clean, pool:mitm]\n" + ) + c = load_config(cfg_file) + lc = c.listeners[0] + assert len(lc.chain) == 1 + assert lc.pool_hops == 2 + assert lc.pool_seq == [["clean", "mitm"], ["clean", "mitm"]] + class TestListenerBackwardCompat: """Test backward-compatible single listener from old format.""" @@ -573,3 +591,79 @@ class TestListenerPoolCompat: lc = c.listeners[0] # explicit listeners: no auto pool_hops assert lc.pool_hops == 0 + + +class TestBypassConfig: + """Test bypass rules in listener config.""" + + def test_bypass_from_yaml(self, tmp_path): + cfg_file = tmp_path / "test.yaml" + cfg_file.write_text( + "listeners:\n" + " - listen: 1080\n" + " bypass:\n" + " - 127.0.0.0/8\n" + " - 192.168.0.0/16\n" + " - localhost\n" + " - .local\n" + " chain:\n" + " - socks5://127.0.0.1:9050\n" + ) + c = load_config(cfg_file) + lc = c.listeners[0] + assert lc.bypass == ["127.0.0.0/8", "192.168.0.0/16", "localhost", ".local"] + + def test_bypass_empty_default(self): + lc = ListenerConfig() + assert lc.bypass == [] + + def test_bypass_absent_from_yaml(self, tmp_path): + cfg_file = tmp_path / "test.yaml" + cfg_file.write_text( + "listeners:\n" + " - listen: 1080\n" + " chain:\n" + " - socks5://127.0.0.1:9050\n" + ) + c = load_config(cfg_file) + assert c.listeners[0].bypass == [] + + +class TestBypassMatch: + """Test _bypass_match function.""" + + def test_cidr_ipv4(self): + assert _bypass_match(["10.0.0.0/8"], "10.1.2.3") is True + assert _bypass_match(["10.0.0.0/8"], "11.0.0.1") is False + + def test_cidr_ipv6(self): + assert _bypass_match(["fc00::/7"], "fd00::1") is True + assert _bypass_match(["fc00::/7"], "2001:db8::1") is False + + def test_exact_ip(self): + assert _bypass_match(["127.0.0.1"], "127.0.0.1") is True + assert _bypass_match(["127.0.0.1"], "127.0.0.2") is False + + def test_exact_hostname(self): + assert _bypass_match(["localhost"], "localhost") is True + assert _bypass_match(["localhost"], "otherhost") is False + + def test_domain_suffix(self): + assert _bypass_match([".local"], "myhost.local") is True + assert _bypass_match([".local"], "local") is True + assert _bypass_match([".local"], "notlocal") is False + assert _bypass_match([".example.com"], "api.example.com") is True + assert _bypass_match([".example.com"], "example.com") is True + + def test_multiple_rules(self): + rules = ["10.0.0.0/8", "192.168.0.0/16", ".local"] + assert _bypass_match(rules, "10.1.2.3") is True + assert _bypass_match(rules, "192.168.1.1") is True + assert _bypass_match(rules, "host.local") is True + assert _bypass_match(rules, "8.8.8.8") is False + + def test_empty_rules(self): + assert _bypass_match([], "anything") is False + + def test_hostname_not_matched_by_cidr(self): + assert _bypass_match(["10.0.0.0/8"], "example.com") is False diff --git a/tests/test_integration.py b/tests/test_integration.py new file mode 100644 index 0000000..23a757f --- /dev/null +++ b/tests/test_integration.py @@ -0,0 +1,285 @@ +"""End-to-end integration tests with mock SOCKS5 proxies.""" + +from __future__ import annotations + +import asyncio +import struct + +from s5p.config import ChainHop, ListenerConfig +from s5p.proto import encode_address +from s5p.server import _handle_client + +from .conftest import free_port, start_echo_server, start_mock_socks5 + +# -- helpers ----------------------------------------------------------------- + + +async def _socks5_connect( + host: str, port: int, target_host: str, target_port: int, +) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]: + """Connect as a SOCKS5 client, perform greeting + CONNECT.""" + reader, writer = await asyncio.open_connection(host, port) + + # greeting: version 5, 1 method (no-auth) + writer.write(b"\x05\x01\x00") + await writer.drain() + resp = await reader.readexactly(2) + assert resp == b"\x05\x00", f"greeting failed: {resp!r}" + + # connect request + atyp, addr_bytes = encode_address(target_host) + writer.write( + struct.pack("!BBB", 0x05, 0x01, 0x00) + + bytes([atyp]) + + addr_bytes + + struct.pack("!H", target_port) + ) + await writer.drain() + + # read reply + rep_header = await reader.readexactly(3) + atyp_resp = (await reader.readexactly(1))[0] + if atyp_resp == 0x01: + await reader.readexactly(4) + elif atyp_resp == 0x03: + length = (await reader.readexactly(1))[0] + await reader.readexactly(length) + elif atyp_resp == 0x04: + await reader.readexactly(16) + await reader.readexactly(2) # port + + if rep_header[1] != 0x00: + writer.close() + await writer.wait_closed() + raise ConnectionError(f"SOCKS5 reply={rep_header[1]:#x}") + + return reader, writer + + +async def _close_server(srv: asyncio.Server) -> None: + """Close a server and wait.""" + srv.close() + await srv.wait_closed() + + +# -- tests ------------------------------------------------------------------- + + +class TestDirectNoChain: + """Client -> s5p -> echo (empty chain).""" + + def test_echo(self): + async def _run(): + servers = [] + try: + echo_host, echo_port, echo_srv = await start_echo_server() + servers.append(echo_srv) + + listener = ListenerConfig(listen_host="127.0.0.1", listen_port=free_port()) + s5p_srv = await asyncio.start_server( + lambda r, w: _handle_client(r, w, listener, timeout=5.0, retries=1), + listener.listen_host, listener.listen_port, + ) + servers.append(s5p_srv) + await s5p_srv.start_serving() + + reader, writer = await _socks5_connect( + listener.listen_host, listener.listen_port, echo_host, echo_port, + ) + writer.write(b"hello direct") + await writer.drain() + data = await asyncio.wait_for(reader.read(4096), timeout=2.0) + assert data == b"hello direct" + + writer.close() + await writer.wait_closed() + finally: + for s in servers: + await _close_server(s) + + asyncio.run(_run()) + + +class TestSingleHop: + """Client -> s5p -> mock socks5 -> echo.""" + + def test_echo_through_one_hop(self): + async def _run(): + servers = [] + try: + echo_host, echo_port, echo_srv = await start_echo_server() + servers.append(echo_srv) + mock_host, mock_port, mock_srv = await start_mock_socks5() + servers.append(mock_srv) + + listener = ListenerConfig( + listen_host="127.0.0.1", + listen_port=free_port(), + chain=[ChainHop(proto="socks5", host=mock_host, port=mock_port)], + ) + s5p_srv = await asyncio.start_server( + lambda r, w: _handle_client(r, w, listener, timeout=5.0, retries=1), + listener.listen_host, listener.listen_port, + ) + servers.append(s5p_srv) + await s5p_srv.start_serving() + + reader, writer = await _socks5_connect( + listener.listen_host, listener.listen_port, echo_host, echo_port, + ) + writer.write(b"hello one hop") + await writer.drain() + data = await asyncio.wait_for(reader.read(4096), timeout=2.0) + assert data == b"hello one hop" + + writer.close() + await writer.wait_closed() + finally: + for s in servers: + await _close_server(s) + + asyncio.run(_run()) + + +class TestTwoHops: + """Client -> s5p -> mock1 -> mock2 -> echo.""" + + def test_echo_through_two_hops(self): + async def _run(): + servers = [] + try: + echo_host, echo_port, echo_srv = await start_echo_server() + servers.append(echo_srv) + m1_host, m1_port, m1_srv = await start_mock_socks5() + servers.append(m1_srv) + m2_host, m2_port, m2_srv = await start_mock_socks5() + servers.append(m2_srv) + + listener = ListenerConfig( + listen_host="127.0.0.1", + listen_port=free_port(), + chain=[ + ChainHop(proto="socks5", host=m1_host, port=m1_port), + ChainHop(proto="socks5", host=m2_host, port=m2_port), + ], + ) + s5p_srv = await asyncio.start_server( + lambda r, w: _handle_client(r, w, listener, timeout=5.0, retries=1), + listener.listen_host, listener.listen_port, + ) + servers.append(s5p_srv) + await s5p_srv.start_serving() + + reader, writer = await _socks5_connect( + listener.listen_host, listener.listen_port, echo_host, echo_port, + ) + writer.write(b"hello two hops") + await writer.drain() + data = await asyncio.wait_for(reader.read(4096), timeout=2.0) + assert data == b"hello two hops" + + writer.close() + await writer.wait_closed() + finally: + for s in servers: + await _close_server(s) + + asyncio.run(_run()) + + +class TestConnectionRefused: + """Dead hop returns SOCKS5 error to client.""" + + def test_refused(self): + async def _run(): + servers = [] + try: + # use a port with nothing listening + dead_port = free_port() + + listener = ListenerConfig( + listen_host="127.0.0.1", + listen_port=free_port(), + chain=[ChainHop(proto="socks5", host="127.0.0.1", port=dead_port)], + ) + s5p_srv = await asyncio.start_server( + lambda r, w: _handle_client(r, w, listener, timeout=3.0, retries=1), + listener.listen_host, listener.listen_port, + ) + servers.append(s5p_srv) + await s5p_srv.start_serving() + + reader, writer = await asyncio.open_connection( + listener.listen_host, listener.listen_port, + ) + # greeting + writer.write(b"\x05\x01\x00") + await writer.drain() + resp = await reader.readexactly(2) + assert resp == b"\x05\x00" + + # connect to a dummy target + atyp, addr_bytes = encode_address("127.0.0.1") + writer.write( + struct.pack("!BBB", 0x05, 0x01, 0x00) + + bytes([atyp]) + + addr_bytes + + struct.pack("!H", 9999) + ) + await writer.drain() + + # should get error reply (non-zero rep field) + rep = await asyncio.wait_for(reader.read(4096), timeout=5.0) + assert len(rep) >= 3 + assert rep[1] != 0x00, "expected non-zero SOCKS5 reply code" + + writer.close() + await writer.wait_closed() + finally: + for s in servers: + await _close_server(s) + + asyncio.run(_run()) + + +class TestBypassDirectConnect: + """Target matches bypass rule -> chain skipped, direct connect to echo.""" + + def test_bypass_skips_chain(self): + async def _run(): + servers = [] + try: + echo_host, echo_port, echo_srv = await start_echo_server() + servers.append(echo_srv) + + # dead hop -- would fail if bypass didn't skip it + dead_port = free_port() + + listener = ListenerConfig( + listen_host="127.0.0.1", + listen_port=free_port(), + chain=[ChainHop(proto="socks5", host="127.0.0.1", port=dead_port)], + bypass=["127.0.0.0/8"], + ) + s5p_srv = await asyncio.start_server( + lambda r, w: _handle_client(r, w, listener, timeout=5.0, retries=1), + listener.listen_host, listener.listen_port, + ) + servers.append(s5p_srv) + await s5p_srv.start_serving() + + reader, writer = await _socks5_connect( + listener.listen_host, listener.listen_port, echo_host, echo_port, + ) + writer.write(b"hello bypass") + await writer.drain() + data = await asyncio.wait_for(reader.read(4096), timeout=2.0) + assert data == b"hello bypass" + + writer.close() + await writer.wait_closed() + finally: + for s in servers: + await _close_server(s) + + asyncio.run(_run())