Connect to multiple IRC servers concurrently from a single config file. Plugins are loaded once and shared; per-server state is isolated via separate SQLite databases and per-bot runtime state (bot._pstate). - Add build_server_configs() for [servers.*] config layout - Bot.__init__ gains name parameter, _pstate dict for plugin isolation - cli.py runs multiple bots via asyncio.gather - 9 stateful plugins migrated from module-level dicts to _ps(bot) pattern - Backward compatible: legacy [server] config works unchanged Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
702 lines
23 KiB
Python
702 lines
23 KiB
Python
"""Plugin: follow YouTube channels via Atom feeds with periodic polling."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import re
|
|
import urllib.request
|
|
import xml.etree.ElementTree as ET
|
|
from datetime import datetime, timezone
|
|
from urllib.parse import urlparse
|
|
|
|
from derp.http import urlopen as _urlopen
|
|
from derp.plugin import command, event
|
|
|
|
# -- Constants ---------------------------------------------------------------
|
|
|
|
_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,19}$")
|
|
_CHANNEL_ID_RE = re.compile(r"UC[A-Za-z0-9_-]{22}")
|
|
_CHANNEL_URL_RE = re.compile(r"/channel/(UC[A-Za-z0-9_-]{22})")
|
|
_PAGE_BROWSE_RE = re.compile(rb'"browseId"\s*:\s*"(UC[A-Za-z0-9_-]{22})"')
|
|
_PAGE_CHANNEL_RE = re.compile(rb'"channelId"\s*:\s*"(UC[A-Za-z0-9_-]{22})"')
|
|
_VIDEO_ID_RE = re.compile(r"(?:v=|youtu\.be/|/embed/|/shorts/)([A-Za-z0-9_-]{11})")
|
|
_YT_DOMAINS = {"youtube.com", "www.youtube.com", "m.youtube.com", "youtu.be"}
|
|
_YT_FEED_URL = "https://www.youtube.com/feeds/videos.xml?channel_id={}"
|
|
_YT_PLAYER_URL = "https://www.youtube.com/youtubei/v1/player"
|
|
_ANDROID_VERSION = "19.29.37"
|
|
_ANDROID_SDK = 33
|
|
_ANDROID_UA = f"com.google.android.youtube/{_ANDROID_VERSION} (Linux; U; Android 13)"
|
|
_ATOM_NS = "{http://www.w3.org/2005/Atom}"
|
|
_YT_NS = "{http://www.youtube.com/xml/schemas/2015}"
|
|
_MEDIA_NS = "{http://search.yahoo.com/mrss/}"
|
|
_MAX_SEEN = 200
|
|
_MAX_ANNOUNCE = 5
|
|
_DEFAULT_INTERVAL = 600
|
|
_MAX_INTERVAL = 3600
|
|
_FETCH_TIMEOUT = 15
|
|
_USER_AGENT = "derp/1.0"
|
|
_BROWSER_UA = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36"
|
|
_MAX_TITLE_LEN = 80
|
|
_MAX_CHANNELS = 20
|
|
|
|
# -- Per-bot runtime state ---------------------------------------------------
|
|
|
|
def _ps(bot):
|
|
"""Per-bot plugin runtime state."""
|
|
return bot._pstate.setdefault("yt", {
|
|
"pollers": {},
|
|
"channels": {},
|
|
"errors": {},
|
|
})
|
|
|
|
|
|
# -- Pure helpers ------------------------------------------------------------
|
|
|
|
def _state_key(channel: str, name: str) -> str:
|
|
"""Build composite state key."""
|
|
return f"{channel}:{name}"
|
|
|
|
|
|
def _validate_name(name: str) -> bool:
|
|
"""Check name against allowed pattern."""
|
|
return bool(_NAME_RE.match(name))
|
|
|
|
|
|
def _derive_name(title: str) -> str:
|
|
"""Derive a short feed name from channel title."""
|
|
name = title.lower().strip()
|
|
name = re.sub(r"[^a-z0-9-]", "", name.replace(" ", "-"))
|
|
# Collapse consecutive hyphens
|
|
name = re.sub(r"-{2,}", "-", name).strip("-")
|
|
if not name or not name[0].isalnum():
|
|
name = "yt"
|
|
return name[:20]
|
|
|
|
|
|
def _truncate(text: str, max_len: int = _MAX_TITLE_LEN) -> str:
|
|
"""Truncate text with ellipsis if needed."""
|
|
if len(text) <= max_len:
|
|
return text
|
|
return text[: max_len - 3].rstrip() + "..."
|
|
|
|
|
|
def _compact_num(n: int) -> str:
|
|
"""Format large numbers compactly: 1234 -> 1.2k, 1234567 -> 1.2M."""
|
|
if n >= 1_000_000:
|
|
return f"{n / 1_000_000:.1f}M".replace(".0M", "M")
|
|
if n >= 1_000:
|
|
return f"{n / 1_000:.1f}k".replace(".0k", "k")
|
|
return str(n)
|
|
|
|
|
|
def _is_youtube_url(url: str) -> bool:
|
|
"""Check if URL is a YouTube domain."""
|
|
try:
|
|
hostname = urlparse(url).hostname or ""
|
|
except Exception:
|
|
return False
|
|
return hostname.lower() in _YT_DOMAINS
|
|
|
|
|
|
def _extract_channel_id(url: str) -> str | None:
|
|
"""Try to extract channel ID directly from /channel/ URL."""
|
|
m = _CHANNEL_URL_RE.search(url)
|
|
return m.group(1) if m else None
|
|
|
|
|
|
def _extract_video_id(url: str) -> str | None:
|
|
"""Try to extract video ID from a YouTube URL."""
|
|
m = _VIDEO_ID_RE.search(url)
|
|
return m.group(1) if m else None
|
|
|
|
|
|
# -- Blocking helpers (for executor) -----------------------------------------
|
|
|
|
def _innertube_player(video_id: str) -> dict:
|
|
"""Fetch videoDetails via InnerTube ANDROID client. Blocking.
|
|
|
|
Uses ANDROID client -- WEB client returns LOGIN_REQUIRED since ~2026-02.
|
|
Returns videoDetails dict, or {} on failure.
|
|
"""
|
|
payload = json.dumps({
|
|
"context": {
|
|
"client": {
|
|
"clientName": "ANDROID",
|
|
"clientVersion": _ANDROID_VERSION,
|
|
"androidSdkVersion": _ANDROID_SDK,
|
|
},
|
|
},
|
|
"videoId": video_id,
|
|
}).encode()
|
|
req = urllib.request.Request(_YT_PLAYER_URL, data=payload, method="POST")
|
|
req.add_header("Content-Type", "application/json")
|
|
req.add_header("User-Agent", _ANDROID_UA)
|
|
try:
|
|
resp = _urlopen(req, timeout=_FETCH_TIMEOUT)
|
|
raw = resp.read()
|
|
resp.close()
|
|
data = json.loads(raw)
|
|
return data.get("videoDetails") or {}
|
|
except Exception:
|
|
return {}
|
|
|
|
|
|
def _resolve_via_innertube(video_id: str) -> str | None:
|
|
"""Resolve video ID to channel ID via InnerTube player API. Blocking."""
|
|
details = _innertube_player(video_id)
|
|
channel_id = details.get("channelId", "")
|
|
if channel_id and _CHANNEL_ID_RE.fullmatch(channel_id):
|
|
return channel_id
|
|
return None
|
|
|
|
|
|
def _fetch_duration(video_id: str) -> int:
|
|
"""Fetch video duration in seconds via InnerTube player API. Blocking.
|
|
|
|
Returns 0 on failure or for live content.
|
|
"""
|
|
details = _innertube_player(video_id)
|
|
if not details:
|
|
return 0
|
|
if details.get("isLiveContent") and details.get("isLive"):
|
|
return 0
|
|
try:
|
|
return int(details.get("lengthSeconds", 0))
|
|
except (ValueError, TypeError):
|
|
return 0
|
|
|
|
|
|
def _format_duration(seconds: int) -> str:
|
|
"""Format seconds as compact duration: 62 -> '1:02', 3661 -> '1:01:01'."""
|
|
if seconds <= 0:
|
|
return ""
|
|
h, rem = divmod(seconds, 3600)
|
|
m, s = divmod(rem, 60)
|
|
if h:
|
|
return f"{h}:{m:02d}:{s:02d}"
|
|
return f"{m}:{s:02d}"
|
|
|
|
|
|
def _resolve_channel(url: str) -> str | None:
|
|
"""Fetch YouTube page HTML and extract channel ID. Blocking.
|
|
|
|
Fallback for handle/non-video URLs. Tries browseId first, then channelId.
|
|
"""
|
|
req = urllib.request.Request(url, method="GET")
|
|
req.add_header("User-Agent", _BROWSER_UA)
|
|
try:
|
|
resp = _urlopen(req, timeout=_FETCH_TIMEOUT)
|
|
body = resp.read(1_048_576) # Read up to 1MB
|
|
resp.close()
|
|
except Exception:
|
|
return None
|
|
for pattern in (_PAGE_BROWSE_RE, _PAGE_CHANNEL_RE):
|
|
m = pattern.search(body)
|
|
if m:
|
|
return m.group(1).decode()
|
|
return None
|
|
|
|
|
|
def _fetch_feed(url: str, etag: str = "", last_modified: str = "") -> dict:
|
|
"""Blocking HTTP GET for feed content. Run via executor."""
|
|
result: dict = {
|
|
"status": 0,
|
|
"body": b"",
|
|
"etag": "",
|
|
"last_modified": "",
|
|
"error": "",
|
|
}
|
|
|
|
req = urllib.request.Request(url, method="GET")
|
|
req.add_header("User-Agent", _USER_AGENT)
|
|
if etag:
|
|
req.add_header("If-None-Match", etag)
|
|
if last_modified:
|
|
req.add_header("If-Modified-Since", last_modified)
|
|
|
|
try:
|
|
resp = _urlopen(req, timeout=_FETCH_TIMEOUT)
|
|
result["status"] = resp.status
|
|
result["body"] = resp.read()
|
|
result["etag"] = resp.headers.get("ETag", "")
|
|
result["last_modified"] = resp.headers.get("Last-Modified", "")
|
|
resp.close()
|
|
except urllib.error.HTTPError as exc:
|
|
result["status"] = exc.code
|
|
if exc.code == 304:
|
|
result["etag"] = etag
|
|
result["last_modified"] = last_modified
|
|
else:
|
|
result["error"] = f"HTTP {exc.code}"
|
|
except urllib.error.URLError as exc:
|
|
result["error"] = str(exc.reason)
|
|
except Exception as exc:
|
|
result["error"] = str(exc)
|
|
|
|
return result
|
|
|
|
|
|
# -- Feed parsing ------------------------------------------------------------
|
|
|
|
def _parse_feed(body: bytes) -> tuple[str, list[dict]]:
|
|
"""Parse YouTube Atom feed. Returns (channel_name, items).
|
|
|
|
Each item: {"id": "yt:video:...", "title": "...", "link": "..."}
|
|
"""
|
|
root = ET.fromstring(body)
|
|
author = root.find(f"{_ATOM_NS}author")
|
|
channel_name = ""
|
|
if author is not None:
|
|
channel_name = (author.findtext(f"{_ATOM_NS}name") or "").strip()
|
|
if not channel_name:
|
|
channel_name = (root.findtext(f"{_ATOM_NS}title") or "").strip()
|
|
|
|
items = []
|
|
for entry in root.findall(f"{_ATOM_NS}entry"):
|
|
entry_id = (entry.findtext(f"{_ATOM_NS}id") or "").strip()
|
|
video_id = (entry.findtext(f"{_YT_NS}videoId") or "").strip()
|
|
entry_title = (entry.findtext(f"{_ATOM_NS}title") or "").strip()
|
|
if video_id:
|
|
link = f"https://www.youtube.com/watch?v={video_id}"
|
|
else:
|
|
link_el = entry.find(f"{_ATOM_NS}link")
|
|
link = (link_el.get("href", "") if link_el is not None else "").strip()
|
|
if not entry_id:
|
|
entry_id = link
|
|
# Published date
|
|
published = (entry.findtext(f"{_ATOM_NS}published") or "").strip()
|
|
date = published[:10] if len(published) >= 10 else ""
|
|
# media:statistics views + media:starRating count (likes)
|
|
views = 0
|
|
likes = 0
|
|
group = entry.find(f"{_MEDIA_NS}group")
|
|
if group is not None:
|
|
community = group.find(f"{_MEDIA_NS}community")
|
|
if community is not None:
|
|
stats_el = community.find(f"{_MEDIA_NS}statistics")
|
|
if stats_el is not None:
|
|
try:
|
|
views = int(stats_el.get("views", "0"))
|
|
except (ValueError, TypeError):
|
|
pass
|
|
rating_el = community.find(f"{_MEDIA_NS}starRating")
|
|
if rating_el is not None:
|
|
try:
|
|
likes = int(rating_el.get("count", "0"))
|
|
except (ValueError, TypeError):
|
|
pass
|
|
if entry_id:
|
|
items.append({
|
|
"id": entry_id, "title": entry_title, "link": link,
|
|
"date": date, "views": views, "likes": likes,
|
|
})
|
|
return (channel_name, items)
|
|
|
|
|
|
# -- State helpers -----------------------------------------------------------
|
|
|
|
def _save(bot, key: str, data: dict) -> None:
|
|
"""Persist channel data to bot.state."""
|
|
bot.state.set("yt", key, json.dumps(data))
|
|
|
|
|
|
def _load(bot, key: str) -> dict | None:
|
|
"""Load channel data from bot.state."""
|
|
raw = bot.state.get("yt", key)
|
|
if raw is None:
|
|
return None
|
|
try:
|
|
return json.loads(raw)
|
|
except json.JSONDecodeError:
|
|
return None
|
|
|
|
|
|
def _delete(bot, key: str) -> None:
|
|
"""Remove channel data from bot.state."""
|
|
bot.state.delete("yt", key)
|
|
|
|
|
|
# -- Polling -----------------------------------------------------------------
|
|
|
|
async def _poll_once(bot, key: str, announce: bool = True) -> None:
|
|
"""Single poll cycle for one YouTube channel."""
|
|
ps = _ps(bot)
|
|
data = ps["channels"].get(key)
|
|
if data is None:
|
|
data = _load(bot, key)
|
|
if data is None:
|
|
return
|
|
ps["channels"][key] = data
|
|
|
|
url = data["feed_url"]
|
|
etag = data.get("etag", "")
|
|
last_modified = data.get("last_modified", "")
|
|
|
|
loop = asyncio.get_running_loop()
|
|
result = await loop.run_in_executor(
|
|
None, _fetch_feed, url, etag, last_modified,
|
|
)
|
|
|
|
now = datetime.now(timezone.utc).isoformat()
|
|
data["last_poll"] = now
|
|
|
|
if result["error"]:
|
|
data["last_error"] = result["error"]
|
|
ps["errors"][key] = ps["errors"].get(key, 0) + 1
|
|
ps["channels"][key] = data
|
|
_save(bot, key, data)
|
|
return
|
|
|
|
# HTTP 304 -- not modified
|
|
if result["status"] == 304:
|
|
data["last_error"] = ""
|
|
ps["errors"][key] = 0
|
|
ps["channels"][key] = data
|
|
_save(bot, key, data)
|
|
return
|
|
|
|
# Update conditional headers
|
|
data["etag"] = result["etag"]
|
|
data["last_modified"] = result["last_modified"]
|
|
data["last_error"] = ""
|
|
ps["errors"][key] = 0
|
|
|
|
try:
|
|
_, items = _parse_feed(result["body"])
|
|
except Exception as exc:
|
|
data["last_error"] = f"Parse error: {exc}"
|
|
ps["errors"][key] = ps["errors"].get(key, 0) + 1
|
|
ps["channels"][key] = data
|
|
_save(bot, key, data)
|
|
return
|
|
|
|
seen = set(data.get("seen", []))
|
|
seen_list = list(data.get("seen", []))
|
|
new_items = [item for item in items if item["id"] not in seen]
|
|
|
|
if announce and new_items:
|
|
channel = data["channel"]
|
|
name = data["name"]
|
|
shown = new_items[:_MAX_ANNOUNCE]
|
|
# Fetch durations for announced videos concurrently
|
|
durations: dict[str, int] = {}
|
|
video_ids = []
|
|
for item in shown:
|
|
vid = item["id"].removeprefix("yt:video:")
|
|
if vid != item["id"]:
|
|
video_ids.append((item["id"], vid))
|
|
if video_ids:
|
|
futs = {
|
|
item_id: loop.run_in_executor(None, _fetch_duration, vid)
|
|
for item_id, vid in video_ids
|
|
}
|
|
for item_id, fut in futs.items():
|
|
try:
|
|
durations[item_id] = await fut
|
|
except Exception:
|
|
pass
|
|
for item in shown:
|
|
title = _truncate(item["title"]) if item["title"] else "(no title)"
|
|
link = item["link"]
|
|
if link:
|
|
link = await bot.shorten_url(link)
|
|
# Build metadata suffix
|
|
parts = []
|
|
dur = durations.get(item["id"], 0)
|
|
dur_str = _format_duration(dur)
|
|
if dur_str:
|
|
parts.append(dur_str)
|
|
views = item.get("views", 0)
|
|
likes = item.get("likes", 0)
|
|
if views:
|
|
parts.append(f"{_compact_num(views)}v")
|
|
if likes:
|
|
parts.append(f"{_compact_num(likes)}lk")
|
|
date = item.get("date", "")
|
|
if date:
|
|
parts.append(date)
|
|
extra = " ".join(parts)
|
|
line = f"[{name}] {title}"
|
|
if extra:
|
|
line += f" | {extra}"
|
|
if link:
|
|
line += f" -- {link}"
|
|
await bot.send(channel, line)
|
|
remaining = len(new_items) - len(shown)
|
|
if remaining > 0:
|
|
await bot.send(channel, f"[{name}] ... and {remaining} more")
|
|
|
|
# Update seen list
|
|
for item in new_items:
|
|
seen_list.append(item["id"])
|
|
if len(seen_list) > _MAX_SEEN:
|
|
seen_list = seen_list[-_MAX_SEEN:]
|
|
data["seen"] = seen_list
|
|
|
|
ps["channels"][key] = data
|
|
_save(bot, key, data)
|
|
|
|
|
|
async def _poll_loop(bot, key: str) -> None:
|
|
"""Infinite poll loop for one YouTube channel."""
|
|
try:
|
|
while True:
|
|
ps = _ps(bot)
|
|
data = ps["channels"].get(key) or _load(bot, key)
|
|
if data is None:
|
|
return
|
|
interval = data.get("interval", _DEFAULT_INTERVAL)
|
|
# Back off on consecutive errors
|
|
errs = ps["errors"].get(key, 0)
|
|
if errs >= 5:
|
|
interval = min(interval * 2, _MAX_INTERVAL)
|
|
await asyncio.sleep(interval)
|
|
await _poll_once(bot, key, announce=True)
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
|
|
def _start_poller(bot, key: str) -> None:
|
|
"""Create and track a poller task."""
|
|
ps = _ps(bot)
|
|
existing = ps["pollers"].get(key)
|
|
if existing and not existing.done():
|
|
return
|
|
task = asyncio.create_task(_poll_loop(bot, key))
|
|
ps["pollers"][key] = task
|
|
|
|
|
|
def _stop_poller(bot, key: str) -> None:
|
|
"""Cancel and remove a poller task."""
|
|
ps = _ps(bot)
|
|
task = ps["pollers"].pop(key, None)
|
|
if task and not task.done():
|
|
task.cancel()
|
|
ps["channels"].pop(key, None)
|
|
ps["errors"].pop(key, 0)
|
|
|
|
|
|
# -- Restore on connect -----------------------------------------------------
|
|
|
|
def _restore(bot) -> None:
|
|
"""Rebuild pollers from persisted state."""
|
|
ps = _ps(bot)
|
|
for key in bot.state.keys("yt"):
|
|
existing = ps["pollers"].get(key)
|
|
if existing and not existing.done():
|
|
continue
|
|
data = _load(bot, key)
|
|
if data is None:
|
|
continue
|
|
ps["channels"][key] = data
|
|
_start_poller(bot, key)
|
|
|
|
|
|
@event("001")
|
|
async def on_connect(bot, message):
|
|
"""Restore YouTube channel pollers on connect."""
|
|
_restore(bot)
|
|
|
|
|
|
# -- Command handler ---------------------------------------------------------
|
|
|
|
@command("yt", help="YouTube: !yt follow|unfollow|list|check")
|
|
async def cmd_yt(bot, message):
|
|
"""Per-channel YouTube channel subscriptions.
|
|
|
|
Usage:
|
|
!yt follow <url> [name] Follow a YouTube channel (admin)
|
|
!yt unfollow <name> Unfollow a channel (admin)
|
|
!yt list List followed channels
|
|
!yt check <name> Force-poll a channel now
|
|
"""
|
|
parts = message.text.split(None, 3)
|
|
if len(parts) < 2:
|
|
await bot.reply(message, "Usage: !yt <follow|unfollow|list|check> [args]")
|
|
return
|
|
|
|
sub = parts[1].lower()
|
|
|
|
# -- list (any user, channel only) ----------------------------------------
|
|
if sub == "list":
|
|
if not message.is_channel:
|
|
await bot.reply(message, "Use this command in a channel")
|
|
return
|
|
channel = message.target
|
|
prefix = f"{channel}:"
|
|
channels = []
|
|
for key in bot.state.keys("yt"):
|
|
if key.startswith(prefix):
|
|
data = _load(bot, key)
|
|
if data:
|
|
name = data["name"]
|
|
err = data.get("last_error", "")
|
|
if err:
|
|
channels.append(f"{name} (error)")
|
|
else:
|
|
channels.append(name)
|
|
if not channels:
|
|
await bot.reply(message, "No YouTube channels in this channel")
|
|
return
|
|
await bot.reply(message, f"YouTube: {', '.join(channels)}")
|
|
return
|
|
|
|
# -- check (any user, channel only) ---------------------------------------
|
|
if sub == "check":
|
|
if not message.is_channel:
|
|
await bot.reply(message, "Use this command in a channel")
|
|
return
|
|
if len(parts) < 3:
|
|
await bot.reply(message, "Usage: !yt check <name>")
|
|
return
|
|
name = parts[2].lower()
|
|
channel = message.target
|
|
key = _state_key(channel, name)
|
|
data = _load(bot, key)
|
|
if data is None:
|
|
await bot.reply(message, f"No channel '{name}' in this channel")
|
|
return
|
|
ps = _ps(bot)
|
|
ps["channels"][key] = data
|
|
await _poll_once(bot, key, announce=True)
|
|
data = ps["channels"].get(key, data)
|
|
if data.get("last_error"):
|
|
await bot.reply(message, f"{name}: error -- {data['last_error']}")
|
|
else:
|
|
await bot.reply(message, f"{name}: checked")
|
|
return
|
|
|
|
# -- follow (admin, channel only) -----------------------------------------
|
|
if sub == "follow":
|
|
if not bot._is_admin(message):
|
|
await bot.reply(message, "Permission denied: follow requires admin")
|
|
return
|
|
if not message.is_channel:
|
|
await bot.reply(message, "Use this command in a channel")
|
|
return
|
|
if len(parts) < 3:
|
|
await bot.reply(message, "Usage: !yt follow <url> [name]")
|
|
return
|
|
|
|
url = parts[2]
|
|
if not url.startswith(("http://", "https://")):
|
|
url = f"https://{url}"
|
|
|
|
if not _is_youtube_url(url):
|
|
await bot.reply(message, "Not a YouTube URL")
|
|
return
|
|
|
|
# Resolve channel ID
|
|
loop = asyncio.get_running_loop()
|
|
channel_id = _extract_channel_id(url)
|
|
if not channel_id:
|
|
video_id = _extract_video_id(url)
|
|
if video_id:
|
|
channel_id = await loop.run_in_executor(
|
|
None, _resolve_via_innertube, video_id,
|
|
)
|
|
if not channel_id:
|
|
channel_id = await loop.run_in_executor(None, _resolve_channel, url)
|
|
if not channel_id:
|
|
await bot.reply(message, "Could not resolve YouTube channel ID")
|
|
return
|
|
|
|
feed_url = _YT_FEED_URL.format(channel_id)
|
|
|
|
# Test-fetch to validate and get channel name
|
|
result = await loop.run_in_executor(None, _fetch_feed, feed_url, "", "")
|
|
|
|
if result["error"]:
|
|
await bot.reply(message, f"Feed fetch failed: {result['error']}")
|
|
return
|
|
|
|
channel_title = ""
|
|
seen = []
|
|
try:
|
|
channel_title, items = _parse_feed(result["body"])
|
|
seen = [item["id"] for item in items]
|
|
if len(seen) > _MAX_SEEN:
|
|
seen = seen[-_MAX_SEEN:]
|
|
except Exception as exc:
|
|
await bot.reply(message, f"Feed parse failed: {exc}")
|
|
return
|
|
|
|
name = parts[3].lower() if len(parts) > 3 else _derive_name(channel_title or "yt")
|
|
if not _validate_name(name):
|
|
await bot.reply(
|
|
message,
|
|
"Invalid name (lowercase alphanumeric + hyphens, 1-20 chars)",
|
|
)
|
|
return
|
|
|
|
irc_channel = message.target
|
|
key = _state_key(irc_channel, name)
|
|
|
|
# Check for duplicate
|
|
if _load(bot, key) is not None:
|
|
await bot.reply(message, f"Channel '{name}' already exists in this channel")
|
|
return
|
|
|
|
# Check per-channel limit
|
|
ch_prefix = f"{irc_channel}:"
|
|
count = sum(1 for k in bot.state.keys("yt") if k.startswith(ch_prefix))
|
|
if count >= _MAX_CHANNELS:
|
|
await bot.reply(message, f"Channel limit reached ({_MAX_CHANNELS})")
|
|
return
|
|
|
|
now = datetime.now(timezone.utc).isoformat()
|
|
data = {
|
|
"channel_id": channel_id,
|
|
"feed_url": feed_url,
|
|
"name": name,
|
|
"channel": irc_channel,
|
|
"interval": _DEFAULT_INTERVAL,
|
|
"added_by": message.nick,
|
|
"added_at": now,
|
|
"seen": seen,
|
|
"last_poll": now,
|
|
"last_error": "",
|
|
"etag": result["etag"],
|
|
"last_modified": result["last_modified"],
|
|
"title": channel_title,
|
|
}
|
|
_save(bot, key, data)
|
|
_ps(bot)["channels"][key] = data
|
|
_start_poller(bot, key)
|
|
|
|
display = channel_title or name
|
|
item_count = len(seen)
|
|
await bot.reply(
|
|
message,
|
|
f"Following '{name}' ({display}, {item_count} existing videos)",
|
|
)
|
|
return
|
|
|
|
# -- unfollow (admin, channel only) ---------------------------------------
|
|
if sub == "unfollow":
|
|
if not bot._is_admin(message):
|
|
await bot.reply(message, "Permission denied: unfollow requires admin")
|
|
return
|
|
if not message.is_channel:
|
|
await bot.reply(message, "Use this command in a channel")
|
|
return
|
|
if len(parts) < 3:
|
|
await bot.reply(message, "Usage: !yt unfollow <name>")
|
|
return
|
|
|
|
name = parts[2].lower()
|
|
channel = message.target
|
|
key = _state_key(channel, name)
|
|
|
|
if _load(bot, key) is None:
|
|
await bot.reply(message, f"No channel '{name}' in this channel")
|
|
return
|
|
|
|
_stop_poller(bot, key)
|
|
_delete(bot, key)
|
|
await bot.reply(message, f"Unfollowed '{name}'")
|
|
return
|
|
|
|
await bot.reply(message, "Usage: !yt <follow|unfollow|list|check> [args]")
|