From 76301ac8f262d0e684e602ec736f343e48b4acad Mon Sep 17 00:00:00 2001
From: user
Date: Tue, 17 Feb 2026 12:02:57 +0100
Subject: [PATCH] perf: concurrent fetches for multi-instance alert backends

Add a _fetch_many() helper that uses ThreadPoolExecutor to query
instances in parallel, and refactor the PeerTube, Mastodon, Lemmy, and
SearXNG backends from sequential to concurrent fetches.

Also add a retries parameter to derp.http.urlopen; multi-instance
backends pass retries=1, since instance redundancy already provides
resilience.

Worst-case wall time per backend drops from N*timeout to 1*timeout.

Co-Authored-By: Claude Opus 4.6
---
 plugins/alert.py | 215 +++++++++++++++++++++++++----------------------
 src/derp/http.py |   9 +-
 2 files changed, 119 insertions(+), 105 deletions(-)

diff --git a/plugins/alert.py b/plugins/alert.py
index af197fe..d93059a 100644
--- a/plugins/alert.py
+++ b/plugins/alert.py
@@ -83,6 +83,41 @@ _subscriptions: dict[str, dict] = {}
 _errors: dict[str, dict[str, int]] = {}
 _poll_count: dict[str, int] = {}
 
+# -- Concurrent fetch helper -------------------------------------------------
+
+
+def _fetch_many(targets, *, build_req, timeout, parse):
+    """Fetch multiple URLs concurrently, return combined results.
+
+    Args:
+        targets: iterable of labels (instance hostnames, categories, etc.)
+        build_req: callable(target) -> (urllib.request.Request, label_for_log)
+        timeout: per-request timeout in seconds
+        parse: callable(raw_bytes, target) -> list[dict]
+
+    Returns combined list of parsed results (deduped by caller).
+    """
+    from concurrent.futures import ThreadPoolExecutor, as_completed
+
+    def _do(target):
+        req, label = build_req(target)
+        try:
+            resp = _urlopen(req, timeout=timeout, retries=1)
+            raw = resp.read()
+            resp.close()
+            return parse(raw, target)
+        except Exception as exc:
+            _log.debug("%s failed: %s", label, exc)
+            return []
+
+    results = []
+    with ThreadPoolExecutor(max_workers=len(targets)) as pool:
+        futures = {pool.submit(_do, t): t for t in targets}
+        for fut in as_completed(futures):
+            results.extend(fut.result())
+    return results
+
+
 # -- History database --------------------------------------------------------
 
 _DB_PATH = Path("data/alert_history.db")
@@ -451,49 +486,46 @@ _SEARX_CATEGORIES = ["general", "news", "videos", "social media"]
 
 def _search_searx(keyword: str) -> list[dict]:
     """Search SearXNG across multiple categories, filtered to last day.
 
Blocking.""" import urllib.parse + from concurrent.futures import ThreadPoolExecutor, as_completed - results: list[dict] = [] - seen_urls: set[str] = set() - - for category in _SEARX_CATEGORIES: + def _do(category): params = urllib.parse.urlencode({ - "q": keyword, - "format": "json", - "categories": category, - "time_range": "day", + "q": keyword, "format": "json", + "categories": category, "time_range": "day", }) - url = f"{_SEARX_URL}?{params}" - - req = urllib.request.Request(url, method="GET") + req = urllib.request.Request(f"{_SEARX_URL}?{params}", method="GET") try: resp = urllib.request.urlopen(req, timeout=_FETCH_TIMEOUT) raw = resp.read() resp.close() except Exception as exc: _log.debug("searx category %s failed: %s", category, exc) - continue - + return [] try: data = json.loads(raw) except json.JSONDecodeError: - continue - + return [] + items = [] for item in data.get("results", []): item_url = item.get("url", "") - if not item_url or item_url in seen_urls: + if not item_url: continue - seen_urls.add(item_url) - title = item.get("title", "") - date = _parse_date(item.get("publishedDate") or "") - results.append({ + items.append({ "id": item_url, - "title": title, + "title": item.get("title", ""), "url": item_url, - "date": date, + "date": _parse_date(item.get("publishedDate") or ""), "extra": "", }) + return items - return results + results = [] + with ThreadPoolExecutor(max_workers=len(_SEARX_CATEGORIES)) as pool: + futures = {pool.submit(_do, c): c for c in _SEARX_CATEGORIES} + for fut in as_completed(futures): + results.extend(fut.result()) + seen: set[str] = set() + return [r for r in results if r["id"] not in seen and not seen.add(r["id"])] # -- Reddit search (blocking) ------------------------------------------------ @@ -546,56 +578,48 @@ def _search_mastodon(keyword: str) -> list[dict]: """Search Mastodon instances via public hashtag timeline. 
Blocking.""" import urllib.parse - # Sanitize keyword to alphanumeric for hashtag search hashtag = re.sub(r"[^a-zA-Z0-9]", "", keyword).lower() if not hashtag: return [] - results: list[dict] = [] - seen_urls: set[str] = set() + tag_path = urllib.parse.quote(hashtag, safe="") - for instance in _MASTODON_INSTANCES: - tag_url = ( - f"https://{instance}/api/v1/timelines/tag/" - f"{urllib.parse.quote(hashtag, safe='')}" - ) - req = urllib.request.Request(tag_url, method="GET") + def _build(instance): + url = f"https://{instance}/api/v1/timelines/tag/{tag_path}" + req = urllib.request.Request(url, method="GET") req.add_header("User-Agent", "derp-bot/1.0 (IRC keyword alert)") - try: - resp = _urlopen(req, timeout=_MASTODON_TAG_TIMEOUT) - raw = resp.read() - resp.close() - except Exception as exc: - _log.debug("mastodon %s failed: %s", instance, exc) - continue + return req, f"mastodon {instance}" + def _parse(raw, _instance): try: statuses = json.loads(raw) except json.JSONDecodeError: - continue - + return [] if not isinstance(statuses, list): - continue - + return [] + items = [] for status in statuses: status_url = status.get("url") or status.get("uri", "") - if not status_url or status_url in seen_urls: + if not status_url: continue - seen_urls.add(status_url) - acct = (status.get("account") or {}).get("acct", "") content = _strip_html(status.get("content", "")) title = f"@{acct}: {_truncate(content, 60)}" if acct else content - date = _parse_date(status.get("created_at", "")) - results.append({ + items.append({ "id": status_url, "title": title, "url": status_url, - "date": date, + "date": _parse_date(status.get("created_at", "")), "extra": "", }) + return items - return results + results = _fetch_many( + _MASTODON_INSTANCES, build_req=_build, + timeout=_MASTODON_TAG_TIMEOUT, parse=_parse, + ) + seen: set[str] = set() + return [r for r in results if r["id"] not in seen and not seen.add(r["id"])] # -- DuckDuckGo search (blocking) ------------------------------------------- @@ -808,49 +832,44 @@ def _search_peertube(keyword: str) -> list[dict]: """Search PeerTube instances via public API. 
Blocking.""" import urllib.parse - results: list[dict] = [] - seen_urls: set[str] = set() + params = urllib.parse.urlencode({ + "search": keyword, "count": "15", "sort": "-publishedAt", + }) - for instance in _PEERTUBE_INSTANCES: - params = urllib.parse.urlencode({ - "search": keyword, "count": "15", "sort": "-publishedAt", - }) - api_url = f"https://{instance}/api/v1/search/videos?{params}" - - req = urllib.request.Request(api_url, method="GET") + def _build(instance): + url = f"https://{instance}/api/v1/search/videos?{params}" + req = urllib.request.Request(url, method="GET") req.add_header("User-Agent", "Mozilla/5.0 (compatible; derp-bot)") - try: - resp = _urlopen(req, timeout=_PEERTUBE_TIMEOUT) - raw = resp.read() - resp.close() - except Exception as exc: - _log.debug("peertube %s failed: %s", instance, exc) - continue + return req, f"peertube {instance}" + def _parse(raw, _instance): try: data = json.loads(raw) except json.JSONDecodeError: - continue - + return [] + items = [] for video in data.get("data") or []: video_url = video.get("url", "") - if not video_url or video_url in seen_urls: + if not video_url: continue - seen_urls.add(video_url) - name = video.get("name", "") acct = (video.get("account") or {}).get("displayName", "") title = f"{acct}: {name}" if acct else name - date = _parse_date(video.get("publishedAt", "")) - results.append({ + items.append({ "id": video_url, "title": title, "url": video_url, - "date": date, + "date": _parse_date(video.get("publishedAt", "")), "extra": "", }) + return items - return results + results = _fetch_many( + _PEERTUBE_INSTANCES, build_req=_build, + timeout=_PEERTUBE_TIMEOUT, parse=_parse, + ) + seen: set[str] = set() + return [r for r in results if r["id"] not in seen and not seen.add(r["id"])] # -- Bluesky search (blocking) ---------------------------------------------- @@ -903,53 +922,47 @@ def _search_lemmy(keyword: str) -> list[dict]: """Search Lemmy instances via public API. 
Blocking.""" import urllib.parse - results: list[dict] = [] - seen_ids: set[str] = set() + params = urllib.parse.urlencode({ + "q": keyword, "type_": "Posts", "sort": "New", "limit": "25", + }) - for instance in _LEMMY_INSTANCES: - params = urllib.parse.urlencode({ - "q": keyword, "type_": "Posts", "sort": "New", "limit": "25", - }) - api_url = f"https://{instance}/api/v3/search?{params}" - - req = urllib.request.Request(api_url, method="GET") + def _build(instance): + url = f"https://{instance}/api/v3/search?{params}" + req = urllib.request.Request(url, method="GET") req.add_header("Accept", "application/json") req.add_header("User-Agent", "Mozilla/5.0 (compatible; derp-bot)") - try: - resp = _urlopen(req, timeout=_LEMMY_TIMEOUT) - raw = resp.read() - resp.close() - except Exception as exc: - _log.debug("lemmy %s failed: %s", instance, exc) - continue + return req, f"lemmy {instance}" + def _parse(raw, _instance): try: data = json.loads(raw) except json.JSONDecodeError: - continue - + return [] + items = [] for entry in data.get("posts") or []: post = entry.get("post") or {} ap_id = post.get("ap_id", "") - if not ap_id or ap_id in seen_ids: + if not ap_id: continue - seen_ids.add(ap_id) - name = post.get("name", "") community = (entry.get("community") or {}).get("name", "") title = f"{community}: {name}" if community else name - date = _parse_date(post.get("published", "")) - # Use linked URL if present, otherwise the post's ap_id post_url = post.get("url") or ap_id - results.append({ + items.append({ "id": ap_id, "title": title, "url": post_url, - "date": date, + "date": _parse_date(post.get("published", "")), "extra": "", }) + return items - return results + results = _fetch_many( + _LEMMY_INSTANCES, build_req=_build, + timeout=_LEMMY_TIMEOUT, parse=_parse, + ) + seen: set[str] = set() + return [r for r in results if r["id"] not in seen and not seen.add(r["id"])] # -- Odysee/LBRY search (blocking) ------------------------------------------ diff --git a/src/derp/http.py b/src/derp/http.py index d275587..3c159ec 100644 --- a/src/derp/http.py +++ b/src/derp/http.py @@ -52,24 +52,25 @@ class _ProxyHandler(SocksiPyHandler, urllib.request.HTTPSHandler): return self.do_open(build, req) -def urlopen(req, *, timeout=None, context=None): +def urlopen(req, *, timeout=None, context=None, retries=None): """Proxy-aware drop-in for urllib.request.urlopen. Retries on transient SSL/connection errors with exponential backoff. """ + max_retries = retries if retries is not None else _MAX_RETRIES opener = _get_opener(context) kwargs = {} if timeout is not None: kwargs["timeout"] = timeout - for attempt in range(_MAX_RETRIES): + for attempt in range(max_retries): try: return opener.open(req, **kwargs) except _RETRY_ERRORS as exc: - if attempt + 1 >= _MAX_RETRIES: + if attempt + 1 >= max_retries: raise delay = 2 ** attempt _log.debug("urlopen retry %d/%d after %s: %s", - attempt + 1, _MAX_RETRIES, type(exc).__name__, exc) + attempt + 1, max_retries, type(exc).__name__, exc) time.sleep(delay)