Compare commits
5 Commits
e7de479c88
...
29b4a36863
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
29b4a36863 | ||
|
|
288bd95f62 | ||
|
|
b3966c9a9f | ||
|
|
d4e3638143 | ||
|
|
b8f7217e43 |
@@ -25,7 +25,7 @@
|
||||
- [x] Built-in control API (runtime metrics, pool state, config reload)
|
||||
- [ ] SOCKS5 server authentication (username/password)
|
||||
- [x] Tor control port integration (circuit renewal via NEWNYM)
|
||||
- [ ] Metrics (connections/sec, bytes relayed, hop latency)
|
||||
- [x] Metrics (connections/sec, bytes relayed, hop latency)
|
||||
|
||||
## v0.3.0
|
||||
|
||||
|
||||
1
TASKS.md
1
TASKS.md
@@ -48,6 +48,7 @@
|
||||
- [x] Replace HTTP health check with TLS handshake (round-robin targets, no httpbin dependency)
|
||||
|
||||
- [x] Multi-listener with configurable proxy chaining (per-port chain depth)
|
||||
- [x] Connection rate and chain latency metrics (rate/s, p50/p95/p99)
|
||||
|
||||
## Next
|
||||
- [ ] Integration tests with mock proxy server
|
||||
|
||||
@@ -20,14 +20,37 @@ chain:
|
||||
# - socks4://proxy:1080 # post-Tor SOCKS4/4a proxy
|
||||
# - http://user:pass@proxy:8080 # post-Tor HTTP CONNECT proxy
|
||||
|
||||
# Managed proxy pool -- fetches from multiple sources, health-tests,
|
||||
# and rotates alive proxies per-connection after the static chain.
|
||||
# Named proxy pools -- each pool has its own sources, health tests,
|
||||
# and state file. Listeners reference pools by name via the "pool:" key.
|
||||
#
|
||||
# proxy_pools:
|
||||
# clean: # MITM-free proxies
|
||||
# sources:
|
||||
# - url: http://10.200.1.250:8081/proxies/all
|
||||
# mitm: false # filter: mitm=0 query param
|
||||
# state_file: /data/pool-clean.json
|
||||
# refresh: 300
|
||||
# test_interval: 120
|
||||
# test_timeout: 8
|
||||
# max_fails: 3
|
||||
# mitm: # MITM-capable proxies
|
||||
# sources:
|
||||
# - url: http://10.200.1.250:8081/proxies/all
|
||||
# mitm: true # filter: mitm=1 query param
|
||||
# state_file: /data/pool-mitm.json
|
||||
# refresh: 300
|
||||
# test_interval: 120
|
||||
# test_timeout: 8
|
||||
# max_fails: 3
|
||||
|
||||
# Single proxy pool (legacy, still supported -- becomes pool "default"):
|
||||
# proxy_pool:
|
||||
# sources:
|
||||
# - url: http://10.200.1.250:8081/proxies
|
||||
# proto: socks5 # optional: filter by protocol
|
||||
# country: US # optional: filter by country
|
||||
# limit: 1000 # optional: max proxies to fetch
|
||||
# mitm: false # optional: filter by MITM status (true/false)
|
||||
# - file: /etc/s5p/proxies.txt # text file, one proxy URL per line
|
||||
# refresh: 300 # re-fetch sources interval (seconds)
|
||||
# test_interval: 120 # health test cycle interval (seconds)
|
||||
@@ -36,7 +59,7 @@ chain:
|
||||
# - www.cloudflare.com
|
||||
# - www.amazon.com
|
||||
# test_timeout: 15 # per-test timeout (seconds)
|
||||
# test_concurrency: 5 # parallel health tests
|
||||
# test_concurrency: 25 # max parallel tests (auto-scales to ~10% of pool)
|
||||
# max_fails: 3 # consecutive fails before eviction
|
||||
# state_file: "" # empty = ~/.cache/s5p/pool.json
|
||||
# report_url: "" # POST dead proxies here (optional)
|
||||
@@ -50,26 +73,45 @@ chain:
|
||||
# cookie_file: "" # CookieAuthentication file path
|
||||
# newnym_interval: 0 # periodic NEWNYM (seconds, 0 = manual only)
|
||||
|
||||
# Multi-listener mode -- each listener gets its own address and chain.
|
||||
# The "pool" keyword in a chain appends a random alive proxy from the pool.
|
||||
# Multi-Tor round-robin -- distribute traffic across multiple Tor nodes.
|
||||
# When present, the first hop in each listener's chain is replaced at
|
||||
# connection time by round-robin selection from this list.
|
||||
# tor_nodes:
|
||||
# - socks5://10.200.1.1:9050
|
||||
# - socks5://10.200.1.254:9050
|
||||
# - socks5://10.200.1.250:9050
|
||||
# - socks5://10.200.1.13:9050
|
||||
|
||||
# Multi-listener mode -- each listener gets its own address, chain,
|
||||
# and optional pool assignment. The "pool" keyword in a chain appends
|
||||
# a random alive proxy from the named pool (or "default" if unnamed).
|
||||
# Multiple "pool" entries = multiple pool hops (deeper chaining).
|
||||
#
|
||||
# listeners:
|
||||
# - listen: 0.0.0.0:1080
|
||||
# pool: clean # draw from "clean" pool
|
||||
# chain:
|
||||
# - socks5://127.0.0.1:9050
|
||||
# - pool # Tor + 2 random pool proxies
|
||||
# - pool # Tor + 2 clean pool proxies
|
||||
# - pool
|
||||
#
|
||||
# - listen: 0.0.0.0:1081
|
||||
# pool: clean
|
||||
# chain:
|
||||
# - socks5://127.0.0.1:9050
|
||||
# - pool # Tor + 1 random pool proxy
|
||||
# - pool # Tor + 1 clean pool proxy
|
||||
#
|
||||
# - listen: 0.0.0.0:1082
|
||||
# chain:
|
||||
# - socks5://127.0.0.1:9050 # Tor only (no pool hops)
|
||||
#
|
||||
# - listen: 0.0.0.0:1083
|
||||
# pool: mitm # draw from "mitm" pool
|
||||
# chain:
|
||||
# - socks5://127.0.0.1:9050
|
||||
# - pool # Tor + 2 MITM pool proxies
|
||||
# - pool
|
||||
#
|
||||
# When using "listeners:", the top-level "listen" and "chain" keys are ignored.
|
||||
# If "listeners:" is absent, the old format is used (single listener).
|
||||
|
||||
|
||||
@@ -43,17 +43,35 @@ cp config/example.yaml config/s5p.yaml # create live config (gitignored)
|
||||
```yaml
|
||||
listeners:
|
||||
- listen: 0.0.0.0:1080
|
||||
pool: clean # named pool assignment
|
||||
chain:
|
||||
- socks5://127.0.0.1:9050
|
||||
- pool # Tor + 2 pool hops
|
||||
- pool # Tor + 2 clean hops
|
||||
- pool
|
||||
- listen: 0.0.0.0:1081
|
||||
pool: clean
|
||||
chain:
|
||||
- socks5://127.0.0.1:9050
|
||||
- pool # Tor + 1 pool hop
|
||||
- pool # Tor + 1 clean hop
|
||||
- listen: 0.0.0.0:1082
|
||||
chain:
|
||||
- socks5://127.0.0.1:9050 # Tor only
|
||||
- listen: 0.0.0.0:1083
|
||||
pool: mitm # MITM-capable proxies
|
||||
chain:
|
||||
- socks5://127.0.0.1:9050
|
||||
- pool
|
||||
- pool
|
||||
```
|
||||
|
||||
## Multi-Tor Round-Robin (config)
|
||||
|
||||
```yaml
|
||||
tor_nodes: # overrides first hop in all listeners
|
||||
- socks5://10.200.1.1:9050
|
||||
- socks5://10.200.1.254:9050
|
||||
- socks5://10.200.1.250:9050
|
||||
- socks5://10.200.1.13:9050
|
||||
```
|
||||
|
||||
## Performance Tuning (config)
|
||||
@@ -64,25 +82,39 @@ pool_size: 8 # pre-warmed TCP conns to first hop (0 = off)
|
||||
pool_max_idle: 30 # evict idle pooled conns (seconds)
|
||||
```
|
||||
|
||||
## Proxy Pool (config)
|
||||
## Named Proxy Pools (config)
|
||||
|
||||
```yaml
|
||||
proxy_pool:
|
||||
sources:
|
||||
- url: http://10.200.1.250:8081/proxies
|
||||
proto: socks5
|
||||
limit: 1000
|
||||
- file: /etc/s5p/proxies.txt
|
||||
refresh: 300 # re-fetch interval
|
||||
test_interval: 120 # health test cycle
|
||||
test_targets: # TLS handshake targets (round-robin)
|
||||
- www.google.com
|
||||
- www.cloudflare.com
|
||||
- www.amazon.com
|
||||
max_fails: 3 # evict after N fails
|
||||
report_url: "" # POST dead proxies (optional)
|
||||
proxy_pools:
|
||||
clean:
|
||||
sources:
|
||||
- url: http://10.200.1.250:8081/proxies/all
|
||||
mitm: false # adds ?mitm=0
|
||||
state_file: /data/pool-clean.json
|
||||
refresh: 300
|
||||
test_interval: 120
|
||||
max_fails: 3
|
||||
mitm:
|
||||
sources:
|
||||
- url: http://10.200.1.250:8081/proxies/all
|
||||
mitm: true # adds ?mitm=1
|
||||
state_file: /data/pool-mitm.json
|
||||
refresh: 300
|
||||
test_interval: 120
|
||||
max_fails: 3
|
||||
```
|
||||
|
||||
Singular `proxy_pool:` still works (becomes pool "default").
|
||||
|
||||
## Source Filters (proxy_pool sources)
|
||||
|
||||
| Filter | Values | Query param |
|
||||
|--------|--------|-------------|
|
||||
| `proto` | socks5/socks4/http | `?proto=...` |
|
||||
| `country` | ISO alpha-2 | `?country=...` |
|
||||
| `limit` | integer | `?limit=...` |
|
||||
| `mitm` | true/false | `?mitm=1` / `?mitm=0` |
|
||||
|
||||
## Tor Control Port (config)
|
||||
|
||||
```yaml
|
||||
@@ -164,9 +196,31 @@ python -m pstats ~/.cache/s5p/s5p.prof # container profile output
|
||||
## Metrics Log
|
||||
|
||||
```
|
||||
metrics: conn=142 ok=98 fail=44 retries=88 active=3 in=1.2M out=4.5M up=0h05m12s pool=42/65
|
||||
metrics: conn=1842 ok=1790 fail=52 retries=67 active=3 in=50.0M out=1.0G rate=4.72/s p50=198.3ms p95=890.1ms up=1h01m01s pool=42/65
|
||||
```
|
||||
|
||||
## Metrics JSON (`/metrics`)
|
||||
|
||||
```bash
|
||||
curl -s http://127.0.0.1:1081/metrics | jq .
|
||||
```
|
||||
|
||||
```json
|
||||
{
|
||||
"connections": 1842,
|
||||
"success": 1790,
|
||||
"rate": 4.72,
|
||||
"latency": {"count": 1000, "min": 45.2, "max": 2841.7, "avg": 312.4, "p50": 198.3, "p95": 890.1, "p99": 1523.6},
|
||||
"listener_latency": {
|
||||
"0.0.0.0:1080": {"count": 500, "p50": 1800.2, "p95": 8200.1, "...": "..."},
|
||||
"0.0.0.0:1081": {"count": 300, "p50": 1000.1, "p95": 3500.2, "...": "..."},
|
||||
"0.0.0.0:1082": {"count": 200, "p50": 400.1, "p95": 1200.5, "...": "..."}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Per-listener latency also appears in `/status` under each listener entry.
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
| Symptom | Check |
|
||||
|
||||
225
docs/USAGE.md
225
docs/USAGE.md
@@ -47,77 +47,146 @@ pool_size: 0 # pre-warmed TCP connections to first hop (0 = disable
|
||||
pool_max_idle: 30 # max idle time for pooled connections (seconds)
|
||||
api_listen: "" # control API bind address (empty = disabled)
|
||||
|
||||
# Multi-listener (each port gets its own chain depth)
|
||||
# Named proxy pools (each with its own sources and filters)
|
||||
proxy_pools:
|
||||
clean:
|
||||
sources:
|
||||
- url: http://10.200.1.250:8081/proxies/all
|
||||
mitm: false
|
||||
refresh: 300
|
||||
test_interval: 120
|
||||
test_timeout: 8
|
||||
max_fails: 3
|
||||
|
||||
# Multi-listener (each port gets its own chain depth and pool)
|
||||
listeners:
|
||||
- listen: 0.0.0.0:1080
|
||||
pool: clean
|
||||
chain:
|
||||
- socks5://127.0.0.1:9050
|
||||
- pool # Tor + 2 pool proxies
|
||||
- pool # Tor + 2 clean proxies
|
||||
- pool
|
||||
- listen: 0.0.0.0:1081
|
||||
pool: clean
|
||||
chain:
|
||||
- socks5://127.0.0.1:9050
|
||||
- pool # Tor + 1 pool proxy
|
||||
- pool # Tor + 1 clean proxy
|
||||
|
||||
# Or single-listener (old format):
|
||||
# listen: 127.0.0.1:1080
|
||||
# chain:
|
||||
# - socks5://127.0.0.1:9050
|
||||
|
||||
proxy_pool:
|
||||
sources:
|
||||
- url: http://10.200.1.250:8081/proxies
|
||||
proto: socks5
|
||||
limit: 1000
|
||||
- file: /etc/s5p/proxies.txt
|
||||
refresh: 300
|
||||
test_interval: 120
|
||||
test_targets: # TLS handshake targets (round-robin)
|
||||
- www.google.com
|
||||
- www.cloudflare.com
|
||||
- www.amazon.com
|
||||
test_timeout: 15
|
||||
test_concurrency: 5
|
||||
max_fails: 3
|
||||
state_file: "" # empty = ~/.cache/s5p/pool.json
|
||||
```
|
||||
|
||||
## Multi-Tor Round-Robin
|
||||
|
||||
Distribute traffic across multiple Tor nodes instead of funneling everything
|
||||
through a single one. When `tor_nodes` is configured, the first hop in each
|
||||
listener's chain is replaced at connection time by round-robin selection.
|
||||
Health tests also rotate across all nodes.
|
||||
|
||||
```yaml
|
||||
tor_nodes:
|
||||
- socks5://10.200.1.1:9050
|
||||
- socks5://10.200.1.254:9050
|
||||
- socks5://10.200.1.250:9050
|
||||
- socks5://10.200.1.13:9050
|
||||
```
|
||||
|
||||
When `tor_nodes` is absent, listeners use their configured first hop as before.
|
||||
When present, `tor_nodes` overrides the first hop everywhere.
|
||||
|
||||
If `pool_size > 0`, pre-warmed connection pools are created for all nodes
|
||||
automatically.
|
||||
|
||||
### API
|
||||
|
||||
`tor_nodes` appears in both `/config` and `/status` responses:
|
||||
|
||||
```bash
|
||||
curl -s http://127.0.0.1:1090/config | jq '.tor_nodes'
|
||||
curl -s http://127.0.0.1:1090/status | jq '.tor_nodes'
|
||||
```
|
||||
|
||||
## Named Proxy Pools
|
||||
|
||||
Define multiple proxy pools with different source filters. Each listener can
|
||||
reference a specific pool by name via the `pool:` key.
|
||||
|
||||
```yaml
|
||||
proxy_pools:
|
||||
clean:
|
||||
sources:
|
||||
- url: http://10.200.1.250:8081/proxies/all
|
||||
mitm: false
|
||||
state_file: /data/pool-clean.json
|
||||
refresh: 300
|
||||
test_interval: 120
|
||||
test_timeout: 8
|
||||
max_fails: 3
|
||||
mitm:
|
||||
sources:
|
||||
- url: http://10.200.1.250:8081/proxies/all
|
||||
mitm: true
|
||||
state_file: /data/pool-mitm.json
|
||||
refresh: 300
|
||||
test_interval: 120
|
||||
test_timeout: 8
|
||||
max_fails: 3
|
||||
```
|
||||
|
||||
Each pool has independent health testing, state persistence, and source
|
||||
refresh cycles. The `mitm` source filter adds `?mitm=0` or `?mitm=1` to
|
||||
API requests.
|
||||
|
||||
### Backward compatibility
|
||||
|
||||
The singular `proxy_pool:` key still works -- it registers as pool `"default"`.
|
||||
If both `proxy_pool:` and `proxy_pools:` are present, `proxy_pools:` wins;
|
||||
the singular is registered as `"default"` only when not already defined.
|
||||
|
||||
## Multi-Listener Mode
|
||||
|
||||
Run multiple listeners on different ports, each with a different number
|
||||
of proxy hops after the static chain. Config-file only (not available via CLI).
|
||||
of proxy hops and pool assignment. Config-file only (not available via CLI).
|
||||
|
||||
```yaml
|
||||
listeners:
|
||||
- listen: 0.0.0.0:1080
|
||||
pool: clean
|
||||
chain:
|
||||
- socks5://10.200.1.13:9050
|
||||
- pool # Tor + 2 pool proxies
|
||||
- pool # Tor + 2 clean proxies
|
||||
- pool
|
||||
|
||||
- listen: 0.0.0.0:1081
|
||||
pool: clean
|
||||
chain:
|
||||
- socks5://10.200.1.13:9050
|
||||
- pool # Tor + 1 pool proxy
|
||||
- pool # Tor + 1 clean proxy
|
||||
|
||||
- listen: 0.0.0.0:1082
|
||||
chain:
|
||||
- socks5://10.200.1.13:9050 # Tor only
|
||||
- socks5://10.200.1.13:9050 # Tor only (no pool)
|
||||
|
||||
proxy_pool:
|
||||
sources:
|
||||
- url: http://10.200.1.250:8081/proxies/all?mitm=0
|
||||
refresh: 300
|
||||
test_interval: 120
|
||||
max_fails: 3
|
||||
- listen: 0.0.0.0:1083
|
||||
pool: mitm
|
||||
chain:
|
||||
- socks5://10.200.1.13:9050
|
||||
- pool # Tor + 2 MITM proxies
|
||||
- pool
|
||||
```
|
||||
|
||||
The `pool` keyword in a chain means "append a random alive proxy from the
|
||||
shared pool". Multiple `pool` entries = multiple pool hops (deeper chaining).
|
||||
assigned pool". Multiple `pool` entries = multiple pool hops (deeper chaining).
|
||||
|
||||
When `pool:` is omitted on a listener with pool hops, it defaults to
|
||||
`"default"`. A listener referencing an unknown pool name causes a fatal
|
||||
error at startup. Listeners without pool hops ignore the `pool:` key.
|
||||
|
||||
| Resource | Scope | Notes |
|
||||
|----------|-------|-------|
|
||||
| ProxyPool | shared | All listeners draw from one pool |
|
||||
| ProxyPool | per name | Each named pool is independent |
|
||||
| TorController | shared | One Tor instance |
|
||||
| Metrics | shared | Aggregate stats across listeners |
|
||||
| Semaphore | shared | Global `max_connections` cap |
|
||||
@@ -172,6 +241,7 @@ proxy_pool:
|
||||
proto: socks5 # optional: filter by protocol
|
||||
country: US # optional: filter by country
|
||||
limit: 1000 # max proxies to fetch from API
|
||||
mitm: false # optional: filter by MITM status (true/false)
|
||||
- file: /etc/s5p/proxies.txt # text file, one proxy URL per line
|
||||
refresh: 300 # re-fetch sources every 300 seconds
|
||||
test_interval: 120 # health test cycle every 120 seconds
|
||||
@@ -180,9 +250,9 @@ proxy_pool:
|
||||
- www.cloudflare.com
|
||||
- www.amazon.com
|
||||
test_timeout: 15 # per-test timeout (seconds)
|
||||
test_concurrency: 5 # parallel health tests
|
||||
test_concurrency: 25 # max parallel tests (auto-scales to ~10% of pool)
|
||||
max_fails: 3 # evict after N consecutive failures
|
||||
state_file: "" # empty = ~/.cache/s5p/pool.json
|
||||
state_file: "" # empty = ~/.cache/s5p/pool[-name].json
|
||||
report_url: "" # POST dead proxies here (optional)
|
||||
```
|
||||
|
||||
@@ -193,6 +263,17 @@ proxy_pool:
|
||||
| HTTP API | `url` | JSON: `{"proxies": [{"proto": "socks5", "proxy": "host:port"}, ...]}` |
|
||||
| Text file | `file` | One proxy URL per line, `#` comments, blank lines ignored |
|
||||
|
||||
### Source filters
|
||||
|
||||
| Filter | Values | Effect |
|
||||
|--------|--------|--------|
|
||||
| `proto` | `socks5`, `socks4`, `http` | Adds `?proto=...` to API URL |
|
||||
| `country` | ISO 3166-1 alpha-2 | Adds `?country=...` to API URL |
|
||||
| `limit` | integer | Adds `?limit=...` to API URL |
|
||||
| `mitm` | `true` / `false` | Adds `?mitm=1` / `?mitm=0` to API URL |
|
||||
|
||||
The `mitm` filter is silently ignored for file sources.
|
||||
|
||||
### Proxy file format
|
||||
|
||||
```
|
||||
@@ -209,6 +290,10 @@ by performing a TLS handshake against one of the `test_targets` (rotated
|
||||
round-robin). A successful handshake marks the proxy alive. After `max_fails`
|
||||
consecutive failures, a proxy is evicted.
|
||||
|
||||
Concurrency auto-scales to ~10% of the proxy count, capped by
|
||||
`test_concurrency` (default 25, minimum 3). For example, a pool of 73 proxies
|
||||
tests 7 at a time rather than saturating the upstream Tor node.
|
||||
|
||||
Before each health test cycle, the static chain is tested without any pool
|
||||
proxy. If the chain itself is unreachable (e.g., Tor is down), proxy tests
|
||||
are skipped entirely and a warning is logged. This prevents false mass-failure
|
||||
@@ -307,7 +392,7 @@ s5p --api 127.0.0.1:1081 -c config/s5p.yaml
|
||||
| Method | Path | Description |
|
||||
|--------|------|-------------|
|
||||
| `GET` | `/status` | Combined summary: uptime, metrics, pool stats, chain |
|
||||
| `GET` | `/metrics` | Full metrics counters (connections, bytes, uptime) |
|
||||
| `GET` | `/metrics` | Full metrics counters (connections, bytes, rate, latency) |
|
||||
| `GET` | `/pool` | All proxies with per-entry state |
|
||||
| `GET` | `/pool/alive` | Alive proxies only |
|
||||
| `GET` | `/config` | Current runtime config (sanitized) |
|
||||
@@ -461,7 +546,7 @@ s5p tracks connection metrics and logs a summary every 60 seconds and on
|
||||
shutdown:
|
||||
|
||||
```
|
||||
metrics: conn=142 ok=98 fail=44 retries=88 active=3 in=1.2M out=4.5M up=0h05m12s pool=42/65
|
||||
metrics: conn=1842 ok=1790 fail=52 retries=67 active=3 in=50.0M out=1.0G rate=4.72/s p50=198.3ms p95=890.1ms up=1h01m01s pool=42/65
|
||||
```
|
||||
|
||||
| Counter | Meaning |
|
||||
@@ -473,9 +558,77 @@ metrics: conn=142 ok=98 fail=44 retries=88 active=3 in=1.2M out=4.5M up=0h05m12s
|
||||
| `active` | Currently relaying |
|
||||
| `in` | Bytes client -> remote |
|
||||
| `out` | Bytes remote -> client |
|
||||
| `rate` | Connection rate (events/sec, rolling window) |
|
||||
| `p50` | Median chain setup latency in ms |
|
||||
| `p95` | 95th percentile chain setup latency in ms |
|
||||
| `up` | Server uptime |
|
||||
| `pool` | Alive/total proxies (only when pool is active) |
|
||||
|
||||
### `/metrics` JSON response
|
||||
|
||||
`GET /metrics` returns all counters plus rate, latency percentiles, and
|
||||
per-listener latency breakdowns:
|
||||
|
||||
```json
|
||||
{
|
||||
"connections": 1842,
|
||||
"success": 1790,
|
||||
"failed": 52,
|
||||
"retries": 67,
|
||||
"active": 3,
|
||||
"bytes_in": 52428800,
|
||||
"bytes_out": 1073741824,
|
||||
"uptime": 3661.2,
|
||||
"rate": 4.72,
|
||||
"latency": {
|
||||
"count": 1000,
|
||||
"min": 45.2,
|
||||
"max": 2841.7,
|
||||
"avg": 312.4,
|
||||
"p50": 198.3,
|
||||
"p95": 890.1,
|
||||
"p99": 1523.6
|
||||
},
|
||||
"listener_latency": {
|
||||
"0.0.0.0:1080": {"count": 500, "min": 800.1, "max": 12400.3, "avg": 2100.5, "p50": 1800.2, "p95": 8200.1, "p99": 10500.3},
|
||||
"0.0.0.0:1081": {"count": 300, "min": 400.5, "max": 5200.1, "avg": 1200.3, "p50": 1000.1, "p95": 3500.2, "p99": 4800.7},
|
||||
"0.0.0.0:1082": {"count": 200, "min": 150.2, "max": 2000.1, "avg": 500.3, "p50": 400.1, "p95": 1200.5, "p99": 1800.2}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
| Field | Type | Description |
|
||||
|-------|------|-------------|
|
||||
| `rate` | float | Connections/sec (rolling window of last 256 events) |
|
||||
| `latency` | object/null | Aggregate chain setup latency in ms (null if no samples) |
|
||||
| `latency.count` | int | Number of samples in buffer (max 1000) |
|
||||
| `latency.p50` | float | Median latency (ms) |
|
||||
| `latency.p95` | float | 95th percentile (ms) |
|
||||
| `latency.p99` | float | 99th percentile (ms) |
|
||||
| `listener_latency` | object | Per-listener latency, keyed by `host:port` |
|
||||
|
||||
### Per-listener latency
|
||||
|
||||
Each listener tracks chain setup latency independently. The `/status`
|
||||
endpoint includes a `latency` field on each listener entry:
|
||||
|
||||
```json
|
||||
{
|
||||
"listeners": [
|
||||
{
|
||||
"listen": "0.0.0.0:1080",
|
||||
"chain": ["socks5://10.200.1.13:9050"],
|
||||
"pool_hops": 2,
|
||||
"latency": {"count": 500, "p50": 1800.2, "p95": 8200.1, "...": "..."}
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
The aggregate `latency` in `/metrics` combines all listeners. Use
|
||||
`listener_latency` or the per-listener `latency` in `/status` to
|
||||
isolate latency by chain depth.
|
||||
|
||||
## Profiling
|
||||
|
||||
```bash
|
||||
|
||||
121
src/s5p/api.py
121
src/s5p/api.py
@@ -63,17 +63,34 @@ def _handle_status(ctx: dict) -> tuple[int, dict]:
|
||||
"active": metrics.active,
|
||||
"bytes_in": metrics.bytes_in,
|
||||
"bytes_out": metrics.bytes_out,
|
||||
"rate": round(metrics.conn_rate.rate(), 2),
|
||||
"latency": metrics.latency.stats(),
|
||||
}
|
||||
pool = ctx.get("pool")
|
||||
if pool:
|
||||
pools: dict = ctx.get("pools") or {}
|
||||
if pools:
|
||||
total_alive = sum(p.alive_count for p in pools.values())
|
||||
total_count = sum(p.count for p in pools.values())
|
||||
data["pool"] = {"alive": total_alive, "total": total_count}
|
||||
data["pools"] = {
|
||||
name: {"alive": p.alive_count, "total": p.count}
|
||||
for name, p in pools.items()
|
||||
}
|
||||
elif ctx.get("pool"):
|
||||
pool = ctx["pool"]
|
||||
data["pool"] = {"alive": pool.alive_count, "total": pool.count}
|
||||
config = ctx.get("config")
|
||||
if config and config.tor_nodes:
|
||||
data["tor_nodes"] = [str(n) for n in config.tor_nodes]
|
||||
if config:
|
||||
data["listeners"] = [
|
||||
{
|
||||
"listen": f"{lc.listen_host}:{lc.listen_port}",
|
||||
"chain": [str(h) for h in lc.chain],
|
||||
"pool_hops": lc.pool_hops,
|
||||
**({"pool": lc.pool_name} if lc.pool_name else {}),
|
||||
"latency": metrics.get_listener_latency(
|
||||
f"{lc.listen_host}:{lc.listen_port}"
|
||||
).stats(),
|
||||
}
|
||||
for lc in config.listeners
|
||||
]
|
||||
@@ -87,27 +104,48 @@ def _handle_metrics(ctx: dict) -> tuple[int, dict]:
|
||||
|
||||
def _handle_pool(ctx: dict, alive_only: bool = False) -> tuple[int, dict]:
|
||||
"""GET /pool or /pool/alive -- proxy pool state."""
|
||||
pool = ctx.get("pool")
|
||||
if not pool:
|
||||
pools: dict = ctx.get("pools") or {}
|
||||
pool_list = list(pools.values()) if pools else []
|
||||
# backward compat: fall back to single "pool" key
|
||||
if not pool_list and ctx.get("pool"):
|
||||
pool_list = [ctx["pool"]]
|
||||
|
||||
if not pool_list:
|
||||
return 200, {"alive": 0, "total": 0, "proxies": {}}
|
||||
|
||||
multi = len(pool_list) > 1
|
||||
proxies = {}
|
||||
for key, entry in pool._proxies.items():
|
||||
if alive_only and not entry.alive:
|
||||
continue
|
||||
proxies[key] = {
|
||||
"alive": entry.alive,
|
||||
"fails": entry.fails,
|
||||
"tests": entry.tests,
|
||||
"last_ok": entry.last_ok,
|
||||
"last_test": entry.last_test,
|
||||
"last_seen": entry.last_seen,
|
||||
}
|
||||
return 200, {
|
||||
"alive": pool.alive_count,
|
||||
"total": pool.count,
|
||||
total_alive = 0
|
||||
total_count = 0
|
||||
for p in pool_list:
|
||||
total_alive += p.alive_count
|
||||
total_count += p.count
|
||||
for key, entry in p._proxies.items():
|
||||
if alive_only and not entry.alive:
|
||||
continue
|
||||
rec: dict = {
|
||||
"alive": entry.alive,
|
||||
"fails": entry.fails,
|
||||
"tests": entry.tests,
|
||||
"last_ok": entry.last_ok,
|
||||
"last_test": entry.last_test,
|
||||
"last_seen": entry.last_seen,
|
||||
}
|
||||
if multi:
|
||||
rec["pool"] = p.name
|
||||
proxies[key] = rec
|
||||
|
||||
result: dict = {
|
||||
"alive": total_alive,
|
||||
"total": total_count,
|
||||
"proxies": proxies,
|
||||
}
|
||||
if multi:
|
||||
result["pools"] = {
|
||||
p.name: {"alive": p.alive_count, "total": p.count}
|
||||
for p in pool_list
|
||||
}
|
||||
return 200, result
|
||||
|
||||
|
||||
def _handle_config(ctx: dict) -> tuple[int, dict]:
|
||||
@@ -127,15 +165,38 @@ def _handle_config(ctx: dict) -> tuple[int, dict]:
|
||||
"listen": f"{lc.listen_host}:{lc.listen_port}",
|
||||
"chain": [str(h) for h in lc.chain],
|
||||
"pool_hops": lc.pool_hops,
|
||||
**({"pool": lc.pool_name} if lc.pool_name else {}),
|
||||
}
|
||||
for lc in config.listeners
|
||||
],
|
||||
}
|
||||
if config.proxy_pool:
|
||||
if config.tor_nodes:
|
||||
data["tor_nodes"] = [str(n) for n in config.tor_nodes]
|
||||
if config.proxy_pools:
|
||||
pools_data: dict = {}
|
||||
for name, pp in config.proxy_pools.items():
|
||||
sources = []
|
||||
for src in pp.sources:
|
||||
s: dict = {}
|
||||
if src.url:
|
||||
s["url"] = src.url
|
||||
if src.file:
|
||||
s["file"] = src.file
|
||||
if src.mitm is not None:
|
||||
s["mitm"] = src.mitm
|
||||
sources.append(s)
|
||||
pools_data[name] = {
|
||||
"sources": sources,
|
||||
"refresh": pp.refresh,
|
||||
"test_interval": pp.test_interval,
|
||||
"max_fails": pp.max_fails,
|
||||
}
|
||||
data["proxy_pools"] = pools_data
|
||||
elif config.proxy_pool:
|
||||
pp = config.proxy_pool
|
||||
sources = []
|
||||
for src in pp.sources:
|
||||
s: dict = {}
|
||||
s = {}
|
||||
if src.url:
|
||||
s["url"] = src.url
|
||||
if src.file:
|
||||
@@ -164,19 +225,27 @@ async def _handle_reload(ctx: dict) -> tuple[int, dict]:
|
||||
|
||||
async def _handle_pool_test(ctx: dict) -> tuple[int, dict]:
|
||||
"""POST /pool/test -- trigger immediate health test."""
|
||||
pool = ctx.get("pool")
|
||||
if not pool:
|
||||
pools: dict = ctx.get("pools") or {}
|
||||
pool_list = list(pools.values()) if pools else []
|
||||
if not pool_list and ctx.get("pool"):
|
||||
pool_list = [ctx["pool"]]
|
||||
if not pool_list:
|
||||
return 400, {"error": "no proxy pool configured"}
|
||||
asyncio.create_task(pool._run_health_tests())
|
||||
for p in pool_list:
|
||||
asyncio.create_task(p._run_health_tests())
|
||||
return 200, {"ok": True}
|
||||
|
||||
|
||||
async def _handle_pool_refresh(ctx: dict) -> tuple[int, dict]:
|
||||
"""POST /pool/refresh -- trigger immediate source re-fetch."""
|
||||
pool = ctx.get("pool")
|
||||
if not pool:
|
||||
pools: dict = ctx.get("pools") or {}
|
||||
pool_list = list(pools.values()) if pools else []
|
||||
if not pool_list and ctx.get("pool"):
|
||||
pool_list = [ctx["pool"]]
|
||||
if not pool_list:
|
||||
return 400, {"error": "no proxy pool configured"}
|
||||
asyncio.create_task(pool._fetch_all_sources())
|
||||
for p in pool_list:
|
||||
asyncio.create_task(p._fetch_all_sources())
|
||||
return 200, {"ok": True}
|
||||
|
||||
|
||||
|
||||
@@ -36,6 +36,7 @@ class PoolSourceConfig:
|
||||
proto: str | None = None
|
||||
country: str | None = None
|
||||
limit: int | None = 1000
|
||||
mitm: bool | None = None
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -52,7 +53,7 @@ class ProxyPoolConfig:
|
||||
"www.amazon.com",
|
||||
])
|
||||
test_timeout: float = 15.0
|
||||
test_concurrency: int = 5
|
||||
test_concurrency: int = 25
|
||||
max_fails: int = 3
|
||||
state_file: str = ""
|
||||
report_url: str = ""
|
||||
@@ -85,6 +86,7 @@ class ListenerConfig:
|
||||
listen_port: int = 1080
|
||||
chain: list[ChainHop] = field(default_factory=list)
|
||||
pool_hops: int = 0
|
||||
pool_name: str = ""
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -104,7 +106,9 @@ class Config:
|
||||
api_host: str = ""
|
||||
api_port: int = 0
|
||||
proxy_pool: ProxyPoolConfig | None = None
|
||||
proxy_pools: dict[str, ProxyPoolConfig] = field(default_factory=dict)
|
||||
tor: TorConfig | None = None
|
||||
tor_nodes: list[ChainHop] = field(default_factory=list)
|
||||
config_file: str = ""
|
||||
|
||||
|
||||
@@ -151,6 +155,39 @@ def parse_api_proxies(data: dict) -> list[ChainHop]:
|
||||
return proxies
|
||||
|
||||
|
||||
def _parse_pool_config(pool_raw: dict) -> ProxyPoolConfig:
|
||||
"""Parse a single proxy pool config block from YAML."""
|
||||
sources = []
|
||||
for src in pool_raw.get("sources", []):
|
||||
mitm = src.get("mitm")
|
||||
if mitm is not None:
|
||||
mitm = bool(mitm)
|
||||
sources.append(
|
||||
PoolSourceConfig(
|
||||
url=src.get("url"),
|
||||
file=src.get("file"),
|
||||
proto=src.get("proto"),
|
||||
country=src.get("country"),
|
||||
limit=src.get("limit", 1000),
|
||||
mitm=mitm,
|
||||
)
|
||||
)
|
||||
kwargs: dict = {
|
||||
"sources": sources,
|
||||
"refresh": float(pool_raw.get("refresh", 300)),
|
||||
"test_interval": float(pool_raw.get("test_interval", 120)),
|
||||
"test_url": pool_raw.get("test_url", ""),
|
||||
"test_timeout": float(pool_raw.get("test_timeout", 15)),
|
||||
"test_concurrency": int(pool_raw.get("test_concurrency", 25)),
|
||||
"max_fails": int(pool_raw.get("max_fails", 3)),
|
||||
"state_file": pool_raw.get("state_file", ""),
|
||||
"report_url": pool_raw.get("report_url", ""),
|
||||
}
|
||||
if "test_targets" in pool_raw:
|
||||
kwargs["test_targets"] = list(pool_raw["test_targets"])
|
||||
return ProxyPoolConfig(**kwargs)
|
||||
|
||||
|
||||
def load_config(path: str | Path) -> Config:
|
||||
"""Load configuration from a YAML file."""
|
||||
path = Path(path)
|
||||
@@ -210,33 +247,16 @@ def load_config(path: str | Path) -> Config:
|
||||
)
|
||||
)
|
||||
|
||||
# -- proxy pools (named) ------------------------------------------------
|
||||
if "proxy_pools" in raw:
|
||||
for name, pool_raw in raw["proxy_pools"].items():
|
||||
config.proxy_pools[name] = _parse_pool_config(pool_raw)
|
||||
|
||||
if "proxy_pool" in raw:
|
||||
pool_raw = raw["proxy_pool"]
|
||||
sources = []
|
||||
for src in pool_raw.get("sources", []):
|
||||
sources.append(
|
||||
PoolSourceConfig(
|
||||
url=src.get("url"),
|
||||
file=src.get("file"),
|
||||
proto=src.get("proto"),
|
||||
country=src.get("country"),
|
||||
limit=src.get("limit", 1000),
|
||||
)
|
||||
)
|
||||
kwargs: dict = {
|
||||
"sources": sources,
|
||||
"refresh": float(pool_raw.get("refresh", 300)),
|
||||
"test_interval": float(pool_raw.get("test_interval", 120)),
|
||||
"test_url": pool_raw.get("test_url", ""),
|
||||
"test_timeout": float(pool_raw.get("test_timeout", 15)),
|
||||
"test_concurrency": int(pool_raw.get("test_concurrency", 5)),
|
||||
"max_fails": int(pool_raw.get("max_fails", 3)),
|
||||
"state_file": pool_raw.get("state_file", ""),
|
||||
"report_url": pool_raw.get("report_url", ""),
|
||||
}
|
||||
if "test_targets" in pool_raw:
|
||||
kwargs["test_targets"] = list(pool_raw["test_targets"])
|
||||
config.proxy_pool = ProxyPoolConfig(**kwargs)
|
||||
config.proxy_pool = _parse_pool_config(raw["proxy_pool"])
|
||||
# register singular as "default" when proxy_pools doesn't already have it
|
||||
if "default" not in config.proxy_pools:
|
||||
config.proxy_pools["default"] = config.proxy_pool
|
||||
elif "proxy_source" in raw:
|
||||
# backward compat: convert legacy proxy_source to proxy_pool
|
||||
src_raw = raw["proxy_source"]
|
||||
@@ -267,6 +287,10 @@ def load_config(path: str | Path) -> Config:
|
||||
newnym_interval=float(tor_raw.get("newnym_interval", 0)),
|
||||
)
|
||||
|
||||
# -- tor_nodes -------------------------------------------------------
|
||||
if "tor_nodes" in raw:
|
||||
config.tor_nodes = [parse_proxy_url(u) for u in raw["tor_nodes"]]
|
||||
|
||||
# -- listeners -------------------------------------------------------
|
||||
if "listeners" in raw:
|
||||
for entry in raw["listeners"]:
|
||||
@@ -278,6 +302,8 @@ def load_config(path: str | Path) -> Config:
|
||||
lc.listen_port = int(port_str)
|
||||
elif isinstance(listen, (str, int)) and listen:
|
||||
lc.listen_port = int(listen)
|
||||
if "pool" in entry:
|
||||
lc.pool_name = entry["pool"]
|
||||
chain_raw = entry.get("chain", [])
|
||||
for item in chain_raw:
|
||||
if isinstance(item, str) and item.lower() == "pool":
|
||||
|
||||
@@ -3,6 +3,69 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from collections import deque
|
||||
|
||||
|
||||
class RateTracker:
|
||||
"""Rolling window event rate (events/sec).
|
||||
|
||||
Stores up to ``maxlen`` monotonic timestamps. Rate is computed
|
||||
on read as ``(n - 1) / span`` over the window -- no background timer.
|
||||
"""
|
||||
|
||||
def __init__(self, maxlen: int = 256) -> None:
|
||||
self._times: deque[float] = deque(maxlen=maxlen)
|
||||
|
||||
def record(self, now: float | None = None) -> None:
|
||||
"""Record an event at *now* (default: ``time.monotonic()``)."""
|
||||
self._times.append(now if now is not None else time.monotonic())
|
||||
|
||||
def rate(self) -> float:
|
||||
"""Return events/sec over the window, 0.0 if < 2 events."""
|
||||
n = len(self._times)
|
||||
if n < 2:
|
||||
return 0.0
|
||||
span = self._times[-1] - self._times[0]
|
||||
if span <= 0:
|
||||
return 0.0
|
||||
return (n - 1) / span
|
||||
|
||||
|
||||
class LatencyTracker:
|
||||
"""Circular buffer of latency samples with percentile stats.
|
||||
|
||||
Stores up to ``maxlen`` float-second samples. ``stats()`` sorts
|
||||
a copy on read (~0.1 ms for 1000 floats) and returns millisecond values.
|
||||
"""
|
||||
|
||||
def __init__(self, maxlen: int = 1000) -> None:
|
||||
self._samples: deque[float] = deque(maxlen=maxlen)
|
||||
|
||||
def record(self, seconds: float) -> None:
|
||||
"""Append a latency sample (in seconds)."""
|
||||
self._samples.append(seconds)
|
||||
|
||||
@property
|
||||
def count(self) -> int:
|
||||
"""Number of samples currently stored."""
|
||||
return len(self._samples)
|
||||
|
||||
def stats(self) -> dict | None:
|
||||
"""Return ``{count, min, max, avg, p50, p95, p99}`` in ms, or None."""
|
||||
n = len(self._samples)
|
||||
if n == 0:
|
||||
return None
|
||||
s = sorted(self._samples)
|
||||
ms = [v * 1000 for v in s]
|
||||
return {
|
||||
"count": n,
|
||||
"min": round(ms[0], 1),
|
||||
"max": round(ms[-1], 1),
|
||||
"avg": round(sum(ms) / n, 1),
|
||||
"p50": round(ms[int(n * 0.50)], 1),
|
||||
"p95": round(ms[min(int(n * 0.95), n - 1)], 1),
|
||||
"p99": round(ms[min(int(n * 0.99), n - 1)], 1),
|
||||
}
|
||||
|
||||
|
||||
class Metrics:
|
||||
@@ -21,16 +84,30 @@ class Metrics:
|
||||
self.bytes_out: int = 0
|
||||
self.active: int = 0
|
||||
self.started: float = time.monotonic()
|
||||
self.conn_rate: RateTracker = RateTracker()
|
||||
self.latency: LatencyTracker = LatencyTracker()
|
||||
self.listener_latency: dict[str, LatencyTracker] = {}
|
||||
|
||||
def get_listener_latency(self, key: str) -> LatencyTracker:
|
||||
"""Get or create a per-listener latency tracker."""
|
||||
if key not in self.listener_latency:
|
||||
self.listener_latency[key] = LatencyTracker()
|
||||
return self.listener_latency[key]
|
||||
|
||||
def summary(self) -> str:
|
||||
"""One-line log-friendly summary."""
|
||||
uptime = time.monotonic() - self.started
|
||||
h, rem = divmod(int(uptime), 3600)
|
||||
m, s = divmod(rem, 60)
|
||||
rate = self.conn_rate.rate()
|
||||
lat = self.latency.stats()
|
||||
p50 = f" p50={lat['p50']:.1f}ms" if lat else ""
|
||||
p95 = f" p95={lat['p95']:.1f}ms" if lat else ""
|
||||
return (
|
||||
f"conn={self.connections} ok={self.success} fail={self.failed} "
|
||||
f"retries={self.retries} active={self.active} "
|
||||
f"in={_human_bytes(self.bytes_in)} out={_human_bytes(self.bytes_out)} "
|
||||
f"rate={rate:.2f}/s{p50}{p95} "
|
||||
f"up={h}h{m:02d}m{s:02d}s"
|
||||
)
|
||||
|
||||
@@ -45,6 +122,11 @@ class Metrics:
|
||||
"bytes_in": self.bytes_in,
|
||||
"bytes_out": self.bytes_out,
|
||||
"uptime": round(time.monotonic() - self.started, 1),
|
||||
"rate": round(self.conn_rate.rate(), 2),
|
||||
"latency": self.latency.stats(),
|
||||
"listener_latency": {
|
||||
k: v.stats() for k, v in self.listener_latency.items()
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -54,10 +54,16 @@ class ProxyPool:
|
||||
cfg: ProxyPoolConfig,
|
||||
chain: list[ChainHop],
|
||||
timeout: float,
|
||||
chain_nodes: list[ChainHop] | None = None,
|
||||
name: str = "default",
|
||||
) -> None:
|
||||
self._cfg = cfg
|
||||
self._chain = list(chain)
|
||||
self._chain_nodes = chain_nodes or []
|
||||
self._chain_idx = 0
|
||||
self._timeout = timeout
|
||||
self._name = name
|
||||
self._log_prefix = f"pool[{name}]" if name != "default" else "pool"
|
||||
self._proxies: dict[str, ProxyEntry] = {}
|
||||
self._alive_keys: list[str] = []
|
||||
self._tasks: list[asyncio.Task] = []
|
||||
@@ -66,6 +72,20 @@ class ProxyPool:
|
||||
self._ssl_ctx = ssl.create_default_context()
|
||||
self._target_idx = 0
|
||||
|
||||
def _effective_chain(self) -> list[ChainHop]:
|
||||
"""Return chain with first hop rotated across tor_nodes (if configured)."""
|
||||
if not self._chain_nodes or not self._chain:
|
||||
return self._chain
|
||||
chain = list(self._chain)
|
||||
chain[0] = self._chain_nodes[self._chain_idx % len(self._chain_nodes)]
|
||||
self._chain_idx += 1
|
||||
return chain
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
"""Pool name."""
|
||||
return self._name
|
||||
|
||||
# -- public interface ----------------------------------------------------
|
||||
|
||||
async def start(self) -> None:
|
||||
@@ -87,7 +107,7 @@ class ProxyPool:
|
||||
async def reload(self, cfg: ProxyPoolConfig) -> None:
|
||||
"""Update pool config and trigger source re-fetch."""
|
||||
self._cfg = cfg
|
||||
logger.info("pool: config reloaded, re-fetching sources")
|
||||
logger.info("%s: config reloaded, re-fetching sources", self._log_prefix)
|
||||
await self._fetch_all_sources()
|
||||
self._save_state()
|
||||
|
||||
@@ -167,10 +187,10 @@ class ProxyPool:
|
||||
label = src.url or src.file or "?"
|
||||
if isinstance(result, Exception):
|
||||
err = str(result) or type(result).__name__
|
||||
logger.warning("pool: source %s failed: %s", label, err)
|
||||
logger.warning("%s: source %s failed: %s", self._log_prefix, label, err)
|
||||
else:
|
||||
kind = "fetched" if src.url else "loaded"
|
||||
logger.info("pool: %s %d proxies from %s", kind, len(result), label)
|
||||
logger.info("%s: %s %d proxies from %s", self._log_prefix, kind, len(result), label)
|
||||
proxies.extend(result)
|
||||
self._merge(proxies)
|
||||
|
||||
@@ -183,6 +203,8 @@ class ProxyPool:
|
||||
params["proto"] = src.proto
|
||||
if src.country:
|
||||
params["country"] = src.country
|
||||
if src.mitm is not None:
|
||||
params["mitm"] = "1" if src.mitm else "0"
|
||||
|
||||
url = src.url
|
||||
if params:
|
||||
@@ -196,7 +218,7 @@ class ProxyPool:
|
||||
"""Parse a text file with one proxy URL per line (runs in executor)."""
|
||||
path = Path(src.file).expanduser()
|
||||
if not path.is_file():
|
||||
logger.warning("pool: file not found: %s", path)
|
||||
logger.warning("%s: file not found: %s", self._log_prefix, path)
|
||||
return []
|
||||
|
||||
proxies: list[ChainHop] = []
|
||||
@@ -207,7 +229,7 @@ class ProxyPool:
|
||||
try:
|
||||
hop = parse_proxy_url(line)
|
||||
except ValueError as e:
|
||||
logger.debug("pool: skipping invalid line %r: %s", line, e)
|
||||
logger.debug("%s: skipping invalid line %r: %s", self._log_prefix, line, e)
|
||||
continue
|
||||
if src.proto and hop.proto != src.proto:
|
||||
continue
|
||||
@@ -268,13 +290,13 @@ class ProxyPool:
|
||||
"""Test a single proxy via TLS handshake through the full chain."""
|
||||
entry.last_test = time.time()
|
||||
entry.tests += 1
|
||||
return await self._tls_check(self._chain + [entry.hop])
|
||||
return await self._tls_check(self._effective_chain() + [entry.hop])
|
||||
|
||||
async def _test_chain(self) -> bool:
|
||||
"""Test the static chain without any pool proxy."""
|
||||
if not self._chain:
|
||||
return True
|
||||
return await self._tls_check(self._chain)
|
||||
return await self._tls_check(self._effective_chain())
|
||||
|
||||
async def _run_health_tests(self, keys: list[str] | None = None) -> None:
|
||||
"""Test proxies with bounded concurrency.
|
||||
@@ -289,7 +311,10 @@ class ProxyPool:
|
||||
if self._chain:
|
||||
chain_ok = await self._test_chain()
|
||||
if not chain_ok:
|
||||
logger.warning("pool: static chain unreachable, skipping proxy tests")
|
||||
logger.warning(
|
||||
"%s: static chain unreachable, skipping proxy tests",
|
||||
self._log_prefix,
|
||||
)
|
||||
return
|
||||
|
||||
target = (
|
||||
@@ -300,7 +325,12 @@ class ProxyPool:
|
||||
if not target:
|
||||
return
|
||||
|
||||
sem = asyncio.Semaphore(self._cfg.test_concurrency)
|
||||
effective = max(3, min(len(target) // 10, self._cfg.test_concurrency))
|
||||
sem = asyncio.Semaphore(effective)
|
||||
logger.debug(
|
||||
"%s: testing %d proxies (concurrency=%d)",
|
||||
self._log_prefix, len(target), effective,
|
||||
)
|
||||
results: dict[str, bool] = {}
|
||||
|
||||
async def _test(key: str, entry: ProxyEntry) -> None:
|
||||
@@ -329,8 +359,8 @@ class ProxyPool:
|
||||
skip_eviction = fail_rate > 0.90 and total > 10
|
||||
if skip_eviction:
|
||||
logger.warning(
|
||||
"pool: %d/%d tests failed (%.0f%%), skipping eviction",
|
||||
total - passed, total, fail_rate * 100,
|
||||
"%s: %d/%d tests failed (%.0f%%), skipping eviction",
|
||||
self._log_prefix, total - passed, total, fail_rate * 100,
|
||||
)
|
||||
|
||||
evict_keys: list[str] = []
|
||||
@@ -369,7 +399,8 @@ class ProxyPool:
|
||||
parts.append(f"stale {len(stale_keys)}")
|
||||
suffix = f" ({', '.join(parts)})" if parts else ""
|
||||
logger.info(
|
||||
"pool: %d proxies, %d alive%s",
|
||||
"%s: %d proxies, %d alive%s",
|
||||
self._log_prefix,
|
||||
len(self._proxies),
|
||||
len(self._alive_keys),
|
||||
suffix,
|
||||
@@ -393,9 +424,12 @@ class ProxyPool:
|
||||
|
||||
try:
|
||||
await http_post_json(self._cfg.report_url, {"dead": dead})
|
||||
logger.info("pool: reported %d dead proxies to %s", len(dead), self._cfg.report_url)
|
||||
logger.info(
|
||||
"%s: reported %d dead proxies to %s",
|
||||
self._log_prefix, len(dead), self._cfg.report_url,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug("pool: report failed: %s", e)
|
||||
logger.debug("%s: report failed: %s", self._log_prefix, e)
|
||||
|
||||
def _rebuild_alive(self) -> None:
|
||||
"""Rebuild the alive keys list from current state."""
|
||||
@@ -435,11 +469,12 @@ class ProxyPool:
|
||||
# -- persistence ---------------------------------------------------------
|
||||
|
||||
def _resolve_state_path(self) -> Path:
|
||||
"""Resolve state file path, defaulting to ~/.cache/s5p/pool.json."""
|
||||
"""Resolve state file path, defaulting to ~/.cache/s5p/pool[-name].json."""
|
||||
if self._cfg.state_file:
|
||||
return Path(self._cfg.state_file).expanduser()
|
||||
cache_dir = Path.home() / ".cache" / "s5p"
|
||||
return cache_dir / "pool.json"
|
||||
filename = "pool.json" if self._name == "default" else f"pool-{self._name}.json"
|
||||
return cache_dir / filename
|
||||
|
||||
def _load_state(self) -> None:
|
||||
"""Load proxy state from JSON file (warm start)."""
|
||||
@@ -448,7 +483,7 @@ class ProxyPool:
|
||||
try:
|
||||
data = json.loads(self._state_path.read_text())
|
||||
if data.get("version") != STATE_VERSION:
|
||||
logger.warning("pool: state file version mismatch, starting fresh")
|
||||
logger.warning("%s: state file version mismatch, starting fresh", self._log_prefix)
|
||||
return
|
||||
for key, entry in data.get("proxies", {}).items():
|
||||
hop = ChainHop(
|
||||
@@ -469,11 +504,11 @@ class ProxyPool:
|
||||
)
|
||||
self._rebuild_alive()
|
||||
logger.info(
|
||||
"pool: loaded state (%d proxies, %d alive)",
|
||||
len(self._proxies), len(self._alive_keys),
|
||||
"%s: loaded state (%d proxies, %d alive)",
|
||||
self._log_prefix, len(self._proxies), len(self._alive_keys),
|
||||
)
|
||||
except (json.JSONDecodeError, KeyError, TypeError, ValueError) as e:
|
||||
logger.warning("pool: corrupt state file: %s", e)
|
||||
logger.warning("%s: corrupt state file: %s", self._log_prefix, e)
|
||||
self._proxies.clear()
|
||||
self._alive_keys.clear()
|
||||
|
||||
@@ -505,4 +540,4 @@ class ProxyPool:
|
||||
tmp.write_text(json.dumps(data, indent=2))
|
||||
os.replace(tmp, self._state_path)
|
||||
except OSError as e:
|
||||
logger.warning("pool: failed to save state: %s", e)
|
||||
logger.warning("%s: failed to save state: %s", self._log_prefix, e)
|
||||
|
||||
@@ -21,6 +21,19 @@ logger = logging.getLogger("s5p")
|
||||
BUFFER_SIZE = 65536
|
||||
|
||||
|
||||
class _RoundRobin:
|
||||
"""Simple round-robin selector (single-threaded asyncio, no lock)."""
|
||||
|
||||
def __init__(self, items: list[ChainHop]) -> None:
|
||||
self._items = items
|
||||
self._idx = 0
|
||||
|
||||
def next(self) -> ChainHop:
|
||||
item = self._items[self._idx % len(self._items)]
|
||||
self._idx += 1
|
||||
return item
|
||||
|
||||
|
||||
# -- relay -------------------------------------------------------------------
|
||||
|
||||
|
||||
@@ -66,6 +79,8 @@ async def _handle_client(
|
||||
proxy_pool: ProxyPool | None = None,
|
||||
metrics: Metrics | None = None,
|
||||
first_hop_pool: FirstHopPool | None = None,
|
||||
tor_rr: _RoundRobin | None = None,
|
||||
hop_pools: dict[tuple[str, int], FirstHopPool] | None = None,
|
||||
) -> None:
|
||||
"""Handle a single SOCKS5 client connection."""
|
||||
peer = client_writer.get_extra_info("peername")
|
||||
@@ -73,6 +88,7 @@ async def _handle_client(
|
||||
|
||||
if metrics:
|
||||
metrics.connections += 1
|
||||
metrics.conn_rate.record()
|
||||
|
||||
try:
|
||||
# -- greeting --
|
||||
@@ -108,6 +124,13 @@ async def _handle_client(
|
||||
|
||||
for attempt in range(attempts):
|
||||
effective_chain = list(listener.chain)
|
||||
fhp = first_hop_pool
|
||||
if tor_rr and effective_chain:
|
||||
node = tor_rr.next()
|
||||
effective_chain[0] = node
|
||||
if hop_pools:
|
||||
fhp = hop_pools.get((node.host, node.port))
|
||||
|
||||
pool_hops: list[ChainHop] = []
|
||||
if proxy_pool and listener.pool_hops > 0:
|
||||
for _ in range(listener.pool_hops):
|
||||
@@ -122,10 +145,15 @@ async def _handle_client(
|
||||
t0 = time.monotonic()
|
||||
remote_reader, remote_writer = await build_chain(
|
||||
effective_chain, target_host, target_port,
|
||||
timeout=timeout, first_hop_pool=first_hop_pool,
|
||||
timeout=timeout, first_hop_pool=fhp,
|
||||
)
|
||||
dt = time.monotonic() - t0
|
||||
logger.debug("[%s] chain up in %.0fms", tag, dt * 1000)
|
||||
if metrics:
|
||||
metrics.latency.record(dt)
|
||||
metrics.get_listener_latency(
|
||||
f"{listener.listen_host}:{listener.listen_port}"
|
||||
).record(dt)
|
||||
break
|
||||
except (ProtoError, TimeoutError, ConnectionError, OSError) as e:
|
||||
last_err = e
|
||||
@@ -205,7 +233,7 @@ async def _handle_client(
|
||||
async def _metrics_logger(
|
||||
metrics: Metrics,
|
||||
stop: asyncio.Event,
|
||||
pool: ProxyPool | None = None,
|
||||
pools: dict[str, ProxyPool] | None = None,
|
||||
) -> None:
|
||||
"""Log metrics summary every 60 seconds."""
|
||||
while not stop.is_set():
|
||||
@@ -215,8 +243,13 @@ async def _metrics_logger(
|
||||
pass
|
||||
if not stop.is_set():
|
||||
line = metrics.summary()
|
||||
if pool:
|
||||
line += f" pool={pool.alive_count}/{pool.count}"
|
||||
if pools:
|
||||
if len(pools) == 1:
|
||||
p = next(iter(pools.values()))
|
||||
line += f" pool={p.alive_count}/{p.count}"
|
||||
else:
|
||||
for name, p in pools.items():
|
||||
line += f" pool[{name}]={p.alive_count}/{p.count}"
|
||||
logger.info("metrics: %s", line)
|
||||
|
||||
|
||||
@@ -232,13 +265,47 @@ async def serve(config: Config) -> None:
|
||||
metrics = Metrics()
|
||||
listeners = config.listeners
|
||||
|
||||
# -- shared proxy pool ---------------------------------------------------
|
||||
proxy_pool: ProxyPool | None = None
|
||||
if config.proxy_pool and config.proxy_pool.sources:
|
||||
# use first listener's chain as base chain for pool health tests
|
||||
base_chain = listeners[0].chain if listeners else config.chain
|
||||
proxy_pool = ProxyPool(config.proxy_pool, base_chain, config.timeout)
|
||||
await proxy_pool.start()
|
||||
# -- tor_nodes round-robin -----------------------------------------------
|
||||
tor_rr: _RoundRobin | None = None
|
||||
if config.tor_nodes:
|
||||
tor_rr = _RoundRobin(config.tor_nodes)
|
||||
nodes = ", ".join(str(n) for n in config.tor_nodes)
|
||||
logger.info("tor_nodes: %s (round-robin)", nodes)
|
||||
|
||||
# -- named proxy pools ---------------------------------------------------
|
||||
proxy_pools: dict[str, ProxyPool] = {}
|
||||
base_chain = listeners[0].chain if listeners else config.chain
|
||||
for pool_name, pool_cfg in config.proxy_pools.items():
|
||||
if not pool_cfg.sources:
|
||||
continue
|
||||
pool = ProxyPool(
|
||||
pool_cfg, base_chain, config.timeout,
|
||||
chain_nodes=config.tor_nodes or None,
|
||||
name=pool_name,
|
||||
)
|
||||
await pool.start()
|
||||
proxy_pools[pool_name] = pool
|
||||
|
||||
# backward compat: single proxy_pool -> "default"
|
||||
if not proxy_pools and config.proxy_pool and config.proxy_pool.sources:
|
||||
pool = ProxyPool(
|
||||
config.proxy_pool, base_chain, config.timeout,
|
||||
chain_nodes=config.tor_nodes or None,
|
||||
)
|
||||
await pool.start()
|
||||
proxy_pools["default"] = pool
|
||||
|
||||
def _pool_for(lc: ListenerConfig) -> ProxyPool | None:
|
||||
"""Resolve the proxy pool for a listener."""
|
||||
if lc.pool_hops <= 0:
|
||||
return None
|
||||
name = lc.pool_name or "default"
|
||||
if name not in proxy_pools:
|
||||
raise RuntimeError(
|
||||
f"listener {lc.listen_host}:{lc.listen_port} "
|
||||
f"references unknown pool {name!r}"
|
||||
)
|
||||
return proxy_pools[name]
|
||||
|
||||
# -- per-unique first-hop connection pools --------------------------------
|
||||
hop_pools: dict[tuple[str, int], FirstHopPool] = {}
|
||||
@@ -254,6 +321,17 @@ async def serve(config: Config) -> None:
|
||||
)
|
||||
await hp.start()
|
||||
hop_pools[key] = hp
|
||||
# create pools for all tor_nodes
|
||||
if config.tor_nodes:
|
||||
for node in config.tor_nodes:
|
||||
key = (node.host, node.port)
|
||||
if key not in hop_pools:
|
||||
hp = FirstHopPool(
|
||||
node, size=config.pool_size,
|
||||
max_idle=config.pool_max_idle,
|
||||
)
|
||||
await hp.start()
|
||||
hop_pools[key] = hp
|
||||
|
||||
def _hop_pool_for(lc: ListenerConfig) -> FirstHopPool | None:
|
||||
if not lc.chain:
|
||||
@@ -283,15 +361,17 @@ async def serve(config: Config) -> None:
|
||||
servers: list[asyncio.Server] = []
|
||||
for lc in listeners:
|
||||
hp = _hop_pool_for(lc)
|
||||
lc_pool = _pool_for(lc)
|
||||
|
||||
async def on_client(
|
||||
r: asyncio.StreamReader, w: asyncio.StreamWriter,
|
||||
_lc: ListenerConfig = lc, _hp: FirstHopPool | None = hp,
|
||||
_pool: ProxyPool | None = lc_pool,
|
||||
) -> None:
|
||||
async with sem:
|
||||
await _handle_client(
|
||||
r, w, _lc, config.timeout, config.retries,
|
||||
proxy_pool, metrics, _hp,
|
||||
_pool, metrics, _hp, tor_rr, hop_pools,
|
||||
)
|
||||
|
||||
srv = await asyncio.start_server(on_client, lc.listen_host, lc.listen_port)
|
||||
@@ -301,16 +381,21 @@ async def serve(config: Config) -> None:
|
||||
chain_desc = " -> ".join(str(h) for h in lc.chain) if lc.chain else "direct"
|
||||
nhops = lc.pool_hops
|
||||
pool_desc = f" + {nhops} pool hop{'s' if nhops != 1 else ''}" if nhops else ""
|
||||
if lc_pool and lc_pool.name != "default":
|
||||
pool_desc += f" [{lc_pool.name}]"
|
||||
logger.info("listener %s chain: %s%s", addr, chain_desc, pool_desc)
|
||||
|
||||
logger.info("max_connections=%d", config.max_connections)
|
||||
|
||||
if proxy_pool:
|
||||
nsrc = len(config.proxy_pool.sources)
|
||||
logger.info(
|
||||
"pool: %d proxies, %d alive (from %d source%s)",
|
||||
proxy_pool.count, proxy_pool.alive_count, nsrc, "s" if nsrc != 1 else "",
|
||||
)
|
||||
if proxy_pools:
|
||||
for pname, pp in proxy_pools.items():
|
||||
cfg = config.proxy_pools.get(pname, config.proxy_pool)
|
||||
nsrc = len(cfg.sources) if cfg else 0
|
||||
prefix = f"pool[{pname}]" if pname != "default" else "pool"
|
||||
logger.info(
|
||||
"%s: %d proxies, %d alive (from %d source%s)",
|
||||
prefix, pp.count, pp.alive_count, nsrc, "s" if nsrc != 1 else "",
|
||||
)
|
||||
logger.info("retries: %d", config.retries)
|
||||
|
||||
if tor:
|
||||
@@ -325,7 +410,8 @@ async def serve(config: Config) -> None:
|
||||
api_ctx: dict = {
|
||||
"config": config,
|
||||
"metrics": metrics,
|
||||
"pool": proxy_pool,
|
||||
"pools": proxy_pools,
|
||||
"pool": next(iter(proxy_pools.values()), None), # backward compat
|
||||
"hop_pools": hop_pools,
|
||||
"tor": tor,
|
||||
}
|
||||
@@ -351,8 +437,13 @@ async def serve(config: Config) -> None:
|
||||
for h in root.handlers:
|
||||
h.setLevel(level)
|
||||
logging.getLogger("s5p").setLevel(level)
|
||||
if proxy_pool and new.proxy_pool:
|
||||
await proxy_pool.reload(new.proxy_pool)
|
||||
# reload named pools (match by name)
|
||||
for pname, pp in proxy_pools.items():
|
||||
new_cfg = new.proxy_pools.get(pname)
|
||||
if new_cfg:
|
||||
await pp.reload(new_cfg)
|
||||
elif new.proxy_pool and pname == "default":
|
||||
await pp.reload(new.proxy_pool)
|
||||
logger.info("reload: config reloaded (listeners require restart)")
|
||||
|
||||
def _on_sighup() -> None:
|
||||
@@ -365,8 +456,7 @@ async def serve(config: Config) -> None:
|
||||
api_srv = await start_api(config.api_host, config.api_port, api_ctx)
|
||||
|
||||
metrics_stop = asyncio.Event()
|
||||
pool_ref = proxy_pool
|
||||
metrics_task = asyncio.create_task(_metrics_logger(metrics, metrics_stop, pool_ref))
|
||||
metrics_task = asyncio.create_task(_metrics_logger(metrics, metrics_stop, proxy_pools or None))
|
||||
|
||||
# keep all servers open until stop signal
|
||||
try:
|
||||
@@ -386,11 +476,16 @@ async def serve(config: Config) -> None:
|
||||
await tor.stop()
|
||||
for hp in hop_pools.values():
|
||||
await hp.stop()
|
||||
if proxy_pool:
|
||||
await proxy_pool.stop()
|
||||
for pp in proxy_pools.values():
|
||||
await pp.stop()
|
||||
shutdown_line = metrics.summary()
|
||||
if pool_ref:
|
||||
shutdown_line += f" pool={pool_ref.alive_count}/{pool_ref.count}"
|
||||
if proxy_pools:
|
||||
if len(proxy_pools) == 1:
|
||||
p = next(iter(proxy_pools.values()))
|
||||
shutdown_line += f" pool={p.alive_count}/{p.count}"
|
||||
else:
|
||||
for pname, p in proxy_pools.items():
|
||||
shutdown_line += f" pool[{pname}]={p.alive_count}/{p.count}"
|
||||
logger.info("metrics: %s", shutdown_line)
|
||||
metrics_stop.set()
|
||||
await metrics_task
|
||||
|
||||
@@ -82,6 +82,7 @@ class TestJsonResponse:
|
||||
def _make_ctx(
|
||||
config: Config | None = None,
|
||||
pool: MagicMock | None = None,
|
||||
pools: dict | None = None,
|
||||
tor: MagicMock | None = None,
|
||||
) -> dict:
|
||||
"""Build a mock context dict."""
|
||||
@@ -89,6 +90,7 @@ def _make_ctx(
|
||||
"config": config or Config(),
|
||||
"metrics": Metrics(),
|
||||
"pool": pool,
|
||||
"pools": pools,
|
||||
"hop_pool": None,
|
||||
"tor": tor,
|
||||
}
|
||||
@@ -109,6 +111,8 @@ class TestHandleStatus:
|
||||
assert body["connections"] == 10
|
||||
assert body["success"] == 8
|
||||
assert "uptime" in body
|
||||
assert "rate" in body
|
||||
assert "latency" in body
|
||||
|
||||
def test_with_pool(self):
|
||||
pool = MagicMock()
|
||||
@@ -133,11 +137,37 @@ class TestHandleStatus:
|
||||
],
|
||||
)
|
||||
ctx = _make_ctx(config=config)
|
||||
# record some latency for the first listener
|
||||
ctx["metrics"].get_listener_latency("0.0.0.0:1080").record(0.2)
|
||||
_, body = _handle_status(ctx)
|
||||
assert len(body["listeners"]) == 2
|
||||
assert body["listeners"][0]["chain"] == ["socks5://127.0.0.1:9050"]
|
||||
assert body["listeners"][0]["pool_hops"] == 0
|
||||
assert body["listeners"][1]["pool_hops"] == 1
|
||||
# per-listener latency present on each entry
|
||||
assert "latency" in body["listeners"][0]
|
||||
assert body["listeners"][0]["latency"]["count"] == 1
|
||||
assert "latency" in body["listeners"][1]
|
||||
assert body["listeners"][1]["latency"] is None
|
||||
|
||||
|
||||
class TestHandleStatusPools:
|
||||
"""Test GET /status with multiple named pools."""
|
||||
|
||||
def test_multi_pool_summary(self):
|
||||
pool_a = MagicMock()
|
||||
pool_a.alive_count = 5
|
||||
pool_a.count = 10
|
||||
pool_a.name = "clean"
|
||||
pool_b = MagicMock()
|
||||
pool_b.alive_count = 3
|
||||
pool_b.count = 8
|
||||
pool_b.name = "mitm"
|
||||
ctx = _make_ctx(pools={"clean": pool_a, "mitm": pool_b})
|
||||
_, body = _handle_status(ctx)
|
||||
assert body["pool"] == {"alive": 8, "total": 18}
|
||||
assert body["pools"]["clean"] == {"alive": 5, "total": 10}
|
||||
assert body["pools"]["mitm"] == {"alive": 3, "total": 8}
|
||||
|
||||
|
||||
class TestHandleMetrics:
|
||||
@@ -152,6 +182,9 @@ class TestHandleMetrics:
|
||||
assert body["connections"] == 42
|
||||
assert body["bytes_in"] == 1024
|
||||
assert "uptime" in body
|
||||
assert "rate" in body
|
||||
assert "latency" in body
|
||||
assert "listener_latency" in body
|
||||
|
||||
|
||||
class TestHandlePool:
|
||||
@@ -206,6 +239,57 @@ class TestHandlePool:
|
||||
assert "socks5://1.2.3.4:1080" in body["proxies"]
|
||||
|
||||
|
||||
class TestHandlePoolMulti:
|
||||
"""Test GET /pool with multiple named pools."""
|
||||
|
||||
def test_merges_entries(self):
|
||||
pool_a = MagicMock()
|
||||
pool_a.alive_count = 1
|
||||
pool_a.count = 1
|
||||
pool_a.name = "clean"
|
||||
entry_a = MagicMock(
|
||||
alive=True, fails=0, tests=5,
|
||||
last_ok=100.0, last_test=100.0, last_seen=100.0,
|
||||
)
|
||||
pool_a._proxies = {"socks5://1.2.3.4:1080": entry_a}
|
||||
|
||||
pool_b = MagicMock()
|
||||
pool_b.alive_count = 1
|
||||
pool_b.count = 1
|
||||
pool_b.name = "mitm"
|
||||
entry_b = MagicMock(
|
||||
alive=True, fails=0, tests=3,
|
||||
last_ok=90.0, last_test=90.0, last_seen=90.0,
|
||||
)
|
||||
pool_b._proxies = {"socks5://5.6.7.8:1080": entry_b}
|
||||
|
||||
ctx = _make_ctx(pools={"clean": pool_a, "mitm": pool_b})
|
||||
_, body = _handle_pool(ctx)
|
||||
assert body["alive"] == 2
|
||||
assert body["total"] == 2
|
||||
assert len(body["proxies"]) == 2
|
||||
assert body["proxies"]["socks5://1.2.3.4:1080"]["pool"] == "clean"
|
||||
assert body["proxies"]["socks5://5.6.7.8:1080"]["pool"] == "mitm"
|
||||
assert "pools" in body
|
||||
assert body["pools"]["clean"] == {"alive": 1, "total": 1}
|
||||
|
||||
def test_single_pool_no_pool_field(self):
|
||||
"""Single pool: no 'pool' field on entries, no 'pools' summary."""
|
||||
pool = MagicMock()
|
||||
pool.alive_count = 1
|
||||
pool.count = 1
|
||||
pool.name = "default"
|
||||
entry = MagicMock(
|
||||
alive=True, fails=0, tests=5,
|
||||
last_ok=100.0, last_test=100.0, last_seen=100.0,
|
||||
)
|
||||
pool._proxies = {"socks5://1.2.3.4:1080": entry}
|
||||
ctx = _make_ctx(pools={"default": pool})
|
||||
_, body = _handle_pool(ctx)
|
||||
assert "pool" not in body["proxies"]["socks5://1.2.3.4:1080"]
|
||||
assert "pools" not in body
|
||||
|
||||
|
||||
class TestHandleConfig:
|
||||
"""Test GET /config handler."""
|
||||
|
||||
@@ -243,6 +327,79 @@ class TestHandleConfig:
|
||||
assert body["proxy_pool"]["sources"][0]["url"] == "http://api:8081/proxies"
|
||||
assert body["listeners"][0]["pool_hops"] == 1
|
||||
|
||||
def test_with_proxy_pools(self):
|
||||
pp_clean = ProxyPoolConfig(
|
||||
sources=[PoolSourceConfig(url="http://api:8081/proxies/all", mitm=False)],
|
||||
refresh=300.0,
|
||||
test_interval=120.0,
|
||||
max_fails=3,
|
||||
)
|
||||
pp_mitm = ProxyPoolConfig(
|
||||
sources=[PoolSourceConfig(url="http://api:8081/proxies/all", mitm=True)],
|
||||
refresh=300.0,
|
||||
test_interval=120.0,
|
||||
max_fails=3,
|
||||
)
|
||||
config = Config(
|
||||
proxy_pools={"clean": pp_clean, "mitm": pp_mitm},
|
||||
listeners=[ListenerConfig(
|
||||
listen_host="0.0.0.0", listen_port=1080,
|
||||
chain=[ChainHop("socks5", "127.0.0.1", 9050)],
|
||||
pool_hops=2, pool_name="clean",
|
||||
)],
|
||||
)
|
||||
ctx = _make_ctx(config=config)
|
||||
_, body = _handle_config(ctx)
|
||||
assert "proxy_pools" in body
|
||||
assert body["proxy_pools"]["clean"]["sources"][0]["mitm"] is False
|
||||
assert body["proxy_pools"]["mitm"]["sources"][0]["mitm"] is True
|
||||
assert body["listeners"][0]["pool"] == "clean"
|
||||
|
||||
def test_with_tor_nodes(self):
|
||||
config = Config(
|
||||
tor_nodes=[
|
||||
ChainHop("socks5", "10.200.1.1", 9050),
|
||||
ChainHop("socks5", "10.200.1.13", 9050),
|
||||
],
|
||||
listeners=[ListenerConfig()],
|
||||
)
|
||||
ctx = _make_ctx(config=config)
|
||||
_, body = _handle_config(ctx)
|
||||
assert body["tor_nodes"] == [
|
||||
"socks5://10.200.1.1:9050",
|
||||
"socks5://10.200.1.13:9050",
|
||||
]
|
||||
|
||||
def test_no_tor_nodes(self):
|
||||
config = Config(listeners=[ListenerConfig()])
|
||||
ctx = _make_ctx(config=config)
|
||||
_, body = _handle_config(ctx)
|
||||
assert "tor_nodes" not in body
|
||||
|
||||
|
||||
class TestHandleStatusTorNodes:
|
||||
"""Test tor_nodes in GET /status response."""
|
||||
|
||||
def test_tor_nodes_in_status(self):
|
||||
config = Config(
|
||||
tor_nodes=[
|
||||
ChainHop("socks5", "10.200.1.1", 9050),
|
||||
ChainHop("socks5", "10.200.1.13", 9050),
|
||||
],
|
||||
listeners=[ListenerConfig()],
|
||||
)
|
||||
ctx = _make_ctx(config=config)
|
||||
_, body = _handle_status(ctx)
|
||||
assert body["tor_nodes"] == [
|
||||
"socks5://10.200.1.1:9050",
|
||||
"socks5://10.200.1.13:9050",
|
||||
]
|
||||
|
||||
def test_no_tor_nodes_in_status(self):
|
||||
ctx = _make_ctx()
|
||||
_, body = _handle_status(ctx)
|
||||
assert "tor_nodes" not in body
|
||||
|
||||
|
||||
# -- routing -----------------------------------------------------------------
|
||||
|
||||
|
||||
@@ -135,6 +135,7 @@ class TestConfig:
|
||||
assert c.max_connections == 256
|
||||
assert c.pool_size == 0
|
||||
assert c.pool_max_idle == 30.0
|
||||
assert c.tor_nodes == []
|
||||
|
||||
def test_max_connections_from_yaml(self, tmp_path):
|
||||
cfg_file = tmp_path / "test.yaml"
|
||||
@@ -221,6 +222,115 @@ class TestConfig:
|
||||
]
|
||||
|
||||
|
||||
class TestProxyPools:
|
||||
"""Test named proxy_pools config parsing."""
|
||||
|
||||
def test_proxy_pools_from_yaml(self, tmp_path):
|
||||
cfg_file = tmp_path / "test.yaml"
|
||||
cfg_file.write_text(
|
||||
"proxy_pools:\n"
|
||||
" clean:\n"
|
||||
" sources:\n"
|
||||
" - url: http://api:8081/proxies/all\n"
|
||||
" mitm: false\n"
|
||||
" refresh: 300\n"
|
||||
" state_file: /data/pool-clean.json\n"
|
||||
" mitm:\n"
|
||||
" sources:\n"
|
||||
" - url: http://api:8081/proxies/all\n"
|
||||
" mitm: true\n"
|
||||
" state_file: /data/pool-mitm.json\n"
|
||||
)
|
||||
c = load_config(cfg_file)
|
||||
assert "clean" in c.proxy_pools
|
||||
assert "mitm" in c.proxy_pools
|
||||
assert c.proxy_pools["clean"].sources[0].mitm is False
|
||||
assert c.proxy_pools["mitm"].sources[0].mitm is True
|
||||
assert c.proxy_pools["clean"].state_file == "/data/pool-clean.json"
|
||||
assert c.proxy_pools["mitm"].state_file == "/data/pool-mitm.json"
|
||||
|
||||
def test_mitm_none_when_absent(self, tmp_path):
|
||||
cfg_file = tmp_path / "test.yaml"
|
||||
cfg_file.write_text(
|
||||
"proxy_pool:\n"
|
||||
" sources:\n"
|
||||
" - url: http://api:8081/proxies\n"
|
||||
)
|
||||
c = load_config(cfg_file)
|
||||
assert c.proxy_pool is not None
|
||||
assert c.proxy_pool.sources[0].mitm is None
|
||||
|
||||
def test_singular_becomes_default(self, tmp_path):
|
||||
cfg_file = tmp_path / "test.yaml"
|
||||
cfg_file.write_text(
|
||||
"proxy_pool:\n"
|
||||
" sources:\n"
|
||||
" - url: http://api:8081/proxies\n"
|
||||
)
|
||||
c = load_config(cfg_file)
|
||||
assert "default" in c.proxy_pools
|
||||
assert c.proxy_pools["default"] is c.proxy_pool
|
||||
|
||||
def test_proxy_pools_wins_over_singular(self, tmp_path):
|
||||
cfg_file = tmp_path / "test.yaml"
|
||||
cfg_file.write_text(
|
||||
"proxy_pools:\n"
|
||||
" default:\n"
|
||||
" sources:\n"
|
||||
" - url: http://api:8081/pools-default\n"
|
||||
"proxy_pool:\n"
|
||||
" sources:\n"
|
||||
" - url: http://api:8081/singular\n"
|
||||
)
|
||||
c = load_config(cfg_file)
|
||||
# proxy_pools "default" wins, singular does not overwrite
|
||||
assert c.proxy_pools["default"].sources[0].url == "http://api:8081/pools-default"
|
||||
|
||||
def test_listener_pool_name(self, tmp_path):
|
||||
cfg_file = tmp_path / "test.yaml"
|
||||
cfg_file.write_text(
|
||||
"listeners:\n"
|
||||
" - listen: 0.0.0.0:1080\n"
|
||||
" pool: clean\n"
|
||||
" chain:\n"
|
||||
" - socks5://127.0.0.1:9050\n"
|
||||
" - pool\n"
|
||||
" - listen: 0.0.0.0:1081\n"
|
||||
" chain:\n"
|
||||
" - socks5://127.0.0.1:9050\n"
|
||||
)
|
||||
c = load_config(cfg_file)
|
||||
assert c.listeners[0].pool_name == "clean"
|
||||
assert c.listeners[0].pool_hops == 1
|
||||
assert c.listeners[1].pool_name == ""
|
||||
assert c.listeners[1].pool_hops == 0
|
||||
|
||||
|
||||
class TestTorNodes:
|
||||
"""Test tor_nodes config parsing."""
|
||||
|
||||
def test_tor_nodes_from_yaml(self, tmp_path):
|
||||
cfg_file = tmp_path / "test.yaml"
|
||||
cfg_file.write_text(
|
||||
"tor_nodes:\n"
|
||||
" - socks5://10.200.1.1:9050\n"
|
||||
" - socks5://10.200.1.254:9050\n"
|
||||
" - socks5://10.200.1.13:9050\n"
|
||||
)
|
||||
c = load_config(cfg_file)
|
||||
assert len(c.tor_nodes) == 3
|
||||
assert c.tor_nodes[0].host == "10.200.1.1"
|
||||
assert c.tor_nodes[0].port == 9050
|
||||
assert c.tor_nodes[1].host == "10.200.1.254"
|
||||
assert c.tor_nodes[2].host == "10.200.1.13"
|
||||
|
||||
def test_no_tor_nodes(self, tmp_path):
|
||||
cfg_file = tmp_path / "test.yaml"
|
||||
cfg_file.write_text("listen: 1080\n")
|
||||
c = load_config(cfg_file)
|
||||
assert c.tor_nodes == []
|
||||
|
||||
|
||||
class TestListenerConfig:
|
||||
"""Test multi-listener config parsing."""
|
||||
|
||||
|
||||
181
tests/test_metrics.py
Normal file
181
tests/test_metrics.py
Normal file
@@ -0,0 +1,181 @@
|
||||
"""Tests for metrics trackers and helpers."""
|
||||
|
||||
from s5p.metrics import LatencyTracker, Metrics, RateTracker, _human_bytes
|
||||
|
||||
# -- LatencyTracker ----------------------------------------------------------
|
||||
|
||||
|
||||
class TestLatencyTracker:
|
||||
"""Test latency sample collection and percentile stats."""
|
||||
|
||||
def test_empty(self):
|
||||
lt = LatencyTracker()
|
||||
assert lt.count == 0
|
||||
assert lt.stats() is None
|
||||
|
||||
def test_single(self):
|
||||
lt = LatencyTracker()
|
||||
lt.record(0.1)
|
||||
s = lt.stats()
|
||||
assert s is not None
|
||||
assert s["count"] == 1
|
||||
assert s["min"] == s["max"] == s["avg"] == s["p50"]
|
||||
|
||||
def test_percentiles(self):
|
||||
lt = LatencyTracker()
|
||||
# 100 evenly spaced samples: 0.001, 0.002, ..., 0.100
|
||||
for i in range(1, 101):
|
||||
lt.record(i / 1000)
|
||||
s = lt.stats()
|
||||
assert s["count"] == 100
|
||||
assert s["min"] == 1.0 # 0.001s = 1.0ms
|
||||
assert s["max"] == 100.0 # 0.100s = 100.0ms
|
||||
assert 49.0 <= s["avg"] <= 52.0
|
||||
assert 50.0 <= s["p50"] <= 52.0
|
||||
assert 95.0 <= s["p95"] <= 97.0
|
||||
assert 99.0 <= s["p99"] <= 101.0
|
||||
|
||||
def test_bounded(self):
|
||||
lt = LatencyTracker(maxlen=5)
|
||||
for i in range(10):
|
||||
lt.record(i / 100)
|
||||
assert lt.count == 5
|
||||
s = lt.stats()
|
||||
# only the last 5 samples remain: 0.05..0.09
|
||||
assert s["min"] == 50.0
|
||||
assert s["max"] == 90.0
|
||||
|
||||
def test_milliseconds(self):
|
||||
lt = LatencyTracker()
|
||||
lt.record(0.5) # 500ms
|
||||
s = lt.stats()
|
||||
assert s["min"] == 500.0
|
||||
assert s["max"] == 500.0
|
||||
|
||||
|
||||
# -- RateTracker -------------------------------------------------------------
|
||||
|
||||
|
||||
class TestRateTracker:
|
||||
"""Test rolling window event rate calculation."""
|
||||
|
||||
def test_empty(self):
|
||||
rt = RateTracker()
|
||||
assert rt.rate() == 0.0
|
||||
|
||||
def test_single(self):
|
||||
rt = RateTracker()
|
||||
rt.record()
|
||||
assert rt.rate() == 0.0
|
||||
|
||||
def test_known_rate(self):
|
||||
rt = RateTracker()
|
||||
# 11 events at 0.1s intervals = 10/1.0 = 10.0/s
|
||||
for i in range(11):
|
||||
rt.record(now=100.0 + i * 0.1)
|
||||
assert abs(rt.rate() - 10.0) < 0.01
|
||||
|
||||
def test_bounded(self):
|
||||
rt = RateTracker(maxlen=5)
|
||||
for i in range(10):
|
||||
rt.record(now=float(i))
|
||||
# only last 5 events: t=5..9, span=4, rate=4/4=1.0
|
||||
assert abs(rt.rate() - 1.0) < 0.01
|
||||
|
||||
def test_zero_span(self):
|
||||
rt = RateTracker()
|
||||
rt.record(now=1.0)
|
||||
rt.record(now=1.0)
|
||||
assert rt.rate() == 0.0
|
||||
|
||||
|
||||
# -- Metrics -----------------------------------------------------------------
|
||||
|
||||
|
||||
class TestMetrics:
|
||||
"""Test Metrics aggregation and output."""
|
||||
|
||||
def test_to_dict_includes_rate_and_latency(self):
|
||||
m = Metrics()
|
||||
m.connections = 10
|
||||
m.conn_rate.record(now=0.0)
|
||||
m.conn_rate.record(now=1.0)
|
||||
m.latency.record(0.2)
|
||||
m.latency.record(0.3)
|
||||
d = m.to_dict()
|
||||
assert "rate" in d
|
||||
assert isinstance(d["rate"], float)
|
||||
assert "latency" in d
|
||||
assert d["latency"] is not None
|
||||
assert d["latency"]["count"] == 2
|
||||
|
||||
def test_to_dict_latency_none_when_empty(self):
|
||||
m = Metrics()
|
||||
d = m.to_dict()
|
||||
assert d["latency"] is None
|
||||
assert d["rate"] == 0.0
|
||||
|
||||
def test_summary_includes_rate(self):
|
||||
m = Metrics()
|
||||
m.conn_rate.record(now=0.0)
|
||||
m.conn_rate.record(now=1.0)
|
||||
s = m.summary()
|
||||
assert "rate=" in s
|
||||
assert "/s" in s
|
||||
|
||||
def test_summary_includes_latency(self):
|
||||
m = Metrics()
|
||||
m.latency.record(0.2)
|
||||
s = m.summary()
|
||||
assert "p50=" in s
|
||||
assert "p95=" in s
|
||||
|
||||
def test_summary_no_latency_when_empty(self):
|
||||
m = Metrics()
|
||||
s = m.summary()
|
||||
assert "p50=" not in s
|
||||
assert "p95=" not in s
|
||||
|
||||
def test_listener_latency(self):
|
||||
m = Metrics()
|
||||
m.get_listener_latency("0.0.0.0:1080").record(0.5)
|
||||
m.get_listener_latency("0.0.0.0:1080").record(0.6)
|
||||
m.get_listener_latency("0.0.0.0:1081").record(0.1)
|
||||
d = m.to_dict()
|
||||
assert "listener_latency" in d
|
||||
assert "0.0.0.0:1080" in d["listener_latency"]
|
||||
assert "0.0.0.0:1081" in d["listener_latency"]
|
||||
assert d["listener_latency"]["0.0.0.0:1080"]["count"] == 2
|
||||
assert d["listener_latency"]["0.0.0.0:1081"]["count"] == 1
|
||||
|
||||
def test_listener_latency_empty(self):
|
||||
m = Metrics()
|
||||
d = m.to_dict()
|
||||
assert d["listener_latency"] == {}
|
||||
|
||||
|
||||
# -- _human_bytes ------------------------------------------------------------
|
||||
|
||||
|
||||
class TestHumanBytes:
|
||||
"""Test byte count formatting."""
|
||||
|
||||
def test_bytes(self):
|
||||
assert _human_bytes(0) == "0B"
|
||||
assert _human_bytes(512) == "512B"
|
||||
|
||||
def test_kilobytes(self):
|
||||
assert _human_bytes(1024) == "1.0K"
|
||||
assert _human_bytes(1536) == "1.5K"
|
||||
|
||||
def test_megabytes(self):
|
||||
assert _human_bytes(1024 * 1024) == "1.0M"
|
||||
|
||||
def test_gigabytes(self):
|
||||
assert _human_bytes(1024**3) == "1.0G"
|
||||
|
||||
def test_terabytes(self):
|
||||
assert _human_bytes(1024**4) == "1.0T"
|
||||
|
||||
def test_petabytes(self):
|
||||
assert _human_bytes(1024**5) == "1.0P"
|
||||
@@ -11,6 +11,95 @@ from s5p.config import ChainHop, PoolSourceConfig, ProxyPoolConfig
|
||||
from s5p.pool import ProxyEntry, ProxyPool
|
||||
|
||||
|
||||
class TestProxyPoolName:
|
||||
"""Test pool name and state path derivation."""
|
||||
|
||||
def test_default_name(self):
|
||||
cfg = ProxyPoolConfig(sources=[])
|
||||
pool = ProxyPool(cfg, [], timeout=10.0)
|
||||
assert pool.name == "default"
|
||||
assert pool._log_prefix == "pool"
|
||||
|
||||
def test_named_pool(self):
|
||||
cfg = ProxyPoolConfig(sources=[])
|
||||
pool = ProxyPool(cfg, [], timeout=10.0, name="clean")
|
||||
assert pool.name == "clean"
|
||||
assert pool._log_prefix == "pool[clean]"
|
||||
|
||||
def test_state_path_default(self):
|
||||
cfg = ProxyPoolConfig(sources=[])
|
||||
pool = ProxyPool(cfg, [], timeout=10.0)
|
||||
assert pool._state_path.name == "pool.json"
|
||||
|
||||
def test_state_path_named(self):
|
||||
cfg = ProxyPoolConfig(sources=[])
|
||||
pool = ProxyPool(cfg, [], timeout=10.0, name="clean")
|
||||
assert pool._state_path.name == "pool-clean.json"
|
||||
|
||||
def test_state_path_explicit_overrides_name(self):
|
||||
cfg = ProxyPoolConfig(sources=[], state_file="/data/custom.json")
|
||||
pool = ProxyPool(cfg, [], timeout=10.0, name="clean")
|
||||
assert str(pool._state_path) == "/data/custom.json"
|
||||
|
||||
|
||||
class TestProxyPoolMitmQuery:
|
||||
"""Test mitm query parameter in API fetch."""
|
||||
|
||||
def test_mitm_false(self):
|
||||
cfg = ProxyPoolConfig(sources=[])
|
||||
pool = ProxyPool(cfg, [], timeout=10.0)
|
||||
src = PoolSourceConfig(url="http://api:8081/proxies/all", mitm=False)
|
||||
|
||||
async def run():
|
||||
from unittest.mock import AsyncMock, patch
|
||||
mock_ret = {"proxies": []}
|
||||
with patch(
|
||||
"s5p.pool.http_get_json",
|
||||
new_callable=AsyncMock, return_value=mock_ret,
|
||||
) as mock:
|
||||
await pool._fetch_api(src)
|
||||
call_url = mock.call_args[0][0]
|
||||
assert "mitm=0" in call_url
|
||||
|
||||
asyncio.run(run())
|
||||
|
||||
def test_mitm_true(self):
|
||||
cfg = ProxyPoolConfig(sources=[])
|
||||
pool = ProxyPool(cfg, [], timeout=10.0)
|
||||
src = PoolSourceConfig(url="http://api:8081/proxies/all", mitm=True)
|
||||
|
||||
async def run():
|
||||
from unittest.mock import AsyncMock, patch
|
||||
mock_ret = {"proxies": []}
|
||||
with patch(
|
||||
"s5p.pool.http_get_json",
|
||||
new_callable=AsyncMock, return_value=mock_ret,
|
||||
) as mock:
|
||||
await pool._fetch_api(src)
|
||||
call_url = mock.call_args[0][0]
|
||||
assert "mitm=1" in call_url
|
||||
|
||||
asyncio.run(run())
|
||||
|
||||
def test_mitm_none_omitted(self):
|
||||
cfg = ProxyPoolConfig(sources=[])
|
||||
pool = ProxyPool(cfg, [], timeout=10.0)
|
||||
src = PoolSourceConfig(url="http://api:8081/proxies/all", mitm=None)
|
||||
|
||||
async def run():
|
||||
from unittest.mock import AsyncMock, patch
|
||||
mock_ret = {"proxies": []}
|
||||
with patch(
|
||||
"s5p.pool.http_get_json",
|
||||
new_callable=AsyncMock, return_value=mock_ret,
|
||||
) as mock:
|
||||
await pool._fetch_api(src)
|
||||
call_url = mock.call_args[0][0]
|
||||
assert "mitm" not in call_url
|
||||
|
||||
asyncio.run(run())
|
||||
|
||||
|
||||
class TestProxyEntry:
|
||||
"""Test ProxyEntry defaults."""
|
||||
|
||||
@@ -22,6 +111,38 @@ class TestProxyEntry:
|
||||
assert entry.tests == 0
|
||||
|
||||
|
||||
class TestEffectiveChain:
|
||||
"""Test chain_nodes round-robin in pool health tests."""
|
||||
|
||||
def test_no_nodes_returns_original(self):
|
||||
cfg = ProxyPoolConfig(sources=[])
|
||||
chain = [ChainHop(proto="socks5", host="10.0.0.1", port=9050)]
|
||||
pool = ProxyPool(cfg, chain, timeout=10.0)
|
||||
assert pool._effective_chain() == chain
|
||||
|
||||
def test_round_robin_across_nodes(self):
|
||||
cfg = ProxyPoolConfig(sources=[])
|
||||
chain = [ChainHop(proto="socks5", host="original", port=9050)]
|
||||
nodes = [
|
||||
ChainHop(proto="socks5", host="node-a", port=9050),
|
||||
ChainHop(proto="socks5", host="node-b", port=9050),
|
||||
ChainHop(proto="socks5", host="node-c", port=9050),
|
||||
]
|
||||
pool = ProxyPool(cfg, chain, timeout=10.0, chain_nodes=nodes)
|
||||
|
||||
hosts = [pool._effective_chain()[0].host for _ in range(6)]
|
||||
assert hosts == [
|
||||
"node-a", "node-b", "node-c",
|
||||
"node-a", "node-b", "node-c",
|
||||
]
|
||||
|
||||
def test_empty_chain_no_replacement(self):
|
||||
cfg = ProxyPoolConfig(sources=[])
|
||||
nodes = [ChainHop(proto="socks5", host="node-a", port=9050)]
|
||||
pool = ProxyPool(cfg, [], timeout=10.0, chain_nodes=nodes)
|
||||
assert pool._effective_chain() == []
|
||||
|
||||
|
||||
class TestProxyPoolMerge:
|
||||
"""Test proxy deduplication and merge."""
|
||||
|
||||
@@ -155,6 +276,102 @@ class TestProxyPoolWeight:
|
||||
pool.report_failure(hop) # should not raise
|
||||
|
||||
|
||||
class TestDynamicConcurrency:
|
||||
"""Test dynamic health test concurrency scaling."""
|
||||
|
||||
def test_scales_to_ten_percent(self):
|
||||
import asyncio
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
cfg = ProxyPoolConfig(sources=[], test_concurrency=25)
|
||||
pool = ProxyPool(cfg, [], timeout=10.0)
|
||||
|
||||
now = time.time()
|
||||
# add 100 proxies -> effective concurrency = max(3, min(100//10, 25)) = 10
|
||||
for i in range(100):
|
||||
hop = ChainHop(proto="socks5", host=f"10.0.{i // 256}.{i % 256}", port=1080)
|
||||
key = f"socks5://10.0.{i // 256}.{i % 256}:1080"
|
||||
pool._proxies[key] = ProxyEntry(hop=hop, alive=False, last_seen=now)
|
||||
|
||||
captured = {}
|
||||
|
||||
original_semaphore = asyncio.Semaphore
|
||||
|
||||
def capture_semaphore(value):
|
||||
captured["concurrency"] = value
|
||||
return original_semaphore(value)
|
||||
|
||||
with (
|
||||
patch.object(pool, "_test_proxy", new_callable=AsyncMock, return_value=True),
|
||||
patch("s5p.pool.asyncio.Semaphore", side_effect=capture_semaphore),
|
||||
):
|
||||
asyncio.run(pool._run_health_tests())
|
||||
|
||||
assert captured["concurrency"] == 10
|
||||
|
||||
def test_minimum_of_three(self):
|
||||
import asyncio
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
cfg = ProxyPoolConfig(sources=[], test_concurrency=25)
|
||||
pool = ProxyPool(cfg, [], timeout=10.0)
|
||||
|
||||
now = time.time()
|
||||
# 5 proxies -> 5//10=0, but min is 3
|
||||
for i in range(5):
|
||||
hop = ChainHop(proto="socks5", host=f"10.0.0.{i}", port=1080)
|
||||
pool._proxies[f"socks5://10.0.0.{i}:1080"] = ProxyEntry(
|
||||
hop=hop, alive=False, last_seen=now,
|
||||
)
|
||||
|
||||
captured = {}
|
||||
|
||||
original_semaphore = asyncio.Semaphore
|
||||
|
||||
def capture_semaphore(value):
|
||||
captured["concurrency"] = value
|
||||
return original_semaphore(value)
|
||||
|
||||
with (
|
||||
patch.object(pool, "_test_proxy", new_callable=AsyncMock, return_value=True),
|
||||
patch("s5p.pool.asyncio.Semaphore", side_effect=capture_semaphore),
|
||||
):
|
||||
asyncio.run(pool._run_health_tests())
|
||||
|
||||
assert captured["concurrency"] == 3
|
||||
|
||||
def test_capped_by_config(self):
|
||||
import asyncio
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
cfg = ProxyPoolConfig(sources=[], test_concurrency=5)
|
||||
pool = ProxyPool(cfg, [], timeout=10.0)
|
||||
|
||||
now = time.time()
|
||||
# 1000 proxies -> 1000//10=100, capped at 5
|
||||
for i in range(1000):
|
||||
h = f"10.{i // 65536}.{(i // 256) % 256}.{i % 256}"
|
||||
hop = ChainHop(proto="socks5", host=h, port=1080)
|
||||
key = str(hop)
|
||||
pool._proxies[key] = ProxyEntry(hop=hop, alive=False, last_seen=now)
|
||||
|
||||
captured = {}
|
||||
|
||||
original_semaphore = asyncio.Semaphore
|
||||
|
||||
def capture_semaphore(value):
|
||||
captured["concurrency"] = value
|
||||
return original_semaphore(value)
|
||||
|
||||
with (
|
||||
patch.object(pool, "_test_proxy", new_callable=AsyncMock, return_value=True),
|
||||
patch("s5p.pool.asyncio.Semaphore", side_effect=capture_semaphore),
|
||||
):
|
||||
asyncio.run(pool._run_health_tests())
|
||||
|
||||
assert captured["concurrency"] == 5
|
||||
|
||||
|
||||
class TestProxyPoolHealthTests:
|
||||
"""Test selective health testing."""
|
||||
|
||||
|
||||
Reference in New Issue
Block a user