diff --git a/PROJECT.md b/PROJECT.md index 0ef26f8..a94434e 100644 --- a/PROJECT.md +++ b/PROJECT.md @@ -25,6 +25,7 @@ Client -------> s5p -------> Hop 1 -------> Hop 2 -------> Target - **pool.py** -- managed proxy pool (multi-source, health-tested, persistent) - **http.py** -- minimal async HTTP/1.1 client (GET/POST JSON, no external deps) - **connpool.py** -- pre-warmed TCP connection pool to first chain hop +- **api.py** -- built-in HTTP control API (runtime metrics, pool state, config reload) - **cli.py** -- argparse CLI, logging setup, cProfile support - **metrics.py** -- connection counters and human-readable summary (lock-free, asyncio-only) @@ -66,3 +67,4 @@ All other functionality uses Python stdlib (`asyncio`, `socket`, `struct`). - **Connection semaphore** -- cap concurrent connections to prevent fd exhaustion - **Async HTTP** -- native asyncio HTTP client replaces blocking urllib, parallel fetches - **First-hop pool** -- pre-warmed TCP connections to chain[0], stale-evicted, auto-refilled +- **Control API** -- built-in asyncio HTTP server, no Flask/external deps, disabled by default diff --git a/README.md b/README.md index fe6337a..daddcfc 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ through configurable chains of SOCKS4, SOCKS5, and HTTP CONNECT proxies. - Supported hop protocols: SOCKS5, SOCKS4/4a, HTTP CONNECT - Per-hop authentication (username/password) - DNS leak prevention (domain names forwarded to proxies, never resolved locally) -- Tor integration (Tor is just another SOCKS5 hop) +- Tor integration (SOCKS5 hop + control port NEWNYM for circuit rotation) - Managed proxy pool: multiple sources (API + file), health-tested, weighted selection - Per-proxy failure backoff (60s cooldown), stale proxy expiry, chain pre-flight - Fast warm start (seconds on restart vs minutes on cold start) @@ -21,6 +21,7 @@ through configurable chains of SOCKS4, SOCKS5, and HTTP CONNECT proxies. - Concurrent connection limit with backpressure (`max_connections`) - Async HTTP client for proxy source fetching (parallel, no threads) - First-hop TCP connection pool (pre-warmed, stale-evicted) +- Built-in control API (runtime metrics, pool state, config reload via HTTP) - Container-ready (Alpine-based, podman/docker) - Graceful shutdown (SIGTERM/SIGINT) - Pure Python, asyncio-based, minimal dependencies @@ -70,10 +71,16 @@ timeout: 10 retries: 3 max_connections: 256 # concurrent connection limit pool_size: 8 # pre-warmed connections to first hop +api_listen: 127.0.0.1:1081 # control API (disabled by default) chain: - socks5://127.0.0.1:9050 # Tor +tor: + control_port: 9051 # Tor control port (NEWNYM) + password: "" # or cookie_file for auth + newnym_interval: 0 # periodic circuit rotation (0 = manual) + proxy_pool: sources: - url: http://10.200.1.250:8081/proxies @@ -89,7 +96,7 @@ proxy_pool: ## CLI Reference ``` -s5p [-c FILE] [-l [HOST:]PORT] [-C URL[,URL,...]] [-S URL] [-t SEC] [-r N] [-m N] [-v|-q] +s5p [-c FILE] [-l [HOST:]PORT] [-C URL[,URL,...]] [-S URL] [-t SEC] [-r N] [-m N] [--api [HOST:]PORT] [-v|-q] Options: -c, --config FILE YAML config file @@ -99,9 +106,11 @@ Options: -t, --timeout SEC Per-hop timeout (default: 10) -r, --retries N Max attempts per connection (default: 3, proxy_source only) -m, --max-connections N Max concurrent connections (default: 256) + --api [HOST:]PORT Enable control API (e.g. 127.0.0.1:1081) -v, --verbose Debug logging -q, --quiet Errors only --cprofile [FILE] Enable cProfile, dump to FILE (default: s5p.prof) + --tracemalloc [N] Enable tracemalloc, show top N allocators on exit (default: 10) -V, --version Show version ``` diff --git a/ROADMAP.md b/ROADMAP.md index a66082e..6ec6df6 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -22,8 +22,9 @@ ## v0.2.0 +- [x] Built-in control API (runtime metrics, pool state, config reload) - [ ] SOCKS5 server authentication (username/password) -- [ ] Tor control port integration (circuit renewal via NEWNYM) +- [x] Tor control port integration (circuit renewal via NEWNYM) - [ ] Metrics (connections/sec, bytes relayed, hop latency) ## v0.3.0 diff --git a/TASKS.md b/TASKS.md index 1154a94..a45ec1f 100644 --- a/TASKS.md +++ b/TASKS.md @@ -42,8 +42,10 @@ - [x] Instant warm start (trust cached state, defer all health tests) - [x] Register signal handlers before startup (fix SIGKILL on stop) - [x] Use k8s-file logging driver with rotation +- [x] Built-in control API (`api.py`, `--api`, `api_listen`) + +- [x] Tor control port integration (NEWNYM signaling, periodic rotation) ## Next - [ ] Integration tests with mock proxy server - [ ] SOCKS5 server-side authentication -- [ ] Tor control port integration diff --git a/config/example.yaml b/config/example.yaml index 88cccf3..b440c15 100644 --- a/config/example.yaml +++ b/config/example.yaml @@ -8,6 +8,7 @@ log_level: info # max_connections: 256 # max concurrent client connections (backpressure) # pool_size: 0 # pre-warmed TCP connections to first hop (0 = disabled) # pool_max_idle: 30 # max idle time (seconds) for pooled connections +# api_listen: 127.0.0.1:1081 # control API (disabled by default) # Proxy chain -- connections tunnel through each hop in order. # Supported protocols: socks5://, socks4://, http:// @@ -37,6 +38,15 @@ chain: # state_file: "" # empty = ~/.cache/s5p/pool.json # report_url: "" # POST dead proxies here (optional) +# Tor control port -- enables NEWNYM signaling (new circuit on demand). +# Requires Tor's ControlPort enabled (torrc: ControlPort 9051). +# tor: +# control_host: 127.0.0.1 +# control_port: 9051 +# password: "" # HashedControlPassword in torrc +# cookie_file: "" # CookieAuthentication file path +# newnym_interval: 0 # periodic NEWNYM (seconds, 0 = manual only) + # Legacy proxy source (still supported, auto-converts to proxy_pool): # proxy_source: # url: http://10.200.1.250:8081/proxies diff --git a/docs/CHEATSHEET.md b/docs/CHEATSHEET.md index a945474..e6db104 100644 --- a/docs/CHEATSHEET.md +++ b/docs/CHEATSHEET.md @@ -14,8 +14,11 @@ s5p -q # errors only s5p -S http://api:8081/proxies # proxy source API s5p -r 5 # retry up to 5 proxies s5p -m 512 # max concurrent connections +s5p --api 127.0.0.1:1081 # enable control API s5p --cprofile # profile to s5p.prof s5p --cprofile out.prof # profile to custom file +s5p --tracemalloc # memory profile (top 10) +s5p --tracemalloc 20 # memory profile (top 20) ``` ## Container @@ -58,6 +61,20 @@ proxy_pool: report_url: "" # POST dead proxies (optional) ``` +## Tor Control Port (config) + +```yaml +tor: + control_port: 9051 + password: "" # or cookie_file: /path/to/cookie + newnym_interval: 60 # auto-rotate every 60s (0 = manual) +``` + +```bash +curl -s http://127.0.0.1:1081/tor | jq . # status +curl -s -X POST http://127.0.0.1:1081/tor/newnym | jq . # new circuit +``` + ## Hot Reload ```bash @@ -84,6 +101,20 @@ http://host:port http://user:pass@host:port ``` +## Control API + +```bash +s5p --api 127.0.0.1:1081 -c config/s5p.yaml # enable API + +curl -s http://127.0.0.1:1081/status | jq . # runtime status +curl -s http://127.0.0.1:1081/metrics | jq . # full metrics +curl -s http://127.0.0.1:1081/pool | jq . # all proxies +curl -s http://127.0.0.1:1081/pool/alive | jq . # alive only +curl -s http://127.0.0.1:1081/config | jq . # current config +curl -s -X POST http://127.0.0.1:1081/reload # reload config +curl -s -X POST http://127.0.0.1:1081/pool/test # health test now +``` + ## Testing ```bash diff --git a/docs/USAGE.md b/docs/USAGE.md index f5f548e..e0e0356 100644 --- a/docs/USAGE.md +++ b/docs/USAGE.md @@ -46,6 +46,7 @@ log_level: info max_connections: 256 # concurrent connection limit (backpressure) pool_size: 0 # pre-warmed TCP connections to first hop (0 = disabled) pool_max_idle: 30 # max idle time for pooled connections (seconds) +api_listen: "" # control API bind address (empty = disabled) chain: - socks5://127.0.0.1:9050 @@ -216,6 +217,133 @@ other pool settings). The old `proxy_source` key is still supported and auto-converts to `proxy_pool` with a single API source. `proxy_pool` takes precedence if both are present. +## Control API + +Built-in HTTP API for runtime inspection and management. Disabled by default; +enable with `api_listen` in config or `--api` on the command line. + +```yaml +api_listen: 127.0.0.1:1081 +``` + +```bash +s5p --api 127.0.0.1:1081 -c config/s5p.yaml +``` + +### Read endpoints + +| Method | Path | Description | +|--------|------|-------------| +| `GET` | `/status` | Combined summary: uptime, metrics, pool stats, chain | +| `GET` | `/metrics` | Full metrics counters (connections, bytes, uptime) | +| `GET` | `/pool` | All proxies with per-entry state | +| `GET` | `/pool/alive` | Alive proxies only | +| `GET` | `/config` | Current runtime config (sanitized) | + +### Write endpoints + +| Method | Path | Description | +|--------|------|-------------| +| `POST` | `/reload` | Re-read config file (replaces SIGHUP) | +| `POST` | `/pool/test` | Trigger immediate health test cycle | +| `POST` | `/pool/refresh` | Trigger immediate source re-fetch | + +All responses are `application/json`. Errors return `{"error": "message"}` with +appropriate status code (400, 404, 405, 500). + +### Examples + +```bash +# Runtime status +curl -s http://127.0.0.1:1081/status | jq . + +# Full metrics +curl -s http://127.0.0.1:1081/metrics | jq . + +# Pool state (all proxies) +curl -s http://127.0.0.1:1081/pool | jq . + +# Alive proxies only +curl -s http://127.0.0.1:1081/pool/alive | jq '.proxies | length' + +# Current config +curl -s http://127.0.0.1:1081/config | jq . + +# Reload config (like SIGHUP) +curl -s -X POST http://127.0.0.1:1081/reload | jq . + +# Trigger health tests now +curl -s -X POST http://127.0.0.1:1081/pool/test | jq . + +# Re-fetch proxy sources now +curl -s -X POST http://127.0.0.1:1081/pool/refresh | jq . +``` + +Settings that require a restart: `listen`, `chain`, `pool_size`, `pool_max_idle`, `api_listen`. + +## Tor Control Port + +Optional integration with Tor's control protocol for circuit management. +When enabled, s5p connects to Tor's control port and can send NEWNYM signals +to request new circuits (new exit node) on demand or on a timer. + +### Configuration + +```yaml +tor: + control_host: 127.0.0.1 # Tor control address + control_port: 9051 # Tor control port + password: "" # HashedControlPassword (torrc) + cookie_file: "" # CookieAuthentication file path + newnym_interval: 0 # periodic NEWNYM in seconds (0 = manual only) +``` + +Requires Tor's `ControlPort` enabled in `torrc`: + +``` +ControlPort 9051 +HashedControlPassword 16:... # or CookieAuthentication 1 +``` + +### Authentication modes + +| Mode | Config | torrc | +|------|--------|-------| +| Password | `password: "secret"` | `HashedControlPassword 16:...` | +| Cookie | `cookie_file: /var/run/tor/control.authcookie` | `CookieAuthentication 1` | +| None | (leave both empty) | No auth configured | + +### Rate limiting + +Tor enforces a minimum 10-second interval between NEWNYM signals. s5p +applies the same client-side rate limit to avoid unnecessary rejections. + +### API endpoints + +| Method | Path | Description | +|--------|------|-------------| +| `GET` | `/tor` | Controller status (enabled, connected, last NEWNYM) | +| `POST` | `/tor/newnym` | Trigger NEWNYM signal (new circuit) | + +```bash +# Check tor controller status +curl -s http://127.0.0.1:1081/tor | jq . + +# Request new circuit +curl -s -X POST http://127.0.0.1:1081/tor/newnym | jq . +``` + +### Periodic NEWNYM + +Set `newnym_interval` to automatically rotate circuits: + +```yaml +tor: + newnym_interval: 60 # new circuit every 60 seconds +``` + +Values below 10 are clamped to Tor's minimum interval. + ## Connection Retry When a proxy pool is active, s5p retries failed connections with a different @@ -250,7 +378,7 @@ Settings reloaded on SIGHUP: | `max_connections` | Concurrent connection limit | | `proxy_pool.*` | Sources, intervals, thresholds | -Settings that require a restart: `listen`, `chain`, `pool_size`, `pool_max_idle`. +Settings that require a restart: `listen`, `chain`, `pool_size`, `pool_max_idle`, `api_listen`. Requires `-c` / `--config` to know which file to re-read. Without a config file, SIGHUP is ignored with a warning. @@ -291,6 +419,15 @@ s5p --cprofile output.prof -c config/s5p.yaml # Analyze after stopping python -m pstats s5p.prof + +# Memory profiling with tracemalloc (top 10 allocators on exit) +s5p --tracemalloc -c config/s5p.yaml + +# Show top 20 allocators +s5p --tracemalloc 20 -c config/s5p.yaml + +# Both profilers simultaneously +s5p --cprofile --tracemalloc -c config/s5p.yaml ``` ## Testing the Proxy diff --git a/src/s5p/api.py b/src/s5p/api.py new file mode 100644 index 0000000..b8b6c2d --- /dev/null +++ b/src/s5p/api.py @@ -0,0 +1,305 @@ +"""Built-in HTTP control API for runtime inspection and management.""" + +from __future__ import annotations + +import asyncio +import json +import logging +import time + +from .metrics import Metrics + +logger = logging.getLogger("s5p") + + +# -- HTTP helpers ------------------------------------------------------------ + + +def _parse_request(data: bytes) -> tuple[str, str]: + """Extract method and path from an HTTP/1.1 request line. + + Returns (method, path) or ("", "") on parse failure. + """ + try: + line = data.split(b"\r\n", 1)[0].decode("ascii") + except (UnicodeDecodeError, IndexError): + return "", "" + parts = line.split(None, 2) + if len(parts) < 2: + return "", "" + return parts[0].upper(), parts[1].split("?", 1)[0] + + +def _json_response( + writer: asyncio.StreamWriter, + status: int, + body: dict | list, +) -> None: + """Write an HTTP response with JSON body and close.""" + phrases = {200: "OK", 400: "Bad Request", 404: "Not Found", + 405: "Method Not Allowed", 500: "Internal Server Error"} + payload = json.dumps(body, separators=(",", ":")).encode() + header = ( + f"HTTP/1.1 {status} {phrases.get(status, 'Error')}\r\n" + f"Content-Type: application/json\r\n" + f"Content-Length: {len(payload)}\r\n" + f"Connection: close\r\n" + f"\r\n" + ) + writer.write(header.encode() + payload) + + +# -- route handlers ---------------------------------------------------------- + + +def _handle_status(ctx: dict) -> tuple[int, dict]: + """GET /status -- combined runtime summary.""" + metrics: Metrics = ctx["metrics"] + data = { + "uptime": round(time.monotonic() - metrics.started, 1), + "connections": metrics.connections, + "success": metrics.success, + "failed": metrics.failed, + "active": metrics.active, + "bytes_in": metrics.bytes_in, + "bytes_out": metrics.bytes_out, + } + pool = ctx.get("pool") + if pool: + data["pool"] = {"alive": pool.alive_count, "total": pool.count} + config = ctx.get("config") + if config: + data["chain"] = [str(h) for h in config.chain] + return 200, data + + +def _handle_metrics(ctx: dict) -> tuple[int, dict]: + """GET /metrics -- full metrics counters.""" + return 200, ctx["metrics"].to_dict() + + +def _handle_pool(ctx: dict, alive_only: bool = False) -> tuple[int, dict]: + """GET /pool or /pool/alive -- proxy pool state.""" + pool = ctx.get("pool") + if not pool: + return 200, {"alive": 0, "total": 0, "proxies": {}} + + proxies = {} + for key, entry in pool._proxies.items(): + if alive_only and not entry.alive: + continue + proxies[key] = { + "alive": entry.alive, + "fails": entry.fails, + "tests": entry.tests, + "last_ok": entry.last_ok, + "last_test": entry.last_test, + "last_seen": entry.last_seen, + } + return 200, { + "alive": pool.alive_count, + "total": pool.count, + "proxies": proxies, + } + + +def _handle_config(ctx: dict) -> tuple[int, dict]: + """GET /config -- sanitized runtime config.""" + config = ctx.get("config") + if not config: + return 500, {"error": "config unavailable"} + + data: dict = { + "listen": f"{config.listen_host}:{config.listen_port}", + "timeout": config.timeout, + "retries": config.retries, + "log_level": config.log_level, + "max_connections": config.max_connections, + "pool_size": config.pool_size, + "chain": [str(h) for h in config.chain], + } + if config.proxy_pool: + pp = config.proxy_pool + sources = [] + for src in pp.sources: + s: dict = {} + if src.url: + s["url"] = src.url + if src.file: + s["file"] = src.file + sources.append(s) + data["proxy_pool"] = { + "sources": sources, + "refresh": pp.refresh, + "test_interval": pp.test_interval, + "max_fails": pp.max_fails, + } + return 200, data + + +async def _handle_reload(ctx: dict) -> tuple[int, dict]: + """POST /reload -- re-read config (like SIGHUP).""" + reload_fn = ctx.get("reload_fn") + if not reload_fn: + return 500, {"error": "reload not available"} + try: + await reload_fn() + return 200, {"ok": True} + except Exception as e: + return 500, {"error": str(e)} + + +async def _handle_pool_test(ctx: dict) -> tuple[int, dict]: + """POST /pool/test -- trigger immediate health test.""" + pool = ctx.get("pool") + if not pool: + return 400, {"error": "no proxy pool configured"} + asyncio.create_task(pool._run_health_tests()) + return 200, {"ok": True} + + +async def _handle_pool_refresh(ctx: dict) -> tuple[int, dict]: + """POST /pool/refresh -- trigger immediate source re-fetch.""" + pool = ctx.get("pool") + if not pool: + return 400, {"error": "no proxy pool configured"} + asyncio.create_task(pool._fetch_all_sources()) + return 200, {"ok": True} + + +def _handle_tor(ctx: dict) -> tuple[int, dict]: + """GET /tor -- Tor controller status.""" + tor = ctx.get("tor") + if not tor: + return 200, {"enabled": False} + last = tor.last_newnym + return 200, { + "enabled": True, + "connected": tor.connected, + "last_newnym": round(time.monotonic() - last, 1) if last else None, + "newnym_interval": tor.newnym_interval, + } + + +async def _handle_tor_newnym(ctx: dict) -> tuple[int, dict]: + """POST /tor/newnym -- trigger NEWNYM signal.""" + tor = ctx.get("tor") + if not tor: + return 400, {"error": "tor control not configured"} + ok = await tor.newnym() + if ok: + return 200, {"ok": True} + return 200, {"ok": False, "reason": "rate-limited or not connected"} + + +# -- routing ----------------------------------------------------------------- + +_GET_ROUTES: dict[str, str] = { + "/status": "status", + "/metrics": "metrics", + "/pool": "pool", + "/pool/alive": "pool_alive", + "/config": "config", + "/tor": "tor", +} + +_POST_ROUTES: dict[str, str] = { + "/reload": "reload", + "/pool/test": "pool_test", + "/pool/refresh": "pool_refresh", + "/tor/newnym": "tor_newnym", +} + + +async def _route(method: str, path: str, ctx: dict) -> tuple[int, dict]: + """Dispatch request to the appropriate handler.""" + if method == "GET" and path in _GET_ROUTES: + name = _GET_ROUTES[path] + if name == "status": + return _handle_status(ctx) + if name == "metrics": + return _handle_metrics(ctx) + if name == "pool": + return _handle_pool(ctx) + if name == "pool_alive": + return _handle_pool(ctx, alive_only=True) + if name == "config": + return _handle_config(ctx) + if name == "tor": + return _handle_tor(ctx) + + if method == "POST" and path in _POST_ROUTES: + name = _POST_ROUTES[path] + if name == "reload": + return await _handle_reload(ctx) + if name == "pool_test": + return await _handle_pool_test(ctx) + if name == "pool_refresh": + return await _handle_pool_refresh(ctx) + if name == "tor_newnym": + return await _handle_tor_newnym(ctx) + + # wrong method on a known path + if path in _GET_ROUTES or path in _POST_ROUTES: + expected = "GET" if path in _GET_ROUTES else "POST" + return 405, {"error": f"use {expected} for {path}"} + + return 404, {"error": "not found"} + + +# -- server ------------------------------------------------------------------ + + +async def _handle_connection( + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter, + ctx: dict, +) -> None: + """Handle a single HTTP request on the control API.""" + try: + data = await asyncio.wait_for(reader.read(8192), timeout=5.0) + if not data: + return + method, path = _parse_request(data) + if not method: + _json_response(writer, 400, {"error": "bad request"}) + await writer.drain() + return + + status, body = await _route(method, path, ctx) + _json_response(writer, status, body) + await writer.drain() + except (TimeoutError, ConnectionError, OSError): + pass + except Exception: + logger.debug("api: unexpected error handling request", exc_info=True) + finally: + try: + writer.close() + await writer.wait_closed() + except OSError: + pass + + +async def start_api( + host: str, + port: int, + ctx: dict, +) -> asyncio.Server: + """Start the control API HTTP server. + + Args: + host: Bind address. + port: Bind port. + ctx: Shared context dict with config, metrics, pool, reload_fn. + + Returns: + The running asyncio.Server (caller manages lifecycle). + """ + async def handler(r: asyncio.StreamReader, w: asyncio.StreamWriter) -> None: + await _handle_connection(r, w, ctx) + + srv = await asyncio.start_server(handler, host, port) + addrs = ", ".join(str(s.getsockname()) for s in srv.sockets) + logger.info("api: listening on %s", addrs) + return srv diff --git a/src/s5p/cli.py b/src/s5p/cli.py index 61df0be..6991aed 100644 --- a/src/s5p/cli.py +++ b/src/s5p/cli.py @@ -54,10 +54,18 @@ def _parse_args(argv: list[str] | None = None) -> argparse.Namespace: "-S", "--proxy-source", metavar="URL", help="proxy source API URL", ) + p.add_argument( + "--api", metavar="[HOST:]PORT", + help="enable control API on address (e.g. 127.0.0.1:1081)", + ) p.add_argument( "--cprofile", metavar="FILE", nargs="?", const="s5p.prof", help="enable cProfile, dump stats to FILE (default: s5p.prof)", ) + p.add_argument( + "--tracemalloc", metavar="N", nargs="?", const=10, type=int, + help="enable tracemalloc, show top N allocators on exit (default: 10)", + ) return p.parse_args(argv) @@ -89,6 +97,14 @@ def main(argv: list[str] | None = None) -> int: if args.max_connections is not None: config.max_connections = args.max_connections + if args.api: + if ":" in args.api: + host, port_str = args.api.rsplit(":", 1) + config.api_host = host + config.api_port = int(port_str) + else: + config.api_port = int(args.api) + if args.proxy_source: config.proxy_pool = ProxyPoolConfig( sources=[PoolSourceConfig(url=args.proxy_source)], @@ -100,6 +116,11 @@ def main(argv: list[str] | None = None) -> int: config.log_level = "error" _setup_logging(config.log_level) + logger = logging.getLogger("s5p") + + if args.tracemalloc: + import tracemalloc + tracemalloc.start() try: if args.cprofile: @@ -111,13 +132,21 @@ def main(argv: list[str] | None = None) -> int: finally: prof.disable() prof.dump_stats(args.cprofile) - logging.getLogger("s5p").info("profile saved to %s", args.cprofile) + logger.info("profile saved to %s", args.cprofile) else: asyncio.run(serve(config)) except KeyboardInterrupt: return 0 except Exception as e: - logging.getLogger("s5p").error("%s", e) + logger.error("%s", e) return 1 + finally: + if args.tracemalloc: + import tracemalloc + snapshot = tracemalloc.take_snapshot() + stats = snapshot.statistics("lineno") + logger.info("tracemalloc: top %d allocations", args.tracemalloc) + for stat in stats[:args.tracemalloc]: + logger.info(" %s", stat) return 0 diff --git a/src/s5p/config.py b/src/s5p/config.py index 867b029..b6c8f2f 100644 --- a/src/s5p/config.py +++ b/src/s5p/config.py @@ -53,6 +53,17 @@ class ProxyPoolConfig: report_url: str = "" +@dataclass +class TorConfig: + """Tor control port configuration.""" + + control_host: str = "127.0.0.1" + control_port: int = 9051 + password: str = "" + cookie_file: str = "" + newnym_interval: float = 0.0 # 0 = manual only + + @dataclass class Config: """Server configuration.""" @@ -66,7 +77,10 @@ class Config: max_connections: int = 256 pool_size: int = 0 pool_max_idle: float = 30.0 + api_host: str = "" + api_port: int = 0 proxy_pool: ProxyPoolConfig | None = None + tor: TorConfig | None = None config_file: str = "" @@ -148,6 +162,15 @@ def load_config(path: str | Path) -> Config: if "pool_max_idle" in raw: config.pool_max_idle = float(raw["pool_max_idle"]) + if "api_listen" in raw: + api = raw["api_listen"] + if isinstance(api, str) and ":" in api: + host, port_str = api.rsplit(":", 1) + config.api_host = host + config.api_port = int(port_str) + elif isinstance(api, (str, int)): + config.api_port = int(api) + if "chain" in raw: for entry in raw["chain"]: if isinstance(entry, str): @@ -207,4 +230,14 @@ def load_config(path: str | Path) -> Config: refresh=refresh, ) + if "tor" in raw: + tor_raw = raw["tor"] + config.tor = TorConfig( + control_host=tor_raw.get("control_host", "127.0.0.1"), + control_port=int(tor_raw.get("control_port", 9051)), + password=tor_raw.get("password", ""), + cookie_file=tor_raw.get("cookie_file", ""), + newnym_interval=float(tor_raw.get("newnym_interval", 0)), + ) + return config diff --git a/src/s5p/metrics.py b/src/s5p/metrics.py index 8c97a85..01ea7c3 100644 --- a/src/s5p/metrics.py +++ b/src/s5p/metrics.py @@ -34,6 +34,19 @@ class Metrics: f"up={h}h{m:02d}m{s:02d}s" ) + def to_dict(self) -> dict: + """Return all counters as a dict (for JSON serialization).""" + return { + "connections": self.connections, + "success": self.success, + "failed": self.failed, + "retries": self.retries, + "active": self.active, + "bytes_in": self.bytes_in, + "bytes_out": self.bytes_out, + "uptime": round(time.monotonic() - self.started, 1), + } + def _human_bytes(n: int) -> str: """Format byte count in human-readable form.""" diff --git a/src/s5p/server.py b/src/s5p/server.py index 42df0cc..4a02e58 100644 --- a/src/s5p/server.py +++ b/src/s5p/server.py @@ -8,11 +8,13 @@ import signal import struct import time +from .api import start_api from .config import Config, load_config from .connpool import FirstHopPool from .metrics import Metrics from .pool import ProxyPool from .proto import ProtoError, Socks5Reply, build_chain, read_socks5_address +from .tor import TorController logger = logging.getLogger("s5p") @@ -235,6 +237,22 @@ async def serve(config: Config) -> None: ) await hop_pool.start() + tor: TorController | None = None + if config.tor: + tc = config.tor + tor = TorController( + host=tc.control_host, + port=tc.control_port, + password=tc.password, + cookie_file=tc.cookie_file, + newnym_interval=tc.newnym_interval, + ) + try: + await tor.start() + except (ConnectionError, OSError, TimeoutError) as e: + logger.warning("tor: control port unavailable: %s", e) + tor = None + sem = asyncio.Semaphore(config.max_connections) async def on_client(r: asyncio.StreamReader, w: asyncio.StreamWriter) -> None: @@ -259,6 +277,23 @@ async def serve(config: Config) -> None: ) logger.info(" retries: %d", config.retries) + if tor: + extra = f", newnym every {tor.newnym_interval:.0f}s" if tor.newnym_interval else "" + logger.info( + " tor: control %s:%d%s", config.tor.control_host, config.tor.control_port, extra, + ) + + # -- control API --------------------------------------------------------- + api_srv: asyncio.Server | None = None + if config.api_port: + api_ctx: dict = { + "config": config, + "metrics": metrics, + "pool": proxy_pool, + "hop_pool": hop_pool, + "tor": tor, + } + # SIGHUP: hot-reload config (timeout, retries, log_level, pool settings) async def _reload() -> None: if not config.config_file: @@ -289,6 +324,10 @@ async def serve(config: Config) -> None: loop.add_signal_handler(signal.SIGHUP, _on_sighup) + if config.api_port: + api_ctx["reload_fn"] = _reload + api_srv = await start_api(config.api_host, config.api_port, api_ctx) + metrics_stop = asyncio.Event() pool_ref = proxy_pool metrics_task = asyncio.create_task(_metrics_logger(metrics, metrics_stop, pool_ref)) @@ -296,6 +335,11 @@ async def serve(config: Config) -> None: async with srv: sig = await stop logger.info("received %s, shutting down", signal.Signals(sig).name) + if api_srv: + api_srv.close() + await api_srv.wait_closed() + if tor: + await tor.stop() if hop_pool: await hop_pool.stop() if proxy_pool: diff --git a/src/s5p/tor.py b/src/s5p/tor.py new file mode 100644 index 0000000..b0bb473 --- /dev/null +++ b/src/s5p/tor.py @@ -0,0 +1,204 @@ +"""Tor control port client with NEWNYM support.""" + +from __future__ import annotations + +import asyncio +import logging +import time + +logger = logging.getLogger("s5p") + +_NEWNYM_MIN_INTERVAL = 10.0 # Tor enforces 10s between NEWNYMs + + +class TorController: + """Async client for the Tor control protocol. + + Supports password, cookie, and bare authentication. Provides NEWNYM + signaling (new circuit) on demand or on a periodic timer. + """ + + def __init__( + self, + host: str = "127.0.0.1", + port: int = 9051, + password: str = "", + cookie_file: str = "", + newnym_interval: float = 0.0, + ) -> None: + self._host = host + self._port = port + self._password = password + self._cookie_file = cookie_file + self._newnym_interval = newnym_interval + self._reader: asyncio.StreamReader | None = None + self._writer: asyncio.StreamWriter | None = None + self._last_newnym: float = 0.0 + self._stop = asyncio.Event() + self._tasks: list[asyncio.Task] = [] + self._lock = asyncio.Lock() + + # -- properties ---------------------------------------------------------- + + @property + def connected(self) -> bool: + """True if the control connection is open.""" + return self._writer is not None and not self._writer.is_closing() + + @property + def last_newnym(self) -> float: + """Monotonic timestamp of the last successful NEWNYM (0 if never).""" + return self._last_newnym + + @property + def newnym_interval(self) -> float: + """Periodic NEWNYM interval in seconds (0 = manual only).""" + return self._newnym_interval + + # -- lifecycle ----------------------------------------------------------- + + async def start(self) -> None: + """Connect, authenticate, and start optional newnym loop.""" + await self._connect() + if self._newnym_interval > 0: + self._tasks.append(asyncio.create_task(self._newnym_loop())) + + async def stop(self) -> None: + """Cancel tasks and close the connection.""" + self._stop.set() + for task in self._tasks: + task.cancel() + for task in self._tasks: + try: + await task + except asyncio.CancelledError: + pass + self._tasks.clear() + self._close() + + # -- public commands ----------------------------------------------------- + + async def newnym(self) -> bool: + """Send SIGNAL NEWNYM with client-side 10s rate limit. + + Reconnects automatically if the connection was lost. + Returns True on success, False on rate-limit or failure. + """ + now = time.monotonic() + if self._last_newnym and (now - self._last_newnym) < _NEWNYM_MIN_INTERVAL: + return False + + async with self._lock: + try: + if not self.connected: + await self._connect() + code, _ = await self._command("SIGNAL NEWNYM") + if code == 250: + self._last_newnym = time.monotonic() + logger.debug("tor: NEWNYM sent") + return True + logger.warning("tor: NEWNYM failed: %d", code) + return False + except (ConnectionError, OSError, TimeoutError) as e: + logger.warning("tor: NEWNYM error: %s", e) + self._close() + return False + + async def get_info(self, keyword: str) -> str | None: + """Send GETINFO and return the response value, or None on error.""" + async with self._lock: + try: + if not self.connected: + await self._connect() + code, lines = await self._command(f"GETINFO {keyword}") + if code == 250 and lines: + # response format: "keyword=value" + for line in lines: + if "=" in line: + return line.split("=", 1)[1] + return None + except (ConnectionError, OSError, TimeoutError): + self._close() + return None + + # -- internals ----------------------------------------------------------- + + async def _connect(self) -> None: + """Open TCP connection and authenticate.""" + self._close() + self._reader, self._writer = await asyncio.wait_for( + asyncio.open_connection(self._host, self._port), + timeout=10.0, + ) + await self._authenticate() + logger.info("tor: connected to %s:%d", self._host, self._port) + + async def _authenticate(self) -> None: + """Send AUTHENTICATE with configured credentials.""" + if self._cookie_file: + try: + with open(self._cookie_file, "rb") as f: + cookie = f.read().hex() + cmd = f"AUTHENTICATE {cookie}" + except OSError as e: + self._close() + raise ConnectionError(f"cannot read cookie file: {e}") from e + elif self._password: + cmd = f'AUTHENTICATE "{self._password}"' + else: + cmd = "AUTHENTICATE" + + code, _ = await self._command(cmd) + if code != 250: + self._close() + raise ConnectionError(f"tor auth failed: {code}") + + async def _command(self, cmd: str) -> tuple[int, list[str]]: + """Send a command and read the multi-line response. + + Returns (status_code, [response_lines]). + """ + if not self._writer or not self._reader: + raise ConnectionError("not connected") + + self._writer.write(f"{cmd}\r\n".encode()) + await self._writer.drain() + + lines: list[str] = [] + while True: + raw = await asyncio.wait_for(self._reader.readline(), timeout=10.0) + if not raw: + raise ConnectionError("connection closed") + line = raw.decode("ascii", errors="replace").rstrip("\r\n") + if len(line) < 4: + raise ConnectionError(f"malformed response: {line!r}") + code = int(line[:3]) + sep = line[3] + text = line[4:] + lines.append(text) + if sep == " ": + return code, lines + # sep == '-' means continuation + + def _close(self) -> None: + """Close TCP connection silently.""" + if self._writer: + try: + self._writer.close() + except OSError: + pass + self._writer = None + self._reader = None + + async def _newnym_loop(self) -> None: + """Periodic NEWNYM on configured interval.""" + while not self._stop.is_set(): + try: + await asyncio.wait_for( + self._stop.wait(), + timeout=self._newnym_interval, + ) + except TimeoutError: + pass + if not self._stop.is_set(): + await self.newnym() diff --git a/tests/test_api.py b/tests/test_api.py new file mode 100644 index 0000000..fc94780 --- /dev/null +++ b/tests/test_api.py @@ -0,0 +1,369 @@ +"""Tests for the control API module.""" + +import asyncio +import json +from unittest.mock import AsyncMock, MagicMock + +from s5p.api import ( + _handle_config, + _handle_metrics, + _handle_pool, + _handle_status, + _handle_tor, + _handle_tor_newnym, + _json_response, + _parse_request, + _route, +) +from s5p.config import ChainHop, Config, PoolSourceConfig, ProxyPoolConfig +from s5p.metrics import Metrics + +# -- request parsing --------------------------------------------------------- + + +class TestParseRequest: + """Test HTTP request line parsing.""" + + def test_get(self): + assert _parse_request(b"GET /status HTTP/1.1\r\n") == ("GET", "/status") + + def test_post(self): + assert _parse_request(b"POST /reload HTTP/1.1\r\n") == ("POST", "/reload") + + def test_strips_query_string(self): + assert _parse_request(b"GET /pool?foo=bar HTTP/1.1\r\n") == ("GET", "/pool") + + def test_method_uppercased(self): + assert _parse_request(b"get /metrics HTTP/1.1\r\n") == ("GET", "/metrics") + + def test_empty(self): + assert _parse_request(b"") == ("", "") + + def test_garbage(self): + assert _parse_request(b"\xff\xfe") == ("", "") + + def test_incomplete(self): + assert _parse_request(b"GET\r\n") == ("", "") + + +# -- JSON response ----------------------------------------------------------- + + +class TestJsonResponse: + """Test HTTP JSON response builder.""" + + def test_format(self): + writer = MagicMock() + written = bytearray() + writer.write = lambda data: written.extend(data) + + _json_response(writer, 200, {"ok": True}) + text = written.decode() + assert "HTTP/1.1 200 OK\r\n" in text + assert "Content-Type: application/json\r\n" in text + assert "Connection: close\r\n" in text + # body after double newline + body = text.split("\r\n\r\n", 1)[1] + assert json.loads(body) == {"ok": True} + + def test_404(self): + writer = MagicMock() + written = bytearray() + writer.write = lambda data: written.extend(data) + + _json_response(writer, 404, {"error": "not found"}) + text = written.decode() + assert "HTTP/1.1 404 Not Found\r\n" in text + + +# -- helpers ----------------------------------------------------------------- + + +def _make_ctx( + config: Config | None = None, + pool: MagicMock | None = None, + tor: MagicMock | None = None, +) -> dict: + """Build a mock context dict.""" + return { + "config": config or Config(), + "metrics": Metrics(), + "pool": pool, + "hop_pool": None, + "tor": tor, + } + + +# -- GET handlers ------------------------------------------------------------ + + +class TestHandleStatus: + """Test GET /status handler.""" + + def test_basic(self): + ctx = _make_ctx() + ctx["metrics"].connections = 10 + ctx["metrics"].success = 8 + status, body = _handle_status(ctx) + assert status == 200 + assert body["connections"] == 10 + assert body["success"] == 8 + assert "uptime" in body + + def test_with_pool(self): + pool = MagicMock() + pool.alive_count = 5 + pool.count = 10 + ctx = _make_ctx(pool=pool) + _, body = _handle_status(ctx) + assert body["pool"] == {"alive": 5, "total": 10} + + def test_with_chain(self): + config = Config(chain=[ChainHop("socks5", "127.0.0.1", 9050)]) + ctx = _make_ctx(config=config) + _, body = _handle_status(ctx) + assert body["chain"] == ["socks5://127.0.0.1:9050"] + + +class TestHandleMetrics: + """Test GET /metrics handler.""" + + def test_returns_dict(self): + ctx = _make_ctx() + ctx["metrics"].connections = 42 + ctx["metrics"].bytes_in = 1024 + status, body = _handle_metrics(ctx) + assert status == 200 + assert body["connections"] == 42 + assert body["bytes_in"] == 1024 + assert "uptime" in body + + +class TestHandlePool: + """Test GET /pool handler.""" + + def test_no_pool(self): + ctx = _make_ctx() + status, body = _handle_pool(ctx) + assert status == 200 + assert body == {"alive": 0, "total": 0, "proxies": {}} + + def test_with_entries(self): + pool = MagicMock() + pool.alive_count = 1 + pool.count = 2 + entry_alive = MagicMock( + alive=True, fails=0, tests=5, + last_ok=100.0, last_test=100.0, last_seen=100.0, + ) + entry_dead = MagicMock( + alive=False, fails=3, tests=5, + last_ok=0.0, last_test=100.0, last_seen=100.0, + ) + pool._proxies = { + "socks5://1.2.3.4:1080": entry_alive, + "socks5://5.6.7.8:1080": entry_dead, + } + ctx = _make_ctx(pool=pool) + _, body = _handle_pool(ctx) + assert len(body["proxies"]) == 2 + assert body["proxies"]["socks5://1.2.3.4:1080"]["alive"] is True + + def test_alive_only(self): + pool = MagicMock() + pool.alive_count = 1 + pool.count = 2 + entry_alive = MagicMock( + alive=True, fails=0, tests=5, + last_ok=100.0, last_test=100.0, last_seen=100.0, + ) + entry_dead = MagicMock( + alive=False, fails=3, tests=5, + last_ok=0.0, last_test=100.0, last_seen=100.0, + ) + pool._proxies = { + "socks5://1.2.3.4:1080": entry_alive, + "socks5://5.6.7.8:1080": entry_dead, + } + ctx = _make_ctx(pool=pool) + _, body = _handle_pool(ctx, alive_only=True) + assert len(body["proxies"]) == 1 + assert "socks5://1.2.3.4:1080" in body["proxies"] + + +class TestHandleConfig: + """Test GET /config handler.""" + + def test_basic(self): + config = Config(timeout=15.0, retries=5, log_level="debug") + ctx = _make_ctx(config=config) + status, body = _handle_config(ctx) + assert status == 200 + assert body["timeout"] == 15.0 + assert body["retries"] == 5 + assert body["log_level"] == "debug" + + def test_with_proxy_pool(self): + pp = ProxyPoolConfig( + sources=[PoolSourceConfig(url="http://api:8081/proxies")], + refresh=600.0, + test_interval=60.0, + max_fails=5, + ) + config = Config(proxy_pool=pp) + ctx = _make_ctx(config=config) + _, body = _handle_config(ctx) + assert body["proxy_pool"]["refresh"] == 600.0 + assert body["proxy_pool"]["sources"][0]["url"] == "http://api:8081/proxies" + + +# -- routing ----------------------------------------------------------------- + + +class TestRouting: + """Test request routing and error responses.""" + + def test_get_status(self): + ctx = _make_ctx() + status, body = asyncio.run(_route("GET", "/status", ctx)) + assert status == 200 + + def test_get_metrics(self): + ctx = _make_ctx() + status, _ = asyncio.run(_route("GET", "/metrics", ctx)) + assert status == 200 + + def test_get_pool(self): + ctx = _make_ctx() + status, _ = asyncio.run(_route("GET", "/pool", ctx)) + assert status == 200 + + def test_get_pool_alive(self): + ctx = _make_ctx() + status, _ = asyncio.run(_route("GET", "/pool/alive", ctx)) + assert status == 200 + + def test_get_config(self): + ctx = _make_ctx() + status, _ = asyncio.run(_route("GET", "/config", ctx)) + assert status == 200 + + def test_post_reload(self): + ctx = _make_ctx() + ctx["reload_fn"] = AsyncMock() + status, body = asyncio.run(_route("POST", "/reload", ctx)) + assert status == 200 + assert body == {"ok": True} + + def test_post_pool_test(self): + pool = MagicMock() + pool._run_health_tests = AsyncMock() + ctx = _make_ctx(pool=pool) + status, body = asyncio.run(_route("POST", "/pool/test", ctx)) + assert status == 200 + assert body == {"ok": True} + + def test_post_pool_refresh(self): + pool = MagicMock() + pool._fetch_all_sources = AsyncMock() + ctx = _make_ctx(pool=pool) + status, body = asyncio.run(_route("POST", "/pool/refresh", ctx)) + assert status == 200 + assert body == {"ok": True} + + def test_404(self): + ctx = _make_ctx() + status, body = asyncio.run(_route("GET", "/nonexistent", ctx)) + assert status == 404 + assert "error" in body + + def test_405_wrong_method(self): + ctx = _make_ctx() + status, body = asyncio.run(_route("POST", "/status", ctx)) + assert status == 405 + assert "GET" in body["error"] + + def test_405_get_on_post_route(self): + ctx = _make_ctx() + status, body = asyncio.run(_route("GET", "/reload", ctx)) + assert status == 405 + assert "POST" in body["error"] + + def test_get_tor(self): + ctx = _make_ctx() + status, body = asyncio.run(_route("GET", "/tor", ctx)) + assert status == 200 + assert body == {"enabled": False} + + def test_post_tor_newnym(self): + tor = MagicMock() + tor.newnym = AsyncMock(return_value=True) + ctx = _make_ctx(tor=tor) + status, body = asyncio.run(_route("POST", "/tor/newnym", ctx)) + assert status == 200 + assert body == {"ok": True} + + +# -- Tor endpoints ---------------------------------------------------------- + + +class TestHandleTor: + """Test GET /tor handler.""" + + def test_disabled(self): + ctx = _make_ctx() + status, body = _handle_tor(ctx) + assert status == 200 + assert body == {"enabled": False} + + def test_connected(self): + tor = MagicMock() + tor.connected = True + tor.last_newnym = 0.0 + tor.newnym_interval = 60.0 + ctx = _make_ctx(tor=tor) + status, body = _handle_tor(ctx) + assert status == 200 + assert body["enabled"] is True + assert body["connected"] is True + assert body["last_newnym"] is None + assert body["newnym_interval"] == 60.0 + + def test_with_last_newnym(self): + import time + tor = MagicMock() + tor.connected = True + tor.last_newnym = time.monotonic() - 42.0 + tor.newnym_interval = 0.0 + ctx = _make_ctx(tor=tor) + status, body = _handle_tor(ctx) + assert status == 200 + assert body["last_newnym"] is not None + assert body["last_newnym"] >= 42.0 + + +class TestHandleTorNewnym: + """Test POST /tor/newnym handler.""" + + def test_success(self): + tor = MagicMock() + tor.newnym = AsyncMock(return_value=True) + ctx = _make_ctx(tor=tor) + status, body = asyncio.run(_handle_tor_newnym(ctx)) + assert status == 200 + assert body == {"ok": True} + + def test_rate_limited(self): + tor = MagicMock() + tor.newnym = AsyncMock(return_value=False) + ctx = _make_ctx(tor=tor) + status, body = asyncio.run(_handle_tor_newnym(ctx)) + assert status == 200 + assert body["ok"] is False + assert "reason" in body + + def test_not_configured(self): + ctx = _make_ctx() + status, body = asyncio.run(_handle_tor_newnym(ctx)) + assert status == 400 + assert "error" in body diff --git a/tests/test_config.py b/tests/test_config.py index b3c82b4..89256f7 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -141,3 +141,37 @@ class TestConfig: c = load_config(cfg_file) assert c.pool_size == 16 assert c.pool_max_idle == 45.0 + + def test_tor_config_from_yaml(self, tmp_path): + cfg_file = tmp_path / "test.yaml" + cfg_file.write_text( + "tor:\n" + " control_host: 10.0.0.1\n" + " control_port: 9151\n" + " password: secret\n" + " cookie_file: /var/run/tor/cookie\n" + " newnym_interval: 60\n" + ) + c = load_config(cfg_file) + assert c.tor is not None + assert c.tor.control_host == "10.0.0.1" + assert c.tor.control_port == 9151 + assert c.tor.password == "secret" + assert c.tor.cookie_file == "/var/run/tor/cookie" + assert c.tor.newnym_interval == 60.0 + + def test_tor_config_defaults(self, tmp_path): + cfg_file = tmp_path / "test.yaml" + cfg_file.write_text("tor:\n password: test\n") + c = load_config(cfg_file) + assert c.tor is not None + assert c.tor.control_host == "127.0.0.1" + assert c.tor.control_port == 9051 + assert c.tor.cookie_file == "" + assert c.tor.newnym_interval == 0.0 + + def test_no_tor_config(self, tmp_path): + cfg_file = tmp_path / "test.yaml" + cfg_file.write_text("listen: 1080\n") + c = load_config(cfg_file) + assert c.tor is None diff --git a/tests/test_tor.py b/tests/test_tor.py new file mode 100644 index 0000000..b2200b6 --- /dev/null +++ b/tests/test_tor.py @@ -0,0 +1,258 @@ +"""Tests for the Tor control port client.""" + +import asyncio +from unittest.mock import AsyncMock, MagicMock, mock_open, patch + +import pytest + +from s5p.tor import TorController + +# -- helpers ----------------------------------------------------------------- + + +def _mock_reader(responses: list[bytes]) -> AsyncMock: + """Create a mock StreamReader that yields canned lines.""" + reader = AsyncMock(spec=asyncio.StreamReader) + reader.readline = AsyncMock(side_effect=responses) + return reader + + +def _mock_writer() -> MagicMock: + """Create a mock StreamWriter.""" + writer = MagicMock(spec=asyncio.StreamWriter) + writer.write = MagicMock() + writer.drain = AsyncMock() + writer.close = MagicMock() + writer.wait_closed = AsyncMock() + writer.is_closing = MagicMock(return_value=False) + return writer + + +# -- authentication ---------------------------------------------------------- + + +class TestAuthentication: + """Test Tor control port authentication modes.""" + + def test_password_auth(self): + reader = _mock_reader([b"250 OK\r\n"]) + writer = _mock_writer() + + async def run(): + with patch("asyncio.open_connection", return_value=(reader, writer)): + tc = TorController(password="secret") + await tc._connect() + # verify AUTHENTICATE command was sent with password + calls = writer.write.call_args_list + assert any(b'AUTHENTICATE "secret"' in c[0][0] for c in calls) + tc._close() + + asyncio.run(run()) + + def test_cookie_auth(self): + reader = _mock_reader([b"250 OK\r\n"]) + writer = _mock_writer() + cookie_data = b"\xde\xad\xbe\xef" + + async def run(): + with patch("asyncio.open_connection", return_value=(reader, writer)): + with patch("builtins.open", mock_open(read_data=cookie_data)): + tc = TorController(cookie_file="/var/run/tor/control.authcookie") + await tc._connect() + calls = writer.write.call_args_list + assert any(b"AUTHENTICATE deadbeef" in c[0][0] for c in calls) + tc._close() + + asyncio.run(run()) + + def test_bare_auth(self): + reader = _mock_reader([b"250 OK\r\n"]) + writer = _mock_writer() + + async def run(): + with patch("asyncio.open_connection", return_value=(reader, writer)): + tc = TorController() + await tc._connect() + calls = writer.write.call_args_list + assert any(c[0][0] == b"AUTHENTICATE\r\n" for c in calls) + tc._close() + + asyncio.run(run()) + + def test_auth_failure(self): + reader = _mock_reader([b"515 Bad authentication\r\n"]) + writer = _mock_writer() + + async def run(): + with patch("asyncio.open_connection", return_value=(reader, writer)): + tc = TorController(password="wrong") + with pytest.raises(ConnectionError, match="auth failed"): + await tc._connect() + + asyncio.run(run()) + + def test_connect_failure(self): + async def run(): + with patch("asyncio.open_connection", side_effect=OSError("refused")): + tc = TorController() + with pytest.raises(OSError, match="refused"): + await tc._connect() + + asyncio.run(run()) + + +# -- NEWNYM ------------------------------------------------------------------ + + +class TestNewnym: + """Test NEWNYM signaling.""" + + def test_newnym_success(self): + # auth response + newnym response + reader = _mock_reader([b"250 OK\r\n", b"250 OK\r\n"]) + writer = _mock_writer() + + async def run(): + with patch("asyncio.open_connection", return_value=(reader, writer)): + tc = TorController() + await tc._connect() + ok = await tc.newnym() + assert ok is True + assert tc.last_newnym > 0 + tc._close() + + asyncio.run(run()) + + def test_newnym_rate_limited(self): + # auth + first newnym + reader = _mock_reader([b"250 OK\r\n", b"250 OK\r\n"]) + writer = _mock_writer() + + async def run(): + with patch("asyncio.open_connection", return_value=(reader, writer)): + tc = TorController() + await tc._connect() + ok = await tc.newnym() + assert ok is True + # immediate second call should be rate-limited + ok2 = await tc.newnym() + assert ok2 is False + tc._close() + + asyncio.run(run()) + + def test_newnym_reconnects_on_disconnect(self): + # first connect auth, then reconnect auth + newnym + reader1 = _mock_reader([b"250 OK\r\n"]) + writer1 = _mock_writer() + reader2 = _mock_reader([b"250 OK\r\n", b"250 OK\r\n"]) + writer2 = _mock_writer() + + call_count = 0 + + async def fake_connect(*args, **kwargs): + nonlocal call_count + call_count += 1 + if call_count == 1: + return reader1, writer1 + return reader2, writer2 + + async def run(): + with patch("asyncio.open_connection", side_effect=fake_connect): + tc = TorController() + await tc._connect() + # simulate disconnect + tc._close() + assert not tc.connected + # newnym should reconnect + ok = await tc.newnym() + assert ok is True + tc._close() + + asyncio.run(run()) + + +# -- GETINFO ----------------------------------------------------------------- + + +class TestGetInfo: + """Test GETINFO command.""" + + def test_getinfo_version(self): + # auth + getinfo multi-line response + reader = _mock_reader([ + b"250 OK\r\n", + b"250-version=0.4.8.12\r\n", + b"250 OK\r\n", + ]) + writer = _mock_writer() + + async def run(): + with patch("asyncio.open_connection", return_value=(reader, writer)): + tc = TorController() + await tc._connect() + version = await tc.get_info("version") + assert version == "0.4.8.12" + tc._close() + + asyncio.run(run()) + + def test_getinfo_not_connected(self): + # auth for reconnect + getinfo + reader = _mock_reader([ + b"250 OK\r\n", + b"250-traffic/read=12345\r\n", + b"250 OK\r\n", + ]) + writer = _mock_writer() + + async def run(): + with patch("asyncio.open_connection", return_value=(reader, writer)): + tc = TorController() + # not connected, should auto-connect + result = await tc.get_info("traffic/read") + assert result == "12345" + tc._close() + + asyncio.run(run()) + + +# -- lifecycle --------------------------------------------------------------- + + +class TestLifecycle: + """Test start/stop lifecycle.""" + + def test_start_stop(self): + reader = _mock_reader([b"250 OK\r\n"]) + writer = _mock_writer() + + async def run(): + with patch("asyncio.open_connection", return_value=(reader, writer)): + tc = TorController() + await tc.start() + assert tc.connected + await tc.stop() + assert not tc.connected + + asyncio.run(run()) + + def test_start_with_newnym_loop(self): + reader = _mock_reader([b"250 OK\r\n"]) + writer = _mock_writer() + + async def run(): + with patch("asyncio.open_connection", return_value=(reader, writer)): + tc = TorController(newnym_interval=60.0) + await tc.start() + assert len(tc._tasks) == 1 + await tc.stop() + assert len(tc._tasks) == 0 + + asyncio.run(run()) + + def test_properties(self): + tc = TorController(newnym_interval=30.0) + assert not tc.connected + assert tc.last_newnym == 0.0 + assert tc.newnym_interval == 30.0