feat: named proxy pools with per-listener assignment

Add proxy_pools: top-level config (dict of name -> pool config) so
listeners can draw from different proxy sources. Each pool has
independent sources, health testing, state persistence, and refresh
cycles.

- PoolSourceConfig gains mitm: bool|None for API ?mitm=0/1 filtering
- ListenerConfig gains pool_name for named pool assignment
- ProxyPool gains name param with prefixed log messages and
  per-name state file derivation (pool-{name}.json)
- server.py replaces the single proxy_pool with a proxy_pools dict,
  validates listener pool references at startup, and binds each
  listener's pool via a per-listener closure
- API /pool merges all pools (tagging entries with a pool field when
  multiple pools are defined); /status and /config expose per-pool summaries
- Backward compat: singular proxy_pool: registers as "default"

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Author: user
Date: 2026-02-18 11:33:53 +01:00
Parent: 288bd95f62
Commit: 29b4a36863
10 changed files with 680 additions and 154 deletions


@@ -20,14 +20,37 @@ chain:
# - socks4://proxy:1080 # post-Tor SOCKS4/4a proxy
# - http://user:pass@proxy:8080 # post-Tor HTTP CONNECT proxy
# Managed proxy pool -- fetches from multiple sources, health-tests,
# and rotates alive proxies per-connection after the static chain.
# Named proxy pools -- each pool has its own sources, health tests,
# and state file. Listeners reference pools by name via the "pool:" key.
#
# proxy_pools:
# clean: # MITM-free proxies
# sources:
# - url: http://10.200.1.250:8081/proxies/all
# mitm: false # filter: mitm=0 query param
# state_file: /data/pool-clean.json
# refresh: 300
# test_interval: 120
# test_timeout: 8
# max_fails: 3
# mitm: # MITM-capable proxies
# sources:
# - url: http://10.200.1.250:8081/proxies/all
# mitm: true # filter: mitm=1 query param
# state_file: /data/pool-mitm.json
# refresh: 300
# test_interval: 120
# test_timeout: 8
# max_fails: 3
# Single proxy pool (legacy, still supported -- becomes pool "default"):
# proxy_pool:
# sources:
# - url: http://10.200.1.250:8081/proxies
# proto: socks5 # optional: filter by protocol
# country: US # optional: filter by country
# limit: 1000 # optional: max proxies to fetch
# mitm: false # optional: filter by MITM status (true/false)
# - file: /etc/s5p/proxies.txt # text file, one proxy URL per line
# refresh: 300 # re-fetch sources interval (seconds)
# test_interval: 120 # health test cycle interval (seconds)
@@ -59,26 +82,36 @@ chain:
# - socks5://10.200.1.250:9050
# - socks5://10.200.1.13:9050
# Multi-listener mode -- each listener gets its own address and chain.
# The "pool" keyword in a chain appends a random alive proxy from the pool.
# Multi-listener mode -- each listener gets its own address, chain,
# and optional pool assignment. The "pool" keyword in a chain appends
# a random alive proxy from the named pool (or "default" if unnamed).
# Multiple "pool" entries = multiple pool hops (deeper chaining).
#
# listeners:
# - listen: 0.0.0.0:1080
# pool: clean # draw from "clean" pool
# chain:
# - socks5://127.0.0.1:9050
# - pool # Tor + 2 random pool proxies
# - pool # Tor + 2 clean pool proxies
# - pool
#
# - listen: 0.0.0.0:1081
# pool: clean
# chain:
# - socks5://127.0.0.1:9050
# - pool # Tor + 1 random pool proxy
# - pool # Tor + 1 clean pool proxy
#
# - listen: 0.0.0.0:1082
# chain:
# - socks5://127.0.0.1:9050 # Tor only (no pool hops)
#
# - listen: 0.0.0.0:1083
# pool: mitm # draw from "mitm" pool
# chain:
# - socks5://127.0.0.1:9050
# - pool # Tor + 2 MITM pool proxies
# - pool
#
# When using "listeners:", the top-level "listen" and "chain" keys are ignored.
# If "listeners:" is absent, the old format is used (single listener).


@@ -43,17 +43,25 @@ cp config/example.yaml config/s5p.yaml # create live config (gitignored)
```yaml
listeners:
- listen: 0.0.0.0:1080
pool: clean # named pool assignment
chain:
- socks5://127.0.0.1:9050
- pool # Tor + 2 pool hops
- pool # Tor + 2 clean hops
- pool
- listen: 0.0.0.0:1081
pool: clean
chain:
- socks5://127.0.0.1:9050
- pool # Tor + 1 pool hop
- pool # Tor + 1 clean hop
- listen: 0.0.0.0:1082
chain:
- socks5://127.0.0.1:9050 # Tor only
- listen: 0.0.0.0:1083
pool: mitm # MITM-capable proxies
chain:
- socks5://127.0.0.1:9050
- pool
- pool
```
## Multi-Tor Round-Robin (config)
@@ -74,26 +82,39 @@ pool_size: 8 # pre-warmed TCP conns to first hop (0 = off)
pool_max_idle: 30 # evict idle pooled conns (seconds)
```
## Proxy Pool (config)
## Named Proxy Pools (config)
```yaml
proxy_pool:
proxy_pools:
clean:
sources:
- url: http://10.200.1.250:8081/proxies
proto: socks5
limit: 1000
- file: /etc/s5p/proxies.txt
refresh: 300 # re-fetch interval
test_interval: 120 # health test cycle
test_targets: # TLS handshake targets (round-robin)
- www.google.com
- www.cloudflare.com
- www.amazon.com
test_concurrency: 25 # max parallel tests (auto-scales to ~10% of pool)
max_fails: 3 # evict after N fails
report_url: "" # POST dead proxies (optional)
- url: http://10.200.1.250:8081/proxies/all
mitm: false # adds ?mitm=0
state_file: /data/pool-clean.json
refresh: 300
test_interval: 120
max_fails: 3
mitm:
sources:
- url: http://10.200.1.250:8081/proxies/all
mitm: true # adds ?mitm=1
state_file: /data/pool-mitm.json
refresh: 300
test_interval: 120
max_fails: 3
```
Singular `proxy_pool:` still works (becomes pool "default").
## Source Filters (proxy_pool sources)
| Filter | Values | Query param |
|--------|--------|-------------|
| `proto` | socks5/socks4/http | `?proto=...` |
| `country` | ISO alpha-2 | `?country=...` |
| `limit` | integer | `?limit=...` |
| `mitm` | true/false | `?mitm=1` / `?mitm=0` |
## Tor Control Port (config)
```yaml


@@ -47,39 +47,35 @@ pool_size: 0 # pre-warmed TCP connections to first hop (0 = disable
pool_max_idle: 30 # max idle time for pooled connections (seconds)
api_listen: "" # control API bind address (empty = disabled)
# Multi-listener (each port gets its own chain depth)
# Named proxy pools (each with its own sources and filters)
proxy_pools:
clean:
sources:
- url: http://10.200.1.250:8081/proxies/all
mitm: false
refresh: 300
test_interval: 120
test_timeout: 8
max_fails: 3
# Multi-listener (each port gets its own chain depth and pool)
listeners:
- listen: 0.0.0.0:1080
pool: clean
chain:
- socks5://127.0.0.1:9050
- pool # Tor + 2 pool proxies
- pool # Tor + 2 clean proxies
- pool
- listen: 0.0.0.0:1081
pool: clean
chain:
- socks5://127.0.0.1:9050
- pool # Tor + 1 pool proxy
- pool # Tor + 1 clean proxy
# Or single-listener (old format):
# listen: 127.0.0.1:1080
# chain:
# - socks5://127.0.0.1:9050
proxy_pool:
sources:
- url: http://10.200.1.250:8081/proxies
proto: socks5
limit: 1000
- file: /etc/s5p/proxies.txt
refresh: 300
test_interval: 120
test_targets: # TLS handshake targets (round-robin)
- www.google.com
- www.cloudflare.com
- www.amazon.com
test_timeout: 15
test_concurrency: 25 # max parallel tests (auto-scales to ~10% of pool)
max_fails: 3
state_file: "" # empty = ~/.cache/s5p/pool.json
```
## Multi-Tor Round-Robin
@@ -112,42 +108,85 @@ curl -s http://127.0.0.1:1090/config | jq '.tor_nodes'
curl -s http://127.0.0.1:1090/status | jq '.tor_nodes'
```
## Named Proxy Pools
Define multiple proxy pools with different source filters. Each listener can
reference a specific pool by name via the `pool:` key.
```yaml
proxy_pools:
clean:
sources:
- url: http://10.200.1.250:8081/proxies/all
mitm: false
state_file: /data/pool-clean.json
refresh: 300
test_interval: 120
test_timeout: 8
max_fails: 3
mitm:
sources:
- url: http://10.200.1.250:8081/proxies/all
mitm: true
state_file: /data/pool-mitm.json
refresh: 300
test_interval: 120
test_timeout: 8
max_fails: 3
```
Each pool has independent health testing, state persistence, and source
refresh cycles. The `mitm` source filter adds `?mitm=0` or `?mitm=1` to
API requests.
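The per-name state file derivation can be sketched as follows (mirrors the default-path behaviour described here; `resolve_state_path` is an illustrative name):

```python
from pathlib import Path

def resolve_state_path(name: str, state_file: str = "") -> Path:
    """An explicit state_file wins; otherwise derive pool[-name].json."""
    if state_file:
        return Path(state_file).expanduser()
    filename = "pool.json" if name == "default" else f"pool-{name}.json"
    return Path.home() / ".cache" / "s5p" / filename
```

So the `"default"` pool keeps the legacy `pool.json` filename, while named pools get their own files and never clobber each other's state.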
### Backward compatibility
The singular `proxy_pool:` key still works -- it registers as pool `"default"`.
If both `proxy_pool:` and `proxy_pools:` are present, `proxy_pools:` wins;
the singular is registered as `"default"` only when not already defined.
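The precedence rule boils down to something like this (a sketch over the parsed YAML mapping; the function name is invented):

```python
def register_pools(raw: dict) -> dict:
    """Named pools win; the singular key fills "default" only if unclaimed."""
    pools = dict(raw.get("proxy_pools", {}))
    if "proxy_pool" in raw and "default" not in pools:
        pools["default"] = raw["proxy_pool"]
    return pools
```

A config with only the legacy key yields one pool named `"default"`; a config defining `"default"` under `proxy_pools:` keeps that definition even if the singular key is also present.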
## Multi-Listener Mode
Run multiple listeners on different ports, each with a different number
of proxy hops after the static chain. Config-file only (not available via CLI).
of proxy hops and pool assignment. Config-file only (not available via CLI).
```yaml
listeners:
- listen: 0.0.0.0:1080
pool: clean
chain:
- socks5://10.200.1.13:9050
- pool # Tor + 2 pool proxies
- pool # Tor + 2 clean proxies
- pool
- listen: 0.0.0.0:1081
pool: clean
chain:
- socks5://10.200.1.13:9050
- pool # Tor + 1 pool proxy
- pool # Tor + 1 clean proxy
- listen: 0.0.0.0:1082
chain:
- socks5://10.200.1.13:9050 # Tor only
- socks5://10.200.1.13:9050 # Tor only (no pool)
proxy_pool:
sources:
- url: http://10.200.1.250:8081/proxies/all?mitm=0
refresh: 300
test_interval: 120
max_fails: 3
- listen: 0.0.0.0:1083
pool: mitm
chain:
- socks5://10.200.1.13:9050
- pool # Tor + 2 MITM proxies
- pool
```
The `pool` keyword in a chain means "append a random alive proxy from the
shared pool". Multiple `pool` entries = multiple pool hops (deeper chaining).
assigned pool". Multiple `pool` entries = multiple pool hops (deeper chaining).
When `pool:` is omitted on a listener with pool hops, it defaults to
`"default"`. A listener referencing an unknown pool name causes a fatal
error at startup. Listeners without pool hops ignore the `pool:` key.
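Put together, per-listener resolution behaves roughly like the sketch below (plain dicts stand in for the real config objects; names are illustrative):

```python
def pool_for(listener: dict, pools: dict):
    """Return the pool a listener draws from, or None if it has no pool hops."""
    if listener.get("pool_hops", 0) <= 0:
        return None  # "pool:" is ignored when the chain has no pool hops
    name = listener.get("pool") or "default"
    if name not in pools:  # unknown reference is fatal at startup
        raise RuntimeError(f"listener references unknown pool {name!r}")
    return pools[name]
```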
| Resource | Scope | Notes |
|----------|-------|-------|
| ProxyPool | shared | All listeners draw from one pool |
| ProxyPool | per name | Each named pool is independent |
| TorController | shared | One Tor instance |
| Metrics | shared | Aggregate stats across listeners |
| Semaphore | shared | Global `max_connections` cap |
@@ -202,6 +241,7 @@ proxy_pool:
proto: socks5 # optional: filter by protocol
country: US # optional: filter by country
limit: 1000 # max proxies to fetch from API
mitm: false # optional: filter by MITM status (true/false)
- file: /etc/s5p/proxies.txt # text file, one proxy URL per line
refresh: 300 # re-fetch sources every 300 seconds
test_interval: 120 # health test cycle every 120 seconds
@@ -212,7 +252,7 @@ proxy_pool:
test_timeout: 15 # per-test timeout (seconds)
test_concurrency: 25 # max parallel tests (auto-scales to ~10% of pool)
max_fails: 3 # evict after N consecutive failures
state_file: "" # empty = ~/.cache/s5p/pool.json
state_file: "" # empty = ~/.cache/s5p/pool[-name].json
report_url: "" # POST dead proxies here (optional)
```
@@ -223,6 +263,17 @@ proxy_pool:
| HTTP API | `url` | JSON: `{"proxies": [{"proto": "socks5", "proxy": "host:port"}, ...]}` |
| Text file | `file` | One proxy URL per line, `#` comments, blank lines ignored |
### Source filters
| Filter | Values | Effect |
|--------|--------|--------|
| `proto` | `socks5`, `socks4`, `http` | Adds `?proto=...` to API URL |
| `country` | ISO 3166-1 alpha-2 | Adds `?country=...` to API URL |
| `limit` | integer | Adds `?limit=...` to API URL |
| `mitm` | `true` / `false` | Adds `?mitm=1` / `?mitm=0` to API URL |
The `mitm` filter is silently ignored for file sources.
### Proxy file format
```


@@ -66,8 +66,17 @@ def _handle_status(ctx: dict) -> tuple[int, dict]:
"rate": round(metrics.conn_rate.rate(), 2),
"latency": metrics.latency.stats(),
}
pool = ctx.get("pool")
if pool:
pools: dict = ctx.get("pools") or {}
if pools:
total_alive = sum(p.alive_count for p in pools.values())
total_count = sum(p.count for p in pools.values())
data["pool"] = {"alive": total_alive, "total": total_count}
data["pools"] = {
name: {"alive": p.alive_count, "total": p.count}
for name, p in pools.items()
}
elif ctx.get("pool"):
pool = ctx["pool"]
data["pool"] = {"alive": pool.alive_count, "total": pool.count}
config = ctx.get("config")
if config and config.tor_nodes:
@@ -78,6 +87,7 @@ def _handle_status(ctx: dict) -> tuple[int, dict]:
"listen": f"{lc.listen_host}:{lc.listen_port}",
"chain": [str(h) for h in lc.chain],
"pool_hops": lc.pool_hops,
**({"pool": lc.pool_name} if lc.pool_name else {}),
"latency": metrics.get_listener_latency(
f"{lc.listen_host}:{lc.listen_port}"
).stats(),
@@ -94,15 +104,26 @@ def _handle_metrics(ctx: dict) -> tuple[int, dict]:
def _handle_pool(ctx: dict, alive_only: bool = False) -> tuple[int, dict]:
"""GET /pool or /pool/alive -- proxy pool state."""
pool = ctx.get("pool")
if not pool:
pools: dict = ctx.get("pools") or {}
pool_list = list(pools.values()) if pools else []
# backward compat: fall back to single "pool" key
if not pool_list and ctx.get("pool"):
pool_list = [ctx["pool"]]
if not pool_list:
return 200, {"alive": 0, "total": 0, "proxies": {}}
multi = len(pool_list) > 1
proxies = {}
for key, entry in pool._proxies.items():
total_alive = 0
total_count = 0
for p in pool_list:
total_alive += p.alive_count
total_count += p.count
for key, entry in p._proxies.items():
if alive_only and not entry.alive:
continue
proxies[key] = {
rec: dict = {
"alive": entry.alive,
"fails": entry.fails,
"tests": entry.tests,
@@ -110,11 +131,21 @@ def _handle_pool(ctx: dict, alive_only: bool = False) -> tuple[int, dict]:
"last_test": entry.last_test,
"last_seen": entry.last_seen,
}
return 200, {
"alive": pool.alive_count,
"total": pool.count,
if multi:
rec["pool"] = p.name
proxies[key] = rec
result: dict = {
"alive": total_alive,
"total": total_count,
"proxies": proxies,
}
if multi:
result["pools"] = {
p.name: {"alive": p.alive_count, "total": p.count}
for p in pool_list
}
return 200, result
def _handle_config(ctx: dict) -> tuple[int, dict]:
@@ -134,17 +165,38 @@ def _handle_config(ctx: dict) -> tuple[int, dict]:
"listen": f"{lc.listen_host}:{lc.listen_port}",
"chain": [str(h) for h in lc.chain],
"pool_hops": lc.pool_hops,
**({"pool": lc.pool_name} if lc.pool_name else {}),
}
for lc in config.listeners
],
}
if config.tor_nodes:
data["tor_nodes"] = [str(n) for n in config.tor_nodes]
if config.proxy_pool:
pp = config.proxy_pool
if config.proxy_pools:
pools_data: dict = {}
for name, pp in config.proxy_pools.items():
sources = []
for src in pp.sources:
s: dict = {}
if src.url:
s["url"] = src.url
if src.file:
s["file"] = src.file
if src.mitm is not None:
s["mitm"] = src.mitm
sources.append(s)
pools_data[name] = {
"sources": sources,
"refresh": pp.refresh,
"test_interval": pp.test_interval,
"max_fails": pp.max_fails,
}
data["proxy_pools"] = pools_data
elif config.proxy_pool:
pp = config.proxy_pool
sources = []
for src in pp.sources:
s = {}
if src.url:
s["url"] = src.url
if src.file:
@@ -173,19 +225,27 @@ async def _handle_reload(ctx: dict) -> tuple[int, dict]:
async def _handle_pool_test(ctx: dict) -> tuple[int, dict]:
"""POST /pool/test -- trigger immediate health test."""
pool = ctx.get("pool")
if not pool:
pools: dict = ctx.get("pools") or {}
pool_list = list(pools.values()) if pools else []
if not pool_list and ctx.get("pool"):
pool_list = [ctx["pool"]]
if not pool_list:
return 400, {"error": "no proxy pool configured"}
asyncio.create_task(pool._run_health_tests())
for p in pool_list:
asyncio.create_task(p._run_health_tests())
return 200, {"ok": True}
async def _handle_pool_refresh(ctx: dict) -> tuple[int, dict]:
"""POST /pool/refresh -- trigger immediate source re-fetch."""
pool = ctx.get("pool")
if not pool:
pools: dict = ctx.get("pools") or {}
pool_list = list(pools.values()) if pools else []
if not pool_list and ctx.get("pool"):
pool_list = [ctx["pool"]]
if not pool_list:
return 400, {"error": "no proxy pool configured"}
asyncio.create_task(pool._fetch_all_sources())
for p in pool_list:
asyncio.create_task(p._fetch_all_sources())
return 200, {"ok": True}


@@ -36,6 +36,7 @@ class PoolSourceConfig:
proto: str | None = None
country: str | None = None
limit: int | None = 1000
mitm: bool | None = None
@dataclass
@@ -85,6 +86,7 @@ class ListenerConfig:
listen_port: int = 1080
chain: list[ChainHop] = field(default_factory=list)
pool_hops: int = 0
pool_name: str = ""
@dataclass
@@ -104,6 +106,7 @@ class Config:
api_host: str = ""
api_port: int = 0
proxy_pool: ProxyPoolConfig | None = None
proxy_pools: dict[str, ProxyPoolConfig] = field(default_factory=dict)
tor: TorConfig | None = None
tor_nodes: list[ChainHop] = field(default_factory=list)
config_file: str = ""
@@ -152,6 +155,39 @@ def parse_api_proxies(data: dict) -> list[ChainHop]:
return proxies
def _parse_pool_config(pool_raw: dict) -> ProxyPoolConfig:
"""Parse a single proxy pool config block from YAML."""
sources = []
for src in pool_raw.get("sources", []):
mitm = src.get("mitm")
if mitm is not None:
mitm = bool(mitm)
sources.append(
PoolSourceConfig(
url=src.get("url"),
file=src.get("file"),
proto=src.get("proto"),
country=src.get("country"),
limit=src.get("limit", 1000),
mitm=mitm,
)
)
kwargs: dict = {
"sources": sources,
"refresh": float(pool_raw.get("refresh", 300)),
"test_interval": float(pool_raw.get("test_interval", 120)),
"test_url": pool_raw.get("test_url", ""),
"test_timeout": float(pool_raw.get("test_timeout", 15)),
"test_concurrency": int(pool_raw.get("test_concurrency", 25)),
"max_fails": int(pool_raw.get("max_fails", 3)),
"state_file": pool_raw.get("state_file", ""),
"report_url": pool_raw.get("report_url", ""),
}
if "test_targets" in pool_raw:
kwargs["test_targets"] = list(pool_raw["test_targets"])
return ProxyPoolConfig(**kwargs)
def load_config(path: str | Path) -> Config:
"""Load configuration from a YAML file."""
path = Path(path)
@@ -211,33 +247,16 @@ def load_config(path: str | Path) -> Config:
)
)
# -- proxy pools (named) ------------------------------------------------
if "proxy_pools" in raw:
for name, pool_raw in raw["proxy_pools"].items():
config.proxy_pools[name] = _parse_pool_config(pool_raw)
if "proxy_pool" in raw:
pool_raw = raw["proxy_pool"]
sources = []
for src in pool_raw.get("sources", []):
sources.append(
PoolSourceConfig(
url=src.get("url"),
file=src.get("file"),
proto=src.get("proto"),
country=src.get("country"),
limit=src.get("limit", 1000),
)
)
kwargs: dict = {
"sources": sources,
"refresh": float(pool_raw.get("refresh", 300)),
"test_interval": float(pool_raw.get("test_interval", 120)),
"test_url": pool_raw.get("test_url", ""),
"test_timeout": float(pool_raw.get("test_timeout", 15)),
"test_concurrency": int(pool_raw.get("test_concurrency", 25)),
"max_fails": int(pool_raw.get("max_fails", 3)),
"state_file": pool_raw.get("state_file", ""),
"report_url": pool_raw.get("report_url", ""),
}
if "test_targets" in pool_raw:
kwargs["test_targets"] = list(pool_raw["test_targets"])
config.proxy_pool = ProxyPoolConfig(**kwargs)
config.proxy_pool = _parse_pool_config(raw["proxy_pool"])
# register singular as "default" when proxy_pools doesn't already have it
if "default" not in config.proxy_pools:
config.proxy_pools["default"] = config.proxy_pool
elif "proxy_source" in raw:
# backward compat: convert legacy proxy_source to proxy_pool
src_raw = raw["proxy_source"]
@@ -283,6 +302,8 @@ def load_config(path: str | Path) -> Config:
lc.listen_port = int(port_str)
elif isinstance(listen, (str, int)) and listen:
lc.listen_port = int(listen)
if "pool" in entry:
lc.pool_name = entry["pool"]
chain_raw = entry.get("chain", [])
for item in chain_raw:
if isinstance(item, str) and item.lower() == "pool":


@@ -55,12 +55,15 @@ class ProxyPool:
chain: list[ChainHop],
timeout: float,
chain_nodes: list[ChainHop] | None = None,
name: str = "default",
) -> None:
self._cfg = cfg
self._chain = list(chain)
self._chain_nodes = chain_nodes or []
self._chain_idx = 0
self._timeout = timeout
self._name = name
self._log_prefix = f"pool[{name}]" if name != "default" else "pool"
self._proxies: dict[str, ProxyEntry] = {}
self._alive_keys: list[str] = []
self._tasks: list[asyncio.Task] = []
@@ -78,6 +81,11 @@ class ProxyPool:
self._chain_idx += 1
return chain
@property
def name(self) -> str:
"""Pool name."""
return self._name
# -- public interface ----------------------------------------------------
async def start(self) -> None:
@@ -99,7 +107,7 @@ class ProxyPool:
async def reload(self, cfg: ProxyPoolConfig) -> None:
"""Update pool config and trigger source re-fetch."""
self._cfg = cfg
logger.info("pool: config reloaded, re-fetching sources")
logger.info("%s: config reloaded, re-fetching sources", self._log_prefix)
await self._fetch_all_sources()
self._save_state()
@@ -179,10 +187,10 @@ class ProxyPool:
label = src.url or src.file or "?"
if isinstance(result, Exception):
err = str(result) or type(result).__name__
logger.warning("pool: source %s failed: %s", label, err)
logger.warning("%s: source %s failed: %s", self._log_prefix, label, err)
else:
kind = "fetched" if src.url else "loaded"
logger.info("pool: %s %d proxies from %s", kind, len(result), label)
logger.info("%s: %s %d proxies from %s", self._log_prefix, kind, len(result), label)
proxies.extend(result)
self._merge(proxies)
@@ -195,6 +203,8 @@ class ProxyPool:
params["proto"] = src.proto
if src.country:
params["country"] = src.country
if src.mitm is not None:
params["mitm"] = "1" if src.mitm else "0"
url = src.url
if params:
@@ -208,7 +218,7 @@ class ProxyPool:
"""Parse a text file with one proxy URL per line (runs in executor)."""
path = Path(src.file).expanduser()
if not path.is_file():
logger.warning("pool: file not found: %s", path)
logger.warning("%s: file not found: %s", self._log_prefix, path)
return []
proxies: list[ChainHop] = []
@@ -219,7 +229,7 @@ class ProxyPool:
try:
hop = parse_proxy_url(line)
except ValueError as e:
logger.debug("pool: skipping invalid line %r: %s", line, e)
logger.debug("%s: skipping invalid line %r: %s", self._log_prefix, line, e)
continue
if src.proto and hop.proto != src.proto:
continue
@@ -301,7 +311,10 @@ class ProxyPool:
if self._chain:
chain_ok = await self._test_chain()
if not chain_ok:
logger.warning("pool: static chain unreachable, skipping proxy tests")
logger.warning(
"%s: static chain unreachable, skipping proxy tests",
self._log_prefix,
)
return
target = (
@@ -314,7 +327,10 @@ class ProxyPool:
effective = max(3, min(len(target) // 10, self._cfg.test_concurrency))
sem = asyncio.Semaphore(effective)
logger.debug("pool: testing %d proxies (concurrency=%d)", len(target), effective)
logger.debug(
"%s: testing %d proxies (concurrency=%d)",
self._log_prefix, len(target), effective,
)
results: dict[str, bool] = {}
async def _test(key: str, entry: ProxyEntry) -> None:
@@ -343,8 +359,8 @@ class ProxyPool:
skip_eviction = fail_rate > 0.90 and total > 10
if skip_eviction:
logger.warning(
"pool: %d/%d tests failed (%.0f%%), skipping eviction",
total - passed, total, fail_rate * 100,
"%s: %d/%d tests failed (%.0f%%), skipping eviction",
self._log_prefix, total - passed, total, fail_rate * 100,
)
evict_keys: list[str] = []
@@ -383,7 +399,8 @@ class ProxyPool:
parts.append(f"stale {len(stale_keys)}")
suffix = f" ({', '.join(parts)})" if parts else ""
logger.info(
"pool: %d proxies, %d alive%s",
"%s: %d proxies, %d alive%s",
self._log_prefix,
len(self._proxies),
len(self._alive_keys),
suffix,
@@ -407,9 +424,12 @@ class ProxyPool:
try:
await http_post_json(self._cfg.report_url, {"dead": dead})
logger.info("pool: reported %d dead proxies to %s", len(dead), self._cfg.report_url)
logger.info(
"%s: reported %d dead proxies to %s",
self._log_prefix, len(dead), self._cfg.report_url,
)
except Exception as e:
logger.debug("pool: report failed: %s", e)
logger.debug("%s: report failed: %s", self._log_prefix, e)
def _rebuild_alive(self) -> None:
"""Rebuild the alive keys list from current state."""
@@ -449,11 +469,12 @@ class ProxyPool:
# -- persistence ---------------------------------------------------------
def _resolve_state_path(self) -> Path:
"""Resolve state file path, defaulting to ~/.cache/s5p/pool.json."""
"""Resolve state file path, defaulting to ~/.cache/s5p/pool[-name].json."""
if self._cfg.state_file:
return Path(self._cfg.state_file).expanduser()
cache_dir = Path.home() / ".cache" / "s5p"
return cache_dir / "pool.json"
filename = "pool.json" if self._name == "default" else f"pool-{self._name}.json"
return cache_dir / filename
def _load_state(self) -> None:
"""Load proxy state from JSON file (warm start)."""
@@ -462,7 +483,7 @@ class ProxyPool:
try:
data = json.loads(self._state_path.read_text())
if data.get("version") != STATE_VERSION:
logger.warning("pool: state file version mismatch, starting fresh")
logger.warning("%s: state file version mismatch, starting fresh", self._log_prefix)
return
for key, entry in data.get("proxies", {}).items():
hop = ChainHop(
@@ -483,11 +504,11 @@ class ProxyPool:
)
self._rebuild_alive()
logger.info(
"pool: loaded state (%d proxies, %d alive)",
len(self._proxies), len(self._alive_keys),
"%s: loaded state (%d proxies, %d alive)",
self._log_prefix, len(self._proxies), len(self._alive_keys),
)
except (json.JSONDecodeError, KeyError, TypeError, ValueError) as e:
logger.warning("pool: corrupt state file: %s", e)
logger.warning("%s: corrupt state file: %s", self._log_prefix, e)
self._proxies.clear()
self._alive_keys.clear()
@@ -519,4 +540,4 @@ class ProxyPool:
tmp.write_text(json.dumps(data, indent=2))
os.replace(tmp, self._state_path)
except OSError as e:
logger.warning("pool: failed to save state: %s", e)
logger.warning("%s: failed to save state: %s", self._log_prefix, e)


@@ -233,7 +233,7 @@ async def _handle_client(
async def _metrics_logger(
metrics: Metrics,
stop: asyncio.Event,
pool: ProxyPool | None = None,
pools: dict[str, ProxyPool] | None = None,
) -> None:
"""Log metrics summary every 60 seconds."""
while not stop.is_set():
@@ -243,8 +243,13 @@ async def _metrics_logger(
pass
if not stop.is_set():
line = metrics.summary()
if pool:
line += f" pool={pool.alive_count}/{pool.count}"
if pools:
if len(pools) == 1:
p = next(iter(pools.values()))
line += f" pool={p.alive_count}/{p.count}"
else:
for name, p in pools.items():
line += f" pool[{name}]={p.alive_count}/{p.count}"
logger.info("metrics: %s", line)
@@ -267,16 +272,40 @@ async def serve(config: Config) -> None:
nodes = ", ".join(str(n) for n in config.tor_nodes)
logger.info("tor_nodes: %s (round-robin)", nodes)
# -- shared proxy pool ---------------------------------------------------
proxy_pool: ProxyPool | None = None
if config.proxy_pool and config.proxy_pool.sources:
# use first listener's chain as base chain for pool health tests
# -- named proxy pools ---------------------------------------------------
proxy_pools: dict[str, ProxyPool] = {}
base_chain = listeners[0].chain if listeners else config.chain
proxy_pool = ProxyPool(
for pool_name, pool_cfg in config.proxy_pools.items():
if not pool_cfg.sources:
continue
pool = ProxyPool(
pool_cfg, base_chain, config.timeout,
chain_nodes=config.tor_nodes or None,
name=pool_name,
)
await pool.start()
proxy_pools[pool_name] = pool
# backward compat: single proxy_pool -> "default"
if not proxy_pools and config.proxy_pool and config.proxy_pool.sources:
pool = ProxyPool(
config.proxy_pool, base_chain, config.timeout,
chain_nodes=config.tor_nodes or None,
)
await proxy_pool.start()
await pool.start()
proxy_pools["default"] = pool
def _pool_for(lc: ListenerConfig) -> ProxyPool | None:
"""Resolve the proxy pool for a listener."""
if lc.pool_hops <= 0:
return None
name = lc.pool_name or "default"
if name not in proxy_pools:
raise RuntimeError(
f"listener {lc.listen_host}:{lc.listen_port} "
f"references unknown pool {name!r}"
)
return proxy_pools[name]
# -- per-unique first-hop connection pools --------------------------------
hop_pools: dict[tuple[str, int], FirstHopPool] = {}
@@ -332,15 +361,17 @@ async def serve(config: Config) -> None:
servers: list[asyncio.Server] = []
for lc in listeners:
hp = _hop_pool_for(lc)
lc_pool = _pool_for(lc)
async def on_client(
r: asyncio.StreamReader, w: asyncio.StreamWriter,
_lc: ListenerConfig = lc, _hp: FirstHopPool | None = hp,
_pool: ProxyPool | None = lc_pool,
) -> None:
async with sem:
await _handle_client(
r, w, _lc, config.timeout, config.retries,
proxy_pool, metrics, _hp, tor_rr, hop_pools,
_pool, metrics, _hp, tor_rr, hop_pools,
)
srv = await asyncio.start_server(on_client, lc.listen_host, lc.listen_port)
@@ -350,15 +381,20 @@ async def serve(config: Config) -> None:
chain_desc = " -> ".join(str(h) for h in lc.chain) if lc.chain else "direct"
nhops = lc.pool_hops
pool_desc = f" + {nhops} pool hop{'s' if nhops != 1 else ''}" if nhops else ""
if lc_pool and lc_pool.name != "default":
pool_desc += f" [{lc_pool.name}]"
logger.info("listener %s chain: %s%s", addr, chain_desc, pool_desc)
logger.info("max_connections=%d", config.max_connections)
if proxy_pool:
nsrc = len(config.proxy_pool.sources)
if proxy_pools:
for pname, pp in proxy_pools.items():
cfg = config.proxy_pools.get(pname, config.proxy_pool)
nsrc = len(cfg.sources) if cfg else 0
prefix = f"pool[{pname}]" if pname != "default" else "pool"
logger.info(
"pool: %d proxies, %d alive (from %d source%s)",
proxy_pool.count, proxy_pool.alive_count, nsrc, "s" if nsrc != 1 else "",
"%s: %d proxies, %d alive (from %d source%s)",
prefix, pp.count, pp.alive_count, nsrc, "s" if nsrc != 1 else "",
)
logger.info("retries: %d", config.retries)
@@ -374,7 +410,8 @@ async def serve(config: Config) -> None:
api_ctx: dict = {
"config": config,
"metrics": metrics,
"pool": proxy_pool,
"pools": proxy_pools,
"pool": next(iter(proxy_pools.values()), None), # backward compat
"hop_pools": hop_pools,
"tor": tor,
}
@@ -400,8 +437,13 @@ async def serve(config: Config) -> None:
for h in root.handlers:
h.setLevel(level)
logging.getLogger("s5p").setLevel(level)
if proxy_pool and new.proxy_pool:
await proxy_pool.reload(new.proxy_pool)
# reload named pools (match by name)
for pname, pp in proxy_pools.items():
new_cfg = new.proxy_pools.get(pname)
if new_cfg:
await pp.reload(new_cfg)
elif new.proxy_pool and pname == "default":
await pp.reload(new.proxy_pool)
logger.info("reload: config reloaded (listeners require restart)")
def _on_sighup() -> None:
@@ -414,8 +456,7 @@ async def serve(config: Config) -> None:
api_srv = await start_api(config.api_host, config.api_port, api_ctx)
metrics_stop = asyncio.Event()
pool_ref = proxy_pool
metrics_task = asyncio.create_task(_metrics_logger(metrics, metrics_stop, pool_ref))
metrics_task = asyncio.create_task(_metrics_logger(metrics, metrics_stop, proxy_pools or None))
# keep all servers open until stop signal
try:
@@ -435,11 +476,16 @@ async def serve(config: Config) -> None:
await tor.stop()
for hp in hop_pools.values():
await hp.stop()
if proxy_pool:
await proxy_pool.stop()
for pp in proxy_pools.values():
await pp.stop()
shutdown_line = metrics.summary()
if pool_ref:
shutdown_line += f" pool={pool_ref.alive_count}/{pool_ref.count}"
if proxy_pools:
if len(proxy_pools) == 1:
p = next(iter(proxy_pools.values()))
shutdown_line += f" pool={p.alive_count}/{p.count}"
else:
for pname, p in proxy_pools.items():
shutdown_line += f" pool[{pname}]={p.alive_count}/{p.count}"
logger.info("metrics: %s", shutdown_line)
metrics_stop.set()
await metrics_task


@@ -82,6 +82,7 @@ class TestJsonResponse:
def _make_ctx(
config: Config | None = None,
pool: MagicMock | None = None,
pools: dict | None = None,
tor: MagicMock | None = None,
) -> dict:
"""Build a mock context dict."""
@@ -89,6 +90,7 @@ def _make_ctx(
"config": config or Config(),
"metrics": Metrics(),
"pool": pool,
"pools": pools,
"hop_pool": None,
"tor": tor,
}
@@ -149,6 +151,25 @@ class TestHandleStatus:
assert body["listeners"][1]["latency"] is None
class TestHandleStatusPools:
"""Test GET /status with multiple named pools."""
def test_multi_pool_summary(self):
pool_a = MagicMock()
pool_a.alive_count = 5
pool_a.count = 10
pool_a.name = "clean"
pool_b = MagicMock()
pool_b.alive_count = 3
pool_b.count = 8
pool_b.name = "mitm"
ctx = _make_ctx(pools={"clean": pool_a, "mitm": pool_b})
_, body = _handle_status(ctx)
assert body["pool"] == {"alive": 8, "total": 18}
assert body["pools"]["clean"] == {"alive": 5, "total": 10}
assert body["pools"]["mitm"] == {"alive": 3, "total": 8}
class TestHandleMetrics:
"""Test GET /metrics handler."""
@@ -218,6 +239,57 @@ class TestHandlePool:
assert "socks5://1.2.3.4:1080" in body["proxies"]
class TestHandlePoolMulti:
"""Test GET /pool with multiple named pools."""
def test_merges_entries(self):
pool_a = MagicMock()
pool_a.alive_count = 1
pool_a.count = 1
pool_a.name = "clean"
entry_a = MagicMock(
alive=True, fails=0, tests=5,
last_ok=100.0, last_test=100.0, last_seen=100.0,
)
pool_a._proxies = {"socks5://1.2.3.4:1080": entry_a}
pool_b = MagicMock()
pool_b.alive_count = 1
pool_b.count = 1
pool_b.name = "mitm"
entry_b = MagicMock(
alive=True, fails=0, tests=3,
last_ok=90.0, last_test=90.0, last_seen=90.0,
)
pool_b._proxies = {"socks5://5.6.7.8:1080": entry_b}
ctx = _make_ctx(pools={"clean": pool_a, "mitm": pool_b})
_, body = _handle_pool(ctx)
assert body["alive"] == 2
assert body["total"] == 2
assert len(body["proxies"]) == 2
assert body["proxies"]["socks5://1.2.3.4:1080"]["pool"] == "clean"
assert body["proxies"]["socks5://5.6.7.8:1080"]["pool"] == "mitm"
assert "pools" in body
assert body["pools"]["clean"] == {"alive": 1, "total": 1}
def test_single_pool_no_pool_field(self):
"""Single pool: no 'pool' field on entries, no 'pools' summary."""
pool = MagicMock()
pool.alive_count = 1
pool.count = 1
pool.name = "default"
entry = MagicMock(
alive=True, fails=0, tests=5,
last_ok=100.0, last_test=100.0, last_seen=100.0,
)
pool._proxies = {"socks5://1.2.3.4:1080": entry}
ctx = _make_ctx(pools={"default": pool})
_, body = _handle_pool(ctx)
assert "pool" not in body["proxies"]["socks5://1.2.3.4:1080"]
assert "pools" not in body
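The merge behavior these tests pin down — a `pool` field per proxy entry plus a top-level `pools` summary, but only when more than one pool exists — can be sketched as follows. The plain-dict entry shape and `SimpleNamespace` pools are illustrative stand-ins, not the real `ProxyPool` API:

```python
from types import SimpleNamespace


def merge_pools(pools: dict) -> dict:
    """Merge per-pool proxy entries into one /pool-style response body."""
    multi = len(pools) > 1
    body = {
        "alive": sum(p.alive_count for p in pools.values()),
        "total": sum(p.count for p in pools.values()),
        "proxies": {},
    }
    for name, p in pools.items():
        for url, entry in p._proxies.items():
            info = dict(entry)
            if multi:
                info["pool"] = name  # tag origin only in multi-pool mode
            body["proxies"][url] = info
    if multi:
        body["pools"] = {
            name: {"alive": p.alive_count, "total": p.count}
            for name, p in pools.items()
        }
    return body


a = SimpleNamespace(alive_count=1, count=1,
                    _proxies={"socks5://1.2.3.4:1080": {"alive": True}})
b = SimpleNamespace(alive_count=1, count=1,
                    _proxies={"socks5://5.6.7.8:1080": {"alive": True}})
merged = merge_pools({"clean": a, "mitm": b})
```

Omitting the extra fields in the single-pool case keeps the response byte-compatible with clients written against the pre-pools API.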
class TestHandleConfig:
"""Test GET /config handler."""
@@ -255,6 +327,34 @@ class TestHandleConfig:
assert body["proxy_pool"]["sources"][0]["url"] == "http://api:8081/proxies"
assert body["listeners"][0]["pool_hops"] == 1
def test_with_proxy_pools(self):
pp_clean = ProxyPoolConfig(
sources=[PoolSourceConfig(url="http://api:8081/proxies/all", mitm=False)],
refresh=300.0,
test_interval=120.0,
max_fails=3,
)
pp_mitm = ProxyPoolConfig(
sources=[PoolSourceConfig(url="http://api:8081/proxies/all", mitm=True)],
refresh=300.0,
test_interval=120.0,
max_fails=3,
)
config = Config(
proxy_pools={"clean": pp_clean, "mitm": pp_mitm},
listeners=[ListenerConfig(
listen_host="0.0.0.0", listen_port=1080,
chain=[ChainHop("socks5", "127.0.0.1", 9050)],
pool_hops=2, pool_name="clean",
)],
)
ctx = _make_ctx(config=config)
_, body = _handle_config(ctx)
assert "proxy_pools" in body
assert body["proxy_pools"]["clean"]["sources"][0]["mitm"] is False
assert body["proxy_pools"]["mitm"]["sources"][0]["mitm"] is True
assert body["listeners"][0]["pool"] == "clean"
def test_with_tor_nodes(self):
config = Config(
tor_nodes=[


@@ -222,6 +222,90 @@ class TestConfig:
]
class TestProxyPools:
"""Test named proxy_pools config parsing."""
def test_proxy_pools_from_yaml(self, tmp_path):
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"proxy_pools:\n"
" clean:\n"
" sources:\n"
" - url: http://api:8081/proxies/all\n"
" mitm: false\n"
" refresh: 300\n"
" state_file: /data/pool-clean.json\n"
" mitm:\n"
" sources:\n"
" - url: http://api:8081/proxies/all\n"
" mitm: true\n"
" state_file: /data/pool-mitm.json\n"
)
c = load_config(cfg_file)
assert "clean" in c.proxy_pools
assert "mitm" in c.proxy_pools
assert c.proxy_pools["clean"].sources[0].mitm is False
assert c.proxy_pools["mitm"].sources[0].mitm is True
assert c.proxy_pools["clean"].state_file == "/data/pool-clean.json"
assert c.proxy_pools["mitm"].state_file == "/data/pool-mitm.json"
def test_mitm_none_when_absent(self, tmp_path):
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"proxy_pool:\n"
" sources:\n"
" - url: http://api:8081/proxies\n"
)
c = load_config(cfg_file)
assert c.proxy_pool is not None
assert c.proxy_pool.sources[0].mitm is None
def test_singular_becomes_default(self, tmp_path):
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"proxy_pool:\n"
" sources:\n"
" - url: http://api:8081/proxies\n"
)
c = load_config(cfg_file)
assert "default" in c.proxy_pools
assert c.proxy_pools["default"] is c.proxy_pool
def test_proxy_pools_wins_over_singular(self, tmp_path):
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"proxy_pools:\n"
" default:\n"
" sources:\n"
" - url: http://api:8081/pools-default\n"
"proxy_pool:\n"
" sources:\n"
" - url: http://api:8081/singular\n"
)
c = load_config(cfg_file)
# proxy_pools "default" wins, singular does not overwrite
assert c.proxy_pools["default"].sources[0].url == "http://api:8081/pools-default"
def test_listener_pool_name(self, tmp_path):
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"listeners:\n"
" - listen: 0.0.0.0:1080\n"
" pool: clean\n"
" chain:\n"
" - socks5://127.0.0.1:9050\n"
" - pool\n"
" - listen: 0.0.0.0:1081\n"
" chain:\n"
" - socks5://127.0.0.1:9050\n"
)
c = load_config(cfg_file)
assert c.listeners[0].pool_name == "clean"
assert c.listeners[0].pool_hops == 1
assert c.listeners[1].pool_name == ""
assert c.listeners[1].pool_hops == 0
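The back-compat rules exercised above — singular `proxy_pool:` registers as pool `"default"`, but an explicit `proxy_pools.default` wins — reduce to a `setdefault` after parsing both keys. A rough sketch with plain dicts standing in for the parsed `ProxyPoolConfig` objects:

```python
def resolve_pools(raw: dict) -> dict:
    """Resolve proxy_pools plus the legacy singular proxy_pool key."""
    pools = dict(raw.get("proxy_pools") or {})
    singular = raw.get("proxy_pool")
    if singular is not None:
        # legacy key becomes "default" unless proxy_pools already defines it
        pools.setdefault("default", singular)
    return pools
```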
class TestTorNodes:
"""Test tor_nodes config parsing."""


@@ -11,6 +11,95 @@ from s5p.config import ChainHop, PoolSourceConfig, ProxyPoolConfig
from s5p.pool import ProxyEntry, ProxyPool
class TestProxyPoolName:
"""Test pool name and state path derivation."""
def test_default_name(self):
cfg = ProxyPoolConfig(sources=[])
pool = ProxyPool(cfg, [], timeout=10.0)
assert pool.name == "default"
assert pool._log_prefix == "pool"
def test_named_pool(self):
cfg = ProxyPoolConfig(sources=[])
pool = ProxyPool(cfg, [], timeout=10.0, name="clean")
assert pool.name == "clean"
assert pool._log_prefix == "pool[clean]"
def test_state_path_default(self):
cfg = ProxyPoolConfig(sources=[])
pool = ProxyPool(cfg, [], timeout=10.0)
assert pool._state_path.name == "pool.json"
def test_state_path_named(self):
cfg = ProxyPoolConfig(sources=[])
pool = ProxyPool(cfg, [], timeout=10.0, name="clean")
assert pool._state_path.name == "pool-clean.json"
def test_state_path_explicit_overrides_name(self):
cfg = ProxyPoolConfig(sources=[], state_file="/data/custom.json")
pool = ProxyPool(cfg, [], timeout=10.0, name="clean")
assert str(pool._state_path) == "/data/custom.json"
class TestProxyPoolMitmQuery:
"""Test mitm query parameter in API fetch."""
def test_mitm_false(self):
cfg = ProxyPoolConfig(sources=[])
pool = ProxyPool(cfg, [], timeout=10.0)
src = PoolSourceConfig(url="http://api:8081/proxies/all", mitm=False)
async def run():
from unittest.mock import AsyncMock, patch
mock_ret = {"proxies": []}
with patch(
"s5p.pool.http_get_json",
new_callable=AsyncMock, return_value=mock_ret,
) as mock:
await pool._fetch_api(src)
call_url = mock.call_args[0][0]
assert "mitm=0" in call_url
asyncio.run(run())
def test_mitm_true(self):
cfg = ProxyPoolConfig(sources=[])
pool = ProxyPool(cfg, [], timeout=10.0)
src = PoolSourceConfig(url="http://api:8081/proxies/all", mitm=True)
async def run():
from unittest.mock import AsyncMock, patch
mock_ret = {"proxies": []}
with patch(
"s5p.pool.http_get_json",
new_callable=AsyncMock, return_value=mock_ret,
) as mock:
await pool._fetch_api(src)
call_url = mock.call_args[0][0]
assert "mitm=1" in call_url
asyncio.run(run())
def test_mitm_none_omitted(self):
cfg = ProxyPoolConfig(sources=[])
pool = ProxyPool(cfg, [], timeout=10.0)
src = PoolSourceConfig(url="http://api:8081/proxies/all", mitm=None)
async def run():
from unittest.mock import AsyncMock, patch
mock_ret = {"proxies": []}
with patch(
"s5p.pool.http_get_json",
new_callable=AsyncMock, return_value=mock_ret,
) as mock:
await pool._fetch_api(src)
call_url = mock.call_args[0][0]
assert "mitm" not in call_url
asyncio.run(run())
class TestProxyEntry:
"""Test ProxyEntry defaults."""