From b07ea499656fae163e73f81c8964e5cb9a894c4b Mon Sep 17 00:00:00 2001 From: user Date: Sun, 15 Feb 2026 04:54:13 +0100 Subject: [PATCH] feat: add connection retry and metrics Retry failed proxy connections with a fresh random proxy on each attempt (configurable via retries setting, proxy_source only). Track connection metrics and log summary every 60s and on shutdown. Co-Authored-By: Claude Opus 4.6 --- README.md | 6 ++- TASKS.md | 2 + config/example.yaml | 1 + docs/CHEATSHEET.md | 1 + docs/USAGE.md | 35 +++++++++++++++ src/s5p/cli.py | 7 +++ src/s5p/config.py | 4 ++ src/s5p/metrics.py | 49 +++++++++++++++++++++ src/s5p/server.py | 101 +++++++++++++++++++++++++++++++++++--------- 9 files changed, 185 insertions(+), 21 deletions(-) create mode 100644 src/s5p/metrics.py diff --git a/README.md b/README.md index c68aac4..458b8e3 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,8 @@ through configurable chains of SOCKS4, SOCKS5, and HTTP CONNECT proxies. - DNS leak prevention (domain names forwarded to proxies, never resolved locally) - Tor integration (Tor is just another SOCKS5 hop) - Dynamic proxy source: fetch proxies from an HTTP API, rotate per-connection +- Connection retry with proxy rotation (configurable attempts) +- Connection metrics (logged periodically and on shutdown) - Container-ready (Alpine-based, podman/docker) - Graceful shutdown (SIGTERM/SIGINT) - Pure Python, asyncio-based, minimal dependencies @@ -57,6 +59,7 @@ cp config/example.yaml config/s5p.yaml ```yaml listen: 127.0.0.1:1080 timeout: 10 +retries: 3 # max attempts (proxy_source only) chain: - socks5://127.0.0.1:9050 # Tor @@ -73,7 +76,7 @@ proxy_source: ## CLI Reference ``` -s5p [-c FILE] [-l [HOST:]PORT] [-C URL[,URL,...]] [-S URL] [-t SEC] [-v|-q] +s5p [-c FILE] [-l [HOST:]PORT] [-C URL[,URL,...]] [-S URL] [-t SEC] [-r N] [-v|-q] Options: -c, --config FILE YAML config file @@ -81,6 +84,7 @@ Options: -C, --chain URL[,URL] Comma-separated proxy chain -S, --proxy-source URL Proxy source API URL -t, --timeout SEC Per-hop timeout (default: 10) + -r, --retries N Max attempts per connection (default: 3, proxy_source only) -v, --verbose Debug logging -q, --quiet Errors only --cprofile [FILE] Enable cProfile, dump to FILE (default: s5p.prof) diff --git a/TASKS.md b/TASKS.md index 2dbfa5a..a38356c 100644 --- a/TASKS.md +++ b/TASKS.md @@ -16,6 +16,8 @@ - [x] Config split (example.yaml tracked, s5p.yaml gitignored) - [x] Dynamic proxy source API integration +- [x] Connection retry with proxy rotation +- [x] Connection metrics (periodic + shutdown logging) ## Next - [ ] Integration tests with mock proxy server diff --git a/config/example.yaml b/config/example.yaml index 03924ca..6a32646 100644 --- a/config/example.yaml +++ b/config/example.yaml @@ -3,6 +3,7 @@ listen: 127.0.0.1:1080 timeout: 10 +retries: 3 # max attempts per connection (proxy_source only) log_level: info # Proxy chain -- connections tunnel through each hop in order. diff --git a/docs/CHEATSHEET.md b/docs/CHEATSHEET.md index fe076e8..0a2cd37 100644 --- a/docs/CHEATSHEET.md +++ b/docs/CHEATSHEET.md @@ -12,6 +12,7 @@ s5p -t 30 # 30s per-hop timeout s5p -v # debug logging s5p -q # errors only s5p -S http://api:8081/proxies # proxy source API +s5p -r 5 # retry up to 5 proxies s5p --cprofile # profile to s5p.prof s5p --cprofile out.prof # profile to custom file ``` diff --git a/docs/USAGE.md b/docs/USAGE.md index a21f0e1..99fb394 100644 --- a/docs/USAGE.md +++ b/docs/USAGE.md @@ -41,6 +41,7 @@ cp config/example.yaml config/s5p.yaml ```yaml listen: 127.0.0.1:1080 timeout: 10 +retries: 3 # max attempts per connection (proxy_source only) log_level: info chain: @@ -101,6 +102,40 @@ s5p -C socks5://127.0.0.1:9050 -S http://10.200.1.250:8081/proxies The API must return JSON: `{"proxies": [{"proto": "socks5", "proxy": "host:port"}, ...]}`. Entries with `null` proto are skipped. +## Connection Retry + +When `proxy_source` is active, s5p retries failed connections with a different +random proxy. Controlled by the `retries` setting (default: 3). Static-only +chains do not retry (retrying the same chain is pointless). + +```yaml +retries: 5 # try up to 5 different proxies per connection +``` + +```bash +s5p -r 5 -C socks5://127.0.0.1:9050 -S http://api:8081/proxies +``` + +## Metrics + +s5p tracks connection metrics and logs a summary every 60 seconds and on +shutdown: + +``` +metrics: conn=142 ok=98 fail=44 retries=88 active=3 in=1.2M out=4.5M up=0h05m12s +``` + +| Counter | Meaning | +|---------|---------| +| `conn` | Total incoming connections | +| `ok` | Successfully connected + relayed | +| `fail` | All retries exhausted | +| `retries` | Total retry attempts | +| `active` | Currently relaying | +| `in` | Bytes client -> remote | +| `out` | Bytes remote -> client | +| `up` | Server uptime | + ## Profiling ```bash diff --git a/src/s5p/cli.py b/src/s5p/cli.py index a89ed64..662b2fe 100644 --- a/src/s5p/cli.py +++ b/src/s5p/cli.py @@ -42,6 +42,10 @@ def _parse_args(argv: list[str] | None = None) -> argparse.Namespace: ) p.add_argument("-v", "--verbose", action="store_true", help="debug logging") p.add_argument("-q", "--quiet", action="store_true", help="errors only") + p.add_argument( + "-r", "--retries", type=int, metavar="N", + help="max connection attempts per request (default: 3, proxy_source only)", + ) p.add_argument( "-S", "--proxy-source", metavar="URL", help="proxy source API URL", @@ -73,6 +77,9 @@ def main(argv: list[str] | None = None) -> int: if args.timeout is not None: config.timeout = args.timeout + if args.retries is not None: + config.retries = args.retries + if args.proxy_source: config.proxy_source = ProxySourceConfig(url=args.proxy_source) diff --git a/src/s5p/config.py b/src/s5p/config.py index 4a478b0..7708ae8 100644 --- a/src/s5p/config.py +++ b/src/s5p/config.py @@ -46,6 +46,7 @@ class Config: listen_port: int = 1080 chain: list[ChainHop] = field(default_factory=list) timeout: float = 10.0 + retries: int = 3 log_level: str = "info" proxy_source: ProxySourceConfig | None = None @@ -92,6 +93,9 @@ def load_config(path: str | Path) -> Config: if "timeout" in raw: config.timeout = float(raw["timeout"]) + if "retries" in raw: + config.retries = int(raw["retries"]) + if "log_level" in raw: config.log_level = raw["log_level"] diff --git a/src/s5p/metrics.py b/src/s5p/metrics.py new file mode 100644 index 0000000..ab52845 --- /dev/null +++ b/src/s5p/metrics.py @@ -0,0 +1,49 @@ +"""Connection metrics and counters.""" + +from __future__ import annotations + +import threading +import time + + +class Metrics: + """Thread-safe connection metrics. + + Counters use a lock for consistency but tolerate minor races + on hot paths (bytes_in/bytes_out) for performance. + """ + + def __init__(self) -> None: + self._lock = threading.Lock() + self.connections: int = 0 + self.success: int = 0 + self.failed: int = 0 + self.retries: int = 0 + self.bytes_in: int = 0 + self.bytes_out: int = 0 + self.active: int = 0 + self.started: float = time.monotonic() + + def summary(self) -> str: + """One-line log-friendly summary.""" + with self._lock: + uptime = time.monotonic() - self.started + h, rem = divmod(int(uptime), 3600) + m, s = divmod(rem, 60) + return ( + f"conn={self.connections} ok={self.success} fail={self.failed} " + f"retries={self.retries} active={self.active} " + f"in={_human_bytes(self.bytes_in)} out={_human_bytes(self.bytes_out)} " + f"up={h}h{m:02d}m{s:02d}s" + ) + + +def _human_bytes(n: int) -> str: + """Format byte count in human-readable form.""" + for unit in ("B", "K", "M", "G", "T"): + if abs(n) < 1024: + if unit == "B": + return f"{n}{unit}" + return f"{n:.1f}{unit}" + n /= 1024 # type: ignore[assignment] + return f"{n:.1f}P" diff --git a/src/s5p/server.py b/src/s5p/server.py index 873cb87..f92fb14 100644 --- a/src/s5p/server.py +++ b/src/s5p/server.py @@ -9,6 +9,7 @@ import struct import time from .config import ChainHop, Config +from .metrics import Metrics from .source import ProxySource from .proto import ( ProtoError, @@ -30,13 +31,15 @@ BUFFER_SIZE = 65536 async def _relay( reader: asyncio.StreamReader, writer: asyncio.StreamWriter, -) -> None: - """Unidirectional data relay.""" +) -> int: + """Unidirectional data relay. Returns total bytes transferred.""" + total = 0 try: while True: data = await reader.read(BUFFER_SIZE) if not data: break + total += len(data) writer.write(data) await writer.drain() except (ConnectionError, asyncio.CancelledError, OSError): @@ -47,6 +50,7 @@ async def _relay( await writer.wait_closed() except (OSError, ConnectionError): pass + return total # -- chain building ---------------------------------------------------------- @@ -133,11 +137,15 @@ async def _handle_client( client_writer: asyncio.StreamWriter, config: Config, proxy_source: ProxySource | None = None, + metrics: Metrics | None = None, ) -> None: """Handle a single SOCKS5 client connection.""" peer = client_writer.get_extra_info("peername") tag = f"{peer[0]}:{peer[1]}" if peer else "?" + if metrics: + metrics.connections += 1 + try: # -- greeting -- header = await asyncio.wait_for(client_reader.readexactly(2), timeout=10.0) @@ -166,33 +174,60 @@ async def _handle_client( target_host, target_port = await read_socks5_address(client_reader) logger.info("[%s] connect %s:%d", tag, target_host, target_port) - # -- build chain -- - effective_chain = list(config.chain) - if proxy_source: - hop = await proxy_source.get() - if hop: - effective_chain.append(hop) - logger.debug("[%s] +proxy %s", tag, hop) + # -- build chain (with retry) -- + attempts = config.retries if proxy_source else 1 + last_err: Exception | None = None - t0 = time.monotonic() - remote_reader, remote_writer = await build_chain( - effective_chain, target_host, target_port, timeout=config.timeout - ) - dt = time.monotonic() - t0 - logger.debug("[%s] chain up in %.0fms", tag, dt * 1000) + for attempt in range(attempts): + effective_chain = list(config.chain) + if proxy_source: + hop = await proxy_source.get() + if hop: + effective_chain.append(hop) + logger.debug("[%s] +proxy %s", tag, hop) + + try: + t0 = time.monotonic() + remote_reader, remote_writer = await build_chain( + effective_chain, target_host, target_port, timeout=config.timeout + ) + dt = time.monotonic() - t0 + logger.debug("[%s] chain up in %.0fms", tag, dt * 1000) + break + except (ProtoError, asyncio.TimeoutError, ConnectionError, OSError) as e: + last_err = e + if metrics: + metrics.retries += 1 + if attempt + 1 < attempts: + logger.debug("[%s] attempt %d/%d failed: %s", tag, attempt + 1, attempts, e) + continue + raise last_err # -- success -- + if metrics: + metrics.success += 1 + metrics.active += 1 + client_writer.write(_socks5_reply(Socks5Reply.SUCCEEDED)) await client_writer.drain() # -- relay -- - await asyncio.gather( - _relay(client_reader, remote_writer), - _relay(remote_reader, client_writer), - ) + try: + bytes_up, bytes_down = await asyncio.gather( + _relay(client_reader, remote_writer), + _relay(remote_reader, client_writer), + ) + if metrics: + metrics.bytes_in += bytes_up + metrics.bytes_out += bytes_down + finally: + if metrics: + metrics.active -= 1 except ProtoError as e: logger.warning("[%s] %s", tag, e) + if metrics: + metrics.failed += 1 try: client_writer.write(_socks5_reply(e.reply)) await client_writer.drain() @@ -200,6 +235,8 @@ async def _handle_client( pass except asyncio.TimeoutError: logger.warning("[%s] timeout", tag) + if metrics: + metrics.failed += 1 try: client_writer.write(_socks5_reply(Socks5Reply.TTL_EXPIRED)) await client_writer.drain() @@ -207,6 +244,8 @@ async def _handle_client( pass except (ConnectionError, OSError) as e: logger.debug("[%s] %s", tag, e) + if metrics: + metrics.failed += 1 try: client_writer.write(_socks5_reply(Socks5Reply.CONNECTION_REFUSED)) await client_writer.drain() @@ -214,6 +253,8 @@ async def _handle_client( pass except Exception: logger.exception("[%s] unexpected error", tag) + if metrics: + metrics.failed += 1 finally: try: client_writer.close() @@ -225,15 +266,28 @@ async def _handle_client( # -- entry point ------------------------------------------------------------- +async def _metrics_logger(metrics: Metrics, stop: asyncio.Event) -> None: + """Log metrics summary every 60 seconds.""" + while not stop.is_set(): + try: + await asyncio.wait_for(stop.wait(), timeout=60.0) + except asyncio.TimeoutError: + pass + if not stop.is_set(): + logger.info("metrics: %s", metrics.summary()) + + async def serve(config: Config) -> None: """Start the SOCKS5 proxy server.""" + metrics = Metrics() + proxy_source: ProxySource | None = None if config.proxy_source and config.proxy_source.url: proxy_source = ProxySource(config.proxy_source) await proxy_source.start() async def on_client(r: asyncio.StreamReader, w: asyncio.StreamWriter) -> None: - await _handle_client(r, w, config, proxy_source) + await _handle_client(r, w, config, proxy_source, metrics) srv = await asyncio.start_server(on_client, config.listen_host, config.listen_port) addrs = ", ".join(str(s.getsockname()) for s in srv.sockets) @@ -247,6 +301,7 @@ async def serve(config: Config) -> None: if proxy_source: logger.info(" proxy source: %s (%d proxies)", config.proxy_source.url, proxy_source.count) + logger.info(" retries: %d", config.retries) loop = asyncio.get_running_loop() stop = loop.create_future() @@ -254,6 +309,12 @@ async def serve(config: Config) -> None: for sig in (signal.SIGTERM, signal.SIGINT): loop.add_signal_handler(sig, lambda s=sig: stop.set_result(s)) + metrics_stop = asyncio.Event() + metrics_task = asyncio.create_task(_metrics_logger(metrics, metrics_stop)) + async with srv: sig = await stop logger.info("received %s, shutting down", signal.Signals(sig).name) + logger.info("metrics: %s", metrics.summary()) + metrics_stop.set() + await metrics_task