feat: route raw TCP traffic through SOCKS5 proxy
Add create_connection and open_connection helpers to the shared proxy module, covering portcheck, whois, tlscheck, and crtsh live-cert check. UDP-based plugins (dns, blacklist, subdomain) stay direct. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,11 +1,19 @@
|
||||
"""Tests for the SOCKS5 proxy HTTP module."""
|
||||
"""Tests for the SOCKS5 proxy HTTP/TCP module."""
|
||||
|
||||
import ssl
|
||||
import urllib.request
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import socks
|
||||
from socks import SOCKS5
|
||||
|
||||
from derp.http import _PROXY_ADDR, _PROXY_PORT, _ProxyHandler, build_opener
|
||||
from derp.http import (
|
||||
_PROXY_ADDR,
|
||||
_PROXY_PORT,
|
||||
_ProxyHandler,
|
||||
build_opener,
|
||||
create_connection,
|
||||
)
|
||||
|
||||
|
||||
class TestProxyHandler:
|
||||
@@ -57,3 +65,42 @@ class TestBuildOpener:
|
||||
opener = build_opener(context=ctx)
|
||||
proxy = [h for h in opener.handlers if isinstance(h, _ProxyHandler)][0]
|
||||
assert proxy._ssl_context is ctx
|
||||
|
||||
|
||||
class TestCreateConnection:
|
||||
@patch("derp.http.socks.socksocket")
|
||||
def test_sets_socks5_proxy(self, mock_cls):
|
||||
sock = MagicMock()
|
||||
mock_cls.return_value = sock
|
||||
create_connection(("example.com", 443), timeout=5)
|
||||
sock.set_proxy.assert_called_once_with(
|
||||
SOCKS5, _PROXY_ADDR, _PROXY_PORT, rdns=True,
|
||||
)
|
||||
|
||||
@patch("derp.http.socks.socksocket")
|
||||
def test_connects_to_target(self, mock_cls):
|
||||
sock = MagicMock()
|
||||
mock_cls.return_value = sock
|
||||
create_connection(("example.com", 443))
|
||||
sock.connect.assert_called_once_with(("example.com", 443))
|
||||
|
||||
@patch("derp.http.socks.socksocket")
|
||||
def test_sets_timeout(self, mock_cls):
|
||||
sock = MagicMock()
|
||||
mock_cls.return_value = sock
|
||||
create_connection(("example.com", 80), timeout=7)
|
||||
sock.settimeout.assert_called_once_with(7)
|
||||
|
||||
@patch("derp.http.socks.socksocket")
|
||||
def test_no_timeout_when_none(self, mock_cls):
|
||||
sock = MagicMock()
|
||||
mock_cls.return_value = sock
|
||||
create_connection(("example.com", 80))
|
||||
sock.settimeout.assert_not_called()
|
||||
|
||||
@patch("derp.http.socks.socksocket")
|
||||
def test_returns_socket(self, mock_cls):
|
||||
sock = MagicMock()
|
||||
mock_cls.return_value = sock
|
||||
result = create_connection(("example.com", 443))
|
||||
assert result is sock
|
||||
|
||||
Reference in New Issue
Block a user