diff --git a/TASKS.md b/TASKS.md
index 0e1f76f..9478eb0 100644
--- a/TASKS.md
+++ b/TASKS.md
@@ -1,6 +1,30 @@
 # derp - Tasks
 
-## Current Sprint -- v1.2.2 Connection Pooling + Batch OG (2026-02-17)
+## Current Sprint -- v1.2.4 URL Title Preview (2026-02-17)
+
+| Pri | Status | Task |
+|-----|--------|------|
+| P0 | [x] | URL title preview plugin (`plugins/urltitle.py`) |
+| P0 | [x] | HEAD-then-GET fetch via SOCKS5 connection pool |
+| P1 | [x] | `_TitleParser`: og:title/description + `<title>` fallback |
+| P1 | [x] | URL extraction with `!`-suppression and balanced parens |
+| P1 | [x] | Dedup/cooldown (5 min, 500 entry cache) |
+| P1 | [x] | Skip non-HTML, binary extensions, FlaskPaste host |
+| P2 | [x] | Tests for urltitle (11 test classes, ~40 cases) |
+| P2 | [x] | Documentation update (USAGE.md) |
+
+## Previous Sprint -- v1.2.3 Paste Overflow (2026-02-17)
+
+| Pri | Status | Task |
+|-----|--------|------|
+| P0 | [x] | `Bot.long_reply()` method with FlaskPaste overflow |
+| P0 | [x] | Configurable `paste_threshold` (default: 4) |
+| P1 | [x] | Refactor alert history to use `long_reply()` |
+| P1 | [x] | Refactor exploitdb search/cve to use `long_reply()` |
+| P1 | [x] | Refactor subdomain, crtsh, abuseipdb, dork to use `long_reply()` |
+| P2 | [x] | Tests for paste overflow (10 cases) |
+
+## Previous Sprint -- v1.2.2 Connection Pooling + Batch OG (2026-02-17)
 
 | Pri | Status | Task |
 |-----|--------|------|
@@ -28,6 +52,7 @@
 
 | Date | Task |
 |------|------|
+| 2026-02-17 | v1.2.3 (paste overflow with FlaskPaste integration) |
 | 2026-02-17 | v1.2.1 (HTTP opener cache, alert perf, concurrent multi-instance, tracemalloc) |
 | 2026-02-16 | v1.2.0 (subscriptions, alerts, proxy, reminders) |
 | 2026-02-15 | Calendar-based reminders (at/yearly) with persistence |
diff --git a/docs/USAGE.md b/docs/USAGE.md
index 57a2c46..00f4a86 100644
--- a/docs/USAGE.md
+++ b/docs/USAGE.md
@@ -50,6 +50,7 @@
 channels = ["#test"] # Channels to join on connect
 plugins_dir = "plugins" # 
Plugin directory path rate_limit = 2.0 # Max messages per second (default: 2.0) rate_burst = 5 # Burst capacity (default: 5) +paste_threshold = 4 # Max lines before overflow to FlaskPaste (default: 4) admins = [] # Hostmask patterns (fnmatch), IRCOPs auto-detected timezone = "UTC" # Timezone for calendar reminders (IANA tz name) @@ -880,3 +881,46 @@ url = "https://paste.mymx.me" # or set FLASKPASTE_URL env var Auth: place client cert/key at `secrets/flaskpaste/derp.crt` and `derp.key` for mTLS (bypasses PoW). Without them, PoW challenges are solved per request. + +### URL Title Preview (urltitle) + +Automatic URL title preview for channel messages. When a user posts a URL, +the bot fetches the page title and description and displays a one-line +preview. No commands -- event-driven only. + +``` +<alice> check out https://example.com/article +<derp> ↳ Article Title -- Description of the article... +``` + +Behavior: + +- Automatically previews HTTP(S) URLs posted in channel messages +- Skips private messages, bot's own messages, and command messages (`!prefix`) +- URLs prefixed with `!` are suppressed: `!https://example.com` produces no preview +- HEAD-then-GET fetch strategy (checks Content-Type before downloading body) +- Skips non-HTML content types (images, PDFs, JSON, etc.) +- Skips binary file extensions (`.png`, `.jpg`, `.pdf`, `.zip`, etc.) +- Skips FlaskPaste URLs and configured ignore hosts +- Dedup: same URL only previewed once per cooldown window (5 min default) +- Max 3 URLs previewed per message (configurable) +- Title from `og:title` takes priority over `<title>` tag +- Description from `og:description` takes priority over `<meta name="description">` +- Title truncated at 200 chars, description at 150 chars + +Output format: + +``` +↳ Page Title -- Description truncated to 150 chars... 
+↳ Page Title +``` + +Configuration (optional): + +```toml +[urltitle] +cooldown = 300 # seconds before same URL previewed again +timeout = 10 # HTTP fetch timeout +max_urls = 3 # max URLs to preview per message +ignore_hosts = [] # additional hostnames to skip +``` diff --git a/plugins/urltitle.py b/plugins/urltitle.py new file mode 100644 index 0000000..fafda87 --- /dev/null +++ b/plugins/urltitle.py @@ -0,0 +1,278 @@ +"""Plugin: automatic URL title preview for channel messages.""" + +from __future__ import annotations + +import logging +import re +import time +import urllib.parse +import urllib.request +from html.parser import HTMLParser + +from derp.http import urlopen as _urlopen +from derp.plugin import event + +_log = logging.getLogger(__name__) + +# -- Constants --------------------------------------------------------------- + +_URL_RE = re.compile(r"https?://[^\s<>\"\x00-\x1f]{2,}", re.IGNORECASE) +_USER_AGENT = "Mozilla/5.0 (compatible; derp-bot)" +_FETCH_TIMEOUT = 10 +_MAX_BYTES = 64 * 1024 +_MAX_TITLE_LEN = 200 +_MAX_DESC_LEN = 150 +_MAX_URLS = 3 +_COOLDOWN = 300 # seconds +_CACHE_MAX = 500 + +_SKIP_EXTS = frozenset({ + ".png", ".jpg", ".jpeg", ".gif", ".webp", ".svg", ".ico", ".bmp", + ".mp4", ".webm", ".mkv", ".avi", ".mov", ".flv", + ".mp3", ".flac", ".ogg", ".wav", ".aac", + ".pdf", ".zip", ".gz", ".tar", ".bz2", ".xz", ".7z", ".rar", + ".exe", ".msi", ".deb", ".rpm", ".dmg", ".iso", + ".apk", ".wasm", ".bin", ".img", +}) + +# Trailing punctuation to strip, but preserve balanced parens +_TRAIL_CHARS = set(".,;:!?)>]") + +# -- Module-level state ------------------------------------------------------ + +_seen: dict[str, float] = {} + +# -- HTML parser ------------------------------------------------------------- + + +class _TitleParser(HTMLParser): + """Extract page title and description from HTML head.""" + + def __init__(self): + super().__init__() + self.og_title = "" + self.og_description = "" + self.title = "" + self.meta_description = "" + 
self._in_title = False + self._title_parts: list[str] = [] + + def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None: + if tag == "meta": + attr_map = {k.lower(): (v or "") for k, v in attrs} + prop = attr_map.get("property", "").lower() + name = attr_map.get("name", "").lower() + content = attr_map.get("content", "") + if prop == "og:title": + self.og_title = content + elif prop == "og:description": + self.og_description = content + elif name == "description" and not self.meta_description: + self.meta_description = content + elif tag == "title": + self._in_title = True + self._title_parts = [] + + def handle_data(self, data: str) -> None: + if self._in_title: + self._title_parts.append(data) + + def handle_endtag(self, tag: str) -> None: + if tag == "title" and self._in_title: + self._in_title = False + self.title = " ".join("".join(self._title_parts).split()) + + @property + def best_title(self) -> str: + return self.og_title or self.title + + @property + def best_description(self) -> str: + return self.og_description or self.meta_description + + +# -- URL helpers ------------------------------------------------------------- + + +def _clean_url(raw: str) -> str: + """Strip trailing punctuation while preserving balanced parentheses.""" + url = raw + while url and url[-1] in _TRAIL_CHARS: + if url[-1] == ")" and url.count("(") > url.count(")") - 1: + break + url = url[:-1] + return url + + +def _extract_urls(text: str, max_urls: int = _MAX_URLS) -> list[str]: + """Extract up to max_urls HTTP(S) URLs from text. + + Skips URLs where the character immediately before 'http' is '!' + (suppression marker). Deduplicates while preserving order. 
+ """ + urls: list[str] = [] + seen: set[str] = set() + for m in _URL_RE.finditer(text): + start = m.start() + if start > 0 and text[start - 1] == "!": + continue + url = _clean_url(m.group()) + if url not in seen: + seen.add(url) + urls.append(url) + if len(urls) >= max_urls: + break + return urls + + +def _is_ignored_url(url: str, ignore_hosts: set[str]) -> bool: + """Check if a URL should be skipped (extension or host).""" + parsed = urllib.parse.urlparse(url) + path_lower = parsed.path.lower() + + # Check file extension + for ext in _SKIP_EXTS: + if path_lower.endswith(ext): + return True + + # Check ignored hosts + host = parsed.hostname or "" + if host in ignore_hosts: + return True + + return False + + +def _truncate(text: str, max_len: int) -> str: + """Truncate with ellipsis if needed.""" + if len(text) <= max_len: + return text + return text[: max_len - 3].rstrip() + "..." + + +# -- Fetch logic ------------------------------------------------------------- + + +def _fetch_title(url: str) -> tuple[str, str]: + """Fetch page title and description for a URL. + + Uses HEAD-then-GET: HEAD checks Content-Type cheaply, GET fetches + the body. Both go through the SOCKS5 connection pool. + + Returns (title, description). Empty strings on failure. + """ + # 1. HEAD to check Content-Type + try: + req = urllib.request.Request(url, method="HEAD") + req.add_header("User-Agent", _USER_AGENT) + resp = _urlopen(req, timeout=_FETCH_TIMEOUT, retries=1) + ct = (resp.headers.get("Content-Type") or "").lower() + resp.close() + if ct and "html" not in ct and "xhtml" not in ct: + return "", "" + except Exception: + pass # HEAD unsupported -- fall through to GET + + # 2. 
GET body (reuses pooled connection to same host) + try: + req = urllib.request.Request(url, method="GET") + req.add_header("User-Agent", _USER_AGENT) + resp = _urlopen(req, timeout=_FETCH_TIMEOUT, retries=1) + ct = (resp.headers.get("Content-Type") or "").lower() + if ct and "html" not in ct and "xhtml" not in ct: + resp.close() + return "", "" + raw = resp.read(_MAX_BYTES) + resp.close() + except Exception as exc: + _log.debug("GET failed for %s: %s", url, exc) + return "", "" + + # 3. Parse + html = raw.decode("utf-8", errors="replace") + parser = _TitleParser() + try: + parser.feed(html) + except Exception: + pass + + return parser.best_title, parser.best_description + + +# -- Cooldown ---------------------------------------------------------------- + + +def _check_cooldown(url: str, cooldown: int) -> bool: + """Return True if the URL is within the cooldown window.""" + now = time.monotonic() + last = _seen.get(url) + if last is not None and (now - last) < cooldown: + return True + + # Prune if cache is too large + if len(_seen) >= _CACHE_MAX: + cutoff = now - cooldown + stale = [k for k, v in _seen.items() if v < cutoff] + for k in stale: + del _seen[k] + + _seen[url] = now + return False + + +# -- Event handler ----------------------------------------------------------- + + +@event("PRIVMSG") +async def on_privmsg(bot, message): + """Preview URLs posted in channel messages.""" + import asyncio + + # Skip non-channel, bot's own messages, and command messages + if not message.is_channel: + return + if message.nick == bot.nick: + return + text = message.text or "" + if text.startswith(bot.prefix): + return + + # Read config + cfg = bot.config.get("urltitle", {}) + cooldown = cfg.get("cooldown", _COOLDOWN) + max_urls = cfg.get("max_urls", _MAX_URLS) + extra_ignore = set(cfg.get("ignore_hosts", [])) + + # Build ignore set: FlaskPaste host + config-specified hosts + ignore_hosts = set(extra_ignore) + fp_url = bot.config.get("flaskpaste", {}).get("url", "") + if 
fp_url: + fp_host = urllib.parse.urlparse(fp_url).hostname + if fp_host: + ignore_hosts.add(fp_host) + + urls = _extract_urls(text, max_urls) + if not urls: + return + + channel = message.target + loop = asyncio.get_running_loop() + + for url in urls: + if _is_ignored_url(url, ignore_hosts): + continue + if _check_cooldown(url, cooldown): + continue + + title, desc = await loop.run_in_executor(None, _fetch_title, url) + if not title: + continue + + title = _truncate(title, _MAX_TITLE_LEN) + if desc: + desc = _truncate(desc, _MAX_DESC_LEN) + line = f"\u21b3 {title} -- {desc}" + else: + line = f"\u21b3 {title}" + + await bot.send(channel, line) diff --git a/tests/test_urltitle.py b/tests/test_urltitle.py new file mode 100644 index 0000000..1a0eaa9 --- /dev/null +++ b/tests/test_urltitle.py @@ -0,0 +1,477 @@ +"""Tests for the URL title preview plugin.""" + +import asyncio +import importlib.util +import sys +import time +from pathlib import Path +from unittest.mock import patch + +from derp.irc import Message + +# plugins/ is not a Python package -- load the module from file path +_spec = importlib.util.spec_from_file_location( + "plugins.urltitle", + Path(__file__).resolve().parent.parent / "plugins" / "urltitle.py", +) +_mod = importlib.util.module_from_spec(_spec) +sys.modules[_spec.name] = _mod +_spec.loader.exec_module(_mod) + +from plugins.urltitle import ( # noqa: E402, I001 + _TitleParser, + _check_cooldown, + _clean_url, + _extract_urls, + _fetch_title, + _is_ignored_url, + _seen, + on_privmsg, +) + + +# -- Helpers ----------------------------------------------------------------- + + +class _FakeBot: + """Minimal bot stand-in that captures sent messages.""" + + def __init__(self): + self.sent: list[tuple[str, str]] = [] + self.nick = "derp" + self.prefix = "!" 
+ self.config = { + "flaskpaste": {"url": "https://paste.mymx.me"}, + "urltitle": {}, + } + + async def send(self, target: str, text: str) -> None: + self.sent.append((target, text)) + + +def _msg(text: str, nick: str = "alice", target: str = "#test") -> Message: + """Create a channel PRIVMSG.""" + return Message( + raw="", prefix=f"{nick}!~{nick}@host", nick=nick, + command="PRIVMSG", params=[target, text], tags={}, + ) + + +def _pm(text: str, nick: str = "alice") -> Message: + """Create a private PRIVMSG.""" + return Message( + raw="", prefix=f"{nick}!~{nick}@host", nick=nick, + command="PRIVMSG", params=["derp", text], tags={}, + ) + + +class _FakeResp: + """Fake HTTP response for mocking _urlopen.""" + + def __init__(self, data: bytes = b"", content_type: str = "text/html", + status: int = 200): + self._data = data + self.headers = {"Content-Type": content_type} + self.status = status + + def read(self, n: int = -1) -> bytes: + if n == -1: + return self._data + return self._data[:n] + + def close(self) -> None: + pass + + +# --------------------------------------------------------------------------- +# TestExtractUrls +# --------------------------------------------------------------------------- + +class TestExtractUrls: + def test_single_url(self): + urls = _extract_urls("check https://example.com please") + assert urls == ["https://example.com"] + + def test_multiple_urls(self): + urls = _extract_urls("see https://a.com and http://b.com ok") + assert urls == ["https://a.com", "http://b.com"] + + def test_max_limit(self): + text = "https://a.com https://b.com https://c.com https://d.com" + urls = _extract_urls(text, max_urls=2) + assert len(urls) == 2 + + def test_trailing_punctuation(self): + urls = _extract_urls("visit https://example.com.") + assert urls == ["https://example.com"] + + def test_trailing_comma(self): + urls = _extract_urls("https://example.com, check it") + assert urls == ["https://example.com"] + + def test_balanced_parens(self): + urls = 
_extract_urls("https://en.wikipedia.org/wiki/Foo_(bar)") + assert urls == ["https://en.wikipedia.org/wiki/Foo_(bar)"] + + def test_unbalanced_paren_stripped(self): + urls = _extract_urls("(https://example.com)") + assert urls == ["https://example.com"] + + def test_suppressed_url(self): + urls = _extract_urls("!https://example.com") + assert urls == [] + + def test_suppressed_mixed(self): + urls = _extract_urls("!https://skip.com https://keep.com") + assert urls == ["https://keep.com"] + + def test_no_urls(self): + urls = _extract_urls("no urls here") + assert urls == [] + + def test_dedup(self): + urls = _extract_urls("https://a.com https://a.com") + assert urls == ["https://a.com"] + + +# --------------------------------------------------------------------------- +# TestCleanUrl +# --------------------------------------------------------------------------- + +class TestCleanUrl: + def test_no_trailing(self): + assert _clean_url("https://example.com") == "https://example.com" + + def test_strip_period(self): + assert _clean_url("https://example.com.") == "https://example.com" + + def test_strip_semicolon(self): + assert _clean_url("https://example.com;") == "https://example.com" + + def test_preserve_balanced_parens(self): + url = "https://en.wikipedia.org/wiki/Foo_(bar)" + assert _clean_url(url) == url + + def test_strip_trailing_paren_unbalanced(self): + assert _clean_url("https://example.com)") == "https://example.com" + + def test_multiple_trailing(self): + assert _clean_url("https://example.com..;") == "https://example.com" + + +# --------------------------------------------------------------------------- +# TestTitleParser +# --------------------------------------------------------------------------- + +class TestTitleParser: + def test_og_title_priority(self): + parser = _TitleParser() + parser.feed(""" + <html><head> + <meta property="og:title" content="OG Title"> + <title>Page Title + + """) + assert parser.best_title == "OG Title" + + def 
test_title_fallback(self): + parser = _TitleParser() + parser.feed("Fallback Title") + assert parser.best_title == "Fallback Title" + + def test_og_description(self): + parser = _TitleParser() + parser.feed(""" + + + """) + assert parser.best_description == "OG Desc" + + def test_meta_description_fallback(self): + parser = _TitleParser() + parser.feed('') + assert parser.best_description == "Meta Desc" + + def test_whitespace_collapse(self): + parser = _TitleParser() + parser.feed(" Hello World ") + assert parser.title == "Hello World" + + def test_no_title(self): + parser = _TitleParser() + parser.feed("No title here") + assert parser.best_title == "" + + def test_multipart_title(self): + parser = _TitleParser() + parser.feed("Part 1 <em>Part 2</em> Part 3") + # The parser collects text data; triggers start/end but + # its text is still captured by handle_data + assert "Part 1" in parser.title + + def test_empty_og_title(self): + parser = _TitleParser() + parser.feed(""" + + Real Title + """) + assert parser.best_title == "Real Title" + + +# --------------------------------------------------------------------------- +# TestIsIgnoredUrl +# --------------------------------------------------------------------------- + +class TestIsIgnoredUrl: + def test_paste_host(self): + assert _is_ignored_url( + "https://paste.mymx.me/abc", {"paste.mymx.me"}, + ) is True + + def test_image_extension(self): + assert _is_ignored_url( + "https://example.com/photo.png", set(), + ) is True + + def test_pdf_extension(self): + assert _is_ignored_url( + "https://example.com/doc.pdf", set(), + ) is True + + def test_zip_extension(self): + assert _is_ignored_url( + "https://example.com/archive.zip", set(), + ) is True + + def test_normal_url_passes(self): + assert _is_ignored_url( + "https://example.com/page", set(), + ) is False + + def test_html_extension_passes(self): + assert _is_ignored_url( + "https://example.com/page.html", set(), + ) is False + + def test_custom_ignore_host(self): + 
assert _is_ignored_url( + "https://private.local/x", {"private.local"}, + ) is True + + +# --------------------------------------------------------------------------- +# TestFetchTitle +# --------------------------------------------------------------------------- + +class TestFetchTitle: + def test_successful_html(self): + html = b"Test Page" + head_resp = _FakeResp(b"", content_type="text/html") + get_resp = _FakeResp(html, content_type="text/html; charset=utf-8") + calls = iter([head_resp, get_resp]) + + with patch.object(_mod, "_urlopen", side_effect=lambda *a, **kw: next(calls)): + title, desc = _fetch_title("https://example.com") + assert title == "Test Page" + + def test_non_html_content_type_bails(self): + head_resp = _FakeResp(b"", content_type="application/json") + + with patch.object(_mod, "_urlopen", return_value=head_resp): + title, desc = _fetch_title("https://example.com/api") + assert title == "" + assert desc == "" + + def test_head_fail_falls_through_to_get(self): + html = b"Recovered" + get_resp = _FakeResp(html, content_type="text/html") + + def side_effect(req, **kw): + if req.get_method() == "HEAD": + raise ConnectionError("HEAD not supported") + return get_resp + + with patch.object(_mod, "_urlopen", side_effect=side_effect): + title, desc = _fetch_title("https://example.com") + assert title == "Recovered" + + def test_network_error_returns_empty(self): + with patch.object(_mod, "_urlopen", side_effect=ConnectionError("fail")): + title, desc = _fetch_title("https://example.com") + assert title == "" + assert desc == "" + + def test_og_tags_extracted(self): + html = ( + b'' + b'' + b'' + b'' + ) + head_resp = _FakeResp(b"", content_type="text/html") + get_resp = _FakeResp(html, content_type="text/html") + calls = iter([head_resp, get_resp]) + + with patch.object(_mod, "_urlopen", side_effect=lambda *a, **kw: next(calls)): + title, desc = _fetch_title("https://example.com") + assert title == "OG Title" + assert desc == "OG Desc" + + def 
test_get_non_html_bails(self): + """HEAD returns html but GET returns non-html (redirect to binary).""" + head_resp = _FakeResp(b"", content_type="text/html") + get_resp = _FakeResp(b"\x89PNG", content_type="image/png") + calls = iter([head_resp, get_resp]) + + with patch.object(_mod, "_urlopen", side_effect=lambda *a, **kw: next(calls)): + title, desc = _fetch_title("https://example.com/img") + assert title == "" + + +# --------------------------------------------------------------------------- +# TestCooldown +# --------------------------------------------------------------------------- + +class TestCooldown: + def setup_method(self): + _seen.clear() + + def test_first_access_not_cooled(self): + assert _check_cooldown("https://a.com", 300) is False + + def test_second_access_within_window(self): + _check_cooldown("https://b.com", 300) + assert _check_cooldown("https://b.com", 300) is True + + def test_after_cooldown_expires(self): + _seen["https://c.com"] = time.monotonic() - 400 + assert _check_cooldown("https://c.com", 300) is False + + def test_pruning(self): + """Cache is pruned when it exceeds max size.""" + old = time.monotonic() - 600 + for i in range(600): + _seen[f"https://stale-{i}.com"] = old + _check_cooldown("https://new.com", 300) + assert len(_seen) < 600 + + +# --------------------------------------------------------------------------- +# TestOnPrivmsg +# --------------------------------------------------------------------------- + +class TestOnPrivmsg: + def setup_method(self): + _seen.clear() + + def test_channel_url_previewed(self): + bot = _FakeBot() + html = b"Example" + head_resp = _FakeResp(b"", content_type="text/html") + get_resp = _FakeResp(html, content_type="text/html") + calls = iter([head_resp, get_resp]) + + def inner(): + with patch.object(_mod, "_urlopen", + side_effect=lambda *a, **kw: next(calls)): + asyncio.run(on_privmsg(bot, _msg("check https://example.com"))) + + inner() + assert len(bot.sent) == 1 + assert bot.sent[0][0] == 
"#test" + assert "\u21b3 Example" in bot.sent[0][1] + + def test_pm_ignored(self): + bot = _FakeBot() + + asyncio.run(on_privmsg(bot, _pm("https://example.com"))) + assert len(bot.sent) == 0 + + def test_bot_nick_ignored(self): + bot = _FakeBot() + + asyncio.run(on_privmsg(bot, _msg("https://example.com", nick="derp"))) + assert len(bot.sent) == 0 + + def test_command_ignored(self): + bot = _FakeBot() + + asyncio.run(on_privmsg(bot, _msg("!shorten https://example.com"))) + assert len(bot.sent) == 0 + + def test_suppressed_url(self): + bot = _FakeBot() + + asyncio.run(on_privmsg(bot, _msg("!https://example.com"))) + assert len(bot.sent) == 0 + + def test_paste_host_ignored(self): + bot = _FakeBot() + + asyncio.run(on_privmsg( + bot, _msg("https://paste.mymx.me/some-paste"), + )) + assert len(bot.sent) == 0 + + def test_empty_title_skipped(self): + bot = _FakeBot() + html = b"No title here" + head_resp = _FakeResp(b"", content_type="text/html") + get_resp = _FakeResp(html, content_type="text/html") + calls = iter([head_resp, get_resp]) + + with patch.object(_mod, "_urlopen", + side_effect=lambda *a, **kw: next(calls)): + asyncio.run(on_privmsg(bot, _msg("https://notitle.com"))) + assert len(bot.sent) == 0 + + def test_image_url_skipped(self): + bot = _FakeBot() + + asyncio.run(on_privmsg( + bot, _msg("https://example.com/photo.png"), + )) + assert len(bot.sent) == 0 + + def test_title_with_description(self): + bot = _FakeBot() + html = ( + b'' + b'My Page' + b'' + b'' + ) + head_resp = _FakeResp(b"", content_type="text/html") + get_resp = _FakeResp(html, content_type="text/html") + calls = iter([head_resp, get_resp]) + + with patch.object(_mod, "_urlopen", + side_effect=lambda *a, **kw: next(calls)): + asyncio.run(on_privmsg(bot, _msg("https://example.com"))) + assert len(bot.sent) == 1 + assert "My Page -- A great page" in bot.sent[0][1] + + def test_cooldown_prevents_repeat(self): + bot = _FakeBot() + html = b"Example" + + def make_calls(): + return iter([ + 
_FakeResp(b"", content_type="text/html"), + _FakeResp(html, content_type="text/html"), + ]) + + calls = make_calls() + with patch.object(_mod, "_urlopen", + side_effect=lambda *a, **kw: next(calls)): + asyncio.run(on_privmsg(bot, _msg("https://cooldown.com"))) + + assert len(bot.sent) == 1 + bot.sent.clear() + + # Same URL again -- should be suppressed by cooldown + calls = make_calls() + with patch.object(_mod, "_urlopen", + side_effect=lambda *a, **kw: next(calls)): + asyncio.run(on_privmsg(bot, _msg("https://cooldown.com"))) + assert len(bot.sent) == 0