diff --git a/src/s5p/proto.py b/src/s5p/proto.py index 4beeab0..13e2901 100644 --- a/src/s5p/proto.py +++ b/src/s5p/proto.py @@ -4,10 +4,15 @@ from __future__ import annotations import asyncio import base64 +import logging import socket import struct from enum import IntEnum +from .config import ChainHop + +logger = logging.getLogger("s5p") + class Socks5Reply(IntEnum): """SOCKS5 reply codes (RFC 1928).""" @@ -181,3 +186,74 @@ async def http_connect( header_line = await reader.readline() if header_line in (b"\r\n", b"\n", b""): break + + +# -- chain building ---------------------------------------------------------- + + +async def _negotiate_hop( + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter, + hop: ChainHop, + dest_host: str, + dest_port: int, +) -> None: + """Negotiate a single hop in the chain.""" + if hop.proto == "socks5": + await socks5_connect(reader, writer, dest_host, dest_port, hop.username, hop.password) + elif hop.proto == "socks4": + await socks4_connect(reader, writer, dest_host, dest_port) + elif hop.proto == "http": + await http_connect(reader, writer, dest_host, dest_port, hop.username, hop.password) + else: + raise ProtoError(f"unsupported protocol: {hop.proto}") + + +async def build_chain( + chain: list[ChainHop], + target_host: str, + target_port: int, + timeout: float = 10.0, +) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]: + """Build a tunnel through the proxy chain to the target. + + Connects to the first hop via TCP, then negotiates each subsequent + hop over the tunnel established by the previous one. + """ + if not chain: + return await asyncio.wait_for( + asyncio.open_connection(target_host, target_port), + timeout=timeout, + ) + + reader, writer = await asyncio.wait_for( + asyncio.open_connection(chain[0].host, chain[0].port), + timeout=timeout, + ) + + try: + for i, hop in enumerate(chain): + if i + 1 < len(chain): + dest_host = chain[i + 1].host + dest_port = chain[i + 1].port + else: + dest_host = target_host + dest_port = target_port + + await asyncio.wait_for( + _negotiate_hop(reader, writer, hop, dest_host, dest_port), + timeout=timeout, + ) + logger.debug( + "hop %d/%d ok %s -> %s:%d", + i + 1, + len(chain), + hop.proto, + dest_host, + dest_port, + ) + except Exception: + writer.close() + raise + + return reader, writer diff --git a/src/s5p/server.py b/src/s5p/server.py index f92fb14..050677f 100644 --- a/src/s5p/server.py +++ b/src/s5p/server.py @@ -8,17 +8,10 @@ import signal import struct import time -from .config import ChainHop, Config +from .config import Config from .metrics import Metrics +from .proto import ProtoError, Socks5Reply, build_chain, read_socks5_address from .source import ProxySource -from .proto import ( - ProtoError, - Socks5Reply, - http_connect, - read_socks5_address, - socks4_connect, - socks5_connect, -) logger = logging.getLogger("s5p") @@ -53,77 +46,6 @@ async def _relay( return total -# -- chain building ---------------------------------------------------------- - - -async def _negotiate_hop( - reader: asyncio.StreamReader, - writer: asyncio.StreamWriter, - hop: ChainHop, - dest_host: str, - dest_port: int, -) -> None: - """Negotiate a single hop in the chain.""" - if hop.proto == "socks5": - await socks5_connect(reader, writer, dest_host, dest_port, hop.username, hop.password) - elif hop.proto == "socks4": - await socks4_connect(reader, writer, dest_host, dest_port) - elif hop.proto == "http": - await http_connect(reader, writer, dest_host, dest_port, hop.username, hop.password) - else: - raise ProtoError(f"unsupported protocol: {hop.proto}") - - -async def build_chain( - chain: list[ChainHop], - target_host: str, - target_port: int, - timeout: float = 10.0, -) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]: - """Build a tunnel through the proxy chain to the target. - - Connects to the first hop via TCP, then negotiates each subsequent - hop over the tunnel established by the previous one. - """ - if not chain: - return await asyncio.wait_for( - asyncio.open_connection(target_host, target_port), - timeout=timeout, - ) - - reader, writer = await asyncio.wait_for( - asyncio.open_connection(chain[0].host, chain[0].port), - timeout=timeout, - ) - - try: - for i, hop in enumerate(chain): - if i + 1 < len(chain): - dest_host = chain[i + 1].host - dest_port = chain[i + 1].port - else: - dest_host = target_host - dest_port = target_port - - await asyncio.wait_for( - _negotiate_hop(reader, writer, hop, dest_host, dest_port), - timeout=timeout, - ) - logger.debug( - "hop %d/%d ok %s -> %s:%d", - i + 1, - len(chain), - hop.proto, - dest_host, - dest_port, - ) - except Exception: - writer.close() - raise - - return reader, writer - - # -- SOCKS5 server -----------------------------------------------------------