diff --git a/plugins/alert.py b/plugins/alert.py index 3c443f4..ff984a9 100644 --- a/plugins/alert.py +++ b/plugins/alert.py @@ -5,10 +5,10 @@ from __future__ import annotations import asyncio import json import re -import ssl import urllib.request from datetime import datetime, timezone +from derp.http import urlopen as _urlopen from derp.plugin import command, event # -- Constants --------------------------------------------------------------- @@ -106,8 +106,7 @@ def _search_youtube(keyword: str) -> list[dict]: req = urllib.request.Request(_YT_SEARCH_URL, data=payload, method="POST") req.add_header("Content-Type", "application/json") - ctx = ssl.create_default_context() - resp = urllib.request.urlopen(req, timeout=_FETCH_TIMEOUT, context=ctx) + resp = _urlopen(req, timeout=_FETCH_TIMEOUT) raw = resp.read() resp.close() @@ -141,8 +140,7 @@ def _search_twitch(keyword: str) -> list[dict]: req.add_header("Client-Id", _GQL_CLIENT_ID) req.add_header("Content-Type", "application/json") - ctx = ssl.create_default_context() - resp = urllib.request.urlopen(req, timeout=_FETCH_TIMEOUT, context=ctx) + resp = _urlopen(req, timeout=_FETCH_TIMEOUT) raw = resp.read() resp.close() diff --git a/plugins/crtsh.py b/plugins/crtsh.py index aae5b55..7ea1547 100644 --- a/plugins/crtsh.py +++ b/plugins/crtsh.py @@ -13,6 +13,7 @@ import urllib.request from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timezone +from derp.http import urlopen as _urlopen from derp.plugin import command log = logging.getLogger(__name__) @@ -32,7 +33,7 @@ def fetch_crtsh(domain: str) -> list[dict]: """GET crt.sh JSON for a domain. Blocking.""" url = _CRTSH_URL.format(domain=domain) req = urllib.request.Request(url, headers={"User-Agent": "derp-irc-bot"}) - with urllib.request.urlopen(req, timeout=_CRTSH_TIMEOUT) as resp: + with _urlopen(req, timeout=_CRTSH_TIMEOUT) as resp: return json.loads(resp.read()) diff --git a/plugins/cve.py b/plugins/cve.py index ebc095a..495dc2a 100644 --- a/plugins/cve.py +++ b/plugins/cve.py @@ -8,6 +8,7 @@ import re import time from pathlib import Path +from derp.http import urlopen as _urlopen from derp.plugin import command log = logging.getLogger(__name__) @@ -121,7 +122,7 @@ async def _download_nvd() -> tuple[int, str]: def _fetch(url): req = urllib.request.Request(url, headers={"User-Agent": "derp-bot"}) - with urllib.request.urlopen(req, timeout=120) as resp: # noqa: S310 + with _urlopen(req, timeout=120) as resp: return resp.read() try: diff --git a/plugins/exploitdb.py b/plugins/exploitdb.py index a3e64f7..bda5a63 100644 --- a/plugins/exploitdb.py +++ b/plugins/exploitdb.py @@ -7,6 +7,7 @@ import logging import time from pathlib import Path +from derp.http import urlopen as _urlopen from derp.plugin import command log = logging.getLogger(__name__) @@ -85,7 +86,7 @@ async def _download_csv() -> tuple[int, str]: def _fetch(): req = urllib.request.Request(_CSV_URL, headers={"User-Agent": "derp-bot"}) - with urllib.request.urlopen(req, timeout=60) as resp: # noqa: S310 + with _urlopen(req, timeout=60) as resp: return resp.read() try: diff --git a/plugins/headers.py b/plugins/headers.py index 6211b51..9c30f14 100644 --- a/plugins/headers.py +++ b/plugins/headers.py @@ -8,6 +8,7 @@ import re import ssl import urllib.request +from derp.http import build_opener as _build_opener from derp.plugin import command log = logging.getLogger(__name__) @@ -84,9 +85,7 @@ def _fetch_headers(url: str) -> tuple[dict[str, str], str]: ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE - opener = urllib.request.build_opener( - urllib.request.HTTPSHandler(context=ctx), - ) + opener = _build_opener(context=ctx) req = urllib.request.Request(url, method="GET") req.add_header("User-Agent", _USER_AGENT) diff --git a/plugins/httpcheck.py b/plugins/httpcheck.py index eeb763a..71bab3e 100644 --- a/plugins/httpcheck.py +++ b/plugins/httpcheck.py @@ -7,6 +7,7 @@ import ssl import time import urllib.request +from derp.http import build_opener as _build_opener from derp.plugin import command _TIMEOUT = 10 @@ -41,10 +42,7 @@ def _check(url: str) -> dict: ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE - opener = urllib.request.build_opener( - NoRedirect, - urllib.request.HTTPSHandler(context=ctx), - ) + opener = _build_opener(NoRedirect, context=ctx) req = urllib.request.Request(url, method="HEAD") req.add_header("User-Agent", _USER_AGENT) diff --git a/plugins/iprep.py b/plugins/iprep.py index e55de73..f87d337 100644 --- a/plugins/iprep.py +++ b/plugins/iprep.py @@ -7,6 +7,7 @@ import logging import time from pathlib import Path +from derp.http import urlopen as _urlopen from derp.plugin import command log = logging.getLogger(__name__) @@ -111,7 +112,7 @@ async def _download_feeds() -> tuple[int, int]: async def _fetch_one(filename: str, url: str) -> bool: def _do(): req = urllib.request.Request(url, headers={"User-Agent": "derp-bot"}) - with urllib.request.urlopen(req, timeout=30) as resp: # noqa: S310 + with _urlopen(req, timeout=30) as resp: return resp.read() try: diff --git a/plugins/rss.py b/plugins/rss.py index f2776bc..24b07f6 100644 --- a/plugins/rss.py +++ b/plugins/rss.py @@ -5,12 +5,12 @@ from __future__ import annotations import asyncio import json import re -import ssl import urllib.request import xml.etree.ElementTree as ET from datetime import datetime, timezone from urllib.parse import urlparse +from derp.http import urlopen as _urlopen from derp.plugin import command, event # -- Constants --------------------------------------------------------------- @@ -111,10 +111,8 @@ def _fetch_feed(url: str, etag: str = "", last_modified: str = "") -> dict: if last_modified: req.add_header("If-Modified-Since", last_modified) - ctx = ssl.create_default_context() - try: - resp = urllib.request.urlopen(req, timeout=_FETCH_TIMEOUT, context=ctx) + resp = _urlopen(req, timeout=_FETCH_TIMEOUT) result["status"] = resp.status result["body"] = resp.read() result["etag"] = resp.headers.get("ETag", "") diff --git a/plugins/subdomain.py b/plugins/subdomain.py index 5178be7..7b9e956 100644 --- a/plugins/subdomain.py +++ b/plugins/subdomain.py @@ -12,6 +12,7 @@ import socket import struct import urllib.request +from derp.http import urlopen as _urlopen from derp.plugin import command log = logging.getLogger(__name__) @@ -130,7 +131,7 @@ def _fetch_crtsh(domain: str) -> set[str]: """Fetch subdomains from crt.sh CT logs. Blocking.""" url = _CRTSH_URL.format(domain=domain) req = urllib.request.Request(url, headers={"User-Agent": "derp-bot"}) - with urllib.request.urlopen(req, timeout=_CRTSH_TIMEOUT) as resp: # noqa: S310 + with _urlopen(req, timeout=_CRTSH_TIMEOUT) as resp: data = json.loads(resp.read()) subs: set[str] = set() diff --git a/plugins/torcheck.py b/plugins/torcheck.py index 2e53e00..9bb1a4c 100644 --- a/plugins/torcheck.py +++ b/plugins/torcheck.py @@ -7,6 +7,7 @@ import logging import time from pathlib import Path +from derp.http import urlopen as _urlopen from derp.plugin import command log = logging.getLogger(__name__) @@ -66,7 +67,7 @@ async def _download_exits() -> int: def _fetch(): req = urllib.request.Request(_TOR_EXIT_URL, headers={"User-Agent": "derp-bot"}) - with urllib.request.urlopen(req, timeout=30) as resp: # noqa: S310 + with _urlopen(req, timeout=30) as resp: return resp.read().decode("utf-8", errors="replace") try: diff --git a/plugins/twitch.py b/plugins/twitch.py index 201b597..67d773b 100644 --- a/plugins/twitch.py +++ b/plugins/twitch.py @@ -5,10 +5,10 @@ from __future__ import annotations import asyncio import json import re -import ssl import urllib.request from datetime import datetime, timezone +from derp.http import urlopen as _urlopen from derp.plugin import command, event # -- Constants --------------------------------------------------------------- @@ -79,10 +79,8 @@ def _query_stream(login: str) -> dict: req.add_header("Client-Id", _GQL_CLIENT_ID) req.add_header("Content-Type", "application/json") - ctx = ssl.create_default_context() - try: - resp = urllib.request.urlopen(req, timeout=_FETCH_TIMEOUT, context=ctx) + resp = _urlopen(req, timeout=_FETCH_TIMEOUT) raw = resp.read() resp.close() data = json.loads(raw) diff --git a/plugins/username.py b/plugins/username.py index 0efbf0a..3dd46fa 100644 --- a/plugins/username.py +++ b/plugins/username.py @@ -17,6 +17,7 @@ import urllib.request from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass +from derp.http import urlopen as _urlopen from derp.plugin import command log = logging.getLogger(__name__) @@ -94,7 +95,7 @@ def _http_get(url: str, timeout: int = _TIMEOUT) -> tuple[int, str]: req = urllib.request.Request(url, headers={"User-Agent": _USER_AGENT}) try: - with urllib.request.urlopen(req, timeout=timeout, context=ctx) as resp: + with _urlopen(req, timeout=timeout, context=ctx) as resp: body = resp.read().decode("utf-8", errors="replace") return resp.status, body except urllib.error.HTTPError as exc: diff --git a/plugins/wayback.py b/plugins/wayback.py index 292cd51..2665078 100644 --- a/plugins/wayback.py +++ b/plugins/wayback.py @@ -7,6 +7,7 @@ import json import logging import urllib.request +from derp.http import urlopen as _urlopen from derp.plugin import command log = logging.getLogger(__name__) @@ -28,7 +29,7 @@ def _lookup(url: str, timestamp: str = "") -> dict: ) try: - resp = urllib.request.urlopen(req, timeout=_TIMEOUT) + resp = _urlopen(req, timeout=_TIMEOUT) data = json.loads(resp.read().decode("utf-8")) resp.close() return data diff --git a/plugins/youtube.py b/plugins/youtube.py index 098fdce..4e2054d 100644 --- a/plugins/youtube.py +++ b/plugins/youtube.py @@ -5,12 +5,12 @@ from __future__ import annotations import asyncio import json import re -import ssl import urllib.request import xml.etree.ElementTree as ET from datetime import datetime, timezone from urllib.parse import urlparse +from derp.http import urlopen as _urlopen from derp.plugin import command, event # -- Constants --------------------------------------------------------------- @@ -97,9 +97,8 @@ def _resolve_channel(url: str) -> str | None: """ req = urllib.request.Request(url, method="GET") req.add_header("User-Agent", _BROWSER_UA) - ctx = ssl.create_default_context() try: - resp = urllib.request.urlopen(req, timeout=_FETCH_TIMEOUT, context=ctx) + resp = _urlopen(req, timeout=_FETCH_TIMEOUT) body = resp.read(1_048_576) # Read up to 1MB resp.close() except Exception: @@ -128,10 +127,8 @@ def _fetch_feed(url: str, etag: str = "", last_modified: str = "") -> dict: if last_modified: req.add_header("If-Modified-Since", last_modified) - ctx = ssl.create_default_context() - try: - resp = urllib.request.urlopen(req, timeout=_FETCH_TIMEOUT, context=ctx) + resp = _urlopen(req, timeout=_FETCH_TIMEOUT) result["status"] = resp.status result["body"] = resp.read() result["etag"] = resp.headers.get("ETag", "") diff --git a/pyproject.toml b/pyproject.toml index ee0b9ca..01665b4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,6 +10,7 @@ requires-python = ">=3.11" license = "MIT" dependencies = [ "maxminddb>=2.0", + "PySocks>=1.7.1", ] [project.scripts] diff --git a/src/derp/http.py b/src/derp/http.py new file mode 100644 index 0000000..b6f8db6 --- /dev/null +++ b/src/derp/http.py @@ -0,0 +1,48 @@ +"""Proxy-aware HTTP helpers -- routes outbound traffic through SOCKS5.""" + +import socket +import ssl +import urllib.request + +from socks import SOCKS5 +from sockshandler import SocksiPyConnectionS, SocksiPyHandler + +_PROXY_ADDR = "127.0.0.1" +_PROXY_PORT = 1080 + + +class _ProxyHandler(SocksiPyHandler, urllib.request.HTTPSHandler): + """SOCKS5 handler that forwards SSL context to HTTPS connections.""" + + def __init__(self, context=None): + self._ssl_context = context or ssl.create_default_context() + SocksiPyHandler.__init__(self, SOCKS5, _PROXY_ADDR, _PROXY_PORT, True) + + def https_open(self, req): + """Open HTTPS connection through SOCKS5 with SSL context.""" + ctx = self._ssl_context + + def build(host, port=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, **kwargs): + conn = SocksiPyConnectionS( + *self.args, host=host, port=port, context=ctx, **self.kw, + ) + conn.timeout = timeout + return conn + + return self.do_open(build, req) + + +def urlopen(req, *, timeout=None, context=None): + """Proxy-aware drop-in for urllib.request.urlopen.""" + handler = _ProxyHandler(context=context) + opener = urllib.request.build_opener(handler) + kwargs = {} + if timeout is not None: + kwargs["timeout"] = timeout + return opener.open(req, **kwargs) + + +def build_opener(*handlers, context=None): + """Proxy-aware drop-in for urllib.request.build_opener.""" + proxy = _ProxyHandler(context=context) + return urllib.request.build_opener(proxy, *handlers) diff --git a/tests/test_alert.py b/tests/test_alert.py index e9885a8..ef393f8 100644 --- a/tests/test_alert.py +++ b/tests/test_alert.py @@ -370,7 +370,7 @@ class TestExtractVideos: def close(self): pass - with patch("urllib.request.urlopen", return_value=FakeResp()): + with patch.object(_mod, "_urlopen", return_value=FakeResp()): results = _search_youtube("test") assert len(results) == 1 assert results[0]["id"] == "dup1" @@ -388,7 +388,7 @@ class TestSearchYoutube: def close(self): pass - with patch("urllib.request.urlopen", return_value=FakeResp()): + with patch.object(_mod, "_urlopen", return_value=FakeResp()): results = _search_youtube("test query") assert len(results) == 2 assert results[0]["id"] == "abc123" @@ -396,7 +396,7 @@ class TestSearchYoutube: def test_http_error_propagates(self): import pytest - with patch("urllib.request.urlopen", side_effect=ConnectionError("fail")): + with patch.object(_mod, "_urlopen", side_effect=ConnectionError("fail")): with pytest.raises(ConnectionError): _search_youtube("test") @@ -413,7 +413,7 @@ class TestSearchTwitch: def close(self): pass - with patch("urllib.request.urlopen", return_value=FakeResp()): + with patch.object(_mod, "_urlopen", return_value=FakeResp()): results = _search_twitch("minecraft") assert len(results) == 2 # Stream @@ -435,7 +435,7 @@ class TestSearchTwitch: def close(self): pass - with patch("urllib.request.urlopen", return_value=FakeResp()): + with patch.object(_mod, "_urlopen", return_value=FakeResp()): results = _search_twitch("nothing") assert results == [] @@ -448,13 +448,13 @@ class TestSearchTwitch: def close(self): pass - with patch("urllib.request.urlopen", return_value=FakeResp()): + with patch.object(_mod, "_urlopen", return_value=FakeResp()): results = _search_twitch("bad") assert results == [] def test_http_error_propagates(self): import pytest - with patch("urllib.request.urlopen", side_effect=ConnectionError("fail")): + with patch.object(_mod, "_urlopen", side_effect=ConnectionError("fail")): with pytest.raises(ConnectionError): _search_twitch("test") @@ -482,7 +482,7 @@ class TestSearchTwitch: def close(self): pass - with patch("urllib.request.urlopen", return_value=FakeResp()): + with patch.object(_mod, "_urlopen", return_value=FakeResp()): results = _search_twitch("chat") assert len(results) == 1 assert "()" not in results[0]["title"] diff --git a/tests/test_http.py b/tests/test_http.py new file mode 100644 index 0000000..9ff9f09 --- /dev/null +++ b/tests/test_http.py @@ -0,0 +1,59 @@ +"""Tests for the SOCKS5 proxy HTTP module.""" + +import ssl +import urllib.request + +from socks import SOCKS5 + +from derp.http import _PROXY_ADDR, _PROXY_PORT, _ProxyHandler, build_opener + + +class TestProxyHandler: + def test_uses_socks5(self): + handler = _ProxyHandler() + assert handler.args[0] == SOCKS5 + + def test_proxy_address(self): + handler = _ProxyHandler() + assert handler.args[1] == _PROXY_ADDR + assert handler.args[2] == _PROXY_PORT + + def test_rdns_enabled(self): + handler = _ProxyHandler() + assert handler.args[3] is True + + def test_default_ssl_context(self): + handler = _ProxyHandler() + assert isinstance(handler._ssl_context, ssl.SSLContext) + + def test_custom_ssl_context(self): + ctx = ssl.create_default_context() + ctx.check_hostname = False + handler = _ProxyHandler(context=ctx) + assert handler._ssl_context is ctx + + def test_is_https_handler(self): + handler = _ProxyHandler() + assert isinstance(handler, urllib.request.HTTPSHandler) + + +class TestBuildOpener: + def test_includes_proxy_handler(self): + opener = build_opener() + proxy = [h for h in opener.handlers if isinstance(h, _ProxyHandler)] + assert len(proxy) == 1 + + def test_passes_extra_handlers(self): + class Custom(urllib.request.HTTPRedirectHandler): + pass + + opener = build_opener(Custom) + custom = [h for h in opener.handlers if isinstance(h, Custom)] + assert len(custom) == 1 + + def test_passes_ssl_context(self): + ctx = ssl.create_default_context() + ctx.check_hostname = False + opener = build_opener(context=ctx) + proxy = [h for h in opener.handlers if isinstance(h, _ProxyHandler)][0] + assert proxy._ssl_context is ctx diff --git a/tests/test_twitch.py b/tests/test_twitch.py index 778d142..23b316b 100644 --- a/tests/test_twitch.py +++ b/tests/test_twitch.py @@ -345,7 +345,7 @@ class TestQueryStream: """Test _query_stream response parsing with mocked HTTP.""" def test_live_response(self): - with patch("urllib.request.urlopen", return_value=_FakeGqlResp(GQL_LIVE)): + with patch.object(_mod, "_urlopen", return_value=_FakeGqlResp(GQL_LIVE)): result = _mod._query_stream("xqc") assert result["exists"] is True assert result["live"] is True @@ -358,7 +358,7 @@ class TestQueryStream: assert result["error"] == "" def test_offline_response(self): - with patch("urllib.request.urlopen", return_value=_FakeGqlResp(GQL_OFFLINE)): + with patch.object(_mod, "_urlopen", return_value=_FakeGqlResp(GQL_OFFLINE)): result = _mod._query_stream("xqc") assert result["exists"] is True assert result["live"] is False @@ -366,20 +366,20 @@ class TestQueryStream: assert result["stream_id"] == "" def test_not_found_response(self): - with patch("urllib.request.urlopen", return_value=_FakeGqlResp(GQL_NOT_FOUND)): + with patch.object(_mod, "_urlopen", return_value=_FakeGqlResp(GQL_NOT_FOUND)): result = _mod._query_stream("nobody") assert result["exists"] is False assert result["live"] is False def test_no_game_response(self): - with patch("urllib.request.urlopen", return_value=_FakeGqlResp(GQL_LIVE_NO_GAME)): + with patch.object(_mod, "_urlopen", return_value=_FakeGqlResp(GQL_LIVE_NO_GAME)): result = _mod._query_stream("streamer") assert result["exists"] is True assert result["live"] is True assert result["game"] == "" def test_network_error(self): - with patch("urllib.request.urlopen", side_effect=Exception("timeout")): + with patch.object(_mod, "_urlopen", side_effect=Exception("timeout")): result = _mod._query_stream("xqc") assert result["error"] == "timeout" assert result["exists"] is False