Files
derp/plugins/youtube.py
user 34d5dd6f8d fix: resolve YouTube channel ID via InnerTube for video URLs
Video URLs (watch, shorts, embed, youtu.be) now resolve the channel
ID through the InnerTube player API -- a small JSON POST instead of
fetching the full 1MB watch page. Much more resilient to transient
proxy failures. Page scraping remains as fallback for handle URLs.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 18:39:32 +01:00

583 lines
19 KiB
Python

"""Plugin: follow YouTube channels via Atom feeds with periodic polling."""
from __future__ import annotations
import asyncio
import json
import re
import urllib.request
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from urllib.parse import urlparse
from derp.http import urlopen as _urlopen
from derp.plugin import command, event
# -- Constants ---------------------------------------------------------------
# Feed name: one lowercase letter or digit, then up to 19 more lowercase
# letters, digits or hyphens (1-20 characters in total).
_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,19}$")
# Channel ID shape: "UC" followed by 22 characters from [A-Za-z0-9_-].
# Used with fullmatch() to validate the InnerTube response.
_CHANNEL_ID_RE = re.compile(r"UC[A-Za-z0-9_-]{22}")
# Captures the channel ID from a "/channel/UC..." URL path.
_CHANNEL_URL_RE = re.compile(r"/channel/(UC[A-Za-z0-9_-]{22})")
# Bytes patterns searched (in this order) in a fetched page body by
# _resolve_channel to find the channel ID.
_PAGE_BROWSE_RE = re.compile(rb'"browseId"\s*:\s*"(UC[A-Za-z0-9_-]{22})"')
_PAGE_CHANNEL_RE = re.compile(rb'"channelId"\s*:\s*"(UC[A-Za-z0-9_-]{22})"')
# Captures the 11-character video ID following "v=", "youtu.be/", "/embed/"
# or "/shorts/".
_VIDEO_ID_RE = re.compile(r"(?:v=|youtu\.be/|/embed/|/shorts/)([A-Za-z0-9_-]{11})")
# Hostnames accepted by _is_youtube_url (compared lowercase).
_YT_DOMAINS = {"youtube.com", "www.youtube.com", "m.youtube.com", "youtu.be"}
# Atom feed URL template; "{}" is filled with the channel ID.
_YT_FEED_URL = "https://www.youtube.com/feeds/videos.xml?channel_id={}"
# Endpoint POSTed to by _resolve_via_innertube.
_YT_PLAYER_URL = "https://www.youtube.com/youtubei/v1/player"
# clientVersion sent in the InnerTube request context (clientName "WEB").
_YT_CLIENT_VERSION = "2.20250101.00.00"
# ElementTree "{namespace}" prefixes for Atom and YouTube feed elements.
_ATOM_NS = "{http://www.w3.org/2005/Atom}"
_YT_NS = "{http://www.youtube.com/xml/schemas/2015}"
# Maximum number of entry IDs kept in a feed's "seen" list.
_MAX_SEEN = 200
# Maximum number of new entries announced individually per poll; the rest
# are summarised as "... and N more".
_MAX_ANNOUNCE = 5
# Seconds between polls (passed to asyncio.sleep) when the stored feed data
# has no "interval" of its own.
_DEFAULT_INTERVAL = 600
# Upper bound for the doubled interval used after 5+ consecutive errors.
_MAX_INTERVAL = 3600
# Timeout handed to _urlopen for every request (presumably seconds --
# confirm against derp.http.urlopen).
_FETCH_TIMEOUT = 15
# User-Agent sent with feed fetches.
_USER_AGENT = "derp/1.0"
# User-Agent sent when fetching a YouTube page to scrape the channel ID.
_BROWSER_UA = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36"
# Default maximum length used by _truncate for announced titles.
_MAX_TITLE_LEN = 80
# Maximum number of followed feeds per IRC channel.
_MAX_CHANNELS = 20
# -- Module-level tracking ---------------------------------------------------
# All three dicts are keyed by the composite state key built by _state_key.
# Poller task per followed feed.
_pollers: dict[str, asyncio.Task] = {}
# In-memory copy of each feed's persisted data dict.
_channels: dict[str, dict] = {}
# Consecutive error count per feed; drives the back-off in _poll_loop.
_errors: dict[str, int] = {}
# -- Pure helpers ------------------------------------------------------------
def _state_key(channel: str, name: str) -> str:
    """Return the ``<channel>:<name>`` key used to store and track a feed."""
    return "{}:{}".format(channel, name)
def _validate_name(name: str) -> bool:
    """Return True when *name* is an acceptable feed name.

    A valid name is 1-20 characters long: a lowercase letter or digit
    followed by lowercase letters, digits or hyphens (see ``_NAME_RE``).
    """
    # fullmatch() rather than match(): "$" also matches just before a
    # trailing newline, so match() would accept a name ending in a line break.
    return _NAME_RE.fullmatch(name) is not None
def _derive_name(title: str) -> str:
    """Derive a short feed name from a channel title.

    The title is lowercased, spaces become hyphens, every character outside
    ``[a-z0-9-]`` is dropped, runs of hyphens are collapsed, and the result
    is limited to 20 characters with no leading or trailing hyphen.
    Returns ``"yt"`` when nothing usable remains (e.g. an empty or fully
    non-ASCII title).
    """
    slug = title.lower().strip().replace(" ", "-")
    slug = re.sub(r"[^a-z0-9-]", "", slug)
    # Collapse consecutive hyphens, then trim them from both ends.
    slug = re.sub(r"-{2,}", "-", slug).strip("-")
    # Cutting at 20 characters can expose a hyphen at the end again
    # ("aaaa...-bbb" -> "aaaa...-"), so trim once more after truncating.
    slug = slug[:20].rstrip("-")
    return slug or "yt"
def _truncate(text: str, max_len: int = _MAX_TITLE_LEN) -> str:
    """Shorten *text* to at most *max_len* characters.

    Text that already fits is returned unchanged. Otherwise it is cut to
    ``max_len - 3`` characters, trailing whitespace is removed, and ``"..."``
    is appended. When *max_len* is below 3 there is no room for the
    ellipsis, so the text is simply cut to *max_len* characters.
    """
    if len(text) <= max_len:
        return text
    if max_len < 3:
        # A negative slice bound would count from the end of the string and
        # produce a result longer than max_len; hard-cut instead.
        return text[: max(max_len, 0)]
    return text[: max_len - 3].rstrip() + "..."
def _is_youtube_url(url: str) -> bool:
    """Report whether the hostname of *url* is one of ``_YT_DOMAINS``."""
    try:
        parsed = urlparse(url)
        host = parsed.hostname
    except Exception:
        # Anything that cannot be parsed is not treated as a YouTube URL.
        return False
    # A URL without a hostname compares as the empty string (never listed).
    return (host or "").lower() in _YT_DOMAINS
def _extract_channel_id(url: str) -> str | None:
    """Return the channel ID embedded in a ``/channel/UC...`` URL, if any."""
    found = _CHANNEL_URL_RE.search(url)
    if found is None:
        return None
    return found.group(1)
def _extract_video_id(url: str) -> str | None:
    """Return the video ID found in a YouTube video URL, or None.

    Recognises the forms matched by ``_VIDEO_ID_RE`` (``v=``, ``youtu.be/``,
    ``/embed/`` and ``/shorts/``).
    """
    found = _VIDEO_ID_RE.search(url)
    if found is None:
        return None
    return found.group(1)
# -- Blocking helpers (for executor) -----------------------------------------
def _resolve_via_innertube(video_id: str) -> str | None:
    """Resolve a video ID to its channel ID via the InnerTube player API.

    Blocking; intended to be run in an executor. Sends a small JSON POST and
    reads ``videoDetails.channelId`` from the JSON response -- much more
    resilient to transient proxy issues than fetching the full 1MB watch
    page.

    Returns the channel ID when the response contains a well-formed one,
    otherwise None. Request and decoding failures are treated as "not
    resolved" rather than raised, so callers can fall back to page scraping.
    """
    payload = json.dumps({
        "context": {
            "client": {
                "clientName": "WEB",
                "clientVersion": _YT_CLIENT_VERSION,
            },
        },
        "videoId": video_id,
    }).encode()
    req = urllib.request.Request(_YT_PLAYER_URL, data=payload, method="POST")
    req.add_header("Content-Type", "application/json")
    try:
        resp = _urlopen(req, timeout=_FETCH_TIMEOUT)
        try:
            raw = resp.read()
        finally:
            # Release the connection even when read() fails part-way.
            resp.close()
        data = json.loads(raw)
    except Exception:
        # Deliberate best-effort: any failure means "could not resolve".
        return None
    # Validate the shape explicitly instead of relying on a blanket except.
    if not isinstance(data, dict):
        return None
    details = data.get("videoDetails")
    if not isinstance(details, dict):
        return None
    channel_id = details.get("channelId")
    if isinstance(channel_id, str) and _CHANNEL_ID_RE.fullmatch(channel_id):
        return channel_id
    return None
def _resolve_channel(url: str) -> str | None:
    """Fetch a YouTube page and extract the channel ID from its body.

    Blocking; intended to be run in an executor. Fallback for handle and
    other non-video URLs. At most 1 MiB of the body is read; ``browseId`` is
    tried first, then ``channelId``.

    Returns the channel ID, or None when the fetch fails or no ID is found.
    """
    req = urllib.request.Request(url, method="GET")
    req.add_header("User-Agent", _BROWSER_UA)
    try:
        resp = _urlopen(req, timeout=_FETCH_TIMEOUT)
        try:
            body = resp.read(1_048_576)  # Read up to 1MB
        finally:
            # Release the connection even when read() fails part-way.
            resp.close()
    except Exception:
        # Deliberate best-effort: a failed fetch means "could not resolve".
        return None
    for pattern in (_PAGE_BROWSE_RE, _PAGE_CHANNEL_RE):
        m = pattern.search(body)
        if m:
            return m.group(1).decode()
    return None
def _fetch_feed(url: str, etag: str = "", last_modified: str = "") -> dict:
    """Blocking HTTP GET for feed content. Run via executor.

    Sends ``If-None-Match`` / ``If-Modified-Since`` when *etag* /
    *last_modified* are non-empty, so an unchanged feed can answer 304.

    Returns a dict with the keys ``status`` (HTTP status, 0 when none was
    received), ``body`` (bytes), ``etag``, ``last_modified`` and ``error``.
    ``error`` is empty on success and on 304; for a 304 the validators that
    were passed in are echoed back. Fetch failures are reported through
    ``error`` instead of being raised.
    """
    result: dict = {
        "status": 0,
        "body": b"",
        "etag": "",
        "last_modified": "",
        "error": "",
    }
    req = urllib.request.Request(url, method="GET")
    req.add_header("User-Agent", _USER_AGENT)
    if etag:
        req.add_header("If-None-Match", etag)
    if last_modified:
        req.add_header("If-Modified-Since", last_modified)
    try:
        resp = _urlopen(req, timeout=_FETCH_TIMEOUT)
        try:
            result["status"] = resp.status
            result["body"] = resp.read()
            result["etag"] = resp.headers.get("ETag", "")
            result["last_modified"] = resp.headers.get("Last-Modified", "")
        finally:
            # Release the connection even when reading fails part-way.
            resp.close()
    except urllib.error.HTTPError as exc:
        result["status"] = exc.code
        if exc.code == 304:
            # Not modified: keep the validators we already had.
            result["etag"] = etag
            result["last_modified"] = last_modified
        else:
            result["error"] = f"HTTP {exc.code}"
    except urllib.error.URLError as exc:
        # Callers treat an empty "error" as success, so never leave it empty
        # on failure: fall back to the exception class name.
        result["error"] = str(exc.reason) or type(exc).__name__
    except Exception as exc:
        result["error"] = str(exc) or type(exc).__name__
    return result
# -- Feed parsing ------------------------------------------------------------
def _parse_feed(body: bytes) -> tuple[str, list[dict]]:
    """Parse a YouTube Atom feed into ``(channel_name, items)``.

    The channel name comes from ``<author><name>``, falling back to the
    feed-level ``<title>``. Every item is a dict of the form
    ``{"id": "yt:video:...", "title": "...", "link": "..."}``; entries
    without any usable ID are skipped. Raises whatever ``ET.fromstring``
    raises for malformed XML.
    """
    def _text(node, tag: str) -> str:
        # Stripped text of the first matching child, "" when absent/empty.
        return (node.findtext(tag) or "").strip()

    root = ET.fromstring(body)

    channel_name = ""
    author_el = root.find(f"{_ATOM_NS}author")
    if author_el is not None:
        channel_name = _text(author_el, f"{_ATOM_NS}name")
    if not channel_name:
        channel_name = _text(root, f"{_ATOM_NS}title")

    items = []
    for entry in root.iterfind(f"{_ATOM_NS}entry"):
        video_id = _text(entry, f"{_YT_NS}videoId")
        if video_id:
            # Build the canonical watch URL from the video ID.
            link = f"https://www.youtube.com/watch?v={video_id}"
        else:
            link_el = entry.find(f"{_ATOM_NS}link")
            link = "" if link_el is None else link_el.get("href", "").strip()
        # Use the link as the identifier when the entry carries no <id>.
        entry_id = _text(entry, f"{_ATOM_NS}id") or link
        if not entry_id:
            continue
        items.append({
            "id": entry_id,
            "title": _text(entry, f"{_ATOM_NS}title"),
            "link": link,
        })
    return channel_name, items
# -- State helpers -----------------------------------------------------------
def _save(bot, key: str, data: dict) -> None:
    """Store *data*, JSON-encoded, in ``bot.state`` under ``("yt", key)``."""
    encoded = json.dumps(data)
    bot.state.set("yt", key, encoded)
def _load(bot, key: str) -> dict | None:
    """Load the feed data stored in ``bot.state`` under ``("yt", key)``.

    Returns the decoded dict, or None when nothing is stored or the stored
    value is not a JSON object (corrupt text, wrong type, or valid JSON that
    is not a dict).
    """
    raw = bot.state.get("yt", key)
    if raw is None:
        return None
    try:
        data = json.loads(raw)
    except (TypeError, ValueError):
        # ValueError covers json.JSONDecodeError; TypeError covers a stored
        # value that is not str/bytes at all.
        return None
    # Callers index the result like a dict; reject lists, numbers, etc.
    return data if isinstance(data, dict) else None
def _delete(bot, key: str) -> None:
    """Drop the entry stored in ``bot.state`` under ``("yt", key)``."""
    state = bot.state
    state.delete("yt", key)
# -- Polling -----------------------------------------------------------------
async def _poll_once(bot, key: str, announce: bool = True) -> None:
    """Run a single poll cycle for the feed stored under *key*.

    Fetches the feed in the default executor (conditional GET using the
    stored ETag / Last-Modified), announces entries whose IDs are not yet in
    the stored ``seen`` list, and persists the updated state. Does nothing
    when no state exists for *key*.

    Args:
        bot: Bot instance; ``bot.send`` and ``bot.state`` are used.
        key: Composite state key as built by ``_state_key``.
        announce: When False, new entries are recorded as seen without being
            sent to the channel.
    """
    data = _channels.get(key)
    if data is None:
        data = _load(bot, key)
        if data is None:
            return
        _channels[key] = data

    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        None,
        _fetch_feed,
        data["feed_url"],
        data.get("etag", ""),
        data.get("last_modified", ""),
    )
    data["last_poll"] = datetime.now(timezone.utc).isoformat()

    def _finish(error: str) -> None:
        # Record the outcome, update the consecutive-error counter that
        # drives the poll back-off, and persist.
        data["last_error"] = error
        _errors[key] = (_errors.get(key, 0) + 1) if error else 0
        _channels[key] = data
        _save(bot, key, data)

    if result["error"]:
        _finish(result["error"])
        return

    # HTTP 304 -- not modified
    if result["status"] == 304:
        _finish("")
        return

    try:
        _, items = _parse_feed(result["body"])
    except Exception as exc:
        # The error counter is not reset before parsing, so repeated parse
        # failures accumulate and trigger the back-off like fetch failures.
        _finish(f"Parse error: {exc}")
        return

    # Commit the conditional-request validators only after a successful
    # parse; otherwise the next poll could get a 304 for a body that was
    # never processed.
    data["etag"] = result["etag"]
    data["last_modified"] = result["last_modified"]

    seen_list = list(data.get("seen", []))
    seen = set(seen_list)
    new_items = []
    for item in items:
        # Adding to `seen` as we go also drops IDs repeated within one feed.
        if item["id"] not in seen:
            seen.add(item["id"])
            new_items.append(item)

    if announce and new_items:
        channel = data["channel"]
        name = data["name"]
        shown = new_items[:_MAX_ANNOUNCE]
        for item in shown:
            title = _truncate(item["title"]) if item["title"] else "(no title)"
            line = f"[{name}] {title}"
            if item["link"]:
                line += f" -- {item['link']}"
            await bot.send(channel, line)
        remaining = len(new_items) - len(shown)
        if remaining > 0:
            await bot.send(channel, f"[{name}] ... and {remaining} more")

    # Update seen list, keeping only the most recently added IDs.
    seen_list.extend(item["id"] for item in new_items)
    if len(seen_list) > _MAX_SEEN:
        seen_list = seen_list[-_MAX_SEEN:]
    data["seen"] = seen_list
    _finish("")
async def _poll_loop(bot, key: str) -> None:
    """Poll one feed forever: sleep for its interval, then poll, repeat.

    The loop ends when no data exists for *key* any more or when the task
    is cancelled.
    """
    try:
        while True:
            cached = _channels.get(key)
            data = cached if cached else _load(bot, key)
            if data is None:
                return
            delay = data.get("interval", _DEFAULT_INTERVAL)
            if _errors.get(key, 0) >= 5:
                # Five or more consecutive errors: double the wait, capped.
                delay = min(delay * 2, _MAX_INTERVAL)
            await asyncio.sleep(delay)
            await _poll_once(bot, key, announce=True)
    except asyncio.CancelledError:
        # Cancellation is how the poller is stopped; finish quietly.
        return
def _start_poller(bot, key: str) -> None:
    """Start a poll-loop task for *key* unless one is already running."""
    current = _pollers.get(key)
    if not current or current.done():
        # No task yet, or the previous one finished: spawn and track a new one.
        _pollers[key] = asyncio.create_task(_poll_loop(bot, key))
def _stop_poller(key: str) -> None:
    """Cancel the poller for *key* and forget its in-memory bookkeeping."""
    poller = _pollers.pop(key, None)
    if poller:
        if not poller.done():
            poller.cancel()
    # Drop the cached feed data and the error counter as well.
    _errors.pop(key, None)
    _channels.pop(key, None)
# -- Restore on connect -----------------------------------------------------
def _restore(bot) -> None:
    """Start pollers for every persisted feed that has none running."""
    for key in bot.state.keys("yt"):
        task = _pollers.get(key)
        if not task or task.done():
            data = _load(bot, key)
            if data is not None:
                # Refresh the in-memory copy before (re)starting the poller.
                _channels[key] = data
                _start_poller(bot, key)
@event("001")
async def on_connect(bot, message):
    """Handle the "001" event by rebuilding pollers from persisted state."""
    # The event payload itself is not needed; only the bot's stored state is.
    _restore(bot)
# -- Command handler ---------------------------------------------------------
_YT_USAGE = "Usage: !yt <follow|unfollow|list|check> [args]"


def _yt_follow_error(bot, irc_channel: str, name: str) -> str | None:
    """Return why *name* cannot be followed in *irc_channel*, or None if it can."""
    if not _validate_name(name):
        return "Invalid name (lowercase alphanumeric + hyphens, 1-20 chars)"
    # Check for duplicate
    if _load(bot, _state_key(irc_channel, name)) is not None:
        return f"Channel '{name}' already exists in this channel"
    # Check per-channel limit
    prefix = f"{irc_channel}:"
    count = sum(1 for k in bot.state.keys("yt") if k.startswith(prefix))
    if count >= _MAX_CHANNELS:
        return f"Channel limit reached ({_MAX_CHANNELS})"
    return None


async def _yt_resolve_channel_id(url: str) -> str | None:
    """Resolve *url* to a channel ID: URL path, then InnerTube, then page scrape."""
    channel_id = _extract_channel_id(url)
    if channel_id:
        return channel_id
    loop = asyncio.get_running_loop()
    video_id = _extract_video_id(url)
    if video_id:
        channel_id = await loop.run_in_executor(
            None, _resolve_via_innertube, video_id,
        )
        if channel_id:
            return channel_id
    return await loop.run_in_executor(None, _resolve_channel, url)


async def _yt_list(bot, message, parts: list[str]) -> None:
    """Handle ``!yt list``: reply with the feeds followed in this IRC channel."""
    prefix = f"{message.target}:"
    names = []
    for key in bot.state.keys("yt"):
        if not key.startswith(prefix):
            continue
        data = _load(bot, key)
        if not data:
            continue
        name = data["name"]
        # Flag feeds whose last poll failed.
        names.append(f"{name} (error)" if data.get("last_error", "") else name)
    if not names:
        await bot.reply(message, "No YouTube channels in this channel")
        return
    await bot.reply(message, f"YouTube: {', '.join(names)}")


async def _yt_check(bot, message, parts: list[str]) -> None:
    """Handle ``!yt check <name>``: poll one feed immediately and report."""
    if len(parts) < 3:
        await bot.reply(message, "Usage: !yt check <name>")
        return
    name = parts[2].lower()
    key = _state_key(message.target, name)
    data = _load(bot, key)
    if data is None:
        await bot.reply(message, f"No channel '{name}' in this channel")
        return
    _channels[key] = data
    await _poll_once(bot, key, announce=True)
    # Re-read: the poll stores its outcome in the cached entry.
    data = _channels.get(key, data)
    error = data.get("last_error")
    if error:
        await bot.reply(message, f"{name}: error -- {error}")
    else:
        await bot.reply(message, f"{name}: checked")


async def _yt_follow(bot, message, parts: list[str]) -> None:
    """Handle ``!yt follow <url> [name]``: validate, resolve and start polling."""
    if len(parts) < 3:
        await bot.reply(message, "Usage: !yt follow <url> [name]")
        return
    url = parts[2]
    if not url.startswith(("http://", "https://")):
        url = f"https://{url}"
    if not _is_youtube_url(url):
        await bot.reply(message, "Not a YouTube URL")
        return

    irc_channel = message.target
    # parts[3] is the unsplit remainder of the line and may carry trailing
    # whitespace, which would otherwise fail name validation.
    requested = parts[3].strip().lower() if len(parts) > 3 else ""
    if requested:
        # Reject a name that can never be accepted before doing any
        # network round-trips.
        problem = _yt_follow_error(bot, irc_channel, requested)
        if problem:
            await bot.reply(message, problem)
            return

    channel_id = await _yt_resolve_channel_id(url)
    if not channel_id:
        await bot.reply(message, "Could not resolve YouTube channel ID")
        return
    feed_url = _YT_FEED_URL.format(channel_id)

    # Test-fetch to validate and get channel name
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, _fetch_feed, feed_url, "", "")
    if result["error"]:
        await bot.reply(message, f"Feed fetch failed: {result['error']}")
        return
    try:
        channel_title, items = _parse_feed(result["body"])
    except Exception as exc:
        await bot.reply(message, f"Feed parse failed: {exc}")
        return
    # Everything currently in the feed counts as already seen, so only
    # later uploads are announced.
    seen = [item["id"] for item in items]
    if len(seen) > _MAX_SEEN:
        seen = seen[-_MAX_SEEN:]

    name = requested or _derive_name(channel_title or "yt")
    # Checked (again) after the awaits above: the name may have been derived
    # only now, and state may have changed while the requests were in flight.
    problem = _yt_follow_error(bot, irc_channel, name)
    if problem:
        await bot.reply(message, problem)
        return

    key = _state_key(irc_channel, name)
    now = datetime.now(timezone.utc).isoformat()
    data = {
        "channel_id": channel_id,
        "feed_url": feed_url,
        "name": name,
        "channel": irc_channel,
        "interval": _DEFAULT_INTERVAL,
        "added_by": message.nick,
        "added_at": now,
        "seen": seen,
        "last_poll": now,
        "last_error": "",
        "etag": result["etag"],
        "last_modified": result["last_modified"],
        "title": channel_title,
    }
    _save(bot, key, data)
    _channels[key] = data
    _start_poller(bot, key)
    display = channel_title or name
    item_count = len(seen)
    await bot.reply(
        message,
        f"Following '{name}' ({display}, {item_count} existing videos)",
    )


async def _yt_unfollow(bot, message, parts: list[str]) -> None:
    """Handle ``!yt unfollow <name>``: stop the poller and delete stored state."""
    if len(parts) < 3:
        await bot.reply(message, "Usage: !yt unfollow <name>")
        return
    name = parts[2].lower()
    key = _state_key(message.target, name)
    if _load(bot, key) is None:
        await bot.reply(message, f"No channel '{name}' in this channel")
        return
    _stop_poller(key)
    _delete(bot, key)
    await bot.reply(message, f"Unfollowed '{name}'")


# Subcommand -> (handler, requires admin). Every subcommand is channel-only.
_YT_SUBCOMMANDS = {
    "list": (_yt_list, False),
    "check": (_yt_check, False),
    "follow": (_yt_follow, True),
    "unfollow": (_yt_unfollow, True),
}


@command("yt", help="YouTube: !yt follow|unfollow|list|check")
async def cmd_yt(bot, message):
    """Per-channel YouTube channel subscriptions.

    Usage:
        !yt follow <url> [name]   Follow a YouTube channel (admin)
        !yt unfollow <name>       Unfollow a channel (admin)
        !yt list                  List followed channels
        !yt check <name>          Force-poll a channel now
    """
    parts = message.text.split(None, 3)
    if len(parts) < 2:
        await bot.reply(message, _YT_USAGE)
        return
    sub = parts[1].lower()
    entry = _YT_SUBCOMMANDS.get(sub)
    if entry is None:
        await bot.reply(message, _YT_USAGE)
        return
    handler, admin_only = entry
    # The permission check comes before the channel check for admin commands.
    if admin_only and not bot._is_admin(message):
        await bot.reply(message, f"Permission denied: {sub} requires admin")
        return
    if not message.is_channel:
        await bot.reply(message, "Use this command in a channel")
        return
    await handler(bot, message, parts)