From b07135ad442c5cd22fb7da03b89eaedd18f94ec2 Mon Sep 17 00:00:00 2001 From: user Date: Mon, 16 Feb 2026 20:06:07 +0100 Subject: [PATCH] feat: Tor control port client with NEWNYM support Async TCP client for the Tor control protocol (port 9051). Supports password, cookie, and bare authentication. Provides NEWNYM signaling with client-side 10s rate limiting and optional periodic timer. Auto-reconnects on disconnect. Adds TorConfig dataclass and YAML parsing to config module. Co-Authored-By: Claude Opus 4.6 --- src/s5p/config.py | 22 +++++ src/s5p/tor.py | 204 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 226 insertions(+) create mode 100644 src/s5p/tor.py diff --git a/src/s5p/config.py b/src/s5p/config.py index 157f865..b6c8f2f 100644 --- a/src/s5p/config.py +++ b/src/s5p/config.py @@ -53,6 +53,17 @@ class ProxyPoolConfig: report_url: str = "" +@dataclass +class TorConfig: + """Tor control port configuration.""" + + control_host: str = "127.0.0.1" + control_port: int = 9051 + password: str = "" + cookie_file: str = "" + newnym_interval: float = 0.0 # 0 = manual only + + @dataclass class Config: """Server configuration.""" @@ -69,6 +80,7 @@ class Config: api_host: str = "" api_port: int = 0 proxy_pool: ProxyPoolConfig | None = None + tor: TorConfig | None = None config_file: str = "" @@ -218,4 +230,14 @@ def load_config(path: str | Path) -> Config: refresh=refresh, ) + if "tor" in raw: + tor_raw = raw["tor"] + config.tor = TorConfig( + control_host=tor_raw.get("control_host", "127.0.0.1"), + control_port=int(tor_raw.get("control_port", 9051)), + password=tor_raw.get("password", ""), + cookie_file=tor_raw.get("cookie_file", ""), + newnym_interval=float(tor_raw.get("newnym_interval", 0)), + ) + return config diff --git a/src/s5p/tor.py b/src/s5p/tor.py new file mode 100644 index 0000000..b0bb473 --- /dev/null +++ b/src/s5p/tor.py @@ -0,0 +1,204 @@ +"""Tor control port client with NEWNYM support.""" + +from __future__ import annotations + +import asyncio +import logging +import time + +logger = logging.getLogger("s5p") + +_NEWNYM_MIN_INTERVAL = 10.0 # Tor enforces 10s between NEWNYMs + + +class TorController: + """Async client for the Tor control protocol. + + Supports password, cookie, and bare authentication. Provides NEWNYM + signaling (new circuit) on demand or on a periodic timer. + """ + + def __init__( + self, + host: str = "127.0.0.1", + port: int = 9051, + password: str = "", + cookie_file: str = "", + newnym_interval: float = 0.0, + ) -> None: + self._host = host + self._port = port + self._password = password + self._cookie_file = cookie_file + self._newnym_interval = newnym_interval + self._reader: asyncio.StreamReader | None = None + self._writer: asyncio.StreamWriter | None = None + self._last_newnym: float = 0.0 + self._stop = asyncio.Event() + self._tasks: list[asyncio.Task] = [] + self._lock = asyncio.Lock() + + # -- properties ---------------------------------------------------------- + + @property + def connected(self) -> bool: + """True if the control connection is open.""" + return self._writer is not None and not self._writer.is_closing() + + @property + def last_newnym(self) -> float: + """Monotonic timestamp of the last successful NEWNYM (0 if never).""" + return self._last_newnym + + @property + def newnym_interval(self) -> float: + """Periodic NEWNYM interval in seconds (0 = manual only).""" + return self._newnym_interval + + # -- lifecycle ----------------------------------------------------------- + + async def start(self) -> None: + """Connect, authenticate, and start optional newnym loop.""" + await self._connect() + if self._newnym_interval > 0: + self._tasks.append(asyncio.create_task(self._newnym_loop())) + + async def stop(self) -> None: + """Cancel tasks and close the connection.""" + self._stop.set() + for task in self._tasks: + task.cancel() + for task in self._tasks: + try: + await task + except asyncio.CancelledError: + pass + self._tasks.clear() + self._close() + + # -- public commands ----------------------------------------------------- + + async def newnym(self) -> bool: + """Send SIGNAL NEWNYM with client-side 10s rate limit. + + Reconnects automatically if the connection was lost. + Returns True on success, False on rate-limit or failure. + """ + now = time.monotonic() + if self._last_newnym and (now - self._last_newnym) < _NEWNYM_MIN_INTERVAL: + return False + + async with self._lock: + try: + if not self.connected: + await self._connect() + code, _ = await self._command("SIGNAL NEWNYM") + if code == 250: + self._last_newnym = time.monotonic() + logger.debug("tor: NEWNYM sent") + return True + logger.warning("tor: NEWNYM failed: %d", code) + return False + except (ConnectionError, OSError, TimeoutError) as e: + logger.warning("tor: NEWNYM error: %s", e) + self._close() + return False + + async def get_info(self, keyword: str) -> str | None: + """Send GETINFO and return the response value, or None on error.""" + async with self._lock: + try: + if not self.connected: + await self._connect() + code, lines = await self._command(f"GETINFO {keyword}") + if code == 250 and lines: + # response format: "keyword=value" + for line in lines: + if "=" in line: + return line.split("=", 1)[1] + return None + except (ConnectionError, OSError, TimeoutError): + self._close() + return None + + # -- internals ----------------------------------------------------------- + + async def _connect(self) -> None: + """Open TCP connection and authenticate.""" + self._close() + self._reader, self._writer = await asyncio.wait_for( + asyncio.open_connection(self._host, self._port), + timeout=10.0, + ) + await self._authenticate() + logger.info("tor: connected to %s:%d", self._host, self._port) + + async def _authenticate(self) -> None: + """Send AUTHENTICATE with configured credentials.""" + if self._cookie_file: + try: + with open(self._cookie_file, "rb") as f: + cookie = f.read().hex() + cmd = f"AUTHENTICATE {cookie}" + except OSError as e: + self._close() + raise ConnectionError(f"cannot read cookie file: {e}") from e + elif self._password: + cmd = f'AUTHENTICATE "{self._password}"' + else: + cmd = "AUTHENTICATE" + + code, _ = await self._command(cmd) + if code != 250: + self._close() + raise ConnectionError(f"tor auth failed: {code}") + + async def _command(self, cmd: str) -> tuple[int, list[str]]: + """Send a command and read the multi-line response. + + Returns (status_code, [response_lines]). + """ + if not self._writer or not self._reader: + raise ConnectionError("not connected") + + self._writer.write(f"{cmd}\r\n".encode()) + await self._writer.drain() + + lines: list[str] = [] + while True: + raw = await asyncio.wait_for(self._reader.readline(), timeout=10.0) + if not raw: + raise ConnectionError("connection closed") + line = raw.decode("ascii", errors="replace").rstrip("\r\n") + if len(line) < 4: + raise ConnectionError(f"malformed response: {line!r}") + code = int(line[:3]) + sep = line[3] + text = line[4:] + lines.append(text) + if sep == " ": + return code, lines + # sep == '-' means continuation + + def _close(self) -> None: + """Close TCP connection silently.""" + if self._writer: + try: + self._writer.close() + except OSError: + pass + self._writer = None + self._reader = None + + async def _newnym_loop(self) -> None: + """Periodic NEWNYM on configured interval.""" + while not self._stop.is_set(): + try: + await asyncio.wait_for( + self._stop.wait(), + timeout=self._newnym_interval, + ) + except TimeoutError: + pass + if not self._stop.is_set(): + await self.newnym()