From c3b19feb0fada816a41f06d83d6acc1fd5236c74 Mon Sep 17 00:00:00 2001 From: user Date: Wed, 18 Feb 2026 09:01:46 +0100 Subject: [PATCH] feat: add paste site keyword monitor plugin Poll Pastebin archive and GitHub Gists for keyword matches, announce hits to subscribed IRC channels. Follows rss.py polling/subscription pattern with state persistence. Co-Authored-By: Claude Opus 4.6 --- TASKS.md | 15 +- docs/USAGE.md | 39 ++ plugins/pastemoni.py | 520 ++++++++++++++++++++ tests/test_pastemoni.py | 1015 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 1588 insertions(+), 1 deletion(-) create mode 100644 plugins/pastemoni.py create mode 100644 tests/test_pastemoni.py diff --git a/TASKS.md b/TASKS.md index 9478eb0..60d9f06 100644 --- a/TASKS.md +++ b/TASKS.md @@ -1,6 +1,19 @@ # derp - Tasks -## Current Sprint -- v1.2.4 URL Title Preview (2026-02-17) +## Current Sprint -- v1.2.5 Paste Site Keyword Monitor (2026-02-18) + +| Pri | Status | Task | +|-----|--------|------| +| P0 | [x] | Pastemoni plugin (`plugins/pastemoni.py`) | +| P0 | [x] | Pastebin archive scraping + raw content matching | +| P0 | [x] | GitHub Gists API keyword filtering | +| P1 | [x] | Polling/subscription architecture (rss.py pattern) | +| P1 | [x] | State persistence + restore on connect | +| P1 | [x] | Command handler: add/del/list/check | +| P2 | [x] | Tests for pastemoni (15 test classes, ~45 cases) | +| P2 | [x] | Documentation update (USAGE.md) | + +## Previous Sprint -- v1.2.4 URL Title Preview (2026-02-17) | Pri | Status | Task | |-----|--------|------| diff --git a/docs/USAGE.md b/docs/USAGE.md index 00f4a86..64915a6 100644 --- a/docs/USAGE.md +++ b/docs/USAGE.md @@ -134,6 +134,7 @@ format = "text" # Log format: "text" (default) or "json" | `!vt ` | VirusTotal lookup | | `!emailcheck [email2 ...]` | SMTP email verification (admin) | | `!shorten ` | Shorten a URL via FlaskPaste | +| `!pastemoni ` | Paste site keyword monitoring | ### Command Shorthand @@ -872,6 +873,44 @@ 
https://paste.mymx.me/s/AbCdEfGh - mTLS client cert skips PoW; falls back to PoW challenge if no cert - Also used internally by `!alert` to shorten announcement URLs +### `!pastemoni` -- Paste Site Keyword Monitor + +Monitor public paste sites for keywords (data leaks, credential dumps, brand +mentions). Polls Pastebin's archive and GitHub's public Gists API on a +schedule, checks new pastes for keyword matches, and announces hits to the +subscribed IRC channel. + +``` +!pastemoni add Add monitor (admin) +!pastemoni del Remove monitor (admin) +!pastemoni list List monitors +!pastemoni check Force-poll now +``` + +- `add` and `del` require admin privileges +- All subcommands must be used in a channel (not PM) +- Names must be lowercase alphanumeric + hyphens, 1-20 characters +- Maximum 20 monitors per channel + +Backends: + +- **Pastebin** (`pb`) -- Scrapes `pastebin.com/archive` for recent pastes, + fetches raw content, case-insensitive keyword match against title + content +- **GitHub Gists** (`gh`) -- Queries `api.github.com/gists/public`, matches + keyword against description and filenames + +Polling and announcements: + +- Monitors are polled every 5 minutes by default +- On `add`, existing items are seeded in the background (no flood) +- New matches announced as `[tag] Title -- snippet -- URL` +- Maximum 5 items announced per backend per poll; excess shown as `... 
and N more` +- Titles truncated to 60 characters, snippets to 80 characters +- 5 consecutive all-backend failures doubles the poll interval (max 1 hour) +- Subscriptions persist across bot restarts via `bot.state` +- `list` shows keyword and per-backend error counts +- `check` forces an immediate poll across all backends + ### FlaskPaste Configuration ```toml diff --git a/plugins/pastemoni.py b/plugins/pastemoni.py new file mode 100644 index 0000000..2675325 --- /dev/null +++ b/plugins/pastemoni.py @@ -0,0 +1,520 @@ +"""Plugin: paste site keyword monitor for Pastebin and GitHub Gists.""" + +from __future__ import annotations + +import asyncio +import json +import logging +import re +import urllib.request +from datetime import datetime, timezone +from html.parser import HTMLParser + +from derp.http import urlopen as _urlopen +from derp.plugin import command, event + +_log = logging.getLogger(__name__) + +# -- Constants --------------------------------------------------------------- + +_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,19}$") +_MAX_SEEN = 200 +_MAX_ANNOUNCE = 5 +_DEFAULT_INTERVAL = 300 +_MAX_INTERVAL = 3600 +_FETCH_TIMEOUT = 15 +_USER_AGENT = "derp-bot/1.0 (IRC paste monitor)" +_MAX_MONITORS = 20 +_MAX_SNIPPET_LEN = 80 +_MAX_TITLE_LEN = 60 + +# -- Module-level tracking --------------------------------------------------- + +_pollers: dict[str, asyncio.Task] = {} +_monitors: dict[str, dict] = {} +_errors: dict[str, int] = {} + + +# -- Pure helpers ------------------------------------------------------------ + +def _state_key(channel: str, name: str) -> str: + """Build composite state key.""" + return f"{channel}:{name}" + + +def _validate_name(name: str) -> bool: + """Check name against allowed pattern.""" + return bool(_NAME_RE.match(name)) + + +def _truncate(text: str, max_len: int = _MAX_TITLE_LEN) -> str: + """Truncate text with ellipsis if needed.""" + if len(text) <= max_len: + return text + return text[: max_len - 3].rstrip() + "..." 
def _snippet_around(text: str, keyword: str, max_len: int = _MAX_SNIPPET_LEN) -> str:
    """Extract a snippet of ``text`` positioned around the first keyword match.

    Whitespace runs are collapsed to single spaces first. Text that already
    fits in ``max_len`` is returned whole. Otherwise a window of at most
    ``max_len`` characters is cut, starting ``max_len // 3`` characters before
    the (case-insensitive) match, and ``...`` is added on each side that was
    cut. If the keyword is not found, the head of the text is returned.
    """
    if not text:
        return ""
    text = " ".join(text.split())  # collapse whitespace
    if len(text) <= max_len:
        return text
    idx = text.lower().find(keyword.lower())
    if idx < 0:
        return text[: max_len - 3] + "..."
    start = max(0, idx - max_len // 3)
    end = min(len(text), start + max_len)
    snippet = text[start:end]
    if start > 0:
        snippet = "..." + snippet
    if end < len(text):
        snippet = snippet + "..."
    return snippet


# -- State helpers -----------------------------------------------------------

def _save(bot, key: str, data: dict) -> None:
    """Persist monitor data to bot.state as a JSON string."""
    bot.state.set("pastemoni", key, json.dumps(data))


def _load(bot, key: str) -> dict | None:
    """Load monitor data from bot.state.

    Returns None when the key is missing, the stored value is not valid
    JSON, or the decoded value is not a dict (callers index the result by
    string keys, so any other JSON type is treated as corrupt state).
    """
    raw = bot.state.get("pastemoni", key)
    if raw is None:
        return None
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return None
    return data if isinstance(data, dict) else None


def _delete(bot, key: str) -> None:
    """Remove monitor data from bot.state."""
    bot.state.delete("pastemoni", key)


# -- Pastebin archive parser ------------------------------------------------

class _ArchiveParser(HTMLParser):
    """Extract paste links from Pastebin archive HTML.

    After ``feed()``, ``links`` holds one ``(paste_id, title)`` tuple per
    anchor whose href is exactly ``/`` followed by 8 alphanumeric characters;
    the title is the anchor's stripped text content.
    """

    # Compiled once; used with fullmatch() so a trailing newline in the
    # attribute value cannot slip past the way it can with a ``$`` anchor.
    _PASTE_HREF_RE = re.compile(r"/[a-zA-Z0-9]{8}")

    def __init__(self):
        super().__init__()
        self.links: list[tuple[str, str]] = []  # (paste_id, title)
        self._in_link = False
        self._href = ""
        self._title_parts: list[str] = []

    def handle_starttag(self, tag, attrs):
        """Start collecting title text when a paste anchor opens."""
        if tag != "a":
            return
        attr_map = {k: (v or "") for k, v in attrs}
        href = attr_map.get("href", "")
        if self._PASTE_HREF_RE.fullmatch(href):
            self._in_link = True
            self._href = href[1:]  # strip leading /
            self._title_parts = []

    def handle_data(self, data):
        """Accumulate text that appears inside a paste anchor."""
        if self._in_link:
            self._title_parts.append(data)

    def handle_endtag(self, tag):
        """Record the (paste_id, title) pair when a paste anchor closes."""
        if tag == "a" and self._in_link:
            self._in_link = False
            title = "".join(self._title_parts).strip()
            if self._href:
                self.links.append((self._href, title))


# -- HTTP helper -------------------------------------------------------------

def _http_get(url: str, accept: str | None = None) -> bytes:
    """GET ``url`` with the bot's User-Agent and return the raw body. Blocking.

    The response is closed even when ``read()`` raises, so a failed read does
    not leak the connection. Exceptions from ``_urlopen``/``read`` propagate.
    """
    req = urllib.request.Request(url, method="GET")
    req.add_header("User-Agent", _USER_AGENT)
    if accept:
        req.add_header("Accept", accept)
    resp = _urlopen(req, timeout=_FETCH_TIMEOUT)
    try:
        return resp.read()
    finally:
        resp.close()


# -- Pastebin backend --------------------------------------------------------

def _fetch_pastebin(keyword: str) -> list[dict]:
    """Scrape Pastebin archive and filter by keyword. Blocking.

    Inspects the first 30 archive links. A case-insensitive match in the
    title yields a result with an empty snippet and skips the raw fetch;
    otherwise the raw paste body is fetched and searched. Each result is a
    dict with ``id``, ``title``, ``url`` and ``snippet`` keys.

    Raises whatever the archive request raises; a failure fetching an
    individual raw paste is logged at debug level and that paste is skipped.
    """
    html = _http_get("https://pastebin.com/archive").decode("utf-8", errors="replace")
    parser = _ArchiveParser()
    parser.feed(html)

    kw_lower = keyword.lower()
    results: list[dict] = []

    for paste_id, title in parser.links[:30]:
        # Check title first (avoids raw fetch)
        if kw_lower in title.lower():
            results.append({
                "id": paste_id,
                "title": _truncate(title, _MAX_TITLE_LEN),
                "url": f"https://pastebin.com/{paste_id}",
                "snippet": "",
            })
            continue

        # Fetch raw content and check. Best-effort: one unreachable paste
        # must not fail the whole backend, but leave a trace for debugging.
        try:
            content = _http_get(f"https://pastebin.com/raw/{paste_id}").decode(
                "utf-8", errors="replace",
            )
        except Exception as exc:
            _log.debug("pastemoni raw fetch %s failed: %s", paste_id, exc)
            continue

        if kw_lower in content.lower():
            results.append({
                "id": paste_id,
                "title": _truncate(title or "(untitled)", _MAX_TITLE_LEN),
                "url": f"https://pastebin.com/{paste_id}",
                "snippet": _snippet_around(content, keyword),
            })

    return results


# -- GitHub Gists backend ----------------------------------------------------

def _fetch_gists(keyword: str) -> list[dict]:
    """Query GitHub public gists and filter by keyword. Blocking.

    Matches the keyword case-insensitively against each gist's description
    and filenames. Returns an empty list when the API response is not a JSON
    list; entries that are not objects or have no ``id`` are skipped. Each
    result is a dict with ``id``, ``title``, ``url`` and ``snippet`` keys.

    Raises whatever the request or JSON decoding raises.
    """
    raw = _http_get(
        "https://api.github.com/gists/public?per_page=30",
        accept="application/vnd.github+json",
    )

    gists = json.loads(raw)
    kw_lower = keyword.lower()
    results: list[dict] = []

    for gist in gists if isinstance(gists, list) else []:
        if not isinstance(gist, dict):
            continue
        gist_id = gist.get("id", "")
        if not gist_id:
            continue
        description = gist.get("description") or ""
        html_url = gist.get("html_url", "")
        files = gist.get("files") or {}
        filenames = " ".join(files) if isinstance(files, dict) else ""

        searchable = f"{description} {filenames}"
        if kw_lower not in searchable.lower():
            continue

        source = description or filenames
        title = _truncate(source or "(no description)", _MAX_TITLE_LEN)
        # Only add a snippet when the title had to be truncated; otherwise
        # the title already shows the full matched text.
        snippet = _snippet_around(source, keyword) if len(source) > _MAX_TITLE_LEN else ""
        results.append({
            "id": gist_id,
            "title": title,
            "url": html_url,
            "snippet": snippet,
        })

    return results


# -- Backend registry -------------------------------------------------------

# Maps the announcement tag to a blocking fetcher: keyword -> list of result dicts.
_BACKENDS: dict[str, callable] = {
    "pb": _fetch_pastebin,
    "gh": _fetch_gists,
}


# -- Polling -----------------------------------------------------------------

async def _poll_once(bot, key: str, announce: bool = True) -> None:
    """Single poll cycle for one monitor (all backends).

    Runs every backend in the default executor, announces unseen items to the
    monitor's channel (at most ``_MAX_ANNOUNCE`` per backend, plus a
    "... and N more" line) when ``announce`` is true, records their ids in the
    per-backend seen list (capped at ``_MAX_SEEN``) and persists the monitor.

    ``_errors[key]`` is reset when at least one backend succeeded and
    incremented when all of them failed.

    If the monitor is removed or replaced in ``_monitors`` while this
    coroutine is suspended, the cycle is abandoned without saving, so a
    deleted monitor is not written back to ``bot.state``.
    """
    data = _monitors.get(key)
    if data is None:
        data = _load(bot, key)
        if data is None:
            return
        _monitors[key] = data

    keyword = data["keyword"]
    now = datetime.now(timezone.utc).isoformat()
    data["last_poll"] = now

    loop = asyncio.get_running_loop()
    had_success = False

    for tag, backend in _BACKENDS.items():
        try:
            items = await loop.run_in_executor(None, backend, keyword)
        except Exception as exc:
            _log.debug("pastemoni %s/%s error: %s", key, tag, exc)
            data.setdefault("last_errors", {})[tag] = str(exc)
            continue

        # The fetch can take a while; stop if the monitor was deleted or
        # swapped out meanwhile instead of announcing on its behalf.
        if _monitors.get(key) is not data:
            return

        had_success = True
        data.setdefault("last_errors", {}).pop(tag, None)

        seen_list = list(data.get("seen", {}).get(tag, []))
        seen_set = set(seen_list)

        # Collect unseen items, also dropping duplicates within this batch
        # and entries without an id (they could never be deduplicated).
        new_items = []
        for item in items:
            item_id = item.get("id")
            if not item_id or item_id in seen_set:
                continue
            seen_set.add(item_id)
            new_items.append(item)

        if announce and new_items:
            channel = data["channel"]
            shown = new_items[:_MAX_ANNOUNCE]
            for item in shown:
                title = item.get("title") or "(untitled)"
                snippet = item.get("snippet", "")
                url = item.get("url", "")
                parts = [f"[{tag}] {title}"]
                if snippet:
                    parts.append(snippet)
                if url:
                    parts.append(url)
                await bot.send(channel, " -- ".join(parts))
            remaining = len(new_items) - len(shown)
            if remaining > 0:
                await bot.send(channel, f"[{tag}] ... and {remaining} more")

        seen_list.extend(item["id"] for item in new_items)
        if len(seen_list) > _MAX_SEEN:
            seen_list = seen_list[-_MAX_SEEN:]
        data.setdefault("seen", {})[tag] = seen_list

    # Sending is also a suspension point: re-check before touching shared
    # state so a monitor deleted mid-cycle is not resurrected.
    if _monitors.get(key) is not data:
        return

    if had_success:
        _errors[key] = 0
    else:
        _errors[key] = _errors.get(key, 0) + 1

    _save(bot, key, data)


async def _poll_loop(bot, key: str) -> None:
    """Infinite poll loop for one monitor.

    Sleeps for the monitor's interval (doubled, capped at ``_MAX_INTERVAL``,
    once ``_errors[key]`` reaches 5), then polls with announcements enabled.
    Exits when the monitor can no longer be found or the task is cancelled.
    An unexpected exception from a poll cycle is logged and the loop carries
    on, so one bad cycle does not silently end monitoring.
    """
    try:
        while True:
            data = _monitors.get(key) or _load(bot, key)
            if data is None:
                return
            interval = data.get("interval", _DEFAULT_INTERVAL)
            errs = _errors.get(key, 0)
            if errs >= 5:
                interval = min(interval * 2, _MAX_INTERVAL)
            await asyncio.sleep(interval)
            try:
                await _poll_once(bot, key, announce=True)
            except Exception:
                # CancelledError is a BaseException and is not caught here.
                _log.exception("pastemoni %s: poll cycle failed", key)
    except asyncio.CancelledError:
        pass


def _start_poller(bot, key: str) -> None:
    """Create and track a poller task unless a live one already exists."""
    existing = _pollers.get(key)
    if existing and not existing.done():
        return
    task = asyncio.create_task(_poll_loop(bot, key))
    _pollers[key] = task


def _stop_poller(key: str) -> None:
    """Cancel and remove a poller task, dropping its cached monitor and error count."""
    task = _pollers.pop(key, None)
    if task and not task.done():
        task.cancel()
    _monitors.pop(key, None)
    _errors.pop(key, 0)


# -- Restore on connect
----------------------------------------------------- + +def _restore(bot) -> None: + """Rebuild pollers from persisted state.""" + for key in bot.state.keys("pastemoni"): + existing = _pollers.get(key) + if existing and not existing.done(): + continue + data = _load(bot, key) + if data is None: + continue + _monitors[key] = data + _start_poller(bot, key) + + +@event("001") +async def on_connect(bot, message): + """Restore paste monitor pollers on connect.""" + _restore(bot) + + +# -- Command handler --------------------------------------------------------- + +@command("pastemoni", help="Paste monitor: !pastemoni add|del|list|check") +async def cmd_pastemoni(bot, message): + """Per-channel paste site keyword monitoring. + + Usage: + !pastemoni add Add monitor (admin) + !pastemoni del Remove monitor (admin) + !pastemoni list List monitors + !pastemoni check Force-poll now + """ + parts = message.text.split(None, 3) + if len(parts) < 2: + await bot.reply(message, "Usage: !pastemoni [args]") + return + + sub = parts[1].lower() + + # -- list ---------------------------------------------------------------- + if sub == "list": + if not message.is_channel: + await bot.reply(message, "Use this command in a channel") + return + channel = message.target + prefix = f"{channel}:" + monitors = [] + for key in bot.state.keys("pastemoni"): + if key.startswith(prefix): + data = _load(bot, key) + if data: + name = data["name"] + keyword = data.get("keyword", "") + errs = data.get("last_errors", {}) + entry = f"{name} ({keyword})" + if errs: + entry += f" [{len(errs)} errors]" + monitors.append(entry) + if not monitors: + await bot.reply(message, "No monitors in this channel") + return + await bot.reply(message, f"Monitors: {', '.join(monitors)}") + return + + # -- check --------------------------------------------------------------- + if sub == "check": + if not message.is_channel: + await bot.reply(message, "Use this command in a channel") + return + if len(parts) < 3: + await 
bot.reply(message, "Usage: !pastemoni check ") + return + name = parts[2].lower() + channel = message.target + key = _state_key(channel, name) + data = _load(bot, key) + if data is None: + await bot.reply(message, f"No monitor '{name}' in this channel") + return + _monitors[key] = data + await _poll_once(bot, key, announce=True) + data = _monitors.get(key, data) + errs = data.get("last_errors", {}) + if errs: + tags = ", ".join(sorted(errs)) + await bot.reply(message, f"{name}: errors on {tags}") + else: + await bot.reply(message, f"{name}: checked") + return + + # -- add (admin) --------------------------------------------------------- + if sub == "add": + if not bot._is_admin(message): + await bot.reply(message, "Permission denied: add requires admin") + return + if not message.is_channel: + await bot.reply(message, "Use this command in a channel") + return + if len(parts) < 4: + await bot.reply(message, "Usage: !pastemoni add ") + return + + name = parts[2].lower() + keyword = parts[3] + + if not _validate_name(name): + await bot.reply( + message, + "Invalid name (lowercase alphanumeric + hyphens, 1-20 chars)", + ) + return + + channel = message.target + key = _state_key(channel, name) + + if _load(bot, key) is not None: + await bot.reply( + message, f"Monitor '{name}' already exists in this channel", + ) + return + + ch_prefix = f"{channel}:" + count = sum( + 1 for k in bot.state.keys("pastemoni") if k.startswith(ch_prefix) + ) + if count >= _MAX_MONITORS: + await bot.reply(message, f"Monitor limit reached ({_MAX_MONITORS})") + return + + now = datetime.now(timezone.utc).isoformat() + data = { + "keyword": keyword, + "name": name, + "channel": channel, + "interval": _DEFAULT_INTERVAL, + "added_by": message.nick, + "added_at": now, + "last_poll": now, + "last_errors": {}, + "seen": {}, + } + _save(bot, key, data) + _monitors[key] = data + + async def _seed(): + await _poll_once(bot, key, announce=False) + _start_poller(bot, key) + + asyncio.create_task(_seed()) 
+ + await bot.reply( + message, + f"Monitor '{name}' added for: {keyword} (seeding in background)", + ) + return + + # -- del (admin) --------------------------------------------------------- + if sub == "del": + if not bot._is_admin(message): + await bot.reply(message, "Permission denied: del requires admin") + return + if not message.is_channel: + await bot.reply(message, "Use this command in a channel") + return + if len(parts) < 3: + await bot.reply(message, "Usage: !pastemoni del ") + return + + name = parts[2].lower() + channel = message.target + key = _state_key(channel, name) + + if _load(bot, key) is None: + await bot.reply(message, f"No monitor '{name}' in this channel") + return + + _stop_poller(key) + _delete(bot, key) + await bot.reply(message, f"Removed '{name}'") + return + + await bot.reply(message, "Usage: !pastemoni [args]") diff --git a/tests/test_pastemoni.py b/tests/test_pastemoni.py new file mode 100644 index 0000000..a241f11 --- /dev/null +++ b/tests/test_pastemoni.py @@ -0,0 +1,1015 @@ +"""Tests for the paste site keyword monitor plugin.""" + +import asyncio +import importlib.util +import json +import sys +from pathlib import Path +from unittest.mock import patch + +from derp.irc import Message + +# plugins/ is not a Python package -- load the module from file path +_spec = importlib.util.spec_from_file_location( + "plugins.pastemoni", + Path(__file__).resolve().parent.parent / "plugins" / "pastemoni.py", +) +_mod = importlib.util.module_from_spec(_spec) +sys.modules[_spec.name] = _mod +_spec.loader.exec_module(_mod) + +from plugins.pastemoni import ( # noqa: E402 + _MAX_SEEN, + _ArchiveParser, + _delete, + _errors, + _fetch_gists, + _fetch_pastebin, + _load, + _monitors, + _poll_once, + _pollers, + _restore, + _save, + _snippet_around, + _start_poller, + _state_key, + _stop_poller, + _truncate, + _validate_name, + cmd_pastemoni, + on_connect, +) + +# -- Fixtures ---------------------------------------------------------------- + 
+ARCHIVE_HTML = """\ + + + + + +
Leaked api_key dump10 sec ago
Random notes30 sec ago
Config file1 min ago
+""" + +RAW_PASTES = { + "EfGh5678": b"this paste has api_key = ABCDEF123456 inside", + "IjKl9012": b"nothing to see here", +} + +GISTS_RESPONSE = [ + { + "id": "gist1", + "description": "contains aws_secret_key configuration", + "html_url": "https://gist.github.com/user1/gist1", + "files": {"config.py": {}}, + }, + { + "id": "gist2", + "description": "hello world example", + "html_url": "https://gist.github.com/user2/gist2", + "files": {"hello.py": {}}, + }, + { + "id": "gist3", + "description": "utility scripts", + "html_url": "https://gist.github.com/user3/gist3", + "files": {"aws_secret_key.env": {}}, + }, +] + + +# -- Helpers ----------------------------------------------------------------- + +class _FakeResp: + """Mock HTTP response.""" + + def __init__(self, body): + self._body = body if isinstance(body, bytes) else body.encode() + + def read(self): + return self._body + + def close(self): + pass + + +class _FakeState: + """In-memory stand-in for bot.state.""" + + def __init__(self): + self._store: dict[str, dict[str, str]] = {} + + def get(self, plugin: str, key: str, default: str | None = None) -> str | None: + return self._store.get(plugin, {}).get(key, default) + + def set(self, plugin: str, key: str, value: str) -> None: + self._store.setdefault(plugin, {})[key] = value + + def delete(self, plugin: str, key: str) -> bool: + try: + del self._store[plugin][key] + return True + except KeyError: + return False + + def keys(self, plugin: str) -> list[str]: + return sorted(self._store.get(plugin, {}).keys()) + + +class _FakeRegistry: + """Minimal registry stand-in.""" + + def __init__(self): + self._modules: dict = {} + + +class _FakeBot: + """Minimal bot stand-in that captures sent/replied messages.""" + + def __init__(self, *, admin: bool = False): + self.sent: list[tuple[str, str]] = [] + self.replied: list[str] = [] + self.state = _FakeState() + self.registry = _FakeRegistry() + self._admin = admin + + async def send(self, target: str, text: str) -> None: 
+ self.sent.append((target, text)) + + async def reply(self, message, text: str) -> None: + self.replied.append(text) + + def _is_admin(self, message) -> bool: + return self._admin + + +def _msg(text: str, nick: str = "alice", target: str = "#test") -> Message: + """Create a channel PRIVMSG.""" + return Message( + raw="", prefix=f"{nick}!~{nick}@host", nick=nick, + command="PRIVMSG", params=[target, text], tags={}, + ) + + +def _pm(text: str, nick: str = "alice") -> Message: + """Create a private PRIVMSG.""" + return Message( + raw="", prefix=f"{nick}!~{nick}@host", nick=nick, + command="PRIVMSG", params=["botname", text], tags={}, + ) + + +def _clear() -> None: + """Reset module-level state between tests.""" + for task in _pollers.values(): + if task and not task.done(): + task.cancel() + _pollers.clear() + _monitors.clear() + _errors.clear() + + +def _fake_pb(keyword): + """Fake Pastebin backend returning two results.""" + return [ + {"id": "pb1", "title": "Test paste", + "url": "https://pastebin.com/pb1", "snippet": "...test keyword..."}, + {"id": "pb2", "title": "Another paste", + "url": "https://pastebin.com/pb2", "snippet": "...test data..."}, + ] + + +def _fake_gh(keyword): + """Fake Gists backend returning one result.""" + return [ + {"id": "gh1", "title": "Test gist", + "url": "https://gist.github.com/gh1", "snippet": "...test content..."}, + ] + + +def _fake_pb_error(keyword): + """Fake Pastebin backend that raises.""" + raise ConnectionError("Pastebin down") + + +def _fake_gh_error(keyword): + """Fake Gists backend that raises.""" + raise ConnectionError("GitHub down") + + +_FAKE_BACKENDS = {"pb": _fake_pb, "gh": _fake_gh} + + +def _mock_pb_urlopen(req, **kw): + """Mock urlopen for Pastebin tests: archive HTML + raw pastes.""" + url = req.full_url if hasattr(req, "full_url") else str(req) + if "pastebin.com/archive" in url: + return _FakeResp(ARCHIVE_HTML) + if "pastebin.com/raw/" in url: + paste_id = url.rsplit("/", 1)[-1] + body = 
RAW_PASTES.get(paste_id, b"no match here") + return _FakeResp(body) + raise ValueError(f"unexpected URL: {url}") + + +# --------------------------------------------------------------------------- +# TestArchiveParser +# --------------------------------------------------------------------------- + +class TestArchiveParser: + def test_extracts_paste_links(self): + parser = _ArchiveParser() + parser.feed(ARCHIVE_HTML) + assert len(parser.links) == 3 + assert parser.links[0] == ("AbCd1234", "Leaked api_key dump") + assert parser.links[1] == ("EfGh5678", "Random notes") + assert parser.links[2] == ("IjKl9012", "Config file") + + def test_ignores_non_paste_links(self): + html = 'ArchiveAbout' + parser = _ArchiveParser() + parser.feed(html) + assert parser.links == [] + + def test_empty_html(self): + parser = _ArchiveParser() + parser.feed("") + assert parser.links == [] + + +# --------------------------------------------------------------------------- +# TestFetchPastebin +# --------------------------------------------------------------------------- + +class TestFetchPastebin: + def test_keyword_in_title(self): + with patch.object(_mod, "_urlopen", side_effect=_mock_pb_urlopen): + results = _fetch_pastebin("api_key") + # AbCd1234 matches by title, EfGh5678 matches by content + ids = [r["id"] for r in results] + assert "AbCd1234" in ids + # Title match should have empty snippet + title_match = next(r for r in results if r["id"] == "AbCd1234") + assert title_match["snippet"] == "" + assert "pastebin.com/AbCd1234" in title_match["url"] + + def test_keyword_in_content(self): + with patch.object(_mod, "_urlopen", side_effect=_mock_pb_urlopen): + results = _fetch_pastebin("api_key") + content_match = next(r for r in results if r["id"] == "EfGh5678") + assert "api_key" in content_match["snippet"].lower() + assert "pastebin.com/EfGh5678" in content_match["url"] + + def test_no_match(self): + with patch.object(_mod, "_urlopen", side_effect=_mock_pb_urlopen): + results = 
_fetch_pastebin("nonexistent_keyword_xyz") + assert results == [] + + def test_network_error(self): + import pytest + with patch.object(_mod, "_urlopen", side_effect=ConnectionError("fail")): + with pytest.raises(ConnectionError): + _fetch_pastebin("test") + + +# --------------------------------------------------------------------------- +# TestFetchGists +# --------------------------------------------------------------------------- + +class TestFetchGists: + def test_keyword_in_description(self): + with patch.object( + _mod, "_urlopen", + return_value=_FakeResp(json.dumps(GISTS_RESPONSE).encode()), + ): + results = _fetch_gists("aws_secret_key") + ids = [r["id"] for r in results] + assert "gist1" in ids + match = next(r for r in results if r["id"] == "gist1") + assert "aws_secret_key" in match["title"].lower() + + def test_keyword_in_filename(self): + with patch.object( + _mod, "_urlopen", + return_value=_FakeResp(json.dumps(GISTS_RESPONSE).encode()), + ): + results = _fetch_gists("aws_secret_key") + ids = [r["id"] for r in results] + assert "gist3" in ids + + def test_no_match(self): + with patch.object( + _mod, "_urlopen", + return_value=_FakeResp(json.dumps(GISTS_RESPONSE).encode()), + ): + results = _fetch_gists("nonexistent_keyword_xyz") + assert results == [] + + def test_api_error(self): + import pytest + with patch.object(_mod, "_urlopen", side_effect=ConnectionError("fail")): + with pytest.raises(ConnectionError): + _fetch_gists("test") + + def test_non_list_response(self): + """API returning non-list JSON returns empty results.""" + with patch.object( + _mod, "_urlopen", + return_value=_FakeResp(json.dumps({"error": "rate limited"}).encode()), + ): + results = _fetch_gists("test") + assert results == [] + + +# --------------------------------------------------------------------------- +# TestSnippetAround +# --------------------------------------------------------------------------- + +class TestSnippetAround: + def test_short_text_returned_as_is(self): 
+ assert _snippet_around("hello world", "hello") == "hello world" + + def test_long_text_shows_context(self): + text = "x" * 50 + "KEYWORD" + "y" * 50 + result = _snippet_around(text, "KEYWORD", max_len=40) + assert "KEYWORD" in result + assert "..." in result + + def test_empty_text(self): + assert _snippet_around("", "test") == "" + + def test_keyword_not_found(self): + result = _snippet_around("a" * 100, "missing", max_len=40) + assert result.endswith("...") + assert len(result) <= 43 # 40 + "..." + + +# --------------------------------------------------------------------------- +# TestValidateName +# --------------------------------------------------------------------------- + +class TestValidateName: + def test_valid(self): + assert _validate_name("leak-watch") is True + + def test_valid_numbers(self): + assert _validate_name("test123") is True + + def test_invalid_uppercase(self): + assert _validate_name("LeakWatch") is False + + def test_invalid_too_long(self): + assert _validate_name("a" * 21) is False + + def test_invalid_starts_with_hyphen(self): + assert _validate_name("-bad") is False + + def test_invalid_empty(self): + assert _validate_name("") is False + + +# --------------------------------------------------------------------------- +# TestTruncate +# --------------------------------------------------------------------------- + +class TestTruncate: + def test_short_unchanged(self): + assert _truncate("hello", 60) == "hello" + + def test_exact_length(self): + text = "a" * 60 + assert _truncate(text, 60) == text + + def test_long_truncated(self): + text = "a" * 80 + result = _truncate(text, 60) + assert len(result) == 60 + assert result.endswith("...") + + +# --------------------------------------------------------------------------- +# TestStateHelpers +# --------------------------------------------------------------------------- + +class TestStateHelpers: + def test_save_and_load(self): + bot = _FakeBot() + data = {"keyword": "test", "name": "t"} + 
_save(bot, "#ch:t", data) + loaded = _load(bot, "#ch:t") + assert loaded == data + + def test_load_missing(self): + bot = _FakeBot() + assert _load(bot, "nonexistent") is None + + def test_delete(self): + bot = _FakeBot() + _save(bot, "#ch:t", {"name": "t"}) + _delete(bot, "#ch:t") + assert _load(bot, "#ch:t") is None + + def test_state_key(self): + assert _state_key("#ops", "leak-watch") == "#ops:leak-watch" + + def test_load_invalid_json(self): + bot = _FakeBot() + bot.state.set("pastemoni", "bad", "not json{{{") + assert _load(bot, "bad") is None + + +# --------------------------------------------------------------------------- +# TestPollOnce +# --------------------------------------------------------------------------- + +class TestPollOnce: + def test_new_items_announced(self): + _clear() + bot = _FakeBot() + data = { + "keyword": "test", "name": "poll", "channel": "#test", + "interval": 300, "seen": {"pb": [], "gh": []}, + "last_poll": "", "last_errors": {}, + } + key = "#test:poll" + _save(bot, key, data) + _monitors[key] = data + + async def inner(): + with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS): + await _poll_once(bot, key, announce=True) + msgs = [s for t, s in bot.sent if t == "#test"] + pb_msgs = [m for m in msgs if "[pb]" in m] + gh_msgs = [m for m in msgs if "[gh]" in m] + assert len(pb_msgs) == 2 + assert len(gh_msgs) == 1 + + asyncio.run(inner()) + + def test_seen_items_deduped(self): + _clear() + bot = _FakeBot() + data = { + "keyword": "test", "name": "dedup", "channel": "#test", + "interval": 300, + "seen": {"pb": ["pb1", "pb2"], "gh": ["gh1"]}, + "last_poll": "", "last_errors": {}, + } + key = "#test:dedup" + _save(bot, key, data) + _monitors[key] = data + + async def inner(): + with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS): + await _poll_once(bot, key, announce=True) + assert len(bot.sent) == 0 + + asyncio.run(inner()) + + def test_error_increments_counter(self): + """All backends failing increments the error counter.""" + 
_clear() + bot = _FakeBot() + data = { + "keyword": "test", "name": "errs", "channel": "#test", + "interval": 300, "seen": {"pb": [], "gh": []}, + "last_poll": "", "last_errors": {}, + } + key = "#test:errs" + _save(bot, key, data) + _monitors[key] = data + all_fail = {"pb": _fake_pb_error, "gh": _fake_gh_error} + + async def inner(): + with patch.object(_mod, "_BACKENDS", all_fail): + await _poll_once(bot, key, announce=True) + assert _errors[key] == 1 + assert len(bot.sent) == 0 + + asyncio.run(inner()) + + def test_partial_failure_resets_counter(self): + """One backend succeeding resets the error counter.""" + _clear() + bot = _FakeBot() + data = { + "keyword": "test", "name": "partial", "channel": "#test", + "interval": 300, "seen": {"pb": [], "gh": []}, + "last_poll": "", "last_errors": {}, + } + key = "#test:partial" + _save(bot, key, data) + _monitors[key] = data + _errors[key] = 3 + partial_fail = {"pb": _fake_pb_error, "gh": _fake_gh} + + async def inner(): + with patch.object(_mod, "_BACKENDS", partial_fail): + await _poll_once(bot, key, announce=True) + assert _errors[key] == 0 + gh_msgs = [s for t, s in bot.sent if t == "#test" and "[gh]" in s] + assert len(gh_msgs) == 1 + + asyncio.run(inner()) + + def test_max_announce_cap(self): + """Only MAX_ANNOUNCE items announced per backend.""" + _clear() + bot = _FakeBot() + + def _fake_many(keyword): + return [ + {"id": f"p{i}", "title": f"Paste {i}", + "url": f"https://example.com/{i}", "snippet": ""} + for i in range(10) + ] + + data = { + "keyword": "test", "name": "cap", "channel": "#test", + "interval": 300, "seen": {"pb": [], "gh": []}, + "last_poll": "", "last_errors": {}, + } + key = "#test:cap" + _save(bot, key, data) + _monitors[key] = data + + async def inner(): + with patch.object(_mod, "_BACKENDS", {"pb": _fake_many}): + await _poll_once(bot, key, announce=True) + msgs = [s for t, s in bot.sent if t == "#test"] + # 5 items + 1 "... 
and 5 more" + assert len(msgs) == 6 + assert "5 more" in msgs[-1] + + asyncio.run(inner()) + + def test_no_announce_flag(self): + _clear() + bot = _FakeBot() + data = { + "keyword": "test", "name": "quiet", "channel": "#test", + "interval": 300, "seen": {"pb": [], "gh": []}, + "last_poll": "", "last_errors": {}, + } + key = "#test:quiet" + _save(bot, key, data) + _monitors[key] = data + + async def inner(): + with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS): + await _poll_once(bot, key, announce=False) + assert len(bot.sent) == 0 + updated = _load(bot, key) + assert len(updated["seen"]["pb"]) == 2 + assert len(updated["seen"]["gh"]) == 1 + + asyncio.run(inner()) + + def test_seen_cap(self): + """Seen list capped at MAX_SEEN per backend.""" + _clear() + bot = _FakeBot() + + def _fake_many(keyword): + return [ + {"id": f"v{i}", "title": f"V{i}", "url": "", "snippet": ""} + for i in range(250) + ] + + data = { + "keyword": "test", "name": "seencap", "channel": "#test", + "interval": 300, "seen": {"pb": []}, + "last_poll": "", "last_errors": {}, + } + key = "#test:seencap" + _save(bot, key, data) + _monitors[key] = data + + async def inner(): + with patch.object(_mod, "_BACKENDS", {"pb": _fake_many}): + await _poll_once(bot, key, announce=False) + updated = _load(bot, key) + assert len(updated["seen"]["pb"]) == _MAX_SEEN + assert updated["seen"]["pb"][0] == "v50" + + asyncio.run(inner()) + + +# --------------------------------------------------------------------------- +# TestCommands +# --------------------------------------------------------------------------- + +class TestCmdAdd: + def test_add_success(self): + _clear() + bot = _FakeBot(admin=True) + + async def inner(): + with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS): + await cmd_pastemoni(bot, _msg("!pastemoni add leak-watch api_key")) + await asyncio.sleep(0.2) + assert len(bot.replied) == 1 + assert "Monitor 'leak-watch' added" in bot.replied[0] + assert "api_key" in bot.replied[0] + data = _load(bot, 
"#test:leak-watch") + assert data is not None + assert data["keyword"] == "api_key" + assert data["channel"] == "#test" + assert len(data["seen"]["pb"]) == 2 + assert len(data["seen"]["gh"]) == 1 + assert "#test:leak-watch" in _pollers + _stop_poller("#test:leak-watch") + await asyncio.sleep(0) + + asyncio.run(inner()) + + def test_add_requires_admin(self): + _clear() + bot = _FakeBot(admin=False) + asyncio.run(cmd_pastemoni(bot, _msg("!pastemoni add test keyword"))) + assert "Permission denied" in bot.replied[0] + + def test_add_requires_channel(self): + _clear() + bot = _FakeBot(admin=True) + asyncio.run(cmd_pastemoni(bot, _pm("!pastemoni add test keyword"))) + assert "Use this command in a channel" in bot.replied[0] + + def test_add_invalid_name(self): + _clear() + bot = _FakeBot(admin=True) + asyncio.run(cmd_pastemoni(bot, _msg("!pastemoni add BAD! keyword"))) + assert "Invalid name" in bot.replied[0] + + def test_add_missing_keyword(self): + _clear() + bot = _FakeBot(admin=True) + asyncio.run(cmd_pastemoni(bot, _msg("!pastemoni add myname"))) + assert "Usage:" in bot.replied[0] + + def test_add_duplicate(self): + _clear() + bot = _FakeBot(admin=True) + + async def inner(): + with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS): + await cmd_pastemoni(bot, _msg("!pastemoni add dupe keyword")) + await asyncio.sleep(0.1) + bot.replied.clear() + with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS): + await cmd_pastemoni(bot, _msg("!pastemoni add dupe other")) + assert "already exists" in bot.replied[0] + _stop_poller("#test:dupe") + await asyncio.sleep(0) + + asyncio.run(inner()) + + def test_add_limit(self): + _clear() + bot = _FakeBot(admin=True) + for i in range(20): + _save(bot, f"#test:mon{i}", {"name": f"mon{i}", "channel": "#test"}) + + async def inner(): + with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS): + await cmd_pastemoni(bot, _msg("!pastemoni add overflow keyword")) + assert "limit reached" in bot.replied[0] + + asyncio.run(inner()) + + +class 
TestCmdDel: + def test_del_success(self): + _clear() + bot = _FakeBot(admin=True) + + async def inner(): + with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS): + await cmd_pastemoni(bot, _msg("!pastemoni add todel keyword")) + await asyncio.sleep(0.1) + bot.replied.clear() + await cmd_pastemoni(bot, _msg("!pastemoni del todel")) + assert "Removed 'todel'" in bot.replied[0] + assert _load(bot, "#test:todel") is None + assert "#test:todel" not in _pollers + await asyncio.sleep(0) + + asyncio.run(inner()) + + def test_del_requires_admin(self): + _clear() + bot = _FakeBot(admin=False) + asyncio.run(cmd_pastemoni(bot, _msg("!pastemoni del test"))) + assert "Permission denied" in bot.replied[0] + + def test_del_nonexistent(self): + _clear() + bot = _FakeBot(admin=True) + asyncio.run(cmd_pastemoni(bot, _msg("!pastemoni del nosuch"))) + assert "No monitor" in bot.replied[0] + + def test_del_no_name(self): + _clear() + bot = _FakeBot(admin=True) + asyncio.run(cmd_pastemoni(bot, _msg("!pastemoni del"))) + assert "Usage:" in bot.replied[0] + + +class TestCmdList: + def test_list_empty(self): + _clear() + bot = _FakeBot() + asyncio.run(cmd_pastemoni(bot, _msg("!pastemoni list"))) + assert "No monitors" in bot.replied[0] + + def test_list_populated(self): + _clear() + bot = _FakeBot() + _save(bot, "#test:leaks", { + "name": "leaks", "channel": "#test", "keyword": "api_key", + "last_errors": {}, + }) + _save(bot, "#test:creds", { + "name": "creds", "channel": "#test", "keyword": "password", + "last_errors": {}, + }) + asyncio.run(cmd_pastemoni(bot, _msg("!pastemoni list"))) + assert "Monitors:" in bot.replied[0] + assert "leaks" in bot.replied[0] + assert "creds" in bot.replied[0] + + def test_list_shows_errors(self): + _clear() + bot = _FakeBot() + _save(bot, "#test:broken", { + "name": "broken", "channel": "#test", "keyword": "test", + "last_errors": {"pb": "Connection refused"}, + }) + asyncio.run(cmd_pastemoni(bot, _msg("!pastemoni list"))) + assert "broken" in 
bot.replied[0] + assert "1 errors" in bot.replied[0] + + def test_list_requires_channel(self): + _clear() + bot = _FakeBot() + asyncio.run(cmd_pastemoni(bot, _pm("!pastemoni list"))) + assert "Use this command in a channel" in bot.replied[0] + + def test_list_channel_isolation(self): + _clear() + bot = _FakeBot() + _save(bot, "#test:mine", { + "name": "mine", "channel": "#test", "keyword": "test", + "last_errors": {}, + }) + _save(bot, "#other:theirs", { + "name": "theirs", "channel": "#other", "keyword": "test", + "last_errors": {}, + }) + asyncio.run(cmd_pastemoni(bot, _msg("!pastemoni list"))) + assert "mine" in bot.replied[0] + assert "theirs" not in bot.replied[0] + + +class TestCmdCheck: + def test_check_success(self): + _clear() + bot = _FakeBot() + data = { + "keyword": "test", "name": "chk", "channel": "#test", + "interval": 300, "seen": {"pb": ["pb1", "pb2"], "gh": ["gh1"]}, + "last_poll": "", "last_errors": {}, + } + _save(bot, "#test:chk", data) + + async def inner(): + with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS): + await cmd_pastemoni(bot, _msg("!pastemoni check chk")) + assert "chk: checked" in bot.replied[0] + + asyncio.run(inner()) + + def test_check_nonexistent(self): + _clear() + bot = _FakeBot() + asyncio.run(cmd_pastemoni(bot, _msg("!pastemoni check nope"))) + assert "No monitor" in bot.replied[0] + + def test_check_requires_channel(self): + _clear() + bot = _FakeBot() + asyncio.run(cmd_pastemoni(bot, _pm("!pastemoni check test"))) + assert "Use this command in a channel" in bot.replied[0] + + def test_check_shows_error(self): + _clear() + bot = _FakeBot() + data = { + "keyword": "test", "name": "errchk", "channel": "#test", + "interval": 300, "seen": {"pb": [], "gh": []}, + "last_poll": "", "last_errors": {}, + } + _save(bot, "#test:errchk", data) + all_fail = {"pb": _fake_pb_error, "gh": _fake_gh_error} + + async def inner(): + with patch.object(_mod, "_BACKENDS", all_fail): + await cmd_pastemoni(bot, _msg("!pastemoni check errchk")) 
+ assert "error" in bot.replied[0].lower() + + asyncio.run(inner()) + + def test_check_announces_new_items(self): + _clear() + bot = _FakeBot() + data = { + "keyword": "test", "name": "news", "channel": "#test", + "interval": 300, "seen": {"pb": ["pb1"], "gh": []}, + "last_poll": "", "last_errors": {}, + } + _save(bot, "#test:news", data) + + async def inner(): + with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS): + await cmd_pastemoni(bot, _msg("!pastemoni check news")) + msgs = [s for t, s in bot.sent if t == "#test"] + pb_msgs = [m for m in msgs if "[pb]" in m] + gh_msgs = [m for m in msgs if "[gh]" in m] + assert len(pb_msgs) == 1 # pb2 only (pb1 seen) + assert len(gh_msgs) == 1 # gh1 new + + asyncio.run(inner()) + + def test_check_no_name(self): + _clear() + bot = _FakeBot() + asyncio.run(cmd_pastemoni(bot, _msg("!pastemoni check"))) + assert "Usage:" in bot.replied[0] + + +class TestCmdUsage: + def test_no_args(self): + _clear() + bot = _FakeBot() + asyncio.run(cmd_pastemoni(bot, _msg("!pastemoni"))) + assert "Usage:" in bot.replied[0] + + def test_unknown_subcommand(self): + _clear() + bot = _FakeBot() + asyncio.run(cmd_pastemoni(bot, _msg("!pastemoni foobar"))) + assert "Usage:" in bot.replied[0] + + +# --------------------------------------------------------------------------- +# TestRestore +# --------------------------------------------------------------------------- + +class TestRestore: + def test_pollers_rebuilt_from_state(self): + _clear() + bot = _FakeBot() + data = { + "keyword": "test", "name": "restored", "channel": "#test", + "interval": 300, "seen": {"pb": [], "gh": []}, + "last_poll": "", "last_errors": {}, + } + _save(bot, "#test:restored", data) + + async def inner(): + _restore(bot) + assert "#test:restored" in _pollers + assert not _pollers["#test:restored"].done() + _stop_poller("#test:restored") + await asyncio.sleep(0) + + asyncio.run(inner()) + + def test_restore_skips_active(self): + _clear() + bot = _FakeBot() + data = { + 
"keyword": "test", "name": "active", "channel": "#test", + "interval": 300, "seen": {"pb": [], "gh": []}, + "last_poll": "", "last_errors": {}, + } + _save(bot, "#test:active", data) + + async def inner(): + dummy = asyncio.create_task(asyncio.sleep(9999)) + _pollers["#test:active"] = dummy + _restore(bot) + assert _pollers["#test:active"] is dummy + dummy.cancel() + await asyncio.sleep(0) + + asyncio.run(inner()) + + def test_restore_replaces_done_task(self): + _clear() + bot = _FakeBot() + data = { + "keyword": "test", "name": "done", "channel": "#test", + "interval": 300, "seen": {"pb": [], "gh": []}, + "last_poll": "", "last_errors": {}, + } + _save(bot, "#test:done", data) + + async def inner(): + done_task = asyncio.create_task(asyncio.sleep(0)) + await done_task + _pollers["#test:done"] = done_task + _restore(bot) + new_task = _pollers["#test:done"] + assert new_task is not done_task + assert not new_task.done() + _stop_poller("#test:done") + await asyncio.sleep(0) + + asyncio.run(inner()) + + def test_restore_skips_bad_json(self): + _clear() + bot = _FakeBot() + bot.state.set("pastemoni", "#test:bad", "not json{{{") + + async def inner(): + _restore(bot) + assert "#test:bad" not in _pollers + + asyncio.run(inner()) + + def test_on_connect_calls_restore(self): + _clear() + bot = _FakeBot() + data = { + "keyword": "test", "name": "conn", "channel": "#test", + "interval": 300, "seen": {"pb": [], "gh": []}, + "last_poll": "", "last_errors": {}, + } + _save(bot, "#test:conn", data) + + async def inner(): + msg = _msg("", target="botname") + await on_connect(bot, msg) + assert "#test:conn" in _pollers + _stop_poller("#test:conn") + await asyncio.sleep(0) + + asyncio.run(inner()) + + +# --------------------------------------------------------------------------- +# TestPollerManagement +# --------------------------------------------------------------------------- + +class TestPollerManagement: + def test_start_and_stop(self): + _clear() + bot = _FakeBot() + data = 
{ + "keyword": "test", "name": "mgmt", "channel": "#test", + "interval": 300, "seen": {"pb": [], "gh": []}, + "last_poll": "", "last_errors": {}, + } + key = "#test:mgmt" + _save(bot, key, data) + _monitors[key] = data + + async def inner(): + _start_poller(bot, key) + assert key in _pollers + assert not _pollers[key].done() + _stop_poller(key) + await asyncio.sleep(0) + assert key not in _pollers + assert key not in _monitors + + asyncio.run(inner()) + + def test_start_idempotent(self): + _clear() + bot = _FakeBot() + data = { + "keyword": "test", "name": "idem", "channel": "#test", + "interval": 300, "seen": {"pb": [], "gh": []}, + "last_poll": "", "last_errors": {}, + } + key = "#test:idem" + _save(bot, key, data) + _monitors[key] = data + + async def inner(): + _start_poller(bot, key) + first = _pollers[key] + _start_poller(bot, key) + assert _pollers[key] is first + _stop_poller(key) + await asyncio.sleep(0) + + asyncio.run(inner()) + + def test_stop_nonexistent(self): + _clear() + _stop_poller("#test:nonexistent")