"""Tests for the Tor control port client.""" import asyncio from unittest.mock import AsyncMock, MagicMock, mock_open, patch import pytest from s5p.tor import TorController # -- helpers ----------------------------------------------------------------- def _mock_reader(responses: list[bytes]) -> AsyncMock: """Create a mock StreamReader that yields canned lines.""" reader = AsyncMock(spec=asyncio.StreamReader) reader.readline = AsyncMock(side_effect=responses) return reader def _mock_writer() -> MagicMock: """Create a mock StreamWriter.""" writer = MagicMock(spec=asyncio.StreamWriter) writer.write = MagicMock() writer.drain = AsyncMock() writer.close = MagicMock() writer.wait_closed = AsyncMock() writer.is_closing = MagicMock(return_value=False) return writer # -- authentication ---------------------------------------------------------- class TestAuthentication: """Test Tor control port authentication modes.""" def test_password_auth(self): reader = _mock_reader([b"250 OK\r\n"]) writer = _mock_writer() async def run(): with patch("asyncio.open_connection", return_value=(reader, writer)): tc = TorController(password="secret") await tc._connect() # verify AUTHENTICATE command was sent with password calls = writer.write.call_args_list assert any(b'AUTHENTICATE "secret"' in c[0][0] for c in calls) tc._close() asyncio.run(run()) def test_cookie_auth(self): reader = _mock_reader([b"250 OK\r\n"]) writer = _mock_writer() cookie_data = b"\xde\xad\xbe\xef" async def run(): with patch("asyncio.open_connection", return_value=(reader, writer)): with patch("builtins.open", mock_open(read_data=cookie_data)): tc = TorController(cookie_file="/var/run/tor/control.authcookie") await tc._connect() calls = writer.write.call_args_list assert any(b"AUTHENTICATE deadbeef" in c[0][0] for c in calls) tc._close() asyncio.run(run()) def test_bare_auth(self): reader = _mock_reader([b"250 OK\r\n"]) writer = _mock_writer() async def run(): with patch("asyncio.open_connection", return_value=(reader, writer)): tc = TorController() await tc._connect() calls = writer.write.call_args_list assert any(c[0][0] == b"AUTHENTICATE\r\n" for c in calls) tc._close() asyncio.run(run()) def test_auth_failure(self): reader = _mock_reader([b"515 Bad authentication\r\n"]) writer = _mock_writer() async def run(): with patch("asyncio.open_connection", return_value=(reader, writer)): tc = TorController(password="wrong") with pytest.raises(ConnectionError, match="auth failed"): await tc._connect() asyncio.run(run()) def test_connect_failure(self): async def run(): with patch("asyncio.open_connection", side_effect=OSError("refused")): tc = TorController() with pytest.raises(OSError, match="refused"): await tc._connect() asyncio.run(run()) # -- NEWNYM ------------------------------------------------------------------ class TestNewnym: """Test NEWNYM signaling.""" def test_newnym_success(self): # auth response + newnym response reader = _mock_reader([b"250 OK\r\n", b"250 OK\r\n"]) writer = _mock_writer() async def run(): with patch("asyncio.open_connection", return_value=(reader, writer)): tc = TorController() await tc._connect() ok = await tc.newnym() assert ok is True assert tc.last_newnym > 0 tc._close() asyncio.run(run()) def test_newnym_rate_limited(self): # auth + first newnym reader = _mock_reader([b"250 OK\r\n", b"250 OK\r\n"]) writer = _mock_writer() async def run(): with patch("asyncio.open_connection", return_value=(reader, writer)): tc = TorController() await tc._connect() ok = await tc.newnym() assert ok is True # immediate second call should be rate-limited ok2 = await tc.newnym() assert ok2 is False tc._close() asyncio.run(run()) def test_newnym_reconnects_on_disconnect(self): # first connect auth, then reconnect auth + newnym reader1 = _mock_reader([b"250 OK\r\n"]) writer1 = _mock_writer() reader2 = _mock_reader([b"250 OK\r\n", b"250 OK\r\n"]) writer2 = _mock_writer() call_count = 0 async def fake_connect(*args, **kwargs): nonlocal call_count call_count += 1 if call_count == 1: return reader1, writer1 return reader2, writer2 async def run(): with patch("asyncio.open_connection", side_effect=fake_connect): tc = TorController() await tc._connect() # simulate disconnect tc._close() assert not tc.connected # newnym should reconnect ok = await tc.newnym() assert ok is True tc._close() asyncio.run(run()) # -- GETINFO ----------------------------------------------------------------- class TestGetInfo: """Test GETINFO command.""" def test_getinfo_version(self): # auth + getinfo multi-line response reader = _mock_reader([ b"250 OK\r\n", b"250-version=0.4.8.12\r\n", b"250 OK\r\n", ]) writer = _mock_writer() async def run(): with patch("asyncio.open_connection", return_value=(reader, writer)): tc = TorController() await tc._connect() version = await tc.get_info("version") assert version == "0.4.8.12" tc._close() asyncio.run(run()) def test_getinfo_not_connected(self): # auth for reconnect + getinfo reader = _mock_reader([ b"250 OK\r\n", b"250-traffic/read=12345\r\n", b"250 OK\r\n", ]) writer = _mock_writer() async def run(): with patch("asyncio.open_connection", return_value=(reader, writer)): tc = TorController() # not connected, should auto-connect result = await tc.get_info("traffic/read") assert result == "12345" tc._close() asyncio.run(run()) # -- lifecycle --------------------------------------------------------------- class TestLifecycle: """Test start/stop lifecycle.""" def test_start_stop(self): reader = _mock_reader([b"250 OK\r\n"]) writer = _mock_writer() async def run(): with patch("asyncio.open_connection", return_value=(reader, writer)): tc = TorController() await tc.start() assert tc.connected await tc.stop() assert not tc.connected asyncio.run(run()) def test_start_with_newnym_loop(self): reader = _mock_reader([b"250 OK\r\n"]) writer = _mock_writer() async def run(): with patch("asyncio.open_connection", return_value=(reader, writer)): tc = TorController(newnym_interval=60.0) await tc.start() assert len(tc._tasks) == 1 await tc.stop() assert len(tc._tasks) == 0 asyncio.run(run()) def test_properties(self): tc = TorController(newnym_interval=30.0) assert not tc.connected assert tc.last_newnym == 0.0 assert tc.newnym_interval == 30.0