"""Tests for the control API module.""" import asyncio import json from unittest.mock import AsyncMock, MagicMock from s5p.api import ( _handle_config, _handle_metrics, _handle_pool, _handle_status, _handle_tor, _handle_tor_newnym, _json_response, _parse_request, _render_openmetrics, _route, ) from s5p.config import ChainHop, Config, ListenerConfig, PoolSourceConfig, ProxyPoolConfig from s5p.metrics import Metrics # -- request parsing --------------------------------------------------------- class TestParseRequest: """Test HTTP request line parsing.""" def test_get(self): assert _parse_request(b"GET /status HTTP/1.1\r\n") == ("GET", "/status") def test_post(self): assert _parse_request(b"POST /reload HTTP/1.1\r\n") == ("POST", "/reload") def test_strips_query_string(self): assert _parse_request(b"GET /pool?foo=bar HTTP/1.1\r\n") == ("GET", "/pool") def test_method_uppercased(self): assert _parse_request(b"get /metrics HTTP/1.1\r\n") == ("GET", "/metrics") def test_empty(self): assert _parse_request(b"") == ("", "") def test_garbage(self): assert _parse_request(b"\xff\xfe") == ("", "") def test_incomplete(self): assert _parse_request(b"GET\r\n") == ("", "") # -- JSON response ----------------------------------------------------------- class TestJsonResponse: """Test HTTP JSON response builder.""" def test_format(self): writer = MagicMock() written = bytearray() writer.write = lambda data: written.extend(data) _json_response(writer, 200, {"ok": True}) text = written.decode() assert "HTTP/1.1 200 OK\r\n" in text assert "Content-Type: application/json\r\n" in text assert "Connection: close\r\n" in text # body after double newline body = text.split("\r\n\r\n", 1)[1] assert json.loads(body) == {"ok": True} def test_404(self): writer = MagicMock() written = bytearray() writer.write = lambda data: written.extend(data) _json_response(writer, 404, {"error": "not found"}) text = written.decode() assert "HTTP/1.1 404 Not Found\r\n" in text # -- helpers ----------------------------------------------------------------- def _make_ctx( config: Config | None = None, pool: MagicMock | None = None, pools: dict | None = None, tor: MagicMock | None = None, ) -> dict: """Build a mock context dict.""" return { "config": config or Config(), "metrics": Metrics(), "pool": pool, "pools": pools, "hop_pool": None, "tor": tor, } # -- GET handlers ------------------------------------------------------------ class TestHandleStatus: """Test GET /status handler.""" def test_basic(self): ctx = _make_ctx() ctx["metrics"].connections = 10 ctx["metrics"].success = 8 status, body = _handle_status(ctx) assert status == 200 assert body["connections"] == 10 assert body["success"] == 8 assert "uptime" in body assert "rate" in body assert "latency" in body def test_with_pool(self): pool = MagicMock() pool.alive_count = 5 pool.count = 10 ctx = _make_ctx(pool=pool) _, body = _handle_status(ctx) assert body["pool"] == {"alive": 5, "total": 10} def test_with_listeners(self): config = Config( listeners=[ ListenerConfig( listen_host="0.0.0.0", listen_port=1080, chain=[ChainHop("socks5", "127.0.0.1", 9050)], ), ListenerConfig( listen_host="0.0.0.0", listen_port=1081, chain=[ChainHop("socks5", "127.0.0.1", 9050)], pool_seq=[["default"]], ), ], ) ctx = _make_ctx(config=config) # record some latency for the first listener ctx["metrics"].get_listener_latency("0.0.0.0:1080").record(0.2) _, body = _handle_status(ctx) assert len(body["listeners"]) == 2 assert body["listeners"][0]["chain"] == ["socks5://127.0.0.1:9050"] assert body["listeners"][0]["pool_hops"] == 0 assert body["listeners"][1]["pool_hops"] == 1 # per-listener latency present on each entry assert "latency" in body["listeners"][0] assert body["listeners"][0]["latency"]["count"] == 1 assert "latency" in body["listeners"][1] assert body["listeners"][1]["latency"] is None class TestHandleStatusAuth: """Test auth flag in /status listener entries.""" def test_auth_flag_present(self): config = Config( listeners=[ ListenerConfig( listen_host="0.0.0.0", listen_port=1080, auth={"alice": "s3cret", "bob": "hunter2"}, ), ], ) ctx = _make_ctx(config=config) _, body = _handle_status(ctx) assert body["listeners"][0]["auth"] is True def test_auth_flag_absent_when_empty(self): config = Config( listeners=[ ListenerConfig(listen_host="0.0.0.0", listen_port=1080), ], ) ctx = _make_ctx(config=config) _, body = _handle_status(ctx) assert "auth" not in body["listeners"][0] class TestHandleConfigAuth: """Test auth_users in /config listener entries.""" def test_auth_users_count(self): config = Config( listeners=[ ListenerConfig( listen_host="0.0.0.0", listen_port=1080, auth={"alice": "s3cret", "bob": "hunter2"}, ), ], ) ctx = _make_ctx(config=config) _, body = _handle_config(ctx) assert body["listeners"][0]["auth_users"] == 2 def test_auth_users_absent_when_empty(self): config = Config( listeners=[ ListenerConfig(listen_host="0.0.0.0", listen_port=1080), ], ) ctx = _make_ctx(config=config) _, body = _handle_config(ctx) assert "auth_users" not in body["listeners"][0] def test_passwords_not_exposed(self): config = Config( listeners=[ ListenerConfig( listen_host="0.0.0.0", listen_port=1080, auth={"alice": "s3cret"}, ), ], ) ctx = _make_ctx(config=config) _, body = _handle_config(ctx) listener = body["listeners"][0] # only count, never passwords assert "auth_users" in listener assert "auth" not in listener assert "s3cret" not in str(body) class TestHandleStatusRetries: """Test retries in /status listener entries.""" def test_retries_present_when_set(self): config = Config( listeners=[ ListenerConfig( listen_host="0.0.0.0", listen_port=1080, retries=5, ), ], ) ctx = _make_ctx(config=config) _, body = _handle_status(ctx) assert body["listeners"][0]["retries"] == 5 def test_retries_absent_when_zero(self): config = Config( listeners=[ ListenerConfig(listen_host="0.0.0.0", listen_port=1080), ], ) ctx = _make_ctx(config=config) _, body = _handle_status(ctx) assert "retries" not in body["listeners"][0] class TestHandleConfigRetries: """Test retries in /config listener entries.""" def test_retries_present_when_set(self): config = Config( listeners=[ ListenerConfig( listen_host="0.0.0.0", listen_port=1080, retries=7, ), ], ) ctx = _make_ctx(config=config) _, body = _handle_config(ctx) assert body["listeners"][0]["retries"] == 7 def test_retries_absent_when_zero(self): config = Config( listeners=[ ListenerConfig(listen_host="0.0.0.0", listen_port=1080), ], ) ctx = _make_ctx(config=config) _, body = _handle_config(ctx) assert "retries" not in body["listeners"][0] class TestHandleConfigAllowedProtos: """Test allowed_protos in /config pool entries.""" def test_allowed_protos_present(self): pp = ProxyPoolConfig( sources=[], allowed_protos=["socks5"], ) config = Config( proxy_pools={"socks_only": pp}, listeners=[ListenerConfig()], ) ctx = _make_ctx(config=config) _, body = _handle_config(ctx) assert body["proxy_pools"]["socks_only"]["allowed_protos"] == ["socks5"] def test_allowed_protos_absent_when_empty(self): pp = ProxyPoolConfig(sources=[]) config = Config( proxy_pools={"default": pp}, listeners=[ListenerConfig()], ) ctx = _make_ctx(config=config) _, body = _handle_config(ctx) assert "allowed_protos" not in body["proxy_pools"]["default"] class TestHandleStatusPools: """Test GET /status with multiple named pools.""" def test_multi_pool_summary(self): pool_a = MagicMock() pool_a.alive_count = 5 pool_a.count = 10 pool_a.name = "clean" pool_b = MagicMock() pool_b.alive_count = 3 pool_b.count = 8 pool_b.name = "mitm" ctx = _make_ctx(pools={"clean": pool_a, "mitm": pool_b}) _, body = _handle_status(ctx) assert body["pool"] == {"alive": 8, "total": 18} assert body["pools"]["clean"] == {"alive": 5, "total": 10} assert body["pools"]["mitm"] == {"alive": 3, "total": 8} class TestHandleStatusMultiPool: """Test pool_seq appears in /status only for multi-pool listeners.""" def test_single_pool_no_pool_seq(self): """Single-pool listener: no pool_seq in response.""" config = Config( listeners=[ ListenerConfig( listen_host="0.0.0.0", listen_port=1080, chain=[ChainHop("socks5", "127.0.0.1", 9050)], pool_seq=[["clean"], ["clean"]], pool_name="clean", ), ], ) ctx = _make_ctx(config=config) _, body = _handle_status(ctx) assert "pool_seq" not in body["listeners"][0] def test_multi_pool_has_pool_seq(self): """Multi-pool listener: pool_seq appears in response.""" config = Config( listeners=[ ListenerConfig( listen_host="0.0.0.0", listen_port=1080, chain=[ChainHop("socks5", "127.0.0.1", 9050)], pool_seq=[["clean"], ["mitm"]], pool_name="clean", ), ], ) ctx = _make_ctx(config=config) _, body = _handle_status(ctx) assert body["listeners"][0]["pool_seq"] == [["clean"], ["mitm"]] assert body["listeners"][0]["pool_hops"] == 2 def test_multi_pool_in_config(self): """Multi-pool listener: pool_seq appears in /config response.""" config = Config( listeners=[ ListenerConfig( listen_host="0.0.0.0", listen_port=1080, chain=[ChainHop("socks5", "127.0.0.1", 9050)], pool_seq=[["clean"], ["mitm"]], pool_name="clean", ), ], ) ctx = _make_ctx(config=config) _, body = _handle_config(ctx) assert body["listeners"][0]["pool_seq"] == [["clean"], ["mitm"]] class TestHandleMetrics: """Test GET /metrics handler (OpenMetrics format).""" def test_returns_openmetrics_string(self): ctx = _make_ctx() ctx["metrics"].connections = 42 ctx["metrics"].bytes_in = 1024 status, body = _handle_metrics(ctx) assert status == 200 assert isinstance(body, str) assert body.rstrip().endswith("# EOF") def test_counter_values(self): ctx = _make_ctx() ctx["metrics"].connections = 42 ctx["metrics"].bytes_in = 1024 _, body = _handle_metrics(ctx) assert "s5p_connections_total 42" in body assert "s5p_bytes_in_total 1024" in body class TestRenderOpenMetrics: """Test OpenMetrics text rendering.""" def test_eof_terminator(self): ctx = _make_ctx() text = _render_openmetrics(ctx) assert text.rstrip().endswith("# EOF") assert text.endswith("\n") def test_type_declarations(self): ctx = _make_ctx() text = _render_openmetrics(ctx) assert "# TYPE s5p_connections counter" in text assert "# TYPE s5p_active_connections gauge" in text assert "# TYPE s5p_uptime_seconds gauge" in text def test_help_lines(self): ctx = _make_ctx() text = _render_openmetrics(ctx) assert "# HELP s5p_connections Total connection attempts." in text assert "# HELP s5p_active_connections Currently open connections." in text def test_counter_values(self): ctx = _make_ctx() ctx["metrics"].connections = 100 ctx["metrics"].success = 95 ctx["metrics"].failed = 5 ctx["metrics"].retries = 10 ctx["metrics"].auth_failures = 2 ctx["metrics"].bytes_in = 4096 ctx["metrics"].bytes_out = 8192 text = _render_openmetrics(ctx) assert "s5p_connections_total 100" in text assert "s5p_connections_success_total 95" in text assert "s5p_connections_failed_total 5" in text assert "s5p_retries_total 10" in text assert "s5p_auth_failures_total 2" in text assert "s5p_bytes_in_total 4096" in text assert "s5p_bytes_out_total 8192" in text def test_gauge_values(self): ctx = _make_ctx() ctx["metrics"].active = 7 text = _render_openmetrics(ctx) assert "s5p_active_connections 7" in text assert "s5p_uptime_seconds " in text assert "s5p_connection_rate " in text def test_no_latency_when_empty(self): ctx = _make_ctx() text = _render_openmetrics(ctx) assert "s5p_chain_latency_seconds" not in text def test_latency_summary(self): ctx = _make_ctx() for i in range(1, 101): ctx["metrics"].latency.record(i / 1000) text = _render_openmetrics(ctx) assert "# TYPE s5p_chain_latency_seconds summary" in text assert 's5p_chain_latency_seconds{quantile="0.5"}' in text assert 's5p_chain_latency_seconds{quantile="0.95"}' in text assert 's5p_chain_latency_seconds{quantile="0.99"}' in text assert "s5p_chain_latency_seconds_count 100" in text assert "s5p_chain_latency_seconds_sum " in text def test_listener_latency_summary(self): ctx = _make_ctx() tracker = ctx["metrics"].get_listener_latency("0.0.0.0:1080") for i in range(1, 51): tracker.record(i / 1000) text = _render_openmetrics(ctx) assert "# TYPE s5p_listener_chain_latency_seconds summary" in text assert ( 's5p_listener_chain_latency_seconds{listener="0.0.0.0:1080",' 'quantile="0.5"}' ) in text assert ( 's5p_listener_chain_latency_seconds_count{listener="0.0.0.0:1080"} 50' ) in text def test_pool_gauges_multi(self): pool_a = MagicMock() pool_a.alive_count = 5 pool_a.count = 10 pool_a.name = "clean" pool_b = MagicMock() pool_b.alive_count = 3 pool_b.count = 8 pool_b.name = "mitm" ctx = _make_ctx(pools={"clean": pool_a, "mitm": pool_b}) text = _render_openmetrics(ctx) assert '# TYPE s5p_pool_proxies_alive gauge' in text assert 's5p_pool_proxies_alive{pool="clean"} 5' in text assert 's5p_pool_proxies_alive{pool="mitm"} 3' in text assert 's5p_pool_proxies_total{pool="clean"} 10' in text assert 's5p_pool_proxies_total{pool="mitm"} 8' in text def test_pool_gauges_single(self): pool = MagicMock() pool.alive_count = 12 pool.count = 20 ctx = _make_ctx(pool=pool) text = _render_openmetrics(ctx) assert "s5p_pool_proxies_alive 12" in text assert "s5p_pool_proxies_total 20" in text def test_no_pool_metrics_when_unconfigured(self): ctx = _make_ctx() text = _render_openmetrics(ctx) assert "s5p_pool_proxies" not in text class TestHandlePool: """Test GET /pool handler.""" def test_no_pool(self): ctx = _make_ctx() status, body = _handle_pool(ctx) assert status == 200 assert body == {"alive": 0, "total": 0, "proxies": {}} def test_with_entries(self): pool = MagicMock() pool.alive_count = 1 pool.count = 2 entry_alive = MagicMock( alive=True, fails=0, tests=5, last_ok=100.0, last_test=100.0, last_seen=100.0, ) entry_dead = MagicMock( alive=False, fails=3, tests=5, last_ok=0.0, last_test=100.0, last_seen=100.0, ) pool._proxies = { "socks5://1.2.3.4:1080": entry_alive, "socks5://5.6.7.8:1080": entry_dead, } ctx = _make_ctx(pool=pool) _, body = _handle_pool(ctx) assert len(body["proxies"]) == 2 assert body["proxies"]["socks5://1.2.3.4:1080"]["alive"] is True def test_alive_only(self): pool = MagicMock() pool.alive_count = 1 pool.count = 2 entry_alive = MagicMock( alive=True, fails=0, tests=5, last_ok=100.0, last_test=100.0, last_seen=100.0, ) entry_dead = MagicMock( alive=False, fails=3, tests=5, last_ok=0.0, last_test=100.0, last_seen=100.0, ) pool._proxies = { "socks5://1.2.3.4:1080": entry_alive, "socks5://5.6.7.8:1080": entry_dead, } ctx = _make_ctx(pool=pool) _, body = _handle_pool(ctx, alive_only=True) assert len(body["proxies"]) == 1 assert "socks5://1.2.3.4:1080" in body["proxies"] class TestHandlePoolMulti: """Test GET /pool with multiple named pools.""" def test_merges_entries(self): pool_a = MagicMock() pool_a.alive_count = 1 pool_a.count = 1 pool_a.name = "clean" entry_a = MagicMock( alive=True, fails=0, tests=5, last_ok=100.0, last_test=100.0, last_seen=100.0, ) pool_a._proxies = {"socks5://1.2.3.4:1080": entry_a} pool_b = MagicMock() pool_b.alive_count = 1 pool_b.count = 1 pool_b.name = "mitm" entry_b = MagicMock( alive=True, fails=0, tests=3, last_ok=90.0, last_test=90.0, last_seen=90.0, ) pool_b._proxies = {"socks5://5.6.7.8:1080": entry_b} ctx = _make_ctx(pools={"clean": pool_a, "mitm": pool_b}) _, body = _handle_pool(ctx) assert body["alive"] == 2 assert body["total"] == 2 assert len(body["proxies"]) == 2 assert body["proxies"]["socks5://1.2.3.4:1080"]["pool"] == "clean" assert body["proxies"]["socks5://5.6.7.8:1080"]["pool"] == "mitm" assert "pools" in body assert body["pools"]["clean"] == {"alive": 1, "total": 1} def test_single_pool_no_pool_field(self): """Single pool: no 'pool' field on entries, no 'pools' summary.""" pool = MagicMock() pool.alive_count = 1 pool.count = 1 pool.name = "default" entry = MagicMock( alive=True, fails=0, tests=5, last_ok=100.0, last_test=100.0, last_seen=100.0, ) pool._proxies = {"socks5://1.2.3.4:1080": entry} ctx = _make_ctx(pools={"default": pool}) _, body = _handle_pool(ctx) assert "pool" not in body["proxies"]["socks5://1.2.3.4:1080"] assert "pools" not in body class TestHandleConfig: """Test GET /config handler.""" def test_basic(self): config = Config( timeout=15.0, retries=5, log_level="debug", listeners=[ListenerConfig(listen_host="0.0.0.0", listen_port=1080)], ) ctx = _make_ctx(config=config) status, body = _handle_config(ctx) assert status == 200 assert body["timeout"] == 15.0 assert body["retries"] == 5 assert body["log_level"] == "debug" assert len(body["listeners"]) == 1 assert body["listeners"][0]["listen"] == "0.0.0.0:1080" def test_with_proxy_pool(self): pp = ProxyPoolConfig( sources=[PoolSourceConfig(url="http://api:8081/proxies")], refresh=600.0, test_interval=60.0, max_fails=5, ) config = Config( proxy_pool=pp, listeners=[ListenerConfig( chain=[ChainHop("socks5", "127.0.0.1", 9050)], pool_seq=["default"], )], ) ctx = _make_ctx(config=config) _, body = _handle_config(ctx) assert body["proxy_pool"]["refresh"] == 600.0 assert body["proxy_pool"]["sources"][0]["url"] == "http://api:8081/proxies" assert body["listeners"][0]["pool_hops"] == 1 def test_with_proxy_pools(self): pp_clean = ProxyPoolConfig( sources=[PoolSourceConfig(url="http://api:8081/proxies/all", mitm=False)], refresh=300.0, test_interval=120.0, max_fails=3, ) pp_mitm = ProxyPoolConfig( sources=[PoolSourceConfig(url="http://api:8081/proxies/all", mitm=True)], refresh=300.0, test_interval=120.0, max_fails=3, ) config = Config( proxy_pools={"clean": pp_clean, "mitm": pp_mitm}, listeners=[ListenerConfig( listen_host="0.0.0.0", listen_port=1080, chain=[ChainHop("socks5", "127.0.0.1", 9050)], pool_seq=[["clean"], ["clean"]], pool_name="clean", )], ) ctx = _make_ctx(config=config) _, body = _handle_config(ctx) assert "proxy_pools" in body assert body["proxy_pools"]["clean"]["sources"][0]["mitm"] is False assert body["proxy_pools"]["mitm"]["sources"][0]["mitm"] is True assert body["listeners"][0]["pool"] == "clean" def test_with_tor_nodes(self): config = Config( tor_nodes=[ ChainHop("socks5", "10.200.1.1", 9050), ChainHop("socks5", "10.200.1.13", 9050), ], listeners=[ListenerConfig()], ) ctx = _make_ctx(config=config) _, body = _handle_config(ctx) assert body["tor_nodes"] == [ "socks5://10.200.1.1:9050", "socks5://10.200.1.13:9050", ] def test_no_tor_nodes(self): config = Config(listeners=[ListenerConfig()]) ctx = _make_ctx(config=config) _, body = _handle_config(ctx) assert "tor_nodes" not in body class TestHandleStatusTorNodes: """Test tor_nodes in GET /status response.""" def test_tor_nodes_in_status(self): config = Config( tor_nodes=[ ChainHop("socks5", "10.200.1.1", 9050), ChainHop("socks5", "10.200.1.13", 9050), ], listeners=[ListenerConfig()], ) ctx = _make_ctx(config=config) _, body = _handle_status(ctx) assert body["tor_nodes"] == [ "socks5://10.200.1.1:9050", "socks5://10.200.1.13:9050", ] def test_no_tor_nodes_in_status(self): ctx = _make_ctx() _, body = _handle_status(ctx) assert "tor_nodes" not in body # -- routing ----------------------------------------------------------------- class TestRouting: """Test request routing and error responses.""" def test_get_status(self): ctx = _make_ctx() status, body = asyncio.run(_route("GET", "/status", ctx)) assert status == 200 def test_get_metrics(self): ctx = _make_ctx() status, _ = asyncio.run(_route("GET", "/metrics", ctx)) assert status == 200 def test_get_pool(self): ctx = _make_ctx() status, _ = asyncio.run(_route("GET", "/pool", ctx)) assert status == 200 def test_get_pool_alive(self): ctx = _make_ctx() status, _ = asyncio.run(_route("GET", "/pool/alive", ctx)) assert status == 200 def test_get_config(self): ctx = _make_ctx() status, _ = asyncio.run(_route("GET", "/config", ctx)) assert status == 200 def test_post_reload(self): ctx = _make_ctx() ctx["reload_fn"] = AsyncMock() status, body = asyncio.run(_route("POST", "/reload", ctx)) assert status == 200 assert body == {"ok": True} def test_post_pool_test(self): pool = MagicMock() pool._run_health_tests = AsyncMock() ctx = _make_ctx(pool=pool) status, body = asyncio.run(_route("POST", "/pool/test", ctx)) assert status == 200 assert body == {"ok": True} def test_post_pool_refresh(self): pool = MagicMock() pool._fetch_all_sources = AsyncMock() ctx = _make_ctx(pool=pool) status, body = asyncio.run(_route("POST", "/pool/refresh", ctx)) assert status == 200 assert body == {"ok": True} def test_404(self): ctx = _make_ctx() status, body = asyncio.run(_route("GET", "/nonexistent", ctx)) assert status == 404 assert "error" in body def test_405_wrong_method(self): ctx = _make_ctx() status, body = asyncio.run(_route("POST", "/status", ctx)) assert status == 405 assert "GET" in body["error"] def test_405_get_on_post_route(self): ctx = _make_ctx() status, body = asyncio.run(_route("GET", "/reload", ctx)) assert status == 405 assert "POST" in body["error"] def test_get_tor(self): ctx = _make_ctx() status, body = asyncio.run(_route("GET", "/tor", ctx)) assert status == 200 assert body == {"enabled": False} def test_post_tor_newnym(self): tor = MagicMock() tor.newnym = AsyncMock(return_value=True) ctx = _make_ctx(tor=tor) status, body = asyncio.run(_route("POST", "/tor/newnym", ctx)) assert status == 200 assert body == {"ok": True} # -- Tor endpoints ---------------------------------------------------------- class TestHandleTor: """Test GET /tor handler.""" def test_disabled(self): ctx = _make_ctx() status, body = _handle_tor(ctx) assert status == 200 assert body == {"enabled": False} def test_connected(self): tor = MagicMock() tor.connected = True tor.last_newnym = 0.0 tor.newnym_interval = 60.0 ctx = _make_ctx(tor=tor) status, body = _handle_tor(ctx) assert status == 200 assert body["enabled"] is True assert body["connected"] is True assert body["last_newnym"] is None assert body["newnym_interval"] == 60.0 def test_with_last_newnym(self): import time tor = MagicMock() tor.connected = True tor.last_newnym = time.monotonic() - 42.0 tor.newnym_interval = 0.0 ctx = _make_ctx(tor=tor) status, body = _handle_tor(ctx) assert status == 200 assert body["last_newnym"] is not None assert body["last_newnym"] >= 42.0 class TestHandleTorNewnym: """Test POST /tor/newnym handler.""" def test_success(self): tor = MagicMock() tor.newnym = AsyncMock(return_value=True) ctx = _make_ctx(tor=tor) status, body = asyncio.run(_handle_tor_newnym(ctx)) assert status == 200 assert body == {"ok": True} def test_rate_limited(self): tor = MagicMock() tor.newnym = AsyncMock(return_value=False) ctx = _make_ctx(tor=tor) status, body = asyncio.run(_handle_tor_newnym(ctx)) assert status == 200 assert body["ok"] is False assert "reason" in body def test_not_configured(self): ctx = _make_ctx() status, body = asyncio.run(_handle_tor_newnym(ctx)) assert status == 400 assert "error" in body