From 3955935da400564a2c51dfded9084740e5f66e74 Mon Sep 17 00:00:00 2001 From: user Date: Sun, 15 Feb 2026 14:34:20 +0100 Subject: [PATCH] feat: add YouTube channel follow plugin Follow YouTube channels via Atom feeds with !yt follow/unfollow/list/check. Resolves any YouTube URL to a channel ID, polls for new videos, and announces them in IRC channels. --- plugins/youtube.py | 542 ++++++++++++++++++++ tests/test_youtube.py | 1105 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 1647 insertions(+) create mode 100644 plugins/youtube.py create mode 100644 tests/test_youtube.py diff --git a/plugins/youtube.py b/plugins/youtube.py new file mode 100644 index 0000000..098fdce --- /dev/null +++ b/plugins/youtube.py @@ -0,0 +1,542 @@ +"""Plugin: follow YouTube channels via Atom feeds with periodic polling.""" + +from __future__ import annotations + +import asyncio +import json +import re +import ssl +import urllib.request +import xml.etree.ElementTree as ET +from datetime import datetime, timezone +from urllib.parse import urlparse + +from derp.plugin import command, event + +# -- Constants --------------------------------------------------------------- + +_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,19}$") +_CHANNEL_ID_RE = re.compile(r"UC[A-Za-z0-9_-]{22}") +_CHANNEL_URL_RE = re.compile(r"/channel/(UC[A-Za-z0-9_-]{22})") +_PAGE_BROWSE_RE = re.compile(rb'"browseId"\s*:\s*"(UC[A-Za-z0-9_-]{22})"') +_PAGE_CHANNEL_RE = re.compile(rb'"channelId"\s*:\s*"(UC[A-Za-z0-9_-]{22})"') +_YT_DOMAINS = {"youtube.com", "www.youtube.com", "m.youtube.com", "youtu.be"} +_YT_FEED_URL = "https://www.youtube.com/feeds/videos.xml?channel_id={}" +_ATOM_NS = "{http://www.w3.org/2005/Atom}" +_YT_NS = "{http://www.youtube.com/xml/schemas/2015}" +_MAX_SEEN = 200 +_MAX_ANNOUNCE = 5 +_DEFAULT_INTERVAL = 600 +_MAX_INTERVAL = 3600 +_FETCH_TIMEOUT = 15 +_USER_AGENT = "derp/1.0" +_BROWSER_UA = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36" +_MAX_TITLE_LEN = 80 +_MAX_CHANNELS = 20 + +# -- Module-level tracking --------------------------------------------------- + +_pollers: dict[str, asyncio.Task] = {} +_channels: dict[str, dict] = {} +_errors: dict[str, int] = {} + + +# -- Pure helpers ------------------------------------------------------------ + +def _state_key(channel: str, name: str) -> str: + """Build composite state key.""" + return f"{channel}:{name}" + + +def _validate_name(name: str) -> bool: + """Check name against allowed pattern.""" + return bool(_NAME_RE.match(name)) + + +def _derive_name(title: str) -> str: + """Derive a short feed name from channel title.""" + name = title.lower().strip() + name = re.sub(r"[^a-z0-9-]", "", name.replace(" ", "-")) + # Collapse consecutive hyphens + name = re.sub(r"-{2,}", "-", name).strip("-") + if not name or not name[0].isalnum(): + name = "yt" + return name[:20] + + +def _truncate(text: str, max_len: int = _MAX_TITLE_LEN) -> str: + """Truncate text with ellipsis if needed.""" + if len(text) <= max_len: + return text + return text[: max_len - 3].rstrip() + "..." + + +def _is_youtube_url(url: str) -> bool: + """Check if URL is a YouTube domain.""" + try: + hostname = urlparse(url).hostname or "" + except Exception: + return False + return hostname.lower() in _YT_DOMAINS + + +def _extract_channel_id(url: str) -> str | None: + """Try to extract channel ID directly from /channel/ URL.""" + m = _CHANNEL_URL_RE.search(url) + return m.group(1) if m else None + + +# -- Blocking helpers (for executor) ----------------------------------------- + +def _resolve_channel(url: str) -> str | None: + """Fetch YouTube page HTML and extract channel ID. Blocking. + + Tries browseId first (reliable on both channel and video pages), + then falls back to channelId (correct on video pages but may match + recommended channels on channel pages). + """ + req = urllib.request.Request(url, method="GET") + req.add_header("User-Agent", _BROWSER_UA) + ctx = ssl.create_default_context() + try: + resp = urllib.request.urlopen(req, timeout=_FETCH_TIMEOUT, context=ctx) + body = resp.read(1_048_576) # Read up to 1MB + resp.close() + except Exception: + return None + for pattern in (_PAGE_BROWSE_RE, _PAGE_CHANNEL_RE): + m = pattern.search(body) + if m: + return m.group(1).decode() + return None + + +def _fetch_feed(url: str, etag: str = "", last_modified: str = "") -> dict: + """Blocking HTTP GET for feed content. Run via executor.""" + result: dict = { + "status": 0, + "body": b"", + "etag": "", + "last_modified": "", + "error": "", + } + + req = urllib.request.Request(url, method="GET") + req.add_header("User-Agent", _USER_AGENT) + if etag: + req.add_header("If-None-Match", etag) + if last_modified: + req.add_header("If-Modified-Since", last_modified) + + ctx = ssl.create_default_context() + + try: + resp = urllib.request.urlopen(req, timeout=_FETCH_TIMEOUT, context=ctx) + result["status"] = resp.status + result["body"] = resp.read() + result["etag"] = resp.headers.get("ETag", "") + result["last_modified"] = resp.headers.get("Last-Modified", "") + resp.close() + except urllib.error.HTTPError as exc: + result["status"] = exc.code + if exc.code == 304: + result["etag"] = etag + result["last_modified"] = last_modified + else: + result["error"] = f"HTTP {exc.code}" + except urllib.error.URLError as exc: + result["error"] = str(exc.reason) + except Exception as exc: + result["error"] = str(exc) + + return result + + +# -- Feed parsing ------------------------------------------------------------ + +def _parse_feed(body: bytes) -> tuple[str, list[dict]]: + """Parse YouTube Atom feed. Returns (channel_name, items). + + Each item: {"id": "yt:video:...", "title": "...", "link": "..."} + """ + root = ET.fromstring(body) + author = root.find(f"{_ATOM_NS}author") + channel_name = "" + if author is not None: + channel_name = (author.findtext(f"{_ATOM_NS}name") or "").strip() + if not channel_name: + channel_name = (root.findtext(f"{_ATOM_NS}title") or "").strip() + + items = [] + for entry in root.findall(f"{_ATOM_NS}entry"): + entry_id = (entry.findtext(f"{_ATOM_NS}id") or "").strip() + video_id = (entry.findtext(f"{_YT_NS}videoId") or "").strip() + entry_title = (entry.findtext(f"{_ATOM_NS}title") or "").strip() + if video_id: + link = f"https://www.youtube.com/watch?v={video_id}" + else: + link_el = entry.find(f"{_ATOM_NS}link") + link = (link_el.get("href", "") if link_el is not None else "").strip() + if not entry_id: + entry_id = link + if entry_id: + items.append({"id": entry_id, "title": entry_title, "link": link}) + return (channel_name, items) + + +# -- State helpers ----------------------------------------------------------- + +def _save(bot, key: str, data: dict) -> None: + """Persist channel data to bot.state.""" + bot.state.set("yt", key, json.dumps(data)) + + +def _load(bot, key: str) -> dict | None: + """Load channel data from bot.state.""" + raw = bot.state.get("yt", key) + if raw is None: + return None + try: + return json.loads(raw) + except json.JSONDecodeError: + return None + + +def _delete(bot, key: str) -> None: + """Remove channel data from bot.state.""" + bot.state.delete("yt", key) + + +# -- Polling ----------------------------------------------------------------- + +async def _poll_once(bot, key: str, announce: bool = True) -> None: + """Single poll cycle for one YouTube channel.""" + data = _channels.get(key) + if data is None: + data = _load(bot, key) + if data is None: + return + _channels[key] = data + + url = data["feed_url"] + etag = data.get("etag", "") + last_modified = data.get("last_modified", "") + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor( + None, _fetch_feed, url, etag, last_modified, + ) + + now = datetime.now(timezone.utc).isoformat() + data["last_poll"] = now + + if result["error"]: + data["last_error"] = result["error"] + _errors[key] = _errors.get(key, 0) + 1 + _channels[key] = data + _save(bot, key, data) + return + + # HTTP 304 -- not modified + if result["status"] == 304: + data["last_error"] = "" + _errors[key] = 0 + _channels[key] = data + _save(bot, key, data) + return + + # Update conditional headers + data["etag"] = result["etag"] + data["last_modified"] = result["last_modified"] + data["last_error"] = "" + _errors[key] = 0 + + try: + _, items = _parse_feed(result["body"]) + except Exception as exc: + data["last_error"] = f"Parse error: {exc}" + _errors[key] = _errors.get(key, 0) + 1 + _channels[key] = data + _save(bot, key, data) + return + + seen = set(data.get("seen", [])) + seen_list = list(data.get("seen", [])) + new_items = [item for item in items if item["id"] not in seen] + + if announce and new_items: + channel = data["channel"] + name = data["name"] + shown = new_items[:_MAX_ANNOUNCE] + for item in shown: + title = _truncate(item["title"]) if item["title"] else "(no title)" + link = item["link"] + line = f"[{name}] {title}" + if link: + line += f" -- {link}" + await bot.send(channel, line) + remaining = len(new_items) - len(shown) + if remaining > 0: + await bot.send(channel, f"[{name}] ... and {remaining} more") + + # Update seen list + for item in new_items: + seen_list.append(item["id"]) + if len(seen_list) > _MAX_SEEN: + seen_list = seen_list[-_MAX_SEEN:] + data["seen"] = seen_list + + _channels[key] = data + _save(bot, key, data) + + +async def _poll_loop(bot, key: str) -> None: + """Infinite poll loop for one YouTube channel.""" + try: + while True: + data = _channels.get(key) or _load(bot, key) + if data is None: + return + interval = data.get("interval", _DEFAULT_INTERVAL) + # Back off on consecutive errors + errs = _errors.get(key, 0) + if errs >= 5: + interval = min(interval * 2, _MAX_INTERVAL) + await asyncio.sleep(interval) + await _poll_once(bot, key, announce=True) + except asyncio.CancelledError: + pass + + +def _start_poller(bot, key: str) -> None: + """Create and track a poller task.""" + existing = _pollers.get(key) + if existing and not existing.done(): + return + task = asyncio.create_task(_poll_loop(bot, key)) + _pollers[key] = task + + +def _stop_poller(key: str) -> None: + """Cancel and remove a poller task.""" + task = _pollers.pop(key, None) + if task and not task.done(): + task.cancel() + _channels.pop(key, None) + _errors.pop(key, 0) + + +# -- Restore on connect ----------------------------------------------------- + +def _restore(bot) -> None: + """Rebuild pollers from persisted state.""" + for key in bot.state.keys("yt"): + existing = _pollers.get(key) + if existing and not existing.done(): + continue + data = _load(bot, key) + if data is None: + continue + _channels[key] = data + _start_poller(bot, key) + + +@event("001") +async def on_connect(bot, message): + """Restore YouTube channel pollers on connect.""" + _restore(bot) + + +# -- Command handler --------------------------------------------------------- + +@command("yt", help="YouTube: !yt follow|unfollow|list|check") +async def cmd_yt(bot, message): + """Per-channel YouTube channel subscriptions. + + Usage: + !yt follow [name] Follow a YouTube channel (admin) + !yt unfollow Unfollow a channel (admin) + !yt list List followed channels + !yt check Force-poll a channel now + """ + parts = message.text.split(None, 3) + if len(parts) < 2: + await bot.reply(message, "Usage: !yt [args]") + return + + sub = parts[1].lower() + + # -- list (any user, channel only) ---------------------------------------- + if sub == "list": + if not message.is_channel: + await bot.reply(message, "Use this command in a channel") + return + channel = message.target + prefix = f"{channel}:" + channels = [] + for key in bot.state.keys("yt"): + if key.startswith(prefix): + data = _load(bot, key) + if data: + name = data["name"] + err = data.get("last_error", "") + if err: + channels.append(f"{name} (error)") + else: + channels.append(name) + if not channels: + await bot.reply(message, "No YouTube channels in this channel") + return + await bot.reply(message, f"YouTube: {', '.join(channels)}") + return + + # -- check (any user, channel only) --------------------------------------- + if sub == "check": + if not message.is_channel: + await bot.reply(message, "Use this command in a channel") + return + if len(parts) < 3: + await bot.reply(message, "Usage: !yt check ") + return + name = parts[2].lower() + channel = message.target + key = _state_key(channel, name) + data = _load(bot, key) + if data is None: + await bot.reply(message, f"No channel '{name}' in this channel") + return + _channels[key] = data + await _poll_once(bot, key, announce=True) + data = _channels.get(key, data) + if data.get("last_error"): + await bot.reply(message, f"{name}: error -- {data['last_error']}") + else: + await bot.reply(message, f"{name}: checked") + return + + # -- follow (admin, channel only) ----------------------------------------- + if sub == "follow": + if not bot._is_admin(message): + await bot.reply(message, "Permission denied: follow requires admin") + return + if not message.is_channel: + await bot.reply(message, "Use this command in a channel") + return + if len(parts) < 3: + await bot.reply(message, "Usage: !yt follow [name]") + return + + url = parts[2] + if not url.startswith(("http://", "https://")): + url = f"https://{url}" + + if not _is_youtube_url(url): + await bot.reply(message, "Not a YouTube URL") + return + + # Resolve channel ID + loop = asyncio.get_running_loop() + channel_id = _extract_channel_id(url) + if not channel_id: + channel_id = await loop.run_in_executor(None, _resolve_channel, url) + if not channel_id: + await bot.reply(message, "Could not resolve YouTube channel ID") + return + + feed_url = _YT_FEED_URL.format(channel_id) + + # Test-fetch to validate and get channel name + result = await loop.run_in_executor(None, _fetch_feed, feed_url, "", "") + + if result["error"]: + await bot.reply(message, f"Feed fetch failed: {result['error']}") + return + + channel_title = "" + seen = [] + try: + channel_title, items = _parse_feed(result["body"]) + seen = [item["id"] for item in items] + if len(seen) > _MAX_SEEN: + seen = seen[-_MAX_SEEN:] + except Exception as exc: + await bot.reply(message, f"Feed parse failed: {exc}") + return + + name = parts[3].lower() if len(parts) > 3 else _derive_name(channel_title or "yt") + if not _validate_name(name): + await bot.reply( + message, + "Invalid name (lowercase alphanumeric + hyphens, 1-20 chars)", + ) + return + + irc_channel = message.target + key = _state_key(irc_channel, name) + + # Check for duplicate + if _load(bot, key) is not None: + await bot.reply(message, f"Channel '{name}' already exists in this channel") + return + + # Check per-channel limit + ch_prefix = f"{irc_channel}:" + count = sum(1 for k in bot.state.keys("yt") if k.startswith(ch_prefix)) + if count >= _MAX_CHANNELS: + await bot.reply(message, f"Channel limit reached ({_MAX_CHANNELS})") + return + + now = datetime.now(timezone.utc).isoformat() + data = { + "channel_id": channel_id, + "feed_url": feed_url, + "name": name, + "channel": irc_channel, + "interval": _DEFAULT_INTERVAL, + "added_by": message.nick, + "added_at": now, + "seen": seen, + "last_poll": now, + "last_error": "", + "etag": result["etag"], + "last_modified": result["last_modified"], + "title": channel_title, + } + _save(bot, key, data) + _channels[key] = data + _start_poller(bot, key) + + display = channel_title or name + item_count = len(seen) + await bot.reply( + message, + f"Following '{name}' ({display}, {item_count} existing videos)", + ) + return + + # -- unfollow (admin, channel only) --------------------------------------- + if sub == "unfollow": + if not bot._is_admin(message): + await bot.reply(message, "Permission denied: unfollow requires admin") + return + if not message.is_channel: + await bot.reply(message, "Use this command in a channel") + return + if len(parts) < 3: + await bot.reply(message, "Usage: !yt unfollow ") + return + + name = parts[2].lower() + channel = message.target + key = _state_key(channel, name) + + if _load(bot, key) is None: + await bot.reply(message, f"No channel '{name}' in this channel") + return + + _stop_poller(key) + _delete(bot, key) + await bot.reply(message, f"Unfollowed '{name}'") + return + + await bot.reply(message, "Usage: !yt [args]") diff --git a/tests/test_youtube.py b/tests/test_youtube.py new file mode 100644 index 0000000..6287e66 --- /dev/null +++ b/tests/test_youtube.py @@ -0,0 +1,1105 @@ +"""Tests for the YouTube channel follow plugin.""" + +import asyncio +import importlib.util +import sys +from pathlib import Path +from unittest.mock import patch + +from derp.irc import Message + +# plugins/ is not a Python package -- load the module from file path +_spec = importlib.util.spec_from_file_location( + "plugins.youtube", Path(__file__).resolve().parent.parent / "plugins" / "youtube.py", +) +_mod = importlib.util.module_from_spec(_spec) +sys.modules[_spec.name] = _mod +_spec.loader.exec_module(_mod) + +from plugins.youtube import ( # noqa: E402 + _MAX_ANNOUNCE, + _channels, + _delete, + _derive_name, + _errors, + _extract_channel_id, + _is_youtube_url, + _load, + _parse_feed, + _poll_once, + _pollers, + _restore, + _save, + _start_poller, + _state_key, + _stop_poller, + _truncate, + _validate_name, + cmd_yt, + on_connect, +) + +# -- Fixtures ---------------------------------------------------------------- + +YT_ATOM_FEED = b"""\ + + + 3Blue1Brown - Videos + 3Blue1Brown + + yt:video:abc123 + abc123 + Linear Algebra + + + + yt:video:def456 + def456 + Calculus + + + + yt:video:ghi789 + ghi789 + Neural Networks + + + +""" + +YT_ATOM_NO_VIDEOID = b"""\ + + + Test Channel + Test Channel + + yt:video:xyz + Fallback Link + + + +""" + +YT_ATOM_EMPTY = b"""\ + + + Empty Channel + Empty Channel + +""" + +FAKE_YT_PAGE_BROWSE = b"""\ + + + +""" + +FAKE_YT_PAGE_CHANNELID = b"""\ + + + +""" + +FAKE_YT_PAGE_NO_ID = b"""\ +No channel here +""" + + +# -- Helpers ----------------------------------------------------------------- + +class _FakeState: + """In-memory stand-in for bot.state.""" + + def __init__(self): + self._store: dict[str, dict[str, str]] = {} + + def get(self, plugin: str, key: str, default: str | None = None) -> str | None: + return self._store.get(plugin, {}).get(key, default) + + def set(self, plugin: str, key: str, value: str) -> None: + self._store.setdefault(plugin, {})[key] = value + + def delete(self, plugin: str, key: str) -> bool: + try: + del self._store[plugin][key] + return True + except KeyError: + return False + + def keys(self, plugin: str) -> list[str]: + return sorted(self._store.get(plugin, {}).keys()) + + +class _FakeBot: + """Minimal bot stand-in that captures sent/replied messages.""" + + def __init__(self, *, admin: bool = False): + self.sent: list[tuple[str, str]] = [] + self.replied: list[str] = [] + self.state = _FakeState() + self._admin = admin + + async def send(self, target: str, text: str) -> None: + self.sent.append((target, text)) + + async def reply(self, message, text: str) -> None: + self.replied.append(text) + + def _is_admin(self, message) -> bool: + return self._admin + + +def _msg(text: str, nick: str = "alice", target: str = "#test") -> Message: + """Create a channel PRIVMSG.""" + return Message( + raw="", prefix=f"{nick}!~{nick}@host", nick=nick, + command="PRIVMSG", params=[target, text], tags={}, + ) + + +def _pm(text: str, nick: str = "alice") -> Message: + """Create a private PRIVMSG.""" + return Message( + raw="", prefix=f"{nick}!~{nick}@host", nick=nick, + command="PRIVMSG", params=["botname", text], tags={}, + ) + + +def _clear() -> None: + """Reset module-level state between tests.""" + for task in _pollers.values(): + if task and not task.done(): + task.cancel() + _pollers.clear() + _channels.clear() + _errors.clear() + + +def _fake_fetch_ok(url, etag="", last_modified=""): + """Fake fetch that returns YT_ATOM_FEED.""" + return { + "status": 200, + "body": YT_ATOM_FEED, + "etag": '"abc"', + "last_modified": "Sat, 15 Feb 2026 12:00:00 GMT", + "error": "", + } + + +def _fake_fetch_error(url, etag="", last_modified=""): + """Fake fetch that returns an error.""" + return { + "status": 0, + "body": b"", + "etag": "", + "last_modified": "", + "error": "Connection refused", + } + + +def _fake_fetch_304(url, etag="", last_modified=""): + """Fake fetch that returns 304 Not Modified.""" + return { + "status": 304, + "body": b"", + "etag": etag, + "last_modified": last_modified, + "error": "", + } + + +def _fake_resolve_ok(url): + """Fake page scrape returning a channel ID.""" + return "UCYO_jab_esuFRV4b17AJtAw" + + +def _fake_resolve_fail(url): + """Fake page scrape returning None.""" + return None + + +# --------------------------------------------------------------------------- +# TestIsYoutubeUrl +# --------------------------------------------------------------------------- + +class TestIsYoutubeUrl: + def test_standard(self): + assert _is_youtube_url("https://www.youtube.com/watch?v=abc") is True + + def test_short(self): + assert _is_youtube_url("https://youtu.be/abc") is True + + def test_mobile(self): + assert _is_youtube_url("https://m.youtube.com/watch?v=abc") is True + + def test_no_www(self): + assert _is_youtube_url("https://youtube.com/@handle") is True + + def test_not_youtube(self): + assert _is_youtube_url("https://example.com/video") is False + + def test_invalid(self): + assert _is_youtube_url("not a url") is False + + def test_empty(self): + assert _is_youtube_url("") is False + + +# --------------------------------------------------------------------------- +# TestExtractChannelId +# --------------------------------------------------------------------------- + +class TestExtractChannelId: + def test_direct_channel_url(self): + url = "https://www.youtube.com/channel/UCYO_jab_esuFRV4b17AJtAw" + assert _extract_channel_id(url) == "UCYO_jab_esuFRV4b17AJtAw" + + def test_channel_url_with_path(self): + url = "https://www.youtube.com/channel/UCYO_jab_esuFRV4b17AJtAw/videos" + assert _extract_channel_id(url) == "UCYO_jab_esuFRV4b17AJtAw" + + def test_handle_url_no_match(self): + url = "https://www.youtube.com/@3blue1brown" + assert _extract_channel_id(url) is None + + def test_video_url_no_match(self): + url = "https://www.youtube.com/watch?v=abc123" + assert _extract_channel_id(url) is None + + def test_short_url_no_match(self): + url = "https://youtu.be/abc123" + assert _extract_channel_id(url) is None + + +# --------------------------------------------------------------------------- +# TestDeriveName +# --------------------------------------------------------------------------- + +class TestDeriveName: + def test_simple_title(self): + assert _derive_name("3Blue1Brown") == "3blue1brown" + + def test_title_with_spaces(self): + assert _derive_name("Linus Tech Tips") == "linus-tech-tips" + + def test_title_with_special(self): + assert _derive_name("Tom Scott!") == "tom-scott" + + def test_empty_title(self): + assert _derive_name("") == "yt" + + def test_long_title_truncated(self): + result = _derive_name("A" * 50) + assert len(result) <= 20 + + def test_only_special_chars(self): + assert _derive_name("!!!") == "yt" + + def test_validates(self): + result = _derive_name("Some Channel Name") + assert _validate_name(result) + + +# --------------------------------------------------------------------------- +# TestTruncate +# --------------------------------------------------------------------------- + +class TestTruncate: + def test_short_text_unchanged(self): + assert _truncate("hello", 80) == "hello" + + def test_exact_length_unchanged(self): + text = "a" * 80 + assert _truncate(text, 80) == text + + def test_long_text_truncated(self): + text = "a" * 100 + result = _truncate(text, 80) + assert len(result) == 80 + assert result.endswith("...") + + def test_default_max_length(self): + text = "a" * 100 + result = _truncate(text) + assert len(result) == 80 + + def test_trailing_space_stripped(self): + text = "word " * 20 + result = _truncate(text, 20) + assert not result.endswith(" ...") + + +# --------------------------------------------------------------------------- +# TestParseFeed +# --------------------------------------------------------------------------- + +class TestParseFeed: + def test_parses_entries(self): + channel_name, items = _parse_feed(YT_ATOM_FEED) + assert channel_name == "3Blue1Brown" + assert len(items) == 3 + assert items[0]["id"] == "yt:video:abc123" + assert items[0]["title"] == "Linear Algebra" + assert items[0]["link"] == "https://www.youtube.com/watch?v=abc123" + + def test_builds_link_from_videoid(self): + _, items = _parse_feed(YT_ATOM_FEED) + assert items[1]["link"] == "https://www.youtube.com/watch?v=def456" + + def test_fallback_link_no_videoid(self): + _, items = _parse_feed(YT_ATOM_NO_VIDEOID) + assert len(items) == 1 + assert items[0]["link"] == "https://www.youtube.com/watch?v=xyz" + + def test_empty_feed(self): + channel_name, items = _parse_feed(YT_ATOM_EMPTY) + assert channel_name == "Empty Channel" + assert items == [] + + def test_author_name_preferred(self): + channel_name, _ = _parse_feed(YT_ATOM_FEED) + assert channel_name == "3Blue1Brown" + + +# --------------------------------------------------------------------------- +# TestResolveChannel +# --------------------------------------------------------------------------- + +class TestResolveChannel: + def test_browse_id_preferred(self): + from plugins.youtube import _PAGE_BROWSE_RE + m = _PAGE_BROWSE_RE.search(FAKE_YT_PAGE_BROWSE) + assert m is not None + assert m.group(1).decode() == "UCYO_jab_esuFRV4b17AJtAw" + + def test_channelid_fallback(self): + from plugins.youtube import _PAGE_CHANNEL_RE + m = _PAGE_CHANNEL_RE.search(FAKE_YT_PAGE_CHANNELID) + assert m is not None + assert m.group(1).decode() == "UCsXVk37bltHxD1rDPwtNM8Q" + + def test_no_match_in_page(self): + from plugins.youtube import _PAGE_BROWSE_RE, _PAGE_CHANNEL_RE + assert _PAGE_BROWSE_RE.search(FAKE_YT_PAGE_NO_ID) is None + assert _PAGE_CHANNEL_RE.search(FAKE_YT_PAGE_NO_ID) is None + + +# --------------------------------------------------------------------------- +# TestStateHelpers +# --------------------------------------------------------------------------- + +class TestStateHelpers: + def test_save_and_load(self): + bot = _FakeBot() + data = {"feed_url": "https://example.com/feed", "name": "test"} + _save(bot, "#ch:test", data) + loaded = _load(bot, "#ch:test") + assert loaded == data + + def test_load_missing(self): + bot = _FakeBot() + assert _load(bot, "nonexistent") is None + + def test_delete(self): + bot = _FakeBot() + _save(bot, "#ch:test", {"name": "test"}) + _delete(bot, "#ch:test") + assert _load(bot, "#ch:test") is None + + def test_state_key(self): + assert _state_key("#ops", "3b1b") == "#ops:3b1b" + + def test_load_invalid_json(self): + bot = _FakeBot() + bot.state.set("yt", "bad", "not json{{{") + assert _load(bot, "bad") is None + + +# --------------------------------------------------------------------------- +# TestCmdYtFollow +# --------------------------------------------------------------------------- + +class TestCmdYtFollow: + def test_follow_with_channel_url(self): + _clear() + bot = _FakeBot(admin=True) + + async def inner(): + with ( + patch.object(_mod, "_fetch_feed", _fake_fetch_ok), + ): + url = "https://www.youtube.com/channel/UCYO_jab_esuFRV4b17AJtAw" + await cmd_yt(bot, _msg(f"!yt follow {url} 3b1b")) + await asyncio.sleep(0) + assert len(bot.replied) == 1 + assert "Following '3b1b'" in bot.replied[0] + assert "3 existing videos" in bot.replied[0] + data = _load(bot, "#test:3b1b") + assert data is not None + assert data["channel_id"] == "UCYO_jab_esuFRV4b17AJtAw" + assert data["name"] == "3b1b" + assert data["channel"] == "#test" + assert len(data["seen"]) == 3 + assert "#test:3b1b" in _pollers + _stop_poller("#test:3b1b") + await asyncio.sleep(0) + + asyncio.run(inner()) + + def test_follow_with_handle_url(self): + _clear() + bot = _FakeBot(admin=True) + + async def inner(): + with ( + patch.object(_mod, "_resolve_channel", _fake_resolve_ok), + patch.object(_mod, "_fetch_feed", _fake_fetch_ok), + ): + await cmd_yt(bot, _msg("!yt follow https://www.youtube.com/@3blue1brown")) + await asyncio.sleep(0) + assert len(bot.replied) == 1 + assert "Following" in bot.replied[0] + _clear() + + asyncio.run(inner()) + + def test_follow_derives_name(self): + _clear() + bot = _FakeBot(admin=True) + + async def inner(): + with ( + patch.object(_mod, "_fetch_feed", _fake_fetch_ok), + ): + url = "https://www.youtube.com/channel/UCYO_jab_esuFRV4b17AJtAw" + await cmd_yt(bot, _msg(f"!yt follow {url}")) + await asyncio.sleep(0) + # Name derived from channel title "3Blue1Brown" + assert "Following '3blue1brown'" in bot.replied[0] + _clear() + + asyncio.run(inner()) + + def test_follow_with_video_url_no_scheme(self): + _clear() + bot = _FakeBot(admin=True) + + async def inner(): + with ( + patch.object(_mod, "_resolve_channel", _fake_resolve_ok), + patch.object(_mod, "_fetch_feed", _fake_fetch_ok), + ): + await cmd_yt(bot, _msg("!yt follow youtube.com/watch?v=abc123 test")) + await asyncio.sleep(0) + assert len(bot.replied) == 1 + assert "Following 'test'" in bot.replied[0] + data = _load(bot, "#test:test") + assert data is not None + assert data["channel_id"] == "UCYO_jab_esuFRV4b17AJtAw" + _clear() + + asyncio.run(inner()) + + def test_follow_requires_admin(self): + _clear() + bot = _FakeBot(admin=False) + asyncio.run(cmd_yt(bot, _msg("!yt follow https://www.youtube.com/@test"))) + assert "Permission denied" in bot.replied[0] + + def test_follow_requires_channel(self): + _clear() + bot = _FakeBot(admin=True) + asyncio.run(cmd_yt(bot, _pm("!yt follow https://www.youtube.com/@test"))) + assert "Use this command in a channel" in bot.replied[0] + + def test_follow_not_youtube_url(self): + _clear() + bot = _FakeBot(admin=True) + asyncio.run(cmd_yt(bot, _msg("!yt follow https://example.com/video"))) + assert "Not a YouTube URL" in bot.replied[0] + + def test_follow_resolve_fails(self): + _clear() + bot = _FakeBot(admin=True) + + async def inner(): + with patch.object(_mod, "_resolve_channel", _fake_resolve_fail): + await cmd_yt(bot, _msg("!yt follow https://www.youtube.com/@nonexistent")) + assert "Could not resolve" in bot.replied[0] + + asyncio.run(inner()) + + def test_follow_invalid_name(self): + _clear() + bot = _FakeBot(admin=True) + + async def inner(): + with patch.object(_mod, "_fetch_feed", _fake_fetch_ok): + url = "https://www.youtube.com/channel/UCYO_jab_esuFRV4b17AJtAw" + await cmd_yt(bot, _msg(f"!yt follow {url} BAD!")) + assert "Invalid name" in bot.replied[0] + + asyncio.run(inner()) + + def test_follow_duplicate(self): + _clear() + bot = _FakeBot(admin=True) + + async def inner(): + with patch.object(_mod, "_fetch_feed", _fake_fetch_ok): + url = "https://www.youtube.com/channel/UCYO_jab_esuFRV4b17AJtAw" + await cmd_yt(bot, _msg(f"!yt follow {url} dupe")) + await asyncio.sleep(0) + bot.replied.clear() + with patch.object(_mod, "_fetch_feed", _fake_fetch_ok): + url2 = "https://www.youtube.com/channel/UCYO_jab_esuFRV4b17AJtAw" + await cmd_yt(bot, _msg(f"!yt follow {url2} dupe")) + assert "already exists" in bot.replied[0] + _clear() + + asyncio.run(inner()) + + def test_follow_fetch_error(self): + _clear() + bot = _FakeBot(admin=True) + + async def inner(): + with patch.object(_mod, "_fetch_feed", _fake_fetch_error): + url = "https://www.youtube.com/channel/UCYO_jab_esuFRV4b17AJtAw" + await cmd_yt(bot, _msg(f"!yt follow {url}")) + assert "Feed fetch failed" in bot.replied[0] + + asyncio.run(inner()) + + def test_follow_no_url(self): + _clear() + bot = _FakeBot(admin=True) + asyncio.run(cmd_yt(bot, _msg("!yt follow"))) + assert "Usage:" in bot.replied[0] + + def test_follow_channel_limit(self): + _clear() + bot = _FakeBot(admin=True) + for i in range(20): + _save(bot, f"#test:ch{i}", {"name": f"ch{i}", "channel": "#test"}) + + async def inner(): + with patch.object(_mod, "_fetch_feed", _fake_fetch_ok): + url = "https://www.youtube.com/channel/UCYO_jab_esuFRV4b17AJtAw" + await cmd_yt(bot, _msg(f"!yt follow {url} overflow")) + assert "limit reached" in bot.replied[0] + + asyncio.run(inner()) + + def test_follow_prepends_https(self): + _clear() + bot = _FakeBot(admin=True) + + async def inner(): + with ( + patch.object(_mod, "_resolve_channel", _fake_resolve_ok), + patch.object(_mod, "_fetch_feed", _fake_fetch_ok), + ): + await cmd_yt(bot, _msg("!yt follow youtube.com/@test yttest")) + await asyncio.sleep(0) + data = _load(bot, "#test:yttest") + assert data is not None + assert data["channel_id"] == "UCYO_jab_esuFRV4b17AJtAw" + _clear() + + asyncio.run(inner()) + + +# --------------------------------------------------------------------------- +# TestCmdYtUnfollow +# --------------------------------------------------------------------------- + +class TestCmdYtUnfollow: + def test_unfollow_success(self): + _clear() + bot = _FakeBot(admin=True) + + async def inner(): + with patch.object(_mod, "_fetch_feed", _fake_fetch_ok): + url = "https://www.youtube.com/channel/UCYO_jab_esuFRV4b17AJtAw" + await cmd_yt(bot, _msg(f"!yt follow {url} delfeed")) + await asyncio.sleep(0) + bot.replied.clear() + await cmd_yt(bot, _msg("!yt unfollow delfeed")) + assert "Unfollowed 'delfeed'" in bot.replied[0] + assert _load(bot, "#test:delfeed") is None + assert "#test:delfeed" not in _pollers + await asyncio.sleep(0) + + asyncio.run(inner()) + + def test_unfollow_requires_admin(self): + _clear() + bot = _FakeBot(admin=False) + asyncio.run(cmd_yt(bot, _msg("!yt unfollow somefeed"))) + assert "Permission denied" in bot.replied[0] + + def test_unfollow_requires_channel(self): + _clear() + bot = _FakeBot(admin=True) + asyncio.run(cmd_yt(bot, _pm("!yt unfollow somefeed"))) + assert "Use this command in a channel" in bot.replied[0] + + def test_unfollow_nonexistent(self): + _clear() + bot = _FakeBot(admin=True) + asyncio.run(cmd_yt(bot, _msg("!yt unfollow nosuchfeed"))) + assert "No channel" in bot.replied[0] + + def test_unfollow_no_name(self): + _clear() + bot = _FakeBot(admin=True) + asyncio.run(cmd_yt(bot, _msg("!yt unfollow"))) + assert "Usage:" in bot.replied[0] + + +# --------------------------------------------------------------------------- +# TestCmdYtList +# --------------------------------------------------------------------------- + +class TestCmdYtList: + def test_list_empty(self): + _clear() + bot = _FakeBot() + asyncio.run(cmd_yt(bot, _msg("!yt list"))) + assert "No YouTube channels" in bot.replied[0] + + def test_list_populated(self): + _clear() + bot = _FakeBot() + _save(bot, "#test:3b1b", { + "name": "3b1b", "channel": "#test", + "feed_url": "https://yt.com/feed", "last_error": "", + }) + _save(bot, "#test:veritasium", { + "name": "veritasium", "channel": "#test", + "feed_url": "https://yt.com/feed2", "last_error": "", + }) + asyncio.run(cmd_yt(bot, _msg("!yt list"))) + assert "YouTube:" in bot.replied[0] + assert "3b1b" in bot.replied[0] + assert "veritasium" in bot.replied[0] + + def test_list_shows_error(self): + _clear() + bot = _FakeBot() + _save(bot, "#test:broken", { + "name": "broken", "channel": "#test", + "feed_url": "https://yt.com/feed", "last_error": "Connection refused", + }) + asyncio.run(cmd_yt(bot, _msg("!yt list"))) + assert "broken (error)" in bot.replied[0] + + def test_list_requires_channel(self): + _clear() + bot = _FakeBot() + asyncio.run(cmd_yt(bot, _pm("!yt list"))) + assert "Use this command in a channel" in bot.replied[0] + + def test_list_only_this_channel(self): + _clear() + bot = _FakeBot() + _save(bot, "#test:mine", { + "name": "mine", "channel": "#test", + "feed_url": "https://yt.com/feed", "last_error": "", + }) + _save(bot, "#other:theirs", { + "name": "theirs", "channel": "#other", + "feed_url": "https://yt.com/feed2", "last_error": "", + }) + asyncio.run(cmd_yt(bot, _msg("!yt list"))) + assert "mine" in bot.replied[0] + assert "theirs" not in bot.replied[0] + + +# --------------------------------------------------------------------------- +# TestCmdYtCheck +# --------------------------------------------------------------------------- + +class TestCmdYtCheck: + def test_check_success(self): + _clear() + bot = _FakeBot() + data = { + "feed_url": "https://yt.com/feed", "name": "chk", "channel": "#test", + "interval": 600, "seen": ["yt:video:abc123", "yt:video:def456", "yt:video:ghi789"], + "last_poll": "", "last_error": "", "etag": "", "last_modified": "", + "title": "Test", "channel_id": "UC123", + } + _save(bot, "#test:chk", data) + + async def inner(): + with patch.object(_mod, "_fetch_feed", _fake_fetch_ok): + await cmd_yt(bot, _msg("!yt check chk")) + assert "chk: checked" in bot.replied[0] + + asyncio.run(inner()) + + def test_check_nonexistent(self): + _clear() + bot = _FakeBot() + asyncio.run(cmd_yt(bot, _msg("!yt check nope"))) + assert "No channel" in bot.replied[0] + + def test_check_requires_channel(self): + _clear() + bot = _FakeBot() + asyncio.run(cmd_yt(bot, _pm("!yt check something"))) + assert "Use this command in a channel" in bot.replied[0] + + def test_check_shows_error(self): + _clear() + bot = _FakeBot() + data = { + "feed_url": "https://yt.com/feed", "name": "errfeed", "channel": "#test", + "interval": 600, "seen": [], "last_poll": "", "last_error": "", + "etag": "", "last_modified": "", "title": "", "channel_id": "UC123", + } + _save(bot, "#test:errfeed", data) + + async def inner(): + with patch.object(_mod, "_fetch_feed", _fake_fetch_error): + await cmd_yt(bot, _msg("!yt check errfeed")) + assert "error" in bot.replied[0].lower() + + asyncio.run(inner()) + + def test_check_announces_new_items(self): + _clear() + bot = _FakeBot() + data = { + "feed_url": "https://yt.com/feed", "name": "news", "channel": "#test", + "interval": 600, "seen": ["yt:video:abc123"], + "last_poll": "", "last_error": "", "etag": "", "last_modified": "", + "title": "Test", "channel_id": "UC123", + } + _save(bot, "#test:news", data) + + async def inner(): + with patch.object(_mod, "_fetch_feed", _fake_fetch_ok): + await cmd_yt(bot, _msg("!yt check news")) + announcements = [s for t, s in bot.sent if t == "#test"] + assert len(announcements) == 2 + assert "[news]" in announcements[0] + assert "Calculus" in announcements[0] + + asyncio.run(inner()) + + def test_check_no_name(self): + _clear() + bot = _FakeBot() + asyncio.run(cmd_yt(bot, _msg("!yt check"))) + assert "Usage:" in bot.replied[0] + + +# --------------------------------------------------------------------------- +# TestPollOnce +# --------------------------------------------------------------------------- + +class TestPollOnce: + def test_poll_304_clears_error(self): + _clear() + bot = _FakeBot() + data = { + "feed_url": "https://yt.com/feed", "name": "f304", "channel": "#test", + "interval": 600, "seen": [], "last_poll": "", "last_error": "old err", + "etag": '"xyz"', "last_modified": "", "title": "", "channel_id": "UC123", + } + key = "#test:f304" + _save(bot, key, data) + _channels[key] = data + + async def inner(): + with patch.object(_mod, "_fetch_feed", _fake_fetch_304): + await _poll_once(bot, key) + updated = _load(bot, key) + assert updated["last_error"] == "" + + asyncio.run(inner()) + + def test_poll_error_increments(self): + _clear() + bot = _FakeBot() + data = { + "feed_url": "https://yt.com/feed", "name": "ferr", "channel": "#test", + "interval": 600, "seen": [], "last_poll": "", "last_error": "", + "etag": "", "last_modified": "", "title": "", "channel_id": "UC123", + } + key = "#test:ferr" + _save(bot, key, data) + _channels[key] = data + + async def inner(): + with patch.object(_mod, "_fetch_feed", _fake_fetch_error): + await _poll_once(bot, key) + await _poll_once(bot, key) + assert _errors[key] == 2 + updated = _load(bot, key) + assert updated["last_error"] == "Connection refused" + + asyncio.run(inner()) + + def test_poll_max_announce(self): + """Only MAX_ANNOUNCE items are individually announced.""" + _clear() + bot = _FakeBot() + entries_xml = "" + for i in range(8): + entries_xml += f""" + + yt:video:vid{i} + vid{i} + Video {i} + """ + big_feed = f"""\ + + + Big Channel + Big Channel + {entries_xml} +""".encode() + + def fake_big(url, etag="", lm=""): + return {"status": 200, "body": big_feed, "etag": "", "last_modified": "", "error": ""} + + data = { + "feed_url": "https://yt.com/feed", "name": "big", "channel": "#test", + "interval": 600, "seen": [], "last_poll": "", "last_error": "", + "etag": "", "last_modified": "", "title": "", "channel_id": "UC123", + } + key = "#test:big" + _save(bot, key, data) + _channels[key] = data + + async def inner(): + with patch.object(_mod, "_fetch_feed", fake_big): + await _poll_once(bot, key, announce=True) + messages = [s for t, s in bot.sent if t == "#test"] + # 5 individual + 1 "... and N more" + assert len(messages) == _MAX_ANNOUNCE + 1 + assert "... and 3 more" in messages[-1] + + asyncio.run(inner()) + + def test_poll_no_announce_flag(self): + _clear() + bot = _FakeBot() + data = { + "feed_url": "https://yt.com/feed", "name": "quiet", "channel": "#test", + "interval": 600, "seen": [], "last_poll": "", "last_error": "", + "etag": "", "last_modified": "", "title": "", "channel_id": "UC123", + } + key = "#test:quiet" + _save(bot, key, data) + _channels[key] = data + + async def inner(): + with patch.object(_mod, "_fetch_feed", _fake_fetch_ok): + await _poll_once(bot, key, announce=False) + assert len(bot.sent) == 0 + updated = _load(bot, key) + assert len(updated["seen"]) == 3 + + asyncio.run(inner()) + + def test_poll_updates_etag(self): + _clear() + bot = _FakeBot() + data = { + "feed_url": "https://yt.com/feed", "name": "etag", "channel": "#test", + "interval": 600, + "seen": ["yt:video:abc123", "yt:video:def456", "yt:video:ghi789"], + "last_poll": "", "last_error": "", "etag": "", "last_modified": "", + "title": "", "channel_id": "UC123", + } + key = "#test:etag" + _save(bot, key, data) + _channels[key] = data + + async def inner(): + with patch.object(_mod, "_fetch_feed", _fake_fetch_ok): + await _poll_once(bot, key) + updated = _load(bot, key) + assert updated["etag"] == '"abc"' + + asyncio.run(inner()) + + +# --------------------------------------------------------------------------- +# TestRestore +# --------------------------------------------------------------------------- + +class TestRestore: + def test_restore_spawns_pollers(self): + _clear() + bot = _FakeBot() + data = { + "feed_url": "https://yt.com/feed", "name": "restored", "channel": "#test", + "interval": 600, "seen": [], "last_poll": "", "last_error": "", + "etag": "", "last_modified": "", "title": "", "channel_id": "UC123", + } + _save(bot, "#test:restored", data) + + async def inner(): + _restore(bot) + assert "#test:restored" in _pollers + task = _pollers["#test:restored"] + assert not task.done() + _stop_poller("#test:restored") + await asyncio.sleep(0) + + asyncio.run(inner()) + + def test_restore_skips_active(self): + _clear() + bot = _FakeBot() + data = { + "feed_url": "https://yt.com/feed", "name": "active", "channel": "#test", + "interval": 600, "seen": [], "last_poll": "", "last_error": "", + "etag": "", "last_modified": "", "title": "", "channel_id": "UC123", + } + _save(bot, "#test:active", data) + + async def inner(): + dummy = asyncio.create_task(asyncio.sleep(9999)) + _pollers["#test:active"] = dummy + _restore(bot) + assert _pollers["#test:active"] is dummy + dummy.cancel() + await asyncio.sleep(0) + + asyncio.run(inner()) + + def test_restore_replaces_done_task(self): + _clear() + bot = _FakeBot() + data = { + "feed_url": "https://yt.com/feed", "name": "done", "channel": "#test", + "interval": 600, "seen": [], "last_poll": "", "last_error": "", + "etag": "", "last_modified": "", "title": "", "channel_id": "UC123", + } + _save(bot, "#test:done", data) + + async def inner(): + done_task = asyncio.create_task(asyncio.sleep(0)) + await done_task + _pollers["#test:done"] = done_task + _restore(bot) + new_task = _pollers["#test:done"] + assert new_task is not done_task + assert not new_task.done() + _stop_poller("#test:done") + await asyncio.sleep(0) + + asyncio.run(inner()) + + def test_restore_skips_bad_json(self): + _clear() + bot = _FakeBot() + bot.state.set("yt", "#test:bad", "not json{{{") + + async def inner(): + _restore(bot) + assert "#test:bad" not in _pollers + + asyncio.run(inner()) + + def test_on_connect_calls_restore(self): + _clear() + bot = _FakeBot() + data = { + "feed_url": "https://yt.com/feed", "name": "conn", "channel": "#test", + "interval": 600, "seen": [], "last_poll": "", "last_error": "", + "etag": "", "last_modified": "", "title": "", "channel_id": "UC123", + } + _save(bot, "#test:conn", data) + + async def inner(): + msg = _msg("", target="botname") + await on_connect(bot, msg) + assert "#test:conn" in _pollers + _stop_poller("#test:conn") + await asyncio.sleep(0) + + asyncio.run(inner()) + + +# --------------------------------------------------------------------------- +# TestPollerManagement +# --------------------------------------------------------------------------- + +class TestPollerManagement: + def test_start_and_stop(self): + _clear() + bot = _FakeBot() + data = { + "feed_url": "https://yt.com/feed", "name": "mgmt", "channel": "#test", + "interval": 600, "seen": [], "last_poll": "", "last_error": "", + "etag": "", "last_modified": "", "title": "", "channel_id": "UC123", + } + key = "#test:mgmt" + _save(bot, key, data) + _channels[key] = data + + async def inner(): + _start_poller(bot, key) + assert key in _pollers + assert not _pollers[key].done() + _stop_poller(key) + await asyncio.sleep(0) + assert key not in _pollers + assert key not in _channels + + asyncio.run(inner()) + + def test_start_idempotent(self): + _clear() + bot = _FakeBot() + data = { + "feed_url": "https://yt.com/feed", "name": "idem", "channel": "#test", + "interval": 600, "seen": [], "last_poll": "", "last_error": "", + "etag": "", "last_modified": "", "title": "", "channel_id": "UC123", + } + key = "#test:idem" + _save(bot, key, data) + _channels[key] = data + + async def inner(): + _start_poller(bot, key) + first = _pollers[key] + _start_poller(bot, key) + assert _pollers[key] is first + _stop_poller(key) + await asyncio.sleep(0) + + asyncio.run(inner()) + + def test_stop_nonexistent(self): + _clear() + _stop_poller("#test:nonexistent") + + +# --------------------------------------------------------------------------- +# TestCmdYtUsage +# --------------------------------------------------------------------------- + +class TestCmdYtUsage: + def test_no_args(self): + _clear() + bot = _FakeBot() + asyncio.run(cmd_yt(bot, _msg("!yt"))) + assert "Usage:" in bot.replied[0] + + def test_unknown_subcommand(self): + _clear() + bot = _FakeBot() + asyncio.run(cmd_yt(bot, _msg("!yt foobar"))) + assert "Usage:" in bot.replied[0]