feat: route plugin HTTP traffic through SOCKS5 proxy
Add PySocks dependency and shared src/derp/http.py module providing proxy-aware urlopen() and build_opener() that route through socks5h://127.0.0.1:1080. Subclassed SocksiPyHandler passes SSL context through to HTTPS connections. Swapped 14 external-facing plugins to use the proxied helpers. Local-only traffic (SearXNG, raw DNS/TLS sockets) stays direct. Updated test mocks in test_twitch and test_alert accordingly.
This commit is contained in:
@@ -345,7 +345,7 @@ class TestQueryStream:
|
||||
"""Test _query_stream response parsing with mocked HTTP."""
|
||||
|
||||
def test_live_response(self):
|
||||
with patch("urllib.request.urlopen", return_value=_FakeGqlResp(GQL_LIVE)):
|
||||
with patch.object(_mod, "_urlopen", return_value=_FakeGqlResp(GQL_LIVE)):
|
||||
result = _mod._query_stream("xqc")
|
||||
assert result["exists"] is True
|
||||
assert result["live"] is True
|
||||
@@ -358,7 +358,7 @@ class TestQueryStream:
|
||||
assert result["error"] == ""
|
||||
|
||||
def test_offline_response(self):
|
||||
with patch("urllib.request.urlopen", return_value=_FakeGqlResp(GQL_OFFLINE)):
|
||||
with patch.object(_mod, "_urlopen", return_value=_FakeGqlResp(GQL_OFFLINE)):
|
||||
result = _mod._query_stream("xqc")
|
||||
assert result["exists"] is True
|
||||
assert result["live"] is False
|
||||
@@ -366,20 +366,20 @@ class TestQueryStream:
|
||||
assert result["stream_id"] == ""
|
||||
|
||||
def test_not_found_response(self):
|
||||
with patch("urllib.request.urlopen", return_value=_FakeGqlResp(GQL_NOT_FOUND)):
|
||||
with patch.object(_mod, "_urlopen", return_value=_FakeGqlResp(GQL_NOT_FOUND)):
|
||||
result = _mod._query_stream("nobody")
|
||||
assert result["exists"] is False
|
||||
assert result["live"] is False
|
||||
|
||||
def test_no_game_response(self):
|
||||
with patch("urllib.request.urlopen", return_value=_FakeGqlResp(GQL_LIVE_NO_GAME)):
|
||||
with patch.object(_mod, "_urlopen", return_value=_FakeGqlResp(GQL_LIVE_NO_GAME)):
|
||||
result = _mod._query_stream("streamer")
|
||||
assert result["exists"] is True
|
||||
assert result["live"] is True
|
||||
assert result["game"] == ""
|
||||
|
||||
def test_network_error(self):
|
||||
with patch("urllib.request.urlopen", side_effect=Exception("timeout")):
|
||||
with patch.object(_mod, "_urlopen", side_effect=Exception("timeout")):
|
||||
result = _mod._query_stream("xqc")
|
||||
assert result["error"] == "timeout"
|
||||
assert result["exists"] is False
|
||||
|
||||
Reference in New Issue
Block a user