Files
s5p/tests/test_integration.py
user 918d03cc58 feat: skip pool hops for .onion destinations
Onion addresses require Tor to resolve, so pool proxies after Tor
would break connectivity. Detect .onion targets and use the static
chain only (Tor), skipping pool selection and retries.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 02:28:34 +01:00

357 lines
12 KiB
Python

"""End-to-end integration tests with mock SOCKS5 proxies."""
from __future__ import annotations
import asyncio
import struct
from s5p.config import ChainHop, ListenerConfig
from s5p.proto import encode_address
from s5p.server import _handle_client
from .conftest import free_port, start_echo_server, start_mock_socks5
# -- helpers -----------------------------------------------------------------
async def _socks5_connect(
    host: str, port: int, target_host: str, target_port: int,
) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
    """Connect as a SOCKS5 client, perform greeting + CONNECT.

    Opens a TCP connection to the proxy at ``host:port``, offers the
    no-auth method only, then sends a CONNECT request for
    ``target_host:target_port`` and consumes the proxy's reply.

    Returns:
        The ``(reader, writer)`` pair, positioned right after the reply so
        the caller can exchange payload bytes immediately.

    Raises:
        AssertionError: the greeting reply was not "version 5, no-auth".
        ConnectionError: the reply code was non-zero, or the reply carried
            an address type other than 0x01, 0x03 or 0x04.
        asyncio.IncompleteReadError: the proxy closed the connection before
            a complete reply was received.

    On any failure the connection is closed before the exception propagates,
    so a failed handshake does not leak an open socket into the caller.
    """
    reader, writer = await asyncio.open_connection(host, port)
    try:
        # greeting: version 5, 1 method (no-auth)
        writer.write(b"\x05\x01\x00")
        await writer.drain()
        resp = await reader.readexactly(2)
        assert resp == b"\x05\x00", f"greeting failed: {resp!r}"

        # connect request: VER=5, CMD=1 (CONNECT), RSV=0, then ATYP/addr/port
        atyp, addr_bytes = encode_address(target_host)
        writer.write(
            struct.pack("!BBB", 0x05, 0x01, 0x00)
            + bytes([atyp])
            + addr_bytes
            + struct.pack("!H", target_port)
        )
        await writer.drain()

        # read reply: 3-byte header (VER, REP, RSV), ATYP, bound addr, port
        rep_header = await reader.readexactly(3)
        reply_code = rep_header[1]
        atyp_resp = (await reader.readexactly(1))[0]
        if atyp_resp == 0x01:
            await reader.readexactly(4)
        elif atyp_resp == 0x03:
            length = (await reader.readexactly(1))[0]
            await reader.readexactly(length)
        elif atyp_resp == 0x04:
            await reader.readexactly(16)
        else:
            # Without a known address length the rest of the reply cannot be
            # framed; guessing would leave stray bytes in the payload stream.
            raise ConnectionError(
                f"SOCKS5 reply={reply_code:#x} with unsupported atyp={atyp_resp:#x}"
            )
        await reader.readexactly(2)  # port

        if reply_code != 0x00:
            raise ConnectionError(f"SOCKS5 reply={reply_code:#x}")
    except BaseException:
        # Handshake failed (or was cancelled): release the socket, then
        # re-raise the original error unchanged.
        writer.close()
        try:
            await writer.wait_closed()
        except OSError:
            # The transport may already be broken; the original exception
            # is the one worth reporting.
            pass
        raise
    return reader, writer
async def _close_server(srv: asyncio.Server) -> None:
"""Close a server and wait."""
srv.close()
await srv.wait_closed()
# -- tests -------------------------------------------------------------------
class TestDirectNoChain:
    """Client -> s5p -> echo (empty chain)."""

    def test_echo(self):
        """Payload sent through s5p with no chain configured comes back unchanged."""

        async def _run():
            servers = []
            writer = None
            try:
                echo_host, echo_port, echo_srv = await start_echo_server()
                servers.append(echo_srv)

                # no `chain` argument: the listener is configured without hops
                listener = ListenerConfig(listen_host="127.0.0.1", listen_port=free_port())
                s5p_srv = await asyncio.start_server(
                    lambda r, w: _handle_client(r, w, listener, timeout=5.0, retries=1),
                    listener.listen_host, listener.listen_port,
                )
                servers.append(s5p_srv)
                await s5p_srv.start_serving()

                reader, writer = await _socks5_connect(
                    listener.listen_host, listener.listen_port, echo_host, echo_port,
                )
                writer.write(b"hello direct")
                await writer.drain()
                data = await asyncio.wait_for(reader.read(4096), timeout=2.0)
                assert data == b"hello direct"
            finally:
                # Close the client even when an assertion or timeout fired.
                # Since Python 3.12 Server.wait_closed() waits for active
                # connections, so a client left open could stall the server
                # shutdown below and turn a failure into a hang.
                try:
                    if writer is not None:
                        writer.close()
                        await writer.wait_closed()
                finally:
                    for s in servers:
                        await _close_server(s)

        asyncio.run(_run())
class TestSingleHop:
    """Client -> s5p -> mock socks5 -> echo."""

    def test_echo_through_one_hop(self):
        """Payload relayed through a single SOCKS5 chain hop comes back unchanged."""

        async def _run():
            servers = []
            writer = None
            try:
                echo_host, echo_port, echo_srv = await start_echo_server()
                servers.append(echo_srv)
                mock_host, mock_port, mock_srv = await start_mock_socks5()
                servers.append(mock_srv)

                listener = ListenerConfig(
                    listen_host="127.0.0.1",
                    listen_port=free_port(),
                    chain=[ChainHop(proto="socks5", host=mock_host, port=mock_port)],
                )
                s5p_srv = await asyncio.start_server(
                    lambda r, w: _handle_client(r, w, listener, timeout=5.0, retries=1),
                    listener.listen_host, listener.listen_port,
                )
                servers.append(s5p_srv)
                await s5p_srv.start_serving()

                reader, writer = await _socks5_connect(
                    listener.listen_host, listener.listen_port, echo_host, echo_port,
                )
                writer.write(b"hello one hop")
                await writer.drain()
                data = await asyncio.wait_for(reader.read(4096), timeout=2.0)
                assert data == b"hello one hop"
            finally:
                # Close the client even when an assertion or timeout fired.
                # Since Python 3.12 Server.wait_closed() waits for active
                # connections, so a client left open could stall the server
                # shutdown below and turn a failure into a hang.
                try:
                    if writer is not None:
                        writer.close()
                        await writer.wait_closed()
                finally:
                    for s in servers:
                        await _close_server(s)

        asyncio.run(_run())
class TestTwoHops:
    """Client -> s5p -> mock1 -> mock2 -> echo."""

    def test_echo_through_two_hops(self):
        """Payload relayed through two SOCKS5 chain hops comes back unchanged."""

        async def _run():
            servers = []
            writer = None
            try:
                echo_host, echo_port, echo_srv = await start_echo_server()
                servers.append(echo_srv)
                m1_host, m1_port, m1_srv = await start_mock_socks5()
                servers.append(m1_srv)
                m2_host, m2_port, m2_srv = await start_mock_socks5()
                servers.append(m2_srv)

                # hops are listed in the order mock1, mock2
                listener = ListenerConfig(
                    listen_host="127.0.0.1",
                    listen_port=free_port(),
                    chain=[
                        ChainHop(proto="socks5", host=m1_host, port=m1_port),
                        ChainHop(proto="socks5", host=m2_host, port=m2_port),
                    ],
                )
                s5p_srv = await asyncio.start_server(
                    lambda r, w: _handle_client(r, w, listener, timeout=5.0, retries=1),
                    listener.listen_host, listener.listen_port,
                )
                servers.append(s5p_srv)
                await s5p_srv.start_serving()

                reader, writer = await _socks5_connect(
                    listener.listen_host, listener.listen_port, echo_host, echo_port,
                )
                writer.write(b"hello two hops")
                await writer.drain()
                data = await asyncio.wait_for(reader.read(4096), timeout=2.0)
                assert data == b"hello two hops"
            finally:
                # Close the client even when an assertion or timeout fired.
                # Since Python 3.12 Server.wait_closed() waits for active
                # connections, so a client left open could stall the server
                # shutdown below and turn a failure into a hang.
                try:
                    if writer is not None:
                        writer.close()
                        await writer.wait_closed()
                finally:
                    for s in servers:
                        await _close_server(s)

        asyncio.run(_run())
class TestConnectionRefused:
    """Dead hop returns SOCKS5 error to client."""

    def test_refused(self):
        """A CONNECT through an unreachable hop yields a non-zero reply code."""

        async def _run():
            servers = []
            writer = None
            try:
                # use a port with nothing listening
                dead_port = free_port()
                listener = ListenerConfig(
                    listen_host="127.0.0.1",
                    listen_port=free_port(),
                    chain=[ChainHop(proto="socks5", host="127.0.0.1", port=dead_port)],
                )
                s5p_srv = await asyncio.start_server(
                    lambda r, w: _handle_client(r, w, listener, timeout=3.0, retries=1),
                    listener.listen_host, listener.listen_port,
                )
                servers.append(s5p_srv)
                await s5p_srv.start_serving()

                # The handshake is done by hand here because the raw reply
                # bytes are inspected below.
                reader, writer = await asyncio.open_connection(
                    listener.listen_host, listener.listen_port,
                )
                # greeting
                writer.write(b"\x05\x01\x00")
                await writer.drain()
                resp = await reader.readexactly(2)
                assert resp == b"\x05\x00"

                # connect to a dummy target
                atyp, addr_bytes = encode_address("127.0.0.1")
                writer.write(
                    struct.pack("!BBB", 0x05, 0x01, 0x00)
                    + bytes([atyp])
                    + addr_bytes
                    + struct.pack("!H", 9999)
                )
                await writer.drain()

                # should get error reply (non-zero rep field)
                rep = await asyncio.wait_for(reader.read(4096), timeout=5.0)
                assert len(rep) >= 3
                assert rep[1] != 0x00, "expected non-zero SOCKS5 reply code"
            finally:
                # Close the client even when an assertion or timeout fired.
                # Since Python 3.12 Server.wait_closed() waits for active
                # connections, so a client left open could stall the server
                # shutdown below and turn a failure into a hang.
                try:
                    if writer is not None:
                        writer.close()
                        await writer.wait_closed()
                finally:
                    for s in servers:
                        await _close_server(s)

        asyncio.run(_run())
class TestBypassDirectConnect:
    """Target matches bypass rule -> chain skipped, direct connect to echo."""

    def test_bypass_skips_chain(self):
        """Echo succeeds although the only configured hop is dead."""

        async def _run():
            servers = []
            writer = None
            try:
                echo_host, echo_port, echo_srv = await start_echo_server()
                servers.append(echo_srv)

                # dead hop -- would fail if bypass didn't skip it
                dead_port = free_port()
                listener = ListenerConfig(
                    listen_host="127.0.0.1",
                    listen_port=free_port(),
                    chain=[ChainHop(proto="socks5", host="127.0.0.1", port=dead_port)],
                    bypass=["127.0.0.0/8"],
                )
                s5p_srv = await asyncio.start_server(
                    lambda r, w: _handle_client(r, w, listener, timeout=5.0, retries=1),
                    listener.listen_host, listener.listen_port,
                )
                servers.append(s5p_srv)
                await s5p_srv.start_serving()

                reader, writer = await _socks5_connect(
                    listener.listen_host, listener.listen_port, echo_host, echo_port,
                )
                writer.write(b"hello bypass")
                await writer.drain()
                data = await asyncio.wait_for(reader.read(4096), timeout=2.0)
                assert data == b"hello bypass"
            finally:
                # Close the client even when an assertion or timeout fired.
                # Since Python 3.12 Server.wait_closed() waits for active
                # connections, so a client left open could stall the server
                # shutdown below and turn a failure into a hang.
                try:
                    if writer is not None:
                        writer.close()
                        await writer.wait_closed()
                finally:
                    for s in servers:
                        await _close_server(s)

        asyncio.run(_run())
class TestOnionChainOnly:
    """Onion target uses static chain only, pool hops skipped."""

    def test_onion_skips_pool(self):
        """A CONNECT to a .onion name must not ask the pool for a hop."""

        async def _run():
            from unittest.mock import AsyncMock, MagicMock

            servers = []
            writer = None
            try:
                # mock socks5 acts as the "Tor" hop
                mock_host, mock_port, mock_srv = await start_mock_socks5()
                servers.append(mock_srv)

                # fake pool that would add a dead hop if called
                dead_port = free_port()
                fake_pool = MagicMock()
                fake_pool.alive_count = 1
                fake_pool.get = AsyncMock(
                    return_value=ChainHop(
                        proto="socks5", host="127.0.0.1", port=dead_port,
                    ),
                )

                listener = ListenerConfig(
                    listen_host="127.0.0.1",
                    listen_port=free_port(),
                    chain=[ChainHop(proto="socks5", host=mock_host, port=mock_port)],
                    pool_seq=[["default"]],
                )
                s5p_srv = await asyncio.start_server(
                    lambda r, w: _handle_client(
                        r, w, listener, timeout=5.0, retries=1,
                        pool_seq=[[fake_pool]],
                    ),
                    listener.listen_host, listener.listen_port,
                )
                servers.append(s5p_srv)
                await s5p_srv.start_serving()

                # connect with .onion target -- mock socks5 will fail to
                # resolve it, but the key assertion is pool.get NOT called
                reader, writer = await asyncio.open_connection(
                    listener.listen_host, listener.listen_port,
                )
                # greeting
                writer.write(b"\x05\x01\x00")
                await writer.drain()
                resp = await reader.readexactly(2)
                assert resp == b"\x05\x00"

                atyp, addr_bytes = encode_address("fake.onion")
                writer.write(
                    struct.pack("!BBB", 0x05, 0x01, 0x00)
                    + bytes([atyp])
                    + addr_bytes
                    + struct.pack("!H", 80)
                )
                await writer.drain()

                # Wait for whatever s5p answers; the content is not checked,
                # only that the request has been processed.
                await asyncio.wait_for(reader.read(4096), timeout=3.0)

                # pool.get must NOT have been called (onion skips pool)
                fake_pool.get.assert_not_called()
            finally:
                # Close the client even when an assertion or timeout fired.
                # Since Python 3.12 Server.wait_closed() waits for active
                # connections, so a client left open could stall the server
                # shutdown below and turn a failure into a hang.
                try:
                    if writer is not None:
                        writer.close()
                        await writer.wait_closed()
                finally:
                    for s in servers:
                        await _close_server(s)

        asyncio.run(_run())