Per-listener bypass rules skip the chain for local/private destinations (CIDR, exact IP/hostname, domain suffix). Weighted multi-candidate pool selection biases toward pools with more alive proxies. End-to-end integration tests validate the full client->s5p->hop->target path using mock SOCKS5 proxies. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
139 lines
4.1 KiB
Python
139 lines
4.1 KiB
Python
"""Shared helpers for integration tests."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import socket
|
|
import struct
|
|
|
|
from s5p.proto import encode_address, read_socks5_address
|
|
|
|
|
|
def free_port() -> int:
|
|
"""Return an available TCP port."""
|
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
|
s.bind(("127.0.0.1", 0))
|
|
return s.getsockname()[1]
|
|
|
|
|
|
# -- echo server -------------------------------------------------------------
|
|
|
|
|
|
async def _echo_handler(
|
|
reader: asyncio.StreamReader, writer: asyncio.StreamWriter,
|
|
) -> None:
|
|
"""Echo back everything received, then close."""
|
|
try:
|
|
while True:
|
|
data = await reader.read(65536)
|
|
if not data:
|
|
break
|
|
writer.write(data)
|
|
await writer.drain()
|
|
except (ConnectionError, asyncio.CancelledError):
|
|
pass
|
|
finally:
|
|
writer.close()
|
|
await writer.wait_closed()
|
|
|
|
|
|
async def start_echo_server() -> tuple[str, int, asyncio.Server]:
|
|
"""Start a TCP echo server. Returns (host, port, server)."""
|
|
host = "127.0.0.1"
|
|
port = free_port()
|
|
srv = await asyncio.start_server(_echo_handler, host, port)
|
|
await srv.start_serving()
|
|
return host, port, srv
|
|
|
|
|
|
# -- mock SOCKS5 proxy -------------------------------------------------------
|
|
|
|
|
|
async def _mock_socks5_handler(
|
|
reader: asyncio.StreamReader, writer: asyncio.StreamWriter,
|
|
) -> None:
|
|
"""Minimal SOCKS5 proxy: greeting, CONNECT, relay."""
|
|
remote_writer = None
|
|
try:
|
|
# greeting
|
|
header = await reader.readexactly(2)
|
|
if header[0] != 0x05:
|
|
return
|
|
await reader.readexactly(header[1]) # skip methods
|
|
writer.write(b"\x05\x00")
|
|
await writer.drain()
|
|
|
|
# connect request
|
|
req = await reader.readexactly(3)
|
|
if req[0] != 0x05 or req[1] != 0x01:
|
|
return
|
|
|
|
target_host, target_port = await read_socks5_address(reader)
|
|
|
|
# connect to actual target
|
|
try:
|
|
remote_reader, remote_writer = await asyncio.wait_for(
|
|
asyncio.open_connection(target_host, target_port),
|
|
timeout=5.0,
|
|
)
|
|
except (OSError, TimeoutError):
|
|
# connection refused reply
|
|
reply = struct.pack("!BBB", 0x05, 0x05, 0x00)
|
|
reply += b"\x01\x00\x00\x00\x00\x00\x00"
|
|
writer.write(reply)
|
|
await writer.drain()
|
|
return
|
|
|
|
# success reply
|
|
atyp, addr_bytes = encode_address(target_host)
|
|
reply = struct.pack("!BBB", 0x05, 0x00, 0x00)
|
|
reply += bytes([atyp]) + addr_bytes + struct.pack("!H", target_port)
|
|
writer.write(reply)
|
|
await writer.drain()
|
|
|
|
# relay both directions (close dst on EOF so peer sees shutdown)
|
|
async def _fwd(src: asyncio.StreamReader, dst: asyncio.StreamWriter) -> None:
|
|
try:
|
|
while True:
|
|
data = await src.read(65536)
|
|
if not data:
|
|
break
|
|
dst.write(data)
|
|
await dst.drain()
|
|
except (ConnectionError, asyncio.CancelledError):
|
|
pass
|
|
finally:
|
|
try:
|
|
dst.close()
|
|
await dst.wait_closed()
|
|
except OSError:
|
|
pass
|
|
|
|
await asyncio.gather(
|
|
_fwd(reader, remote_writer),
|
|
_fwd(remote_reader, writer),
|
|
)
|
|
except (ConnectionError, asyncio.IncompleteReadError, asyncio.CancelledError):
|
|
pass
|
|
finally:
|
|
if remote_writer:
|
|
remote_writer.close()
|
|
try:
|
|
await remote_writer.wait_closed()
|
|
except OSError:
|
|
pass
|
|
writer.close()
|
|
try:
|
|
await writer.wait_closed()
|
|
except OSError:
|
|
pass
|
|
|
|
|
|
async def start_mock_socks5() -> tuple[str, int, asyncio.Server]:
|
|
"""Start a mock SOCKS5 proxy. Returns (host, port, server)."""
|
|
host = "127.0.0.1"
|
|
port = free_port()
|
|
srv = await asyncio.start_server(_mock_socks5_handler, host, port)
|
|
await srv.start_serving()
|
|
return host, port, srv
|