Files
derp/plugins/alert.py
user 118cf0de21 fix: centralize retry logic in proxy transport layer
Add exponential-backoff retry (3 attempts) for transient SSL,
connection, timeout, and OS errors to all three proxy functions:
urlopen, create_connection, open_connection. Remove per-plugin
retry from alert.py since transport layer now handles it.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 18:55:21 +01:00

540 lines
17 KiB
Python

"""Plugin: keyword alert subscriptions across multiple platforms."""
from __future__ import annotations
import asyncio
import json
import re
import urllib.request
from datetime import datetime, timezone
from derp.http import urlopen as _urlopen
from derp.plugin import command, event
# -- Constants ---------------------------------------------------------------
# Alert names: one lowercase alphanumeric character followed by up to 19
# lowercase alphanumerics or hyphens (1-20 characters in total).
_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,19}$")
# Longest keyword accepted by `!alert add`.
_MAX_KEYWORD_LEN = 100
# Cap on remembered result IDs per backend for each subscription.
_MAX_SEEN = 200
# Most new results announced individually per backend in one poll; the rest
# are summarized as "... and N more".
_MAX_ANNOUNCE = 5
# Poll interval stored on new subscriptions; handed to asyncio.sleep().
_DEFAULT_INTERVAL = 300
# Ceiling for the doubled interval used after repeated poll errors.
_MAX_INTERVAL = 3600
# Value passed as `timeout=` to every backend HTTP request.
_FETCH_TIMEOUT = 15
# Default maximum length applied by _truncate() to announced titles.
_MAX_TITLE_LEN = 80
# Maximum number of alerts that may exist per channel.
_MAX_SUBS = 20
# YouTube InnerTube search endpoint and the clientVersion sent in its context.
_YT_SEARCH_URL = "https://www.youtube.com/youtubei/v1/search"
_YT_CLIENT_VERSION = "2.20250101.00.00"
# Twitch GQL endpoint and the value sent in the Client-Id request header.
_GQL_URL = "https://gql.twitch.tv/gql"
_GQL_CLIENT_ID = "kimne78kx3ncx6brgo4mv6wki5h1ko"
# SearXNG search endpoint queried with format=json.
_SEARX_URL = "https://searx.mymx.me/search"
# -- Module-level tracking ---------------------------------------------------
# All three dicts are keyed by the "<channel>:<name>" string from _state_key().
# Poller task per subscription.
_pollers: dict[str, asyncio.Task] = {}
# In-memory copy of each subscription's persisted data.
_subscriptions: dict[str, dict] = {}
# Count of consecutive polls with at least one backend error (reset to 0 by a
# clean poll).
_errors: dict[str, int] = {}
# -- Pure helpers ------------------------------------------------------------
def _state_key(channel: str, name: str) -> str:
    """Return the "<channel>:<name>" key under which an alert is stored."""
    composite = "{}:{}".format(channel, name)
    return composite
def _validate_name(name: str) -> bool:
    """Return True when *name* is a valid alert name.

    A valid name is 1-20 characters of lowercase alphanumerics and hyphens,
    starting with an alphanumeric, as described by ``_NAME_RE``.
    """
    # fullmatch() rather than match(): "$" also matches just before a trailing
    # newline, so match() would accept a name such as "news\n".
    return _NAME_RE.fullmatch(name) is not None
def _truncate(text: str, max_len: int = _MAX_TITLE_LEN) -> str:
    """Shorten *text* to at most *max_len* characters.

    Text that already fits is returned unchanged. Longer text is cut, has
    trailing whitespace stripped, and gets a "..." suffix so that the result
    never exceeds *max_len*. When *max_len* is too small to hold the
    ellipsis, the text is simply cut to *max_len* characters.
    """
    if len(text) <= max_len:
        return text
    if max_len < 3:
        # No room for the ellipsis. Without this guard the slice bound below
        # would be negative, count from the end of the string, and return
        # more than max_len characters.
        return text[: max(max_len, 0)]
    return text[: max_len - 3].rstrip() + "..."
# -- YouTube InnerTube search (blocking) ------------------------------------
def _extract_videos(obj: object, depth: int = 0) -> list[dict]:
    """Collect video entries from an arbitrarily nested YouTube JSON value.

    Every dict that carries a non-empty string 'videoId' together with a
    usable 'title' (a plain string, or a dict whose 'runs' hold text parts)
    yields one result dict with the keys id, title, url and extra. The walk
    is depth-first, visits a matching dict before its children, and gives up
    below 20 levels of nesting, so it does not depend on the exact wrapper
    layers YouTube uses.
    """
    if depth > 20:
        return []

    found: list[dict] = []
    if isinstance(obj, list):
        children = obj
    elif isinstance(obj, dict):
        vid = obj.get("videoId")
        raw_title = obj.get("title")
        if isinstance(vid, str) and vid and raw_title is not None:
            if isinstance(raw_title, str):
                text = raw_title
            elif isinstance(raw_title, dict):
                text = "".join(
                    part.get("text", "")
                    for part in raw_title.get("runs", [])
                    if isinstance(part, dict)
                )
            else:
                text = ""
            if text:
                found.append({
                    "id": vid,
                    "title": text,
                    "url": f"https://www.youtube.com/watch?v={vid}",
                    "extra": "",
                })
        children = obj.values()
    else:
        # Scalars cannot contain further results.
        return found

    for child in children:
        found += _extract_videos(child, depth + 1)
    return found
def _search_youtube(keyword: str) -> list[dict]:
    """Search YouTube through the InnerTube API. Blocking.

    Returns one dict (keys: id, title, url, extra) per distinct video ID, in
    the order the videos first appear in the response. Network and JSON
    decoding errors propagate to the caller.
    """
    payload = json.dumps({
        "context": {
            "client": {
                "clientName": "WEB",
                "clientVersion": _YT_CLIENT_VERSION,
            },
        },
        "query": keyword,
    }).encode()
    req = urllib.request.Request(_YT_SEARCH_URL, data=payload, method="POST")
    req.add_header("Content-Type", "application/json")
    resp = _urlopen(req, timeout=_FETCH_TIMEOUT)
    try:
        raw = resp.read()
    finally:
        # Release the connection even when reading the body fails.
        resp.close()
    data = json.loads(raw)

    # The same video can appear in several sections of the response; keep the
    # first occurrence of each ID (dicts preserve insertion order).
    unique: dict[str, dict] = {}
    for video in _extract_videos(data):
        unique.setdefault(video["id"], video)
    return list(unique.values())
# -- Twitch GQL search (blocking) ------------------------------------------
def _search_twitch(keyword: str) -> list[dict]:
    """Search Twitch live streams and VODs through the public GQL API. Blocking.

    Returns one dict (keys: id, title, url, extra) per result. Stream IDs are
    prefixed with "stream:" and VOD IDs with "vod:". A response without
    usable search data yields an empty list; network and JSON decoding errors
    propagate to the caller.
    """
    # Escape backslashes and double quotes so the keyword stays inside the
    # GraphQL string literal.
    escaped = keyword.replace("\\", "\\\\").replace('"', '\\"')
    query = (
        'query{searchFor(userQuery:"'
        + escaped
        + '",options:{targets:[{index:STREAM},{index:VOD}]})'
        "{streams{items{id broadcaster{login displayName}title game{name}"
        "viewersCount}}videos{items{id owner{login displayName}title"
        " game{name}viewCount}}}}"
    )
    body = json.dumps({"query": query}).encode()
    req = urllib.request.Request(_GQL_URL, data=body, method="POST")
    req.add_header("Client-Id", _GQL_CLIENT_ID)
    req.add_header("Content-Type", "application/json")
    resp = _urlopen(req, timeout=_FETCH_TIMEOUT)
    try:
        raw = resp.read()
    finally:
        # Release the connection even when reading the body fails.
        resp.close()
    data = json.loads(raw)

    results: list[dict] = []
    try:
        search = data["data"]["searchFor"]
    except (KeyError, TypeError):
        return results
    if not search or not isinstance(search, dict):
        return results

    # JSON nulls can stand in for any object or field below, so every lookup
    # falls back with `or` instead of relying on .get() defaults, which only
    # cover missing keys.

    # Live streams
    streams = search.get("streams") or {}
    for item in streams.get("items") or []:
        if not isinstance(item, dict):
            continue
        raw_id = item.get("id")
        if raw_id is None or raw_id == "":
            continue
        broadcaster = item.get("broadcaster") or {}
        login = broadcaster.get("login") or ""
        display = broadcaster.get("displayName") or login
        title = item.get("title") or ""
        game = (item.get("game") or {}).get("name") or ""
        line = f"{display} is live: {title}"
        if game:
            line += f" ({game})"
        results.append({
            "id": f"stream:{raw_id}",
            "title": line,
            "url": f"https://twitch.tv/{login}",
            "extra": "",
        })

    # VODs
    videos = search.get("videos") or {}
    for item in videos.get("items") or []:
        if not isinstance(item, dict):
            continue
        raw_id = item.get("id")
        if raw_id is None or raw_id == "":
            continue
        results.append({
            "id": f"vod:{raw_id}",
            "title": item.get("title") or "",
            "url": f"https://twitch.tv/videos/{raw_id}",
            "extra": "",
        })
    return results
# -- SearXNG search (blocking) ----------------------------------------------
def _search_searx(keyword: str) -> list[dict]:
    """Search the configured SearXNG instance. Blocking.

    Returns one dict (keys: id, title, url, extra) per result, using the
    result URL as its ID. Network and JSON decoding errors propagate to the
    caller.
    """
    import urllib.parse

    params = urllib.parse.urlencode({"q": keyword, "format": "json"})
    req = urllib.request.Request(f"{_SEARX_URL}?{params}", method="GET")
    # NOTE(review): this is the only backend that calls urllib.request.urlopen
    # directly rather than derp.http's _urlopen -- confirm that bypassing the
    # shared transport is intentional.
    with urllib.request.urlopen(req, timeout=_FETCH_TIMEOUT) as resp:
        # The context manager closes the response even if read() raises.
        raw = resp.read()
    data = json.loads(raw)

    results: list[dict] = []
    # `or` fallbacks tolerate explicit JSON nulls, not just missing keys.
    for item in data.get("results") or []:
        if not isinstance(item, dict):
            continue
        item_url = item.get("url") or ""
        results.append({
            "id": item_url,
            "title": item.get("title") or "",
            "url": item_url,
            "extra": "",
        })
    return results
# -- Backend registry -------------------------------------------------------
# Maps a short backend tag to its blocking search function. The tag appears in
# announcements ("[name/tag] ...") and is the key under which a subscription's
# seen IDs are stored. Each function takes the keyword and returns a list of
# result dicts with the keys id, title, url and extra.
# NOTE(review): `callable` is the builtin predicate rather than a type; the
# annotation is never evaluated thanks to `from __future__ import annotations`,
# but a type checker would want a Callable type here.
_BACKENDS: dict[str, callable] = {
    "yt": _search_youtube,
    "tw": _search_twitch,
    "sx": _search_searx,
}
# -- State helpers -----------------------------------------------------------
def _save(bot, key: str, data: dict) -> None:
    """Store *data*, JSON-encoded, under *key* in the "alert" state namespace."""
    encoded = json.dumps(data)
    bot.state.set("alert", key, encoded)
def _load(bot, key: str) -> dict | None:
    """Fetch and decode the subscription stored under *key*.

    Returns None when nothing is stored or the stored text is not valid JSON.
    """
    stored = bot.state.get("alert", key)
    decoded = None
    if stored is not None:
        try:
            decoded = json.loads(stored)
        except json.JSONDecodeError:
            # Treat corrupt state the same as a missing subscription.
            decoded = None
    return decoded
def _delete(bot, key: str) -> None:
    """Drop the entry stored under *key* from the "alert" state namespace."""
    state = bot.state
    state.delete("alert", key)
# -- Polling -----------------------------------------------------------------
async def _poll_once(bot, key: str, announce: bool = True) -> None:
    """Run one poll of every backend for the subscription stored under *key*.

    The subscription is taken from the in-memory cache, falling back to
    persisted state; if neither has it, nothing happens. Each backend search
    runs in the default executor. Results whose ID is not yet in the
    backend's seen list are recorded as seen and, when *announce* is true,
    sent to the subscription's channel (at most _MAX_ANNOUNCE lines plus a
    summary of the remainder). A failing backend is noted in "last_error"
    and skipped. The updated subscription is cached and persisted at the end.
    """
    sub = _subscriptions.get(key)
    if sub is None:
        sub = _load(bot, key)
        if sub is None:
            return
        _subscriptions[key] = sub

    keyword = sub["keyword"]
    sub["last_poll"] = datetime.now(timezone.utc).isoformat()
    failed = False
    loop = asyncio.get_running_loop()

    for tag, search in _BACKENDS.items():
        try:
            found = await loop.run_in_executor(None, search, keyword)
        except Exception as exc:
            sub["last_error"] = f"{tag}: {exc}"
            failed = True
            continue

        # Read the seen list only after the await so it reflects the latest
        # state of the subscription dict.
        history = list(sub.get("seen", {}).get(tag, []))
        known = set(history)
        fresh = [entry for entry in found if entry["id"] not in known]

        if announce and fresh:
            target = sub["channel"]
            label = f"[{sub['name']}/{tag}]"
            batch = fresh[:_MAX_ANNOUNCE]
            for entry in batch:
                heading = _truncate(entry["title"]) if entry["title"] else "(no title)"
                link = entry["url"]
                text = f"{label} {heading}"
                if link:
                    text += f" -- {link}"
                await bot.send(target, text)
            leftover = len(fresh) - len(batch)
            if leftover > 0:
                await bot.send(target, f"{label} ... and {leftover} more")

        # Everything new counts as seen, announced or not; keep only the most
        # recent _MAX_SEEN IDs.
        history.extend(entry["id"] for entry in fresh)
        if len(history) > _MAX_SEEN:
            del history[:-_MAX_SEEN]
        sub.setdefault("seen", {})[tag] = history

    if not failed:
        sub["last_error"] = ""
        _errors[key] = 0
    else:
        _errors[key] = _errors.get(key, 0) + 1

    _subscriptions[key] = sub
    _save(bot, key, sub)
async def _poll_loop(bot, key: str) -> None:
    """Poll one alert subscription forever.

    Each round sleeps for the subscription's interval (doubled, capped at
    _MAX_INTERVAL, once five or more consecutive polls had errors) and then
    runs _poll_once(). The loop ends when the subscription can no longer be
    found or when the task is cancelled. An unexpected exception from a
    single poll is logged and counted as an error, so one bad round does not
    permanently kill the poller.
    """
    import logging  # function-scope import, as _search_searx does for urllib.parse

    log = logging.getLogger(__name__)
    try:
        while True:
            data = _subscriptions.get(key) or _load(bot, key)
            if data is None:
                return
            interval = data.get("interval", _DEFAULT_INTERVAL)
            if _errors.get(key, 0) >= 5:
                interval = min(interval * 2, _MAX_INTERVAL)
            await asyncio.sleep(interval)
            try:
                await _poll_once(bot, key, announce=True)
            except asyncio.CancelledError:
                # Re-raise explicitly: on Python 3.7 CancelledError derives
                # from Exception and must not be swallowed below.
                raise
            except Exception:
                _errors[key] = _errors.get(key, 0) + 1
                log.exception("alert poll failed for %s", key)
    except asyncio.CancelledError:
        # Cancellation is the normal way to stop a poller; finish quietly.
        pass
def _start_poller(bot, key: str) -> None:
    """Ensure a poll task is running for *key*, creating one if needed."""
    current = _pollers.get(key)
    if not current or current.done():
        # No task yet, or the previous one has finished: start a fresh one.
        _pollers[key] = asyncio.create_task(_poll_loop(bot, key))
def _stop_poller(key: str) -> None:
    """Cancel the poll task for *key* and forget its in-memory state."""
    poller = _pollers.pop(key, None)
    still_running = bool(poller) and not poller.done()
    if still_running:
        poller.cancel()
    # Drop the cached subscription and its error counter as well.
    for registry in (_subscriptions, _errors):
        registry.pop(key, None)
# -- Restore on connect -----------------------------------------------------
def _restore(bot) -> None:
    """Start a poller for every persisted alert that does not have a live one."""
    for key in bot.state.keys("alert"):
        task = _pollers.get(key)
        needs_poller = not task or task.done()
        if needs_poller:
            stored = _load(bot, key)
            if stored is not None:
                # Cache the persisted data before the poller starts reading it.
                _subscriptions[key] = stored
                _start_poller(bot, key)
@event("001")
async def on_connect(bot, message):
    """Restore alert subscription pollers on connect.

    Registered for the "001" event (presumably the IRC welcome numeric --
    confirm against derp.plugin). Delegates to _restore(), which leaves
    already-running pollers alone. *message* is not used.
    """
    _restore(bot)
# -- Command handler ---------------------------------------------------------
async def _alert_list(bot, message) -> None:
    """Handle `!alert list`: reply with the alerts of the current channel."""
    if not message.is_channel:
        await bot.reply(message, "Use this command in a channel")
        return
    prefix = f"{message.target}:"
    labels = []
    for key in bot.state.keys("alert"):
        if not key.startswith(prefix):
            continue
        data = _load(bot, key)
        if not data:
            continue
        name = data["name"]
        # Flag alerts whose most recent poll recorded an error.
        labels.append(f"{name} (error)" if data.get("last_error", "") else name)
    if not labels:
        await bot.reply(message, "No alerts in this channel")
        return
    await bot.reply(message, f"Alerts: {', '.join(labels)}")


async def _alert_check(bot, message, parts: list[str]) -> None:
    """Handle `!alert check <name>`: poll one alert now and report the outcome."""
    if not message.is_channel:
        await bot.reply(message, "Use this command in a channel")
        return
    if len(parts) < 3:
        await bot.reply(message, "Usage: !alert check <name>")
        return
    name = parts[2].lower()
    key = _state_key(message.target, name)
    data = _load(bot, key)
    if data is None:
        await bot.reply(message, f"No alert '{name}' in this channel")
        return
    _subscriptions[key] = data
    await _poll_once(bot, key, announce=True)
    # _poll_once stores its updated copy in the cache; report from that one.
    data = _subscriptions.get(key, data)
    if data.get("last_error"):
        await bot.reply(message, f"{name}: error -- {data['last_error']}")
    else:
        await bot.reply(message, f"{name}: checked")


async def _alert_seed_seen(keyword: str) -> dict[str, list[str]]:
    """Query every backend once and return the current result IDs per tag.

    Used when an alert is created so that results which already exist are
    not announced by the first poll. A backend that fails gets an empty list.
    """
    loop = asyncio.get_running_loop()
    seen: dict[str, list[str]] = {}
    for tag, backend in _BACKENDS.items():
        try:
            items = await loop.run_in_executor(None, backend, keyword)
            ids = [item["id"] for item in items]
            if len(ids) > _MAX_SEEN:
                ids = ids[-_MAX_SEEN:]
            seen[tag] = ids
        except Exception:
            # Best effort: seeding must not prevent the alert from being added.
            seen[tag] = []
    return seen


async def _alert_add(bot, message, parts: list[str]) -> None:
    """Handle `!alert add <name> <keyword...>` (admin, channel only)."""
    if not bot._is_admin(message):
        await bot.reply(message, "Permission denied: add requires admin")
        return
    if not message.is_channel:
        await bot.reply(message, "Use this command in a channel")
        return
    if len(parts) < 4:
        await bot.reply(message, "Usage: !alert add <name> <keyword...>")
        return
    name = parts[2].lower()
    keyword = parts[3]
    if not _validate_name(name):
        await bot.reply(
            message,
            "Invalid name (lowercase alphanumeric + hyphens, 1-20 chars)",
        )
        return
    if len(keyword) > _MAX_KEYWORD_LEN:
        await bot.reply(message, f"Keyword too long (max {_MAX_KEYWORD_LEN} chars)")
        return
    irc_channel = message.target
    key = _state_key(irc_channel, name)
    if _load(bot, key) is not None:
        await bot.reply(message, f"Alert '{name}' already exists in this channel")
        return
    ch_prefix = f"{irc_channel}:"
    count = sum(1 for k in bot.state.keys("alert") if k.startswith(ch_prefix))
    if count >= _MAX_SUBS:
        await bot.reply(message, f"Alert limit reached ({_MAX_SUBS})")
        return

    # Seed: query all backends to populate seen lists (prevents flood)
    seen = await _alert_seed_seen(keyword)
    now = datetime.now(timezone.utc).isoformat()
    data = {
        "keyword": keyword,
        "name": name,
        "channel": irc_channel,
        "interval": _DEFAULT_INTERVAL,
        "added_by": message.nick,
        "added_at": now,
        "last_poll": now,
        "last_error": "",
        "seen": seen,
    }
    _save(bot, key, data)
    _subscriptions[key] = data
    _start_poller(bot, key)
    counts = ", ".join(f"{len(seen.get(t, []))} {t}" for t in _BACKENDS)
    await bot.reply(
        message,
        f"Alert '{name}' added for: {keyword} ({counts} existing)",
    )


async def _alert_del(bot, message, parts: list[str]) -> None:
    """Handle `!alert del <name>` (admin, channel only)."""
    if not bot._is_admin(message):
        await bot.reply(message, "Permission denied: del requires admin")
        return
    if not message.is_channel:
        await bot.reply(message, "Use this command in a channel")
        return
    if len(parts) < 3:
        await bot.reply(message, "Usage: !alert del <name>")
        return
    name = parts[2].lower()
    key = _state_key(message.target, name)
    if _load(bot, key) is None:
        await bot.reply(message, f"No alert '{name}' in this channel")
        return
    # Stop polling before removing the persisted entry.
    _stop_poller(key)
    _delete(bot, key)
    await bot.reply(message, f"Removed '{name}'")


@command("alert", help="Alert: !alert add|del|list|check")
async def cmd_alert(bot, message):
    """Per-channel keyword alert subscriptions across platforms.

    Usage:
        !alert add <name> <keyword...>   Add keyword alert (admin)
        !alert del <name>                Remove alert (admin)
        !alert list                      List alerts
        !alert check <name>              Force-poll now
    """
    # At most four parts, so the keyword keeps its internal whitespace.
    parts = message.text.split(None, 3)
    if len(parts) < 2:
        await bot.reply(message, "Usage: !alert <add|del|list|check> [args]")
        return
    sub = parts[1].lower()
    if sub == "list":
        await _alert_list(bot, message)
    elif sub == "check":
        await _alert_check(bot, message, parts)
    elif sub == "add":
        await _alert_add(bot, message, parts)
    elif sub == "del":
        await _alert_del(bot, message, parts)
    else:
        await bot.reply(message, "Usage: !alert <add|del|list|check> [args]")