"""Shared helpers for integration tests.""" from __future__ import annotations import asyncio import socket import struct from s5p.proto import encode_address, read_socks5_address def free_port() -> int: """Return an available TCP port.""" with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("127.0.0.1", 0)) return s.getsockname()[1] # -- echo server ------------------------------------------------------------- async def _echo_handler( reader: asyncio.StreamReader, writer: asyncio.StreamWriter, ) -> None: """Echo back everything received, then close.""" try: while True: data = await reader.read(65536) if not data: break writer.write(data) await writer.drain() except (ConnectionError, asyncio.CancelledError): pass finally: writer.close() await writer.wait_closed() async def start_echo_server() -> tuple[str, int, asyncio.Server]: """Start a TCP echo server. Returns (host, port, server).""" host = "127.0.0.1" port = free_port() srv = await asyncio.start_server(_echo_handler, host, port) await srv.start_serving() return host, port, srv # -- mock SOCKS5 proxy ------------------------------------------------------- async def _mock_socks5_handler( reader: asyncio.StreamReader, writer: asyncio.StreamWriter, ) -> None: """Minimal SOCKS5 proxy: greeting, CONNECT, relay.""" remote_writer = None try: # greeting header = await reader.readexactly(2) if header[0] != 0x05: return await reader.readexactly(header[1]) # skip methods writer.write(b"\x05\x00") await writer.drain() # connect request req = await reader.readexactly(3) if req[0] != 0x05 or req[1] != 0x01: return target_host, target_port = await read_socks5_address(reader) # connect to actual target try: remote_reader, remote_writer = await asyncio.wait_for( asyncio.open_connection(target_host, target_port), timeout=5.0, ) except (OSError, TimeoutError): # connection refused reply reply = struct.pack("!BBB", 0x05, 0x05, 0x00) reply += b"\x01\x00\x00\x00\x00\x00\x00" writer.write(reply) await writer.drain() return # success reply atyp, addr_bytes = encode_address(target_host) reply = struct.pack("!BBB", 0x05, 0x00, 0x00) reply += bytes([atyp]) + addr_bytes + struct.pack("!H", target_port) writer.write(reply) await writer.drain() # relay both directions (close dst on EOF so peer sees shutdown) async def _fwd(src: asyncio.StreamReader, dst: asyncio.StreamWriter) -> None: try: while True: data = await src.read(65536) if not data: break dst.write(data) await dst.drain() except (ConnectionError, asyncio.CancelledError): pass finally: try: dst.close() await dst.wait_closed() except OSError: pass await asyncio.gather( _fwd(reader, remote_writer), _fwd(remote_reader, writer), ) except (ConnectionError, asyncio.IncompleteReadError, asyncio.CancelledError): pass finally: if remote_writer: remote_writer.close() try: await remote_writer.wait_closed() except OSError: pass writer.close() try: await writer.wait_closed() except OSError: pass async def start_mock_socks5() -> tuple[str, int, asyncio.Server]: """Start a mock SOCKS5 proxy. Returns (host, port, server).""" host = "127.0.0.1" port = free_port() srv = await asyncio.start_server(_mock_socks5_handler, host, port) await srv.start_serving() return host, port, srv