feat: add dynamic proxy source API integration
Fetches proxies from an HTTP API, caches them in memory, and appends a random proxy to the chain on each connection. Supports proto/country filters and configurable refresh interval. Config: proxy_source.url, proto, country, limit, refresh CLI: -S/--proxy-source URL
This commit is contained in:
@@ -14,3 +14,12 @@ chain:
|
|||||||
# - socks5://user:pass@proxy:1080 # post-Tor SOCKS5 proxy
|
# - socks5://user:pass@proxy:1080 # post-Tor SOCKS5 proxy
|
||||||
# - socks4://proxy:1080 # post-Tor SOCKS4/4a proxy
|
# - socks4://proxy:1080 # post-Tor SOCKS4/4a proxy
|
||||||
# - http://user:pass@proxy:8080 # post-Tor HTTP CONNECT proxy
|
# - http://user:pass@proxy:8080 # post-Tor HTTP CONNECT proxy
|
||||||
|
|
||||||
|
# Dynamic proxy source -- appends a random proxy after the static chain.
|
||||||
|
# Fetches from an HTTP API and caches the list.
|
||||||
|
# proxy_source:
|
||||||
|
# url: http://10.200.1.250:8081/proxies
|
||||||
|
# proto: socks5 # optional: filter by protocol
|
||||||
|
# country: US # optional: filter by country
|
||||||
|
# limit: 1000 # optional: max proxies to fetch
|
||||||
|
# refresh: 300 # cache refresh interval (seconds)
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ import asyncio
|
|||||||
import logging
|
import logging
|
||||||
|
|
||||||
from . import __version__
|
from . import __version__
|
||||||
from .config import Config, load_config, parse_proxy_url
|
from .config import Config, ProxySourceConfig, load_config, parse_proxy_url
|
||||||
from .server import serve
|
from .server import serve
|
||||||
|
|
||||||
|
|
||||||
@@ -42,6 +42,10 @@ def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
|
|||||||
)
|
)
|
||||||
p.add_argument("-v", "--verbose", action="store_true", help="debug logging")
|
p.add_argument("-v", "--verbose", action="store_true", help="debug logging")
|
||||||
p.add_argument("-q", "--quiet", action="store_true", help="errors only")
|
p.add_argument("-q", "--quiet", action="store_true", help="errors only")
|
||||||
|
p.add_argument(
|
||||||
|
"-S", "--proxy-source", metavar="URL",
|
||||||
|
help="proxy source API URL",
|
||||||
|
)
|
||||||
p.add_argument(
|
p.add_argument(
|
||||||
"--cprofile", metavar="FILE", nargs="?", const="s5p.prof",
|
"--cprofile", metavar="FILE", nargs="?", const="s5p.prof",
|
||||||
help="enable cProfile, dump stats to FILE (default: s5p.prof)",
|
help="enable cProfile, dump stats to FILE (default: s5p.prof)",
|
||||||
@@ -69,6 +73,9 @@ def main(argv: list[str] | None = None) -> int:
|
|||||||
if args.timeout is not None:
|
if args.timeout is not None:
|
||||||
config.timeout = args.timeout
|
config.timeout = args.timeout
|
||||||
|
|
||||||
|
if args.proxy_source:
|
||||||
|
config.proxy_source = ProxySourceConfig(url=args.proxy_source)
|
||||||
|
|
||||||
if args.verbose:
|
if args.verbose:
|
||||||
config.log_level = "debug"
|
config.log_level = "debug"
|
||||||
elif args.quiet:
|
elif args.quiet:
|
||||||
|
|||||||
@@ -27,6 +27,17 @@ class ChainHop:
|
|||||||
return f"{self.proto}://{auth}{self.host}:{self.port}"
|
return f"{self.proto}://{auth}{self.host}:{self.port}"
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ProxySourceConfig:
|
||||||
|
"""Configuration for the dynamic proxy source API."""
|
||||||
|
|
||||||
|
url: str = ""
|
||||||
|
proto: str | None = None
|
||||||
|
country: str | None = None
|
||||||
|
limit: int | None = 1000
|
||||||
|
refresh: float = 300.0
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class Config:
|
class Config:
|
||||||
"""Server configuration."""
|
"""Server configuration."""
|
||||||
@@ -36,6 +47,7 @@ class Config:
|
|||||||
chain: list[ChainHop] = field(default_factory=list)
|
chain: list[ChainHop] = field(default_factory=list)
|
||||||
timeout: float = 10.0
|
timeout: float = 10.0
|
||||||
log_level: str = "info"
|
log_level: str = "info"
|
||||||
|
proxy_source: ProxySourceConfig | None = None
|
||||||
|
|
||||||
|
|
||||||
def parse_proxy_url(url: str) -> ChainHop:
|
def parse_proxy_url(url: str) -> ChainHop:
|
||||||
@@ -98,4 +110,17 @@ def load_config(path: str | Path) -> Config:
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if "proxy_source" in raw:
|
||||||
|
ps = raw["proxy_source"]
|
||||||
|
if isinstance(ps, str):
|
||||||
|
config.proxy_source = ProxySourceConfig(url=ps)
|
||||||
|
elif isinstance(ps, dict):
|
||||||
|
config.proxy_source = ProxySourceConfig(
|
||||||
|
url=ps.get("url", ""),
|
||||||
|
proto=ps.get("proto"),
|
||||||
|
country=ps.get("country"),
|
||||||
|
limit=ps.get("limit", 1000),
|
||||||
|
refresh=float(ps.get("refresh", 300)),
|
||||||
|
)
|
||||||
|
|
||||||
return config
|
return config
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ import struct
|
|||||||
import time
|
import time
|
||||||
|
|
||||||
from .config import ChainHop, Config
|
from .config import ChainHop, Config
|
||||||
|
from .source import ProxySource
|
||||||
from .proto import (
|
from .proto import (
|
||||||
ProtoError,
|
ProtoError,
|
||||||
Socks5Reply,
|
Socks5Reply,
|
||||||
@@ -131,6 +132,7 @@ async def _handle_client(
|
|||||||
client_reader: asyncio.StreamReader,
|
client_reader: asyncio.StreamReader,
|
||||||
client_writer: asyncio.StreamWriter,
|
client_writer: asyncio.StreamWriter,
|
||||||
config: Config,
|
config: Config,
|
||||||
|
proxy_source: ProxySource | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Handle a single SOCKS5 client connection."""
|
"""Handle a single SOCKS5 client connection."""
|
||||||
peer = client_writer.get_extra_info("peername")
|
peer = client_writer.get_extra_info("peername")
|
||||||
@@ -165,9 +167,16 @@ async def _handle_client(
|
|||||||
logger.info("[%s] connect %s:%d", tag, target_host, target_port)
|
logger.info("[%s] connect %s:%d", tag, target_host, target_port)
|
||||||
|
|
||||||
# -- build chain --
|
# -- build chain --
|
||||||
|
effective_chain = list(config.chain)
|
||||||
|
if proxy_source:
|
||||||
|
hop = await proxy_source.get()
|
||||||
|
if hop:
|
||||||
|
effective_chain.append(hop)
|
||||||
|
logger.debug("[%s] +proxy %s", tag, hop)
|
||||||
|
|
||||||
t0 = time.monotonic()
|
t0 = time.monotonic()
|
||||||
remote_reader, remote_writer = await build_chain(
|
remote_reader, remote_writer = await build_chain(
|
||||||
config.chain, target_host, target_port, timeout=config.timeout
|
effective_chain, target_host, target_port, timeout=config.timeout
|
||||||
)
|
)
|
||||||
dt = time.monotonic() - t0
|
dt = time.monotonic() - t0
|
||||||
logger.debug("[%s] chain up in %.0fms", tag, dt * 1000)
|
logger.debug("[%s] chain up in %.0fms", tag, dt * 1000)
|
||||||
@@ -218,9 +227,13 @@ async def _handle_client(
|
|||||||
|
|
||||||
async def serve(config: Config) -> None:
|
async def serve(config: Config) -> None:
|
||||||
"""Start the SOCKS5 proxy server."""
|
"""Start the SOCKS5 proxy server."""
|
||||||
|
proxy_source: ProxySource | None = None
|
||||||
|
if config.proxy_source and config.proxy_source.url:
|
||||||
|
proxy_source = ProxySource(config.proxy_source)
|
||||||
|
await proxy_source.start()
|
||||||
|
|
||||||
async def on_client(r: asyncio.StreamReader, w: asyncio.StreamWriter) -> None:
|
async def on_client(r: asyncio.StreamReader, w: asyncio.StreamWriter) -> None:
|
||||||
await _handle_client(r, w, config)
|
await _handle_client(r, w, config, proxy_source)
|
||||||
|
|
||||||
srv = await asyncio.start_server(on_client, config.listen_host, config.listen_port)
|
srv = await asyncio.start_server(on_client, config.listen_host, config.listen_port)
|
||||||
addrs = ", ".join(str(s.getsockname()) for s in srv.sockets)
|
addrs = ", ".join(str(s.getsockname()) for s in srv.sockets)
|
||||||
@@ -232,6 +245,9 @@ async def serve(config: Config) -> None:
|
|||||||
else:
|
else:
|
||||||
logger.info(" mode: direct (no chain)")
|
logger.info(" mode: direct (no chain)")
|
||||||
|
|
||||||
|
if proxy_source:
|
||||||
|
logger.info(" proxy source: %s (%d proxies)", config.proxy_source.url, proxy_source.count)
|
||||||
|
|
||||||
loop = asyncio.get_running_loop()
|
loop = asyncio.get_running_loop()
|
||||||
stop = loop.create_future()
|
stop = loop.create_future()
|
||||||
|
|
||||||
|
|||||||
97
src/s5p/source.py
Normal file
97
src/s5p/source.py
Normal file
@@ -0,0 +1,97 @@
|
|||||||
|
"""Dynamic proxy source -- fetches proxies from an HTTP API."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import random
|
||||||
|
import time
|
||||||
|
import urllib.request
|
||||||
|
from urllib.parse import urlencode
|
||||||
|
|
||||||
|
from .config import ChainHop, ProxySourceConfig
|
||||||
|
|
||||||
|
logger = logging.getLogger("s5p")
|
||||||
|
|
||||||
|
VALID_PROTOS = {"socks5", "socks4", "http"}
|
||||||
|
|
||||||
|
|
||||||
|
class ProxySource:
|
||||||
|
"""Fetches and caches proxies from an HTTP API.
|
||||||
|
|
||||||
|
Picks a random proxy on each ``get()`` call. Refreshes the cache
|
||||||
|
in the background at a configurable interval.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, cfg: ProxySourceConfig) -> None:
|
||||||
|
self._cfg = cfg
|
||||||
|
self._cache: list[ChainHop] = []
|
||||||
|
self._last_fetch: float = 0.0
|
||||||
|
self._lock = asyncio.Lock()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def count(self) -> int:
|
||||||
|
"""Number of proxies currently cached."""
|
||||||
|
return len(self._cache)
|
||||||
|
|
||||||
|
async def start(self) -> None:
|
||||||
|
"""Initial fetch. Call once before serving."""
|
||||||
|
await self._refresh()
|
||||||
|
|
||||||
|
async def get(self) -> ChainHop | None:
|
||||||
|
"""Return a random proxy from the cache, refreshing if stale."""
|
||||||
|
now = time.monotonic()
|
||||||
|
if now - self._last_fetch > self._cfg.refresh:
|
||||||
|
await self._refresh()
|
||||||
|
if not self._cache:
|
||||||
|
return None
|
||||||
|
return random.choice(self._cache)
|
||||||
|
|
||||||
|
async def _refresh(self) -> None:
|
||||||
|
"""Fetch proxy list from the API."""
|
||||||
|
async with self._lock:
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
try:
|
||||||
|
proxies = await loop.run_in_executor(None, self._fetch_sync)
|
||||||
|
self._cache = proxies
|
||||||
|
self._last_fetch = time.monotonic()
|
||||||
|
logger.info("proxy source: loaded %d proxies", len(proxies))
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("proxy source: fetch failed: %s", e)
|
||||||
|
if self._cache:
|
||||||
|
logger.info("proxy source: using stale cache (%d proxies)", len(self._cache))
|
||||||
|
|
||||||
|
def _fetch_sync(self) -> list[ChainHop]:
|
||||||
|
"""Synchronous HTTP fetch (runs in executor)."""
|
||||||
|
params: dict[str, str] = {}
|
||||||
|
if self._cfg.limit:
|
||||||
|
params["limit"] = str(self._cfg.limit)
|
||||||
|
if self._cfg.proto:
|
||||||
|
params["proto"] = self._cfg.proto
|
||||||
|
if self._cfg.country:
|
||||||
|
params["country"] = self._cfg.country
|
||||||
|
|
||||||
|
url = self._cfg.url
|
||||||
|
if params:
|
||||||
|
sep = "&" if "?" in url else "?"
|
||||||
|
url = f"{url}{sep}{urlencode(params)}"
|
||||||
|
|
||||||
|
req = urllib.request.Request(url, headers={"Accept": "application/json"})
|
||||||
|
with urllib.request.urlopen(req, timeout=10) as resp:
|
||||||
|
data = json.loads(resp.read())
|
||||||
|
|
||||||
|
proxies: list[ChainHop] = []
|
||||||
|
for entry in data.get("proxies", []):
|
||||||
|
proto = entry.get("proto")
|
||||||
|
addr = entry.get("proxy", "")
|
||||||
|
if not proto or proto not in VALID_PROTOS or ":" not in addr:
|
||||||
|
continue
|
||||||
|
host, port_str = addr.rsplit(":", 1)
|
||||||
|
try:
|
||||||
|
port = int(port_str)
|
||||||
|
except ValueError:
|
||||||
|
continue
|
||||||
|
proxies.append(ChainHop(proto=proto, host=host, port=port))
|
||||||
|
|
||||||
|
return proxies
|
||||||
Reference in New Issue
Block a user