diff --git a/plugins/twitch.py b/plugins/twitch.py new file mode 100644 index 0000000..201b597 --- /dev/null +++ b/plugins/twitch.py @@ -0,0 +1,434 @@ +"""Plugin: Twitch livestream notifications via public GQL endpoint.""" + +from __future__ import annotations + +import asyncio +import json +import re +import ssl +import urllib.request +from datetime import datetime, timezone + +from derp.plugin import command, event + +# -- Constants --------------------------------------------------------------- + +_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,19}$") +_TWITCH_LOGIN_RE = re.compile(r"^[a-zA-Z0-9_]{1,25}$") +_GQL_URL = "https://gql.twitch.tv/gql" +_GQL_CLIENT_ID = "kimne78kx3ncx6brgo4mv6wki5h1ko" +_DEFAULT_INTERVAL = 120 +_MAX_INTERVAL = 3600 +_FETCH_TIMEOUT = 10 +_MAX_TITLE_LEN = 80 +_MAX_STREAMERS = 20 + +# -- Module-level tracking --------------------------------------------------- + +_pollers: dict[str, asyncio.Task] = {} +_streamers: dict[str, dict] = {} +_errors: dict[str, int] = {} + + +# -- Pure helpers ------------------------------------------------------------ + +def _state_key(channel: str, name: str) -> str: + """Build composite state key.""" + return f"{channel}:{name}" + + +def _validate_name(name: str) -> bool: + """Check name against allowed pattern.""" + return bool(_NAME_RE.match(name)) + + +def _truncate(text: str, max_len: int = _MAX_TITLE_LEN) -> str: + """Truncate text with ellipsis if needed.""" + if len(text) <= max_len: + return text + return text[: max_len - 3].rstrip() + "..." + + +# -- Blocking helpers (for executor) ----------------------------------------- + +def _query_stream(login: str) -> dict: + """Blocking GQL query. Returns normalised stream info. + + Keys: exists, login, display_name, live, stream_id, title, game, + viewers, error. + """ + result: dict = { + "exists": False, + "login": "", + "display_name": "", + "live": False, + "stream_id": "", + "title": "", + "game": "", + "viewers": 0, + "error": "", + } + + query = ( + 'query{user(login:"' + login + '"){login displayName ' + "stream{id title game{name}viewersCount}}}" + ) + body = json.dumps({"query": query}).encode() + + req = urllib.request.Request(_GQL_URL, data=body, method="POST") + req.add_header("Client-Id", _GQL_CLIENT_ID) + req.add_header("Content-Type", "application/json") + + ctx = ssl.create_default_context() + + try: + resp = urllib.request.urlopen(req, timeout=_FETCH_TIMEOUT, context=ctx) + raw = resp.read() + resp.close() + data = json.loads(raw) + except Exception as exc: + result["error"] = str(exc) + return result + + try: + user = data["data"]["user"] + except (KeyError, TypeError): + result["error"] = "Unexpected GQL response" + return result + + if user is None: + return result # exists=False + + result["exists"] = True + result["login"] = user.get("login", "") + result["display_name"] = user.get("displayName", "") + + stream = user.get("stream") + if stream is not None: + result["live"] = True + result["stream_id"] = str(stream.get("id", "")) + result["title"] = stream.get("title", "") + game = stream.get("game") + result["game"] = game.get("name", "") if game else "" + result["viewers"] = stream.get("viewersCount", 0) + + return result + + +# -- State helpers ----------------------------------------------------------- + +def _save(bot, key: str, data: dict) -> None: + """Persist streamer data to bot.state.""" + bot.state.set("twitch", key, json.dumps(data)) + + +def _load(bot, key: str) -> dict | None: + """Load streamer data from bot.state.""" + raw = bot.state.get("twitch", key) + if raw is None: + return None + try: + return json.loads(raw) + except json.JSONDecodeError: + return None + + +def _delete(bot, key: str) -> None: + """Remove streamer data from bot.state.""" + bot.state.delete("twitch", key) + + +# -- Polling ----------------------------------------------------------------- + +async def _poll_once(bot, key: str, announce: bool = True) -> None: + """Single poll cycle for one Twitch streamer.""" + data = _streamers.get(key) + if data is None: + data = _load(bot, key) + if data is None: + return + _streamers[key] = data + + login = data["login"] + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _query_stream, login) + + now = datetime.now(timezone.utc).isoformat() + data["last_poll"] = now + + if result["error"]: + data["last_error"] = result["error"] + _errors[key] = _errors.get(key, 0) + 1 + _streamers[key] = data + _save(bot, key, data) + return + + data["last_error"] = "" + _errors[key] = 0 + + was_live = data.get("was_live", False) + old_stream_id = data.get("stream_id", "") + + if result["live"]: + new_stream_id = result["stream_id"] + data["last_title"] = result["title"] + data["last_game"] = result["game"] + + if announce and (not was_live or new_stream_id != old_stream_id): + channel = data["channel"] + name = data["name"] + title = _truncate(result["title"]) if result["title"] else "(no title)" + game = result["game"] + line = f"[{name}] is live: {title}" + if game: + line += f" ({game})" + line += f" -- https://twitch.tv/{login}" + await bot.send(channel, line) + + data["was_live"] = True + data["stream_id"] = new_stream_id + else: + data["was_live"] = False + + _streamers[key] = data + _save(bot, key, data) + + +async def _poll_loop(bot, key: str) -> None: + """Infinite poll loop for one Twitch streamer.""" + try: + while True: + data = _streamers.get(key) or _load(bot, key) + if data is None: + return + interval = data.get("interval", _DEFAULT_INTERVAL) + errs = _errors.get(key, 0) + if errs >= 5: + interval = min(interval * 2, _MAX_INTERVAL) + await asyncio.sleep(interval) + await _poll_once(bot, key, announce=True) + except asyncio.CancelledError: + pass + + +def _start_poller(bot, key: str) -> None: + """Create and track a poller task.""" + existing = _pollers.get(key) + if existing and not existing.done(): + return + task = asyncio.create_task(_poll_loop(bot, key)) + _pollers[key] = task + + +def _stop_poller(key: str) -> None: + """Cancel and remove a poller task.""" + task = _pollers.pop(key, None) + if task and not task.done(): + task.cancel() + _streamers.pop(key, None) + _errors.pop(key, 0) + + +# -- Restore on connect ----------------------------------------------------- + +def _restore(bot) -> None: + """Rebuild pollers from persisted state.""" + for key in bot.state.keys("twitch"): + existing = _pollers.get(key) + if existing and not existing.done(): + continue + data = _load(bot, key) + if data is None: + continue + _streamers[key] = data + _start_poller(bot, key) + + +@event("001") +async def on_connect(bot, message): + """Restore Twitch streamer pollers on connect.""" + _restore(bot) + + +# -- Command handler --------------------------------------------------------- + +@command("twitch", help="Twitch: !twitch follow|unfollow|list|check") +async def cmd_twitch(bot, message): + """Per-channel Twitch livestream subscriptions. + + Usage: + !twitch follow [name] Follow a streamer (admin) + !twitch unfollow Unfollow a streamer (admin) + !twitch list List followed streamers + !twitch check Check status now + """ + parts = message.text.split(None, 3) + if len(parts) < 2: + await bot.reply(message, "Usage: !twitch [args]") + return + + sub = parts[1].lower() + + # -- list (any user, channel only) ---------------------------------------- + if sub == "list": + if not message.is_channel: + await bot.reply(message, "Use this command in a channel") + return + channel = message.target + prefix = f"{channel}:" + streamers = [] + for key in bot.state.keys("twitch"): + if key.startswith(prefix): + data = _load(bot, key) + if data: + name = data["name"] + err = data.get("last_error", "") + live = data.get("was_live", False) + if err: + streamers.append(f"{name} (error)") + elif live: + streamers.append(f"{name} (live)") + else: + streamers.append(name) + if not streamers: + await bot.reply(message, "No Twitch streamers in this channel") + return + await bot.reply(message, f"Twitch: {', '.join(streamers)}") + return + + # -- check (any user, channel only) --------------------------------------- + if sub == "check": + if not message.is_channel: + await bot.reply(message, "Use this command in a channel") + return + if len(parts) < 3: + await bot.reply(message, "Usage: !twitch check ") + return + name = parts[2].lower() + channel = message.target + key = _state_key(channel, name) + data = _load(bot, key) + if data is None: + await bot.reply(message, f"No streamer '{name}' in this channel") + return + _streamers[key] = data + await _poll_once(bot, key, announce=True) + data = _streamers.get(key, data) + if data.get("last_error"): + await bot.reply(message, f"{name}: error -- {data['last_error']}") + elif data.get("was_live"): + title = _truncate(data.get("last_title", "")) + game = data.get("last_game", "") + line = f"{name}: live -- {title}" + if game: + line += f" ({game})" + await bot.reply(message, line) + else: + await bot.reply(message, f"{name}: offline") + return + + # -- follow (admin, channel only) ----------------------------------------- + if sub == "follow": + if not bot._is_admin(message): + await bot.reply(message, "Permission denied: follow requires admin") + return + if not message.is_channel: + await bot.reply(message, "Use this command in a channel") + return + if len(parts) < 3: + await bot.reply(message, "Usage: !twitch follow [name]") + return + + username = parts[2] + if not _TWITCH_LOGIN_RE.match(username): + await bot.reply(message, "Invalid Twitch username") + return + + # Query GQL to verify user exists and get display name + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _query_stream, username) + + if result["error"]: + await bot.reply(message, f"GQL query failed: {result['error']}") + return + if not result["exists"]: + await bot.reply(message, f"Twitch user '{username}' not found") + return + + login = result["login"] + display_name = result["display_name"] + name = parts[3].lower() if len(parts) > 3 else login.lower() + + if not _validate_name(name): + await bot.reply( + message, + "Invalid name (lowercase alphanumeric + hyphens, 1-20 chars)", + ) + return + + irc_channel = message.target + key = _state_key(irc_channel, name) + + if _load(bot, key) is not None: + await bot.reply(message, f"Streamer '{name}' already exists in this channel") + return + + ch_prefix = f"{irc_channel}:" + count = sum(1 for k in bot.state.keys("twitch") if k.startswith(ch_prefix)) + if count >= _MAX_STREAMERS: + await bot.reply(message, f"Streamer limit reached ({_MAX_STREAMERS})") + return + + now = datetime.now(timezone.utc).isoformat() + data = { + "login": login, + "display_name": display_name, + "name": name, + "channel": irc_channel, + "interval": _DEFAULT_INTERVAL, + "added_by": message.nick, + "added_at": now, + "was_live": result["live"], + "stream_id": result["stream_id"], + "last_title": result["title"], + "last_game": result["game"], + "last_poll": now, + "last_error": "", + } + _save(bot, key, data) + _streamers[key] = data + _start_poller(bot, key) + + reply = f"Following '{name}' ({display_name})" + if result["live"]: + reply += " [live]" + await bot.reply(message, reply) + return + + # -- unfollow (admin, channel only) --------------------------------------- + if sub == "unfollow": + if not bot._is_admin(message): + await bot.reply(message, "Permission denied: unfollow requires admin") + return + if not message.is_channel: + await bot.reply(message, "Use this command in a channel") + return + if len(parts) < 3: + await bot.reply(message, "Usage: !twitch unfollow ") + return + + name = parts[2].lower() + channel = message.target + key = _state_key(channel, name) + + if _load(bot, key) is None: + await bot.reply(message, f"No streamer '{name}' in this channel") + return + + _stop_poller(key) + _delete(bot, key) + await bot.reply(message, f"Unfollowed '{name}'") + return + + await bot.reply(message, "Usage: !twitch [args]") diff --git a/tests/test_twitch.py b/tests/test_twitch.py new file mode 100644 index 0000000..778d142 --- /dev/null +++ b/tests/test_twitch.py @@ -0,0 +1,1134 @@ +"""Tests for the Twitch livestream notification plugin.""" + +import asyncio +import importlib.util +import sys +from pathlib import Path +from unittest.mock import patch + +from derp.irc import Message + +# plugins/ is not a Python package -- load the module from file path +_spec = importlib.util.spec_from_file_location( + "plugins.twitch", Path(__file__).resolve().parent.parent / "plugins" / "twitch.py", +) +_mod = importlib.util.module_from_spec(_spec) +sys.modules[_spec.name] = _mod +_spec.loader.exec_module(_mod) + +from plugins.twitch import ( # noqa: E402 + _delete, + _errors, + _load, + _poll_once, + _pollers, + _restore, + _save, + _start_poller, + _state_key, + _stop_poller, + _streamers, + _truncate, + _validate_name, + cmd_twitch, + on_connect, +) + +# -- Fixtures ---------------------------------------------------------------- + +GQL_LIVE = { + "data": { + "user": { + "login": "xqc", + "displayName": "xQc", + "stream": { + "id": "12345", + "title": "Playing games", + "game": {"name": "Fortnite"}, + "viewersCount": 50000, + }, + }, + }, +} + +GQL_OFFLINE = { + "data": { + "user": { + "login": "xqc", + "displayName": "xQc", + "stream": None, + }, + }, +} + +GQL_NOT_FOUND = { + "data": { + "user": None, + }, +} + +GQL_LIVE_NO_GAME = { + "data": { + "user": { + "login": "streamer", + "displayName": "Streamer", + "stream": { + "id": "99999", + "title": "Just chatting", + "game": None, + "viewersCount": 100, + }, + }, + }, +} + +GQL_LIVE_NEW_STREAM = { + "data": { + "user": { + "login": "xqc", + "displayName": "xQc", + "stream": { + "id": "67890", + "title": "New stream", + "game": {"name": "Minecraft"}, + "viewersCount": 40000, + }, + }, + }, +} + + +# -- Helpers ----------------------------------------------------------------- + +class _FakeState: + """In-memory stand-in for bot.state.""" + + def __init__(self): + self._store: dict[str, dict[str, str]] = {} + + def get(self, plugin: str, key: str, default: str | None = None) -> str | None: + return self._store.get(plugin, {}).get(key, default) + + def set(self, plugin: str, key: str, value: str) -> None: + self._store.setdefault(plugin, {})[key] = value + + def delete(self, plugin: str, key: str) -> bool: + try: + del self._store[plugin][key] + return True + except KeyError: + return False + + def keys(self, plugin: str) -> list[str]: + return sorted(self._store.get(plugin, {}).keys()) + + +class _FakeBot: + """Minimal bot stand-in that captures sent/replied messages.""" + + def __init__(self, *, admin: bool = False): + self.sent: list[tuple[str, str]] = [] + self.replied: list[str] = [] + self.state = _FakeState() + self._admin = admin + + async def send(self, target: str, text: str) -> None: + self.sent.append((target, text)) + + async def reply(self, message, text: str) -> None: + self.replied.append(text) + + def _is_admin(self, message) -> bool: + return self._admin + + +def _msg(text: str, nick: str = "alice", target: str = "#test") -> Message: + """Create a channel PRIVMSG.""" + return Message( + raw="", prefix=f"{nick}!~{nick}@host", nick=nick, + command="PRIVMSG", params=[target, text], tags={}, + ) + + +def _pm(text: str, nick: str = "alice") -> Message: + """Create a private PRIVMSG.""" + return Message( + raw="", prefix=f"{nick}!~{nick}@host", nick=nick, + command="PRIVMSG", params=["botname", text], tags={}, + ) + + +def _clear() -> None: + """Reset module-level state between tests.""" + for task in _pollers.values(): + if task and not task.done(): + task.cancel() + _pollers.clear() + _streamers.clear() + _errors.clear() + + +def _fake_query_live(login): + """Fake GQL query: streamer is live.""" + return { + "exists": True, + "login": "xqc", + "display_name": "xQc", + "live": True, + "stream_id": "12345", + "title": "Playing games", + "game": "Fortnite", + "viewers": 50000, + "error": "", + } + + +def _fake_query_offline(login): + """Fake GQL query: streamer is offline.""" + return { + "exists": True, + "login": "xqc", + "display_name": "xQc", + "live": False, + "stream_id": "", + "title": "", + "game": "", + "viewers": 0, + "error": "", + } + + +def _fake_query_not_found(login): + """Fake GQL query: user does not exist.""" + return { + "exists": False, + "login": "", + "display_name": "", + "live": False, + "stream_id": "", + "title": "", + "game": "", + "viewers": 0, + "error": "", + } + + +def _fake_query_error(login): + """Fake GQL query: network error.""" + return { + "exists": False, + "login": "", + "display_name": "", + "live": False, + "stream_id": "", + "title": "", + "game": "", + "viewers": 0, + "error": "Connection refused", + } + + +def _fake_query_live_new_stream(login): + """Fake GQL query: live with different stream_id.""" + return { + "exists": True, + "login": "xqc", + "display_name": "xQc", + "live": True, + "stream_id": "67890", + "title": "New stream", + "game": "Minecraft", + "viewers": 40000, + "error": "", + } + + +def _fake_query_live_no_game(login): + """Fake GQL query: live but no game set.""" + return { + "exists": True, + "login": "streamer", + "display_name": "Streamer", + "live": True, + "stream_id": "99999", + "title": "Just chatting", + "game": "", + "viewers": 100, + "error": "", + } + + +# --------------------------------------------------------------------------- +# TestValidateName +# --------------------------------------------------------------------------- + +class TestValidateName: + def test_valid_simple(self): + assert _validate_name("xqc") is True + + def test_valid_with_hyphens(self): + assert _validate_name("my-streamer") is True + + def test_valid_with_numbers(self): + assert _validate_name("stream3r") is True + + def test_invalid_uppercase(self): + assert _validate_name("XQC") is False + + def test_invalid_underscore(self): + assert _validate_name("my_streamer") is False + + def test_invalid_starts_with_hyphen(self): + assert _validate_name("-name") is False + + def test_invalid_too_long(self): + assert _validate_name("a" * 21) is False + + def test_invalid_empty(self): + assert _validate_name("") is False + + def test_valid_single_char(self): + assert _validate_name("a") is True + + def test_valid_max_length(self): + assert _validate_name("a" * 20) is True + + +# --------------------------------------------------------------------------- +# TestTruncate +# --------------------------------------------------------------------------- + +class TestTruncate: + def test_short_text_unchanged(self): + assert _truncate("hello", 80) == "hello" + + def test_exact_length_unchanged(self): + text = "a" * 80 + assert _truncate(text, 80) == text + + def test_long_text_truncated(self): + text = "a" * 100 + result = _truncate(text, 80) + assert len(result) == 80 + assert result.endswith("...") + + def test_default_max_length(self): + text = "a" * 100 + result = _truncate(text) + assert len(result) == 80 + + def test_trailing_space_stripped(self): + text = "word " * 20 + result = _truncate(text, 20) + assert not result.endswith(" ...") + + +# --------------------------------------------------------------------------- +# TestQueryStream +# --------------------------------------------------------------------------- + +class _FakeGqlResp: + """Fake urllib response for GQL tests.""" + + def __init__(self, data): + import json as _json + self._data = _json.dumps(data).encode() + + def read(self): + return self._data + + def close(self): + pass + + +class TestQueryStream: + """Test _query_stream response parsing with mocked HTTP.""" + + def test_live_response(self): + with patch("urllib.request.urlopen", return_value=_FakeGqlResp(GQL_LIVE)): + result = _mod._query_stream("xqc") + assert result["exists"] is True + assert result["live"] is True + assert result["login"] == "xqc" + assert result["display_name"] == "xQc" + assert result["stream_id"] == "12345" + assert result["title"] == "Playing games" + assert result["game"] == "Fortnite" + assert result["viewers"] == 50000 + assert result["error"] == "" + + def test_offline_response(self): + with patch("urllib.request.urlopen", return_value=_FakeGqlResp(GQL_OFFLINE)): + result = _mod._query_stream("xqc") + assert result["exists"] is True + assert result["live"] is False + assert result["login"] == "xqc" + assert result["stream_id"] == "" + + def test_not_found_response(self): + with patch("urllib.request.urlopen", return_value=_FakeGqlResp(GQL_NOT_FOUND)): + result = _mod._query_stream("nobody") + assert result["exists"] is False + assert result["live"] is False + + def test_no_game_response(self): + with patch("urllib.request.urlopen", return_value=_FakeGqlResp(GQL_LIVE_NO_GAME)): + result = _mod._query_stream("streamer") + assert result["exists"] is True + assert result["live"] is True + assert result["game"] == "" + + def test_network_error(self): + with patch("urllib.request.urlopen", side_effect=Exception("timeout")): + result = _mod._query_stream("xqc") + assert result["error"] == "timeout" + assert result["exists"] is False + + +# --------------------------------------------------------------------------- +# TestStateHelpers +# --------------------------------------------------------------------------- + +class TestStateHelpers: + def test_save_and_load(self): + bot = _FakeBot() + data = {"login": "xqc", "name": "xqc"} + _save(bot, "#ch:xqc", data) + loaded = _load(bot, "#ch:xqc") + assert loaded == data + + def test_load_missing(self): + bot = _FakeBot() + assert _load(bot, "nonexistent") is None + + def test_delete(self): + bot = _FakeBot() + _save(bot, "#ch:xqc", {"name": "xqc"}) + _delete(bot, "#ch:xqc") + assert _load(bot, "#ch:xqc") is None + + def test_state_key(self): + assert _state_key("#ops", "xqc") == "#ops:xqc" + + def test_load_invalid_json(self): + bot = _FakeBot() + bot.state.set("twitch", "bad", "not json{{{") + assert _load(bot, "bad") is None + + +# --------------------------------------------------------------------------- +# TestCmdTwitchFollow +# --------------------------------------------------------------------------- + +class TestCmdTwitchFollow: + def test_follow_success(self): + _clear() + bot = _FakeBot(admin=True) + + async def inner(): + with patch.object(_mod, "_query_stream", _fake_query_offline): + await cmd_twitch(bot, _msg("!twitch follow xqc")) + await asyncio.sleep(0) + assert len(bot.replied) == 1 + assert "Following 'xqc'" in bot.replied[0] + assert "(xQc)" in bot.replied[0] + data = _load(bot, "#test:xqc") + assert data is not None + assert data["login"] == "xqc" + assert data["name"] == "xqc" + assert data["channel"] == "#test" + assert data["was_live"] is False + assert "#test:xqc" in _pollers + _stop_poller("#test:xqc") + await asyncio.sleep(0) + + asyncio.run(inner()) + + def test_follow_with_custom_name(self): + _clear() + bot = _FakeBot(admin=True) + + async def inner(): + with patch.object(_mod, "_query_stream", _fake_query_offline): + await cmd_twitch(bot, _msg("!twitch follow xqc my-streamer")) + await asyncio.sleep(0) + assert "Following 'my-streamer'" in bot.replied[0] + data = _load(bot, "#test:my-streamer") + assert data is not None + assert data["name"] == "my-streamer" + _stop_poller("#test:my-streamer") + await asyncio.sleep(0) + + asyncio.run(inner()) + + def test_follow_live_seeded(self): + _clear() + bot = _FakeBot(admin=True) + + async def inner(): + with patch.object(_mod, "_query_stream", _fake_query_live): + await cmd_twitch(bot, _msg("!twitch follow xqc")) + await asyncio.sleep(0) + assert "[live]" in bot.replied[0] + data = _load(bot, "#test:xqc") + assert data["was_live"] is True + assert data["stream_id"] == "12345" + # Should NOT have announced (seed, not transition) + assert len(bot.sent) == 0 + _stop_poller("#test:xqc") + await asyncio.sleep(0) + + asyncio.run(inner()) + + def test_follow_requires_admin(self): + _clear() + bot = _FakeBot(admin=False) + asyncio.run(cmd_twitch(bot, _msg("!twitch follow xqc"))) + assert "Permission denied" in bot.replied[0] + + def test_follow_requires_channel(self): + _clear() + bot = _FakeBot(admin=True) + asyncio.run(cmd_twitch(bot, _pm("!twitch follow xqc"))) + assert "Use this command in a channel" in bot.replied[0] + + def test_follow_invalid_twitch_username(self): + _clear() + bot = _FakeBot(admin=True) + asyncio.run(cmd_twitch(bot, _msg("!twitch follow @invalid!"))) + assert "Invalid Twitch username" in bot.replied[0] + + def test_follow_user_not_found(self): + _clear() + bot = _FakeBot(admin=True) + + async def inner(): + with patch.object(_mod, "_query_stream", _fake_query_not_found): + await cmd_twitch(bot, _msg("!twitch follow nobody")) + assert "not found" in bot.replied[0] + + asyncio.run(inner()) + + def test_follow_gql_error(self): + _clear() + bot = _FakeBot(admin=True) + + async def inner(): + with patch.object(_mod, "_query_stream", _fake_query_error): + await cmd_twitch(bot, _msg("!twitch follow xqc")) + assert "GQL query failed" in bot.replied[0] + + asyncio.run(inner()) + + def test_follow_invalid_name(self): + _clear() + bot = _FakeBot(admin=True) + + async def inner(): + with patch.object(_mod, "_query_stream", _fake_query_offline): + await cmd_twitch(bot, _msg("!twitch follow xqc BAD!")) + assert "Invalid name" in bot.replied[0] + + asyncio.run(inner()) + + def test_follow_duplicate(self): + _clear() + bot = _FakeBot(admin=True) + + async def inner(): + with patch.object(_mod, "_query_stream", _fake_query_offline): + await cmd_twitch(bot, _msg("!twitch follow xqc")) + await asyncio.sleep(0) + bot.replied.clear() + with patch.object(_mod, "_query_stream", _fake_query_offline): + await cmd_twitch(bot, _msg("!twitch follow xqc")) + assert "already exists" in bot.replied[0] + _clear() + + asyncio.run(inner()) + + def test_follow_channel_limit(self): + _clear() + bot = _FakeBot(admin=True) + for i in range(20): + _save(bot, f"#test:s{i}", {"name": f"s{i}", "channel": "#test"}) + + async def inner(): + with patch.object(_mod, "_query_stream", _fake_query_offline): + await cmd_twitch(bot, _msg("!twitch follow xqc")) + assert "limit reached" in bot.replied[0] + + asyncio.run(inner()) + + def test_follow_no_username(self): + _clear() + bot = _FakeBot(admin=True) + asyncio.run(cmd_twitch(bot, _msg("!twitch follow"))) + assert "Usage:" in bot.replied[0] + + +# --------------------------------------------------------------------------- +# TestCmdTwitchUnfollow +# --------------------------------------------------------------------------- + +class TestCmdTwitchUnfollow: + def test_unfollow_success(self): + _clear() + bot = _FakeBot(admin=True) + + async def inner(): + with patch.object(_mod, "_query_stream", _fake_query_offline): + await cmd_twitch(bot, _msg("!twitch follow xqc")) + await asyncio.sleep(0) + bot.replied.clear() + await cmd_twitch(bot, _msg("!twitch unfollow xqc")) + assert "Unfollowed 'xqc'" in bot.replied[0] + assert _load(bot, "#test:xqc") is None + assert "#test:xqc" not in _pollers + await asyncio.sleep(0) + + asyncio.run(inner()) + + def test_unfollow_requires_admin(self): + _clear() + bot = _FakeBot(admin=False) + asyncio.run(cmd_twitch(bot, _msg("!twitch unfollow xqc"))) + assert "Permission denied" in bot.replied[0] + + def test_unfollow_requires_channel(self): + _clear() + bot = _FakeBot(admin=True) + asyncio.run(cmd_twitch(bot, _pm("!twitch unfollow xqc"))) + assert "Use this command in a channel" in bot.replied[0] + + def test_unfollow_nonexistent(self): + _clear() + bot = _FakeBot(admin=True) + asyncio.run(cmd_twitch(bot, _msg("!twitch unfollow nobody"))) + assert "No streamer" in bot.replied[0] + + def test_unfollow_no_name(self): + _clear() + bot = _FakeBot(admin=True) + asyncio.run(cmd_twitch(bot, _msg("!twitch unfollow"))) + assert "Usage:" in bot.replied[0] + + +# --------------------------------------------------------------------------- +# TestCmdTwitchList +# --------------------------------------------------------------------------- + +class TestCmdTwitchList: + def test_list_empty(self): + _clear() + bot = _FakeBot() + asyncio.run(cmd_twitch(bot, _msg("!twitch list"))) + assert "No Twitch streamers" in bot.replied[0] + + def test_list_populated(self): + _clear() + bot = _FakeBot() + _save(bot, "#test:xqc", { + "name": "xqc", "channel": "#test", + "last_error": "", "was_live": False, + }) + _save(bot, "#test:shroud", { + "name": "shroud", "channel": "#test", + "last_error": "", "was_live": False, + }) + asyncio.run(cmd_twitch(bot, _msg("!twitch list"))) + assert "Twitch:" in bot.replied[0] + assert "xqc" in bot.replied[0] + assert "shroud" in bot.replied[0] + + def test_list_shows_error(self): + _clear() + bot = _FakeBot() + _save(bot, "#test:broken", { + "name": "broken", "channel": "#test", + "last_error": "Connection refused", "was_live": False, + }) + asyncio.run(cmd_twitch(bot, _msg("!twitch list"))) + assert "broken (error)" in bot.replied[0] + + def test_list_shows_live(self): + _clear() + bot = _FakeBot() + _save(bot, "#test:xqc", { + "name": "xqc", "channel": "#test", + "last_error": "", "was_live": True, + }) + asyncio.run(cmd_twitch(bot, _msg("!twitch list"))) + assert "xqc (live)" in bot.replied[0] + + def test_list_requires_channel(self): + _clear() + bot = _FakeBot() + asyncio.run(cmd_twitch(bot, _pm("!twitch list"))) + assert "Use this command in a channel" in bot.replied[0] + + def test_list_only_this_channel(self): + _clear() + bot = _FakeBot() + _save(bot, "#test:mine", { + "name": "mine", "channel": "#test", + "last_error": "", "was_live": False, + }) + _save(bot, "#other:theirs", { + "name": "theirs", "channel": "#other", + "last_error": "", "was_live": False, + }) + asyncio.run(cmd_twitch(bot, _msg("!twitch list"))) + assert "mine" in bot.replied[0] + assert "theirs" not in bot.replied[0] + + +# --------------------------------------------------------------------------- +# TestCmdTwitchCheck +# --------------------------------------------------------------------------- + +class TestCmdTwitchCheck: + def test_check_offline(self): + _clear() + bot = _FakeBot() + data = { + "login": "xqc", "name": "xqc", "channel": "#test", + "interval": 120, "was_live": False, "stream_id": "", + "last_title": "", "last_game": "", + "last_poll": "", "last_error": "", + } + _save(bot, "#test:xqc", data) + + async def inner(): + with patch.object(_mod, "_query_stream", _fake_query_offline): + await cmd_twitch(bot, _msg("!twitch check xqc")) + assert "xqc: offline" in bot.replied[0] + + asyncio.run(inner()) + + def test_check_live(self): + _clear() + bot = _FakeBot() + data = { + "login": "xqc", "name": "xqc", "channel": "#test", + "interval": 120, "was_live": False, "stream_id": "", + "last_title": "", "last_game": "", + "last_poll": "", "last_error": "", + } + _save(bot, "#test:xqc", data) + + async def inner(): + with patch.object(_mod, "_query_stream", _fake_query_live): + await cmd_twitch(bot, _msg("!twitch check xqc")) + # Should announce (offline -> live transition) + announcements = [s for t, s in bot.sent if t == "#test"] + assert len(announcements) == 1 + assert "[xqc] is live" in announcements[0] + assert "Fortnite" in announcements[0] + # Check reply shows live status + assert "xqc: live" in bot.replied[0] + + asyncio.run(inner()) + + def test_check_nonexistent(self): + _clear() + bot = _FakeBot() + asyncio.run(cmd_twitch(bot, _msg("!twitch check nope"))) + assert "No streamer" in bot.replied[0] + + def test_check_requires_channel(self): + _clear() + bot = _FakeBot() + asyncio.run(cmd_twitch(bot, _pm("!twitch check xqc"))) + assert "Use this command in a channel" in bot.replied[0] + + def test_check_shows_error(self): + _clear() + bot = _FakeBot() + data = { + "login": "xqc", "name": "xqc", "channel": "#test", + "interval": 120, "was_live": False, "stream_id": "", + "last_title": "", "last_game": "", + "last_poll": "", "last_error": "", + } + _save(bot, "#test:xqc", data) + + async def inner(): + with patch.object(_mod, "_query_stream", _fake_query_error): + await cmd_twitch(bot, _msg("!twitch check xqc")) + assert "error" in bot.replied[0].lower() + + asyncio.run(inner()) + + def test_check_no_name(self): + _clear() + bot = _FakeBot() + asyncio.run(cmd_twitch(bot, _msg("!twitch check"))) + assert "Usage:" in bot.replied[0] + + +# --------------------------------------------------------------------------- +# TestPollOnce +# --------------------------------------------------------------------------- + +class TestPollOnce: + def test_offline_to_online(self): + """Transition from offline to live triggers announcement.""" + _clear() + bot = _FakeBot() + data = { + "login": "xqc", "name": "xqc", "channel": "#test", + "interval": 120, "was_live": False, "stream_id": "", + "last_title": "", "last_game": "", + "last_poll": "", "last_error": "", + } + key = "#test:xqc" + _save(bot, key, data) + _streamers[key] = data + + async def inner(): + with patch.object(_mod, "_query_stream", _fake_query_live): + await _poll_once(bot, key) + messages = [s for t, s in bot.sent if t == "#test"] + assert len(messages) == 1 + assert "[xqc] is live" in messages[0] + assert "Playing games" in messages[0] + assert "Fortnite" in messages[0] + assert "https://twitch.tv/xqc" in messages[0] + updated = _load(bot, key) + assert updated["was_live"] is True + assert updated["stream_id"] == "12345" + + asyncio.run(inner()) + + def test_online_same_stream_no_announce(self): + """Same stream_id while already live does NOT re-announce.""" + _clear() + bot = _FakeBot() + data = { + "login": "xqc", "name": "xqc", "channel": "#test", + "interval": 120, "was_live": True, "stream_id": "12345", + "last_title": "Playing games", "last_game": "Fortnite", + "last_poll": "", "last_error": "", + } + key = "#test:xqc" + _save(bot, key, data) + _streamers[key] = data + + async def inner(): + with patch.object(_mod, "_query_stream", _fake_query_live): + await _poll_once(bot, key) + assert len(bot.sent) == 0 + updated = _load(bot, key) + assert updated["was_live"] is True + + asyncio.run(inner()) + + def test_online_new_stream_announces(self): + """Different stream_id while live announces new stream.""" + _clear() + bot = _FakeBot() + data = { + "login": "xqc", "name": "xqc", "channel": "#test", + "interval": 120, "was_live": True, "stream_id": "12345", + "last_title": "Playing games", "last_game": "Fortnite", + "last_poll": "", "last_error": "", + } + key = "#test:xqc" + _save(bot, key, data) + _streamers[key] = data + + async def inner(): + with patch.object(_mod, "_query_stream", _fake_query_live_new_stream): + await _poll_once(bot, key) + messages = [s for t, s in bot.sent if t == "#test"] + assert len(messages) == 1 + assert "New stream" in messages[0] + assert "Minecraft" in messages[0] + updated = _load(bot, key) + assert updated["stream_id"] == "67890" + + asyncio.run(inner()) + + def test_online_to_offline(self): + """Stream ending sets was_live to False, no announcement.""" + _clear() + bot = _FakeBot() + data = { + "login": "xqc", "name": "xqc", "channel": "#test", + "interval": 120, "was_live": True, "stream_id": "12345", + "last_title": "Playing games", "last_game": "Fortnite", + "last_poll": "", "last_error": "", + } + key = "#test:xqc" + _save(bot, key, data) + _streamers[key] = data + + async def inner(): + with patch.object(_mod, "_query_stream", _fake_query_offline): + await _poll_once(bot, key) + assert len(bot.sent) == 0 + updated = _load(bot, key) + assert updated["was_live"] is False + + asyncio.run(inner()) + + def test_error_increments(self): + _clear() + bot = _FakeBot() + data = { + "login": "xqc", "name": "xqc", "channel": "#test", + "interval": 120, "was_live": False, "stream_id": "", + "last_title": "", "last_game": "", + "last_poll": "", "last_error": "", + } + key = "#test:xqc" + _save(bot, key, data) + _streamers[key] = data + + async def inner(): + with patch.object(_mod, "_query_stream", _fake_query_error): + await _poll_once(bot, key) + await _poll_once(bot, key) + assert _errors[key] == 2 + updated = _load(bot, key) + assert updated["last_error"] == "Connection refused" + + asyncio.run(inner()) + + def test_no_announce_flag(self): + """announce=False suppresses messages but still updates state.""" + _clear() + bot = _FakeBot() + data = { + "login": "xqc", "name": "xqc", "channel": "#test", + "interval": 120, "was_live": False, "stream_id": "", + "last_title": "", "last_game": "", + "last_poll": "", "last_error": "", + } + key = "#test:xqc" + _save(bot, key, data) + _streamers[key] = data + + async def inner(): + with patch.object(_mod, "_query_stream", _fake_query_live): + await _poll_once(bot, key, announce=False) + assert len(bot.sent) == 0 + updated = _load(bot, key) + assert updated["was_live"] is True + assert updated["stream_id"] == "12345" + + asyncio.run(inner()) + + def test_no_game_omitted(self): + """Game is omitted from announcement when empty.""" + _clear() + bot = _FakeBot() + data = { + "login": "streamer", "name": "streamer", "channel": "#test", + "interval": 120, "was_live": False, "stream_id": "", + "last_title": "", "last_game": "", + "last_poll": "", "last_error": "", + } + key = "#test:streamer" + _save(bot, key, data) + _streamers[key] = data + + async def inner(): + with patch.object(_mod, "_query_stream", _fake_query_live_no_game): + await _poll_once(bot, key) + messages = [s for t, s in bot.sent if t == "#test"] + assert len(messages) == 1 + assert "Just chatting" in messages[0] + assert "(" not in messages[0] # No game parenthetical + + asyncio.run(inner()) + + def test_missing_key_returns_early(self): + """poll_once with unknown key does nothing.""" + _clear() + bot = _FakeBot() + + async def inner(): + await _poll_once(bot, "#test:missing") + assert len(bot.sent) == 0 + + asyncio.run(inner()) + + +# --------------------------------------------------------------------------- +# TestRestore +# --------------------------------------------------------------------------- + +class TestRestore: + def test_restore_spawns_pollers(self): + _clear() + bot = _FakeBot() + data = { + "login": "xqc", "name": "xqc", "channel": "#test", + "interval": 120, "was_live": False, "stream_id": "", + "last_title": "", "last_game": "", + "last_poll": "", "last_error": "", + } + _save(bot, "#test:xqc", data) + + async def inner(): + _restore(bot) + assert "#test:xqc" in _pollers + task = _pollers["#test:xqc"] + assert not task.done() + _stop_poller("#test:xqc") + await asyncio.sleep(0) + + asyncio.run(inner()) + + def test_restore_skips_active(self): + _clear() + bot = _FakeBot() + data = { + "login": "xqc", "name": "xqc", "channel": "#test", + "interval": 120, "was_live": False, "stream_id": "", + "last_title": "", "last_game": "", + "last_poll": "", "last_error": "", + } + _save(bot, "#test:xqc", data) + + async def inner(): + dummy = asyncio.create_task(asyncio.sleep(9999)) + _pollers["#test:xqc"] = dummy + _restore(bot) + assert _pollers["#test:xqc"] is dummy + dummy.cancel() + await asyncio.sleep(0) + + asyncio.run(inner()) + + def test_restore_replaces_done_task(self): + _clear() + bot = _FakeBot() + data = { + "login": "xqc", "name": "xqc", "channel": "#test", + "interval": 120, "was_live": False, "stream_id": "", + "last_title": "", "last_game": "", + "last_poll": "", "last_error": "", + } + _save(bot, "#test:xqc", data) + + async def inner(): + done_task = asyncio.create_task(asyncio.sleep(0)) + await done_task + _pollers["#test:xqc"] = done_task + _restore(bot) + new_task = _pollers["#test:xqc"] + assert new_task is not done_task + assert not new_task.done() + _stop_poller("#test:xqc") + await asyncio.sleep(0) + + asyncio.run(inner()) + + def test_restore_skips_bad_json(self): + _clear() + bot = _FakeBot() + bot.state.set("twitch", "#test:bad", "not json{{{") + + async def inner(): + _restore(bot) + assert "#test:bad" not in _pollers + + asyncio.run(inner()) + + def test_on_connect_calls_restore(self): + _clear() + bot = _FakeBot() + data = { + "login": "xqc", "name": "xqc", "channel": "#test", + "interval": 120, "was_live": False, "stream_id": "", + "last_title": "", "last_game": "", + "last_poll": "", "last_error": "", + } + _save(bot, "#test:xqc", data) + + async def inner(): + msg = _msg("", target="botname") + await on_connect(bot, msg) + assert "#test:xqc" in _pollers + _stop_poller("#test:xqc") + await asyncio.sleep(0) + + asyncio.run(inner()) + + +# --------------------------------------------------------------------------- +# TestPollerManagement +# --------------------------------------------------------------------------- + +class TestPollerManagement: + def test_start_and_stop(self): + _clear() + bot = _FakeBot() + data = { + "login": "xqc", "name": "xqc", "channel": "#test", + "interval": 120, "was_live": False, "stream_id": "", + "last_title": "", "last_game": "", + "last_poll": "", "last_error": "", + } + key = "#test:xqc" + _save(bot, key, data) + _streamers[key] = data + + async def inner(): + _start_poller(bot, key) + assert key in _pollers + assert not _pollers[key].done() + _stop_poller(key) + await asyncio.sleep(0) + assert key not in _pollers + assert key not in _streamers + + asyncio.run(inner()) + + def test_start_idempotent(self): + _clear() + bot = _FakeBot() + data = { + "login": "xqc", "name": "xqc", "channel": "#test", + "interval": 120, "was_live": False, "stream_id": "", + "last_title": "", "last_game": "", + "last_poll": "", "last_error": "", + } + key = "#test:xqc" + _save(bot, key, data) + _streamers[key] = data + + async def inner(): + _start_poller(bot, key) + first = _pollers[key] + _start_poller(bot, key) + assert _pollers[key] is first + _stop_poller(key) + await asyncio.sleep(0) + + asyncio.run(inner()) + + def test_stop_nonexistent(self): + _clear() + _stop_poller("#test:nonexistent") + + +# --------------------------------------------------------------------------- +# TestCmdTwitchUsage +# --------------------------------------------------------------------------- + +class TestCmdTwitchUsage: + def test_no_args(self): + _clear() + bot = _FakeBot() + asyncio.run(cmd_twitch(bot, _msg("!twitch"))) + assert "Usage:" in bot.replied[0] + + def test_unknown_subcommand(self): + _clear() + bot = _FakeBot() + asyncio.run(cmd_twitch(bot, _msg("!twitch foobar"))) + assert "Usage:" in bot.replied[0]