From 29b4a36863c442eba75f8fbaeebd4f6cbc04c450 Mon Sep 17 00:00:00 2001 From: user Date: Wed, 18 Feb 2026 11:33:53 +0100 Subject: [PATCH] feat: named proxy pools with per-listener assignment Add proxy_pools: top-level config (dict of name -> pool config) so listeners can draw from different proxy sources. Each pool has independent sources, health testing, state persistence, and refresh cycles. - PoolSourceConfig gains mitm: bool|None for API ?mitm=0/1 filtering - ListenerConfig gains pool_name for named pool assignment - ProxyPool gains name param with prefixed log messages and per-name state file derivation (pool-{name}.json) - server.py replaces single proxy_pool with proxy_pools dict, validates listener pool references at startup, per-listener closure - API /pool merges all pools (with pool field on multi-pool entries), /status and /config expose per-pool summaries - Backward compat: singular proxy_pool: registers as "default" Co-Authored-By: Claude Opus 4.6 --- config/example.yaml | 45 ++++++++++++++--- docs/CHEATSHEET.md | 57 ++++++++++++++------- docs/USAGE.md | 117 +++++++++++++++++++++++++++++++------------ src/s5p/api.py | 112 +++++++++++++++++++++++++++++++---------- src/s5p/config.py | 73 +++++++++++++++++---------- src/s5p/pool.py | 59 +++++++++++++++------- src/s5p/server.py | 98 ++++++++++++++++++++++++++---------- tests/test_api.py | 100 ++++++++++++++++++++++++++++++++++++ tests/test_config.py | 84 +++++++++++++++++++++++++++++++ tests/test_pool.py | 89 ++++++++++++++++++++++++++++++++ 10 files changed, 680 insertions(+), 154 deletions(-) diff --git a/config/example.yaml b/config/example.yaml index f64f802..5b72b45 100644 --- a/config/example.yaml +++ b/config/example.yaml @@ -20,14 +20,37 @@ chain: # - socks4://proxy:1080 # post-Tor SOCKS4/4a proxy # - http://user:pass@proxy:8080 # post-Tor HTTP CONNECT proxy -# Managed proxy pool -- fetches from multiple sources, health-tests, -# and rotates alive proxies per-connection after the static chain. 
+# Named proxy pools -- each pool has its own sources, health tests, +# and state file. Listeners reference pools by name via the "pool:" key. +# +# proxy_pools: +# clean: # MITM-free proxies +# sources: +# - url: http://10.200.1.250:8081/proxies/all +# mitm: false # filter: mitm=0 query param +# state_file: /data/pool-clean.json +# refresh: 300 +# test_interval: 120 +# test_timeout: 8 +# max_fails: 3 +# mitm: # MITM-capable proxies +# sources: +# - url: http://10.200.1.250:8081/proxies/all +# mitm: true # filter: mitm=1 query param +# state_file: /data/pool-mitm.json +# refresh: 300 +# test_interval: 120 +# test_timeout: 8 +# max_fails: 3 + +# Single proxy pool (legacy, still supported -- becomes pool "default"): # proxy_pool: # sources: # - url: http://10.200.1.250:8081/proxies # proto: socks5 # optional: filter by protocol # country: US # optional: filter by country # limit: 1000 # optional: max proxies to fetch +# mitm: false # optional: filter by MITM status (true/false) # - file: /etc/s5p/proxies.txt # text file, one proxy URL per line # refresh: 300 # re-fetch sources interval (seconds) # test_interval: 120 # health test cycle interval (seconds) @@ -59,26 +82,36 @@ chain: # - socks5://10.200.1.250:9050 # - socks5://10.200.1.13:9050 -# Multi-listener mode -- each listener gets its own address and chain. -# The "pool" keyword in a chain appends a random alive proxy from the pool. +# Multi-listener mode -- each listener gets its own address, chain, +# and optional pool assignment. The "pool" keyword in a chain appends +# a random alive proxy from the named pool (or "default" if unnamed). # Multiple "pool" entries = multiple pool hops (deeper chaining). 
# # listeners: # - listen: 0.0.0.0:1080 +# pool: clean # draw from "clean" pool # chain: # - socks5://127.0.0.1:9050 -# - pool # Tor + 2 random pool proxies +# - pool # Tor + 2 clean pool proxies # - pool # # - listen: 0.0.0.0:1081 +# pool: clean # chain: # - socks5://127.0.0.1:9050 -# - pool # Tor + 1 random pool proxy +# - pool # Tor + 1 clean pool proxy # # - listen: 0.0.0.0:1082 # chain: # - socks5://127.0.0.1:9050 # Tor only (no pool hops) # +# - listen: 0.0.0.0:1083 +# pool: mitm # draw from "mitm" pool +# chain: +# - socks5://127.0.0.1:9050 +# - pool # Tor + 2 MITM pool proxies +# - pool +# # When using "listeners:", the top-level "listen" and "chain" keys are ignored. # If "listeners:" is absent, the old format is used (single listener). diff --git a/docs/CHEATSHEET.md b/docs/CHEATSHEET.md index af2696c..f9dc461 100644 --- a/docs/CHEATSHEET.md +++ b/docs/CHEATSHEET.md @@ -43,17 +43,25 @@ cp config/example.yaml config/s5p.yaml # create live config (gitignored) ```yaml listeners: - listen: 0.0.0.0:1080 + pool: clean # named pool assignment chain: - socks5://127.0.0.1:9050 - - pool # Tor + 2 pool hops + - pool # Tor + 2 clean hops - pool - listen: 0.0.0.0:1081 + pool: clean chain: - socks5://127.0.0.1:9050 - - pool # Tor + 1 pool hop + - pool # Tor + 1 clean hop - listen: 0.0.0.0:1082 chain: - socks5://127.0.0.1:9050 # Tor only + - listen: 0.0.0.0:1083 + pool: mitm # MITM-capable proxies + chain: + - socks5://127.0.0.1:9050 + - pool + - pool ``` ## Multi-Tor Round-Robin (config) @@ -74,26 +82,39 @@ pool_size: 8 # pre-warmed TCP conns to first hop (0 = off) pool_max_idle: 30 # evict idle pooled conns (seconds) ``` -## Proxy Pool (config) +## Named Proxy Pools (config) ```yaml -proxy_pool: - sources: - - url: http://10.200.1.250:8081/proxies - proto: socks5 - limit: 1000 - - file: /etc/s5p/proxies.txt - refresh: 300 # re-fetch interval - test_interval: 120 # health test cycle - test_targets: # TLS handshake targets (round-robin) - - www.google.com - - 
www.cloudflare.com - - www.amazon.com - test_concurrency: 25 # max parallel tests (auto-scales to ~10% of pool) - max_fails: 3 # evict after N fails - report_url: "" # POST dead proxies (optional) +proxy_pools: + clean: + sources: + - url: http://10.200.1.250:8081/proxies/all + mitm: false # adds ?mitm=0 + state_file: /data/pool-clean.json + refresh: 300 + test_interval: 120 + max_fails: 3 + mitm: + sources: + - url: http://10.200.1.250:8081/proxies/all + mitm: true # adds ?mitm=1 + state_file: /data/pool-mitm.json + refresh: 300 + test_interval: 120 + max_fails: 3 ``` +Singular `proxy_pool:` still works (becomes pool "default"). + +## Source Filters (proxy_pool sources) + +| Filter | Values | Query param | +|--------|--------|-------------| +| `proto` | socks5/socks4/http | `?proto=...` | +| `country` | ISO alpha-2 | `?country=...` | +| `limit` | integer | `?limit=...` | +| `mitm` | true/false | `?mitm=1` / `?mitm=0` | + ## Tor Control Port (config) ```yaml diff --git a/docs/USAGE.md b/docs/USAGE.md index 85ac066..ca31ed7 100644 --- a/docs/USAGE.md +++ b/docs/USAGE.md @@ -47,39 +47,35 @@ pool_size: 0 # pre-warmed TCP connections to first hop (0 = disable pool_max_idle: 30 # max idle time for pooled connections (seconds) api_listen: "" # control API bind address (empty = disabled) -# Multi-listener (each port gets its own chain depth) +# Named proxy pools (each with its own sources and filters) +proxy_pools: + clean: + sources: + - url: http://10.200.1.250:8081/proxies/all + mitm: false + refresh: 300 + test_interval: 120 + test_timeout: 8 + max_fails: 3 + +# Multi-listener (each port gets its own chain depth and pool) listeners: - listen: 0.0.0.0:1080 + pool: clean chain: - socks5://127.0.0.1:9050 - - pool # Tor + 2 pool proxies + - pool # Tor + 2 clean proxies - pool - listen: 0.0.0.0:1081 + pool: clean chain: - socks5://127.0.0.1:9050 - - pool # Tor + 1 pool proxy + - pool # Tor + 1 clean proxy # Or single-listener (old format): # listen: 127.0.0.1:1080 # chain: 
# - socks5://127.0.0.1:9050 - -proxy_pool: - sources: - - url: http://10.200.1.250:8081/proxies - proto: socks5 - limit: 1000 - - file: /etc/s5p/proxies.txt - refresh: 300 - test_interval: 120 - test_targets: # TLS handshake targets (round-robin) - - www.google.com - - www.cloudflare.com - - www.amazon.com - test_timeout: 15 - test_concurrency: 25 # max parallel tests (auto-scales to ~10% of pool) - max_fails: 3 - state_file: "" # empty = ~/.cache/s5p/pool.json ``` ## Multi-Tor Round-Robin @@ -112,42 +108,85 @@ curl -s http://127.0.0.1:1090/config | jq '.tor_nodes' curl -s http://127.0.0.1:1090/status | jq '.tor_nodes' ``` +## Named Proxy Pools + +Define multiple proxy pools with different source filters. Each listener can +reference a specific pool by name via the `pool:` key. + +```yaml +proxy_pools: + clean: + sources: + - url: http://10.200.1.250:8081/proxies/all + mitm: false + state_file: /data/pool-clean.json + refresh: 300 + test_interval: 120 + test_timeout: 8 + max_fails: 3 + mitm: + sources: + - url: http://10.200.1.250:8081/proxies/all + mitm: true + state_file: /data/pool-mitm.json + refresh: 300 + test_interval: 120 + test_timeout: 8 + max_fails: 3 +``` + +Each pool has independent health testing, state persistence, and source +refresh cycles. The `mitm` source filter adds `?mitm=0` or `?mitm=1` to +API requests. + +### Backward compatibility + +The singular `proxy_pool:` key still works -- it registers as pool `"default"`. +If both keys are present, every `proxy_pools:` entry is loaded and `proxy_pool:` +is added as `"default"` only if `proxy_pools:` has no pool named `default`. + ## Multi-Listener Mode Run multiple listeners on different ports, each with a different number -of proxy hops after the static chain. Config-file only (not available via CLI). +of proxy hops and pool assignment. Config-file only (not available via CLI).
```yaml listeners: - listen: 0.0.0.0:1080 + pool: clean chain: - socks5://10.200.1.13:9050 - - pool # Tor + 2 pool proxies + - pool # Tor + 2 clean proxies - pool - listen: 0.0.0.0:1081 + pool: clean chain: - socks5://10.200.1.13:9050 - - pool # Tor + 1 pool proxy + - pool # Tor + 1 clean proxy - listen: 0.0.0.0:1082 chain: - - socks5://10.200.1.13:9050 # Tor only + - socks5://10.200.1.13:9050 # Tor only (no pool) -proxy_pool: - sources: - - url: http://10.200.1.250:8081/proxies/all?mitm=0 - refresh: 300 - test_interval: 120 - max_fails: 3 + - listen: 0.0.0.0:1083 + pool: mitm + chain: + - socks5://10.200.1.13:9050 + - pool # Tor + 2 MITM proxies + - pool ``` The `pool` keyword in a chain means "append a random alive proxy from the -shared pool". Multiple `pool` entries = multiple pool hops (deeper chaining). +assigned pool". Multiple `pool` entries = multiple pool hops (deeper chaining). + +When `pool:` is omitted on a listener with pool hops, it defaults to +`"default"`. A listener referencing an unknown pool name causes a fatal +error at startup. Listeners without pool hops ignore the `pool:` key. 
| Resource | Scope | Notes | |----------|-------|-------| -| ProxyPool | shared | All listeners draw from one pool | +| ProxyPool | per name | Each named pool is independent | | TorController | shared | One Tor instance | | Metrics | shared | Aggregate stats across listeners | | Semaphore | shared | Global `max_connections` cap | @@ -202,6 +241,7 @@ proxy_pool: proto: socks5 # optional: filter by protocol country: US # optional: filter by country limit: 1000 # max proxies to fetch from API + mitm: false # optional: filter by MITM status (true/false) - file: /etc/s5p/proxies.txt # text file, one proxy URL per line refresh: 300 # re-fetch sources every 300 seconds test_interval: 120 # health test cycle every 120 seconds @@ -212,7 +252,7 @@ proxy_pool: test_timeout: 15 # per-test timeout (seconds) test_concurrency: 25 # max parallel tests (auto-scales to ~10% of pool) max_fails: 3 # evict after N consecutive failures - state_file: "" # empty = ~/.cache/s5p/pool.json + state_file: "" # empty = ~/.cache/s5p/pool[-name].json report_url: "" # POST dead proxies here (optional) ``` @@ -223,6 +263,17 @@ proxy_pool: | HTTP API | `url` | JSON: `{"proxies": [{"proto": "socks5", "proxy": "host:port"}, ...]}` | | Text file | `file` | One proxy URL per line, `#` comments, blank lines ignored | +### Source filters + +| Filter | Values | Effect | +|--------|--------|--------| +| `proto` | `socks5`, `socks4`, `http` | Adds `?proto=...` to API URL | +| `country` | ISO 3166-1 alpha-2 | Adds `?country=...` to API URL | +| `limit` | integer | Adds `?limit=...` to API URL | +| `mitm` | `true` / `false` | Adds `?mitm=1` / `?mitm=0` to API URL | + +The `mitm` filter is silently ignored for file sources. 
+ ### Proxy file format ``` diff --git a/src/s5p/api.py b/src/s5p/api.py index 66fff3e..19d0113 100644 --- a/src/s5p/api.py +++ b/src/s5p/api.py @@ -66,8 +66,17 @@ def _handle_status(ctx: dict) -> tuple[int, dict]: "rate": round(metrics.conn_rate.rate(), 2), "latency": metrics.latency.stats(), } - pool = ctx.get("pool") - if pool: + pools: dict = ctx.get("pools") or {} + if pools: + total_alive = sum(p.alive_count for p in pools.values()) + total_count = sum(p.count for p in pools.values()) + data["pool"] = {"alive": total_alive, "total": total_count} + data["pools"] = { + name: {"alive": p.alive_count, "total": p.count} + for name, p in pools.items() + } + elif ctx.get("pool"): + pool = ctx["pool"] data["pool"] = {"alive": pool.alive_count, "total": pool.count} config = ctx.get("config") if config and config.tor_nodes: @@ -78,6 +87,7 @@ def _handle_status(ctx: dict) -> tuple[int, dict]: "listen": f"{lc.listen_host}:{lc.listen_port}", "chain": [str(h) for h in lc.chain], "pool_hops": lc.pool_hops, + **({"pool": lc.pool_name} if lc.pool_name else {}), "latency": metrics.get_listener_latency( f"{lc.listen_host}:{lc.listen_port}" ).stats(), @@ -94,27 +104,48 @@ def _handle_metrics(ctx: dict) -> tuple[int, dict]: def _handle_pool(ctx: dict, alive_only: bool = False) -> tuple[int, dict]: """GET /pool or /pool/alive -- proxy pool state.""" - pool = ctx.get("pool") - if not pool: + pools: dict = ctx.get("pools") or {} + pool_list = list(pools.values()) if pools else [] + # backward compat: fall back to single "pool" key + if not pool_list and ctx.get("pool"): + pool_list = [ctx["pool"]] + + if not pool_list: return 200, {"alive": 0, "total": 0, "proxies": {}} + multi = len(pool_list) > 1 proxies = {} - for key, entry in pool._proxies.items(): - if alive_only and not entry.alive: - continue - proxies[key] = { - "alive": entry.alive, - "fails": entry.fails, - "tests": entry.tests, - "last_ok": entry.last_ok, - "last_test": entry.last_test, - "last_seen": entry.last_seen, - 
} - return 200, { - "alive": pool.alive_count, - "total": pool.count, + total_alive = 0 + total_count = 0 + for p in pool_list: + total_alive += p.alive_count + total_count += p.count + for key, entry in p._proxies.items(): + if alive_only and not entry.alive: + continue + rec: dict = { + "alive": entry.alive, + "fails": entry.fails, + "tests": entry.tests, + "last_ok": entry.last_ok, + "last_test": entry.last_test, + "last_seen": entry.last_seen, + } + if multi: + rec["pool"] = p.name + proxies[key] = rec + + result: dict = { + "alive": total_alive, + "total": total_count, "proxies": proxies, } + if multi: + result["pools"] = { + p.name: {"alive": p.alive_count, "total": p.count} + for p in pool_list + } + return 200, result def _handle_config(ctx: dict) -> tuple[int, dict]: @@ -134,17 +165,38 @@ def _handle_config(ctx: dict) -> tuple[int, dict]: "listen": f"{lc.listen_host}:{lc.listen_port}", "chain": [str(h) for h in lc.chain], "pool_hops": lc.pool_hops, + **({"pool": lc.pool_name} if lc.pool_name else {}), } for lc in config.listeners ], } if config.tor_nodes: data["tor_nodes"] = [str(n) for n in config.tor_nodes] - if config.proxy_pool: + if config.proxy_pools: + pools_data: dict = {} + for name, pp in config.proxy_pools.items(): + sources = [] + for src in pp.sources: + s: dict = {} + if src.url: + s["url"] = src.url + if src.file: + s["file"] = src.file + if src.mitm is not None: + s["mitm"] = src.mitm + sources.append(s) + pools_data[name] = { + "sources": sources, + "refresh": pp.refresh, + "test_interval": pp.test_interval, + "max_fails": pp.max_fails, + } + data["proxy_pools"] = pools_data + elif config.proxy_pool: pp = config.proxy_pool sources = [] for src in pp.sources: - s: dict = {} + s = {} if src.url: s["url"] = src.url if src.file: @@ -173,19 +225,27 @@ async def _handle_reload(ctx: dict) -> tuple[int, dict]: async def _handle_pool_test(ctx: dict) -> tuple[int, dict]: """POST /pool/test -- trigger immediate health test.""" - pool = 
ctx.get("pool") - if not pool: + pools: dict = ctx.get("pools") or {} + pool_list = list(pools.values()) if pools else [] + if not pool_list and ctx.get("pool"): + pool_list = [ctx["pool"]] + if not pool_list: return 400, {"error": "no proxy pool configured"} - asyncio.create_task(pool._run_health_tests()) + for p in pool_list: + asyncio.create_task(p._run_health_tests()) return 200, {"ok": True} async def _handle_pool_refresh(ctx: dict) -> tuple[int, dict]: """POST /pool/refresh -- trigger immediate source re-fetch.""" - pool = ctx.get("pool") - if not pool: + pools: dict = ctx.get("pools") or {} + pool_list = list(pools.values()) if pools else [] + if not pool_list and ctx.get("pool"): + pool_list = [ctx["pool"]] + if not pool_list: return 400, {"error": "no proxy pool configured"} - asyncio.create_task(pool._fetch_all_sources()) + for p in pool_list: + asyncio.create_task(p._fetch_all_sources()) return 200, {"ok": True} diff --git a/src/s5p/config.py b/src/s5p/config.py index 8858c24..9c7c0c1 100644 --- a/src/s5p/config.py +++ b/src/s5p/config.py @@ -36,6 +36,7 @@ class PoolSourceConfig: proto: str | None = None country: str | None = None limit: int | None = 1000 + mitm: bool | None = None @dataclass @@ -85,6 +86,7 @@ class ListenerConfig: listen_port: int = 1080 chain: list[ChainHop] = field(default_factory=list) pool_hops: int = 0 + pool_name: str = "" @dataclass @@ -104,6 +106,7 @@ class Config: api_host: str = "" api_port: int = 0 proxy_pool: ProxyPoolConfig | None = None + proxy_pools: dict[str, ProxyPoolConfig] = field(default_factory=dict) tor: TorConfig | None = None tor_nodes: list[ChainHop] = field(default_factory=list) config_file: str = "" @@ -152,6 +155,39 @@ def parse_api_proxies(data: dict) -> list[ChainHop]: return proxies +def _parse_pool_config(pool_raw: dict) -> ProxyPoolConfig: + """Parse a single proxy pool config block from YAML.""" + sources = [] + for src in pool_raw.get("sources", []): + mitm = src.get("mitm") + if mitm is not None: + 
mitm = bool(mitm) + sources.append( + PoolSourceConfig( + url=src.get("url"), + file=src.get("file"), + proto=src.get("proto"), + country=src.get("country"), + limit=src.get("limit", 1000), + mitm=mitm, + ) + ) + kwargs: dict = { + "sources": sources, + "refresh": float(pool_raw.get("refresh", 300)), + "test_interval": float(pool_raw.get("test_interval", 120)), + "test_url": pool_raw.get("test_url", ""), + "test_timeout": float(pool_raw.get("test_timeout", 15)), + "test_concurrency": int(pool_raw.get("test_concurrency", 25)), + "max_fails": int(pool_raw.get("max_fails", 3)), + "state_file": pool_raw.get("state_file", ""), + "report_url": pool_raw.get("report_url", ""), + } + if "test_targets" in pool_raw: + kwargs["test_targets"] = list(pool_raw["test_targets"]) + return ProxyPoolConfig(**kwargs) + + def load_config(path: str | Path) -> Config: """Load configuration from a YAML file.""" path = Path(path) @@ -211,33 +247,16 @@ def load_config(path: str | Path) -> Config: ) ) + # -- proxy pools (named) ------------------------------------------------ + if "proxy_pools" in raw: + for name, pool_raw in raw["proxy_pools"].items(): + config.proxy_pools[name] = _parse_pool_config(pool_raw) + if "proxy_pool" in raw: - pool_raw = raw["proxy_pool"] - sources = [] - for src in pool_raw.get("sources", []): - sources.append( - PoolSourceConfig( - url=src.get("url"), - file=src.get("file"), - proto=src.get("proto"), - country=src.get("country"), - limit=src.get("limit", 1000), - ) - ) - kwargs: dict = { - "sources": sources, - "refresh": float(pool_raw.get("refresh", 300)), - "test_interval": float(pool_raw.get("test_interval", 120)), - "test_url": pool_raw.get("test_url", ""), - "test_timeout": float(pool_raw.get("test_timeout", 15)), - "test_concurrency": int(pool_raw.get("test_concurrency", 25)), - "max_fails": int(pool_raw.get("max_fails", 3)), - "state_file": pool_raw.get("state_file", ""), - "report_url": pool_raw.get("report_url", ""), - } - if "test_targets" in pool_raw: 
- kwargs["test_targets"] = list(pool_raw["test_targets"]) - config.proxy_pool = ProxyPoolConfig(**kwargs) + config.proxy_pool = _parse_pool_config(raw["proxy_pool"]) + # register singular as "default" when proxy_pools doesn't already have it + if "default" not in config.proxy_pools: + config.proxy_pools["default"] = config.proxy_pool elif "proxy_source" in raw: # backward compat: convert legacy proxy_source to proxy_pool src_raw = raw["proxy_source"] @@ -283,6 +302,8 @@ def load_config(path: str | Path) -> Config: lc.listen_port = int(port_str) elif isinstance(listen, (str, int)) and listen: lc.listen_port = int(listen) + if "pool" in entry: + lc.pool_name = entry["pool"] chain_raw = entry.get("chain", []) for item in chain_raw: if isinstance(item, str) and item.lower() == "pool": diff --git a/src/s5p/pool.py b/src/s5p/pool.py index bf9a5ee..5ceb7ca 100644 --- a/src/s5p/pool.py +++ b/src/s5p/pool.py @@ -55,12 +55,15 @@ class ProxyPool: chain: list[ChainHop], timeout: float, chain_nodes: list[ChainHop] | None = None, + name: str = "default", ) -> None: self._cfg = cfg self._chain = list(chain) self._chain_nodes = chain_nodes or [] self._chain_idx = 0 self._timeout = timeout + self._name = name + self._log_prefix = f"pool[{name}]" if name != "default" else "pool" self._proxies: dict[str, ProxyEntry] = {} self._alive_keys: list[str] = [] self._tasks: list[asyncio.Task] = [] @@ -78,6 +81,11 @@ class ProxyPool: self._chain_idx += 1 return chain + @property + def name(self) -> str: + """Pool name.""" + return self._name + # -- public interface ---------------------------------------------------- async def start(self) -> None: @@ -99,7 +107,7 @@ class ProxyPool: async def reload(self, cfg: ProxyPoolConfig) -> None: """Update pool config and trigger source re-fetch.""" self._cfg = cfg - logger.info("pool: config reloaded, re-fetching sources") + logger.info("%s: config reloaded, re-fetching sources", self._log_prefix) await self._fetch_all_sources() self._save_state() @@ 
-179,10 +187,10 @@ class ProxyPool: label = src.url or src.file or "?" if isinstance(result, Exception): err = str(result) or type(result).__name__ - logger.warning("pool: source %s failed: %s", label, err) + logger.warning("%s: source %s failed: %s", self._log_prefix, label, err) else: kind = "fetched" if src.url else "loaded" - logger.info("pool: %s %d proxies from %s", kind, len(result), label) + logger.info("%s: %s %d proxies from %s", self._log_prefix, kind, len(result), label) proxies.extend(result) self._merge(proxies) @@ -195,6 +203,8 @@ class ProxyPool: params["proto"] = src.proto if src.country: params["country"] = src.country + if src.mitm is not None: + params["mitm"] = "1" if src.mitm else "0" url = src.url if params: @@ -208,7 +218,7 @@ class ProxyPool: """Parse a text file with one proxy URL per line (runs in executor).""" path = Path(src.file).expanduser() if not path.is_file(): - logger.warning("pool: file not found: %s", path) + logger.warning("%s: file not found: %s", self._log_prefix, path) return [] proxies: list[ChainHop] = [] @@ -219,7 +229,7 @@ class ProxyPool: try: hop = parse_proxy_url(line) except ValueError as e: - logger.debug("pool: skipping invalid line %r: %s", line, e) + logger.debug("%s: skipping invalid line %r: %s", self._log_prefix, line, e) continue if src.proto and hop.proto != src.proto: continue @@ -301,7 +311,10 @@ class ProxyPool: if self._chain: chain_ok = await self._test_chain() if not chain_ok: - logger.warning("pool: static chain unreachable, skipping proxy tests") + logger.warning( + "%s: static chain unreachable, skipping proxy tests", + self._log_prefix, + ) return target = ( @@ -314,7 +327,10 @@ class ProxyPool: effective = max(3, min(len(target) // 10, self._cfg.test_concurrency)) sem = asyncio.Semaphore(effective) - logger.debug("pool: testing %d proxies (concurrency=%d)", len(target), effective) + logger.debug( + "%s: testing %d proxies (concurrency=%d)", + self._log_prefix, len(target), effective, + ) results: 
dict[str, bool] = {} async def _test(key: str, entry: ProxyEntry) -> None: @@ -343,8 +359,8 @@ class ProxyPool: skip_eviction = fail_rate > 0.90 and total > 10 if skip_eviction: logger.warning( - "pool: %d/%d tests failed (%.0f%%), skipping eviction", - total - passed, total, fail_rate * 100, + "%s: %d/%d tests failed (%.0f%%), skipping eviction", + self._log_prefix, total - passed, total, fail_rate * 100, ) evict_keys: list[str] = [] @@ -383,7 +399,8 @@ class ProxyPool: parts.append(f"stale {len(stale_keys)}") suffix = f" ({', '.join(parts)})" if parts else "" logger.info( - "pool: %d proxies, %d alive%s", + "%s: %d proxies, %d alive%s", + self._log_prefix, len(self._proxies), len(self._alive_keys), suffix, @@ -407,9 +424,12 @@ class ProxyPool: try: await http_post_json(self._cfg.report_url, {"dead": dead}) - logger.info("pool: reported %d dead proxies to %s", len(dead), self._cfg.report_url) + logger.info( + "%s: reported %d dead proxies to %s", + self._log_prefix, len(dead), self._cfg.report_url, + ) except Exception as e: - logger.debug("pool: report failed: %s", e) + logger.debug("%s: report failed: %s", self._log_prefix, e) def _rebuild_alive(self) -> None: """Rebuild the alive keys list from current state.""" @@ -449,11 +469,12 @@ class ProxyPool: # -- persistence --------------------------------------------------------- def _resolve_state_path(self) -> Path: - """Resolve state file path, defaulting to ~/.cache/s5p/pool.json.""" + """Resolve state file path, defaulting to ~/.cache/s5p/pool[-name].json.""" if self._cfg.state_file: return Path(self._cfg.state_file).expanduser() cache_dir = Path.home() / ".cache" / "s5p" - return cache_dir / "pool.json" + filename = "pool.json" if self._name == "default" else f"pool-{self._name}.json" + return cache_dir / filename def _load_state(self) -> None: """Load proxy state from JSON file (warm start).""" @@ -462,7 +483,7 @@ class ProxyPool: try: data = json.loads(self._state_path.read_text()) if data.get("version") != 
STATE_VERSION: - logger.warning("pool: state file version mismatch, starting fresh") + logger.warning("%s: state file version mismatch, starting fresh", self._log_prefix) return for key, entry in data.get("proxies", {}).items(): hop = ChainHop( @@ -483,11 +504,11 @@ class ProxyPool: ) self._rebuild_alive() logger.info( - "pool: loaded state (%d proxies, %d alive)", - len(self._proxies), len(self._alive_keys), + "%s: loaded state (%d proxies, %d alive)", + self._log_prefix, len(self._proxies), len(self._alive_keys), ) except (json.JSONDecodeError, KeyError, TypeError, ValueError) as e: - logger.warning("pool: corrupt state file: %s", e) + logger.warning("%s: corrupt state file: %s", self._log_prefix, e) self._proxies.clear() self._alive_keys.clear() @@ -519,4 +540,4 @@ class ProxyPool: tmp.write_text(json.dumps(data, indent=2)) os.replace(tmp, self._state_path) except OSError as e: - logger.warning("pool: failed to save state: %s", e) + logger.warning("%s: failed to save state: %s", self._log_prefix, e) diff --git a/src/s5p/server.py b/src/s5p/server.py index 1fe1e9c..0046559 100644 --- a/src/s5p/server.py +++ b/src/s5p/server.py @@ -233,7 +233,7 @@ async def _handle_client( async def _metrics_logger( metrics: Metrics, stop: asyncio.Event, - pool: ProxyPool | None = None, + pools: dict[str, ProxyPool] | None = None, ) -> None: """Log metrics summary every 60 seconds.""" while not stop.is_set(): @@ -243,8 +243,13 @@ async def _metrics_logger( pass if not stop.is_set(): line = metrics.summary() - if pool: - line += f" pool={pool.alive_count}/{pool.count}" + if pools: + if len(pools) == 1: + p = next(iter(pools.values())) + line += f" pool={p.alive_count}/{p.count}" + else: + for name, p in pools.items(): + line += f" pool[{name}]={p.alive_count}/{p.count}" logger.info("metrics: %s", line) @@ -267,16 +272,40 @@ async def serve(config: Config) -> None: nodes = ", ".join(str(n) for n in config.tor_nodes) logger.info("tor_nodes: %s (round-robin)", nodes) - # -- shared 
proxy pool --------------------------------------------------- - proxy_pool: ProxyPool | None = None - if config.proxy_pool and config.proxy_pool.sources: - # use first listener's chain as base chain for pool health tests - base_chain = listeners[0].chain if listeners else config.chain - proxy_pool = ProxyPool( + # -- named proxy pools --------------------------------------------------- + proxy_pools: dict[str, ProxyPool] = {} + base_chain = listeners[0].chain if listeners else config.chain + for pool_name, pool_cfg in config.proxy_pools.items(): + if not pool_cfg.sources: + continue + pool = ProxyPool( + pool_cfg, base_chain, config.timeout, + chain_nodes=config.tor_nodes or None, + name=pool_name, + ) + await pool.start() + proxy_pools[pool_name] = pool + + # backward compat: single proxy_pool -> "default" + if not proxy_pools and config.proxy_pool and config.proxy_pool.sources: + pool = ProxyPool( config.proxy_pool, base_chain, config.timeout, chain_nodes=config.tor_nodes or None, ) - await proxy_pool.start() + await pool.start() + proxy_pools["default"] = pool + + def _pool_for(lc: ListenerConfig) -> ProxyPool | None: + """Resolve the proxy pool for a listener.""" + if lc.pool_hops <= 0: + return None + name = lc.pool_name or "default" + if name not in proxy_pools: + raise RuntimeError( + f"listener {lc.listen_host}:{lc.listen_port} " + f"references unknown pool {name!r}" + ) + return proxy_pools[name] # -- per-unique first-hop connection pools -------------------------------- hop_pools: dict[tuple[str, int], FirstHopPool] = {} @@ -332,15 +361,17 @@ async def serve(config: Config) -> None: servers: list[asyncio.Server] = [] for lc in listeners: hp = _hop_pool_for(lc) + lc_pool = _pool_for(lc) async def on_client( r: asyncio.StreamReader, w: asyncio.StreamWriter, _lc: ListenerConfig = lc, _hp: FirstHopPool | None = hp, + _pool: ProxyPool | None = lc_pool, ) -> None: async with sem: await _handle_client( r, w, _lc, config.timeout, config.retries, - proxy_pool, 
metrics, _hp, tor_rr, hop_pools, + _pool, metrics, _hp, tor_rr, hop_pools, ) srv = await asyncio.start_server(on_client, lc.listen_host, lc.listen_port) @@ -350,16 +381,21 @@ async def serve(config: Config) -> None: chain_desc = " -> ".join(str(h) for h in lc.chain) if lc.chain else "direct" nhops = lc.pool_hops pool_desc = f" + {nhops} pool hop{'s' if nhops != 1 else ''}" if nhops else "" + if lc_pool and lc_pool.name != "default": + pool_desc += f" [{lc_pool.name}]" logger.info("listener %s chain: %s%s", addr, chain_desc, pool_desc) logger.info("max_connections=%d", config.max_connections) - if proxy_pool: - nsrc = len(config.proxy_pool.sources) - logger.info( - "pool: %d proxies, %d alive (from %d source%s)", - proxy_pool.count, proxy_pool.alive_count, nsrc, "s" if nsrc != 1 else "", - ) + if proxy_pools: + for pname, pp in proxy_pools.items(): + cfg = config.proxy_pools.get(pname, config.proxy_pool) + nsrc = len(cfg.sources) if cfg else 0 + prefix = f"pool[{pname}]" if pname != "default" else "pool" + logger.info( + "%s: %d proxies, %d alive (from %d source%s)", + prefix, pp.count, pp.alive_count, nsrc, "s" if nsrc != 1 else "", + ) logger.info("retries: %d", config.retries) if tor: @@ -374,7 +410,8 @@ async def serve(config: Config) -> None: api_ctx: dict = { "config": config, "metrics": metrics, - "pool": proxy_pool, + "pools": proxy_pools, + "pool": next(iter(proxy_pools.values()), None), # backward compat "hop_pools": hop_pools, "tor": tor, } @@ -400,8 +437,13 @@ async def serve(config: Config) -> None: for h in root.handlers: h.setLevel(level) logging.getLogger("s5p").setLevel(level) - if proxy_pool and new.proxy_pool: - await proxy_pool.reload(new.proxy_pool) + # reload named pools (match by name) + for pname, pp in proxy_pools.items(): + new_cfg = new.proxy_pools.get(pname) + if new_cfg: + await pp.reload(new_cfg) + elif new.proxy_pool and pname == "default": + await pp.reload(new.proxy_pool) logger.info("reload: config reloaded (listeners require 
restart)") def _on_sighup() -> None: @@ -414,8 +456,7 @@ async def serve(config: Config) -> None: api_srv = await start_api(config.api_host, config.api_port, api_ctx) metrics_stop = asyncio.Event() - pool_ref = proxy_pool - metrics_task = asyncio.create_task(_metrics_logger(metrics, metrics_stop, pool_ref)) + metrics_task = asyncio.create_task(_metrics_logger(metrics, metrics_stop, proxy_pools or None)) # keep all servers open until stop signal try: @@ -435,11 +476,16 @@ async def serve(config: Config) -> None: await tor.stop() for hp in hop_pools.values(): await hp.stop() - if proxy_pool: - await proxy_pool.stop() + for pp in proxy_pools.values(): + await pp.stop() shutdown_line = metrics.summary() - if pool_ref: - shutdown_line += f" pool={pool_ref.alive_count}/{pool_ref.count}" + if proxy_pools: + if len(proxy_pools) == 1: + p = next(iter(proxy_pools.values())) + shutdown_line += f" pool={p.alive_count}/{p.count}" + else: + for pname, p in proxy_pools.items(): + shutdown_line += f" pool[{pname}]={p.alive_count}/{p.count}" logger.info("metrics: %s", shutdown_line) metrics_stop.set() await metrics_task diff --git a/tests/test_api.py b/tests/test_api.py index fce8b22..e48a821 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -82,6 +82,7 @@ class TestJsonResponse: def _make_ctx( config: Config | None = None, pool: MagicMock | None = None, + pools: dict | None = None, tor: MagicMock | None = None, ) -> dict: """Build a mock context dict.""" @@ -89,6 +90,7 @@ def _make_ctx( "config": config or Config(), "metrics": Metrics(), "pool": pool, + "pools": pools, "hop_pool": None, "tor": tor, } @@ -149,6 +151,25 @@ class TestHandleStatus: assert body["listeners"][1]["latency"] is None +class TestHandleStatusPools: + """Test GET /status with multiple named pools.""" + + def test_multi_pool_summary(self): + pool_a = MagicMock() + pool_a.alive_count = 5 + pool_a.count = 10 + pool_a.name = "clean" + pool_b = MagicMock() + pool_b.alive_count = 3 + pool_b.count = 8 + 
pool_b.name = "mitm" + ctx = _make_ctx(pools={"clean": pool_a, "mitm": pool_b}) + _, body = _handle_status(ctx) + assert body["pool"] == {"alive": 8, "total": 18} + assert body["pools"]["clean"] == {"alive": 5, "total": 10} + assert body["pools"]["mitm"] == {"alive": 3, "total": 8} + + class TestHandleMetrics: """Test GET /metrics handler.""" @@ -218,6 +239,57 @@ class TestHandlePool: assert "socks5://1.2.3.4:1080" in body["proxies"] +class TestHandlePoolMulti: + """Test GET /pool with multiple named pools.""" + + def test_merges_entries(self): + pool_a = MagicMock() + pool_a.alive_count = 1 + pool_a.count = 1 + pool_a.name = "clean" + entry_a = MagicMock( + alive=True, fails=0, tests=5, + last_ok=100.0, last_test=100.0, last_seen=100.0, + ) + pool_a._proxies = {"socks5://1.2.3.4:1080": entry_a} + + pool_b = MagicMock() + pool_b.alive_count = 1 + pool_b.count = 1 + pool_b.name = "mitm" + entry_b = MagicMock( + alive=True, fails=0, tests=3, + last_ok=90.0, last_test=90.0, last_seen=90.0, + ) + pool_b._proxies = {"socks5://5.6.7.8:1080": entry_b} + + ctx = _make_ctx(pools={"clean": pool_a, "mitm": pool_b}) + _, body = _handle_pool(ctx) + assert body["alive"] == 2 + assert body["total"] == 2 + assert len(body["proxies"]) == 2 + assert body["proxies"]["socks5://1.2.3.4:1080"]["pool"] == "clean" + assert body["proxies"]["socks5://5.6.7.8:1080"]["pool"] == "mitm" + assert "pools" in body + assert body["pools"]["clean"] == {"alive": 1, "total": 1} + + def test_single_pool_no_pool_field(self): + """Single pool: no 'pool' field on entries, no 'pools' summary.""" + pool = MagicMock() + pool.alive_count = 1 + pool.count = 1 + pool.name = "default" + entry = MagicMock( + alive=True, fails=0, tests=5, + last_ok=100.0, last_test=100.0, last_seen=100.0, + ) + pool._proxies = {"socks5://1.2.3.4:1080": entry} + ctx = _make_ctx(pools={"default": pool}) + _, body = _handle_pool(ctx) + assert "pool" not in body["proxies"]["socks5://1.2.3.4:1080"] + assert "pools" not in body + + class 
TestHandleConfig: """Test GET /config handler.""" @@ -255,6 +327,34 @@ class TestHandleConfig: assert body["proxy_pool"]["sources"][0]["url"] == "http://api:8081/proxies" assert body["listeners"][0]["pool_hops"] == 1 + def test_with_proxy_pools(self): + pp_clean = ProxyPoolConfig( + sources=[PoolSourceConfig(url="http://api:8081/proxies/all", mitm=False)], + refresh=300.0, + test_interval=120.0, + max_fails=3, + ) + pp_mitm = ProxyPoolConfig( + sources=[PoolSourceConfig(url="http://api:8081/proxies/all", mitm=True)], + refresh=300.0, + test_interval=120.0, + max_fails=3, + ) + config = Config( + proxy_pools={"clean": pp_clean, "mitm": pp_mitm}, + listeners=[ListenerConfig( + listen_host="0.0.0.0", listen_port=1080, + chain=[ChainHop("socks5", "127.0.0.1", 9050)], + pool_hops=2, pool_name="clean", + )], + ) + ctx = _make_ctx(config=config) + _, body = _handle_config(ctx) + assert "proxy_pools" in body + assert body["proxy_pools"]["clean"]["sources"][0]["mitm"] is False + assert body["proxy_pools"]["mitm"]["sources"][0]["mitm"] is True + assert body["listeners"][0]["pool"] == "clean" + def test_with_tor_nodes(self): config = Config( tor_nodes=[ diff --git a/tests/test_config.py b/tests/test_config.py index e621b3c..eccf721 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -222,6 +222,90 @@ class TestConfig: ] +class TestProxyPools: + """Test named proxy_pools config parsing.""" + + def test_proxy_pools_from_yaml(self, tmp_path): + cfg_file = tmp_path / "test.yaml" + cfg_file.write_text( + "proxy_pools:\n" + " clean:\n" + " sources:\n" + " - url: http://api:8081/proxies/all\n" + " mitm: false\n" + " refresh: 300\n" + " state_file: /data/pool-clean.json\n" + " mitm:\n" + " sources:\n" + " - url: http://api:8081/proxies/all\n" + " mitm: true\n" + " state_file: /data/pool-mitm.json\n" + ) + c = load_config(cfg_file) + assert "clean" in c.proxy_pools + assert "mitm" in c.proxy_pools + assert c.proxy_pools["clean"].sources[0].mitm is False + assert 
c.proxy_pools["mitm"].sources[0].mitm is True + assert c.proxy_pools["clean"].state_file == "/data/pool-clean.json" + assert c.proxy_pools["mitm"].state_file == "/data/pool-mitm.json" + + def test_mitm_none_when_absent(self, tmp_path): + cfg_file = tmp_path / "test.yaml" + cfg_file.write_text( + "proxy_pool:\n" + " sources:\n" + " - url: http://api:8081/proxies\n" + ) + c = load_config(cfg_file) + assert c.proxy_pool is not None + assert c.proxy_pool.sources[0].mitm is None + + def test_singular_becomes_default(self, tmp_path): + cfg_file = tmp_path / "test.yaml" + cfg_file.write_text( + "proxy_pool:\n" + " sources:\n" + " - url: http://api:8081/proxies\n" + ) + c = load_config(cfg_file) + assert "default" in c.proxy_pools + assert c.proxy_pools["default"] is c.proxy_pool + + def test_proxy_pools_wins_over_singular(self, tmp_path): + cfg_file = tmp_path / "test.yaml" + cfg_file.write_text( + "proxy_pools:\n" + " default:\n" + " sources:\n" + " - url: http://api:8081/pools-default\n" + "proxy_pool:\n" + " sources:\n" + " - url: http://api:8081/singular\n" + ) + c = load_config(cfg_file) + # proxy_pools "default" wins, singular does not overwrite + assert c.proxy_pools["default"].sources[0].url == "http://api:8081/pools-default" + + def test_listener_pool_name(self, tmp_path): + cfg_file = tmp_path / "test.yaml" + cfg_file.write_text( + "listeners:\n" + " - listen: 0.0.0.0:1080\n" + " pool: clean\n" + " chain:\n" + " - socks5://127.0.0.1:9050\n" + " - pool\n" + " - listen: 0.0.0.0:1081\n" + " chain:\n" + " - socks5://127.0.0.1:9050\n" + ) + c = load_config(cfg_file) + assert c.listeners[0].pool_name == "clean" + assert c.listeners[0].pool_hops == 1 + assert c.listeners[1].pool_name == "" + assert c.listeners[1].pool_hops == 0 + + class TestTorNodes: """Test tor_nodes config parsing.""" diff --git a/tests/test_pool.py b/tests/test_pool.py index 6b78e5d..f4c9f05 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -11,6 +11,95 @@ from s5p.config import ChainHop, 
PoolSourceConfig, ProxyPoolConfig from s5p.pool import ProxyEntry, ProxyPool +class TestProxyPoolName: + """Test pool name and state path derivation.""" + + def test_default_name(self): + cfg = ProxyPoolConfig(sources=[]) + pool = ProxyPool(cfg, [], timeout=10.0) + assert pool.name == "default" + assert pool._log_prefix == "pool" + + def test_named_pool(self): + cfg = ProxyPoolConfig(sources=[]) + pool = ProxyPool(cfg, [], timeout=10.0, name="clean") + assert pool.name == "clean" + assert pool._log_prefix == "pool[clean]" + + def test_state_path_default(self): + cfg = ProxyPoolConfig(sources=[]) + pool = ProxyPool(cfg, [], timeout=10.0) + assert pool._state_path.name == "pool.json" + + def test_state_path_named(self): + cfg = ProxyPoolConfig(sources=[]) + pool = ProxyPool(cfg, [], timeout=10.0, name="clean") + assert pool._state_path.name == "pool-clean.json" + + def test_state_path_explicit_overrides_name(self): + cfg = ProxyPoolConfig(sources=[], state_file="/data/custom.json") + pool = ProxyPool(cfg, [], timeout=10.0, name="clean") + assert str(pool._state_path) == "/data/custom.json" + + +class TestProxyPoolMitmQuery: + """Test mitm query parameter in API fetch.""" + + def test_mitm_false(self): + cfg = ProxyPoolConfig(sources=[]) + pool = ProxyPool(cfg, [], timeout=10.0) + src = PoolSourceConfig(url="http://api:8081/proxies/all", mitm=False) + + async def run(): + from unittest.mock import AsyncMock, patch + mock_ret = {"proxies": []} + with patch( + "s5p.pool.http_get_json", + new_callable=AsyncMock, return_value=mock_ret, + ) as mock: + await pool._fetch_api(src) + call_url = mock.call_args[0][0] + assert "mitm=0" in call_url + + asyncio.run(run()) + + def test_mitm_true(self): + cfg = ProxyPoolConfig(sources=[]) + pool = ProxyPool(cfg, [], timeout=10.0) + src = PoolSourceConfig(url="http://api:8081/proxies/all", mitm=True) + + async def run(): + from unittest.mock import AsyncMock, patch + mock_ret = {"proxies": []} + with patch( + "s5p.pool.http_get_json", 
+ new_callable=AsyncMock, return_value=mock_ret, + ) as mock: + await pool._fetch_api(src) + call_url = mock.call_args[0][0] + assert "mitm=1" in call_url + + asyncio.run(run()) + + def test_mitm_none_omitted(self): + cfg = ProxyPoolConfig(sources=[]) + pool = ProxyPool(cfg, [], timeout=10.0) + src = PoolSourceConfig(url="http://api:8081/proxies/all", mitm=None) + + async def run(): + from unittest.mock import AsyncMock, patch + mock_ret = {"proxies": []} + with patch( + "s5p.pool.http_get_json", + new_callable=AsyncMock, return_value=mock_ret, + ) as mock: + await pool._fetch_api(src) + call_url = mock.call_args[0][0] + assert "mitm" not in call_url + + asyncio.run(run()) + + class TestProxyEntry: """Test ProxyEntry defaults."""