Files
s5p/tests/test_api.py
user 3593481b30 feat: listener retry override, pool protocol filter, conn pool docs
- Per-listener `retries` overrides global default (0 = inherit)
- Pool-level `allowed_protos` filters proxies during merge
- Connection pooling documented in CHEATSHEET.md
- Both features exposed in /config and /status API responses
- 12 new tests (config parsing, API exposure, merge filtering)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 20:35:14 +01:00

873 lines
28 KiB
Python

"""Tests for the control API module."""
import asyncio
import json
from unittest.mock import AsyncMock, MagicMock
from s5p.api import (
_handle_config,
_handle_metrics,
_handle_pool,
_handle_status,
_handle_tor,
_handle_tor_newnym,
_json_response,
_parse_request,
_render_openmetrics,
_route,
)
from s5p.config import ChainHop, Config, ListenerConfig, PoolSourceConfig, ProxyPoolConfig
from s5p.metrics import Metrics
# -- request parsing ---------------------------------------------------------
class TestParseRequest:
"""Test HTTP request line parsing."""
def test_get(self):
assert _parse_request(b"GET /status HTTP/1.1\r\n") == ("GET", "/status")
def test_post(self):
assert _parse_request(b"POST /reload HTTP/1.1\r\n") == ("POST", "/reload")
def test_strips_query_string(self):
assert _parse_request(b"GET /pool?foo=bar HTTP/1.1\r\n") == ("GET", "/pool")
def test_method_uppercased(self):
assert _parse_request(b"get /metrics HTTP/1.1\r\n") == ("GET", "/metrics")
def test_empty(self):
assert _parse_request(b"") == ("", "")
def test_garbage(self):
assert _parse_request(b"\xff\xfe") == ("", "")
def test_incomplete(self):
assert _parse_request(b"GET\r\n") == ("", "")
# -- JSON response -----------------------------------------------------------
class TestJsonResponse:
"""Test HTTP JSON response builder."""
def test_format(self):
writer = MagicMock()
written = bytearray()
writer.write = lambda data: written.extend(data)
_json_response(writer, 200, {"ok": True})
text = written.decode()
assert "HTTP/1.1 200 OK\r\n" in text
assert "Content-Type: application/json\r\n" in text
assert "Connection: close\r\n" in text
# body after double newline
body = text.split("\r\n\r\n", 1)[1]
assert json.loads(body) == {"ok": True}
def test_404(self):
writer = MagicMock()
written = bytearray()
writer.write = lambda data: written.extend(data)
_json_response(writer, 404, {"error": "not found"})
text = written.decode()
assert "HTTP/1.1 404 Not Found\r\n" in text
# -- helpers -----------------------------------------------------------------
def _make_ctx(
config: Config | None = None,
pool: MagicMock | None = None,
pools: dict | None = None,
tor: MagicMock | None = None,
) -> dict:
"""Build a mock context dict."""
return {
"config": config or Config(),
"metrics": Metrics(),
"pool": pool,
"pools": pools,
"hop_pool": None,
"tor": tor,
}
# -- GET handlers ------------------------------------------------------------
class TestHandleStatus:
"""Test GET /status handler."""
def test_basic(self):
ctx = _make_ctx()
ctx["metrics"].connections = 10
ctx["metrics"].success = 8
status, body = _handle_status(ctx)
assert status == 200
assert body["connections"] == 10
assert body["success"] == 8
assert "uptime" in body
assert "rate" in body
assert "latency" in body
def test_with_pool(self):
pool = MagicMock()
pool.alive_count = 5
pool.count = 10
ctx = _make_ctx(pool=pool)
_, body = _handle_status(ctx)
assert body["pool"] == {"alive": 5, "total": 10}
def test_with_listeners(self):
config = Config(
listeners=[
ListenerConfig(
listen_host="0.0.0.0", listen_port=1080,
chain=[ChainHop("socks5", "127.0.0.1", 9050)],
),
ListenerConfig(
listen_host="0.0.0.0", listen_port=1081,
chain=[ChainHop("socks5", "127.0.0.1", 9050)],
pool_seq=[["default"]],
),
],
)
ctx = _make_ctx(config=config)
# record some latency for the first listener
ctx["metrics"].get_listener_latency("0.0.0.0:1080").record(0.2)
_, body = _handle_status(ctx)
assert len(body["listeners"]) == 2
assert body["listeners"][0]["chain"] == ["socks5://127.0.0.1:9050"]
assert body["listeners"][0]["pool_hops"] == 0
assert body["listeners"][1]["pool_hops"] == 1
# per-listener latency present on each entry
assert "latency" in body["listeners"][0]
assert body["listeners"][0]["latency"]["count"] == 1
assert "latency" in body["listeners"][1]
assert body["listeners"][1]["latency"] is None
class TestHandleStatusAuth:
"""Test auth flag in /status listener entries."""
def test_auth_flag_present(self):
config = Config(
listeners=[
ListenerConfig(
listen_host="0.0.0.0", listen_port=1080,
auth={"alice": "s3cret", "bob": "hunter2"},
),
],
)
ctx = _make_ctx(config=config)
_, body = _handle_status(ctx)
assert body["listeners"][0]["auth"] is True
def test_auth_flag_absent_when_empty(self):
config = Config(
listeners=[
ListenerConfig(listen_host="0.0.0.0", listen_port=1080),
],
)
ctx = _make_ctx(config=config)
_, body = _handle_status(ctx)
assert "auth" not in body["listeners"][0]
class TestHandleConfigAuth:
"""Test auth_users in /config listener entries."""
def test_auth_users_count(self):
config = Config(
listeners=[
ListenerConfig(
listen_host="0.0.0.0", listen_port=1080,
auth={"alice": "s3cret", "bob": "hunter2"},
),
],
)
ctx = _make_ctx(config=config)
_, body = _handle_config(ctx)
assert body["listeners"][0]["auth_users"] == 2
def test_auth_users_absent_when_empty(self):
config = Config(
listeners=[
ListenerConfig(listen_host="0.0.0.0", listen_port=1080),
],
)
ctx = _make_ctx(config=config)
_, body = _handle_config(ctx)
assert "auth_users" not in body["listeners"][0]
def test_passwords_not_exposed(self):
config = Config(
listeners=[
ListenerConfig(
listen_host="0.0.0.0", listen_port=1080,
auth={"alice": "s3cret"},
),
],
)
ctx = _make_ctx(config=config)
_, body = _handle_config(ctx)
listener = body["listeners"][0]
# only count, never passwords
assert "auth_users" in listener
assert "auth" not in listener
assert "s3cret" not in str(body)
class TestHandleStatusRetries:
"""Test retries in /status listener entries."""
def test_retries_present_when_set(self):
config = Config(
listeners=[
ListenerConfig(
listen_host="0.0.0.0", listen_port=1080,
retries=5,
),
],
)
ctx = _make_ctx(config=config)
_, body = _handle_status(ctx)
assert body["listeners"][0]["retries"] == 5
def test_retries_absent_when_zero(self):
config = Config(
listeners=[
ListenerConfig(listen_host="0.0.0.0", listen_port=1080),
],
)
ctx = _make_ctx(config=config)
_, body = _handle_status(ctx)
assert "retries" not in body["listeners"][0]
class TestHandleConfigRetries:
"""Test retries in /config listener entries."""
def test_retries_present_when_set(self):
config = Config(
listeners=[
ListenerConfig(
listen_host="0.0.0.0", listen_port=1080,
retries=7,
),
],
)
ctx = _make_ctx(config=config)
_, body = _handle_config(ctx)
assert body["listeners"][0]["retries"] == 7
def test_retries_absent_when_zero(self):
config = Config(
listeners=[
ListenerConfig(listen_host="0.0.0.0", listen_port=1080),
],
)
ctx = _make_ctx(config=config)
_, body = _handle_config(ctx)
assert "retries" not in body["listeners"][0]
class TestHandleConfigAllowedProtos:
"""Test allowed_protos in /config pool entries."""
def test_allowed_protos_present(self):
pp = ProxyPoolConfig(
sources=[],
allowed_protos=["socks5"],
)
config = Config(
proxy_pools={"socks_only": pp},
listeners=[ListenerConfig()],
)
ctx = _make_ctx(config=config)
_, body = _handle_config(ctx)
assert body["proxy_pools"]["socks_only"]["allowed_protos"] == ["socks5"]
def test_allowed_protos_absent_when_empty(self):
pp = ProxyPoolConfig(sources=[])
config = Config(
proxy_pools={"default": pp},
listeners=[ListenerConfig()],
)
ctx = _make_ctx(config=config)
_, body = _handle_config(ctx)
assert "allowed_protos" not in body["proxy_pools"]["default"]
class TestHandleStatusPools:
"""Test GET /status with multiple named pools."""
def test_multi_pool_summary(self):
pool_a = MagicMock()
pool_a.alive_count = 5
pool_a.count = 10
pool_a.name = "clean"
pool_b = MagicMock()
pool_b.alive_count = 3
pool_b.count = 8
pool_b.name = "mitm"
ctx = _make_ctx(pools={"clean": pool_a, "mitm": pool_b})
_, body = _handle_status(ctx)
assert body["pool"] == {"alive": 8, "total": 18}
assert body["pools"]["clean"] == {"alive": 5, "total": 10}
assert body["pools"]["mitm"] == {"alive": 3, "total": 8}
class TestHandleStatusMultiPool:
"""Test pool_seq appears in /status only for multi-pool listeners."""
def test_single_pool_no_pool_seq(self):
"""Single-pool listener: no pool_seq in response."""
config = Config(
listeners=[
ListenerConfig(
listen_host="0.0.0.0", listen_port=1080,
chain=[ChainHop("socks5", "127.0.0.1", 9050)],
pool_seq=[["clean"], ["clean"]], pool_name="clean",
),
],
)
ctx = _make_ctx(config=config)
_, body = _handle_status(ctx)
assert "pool_seq" not in body["listeners"][0]
def test_multi_pool_has_pool_seq(self):
"""Multi-pool listener: pool_seq appears in response."""
config = Config(
listeners=[
ListenerConfig(
listen_host="0.0.0.0", listen_port=1080,
chain=[ChainHop("socks5", "127.0.0.1", 9050)],
pool_seq=[["clean"], ["mitm"]], pool_name="clean",
),
],
)
ctx = _make_ctx(config=config)
_, body = _handle_status(ctx)
assert body["listeners"][0]["pool_seq"] == [["clean"], ["mitm"]]
assert body["listeners"][0]["pool_hops"] == 2
def test_multi_pool_in_config(self):
"""Multi-pool listener: pool_seq appears in /config response."""
config = Config(
listeners=[
ListenerConfig(
listen_host="0.0.0.0", listen_port=1080,
chain=[ChainHop("socks5", "127.0.0.1", 9050)],
pool_seq=[["clean"], ["mitm"]], pool_name="clean",
),
],
)
ctx = _make_ctx(config=config)
_, body = _handle_config(ctx)
assert body["listeners"][0]["pool_seq"] == [["clean"], ["mitm"]]
class TestHandleMetrics:
"""Test GET /metrics handler (OpenMetrics format)."""
def test_returns_openmetrics_string(self):
ctx = _make_ctx()
ctx["metrics"].connections = 42
ctx["metrics"].bytes_in = 1024
status, body = _handle_metrics(ctx)
assert status == 200
assert isinstance(body, str)
assert body.rstrip().endswith("# EOF")
def test_counter_values(self):
ctx = _make_ctx()
ctx["metrics"].connections = 42
ctx["metrics"].bytes_in = 1024
_, body = _handle_metrics(ctx)
assert "s5p_connections_total 42" in body
assert "s5p_bytes_in_total 1024" in body
class TestRenderOpenMetrics:
"""Test OpenMetrics text rendering."""
def test_eof_terminator(self):
ctx = _make_ctx()
text = _render_openmetrics(ctx)
assert text.rstrip().endswith("# EOF")
assert text.endswith("\n")
def test_type_declarations(self):
ctx = _make_ctx()
text = _render_openmetrics(ctx)
assert "# TYPE s5p_connections counter" in text
assert "# TYPE s5p_active_connections gauge" in text
assert "# TYPE s5p_uptime_seconds gauge" in text
def test_help_lines(self):
ctx = _make_ctx()
text = _render_openmetrics(ctx)
assert "# HELP s5p_connections Total connection attempts." in text
assert "# HELP s5p_active_connections Currently open connections." in text
def test_counter_values(self):
ctx = _make_ctx()
ctx["metrics"].connections = 100
ctx["metrics"].success = 95
ctx["metrics"].failed = 5
ctx["metrics"].retries = 10
ctx["metrics"].auth_failures = 2
ctx["metrics"].bytes_in = 4096
ctx["metrics"].bytes_out = 8192
text = _render_openmetrics(ctx)
assert "s5p_connections_total 100" in text
assert "s5p_connections_success_total 95" in text
assert "s5p_connections_failed_total 5" in text
assert "s5p_retries_total 10" in text
assert "s5p_auth_failures_total 2" in text
assert "s5p_bytes_in_total 4096" in text
assert "s5p_bytes_out_total 8192" in text
def test_gauge_values(self):
ctx = _make_ctx()
ctx["metrics"].active = 7
text = _render_openmetrics(ctx)
assert "s5p_active_connections 7" in text
assert "s5p_uptime_seconds " in text
assert "s5p_connection_rate " in text
def test_no_latency_when_empty(self):
ctx = _make_ctx()
text = _render_openmetrics(ctx)
assert "s5p_chain_latency_seconds" not in text
def test_latency_summary(self):
ctx = _make_ctx()
for i in range(1, 101):
ctx["metrics"].latency.record(i / 1000)
text = _render_openmetrics(ctx)
assert "# TYPE s5p_chain_latency_seconds summary" in text
assert 's5p_chain_latency_seconds{quantile="0.5"}' in text
assert 's5p_chain_latency_seconds{quantile="0.95"}' in text
assert 's5p_chain_latency_seconds{quantile="0.99"}' in text
assert "s5p_chain_latency_seconds_count 100" in text
assert "s5p_chain_latency_seconds_sum " in text
def test_listener_latency_summary(self):
ctx = _make_ctx()
tracker = ctx["metrics"].get_listener_latency("0.0.0.0:1080")
for i in range(1, 51):
tracker.record(i / 1000)
text = _render_openmetrics(ctx)
assert "# TYPE s5p_listener_chain_latency_seconds summary" in text
assert (
's5p_listener_chain_latency_seconds{listener="0.0.0.0:1080",'
'quantile="0.5"}'
) in text
assert (
's5p_listener_chain_latency_seconds_count{listener="0.0.0.0:1080"} 50'
) in text
def test_pool_gauges_multi(self):
pool_a = MagicMock()
pool_a.alive_count = 5
pool_a.count = 10
pool_a.name = "clean"
pool_b = MagicMock()
pool_b.alive_count = 3
pool_b.count = 8
pool_b.name = "mitm"
ctx = _make_ctx(pools={"clean": pool_a, "mitm": pool_b})
text = _render_openmetrics(ctx)
assert '# TYPE s5p_pool_proxies_alive gauge' in text
assert 's5p_pool_proxies_alive{pool="clean"} 5' in text
assert 's5p_pool_proxies_alive{pool="mitm"} 3' in text
assert 's5p_pool_proxies_total{pool="clean"} 10' in text
assert 's5p_pool_proxies_total{pool="mitm"} 8' in text
def test_pool_gauges_single(self):
pool = MagicMock()
pool.alive_count = 12
pool.count = 20
ctx = _make_ctx(pool=pool)
text = _render_openmetrics(ctx)
assert "s5p_pool_proxies_alive 12" in text
assert "s5p_pool_proxies_total 20" in text
def test_no_pool_metrics_when_unconfigured(self):
ctx = _make_ctx()
text = _render_openmetrics(ctx)
assert "s5p_pool_proxies" not in text
class TestHandlePool:
"""Test GET /pool handler."""
def test_no_pool(self):
ctx = _make_ctx()
status, body = _handle_pool(ctx)
assert status == 200
assert body == {"alive": 0, "total": 0, "proxies": {}}
def test_with_entries(self):
pool = MagicMock()
pool.alive_count = 1
pool.count = 2
entry_alive = MagicMock(
alive=True, fails=0, tests=5,
last_ok=100.0, last_test=100.0, last_seen=100.0,
)
entry_dead = MagicMock(
alive=False, fails=3, tests=5,
last_ok=0.0, last_test=100.0, last_seen=100.0,
)
pool._proxies = {
"socks5://1.2.3.4:1080": entry_alive,
"socks5://5.6.7.8:1080": entry_dead,
}
ctx = _make_ctx(pool=pool)
_, body = _handle_pool(ctx)
assert len(body["proxies"]) == 2
assert body["proxies"]["socks5://1.2.3.4:1080"]["alive"] is True
def test_alive_only(self):
pool = MagicMock()
pool.alive_count = 1
pool.count = 2
entry_alive = MagicMock(
alive=True, fails=0, tests=5,
last_ok=100.0, last_test=100.0, last_seen=100.0,
)
entry_dead = MagicMock(
alive=False, fails=3, tests=5,
last_ok=0.0, last_test=100.0, last_seen=100.0,
)
pool._proxies = {
"socks5://1.2.3.4:1080": entry_alive,
"socks5://5.6.7.8:1080": entry_dead,
}
ctx = _make_ctx(pool=pool)
_, body = _handle_pool(ctx, alive_only=True)
assert len(body["proxies"]) == 1
assert "socks5://1.2.3.4:1080" in body["proxies"]
class TestHandlePoolMulti:
"""Test GET /pool with multiple named pools."""
def test_merges_entries(self):
pool_a = MagicMock()
pool_a.alive_count = 1
pool_a.count = 1
pool_a.name = "clean"
entry_a = MagicMock(
alive=True, fails=0, tests=5,
last_ok=100.0, last_test=100.0, last_seen=100.0,
)
pool_a._proxies = {"socks5://1.2.3.4:1080": entry_a}
pool_b = MagicMock()
pool_b.alive_count = 1
pool_b.count = 1
pool_b.name = "mitm"
entry_b = MagicMock(
alive=True, fails=0, tests=3,
last_ok=90.0, last_test=90.0, last_seen=90.0,
)
pool_b._proxies = {"socks5://5.6.7.8:1080": entry_b}
ctx = _make_ctx(pools={"clean": pool_a, "mitm": pool_b})
_, body = _handle_pool(ctx)
assert body["alive"] == 2
assert body["total"] == 2
assert len(body["proxies"]) == 2
assert body["proxies"]["socks5://1.2.3.4:1080"]["pool"] == "clean"
assert body["proxies"]["socks5://5.6.7.8:1080"]["pool"] == "mitm"
assert "pools" in body
assert body["pools"]["clean"] == {"alive": 1, "total": 1}
def test_single_pool_no_pool_field(self):
"""Single pool: no 'pool' field on entries, no 'pools' summary."""
pool = MagicMock()
pool.alive_count = 1
pool.count = 1
pool.name = "default"
entry = MagicMock(
alive=True, fails=0, tests=5,
last_ok=100.0, last_test=100.0, last_seen=100.0,
)
pool._proxies = {"socks5://1.2.3.4:1080": entry}
ctx = _make_ctx(pools={"default": pool})
_, body = _handle_pool(ctx)
assert "pool" not in body["proxies"]["socks5://1.2.3.4:1080"]
assert "pools" not in body
class TestHandleConfig:
"""Test GET /config handler."""
def test_basic(self):
config = Config(
timeout=15.0, retries=5, log_level="debug",
listeners=[ListenerConfig(listen_host="0.0.0.0", listen_port=1080)],
)
ctx = _make_ctx(config=config)
status, body = _handle_config(ctx)
assert status == 200
assert body["timeout"] == 15.0
assert body["retries"] == 5
assert body["log_level"] == "debug"
assert len(body["listeners"]) == 1
assert body["listeners"][0]["listen"] == "0.0.0.0:1080"
def test_with_proxy_pool(self):
pp = ProxyPoolConfig(
sources=[PoolSourceConfig(url="http://api:8081/proxies")],
refresh=600.0,
test_interval=60.0,
max_fails=5,
)
config = Config(
proxy_pool=pp,
listeners=[ListenerConfig(
chain=[ChainHop("socks5", "127.0.0.1", 9050)],
pool_seq=["default"],
)],
)
ctx = _make_ctx(config=config)
_, body = _handle_config(ctx)
assert body["proxy_pool"]["refresh"] == 600.0
assert body["proxy_pool"]["sources"][0]["url"] == "http://api:8081/proxies"
assert body["listeners"][0]["pool_hops"] == 1
def test_with_proxy_pools(self):
pp_clean = ProxyPoolConfig(
sources=[PoolSourceConfig(url="http://api:8081/proxies/all", mitm=False)],
refresh=300.0,
test_interval=120.0,
max_fails=3,
)
pp_mitm = ProxyPoolConfig(
sources=[PoolSourceConfig(url="http://api:8081/proxies/all", mitm=True)],
refresh=300.0,
test_interval=120.0,
max_fails=3,
)
config = Config(
proxy_pools={"clean": pp_clean, "mitm": pp_mitm},
listeners=[ListenerConfig(
listen_host="0.0.0.0", listen_port=1080,
chain=[ChainHop("socks5", "127.0.0.1", 9050)],
pool_seq=[["clean"], ["clean"]], pool_name="clean",
)],
)
ctx = _make_ctx(config=config)
_, body = _handle_config(ctx)
assert "proxy_pools" in body
assert body["proxy_pools"]["clean"]["sources"][0]["mitm"] is False
assert body["proxy_pools"]["mitm"]["sources"][0]["mitm"] is True
assert body["listeners"][0]["pool"] == "clean"
def test_with_tor_nodes(self):
config = Config(
tor_nodes=[
ChainHop("socks5", "10.200.1.1", 9050),
ChainHop("socks5", "10.200.1.13", 9050),
],
listeners=[ListenerConfig()],
)
ctx = _make_ctx(config=config)
_, body = _handle_config(ctx)
assert body["tor_nodes"] == [
"socks5://10.200.1.1:9050",
"socks5://10.200.1.13:9050",
]
def test_no_tor_nodes(self):
config = Config(listeners=[ListenerConfig()])
ctx = _make_ctx(config=config)
_, body = _handle_config(ctx)
assert "tor_nodes" not in body
class TestHandleStatusTorNodes:
"""Test tor_nodes in GET /status response."""
def test_tor_nodes_in_status(self):
config = Config(
tor_nodes=[
ChainHop("socks5", "10.200.1.1", 9050),
ChainHop("socks5", "10.200.1.13", 9050),
],
listeners=[ListenerConfig()],
)
ctx = _make_ctx(config=config)
_, body = _handle_status(ctx)
assert body["tor_nodes"] == [
"socks5://10.200.1.1:9050",
"socks5://10.200.1.13:9050",
]
def test_no_tor_nodes_in_status(self):
ctx = _make_ctx()
_, body = _handle_status(ctx)
assert "tor_nodes" not in body
# -- routing -----------------------------------------------------------------
class TestRouting:
"""Test request routing and error responses."""
def test_get_status(self):
ctx = _make_ctx()
status, body = asyncio.run(_route("GET", "/status", ctx))
assert status == 200
def test_get_metrics(self):
ctx = _make_ctx()
status, _ = asyncio.run(_route("GET", "/metrics", ctx))
assert status == 200
def test_get_pool(self):
ctx = _make_ctx()
status, _ = asyncio.run(_route("GET", "/pool", ctx))
assert status == 200
def test_get_pool_alive(self):
ctx = _make_ctx()
status, _ = asyncio.run(_route("GET", "/pool/alive", ctx))
assert status == 200
def test_get_config(self):
ctx = _make_ctx()
status, _ = asyncio.run(_route("GET", "/config", ctx))
assert status == 200
def test_post_reload(self):
ctx = _make_ctx()
ctx["reload_fn"] = AsyncMock()
status, body = asyncio.run(_route("POST", "/reload", ctx))
assert status == 200
assert body == {"ok": True}
def test_post_pool_test(self):
pool = MagicMock()
pool._run_health_tests = AsyncMock()
ctx = _make_ctx(pool=pool)
status, body = asyncio.run(_route("POST", "/pool/test", ctx))
assert status == 200
assert body == {"ok": True}
def test_post_pool_refresh(self):
pool = MagicMock()
pool._fetch_all_sources = AsyncMock()
ctx = _make_ctx(pool=pool)
status, body = asyncio.run(_route("POST", "/pool/refresh", ctx))
assert status == 200
assert body == {"ok": True}
def test_404(self):
ctx = _make_ctx()
status, body = asyncio.run(_route("GET", "/nonexistent", ctx))
assert status == 404
assert "error" in body
def test_405_wrong_method(self):
ctx = _make_ctx()
status, body = asyncio.run(_route("POST", "/status", ctx))
assert status == 405
assert "GET" in body["error"]
def test_405_get_on_post_route(self):
ctx = _make_ctx()
status, body = asyncio.run(_route("GET", "/reload", ctx))
assert status == 405
assert "POST" in body["error"]
def test_get_tor(self):
ctx = _make_ctx()
status, body = asyncio.run(_route("GET", "/tor", ctx))
assert status == 200
assert body == {"enabled": False}
def test_post_tor_newnym(self):
tor = MagicMock()
tor.newnym = AsyncMock(return_value=True)
ctx = _make_ctx(tor=tor)
status, body = asyncio.run(_route("POST", "/tor/newnym", ctx))
assert status == 200
assert body == {"ok": True}
# -- Tor endpoints ----------------------------------------------------------
class TestHandleTor:
"""Test GET /tor handler."""
def test_disabled(self):
ctx = _make_ctx()
status, body = _handle_tor(ctx)
assert status == 200
assert body == {"enabled": False}
def test_connected(self):
tor = MagicMock()
tor.connected = True
tor.last_newnym = 0.0
tor.newnym_interval = 60.0
ctx = _make_ctx(tor=tor)
status, body = _handle_tor(ctx)
assert status == 200
assert body["enabled"] is True
assert body["connected"] is True
assert body["last_newnym"] is None
assert body["newnym_interval"] == 60.0
def test_with_last_newnym(self):
import time
tor = MagicMock()
tor.connected = True
tor.last_newnym = time.monotonic() - 42.0
tor.newnym_interval = 0.0
ctx = _make_ctx(tor=tor)
status, body = _handle_tor(ctx)
assert status == 200
assert body["last_newnym"] is not None
assert body["last_newnym"] >= 42.0
class TestHandleTorNewnym:
"""Test POST /tor/newnym handler."""
def test_success(self):
tor = MagicMock()
tor.newnym = AsyncMock(return_value=True)
ctx = _make_ctx(tor=tor)
status, body = asyncio.run(_handle_tor_newnym(ctx))
assert status == 200
assert body == {"ok": True}
def test_rate_limited(self):
tor = MagicMock()
tor.newnym = AsyncMock(return_value=False)
ctx = _make_ctx(tor=tor)
status, body = asyncio.run(_handle_tor_newnym(ctx))
assert status == 200
assert body["ok"] is False
assert "reason" in body
def test_not_configured(self):
ctx = _make_ctx()
status, body = asyncio.run(_handle_tor_newnym(ctx))
assert status == 400
assert "error" in body