SearXNG instance at 192.168.122.119 is reachable via grokbox static route -- no need to tunnel through SOCKS5. Reverts searx and alert plugins to stdlib urlopen for SearXNG queries. YouTube and Twitch in alert.py still use the proxy. Also removes cprofile flag from docker-compose command. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
540 lines
17 KiB
Python
540 lines
17 KiB
Python
"""Plugin: keyword alert subscriptions across multiple platforms.

Admins subscribe a channel to a search keyword with ``!alert add``.  Each
subscription is polled periodically against YouTube (InnerTube search),
Twitch (public GQL search) and a SearXNG instance; results whose ids have
not been seen before are announced to the channel.  Subscriptions are
persisted in ``bot.state`` under the ``alert`` namespace, and their pollers
are restored when the bot (re)connects.
"""
|
|
|
|
from __future__ import annotations

import asyncio
import json
import re
import urllib.request
from collections.abc import Callable
from datetime import datetime, timezone

from derp.http import urlopen as _urlopen
from derp.plugin import command, event
|
|
|
|
# -- Constants ---------------------------------------------------------------
|
|
|
|
_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,19}$")
|
|
_MAX_KEYWORD_LEN = 100
|
|
_MAX_SEEN = 200
|
|
_MAX_ANNOUNCE = 5
|
|
_DEFAULT_INTERVAL = 300
|
|
_MAX_INTERVAL = 3600
|
|
_FETCH_TIMEOUT = 15
|
|
_MAX_TITLE_LEN = 80
|
|
_MAX_SUBS = 20
|
|
_YT_SEARCH_URL = "https://www.youtube.com/youtubei/v1/search"
|
|
_YT_CLIENT_VERSION = "2.20250101.00.00"
|
|
_GQL_URL = "https://gql.twitch.tv/gql"
|
|
_GQL_CLIENT_ID = "kimne78kx3ncx6brgo4mv6wki5h1ko"
|
|
_SEARX_URL = "http://192.168.122.119:3000/search"
|
|
|
|
# -- Module-level tracking ---------------------------------------------------
|
|
|
|
_pollers: dict[str, asyncio.Task] = {}
|
|
_subscriptions: dict[str, dict] = {}
|
|
_errors: dict[str, int] = {}
|
|
|
|
|
|
# -- Pure helpers ------------------------------------------------------------
|
|
|
|
def _state_key(channel: str, name: str) -> str:
|
|
"""Build composite state key."""
|
|
return f"{channel}:{name}"
|
|
|
|
|
|
def _validate_name(name: str) -> bool:
    """Return True if *name* is an acceptable alert name.

    Acceptable names match ``_NAME_RE`` in full: 1-20 characters of lowercase
    ASCII letters, digits and hyphens, not starting with a hyphen.
    """
    # fullmatch, not match: the pattern ends in "$", which also matches just
    # before a trailing newline, so match() would accept e.g. "name\n".
    return _NAME_RE.fullmatch(name) is not None
|
|
|
|
|
|
def _truncate(text: str, max_len: int = _MAX_TITLE_LEN) -> str:
    """Shorten *text* to at most *max_len* characters, marking the cut with "...".

    Text that already fits is returned unchanged.  Otherwise it is cut to
    ``max_len - 3`` characters, trailing whitespace is stripped, and "..." is
    appended.  When *max_len* is too small to hold the three-character
    ellipsis, the text is hard-cut to *max_len* characters without one (a
    negative *max_len* yields "").
    """
    if len(text) <= max_len:
        return text
    if max_len < 3:
        # text[: max_len - 3] would be a negative slice here and return far
        # more than max_len characters.
        return text[: max(max_len, 0)]
    return text[: max_len - 3].rstrip() + "..."
|
|
|
|
|
|
# -- YouTube InnerTube search (blocking) ------------------------------------
|
|
|
|
def _extract_videos(obj: object, depth: int = 0) -> list[dict]:
|
|
"""Recursively walk YouTube JSON to find video results.
|
|
|
|
Finds all objects containing both 'videoId' and 'title' keys.
|
|
Resilient to YouTube rearranging wrapper layers.
|
|
"""
|
|
if depth > 20:
|
|
return []
|
|
results = []
|
|
if isinstance(obj, dict):
|
|
video_id = obj.get("videoId")
|
|
title_obj = obj.get("title")
|
|
if isinstance(video_id, str) and video_id and title_obj is not None:
|
|
if isinstance(title_obj, dict):
|
|
runs = title_obj.get("runs", [])
|
|
title = "".join(r.get("text", "") for r in runs if isinstance(r, dict))
|
|
elif isinstance(title_obj, str):
|
|
title = title_obj
|
|
else:
|
|
title = ""
|
|
if title:
|
|
results.append({
|
|
"id": video_id,
|
|
"title": title,
|
|
"url": f"https://www.youtube.com/watch?v={video_id}",
|
|
"extra": "",
|
|
})
|
|
for val in obj.values():
|
|
results.extend(_extract_videos(val, depth + 1))
|
|
elif isinstance(obj, list):
|
|
for item in obj:
|
|
results.extend(_extract_videos(item, depth + 1))
|
|
return results
|
|
|
|
|
|
def _search_youtube(keyword: str) -> list[dict]:
    """Search YouTube via the InnerTube API. Blocking.

    POSTs a WEB-client search for *keyword* through the project's
    ``_urlopen`` and returns the videos found by ``_extract_videos``,
    de-duplicated by video id (first occurrence kept, order preserved).
    HTTP and JSON errors propagate to the caller.
    """
    payload = json.dumps({
        "context": {
            "client": {
                "clientName": "WEB",
                "clientVersion": _YT_CLIENT_VERSION,
            },
        },
        "query": keyword,
    }).encode()

    req = urllib.request.Request(_YT_SEARCH_URL, data=payload, method="POST")
    req.add_header("Content-Type", "application/json")

    resp = _urlopen(req, timeout=_FETCH_TIMEOUT)
    try:
        raw = resp.read()
    finally:
        # Close even when read() fails (e.g. a timeout mid-body).
        resp.close()

    # The same video can appear in several result sections; dicts keep
    # insertion order, so setdefault retains the first occurrence.
    unique: dict[str, dict] = {}
    for video in _extract_videos(json.loads(raw)):
        unique.setdefault(video["id"], video)
    return list(unique.values())
|
|
|
|
|
|
# -- Twitch GQL search (blocking) ------------------------------------------
|
|
|
|
def _search_twitch(keyword: str) -> list[dict]:
    """Search Twitch live streams and VODs via the public GQL API. Blocking.

    Returns result dicts with keys ``id`` (``"stream:<id>"`` or
    ``"vod:<id>"``), ``title``, ``url`` and ``extra``.  Returns an empty list
    when the response carries no ``data.searchFor`` object.  HTTP and JSON
    errors propagate to the caller.
    """
    # json.dumps yields a valid GraphQL string literal: it escapes quotes,
    # backslashes and control characters (e.g. IRC formatting codes), which
    # GraphQL does not allow raw inside a string.  ensure_ascii=False keeps
    # non-ASCII text as-is, like the previous hand-rolled escaping did.
    user_query = json.dumps(keyword, ensure_ascii=False)
    query = (
        "query{searchFor(userQuery:"
        + user_query
        + ",options:{targets:[{index:STREAM},{index:VOD}]})"
        "{streams{items{id broadcaster{login displayName}title game{name}"
        "viewersCount}}videos{items{id owner{login displayName}title"
        " game{name}viewCount}}}}"
    )
    body = json.dumps({"query": query}).encode()

    req = urllib.request.Request(_GQL_URL, data=body, method="POST")
    req.add_header("Client-Id", _GQL_CLIENT_ID)
    req.add_header("Content-Type", "application/json")

    resp = _urlopen(req, timeout=_FETCH_TIMEOUT)
    try:
        raw = resp.read()
    finally:
        resp.close()

    data = json.loads(raw)
    results: list[dict] = []

    try:
        search = data["data"]["searchFor"]
    except (KeyError, TypeError):
        return results
    if not isinstance(search, dict):
        return results

    # Live streams.  GQL returns explicit nulls, so use `or` fallbacks rather
    # than .get defaults to avoid "None" leaking into titles, urls and ids.
    for item in (search.get("streams") or {}).get("items") or []:
        if not isinstance(item, dict):
            continue
        stream_id = item.get("id")
        if stream_id is None or stream_id == "":
            continue
        broadcaster = item.get("broadcaster") or {}
        login = broadcaster.get("login") or ""
        display = broadcaster.get("displayName") or login
        title = item.get("title") or ""
        game = (item.get("game") or {}).get("name") or ""
        line = f"{display} is live: {title}"
        if game:
            line += f" ({game})"
        results.append({
            "id": f"stream:{stream_id}",
            "title": line,
            "url": f"https://twitch.tv/{login}",
            "extra": "",
        })

    # VODs
    for item in (search.get("videos") or {}).get("items") or []:
        if not isinstance(item, dict):
            continue
        vod_id = item.get("id")
        if vod_id is None or vod_id == "":
            continue
        results.append({
            "id": f"vod:{vod_id}",
            "title": item.get("title") or "",
            "url": f"https://twitch.tv/videos/{vod_id}",
            "extra": "",
        })

    return results
|
|
|
|
|
|
# -- SearXNG search (blocking) ----------------------------------------------
|
|
|
|
def _search_searx(keyword: str) -> list[dict]:
    """Search the SearXNG instance at ``_SEARX_URL`` (JSON format). Blocking.

    Returns one result dict per distinct result URL, with the URL used as
    both ``id`` and ``url``.  Results without a URL are skipped, since the
    URL is their only identity.  HTTP and JSON errors propagate to the caller.
    """
    import urllib.parse

    params = urllib.parse.urlencode({"q": keyword, "format": "json"})
    req = urllib.request.Request(f"{_SEARX_URL}?{params}", method="GET")

    # Plain stdlib urlopen, not the proxied `_urlopen`: the SearXNG host is
    # reached directly rather than through the SOCKS5 proxy.
    resp = urllib.request.urlopen(req, timeout=_FETCH_TIMEOUT)
    try:
        raw = resp.read()
    finally:
        resp.close()

    data = json.loads(raw)
    results: list[dict] = []
    seen_urls: set[str] = set()
    for item in data.get("results") or []:
        if not isinstance(item, dict):
            continue
        item_url = item.get("url") or ""
        if not item_url or item_url in seen_urls:
            continue
        seen_urls.add(item_url)
        results.append({
            "id": item_url,
            "title": item.get("title") or "",
            "url": item_url,
            "extra": "",
        })
    return results
|
|
|
|
|
|
# -- Backend registry -------------------------------------------------------
|
|
|
|
# Backend tag -> blocking search function.  The tag appears in announcements
# ("[name/tag]") and keys each backend's seen-id list in a subscription.
# `callable` is a builtin function, not a type; Callable is the real type.
_BACKENDS: dict[str, Callable[[str], list[dict]]] = {
    "yt": _search_youtube,
    "tw": _search_twitch,
    "sx": _search_searx,
}
|
|
|
|
|
|
# -- State helpers -----------------------------------------------------------
|
|
|
|
def _save(bot, key: str, data: dict) -> None:
|
|
"""Persist subscription data to bot.state."""
|
|
bot.state.set("alert", key, json.dumps(data))
|
|
|
|
|
|
def _load(bot, key: str) -> dict | None:
|
|
"""Load subscription data from bot.state."""
|
|
raw = bot.state.get("alert", key)
|
|
if raw is None:
|
|
return None
|
|
try:
|
|
return json.loads(raw)
|
|
except json.JSONDecodeError:
|
|
return None
|
|
|
|
|
|
def _delete(bot, key: str) -> None:
|
|
"""Remove subscription data from bot.state."""
|
|
bot.state.delete("alert", key)
|
|
|
|
|
|
# -- Polling -----------------------------------------------------------------
|
|
|
|
async def _announce(bot, data: dict, tag: str, new_items: list[dict]) -> None:
    """Send up to _MAX_ANNOUNCE new items to the subscription's channel.

    Each line reads "[name/tag] title -- url".  If items were held back, a
    final "... and N more" line reports how many.
    """
    channel = data["channel"]
    name = data["name"]
    shown = new_items[:_MAX_ANNOUNCE]
    for item in shown:
        title = _truncate(item["title"]) if item["title"] else "(no title)"
        url = item["url"]
        line = f"[{name}/{tag}] {title}"
        if url:
            line += f" -- {url}"
        await bot.send(channel, line)
    remaining = len(new_items) - len(shown)
    if remaining > 0:
        await bot.send(channel, f"[{name}/{tag}] ... and {remaining} more")


async def _poll_once(bot, key: str, announce: bool = True) -> None:
    """Run one poll cycle for a single alert subscription across all backends.

    The record is taken from ``_subscriptions``, falling back to bot.state.
    Each backend in ``_BACKENDS`` is queried with the record's keyword.  Items
    whose id is not in that backend's seen list are announced to the channel
    when *announce* is true, then appended to the seen list, which is trimmed
    to the newest ``_MAX_SEEN`` ids.

    A failing backend is skipped: its message is stored in ``last_error`` and
    the cycle counts as an error in ``_errors``.  A cycle where every backend
    succeeds clears ``last_error`` and resets the error count.

    If the record is deleted or replaced while a request is in flight (e.g.
    ``!alert del`` during ``!alert check``), the cycle stops without further
    announcements and without saving, so a deleted alert is not written back
    to bot.state.
    """
    data = _subscriptions.get(key)
    if data is None:
        data = _load(bot, key)
        if data is None:
            return
        _subscriptions[key] = data

    keyword = data["keyword"]
    data["last_poll"] = datetime.now(timezone.utc).isoformat()

    had_error = False
    loop = asyncio.get_running_loop()

    for tag, backend in _BACKENDS.items():
        try:
            items = await loop.run_in_executor(None, backend, keyword)
        except Exception as exc:
            data["last_error"] = f"{tag}: {exc}"
            had_error = True
            continue

        # The request above can take seconds; make sure this record is still
        # the live one before announcing anything from it.
        if _subscriptions.get(key) is not data:
            return

        seen_list = list(data.get("seen", {}).get(tag, []))
        seen_set = set(seen_list)
        new_items: list[dict] = []
        for item in items:
            if item["id"] not in seen_set:
                # Adding here also drops repeats within this one response.
                seen_set.add(item["id"])
                new_items.append(item)

        if announce and new_items:
            await _announce(bot, data, tag, new_items)

        seen_list.extend(item["id"] for item in new_items)
        data.setdefault("seen", {})[tag] = seen_list[-_MAX_SEEN:]

    # bot.send() also yields to the event loop; re-check before persisting.
    if _subscriptions.get(key) is not data:
        return

    if had_error:
        _errors[key] = _errors.get(key, 0) + 1
    else:
        data["last_error"] = ""
        _errors[key] = 0

    _save(bot, key, data)
|
|
|
|
|
|
async def _poll_loop(bot, key: str) -> None:
    """Poll one alert subscription forever, sleeping before each cycle.

    The loop returns when the subscription can no longer be found, either in
    memory or in bot.state.  It ends quietly when the task is cancelled.

    The sleep is the record's ``interval`` (default ``_DEFAULT_INTERVAL``
    seconds).  Once ``_errors`` shows 5 or more consecutive failed cycles,
    the interval is doubled, capped at ``_MAX_INTERVAL``.

    An unexpected exception from a cycle is counted as an error instead of
    ending the task.  Otherwise alerts would stop until the next reconnect
    restores the poller.
    """
    try:
        while True:
            data = _subscriptions.get(key) or _load(bot, key)
            if data is None:
                return
            interval = data.get("interval", _DEFAULT_INTERVAL)
            if _errors.get(key, 0) >= 5:
                interval = min(interval * 2, _MAX_INTERVAL)
            await asyncio.sleep(interval)
            try:
                await _poll_once(bot, key, announce=True)
            except asyncio.CancelledError:
                # Explicit re-raise: on Python < 3.8 CancelledError is an
                # Exception subclass and would be caught below.
                raise
            except Exception as exc:
                _errors[key] = _errors.get(key, 0) + 1
                current = _subscriptions.get(key)
                if current is not None:
                    current["last_error"] = f"poll: {exc}"
    except asyncio.CancelledError:
        pass
|
|
|
|
|
|
def _start_poller(bot, key: str) -> None:
    """Spawn a _poll_loop task for *key* unless one is already running.

    A finished (done) task left in ``_pollers`` is replaced by a new one.
    """
    current = _pollers.get(key)
    if current is None or current.done():
        _pollers[key] = asyncio.create_task(_poll_loop(bot, key))
|
|
|
|
|
|
def _stop_poller(key: str) -> None:
    """Cancel the poller for *key* (if still running) and forget its state.

    Removes the key from ``_pollers``, ``_subscriptions`` and ``_errors``.
    Persisted bot.state data is left untouched.
    """
    task = _pollers.pop(key, None)
    if task is not None and not task.done():
        task.cancel()
    for table in (_subscriptions, _errors):
        table.pop(key, None)
|
|
|
|
|
|
# -- Restore on connect -----------------------------------------------------
|
|
|
|
def _restore(bot) -> None:
    """Start pollers for every persisted alert that has no live poller.

    Keys whose stored record cannot be loaded are skipped.
    """
    for key in bot.state.keys("alert"):
        running = _pollers.get(key)
        if running is not None and not running.done():
            continue  # a live poller already owns this key
        stored = _load(bot, key)
        if stored is not None:
            _subscriptions[key] = stored
            _start_poller(bot, key)
|
|
|
|
|
|
@event("001")
async def on_connect(bot, message):
    """On IRC numeric 001 (welcome), bring persisted alert pollers back up."""
    # The message itself is not needed; restoring depends only on bot.state.
    _restore(bot)
|
|
|
|
|
|
# -- Command handler ---------------------------------------------------------
|
|
|
|
@command("alert", help="Alert: !alert add|del|list|check")
async def cmd_alert(bot, message):
    """Per-channel keyword alert subscriptions across platforms.

    Usage:
        !alert add <name> <keyword...>   Add keyword alert (admin)
        !alert del <name>                Remove alert (admin)
        !alert list                      List alerts
        !alert check <name>              Force-poll now
    """
    parts = message.text.split(None, 3)
    handler = _ALERT_HANDLERS.get(parts[1].lower()) if len(parts) >= 2 else None
    if handler is None:
        await bot.reply(message, "Usage: !alert <add|del|list|check> [args]")
        return
    await handler(bot, message, parts)


async def _alert_list(bot, message, parts: list[str]) -> None:
    """Handle `!alert list`: name this channel's alerts, flagging errored ones."""
    if not message.is_channel:
        await bot.reply(message, "Use this command in a channel")
        return
    prefix = f"{message.target}:"
    subs = []
    for key in bot.state.keys("alert"):
        if not key.startswith(prefix):
            continue
        data = _load(bot, key)
        if not data:
            continue
        name = data["name"]
        subs.append(f"{name} (error)" if data.get("last_error", "") else name)
    if not subs:
        await bot.reply(message, "No alerts in this channel")
        return
    await bot.reply(message, f"Alerts: {', '.join(subs)}")


async def _alert_check(bot, message, parts: list[str]) -> None:
    """Handle `!alert check <name>`: poll one alert now and report the outcome."""
    if not message.is_channel:
        await bot.reply(message, "Use this command in a channel")
        return
    if len(parts) < 3:
        await bot.reply(message, "Usage: !alert check <name>")
        return
    name = parts[2].lower()
    key = _state_key(message.target, name)
    data = _load(bot, key)
    if data is None:
        await bot.reply(message, f"No alert '{name}' in this channel")
        return
    _subscriptions[key] = data
    await _poll_once(bot, key, announce=True)
    data = _subscriptions.get(key, data)
    if data.get("last_error"):
        await bot.reply(message, f"{name}: error -- {data['last_error']}")
    else:
        await bot.reply(message, f"{name}: checked")


async def _seed_seen(keyword: str) -> tuple[dict[str, list[str]], dict[str, str]]:
    """Query each backend once and collect the ids it currently returns.

    Returns ``(seen, errors)``.  ``seen`` maps every backend tag to at most
    ``_MAX_SEEN`` ids (an empty list when the backend failed).  ``errors``
    maps the tags of failed backends to their exception text.
    """
    loop = asyncio.get_running_loop()
    seen: dict[str, list[str]] = {}
    errors: dict[str, str] = {}
    for tag, backend in _BACKENDS.items():
        try:
            items = await loop.run_in_executor(None, backend, keyword)
            ids = [item["id"] for item in items]
        except Exception as exc:
            seen[tag] = []
            errors[tag] = str(exc)
            continue
        seen[tag] = ids[-_MAX_SEEN:]
    return seen, errors


async def _alert_add(bot, message, parts: list[str]) -> None:
    """Handle `!alert add <name> <keyword...>` (admin, channel only)."""
    if not bot._is_admin(message):
        await bot.reply(message, "Permission denied: add requires admin")
        return
    if not message.is_channel:
        await bot.reply(message, "Use this command in a channel")
        return
    if len(parts) < 4:
        await bot.reply(message, "Usage: !alert add <name> <keyword...>")
        return

    name = parts[2].lower()
    keyword = parts[3]
    if not _validate_name(name):
        await bot.reply(
            message,
            "Invalid name (lowercase alphanumeric + hyphens, 1-20 chars)",
        )
        return
    if len(keyword) > _MAX_KEYWORD_LEN:
        await bot.reply(message, f"Keyword too long (max {_MAX_KEYWORD_LEN} chars)")
        return

    irc_channel = message.target
    key = _state_key(irc_channel, name)
    if _load(bot, key) is not None:
        await bot.reply(message, f"Alert '{name}' already exists in this channel")
        return

    ch_prefix = f"{irc_channel}:"
    count = sum(1 for k in bot.state.keys("alert") if k.startswith(ch_prefix))
    if count >= _MAX_SUBS:
        await bot.reply(message, f"Alert limit reached ({_MAX_SUBS})")
        return

    # Seed: query all backends to populate seen lists (prevents flood).
    seen, seed_errors = await _seed_seen(keyword)

    # Seeding awaits network I/O, so a concurrent add of the same name may
    # have completed meanwhile; don't silently overwrite it.
    if _load(bot, key) is not None:
        await bot.reply(message, f"Alert '{name}' already exists in this channel")
        return

    now = datetime.now(timezone.utc).isoformat()
    data = {
        "keyword": keyword,
        "name": name,
        "channel": irc_channel,
        "interval": _DEFAULT_INTERVAL,
        "added_by": message.nick,
        "added_at": now,
        "last_poll": now,
        # Same "tag: message" form _poll_once uses; "" when seeding succeeded.
        "last_error": "; ".join(f"{tag}: {err}" for tag, err in seed_errors.items()),
        "seen": seen,
    }
    _save(bot, key, data)
    _subscriptions[key] = data
    _start_poller(bot, key)

    counts = ", ".join(f"{len(seen.get(t, []))} {t}" for t in _BACKENDS)
    reply = f"Alert '{name}' added for: {keyword} ({counts} existing)"
    if seed_errors:
        # A backend that failed to seed starts with an empty seen list, so its
        # first successful poll will announce results that already existed.
        reply += f" -- seeding failed for: {', '.join(seed_errors)}"
    await bot.reply(message, reply)


async def _alert_del(bot, message, parts: list[str]) -> None:
    """Handle `!alert del <name>` (admin, channel only)."""
    if not bot._is_admin(message):
        await bot.reply(message, "Permission denied: del requires admin")
        return
    if not message.is_channel:
        await bot.reply(message, "Use this command in a channel")
        return
    if len(parts) < 3:
        await bot.reply(message, "Usage: !alert del <name>")
        return

    name = parts[2].lower()
    key = _state_key(message.target, name)
    if _load(bot, key) is None:
        await bot.reply(message, f"No alert '{name}' in this channel")
        return

    _stop_poller(key)
    _delete(bot, key)
    await bot.reply(message, f"Removed '{name}'")


# Subcommand word (lower-cased) -> handler(bot, message, parts).
_ALERT_HANDLERS = {
    "list": _alert_list,
    "check": _alert_check,
    "add": _alert_add,
    "del": _alert_del,
}
|