Per-listener username/password auth via `auth:` config key. When set, clients must negotiate method 0x02 and pass RFC 1929 subnegotiation; no-auth (0x00) is rejected to prevent downgrade. Listeners without `auth` keep current no-auth behavior. Includes auth_failures metric, API integration (/status auth flag, /config auth_users count without exposing passwords), config parsing with YAML int coercion, integration tests (success, failure, method rejection, no-auth unchanged), and documentation updates. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
579 lines
20 KiB
Python
579 lines
20 KiB
Python
"""End-to-end integration tests with mock SOCKS5 proxies."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import struct
|
|
|
|
from s5p.config import ChainHop, ListenerConfig
|
|
from s5p.proto import encode_address
|
|
from s5p.server import _handle_client
|
|
|
|
from .conftest import free_port, start_echo_server, start_mock_socks5
|
|
|
|
# -- helpers -----------------------------------------------------------------
|
|
|
|
|
|
async def _socks5_connect(
    host: str, port: int, target_host: str, target_port: int,
) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
    """Connect as a SOCKS5 client, perform greeting + CONNECT.

    Opens a TCP connection to ``host:port``, offers only the no-auth
    method (0x00), then sends a CONNECT request for
    ``target_host:target_port`` and consumes the complete reply.

    Returns:
        The ``(reader, writer)`` pair of the negotiated connection.

    Raises:
        AssertionError: the server did not answer the greeting with
            ``05 00``.
        ConnectionError: the CONNECT reply carried a non-zero REP code.

    On every failure path (assertion, short read, error reply,
    cancellation) the socket is closed before the exception propagates,
    so a failed handshake never leaves a connection open.
    """
    reader, writer = await asyncio.open_connection(host, port)
    try:
        # greeting: version 5, 1 method (no-auth)
        writer.write(b"\x05\x01\x00")
        await writer.drain()
        resp = await reader.readexactly(2)
        assert resp == b"\x05\x00", f"greeting failed: {resp!r}"

        # connect request: VER=5, CMD=1 (CONNECT), RSV=0, ATYP, ADDR, PORT
        atyp, addr_bytes = encode_address(target_host)
        writer.write(
            struct.pack("!BBB", 0x05, 0x01, 0x00)
            + bytes([atyp])
            + addr_bytes
            + struct.pack("!H", target_port)
        )
        await writer.drain()

        # read reply: 3 header bytes (REP is the second), then the bound
        # address whose length depends on ATYP
        rep_header = await reader.readexactly(3)
        atyp_resp = (await reader.readexactly(1))[0]
        if atyp_resp == 0x01:
            await reader.readexactly(4)
        elif atyp_resp == 0x03:
            length = (await reader.readexactly(1))[0]
            await reader.readexactly(length)
        elif atyp_resp == 0x04:
            await reader.readexactly(16)
        await reader.readexactly(2)  # port

        if rep_header[1] != 0x00:
            raise ConnectionError(f"SOCKS5 reply={rep_header[1]:#x}")
    except BaseException:
        # Do not leak the socket when the handshake fails part-way.
        writer.close()
        try:
            await writer.wait_closed()
        except OSError:
            # Peer already tore the connection down; the handshake
            # failure being re-raised below is the interesting error.
            pass
        raise

    return reader, writer
|
|
|
|
|
|
async def _socks5_connect_auth(
    host: str, port: int, target_host: str, target_port: int,
    username: str, password: str,
) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
    """Connect as a SOCKS5 client with username/password auth (RFC 1929).

    Opens a TCP connection to ``host:port``, offers only the
    username/password method (0x02), runs the subnegotiation with the
    UTF-8 encoded ``username`` / ``password``, then sends a CONNECT
    request for ``target_host:target_port`` and consumes the reply.

    Returns:
        The ``(reader, writer)`` pair of the negotiated connection.

    Raises:
        AssertionError: the server did not answer the greeting with
            ``05 02``.
        ConnectionError: the auth status byte or the CONNECT REP code
            was non-zero.

    On every failure path (assertion, short read, auth or CONNECT
    error, cancellation) the socket is closed before the exception
    propagates, so a failed handshake never leaves a connection open.
    """
    reader, writer = await asyncio.open_connection(host, port)
    try:
        # greeting: version 5, 1 method (user/pass)
        writer.write(b"\x05\x01\x02")
        await writer.drain()
        resp = await reader.readexactly(2)
        assert resp == b"\x05\x02", f"greeting failed: {resp!r}"

        # subnegotiation: VER=1, ULEN, UNAME, PLEN, PASSWD
        uname = username.encode("utf-8")
        passwd = password.encode("utf-8")
        writer.write(
            b"\x01"
            + bytes([len(uname)]) + uname
            + bytes([len(passwd)]) + passwd
        )
        await writer.drain()
        auth_resp = await reader.readexactly(2)
        if auth_resp[1] != 0x00:
            raise ConnectionError(f"auth failed: status={auth_resp[1]:#x}")

        # connect request: VER=5, CMD=1 (CONNECT), RSV=0, ATYP, ADDR, PORT
        atyp, addr_bytes = encode_address(target_host)
        writer.write(
            struct.pack("!BBB", 0x05, 0x01, 0x00)
            + bytes([atyp])
            + addr_bytes
            + struct.pack("!H", target_port)
        )
        await writer.drain()

        # read reply: 3 header bytes (REP is the second), then the bound
        # address whose length depends on ATYP
        rep_header = await reader.readexactly(3)
        atyp_resp = (await reader.readexactly(1))[0]
        if atyp_resp == 0x01:
            await reader.readexactly(4)
        elif atyp_resp == 0x03:
            length = (await reader.readexactly(1))[0]
            await reader.readexactly(length)
        elif atyp_resp == 0x04:
            await reader.readexactly(16)
        await reader.readexactly(2)  # port

        if rep_header[1] != 0x00:
            raise ConnectionError(f"SOCKS5 reply={rep_header[1]:#x}")
    except BaseException:
        # Do not leak the socket when the handshake fails part-way.
        writer.close()
        try:
            await writer.wait_closed()
        except OSError:
            # Peer already tore the connection down; the handshake
            # failure being re-raised below is the interesting error.
            pass
        raise

    return reader, writer
|
|
|
|
|
|
async def _close_server(srv: asyncio.Server) -> None:
    """Shut down *srv* and return once its shutdown has completed."""
    # close() only initiates the shutdown; wait_closed() is what the
    # caller actually blocks on.
    srv.close()
    await srv.wait_closed()
|
|
|
|
|
|
# -- tests -------------------------------------------------------------------
|
|
|
|
|
|
class TestDirectNoChain:
    """Client -> s5p -> echo (empty chain)."""

    def test_echo(self):
        """Payload sent through a chain-less listener is echoed back."""
        async def _run():
            servers = []
            writer = None
            try:
                echo_host, echo_port, echo_srv = await start_echo_server()
                servers.append(echo_srv)

                listener = ListenerConfig(listen_host="127.0.0.1", listen_port=free_port())
                s5p_srv = await asyncio.start_server(
                    lambda r, w: _handle_client(r, w, listener, timeout=5.0, retries=1),
                    listener.listen_host, listener.listen_port,
                )
                servers.append(s5p_srv)
                await s5p_srv.start_serving()

                reader, writer = await _socks5_connect(
                    listener.listen_host, listener.listen_port, echo_host, echo_port,
                )
                writer.write(b"hello direct")
                await writer.drain()
                data = await asyncio.wait_for(reader.read(4096), timeout=2.0)
                assert data == b"hello direct"
            finally:
                # Close the client even when an assertion or timeout fired:
                # Server.wait_closed() (Python 3.12+) waits for active
                # connections, so a client left open would stall teardown.
                try:
                    if writer is not None:
                        writer.close()
                        await writer.wait_closed()
                finally:
                    for s in servers:
                        await _close_server(s)

        asyncio.run(_run())
|
|
|
|
|
|
class TestSingleHop:
    """Client -> s5p -> mock socks5 -> echo."""

    def test_echo_through_one_hop(self):
        """Payload relayed through a one-hop chain is echoed back."""
        async def _run():
            servers = []
            writer = None
            try:
                echo_host, echo_port, echo_srv = await start_echo_server()
                servers.append(echo_srv)
                mock_host, mock_port, mock_srv = await start_mock_socks5()
                servers.append(mock_srv)

                listener = ListenerConfig(
                    listen_host="127.0.0.1",
                    listen_port=free_port(),
                    chain=[ChainHop(proto="socks5", host=mock_host, port=mock_port)],
                )
                s5p_srv = await asyncio.start_server(
                    lambda r, w: _handle_client(r, w, listener, timeout=5.0, retries=1),
                    listener.listen_host, listener.listen_port,
                )
                servers.append(s5p_srv)
                await s5p_srv.start_serving()

                reader, writer = await _socks5_connect(
                    listener.listen_host, listener.listen_port, echo_host, echo_port,
                )
                writer.write(b"hello one hop")
                await writer.drain()
                data = await asyncio.wait_for(reader.read(4096), timeout=2.0)
                assert data == b"hello one hop"
            finally:
                # Close the client even when an assertion or timeout fired:
                # Server.wait_closed() (Python 3.12+) waits for active
                # connections, so a client left open would stall teardown.
                try:
                    if writer is not None:
                        writer.close()
                        await writer.wait_closed()
                finally:
                    for s in servers:
                        await _close_server(s)

        asyncio.run(_run())
|
|
|
|
|
|
class TestTwoHops:
    """Client -> s5p -> mock1 -> mock2 -> echo."""

    def test_echo_through_two_hops(self):
        """Payload relayed through a two-hop chain is echoed back."""
        async def _run():
            servers = []
            writer = None
            try:
                echo_host, echo_port, echo_srv = await start_echo_server()
                servers.append(echo_srv)
                m1_host, m1_port, m1_srv = await start_mock_socks5()
                servers.append(m1_srv)
                m2_host, m2_port, m2_srv = await start_mock_socks5()
                servers.append(m2_srv)

                listener = ListenerConfig(
                    listen_host="127.0.0.1",
                    listen_port=free_port(),
                    chain=[
                        ChainHop(proto="socks5", host=m1_host, port=m1_port),
                        ChainHop(proto="socks5", host=m2_host, port=m2_port),
                    ],
                )
                s5p_srv = await asyncio.start_server(
                    lambda r, w: _handle_client(r, w, listener, timeout=5.0, retries=1),
                    listener.listen_host, listener.listen_port,
                )
                servers.append(s5p_srv)
                await s5p_srv.start_serving()

                reader, writer = await _socks5_connect(
                    listener.listen_host, listener.listen_port, echo_host, echo_port,
                )
                writer.write(b"hello two hops")
                await writer.drain()
                data = await asyncio.wait_for(reader.read(4096), timeout=2.0)
                assert data == b"hello two hops"
            finally:
                # Close the client even when an assertion or timeout fired:
                # Server.wait_closed() (Python 3.12+) waits for active
                # connections, so a client left open would stall teardown.
                try:
                    if writer is not None:
                        writer.close()
                        await writer.wait_closed()
                finally:
                    for s in servers:
                        await _close_server(s)

        asyncio.run(_run())
|
|
|
|
|
|
class TestConnectionRefused:
    """Dead hop returns SOCKS5 error to client."""

    def test_refused(self):
        """CONNECT through a chain whose only hop is down yields a non-zero REP."""
        async def _run():
            servers = []
            writer = None
            try:
                # use a port with nothing listening
                dead_port = free_port()

                listener = ListenerConfig(
                    listen_host="127.0.0.1",
                    listen_port=free_port(),
                    chain=[ChainHop(proto="socks5", host="127.0.0.1", port=dead_port)],
                )
                s5p_srv = await asyncio.start_server(
                    lambda r, w: _handle_client(r, w, listener, timeout=3.0, retries=1),
                    listener.listen_host, listener.listen_port,
                )
                servers.append(s5p_srv)
                await s5p_srv.start_serving()

                reader, writer = await asyncio.open_connection(
                    listener.listen_host, listener.listen_port,
                )
                # greeting
                writer.write(b"\x05\x01\x00")
                await writer.drain()
                resp = await reader.readexactly(2)
                assert resp == b"\x05\x00"

                # connect to a dummy target
                atyp, addr_bytes = encode_address("127.0.0.1")
                writer.write(
                    struct.pack("!BBB", 0x05, 0x01, 0x00)
                    + bytes([atyp])
                    + addr_bytes
                    + struct.pack("!H", 9999)
                )
                await writer.drain()

                # should get error reply (non-zero rep field)
                rep = await asyncio.wait_for(reader.read(4096), timeout=5.0)
                assert len(rep) >= 3
                assert rep[1] != 0x00, "expected non-zero SOCKS5 reply code"
            finally:
                # Close the client even when an assertion or timeout fired:
                # Server.wait_closed() (Python 3.12+) waits for active
                # connections, so a client left open would stall teardown.
                try:
                    if writer is not None:
                        writer.close()
                        await writer.wait_closed()
                finally:
                    for s in servers:
                        await _close_server(s)

        asyncio.run(_run())
|
|
|
|
|
|
class TestBypassDirectConnect:
    """Target matches bypass rule -> chain skipped, direct connect to echo."""

    def test_bypass_skips_chain(self):
        """Echo succeeds although the only chain hop is dead, because the target is bypassed."""
        async def _run():
            servers = []
            writer = None
            try:
                echo_host, echo_port, echo_srv = await start_echo_server()
                servers.append(echo_srv)

                # dead hop -- would fail if bypass didn't skip it
                dead_port = free_port()

                listener = ListenerConfig(
                    listen_host="127.0.0.1",
                    listen_port=free_port(),
                    chain=[ChainHop(proto="socks5", host="127.0.0.1", port=dead_port)],
                    bypass=["127.0.0.0/8"],
                )
                s5p_srv = await asyncio.start_server(
                    lambda r, w: _handle_client(r, w, listener, timeout=5.0, retries=1),
                    listener.listen_host, listener.listen_port,
                )
                servers.append(s5p_srv)
                await s5p_srv.start_serving()

                reader, writer = await _socks5_connect(
                    listener.listen_host, listener.listen_port, echo_host, echo_port,
                )
                writer.write(b"hello bypass")
                await writer.drain()
                data = await asyncio.wait_for(reader.read(4096), timeout=2.0)
                assert data == b"hello bypass"
            finally:
                # Close the client even when an assertion or timeout fired:
                # Server.wait_closed() (Python 3.12+) waits for active
                # connections, so a client left open would stall teardown.
                try:
                    if writer is not None:
                        writer.close()
                        await writer.wait_closed()
                finally:
                    for s in servers:
                        await _close_server(s)

        asyncio.run(_run())
|
|
|
|
|
|
class TestOnionChainOnly:
    """Onion target uses static chain only, pool hops skipped."""

    def test_onion_skips_pool(self):
        """A CONNECT to a ``.onion`` name must not draw a hop from the pool."""
        async def _run():
            servers = []
            writer = None
            try:
                # mock socks5 acts as the "Tor" hop
                mock_host, mock_port, mock_srv = await start_mock_socks5()
                servers.append(mock_srv)

                # fake pool that would add a dead hop if called
                from unittest.mock import AsyncMock, MagicMock

                dead_port = free_port()
                fake_pool = MagicMock()
                fake_pool.alive_count = 1
                fake_pool.get = AsyncMock(
                    return_value=ChainHop(
                        proto="socks5", host="127.0.0.1", port=dead_port,
                    ),
                )

                listener = ListenerConfig(
                    listen_host="127.0.0.1",
                    listen_port=free_port(),
                    chain=[ChainHop(proto="socks5", host=mock_host, port=mock_port)],
                    pool_seq=[["default"]],
                )
                s5p_srv = await asyncio.start_server(
                    lambda r, w: _handle_client(
                        r, w, listener, timeout=5.0, retries=1,
                        pool_seq=[[fake_pool]],
                    ),
                    listener.listen_host, listener.listen_port,
                )
                servers.append(s5p_srv)
                await s5p_srv.start_serving()

                # connect with .onion target -- mock socks5 will fail to
                # resolve it, but the key assertion is pool.get NOT called
                reader, writer = await asyncio.open_connection(
                    listener.listen_host, listener.listen_port,
                )
                writer.write(b"\x05\x01\x00")
                await writer.drain()
                await reader.readexactly(2)

                atyp, addr_bytes = encode_address("fake.onion")
                writer.write(
                    struct.pack("!BBB", 0x05, 0x01, 0x00)
                    + bytes([atyp])
                    + addr_bytes
                    + struct.pack("!H", 80)
                )
                await writer.drain()
                await asyncio.wait_for(reader.read(4096), timeout=3.0)

                # same ordering as before: close the client, then inspect
                # the mock
                writer.close()
                await writer.wait_closed()

                # pool.get must NOT have been called (onion skips pool)
                fake_pool.get.assert_not_called()
            finally:
                # Close the client even when a read failed or timed out
                # (closing twice is harmless): Server.wait_closed()
                # (Python 3.12+) waits for active connections, so a client
                # left open would stall teardown.
                try:
                    if writer is not None:
                        writer.close()
                        await writer.wait_closed()
                finally:
                    for s in servers:
                        await _close_server(s)

        asyncio.run(_run())
|
|
|
|
|
|
class TestAuthSuccess:
    """Authenticate with valid credentials, relay echo data."""

    def test_auth_echo(self):
        """Valid username/password passes auth and the payload is echoed back."""
        async def _run():
            servers = []
            writer = None
            try:
                echo_host, echo_port, echo_srv = await start_echo_server()
                servers.append(echo_srv)

                listener = ListenerConfig(
                    listen_host="127.0.0.1",
                    listen_port=free_port(),
                    auth={"alice": "s3cret"},
                )
                s5p_srv = await asyncio.start_server(
                    lambda r, w: _handle_client(r, w, listener, timeout=5.0, retries=1),
                    listener.listen_host, listener.listen_port,
                )
                servers.append(s5p_srv)
                await s5p_srv.start_serving()

                reader, writer = await _socks5_connect_auth(
                    listener.listen_host, listener.listen_port,
                    echo_host, echo_port, "alice", "s3cret",
                )
                writer.write(b"hello auth")
                await writer.drain()
                data = await asyncio.wait_for(reader.read(4096), timeout=2.0)
                assert data == b"hello auth"
            finally:
                # Close the client even when an assertion or timeout fired:
                # Server.wait_closed() (Python 3.12+) waits for active
                # connections, so a client left open would stall teardown.
                try:
                    if writer is not None:
                        writer.close()
                        await writer.wait_closed()
                finally:
                    for s in servers:
                        await _close_server(s)

        asyncio.run(_run())
|
|
|
|
|
|
class TestAuthFailure:
    """Wrong password returns auth failure response."""

    def test_wrong_password(self):
        """Subnegotiation with a bad password is answered with ``01 01``."""
        async def _run():
            servers = []
            writer = None
            try:
                listener = ListenerConfig(
                    listen_host="127.0.0.1",
                    listen_port=free_port(),
                    auth={"alice": "s3cret"},
                )
                s5p_srv = await asyncio.start_server(
                    lambda r, w: _handle_client(r, w, listener, timeout=5.0, retries=1),
                    listener.listen_host, listener.listen_port,
                )
                servers.append(s5p_srv)
                await s5p_srv.start_serving()

                reader, writer = await asyncio.open_connection(
                    listener.listen_host, listener.listen_port,
                )
                # greeting with auth method
                writer.write(b"\x05\x01\x02")
                await writer.drain()
                resp = await reader.readexactly(2)
                assert resp == b"\x05\x02"

                # subnegotiation with wrong password
                uname = b"alice"
                passwd = b"wrong"
                writer.write(
                    b"\x01"
                    + bytes([len(uname)]) + uname
                    + bytes([len(passwd)]) + passwd
                )
                await writer.drain()
                auth_resp = await reader.readexactly(2)
                assert auth_resp == b"\x01\x01", f"expected auth failure, got {auth_resp!r}"
            finally:
                # Close the client even when an assertion or short read
                # fired: Server.wait_closed() (Python 3.12+) waits for active
                # connections, so a client left open would stall teardown.
                try:
                    if writer is not None:
                        writer.close()
                        await writer.wait_closed()
                finally:
                    for s in servers:
                        await _close_server(s)

        asyncio.run(_run())
|
|
|
|
|
|
class TestAuthMethodNotOffered:
    """Client offers only no-auth when auth is required -> 0xFF rejection."""

    def test_no_auth_method_rejected(self):
        """A no-auth-only greeting to an auth listener is answered with ``05 ff``."""
        async def _run():
            servers = []
            writer = None
            try:
                listener = ListenerConfig(
                    listen_host="127.0.0.1",
                    listen_port=free_port(),
                    auth={"alice": "s3cret"},
                )
                s5p_srv = await asyncio.start_server(
                    lambda r, w: _handle_client(r, w, listener, timeout=5.0, retries=1),
                    listener.listen_host, listener.listen_port,
                )
                servers.append(s5p_srv)
                await s5p_srv.start_serving()

                reader, writer = await asyncio.open_connection(
                    listener.listen_host, listener.listen_port,
                )
                # greeting with only no-auth method (0x00)
                writer.write(b"\x05\x01\x00")
                await writer.drain()
                resp = await reader.readexactly(2)
                assert resp == b"\x05\xff", f"expected method rejection, got {resp!r}"
            finally:
                # Close the client even when an assertion or short read
                # fired: Server.wait_closed() (Python 3.12+) waits for active
                # connections, so a client left open would stall teardown.
                try:
                    if writer is not None:
                        writer.close()
                        await writer.wait_closed()
                finally:
                    for s in servers:
                        await _close_server(s)

        asyncio.run(_run())
|
|
|
|
|
|
class TestNoAuthListenerUnchanged:
    """No auth configured -- 0x00 still works as before."""

    def test_no_auth_still_works(self):
        """A listener without ``auth`` accepts the no-auth method and relays the echo."""
        async def _run():
            servers = []
            writer = None
            try:
                echo_host, echo_port, echo_srv = await start_echo_server()
                servers.append(echo_srv)

                listener = ListenerConfig(
                    listen_host="127.0.0.1",
                    listen_port=free_port(),
                )
                s5p_srv = await asyncio.start_server(
                    lambda r, w: _handle_client(r, w, listener, timeout=5.0, retries=1),
                    listener.listen_host, listener.listen_port,
                )
                servers.append(s5p_srv)
                await s5p_srv.start_serving()

                reader, writer = await _socks5_connect(
                    listener.listen_host, listener.listen_port, echo_host, echo_port,
                )
                writer.write(b"hello no auth")
                await writer.drain()
                data = await asyncio.wait_for(reader.read(4096), timeout=2.0)
                assert data == b"hello no auth"
            finally:
                # Close the client even when an assertion or timeout fired:
                # Server.wait_closed() (Python 3.12+) waits for active
                # connections, so a client left open would stall teardown.
                try:
                    if writer is not None:
                        writer.close()
                        await writer.wait_closed()
                finally:
                    for s in servers:
                        await _close_server(s)

        asyncio.run(_run())
|