"""Built-in HTTP control API for runtime inspection and management."""

from __future__ import annotations

import asyncio
import json
import logging
import time
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Type-checking-only import: keeps this module importable without
    # pulling in .metrics at runtime (avoids any import-cycle risk).
    from .metrics import Metrics

logger = logging.getLogger("s5p")


# -- HTTP helpers ------------------------------------------------------------

# Reason phrases for the status codes this API emits (module constant so
# _json_response does not rebuild the dict on every call).
_PHRASES = {
    200: "OK",
    400: "Bad Request",
    404: "Not Found",
    405: "Method Not Allowed",
    500: "Internal Server Error",
}


def _parse_request(data: bytes) -> tuple[str, str]:
    """Extract method and path from an HTTP/1.1 request line.

    Args:
        data: Raw request bytes; only the first CRLF-terminated line is used.

    Returns:
        (METHOD, path) with the method upper-cased and any query string
        stripped from the path, or ("", "") on parse failure.
    """
    try:
        # split() always yields at least one element, so only a decode
        # error can occur here.
        line = data.split(b"\r\n", 1)[0].decode("ascii")
    except UnicodeDecodeError:
        return "", ""
    parts = line.split(None, 2)
    if len(parts) < 2:
        return "", ""
    return parts[0].upper(), parts[1].split("?", 1)[0]


def _json_response(
    writer: asyncio.StreamWriter,
    status: int,
    body: dict | list,
) -> None:
    """Write an HTTP response with a JSON body.

    Only buffers the bytes; the caller is responsible for draining and
    closing the writer. A ``Connection: close`` header is always sent.
    """
    payload = json.dumps(body, separators=(",", ":")).encode()
    header = (
        f"HTTP/1.1 {status} {_PHRASES.get(status, 'Error')}\r\n"
        f"Content-Type: application/json\r\n"
        f"Content-Length: {len(payload)}\r\n"
        f"Connection: close\r\n"
        f"\r\n"
    )
    writer.write(header.encode() + payload)


# -- route handlers ----------------------------------------------------------


def _handle_status(ctx: dict) -> tuple[int, dict]:
    """GET /status -- combined runtime summary."""
    metrics: Metrics = ctx["metrics"]
    data = {
        "uptime": round(time.monotonic() - metrics.started, 1),
        "connections": metrics.connections,
        "success": metrics.success,
        "failed": metrics.failed,
        "active": metrics.active,
        "bytes_in": metrics.bytes_in,
        "bytes_out": metrics.bytes_out,
    }
    pool = ctx.get("pool")
    if pool:
        data["pool"] = {"alive": pool.alive_count, "total": pool.count}
    config = ctx.get("config")
    if config:
        data["chain"] = [str(h) for h in config.chain]
    return 200, data


def _handle_metrics(ctx: dict) -> tuple[int, dict]:
    """GET /metrics -- full metrics counters."""
    return 200, ctx["metrics"].to_dict()


def _handle_pool(ctx: dict, alive_only: bool = False) -> tuple[int, dict]:
    """GET /pool or /pool/alive -- proxy pool state.

    Args:
        ctx: Shared context dict; ``ctx["pool"]`` may be None.
        alive_only: When True, omit entries whose ``alive`` flag is False.
    """
    pool = ctx.get("pool")
    if not pool:
        return 200, {"alive": 0, "total": 0, "proxies": {}}

    proxies = {}
    # NOTE: reads the pool's private _proxies mapping; read-only access
    # from the same event loop, so no locking is needed.
    for key, entry in pool._proxies.items():
        if alive_only and not entry.alive:
            continue
        proxies[key] = {
            "alive": entry.alive,
            "fails": entry.fails,
            "tests": entry.tests,
            "last_ok": entry.last_ok,
            "last_test": entry.last_test,
            "last_seen": entry.last_seen,
        }
    return 200, {
        "alive": pool.alive_count,
        "total": pool.count,
        "proxies": proxies,
    }


def _handle_config(ctx: dict) -> tuple[int, dict]:
    """GET /config -- sanitized runtime config (no credentials)."""
    config = ctx.get("config")
    if not config:
        return 500, {"error": "config unavailable"}

    data: dict = {
        "listen": f"{config.listen_host}:{config.listen_port}",
        "timeout": config.timeout,
        "retries": config.retries,
        "log_level": config.log_level,
        "max_connections": config.max_connections,
        "pool_size": config.pool_size,
        "chain": [str(h) for h in config.chain],
    }
    if config.proxy_pool:
        pp = config.proxy_pool
        sources = []
        for src in pp.sources:
            s: dict = {}
            if src.url:
                s["url"] = src.url
            if src.file:
                s["file"] = src.file
            sources.append(s)
        data["proxy_pool"] = {
            "sources": sources,
            "refresh": pp.refresh,
            "test_interval": pp.test_interval,
            "max_fails": pp.max_fails,
        }
    return 200, data


async def _handle_reload(ctx: dict) -> tuple[int, dict]:
    """POST /reload -- re-read config (like SIGHUP)."""
    reload_fn = ctx.get("reload_fn")
    if not reload_fn:
        return 500, {"error": "reload not available"}
    try:
        await reload_fn()
        return 200, {"ok": True}
    except Exception as e:
        return 500, {"error": str(e)}


async def _handle_pool_test(ctx: dict) -> tuple[int, dict]:
    """POST /pool/test -- trigger immediate health test (fire-and-forget)."""
    pool = ctx.get("pool")
    if not pool:
        return 400, {"error": "no proxy pool configured"}
    asyncio.create_task(pool._run_health_tests())
    return 200, {"ok": True}


async def _handle_pool_refresh(ctx: dict) -> tuple[int, dict]:
    """POST /pool/refresh -- trigger immediate source re-fetch (fire-and-forget)."""
    pool = ctx.get("pool")
    if not pool:
        return 400, {"error": "no proxy pool configured"}
    asyncio.create_task(pool._fetch_all_sources())
    return 200, {"ok": True}


# -- routing -----------------------------------------------------------------


def _handle_pool_alive(ctx: dict) -> tuple[int, dict]:
    """GET /pool/alive -- alive proxies only."""
    return _handle_pool(ctx, alive_only=True)


# Path -> synchronous handler returning (status, body).
_GET_ROUTES = {
    "/status": _handle_status,
    "/metrics": _handle_metrics,
    "/pool": _handle_pool,
    "/pool/alive": _handle_pool_alive,
    "/config": _handle_config,
}

# Path -> coroutine handler returning (status, body).
_POST_ROUTES = {
    "/reload": _handle_reload,
    "/pool/test": _handle_pool_test,
    "/pool/refresh": _handle_pool_refresh,
}


async def _route(method: str, path: str, ctx: dict) -> tuple[int, dict]:
    """Dispatch request to the appropriate handler.

    Returns a (status, body) pair; unknown paths yield 404 and a known
    path hit with the wrong method yields 405.
    """
    if method == "GET" and path in _GET_ROUTES:
        return _GET_ROUTES[path](ctx)
    if method == "POST" and path in _POST_ROUTES:
        return await _POST_ROUTES[path](ctx)

    # wrong method on a known path
    if path in _GET_ROUTES or path in _POST_ROUTES:
        expected = "GET" if path in _GET_ROUTES else "POST"
        return 405, {"error": f"use {expected} for {path}"}

    return 404, {"error": "not found"}


# -- server ------------------------------------------------------------------


async def _handle_connection(
    reader: asyncio.StreamReader,
    writer: asyncio.StreamWriter,
    ctx: dict,
) -> None:
    """Handle a single HTTP request on the control API.

    One request per connection; the response always closes the socket.
    """
    try:
        # 8 KiB is more than enough for any control-API request line.
        data = await asyncio.wait_for(reader.read(8192), timeout=5.0)
        if not data:
            return
        method, path = _parse_request(data)
        if not method:
            _json_response(writer, 400, {"error": "bad request"})
            await writer.drain()
            return

        status, body = await _route(method, path, ctx)
        _json_response(writer, status, body)
        await writer.drain()
    except (TimeoutError, ConnectionError, OSError):
        pass
    except Exception:
        logger.debug("api: unexpected error handling request", exc_info=True)
    finally:
        try:
            writer.close()
            await writer.wait_closed()
        except OSError:
            pass


async def start_api(
    host: str,
    port: int,
    ctx: dict,
) -> asyncio.Server:
    """Start the control API HTTP server.

    Args:
        host: Bind address.
        port: Bind port.
        ctx: Shared context dict with config, metrics, pool, reload_fn.

    Returns:
        The running asyncio.Server (caller manages lifecycle).
    """
    async def handler(r: asyncio.StreamReader, w: asyncio.StreamWriter) -> None:
        await _handle_connection(r, w, ctx)

    srv = await asyncio.start_server(handler, host, port)
    addrs = ", ".join(str(s.getsockname()) for s in srv.sockets)
    logger.info("api: listening on %s", addrs)
    return srv
Co-Authored-By: Claude Opus 4.6 --- src/s5p/cli.py | 12 ++++++++++++ src/s5p/config.py | 11 +++++++++++ src/s5p/server.py | 18 ++++++++++++++++++ 3 files changed, 41 insertions(+) diff --git a/src/s5p/cli.py b/src/s5p/cli.py index 61df0be..718d705 100644 --- a/src/s5p/cli.py +++ b/src/s5p/cli.py @@ -54,6 +54,10 @@ def _parse_args(argv: list[str] | None = None) -> argparse.Namespace: "-S", "--proxy-source", metavar="URL", help="proxy source API URL", ) + p.add_argument( + "--api", metavar="[HOST:]PORT", + help="enable control API on address (e.g. 127.0.0.1:1081)", + ) p.add_argument( "--cprofile", metavar="FILE", nargs="?", const="s5p.prof", help="enable cProfile, dump stats to FILE (default: s5p.prof)", @@ -89,6 +93,14 @@ def main(argv: list[str] | None = None) -> int: if args.max_connections is not None: config.max_connections = args.max_connections + if args.api: + if ":" in args.api: + host, port_str = args.api.rsplit(":", 1) + config.api_host = host + config.api_port = int(port_str) + else: + config.api_port = int(args.api) + if args.proxy_source: config.proxy_pool = ProxyPoolConfig( sources=[PoolSourceConfig(url=args.proxy_source)], diff --git a/src/s5p/config.py b/src/s5p/config.py index 867b029..157f865 100644 --- a/src/s5p/config.py +++ b/src/s5p/config.py @@ -66,6 +66,8 @@ class Config: max_connections: int = 256 pool_size: int = 0 pool_max_idle: float = 30.0 + api_host: str = "" + api_port: int = 0 proxy_pool: ProxyPoolConfig | None = None config_file: str = "" @@ -148,6 +150,15 @@ def load_config(path: str | Path) -> Config: if "pool_max_idle" in raw: config.pool_max_idle = float(raw["pool_max_idle"]) + if "api_listen" in raw: + api = raw["api_listen"] + if isinstance(api, str) and ":" in api: + host, port_str = api.rsplit(":", 1) + config.api_host = host + config.api_port = int(port_str) + elif isinstance(api, (str, int)): + config.api_port = int(api) + if "chain" in raw: for entry in raw["chain"]: if isinstance(entry, str): diff --git 
a/src/s5p/server.py b/src/s5p/server.py index 42df0cc..9eb98cd 100644 --- a/src/s5p/server.py +++ b/src/s5p/server.py @@ -8,6 +8,7 @@ import signal import struct import time +from .api import start_api from .config import Config, load_config from .connpool import FirstHopPool from .metrics import Metrics @@ -259,6 +260,16 @@ async def serve(config: Config) -> None: ) logger.info(" retries: %d", config.retries) + # -- control API --------------------------------------------------------- + api_srv: asyncio.Server | None = None + if config.api_port: + api_ctx: dict = { + "config": config, + "metrics": metrics, + "pool": proxy_pool, + "hop_pool": hop_pool, + } + # SIGHUP: hot-reload config (timeout, retries, log_level, pool settings) async def _reload() -> None: if not config.config_file: @@ -289,6 +300,10 @@ async def serve(config: Config) -> None: loop.add_signal_handler(signal.SIGHUP, _on_sighup) + if config.api_port: + api_ctx["reload_fn"] = _reload + api_srv = await start_api(config.api_host, config.api_port, api_ctx) + metrics_stop = asyncio.Event() pool_ref = proxy_pool metrics_task = asyncio.create_task(_metrics_logger(metrics, metrics_stop, pool_ref)) @@ -296,6 +311,9 @@ async def serve(config: Config) -> None: async with srv: sig = await stop logger.info("received %s, shutting down", signal.Signals(sig).name) + if api_srv: + api_srv.close() + await api_srv.wait_closed() if hop_pool: await hop_pool.stop() if proxy_pool: -- 2.49.1 From 4ee2cf5bb08c4f4922250977bfeda58d45673f0a Mon Sep 17 00:00:00 2001 From: user Date: Mon, 16 Feb 2026 19:05:28 +0100 Subject: [PATCH 3/9] test: add control API tests 29 tests covering request parsing, JSON response format, all GET/POST handlers with mock context, 404/405 error routing. 
"""Tests for the control API module."""

import asyncio
import json
from unittest.mock import AsyncMock, MagicMock

from s5p.api import (
    _handle_config,
    _handle_metrics,
    _handle_pool,
    _handle_status,
    _json_response,
    _parse_request,
    _route,
)
from s5p.config import ChainHop, Config, PoolSourceConfig, ProxyPoolConfig
from s5p.metrics import Metrics

# -- request parsing ---------------------------------------------------------


class TestParseRequest:
    """Test HTTP request line parsing."""

    def test_get(self):
        assert _parse_request(b"GET /status HTTP/1.1\r\n") == ("GET", "/status")

    def test_post(self):
        assert _parse_request(b"POST /reload HTTP/1.1\r\n") == ("POST", "/reload")

    def test_strips_query_string(self):
        assert _parse_request(b"GET /pool?foo=bar HTTP/1.1\r\n") == ("GET", "/pool")

    def test_method_uppercased(self):
        assert _parse_request(b"get /metrics HTTP/1.1\r\n") == ("GET", "/metrics")

    def test_empty(self):
        assert _parse_request(b"") == ("", "")

    def test_garbage(self):
        assert _parse_request(b"\xff\xfe") == ("", "")

    def test_incomplete(self):
        assert _parse_request(b"GET\r\n") == ("", "")


# -- JSON response -----------------------------------------------------------


def _render_response(status: int, body: dict) -> str:
    """Render a response through _json_response and return it as text."""
    writer = MagicMock()
    written = bytearray()
    writer.write = lambda data: written.extend(data)
    _json_response(writer, status, body)
    return written.decode()


class TestJsonResponse:
    """Test HTTP JSON response builder."""

    def test_format(self):
        text = _render_response(200, {"ok": True})
        assert "HTTP/1.1 200 OK\r\n" in text
        assert "Content-Type: application/json\r\n" in text
        assert "Connection: close\r\n" in text
        # body after double newline
        body = text.split("\r\n\r\n", 1)[1]
        assert json.loads(body) == {"ok": True}

    def test_404(self):
        text = _render_response(404, {"error": "not found"})
        assert "HTTP/1.1 404 Not Found\r\n" in text


# -- helpers -----------------------------------------------------------------


def _make_ctx(
    config: Config | None = None,
    pool: MagicMock | None = None,
) -> dict:
    """Build a mock context dict."""
    return {
        "config": config or Config(),
        "metrics": Metrics(),
        "pool": pool,
        "hop_pool": None,
    }


def _make_pool() -> MagicMock:
    """Pool mock with one alive and one dead proxy entry."""
    pool = MagicMock()
    pool.alive_count = 1
    pool.count = 2
    entry_alive = MagicMock(
        alive=True, fails=0, tests=5,
        last_ok=100.0, last_test=100.0, last_seen=100.0,
    )
    entry_dead = MagicMock(
        alive=False, fails=3, tests=5,
        last_ok=0.0, last_test=100.0, last_seen=100.0,
    )
    pool._proxies = {
        "socks5://1.2.3.4:1080": entry_alive,
        "socks5://5.6.7.8:1080": entry_dead,
    }
    return pool


# -- GET handlers ------------------------------------------------------------


class TestHandleStatus:
    """Test GET /status handler."""

    def test_basic(self):
        ctx = _make_ctx()
        ctx["metrics"].connections = 10
        ctx["metrics"].success = 8
        status, body = _handle_status(ctx)
        assert status == 200
        assert body["connections"] == 10
        assert body["success"] == 8
        assert "uptime" in body

    def test_with_pool(self):
        pool = MagicMock()
        pool.alive_count = 5
        pool.count = 10
        ctx = _make_ctx(pool=pool)
        _, body = _handle_status(ctx)
        assert body["pool"] == {"alive": 5, "total": 10}

    def test_with_chain(self):
        config = Config(chain=[ChainHop("socks5", "127.0.0.1", 9050)])
        ctx = _make_ctx(config=config)
        _, body = _handle_status(ctx)
        assert body["chain"] == ["socks5://127.0.0.1:9050"]


class TestHandleMetrics:
    """Test GET /metrics handler."""

    def test_returns_dict(self):
        ctx = _make_ctx()
        ctx["metrics"].connections = 42
        ctx["metrics"].bytes_in = 1024
        status, body = _handle_metrics(ctx)
        assert status == 200
        assert body["connections"] == 42
        assert body["bytes_in"] == 1024
        assert "uptime" in body


class TestHandlePool:
    """Test GET /pool handler."""

    def test_no_pool(self):
        ctx = _make_ctx()
        status, body = _handle_pool(ctx)
        assert status == 200
        assert body == {"alive": 0, "total": 0, "proxies": {}}

    def test_with_entries(self):
        ctx = _make_ctx(pool=_make_pool())
        _, body = _handle_pool(ctx)
        assert len(body["proxies"]) == 2
        assert body["proxies"]["socks5://1.2.3.4:1080"]["alive"] is True

    def test_alive_only(self):
        ctx = _make_ctx(pool=_make_pool())
        _, body = _handle_pool(ctx, alive_only=True)
        assert len(body["proxies"]) == 1
        assert "socks5://1.2.3.4:1080" in body["proxies"]


class TestHandleConfig:
    """Test GET /config handler."""

    def test_basic(self):
        config = Config(timeout=15.0, retries=5, log_level="debug")
        ctx = _make_ctx(config=config)
        status, body = _handle_config(ctx)
        assert status == 200
        assert body["timeout"] == 15.0
        assert body["retries"] == 5
        assert body["log_level"] == "debug"

    def test_with_proxy_pool(self):
        pp = ProxyPoolConfig(
            sources=[PoolSourceConfig(url="http://api:8081/proxies")],
            refresh=600.0,
            test_interval=60.0,
            max_fails=5,
        )
        config = Config(proxy_pool=pp)
        ctx = _make_ctx(config=config)
        _, body = _handle_config(ctx)
        assert body["proxy_pool"]["refresh"] == 600.0
        assert body["proxy_pool"]["sources"][0]["url"] == "http://api:8081/proxies"


# -- routing -----------------------------------------------------------------


class TestRouting:
    """Test request routing and error responses."""

    def test_get_status(self):
        ctx = _make_ctx()
        status, body = asyncio.run(_route("GET", "/status", ctx))
        assert status == 200

    def test_get_metrics(self):
        ctx = _make_ctx()
        status, _ = asyncio.run(_route("GET", "/metrics", ctx))
        assert status == 200

    def test_get_pool(self):
        ctx = _make_ctx()
        status, _ = asyncio.run(_route("GET", "/pool", ctx))
        assert status == 200

    def test_get_pool_alive(self):
        ctx = _make_ctx()
        status, _ = asyncio.run(_route("GET", "/pool/alive", ctx))
        assert status == 200

    def test_get_config(self):
        ctx = _make_ctx()
        status, _ = asyncio.run(_route("GET", "/config", ctx))
        assert status == 200

    def test_post_reload(self):
        ctx = _make_ctx()
        ctx["reload_fn"] = AsyncMock()
        status, body = asyncio.run(_route("POST", "/reload", ctx))
        assert status == 200
        assert body == {"ok": True}

    def test_post_pool_test(self):
        pool = MagicMock()
        pool._run_health_tests = AsyncMock()
        ctx = _make_ctx(pool=pool)
        status, body = asyncio.run(_route("POST", "/pool/test", ctx))
        assert status == 200
        assert body == {"ok": True}

    def test_post_pool_refresh(self):
        pool = MagicMock()
        pool._fetch_all_sources = AsyncMock()
        ctx = _make_ctx(pool=pool)
        status, body = asyncio.run(_route("POST", "/pool/refresh", ctx))
        assert status == 200
        assert body == {"ok": True}

    def test_404(self):
        ctx = _make_ctx()
        status, body = asyncio.run(_route("GET", "/nonexistent", ctx))
        assert status == 404
        assert "error" in body

    def test_405_wrong_method(self):
        ctx = _make_ctx()
        status, body = asyncio.run(_route("POST", "/status", ctx))
        assert status == 405
        assert "GET" in body["error"]

    def test_405_get_on_post_route(self):
        ctx = _make_ctx()
        status, body = asyncio.run(_route("GET", "/reload", ctx))
        assert status == 405
        assert "POST" in body["error"]
body["error"] -- 2.49.1 From c939101a739ffae173bf3b5339698d896a165150 Mon Sep 17 00:00:00 2001 From: user Date: Mon, 16 Feb 2026 19:07:10 +0100 Subject: [PATCH 4/9] docs: document control API Add API section to README (features, CLI, config), PROJECT (architecture), USAGE (full endpoint reference with examples), CHEATSHEET (curl one-liners). Update TASKS and ROADMAP. Co-Authored-By: Claude Opus 4.6 --- PROJECT.md | 2 ++ README.md | 5 +++- ROADMAP.md | 1 + TASKS.md | 1 + docs/CHEATSHEET.md | 15 +++++++++++ docs/USAGE.md | 67 +++++++++++++++++++++++++++++++++++++++++++++- 6 files changed, 89 insertions(+), 2 deletions(-) diff --git a/PROJECT.md b/PROJECT.md index 0ef26f8..a94434e 100644 --- a/PROJECT.md +++ b/PROJECT.md @@ -25,6 +25,7 @@ Client -------> s5p -------> Hop 1 -------> Hop 2 -------> Target - **pool.py** -- managed proxy pool (multi-source, health-tested, persistent) - **http.py** -- minimal async HTTP/1.1 client (GET/POST JSON, no external deps) - **connpool.py** -- pre-warmed TCP connection pool to first chain hop +- **api.py** -- built-in HTTP control API (runtime metrics, pool state, config reload) - **cli.py** -- argparse CLI, logging setup, cProfile support - **metrics.py** -- connection counters and human-readable summary (lock-free, asyncio-only) @@ -66,3 +67,4 @@ All other functionality uses Python stdlib (`asyncio`, `socket`, `struct`). - **Connection semaphore** -- cap concurrent connections to prevent fd exhaustion - **Async HTTP** -- native asyncio HTTP client replaces blocking urllib, parallel fetches - **First-hop pool** -- pre-warmed TCP connections to chain[0], stale-evicted, auto-refilled +- **Control API** -- built-in asyncio HTTP server, no Flask/external deps, disabled by default diff --git a/README.md b/README.md index fe6337a..d5ecbe0 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ through configurable chains of SOCKS4, SOCKS5, and HTTP CONNECT proxies. 
- Concurrent connection limit with backpressure (`max_connections`) - Async HTTP client for proxy source fetching (parallel, no threads) - First-hop TCP connection pool (pre-warmed, stale-evicted) +- Built-in control API (runtime metrics, pool state, config reload via HTTP) - Container-ready (Alpine-based, podman/docker) - Graceful shutdown (SIGTERM/SIGINT) - Pure Python, asyncio-based, minimal dependencies @@ -70,6 +71,7 @@ timeout: 10 retries: 3 max_connections: 256 # concurrent connection limit pool_size: 8 # pre-warmed connections to first hop +api_listen: 127.0.0.1:1081 # control API (disabled by default) chain: - socks5://127.0.0.1:9050 # Tor @@ -89,7 +91,7 @@ proxy_pool: ## CLI Reference ``` -s5p [-c FILE] [-l [HOST:]PORT] [-C URL[,URL,...]] [-S URL] [-t SEC] [-r N] [-m N] [-v|-q] +s5p [-c FILE] [-l [HOST:]PORT] [-C URL[,URL,...]] [-S URL] [-t SEC] [-r N] [-m N] [--api [HOST:]PORT] [-v|-q] Options: -c, --config FILE YAML config file @@ -99,6 +101,7 @@ Options: -t, --timeout SEC Per-hop timeout (default: 10) -r, --retries N Max attempts per connection (default: 3, proxy_source only) -m, --max-connections N Max concurrent connections (default: 256) + --api [HOST:]PORT Enable control API (e.g. 
127.0.0.1:1081) -v, --verbose Debug logging -q, --quiet Errors only --cprofile [FILE] Enable cProfile, dump to FILE (default: s5p.prof) diff --git a/ROADMAP.md b/ROADMAP.md index a66082e..ac5f329 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -22,6 +22,7 @@ ## v0.2.0 +- [x] Built-in control API (runtime metrics, pool state, config reload) - [ ] SOCKS5 server authentication (username/password) - [ ] Tor control port integration (circuit renewal via NEWNYM) - [ ] Metrics (connections/sec, bytes relayed, hop latency) diff --git a/TASKS.md b/TASKS.md index 1154a94..3bb79c2 100644 --- a/TASKS.md +++ b/TASKS.md @@ -42,6 +42,7 @@ - [x] Instant warm start (trust cached state, defer all health tests) - [x] Register signal handlers before startup (fix SIGKILL on stop) - [x] Use k8s-file logging driver with rotation +- [x] Built-in control API (`api.py`, `--api`, `api_listen`) ## Next - [ ] Integration tests with mock proxy server diff --git a/docs/CHEATSHEET.md b/docs/CHEATSHEET.md index a945474..bff5a6d 100644 --- a/docs/CHEATSHEET.md +++ b/docs/CHEATSHEET.md @@ -14,6 +14,7 @@ s5p -q # errors only s5p -S http://api:8081/proxies # proxy source API s5p -r 5 # retry up to 5 proxies s5p -m 512 # max concurrent connections +s5p --api 127.0.0.1:1081 # enable control API s5p --cprofile # profile to s5p.prof s5p --cprofile out.prof # profile to custom file ``` @@ -84,6 +85,20 @@ http://host:port http://user:pass@host:port ``` +## Control API + +```bash +s5p --api 127.0.0.1:1081 -c config/s5p.yaml # enable API + +curl -s http://127.0.0.1:1081/status | jq . # runtime status +curl -s http://127.0.0.1:1081/metrics | jq . # full metrics +curl -s http://127.0.0.1:1081/pool | jq . # all proxies +curl -s http://127.0.0.1:1081/pool/alive | jq . # alive only +curl -s http://127.0.0.1:1081/config | jq . 
# current config +curl -s -X POST http://127.0.0.1:1081/reload # reload config +curl -s -X POST http://127.0.0.1:1081/pool/test # health test now +``` + ## Testing ```bash diff --git a/docs/USAGE.md b/docs/USAGE.md index f5f548e..3a05155 100644 --- a/docs/USAGE.md +++ b/docs/USAGE.md @@ -46,6 +46,7 @@ log_level: info max_connections: 256 # concurrent connection limit (backpressure) pool_size: 0 # pre-warmed TCP connections to first hop (0 = disabled) pool_max_idle: 30 # max idle time for pooled connections (seconds) +api_listen: "" # control API bind address (empty = disabled) chain: - socks5://127.0.0.1:9050 @@ -216,6 +217,70 @@ other pool settings). The old `proxy_source` key is still supported and auto-converts to `proxy_pool` with a single API source. `proxy_pool` takes precedence if both are present. +## Control API + +Built-in HTTP API for runtime inspection and management. Disabled by default; +enable with `api_listen` in config or `--api` on the command line. + +```yaml +api_listen: 127.0.0.1:1081 +``` + +```bash +s5p --api 127.0.0.1:1081 -c config/s5p.yaml +``` + +### Read endpoints + +| Method | Path | Description | +|--------|------|-------------| +| `GET` | `/status` | Combined summary: uptime, metrics, pool stats, chain | +| `GET` | `/metrics` | Full metrics counters (connections, bytes, uptime) | +| `GET` | `/pool` | All proxies with per-entry state | +| `GET` | `/pool/alive` | Alive proxies only | +| `GET` | `/config` | Current runtime config (sanitized) | + +### Write endpoints + +| Method | Path | Description | +|--------|------|-------------| +| `POST` | `/reload` | Re-read config file (replaces SIGHUP) | +| `POST` | `/pool/test` | Trigger immediate health test cycle | +| `POST` | `/pool/refresh` | Trigger immediate source re-fetch | + +All responses are `application/json`. Errors return `{"error": "message"}` with +appropriate status code (400, 404, 405, 500). 
+ +### Examples + +```bash +# Runtime status +curl -s http://127.0.0.1:1081/status | jq . + +# Full metrics +curl -s http://127.0.0.1:1081/metrics | jq . + +# Pool state (all proxies) +curl -s http://127.0.0.1:1081/pool | jq . + +# Alive proxies only +curl -s http://127.0.0.1:1081/pool/alive | jq '.proxies | length' + +# Current config +curl -s http://127.0.0.1:1081/config | jq . + +# Reload config (like SIGHUP) +curl -s -X POST http://127.0.0.1:1081/reload | jq . + +# Trigger health tests now +curl -s -X POST http://127.0.0.1:1081/pool/test | jq . + +# Re-fetch proxy sources now +curl -s -X POST http://127.0.0.1:1081/pool/refresh | jq . +``` + +Settings that require a restart: `listen`, `chain`, `pool_size`, `pool_max_idle`, `api_listen`. + ## Connection Retry When a proxy pool is active, s5p retries failed connections with a different @@ -250,7 +315,7 @@ Settings reloaded on SIGHUP: | `max_connections` | Concurrent connection limit | | `proxy_pool.*` | Sources, intervals, thresholds | -Settings that require a restart: `listen`, `chain`, `pool_size`, `pool_max_idle`. +Settings that require a restart: `listen`, `chain`, `pool_size`, `pool_max_idle`, `api_listen`. Requires `-c` / `--config` to know which file to re-read. Without a config file, SIGHUP is ignored with a warning. -- 2.49.1 From b07135ad442c5cd22fb7da03b89eaedd18f94ec2 Mon Sep 17 00:00:00 2001 From: user Date: Mon, 16 Feb 2026 20:06:07 +0100 Subject: [PATCH 5/9] feat: Tor control port client with NEWNYM support Async TCP client for the Tor control protocol (port 9051). Supports password, cookie, and bare authentication. Provides NEWNYM signaling with client-side 10s rate limiting and optional periodic timer. Auto-reconnects on disconnect. Adds TorConfig dataclass and YAML parsing to config module. 
Co-Authored-By: Claude Opus 4.6 --- src/s5p/config.py | 22 +++++ src/s5p/tor.py | 204 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 226 insertions(+) create mode 100644 src/s5p/tor.py diff --git a/src/s5p/config.py b/src/s5p/config.py index 157f865..b6c8f2f 100644 --- a/src/s5p/config.py +++ b/src/s5p/config.py @@ -53,6 +53,17 @@ class ProxyPoolConfig: report_url: str = "" +@dataclass +class TorConfig: + """Tor control port configuration.""" + + control_host: str = "127.0.0.1" + control_port: int = 9051 + password: str = "" + cookie_file: str = "" + newnym_interval: float = 0.0 # 0 = manual only + + @dataclass class Config: """Server configuration.""" @@ -69,6 +80,7 @@ class Config: api_host: str = "" api_port: int = 0 proxy_pool: ProxyPoolConfig | None = None + tor: TorConfig | None = None config_file: str = "" @@ -218,4 +230,14 @@ def load_config(path: str | Path) -> Config: refresh=refresh, ) + if "tor" in raw: + tor_raw = raw["tor"] + config.tor = TorConfig( + control_host=tor_raw.get("control_host", "127.0.0.1"), + control_port=int(tor_raw.get("control_port", 9051)), + password=tor_raw.get("password", ""), + cookie_file=tor_raw.get("cookie_file", ""), + newnym_interval=float(tor_raw.get("newnym_interval", 0)), + ) + return config diff --git a/src/s5p/tor.py b/src/s5p/tor.py new file mode 100644 index 0000000..b0bb473 --- /dev/null +++ b/src/s5p/tor.py @@ -0,0 +1,204 @@ +"""Tor control port client with NEWNYM support.""" + +from __future__ import annotations + +import asyncio +import logging +import time + +logger = logging.getLogger("s5p") + +_NEWNYM_MIN_INTERVAL = 10.0 # Tor enforces 10s between NEWNYMs + + +class TorController: + """Async client for the Tor control protocol. + + Supports password, cookie, and bare authentication. Provides NEWNYM + signaling (new circuit) on demand or on a periodic timer. 
+ """ + + def __init__( + self, + host: str = "127.0.0.1", + port: int = 9051, + password: str = "", + cookie_file: str = "", + newnym_interval: float = 0.0, + ) -> None: + self._host = host + self._port = port + self._password = password + self._cookie_file = cookie_file + self._newnym_interval = newnym_interval + self._reader: asyncio.StreamReader | None = None + self._writer: asyncio.StreamWriter | None = None + self._last_newnym: float = 0.0 + self._stop = asyncio.Event() + self._tasks: list[asyncio.Task] = [] + self._lock = asyncio.Lock() + + # -- properties ---------------------------------------------------------- + + @property + def connected(self) -> bool: + """True if the control connection is open.""" + return self._writer is not None and not self._writer.is_closing() + + @property + def last_newnym(self) -> float: + """Monotonic timestamp of the last successful NEWNYM (0 if never).""" + return self._last_newnym + + @property + def newnym_interval(self) -> float: + """Periodic NEWNYM interval in seconds (0 = manual only).""" + return self._newnym_interval + + # -- lifecycle ----------------------------------------------------------- + + async def start(self) -> None: + """Connect, authenticate, and start optional newnym loop.""" + await self._connect() + if self._newnym_interval > 0: + self._tasks.append(asyncio.create_task(self._newnym_loop())) + + async def stop(self) -> None: + """Cancel tasks and close the connection.""" + self._stop.set() + for task in self._tasks: + task.cancel() + for task in self._tasks: + try: + await task + except asyncio.CancelledError: + pass + self._tasks.clear() + self._close() + + # -- public commands ----------------------------------------------------- + + async def newnym(self) -> bool: + """Send SIGNAL NEWNYM with client-side 10s rate limit. + + Reconnects automatically if the connection was lost. + Returns True on success, False on rate-limit or failure. 
+ """ + now = time.monotonic() + if self._last_newnym and (now - self._last_newnym) < _NEWNYM_MIN_INTERVAL: + return False + + async with self._lock: + try: + if not self.connected: + await self._connect() + code, _ = await self._command("SIGNAL NEWNYM") + if code == 250: + self._last_newnym = time.monotonic() + logger.debug("tor: NEWNYM sent") + return True + logger.warning("tor: NEWNYM failed: %d", code) + return False + except (ConnectionError, OSError, TimeoutError) as e: + logger.warning("tor: NEWNYM error: %s", e) + self._close() + return False + + async def get_info(self, keyword: str) -> str | None: + """Send GETINFO and return the response value, or None on error.""" + async with self._lock: + try: + if not self.connected: + await self._connect() + code, lines = await self._command(f"GETINFO {keyword}") + if code == 250 and lines: + # response format: "keyword=value" + for line in lines: + if "=" in line: + return line.split("=", 1)[1] + return None + except (ConnectionError, OSError, TimeoutError): + self._close() + return None + + # -- internals ----------------------------------------------------------- + + async def _connect(self) -> None: + """Open TCP connection and authenticate.""" + self._close() + self._reader, self._writer = await asyncio.wait_for( + asyncio.open_connection(self._host, self._port), + timeout=10.0, + ) + await self._authenticate() + logger.info("tor: connected to %s:%d", self._host, self._port) + + async def _authenticate(self) -> None: + """Send AUTHENTICATE with configured credentials.""" + if self._cookie_file: + try: + with open(self._cookie_file, "rb") as f: + cookie = f.read().hex() + cmd = f"AUTHENTICATE {cookie}" + except OSError as e: + self._close() + raise ConnectionError(f"cannot read cookie file: {e}") from e + elif self._password: + cmd = f'AUTHENTICATE "{self._password}"' + else: + cmd = "AUTHENTICATE" + + code, _ = await self._command(cmd) + if code != 250: + self._close() + raise ConnectionError(f"tor auth failed: 
{code}") + + async def _command(self, cmd: str) -> tuple[int, list[str]]: + """Send a command and read the multi-line response. + + Returns (status_code, [response_lines]). + """ + if not self._writer or not self._reader: + raise ConnectionError("not connected") + + self._writer.write(f"{cmd}\r\n".encode()) + await self._writer.drain() + + lines: list[str] = [] + while True: + raw = await asyncio.wait_for(self._reader.readline(), timeout=10.0) + if not raw: + raise ConnectionError("connection closed") + line = raw.decode("ascii", errors="replace").rstrip("\r\n") + if len(line) < 4: + raise ConnectionError(f"malformed response: {line!r}") + code = int(line[:3]) + sep = line[3] + text = line[4:] + lines.append(text) + if sep == " ": + return code, lines + # sep == '-' means continuation + + def _close(self) -> None: + """Close TCP connection silently.""" + if self._writer: + try: + self._writer.close() + except OSError: + pass + self._writer = None + self._reader = None + + async def _newnym_loop(self) -> None: + """Periodic NEWNYM on configured interval.""" + while not self._stop.is_set(): + try: + await asyncio.wait_for( + self._stop.wait(), + timeout=self._newnym_interval, + ) + except TimeoutError: + pass + if not self._stop.is_set(): + await self.newnym() -- 2.49.1 From ff217be9c82a88a84e537b1c6afe21b176735c9b Mon Sep 17 00:00:00 2001 From: user Date: Mon, 16 Feb 2026 20:07:18 +0100 Subject: [PATCH 6/9] feat: wire Tor controller into server and API Start/stop TorController in serve() lifecycle when tor: config is present. Adds GET /tor (status) and POST /tor/newnym (signal) endpoints to the control API. Logs control address at startup. Adds tor: section and api_listen to example config. 
Co-Authored-By: Claude Opus 4.6 --- config/example.yaml | 10 ++++++++++ src/s5p/api.py | 31 +++++++++++++++++++++++++++++++ src/s5p/server.py | 24 ++++++++++++++++++++++++ 3 files changed, 65 insertions(+) diff --git a/config/example.yaml b/config/example.yaml index 88cccf3..b440c15 100644 --- a/config/example.yaml +++ b/config/example.yaml @@ -8,6 +8,7 @@ log_level: info # max_connections: 256 # max concurrent client connections (backpressure) # pool_size: 0 # pre-warmed TCP connections to first hop (0 = disabled) # pool_max_idle: 30 # max idle time (seconds) for pooled connections +# api_listen: 127.0.0.1:1081 # control API (disabled by default) # Proxy chain -- connections tunnel through each hop in order. # Supported protocols: socks5://, socks4://, http:// @@ -37,6 +38,15 @@ chain: # state_file: "" # empty = ~/.cache/s5p/pool.json # report_url: "" # POST dead proxies here (optional) +# Tor control port -- enables NEWNYM signaling (new circuit on demand). +# Requires Tor's ControlPort enabled (torrc: ControlPort 9051). 
+# tor: +# control_host: 127.0.0.1 +# control_port: 9051 +# password: "" # HashedControlPassword in torrc +# cookie_file: "" # CookieAuthentication file path +# newnym_interval: 0 # periodic NEWNYM (seconds, 0 = manual only) + # Legacy proxy source (still supported, auto-converts to proxy_pool): # proxy_source: # url: http://10.200.1.250:8081/proxies diff --git a/src/s5p/api.py b/src/s5p/api.py index fb0d40e..b8b6c2d 100644 --- a/src/s5p/api.py +++ b/src/s5p/api.py @@ -167,6 +167,31 @@ async def _handle_pool_refresh(ctx: dict) -> tuple[int, dict]: return 200, {"ok": True} +def _handle_tor(ctx: dict) -> tuple[int, dict]: + """GET /tor -- Tor controller status.""" + tor = ctx.get("tor") + if not tor: + return 200, {"enabled": False} + last = tor.last_newnym + return 200, { + "enabled": True, + "connected": tor.connected, + "last_newnym": round(time.monotonic() - last, 1) if last else None, + "newnym_interval": tor.newnym_interval, + } + + +async def _handle_tor_newnym(ctx: dict) -> tuple[int, dict]: + """POST /tor/newnym -- trigger NEWNYM signal.""" + tor = ctx.get("tor") + if not tor: + return 400, {"error": "tor control not configured"} + ok = await tor.newnym() + if ok: + return 200, {"ok": True} + return 200, {"ok": False, "reason": "rate-limited or not connected"} + + # -- routing ----------------------------------------------------------------- _GET_ROUTES: dict[str, str] = { @@ -175,12 +200,14 @@ _GET_ROUTES: dict[str, str] = { "/pool": "pool", "/pool/alive": "pool_alive", "/config": "config", + "/tor": "tor", } _POST_ROUTES: dict[str, str] = { "/reload": "reload", "/pool/test": "pool_test", "/pool/refresh": "pool_refresh", + "/tor/newnym": "tor_newnym", } @@ -198,6 +225,8 @@ async def _route(method: str, path: str, ctx: dict) -> tuple[int, dict]: return _handle_pool(ctx, alive_only=True) if name == "config": return _handle_config(ctx) + if name == "tor": + return _handle_tor(ctx) if method == "POST" and path in _POST_ROUTES: name = _POST_ROUTES[path] @@ 
-207,6 +236,8 @@ async def _route(method: str, path: str, ctx: dict) -> tuple[int, dict]: return await _handle_pool_test(ctx) if name == "pool_refresh": return await _handle_pool_refresh(ctx) + if name == "tor_newnym": + return await _handle_tor_newnym(ctx) # wrong method on a known path if path in _GET_ROUTES or path in _POST_ROUTES: diff --git a/src/s5p/server.py b/src/s5p/server.py index 9eb98cd..af56c6e 100644 --- a/src/s5p/server.py +++ b/src/s5p/server.py @@ -14,6 +14,7 @@ from .connpool import FirstHopPool from .metrics import Metrics from .pool import ProxyPool from .proto import ProtoError, Socks5Reply, build_chain, read_socks5_address +from .tor import TorController logger = logging.getLogger("s5p") @@ -236,6 +237,22 @@ async def serve(config: Config) -> None: ) await hop_pool.start() + tor: TorController | None = None + if config.tor: + tc = config.tor + tor = TorController( + host=tc.control_host, + port=tc.control_port, + password=tc.password, + cookie_file=tc.cookie_file, + newnym_interval=tc.newnym_interval, + ) + try: + await tor.start() + except (ConnectionError, OSError, TimeoutError) as e: + logger.warning("tor: control port unavailable: %s", e) + tor = None + sem = asyncio.Semaphore(config.max_connections) async def on_client(r: asyncio.StreamReader, w: asyncio.StreamWriter) -> None: @@ -260,6 +277,10 @@ async def serve(config: Config) -> None: ) logger.info(" retries: %d", config.retries) + if tor: + interval = f", newnym every {tor.newnym_interval:.0f}s" if tor.newnym_interval else "" + logger.info(" tor: control %s:%d%s", config.tor.control_host, config.tor.control_port, interval) + # -- control API --------------------------------------------------------- api_srv: asyncio.Server | None = None if config.api_port: @@ -268,6 +289,7 @@ async def serve(config: Config) -> None: "metrics": metrics, "pool": proxy_pool, "hop_pool": hop_pool, + "tor": tor, } # SIGHUP: hot-reload config (timeout, retries, log_level, pool settings) @@ -314,6 +336,8 @@ 
async def serve(config: Config) -> None: if api_srv: api_srv.close() await api_srv.wait_closed() + if tor: + await tor.stop() if hop_pool: await hop_pool.stop() if proxy_pool: -- 2.49.1 From f0281c40699a5fbcbfc1d166acf7cdb5ebac1d51 Mon Sep 17 00:00:00 2001 From: user Date: Mon, 16 Feb 2026 20:09:05 +0100 Subject: [PATCH 7/9] test: Tor controller and API endpoint tests Covers: password/cookie/bare auth, auth failure, connect failure, NEWNYM success/rate-limiting/reconnect, GETINFO multi-line parsing, start/stop lifecycle, GET /tor status, POST /tor/newnym dispatch, and TorConfig YAML parsing. Co-Authored-By: Claude Opus 4.6 --- src/s5p/server.py | 6 +- tests/test_api.py | 83 ++++++++++++++ tests/test_config.py | 34 ++++++ tests/test_tor.py | 258 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 379 insertions(+), 2 deletions(-) create mode 100644 tests/test_tor.py diff --git a/src/s5p/server.py b/src/s5p/server.py index af56c6e..4a02e58 100644 --- a/src/s5p/server.py +++ b/src/s5p/server.py @@ -278,8 +278,10 @@ async def serve(config: Config) -> None: logger.info(" retries: %d", config.retries) if tor: - interval = f", newnym every {tor.newnym_interval:.0f}s" if tor.newnym_interval else "" - logger.info(" tor: control %s:%d%s", config.tor.control_host, config.tor.control_port, interval) + extra = f", newnym every {tor.newnym_interval:.0f}s" if tor.newnym_interval else "" + logger.info( + " tor: control %s:%d%s", config.tor.control_host, config.tor.control_port, extra, + ) # -- control API --------------------------------------------------------- api_srv: asyncio.Server | None = None diff --git a/tests/test_api.py b/tests/test_api.py index bab6683..fc94780 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -9,6 +9,8 @@ from s5p.api import ( _handle_metrics, _handle_pool, _handle_status, + _handle_tor, + _handle_tor_newnym, _json_response, _parse_request, _route, @@ -80,6 +82,7 @@ class TestJsonResponse: def _make_ctx( config: Config | None = None, 
pool: MagicMock | None = None, + tor: MagicMock | None = None, ) -> dict: """Build a mock context dict.""" return { @@ -87,6 +90,7 @@ def _make_ctx( "metrics": Metrics(), "pool": pool, "hop_pool": None, + "tor": tor, } @@ -284,3 +288,82 @@ class TestRouting: status, body = asyncio.run(_route("GET", "/reload", ctx)) assert status == 405 assert "POST" in body["error"] + + def test_get_tor(self): + ctx = _make_ctx() + status, body = asyncio.run(_route("GET", "/tor", ctx)) + assert status == 200 + assert body == {"enabled": False} + + def test_post_tor_newnym(self): + tor = MagicMock() + tor.newnym = AsyncMock(return_value=True) + ctx = _make_ctx(tor=tor) + status, body = asyncio.run(_route("POST", "/tor/newnym", ctx)) + assert status == 200 + assert body == {"ok": True} + + +# -- Tor endpoints ---------------------------------------------------------- + + +class TestHandleTor: + """Test GET /tor handler.""" + + def test_disabled(self): + ctx = _make_ctx() + status, body = _handle_tor(ctx) + assert status == 200 + assert body == {"enabled": False} + + def test_connected(self): + tor = MagicMock() + tor.connected = True + tor.last_newnym = 0.0 + tor.newnym_interval = 60.0 + ctx = _make_ctx(tor=tor) + status, body = _handle_tor(ctx) + assert status == 200 + assert body["enabled"] is True + assert body["connected"] is True + assert body["last_newnym"] is None + assert body["newnym_interval"] == 60.0 + + def test_with_last_newnym(self): + import time + tor = MagicMock() + tor.connected = True + tor.last_newnym = time.monotonic() - 42.0 + tor.newnym_interval = 0.0 + ctx = _make_ctx(tor=tor) + status, body = _handle_tor(ctx) + assert status == 200 + assert body["last_newnym"] is not None + assert body["last_newnym"] >= 42.0 + + +class TestHandleTorNewnym: + """Test POST /tor/newnym handler.""" + + def test_success(self): + tor = MagicMock() + tor.newnym = AsyncMock(return_value=True) + ctx = _make_ctx(tor=tor) + status, body = asyncio.run(_handle_tor_newnym(ctx)) + assert 
status == 200 + assert body == {"ok": True} + + def test_rate_limited(self): + tor = MagicMock() + tor.newnym = AsyncMock(return_value=False) + ctx = _make_ctx(tor=tor) + status, body = asyncio.run(_handle_tor_newnym(ctx)) + assert status == 200 + assert body["ok"] is False + assert "reason" in body + + def test_not_configured(self): + ctx = _make_ctx() + status, body = asyncio.run(_handle_tor_newnym(ctx)) + assert status == 400 + assert "error" in body diff --git a/tests/test_config.py b/tests/test_config.py index b3c82b4..89256f7 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -141,3 +141,37 @@ class TestConfig: c = load_config(cfg_file) assert c.pool_size == 16 assert c.pool_max_idle == 45.0 + + def test_tor_config_from_yaml(self, tmp_path): + cfg_file = tmp_path / "test.yaml" + cfg_file.write_text( + "tor:\n" + " control_host: 10.0.0.1\n" + " control_port: 9151\n" + " password: secret\n" + " cookie_file: /var/run/tor/cookie\n" + " newnym_interval: 60\n" + ) + c = load_config(cfg_file) + assert c.tor is not None + assert c.tor.control_host == "10.0.0.1" + assert c.tor.control_port == 9151 + assert c.tor.password == "secret" + assert c.tor.cookie_file == "/var/run/tor/cookie" + assert c.tor.newnym_interval == 60.0 + + def test_tor_config_defaults(self, tmp_path): + cfg_file = tmp_path / "test.yaml" + cfg_file.write_text("tor:\n password: test\n") + c = load_config(cfg_file) + assert c.tor is not None + assert c.tor.control_host == "127.0.0.1" + assert c.tor.control_port == 9051 + assert c.tor.cookie_file == "" + assert c.tor.newnym_interval == 0.0 + + def test_no_tor_config(self, tmp_path): + cfg_file = tmp_path / "test.yaml" + cfg_file.write_text("listen: 1080\n") + c = load_config(cfg_file) + assert c.tor is None diff --git a/tests/test_tor.py b/tests/test_tor.py new file mode 100644 index 0000000..b2200b6 --- /dev/null +++ b/tests/test_tor.py @@ -0,0 +1,258 @@ +"""Tests for the Tor control port client.""" + +import asyncio +from unittest.mock 
import AsyncMock, MagicMock, mock_open, patch + +import pytest + +from s5p.tor import TorController + +# -- helpers ----------------------------------------------------------------- + + +def _mock_reader(responses: list[bytes]) -> AsyncMock: + """Create a mock StreamReader that yields canned lines.""" + reader = AsyncMock(spec=asyncio.StreamReader) + reader.readline = AsyncMock(side_effect=responses) + return reader + + +def _mock_writer() -> MagicMock: + """Create a mock StreamWriter.""" + writer = MagicMock(spec=asyncio.StreamWriter) + writer.write = MagicMock() + writer.drain = AsyncMock() + writer.close = MagicMock() + writer.wait_closed = AsyncMock() + writer.is_closing = MagicMock(return_value=False) + return writer + + +# -- authentication ---------------------------------------------------------- + + +class TestAuthentication: + """Test Tor control port authentication modes.""" + + def test_password_auth(self): + reader = _mock_reader([b"250 OK\r\n"]) + writer = _mock_writer() + + async def run(): + with patch("asyncio.open_connection", return_value=(reader, writer)): + tc = TorController(password="secret") + await tc._connect() + # verify AUTHENTICATE command was sent with password + calls = writer.write.call_args_list + assert any(b'AUTHENTICATE "secret"' in c[0][0] for c in calls) + tc._close() + + asyncio.run(run()) + + def test_cookie_auth(self): + reader = _mock_reader([b"250 OK\r\n"]) + writer = _mock_writer() + cookie_data = b"\xde\xad\xbe\xef" + + async def run(): + with patch("asyncio.open_connection", return_value=(reader, writer)): + with patch("builtins.open", mock_open(read_data=cookie_data)): + tc = TorController(cookie_file="/var/run/tor/control.authcookie") + await tc._connect() + calls = writer.write.call_args_list + assert any(b"AUTHENTICATE deadbeef" in c[0][0] for c in calls) + tc._close() + + asyncio.run(run()) + + def test_bare_auth(self): + reader = _mock_reader([b"250 OK\r\n"]) + writer = _mock_writer() + + async def run(): + with 
patch("asyncio.open_connection", return_value=(reader, writer)): + tc = TorController() + await tc._connect() + calls = writer.write.call_args_list + assert any(c[0][0] == b"AUTHENTICATE\r\n" for c in calls) + tc._close() + + asyncio.run(run()) + + def test_auth_failure(self): + reader = _mock_reader([b"515 Bad authentication\r\n"]) + writer = _mock_writer() + + async def run(): + with patch("asyncio.open_connection", return_value=(reader, writer)): + tc = TorController(password="wrong") + with pytest.raises(ConnectionError, match="auth failed"): + await tc._connect() + + asyncio.run(run()) + + def test_connect_failure(self): + async def run(): + with patch("asyncio.open_connection", side_effect=OSError("refused")): + tc = TorController() + with pytest.raises(OSError, match="refused"): + await tc._connect() + + asyncio.run(run()) + + +# -- NEWNYM ------------------------------------------------------------------ + + +class TestNewnym: + """Test NEWNYM signaling.""" + + def test_newnym_success(self): + # auth response + newnym response + reader = _mock_reader([b"250 OK\r\n", b"250 OK\r\n"]) + writer = _mock_writer() + + async def run(): + with patch("asyncio.open_connection", return_value=(reader, writer)): + tc = TorController() + await tc._connect() + ok = await tc.newnym() + assert ok is True + assert tc.last_newnym > 0 + tc._close() + + asyncio.run(run()) + + def test_newnym_rate_limited(self): + # auth + first newnym + reader = _mock_reader([b"250 OK\r\n", b"250 OK\r\n"]) + writer = _mock_writer() + + async def run(): + with patch("asyncio.open_connection", return_value=(reader, writer)): + tc = TorController() + await tc._connect() + ok = await tc.newnym() + assert ok is True + # immediate second call should be rate-limited + ok2 = await tc.newnym() + assert ok2 is False + tc._close() + + asyncio.run(run()) + + def test_newnym_reconnects_on_disconnect(self): + # first connect auth, then reconnect auth + newnym + reader1 = _mock_reader([b"250 OK\r\n"]) + 
writer1 = _mock_writer() + reader2 = _mock_reader([b"250 OK\r\n", b"250 OK\r\n"]) + writer2 = _mock_writer() + + call_count = 0 + + async def fake_connect(*args, **kwargs): + nonlocal call_count + call_count += 1 + if call_count == 1: + return reader1, writer1 + return reader2, writer2 + + async def run(): + with patch("asyncio.open_connection", side_effect=fake_connect): + tc = TorController() + await tc._connect() + # simulate disconnect + tc._close() + assert not tc.connected + # newnym should reconnect + ok = await tc.newnym() + assert ok is True + tc._close() + + asyncio.run(run()) + + +# -- GETINFO ----------------------------------------------------------------- + + +class TestGetInfo: + """Test GETINFO command.""" + + def test_getinfo_version(self): + # auth + getinfo multi-line response + reader = _mock_reader([ + b"250 OK\r\n", + b"250-version=0.4.8.12\r\n", + b"250 OK\r\n", + ]) + writer = _mock_writer() + + async def run(): + with patch("asyncio.open_connection", return_value=(reader, writer)): + tc = TorController() + await tc._connect() + version = await tc.get_info("version") + assert version == "0.4.8.12" + tc._close() + + asyncio.run(run()) + + def test_getinfo_not_connected(self): + # auth for reconnect + getinfo + reader = _mock_reader([ + b"250 OK\r\n", + b"250-traffic/read=12345\r\n", + b"250 OK\r\n", + ]) + writer = _mock_writer() + + async def run(): + with patch("asyncio.open_connection", return_value=(reader, writer)): + tc = TorController() + # not connected, should auto-connect + result = await tc.get_info("traffic/read") + assert result == "12345" + tc._close() + + asyncio.run(run()) + + +# -- lifecycle --------------------------------------------------------------- + + +class TestLifecycle: + """Test start/stop lifecycle.""" + + def test_start_stop(self): + reader = _mock_reader([b"250 OK\r\n"]) + writer = _mock_writer() + + async def run(): + with patch("asyncio.open_connection", return_value=(reader, writer)): + tc = TorController() + 
await tc.start() + assert tc.connected + await tc.stop() + assert not tc.connected + + asyncio.run(run()) + + def test_start_with_newnym_loop(self): + reader = _mock_reader([b"250 OK\r\n"]) + writer = _mock_writer() + + async def run(): + with patch("asyncio.open_connection", return_value=(reader, writer)): + tc = TorController(newnym_interval=60.0) + await tc.start() + assert len(tc._tasks) == 1 + await tc.stop() + assert len(tc._tasks) == 0 + + asyncio.run(run()) + + def test_properties(self): + tc = TorController(newnym_interval=30.0) + assert not tc.connected + assert tc.last_newnym == 0.0 + assert tc.newnym_interval == 30.0 -- 2.49.1 From d2df32fdab908f9569c09da60739cd42fca9d326 Mon Sep 17 00:00:00 2001 From: user Date: Mon, 16 Feb 2026 20:09:51 +0100 Subject: [PATCH 8/9] docs: document Tor control port integration Adds Tor control port section to USAGE.md covering config, auth modes, rate limiting, and API endpoints. Updates README feature line and config example, CHEATSHEET with tor snippets, and marks the feature complete in TASKS.md and ROADMAP.md. Co-Authored-By: Claude Opus 4.6 --- README.md | 7 +++++- ROADMAP.md | 2 +- TASKS.md | 3 ++- docs/CHEATSHEET.md | 14 +++++++++++ docs/USAGE.md | 63 ++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 86 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index d5ecbe0..58c0193 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ through configurable chains of SOCKS4, SOCKS5, and HTTP CONNECT proxies. 
- Supported hop protocols: SOCKS5, SOCKS4/4a, HTTP CONNECT - Per-hop authentication (username/password) - DNS leak prevention (domain names forwarded to proxies, never resolved locally) -- Tor integration (Tor is just another SOCKS5 hop) +- Tor integration (SOCKS5 hop + control port NEWNYM for circuit rotation) - Managed proxy pool: multiple sources (API + file), health-tested, weighted selection - Per-proxy failure backoff (60s cooldown), stale proxy expiry, chain pre-flight - Fast warm start (seconds on restart vs minutes on cold start) @@ -76,6 +76,11 @@ api_listen: 127.0.0.1:1081 # control API (disabled by default) chain: - socks5://127.0.0.1:9050 # Tor +tor: + control_port: 9051 # Tor control port (NEWNYM) + password: "" # or cookie_file for auth + newnym_interval: 0 # periodic circuit rotation (0 = manual) + proxy_pool: sources: - url: http://10.200.1.250:8081/proxies diff --git a/ROADMAP.md b/ROADMAP.md index ac5f329..6ec6df6 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -24,7 +24,7 @@ - [x] Built-in control API (runtime metrics, pool state, config reload) - [ ] SOCKS5 server authentication (username/password) -- [ ] Tor control port integration (circuit renewal via NEWNYM) +- [x] Tor control port integration (circuit renewal via NEWNYM) - [ ] Metrics (connections/sec, bytes relayed, hop latency) ## v0.3.0 diff --git a/TASKS.md b/TASKS.md index 3bb79c2..a45ec1f 100644 --- a/TASKS.md +++ b/TASKS.md @@ -44,7 +44,8 @@ - [x] Use k8s-file logging driver with rotation - [x] Built-in control API (`api.py`, `--api`, `api_listen`) +- [x] Tor control port integration (NEWNYM signaling, periodic rotation) + ## Next - [ ] Integration tests with mock proxy server - [ ] SOCKS5 server-side authentication -- [ ] Tor control port integration diff --git a/docs/CHEATSHEET.md b/docs/CHEATSHEET.md index bff5a6d..c338fcc 100644 --- a/docs/CHEATSHEET.md +++ b/docs/CHEATSHEET.md @@ -59,6 +59,20 @@ proxy_pool: report_url: "" # POST dead proxies (optional) ``` +## Tor Control Port 
(config) + +```yaml +tor: + control_port: 9051 + password: "" # or cookie_file: /path/to/cookie + newnym_interval: 60 # auto-rotate every 60s (0 = manual) +``` + +```bash +curl -s http://127.0.0.1:1081/tor | jq . # status +curl -s -X POST http://127.0.0.1:1081/tor/newnym | jq . # new circuit +``` + ## Hot Reload ```bash diff --git a/docs/USAGE.md b/docs/USAGE.md index 3a05155..eea824e 100644 --- a/docs/USAGE.md +++ b/docs/USAGE.md @@ -281,6 +281,69 @@ curl -s -X POST http://127.0.0.1:1081/pool/refresh | jq . Settings that require a restart: `listen`, `chain`, `pool_size`, `pool_max_idle`, `api_listen`. +## Tor Control Port + +Optional integration with Tor's control protocol for circuit management. +When enabled, s5p connects to Tor's control port and can send NEWNYM signals +to request new circuits (new exit node) on demand or on a timer. + +### Configuration + +```yaml +tor: + control_host: 127.0.0.1 # Tor control address + control_port: 9051 # Tor control port + password: "" # HashedControlPassword (torrc) + cookie_file: "" # CookieAuthentication file path + newnym_interval: 0 # periodic NEWNYM in seconds (0 = manual only) +``` + +Requires Tor's `ControlPort` enabled in `torrc`: + +``` +ControlPort 9051 +HashedControlPassword 16:... # or CookieAuthentication 1 +``` + +### Authentication modes + +| Mode | Config | torrc | +|------|--------|-------| +| Password | `password: "secret"` | `HashedControlPassword 16:...` | +| Cookie | `cookie_file: /var/run/tor/control.authcookie` | `CookieAuthentication 1` | +| None | (leave both empty) | No auth configured | + +### Rate limiting + +Tor enforces a minimum 10-second interval between NEWNYM signals. s5p +applies the same client-side rate limit to avoid unnecessary rejections. 
+ +### API endpoints + +| Method | Path | Description | +|--------|------|-------------| +| `GET` | `/tor` | Controller status (enabled, connected, last NEWNYM) | +| `POST` | `/tor/newnym` | Trigger NEWNYM signal (new circuit) | + +```bash +# Check tor controller status +curl -s http://127.0.0.1:1081/tor | jq . + +# Request new circuit +curl -s -X POST http://127.0.0.1:1081/tor/newnym | jq . +``` + +### Periodic NEWNYM + +Set `newnym_interval` to automatically rotate circuits: + +```yaml +tor: + newnym_interval: 60 # new circuit every 60 seconds +``` + +Values below 10 are clamped to Tor's minimum interval. + ## Connection Retry When a proxy pool is active, s5p retries failed connections with a different -- 2.49.1 From 6c84a144c047e389e4e7fbb4518f985d9451a2c4 Mon Sep 17 00:00:00 2001 From: user Date: Tue, 17 Feb 2026 10:43:47 +0100 Subject: [PATCH 9/9] feat: add --tracemalloc flag for memory profiling Uses Python's built-in tracemalloc module to show top N memory allocators on exit. Orthogonal to --cprofile; both can run together. 
Co-Authored-By: Claude Opus 4.6 --- README.md | 1 + docs/CHEATSHEET.md | 2 ++ docs/USAGE.md | 9 +++++++++ src/s5p/cli.py | 21 +++++++++++++++++++-- 4 files changed, 31 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 58c0193..daddcfc 100644 --- a/README.md +++ b/README.md @@ -110,6 +110,7 @@ Options: -v, --verbose Debug logging -q, --quiet Errors only --cprofile [FILE] Enable cProfile, dump to FILE (default: s5p.prof) + --tracemalloc [N] Enable tracemalloc, show top N allocators on exit (default: 10) -V, --version Show version ``` diff --git a/docs/CHEATSHEET.md b/docs/CHEATSHEET.md index c338fcc..e6db104 100644 --- a/docs/CHEATSHEET.md +++ b/docs/CHEATSHEET.md @@ -17,6 +17,8 @@ s5p -m 512 # max concurrent connections s5p --api 127.0.0.1:1081 # enable control API s5p --cprofile # profile to s5p.prof s5p --cprofile out.prof # profile to custom file +s5p --tracemalloc # memory profile (top 10) +s5p --tracemalloc 20 # memory profile (top 20) ``` ## Container diff --git a/docs/USAGE.md b/docs/USAGE.md index eea824e..e0e0356 100644 --- a/docs/USAGE.md +++ b/docs/USAGE.md @@ -419,6 +419,15 @@ s5p --cprofile output.prof -c config/s5p.yaml # Analyze after stopping python -m pstats s5p.prof + +# Memory profiling with tracemalloc (top 10 allocators on exit) +s5p --tracemalloc -c config/s5p.yaml + +# Show top 20 allocators +s5p --tracemalloc 20 -c config/s5p.yaml + +# Both profilers simultaneously +s5p --cprofile --tracemalloc -c config/s5p.yaml ``` ## Testing the Proxy diff --git a/src/s5p/cli.py b/src/s5p/cli.py index 718d705..6991aed 100644 --- a/src/s5p/cli.py +++ b/src/s5p/cli.py @@ -62,6 +62,10 @@ def _parse_args(argv: list[str] | None = None) -> argparse.Namespace: "--cprofile", metavar="FILE", nargs="?", const="s5p.prof", help="enable cProfile, dump stats to FILE (default: s5p.prof)", ) + p.add_argument( + "--tracemalloc", metavar="N", nargs="?", const=10, type=int, + help="enable tracemalloc, show top N allocators on exit (default: 10)", + ) 
return p.parse_args(argv) @@ -112,6 +116,11 @@ def main(argv: list[str] | None = None) -> int: config.log_level = "error" _setup_logging(config.log_level) + logger = logging.getLogger("s5p") + + if args.tracemalloc: + import tracemalloc + tracemalloc.start() try: if args.cprofile: @@ -123,13 +132,21 @@ def main(argv: list[str] | None = None) -> int: finally: prof.disable() prof.dump_stats(args.cprofile) - logging.getLogger("s5p").info("profile saved to %s", args.cprofile) + logger.info("profile saved to %s", args.cprofile) else: asyncio.run(serve(config)) except KeyboardInterrupt: return 0 except Exception as e: - logging.getLogger("s5p").error("%s", e) + logger.error("%s", e) return 1 + finally: + if args.tracemalloc: + import tracemalloc + snapshot = tracemalloc.take_snapshot() + stats = snapshot.statistics("lineno") + logger.info("tracemalloc: top %d allocations", args.tracemalloc) + for stat in stats[:args.tracemalloc]: + logger.info(" %s", stat) return 0 -- 2.49.1