- Per-listener `retries` overrides global default (0 = inherit)
- Pool-level `allowed_protos` filters proxies during merge
- Connection pooling documented in CHEATSHEET.md
- Both features exposed in /config and /status API responses
- 12 new tests (config parsing, API exposure, merge filtering)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
792 lines
26 KiB
Python
792 lines
26 KiB
Python
"""Tests for configuration loading and proxy URL parsing."""
|
|
|
|
import pytest

from s5p.config import (
    ChainHop,
    Config,
    ListenerConfig,
    load_config,
    parse_api_proxies,
    parse_proxy_url,
)
from s5p.server import _bypass_match
class TestParseProxyUrl:
    """Test proxy URL parsing."""

    def test_socks5_basic(self):
        """A socks5 URL without credentials yields proto, host, port and no auth."""
        hop = parse_proxy_url("socks5://127.0.0.1:9050")
        assert hop.proto == "socks5"
        assert hop.host == "127.0.0.1"
        assert hop.port == 9050
        assert hop.username is None
        assert hop.password is None

    def test_socks5_with_auth(self):
        """`user:pass@` in the URL populates username and password."""
        hop = parse_proxy_url("socks5://user:pass@proxy.example.com:1080")
        assert hop.proto == "socks5"
        assert hop.host == "proxy.example.com"
        assert hop.port == 1080
        assert hop.username == "user"
        assert hop.password == "pass"

    def test_socks4(self):
        """The socks4 scheme is accepted."""
        hop = parse_proxy_url("socks4://10.0.0.1:1080")
        assert hop.proto == "socks4"
        assert hop.host == "10.0.0.1"
        assert hop.port == 1080

    def test_http_connect(self):
        """The http scheme is accepted."""
        hop = parse_proxy_url("http://proxy:8080")
        assert hop.proto == "http"
        assert hop.host == "proxy"
        assert hop.port == 8080

    def test_default_port_socks5(self):
        """A socks5 URL without a port falls back to 1080."""
        hop = parse_proxy_url("socks5://host")
        assert hop.port == 1080

    def test_default_port_http(self):
        """An http URL without a port falls back to 8080."""
        hop = parse_proxy_url("http://host")
        assert hop.port == 8080

    def test_unsupported_protocol(self):
        """An unknown scheme raises ValueError mentioning the protocol."""
        with pytest.raises(ValueError, match="unsupported protocol"):
            parse_proxy_url("ftp://host:21")

    def test_missing_host(self):
        """A URL with no host raises ValueError mentioning the host."""
        with pytest.raises(ValueError, match="missing host"):
            parse_proxy_url("socks5://")
class TestChainHop:
    """Test ChainHop string representation."""

    def test_str_without_auth(self):
        """Without credentials the string form is `proto://host:port`."""
        hop = ChainHop(proto="socks5", host="localhost", port=9050)
        assert str(hop) == "socks5://localhost:9050"

    def test_str_with_auth(self):
        """With credentials the string form shows the username but not the password."""
        hop = ChainHop(proto="http", host="proxy", port=8080, username="u", password="p")
        assert str(hop) == "http://u@proxy:8080"
class TestParseApiProxies:
    """Test API response proxy parsing."""

    def test_valid_entries(self):
        """Well-formed entries are converted to ChainHop objects in order."""
        data = {
            "proxies": [
                {"proto": "socks5", "proxy": "1.2.3.4:1080"},
                {"proto": "http", "proxy": "5.6.7.8:8080"},
            ],
        }
        result = parse_api_proxies(data)
        assert len(result) == 2
        assert result[0] == ChainHop(proto="socks5", host="1.2.3.4", port=1080)
        assert result[1] == ChainHop(proto="http", host="5.6.7.8", port=8080)

    def test_skips_invalid_proto(self):
        """Entries with an unsupported proto are dropped."""
        data = {"proxies": [{"proto": "ftp", "proxy": "1.2.3.4:21"}]}
        assert parse_api_proxies(data) == []

    def test_skips_missing_proto(self):
        """Entries without a proto key are dropped."""
        data = {"proxies": [{"proxy": "1.2.3.4:1080"}]}
        assert parse_api_proxies(data) == []

    def test_skips_missing_colon(self):
        """Entries whose proxy value has no `host:port` separator are dropped."""
        data = {"proxies": [{"proto": "socks5", "proxy": "no-port"}]}
        assert parse_api_proxies(data) == []

    def test_skips_bad_port(self):
        """Entries with a non-numeric port are dropped."""
        data = {"proxies": [{"proto": "socks5", "proxy": "1.2.3.4:abc"}]}
        assert parse_api_proxies(data) == []

    def test_empty_proxies(self):
        """An empty proxies list yields an empty result."""
        assert parse_api_proxies({"proxies": []}) == []

    def test_missing_proxies_key(self):
        """A payload without a proxies key yields an empty result."""
        assert parse_api_proxies({}) == []

    def test_mixed_valid_invalid(self):
        """Invalid entries are skipped while valid ones keep their relative order."""
        data = {
            "proxies": [
                {"proto": "socks5", "proxy": "1.2.3.4:1080"},
                {"proto": "ftp", "proxy": "bad:21"},
                {"proto": "socks4", "proxy": "5.6.7.8:1080"},
            ],
        }
        result = parse_api_proxies(data)
        assert len(result) == 2
        assert result[0].proto == "socks5"
        assert result[1].proto == "socks4"
class TestConfig:
    """Test Config defaults and top-level YAML options (pooling, tor, proxy_pool)."""

    def test_defaults(self):
        """A default-constructed Config carries the documented default values."""
        c = Config()
        assert c.listen_host == "127.0.0.1"
        assert c.listen_port == 1080
        assert c.chain == []
        assert c.timeout == 10.0
        assert c.max_connections == 256
        assert c.pool_size == 0
        assert c.pool_max_idle == 30.0
        assert c.tor_nodes == []

    def test_max_connections_from_yaml(self, tmp_path):
        """`max_connections` is read from the YAML file."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text("max_connections: 512\n")
        c = load_config(cfg_file)
        assert c.max_connections == 512

    def test_pool_size_from_yaml(self, tmp_path):
        """`pool_size` and `pool_max_idle` are read from the YAML file."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text("pool_size: 16\npool_max_idle: 45.0\n")
        c = load_config(cfg_file)
        assert c.pool_size == 16
        assert c.pool_max_idle == 45.0

    def test_tor_config_from_yaml(self, tmp_path):
        """Every key of the `tor` section is loaded onto `Config.tor`."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "tor:\n"
            "  control_host: 10.0.0.1\n"
            "  control_port: 9151\n"
            "  password: secret\n"
            "  cookie_file: /var/run/tor/cookie\n"
            "  newnym_interval: 60\n"
        )
        c = load_config(cfg_file)
        assert c.tor is not None
        assert c.tor.control_host == "10.0.0.1"
        assert c.tor.control_port == 9151
        assert c.tor.password == "secret"
        assert c.tor.cookie_file == "/var/run/tor/cookie"
        assert c.tor.newnym_interval == 60.0

    def test_tor_config_defaults(self, tmp_path):
        """Keys omitted from the `tor` section fall back to their defaults."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text("tor:\n  password: test\n")
        c = load_config(cfg_file)
        assert c.tor is not None
        assert c.tor.control_host == "127.0.0.1"
        assert c.tor.control_port == 9051
        assert c.tor.cookie_file == ""
        assert c.tor.newnym_interval == 0.0

    def test_no_tor_config(self, tmp_path):
        """Without a `tor` section, `Config.tor` is None."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text("listen: 1080\n")
        c = load_config(cfg_file)
        assert c.tor is None

    def test_proxy_pool_test_targets(self, tmp_path):
        """Explicit `test_targets` are loaded and `test_url` stays empty."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "proxy_pool:\n"
            "  sources: []\n"
            "  test_targets:\n"
            "    - host-a.example.com\n"
            "    - host-b.example.com\n"
        )
        c = load_config(cfg_file)
        assert c.proxy_pool is not None
        assert c.proxy_pool.test_targets == ["host-a.example.com", "host-b.example.com"]
        assert c.proxy_pool.test_url == ""

    def test_proxy_pool_legacy_test_url(self, tmp_path):
        """A legacy `test_url` is converted into a single test target (its host)."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "proxy_pool:\n"
            "  sources: []\n"
            "  test_url: http://httpbin.org/ip\n"
        )
        c = load_config(cfg_file)
        assert c.proxy_pool is not None
        assert c.proxy_pool.test_targets == ["httpbin.org"]

    def test_proxy_pool_defaults(self, tmp_path):
        """With neither `test_targets` nor `test_url`, the default targets apply."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "proxy_pool:\n"
            "  sources: []\n"
        )
        c = load_config(cfg_file)
        assert c.proxy_pool is not None
        assert c.proxy_pool.test_targets == [
            "www.google.com", "www.cloudflare.com", "www.amazon.com",
        ]
class TestProxyPools:
    """Test named proxy_pools config parsing."""

    def test_proxy_pools_from_yaml(self, tmp_path):
        """Each named pool keeps its own sources, `mitm` flag and state file."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "proxy_pools:\n"
            "  clean:\n"
            "    sources:\n"
            "      - url: http://api:8081/proxies/all\n"
            "        mitm: false\n"
            "    refresh: 300\n"
            "    state_file: /data/pool-clean.json\n"
            "  mitm:\n"
            "    sources:\n"
            "      - url: http://api:8081/proxies/all\n"
            "        mitm: true\n"
            "    state_file: /data/pool-mitm.json\n"
        )
        c = load_config(cfg_file)
        assert "clean" in c.proxy_pools
        assert "mitm" in c.proxy_pools
        assert c.proxy_pools["clean"].sources[0].mitm is False
        assert c.proxy_pools["mitm"].sources[0].mitm is True
        assert c.proxy_pools["clean"].state_file == "/data/pool-clean.json"
        assert c.proxy_pools["mitm"].state_file == "/data/pool-mitm.json"

    def test_mitm_none_when_absent(self, tmp_path):
        """A source without a `mitm` key gets `mitm is None` (not False)."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "proxy_pool:\n"
            "  sources:\n"
            "    - url: http://api:8081/proxies\n"
        )
        c = load_config(cfg_file)
        assert c.proxy_pool is not None
        assert c.proxy_pool.sources[0].mitm is None

    def test_singular_becomes_default(self, tmp_path):
        """A singular `proxy_pool` is registered as the pool named "default"."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "proxy_pool:\n"
            "  sources:\n"
            "    - url: http://api:8081/proxies\n"
        )
        c = load_config(cfg_file)
        assert "default" in c.proxy_pools
        assert c.proxy_pools["default"] is c.proxy_pool

    def test_proxy_pools_wins_over_singular(self, tmp_path):
        """An explicit `proxy_pools.default` is not overwritten by `proxy_pool`."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "proxy_pools:\n"
            "  default:\n"
            "    sources:\n"
            "      - url: http://api:8081/pools-default\n"
            "proxy_pool:\n"
            "  sources:\n"
            "    - url: http://api:8081/singular\n"
        )
        c = load_config(cfg_file)
        # proxy_pools "default" wins, singular does not overwrite
        assert c.proxy_pools["default"].sources[0].url == "http://api:8081/pools-default"

    def test_listener_pool_name(self, tmp_path):
        """A listener's `pool` key sets `pool_name`; it is empty when omitted."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "listeners:\n"
            "  - listen: 0.0.0.0:1080\n"
            "    pool: clean\n"
            "    chain:\n"
            "      - socks5://127.0.0.1:9050\n"
            "      - pool\n"
            "  - listen: 0.0.0.0:1081\n"
            "    chain:\n"
            "      - socks5://127.0.0.1:9050\n"
        )
        c = load_config(cfg_file)
        assert c.listeners[0].pool_name == "clean"
        assert c.listeners[0].pool_hops == 1
        assert c.listeners[1].pool_name == ""
        assert c.listeners[1].pool_hops == 0
class TestAllowedProtos:
    """Test pool-level allowed_protos config."""

    def test_allowed_protos_from_yaml(self, tmp_path):
        """`allowed_protos` is loaded per pool and is empty when omitted."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "proxy_pools:\n"
            "  socks_only:\n"
            "    sources: []\n"
            "    allowed_protos: [socks5]\n"
            "  any:\n"
            "    sources: []\n"
        )
        c = load_config(cfg_file)
        assert c.proxy_pools["socks_only"].allowed_protos == ["socks5"]
        assert c.proxy_pools["any"].allowed_protos == []

    def test_allowed_protos_multiple(self, tmp_path):
        """Several protocols may be listed; order is preserved."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "proxy_pool:\n"
            "  sources: []\n"
            "  allowed_protos: [socks5, http]\n"
        )
        c = load_config(cfg_file)
        # Guard first so a missing pool fails with a clear assertion,
        # matching the other proxy_pool tests in this module.
        assert c.proxy_pool is not None
        assert c.proxy_pool.allowed_protos == ["socks5", "http"]

    def test_allowed_protos_default_empty(self):
        """A default-constructed ProxyPoolConfig has no protocol restriction."""
        from s5p.config import ProxyPoolConfig

        cfg = ProxyPoolConfig()
        assert cfg.allowed_protos == []
class TestTorNodes:
    """Test tor_nodes config parsing."""

    def test_tor_nodes_from_yaml(self, tmp_path):
        """Each `tor_nodes` URL is parsed into a hop, in file order."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "tor_nodes:\n"
            "  - socks5://10.200.1.1:9050\n"
            "  - socks5://10.200.1.254:9050\n"
            "  - socks5://10.200.1.13:9050\n"
        )
        c = load_config(cfg_file)
        assert len(c.tor_nodes) == 3
        assert c.tor_nodes[0].host == "10.200.1.1"
        assert c.tor_nodes[0].port == 9050
        assert c.tor_nodes[1].host == "10.200.1.254"
        assert c.tor_nodes[2].host == "10.200.1.13"

    def test_no_tor_nodes(self, tmp_path):
        """Without a `tor_nodes` key the list is empty."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text("listen: 1080\n")
        c = load_config(cfg_file)
        assert c.tor_nodes == []
class TestListenerConfig:
    """Test multi-listener config parsing."""

    def test_defaults(self):
        """A default-constructed ListenerConfig has no chain and no pool hops."""
        lc = ListenerConfig()
        assert lc.listen_host == "127.0.0.1"
        assert lc.listen_port == 1080
        assert lc.chain == []
        assert lc.pool_hops == 0

    def test_listeners_from_yaml(self, tmp_path):
        """Three listeners with 0, 1 and 2 `pool` entries are parsed independently."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "listeners:\n"
            "  - listen: 0.0.0.0:1080\n"
            "    chain:\n"
            "      - socks5://127.0.0.1:9050\n"
            "  - listen: 0.0.0.0:1081\n"
            "    chain:\n"
            "      - socks5://127.0.0.1:9050\n"
            "      - pool\n"
            "  - listen: 0.0.0.0:1082\n"
            "    chain:\n"
            "      - socks5://127.0.0.1:9050\n"
            "      - pool\n"
            "      - pool\n"
        )
        c = load_config(cfg_file)
        assert len(c.listeners) == 3

        # listener 0: no pool hops
        assert c.listeners[0].listen_host == "0.0.0.0"
        assert c.listeners[0].listen_port == 1080
        assert len(c.listeners[0].chain) == 1
        assert c.listeners[0].pool_hops == 0

        # listener 1: 1 pool hop
        assert c.listeners[1].listen_port == 1081
        assert len(c.listeners[1].chain) == 1
        assert c.listeners[1].pool_hops == 1

        # listener 2: 2 pool hops
        assert c.listeners[2].listen_port == 1082
        assert len(c.listeners[2].chain) == 1
        assert c.listeners[2].pool_hops == 2

    def test_pool_keyword_stripped_from_chain(self, tmp_path):
        """`pool` entries are counted as pool hops and removed from `chain`."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "listeners:\n"
            "  - listen: 1080\n"
            "    chain:\n"
            "      - socks5://tor:9050\n"
            "      - pool\n"
            "      - pool\n"
        )
        c = load_config(cfg_file)
        lc = c.listeners[0]
        # only the real hop remains in chain
        assert len(lc.chain) == 1
        assert lc.chain[0].host == "tor"
        assert lc.pool_hops == 2

    def test_pool_keyword_case_insensitive(self, tmp_path):
        """The `pool` keyword is recognised regardless of letter case."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "listeners:\n"
            "  - listen: 1080\n"
            "    chain:\n"
            "      - Pool\n"
        )
        c = load_config(cfg_file)
        assert c.listeners[0].pool_hops == 1
        assert c.listeners[0].chain == []
class TestPoolSeq:
    """Test per-hop pool references (pool:name syntax)."""

    def test_bare_pool_uses_default_name(self, tmp_path):
        """Bare `pool` + `pool: clean` -> pool_seq=[["clean"]]."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "listeners:\n"
            "  - listen: 1080\n"
            "    pool: clean\n"
            "    chain:\n"
            "      - pool\n"
        )
        c = load_config(cfg_file)
        assert c.listeners[0].pool_seq == [["clean"]]

    def test_bare_pool_no_pool_name(self, tmp_path):
        """Bare `pool` with no `pool:` key -> pool_seq=[["default"]]."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "listeners:\n"
            "  - listen: 1080\n"
            "    chain:\n"
            "      - pool\n"
        )
        c = load_config(cfg_file)
        assert c.listeners[0].pool_seq == [["default"]]

    def test_pool_colon_name(self, tmp_path):
        """`pool:clean, pool:mitm` -> pool_seq=[["clean"], ["mitm"]]."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "listeners:\n"
            "  - listen: 1080\n"
            "    chain:\n"
            "      - pool:clean\n"
            "      - pool:mitm\n"
        )
        c = load_config(cfg_file)
        assert c.listeners[0].pool_seq == [["clean"], ["mitm"]]

    def test_mixed_bare_and_named(self, tmp_path):
        """Bare `pool` + `pool:mitm` with `pool: clean` -> [["clean"], ["mitm"]]."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "listeners:\n"
            "  - listen: 1080\n"
            "    pool: clean\n"
            "    chain:\n"
            "      - pool\n"
            "      - pool:mitm\n"
        )
        c = load_config(cfg_file)
        assert c.listeners[0].pool_seq == [["clean"], ["mitm"]]

    def test_pool_colon_case_insensitive_prefix(self, tmp_path):
        """`Pool:MyPool` -> pool_seq=[["MyPool"]] (prefix case-insensitive)."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "listeners:\n"
            "  - listen: 1080\n"
            "    chain:\n"
            "      - Pool:MyPool\n"
        )
        c = load_config(cfg_file)
        assert c.listeners[0].pool_seq == [["MyPool"]]

    def test_pool_colon_empty_is_bare(self, tmp_path):
        """`pool:` (empty name) -> treated as bare pool."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "listeners:\n"
            "  - listen: 1080\n"
            "    pool: clean\n"
            "    chain:\n"
            "      - pool:\n"
        )
        c = load_config(cfg_file)
        assert c.listeners[0].pool_seq == [["clean"]]

    def test_backward_compat_pool_hops_property(self):
        """pool_hops property returns len(pool_seq)."""
        lc = ListenerConfig(pool_seq=[["clean"], ["mitm"]])
        assert lc.pool_hops == 2
        lc2 = ListenerConfig()
        assert lc2.pool_hops == 0

    def test_legacy_auto_append(self, tmp_path):
        """Singular `proxy_pool:` -> pool_seq=[["default"]]."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "listen: 0.0.0.0:1080\n"
            "chain:\n"
            "  - socks5://127.0.0.1:9050\n"
            "proxy_pool:\n"
            "  sources:\n"
            "    - url: http://api:8081/proxies\n"
        )
        c = load_config(cfg_file)
        lc = c.listeners[0]
        assert lc.pool_seq == [["default"]]
        assert lc.pool_hops == 1

    def test_list_candidates(self, tmp_path):
        """List in chain -> multi-candidate hop."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "listeners:\n"
            "  - listen: 1080\n"
            "    chain:\n"
            "      - socks5://tor:9050\n"
            "      - [pool:clean, pool:mitm]\n"
            "      - [pool:clean, pool:mitm]\n"
        )
        c = load_config(cfg_file)
        lc = c.listeners[0]
        assert len(lc.chain) == 1
        assert lc.pool_hops == 2
        assert lc.pool_seq == [["clean", "mitm"], ["clean", "mitm"]]
class TestListenerBackwardCompat:
    """Test backward-compatible single listener from old format."""

    def test_old_format_creates_single_listener(self, tmp_path):
        """Top-level `listen`/`chain` keys produce exactly one listener."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "listen: 0.0.0.0:9999\n"
            "chain:\n"
            "  - socks5://127.0.0.1:9050\n"
        )
        c = load_config(cfg_file)
        assert len(c.listeners) == 1
        lc = c.listeners[0]
        assert lc.listen_host == "0.0.0.0"
        assert lc.listen_port == 9999
        assert len(lc.chain) == 1
        assert lc.pool_hops == 0

    def test_empty_config_creates_single_listener(self, tmp_path):
        """An empty file still yields one listener on the default address."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text("")
        c = load_config(cfg_file)
        assert len(c.listeners) == 1
        lc = c.listeners[0]
        assert lc.listen_host == "127.0.0.1"
        assert lc.listen_port == 1080
class TestListenerPoolCompat:
    """Test that proxy_pool + old format auto-sets pool_hops=1."""

    def test_pool_auto_appends(self, tmp_path):
        """Old-format config with `proxy_pool` gets one implicit pool hop."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "listen: 0.0.0.0:1080\n"
            "chain:\n"
            "  - socks5://127.0.0.1:9050\n"
            "proxy_pool:\n"
            "  sources:\n"
            "    - url: http://api:8081/proxies\n"
        )
        c = load_config(cfg_file)
        assert len(c.listeners) == 1
        lc = c.listeners[0]
        assert lc.pool_hops == 1

    def test_explicit_listeners_no_auto_append(self, tmp_path):
        """With an explicit `listeners` list, `proxy_pool` adds no implicit hop."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "listeners:\n"
            "  - listen: 0.0.0.0:1080\n"
            "    chain:\n"
            "      - socks5://127.0.0.1:9050\n"
            "proxy_pool:\n"
            "  sources:\n"
            "    - url: http://api:8081/proxies\n"
        )
        c = load_config(cfg_file)
        assert len(c.listeners) == 1
        lc = c.listeners[0]
        # explicit listeners: no auto pool_hops
        assert lc.pool_hops == 0
class TestListenerRetries:
    """Test per-listener retry override config."""

    def test_retries_default(self):
        """A default-constructed ListenerConfig has retries == 0."""
        lc = ListenerConfig()
        assert lc.retries == 0

    def test_retries_from_yaml(self, tmp_path):
        """`retries` is read per listener; listeners without the key keep 0."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "listeners:\n"
            "  - listen: 1080\n"
            "    retries: 5\n"
            "    chain:\n"
            "      - socks5://127.0.0.1:9050\n"
            "      - pool\n"
            "  - listen: 1081\n"
            "    chain:\n"
            "      - socks5://127.0.0.1:9050\n"
        )
        c = load_config(cfg_file)
        assert c.listeners[0].retries == 5
        assert c.listeners[1].retries == 0

    def test_retries_absent_from_yaml(self, tmp_path):
        """A listener with only `listen` set has retries == 0."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "listeners:\n"
            "  - listen: 1080\n"
        )
        c = load_config(cfg_file)
        assert c.listeners[0].retries == 0
class TestAuthConfig:
    """Test auth field in listener config."""

    def test_auth_from_yaml(self, tmp_path):
        """The `auth` mapping is loaded as a username -> password dict."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "listeners:\n"
            "  - listen: 1080\n"
            "    auth:\n"
            "      alice: s3cret\n"
            "      bob: hunter2\n"
        )
        c = load_config(cfg_file)
        assert c.listeners[0].auth == {"alice": "s3cret", "bob": "hunter2"}

    def test_auth_empty_default(self):
        """A default-constructed ListenerConfig has an empty auth dict."""
        lc = ListenerConfig()
        assert lc.auth == {}

    def test_auth_absent_from_yaml(self, tmp_path):
        """A listener without an `auth` key gets an empty auth dict."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "listeners:\n"
            "  - listen: 1080\n"
        )
        c = load_config(cfg_file)
        assert c.listeners[0].auth == {}

    def test_auth_numeric_password(self, tmp_path):
        """YAML parses `admin: 12345` as int; must be coerced to str."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "listeners:\n"
            "  - listen: 1080\n"
            "    auth:\n"
            "      admin: 12345\n"
        )
        c = load_config(cfg_file)
        assert c.listeners[0].auth == {"admin": "12345"}

    def test_auth_mixed_listeners(self, tmp_path):
        """One listener with auth, one without."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "listeners:\n"
            "  - listen: 1080\n"
            "    auth:\n"
            "      alice: pass\n"
            "  - listen: 1081\n"
        )
        c = load_config(cfg_file)
        assert c.listeners[0].auth == {"alice": "pass"}
        assert c.listeners[1].auth == {}
class TestBypassConfig:
    """Test bypass rules in listener config."""

    def test_bypass_from_yaml(self, tmp_path):
        """Bypass rules are loaded as a list of strings in file order."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "listeners:\n"
            "  - listen: 1080\n"
            "    bypass:\n"
            "      - 127.0.0.0/8\n"
            "      - 192.168.0.0/16\n"
            "      - localhost\n"
            "      - .local\n"
            "    chain:\n"
            "      - socks5://127.0.0.1:9050\n"
        )
        c = load_config(cfg_file)
        lc = c.listeners[0]
        assert lc.bypass == ["127.0.0.0/8", "192.168.0.0/16", "localhost", ".local"]

    def test_bypass_empty_default(self):
        """A default-constructed ListenerConfig has no bypass rules."""
        lc = ListenerConfig()
        assert lc.bypass == []

    def test_bypass_absent_from_yaml(self, tmp_path):
        """A listener without a `bypass` key gets an empty rule list."""
        cfg_file = tmp_path / "test.yaml"
        cfg_file.write_text(
            "listeners:\n"
            "  - listen: 1080\n"
            "    chain:\n"
            "      - socks5://127.0.0.1:9050\n"
        )
        c = load_config(cfg_file)
        assert c.listeners[0].bypass == []
class TestBypassMatch:
    """Test _bypass_match function."""

    def test_cidr_ipv4(self):
        """An IPv4 CIDR rule matches addresses inside the network only."""
        assert _bypass_match(["10.0.0.0/8"], "10.1.2.3") is True
        assert _bypass_match(["10.0.0.0/8"], "11.0.0.1") is False

    def test_cidr_ipv6(self):
        """An IPv6 CIDR rule matches addresses inside the network only."""
        assert _bypass_match(["fc00::/7"], "fd00::1") is True
        assert _bypass_match(["fc00::/7"], "2001:db8::1") is False

    def test_exact_ip(self):
        """A plain IP rule matches that exact address only."""
        assert _bypass_match(["127.0.0.1"], "127.0.0.1") is True
        assert _bypass_match(["127.0.0.1"], "127.0.0.2") is False

    def test_exact_hostname(self):
        """A plain hostname rule matches that exact name only."""
        assert _bypass_match(["localhost"], "localhost") is True
        assert _bypass_match(["localhost"], "otherhost") is False

    def test_domain_suffix(self):
        """A leading-dot rule matches the bare domain and its subdomains."""
        assert _bypass_match([".local"], "myhost.local") is True
        assert _bypass_match([".local"], "local") is True
        # "notlocal" merely ends with the same letters; it is not a subdomain.
        assert _bypass_match([".local"], "notlocal") is False
        assert _bypass_match([".example.com"], "api.example.com") is True
        assert _bypass_match([".example.com"], "example.com") is True

    def test_multiple_rules(self):
        """A target matches when any rule in the list matches."""
        rules = ["10.0.0.0/8", "192.168.0.0/16", ".local"]
        assert _bypass_match(rules, "10.1.2.3") is True
        assert _bypass_match(rules, "192.168.1.1") is True
        assert _bypass_match(rules, "host.local") is True
        assert _bypass_match(rules, "8.8.8.8") is False

    def test_empty_rules(self):
        """An empty rule list never matches."""
        assert _bypass_match([], "anything") is False

    def test_hostname_not_matched_by_cidr(self):
        """A CIDR rule does not match a hostname target."""
        assert _bypass_match(["10.0.0.0/8"], "example.com") is False