Event-driven plugin that auto-fetches page titles for URLs posted in channel messages. HEAD-then-GET via SOCKS5 pool, og:title priority, cooldown dedup, !-suppression, binary/host filtering. 52 tests. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
279 lines
8.2 KiB
Python
279 lines
8.2 KiB
Python
"""Plugin: automatic URL title preview for channel messages."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import re
|
|
import time
|
|
import urllib.parse
|
|
import urllib.request
|
|
from html.parser import HTMLParser
|
|
|
|
from derp.http import urlopen as _urlopen
|
|
from derp.plugin import event
|
|
|
|
_log = logging.getLogger(__name__)


# -- Constants ---------------------------------------------------------------

# http(s) URL matcher: scheme plus 2+ characters that are not whitespace,
# angle brackets, double quotes, or C0 control characters.
_URL_RE = re.compile(r"https?://[^\s<>\"\x00-\x1f]{2,}", re.IGNORECASE)

# Sent with every request; some sites reject UA-less clients.
_USER_AGENT = "Mozilla/5.0 (compatible; derp-bot)"

_FETCH_TIMEOUT = 10  # seconds; applies to the HEAD and the GET request each
_MAX_BYTES = 64 * 1024  # cap on body bytes read; <head> fits well within this
_MAX_TITLE_LEN = 200  # titles longer than this get "..." truncation
_MAX_DESC_LEN = 150  # descriptions longer than this get "..." truncation
_MAX_URLS = 3  # preview at most this many URLs per message (config-overridable)
_COOLDOWN = 300  # seconds before the same URL may be previewed again
_CACHE_MAX = 500  # prune the cooldown cache once it holds this many entries

# Path extensions that are never HTML -- skipped without any HTTP request.
_SKIP_EXTS = frozenset({
    ".png", ".jpg", ".jpeg", ".gif", ".webp", ".svg", ".ico", ".bmp",
    ".mp4", ".webm", ".mkv", ".avi", ".mov", ".flv",
    ".mp3", ".flac", ".ogg", ".wav", ".aac",
    ".pdf", ".zip", ".gz", ".tar", ".bz2", ".xz", ".7z", ".rar",
    ".exe", ".msi", ".deb", ".rpm", ".dmg", ".iso",
    ".apk", ".wasm", ".bin", ".img",
})

# Trailing punctuation to strip from matched URLs; a closing ")" is kept
# when it balances a "(" inside the URL (e.g. Wikipedia article paths).
_TRAIL_CHARS = set(".,;:!?)>]")

# -- Module-level state ------------------------------------------------------

# url -> time.monotonic() timestamp of the last preview (cooldown dedup).
_seen: dict[str, float] = {}
|
|
|
|
# -- HTML parser -------------------------------------------------------------
|
|
|
|
|
|
class _TitleParser(HTMLParser):
|
|
"""Extract page title and description from HTML head."""
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.og_title = ""
|
|
self.og_description = ""
|
|
self.title = ""
|
|
self.meta_description = ""
|
|
self._in_title = False
|
|
self._title_parts: list[str] = []
|
|
|
|
def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
|
|
if tag == "meta":
|
|
attr_map = {k.lower(): (v or "") for k, v in attrs}
|
|
prop = attr_map.get("property", "").lower()
|
|
name = attr_map.get("name", "").lower()
|
|
content = attr_map.get("content", "")
|
|
if prop == "og:title":
|
|
self.og_title = content
|
|
elif prop == "og:description":
|
|
self.og_description = content
|
|
elif name == "description" and not self.meta_description:
|
|
self.meta_description = content
|
|
elif tag == "title":
|
|
self._in_title = True
|
|
self._title_parts = []
|
|
|
|
def handle_data(self, data: str) -> None:
|
|
if self._in_title:
|
|
self._title_parts.append(data)
|
|
|
|
def handle_endtag(self, tag: str) -> None:
|
|
if tag == "title" and self._in_title:
|
|
self._in_title = False
|
|
self.title = " ".join("".join(self._title_parts).split())
|
|
|
|
@property
|
|
def best_title(self) -> str:
|
|
return self.og_title or self.title
|
|
|
|
@property
|
|
def best_description(self) -> str:
|
|
return self.og_description or self.meta_description
|
|
|
|
|
|
# -- URL helpers -------------------------------------------------------------
|
|
|
|
|
|
def _clean_url(raw: str) -> str:
|
|
"""Strip trailing punctuation while preserving balanced parentheses."""
|
|
url = raw
|
|
while url and url[-1] in _TRAIL_CHARS:
|
|
if url[-1] == ")" and url.count("(") > url.count(")") - 1:
|
|
break
|
|
url = url[:-1]
|
|
return url
|
|
|
|
|
|
def _extract_urls(text: str, max_urls: int = _MAX_URLS) -> list[str]:
    """Collect up to ``max_urls`` distinct HTTP(S) URLs from *text*.

    A URL whose character immediately before ``http`` is ``!`` is
    treated as suppressed and skipped.  Order of first appearance is
    preserved; duplicates (after trailing-punctuation cleanup) are
    dropped.
    """
    found: list[str] = []
    for match in _URL_RE.finditer(text):
        pos = match.start()
        # "!" directly before the URL is the suppression marker.
        if pos > 0 and text[pos - 1] == "!":
            continue
        cleaned = _clean_url(match.group())
        if cleaned in found:
            continue
        found.append(cleaned)
        if len(found) >= max_urls:
            break
    return found
|
|
|
|
|
|
def _is_ignored_url(url: str, ignore_hosts: set[str]) -> bool:
    """Return True when *url* should not be previewed.

    Skips URLs whose path ends in a known binary/media extension
    (``_SKIP_EXTS``) and URLs whose hostname is in *ignore_hosts*.
    """
    parsed = urllib.parse.urlparse(url)

    # str.endswith accepts a tuple of suffixes: one C-level call
    # instead of a Python loop over the frozenset.
    if parsed.path.lower().endswith(tuple(_SKIP_EXTS)):
        return True

    # urlparse lowercases .hostname; empty string when the URL has no netloc.
    return (parsed.hostname or "") in ignore_hosts
|
|
|
|
|
|
def _truncate(text: str, max_len: int) -> str:
|
|
"""Truncate with ellipsis if needed."""
|
|
if len(text) <= max_len:
|
|
return text
|
|
return text[: max_len - 3].rstrip() + "..."
|
|
|
|
|
|
# -- Fetch logic -------------------------------------------------------------
|
|
|
|
|
|
def _fetch_title(url: str) -> tuple[str, str]:
    """Fetch page title and description for *url*.

    Strategy: a cheap HEAD request first to reject non-HTML content
    types without a body transfer, then a GET for the body.  Both go
    through the SOCKS5 connection pool (``derp.http.urlopen``).  If
    HEAD fails (some servers reject it), the GET is attempted anyway
    and its Content-Type is checked instead.

    Returns ``(title, description)``; both empty strings on failure.
    """
    # 1. HEAD to check Content-Type cheaply.
    try:
        req = urllib.request.Request(url, method="HEAD")
        req.add_header("User-Agent", _USER_AGENT)
        resp = _urlopen(req, timeout=_FETCH_TIMEOUT, retries=1)
        ct = (resp.headers.get("Content-Type") or "").lower()
        resp.close()
        if ct and "html" not in ct and "xhtml" not in ct:
            return "", ""
    except Exception:
        pass  # HEAD unsupported or failed -- fall through to GET

    # 2. GET the body (reuses the pooled connection to the same host).
    try:
        req = urllib.request.Request(url, method="GET")
        req.add_header("User-Agent", _USER_AGENT)
        resp = _urlopen(req, timeout=_FETCH_TIMEOUT, retries=1)
        try:
            ct = (resp.headers.get("Content-Type") or "").lower()
            if ct and "html" not in ct and "xhtml" not in ct:
                return "", ""
            raw = resp.read(_MAX_BYTES)
        finally:
            # Always release the pooled connection, even if read() raises
            # (the old code leaked resp on a mid-body failure).
            resp.close()
    except Exception as exc:
        _log.debug("GET failed for %s: %s", url, exc)
        return "", ""

    # 3. Decode honoring the declared charset when present; the old code
    # always assumed UTF-8, garbling legacy (e.g. latin-1/shift-jis) pages.
    charset_match = re.search(r"charset=([\w.-]+)", ct)
    encoding = charset_match.group(1) if charset_match else "utf-8"
    try:
        html = raw.decode(encoding, errors="replace")
    except LookupError:  # unknown or bogus charset label
        html = raw.decode("utf-8", errors="replace")

    parser = _TitleParser()
    try:
        parser.feed(html)
    except Exception:
        pass  # malformed/truncated HTML may raise; keep what was parsed

    return parser.best_title, parser.best_description
|
|
|
|
|
|
# -- Cooldown ----------------------------------------------------------------
|
|
|
|
|
|
def _check_cooldown(url: str, cooldown: int) -> bool:
    """Return True when *url* was previewed less than *cooldown* s ago.

    Side effect: records the current monotonic time for *url*, and
    opportunistically evicts stale entries once the cache reaches
    ``_CACHE_MAX`` entries.
    """
    now = time.monotonic()
    previous = _seen.get(url)
    if previous is not None and now - previous < cooldown:
        return True

    # Opportunistic eviction keeps the cache bounded.
    if len(_seen) >= _CACHE_MAX:
        threshold = now - cooldown
        for key in [k for k, ts in _seen.items() if ts < threshold]:
            del _seen[key]

    _seen[url] = now
    return False
|
|
|
|
|
|
# -- Event handler -----------------------------------------------------------
|
|
|
|
|
|
@event("PRIVMSG")
async def on_privmsg(bot, message):
    """Post title previews for URLs seen in channel messages."""
    import asyncio

    # Only react to channel traffic that is neither our own output
    # nor a bot command.
    if not message.is_channel or message.nick == bot.nick:
        return
    text = message.text or ""
    if text.startswith(bot.prefix):
        return

    # Per-plugin config with module defaults.
    cfg = bot.config.get("urltitle", {})
    cooldown = cfg.get("cooldown", _COOLDOWN)
    max_urls = cfg.get("max_urls", _MAX_URLS)

    # Hosts we never preview: config-listed ones plus our own paste host.
    ignore_hosts: set[str] = set(cfg.get("ignore_hosts", []))
    paste_url = bot.config.get("flaskpaste", {}).get("url", "")
    if paste_url:
        paste_host = urllib.parse.urlparse(paste_url).hostname
        if paste_host:
            ignore_hosts.add(paste_host)

    urls = _extract_urls(text, max_urls)
    if not urls:
        return

    channel = message.target
    loop = asyncio.get_running_loop()

    for url in urls:
        if _is_ignored_url(url, ignore_hosts) or _check_cooldown(url, cooldown):
            continue

        # The blocking HTTP fetch runs in the default thread pool.
        title, desc = await loop.run_in_executor(None, _fetch_title, url)
        if not title:
            continue

        title = _truncate(title, _MAX_TITLE_LEN)
        if desc:
            line = f"\u21b3 {title} -- {_truncate(desc, _MAX_DESC_LEN)}"
        else:
            line = f"\u21b3 {title}"
        await bot.send(channel, line)