"""Plugin: automatic URL title preview for channel messages.""" from __future__ import annotations import logging import re import time import urllib.parse import urllib.request from html.parser import HTMLParser from derp.http import urlopen as _urlopen from derp.plugin import event _log = logging.getLogger(__name__) # -- Constants --------------------------------------------------------------- _URL_RE = re.compile(r"https?://[^\s<>\"\x00-\x1f]{2,}", re.IGNORECASE) _USER_AGENT = "Mozilla/5.0 (compatible; derp-bot)" _FETCH_TIMEOUT = 10 _MAX_BYTES = 64 * 1024 _MAX_TITLE_LEN = 200 _MAX_DESC_LEN = 150 _MAX_URLS = 3 _COOLDOWN = 300 # seconds _CACHE_MAX = 500 _SKIP_EXTS = frozenset({ ".png", ".jpg", ".jpeg", ".gif", ".webp", ".svg", ".ico", ".bmp", ".mp4", ".webm", ".mkv", ".avi", ".mov", ".flv", ".mp3", ".flac", ".ogg", ".wav", ".aac", ".pdf", ".zip", ".gz", ".tar", ".bz2", ".xz", ".7z", ".rar", ".exe", ".msi", ".deb", ".rpm", ".dmg", ".iso", ".apk", ".wasm", ".bin", ".img", }) # Trailing punctuation to strip, but preserve balanced parens _TRAIL_CHARS = set(".,;:!?)>]") # -- Per-bot state ----------------------------------------------------------- def _ps(bot): """Per-bot plugin runtime state.""" return bot._pstate.setdefault("urltitle", { "seen": {}, }) # -- HTML parser ------------------------------------------------------------- class _TitleParser(HTMLParser): """Extract page title and description from HTML head.""" def __init__(self): super().__init__() self.og_title = "" self.og_description = "" self.title = "" self.meta_description = "" self._in_title = False self._title_parts: list[str] = [] def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None: if tag == "meta": attr_map = {k.lower(): (v or "") for k, v in attrs} prop = attr_map.get("property", "").lower() name = attr_map.get("name", "").lower() content = attr_map.get("content", "") if prop == "og:title": self.og_title = content elif prop == "og:description": self.og_description = 
content elif name == "description" and not self.meta_description: self.meta_description = content elif tag == "title": self._in_title = True self._title_parts = [] def handle_data(self, data: str) -> None: if self._in_title: self._title_parts.append(data) def handle_endtag(self, tag: str) -> None: if tag == "title" and self._in_title: self._in_title = False self.title = " ".join("".join(self._title_parts).split()) @property def best_title(self) -> str: return self.og_title or self.title @property def best_description(self) -> str: return self.og_description or self.meta_description # -- URL helpers ------------------------------------------------------------- def _clean_url(raw: str) -> str: """Strip trailing punctuation while preserving balanced parentheses.""" url = raw while url and url[-1] in _TRAIL_CHARS: if url[-1] == ")" and url.count("(") > url.count(")") - 1: break url = url[:-1] return url def _extract_urls(text: str, max_urls: int = _MAX_URLS) -> list[str]: """Extract up to max_urls HTTP(S) URLs from text. Skips URLs where the character immediately before 'http' is '!' (suppression marker). Deduplicates while preserving order. """ urls: list[str] = [] seen: set[str] = set() for m in _URL_RE.finditer(text): start = m.start() if start > 0 and text[start - 1] == "!": continue url = _clean_url(m.group()) if url not in seen: seen.add(url) urls.append(url) if len(urls) >= max_urls: break return urls def _is_ignored_url(url: str, ignore_hosts: set[str]) -> bool: """Check if a URL should be skipped (extension or host).""" parsed = urllib.parse.urlparse(url) path_lower = parsed.path.lower() # Check file extension for ext in _SKIP_EXTS: if path_lower.endswith(ext): return True # Check ignored hosts host = parsed.hostname or "" if host in ignore_hosts: return True return False def _truncate(text: str, max_len: int) -> str: """Truncate with ellipsis if needed.""" if len(text) <= max_len: return text return text[: max_len - 3].rstrip() + "..." 
# -- Fetch logic -------------------------------------------------------------

# charset token in a Content-Type header, e.g. "text/html; charset=iso-8859-1"
_CHARSET_RE = re.compile(r"charset=([^\s;\"']+)")


def _fetch_title(url: str) -> tuple[str, str]:
    """Fetch page title and description for a URL.

    Uses HEAD-then-GET: HEAD checks Content-Type cheaply, GET fetches the
    body. Both go through the SOCKS5 connection pool.

    Returns (title, description). Empty strings on failure.
    """
    # 1. HEAD to check Content-Type before committing to a body download.
    try:
        req = urllib.request.Request(url, method="HEAD")
        req.add_header("User-Agent", _USER_AGENT)
        resp = _urlopen(req, timeout=_FETCH_TIMEOUT, retries=1)
        ct = (resp.headers.get("Content-Type") or "").lower()
        resp.close()
        if ct and "html" not in ct and "xhtml" not in ct:
            return "", ""
    except Exception:
        pass  # HEAD unsupported -- fall through to GET

    # 2. GET body (reuses pooled connection to same host)
    try:
        req = urllib.request.Request(url, method="GET")
        req.add_header("User-Agent", _USER_AGENT)
        resp = _urlopen(req, timeout=_FETCH_TIMEOUT, retries=1)
        ct = (resp.headers.get("Content-Type") or "").lower()
        if ct and "html" not in ct and "xhtml" not in ct:
            resp.close()
            return "", ""
        raw = resp.read(_MAX_BYTES)
        resp.close()
    except Exception as exc:
        _log.debug("GET failed for %s: %s", url, exc)
        return "", ""

    # 3. Parse. Honor the charset the server declared in Content-Type;
    # fall back to UTF-8 (with replacement) for missing or unknown labels.
    charset = "utf-8"
    m = _CHARSET_RE.search(ct)
    if m:
        charset = m.group(1)
    try:
        html = raw.decode(charset, errors="replace")
    except LookupError:  # server sent an unrecognized encoding name
        html = raw.decode("utf-8", errors="replace")
    parser = _TitleParser()
    try:
        parser.feed(html)
    except Exception:
        # Truncated/malformed HTML is common; keep whatever <head> data
        # was extracted before the parser choked.
        pass
    return parser.best_title, parser.best_description

# -- Cooldown ----------------------------------------------------------------


def _check_cooldown(bot, url: str, cooldown: int) -> bool:
    """Return True if the URL is within the cooldown window.

    Otherwise records the URL as seen and returns False. The per-bot
    "seen" cache is kept bounded at _CACHE_MAX entries.
    """
    seen = _ps(bot)["seen"]
    now = time.monotonic()
    last = seen.get(url)
    if last is not None and (now - last) < cooldown:
        return True
    if len(seen) >= _CACHE_MAX:
        # First drop entries whose cooldown window has already expired.
        cutoff = now - cooldown
        stale = [k for k, v in seen.items() if v < cutoff]
        for k in stale:
            del seen[k]
        # A burst of fresh URLs can leave the cache full even after the
        # stale prune; evict the oldest entries so the cap actually holds.
        if len(seen) >= _CACHE_MAX:
            excess = len(seen) - _CACHE_MAX + 1
            for k in sorted(seen, key=seen.get)[:excess]:
                del seen[k]
    seen[url] = now
    return False

# -- Event handler -----------------------------------------------------------


@event("PRIVMSG")
async def on_privmsg(bot, message):
    """Preview URLs posted in channel messages."""
    import asyncio

    # Skip non-channel, bot's own messages, and command messages
    if not message.is_channel:
        return
    if message.nick == bot.nick:
        return
    text = message.text or ""
    if text.startswith(bot.prefix):
        return

    # Read config
    cfg = bot.config.get("urltitle", {})
    cooldown = cfg.get("cooldown", _COOLDOWN)
    max_urls = cfg.get("max_urls", _MAX_URLS)
    extra_ignore = set(cfg.get("ignore_hosts", []))

    # Build ignore set: FlaskPaste host + config-specified hosts
    ignore_hosts = set(extra_ignore)
    fp_url = bot.config.get("flaskpaste", {}).get("url", "")
    if fp_url:
        fp_host = urllib.parse.urlparse(fp_url).hostname
        if fp_host:
            ignore_hosts.add(fp_host)

    urls = _extract_urls(text, max_urls)
    if not urls:
        return

    channel = message.target
    loop = asyncio.get_running_loop()
    for url in urls:
        if _is_ignored_url(url, ignore_hosts):
            continue
        if _check_cooldown(bot, url, cooldown):
            continue
        # Blocking HTTP work runs in the default executor so the event
        # loop stays responsive.
        title, desc = await loop.run_in_executor(None, _fetch_title, url)
        if not title:
            continue
        title = _truncate(title, _MAX_TITLE_LEN)
        if desc:
            desc = _truncate(desc, _MAX_DESC_LEN)
            line = f"\u21b3 {title} -- {desc}"
        else:
            line = f"\u21b3 {title}"
        await bot.send(channel, line)