feat: Tor control port client with NEWNYM support
Async TCP client for the Tor control protocol (port 9051). Supports password, cookie, and bare authentication. Provides NEWNYM signaling with client-side 10s rate limiting and optional periodic timer. Auto-reconnects on disconnect. Adds TorConfig dataclass and YAML parsing to config module. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -53,6 +53,17 @@ class ProxyPoolConfig:
|
||||
report_url: str = ""
|
||||
|
||||
|
||||
@dataclass
|
||||
class TorConfig:
|
||||
"""Tor control port configuration."""
|
||||
|
||||
control_host: str = "127.0.0.1"
|
||||
control_port: int = 9051
|
||||
password: str = ""
|
||||
cookie_file: str = ""
|
||||
newnym_interval: float = 0.0 # 0 = manual only
|
||||
|
||||
|
||||
@dataclass
|
||||
class Config:
|
||||
"""Server configuration."""
|
||||
@@ -69,6 +80,7 @@ class Config:
|
||||
api_host: str = ""
|
||||
api_port: int = 0
|
||||
proxy_pool: ProxyPoolConfig | None = None
|
||||
tor: TorConfig | None = None
|
||||
config_file: str = ""
|
||||
|
||||
|
||||
@@ -218,4 +230,14 @@ def load_config(path: str | Path) -> Config:
|
||||
refresh=refresh,
|
||||
)
|
||||
|
||||
if "tor" in raw:
|
||||
tor_raw = raw["tor"]
|
||||
config.tor = TorConfig(
|
||||
control_host=tor_raw.get("control_host", "127.0.0.1"),
|
||||
control_port=int(tor_raw.get("control_port", 9051)),
|
||||
password=tor_raw.get("password", ""),
|
||||
cookie_file=tor_raw.get("cookie_file", ""),
|
||||
newnym_interval=float(tor_raw.get("newnym_interval", 0)),
|
||||
)
|
||||
|
||||
return config
|
||||
|
||||
204
src/s5p/tor.py
Normal file
204
src/s5p/tor.py
Normal file
@@ -0,0 +1,204 @@
|
||||
"""Tor control port client with NEWNYM support."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
|
||||
logger = logging.getLogger("s5p")
|
||||
|
||||
_NEWNYM_MIN_INTERVAL = 10.0 # Tor enforces 10s between NEWNYMs
|
||||
|
||||
|
||||
class TorController:
|
||||
"""Async client for the Tor control protocol.
|
||||
|
||||
Supports password, cookie, and bare authentication. Provides NEWNYM
|
||||
signaling (new circuit) on demand or on a periodic timer.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
host: str = "127.0.0.1",
|
||||
port: int = 9051,
|
||||
password: str = "",
|
||||
cookie_file: str = "",
|
||||
newnym_interval: float = 0.0,
|
||||
) -> None:
|
||||
self._host = host
|
||||
self._port = port
|
||||
self._password = password
|
||||
self._cookie_file = cookie_file
|
||||
self._newnym_interval = newnym_interval
|
||||
self._reader: asyncio.StreamReader | None = None
|
||||
self._writer: asyncio.StreamWriter | None = None
|
||||
self._last_newnym: float = 0.0
|
||||
self._stop = asyncio.Event()
|
||||
self._tasks: list[asyncio.Task] = []
|
||||
self._lock = asyncio.Lock()
|
||||
|
||||
# -- properties ----------------------------------------------------------
|
||||
|
||||
@property
|
||||
def connected(self) -> bool:
|
||||
"""True if the control connection is open."""
|
||||
return self._writer is not None and not self._writer.is_closing()
|
||||
|
||||
@property
|
||||
def last_newnym(self) -> float:
|
||||
"""Monotonic timestamp of the last successful NEWNYM (0 if never)."""
|
||||
return self._last_newnym
|
||||
|
||||
@property
|
||||
def newnym_interval(self) -> float:
|
||||
"""Periodic NEWNYM interval in seconds (0 = manual only)."""
|
||||
return self._newnym_interval
|
||||
|
||||
# -- lifecycle -----------------------------------------------------------
|
||||
|
||||
async def start(self) -> None:
|
||||
"""Connect, authenticate, and start optional newnym loop."""
|
||||
await self._connect()
|
||||
if self._newnym_interval > 0:
|
||||
self._tasks.append(asyncio.create_task(self._newnym_loop()))
|
||||
|
||||
async def stop(self) -> None:
|
||||
"""Cancel tasks and close the connection."""
|
||||
self._stop.set()
|
||||
for task in self._tasks:
|
||||
task.cancel()
|
||||
for task in self._tasks:
|
||||
try:
|
||||
await task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
self._tasks.clear()
|
||||
self._close()
|
||||
|
||||
# -- public commands -----------------------------------------------------
|
||||
|
||||
async def newnym(self) -> bool:
|
||||
"""Send SIGNAL NEWNYM with client-side 10s rate limit.
|
||||
|
||||
Reconnects automatically if the connection was lost.
|
||||
Returns True on success, False on rate-limit or failure.
|
||||
"""
|
||||
now = time.monotonic()
|
||||
if self._last_newnym and (now - self._last_newnym) < _NEWNYM_MIN_INTERVAL:
|
||||
return False
|
||||
|
||||
async with self._lock:
|
||||
try:
|
||||
if not self.connected:
|
||||
await self._connect()
|
||||
code, _ = await self._command("SIGNAL NEWNYM")
|
||||
if code == 250:
|
||||
self._last_newnym = time.monotonic()
|
||||
logger.debug("tor: NEWNYM sent")
|
||||
return True
|
||||
logger.warning("tor: NEWNYM failed: %d", code)
|
||||
return False
|
||||
except (ConnectionError, OSError, TimeoutError) as e:
|
||||
logger.warning("tor: NEWNYM error: %s", e)
|
||||
self._close()
|
||||
return False
|
||||
|
||||
async def get_info(self, keyword: str) -> str | None:
|
||||
"""Send GETINFO and return the response value, or None on error."""
|
||||
async with self._lock:
|
||||
try:
|
||||
if not self.connected:
|
||||
await self._connect()
|
||||
code, lines = await self._command(f"GETINFO {keyword}")
|
||||
if code == 250 and lines:
|
||||
# response format: "keyword=value"
|
||||
for line in lines:
|
||||
if "=" in line:
|
||||
return line.split("=", 1)[1]
|
||||
return None
|
||||
except (ConnectionError, OSError, TimeoutError):
|
||||
self._close()
|
||||
return None
|
||||
|
||||
# -- internals -----------------------------------------------------------
|
||||
|
||||
async def _connect(self) -> None:
|
||||
"""Open TCP connection and authenticate."""
|
||||
self._close()
|
||||
self._reader, self._writer = await asyncio.wait_for(
|
||||
asyncio.open_connection(self._host, self._port),
|
||||
timeout=10.0,
|
||||
)
|
||||
await self._authenticate()
|
||||
logger.info("tor: connected to %s:%d", self._host, self._port)
|
||||
|
||||
async def _authenticate(self) -> None:
|
||||
"""Send AUTHENTICATE with configured credentials."""
|
||||
if self._cookie_file:
|
||||
try:
|
||||
with open(self._cookie_file, "rb") as f:
|
||||
cookie = f.read().hex()
|
||||
cmd = f"AUTHENTICATE {cookie}"
|
||||
except OSError as e:
|
||||
self._close()
|
||||
raise ConnectionError(f"cannot read cookie file: {e}") from e
|
||||
elif self._password:
|
||||
cmd = f'AUTHENTICATE "{self._password}"'
|
||||
else:
|
||||
cmd = "AUTHENTICATE"
|
||||
|
||||
code, _ = await self._command(cmd)
|
||||
if code != 250:
|
||||
self._close()
|
||||
raise ConnectionError(f"tor auth failed: {code}")
|
||||
|
||||
async def _command(self, cmd: str) -> tuple[int, list[str]]:
|
||||
"""Send a command and read the multi-line response.
|
||||
|
||||
Returns (status_code, [response_lines]).
|
||||
"""
|
||||
if not self._writer or not self._reader:
|
||||
raise ConnectionError("not connected")
|
||||
|
||||
self._writer.write(f"{cmd}\r\n".encode())
|
||||
await self._writer.drain()
|
||||
|
||||
lines: list[str] = []
|
||||
while True:
|
||||
raw = await asyncio.wait_for(self._reader.readline(), timeout=10.0)
|
||||
if not raw:
|
||||
raise ConnectionError("connection closed")
|
||||
line = raw.decode("ascii", errors="replace").rstrip("\r\n")
|
||||
if len(line) < 4:
|
||||
raise ConnectionError(f"malformed response: {line!r}")
|
||||
code = int(line[:3])
|
||||
sep = line[3]
|
||||
text = line[4:]
|
||||
lines.append(text)
|
||||
if sep == " ":
|
||||
return code, lines
|
||||
# sep == '-' means continuation
|
||||
|
||||
def _close(self) -> None:
|
||||
"""Close TCP connection silently."""
|
||||
if self._writer:
|
||||
try:
|
||||
self._writer.close()
|
||||
except OSError:
|
||||
pass
|
||||
self._writer = None
|
||||
self._reader = None
|
||||
|
||||
async def _newnym_loop(self) -> None:
|
||||
"""Periodic NEWNYM on configured interval."""
|
||||
while not self._stop.is_set():
|
||||
try:
|
||||
await asyncio.wait_for(
|
||||
self._stop.wait(),
|
||||
timeout=self._newnym_interval,
|
||||
)
|
||||
except TimeoutError:
|
||||
pass
|
||||
if not self._stop.is_set():
|
||||
await self.newnym()
|
||||
Reference in New Issue
Block a user