"""Tests for protocol helpers.""" import asyncio import pytest from s5p.proto import ( ProtoError, Socks5AddrType, Socks5Reply, encode_address, http_connect, socks4_connect, socks5_connect, ) # -- helpers ----------------------------------------------------------------- class _MockTransport(asyncio.Transport): """Minimal transport that captures writes and supports drain.""" def __init__(self): super().__init__() self.written = bytearray() self._closing = False def write(self, data): self.written.extend(data) def is_closing(self): return self._closing def close(self): self._closing = True def get_extra_info(self, name, default=None): return default def _make_streams(response_data: bytes): """Create mock reader/writer for protocol tests. Must be called from within a running event loop. """ reader = asyncio.StreamReader() reader.feed_data(response_data) reader.feed_eof() protocol = asyncio.StreamReaderProtocol(reader) transport = _MockTransport() protocol.connection_made(transport) writer = asyncio.StreamWriter(transport, protocol, reader, asyncio.get_running_loop()) return reader, writer def _run(coro): """Run a coroutine in a fresh event loop.""" asyncio.run(coro) # -- encode_address ---------------------------------------------------------- class TestEncodeAddress: """Test SOCKS5 address encoding.""" def test_ipv4(self): atyp, data = encode_address("127.0.0.1") assert atyp == Socks5AddrType.IPV4 assert data == b"\x7f\x00\x00\x01" def test_ipv4_zeros(self): atyp, data = encode_address("0.0.0.0") assert atyp == Socks5AddrType.IPV4 assert data == b"\x00\x00\x00\x00" def test_ipv6(self): atyp, data = encode_address("::1") assert atyp == Socks5AddrType.IPV6 assert len(data) == 16 assert data[-1] == 1 def test_ipv6_full(self): atyp, data = encode_address("2001:db8::1") assert atyp == Socks5AddrType.IPV6 assert len(data) == 16 def test_domain(self): atyp, data = encode_address("example.com") assert atyp == Socks5AddrType.DOMAIN assert data == bytes([11]) + b"example.com" def test_domain_short(self): atyp, data = encode_address("a.co") assert atyp == Socks5AddrType.DOMAIN assert data == bytes([4]) + b"a.co" def test_domain_long(self): host = "sub.domain.example.com" atyp, data = encode_address(host) assert atyp == Socks5AddrType.DOMAIN assert data[0] == len(host) assert data[1:] == host.encode("ascii") # -- socks5_connect ---------------------------------------------------------- class TestSocks5Connect: """Test SOCKS5 handshake building.""" def test_no_auth_success(self): """Successful SOCKS5 connect without auth.""" bind_addr = b"\x01\x00\x00\x00\x00\x00\x00" # IPv4 0.0.0.0:0 response = b"\x05\x00" + b"\x05\x00\x00" + bind_addr async def run(): reader, writer = _make_streams(response) await socks5_connect(reader, writer, "example.com", 80) _run(run()) def test_auth_success(self): """Successful SOCKS5 connect with username/password auth.""" bind_addr = b"\x01\x00\x00\x00\x00\x00\x00" response = b"\x05\x02" + b"\x01\x00" + b"\x05\x00\x00" + bind_addr async def run(): reader, writer = _make_streams(response) await socks5_connect(reader, writer, "example.com", 80, "user", "pass") _run(run()) def test_auth_failure(self): """SOCKS5 auth rejected by server.""" response = b"\x05\x02" + b"\x01\x01" async def run(): reader, writer = _make_streams(response) with pytest.raises(ProtoError, match="authentication failed"): await socks5_connect(reader, writer, "example.com", 80, "user", "bad") _run(run()) def test_no_acceptable_methods(self): """Server rejects all auth methods (0xFF).""" response = b"\x05\xff" async def run(): reader, writer = _make_streams(response) with pytest.raises(ProtoError, match="no acceptable"): await socks5_connect(reader, writer, "example.com", 80) _run(run()) def test_connect_refused(self): """SOCKS5 connect reply with connection refused.""" bind_addr = b"\x01\x00\x00\x00\x00\x00\x00" response = b"\x05\x00" + b"\x05\x05\x00" + bind_addr async def run(): reader, writer = _make_streams(response) with pytest.raises(ProtoError, match="connect failed"): await socks5_connect(reader, writer, "example.com", 80) _run(run()) def test_wrong_version(self): """Server responds with wrong SOCKS version.""" response = b"\x04\x00" async def run(): reader, writer = _make_streams(response) with pytest.raises(ProtoError, match="unexpected version"): await socks5_connect(reader, writer, "example.com", 80) _run(run()) def test_server_requires_auth_no_creds(self): """Server demands auth but no credentials provided.""" response = b"\x05\x02" async def run(): reader, writer = _make_streams(response) with pytest.raises(ProtoError, match="requires auth"): await socks5_connect(reader, writer, "example.com", 80) _run(run()) # -- socks4_connect ---------------------------------------------------------- class TestSocks4Connect: """Test SOCKS4/4a request building.""" def test_ip_success(self): """SOCKS4 connect with IP address.""" response = b"\x00\x5a" + b"\x00" * 6 async def run(): reader, writer = _make_streams(response) await socks4_connect(reader, writer, "1.2.3.4", 80) _run(run()) def test_domain_success(self): """SOCKS4a connect with domain name.""" response = b"\x00\x5a" + b"\x00" * 6 async def run(): reader, writer = _make_streams(response) await socks4_connect(reader, writer, "example.com", 80) _run(run()) def test_rejected(self): """SOCKS4 request rejected.""" response = b"\x00\x5b" + b"\x00" * 6 async def run(): reader, writer = _make_streams(response) with pytest.raises(ProtoError, match="rejected"): await socks4_connect(reader, writer, "1.2.3.4", 80) _run(run()) # -- http_connect ------------------------------------------------------------ class TestHttpConnect: """Test HTTP CONNECT request building.""" def test_success_200(self): """HTTP CONNECT with 200 response.""" response = b"HTTP/1.1 200 Connection Established\r\n\r\n" async def run(): reader, writer = _make_streams(response) await http_connect(reader, writer, "example.com", 443) _run(run()) def test_success_with_headers(self): """HTTP CONNECT with extra headers in response.""" response = b"HTTP/1.1 200 OK\r\nX-Proxy: test\r\n\r\n" async def run(): reader, writer = _make_streams(response) await http_connect(reader, writer, "example.com", 443) _run(run()) def test_auth_success(self): """HTTP CONNECT with proxy authentication.""" response = b"HTTP/1.1 200 OK\r\n\r\n" async def run(): reader, writer = _make_streams(response) await http_connect(reader, writer, "example.com", 443, "user", "pass") _run(run()) def test_forbidden(self): """HTTP CONNECT with 403 response.""" response = b"HTTP/1.1 403 Forbidden\r\n\r\n" async def run(): reader, writer = _make_streams(response) with pytest.raises(ProtoError, match="connect failed"): await http_connect(reader, writer, "example.com", 443) _run(run()) def test_proxy_auth_required(self): """HTTP CONNECT with 407 response.""" response = b"HTTP/1.1 407 Proxy Authentication Required\r\n\r\n" async def run(): reader, writer = _make_streams(response) with pytest.raises(ProtoError, match="connect failed"): await http_connect(reader, writer, "example.com", 443) _run(run()) def test_empty_response(self): """HTTP CONNECT with empty response.""" async def run(): reader, writer = _make_streams(b"") with pytest.raises(ProtoError, match="empty response"): await http_connect(reader, writer, "example.com", 443) _run(run()) # -- Socks5Reply enum ------------------------------------------------------- class TestSocks5Reply: """Test SOCKS5 reply code values.""" def test_succeeded(self): assert Socks5Reply.SUCCEEDED == 0x00 def test_general_failure(self): assert Socks5Reply.GENERAL_FAILURE == 0x01 def test_connection_refused(self): assert Socks5Reply.CONNECTION_REFUSED == 0x05 def test_command_not_supported(self): assert Socks5Reply.COMMAND_NOT_SUPPORTED == 0x07 def test_address_type_not_supported(self): assert Socks5Reply.ADDRESS_TYPE_NOT_SUPPORTED == 0x08 # -- ProtoError -------------------------------------------------------------- class TestProtoError: """Test ProtoError exception.""" def test_default_reply(self): err = ProtoError("test error") assert str(err) == "test error" assert err.reply == Socks5Reply.GENERAL_FAILURE def test_custom_reply(self): err = ProtoError("refused", Socks5Reply.CONNECTION_REFUSED) assert err.reply == Socks5Reply.CONNECTION_REFUSED