feat: add connection retry and metrics

Retry failed proxy connections with a fresh random proxy on each
attempt (configurable via retries setting, proxy_source only).
Track connection metrics and log summary every 60s and on shutdown.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
user
2026-02-15 04:54:13 +01:00
parent 9b18a59df9
commit b07ea49965
9 changed files with 185 additions and 21 deletions

View File

@@ -12,6 +12,8 @@ through configurable chains of SOCKS4, SOCKS5, and HTTP CONNECT proxies.
- DNS leak prevention (domain names forwarded to proxies, never resolved locally) - DNS leak prevention (domain names forwarded to proxies, never resolved locally)
- Tor integration (Tor is just another SOCKS5 hop) - Tor integration (Tor is just another SOCKS5 hop)
- Dynamic proxy source: fetch proxies from an HTTP API, rotate per-connection - Dynamic proxy source: fetch proxies from an HTTP API, rotate per-connection
- Connection retry with proxy rotation (configurable attempts)
- Connection metrics (logged periodically and on shutdown)
- Container-ready (Alpine-based, podman/docker) - Container-ready (Alpine-based, podman/docker)
- Graceful shutdown (SIGTERM/SIGINT) - Graceful shutdown (SIGTERM/SIGINT)
- Pure Python, asyncio-based, minimal dependencies - Pure Python, asyncio-based, minimal dependencies
@@ -57,6 +59,7 @@ cp config/example.yaml config/s5p.yaml
```yaml ```yaml
listen: 127.0.0.1:1080 listen: 127.0.0.1:1080
timeout: 10 timeout: 10
retries: 3 # max attempts per connection (proxy_source only)
chain: chain:
- socks5://127.0.0.1:9050 # Tor - socks5://127.0.0.1:9050 # Tor
@@ -73,7 +76,7 @@ proxy_source:
## CLI Reference ## CLI Reference
``` ```
s5p [-c FILE] [-l [HOST:]PORT] [-C URL[,URL,...]] [-S URL] [-t SEC] [-v|-q] s5p [-c FILE] [-l [HOST:]PORT] [-C URL[,URL,...]] [-S URL] [-t SEC] [-r N] [-v|-q]
Options: Options:
-c, --config FILE YAML config file -c, --config FILE YAML config file
@@ -81,6 +84,7 @@ Options:
-C, --chain URL[,URL] Comma-separated proxy chain -C, --chain URL[,URL] Comma-separated proxy chain
-S, --proxy-source URL Proxy source API URL -S, --proxy-source URL Proxy source API URL
-t, --timeout SEC Per-hop timeout (default: 10) -t, --timeout SEC Per-hop timeout (default: 10)
-r, --retries N Max attempts per connection (default: 3, proxy_source only)
-v, --verbose Debug logging -v, --verbose Debug logging
-q, --quiet Errors only -q, --quiet Errors only
--cprofile [FILE] Enable cProfile, dump to FILE (default: s5p.prof) --cprofile [FILE] Enable cProfile, dump to FILE (default: s5p.prof)

View File

@@ -16,6 +16,8 @@
- [x] Config split (example.yaml tracked, s5p.yaml gitignored) - [x] Config split (example.yaml tracked, s5p.yaml gitignored)
- [x] Dynamic proxy source API integration - [x] Dynamic proxy source API integration
- [x] Connection retry with proxy rotation
- [x] Connection metrics (periodic + shutdown logging)
## Next ## Next
- [ ] Integration tests with mock proxy server - [ ] Integration tests with mock proxy server

View File

@@ -3,6 +3,7 @@
listen: 127.0.0.1:1080 listen: 127.0.0.1:1080
timeout: 10 timeout: 10
retries: 3 # max attempts per connection (proxy_source only)
log_level: info log_level: info
# Proxy chain -- connections tunnel through each hop in order. # Proxy chain -- connections tunnel through each hop in order.

View File

@@ -12,6 +12,7 @@ s5p -t 30 # 30s per-hop timeout
s5p -v # debug logging s5p -v # debug logging
s5p -q # errors only s5p -q # errors only
s5p -S http://api:8081/proxies # proxy source API s5p -S http://api:8081/proxies # proxy source API
s5p -r 5 # retry up to 5 proxies
s5p --cprofile # profile to s5p.prof s5p --cprofile # profile to s5p.prof
s5p --cprofile out.prof # profile to custom file s5p --cprofile out.prof # profile to custom file
``` ```

View File

@@ -41,6 +41,7 @@ cp config/example.yaml config/s5p.yaml
```yaml ```yaml
listen: 127.0.0.1:1080 listen: 127.0.0.1:1080
timeout: 10 timeout: 10
retries: 3 # max attempts per connection (proxy_source only)
log_level: info log_level: info
chain: chain:
@@ -101,6 +102,40 @@ s5p -C socks5://127.0.0.1:9050 -S http://10.200.1.250:8081/proxies
The API must return JSON: `{"proxies": [{"proto": "socks5", "proxy": "host:port"}, ...]}`. The API must return JSON: `{"proxies": [{"proto": "socks5", "proxy": "host:port"}, ...]}`.
Entries with `null` proto are skipped. Entries with `null` proto are skipped.
## Connection Retry
When `proxy_source` is active, s5p retries failed connections with a different
random proxy. Controlled by the `retries` setting (default: 3). Static-only
chains do not retry (retrying the same chain is pointless).
```yaml
retries: 5 # try up to 5 different proxies per connection
```
```bash
s5p -r 5 -C socks5://127.0.0.1:9050 -S http://api:8081/proxies
```
## Metrics
s5p tracks connection metrics and logs a summary every 60 seconds and on
shutdown:
```
metrics: conn=142 ok=98 fail=44 retries=88 active=3 in=1.2M out=4.5M up=0h05m12s
```
| Counter | Meaning |
|---------|---------|
| `conn` | Total incoming connections |
| `ok` | Successfully connected + relayed |
| `fail` | All retries exhausted |
| `retries` | Total retry attempts |
| `active` | Currently relaying |
| `in` | Bytes client -> remote |
| `out` | Bytes remote -> client |
| `up` | Server uptime |
## Profiling ## Profiling
```bash ```bash

View File

@@ -42,6 +42,10 @@ def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
) )
p.add_argument("-v", "--verbose", action="store_true", help="debug logging") p.add_argument("-v", "--verbose", action="store_true", help="debug logging")
p.add_argument("-q", "--quiet", action="store_true", help="errors only") p.add_argument("-q", "--quiet", action="store_true", help="errors only")
p.add_argument(
"-r", "--retries", type=int, metavar="N",
help="max connection attempts per request (default: 3, proxy_source only)",
)
p.add_argument( p.add_argument(
"-S", "--proxy-source", metavar="URL", "-S", "--proxy-source", metavar="URL",
help="proxy source API URL", help="proxy source API URL",
@@ -73,6 +77,9 @@ def main(argv: list[str] | None = None) -> int:
if args.timeout is not None: if args.timeout is not None:
config.timeout = args.timeout config.timeout = args.timeout
if args.retries is not None:
config.retries = args.retries
if args.proxy_source: if args.proxy_source:
config.proxy_source = ProxySourceConfig(url=args.proxy_source) config.proxy_source = ProxySourceConfig(url=args.proxy_source)

View File

@@ -46,6 +46,7 @@ class Config:
listen_port: int = 1080 listen_port: int = 1080
chain: list[ChainHop] = field(default_factory=list) chain: list[ChainHop] = field(default_factory=list)
timeout: float = 10.0 timeout: float = 10.0
retries: int = 3
log_level: str = "info" log_level: str = "info"
proxy_source: ProxySourceConfig | None = None proxy_source: ProxySourceConfig | None = None
@@ -92,6 +93,9 @@ def load_config(path: str | Path) -> Config:
if "timeout" in raw: if "timeout" in raw:
config.timeout = float(raw["timeout"]) config.timeout = float(raw["timeout"])
if "retries" in raw:
config.retries = int(raw["retries"])
if "log_level" in raw: if "log_level" in raw:
config.log_level = raw["log_level"] config.log_level = raw["log_level"]

49
src/s5p/metrics.py Normal file
View File

@@ -0,0 +1,49 @@
"""Connection metrics and counters."""
from __future__ import annotations
import threading
import time
class Metrics:
    """Connection counters for periodic and shutdown logging.

    The lock is held only while ``summary()`` takes its snapshot;
    callers increment the counters directly without locking (the
    server runs on a single asyncio event loop, so that appears safe
    there — NOTE(review): confirm no multi-threaded writers exist).
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()
        # Monotonic timestamp of server start, used for uptime.
        self.started: float = time.monotonic()
        self.connections: int = 0   # total incoming client connections
        self.success: int = 0       # connections that reached the relay stage
        self.failed: int = 0        # connections that exhausted all attempts
        self.retries: int = 0       # individual failed chain-build attempts
        self.bytes_in: int = 0      # bytes relayed client -> remote
        self.bytes_out: int = 0     # bytes relayed remote -> client
        self.active: int = 0        # connections currently relaying

    def summary(self) -> str:
        """Return a one-line, log-friendly snapshot of all counters."""
        with self._lock:
            elapsed = int(time.monotonic() - self.started)
            hours, remainder = divmod(elapsed, 3600)
            minutes, seconds = divmod(remainder, 60)
            return (
                f"conn={self.connections} ok={self.success} fail={self.failed} "
                f"retries={self.retries} active={self.active} "
                f"in={_human_bytes(self.bytes_in)} out={_human_bytes(self.bytes_out)} "
                f"up={hours}h{minutes:02d}m{seconds:02d}s"
            )
def _human_bytes(n: int) -> str:
"""Format byte count in human-readable form."""
for unit in ("B", "K", "M", "G", "T"):
if abs(n) < 1024:
if unit == "B":
return f"{n}{unit}"
return f"{n:.1f}{unit}"
n /= 1024 # type: ignore[assignment]
return f"{n:.1f}P"

View File

@@ -9,6 +9,7 @@ import struct
import time import time
from .config import ChainHop, Config from .config import ChainHop, Config
from .metrics import Metrics
from .source import ProxySource from .source import ProxySource
from .proto import ( from .proto import (
ProtoError, ProtoError,
@@ -30,13 +31,15 @@ BUFFER_SIZE = 65536
async def _relay( async def _relay(
reader: asyncio.StreamReader, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter, writer: asyncio.StreamWriter,
) -> None: ) -> int:
"""Unidirectional data relay.""" """Unidirectional data relay. Returns total bytes transferred."""
total = 0
try: try:
while True: while True:
data = await reader.read(BUFFER_SIZE) data = await reader.read(BUFFER_SIZE)
if not data: if not data:
break break
total += len(data)
writer.write(data) writer.write(data)
await writer.drain() await writer.drain()
except (ConnectionError, asyncio.CancelledError, OSError): except (ConnectionError, asyncio.CancelledError, OSError):
@@ -47,6 +50,7 @@ async def _relay(
await writer.wait_closed() await writer.wait_closed()
except (OSError, ConnectionError): except (OSError, ConnectionError):
pass pass
return total
# -- chain building ---------------------------------------------------------- # -- chain building ----------------------------------------------------------
@@ -133,11 +137,15 @@ async def _handle_client(
client_writer: asyncio.StreamWriter, client_writer: asyncio.StreamWriter,
config: Config, config: Config,
proxy_source: ProxySource | None = None, proxy_source: ProxySource | None = None,
metrics: Metrics | None = None,
) -> None: ) -> None:
"""Handle a single SOCKS5 client connection.""" """Handle a single SOCKS5 client connection."""
peer = client_writer.get_extra_info("peername") peer = client_writer.get_extra_info("peername")
tag = f"{peer[0]}:{peer[1]}" if peer else "?" tag = f"{peer[0]}:{peer[1]}" if peer else "?"
if metrics:
metrics.connections += 1
try: try:
# -- greeting -- # -- greeting --
header = await asyncio.wait_for(client_reader.readexactly(2), timeout=10.0) header = await asyncio.wait_for(client_reader.readexactly(2), timeout=10.0)
@@ -166,33 +174,60 @@ async def _handle_client(
target_host, target_port = await read_socks5_address(client_reader) target_host, target_port = await read_socks5_address(client_reader)
logger.info("[%s] connect %s:%d", tag, target_host, target_port) logger.info("[%s] connect %s:%d", tag, target_host, target_port)
# -- build chain -- # -- build chain (with retry) --
effective_chain = list(config.chain) attempts = config.retries if proxy_source else 1
if proxy_source: last_err: Exception | None = None
hop = await proxy_source.get()
if hop:
effective_chain.append(hop)
logger.debug("[%s] +proxy %s", tag, hop)
t0 = time.monotonic() for attempt in range(attempts):
remote_reader, remote_writer = await build_chain( effective_chain = list(config.chain)
effective_chain, target_host, target_port, timeout=config.timeout if proxy_source:
) hop = await proxy_source.get()
dt = time.monotonic() - t0 if hop:
logger.debug("[%s] chain up in %.0fms", tag, dt * 1000) effective_chain.append(hop)
logger.debug("[%s] +proxy %s", tag, hop)
try:
t0 = time.monotonic()
remote_reader, remote_writer = await build_chain(
effective_chain, target_host, target_port, timeout=config.timeout
)
dt = time.monotonic() - t0
logger.debug("[%s] chain up in %.0fms", tag, dt * 1000)
break
except (ProtoError, asyncio.TimeoutError, ConnectionError, OSError) as e:
last_err = e
if metrics:
metrics.retries += 1
if attempt + 1 < attempts:
logger.debug("[%s] attempt %d/%d failed: %s", tag, attempt + 1, attempts, e)
continue
raise last_err
# -- success -- # -- success --
if metrics:
metrics.success += 1
metrics.active += 1
client_writer.write(_socks5_reply(Socks5Reply.SUCCEEDED)) client_writer.write(_socks5_reply(Socks5Reply.SUCCEEDED))
await client_writer.drain() await client_writer.drain()
# -- relay -- # -- relay --
await asyncio.gather( try:
_relay(client_reader, remote_writer), bytes_up, bytes_down = await asyncio.gather(
_relay(remote_reader, client_writer), _relay(client_reader, remote_writer),
) _relay(remote_reader, client_writer),
)
if metrics:
metrics.bytes_in += bytes_up
metrics.bytes_out += bytes_down
finally:
if metrics:
metrics.active -= 1
except ProtoError as e: except ProtoError as e:
logger.warning("[%s] %s", tag, e) logger.warning("[%s] %s", tag, e)
if metrics:
metrics.failed += 1
try: try:
client_writer.write(_socks5_reply(e.reply)) client_writer.write(_socks5_reply(e.reply))
await client_writer.drain() await client_writer.drain()
@@ -200,6 +235,8 @@ async def _handle_client(
pass pass
except asyncio.TimeoutError: except asyncio.TimeoutError:
logger.warning("[%s] timeout", tag) logger.warning("[%s] timeout", tag)
if metrics:
metrics.failed += 1
try: try:
client_writer.write(_socks5_reply(Socks5Reply.TTL_EXPIRED)) client_writer.write(_socks5_reply(Socks5Reply.TTL_EXPIRED))
await client_writer.drain() await client_writer.drain()
@@ -207,6 +244,8 @@ async def _handle_client(
pass pass
except (ConnectionError, OSError) as e: except (ConnectionError, OSError) as e:
logger.debug("[%s] %s", tag, e) logger.debug("[%s] %s", tag, e)
if metrics:
metrics.failed += 1
try: try:
client_writer.write(_socks5_reply(Socks5Reply.CONNECTION_REFUSED)) client_writer.write(_socks5_reply(Socks5Reply.CONNECTION_REFUSED))
await client_writer.drain() await client_writer.drain()
@@ -214,6 +253,8 @@ async def _handle_client(
pass pass
except Exception: except Exception:
logger.exception("[%s] unexpected error", tag) logger.exception("[%s] unexpected error", tag)
if metrics:
metrics.failed += 1
finally: finally:
try: try:
client_writer.close() client_writer.close()
@@ -225,15 +266,28 @@ async def _handle_client(
# -- entry point ------------------------------------------------------------- # -- entry point -------------------------------------------------------------
async def _metrics_logger(metrics: Metrics, stop: asyncio.Event) -> None:
    """Emit a metrics summary line once a minute until *stop* is set."""
    while not stop.is_set():
        try:
            # Wake up either on shutdown or after a full interval.
            await asyncio.wait_for(stop.wait(), timeout=60.0)
        except asyncio.TimeoutError:
            pass
        if stop.is_set():
            # Shutdown logs its own final summary; skip a duplicate here.
            break
        logger.info("metrics: %s", metrics.summary())
async def serve(config: Config) -> None: async def serve(config: Config) -> None:
"""Start the SOCKS5 proxy server.""" """Start the SOCKS5 proxy server."""
metrics = Metrics()
proxy_source: ProxySource | None = None proxy_source: ProxySource | None = None
if config.proxy_source and config.proxy_source.url: if config.proxy_source and config.proxy_source.url:
proxy_source = ProxySource(config.proxy_source) proxy_source = ProxySource(config.proxy_source)
await proxy_source.start() await proxy_source.start()
async def on_client(r: asyncio.StreamReader, w: asyncio.StreamWriter) -> None: async def on_client(r: asyncio.StreamReader, w: asyncio.StreamWriter) -> None:
await _handle_client(r, w, config, proxy_source) await _handle_client(r, w, config, proxy_source, metrics)
srv = await asyncio.start_server(on_client, config.listen_host, config.listen_port) srv = await asyncio.start_server(on_client, config.listen_host, config.listen_port)
addrs = ", ".join(str(s.getsockname()) for s in srv.sockets) addrs = ", ".join(str(s.getsockname()) for s in srv.sockets)
@@ -247,6 +301,7 @@ async def serve(config: Config) -> None:
if proxy_source: if proxy_source:
logger.info(" proxy source: %s (%d proxies)", config.proxy_source.url, proxy_source.count) logger.info(" proxy source: %s (%d proxies)", config.proxy_source.url, proxy_source.count)
logger.info(" retries: %d", config.retries)
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
stop = loop.create_future() stop = loop.create_future()
@@ -254,6 +309,12 @@ async def serve(config: Config) -> None:
for sig in (signal.SIGTERM, signal.SIGINT): for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, lambda s=sig: stop.set_result(s)) loop.add_signal_handler(sig, lambda s=sig: stop.set_result(s))
metrics_stop = asyncio.Event()
metrics_task = asyncio.create_task(_metrics_logger(metrics, metrics_stop))
async with srv: async with srv:
sig = await stop sig = await stop
logger.info("received %s, shutting down", signal.Signals(sig).name) logger.info("received %s, shutting down", signal.Signals(sig).name)
logger.info("metrics: %s", metrics.summary())
metrics_stop.set()
await metrics_task