Compare commits

...

5 Commits

Author SHA1 Message Date
user
29b4a36863 feat: named proxy pools with per-listener assignment
Add proxy_pools: top-level config (dict of name -> pool config) so
listeners can draw from different proxy sources. Each pool has
independent sources, health testing, state persistence, and refresh
cycles.

- PoolSourceConfig gains mitm: bool|None for API ?mitm=0/1 filtering
- ListenerConfig gains pool_name for named pool assignment
- ProxyPool gains name param with prefixed log messages and
  per-name state file derivation (pool-{name}.json)
- server.py replaces single proxy_pool with proxy_pools dict,
  validates listener pool references at startup, per-listener closure
- API /pool merges all pools (with pool field on multi-pool entries),
  /status and /config expose per-pool summaries
- Backward compat: singular proxy_pool: registers as "default"

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 11:33:53 +01:00
user
288bd95f62 feat: multi-Tor round-robin via tor_nodes config
New top-level tor_nodes list distributes traffic across multiple Tor
SOCKS proxies. First hop is replaced at connection time by round-robin
selection; health tests also rotate across all nodes. FirstHopPools
are created for each node when pool_size > 0.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 10:12:58 +01:00
user
b3966c9a9f feat: dynamic health test concurrency
Auto-scale test concurrency to ~10% of proxy count, capped by
test_concurrency config ceiling (default raised from 5 to 25).
Prevents saturating upstream Tor when pool size varies.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 10:09:44 +01:00
user
d4e3638143 feat: per-listener latency tracking
Each listener now tracks chain setup latency independently via a
dict[str, LatencyTracker] on Metrics. The global aggregate stays for
summary output. /status embeds per-listener latency on each listener
entry; /metrics includes a listener_latency map keyed by host:port.
2026-02-18 08:14:09 +01:00
user
b8f7217e43 feat: connection rate and chain latency metrics
Add RateTracker (rolling deque, events/sec) and LatencyTracker (circular
buffer, p50/p95/p99 in ms) to the Metrics class.  Both are recorded in
_handle_client and exposed via summary(), to_dict(), /status, and /metrics.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 00:16:46 +01:00
14 changed files with 1385 additions and 163 deletions

View File

@@ -25,7 +25,7 @@
- [x] Built-in control API (runtime metrics, pool state, config reload)
- [ ] SOCKS5 server authentication (username/password)
- [x] Tor control port integration (circuit renewal via NEWNYM)
- [ ] Metrics (connections/sec, bytes relayed, hop latency)
- [x] Metrics (connections/sec, bytes relayed, hop latency)
## v0.3.0

View File

@@ -48,6 +48,7 @@
- [x] Replace HTTP health check with TLS handshake (round-robin targets, no httpbin dependency)
- [x] Multi-listener with configurable proxy chaining (per-port chain depth)
- [x] Connection rate and chain latency metrics (rate/s, p50/p95/p99)
## Next
- [ ] Integration tests with mock proxy server

View File

@@ -20,14 +20,37 @@ chain:
# - socks4://proxy:1080 # post-Tor SOCKS4/4a proxy
# - http://user:pass@proxy:8080 # post-Tor HTTP CONNECT proxy
# Managed proxy pool -- fetches from multiple sources, health-tests,
# and rotates alive proxies per-connection after the static chain.
# Named proxy pools -- each pool has its own sources, health tests,
# and state file. Listeners reference pools by name via the "pool:" key.
#
# proxy_pools:
# clean: # MITM-free proxies
# sources:
# - url: http://10.200.1.250:8081/proxies/all
# mitm: false # filter: mitm=0 query param
# state_file: /data/pool-clean.json
# refresh: 300
# test_interval: 120
# test_timeout: 8
# max_fails: 3
# mitm: # MITM-capable proxies
# sources:
# - url: http://10.200.1.250:8081/proxies/all
# mitm: true # filter: mitm=1 query param
# state_file: /data/pool-mitm.json
# refresh: 300
# test_interval: 120
# test_timeout: 8
# max_fails: 3
# Single proxy pool (legacy, still supported -- becomes pool "default"):
# proxy_pool:
# sources:
# - url: http://10.200.1.250:8081/proxies
# proto: socks5 # optional: filter by protocol
# country: US # optional: filter by country
# limit: 1000 # optional: max proxies to fetch
# mitm: false # optional: filter by MITM status (true/false)
# - file: /etc/s5p/proxies.txt # text file, one proxy URL per line
# refresh: 300 # re-fetch sources interval (seconds)
# test_interval: 120 # health test cycle interval (seconds)
@@ -36,7 +59,7 @@ chain:
# - www.cloudflare.com
# - www.amazon.com
# test_timeout: 15 # per-test timeout (seconds)
# test_concurrency: 5 # parallel health tests
# test_concurrency: 25 # max parallel tests (auto-scales to ~10% of pool)
# max_fails: 3 # consecutive fails before eviction
# state_file: "" # empty = ~/.cache/s5p/pool.json
# report_url: "" # POST dead proxies here (optional)
@@ -50,26 +73,45 @@ chain:
# cookie_file: "" # CookieAuthentication file path
# newnym_interval: 0 # periodic NEWNYM (seconds, 0 = manual only)
# Multi-listener mode -- each listener gets its own address and chain.
# The "pool" keyword in a chain appends a random alive proxy from the pool.
# Multi-Tor round-robin -- distribute traffic across multiple Tor nodes.
# When present, the first hop in each listener's chain is replaced at
# connection time by round-robin selection from this list.
# tor_nodes:
# - socks5://10.200.1.1:9050
# - socks5://10.200.1.254:9050
# - socks5://10.200.1.250:9050
# - socks5://10.200.1.13:9050
# Multi-listener mode -- each listener gets its own address, chain,
# and optional pool assignment. The "pool" keyword in a chain appends
# a random alive proxy from the named pool (or "default" if unnamed).
# Multiple "pool" entries = multiple pool hops (deeper chaining).
#
# listeners:
# - listen: 0.0.0.0:1080
# pool: clean # draw from "clean" pool
# chain:
# - socks5://127.0.0.1:9050
# - pool # Tor + 2 random pool proxies
# - pool # Tor + 2 clean pool proxies
# - pool
#
# - listen: 0.0.0.0:1081
# pool: clean
# chain:
# - socks5://127.0.0.1:9050
# - pool # Tor + 1 random pool proxy
# - pool # Tor + 1 clean pool proxy
#
# - listen: 0.0.0.0:1082
# chain:
# - socks5://127.0.0.1:9050 # Tor only (no pool hops)
#
# - listen: 0.0.0.0:1083
# pool: mitm # draw from "mitm" pool
# chain:
# - socks5://127.0.0.1:9050
# - pool # Tor + 2 MITM pool proxies
# - pool
#
# When using "listeners:", the top-level "listen" and "chain" keys are ignored.
# If "listeners:" is absent, the old format is used (single listener).

View File

@@ -43,17 +43,35 @@ cp config/example.yaml config/s5p.yaml # create live config (gitignored)
```yaml
listeners:
- listen: 0.0.0.0:1080
pool: clean # named pool assignment
chain:
- socks5://127.0.0.1:9050
- pool # Tor + 2 pool hops
- pool # Tor + 2 clean hops
- pool
- listen: 0.0.0.0:1081
pool: clean
chain:
- socks5://127.0.0.1:9050
- pool # Tor + 1 pool hop
- pool # Tor + 1 clean hop
- listen: 0.0.0.0:1082
chain:
- socks5://127.0.0.1:9050 # Tor only
- listen: 0.0.0.0:1083
pool: mitm # MITM-capable proxies
chain:
- socks5://127.0.0.1:9050
- pool
- pool
```
## Multi-Tor Round-Robin (config)
```yaml
tor_nodes: # overrides first hop in all listeners
- socks5://10.200.1.1:9050
- socks5://10.200.1.254:9050
- socks5://10.200.1.250:9050
- socks5://10.200.1.13:9050
```
## Performance Tuning (config)
@@ -64,25 +82,39 @@ pool_size: 8 # pre-warmed TCP conns to first hop (0 = off)
pool_max_idle: 30 # evict idle pooled conns (seconds)
```
## Proxy Pool (config)
## Named Proxy Pools (config)
```yaml
proxy_pool:
sources:
- url: http://10.200.1.250:8081/proxies
proto: socks5
limit: 1000
- file: /etc/s5p/proxies.txt
refresh: 300 # re-fetch interval
test_interval: 120 # health test cycle
test_targets: # TLS handshake targets (round-robin)
- www.google.com
- www.cloudflare.com
- www.amazon.com
max_fails: 3 # evict after N fails
report_url: "" # POST dead proxies (optional)
proxy_pools:
clean:
sources:
- url: http://10.200.1.250:8081/proxies/all
mitm: false # adds ?mitm=0
state_file: /data/pool-clean.json
refresh: 300
test_interval: 120
max_fails: 3
mitm:
sources:
- url: http://10.200.1.250:8081/proxies/all
mitm: true # adds ?mitm=1
state_file: /data/pool-mitm.json
refresh: 300
test_interval: 120
max_fails: 3
```
Singular `proxy_pool:` still works (becomes pool "default").
## Source Filters (proxy_pool sources)
| Filter | Values | Query param |
|--------|--------|-------------|
| `proto` | socks5/socks4/http | `?proto=...` |
| `country` | ISO alpha-2 | `?country=...` |
| `limit` | integer | `?limit=...` |
| `mitm` | true/false | `?mitm=1` / `?mitm=0` |
## Tor Control Port (config)
```yaml
@@ -164,9 +196,31 @@ python -m pstats ~/.cache/s5p/s5p.prof # container profile output
## Metrics Log
```
metrics: conn=142 ok=98 fail=44 retries=88 active=3 in=1.2M out=4.5M up=0h05m12s pool=42/65
metrics: conn=1842 ok=1790 fail=52 retries=67 active=3 in=50.0M out=1.0G rate=4.72/s p50=198.3ms p95=890.1ms up=1h01m01s pool=42/65
```
## Metrics JSON (`/metrics`)
```bash
curl -s http://127.0.0.1:1081/metrics | jq .
```
```json
{
"connections": 1842,
"success": 1790,
"rate": 4.72,
"latency": {"count": 1000, "min": 45.2, "max": 2841.7, "avg": 312.4, "p50": 198.3, "p95": 890.1, "p99": 1523.6},
"listener_latency": {
"0.0.0.0:1080": {"count": 500, "p50": 1800.2, "p95": 8200.1, "...": "..."},
"0.0.0.0:1081": {"count": 300, "p50": 1000.1, "p95": 3500.2, "...": "..."},
"0.0.0.0:1082": {"count": 200, "p50": 400.1, "p95": 1200.5, "...": "..."}
}
}
```
Per-listener latency also appears in `/status` under each listener entry.
## Troubleshooting
| Symptom | Check |

View File

@@ -47,77 +47,146 @@ pool_size: 0 # pre-warmed TCP connections to first hop (0 = disable
pool_max_idle: 30 # max idle time for pooled connections (seconds)
api_listen: "" # control API bind address (empty = disabled)
# Multi-listener (each port gets its own chain depth)
# Named proxy pools (each with its own sources and filters)
proxy_pools:
clean:
sources:
- url: http://10.200.1.250:8081/proxies/all
mitm: false
refresh: 300
test_interval: 120
test_timeout: 8
max_fails: 3
# Multi-listener (each port gets its own chain depth and pool)
listeners:
- listen: 0.0.0.0:1080
pool: clean
chain:
- socks5://127.0.0.1:9050
- pool # Tor + 2 pool proxies
- pool # Tor + 2 clean proxies
- pool
- listen: 0.0.0.0:1081
pool: clean
chain:
- socks5://127.0.0.1:9050
- pool # Tor + 1 pool proxy
- pool # Tor + 1 clean proxy
# Or single-listener (old format):
# listen: 127.0.0.1:1080
# chain:
# - socks5://127.0.0.1:9050
proxy_pool:
sources:
- url: http://10.200.1.250:8081/proxies
proto: socks5
limit: 1000
- file: /etc/s5p/proxies.txt
refresh: 300
test_interval: 120
test_targets: # TLS handshake targets (round-robin)
- www.google.com
- www.cloudflare.com
- www.amazon.com
test_timeout: 15
test_concurrency: 5
max_fails: 3
state_file: "" # empty = ~/.cache/s5p/pool.json
```
## Multi-Tor Round-Robin
Distribute traffic across multiple Tor nodes instead of funneling everything
through a single one. When `tor_nodes` is configured, the first hop in each
listener's chain is replaced at connection time by round-robin selection.
Health tests also rotate across all nodes.
```yaml
tor_nodes:
- socks5://10.200.1.1:9050
- socks5://10.200.1.254:9050
- socks5://10.200.1.250:9050
- socks5://10.200.1.13:9050
```
When `tor_nodes` is absent, listeners use their configured first hop as before.
When present, `tor_nodes` overrides the first hop everywhere.
If `pool_size > 0`, pre-warmed connection pools are created for all nodes
automatically.
### API
`tor_nodes` appears in both `/config` and `/status` responses:
```bash
curl -s http://127.0.0.1:1090/config | jq '.tor_nodes'
curl -s http://127.0.0.1:1090/status | jq '.tor_nodes'
```
## Named Proxy Pools
Define multiple proxy pools with different source filters. Each listener can
reference a specific pool by name via the `pool:` key.
```yaml
proxy_pools:
clean:
sources:
- url: http://10.200.1.250:8081/proxies/all
mitm: false
state_file: /data/pool-clean.json
refresh: 300
test_interval: 120
test_timeout: 8
max_fails: 3
mitm:
sources:
- url: http://10.200.1.250:8081/proxies/all
mitm: true
state_file: /data/pool-mitm.json
refresh: 300
test_interval: 120
test_timeout: 8
max_fails: 3
```
Each pool has independent health testing, state persistence, and source
refresh cycles. The `mitm` source filter adds `?mitm=0` or `?mitm=1` to
API requests.
### Backward compatibility
The singular `proxy_pool:` key still works -- it registers as pool `"default"`.
If both `proxy_pool:` and `proxy_pools:` are present, `proxy_pools:` wins;
the singular is registered as `"default"` only when not already defined.
## Multi-Listener Mode
Run multiple listeners on different ports, each with a different number
of proxy hops after the static chain. Config-file only (not available via CLI).
of proxy hops and pool assignment. Config-file only (not available via CLI).
```yaml
listeners:
- listen: 0.0.0.0:1080
pool: clean
chain:
- socks5://10.200.1.13:9050
- pool # Tor + 2 pool proxies
- pool # Tor + 2 clean proxies
- pool
- listen: 0.0.0.0:1081
pool: clean
chain:
- socks5://10.200.1.13:9050
- pool # Tor + 1 pool proxy
- pool # Tor + 1 clean proxy
- listen: 0.0.0.0:1082
chain:
- socks5://10.200.1.13:9050 # Tor only
- socks5://10.200.1.13:9050 # Tor only (no pool)
proxy_pool:
sources:
- url: http://10.200.1.250:8081/proxies/all?mitm=0
refresh: 300
test_interval: 120
max_fails: 3
- listen: 0.0.0.0:1083
pool: mitm
chain:
- socks5://10.200.1.13:9050
- pool # Tor + 2 MITM proxies
- pool
```
The `pool` keyword in a chain means "append a random alive proxy from the
shared pool". Multiple `pool` entries = multiple pool hops (deeper chaining).
assigned pool". Multiple `pool` entries = multiple pool hops (deeper chaining).
When `pool:` is omitted on a listener with pool hops, it defaults to
`"default"`. A listener referencing an unknown pool name causes a fatal
error at startup. Listeners without pool hops ignore the `pool:` key.
| Resource | Scope | Notes |
|----------|-------|-------|
| ProxyPool | shared | All listeners draw from one pool |
| ProxyPool | per name | Each named pool is independent |
| TorController | shared | One Tor instance |
| Metrics | shared | Aggregate stats across listeners |
| Semaphore | shared | Global `max_connections` cap |
@@ -172,6 +241,7 @@ proxy_pool:
proto: socks5 # optional: filter by protocol
country: US # optional: filter by country
limit: 1000 # max proxies to fetch from API
mitm: false # optional: filter by MITM status (true/false)
- file: /etc/s5p/proxies.txt # text file, one proxy URL per line
refresh: 300 # re-fetch sources every 300 seconds
test_interval: 120 # health test cycle every 120 seconds
@@ -180,9 +250,9 @@ proxy_pool:
- www.cloudflare.com
- www.amazon.com
test_timeout: 15 # per-test timeout (seconds)
test_concurrency: 5 # parallel health tests
test_concurrency: 25 # max parallel tests (auto-scales to ~10% of pool)
max_fails: 3 # evict after N consecutive failures
state_file: "" # empty = ~/.cache/s5p/pool.json
state_file: "" # empty = ~/.cache/s5p/pool[-name].json
report_url: "" # POST dead proxies here (optional)
```
@@ -193,6 +263,17 @@ proxy_pool:
| HTTP API | `url` | JSON: `{"proxies": [{"proto": "socks5", "proxy": "host:port"}, ...]}` |
| Text file | `file` | One proxy URL per line, `#` comments, blank lines ignored |
### Source filters
| Filter | Values | Effect |
|--------|--------|--------|
| `proto` | `socks5`, `socks4`, `http` | Adds `?proto=...` to API URL |
| `country` | ISO 3166-1 alpha-2 | Adds `?country=...` to API URL |
| `limit` | integer | Adds `?limit=...` to API URL |
| `mitm` | `true` / `false` | Adds `?mitm=1` / `?mitm=0` to API URL |
The `mitm` filter is silently ignored for file sources.
### Proxy file format
```
@@ -209,6 +290,10 @@ by performing a TLS handshake against one of the `test_targets` (rotated
round-robin). A successful handshake marks the proxy alive. After `max_fails`
consecutive failures, a proxy is evicted.
Concurrency auto-scales to ~10% of the proxy count, capped by
`test_concurrency` (default 25, minimum 3). For example, a pool of 73 proxies
tests 7 at a time rather than saturating the upstream Tor node.
Before each health test cycle, the static chain is tested without any pool
proxy. If the chain itself is unreachable (e.g., Tor is down), proxy tests
are skipped entirely and a warning is logged. This prevents false mass-failure
@@ -307,7 +392,7 @@ s5p --api 127.0.0.1:1081 -c config/s5p.yaml
| Method | Path | Description |
|--------|------|-------------|
| `GET` | `/status` | Combined summary: uptime, metrics, pool stats, chain |
| `GET` | `/metrics` | Full metrics counters (connections, bytes, uptime) |
| `GET` | `/metrics` | Full metrics counters (connections, bytes, rate, latency) |
| `GET` | `/pool` | All proxies with per-entry state |
| `GET` | `/pool/alive` | Alive proxies only |
| `GET` | `/config` | Current runtime config (sanitized) |
@@ -461,7 +546,7 @@ s5p tracks connection metrics and logs a summary every 60 seconds and on
shutdown:
```
metrics: conn=142 ok=98 fail=44 retries=88 active=3 in=1.2M out=4.5M up=0h05m12s pool=42/65
metrics: conn=1842 ok=1790 fail=52 retries=67 active=3 in=50.0M out=1.0G rate=4.72/s p50=198.3ms p95=890.1ms up=1h01m01s pool=42/65
```
| Counter | Meaning |
@@ -473,9 +558,77 @@ metrics: conn=142 ok=98 fail=44 retries=88 active=3 in=1.2M out=4.5M up=0h05m12s
| `active` | Currently relaying |
| `in` | Bytes client -> remote |
| `out` | Bytes remote -> client |
| `rate` | Connection rate (events/sec, rolling window) |
| `p50` | Median chain setup latency in ms |
| `p95` | 95th percentile chain setup latency in ms |
| `up` | Server uptime |
| `pool` | Alive/total proxies (only when pool is active) |
### `/metrics` JSON response
`GET /metrics` returns all counters plus rate, latency percentiles, and
per-listener latency breakdowns:
```json
{
"connections": 1842,
"success": 1790,
"failed": 52,
"retries": 67,
"active": 3,
"bytes_in": 52428800,
"bytes_out": 1073741824,
"uptime": 3661.2,
"rate": 4.72,
"latency": {
"count": 1000,
"min": 45.2,
"max": 2841.7,
"avg": 312.4,
"p50": 198.3,
"p95": 890.1,
"p99": 1523.6
},
"listener_latency": {
"0.0.0.0:1080": {"count": 500, "min": 800.1, "max": 12400.3, "avg": 2100.5, "p50": 1800.2, "p95": 8200.1, "p99": 10500.3},
"0.0.0.0:1081": {"count": 300, "min": 400.5, "max": 5200.1, "avg": 1200.3, "p50": 1000.1, "p95": 3500.2, "p99": 4800.7},
"0.0.0.0:1082": {"count": 200, "min": 150.2, "max": 2000.1, "avg": 500.3, "p50": 400.1, "p95": 1200.5, "p99": 1800.2}
}
}
```
| Field | Type | Description |
|-------|------|-------------|
| `rate` | float | Connections/sec (rolling window of last 256 events) |
| `latency` | object/null | Aggregate chain setup latency in ms (null if no samples) |
| `latency.count` | int | Number of samples in buffer (max 1000) |
| `latency.p50` | float | Median latency (ms) |
| `latency.p95` | float | 95th percentile (ms) |
| `latency.p99` | float | 99th percentile (ms) |
| `listener_latency` | object | Per-listener latency, keyed by `host:port` |
### Per-listener latency
Each listener tracks chain setup latency independently. The `/status`
endpoint includes a `latency` field on each listener entry:
```json
{
"listeners": [
{
"listen": "0.0.0.0:1080",
"chain": ["socks5://10.200.1.13:9050"],
"pool_hops": 2,
"latency": {"count": 500, "p50": 1800.2, "p95": 8200.1, "...": "..."}
}
]
}
```
The aggregate `latency` in `/metrics` combines all listeners. Use
`listener_latency` or the per-listener `latency` in `/status` to
isolate latency by chain depth.
## Profiling
```bash

View File

@@ -63,17 +63,34 @@ def _handle_status(ctx: dict) -> tuple[int, dict]:
"active": metrics.active,
"bytes_in": metrics.bytes_in,
"bytes_out": metrics.bytes_out,
"rate": round(metrics.conn_rate.rate(), 2),
"latency": metrics.latency.stats(),
}
pool = ctx.get("pool")
if pool:
pools: dict = ctx.get("pools") or {}
if pools:
total_alive = sum(p.alive_count for p in pools.values())
total_count = sum(p.count for p in pools.values())
data["pool"] = {"alive": total_alive, "total": total_count}
data["pools"] = {
name: {"alive": p.alive_count, "total": p.count}
for name, p in pools.items()
}
elif ctx.get("pool"):
pool = ctx["pool"]
data["pool"] = {"alive": pool.alive_count, "total": pool.count}
config = ctx.get("config")
if config and config.tor_nodes:
data["tor_nodes"] = [str(n) for n in config.tor_nodes]
if config:
data["listeners"] = [
{
"listen": f"{lc.listen_host}:{lc.listen_port}",
"chain": [str(h) for h in lc.chain],
"pool_hops": lc.pool_hops,
**({"pool": lc.pool_name} if lc.pool_name else {}),
"latency": metrics.get_listener_latency(
f"{lc.listen_host}:{lc.listen_port}"
).stats(),
}
for lc in config.listeners
]
@@ -87,27 +104,48 @@ def _handle_metrics(ctx: dict) -> tuple[int, dict]:
def _handle_pool(ctx: dict, alive_only: bool = False) -> tuple[int, dict]:
"""GET /pool or /pool/alive -- proxy pool state."""
pool = ctx.get("pool")
if not pool:
pools: dict = ctx.get("pools") or {}
pool_list = list(pools.values()) if pools else []
# backward compat: fall back to single "pool" key
if not pool_list and ctx.get("pool"):
pool_list = [ctx["pool"]]
if not pool_list:
return 200, {"alive": 0, "total": 0, "proxies": {}}
multi = len(pool_list) > 1
proxies = {}
for key, entry in pool._proxies.items():
if alive_only and not entry.alive:
continue
proxies[key] = {
"alive": entry.alive,
"fails": entry.fails,
"tests": entry.tests,
"last_ok": entry.last_ok,
"last_test": entry.last_test,
"last_seen": entry.last_seen,
}
return 200, {
"alive": pool.alive_count,
"total": pool.count,
total_alive = 0
total_count = 0
for p in pool_list:
total_alive += p.alive_count
total_count += p.count
for key, entry in p._proxies.items():
if alive_only and not entry.alive:
continue
rec: dict = {
"alive": entry.alive,
"fails": entry.fails,
"tests": entry.tests,
"last_ok": entry.last_ok,
"last_test": entry.last_test,
"last_seen": entry.last_seen,
}
if multi:
rec["pool"] = p.name
proxies[key] = rec
result: dict = {
"alive": total_alive,
"total": total_count,
"proxies": proxies,
}
if multi:
result["pools"] = {
p.name: {"alive": p.alive_count, "total": p.count}
for p in pool_list
}
return 200, result
def _handle_config(ctx: dict) -> tuple[int, dict]:
@@ -127,15 +165,38 @@ def _handle_config(ctx: dict) -> tuple[int, dict]:
"listen": f"{lc.listen_host}:{lc.listen_port}",
"chain": [str(h) for h in lc.chain],
"pool_hops": lc.pool_hops,
**({"pool": lc.pool_name} if lc.pool_name else {}),
}
for lc in config.listeners
],
}
if config.proxy_pool:
if config.tor_nodes:
data["tor_nodes"] = [str(n) for n in config.tor_nodes]
if config.proxy_pools:
pools_data: dict = {}
for name, pp in config.proxy_pools.items():
sources = []
for src in pp.sources:
s: dict = {}
if src.url:
s["url"] = src.url
if src.file:
s["file"] = src.file
if src.mitm is not None:
s["mitm"] = src.mitm
sources.append(s)
pools_data[name] = {
"sources": sources,
"refresh": pp.refresh,
"test_interval": pp.test_interval,
"max_fails": pp.max_fails,
}
data["proxy_pools"] = pools_data
elif config.proxy_pool:
pp = config.proxy_pool
sources = []
for src in pp.sources:
s: dict = {}
s = {}
if src.url:
s["url"] = src.url
if src.file:
@@ -164,19 +225,27 @@ async def _handle_reload(ctx: dict) -> tuple[int, dict]:
async def _handle_pool_test(ctx: dict) -> tuple[int, dict]:
"""POST /pool/test -- trigger immediate health test."""
pool = ctx.get("pool")
if not pool:
pools: dict = ctx.get("pools") or {}
pool_list = list(pools.values()) if pools else []
if not pool_list and ctx.get("pool"):
pool_list = [ctx["pool"]]
if not pool_list:
return 400, {"error": "no proxy pool configured"}
asyncio.create_task(pool._run_health_tests())
for p in pool_list:
asyncio.create_task(p._run_health_tests())
return 200, {"ok": True}
async def _handle_pool_refresh(ctx: dict) -> tuple[int, dict]:
"""POST /pool/refresh -- trigger immediate source re-fetch."""
pool = ctx.get("pool")
if not pool:
pools: dict = ctx.get("pools") or {}
pool_list = list(pools.values()) if pools else []
if not pool_list and ctx.get("pool"):
pool_list = [ctx["pool"]]
if not pool_list:
return 400, {"error": "no proxy pool configured"}
asyncio.create_task(pool._fetch_all_sources())
for p in pool_list:
asyncio.create_task(p._fetch_all_sources())
return 200, {"ok": True}

View File

@@ -36,6 +36,7 @@ class PoolSourceConfig:
proto: str | None = None
country: str | None = None
limit: int | None = 1000
mitm: bool | None = None
@dataclass
@@ -52,7 +53,7 @@ class ProxyPoolConfig:
"www.amazon.com",
])
test_timeout: float = 15.0
test_concurrency: int = 5
test_concurrency: int = 25
max_fails: int = 3
state_file: str = ""
report_url: str = ""
@@ -85,6 +86,7 @@ class ListenerConfig:
listen_port: int = 1080
chain: list[ChainHop] = field(default_factory=list)
pool_hops: int = 0
pool_name: str = ""
@dataclass
@@ -104,7 +106,9 @@ class Config:
api_host: str = ""
api_port: int = 0
proxy_pool: ProxyPoolConfig | None = None
proxy_pools: dict[str, ProxyPoolConfig] = field(default_factory=dict)
tor: TorConfig | None = None
tor_nodes: list[ChainHop] = field(default_factory=list)
config_file: str = ""
@@ -151,6 +155,39 @@ def parse_api_proxies(data: dict) -> list[ChainHop]:
return proxies
def _parse_pool_config(pool_raw: dict) -> ProxyPoolConfig:
"""Parse a single proxy pool config block from YAML."""
sources = []
for src in pool_raw.get("sources", []):
mitm = src.get("mitm")
if mitm is not None:
mitm = bool(mitm)
sources.append(
PoolSourceConfig(
url=src.get("url"),
file=src.get("file"),
proto=src.get("proto"),
country=src.get("country"),
limit=src.get("limit", 1000),
mitm=mitm,
)
)
kwargs: dict = {
"sources": sources,
"refresh": float(pool_raw.get("refresh", 300)),
"test_interval": float(pool_raw.get("test_interval", 120)),
"test_url": pool_raw.get("test_url", ""),
"test_timeout": float(pool_raw.get("test_timeout", 15)),
"test_concurrency": int(pool_raw.get("test_concurrency", 25)),
"max_fails": int(pool_raw.get("max_fails", 3)),
"state_file": pool_raw.get("state_file", ""),
"report_url": pool_raw.get("report_url", ""),
}
if "test_targets" in pool_raw:
kwargs["test_targets"] = list(pool_raw["test_targets"])
return ProxyPoolConfig(**kwargs)
def load_config(path: str | Path) -> Config:
"""Load configuration from a YAML file."""
path = Path(path)
@@ -210,33 +247,16 @@ def load_config(path: str | Path) -> Config:
)
)
# -- proxy pools (named) ------------------------------------------------
if "proxy_pools" in raw:
for name, pool_raw in raw["proxy_pools"].items():
config.proxy_pools[name] = _parse_pool_config(pool_raw)
if "proxy_pool" in raw:
pool_raw = raw["proxy_pool"]
sources = []
for src in pool_raw.get("sources", []):
sources.append(
PoolSourceConfig(
url=src.get("url"),
file=src.get("file"),
proto=src.get("proto"),
country=src.get("country"),
limit=src.get("limit", 1000),
)
)
kwargs: dict = {
"sources": sources,
"refresh": float(pool_raw.get("refresh", 300)),
"test_interval": float(pool_raw.get("test_interval", 120)),
"test_url": pool_raw.get("test_url", ""),
"test_timeout": float(pool_raw.get("test_timeout", 15)),
"test_concurrency": int(pool_raw.get("test_concurrency", 5)),
"max_fails": int(pool_raw.get("max_fails", 3)),
"state_file": pool_raw.get("state_file", ""),
"report_url": pool_raw.get("report_url", ""),
}
if "test_targets" in pool_raw:
kwargs["test_targets"] = list(pool_raw["test_targets"])
config.proxy_pool = ProxyPoolConfig(**kwargs)
config.proxy_pool = _parse_pool_config(raw["proxy_pool"])
# register singular as "default" when proxy_pools doesn't already have it
if "default" not in config.proxy_pools:
config.proxy_pools["default"] = config.proxy_pool
elif "proxy_source" in raw:
# backward compat: convert legacy proxy_source to proxy_pool
src_raw = raw["proxy_source"]
@@ -267,6 +287,10 @@ def load_config(path: str | Path) -> Config:
newnym_interval=float(tor_raw.get("newnym_interval", 0)),
)
# -- tor_nodes -------------------------------------------------------
if "tor_nodes" in raw:
config.tor_nodes = [parse_proxy_url(u) for u in raw["tor_nodes"]]
# -- listeners -------------------------------------------------------
if "listeners" in raw:
for entry in raw["listeners"]:
@@ -278,6 +302,8 @@ def load_config(path: str | Path) -> Config:
lc.listen_port = int(port_str)
elif isinstance(listen, (str, int)) and listen:
lc.listen_port = int(listen)
if "pool" in entry:
lc.pool_name = entry["pool"]
chain_raw = entry.get("chain", [])
for item in chain_raw:
if isinstance(item, str) and item.lower() == "pool":

View File

@@ -3,6 +3,69 @@
from __future__ import annotations
import time
from collections import deque
class RateTracker:
"""Rolling window event rate (events/sec).
Stores up to ``maxlen`` monotonic timestamps. Rate is computed
on read as ``(n - 1) / span`` over the window -- no background timer.
"""
def __init__(self, maxlen: int = 256) -> None:
self._times: deque[float] = deque(maxlen=maxlen)
def record(self, now: float | None = None) -> None:
"""Record an event at *now* (default: ``time.monotonic()``)."""
self._times.append(now if now is not None else time.monotonic())
def rate(self) -> float:
"""Return events/sec over the window, 0.0 if < 2 events."""
n = len(self._times)
if n < 2:
return 0.0
span = self._times[-1] - self._times[0]
if span <= 0:
return 0.0
return (n - 1) / span
class LatencyTracker:
"""Circular buffer of latency samples with percentile stats.
Stores up to ``maxlen`` float-second samples. ``stats()`` sorts
a copy on read (~0.1 ms for 1000 floats) and returns millisecond values.
"""
def __init__(self, maxlen: int = 1000) -> None:
self._samples: deque[float] = deque(maxlen=maxlen)
def record(self, seconds: float) -> None:
"""Append a latency sample (in seconds)."""
self._samples.append(seconds)
@property
def count(self) -> int:
"""Number of samples currently stored."""
return len(self._samples)
def stats(self) -> dict | None:
"""Return ``{count, min, max, avg, p50, p95, p99}`` in ms, or None."""
n = len(self._samples)
if n == 0:
return None
s = sorted(self._samples)
ms = [v * 1000 for v in s]
return {
"count": n,
"min": round(ms[0], 1),
"max": round(ms[-1], 1),
"avg": round(sum(ms) / n, 1),
"p50": round(ms[int(n * 0.50)], 1),
"p95": round(ms[min(int(n * 0.95), n - 1)], 1),
"p99": round(ms[min(int(n * 0.99), n - 1)], 1),
}
class Metrics:
@@ -21,16 +84,30 @@ class Metrics:
self.bytes_out: int = 0
self.active: int = 0
self.started: float = time.monotonic()
self.conn_rate: RateTracker = RateTracker()
self.latency: LatencyTracker = LatencyTracker()
self.listener_latency: dict[str, LatencyTracker] = {}
def get_listener_latency(self, key: str) -> LatencyTracker:
"""Get or create a per-listener latency tracker."""
if key not in self.listener_latency:
self.listener_latency[key] = LatencyTracker()
return self.listener_latency[key]
def summary(self) -> str:
"""One-line log-friendly summary."""
uptime = time.monotonic() - self.started
h, rem = divmod(int(uptime), 3600)
m, s = divmod(rem, 60)
rate = self.conn_rate.rate()
lat = self.latency.stats()
p50 = f" p50={lat['p50']:.1f}ms" if lat else ""
p95 = f" p95={lat['p95']:.1f}ms" if lat else ""
return (
f"conn={self.connections} ok={self.success} fail={self.failed} "
f"retries={self.retries} active={self.active} "
f"in={_human_bytes(self.bytes_in)} out={_human_bytes(self.bytes_out)} "
f"rate={rate:.2f}/s{p50}{p95} "
f"up={h}h{m:02d}m{s:02d}s"
)
@@ -45,6 +122,11 @@ class Metrics:
"bytes_in": self.bytes_in,
"bytes_out": self.bytes_out,
"uptime": round(time.monotonic() - self.started, 1),
"rate": round(self.conn_rate.rate(), 2),
"latency": self.latency.stats(),
"listener_latency": {
k: v.stats() for k, v in self.listener_latency.items()
},
}

View File

@@ -54,10 +54,16 @@ class ProxyPool:
cfg: ProxyPoolConfig,
chain: list[ChainHop],
timeout: float,
chain_nodes: list[ChainHop] | None = None,
name: str = "default",
) -> None:
self._cfg = cfg
self._chain = list(chain)
self._chain_nodes = chain_nodes or []
self._chain_idx = 0
self._timeout = timeout
self._name = name
self._log_prefix = f"pool[{name}]" if name != "default" else "pool"
self._proxies: dict[str, ProxyEntry] = {}
self._alive_keys: list[str] = []
self._tasks: list[asyncio.Task] = []
@@ -66,6 +72,20 @@ class ProxyPool:
self._ssl_ctx = ssl.create_default_context()
self._target_idx = 0
def _effective_chain(self) -> list[ChainHop]:
"""Return chain with first hop rotated across tor_nodes (if configured)."""
if not self._chain_nodes or not self._chain:
return self._chain
chain = list(self._chain)
chain[0] = self._chain_nodes[self._chain_idx % len(self._chain_nodes)]
self._chain_idx += 1
return chain
@property
def name(self) -> str:
"""Pool name."""
return self._name
# -- public interface ----------------------------------------------------
async def start(self) -> None:
@@ -87,7 +107,7 @@ class ProxyPool:
async def reload(self, cfg: ProxyPoolConfig) -> None:
"""Update pool config and trigger source re-fetch."""
self._cfg = cfg
logger.info("pool: config reloaded, re-fetching sources")
logger.info("%s: config reloaded, re-fetching sources", self._log_prefix)
await self._fetch_all_sources()
self._save_state()
@@ -167,10 +187,10 @@ class ProxyPool:
label = src.url or src.file or "?"
if isinstance(result, Exception):
err = str(result) or type(result).__name__
logger.warning("pool: source %s failed: %s", label, err)
logger.warning("%s: source %s failed: %s", self._log_prefix, label, err)
else:
kind = "fetched" if src.url else "loaded"
logger.info("pool: %s %d proxies from %s", kind, len(result), label)
logger.info("%s: %s %d proxies from %s", self._log_prefix, kind, len(result), label)
proxies.extend(result)
self._merge(proxies)
@@ -183,6 +203,8 @@ class ProxyPool:
params["proto"] = src.proto
if src.country:
params["country"] = src.country
if src.mitm is not None:
params["mitm"] = "1" if src.mitm else "0"
url = src.url
if params:
@@ -196,7 +218,7 @@ class ProxyPool:
"""Parse a text file with one proxy URL per line (runs in executor)."""
path = Path(src.file).expanduser()
if not path.is_file():
logger.warning("pool: file not found: %s", path)
logger.warning("%s: file not found: %s", self._log_prefix, path)
return []
proxies: list[ChainHop] = []
@@ -207,7 +229,7 @@ class ProxyPool:
try:
hop = parse_proxy_url(line)
except ValueError as e:
logger.debug("pool: skipping invalid line %r: %s", line, e)
logger.debug("%s: skipping invalid line %r: %s", self._log_prefix, line, e)
continue
if src.proto and hop.proto != src.proto:
continue
@@ -268,13 +290,13 @@ class ProxyPool:
"""Test a single proxy via TLS handshake through the full chain."""
entry.last_test = time.time()
entry.tests += 1
return await self._tls_check(self._chain + [entry.hop])
return await self._tls_check(self._effective_chain() + [entry.hop])
async def _test_chain(self) -> bool:
"""Test the static chain without any pool proxy."""
if not self._chain:
return True
return await self._tls_check(self._chain)
return await self._tls_check(self._effective_chain())
async def _run_health_tests(self, keys: list[str] | None = None) -> None:
"""Test proxies with bounded concurrency.
@@ -289,7 +311,10 @@ class ProxyPool:
if self._chain:
chain_ok = await self._test_chain()
if not chain_ok:
logger.warning("pool: static chain unreachable, skipping proxy tests")
logger.warning(
"%s: static chain unreachable, skipping proxy tests",
self._log_prefix,
)
return
target = (
@@ -300,7 +325,12 @@ class ProxyPool:
if not target:
return
sem = asyncio.Semaphore(self._cfg.test_concurrency)
effective = max(3, min(len(target) // 10, self._cfg.test_concurrency))
sem = asyncio.Semaphore(effective)
logger.debug(
"%s: testing %d proxies (concurrency=%d)",
self._log_prefix, len(target), effective,
)
results: dict[str, bool] = {}
async def _test(key: str, entry: ProxyEntry) -> None:
@@ -329,8 +359,8 @@ class ProxyPool:
skip_eviction = fail_rate > 0.90 and total > 10
if skip_eviction:
logger.warning(
"pool: %d/%d tests failed (%.0f%%), skipping eviction",
total - passed, total, fail_rate * 100,
"%s: %d/%d tests failed (%.0f%%), skipping eviction",
self._log_prefix, total - passed, total, fail_rate * 100,
)
evict_keys: list[str] = []
@@ -369,7 +399,8 @@ class ProxyPool:
parts.append(f"stale {len(stale_keys)}")
suffix = f" ({', '.join(parts)})" if parts else ""
logger.info(
"pool: %d proxies, %d alive%s",
"%s: %d proxies, %d alive%s",
self._log_prefix,
len(self._proxies),
len(self._alive_keys),
suffix,
@@ -393,9 +424,12 @@ class ProxyPool:
try:
await http_post_json(self._cfg.report_url, {"dead": dead})
logger.info("pool: reported %d dead proxies to %s", len(dead), self._cfg.report_url)
logger.info(
"%s: reported %d dead proxies to %s",
self._log_prefix, len(dead), self._cfg.report_url,
)
except Exception as e:
logger.debug("pool: report failed: %s", e)
logger.debug("%s: report failed: %s", self._log_prefix, e)
def _rebuild_alive(self) -> None:
"""Rebuild the alive keys list from current state."""
@@ -435,11 +469,12 @@ class ProxyPool:
# -- persistence ---------------------------------------------------------
def _resolve_state_path(self) -> Path:
"""Resolve state file path, defaulting to ~/.cache/s5p/pool.json."""
"""Resolve state file path, defaulting to ~/.cache/s5p/pool[-name].json."""
if self._cfg.state_file:
return Path(self._cfg.state_file).expanduser()
cache_dir = Path.home() / ".cache" / "s5p"
return cache_dir / "pool.json"
filename = "pool.json" if self._name == "default" else f"pool-{self._name}.json"
return cache_dir / filename
def _load_state(self) -> None:
"""Load proxy state from JSON file (warm start)."""
@@ -448,7 +483,7 @@ class ProxyPool:
try:
data = json.loads(self._state_path.read_text())
if data.get("version") != STATE_VERSION:
logger.warning("pool: state file version mismatch, starting fresh")
logger.warning("%s: state file version mismatch, starting fresh", self._log_prefix)
return
for key, entry in data.get("proxies", {}).items():
hop = ChainHop(
@@ -469,11 +504,11 @@ class ProxyPool:
)
self._rebuild_alive()
logger.info(
"pool: loaded state (%d proxies, %d alive)",
len(self._proxies), len(self._alive_keys),
"%s: loaded state (%d proxies, %d alive)",
self._log_prefix, len(self._proxies), len(self._alive_keys),
)
except (json.JSONDecodeError, KeyError, TypeError, ValueError) as e:
logger.warning("pool: corrupt state file: %s", e)
logger.warning("%s: corrupt state file: %s", self._log_prefix, e)
self._proxies.clear()
self._alive_keys.clear()
@@ -505,4 +540,4 @@ class ProxyPool:
tmp.write_text(json.dumps(data, indent=2))
os.replace(tmp, self._state_path)
except OSError as e:
logger.warning("pool: failed to save state: %s", e)
logger.warning("%s: failed to save state: %s", self._log_prefix, e)

View File

@@ -21,6 +21,19 @@ logger = logging.getLogger("s5p")
BUFFER_SIZE = 65536
class _RoundRobin:
"""Simple round-robin selector (single-threaded asyncio, no lock)."""
def __init__(self, items: list[ChainHop]) -> None:
self._items = items
self._idx = 0
def next(self) -> ChainHop:
item = self._items[self._idx % len(self._items)]
self._idx += 1
return item
# -- relay -------------------------------------------------------------------
@@ -66,6 +79,8 @@ async def _handle_client(
proxy_pool: ProxyPool | None = None,
metrics: Metrics | None = None,
first_hop_pool: FirstHopPool | None = None,
tor_rr: _RoundRobin | None = None,
hop_pools: dict[tuple[str, int], FirstHopPool] | None = None,
) -> None:
"""Handle a single SOCKS5 client connection."""
peer = client_writer.get_extra_info("peername")
@@ -73,6 +88,7 @@ async def _handle_client(
if metrics:
metrics.connections += 1
metrics.conn_rate.record()
try:
# -- greeting --
@@ -108,6 +124,13 @@ async def _handle_client(
for attempt in range(attempts):
effective_chain = list(listener.chain)
fhp = first_hop_pool
if tor_rr and effective_chain:
node = tor_rr.next()
effective_chain[0] = node
if hop_pools:
fhp = hop_pools.get((node.host, node.port))
pool_hops: list[ChainHop] = []
if proxy_pool and listener.pool_hops > 0:
for _ in range(listener.pool_hops):
@@ -122,10 +145,15 @@ async def _handle_client(
t0 = time.monotonic()
remote_reader, remote_writer = await build_chain(
effective_chain, target_host, target_port,
timeout=timeout, first_hop_pool=first_hop_pool,
timeout=timeout, first_hop_pool=fhp,
)
dt = time.monotonic() - t0
logger.debug("[%s] chain up in %.0fms", tag, dt * 1000)
if metrics:
metrics.latency.record(dt)
metrics.get_listener_latency(
f"{listener.listen_host}:{listener.listen_port}"
).record(dt)
break
except (ProtoError, TimeoutError, ConnectionError, OSError) as e:
last_err = e
@@ -205,7 +233,7 @@ async def _handle_client(
async def _metrics_logger(
metrics: Metrics,
stop: asyncio.Event,
pool: ProxyPool | None = None,
pools: dict[str, ProxyPool] | None = None,
) -> None:
"""Log metrics summary every 60 seconds."""
while not stop.is_set():
@@ -215,8 +243,13 @@ async def _metrics_logger(
pass
if not stop.is_set():
line = metrics.summary()
if pool:
line += f" pool={pool.alive_count}/{pool.count}"
if pools:
if len(pools) == 1:
p = next(iter(pools.values()))
line += f" pool={p.alive_count}/{p.count}"
else:
for name, p in pools.items():
line += f" pool[{name}]={p.alive_count}/{p.count}"
logger.info("metrics: %s", line)
@@ -232,13 +265,47 @@ async def serve(config: Config) -> None:
metrics = Metrics()
listeners = config.listeners
# -- shared proxy pool ---------------------------------------------------
proxy_pool: ProxyPool | None = None
if config.proxy_pool and config.proxy_pool.sources:
# use first listener's chain as base chain for pool health tests
base_chain = listeners[0].chain if listeners else config.chain
proxy_pool = ProxyPool(config.proxy_pool, base_chain, config.timeout)
await proxy_pool.start()
# -- tor_nodes round-robin -----------------------------------------------
tor_rr: _RoundRobin | None = None
if config.tor_nodes:
tor_rr = _RoundRobin(config.tor_nodes)
nodes = ", ".join(str(n) for n in config.tor_nodes)
logger.info("tor_nodes: %s (round-robin)", nodes)
# -- named proxy pools ---------------------------------------------------
proxy_pools: dict[str, ProxyPool] = {}
base_chain = listeners[0].chain if listeners else config.chain
for pool_name, pool_cfg in config.proxy_pools.items():
if not pool_cfg.sources:
continue
pool = ProxyPool(
pool_cfg, base_chain, config.timeout,
chain_nodes=config.tor_nodes or None,
name=pool_name,
)
await pool.start()
proxy_pools[pool_name] = pool
# backward compat: single proxy_pool -> "default"
if not proxy_pools and config.proxy_pool and config.proxy_pool.sources:
pool = ProxyPool(
config.proxy_pool, base_chain, config.timeout,
chain_nodes=config.tor_nodes or None,
)
await pool.start()
proxy_pools["default"] = pool
def _pool_for(lc: ListenerConfig) -> ProxyPool | None:
"""Resolve the proxy pool for a listener."""
if lc.pool_hops <= 0:
return None
name = lc.pool_name or "default"
if name not in proxy_pools:
raise RuntimeError(
f"listener {lc.listen_host}:{lc.listen_port} "
f"references unknown pool {name!r}"
)
return proxy_pools[name]
# -- per-unique first-hop connection pools --------------------------------
hop_pools: dict[tuple[str, int], FirstHopPool] = {}
@@ -254,6 +321,17 @@ async def serve(config: Config) -> None:
)
await hp.start()
hop_pools[key] = hp
# create pools for all tor_nodes
if config.tor_nodes:
for node in config.tor_nodes:
key = (node.host, node.port)
if key not in hop_pools:
hp = FirstHopPool(
node, size=config.pool_size,
max_idle=config.pool_max_idle,
)
await hp.start()
hop_pools[key] = hp
def _hop_pool_for(lc: ListenerConfig) -> FirstHopPool | None:
if not lc.chain:
@@ -283,15 +361,17 @@ async def serve(config: Config) -> None:
servers: list[asyncio.Server] = []
for lc in listeners:
hp = _hop_pool_for(lc)
lc_pool = _pool_for(lc)
async def on_client(
r: asyncio.StreamReader, w: asyncio.StreamWriter,
_lc: ListenerConfig = lc, _hp: FirstHopPool | None = hp,
_pool: ProxyPool | None = lc_pool,
) -> None:
async with sem:
await _handle_client(
r, w, _lc, config.timeout, config.retries,
proxy_pool, metrics, _hp,
_pool, metrics, _hp, tor_rr, hop_pools,
)
srv = await asyncio.start_server(on_client, lc.listen_host, lc.listen_port)
@@ -301,16 +381,21 @@ async def serve(config: Config) -> None:
chain_desc = " -> ".join(str(h) for h in lc.chain) if lc.chain else "direct"
nhops = lc.pool_hops
pool_desc = f" + {nhops} pool hop{'s' if nhops != 1 else ''}" if nhops else ""
if lc_pool and lc_pool.name != "default":
pool_desc += f" [{lc_pool.name}]"
logger.info("listener %s chain: %s%s", addr, chain_desc, pool_desc)
logger.info("max_connections=%d", config.max_connections)
if proxy_pool:
nsrc = len(config.proxy_pool.sources)
logger.info(
"pool: %d proxies, %d alive (from %d source%s)",
proxy_pool.count, proxy_pool.alive_count, nsrc, "s" if nsrc != 1 else "",
)
if proxy_pools:
for pname, pp in proxy_pools.items():
cfg = config.proxy_pools.get(pname, config.proxy_pool)
nsrc = len(cfg.sources) if cfg else 0
prefix = f"pool[{pname}]" if pname != "default" else "pool"
logger.info(
"%s: %d proxies, %d alive (from %d source%s)",
prefix, pp.count, pp.alive_count, nsrc, "s" if nsrc != 1 else "",
)
logger.info("retries: %d", config.retries)
if tor:
@@ -325,7 +410,8 @@ async def serve(config: Config) -> None:
api_ctx: dict = {
"config": config,
"metrics": metrics,
"pool": proxy_pool,
"pools": proxy_pools,
"pool": next(iter(proxy_pools.values()), None), # backward compat
"hop_pools": hop_pools,
"tor": tor,
}
@@ -351,8 +437,13 @@ async def serve(config: Config) -> None:
for h in root.handlers:
h.setLevel(level)
logging.getLogger("s5p").setLevel(level)
if proxy_pool and new.proxy_pool:
await proxy_pool.reload(new.proxy_pool)
# reload named pools (match by name)
for pname, pp in proxy_pools.items():
new_cfg = new.proxy_pools.get(pname)
if new_cfg:
await pp.reload(new_cfg)
elif new.proxy_pool and pname == "default":
await pp.reload(new.proxy_pool)
logger.info("reload: config reloaded (listeners require restart)")
def _on_sighup() -> None:
@@ -365,8 +456,7 @@ async def serve(config: Config) -> None:
api_srv = await start_api(config.api_host, config.api_port, api_ctx)
metrics_stop = asyncio.Event()
pool_ref = proxy_pool
metrics_task = asyncio.create_task(_metrics_logger(metrics, metrics_stop, pool_ref))
metrics_task = asyncio.create_task(_metrics_logger(metrics, metrics_stop, proxy_pools or None))
# keep all servers open until stop signal
try:
@@ -386,11 +476,16 @@ async def serve(config: Config) -> None:
await tor.stop()
for hp in hop_pools.values():
await hp.stop()
if proxy_pool:
await proxy_pool.stop()
for pp in proxy_pools.values():
await pp.stop()
shutdown_line = metrics.summary()
if pool_ref:
shutdown_line += f" pool={pool_ref.alive_count}/{pool_ref.count}"
if proxy_pools:
if len(proxy_pools) == 1:
p = next(iter(proxy_pools.values()))
shutdown_line += f" pool={p.alive_count}/{p.count}"
else:
for pname, p in proxy_pools.items():
shutdown_line += f" pool[{pname}]={p.alive_count}/{p.count}"
logger.info("metrics: %s", shutdown_line)
metrics_stop.set()
await metrics_task

View File

@@ -82,6 +82,7 @@ class TestJsonResponse:
def _make_ctx(
config: Config | None = None,
pool: MagicMock | None = None,
pools: dict | None = None,
tor: MagicMock | None = None,
) -> dict:
"""Build a mock context dict."""
@@ -89,6 +90,7 @@ def _make_ctx(
"config": config or Config(),
"metrics": Metrics(),
"pool": pool,
"pools": pools,
"hop_pool": None,
"tor": tor,
}
@@ -109,6 +111,8 @@ class TestHandleStatus:
assert body["connections"] == 10
assert body["success"] == 8
assert "uptime" in body
assert "rate" in body
assert "latency" in body
def test_with_pool(self):
pool = MagicMock()
@@ -133,11 +137,37 @@ class TestHandleStatus:
],
)
ctx = _make_ctx(config=config)
# record some latency for the first listener
ctx["metrics"].get_listener_latency("0.0.0.0:1080").record(0.2)
_, body = _handle_status(ctx)
assert len(body["listeners"]) == 2
assert body["listeners"][0]["chain"] == ["socks5://127.0.0.1:9050"]
assert body["listeners"][0]["pool_hops"] == 0
assert body["listeners"][1]["pool_hops"] == 1
# per-listener latency present on each entry
assert "latency" in body["listeners"][0]
assert body["listeners"][0]["latency"]["count"] == 1
assert "latency" in body["listeners"][1]
assert body["listeners"][1]["latency"] is None
class TestHandleStatusPools:
"""Test GET /status with multiple named pools."""
def test_multi_pool_summary(self):
pool_a = MagicMock()
pool_a.alive_count = 5
pool_a.count = 10
pool_a.name = "clean"
pool_b = MagicMock()
pool_b.alive_count = 3
pool_b.count = 8
pool_b.name = "mitm"
ctx = _make_ctx(pools={"clean": pool_a, "mitm": pool_b})
_, body = _handle_status(ctx)
assert body["pool"] == {"alive": 8, "total": 18}
assert body["pools"]["clean"] == {"alive": 5, "total": 10}
assert body["pools"]["mitm"] == {"alive": 3, "total": 8}
class TestHandleMetrics:
@@ -152,6 +182,9 @@ class TestHandleMetrics:
assert body["connections"] == 42
assert body["bytes_in"] == 1024
assert "uptime" in body
assert "rate" in body
assert "latency" in body
assert "listener_latency" in body
class TestHandlePool:
@@ -206,6 +239,57 @@ class TestHandlePool:
assert "socks5://1.2.3.4:1080" in body["proxies"]
class TestHandlePoolMulti:
"""Test GET /pool with multiple named pools."""
def test_merges_entries(self):
pool_a = MagicMock()
pool_a.alive_count = 1
pool_a.count = 1
pool_a.name = "clean"
entry_a = MagicMock(
alive=True, fails=0, tests=5,
last_ok=100.0, last_test=100.0, last_seen=100.0,
)
pool_a._proxies = {"socks5://1.2.3.4:1080": entry_a}
pool_b = MagicMock()
pool_b.alive_count = 1
pool_b.count = 1
pool_b.name = "mitm"
entry_b = MagicMock(
alive=True, fails=0, tests=3,
last_ok=90.0, last_test=90.0, last_seen=90.0,
)
pool_b._proxies = {"socks5://5.6.7.8:1080": entry_b}
ctx = _make_ctx(pools={"clean": pool_a, "mitm": pool_b})
_, body = _handle_pool(ctx)
assert body["alive"] == 2
assert body["total"] == 2
assert len(body["proxies"]) == 2
assert body["proxies"]["socks5://1.2.3.4:1080"]["pool"] == "clean"
assert body["proxies"]["socks5://5.6.7.8:1080"]["pool"] == "mitm"
assert "pools" in body
assert body["pools"]["clean"] == {"alive": 1, "total": 1}
def test_single_pool_no_pool_field(self):
"""Single pool: no 'pool' field on entries, no 'pools' summary."""
pool = MagicMock()
pool.alive_count = 1
pool.count = 1
pool.name = "default"
entry = MagicMock(
alive=True, fails=0, tests=5,
last_ok=100.0, last_test=100.0, last_seen=100.0,
)
pool._proxies = {"socks5://1.2.3.4:1080": entry}
ctx = _make_ctx(pools={"default": pool})
_, body = _handle_pool(ctx)
assert "pool" not in body["proxies"]["socks5://1.2.3.4:1080"]
assert "pools" not in body
class TestHandleConfig:
"""Test GET /config handler."""
@@ -243,6 +327,79 @@ class TestHandleConfig:
assert body["proxy_pool"]["sources"][0]["url"] == "http://api:8081/proxies"
assert body["listeners"][0]["pool_hops"] == 1
def test_with_proxy_pools(self):
pp_clean = ProxyPoolConfig(
sources=[PoolSourceConfig(url="http://api:8081/proxies/all", mitm=False)],
refresh=300.0,
test_interval=120.0,
max_fails=3,
)
pp_mitm = ProxyPoolConfig(
sources=[PoolSourceConfig(url="http://api:8081/proxies/all", mitm=True)],
refresh=300.0,
test_interval=120.0,
max_fails=3,
)
config = Config(
proxy_pools={"clean": pp_clean, "mitm": pp_mitm},
listeners=[ListenerConfig(
listen_host="0.0.0.0", listen_port=1080,
chain=[ChainHop("socks5", "127.0.0.1", 9050)],
pool_hops=2, pool_name="clean",
)],
)
ctx = _make_ctx(config=config)
_, body = _handle_config(ctx)
assert "proxy_pools" in body
assert body["proxy_pools"]["clean"]["sources"][0]["mitm"] is False
assert body["proxy_pools"]["mitm"]["sources"][0]["mitm"] is True
assert body["listeners"][0]["pool"] == "clean"
def test_with_tor_nodes(self):
config = Config(
tor_nodes=[
ChainHop("socks5", "10.200.1.1", 9050),
ChainHop("socks5", "10.200.1.13", 9050),
],
listeners=[ListenerConfig()],
)
ctx = _make_ctx(config=config)
_, body = _handle_config(ctx)
assert body["tor_nodes"] == [
"socks5://10.200.1.1:9050",
"socks5://10.200.1.13:9050",
]
def test_no_tor_nodes(self):
config = Config(listeners=[ListenerConfig()])
ctx = _make_ctx(config=config)
_, body = _handle_config(ctx)
assert "tor_nodes" not in body
class TestHandleStatusTorNodes:
"""Test tor_nodes in GET /status response."""
def test_tor_nodes_in_status(self):
config = Config(
tor_nodes=[
ChainHop("socks5", "10.200.1.1", 9050),
ChainHop("socks5", "10.200.1.13", 9050),
],
listeners=[ListenerConfig()],
)
ctx = _make_ctx(config=config)
_, body = _handle_status(ctx)
assert body["tor_nodes"] == [
"socks5://10.200.1.1:9050",
"socks5://10.200.1.13:9050",
]
def test_no_tor_nodes_in_status(self):
ctx = _make_ctx()
_, body = _handle_status(ctx)
assert "tor_nodes" not in body
# -- routing -----------------------------------------------------------------

View File

@@ -135,6 +135,7 @@ class TestConfig:
assert c.max_connections == 256
assert c.pool_size == 0
assert c.pool_max_idle == 30.0
assert c.tor_nodes == []
def test_max_connections_from_yaml(self, tmp_path):
cfg_file = tmp_path / "test.yaml"
@@ -221,6 +222,115 @@ class TestConfig:
]
class TestProxyPools:
"""Test named proxy_pools config parsing."""
def test_proxy_pools_from_yaml(self, tmp_path):
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"proxy_pools:\n"
" clean:\n"
" sources:\n"
" - url: http://api:8081/proxies/all\n"
" mitm: false\n"
" refresh: 300\n"
" state_file: /data/pool-clean.json\n"
" mitm:\n"
" sources:\n"
" - url: http://api:8081/proxies/all\n"
" mitm: true\n"
" state_file: /data/pool-mitm.json\n"
)
c = load_config(cfg_file)
assert "clean" in c.proxy_pools
assert "mitm" in c.proxy_pools
assert c.proxy_pools["clean"].sources[0].mitm is False
assert c.proxy_pools["mitm"].sources[0].mitm is True
assert c.proxy_pools["clean"].state_file == "/data/pool-clean.json"
assert c.proxy_pools["mitm"].state_file == "/data/pool-mitm.json"
def test_mitm_none_when_absent(self, tmp_path):
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"proxy_pool:\n"
" sources:\n"
" - url: http://api:8081/proxies\n"
)
c = load_config(cfg_file)
assert c.proxy_pool is not None
assert c.proxy_pool.sources[0].mitm is None
def test_singular_becomes_default(self, tmp_path):
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"proxy_pool:\n"
" sources:\n"
" - url: http://api:8081/proxies\n"
)
c = load_config(cfg_file)
assert "default" in c.proxy_pools
assert c.proxy_pools["default"] is c.proxy_pool
def test_proxy_pools_wins_over_singular(self, tmp_path):
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"proxy_pools:\n"
" default:\n"
" sources:\n"
" - url: http://api:8081/pools-default\n"
"proxy_pool:\n"
" sources:\n"
" - url: http://api:8081/singular\n"
)
c = load_config(cfg_file)
# proxy_pools "default" wins, singular does not overwrite
assert c.proxy_pools["default"].sources[0].url == "http://api:8081/pools-default"
def test_listener_pool_name(self, tmp_path):
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"listeners:\n"
" - listen: 0.0.0.0:1080\n"
" pool: clean\n"
" chain:\n"
" - socks5://127.0.0.1:9050\n"
" - pool\n"
" - listen: 0.0.0.0:1081\n"
" chain:\n"
" - socks5://127.0.0.1:9050\n"
)
c = load_config(cfg_file)
assert c.listeners[0].pool_name == "clean"
assert c.listeners[0].pool_hops == 1
assert c.listeners[1].pool_name == ""
assert c.listeners[1].pool_hops == 0
class TestTorNodes:
"""Test tor_nodes config parsing."""
def test_tor_nodes_from_yaml(self, tmp_path):
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"tor_nodes:\n"
" - socks5://10.200.1.1:9050\n"
" - socks5://10.200.1.254:9050\n"
" - socks5://10.200.1.13:9050\n"
)
c = load_config(cfg_file)
assert len(c.tor_nodes) == 3
assert c.tor_nodes[0].host == "10.200.1.1"
assert c.tor_nodes[0].port == 9050
assert c.tor_nodes[1].host == "10.200.1.254"
assert c.tor_nodes[2].host == "10.200.1.13"
def test_no_tor_nodes(self, tmp_path):
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text("listen: 1080\n")
c = load_config(cfg_file)
assert c.tor_nodes == []
class TestListenerConfig:
"""Test multi-listener config parsing."""

181
tests/test_metrics.py Normal file
View File

@@ -0,0 +1,181 @@
"""Tests for metrics trackers and helpers."""
from s5p.metrics import LatencyTracker, Metrics, RateTracker, _human_bytes
# -- LatencyTracker ----------------------------------------------------------
class TestLatencyTracker:
"""Test latency sample collection and percentile stats."""
def test_empty(self):
lt = LatencyTracker()
assert lt.count == 0
assert lt.stats() is None
def test_single(self):
lt = LatencyTracker()
lt.record(0.1)
s = lt.stats()
assert s is not None
assert s["count"] == 1
assert s["min"] == s["max"] == s["avg"] == s["p50"]
def test_percentiles(self):
lt = LatencyTracker()
# 100 evenly spaced samples: 0.001, 0.002, ..., 0.100
for i in range(1, 101):
lt.record(i / 1000)
s = lt.stats()
assert s["count"] == 100
assert s["min"] == 1.0 # 0.001s = 1.0ms
assert s["max"] == 100.0 # 0.100s = 100.0ms
assert 49.0 <= s["avg"] <= 52.0
assert 50.0 <= s["p50"] <= 52.0
assert 95.0 <= s["p95"] <= 97.0
assert 99.0 <= s["p99"] <= 101.0
def test_bounded(self):
lt = LatencyTracker(maxlen=5)
for i in range(10):
lt.record(i / 100)
assert lt.count == 5
s = lt.stats()
# only the last 5 samples remain: 0.05..0.09
assert s["min"] == 50.0
assert s["max"] == 90.0
def test_milliseconds(self):
lt = LatencyTracker()
lt.record(0.5) # 500ms
s = lt.stats()
assert s["min"] == 500.0
assert s["max"] == 500.0
# -- RateTracker -------------------------------------------------------------
class TestRateTracker:
"""Test rolling window event rate calculation."""
def test_empty(self):
rt = RateTracker()
assert rt.rate() == 0.0
def test_single(self):
rt = RateTracker()
rt.record()
assert rt.rate() == 0.0
def test_known_rate(self):
rt = RateTracker()
# 11 events at 0.1s intervals = 10/1.0 = 10.0/s
for i in range(11):
rt.record(now=100.0 + i * 0.1)
assert abs(rt.rate() - 10.0) < 0.01
def test_bounded(self):
rt = RateTracker(maxlen=5)
for i in range(10):
rt.record(now=float(i))
# only last 5 events: t=5..9, span=4, rate=4/4=1.0
assert abs(rt.rate() - 1.0) < 0.01
def test_zero_span(self):
rt = RateTracker()
rt.record(now=1.0)
rt.record(now=1.0)
assert rt.rate() == 0.0
# -- Metrics -----------------------------------------------------------------
class TestMetrics:
"""Test Metrics aggregation and output."""
def test_to_dict_includes_rate_and_latency(self):
m = Metrics()
m.connections = 10
m.conn_rate.record(now=0.0)
m.conn_rate.record(now=1.0)
m.latency.record(0.2)
m.latency.record(0.3)
d = m.to_dict()
assert "rate" in d
assert isinstance(d["rate"], float)
assert "latency" in d
assert d["latency"] is not None
assert d["latency"]["count"] == 2
def test_to_dict_latency_none_when_empty(self):
m = Metrics()
d = m.to_dict()
assert d["latency"] is None
assert d["rate"] == 0.0
def test_summary_includes_rate(self):
m = Metrics()
m.conn_rate.record(now=0.0)
m.conn_rate.record(now=1.0)
s = m.summary()
assert "rate=" in s
assert "/s" in s
def test_summary_includes_latency(self):
m = Metrics()
m.latency.record(0.2)
s = m.summary()
assert "p50=" in s
assert "p95=" in s
def test_summary_no_latency_when_empty(self):
m = Metrics()
s = m.summary()
assert "p50=" not in s
assert "p95=" not in s
def test_listener_latency(self):
m = Metrics()
m.get_listener_latency("0.0.0.0:1080").record(0.5)
m.get_listener_latency("0.0.0.0:1080").record(0.6)
m.get_listener_latency("0.0.0.0:1081").record(0.1)
d = m.to_dict()
assert "listener_latency" in d
assert "0.0.0.0:1080" in d["listener_latency"]
assert "0.0.0.0:1081" in d["listener_latency"]
assert d["listener_latency"]["0.0.0.0:1080"]["count"] == 2
assert d["listener_latency"]["0.0.0.0:1081"]["count"] == 1
def test_listener_latency_empty(self):
m = Metrics()
d = m.to_dict()
assert d["listener_latency"] == {}
# -- _human_bytes ------------------------------------------------------------
class TestHumanBytes:
"""Test byte count formatting."""
def test_bytes(self):
assert _human_bytes(0) == "0B"
assert _human_bytes(512) == "512B"
def test_kilobytes(self):
assert _human_bytes(1024) == "1.0K"
assert _human_bytes(1536) == "1.5K"
def test_megabytes(self):
assert _human_bytes(1024 * 1024) == "1.0M"
def test_gigabytes(self):
assert _human_bytes(1024**3) == "1.0G"
def test_terabytes(self):
assert _human_bytes(1024**4) == "1.0T"
def test_petabytes(self):
assert _human_bytes(1024**5) == "1.0P"

View File

@@ -11,6 +11,95 @@ from s5p.config import ChainHop, PoolSourceConfig, ProxyPoolConfig
from s5p.pool import ProxyEntry, ProxyPool
class TestProxyPoolName:
"""Test pool name and state path derivation."""
def test_default_name(self):
cfg = ProxyPoolConfig(sources=[])
pool = ProxyPool(cfg, [], timeout=10.0)
assert pool.name == "default"
assert pool._log_prefix == "pool"
def test_named_pool(self):
cfg = ProxyPoolConfig(sources=[])
pool = ProxyPool(cfg, [], timeout=10.0, name="clean")
assert pool.name == "clean"
assert pool._log_prefix == "pool[clean]"
def test_state_path_default(self):
cfg = ProxyPoolConfig(sources=[])
pool = ProxyPool(cfg, [], timeout=10.0)
assert pool._state_path.name == "pool.json"
def test_state_path_named(self):
cfg = ProxyPoolConfig(sources=[])
pool = ProxyPool(cfg, [], timeout=10.0, name="clean")
assert pool._state_path.name == "pool-clean.json"
def test_state_path_explicit_overrides_name(self):
cfg = ProxyPoolConfig(sources=[], state_file="/data/custom.json")
pool = ProxyPool(cfg, [], timeout=10.0, name="clean")
assert str(pool._state_path) == "/data/custom.json"
class TestProxyPoolMitmQuery:
"""Test mitm query parameter in API fetch."""
def test_mitm_false(self):
cfg = ProxyPoolConfig(sources=[])
pool = ProxyPool(cfg, [], timeout=10.0)
src = PoolSourceConfig(url="http://api:8081/proxies/all", mitm=False)
async def run():
from unittest.mock import AsyncMock, patch
mock_ret = {"proxies": []}
with patch(
"s5p.pool.http_get_json",
new_callable=AsyncMock, return_value=mock_ret,
) as mock:
await pool._fetch_api(src)
call_url = mock.call_args[0][0]
assert "mitm=0" in call_url
asyncio.run(run())
def test_mitm_true(self):
cfg = ProxyPoolConfig(sources=[])
pool = ProxyPool(cfg, [], timeout=10.0)
src = PoolSourceConfig(url="http://api:8081/proxies/all", mitm=True)
async def run():
from unittest.mock import AsyncMock, patch
mock_ret = {"proxies": []}
with patch(
"s5p.pool.http_get_json",
new_callable=AsyncMock, return_value=mock_ret,
) as mock:
await pool._fetch_api(src)
call_url = mock.call_args[0][0]
assert "mitm=1" in call_url
asyncio.run(run())
def test_mitm_none_omitted(self):
cfg = ProxyPoolConfig(sources=[])
pool = ProxyPool(cfg, [], timeout=10.0)
src = PoolSourceConfig(url="http://api:8081/proxies/all", mitm=None)
async def run():
from unittest.mock import AsyncMock, patch
mock_ret = {"proxies": []}
with patch(
"s5p.pool.http_get_json",
new_callable=AsyncMock, return_value=mock_ret,
) as mock:
await pool._fetch_api(src)
call_url = mock.call_args[0][0]
assert "mitm" not in call_url
asyncio.run(run())
class TestProxyEntry:
"""Test ProxyEntry defaults."""
@@ -22,6 +111,38 @@ class TestProxyEntry:
assert entry.tests == 0
class TestEffectiveChain:
"""Test chain_nodes round-robin in pool health tests."""
def test_no_nodes_returns_original(self):
cfg = ProxyPoolConfig(sources=[])
chain = [ChainHop(proto="socks5", host="10.0.0.1", port=9050)]
pool = ProxyPool(cfg, chain, timeout=10.0)
assert pool._effective_chain() == chain
def test_round_robin_across_nodes(self):
cfg = ProxyPoolConfig(sources=[])
chain = [ChainHop(proto="socks5", host="original", port=9050)]
nodes = [
ChainHop(proto="socks5", host="node-a", port=9050),
ChainHop(proto="socks5", host="node-b", port=9050),
ChainHop(proto="socks5", host="node-c", port=9050),
]
pool = ProxyPool(cfg, chain, timeout=10.0, chain_nodes=nodes)
hosts = [pool._effective_chain()[0].host for _ in range(6)]
assert hosts == [
"node-a", "node-b", "node-c",
"node-a", "node-b", "node-c",
]
def test_empty_chain_no_replacement(self):
cfg = ProxyPoolConfig(sources=[])
nodes = [ChainHop(proto="socks5", host="node-a", port=9050)]
pool = ProxyPool(cfg, [], timeout=10.0, chain_nodes=nodes)
assert pool._effective_chain() == []
class TestProxyPoolMerge:
"""Test proxy deduplication and merge."""
@@ -155,6 +276,102 @@ class TestProxyPoolWeight:
pool.report_failure(hop) # should not raise
class TestDynamicConcurrency:
"""Test dynamic health test concurrency scaling."""
def test_scales_to_ten_percent(self):
import asyncio
from unittest.mock import AsyncMock, patch
cfg = ProxyPoolConfig(sources=[], test_concurrency=25)
pool = ProxyPool(cfg, [], timeout=10.0)
now = time.time()
# add 100 proxies -> effective concurrency = max(3, min(100//10, 25)) = 10
for i in range(100):
hop = ChainHop(proto="socks5", host=f"10.0.{i // 256}.{i % 256}", port=1080)
key = f"socks5://10.0.{i // 256}.{i % 256}:1080"
pool._proxies[key] = ProxyEntry(hop=hop, alive=False, last_seen=now)
captured = {}
original_semaphore = asyncio.Semaphore
def capture_semaphore(value):
captured["concurrency"] = value
return original_semaphore(value)
with (
patch.object(pool, "_test_proxy", new_callable=AsyncMock, return_value=True),
patch("s5p.pool.asyncio.Semaphore", side_effect=capture_semaphore),
):
asyncio.run(pool._run_health_tests())
assert captured["concurrency"] == 10
def test_minimum_of_three(self):
import asyncio
from unittest.mock import AsyncMock, patch
cfg = ProxyPoolConfig(sources=[], test_concurrency=25)
pool = ProxyPool(cfg, [], timeout=10.0)
now = time.time()
# 5 proxies -> 5//10=0, but min is 3
for i in range(5):
hop = ChainHop(proto="socks5", host=f"10.0.0.{i}", port=1080)
pool._proxies[f"socks5://10.0.0.{i}:1080"] = ProxyEntry(
hop=hop, alive=False, last_seen=now,
)
captured = {}
original_semaphore = asyncio.Semaphore
def capture_semaphore(value):
captured["concurrency"] = value
return original_semaphore(value)
with (
patch.object(pool, "_test_proxy", new_callable=AsyncMock, return_value=True),
patch("s5p.pool.asyncio.Semaphore", side_effect=capture_semaphore),
):
asyncio.run(pool._run_health_tests())
assert captured["concurrency"] == 3
def test_capped_by_config(self):
import asyncio
from unittest.mock import AsyncMock, patch
cfg = ProxyPoolConfig(sources=[], test_concurrency=5)
pool = ProxyPool(cfg, [], timeout=10.0)
now = time.time()
# 1000 proxies -> 1000//10=100, capped at 5
for i in range(1000):
h = f"10.{i // 65536}.{(i // 256) % 256}.{i % 256}"
hop = ChainHop(proto="socks5", host=h, port=1080)
key = str(hop)
pool._proxies[key] = ProxyEntry(hop=hop, alive=False, last_seen=now)
captured = {}
original_semaphore = asyncio.Semaphore
def capture_semaphore(value):
captured["concurrency"] = value
return original_semaphore(value)
with (
patch.object(pool, "_test_proxy", new_callable=AsyncMock, return_value=True),
patch("s5p.pool.asyncio.Semaphore", side_effect=capture_semaphore),
):
asyncio.run(pool._run_health_tests())
assert captured["concurrency"] == 5
class TestProxyPoolHealthTests:
"""Test selective health testing."""