"""Plugin: Twitch livestream notifications via public GQL endpoint.""" from __future__ import annotations import asyncio import json import re import urllib.request from datetime import datetime, timezone from derp.http import urlopen as _urlopen from derp.plugin import command, event # -- Constants --------------------------------------------------------------- _NAME_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,19}$") _TWITCH_LOGIN_RE = re.compile(r"^[a-zA-Z0-9_]{1,25}$") _GQL_URL = "https://gql.twitch.tv/gql" _GQL_CLIENT_ID = "kimne78kx3ncx6brgo4mv6wki5h1ko" _DEFAULT_INTERVAL = 120 _MAX_INTERVAL = 3600 _FETCH_TIMEOUT = 10 _MAX_TITLE_LEN = 80 _MAX_STREAMERS = 20 # -- Per-bot runtime state --------------------------------------------------- def _ps(bot): """Per-bot plugin runtime state.""" return bot._pstate.setdefault("twitch", { "pollers": {}, "streamers": {}, "errors": {}, }) # -- Pure helpers ------------------------------------------------------------ def _state_key(channel: str, name: str) -> str: """Build composite state key.""" return f"{channel}:{name}" def _validate_name(name: str) -> bool: """Check name against allowed pattern.""" return bool(_NAME_RE.match(name)) def _truncate(text: str, max_len: int = _MAX_TITLE_LEN) -> str: """Truncate text with ellipsis if needed.""" if len(text) <= max_len: return text return text[: max_len - 3].rstrip() + "..." def _compact_num(n: int) -> str: """Format large numbers compactly: 1234 -> 1.2k, 1234567 -> 1.2M.""" if n >= 1_000_000: return f"{n / 1_000_000:.1f}M".replace(".0M", "M") if n >= 1_000: return f"{n / 1_000:.1f}k".replace(".0k", "k") return str(n) # -- Blocking helpers (for executor) ----------------------------------------- def _query_stream(login: str) -> dict: """Blocking GQL query. Returns normalised stream info. Keys: exists, login, display_name, live, stream_id, title, game, viewers, error. """ result: dict = { "exists": False, "login": "", "display_name": "", "live": False, "stream_id": "", "title": "", "game": "", "viewers": 0, "error": "", } query = ( 'query{user(login:"' + login + '"){login displayName ' "stream{id title game{name}viewersCount}}}" ) body = json.dumps({"query": query}).encode() req = urllib.request.Request(_GQL_URL, data=body, method="POST") req.add_header("Client-Id", _GQL_CLIENT_ID) req.add_header("Content-Type", "application/json") try: resp = _urlopen(req, timeout=_FETCH_TIMEOUT) raw = resp.read() resp.close() data = json.loads(raw) except Exception as exc: result["error"] = str(exc) return result try: user = data["data"]["user"] except (KeyError, TypeError): result["error"] = "Unexpected GQL response" return result if user is None: return result # exists=False result["exists"] = True result["login"] = user.get("login", "") result["display_name"] = user.get("displayName", "") stream = user.get("stream") if stream is not None: result["live"] = True result["stream_id"] = str(stream.get("id", "")) result["title"] = stream.get("title", "") game = stream.get("game") result["game"] = game.get("name", "") if game else "" result["viewers"] = stream.get("viewersCount", 0) return result # -- State helpers ----------------------------------------------------------- def _save(bot, key: str, data: dict) -> None: """Persist streamer data to bot.state.""" bot.state.set("twitch", key, json.dumps(data)) def _load(bot, key: str) -> dict | None: """Load streamer data from bot.state.""" raw = bot.state.get("twitch", key) if raw is None: return None try: return json.loads(raw) except json.JSONDecodeError: return None def _delete(bot, key: str) -> None: """Remove streamer data from bot.state.""" bot.state.delete("twitch", key) # -- Polling ----------------------------------------------------------------- async def _poll_once(bot, key: str, announce: bool = True) -> None: """Single poll cycle for one Twitch streamer.""" ps = _ps(bot) data = ps["streamers"].get(key) if data is None: data = _load(bot, key) if data is None: return ps["streamers"][key] = data login = data["login"] loop = asyncio.get_running_loop() result = await loop.run_in_executor(None, _query_stream, login) now = datetime.now(timezone.utc).isoformat() data["last_poll"] = now if result["error"]: data["last_error"] = result["error"] ps["errors"][key] = ps["errors"].get(key, 0) + 1 ps["streamers"][key] = data _save(bot, key, data) return data["last_error"] = "" ps["errors"][key] = 0 was_live = data.get("was_live", False) old_stream_id = data.get("stream_id", "") if result["live"]: new_stream_id = result["stream_id"] data["last_title"] = result["title"] data["last_game"] = result["game"] data["last_viewers"] = result["viewers"] if announce and (not was_live or new_stream_id != old_stream_id): channel = data["channel"] name = data["name"] title = _truncate(result["title"]) if result["title"] else "(no title)" game = result["game"] viewers = result["viewers"] line = f"[{name}] is live: {title}" if game: line += f" ({game})" if viewers: line += f" | {_compact_num(viewers)} viewers" line += f" -- https://twitch.tv/{login}" await bot.send(channel, line) data["was_live"] = True data["stream_id"] = new_stream_id else: data["was_live"] = False ps["streamers"][key] = data _save(bot, key, data) async def _poll_loop(bot, key: str) -> None: """Infinite poll loop for one Twitch streamer.""" try: while True: ps = _ps(bot) data = ps["streamers"].get(key) or _load(bot, key) if data is None: return interval = data.get("interval", _DEFAULT_INTERVAL) errs = ps["errors"].get(key, 0) if errs >= 5: interval = min(interval * 2, _MAX_INTERVAL) await asyncio.sleep(interval) await _poll_once(bot, key, announce=True) except asyncio.CancelledError: pass def _start_poller(bot, key: str) -> None: """Create and track a poller task.""" ps = _ps(bot) existing = ps["pollers"].get(key) if existing and not existing.done(): return task = asyncio.create_task(_poll_loop(bot, key)) ps["pollers"][key] = task def _stop_poller(bot, key: str) -> None: """Cancel and remove a poller task.""" ps = _ps(bot) task = ps["pollers"].pop(key, None) if task and not task.done(): task.cancel() ps["streamers"].pop(key, None) ps["errors"].pop(key, 0) # -- Restore on connect ----------------------------------------------------- def _restore(bot) -> None: """Rebuild pollers from persisted state.""" ps = _ps(bot) for key in bot.state.keys("twitch"): existing = ps["pollers"].get(key) if existing and not existing.done(): continue data = _load(bot, key) if data is None: continue ps["streamers"][key] = data _start_poller(bot, key) @event("001") async def on_connect(bot, message): """Restore Twitch streamer pollers on connect.""" _restore(bot) # -- Command handler --------------------------------------------------------- @command("twitch", help="Twitch: !twitch follow|unfollow|list|check") async def cmd_twitch(bot, message): """Per-channel Twitch livestream subscriptions. Usage: !twitch follow [name] Follow a streamer (admin) !twitch unfollow Unfollow a streamer (admin) !twitch list List followed streamers !twitch check Check status now """ parts = message.text.split(None, 3) if len(parts) < 2: await bot.reply(message, "Usage: !twitch [args]") return sub = parts[1].lower() # -- list (any user, channel only) ---------------------------------------- if sub == "list": if not message.is_channel: await bot.reply(message, "Use this command in a channel") return channel = message.target prefix = f"{channel}:" streamers = [] for key in bot.state.keys("twitch"): if key.startswith(prefix): data = _load(bot, key) if data: name = data["name"] err = data.get("last_error", "") live = data.get("was_live", False) if err: streamers.append(f"{name} (error)") elif live: viewers = data.get("last_viewers", 0) if viewers: streamers.append( f"{name} (live, {_compact_num(viewers)})" ) else: streamers.append(f"{name} (live)") else: streamers.append(name) if not streamers: await bot.reply(message, "No Twitch streamers in this channel") return await bot.reply(message, f"Twitch: {', '.join(streamers)}") return # -- check (any user, channel only) --------------------------------------- if sub == "check": if not message.is_channel: await bot.reply(message, "Use this command in a channel") return if len(parts) < 3: await bot.reply(message, "Usage: !twitch check ") return name = parts[2].lower() channel = message.target key = _state_key(channel, name) data = _load(bot, key) if data is None: await bot.reply(message, f"No streamer '{name}' in this channel") return ps = _ps(bot) ps["streamers"][key] = data await _poll_once(bot, key, announce=True) data = ps["streamers"].get(key, data) if data.get("last_error"): await bot.reply(message, f"{name}: error -- {data['last_error']}") elif data.get("was_live"): title = _truncate(data.get("last_title", "")) game = data.get("last_game", "") viewers = data.get("last_viewers", 0) line = f"{name}: live -- {title}" if game: line += f" ({game})" if viewers: line += f" | {_compact_num(viewers)} viewers" await bot.reply(message, line) else: await bot.reply(message, f"{name}: offline") return # -- follow (admin, channel only) ----------------------------------------- if sub == "follow": if not bot._is_admin(message): await bot.reply(message, "Permission denied: follow requires admin") return if not message.is_channel: await bot.reply(message, "Use this command in a channel") return if len(parts) < 3: await bot.reply(message, "Usage: !twitch follow [name]") return username = parts[2] if not _TWITCH_LOGIN_RE.match(username): await bot.reply(message, "Invalid Twitch username") return # Query GQL to verify user exists and get display name loop = asyncio.get_running_loop() result = await loop.run_in_executor(None, _query_stream, username) if result["error"]: await bot.reply(message, f"GQL query failed: {result['error']}") return if not result["exists"]: await bot.reply(message, f"Twitch user '{username}' not found") return login = result["login"] display_name = result["display_name"] name = parts[3].lower() if len(parts) > 3 else login.lower() if not _validate_name(name): await bot.reply( message, "Invalid name (lowercase alphanumeric + hyphens, 1-20 chars)", ) return irc_channel = message.target key = _state_key(irc_channel, name) if _load(bot, key) is not None: await bot.reply(message, f"Streamer '{name}' already exists in this channel") return ch_prefix = f"{irc_channel}:" count = sum(1 for k in bot.state.keys("twitch") if k.startswith(ch_prefix)) if count >= _MAX_STREAMERS: await bot.reply(message, f"Streamer limit reached ({_MAX_STREAMERS})") return now = datetime.now(timezone.utc).isoformat() data = { "login": login, "display_name": display_name, "name": name, "channel": irc_channel, "interval": _DEFAULT_INTERVAL, "added_by": message.nick, "added_at": now, "was_live": result["live"], "stream_id": result["stream_id"], "last_title": result["title"], "last_game": result["game"], "last_poll": now, "last_error": "", } _save(bot, key, data) _ps(bot)["streamers"][key] = data _start_poller(bot, key) reply = f"Following '{name}' ({display_name})" if result["live"]: reply += " [live]" await bot.reply(message, reply) return # -- unfollow (admin, channel only) --------------------------------------- if sub == "unfollow": if not bot._is_admin(message): await bot.reply(message, "Permission denied: unfollow requires admin") return if not message.is_channel: await bot.reply(message, "Use this command in a channel") return if len(parts) < 3: await bot.reply(message, "Usage: !twitch unfollow ") return name = parts[2].lower() channel = message.target key = _state_key(channel, name) if _load(bot, key) is None: await bot.reply(message, f"No streamer '{name}' in this channel") return _stop_poller(bot, key) _delete(bot, key) await bot.reply(message, f"Unfollowed '{name}'") return await bot.reply(message, "Usage: !twitch [args]")