From 3593481b30b9669f8040f82f26e525e5e8c4e4da Mon Sep 17 00:00:00 2001 From: user Date: Sat, 21 Feb 2026 20:35:14 +0100 Subject: [PATCH] feat: listener retry override, pool protocol filter, conn pool docs - Per-listener `retries` overrides global default (0 = inherit) - Pool-level `allowed_protos` filters proxies during merge - Connection pooling documented in CHEATSHEET.md - Both features exposed in /config and /status API responses - 12 new tests (config parsing, API exposure, merge filtering) Co-Authored-By: Claude Opus 4.6 --- ROADMAP.md | 4 + TASKS.md | 6 ++ TODO.md | 8 +- config/example.yaml | 2 + docs/CHEATSHEET.md | 79 ++++++++++++---- docs/USAGE.md | 203 +++++++++++++++++++++++++--------------- src/s5p/api.py | 160 ++++++++++++++++++++++++++++---- src/s5p/config.py | 6 ++ src/s5p/pool.py | 2 + src/s5p/server.py | 3 +- tests/test_api.py | 214 +++++++++++++++++++++++++++++++++++++++++-- tests/test_config.py | 67 ++++++++++++++ tests/test_pool.py | 40 ++++++++ 13 files changed, 674 insertions(+), 120 deletions(-) diff --git a/ROADMAP.md b/ROADMAP.md index 1fe345b..5a3ea00 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -39,6 +39,10 @@ - [x] CLI test coverage - [x] Protocol test coverage (SOCKS5/4/HTTP handshakes) - [x] API documentation (full response schemas) +- [x] Prometheus metrics endpoint (`/metrics` OpenMetrics format) +- [x] Listener-level retry override +- [x] Pool-level proxy protocol filter (`allowed_protos`) +- [x] Connection pooling documentation - [ ] UDP ASSOCIATE support (SOCKS5 UDP relay) - [ ] BIND support - [ ] Chain randomization (random order, random subset) diff --git a/TASKS.md b/TASKS.md index 30ab8b8..4dc2ce5 100644 --- a/TASKS.md +++ b/TASKS.md @@ -73,6 +73,12 @@ - [x] CLI argument parsing tests (`tests/test_cli.py`) - [x] Protocol handshake tests (`tests/test_proto.py` -- SOCKS5/4/HTTP) - [x] API reference documentation (`docs/USAGE.md`) +- [x] Prometheus `/metrics` endpoint (OpenMetrics format) + +## Quick Wins +- [x] Listener-level retry override (`retries` per listener) +- [x] Pool-level proxy protocol filter (`allowed_protos` on proxy pool) +- [x] Document connection pooling (`pool_size`/`pool_max_idle` in CHEATSHEET.md) ## Next - [ ] UDP ASSOCIATE support diff --git a/TODO.md b/TODO.md index 4206489..1ee7d67 100644 --- a/TODO.md +++ b/TODO.md @@ -6,14 +6,14 @@ - Chain randomization modes (round-robin, sticky-per-destination) - Systemd socket activation - Per-pool health test chain override (different base chain per pool) -- Pool-level proxy protocol filter (only socks5 from pool X, only http from pool Y) -- Listener-level retry override (different retry count per listener) +- ~~Pool-level proxy protocol filter (only socks5 from pool X, only http from pool Y)~~ (done) +- ~~Listener-level retry override (different retry count per listener)~~ (done) ## Performance - Benchmark relay throughput vs direct connection - Tune buffer sizes for different workloads -- Connection pooling for frequently-used chains +- ~~Connection pooling for frequently-used chains~~ (done: `pool_size`/`pool_max_idle`) ## Security @@ -23,7 +23,7 @@ ## Observability -- Prometheus metrics endpoint (`/metrics` in OpenMetrics format) +- ~~Prometheus metrics endpoint (`/metrics` in OpenMetrics format)~~ (done) - Per-pool health test success rate tracking - Per-pool latency breakdown in `/status` diff --git a/config/example.yaml b/config/example.yaml index b79cb02..d28e1a9 100644 --- a/config/example.yaml +++ b/config/example.yaml @@ -28,6 +28,7 @@ chain: # sources: # - url: http://10.200.1.250:8081/proxies/all # mitm: false # filter: mitm=0 query param +# allowed_protos: [socks5] # only accept socks5 from sources # state_file: /data/pool-clean.json # refresh: 300 # test_interval: 120 @@ -115,6 +116,7 @@ chain: # # - listen: 0.0.0.0:1081 # pool: clean +# retries: 5 # override global retries for this listener # chain: # - socks5://127.0.0.1:9050 # - pool # bare: uses default "clean" diff --git a/docs/CHEATSHEET.md b/docs/CHEATSHEET.md index 0f53ed6..371bf5d 100644 --- a/docs/CHEATSHEET.md +++ b/docs/CHEATSHEET.md @@ -132,6 +132,37 @@ curl -x socks5h://alice:s3cret@127.0.0.1:1080 https://example.com No `auth:` key = no authentication required (default). +## Listener Retry Override (config) + +```yaml +listeners: + - listen: 0.0.0.0:1080 + retries: 5 # override global retries + chain: + - socks5://127.0.0.1:9050 + - pool + - listen: 0.0.0.0:1082 + chain: + - socks5://127.0.0.1:9050 # 0 = use global default +``` + +Per-listener `retries` overrides the global `retries` setting. Set to 0 (or +omit) to inherit the global value. + +## Pool Protocol Filter (config) + +```yaml +proxy_pools: + socks_only: + allowed_protos: [socks5] # reject http proxies + sources: + - url: http://api:8081/proxies/all +``` + +When set, proxies not matching `allowed_protos` are silently dropped during +merge. Useful when a source returns mixed protocols but the pool should +only serve a specific type. + ## Multi-Tor Round-Robin (config) ```yaml @@ -150,6 +181,23 @@ pool_size: 8 # pre-warmed TCP conns to first hop (0 = off) pool_max_idle: 30 # evict idle pooled conns (seconds) ``` +## Connection Pool (config) + +```yaml +pool_size: 8 # pre-warmed TCP connections per first hop (0 = off) +pool_max_idle: 30 # evict idle connections after N seconds +``` + +Pre-warms TCP connections to the first hop in the chain. Only the raw TCP +connection is pooled -- SOCKS/HTTP negotiation consumes it. One pool is +created per unique first hop (shared across listeners). Requires at least +one hop in `chain`. + +| Setting | Default | Notes | +|---------|---------|-------| +| `pool_size` | 0 (off) | Connections per first hop | +| `pool_max_idle` | 30 | Idle eviction in seconds | + ## Named Proxy Pools (config) ```yaml @@ -229,7 +277,7 @@ http://user:pass@host:port s5p --api 127.0.0.1:1081 -c config/s5p.yaml # enable API curl -s http://127.0.0.1:1081/status | jq . # runtime status -curl -s http://127.0.0.1:1081/metrics | jq . # full metrics +curl -s http://127.0.0.1:1081/metrics # prometheus metrics curl -s http://127.0.0.1:1081/pool | jq . # all proxies curl -s http://127.0.0.1:1081/pool/alive | jq . # alive only curl -s http://127.0.0.1:1081/config | jq . # current config @@ -267,27 +315,26 @@ python -m pstats ~/.cache/s5p/s5p.prof # container profile output metrics: conn=1842 ok=1790 fail=52 retries=67 active=3 in=50.0M out=1.0G rate=4.72/s p50=198.3ms p95=890.1ms up=1h01m01s pool=42/65 ``` -## Metrics JSON (`/metrics`) +## Prometheus Metrics (`/metrics`) ```bash -curl -s http://127.0.0.1:1081/metrics | jq . +curl -s http://127.0.0.1:1081/metrics ``` -```json -{ - "connections": 1842, - "success": 1790, - "rate": 4.72, - "latency": {"count": 1000, "min": 45.2, "max": 2841.7, "avg": 312.4, "p50": 198.3, "p95": 890.1, "p99": 1523.6}, - "listener_latency": { - "0.0.0.0:1080": {"count": 500, "p50": 1800.2, "p95": 8200.1, "...": "..."}, - "0.0.0.0:1081": {"count": 300, "p50": 1000.1, "p95": 3500.2, "...": "..."}, - "0.0.0.0:1082": {"count": 200, "p50": 400.1, "p95": 1200.5, "...": "..."} - } -} +``` +# TYPE s5p_connections counter +s5p_connections_total 1842 +# TYPE s5p_active_connections gauge +s5p_active_connections 3 +# TYPE s5p_pool_proxies_alive gauge +s5p_pool_proxies_alive{pool="clean"} 30 +# TYPE s5p_chain_latency_seconds summary +s5p_chain_latency_seconds{quantile="0.5"} 0.198300 +s5p_chain_latency_seconds{quantile="0.95"} 0.890100 +# EOF ``` -Per-listener latency also appears in `/status` under each listener entry. +OpenMetrics format. Use `/status` for JSON equivalent. ## Troubleshooting diff --git a/docs/USAGE.md b/docs/USAGE.md index 71c6fe7..53f404b 100644 --- a/docs/USAGE.md +++ b/docs/USAGE.md @@ -139,6 +139,26 @@ Each pool has independent health testing, state persistence, and source refresh cycles. The `mitm` source filter adds `?mitm=0` or `?mitm=1` to API requests. +### Pool protocol filter + +Use `allowed_protos` to restrict a pool to specific proxy protocols. +Proxies not matching the list are silently dropped during merge, regardless +of source type (API or file). + +```yaml +proxy_pools: + socks_only: + allowed_protos: [socks5] # reject http/socks4 proxies + sources: + - url: http://api:8081/proxies/all + any_proto: + sources: + - url: http://api:8081/proxies/all # no filter, accept all +``` + +Valid values: `socks5`, `socks4`, `http`. Visible in `/config` API response +when set. + ### Backward compatibility The singular `proxy_pool:` key still works -- it registers as pool `"default"`. @@ -523,8 +543,8 @@ api_listen: 127.0.0.1:1081 s5p --api 127.0.0.1:1081 -c config/s5p.yaml ``` -All responses are `application/json`. Errors return `{"error": "message"}` with -appropriate status code (400, 404, 405, 500). +Responses are `application/json` unless noted otherwise. Errors return +`{"error": "message"}` with appropriate status code (400, 404, 405, 500). Settings that require a restart: `listen`, `chain`, `pool_size`, `pool_max_idle`, `api_listen`. @@ -587,43 +607,90 @@ curl -s http://127.0.0.1:1081/status | jq . #### `GET /metrics` -Full metrics counters with rate, latency percentiles, and per-listener breakdown. +Prometheus/OpenMetrics exposition format. Content-Type: +`application/openmetrics-text; version=1.0.0; charset=utf-8`. ```bash -curl -s http://127.0.0.1:1081/metrics | jq . +curl -s http://127.0.0.1:1081/metrics ``` -```json -{ - "connections": 1842, - "success": 1790, - "failed": 52, - "retries": 67, - "auth_failures": 0, - "active": 3, - "bytes_in": 52428800, - "bytes_out": 1073741824, - "uptime": 3661.2, - "rate": 4.72, - "latency": { - "count": 1000, "min": 45.2, "max": 2841.7, - "avg": 312.4, "p50": 198.3, "p95": 890.1, "p99": 1523.6 - }, - "listener_latency": { - "0.0.0.0:1080": {"count": 500, "min": 800.1, "max": 12400.3, "avg": 2100.5, "p50": 1800.2, "p95": 8200.1, "p99": 10500.3}, - "0.0.0.0:1081": {"count": 300, "min": 400.5, "max": 5200.1, "avg": 1200.3, "p50": 1000.1, "p95": 3500.2, "p99": 4800.7} - } -} +``` +# HELP s5p_connections Total connection attempts. +# TYPE s5p_connections counter +s5p_connections_total 1842 +# HELP s5p_connections_success Connections successfully relayed. +# TYPE s5p_connections_success counter +s5p_connections_success_total 1790 +# HELP s5p_connections_failed Connection failures. +# TYPE s5p_connections_failed counter +s5p_connections_failed_total 52 +# HELP s5p_retries Connection retry attempts. +# TYPE s5p_retries counter +s5p_retries_total 67 +# HELP s5p_auth_failures SOCKS5 authentication failures. +# TYPE s5p_auth_failures counter +s5p_auth_failures_total 0 +# HELP s5p_bytes_in Bytes received from clients. +# TYPE s5p_bytes_in counter +s5p_bytes_in_total 52428800 +# HELP s5p_bytes_out Bytes sent to clients. +# TYPE s5p_bytes_out counter +s5p_bytes_out_total 1073741824 +# HELP s5p_active_connections Currently open connections. +# TYPE s5p_active_connections gauge +s5p_active_connections 3 +# HELP s5p_uptime_seconds Seconds since server start. +# TYPE s5p_uptime_seconds gauge +s5p_uptime_seconds 3661.2 +# HELP s5p_connection_rate Connections per second (rolling window). +# TYPE s5p_connection_rate gauge +s5p_connection_rate 4.72 +# HELP s5p_pool_proxies_alive Alive proxies in pool. +# TYPE s5p_pool_proxies_alive gauge +s5p_pool_proxies_alive{pool="clean"} 30 +s5p_pool_proxies_alive{pool="mitm"} 12 +# HELP s5p_pool_proxies_total Total proxies in pool. +# TYPE s5p_pool_proxies_total gauge +s5p_pool_proxies_total{pool="clean"} 45 +s5p_pool_proxies_total{pool="mitm"} 20 +# HELP s5p_chain_latency_seconds Chain build latency in seconds. +# TYPE s5p_chain_latency_seconds summary +s5p_chain_latency_seconds{quantile="0.5"} 0.198300 +s5p_chain_latency_seconds{quantile="0.95"} 0.890100 +s5p_chain_latency_seconds{quantile="0.99"} 1.523600 +s5p_chain_latency_seconds_count 1000 +s5p_chain_latency_seconds_sum 312.400000 +# EOF ``` -| Field | Type | Description | -|-------|------|-------------| -| `retries` | int | Total retry attempts | -| `auth_failures` | int | SOCKS5 auth failures | -| `latency` | object/null | Aggregate latency stats (ms), null if no samples | -| `latency.count` | int | Number of samples in buffer (max 1000) | -| `latency.p50/p95/p99` | float | Percentile latency (ms) | -| `listener_latency` | object | Per-listener latency, keyed by `host:port` | +**Metrics exposed:** + +| Metric | Type | Labels | Description | +|--------|------|--------|-------------| +| `s5p_connections` | counter | -- | Total connection attempts | +| `s5p_connections_success` | counter | -- | Successfully relayed | +| `s5p_connections_failed` | counter | -- | Connection failures | +| `s5p_retries` | counter | -- | Retry attempts | +| `s5p_auth_failures` | counter | -- | SOCKS5 auth failures | +| `s5p_bytes_in` | counter | -- | Bytes received from clients | +| `s5p_bytes_out` | counter | -- | Bytes sent to clients | +| `s5p_active_connections` | gauge | -- | Currently open connections | +| `s5p_uptime_seconds` | gauge | -- | Seconds since server start | +| `s5p_connection_rate` | gauge | -- | Connections/sec (rolling window) | +| `s5p_pool_proxies_alive` | gauge | `pool` | Alive proxies per pool | +| `s5p_pool_proxies_total` | gauge | `pool` | Total proxies per pool | +| `s5p_chain_latency_seconds` | summary | `quantile` | Chain build latency (p50/p95/p99) | +| `s5p_listener_chain_latency_seconds` | summary | `listener`, `quantile` | Per-listener chain latency | + +**Prometheus scrape config:** + +```yaml +scrape_configs: + - job_name: s5p + metrics_path: /metrics + static_configs: + - targets: ["127.0.0.1:1081"] +``` #### `GET /pool` @@ -892,6 +959,31 @@ retries: 5 # try up to 5 different proxies per connection s5p -r 5 -C socks5://127.0.0.1:9050 -S http://api:8081/proxies ``` +### Per-listener retry override + +Each listener can override the global `retries` setting. Set `retries` on +a listener to use a different retry count for that port. A value of 0 (or +omitting the key) inherits the global setting. + +```yaml +retries: 3 # global default + +listeners: + - listen: 0.0.0.0:1080 + retries: 5 # deep chain: more retries + chain: + - socks5://127.0.0.1:9050 + - pool + - pool + - listen: 0.0.0.0:1082 + chain: + - socks5://127.0.0.1:9050 # Tor only: uses global retries=3 +``` + +The effective retry count for a listener is `listener.retries` if set, +otherwise `config.retries`. Visible in `/config` and `/status` API responses +when overridden. + ## Hot Reload Send `SIGHUP` to reload the config file without restarting: @@ -941,48 +1033,11 @@ metrics: conn=1842 ok=1790 fail=52 retries=67 active=3 in=50.0M out=1.0G rate=4. | `up` | Server uptime | | `pool` | Alive/total proxies (only when pool is active) | -### `/metrics` JSON response +### `/metrics` OpenMetrics endpoint -`GET /metrics` returns all counters plus rate, latency percentiles, and -per-listener latency breakdowns: - -```json -{ - "connections": 1842, - "success": 1790, - "failed": 52, - "retries": 67, - "active": 3, - "bytes_in": 52428800, - "bytes_out": 1073741824, - "uptime": 3661.2, - "rate": 4.72, - "latency": { - "count": 1000, - "min": 45.2, - "max": 2841.7, - "avg": 312.4, - "p50": 198.3, - "p95": 890.1, - "p99": 1523.6 - }, - "listener_latency": { - "0.0.0.0:1080": {"count": 500, "min": 800.1, "max": 12400.3, "avg": 2100.5, "p50": 1800.2, "p95": 8200.1, "p99": 10500.3}, - "0.0.0.0:1081": {"count": 300, "min": 400.5, "max": 5200.1, "avg": 1200.3, "p50": 1000.1, "p95": 3500.2, "p99": 4800.7}, - "0.0.0.0:1082": {"count": 200, "min": 150.2, "max": 2000.1, "avg": 500.3, "p50": 400.1, "p95": 1200.5, "p99": 1800.2} - } -} -``` - -| Field | Type | Description | -|-------|------|-------------| -| `rate` | float | Connections/sec (rolling window of last 256 events) | -| `latency` | object/null | Aggregate chain setup latency in ms (null if no samples) | -| `latency.count` | int | Number of samples in buffer (max 1000) | -| `latency.p50` | float | Median latency (ms) | -| `latency.p95` | float | 95th percentile (ms) | -| `latency.p99` | float | 99th percentile (ms) | -| `listener_latency` | object | Per-listener latency, keyed by `host:port` | +`GET /metrics` returns all counters, gauges, pool stats, and latency summaries +in OpenMetrics format (see [API Reference](#get-metrics) above). Use `/status` +for the JSON equivalent with aggregate data. ### Per-listener latency diff --git a/src/s5p/api.py b/src/s5p/api.py index 74f9585..a94baf5 100644 --- a/src/s5p/api.py +++ b/src/s5p/api.py @@ -30,23 +30,33 @@ def _parse_request(data: bytes) -> tuple[str, str]: return parts[0].upper(), parts[1].split("?", 1)[0] +def _http_response( + writer: asyncio.StreamWriter, + status: int, + payload: bytes, + content_type: str = "application/json", +) -> None: + """Write an HTTP response and close.""" + phrases = {200: "OK", 400: "Bad Request", 404: "Not Found", + 405: "Method Not Allowed", 500: "Internal Server Error"} + header = ( + f"HTTP/1.1 {status} {phrases.get(status, 'Error')}\r\n" + f"Content-Type: {content_type}\r\n" + f"Content-Length: {len(payload)}\r\n" + f"Connection: close\r\n" + f"\r\n" + ) + writer.write(header.encode() + payload) + + def _json_response( writer: asyncio.StreamWriter, status: int, body: dict | list, ) -> None: """Write an HTTP response with JSON body and close.""" - phrases = {200: "OK", 400: "Bad Request", 404: "Not Found", - 405: "Method Not Allowed", 500: "Internal Server Error"} payload = json.dumps(body, separators=(",", ":")).encode() - header = ( - f"HTTP/1.1 {status} {phrases.get(status, 'Error')}\r\n" - f"Content-Type: application/json\r\n" - f"Content-Length: {len(payload)}\r\n" - f"Connection: close\r\n" - f"\r\n" - ) - writer.write(header.encode() + payload) + _http_response(writer, status, payload, "application/json") # -- helpers ----------------------------------------------------------------- @@ -103,6 +113,7 @@ def _handle_status(ctx: dict) -> tuple[int, dict]: **({"pool": lc.pool_name} if lc.pool_name else {}), **(_pool_seq_entry(lc) if _multi_pool(lc) else {}), **({"auth": True} if lc.auth else {}), + **({"retries": lc.retries} if lc.retries else {}), "latency": metrics.get_listener_latency( f"{lc.listen_host}:{lc.listen_port}" ).stats(), @@ -112,9 +123,111 @@ def _handle_status(ctx: dict) -> tuple[int, dict]: return 200, data -def _handle_metrics(ctx: dict) -> tuple[int, dict]: - """GET /metrics -- full metrics counters.""" - return 200, ctx["metrics"].to_dict() +def _render_openmetrics(ctx: dict) -> str: + """Render all metrics in OpenMetrics text format.""" + m: Metrics = ctx["metrics"] + lines: list[str] = [] + + def _counter(name: str, help_text: str, value: int) -> None: + lines.append(f"# HELP {name} {help_text}") + lines.append(f"# TYPE {name} counter") + lines.append(f"{name}_total {value}") + + def _gauge(name: str, help_text: str, value: float) -> None: + lines.append(f"# HELP {name} {help_text}") + lines.append(f"# TYPE {name} gauge") + lines.append(f"{name} {value}") + + def _summary(name: str, help_text: str, q: dict, + labels: str = "") -> None: + lines.append(f"# HELP {name} {help_text}") + lines.append(f"# TYPE {name} summary") + lb = f"{{{labels}," if labels else "{" + for quantile in ("0.5", "0.95", "0.99"): + lines.append(f'{name}{lb}quantile="{quantile}"}} {q[quantile]:.6f}') + lw = f"{{{labels}}}" if labels else "" + lines.append(f"{name}_count{lw} {q['count']}") + lines.append(f"{name}_sum{lw} {q['sum']:.6f}") + + # -- counters + _counter("s5p_connections", "Total connection attempts.", m.connections) + _counter("s5p_connections_success", + "Connections successfully relayed.", m.success) + _counter("s5p_connections_failed", "Connection failures.", m.failed) + _counter("s5p_retries", "Connection retry attempts.", m.retries) + _counter("s5p_auth_failures", + "SOCKS5 authentication failures.", m.auth_failures) + _counter("s5p_bytes_in", + "Bytes received from clients.", m.bytes_in) + _counter("s5p_bytes_out", + "Bytes sent to clients.", m.bytes_out) + + # -- gauges + _gauge("s5p_active_connections", + "Currently open connections.", m.active) + _gauge("s5p_uptime_seconds", + "Seconds since server start.", + round(time.monotonic() - m.started, 1)) + _gauge("s5p_connection_rate", + "Connections per second (rolling window).", + round(m.conn_rate.rate(), 4)) + + # -- pool gauges + pools: dict = ctx.get("pools") or {} + if pools: + lines.append("# HELP s5p_pool_proxies_alive Alive proxies in pool.") + lines.append("# TYPE s5p_pool_proxies_alive gauge") + for name, p in pools.items(): + lines.append(f's5p_pool_proxies_alive{{pool="{name}"}} {p.alive_count}') + lines.append("# HELP s5p_pool_proxies_total Total proxies in pool.") + lines.append("# TYPE s5p_pool_proxies_total gauge") + for name, p in pools.items(): + lines.append(f's5p_pool_proxies_total{{pool="{name}"}} {p.count}') + elif ctx.get("pool"): + p = ctx["pool"] + _gauge("s5p_pool_proxies_alive", "Alive proxies in pool.", p.alive_count) + _gauge("s5p_pool_proxies_total", "Total proxies in pool.", p.count) + + # -- latency summary (global) + q = m.latency.quantiles() + if q: + _summary("s5p_chain_latency_seconds", + "Chain build latency in seconds.", q) + + # -- per-listener latency summaries + if m.listener_latency: + lines.append( + "# HELP s5p_listener_chain_latency_seconds " + "Per-listener chain build latency in seconds.") + lines.append("# TYPE s5p_listener_chain_latency_seconds summary") + for key, tracker in m.listener_latency.items(): + lq = tracker.quantiles() + if not lq: + continue + for quantile in ("0.5", "0.95", "0.99"): + lines.append( + f's5p_listener_chain_latency_seconds' + f'{{listener="{key}",quantile="{quantile}"}} ' + f'{lq[quantile]:.6f}') + lines.append( + f's5p_listener_chain_latency_seconds_count' + f'{{listener="{key}"}} {lq["count"]}') + lines.append( + f's5p_listener_chain_latency_seconds_sum' + f'{{listener="{key}"}} {lq["sum"]:.6f}') + + lines.append("# EOF") + return "\n".join(lines) + "\n" + + +_OPENMETRICS_CT = ( + "application/openmetrics-text; version=1.0.0; charset=utf-8" +) + + +def _handle_metrics(ctx: dict) -> tuple[int, str]: + """GET /metrics -- OpenMetrics exposition.""" + return 200, _render_openmetrics(ctx) def _handle_pool(ctx: dict, alive_only: bool = False) -> tuple[int, dict]: @@ -183,6 +296,7 @@ def _handle_config(ctx: dict) -> tuple[int, dict]: **({"pool": lc.pool_name} if lc.pool_name else {}), **(_pool_seq_entry(lc) if _multi_pool(lc) else {}), **({"auth_users": len(lc.auth)} if lc.auth else {}), + **({"retries": lc.retries} if lc.retries else {}), } for lc in config.listeners ], @@ -202,12 +316,15 @@ def _handle_config(ctx: dict) -> tuple[int, dict]: if src.mitm is not None: s["mitm"] = src.mitm sources.append(s) - pools_data[name] = { + pool_entry: dict = { "sources": sources, "refresh": pp.refresh, "test_interval": pp.test_interval, "max_fails": pp.max_fails, } + if pp.allowed_protos: + pool_entry["allowed_protos"] = pp.allowed_protos + pools_data[name] = pool_entry data["proxy_pools"] = pools_data elif config.proxy_pool: pp = config.proxy_pool @@ -310,8 +427,13 @@ _POST_ROUTES: dict[str, str] = { } -async def _route(method: str, path: str, ctx: dict) -> tuple[int, dict]: - """Dispatch request to the appropriate handler.""" +async def _route( + method: str, path: str, ctx: dict, +) -> tuple[int, dict | str]: + """Dispatch request to the appropriate handler. + + Returns (status, body) where body is a dict (JSON) or str (text). + """ if method == "GET" and path in _GET_ROUTES: name = _GET_ROUTES[path] if name == "status": @@ -366,7 +488,11 @@ async def _handle_connection( return status, body = await _route(method, path, ctx) - _json_response(writer, status, body) + if isinstance(body, str): + _http_response(writer, status, body.encode(), + _OPENMETRICS_CT) + else: + _json_response(writer, status, body) await writer.drain() except (TimeoutError, ConnectionError, OSError): pass diff --git a/src/s5p/config.py b/src/s5p/config.py index c0128f0..cda7016 100644 --- a/src/s5p/config.py +++ b/src/s5p/config.py @@ -57,6 +57,7 @@ class ProxyPoolConfig: max_fails: int = 3 state_file: str = "" report_url: str = "" + allowed_protos: list[str] = field(default_factory=list) def __post_init__(self) -> None: """Backward compat: extract hostname from legacy test_url.""" @@ -89,6 +90,7 @@ class ListenerConfig: pool_name: str = "" bypass: list[str] = field(default_factory=list) auth: dict[str, str] = field(default_factory=dict) + retries: int = 0 # 0 = use global default @property def pool_hops(self) -> int: @@ -192,6 +194,8 @@ def _parse_pool_config(pool_raw: dict) -> ProxyPoolConfig: } if "test_targets" in pool_raw: kwargs["test_targets"] = list(pool_raw["test_targets"]) + if "allowed_protos" in pool_raw: + kwargs["allowed_protos"] = list(pool_raw["allowed_protos"]) return ProxyPoolConfig(**kwargs) @@ -330,6 +334,8 @@ def load_config(path: str | Path) -> Config: auth_raw = entry["auth"] if isinstance(auth_raw, dict): lc.auth = {str(k): str(v) for k, v in auth_raw.items()} + if "retries" in entry: + lc.retries = int(entry["retries"]) if "pool" in entry: lc.pool_name = entry["pool"] default_pool = lc.pool_name or "default" diff --git a/src/s5p/pool.py b/src/s5p/pool.py index 5ceb7ca..0852638 100644 --- a/src/s5p/pool.py +++ b/src/s5p/pool.py @@ -241,6 +241,8 @@ class ProxyPool: now = time.time() seen: set[str] = set() for hop in proxies: + if self._cfg.allowed_protos and hop.proto not in self._cfg.allowed_protos: + continue key = f"{hop.proto}://{hop.host}:{hop.port}" seen.add(key) if key in self._proxies: diff --git a/src/s5p/server.py b/src/s5p/server.py index 914281b..09f069a 100644 --- a/src/s5p/server.py +++ b/src/s5p/server.py @@ -458,7 +458,8 @@ async def serve(config: Config) -> None: ) -> None: async with sem: await _handle_client( - r, w, _lc, config.timeout, config.retries, + r, w, _lc, config.timeout, + _lc.retries or config.retries, _pools, metrics, _hp, tor_rr, hop_pools, ) diff --git a/tests/test_api.py b/tests/test_api.py index 54872dd..8dde753 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -13,6 +13,7 @@ from s5p.api import ( _handle_tor_newnym, _json_response, _parse_request, + _render_openmetrics, _route, ) from s5p.config import ChainHop, Config, ListenerConfig, PoolSourceConfig, ProxyPoolConfig @@ -222,6 +223,87 @@ class TestHandleConfigAuth: assert "s3cret" not in str(body) +class TestHandleStatusRetries: + """Test retries in /status listener entries.""" + + def test_retries_present_when_set(self): + config = Config( + listeners=[ + ListenerConfig( + listen_host="0.0.0.0", listen_port=1080, + retries=5, + ), + ], + ) + ctx = _make_ctx(config=config) + _, body = _handle_status(ctx) + assert body["listeners"][0]["retries"] == 5 + + def test_retries_absent_when_zero(self): + config = Config( + listeners=[ + ListenerConfig(listen_host="0.0.0.0", listen_port=1080), + ], + ) + ctx = _make_ctx(config=config) + _, body = _handle_status(ctx) + assert "retries" not in body["listeners"][0] + + +class TestHandleConfigRetries: + """Test retries in /config listener entries.""" + + def test_retries_present_when_set(self): + config = Config( + listeners=[ + ListenerConfig( + listen_host="0.0.0.0", listen_port=1080, + retries=7, + ), + ], + ) + ctx = _make_ctx(config=config) + _, body = _handle_config(ctx) + assert body["listeners"][0]["retries"] == 7 + + def test_retries_absent_when_zero(self): + config = Config( + listeners=[ + ListenerConfig(listen_host="0.0.0.0", listen_port=1080), + ], + ) + ctx = _make_ctx(config=config) + _, body = _handle_config(ctx) + assert "retries" not in body["listeners"][0] + + +class TestHandleConfigAllowedProtos: + """Test allowed_protos in /config pool entries.""" + + def test_allowed_protos_present(self): + pp = ProxyPoolConfig( + sources=[], + allowed_protos=["socks5"], + ) + config = Config( + proxy_pools={"socks_only": pp}, + listeners=[ListenerConfig()], + ) + ctx = _make_ctx(config=config) + _, body = _handle_config(ctx) + assert body["proxy_pools"]["socks_only"]["allowed_protos"] == ["socks5"] + + def test_allowed_protos_absent_when_empty(self): + pp = ProxyPoolConfig(sources=[]) + config = Config( + proxy_pools={"default": pp}, + listeners=[ListenerConfig()], + ) + ctx = _make_ctx(config=config) + _, body = _handle_config(ctx) + assert "allowed_protos" not in body["proxy_pools"]["default"] + + class TestHandleStatusPools: """Test GET /status with multiple named pools.""" @@ -292,20 +374,136 @@ class TestHandleStatusMultiPool: class TestHandleMetrics: - """Test GET /metrics handler.""" + """Test GET /metrics handler (OpenMetrics format).""" - def test_returns_dict(self): + def test_returns_openmetrics_string(self): ctx = _make_ctx() ctx["metrics"].connections = 42 ctx["metrics"].bytes_in = 1024 status, body = _handle_metrics(ctx) assert status == 200 - assert body["connections"] == 42 - assert body["bytes_in"] == 1024 - assert "uptime" in body - assert "rate" in body - assert "latency" in body - assert "listener_latency" in body + assert isinstance(body, str) + assert body.rstrip().endswith("# EOF") + + def test_counter_values(self): + ctx = _make_ctx() + ctx["metrics"].connections = 42 + ctx["metrics"].bytes_in = 1024 + _, body = _handle_metrics(ctx) + assert "s5p_connections_total 42" in body + assert "s5p_bytes_in_total 1024" in body + + +class TestRenderOpenMetrics: + """Test OpenMetrics text rendering.""" + + def test_eof_terminator(self): + ctx = _make_ctx() + text = _render_openmetrics(ctx) + assert text.rstrip().endswith("# EOF") + assert text.endswith("\n") + + def test_type_declarations(self): + ctx = _make_ctx() + text = _render_openmetrics(ctx) + assert "# TYPE s5p_connections counter" in text + assert "# TYPE s5p_active_connections gauge" in text + assert "# TYPE s5p_uptime_seconds gauge" in text + + def test_help_lines(self): + ctx = _make_ctx() + text = _render_openmetrics(ctx) + assert "# HELP s5p_connections Total connection attempts." in text + assert "# HELP s5p_active_connections Currently open connections." in text + + def test_counter_values(self): + ctx = _make_ctx() + ctx["metrics"].connections = 100 + ctx["metrics"].success = 95 + ctx["metrics"].failed = 5 + ctx["metrics"].retries = 10 + ctx["metrics"].auth_failures = 2 + ctx["metrics"].bytes_in = 4096 + ctx["metrics"].bytes_out = 8192 + text = _render_openmetrics(ctx) + assert "s5p_connections_total 100" in text + assert "s5p_connections_success_total 95" in text + assert "s5p_connections_failed_total 5" in text + assert "s5p_retries_total 10" in text + assert "s5p_auth_failures_total 2" in text + assert "s5p_bytes_in_total 4096" in text + assert "s5p_bytes_out_total 8192" in text + + def test_gauge_values(self): + ctx = _make_ctx() + ctx["metrics"].active = 7 + text = _render_openmetrics(ctx) + assert "s5p_active_connections 7" in text + assert "s5p_uptime_seconds " in text + assert "s5p_connection_rate " in text + + def test_no_latency_when_empty(self): + ctx = _make_ctx() + text = _render_openmetrics(ctx) + assert "s5p_chain_latency_seconds" not in text + + def test_latency_summary(self): + ctx = _make_ctx() + for i in range(1, 101): + ctx["metrics"].latency.record(i / 1000) + text = _render_openmetrics(ctx) + assert "# TYPE s5p_chain_latency_seconds summary" in text + assert 's5p_chain_latency_seconds{quantile="0.5"}' in text + assert 's5p_chain_latency_seconds{quantile="0.95"}' in text + assert 's5p_chain_latency_seconds{quantile="0.99"}' in text + assert "s5p_chain_latency_seconds_count 100" in text + assert "s5p_chain_latency_seconds_sum " in text + + def test_listener_latency_summary(self): + ctx = _make_ctx() + tracker = ctx["metrics"].get_listener_latency("0.0.0.0:1080") + for i in range(1, 51): + tracker.record(i / 1000) + text = _render_openmetrics(ctx) + assert "# TYPE s5p_listener_chain_latency_seconds summary" in text + assert ( + 's5p_listener_chain_latency_seconds{listener="0.0.0.0:1080",' + 'quantile="0.5"}' + ) in text + assert ( + 's5p_listener_chain_latency_seconds_count{listener="0.0.0.0:1080"} 50' + ) in text + + def test_pool_gauges_multi(self): + pool_a = MagicMock() + pool_a.alive_count = 5 + pool_a.count = 10 + pool_a.name = "clean" + pool_b = MagicMock() + pool_b.alive_count = 3 + pool_b.count = 8 + pool_b.name = "mitm" + ctx = _make_ctx(pools={"clean": pool_a, "mitm": pool_b}) + text = _render_openmetrics(ctx) + assert '# TYPE s5p_pool_proxies_alive gauge' in text + assert 's5p_pool_proxies_alive{pool="clean"} 5' in text + assert 's5p_pool_proxies_alive{pool="mitm"} 3' in text + assert 's5p_pool_proxies_total{pool="clean"} 10' in text + assert 's5p_pool_proxies_total{pool="mitm"} 8' in text + + def test_pool_gauges_single(self): + pool = MagicMock() + pool.alive_count = 12 + pool.count = 20 + ctx = _make_ctx(pool=pool) + text = _render_openmetrics(ctx) + assert "s5p_pool_proxies_alive 12" in text + assert "s5p_pool_proxies_total 20" in text + + def test_no_pool_metrics_when_unconfigured(self): + ctx = _make_ctx() + text = _render_openmetrics(ctx) + assert "s5p_pool_proxies" not in text class TestHandlePool: diff --git a/tests/test_config.py b/tests/test_config.py index 4d034b9..1d11a2a 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -307,6 +307,39 @@ class TestProxyPools: assert c.listeners[1].pool_hops == 0 +class TestAllowedProtos: + """Test pool-level allowed_protos config.""" + + def test_allowed_protos_from_yaml(self, tmp_path): + cfg_file = tmp_path / "test.yaml" + cfg_file.write_text( + "proxy_pools:\n" + " socks_only:\n" + " sources: []\n" + " allowed_protos: [socks5]\n" + " any:\n" + " sources: []\n" + ) + c = load_config(cfg_file) + assert c.proxy_pools["socks_only"].allowed_protos == ["socks5"] + assert c.proxy_pools["any"].allowed_protos == [] + + def test_allowed_protos_multiple(self, tmp_path): + cfg_file = tmp_path / "test.yaml" + cfg_file.write_text( + "proxy_pool:\n" + " sources: []\n" + " allowed_protos: [socks5, http]\n" + ) + c = load_config(cfg_file) + assert c.proxy_pool.allowed_protos == ["socks5", "http"] + + def test_allowed_protos_default_empty(self): + from s5p.config import ProxyPoolConfig + cfg = ProxyPoolConfig() + assert cfg.allowed_protos == [] + + class TestTorNodes: """Test tor_nodes config parsing.""" @@ -593,6 +626,40 @@ class TestListenerPoolCompat: assert lc.pool_hops == 0 +class TestListenerRetries: + """Test per-listener retry override config.""" + + def test_retries_default(self): + lc = ListenerConfig() + assert lc.retries == 0 + + def test_retries_from_yaml(self, tmp_path): + cfg_file = tmp_path / "test.yaml" + cfg_file.write_text( + "listeners:\n" + " - listen: 1080\n" + " retries: 5\n" + " chain:\n" + " - socks5://127.0.0.1:9050\n" + " - pool\n" + " - listen: 1081\n" + " chain:\n" + " - socks5://127.0.0.1:9050\n" + ) + c = load_config(cfg_file) + assert c.listeners[0].retries == 5 + assert c.listeners[1].retries == 0 + + def test_retries_absent_from_yaml(self, tmp_path): + cfg_file = tmp_path / "test.yaml" + cfg_file.write_text( + "listeners:\n" + " - listen: 1080\n" + ) + c = load_config(cfg_file) + assert c.listeners[0].retries == 0 + + class TestAuthConfig: """Test auth field in listener config.""" diff --git a/tests/test_pool.py b/tests/test_pool.py index f4c9f05..004c03a 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -171,6 +171,46 @@ class TestProxyPoolMerge: assert pool.count == 1 +class TestProxyPoolAllowedProtos: + """Test pool-level proxy protocol filter.""" + + def test_allowed_protos_filters_merge(self): + cfg = ProxyPoolConfig(sources=[], allowed_protos=["socks5"]) + pool = ProxyPool(cfg, [], timeout=10.0) + proxies = [ + ChainHop(proto="socks5", host="1.2.3.4", port=1080), + ChainHop(proto="http", host="5.6.7.8", port=8080), + ChainHop(proto="socks4", host="9.9.9.9", port=1080), + ] + pool._merge(proxies) + assert pool.count == 1 + assert "socks5://1.2.3.4:1080" in pool._proxies + + def test_allowed_protos_multiple(self): + cfg = ProxyPoolConfig(sources=[], allowed_protos=["socks5", "http"]) + pool = ProxyPool(cfg, [], timeout=10.0) + proxies = [ + ChainHop(proto="socks5", host="1.2.3.4", port=1080), + ChainHop(proto="http", host="5.6.7.8", port=8080), + ChainHop(proto="socks4", host="9.9.9.9", port=1080), + ] + pool._merge(proxies) + assert pool.count == 2 + assert "socks5://1.2.3.4:1080" in pool._proxies + assert "http://5.6.7.8:8080" in pool._proxies + assert "socks4://9.9.9.9:1080" not in pool._proxies + + def test_empty_allowed_protos_accepts_all(self): + cfg = ProxyPoolConfig(sources=[], allowed_protos=[]) + pool = ProxyPool(cfg, [], timeout=10.0) + proxies = [ + ChainHop(proto="socks5", host="1.2.3.4", port=1080), + ChainHop(proto="http", host="5.6.7.8", port=8080), + ] + pool._merge(proxies) + assert pool.count == 2 + + class TestProxyPoolGet: """Test proxy selection."""