Search Reddit posts (rd) via JSON API and Mastodon hashtag timelines (ft) across 4 fediverse instances. Both public, no auth required. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
860 lines
28 KiB
Python
"""Plugin: keyword alert subscriptions across multiple platforms."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import re
|
|
import sqlite3
|
|
import urllib.request
|
|
from datetime import datetime, timezone
|
|
from html.parser import HTMLParser
|
|
from pathlib import Path
|
|
|
|
from derp.http import urlopen as _urlopen
|
|
from derp.plugin import command, event
|
|
|
|
_log = logging.getLogger(__name__)

# -- Constants ---------------------------------------------------------------

# Alert names: 1-20 chars, lowercase alphanumeric plus hyphens, must not
# start with a hyphen.
_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,19}$")
_MAX_KEYWORD_LEN = 100  # max length of a search keyword
_MAX_SEEN = 200  # per-backend cap on remembered item ids
_DEFAULT_INTERVAL = 300  # seconds between polls for a new alert
_MAX_INTERVAL = 3600  # ceiling for the error-backoff poll interval
_FETCH_TIMEOUT = 15  # seconds; HTTP timeout for backend searches
_MAX_TITLE_LEN = 80  # announced titles are truncated to this length
_MAX_SUBS = 20  # per-channel limit on alert subscriptions
_YT_SEARCH_URL = "https://www.youtube.com/youtubei/v1/search"
_YT_CLIENT_VERSION = "2.20250101.00.00"  # InnerTube WEB client version
_GQL_URL = "https://gql.twitch.tv/gql"
# Client id sent with unauthenticated GQL requests (see _search_twitch,
# which calls the "public GQL" endpoint with no auth token).
_GQL_CLIENT_ID = "kimne78kx3ncx6brgo4mv6wki5h1ko"
_SEARX_URL = "https://searx.mymx.me/search"
_REDDIT_SEARCH_URL = "https://old.reddit.com/search.json"
# Fediverse instances queried via their public hashtag timeline API.
_MASTODON_INSTANCES = [
    "mastodon.social",
    "fosstodon.org",
    "hachyderm.io",
    "infosec.exchange",
]
_MASTODON_TAG_TIMEOUT = 4  # seconds; keep per-instance stalls short

# -- Module-level tracking ---------------------------------------------------

_pollers: dict[str, asyncio.Task] = {}  # state key -> running poll task
_subscriptions: dict[str, dict] = {}  # state key -> cached alert record
_errors: dict[str, int] = {}  # state key -> consecutive error-cycle count

# -- History database --------------------------------------------------------

_DB_PATH = Path("data/alert_history.db")
_conn: sqlite3.Connection | None = None  # lazily opened by _db()
|
|
|
|
|
|
def _db() -> sqlite3.Connection:
    """Return the shared history DB connection, creating it on first use.

    On the first call this creates the parent directory, opens the SQLite
    file, and ensures the ``results`` table and its (channel, alert) lookup
    index exist. Subsequent calls return the cached connection.
    """
    global _conn
    if _conn is None:
        _DB_PATH.parent.mkdir(parents=True, exist_ok=True)
        conn = sqlite3.connect(str(_DB_PATH))
        conn.execute("""
        CREATE TABLE IF NOT EXISTS results (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            channel TEXT NOT NULL,
            alert TEXT NOT NULL,
            backend TEXT NOT NULL,
            item_id TEXT NOT NULL,
            title TEXT NOT NULL,
            url TEXT NOT NULL,
            date TEXT NOT NULL DEFAULT '',
            found_at TEXT NOT NULL
        )
        """)
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_results_alert ON results(channel, alert)"
        )
        conn.commit()
        _conn = conn
    return _conn
|
|
|
|
|
|
def _save_result(channel: str, alert: str, backend: str, item: dict) -> None:
    """Record one matched item in the persistent history table.

    Missing item fields default to empty strings; the row is stamped with
    the current UTC time and committed immediately.
    """
    row = (
        channel,
        alert,
        backend,
        item.get("id", ""),
        item.get("title", ""),
        item.get("url", ""),
        item.get("date", ""),
        datetime.now(timezone.utc).isoformat(),
    )
    conn = _db()
    conn.execute(
        "INSERT INTO results (channel, alert, backend, item_id, title, url, date, found_at)"
        " VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
        row,
    )
    conn.commit()
|
|
|
|
|
|
# -- Pure helpers ------------------------------------------------------------
|
|
|
|
def _state_key(channel: str, name: str) -> str:
|
|
"""Build composite state key."""
|
|
return f"{channel}:{name}"
|
|
|
|
|
|
def _validate_name(name: str) -> bool:
    """Validate an alert name: 1-20 lowercase alphanumerics/hyphens.

    Uses ``fullmatch`` rather than ``match``: with ``match``, the pattern's
    ``$`` anchor also matches just before a trailing newline, so a name like
    ``"abc\n"`` would (incorrectly) pass validation.
    """
    return _NAME_RE.fullmatch(name) is not None
|
|
|
|
|
|
def _truncate(text: str, max_len: int = _MAX_TITLE_LEN) -> str:
    """Clip *text* to at most *max_len* characters, ending with '...'.

    Text already within the limit is returned unchanged; otherwise the tail
    is dropped (trailing whitespace stripped) and an ellipsis appended.
    """
    if len(text) > max_len:
        return text[: max_len - 3].rstrip() + "..."
    return text
|
|
|
|
|
|
# <meta> property/name values (lowercased) that may carry a publish or
# update date; _OGParser records the first non-empty one it encounters.
_DATE_PROPS = {
    "article:published_time", "og:article:published_time",
    "og:updated_time", "date", "dc.date", "dcterms.date",
    "sailthru.date",
}
|
|
|
|
|
|
class _OGParser(HTMLParser):
    """Collect og:title, og:description, and a publish date from <meta> tags.

    Attributes are plain strings, left empty when the page carries no
    matching tag. The first date-bearing meta tag wins.
    """

    def __init__(self):
        super().__init__()
        self.og_title = ""
        self.og_description = ""
        self.published = ""

    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
        if tag != "meta":
            return
        lowered = {key.lower(): (val or "") for key, val in attrs}
        prop = lowered.get("property", "").lower()
        meta_name = lowered.get("name", "").lower()
        content = lowered.get("content", "")
        if prop == "og:title":
            self.og_title = content
        elif prop == "og:description":
            self.og_description = content
        # Record only the first non-empty date candidate we see.
        if content and not self.published:
            if prop in _DATE_PROPS or meta_name in _DATE_PROPS:
                self.published = content
|
|
|
|
|
|
# Timeout and read cap for opportunistic OpenGraph metadata fetches.
_OG_TIMEOUT = 10
_OG_MAX_BYTES = 64 * 1024  # Only read first 64 KB (OG tags are in <head>)
|
|
|
|
|
|
def _parse_date(raw: str) -> str:
|
|
"""Try to extract a YYYY-MM-DD date from a raw date string."""
|
|
m = re.search(r"\d{4}-\d{2}-\d{2}", raw)
|
|
return m.group(0) if m else ""
|
|
|
|
|
|
def _strip_html(text: str) -> str:
|
|
"""Remove HTML tags from text."""
|
|
return re.sub(r"<[^>]+>", "", text).strip()
|
|
|
|
|
|
def _fetch_og(url: str) -> tuple[str, str, str]:
    """Best-effort fetch of OpenGraph metadata for *url*.

    Reads at most the first ``_OG_MAX_BYTES`` of the page (OG tags live in
    <head>) and parses it for og:title, og:description, and a publish date.

    Returns (og_title, og_description, date). Empty strings on failure.
    """
    try:
        request = urllib.request.Request(url, method="GET")
        request.add_header("User-Agent", "Mozilla/5.0 (compatible; derp-bot)")
        response = _urlopen(request, timeout=_OG_TIMEOUT)
        head = response.read(_OG_MAX_BYTES)
        response.close()
        parser = _OGParser()
        parser.feed(head.decode("utf-8", errors="replace"))
        return parser.og_title, parser.og_description, _parse_date(parser.published)
    except Exception as exc:
        # Metadata is optional, so any failure degrades to empty values.
        _log.debug("og fetch failed for %s: %s", url, exc)
        return "", "", ""
|
|
|
|
|
|
# -- YouTube InnerTube search (blocking) ------------------------------------
|
|
|
|
def _extract_videos(obj: object, depth: int = 0) -> list[dict]:
|
|
"""Recursively walk YouTube JSON to find video results.
|
|
|
|
Finds all objects containing both 'videoId' and 'title' keys.
|
|
Resilient to YouTube rearranging wrapper layers.
|
|
"""
|
|
if depth > 20:
|
|
return []
|
|
results = []
|
|
if isinstance(obj, dict):
|
|
video_id = obj.get("videoId")
|
|
title_obj = obj.get("title")
|
|
if isinstance(video_id, str) and video_id and title_obj is not None:
|
|
if isinstance(title_obj, dict):
|
|
runs = title_obj.get("runs", [])
|
|
title = "".join(r.get("text", "") for r in runs if isinstance(r, dict))
|
|
elif isinstance(title_obj, str):
|
|
title = title_obj
|
|
else:
|
|
title = ""
|
|
if title:
|
|
# Extract relative publish time (e.g. "2 days ago")
|
|
pub_obj = obj.get("publishedTimeText")
|
|
date = ""
|
|
if isinstance(pub_obj, dict):
|
|
date = pub_obj.get("simpleText", "")
|
|
elif isinstance(pub_obj, str):
|
|
date = pub_obj
|
|
results.append({
|
|
"id": video_id,
|
|
"title": title,
|
|
"url": f"https://www.youtube.com/watch?v={video_id}",
|
|
"date": date,
|
|
"extra": "",
|
|
})
|
|
for val in obj.values():
|
|
results.extend(_extract_videos(val, depth + 1))
|
|
elif isinstance(obj, list):
|
|
for item in obj:
|
|
results.extend(_extract_videos(item, depth + 1))
|
|
return results
|
|
|
|
|
|
def _search_youtube(keyword: str) -> list[dict]:
    """Search YouTube for *keyword* via the public InnerTube API. Blocking.

    Posts a minimal WEB-client context plus the query, recursively extracts
    video entries from the response, and returns them deduplicated by
    videoId in the order YouTube produced them.
    """
    payload = json.dumps({
        "context": {
            "client": {
                "clientName": "WEB",
                "clientVersion": _YT_CLIENT_VERSION,
            },
        },
        "query": keyword,
    }).encode()

    req = urllib.request.Request(_YT_SEARCH_URL, data=payload, method="POST")
    req.add_header("Content-Type", "application/json")

    # Use the shared derp.http wrapper like every other backend (previously
    # this called urllib.request.urlopen directly, bypassing the project's
    # HTTP policy).
    resp = _urlopen(req, timeout=_FETCH_TIMEOUT)
    try:
        raw = resp.read()
    finally:
        # Close even if read() fails, so the connection is not leaked.
        resp.close()

    data = json.loads(raw)
    videos = _extract_videos(data)
    # Deduplicate by videoId: the same video can appear in several sections.
    seen_ids: set[str] = set()
    unique: list[dict] = []
    for video in videos:
        if video["id"] not in seen_ids:
            seen_ids.add(video["id"])
            unique.append(video)
    return unique
|
|
|
|
|
|
# -- Twitch GQL search (blocking) ------------------------------------------
|
|
|
|
def _search_twitch(keyword: str) -> list[dict]:
    """Search Twitch via public GQL. Blocking.

    Issues one unauthenticated GQL query covering both live streams and
    VODs, and returns item dicts (id/title/url/date/extra). Ids are
    namespaced ("stream:<id>" / "vod:<id>") so the two result types never
    collide in the per-backend seen-list.
    """
    # Escape backslashes and double quotes so the keyword can be embedded
    # inside the GQL string literal without breaking the query.
    query = (
        'query{searchFor(userQuery:"'
        + keyword.replace("\\", "\\\\").replace('"', '\\"')
        + '",options:{targets:[{index:STREAM},{index:VOD}]})'
        "{streams{items{id broadcaster{login displayName}title game{name}"
        "viewersCount}}videos{items{id owner{login displayName}title"
        " game{name}viewCount}}}}"
    )
    body = json.dumps({"query": query}).encode()

    req = urllib.request.Request(_GQL_URL, data=body, method="POST")
    req.add_header("Client-Id", _GQL_CLIENT_ID)
    req.add_header("Content-Type", "application/json")

    resp = _urlopen(req, timeout=_FETCH_TIMEOUT)
    raw = resp.read()
    resp.close()

    data = json.loads(raw)
    results: list[dict] = []

    # The GQL envelope may omit or null out the searchFor payload; treat
    # any malformed response as "no results" rather than an error.
    try:
        search = data["data"]["searchFor"]
    except (KeyError, TypeError):
        return results
    if not search:
        return results

    # Live streams
    streams = search.get("streams") or {}
    for item in streams.get("items") or []:
        stream_id = str(item.get("id", ""))
        if not stream_id:
            continue
        broadcaster = item.get("broadcaster") or {}
        login = broadcaster.get("login", "")
        display = broadcaster.get("displayName", login)
        title = item.get("title", "")
        game = (item.get("game") or {}).get("name", "")
        line = f"{display} is live: {title}"
        if game:
            line += f" ({game})"
        results.append({
            "id": f"stream:{stream_id}",
            "title": line,
            "url": f"https://twitch.tv/{login}",
            "date": "",
            "extra": "",
        })

    # VODs
    videos = search.get("videos") or {}
    for item in videos.get("items") or []:
        vod_id = str(item.get("id", ""))
        if not vod_id:
            continue
        title = item.get("title", "")
        results.append({
            "id": f"vod:{vod_id}",
            "title": title,
            "url": f"https://twitch.tv/videos/{vod_id}",
            "date": "",
            "extra": "",
        })

    return results
|
|
|
|
|
|
# -- SearXNG search (blocking) ----------------------------------------------
|
|
|
|
def _search_searx(keyword: str) -> list[dict]:
    """Search the configured SearXNG instance for *keyword*. Blocking.

    Returns item dicts (id/title/url/date/extra); SearX exposes no stable
    result ids, so the result URL doubles as the dedup id.
    """
    import urllib.parse

    params = urllib.parse.urlencode({"q": keyword, "format": "json"})
    url = f"{_SEARX_URL}?{params}"

    req = urllib.request.Request(url, method="GET")
    # Use the shared derp.http wrapper like the other backends (previously
    # this called urllib.request.urlopen directly, bypassing the project's
    # HTTP policy).
    resp = _urlopen(req, timeout=_FETCH_TIMEOUT)
    try:
        raw = resp.read()
    finally:
        # Close even if read() fails, so the connection is not leaked.
        resp.close()

    data = json.loads(raw)
    results: list[dict] = []
    for item in data.get("results", []):
        item_url = item.get("url", "")
        results.append({
            "id": item_url,  # the URL serves as the stable id
            "title": item.get("title", ""),
            "url": item_url,
            "date": _parse_date(item.get("publishedDate") or ""),
            "extra": "",
        })
    return results
|
|
|
|
|
|
# -- Reddit search (blocking) ------------------------------------------------
|
|
|
|
def _search_reddit(keyword: str) -> list[dict]:
    """Search Reddit's public JSON API for recent posts. Blocking.

    Queries old.reddit.com for the newest matches from the last week
    (up to 25) and returns item dicts (id/title/url/date/extra). The post
    fullname ("name" field) is the dedup id; the date is derived from the
    post's UTC creation timestamp when present.
    """
    import urllib.parse

    query = urllib.parse.urlencode({
        "q": keyword, "sort": "new", "limit": "25", "t": "week",
    })
    req = urllib.request.Request(f"{_REDDIT_SEARCH_URL}?{query}", method="GET")
    req.add_header("User-Agent", "derp-bot/1.0 (IRC keyword alert)")

    resp = _urlopen(req, timeout=_FETCH_TIMEOUT)
    raw = resp.read()
    resp.close()

    payload = json.loads(raw)
    found: list[dict] = []
    children = (payload.get("data") or {}).get("children") or []
    for child in children:
        post = child.get("data") or {}
        permalink = post.get("permalink", "")
        created = post.get("created_utc")
        date = ""
        if created:
            try:
                stamp = datetime.fromtimestamp(float(created), tz=timezone.utc)
                date = stamp.strftime("%Y-%m-%d")
            except (ValueError, OSError):
                pass  # unparseable timestamp: leave date empty
        found.append({
            "id": post.get("name", ""),
            "title": post.get("title", ""),
            "url": f"https://www.reddit.com{permalink}" if permalink else "",
            "date": date,
            "extra": "",
        })
    return found
|
|
|
|
|
|
# -- Mastodon/Fediverse search (blocking) -----------------------------------
|
|
|
|
def _search_mastodon(keyword: str) -> list[dict]:
    """Search Mastodon instances via public hashtag timeline. Blocking.

    The keyword is flattened to an alphanumeric hashtag, then each instance
    in _MASTODON_INSTANCES is queried best-effort with a short timeout;
    failing instances are skipped. Statuses are deduplicated across
    instances by status URL (federation means the same status can surface
    on several instances). Returns item dicts (id/title/url/date/extra).
    """
    import urllib.parse

    # Sanitize keyword to alphanumeric for hashtag search
    hashtag = re.sub(r"[^a-zA-Z0-9]", "", keyword).lower()
    if not hashtag:
        return []

    results: list[dict] = []
    seen_urls: set[str] = set()

    for instance in _MASTODON_INSTANCES:
        tag_url = (
            f"https://{instance}/api/v1/timelines/tag/"
            f"{urllib.parse.quote(hashtag, safe='')}"
        )
        req = urllib.request.Request(tag_url, method="GET")
        req.add_header("User-Agent", "derp-bot/1.0 (IRC keyword alert)")
        try:
            resp = _urlopen(req, timeout=_MASTODON_TAG_TIMEOUT)
            raw = resp.read()
            resp.close()
        except Exception as exc:
            # Best effort: one slow/down instance must not kill the search.
            _log.debug("mastodon %s failed: %s", instance, exc)
            continue

        try:
            statuses = json.loads(raw)
        except json.JSONDecodeError:
            continue

        # The timeline endpoint returns a JSON array; anything else (e.g.
        # an error object) is ignored.
        if not isinstance(statuses, list):
            continue

        for status in statuses:
            status_url = status.get("url") or status.get("uri", "")
            if not status_url or status_url in seen_urls:
                continue
            seen_urls.add(status_url)

            acct = (status.get("account") or {}).get("acct", "")
            content = _strip_html(status.get("content", ""))
            title = f"@{acct}: {_truncate(content, 60)}" if acct else content
            date = _parse_date(status.get("created_at", ""))
            results.append({
                "id": status_url,
                "title": title,
                "url": status_url,
                "date": date,
                "extra": "",
            })

    return results
|
|
|
|
|
|
# -- Backend registry -------------------------------------------------------
|
|
|
|
# Maps a short backend tag (shown in announcements as "[name/tag]") to its
# blocking search function. Every function takes a keyword string and
# returns a list of dicts with keys: id, title, url, date, extra.
_BACKENDS: dict[str, callable] = {
    "yt": _search_youtube,
    "tw": _search_twitch,
    "sx": _search_searx,
    "rd": _search_reddit,
    "ft": _search_mastodon,
}
|
|
|
|
|
|
# -- State helpers -----------------------------------------------------------
|
|
|
|
def _save(bot, key: str, data: dict) -> None:
|
|
"""Persist subscription data to bot.state."""
|
|
bot.state.set("alert", key, json.dumps(data))
|
|
|
|
|
|
def _load(bot, key: str) -> dict | None:
|
|
"""Load subscription data from bot.state."""
|
|
raw = bot.state.get("alert", key)
|
|
if raw is None:
|
|
return None
|
|
try:
|
|
return json.loads(raw)
|
|
except json.JSONDecodeError:
|
|
return None
|
|
|
|
|
|
def _delete(bot, key: str) -> None:
|
|
"""Remove subscription data from bot.state."""
|
|
bot.state.delete("alert", key)
|
|
|
|
|
|
# -- Polling -----------------------------------------------------------------
|
|
|
|
async def _poll_once(bot, key: str, announce: bool = True) -> None:
    """Single poll cycle for one alert subscription (all backends).

    Runs every backend's blocking search in the default executor, filters
    out item ids already in the per-backend seen-list, keeps only results
    that actually mention the keyword (checking title/URL first, then
    falling back to fetched og:title/og:description), announces matches to
    the subscription's channel when *announce* is true, records them in the
    history DB, and persists the updated seen-lists and error state.

    The subscription dict is mutated in place and cached in _subscriptions.
    """
    data = _subscriptions.get(key)
    if data is None:
        # Not cached yet (e.g. first cycle after restart): load from state.
        data = _load(bot, key)
        if data is None:
            return
        _subscriptions[key] = data

    keyword = data["keyword"]
    now = datetime.now(timezone.utc).isoformat()
    data["last_poll"] = now

    had_error = False
    loop = asyncio.get_running_loop()

    for tag, backend in _BACKENDS.items():
        try:
            # Backends are blocking (urllib); run them off the event loop.
            items = await loop.run_in_executor(None, backend, keyword)
        except Exception as exc:
            # One failing backend shouldn't stop the others; remember the
            # last error for "!alert list" / "!alert check" reporting.
            data["last_error"] = f"{tag}: {exc}"
            had_error = True
            continue

        # Set for O(1) membership tests; list preserves insertion order for
        # the persisted, capped seen history.
        seen_set = set(data.get("seen", {}).get(tag, []))
        seen_list = list(data.get("seen", {}).get(tag, []))
        new_items = [item for item in items if item["id"] not in seen_set]

        # Filter: only announce results that actually contain the keyword
        # Check title/URL first, then fall back to og:title/og:description
        kw_lower = keyword.lower()
        matched = []
        for item in new_items:
            title_l = item.get("title", "").lower()
            url_l = item.get("url", "").lower()
            if kw_lower in title_l or kw_lower in url_l:
                # Fetch OG tags for date if backend didn't provide one
                if not item.get("date") and item.get("url"):
                    _, _, og_date = await loop.run_in_executor(
                        None, _fetch_og, item["url"],
                    )
                    if og_date:
                        item["date"] = og_date
                matched.append(item)
                continue
            # Fetch OG tags for items that didn't match on title/URL
            item_url = item.get("url", "")
            if item_url:
                og_title, og_desc, og_date = await loop.run_in_executor(
                    None, _fetch_og, item_url,
                )
                if (kw_lower in og_title.lower()
                        or kw_lower in og_desc.lower()):
                    # Prefer the (usually fuller) OG title over the
                    # backend-provided one when it is longer.
                    if og_title and len(og_title) > len(item.get("title", "")):
                        item["title"] = og_title
                    if og_date and not item.get("date"):
                        item["date"] = og_date
                    matched.append(item)

        if announce and matched:
            channel = data["channel"]
            name = data["name"]
            for item in matched:
                title = _truncate(item["title"]) if item["title"] else "(no title)"
                url = item["url"]
                date = item.get("date", "")
                line = f"[{name}/{tag}]"
                if date:
                    line += f" ({date})"
                line += f" {title}"
                if url:
                    line += f" -- {url}"
                await bot.send(channel, line)
                _save_result(channel, name, tag, item)

        # All new ids become "seen" — including non-matches — so the same
        # item is never re-fetched or re-evaluated on later cycles.
        for item in new_items:
            seen_list.append(item["id"])
        if len(seen_list) > _MAX_SEEN:
            seen_list = seen_list[-_MAX_SEEN:]
        data.setdefault("seen", {})[tag] = seen_list

    if had_error:
        _errors[key] = _errors.get(key, 0) + 1
    else:
        # Fully clean cycle: clear the error flag and the backoff counter.
        data["last_error"] = ""
        _errors[key] = 0

    _subscriptions[key] = data
    _save(bot, key, data)
|
|
|
|
|
|
async def _poll_loop(bot, key: str) -> None:
    """Poll one subscription forever, sleeping its interval between cycles.

    After five or more consecutive error cycles the wait doubles (capped
    at _MAX_INTERVAL) to back off from failing backends. Exits quietly when
    cancelled or when the subscription record disappears.
    """
    try:
        while True:
            data = _subscriptions.get(key) or _load(bot, key)
            if data is None:
                return
            delay = data.get("interval", _DEFAULT_INTERVAL)
            if _errors.get(key, 0) >= 5:
                delay = min(delay * 2, _MAX_INTERVAL)
            await asyncio.sleep(delay)
            await _poll_once(bot, key, announce=True)
    except asyncio.CancelledError:
        pass
|
|
|
|
|
|
def _start_poller(bot, key: str) -> None:
    """Spawn a poll task for *key* unless a live one already exists."""
    current = _pollers.get(key)
    if current is not None and not current.done():
        return  # already running
    _pollers[key] = asyncio.create_task(_poll_loop(bot, key))
|
|
|
|
|
|
def _stop_poller(key: str) -> None:
    """Cancel the poller for *key* and drop its cached tracking state."""
    task = _pollers.pop(key, None)
    if task is not None and not task.done():
        task.cancel()
    _subscriptions.pop(key, None)
    _errors.pop(key, 0)
|
|
|
|
|
|
# -- Restore on connect -----------------------------------------------------
|
|
|
|
def _restore(bot) -> None:
    """Recreate pollers for every persisted alert subscription."""
    for key in bot.state.keys("alert"):
        running = _pollers.get(key)
        if running is not None and not running.done():
            continue  # already being polled
        data = _load(bot, key)
        if data is None:
            continue  # missing or corrupt record
        _subscriptions[key] = data
        _start_poller(bot, key)
|
|
|
|
|
|
@event("001")
async def on_connect(bot, message):
    """Restore alert subscription pollers on connect.

    Numeric 001 is sent by the IRC server once registration completes, so
    pollers are rebuilt from persisted state after every (re)connect.
    """
    _restore(bot)
|
|
|
|
|
|
# -- Command handler ---------------------------------------------------------
|
|
|
|
async def _alert_list(bot, message, parts) -> None:
    """List this channel's alerts, flagging those whose last poll errored."""
    channel = message.target
    prefix = f"{channel}:"
    subs = []
    for key in bot.state.keys("alert"):
        if not key.startswith(prefix):
            continue
        data = _load(bot, key)
        if not data:
            continue
        name = data["name"]
        subs.append(f"{name} (error)" if data.get("last_error", "") else name)
    if not subs:
        await bot.reply(message, "No alerts in this channel")
        return
    await bot.reply(message, f"Alerts: {', '.join(subs)}")


async def _alert_check(bot, message, parts) -> None:
    """Force an immediate poll of one alert and report the outcome."""
    if len(parts) < 3:
        await bot.reply(message, "Usage: !alert check <name>")
        return
    name = parts[2].lower()
    key = _state_key(message.target, name)
    data = _load(bot, key)
    if data is None:
        await bot.reply(message, f"No alert '{name}' in this channel")
        return
    _subscriptions[key] = data
    await _poll_once(bot, key, announce=True)
    # _poll_once mutates the cached record; re-read it for error reporting.
    data = _subscriptions.get(key, data)
    if data.get("last_error"):
        await bot.reply(message, f"{name}: error -- {data['last_error']}")
    else:
        await bot.reply(message, f"{name}: checked")


async def _alert_history(bot, message, parts) -> None:
    """Show the most recent stored results for one alert (default 5)."""
    if len(parts) < 3:
        await bot.reply(message, "Usage: !alert history <name> [n]")
        return
    name = parts[2].lower()
    channel = message.target
    key = _state_key(channel, name)
    if _load(bot, key) is None:
        await bot.reply(message, f"No alert '{name}' in this channel")
        return
    limit = 5
    if len(parts) >= 4:
        try:
            limit = max(1, min(int(parts[3]), 20))
        except ValueError:
            limit = 5  # non-numeric count: fall back to the default
    rows = _db().execute(
        "SELECT backend, title, url, date, found_at FROM results"
        " WHERE channel = ? AND alert = ? ORDER BY id DESC LIMIT ?",
        (channel, name, limit),
    ).fetchall()
    if not rows:
        await bot.reply(message, f"{name}: no history yet")
        return
    # Rows come newest-first; replay oldest-first so the channel reads
    # chronologically.
    for backend, title, url, date, found_at in reversed(rows):
        ts = found_at[:10]
        title = _truncate(title) if title else "(no title)"
        line = f"[{name}/{backend}] ({date or ts}) {title}"
        if url:
            line += f" -- {url}"
        await bot.reply(message, line)


async def _alert_add(bot, message, parts) -> None:
    """Create a new alert: validate, seed seen-lists, persist, start poller."""
    if len(parts) < 4:
        await bot.reply(message, "Usage: !alert add <name> <keyword...>")
        return

    name = parts[2].lower()
    keyword = parts[3]

    if not _validate_name(name):
        await bot.reply(
            message,
            "Invalid name (lowercase alphanumeric + hyphens, 1-20 chars)",
        )
        return
    if len(keyword) > _MAX_KEYWORD_LEN:
        await bot.reply(message, f"Keyword too long (max {_MAX_KEYWORD_LEN} chars)")
        return

    irc_channel = message.target
    key = _state_key(irc_channel, name)
    if _load(bot, key) is not None:
        await bot.reply(message, f"Alert '{name}' already exists in this channel")
        return

    ch_prefix = f"{irc_channel}:"
    count = sum(1 for k in bot.state.keys("alert") if k.startswith(ch_prefix))
    if count >= _MAX_SUBS:
        await bot.reply(message, f"Alert limit reached ({_MAX_SUBS})")
        return

    # Seed: query all backends once so current results are marked seen and
    # the first real poll doesn't flood the channel with old matches.
    loop = asyncio.get_running_loop()
    seen: dict[str, list[str]] = {}
    for tag, backend in _BACKENDS.items():
        try:
            items = await loop.run_in_executor(None, backend, keyword)
            ids = [item["id"] for item in items]
            if len(ids) > _MAX_SEEN:
                ids = ids[-_MAX_SEEN:]
            seen[tag] = ids
        except Exception:
            # Backend unavailable right now; the seen-list fills in on
            # later polls instead.
            seen[tag] = []

    now = datetime.now(timezone.utc).isoformat()
    data = {
        "keyword": keyword,
        "name": name,
        "channel": irc_channel,
        "interval": _DEFAULT_INTERVAL,
        "added_by": message.nick,
        "added_at": now,
        "last_poll": now,
        "last_error": "",
        "seen": seen,
    }
    _save(bot, key, data)
    _subscriptions[key] = data
    _start_poller(bot, key)

    counts = ", ".join(f"{len(seen.get(t, []))} {t}" for t in _BACKENDS)
    await bot.reply(
        message,
        f"Alert '{name}' added for: {keyword} ({counts} existing)",
    )


async def _alert_del(bot, message, parts) -> None:
    """Remove an alert: stop its poller and delete the persisted record."""
    if len(parts) < 3:
        await bot.reply(message, "Usage: !alert del <name>")
        return
    name = parts[2].lower()
    key = _state_key(message.target, name)
    if _load(bot, key) is None:
        await bot.reply(message, f"No alert '{name}' in this channel")
        return
    _stop_poller(key)
    _delete(bot, key)
    await bot.reply(message, f"Removed '{name}'")


@command("alert", help="Alert: !alert add|del|list|check|history")
async def cmd_alert(bot, message):
    """Per-channel keyword alert subscriptions across platforms.

    Usage:
        !alert add <name> <keyword...>   Add keyword alert (admin)
        !alert del <name>                Remove alert (admin)
        !alert list                      List alerts
        !alert check <name>              Force-poll now
        !alert history <name> [n]        Show recent results (default 5)
    """
    # maxsplit=3 keeps a multi-word keyword intact in parts[3] for 'add'.
    parts = message.text.split(None, 3)
    if len(parts) < 2:
        await bot.reply(message, "Usage: !alert <add|del|list|check|history> [args]")
        return

    sub = parts[1].lower()
    handlers = {
        "list": _alert_list,
        "check": _alert_check,
        "history": _alert_history,
        "add": _alert_add,
        "del": _alert_del,
    }
    handler = handlers.get(sub)
    if handler is None:
        await bot.reply(message, "Usage: !alert <add|del|list|check|history> [args]")
        return

    # add/del mutate state and are admin-only; the rest are open to anyone.
    if sub in ("add", "del") and not bot._is_admin(message):
        await bot.reply(message, f"Permission denied: {sub} requires admin")
        return
    # Every subcommand operates on per-channel state.
    if not message.is_channel:
        await bot.reply(message, "Use this command in a channel")
        return

    await handler(bot, message, parts)
|