feat: multi-listener with configurable proxy chaining

Each listener binds to its own port with an independent chain.
The "pool" keyword in a chain appends a random alive proxy from
the shared pool; multiple pool entries = multiple hops.

  :1080 -> Tor only (0 pool hops)
  :1081 -> Tor + 1 pool proxy
  :1082 -> Tor + 2 pool proxies

Shared resources (ProxyPool, Tor, metrics, semaphore, API) are
reused across listeners. FirstHopPool is shared per unique first
hop. Backward compatible: old listen/chain format still works.
This commit is contained in:
user
2026-02-17 22:03:37 +01:00
parent ba60d087c0
commit 7dc3926f48
11 changed files with 495 additions and 62 deletions

View File

@@ -11,6 +11,7 @@ through configurable chains of SOCKS4, SOCKS5, and HTTP CONNECT proxies.
- Per-hop authentication (username/password) - Per-hop authentication (username/password)
- DNS leak prevention (domain names forwarded to proxies, never resolved locally) - DNS leak prevention (domain names forwarded to proxies, never resolved locally)
- Tor integration (SOCKS5 hop + control port NEWNYM for circuit rotation) - Tor integration (SOCKS5 hop + control port NEWNYM for circuit rotation)
- Multi-listener: different ports with different chain depths (Tor-only, Tor+1, Tor+2)
- Managed proxy pool: multiple sources (API + file), health-tested, weighted selection - Managed proxy pool: multiple sources (API + file), health-tested, weighted selection
- Per-proxy failure backoff (60s cooldown), stale proxy expiry, chain pre-flight - Per-proxy failure backoff (60s cooldown), stale proxy expiry, chain pre-flight
- Fast warm start (seconds on restart vs minutes on cold start) - Fast warm start (seconds on restart vs minutes on cold start)
@@ -66,15 +67,31 @@ cp config/example.yaml config/s5p.yaml
``` ```
```yaml ```yaml
listen: 127.0.0.1:1080
timeout: 10 timeout: 10
retries: 3 retries: 3
max_connections: 256 # concurrent connection limit max_connections: 256 # concurrent connection limit
pool_size: 8 # pre-warmed connections to first hop pool_size: 8 # pre-warmed connections to first hop
api_listen: 127.0.0.1:1081 # control API (disabled by default) api_listen: 127.0.0.1:1081 # control API (disabled by default)
chain: # Multi-listener: each port gets a different chain depth
- socks5://127.0.0.1:9050 # Tor listeners:
- listen: 0.0.0.0:1080
chain:
- socks5://127.0.0.1:9050 # Tor only
- listen: 0.0.0.0:1081
chain:
- socks5://127.0.0.1:9050
- pool # Tor + 1 pool proxy
- listen: 0.0.0.0:1082
chain:
- socks5://127.0.0.1:9050
- pool # Tor + 2 pool proxies
- pool
# Old single-listener format still works:
# listen: 127.0.0.1:1080
# chain:
# - socks5://127.0.0.1:9050
tor: tor:
control_port: 9051 # Tor control port (NEWNYM) control_port: 9051 # Tor control port (NEWNYM)
@@ -117,11 +134,13 @@ Options:
## How Chaining Works ## How Chaining Works
``` ```
Client -> s5p -> [static chain] -> [weighted alive proxy from pool] -> Destination :1080 Client -> s5p -> Tor -> Destination (0 pool hops)
:1081 Client -> s5p -> Tor -> [pool proxy] -> Destination (1 pool hop)
:1082 Client -> s5p -> Tor -> [pool proxy] -> [pool proxy] -> Dest (2 pool hops)
``` ```
s5p connects to Hop1 via TCP, negotiates the hop protocol (SOCKS5/4/HTTP), s5p connects to Hop1 via TCP, negotiates the hop protocol (SOCKS5/4/HTTP),
then over that tunnel negotiates with Hop2, and so on. If a proxy pool is then over that tunnel negotiates with Hop2, and so on. If a proxy pool is
configured, an alive proxy is appended per-connection, weighted toward those configured, alive proxies are appended per-connection (one per `pool` entry),
with the most recent successful health test. Each hop only sees its immediate weighted toward those with the most recent successful health test. Each hop
neighbors. only sees its immediate neighbors.

View File

@@ -47,6 +47,8 @@
- [x] Tor control port integration (NEWNYM signaling, periodic rotation) - [x] Tor control port integration (NEWNYM signaling, periodic rotation)
- [x] Replace HTTP health check with TLS handshake (round-robin targets, no httpbin dependency) - [x] Replace HTTP health check with TLS handshake (round-robin targets, no httpbin dependency)
- [x] Multi-listener with configurable proxy chaining (per-port chain depth)
## Next ## Next
- [ ] Integration tests with mock proxy server - [ ] Integration tests with mock proxy server
- [ ] SOCKS5 server-side authentication - [ ] SOCKS5 server-side authentication

View File

@@ -50,6 +50,29 @@ chain:
# cookie_file: "" # CookieAuthentication file path # cookie_file: "" # CookieAuthentication file path
# newnym_interval: 0 # periodic NEWNYM (seconds, 0 = manual only) # newnym_interval: 0 # periodic NEWNYM (seconds, 0 = manual only)
# Multi-listener mode -- each listener gets its own address and chain.
# The "pool" keyword in a chain appends a random alive proxy from the pool.
# Multiple "pool" entries = multiple pool hops (deeper chaining).
#
# listeners:
# - listen: 0.0.0.0:1080
# chain:
# - socks5://127.0.0.1:9050 # Tor only (no pool hops)
#
# - listen: 0.0.0.0:1081
# chain:
# - socks5://127.0.0.1:9050
# - pool # Tor + 1 random pool proxy
#
# - listen: 0.0.0.0:1082
# chain:
# - socks5://127.0.0.1:9050
# - pool # Tor + 2 random pool proxies
# - pool
#
# When using "listeners:", the top-level "listen" and "chain" keys are ignored.
# If "listeners:" is absent, the old format is used (single listener).
# Legacy proxy source (still supported, auto-converts to proxy_pool): # Legacy proxy source (still supported, auto-converts to proxy_pool):
# proxy_source: # proxy_source:
# url: http://10.200.1.250:8081/proxies # url: http://10.200.1.250:8081/proxies

View File

@@ -38,6 +38,24 @@ Volumes: `./src` (ro), `./config/s5p.yaml` (ro), `~/.cache/s5p` → `/data` (poo
cp config/example.yaml config/s5p.yaml # create live config (gitignored) cp config/example.yaml config/s5p.yaml # create live config (gitignored)
``` ```
## Multi-Listener (config)
```yaml
listeners:
- listen: 0.0.0.0:1080
chain:
- socks5://127.0.0.1:9050 # Tor only
- listen: 0.0.0.0:1081
chain:
- socks5://127.0.0.1:9050
- pool # Tor + 1 pool hop
- listen: 0.0.0.0:1082
chain:
- socks5://127.0.0.1:9050
- pool # Tor + 2 pool hops
- pool
```
## Performance Tuning (config) ## Performance Tuning (config)
```yaml ```yaml

View File

@@ -39,7 +39,6 @@ cp config/example.yaml config/s5p.yaml
| `config/s5p.yaml` | no (gitignored) | Live config with real proxy addresses | | `config/s5p.yaml` | no (gitignored) | Live config with real proxy addresses |
```yaml ```yaml
listen: 127.0.0.1:1080
timeout: 10 timeout: 10
retries: 3 retries: 3
log_level: info log_level: info
@@ -48,8 +47,20 @@ pool_size: 0 # pre-warmed TCP connections to first hop (0 = disable
pool_max_idle: 30 # max idle time for pooled connections (seconds) pool_max_idle: 30 # max idle time for pooled connections (seconds)
api_listen: "" # control API bind address (empty = disabled) api_listen: "" # control API bind address (empty = disabled)
chain: # Multi-listener (each port gets its own chain depth)
- socks5://127.0.0.1:9050 listeners:
- listen: 0.0.0.0:1080
chain:
- socks5://127.0.0.1:9050 # Tor only
- listen: 0.0.0.0:1081
chain:
- socks5://127.0.0.1:9050
- pool # Tor + 1 pool proxy
# Or single-listener (old format):
# listen: 127.0.0.1:1080
# chain:
# - socks5://127.0.0.1:9050
proxy_pool: proxy_pool:
sources: sources:
@@ -69,6 +80,58 @@ proxy_pool:
state_file: "" # empty = ~/.cache/s5p/pool.json state_file: "" # empty = ~/.cache/s5p/pool.json
``` ```
## Multi-Listener Mode
Run multiple listeners on different ports, each with a different number
of proxy hops after the static chain. Config-file only (not available via CLI).
```yaml
listeners:
- listen: 0.0.0.0:1080
chain:
- socks5://10.200.1.13:9050 # Tor only
- listen: 0.0.0.0:1081
chain:
- socks5://10.200.1.13:9050
- pool # Tor + 1 pool proxy
- listen: 0.0.0.0:1082
chain:
- socks5://10.200.1.13:9050
- pool # Tor + 2 pool proxies
- pool
proxy_pool:
sources:
- url: http://10.200.1.250:8081/proxies/all?mitm=0
refresh: 300
test_interval: 120
max_fails: 3
```
The `pool` keyword in a chain means "append a random alive proxy from the
shared pool". Multiple `pool` entries = multiple pool hops (deeper chaining).
| Resource | Scope | Notes |
|----------|-------|-------|
| ProxyPool | shared | All listeners draw from one pool |
| TorController | shared | One Tor instance |
| Metrics | shared | Aggregate stats across listeners |
| Semaphore | shared | Global `max_connections` cap |
| API server | shared | One control endpoint |
| FirstHopPool | per unique first hop | Listeners with same first hop share it |
| Chain + pool_hops | per listener | Each listener has its own chain depth |
### Backward compatibility
When no `listeners:` key is present, the old `listen`/`chain` format creates
a single listener. If `proxy_pool` is configured without explicit `pool` in
the chain, legacy behavior is preserved (1 pool hop auto-appended).
Settings that require a restart: `listeners`, `listen`, `chain`, `pool_size`,
`pool_max_idle`, `api_listen`.
## Proxy URL Format ## Proxy URL Format
``` ```
@@ -385,7 +448,7 @@ Settings reloaded on SIGHUP:
| `max_connections` | Concurrent connection limit | | `max_connections` | Concurrent connection limit |
| `proxy_pool.*` | Sources, intervals, thresholds | | `proxy_pool.*` | Sources, intervals, thresholds |
Settings that require a restart: `listen`, `chain`, `pool_size`, `pool_max_idle`, `api_listen`. Settings that require a restart: `listeners`, `listen`, `chain`, `pool_size`, `pool_max_idle`, `api_listen`.
Requires `-c` / `--config` to know which file to re-read. Without a Requires `-c` / `--config` to know which file to re-read. Without a
config file, SIGHUP is ignored with a warning. config file, SIGHUP is ignored with a warning.

View File

@@ -55,7 +55,7 @@ def _json_response(
def _handle_status(ctx: dict) -> tuple[int, dict]: def _handle_status(ctx: dict) -> tuple[int, dict]:
"""GET /status -- combined runtime summary.""" """GET /status -- combined runtime summary."""
metrics: Metrics = ctx["metrics"] metrics: Metrics = ctx["metrics"]
data = { data: dict = {
"uptime": round(time.monotonic() - metrics.started, 1), "uptime": round(time.monotonic() - metrics.started, 1),
"connections": metrics.connections, "connections": metrics.connections,
"success": metrics.success, "success": metrics.success,
@@ -69,7 +69,14 @@ def _handle_status(ctx: dict) -> tuple[int, dict]:
data["pool"] = {"alive": pool.alive_count, "total": pool.count} data["pool"] = {"alive": pool.alive_count, "total": pool.count}
config = ctx.get("config") config = ctx.get("config")
if config: if config:
data["chain"] = [str(h) for h in config.chain] data["listeners"] = [
{
"listen": f"{lc.listen_host}:{lc.listen_port}",
"chain": [str(h) for h in lc.chain],
"pool_hops": lc.pool_hops,
}
for lc in config.listeners
]
return 200, data return 200, data
@@ -110,13 +117,19 @@ def _handle_config(ctx: dict) -> tuple[int, dict]:
return 500, {"error": "config unavailable"} return 500, {"error": "config unavailable"}
data: dict = { data: dict = {
"listen": f"{config.listen_host}:{config.listen_port}",
"timeout": config.timeout, "timeout": config.timeout,
"retries": config.retries, "retries": config.retries,
"log_level": config.log_level, "log_level": config.log_level,
"max_connections": config.max_connections, "max_connections": config.max_connections,
"pool_size": config.pool_size, "pool_size": config.pool_size,
"chain": [str(h) for h in config.chain], "listeners": [
{
"listen": f"{lc.listen_host}:{lc.listen_port}",
"chain": [str(h) for h in lc.chain],
"pool_hops": lc.pool_hops,
}
for lc in config.listeners
],
} }
if config.proxy_pool: if config.proxy_pool:
pp = config.proxy_pool pp = config.proxy_pool

View File

@@ -7,7 +7,14 @@ import asyncio
import logging import logging
from . import __version__ from . import __version__
from .config import Config, PoolSourceConfig, ProxyPoolConfig, load_config, parse_proxy_url from .config import (
Config,
ListenerConfig,
PoolSourceConfig,
ProxyPoolConfig,
load_config,
parse_proxy_url,
)
from .server import serve from .server import serve
@@ -115,6 +122,25 @@ def main(argv: list[str] | None = None) -> int:
elif args.quiet: elif args.quiet:
config.log_level = "error" config.log_level = "error"
# ensure listeners list is populated (CLI-only mode, no config file)
if not config.listeners:
lc = ListenerConfig(
listen_host=config.listen_host,
listen_port=config.listen_port,
chain=list(config.chain),
)
if config.proxy_pool and config.proxy_pool.sources:
lc.pool_hops = 1
config.listeners.append(lc)
elif len(config.listeners) == 1:
# sync CLI overrides (-l, -C) to the single listener
lc = config.listeners[0]
if args.listen:
lc.listen_host = config.listen_host
lc.listen_port = config.listen_port
if args.chain:
lc.chain = list(config.chain)
_setup_logging(config.log_level) _setup_logging(config.log_level)
logger = logging.getLogger("s5p") logger = logging.getLogger("s5p")

View File

@@ -77,6 +77,16 @@ class TorConfig:
newnym_interval: float = 0.0 # 0 = manual only newnym_interval: float = 0.0 # 0 = manual only
@dataclass
class ListenerConfig:
"""A single listener with its own address and chain."""
listen_host: str = "127.0.0.1"
listen_port: int = 1080
chain: list[ChainHop] = field(default_factory=list)
pool_hops: int = 0
@dataclass @dataclass
class Config: class Config:
"""Server configuration.""" """Server configuration."""
@@ -84,6 +94,7 @@ class Config:
listen_host: str = "127.0.0.1" listen_host: str = "127.0.0.1"
listen_port: int = 1080 listen_port: int = 1080
chain: list[ChainHop] = field(default_factory=list) chain: list[ChainHop] = field(default_factory=list)
listeners: list[ListenerConfig] = field(default_factory=list)
timeout: float = 10.0 timeout: float = 10.0
retries: int = 3 retries: int = 3
log_level: str = "info" log_level: str = "info"
@@ -256,4 +267,44 @@ def load_config(path: str | Path) -> Config:
newnym_interval=float(tor_raw.get("newnym_interval", 0)), newnym_interval=float(tor_raw.get("newnym_interval", 0)),
) )
# -- listeners -------------------------------------------------------
if "listeners" in raw:
for entry in raw["listeners"]:
lc = ListenerConfig()
listen = entry.get("listen", "")
if isinstance(listen, str) and ":" in listen:
host, port_str = listen.rsplit(":", 1)
lc.listen_host = host
lc.listen_port = int(port_str)
elif isinstance(listen, (str, int)) and listen:
lc.listen_port = int(listen)
chain_raw = entry.get("chain", [])
for item in chain_raw:
if isinstance(item, str) and item.lower() == "pool":
lc.pool_hops += 1
elif isinstance(item, str):
lc.chain.append(parse_proxy_url(item))
elif isinstance(item, dict):
lc.chain.append(
ChainHop(
proto=item.get("proto", "socks5"),
host=item["host"],
port=int(item["port"]),
username=item.get("username"),
password=item.get("password"),
)
)
config.listeners.append(lc)
else:
# backward compat: build single listener from top-level fields
lc = ListenerConfig(
listen_host=config.listen_host,
listen_port=config.listen_port,
chain=list(config.chain),
)
# legacy behavior: if proxy_pool configured, auto-append 1 pool hop
if config.proxy_pool and config.proxy_pool.sources:
lc.pool_hops = 1
config.listeners.append(lc)
return config return config

View File

@@ -9,7 +9,7 @@ import struct
import time import time
from .api import start_api from .api import start_api
from .config import Config, load_config from .config import ChainHop, Config, ListenerConfig, load_config
from .connpool import FirstHopPool from .connpool import FirstHopPool
from .metrics import Metrics from .metrics import Metrics
from .pool import ProxyPool from .pool import ProxyPool
@@ -60,7 +60,9 @@ def _socks5_reply(rep: int) -> bytes:
async def _handle_client( async def _handle_client(
client_reader: asyncio.StreamReader, client_reader: asyncio.StreamReader,
client_writer: asyncio.StreamWriter, client_writer: asyncio.StreamWriter,
config: Config, listener: ListenerConfig,
timeout: float,
retries: int,
proxy_pool: ProxyPool | None = None, proxy_pool: ProxyPool | None = None,
metrics: Metrics | None = None, metrics: Metrics | None = None,
first_hop_pool: FirstHopPool | None = None, first_hop_pool: FirstHopPool | None = None,
@@ -101,31 +103,35 @@ async def _handle_client(
logger.info("[%s] connect %s:%d", tag, target_host, target_port) logger.info("[%s] connect %s:%d", tag, target_host, target_port)
# -- build chain (with retry) -- # -- build chain (with retry) --
attempts = config.retries if proxy_pool else 1 attempts = retries if proxy_pool and listener.pool_hops > 0 else 1
last_err: Exception | None = None last_err: Exception | None = None
for attempt in range(attempts): for attempt in range(attempts):
effective_chain = list(config.chain) effective_chain = list(listener.chain)
pool_hop = None pool_hops: list[ChainHop] = []
if proxy_pool: if proxy_pool and listener.pool_hops > 0:
pool_hop = await proxy_pool.get() for _ in range(listener.pool_hops):
if pool_hop: hop = await proxy_pool.get()
effective_chain.append(pool_hop) if hop:
logger.debug("[%s] +proxy %s", tag, pool_hop) pool_hops.append(hop)
effective_chain.append(hop)
if pool_hops:
logger.debug("[%s] +pool %s", tag, " ".join(str(h) for h in pool_hops))
try: try:
t0 = time.monotonic() t0 = time.monotonic()
remote_reader, remote_writer = await build_chain( remote_reader, remote_writer = await build_chain(
effective_chain, target_host, target_port, effective_chain, target_host, target_port,
timeout=config.timeout, first_hop_pool=first_hop_pool, timeout=timeout, first_hop_pool=first_hop_pool,
) )
dt = time.monotonic() - t0 dt = time.monotonic() - t0
logger.debug("[%s] chain up in %.0fms", tag, dt * 1000) logger.debug("[%s] chain up in %.0fms", tag, dt * 1000)
break break
except (ProtoError, TimeoutError, ConnectionError, OSError) as e: except (ProtoError, TimeoutError, ConnectionError, OSError) as e:
last_err = e last_err = e
if pool_hop and proxy_pool: if pool_hops and proxy_pool:
proxy_pool.report_failure(pool_hop) for hop in pool_hops:
proxy_pool.report_failure(hop)
if metrics: if metrics:
metrics.retries += 1 metrics.retries += 1
if attempt + 1 < attempts: if attempt + 1 < attempts:
@@ -224,19 +230,37 @@ async def serve(config: Config) -> None:
loop.add_signal_handler(sig, lambda s=sig: stop.set_result(s)) loop.add_signal_handler(sig, lambda s=sig: stop.set_result(s))
metrics = Metrics() metrics = Metrics()
listeners = config.listeners
# -- shared proxy pool ---------------------------------------------------
proxy_pool: ProxyPool | None = None proxy_pool: ProxyPool | None = None
if config.proxy_pool and config.proxy_pool.sources: if config.proxy_pool and config.proxy_pool.sources:
proxy_pool = ProxyPool(config.proxy_pool, config.chain, config.timeout) # use first listener's chain as base chain for pool health tests
base_chain = listeners[0].chain if listeners else config.chain
proxy_pool = ProxyPool(config.proxy_pool, base_chain, config.timeout)
await proxy_pool.start() await proxy_pool.start()
hop_pool: FirstHopPool | None = None # -- per-unique first-hop connection pools --------------------------------
if config.pool_size > 0 and config.chain: hop_pools: dict[tuple[str, int], FirstHopPool] = {}
hop_pool = FirstHopPool( if config.pool_size > 0:
config.chain[0], size=config.pool_size, max_idle=config.pool_max_idle, for lc in listeners:
) if not lc.chain:
await hop_pool.start() continue
first = lc.chain[0]
key = (first.host, first.port)
if key not in hop_pools:
hp = FirstHopPool(
first, size=config.pool_size, max_idle=config.pool_max_idle,
)
await hp.start()
hop_pools[key] = hp
def _hop_pool_for(lc: ListenerConfig) -> FirstHopPool | None:
if not lc.chain:
return None
return hop_pools.get((lc.chain[0].host, lc.chain[0].port))
# -- tor controller ------------------------------------------------------
tor: TorController | None = None tor: TorController | None = None
if config.tor: if config.tor:
tc = config.tor tc = config.tor
@@ -255,32 +279,44 @@ async def serve(config: Config) -> None:
sem = asyncio.Semaphore(config.max_connections) sem = asyncio.Semaphore(config.max_connections)
async def on_client(r: asyncio.StreamReader, w: asyncio.StreamWriter) -> None: # -- start one server per listener ---------------------------------------
async with sem: servers: list[asyncio.Server] = []
await _handle_client(r, w, config, proxy_pool, metrics, hop_pool) for lc in listeners:
hp = _hop_pool_for(lc)
srv = await asyncio.start_server(on_client, config.listen_host, config.listen_port) async def on_client(
addrs = ", ".join(str(s.getsockname()) for s in srv.sockets) r: asyncio.StreamReader, w: asyncio.StreamWriter,
logger.info("listening on %s max_connections=%d", addrs, config.max_connections) _lc: ListenerConfig = lc, _hp: FirstHopPool | None = hp,
) -> None:
async with sem:
await _handle_client(
r, w, _lc, config.timeout, config.retries,
proxy_pool, metrics, _hp,
)
if config.chain: srv = await asyncio.start_server(on_client, lc.listen_host, lc.listen_port)
for i, hop in enumerate(config.chain): servers.append(srv)
logger.info(" chain[%d] %s", i, hop)
else: addr = f"{lc.listen_host}:{lc.listen_port}"
logger.info(" mode: direct (no chain)") chain_desc = " -> ".join(str(h) for h in lc.chain) if lc.chain else "direct"
nhops = lc.pool_hops
pool_desc = f" + {nhops} pool hop{'s' if nhops != 1 else ''}" if nhops else ""
logger.info("listener %s chain: %s%s", addr, chain_desc, pool_desc)
logger.info("max_connections=%d", config.max_connections)
if proxy_pool: if proxy_pool:
nsrc = len(config.proxy_pool.sources) nsrc = len(config.proxy_pool.sources)
logger.info( logger.info(
" pool: %d proxies, %d alive (from %d source%s)", "pool: %d proxies, %d alive (from %d source%s)",
proxy_pool.count, proxy_pool.alive_count, nsrc, "s" if nsrc != 1 else "", proxy_pool.count, proxy_pool.alive_count, nsrc, "s" if nsrc != 1 else "",
) )
logger.info(" retries: %d", config.retries) logger.info("retries: %d", config.retries)
if tor: if tor:
extra = f", newnym every {tor.newnym_interval:.0f}s" if tor.newnym_interval else "" extra = f", newnym every {tor.newnym_interval:.0f}s" if tor.newnym_interval else ""
logger.info( logger.info(
" tor: control %s:%d%s", config.tor.control_host, config.tor.control_port, extra, "tor: control %s:%d%s", config.tor.control_host, config.tor.control_port, extra,
) )
# -- control API --------------------------------------------------------- # -- control API ---------------------------------------------------------
@@ -290,7 +326,7 @@ async def serve(config: Config) -> None:
"config": config, "config": config,
"metrics": metrics, "metrics": metrics,
"pool": proxy_pool, "pool": proxy_pool,
"hop_pool": hop_pool, "hop_pools": hop_pools,
"tor": tor, "tor": tor,
} }
@@ -317,7 +353,7 @@ async def serve(config: Config) -> None:
logging.getLogger("s5p").setLevel(level) logging.getLogger("s5p").setLevel(level)
if proxy_pool and new.proxy_pool: if proxy_pool and new.proxy_pool:
await proxy_pool.reload(new.proxy_pool) await proxy_pool.reload(new.proxy_pool)
logger.info("reload: config reloaded") logger.info("reload: config reloaded (listeners require restart)")
def _on_sighup() -> None: def _on_sighup() -> None:
asyncio.create_task(_reload()) asyncio.create_task(_reload())
@@ -332,16 +368,24 @@ async def serve(config: Config) -> None:
pool_ref = proxy_pool pool_ref = proxy_pool
metrics_task = asyncio.create_task(_metrics_logger(metrics, metrics_stop, pool_ref)) metrics_task = asyncio.create_task(_metrics_logger(metrics, metrics_stop, pool_ref))
async with srv: # keep all servers open until stop signal
try:
for srv in servers:
await srv.start_serving()
sig = await stop sig = await stop
finally:
logger.info("received %s, shutting down", signal.Signals(sig).name) logger.info("received %s, shutting down", signal.Signals(sig).name)
for srv in servers:
srv.close()
for srv in servers:
await srv.wait_closed()
if api_srv: if api_srv:
api_srv.close() api_srv.close()
await api_srv.wait_closed() await api_srv.wait_closed()
if tor: if tor:
await tor.stop() await tor.stop()
if hop_pool: for hp in hop_pools.values():
await hop_pool.stop() await hp.stop()
if proxy_pool: if proxy_pool:
await proxy_pool.stop() await proxy_pool.stop()
shutdown_line = metrics.summary() shutdown_line = metrics.summary()

View File

@@ -15,7 +15,7 @@ from s5p.api import (
_parse_request, _parse_request,
_route, _route,
) )
from s5p.config import ChainHop, Config, PoolSourceConfig, ProxyPoolConfig from s5p.config import ChainHop, Config, ListenerConfig, PoolSourceConfig, ProxyPoolConfig
from s5p.metrics import Metrics from s5p.metrics import Metrics
# -- request parsing --------------------------------------------------------- # -- request parsing ---------------------------------------------------------
@@ -118,11 +118,26 @@ class TestHandleStatus:
_, body = _handle_status(ctx) _, body = _handle_status(ctx)
assert body["pool"] == {"alive": 5, "total": 10} assert body["pool"] == {"alive": 5, "total": 10}
def test_with_chain(self): def test_with_listeners(self):
config = Config(chain=[ChainHop("socks5", "127.0.0.1", 9050)]) config = Config(
listeners=[
ListenerConfig(
listen_host="0.0.0.0", listen_port=1080,
chain=[ChainHop("socks5", "127.0.0.1", 9050)],
),
ListenerConfig(
listen_host="0.0.0.0", listen_port=1081,
chain=[ChainHop("socks5", "127.0.0.1", 9050)],
pool_hops=1,
),
],
)
ctx = _make_ctx(config=config) ctx = _make_ctx(config=config)
_, body = _handle_status(ctx) _, body = _handle_status(ctx)
assert body["chain"] == ["socks5://127.0.0.1:9050"] assert len(body["listeners"]) == 2
assert body["listeners"][0]["chain"] == ["socks5://127.0.0.1:9050"]
assert body["listeners"][0]["pool_hops"] == 0
assert body["listeners"][1]["pool_hops"] == 1
class TestHandleMetrics: class TestHandleMetrics:
@@ -195,13 +210,18 @@ class TestHandleConfig:
"""Test GET /config handler.""" """Test GET /config handler."""
def test_basic(self): def test_basic(self):
config = Config(timeout=15.0, retries=5, log_level="debug") config = Config(
timeout=15.0, retries=5, log_level="debug",
listeners=[ListenerConfig(listen_host="0.0.0.0", listen_port=1080)],
)
ctx = _make_ctx(config=config) ctx = _make_ctx(config=config)
status, body = _handle_config(ctx) status, body = _handle_config(ctx)
assert status == 200 assert status == 200
assert body["timeout"] == 15.0 assert body["timeout"] == 15.0
assert body["retries"] == 5 assert body["retries"] == 5
assert body["log_level"] == "debug" assert body["log_level"] == "debug"
assert len(body["listeners"]) == 1
assert body["listeners"][0]["listen"] == "0.0.0.0:1080"
def test_with_proxy_pool(self): def test_with_proxy_pool(self):
pp = ProxyPoolConfig( pp = ProxyPoolConfig(
@@ -210,11 +230,18 @@ class TestHandleConfig:
test_interval=60.0, test_interval=60.0,
max_fails=5, max_fails=5,
) )
config = Config(proxy_pool=pp) config = Config(
proxy_pool=pp,
listeners=[ListenerConfig(
chain=[ChainHop("socks5", "127.0.0.1", 9050)],
pool_hops=1,
)],
)
ctx = _make_ctx(config=config) ctx = _make_ctx(config=config)
_, body = _handle_config(ctx) _, body = _handle_config(ctx)
assert body["proxy_pool"]["refresh"] == 600.0 assert body["proxy_pool"]["refresh"] == 600.0
assert body["proxy_pool"]["sources"][0]["url"] == "http://api:8081/proxies" assert body["proxy_pool"]["sources"][0]["url"] == "http://api:8081/proxies"
assert body["listeners"][0]["pool_hops"] == 1
# -- routing ----------------------------------------------------------------- # -- routing -----------------------------------------------------------------

View File

@@ -2,7 +2,14 @@
import pytest import pytest
from s5p.config import ChainHop, Config, load_config, parse_api_proxies, parse_proxy_url from s5p.config import (
ChainHop,
Config,
ListenerConfig,
load_config,
parse_api_proxies,
parse_proxy_url,
)
class TestParseProxyUrl: class TestParseProxyUrl:
@@ -212,3 +219,143 @@ class TestConfig:
assert c.proxy_pool.test_targets == [ assert c.proxy_pool.test_targets == [
"www.google.com", "www.cloudflare.com", "www.amazon.com", "www.google.com", "www.cloudflare.com", "www.amazon.com",
] ]
class TestListenerConfig:
"""Test multi-listener config parsing."""
def test_defaults(self):
lc = ListenerConfig()
assert lc.listen_host == "127.0.0.1"
assert lc.listen_port == 1080
assert lc.chain == []
assert lc.pool_hops == 0
def test_listeners_from_yaml(self, tmp_path):
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"listeners:\n"
" - listen: 0.0.0.0:1080\n"
" chain:\n"
" - socks5://127.0.0.1:9050\n"
" - listen: 0.0.0.0:1081\n"
" chain:\n"
" - socks5://127.0.0.1:9050\n"
" - pool\n"
" - listen: 0.0.0.0:1082\n"
" chain:\n"
" - socks5://127.0.0.1:9050\n"
" - pool\n"
" - pool\n"
)
c = load_config(cfg_file)
assert len(c.listeners) == 3
# listener 0: no pool hops
assert c.listeners[0].listen_host == "0.0.0.0"
assert c.listeners[0].listen_port == 1080
assert len(c.listeners[0].chain) == 1
assert c.listeners[0].pool_hops == 0
# listener 1: 1 pool hop
assert c.listeners[1].listen_port == 1081
assert len(c.listeners[1].chain) == 1
assert c.listeners[1].pool_hops == 1
# listener 2: 2 pool hops
assert c.listeners[2].listen_port == 1082
assert len(c.listeners[2].chain) == 1
assert c.listeners[2].pool_hops == 2
def test_pool_keyword_stripped_from_chain(self, tmp_path):
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"listeners:\n"
" - listen: 1080\n"
" chain:\n"
" - socks5://tor:9050\n"
" - pool\n"
" - pool\n"
)
c = load_config(cfg_file)
lc = c.listeners[0]
# only the real hop remains in chain
assert len(lc.chain) == 1
assert lc.chain[0].host == "tor"
assert lc.pool_hops == 2
def test_pool_keyword_case_insensitive(self, tmp_path):
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"listeners:\n"
" - listen: 1080\n"
" chain:\n"
" - Pool\n"
)
c = load_config(cfg_file)
assert c.listeners[0].pool_hops == 1
assert c.listeners[0].chain == []
class TestListenerBackwardCompat:
"""Test backward-compatible single listener from old format."""
def test_old_format_creates_single_listener(self, tmp_path):
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"listen: 0.0.0.0:9999\n"
"chain:\n"
" - socks5://127.0.0.1:9050\n"
)
c = load_config(cfg_file)
assert len(c.listeners) == 1
lc = c.listeners[0]
assert lc.listen_host == "0.0.0.0"
assert lc.listen_port == 9999
assert len(lc.chain) == 1
assert lc.pool_hops == 0
def test_empty_config_creates_single_listener(self, tmp_path):
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text("")
c = load_config(cfg_file)
assert len(c.listeners) == 1
lc = c.listeners[0]
assert lc.listen_host == "127.0.0.1"
assert lc.listen_port == 1080
class TestListenerPoolCompat:
"""Test that proxy_pool + old format auto-sets pool_hops=1."""
def test_pool_auto_appends(self, tmp_path):
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"listen: 0.0.0.0:1080\n"
"chain:\n"
" - socks5://127.0.0.1:9050\n"
"proxy_pool:\n"
" sources:\n"
" - url: http://api:8081/proxies\n"
)
c = load_config(cfg_file)
assert len(c.listeners) == 1
lc = c.listeners[0]
assert lc.pool_hops == 1
def test_explicit_listeners_no_auto_append(self, tmp_path):
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"listeners:\n"
" - listen: 0.0.0.0:1080\n"
" chain:\n"
" - socks5://127.0.0.1:9050\n"
"proxy_pool:\n"
" sources:\n"
" - url: http://api:8081/proxies\n"
)
c = load_config(cfg_file)
assert len(c.listeners) == 1
lc = c.listeners[0]
# explicit listeners: no auto pool_hops
assert lc.pool_hops == 0