diff --git a/plugins/alert.py b/plugins/alert.py new file mode 100644 index 0000000..ffbb39b --- /dev/null +++ b/plugins/alert.py @@ -0,0 +1,511 @@ +"""Plugin: keyword alert subscriptions across multiple platforms.""" + +from __future__ import annotations + +import asyncio +import json +import re +import ssl +import urllib.request +from datetime import datetime, timezone + +from derp.plugin import command, event + +# -- Constants --------------------------------------------------------------- + +_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,19}$") +_MAX_KEYWORD_LEN = 100 +_MAX_SEEN = 200 +_MAX_ANNOUNCE = 5 +_DEFAULT_INTERVAL = 300 +_MAX_INTERVAL = 3600 +_FETCH_TIMEOUT = 15 +_MAX_TITLE_LEN = 80 +_MAX_SUBS = 20 +_YT_SEARCH_URL = "https://www.youtube.com/youtubei/v1/search" +_YT_CLIENT_VERSION = "2.20250101.00.00" +_GQL_URL = "https://gql.twitch.tv/gql" +_GQL_CLIENT_ID = "kimne78kx3ncx6brgo4mv6wki5h1ko" + +# -- Module-level tracking --------------------------------------------------- + +_pollers: dict[str, asyncio.Task] = {} +_subscriptions: dict[str, dict] = {} +_errors: dict[str, int] = {} + + +# -- Pure helpers ------------------------------------------------------------ + +def _state_key(channel: str, name: str) -> str: + """Build composite state key.""" + return f"{channel}:{name}" + + +def _validate_name(name: str) -> bool: + """Check name against allowed pattern.""" + return bool(_NAME_RE.match(name)) + + +def _truncate(text: str, max_len: int = _MAX_TITLE_LEN) -> str: + """Truncate text with ellipsis if needed.""" + if len(text) <= max_len: + return text + return text[: max_len - 3].rstrip() + "..." + + +# -- YouTube InnerTube search (blocking) ------------------------------------ + +def _extract_videos(obj: object, depth: int = 0) -> list[dict]: + """Recursively walk YouTube JSON to find video results. + + Finds all objects containing both 'videoId' and 'title' keys. + Resilient to YouTube rearranging wrapper layers. + """ + if depth > 20: + return [] + results = [] + if isinstance(obj, dict): + video_id = obj.get("videoId") + title_obj = obj.get("title") + if isinstance(video_id, str) and video_id and title_obj is not None: + if isinstance(title_obj, dict): + runs = title_obj.get("runs", []) + title = "".join(r.get("text", "") for r in runs if isinstance(r, dict)) + elif isinstance(title_obj, str): + title = title_obj + else: + title = "" + if title: + results.append({ + "id": video_id, + "title": title, + "url": f"https://www.youtube.com/watch?v={video_id}", + "extra": "", + }) + for val in obj.values(): + results.extend(_extract_videos(val, depth + 1)) + elif isinstance(obj, list): + for item in obj: + results.extend(_extract_videos(item, depth + 1)) + return results + + +def _search_youtube(keyword: str) -> list[dict]: + """Search YouTube via InnerTube API. Blocking.""" + payload = json.dumps({ + "context": { + "client": { + "clientName": "WEB", + "clientVersion": _YT_CLIENT_VERSION, + }, + }, + "query": keyword, + }).encode() + + req = urllib.request.Request(_YT_SEARCH_URL, data=payload, method="POST") + req.add_header("Content-Type", "application/json") + + ctx = ssl.create_default_context() + resp = urllib.request.urlopen(req, timeout=_FETCH_TIMEOUT, context=ctx) + raw = resp.read() + resp.close() + + data = json.loads(raw) + videos = _extract_videos(data) + # Deduplicate by videoId (same video can appear in multiple sections) + seen_ids: set[str] = set() + unique: list[dict] = [] + for v in videos: + if v["id"] not in seen_ids: + seen_ids.add(v["id"]) + unique.append(v) + return unique + + +# -- Twitch GQL search (blocking) ------------------------------------------ + +def _search_twitch(keyword: str) -> list[dict]: + """Search Twitch via public GQL. Blocking.""" + query = ( + 'query{searchFor(userQuery:"' + + keyword.replace("\\", "\\\\").replace('"', '\\"') + + '",options:{targets:[{index:STREAM},{index:VOD}]})' + "{streams{items{id broadcaster{login displayName}title game{name}" + "viewersCount}}videos{items{id owner{login displayName}title" + " game{name}viewCount}}}}" + ) + body = json.dumps({"query": query}).encode() + + req = urllib.request.Request(_GQL_URL, data=body, method="POST") + req.add_header("Client-Id", _GQL_CLIENT_ID) + req.add_header("Content-Type", "application/json") + + ctx = ssl.create_default_context() + resp = urllib.request.urlopen(req, timeout=_FETCH_TIMEOUT, context=ctx) + raw = resp.read() + resp.close() + + data = json.loads(raw) + results: list[dict] = [] + + try: + search = data["data"]["searchFor"] + except (KeyError, TypeError): + return results + if not search: + return results + + # Live streams + streams = search.get("streams") or {} + for item in streams.get("items") or []: + stream_id = str(item.get("id", "")) + if not stream_id: + continue + broadcaster = item.get("broadcaster") or {} + login = broadcaster.get("login", "") + display = broadcaster.get("displayName", login) + title = item.get("title", "") + game = (item.get("game") or {}).get("name", "") + line = f"{display} is live: {title}" + if game: + line += f" ({game})" + results.append({ + "id": f"stream:{stream_id}", + "title": line, + "url": f"https://twitch.tv/{login}", + "extra": "", + }) + + # VODs + videos = search.get("videos") or {} + for item in videos.get("items") or []: + vod_id = str(item.get("id", "")) + if not vod_id: + continue + title = item.get("title", "") + results.append({ + "id": f"vod:{vod_id}", + "title": title, + "url": f"https://twitch.tv/videos/{vod_id}", + "extra": "", + }) + + return results + + +# -- Backend registry ------------------------------------------------------- + +_BACKENDS: dict[str, callable] = { + "yt": _search_youtube, + "tw": _search_twitch, +} + + +# -- State helpers ----------------------------------------------------------- + +def _save(bot, key: str, data: dict) -> None: + """Persist subscription data to bot.state.""" + bot.state.set("alert", key, json.dumps(data)) + + +def _load(bot, key: str) -> dict | None: + """Load subscription data from bot.state.""" + raw = bot.state.get("alert", key) + if raw is None: + return None + try: + return json.loads(raw) + except json.JSONDecodeError: + return None + + +def _delete(bot, key: str) -> None: + """Remove subscription data from bot.state.""" + bot.state.delete("alert", key) + + +# -- Polling ----------------------------------------------------------------- + +async def _poll_once(bot, key: str, announce: bool = True) -> None: + """Single poll cycle for one alert subscription (all backends).""" + data = _subscriptions.get(key) + if data is None: + data = _load(bot, key) + if data is None: + return + _subscriptions[key] = data + + keyword = data["keyword"] + now = datetime.now(timezone.utc).isoformat() + data["last_poll"] = now + + had_error = False + loop = asyncio.get_running_loop() + + for tag, backend in _BACKENDS.items(): + try: + items = await loop.run_in_executor(None, backend, keyword) + except Exception as exc: + data["last_error"] = f"{tag}: {exc}" + had_error = True + continue + + seen_set = set(data.get("seen", {}).get(tag, [])) + seen_list = list(data.get("seen", {}).get(tag, [])) + new_items = [item for item in items if item["id"] not in seen_set] + + if announce and new_items: + channel = data["channel"] + name = data["name"] + shown = new_items[:_MAX_ANNOUNCE] + for item in shown: + title = _truncate(item["title"]) if item["title"] else "(no title)" + url = item["url"] + line = f"[{name}/{tag}] {title}" + if url: + line += f" -- {url}" + await bot.send(channel, line) + remaining = len(new_items) - len(shown) + if remaining > 0: + await bot.send(channel, f"[{name}/{tag}] ... and {remaining} more") + + for item in new_items: + seen_list.append(item["id"]) + if len(seen_list) > _MAX_SEEN: + seen_list = seen_list[-_MAX_SEEN:] + data.setdefault("seen", {})[tag] = seen_list + + if had_error: + _errors[key] = _errors.get(key, 0) + 1 + else: + data["last_error"] = "" + _errors[key] = 0 + + _subscriptions[key] = data + _save(bot, key, data) + + +async def _poll_loop(bot, key: str) -> None: + """Infinite poll loop for one alert subscription.""" + try: + while True: + data = _subscriptions.get(key) or _load(bot, key) + if data is None: + return + interval = data.get("interval", _DEFAULT_INTERVAL) + errs = _errors.get(key, 0) + if errs >= 5: + interval = min(interval * 2, _MAX_INTERVAL) + await asyncio.sleep(interval) + await _poll_once(bot, key, announce=True) + except asyncio.CancelledError: + pass + + +def _start_poller(bot, key: str) -> None: + """Create and track a poller task.""" + existing = _pollers.get(key) + if existing and not existing.done(): + return + task = asyncio.create_task(_poll_loop(bot, key)) + _pollers[key] = task + + +def _stop_poller(key: str) -> None: + """Cancel and remove a poller task.""" + task = _pollers.pop(key, None) + if task and not task.done(): + task.cancel() + _subscriptions.pop(key, None) + _errors.pop(key, 0) + + +# -- Restore on connect ----------------------------------------------------- + +def _restore(bot) -> None: + """Rebuild pollers from persisted state.""" + for key in bot.state.keys("alert"): + existing = _pollers.get(key) + if existing and not existing.done(): + continue + data = _load(bot, key) + if data is None: + continue + _subscriptions[key] = data + _start_poller(bot, key) + + +@event("001") +async def on_connect(bot, message): + """Restore alert subscription pollers on connect.""" + _restore(bot) + + +# -- Command handler --------------------------------------------------------- + +@command("alert", help="Alert: !alert add|del|list|check") +async def cmd_alert(bot, message): + """Per-channel keyword alert subscriptions across platforms. + + Usage: + !alert add Add keyword alert (admin) + !alert del Remove alert (admin) + !alert list List alerts + !alert check Force-poll now + """ + parts = message.text.split(None, 3) + if len(parts) < 2: + await bot.reply(message, "Usage: !alert [args]") + return + + sub = parts[1].lower() + + # -- list (any user, channel only) ---------------------------------------- + if sub == "list": + if not message.is_channel: + await bot.reply(message, "Use this command in a channel") + return + channel = message.target + prefix = f"{channel}:" + subs = [] + for key in bot.state.keys("alert"): + if key.startswith(prefix): + data = _load(bot, key) + if data: + name = data["name"] + err = data.get("last_error", "") + if err: + subs.append(f"{name} (error)") + else: + subs.append(name) + if not subs: + await bot.reply(message, "No alerts in this channel") + return + await bot.reply(message, f"Alerts: {', '.join(subs)}") + return + + # -- check (any user, channel only) --------------------------------------- + if sub == "check": + if not message.is_channel: + await bot.reply(message, "Use this command in a channel") + return + if len(parts) < 3: + await bot.reply(message, "Usage: !alert check ") + return + name = parts[2].lower() + channel = message.target + key = _state_key(channel, name) + data = _load(bot, key) + if data is None: + await bot.reply(message, f"No alert '{name}' in this channel") + return + _subscriptions[key] = data + await _poll_once(bot, key, announce=True) + data = _subscriptions.get(key, data) + if data.get("last_error"): + await bot.reply(message, f"{name}: error -- {data['last_error']}") + else: + await bot.reply(message, f"{name}: checked") + return + + # -- add (admin, channel only) ------------------------------------------- + if sub == "add": + if not bot._is_admin(message): + await bot.reply(message, "Permission denied: add requires admin") + return + if not message.is_channel: + await bot.reply(message, "Use this command in a channel") + return + if len(parts) < 4: + await bot.reply(message, "Usage: !alert add ") + return + + name = parts[2].lower() + keyword = parts[3] + + if not _validate_name(name): + await bot.reply( + message, + "Invalid name (lowercase alphanumeric + hyphens, 1-20 chars)", + ) + return + + if len(keyword) > _MAX_KEYWORD_LEN: + await bot.reply(message, f"Keyword too long (max {_MAX_KEYWORD_LEN} chars)") + return + + irc_channel = message.target + key = _state_key(irc_channel, name) + + if _load(bot, key) is not None: + await bot.reply(message, f"Alert '{name}' already exists in this channel") + return + + ch_prefix = f"{irc_channel}:" + count = sum(1 for k in bot.state.keys("alert") if k.startswith(ch_prefix)) + if count >= _MAX_SUBS: + await bot.reply(message, f"Alert limit reached ({_MAX_SUBS})") + return + + # Seed: query all backends to populate seen lists (prevents flood) + loop = asyncio.get_running_loop() + seen: dict[str, list[str]] = {} + for tag, backend in _BACKENDS.items(): + try: + items = await loop.run_in_executor(None, backend, keyword) + ids = [item["id"] for item in items] + if len(ids) > _MAX_SEEN: + ids = ids[-_MAX_SEEN:] + seen[tag] = ids + except Exception: + seen[tag] = [] + + now = datetime.now(timezone.utc).isoformat() + data = { + "keyword": keyword, + "name": name, + "channel": irc_channel, + "interval": _DEFAULT_INTERVAL, + "added_by": message.nick, + "added_at": now, + "last_poll": now, + "last_error": "", + "seen": seen, + } + _save(bot, key, data) + _subscriptions[key] = data + _start_poller(bot, key) + + counts = ", ".join(f"{len(seen.get(t, []))} {t}" for t in _BACKENDS) + await bot.reply( + message, + f"Alert '{name}' added for: {keyword} ({counts} existing)", + ) + return + + # -- del (admin, channel only) ------------------------------------------- + if sub == "del": + if not bot._is_admin(message): + await bot.reply(message, "Permission denied: del requires admin") + return + if not message.is_channel: + await bot.reply(message, "Use this command in a channel") + return + if len(parts) < 3: + await bot.reply(message, "Usage: !alert del ") + return + + name = parts[2].lower() + channel = message.target + key = _state_key(channel, name) + + if _load(bot, key) is None: + await bot.reply(message, f"No alert '{name}' in this channel") + return + + _stop_poller(key) + _delete(bot, key) + await bot.reply(message, f"Removed '{name}'") + return + + await bot.reply(message, "Usage: !alert [args]") diff --git a/tests/test_alert.py b/tests/test_alert.py new file mode 100644 index 0000000..6903c36 --- /dev/null +++ b/tests/test_alert.py @@ -0,0 +1,1165 @@ +"""Tests for the keyword alert subscription plugin.""" + +import asyncio +import importlib.util +import json +import sys +from pathlib import Path +from unittest.mock import patch + +from derp.irc import Message + +# plugins/ is not a Python package -- load the module from file path +_spec = importlib.util.spec_from_file_location( + "plugins.alert", Path(__file__).resolve().parent.parent / "plugins" / "alert.py", +) +_mod = importlib.util.module_from_spec(_spec) +sys.modules[_spec.name] = _mod +_spec.loader.exec_module(_mod) + +from plugins.alert import ( # noqa: E402 + _MAX_ANNOUNCE, + _MAX_SEEN, + _delete, + _errors, + _extract_videos, + _load, + _poll_once, + _pollers, + _restore, + _save, + _search_twitch, + _search_youtube, + _start_poller, + _state_key, + _stop_poller, + _subscriptions, + _truncate, + _validate_name, + cmd_alert, + on_connect, +) + +# -- Fixtures ---------------------------------------------------------------- + +# Minimal InnerTube-style response with two videos +YT_RESPONSE = { + "contents": { + "twoColumnSearchResultsRenderer": { + "primaryContents": { + "sectionListRenderer": { + "contents": [ + { + "itemSectionRenderer": { + "contents": [ + { + "videoRenderer": { + "videoId": "abc123", + "title": { + "runs": [{"text": "First Video"}], + }, + }, + }, + { + "videoRenderer": { + "videoId": "def456", + "title": { + "runs": [{"text": "Second Video"}], + }, + }, + }, + ], + }, + }, + ], + }, + }, + }, + }, +} + +# Deeply nested variant (extra wrapper layers) +YT_NESTED = { + "wrapper": { + "inner": [ + {"videoId": "nested1", "title": {"runs": [{"text": "Nested"}]}}, + ], + }, +} + +# GQL search response with streams and VODs +GQL_RESPONSE = { + "data": { + "searchFor": { + "streams": { + "items": [ + { + "id": "111", + "broadcaster": {"login": "streamer1", "displayName": "Streamer1"}, + "title": "Live now!", + "game": {"name": "Minecraft"}, + "viewersCount": 500, + }, + ], + }, + "videos": { + "items": [ + { + "id": "222", + "owner": {"login": "creator1", "displayName": "Creator1"}, + "title": "Cool VOD", + "game": {"name": "Fortnite"}, + "viewCount": 1000, + }, + ], + }, + }, + }, +} + + +# -- Helpers ----------------------------------------------------------------- + +class _FakeState: + """In-memory stand-in for bot.state.""" + + def __init__(self): + self._store: dict[str, dict[str, str]] = {} + + def get(self, plugin: str, key: str, default: str | None = None) -> str | None: + return self._store.get(plugin, {}).get(key, default) + + def set(self, plugin: str, key: str, value: str) -> None: + self._store.setdefault(plugin, {})[key] = value + + def delete(self, plugin: str, key: str) -> bool: + try: + del self._store[plugin][key] + return True + except KeyError: + return False + + def keys(self, plugin: str) -> list[str]: + return sorted(self._store.get(plugin, {}).keys()) + + +class _FakeBot: + """Minimal bot stand-in that captures sent/replied messages.""" + + def __init__(self, *, admin: bool = False): + self.sent: list[tuple[str, str]] = [] + self.replied: list[str] = [] + self.state = _FakeState() + self._admin = admin + + async def send(self, target: str, text: str) -> None: + self.sent.append((target, text)) + + async def reply(self, message, text: str) -> None: + self.replied.append(text) + + def _is_admin(self, message) -> bool: + return self._admin + + +def _msg(text: str, nick: str = "alice", target: str = "#test") -> Message: + """Create a channel PRIVMSG.""" + return Message( + raw="", prefix=f"{nick}!~{nick}@host", nick=nick, + command="PRIVMSG", params=[target, text], tags={}, + ) + + +def _pm(text: str, nick: str = "alice") -> Message: + """Create a private PRIVMSG.""" + return Message( + raw="", prefix=f"{nick}!~{nick}@host", nick=nick, + command="PRIVMSG", params=["botname", text], tags={}, + ) + + +def _clear() -> None: + """Reset module-level state between tests.""" + for task in _pollers.values(): + if task and not task.done(): + task.cancel() + _pollers.clear() + _subscriptions.clear() + _errors.clear() + + +def _fake_yt(keyword): + """Fake YouTube backend returning two results.""" + return [ + {"id": "yt1", "title": "YT Result 1", + "url": "https://www.youtube.com/watch?v=yt1", "extra": ""}, + {"id": "yt2", "title": "YT Result 2", + "url": "https://www.youtube.com/watch?v=yt2", "extra": ""}, + ] + + +def _fake_tw(keyword): + """Fake Twitch backend returning two results.""" + return [ + {"id": "stream:tw1", "title": "TW Stream 1", + "url": "https://twitch.tv/user1", "extra": ""}, + {"id": "vod:tw2", "title": "TW VOD 1", + "url": "https://twitch.tv/videos/tw2", "extra": ""}, + ] + + +def _fake_yt_error(keyword): + """Fake YouTube backend that raises.""" + raise ConnectionError("YouTube down") + + +def _fake_tw_error(keyword): + """Fake Twitch backend that raises.""" + raise ConnectionError("Twitch down") + + +_FAKE_BACKENDS = {"yt": _fake_yt, "tw": _fake_tw} + + +# --------------------------------------------------------------------------- +# TestValidateName +# --------------------------------------------------------------------------- + +class TestValidateName: + def test_valid_simple(self): + assert _validate_name("mc-speed") is True + + def test_valid_with_numbers(self): + assert _validate_name("alert123") is True + + def test_valid_single_char(self): + assert _validate_name("a") is True + + def test_valid_max_length(self): + assert _validate_name("a" * 20) is True + + def test_invalid_too_long(self): + assert _validate_name("a" * 21) is False + + def test_invalid_uppercase(self): + assert _validate_name("Alert") is False + + def test_invalid_starts_with_hyphen(self): + assert _validate_name("-alert") is False + + def test_invalid_special_chars(self): + assert _validate_name("alert!") is False + + def test_invalid_spaces(self): + assert _validate_name("my alert") is False + + def test_invalid_empty(self): + assert _validate_name("") is False + + +# --------------------------------------------------------------------------- +# TestTruncate +# --------------------------------------------------------------------------- + +class TestTruncate: + def test_short_text_unchanged(self): + assert _truncate("hello", 80) == "hello" + + def test_exact_length_unchanged(self): + text = "a" * 80 + assert _truncate(text, 80) == text + + def test_long_text_truncated(self): + text = "a" * 100 + result = _truncate(text, 80) + assert len(result) == 80 + assert result.endswith("...") + + def test_default_max_length(self): + text = "a" * 100 + result = _truncate(text) + assert len(result) == 80 + + def test_trailing_space_stripped(self): + text = "word " * 20 + result = _truncate(text, 20) + assert not result.endswith(" ...") + + +# --------------------------------------------------------------------------- +# TestExtractVideos +# --------------------------------------------------------------------------- + +class TestExtractVideos: + def test_standard_response(self): + videos = _extract_videos(YT_RESPONSE) + assert len(videos) == 2 + assert videos[0]["id"] == "abc123" + assert videos[0]["title"] == "First Video" + assert "watch?v=abc123" in videos[0]["url"] + assert videos[1]["id"] == "def456" + + def test_nested_response(self): + videos = _extract_videos(YT_NESTED) + assert len(videos) == 1 + assert videos[0]["id"] == "nested1" + assert videos[0]["title"] == "Nested" + + def test_empty_response(self): + videos = _extract_videos({}) + assert videos == [] + + def test_depth_limit(self): + """Deeply nested structure stops at depth 20.""" + obj = {"videoId": "deep1", "title": {"runs": [{"text": "Deep"}]}} + # Wrap in 25 layers of nesting + for _ in range(25): + obj = {"child": obj} + videos = _extract_videos(obj) + assert len(videos) == 0 + + def test_title_as_string(self): + obj = {"videoId": "str1", "title": "String Title"} + videos = _extract_videos(obj) + assert len(videos) == 1 + assert videos[0]["title"] == "String Title" + + def test_empty_title_skipped(self): + obj = {"videoId": "empty1", "title": {"runs": []}} + videos = _extract_videos(obj) + assert len(videos) == 0 + + def test_dedup_in_search_youtube(self): + """_search_youtube deduplicates by videoId.""" + # Two sections containing the same video + response = { + "a": [ + {"videoId": "dup1", "title": "Video A"}, + {"videoId": "dup1", "title": "Video A Copy"}, + ], + } + + class FakeResp: + def read(self): + return json.dumps(response).encode() + def close(self): + pass + + with patch("urllib.request.urlopen", return_value=FakeResp()): + results = _search_youtube("test") + assert len(results) == 1 + assert results[0]["id"] == "dup1" + + +# --------------------------------------------------------------------------- +# TestSearchYoutube +# --------------------------------------------------------------------------- + +class TestSearchYoutube: + def test_parses_response(self): + class FakeResp: + def read(self): + return json.dumps(YT_RESPONSE).encode() + def close(self): + pass + + with patch("urllib.request.urlopen", return_value=FakeResp()): + results = _search_youtube("test query") + assert len(results) == 2 + assert results[0]["id"] == "abc123" + assert results[0]["title"] == "First Video" + + def test_http_error_propagates(self): + import pytest + with patch("urllib.request.urlopen", side_effect=ConnectionError("fail")): + with pytest.raises(ConnectionError): + _search_youtube("test") + + +# --------------------------------------------------------------------------- +# TestSearchTwitch +# --------------------------------------------------------------------------- + +class TestSearchTwitch: + def test_parses_streams_and_vods(self): + class FakeResp: + def read(self): + return json.dumps(GQL_RESPONSE).encode() + def close(self): + pass + + with patch("urllib.request.urlopen", return_value=FakeResp()): + results = _search_twitch("minecraft") + assert len(results) == 2 + # Stream + assert results[0]["id"] == "stream:111" + assert "Streamer1 is live:" in results[0]["title"] + assert "(Minecraft)" in results[0]["title"] + assert results[0]["url"] == "https://twitch.tv/streamer1" + # VOD + assert results[1]["id"] == "vod:222" + assert results[1]["title"] == "Cool VOD" + assert "videos/222" in results[1]["url"] + + def test_empty_search_results(self): + empty = {"data": {"searchFor": {"streams": {"items": []}, "videos": {"items": []}}}} + + class FakeResp: + def read(self): + return json.dumps(empty).encode() + def close(self): + pass + + with patch("urllib.request.urlopen", return_value=FakeResp()): + results = _search_twitch("nothing") + assert results == [] + + def test_bad_gql_response(self): + bad = {"data": {"searchFor": None}} + + class FakeResp: + def read(self): + return json.dumps(bad).encode() + def close(self): + pass + + with patch("urllib.request.urlopen", return_value=FakeResp()): + results = _search_twitch("bad") + assert results == [] + + def test_http_error_propagates(self): + import pytest + with patch("urllib.request.urlopen", side_effect=ConnectionError("fail")): + with pytest.raises(ConnectionError): + _search_twitch("test") + + def test_stream_without_game(self): + no_game = { + "data": { + "searchFor": { + "streams": { + "items": [{ + "id": "333", + "broadcaster": {"login": "nogame", "displayName": "NoGame"}, + "title": "Just chatting", + "game": None, + "viewersCount": 10, + }], + }, + "videos": {"items": []}, + }, + }, + } + + class FakeResp: + def read(self): + return json.dumps(no_game).encode() + def close(self): + pass + + with patch("urllib.request.urlopen", return_value=FakeResp()): + results = _search_twitch("chat") + assert len(results) == 1 + assert "()" not in results[0]["title"] + + +# --------------------------------------------------------------------------- +# TestStateHelpers +# --------------------------------------------------------------------------- + +class TestStateHelpers: + def test_save_and_load(self): + bot = _FakeBot() + data = {"keyword": "test", "name": "t"} + _save(bot, "#ch:t", data) + loaded = _load(bot, "#ch:t") + assert loaded == data + + def test_load_missing(self): + bot = _FakeBot() + assert _load(bot, "nonexistent") is None + + def test_delete(self): + bot = _FakeBot() + _save(bot, "#ch:t", {"name": "t"}) + _delete(bot, "#ch:t") + assert _load(bot, "#ch:t") is None + + def test_state_key(self): + assert _state_key("#ops", "mc-speed") == "#ops:mc-speed" + + def test_load_invalid_json(self): + bot = _FakeBot() + bot.state.set("alert", "bad", "not json{{{") + assert _load(bot, "bad") is None + + +# --------------------------------------------------------------------------- +# TestCmdAlertAdd +# --------------------------------------------------------------------------- + +class TestCmdAlertAdd: + def test_add_success(self): + _clear() + bot = _FakeBot(admin=True) + + async def inner(): + with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS): + await cmd_alert(bot, _msg("!alert add mc-speed minecraft speedrun")) + await asyncio.sleep(0) + assert len(bot.replied) == 1 + assert "Alert 'mc-speed' added" in bot.replied[0] + assert "minecraft speedrun" in bot.replied[0] + assert "2 yt" in bot.replied[0] + assert "2 tw" in bot.replied[0] + data = _load(bot, "#test:mc-speed") + assert data is not None + assert data["name"] == "mc-speed" + assert data["keyword"] == "minecraft speedrun" + assert data["channel"] == "#test" + assert len(data["seen"]["yt"]) == 2 + assert len(data["seen"]["tw"]) == 2 + assert "#test:mc-speed" in _pollers + _stop_poller("#test:mc-speed") + await asyncio.sleep(0) + + asyncio.run(inner()) + + def test_add_requires_admin(self): + _clear() + bot = _FakeBot(admin=False) + asyncio.run(cmd_alert(bot, _msg("!alert add test keyword"))) + assert "Permission denied" in bot.replied[0] + + def test_add_requires_channel(self): + _clear() + bot = _FakeBot(admin=True) + asyncio.run(cmd_alert(bot, _pm("!alert add test keyword"))) + assert "Use this command in a channel" in bot.replied[0] + + def test_add_invalid_name(self): + _clear() + bot = _FakeBot(admin=True) + asyncio.run(cmd_alert(bot, _msg("!alert add BAD! keyword"))) + assert "Invalid name" in bot.replied[0] + + def test_add_missing_keyword(self): + _clear() + bot = _FakeBot(admin=True) + asyncio.run(cmd_alert(bot, _msg("!alert add myname"))) + assert "Usage:" in bot.replied[0] + + def test_add_keyword_too_long(self): + _clear() + bot = _FakeBot(admin=True) + long_kw = "x" * 101 + asyncio.run(cmd_alert(bot, _msg(f"!alert add test {long_kw}"))) + assert "too long" in bot.replied[0] + + def test_add_duplicate(self): + _clear() + bot = _FakeBot(admin=True) + + async def inner(): + with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS): + await cmd_alert(bot, _msg("!alert add dupe some keyword")) + await asyncio.sleep(0) + bot.replied.clear() + with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS): + await cmd_alert(bot, _msg("!alert add dupe other keyword")) + assert "already exists" in bot.replied[0] + _stop_poller("#test:dupe") + await asyncio.sleep(0) + + asyncio.run(inner()) + + def test_add_limit(self): + _clear() + bot = _FakeBot(admin=True) + for i in range(20): + _save(bot, f"#test:sub{i}", {"name": f"sub{i}", "channel": "#test"}) + + async def inner(): + with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS): + await cmd_alert(bot, _msg("!alert add overflow keyword")) + assert "limit reached" in bot.replied[0] + + asyncio.run(inner()) + + def test_add_seed_error_still_creates(self): + """If a backend fails during seeding, seen list is empty for that backend.""" + _clear() + bot = _FakeBot(admin=True) + backends = {"yt": _fake_yt, "tw": _fake_tw_error} + + async def inner(): + with patch.object(_mod, "_BACKENDS", backends): + await cmd_alert(bot, _msg("!alert add partial test keyword")) + await asyncio.sleep(0) + data = _load(bot, "#test:partial") + assert data is not None + assert len(data["seen"]["yt"]) == 2 + assert len(data["seen"]["tw"]) == 0 + _stop_poller("#test:partial") + await asyncio.sleep(0) + + asyncio.run(inner()) + + +# --------------------------------------------------------------------------- +# TestCmdAlertDel +# --------------------------------------------------------------------------- + +class TestCmdAlertDel: + def test_del_success(self): + _clear() + bot = _FakeBot(admin=True) + + async def inner(): + with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS): + await cmd_alert(bot, _msg("!alert add todel some keyword")) + await asyncio.sleep(0) + bot.replied.clear() + await cmd_alert(bot, _msg("!alert del todel")) + assert "Removed 'todel'" in bot.replied[0] + assert _load(bot, "#test:todel") is None + assert "#test:todel" not in _pollers + await asyncio.sleep(0) + + asyncio.run(inner()) + + def test_del_requires_admin(self): + _clear() + bot = _FakeBot(admin=False) + asyncio.run(cmd_alert(bot, _msg("!alert del test"))) + assert "Permission denied" in bot.replied[0] + + def test_del_requires_channel(self): + _clear() + bot = _FakeBot(admin=True) + asyncio.run(cmd_alert(bot, _pm("!alert del test"))) + assert "Use this command in a channel" in bot.replied[0] + + def test_del_nonexistent(self): + _clear() + bot = _FakeBot(admin=True) + asyncio.run(cmd_alert(bot, _msg("!alert del nosuch"))) + assert "No alert" in bot.replied[0] + + def test_del_no_name(self): + _clear() + bot = _FakeBot(admin=True) + asyncio.run(cmd_alert(bot, _msg("!alert del"))) + assert "Usage:" in bot.replied[0] + + +# --------------------------------------------------------------------------- +# TestCmdAlertList +# --------------------------------------------------------------------------- + +class TestCmdAlertList: + def test_list_empty(self): + _clear() + bot = _FakeBot() + asyncio.run(cmd_alert(bot, _msg("!alert list"))) + assert "No alerts" in bot.replied[0] + + def test_list_populated(self): + _clear() + bot = _FakeBot() + _save(bot, "#test:mc", { + "name": "mc", "channel": "#test", "keyword": "minecraft", + "last_error": "", + }) + _save(bot, "#test:rl", { + "name": "rl", "channel": "#test", "keyword": "rocket league", + "last_error": "", + }) + asyncio.run(cmd_alert(bot, _msg("!alert list"))) + assert "Alerts:" in bot.replied[0] + assert "mc" in bot.replied[0] + assert "rl" in bot.replied[0] + + def test_list_shows_error(self): + _clear() + bot = _FakeBot() + _save(bot, "#test:broken", { + "name": "broken", "channel": "#test", "keyword": "test", + "last_error": "Connection refused", + }) + asyncio.run(cmd_alert(bot, _msg("!alert list"))) + assert "broken (error)" in bot.replied[0] + + def test_list_requires_channel(self): + _clear() + bot = _FakeBot() + asyncio.run(cmd_alert(bot, _pm("!alert list"))) + assert "Use this command in a channel" in bot.replied[0] + + def test_list_channel_isolation(self): + _clear() + bot = _FakeBot() + _save(bot, "#test:mine", { + "name": "mine", "channel": "#test", "keyword": "test", + "last_error": "", + }) + _save(bot, "#other:theirs", { + "name": "theirs", "channel": "#other", "keyword": "test", + "last_error": "", + }) + asyncio.run(cmd_alert(bot, _msg("!alert list"))) + assert "mine" in bot.replied[0] + assert "theirs" not in bot.replied[0] + + +# --------------------------------------------------------------------------- +# TestCmdAlertCheck +# --------------------------------------------------------------------------- + +class TestCmdAlertCheck: + def test_check_success(self): + _clear() + bot = _FakeBot() + data = { + "keyword": "test", "name": "chk", "channel": "#test", + "interval": 300, "seen": {"yt": ["yt1", "yt2"], "tw": ["stream:tw1", "vod:tw2"]}, + "last_poll": "", "last_error": "", + } + _save(bot, "#test:chk", data) + + async def inner(): + with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS): + await cmd_alert(bot, _msg("!alert check chk")) + assert "chk: checked" in bot.replied[0] + + asyncio.run(inner()) + + def test_check_nonexistent(self): + _clear() + bot = _FakeBot() + asyncio.run(cmd_alert(bot, _msg("!alert check nope"))) + assert "No alert" in bot.replied[0] + + def test_check_requires_channel(self): + _clear() + bot = _FakeBot() + asyncio.run(cmd_alert(bot, _pm("!alert check test"))) + assert "Use this command in a channel" in bot.replied[0] + + def test_check_shows_error(self): + _clear() + bot = _FakeBot() + data = { + "keyword": "test", "name": "errchk", "channel": "#test", + "interval": 300, "seen": {"yt": [], "tw": []}, + "last_poll": "", "last_error": "", + } + _save(bot, "#test:errchk", data) + backends = {"yt": _fake_yt_error, "tw": _fake_tw} + + async def inner(): + with patch.object(_mod, "_BACKENDS", backends): + await cmd_alert(bot, _msg("!alert check errchk")) + assert "error" in bot.replied[0].lower() + + asyncio.run(inner()) + + def test_check_announces_new_items(self): + _clear() + bot = _FakeBot() + data = { + "keyword": "test", "name": "news", "channel": "#test", + "interval": 300, "seen": {"yt": ["yt1"], "tw": []}, + "last_poll": "", "last_error": "", + } + _save(bot, "#test:news", data) + + async def inner(): + with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS): + await cmd_alert(bot, _msg("!alert check news")) + # yt2 is new for yt, both tw results are new + announcements = [s for t, s in bot.sent if t == "#test"] + yt_msgs = [m for m in announcements if "/yt]" in m] + tw_msgs = [m for m in announcements if "/tw]" in m] + assert len(yt_msgs) == 1 # yt2 only + assert len(tw_msgs) == 2 # both tw results + + asyncio.run(inner()) + + def test_check_no_name(self): + _clear() + bot = _FakeBot() + asyncio.run(cmd_alert(bot, _msg("!alert check"))) + assert "Usage:" in bot.replied[0] + + +# --------------------------------------------------------------------------- +# TestPollOnce +# --------------------------------------------------------------------------- + +class TestPollOnce: + def test_new_items_announced(self): + _clear() + bot = _FakeBot() + data = { + "keyword": "test", "name": "poll", "channel": "#test", + "interval": 300, "seen": {"yt": [], "tw": []}, + "last_poll": "", "last_error": "", + } + key = "#test:poll" + _save(bot, key, data) + _subscriptions[key] = data + + async def inner(): + with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS): + await _poll_once(bot, key, announce=True) + messages = [s for t, s in bot.sent if t == "#test"] + assert len(messages) == 4 # 2 yt + 2 tw + assert "[poll/yt]" in messages[0] + assert "[poll/tw]" in messages[2] + + asyncio.run(inner()) + + def test_dedup_no_repeat(self): + _clear() + bot = _FakeBot() + data = { + "keyword": "test", "name": "dedup", "channel": "#test", + "interval": 300, + "seen": {"yt": ["yt1", "yt2"], "tw": ["stream:tw1", "vod:tw2"]}, + "last_poll": "", "last_error": "", + } + key = "#test:dedup" + _save(bot, key, data) + _subscriptions[key] = data + + async def inner(): + with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS): + await _poll_once(bot, key, announce=True) + assert len(bot.sent) == 0 + + asyncio.run(inner()) + + def test_max_announce_per_platform(self): + """Only MAX_ANNOUNCE items per platform, then '... and N more'.""" + _clear() + bot = _FakeBot() + + def fake_many(keyword): + return [ + {"id": f"v{i}", "title": f"Video {i}", + "url": f"https://example.com/{i}", "extra": ""} + for i in range(8) + ] + + data = { + "keyword": "test", "name": "many", "channel": "#test", + "interval": 300, "seen": {"yt": [], "tw": []}, + "last_poll": "", "last_error": "", + } + key = "#test:many" + _save(bot, key, data) + _subscriptions[key] = data + + async def inner(): + with patch.object(_mod, "_BACKENDS", {"yt": fake_many, "tw": _fake_tw}): + await _poll_once(bot, key, announce=True) + yt_msgs = [s for t, s in bot.sent if t == "#test" and "/yt]" in s] + assert len(yt_msgs) == _MAX_ANNOUNCE + 1 # 5 items + "... and 3 more" + assert "... and 3 more" in yt_msgs[-1] + + asyncio.run(inner()) + + def test_partial_backend_failure(self): + """One backend fails, other still works. Error counter increments.""" + _clear() + bot = _FakeBot() + data = { + "keyword": "test", "name": "partial", "channel": "#test", + "interval": 300, "seen": {"yt": [], "tw": []}, + "last_poll": "", "last_error": "", + } + key = "#test:partial" + _save(bot, key, data) + _subscriptions[key] = data + backends = {"yt": _fake_yt_error, "tw": _fake_tw} + + async def inner(): + with patch.object(_mod, "_BACKENDS", backends): + await _poll_once(bot, key, announce=True) + # Twitch results should still be announced + tw_msgs = [s for t, s in bot.sent if t == "#test" and "/tw]" in s] + assert len(tw_msgs) == 2 + # Error counter should be incremented + assert _errors[key] == 1 + updated = _load(bot, key) + assert "yt:" in updated["last_error"] + + asyncio.run(inner()) + + def test_no_announce_flag(self): + _clear() + bot = _FakeBot() + data = { + "keyword": "test", "name": "quiet", "channel": "#test", + "interval": 300, "seen": {"yt": [], "tw": []}, + "last_poll": "", "last_error": "", + } + key = "#test:quiet" + _save(bot, key, data) + _subscriptions[key] = data + + async def inner(): + with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS): + await _poll_once(bot, key, announce=False) + assert len(bot.sent) == 0 + updated = _load(bot, key) + assert len(updated["seen"]["yt"]) == 2 + assert len(updated["seen"]["tw"]) == 2 + + asyncio.run(inner()) + + def test_seen_cap(self): + """Seen list is capped at MAX_SEEN per platform.""" + _clear() + bot = _FakeBot() + + def fake_many(keyword): + return [ + {"id": f"v{i}", "title": f"V{i}", "url": "", "extra": ""} + for i in range(250) + ] + + data = { + "keyword": "test", "name": "cap", "channel": "#test", + "interval": 300, "seen": {"yt": [], "tw": []}, + "last_poll": "", "last_error": "", + } + key = "#test:cap" + _save(bot, key, data) + _subscriptions[key] = data + + async def inner(): + with patch.object(_mod, "_BACKENDS", {"yt": fake_many, "tw": _fake_tw}): + await _poll_once(bot, key, announce=False) + updated = _load(bot, key) + assert len(updated["seen"]["yt"]) == _MAX_SEEN + # Oldest entries should have been evicted + assert updated["seen"]["yt"][0] == "v50" + + asyncio.run(inner()) + + def test_all_backends_error(self): + _clear() + bot = _FakeBot() + data = { + "keyword": "test", "name": "allerr", "channel": "#test", + "interval": 300, "seen": {"yt": [], "tw": []}, + "last_poll": "", "last_error": "", + } + key = "#test:allerr" + _save(bot, key, data) + _subscriptions[key] = data + backends = {"yt": _fake_yt_error, "tw": _fake_tw_error} + + async def inner(): + with patch.object(_mod, "_BACKENDS", backends): + await _poll_once(bot, key, announce=True) + assert _errors[key] == 1 + assert len(bot.sent) == 0 + + asyncio.run(inner()) + + def test_success_clears_error(self): + _clear() + bot = _FakeBot() + data = { + "keyword": "test", "name": "clrerr", "channel": "#test", + "interval": 300, "seen": {"yt": ["yt1", "yt2"], "tw": ["stream:tw1", "vod:tw2"]}, + "last_poll": "", "last_error": "old error", + } + key = "#test:clrerr" + _save(bot, key, data) + _subscriptions[key] = data + _errors[key] = 3 + + async def inner(): + with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS): + await _poll_once(bot, key, announce=True) + assert _errors[key] == 0 + updated = _load(bot, key) + assert updated["last_error"] == "" + + asyncio.run(inner()) + + +# --------------------------------------------------------------------------- +# TestRestore +# --------------------------------------------------------------------------- + +class TestRestore: + def test_restore_spawns_pollers(self): + _clear() + bot = _FakeBot() + data = { + "keyword": "test", "name": "restored", "channel": "#test", + "interval": 300, "seen": {"yt": [], "tw": []}, + "last_poll": "", "last_error": "", + } + _save(bot, "#test:restored", data) + + async def inner(): + _restore(bot) + assert "#test:restored" in _pollers + task = _pollers["#test:restored"] + assert not task.done() + _stop_poller("#test:restored") + await asyncio.sleep(0) + + asyncio.run(inner()) + + def test_restore_skips_active(self): + _clear() + bot = _FakeBot() + data = { + "keyword": "test", "name": "active", "channel": "#test", + "interval": 300, "seen": {"yt": [], "tw": []}, + "last_poll": "", "last_error": "", + } + _save(bot, "#test:active", data) + + async def inner(): + dummy = asyncio.create_task(asyncio.sleep(9999)) + _pollers["#test:active"] = dummy + _restore(bot) + assert _pollers["#test:active"] is dummy + dummy.cancel() + await asyncio.sleep(0) + + asyncio.run(inner()) + + def test_restore_replaces_done_task(self): + _clear() + bot = _FakeBot() + data = { + "keyword": "test", "name": "done", "channel": "#test", + "interval": 300, "seen": {"yt": [], "tw": []}, + "last_poll": "", "last_error": "", + } + _save(bot, "#test:done", data) + + async def inner(): + done_task = asyncio.create_task(asyncio.sleep(0)) + await done_task + _pollers["#test:done"] = done_task + _restore(bot) + new_task = _pollers["#test:done"] + assert new_task is not done_task + assert not new_task.done() + _stop_poller("#test:done") + await asyncio.sleep(0) + + asyncio.run(inner()) + + def test_restore_skips_bad_json(self): + _clear() + bot = _FakeBot() + bot.state.set("alert", "#test:bad", "not json{{{") + + async def inner(): + _restore(bot) + assert "#test:bad" not in _pollers + + asyncio.run(inner()) + + def test_on_connect_calls_restore(self): + _clear() + bot = _FakeBot() + data = { + "keyword": "test", "name": "conn", "channel": "#test", + "interval": 300, "seen": {"yt": [], "tw": []}, + "last_poll": "", "last_error": "", + } + _save(bot, "#test:conn", data) + + async def inner(): + msg = _msg("", target="botname") + await on_connect(bot, msg) + assert "#test:conn" in _pollers + _stop_poller("#test:conn") + await asyncio.sleep(0) + + asyncio.run(inner()) + + +# --------------------------------------------------------------------------- +# TestPollerManagement +# --------------------------------------------------------------------------- + +class TestPollerManagement: + def test_start_and_stop(self): + _clear() + bot = _FakeBot() + data = { + "keyword": "test", "name": "mgmt", "channel": "#test", + "interval": 300, "seen": {"yt": [], "tw": []}, + "last_poll": "", "last_error": "", + } + key = "#test:mgmt" + _save(bot, key, data) + _subscriptions[key] = data + + async def inner(): + _start_poller(bot, key) + assert key in _pollers + assert not _pollers[key].done() + _stop_poller(key) + await asyncio.sleep(0) + assert key not in _pollers + assert key not in _subscriptions + + asyncio.run(inner()) + + def test_start_idempotent(self): + _clear() + bot = _FakeBot() + data = { + "keyword": "test", "name": "idem", "channel": "#test", + "interval": 300, "seen": {"yt": [], "tw": []}, + "last_poll": "", "last_error": "", + } + key = "#test:idem" + _save(bot, key, data) + _subscriptions[key] = data + + async def inner(): + _start_poller(bot, key) + first = _pollers[key] + _start_poller(bot, key) + assert _pollers[key] is first + _stop_poller(key) + await asyncio.sleep(0) + + asyncio.run(inner()) + + def test_stop_nonexistent(self): + _clear() + _stop_poller("#test:nonexistent") + + +# --------------------------------------------------------------------------- +# TestCmdAlertUsage +# --------------------------------------------------------------------------- + +class TestCmdAlertUsage: + def test_no_args(self): + _clear() + bot = _FakeBot() + asyncio.run(cmd_alert(bot, _msg("!alert"))) + assert "Usage:" in bot.replied[0] + + def test_unknown_subcommand(self): + _clear() + bot = _FakeBot() + asyncio.run(cmd_alert(bot, _msg("!alert foobar"))) + assert "Usage:" in bot.replied[0]