feat: control API and Tor integration #1

Merged
username merged 9 commits from feat/control-api into main 2026-02-17 09:56:06 +00:00
16 changed files with 1488 additions and 7 deletions

View File

@@ -25,6 +25,7 @@ Client -------> s5p -------> Hop 1 -------> Hop 2 -------> Target
- **pool.py** -- managed proxy pool (multi-source, health-tested, persistent)
- **http.py** -- minimal async HTTP/1.1 client (GET/POST JSON, no external deps)
- **connpool.py** -- pre-warmed TCP connection pool to first chain hop
- **api.py** -- built-in HTTP control API (runtime metrics, pool state, config reload)
- **cli.py** -- argparse CLI, logging setup, cProfile support
- **metrics.py** -- connection counters and human-readable summary (lock-free, asyncio-only)
@@ -66,3 +67,4 @@ All other functionality uses Python stdlib (`asyncio`, `socket`, `struct`).
- **Connection semaphore** -- cap concurrent connections to prevent fd exhaustion
- **Async HTTP** -- native asyncio HTTP client replaces blocking urllib, parallel fetches
- **First-hop pool** -- pre-warmed TCP connections to chain[0], stale-evicted, auto-refilled
- **Control API** -- built-in asyncio HTTP server, no Flask/external deps, disabled by default

View File

@@ -10,7 +10,7 @@ through configurable chains of SOCKS4, SOCKS5, and HTTP CONNECT proxies.
- Supported hop protocols: SOCKS5, SOCKS4/4a, HTTP CONNECT
- Per-hop authentication (username/password)
- DNS leak prevention (domain names forwarded to proxies, never resolved locally)
- Tor integration (Tor is just another SOCKS5 hop)
- Tor integration (SOCKS5 hop + control port NEWNYM for circuit rotation)
- Managed proxy pool: multiple sources (API + file), health-tested, weighted selection
- Per-proxy failure backoff (60s cooldown), stale proxy expiry, chain pre-flight
- Fast warm start (seconds on restart vs minutes on cold start)
@@ -21,6 +21,7 @@ through configurable chains of SOCKS4, SOCKS5, and HTTP CONNECT proxies.
- Concurrent connection limit with backpressure (`max_connections`)
- Async HTTP client for proxy source fetching (parallel, no threads)
- First-hop TCP connection pool (pre-warmed, stale-evicted)
- Built-in control API (runtime metrics, pool state, config reload via HTTP)
- Container-ready (Alpine-based, podman/docker)
- Graceful shutdown (SIGTERM/SIGINT)
- Pure Python, asyncio-based, minimal dependencies
@@ -70,10 +71,16 @@ timeout: 10
retries: 3
max_connections: 256 # concurrent connection limit
pool_size: 8 # pre-warmed connections to first hop
api_listen: 127.0.0.1:1081 # control API (disabled by default)
chain:
- socks5://127.0.0.1:9050 # Tor
tor:
control_port: 9051 # Tor control port (NEWNYM)
password: "" # or cookie_file for auth
newnym_interval: 0 # periodic circuit rotation (0 = manual)
proxy_pool:
sources:
- url: http://10.200.1.250:8081/proxies
@@ -89,7 +96,7 @@ proxy_pool:
## CLI Reference
```
s5p [-c FILE] [-l [HOST:]PORT] [-C URL[,URL,...]] [-S URL] [-t SEC] [-r N] [-m N] [-v|-q]
s5p [-c FILE] [-l [HOST:]PORT] [-C URL[,URL,...]] [-S URL] [-t SEC] [-r N] [-m N] [--api [HOST:]PORT] [-v|-q]
Options:
-c, --config FILE YAML config file
@@ -99,9 +106,11 @@ Options:
-t, --timeout SEC Per-hop timeout (default: 10)
-r, --retries N Max attempts per connection (default: 3, proxy_source only)
-m, --max-connections N Max concurrent connections (default: 256)
--api [HOST:]PORT Enable control API (e.g. 127.0.0.1:1081)
-v, --verbose Debug logging
-q, --quiet Errors only
--cprofile [FILE] Enable cProfile, dump to FILE (default: s5p.prof)
--tracemalloc [N] Enable tracemalloc, show top N allocators on exit (default: 10)
-V, --version Show version
```

View File

@@ -22,8 +22,9 @@
## v0.2.0
- [x] Built-in control API (runtime metrics, pool state, config reload)
- [ ] SOCKS5 server authentication (username/password)
- [ ] Tor control port integration (circuit renewal via NEWNYM)
- [x] Tor control port integration (circuit renewal via NEWNYM)
- [ ] Metrics (connections/sec, bytes relayed, hop latency)
## v0.3.0

View File

@@ -42,8 +42,10 @@
- [x] Instant warm start (trust cached state, defer all health tests)
- [x] Register signal handlers before startup (fix SIGKILL on stop)
- [x] Use k8s-file logging driver with rotation
- [x] Built-in control API (`api.py`, `--api`, `api_listen`)
- [x] Tor control port integration (NEWNYM signaling, periodic rotation)
## Next
- [ ] Integration tests with mock proxy server
- [ ] SOCKS5 server-side authentication
- [ ] Tor control port integration

View File

@@ -8,6 +8,7 @@ log_level: info
# max_connections: 256 # max concurrent client connections (backpressure)
# pool_size: 0 # pre-warmed TCP connections to first hop (0 = disabled)
# pool_max_idle: 30 # max idle time (seconds) for pooled connections
# api_listen: 127.0.0.1:1081 # control API (disabled by default)
# Proxy chain -- connections tunnel through each hop in order.
# Supported protocols: socks5://, socks4://, http://
@@ -37,6 +38,15 @@ chain:
# state_file: "" # empty = ~/.cache/s5p/pool.json
# report_url: "" # POST dead proxies here (optional)
# Tor control port -- enables NEWNYM signaling (new circuit on demand).
# Requires Tor's ControlPort enabled (torrc: ControlPort 9051).
# tor:
# control_host: 127.0.0.1
# control_port: 9051
# password: "" # HashedControlPassword in torrc
# cookie_file: "" # CookieAuthentication file path
# newnym_interval: 0 # periodic NEWNYM (seconds, 0 = manual only)
# Legacy proxy source (still supported, auto-converts to proxy_pool):
# proxy_source:
# url: http://10.200.1.250:8081/proxies

View File

@@ -14,8 +14,11 @@ s5p -q # errors only
s5p -S http://api:8081/proxies # proxy source API
s5p -r 5 # retry up to 5 proxies
s5p -m 512 # max concurrent connections
s5p --api 127.0.0.1:1081 # enable control API
s5p --cprofile # profile to s5p.prof
s5p --cprofile out.prof # profile to custom file
s5p --tracemalloc # memory profile (top 10)
s5p --tracemalloc 20 # memory profile (top 20)
```
## Container
@@ -58,6 +61,20 @@ proxy_pool:
report_url: "" # POST dead proxies (optional)
```
## Tor Control Port (config)
```yaml
tor:
control_port: 9051
password: "" # or cookie_file: /path/to/cookie
newnym_interval: 60 # auto-rotate every 60s (0 = manual)
```
```bash
curl -s http://127.0.0.1:1081/tor | jq . # status
curl -s -X POST http://127.0.0.1:1081/tor/newnym | jq . # new circuit
```
## Hot Reload
```bash
@@ -84,6 +101,20 @@ http://host:port
http://user:pass@host:port
```
## Control API
```bash
s5p --api 127.0.0.1:1081 -c config/s5p.yaml # enable API
curl -s http://127.0.0.1:1081/status | jq . # runtime status
curl -s http://127.0.0.1:1081/metrics | jq . # full metrics
curl -s http://127.0.0.1:1081/pool | jq . # all proxies
curl -s http://127.0.0.1:1081/pool/alive | jq . # alive only
curl -s http://127.0.0.1:1081/config | jq . # current config
curl -s -X POST http://127.0.0.1:1081/reload # reload config
curl -s -X POST http://127.0.0.1:1081/pool/test # health test now
```
## Testing
```bash

View File

@@ -46,6 +46,7 @@ log_level: info
max_connections: 256 # concurrent connection limit (backpressure)
pool_size: 0 # pre-warmed TCP connections to first hop (0 = disabled)
pool_max_idle: 30 # max idle time for pooled connections (seconds)
api_listen: "" # control API bind address (empty = disabled)
chain:
- socks5://127.0.0.1:9050
@@ -216,6 +217,133 @@ other pool settings).
The old `proxy_source` key is still supported and auto-converts to `proxy_pool`
with a single API source. `proxy_pool` takes precedence if both are present.
## Control API
Built-in HTTP API for runtime inspection and management. Disabled by default;
enable with `api_listen` in config or `--api` on the command line.
```yaml
api_listen: 127.0.0.1:1081
```
```bash
s5p --api 127.0.0.1:1081 -c config/s5p.yaml
```
### Read endpoints
| Method | Path | Description |
|--------|------|-------------|
| `GET` | `/status` | Combined summary: uptime, metrics, pool stats, chain |
| `GET` | `/metrics` | Full metrics counters (connections, bytes, uptime) |
| `GET` | `/pool` | All proxies with per-entry state |
| `GET` | `/pool/alive` | Alive proxies only |
| `GET` | `/config` | Current runtime config (sanitized) |
### Write endpoints
| Method | Path | Description |
|--------|------|-------------|
| `POST` | `/reload` | Re-read config file (replaces SIGHUP) |
| `POST` | `/pool/test` | Trigger immediate health test cycle |
| `POST` | `/pool/refresh` | Trigger immediate source re-fetch |
All responses are `application/json`. Errors return `{"error": "message"}` with
appropriate status code (400, 404, 405, 500).
### Examples
```bash
# Runtime status
curl -s http://127.0.0.1:1081/status | jq .
# Full metrics
curl -s http://127.0.0.1:1081/metrics | jq .
# Pool state (all proxies)
curl -s http://127.0.0.1:1081/pool | jq .
# Alive proxies only
curl -s http://127.0.0.1:1081/pool/alive | jq '.proxies | length'
# Current config
curl -s http://127.0.0.1:1081/config | jq .
# Reload config (like SIGHUP)
curl -s -X POST http://127.0.0.1:1081/reload | jq .
# Trigger health tests now
curl -s -X POST http://127.0.0.1:1081/pool/test | jq .
# Re-fetch proxy sources now
curl -s -X POST http://127.0.0.1:1081/pool/refresh | jq .
```
Settings that require a restart: `listen`, `chain`, `pool_size`, `pool_max_idle`, `api_listen`.
## Tor Control Port
Optional integration with Tor's control protocol for circuit management.
When enabled, s5p connects to Tor's control port and can send NEWNYM signals
to request new circuits (new exit node) on demand or on a timer.
### Configuration
```yaml
tor:
control_host: 127.0.0.1 # Tor control address
control_port: 9051 # Tor control port
password: "" # HashedControlPassword (torrc)
cookie_file: "" # CookieAuthentication file path
newnym_interval: 0 # periodic NEWNYM in seconds (0 = manual only)
```
Requires Tor's `ControlPort` enabled in `torrc`:
```
ControlPort 9051
HashedControlPassword 16:... # or CookieAuthentication 1
```
### Authentication modes
| Mode | Config | torrc |
|------|--------|-------|
| Password | `password: "secret"` | `HashedControlPassword 16:...` |
| Cookie | `cookie_file: /var/run/tor/control.authcookie` | `CookieAuthentication 1` |
| None | (leave both empty) | No auth configured |
### Rate limiting
Tor enforces a minimum 10-second interval between NEWNYM signals. s5p
applies the same client-side rate limit to avoid unnecessary rejections.
### API endpoints
| Method | Path | Description |
|--------|------|-------------|
| `GET` | `/tor` | Controller status (enabled, connected, last NEWNYM) |
| `POST` | `/tor/newnym` | Trigger NEWNYM signal (new circuit) |
```bash
# Check tor controller status
curl -s http://127.0.0.1:1081/tor | jq .
# Request new circuit
curl -s -X POST http://127.0.0.1:1081/tor/newnym | jq .
```
### Periodic NEWNYM
Set `newnym_interval` to automatically rotate circuits:
```yaml
tor:
newnym_interval: 60 # new circuit every 60 seconds
```
Values below 10 are clamped to Tor's minimum interval.
## Connection Retry
When a proxy pool is active, s5p retries failed connections with a different
@@ -250,7 +378,7 @@ Settings reloaded on SIGHUP:
| `max_connections` | Concurrent connection limit |
| `proxy_pool.*` | Sources, intervals, thresholds |
Settings that require a restart: `listen`, `chain`, `pool_size`, `pool_max_idle`.
Settings that require a restart: `listen`, `chain`, `pool_size`, `pool_max_idle`, `api_listen`.
Requires `-c` / `--config` to know which file to re-read. Without a
config file, SIGHUP is ignored with a warning.
@@ -291,6 +419,15 @@ s5p --cprofile output.prof -c config/s5p.yaml
# Analyze after stopping
python -m pstats s5p.prof
# Memory profiling with tracemalloc (top 10 allocators on exit)
s5p --tracemalloc -c config/s5p.yaml
# Show top 20 allocators
s5p --tracemalloc 20 -c config/s5p.yaml
# Both profilers simultaneously
s5p --cprofile --tracemalloc -c config/s5p.yaml
```
## Testing the Proxy

305
src/s5p/api.py Normal file
View File

@@ -0,0 +1,305 @@
"""Built-in HTTP control API for runtime inspection and management."""
from __future__ import annotations
import asyncio
import json
import logging
import time
from .metrics import Metrics
logger = logging.getLogger("s5p")
# -- HTTP helpers ------------------------------------------------------------
def _parse_request(data: bytes) -> tuple[str, str]:
"""Extract method and path from an HTTP/1.1 request line.
Returns (method, path) or ("", "") on parse failure.
"""
try:
line = data.split(b"\r\n", 1)[0].decode("ascii")
except (UnicodeDecodeError, IndexError):
return "", ""
parts = line.split(None, 2)
if len(parts) < 2:
return "", ""
return parts[0].upper(), parts[1].split("?", 1)[0]
def _json_response(
writer: asyncio.StreamWriter,
status: int,
body: dict | list,
) -> None:
"""Write an HTTP response with JSON body and close."""
phrases = {200: "OK", 400: "Bad Request", 404: "Not Found",
405: "Method Not Allowed", 500: "Internal Server Error"}
payload = json.dumps(body, separators=(",", ":")).encode()
header = (
f"HTTP/1.1 {status} {phrases.get(status, 'Error')}\r\n"
f"Content-Type: application/json\r\n"
f"Content-Length: {len(payload)}\r\n"
f"Connection: close\r\n"
f"\r\n"
)
writer.write(header.encode() + payload)
# -- route handlers ----------------------------------------------------------
def _handle_status(ctx: dict) -> tuple[int, dict]:
"""GET /status -- combined runtime summary."""
metrics: Metrics = ctx["metrics"]
data = {
"uptime": round(time.monotonic() - metrics.started, 1),
"connections": metrics.connections,
"success": metrics.success,
"failed": metrics.failed,
"active": metrics.active,
"bytes_in": metrics.bytes_in,
"bytes_out": metrics.bytes_out,
}
pool = ctx.get("pool")
if pool:
data["pool"] = {"alive": pool.alive_count, "total": pool.count}
config = ctx.get("config")
if config:
data["chain"] = [str(h) for h in config.chain]
return 200, data
def _handle_metrics(ctx: dict) -> tuple[int, dict]:
"""GET /metrics -- full metrics counters."""
return 200, ctx["metrics"].to_dict()
def _handle_pool(ctx: dict, alive_only: bool = False) -> tuple[int, dict]:
"""GET /pool or /pool/alive -- proxy pool state."""
pool = ctx.get("pool")
if not pool:
return 200, {"alive": 0, "total": 0, "proxies": {}}
proxies = {}
for key, entry in pool._proxies.items():
if alive_only and not entry.alive:
continue
proxies[key] = {
"alive": entry.alive,
"fails": entry.fails,
"tests": entry.tests,
"last_ok": entry.last_ok,
"last_test": entry.last_test,
"last_seen": entry.last_seen,
}
return 200, {
"alive": pool.alive_count,
"total": pool.count,
"proxies": proxies,
}
def _handle_config(ctx: dict) -> tuple[int, dict]:
"""GET /config -- sanitized runtime config."""
config = ctx.get("config")
if not config:
return 500, {"error": "config unavailable"}
data: dict = {
"listen": f"{config.listen_host}:{config.listen_port}",
"timeout": config.timeout,
"retries": config.retries,
"log_level": config.log_level,
"max_connections": config.max_connections,
"pool_size": config.pool_size,
"chain": [str(h) for h in config.chain],
}
if config.proxy_pool:
pp = config.proxy_pool
sources = []
for src in pp.sources:
s: dict = {}
if src.url:
s["url"] = src.url
if src.file:
s["file"] = src.file
sources.append(s)
data["proxy_pool"] = {
"sources": sources,
"refresh": pp.refresh,
"test_interval": pp.test_interval,
"max_fails": pp.max_fails,
}
return 200, data
async def _handle_reload(ctx: dict) -> tuple[int, dict]:
"""POST /reload -- re-read config (like SIGHUP)."""
reload_fn = ctx.get("reload_fn")
if not reload_fn:
return 500, {"error": "reload not available"}
try:
await reload_fn()
return 200, {"ok": True}
except Exception as e:
return 500, {"error": str(e)}
async def _handle_pool_test(ctx: dict) -> tuple[int, dict]:
"""POST /pool/test -- trigger immediate health test."""
pool = ctx.get("pool")
if not pool:
return 400, {"error": "no proxy pool configured"}
asyncio.create_task(pool._run_health_tests())
return 200, {"ok": True}
async def _handle_pool_refresh(ctx: dict) -> tuple[int, dict]:
"""POST /pool/refresh -- trigger immediate source re-fetch."""
pool = ctx.get("pool")
if not pool:
return 400, {"error": "no proxy pool configured"}
asyncio.create_task(pool._fetch_all_sources())
return 200, {"ok": True}
def _handle_tor(ctx: dict) -> tuple[int, dict]:
"""GET /tor -- Tor controller status."""
tor = ctx.get("tor")
if not tor:
return 200, {"enabled": False}
last = tor.last_newnym
return 200, {
"enabled": True,
"connected": tor.connected,
"last_newnym": round(time.monotonic() - last, 1) if last else None,
"newnym_interval": tor.newnym_interval,
}
async def _handle_tor_newnym(ctx: dict) -> tuple[int, dict]:
"""POST /tor/newnym -- trigger NEWNYM signal."""
tor = ctx.get("tor")
if not tor:
return 400, {"error": "tor control not configured"}
ok = await tor.newnym()
if ok:
return 200, {"ok": True}
return 200, {"ok": False, "reason": "rate-limited or not connected"}
# -- routing -----------------------------------------------------------------
_GET_ROUTES: dict[str, str] = {
"/status": "status",
"/metrics": "metrics",
"/pool": "pool",
"/pool/alive": "pool_alive",
"/config": "config",
"/tor": "tor",
}
_POST_ROUTES: dict[str, str] = {
"/reload": "reload",
"/pool/test": "pool_test",
"/pool/refresh": "pool_refresh",
"/tor/newnym": "tor_newnym",
}
async def _route(method: str, path: str, ctx: dict) -> tuple[int, dict]:
"""Dispatch request to the appropriate handler."""
if method == "GET" and path in _GET_ROUTES:
name = _GET_ROUTES[path]
if name == "status":
return _handle_status(ctx)
if name == "metrics":
return _handle_metrics(ctx)
if name == "pool":
return _handle_pool(ctx)
if name == "pool_alive":
return _handle_pool(ctx, alive_only=True)
if name == "config":
return _handle_config(ctx)
if name == "tor":
return _handle_tor(ctx)
if method == "POST" and path in _POST_ROUTES:
name = _POST_ROUTES[path]
if name == "reload":
return await _handle_reload(ctx)
if name == "pool_test":
return await _handle_pool_test(ctx)
if name == "pool_refresh":
return await _handle_pool_refresh(ctx)
if name == "tor_newnym":
return await _handle_tor_newnym(ctx)
# wrong method on a known path
if path in _GET_ROUTES or path in _POST_ROUTES:
expected = "GET" if path in _GET_ROUTES else "POST"
return 405, {"error": f"use {expected} for {path}"}
return 404, {"error": "not found"}
# -- server ------------------------------------------------------------------
async def _handle_connection(
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
ctx: dict,
) -> None:
"""Handle a single HTTP request on the control API."""
try:
data = await asyncio.wait_for(reader.read(8192), timeout=5.0)
if not data:
return
method, path = _parse_request(data)
if not method:
_json_response(writer, 400, {"error": "bad request"})
await writer.drain()
return
status, body = await _route(method, path, ctx)
_json_response(writer, status, body)
await writer.drain()
except (TimeoutError, ConnectionError, OSError):
pass
except Exception:
logger.debug("api: unexpected error handling request", exc_info=True)
finally:
try:
writer.close()
await writer.wait_closed()
except OSError:
pass
async def start_api(
host: str,
port: int,
ctx: dict,
) -> asyncio.Server:
"""Start the control API HTTP server.
Args:
host: Bind address.
port: Bind port.
ctx: Shared context dict with config, metrics, pool, reload_fn.
Returns:
The running asyncio.Server (caller manages lifecycle).
"""
async def handler(r: asyncio.StreamReader, w: asyncio.StreamWriter) -> None:
await _handle_connection(r, w, ctx)
srv = await asyncio.start_server(handler, host, port)
addrs = ", ".join(str(s.getsockname()) for s in srv.sockets)
logger.info("api: listening on %s", addrs)
return srv

View File

@@ -54,10 +54,18 @@ def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
"-S", "--proxy-source", metavar="URL",
help="proxy source API URL",
)
p.add_argument(
"--api", metavar="[HOST:]PORT",
help="enable control API on address (e.g. 127.0.0.1:1081)",
)
p.add_argument(
"--cprofile", metavar="FILE", nargs="?", const="s5p.prof",
help="enable cProfile, dump stats to FILE (default: s5p.prof)",
)
p.add_argument(
"--tracemalloc", metavar="N", nargs="?", const=10, type=int,
help="enable tracemalloc, show top N allocators on exit (default: 10)",
)
return p.parse_args(argv)
@@ -89,6 +97,14 @@ def main(argv: list[str] | None = None) -> int:
if args.max_connections is not None:
config.max_connections = args.max_connections
if args.api:
if ":" in args.api:
host, port_str = args.api.rsplit(":", 1)
config.api_host = host
config.api_port = int(port_str)
else:
config.api_port = int(args.api)
if args.proxy_source:
config.proxy_pool = ProxyPoolConfig(
sources=[PoolSourceConfig(url=args.proxy_source)],
@@ -100,6 +116,11 @@ def main(argv: list[str] | None = None) -> int:
config.log_level = "error"
_setup_logging(config.log_level)
logger = logging.getLogger("s5p")
if args.tracemalloc:
import tracemalloc
tracemalloc.start()
try:
if args.cprofile:
@@ -111,13 +132,21 @@ def main(argv: list[str] | None = None) -> int:
finally:
prof.disable()
prof.dump_stats(args.cprofile)
logging.getLogger("s5p").info("profile saved to %s", args.cprofile)
logger.info("profile saved to %s", args.cprofile)
else:
asyncio.run(serve(config))
except KeyboardInterrupt:
return 0
except Exception as e:
logging.getLogger("s5p").error("%s", e)
logger.error("%s", e)
return 1
finally:
if args.tracemalloc:
import tracemalloc
snapshot = tracemalloc.take_snapshot()
stats = snapshot.statistics("lineno")
logger.info("tracemalloc: top %d allocations", args.tracemalloc)
for stat in stats[:args.tracemalloc]:
logger.info(" %s", stat)
return 0

View File

@@ -53,6 +53,17 @@ class ProxyPoolConfig:
report_url: str = ""
@dataclass
class TorConfig:
"""Tor control port configuration."""
control_host: str = "127.0.0.1"
control_port: int = 9051
password: str = ""
cookie_file: str = ""
newnym_interval: float = 0.0 # 0 = manual only
@dataclass
class Config:
"""Server configuration."""
@@ -66,7 +77,10 @@ class Config:
max_connections: int = 256
pool_size: int = 0
pool_max_idle: float = 30.0
api_host: str = ""
api_port: int = 0
proxy_pool: ProxyPoolConfig | None = None
tor: TorConfig | None = None
config_file: str = ""
@@ -148,6 +162,15 @@ def load_config(path: str | Path) -> Config:
if "pool_max_idle" in raw:
config.pool_max_idle = float(raw["pool_max_idle"])
if "api_listen" in raw:
api = raw["api_listen"]
if isinstance(api, str) and ":" in api:
host, port_str = api.rsplit(":", 1)
config.api_host = host
config.api_port = int(port_str)
elif isinstance(api, (str, int)):
config.api_port = int(api)
if "chain" in raw:
for entry in raw["chain"]:
if isinstance(entry, str):
@@ -207,4 +230,14 @@ def load_config(path: str | Path) -> Config:
refresh=refresh,
)
if "tor" in raw:
tor_raw = raw["tor"]
config.tor = TorConfig(
control_host=tor_raw.get("control_host", "127.0.0.1"),
control_port=int(tor_raw.get("control_port", 9051)),
password=tor_raw.get("password", ""),
cookie_file=tor_raw.get("cookie_file", ""),
newnym_interval=float(tor_raw.get("newnym_interval", 0)),
)
return config

View File

@@ -34,6 +34,19 @@ class Metrics:
f"up={h}h{m:02d}m{s:02d}s"
)
def to_dict(self) -> dict:
"""Return all counters as a dict (for JSON serialization)."""
return {
"connections": self.connections,
"success": self.success,
"failed": self.failed,
"retries": self.retries,
"active": self.active,
"bytes_in": self.bytes_in,
"bytes_out": self.bytes_out,
"uptime": round(time.monotonic() - self.started, 1),
}
def _human_bytes(n: int) -> str:
"""Format byte count in human-readable form."""

View File

@@ -8,11 +8,13 @@ import signal
import struct
import time
from .api import start_api
from .config import Config, load_config
from .connpool import FirstHopPool
from .metrics import Metrics
from .pool import ProxyPool
from .proto import ProtoError, Socks5Reply, build_chain, read_socks5_address
from .tor import TorController
logger = logging.getLogger("s5p")
@@ -235,6 +237,22 @@ async def serve(config: Config) -> None:
)
await hop_pool.start()
tor: TorController | None = None
if config.tor:
tc = config.tor
tor = TorController(
host=tc.control_host,
port=tc.control_port,
password=tc.password,
cookie_file=tc.cookie_file,
newnym_interval=tc.newnym_interval,
)
try:
await tor.start()
except (ConnectionError, OSError, TimeoutError) as e:
logger.warning("tor: control port unavailable: %s", e)
tor = None
sem = asyncio.Semaphore(config.max_connections)
async def on_client(r: asyncio.StreamReader, w: asyncio.StreamWriter) -> None:
@@ -259,6 +277,23 @@ async def serve(config: Config) -> None:
)
logger.info(" retries: %d", config.retries)
if tor:
extra = f", newnym every {tor.newnym_interval:.0f}s" if tor.newnym_interval else ""
logger.info(
" tor: control %s:%d%s", config.tor.control_host, config.tor.control_port, extra,
)
# -- control API ---------------------------------------------------------
api_srv: asyncio.Server | None = None
if config.api_port:
api_ctx: dict = {
"config": config,
"metrics": metrics,
"pool": proxy_pool,
"hop_pool": hop_pool,
"tor": tor,
}
# SIGHUP: hot-reload config (timeout, retries, log_level, pool settings)
async def _reload() -> None:
if not config.config_file:
@@ -289,6 +324,10 @@ async def serve(config: Config) -> None:
loop.add_signal_handler(signal.SIGHUP, _on_sighup)
if config.api_port:
api_ctx["reload_fn"] = _reload
api_srv = await start_api(config.api_host, config.api_port, api_ctx)
metrics_stop = asyncio.Event()
pool_ref = proxy_pool
metrics_task = asyncio.create_task(_metrics_logger(metrics, metrics_stop, pool_ref))
@@ -296,6 +335,11 @@ async def serve(config: Config) -> None:
async with srv:
sig = await stop
logger.info("received %s, shutting down", signal.Signals(sig).name)
if api_srv:
api_srv.close()
await api_srv.wait_closed()
if tor:
await tor.stop()
if hop_pool:
await hop_pool.stop()
if proxy_pool:

204
src/s5p/tor.py Normal file
View File

@@ -0,0 +1,204 @@
"""Tor control port client with NEWNYM support."""
from __future__ import annotations
import asyncio
import logging
import time
logger = logging.getLogger("s5p")
_NEWNYM_MIN_INTERVAL = 10.0 # Tor enforces 10s between NEWNYMs
class TorController:
"""Async client for the Tor control protocol.
Supports password, cookie, and bare authentication. Provides NEWNYM
signaling (new circuit) on demand or on a periodic timer.
"""
def __init__(
self,
host: str = "127.0.0.1",
port: int = 9051,
password: str = "",
cookie_file: str = "",
newnym_interval: float = 0.0,
) -> None:
self._host = host
self._port = port
self._password = password
self._cookie_file = cookie_file
self._newnym_interval = newnym_interval
self._reader: asyncio.StreamReader | None = None
self._writer: asyncio.StreamWriter | None = None
self._last_newnym: float = 0.0
self._stop = asyncio.Event()
self._tasks: list[asyncio.Task] = []
self._lock = asyncio.Lock()
# -- properties ----------------------------------------------------------
@property
def connected(self) -> bool:
"""True if the control connection is open."""
return self._writer is not None and not self._writer.is_closing()
@property
def last_newnym(self) -> float:
"""Monotonic timestamp of the last successful NEWNYM (0 if never)."""
return self._last_newnym
@property
def newnym_interval(self) -> float:
"""Periodic NEWNYM interval in seconds (0 = manual only)."""
return self._newnym_interval
# -- lifecycle -----------------------------------------------------------
async def start(self) -> None:
"""Connect, authenticate, and start optional newnym loop."""
await self._connect()
if self._newnym_interval > 0:
self._tasks.append(asyncio.create_task(self._newnym_loop()))
async def stop(self) -> None:
"""Cancel tasks and close the connection."""
self._stop.set()
for task in self._tasks:
task.cancel()
for task in self._tasks:
try:
await task
except asyncio.CancelledError:
pass
self._tasks.clear()
self._close()
# -- public commands -----------------------------------------------------
async def newnym(self) -> bool:
"""Send SIGNAL NEWNYM with client-side 10s rate limit.
Reconnects automatically if the connection was lost.
Returns True on success, False on rate-limit or failure.
"""
now = time.monotonic()
if self._last_newnym and (now - self._last_newnym) < _NEWNYM_MIN_INTERVAL:
return False
async with self._lock:
try:
if not self.connected:
await self._connect()
code, _ = await self._command("SIGNAL NEWNYM")
if code == 250:
self._last_newnym = time.monotonic()
logger.debug("tor: NEWNYM sent")
return True
logger.warning("tor: NEWNYM failed: %d", code)
return False
except (ConnectionError, OSError, TimeoutError) as e:
logger.warning("tor: NEWNYM error: %s", e)
self._close()
return False
async def get_info(self, keyword: str) -> str | None:
"""Send GETINFO and return the response value, or None on error."""
async with self._lock:
try:
if not self.connected:
await self._connect()
code, lines = await self._command(f"GETINFO {keyword}")
if code == 250 and lines:
# response format: "keyword=value"
for line in lines:
if "=" in line:
return line.split("=", 1)[1]
return None
except (ConnectionError, OSError, TimeoutError):
self._close()
return None
# -- internals -----------------------------------------------------------
async def _connect(self) -> None:
"""Open TCP connection and authenticate."""
self._close()
self._reader, self._writer = await asyncio.wait_for(
asyncio.open_connection(self._host, self._port),
timeout=10.0,
)
await self._authenticate()
logger.info("tor: connected to %s:%d", self._host, self._port)
async def _authenticate(self) -> None:
"""Send AUTHENTICATE with configured credentials."""
if self._cookie_file:
try:
with open(self._cookie_file, "rb") as f:
cookie = f.read().hex()
cmd = f"AUTHENTICATE {cookie}"
except OSError as e:
self._close()
raise ConnectionError(f"cannot read cookie file: {e}") from e
elif self._password:
cmd = f'AUTHENTICATE "{self._password}"'
else:
cmd = "AUTHENTICATE"
code, _ = await self._command(cmd)
if code != 250:
self._close()
raise ConnectionError(f"tor auth failed: {code}")
async def _command(self, cmd: str) -> tuple[int, list[str]]:
"""Send a command and read the multi-line response.
Returns (status_code, [response_lines]).
"""
if not self._writer or not self._reader:
raise ConnectionError("not connected")
self._writer.write(f"{cmd}\r\n".encode())
await self._writer.drain()
lines: list[str] = []
while True:
raw = await asyncio.wait_for(self._reader.readline(), timeout=10.0)
if not raw:
raise ConnectionError("connection closed")
line = raw.decode("ascii", errors="replace").rstrip("\r\n")
if len(line) < 4:
raise ConnectionError(f"malformed response: {line!r}")
code = int(line[:3])
sep = line[3]
text = line[4:]
lines.append(text)
if sep == " ":
return code, lines
# sep == '-' means continuation
def _close(self) -> None:
"""Close TCP connection silently."""
if self._writer:
try:
self._writer.close()
except OSError:
pass
self._writer = None
self._reader = None
async def _newnym_loop(self) -> None:
"""Periodic NEWNYM on configured interval."""
while not self._stop.is_set():
try:
await asyncio.wait_for(
self._stop.wait(),
timeout=self._newnym_interval,
)
except TimeoutError:
pass
if not self._stop.is_set():
await self.newnym()

369
tests/test_api.py Normal file
View File

@@ -0,0 +1,369 @@
"""Tests for the control API module."""
import asyncio
import json
from unittest.mock import AsyncMock, MagicMock
from s5p.api import (
_handle_config,
_handle_metrics,
_handle_pool,
_handle_status,
_handle_tor,
_handle_tor_newnym,
_json_response,
_parse_request,
_route,
)
from s5p.config import ChainHop, Config, PoolSourceConfig, ProxyPoolConfig
from s5p.metrics import Metrics
# -- request parsing ---------------------------------------------------------
class TestParseRequest:
"""Test HTTP request line parsing."""
def test_get(self):
assert _parse_request(b"GET /status HTTP/1.1\r\n") == ("GET", "/status")
def test_post(self):
assert _parse_request(b"POST /reload HTTP/1.1\r\n") == ("POST", "/reload")
def test_strips_query_string(self):
assert _parse_request(b"GET /pool?foo=bar HTTP/1.1\r\n") == ("GET", "/pool")
def test_method_uppercased(self):
assert _parse_request(b"get /metrics HTTP/1.1\r\n") == ("GET", "/metrics")
def test_empty(self):
assert _parse_request(b"") == ("", "")
def test_garbage(self):
assert _parse_request(b"\xff\xfe") == ("", "")
def test_incomplete(self):
assert _parse_request(b"GET\r\n") == ("", "")
# -- JSON response -----------------------------------------------------------
class TestJsonResponse:
"""Test HTTP JSON response builder."""
def test_format(self):
writer = MagicMock()
written = bytearray()
writer.write = lambda data: written.extend(data)
_json_response(writer, 200, {"ok": True})
text = written.decode()
assert "HTTP/1.1 200 OK\r\n" in text
assert "Content-Type: application/json\r\n" in text
assert "Connection: close\r\n" in text
# body after double newline
body = text.split("\r\n\r\n", 1)[1]
assert json.loads(body) == {"ok": True}
def test_404(self):
writer = MagicMock()
written = bytearray()
writer.write = lambda data: written.extend(data)
_json_response(writer, 404, {"error": "not found"})
text = written.decode()
assert "HTTP/1.1 404 Not Found\r\n" in text
# -- helpers -----------------------------------------------------------------
def _make_ctx(
config: Config | None = None,
pool: MagicMock | None = None,
tor: MagicMock | None = None,
) -> dict:
"""Build a mock context dict."""
return {
"config": config or Config(),
"metrics": Metrics(),
"pool": pool,
"hop_pool": None,
"tor": tor,
}
# -- GET handlers ------------------------------------------------------------
class TestHandleStatus:
"""Test GET /status handler."""
def test_basic(self):
ctx = _make_ctx()
ctx["metrics"].connections = 10
ctx["metrics"].success = 8
status, body = _handle_status(ctx)
assert status == 200
assert body["connections"] == 10
assert body["success"] == 8
assert "uptime" in body
def test_with_pool(self):
pool = MagicMock()
pool.alive_count = 5
pool.count = 10
ctx = _make_ctx(pool=pool)
_, body = _handle_status(ctx)
assert body["pool"] == {"alive": 5, "total": 10}
def test_with_chain(self):
config = Config(chain=[ChainHop("socks5", "127.0.0.1", 9050)])
ctx = _make_ctx(config=config)
_, body = _handle_status(ctx)
assert body["chain"] == ["socks5://127.0.0.1:9050"]
class TestHandleMetrics:
"""Test GET /metrics handler."""
def test_returns_dict(self):
ctx = _make_ctx()
ctx["metrics"].connections = 42
ctx["metrics"].bytes_in = 1024
status, body = _handle_metrics(ctx)
assert status == 200
assert body["connections"] == 42
assert body["bytes_in"] == 1024
assert "uptime" in body
class TestHandlePool:
"""Test GET /pool handler."""
def test_no_pool(self):
ctx = _make_ctx()
status, body = _handle_pool(ctx)
assert status == 200
assert body == {"alive": 0, "total": 0, "proxies": {}}
def test_with_entries(self):
pool = MagicMock()
pool.alive_count = 1
pool.count = 2
entry_alive = MagicMock(
alive=True, fails=0, tests=5,
last_ok=100.0, last_test=100.0, last_seen=100.0,
)
entry_dead = MagicMock(
alive=False, fails=3, tests=5,
last_ok=0.0, last_test=100.0, last_seen=100.0,
)
pool._proxies = {
"socks5://1.2.3.4:1080": entry_alive,
"socks5://5.6.7.8:1080": entry_dead,
}
ctx = _make_ctx(pool=pool)
_, body = _handle_pool(ctx)
assert len(body["proxies"]) == 2
assert body["proxies"]["socks5://1.2.3.4:1080"]["alive"] is True
def test_alive_only(self):
pool = MagicMock()
pool.alive_count = 1
pool.count = 2
entry_alive = MagicMock(
alive=True, fails=0, tests=5,
last_ok=100.0, last_test=100.0, last_seen=100.0,
)
entry_dead = MagicMock(
alive=False, fails=3, tests=5,
last_ok=0.0, last_test=100.0, last_seen=100.0,
)
pool._proxies = {
"socks5://1.2.3.4:1080": entry_alive,
"socks5://5.6.7.8:1080": entry_dead,
}
ctx = _make_ctx(pool=pool)
_, body = _handle_pool(ctx, alive_only=True)
assert len(body["proxies"]) == 1
assert "socks5://1.2.3.4:1080" in body["proxies"]
class TestHandleConfig:
"""Test GET /config handler."""
def test_basic(self):
config = Config(timeout=15.0, retries=5, log_level="debug")
ctx = _make_ctx(config=config)
status, body = _handle_config(ctx)
assert status == 200
assert body["timeout"] == 15.0
assert body["retries"] == 5
assert body["log_level"] == "debug"
def test_with_proxy_pool(self):
pp = ProxyPoolConfig(
sources=[PoolSourceConfig(url="http://api:8081/proxies")],
refresh=600.0,
test_interval=60.0,
max_fails=5,
)
config = Config(proxy_pool=pp)
ctx = _make_ctx(config=config)
_, body = _handle_config(ctx)
assert body["proxy_pool"]["refresh"] == 600.0
assert body["proxy_pool"]["sources"][0]["url"] == "http://api:8081/proxies"
# -- routing -----------------------------------------------------------------
class TestRouting:
"""Test request routing and error responses."""
def test_get_status(self):
ctx = _make_ctx()
status, body = asyncio.run(_route("GET", "/status", ctx))
assert status == 200
def test_get_metrics(self):
ctx = _make_ctx()
status, _ = asyncio.run(_route("GET", "/metrics", ctx))
assert status == 200
def test_get_pool(self):
ctx = _make_ctx()
status, _ = asyncio.run(_route("GET", "/pool", ctx))
assert status == 200
def test_get_pool_alive(self):
ctx = _make_ctx()
status, _ = asyncio.run(_route("GET", "/pool/alive", ctx))
assert status == 200
def test_get_config(self):
ctx = _make_ctx()
status, _ = asyncio.run(_route("GET", "/config", ctx))
assert status == 200
def test_post_reload(self):
ctx = _make_ctx()
ctx["reload_fn"] = AsyncMock()
status, body = asyncio.run(_route("POST", "/reload", ctx))
assert status == 200
assert body == {"ok": True}
def test_post_pool_test(self):
pool = MagicMock()
pool._run_health_tests = AsyncMock()
ctx = _make_ctx(pool=pool)
status, body = asyncio.run(_route("POST", "/pool/test", ctx))
assert status == 200
assert body == {"ok": True}
def test_post_pool_refresh(self):
pool = MagicMock()
pool._fetch_all_sources = AsyncMock()
ctx = _make_ctx(pool=pool)
status, body = asyncio.run(_route("POST", "/pool/refresh", ctx))
assert status == 200
assert body == {"ok": True}
def test_404(self):
ctx = _make_ctx()
status, body = asyncio.run(_route("GET", "/nonexistent", ctx))
assert status == 404
assert "error" in body
def test_405_wrong_method(self):
ctx = _make_ctx()
status, body = asyncio.run(_route("POST", "/status", ctx))
assert status == 405
assert "GET" in body["error"]
def test_405_get_on_post_route(self):
ctx = _make_ctx()
status, body = asyncio.run(_route("GET", "/reload", ctx))
assert status == 405
assert "POST" in body["error"]
def test_get_tor(self):
ctx = _make_ctx()
status, body = asyncio.run(_route("GET", "/tor", ctx))
assert status == 200
assert body == {"enabled": False}
def test_post_tor_newnym(self):
tor = MagicMock()
tor.newnym = AsyncMock(return_value=True)
ctx = _make_ctx(tor=tor)
status, body = asyncio.run(_route("POST", "/tor/newnym", ctx))
assert status == 200
assert body == {"ok": True}
# -- Tor endpoints ----------------------------------------------------------
class TestHandleTor:
"""Test GET /tor handler."""
def test_disabled(self):
ctx = _make_ctx()
status, body = _handle_tor(ctx)
assert status == 200
assert body == {"enabled": False}
def test_connected(self):
tor = MagicMock()
tor.connected = True
tor.last_newnym = 0.0
tor.newnym_interval = 60.0
ctx = _make_ctx(tor=tor)
status, body = _handle_tor(ctx)
assert status == 200
assert body["enabled"] is True
assert body["connected"] is True
assert body["last_newnym"] is None
assert body["newnym_interval"] == 60.0
def test_with_last_newnym(self):
import time
tor = MagicMock()
tor.connected = True
tor.last_newnym = time.monotonic() - 42.0
tor.newnym_interval = 0.0
ctx = _make_ctx(tor=tor)
status, body = _handle_tor(ctx)
assert status == 200
assert body["last_newnym"] is not None
assert body["last_newnym"] >= 42.0
class TestHandleTorNewnym:
"""Test POST /tor/newnym handler."""
def test_success(self):
tor = MagicMock()
tor.newnym = AsyncMock(return_value=True)
ctx = _make_ctx(tor=tor)
status, body = asyncio.run(_handle_tor_newnym(ctx))
assert status == 200
assert body == {"ok": True}
def test_rate_limited(self):
tor = MagicMock()
tor.newnym = AsyncMock(return_value=False)
ctx = _make_ctx(tor=tor)
status, body = asyncio.run(_handle_tor_newnym(ctx))
assert status == 200
assert body["ok"] is False
assert "reason" in body
def test_not_configured(self):
ctx = _make_ctx()
status, body = asyncio.run(_handle_tor_newnym(ctx))
assert status == 400
assert "error" in body

View File

@@ -141,3 +141,37 @@ class TestConfig:
c = load_config(cfg_file)
assert c.pool_size == 16
assert c.pool_max_idle == 45.0
def test_tor_config_from_yaml(self, tmp_path):
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text(
"tor:\n"
" control_host: 10.0.0.1\n"
" control_port: 9151\n"
" password: secret\n"
" cookie_file: /var/run/tor/cookie\n"
" newnym_interval: 60\n"
)
c = load_config(cfg_file)
assert c.tor is not None
assert c.tor.control_host == "10.0.0.1"
assert c.tor.control_port == 9151
assert c.tor.password == "secret"
assert c.tor.cookie_file == "/var/run/tor/cookie"
assert c.tor.newnym_interval == 60.0
def test_tor_config_defaults(self, tmp_path):
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text("tor:\n password: test\n")
c = load_config(cfg_file)
assert c.tor is not None
assert c.tor.control_host == "127.0.0.1"
assert c.tor.control_port == 9051
assert c.tor.cookie_file == ""
assert c.tor.newnym_interval == 0.0
def test_no_tor_config(self, tmp_path):
cfg_file = tmp_path / "test.yaml"
cfg_file.write_text("listen: 1080\n")
c = load_config(cfg_file)
assert c.tor is None

258
tests/test_tor.py Normal file
View File

@@ -0,0 +1,258 @@
"""Tests for the Tor control port client."""
import asyncio
from unittest.mock import AsyncMock, MagicMock, mock_open, patch
import pytest
from s5p.tor import TorController
# -- helpers -----------------------------------------------------------------
def _mock_reader(responses: list[bytes]) -> AsyncMock:
"""Create a mock StreamReader that yields canned lines."""
reader = AsyncMock(spec=asyncio.StreamReader)
reader.readline = AsyncMock(side_effect=responses)
return reader
def _mock_writer() -> MagicMock:
"""Create a mock StreamWriter."""
writer = MagicMock(spec=asyncio.StreamWriter)
writer.write = MagicMock()
writer.drain = AsyncMock()
writer.close = MagicMock()
writer.wait_closed = AsyncMock()
writer.is_closing = MagicMock(return_value=False)
return writer
# -- authentication ----------------------------------------------------------
class TestAuthentication:
"""Test Tor control port authentication modes."""
def test_password_auth(self):
reader = _mock_reader([b"250 OK\r\n"])
writer = _mock_writer()
async def run():
with patch("asyncio.open_connection", return_value=(reader, writer)):
tc = TorController(password="secret")
await tc._connect()
# verify AUTHENTICATE command was sent with password
calls = writer.write.call_args_list
assert any(b'AUTHENTICATE "secret"' in c[0][0] for c in calls)
tc._close()
asyncio.run(run())
def test_cookie_auth(self):
reader = _mock_reader([b"250 OK\r\n"])
writer = _mock_writer()
cookie_data = b"\xde\xad\xbe\xef"
async def run():
with patch("asyncio.open_connection", return_value=(reader, writer)):
with patch("builtins.open", mock_open(read_data=cookie_data)):
tc = TorController(cookie_file="/var/run/tor/control.authcookie")
await tc._connect()
calls = writer.write.call_args_list
assert any(b"AUTHENTICATE deadbeef" in c[0][0] for c in calls)
tc._close()
asyncio.run(run())
def test_bare_auth(self):
reader = _mock_reader([b"250 OK\r\n"])
writer = _mock_writer()
async def run():
with patch("asyncio.open_connection", return_value=(reader, writer)):
tc = TorController()
await tc._connect()
calls = writer.write.call_args_list
assert any(c[0][0] == b"AUTHENTICATE\r\n" for c in calls)
tc._close()
asyncio.run(run())
def test_auth_failure(self):
reader = _mock_reader([b"515 Bad authentication\r\n"])
writer = _mock_writer()
async def run():
with patch("asyncio.open_connection", return_value=(reader, writer)):
tc = TorController(password="wrong")
with pytest.raises(ConnectionError, match="auth failed"):
await tc._connect()
asyncio.run(run())
def test_connect_failure(self):
async def run():
with patch("asyncio.open_connection", side_effect=OSError("refused")):
tc = TorController()
with pytest.raises(OSError, match="refused"):
await tc._connect()
asyncio.run(run())
# -- NEWNYM ------------------------------------------------------------------
class TestNewnym:
"""Test NEWNYM signaling."""
def test_newnym_success(self):
# auth response + newnym response
reader = _mock_reader([b"250 OK\r\n", b"250 OK\r\n"])
writer = _mock_writer()
async def run():
with patch("asyncio.open_connection", return_value=(reader, writer)):
tc = TorController()
await tc._connect()
ok = await tc.newnym()
assert ok is True
assert tc.last_newnym > 0
tc._close()
asyncio.run(run())
def test_newnym_rate_limited(self):
# auth + first newnym
reader = _mock_reader([b"250 OK\r\n", b"250 OK\r\n"])
writer = _mock_writer()
async def run():
with patch("asyncio.open_connection", return_value=(reader, writer)):
tc = TorController()
await tc._connect()
ok = await tc.newnym()
assert ok is True
# immediate second call should be rate-limited
ok2 = await tc.newnym()
assert ok2 is False
tc._close()
asyncio.run(run())
def test_newnym_reconnects_on_disconnect(self):
# first connect auth, then reconnect auth + newnym
reader1 = _mock_reader([b"250 OK\r\n"])
writer1 = _mock_writer()
reader2 = _mock_reader([b"250 OK\r\n", b"250 OK\r\n"])
writer2 = _mock_writer()
call_count = 0
async def fake_connect(*args, **kwargs):
nonlocal call_count
call_count += 1
if call_count == 1:
return reader1, writer1
return reader2, writer2
async def run():
with patch("asyncio.open_connection", side_effect=fake_connect):
tc = TorController()
await tc._connect()
# simulate disconnect
tc._close()
assert not tc.connected
# newnym should reconnect
ok = await tc.newnym()
assert ok is True
tc._close()
asyncio.run(run())
# -- GETINFO -----------------------------------------------------------------
class TestGetInfo:
"""Test GETINFO command."""
def test_getinfo_version(self):
# auth + getinfo multi-line response
reader = _mock_reader([
b"250 OK\r\n",
b"250-version=0.4.8.12\r\n",
b"250 OK\r\n",
])
writer = _mock_writer()
async def run():
with patch("asyncio.open_connection", return_value=(reader, writer)):
tc = TorController()
await tc._connect()
version = await tc.get_info("version")
assert version == "0.4.8.12"
tc._close()
asyncio.run(run())
def test_getinfo_not_connected(self):
# auth for reconnect + getinfo
reader = _mock_reader([
b"250 OK\r\n",
b"250-traffic/read=12345\r\n",
b"250 OK\r\n",
])
writer = _mock_writer()
async def run():
with patch("asyncio.open_connection", return_value=(reader, writer)):
tc = TorController()
# not connected, should auto-connect
result = await tc.get_info("traffic/read")
assert result == "12345"
tc._close()
asyncio.run(run())
# -- lifecycle ---------------------------------------------------------------
class TestLifecycle:
"""Test start/stop lifecycle."""
def test_start_stop(self):
reader = _mock_reader([b"250 OK\r\n"])
writer = _mock_writer()
async def run():
with patch("asyncio.open_connection", return_value=(reader, writer)):
tc = TorController()
await tc.start()
assert tc.connected
await tc.stop()
assert not tc.connected
asyncio.run(run())
def test_start_with_newnym_loop(self):
reader = _mock_reader([b"250 OK\r\n"])
writer = _mock_writer()
async def run():
with patch("asyncio.open_connection", return_value=(reader, writer)):
tc = TorController(newnym_interval=60.0)
await tc.start()
assert len(tc._tasks) == 1
await tc.stop()
assert len(tc._tasks) == 0
asyncio.run(run())
def test_properties(self):
tc = TorController(newnym_interval=30.0)
assert not tc.connected
assert tc.last_newnym == 0.0
assert tc.newnym_interval == 30.0