"""End-to-end integration tests with mock SOCKS5 proxies.""" from __future__ import annotations import asyncio import struct from s5p.config import ChainHop, ListenerConfig from s5p.proto import encode_address from s5p.server import _handle_client from .conftest import free_port, start_echo_server, start_mock_socks5 # -- helpers ----------------------------------------------------------------- async def _socks5_connect( host: str, port: int, target_host: str, target_port: int, ) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]: """Connect as a SOCKS5 client, perform greeting + CONNECT.""" reader, writer = await asyncio.open_connection(host, port) # greeting: version 5, 1 method (no-auth) writer.write(b"\x05\x01\x00") await writer.drain() resp = await reader.readexactly(2) assert resp == b"\x05\x00", f"greeting failed: {resp!r}" # connect request atyp, addr_bytes = encode_address(target_host) writer.write( struct.pack("!BBB", 0x05, 0x01, 0x00) + bytes([atyp]) + addr_bytes + struct.pack("!H", target_port) ) await writer.drain() # read reply rep_header = await reader.readexactly(3) atyp_resp = (await reader.readexactly(1))[0] if atyp_resp == 0x01: await reader.readexactly(4) elif atyp_resp == 0x03: length = (await reader.readexactly(1))[0] await reader.readexactly(length) elif atyp_resp == 0x04: await reader.readexactly(16) await reader.readexactly(2) # port if rep_header[1] != 0x00: writer.close() await writer.wait_closed() raise ConnectionError(f"SOCKS5 reply={rep_header[1]:#x}") return reader, writer async def _socks5_connect_auth( host: str, port: int, target_host: str, target_port: int, username: str, password: str, ) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]: """Connect as a SOCKS5 client with username/password auth (RFC 1929).""" reader, writer = await asyncio.open_connection(host, port) # greeting: version 5, 1 method (user/pass) writer.write(b"\x05\x01\x02") await writer.drain() resp = await reader.readexactly(2) assert resp == b"\x05\x02", f"greeting failed: {resp!r}" # subnegotiation uname = username.encode("utf-8") passwd = password.encode("utf-8") writer.write( b"\x01" + bytes([len(uname)]) + uname + bytes([len(passwd)]) + passwd ) await writer.drain() auth_resp = await reader.readexactly(2) if auth_resp[1] != 0x00: writer.close() await writer.wait_closed() raise ConnectionError(f"auth failed: status={auth_resp[1]:#x}") # connect request atyp, addr_bytes = encode_address(target_host) writer.write( struct.pack("!BBB", 0x05, 0x01, 0x00) + bytes([atyp]) + addr_bytes + struct.pack("!H", target_port) ) await writer.drain() # read reply rep_header = await reader.readexactly(3) atyp_resp = (await reader.readexactly(1))[0] if atyp_resp == 0x01: await reader.readexactly(4) elif atyp_resp == 0x03: length = (await reader.readexactly(1))[0] await reader.readexactly(length) elif atyp_resp == 0x04: await reader.readexactly(16) await reader.readexactly(2) # port if rep_header[1] != 0x00: writer.close() await writer.wait_closed() raise ConnectionError(f"SOCKS5 reply={rep_header[1]:#x}") return reader, writer async def _close_server(srv: asyncio.Server) -> None: """Close a server and wait.""" srv.close() await srv.wait_closed() # -- tests ------------------------------------------------------------------- class TestDirectNoChain: """Client -> s5p -> echo (empty chain).""" def test_echo(self): async def _run(): servers = [] try: echo_host, echo_port, echo_srv = await start_echo_server() servers.append(echo_srv) listener = ListenerConfig(listen_host="127.0.0.1", listen_port=free_port()) s5p_srv = await asyncio.start_server( lambda r, w: _handle_client(r, w, listener, timeout=5.0, retries=1), listener.listen_host, listener.listen_port, ) servers.append(s5p_srv) await s5p_srv.start_serving() reader, writer = await _socks5_connect( listener.listen_host, listener.listen_port, echo_host, echo_port, ) writer.write(b"hello direct") await writer.drain() data = await asyncio.wait_for(reader.read(4096), timeout=2.0) assert data == b"hello direct" writer.close() await writer.wait_closed() finally: for s in servers: await _close_server(s) asyncio.run(_run()) class TestSingleHop: """Client -> s5p -> mock socks5 -> echo.""" def test_echo_through_one_hop(self): async def _run(): servers = [] try: echo_host, echo_port, echo_srv = await start_echo_server() servers.append(echo_srv) mock_host, mock_port, mock_srv = await start_mock_socks5() servers.append(mock_srv) listener = ListenerConfig( listen_host="127.0.0.1", listen_port=free_port(), chain=[ChainHop(proto="socks5", host=mock_host, port=mock_port)], ) s5p_srv = await asyncio.start_server( lambda r, w: _handle_client(r, w, listener, timeout=5.0, retries=1), listener.listen_host, listener.listen_port, ) servers.append(s5p_srv) await s5p_srv.start_serving() reader, writer = await _socks5_connect( listener.listen_host, listener.listen_port, echo_host, echo_port, ) writer.write(b"hello one hop") await writer.drain() data = await asyncio.wait_for(reader.read(4096), timeout=2.0) assert data == b"hello one hop" writer.close() await writer.wait_closed() finally: for s in servers: await _close_server(s) asyncio.run(_run()) class TestTwoHops: """Client -> s5p -> mock1 -> mock2 -> echo.""" def test_echo_through_two_hops(self): async def _run(): servers = [] try: echo_host, echo_port, echo_srv = await start_echo_server() servers.append(echo_srv) m1_host, m1_port, m1_srv = await start_mock_socks5() servers.append(m1_srv) m2_host, m2_port, m2_srv = await start_mock_socks5() servers.append(m2_srv) listener = ListenerConfig( listen_host="127.0.0.1", listen_port=free_port(), chain=[ ChainHop(proto="socks5", host=m1_host, port=m1_port), ChainHop(proto="socks5", host=m2_host, port=m2_port), ], ) s5p_srv = await asyncio.start_server( lambda r, w: _handle_client(r, w, listener, timeout=5.0, retries=1), listener.listen_host, listener.listen_port, ) servers.append(s5p_srv) await s5p_srv.start_serving() reader, writer = await _socks5_connect( listener.listen_host, listener.listen_port, echo_host, echo_port, ) writer.write(b"hello two hops") await writer.drain() data = await asyncio.wait_for(reader.read(4096), timeout=2.0) assert data == b"hello two hops" writer.close() await writer.wait_closed() finally: for s in servers: await _close_server(s) asyncio.run(_run()) class TestConnectionRefused: """Dead hop returns SOCKS5 error to client.""" def test_refused(self): async def _run(): servers = [] try: # use a port with nothing listening dead_port = free_port() listener = ListenerConfig( listen_host="127.0.0.1", listen_port=free_port(), chain=[ChainHop(proto="socks5", host="127.0.0.1", port=dead_port)], ) s5p_srv = await asyncio.start_server( lambda r, w: _handle_client(r, w, listener, timeout=3.0, retries=1), listener.listen_host, listener.listen_port, ) servers.append(s5p_srv) await s5p_srv.start_serving() reader, writer = await asyncio.open_connection( listener.listen_host, listener.listen_port, ) # greeting writer.write(b"\x05\x01\x00") await writer.drain() resp = await reader.readexactly(2) assert resp == b"\x05\x00" # connect to a dummy target atyp, addr_bytes = encode_address("127.0.0.1") writer.write( struct.pack("!BBB", 0x05, 0x01, 0x00) + bytes([atyp]) + addr_bytes + struct.pack("!H", 9999) ) await writer.drain() # should get error reply (non-zero rep field) rep = await asyncio.wait_for(reader.read(4096), timeout=5.0) assert len(rep) >= 3 assert rep[1] != 0x00, "expected non-zero SOCKS5 reply code" writer.close() await writer.wait_closed() finally: for s in servers: await _close_server(s) asyncio.run(_run()) class TestBypassDirectConnect: """Target matches bypass rule -> chain skipped, direct connect to echo.""" def test_bypass_skips_chain(self): async def _run(): servers = [] try: echo_host, echo_port, echo_srv = await start_echo_server() servers.append(echo_srv) # dead hop -- would fail if bypass didn't skip it dead_port = free_port() listener = ListenerConfig( listen_host="127.0.0.1", listen_port=free_port(), chain=[ChainHop(proto="socks5", host="127.0.0.1", port=dead_port)], bypass=["127.0.0.0/8"], ) s5p_srv = await asyncio.start_server( lambda r, w: _handle_client(r, w, listener, timeout=5.0, retries=1), listener.listen_host, listener.listen_port, ) servers.append(s5p_srv) await s5p_srv.start_serving() reader, writer = await _socks5_connect( listener.listen_host, listener.listen_port, echo_host, echo_port, ) writer.write(b"hello bypass") await writer.drain() data = await asyncio.wait_for(reader.read(4096), timeout=2.0) assert data == b"hello bypass" writer.close() await writer.wait_closed() finally: for s in servers: await _close_server(s) asyncio.run(_run()) class TestOnionChainOnly: """Onion target uses static chain only, pool hops skipped.""" def test_onion_skips_pool(self): async def _run(): servers = [] try: # mock socks5 acts as the "Tor" hop mock_host, mock_port, mock_srv = await start_mock_socks5() servers.append(mock_srv) # fake pool that would add a dead hop if called from unittest.mock import AsyncMock, MagicMock dead_port = free_port() fake_pool = MagicMock() fake_pool.alive_count = 1 fake_pool.get = AsyncMock( return_value=ChainHop( proto="socks5", host="127.0.0.1", port=dead_port, ), ) listener = ListenerConfig( listen_host="127.0.0.1", listen_port=free_port(), chain=[ChainHop(proto="socks5", host=mock_host, port=mock_port)], pool_seq=[["default"]], ) s5p_srv = await asyncio.start_server( lambda r, w: _handle_client( r, w, listener, timeout=5.0, retries=1, pool_seq=[[fake_pool]], ), listener.listen_host, listener.listen_port, ) servers.append(s5p_srv) await s5p_srv.start_serving() # connect with .onion target -- mock socks5 will fail to # resolve it, but the key assertion is pool.get NOT called reader, writer = await asyncio.open_connection( listener.listen_host, listener.listen_port, ) writer.write(b"\x05\x01\x00") await writer.drain() await reader.readexactly(2) atyp, addr_bytes = encode_address("fake.onion") writer.write( struct.pack("!BBB", 0x05, 0x01, 0x00) + bytes([atyp]) + addr_bytes + struct.pack("!H", 80) ) await writer.drain() await asyncio.wait_for(reader.read(4096), timeout=3.0) writer.close() await writer.wait_closed() # pool.get must NOT have been called (onion skips pool) fake_pool.get.assert_not_called() finally: for s in servers: await _close_server(s) asyncio.run(_run()) class TestAuthSuccess: """Authenticate with valid credentials, relay echo data.""" def test_auth_echo(self): async def _run(): servers = [] try: echo_host, echo_port, echo_srv = await start_echo_server() servers.append(echo_srv) listener = ListenerConfig( listen_host="127.0.0.1", listen_port=free_port(), auth={"alice": "s3cret"}, ) s5p_srv = await asyncio.start_server( lambda r, w: _handle_client(r, w, listener, timeout=5.0, retries=1), listener.listen_host, listener.listen_port, ) servers.append(s5p_srv) await s5p_srv.start_serving() reader, writer = await _socks5_connect_auth( listener.listen_host, listener.listen_port, echo_host, echo_port, "alice", "s3cret", ) writer.write(b"hello auth") await writer.drain() data = await asyncio.wait_for(reader.read(4096), timeout=2.0) assert data == b"hello auth" writer.close() await writer.wait_closed() finally: for s in servers: await _close_server(s) asyncio.run(_run()) class TestAuthFailure: """Wrong password returns auth failure response.""" def test_wrong_password(self): async def _run(): servers = [] try: listener = ListenerConfig( listen_host="127.0.0.1", listen_port=free_port(), auth={"alice": "s3cret"}, ) s5p_srv = await asyncio.start_server( lambda r, w: _handle_client(r, w, listener, timeout=5.0, retries=1), listener.listen_host, listener.listen_port, ) servers.append(s5p_srv) await s5p_srv.start_serving() reader, writer = await asyncio.open_connection( listener.listen_host, listener.listen_port, ) # greeting with auth method writer.write(b"\x05\x01\x02") await writer.drain() resp = await reader.readexactly(2) assert resp == b"\x05\x02" # subnegotiation with wrong password uname = b"alice" passwd = b"wrong" writer.write( b"\x01" + bytes([len(uname)]) + uname + bytes([len(passwd)]) + passwd ) await writer.drain() auth_resp = await reader.readexactly(2) assert auth_resp == b"\x01\x01", f"expected auth failure, got {auth_resp!r}" writer.close() await writer.wait_closed() finally: for s in servers: await _close_server(s) asyncio.run(_run()) class TestAuthMethodNotOffered: """Client offers only no-auth when auth is required -> 0xFF rejection.""" def test_no_auth_method_rejected(self): async def _run(): servers = [] try: listener = ListenerConfig( listen_host="127.0.0.1", listen_port=free_port(), auth={"alice": "s3cret"}, ) s5p_srv = await asyncio.start_server( lambda r, w: _handle_client(r, w, listener, timeout=5.0, retries=1), listener.listen_host, listener.listen_port, ) servers.append(s5p_srv) await s5p_srv.start_serving() reader, writer = await asyncio.open_connection( listener.listen_host, listener.listen_port, ) # greeting with only no-auth method (0x00) writer.write(b"\x05\x01\x00") await writer.drain() resp = await reader.readexactly(2) assert resp == b"\x05\xff", f"expected method rejection, got {resp!r}" writer.close() await writer.wait_closed() finally: for s in servers: await _close_server(s) asyncio.run(_run()) class TestNoAuthListenerUnchanged: """No auth configured -- 0x00 still works as before.""" def test_no_auth_still_works(self): async def _run(): servers = [] try: echo_host, echo_port, echo_srv = await start_echo_server() servers.append(echo_srv) listener = ListenerConfig( listen_host="127.0.0.1", listen_port=free_port(), ) s5p_srv = await asyncio.start_server( lambda r, w: _handle_client(r, w, listener, timeout=5.0, retries=1), listener.listen_host, listener.listen_port, ) servers.append(s5p_srv) await s5p_srv.start_serving() reader, writer = await _socks5_connect( listener.listen_host, listener.listen_port, echo_host, echo_port, ) writer.write(b"hello no auth") await writer.drain() data = await asyncio.wait_for(reader.read(4096), timeout=2.0) assert data == b"hello no auth" writer.close() await writer.wait_closed() finally: for s in servers: await _close_server(s) asyncio.run(_run())