Covers: password/cookie/bare auth, auth failure, connect failure, NEWNYM success/rate-limiting/reconnect, GETINFO multi-line parsing, start/stop lifecycle, GET /tor status, POST /tor/newnym dispatch, and TorConfig YAML parsing. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
259 lines
8.1 KiB
Python
259 lines
8.1 KiB
Python
"""Tests for the Tor control port client."""
|
|
|
|
import asyncio
|
|
from unittest.mock import AsyncMock, MagicMock, mock_open, patch
|
|
|
|
import pytest
|
|
|
|
from s5p.tor import TorController
|
|
|
|
# -- helpers -----------------------------------------------------------------
|
|
|
|
|
|
def _mock_reader(responses: list[bytes]) -> AsyncMock:
    """Create a mock StreamReader that yields canned lines.

    Args:
        responses: Raw lines handed back one per awaited ``readline()``
            call, in the given order.

    Returns:
        An ``AsyncMock`` specced against ``asyncio.StreamReader`` whose
        ``readline`` coroutine replays ``responses``.
    """
    reader = AsyncMock(spec=asyncio.StreamReader)
    # A list side_effect makes each await consume the next canned line.
    reader.readline = AsyncMock(side_effect=responses)
    return reader
|
|
|
|
|
|
def _mock_writer() -> MagicMock:
    """Create a mock StreamWriter.

    Returns:
        A ``MagicMock`` specced against ``asyncio.StreamWriter`` with
        synchronous ``write``/``close``/``is_closing`` stubs and awaitable
        ``drain``/``wait_closed`` stubs. ``is_closing()`` reports ``False``.
    """
    writer = MagicMock(spec=asyncio.StreamWriter)
    # Synchronous methods: plain mocks so calls can be inspected afterwards.
    writer.write = MagicMock()
    writer.close = MagicMock()
    writer.is_closing = MagicMock(return_value=False)
    # Coroutine methods: must be awaitable by the code under test.
    writer.drain = AsyncMock()
    writer.wait_closed = AsyncMock()
    return writer
|
|
|
|
|
|
# -- authentication ----------------------------------------------------------
|
|
|
|
|
|
class TestAuthentication:
    """Test Tor control port authentication modes."""

    def test_password_auth(self):
        """A configured password is sent as a quoted AUTHENTICATE argument."""
        reader = _mock_reader([b"250 OK\r\n"])
        writer = _mock_writer()

        async def run():
            with patch("asyncio.open_connection", return_value=(reader, writer)):
                tc = TorController(password="secret")
                await tc._connect()
                # verify AUTHENTICATE command was sent with password
                calls = writer.write.call_args_list
                assert any(b'AUTHENTICATE "secret"' in c[0][0] for c in calls)
                tc._close()

        asyncio.run(run())

    def test_cookie_auth(self):
        """Cookie file bytes are sent hex-encoded in the AUTHENTICATE command."""
        reader = _mock_reader([b"250 OK\r\n"])
        writer = _mock_writer()
        cookie_data = b"\xde\xad\xbe\xef"

        async def run():
            # Patch both the socket and the cookie file read so no real
            # connection or filesystem access happens.
            with patch(
                "asyncio.open_connection", return_value=(reader, writer)
            ), patch("builtins.open", mock_open(read_data=cookie_data)):
                tc = TorController(cookie_file="/var/run/tor/control.authcookie")
                await tc._connect()
                calls = writer.write.call_args_list
                assert any(b"AUTHENTICATE deadbeef" in c[0][0] for c in calls)
                tc._close()

        asyncio.run(run())

    def test_bare_auth(self):
        """With no credentials, a bare AUTHENTICATE line is written."""
        reader = _mock_reader([b"250 OK\r\n"])
        writer = _mock_writer()

        async def run():
            with patch("asyncio.open_connection", return_value=(reader, writer)):
                tc = TorController()
                await tc._connect()
                calls = writer.write.call_args_list
                # Exact match: nothing may follow the command keyword.
                assert any(c[0][0] == b"AUTHENTICATE\r\n" for c in calls)
                tc._close()

        asyncio.run(run())

    def test_auth_failure(self):
        """A 515 reply to AUTHENTICATE raises ConnectionError("auth failed")."""
        reader = _mock_reader([b"515 Bad authentication\r\n"])
        writer = _mock_writer()

        async def run():
            with patch("asyncio.open_connection", return_value=(reader, writer)):
                tc = TorController(password="wrong")
                with pytest.raises(ConnectionError, match="auth failed"):
                    await tc._connect()

        asyncio.run(run())

    def test_connect_failure(self):
        """An OSError from open_connection propagates out of _connect()."""

        async def run():
            with patch("asyncio.open_connection", side_effect=OSError("refused")):
                tc = TorController()
                with pytest.raises(OSError, match="refused"):
                    await tc._connect()

        asyncio.run(run())
|
|
|
|
|
|
# -- NEWNYM ------------------------------------------------------------------
|
|
|
|
|
|
class TestNewnym:
    """Test NEWNYM signaling."""

    def test_newnym_success(self):
        """newnym() returns True and records a positive last_newnym value."""
        # auth response + newnym response
        reader = _mock_reader([b"250 OK\r\n", b"250 OK\r\n"])
        writer = _mock_writer()

        async def run():
            with patch("asyncio.open_connection", return_value=(reader, writer)):
                tc = TorController()
                await tc._connect()
                ok = await tc.newnym()
                assert ok is True
                assert tc.last_newnym > 0
                tc._close()

        asyncio.run(run())

    def test_newnym_rate_limited(self):
        """A second newnym() immediately after a successful one returns False."""
        # auth + first newnym; no third canned line is provided for the
        # second newnym() call.
        reader = _mock_reader([b"250 OK\r\n", b"250 OK\r\n"])
        writer = _mock_writer()

        async def run():
            with patch("asyncio.open_connection", return_value=(reader, writer)):
                tc = TorController()
                await tc._connect()
                ok = await tc.newnym()
                assert ok is True
                # immediate second call should be rate-limited
                ok2 = await tc.newnym()
                assert ok2 is False
                tc._close()

        asyncio.run(run())

    def test_newnym_reconnects_on_disconnect(self):
        """newnym() succeeds after the controller has been closed."""
        # first connect auth, then reconnect auth + newnym
        reader1 = _mock_reader([b"250 OK\r\n"])
        writer1 = _mock_writer()
        reader2 = _mock_reader([b"250 OK\r\n", b"250 OK\r\n"])
        writer2 = _mock_writer()

        call_count = 0

        async def fake_connect(*args, **kwargs):
            # First call hands out the initial stream pair; every later call
            # hands out the second pair.
            nonlocal call_count
            call_count += 1
            if call_count == 1:
                return reader1, writer1
            return reader2, writer2

        async def run():
            with patch("asyncio.open_connection", side_effect=fake_connect):
                tc = TorController()
                await tc._connect()
                # simulate disconnect
                tc._close()
                assert not tc.connected
                # newnym should reconnect
                ok = await tc.newnym()
                assert ok is True
                tc._close()

        asyncio.run(run())
|
|
|
|
|
|
# -- GETINFO -----------------------------------------------------------------
|
|
|
|
|
|
class TestGetInfo:
    """Test GETINFO command."""

    def test_getinfo_version(self):
        """get_info("version") returns the value from a 250-key=value reply."""
        # auth + getinfo multi-line response
        reader = _mock_reader(
            [
                b"250 OK\r\n",
                b"250-version=0.4.8.12\r\n",
                b"250 OK\r\n",
            ]
        )
        writer = _mock_writer()

        async def run():
            with patch("asyncio.open_connection", return_value=(reader, writer)):
                tc = TorController()
                await tc._connect()
                version = await tc.get_info("version")
                assert version == "0.4.8.12"
                tc._close()

        asyncio.run(run())

    def test_getinfo_not_connected(self):
        """get_info() works on a controller that was never connected first."""
        # auth for reconnect + getinfo
        reader = _mock_reader(
            [
                b"250 OK\r\n",
                b"250-traffic/read=12345\r\n",
                b"250 OK\r\n",
            ]
        )
        writer = _mock_writer()

        async def run():
            with patch("asyncio.open_connection", return_value=(reader, writer)):
                tc = TorController()
                # not connected, should auto-connect
                result = await tc.get_info("traffic/read")
                assert result == "12345"
                tc._close()

        asyncio.run(run())
|
|
|
|
|
|
# -- lifecycle ---------------------------------------------------------------
|
|
|
|
|
|
class TestLifecycle:
    """Test start/stop lifecycle."""

    def test_start_stop(self):
        """start() leaves the controller connected; stop() disconnects it."""
        reader = _mock_reader([b"250 OK\r\n"])
        writer = _mock_writer()

        async def run():
            with patch("asyncio.open_connection", return_value=(reader, writer)):
                tc = TorController()
                await tc.start()
                assert tc.connected
                await tc.stop()
                assert not tc.connected

        asyncio.run(run())

    def test_start_with_newnym_loop(self):
        """With newnym_interval set, start() adds one task and stop() clears it."""
        reader = _mock_reader([b"250 OK\r\n"])
        writer = _mock_writer()

        async def run():
            with patch("asyncio.open_connection", return_value=(reader, writer)):
                tc = TorController(newnym_interval=60.0)
                await tc.start()
                assert len(tc._tasks) == 1
                await tc.stop()
                assert len(tc._tasks) == 0

        asyncio.run(run())

    def test_properties(self):
        """A fresh controller is disconnected, with last_newnym at 0.0."""
        tc = TorController(newnym_interval=30.0)
        assert not tc.connected
        assert tc.last_newnym == 0.0
        assert tc.newnym_interval == 30.0
|