Files
derp/plugins/alert.py
user 29e77f97b2 fix: route searx and alert SearXNG traffic through SOCKS5 proxy
Both plugins called urllib.request.urlopen directly, bypassing the
proxy. Switch to derp.http.urlopen and update the SearXNG endpoint
to the public domain (searx.mymx.me). Update test mocks to match.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 16:56:45 +01:00

540 lines
17 KiB
Python

"""Plugin: keyword alert subscriptions across multiple platforms."""
from __future__ import annotations
import asyncio
import json
import re
import urllib.request
from datetime import datetime, timezone
from derp.http import urlopen as _urlopen
from derp.plugin import command, event
# -- Constants ---------------------------------------------------------------
# Allowed subscription names: a lowercase letter or digit followed by up to 19
# lowercase letters, digits or hyphens.
_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,19}$")
# Longest keyword accepted by "!alert add".
_MAX_KEYWORD_LEN = 100
# Maximum number of remembered result ids per backend per subscription.
_MAX_SEEN = 200
# Maximum number of new results announced per backend in one poll; the rest
# are summarised as "... and N more".
_MAX_ANNOUNCE = 5
# Poll interval stored on new subscriptions; passed to asyncio.sleep (seconds).
_DEFAULT_INTERVAL = 300
# Upper bound for the doubled interval used after repeated errors.
_MAX_INTERVAL = 3600
# Passed as ``timeout=`` to every backend HTTP request (presumably seconds --
# confirm against derp.http.urlopen).
_FETCH_TIMEOUT = 15
# Default maximum length used by _truncate for announced titles.
_MAX_TITLE_LEN = 80
# Maximum number of subscriptions per channel.
_MAX_SUBS = 20
# YouTube search endpoint and the clientVersion sent in the request context.
_YT_SEARCH_URL = "https://www.youtube.com/youtubei/v1/search"
_YT_CLIENT_VERSION = "2.20250101.00.00"
# Twitch GQL endpoint and the value sent in the "Client-Id" request header.
_GQL_URL = "https://gql.twitch.tv/gql"
_GQL_CLIENT_ID = "kimne78kx3ncx6brgo4mv6wki5h1ko"
# SearXNG search endpoint; queried with "q" and "format=json" parameters.
_SEARX_URL = "https://searx.mymx.me/search"
# -- Module-level tracking ---------------------------------------------------
# All three dicts are keyed by the composite key built by _state_key.
# Running poll tasks.
_pollers: dict[str, asyncio.Task] = {}
# In-memory copy of each subscription's persisted data dict.
_subscriptions: dict[str, dict] = {}
# Count of consecutive poll cycles in which at least one backend failed;
# reset to 0 by a fully successful cycle.
_errors: dict[str, int] = {}
# -- Pure helpers ------------------------------------------------------------
def _state_key(channel: str, name: str) -> str:
"""Build composite state key."""
return f"{channel}:{name}"
def _validate_name(name: str) -> bool:
    """Return True when *name* matches the ``_NAME_RE`` pattern."""
    return _NAME_RE.match(name) is not None
def _truncate(text: str, max_len: int = _MAX_TITLE_LEN) -> str:
    """Shorten *text* with a trailing ``...`` when it exceeds *max_len*.

    Text of at most *max_len* characters is returned unchanged. Longer text
    is cut to its first ``max_len - 3`` characters, stripped of trailing
    whitespace, and suffixed with ``...``.
    """
    if len(text) > max_len:
        head = text[: max_len - 3].rstrip()
        return head + "..."
    return text
# -- YouTube InnerTube search (blocking) ------------------------------------
def _extract_videos(obj: object, depth: int = 0) -> list[dict]:
"""Recursively walk YouTube JSON to find video results.
Finds all objects containing both 'videoId' and 'title' keys.
Resilient to YouTube rearranging wrapper layers.
"""
if depth > 20:
return []
results = []
if isinstance(obj, dict):
video_id = obj.get("videoId")
title_obj = obj.get("title")
if isinstance(video_id, str) and video_id and title_obj is not None:
if isinstance(title_obj, dict):
runs = title_obj.get("runs", [])
title = "".join(r.get("text", "") for r in runs if isinstance(r, dict))
elif isinstance(title_obj, str):
title = title_obj
else:
title = ""
if title:
results.append({
"id": video_id,
"title": title,
"url": f"https://www.youtube.com/watch?v={video_id}",
"extra": "",
})
for val in obj.values():
results.extend(_extract_videos(val, depth + 1))
elif isinstance(obj, list):
for item in obj:
results.extend(_extract_videos(item, depth + 1))
return results
def _search_youtube(keyword: str) -> list[dict]:
    """Search YouTube for *keyword* via a POST to ``_YT_SEARCH_URL``. Blocking.

    Args:
        keyword: Search text sent as the ``query`` field of the JSON body.

    Returns:
        Result dicts as produced by ``_extract_videos``, de-duplicated by
        video id with the first occurrence kept and order preserved.

    Raises:
        Whatever ``_urlopen`` or ``json.loads`` raise on network or decode
        failure; callers are expected to handle these.
    """
    payload = json.dumps({
        "context": {
            "client": {
                "clientName": "WEB",
                "clientVersion": _YT_CLIENT_VERSION,
            },
        },
        "query": keyword,
    }).encode()
    req = urllib.request.Request(_YT_SEARCH_URL, data=payload, method="POST")
    req.add_header("Content-Type", "application/json")

    resp = _urlopen(req, timeout=_FETCH_TIMEOUT)
    try:
        raw = resp.read()
    finally:
        # Close the response even when read() fails (e.g. on timeout).
        resp.close()

    data = json.loads(raw)
    # The same video can appear in several sections of the response; dicts
    # keep insertion order, so the first occurrence of each id wins.
    unique: dict[str, dict] = {}
    for video in _extract_videos(data):
        unique.setdefault(video["id"], video)
    return list(unique.values())
# -- Twitch GQL search (blocking) ------------------------------------------
def _search_twitch(keyword: str) -> list[dict]:
    """Search Twitch live streams and VODs for *keyword* via GQL. Blocking.

    Args:
        keyword: Search text, embedded into the GraphQL query as a quoted
            string after escaping backslashes and double quotes.

    Returns:
        Result dicts with ``id``, ``title``, ``url`` and ``extra`` keys.
        Stream ids are prefixed ``stream:`` and VOD ids ``vod:``. An empty
        list is returned when the response has no ``data.searchFor`` payload.
        Entries that are not objects or that lack an id are skipped.

    Raises:
        Whatever ``_urlopen`` or ``json.loads`` raise on network or decode
        failure; callers are expected to handle these.
    """
    # Escape backslashes before quotes so the added escapes are not doubled.
    escaped = keyword.replace("\\", "\\\\").replace('"', '\\"')
    query = (
        'query{searchFor(userQuery:"'
        + escaped
        + '",options:{targets:[{index:STREAM},{index:VOD}]})'
        "{streams{items{id broadcaster{login displayName}title game{name}"
        "viewersCount}}videos{items{id owner{login displayName}title"
        " game{name}viewCount}}}}"
    )
    body = json.dumps({"query": query}).encode()
    req = urllib.request.Request(_GQL_URL, data=body, method="POST")
    req.add_header("Client-Id", _GQL_CLIENT_ID)
    req.add_header("Content-Type", "application/json")

    resp = _urlopen(req, timeout=_FETCH_TIMEOUT)
    try:
        raw = resp.read()
    finally:
        # Close the response even when read() fails (e.g. on timeout).
        resp.close()

    data = json.loads(raw)
    results: list[dict] = []
    try:
        search = data["data"]["searchFor"]
    except (KeyError, TypeError):
        return results
    if not search:
        return results

    # Live streams
    streams = search.get("streams") or {}
    for item in streams.get("items") or []:
        if not isinstance(item, dict):
            continue
        raw_id = item.get("id")
        # A JSON null id must not become the literal string "None".
        stream_id = "" if raw_id is None else str(raw_id)
        if not stream_id:
            continue
        broadcaster = item.get("broadcaster") or {}
        # "or" also covers keys that are present but null.
        login = broadcaster.get("login") or ""
        display = broadcaster.get("displayName") or login
        title = item.get("title") or ""
        game = (item.get("game") or {}).get("name") or ""
        line = f"{display} is live: {title}"
        if game:
            line += f" ({game})"
        results.append({
            "id": f"stream:{stream_id}",
            "title": line,
            "url": f"https://twitch.tv/{login}",
            "extra": "",
        })

    # VODs
    videos = search.get("videos") or {}
    for item in videos.get("items") or []:
        if not isinstance(item, dict):
            continue
        raw_id = item.get("id")
        vod_id = "" if raw_id is None else str(raw_id)
        if not vod_id:
            continue
        results.append({
            "id": f"vod:{vod_id}",
            "title": item.get("title") or "",
            "url": f"https://twitch.tv/videos/{vod_id}",
            "extra": "",
        })
    return results
# -- SearXNG search (blocking) ----------------------------------------------
def _search_searx(keyword: str) -> list[dict]:
    """Search the SearXNG instance at ``_SEARX_URL`` for *keyword*. Blocking.

    Args:
        keyword: Search text, URL-encoded into the ``q`` query parameter.

    Returns:
        One dict per entry of the response's ``results`` list, with ``id``,
        ``title``, ``url`` and ``extra`` keys. The result URL doubles as id.

    Raises:
        Whatever ``_urlopen`` or ``json.loads`` raise on network or decode
        failure; callers are expected to handle these.
    """
    import urllib.parse

    params = urllib.parse.urlencode({"q": keyword, "format": "json"})
    url = f"{_SEARX_URL}?{params}"
    req = urllib.request.Request(url, method="GET")

    resp = _urlopen(req, timeout=_FETCH_TIMEOUT)
    try:
        raw = resp.read()
    finally:
        # Close the response even when read() fails (e.g. on timeout).
        resp.close()

    data = json.loads(raw)
    results: list[dict] = []
    for item in data.get("results", []):
        item_url = item.get("url", "")
        title = item.get("title", "")
        results.append({
            "id": item_url,
            "title": title,
            "url": item_url,
            "extra": "",
        })
    return results
# -- Backend registry -------------------------------------------------------
# Maps a short backend tag to its blocking search function. Each function
# takes the keyword and returns a list of result dicts with "id", "title",
# "url" and "extra" keys. The tag is also the key under "seen" in stored
# subscription data and appears in the "[name/tag]" announcement prefix.
_BACKENDS: dict[str, callable] = {
    "yt": _search_youtube,  # YouTube
    "tw": _search_twitch,  # Twitch
    "sx": _search_searx,  # SearXNG
}
# -- State helpers -----------------------------------------------------------
def _save(bot, key: str, data: dict) -> None:
    """Serialise *data* to JSON and store it under *key* in the "alert" state."""
    encoded = json.dumps(data)
    bot.state.set("alert", key, encoded)
def _load(bot, key: str) -> dict | None:
    """Fetch and decode the subscription stored under *key*.

    Returns None when nothing is stored or the stored value is not valid
    JSON.
    """
    raw = bot.state.get("alert", key)
    if raw is not None:
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            pass
    return None
def _delete(bot, key: str) -> None:
    """Drop the entry stored under *key* from the "alert" state."""
    state = bot.state
    state.delete("alert", key)
# -- Polling -----------------------------------------------------------------
async def _poll_once(bot, key: str, announce: bool = True) -> None:
    """Run one poll cycle for the subscription *key* across every backend.

    The subscription is taken from the in-memory cache, falling back to the
    persisted state; if neither has it, nothing happens. Each backend in
    ``_BACKENDS`` is run in the default executor. Results whose id is not in
    that backend's seen list are new: when *announce* is true, up to
    ``_MAX_ANNOUNCE`` of them are sent to the subscription's channel,
    followed by a "... and N more" line for the remainder. New ids are
    appended to the seen list, which is trimmed to its last ``_MAX_SEEN``
    entries. A backend that raises is skipped and its message recorded in
    ``last_error``. The updated subscription is cached and persisted.
    """
    sub = _subscriptions.get(key)
    if sub is None:
        sub = _load(bot, key)
        if sub is None:
            return
        _subscriptions[key] = sub

    keyword = sub["keyword"]
    sub["last_poll"] = datetime.now(timezone.utc).isoformat()
    failed = False
    loop = asyncio.get_running_loop()

    for tag, search in _BACKENDS.items():
        try:
            found = await loop.run_in_executor(None, search, keyword)
        except Exception as exc:
            # One failing backend must not block the others.
            sub["last_error"] = f"{tag}: {exc}"
            failed = True
            continue

        known_ids = list(sub.get("seen", {}).get(tag, []))
        known = set(known_ids)
        fresh = [entry for entry in found if entry["id"] not in known]

        if announce and fresh:
            channel = sub["channel"]
            name = sub["name"]
            shown = fresh[:_MAX_ANNOUNCE]
            for entry in shown:
                if entry["title"]:
                    title = _truncate(entry["title"])
                else:
                    title = "(no title)"
                line = f"[{name}/{tag}] {title}"
                if entry["url"]:
                    line += f" -- {entry['url']}"
                await bot.send(channel, line)
            overflow = len(fresh) - len(shown)
            if overflow > 0:
                await bot.send(channel, f"[{name}/{tag}] ... and {overflow} more")

        known_ids.extend(entry["id"] for entry in fresh)
        if len(known_ids) > _MAX_SEEN:
            # Keep only the most recently added ids.
            known_ids = known_ids[-_MAX_SEEN:]
        sub.setdefault("seen", {})[tag] = known_ids

    if failed:
        _errors[key] = _errors.get(key, 0) + 1
    else:
        sub["last_error"] = ""
        _errors[key] = 0

    _subscriptions[key] = sub
    _save(bot, key, sub)
async def _poll_loop(bot, key: str) -> None:
    """Poll the subscription *key* repeatedly until it disappears.

    Each iteration sleeps for the subscription's interval and then runs one
    announcing poll. Once the error counter for *key* reaches 5, the sleep
    is doubled, capped at ``_MAX_INTERVAL``. The loop ends when the
    subscription is in neither the cache nor the persisted state, or when
    the task is cancelled.
    """
    try:
        while True:
            sub = _subscriptions.get(key) or _load(bot, key)
            if sub is None:
                break
            delay = sub.get("interval", _DEFAULT_INTERVAL)
            if _errors.get(key, 0) >= 5:
                delay = min(delay * 2, _MAX_INTERVAL)
            await asyncio.sleep(delay)
            await _poll_once(bot, key, announce=True)
    except asyncio.CancelledError:
        # Cancellation is how the poller is stopped; exit quietly.
        return
def _start_poller(bot, key: str) -> None:
    """Start a poll task for *key* unless one is already running."""
    current = _pollers.get(key)
    if not current or current.done():
        _pollers[key] = asyncio.create_task(_poll_loop(bot, key))
def _stop_poller(key: str) -> None:
    """Cancel the poll task for *key* and forget its in-memory state."""
    task = _pollers.pop(key, None)
    if task:
        if not task.done():
            task.cancel()
    # Drop the cached subscription and error counter along with the task.
    for registry in (_subscriptions, _errors):
        registry.pop(key, None)
# -- Restore on connect -----------------------------------------------------
def _restore(bot) -> None:
    """Start a poller for every persisted subscription that lacks a live one."""
    for key in bot.state.keys("alert"):
        task = _pollers.get(key)
        if not task or task.done():
            sub = _load(bot, key)
            if sub is not None:
                _subscriptions[key] = sub
                _start_poller(bot, key)
@event("001")
async def on_connect(bot, message):
    """Restore alert subscription pollers on connect.

    Registered for event "001" (presumably the IRC welcome numeric sent once
    registration completes -- confirm against derp.plugin.event). *message*
    is not used. Delegates to ``_restore``, which skips subscriptions whose
    poller is still running.
    """
    _restore(bot)
# -- Command handler ---------------------------------------------------------
_ALERT_USAGE = "Usage: !alert <add|del|list|check> [args]"


async def _alert_list(bot, message, parts: list[str]) -> None:
    """Handle ``!alert list``: reply with the names of this channel's alerts."""
    if not message.is_channel:
        await bot.reply(message, "Use this command in a channel")
        return
    prefix = f"{message.target}:"
    subs = []
    for key in bot.state.keys("alert"):
        if not key.startswith(prefix):
            continue
        data = _load(bot, key)
        if not data:
            continue
        name = data["name"]
        # Flag subscriptions whose most recent poll recorded an error.
        subs.append(f"{name} (error)" if data.get("last_error", "") else name)
    if not subs:
        await bot.reply(message, "No alerts in this channel")
        return
    await bot.reply(message, f"Alerts: {', '.join(subs)}")


async def _alert_check(bot, message, parts: list[str]) -> None:
    """Handle ``!alert check <name>``: poll one alert now and report status."""
    if not message.is_channel:
        await bot.reply(message, "Use this command in a channel")
        return
    if len(parts) < 3:
        await bot.reply(message, "Usage: !alert check <name>")
        return
    name = parts[2].lower()
    key = _state_key(message.target, name)
    data = _load(bot, key)
    if data is None:
        await bot.reply(message, f"No alert '{name}' in this channel")
        return
    # Poll from the persisted copy so the check reflects stored state.
    _subscriptions[key] = data
    await _poll_once(bot, key, announce=True)
    data = _subscriptions.get(key, data)
    if data.get("last_error"):
        await bot.reply(message, f"{name}: error -- {data['last_error']}")
    else:
        await bot.reply(message, f"{name}: checked")


async def _alert_add(bot, message, parts: list[str]) -> None:
    """Handle ``!alert add <name> <keyword...>``: create an alert (admin)."""
    if not bot._is_admin(message):
        await bot.reply(message, "Permission denied: add requires admin")
        return
    if not message.is_channel:
        await bot.reply(message, "Use this command in a channel")
        return
    if len(parts) < 4:
        await bot.reply(message, "Usage: !alert add <name> <keyword...>")
        return
    name = parts[2].lower()
    keyword = parts[3]
    if not _validate_name(name):
        await bot.reply(
            message,
            "Invalid name (lowercase alphanumeric + hyphens, 1-20 chars)",
        )
        return
    if len(keyword) > _MAX_KEYWORD_LEN:
        await bot.reply(message, f"Keyword too long (max {_MAX_KEYWORD_LEN} chars)")
        return
    irc_channel = message.target
    key = _state_key(irc_channel, name)
    if _load(bot, key) is not None:
        await bot.reply(message, f"Alert '{name}' already exists in this channel")
        return
    ch_prefix = f"{irc_channel}:"
    count = sum(1 for k in bot.state.keys("alert") if k.startswith(ch_prefix))
    if count >= _MAX_SUBS:
        await bot.reply(message, f"Alert limit reached ({_MAX_SUBS})")
        return

    # Seed: query all backends to populate seen lists (prevents flood)
    loop = asyncio.get_running_loop()
    seen: dict[str, list[str]] = {}
    for tag, backend in _BACKENDS.items():
        try:
            items = await loop.run_in_executor(None, backend, keyword)
            ids = [item["id"] for item in items]
            if len(ids) > _MAX_SEEN:
                ids = ids[-_MAX_SEEN:]
            seen[tag] = ids
        except Exception:
            # Best effort: a backend that fails during seeding starts with
            # an empty seen list instead of blocking the subscription.
            seen[tag] = []

    now = datetime.now(timezone.utc).isoformat()
    data = {
        "keyword": keyword,
        "name": name,
        "channel": irc_channel,
        "interval": _DEFAULT_INTERVAL,
        "added_by": message.nick,
        "added_at": now,
        "last_poll": now,
        "last_error": "",
        "seen": seen,
    }
    _save(bot, key, data)
    _subscriptions[key] = data
    _start_poller(bot, key)
    counts = ", ".join(f"{len(seen.get(t, []))} {t}" for t in _BACKENDS)
    await bot.reply(
        message,
        f"Alert '{name}' added for: {keyword} ({counts} existing)",
    )


async def _alert_del(bot, message, parts: list[str]) -> None:
    """Handle ``!alert del <name>``: stop and remove an alert (admin)."""
    if not bot._is_admin(message):
        await bot.reply(message, "Permission denied: del requires admin")
        return
    if not message.is_channel:
        await bot.reply(message, "Use this command in a channel")
        return
    if len(parts) < 3:
        await bot.reply(message, "Usage: !alert del <name>")
        return
    name = parts[2].lower()
    key = _state_key(message.target, name)
    if _load(bot, key) is None:
        await bot.reply(message, f"No alert '{name}' in this channel")
        return
    _stop_poller(key)
    _delete(bot, key)
    await bot.reply(message, f"Removed '{name}'")


# Subcommand name -> handler coroutine; each takes (bot, message, parts).
_ALERT_HANDLERS = {
    "list": _alert_list,
    "check": _alert_check,
    "add": _alert_add,
    "del": _alert_del,
}


@command("alert", help="Alert: !alert add|del|list|check")
async def cmd_alert(bot, message):
    """Per-channel keyword alert subscriptions across platforms.

    Usage:
        !alert add <name> <keyword...>   Add keyword alert (admin)
        !alert del <name>                Remove alert (admin)
        !alert list                      List alerts
        !alert check <name>              Force-poll now

    The message text is split into at most four parts (command, subcommand,
    name, keyword) so the keyword may contain spaces. The subcommand is
    matched case-insensitively; a missing or unknown one gets the usage
    reply.
    """
    parts = message.text.split(None, 3)
    if len(parts) < 2:
        await bot.reply(message, _ALERT_USAGE)
        return
    handler = _ALERT_HANDLERS.get(parts[1].lower())
    if handler is None:
        await bot.reply(message, _ALERT_USAGE)
        return
    await handler(bot, message, parts)