Compare commits


4 Commits

Author SHA1 Message Date
user
e3bb793574 feat: add canary, tcping, archive, resolve plugins
canary: generate realistic fake credentials (token/aws/basic) for
planting as canary tripwires. Per-channel state persistence.

tcping: TCP connect latency probe through SOCKS5 proxy with
min/avg/max reporting. Proxy-compatible alternative to traceroute.

archive: save URLs to Wayback Machine via Save Page Now API,
routed through SOCKS5 proxy.

resolve: bulk DNS resolution (up to 10 hosts) via TCP DNS through
SOCKS5 proxy with concurrent asyncio.gather.

83 new tests (1010 total), docs updated.
2026-02-20 19:38:10 +01:00
user
7c40a6b7f1 fix: switch youtube innertube to ANDROID client (WEB blocked)
YouTube's InnerTube /player endpoint now returns LOGIN_REQUIRED for the
WEB client. Switch to ANDROID client context which still returns full
videoDetails. Fixes missing duration in announcements and broken channel
resolution from video URLs.

Extract shared _innertube_player() helper to deduplicate payload
construction between _resolve_via_innertube and _fetch_duration.
2026-02-20 19:38:01 +01:00
user
3de3f054df feat: add internetdb plugin (Shodan InternetDB host recon)
Free, keyless API returning open ports, hostnames, CPEs, tags, and
known CVEs for any public IP. All requests routed through SOCKS5.
21 test cases (927 total).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-20 17:41:51 +01:00
user
442fea703c feat: replace MaxMind ASN with iptoasn.com TSV backend
Drop GeoLite2-ASN.mmdb dependency (required license key) in favor of
iptoasn.com ip2asn-v4.tsv (no auth, public domain).  Bisect-based
lookup in pure stdlib, downloaded via SOCKS5 in update-data.sh.
Adds 30 test cases covering load, lookup, and command handler.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 20:43:00 +01:00
18 changed files with 2472 additions and 82 deletions

View File

@@ -1,6 +1,38 @@
# derp - Tasks
## Current Sprint -- v1.3.0 Tier 2 Plugins (2026-02-20)
| Pri | Status | Task |
|-----|--------|------|
| P0 | [x] | Canary token generator (`plugins/canary.py`) -- gen/list/info/del |
| P0 | [x] | TCP ping (`plugins/tcping.py`) -- latency probe via SOCKS5 |
| P0 | [x] | Wayback archive (`plugins/archive.py`) -- Save Page Now via SOCKS5 |
| P0 | [x] | Bulk DNS resolve (`plugins/resolve.py`) -- concurrent TCP DNS via SOCKS5 |
| P1 | [x] | Tests for all 4 plugins |
| P2 | [x] | Documentation update (USAGE.md, CHEATSHEET.md) |
## Previous Sprint -- v1.2.9 InternetDB Plugin (2026-02-19)
| Pri | Status | Task |
|-----|--------|------|
| P0 | [x] | Shodan InternetDB plugin (`plugins/internetdb.py`) -- free, no API key |
| P0 | [x] | Fetch via SOCKS5 proxy (`derp.http.urlopen`) |
| P1 | [x] | Compact formatting: hostnames, ports, CPEs, tags, CVEs with truncation |
| P1 | [x] | Input validation: IPv4/IPv6, private/loopback rejection |
| P2 | [x] | Tests: fetch, format, command handler (21 cases, 927 total) |
| P2 | [x] | Documentation update (USAGE.md, CHEATSHEET.md) |
## Previous Sprint -- v1.2.8 ASN Backend Replacement (2026-02-19)
| Pri | Status | Task |
|-----|--------|------|
| P0 | [x] | Replace MaxMind ASN with iptoasn.com TSV backend (no license key) |
| P0 | [x] | Bisect-based lookup in `plugins/asn.py` (pure stdlib) |
| P1 | [x] | `update_asn()` in `scripts/update-data.sh` (SOCKS5 download) |
| P2 | [x] | Tests: load, lookup, command handler (30 cases, 906 total) |
| P2 | [x] | Documentation update (USAGE.md data directory layout) |
## Previous Sprint -- v1.2.7 Subscription Plugin Enrichment (2026-02-19)
| Pri | Status | Task |
|-----|--------|------|

71
TODO.md
View File

@@ -9,15 +9,72 @@
- [ ] Webhook listener (HTTP endpoint for push events to channels)
- [ ] Granular ACLs (per-command: trusted, operator, admin)
## LLM Bridge
Goal: let an LLM agent interact with the bot over IRC in real-time,
with full machine access (bash, file ops, etc.).
### Architecture
Owner addresses the bot on IRC. A bridge daemon reads addressed
messages, feeds them to an LLM with tool access, and writes replies
back through the bot. The bot already has `owner` config
(`[bot] owner` hostmask patterns) to gate who can trigger LLM
interactions.
```
IRC -> bot stdout (addressed msgs) -> bridge -> LLM API -> bridge -> bot inbox -> IRC
```
### Approach options (ranked)
1. **Claude Code Agent SDK** (clean, non-trivial)
- Custom Python agent using `anthropic` SDK with `tool_use`
- Define tools: bash exec, file read/write, web fetch
- Persistent conversation memory across messages
- Full control over the event loop -- real-time IRC is natural
- Tradeoff: must implement and maintain tool definitions
2. **Claude Code CLI per-message** (simple, stateless)
- `echo "user said X" | claude --print --allowedTools bash,read,write`
- Each invocation is a cold start with no conversation memory
- Simple to implement but slow startup, no multi-turn context
- Could pass conversation history via system prompt (fragile)
3. **Persistent Claude Code subprocess** (hack, fragile)
- Long-running `claude` process with stdin/stdout piped
- Keeps context across messages within a session
- Not designed for this -- output parsing is brittle
- Session may drift or hit context limits
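Approach 2 above can be sketched as a thin subprocess wrapper. The `claude` flags are quoted from the bullet list; the helper name and the injectable `cmd` parameter are hypothetical (the latter exists only so the sketch can be exercised without the real CLI):

```python
import subprocess

def ask_llm(prompt: str,
            cmd: tuple[str, ...] = ("claude", "--print",
                                    "--allowedTools", "bash,read,write"),
            timeout: int = 120) -> str:
    """One-shot, stateless LLM call (approach 2): cold start, no memory.

    `cmd` is parameterized only for testing; in the bridge it would
    stay fixed to the claude CLI invocation.
    """
    proc = subprocess.run(
        list(cmd), input=prompt,
        capture_output=True, text=True, timeout=timeout,
    )
    return proc.stdout.strip()
```

Each call pays the CLI startup cost and carries no multi-turn context, which is exactly the tradeoff noted above.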
### Bot-side plumbing needed
- `--llm` CLI flag: route logging to file, stdout for addressed msgs
- `_is_addressed()`: DM or nick-prefixed messages
- `_is_owner()`: only owner hostmasks trigger LLM routing
- Inbox file polling (`/tmp/llm-inbox`): bridge writes `<target> <msg>`
- `llm-send` script: line splitting (400 char), FlaskPaste overflow
- Stdout format: `HH:MM [#chan] <nick> text` / `HH:MM --- status`
- Only LLM-originated replies echoed to stdout (not all bot output)
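The inbox-polling step above can be sketched as a drain helper, assuming the path and `<target> <msg>` line format listed (function name hypothetical):

```python
from pathlib import Path

INBOX = Path("/tmp/llm-inbox")

def drain_inbox(path: Path = INBOX) -> list[tuple[str, str]]:
    """Read and truncate the inbox; each line is '<target> <msg>'."""
    if not path.exists():
        return []
    lines = path.read_text(encoding="utf-8").splitlines()
    path.write_text("", encoding="utf-8")  # consume what we read
    out = []
    for line in lines:
        target, _, msg = line.partition(" ")
        if target and msg:  # skip malformed lines
            out.append((target, msg))
    return out
```

A real bot loop would call this on a timer and relay each `(target, msg)` through its IRC connection, with `llm-send` handling the 400-char line splitting.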
### Previous attempt (reverted)
The `--llm` mode was implemented and tested (commit ea6f079, reverted
in 6f1f4b2). The stdout/stdin plumbing worked but Claude Code CLI
cannot act as a real-time daemon -- each tool call is a blocking
round-trip, making interactive IRC conversation impractical. The code
is preserved in git history for reference.
## Plugins -- Security/OSINT
- [x] `emailcheck` -- SMTP VRFY/RCPT TO verification
- [x] `canary` -- canary token generator/tracker
- [x] `virustotal` -- hash/URL/IP/domain lookup (free API)
- [x] `abuseipdb` -- IP abuse confidence scoring (free tier)
- [x] `jwt` -- decode tokens, show claims/expiry, flag weaknesses
- [x] `mac` -- OUI vendor lookup (local IEEE database)
- [x] `pastemoni` -- monitor paste sites for keywords
- [x] `internetdb` -- Shodan InternetDB host recon (free, no API key)
## Plugins -- Utility

View File

@@ -134,6 +134,25 @@ SASL auto-added when sasl_user/sasl_pass configured.
!unload <plugin> # Remove a plugin (admin)
```
## Archive
```
!archive https://example.com/page # Save to Wayback Machine
```
URL must have `http://` or `https://` scheme. 30s timeout. SOCKS5-proxied.
## Bulk DNS
```
!resolve example.com github.com # A records (concurrent)
!resolve example.com AAAA # Specific type
!resolve 1.2.3.4 8.8.8.8 # Auto PTR for IPs
```
Max 10 hosts. Types: A, AAAA, MX, NS, TXT, CNAME, PTR, SOA.
TCP DNS via SOCKS5, server 1.1.1.1.
## Recon
```
@@ -224,9 +243,26 @@ Categories: sqli, xss, ssti, lfi, cmdi, xxe
!refang hxxps[://]evil[.]com # Refang IOC
```
## Canary Tokens
```
!canary gen db-cred # 40-char hex token (default)
!canary gen aws staging-key # AWS AKIA keypair
!canary gen basic svc-login # user:pass pair
!canary list # List channel canaries
!canary info db-cred # Show full token
!canary del db-cred # Delete canary (admin)
```
Types: `token` (hex), `aws` (AKIA+secret), `basic` (user:pass).
Max 50/channel. `gen`/`del` admin only. Persists across restarts.
## Network
```
!tcping example.com # TCP latency (port 443, 3 probes)
!tcping example.com 22 # Custom port
!tcping example.com 80 5 # Custom port + count (max 10)
!cidr 10.0.0.0/24 # Subnet info
!cidr contains 10.0.0.0/8 10.1.2.3 # Membership check
!portcheck 10.0.0.1 # Scan common ports
@@ -237,9 +273,10 @@ Categories: sqli, xss, ssti, lfi, cmdi, xxe
!blacklist 1.2.3.4 # DNSBL reputation check
```
## Intelligence (local databases + APIs)
```
!internetdb 8.8.8.8 # Shodan InternetDB: ports, CVEs, CPEs, tags
!geoip 8.8.8.8 # GeoIP: city, country, coords, tz
!asn 8.8.8.8 # ASN: number + organization
!tor 1.2.3.4 # Check Tor exit node

View File

@@ -133,6 +133,11 @@ format = "text" # Log format: "text" (default) or "json"
| `!abuse <ip> report <cats> <comment>` | Report IP to AbuseIPDB (admin) |
| `!vt <hash\|ip\|domain\|url>` | VirusTotal lookup |
| `!emailcheck <email> [email2 ...]` | SMTP email verification (admin) |
| `!internetdb <ip>` | Shodan InternetDB host recon (ports, CVEs, CPEs) |
| `!canary <gen\|list\|info\|del>` | Canary token generator/tracker |
| `!tcping <host> [port] [count]` | TCP connect latency probe via SOCKS5 |
| `!archive <url>` | Save URL to Wayback Machine |
| `!resolve <host> [host2 ...] [type]` | Bulk DNS resolution via TCP/SOCKS5 |
| `!shorten <url>` | Shorten a URL via FlaskPaste |
| `!pastemoni <add\|del\|list\|check>` | Paste site keyword monitoring |
@@ -935,6 +940,125 @@ Polling and announcements:
- `list` shows keyword and per-backend error counts
- `check` forces an immediate poll across all backends
### `!internetdb` -- Shodan InternetDB
Look up host information from Shodan's free InternetDB API. Returns open ports,
reverse hostnames, CPE software fingerprints, tags, and known CVEs. No API key
required.
```
!internetdb 8.8.8.8
```
Output format:
```
8.8.8.8 -- dns.google | Ports: 53, 443 | CPEs: cpe:/a:isc:bind | Tags: cloud
```
- Single IP per query (IPv4 or IPv6)
- Private/loopback addresses are rejected
- Hostnames truncated to first 5; CVEs truncated to first 10 (with `+N more`)
- CPEs truncated to first 8
- All requests routed through SOCKS5 proxy
- Returns "no data available" for IPs not in the InternetDB index
### `!canary` -- Canary Token Generator
Generate realistic-looking credentials for planting as canary tokens (tripwires
for detecting unauthorized access). Tokens are persisted per-channel.
```
!canary gen db-cred Generate default token (40-char hex)
!canary gen aws staging-key AWS-style keypair
!canary gen basic svc-login Username:password pair
!canary list List canaries in channel
!canary info db-cred Show full token details
!canary del db-cred Delete a canary (admin)
```
Token types:
| Type | Format | Example |
|------|--------|---------|
| `token` | 40-char hex (API key / SHA1) | `a3f8b2c1d4e5...` |
| `aws` | AKIA access key + base64 secret | `AKIA7X9M2PVL5N...` |
| `basic` | user:pass pair | `svcadmin:xK9mP2vL5nR8wQ3z` |
- `gen` and `del` require admin privileges
- All subcommands must be used in a channel (not PM)
- Labels: 1-32 chars, alphanumeric + hyphens + underscores
- Maximum 50 canaries per channel
- Persisted via `bot.state` (survives restarts)
### `!tcping` -- TCP Connect Latency Probe
Measure TCP connect latency to a host:port through the SOCKS5 proxy. Sequential
probes with min/avg/max summary.
```
!tcping example.com Port 443, 3 probes
!tcping example.com 22 Port 22, 3 probes
!tcping example.com 80 5 Port 80, 5 probes
```
Output format:
```
tcping example.com:443 -- 3 probes 1: 45ms 2: 43ms 3: 47ms min/avg/max: 43/45/47 ms
```
- Default port: 443, default count: 3
- Max count: 10, timeout: 10s per probe
- Private/reserved addresses rejected
- Routed through SOCKS5 proxy
### `!archive` -- Wayback Machine Save
Save a URL to the Wayback Machine via the Save Page Now API.
```
!archive https://example.com/page
```
Output format:
```
Archiving https://example.com/page...
Archived: https://web.archive.org/web/20260220.../https://example.com/page
```
- URL must start with `http://` or `https://`
- Timeout: 30s (archiving can be slow)
- Handles 429 rate limit, 523 origin unreachable
- Sends acknowledgment before archiving
- Routed through SOCKS5 proxy
### `!resolve` -- Bulk DNS Resolution
Resolve multiple hosts via TCP DNS through the SOCKS5 proxy. Concurrent
resolution with compact output.
```
!resolve example.com github.com A records (default)
!resolve example.com AAAA Specific record type
!resolve 1.2.3.4 8.8.8.8 Auto PTR for IPs
```
Output format:
```
example.com -> 93.184.216.34
github.com -> 140.82.121.3
badhost.invalid -> NXDOMAIN
```
- Max 10 hosts per invocation
- Default type: A (auto-detect IP -> PTR)
- DNS server: 1.1.1.1 (Cloudflare)
- Concurrent via `asyncio.gather()`
- Valid types: A, NS, CNAME, SOA, PTR, MX, TXT, AAAA
### FlaskPaste Configuration
```toml

105
plugins/archive.py Normal file
View File

@@ -0,0 +1,105 @@
"""Plugin: Wayback Machine Save Page Now (SOCKS5-proxied)."""
from __future__ import annotations
import asyncio
import logging
import urllib.error
import urllib.request
from derp.http import urlopen as _urlopen
from derp.plugin import command
log = logging.getLogger(__name__)
_SAVE_URL = "https://web.archive.org/save/"
_TIMEOUT = 30
_USER_AGENT = "derp/1.0"
def _save_page(url: str) -> dict:
"""Blocking POST to Save Page Now. Returns result dict."""
target = f"{_SAVE_URL}{url}"
req = urllib.request.Request(
target,
headers={"User-Agent": _USER_AGENT},
)
try:
resp = _urlopen(req, timeout=_TIMEOUT)
# The save endpoint returns a redirect to the archived page.
# With urllib3 pooled requests, redirects are followed automatically.
final_url = getattr(resp, "geturl", lambda: None)()
headers = resp.headers if hasattr(resp, "headers") else {}
# Check for Content-Location or Link header with archived URL
content_location = None
if hasattr(headers, "get"):
content_location = headers.get("Content-Location", "")
link = headers.get("Link", "")
else:
content_location = ""
link = ""
resp.read()
# Try Content-Location first (most reliable)
if content_location and "/web/" in content_location:
if content_location.startswith("/"):
return {"url": f"https://web.archive.org{content_location}"}
return {"url": content_location}
# Try final URL after redirects
if final_url and "/web/" in final_url:
return {"url": final_url}
# Try Link header
if link and "/web/" in link:
# Extract URL from Link header: <url>; rel="memento"
for part in link.split(","):
part = part.strip()
if "/web/" in part and "<" in part:
extracted = part.split("<", 1)[1].split(">", 1)[0]
return {"url": extracted}
# If we got a 200 but no archive URL, report success without link
return {"url": f"https://web.archive.org/web/*/{url}"}
except urllib.error.HTTPError as exc:
if exc.code == 429:
return {"error": "rate limited -- try again later"}
if exc.code == 523:
return {"error": "origin unreachable"}
return {"error": f"HTTP {exc.code}"}
except (TimeoutError, OSError) as exc:
return {"error": f"network error: {exc}"}
except Exception as exc:
return {"error": str(exc)[:100]}
@command("archive", help="Save to Wayback Machine: !archive <url>")
async def cmd_archive(bot, message):
"""Save a URL to the Wayback Machine via Save Page Now.
Usage:
!archive https://example.com/page
"""
parts = message.text.split(None, 1)
if len(parts) < 2:
await bot.reply(message, "Usage: !archive <url>")
return
url = parts[1].strip()
if not url.startswith(("http://", "https://")):
await bot.reply(message, "URL must start with http:// or https://")
return
await bot.reply(message, f"Archiving {url}...")
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, _save_page, url)
if "error" in result:
await bot.reply(message, f"Archive failed: {result['error']}")
else:
await bot.reply(message, f"Archived: {result['url']}")

View File

@@ -1,41 +1,112 @@
"""Plugin: ASN lookup using iptoasn.com TSV database."""
from __future__ import annotations
import ipaddress
import logging
import struct
from bisect import bisect_right
from pathlib import Path
from derp.plugin import command
log = logging.getLogger(__name__)
_DB_PATH = Path("data/ip2asn-v4.tsv")
# Sorted parallel arrays populated by _load_db():
# _starts[i] = start IP as 32-bit int
# _ends[i] = end IP as 32-bit int
# _asns[i] = "AS<number>"
# _countries[i] = two-letter country code
# _orgs[i] = AS description string
_starts: list[int] = []
_ends: list[int] = []
_asns: list[str] = []
_countries: list[str] = []
_orgs: list[str] = []
_loaded = False
def _ip_to_int(addr: str) -> int:
"""Convert dotted-quad IPv4 string to 32-bit unsigned integer."""
return struct.unpack("!I", ipaddress.IPv4Address(addr).packed)[0]
def _load_db(path: Path | None = None) -> bool:
"""Load the iptoasn TSV into sorted arrays.
Returns True if loaded successfully, False otherwise.
Rows with ASN 0 ("Not routed") are skipped.
"""
global _loaded
p = path or _DB_PATH
if not p.is_file():
log.warning("asn: %s not found (run update-data)", p)
return False
starts: list[int] = []
ends: list[int] = []
asns: list[str] = []
countries: list[str] = []
orgs: list[str] = []
with open(p, encoding="utf-8", errors="replace") as fh:
for line in fh:
line = line.strip()
if not line or line.startswith("#"):
continue
parts = line.split("\t")
if len(parts) < 5:
continue
asn_num = parts[2]
if asn_num == "0":
continue
try:
start = _ip_to_int(parts[0])
end = _ip_to_int(parts[1])
except (ValueError, struct.error):
continue
starts.append(start)
ends.append(end)
asns.append(f"AS{asn_num}")
countries.append(parts[3])
orgs.append(parts[4])
_starts.clear()
_ends.clear()
_asns.clear()
_countries.clear()
_orgs.clear()
_starts.extend(starts)
_ends.extend(ends)
_asns.extend(asns)
_countries.extend(countries)
_orgs.extend(orgs)
_loaded = True
log.info("asn: loaded %d ranges from %s", len(_starts), p)
return True
def _lookup(addr: str) -> tuple[str, str, str] | None:
"""Look up an IPv4 address in the loaded database.
Returns (asn, org, country) or None if not found.
"""
if not _loaded:
if not _load_db():
return None
ip_int = _ip_to_int(addr)
idx = bisect_right(_starts, ip_int) - 1
if idx < 0:
return None
if ip_int > _ends[idx]:
return None
return _asns[idx], _orgs[idx], _countries[idx]
@command("asn", help="ASN lookup: !asn <ip>")
@@ -61,25 +132,17 @@ async def cmd_asn(bot, message):
await bot.reply(message, f"{addr}: private/loopback address")
return
if ip.version != 4:
await bot.reply(message, f"{addr}: only IPv4 supported")
return
result = _lookup(str(ip))
if result is None:
if not _loaded:
await bot.reply(message, "ASN database not available (run update-data)")
else:
await bot.reply(message, f"{addr}: no ASN data")
return
asn, org, country = result
await bot.reply(message, f"{addr}: {asn} {org} ({country})")

197
plugins/canary.py Normal file
View File

@@ -0,0 +1,197 @@
"""Plugin: canary token generator -- plant realistic fake credentials."""
from __future__ import annotations
import json
import secrets
import string
from datetime import datetime, timezone
from derp.plugin import command
_MAX_PER_CHANNEL = 50
def _gen_token() -> str:
"""40-char hex string (looks like API key / SHA1)."""
return secrets.token_hex(20)
def _gen_aws() -> dict[str, str]:
"""AWS-style keypair: AKIA + 16 alnum access key, 40-char base64 secret."""
chars = string.ascii_uppercase + string.digits
access = "AKIA" + "".join(secrets.choice(chars) for _ in range(16))
# 30 random bytes -> 40-char base64
secret = secrets.token_urlsafe(30)
return {"access_key": access, "secret_key": secret}
def _gen_basic() -> dict[str, str]:
"""Random user:pass pair."""
alnum = string.ascii_lowercase + string.digits
user = "svc" + "".join(secrets.choice(alnum) for _ in range(5))
pw = secrets.token_urlsafe(16)
return {"user": user, "pass": pw}
_TYPES = {
"token": "API token (40-char hex)",
"aws": "AWS keypair (AKIA access + secret)",
"basic": "Username:password pair",
}
def _load(bot, channel: str) -> dict:
"""Load canary store for a channel."""
raw = bot.state.get("canary", channel)
if not raw:
return {}
try:
return json.loads(raw)
except (json.JSONDecodeError, TypeError):
return {}
def _save(bot, channel: str, store: dict) -> None:
"""Persist canary store for a channel."""
bot.state.set("canary", channel, json.dumps(store))
def _format_token(entry: dict) -> str:
"""Format a canary entry for display."""
ttype = entry["type"]
value = entry["value"]
if ttype == "aws":
return f"Access: {value['access_key']} Secret: {value['secret_key']}"
if ttype == "basic":
return f"{value['user']}:{value['pass']}"
return value
@command("canary", help="Canary tokens: !canary gen [type] <label> | list | info | del")
async def cmd_canary(bot, message):
"""Generate and manage canary tokens (fake credentials for detection).
Usage:
!canary gen [type] <label> Generate token (admin)
!canary list List canaries in channel
!canary info <label> Show full token details
!canary del <label> Delete a canary (admin)
"""
parts = message.text.split()
if len(parts) < 2:
await bot.reply(message, "Usage: !canary <gen|list|info|del> [args]")
return
sub = parts[1].lower()
channel = message.target if message.is_channel else None
# ---- gen -----------------------------------------------------------------
if sub == "gen":
if not bot._is_admin(message):
await bot.reply(message, "Permission denied")
return
if not channel:
await bot.reply(message, "Use this command in a channel")
return
# Parse: !canary gen [type] <label>
rest = parts[2:]
if not rest:
types = ", ".join(_TYPES)
await bot.reply(message, f"Usage: !canary gen [type] <label> (types: {types})")
return
# Check if first arg is a type
ttype = "token"
if rest[0].lower() in _TYPES:
ttype = rest[0].lower()
rest = rest[1:]
if not rest:
await bot.reply(message, "Usage: !canary gen [type] <label>")
return
label = rest[0].lower()
if len(label) > 32 or not all(c.isalnum() or c in "-_" for c in label):
await bot.reply(message, "Label: 1-32 chars, alphanumeric/hyphens/underscores")
return
store = _load(bot, channel)
if label in store:
await bot.reply(message, f"Canary '{label}' already exists")
return
if len(store) >= _MAX_PER_CHANNEL:
await bot.reply(message, f"Limit reached ({_MAX_PER_CHANNEL} per channel)")
return
# Generate
if ttype == "aws":
value = _gen_aws()
elif ttype == "basic":
value = _gen_basic()
else:
value = _gen_token()
now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")
store[label] = {"type": ttype, "value": value, "created": now}
_save(bot, channel, store)
display = _format_token(store[label])
await bot.reply(message, f"Canary '{label}' ({ttype}): {display}")
return
# ---- list ----------------------------------------------------------------
if sub == "list":
if not channel:
await bot.reply(message, "Use this command in a channel")
return
store = _load(bot, channel)
if not store:
await bot.reply(message, "No canaries in this channel")
return
items = [f"{lbl} ({e['type']})" for lbl, e in sorted(store.items())]
await bot.reply(message, f"Canaries: {', '.join(items)}")
return
# ---- info ----------------------------------------------------------------
if sub == "info":
if not channel:
await bot.reply(message, "Use this command in a channel")
return
if len(parts) < 3:
await bot.reply(message, "Usage: !canary info <label>")
return
label = parts[2].lower()
store = _load(bot, channel)
entry = store.get(label)
if not entry:
await bot.reply(message, f"No canary '{label}'")
return
display = _format_token(entry)
await bot.reply(message, f"{label} ({entry['type']}, {entry['created']}): {display}")
return
# ---- del -----------------------------------------------------------------
if sub == "del":
if not bot._is_admin(message):
await bot.reply(message, "Permission denied")
return
if not channel:
await bot.reply(message, "Use this command in a channel")
return
if len(parts) < 3:
await bot.reply(message, "Usage: !canary del <label>")
return
label = parts[2].lower()
store = _load(bot, channel)
if label not in store:
await bot.reply(message, f"No canary '{label}'")
return
del store[label]
_save(bot, channel, store)
await bot.reply(message, f"Deleted canary '{label}'")
return
# ---- unknown -------------------------------------------------------------
await bot.reply(message, "Usage: !canary <gen|list|info|del> [args]")

105
plugins/internetdb.py Normal file
View File

@@ -0,0 +1,105 @@
"""Plugin: Shodan InternetDB -- free host reconnaissance (no API key)."""
from __future__ import annotations
import asyncio
import ipaddress
import json
import logging
from derp.http import urlopen as _urlopen
from derp.plugin import command
log = logging.getLogger(__name__)
_API_URL = "https://internetdb.shodan.io"
_TIMEOUT = 15
def _fetch(addr: str) -> dict | None:
"""Fetch InternetDB data for an IP address.
Returns parsed JSON dict, or None on 404 (no data).
Raises on network/server errors.
"""
import urllib.error
try:
resp = _urlopen(f"{_API_URL}/{addr}", timeout=_TIMEOUT)
return json.loads(resp.read())
except urllib.error.HTTPError as exc:
if exc.code == 404:
return None
raise
def _format_result(addr: str, data: dict) -> str:
"""Format InternetDB response into a compact IRC message."""
lines = []
hostnames = data.get("hostnames", [])
if hostnames:
lines.append(f"{addr} -- {', '.join(hostnames[:5])}")
else:
lines.append(addr)
ports = data.get("ports", [])
if ports:
lines.append(f"Ports: {', '.join(str(p) for p in sorted(ports))}")
cpes = data.get("cpes", [])
if cpes:
lines.append(f"CPEs: {', '.join(cpes[:8])}")
tags = data.get("tags", [])
if tags:
lines.append(f"Tags: {', '.join(tags)}")
vulns = data.get("vulns", [])
if vulns:
shown = vulns[:10]
suffix = f" (+{len(vulns) - 10} more)" if len(vulns) > 10 else ""
lines.append(f"CVEs: {', '.join(shown)}{suffix}")
return " | ".join(lines)
@command("internetdb", help="Shodan InternetDB: !internetdb <ip>")
async def cmd_internetdb(bot, message):
"""Look up host information from Shodan InternetDB.
Returns open ports, hostnames, CPEs, tags, and known CVEs.
Free API, no key required.
Usage:
!internetdb 8.8.8.8
"""
parts = message.text.split(None, 2)
if len(parts) < 2:
await bot.reply(message, "Usage: !internetdb <ip>")
return
addr = parts[1].strip()
try:
ip = ipaddress.ip_address(addr)
except ValueError:
await bot.reply(message, f"Invalid IP address: {addr}")
return
if ip.is_private or ip.is_loopback:
await bot.reply(message, f"{addr}: private/loopback address")
return
loop = asyncio.get_running_loop()
try:
data = await loop.run_in_executor(None, _fetch, str(ip))
except Exception as exc:
log.error("internetdb: lookup failed for %s: %s", addr, exc)
await bot.reply(message, f"{addr}: lookup failed ({exc})")
return
if data is None:
await bot.reply(message, f"{addr}: no data available")
return
await bot.reply(message, _format_result(str(ip), data))

115
plugins/resolve.py Normal file
View File

@@ -0,0 +1,115 @@
"""Plugin: bulk DNS resolution over TCP (SOCKS5-proxied)."""
from __future__ import annotations
import asyncio
import ipaddress
import struct
from derp.dns import (
QTYPES,
RCODES,
build_query,
parse_response,
reverse_name,
)
from derp.http import open_connection as _open_connection
from derp.plugin import command
_DEFAULT_SERVER = "1.1.1.1"
_TIMEOUT = 5.0
_MAX_HOSTS = 10
async def _query_tcp(name: str, qtype: int, server: str,
timeout: float = _TIMEOUT) -> tuple[int, list[str]]:
"""Send a DNS query over TCP and return (rcode, [values])."""
reader, writer = await asyncio.wait_for(
_open_connection(server, 53, timeout=timeout), timeout=timeout,
)
try:
pkt = build_query(name, qtype)
writer.write(struct.pack("!H", len(pkt)) + pkt)
await writer.drain()
length = struct.unpack("!H", await reader.readexactly(2))[0]
data = await reader.readexactly(length)
return parse_response(data)
finally:
writer.close()
await writer.wait_closed()
async def _resolve_one(host: str, qtype_str: str,
server: str) -> str:
"""Resolve a single host, return formatted result line."""
qtype = QTYPES.get(qtype_str)
lookup = host
if qtype_str == "PTR":
try:
lookup = reverse_name(host)
except ValueError:
return f"{host} -> invalid IP for PTR"
try:
rcode, results = await _query_tcp(lookup, qtype, server)
except (TimeoutError, asyncio.TimeoutError):
return f"{host} -> timeout"
except OSError as exc:
return f"{host} -> error: {exc}"
if rcode != 0:
err = RCODES.get(rcode, f"error {rcode}")
return f"{host} -> {err}"
if not results:
return f"{host} -> no records"
return f"{host} -> {', '.join(results)}"
@command("resolve", help="Bulk DNS: !resolve <host> [host2 ...] [type]")
async def cmd_resolve(bot, message):
"""Bulk DNS resolution via TCP through SOCKS5 proxy.
Usage:
!resolve example.com github.com (A records)
!resolve example.com AAAA (specific type)
!resolve 1.2.3.4 8.8.8.8 (auto PTR)
"""
parts = message.text.split()
if len(parts) < 2:
await bot.reply(message, "Usage: !resolve <host> [host2 ...] [type]")
return
args = parts[1:]
# Check if last arg is a record type
qtype_str = None
if args[-1].upper() in QTYPES:
qtype_str = args[-1].upper()
args = args[:-1]
if not args:
await bot.reply(message, "Usage: !resolve <host> [host2 ...] [type]")
return
hosts = args[:_MAX_HOSTS]
# Auto-detect type per host if not specified
async def _do(host: str) -> str:
qt = qtype_str
if qt is None:
try:
ipaddress.ip_address(host)
qt = "PTR"
except ValueError:
qt = "A"
return await _resolve_one(host, qt, _DEFAULT_SERVER)
results = await asyncio.gather(*[_do(h) for h in hosts])
lines = list(results)
if len(args) > _MAX_HOSTS:
lines.append(f"(showing first {_MAX_HOSTS} of {len(args)})")
for line in lines:
await bot.reply(message, line)

116
plugins/tcping.py Normal file
View File

@@ -0,0 +1,116 @@
"""Plugin: TCP connect latency probe (SOCKS5-proxied)."""
from __future__ import annotations
import asyncio
import ipaddress
import time
from derp.http import open_connection as _open_connection
from derp.plugin import command
_TIMEOUT = 10.0
_MAX_COUNT = 10
_DEFAULT_COUNT = 3
_DEFAULT_PORT = 443
def _is_internal(host: str) -> bool:
"""Check if host is a private/reserved address."""
try:
ip = ipaddress.ip_address(host)
return ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local
except ValueError:
return False
def _validate_host(host: str) -> bool:
"""Check that host is an IP or looks like a domain."""
try:
ipaddress.ip_address(host)
return True
except ValueError:
pass
return "." in host and all(c.isalnum() or c in ".-" for c in host)
async def _probe(host: str, port: int, timeout: float) -> float | None:
"""Single TCP connect probe. Returns RTT in ms or None on failure."""
t0 = time.perf_counter()
try:
_, writer = await asyncio.wait_for(
_open_connection(host, port, timeout=timeout), timeout=timeout,
)
rtt = (time.perf_counter() - t0) * 1000
writer.close()
await writer.wait_closed()
return rtt
except (OSError, asyncio.TimeoutError, TimeoutError):
return None
@command("tcping", help="TCP latency: !tcping <host> [port] [count]")
async def cmd_tcping(bot, message):
"""Measure TCP connect latency to a host:port through SOCKS5 proxy.
Usage:
!tcping example.com (port 443, 3 probes)
!tcping example.com 22 (port 22, 3 probes)
!tcping example.com 80 5 (port 80, 5 probes)
"""
parts = message.text.split()
if len(parts) < 2:
await bot.reply(message, "Usage: !tcping <host> [port] [count]")
return
host = parts[1]
if not _validate_host(host):
await bot.reply(message, f"Invalid host: {host}")
return
if _is_internal(host):
await bot.reply(message, f"Refused: {host} is an internal/reserved address")
return
port = _DEFAULT_PORT
count = _DEFAULT_COUNT
if len(parts) > 2:
try:
port = int(parts[2])
if port < 1 or port > 65535:
raise ValueError
except ValueError:
await bot.reply(message, f"Invalid port: {parts[2]}")
return
if len(parts) > 3:
try:
count = int(parts[3])
count = max(1, min(count, _MAX_COUNT))
except ValueError:
pass
results: list[float | None] = []
for _ in range(count):
rtt = await _probe(host, port, _TIMEOUT)
results.append(rtt)
successes = [r for r in results if r is not None]
if not successes:
await bot.reply(message, f"tcping {host}:{port} -- {count} probes, all timed out")
return
probe_strs = []
for i, r in enumerate(results, 1):
probe_strs.append(f"{i}: {r:.0f}ms" if r is not None else f"{i}: timeout")
mn = min(successes)
avg = sum(successes) / len(successes)
mx = max(successes)
header = f"tcping {host}:{port} -- {count} probes"
probes = " ".join(probe_strs)
summary = f"min/avg/max: {mn:.0f}/{avg:.0f}/{mx:.0f} ms"
await bot.reply(message, f"{header} {probes} {summary}")


@@ -24,7 +24,9 @@ _VIDEO_ID_RE = re.compile(r"(?:v=|youtu\.be/|/embed/|/shorts/)([A-Za-z0-9_-]{11}
 _YT_DOMAINS = {"youtube.com", "www.youtube.com", "m.youtube.com", "youtu.be"}
 _YT_FEED_URL = "https://www.youtube.com/feeds/videos.xml?channel_id={}"
 _YT_PLAYER_URL = "https://www.youtube.com/youtubei/v1/player"
-_YT_CLIENT_VERSION = "2.20250101.00.00"
+_ANDROID_VERSION = "19.29.37"
+_ANDROID_SDK = 33
+_ANDROID_UA = f"com.google.android.youtube/{_ANDROID_VERSION} (Linux; U; Android 13)"
 _ATOM_NS = "{http://www.w3.org/2005/Atom}"
 _YT_NS = "{http://www.youtube.com/xml/schemas/2015}"
 _MEDIA_NS = "{http://search.yahoo.com/mrss/}"
@@ -107,33 +109,41 @@ def _extract_video_id(url: str) -> str | None:
 # -- Blocking helpers (for executor) -----------------------------------------
-def _resolve_via_innertube(video_id: str) -> str | None:
-    """Resolve video ID to channel ID via InnerTube player API. Blocking.
+def _innertube_player(video_id: str) -> dict:
+    """Fetch videoDetails via InnerTube ANDROID client. Blocking.
     Small JSON request/response -- much more resilient to transient proxy
     issues than fetching the full 1MB watch page.
+    Uses ANDROID client -- WEB client returns LOGIN_REQUIRED since ~2026-02.
+    Returns videoDetails dict, or {} on failure.
     """
     payload = json.dumps({
         "context": {
             "client": {
-                "clientName": "WEB",
-                "clientVersion": _YT_CLIENT_VERSION,
+                "clientName": "ANDROID",
+                "clientVersion": _ANDROID_VERSION,
+                "androidSdkVersion": _ANDROID_SDK,
             },
         },
         "videoId": video_id,
     }).encode()
     req = urllib.request.Request(_YT_PLAYER_URL, data=payload, method="POST")
     req.add_header("Content-Type", "application/json")
+    req.add_header("User-Agent", _ANDROID_UA)
     try:
         resp = _urlopen(req, timeout=_FETCH_TIMEOUT)
         raw = resp.read()
         resp.close()
         data = json.loads(raw)
-        channel_id = (data.get("videoDetails") or {}).get("channelId", "")
-        if channel_id and _CHANNEL_ID_RE.fullmatch(channel_id):
-            return channel_id
+        return data.get("videoDetails") or {}
     except Exception:
-        pass
+        return {}
+def _resolve_via_innertube(video_id: str) -> str | None:
+    """Resolve video ID to channel ID via InnerTube player API. Blocking."""
+    details = _innertube_player(video_id)
+    channel_id = details.get("channelId", "")
+    if channel_id and _CHANNEL_ID_RE.fullmatch(channel_id):
+        return channel_id
     return None
@@ -142,28 +152,14 @@ def _fetch_duration(video_id: str) -> int:
     Returns 0 on failure or for live content.
     """
-    payload = json.dumps({
-        "context": {
-            "client": {
-                "clientName": "WEB",
-                "clientVersion": _YT_CLIENT_VERSION,
-            },
-        },
-        "videoId": video_id,
-    }).encode()
-    req = urllib.request.Request(_YT_PLAYER_URL, data=payload, method="POST")
-    req.add_header("Content-Type", "application/json")
+    details = _innertube_player(video_id)
+    if not details:
+        return 0
+    if details.get("isLiveContent") and details.get("isLive"):
+        return 0
     try:
-        resp = _urlopen(req, timeout=_FETCH_TIMEOUT)
-        raw = resp.read()
-        resp.close()
-        data = json.loads(raw)
-        details = data.get("videoDetails") or {}
-        if details.get("isLiveContent") and details.get("isLive"):
-            return 0
-        secs = int(details.get("lengthSeconds", 0))
-        return secs
-    except Exception:
+        return int(details.get("lengthSeconds", 0))
+    except (ValueError, TypeError):
         return 0
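The shared payload the helper builds is small; a self-contained sketch of the ANDROID client context (constants copied from the diff, video ID is a placeholder):

```python
import json

ANDROID_VERSION = "19.29.37"
ANDROID_SDK = 33

def innertube_payload(video_id: str) -> bytes:
    # ANDROID client context -- the WEB client now gets LOGIN_REQUIRED
    return json.dumps({
        "context": {
            "client": {
                "clientName": "ANDROID",
                "clientVersion": ANDROID_VERSION,
                "androidSdkVersion": ANDROID_SDK,
            },
        },
        "videoId": video_id,
    }).encode()
```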


@@ -124,6 +124,26 @@ update_geolite2() {
done
}
# -- iptoasn ASN database -----------------------------------------------------
update_asn() {
local dest="$DATA_DIR/ip2asn-v4.tsv"
local url="https://iptoasn.com/data/ip2asn-v4.tsv.gz"
mkdir -p "$DATA_DIR"
dim "Downloading iptoasn database..."
if curl -sS -fL --max-time 60 -o "$dest.gz" "$url" ||
curl -sS -fL --socks5-hostname 127.0.0.1:1080 --max-time 60 \
-o "$dest.gz" "$url"; then
gunzip -f "$dest.gz"
local count
count=$(wc -l < "$dest")
info "iptoasn: $count ranges"
else
rm -f "$dest.gz"
err "Failed to download iptoasn database"
((FAILURES++)) || true
fi
}
# -- Exploit-DB CSV -----------------------------------------------------------
update_exploitdb() {
local dest_dir="$DATA_DIR/exploitdb"
@@ -151,6 +171,7 @@ echo
update_tor
update_iprep
update_oui
update_asn
update_exploitdb
update_geolite2

tests/test_archive.py Normal file

@@ -0,0 +1,150 @@
"""Tests for the Wayback Machine archive plugin."""
import asyncio
import importlib.util
import sys
import urllib.error
from pathlib import Path
from unittest.mock import MagicMock, patch
from derp.irc import Message
# plugins/ is not a Python package -- load the module from file path
_spec = importlib.util.spec_from_file_location(
"plugins.archive", Path(__file__).resolve().parent.parent / "plugins" / "archive.py",
)
_mod = importlib.util.module_from_spec(_spec)
sys.modules[_spec.name] = _mod
_spec.loader.exec_module(_mod)
from plugins.archive import _save_page, cmd_archive # noqa: E402
# -- Helpers -----------------------------------------------------------------
class _FakeBot:
def __init__(self):
self.replied: list[str] = []
async def reply(self, message, text: str) -> None:
self.replied.append(text)
def _msg(text: str) -> Message:
return Message(
raw="", prefix="alice!~alice@host", nick="alice",
command="PRIVMSG", params=["#test", text], tags={},
)
# -- _save_page --------------------------------------------------------------
class TestSavePage:
def test_content_location_header(self):
resp = MagicMock()
resp.headers = {"Content-Location": "/web/20260220/https://example.com"}
resp.read.return_value = b""
resp.geturl.return_value = "https://web.archive.org/save/https://example.com"
with patch.object(_mod, "_urlopen", return_value=resp):
result = _save_page("https://example.com")
assert "url" in result
assert "/web/20260220" in result["url"]
def test_final_url_redirect(self):
resp = MagicMock()
resp.headers = {}
resp.read.return_value = b""
resp.geturl.return_value = "https://web.archive.org/web/20260220/https://example.com"
with patch.object(_mod, "_urlopen", return_value=resp):
result = _save_page("https://example.com")
assert "url" in result
assert "/web/20260220" in result["url"]
def test_fallback_url(self):
resp = MagicMock()
resp.headers = {}
resp.read.return_value = b""
resp.geturl.return_value = "https://web.archive.org/save/ok"
with patch.object(_mod, "_urlopen", return_value=resp):
result = _save_page("https://example.com")
assert "url" in result
assert "/web/*/" in result["url"]
def test_rate_limit(self):
exc = urllib.error.HTTPError(
"url", 429, "Too Many Requests", {}, None,
)
with patch.object(_mod, "_urlopen", side_effect=exc):
result = _save_page("https://example.com")
assert "error" in result
assert "rate limit" in result["error"]
def test_origin_unreachable(self):
exc = urllib.error.HTTPError(
"url", 523, "Origin Unreachable", {}, None,
)
with patch.object(_mod, "_urlopen", side_effect=exc):
result = _save_page("https://example.com")
assert "error" in result
assert "unreachable" in result["error"]
def test_generic_http_error(self):
exc = urllib.error.HTTPError(
"url", 500, "Server Error", {}, None,
)
with patch.object(_mod, "_urlopen", side_effect=exc):
result = _save_page("https://example.com")
assert "error" in result
assert "500" in result["error"]
def test_timeout(self):
with patch.object(_mod, "_urlopen", side_effect=TimeoutError("timed out")):
result = _save_page("https://example.com")
assert "error" in result
assert "timeout" in result["error"]
# -- Command handler ---------------------------------------------------------
class TestCmdArchive:
def test_no_args(self):
bot = _FakeBot()
asyncio.run(cmd_archive(bot, _msg("!archive")))
assert "Usage" in bot.replied[0]
def test_no_scheme(self):
bot = _FakeBot()
asyncio.run(cmd_archive(bot, _msg("!archive example.com")))
assert "http://" in bot.replied[0]
def test_success(self):
bot = _FakeBot()
result = {"url": "https://web.archive.org/web/20260220/https://example.com"}
with patch.object(_mod, "_save_page", return_value=result):
asyncio.run(cmd_archive(bot, _msg("!archive https://example.com")))
assert len(bot.replied) == 2
assert "Archiving" in bot.replied[0]
assert "Archived:" in bot.replied[1]
assert "/web/20260220" in bot.replied[1]
def test_error(self):
bot = _FakeBot()
result = {"error": "rate limited -- try again later"}
with patch.object(_mod, "_save_page", return_value=result):
asyncio.run(cmd_archive(bot, _msg("!archive https://example.com")))
assert len(bot.replied) == 2
assert "failed" in bot.replied[1].lower()
assert "rate limit" in bot.replied[1]
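The three success-path tests above pin down a fallback chain for extracting the snapshot URL from a Save Page Now response: `Content-Location` header first, then a `/web/` redirect in the final URL, then a `/web/*/` wildcard link. A sketch consistent with those assertions (function name and signature are illustrative):

```python
def archived_url(headers: dict, final_url: str, original: str) -> str:
    # Preferred: SPN returns the snapshot path in Content-Location
    loc = headers.get("Content-Location", "")
    if loc.startswith("/web/"):
        return "https://web.archive.org" + loc
    # Next best: the request was redirected straight to a snapshot
    if "/web/" in final_url:
        return final_url
    # Fallback: wildcard link to whatever snapshots exist
    return f"https://web.archive.org/web/*/{original}"
```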

tests/test_asn.py Normal file

@@ -0,0 +1,258 @@
"""Tests for the ASN lookup plugin (iptoasn.com TSV backend)."""
import asyncio
import importlib.util
import sys
import tempfile
from pathlib import Path
from unittest.mock import patch
from derp.irc import Message
# plugins/ is not a Python package -- load the module from file path
_spec = importlib.util.spec_from_file_location(
"plugins.asn", Path(__file__).resolve().parent.parent / "plugins" / "asn.py",
)
_mod = importlib.util.module_from_spec(_spec)
sys.modules[_spec.name] = _mod
_spec.loader.exec_module(_mod)
from plugins.asn import ( # noqa: E402
_ip_to_int,
_load_db,
_lookup,
cmd_asn,
)
# -- Sample TSV data ---------------------------------------------------------
SAMPLE_TSV = """\
1.0.0.0\t1.0.0.255\t13335\tUS\tCLOUDFLARENET
1.0.1.0\t1.0.3.255\t0\tNone\tNot routed
1.0.4.0\t1.0.7.255\t56203\tAU\tGTELECOM
8.8.8.0\t8.8.8.255\t15169\tUS\tGOOGLE
"""
# -- Helpers -----------------------------------------------------------------
class _FakeBot:
def __init__(self):
self.replied: list[str] = []
async def reply(self, message, text: str) -> None:
self.replied.append(text)
def _msg(text: str, nick: str = "alice", target: str = "#test") -> Message:
return Message(
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
command="PRIVMSG", params=[target, text], tags={},
)
def _reset_db():
"""Clear module-level DB state between tests."""
_mod._starts.clear()
_mod._ends.clear()
_mod._asns.clear()
_mod._countries.clear()
_mod._orgs.clear()
_mod._loaded = False
def _load_sample(tsv: str = SAMPLE_TSV) -> Path:
"""Write TSV to a temp file, load it, return the path."""
_reset_db()
tmp = tempfile.NamedTemporaryFile(mode="w", suffix=".tsv", delete=False)
tmp.write(tsv)
tmp.flush()
tmp.close()
p = Path(tmp.name)
_load_db(p)
return p
# ---------------------------------------------------------------------------
# TestIpToInt
# ---------------------------------------------------------------------------
class TestIpToInt:
def test_zero(self):
assert _ip_to_int("0.0.0.0") == 0
def test_one(self):
assert _ip_to_int("0.0.0.1") == 1
def test_max(self):
assert _ip_to_int("255.255.255.255") == 0xFFFFFFFF
def test_known(self):
assert _ip_to_int("1.0.0.0") == 0x01000000
def test_google_dns(self):
assert _ip_to_int("8.8.8.8") == 0x08080808
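These cases pin down a pure-stdlib dotted-quad conversion; one way to write it (a sketch, not necessarily the plugin's exact implementation):

```python
def ip_to_int(ip: str) -> int:
    # Dotted quad -> 32-bit integer, most significant octet first
    a, b, c, d = (int(part) for part in ip.split("."))
    return (a << 24) | (b << 16) | (c << 8) | d
```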
# ---------------------------------------------------------------------------
# TestLoad
# ---------------------------------------------------------------------------
class TestLoad:
def test_loads_rows(self):
_load_sample()
# 4 rows in TSV, but ASN 0 is skipped -> 3 entries
assert len(_mod._starts) == 3
def test_skips_asn_zero(self):
_load_sample()
for asn in _mod._asns:
assert asn != "AS0"
def test_first_entry(self):
_load_sample()
assert _mod._asns[0] == "AS13335"
assert _mod._orgs[0] == "CLOUDFLARENET"
assert _mod._countries[0] == "US"
def test_missing_file_returns_false(self):
_reset_db()
result = _load_db(Path("/nonexistent/path.tsv"))
assert result is False
assert not _mod._loaded
def test_empty_file(self):
_reset_db()
tmp = tempfile.NamedTemporaryFile(mode="w", suffix=".tsv", delete=False)
tmp.write("")
tmp.close()
result = _load_db(Path(tmp.name))
assert result is True
assert len(_mod._starts) == 0
def test_skips_comments_and_blanks(self):
tsv = "# comment\n\n1.0.0.0\t1.0.0.255\t13335\tUS\tCLOUD\n"
_load_sample(tsv)
assert len(_mod._starts) == 1
def test_skips_malformed_rows(self):
tsv = "bad\tdata\n1.0.0.0\t1.0.0.255\t13335\tUS\tCLOUD\n"
_load_sample(tsv)
assert len(_mod._starts) == 1
# ---------------------------------------------------------------------------
# TestLookup
# ---------------------------------------------------------------------------
class TestLookup:
def setup_method(self):
_load_sample()
def test_exact_start(self):
result = _lookup("1.0.0.0")
assert result is not None
asn, org, country = result
assert asn == "AS13335"
assert org == "CLOUDFLARENET"
assert country == "US"
def test_mid_range(self):
result = _lookup("1.0.0.128")
assert result is not None
assert result[0] == "AS13335"
def test_exact_end(self):
result = _lookup("1.0.0.255")
assert result is not None
assert result[0] == "AS13335"
def test_second_range(self):
result = _lookup("1.0.5.0")
assert result is not None
assert result[0] == "AS56203"
assert result[2] == "AU"
def test_google_dns(self):
result = _lookup("8.8.8.8")
assert result is not None
assert result[0] == "AS15169"
assert result[1] == "GOOGLE"
def test_miss_gap(self):
"""IP in the not-routed gap (ASN 0 range) returns None."""
result = _lookup("1.0.1.0")
assert result is None
def test_miss_below_first(self):
result = _lookup("0.255.255.255")
assert result is None
def test_miss_above_last(self):
result = _lookup("8.8.9.0")
assert result is None
def test_db_not_loaded(self):
_reset_db()
with patch.object(_mod, "_DB_PATH", Path("/nonexistent")):
result = _lookup("1.0.0.0")
assert result is None
# ---------------------------------------------------------------------------
# TestCommand
# ---------------------------------------------------------------------------
class TestCommand:
def setup_method(self):
_load_sample()
def test_valid_ip(self):
bot = _FakeBot()
asyncio.run(cmd_asn(bot, _msg("!asn 1.0.0.1")))
assert "AS13335" in bot.replied[0]
assert "CLOUDFLARENET" in bot.replied[0]
assert "(US)" in bot.replied[0]
def test_google_dns(self):
bot = _FakeBot()
asyncio.run(cmd_asn(bot, _msg("!asn 8.8.8.8")))
assert "AS15169" in bot.replied[0]
assert "GOOGLE" in bot.replied[0]
def test_private_ip(self):
bot = _FakeBot()
asyncio.run(cmd_asn(bot, _msg("!asn 192.168.1.1")))
assert "private/loopback" in bot.replied[0]
def test_loopback(self):
bot = _FakeBot()
asyncio.run(cmd_asn(bot, _msg("!asn 127.0.0.1")))
assert "private/loopback" in bot.replied[0]
def test_invalid_input(self):
bot = _FakeBot()
asyncio.run(cmd_asn(bot, _msg("!asn notanip")))
assert "Invalid IP" in bot.replied[0]
def test_no_args(self):
bot = _FakeBot()
asyncio.run(cmd_asn(bot, _msg("!asn")))
assert "Usage:" in bot.replied[0]
def test_ipv6_rejected(self):
bot = _FakeBot()
asyncio.run(cmd_asn(bot, _msg("!asn 2606:4700::1")))
assert "only IPv4" in bot.replied[0]
def test_no_match(self):
bot = _FakeBot()
asyncio.run(cmd_asn(bot, _msg("!asn 200.200.200.200")))
assert "no ASN data" in bot.replied[0]
def test_db_missing(self):
_reset_db()
bot = _FakeBot()
with patch.object(_mod, "_DB_PATH", Path("/nonexistent")):
asyncio.run(cmd_asn(bot, _msg("!asn 1.0.0.1")))
assert "not available" in bot.replied[0]

tests/test_canary.py Normal file

@@ -0,0 +1,302 @@
"""Tests for the canary token generator plugin."""
import asyncio
import importlib.util
import sys
from pathlib import Path
from derp.irc import Message
# plugins/ is not a Python package -- load the module from file path
_spec = importlib.util.spec_from_file_location(
"plugins.canary", Path(__file__).resolve().parent.parent / "plugins" / "canary.py",
)
_mod = importlib.util.module_from_spec(_spec)
sys.modules[_spec.name] = _mod
_spec.loader.exec_module(_mod)
from plugins.canary import ( # noqa: E402
_gen_aws,
_gen_basic,
_gen_token,
_load,
_save,
cmd_canary,
)
# -- Helpers -----------------------------------------------------------------
class _FakeState:
"""In-memory stand-in for bot.state."""
def __init__(self):
self._store: dict[str, dict[str, str]] = {}
def get(self, plugin: str, key: str, default: str | None = None) -> str | None:
return self._store.get(plugin, {}).get(key, default)
def set(self, plugin: str, key: str, value: str) -> None:
self._store.setdefault(plugin, {})[key] = value
def delete(self, plugin: str, key: str) -> bool:
try:
del self._store[plugin][key]
return True
except KeyError:
return False
def keys(self, plugin: str) -> list[str]:
return sorted(self._store.get(plugin, {}).keys())
class _FakeBot:
"""Minimal bot stand-in that captures sent/replied messages."""
def __init__(self, *, admin: bool = False):
self.sent: list[tuple[str, str]] = []
self.replied: list[str] = []
self.state = _FakeState()
self._admin = admin
async def send(self, target: str, text: str) -> None:
self.sent.append((target, text))
async def reply(self, message, text: str) -> None:
self.replied.append(text)
def _is_admin(self, message) -> bool:
return self._admin
def _msg(text: str, nick: str = "alice", target: str = "#ops") -> Message:
"""Create a channel PRIVMSG."""
return Message(
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
command="PRIVMSG", params=[target, text], tags={},
)
def _pm(text: str, nick: str = "alice") -> Message:
"""Create a private PRIVMSG."""
return Message(
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
command="PRIVMSG", params=["botname", text], tags={},
)
# -- Token generators -------------------------------------------------------
class TestGenToken:
def test_length(self):
tok = _gen_token()
assert len(tok) == 40
def test_hex(self):
tok = _gen_token()
int(tok, 16) # Should not raise
def test_unique(self):
assert _gen_token() != _gen_token()
class TestGenAws:
def test_access_key_format(self):
pair = _gen_aws()
assert pair["access_key"].startswith("AKIA")
assert len(pair["access_key"]) == 20
def test_secret_key_present(self):
pair = _gen_aws()
assert len(pair["secret_key"]) > 20
class TestGenBasic:
def test_user_format(self):
pair = _gen_basic()
assert pair["user"].startswith("svc")
assert len(pair["user"]) == 8
def test_pass_present(self):
pair = _gen_basic()
assert len(pair["pass"]) > 10
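The `_gen_aws` assertions imply an AWS-shaped access key ID: the "AKIA" prefix plus 16 characters for a 20-character total. A hypothetical generator matching those constraints (the plugin's actual alphabet and helper names may differ):

```python
import secrets
import string

def gen_access_key() -> str:
    # Canary AWS-style access key ID: "AKIA" + 16 uppercase alphanumerics.
    # Alphabet is an assumption; the tests only check prefix and length.
    alphabet = string.ascii_uppercase + string.digits
    return "AKIA" + "".join(secrets.choice(alphabet) for _ in range(16))
```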
# -- State helpers -----------------------------------------------------------
class TestStateHelpers:
def test_save_and_load(self):
bot = _FakeBot()
store = {"mykey": {"type": "token", "value": "abc", "created": "now"}}
_save(bot, "#ops", store)
loaded = _load(bot, "#ops")
assert loaded == store
def test_load_empty(self):
bot = _FakeBot()
assert _load(bot, "#ops") == {}
def test_load_bad_json(self):
bot = _FakeBot()
bot.state.set("canary", "#ops", "not json{{{")
assert _load(bot, "#ops") == {}
# -- Command: gen ------------------------------------------------------------
class TestCmdGen:
def test_gen_default_token(self):
bot = _FakeBot(admin=True)
asyncio.run(cmd_canary(bot, _msg("!canary gen db-cred")))
assert len(bot.replied) == 1
assert "db-cred" in bot.replied[0]
assert "token" in bot.replied[0]
store = _load(bot, "#ops")
assert "db-cred" in store
assert store["db-cred"]["type"] == "token"
assert len(store["db-cred"]["value"]) == 40
def test_gen_aws(self):
bot = _FakeBot(admin=True)
asyncio.run(cmd_canary(bot, _msg("!canary gen aws staging-key")))
assert "staging-key" in bot.replied[0]
assert "AKIA" in bot.replied[0]
store = _load(bot, "#ops")
assert store["staging-key"]["type"] == "aws"
def test_gen_basic(self):
bot = _FakeBot(admin=True)
asyncio.run(cmd_canary(bot, _msg("!canary gen basic svc-login")))
assert "svc-login" in bot.replied[0]
store = _load(bot, "#ops")
assert store["svc-login"]["type"] == "basic"
assert "user" in store["svc-login"]["value"]
def test_gen_requires_admin(self):
bot = _FakeBot(admin=False)
asyncio.run(cmd_canary(bot, _msg("!canary gen mytoken")))
assert "Permission denied" in bot.replied[0]
def test_gen_requires_channel(self):
bot = _FakeBot(admin=True)
asyncio.run(cmd_canary(bot, _pm("!canary gen mytoken")))
assert "channel" in bot.replied[0].lower()
def test_gen_duplicate(self):
bot = _FakeBot(admin=True)
asyncio.run(cmd_canary(bot, _msg("!canary gen dup-test")))
bot.replied.clear()
asyncio.run(cmd_canary(bot, _msg("!canary gen dup-test")))
assert "already exists" in bot.replied[0]
def test_gen_no_label(self):
bot = _FakeBot(admin=True)
asyncio.run(cmd_canary(bot, _msg("!canary gen")))
assert "Usage" in bot.replied[0]
def test_gen_type_no_label(self):
bot = _FakeBot(admin=True)
asyncio.run(cmd_canary(bot, _msg("!canary gen aws")))
assert "Usage" in bot.replied[0]
def test_gen_invalid_label(self):
bot = _FakeBot(admin=True)
asyncio.run(cmd_canary(bot, _msg("!canary gen b@d!")))
assert "Label" in bot.replied[0]
# -- Command: list -----------------------------------------------------------
class TestCmdList:
def test_list_empty(self):
bot = _FakeBot()
asyncio.run(cmd_canary(bot, _msg("!canary list")))
assert "No canaries" in bot.replied[0]
def test_list_populated(self):
bot = _FakeBot()
store = {
"api-key": {"type": "token", "value": "abc", "created": "now"},
"db-cred": {"type": "basic", "value": {"user": "x", "pass": "y"}, "created": "now"},
}
_save(bot, "#ops", store)
asyncio.run(cmd_canary(bot, _msg("!canary list")))
assert "api-key" in bot.replied[0]
assert "db-cred" in bot.replied[0]
def test_list_requires_channel(self):
bot = _FakeBot()
asyncio.run(cmd_canary(bot, _pm("!canary list")))
assert "channel" in bot.replied[0].lower()
# -- Command: info -----------------------------------------------------------
class TestCmdInfo:
def test_info_exists(self):
bot = _FakeBot()
store = {"mykey": {"type": "token", "value": "a" * 40, "created": "2026-02-20T14:00:00"}}
_save(bot, "#ops", store)
asyncio.run(cmd_canary(bot, _msg("!canary info mykey")))
assert "mykey" in bot.replied[0]
assert "a" * 40 in bot.replied[0]
def test_info_missing(self):
bot = _FakeBot()
asyncio.run(cmd_canary(bot, _msg("!canary info nope")))
assert "No canary" in bot.replied[0]
def test_info_no_label(self):
bot = _FakeBot()
asyncio.run(cmd_canary(bot, _msg("!canary info")))
assert "Usage" in bot.replied[0]
def test_info_requires_channel(self):
bot = _FakeBot()
asyncio.run(cmd_canary(bot, _pm("!canary info mykey")))
assert "channel" in bot.replied[0].lower()
# -- Command: del ------------------------------------------------------------
class TestCmdDel:
def test_del_success(self):
bot = _FakeBot(admin=True)
store = {"victim": {"type": "token", "value": "x", "created": "now"}}
_save(bot, "#ops", store)
asyncio.run(cmd_canary(bot, _msg("!canary del victim")))
assert "Deleted" in bot.replied[0]
assert _load(bot, "#ops") == {}
def test_del_nonexistent(self):
bot = _FakeBot(admin=True)
asyncio.run(cmd_canary(bot, _msg("!canary del nope")))
assert "No canary" in bot.replied[0]
def test_del_requires_admin(self):
bot = _FakeBot(admin=False)
asyncio.run(cmd_canary(bot, _msg("!canary del something")))
assert "Permission denied" in bot.replied[0]
def test_del_requires_channel(self):
bot = _FakeBot(admin=True)
asyncio.run(cmd_canary(bot, _pm("!canary del something")))
assert "channel" in bot.replied[0].lower()
def test_del_no_label(self):
bot = _FakeBot(admin=True)
asyncio.run(cmd_canary(bot, _msg("!canary del")))
assert "Usage" in bot.replied[0]
# -- Command: usage ----------------------------------------------------------
class TestCmdUsage:
def test_no_args(self):
bot = _FakeBot()
asyncio.run(cmd_canary(bot, _msg("!canary")))
assert "Usage" in bot.replied[0]
def test_unknown_subcommand(self):
bot = _FakeBot()
asyncio.run(cmd_canary(bot, _msg("!canary foobar")))
assert "Usage" in bot.replied[0]

tests/test_internetdb.py Normal file

@@ -0,0 +1,281 @@
"""Tests for the InternetDB plugin (Shodan free host recon)."""
import asyncio
import importlib.util
import json
import sys
import urllib.error
from pathlib import Path
from unittest.mock import patch
from derp.irc import Message
# plugins/ is not a Python package -- load the module from file path
_spec = importlib.util.spec_from_file_location(
"plugins.internetdb",
Path(__file__).resolve().parent.parent / "plugins" / "internetdb.py",
)
_mod = importlib.util.module_from_spec(_spec)
sys.modules[_spec.name] = _mod
_spec.loader.exec_module(_mod)
from plugins.internetdb import ( # noqa: E402
_fetch,
_format_result,
cmd_internetdb,
)
# -- Sample API responses ----------------------------------------------------
SAMPLE_FULL = {
"cpes": ["cpe:/a:apache:http_server:2.4.41", "cpe:/a:openssl:openssl:1.1.1"],
"hostnames": ["dns.google"],
"ip": "8.8.8.8",
"ports": [443, 53],
"tags": ["cloud"],
"vulns": ["CVE-2021-23017", "CVE-2021-3449"],
}
SAMPLE_MINIMAL = {
"cpes": [],
"hostnames": [],
"ip": "203.0.113.1",
"ports": [80],
"tags": [],
"vulns": [],
}
SAMPLE_EMPTY = {
"cpes": [],
"hostnames": [],
"ip": "198.51.100.1",
"ports": [],
"tags": [],
"vulns": [],
}
SAMPLE_MANY_VULNS = {
"cpes": [],
"hostnames": ["vuln.example.com"],
"ip": "192.0.2.1",
"ports": [22, 80, 443],
"tags": ["self-signed", "eol-os"],
"vulns": [f"CVE-2021-{i}" for i in range(15)],
}
SAMPLE_MANY_HOSTNAMES = {
"cpes": [],
"hostnames": [f"host{i}.example.com" for i in range(8)],
"ip": "192.0.2.2",
"ports": [80],
"tags": [],
"vulns": [],
}
# -- Helpers -----------------------------------------------------------------
class _FakeBot:
def __init__(self):
self.replied: list[str] = []
async def reply(self, message, text: str) -> None:
self.replied.append(text)
def _msg(text: str, nick: str = "alice", target: str = "#test") -> Message:
return Message(
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
command="PRIVMSG", params=[target, text], tags={},
)
class _FakeResp:
"""Minimal file-like response for urlopen mocking."""
def __init__(self, data: bytes, status: int = 200):
self._data = data
self.status = status
def read(self):
return self._data
def __enter__(self):
return self
def __exit__(self, *a):
pass
# ---------------------------------------------------------------------------
# TestFetch
# ---------------------------------------------------------------------------
class TestFetch:
def test_success(self):
body = json.dumps(SAMPLE_FULL).encode()
with patch.object(_mod, "_urlopen", return_value=_FakeResp(body)):
result = _fetch("8.8.8.8")
assert result == SAMPLE_FULL
def test_not_found(self):
exc = urllib.error.HTTPError(
"https://internetdb.shodan.io/192.0.2.1", 404, "Not Found", {}, None,
)
with patch.object(_mod, "_urlopen", side_effect=exc):
result = _fetch("192.0.2.1")
assert result is None
def test_server_error_raises(self):
exc = urllib.error.HTTPError(
"https://internetdb.shodan.io/192.0.2.1", 500, "Server Error", {}, None,
)
with patch.object(_mod, "_urlopen", side_effect=exc):
try:
_fetch("192.0.2.1")
assert False, "Expected HTTPError"
except urllib.error.HTTPError as e:
assert e.code == 500
def test_builds_correct_url(self):
calls = []
def _mock_urlopen(url, **kw):
calls.append(url)
return _FakeResp(json.dumps(SAMPLE_MINIMAL).encode())
with patch.object(_mod, "_urlopen", side_effect=_mock_urlopen):
_fetch("203.0.113.1")
assert calls[0] == "https://internetdb.shodan.io/203.0.113.1"
# ---------------------------------------------------------------------------
# TestFormatResult
# ---------------------------------------------------------------------------
class TestFormatResult:
def test_full_response(self):
result = _format_result("8.8.8.8", SAMPLE_FULL)
assert "8.8.8.8 -- dns.google" in result
assert "Ports: 53, 443" in result
assert "cpe:/a:apache:http_server:2.4.41" in result
assert "Tags: cloud" in result
assert "CVE-2021-23017" in result
def test_minimal_response(self):
result = _format_result("203.0.113.1", SAMPLE_MINIMAL)
assert "203.0.113.1" in result
assert "Ports: 80" in result
# No hostnames, CPEs, tags, or vulns sections
assert "--" not in result
assert "CPEs:" not in result
assert "Tags:" not in result
assert "CVEs:" not in result
def test_empty_response(self):
result = _format_result("198.51.100.1", SAMPLE_EMPTY)
assert result == "198.51.100.1"
def test_many_vulns_truncated(self):
result = _format_result("192.0.2.1", SAMPLE_MANY_VULNS)
assert "+5 more" in result
# First 10 shown
assert "CVE-2021-0" in result
assert "CVE-2021-9" in result
def test_many_hostnames_truncated(self):
result = _format_result("192.0.2.2", SAMPLE_MANY_HOSTNAMES)
# Only first 5 hostnames shown
assert "host0.example.com" in result
assert "host4.example.com" in result
assert "host5.example.com" not in result
def test_ports_sorted(self):
data = {**SAMPLE_FULL, "ports": [8080, 22, 443, 80]}
result = _format_result("8.8.8.8", data)
assert "Ports: 22, 80, 443, 8080" in result
def test_many_cpes_truncated(self):
data = {**SAMPLE_EMPTY, "cpes": [f"cpe:/a:vendor{i}:prod" for i in range(12)]}
result = _format_result("198.51.100.1", data)
assert "cpe:/a:vendor0:prod" in result
assert "cpe:/a:vendor7:prod" in result
assert "cpe:/a:vendor8:prod" not in result
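The truncation tests above (10 CVEs then "+5 more", first 5 hostnames, first 8 CPEs) all describe the same join-and-truncate pattern; a sketch (helper name is illustrative):

```python
def join_truncated(items: list[str], limit: int) -> str:
    # Show at most `limit` items, then summarize the rest as "+N more"
    text = ", ".join(items[:limit])
    extra = len(items) - limit
    return f"{text} +{extra} more" if extra > 0 else text
```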
# ---------------------------------------------------------------------------
# TestCommand
# ---------------------------------------------------------------------------
class TestCommand:
def _run(self, bot, msg, data=None, side_effect=None):
"""Run command with mocked _fetch."""
if side_effect is not None:
with patch.object(_mod, "_fetch", side_effect=side_effect):
asyncio.run(cmd_internetdb(bot, msg))
else:
with patch.object(_mod, "_fetch", return_value=data):
asyncio.run(cmd_internetdb(bot, msg))
def test_valid_ip(self):
bot = _FakeBot()
self._run(bot, _msg("!internetdb 8.8.8.8"), data=SAMPLE_FULL)
assert "dns.google" in bot.replied[0]
assert "Ports:" in bot.replied[0]
def test_no_data(self):
bot = _FakeBot()
self._run(bot, _msg("!internetdb 4.4.4.4"), data=None)
assert "no data available" in bot.replied[0]
def test_no_args(self):
bot = _FakeBot()
asyncio.run(cmd_internetdb(bot, _msg("!internetdb")))
assert "Usage:" in bot.replied[0]
def test_invalid_ip(self):
bot = _FakeBot()
asyncio.run(cmd_internetdb(bot, _msg("!internetdb notanip")))
assert "Invalid IP" in bot.replied[0]
def test_private_ip(self):
bot = _FakeBot()
asyncio.run(cmd_internetdb(bot, _msg("!internetdb 192.168.1.1")))
assert "private/loopback" in bot.replied[0]
def test_loopback(self):
bot = _FakeBot()
asyncio.run(cmd_internetdb(bot, _msg("!internetdb 127.0.0.1")))
assert "private/loopback" in bot.replied[0]
def test_ipv6(self):
bot = _FakeBot()
self._run(bot, _msg("!internetdb 2606:4700::1"), data=SAMPLE_MINIMAL)
assert "Ports:" in bot.replied[0]
def test_network_error(self):
bot = _FakeBot()
self._run(
bot, _msg("!internetdb 8.8.8.8"),
side_effect=ConnectionError("timeout"),
)
assert "lookup failed" in bot.replied[0]
def test_leading_zeros_rejected(self):
"""Octets with leading zeros (e.g. 8.008.8.8) are rejected, not normalized."""
calls = []
def _mock_fetch(addr):
calls.append(addr)
return SAMPLE_MINIMAL
bot = _FakeBot()
with patch.object(_mod, "_fetch", side_effect=_mock_fetch):
asyncio.run(cmd_internetdb(bot, _msg("!internetdb 8.008.8.8")))
# ipaddress rejects leading-zero octets (ambiguous octal) -> "Invalid IP"
assert "Invalid IP" in bot.replied[0]
assert calls == []  # validation fails before _fetch is ever called
def test_empty_result(self):
bot = _FakeBot()
self._run(bot, _msg("!internetdb 198.51.100.1"), data=SAMPLE_EMPTY)
assert "198.51.100.1" in bot.replied[0]

tests/test_resolve.py Normal file

@@ -0,0 +1,228 @@
"""Tests for the bulk DNS resolve plugin."""
import asyncio
import importlib.util
import struct
import sys
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
from derp.dns import encode_name
from derp.irc import Message
# plugins/ is not a Python package -- load the module from file path
_spec = importlib.util.spec_from_file_location(
"plugins.resolve", Path(__file__).resolve().parent.parent / "plugins" / "resolve.py",
)
_mod = importlib.util.module_from_spec(_spec)
sys.modules[_spec.name] = _mod
_spec.loader.exec_module(_mod)
from plugins.resolve import _query_tcp, _resolve_one, cmd_resolve # noqa: E402
# -- Helpers -----------------------------------------------------------------
def _make_a_response(ip_bytes: bytes = b"\x01\x02\x03\x04") -> bytes:
"""Build a minimal A-record DNS response."""
tid = b"\x00\x01"
flags = struct.pack("!H", 0x8180)
counts = struct.pack("!HHHH", 1, 1, 0, 0)
qname = encode_name("example.com")
question = qname + struct.pack("!HH", 1, 1)
answer = qname + struct.pack("!HHIH", 1, 1, 300, len(ip_bytes)) + ip_bytes
return tid + flags + counts + question + answer
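The helper above hand-rolls the DNS wire format: a 12-byte header, then a length-prefixed question name and an answer record. As a standalone sketch (inlining a minimal `encode_name`, which is an assumption about what `derp.dns.encode_name` produces), the same bytes can be built and the header fields decoded back out:

```python
import struct

def encode_name(name: str) -> bytes:
    # DNS wire format: each label is length-prefixed; a zero byte terminates.
    return b"".join(bytes([len(l)]) + l.encode() for l in name.split(".")) + b"\x00"

tid = b"\x00\x01"
flags = struct.pack("!H", 0x8180)          # QR=1, RD=1, RA=1, rcode=0
counts = struct.pack("!HHHH", 1, 1, 0, 0)  # 1 question, 1 answer
qname = encode_name("example.com")
question = qname + struct.pack("!HH", 1, 1)                        # QTYPE=A, QCLASS=IN
answer = qname + struct.pack("!HHIH", 1, 1, 300, 4) + b"\x01\x02\x03\x04"
response = tid + flags + counts + question + answer

# Decode rcode (low 4 bits of flags) and answer count from the header.
rcode = struct.unpack("!H", response[2:4])[0] & 0x0F
ancount = struct.unpack("!H", response[6:8])[0]
print(rcode, ancount)  # 0 1
```

Note `!HHIH` packs type, class, TTL, and RDLENGTH with no padding (10 bytes), which is why `len(ip_bytes)` slots in directly as RDLENGTH.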
def _make_nxdomain_response() -> bytes:
"""Build a minimal NXDOMAIN DNS response."""
tid = b"\x00\x01"
flags = struct.pack("!H", 0x8183) # rcode=3
counts = struct.pack("!HHHH", 1, 0, 0, 0)
qname = encode_name("nope.invalid")
question = qname + struct.pack("!HH", 1, 1)
return tid + flags + counts + question
class _FakeBot:
def __init__(self):
self.replied: list[str] = []
async def reply(self, message, text: str) -> None:
self.replied.append(text)
def _msg(text: str) -> Message:
return Message(
raw="", prefix="alice!~alice@host", nick="alice",
command="PRIVMSG", params=["#test", text], tags={},
)
# -- _query_tcp --------------------------------------------------------------
class TestQueryTcp:
def test_a_record(self):
response = _make_a_response()
framed = struct.pack("!H", len(response)) + response
reader = AsyncMock()
reader.readexactly = AsyncMock(side_effect=[framed[:2], framed[2:]])
writer = MagicMock()
writer.drain = AsyncMock()
writer.wait_closed = AsyncMock()
mock_open = AsyncMock(return_value=(reader, writer))
with patch.object(_mod, "_open_connection", mock_open):
rcode, results = asyncio.run(_query_tcp("example.com", 1, "1.1.1.1"))
assert rcode == 0
assert results == ["1.2.3.4"]
def test_nxdomain(self):
response = _make_nxdomain_response()
framed = struct.pack("!H", len(response)) + response
reader = AsyncMock()
reader.readexactly = AsyncMock(side_effect=[framed[:2], framed[2:]])
writer = MagicMock()
writer.drain = AsyncMock()
writer.wait_closed = AsyncMock()
mock_open = AsyncMock(return_value=(reader, writer))
with patch.object(_mod, "_open_connection", mock_open):
rcode, results = asyncio.run(_query_tcp("nope.invalid", 1, "1.1.1.1"))
assert rcode == 3
assert results == []
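The `framed[:2]` / `framed[2:]` split fed to `readexactly` mirrors DNS-over-TCP framing (RFC 1035 section 4.2.2): every message carries a two-byte big-endian length prefix. A minimal sketch of that round trip, assuming only stdlib `struct`:

```python
import struct

def frame(msg: bytes) -> bytes:
    # Prefix the message with its 2-byte big-endian length.
    return struct.pack("!H", len(msg)) + msg

def unframe(data: bytes) -> tuple[bytes, bytes]:
    # Mirrors readexactly(2) then readexactly(length): return (message, remainder).
    (length,) = struct.unpack("!H", data[:2])
    return data[2 : 2 + length], data[2 + length :]

payload = b"\x00\x01\x81\x80"
msg, rest = unframe(frame(payload))
assert msg == payload and rest == b""
```

This framing is why the mocked reader uses two sequential `readexactly` results rather than a single `read`.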
# -- _resolve_one ------------------------------------------------------------
class TestResolveOne:
def test_success(self):
mock_tcp = AsyncMock(return_value=(0, ["1.2.3.4"]))
with patch.object(_mod, "_query_tcp", mock_tcp):
result = asyncio.run(_resolve_one("example.com", "A", "1.1.1.1"))
assert "example.com -> 1.2.3.4" == result
def test_nxdomain(self):
mock_tcp = AsyncMock(return_value=(3, []))
with patch.object(_mod, "_query_tcp", mock_tcp):
result = asyncio.run(_resolve_one("bad.invalid", "A", "1.1.1.1"))
assert "NXDOMAIN" in result
def test_timeout(self):
mock_tcp = AsyncMock(side_effect=asyncio.TimeoutError())
with patch.object(_mod, "_query_tcp", mock_tcp):
result = asyncio.run(_resolve_one("slow.example.com", "A", "1.1.1.1"))
assert "timeout" in result
def test_error(self):
mock_tcp = AsyncMock(side_effect=OSError("connection refused"))
with patch.object(_mod, "_query_tcp", mock_tcp):
result = asyncio.run(_resolve_one("down.example.com", "A", "1.1.1.1"))
assert "error" in result
def test_ptr_auto(self):
mock_tcp = AsyncMock(return_value=(0, ["dns.google"]))
with patch.object(_mod, "_query_tcp", mock_tcp):
result = asyncio.run(_resolve_one("8.8.8.8", "PTR", "1.1.1.1"))
assert "dns.google" in result
def test_ptr_invalid_ip(self):
result = asyncio.run(_resolve_one("not-an-ip", "PTR", "1.1.1.1"))
assert "invalid IP" in result
def test_no_records(self):
mock_tcp = AsyncMock(return_value=(0, []))
with patch.object(_mod, "_query_tcp", mock_tcp):
result = asyncio.run(_resolve_one("empty.example.com", "A", "1.1.1.1"))
assert "no records" in result
def test_multiple_results(self):
mock_tcp = AsyncMock(return_value=(0, ["1.1.1.1", "1.0.0.1"]))
with patch.object(_mod, "_query_tcp", mock_tcp):
result = asyncio.run(_resolve_one("multi.example.com", "A", "1.1.1.1"))
assert "1.1.1.1, 1.0.0.1" in result
# -- Command handler ---------------------------------------------------------
class TestCmdResolve:
def test_no_args(self):
bot = _FakeBot()
asyncio.run(cmd_resolve(bot, _msg("!resolve")))
assert "Usage" in bot.replied[0]
def test_single_host(self):
bot = _FakeBot()
mock_tcp = AsyncMock(return_value=(0, ["93.184.216.34"]))
with patch.object(_mod, "_query_tcp", mock_tcp):
asyncio.run(cmd_resolve(bot, _msg("!resolve example.com")))
assert len(bot.replied) == 1
assert "example.com -> 93.184.216.34" in bot.replied[0]
def test_multiple_hosts(self):
bot = _FakeBot()
async def fake_tcp(name, qtype, server, timeout=5.0):
if "example" in name:
return 0, ["93.184.216.34"]
return 0, ["140.82.121.3"]
with patch.object(_mod, "_query_tcp", fake_tcp):
asyncio.run(cmd_resolve(bot, _msg("!resolve example.com github.com")))
assert len(bot.replied) == 2
assert "93.184.216.34" in bot.replied[0]
assert "140.82.121.3" in bot.replied[1]
def test_explicit_type(self):
bot = _FakeBot()
mock_tcp = AsyncMock(return_value=(0, ["2606:2800:220:1:248:1893:25c8:1946"]))
with patch.object(_mod, "_query_tcp", mock_tcp):
asyncio.run(cmd_resolve(bot, _msg("!resolve example.com AAAA")))
assert "2606:" in bot.replied[0]
# Verify AAAA qtype (28) was used
call_args = mock_tcp.call_args[0]
assert call_args[1] == 28
def test_ip_auto_ptr(self):
bot = _FakeBot()
mock_tcp = AsyncMock(return_value=(0, ["dns.google"]))
with patch.object(_mod, "_query_tcp", mock_tcp):
asyncio.run(cmd_resolve(bot, _msg("!resolve 8.8.8.8")))
assert "dns.google" in bot.replied[0]
def test_type_only_no_hosts(self):
bot = _FakeBot()
asyncio.run(cmd_resolve(bot, _msg("!resolve AAAA")))
assert "Usage" in bot.replied[0]
def test_nxdomain(self):
bot = _FakeBot()
mock_tcp = AsyncMock(return_value=(3, []))
with patch.object(_mod, "_query_tcp", mock_tcp):
asyncio.run(cmd_resolve(bot, _msg("!resolve bad.invalid")))
assert "NXDOMAIN" in bot.replied[0]
def test_max_hosts(self):
"""Hosts beyond MAX_HOSTS are truncated."""
bot = _FakeBot()
hosts = " ".join(f"h{i}.example.com" for i in range(15))
mock_tcp = AsyncMock(return_value=(0, ["1.2.3.4"]))
with patch.object(_mod, "_query_tcp", mock_tcp):
asyncio.run(cmd_resolve(bot, _msg(f"!resolve {hosts}")))
# 10 results + 1 truncation note
assert len(bot.replied) == 11
assert "showing first 10" in bot.replied[-1]
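`test_max_hosts` pins down the concurrency contract: hosts are capped at 10 and resolved together. A sketch of that pattern with `asyncio.gather` (the helper names and the `192.0.2.1` result are hypothetical stand-ins, not the plugin's real internals):

```python
import asyncio

async def resolve_one(host: str) -> str:
    # Stand-in for a real TCP DNS query over the proxy.
    await asyncio.sleep(0)
    return f"{host} -> 192.0.2.1"

async def resolve_all(hosts: list[str], limit: int = 10) -> list[str]:
    # Cap at `limit` hosts, then resolve concurrently; gather preserves input order.
    return await asyncio.gather(*(resolve_one(h) for h in hosts[:limit]))

results = asyncio.run(resolve_all([f"h{i}.example.com" for i in range(15)]))
print(len(results))  # 10
```

Because `gather` preserves argument order, per-host replies can be emitted in the order the user typed them even though the queries run concurrently.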

tests/test_tcping.py Normal file

@@ -0,0 +1,203 @@
"""Tests for the TCP ping plugin."""
import asyncio
import importlib.util
import sys
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
from derp.irc import Message
# plugins/ is not a Python package -- load the module from file path
_spec = importlib.util.spec_from_file_location(
"plugins.tcping", Path(__file__).resolve().parent.parent / "plugins" / "tcping.py",
)
_mod = importlib.util.module_from_spec(_spec)
sys.modules[_spec.name] = _mod
_spec.loader.exec_module(_mod)
from plugins.tcping import ( # noqa: E402
_is_internal,
_probe,
_validate_host,
cmd_tcping,
)
# -- Helpers -----------------------------------------------------------------
class _FakeBot:
def __init__(self):
self.replied: list[str] = []
async def reply(self, message, text: str) -> None:
self.replied.append(text)
def _msg(text: str) -> Message:
return Message(
raw="", prefix="alice!~alice@host", nick="alice",
command="PRIVMSG", params=["#test", text], tags={},
)
# -- Validation --------------------------------------------------------------
class TestValidateHost:
def test_valid_ip(self):
assert _validate_host("93.184.216.34") is True
def test_valid_domain(self):
assert _validate_host("example.com") is True
def test_invalid_no_dot(self):
assert _validate_host("localhost") is False
def test_invalid_chars(self):
assert _validate_host("bad host!") is False
class TestIsInternal:
def test_private(self):
assert _is_internal("192.168.1.1") is True
def test_loopback(self):
assert _is_internal("127.0.0.1") is True
def test_public(self):
assert _is_internal("8.8.8.8") is False
def test_domain(self):
assert _is_internal("example.com") is False
# -- Probe -------------------------------------------------------------------
class TestProbe:
def test_success(self):
writer = MagicMock()
writer.wait_closed = AsyncMock()
mock_open = AsyncMock(return_value=(MagicMock(), writer))
with patch.object(_mod, "_open_connection", mock_open):
rtt = asyncio.run(_probe("example.com", 443, 5.0))
assert rtt is not None
assert rtt >= 0
writer.close.assert_called_once()
def test_timeout(self):
mock_open = AsyncMock(side_effect=asyncio.TimeoutError())
with patch.object(_mod, "_open_connection", mock_open):
rtt = asyncio.run(_probe("example.com", 443, 0.1))
assert rtt is None
def test_connection_error(self):
mock_open = AsyncMock(side_effect=OSError("refused"))
with patch.object(_mod, "_open_connection", mock_open):
rtt = asyncio.run(_probe("example.com", 443, 5.0))
assert rtt is None
# -- Command -----------------------------------------------------------------
class TestCmdTcping:
def test_no_args(self):
bot = _FakeBot()
asyncio.run(cmd_tcping(bot, _msg("!tcping")))
assert "Usage" in bot.replied[0]
def test_invalid_host(self):
bot = _FakeBot()
asyncio.run(cmd_tcping(bot, _msg("!tcping notahost")))
assert "Invalid host" in bot.replied[0]
def test_internal_host(self):
bot = _FakeBot()
asyncio.run(cmd_tcping(bot, _msg("!tcping 192.168.1.1")))
assert "internal" in bot.replied[0].lower()
def test_invalid_port(self):
bot = _FakeBot()
asyncio.run(cmd_tcping(bot, _msg("!tcping example.com 99999")))
assert "Invalid port" in bot.replied[0]
def test_invalid_port_string(self):
bot = _FakeBot()
asyncio.run(cmd_tcping(bot, _msg("!tcping example.com abc")))
assert "Invalid port" in bot.replied[0]
def test_success_default(self):
"""Default 3 probes, all succeed."""
bot = _FakeBot()
writer = MagicMock()
writer.wait_closed = AsyncMock()
mock_open = AsyncMock(return_value=(MagicMock(), writer))
with patch.object(_mod, "_open_connection", mock_open):
asyncio.run(cmd_tcping(bot, _msg("!tcping example.com")))
assert len(bot.replied) == 1
reply = bot.replied[0]
assert "example.com:443" in reply
assert "3 probes" in reply
assert "min/avg/max" in reply
def test_custom_port_and_count(self):
bot = _FakeBot()
writer = MagicMock()
writer.wait_closed = AsyncMock()
mock_open = AsyncMock(return_value=(MagicMock(), writer))
with patch.object(_mod, "_open_connection", mock_open):
asyncio.run(cmd_tcping(bot, _msg("!tcping example.com 22 5")))
reply = bot.replied[0]
assert "example.com:22" in reply
assert "5 probes" in reply
def test_all_timeout(self):
bot = _FakeBot()
mock_open = AsyncMock(side_effect=asyncio.TimeoutError())
with patch.object(_mod, "_open_connection", mock_open):
asyncio.run(cmd_tcping(bot, _msg("!tcping example.com")))
assert "timed out" in bot.replied[0]
def test_count_clamped(self):
"""Count > MAX_COUNT gets clamped."""
bot = _FakeBot()
writer = MagicMock()
writer.wait_closed = AsyncMock()
mock_open = AsyncMock(return_value=(MagicMock(), writer))
with patch.object(_mod, "_open_connection", mock_open):
asyncio.run(cmd_tcping(bot, _msg("!tcping example.com 443 99")))
# MAX_COUNT is 10
assert "10 probes" in bot.replied[0]
def test_partial_timeout(self):
"""Some probes succeed, some fail."""
bot = _FakeBot()
call_count = 0
writer = MagicMock()
writer.wait_closed = AsyncMock()
async def mock_open(host, port, timeout=None):
nonlocal call_count
call_count += 1
if call_count == 2:
raise asyncio.TimeoutError()
return (MagicMock(), writer)
with patch.object(_mod, "_open_connection", mock_open):
asyncio.run(cmd_tcping(bot, _msg("!tcping example.com 443 3")))
reply = bot.replied[0]
assert "timeout" in reply
assert "min/avg/max" in reply