refactor: extract build_chain into proto module

Moves _negotiate_hop() and build_chain() from server.py to proto.py
to break circular import between server and the upcoming pool module.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
user
2026-02-15 06:07:31 +01:00
parent b07ea49965
commit 4463adf08b
2 changed files with 78 additions and 80 deletions

View File

@@ -4,10 +4,15 @@ from __future__ import annotations
import asyncio
import base64
import logging
import socket
import struct
from enum import IntEnum
from .config import ChainHop
logger = logging.getLogger("s5p")
class Socks5Reply(IntEnum):
"""SOCKS5 reply codes (RFC 1928)."""
@@ -181,3 +186,74 @@ async def http_connect(
header_line = await reader.readline()
if header_line in (b"\r\n", b"\n", b""):
break
# -- chain building ----------------------------------------------------------
async def _negotiate_hop(
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
hop: ChainHop,
dest_host: str,
dest_port: int,
) -> None:
"""Negotiate a single hop in the chain."""
if hop.proto == "socks5":
await socks5_connect(reader, writer, dest_host, dest_port, hop.username, hop.password)
elif hop.proto == "socks4":
await socks4_connect(reader, writer, dest_host, dest_port)
elif hop.proto == "http":
await http_connect(reader, writer, dest_host, dest_port, hop.username, hop.password)
else:
raise ProtoError(f"unsupported protocol: {hop.proto}")
async def build_chain(
chain: list[ChainHop],
target_host: str,
target_port: int,
timeout: float = 10.0,
) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
"""Build a tunnel through the proxy chain to the target.
Connects to the first hop via TCP, then negotiates each subsequent
hop over the tunnel established by the previous one.
"""
if not chain:
return await asyncio.wait_for(
asyncio.open_connection(target_host, target_port),
timeout=timeout,
)
reader, writer = await asyncio.wait_for(
asyncio.open_connection(chain[0].host, chain[0].port),
timeout=timeout,
)
try:
for i, hop in enumerate(chain):
if i + 1 < len(chain):
dest_host = chain[i + 1].host
dest_port = chain[i + 1].port
else:
dest_host = target_host
dest_port = target_port
await asyncio.wait_for(
_negotiate_hop(reader, writer, hop, dest_host, dest_port),
timeout=timeout,
)
logger.debug(
"hop %d/%d ok %s -> %s:%d",
i + 1,
len(chain),
hop.proto,
dest_host,
dest_port,
)
except Exception:
writer.close()
raise
return reader, writer

View File

@@ -8,17 +8,10 @@ import signal
import struct
import time
from .config import ChainHop, Config
from .config import Config
from .metrics import Metrics
from .proto import ProtoError, Socks5Reply, build_chain, read_socks5_address
from .source import ProxySource
from .proto import (
ProtoError,
Socks5Reply,
http_connect,
read_socks5_address,
socks4_connect,
socks5_connect,
)
logger = logging.getLogger("s5p")
@@ -53,77 +46,6 @@ async def _relay(
return total
# -- chain building ----------------------------------------------------------
async def _negotiate_hop(
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
hop: ChainHop,
dest_host: str,
dest_port: int,
) -> None:
"""Negotiate a single hop in the chain."""
if hop.proto == "socks5":
await socks5_connect(reader, writer, dest_host, dest_port, hop.username, hop.password)
elif hop.proto == "socks4":
await socks4_connect(reader, writer, dest_host, dest_port)
elif hop.proto == "http":
await http_connect(reader, writer, dest_host, dest_port, hop.username, hop.password)
else:
raise ProtoError(f"unsupported protocol: {hop.proto}")
async def build_chain(
chain: list[ChainHop],
target_host: str,
target_port: int,
timeout: float = 10.0,
) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
"""Build a tunnel through the proxy chain to the target.
Connects to the first hop via TCP, then negotiates each subsequent
hop over the tunnel established by the previous one.
"""
if not chain:
return await asyncio.wait_for(
asyncio.open_connection(target_host, target_port),
timeout=timeout,
)
reader, writer = await asyncio.wait_for(
asyncio.open_connection(chain[0].host, chain[0].port),
timeout=timeout,
)
try:
for i, hop in enumerate(chain):
if i + 1 < len(chain):
dest_host = chain[i + 1].host
dest_port = chain[i + 1].port
else:
dest_host = target_host
dest_port = target_port
await asyncio.wait_for(
_negotiate_hop(reader, writer, hop, dest_host, dest_port),
timeout=timeout,
)
logger.debug(
"hop %d/%d ok %s -> %s:%d",
i + 1,
len(chain),
hop.proto,
dest_host,
dest_port,
)
except Exception:
writer.close()
raise
return reader, writer
# -- SOCKS5 server -----------------------------------------------------------