Compare commits

9 Commits: 66fc76ceb3 ... 6c84a144c0

| Author | SHA1 | Date |
|---|---|---|
| | 6c84a144c0 | |
| | d2df32fdab | |
| | f0281c4069 | |
| | ff217be9c8 | |
| | b07135ad44 | |
| | c939101a73 | |
| | 4ee2cf5bb0 | |
| | b72d083f56 | |
| | ecf9a840e4 | |
@@ -25,6 +25,7 @@ Client -------> s5p -------> Hop 1 -------> Hop 2 -------> Target

- **pool.py** -- managed proxy pool (multi-source, health-tested, persistent)
- **http.py** -- minimal async HTTP/1.1 client (GET/POST JSON, no external deps)
- **connpool.py** -- pre-warmed TCP connection pool to first chain hop
- **api.py** -- built-in HTTP control API (runtime metrics, pool state, config reload)
- **cli.py** -- argparse CLI, logging setup, cProfile support
- **metrics.py** -- connection counters and human-readable summary (lock-free, asyncio-only)

@@ -66,3 +67,4 @@ All other functionality uses Python stdlib (`asyncio`, `socket`, `struct`).

- **Connection semaphore** -- cap concurrent connections to prevent fd exhaustion
- **Async HTTP** -- native asyncio HTTP client replaces blocking urllib, parallel fetches
- **First-hop pool** -- pre-warmed TCP connections to chain[0], stale-evicted, auto-refilled
- **Control API** -- built-in asyncio HTTP server, no Flask/external deps, disabled by default
13 README.md
@@ -10,7 +10,7 @@ through configurable chains of SOCKS4, SOCKS5, and HTTP CONNECT proxies.

- Supported hop protocols: SOCKS5, SOCKS4/4a, HTTP CONNECT
- Per-hop authentication (username/password)
- DNS leak prevention (domain names forwarded to proxies, never resolved locally)
-- Tor integration (Tor is just another SOCKS5 hop)
+- Tor integration (SOCKS5 hop + control port NEWNYM for circuit rotation)
- Managed proxy pool: multiple sources (API + file), health-tested, weighted selection
- Per-proxy failure backoff (60s cooldown), stale proxy expiry, chain pre-flight
- Fast warm start (seconds on restart vs minutes on cold start)

@@ -21,6 +21,7 @@ through configurable chains of SOCKS4, SOCKS5, and HTTP CONNECT proxies.

- Concurrent connection limit with backpressure (`max_connections`)
- Async HTTP client for proxy source fetching (parallel, no threads)
- First-hop TCP connection pool (pre-warmed, stale-evicted)
- Built-in control API (runtime metrics, pool state, config reload via HTTP)
- Container-ready (Alpine-based, podman/docker)
- Graceful shutdown (SIGTERM/SIGINT)
- Pure Python, asyncio-based, minimal dependencies

@@ -70,10 +71,16 @@ timeout: 10

retries: 3
max_connections: 256          # concurrent connection limit
pool_size: 8                  # pre-warmed connections to first hop
api_listen: 127.0.0.1:1081    # control API (disabled by default)

chain:
  - socks5://127.0.0.1:9050   # Tor

tor:
  control_port: 9051          # Tor control port (NEWNYM)
  password: ""                # or cookie_file for auth
  newnym_interval: 0          # periodic circuit rotation (0 = manual)

proxy_pool:
  sources:
    - url: http://10.200.1.250:8081/proxies

@@ -89,7 +96,7 @@ proxy_pool:

## CLI Reference

```
-s5p [-c FILE] [-l [HOST:]PORT] [-C URL[,URL,...]] [-S URL] [-t SEC] [-r N] [-m N] [-v|-q]
+s5p [-c FILE] [-l [HOST:]PORT] [-C URL[,URL,...]] [-S URL] [-t SEC] [-r N] [-m N] [--api [HOST:]PORT] [-v|-q]

Options:
  -c, --config FILE         YAML config file
@@ -99,9 +106,11 @@ Options:
  -t, --timeout SEC         Per-hop timeout (default: 10)
  -r, --retries N           Max attempts per connection (default: 3, proxy_source only)
  -m, --max-connections N   Max concurrent connections (default: 256)
  --api [HOST:]PORT         Enable control API (e.g. 127.0.0.1:1081)
  -v, --verbose             Debug logging
  -q, --quiet               Errors only
  --cprofile [FILE]         Enable cProfile, dump to FILE (default: s5p.prof)
  --tracemalloc [N]         Enable tracemalloc, show top N allocators on exit (default: 10)
  -V, --version             Show version
```
4 TASKS.md

@@ -22,8 +22,9 @@

## v0.2.0

- [x] Built-in control API (runtime metrics, pool state, config reload)
- [ ] SOCKS5 server authentication (username/password)
-- [ ] Tor control port integration (circuit renewal via NEWNYM)
+- [x] Tor control port integration (circuit renewal via NEWNYM)
- [ ] Metrics (connections/sec, bytes relayed, hop latency)

## v0.3.0

@@ -42,8 +42,10 @@

- [x] Instant warm start (trust cached state, defer all health tests)
- [x] Register signal handlers before startup (fix SIGKILL on stop)
- [x] Use k8s-file logging driver with rotation
- [x] Built-in control API (`api.py`, `--api`, `api_listen`)
- [x] Tor control port integration (NEWNYM signaling, periodic rotation)

## Next

- [ ] Integration tests with mock proxy server
- [ ] SOCKS5 server-side authentication
- [ ] Tor control port integration
@@ -8,6 +8,7 @@ log_level: info

# max_connections: 256        # max concurrent client connections (backpressure)
# pool_size: 0                # pre-warmed TCP connections to first hop (0 = disabled)
# pool_max_idle: 30           # max idle time (seconds) for pooled connections
# api_listen: 127.0.0.1:1081  # control API (disabled by default)

# Proxy chain -- connections tunnel through each hop in order.
# Supported protocols: socks5://, socks4://, http://

@@ -37,6 +38,15 @@ chain:

#   state_file: ""            # empty = ~/.cache/s5p/pool.json
#   report_url: ""            # POST dead proxies here (optional)

# Tor control port -- enables NEWNYM signaling (new circuit on demand).
# Requires Tor's ControlPort enabled (torrc: ControlPort 9051).
# tor:
#   control_host: 127.0.0.1
#   control_port: 9051
#   password: ""              # HashedControlPassword in torrc
#   cookie_file: ""           # CookieAuthentication file path
#   newnym_interval: 0        # periodic NEWNYM (seconds, 0 = manual only)

# Legacy proxy source (still supported, auto-converts to proxy_pool):
# proxy_source:
#   url: http://10.200.1.250:8081/proxies
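Both `api_listen` above and the `--api` flag accept a `[HOST:]PORT` value. A minimal sketch of that parsing, assuming a default bind host of `127.0.0.1` (the helper name is illustrative, not part of s5p):

```python
def parse_listen(value: str, default_host: str = "127.0.0.1") -> tuple[str, int]:
    """Parse an [HOST:]PORT string; bare ports get the default host."""
    if ":" in value:
        # rsplit keeps any colons in the host part intact
        host, port = value.rsplit(":", 1)
        return host, int(port)
    return default_host, int(value)

print(parse_listen("127.0.0.1:1081"))  # ('127.0.0.1', 1081)
print(parse_listen("1081"))            # ('127.0.0.1', 1081)
```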
@@ -14,8 +14,11 @@ s5p -q # errors only

s5p -S http://api:8081/proxies   # proxy source API
s5p -r 5                         # retry up to 5 proxies
s5p -m 512                       # max concurrent connections
s5p --api 127.0.0.1:1081         # enable control API
s5p --cprofile                   # profile to s5p.prof
s5p --cprofile out.prof          # profile to custom file
s5p --tracemalloc                # memory profile (top 10)
s5p --tracemalloc 20             # memory profile (top 20)
```

## Container

@@ -58,6 +61,20 @@ proxy_pool:

  report_url: ""                 # POST dead proxies (optional)
```

## Tor Control Port (config)

```yaml
tor:
  control_port: 9051
  password: ""         # or cookie_file: /path/to/cookie
  newnym_interval: 60  # auto-rotate every 60s (0 = manual)
```

```bash
curl -s http://127.0.0.1:1081/tor | jq .                 # status
curl -s -X POST http://127.0.0.1:1081/tor/newnym | jq .  # new circuit
```

## Hot Reload

```bash
@@ -84,6 +101,20 @@ http://host:port
http://user:pass@host:port
```

## Control API

```bash
s5p --api 127.0.0.1:1081 -c config/s5p.yaml      # enable API

curl -s http://127.0.0.1:1081/status | jq .      # runtime status
curl -s http://127.0.0.1:1081/metrics | jq .     # full metrics
curl -s http://127.0.0.1:1081/pool | jq .        # all proxies
curl -s http://127.0.0.1:1081/pool/alive | jq .  # alive only
curl -s http://127.0.0.1:1081/config | jq .      # current config
curl -s -X POST http://127.0.0.1:1081/reload     # reload config
curl -s -X POST http://127.0.0.1:1081/pool/test  # health test now
```

## Testing

```bash
```
139 docs/USAGE.md
@@ -46,6 +46,7 @@ log_level: info

max_connections: 256   # concurrent connection limit (backpressure)
pool_size: 0           # pre-warmed TCP connections to first hop (0 = disabled)
pool_max_idle: 30      # max idle time for pooled connections (seconds)
api_listen: ""         # control API bind address (empty = disabled)

chain:
  - socks5://127.0.0.1:9050

@@ -216,6 +217,133 @@ other pool settings).

The old `proxy_source` key is still supported and auto-converts to `proxy_pool`
with a single API source. `proxy_pool` takes precedence if both are present.
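That precedence rule can be sketched with plain dicts (the shapes here are illustrative; the project itself uses dataclasses for these configs):

```python
def convert_legacy(raw: dict) -> dict:
    """Resolve proxy settings: proxy_pool wins, legacy proxy_source is wrapped."""
    if "proxy_pool" in raw:
        # explicit proxy_pool takes precedence if both keys are present
        return raw["proxy_pool"]
    if "proxy_source" in raw:
        # legacy single URL becomes one API source in a pool
        return {"sources": [{"url": raw["proxy_source"]["url"]}]}
    return {}

print(convert_legacy({"proxy_source": {"url": "http://api:8081/proxies"}}))
# {'sources': [{'url': 'http://api:8081/proxies'}]}
```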
## Control API

Built-in HTTP API for runtime inspection and management. Disabled by default;
enable with `api_listen` in config or `--api` on the command line.

```yaml
api_listen: 127.0.0.1:1081
```

```bash
s5p --api 127.0.0.1:1081 -c config/s5p.yaml
```

### Read endpoints

| Method | Path | Description |
|--------|------|-------------|
| `GET` | `/status` | Combined summary: uptime, metrics, pool stats, chain |
| `GET` | `/metrics` | Full metrics counters (connections, bytes, uptime) |
| `GET` | `/pool` | All proxies with per-entry state |
| `GET` | `/pool/alive` | Alive proxies only |
| `GET` | `/config` | Current runtime config (sanitized) |

### Write endpoints

| Method | Path | Description |
|--------|------|-------------|
| `POST` | `/reload` | Re-read config file (replaces SIGHUP) |
| `POST` | `/pool/test` | Trigger immediate health test cycle |
| `POST` | `/pool/refresh` | Trigger immediate source re-fetch |

All responses are `application/json`. Errors return `{"error": "message"}` with
an appropriate status code (400, 404, 405, 500).

### Examples

```bash
# Runtime status
curl -s http://127.0.0.1:1081/status | jq .

# Full metrics
curl -s http://127.0.0.1:1081/metrics | jq .

# Pool state (all proxies)
curl -s http://127.0.0.1:1081/pool | jq .

# Alive proxies only
curl -s http://127.0.0.1:1081/pool/alive | jq '.proxies | length'

# Current config
curl -s http://127.0.0.1:1081/config | jq .

# Reload config (like SIGHUP)
curl -s -X POST http://127.0.0.1:1081/reload | jq .

# Trigger health tests now
curl -s -X POST http://127.0.0.1:1081/pool/test | jq .

# Re-fetch proxy sources now
curl -s -X POST http://127.0.0.1:1081/pool/refresh | jq .
```

Settings that require a restart: `listen`, `chain`, `pool_size`, `pool_max_idle`, `api_listen`.
## Tor Control Port

Optional integration with Tor's control protocol for circuit management.
When enabled, s5p connects to Tor's control port and can send NEWNYM signals
to request new circuits (new exit node) on demand or on a timer.

### Configuration

```yaml
tor:
  control_host: 127.0.0.1   # Tor control address
  control_port: 9051        # Tor control port
  password: ""              # HashedControlPassword (torrc)
  cookie_file: ""           # CookieAuthentication file path
  newnym_interval: 0        # periodic NEWNYM in seconds (0 = manual only)
```

Requires Tor's `ControlPort` enabled in `torrc`:

```
ControlPort 9051
HashedControlPassword 16:...   # or CookieAuthentication 1
```

### Authentication modes

| Mode | Config | torrc |
|------|--------|-------|
| Password | `password: "secret"` | `HashedControlPassword 16:...` |
| Cookie | `cookie_file: /var/run/tor/control.authcookie` | `CookieAuthentication 1` |
| None | (leave both empty) | No auth configured |
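The three modes map to different control-port `AUTHENTICATE` commands. A sketch following the Tor control protocol (the helper name is illustrative, not part of s5p; with a cookie, the file's raw bytes are sent hex-encoded):

```python
def auth_command(password: str = "", cookie_file: str = "") -> str:
    """Build the Tor control-port AUTHENTICATE line for the configured mode."""
    if password:
        # password auth: the plaintext password, quoted
        return f'AUTHENTICATE "{password}"'
    if cookie_file:
        # cookie auth: hex-encoded contents of the auth cookie file
        with open(cookie_file, "rb") as f:
            return "AUTHENTICATE " + f.read().hex()
    # no auth configured
    return "AUTHENTICATE"

print(auth_command(password="secret"))  # AUTHENTICATE "secret"
print(auth_command())                   # AUTHENTICATE
```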
### Rate limiting

Tor enforces a minimum 10-second interval between NEWNYM signals. s5p
applies the same client-side rate limit to avoid unnecessary rejections.
### API endpoints

| Method | Path | Description |
|--------|------|-------------|
| `GET` | `/tor` | Controller status (enabled, connected, last NEWNYM) |
| `POST` | `/tor/newnym` | Trigger NEWNYM signal (new circuit) |

```bash
# Check tor controller status
curl -s http://127.0.0.1:1081/tor | jq .

# Request new circuit
curl -s -X POST http://127.0.0.1:1081/tor/newnym | jq .
```

### Periodic NEWNYM

Set `newnym_interval` to automatically rotate circuits:

```yaml
tor:
  newnym_interval: 60   # new circuit every 60 seconds
```

Values below 10 are clamped to Tor's minimum interval.
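The rotation loop with that clamping can be sketched as follows, assuming the caller supplies the actual control-port call as `send_newnym` (names are illustrative, not s5p's API):

```python
import asyncio

def effective_interval(interval: float, minimum: float = 10.0) -> float:
    """Clamp the configured interval to Tor's minimum, as noted above."""
    return max(interval, minimum)

async def newnym_loop(interval: float, send_newnym, stop: asyncio.Event) -> None:
    """Send NEWNYM every `interval` seconds until `stop` is set."""
    delay = effective_interval(interval)
    while not stop.is_set():
        try:
            # wake early if stop is set, otherwise fire on timeout
            await asyncio.wait_for(stop.wait(), timeout=delay)
        except asyncio.TimeoutError:
            await send_newnym()

print(effective_interval(3))   # 10.0
print(effective_interval(60))  # 60
```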
## Connection Retry

When a proxy pool is active, s5p retries failed connections with a different

@@ -250,7 +378,7 @@ Settings reloaded on SIGHUP:

| `max_connections` | Concurrent connection limit |
| `proxy_pool.*` | Sources, intervals, thresholds |

-Settings that require a restart: `listen`, `chain`, `pool_size`, `pool_max_idle`.
+Settings that require a restart: `listen`, `chain`, `pool_size`, `pool_max_idle`, `api_listen`.

Requires `-c` / `--config` to know which file to re-read. Without a
config file, SIGHUP is ignored with a warning.

@@ -291,6 +419,15 @@ s5p --cprofile output.prof -c config/s5p.yaml

# Analyze after stopping
python -m pstats s5p.prof

# Memory profiling with tracemalloc (top 10 allocators on exit)
s5p --tracemalloc -c config/s5p.yaml

# Show top 20 allocators
s5p --tracemalloc 20 -c config/s5p.yaml

# Both profilers simultaneously
s5p --cprofile --tracemalloc -c config/s5p.yaml
```

## Testing the Proxy
305 src/s5p/api.py Normal file

@@ -0,0 +1,305 @@
"""Built-in HTTP control API for runtime inspection and management."""

from __future__ import annotations

import asyncio
import json
import logging
import time

from .metrics import Metrics

logger = logging.getLogger("s5p")


# -- HTTP helpers ------------------------------------------------------------


def _parse_request(data: bytes) -> tuple[str, str]:
    """Extract method and path from an HTTP/1.1 request line.

    Returns (method, path) or ("", "") on parse failure.
    """
    try:
        line = data.split(b"\r\n", 1)[0].decode("ascii")
    except (UnicodeDecodeError, IndexError):
        return "", ""
    parts = line.split(None, 2)
    if len(parts) < 2:
        return "", ""
    return parts[0].upper(), parts[1].split("?", 1)[0]


def _json_response(
    writer: asyncio.StreamWriter,
    status: int,
    body: dict | list,
) -> None:
    """Write an HTTP response with JSON body and close."""
    phrases = {200: "OK", 400: "Bad Request", 404: "Not Found",
               405: "Method Not Allowed", 500: "Internal Server Error"}
    payload = json.dumps(body, separators=(",", ":")).encode()
    header = (
        f"HTTP/1.1 {status} {phrases.get(status, 'Error')}\r\n"
        f"Content-Type: application/json\r\n"
        f"Content-Length: {len(payload)}\r\n"
        f"Connection: close\r\n"
        f"\r\n"
    )
    writer.write(header.encode() + payload)
# -- route handlers ----------------------------------------------------------


def _handle_status(ctx: dict) -> tuple[int, dict]:
    """GET /status -- combined runtime summary."""
    metrics: Metrics = ctx["metrics"]
    data = {
        "uptime": round(time.monotonic() - metrics.started, 1),
        "connections": metrics.connections,
        "success": metrics.success,
        "failed": metrics.failed,
        "active": metrics.active,
        "bytes_in": metrics.bytes_in,
        "bytes_out": metrics.bytes_out,
    }
    pool = ctx.get("pool")
    if pool:
        data["pool"] = {"alive": pool.alive_count, "total": pool.count}
    config = ctx.get("config")
    if config:
        data["chain"] = [str(h) for h in config.chain]
    return 200, data


def _handle_metrics(ctx: dict) -> tuple[int, dict]:
    """GET /metrics -- full metrics counters."""
    return 200, ctx["metrics"].to_dict()


def _handle_pool(ctx: dict, alive_only: bool = False) -> tuple[int, dict]:
    """GET /pool or /pool/alive -- proxy pool state."""
    pool = ctx.get("pool")
    if not pool:
        return 200, {"alive": 0, "total": 0, "proxies": {}}

    proxies = {}
    for key, entry in pool._proxies.items():
        if alive_only and not entry.alive:
            continue
        proxies[key] = {
            "alive": entry.alive,
            "fails": entry.fails,
            "tests": entry.tests,
            "last_ok": entry.last_ok,
            "last_test": entry.last_test,
            "last_seen": entry.last_seen,
        }
    return 200, {
        "alive": pool.alive_count,
        "total": pool.count,
        "proxies": proxies,
    }


def _handle_config(ctx: dict) -> tuple[int, dict]:
    """GET /config -- sanitized runtime config."""
    config = ctx.get("config")
    if not config:
        return 500, {"error": "config unavailable"}

    data: dict = {
        "listen": f"{config.listen_host}:{config.listen_port}",
        "timeout": config.timeout,
        "retries": config.retries,
        "log_level": config.log_level,
        "max_connections": config.max_connections,
        "pool_size": config.pool_size,
        "chain": [str(h) for h in config.chain],
    }
    if config.proxy_pool:
        pp = config.proxy_pool
        sources = []
        for src in pp.sources:
            s: dict = {}
            if src.url:
                s["url"] = src.url
            if src.file:
                s["file"] = src.file
            sources.append(s)
        data["proxy_pool"] = {
            "sources": sources,
            "refresh": pp.refresh,
            "test_interval": pp.test_interval,
            "max_fails": pp.max_fails,
        }
    return 200, data


async def _handle_reload(ctx: dict) -> tuple[int, dict]:
    """POST /reload -- re-read config (like SIGHUP)."""
    reload_fn = ctx.get("reload_fn")
    if not reload_fn:
        return 500, {"error": "reload not available"}
    try:
        await reload_fn()
        return 200, {"ok": True}
    except Exception as e:
        return 500, {"error": str(e)}


async def _handle_pool_test(ctx: dict) -> tuple[int, dict]:
    """POST /pool/test -- trigger immediate health test."""
    pool = ctx.get("pool")
    if not pool:
        return 400, {"error": "no proxy pool configured"}
    asyncio.create_task(pool._run_health_tests())
    return 200, {"ok": True}


async def _handle_pool_refresh(ctx: dict) -> tuple[int, dict]:
    """POST /pool/refresh -- trigger immediate source re-fetch."""
    pool = ctx.get("pool")
    if not pool:
        return 400, {"error": "no proxy pool configured"}
    asyncio.create_task(pool._fetch_all_sources())
    return 200, {"ok": True}


def _handle_tor(ctx: dict) -> tuple[int, dict]:
    """GET /tor -- Tor controller status."""
    tor = ctx.get("tor")
    if not tor:
        return 200, {"enabled": False}
    last = tor.last_newnym
    return 200, {
        "enabled": True,
        "connected": tor.connected,
        "last_newnym": round(time.monotonic() - last, 1) if last else None,
        "newnym_interval": tor.newnym_interval,
    }


async def _handle_tor_newnym(ctx: dict) -> tuple[int, dict]:
    """POST /tor/newnym -- trigger NEWNYM signal."""
    tor = ctx.get("tor")
    if not tor:
        return 400, {"error": "tor control not configured"}
    ok = await tor.newnym()
    if ok:
        return 200, {"ok": True}
    return 200, {"ok": False, "reason": "rate-limited or not connected"}
# -- routing -----------------------------------------------------------------

_GET_ROUTES: dict[str, str] = {
    "/status": "status",
    "/metrics": "metrics",
    "/pool": "pool",
    "/pool/alive": "pool_alive",
    "/config": "config",
    "/tor": "tor",
}

_POST_ROUTES: dict[str, str] = {
    "/reload": "reload",
    "/pool/test": "pool_test",
    "/pool/refresh": "pool_refresh",
    "/tor/newnym": "tor_newnym",
}


async def _route(method: str, path: str, ctx: dict) -> tuple[int, dict]:
    """Dispatch request to the appropriate handler."""
    if method == "GET" and path in _GET_ROUTES:
        name = _GET_ROUTES[path]
        if name == "status":
            return _handle_status(ctx)
        if name == "metrics":
            return _handle_metrics(ctx)
        if name == "pool":
            return _handle_pool(ctx)
        if name == "pool_alive":
            return _handle_pool(ctx, alive_only=True)
        if name == "config":
            return _handle_config(ctx)
        if name == "tor":
            return _handle_tor(ctx)

    if method == "POST" and path in _POST_ROUTES:
        name = _POST_ROUTES[path]
        if name == "reload":
            return await _handle_reload(ctx)
        if name == "pool_test":
            return await _handle_pool_test(ctx)
        if name == "pool_refresh":
            return await _handle_pool_refresh(ctx)
        if name == "tor_newnym":
            return await _handle_tor_newnym(ctx)

    # wrong method on a known path
    if path in _GET_ROUTES or path in _POST_ROUTES:
        expected = "GET" if path in _GET_ROUTES else "POST"
        return 405, {"error": f"use {expected} for {path}"}

    return 404, {"error": "not found"}
# -- server ------------------------------------------------------------------


async def _handle_connection(
    reader: asyncio.StreamReader,
    writer: asyncio.StreamWriter,
    ctx: dict,
) -> None:
    """Handle a single HTTP request on the control API."""
    try:
        data = await asyncio.wait_for(reader.read(8192), timeout=5.0)
        if not data:
            return
        method, path = _parse_request(data)
        if not method:
            _json_response(writer, 400, {"error": "bad request"})
            await writer.drain()
            return

        status, body = await _route(method, path, ctx)
        _json_response(writer, status, body)
        await writer.drain()
    except (TimeoutError, ConnectionError, OSError):
        pass
    except Exception:
        logger.debug("api: unexpected error handling request", exc_info=True)
    finally:
        try:
            writer.close()
            await writer.wait_closed()
        except OSError:
            pass


async def start_api(
    host: str,
    port: int,
    ctx: dict,
) -> asyncio.Server:
    """Start the control API HTTP server.

    Args:
        host: Bind address.
        port: Bind port.
        ctx: Shared context dict with config, metrics, pool, reload_fn.

    Returns:
        The running asyncio.Server (caller manages lifecycle).
    """
    async def handler(r: asyncio.StreamReader, w: asyncio.StreamWriter) -> None:
        await _handle_connection(r, w, ctx)

    srv = await asyncio.start_server(handler, host, port)
    addrs = ", ".join(str(s.getsockname()) for s in srv.sockets)
    logger.info("api: listening on %s", addrs)
    return srv
@@ -54,10 +54,18 @@ def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:

        "-S", "--proxy-source", metavar="URL",
        help="proxy source API URL",
    )
    p.add_argument(
        "--api", metavar="[HOST:]PORT",
        help="enable control API on address (e.g. 127.0.0.1:1081)",
    )
    p.add_argument(
        "--cprofile", metavar="FILE", nargs="?", const="s5p.prof",
        help="enable cProfile, dump stats to FILE (default: s5p.prof)",
    )
    p.add_argument(
        "--tracemalloc", metavar="N", nargs="?", const=10, type=int,
        help="enable tracemalloc, show top N allocators on exit (default: 10)",
    )
    return p.parse_args(argv)


@@ -89,6 +97,14 @@ def main(argv: list[str] | None = None) -> int:

    if args.max_connections is not None:
        config.max_connections = args.max_connections

    if args.api:
        if ":" in args.api:
            host, port_str = args.api.rsplit(":", 1)
            config.api_host = host
            config.api_port = int(port_str)
        else:
            config.api_port = int(args.api)

    if args.proxy_source:
        config.proxy_pool = ProxyPoolConfig(
            sources=[PoolSourceConfig(url=args.proxy_source)],

@@ -100,6 +116,11 @@ def main(argv: list[str] | None = None) -> int:

        config.log_level = "error"

    _setup_logging(config.log_level)
    logger = logging.getLogger("s5p")

    if args.tracemalloc:
        import tracemalloc
        tracemalloc.start()

    try:
        if args.cprofile:

@@ -111,13 +132,21 @@ def main(argv: list[str] | None = None) -> int:

            finally:
                prof.disable()
                prof.dump_stats(args.cprofile)
-                logging.getLogger("s5p").info("profile saved to %s", args.cprofile)
+                logger.info("profile saved to %s", args.cprofile)
        else:
            asyncio.run(serve(config))
    except KeyboardInterrupt:
        return 0
    except Exception as e:
-        logging.getLogger("s5p").error("%s", e)
+        logger.error("%s", e)
        return 1
    finally:
        if args.tracemalloc:
            import tracemalloc
            snapshot = tracemalloc.take_snapshot()
            stats = snapshot.statistics("lineno")
            logger.info("tracemalloc: top %d allocations", args.tracemalloc)
            for stat in stats[:args.tracemalloc]:
                logger.info("  %s", stat)

    return 0
@@ -53,6 +53,17 @@ class ProxyPoolConfig:

    report_url: str = ""


@dataclass
class TorConfig:
    """Tor control port configuration."""

    control_host: str = "127.0.0.1"
    control_port: int = 9051
    password: str = ""
    cookie_file: str = ""
    newnym_interval: float = 0.0  # 0 = manual only


@dataclass
class Config:
    """Server configuration."""

@@ -66,7 +77,10 @@ class Config:

    max_connections: int = 256
    pool_size: int = 0
    pool_max_idle: float = 30.0
    api_host: str = ""
    api_port: int = 0
    proxy_pool: ProxyPoolConfig | None = None
    tor: TorConfig | None = None
    config_file: str = ""


@@ -148,6 +162,15 @@ def load_config(path: str | Path) -> Config:

    if "pool_max_idle" in raw:
        config.pool_max_idle = float(raw["pool_max_idle"])

    if "api_listen" in raw:
        api = raw["api_listen"]
        if isinstance(api, str) and ":" in api:
            host, port_str = api.rsplit(":", 1)
            config.api_host = host
            config.api_port = int(port_str)
        elif isinstance(api, (str, int)):
            config.api_port = int(api)

    if "chain" in raw:
        for entry in raw["chain"]:
            if isinstance(entry, str):

@@ -207,4 +230,14 @@ def load_config(path: str | Path) -> Config:

            refresh=refresh,
        )

    if "tor" in raw:
        tor_raw = raw["tor"]
        config.tor = TorConfig(
            control_host=tor_raw.get("control_host", "127.0.0.1"),
            control_port=int(tor_raw.get("control_port", 9051)),
            password=tor_raw.get("password", ""),
            cookie_file=tor_raw.get("cookie_file", ""),
            newnym_interval=float(tor_raw.get("newnym_interval", 0)),
        )

    return config
@@ -34,6 +34,19 @@ class Metrics:

            f"up={h}h{m:02d}m{s:02d}s"
        )

    def to_dict(self) -> dict:
        """Return all counters as a dict (for JSON serialization)."""
        return {
            "connections": self.connections,
            "success": self.success,
            "failed": self.failed,
            "retries": self.retries,
            "active": self.active,
            "bytes_in": self.bytes_in,
            "bytes_out": self.bytes_out,
            "uptime": round(time.monotonic() - self.started, 1),
        }


def _human_bytes(n: int) -> str:
    """Format byte count in human-readable form."""
@@ -8,11 +8,13 @@ import signal

import struct
import time

from .api import start_api
from .config import Config, load_config
from .connpool import FirstHopPool
from .metrics import Metrics
from .pool import ProxyPool
from .proto import ProtoError, Socks5Reply, build_chain, read_socks5_address
from .tor import TorController

logger = logging.getLogger("s5p")

@@ -235,6 +237,22 @@ async def serve(config: Config) -> None:

    )
    await hop_pool.start()

    tor: TorController | None = None
    if config.tor:
        tc = config.tor
        tor = TorController(
            host=tc.control_host,
            port=tc.control_port,
            password=tc.password,
            cookie_file=tc.cookie_file,
            newnym_interval=tc.newnym_interval,
        )
        try:
            await tor.start()
        except (ConnectionError, OSError, TimeoutError) as e:
            logger.warning("tor: control port unavailable: %s", e)
            tor = None

    sem = asyncio.Semaphore(config.max_connections)

    async def on_client(r: asyncio.StreamReader, w: asyncio.StreamWriter) -> None:

@@ -259,6 +277,23 @@ async def serve(config: Config) -> None:

    )
    logger.info("  retries: %d", config.retries)

    if tor:
        extra = f", newnym every {tor.newnym_interval:.0f}s" if tor.newnym_interval else ""
        logger.info(
            "  tor: control %s:%d%s", config.tor.control_host, config.tor.control_port, extra,
        )

    # -- control API ---------------------------------------------------------
    api_srv: asyncio.Server | None = None
    if config.api_port:
        api_ctx: dict = {
            "config": config,
            "metrics": metrics,
            "pool": proxy_pool,
            "hop_pool": hop_pool,
            "tor": tor,
        }

    # SIGHUP: hot-reload config (timeout, retries, log_level, pool settings)
    async def _reload() -> None:
        if not config.config_file:

@@ -289,6 +324,10 @@ async def serve(config: Config) -> None:

    loop.add_signal_handler(signal.SIGHUP, _on_sighup)

    if config.api_port:
        api_ctx["reload_fn"] = _reload
        api_srv = await start_api(config.api_host, config.api_port, api_ctx)

    metrics_stop = asyncio.Event()
    pool_ref = proxy_pool
    metrics_task = asyncio.create_task(_metrics_logger(metrics, metrics_stop, pool_ref))

@@ -296,6 +335,11 @@ async def serve(config: Config) -> None:

    async with srv:
        sig = await stop
        logger.info("received %s, shutting down", signal.Signals(sig).name)
        if api_srv:
            api_srv.close()
            await api_srv.wait_closed()
        if tor:
            await tor.stop()
        if hop_pool:
            await hop_pool.stop()
        if proxy_pool:
204
src/s5p/tor.py
Normal file
204
src/s5p/tor.py
Normal file
```python
"""Tor control port client with NEWNYM support."""

from __future__ import annotations

import asyncio
import logging
import time

logger = logging.getLogger("s5p")

_NEWNYM_MIN_INTERVAL = 10.0  # Tor enforces 10s between NEWNYMs


class TorController:
    """Async client for the Tor control protocol.

    Supports password, cookie, and bare authentication. Provides NEWNYM
    signaling (new circuit) on demand or on a periodic timer.
    """

    def __init__(
        self,
        host: str = "127.0.0.1",
        port: int = 9051,
        password: str = "",
        cookie_file: str = "",
        newnym_interval: float = 0.0,
    ) -> None:
        self._host = host
        self._port = port
        self._password = password
        self._cookie_file = cookie_file
        self._newnym_interval = newnym_interval
        self._reader: asyncio.StreamReader | None = None
        self._writer: asyncio.StreamWriter | None = None
        self._last_newnym: float = 0.0
        self._stop = asyncio.Event()
        self._tasks: list[asyncio.Task] = []
        self._lock = asyncio.Lock()

    # -- properties ----------------------------------------------------------

    @property
    def connected(self) -> bool:
        """True if the control connection is open."""
        return self._writer is not None and not self._writer.is_closing()

    @property
    def last_newnym(self) -> float:
        """Monotonic timestamp of the last successful NEWNYM (0 if never)."""
        return self._last_newnym

    @property
    def newnym_interval(self) -> float:
        """Periodic NEWNYM interval in seconds (0 = manual only)."""
        return self._newnym_interval

    # -- lifecycle -----------------------------------------------------------

    async def start(self) -> None:
        """Connect, authenticate, and start optional newnym loop."""
        await self._connect()
        if self._newnym_interval > 0:
            self._tasks.append(asyncio.create_task(self._newnym_loop()))

    async def stop(self) -> None:
        """Cancel tasks and close the connection."""
        self._stop.set()
        for task in self._tasks:
            task.cancel()
        for task in self._tasks:
            try:
                await task
            except asyncio.CancelledError:
                pass
        self._tasks.clear()
        self._close()

    # -- public commands -----------------------------------------------------

    async def newnym(self) -> bool:
        """Send SIGNAL NEWNYM with client-side 10s rate limit.

        Reconnects automatically if the connection was lost.
        Returns True on success, False on rate-limit or failure.
        """
        now = time.monotonic()
        if self._last_newnym and (now - self._last_newnym) < _NEWNYM_MIN_INTERVAL:
            return False

        async with self._lock:
            try:
                if not self.connected:
                    await self._connect()
                code, _ = await self._command("SIGNAL NEWNYM")
                if code == 250:
                    self._last_newnym = time.monotonic()
                    logger.debug("tor: NEWNYM sent")
                    return True
                logger.warning("tor: NEWNYM failed: %d", code)
                return False
            except (ConnectionError, OSError, TimeoutError) as e:
                logger.warning("tor: NEWNYM error: %s", e)
                self._close()
                return False

    async def get_info(self, keyword: str) -> str | None:
        """Send GETINFO and return the response value, or None on error."""
        async with self._lock:
            try:
                if not self.connected:
                    await self._connect()
                code, lines = await self._command(f"GETINFO {keyword}")
                if code == 250 and lines:
                    # response format: "keyword=value"
                    for line in lines:
                        if "=" in line:
                            return line.split("=", 1)[1]
                return None
            except (ConnectionError, OSError, TimeoutError):
                self._close()
                return None

    # -- internals -----------------------------------------------------------

    async def _connect(self) -> None:
        """Open TCP connection and authenticate."""
        self._close()
        self._reader, self._writer = await asyncio.wait_for(
            asyncio.open_connection(self._host, self._port),
            timeout=10.0,
        )
        await self._authenticate()
        logger.info("tor: connected to %s:%d", self._host, self._port)

    async def _authenticate(self) -> None:
        """Send AUTHENTICATE with configured credentials."""
        if self._cookie_file:
            try:
                with open(self._cookie_file, "rb") as f:
                    cookie = f.read().hex()
                cmd = f"AUTHENTICATE {cookie}"
            except OSError as e:
                self._close()
                raise ConnectionError(f"cannot read cookie file: {e}") from e
        elif self._password:
            cmd = f'AUTHENTICATE "{self._password}"'
        else:
            cmd = "AUTHENTICATE"

        code, _ = await self._command(cmd)
        if code != 250:
            self._close()
            raise ConnectionError(f"tor auth failed: {code}")

    async def _command(self, cmd: str) -> tuple[int, list[str]]:
        """Send a command and read the multi-line response.

        Returns (status_code, [response_lines]).
        """
        if not self._writer or not self._reader:
            raise ConnectionError("not connected")

        self._writer.write(f"{cmd}\r\n".encode())
        await self._writer.drain()

        lines: list[str] = []
        while True:
            raw = await asyncio.wait_for(self._reader.readline(), timeout=10.0)
            if not raw:
                raise ConnectionError("connection closed")
            line = raw.decode("ascii", errors="replace").rstrip("\r\n")
            if len(line) < 4:
                raise ConnectionError(f"malformed response: {line!r}")
            code = int(line[:3])
            sep = line[3]
            text = line[4:]
            lines.append(text)
            if sep == " ":
                return code, lines
            # sep == '-' means continuation

    def _close(self) -> None:
        """Close TCP connection silently."""
        if self._writer:
            try:
                self._writer.close()
            except OSError:
                pass
        self._writer = None
        self._reader = None

    async def _newnym_loop(self) -> None:
        """Periodic NEWNYM on configured interval."""
        while not self._stop.is_set():
            try:
                await asyncio.wait_for(
                    self._stop.wait(),
                    timeout=self._newnym_interval,
                )
            except TimeoutError:
                pass
            if not self._stop.is_set():
                await self.newnym()
```
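`_command` above implements the reply framing of the Tor control protocol: each line starts with a three-digit status code, a `-` as the fourth character marks a continuation line, and a space marks the final line. A self-contained sketch of just that framing logic, mirroring `_command` without needing a Tor daemon:

```python
# Parse a raw Tor control-port reply into (status_code, lines), mirroring
# the framing in TorController._command: "250-" continues, "250 " ends.
def parse_reply(raw: bytes) -> tuple[int, list[str]]:
    lines: list[str] = []
    for raw_line in raw.split(b"\r\n"):
        if not raw_line:
            continue
        line = raw_line.decode("ascii", errors="replace")
        if len(line) < 4:
            raise ValueError(f"malformed response: {line!r}")
        code, sep, text = int(line[:3]), line[3], line[4:]
        lines.append(text)
        if sep == " ":  # final line of the reply
            return code, lines
    raise ValueError("reply not terminated")


# A GETINFO version exchange as Tor would frame it on the wire:
reply = b"250-version=0.4.8.12\r\n250 OK\r\n"
code, lines = parse_reply(reply)
print(code, lines)  # 250 ['version=0.4.8.12', 'OK']
```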
tests/test_api.py (new file, 369 lines)
```python
"""Tests for the control API module."""

import asyncio
import json
from unittest.mock import AsyncMock, MagicMock

from s5p.api import (
    _handle_config,
    _handle_metrics,
    _handle_pool,
    _handle_status,
    _handle_tor,
    _handle_tor_newnym,
    _json_response,
    _parse_request,
    _route,
)
from s5p.config import ChainHop, Config, PoolSourceConfig, ProxyPoolConfig
from s5p.metrics import Metrics

# -- request parsing ---------------------------------------------------------


class TestParseRequest:
    """Test HTTP request line parsing."""

    def test_get(self):
        assert _parse_request(b"GET /status HTTP/1.1\r\n") == ("GET", "/status")

    def test_post(self):
        assert _parse_request(b"POST /reload HTTP/1.1\r\n") == ("POST", "/reload")

    def test_strips_query_string(self):
        assert _parse_request(b"GET /pool?foo=bar HTTP/1.1\r\n") == ("GET", "/pool")

    def test_method_uppercased(self):
        assert _parse_request(b"get /metrics HTTP/1.1\r\n") == ("GET", "/metrics")

    def test_empty(self):
        assert _parse_request(b"") == ("", "")

    def test_garbage(self):
        assert _parse_request(b"\xff\xfe") == ("", "")

    def test_incomplete(self):
        assert _parse_request(b"GET\r\n") == ("", "")


# -- JSON response -----------------------------------------------------------


class TestJsonResponse:
    """Test HTTP JSON response builder."""

    def test_format(self):
        writer = MagicMock()
        written = bytearray()
        writer.write = lambda data: written.extend(data)

        _json_response(writer, 200, {"ok": True})
        text = written.decode()
        assert "HTTP/1.1 200 OK\r\n" in text
        assert "Content-Type: application/json\r\n" in text
        assert "Connection: close\r\n" in text
        # body after double newline
        body = text.split("\r\n\r\n", 1)[1]
        assert json.loads(body) == {"ok": True}

    def test_404(self):
        writer = MagicMock()
        written = bytearray()
        writer.write = lambda data: written.extend(data)

        _json_response(writer, 404, {"error": "not found"})
        text = written.decode()
        assert "HTTP/1.1 404 Not Found\r\n" in text


# -- helpers -----------------------------------------------------------------


def _make_ctx(
    config: Config | None = None,
    pool: MagicMock | None = None,
    tor: MagicMock | None = None,
) -> dict:
    """Build a mock context dict."""
    return {
        "config": config or Config(),
        "metrics": Metrics(),
        "pool": pool,
        "hop_pool": None,
        "tor": tor,
    }


# -- GET handlers ------------------------------------------------------------


class TestHandleStatus:
    """Test GET /status handler."""

    def test_basic(self):
        ctx = _make_ctx()
        ctx["metrics"].connections = 10
        ctx["metrics"].success = 8
        status, body = _handle_status(ctx)
        assert status == 200
        assert body["connections"] == 10
        assert body["success"] == 8
        assert "uptime" in body

    def test_with_pool(self):
        pool = MagicMock()
        pool.alive_count = 5
        pool.count = 10
        ctx = _make_ctx(pool=pool)
        _, body = _handle_status(ctx)
        assert body["pool"] == {"alive": 5, "total": 10}

    def test_with_chain(self):
        config = Config(chain=[ChainHop("socks5", "127.0.0.1", 9050)])
        ctx = _make_ctx(config=config)
        _, body = _handle_status(ctx)
        assert body["chain"] == ["socks5://127.0.0.1:9050"]


class TestHandleMetrics:
    """Test GET /metrics handler."""

    def test_returns_dict(self):
        ctx = _make_ctx()
        ctx["metrics"].connections = 42
        ctx["metrics"].bytes_in = 1024
        status, body = _handle_metrics(ctx)
        assert status == 200
        assert body["connections"] == 42
        assert body["bytes_in"] == 1024
        assert "uptime" in body


class TestHandlePool:
    """Test GET /pool handler."""

    def test_no_pool(self):
        ctx = _make_ctx()
        status, body = _handle_pool(ctx)
        assert status == 200
        assert body == {"alive": 0, "total": 0, "proxies": {}}

    def test_with_entries(self):
        pool = MagicMock()
        pool.alive_count = 1
        pool.count = 2
        entry_alive = MagicMock(
            alive=True, fails=0, tests=5,
            last_ok=100.0, last_test=100.0, last_seen=100.0,
        )
        entry_dead = MagicMock(
            alive=False, fails=3, tests=5,
            last_ok=0.0, last_test=100.0, last_seen=100.0,
        )
        pool._proxies = {
            "socks5://1.2.3.4:1080": entry_alive,
            "socks5://5.6.7.8:1080": entry_dead,
        }
        ctx = _make_ctx(pool=pool)
        _, body = _handle_pool(ctx)
        assert len(body["proxies"]) == 2
        assert body["proxies"]["socks5://1.2.3.4:1080"]["alive"] is True

    def test_alive_only(self):
        pool = MagicMock()
        pool.alive_count = 1
        pool.count = 2
        entry_alive = MagicMock(
            alive=True, fails=0, tests=5,
            last_ok=100.0, last_test=100.0, last_seen=100.0,
        )
        entry_dead = MagicMock(
            alive=False, fails=3, tests=5,
            last_ok=0.0, last_test=100.0, last_seen=100.0,
        )
        pool._proxies = {
            "socks5://1.2.3.4:1080": entry_alive,
            "socks5://5.6.7.8:1080": entry_dead,
        }
        ctx = _make_ctx(pool=pool)
        _, body = _handle_pool(ctx, alive_only=True)
        assert len(body["proxies"]) == 1
        assert "socks5://1.2.3.4:1080" in body["proxies"]


class TestHandleConfig:
    """Test GET /config handler."""

    def test_basic(self):
        config = Config(timeout=15.0, retries=5, log_level="debug")
        ctx = _make_ctx(config=config)
        status, body = _handle_config(ctx)
        assert status == 200
        assert body["timeout"] == 15.0
        assert body["retries"] == 5
        assert body["log_level"] == "debug"

    def test_with_proxy_pool(self):
        pp = ProxyPoolConfig(
            sources=[PoolSourceConfig(url="http://api:8081/proxies")],
            refresh=600.0,
            test_interval=60.0,
            max_fails=5,
        )
        config = Config(proxy_pool=pp)
        ctx = _make_ctx(config=config)
        _, body = _handle_config(ctx)
        assert body["proxy_pool"]["refresh"] == 600.0
        assert body["proxy_pool"]["sources"][0]["url"] == "http://api:8081/proxies"


# -- routing -----------------------------------------------------------------


class TestRouting:
    """Test request routing and error responses."""

    def test_get_status(self):
        ctx = _make_ctx()
        status, body = asyncio.run(_route("GET", "/status", ctx))
        assert status == 200

    def test_get_metrics(self):
        ctx = _make_ctx()
        status, _ = asyncio.run(_route("GET", "/metrics", ctx))
        assert status == 200

    def test_get_pool(self):
        ctx = _make_ctx()
        status, _ = asyncio.run(_route("GET", "/pool", ctx))
        assert status == 200

    def test_get_pool_alive(self):
        ctx = _make_ctx()
        status, _ = asyncio.run(_route("GET", "/pool/alive", ctx))
        assert status == 200

    def test_get_config(self):
        ctx = _make_ctx()
        status, _ = asyncio.run(_route("GET", "/config", ctx))
        assert status == 200

    def test_post_reload(self):
        ctx = _make_ctx()
        ctx["reload_fn"] = AsyncMock()
        status, body = asyncio.run(_route("POST", "/reload", ctx))
        assert status == 200
        assert body == {"ok": True}

    def test_post_pool_test(self):
        pool = MagicMock()
        pool._run_health_tests = AsyncMock()
        ctx = _make_ctx(pool=pool)
        status, body = asyncio.run(_route("POST", "/pool/test", ctx))
        assert status == 200
        assert body == {"ok": True}

    def test_post_pool_refresh(self):
        pool = MagicMock()
        pool._fetch_all_sources = AsyncMock()
        ctx = _make_ctx(pool=pool)
        status, body = asyncio.run(_route("POST", "/pool/refresh", ctx))
        assert status == 200
        assert body == {"ok": True}

    def test_404(self):
        ctx = _make_ctx()
        status, body = asyncio.run(_route("GET", "/nonexistent", ctx))
        assert status == 404
        assert "error" in body

    def test_405_wrong_method(self):
        ctx = _make_ctx()
        status, body = asyncio.run(_route("POST", "/status", ctx))
        assert status == 405
        assert "GET" in body["error"]

    def test_405_get_on_post_route(self):
        ctx = _make_ctx()
        status, body = asyncio.run(_route("GET", "/reload", ctx))
        assert status == 405
        assert "POST" in body["error"]

    def test_get_tor(self):
        ctx = _make_ctx()
        status, body = asyncio.run(_route("GET", "/tor", ctx))
        assert status == 200
        assert body == {"enabled": False}

    def test_post_tor_newnym(self):
        tor = MagicMock()
        tor.newnym = AsyncMock(return_value=True)
        ctx = _make_ctx(tor=tor)
        status, body = asyncio.run(_route("POST", "/tor/newnym", ctx))
        assert status == 200
        assert body == {"ok": True}


# -- Tor endpoints -----------------------------------------------------------


class TestHandleTor:
    """Test GET /tor handler."""

    def test_disabled(self):
        ctx = _make_ctx()
        status, body = _handle_tor(ctx)
        assert status == 200
        assert body == {"enabled": False}

    def test_connected(self):
        tor = MagicMock()
        tor.connected = True
        tor.last_newnym = 0.0
        tor.newnym_interval = 60.0
        ctx = _make_ctx(tor=tor)
        status, body = _handle_tor(ctx)
        assert status == 200
        assert body["enabled"] is True
        assert body["connected"] is True
        assert body["last_newnym"] is None
        assert body["newnym_interval"] == 60.0

    def test_with_last_newnym(self):
        import time
        tor = MagicMock()
        tor.connected = True
        tor.last_newnym = time.monotonic() - 42.0
        tor.newnym_interval = 0.0
        ctx = _make_ctx(tor=tor)
        status, body = _handle_tor(ctx)
        assert status == 200
        assert body["last_newnym"] is not None
        assert body["last_newnym"] >= 42.0


class TestHandleTorNewnym:
    """Test POST /tor/newnym handler."""

    def test_success(self):
        tor = MagicMock()
        tor.newnym = AsyncMock(return_value=True)
        ctx = _make_ctx(tor=tor)
        status, body = asyncio.run(_handle_tor_newnym(ctx))
        assert status == 200
        assert body == {"ok": True}

    def test_rate_limited(self):
        tor = MagicMock()
        tor.newnym = AsyncMock(return_value=False)
        ctx = _make_ctx(tor=tor)
        status, body = asyncio.run(_handle_tor_newnym(ctx))
        assert status == 200
        assert body["ok"] is False
        assert "reason" in body

    def test_not_configured(self):
        ctx = _make_ctx()
        status, body = asyncio.run(_handle_tor_newnym(ctx))
        assert status == 400
        assert "error" in body
```
```diff
@@ -141,3 +141,37 @@ class TestConfig:
         c = load_config(cfg_file)
         assert c.pool_size == 16
         assert c.pool_max_idle == 45.0
+
+    def test_tor_config_from_yaml(self, tmp_path):
+        cfg_file = tmp_path / "test.yaml"
+        cfg_file.write_text(
+            "tor:\n"
+            "  control_host: 10.0.0.1\n"
+            "  control_port: 9151\n"
+            "  password: secret\n"
+            "  cookie_file: /var/run/tor/cookie\n"
+            "  newnym_interval: 60\n"
+        )
+        c = load_config(cfg_file)
+        assert c.tor is not None
+        assert c.tor.control_host == "10.0.0.1"
+        assert c.tor.control_port == 9151
+        assert c.tor.password == "secret"
+        assert c.tor.cookie_file == "/var/run/tor/cookie"
+        assert c.tor.newnym_interval == 60.0
+
+    def test_tor_config_defaults(self, tmp_path):
+        cfg_file = tmp_path / "test.yaml"
+        cfg_file.write_text("tor:\n  password: test\n")
+        c = load_config(cfg_file)
+        assert c.tor is not None
+        assert c.tor.control_host == "127.0.0.1"
+        assert c.tor.control_port == 9051
+        assert c.tor.cookie_file == ""
+        assert c.tor.newnym_interval == 0.0
+
+    def test_no_tor_config(self, tmp_path):
+        cfg_file = tmp_path / "test.yaml"
+        cfg_file.write_text("listen: 1080\n")
+        c = load_config(cfg_file)
+        assert c.tor is None
```
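The config tests above pin down the accepted `tor:` keys and their defaults. A sketch of a matching YAML block (values illustrative; `password` and `cookie_file` are alternative authentication methods, and omitting `newnym_interval` leaves circuit rotation manual-only):

```yaml
tor:
  control_host: 127.0.0.1   # default
  control_port: 9051        # default Tor ControlPort
  password: secret          # or: cookie_file: /var/run/tor/cookie
  newnym_interval: 60       # seconds between automatic NEWNYMs (0 = manual only)
```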
tests/test_tor.py (new file, 258 lines)
```python
"""Tests for the Tor control port client."""

import asyncio
from unittest.mock import AsyncMock, MagicMock, mock_open, patch

import pytest

from s5p.tor import TorController

# -- helpers -----------------------------------------------------------------


def _mock_reader(responses: list[bytes]) -> AsyncMock:
    """Create a mock StreamReader that yields canned lines."""
    reader = AsyncMock(spec=asyncio.StreamReader)
    reader.readline = AsyncMock(side_effect=responses)
    return reader


def _mock_writer() -> MagicMock:
    """Create a mock StreamWriter."""
    writer = MagicMock(spec=asyncio.StreamWriter)
    writer.write = MagicMock()
    writer.drain = AsyncMock()
    writer.close = MagicMock()
    writer.wait_closed = AsyncMock()
    writer.is_closing = MagicMock(return_value=False)
    return writer


# -- authentication ----------------------------------------------------------


class TestAuthentication:
    """Test Tor control port authentication modes."""

    def test_password_auth(self):
        reader = _mock_reader([b"250 OK\r\n"])
        writer = _mock_writer()

        async def run():
            with patch("asyncio.open_connection", return_value=(reader, writer)):
                tc = TorController(password="secret")
                await tc._connect()
                # verify AUTHENTICATE command was sent with password
                calls = writer.write.call_args_list
                assert any(b'AUTHENTICATE "secret"' in c[0][0] for c in calls)
                tc._close()

        asyncio.run(run())

    def test_cookie_auth(self):
        reader = _mock_reader([b"250 OK\r\n"])
        writer = _mock_writer()
        cookie_data = b"\xde\xad\xbe\xef"

        async def run():
            with patch("asyncio.open_connection", return_value=(reader, writer)):
                with patch("builtins.open", mock_open(read_data=cookie_data)):
                    tc = TorController(cookie_file="/var/run/tor/control.authcookie")
                    await tc._connect()
                    calls = writer.write.call_args_list
                    assert any(b"AUTHENTICATE deadbeef" in c[0][0] for c in calls)
                    tc._close()

        asyncio.run(run())

    def test_bare_auth(self):
        reader = _mock_reader([b"250 OK\r\n"])
        writer = _mock_writer()

        async def run():
            with patch("asyncio.open_connection", return_value=(reader, writer)):
                tc = TorController()
                await tc._connect()
                calls = writer.write.call_args_list
                assert any(c[0][0] == b"AUTHENTICATE\r\n" for c in calls)
                tc._close()

        asyncio.run(run())

    def test_auth_failure(self):
        reader = _mock_reader([b"515 Bad authentication\r\n"])
        writer = _mock_writer()

        async def run():
            with patch("asyncio.open_connection", return_value=(reader, writer)):
                tc = TorController(password="wrong")
                with pytest.raises(ConnectionError, match="auth failed"):
                    await tc._connect()

        asyncio.run(run())

    def test_connect_failure(self):
        async def run():
            with patch("asyncio.open_connection", side_effect=OSError("refused")):
                tc = TorController()
                with pytest.raises(OSError, match="refused"):
                    await tc._connect()

        asyncio.run(run())


# -- NEWNYM ------------------------------------------------------------------


class TestNewnym:
    """Test NEWNYM signaling."""

    def test_newnym_success(self):
        # auth response + newnym response
        reader = _mock_reader([b"250 OK\r\n", b"250 OK\r\n"])
        writer = _mock_writer()

        async def run():
            with patch("asyncio.open_connection", return_value=(reader, writer)):
                tc = TorController()
                await tc._connect()
                ok = await tc.newnym()
                assert ok is True
                assert tc.last_newnym > 0
                tc._close()

        asyncio.run(run())

    def test_newnym_rate_limited(self):
        # auth + first newnym
        reader = _mock_reader([b"250 OK\r\n", b"250 OK\r\n"])
        writer = _mock_writer()

        async def run():
            with patch("asyncio.open_connection", return_value=(reader, writer)):
                tc = TorController()
                await tc._connect()
                ok = await tc.newnym()
                assert ok is True
                # immediate second call should be rate-limited
                ok2 = await tc.newnym()
                assert ok2 is False
                tc._close()

        asyncio.run(run())

    def test_newnym_reconnects_on_disconnect(self):
        # first connect auth, then reconnect auth + newnym
        reader1 = _mock_reader([b"250 OK\r\n"])
        writer1 = _mock_writer()
        reader2 = _mock_reader([b"250 OK\r\n", b"250 OK\r\n"])
        writer2 = _mock_writer()

        call_count = 0

        async def fake_connect(*args, **kwargs):
            nonlocal call_count
            call_count += 1
            if call_count == 1:
                return reader1, writer1
            return reader2, writer2

        async def run():
            with patch("asyncio.open_connection", side_effect=fake_connect):
                tc = TorController()
                await tc._connect()
                # simulate disconnect
                tc._close()
                assert not tc.connected
                # newnym should reconnect
                ok = await tc.newnym()
                assert ok is True
                tc._close()

        asyncio.run(run())


# -- GETINFO -----------------------------------------------------------------


class TestGetInfo:
    """Test GETINFO command."""

    def test_getinfo_version(self):
        # auth + getinfo multi-line response
        reader = _mock_reader([
            b"250 OK\r\n",
            b"250-version=0.4.8.12\r\n",
            b"250 OK\r\n",
        ])
        writer = _mock_writer()

        async def run():
            with patch("asyncio.open_connection", return_value=(reader, writer)):
                tc = TorController()
                await tc._connect()
                version = await tc.get_info("version")
                assert version == "0.4.8.12"
                tc._close()

        asyncio.run(run())

    def test_getinfo_not_connected(self):
        # auth for reconnect + getinfo
        reader = _mock_reader([
            b"250 OK\r\n",
            b"250-traffic/read=12345\r\n",
            b"250 OK\r\n",
        ])
        writer = _mock_writer()

        async def run():
            with patch("asyncio.open_connection", return_value=(reader, writer)):
                tc = TorController()
                # not connected, should auto-connect
                result = await tc.get_info("traffic/read")
                assert result == "12345"
                tc._close()

        asyncio.run(run())


# -- lifecycle ---------------------------------------------------------------


class TestLifecycle:
    """Test start/stop lifecycle."""

    def test_start_stop(self):
        reader = _mock_reader([b"250 OK\r\n"])
        writer = _mock_writer()

        async def run():
            with patch("asyncio.open_connection", return_value=(reader, writer)):
                tc = TorController()
                await tc.start()
                assert tc.connected
                await tc.stop()
                assert not tc.connected

        asyncio.run(run())

    def test_start_with_newnym_loop(self):
        reader = _mock_reader([b"250 OK\r\n"])
        writer = _mock_writer()

        async def run():
            with patch("asyncio.open_connection", return_value=(reader, writer)):
                tc = TorController(newnym_interval=60.0)
                await tc.start()
                assert len(tc._tasks) == 1
                await tc.stop()
                assert len(tc._tasks) == 0

        asyncio.run(run())

    def test_properties(self):
        tc = TorController(newnym_interval=30.0)
        assert not tc.connected
        assert tc.last_newnym == 0.0
        assert tc.newnym_interval == 30.0
```