test: Tor controller and API endpoint tests
Covers: password/cookie/bare auth, auth failure, connect failure, NEWNYM success/rate-limiting/reconnect, GETINFO multi-line parsing, start/stop lifecycle, GET /tor status, POST /tor/newnym dispatch, and TorConfig YAML parsing. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -9,6 +9,8 @@ from s5p.api import (
|
||||
_handle_metrics,
|
||||
_handle_pool,
|
||||
_handle_status,
|
||||
_handle_tor,
|
||||
_handle_tor_newnym,
|
||||
_json_response,
|
||||
_parse_request,
|
||||
_route,
|
||||
@@ -80,6 +82,7 @@ class TestJsonResponse:
|
||||
def _make_ctx(
|
||||
config: Config | None = None,
|
||||
pool: MagicMock | None = None,
|
||||
tor: MagicMock | None = None,
|
||||
) -> dict:
|
||||
"""Build a mock context dict."""
|
||||
return {
|
||||
@@ -87,6 +90,7 @@ def _make_ctx(
|
||||
"metrics": Metrics(),
|
||||
"pool": pool,
|
||||
"hop_pool": None,
|
||||
"tor": tor,
|
||||
}
|
||||
|
||||
|
||||
@@ -284,3 +288,82 @@ class TestRouting:
|
||||
status, body = asyncio.run(_route("GET", "/reload", ctx))
|
||||
assert status == 405
|
||||
assert "POST" in body["error"]
|
||||
|
||||
def test_get_tor(self):
|
||||
ctx = _make_ctx()
|
||||
status, body = asyncio.run(_route("GET", "/tor", ctx))
|
||||
assert status == 200
|
||||
assert body == {"enabled": False}
|
||||
|
||||
def test_post_tor_newnym(self):
|
||||
tor = MagicMock()
|
||||
tor.newnym = AsyncMock(return_value=True)
|
||||
ctx = _make_ctx(tor=tor)
|
||||
status, body = asyncio.run(_route("POST", "/tor/newnym", ctx))
|
||||
assert status == 200
|
||||
assert body == {"ok": True}
|
||||
|
||||
|
||||
# -- Tor endpoints ----------------------------------------------------------
|
||||
|
||||
|
||||
class TestHandleTor:
|
||||
"""Test GET /tor handler."""
|
||||
|
||||
def test_disabled(self):
|
||||
ctx = _make_ctx()
|
||||
status, body = _handle_tor(ctx)
|
||||
assert status == 200
|
||||
assert body == {"enabled": False}
|
||||
|
||||
def test_connected(self):
|
||||
tor = MagicMock()
|
||||
tor.connected = True
|
||||
tor.last_newnym = 0.0
|
||||
tor.newnym_interval = 60.0
|
||||
ctx = _make_ctx(tor=tor)
|
||||
status, body = _handle_tor(ctx)
|
||||
assert status == 200
|
||||
assert body["enabled"] is True
|
||||
assert body["connected"] is True
|
||||
assert body["last_newnym"] is None
|
||||
assert body["newnym_interval"] == 60.0
|
||||
|
||||
def test_with_last_newnym(self):
|
||||
import time
|
||||
tor = MagicMock()
|
||||
tor.connected = True
|
||||
tor.last_newnym = time.monotonic() - 42.0
|
||||
tor.newnym_interval = 0.0
|
||||
ctx = _make_ctx(tor=tor)
|
||||
status, body = _handle_tor(ctx)
|
||||
assert status == 200
|
||||
assert body["last_newnym"] is not None
|
||||
assert body["last_newnym"] >= 42.0
|
||||
|
||||
|
||||
class TestHandleTorNewnym:
|
||||
"""Test POST /tor/newnym handler."""
|
||||
|
||||
def test_success(self):
|
||||
tor = MagicMock()
|
||||
tor.newnym = AsyncMock(return_value=True)
|
||||
ctx = _make_ctx(tor=tor)
|
||||
status, body = asyncio.run(_handle_tor_newnym(ctx))
|
||||
assert status == 200
|
||||
assert body == {"ok": True}
|
||||
|
||||
def test_rate_limited(self):
|
||||
tor = MagicMock()
|
||||
tor.newnym = AsyncMock(return_value=False)
|
||||
ctx = _make_ctx(tor=tor)
|
||||
status, body = asyncio.run(_handle_tor_newnym(ctx))
|
||||
assert status == 200
|
||||
assert body["ok"] is False
|
||||
assert "reason" in body
|
||||
|
||||
def test_not_configured(self):
|
||||
ctx = _make_ctx()
|
||||
status, body = asyncio.run(_handle_tor_newnym(ctx))
|
||||
assert status == 400
|
||||
assert "error" in body
|
||||
|
||||
@@ -141,3 +141,37 @@ class TestConfig:
|
||||
c = load_config(cfg_file)
|
||||
assert c.pool_size == 16
|
||||
assert c.pool_max_idle == 45.0
|
||||
|
||||
def test_tor_config_from_yaml(self, tmp_path):
|
||||
cfg_file = tmp_path / "test.yaml"
|
||||
cfg_file.write_text(
|
||||
"tor:\n"
|
||||
" control_host: 10.0.0.1\n"
|
||||
" control_port: 9151\n"
|
||||
" password: secret\n"
|
||||
" cookie_file: /var/run/tor/cookie\n"
|
||||
" newnym_interval: 60\n"
|
||||
)
|
||||
c = load_config(cfg_file)
|
||||
assert c.tor is not None
|
||||
assert c.tor.control_host == "10.0.0.1"
|
||||
assert c.tor.control_port == 9151
|
||||
assert c.tor.password == "secret"
|
||||
assert c.tor.cookie_file == "/var/run/tor/cookie"
|
||||
assert c.tor.newnym_interval == 60.0
|
||||
|
||||
def test_tor_config_defaults(self, tmp_path):
|
||||
cfg_file = tmp_path / "test.yaml"
|
||||
cfg_file.write_text("tor:\n password: test\n")
|
||||
c = load_config(cfg_file)
|
||||
assert c.tor is not None
|
||||
assert c.tor.control_host == "127.0.0.1"
|
||||
assert c.tor.control_port == 9051
|
||||
assert c.tor.cookie_file == ""
|
||||
assert c.tor.newnym_interval == 0.0
|
||||
|
||||
def test_no_tor_config(self, tmp_path):
|
||||
cfg_file = tmp_path / "test.yaml"
|
||||
cfg_file.write_text("listen: 1080\n")
|
||||
c = load_config(cfg_file)
|
||||
assert c.tor is None
|
||||
|
||||
258
tests/test_tor.py
Normal file
258
tests/test_tor.py
Normal file
@@ -0,0 +1,258 @@
|
||||
"""Tests for the Tor control port client."""
|
||||
|
||||
import asyncio
|
||||
from unittest.mock import AsyncMock, MagicMock, mock_open, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from s5p.tor import TorController
|
||||
|
||||
# -- helpers -----------------------------------------------------------------
|
||||
|
||||
|
||||
def _mock_reader(responses: list[bytes]) -> AsyncMock:
|
||||
"""Create a mock StreamReader that yields canned lines."""
|
||||
reader = AsyncMock(spec=asyncio.StreamReader)
|
||||
reader.readline = AsyncMock(side_effect=responses)
|
||||
return reader
|
||||
|
||||
|
||||
def _mock_writer() -> MagicMock:
|
||||
"""Create a mock StreamWriter."""
|
||||
writer = MagicMock(spec=asyncio.StreamWriter)
|
||||
writer.write = MagicMock()
|
||||
writer.drain = AsyncMock()
|
||||
writer.close = MagicMock()
|
||||
writer.wait_closed = AsyncMock()
|
||||
writer.is_closing = MagicMock(return_value=False)
|
||||
return writer
|
||||
|
||||
|
||||
# -- authentication ----------------------------------------------------------
|
||||
|
||||
|
||||
class TestAuthentication:
|
||||
"""Test Tor control port authentication modes."""
|
||||
|
||||
def test_password_auth(self):
|
||||
reader = _mock_reader([b"250 OK\r\n"])
|
||||
writer = _mock_writer()
|
||||
|
||||
async def run():
|
||||
with patch("asyncio.open_connection", return_value=(reader, writer)):
|
||||
tc = TorController(password="secret")
|
||||
await tc._connect()
|
||||
# verify AUTHENTICATE command was sent with password
|
||||
calls = writer.write.call_args_list
|
||||
assert any(b'AUTHENTICATE "secret"' in c[0][0] for c in calls)
|
||||
tc._close()
|
||||
|
||||
asyncio.run(run())
|
||||
|
||||
def test_cookie_auth(self):
|
||||
reader = _mock_reader([b"250 OK\r\n"])
|
||||
writer = _mock_writer()
|
||||
cookie_data = b"\xde\xad\xbe\xef"
|
||||
|
||||
async def run():
|
||||
with patch("asyncio.open_connection", return_value=(reader, writer)):
|
||||
with patch("builtins.open", mock_open(read_data=cookie_data)):
|
||||
tc = TorController(cookie_file="/var/run/tor/control.authcookie")
|
||||
await tc._connect()
|
||||
calls = writer.write.call_args_list
|
||||
assert any(b"AUTHENTICATE deadbeef" in c[0][0] for c in calls)
|
||||
tc._close()
|
||||
|
||||
asyncio.run(run())
|
||||
|
||||
def test_bare_auth(self):
|
||||
reader = _mock_reader([b"250 OK\r\n"])
|
||||
writer = _mock_writer()
|
||||
|
||||
async def run():
|
||||
with patch("asyncio.open_connection", return_value=(reader, writer)):
|
||||
tc = TorController()
|
||||
await tc._connect()
|
||||
calls = writer.write.call_args_list
|
||||
assert any(c[0][0] == b"AUTHENTICATE\r\n" for c in calls)
|
||||
tc._close()
|
||||
|
||||
asyncio.run(run())
|
||||
|
||||
def test_auth_failure(self):
|
||||
reader = _mock_reader([b"515 Bad authentication\r\n"])
|
||||
writer = _mock_writer()
|
||||
|
||||
async def run():
|
||||
with patch("asyncio.open_connection", return_value=(reader, writer)):
|
||||
tc = TorController(password="wrong")
|
||||
with pytest.raises(ConnectionError, match="auth failed"):
|
||||
await tc._connect()
|
||||
|
||||
asyncio.run(run())
|
||||
|
||||
def test_connect_failure(self):
|
||||
async def run():
|
||||
with patch("asyncio.open_connection", side_effect=OSError("refused")):
|
||||
tc = TorController()
|
||||
with pytest.raises(OSError, match="refused"):
|
||||
await tc._connect()
|
||||
|
||||
asyncio.run(run())
|
||||
|
||||
|
||||
# -- NEWNYM ------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestNewnym:
|
||||
"""Test NEWNYM signaling."""
|
||||
|
||||
def test_newnym_success(self):
|
||||
# auth response + newnym response
|
||||
reader = _mock_reader([b"250 OK\r\n", b"250 OK\r\n"])
|
||||
writer = _mock_writer()
|
||||
|
||||
async def run():
|
||||
with patch("asyncio.open_connection", return_value=(reader, writer)):
|
||||
tc = TorController()
|
||||
await tc._connect()
|
||||
ok = await tc.newnym()
|
||||
assert ok is True
|
||||
assert tc.last_newnym > 0
|
||||
tc._close()
|
||||
|
||||
asyncio.run(run())
|
||||
|
||||
def test_newnym_rate_limited(self):
|
||||
# auth + first newnym
|
||||
reader = _mock_reader([b"250 OK\r\n", b"250 OK\r\n"])
|
||||
writer = _mock_writer()
|
||||
|
||||
async def run():
|
||||
with patch("asyncio.open_connection", return_value=(reader, writer)):
|
||||
tc = TorController()
|
||||
await tc._connect()
|
||||
ok = await tc.newnym()
|
||||
assert ok is True
|
||||
# immediate second call should be rate-limited
|
||||
ok2 = await tc.newnym()
|
||||
assert ok2 is False
|
||||
tc._close()
|
||||
|
||||
asyncio.run(run())
|
||||
|
||||
def test_newnym_reconnects_on_disconnect(self):
|
||||
# first connect auth, then reconnect auth + newnym
|
||||
reader1 = _mock_reader([b"250 OK\r\n"])
|
||||
writer1 = _mock_writer()
|
||||
reader2 = _mock_reader([b"250 OK\r\n", b"250 OK\r\n"])
|
||||
writer2 = _mock_writer()
|
||||
|
||||
call_count = 0
|
||||
|
||||
async def fake_connect(*args, **kwargs):
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
if call_count == 1:
|
||||
return reader1, writer1
|
||||
return reader2, writer2
|
||||
|
||||
async def run():
|
||||
with patch("asyncio.open_connection", side_effect=fake_connect):
|
||||
tc = TorController()
|
||||
await tc._connect()
|
||||
# simulate disconnect
|
||||
tc._close()
|
||||
assert not tc.connected
|
||||
# newnym should reconnect
|
||||
ok = await tc.newnym()
|
||||
assert ok is True
|
||||
tc._close()
|
||||
|
||||
asyncio.run(run())
|
||||
|
||||
|
||||
# -- GETINFO -----------------------------------------------------------------
|
||||
|
||||
|
||||
class TestGetInfo:
|
||||
"""Test GETINFO command."""
|
||||
|
||||
def test_getinfo_version(self):
|
||||
# auth + getinfo multi-line response
|
||||
reader = _mock_reader([
|
||||
b"250 OK\r\n",
|
||||
b"250-version=0.4.8.12\r\n",
|
||||
b"250 OK\r\n",
|
||||
])
|
||||
writer = _mock_writer()
|
||||
|
||||
async def run():
|
||||
with patch("asyncio.open_connection", return_value=(reader, writer)):
|
||||
tc = TorController()
|
||||
await tc._connect()
|
||||
version = await tc.get_info("version")
|
||||
assert version == "0.4.8.12"
|
||||
tc._close()
|
||||
|
||||
asyncio.run(run())
|
||||
|
||||
def test_getinfo_not_connected(self):
|
||||
# auth for reconnect + getinfo
|
||||
reader = _mock_reader([
|
||||
b"250 OK\r\n",
|
||||
b"250-traffic/read=12345\r\n",
|
||||
b"250 OK\r\n",
|
||||
])
|
||||
writer = _mock_writer()
|
||||
|
||||
async def run():
|
||||
with patch("asyncio.open_connection", return_value=(reader, writer)):
|
||||
tc = TorController()
|
||||
# not connected, should auto-connect
|
||||
result = await tc.get_info("traffic/read")
|
||||
assert result == "12345"
|
||||
tc._close()
|
||||
|
||||
asyncio.run(run())
|
||||
|
||||
|
||||
# -- lifecycle ---------------------------------------------------------------
|
||||
|
||||
|
||||
class TestLifecycle:
|
||||
"""Test start/stop lifecycle."""
|
||||
|
||||
def test_start_stop(self):
|
||||
reader = _mock_reader([b"250 OK\r\n"])
|
||||
writer = _mock_writer()
|
||||
|
||||
async def run():
|
||||
with patch("asyncio.open_connection", return_value=(reader, writer)):
|
||||
tc = TorController()
|
||||
await tc.start()
|
||||
assert tc.connected
|
||||
await tc.stop()
|
||||
assert not tc.connected
|
||||
|
||||
asyncio.run(run())
|
||||
|
||||
def test_start_with_newnym_loop(self):
|
||||
reader = _mock_reader([b"250 OK\r\n"])
|
||||
writer = _mock_writer()
|
||||
|
||||
async def run():
|
||||
with patch("asyncio.open_connection", return_value=(reader, writer)):
|
||||
tc = TorController(newnym_interval=60.0)
|
||||
await tc.start()
|
||||
assert len(tc._tasks) == 1
|
||||
await tc.stop()
|
||||
assert len(tc._tasks) == 0
|
||||
|
||||
asyncio.run(run())
|
||||
|
||||
def test_properties(self):
|
||||
tc = TorController(newnym_interval=30.0)
|
||||
assert not tc.connected
|
||||
assert tc.last_newnym == 0.0
|
||||
assert tc.newnym_interval == 30.0
|
||||
Reference in New Issue
Block a user