diff --git a/PROJECT.md b/PROJECT.md index 15952a7..0ef26f8 100644 --- a/PROJECT.md +++ b/PROJECT.md @@ -21,12 +21,12 @@ Client -------> s5p -------> Hop 1 -------> Hop 2 -------> Target - **server.py** -- asyncio SOCKS5 server, bidirectional relay, signal handling - **proto.py** -- protocol handshakes (SOCKS5, SOCKS4/4a, HTTP CONNECT), chain builder -- **config.py** -- YAML config loading, proxy URL parsing, pool config +- **config.py** -- YAML config loading, proxy URL parsing, API response parsing, pool config - **pool.py** -- managed proxy pool (multi-source, health-tested, persistent) -- **source.py** -- legacy proxy source (single HTTP API, kept for backward compat) - **http.py** -- minimal async HTTP/1.1 client (GET/POST JSON, no external deps) - **connpool.py** -- pre-warmed TCP connection pool to first chain hop - **cli.py** -- argparse CLI, logging setup, cProfile support +- **metrics.py** -- connection counters and human-readable summary (lock-free, asyncio-only) ## Deployment @@ -53,14 +53,14 @@ All other functionality uses Python stdlib (`asyncio`, `socket`, `struct`). 
- **asyncio** -- single-threaded event loop, efficient for I/O-bound proxying - **Domain passthrough** -- never resolve DNS locally to prevent leaks - **Tor as a hop** -- no special Tor handling; it's just `socks5://127.0.0.1:9050` -- **Graceful shutdown** -- SIGTERM/SIGINT handled in the event loop for clean container stops +- **Graceful shutdown** -- SIGTERM/SIGINT registered before startup for clean container stops - **Config split** -- tracked example template, gitignored live config with real addresses - **Proxy pool** -- multi-source (API + file), health-tested, persistent, auto-cleaned - **Weighted selection** -- recently-tested proxies preferred via recency decay weight - **Failure backoff** -- connection failures penalize proxy weight for 60s, avoids retry waste - **Stale expiry** -- proxies dropped from sources evicted after 3 refresh cycles if not alive - **Chain pre-flight** -- static chain tested before pool health tests; skip on failure -- **Warm start** -- quick-test alive subset on restart, defer full test to background +- **Warm start** -- trust cached alive state on restart, defer all health tests to background - **SIGHUP reload** -- re-read config, update pool settings, re-fetch sources - **Dead reporting** -- POST evicted proxies to upstream API for list quality feedback - **Connection semaphore** -- cap concurrent connections to prevent fd exhaustion diff --git a/ROADMAP.md b/ROADMAP.md index 8208f8c..a66082e 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -15,7 +15,7 @@ - [x] Per-proxy backoff (connection failure cooldown) - [x] Stale proxy expiry (last_seen TTL) - [x] Pool stats in periodic metrics log -- [x] Fast warm start (deferred full health test) +- [x] Instant warm start (trust cached state, defer all health tests) - [x] Static chain health check (pre-flight before pool tests) - [x] SIGHUP hot config reload - [x] Dead proxy reporting to source API diff --git a/TASKS.md b/TASKS.md index 2e4279f..1154a94 100644 --- a/TASKS.md +++ b/TASKS.md 
@@ -23,7 +23,7 @@ - [x] Per-proxy backoff (60s cooldown after connection failure) - [x] Stale proxy expiry (evict dead proxies not seen for 3 refresh cycles) - [x] Pool stats in periodic metrics log (`pool=alive/total`) -- [x] Fast warm start (quick-test alive subset, defer full test) +- [x] Fast warm start (trust cached state, defer all health tests) - [x] Static chain health check (skip pool tests if chain unreachable) - [x] SIGHUP hot config reload (timeout, retries, log_level, pool config) - [x] Dead proxy reporting (`report_url` POST evicted proxies to API) @@ -31,6 +31,18 @@ - [x] Async HTTP client (replace blocking urllib, parallel source fetch) - [x] First-hop TCP connection pool (`pool_size`, `pool_max_idle`) +- [x] Codebase consolidation (refactor/codebase-consolidation) + - [x] Extract shared proxy parsing and constants to config.py + - [x] Consolidate health-check HTTP logic in pool + - [x] Remove threading from metrics (pure asyncio, no lock needed) + - [x] Replace `ensure_future` with `create_task` + - [x] Rename ambiguous variables in config loader + - [x] Remove legacy ProxySource layer (source.py deleted) + - [x] Add tests for extracted `parse_api_proxies` +- [x] Instant warm start (trust cached state, defer all health tests) +- [x] Register signal handlers before startup (fix SIGKILL on stop) +- [x] Use k8s-file logging driver with rotation + ## Next - [ ] Integration tests with mock proxy server - [ ] SOCKS5 server-side authentication diff --git a/compose.yaml b/compose.yaml index 9663e63..75788af 100644 --- a/compose.yaml +++ b/compose.yaml @@ -13,3 +13,7 @@ services: - ~/.cache/s5p:/data:Z # command: ["-c", "/app/config/s5p.yaml", "--cprofile", "/data/s5p.prof"] network_mode: host + logging: + driver: k8s-file + options: + max-size: 10mb diff --git a/docs/CHEATSHEET.md b/docs/CHEATSHEET.md index 0ab354f..a945474 100644 --- a/docs/CHEATSHEET.md +++ b/docs/CHEATSHEET.md @@ -123,7 +123,6 @@ metrics: conn=142 ok=98 fail=44 retries=88 active=3 
in=1.2M out=4.5M up=0h05m12s | DNS leak | Use `socks5h://` (not `socks5://`) in client | | Auth failed | Verify credentials in proxy URL | | Port in use | `fuser -k 1080/tcp` to free the port | -| Container slow stop | Rebuild image after SIGTERM fix | | Proxy keeps failing | Backoff penalizes for 60s; check `pool=` in metrics | | "static chain unreachable" | Tor/upstream hop is down; pool tests skipped | -| Slow startup | Normal on cold start; warm restarts use state file | +| Slow startup | Normal on cold start; warm restarts serve instantly from state | diff --git a/docs/USAGE.md b/docs/USAGE.md index e0d5566..f5f548e 100644 --- a/docs/USAGE.md +++ b/docs/USAGE.md @@ -179,11 +179,10 @@ are loaded for fast warm starts. ### Warm start -When restarting with an existing state file, only the previously-alive proxies -are tested before the server starts accepting connections. A full health test -of all proxies runs in the background. This reduces startup blocking from -minutes to seconds on warm restarts. Cold starts (no state file) test all -proxies before serving. +When restarting with an existing state file, the server trusts the cached +alive state and begins accepting connections immediately. A full health test +of all proxies runs in the background. Warm restarts take seconds regardless +of pool size. Cold starts (no state file) test all proxies before serving. 
### Dead proxy reporting diff --git a/src/s5p/config.py b/src/s5p/config.py index a15b532..867b029 100644 --- a/src/s5p/config.py +++ b/src/s5p/config.py @@ -9,6 +9,7 @@ from urllib.parse import urlparse import yaml DEFAULT_PORTS = {"socks5": 1080, "socks4": 1080, "http": 8080} +VALID_PROTOS = {"socks5", "socks4", "http"} @dataclass @@ -26,17 +27,6 @@ class ChainHop: return f"{self.proto}://{auth}{self.host}:{self.port}" -@dataclass -class ProxySourceConfig: - """Configuration for the dynamic proxy source API (legacy).""" - - url: str = "" - proto: str | None = None - country: str | None = None - limit: int | None = 1000 - refresh: float = 300.0 - - @dataclass class PoolSourceConfig: """A single proxy source: HTTP API or text file.""" @@ -76,7 +66,6 @@ class Config: max_connections: int = 256 pool_size: int = 0 pool_max_idle: float = 30.0 - proxy_source: ProxySourceConfig | None = None proxy_pool: ProxyPoolConfig | None = None config_file: str = "" @@ -103,6 +92,27 @@ def parse_proxy_url(url: str) -> ChainHop: ) +def parse_api_proxies(data: dict) -> list[ChainHop]: + """Parse proxy list from API response ``{"proxies": [...]}``. + + Each entry must have ``proto`` (socks5/socks4/http) and ``proxy`` + (host:port). Invalid entries are silently skipped. 
+ """ + proxies: list[ChainHop] = [] + for entry in data.get("proxies", []): + proto = entry.get("proto") + addr = entry.get("proxy", "") + if not proto or proto not in VALID_PROTOS or ":" not in addr: + continue + host, port_str = addr.rsplit(":", 1) + try: + port = int(port_str) + except ValueError: + continue + proxies.append(ChainHop(proto=proto, host=host, port=port)) + return proxies + + def load_config(path: str | Path) -> Config: """Load configuration from a YAML file.""" path = Path(path) @@ -154,9 +164,9 @@ def load_config(path: str | Path) -> Config: ) if "proxy_pool" in raw: - pp = raw["proxy_pool"] + pool_raw = raw["proxy_pool"] sources = [] - for src in pp.get("sources", []): + for src in pool_raw.get("sources", []): sources.append( PoolSourceConfig( url=src.get("url"), @@ -168,26 +178,26 @@ def load_config(path: str | Path) -> Config: ) config.proxy_pool = ProxyPoolConfig( sources=sources, - refresh=float(pp.get("refresh", 300)), - test_interval=float(pp.get("test_interval", 120)), - test_url=pp.get("test_url", "http://httpbin.org/ip"), - test_timeout=float(pp.get("test_timeout", 15)), - test_concurrency=int(pp.get("test_concurrency", 5)), - max_fails=int(pp.get("max_fails", 3)), - state_file=pp.get("state_file", ""), - report_url=pp.get("report_url", ""), + refresh=float(pool_raw.get("refresh", 300)), + test_interval=float(pool_raw.get("test_interval", 120)), + test_url=pool_raw.get("test_url", "http://httpbin.org/ip"), + test_timeout=float(pool_raw.get("test_timeout", 15)), + test_concurrency=int(pool_raw.get("test_concurrency", 5)), + max_fails=int(pool_raw.get("max_fails", 3)), + state_file=pool_raw.get("state_file", ""), + report_url=pool_raw.get("report_url", ""), ) elif "proxy_source" in raw: # backward compat: convert legacy proxy_source to proxy_pool - ps = raw["proxy_source"] - if isinstance(ps, str): - url, proto, country, limit, refresh = ps, None, None, 1000, 300.0 - elif isinstance(ps, dict): - url = ps.get("url", "") - proto = 
ps.get("proto") - country = ps.get("country") - limit = ps.get("limit", 1000) - refresh = float(ps.get("refresh", 300)) + src_raw = raw["proxy_source"] + if isinstance(src_raw, str): + url, proto, country, limit, refresh = src_raw, None, None, 1000, 300.0 + elif isinstance(src_raw, dict): + url = src_raw.get("url", "") + proto = src_raw.get("proto") + country = src_raw.get("country") + limit = src_raw.get("limit", 1000) + refresh = float(src_raw.get("refresh", 300)) else: url, proto, country, limit, refresh = "", None, None, 1000, 300.0 @@ -196,9 +206,5 @@ def load_config(path: str | Path) -> Config: sources=[PoolSourceConfig(url=url, proto=proto, country=country, limit=limit)], refresh=refresh, ) - # keep legacy field for source.py compat during transition - config.proxy_source = ProxySourceConfig( - url=url, proto=proto, country=country, limit=limit, refresh=refresh, - ) return config diff --git a/src/s5p/metrics.py b/src/s5p/metrics.py index ab52845..8c97a85 100644 --- a/src/s5p/metrics.py +++ b/src/s5p/metrics.py @@ -2,19 +2,17 @@ from __future__ import annotations -import threading import time class Metrics: - """Thread-safe connection metrics. + """Connection metrics. - Counters use a lock for consistency but tolerate minor races - on hot paths (bytes_in/bytes_out) for performance. + All access runs on the single asyncio event-loop thread, + so no locking is needed. 
""" def __init__(self) -> None: - self._lock = threading.Lock() self.connections: int = 0 self.success: int = 0 self.failed: int = 0 @@ -26,24 +24,24 @@ class Metrics: def summary(self) -> str: """One-line log-friendly summary.""" - with self._lock: - uptime = time.monotonic() - self.started - h, rem = divmod(int(uptime), 3600) - m, s = divmod(rem, 60) - return ( - f"conn={self.connections} ok={self.success} fail={self.failed} " - f"retries={self.retries} active={self.active} " - f"in={_human_bytes(self.bytes_in)} out={_human_bytes(self.bytes_out)} " - f"up={h}h{m:02d}m{s:02d}s" - ) + uptime = time.monotonic() - self.started + h, rem = divmod(int(uptime), 3600) + m, s = divmod(rem, 60) + return ( + f"conn={self.connections} ok={self.success} fail={self.failed} " + f"retries={self.retries} active={self.active} " + f"in={_human_bytes(self.bytes_in)} out={_human_bytes(self.bytes_out)} " + f"up={h}h{m:02d}m{s:02d}s" + ) def _human_bytes(n: int) -> str: """Format byte count in human-readable form.""" + value = float(n) for unit in ("B", "K", "M", "G", "T"): - if abs(n) < 1024: + if abs(value) < 1024: if unit == "B": - return f"{n}{unit}" - return f"{n:.1f}{unit}" - n /= 1024 # type: ignore[assignment] - return f"{n:.1f}P" + return f"{int(value)}{unit}" + return f"{value:.1f}{unit}" + value /= 1024 + return f"{value:.1f}P" diff --git a/src/s5p/pool.py b/src/s5p/pool.py index ca5a0ef..0d099fb 100644 --- a/src/s5p/pool.py +++ b/src/s5p/pool.py @@ -12,13 +12,12 @@ from dataclasses import dataclass from pathlib import Path from urllib.parse import urlencode, urlparse -from .config import ChainHop, PoolSourceConfig, ProxyPoolConfig, parse_proxy_url +from .config import ChainHop, PoolSourceConfig, ProxyPoolConfig, parse_api_proxies, parse_proxy_url from .http import http_get_json, http_post_json from .proto import ProtoError, build_chain logger = logging.getLogger("s5p") -VALID_PROTOS = {"socks5", "socks4", "http"} STATE_VERSION = 1 @@ -67,21 +66,21 @@ class ProxyPool: # -- 
public interface ---------------------------------------------------- async def start(self) -> None: - """Load state, fetch sources, run initial health test, start loops.""" + """Load state, fetch sources, start background loops. + + On warm start (state file has alive proxies), the pool begins + serving immediately using cached state and defers all health + testing to background tasks. On cold start, a full health + test runs before returning so the caller has live proxies. + """ self._load_state() - warm_keys = list(self._alive_keys) + warm = bool(self._alive_keys) await self._fetch_all_sources() - if warm_keys: - # warm start: quick-test previously-alive proxies first - valid_keys = [k for k in warm_keys if k in self._proxies] - if valid_keys: - await self._run_health_tests(keys=valid_keys) - self._save_state() - self._tasks.append(asyncio.create_task(self._deferred_full_test())) - else: - await self._run_health_tests() - self._save_state() + if warm: + # trust persisted alive state, verify in background + self._save_state() + self._tasks.append(asyncio.create_task(self._deferred_full_test())) else: # cold start: test everything before serving await self._run_health_tests() @@ -195,20 +194,7 @@ class ProxyPool: url = f"{url}{sep}{urlencode(params)}" data = await http_get_json(url) - - proxies: list[ChainHop] = [] - for entry in data.get("proxies", []): - proto = entry.get("proto") - addr = entry.get("proxy", "") - if not proto or proto not in VALID_PROTOS or ":" not in addr: - continue - host, port_str = addr.rsplit(":", 1) - try: - port = int(port_str) - except ValueError: - continue - proxies.append(ChainHop(proto=proto, host=host, port=port)) - return proxies + return parse_api_proxies(data) def _fetch_file_sync(self, src: PoolSourceConfig) -> list[ChainHop]: """Parse a text file with one proxy URL per line (runs in executor).""" @@ -247,16 +233,13 @@ class ProxyPool: # -- health testing ------------------------------------------------------ - async def 
_test_proxy(self, key: str, entry: ProxyEntry) -> bool: - """Test a single proxy by building the full chain and sending HTTP GET.""" + async def _http_check(self, chain: list[ChainHop]) -> bool: + """Send an HTTP GET through *chain* and return True on 2xx.""" parsed = urlparse(self._cfg.test_url) host = parsed.hostname or "httpbin.org" port = parsed.port or 80 path = parsed.path or "/" - chain = self._chain + [entry.hop] - entry.last_test = time.time() - entry.tests += 1 try: reader, writer = await build_chain( chain, host, port, timeout=self._cfg.test_timeout, @@ -281,39 +264,17 @@ class ProxyPool: except OSError: pass + async def _test_proxy(self, key: str, entry: ProxyEntry) -> bool: + """Test a single proxy by building the full chain and sending HTTP GET.""" + entry.last_test = time.time() + entry.tests += 1 + return await self._http_check(self._chain + [entry.hop]) + async def _test_chain(self) -> bool: """Test the static chain without any pool proxy.""" if not self._chain: return True - - parsed = urlparse(self._cfg.test_url) - host = parsed.hostname or "httpbin.org" - port = parsed.port or 80 - path = parsed.path or "/" - - try: - reader, writer = await build_chain( - self._chain, host, port, timeout=self._cfg.test_timeout, - ) - except (ProtoError, TimeoutError, ConnectionError, OSError, EOFError): - return False - - try: - request = f"GET {path} HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\n\r\n" - writer.write(request.encode()) - await writer.drain() - - line = await asyncio.wait_for(reader.readline(), timeout=self._cfg.test_timeout) - parts = line.decode("utf-8", errors="replace").split(None, 2) - return len(parts) >= 2 and parts[1].startswith("2") - except (TimeoutError, ConnectionError, OSError, EOFError): - return False - finally: - try: - writer.close() - await writer.wait_closed() - except OSError: - pass + return await self._http_check(self._chain) async def _run_health_tests(self, keys: list[str] | None = None) -> None: """Test proxies with 
bounded concurrency. @@ -408,8 +369,7 @@ class ProxyPool: # report evicted proxies to upstream API if evict_keys and self._cfg.report_url: - dead = [k for k in evict_keys] - asyncio.ensure_future(self._report_dead(dead)) + asyncio.create_task(self._report_dead(list(evict_keys))) async def _report_dead(self, keys: list[str]) -> None: """POST dead proxy list to report_url (fire-and-forget, async).""" diff --git a/src/s5p/server.py b/src/s5p/server.py index 46fe2ad..42df0cc 100644 --- a/src/s5p/server.py +++ b/src/s5p/server.py @@ -13,7 +13,6 @@ from .connpool import FirstHopPool from .metrics import Metrics from .pool import ProxyPool from .proto import ProtoError, Socks5Reply, build_chain, read_socks5_address -from .source import ProxySource logger = logging.getLogger("s5p") @@ -60,7 +59,7 @@ async def _handle_client( client_reader: asyncio.StreamReader, client_writer: asyncio.StreamWriter, config: Config, - proxy_pool: ProxyPool | ProxySource | None = None, + proxy_pool: ProxyPool | None = None, metrics: Metrics | None = None, first_hop_pool: FirstHopPool | None = None, ) -> None: @@ -123,7 +122,7 @@ async def _handle_client( break except (ProtoError, TimeoutError, ConnectionError, OSError) as e: last_err = e - if pool_hop and isinstance(proxy_pool, ProxyPool): + if pool_hop and proxy_pool: proxy_pool.report_failure(pool_hop) if metrics: metrics.retries += 1 @@ -215,15 +214,18 @@ async def _metrics_logger( async def serve(config: Config) -> None: """Start the SOCKS5 proxy server.""" + # register signal handlers early so SIGTERM is never ignored + loop = asyncio.get_running_loop() + stop = loop.create_future() + + for sig in (signal.SIGTERM, signal.SIGINT): + loop.add_signal_handler(sig, lambda s=sig: stop.set_result(s)) + metrics = Metrics() - proxy_pool: ProxyPool | ProxySource | None = None + proxy_pool: ProxyPool | None = None if config.proxy_pool and config.proxy_pool.sources: - pool = ProxyPool(config.proxy_pool, config.chain, config.timeout) - await 
pool.start() - proxy_pool = pool - elif config.proxy_source and config.proxy_source.url: - proxy_pool = ProxySource(config.proxy_source) + proxy_pool = ProxyPool(config.proxy_pool, config.chain, config.timeout) await proxy_pool.start() hop_pool: FirstHopPool | None = None @@ -249,22 +251,13 @@ async def serve(config: Config) -> None: else: logger.info(" mode: direct (no chain)") - if isinstance(proxy_pool, ProxyPool): + if proxy_pool: nsrc = len(config.proxy_pool.sources) logger.info( " pool: %d proxies, %d alive (from %d source%s)", proxy_pool.count, proxy_pool.alive_count, nsrc, "s" if nsrc != 1 else "", ) logger.info(" retries: %d", config.retries) - elif proxy_pool: - logger.info(" proxy source: %s (%d proxies)", config.proxy_source.url, proxy_pool.count) - logger.info(" retries: %d", config.retries) - - loop = asyncio.get_running_loop() - stop = loop.create_future() - - for sig in (signal.SIGTERM, signal.SIGINT): - loop.add_signal_handler(sig, lambda s=sig: stop.set_result(s)) # SIGHUP: hot-reload config (timeout, retries, log_level, pool settings) async def _reload() -> None: @@ -287,17 +280,17 @@ async def serve(config: Config) -> None: for h in root.handlers: h.setLevel(level) logging.getLogger("s5p").setLevel(level) - if isinstance(proxy_pool, ProxyPool) and new.proxy_pool: + if proxy_pool and new.proxy_pool: await proxy_pool.reload(new.proxy_pool) logger.info("reload: config reloaded") def _on_sighup() -> None: - asyncio.ensure_future(_reload()) + asyncio.create_task(_reload()) loop.add_signal_handler(signal.SIGHUP, _on_sighup) metrics_stop = asyncio.Event() - pool_ref = proxy_pool if isinstance(proxy_pool, ProxyPool) else None + pool_ref = proxy_pool metrics_task = asyncio.create_task(_metrics_logger(metrics, metrics_stop, pool_ref)) async with srv: @@ -305,7 +298,7 @@ async def serve(config: Config) -> None: logger.info("received %s, shutting down", signal.Signals(sig).name) if hop_pool: await hop_pool.stop() - if isinstance(proxy_pool, ProxyPool): + if 
proxy_pool: await proxy_pool.stop() shutdown_line = metrics.summary() if pool_ref: diff --git a/src/s5p/source.py b/src/s5p/source.py deleted file mode 100644 index 8e7cbbd..0000000 --- a/src/s5p/source.py +++ /dev/null @@ -1,93 +0,0 @@ -"""Dynamic proxy source -- fetches proxies from an HTTP API.""" - -from __future__ import annotations - -import asyncio -import logging -import random -import time -from urllib.parse import urlencode - -from .config import ChainHop, ProxySourceConfig -from .http import http_get_json - -logger = logging.getLogger("s5p") - -VALID_PROTOS = {"socks5", "socks4", "http"} - - -class ProxySource: - """Fetches and caches proxies from an HTTP API. - - Picks a random proxy on each ``get()`` call. Refreshes the cache - in the background at a configurable interval. - """ - - def __init__(self, cfg: ProxySourceConfig) -> None: - self._cfg = cfg - self._cache: list[ChainHop] = [] - self._last_fetch: float = 0.0 - self._lock = asyncio.Lock() - - @property - def count(self) -> int: - """Number of proxies currently cached.""" - return len(self._cache) - - async def start(self) -> None: - """Initial fetch. 
Call once before serving.""" - await self._refresh() - - async def get(self) -> ChainHop | None: - """Return a random proxy from the cache, refreshing if stale.""" - now = time.monotonic() - if now - self._last_fetch > self._cfg.refresh: - await self._refresh() - if not self._cache: - return None - return random.choice(self._cache) - - async def _refresh(self) -> None: - """Fetch proxy list from the API (async).""" - async with self._lock: - try: - proxies = await self._fetch() - self._cache = proxies - self._last_fetch = time.monotonic() - logger.info("proxy source: loaded %d proxies", len(proxies)) - except Exception as e: - logger.warning("proxy source: fetch failed: %s", e) - if self._cache: - logger.info("proxy source: using stale cache (%d proxies)", len(self._cache)) - - async def _fetch(self) -> list[ChainHop]: - """Async HTTP fetch.""" - params: dict[str, str] = {} - if self._cfg.limit: - params["limit"] = str(self._cfg.limit) - if self._cfg.proto: - params["proto"] = self._cfg.proto - if self._cfg.country: - params["country"] = self._cfg.country - - url = self._cfg.url - if params: - sep = "&" if "?" in url else "?" 
- url = f"{url}{sep}{urlencode(params)}" - - data = await http_get_json(url) - - proxies: list[ChainHop] = [] - for entry in data.get("proxies", []): - proto = entry.get("proto") - addr = entry.get("proxy", "") - if not proto or proto not in VALID_PROTOS or ":" not in addr: - continue - host, port_str = addr.rsplit(":", 1) - try: - port = int(port_str) - except ValueError: - continue - proxies.append(ChainHop(proto=proto, host=host, port=port)) - - return proxies diff --git a/tests/test_config.py b/tests/test_config.py index 3ff8751..b3c82b4 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -2,7 +2,7 @@ import pytest -from s5p.config import ChainHop, Config, load_config, parse_proxy_url +from s5p.config import ChainHop, Config, load_config, parse_api_proxies, parse_proxy_url class TestParseProxyUrl: @@ -65,6 +65,57 @@ class TestChainHop: assert str(hop) == "http://u@proxy:8080" +class TestParseApiProxies: + """Test API response proxy parsing.""" + + def test_valid_entries(self): + data = { + "proxies": [ + {"proto": "socks5", "proxy": "1.2.3.4:1080"}, + {"proto": "http", "proxy": "5.6.7.8:8080"}, + ], + } + result = parse_api_proxies(data) + assert len(result) == 2 + assert result[0] == ChainHop(proto="socks5", host="1.2.3.4", port=1080) + assert result[1] == ChainHop(proto="http", host="5.6.7.8", port=8080) + + def test_skips_invalid_proto(self): + data = {"proxies": [{"proto": "ftp", "proxy": "1.2.3.4:21"}]} + assert parse_api_proxies(data) == [] + + def test_skips_missing_proto(self): + data = {"proxies": [{"proxy": "1.2.3.4:1080"}]} + assert parse_api_proxies(data) == [] + + def test_skips_missing_colon(self): + data = {"proxies": [{"proto": "socks5", "proxy": "no-port"}]} + assert parse_api_proxies(data) == [] + + def test_skips_bad_port(self): + data = {"proxies": [{"proto": "socks5", "proxy": "1.2.3.4:abc"}]} + assert parse_api_proxies(data) == [] + + def test_empty_proxies(self): + assert parse_api_proxies({"proxies": []}) == [] + + def 
test_missing_proxies_key(self): + assert parse_api_proxies({}) == [] + + def test_mixed_valid_invalid(self): + data = { + "proxies": [ + {"proto": "socks5", "proxy": "1.2.3.4:1080"}, + {"proto": "ftp", "proxy": "bad:21"}, + {"proto": "socks4", "proxy": "5.6.7.8:1080"}, + ], + } + result = parse_api_proxies(data) + assert len(result) == 2 + assert result[0].proto == "socks5" + assert result[1].proto == "socks4" + + class TestConfig: """Test Config defaults."""