Compare commits

10 Commits

Author SHA1 Message Date
user
c3b19feb0f feat: add paste site keyword monitor plugin
Poll Pastebin archive and GitHub Gists for keyword matches,
announce hits to subscribed IRC channels. Follows rss.py
polling/subscription pattern with state persistence.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 09:01:46 +01:00
user
1836fa50af feat: paste overflow via FlaskPaste for long replies
Add Bot.long_reply() that sends lines directly when under threshold,
or creates a FlaskPaste paste with preview + link when over. Refactor
abuseipdb, alert history, crtsh, dork, exploitdb, and subdomain
plugins to use long_reply(). Configurable paste_threshold (default: 4).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 22:07:31 +01:00
user
8cabe0f8e8 feat: add URL title preview plugin
Event-driven plugin that auto-fetches page titles for URLs posted in
channel messages. HEAD-then-GET via SOCKS5 pool, og:title priority,
cooldown dedup, !-suppression, binary/host filtering. 52 tests.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 21:57:00 +01:00
user
7606280358 fix: repair broken tests across alert, chanmgmt, and integration
- test_alert: remove stale _MAX_ANNOUNCE import/test, update _errors
  assertions for per-backend dict, fix announcement checks (action vs
  send), mock _fetch_og_batch in seeding tests, fix YouTube/SearX mock
  targets (urllib.request.urlopen), include keyword in fake data titles
- test_chanmgmt: add _FakeState to _FakeBot (on_invite now persists)
- test_integration: update help assertion for new output format

696 tests pass, 0 failures.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 21:14:44 +01:00
user
94f563d55a feat: connection pooling via urllib3 + batch OG fetching
Replace per-request SOCKS5+TLS handshakes with urllib3 SOCKSProxyManager
connection pool (20 pools, 4 conns/host). Batch _fetch_og calls via
ThreadPoolExecutor to parallelize OG tag enrichment in alert polling.
Cache flaskpaste SSL context at module level.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 20:52:22 +01:00
user
e11994f320 docs: update for v1.2.1 performance changes
- USAGE.md: alert output format, background seeding, per-backend errors,
  concurrent fetches
- CHEATSHEET.md: updated alert section
- DEBUG.md: added profiling section (cProfile + tracemalloc)
- ROADMAP.md: added v1.2.1 milestone

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 18:09:53 +01:00
user
a2a607baa2 fix: write tracemalloc dump to file instead of logger
Podman's log buffer truncates the output. Write full traceback dump
to data/derp.malloc with per-allocation stack traces.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 13:16:22 +01:00
user
404800af94 docs: update TASKS.md with v1.2.1 performance work
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 12:47:43 +01:00
user
694c775782 fix: remove title truncation from alert backend builders
Mastodon, Bluesky, GitHub, GitLab, npm, PyPI, and arXiv backends
no longer truncate content/descriptions in titles. Full text is
shown on the PRIVMSG line; only !alert history keeps truncation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 12:44:48 +01:00
user
9672e325c2 fix: show full alert titles, split metadata into ACTION line
ACTION carries the tag/date/URL, PRIVMSG carries the uncropped title.
Removes _truncate on alert output for better readability.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 12:22:23 +01:00
28 changed files with 3158 additions and 185 deletions

View File

@@ -2,7 +2,8 @@ FROM python:3.13-alpine
WORKDIR /app
RUN pip install --no-cache-dir maxminddb>=2.0 PySocks>=1.7.1
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
ENV PYTHONPATH=/app/src
ENV PYTHONUNBUFFERED=1

View File

@@ -97,6 +97,17 @@
- [x] Graceful SIGTERM shutdown
- [x] InnerTube-based YouTube channel resolution for video URLs
## v1.2.1 -- Performance + Polish (done)
- [x] HTTP opener caching at module level (eliminates per-request construction)
- [x] `--tracemalloc` CLI flag for memory profiling (dumps to `data/derp.malloc`)
- [x] Background seeding on `!alert add` (instant reply, seeds asynchronously)
- [x] Per-backend error tracking with exponential backoff
- [x] Concurrent fetches for multi-instance backends (PeerTube, Mastodon, Lemmy, SearXNG)
- [x] `retries` parameter for `derp.http.urlopen`
- [x] Alert output: ACTION line (metadata/URL) + PRIVMSG (full uncropped title)
- [x] tracemalloc writes to file instead of logger (survives podman log buffer)
## v2.0.0 -- Multi-Server + Integrations
- [ ] Multi-server support (per-server config, shared plugins)

View File

@@ -1,27 +1,72 @@
# derp - Tasks
## Current Sprint -- v1.2.0 Subscriptions + Proxy (2026-02-16)
## Current Sprint -- v1.2.5 Paste Site Keyword Monitor (2026-02-18)
| Pri | Status | Task |
|-----|--------|------|
| P0 | [x] | `rss` plugin (RSS/Atom feed subscriptions) |
| P0 | [x] | `yt` plugin (YouTube channel follow via Atom feeds) |
| P0 | [x] | `twitch` plugin (livestream notifications via GQL) |
| P0 | [x] | `alert` plugin (keyword alerts, 27 backends) |
| P0 | [x] | SOCKS5 proxy transport layer (HTTP, TCP, async) |
| P1 | [x] | `searx` plugin (SearXNG web search) |
| P1 | [x] | `tdns` plugin (TCP DNS via SOCKS5) |
| P1 | [x] | `remind` plugin (one-shot, repeating, calendar) |
| P1 | [x] | Alert history (SQLite) with short IDs + `!alert info` |
| P1 | [x] | OG tag fetching for keyword match + date enrichment |
| P1 | [x] | InnerTube channel resolution for video URLs |
| P2 | [x] | Invite auto-join with persistence |
| P2 | [x] | Graceful SIGTERM shutdown |
| P0 | [x] | Pastemoni plugin (`plugins/pastemoni.py`) |
| P0 | [x] | Pastebin archive scraping + raw content matching |
| P0 | [x] | GitHub Gists API keyword filtering |
| P1 | [x] | Polling/subscription architecture (rss.py pattern) |
| P1 | [x] | State persistence + restore on connect |
| P1 | [x] | Command handler: add/del/list/check |
| P2 | [x] | Tests for pastemoni (15 test classes, ~45 cases) |
| P2 | [x] | Documentation update (USAGE.md) |
## Previous Sprint -- v1.2.4 URL Title Preview (2026-02-17)
| Pri | Status | Task |
|-----|--------|------|
| P0 | [x] | URL title preview plugin (`plugins/urltitle.py`) |
| P0 | [x] | HEAD-then-GET fetch via SOCKS5 connection pool |
| P1 | [x] | `_TitleParser`: og:title/description + `<title>` fallback |
| P1 | [x] | URL extraction with `!`-suppression and balanced parens |
| P1 | [x] | Dedup/cooldown (5 min, 500 entry cache) |
| P1 | [x] | Skip non-HTML, binary extensions, FlaskPaste host |
| P2 | [x] | Tests for urltitle (11 test classes, ~40 cases) |
| P2 | [x] | Documentation update (USAGE.md) |
## Previous Sprint -- v1.2.3 Paste Overflow (2026-02-17)
| Pri | Status | Task |
|-----|--------|------|
| P0 | [x] | `Bot.long_reply()` method with FlaskPaste overflow |
| P0 | [x] | Configurable `paste_threshold` (default: 4) |
| P1 | [x] | Refactor alert history to use `long_reply()` |
| P1 | [x] | Refactor exploitdb search/cve to use `long_reply()` |
| P1 | [x] | Refactor subdomain, crtsh, abuseipdb, dork to use `long_reply()` |
| P2 | [x] | Tests for paste overflow (10 cases) |
## Previous Sprint -- v1.2.2 Connection Pooling + Batch OG (2026-02-17)
| Pri | Status | Task |
|-----|--------|------|
| P0 | [x] | Batch `_fetch_og` calls via ThreadPoolExecutor (alert.py) |
| P0 | [x] | Connection pooling via `urllib3[socks]` SOCKSProxyManager (http.py) |
| P1 | [x] | Cache FlaskPaste `_ssl_context()` at module level |
| P1 | [x] | Backward-compat `urllib.error.HTTPError` for 4xx/5xx in pooled path |
| P1 | [x] | Legacy opener fallback for `context=` callers (username.py) |
| P2 | [x] | Containerfile uses requirements.txt for deps |
## Previous Sprint -- v1.2.1 Performance + Polish (2026-02-17)
| Pri | Status | Task |
|-----|--------|------|
| P1 | [x] | Cache default HTTP opener at module level |
| P1 | [x] | `--tracemalloc` CLI flag for memory profiling |
| P1 | [x] | Background seeding on `!alert add` (instant reply) |
| P1 | [x] | Per-backend error tracking with exponential backoff |
| P1 | [x] | Concurrent fetches for multi-instance backends (PeerTube, Mastodon, Lemmy, SearXNG) |
| P1 | [x] | `retries` parameter for `derp.http.urlopen` |
| P2 | [x] | Full alert titles (ACTION metadata + PRIVMSG content) |
| P2 | [x] | Remove title truncation from backend builders |
## Completed
| Date | Task |
|------|------|
| 2026-02-17 | v1.2.3 (paste overflow with FlaskPaste integration) |
| 2026-02-17 | v1.2.1 (HTTP opener cache, alert perf, concurrent multi-instance, tracemalloc) |
| 2026-02-16 | v1.2.0 (subscriptions, alerts, proxy, reminders) |
| 2026-02-15 | Calendar-based reminders (at/yearly) with persistence |
| 2026-02-15 | v1.1.0 (channel filter, JSON logging, dork, wayback, tests) |

View File

@@ -384,9 +384,12 @@ Archive.org (ia), Hacker News (hn), GitHub (gh), Wikipedia (wp),
Stack Exchange (se), GitLab (gl), npm (nm), PyPI (pp), Docker Hub (dh),
arXiv (ax), Lobsters (lb), DEV.to (dv), Medium (md), Hugging Face (hf).
Names: lowercase alphanumeric + hyphens, 1-20 chars. Keywords: 1-100 chars.
Max 20 alerts/channel. Polls every 5min. Format: `[name/yt/a8k2m] Title -- URL`.
Use `!alert info <id>` to see full details. No API credentials needed. Persists
across restarts. History stored in `data/alert_history.db`.
Max 20 alerts/channel. Polls every 5min. Output: ACTION with `[name/tag/id] date - URL`,
then PRIVMSG with full title. `add` replies instantly (seeds in background).
Per-backend error tracking (5+ errors back off that backend only).
Multi-instance backends (pt, ft, ly, sx) fetch concurrently.
Use `!alert info <id>` for details. Persists across restarts.
History in `data/alert_history.db`.
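
The per-backend backoff mentioned above could look roughly like the sketch below. It is an illustration only, not the plugin's code: the names `cooldown_for` and `should_poll`, the 300-second base, and the one-hour cap are assumptions; only the threshold of 5 errors comes from the text.

```python
ERROR_THRESHOLD = 5   # from the docs: backoff starts at 5+ errors
BASE_COOLDOWN = 300   # assumption: one poll interval, in seconds
MAX_COOLDOWN = 3600   # assumption: cap the cooldown at one hour


def cooldown_for(errors: int) -> int:
    """Seconds to skip a backend after `errors` consecutive failures."""
    if errors < ERROR_THRESHOLD:
        return 0
    doublings = errors - ERROR_THRESHOLD
    # Double the cooldown for each error past the threshold, up to the cap.
    return min(BASE_COOLDOWN * 2 ** doublings, MAX_COOLDOWN)


def should_poll(
    errors: dict[str, int],
    last_try: dict[str, float],
    tag: str,
    now: float,
) -> bool:
    """Decide per backend tag; one failing backend never blocks the others."""
    wait = cooldown_for(errors.get(tag, 0))
    return now - last_try.get(tag, 0.0) >= wait
```

Because the error count is keyed by backend tag, a failing `yt` backend is skipped while `gh` keeps polling on its normal schedule.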
## SearX

View File

@@ -97,6 +97,41 @@ ERROR derp.plugin failed to load plugin: plugins/broken.py
- In container: resolver is typically `127.0.0.11` (Podman DNS)
- Fallback: `8.8.8.8` if no resolver found in `/etc/resolv.conf`
## Profiling
### CPU (cProfile)
```bash
derp --cprofile # Dump to derp.prof on shutdown
derp --cprofile /app/data/derp.prof # Custom path
```
Analyze with:
```python
import pstats
p = pstats.Stats("data/derp.prof")
p.sort_stats("tottime").print_stats(30)
p.sort_stats("cumulative").print_stats("plugins/", 20)
```
### Memory (tracemalloc)
```bash
derp --tracemalloc # 10 frames (default)
derp --tracemalloc 25 # 25 frames deep
```
Writes top 25 allocations with full tracebacks to `data/derp.malloc`
on clean shutdown. Both flags can be combined:
```bash
derp --verbose --cprofile /app/data/derp.prof --tracemalloc
```
Requires clean SIGTERM shutdown (not SIGKILL) to flush data.
Use `podman stop -t 30 derp` to allow graceful shutdown.
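
For orientation, a file dump like the one described above can be approximated with the standard-library `tracemalloc` module. This is a minimal sketch, not derp's implementation: the helper name `dump_tracemalloc`, its `limit` parameter, and the output layout are assumptions.

```python
import tracemalloc
from pathlib import Path


def dump_tracemalloc(path: str, limit: int = 25) -> int:
    """Write the top `limit` allocation sites, with tracebacks, to `path`.

    Hypothetical helper; returns the number of entries written.
    Assumes tracemalloc.start() was called earlier in the process.
    """
    snapshot = tracemalloc.take_snapshot()
    # Group allocations by full traceback, largest first.
    stats = snapshot.statistics("traceback")[:limit]
    lines: list[str] = []
    for rank, stat in enumerate(stats, 1):
        lines.append(f"#{rank}: {stat.count} blocks, {stat.size / 1024:.1f} KiB")
        lines.extend(stat.traceback.format())
    Path(path).write_text("\n".join(lines) + "\n", encoding="utf-8")
    return len(stats)
```

Calling `tracemalloc.start(10)` at startup and `dump_tracemalloc("data/derp.malloc")` from the shutdown path would mirror the documented defaults. Writing to a file rather than the logger keeps long tracebacks out of the container log buffer.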
## Testing IRC Connection
```bash

View File

@@ -50,6 +50,7 @@ channels = ["#test"] # Channels to join on connect
plugins_dir = "plugins" # Plugin directory path
rate_limit = 2.0 # Max messages per second (default: 2.0)
rate_burst = 5 # Burst capacity (default: 5)
paste_threshold = 4 # Max lines before overflow to FlaskPaste (default: 4)
admins = [] # Hostmask patterns (fnmatch), IRCOPs auto-detected
timezone = "UTC" # Timezone for calendar reminders (IANA tz name)
@@ -133,6 +134,7 @@ format = "text" # Log format: "text" (default) or "json"
| `!vt <hash\|ip\|domain\|url>` | VirusTotal lookup |
| `!emailcheck <email> [email2 ...]` | SMTP email verification (admin) |
| `!shorten <url>` | Shorten a URL via FlaskPaste |
| `!pastemoni <add\|del\|list\|check>` | Paste site keyword monitoring |
### Command Shorthand
@@ -721,19 +723,24 @@ Platforms searched:
Polling and announcements:
- Alerts are polled every 5 minutes by default
- On `add`, existing results are recorded without announcing (prevents flood)
- New results announced as `[name/<tag>/<id>] Title -- URL` where tag is one of:
`yt`, `tw`, `sx`, `rd`, `ft`, `dg`, `gn`, `kk`, `dm`, `pt`, `bs`, `ly`, `od`, `ia`,
`hn`, `gh`, `wp`, `se`, `gl`, `nm`, `pp`, `dh`, `ax`, `lb`, `dv`, `md`, `hf`
and `<id>` is a short deterministic ID for use with `!alert info`
- Titles are truncated to 80 characters
- On `add`, the bot replies immediately; existing results are seeded in the
background to avoid flooding
- New results announced as two lines:
- ACTION: `* derp [name/<tag>/<id>] date - URL`
- PRIVMSG: full uncropped title/content
- Tags: `yt`, `tw`, `sx`, `rd`, `ft`, `dg`, `gn`, `kk`, `dm`, `pt`, `bs`, `ly`,
`od`, `ia`, `hn`, `gh`, `wp`, `se`, `gl`, `nm`, `pp`, `dh`, `ax`, `lb`, `dv`,
`md`, `hf` -- `<id>` is a short deterministic ID for use with `!alert info`
- Each platform maintains its own seen list (capped at 200 per platform)
- 5 consecutive errors doubles the poll interval (max 1 hour)
- Per-backend error tracking with exponential backoff (5+ errors skips
that backend with increasing cooldown; other backends unaffected)
- Multi-instance backends (PeerTube, Mastodon, Lemmy, SearXNG) fetch
concurrently for faster polling
- Subscriptions persist across bot restarts via `bot.state`
- Matched results are stored in `data/alert_history.db` (SQLite)
- `list` shows error status indicators next to each alert
- `list` shows per-backend error counts next to each alert
- `check` forces an immediate poll across all platforms
- `history` queries stored results, most recent first
- `history` queries stored results (titles truncated), most recent first
### `!jwt` -- JWT Decoder
@@ -866,6 +873,44 @@ https://paste.mymx.me/s/AbCdEfGh
- mTLS client cert skips PoW; falls back to PoW challenge if no cert
- Also used internally by `!alert` to shorten announcement URLs
### `!pastemoni` -- Paste Site Keyword Monitor
Monitor public paste sites for keywords (data leaks, credential dumps, brand
mentions). Polls Pastebin's archive and GitHub's public Gists API on a
schedule, checks new pastes for keyword matches, and announces hits to the
subscribed IRC channel.
```
!pastemoni add <name> <keyword> Add monitor (admin)
!pastemoni del <name> Remove monitor (admin)
!pastemoni list List monitors
!pastemoni check <name> Force-poll now
```
- `add` and `del` require admin privileges
- All subcommands must be used in a channel (not PM)
- Names must be lowercase alphanumeric + hyphens, 1-20 characters
- Maximum 20 monitors per channel
Backends:
- **Pastebin** (`pb`) -- Scrapes `pastebin.com/archive` for recent pastes,
fetches raw content, case-insensitive keyword match against title + content
- **GitHub Gists** (`gh`) -- Queries `api.github.com/gists/public`, matches
keyword against description and filenames
Polling and announcements:
- Monitors are polled every 5 minutes by default
- On `add`, existing items are seeded in the background (no flood)
- New matches announced as `[tag] Title -- snippet -- URL`
- Maximum 5 items announced per backend per poll; excess shown as `... and N more`
- Titles truncated to 60 characters, snippets to 80 characters
- 5 consecutive all-backend failures double the poll interval (max 1 hour)
- Subscriptions persist across bot restarts via `bot.state`
- `list` shows keyword and per-backend error counts
- `check` forces an immediate poll across all backends
### FlaskPaste Configuration
```toml
@@ -875,3 +920,46 @@ url = "https://paste.mymx.me" # or set FLASKPASTE_URL env var
Auth: place client cert/key at `secrets/flaskpaste/derp.crt` and `derp.key`
for mTLS (bypasses PoW). Without them, PoW challenges are solved per request.
### URL Title Preview (urltitle)
Automatic URL title preview for channel messages. When a user posts a URL,
the bot fetches the page title and description and displays a one-line
preview. No commands -- event-driven only.
```
<alice> check out https://example.com/article
<derp> ↳ Article Title -- Description of the article...
```
Behavior:
- Automatically previews HTTP(S) URLs posted in channel messages
- Skips private messages, bot's own messages, and command messages (`!prefix`)
- URLs prefixed with `!` are suppressed: `!https://example.com` produces no preview
- HEAD-then-GET fetch strategy (checks Content-Type before downloading body)
- Skips non-HTML content types (images, PDFs, JSON, etc.)
- Skips binary file extensions (`.png`, `.jpg`, `.pdf`, `.zip`, etc.)
- Skips FlaskPaste URLs and configured ignore hosts
- Dedup: same URL only previewed once per cooldown window (5 min default)
- Max 3 URLs previewed per message (configurable)
- Title from `og:title` takes priority over `<title>` tag
- Description from `og:description` takes priority over `<meta name="description">`
- Title truncated at 200 chars, description at 150 chars
Output format:
```
↳ Page Title -- Description truncated to 150 chars...
↳ Page Title
```
Configuration (optional):
```toml
[urltitle]
cooldown = 300 # seconds before same URL previewed again
timeout = 10 # HTTP fetch timeout
max_urls = 3 # max URLs to preview per message
ignore_hosts = [] # additional hostnames to skip
```
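
The title/description priority rules listed above can be sketched with the standard-library `html.parser`. This is not the plugin's `_TitleParser`; the class `TitleSketch`, the helpers `truncate` and `preview_line`, and the ellipsis style are assumptions made for illustration. Only the 200/150 character limits and the og-over-fallback order come from the text.

```python
from html.parser import HTMLParser

ARROW = "\u21b3"  # the preview prefix shown in the output format above


class TitleSketch(HTMLParser):
    """Collect og:title/og:description plus <title>/<meta description> fallbacks."""

    def __init__(self) -> None:
        super().__init__()
        self.og_title = ""
        self.og_desc = ""
        self.html_title = ""
        self.meta_desc = ""
        self._in_title = False

    def handle_starttag(self, tag, attrs):
        attr = {k: (v or "") for k, v in attrs}
        if tag == "title":
            self._in_title = True
        elif tag == "meta":
            content = attr.get("content", "").strip()
            if attr.get("property") == "og:title":
                self.og_title = self.og_title or content
            elif attr.get("property") == "og:description":
                self.og_desc = self.og_desc or content
            elif attr.get("name", "").lower() == "description":
                self.meta_desc = self.meta_desc or content

    def handle_data(self, data):
        if self._in_title:
            self.html_title += data

    def handle_endtag(self, tag):
        if tag == "title":
            self._in_title = False


def truncate(text: str, limit: int) -> str:
    """Collapse whitespace and cut to `limit` characters with an ellipsis."""
    text = " ".join(text.split())
    if len(text) <= limit:
        return text
    return text[: limit - 3].rstrip() + "..."


def preview_line(html: str) -> str:
    """Build the one-line preview; empty string when no title is found."""
    parser = TitleSketch()
    parser.feed(html)
    title = truncate(parser.og_title or parser.html_title, 200)
    desc = truncate(parser.og_desc or parser.meta_desc, 150)
    if not title:
        return ""
    return f"{ARROW} {title} -- {desc}" if desc else f"{ARROW} {title}"
```

Keeping the four candidate fields separate and choosing between them only at the end means the result does not depend on the order in which the tags appear in the page.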

View File

@@ -142,5 +142,4 @@ async def cmd_abuse(bot, message):
return f"{addr} -- error: {exc}"
results = await asyncio.gather(*[_query(a) for a in addrs])
for line in results:
await bot.reply(message, line)
await bot.long_reply(message, list(results), label="abuse check")
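
The hunk above swaps a per-line `bot.reply()` loop for one `bot.long_reply()` call. As a rough illustration of the overflow decision described in commit 1836fa50af, the sketch below returns the lines that would reach IRC. It is not the real `Bot.long_reply()`: the function `plan_long_reply`, the `make_paste` callback, the `preview` count, the footer wording, and treating the threshold as inclusive are all assumptions.

```python
from typing import Callable


def plan_long_reply(
    lines: list[str],
    make_paste: Callable[[str], str],
    threshold: int = 4,
    preview: int = 2,
    label: str = "",
) -> list[str]:
    """Return the lines to send: everything, or a preview plus a paste link."""
    if len(lines) <= threshold:
        # Short output goes straight to the channel.
        return list(lines)
    # Long output: upload the full text, send a short preview and the link.
    url = make_paste("\n".join(lines))
    remaining = len(lines) - preview
    prefix = f"{label}: " if label else ""
    footer = f"... {prefix}{remaining} more lines: {url}"
    return lines[:preview] + [footer]
```

Passing the paste function in as a callback keeps the decision logic testable without a FlaskPaste server.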

View File

@@ -330,6 +330,23 @@ def _fetch_og(url: str) -> tuple[str, str, str]:
return "", "", ""
def _fetch_og_batch(urls: list[str]) -> dict[str, tuple[str, str, str]]:
    """Fetch OG tags for multiple URLs concurrently.

    Returns {url: (og_title, og_description, date)} for each input URL.
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed

    if not urls:
        return {}
    results: dict[str, tuple[str, str, str]] = {}
    with ThreadPoolExecutor(max_workers=min(len(urls), 8)) as pool:
        futures = {pool.submit(_fetch_og, url): url for url in urls}
        for fut in as_completed(futures):
            results[futures[fut]] = fut.result()
    return results
# -- YouTube InnerTube search (blocking) ------------------------------------
def _extract_videos(obj: object, depth: int = 0) -> list[dict]:
@@ -604,7 +621,7 @@ def _search_mastodon(keyword: str) -> list[dict]:
continue
acct = (status.get("account") or {}).get("acct", "")
content = _strip_html(status.get("content", ""))
title = f"@{acct}: {_truncate(content, 60)}" if acct else content
title = f"@{acct}: {content}" if acct else content
items.append({
"id": status_url,
"title": title,
@@ -903,7 +920,7 @@ def _search_bluesky(keyword: str) -> list[dict]:
display = author.get("displayName") or handle
record = post.get("record") or {}
text = record.get("text", "")
title = f"@{display}: {_truncate(text, 60)}"
title = f"@{display}: {text}"
date = _parse_date(record.get("createdAt", ""))
post_url = f"https://bsky.app/profile/{handle}/post/{rkey}" if handle else ""
results.append({
@@ -1143,7 +1160,7 @@ def _search_github(keyword: str) -> list[dict]:
stars = repo.get("stargazers_count", 0)
title = full_name
if description:
title += f": {_truncate(description, 50)}"
title += f": {description}"
if stars:
title += f" [{stars}*]"
date = _parse_date(repo.get("updated_at", ""))
@@ -1280,7 +1297,7 @@ def _search_gitlab(keyword: str) -> list[dict]:
stars = repo.get("star_count", 0)
title = name
if description:
title += f": {_truncate(description, 50)}"
title += f": {description}"
if stars:
title += f" [{stars}*]"
date = _parse_date(repo.get("last_activity_at", ""))
@@ -1320,7 +1337,7 @@ def _search_npm(keyword: str) -> list[dict]:
npm_url = links.get("npm", f"https://www.npmjs.com/package/{name}")
title = f"{name}@{version}" if version else name
if description:
title += f": {_truncate(description, 50)}"
title += f": {description}"
date = _parse_date(pkg.get("date", ""))
results.append({
"id": name, "title": title, "url": npm_url,
@@ -1356,7 +1373,7 @@ def _search_pypi(keyword: str) -> list[dict]:
pkg_name = title.split()[0] if title else ""
display = title
if desc:
display += f": {_truncate(desc, 50)}"
display += f": {desc}"
results.append({
"id": pkg_name or link,
"title": display,
@@ -1393,7 +1410,7 @@ def _search_dockerhub(keyword: str) -> list[dict]:
stars = item.get("star_count", 0)
title = name
if description:
title += f": {_truncate(description, 50)}"
title += f": {description}"
if stars:
title += f" [{stars}*]"
hub_url = (
@@ -1753,26 +1770,41 @@ async def _poll_once(bot, key: str, announce: bool = True) -> None:
# Filter: only announce results that actually contain the keyword
# Check title/URL first, then fall back to og:title/og:description
kw_lower = keyword.lower()
# Collect URLs that need OG enrichment (batch fetch)
urls_needing_og: set[str] = set()
for item in new_items:
title_l = item.get("title", "").lower()
url_l = item.get("url", "").lower()
if kw_lower in title_l or kw_lower in url_l:
# Title/URL match -- only need OG for date enrichment
if not item.get("date") and item.get("url"):
urls_needing_og.add(item["url"])
elif item.get("url"):
# No title/URL match -- need OG for keyword fallback
urls_needing_og.add(item["url"])
og_cache: dict[str, tuple[str, str, str]] = {}
if urls_needing_og:
og_cache = await loop.run_in_executor(
None, _fetch_og_batch, list(urls_needing_og),
)
matched = []
for item in new_items:
title_l = item.get("title", "").lower()
url_l = item.get("url", "").lower()
if kw_lower in title_l or kw_lower in url_l:
# Fetch OG tags for date if backend didn't provide one
if not item.get("date") and item.get("url"):
_, _, og_date = await loop.run_in_executor(
None, _fetch_og, item["url"],
)
_, _, og_date = og_cache.get(item["url"], ("", "", ""))
if og_date:
item["date"] = og_date
matched.append(item)
continue
# Fetch OG tags for items that didn't match on title/URL
# Check OG tags for keyword match
item_url = item.get("url", "")
if item_url:
og_title, og_desc, og_date = await loop.run_in_executor(
None, _fetch_og, item_url,
)
og_title, og_desc, og_date = og_cache.get(item_url, ("", "", ""))
if (kw_lower in og_title.lower()
or kw_lower in og_desc.lower()):
if og_title and len(og_title) > len(item.get("title", "")):
@@ -1803,15 +1835,15 @@ async def _poll_once(bot, key: str, announce: bool = True) -> None:
short_id = _save_result(
channel, name, tag, item, short_url=short_url,
)
title = _truncate(item["title"]) if item["title"] else "(no title)"
title = item["title"] or "(no title)"
date = item.get("date", "")
line = f"[{name}/{tag}/{short_id}]"
meta = f"[{name}/{tag}/{short_id}]"
if date:
line += f" ({date})"
line += f" {title}"
meta += f" {date}"
if display_url:
line += f" -- {display_url}"
await bot.send(channel, line)
meta += f" - {display_url}"
await bot.action(channel, meta)
await bot.send(channel, title)
for item in new_items:
seen_list.append(item["id"])
@@ -1980,6 +2012,7 @@ async def cmd_alert(bot, message):
return
loop = asyncio.get_running_loop()
fp = bot.registry._modules.get("flaskpaste")
history_lines = []
for row_id, backend, title, url, date, found_at, short_id, short_url in reversed(rows):
ts = found_at[:10]
title = _truncate(title) if title else "(no title)"
@@ -2001,7 +2034,8 @@ async def cmd_alert(bot, message):
line = f"[{name}/{backend}/{short_id}] ({date or ts}) {title}"
if display_url:
line += f" -- {display_url}"
await bot.reply(message, line)
history_lines.append(line)
await bot.long_reply(message, history_lines, label="history")
return
# -- info (any user, channel only) ---------------------------------------

View File

@@ -182,6 +182,4 @@ async def cmd_cert(bot, message):
await bot.reply(message, f"Querying crt.sh for {len(domains)} domain(s)...")
results = await asyncio.gather(*[analyze_domain(d) for d in domains])
for line in results:
await bot.reply(message, line)
await bot.long_reply(message, list(results), label="certs")

View File

@@ -67,8 +67,9 @@ async def cmd_dork(bot, message):
subcmd = parts[1].lower()
if subcmd == "list":
lines = [f" {k:<10} {desc}" for k, (_, desc) in sorted(_DORKS.items())]
await bot.reply(message, "Dork categories:\n" + "\n".join(lines))
lines = ["Dork categories:"]
lines.extend(f" {k:<10} {desc}" for k, (_, desc) in sorted(_DORKS.items()))
await bot.long_reply(message, lines, label="dork categories")
return
if len(parts) < 3:

View File

@@ -158,10 +158,10 @@ async def cmd_exploitdb(bot, message):
if not matches:
await bot.reply(message, f"No exploits matching '{term}'")
return
for entry in matches[:_MAX_RESULTS]:
await bot.reply(message, _format_entry(entry))
lines = [_format_entry(e) for e in matches[:_MAX_RESULTS]]
if len(matches) > _MAX_RESULTS:
await bot.reply(message, f"({len(matches)} total, showing {_MAX_RESULTS})")
lines.append(f"({len(matches)} total, showing {_MAX_RESULTS})")
await bot.long_reply(message, lines, label="exploits")
return
if sub.lower() == "cve":
@@ -177,10 +177,10 @@ async def cmd_exploitdb(bot, message):
if not matches:
await bot.reply(message, f"No exploits for {cve_id}")
return
for entry in matches[:_MAX_RESULTS]:
await bot.reply(message, _format_entry(entry))
lines = [_format_entry(e) for e in matches[:_MAX_RESULTS]]
if len(matches) > _MAX_RESULTS:
await bot.reply(message, f"({len(matches)} total, showing {_MAX_RESULTS})")
lines.append(f"({len(matches)} total, showing {_MAX_RESULTS})")
await bot.long_reply(message, lines, label="exploits")
return
# Direct ID lookup
@@ -209,7 +209,7 @@ async def cmd_exploitdb(bot, message):
if not matches:
await bot.reply(message, f"No exploits matching '{term}'")
return
for entry in matches[:_MAX_RESULTS]:
await bot.reply(message, _format_entry(entry))
lines = [_format_entry(e) for e in matches[:_MAX_RESULTS]]
if len(matches) > _MAX_RESULTS:
await bot.reply(message, f"({len(matches)} total, showing {_MAX_RESULTS})")
lines.append(f"({len(matches)} total, showing {_MAX_RESULTS})")
await bot.long_reply(message, lines, label="exploits")

View File

@@ -34,14 +34,23 @@ def _has_client_cert() -> bool:
return (_CERT_DIR / "derp.crt").exists() and (_CERT_DIR / "derp.key").exists()
_cached_ssl_ctx: ssl.SSLContext | None = None
def _ssl_context() -> ssl.SSLContext:
"""Build SSL context, loading client cert for mTLS if available."""
ctx = ssl.create_default_context()
cert_path = _CERT_DIR / "derp.crt"
key_path = _CERT_DIR / "derp.key"
if cert_path.exists() and key_path.exists():
ctx.load_cert_chain(str(cert_path), str(key_path))
return ctx
"""Build SSL context, loading client cert for mTLS if available.
Cached at module level -- cert files are static at runtime.
"""
global _cached_ssl_ctx
if _cached_ssl_ctx is None:
ctx = ssl.create_default_context()
cert_path = _CERT_DIR / "derp.crt"
key_path = _CERT_DIR / "derp.key"
if cert_path.exists() and key_path.exists():
ctx.load_cert_chain(str(cert_path), str(key_path))
_cached_ssl_ctx = ctx
return _cached_ssl_ctx
def _solve_pow(nonce: str, difficulty: int) -> int:

520
plugins/pastemoni.py Normal file
View File

@@ -0,0 +1,520 @@
"""Plugin: paste site keyword monitor for Pastebin and GitHub Gists."""
from __future__ import annotations
import asyncio
import json
import logging
import re
import urllib.request
from datetime import datetime, timezone
from html.parser import HTMLParser
from derp.http import urlopen as _urlopen
from derp.plugin import command, event
_log = logging.getLogger(__name__)
# -- Constants ---------------------------------------------------------------
_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,19}$")
_MAX_SEEN = 200
_MAX_ANNOUNCE = 5
_DEFAULT_INTERVAL = 300
_MAX_INTERVAL = 3600
_FETCH_TIMEOUT = 15
_USER_AGENT = "derp-bot/1.0 (IRC paste monitor)"
_MAX_MONITORS = 20
_MAX_SNIPPET_LEN = 80
_MAX_TITLE_LEN = 60
# -- Module-level tracking ---------------------------------------------------
_pollers: dict[str, asyncio.Task] = {}
_monitors: dict[str, dict] = {}
_errors: dict[str, int] = {}
# -- Pure helpers ------------------------------------------------------------

def _state_key(channel: str, name: str) -> str:
    """Build composite state key."""
    return f"{channel}:{name}"

def _validate_name(name: str) -> bool:
    """Check name against allowed pattern."""
    return bool(_NAME_RE.match(name))

def _truncate(text: str, max_len: int = _MAX_TITLE_LEN) -> str:
    """Truncate text with ellipsis if needed."""
    if len(text) <= max_len:
        return text
    return text[: max_len - 3].rstrip() + "..."

def _snippet_around(text: str, keyword: str, max_len: int = _MAX_SNIPPET_LEN) -> str:
    """Extract snippet centered around keyword match."""
    if not text:
        return ""
    text = " ".join(text.split())  # collapse whitespace
    if len(text) <= max_len:
        return text
    idx = text.lower().find(keyword.lower())
    if idx < 0:
        return text[: max_len - 3] + "..."
    start = max(0, idx - max_len // 3)
    end = min(len(text), start + max_len)
    snippet = text[start:end]
    if start > 0:
        snippet = "..." + snippet
    if end < len(text):
        snippet = snippet + "..."
    return snippet
# -- State helpers -----------------------------------------------------------

def _save(bot, key: str, data: dict) -> None:
    """Persist monitor data to bot.state."""
    bot.state.set("pastemoni", key, json.dumps(data))

def _load(bot, key: str) -> dict | None:
    """Load monitor data from bot.state."""
    raw = bot.state.get("pastemoni", key)
    if raw is None:
        return None
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return None

def _delete(bot, key: str) -> None:
    """Remove monitor data from bot.state."""
    bot.state.delete("pastemoni", key)
# -- Pastebin archive parser ------------------------------------------------

class _ArchiveParser(HTMLParser):
    """Extract paste links from Pastebin archive HTML."""

    def __init__(self):
        super().__init__()
        self.links: list[tuple[str, str]] = []  # (paste_id, title)
        self._in_link = False
        self._href = ""
        self._title_parts: list[str] = []

    def handle_starttag(self, tag, attrs):
        if tag != "a":
            return
        attr_map = {k: (v or "") for k, v in attrs}
        href = attr_map.get("href", "")
        if re.match(r"^/[a-zA-Z0-9]{8}$", href):
            self._in_link = True
            self._href = href[1:]  # strip leading /
            self._title_parts = []

    def handle_data(self, data):
        if self._in_link:
            self._title_parts.append(data)

    def handle_endtag(self, tag):
        if tag == "a" and self._in_link:
            self._in_link = False
            title = "".join(self._title_parts).strip()
            if self._href:
                self.links.append((self._href, title))
# -- Pastebin backend --------------------------------------------------------

def _fetch_pastebin(keyword: str) -> list[dict]:
    """Scrape Pastebin archive and filter by keyword. Blocking."""
    req = urllib.request.Request("https://pastebin.com/archive", method="GET")
    req.add_header("User-Agent", _USER_AGENT)
    resp = _urlopen(req, timeout=_FETCH_TIMEOUT)
    raw = resp.read()
    resp.close()
    html = raw.decode("utf-8", errors="replace")
    parser = _ArchiveParser()
    parser.feed(html)
    kw_lower = keyword.lower()
    results: list[dict] = []
    for paste_id, title in parser.links[:30]:
        # Check title first (avoids raw fetch)
        if kw_lower in title.lower():
            results.append({
                "id": paste_id,
                "title": _truncate(title, _MAX_TITLE_LEN),
                "url": f"https://pastebin.com/{paste_id}",
                "snippet": "",
            })
            continue
        # Fetch raw content and check
        try:
            raw_req = urllib.request.Request(
                f"https://pastebin.com/raw/{paste_id}", method="GET",
            )
            raw_req.add_header("User-Agent", _USER_AGENT)
            raw_resp = _urlopen(raw_req, timeout=_FETCH_TIMEOUT)
            content = raw_resp.read().decode("utf-8", errors="replace")
            raw_resp.close()
        except Exception:
            continue
        if kw_lower in content.lower():
            results.append({
                "id": paste_id,
                "title": _truncate(title or "(untitled)", _MAX_TITLE_LEN),
                "url": f"https://pastebin.com/{paste_id}",
                "snippet": _snippet_around(content, keyword),
            })
    return results
# -- GitHub Gists backend ----------------------------------------------------

def _fetch_gists(keyword: str) -> list[dict]:
    """Query GitHub public gists and filter by keyword. Blocking."""
    req = urllib.request.Request(
        "https://api.github.com/gists/public?per_page=30", method="GET",
    )
    req.add_header("User-Agent", _USER_AGENT)
    req.add_header("Accept", "application/vnd.github+json")
    resp = _urlopen(req, timeout=_FETCH_TIMEOUT)
    raw = resp.read()
    resp.close()
    gists = json.loads(raw)
    kw_lower = keyword.lower()
    results: list[dict] = []
    for gist in gists if isinstance(gists, list) else []:
        gist_id = gist.get("id", "")
        if not gist_id:
            continue
        description = gist.get("description") or ""
        html_url = gist.get("html_url", "")
        files = gist.get("files") or {}
        filenames = " ".join(files.keys())
        searchable = f"{description} {filenames}"
        if kw_lower not in searchable.lower():
            continue
        source = description or filenames
        title = _truncate(source or "(no description)", _MAX_TITLE_LEN)
        snippet = _snippet_around(source, keyword) if len(source) > _MAX_TITLE_LEN else ""
        results.append({
            "id": gist_id,
            "title": title,
            "url": html_url,
            "snippet": snippet,
        })
    return results

# -- Backend registry -------------------------------------------------------

_BACKENDS: dict[str, callable] = {
    "pb": _fetch_pastebin,
    "gh": _fetch_gists,
}
# -- Polling -----------------------------------------------------------------
async def _poll_once(bot, key: str, announce: bool = True) -> None:
"""Single poll cycle for one monitor (all backends)."""
data = _monitors.get(key)
if data is None:
data = _load(bot, key)
if data is None:
return
_monitors[key] = data
keyword = data["keyword"]
now = datetime.now(timezone.utc).isoformat()
data["last_poll"] = now
loop = asyncio.get_running_loop()
had_success = False
for tag, backend in _BACKENDS.items():
try:
items = await loop.run_in_executor(None, backend, keyword)
except Exception as exc:
_log.debug("pastemoni %s/%s error: %s", key, tag, exc)
data.setdefault("last_errors", {})[tag] = str(exc)
continue
had_success = True
data.setdefault("last_errors", {}).pop(tag, None)
seen_set = set(data.get("seen", {}).get(tag, []))
seen_list = list(data.get("seen", {}).get(tag, []))
new_items = [item for item in items if item["id"] not in seen_set]
if announce and new_items:
channel = data["channel"]
shown = new_items[:_MAX_ANNOUNCE]
for item in shown:
title = item.get("title") or "(untitled)"
snippet = item.get("snippet", "")
url = item.get("url", "")
parts = [f"[{tag}] {title}"]
if snippet:
parts.append(snippet)
if url:
parts.append(url)
await bot.send(channel, " -- ".join(parts))
remaining = len(new_items) - len(shown)
if remaining > 0:
await bot.send(channel, f"[{tag}] ... and {remaining} more")
for item in new_items:
seen_list.append(item["id"])
if len(seen_list) > _MAX_SEEN:
seen_list = seen_list[-_MAX_SEEN:]
data.setdefault("seen", {})[tag] = seen_list
if had_success:
_errors[key] = 0
else:
_errors[key] = _errors.get(key, 0) + 1
_monitors[key] = data
_save(bot, key, data)
async def _poll_loop(bot, key: str) -> None:
"""Infinite poll loop for one monitor."""
try:
while True:
data = _monitors.get(key) or _load(bot, key)
if data is None:
return
interval = data.get("interval", _DEFAULT_INTERVAL)
errs = _errors.get(key, 0)
if errs >= 5:
interval = min(interval * 2, _MAX_INTERVAL)
await asyncio.sleep(interval)
await _poll_once(bot, key, announce=True)
except asyncio.CancelledError:
pass
def _start_poller(bot, key: str) -> None:
"""Create and track a poller task."""
existing = _pollers.get(key)
if existing and not existing.done():
return
task = asyncio.create_task(_poll_loop(bot, key))
_pollers[key] = task
def _stop_poller(key: str) -> None:
"""Cancel and remove a poller task."""
task = _pollers.pop(key, None)
if task and not task.done():
task.cancel()
_monitors.pop(key, None)
_errors.pop(key, 0)
# -- Restore on connect -----------------------------------------------------
def _restore(bot) -> None:
"""Rebuild pollers from persisted state."""
for key in bot.state.keys("pastemoni"):
existing = _pollers.get(key)
if existing and not existing.done():
continue
data = _load(bot, key)
if data is None:
continue
_monitors[key] = data
_start_poller(bot, key)
@event("001")
async def on_connect(bot, message):
"""Restore paste monitor pollers on connect."""
_restore(bot)
# -- Command handler ---------------------------------------------------------
@command("pastemoni", help="Paste monitor: !pastemoni add|del|list|check")
async def cmd_pastemoni(bot, message):
"""Per-channel paste site keyword monitoring.
Usage:
!pastemoni add <name> <keyword> Add monitor (admin)
!pastemoni del <name> Remove monitor (admin)
!pastemoni list List monitors
!pastemoni check <name> Force-poll now
"""
parts = message.text.split(None, 3)
if len(parts) < 2:
await bot.reply(message, "Usage: !pastemoni <add|del|list|check> [args]")
return
sub = parts[1].lower()
# -- list ----------------------------------------------------------------
if sub == "list":
if not message.is_channel:
await bot.reply(message, "Use this command in a channel")
return
channel = message.target
prefix = f"{channel}:"
monitors = []
for key in bot.state.keys("pastemoni"):
if key.startswith(prefix):
data = _load(bot, key)
if data:
name = data["name"]
keyword = data.get("keyword", "")
errs = data.get("last_errors", {})
entry = f"{name} ({keyword})"
if errs:
entry += f" [{len(errs)} errors]"
monitors.append(entry)
if not monitors:
await bot.reply(message, "No monitors in this channel")
return
await bot.reply(message, f"Monitors: {', '.join(monitors)}")
return
# -- check ---------------------------------------------------------------
if sub == "check":
if not message.is_channel:
await bot.reply(message, "Use this command in a channel")
return
if len(parts) < 3:
await bot.reply(message, "Usage: !pastemoni check <name>")
return
name = parts[2].lower()
channel = message.target
key = _state_key(channel, name)
data = _load(bot, key)
if data is None:
await bot.reply(message, f"No monitor '{name}' in this channel")
return
_monitors[key] = data
await _poll_once(bot, key, announce=True)
data = _monitors.get(key, data)
errs = data.get("last_errors", {})
if errs:
tags = ", ".join(sorted(errs))
await bot.reply(message, f"{name}: errors on {tags}")
else:
await bot.reply(message, f"{name}: checked")
return
# -- add (admin) ---------------------------------------------------------
if sub == "add":
if not bot._is_admin(message):
await bot.reply(message, "Permission denied: add requires admin")
return
if not message.is_channel:
await bot.reply(message, "Use this command in a channel")
return
if len(parts) < 4:
await bot.reply(message, "Usage: !pastemoni add <name> <keyword>")
return
name = parts[2].lower()
keyword = parts[3]
if not _validate_name(name):
await bot.reply(
message,
"Invalid name (lowercase alphanumeric + hyphens, 1-20 chars)",
)
return
channel = message.target
key = _state_key(channel, name)
if _load(bot, key) is not None:
await bot.reply(
message, f"Monitor '{name}' already exists in this channel",
)
return
ch_prefix = f"{channel}:"
count = sum(
1 for k in bot.state.keys("pastemoni") if k.startswith(ch_prefix)
)
if count >= _MAX_MONITORS:
await bot.reply(message, f"Monitor limit reached ({_MAX_MONITORS})")
return
now = datetime.now(timezone.utc).isoformat()
data = {
"keyword": keyword,
"name": name,
"channel": channel,
"interval": _DEFAULT_INTERVAL,
"added_by": message.nick,
"added_at": now,
"last_poll": now,
"last_errors": {},
"seen": {},
}
_save(bot, key, data)
_monitors[key] = data
async def _seed():
await _poll_once(bot, key, announce=False)
_start_poller(bot, key)
asyncio.create_task(_seed())
await bot.reply(
message,
f"Monitor '{name}' added for: {keyword} (seeding in background)",
)
return
# -- del (admin) ---------------------------------------------------------
if sub == "del":
if not bot._is_admin(message):
await bot.reply(message, "Permission denied: del requires admin")
return
if not message.is_channel:
await bot.reply(message, "Use this command in a channel")
return
if len(parts) < 3:
await bot.reply(message, "Usage: !pastemoni del <name>")
return
name = parts[2].lower()
channel = message.target
key = _state_key(channel, name)
if _load(bot, key) is None:
await bot.reply(message, f"No monitor '{name}' in this channel")
return
_stop_poller(key)
_delete(bot, key)
await bot.reply(message, f"Removed '{name}'")
return
await bot.reply(message, "Usage: !pastemoni <add|del|list|check> [args]")
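
The seen-list handling in `_poll_once` above can be illustrated in isolation. The following is a minimal sketch, not the plugin's code: `merge_seen` and `MAX_SEEN` are hypothetical names, and the cap of 5 is an assumed value chosen only to keep the example small (the real `_MAX_SEEN` is defined outside this diff).

```python
from __future__ import annotations

# Hypothetical stand-in for the plugin's _MAX_SEEN constant (value assumed).
MAX_SEEN = 5


def merge_seen(
    seen: list[str], items: list[dict], max_seen: int = MAX_SEEN,
) -> tuple[list[dict], list[str]]:
    """Return (new_items, updated_seen) without mutating the inputs."""
    seen_set = set(seen)
    # Only items whose ID was never recorded count as new.
    new_items = [item for item in items if item["id"] not in seen_set]
    updated = seen + [item["id"] for item in new_items]
    # Keep only the most recent IDs so the persisted state stays bounded.
    if len(updated) > max_seen:
        updated = updated[-max_seen:]
    return new_items, updated


seen = ["a", "b", "c", "d"]
items = [{"id": "d"}, {"id": "e"}, {"id": "f"}]
new_items, seen = merge_seen(seen, items)
print([item["id"] for item in new_items])
print(seen)
```

Trimming from the front means the oldest IDs are forgotten first, so an item that ages out of the list could in principle be announced again if the backend still returns it.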

@@ -153,8 +153,7 @@ async def cmd_subdomain(bot, message):
 total = len(sorted_subs)
 shown = sorted_subs[:_MAX_RESULTS]
-for fqdn, ips in shown:
-    await bot.reply(message, f" {fqdn} -> {', '.join(ips)}")
+lines = [f" {fqdn} -> {', '.join(ips)}" for fqdn, ips in shown]
 suffix = f" ({total - _MAX_RESULTS} more)" if total > _MAX_RESULTS else ""
-await bot.reply(message, f"{domain}: {total} subdomains found{suffix}")
+lines.append(f"{domain}: {total} subdomains found{suffix}")
+await bot.long_reply(message, lines, label="subdomains")

278
plugins/urltitle.py Normal file
@@ -0,0 +1,278 @@
"""Plugin: automatic URL title preview for channel messages."""
from __future__ import annotations
import logging
import re
import time
import urllib.parse
import urllib.request
from html.parser import HTMLParser
from derp.http import urlopen as _urlopen
from derp.plugin import event
_log = logging.getLogger(__name__)
# -- Constants ---------------------------------------------------------------
_URL_RE = re.compile(r"https?://[^\s<>\"\x00-\x1f]{2,}", re.IGNORECASE)
_USER_AGENT = "Mozilla/5.0 (compatible; derp-bot)"
_FETCH_TIMEOUT = 10
_MAX_BYTES = 64 * 1024
_MAX_TITLE_LEN = 200
_MAX_DESC_LEN = 150
_MAX_URLS = 3
_COOLDOWN = 300 # seconds
_CACHE_MAX = 500
_SKIP_EXTS = frozenset({
".png", ".jpg", ".jpeg", ".gif", ".webp", ".svg", ".ico", ".bmp",
".mp4", ".webm", ".mkv", ".avi", ".mov", ".flv",
".mp3", ".flac", ".ogg", ".wav", ".aac",
".pdf", ".zip", ".gz", ".tar", ".bz2", ".xz", ".7z", ".rar",
".exe", ".msi", ".deb", ".rpm", ".dmg", ".iso",
".apk", ".wasm", ".bin", ".img",
})
# Trailing punctuation to strip, but preserve balanced parens
_TRAIL_CHARS = set(".,;:!?)>]")
# -- Module-level state ------------------------------------------------------
_seen: dict[str, float] = {}
# -- HTML parser -------------------------------------------------------------
class _TitleParser(HTMLParser):
"""Extract page title and description from HTML head."""
def __init__(self):
super().__init__()
self.og_title = ""
self.og_description = ""
self.title = ""
self.meta_description = ""
self._in_title = False
self._title_parts: list[str] = []
def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
if tag == "meta":
attr_map = {k.lower(): (v or "") for k, v in attrs}
prop = attr_map.get("property", "").lower()
name = attr_map.get("name", "").lower()
content = attr_map.get("content", "")
if prop == "og:title":
self.og_title = content
elif prop == "og:description":
self.og_description = content
elif name == "description" and not self.meta_description:
self.meta_description = content
elif tag == "title":
self._in_title = True
self._title_parts = []
def handle_data(self, data: str) -> None:
if self._in_title:
self._title_parts.append(data)
def handle_endtag(self, tag: str) -> None:
if tag == "title" and self._in_title:
self._in_title = False
self.title = " ".join("".join(self._title_parts).split())
@property
def best_title(self) -> str:
return self.og_title or self.title
@property
def best_description(self) -> str:
return self.og_description or self.meta_description
# -- URL helpers -------------------------------------------------------------
def _clean_url(raw: str) -> str:
"""Strip trailing punctuation while preserving balanced parentheses."""
url = raw
while url and url[-1] in _TRAIL_CHARS:
if url[-1] == ")" and url.count("(") > url.count(")") - 1:
break
url = url[:-1]
return url
def _extract_urls(text: str, max_urls: int = _MAX_URLS) -> list[str]:
"""Extract up to max_urls HTTP(S) URLs from text.
Skips URLs where the character immediately before 'http' is '!'
(suppression marker). Deduplicates while preserving order.
"""
urls: list[str] = []
seen: set[str] = set()
for m in _URL_RE.finditer(text):
start = m.start()
if start > 0 and text[start - 1] == "!":
continue
url = _clean_url(m.group())
if url not in seen:
seen.add(url)
urls.append(url)
if len(urls) >= max_urls:
break
return urls
def _is_ignored_url(url: str, ignore_hosts: set[str]) -> bool:
"""Check if a URL should be skipped (extension or host)."""
parsed = urllib.parse.urlparse(url)
path_lower = parsed.path.lower()
# Check file extension
for ext in _SKIP_EXTS:
if path_lower.endswith(ext):
return True
# Check ignored hosts
host = parsed.hostname or ""
if host in ignore_hosts:
return True
return False
def _truncate(text: str, max_len: int) -> str:
"""Truncate with ellipsis if needed."""
if len(text) <= max_len:
return text
return text[: max_len - 3].rstrip() + "..."
# -- Fetch logic -------------------------------------------------------------
def _fetch_title(url: str) -> tuple[str, str]:
"""Fetch page title and description for a URL.
Uses HEAD-then-GET: HEAD checks Content-Type cheaply, GET fetches
the body. Both go through the SOCKS5 connection pool.
Returns (title, description). Empty strings on failure.
"""
# 1. HEAD to check Content-Type
try:
req = urllib.request.Request(url, method="HEAD")
req.add_header("User-Agent", _USER_AGENT)
resp = _urlopen(req, timeout=_FETCH_TIMEOUT, retries=1)
ct = (resp.headers.get("Content-Type") or "").lower()
resp.close()
if ct and "html" not in ct:  # "xhtml" types also contain "html"
return "", ""
except Exception:
pass # HEAD unsupported -- fall through to GET
# 2. GET body (reuses pooled connection to same host)
try:
req = urllib.request.Request(url, method="GET")
req.add_header("User-Agent", _USER_AGENT)
resp = _urlopen(req, timeout=_FETCH_TIMEOUT, retries=1)
ct = (resp.headers.get("Content-Type") or "").lower()
if ct and "html" not in ct:  # "xhtml" types also contain "html"
resp.close()
return "", ""
raw = resp.read(_MAX_BYTES)
resp.close()
except Exception as exc:
_log.debug("GET failed for %s: %s", url, exc)
return "", ""
# 3. Parse
html = raw.decode("utf-8", errors="replace")
parser = _TitleParser()
try:
parser.feed(html)
except Exception:
pass
return parser.best_title, parser.best_description
# -- Cooldown ----------------------------------------------------------------
def _check_cooldown(url: str, cooldown: int) -> bool:
"""Return True if the URL is within the cooldown window."""
now = time.monotonic()
last = _seen.get(url)
if last is not None and (now - last) < cooldown:
return True
# Prune if cache is too large
if len(_seen) >= _CACHE_MAX:
cutoff = now - cooldown
stale = [k for k, v in _seen.items() if v < cutoff]
for k in stale:
del _seen[k]
_seen[url] = now
return False
# -- Event handler -----------------------------------------------------------
@event("PRIVMSG")
async def on_privmsg(bot, message):
"""Preview URLs posted in channel messages."""
import asyncio
# Skip non-channel, bot's own messages, and command messages
if not message.is_channel:
return
if message.nick == bot.nick:
return
text = message.text or ""
if text.startswith(bot.prefix):
return
# Read config
cfg = bot.config.get("urltitle", {})
cooldown = cfg.get("cooldown", _COOLDOWN)
max_urls = cfg.get("max_urls", _MAX_URLS)
extra_ignore = set(cfg.get("ignore_hosts", []))
# Build ignore set: FlaskPaste host + config-specified hosts
ignore_hosts = set(extra_ignore)
fp_url = bot.config.get("flaskpaste", {}).get("url", "")
if fp_url:
fp_host = urllib.parse.urlparse(fp_url).hostname
if fp_host:
ignore_hosts.add(fp_host)
urls = _extract_urls(text, max_urls)
if not urls:
return
channel = message.target
loop = asyncio.get_running_loop()
for url in urls:
if _is_ignored_url(url, ignore_hosts):
continue
if _check_cooldown(url, cooldown):
continue
title, desc = await loop.run_in_executor(None, _fetch_title, url)
if not title:
continue
title = _truncate(title, _MAX_TITLE_LEN)
if desc:
desc = _truncate(desc, _MAX_DESC_LEN)
line = f"\u21b3 {title} -- {desc}"
else:
line = f"\u21b3 {title}"
await bot.send(channel, line)
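
The trailing-punctuation rule in `_clean_url` is easiest to follow on concrete inputs. This is a standalone sketch under the assumption that the balance check is meant as "keep a closing parenthesis while openers are at least as numerous as closers"; `clean_url` and `TRAIL_CHARS` are illustrative copies, not imports from the plugin.

```python
# Illustrative copy of the plugin's trailing-character set.
TRAIL_CHARS = set(".,;:!?)>]")


def clean_url(raw: str) -> str:
    """Strip trailing punctuation but keep a balanced closing parenthesis."""
    url = raw
    while url and url[-1] in TRAIL_CHARS:
        # A closing paren stays when the URL has a matching opener for it.
        if url[-1] == ")" and url.count("(") >= url.count(")"):
            break
        url = url[:-1]
    return url


samples = [
    "https://en.wikipedia.org/wiki/Rust_(programming_language)",
    "https://example.com/page)",
    "https://example.com/a_(b)).",
    "https://example.com/x?!",
]
for sample in samples:
    print(sample, "->", clean_url(sample))
```

The first sample is returned unchanged, while the others lose only the unbalanced or purely punctuational tail.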

@@ -11,6 +11,7 @@ license = "MIT"
 dependencies = [
     "maxminddb>=2.0",
     "PySocks>=1.7.1",
+    "urllib3[socks]>=2.0",
 ]
 [project.scripts]

3
requirements.txt Normal file
@@ -0,0 +1,3 @@
maxminddb>=2.0
PySocks>=1.7.1
urllib3[socks]>=2.0

@@ -460,6 +460,52 @@ class Bot:
if target:
await self.send(target, text)
async def long_reply(
self, msg: Message, lines: list[str], *,
label: str = "",
) -> None:
"""Reply with a list of lines; paste overflow to FlaskPaste.
If len(lines) <= paste_threshold, sends each line via send().
If len(lines) > paste_threshold, creates a paste with all lines
and sends a preview (first 2 lines) + paste URL.
Falls back to sending all lines if FlaskPaste is unavailable.
"""
threshold = self.config["bot"].get("paste_threshold", 4)
target = msg.target if msg.is_channel else msg.nick
if not lines or not target:
return
if len(lines) <= threshold:
for line in lines:
await self.send(target, line)
return
# Attempt paste overflow
fp = self.registry._modules.get("flaskpaste")
paste_url = None
if fp:
full_text = "\n".join(lines)
loop = asyncio.get_running_loop()
paste_url = await loop.run_in_executor(
None, fp.create_paste, self, full_text,
)
if paste_url:
# Clamp at 0: a threshold of 0 would otherwise give -1 and slice lines[:-1].
preview_count = max(0, min(2, threshold - 1))
for line in lines[:preview_count]:
await self.send(target, line)
remaining = len(lines) - preview_count
suffix = f" ({label})" if label else ""
await self.send(
target,
f"... {remaining} more lines{suffix}: {paste_url}",
)
else:
for line in lines:
await self.send(target, line)
async def action(self, target: str, text: str) -> None:
"""Send a CTCP ACTION (/me) to a target."""
await self.send(target, f"\x01ACTION {text}\x01")
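
The branching in `long_reply` can be reduced to a pure function that decides which messages would be sent. This is a sketch only: `plan_reply` is a hypothetical helper, the paste URL is a placeholder, and the `max(0, ...)` clamp on the preview count is a defensive assumption added here for thresholds below 1.

```python
from __future__ import annotations


def plan_reply(
    lines: list[str], threshold: int, paste_url: str | None, label: str = "",
) -> list[str]:
    """Return the messages a long_reply-style helper would send."""
    # Short replies, or no paste service available: send every line as-is.
    if len(lines) <= threshold or not paste_url:
        return list(lines)
    # Overflow: short preview plus a pointer to the full paste.
    preview_count = max(0, min(2, threshold - 1))
    remaining = len(lines) - preview_count
    suffix = f" ({label})" if label else ""
    return lines[:preview_count] + [
        f"... {remaining} more lines{suffix}: {paste_url}",
    ]


lines = [f"l{i}" for i in range(1, 7)]
print(plan_reply(lines, 4, "https://paste.example/abc", label="subdomains"))
print(plan_reply(lines, 4, None))
print(plan_reply(lines[:3], 4, "https://paste.example/abc"))
```

Keeping the decision separate from the I/O makes the fallback path (paste creation failed, so all lines are sent) straightforward to test without a running bot.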

@@ -78,8 +78,8 @@ def _shutdown(bot: Bot) -> None:
     asyncio.get_running_loop().create_task(bot.conn.close())
-def _dump_tracemalloc(log: logging.Logger, limit: int = 25) -> None:
-    """Log top memory allocations from tracemalloc snapshot."""
+def _dump_tracemalloc(log: logging.Logger, path: str, limit: int = 25) -> None:
+    """Dump top memory allocations to a file and log summary."""
     import tracemalloc
     snapshot = tracemalloc.take_snapshot()
@@ -90,9 +90,16 @@ def _dump_tracemalloc(log: logging.Logger, limit: int = 25) -> None:
     ])
     stats = snapshot.statistics("traceback")
     total = sum(s.size for s in stats)
-    log.info("tracemalloc top %d (total tracked: %.1f KiB)", limit, total / 1024)
+    lines = [f"tracemalloc top {limit} (total tracked: {total / 1024:.1f} KiB)\n"]
     for i, stat in enumerate(stats[:limit], 1):
-        log.info("#%d %.1f KiB %s", i, stat.size / 1024, stat.traceback.format()[0])
+        frames = stat.traceback.format()
+        lines.append(f"#{i} {stat.size / 1024:.1f} KiB ({stat.count} blocks)")
+        for frame in frames:
+            lines.append(f" {frame}")
+        lines.append("")
+    with open(path, "w") as f:
+        f.write("\n".join(lines))
+    log.info("tracemalloc saved to %s (%.1f KiB tracked)", path, total / 1024)
 def main(argv: list[str] | None = None) -> int:
@@ -134,7 +141,7 @@ def main(argv: list[str] | None = None) -> int:
     _run(bot)
     if args.tracemalloc:
-        _dump_tracemalloc(log)
+        _dump_tracemalloc(log, "data/derp.malloc")
     return 0
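
The report format introduced in this hunk can be tried out without the bot. The snippet below is a minimal sketch using only the standard `tracemalloc` module; `format_top_allocations` is a hypothetical helper that builds the same kind of line list and leaves writing it to a file to the caller.

```python
import tracemalloc


def format_top_allocations(snapshot, limit=3):
    """Build report lines for the largest allocations, grouped by traceback."""
    stats = snapshot.statistics("traceback")
    total = sum(s.size for s in stats)
    lines = [f"tracemalloc top {limit} (total tracked: {total / 1024:.1f} KiB)"]
    for i, stat in enumerate(stats[:limit], 1):
        lines.append(f"#{i} {stat.size / 1024:.1f} KiB ({stat.count} blocks)")
        # One indented line per formatted traceback entry.
        for frame in stat.traceback.format():
            lines.append(f"  {frame}")
        lines.append("")
    return lines


tracemalloc.start(5)  # keep up to 5 frames per allocation
data = [bytes(1000) for _ in range(100)]  # something to measure
report = format_top_allocations(tracemalloc.take_snapshot())
tracemalloc.stop()
print("\n".join(report))
```

Grouping by `"traceback"` rather than `"lineno"` yields full call stacks per entry, which is why the report is better suited to a file than to one log line per allocation.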

@@ -27,6 +27,7 @@ DEFAULTS: dict = {
         "plugins_dir": "plugins",
         "rate_limit": 2.0,
         "rate_burst": 5,
+        "paste_threshold": 4,
         "admins": [],
     },
     "channels": {},

@@ -5,19 +5,50 @@ import logging
import socket
import ssl
import time
import urllib.error
import urllib.request
import socks
import urllib3
from socks import SOCKS5
from sockshandler import SocksiPyConnectionS, SocksiPyHandler
from urllib3.contrib.socks import SOCKSProxyManager
_PROXY_ADDR = "127.0.0.1"
_PROXY_PORT = 1080
_MAX_RETRIES = 3
_RETRY_ERRORS = (ssl.SSLError, ConnectionError, TimeoutError, OSError)
_RETRY_ERRORS = (
ssl.SSLError, ConnectionError, TimeoutError, OSError,
urllib3.exceptions.HTTPError,
)
_log = logging.getLogger(__name__)
# -- Connection pool (urllib3) ------------------------------------------------
_pool: SOCKSProxyManager | None = None
# Allow redirects but no urllib3-level retries (we retry ourselves).
_POOL_RETRIES = urllib3.Retry(
total=10, connect=0, read=0, redirect=10, status=0, other=0,
)
def _get_pool() -> SOCKSProxyManager:
"""Lazy-init the SOCKS5 connection pool."""
global _pool
if _pool is None:
_pool = SOCKSProxyManager(
f"socks5h://{_PROXY_ADDR}:{_PROXY_PORT}/",
num_pools=20,
maxsize=4,
retries=_POOL_RETRIES,
)
return _pool
# -- Legacy opener (for build_opener / context= callers) ---------------------
_default_opener: urllib.request.OpenerDirector | None = None
@@ -52,12 +83,66 @@ class _ProxyHandler(SocksiPyHandler, urllib.request.HTTPSHandler):
return self.do_open(build, req)
# -- Public HTTP interface ---------------------------------------------------
def urlopen(req, *, timeout=None, context=None, retries=None):
"""Proxy-aware drop-in for urllib.request.urlopen.
Uses connection pooling via urllib3 for default requests.
Falls back to legacy opener for custom SSL context.
Retries on transient SSL/connection errors with exponential backoff.
"""
max_retries = retries if retries is not None else _MAX_RETRIES
# Custom SSL context -> fall back to opener (rare: username.py only)
if context is not None:
return _urlopen_legacy(req, timeout=timeout, context=context, retries=max_retries)
# Default path: pooled urllib3
pool = _get_pool()
if isinstance(req, str):
url, headers, body, method = req, {}, None, "GET"
else:
url = req.full_url
headers = dict(req.header_items())
body = req.data
method = req.get_method()
to = urllib3.Timeout(total=timeout) if timeout else urllib3.Timeout(total=30)
for attempt in range(max_retries):
try:
resp = pool.request(
method, url,
headers=headers,
body=body,
timeout=to,
preload_content=False,
)
if resp.status >= 400:
# Drain body so connection returns to pool, then raise
# urllib.error.HTTPError for backward compatibility.
resp.read()
raise urllib.error.HTTPError(
url, resp.status, resp.reason or "",
resp.headers, None,
)
return resp
except urllib.error.HTTPError:
raise
except _RETRY_ERRORS as exc:
if attempt + 1 >= max_retries:
raise
delay = 2 ** attempt
_log.debug("urlopen retry %d/%d after %s: %s",
attempt + 1, max_retries, type(exc).__name__, exc)
time.sleep(delay)
def _urlopen_legacy(req, *, timeout=None, context=None, retries=None):
"""Open URL through legacy opener (custom SSL context)."""
max_retries = retries if retries is not None else _MAX_RETRIES
opener = _get_opener(context)
kwargs = {}
if timeout is not None:
@@ -82,6 +167,8 @@ def build_opener(*handlers, context=None):
return urllib.request.build_opener(proxy, *handlers)
# -- Raw TCP helpers (unchanged) ---------------------------------------------
def create_connection(address, *, timeout=None):
"""SOCKS5-proxied drop-in for socket.create_connection.

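The retry loop in `urlopen` follows a common pattern: retry only on transient errors, sleep `2 ** attempt` seconds between attempts, and re-raise on the final one. The following is a generic sketch of that pattern, not the module's code; `call_with_retries` and its parameters are hypothetical, and the `sleep` argument is injectable so the example runs without real delays.

```python
import time


def call_with_retries(
    fn, *, max_retries=3,
    retry_on=(ConnectionError, TimeoutError), sleep=time.sleep,
):
    """Call fn(), retrying transient failures with exponential backoff."""
    for attempt in range(max_retries):
        try:
            return fn()
        except retry_on:
            # Last attempt: give up and let the caller see the error.
            if attempt + 1 >= max_retries:
                raise
            sleep(2 ** attempt)  # 1s, 2s, 4s, ...


calls = {"n": 0}
delays = []


def flaky():
    """Fail twice with a transient error, then succeed."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"


result = call_with_retries(flaky, sleep=delays.append)
print(result, delays)
```

Errors outside `retry_on` propagate immediately, which mirrors how the pooled path re-raises `urllib.error.HTTPError` for 4xx/5xx responses instead of retrying them.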
@@ -18,7 +18,6 @@ sys.modules[_spec.name] = _mod
_spec.loader.exec_module(_mod)
from plugins.alert import ( # noqa: E402
_MAX_ANNOUNCE,
_MAX_SEEN,
_delete,
_errors,
@@ -153,18 +152,30 @@ class _FakeState:
return sorted(self._store.get(plugin, {}).keys())
class _FakeRegistry:
"""Minimal registry stand-in."""
def __init__(self):
self._modules: dict = {}
class _FakeBot:
"""Minimal bot stand-in that captures sent/replied messages."""
def __init__(self, *, admin: bool = False):
self.sent: list[tuple[str, str]] = []
self.actions: list[tuple[str, str]] = []
self.replied: list[str] = []
self.state = _FakeState()
self.registry = _FakeRegistry()
self._admin = admin
async def send(self, target: str, text: str) -> None:
self.sent.append((target, text))
async def action(self, target: str, text: str) -> None:
self.actions.append((target, text))
async def reply(self, message, text: str) -> None:
self.replied.append(text)
@@ -199,21 +210,21 @@ def _clear() -> None:
def _fake_yt(keyword):
"""Fake YouTube backend returning two results."""
"""Fake YouTube backend returning two results (keyword in title)."""
return [
{"id": "yt1", "title": "YT Result 1",
{"id": "yt1", "title": "YT test Result 1",
"url": "https://www.youtube.com/watch?v=yt1", "extra": ""},
{"id": "yt2", "title": "YT Result 2",
{"id": "yt2", "title": "YT test Result 2",
"url": "https://www.youtube.com/watch?v=yt2", "extra": ""},
]
def _fake_tw(keyword):
"""Fake Twitch backend returning two results."""
"""Fake Twitch backend returning two results (keyword in title)."""
return [
{"id": "stream:tw1", "title": "TW Stream 1",
{"id": "stream:tw1", "title": "TW test Stream 1",
"url": "https://twitch.tv/user1", "extra": ""},
{"id": "vod:tw2", "title": "TW VOD 1",
{"id": "vod:tw2", "title": "TW test VOD 1",
"url": "https://twitch.tv/videos/tw2", "extra": ""},
]
@@ -229,11 +240,11 @@ def _fake_tw_error(keyword):
def _fake_sx(keyword):
"""Fake SearX backend returning two results."""
"""Fake SearX backend returning two results (keyword in title)."""
return [
{"id": "https://example.com/sx1", "title": "SX Result 1",
{"id": "https://example.com/sx1", "title": "SX test Result 1",
"url": "https://example.com/sx1", "extra": ""},
{"id": "https://example.com/sx2", "title": "SX Result 2",
{"id": "https://example.com/sx2", "title": "SX test Result 2",
"url": "https://example.com/sx2", "extra": ""},
]
@@ -370,7 +381,7 @@ class TestExtractVideos:
def close(self):
pass
with patch.object(_mod, "_urlopen", return_value=FakeResp()):
with patch("urllib.request.urlopen", return_value=FakeResp()):
results = _search_youtube("test")
assert len(results) == 1
assert results[0]["id"] == "dup1"
@@ -388,7 +399,7 @@ class TestSearchYoutube:
def close(self):
pass
with patch.object(_mod, "_urlopen", return_value=FakeResp()):
with patch("urllib.request.urlopen", return_value=FakeResp()):
results = _search_youtube("test query")
assert len(results) == 2
assert results[0]["id"] == "abc123"
@@ -396,7 +407,7 @@ class TestSearchYoutube:
def test_http_error_propagates(self):
import pytest
with patch.object(_mod, "_urlopen", side_effect=ConnectionError("fail")):
with patch("urllib.request.urlopen", side_effect=ConnectionError("fail")):
with pytest.raises(ConnectionError):
_search_youtube("test")
@@ -529,26 +540,28 @@ class TestCmdAlertAdd:
bot = _FakeBot(admin=True)
async def inner():
with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS):
with (
patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS),
patch.object(_mod, "_fetch_og_batch", return_value={}),
):
await cmd_alert(bot, _msg("!alert add mc-speed minecraft speedrun"))
await asyncio.sleep(0)
assert len(bot.replied) == 1
assert "Alert 'mc-speed' added" in bot.replied[0]
assert "minecraft speedrun" in bot.replied[0]
assert "2 yt" in bot.replied[0]
assert "2 tw" in bot.replied[0]
assert "2 sx" in bot.replied[0]
data = _load(bot, "#test:mc-speed")
assert data is not None
assert data["name"] == "mc-speed"
assert data["keyword"] == "minecraft speedrun"
assert data["channel"] == "#test"
assert len(data["seen"]["yt"]) == 2
assert len(data["seen"]["tw"]) == 2
assert len(data["seen"]["sx"]) == 2
assert "#test:mc-speed" in _pollers
_stop_poller("#test:mc-speed")
await asyncio.sleep(0)
# Allow background seeding task to complete (patches must stay active)
await asyncio.sleep(0.2)
assert len(bot.replied) == 1
assert "Alert 'mc-speed' added" in bot.replied[0]
assert "minecraft speedrun" in bot.replied[0]
data = _load(bot, "#test:mc-speed")
assert data is not None
assert data["name"] == "mc-speed"
assert data["keyword"] == "minecraft speedrun"
assert data["channel"] == "#test"
# Seeding happens in background; verify seen lists populated
assert len(data["seen"]["yt"]) == 2
assert len(data["seen"]["tw"]) == 2
assert len(data["seen"]["sx"]) == 2
assert "#test:mc-speed" in _pollers
_stop_poller("#test:mc-speed")
await asyncio.sleep(0)
asyncio.run(inner())
@@ -590,7 +603,7 @@ class TestCmdAlertAdd:
async def inner():
with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS):
await cmd_alert(bot, _msg("!alert add dupe some keyword"))
await asyncio.sleep(0)
await asyncio.sleep(0.1)
bot.replied.clear()
with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS):
await cmd_alert(bot, _msg("!alert add dupe other keyword"))
@@ -620,16 +633,20 @@ class TestCmdAlertAdd:
backends = {"yt": _fake_yt, "tw": _fake_tw_error, "sx": _fake_sx}
async def inner():
with patch.object(_mod, "_BACKENDS", backends):
with (
patch.object(_mod, "_BACKENDS", backends),
patch.object(_mod, "_fetch_og_batch", return_value={}),
):
await cmd_alert(bot, _msg("!alert add partial test keyword"))
await asyncio.sleep(0)
data = _load(bot, "#test:partial")
assert data is not None
assert len(data["seen"]["yt"]) == 2
assert len(data["seen"]["tw"]) == 0
assert len(data["seen"]["sx"]) == 2
_stop_poller("#test:partial")
await asyncio.sleep(0)
# Allow background seeding task to complete (patches must stay active)
await asyncio.sleep(0.2)
data = _load(bot, "#test:partial")
assert data is not None
assert len(data["seen"]["yt"]) == 2
assert len(data["seen"].get("tw", [])) == 0
assert len(data["seen"]["sx"]) == 2
_stop_poller("#test:partial")
await asyncio.sleep(0)
asyncio.run(inner())
@@ -646,7 +663,7 @@ class TestCmdAlertDel:
async def inner():
with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS):
await cmd_alert(bot, _msg("!alert add todel some keyword"))
await asyncio.sleep(0)
await asyncio.sleep(0.1)
bot.replied.clear()
await cmd_alert(bot, _msg("!alert del todel"))
assert "Removed 'todel'" in bot.replied[0]
@@ -713,10 +730,11 @@ class TestCmdAlertList:
bot = _FakeBot()
_save(bot, "#test:broken", {
"name": "broken", "channel": "#test", "keyword": "test",
"last_error": "Connection refused",
"last_errors": {"yt": "Connection refused"},
})
asyncio.run(cmd_alert(bot, _msg("!alert list")))
assert "broken (error)" in bot.replied[0]
assert "broken" in bot.replied[0]
assert "backend error" in bot.replied[0]
def test_list_requires_channel(self):
_clear()
@@ -809,10 +827,11 @@ class TestCmdAlertCheck:
with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS):
await cmd_alert(bot, _msg("!alert check news"))
# yt2 is new for yt, both tw and sx results are new
announcements = [s for t, s in bot.sent if t == "#test"]
yt_msgs = [m for m in announcements if "/yt]" in m]
tw_msgs = [m for m in announcements if "/tw]" in m]
sx_msgs = [m for m in announcements if "/sx]" in m]
# Metadata (with backend tags) goes to action(), titles to send()
actions = [s for t, s in bot.actions if t == "#test"]
yt_msgs = [m for m in actions if "/yt/" in m]
tw_msgs = [m for m in actions if "/tw/" in m]
sx_msgs = [m for m in actions if "/sx/" in m]
assert len(yt_msgs) == 1 # yt2 only
assert len(tw_msgs) == 2 # both tw results
assert len(sx_msgs) == 2 # both sx results
@@ -846,11 +865,14 @@ class TestPollOnce:
async def inner():
with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS):
await _poll_once(bot, key, announce=True)
messages = [s for t, s in bot.sent if t == "#test"]
assert len(messages) == 6 # 2 yt + 2 tw + 2 sx
assert "[poll/yt]" in messages[0]
assert "[poll/tw]" in messages[2]
assert "[poll/sx]" in messages[4]
# Titles go to send(), metadata goes to action()
titles = [s for t, s in bot.sent if t == "#test"]
actions = [s for t, s in bot.actions if t == "#test"]
assert len(titles) == 6 # 2 yt + 2 tw + 2 sx
assert len(actions) == 6
assert "[poll/yt/" in actions[0]
assert "[poll/tw/" in actions[2]
assert "[poll/sx/" in actions[4]
asyncio.run(inner())
@@ -877,36 +899,6 @@ class TestPollOnce:
asyncio.run(inner())
def test_max_announce_per_platform(self):
"""Only MAX_ANNOUNCE items per platform, then '... and N more'."""
_clear()
bot = _FakeBot()
def fake_many(keyword):
return [
{"id": f"v{i}", "title": f"Video {i}",
"url": f"https://example.com/{i}", "extra": ""}
for i in range(8)
]
data = {
"keyword": "test", "name": "many", "channel": "#test",
"interval": 300, "seen": {"yt": [], "tw": [], "sx": []},
"last_poll": "", "last_error": "",
}
key = "#test:many"
_save(bot, key, data)
_subscriptions[key] = data
async def inner():
with patch.object(_mod, "_BACKENDS", {"yt": fake_many, "tw": _fake_tw}):
await _poll_once(bot, key, announce=True)
yt_msgs = [s for t, s in bot.sent if t == "#test" and "/yt]" in s]
assert len(yt_msgs) == _MAX_ANNOUNCE + 1 # 5 items + "... and 3 more"
assert "... and 3 more" in yt_msgs[-1]
asyncio.run(inner())
def test_partial_backend_failure(self):
"""One backend fails, other still works. Error counter increments."""
_clear()
@@ -925,14 +917,14 @@ class TestPollOnce:
             with patch.object(_mod, "_BACKENDS", backends):
                 await _poll_once(bot, key, announce=True)
             # Twitch and SearX results should still be announced
-            tw_msgs = [s for t, s in bot.sent if t == "#test" and "/tw]" in s]
-            sx_msgs = [s for t, s in bot.sent if t == "#test" and "/sx]" in s]
+            tw_msgs = [s for t, s in bot.actions if t == "#test" and "/tw/" in s]
+            sx_msgs = [s for t, s in bot.actions if t == "#test" and "/sx/" in s]
             assert len(tw_msgs) == 2
             assert len(sx_msgs) == 2
-            # Error counter should be incremented
-            assert _errors[key] == 1
+            # Error counter should be incremented for yt backend
+            assert _errors[key]["yt"] == 1
             updated = _load(bot, key)
-            assert "yt:" in updated["last_error"]
+            assert "yt" in updated.get("last_errors", {})
         asyncio.run(inner())
@@ -1005,7 +997,7 @@ class TestPollOnce:
         async def inner():
             with patch.object(_mod, "_BACKENDS", backends):
                 await _poll_once(bot, key, announce=True)
-            assert _errors[key] == 1
+            assert all(v == 1 for v in _errors[key].values())
             assert len(bot.sent) == 0
         asyncio.run(inner())
@@ -1019,19 +1011,19 @@ class TestPollOnce:
                 "yt": ["yt1", "yt2"], "tw": ["stream:tw1", "vod:tw2"],
                 "sx": ["https://example.com/sx1", "https://example.com/sx2"],
             },
-            "last_poll": "", "last_error": "old error",
+            "last_poll": "", "last_errors": {"yt": "old error"},
         }
         key = "#test:clrerr"
         _save(bot, key, data)
         _subscriptions[key] = data
-        _errors[key] = 3
+        _errors[key] = {"yt": 3, "tw": 3, "sx": 3}
         async def inner():
             with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS):
                 await _poll_once(bot, key, announce=True)
-            assert _errors[key] == 0
+            assert all(v == 0 for v in _errors[key].values())
             updated = _load(bot, key)
-            assert updated["last_error"] == ""
+            assert updated.get("last_errors", {}) == {}
         asyncio.run(inner())
@@ -1222,6 +1214,7 @@ class TestSearchSearx:
         with patch("urllib.request.urlopen", return_value=FakeResp()):
             results = _search_searx("test query")
+        # Same response served for all categories; deduped by URL
         assert len(results) == 3
         assert results[0]["id"] == "https://example.com/sx1"
         assert results[0]["title"] == "SearX Result 1"
@@ -1241,9 +1234,8 @@ class TestSearchSearx:
             results = _search_searx("nothing")
         assert results == []
-    def test_http_error_propagates(self):
-        import pytest
+    def test_http_error_returns_empty(self):
+        """SearXNG catches per-category errors; all failing returns empty."""
         with patch("urllib.request.urlopen", side_effect=ConnectionError("fail")):
-            with pytest.raises(ConnectionError):
-                _search_searx("test")
+            results = _search_searx("test")
+        assert results == []


@@ -31,6 +31,26 @@ class _FakeConn:
         self.sent.append(raw)
+class _FakeState:
+    """In-memory stand-in for bot.state."""
+    def __init__(self):
+        self._store: dict[str, dict[str, str]] = {}
+    def get(self, plugin: str, key: str, default: str | None = None) -> str | None:
+        return self._store.get(plugin, {}).get(key, default)
+    def set(self, plugin: str, key: str, value: str) -> None:
+        self._store.setdefault(plugin, {})[key] = value
+    def delete(self, plugin: str, key: str) -> bool:
+        try:
+            del self._store[plugin][key]
+            return True
+        except KeyError:
+            return False
 class _FakeBot:
     """Minimal bot stand-in."""
@@ -38,6 +58,7 @@ class _FakeBot:
         self.joined: list[str] = []
         self._admin = admin
         self.conn = _FakeConn()
+        self.state = _FakeState()
     def _is_admin(self, message) -> bool:
         return self._admin


@@ -1,6 +1,7 @@
"""Tests for the SOCKS5 proxy HTTP/TCP module."""
import ssl
import urllib.error
import urllib.request
from unittest.mock import MagicMock, patch
@@ -12,20 +13,46 @@ from derp.http import (
     _PROXY_ADDR,
     _PROXY_PORT,
     _get_opener,
+    _get_pool,
     _ProxyHandler,
     build_opener,
     create_connection,
     urlopen,
 )
 @pytest.fixture(autouse=True)
-def _reset_opener_cache():
-    """Clear cached opener between tests."""
+def _reset_caches():
+    """Clear cached opener and pool between tests."""
     derp.http._default_opener = None
+    derp.http._pool = None
     yield
     derp.http._default_opener = None
+    derp.http._pool = None
+# -- Connection pool ---------------------------------------------------------
+class TestConnectionPool:
+    def test_pool_lazy_init(self):
+        assert derp.http._pool is None
+        pool = _get_pool()
+        assert pool is not None
+        assert derp.http._pool is pool
+    def test_pool_cached(self):
+        a = _get_pool()
+        b = _get_pool()
+        assert a is b
+    def test_pool_is_socks_manager(self):
+        from urllib3.contrib.socks import SOCKSProxyManager
+        pool = _get_pool()
+        assert isinstance(pool, SOCKSProxyManager)
+# -- Legacy opener -----------------------------------------------------------
 class TestProxyHandler:
     def test_uses_socks5(self):
         handler = _ProxyHandler()
@@ -103,6 +130,106 @@ class TestOpenerCache:
assert a is not b
# -- urlopen (pooled path) --------------------------------------------------
class TestUrlopen:
@patch.object(derp.http, "_get_pool")
def test_extracts_request_fields(self, mock_pool_fn):
pool = MagicMock()
resp = MagicMock()
resp.status = 200
pool.request.return_value = resp
mock_pool_fn.return_value = pool
req = urllib.request.Request(
"https://example.com/test",
headers={"X-Custom": "val"},
method="POST",
)
req.data = b"body"
urlopen(req, timeout=10)
pool.request.assert_called_once()
call_kw = pool.request.call_args
assert call_kw[0][0] == "POST"
assert call_kw[0][1] == "https://example.com/test"
assert call_kw[1]["body"] == b"body"
@patch.object(derp.http, "_get_pool")
def test_string_url(self, mock_pool_fn):
pool = MagicMock()
resp = MagicMock()
resp.status = 200
pool.request.return_value = resp
mock_pool_fn.return_value = pool
urlopen("https://example.com/")
call_args = pool.request.call_args
assert call_args[0] == ("GET", "https://example.com/")
@patch.object(derp.http, "_get_pool")
def test_raises_http_error_on_4xx(self, mock_pool_fn):
pool = MagicMock()
resp = MagicMock()
resp.status = 404
resp.reason = "Not Found"
resp.headers = {}
resp.read.return_value = b""
pool.request.return_value = resp
mock_pool_fn.return_value = pool
with pytest.raises(urllib.error.HTTPError) as exc_info:
urlopen("https://example.com/missing")
assert exc_info.value.code == 404
@patch.object(derp.http, "_get_pool")
def test_raises_http_error_on_5xx(self, mock_pool_fn):
pool = MagicMock()
resp = MagicMock()
resp.status = 500
resp.reason = "Internal Server Error"
resp.headers = {}
resp.read.return_value = b""
pool.request.return_value = resp
mock_pool_fn.return_value = pool
with pytest.raises(urllib.error.HTTPError) as exc_info:
urlopen("https://example.com/error")
assert exc_info.value.code == 500
@patch.object(derp.http, "_get_pool")
def test_returns_response_on_2xx(self, mock_pool_fn):
pool = MagicMock()
resp = MagicMock()
resp.status = 200
pool.request.return_value = resp
mock_pool_fn.return_value = pool
result = urlopen("https://example.com/")
assert result is resp
@patch.object(derp.http, "_get_pool")
def test_context_falls_back_to_opener(self, mock_pool_fn):
"""Custom SSL context should use legacy opener, not pool."""
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
with patch.object(derp.http, "_get_opener") as mock_opener_fn:
opener = MagicMock()
resp = MagicMock()
opener.open.return_value = resp
mock_opener_fn.return_value = opener
result = urlopen("https://example.com/", context=ctx)
mock_pool_fn.assert_not_called()
mock_opener_fn.assert_called_once_with(ctx)
assert result is resp
# -- create_connection -------------------------------------------------------
class TestCreateConnection:
@patch("derp.http.socks.socksocket")
def test_sets_socks5_proxy(self, mock_cls):
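
The `TestUrlopen` cases in this hunk pin down the pooled path's contract: accept either a URL string or a `urllib.request.Request`, forward method, URL, and body to the pool's `request()`, return the response on success, and raise `urllib.error.HTTPError` for 4xx/5xx. The sketch below is one way to satisfy that contract and is not the code in `derp.http`. The function name `pooled_urlopen`, the explicit `pool` parameter, and the 400 cut-off are assumptions. The pool is mocked, so no urllib3 install or network access is needed.

```python
import io
import urllib.error
import urllib.request
from unittest.mock import MagicMock


def pooled_urlopen(pool, req, timeout=None):
    """Send `req` through `pool` and mimic urllib's error behaviour.

    `pool` is any object with a urllib3-style request(method, url, ...)
    method. This is an illustrative helper, not the project's urlopen().
    """
    if isinstance(req, str):
        method, url, headers, body = "GET", req, {}, None
    else:
        method = req.get_method()
        url = req.full_url
        headers = dict(req.header_items())
        body = req.data
    resp = pool.request(method, url, headers=headers, body=body, timeout=timeout)
    if resp.status >= 400:
        # Callers written against urllib expect HTTPError for 4xx/5xx.
        raise urllib.error.HTTPError(
            url, resp.status, resp.reason, resp.headers, io.BytesIO(resp.read()),
        )
    return resp


# Usage with a mocked pool, mirroring the 404 case in the tests.
pool = MagicMock()
pool.request.return_value = MagicMock(status=404, reason="Not Found", headers={})
pool.request.return_value.read.return_value = b""
try:
    pooled_urlopen(pool, "https://example.com/missing")
    code = None
except urllib.error.HTTPError as exc:
    code = exc.code
```

Translating status codes into `HTTPError` at this layer lets existing callers keep their `except urllib.error.HTTPError` handling when the transport underneath changes.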


@@ -227,8 +227,8 @@ class TestCommandDispatch:
replies = h.sent_privmsgs("#test")
assert len(replies) == 1
assert "Commands:" in replies[0]
assert "!ping" in replies[0]
assert "help" in replies[0]
assert "ping" in replies[0]
def test_unknown_command_ignored(self):
"""Unknown commands produce no reply."""


@@ -0,0 +1,175 @@
"""Tests for Bot.long_reply() paste overflow behaviour."""
import asyncio
import types
from derp.bot import Bot
from derp.irc import Message
from derp.plugin import PluginRegistry
# -- Helpers -----------------------------------------------------------------
def _make_bot(*, paste_threshold: int = 4, flaskpaste_mod=None) -> Bot:
"""Build a Bot with minimal config and a captured send log."""
config = {
"server": {
"host": "localhost", "port": 6667, "tls": False,
"nick": "testbot", "user": "testbot", "realname": "test",
},
"bot": {
"prefix": "!",
"channels": ["#test"],
"plugins_dir": "plugins",
"rate_limit": 100.0,
"rate_burst": 100,
"paste_threshold": paste_threshold,
"admins": [],
},
}
registry = PluginRegistry()
if flaskpaste_mod is not None:
registry._modules["flaskpaste"] = flaskpaste_mod
bot = Bot(config, registry)
bot._sent: list[tuple[str, str]] = [] # type: ignore[attr-defined]
async def _capturing_send(target: str, text: str) -> None:
bot._sent.append((target, text))
bot.send = _capturing_send # type: ignore[assignment]
return bot
def _msg(text: str = "", target: str = "#test", nick: str = "alice") -> Message:
"""Create a channel PRIVMSG."""
return Message(
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
command="PRIVMSG", params=[target, text], tags={},
)
def _pm(text: str = "", nick: str = "alice") -> Message:
"""Create a private PRIVMSG (target = bot nick)."""
return Message(
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
command="PRIVMSG", params=["testbot", text], tags={},
)
def _make_fp_mod(*, paste_url: str | None = "https://paste.example/abc"):
"""Build a fake flaskpaste module with create_paste()."""
mod = types.ModuleType("flaskpaste")
mod.create_paste = lambda bot, content: paste_url # type: ignore[attr-defined]
return mod
# -- Tests -------------------------------------------------------------------
class TestShortReply:
def test_sends_all(self):
"""Lines <= threshold are sent individually, no paste."""
bot = _make_bot(paste_threshold=4)
msg = _msg()
lines = ["line 1", "line 2", "line 3"]
asyncio.run(bot.long_reply(msg, lines))
assert len(bot._sent) == 3
assert bot._sent[0] == ("#test", "line 1")
assert bot._sent[1] == ("#test", "line 2")
assert bot._sent[2] == ("#test", "line 3")
class TestLongReply:
def test_creates_paste(self):
"""Lines > threshold creates paste, sends preview + URL."""
fp = _make_fp_mod(paste_url="https://paste.example/xyz")
bot = _make_bot(paste_threshold=3, flaskpaste_mod=fp)
msg = _msg()
lines = ["line 1", "line 2", "line 3", "line 4", "line 5"]
asyncio.run(bot.long_reply(msg, lines, label="results"))
# preview_count = min(2, threshold-1) = min(2, 2) = 2
assert len(bot._sent) == 3
assert bot._sent[0] == ("#test", "line 1")
assert bot._sent[1] == ("#test", "line 2")
assert "3 more lines" in bot._sent[2][1]
assert "(results)" in bot._sent[2][1]
assert "https://paste.example/xyz" in bot._sent[2][1]
def test_fallback_no_flaskpaste(self):
"""No flaskpaste module loaded -- falls back to sending all lines."""
bot = _make_bot(paste_threshold=2)
msg = _msg()
lines = ["a", "b", "c", "d"]
asyncio.run(bot.long_reply(msg, lines))
assert len(bot._sent) == 4
assert [t for _, t in bot._sent] == ["a", "b", "c", "d"]
def test_fallback_paste_fails(self):
"""create_paste returns None -- falls back to sending all lines."""
fp = _make_fp_mod(paste_url=None)
bot = _make_bot(paste_threshold=2, flaskpaste_mod=fp)
msg = _msg()
lines = ["a", "b", "c"]
asyncio.run(bot.long_reply(msg, lines))
assert len(bot._sent) == 3
assert [t for _, t in bot._sent] == ["a", "b", "c"]
def test_label_in_overflow_message(self):
"""Label appears in the overflow message."""
fp = _make_fp_mod()
bot = _make_bot(paste_threshold=2, flaskpaste_mod=fp)
msg = _msg()
lines = ["a", "b", "c"]
asyncio.run(bot.long_reply(msg, lines, label="history"))
overflow = bot._sent[-1][1]
assert "(history)" in overflow
def test_no_label(self):
"""Overflow message omits label suffix when label is empty."""
fp = _make_fp_mod()
bot = _make_bot(paste_threshold=2, flaskpaste_mod=fp)
msg = _msg()
lines = ["a", "b", "c"]
asyncio.run(bot.long_reply(msg, lines))
overflow = bot._sent[-1][1]
assert "more lines:" in overflow
assert "()" not in overflow
class TestThreshold:
def test_configurable(self):
"""Custom threshold from config controls overflow point."""
fp = _make_fp_mod()
bot = _make_bot(paste_threshold=10, flaskpaste_mod=fp)
msg = _msg()
# 10 lines == threshold -> no paste
lines_at = [f"line {i}" for i in range(10)]
asyncio.run(bot.long_reply(msg, lines_at))
assert len(bot._sent) == 10
def test_over_threshold_pastes(self):
"""Lines exceeding threshold triggers paste."""
fp = _make_fp_mod()
bot = _make_bot(paste_threshold=10, flaskpaste_mod=fp)
msg = _msg()
lines_over = [f"line {i}" for i in range(11)]
asyncio.run(bot.long_reply(msg, lines_over))
assert len(bot._sent) == 3 # 2 preview + overflow msg
class TestEdgeCases:
def test_empty_lines_noop(self):
"""Empty list produces no output."""
bot = _make_bot()
msg = _msg()
asyncio.run(bot.long_reply(msg, []))
assert bot._sent == []
def test_pm_uses_nick(self):
"""Private messages use nick as target."""
bot = _make_bot(paste_threshold=4)
msg = _pm()
lines = ["x", "y"]
asyncio.run(bot.long_reply(msg, lines))
assert len(bot._sent) == 2
assert bot._sent[0] == ("alice", "x")
assert bot._sent[1] == ("alice", "y")
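
Taken together, the tests above describe the overflow rule: replies at or under the threshold are sent line by line; longer replies send a short preview plus one overflow line with the paste link; and if no paste can be created, every line is sent as before. The sketch below is a pure function that plans the outgoing messages under those rules. The name `plan_long_reply`, the exact overflow wording, and passing the paste URL in as an argument are assumptions made for illustration; only the `min(2, threshold - 1)` preview size and the asserted substrings come from the tests.

```python
def plan_long_reply(lines, threshold=4, paste_url=None, label=""):
    """Return the list of messages to send for `lines`.

    `paste_url` is the link a paste service returned, or None when no
    paste is available, in which case every line is sent unchanged.
    """
    if len(lines) <= threshold or paste_url is None:
        return list(lines)
    # Preview size used by the tests; clamped so tiny thresholds stay valid.
    preview_count = max(0, min(2, threshold - 1))
    remaining = len(lines) - preview_count
    suffix = f" ({label})" if label else ""
    overflow = f"... {remaining} more lines{suffix}: {paste_url}"
    return list(lines[:preview_count]) + [overflow]


messages = plan_long_reply(
    ["line 1", "line 2", "line 3", "line 4", "line 5"],
    threshold=3,
    paste_url="https://paste.example/xyz",
    label="results",
)
```

Separating the planning from the sending keeps the threshold arithmetic testable without an event loop or a fake bot.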

1015
tests/test_pastemoni.py Normal file

File diff suppressed because it is too large

477
tests/test_urltitle.py Normal file

@@ -0,0 +1,477 @@
"""Tests for the URL title preview plugin."""
import asyncio
import importlib.util
import sys
import time
from pathlib import Path
from unittest.mock import patch
from derp.irc import Message
# plugins/ is not a Python package -- load the module from file path
_spec = importlib.util.spec_from_file_location(
"plugins.urltitle",
Path(__file__).resolve().parent.parent / "plugins" / "urltitle.py",
)
_mod = importlib.util.module_from_spec(_spec)
sys.modules[_spec.name] = _mod
_spec.loader.exec_module(_mod)
from plugins.urltitle import ( # noqa: E402, I001
_TitleParser,
_check_cooldown,
_clean_url,
_extract_urls,
_fetch_title,
_is_ignored_url,
_seen,
on_privmsg,
)
# -- Helpers -----------------------------------------------------------------
class _FakeBot:
"""Minimal bot stand-in that captures sent messages."""
def __init__(self):
self.sent: list[tuple[str, str]] = []
self.nick = "derp"
self.prefix = "!"
self.config = {
"flaskpaste": {"url": "https://paste.mymx.me"},
"urltitle": {},
}
async def send(self, target: str, text: str) -> None:
self.sent.append((target, text))
def _msg(text: str, nick: str = "alice", target: str = "#test") -> Message:
"""Create a channel PRIVMSG."""
return Message(
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
command="PRIVMSG", params=[target, text], tags={},
)
def _pm(text: str, nick: str = "alice") -> Message:
"""Create a private PRIVMSG."""
return Message(
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
command="PRIVMSG", params=["derp", text], tags={},
)
class _FakeResp:
"""Fake HTTP response for mocking _urlopen."""
def __init__(self, data: bytes = b"", content_type: str = "text/html",
status: int = 200):
self._data = data
self.headers = {"Content-Type": content_type}
self.status = status
def read(self, n: int = -1) -> bytes:
if n == -1:
return self._data
return self._data[:n]
def close(self) -> None:
pass
# ---------------------------------------------------------------------------
# TestExtractUrls
# ---------------------------------------------------------------------------
class TestExtractUrls:
def test_single_url(self):
urls = _extract_urls("check https://example.com please")
assert urls == ["https://example.com"]
def test_multiple_urls(self):
urls = _extract_urls("see https://a.com and http://b.com ok")
assert urls == ["https://a.com", "http://b.com"]
def test_max_limit(self):
text = "https://a.com https://b.com https://c.com https://d.com"
urls = _extract_urls(text, max_urls=2)
assert len(urls) == 2
def test_trailing_punctuation(self):
urls = _extract_urls("visit https://example.com.")
assert urls == ["https://example.com"]
def test_trailing_comma(self):
urls = _extract_urls("https://example.com, check it")
assert urls == ["https://example.com"]
def test_balanced_parens(self):
urls = _extract_urls("https://en.wikipedia.org/wiki/Foo_(bar)")
assert urls == ["https://en.wikipedia.org/wiki/Foo_(bar)"]
def test_unbalanced_paren_stripped(self):
urls = _extract_urls("(https://example.com)")
assert urls == ["https://example.com"]
def test_suppressed_url(self):
urls = _extract_urls("!https://example.com")
assert urls == []
def test_suppressed_mixed(self):
urls = _extract_urls("!https://skip.com https://keep.com")
assert urls == ["https://keep.com"]
def test_no_urls(self):
urls = _extract_urls("no urls here")
assert urls == []
def test_dedup(self):
urls = _extract_urls("https://a.com https://a.com")
assert urls == ["https://a.com"]
# ---------------------------------------------------------------------------
# TestCleanUrl
# ---------------------------------------------------------------------------
class TestCleanUrl:
def test_no_trailing(self):
assert _clean_url("https://example.com") == "https://example.com"
def test_strip_period(self):
assert _clean_url("https://example.com.") == "https://example.com"
def test_strip_semicolon(self):
assert _clean_url("https://example.com;") == "https://example.com"
def test_preserve_balanced_parens(self):
url = "https://en.wikipedia.org/wiki/Foo_(bar)"
assert _clean_url(url) == url
def test_strip_trailing_paren_unbalanced(self):
assert _clean_url("https://example.com)") == "https://example.com"
def test_multiple_trailing(self):
assert _clean_url("https://example.com..;") == "https://example.com"
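
The `TestCleanUrl` cases above describe two rules: trailing sentence punctuation is stripped, and a trailing `)` is stripped only when it has no matching `(` earlier in the URL. A minimal sketch of one way to implement that, assuming a hypothetical `clean_url` helper and an assumed punctuation set (the plugin's own `_clean_url` may differ):

```python
_TRAILING = ".,;:!?"  # assumed set of punctuation to strip


def clean_url(url: str) -> str:
    """Strip trailing punctuation and any unmatched closing parenthesis."""
    while url:
        last = url[-1]
        if last in _TRAILING:
            url = url[:-1]
        elif last == ")" and url.count(")") > url.count("("):
            # More closers than openers: this ')' belongs to the prose.
            url = url[:-1]
        else:
            break
    return url
```

Counting parentheses rather than always stripping `)` is what keeps Wikipedia-style URLs such as `.../Foo_(bar)` intact.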
# ---------------------------------------------------------------------------
# TestTitleParser
# ---------------------------------------------------------------------------
class TestTitleParser:
def test_og_title_priority(self):
parser = _TitleParser()
parser.feed("""
<html><head>
<meta property="og:title" content="OG Title">
<title>Page Title</title>
</head></html>
""")
assert parser.best_title == "OG Title"
def test_title_fallback(self):
parser = _TitleParser()
parser.feed("<html><head><title>Fallback Title</title></head></html>")
assert parser.best_title == "Fallback Title"
def test_og_description(self):
parser = _TitleParser()
parser.feed("""
<meta property="og:description" content="OG Desc">
<meta name="description" content="Meta Desc">
""")
assert parser.best_description == "OG Desc"
def test_meta_description_fallback(self):
parser = _TitleParser()
parser.feed('<meta name="description" content="Meta Desc">')
assert parser.best_description == "Meta Desc"
def test_whitespace_collapse(self):
parser = _TitleParser()
parser.feed("<title> Hello World </title>")
assert parser.title == "Hello World"
def test_no_title(self):
parser = _TitleParser()
parser.feed("<html><body>No title here</body></html>")
assert parser.best_title == ""
def test_multipart_title(self):
parser = _TitleParser()
parser.feed("<title>Part 1 <em>Part 2</em> Part 3</title>")
# The parser collects text data; <em> triggers start/end but
# its text is still captured by handle_data
assert "Part 1" in parser.title
def test_empty_og_title(self):
parser = _TitleParser()
parser.feed("""
<meta property="og:title" content="">
<title>Real Title</title>
""")
assert parser.best_title == "Real Title"
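
The `TestTitleParser` cases above imply a small precedence scheme: `og:title` wins over `<title>` unless it is empty, `og:description` wins over the plain meta description, and whitespace in the title is collapsed. The sketch below shows how such a parser could be built on the standard library's `html.parser.HTMLParser`. The class name `TitleParser` and its internal fields are assumptions; only the `title`, `best_title`, and `best_description` attribute names come from the tests.

```python
from html.parser import HTMLParser


class TitleParser(HTMLParser):
    """Collect <title>, og:title, and description metadata."""

    def __init__(self) -> None:
        super().__init__()
        self._in_title = False
        self._title_parts: list[str] = []
        self.og_title = ""
        self.og_description = ""
        self.meta_description = ""

    def handle_starttag(self, tag, attrs):
        if tag == "title":
            self._in_title = True
        elif tag == "meta":
            a = {k: (v or "") for k, v in attrs}
            content = a.get("content", "")
            if a.get("property") == "og:title":
                self.og_title = content
            elif a.get("property") == "og:description":
                self.og_description = content
            elif a.get("name", "").lower() == "description":
                self.meta_description = content

    def handle_endtag(self, tag):
        if tag == "title":
            self._in_title = False

    def handle_data(self, data):
        if self._in_title:
            self._title_parts.append(data)

    @property
    def title(self) -> str:
        # Collapse whitespace runs so the title fits on one IRC line.
        return " ".join("".join(self._title_parts).split())

    @property
    def best_title(self) -> str:
        return self.og_title or self.title

    @property
    def best_description(self) -> str:
        return self.og_description or self.meta_description
```

Using `or` for the fallbacks means an empty `og:title` is treated the same as a missing one.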
# ---------------------------------------------------------------------------
# TestIsIgnoredUrl
# ---------------------------------------------------------------------------
class TestIsIgnoredUrl:
def test_paste_host(self):
assert _is_ignored_url(
"https://paste.mymx.me/abc", {"paste.mymx.me"},
) is True
def test_image_extension(self):
assert _is_ignored_url(
"https://example.com/photo.png", set(),
) is True
def test_pdf_extension(self):
assert _is_ignored_url(
"https://example.com/doc.pdf", set(),
) is True
def test_zip_extension(self):
assert _is_ignored_url(
"https://example.com/archive.zip", set(),
) is True
def test_normal_url_passes(self):
assert _is_ignored_url(
"https://example.com/page", set(),
) is False
def test_html_extension_passes(self):
assert _is_ignored_url(
"https://example.com/page.html", set(),
) is False
def test_custom_ignore_host(self):
assert _is_ignored_url(
"https://private.local/x", {"private.local"},
) is True
# ---------------------------------------------------------------------------
# TestFetchTitle
# ---------------------------------------------------------------------------
class TestFetchTitle:
def test_successful_html(self):
html = b"<html><head><title>Test Page</title></head></html>"
head_resp = _FakeResp(b"", content_type="text/html")
get_resp = _FakeResp(html, content_type="text/html; charset=utf-8")
calls = iter([head_resp, get_resp])
with patch.object(_mod, "_urlopen", side_effect=lambda *a, **kw: next(calls)):
title, desc = _fetch_title("https://example.com")
assert title == "Test Page"
def test_non_html_content_type_bails(self):
head_resp = _FakeResp(b"", content_type="application/json")
with patch.object(_mod, "_urlopen", return_value=head_resp):
title, desc = _fetch_title("https://example.com/api")
assert title == ""
assert desc == ""
def test_head_fail_falls_through_to_get(self):
html = b"<html><head><title>Recovered</title></head></html>"
get_resp = _FakeResp(html, content_type="text/html")
def side_effect(req, **kw):
if req.get_method() == "HEAD":
raise ConnectionError("HEAD not supported")
return get_resp
with patch.object(_mod, "_urlopen", side_effect=side_effect):
title, desc = _fetch_title("https://example.com")
assert title == "Recovered"
def test_network_error_returns_empty(self):
with patch.object(_mod, "_urlopen", side_effect=ConnectionError("fail")):
title, desc = _fetch_title("https://example.com")
assert title == ""
assert desc == ""
def test_og_tags_extracted(self):
html = (
b'<html><head>'
b'<meta property="og:title" content="OG Title">'
b'<meta property="og:description" content="OG Desc">'
b'</head></html>'
)
head_resp = _FakeResp(b"", content_type="text/html")
get_resp = _FakeResp(html, content_type="text/html")
calls = iter([head_resp, get_resp])
with patch.object(_mod, "_urlopen", side_effect=lambda *a, **kw: next(calls)):
title, desc = _fetch_title("https://example.com")
assert title == "OG Title"
assert desc == "OG Desc"
def test_get_non_html_bails(self):
"""HEAD returns html but GET returns non-html (redirect to binary)."""
head_resp = _FakeResp(b"", content_type="text/html")
get_resp = _FakeResp(b"\x89PNG", content_type="image/png")
calls = iter([head_resp, get_resp])
with patch.object(_mod, "_urlopen", side_effect=lambda *a, **kw: next(calls)):
title, desc = _fetch_title("https://example.com/img")
assert title == ""
# ---------------------------------------------------------------------------
# TestCooldown
# ---------------------------------------------------------------------------
class TestCooldown:
def setup_method(self):
_seen.clear()
def test_first_access_not_cooled(self):
assert _check_cooldown("https://a.com", 300) is False
def test_second_access_within_window(self):
_check_cooldown("https://b.com", 300)
assert _check_cooldown("https://b.com", 300) is True
def test_after_cooldown_expires(self):
_seen["https://c.com"] = time.monotonic() - 400
assert _check_cooldown("https://c.com", 300) is False
def test_pruning(self):
"""Cache is pruned when it exceeds max size."""
old = time.monotonic() - 600
for i in range(600):
_seen[f"https://stale-{i}.com"] = old
_check_cooldown("https://new.com", 300)
assert len(_seen) < 600
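
The `TestCooldown` cases above describe a check-and-record helper: the first sighting of a URL returns `False` and records a timestamp, a repeat inside the window returns `True`, and stale entries are pruned once the cache grows large. A minimal sketch under those assumptions follows. The names `check_cooldown` and `seen` and the cap of 500 entries are hypothetical; the plugin's real limit is not shown in this diff.

```python
import time

_MAX_SEEN = 500  # assumed cap; the plugin's actual limit is not shown
seen: dict[str, float] = {}


def check_cooldown(url: str, cooldown: float) -> bool:
    """Return True if `url` was seen within the last `cooldown` seconds.

    Otherwise record the URL as seen now and return False.
    """
    now = time.monotonic()
    last = seen.get(url)
    if last is not None and now - last < cooldown:
        return True
    if len(seen) >= _MAX_SEEN:
        # Drop entries whose cooldown window has already passed.
        for stale in [u for u, ts in seen.items() if now - ts >= cooldown]:
            del seen[stale]
    seen[url] = now
    return False
```

`time.monotonic()` is used instead of wall-clock time so that system clock adjustments cannot shorten or extend the window.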
# ---------------------------------------------------------------------------
# TestOnPrivmsg
# ---------------------------------------------------------------------------
class TestOnPrivmsg:
def setup_method(self):
_seen.clear()
def test_channel_url_previewed(self):
bot = _FakeBot()
html = b"<html><head><title>Example</title></head></html>"
head_resp = _FakeResp(b"", content_type="text/html")
get_resp = _FakeResp(html, content_type="text/html")
calls = iter([head_resp, get_resp])
def inner():
with patch.object(_mod, "_urlopen",
side_effect=lambda *a, **kw: next(calls)):
asyncio.run(on_privmsg(bot, _msg("check https://example.com")))
inner()
assert len(bot.sent) == 1
assert bot.sent[0][0] == "#test"
assert "\u21b3 Example" in bot.sent[0][1]
def test_pm_ignored(self):
bot = _FakeBot()
asyncio.run(on_privmsg(bot, _pm("https://example.com")))
assert len(bot.sent) == 0
def test_bot_nick_ignored(self):
bot = _FakeBot()
asyncio.run(on_privmsg(bot, _msg("https://example.com", nick="derp")))
assert len(bot.sent) == 0
def test_command_ignored(self):
bot = _FakeBot()
asyncio.run(on_privmsg(bot, _msg("!shorten https://example.com")))
assert len(bot.sent) == 0
def test_suppressed_url(self):
bot = _FakeBot()
asyncio.run(on_privmsg(bot, _msg("!https://example.com")))
assert len(bot.sent) == 0
def test_paste_host_ignored(self):
bot = _FakeBot()
asyncio.run(on_privmsg(
bot, _msg("https://paste.mymx.me/some-paste"),
))
assert len(bot.sent) == 0
def test_empty_title_skipped(self):
bot = _FakeBot()
html = b"<html><body>No title here</body></html>"
head_resp = _FakeResp(b"", content_type="text/html")
get_resp = _FakeResp(html, content_type="text/html")
calls = iter([head_resp, get_resp])
with patch.object(_mod, "_urlopen",
side_effect=lambda *a, **kw: next(calls)):
asyncio.run(on_privmsg(bot, _msg("https://notitle.com")))
assert len(bot.sent) == 0
def test_image_url_skipped(self):
bot = _FakeBot()
asyncio.run(on_privmsg(
bot, _msg("https://example.com/photo.png"),
))
assert len(bot.sent) == 0
def test_title_with_description(self):
bot = _FakeBot()
html = (
b'<html><head>'
b'<title>My Page</title>'
b'<meta name="description" content="A great page">'
b'</head></html>'
)
head_resp = _FakeResp(b"", content_type="text/html")
get_resp = _FakeResp(html, content_type="text/html")
calls = iter([head_resp, get_resp])
with patch.object(_mod, "_urlopen",
side_effect=lambda *a, **kw: next(calls)):
asyncio.run(on_privmsg(bot, _msg("https://example.com")))
assert len(bot.sent) == 1
assert "My Page -- A great page" in bot.sent[0][1]
def test_cooldown_prevents_repeat(self):
bot = _FakeBot()
html = b"<html><head><title>Example</title></head></html>"
def make_calls():
return iter([
_FakeResp(b"", content_type="text/html"),
_FakeResp(html, content_type="text/html"),
])
calls = make_calls()
with patch.object(_mod, "_urlopen",
side_effect=lambda *a, **kw: next(calls)):
asyncio.run(on_privmsg(bot, _msg("https://cooldown.com")))
assert len(bot.sent) == 1
bot.sent.clear()
# Same URL again -- should be suppressed by cooldown
calls = make_calls()
with patch.object(_mod, "_urlopen",
side_effect=lambda *a, **kw: next(calls)):
asyncio.run(on_privmsg(bot, _msg("https://cooldown.com")))
assert len(bot.sent) == 0