diff --git a/Containerfile b/Containerfile index b1827e0..00d20c5 100644 --- a/Containerfile +++ b/Containerfile @@ -2,7 +2,8 @@ FROM python:3.13-alpine WORKDIR /app -RUN pip install --no-cache-dir maxminddb>=2.0 PySocks>=1.7.1 +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt ENV PYTHONPATH=/app/src ENV PYTHONUNBUFFERED=1 diff --git a/TASKS.md b/TASKS.md index 5590299..0e1f76f 100644 --- a/TASKS.md +++ b/TASKS.md @@ -1,6 +1,17 @@ # derp - Tasks -## Current Sprint -- v1.2.1 Performance + Polish (2026-02-17) +## Current Sprint -- v1.2.2 Connection Pooling + Batch OG (2026-02-17) + +| Pri | Status | Task | +|-----|--------|------| +| P0 | [x] | Batch `_fetch_og` calls via ThreadPoolExecutor (alert.py) | +| P0 | [x] | Connection pooling via `urllib3[socks]` SOCKSProxyManager (http.py) | +| P1 | [x] | Cache FlaskPaste `_ssl_context()` at module level | +| P1 | [x] | Backward-compat `urllib.error.HTTPError` for 4xx/5xx in pooled path | +| P1 | [x] | Legacy opener fallback for `context=` callers (username.py) | +| P2 | [x] | Containerfile uses requirements.txt for deps | + +## Previous Sprint -- v1.2.1 Performance + Polish (2026-02-17) | Pri | Status | Task | |-----|--------|------| diff --git a/plugins/alert.py b/plugins/alert.py index 7005573..958cc4a 100644 --- a/plugins/alert.py +++ b/plugins/alert.py @@ -330,6 +330,23 @@ def _fetch_og(url: str) -> tuple[str, str, str]: return "", "", "" +def _fetch_og_batch(urls: list[str]) -> dict[str, tuple[str, str, str]]: + """Fetch OG tags for multiple URLs concurrently. + + Returns {url: (og_title, og_description, date)} for each input URL. + """ + from concurrent.futures import ThreadPoolExecutor, as_completed + + if not urls: + return {} + results: dict[str, tuple[str, str, str]] = {} + with ThreadPoolExecutor(max_workers=min(len(urls), 8)) as pool: + futures = {pool.submit(_fetch_og, url): url for url in urls} + for fut in as_completed(futures): + results[futures[fut]] = fut.result() + return results + + # -- YouTube InnerTube search (blocking) ------------------------------------ def _extract_videos(obj: object, depth: int = 0) -> list[dict]: @@ -1753,26 +1770,41 @@ async def _poll_once(bot, key: str, announce: bool = True) -> None: # Filter: only announce results that actually contain the keyword # Check title/URL first, then fall back to og:title/og:description kw_lower = keyword.lower() + + # Collect URLs that need OG enrichment (batch fetch) + urls_needing_og: set[str] = set() + for item in new_items: + title_l = item.get("title", "").lower() + url_l = item.get("url", "").lower() + if kw_lower in title_l or kw_lower in url_l: + # Title/URL match -- only need OG for date enrichment + if not item.get("date") and item.get("url"): + urls_needing_og.add(item["url"]) + elif item.get("url"): + # No title/URL match -- need OG for keyword fallback + urls_needing_og.add(item["url"]) + + og_cache: dict[str, tuple[str, str, str]] = {} + if urls_needing_og: + og_cache = await loop.run_in_executor( + None, _fetch_og_batch, list(urls_needing_og), + ) + matched = [] for item in new_items: title_l = item.get("title", "").lower() url_l = item.get("url", "").lower() if kw_lower in title_l or kw_lower in url_l: - # Fetch OG tags for date if backend didn't provide one if not item.get("date") and item.get("url"): - _, _, og_date = await loop.run_in_executor( - None, _fetch_og, item["url"], - ) + _, _, og_date = og_cache.get(item["url"], ("", "", "")) if og_date: item["date"] = og_date matched.append(item) continue - # Fetch OG tags for items that didn't match on title/URL + # Check OG tags for keyword match item_url = item.get("url", "") if item_url: - og_title, og_desc, og_date = await loop.run_in_executor( - None, _fetch_og, item_url, - ) + og_title, og_desc, og_date = og_cache.get(item_url, ("", "", "")) if (kw_lower in og_title.lower() or kw_lower in og_desc.lower()): if og_title and len(og_title) > len(item.get("title", "")): diff --git a/plugins/flaskpaste.py b/plugins/flaskpaste.py index 4791b05..e908360 100644 --- a/plugins/flaskpaste.py +++ b/plugins/flaskpaste.py @@ -34,14 +34,23 @@ def _has_client_cert() -> bool: return (_CERT_DIR / "derp.crt").exists() and (_CERT_DIR / "derp.key").exists() +_cached_ssl_ctx: ssl.SSLContext | None = None + + def _ssl_context() -> ssl.SSLContext: - """Build SSL context, loading client cert for mTLS if available.""" - ctx = ssl.create_default_context() - cert_path = _CERT_DIR / "derp.crt" - key_path = _CERT_DIR / "derp.key" - if cert_path.exists() and key_path.exists(): - ctx.load_cert_chain(str(cert_path), str(key_path)) - return ctx + """Build SSL context, loading client cert for mTLS if available. + + Cached at module level -- cert files are static at runtime. + """ + global _cached_ssl_ctx + if _cached_ssl_ctx is None: + ctx = ssl.create_default_context() + cert_path = _CERT_DIR / "derp.crt" + key_path = _CERT_DIR / "derp.key" + if cert_path.exists() and key_path.exists(): + ctx.load_cert_chain(str(cert_path), str(key_path)) + _cached_ssl_ctx = ctx + return _cached_ssl_ctx def _solve_pow(nonce: str, difficulty: int) -> int: diff --git a/pyproject.toml b/pyproject.toml index 01665b4..665aac2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,6 +11,7 @@ license = "MIT" dependencies = [ "maxminddb>=2.0", "PySocks>=1.7.1", + "urllib3[socks]>=2.0", ] [project.scripts] diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..47fe5f1 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +maxminddb>=2.0 +PySocks>=1.7.1 +urllib3[socks]>=2.0 diff --git a/src/derp/http.py b/src/derp/http.py index 3c159ec..b4f55ff 100644 --- a/src/derp/http.py +++ b/src/derp/http.py @@ -5,19 +5,50 @@ import logging import socket import ssl import time +import urllib.error import urllib.request import socks +import urllib3 from socks import SOCKS5 from sockshandler import SocksiPyConnectionS, SocksiPyHandler +from urllib3.contrib.socks import SOCKSProxyManager _PROXY_ADDR = "127.0.0.1" _PROXY_PORT = 1080 _MAX_RETRIES = 3 -_RETRY_ERRORS = (ssl.SSLError, ConnectionError, TimeoutError, OSError) +_RETRY_ERRORS = ( + ssl.SSLError, ConnectionError, TimeoutError, OSError, + urllib3.exceptions.HTTPError, +) _log = logging.getLogger(__name__) +# -- Connection pool (urllib3) ------------------------------------------------ + +_pool: SOCKSProxyManager | None = None + +# Allow redirects but no urllib3-level retries (we retry ourselves). +_POOL_RETRIES = urllib3.Retry( + total=10, connect=0, read=0, redirect=10, status=0, other=0, +) + + +def _get_pool() -> SOCKSProxyManager: + """Lazy-init the SOCKS5 connection pool.""" + global _pool + if _pool is None: + _pool = SOCKSProxyManager( + f"socks5h://{_PROXY_ADDR}:{_PROXY_PORT}/", + num_pools=20, + maxsize=4, + retries=_POOL_RETRIES, + ) + return _pool + + +# -- Legacy opener (for build_opener / context= callers) --------------------- + _default_opener: urllib.request.OpenerDirector | None = None @@ -52,12 +83,66 @@ class _ProxyHandler(SocksiPyHandler, urllib.request.HTTPSHandler): return self.do_open(build, req) +# -- Public HTTP interface --------------------------------------------------- + def urlopen(req, *, timeout=None, context=None, retries=None): """Proxy-aware drop-in for urllib.request.urlopen. + Uses connection pooling via urllib3 for default requests. + Falls back to legacy opener for custom SSL context. Retries on transient SSL/connection errors with exponential backoff. """ max_retries = retries if retries is not None else _MAX_RETRIES + + # Custom SSL context -> fall back to opener (rare: username.py only) + if context is not None: + return _urlopen_legacy(req, timeout=timeout, context=context, retries=max_retries) + + # Default path: pooled urllib3 + pool = _get_pool() + + if isinstance(req, str): + url, headers, body, method = req, {}, None, "GET" + else: + url = req.full_url + headers = dict(req.header_items()) + body = req.data + method = req.get_method() + + to = urllib3.Timeout(total=timeout) if timeout else urllib3.Timeout(total=30) + + for attempt in range(max_retries): + try: + resp = pool.request( + method, url, + headers=headers, + body=body, + timeout=to, + preload_content=False, + ) + if resp.status >= 400: + # Drain body so connection returns to pool, then raise + # urllib.error.HTTPError for backward compatibility. + resp.read() + raise urllib.error.HTTPError( + url, resp.status, resp.reason or "", + resp.headers, None, + ) + return resp + except urllib.error.HTTPError: + raise + except _RETRY_ERRORS as exc: + if attempt + 1 >= max_retries: + raise + delay = 2 ** attempt + _log.debug("urlopen retry %d/%d after %s: %s", + attempt + 1, max_retries, type(exc).__name__, exc) + time.sleep(delay) + + +def _urlopen_legacy(req, *, timeout=None, context=None, retries=None): + """Open URL through legacy opener (custom SSL context).""" + max_retries = retries if retries is not None else _MAX_RETRIES opener = _get_opener(context) kwargs = {} if timeout is not None: @@ -82,6 +167,8 @@ def build_opener(*handlers, context=None): return urllib.request.build_opener(proxy, *handlers) +# -- Raw TCP helpers (unchanged) --------------------------------------------- + def create_connection(address, *, timeout=None): """SOCKS5-proxied drop-in for socket.create_connection. diff --git a/tests/test_http.py b/tests/test_http.py index fc041dd..9519d29 100644 --- a/tests/test_http.py +++ b/tests/test_http.py @@ -1,6 +1,7 @@ """Tests for the SOCKS5 proxy HTTP/TCP module.""" import ssl +import urllib.error import urllib.request from unittest.mock import MagicMock, patch @@ -12,20 +13,46 @@ from derp.http import ( _PROXY_ADDR, _PROXY_PORT, _get_opener, + _get_pool, _ProxyHandler, build_opener, create_connection, + urlopen, ) @pytest.fixture(autouse=True) -def _reset_opener_cache(): - """Clear cached opener between tests.""" +def _reset_caches(): + """Clear cached opener and pool between tests.""" derp.http._default_opener = None + derp.http._pool = None yield derp.http._default_opener = None + derp.http._pool = None +# -- Connection pool --------------------------------------------------------- + +class TestConnectionPool: + def test_pool_lazy_init(self): + assert derp.http._pool is None + pool = _get_pool() + assert pool is not None + assert derp.http._pool is pool + + def test_pool_cached(self): + a = _get_pool() + b = _get_pool() + assert a is b + + def test_pool_is_socks_manager(self): + from urllib3.contrib.socks import SOCKSProxyManager + pool = _get_pool() + assert isinstance(pool, SOCKSProxyManager) + + +# -- Legacy opener ----------------------------------------------------------- + class TestProxyHandler: def test_uses_socks5(self): handler = _ProxyHandler() @@ -103,6 +130,106 @@ class TestOpenerCache: assert a is not b +# -- urlopen (pooled path) -------------------------------------------------- + +class TestUrlopen: + @patch.object(derp.http, "_get_pool") + def test_extracts_request_fields(self, mock_pool_fn): + pool = MagicMock() + resp = MagicMock() + resp.status = 200 + pool.request.return_value = resp + mock_pool_fn.return_value = pool + + req = urllib.request.Request( + "https://example.com/test", + headers={"X-Custom": "val"}, + method="POST", + ) + req.data = b"body" + urlopen(req, timeout=10) + + pool.request.assert_called_once() + call_kw = pool.request.call_args + assert call_kw[0][0] == "POST" + assert call_kw[0][1] == "https://example.com/test" + assert call_kw[1]["body"] == b"body" + + @patch.object(derp.http, "_get_pool") + def test_string_url(self, mock_pool_fn): + pool = MagicMock() + resp = MagicMock() + resp.status = 200 + pool.request.return_value = resp + mock_pool_fn.return_value = pool + + urlopen("https://example.com/") + call_args = pool.request.call_args + assert call_args[0] == ("GET", "https://example.com/") + + @patch.object(derp.http, "_get_pool") + def test_raises_http_error_on_4xx(self, mock_pool_fn): + pool = MagicMock() + resp = MagicMock() + resp.status = 404 + resp.reason = "Not Found" + resp.headers = {} + resp.read.return_value = b"" + pool.request.return_value = resp + mock_pool_fn.return_value = pool + + with pytest.raises(urllib.error.HTTPError) as exc_info: + urlopen("https://example.com/missing") + assert exc_info.value.code == 404 + + @patch.object(derp.http, "_get_pool") + def test_raises_http_error_on_5xx(self, mock_pool_fn): + pool = MagicMock() + resp = MagicMock() + resp.status = 500 + resp.reason = "Internal Server Error" + resp.headers = {} + resp.read.return_value = b"" + pool.request.return_value = resp + mock_pool_fn.return_value = pool + + with pytest.raises(urllib.error.HTTPError) as exc_info: + urlopen("https://example.com/error") + assert exc_info.value.code == 500 + + @patch.object(derp.http, "_get_pool") + def test_returns_response_on_2xx(self, mock_pool_fn): + pool = MagicMock() + resp = MagicMock() + resp.status = 200 + pool.request.return_value = resp + mock_pool_fn.return_value = pool + + result = urlopen("https://example.com/") + assert result is resp + + @patch.object(derp.http, "_get_pool") + def test_context_falls_back_to_opener(self, mock_pool_fn): + """Custom SSL context should use legacy opener, not pool.""" + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + + with patch.object(derp.http, "_get_opener") as mock_opener_fn: + opener = MagicMock() + resp = MagicMock() + opener.open.return_value = resp + mock_opener_fn.return_value = opener + + result = urlopen("https://example.com/", context=ctx) + + mock_pool_fn.assert_not_called() + mock_opener_fn.assert_called_once_with(ctx) + assert result is resp + + +# -- create_connection ------------------------------------------------------- + class TestCreateConnection: @patch("derp.http.socks.socksocket") def test_sets_socks5_proxy(self, mock_cls):