Files
derp/plugins/twitch.py
user 073659607e feat: add multi-server support
Connect to multiple IRC servers concurrently from a single config file.
Plugins are loaded once and shared; per-server state is isolated via
separate SQLite databases and per-bot runtime state (bot._pstate).

- Add build_server_configs() for [servers.*] config layout
- Bot.__init__ gains name parameter, _pstate dict for plugin isolation
- cli.py runs multiple bots via asyncio.gather
- 9 stateful plugins migrated from module-level dicts to _ps(bot) pattern
- Backward compatible: legacy [server] config works unchanged

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 19:04:20 +01:00

465 lines
15 KiB
Python

"""Plugin: Twitch livestream notifications via public GQL endpoint."""
from __future__ import annotations
import asyncio
import json
import re
import urllib.request
from datetime import datetime, timezone
from derp.http import urlopen as _urlopen
from derp.plugin import command, event
# -- Constants ---------------------------------------------------------------
# Local alias for a followed streamer: lowercase letters, digits and hyphens,
# 1-20 characters, first character alphanumeric.
_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,19}$")
# Twitch login as typed by the user: ASCII letters, digits and underscores,
# 1-25 characters.
_TWITCH_LOGIN_RE = re.compile(r"^[a-zA-Z0-9_]{1,25}$")
# GQL endpoint that stream-status queries are POSTed to.
_GQL_URL = "https://gql.twitch.tv/gql"
# Value sent in the Client-Id header of every GQL request.
_GQL_CLIENT_ID = "kimne78kx3ncx6brgo4mv6wki5h1ko"
# Seconds between polls; stored as "interval" on each newly followed streamer.
_DEFAULT_INTERVAL = 120
# Upper bound (seconds) on the doubled interval used after repeated errors.
_MAX_INTERVAL = 3600
# Timeout handed to the HTTP helper for each GQL request (presumably seconds).
_FETCH_TIMEOUT = 10
# Default maximum length of a stream title before it is truncated.
_MAX_TITLE_LEN = 80
# Maximum number of followed streamers per IRC channel.
_MAX_STREAMERS = 20
# -- Per-bot runtime state ---------------------------------------------------
def _ps(bot):
    """Return this bot's Twitch runtime state, creating it on first use.

    The state is kept under the ``"twitch"`` key of ``bot._pstate`` and holds
    three dicts: ``pollers``, ``streamers`` and ``errors``.
    """
    registry = bot._pstate
    if "twitch" not in registry:
        registry["twitch"] = {
            "pollers": {},
            "streamers": {},
            "errors": {},
        }
    return registry["twitch"]
# -- Pure helpers ------------------------------------------------------------
def _state_key(channel: str, name: str) -> str:
    """Join *channel* and *name* into the ``channel:name`` state key."""
    return "{}:{}".format(channel, name)
def _validate_name(name: str) -> bool:
    """Return True when *name* is an acceptable local streamer alias.

    The whole string must match ``_NAME_RE``.
    """
    # fullmatch, not match: "$" in the pattern also matches just before a
    # trailing newline, so match() would accept "alias\n".
    return _NAME_RE.fullmatch(name) is not None
def _truncate(text: str, max_len: int = _MAX_TITLE_LEN) -> str:
    """Shorten *text* with a trailing ``...`` when it exceeds *max_len*.

    Text that already fits is returned unchanged; otherwise the first
    ``max_len - 3`` characters are kept, right-stripped, and suffixed
    with ``...``.
    """
    if len(text) > max_len:
        kept = text[: max_len - 3].rstrip()
        return kept + "..."
    return text
def _compact_num(n: int) -> str:
    """Format large numbers compactly: 1234 -> 1.2k, 1234567 -> 1.2M.

    Values below 1000 are returned as plain digits. One decimal is kept and
    a trailing ``.0`` is dropped (1000 -> 1k). Values that would round up to
    "1000k" are promoted to the next unit (999999 -> 1M).
    """
    if n >= 1_000:
        kilo = f"{n / 1_000:.1f}"
        # Just under a million, one-decimal rounding yields "1000.0";
        # render those in millions instead of emitting "1000k".
        if n < 1_000_000 and kilo != "1000.0":
            return f"{kilo}k".replace(".0k", "k")
        return f"{n / 1_000_000:.1f}M".replace(".0M", "M")
    return str(n)
# -- Blocking helpers (for executor) -----------------------------------------
def _query_stream(login: str) -> dict:
    """Blocking GQL query for one Twitch login. Returns normalised stream info.

    Performs a synchronous HTTP POST, so async callers should run it in an
    executor.

    Returned keys: exists, login, display_name, live, stream_id, title, game,
    viewers, error. Request and parse failures are reported through a
    non-empty ``error`` value rather than raised. An unknown user yields
    ``exists=False`` with an empty ``error``. JSON ``null`` fields are
    normalised to ``""`` / ``0`` so every value has a stable type.
    """
    result: dict = {
        "exists": False,
        "login": "",
        "display_name": "",
        "live": False,
        "stream_id": "",
        "title": "",
        "game": "",
        "viewers": 0,
        "error": "",
    }
    # json.dumps emits a quoted, escaped string literal, so quotes or
    # backslashes in *login* cannot change the structure of the query.
    query = (
        "query{user(login:" + json.dumps(login) + "){login displayName "
        "stream{id title game{name}viewersCount}}}"
    )
    body = json.dumps({"query": query}).encode()
    req = urllib.request.Request(_GQL_URL, data=body, method="POST")
    req.add_header("Client-Id", _GQL_CLIENT_ID)
    req.add_header("Content-Type", "application/json")
    try:
        resp = _urlopen(req, timeout=_FETCH_TIMEOUT)
        try:
            raw = resp.read()
        finally:
            # Release the connection even when read() fails.
            resp.close()
        data = json.loads(raw)
    except Exception as exc:
        result["error"] = str(exc)
        return result

    try:
        user = data["data"]["user"]
    except (KeyError, TypeError):
        result["error"] = "Unexpected GQL response"
        return result
    if user is None:
        return result  # exists=False

    result["exists"] = True
    result["login"] = user.get("login") or ""
    result["display_name"] = user.get("displayName") or ""

    stream = user.get("stream")
    if stream is not None:
        result["live"] = True
        stream_id = stream.get("id")
        result["stream_id"] = "" if stream_id is None else str(stream_id)
        # "or" also covers keys that are present with a null value.
        result["title"] = stream.get("title") or ""
        game = stream.get("game")
        result["game"] = (game.get("name") or "") if game else ""
        result["viewers"] = stream.get("viewersCount") or 0
    return result
# -- State helpers -----------------------------------------------------------
def _save(bot, key: str, data: dict) -> None:
    """Store *data* as JSON under *key* in the ``twitch`` namespace of bot.state."""
    payload = json.dumps(data)
    bot.state.set("twitch", key, payload)
def _load(bot, key: str) -> dict | None:
    """Load streamer data for *key* from the ``twitch`` namespace of bot.state.

    Returns the decoded dict, or None when the key is absent, the stored
    text is not valid JSON, or the JSON is not an object.
    """
    raw = bot.state.get("twitch", key)
    if raw is None:
        return None
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return None
    # Callers index the result like a mapping; treat any other JSON shape
    # (null, list, number, ...) as a corrupt entry.
    return data if isinstance(data, dict) else None
def _delete(bot, key: str) -> None:
    """Drop the entry for *key* from the ``twitch`` namespace of bot.state."""
    store = bot.state
    store.delete("twitch", key)
# -- Polling -----------------------------------------------------------------
async def _poll_once(bot, key: str, announce: bool = True) -> None:
    """Run one poll cycle for the streamer stored under *key*.

    The streamer record is taken from the in-memory cache, falling back to
    persisted state; if neither has it, nothing happens. The blocking GQL
    query runs in the default executor. On failure the error text is
    recorded and the error counter incremented. On success the counter is
    reset, the live/offline fields are refreshed and, when *announce* is
    true and the streamer just went live or started a different stream, a
    notification is sent to the streamer's channel. The record is cached
    and persisted in both cases.
    """
    state = _ps(bot)
    entry = state["streamers"].get(key)
    if entry is None:
        entry = _load(bot, key)
        if entry is None:
            return
        state["streamers"][key] = entry

    login = entry["login"]
    result = await asyncio.get_running_loop().run_in_executor(
        None, _query_stream, login
    )
    entry["last_poll"] = datetime.now(timezone.utc).isoformat()

    failure = result["error"]
    if failure:
        entry["last_error"] = failure
        state["errors"][key] = state["errors"].get(key, 0) + 1
    else:
        entry["last_error"] = ""
        state["errors"][key] = 0
        previously_live = entry.get("was_live", False)
        previous_id = entry.get("stream_id", "")
        if not result["live"]:
            entry["was_live"] = False
        else:
            current_id = result["stream_id"]
            entry["last_title"] = result["title"]
            entry["last_game"] = result["game"]
            entry["last_viewers"] = result["viewers"]
            # A different stream id while already live counts as a new stream.
            is_new_stream = (not previously_live) or current_id != previous_id
            if announce and is_new_stream:
                headline = (
                    _truncate(result["title"]) if result["title"] else "(no title)"
                )
                segments = [f"[{entry['name']}] is live: {headline}"]
                if result["game"]:
                    segments.append(f" ({result['game']})")
                if result["viewers"]:
                    segments.append(
                        f" | {_compact_num(result['viewers'])} viewers"
                    )
                segments.append(f" -- https://twitch.tv/{login}")
                await bot.send(entry["channel"], "".join(segments))
            entry["was_live"] = True
            entry["stream_id"] = current_id

    state["streamers"][key] = entry
    _save(bot, key, entry)
async def _poll_loop(bot, key: str) -> None:
    """Poll one Twitch streamer forever, sleeping between cycles.

    The loop ends when the streamer record can no longer be found or when
    the task is cancelled. After five or more recorded errors the sleep
    interval is doubled, capped at ``_MAX_INTERVAL``.
    """
    try:
        while True:
            state = _ps(bot)
            entry = state["streamers"].get(key) or _load(bot, key)
            if entry is None:
                return
            delay = entry.get("interval", _DEFAULT_INTERVAL)
            if state["errors"].get(key, 0) >= 5:
                delay = min(delay * 2, _MAX_INTERVAL)
            await asyncio.sleep(delay)
            await _poll_once(bot, key, announce=True)
    except asyncio.CancelledError:
        # Cancellation is the normal way to stop a poller; end quietly.
        return
def _start_poller(bot, key: str) -> None:
    """Start a poll task for *key* unless one is already running."""
    pollers = _ps(bot)["pollers"]
    current = pollers.get(key)
    if not current or current.done():
        pollers[key] = asyncio.create_task(_poll_loop(bot, key))
def _stop_poller(bot, key: str) -> None:
    """Cancel the poll task for *key* and forget its cached runtime state."""
    state = _ps(bot)
    task = state["pollers"].pop(key, None)
    if task and not task.done():
        task.cancel()
    # Drop the cached streamer record and its error counter as well.
    for bucket in ("streamers", "errors"):
        state[bucket].pop(key, None)
# -- Restore on connect -----------------------------------------------------
def _restore(bot) -> None:
    """Start a poller for every persisted streamer that lacks a running one."""
    state = _ps(bot)
    for key in bot.state.keys("twitch"):
        task = state["pollers"].get(key)
        still_running = bool(task) and not task.done()
        # Entries with a live poller are left untouched (no reload).
        entry = None if still_running else _load(bot, key)
        if entry is not None:
            state["streamers"][key] = entry
            _start_poller(bot, key)
@event("001")
async def on_connect(bot, message):
    """Rebuild the Twitch pollers from persisted state when the 001 event fires."""
    _restore(bot)
# -- Command handler ---------------------------------------------------------
async def _twitch_list(bot, message, parts):
    """Handle ``!twitch list``: show streamers followed in this channel."""
    if not message.is_channel:
        await bot.reply(message, "Use this command in a channel")
        return
    prefix = f"{message.target}:"
    streamers = []
    for key in bot.state.keys("twitch"):
        if not key.startswith(prefix):
            continue
        data = _load(bot, key)
        if not data:
            continue
        name = data["name"]
        if data.get("last_error", ""):
            streamers.append(f"{name} (error)")
        elif data.get("was_live", False):
            viewers = data.get("last_viewers", 0)
            if viewers:
                streamers.append(f"{name} (live, {_compact_num(viewers)})")
            else:
                streamers.append(f"{name} (live)")
        else:
            streamers.append(name)
    if not streamers:
        await bot.reply(message, "No Twitch streamers in this channel")
        return
    await bot.reply(message, f"Twitch: {', '.join(streamers)}")


async def _twitch_check(bot, message, parts):
    """Handle ``!twitch check <name>``: poll one streamer now and report."""
    if not message.is_channel:
        await bot.reply(message, "Use this command in a channel")
        return
    if len(parts) < 3:
        await bot.reply(message, "Usage: !twitch check <name>")
        return
    name = parts[2].lower()
    key = _state_key(message.target, name)
    data = _load(bot, key)
    if data is None:
        await bot.reply(message, f"No streamer '{name}' in this channel")
        return
    ps = _ps(bot)
    ps["streamers"][key] = data
    await _poll_once(bot, key, announce=True)
    # Re-read the record: the poll updates the cached entry.
    data = ps["streamers"].get(key, data)
    if data.get("last_error"):
        await bot.reply(message, f"{name}: error -- {data['last_error']}")
    elif data.get("was_live"):
        # The stored title can be missing or null; never pass None on.
        title = _truncate(data.get("last_title") or "")
        game = data.get("last_game", "")
        viewers = data.get("last_viewers", 0)
        line = f"{name}: live -- {title}"
        if game:
            line += f" ({game})"
        if viewers:
            line += f" | {_compact_num(viewers)} viewers"
        await bot.reply(message, line)
    else:
        await bot.reply(message, f"{name}: offline")


async def _twitch_follow(bot, message, parts):
    """Handle ``!twitch follow <username> [name]`` (admin, channel only)."""
    if not bot._is_admin(message):
        await bot.reply(message, "Permission denied: follow requires admin")
        return
    if not message.is_channel:
        await bot.reply(message, "Use this command in a channel")
        return
    if len(parts) < 3:
        await bot.reply(message, "Usage: !twitch follow <username> [name]")
        return
    username = parts[2]
    # fullmatch: "$" alone would still accept a trailing newline.
    if not _TWITCH_LOGIN_RE.fullmatch(username):
        await bot.reply(message, "Invalid Twitch username")
        return

    # Query GQL to verify the user exists and get the display name.
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, _query_stream, username)
    if result["error"]:
        await bot.reply(message, f"GQL query failed: {result['error']}")
        return
    if not result["exists"]:
        await bot.reply(message, f"Twitch user '{username}' not found")
        return

    login = result["login"]
    display_name = result["display_name"]
    name = parts[3].lower() if len(parts) > 3 else login.lower()
    if not _validate_name(name):
        await bot.reply(
            message,
            "Invalid name (lowercase alphanumeric + hyphens, 1-20 chars)",
        )
        return

    irc_channel = message.target
    key = _state_key(irc_channel, name)
    if _load(bot, key) is not None:
        await bot.reply(message, f"Streamer '{name}' already exists in this channel")
        return
    ch_prefix = f"{irc_channel}:"
    count = sum(1 for k in bot.state.keys("twitch") if k.startswith(ch_prefix))
    if count >= _MAX_STREAMERS:
        await bot.reply(message, f"Streamer limit reached ({_MAX_STREAMERS})")
        return

    now = datetime.now(timezone.utc).isoformat()
    data = {
        "login": login,
        "display_name": display_name,
        "name": name,
        "channel": irc_channel,
        "interval": _DEFAULT_INTERVAL,
        "added_by": message.nick,
        "added_at": now,
        "was_live": result["live"],
        "stream_id": result["stream_id"],
        "last_title": result["title"],
        "last_game": result["game"],
        # Seed the viewer count so list/check can show it before the
        # first scheduled poll, as _poll_once does on later polls.
        "last_viewers": result["viewers"],
        "last_poll": now,
        "last_error": "",
    }
    _save(bot, key, data)
    _ps(bot)["streamers"][key] = data
    _start_poller(bot, key)

    reply = f"Following '{name}' ({display_name})"
    if result["live"]:
        reply += " [live]"
    await bot.reply(message, reply)


async def _twitch_unfollow(bot, message, parts):
    """Handle ``!twitch unfollow <name>`` (admin, channel only)."""
    if not bot._is_admin(message):
        await bot.reply(message, "Permission denied: unfollow requires admin")
        return
    if not message.is_channel:
        await bot.reply(message, "Use this command in a channel")
        return
    if len(parts) < 3:
        await bot.reply(message, "Usage: !twitch unfollow <name>")
        return
    name = parts[2].lower()
    key = _state_key(message.target, name)
    if _load(bot, key) is None:
        await bot.reply(message, f"No streamer '{name}' in this channel")
        return
    _stop_poller(bot, key)
    _delete(bot, key)
    await bot.reply(message, f"Unfollowed '{name}'")


@command("twitch", help="Twitch: !twitch follow|unfollow|list|check")
async def cmd_twitch(bot, message):
    """Per-channel Twitch livestream subscriptions.

    Usage:
        !twitch follow <username> [name]   Follow a streamer (admin)
        !twitch unfollow <name>            Unfollow a streamer (admin)
        !twitch list                       List followed streamers
        !twitch check <name>               Check status now
    """
    parts = message.text.split(None, 3)
    handlers = {
        "list": _twitch_list,
        "check": _twitch_check,
        "follow": _twitch_follow,
        "unfollow": _twitch_unfollow,
    }
    # A missing or unknown subcommand falls through to the usage line.
    handler = handlers.get(parts[1].lower()) if len(parts) >= 2 else None
    if handler is None:
        await bot.reply(message, "Usage: !twitch <follow|unfollow|list|check> [args]")
        return
    await handler(bot, message, parts)