merge: codebase consolidation and startup fixes

11 commits: deduplicate constants/parsing, consolidate health-check
logic, remove legacy ProxySource layer, instant warm start, early
signal handler registration, k8s-file logging driver.

60 tests passing, ruff clean, zero behavior changes.
This commit is contained in:
user
2026-02-15 22:24:35 +01:00
12 changed files with 178 additions and 249 deletions

View File

@@ -21,12 +21,12 @@ Client -------> s5p -------> Hop 1 -------> Hop 2 -------> Target
- **server.py** -- asyncio SOCKS5 server, bidirectional relay, signal handling
- **proto.py** -- protocol handshakes (SOCKS5, SOCKS4/4a, HTTP CONNECT), chain builder
- **config.py** -- YAML config loading, proxy URL parsing, pool config
- **config.py** -- YAML config loading, proxy URL parsing, API response parsing, pool config
- **pool.py** -- managed proxy pool (multi-source, health-tested, persistent)
- **source.py** -- legacy proxy source (single HTTP API, kept for backward compat)
- **http.py** -- minimal async HTTP/1.1 client (GET/POST JSON, no external deps)
- **connpool.py** -- pre-warmed TCP connection pool to first chain hop
- **cli.py** -- argparse CLI, logging setup, cProfile support
- **metrics.py** -- connection counters and human-readable summary (lock-free, asyncio-only)
## Deployment
@@ -53,14 +53,14 @@ All other functionality uses Python stdlib (`asyncio`, `socket`, `struct`).
- **asyncio** -- single-threaded event loop, efficient for I/O-bound proxying
- **Domain passthrough** -- never resolve DNS locally to prevent leaks
- **Tor as a hop** -- no special Tor handling; it's just `socks5://127.0.0.1:9050`
- **Graceful shutdown** -- SIGTERM/SIGINT handled in the event loop for clean container stops
- **Graceful shutdown** -- SIGTERM/SIGINT registered before startup for clean container stops
- **Config split** -- tracked example template, gitignored live config with real addresses
- **Proxy pool** -- multi-source (API + file), health-tested, persistent, auto-cleaned
- **Weighted selection** -- recently-tested proxies preferred via recency decay weight
- **Failure backoff** -- connection failures penalize proxy weight for 60s, avoids retry waste
- **Stale expiry** -- proxies dropped from sources evicted after 3 refresh cycles if not alive
- **Chain pre-flight** -- static chain tested before pool health tests; skip on failure
- **Warm start** -- quick-test alive subset on restart, defer full test to background
- **Warm start** -- trust cached alive state on restart, defer all health tests to background
- **SIGHUP reload** -- re-read config, update pool settings, re-fetch sources
- **Dead reporting** -- POST evicted proxies to upstream API for list quality feedback
- **Connection semaphore** -- cap concurrent connections to prevent fd exhaustion

View File

@@ -15,7 +15,7 @@
- [x] Per-proxy backoff (connection failure cooldown)
- [x] Stale proxy expiry (last_seen TTL)
- [x] Pool stats in periodic metrics log
- [x] Fast warm start (deferred full health test)
- [x] Instant warm start (trust cached state, defer all health tests)
- [x] Static chain health check (pre-flight before pool tests)
- [x] SIGHUP hot config reload
- [x] Dead proxy reporting to source API

View File

@@ -23,7 +23,7 @@
- [x] Per-proxy backoff (60s cooldown after connection failure)
- [x] Stale proxy expiry (evict dead proxies not seen for 3 refresh cycles)
- [x] Pool stats in periodic metrics log (`pool=alive/total`)
- [x] Fast warm start (quick-test alive subset, defer full test)
- [x] Fast warm start (trust cached state, defer all health tests)
- [x] Static chain health check (skip pool tests if chain unreachable)
- [x] SIGHUP hot config reload (timeout, retries, log_level, pool config)
- [x] Dead proxy reporting (`report_url` POST evicted proxies to API)
@@ -31,6 +31,18 @@
- [x] Async HTTP client (replace blocking urllib, parallel source fetch)
- [x] First-hop TCP connection pool (`pool_size`, `pool_max_idle`)
- [x] Codebase consolidation (refactor/codebase-consolidation)
- [x] Extract shared proxy parsing and constants to config.py
- [x] Consolidate health-check HTTP logic in pool
- [x] Remove threading from metrics (pure asyncio, no lock needed)
- [x] Replace `ensure_future` with `create_task`
- [x] Rename ambiguous variables in config loader
- [x] Remove legacy ProxySource layer (source.py deleted)
- [x] Add tests for extracted `parse_api_proxies`
- [x] Instant warm start (trust cached state, defer all health tests)
- [x] Register signal handlers before startup (fix SIGKILL on stop)
- [x] Use k8s-file logging driver with rotation
## Next
- [ ] Integration tests with mock proxy server
- [ ] SOCKS5 server-side authentication

View File

@@ -13,3 +13,7 @@ services:
- ~/.cache/s5p:/data:Z
# command: ["-c", "/app/config/s5p.yaml", "--cprofile", "/data/s5p.prof"]
network_mode: host
logging:
driver: k8s-file
options:
max-size: 10mb

View File

@@ -123,7 +123,6 @@ metrics: conn=142 ok=98 fail=44 retries=88 active=3 in=1.2M out=4.5M up=0h05m12s
| DNS leak | Use `socks5h://` (not `socks5://`) in client |
| Auth failed | Verify credentials in proxy URL |
| Port in use | `fuser -k 1080/tcp` to free the port |
| Container slow stop | Rebuild image after SIGTERM fix |
| Proxy keeps failing | Backoff penalizes for 60s; check `pool=` in metrics |
| "static chain unreachable" | Tor/upstream hop is down; pool tests skipped |
| Slow startup | Normal on cold start; warm restarts use state file |
| Slow startup | Normal on cold start; warm restarts serve instantly from state |

View File

@@ -179,11 +179,10 @@ are loaded for fast warm starts.
### Warm start
When restarting with an existing state file, only the previously-alive proxies
are tested before the server starts accepting connections. A full health test
of all proxies runs in the background. This reduces startup blocking from
minutes to seconds on warm restarts. Cold starts (no state file) test all
proxies before serving.
When restarting with an existing state file, the server trusts the cached
alive state and begins accepting connections immediately. A full health test
of all proxies runs in the background. Startup takes seconds regardless of
pool size. Cold starts (no state file) test all proxies before serving.
### Dead proxy reporting

View File

@@ -9,6 +9,7 @@ from urllib.parse import urlparse
import yaml
DEFAULT_PORTS = {"socks5": 1080, "socks4": 1080, "http": 8080}
VALID_PROTOS = {"socks5", "socks4", "http"}
@dataclass
@@ -26,17 +27,6 @@ class ChainHop:
return f"{self.proto}://{auth}{self.host}:{self.port}"
@dataclass
class ProxySourceConfig:
"""Configuration for the dynamic proxy source API (legacy)."""
url: str = ""
proto: str | None = None
country: str | None = None
limit: int | None = 1000
refresh: float = 300.0
@dataclass
class PoolSourceConfig:
"""A single proxy source: HTTP API or text file."""
@@ -76,7 +66,6 @@ class Config:
max_connections: int = 256
pool_size: int = 0
pool_max_idle: float = 30.0
proxy_source: ProxySourceConfig | None = None
proxy_pool: ProxyPoolConfig | None = None
config_file: str = ""
@@ -103,6 +92,27 @@ def parse_proxy_url(url: str) -> ChainHop:
)
def parse_api_proxies(data: dict) -> list[ChainHop]:
"""Parse proxy list from API response ``{"proxies": [...]}``.
Each entry must have ``proto`` (socks5/socks4/http) and ``proxy``
(host:port). Invalid entries are silently skipped.
"""
proxies: list[ChainHop] = []
for entry in data.get("proxies", []):
proto = entry.get("proto")
addr = entry.get("proxy", "")
if not proto or proto not in VALID_PROTOS or ":" not in addr:
continue
host, port_str = addr.rsplit(":", 1)
try:
port = int(port_str)
except ValueError:
continue
proxies.append(ChainHop(proto=proto, host=host, port=port))
return proxies
def load_config(path: str | Path) -> Config:
"""Load configuration from a YAML file."""
path = Path(path)
@@ -154,9 +164,9 @@ def load_config(path: str | Path) -> Config:
)
if "proxy_pool" in raw:
pp = raw["proxy_pool"]
pool_raw = raw["proxy_pool"]
sources = []
for src in pp.get("sources", []):
for src in pool_raw.get("sources", []):
sources.append(
PoolSourceConfig(
url=src.get("url"),
@@ -168,26 +178,26 @@ def load_config(path: str | Path) -> Config:
)
config.proxy_pool = ProxyPoolConfig(
sources=sources,
refresh=float(pp.get("refresh", 300)),
test_interval=float(pp.get("test_interval", 120)),
test_url=pp.get("test_url", "http://httpbin.org/ip"),
test_timeout=float(pp.get("test_timeout", 15)),
test_concurrency=int(pp.get("test_concurrency", 5)),
max_fails=int(pp.get("max_fails", 3)),
state_file=pp.get("state_file", ""),
report_url=pp.get("report_url", ""),
refresh=float(pool_raw.get("refresh", 300)),
test_interval=float(pool_raw.get("test_interval", 120)),
test_url=pool_raw.get("test_url", "http://httpbin.org/ip"),
test_timeout=float(pool_raw.get("test_timeout", 15)),
test_concurrency=int(pool_raw.get("test_concurrency", 5)),
max_fails=int(pool_raw.get("max_fails", 3)),
state_file=pool_raw.get("state_file", ""),
report_url=pool_raw.get("report_url", ""),
)
elif "proxy_source" in raw:
# backward compat: convert legacy proxy_source to proxy_pool
ps = raw["proxy_source"]
if isinstance(ps, str):
url, proto, country, limit, refresh = ps, None, None, 1000, 300.0
elif isinstance(ps, dict):
url = ps.get("url", "")
proto = ps.get("proto")
country = ps.get("country")
limit = ps.get("limit", 1000)
refresh = float(ps.get("refresh", 300))
src_raw = raw["proxy_source"]
if isinstance(src_raw, str):
url, proto, country, limit, refresh = src_raw, None, None, 1000, 300.0
elif isinstance(src_raw, dict):
url = src_raw.get("url", "")
proto = src_raw.get("proto")
country = src_raw.get("country")
limit = src_raw.get("limit", 1000)
refresh = float(src_raw.get("refresh", 300))
else:
url, proto, country, limit, refresh = "", None, None, 1000, 300.0
@@ -196,9 +206,5 @@ def load_config(path: str | Path) -> Config:
sources=[PoolSourceConfig(url=url, proto=proto, country=country, limit=limit)],
refresh=refresh,
)
# keep legacy field for source.py compat during transition
config.proxy_source = ProxySourceConfig(
url=url, proto=proto, country=country, limit=limit, refresh=refresh,
)
return config

View File

@@ -2,19 +2,17 @@
from __future__ import annotations
import threading
import time
class Metrics:
"""Thread-safe connection metrics.
"""Connection metrics.
Counters use a lock for consistency but tolerate minor races
on hot paths (bytes_in/bytes_out) for performance.
All access runs on the single asyncio event-loop thread,
so no locking is needed.
"""
def __init__(self) -> None:
self._lock = threading.Lock()
self.connections: int = 0
self.success: int = 0
self.failed: int = 0
@@ -26,24 +24,24 @@ class Metrics:
def summary(self) -> str:
"""One-line log-friendly summary."""
with self._lock:
uptime = time.monotonic() - self.started
h, rem = divmod(int(uptime), 3600)
m, s = divmod(rem, 60)
return (
f"conn={self.connections} ok={self.success} fail={self.failed} "
f"retries={self.retries} active={self.active} "
f"in={_human_bytes(self.bytes_in)} out={_human_bytes(self.bytes_out)} "
f"up={h}h{m:02d}m{s:02d}s"
)
uptime = time.monotonic() - self.started
h, rem = divmod(int(uptime), 3600)
m, s = divmod(rem, 60)
return (
f"conn={self.connections} ok={self.success} fail={self.failed} "
f"retries={self.retries} active={self.active} "
f"in={_human_bytes(self.bytes_in)} out={_human_bytes(self.bytes_out)} "
f"up={h}h{m:02d}m{s:02d}s"
)
def _human_bytes(n: int) -> str:
"""Format byte count in human-readable form."""
value = float(n)
for unit in ("B", "K", "M", "G", "T"):
if abs(n) < 1024:
if abs(value) < 1024:
if unit == "B":
return f"{n}{unit}"
return f"{n:.1f}{unit}"
n /= 1024 # type: ignore[assignment]
return f"{n:.1f}P"
return f"{int(value)}{unit}"
return f"{value:.1f}{unit}"
value /= 1024
return f"{value:.1f}P"

View File

@@ -12,13 +12,12 @@ from dataclasses import dataclass
from pathlib import Path
from urllib.parse import urlencode, urlparse
from .config import ChainHop, PoolSourceConfig, ProxyPoolConfig, parse_proxy_url
from .config import ChainHop, PoolSourceConfig, ProxyPoolConfig, parse_api_proxies, parse_proxy_url
from .http import http_get_json, http_post_json
from .proto import ProtoError, build_chain
logger = logging.getLogger("s5p")
VALID_PROTOS = {"socks5", "socks4", "http"}
STATE_VERSION = 1
@@ -67,21 +66,21 @@ class ProxyPool:
# -- public interface ----------------------------------------------------
async def start(self) -> None:
"""Load state, fetch sources, run initial health test, start loops."""
"""Load state, fetch sources, start background loops.
On warm start (state file has alive proxies), the pool begins
serving immediately using cached state and defers all health
testing to background tasks. On cold start, a full health
test runs before returning so the caller has live proxies.
"""
self._load_state()
warm_keys = list(self._alive_keys)
warm = bool(self._alive_keys)
await self._fetch_all_sources()
if warm_keys:
# warm start: quick-test previously-alive proxies first
valid_keys = [k for k in warm_keys if k in self._proxies]
if valid_keys:
await self._run_health_tests(keys=valid_keys)
self._save_state()
self._tasks.append(asyncio.create_task(self._deferred_full_test()))
else:
await self._run_health_tests()
self._save_state()
if warm:
# trust persisted alive state, verify in background
self._save_state()
self._tasks.append(asyncio.create_task(self._deferred_full_test()))
else:
# cold start: test everything before serving
await self._run_health_tests()
@@ -195,20 +194,7 @@ class ProxyPool:
url = f"{url}{sep}{urlencode(params)}"
data = await http_get_json(url)
proxies: list[ChainHop] = []
for entry in data.get("proxies", []):
proto = entry.get("proto")
addr = entry.get("proxy", "")
if not proto or proto not in VALID_PROTOS or ":" not in addr:
continue
host, port_str = addr.rsplit(":", 1)
try:
port = int(port_str)
except ValueError:
continue
proxies.append(ChainHop(proto=proto, host=host, port=port))
return proxies
return parse_api_proxies(data)
def _fetch_file_sync(self, src: PoolSourceConfig) -> list[ChainHop]:
"""Parse a text file with one proxy URL per line (runs in executor)."""
@@ -247,16 +233,13 @@ class ProxyPool:
# -- health testing ------------------------------------------------------
async def _test_proxy(self, key: str, entry: ProxyEntry) -> bool:
"""Test a single proxy by building the full chain and sending HTTP GET."""
async def _http_check(self, chain: list[ChainHop]) -> bool:
"""Send an HTTP GET through *chain* and return True on 2xx."""
parsed = urlparse(self._cfg.test_url)
host = parsed.hostname or "httpbin.org"
port = parsed.port or 80
path = parsed.path or "/"
chain = self._chain + [entry.hop]
entry.last_test = time.time()
entry.tests += 1
try:
reader, writer = await build_chain(
chain, host, port, timeout=self._cfg.test_timeout,
@@ -281,39 +264,17 @@ class ProxyPool:
except OSError:
pass
async def _test_proxy(self, key: str, entry: ProxyEntry) -> bool:
"""Test a single proxy by building the full chain and sending HTTP GET."""
entry.last_test = time.time()
entry.tests += 1
return await self._http_check(self._chain + [entry.hop])
async def _test_chain(self) -> bool:
"""Test the static chain without any pool proxy."""
if not self._chain:
return True
parsed = urlparse(self._cfg.test_url)
host = parsed.hostname or "httpbin.org"
port = parsed.port or 80
path = parsed.path or "/"
try:
reader, writer = await build_chain(
self._chain, host, port, timeout=self._cfg.test_timeout,
)
except (ProtoError, TimeoutError, ConnectionError, OSError, EOFError):
return False
try:
request = f"GET {path} HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\n\r\n"
writer.write(request.encode())
await writer.drain()
line = await asyncio.wait_for(reader.readline(), timeout=self._cfg.test_timeout)
parts = line.decode("utf-8", errors="replace").split(None, 2)
return len(parts) >= 2 and parts[1].startswith("2")
except (TimeoutError, ConnectionError, OSError, EOFError):
return False
finally:
try:
writer.close()
await writer.wait_closed()
except OSError:
pass
return await self._http_check(self._chain)
async def _run_health_tests(self, keys: list[str] | None = None) -> None:
"""Test proxies with bounded concurrency.
@@ -408,8 +369,7 @@ class ProxyPool:
# report evicted proxies to upstream API
if evict_keys and self._cfg.report_url:
dead = [k for k in evict_keys]
asyncio.ensure_future(self._report_dead(dead))
asyncio.create_task(self._report_dead(list(evict_keys)))
async def _report_dead(self, keys: list[str]) -> None:
"""POST dead proxy list to report_url (fire-and-forget, async)."""

View File

@@ -13,7 +13,6 @@ from .connpool import FirstHopPool
from .metrics import Metrics
from .pool import ProxyPool
from .proto import ProtoError, Socks5Reply, build_chain, read_socks5_address
from .source import ProxySource
logger = logging.getLogger("s5p")
@@ -60,7 +59,7 @@ async def _handle_client(
client_reader: asyncio.StreamReader,
client_writer: asyncio.StreamWriter,
config: Config,
proxy_pool: ProxyPool | ProxySource | None = None,
proxy_pool: ProxyPool | None = None,
metrics: Metrics | None = None,
first_hop_pool: FirstHopPool | None = None,
) -> None:
@@ -123,7 +122,7 @@ async def _handle_client(
break
except (ProtoError, TimeoutError, ConnectionError, OSError) as e:
last_err = e
if pool_hop and isinstance(proxy_pool, ProxyPool):
if pool_hop and proxy_pool:
proxy_pool.report_failure(pool_hop)
if metrics:
metrics.retries += 1
@@ -215,15 +214,18 @@ async def _metrics_logger(
async def serve(config: Config) -> None:
"""Start the SOCKS5 proxy server."""
# register signal handlers early so SIGTERM is never ignored
loop = asyncio.get_running_loop()
stop = loop.create_future()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, lambda s=sig: stop.set_result(s))
metrics = Metrics()
proxy_pool: ProxyPool | ProxySource | None = None
proxy_pool: ProxyPool | None = None
if config.proxy_pool and config.proxy_pool.sources:
pool = ProxyPool(config.proxy_pool, config.chain, config.timeout)
await pool.start()
proxy_pool = pool
elif config.proxy_source and config.proxy_source.url:
proxy_pool = ProxySource(config.proxy_source)
proxy_pool = ProxyPool(config.proxy_pool, config.chain, config.timeout)
await proxy_pool.start()
hop_pool: FirstHopPool | None = None
@@ -249,22 +251,13 @@ async def serve(config: Config) -> None:
else:
logger.info(" mode: direct (no chain)")
if isinstance(proxy_pool, ProxyPool):
if proxy_pool:
nsrc = len(config.proxy_pool.sources)
logger.info(
" pool: %d proxies, %d alive (from %d source%s)",
proxy_pool.count, proxy_pool.alive_count, nsrc, "s" if nsrc != 1 else "",
)
logger.info(" retries: %d", config.retries)
elif proxy_pool:
logger.info(" proxy source: %s (%d proxies)", config.proxy_source.url, proxy_pool.count)
logger.info(" retries: %d", config.retries)
loop = asyncio.get_running_loop()
stop = loop.create_future()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, lambda s=sig: stop.set_result(s))
# SIGHUP: hot-reload config (timeout, retries, log_level, pool settings)
async def _reload() -> None:
@@ -287,17 +280,17 @@ async def serve(config: Config) -> None:
for h in root.handlers:
h.setLevel(level)
logging.getLogger("s5p").setLevel(level)
if isinstance(proxy_pool, ProxyPool) and new.proxy_pool:
if proxy_pool and new.proxy_pool:
await proxy_pool.reload(new.proxy_pool)
logger.info("reload: config reloaded")
def _on_sighup() -> None:
asyncio.ensure_future(_reload())
asyncio.create_task(_reload())
loop.add_signal_handler(signal.SIGHUP, _on_sighup)
metrics_stop = asyncio.Event()
pool_ref = proxy_pool if isinstance(proxy_pool, ProxyPool) else None
pool_ref = proxy_pool
metrics_task = asyncio.create_task(_metrics_logger(metrics, metrics_stop, pool_ref))
async with srv:
@@ -305,7 +298,7 @@ async def serve(config: Config) -> None:
logger.info("received %s, shutting down", signal.Signals(sig).name)
if hop_pool:
await hop_pool.stop()
if isinstance(proxy_pool, ProxyPool):
if proxy_pool:
await proxy_pool.stop()
shutdown_line = metrics.summary()
if pool_ref:

View File

@@ -1,93 +0,0 @@
"""Dynamic proxy source -- fetches proxies from an HTTP API."""
from __future__ import annotations
import asyncio
import logging
import random
import time
from urllib.parse import urlencode
from .config import ChainHop, ProxySourceConfig
from .http import http_get_json
logger = logging.getLogger("s5p")
VALID_PROTOS = {"socks5", "socks4", "http"}
class ProxySource:
"""Fetches and caches proxies from an HTTP API.
Picks a random proxy on each ``get()`` call. Refreshes the cache
in the background at a configurable interval.
"""
def __init__(self, cfg: ProxySourceConfig) -> None:
self._cfg = cfg
self._cache: list[ChainHop] = []
self._last_fetch: float = 0.0
self._lock = asyncio.Lock()
@property
def count(self) -> int:
"""Number of proxies currently cached."""
return len(self._cache)
async def start(self) -> None:
"""Initial fetch. Call once before serving."""
await self._refresh()
async def get(self) -> ChainHop | None:
"""Return a random proxy from the cache, refreshing if stale."""
now = time.monotonic()
if now - self._last_fetch > self._cfg.refresh:
await self._refresh()
if not self._cache:
return None
return random.choice(self._cache)
async def _refresh(self) -> None:
"""Fetch proxy list from the API (async)."""
async with self._lock:
try:
proxies = await self._fetch()
self._cache = proxies
self._last_fetch = time.monotonic()
logger.info("proxy source: loaded %d proxies", len(proxies))
except Exception as e:
logger.warning("proxy source: fetch failed: %s", e)
if self._cache:
logger.info("proxy source: using stale cache (%d proxies)", len(self._cache))
async def _fetch(self) -> list[ChainHop]:
"""Async HTTP fetch."""
params: dict[str, str] = {}
if self._cfg.limit:
params["limit"] = str(self._cfg.limit)
if self._cfg.proto:
params["proto"] = self._cfg.proto
if self._cfg.country:
params["country"] = self._cfg.country
url = self._cfg.url
if params:
sep = "&" if "?" in url else "?"
url = f"{url}{sep}{urlencode(params)}"
data = await http_get_json(url)
proxies: list[ChainHop] = []
for entry in data.get("proxies", []):
proto = entry.get("proto")
addr = entry.get("proxy", "")
if not proto or proto not in VALID_PROTOS or ":" not in addr:
continue
host, port_str = addr.rsplit(":", 1)
try:
port = int(port_str)
except ValueError:
continue
proxies.append(ChainHop(proto=proto, host=host, port=port))
return proxies

View File

@@ -2,7 +2,7 @@
import pytest
from s5p.config import ChainHop, Config, load_config, parse_proxy_url
from s5p.config import ChainHop, Config, load_config, parse_api_proxies, parse_proxy_url
class TestParseProxyUrl:
@@ -65,6 +65,57 @@ class TestChainHop:
assert str(hop) == "http://u@proxy:8080"
class TestParseApiProxies:
"""Test API response proxy parsing."""
def test_valid_entries(self):
data = {
"proxies": [
{"proto": "socks5", "proxy": "1.2.3.4:1080"},
{"proto": "http", "proxy": "5.6.7.8:8080"},
],
}
result = parse_api_proxies(data)
assert len(result) == 2
assert result[0] == ChainHop(proto="socks5", host="1.2.3.4", port=1080)
assert result[1] == ChainHop(proto="http", host="5.6.7.8", port=8080)
def test_skips_invalid_proto(self):
data = {"proxies": [{"proto": "ftp", "proxy": "1.2.3.4:21"}]}
assert parse_api_proxies(data) == []
def test_skips_missing_proto(self):
data = {"proxies": [{"proxy": "1.2.3.4:1080"}]}
assert parse_api_proxies(data) == []
def test_skips_missing_colon(self):
data = {"proxies": [{"proto": "socks5", "proxy": "no-port"}]}
assert parse_api_proxies(data) == []
def test_skips_bad_port(self):
data = {"proxies": [{"proto": "socks5", "proxy": "1.2.3.4:abc"}]}
assert parse_api_proxies(data) == []
def test_empty_proxies(self):
assert parse_api_proxies({"proxies": []}) == []
def test_missing_proxies_key(self):
assert parse_api_proxies({}) == []
def test_mixed_valid_invalid(self):
data = {
"proxies": [
{"proto": "socks5", "proxy": "1.2.3.4:1080"},
{"proto": "ftp", "proxy": "bad:21"},
{"proto": "socks4", "proxy": "5.6.7.8:1080"},
],
}
result = parse_api_proxies(data)
assert len(result) == 2
assert result[0].proto == "socks5"
assert result[1].proto == "socks4"
class TestConfig:
"""Test Config defaults."""