perf: concurrent fetches for multi-instance alert backends
Add a _fetch_many() helper that uses ThreadPoolExecutor to query instances in
parallel, and refactor the PeerTube, Mastodon, Lemmy, and SearXNG backends from
sequential to concurrent fetches. Also add a retries parameter to
derp.http.urlopen; the multi-instance backends use retries=1, since instance
redundancy already provides resilience. Worst-case wall time per backend drops
from N*timeout to 1*timeout.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
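Note: the retries parameter mentioned above lands in derp.http.urlopen, which lives outside this file and is not shown in the diff below. As a rough illustration only, a retrying wrapper in that spirit might look like the sketch here; the actual signature, defaults, and backoff behavior of derp.http.urlopen are assumptions, not taken from this commit.

import time
import urllib.request


def urlopen(req, *, timeout, retries=2, backoff=0.5):
    # Hypothetical sketch of a retries-aware urlopen wrapper; the real
    # derp.http.urlopen may differ. retries counts extra attempts after the
    # first one, so retries=1 means at most two tries per request.
    last_exc = None
    for attempt in range(retries + 1):
        try:
            return urllib.request.urlopen(req, timeout=timeout)
        except Exception as exc:
            last_exc = exc
            if attempt < retries:
                time.sleep(backoff * (attempt + 1))  # simple linear backoff
    raise last_exc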
plugins/alert.py | 215
@@ -83,6 +83,41 @@ _subscriptions: dict[str, dict] = {}
 _errors: dict[str, dict[str, int]] = {}
 _poll_count: dict[str, int] = {}
 
+# -- Concurrent fetch helper -------------------------------------------------
+
+
+def _fetch_many(targets, *, build_req, timeout, parse):
+    """Fetch multiple URLs concurrently, return combined results.
+
+    Args:
+        targets: iterable of labels (instance hostnames, categories, etc.)
+        build_req: callable(target) -> (urllib.request.Request, label_for_log)
+        timeout: per-request timeout in seconds
+        parse: callable(raw_bytes, target) -> list[dict]
+
+    Returns combined list of parsed results (deduped by caller).
+    """
+    from concurrent.futures import ThreadPoolExecutor, as_completed
+
+    def _do(target):
+        req, label = build_req(target)
+        try:
+            resp = _urlopen(req, timeout=timeout, retries=1)
+            raw = resp.read()
+            resp.close()
+            return parse(raw, target)
+        except Exception as exc:
+            _log.debug("%s failed: %s", label, exc)
+            return []
+
+    results = []
+    with ThreadPoolExecutor(max_workers=len(targets)) as pool:
+        futures = {pool.submit(_do, t): t for t in targets}
+        for fut in as_completed(futures):
+            results.extend(fut.result())
+    return results
+
+
 # -- History database --------------------------------------------------------
 
 _DB_PATH = Path("data/alert_history.db")
@@ -451,49 +486,46 @@ _SEARX_CATEGORIES = ["general", "news", "videos", "social media"]
 def _search_searx(keyword: str) -> list[dict]:
     """Search SearXNG across multiple categories, filtered to last day. Blocking."""
     import urllib.parse
+    from concurrent.futures import ThreadPoolExecutor, as_completed
 
-    results: list[dict] = []
-    seen_urls: set[str] = set()
-
-    for category in _SEARX_CATEGORIES:
+    def _do(category):
         params = urllib.parse.urlencode({
-            "q": keyword,
-            "format": "json",
-            "categories": category,
-            "time_range": "day",
+            "q": keyword, "format": "json",
+            "categories": category, "time_range": "day",
         })
-        url = f"{_SEARX_URL}?{params}"
-
-        req = urllib.request.Request(url, method="GET")
+        req = urllib.request.Request(f"{_SEARX_URL}?{params}", method="GET")
         try:
             resp = urllib.request.urlopen(req, timeout=_FETCH_TIMEOUT)
             raw = resp.read()
             resp.close()
         except Exception as exc:
             _log.debug("searx category %s failed: %s", category, exc)
-            continue
-
+            return []
         try:
             data = json.loads(raw)
         except json.JSONDecodeError:
-            continue
-
+            return []
+        items = []
         for item in data.get("results", []):
             item_url = item.get("url", "")
-            if not item_url or item_url in seen_urls:
+            if not item_url:
                 continue
-            seen_urls.add(item_url)
-            title = item.get("title", "")
-            date = _parse_date(item.get("publishedDate") or "")
-            results.append({
+            items.append({
                 "id": item_url,
-                "title": title,
+                "title": item.get("title", ""),
                 "url": item_url,
-                "date": date,
+                "date": _parse_date(item.get("publishedDate") or ""),
                 "extra": "",
             })
+        return items
 
-    return results
+    results = []
+    with ThreadPoolExecutor(max_workers=len(_SEARX_CATEGORIES)) as pool:
+        futures = {pool.submit(_do, c): c for c in _SEARX_CATEGORIES}
+        for fut in as_completed(futures):
+            results.extend(fut.result())
+    seen: set[str] = set()
+    return [r for r in results if r["id"] not in seen and not seen.add(r["id"])]
 
 
 # -- Reddit search (blocking) ------------------------------------------------
@@ -546,56 +578,48 @@ def _search_mastodon(keyword: str) -> list[dict]:
     """Search Mastodon instances via public hashtag timeline. Blocking."""
     import urllib.parse
 
     # Sanitize keyword to alphanumeric for hashtag search
     hashtag = re.sub(r"[^a-zA-Z0-9]", "", keyword).lower()
     if not hashtag:
         return []
 
-    results: list[dict] = []
-    seen_urls: set[str] = set()
+    tag_path = urllib.parse.quote(hashtag, safe="")
 
-    for instance in _MASTODON_INSTANCES:
-        tag_url = (
-            f"https://{instance}/api/v1/timelines/tag/"
-            f"{urllib.parse.quote(hashtag, safe='')}"
-        )
-        req = urllib.request.Request(tag_url, method="GET")
+    def _build(instance):
+        url = f"https://{instance}/api/v1/timelines/tag/{tag_path}"
+        req = urllib.request.Request(url, method="GET")
         req.add_header("User-Agent", "derp-bot/1.0 (IRC keyword alert)")
-        try:
-            resp = _urlopen(req, timeout=_MASTODON_TAG_TIMEOUT)
-            raw = resp.read()
-            resp.close()
-        except Exception as exc:
-            _log.debug("mastodon %s failed: %s", instance, exc)
-            continue
+        return req, f"mastodon {instance}"
 
+    def _parse(raw, _instance):
         try:
             statuses = json.loads(raw)
         except json.JSONDecodeError:
-            continue
-
+            return []
         if not isinstance(statuses, list):
-            continue
-
+            return []
+        items = []
         for status in statuses:
             status_url = status.get("url") or status.get("uri", "")
-            if not status_url or status_url in seen_urls:
+            if not status_url:
                 continue
-            seen_urls.add(status_url)
-
             acct = (status.get("account") or {}).get("acct", "")
             content = _strip_html(status.get("content", ""))
             title = f"@{acct}: {_truncate(content, 60)}" if acct else content
-            date = _parse_date(status.get("created_at", ""))
-            results.append({
+            items.append({
                 "id": status_url,
                 "title": title,
                 "url": status_url,
-                "date": date,
+                "date": _parse_date(status.get("created_at", "")),
                 "extra": "",
             })
+        return items
 
-    return results
+    results = _fetch_many(
+        _MASTODON_INSTANCES, build_req=_build,
+        timeout=_MASTODON_TAG_TIMEOUT, parse=_parse,
+    )
+    seen: set[str] = set()
+    return [r for r in results if r["id"] not in seen and not seen.add(r["id"])]
 
 
 # -- DuckDuckGo search (blocking) -------------------------------------------
@@ -808,49 +832,44 @@ def _search_peertube(keyword: str) -> list[dict]:
     """Search PeerTube instances via public API. Blocking."""
     import urllib.parse
 
-    results: list[dict] = []
-    seen_urls: set[str] = set()
+    params = urllib.parse.urlencode({
+        "search": keyword, "count": "15", "sort": "-publishedAt",
+    })
 
-    for instance in _PEERTUBE_INSTANCES:
-        params = urllib.parse.urlencode({
-            "search": keyword, "count": "15", "sort": "-publishedAt",
-        })
-        api_url = f"https://{instance}/api/v1/search/videos?{params}"
-
-        req = urllib.request.Request(api_url, method="GET")
+    def _build(instance):
+        url = f"https://{instance}/api/v1/search/videos?{params}"
+        req = urllib.request.Request(url, method="GET")
         req.add_header("User-Agent", "Mozilla/5.0 (compatible; derp-bot)")
-        try:
-            resp = _urlopen(req, timeout=_PEERTUBE_TIMEOUT)
-            raw = resp.read()
-            resp.close()
-        except Exception as exc:
-            _log.debug("peertube %s failed: %s", instance, exc)
-            continue
+        return req, f"peertube {instance}"
 
+    def _parse(raw, _instance):
         try:
             data = json.loads(raw)
         except json.JSONDecodeError:
-            continue
-
+            return []
+        items = []
         for video in data.get("data") or []:
             video_url = video.get("url", "")
-            if not video_url or video_url in seen_urls:
+            if not video_url:
                 continue
-            seen_urls.add(video_url)
-
             name = video.get("name", "")
             acct = (video.get("account") or {}).get("displayName", "")
             title = f"{acct}: {name}" if acct else name
-            date = _parse_date(video.get("publishedAt", ""))
-            results.append({
+            items.append({
                 "id": video_url,
                 "title": title,
                 "url": video_url,
-                "date": date,
+                "date": _parse_date(video.get("publishedAt", "")),
                 "extra": "",
             })
+        return items
 
-    return results
+    results = _fetch_many(
+        _PEERTUBE_INSTANCES, build_req=_build,
+        timeout=_PEERTUBE_TIMEOUT, parse=_parse,
+    )
+    seen: set[str] = set()
+    return [r for r in results if r["id"] not in seen and not seen.add(r["id"])]
 
 
 # -- Bluesky search (blocking) ----------------------------------------------
@@ -903,53 +922,47 @@ def _search_lemmy(keyword: str) -> list[dict]:
     """Search Lemmy instances via public API. Blocking."""
     import urllib.parse
 
-    results: list[dict] = []
-    seen_ids: set[str] = set()
+    params = urllib.parse.urlencode({
+        "q": keyword, "type_": "Posts", "sort": "New", "limit": "25",
+    })
 
-    for instance in _LEMMY_INSTANCES:
-        params = urllib.parse.urlencode({
-            "q": keyword, "type_": "Posts", "sort": "New", "limit": "25",
-        })
-        api_url = f"https://{instance}/api/v3/search?{params}"
-
-        req = urllib.request.Request(api_url, method="GET")
+    def _build(instance):
+        url = f"https://{instance}/api/v3/search?{params}"
+        req = urllib.request.Request(url, method="GET")
         req.add_header("Accept", "application/json")
         req.add_header("User-Agent", "Mozilla/5.0 (compatible; derp-bot)")
-        try:
-            resp = _urlopen(req, timeout=_LEMMY_TIMEOUT)
-            raw = resp.read()
-            resp.close()
-        except Exception as exc:
-            _log.debug("lemmy %s failed: %s", instance, exc)
-            continue
+        return req, f"lemmy {instance}"
 
+    def _parse(raw, _instance):
         try:
             data = json.loads(raw)
         except json.JSONDecodeError:
-            continue
-
+            return []
+        items = []
         for entry in data.get("posts") or []:
             post = entry.get("post") or {}
             ap_id = post.get("ap_id", "")
-            if not ap_id or ap_id in seen_ids:
+            if not ap_id:
                 continue
-            seen_ids.add(ap_id)
-
             name = post.get("name", "")
             community = (entry.get("community") or {}).get("name", "")
             title = f"{community}: {name}" if community else name
-            date = _parse_date(post.get("published", ""))
             # Use linked URL if present, otherwise the post's ap_id
             post_url = post.get("url") or ap_id
-            results.append({
+            items.append({
                 "id": ap_id,
                 "title": title,
                 "url": post_url,
-                "date": date,
+                "date": _parse_date(post.get("published", "")),
                 "extra": "",
             })
+        return items
 
-    return results
+    results = _fetch_many(
+        _LEMMY_INSTANCES, build_req=_build,
+        timeout=_LEMMY_TIMEOUT, parse=_parse,
+    )
+    seen: set[str] = set()
+    return [r for r in results if r["id"] not in seen and not seen.add(r["id"])]
 
 
 # -- Odysee/LBRY search (blocking) ------------------------------------------
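A side note on the dedup one-liner that each refactored backend now ends with: set.add() returns None, so "not seen.add(r['id'])" is always true and only runs after the membership test passes, which keeps the first occurrence of each id while preserving order. A standalone illustration, not part of the diff:

def _dedupe_by_id(results):
    # set.add() returns None, so "not seen.add(...)" is always True;
    # the "not in seen" membership test before it does the actual filtering.
    seen = set()
    return [r for r in results if r["id"] not in seen and not seen.add(r["id"])]


assert _dedupe_by_id([{"id": "a"}, {"id": "b"}, {"id": "a"}]) == [{"id": "a"}, {"id": "b"}]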