Files
s5p/tests/conftest.py
user c191942712 feat: add bypass rules, weighted pool selection, integration tests
Per-listener bypass rules skip the chain for local/private destinations
(CIDR, exact IP/hostname, domain suffix). Weighted multi-candidate pool
selection biases toward pools with more alive proxies. End-to-end
integration tests validate the full client->s5p->hop->target path using
mock SOCKS5 proxies.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-20 19:58:12 +01:00

139 lines
4.1 KiB
Python

"""Shared helpers for integration tests."""
from __future__ import annotations
import asyncio
import socket
import struct
from s5p.proto import encode_address, read_socks5_address
def free_port() -> int:
    """Ask the OS for a currently unused TCP port on 127.0.0.1.

    A throwaway IPv4 stream socket is bound to port 0 so the kernel picks
    the port; the socket is closed again before returning, so the number is
    only a hint -- nothing keeps the port reserved for the caller.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        probe.bind(("127.0.0.1", 0))
        # getsockname() yields (host, port) for AF_INET sockets.
        bound_address = probe.getsockname()
        return bound_address[1]
    finally:
        probe.close()
# -- echo server -------------------------------------------------------------
async def _echo_handler(
    reader: asyncio.StreamReader, writer: asyncio.StreamWriter,
) -> None:
    """Echo back everything received, then close.

    Reads chunks of up to 64 KiB from *reader* and writes each one straight
    back to *writer* until the peer sends EOF. Connection errors and task
    cancellation end the loop silently; the writer is closed in every case.
    """
    try:
        while True:
            data = await reader.read(65536)
            if not data:
                break  # empty read means the peer sent EOF
            writer.write(data)
            await writer.drain()
    except (ConnectionError, asyncio.CancelledError):
        pass
    finally:
        writer.close()
        # wait_closed() re-raises the error the transport was lost with
        # (e.g. a connection reset); swallow it so teardown stays quiet.
        try:
            await writer.wait_closed()
        except OSError:
            pass
async def start_echo_server() -> tuple[str, int, asyncio.Server]:
    """Start a TCP echo server. Returns (host, port, server).

    The listener is bound to 127.0.0.1 on an OS-assigned port. The server
    is returned while still running; it is not closed here.
    """
    host = "127.0.0.1"
    # Bind to port 0 so the kernel assigns the port in the same step as the
    # bind. Probing for a free port first and re-binding it afterwards
    # leaves a window in which another process can take the port.
    # start_server() begins accepting connections by default.
    srv = await asyncio.start_server(_echo_handler, host, 0)
    port = srv.sockets[0].getsockname()[1]
    return host, port, srv
# -- mock SOCKS5 proxy -------------------------------------------------------
async def _mock_socks5_handler(
    reader: asyncio.StreamReader, writer: asyncio.StreamWriter,
) -> None:
    """Minimal SOCKS5 proxy: greeting, CONNECT, relay.

    Handles a single client connection in four steps:

    1. Greeting -- reads the 2-byte header, discards the offered methods and
       answers ``05 00``.
    2. Request -- accepts only version 0x05 with command 0x01 and reads the
       destination via ``read_socks5_address``.
    3. Connect -- opens a TCP connection to the destination with a 5 second
       timeout. On failure it replies with reply code 0x05 and a zeroed
       IPv4 address/port, then stops.
    4. Relay -- copies bytes in both directions until both copy loops end.

    A wrong version/command or truncated input ends the handler without an
    error reply. Both stream writers are closed on every exit path.
    """
    remote_writer = None
    try:
        # greeting: version byte, method count, then that many method bytes
        header = await reader.readexactly(2)
        if header[0] != 0x05:
            return
        await reader.readexactly(header[1])  # skip methods
        writer.write(b"\x05\x00")
        await writer.drain()

        # connect request: version, command, reserved (address follows)
        req = await reader.readexactly(3)
        if req[0] != 0x05 or req[1] != 0x01:
            return
        target_host, target_port = await read_socks5_address(reader)

        # connect to actual target
        try:
            remote_reader, remote_writer = await asyncio.wait_for(
                asyncio.open_connection(target_host, target_port),
                timeout=5.0,
            )
        except (OSError, asyncio.TimeoutError):
            # asyncio.TimeoutError only became an alias of the builtin
            # TimeoutError in Python 3.11; naming the asyncio class makes
            # the timeout path work on older interpreters as well.
            # connection refused reply
            reply = struct.pack("!BBB", 0x05, 0x05, 0x00)
            reply += b"\x01\x00\x00\x00\x00\x00\x00"
            writer.write(reply)
            await writer.drain()
            return

        # success reply
        atyp, addr_bytes = encode_address(target_host)
        reply = struct.pack("!BBB", 0x05, 0x00, 0x00)
        reply += bytes([atyp]) + addr_bytes + struct.pack("!H", target_port)
        writer.write(reply)
        await writer.drain()

        # relay both directions (close dst on EOF so peer sees shutdown)
        async def _fwd(
            src: asyncio.StreamReader, dst: asyncio.StreamWriter,
        ) -> None:
            try:
                while True:
                    data = await src.read(65536)
                    if not data:
                        break
                    dst.write(data)
                    await dst.drain()
            except (ConnectionError, asyncio.CancelledError):
                pass
            finally:
                try:
                    dst.close()
                    await dst.wait_closed()
                except OSError:
                    pass

        await asyncio.gather(
            _fwd(reader, remote_writer),
            _fwd(remote_reader, writer),
        )
    except (ConnectionError, asyncio.IncompleteReadError, asyncio.CancelledError):
        pass
    finally:
        # Close whichever streams exist; errors during close are ignored.
        if remote_writer:
            remote_writer.close()
            try:
                await remote_writer.wait_closed()
            except OSError:
                pass
        writer.close()
        try:
            await writer.wait_closed()
        except OSError:
            pass
async def start_mock_socks5() -> tuple[str, int, asyncio.Server]:
    """Start a mock SOCKS5 proxy. Returns (host, port, server).

    The listener is bound to 127.0.0.1 on an OS-assigned port. The server
    is returned while still running; it is not closed here.
    """
    host = "127.0.0.1"
    # Bind to port 0 so the kernel assigns the port in the same step as the
    # bind. Probing for a free port first and re-binding it afterwards
    # leaves a window in which another process can take the port.
    # start_server() begins accepting connections by default.
    srv = await asyncio.start_server(_mock_socks5_handler, host, 0)
    port = srv.sockets[0].getsockname()[1]
    return host, port, srv