From ddad839fca66ced80943a33ce219ba57f6b22661 Mon Sep 17 00:00:00 2001 From: user Date: Sun, 15 Feb 2026 04:19:29 +0100 Subject: [PATCH] feat: add dynamic proxy source API integration Fetches proxies from an HTTP API, caches them in memory, and appends a random proxy to the chain on each connection. Supports proto/country filters and configurable refresh interval. Config: proxy_source.url, proto, country, limit, refresh CLI: -S/--proxy-source URL --- config/example.yaml | 9 +++++ src/s5p/cli.py | 9 ++++- src/s5p/config.py | 25 ++++++++++++ src/s5p/server.py | 20 +++++++++- src/s5p/source.py | 97 +++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 157 insertions(+), 3 deletions(-) create mode 100644 src/s5p/source.py diff --git a/config/example.yaml b/config/example.yaml index d6635fe..03924ca 100644 --- a/config/example.yaml +++ b/config/example.yaml @@ -14,3 +14,12 @@ chain: # - socks5://user:pass@proxy:1080 # post-Tor SOCKS5 proxy # - socks4://proxy:1080 # post-Tor SOCKS4/4a proxy # - http://user:pass@proxy:8080 # post-Tor HTTP CONNECT proxy + +# Dynamic proxy source -- appends a random proxy after the static chain. +# Fetches from an HTTP API and caches the list. +# proxy_source: +# url: http://10.200.1.250:8081/proxies +# proto: socks5 # optional: filter by protocol +# country: US # optional: filter by country +# limit: 1000 # optional: max proxies to fetch +# refresh: 300 # cache refresh interval (seconds) diff --git a/src/s5p/cli.py b/src/s5p/cli.py index a4fde99..a89ed64 100644 --- a/src/s5p/cli.py +++ b/src/s5p/cli.py @@ -7,7 +7,7 @@ import asyncio import logging from . 
import __version__ -from .config import Config, load_config, parse_proxy_url +from .config import Config, ProxySourceConfig, load_config, parse_proxy_url from .server import serve @@ -42,6 +42,10 @@ def _parse_args(argv: list[str] | None = None) -> argparse.Namespace: ) p.add_argument("-v", "--verbose", action="store_true", help="debug logging") p.add_argument("-q", "--quiet", action="store_true", help="errors only") + p.add_argument( + "-S", "--proxy-source", metavar="URL", + help="proxy source API URL", + ) p.add_argument( "--cprofile", metavar="FILE", nargs="?", const="s5p.prof", help="enable cProfile, dump stats to FILE (default: s5p.prof)", @@ -69,6 +73,9 @@ def main(argv: list[str] | None = None) -> int: if args.timeout is not None: config.timeout = args.timeout + if args.proxy_source: + config.proxy_source = ProxySourceConfig(url=args.proxy_source) + if args.verbose: config.log_level = "debug" elif args.quiet: diff --git a/src/s5p/config.py b/src/s5p/config.py index 4c31589..4a478b0 100644 --- a/src/s5p/config.py +++ b/src/s5p/config.py @@ -27,6 +27,17 @@ class ChainHop: return f"{self.proto}://{auth}{self.host}:{self.port}" +@dataclass +class ProxySourceConfig: + """Configuration for the dynamic proxy source API.""" + + url: str = "" + proto: str | None = None + country: str | None = None + limit: int | None = 1000 + refresh: float = 300.0 + + @dataclass class Config: """Server configuration.""" @@ -36,6 +47,7 @@ class Config: chain: list[ChainHop] = field(default_factory=list) timeout: float = 10.0 log_level: str = "info" + proxy_source: ProxySourceConfig | None = None def parse_proxy_url(url: str) -> ChainHop: @@ -98,4 +110,17 @@ def load_config(path: str | Path) -> Config: ) ) + if "proxy_source" in raw: + ps = raw["proxy_source"] + if isinstance(ps, str): + config.proxy_source = ProxySourceConfig(url=ps) + elif isinstance(ps, dict): + config.proxy_source = ProxySourceConfig( + url=ps.get("url", ""), + proto=ps.get("proto"), + country=ps.get("country"), 
+ limit=ps.get("limit", 1000), + refresh=float(ps.get("refresh", 300)), + ) + return config diff --git a/src/s5p/server.py b/src/s5p/server.py index ec20358..873cb87 100644 --- a/src/s5p/server.py +++ b/src/s5p/server.py @@ -9,6 +9,7 @@ import struct import time from .config import ChainHop, Config +from .source import ProxySource from .proto import ( ProtoError, Socks5Reply, @@ -131,6 +132,7 @@ async def _handle_client( client_reader: asyncio.StreamReader, client_writer: asyncio.StreamWriter, config: Config, + proxy_source: ProxySource | None = None, ) -> None: """Handle a single SOCKS5 client connection.""" peer = client_writer.get_extra_info("peername") @@ -165,9 +167,16 @@ async def _handle_client( logger.info("[%s] connect %s:%d", tag, target_host, target_port) # -- build chain -- + effective_chain = list(config.chain) + if proxy_source: + hop = await proxy_source.get() + if hop: + effective_chain.append(hop) + logger.debug("[%s] +proxy %s", tag, hop) + t0 = time.monotonic() remote_reader, remote_writer = await build_chain( - config.chain, target_host, target_port, timeout=config.timeout + effective_chain, target_host, target_port, timeout=config.timeout ) dt = time.monotonic() - t0 logger.debug("[%s] chain up in %.0fms", tag, dt * 1000) @@ -218,9 +227,13 @@ async def _handle_client( async def serve(config: Config) -> None: """Start the SOCKS5 proxy server.""" + proxy_source: ProxySource | None = None + if config.proxy_source and config.proxy_source.url: + proxy_source = ProxySource(config.proxy_source) + await proxy_source.start() async def on_client(r: asyncio.StreamReader, w: asyncio.StreamWriter) -> None: - await _handle_client(r, w, config) + await _handle_client(r, w, config, proxy_source) srv = await asyncio.start_server(on_client, config.listen_host, config.listen_port) addrs = ", ".join(str(s.getsockname()) for s in srv.sockets) @@ -232,6 +245,9 @@ async def serve(config: Config) -> None: else: logger.info(" mode: direct (no chain)") + if 
logger = logging.getLogger("s5p")

VALID_PROTOS = {"socks5", "socks4", "http"}


class ProxySource:
    """Fetch and cache proxies from an HTTP API.

    ``get()`` returns a random proxy from the in-memory cache. The cache is
    refreshed lazily: the first ``get()`` after the configured interval has
    elapsed performs the fetch inline, and callers arriving while that fetch
    is in flight wait for it instead of each issuing their own request.

    A failed fetch keeps whatever was cached before and is retried after
    ``min(refresh, _RETRY_DELAY)`` seconds rather than on every call, so an
    unreachable API does not add a fetch timeout to every connection.
    """

    # Socket timeout for a single API request, in seconds.
    _FETCH_TIMEOUT = 10.0
    # Longest wait before retrying after a failed fetch, in seconds.
    _RETRY_DELAY = 30.0

    def __init__(self, cfg: "ProxySourceConfig") -> None:
        """Store *cfg* and start with an empty cache; no I/O happens here."""
        self._cfg = cfg
        self._cache: list[ChainHop] = []
        # Monotonic timestamp of the last successful fetch (0.0 = never).
        self._last_fetch: float = 0.0
        # Monotonic deadline after which get() triggers a refresh. Starts at
        # -inf so the first refresh is due whatever the clock's origin is.
        self._refresh_at: float = float("-inf")
        self._lock = asyncio.Lock()

    @property
    def count(self) -> int:
        """Number of proxies currently cached."""
        return len(self._cache)

    async def start(self) -> None:
        """Perform the initial fetch. Call once before serving."""
        await self._refresh(force=True)

    async def get(self) -> "ChainHop | None":
        """Return a random cached proxy, refreshing first if the cache is due.

        Returns ``None`` when the cache is empty (nothing fetched yet, or the
        API returned no usable entries).
        """
        if time.monotonic() >= self._refresh_at:
            await self._refresh()
        if not self._cache:
            return None
        return random.choice(self._cache)

    async def _refresh(self, *, force: bool = False) -> None:
        """Fetch the proxy list and replace the cache.

        Only one fetch runs at a time. Unless *force* is true, a caller that
        acquires the lock after another caller has already refreshed (or
        already failed and scheduled a retry) returns without fetching.
        Fetch errors are logged, never raised.
        """
        async with self._lock:
            # Re-check under the lock: callers queued behind an in-flight
            # fetch must not repeat it once it has completed.
            if not force and time.monotonic() < self._refresh_at:
                return

            loop = asyncio.get_running_loop()
            try:
                proxies = await loop.run_in_executor(None, self._fetch_sync)
            except Exception as e:  # best-effort boundary: keep serving
                retry_in = min(self._cfg.refresh, self._RETRY_DELAY)
                self._refresh_at = time.monotonic() + retry_in
                logger.warning("proxy source: fetch failed: %s", e)
                if self._cache:
                    logger.info(
                        "proxy source: using stale cache (%d proxies)",
                        len(self._cache),
                    )
                return

            now = time.monotonic()
            self._cache = proxies
            self._last_fetch = now
            self._refresh_at = now + self._cfg.refresh
            logger.info("proxy source: loaded %d proxies", len(proxies))

    def _fetch_sync(self) -> "list[ChainHop]":
        """Blocking HTTP fetch and parse; intended to run in an executor.

        Sends ``limit``, ``proto`` and ``country`` as query parameters when
        they are set in the config. Expects a JSON object with a ``proxies``
        list whose entries carry ``proto`` and ``proxy`` (``"host:port"``).
        Entries that are malformed, use a protocol outside ``VALID_PROTOS``,
        or have a port outside 1-65535 are skipped.

        Raises:
            ValueError: if the URL is not http(s) or the payload is not a
                JSON object containing a list under ``proxies``.
            OSError: on network failure (includes ``urllib.error.URLError``).
        """
        url = self._cfg.url
        # urlopen() also understands file: and ftp:, which are never a valid
        # "HTTP API" here, so refuse anything else up front.
        if not url.lower().startswith(("http://", "https://")):
            raise ValueError(f"proxy source URL must be http(s): {url!r}")

        params: dict[str, str] = {}
        if self._cfg.limit:
            params["limit"] = str(self._cfg.limit)
        if self._cfg.proto:
            params["proto"] = self._cfg.proto
        if self._cfg.country:
            params["country"] = self._cfg.country
        if params:
            sep = "&" if "?" in url else "?"
            url = f"{url}{sep}{urlencode(params)}"

        req = urllib.request.Request(url, headers={"Accept": "application/json"})
        with urllib.request.urlopen(req, timeout=self._FETCH_TIMEOUT) as resp:
            data = json.loads(resp.read())

        if not isinstance(data, dict):
            raise ValueError("proxy source: expected a JSON object")
        entries = data.get("proxies", [])
        if not isinstance(entries, list):
            raise ValueError("proxy source: 'proxies' is not a list")

        proxies: list[ChainHop] = []
        for entry in entries:
            if not isinstance(entry, dict):
                continue
            proto = entry.get("proto")
            addr = entry.get("proxy", "")
            # isinstance guards keep an unhashable proto or a non-string
            # address from aborting the whole refresh.
            if not isinstance(proto, str) or proto not in VALID_PROTOS:
                continue
            if not isinstance(addr, str):
                continue
            host, colon, port_str = addr.rpartition(":")
            if not colon or not host:
                continue
            try:
                port = int(port_str)
            except ValueError:
                continue
            if not 0 < port < 65536:
                continue
            proxies.append(ChainHop(proto=proto, host=host, port=port))

        return proxies