"""Plugin: keyword alert subscriptions across multiple platforms.""" from __future__ import annotations import asyncio import hashlib import json import logging import re import sqlite3 import urllib.request from datetime import datetime, timezone from html.parser import HTMLParser from pathlib import Path from derp.http import urlopen as _urlopen from derp.plugin import command, event _log = logging.getLogger(__name__) # -- Constants --------------------------------------------------------------- _NAME_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,19}$") _MAX_KEYWORD_LEN = 100 _MAX_SEEN = 200 _DEFAULT_INTERVAL = 300 _MAX_INTERVAL = 3600 _FETCH_TIMEOUT = 15 _MAX_TITLE_LEN = 80 _MAX_SUBS = 20 _YT_SEARCH_URL = "https://www.youtube.com/youtubei/v1/search" _YT_CLIENT_VERSION = "2.20250101.00.00" _GQL_URL = "https://gql.twitch.tv/gql" _GQL_CLIENT_ID = "kimne78kx3ncx6brgo4mv6wki5h1ko" _SEARX_URL = "https://searx.mymx.me/search" _REDDIT_SEARCH_URL = "https://old.reddit.com/search.json" _MASTODON_INSTANCES = [ "mastodon.social", "fosstodon.org", "hachyderm.io", "infosec.exchange", ] _MASTODON_TAG_TIMEOUT = 4 _DDG_URL = "https://html.duckduckgo.com/html/" _GOOGLE_NEWS_RSS = "https://news.google.com/rss/search" _KICK_SEARCH_URL = "https://kick.com/api/search" _DAILYMOTION_API = "https://api.dailymotion.com/videos" _PEERTUBE_INSTANCES = [ "videos.framasoft.org", "tilvids.com", "tube.tchncs.de", "diode.zone", ] _PEERTUBE_TIMEOUT = 4 _BLUESKY_SEARCH_URL = "https://public.api.bsky.app/xrpc/app.bsky.feed.searchPosts" _LEMMY_INSTANCES = [ "lemmy.ml", "lemmy.world", "programming.dev", "infosec.pub", ] _LEMMY_TIMEOUT = 4 _ODYSEE_API = "https://api.na-backend.odysee.com/api/v1/proxy" _ARCHIVE_SEARCH_URL = "https://archive.org/advancedsearch.php" _HN_SEARCH_URL = "https://hn.algolia.com/api/v1/search_by_date" _GITHUB_SEARCH_URL = "https://api.github.com/search/repositories" _WIKIPEDIA_API = "https://en.wikipedia.org/w/api.php" _STACKEXCHANGE_URL = "https://api.stackexchange.com/2.3/search" 
_GITLAB_SEARCH_URL = "https://gitlab.com/api/v4/projects"
_NPM_SEARCH_URL = "https://registry.npmjs.org/-/v1/search"
_PYPI_RSS_URL = "https://pypi.org/rss/updates.xml"
_DOCKERHUB_SEARCH_URL = "https://hub.docker.com/v2/search/repositories/"
_ARXIV_API = "https://export.arxiv.org/api/query"
_LOBSTERS_SEARCH_URL = "https://lobste.rs/search"
_DEVTO_API = "https://dev.to/api/articles"
_MEDIUM_FEED_URL = "https://medium.com/feed/tag"
_HUGGINGFACE_API = "https://huggingface.co/api/models"

# -- Module-level tracking ---------------------------------------------------

# Running poller tasks, keyed by the composite key from _state_key().
_pollers: dict[str, asyncio.Task] = {}
# Per-key subscription configuration.
_subscriptions: dict[str, dict] = {}
# Per-key consecutive error counts.
_errors: dict[str, int] = {}

# -- History database --------------------------------------------------------

_DB_PATH = Path("data/alert_history.db")
# Lazily created singleton connection; see _db().
_conn: sqlite3.Connection | None = None


def _db() -> sqlite3.Connection:
    """Lazy-init the history database connection and schema."""
    global _conn
    if _conn is not None:
        return _conn
    _DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    _conn = sqlite3.connect(str(_DB_PATH))
    _conn.execute("""
        CREATE TABLE IF NOT EXISTS results (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            channel TEXT NOT NULL,
            alert TEXT NOT NULL,
            backend TEXT NOT NULL,
            item_id TEXT NOT NULL,
            title TEXT NOT NULL,
            url TEXT NOT NULL,
            date TEXT NOT NULL DEFAULT '',
            found_at TEXT NOT NULL,
            short_id TEXT NOT NULL DEFAULT ''
        )
    """)
    # Migration for databases created before short_id existed; the ALTER
    # fails harmlessly on up-to-date schemas.
    try:
        _conn.execute(
            "ALTER TABLE results ADD COLUMN short_id TEXT NOT NULL DEFAULT ''"
        )
    except sqlite3.OperationalError:
        pass  # column already exists
    _conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_results_alert ON results(channel, alert)"
    )
    _conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_results_short_id ON results(short_id)"
    )
    # Backfill short_id for rows that predate the column. fetchall() first so
    # the UPDATE statements don't interleave with a live SELECT cursor.
    for row_id, backend, item_id in _conn.execute(
        "SELECT id, backend, item_id FROM results WHERE short_id = ''"
    ).fetchall():
        _conn.execute(
            "UPDATE results SET short_id = ? WHERE id = ?",
            (_make_short_id(backend, item_id), row_id),
        )
    _conn.commit()
    return _conn


def _save_result(channel: str, alert: str, backend: str, item: dict) -> str:
    """Persist a matched result to the history database. Returns short_id."""
    short_id = _make_short_id(backend, item.get("id", ""))
    db = _db()
    db.execute(
        "INSERT INTO results"
        " (channel, alert, backend, item_id, title, url, date, found_at, short_id)"
        " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
        (
            channel,
            alert,
            backend,
            item.get("id", ""),
            item.get("title", ""),
            item.get("url", ""),
            item.get("date", ""),
            datetime.now(timezone.utc).isoformat(),
            short_id,
        ),
    )
    db.commit()
    return short_id


# -- Pure helpers ------------------------------------------------------------


def _state_key(channel: str, name: str) -> str:
    """Build composite state key."""
    return f"{channel}:{name}"


def _validate_name(name: str) -> bool:
    """Check name against allowed pattern."""
    return bool(_NAME_RE.match(name))


def _truncate(text: str, max_len: int = _MAX_TITLE_LEN) -> str:
    """Truncate text with ellipsis if needed."""
    if len(text) <= max_len:
        return text
    return text[: max_len - 3].rstrip() + "..."
# Meta property/name values that may carry a publish/update date.
_DATE_PROPS = {
    "article:published_time",
    "og:article:published_time",
    "og:updated_time",
    "date",
    "dc.date",
    "dcterms.date",
    "sailthru.date",
}


class _OGParser(HTMLParser):
    """Extract og:title, og:description, and published date from <meta> tags."""

    def __init__(self):
        super().__init__()
        self.og_title = ""        # content of <meta property="og:title">
        self.og_description = ""  # content of <meta property="og:description">
        self.published = ""       # first date-like meta content encountered

    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
        if tag != "meta":
            return
        # Bare attributes parse as None values; normalize to "".
        attr_map = {k.lower(): (v or "") for k, v in attrs}
        prop = attr_map.get("property", "").lower()
        name = attr_map.get("name", "").lower()
        content = attr_map.get("content", "")
        if prop == "og:title":
            self.og_title = content
        elif prop == "og:description":
            self.og_description = content
        # Keep only the first non-empty date candidate seen in the page.
        if not self.published and content:
            if prop in _DATE_PROPS or name in _DATE_PROPS:
                self.published = content


_OG_TIMEOUT = 10
_OG_MAX_BYTES = 64 * 1024  # Only read first 64 KB (OG tags are in <head>)


class _DDGParser(HTMLParser):
    """Extract search results from DuckDuckGo HTML lite page."""

    def __init__(self):
        super().__init__()
        self.results: list[tuple[str, str]] = []  # (url, title)
        self._in_link = False               # currently inside a result <a>
        self._url = ""                      # href of the current result link
        self._title_parts: list[str] = []   # text chunks inside the link

    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
        if tag != "a":
            return
        attr_map = dict(attrs)
        # Result links carry the "result__a" CSS class.
        if "result__a" in (attr_map.get("class") or ""):
            self._in_link = True
            self._url = attr_map.get("href", "")
            self._title_parts = []

    def handle_data(self, data: str) -> None:
        if self._in_link:
            self._title_parts.append(data)

    def handle_endtag(self, tag: str) -> None:
        if tag == "a" and self._in_link:
            self._in_link = False
            title = "".join(self._title_parts).strip()
            if self._url and title:
                self.results.append((self._url, title))


def _make_short_id(backend: str, item_id: str) -> str:
    """Deterministic base36 hash (up to 8 chars) from backend:item_id."""
    digest = hashlib.sha256(f"{backend}:{item_id}".encode()).digest()
    # 5 bytes -> 40 bits; base36-encode without leading-zero padding.
    n = int.from_bytes(digest[:5], "big")
    chars = "0123456789abcdefghijklmnopqrstuvwxyz"
    parts = []
    while n:
        n, r = divmod(n, 36)
        parts.append(chars[r])
    return "".join(reversed(parts)) or "0"


def _parse_date(raw: str) -> str:
    """Try to extract a YYYY-MM-DD date from a raw date string."""
    m = re.search(r"\d{4}-\d{2}-\d{2}", raw)
    return m.group(0) if m else ""


def _strip_html(text: str) -> str:
    """Remove HTML tags from text."""
    return re.sub(r"<[^>]+>", "", text).strip()


def _fetch_og(url: str) -> tuple[str, str, str]:
    """Fetch og:title, og:description, and published date from a URL.

    Returns (og_title, og_description, date). Empty strings on failure.
    """
    try:
        req = urllib.request.Request(url, method="GET")
        req.add_header("User-Agent", "Mozilla/5.0 (compatible; derp-bot)")
        resp = _urlopen(req, timeout=_OG_TIMEOUT)
        raw = resp.read(_OG_MAX_BYTES)
        resp.close()
        html = raw.decode("utf-8", errors="replace")
        parser = _OGParser()
        parser.feed(html)
        date = _parse_date(parser.published)
        return parser.og_title, parser.og_description, date
    except Exception as exc:
        # Best-effort enrichment: a bad page must never break polling.
        _log.debug("og fetch failed for %s: %s", url, exc)
        return "", "", ""


# -- YouTube InnerTube search (blocking) ------------------------------------


def _extract_videos(obj: object, depth: int = 0) -> list[dict]:
    """Recursively walk YouTube JSON to find video results.

    Finds all objects containing both 'videoId' and 'title' keys.
    Resilient to YouTube rearranging wrapper layers.
    """
    if depth > 20:  # guard against pathological nesting
        return []
    results = []
    if isinstance(obj, dict):
        video_id = obj.get("videoId")
        title_obj = obj.get("title")
        if isinstance(video_id, str) and video_id and title_obj is not None:
            # Title may be a {"runs": [...]} dict or a plain string.
            if isinstance(title_obj, dict):
                runs = title_obj.get("runs", [])
                title = "".join(r.get("text", "") for r in runs if isinstance(r, dict))
            elif isinstance(title_obj, str):
                title = title_obj
            else:
                title = ""
            if title:
                # Extract relative publish time (e.g. "2 days ago")
                pub_obj = obj.get("publishedTimeText")
                date = ""
                if isinstance(pub_obj, dict):
                    date = pub_obj.get("simpleText", "")
                elif isinstance(pub_obj, str):
                    date = pub_obj
                results.append({
                    "id": video_id,
                    "title": title,
                    "url": f"https://www.youtube.com/watch?v={video_id}",
                    "date": date,
                    "extra": "",
                })
        for val in obj.values():
            results.extend(_extract_videos(val, depth + 1))
    elif isinstance(obj, list):
        for item in obj:
            results.extend(_extract_videos(item, depth + 1))
    return results


def _search_youtube(keyword: str) -> list[dict]:
    """Search YouTube via InnerTube API.
Blocking.""" payload = json.dumps({ "context": { "client": { "clientName": "WEB", "clientVersion": _YT_CLIENT_VERSION, }, }, "query": keyword, }).encode() req = urllib.request.Request(_YT_SEARCH_URL, data=payload, method="POST") req.add_header("Content-Type", "application/json") resp = urllib.request.urlopen(req, timeout=_FETCH_TIMEOUT) raw = resp.read() resp.close() data = json.loads(raw) videos = _extract_videos(data) # Deduplicate by videoId (same video can appear in multiple sections) seen_ids: set[str] = set() unique: list[dict] = [] for v in videos: if v["id"] not in seen_ids: seen_ids.add(v["id"]) unique.append(v) return unique # -- Twitch GQL search (blocking) ------------------------------------------ def _search_twitch(keyword: str) -> list[dict]: """Search Twitch via public GQL. Blocking.""" query = ( 'query{searchFor(userQuery:"' + keyword.replace("\\", "\\\\").replace('"', '\\"') + '",options:{targets:[{index:STREAM},{index:VOD}]})' "{streams{items{id broadcaster{login displayName}title game{name}" "viewersCount}}videos{items{id owner{login displayName}title" " game{name}viewCount}}}}" ) body = json.dumps({"query": query}).encode() req = urllib.request.Request(_GQL_URL, data=body, method="POST") req.add_header("Client-Id", _GQL_CLIENT_ID) req.add_header("Content-Type", "application/json") resp = _urlopen(req, timeout=_FETCH_TIMEOUT) raw = resp.read() resp.close() data = json.loads(raw) results: list[dict] = [] try: search = data["data"]["searchFor"] except (KeyError, TypeError): return results if not search: return results # Live streams streams = search.get("streams") or {} for item in streams.get("items") or []: stream_id = str(item.get("id", "")) if not stream_id: continue broadcaster = item.get("broadcaster") or {} login = broadcaster.get("login", "") display = broadcaster.get("displayName", login) title = item.get("title", "") game = (item.get("game") or {}).get("name", "") line = f"{display} is live: {title}" if game: line += f" ({game})" 
results.append({ "id": f"stream:{stream_id}", "title": line, "url": f"https://twitch.tv/{login}", "date": "", "extra": "", }) # VODs videos = search.get("videos") or {} for item in videos.get("items") or []: vod_id = str(item.get("id", "")) if not vod_id: continue title = item.get("title", "") results.append({ "id": f"vod:{vod_id}", "title": title, "url": f"https://twitch.tv/videos/{vod_id}", "date": "", "extra": "", }) return results # -- SearXNG search (blocking) ---------------------------------------------- _SEARX_CATEGORIES = ["general", "news", "videos", "social media"] def _search_searx(keyword: str) -> list[dict]: """Search SearXNG across multiple categories, filtered to last day. Blocking.""" import urllib.parse results: list[dict] = [] seen_urls: set[str] = set() for category in _SEARX_CATEGORIES: params = urllib.parse.urlencode({ "q": keyword, "format": "json", "categories": category, "time_range": "day", }) url = f"{_SEARX_URL}?{params}" req = urllib.request.Request(url, method="GET") try: resp = urllib.request.urlopen(req, timeout=_FETCH_TIMEOUT) raw = resp.read() resp.close() except Exception as exc: _log.debug("searx category %s failed: %s", category, exc) continue try: data = json.loads(raw) except json.JSONDecodeError: continue for item in data.get("results", []): item_url = item.get("url", "") if not item_url or item_url in seen_urls: continue seen_urls.add(item_url) title = item.get("title", "") date = _parse_date(item.get("publishedDate") or "") results.append({ "id": item_url, "title": title, "url": item_url, "date": date, "extra": "", }) return results # -- Reddit search (blocking) ------------------------------------------------ def _search_reddit(keyword: str) -> list[dict]: """Search Reddit via JSON API. 
Blocking.""" import urllib.parse params = urllib.parse.urlencode({ "q": keyword, "sort": "new", "limit": "25", "t": "week", }) url = f"{_REDDIT_SEARCH_URL}?{params}" req = urllib.request.Request(url, method="GET") req.add_header("User-Agent", "derp-bot/1.0 (IRC keyword alert)") resp = _urlopen(req, timeout=_FETCH_TIMEOUT) raw = resp.read() resp.close() data = json.loads(raw) results: list[dict] = [] for child in (data.get("data") or {}).get("children") or []: post = child.get("data") or {} post_id = post.get("name", "") permalink = post.get("permalink", "") title = post.get("title", "") created = post.get("created_utc") date = "" if created: try: date = datetime.fromtimestamp( float(created), tz=timezone.utc, ).strftime("%Y-%m-%d") except (ValueError, OSError): pass results.append({ "id": post_id, "title": title, "url": f"https://www.reddit.com{permalink}" if permalink else "", "date": date, "extra": "", }) return results # -- Mastodon/Fediverse search (blocking) ----------------------------------- def _search_mastodon(keyword: str) -> list[dict]: """Search Mastodon instances via public hashtag timeline. 
Blocking.""" import urllib.parse # Sanitize keyword to alphanumeric for hashtag search hashtag = re.sub(r"[^a-zA-Z0-9]", "", keyword).lower() if not hashtag: return [] results: list[dict] = [] seen_urls: set[str] = set() for instance in _MASTODON_INSTANCES: tag_url = ( f"https://{instance}/api/v1/timelines/tag/" f"{urllib.parse.quote(hashtag, safe='')}" ) req = urllib.request.Request(tag_url, method="GET") req.add_header("User-Agent", "derp-bot/1.0 (IRC keyword alert)") try: resp = _urlopen(req, timeout=_MASTODON_TAG_TIMEOUT) raw = resp.read() resp.close() except Exception as exc: _log.debug("mastodon %s failed: %s", instance, exc) continue try: statuses = json.loads(raw) except json.JSONDecodeError: continue if not isinstance(statuses, list): continue for status in statuses: status_url = status.get("url") or status.get("uri", "") if not status_url or status_url in seen_urls: continue seen_urls.add(status_url) acct = (status.get("account") or {}).get("acct", "") content = _strip_html(status.get("content", "")) title = f"@{acct}: {_truncate(content, 60)}" if acct else content date = _parse_date(status.get("created_at", "")) results.append({ "id": status_url, "title": title, "url": status_url, "date": date, "extra": "", }) return results # -- DuckDuckGo search (blocking) ------------------------------------------- def _resolve_ddg_url(raw_url: str) -> str: """Resolve DuckDuckGo redirect URLs to actual target URLs.""" import urllib.parse if "duckduckgo.com/l/" in raw_url: parsed = urllib.parse.urlparse(raw_url) params = urllib.parse.parse_qs(parsed.query) uddg = params.get("uddg", []) if uddg: return uddg[0] # Strip leading // scheme-relative URLs if raw_url.startswith("//"): return "https:" + raw_url return raw_url def _search_duckduckgo(keyword: str) -> list[dict]: """Search DuckDuckGo via HTML lite endpoint. 
Blocking.""" import urllib.parse body = urllib.parse.urlencode({"q": keyword}).encode() req = urllib.request.Request(_DDG_URL, data=body, method="POST") req.add_header("Content-Type", "application/x-www-form-urlencoded") req.add_header("User-Agent", "Mozilla/5.0 (compatible; derp-bot)") resp = _urlopen(req, timeout=_FETCH_TIMEOUT) raw = resp.read() resp.close() html = raw.decode("utf-8", errors="replace") parser = _DDGParser() parser.feed(html) results: list[dict] = [] seen_urls: set[str] = set() for raw_url, title in parser.results: url = _resolve_ddg_url(raw_url) if not url or url in seen_urls: continue seen_urls.add(url) results.append({ "id": url, "title": title, "url": url, "date": "", "extra": "", }) return results # -- Google News search (blocking) ------------------------------------------ def _search_google_news(keyword: str) -> list[dict]: """Search Google News via public RSS feed. Blocking.""" import urllib.parse import xml.etree.ElementTree as ET from email.utils import parsedate_to_datetime params = urllib.parse.urlencode({ "q": keyword, "hl": "en", "gl": "US", "ceid": "US:en", }) url = f"{_GOOGLE_NEWS_RSS}?{params}" req = urllib.request.Request(url, method="GET") req.add_header("User-Agent", "Mozilla/5.0 (compatible; derp-bot)") resp = _urlopen(req, timeout=_FETCH_TIMEOUT) raw = resp.read() resp.close() root = ET.fromstring(raw) results: list[dict] = [] for item in root.iter("item"): title = (item.findtext("title") or "").strip() link = (item.findtext("link") or "").strip() if not link: continue pub_date = item.findtext("pubDate") or "" date = "" if pub_date: try: dt = parsedate_to_datetime(pub_date) date = dt.strftime("%Y-%m-%d") except (ValueError, TypeError): date = _parse_date(pub_date) results.append({ "id": link, "title": title, "url": link, "date": date, "extra": "", }) return results # -- Kick search (blocking) ------------------------------------------------- def _search_kick(keyword: str) -> list[dict]: """Search Kick via public search API. 
Blocking.""" import urllib.parse params = urllib.parse.urlencode({"searched_word": keyword}) url = f"{_KICK_SEARCH_URL}?{params}" req = urllib.request.Request(url, method="GET") req.add_header("Accept", "application/json") req.add_header("User-Agent", "Mozilla/5.0 (compatible; derp-bot)") resp = _urlopen(req, timeout=_FETCH_TIMEOUT) raw = resp.read() resp.close() data = json.loads(raw) results: list[dict] = [] # Channels (may be live) for ch in data.get("channels") or []: slug = ch.get("slug", "") if not slug: continue username = (ch.get("user") or {}).get("username", slug) is_live = ch.get("isLive", False) title = f"{username} (live)" if is_live else username results.append({ "id": f"ch:{ch.get('id', slug)}", "title": title, "url": f"https://kick.com/{slug}", "date": "", "extra": "", }) # Livestreams livestreams = data.get("livestreams") or {} for stream in livestreams.get("tags") or []: stream_id = str(stream.get("id", "")) if not stream_id: continue session_title = stream.get("session_title", "") channel = stream.get("channel") or {} slug = channel.get("slug", "") viewers = stream.get("viewer_count", 0) title = session_title if viewers: title += f" ({viewers} viewers)" results.append({ "id": f"live:{stream_id}", "title": title, "url": f"https://kick.com/{slug}" if slug else "", "date": _parse_date(stream.get("start_time", "")), "extra": "", }) return results # -- Dailymotion search (blocking) ------------------------------------------ def _search_dailymotion(keyword: str) -> list[dict]: """Search Dailymotion via public API. 
Blocking.""" import urllib.parse params = urllib.parse.urlencode({ "search": keyword, "sort": "recent", "limit": "25", "fields": "id,title,url,created_time", }) url = f"{_DAILYMOTION_API}?{params}" req = urllib.request.Request(url, method="GET") req.add_header("User-Agent", "Mozilla/5.0 (compatible; derp-bot)") resp = _urlopen(req, timeout=_FETCH_TIMEOUT) raw = resp.read() resp.close() data = json.loads(raw) results: list[dict] = [] for item in data.get("list") or []: video_id = item.get("id", "") title = item.get("title", "") video_url = item.get("url", "") created = item.get("created_time") date = "" if created: try: date = datetime.fromtimestamp( int(created), tz=timezone.utc, ).strftime("%Y-%m-%d") except (ValueError, OSError): pass results.append({ "id": video_id, "title": title, "url": video_url, "date": date, "extra": "", }) return results # -- PeerTube search (blocking) --------------------------------------------- def _search_peertube(keyword: str) -> list[dict]: """Search PeerTube instances via public API. 
Blocking.""" import urllib.parse results: list[dict] = [] seen_urls: set[str] = set() for instance in _PEERTUBE_INSTANCES: params = urllib.parse.urlencode({ "search": keyword, "count": "15", "sort": "-publishedAt", }) api_url = f"https://{instance}/api/v1/search/videos?{params}" req = urllib.request.Request(api_url, method="GET") req.add_header("User-Agent", "Mozilla/5.0 (compatible; derp-bot)") try: resp = _urlopen(req, timeout=_PEERTUBE_TIMEOUT) raw = resp.read() resp.close() except Exception as exc: _log.debug("peertube %s failed: %s", instance, exc) continue try: data = json.loads(raw) except json.JSONDecodeError: continue for video in data.get("data") or []: video_url = video.get("url", "") if not video_url or video_url in seen_urls: continue seen_urls.add(video_url) name = video.get("name", "") acct = (video.get("account") or {}).get("displayName", "") title = f"{acct}: {name}" if acct else name date = _parse_date(video.get("publishedAt", "")) results.append({ "id": video_url, "title": title, "url": video_url, "date": date, "extra": "", }) return results # -- Bluesky search (blocking) ---------------------------------------------- def _search_bluesky(keyword: str) -> list[dict]: """Search Bluesky via public search API. 
Blocking.""" import urllib.parse params = urllib.parse.urlencode({"q": keyword, "limit": "25", "sort": "latest"}) url = f"{_BLUESKY_SEARCH_URL}?{params}" req = urllib.request.Request(url, method="GET") req.add_header("Accept", "application/json") req.add_header("User-Agent", "Mozilla/5.0 (compatible; derp-bot)") resp = _urlopen(req, timeout=_FETCH_TIMEOUT) raw = resp.read() resp.close() data = json.loads(raw) results: list[dict] = [] for post in data.get("posts") or []: uri = post.get("uri", "") if not uri: continue # Extract rkey from at:// URI for web URL # URI format: at://did:plc:xxx/app.bsky.feed.post/rkey rkey = uri.rsplit("/", 1)[-1] if "/" in uri else "" author = post.get("author") or {} handle = author.get("handle", "") display = author.get("displayName") or handle record = post.get("record") or {} text = record.get("text", "") title = f"@{display}: {_truncate(text, 60)}" date = _parse_date(record.get("createdAt", "")) post_url = f"https://bsky.app/profile/{handle}/post/{rkey}" if handle else "" results.append({ "id": uri, "title": title, "url": post_url, "date": date, "extra": "", }) return results # -- Lemmy search (blocking) ------------------------------------------------ def _search_lemmy(keyword: str) -> list[dict]: """Search Lemmy instances via public API. 
Blocking.""" import urllib.parse results: list[dict] = [] seen_ids: set[str] = set() for instance in _LEMMY_INSTANCES: params = urllib.parse.urlencode({ "q": keyword, "type_": "Posts", "sort": "New", "limit": "25", }) api_url = f"https://{instance}/api/v3/search?{params}" req = urllib.request.Request(api_url, method="GET") req.add_header("Accept", "application/json") req.add_header("User-Agent", "Mozilla/5.0 (compatible; derp-bot)") try: resp = _urlopen(req, timeout=_LEMMY_TIMEOUT) raw = resp.read() resp.close() except Exception as exc: _log.debug("lemmy %s failed: %s", instance, exc) continue try: data = json.loads(raw) except json.JSONDecodeError: continue for entry in data.get("posts") or []: post = entry.get("post") or {} ap_id = post.get("ap_id", "") if not ap_id or ap_id in seen_ids: continue seen_ids.add(ap_id) name = post.get("name", "") community = (entry.get("community") or {}).get("name", "") title = f"{community}: {name}" if community else name date = _parse_date(post.get("published", "")) # Use linked URL if present, otherwise the post's ap_id post_url = post.get("url") or ap_id results.append({ "id": ap_id, "title": title, "url": post_url, "date": date, "extra": "", }) return results # -- Odysee/LBRY search (blocking) ------------------------------------------ def _lbry_to_odysee_url(lbry_url: str) -> str: """Convert lbry:// URI to https://odysee.com/ web URL.""" if not lbry_url.startswith("lbry://"): return lbry_url return "https://odysee.com/" + lbry_url[7:].replace("#", ":") def _search_odysee(keyword: str) -> list[dict]: """Search Odysee/LBRY via JSON-RPC claim_search. 
Blocking.""" payload = json.dumps({ "jsonrpc": "2.0", "method": "claim_search", "params": { "text": keyword, "order_by": ["release_time"], "page_size": 25, "stream_types": ["video", "audio", "document"], }, "id": 1, }).encode() req = urllib.request.Request( f"{_ODYSEE_API}?m=claim_search", data=payload, method="POST", ) req.add_header("Content-Type", "application/json") req.add_header("User-Agent", "Mozilla/5.0 (compatible; derp-bot)") resp = _urlopen(req, timeout=_FETCH_TIMEOUT) raw = resp.read() resp.close() data = json.loads(raw) results: list[dict] = [] for item in (data.get("result") or {}).get("items") or []: claim_id = item.get("claim_id", "") if not claim_id: continue value = item.get("value") or {} title = value.get("title", "") canonical = item.get("canonical_url", "") web_url = _lbry_to_odysee_url(canonical) # Use block timestamp for date (release_time can be bogus) timestamp = item.get("timestamp") date = "" if timestamp and isinstance(timestamp, int) and timestamp < 2000000000: try: date = datetime.fromtimestamp( timestamp, tz=timezone.utc, ).strftime("%Y-%m-%d") except (ValueError, OSError): pass results.append({ "id": claim_id, "title": title, "url": web_url, "date": date, "extra": "", }) return results # -- Archive.org search (blocking) ------------------------------------------ def _search_archive(keyword: str) -> list[dict]: """Search Archive.org via advanced search API. 
Blocking.""" import urllib.parse params = urllib.parse.urlencode({ "q": keyword, "output": "json", "rows": "25", "sort[]": "date desc", "fl[]": "identifier,title,date,mediatype", }) url = f"{_ARCHIVE_SEARCH_URL}?{params}" req = urllib.request.Request(url, method="GET") req.add_header("User-Agent", "Mozilla/5.0 (compatible; derp-bot)") resp = _urlopen(req, timeout=_FETCH_TIMEOUT) raw = resp.read() resp.close() data = json.loads(raw) results: list[dict] = [] for doc in (data.get("response") or {}).get("docs") or []: identifier = doc.get("identifier", "") if not identifier: continue title = doc.get("title", "") mediatype = doc.get("mediatype", "") if mediatype: title = f"[{mediatype}] {title}" date = _parse_date(doc.get("date", "")) results.append({ "id": identifier, "title": title, "url": f"https://archive.org/details/{identifier}", "date": date, "extra": "", }) return results # -- Hacker News search (blocking) ------------------------------------------ def _search_hackernews(keyword: str) -> list[dict]: """Search Hacker News via Algolia API, sorted by date. 
Blocking.""" import urllib.parse params = urllib.parse.urlencode({ "query": keyword, "tags": "story", "hitsPerPage": "25", }) url = f"{_HN_SEARCH_URL}?{params}" req = urllib.request.Request(url, method="GET") req.add_header("User-Agent", "Mozilla/5.0 (compatible; derp-bot)") resp = _urlopen(req, timeout=_FETCH_TIMEOUT) raw = resp.read() resp.close() data = json.loads(raw) results: list[dict] = [] for hit in data.get("hits") or []: object_id = hit.get("objectID", "") if not object_id: continue title = hit.get("title", "") # External URL if available, otherwise HN discussion link item_url = hit.get("url") or f"https://news.ycombinator.com/item?id={object_id}" date = _parse_date(hit.get("created_at", "")) points = hit.get("points") if points: title += f" ({points}pts)" results.append({ "id": object_id, "title": title, "url": item_url, "date": date, "extra": "", }) return results # -- GitHub search (blocking) ----------------------------------------------- def _search_github(keyword: str) -> list[dict]: """Search GitHub repositories via public API. 
Blocking.""" import urllib.parse params = urllib.parse.urlencode({ "q": keyword, "sort": "updated", "order": "desc", "per_page": "25", }) url = f"{_GITHUB_SEARCH_URL}?{params}" req = urllib.request.Request(url, method="GET") req.add_header("Accept", "application/vnd.github+json") req.add_header("User-Agent", "Mozilla/5.0 (compatible; derp-bot)") resp = _urlopen(req, timeout=_FETCH_TIMEOUT) raw = resp.read() resp.close() data = json.loads(raw) results: list[dict] = [] for repo in data.get("items") or []: repo_id = str(repo.get("id", "")) if not repo_id: continue full_name = repo.get("full_name", "") description = repo.get("description") or "" html_url = repo.get("html_url", "") stars = repo.get("stargazers_count", 0) title = full_name if description: title += f": {_truncate(description, 50)}" if stars: title += f" [{stars}*]" date = _parse_date(repo.get("updated_at", "")) results.append({ "id": repo_id, "title": title, "url": html_url, "date": date, "extra": "", }) return results # -- Wikipedia search (blocking) -------------------------------------------- def _search_wikipedia(keyword: str) -> list[dict]: """Search Wikipedia articles via public API. 
Blocking.""" import urllib.parse params = urllib.parse.urlencode({ "action": "query", "list": "search", "srsearch": keyword, "srlimit": "25", "format": "json", }) url = f"{_WIKIPEDIA_API}?{params}" req = urllib.request.Request(url, method="GET") req.add_header("User-Agent", "Mozilla/5.0 (compatible; derp-bot)") resp = _urlopen(req, timeout=_FETCH_TIMEOUT) raw = resp.read() resp.close() data = json.loads(raw) results: list[dict] = [] for item in (data.get("query") or {}).get("search") or []: title = item.get("title", "") pageid = str(item.get("pageid", "")) if not pageid: continue date = _parse_date(item.get("timestamp", "")) slug = title.replace(" ", "_") results.append({ "id": pageid, "title": title, "url": f"https://en.wikipedia.org/wiki/{slug}", "date": date, "extra": "", }) return results # -- Stack Exchange search (blocking) --------------------------------------- def _search_stackexchange(keyword: str) -> list[dict]: """Search Stack Overflow questions via public API. Blocking.""" import gzip import io import urllib.parse params = urllib.parse.urlencode({ "order": "desc", "sort": "creation", "intitle": keyword, "site": "stackoverflow", "pagesize": "25", }) url = f"{_STACKEXCHANGE_URL}?{params}" req = urllib.request.Request(url, method="GET") req.add_header("User-Agent", "Mozilla/5.0 (compatible; derp-bot)") req.add_header("Accept-Encoding", "gzip") resp = _urlopen(req, timeout=_FETCH_TIMEOUT) raw = resp.read() resp.close() try: raw = gzip.GzipFile(fileobj=io.BytesIO(raw)).read() except OSError: pass data = json.loads(raw) results: list[dict] = [] for item in data.get("items") or []: qid = str(item.get("question_id", "")) if not qid: continue title = _strip_html(item.get("title", "")) link = item.get("link", "") score = item.get("score", 0) if score: title += f" [{score}v]" created = item.get("creation_date") date = "" if created: try: date = datetime.fromtimestamp( int(created), tz=timezone.utc, ).strftime("%Y-%m-%d") except (ValueError, OSError): pass 
results.append({ "id": qid, "title": title, "url": link, "date": date, "extra": "", }) return results # -- GitLab search (blocking) ---------------------------------------------- def _search_gitlab(keyword: str) -> list[dict]: """Search GitLab projects via public API. Blocking.""" import urllib.parse params = urllib.parse.urlencode({ "search": keyword, "order_by": "updated_at", "sort": "desc", "per_page": "25", }) url = f"{_GITLAB_SEARCH_URL}?{params}" req = urllib.request.Request(url, method="GET") req.add_header("User-Agent", "Mozilla/5.0 (compatible; derp-bot)") resp = _urlopen(req, timeout=_FETCH_TIMEOUT) raw = resp.read() resp.close() data = json.loads(raw) results: list[dict] = [] for repo in data if isinstance(data, list) else []: rid = str(repo.get("id", "")) if not rid: continue name = repo.get("path_with_namespace", "") description = repo.get("description") or "" web_url = repo.get("web_url", "") stars = repo.get("star_count", 0) title = name if description: title += f": {_truncate(description, 50)}" if stars: title += f" [{stars}*]" date = _parse_date(repo.get("last_activity_at", "")) results.append({ "id": rid, "title": title, "url": web_url, "date": date, "extra": "", }) return results # -- npm search (blocking) ------------------------------------------------- def _search_npm(keyword: str) -> list[dict]: """Search npm packages via registry API. 
Blocking.""" import urllib.parse params = urllib.parse.urlencode({"text": keyword, "size": "25"}) url = f"{_NPM_SEARCH_URL}?{params}" req = urllib.request.Request(url, method="GET") req.add_header("User-Agent", "Mozilla/5.0 (compatible; derp-bot)") resp = _urlopen(req, timeout=_FETCH_TIMEOUT) raw = resp.read() resp.close() data = json.loads(raw) results: list[dict] = [] for obj in data.get("objects") or []: pkg = obj.get("package") or {} name = pkg.get("name", "") if not name: continue description = pkg.get("description") or "" version = pkg.get("version", "") links = pkg.get("links") or {} npm_url = links.get("npm", f"https://www.npmjs.com/package/{name}") title = f"{name}@{version}" if version else name if description: title += f": {_truncate(description, 50)}" date = _parse_date(pkg.get("date", "")) results.append({ "id": name, "title": title, "url": npm_url, "date": date, "extra": "", }) return results # -- PyPI search (blocking) ------------------------------------------------ def _search_pypi(keyword: str) -> list[dict]: """Search PyPI recent updates via RSS feed, filtered by keyword. 
Blocking.""" import xml.etree.ElementTree as ET req = urllib.request.Request(_PYPI_RSS_URL, method="GET") req.add_header("User-Agent", "Mozilla/5.0 (compatible; derp-bot)") resp = _urlopen(req, timeout=_FETCH_TIMEOUT) raw = resp.read() resp.close() root = ET.fromstring(raw) kw_lower = keyword.lower() results: list[dict] = [] for item in root.findall(".//item"): title = (item.findtext("title") or "").strip() link = (item.findtext("link") or "").strip() desc = (item.findtext("description") or "").strip() if not title or not link: continue if kw_lower not in title.lower() and kw_lower not in desc.lower(): continue pkg_name = title.split()[0] if title else "" display = title if desc: display += f": {_truncate(desc, 50)}" results.append({ "id": pkg_name or link, "title": display, "url": link, "date": "", "extra": "", }) return results # -- Docker Hub search (blocking) ------------------------------------------ def _search_dockerhub(keyword: str) -> list[dict]: """Search Docker Hub repositories via public API. 
Blocking.""" import urllib.parse params = urllib.parse.urlencode({"query": keyword, "page_size": "25"}) url = f"{_DOCKERHUB_SEARCH_URL}?{params}" req = urllib.request.Request(url, method="GET") req.add_header("User-Agent", "Mozilla/5.0 (compatible; derp-bot)") resp = _urlopen(req, timeout=_FETCH_TIMEOUT) raw = resp.read() resp.close() data = json.loads(raw) results: list[dict] = [] for item in data.get("results") or []: name = item.get("repo_name", "") if not name: continue description = item.get("short_description") or "" stars = item.get("star_count", 0) title = name if description: title += f": {_truncate(description, 50)}" if stars: title += f" [{stars}*]" hub_url = ( f"https://hub.docker.com/r/{name}" if "/" in name else f"https://hub.docker.com/_/{name}" ) results.append({ "id": name, "title": title, "url": hub_url, "date": "", "extra": "", }) return results # -- arXiv search (blocking) ----------------------------------------------- def _search_arxiv(keyword: str) -> list[dict]: """Search arXiv preprints via Atom API. 
Blocking.""" import urllib.parse import xml.etree.ElementTree as ET params = urllib.parse.urlencode({ "search_query": f"all:{keyword}", "sortBy": "submittedDate", "sortOrder": "descending", "max_results": "25", }) url = f"{_ARXIV_API}?{params}" req = urllib.request.Request(url, method="GET") req.add_header("User-Agent", "Mozilla/5.0 (compatible; derp-bot)") resp = _urlopen(req, timeout=_FETCH_TIMEOUT) raw = resp.read() resp.close() ns = {"a": "http://www.w3.org/2005/Atom"} root = ET.fromstring(raw) results: list[dict] = [] for entry in root.findall("a:entry", ns): entry_id = (entry.findtext("a:id", "", ns) or "").strip() title = (entry.findtext("a:title", "", ns) or "").strip() title = " ".join(title.split()) # collapse whitespace published = entry.findtext("a:published", "", ns) or "" link_url = "" for link in entry.findall("a:link", ns): if link.get("type") == "text/html": link_url = link.get("href", "") break if not link_url: link_url = entry_id arxiv_id = entry_id.rsplit("/abs/", 1)[-1] if "/abs/" in entry_id else entry_id date = _parse_date(published) if title: results.append({ "id": arxiv_id, "title": title, "url": link_url, "date": date, "extra": "", }) return results # -- Lobsters search (blocking) -------------------------------------------- class _LobstersParser(HTMLParser): """Extract story links from Lobsters search HTML.""" def __init__(self): super().__init__() self.results: list[tuple[str, str]] = [] self._in_link = False self._url = "" self._title_parts: list[str] = [] def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None: if tag != "a": return attr_map = {k: (v or "") for k, v in attrs} cls = attr_map.get("class", "") if "u-url" in cls: self._in_link = True self._url = attr_map.get("href", "") self._title_parts = [] def handle_data(self, data: str) -> None: if self._in_link: self._title_parts.append(data) def handle_endtag(self, tag: str) -> None: if tag == "a" and self._in_link: self._in_link = False title = 
"".join(self._title_parts).strip() if self._url and title: self.results.append((self._url, title)) def _search_lobsters(keyword: str) -> list[dict]: """Search Lobsters stories via HTML search page. Blocking.""" import urllib.parse params = urllib.parse.urlencode({ "q": keyword, "what": "stories", "order": "newest", }) url = f"{_LOBSTERS_SEARCH_URL}?{params}" req = urllib.request.Request(url, method="GET") req.add_header("User-Agent", "Mozilla/5.0 (compatible; derp-bot)") resp = _urlopen(req, timeout=_FETCH_TIMEOUT) raw = resp.read() resp.close() html = raw.decode("utf-8", errors="replace") parser = _LobstersParser() parser.feed(html) results: list[dict] = [] seen_urls: set[str] = set() for item_url, title in parser.results: if item_url in seen_urls: continue seen_urls.add(item_url) results.append({ "id": item_url, "title": title, "url": item_url, "date": "", "extra": "", }) return results # -- DEV.to search (blocking) ---------------------------------------------- def _search_devto(keyword: str) -> list[dict]: """Search DEV.to articles via public articles API. 
Blocking.""" import urllib.parse tag = re.sub(r"[^a-zA-Z0-9]", "", keyword).lower() params = urllib.parse.urlencode({"per_page": "25", "tag": tag}) url = f"{_DEVTO_API}?{params}" req = urllib.request.Request(url, method="GET") req.add_header("User-Agent", "Mozilla/5.0 (compatible; derp-bot)") resp = _urlopen(req, timeout=_FETCH_TIMEOUT) raw = resp.read() resp.close() data = json.loads(raw) if not isinstance(data, list): return [] results: list[dict] = [] for item in data: article_id = str(item.get("id", "")) if not article_id: continue title = item.get("title", "") article_url = item.get("url", "") user = item.get("user", {}) if isinstance(user, dict): author = user.get("username", "") else: author = "" if author: title = f"{author}: {title}" date = _parse_date(item.get("published_at", "")) results.append({ "id": article_id, "title": title, "url": article_url, "date": date, "extra": "", }) return results # -- Medium tag feed search (blocking) ------------------------------------- def _search_medium(keyword: str) -> list[dict]: """Search Medium via tag RSS feed. 
Blocking.""" import urllib.parse import xml.etree.ElementTree as ET tag = re.sub(r"[^a-zA-Z0-9-]", "-", keyword).lower().strip("-") if not tag: return [] url = f"{_MEDIUM_FEED_URL}/{urllib.parse.quote(tag, safe='')}" req = urllib.request.Request(url, method="GET") req.add_header("User-Agent", "Mozilla/5.0 (compatible; derp-bot)") resp = _urlopen(req, timeout=_FETCH_TIMEOUT) raw = resp.read() resp.close() root = ET.fromstring(raw) results: list[dict] = [] for item in root.iter("item"): title = (item.findtext("title") or "").strip() link = (item.findtext("link") or "").strip() if not link: continue guid = (item.findtext("guid") or link).strip() creator = item.findtext("{http://purl.org/dc/elements/1.1/}creator") or "" if creator: title = f"{creator}: {title}" pub_date = item.findtext("pubDate") or "" date = _parse_date(pub_date) if not date and pub_date: from email.utils import parsedate_to_datetime try: dt = parsedate_to_datetime(pub_date) date = dt.strftime("%Y-%m-%d") except (ValueError, TypeError): pass results.append({ "id": guid, "title": title, "url": link, "date": date, "extra": "", }) return results # -- Hugging Face search (blocking) ---------------------------------------- def _search_huggingface(keyword: str) -> list[dict]: """Search Hugging Face models via public API. 
    Blocking."""
    import urllib.parse
    # Most recently modified models first.
    params = urllib.parse.urlencode({
        "search": keyword,
        "sort": "lastModified",
        "direction": "-1",
        "limit": "25",
    })
    url = f"{_HUGGINGFACE_API}?{params}"
    req = urllib.request.Request(url, method="GET")
    req.add_header("User-Agent", "Mozilla/5.0 (compatible; derp-bot)")
    resp = _urlopen(req, timeout=_FETCH_TIMEOUT)
    raw = resp.read()
    resp.close()
    data = json.loads(raw)
    results: list[dict] = []
    # The endpoint returns a JSON list of model records.
    for model in data if isinstance(data, list) else []:
        model_id = model.get("modelId") or model.get("id", "")
        if not model_id:
            continue
        downloads = model.get("downloads", 0)
        likes = model.get("likes", 0)
        title = model_id
        # Prefer download count over likes when both are present.
        if downloads:
            title += f" [{downloads} dl]"
        elif likes:
            title += f" [{likes} likes]"
        date = _parse_date(model.get("lastModified", ""))
        results.append({
            "id": model_id,
            "title": title,
            "url": f"https://huggingface.co/{model_id}",
            "date": date,
            "extra": "",
        })
    return results


# -- Backend registry -------------------------------------------------------

# Maps a two-letter backend tag to its blocking search function.  The tag
# is used in announcement lines ("[name/tag/short_id]") and as the key of
# each subscription's per-backend "seen" list.
_BACKENDS: dict[str, callable] = {
    "yt": _search_youtube,
    "tw": _search_twitch,
    "sx": _search_searx,
    "rd": _search_reddit,
    "ft": _search_mastodon,
    "dg": _search_duckduckgo,
    "gn": _search_google_news,
    "kk": _search_kick,
    "dm": _search_dailymotion,
    "pt": _search_peertube,
    "bs": _search_bluesky,
    "ly": _search_lemmy,
    "od": _search_odysee,
    "ia": _search_archive,
    "hn": _search_hackernews,
    "gh": _search_github,
    "wp": _search_wikipedia,
    "se": _search_stackexchange,
    "gl": _search_gitlab,
    "nm": _search_npm,
    "pp": _search_pypi,
    "dh": _search_dockerhub,
    "ax": _search_arxiv,
    "lb": _search_lobsters,
    "dv": _search_devto,
    "md": _search_medium,
    "hf": _search_huggingface,
}


# -- State helpers -----------------------------------------------------------


def _save(bot, key: str, data: dict) -> None:
    """Persist subscription data to bot.state."""
    # Stored as a JSON string under the "alert" namespace.
    bot.state.set("alert", key, json.dumps(data))


def _load(bot, key: str) -> dict | None:
    """Load subscription data from bot.state."""
    raw = bot.state.get("alert", key)
    if raw is None:
        return None
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        # Corrupt state entry; treat as missing rather than crash.
        return None


def _delete(bot, key: str) -> None:
    """Remove subscription data from bot.state."""
    bot.state.delete("alert", key)


# -- Polling -----------------------------------------------------------------


async def _poll_once(bot, key: str, announce: bool = True) -> None:
    """Single poll cycle for one alert subscription (all backends).

    Runs every backend's blocking search in the default executor,
    announces keyword-matching new results to the subscription's channel
    (when *announce* is true), and persists updated seen/error state.
    """
    data = _subscriptions.get(key)
    if data is None:
        data = _load(bot, key)
        if data is None:
            return
        _subscriptions[key] = data
    keyword = data["keyword"]
    now = datetime.now(timezone.utc).isoformat()
    data["last_poll"] = now
    had_error = False
    loop = asyncio.get_running_loop()
    for tag, backend in _BACKENDS.items():
        try:
            # Backends are blocking; run them off the event loop.
            items = await loop.run_in_executor(None, backend, keyword)
        except Exception as exc:
            # One failing backend must not stop the others; record and go on.
            data["last_error"] = f"{tag}: {exc}"
            had_error = True
            continue
        seen_set = set(data.get("seen", {}).get(tag, []))
        seen_list = list(data.get("seen", {}).get(tag, []))
        new_items = [item for item in items if item["id"] not in seen_set]
        # Filter: only announce results that actually contain the keyword
        # Check title/URL first, then fall back to og:title/og:description
        kw_lower = keyword.lower()
        matched: list[dict] = []
        for item in new_items:
            title_l = item.get("title", "").lower()
            url_l = item.get("url", "").lower()
            if kw_lower in title_l or kw_lower in url_l:
                # Fetch OG tags for date if backend didn't provide one
                # NOTE(review): _fetch_og appears to return
                # (og_title, og_description, og_date) -- confirm.
                if not item.get("date") and item.get("url"):
                    _, _, og_date = await loop.run_in_executor(
                        None, _fetch_og, item["url"],
                    )
                    if og_date:
                        item["date"] = og_date
                matched.append(item)
                continue
            # Fetch OG tags for items that didn't match on title/URL
            item_url = item.get("url", "")
            if item_url:
                og_title, og_desc, og_date = await loop.run_in_executor(
                    None, _fetch_og, item_url,
                )
                if (kw_lower in og_title.lower()
                        or kw_lower in og_desc.lower()):
                    # Upgrade to the OG title only if it is longer/richer.
                    if og_title and len(og_title) > len(item.get("title", "")):
                        item["title"] = og_title
                    if og_date and not item.get("date"):
                        item["date"] = og_date
                    matched.append(item)
        if announce and matched:
            channel = data["channel"]
            name = data["name"]
            for item in matched:
                # _save_result records the hit in the history DB and
                # returns a short id used in the announcement prefix.
                short_id = _save_result(channel, name, tag, item)
                title = _truncate(item["title"]) if item["title"] else "(no title)"
                url = item["url"]
                date = item.get("date", "")
                line = f"[{name}/{tag}/{short_id}]"
                if date:
                    line += f" ({date})"
                line += f" {title}"
                if url:
                    line += f" -- {url}"
                await bot.send(channel, line)
        # Mark ALL new items as seen (matched or not) so non-matching
        # items are not re-inspected on every cycle.
        for item in new_items:
            seen_list.append(item["id"])
        if len(seen_list) > _MAX_SEEN:
            seen_list = seen_list[-_MAX_SEEN:]
        data.setdefault("seen", {})[tag] = seen_list
    if had_error:
        _errors[key] = _errors.get(key, 0) + 1
    else:
        data["last_error"] = ""
        _errors[key] = 0
    _subscriptions[key] = data
    _save(bot, key, data)


async def _poll_loop(bot, key: str) -> None:
    """Infinite poll loop for one alert subscription.

    Sleeps the configured interval between cycles; after 5 consecutive
    error cycles the interval is doubled (capped at _MAX_INTERVAL).
    Exits quietly when the subscription disappears or the task is
    cancelled.
    """
    try:
        while True:
            data = _subscriptions.get(key) or _load(bot, key)
            if data is None:
                return
            interval = data.get("interval", _DEFAULT_INTERVAL)
            errs = _errors.get(key, 0)
            if errs >= 5:
                # Back off when a subscription keeps failing.
                interval = min(interval * 2, _MAX_INTERVAL)
            await asyncio.sleep(interval)
            await _poll_once(bot, key, announce=True)
    except asyncio.CancelledError:
        pass


def _start_poller(bot, key: str) -> None:
    """Create and track a poller task."""
    existing = _pollers.get(key)
    if existing and not existing.done():
        # A live poller already exists; do not start a duplicate.
        return
    task = asyncio.create_task(_poll_loop(bot, key))
    _pollers[key] = task


def _stop_poller(key: str) -> None:
    """Cancel and remove a poller task."""
    task = _pollers.pop(key, None)
    if task and not task.done():
        task.cancel()
    # Drop in-memory tracking; persisted state is untouched here.
    _subscriptions.pop(key, None)
    _errors.pop(key, 0)


# -- Restore on connect -----------------------------------------------------


def _restore(bot) -> None:
    """Rebuild pollers from persisted state."""
    for key in bot.state.keys("alert"):
        existing = _pollers.get(key)
        if existing and not existing.done():
            continue
        data = _load(bot, key)
        if data is None:
            continue
        _subscriptions[key] = data
        _start_poller(bot, key)


@event("001")
async def on_connect(bot, message):
    """Restore alert subscription pollers on connect."""
    _restore(bot)


# -- Command handler ---------------------------------------------------------


@command("alert", help="Alert: !alert add|del|list|check|info|history")
async def cmd_alert(bot, message):
    """Per-channel keyword alert subscriptions across platforms.

    Usage: !alert add