Compare commits
4 Commits
6f1f4b2fc8
...
e3bb793574
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e3bb793574 | ||
|
|
7c40a6b7f1 | ||
|
|
3de3f054df | ||
|
|
442fea703c |
34
TASKS.md
34
TASKS.md
@@ -1,6 +1,38 @@
|
||||
# derp - Tasks
|
||||
|
||||
## Current Sprint -- v1.2.7 Subscription Plugin Enrichment (2026-02-19)
|
||||
## Current Sprint -- v1.3.0 Tier 2 Plugins (2026-02-20)
|
||||
|
||||
| Pri | Status | Task |
|
||||
|-----|--------|------|
|
||||
| P0 | [x] | Canary token generator (`plugins/canary.py`) -- gen/list/info/del |
|
||||
| P0 | [x] | TCP ping (`plugins/tcping.py`) -- latency probe via SOCKS5 |
|
||||
| P0 | [x] | Wayback archive (`plugins/archive.py`) -- Save Page Now via SOCKS5 |
|
||||
| P0 | [x] | Bulk DNS resolve (`plugins/resolve.py`) -- concurrent TCP DNS via SOCKS5 |
|
||||
| P1 | [x] | Tests for all 4 plugins |
|
||||
| P2 | [x] | Documentation update (USAGE.md, CHEATSHEET.md) |
|
||||
|
||||
## Previous Sprint -- v1.2.9 InternetDB Plugin (2026-02-19)
|
||||
|
||||
| Pri | Status | Task |
|
||||
|-----|--------|------|
|
||||
| P0 | [x] | Shodan InternetDB plugin (`plugins/internetdb.py`) -- free, no API key |
|
||||
| P0 | [x] | Fetch via SOCKS5 proxy (`derp.http.urlopen`) |
|
||||
| P1 | [x] | Compact formatting: hostnames, ports, CPEs, tags, CVEs with truncation |
|
||||
| P1 | [x] | Input validation: IPv4/IPv6, private/loopback rejection |
|
||||
| P2 | [x] | Tests: fetch, format, command handler (21 cases, 927 total) |
|
||||
| P2 | [x] | Documentation update (USAGE.md, CHEATSHEET.md) |
|
||||
|
||||
## Previous Sprint -- v1.2.8 ASN Backend Replacement (2026-02-19)
|
||||
|
||||
| Pri | Status | Task |
|
||||
|-----|--------|------|
|
||||
| P0 | [x] | Replace MaxMind ASN with iptoasn.com TSV backend (no license key) |
|
||||
| P0 | [x] | Bisect-based lookup in `plugins/asn.py` (pure stdlib) |
|
||||
| P1 | [x] | `update_asn()` in `scripts/update-data.sh` (SOCKS5 download) |
|
||||
| P2 | [x] | Tests: load, lookup, command handler (30 cases, 906 total) |
|
||||
| P2 | [x] | Documentation update (USAGE.md data directory layout) |
|
||||
|
||||
## Previous Sprint -- v1.2.7 Subscription Plugin Enrichment (2026-02-19)
|
||||
|
||||
| Pri | Status | Task |
|
||||
|-----|--------|------|
|
||||
|
||||
71
TODO.md
71
TODO.md
@@ -9,15 +9,72 @@
|
||||
- [ ] Webhook listener (HTTP endpoint for push events to channels)
|
||||
- [ ] Granular ACLs (per-command: trusted, operator, admin)
|
||||
|
||||
## LLM Bridge
|
||||
|
||||
Goal: let an LLM agent interact with the bot over IRC in real-time,
|
||||
with full machine access (bash, file ops, etc.).
|
||||
|
||||
### Architecture
|
||||
|
||||
Owner addresses the bot on IRC. A bridge daemon reads addressed
|
||||
messages, feeds them to an LLM with tool access, and writes replies
|
||||
back through the bot. The bot already has `owner` config
|
||||
(`[bot] owner` hostmask patterns) to gate who can trigger LLM
|
||||
interactions.
|
||||
|
||||
```
|
||||
IRC -> bot stdout (addressed msgs) -> bridge -> LLM API -> bridge -> bot inbox -> IRC
|
||||
```
|
||||
|
||||
### Approach options (ranked)
|
||||
|
||||
1. **Claude Code Agent SDK** (clean, non-trivial)
|
||||
- Custom Python agent using `anthropic` SDK with `tool_use`
|
||||
- Define tools: bash exec, file read/write, web fetch
|
||||
- Persistent conversation memory across messages
|
||||
- Full control over the event loop -- real-time IRC is natural
|
||||
- Tradeoff: must implement and maintain tool definitions
|
||||
|
||||
2. **Claude Code CLI per-message** (simple, stateless)
|
||||
- `echo "user said X" | claude --print --allowedTools bash,read,write`
|
||||
- Each invocation is a cold start with no conversation memory
|
||||
- Simple to implement but slow startup, no multi-turn context
|
||||
- Could pass conversation history via system prompt (fragile)
|
||||
|
||||
3. **Persistent Claude Code subprocess** (hack, fragile)
|
||||
- Long-running `claude` process with stdin/stdout piped
|
||||
- Keeps context across messages within a session
|
||||
- Not designed for this -- output parsing is brittle
|
||||
- Session may drift or hit context limits
|
||||
|
||||
### Bot-side plumbing needed
|
||||
|
||||
- `--llm` CLI flag: route logging to file, stdout for addressed msgs
|
||||
- `_is_addressed()`: DM or nick-prefixed messages
|
||||
- `_is_owner()`: only owner hostmasks trigger LLM routing
|
||||
- Inbox file polling (`/tmp/llm-inbox`): bridge writes `<target> <msg>`
|
||||
- `llm-send` script: line splitting (400 char), FlaskPaste overflow
|
||||
- Stdout format: `HH:MM [#chan] <nick> text` / `HH:MM --- status`
|
||||
- Only LLM-originated replies echoed to stdout (not all bot output)
|
||||
|
||||
### Previous attempt (reverted)
|
||||
|
||||
The `--llm` mode was implemented and tested (commit ea6f079, reverted
|
||||
in 6f1f4b2). The stdout/stdin plumbing worked but Claude Code CLI
|
||||
cannot act as a real-time daemon -- each tool call is a blocking
|
||||
round-trip, making interactive IRC conversation impractical. The code
|
||||
is preserved in git history for reference.
|
||||
|
||||
## Plugins -- Security/OSINT
|
||||
|
||||
- [ ] `emailcheck` -- SMTP VRFY/RCPT TO verification
|
||||
- [ ] `canary` -- canary token generator/tracker
|
||||
- [ ] `virustotal` -- hash/URL/IP/domain lookup (free API)
|
||||
- [ ] `abuseipdb` -- IP abuse confidence scoring (free tier)
|
||||
- [ ] `jwt` -- decode tokens, show claims/expiry, flag weaknesses
|
||||
- [ ] `mac` -- OUI vendor lookup (local IEEE database)
|
||||
- [ ] `pastemoni` -- monitor paste sites for keywords
|
||||
- [x] `emailcheck` -- SMTP VRFY/RCPT TO verification
|
||||
- [x] `canary` -- canary token generator/tracker
|
||||
- [x] `virustotal` -- hash/URL/IP/domain lookup (free API)
|
||||
- [x] `abuseipdb` -- IP abuse confidence scoring (free tier)
|
||||
- [x] `jwt` -- decode tokens, show claims/expiry, flag weaknesses
|
||||
- [x] `mac` -- OUI vendor lookup (local IEEE database)
|
||||
- [x] `pastemoni` -- monitor paste sites for keywords
|
||||
- [x] `internetdb` -- Shodan InternetDB host recon (free, no API key)
|
||||
|
||||
## Plugins -- Utility
|
||||
|
||||
|
||||
@@ -134,6 +134,25 @@ SASL auto-added when sasl_user/sasl_pass configured.
|
||||
!unload <plugin> # Remove a plugin (admin)
|
||||
```
|
||||
|
||||
## Archive
|
||||
|
||||
```
|
||||
!archive https://example.com/page # Save to Wayback Machine
|
||||
```
|
||||
|
||||
URL must have `http://` or `https://` scheme. 30s timeout. SOCKS5-proxied.
|
||||
|
||||
## Bulk DNS
|
||||
|
||||
```
|
||||
!resolve example.com github.com # A records (concurrent)
|
||||
!resolve example.com AAAA # Specific type
|
||||
!resolve 1.2.3.4 8.8.8.8 # Auto PTR for IPs
|
||||
```
|
||||
|
||||
Max 10 hosts. Types: A, AAAA, MX, NS, TXT, CNAME, PTR, SOA.
|
||||
TCP DNS via SOCKS5, server 1.1.1.1.
|
||||
|
||||
## Recon
|
||||
|
||||
```
|
||||
@@ -224,9 +243,26 @@ Categories: sqli, xss, ssti, lfi, cmdi, xxe
|
||||
!refang hxxps[://]evil[.]com # Refang IOC
|
||||
```
|
||||
|
||||
## Canary Tokens
|
||||
|
||||
```
|
||||
!canary gen db-cred # 40-char hex token (default)
|
||||
!canary gen aws staging-key # AWS AKIA keypair
|
||||
!canary gen basic svc-login # user:pass pair
|
||||
!canary list # List channel canaries
|
||||
!canary info db-cred # Show full token
|
||||
!canary del db-cred # Delete canary (admin)
|
||||
```
|
||||
|
||||
Types: `token` (hex), `aws` (AKIA+secret), `basic` (user:pass).
|
||||
Max 50/channel. `gen`/`del` admin only. Persists across restarts.
|
||||
|
||||
## Network
|
||||
|
||||
```
|
||||
!tcping example.com # TCP latency (port 443, 3 probes)
|
||||
!tcping example.com 22 # Custom port
|
||||
!tcping example.com 80 5 # Custom port + count (max 10)
|
||||
!cidr 10.0.0.0/24 # Subnet info
|
||||
!cidr contains 10.0.0.0/8 10.1.2.3 # Membership check
|
||||
!portcheck 10.0.0.1 # Scan common ports
|
||||
@@ -237,9 +273,10 @@ Categories: sqli, xss, ssti, lfi, cmdi, xxe
|
||||
!blacklist 1.2.3.4 # DNSBL reputation check
|
||||
```
|
||||
|
||||
## Intelligence (local databases)
|
||||
## Intelligence (local databases + APIs)
|
||||
|
||||
```
|
||||
!internetdb 8.8.8.8 # Shodan InternetDB: ports, CVEs, CPEs, tags
|
||||
!geoip 8.8.8.8 # GeoIP: city, country, coords, tz
|
||||
!asn 8.8.8.8 # ASN: number + organization
|
||||
!tor 1.2.3.4 # Check Tor exit node
|
||||
|
||||
124
docs/USAGE.md
124
docs/USAGE.md
@@ -133,6 +133,11 @@ format = "text" # Log format: "text" (default) or "json"
|
||||
| `!abuse <ip> report <cats> <comment>` | Report IP to AbuseIPDB (admin) |
|
||||
| `!vt <hash\|ip\|domain\|url>` | VirusTotal lookup |
|
||||
| `!emailcheck <email> [email2 ...]` | SMTP email verification (admin) |
|
||||
| `!internetdb <ip>` | Shodan InternetDB host recon (ports, CVEs, CPEs) |
|
||||
| `!canary <gen\|list\|info\|del>` | Canary token generator/tracker |
|
||||
| `!tcping <host> [port] [count]` | TCP connect latency probe via SOCKS5 |
|
||||
| `!archive <url>` | Save URL to Wayback Machine |
|
||||
| `!resolve <host> [host2 ...] [type]` | Bulk DNS resolution via TCP/SOCKS5 |
|
||||
| `!shorten <url>` | Shorten a URL via FlaskPaste |
|
||||
| `!pastemoni <add\|del\|list\|check>` | Paste site keyword monitoring |
|
||||
|
||||
@@ -935,6 +940,125 @@ Polling and announcements:
|
||||
- `list` shows keyword and per-backend error counts
|
||||
- `check` forces an immediate poll across all backends
|
||||
|
||||
### `!internetdb` -- Shodan InternetDB
|
||||
|
||||
Look up host information from Shodan's free InternetDB API. Returns open ports,
|
||||
reverse hostnames, CPE software fingerprints, tags, and known CVEs. No API key
|
||||
required.
|
||||
|
||||
```
|
||||
!internetdb 8.8.8.8
|
||||
```
|
||||
|
||||
Output format:
|
||||
|
||||
```
|
||||
8.8.8.8 -- dns.google | Ports: 53, 443 | CPEs: cpe:/a:isc:bind | Tags: cloud
|
||||
```
|
||||
|
||||
- Single IP per query (IPv4 or IPv6)
|
||||
- Private/loopback addresses are rejected
|
||||
- Hostnames truncated to first 5; CVEs truncated to first 10 (with `+N more`)
|
||||
- CPEs truncated to first 8
|
||||
- All requests routed through SOCKS5 proxy
|
||||
- Returns "no data available" for IPs not in the InternetDB index
|
||||
|
||||
### `!canary` -- Canary Token Generator
|
||||
|
||||
Generate realistic-looking credentials for planting as canary tokens (tripwires
|
||||
for detecting unauthorized access). Tokens are persisted per-channel.
|
||||
|
||||
```
|
||||
!canary gen db-cred Generate default token (40-char hex)
|
||||
!canary gen aws staging-key AWS-style keypair
|
||||
!canary gen basic svc-login Username:password pair
|
||||
!canary list List canaries in channel
|
||||
!canary info db-cred Show full token details
|
||||
!canary del db-cred Delete a canary (admin)
|
||||
```
|
||||
|
||||
Token types:
|
||||
|
||||
| Type | Format | Example |
|
||||
|------|--------|---------|
|
||||
| `token` | 40-char hex (API key / SHA1) | `a3f8b2c1d4e5...` |
|
||||
| `aws` | AKIA access key + base64 secret | `AKIA7X9M2PVL5N...` |
|
||||
| `basic` | user:pass pair | `svcadmin:xK9mP2vL5nR8wQ3z` |
|
||||
|
||||
- `gen` and `del` require admin privileges
|
||||
- All subcommands must be used in a channel (not PM)
|
||||
- Labels: 1-32 chars, alphanumeric + hyphens + underscores
|
||||
- Maximum 50 canaries per channel
|
||||
- Persisted via `bot.state` (survives restarts)
|
||||
|
||||
### `!tcping` -- TCP Connect Latency Probe
|
||||
|
||||
Measure TCP connect latency to a host:port through the SOCKS5 proxy. Sequential
|
||||
probes with min/avg/max summary.
|
||||
|
||||
```
|
||||
!tcping example.com Port 443, 3 probes
|
||||
!tcping example.com 22 Port 22, 3 probes
|
||||
!tcping example.com 80 5 Port 80, 5 probes
|
||||
```
|
||||
|
||||
Output format:
|
||||
|
||||
```
|
||||
tcping example.com:443 -- 3 probes 1: 45ms 2: 43ms 3: 47ms min/avg/max: 43/45/47 ms
|
||||
```
|
||||
|
||||
- Default port: 443, default count: 3
|
||||
- Max count: 10, timeout: 10s per probe
|
||||
- Private/reserved addresses rejected
|
||||
- Routed through SOCKS5 proxy
|
||||
|
||||
### `!archive` -- Wayback Machine Save
|
||||
|
||||
Save a URL to the Wayback Machine via the Save Page Now API.
|
||||
|
||||
```
|
||||
!archive https://example.com/page
|
||||
```
|
||||
|
||||
Output format:
|
||||
|
||||
```
|
||||
Archiving https://example.com/page...
|
||||
Archived: https://web.archive.org/web/20260220.../https://example.com/page
|
||||
```
|
||||
|
||||
- URL must start with `http://` or `https://`
|
||||
- Timeout: 30s (archiving can be slow)
|
||||
- Handles 429 rate limit, 523 origin unreachable
|
||||
- Sends acknowledgment before archiving
|
||||
- Routed through SOCKS5 proxy
|
||||
|
||||
### `!resolve` -- Bulk DNS Resolution
|
||||
|
||||
Resolve multiple hosts via TCP DNS through the SOCKS5 proxy. Concurrent
|
||||
resolution with compact output.
|
||||
|
||||
```
|
||||
!resolve example.com github.com A records (default)
|
||||
!resolve example.com AAAA Specific record type
|
||||
!resolve 1.2.3.4 8.8.8.8 Auto PTR for IPs
|
||||
```
|
||||
|
||||
Output format:
|
||||
|
||||
```
|
||||
example.com -> 93.184.216.34
|
||||
github.com -> 140.82.121.3
|
||||
badhost.invalid -> NXDOMAIN
|
||||
```
|
||||
|
||||
- Max 10 hosts per invocation
|
||||
- Default type: A (auto-detect IP -> PTR)
|
||||
- DNS server: 1.1.1.1 (Cloudflare)
|
||||
- Concurrent via `asyncio.gather()`
|
||||
- Valid types: A, NS, CNAME, SOA, PTR, MX, TXT, AAAA
|
||||
|
||||
### FlaskPaste Configuration
|
||||
|
||||
```toml
|
||||
|
||||
105
plugins/archive.py
Normal file
105
plugins/archive.py
Normal file
@@ -0,0 +1,105 @@
|
||||
"""Plugin: Wayback Machine Save Page Now (SOCKS5-proxied)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
|
||||
from derp.http import urlopen as _urlopen
|
||||
from derp.plugin import command
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
_SAVE_URL = "https://web.archive.org/save/"
|
||||
_TIMEOUT = 30
|
||||
_USER_AGENT = "derp/1.0"
|
||||
|
||||
|
||||
def _save_page(url: str) -> dict:
|
||||
"""Blocking POST to Save Page Now. Returns result dict."""
|
||||
target = f"{_SAVE_URL}{url}"
|
||||
req = urllib.request.Request(
|
||||
target,
|
||||
headers={"User-Agent": _USER_AGENT},
|
||||
)
|
||||
|
||||
try:
|
||||
resp = _urlopen(req, timeout=_TIMEOUT)
|
||||
# The save endpoint returns a redirect to the archived page.
|
||||
# With urllib3 pooled requests, redirects are followed automatically.
|
||||
final_url = getattr(resp, "geturl", lambda: None)()
|
||||
headers = resp.headers if hasattr(resp, "headers") else {}
|
||||
|
||||
# Check for Content-Location or Link header with archived URL
|
||||
content_location = None
|
||||
if hasattr(headers, "get"):
|
||||
content_location = headers.get("Content-Location", "")
|
||||
link = headers.get("Link", "")
|
||||
else:
|
||||
content_location = ""
|
||||
link = ""
|
||||
|
||||
resp.read()
|
||||
|
||||
# Try Content-Location first (most reliable)
|
||||
if content_location and "/web/" in content_location:
|
||||
if content_location.startswith("/"):
|
||||
return {"url": f"https://web.archive.org{content_location}"}
|
||||
return {"url": content_location}
|
||||
|
||||
# Try final URL after redirects
|
||||
if final_url and "/web/" in final_url:
|
||||
return {"url": final_url}
|
||||
|
||||
# Try Link header
|
||||
if link and "/web/" in link:
|
||||
# Extract URL from Link header: <url>; rel="memento"
|
||||
for part in link.split(","):
|
||||
part = part.strip()
|
||||
if "/web/" in part and "<" in part:
|
||||
extracted = part.split("<", 1)[1].split(">", 1)[0]
|
||||
return {"url": extracted}
|
||||
|
||||
# If we got a 200 but no archive URL, report success without link
|
||||
return {"url": f"https://web.archive.org/web/*/{url}"}
|
||||
|
||||
except urllib.error.HTTPError as exc:
|
||||
if exc.code == 429:
|
||||
return {"error": "rate limited -- try again later"}
|
||||
if exc.code == 523:
|
||||
return {"error": "origin unreachable"}
|
||||
return {"error": f"HTTP {exc.code}"}
|
||||
except (TimeoutError, OSError) as exc:
|
||||
return {"error": f"timeout: {exc}"}
|
||||
except Exception as exc:
|
||||
return {"error": str(exc)[:100]}
|
||||
|
||||
|
||||
@command("archive", help="Save to Wayback Machine: !archive <url>")
|
||||
async def cmd_archive(bot, message):
|
||||
"""Save a URL to the Wayback Machine via Save Page Now.
|
||||
|
||||
Usage:
|
||||
!archive https://example.com/page
|
||||
"""
|
||||
parts = message.text.split(None, 1)
|
||||
if len(parts) < 2:
|
||||
await bot.reply(message, "Usage: !archive <url>")
|
||||
return
|
||||
|
||||
url = parts[1].strip()
|
||||
if not url.startswith(("http://", "https://")):
|
||||
await bot.reply(message, "URL must start with http:// or https://")
|
||||
return
|
||||
|
||||
await bot.reply(message, f"Archiving {url}...")
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
result = await loop.run_in_executor(None, _save_page, url)
|
||||
|
||||
if "error" in result:
|
||||
await bot.reply(message, f"Archive failed: {result['error']}")
|
||||
else:
|
||||
await bot.reply(message, f"Archived: {result['url']}")
|
||||
145
plugins/asn.py
145
plugins/asn.py
@@ -1,41 +1,112 @@
|
||||
"""Plugin: ASN lookup using MaxMind GeoLite2-ASN mmdb."""
|
||||
"""Plugin: ASN lookup using iptoasn.com TSV database."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import ipaddress
|
||||
import logging
|
||||
import struct
|
||||
from bisect import bisect_right
|
||||
from pathlib import Path
|
||||
|
||||
from derp.plugin import command
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
_DB_PATHS = [
|
||||
Path("data/GeoLite2-ASN.mmdb"),
|
||||
Path("/usr/share/GeoIP/GeoLite2-ASN.mmdb"),
|
||||
Path.home() / ".local" / "share" / "GeoIP" / "GeoLite2-ASN.mmdb",
|
||||
]
|
||||
_DB_PATH = Path("data/ip2asn-v4.tsv")
|
||||
|
||||
_reader = None
|
||||
# Sorted parallel arrays populated by _load_db():
|
||||
# _starts[i] = start IP as 32-bit int
|
||||
# _ends[i] = end IP as 32-bit int
|
||||
# _asns[i] = "AS<number>"
|
||||
# _countries[i] = two-letter country code
|
||||
# _orgs[i] = AS description string
|
||||
_starts: list[int] = []
|
||||
_ends: list[int] = []
|
||||
_asns: list[str] = []
|
||||
_countries: list[str] = []
|
||||
_orgs: list[str] = []
|
||||
|
||||
_loaded = False
|
||||
|
||||
|
||||
def _get_reader():
|
||||
"""Lazy-load the mmdb reader."""
|
||||
global _reader
|
||||
if _reader is not None:
|
||||
return _reader
|
||||
try:
|
||||
import maxminddb
|
||||
except ImportError:
|
||||
log.error("maxminddb package not installed")
|
||||
def _ip_to_int(addr: str) -> int:
|
||||
"""Convert dotted-quad IPv4 string to 32-bit unsigned integer."""
|
||||
return struct.unpack("!I", ipaddress.IPv4Address(addr).packed)[0]
|
||||
|
||||
|
||||
def _load_db(path: Path | None = None) -> bool:
|
||||
"""Load the iptoasn TSV into sorted arrays.
|
||||
|
||||
Returns True if loaded successfully, False otherwise.
|
||||
Rows with ASN 0 ("Not routed") are skipped.
|
||||
"""
|
||||
global _loaded
|
||||
p = path or _DB_PATH
|
||||
if not p.is_file():
|
||||
log.warning("asn: %s not found (run update-data)", p)
|
||||
return False
|
||||
|
||||
starts: list[int] = []
|
||||
ends: list[int] = []
|
||||
asns: list[str] = []
|
||||
countries: list[str] = []
|
||||
orgs: list[str] = []
|
||||
|
||||
with open(p, encoding="utf-8", errors="replace") as fh:
|
||||
for line in fh:
|
||||
line = line.strip()
|
||||
if not line or line.startswith("#"):
|
||||
continue
|
||||
parts = line.split("\t")
|
||||
if len(parts) < 5:
|
||||
continue
|
||||
asn_num = parts[2]
|
||||
if asn_num == "0":
|
||||
continue
|
||||
try:
|
||||
start = _ip_to_int(parts[0])
|
||||
end = _ip_to_int(parts[1])
|
||||
except (ValueError, struct.error):
|
||||
continue
|
||||
starts.append(start)
|
||||
ends.append(end)
|
||||
asns.append(f"AS{asn_num}")
|
||||
countries.append(parts[3])
|
||||
orgs.append(parts[4])
|
||||
|
||||
_starts.clear()
|
||||
_ends.clear()
|
||||
_asns.clear()
|
||||
_countries.clear()
|
||||
_orgs.clear()
|
||||
|
||||
_starts.extend(starts)
|
||||
_ends.extend(ends)
|
||||
_asns.extend(asns)
|
||||
_countries.extend(countries)
|
||||
_orgs.extend(orgs)
|
||||
|
||||
_loaded = True
|
||||
log.info("asn: loaded %d ranges from %s", len(_starts), p)
|
||||
return True
|
||||
|
||||
|
||||
def _lookup(addr: str) -> tuple[str, str, str] | None:
|
||||
"""Look up an IPv4 address in the loaded database.
|
||||
|
||||
Returns (asn, org, country) or None if not found.
|
||||
"""
|
||||
if not _loaded:
|
||||
if not _load_db():
|
||||
return None
|
||||
|
||||
ip_int = _ip_to_int(addr)
|
||||
idx = bisect_right(_starts, ip_int) - 1
|
||||
if idx < 0:
|
||||
return None
|
||||
for path in _DB_PATHS:
|
||||
if path.is_file():
|
||||
_reader = maxminddb.open_database(str(path))
|
||||
log.info("asn: loaded %s", path)
|
||||
return _reader
|
||||
log.warning("asn: no GeoLite2-ASN.mmdb found")
|
||||
return None
|
||||
if ip_int > _ends[idx]:
|
||||
return None
|
||||
return _asns[idx], _orgs[idx], _countries[idx]
|
||||
|
||||
|
||||
@command("asn", help="ASN lookup: !asn <ip>")
|
||||
@@ -61,25 +132,17 @@ async def cmd_asn(bot, message):
|
||||
await bot.reply(message, f"{addr}: private/loopback address")
|
||||
return
|
||||
|
||||
reader = _get_reader()
|
||||
if reader is None:
|
||||
await bot.reply(message, "ASN database not available (run update-data)")
|
||||
if ip.version != 4:
|
||||
await bot.reply(message, f"{addr}: only IPv4 supported")
|
||||
return
|
||||
|
||||
try:
|
||||
rec = reader.get(str(ip))
|
||||
except Exception as exc:
|
||||
await bot.reply(message, f"Lookup error: {exc}")
|
||||
result = _lookup(str(ip))
|
||||
if result is None:
|
||||
if not _loaded:
|
||||
await bot.reply(message, "ASN database not available (run update-data)")
|
||||
else:
|
||||
await bot.reply(message, f"{addr}: no ASN data")
|
||||
return
|
||||
|
||||
if not rec:
|
||||
await bot.reply(message, f"{addr}: no ASN data")
|
||||
return
|
||||
|
||||
asn = rec.get("autonomous_system_number", "")
|
||||
org = rec.get("autonomous_system_organization", "")
|
||||
|
||||
if asn:
|
||||
await bot.reply(message, f"{addr}: AS{asn} ({org})" if org else f"{addr}: AS{asn}")
|
||||
else:
|
||||
await bot.reply(message, f"{addr}: no ASN data")
|
||||
asn, org, country = result
|
||||
await bot.reply(message, f"{addr}: {asn} {org} ({country})")
|
||||
|
||||
197
plugins/canary.py
Normal file
197
plugins/canary.py
Normal file
@@ -0,0 +1,197 @@
|
||||
"""Plugin: canary token generator -- plant realistic fake credentials."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import secrets
|
||||
import string
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from derp.plugin import command
|
||||
|
||||
_MAX_PER_CHANNEL = 50
|
||||
|
||||
|
||||
def _gen_token() -> str:
|
||||
"""40-char hex string (looks like API key / SHA1)."""
|
||||
return secrets.token_hex(20)
|
||||
|
||||
|
||||
def _gen_aws() -> dict[str, str]:
|
||||
"""AWS-style keypair: AKIA + 16 alnum access key, 40-char base64 secret."""
|
||||
chars = string.ascii_uppercase + string.digits
|
||||
access = "AKIA" + "".join(secrets.choice(chars) for _ in range(16))
|
||||
# 30 random bytes -> 40-char base64
|
||||
secret = secrets.token_urlsafe(30)
|
||||
return {"access_key": access, "secret_key": secret}
|
||||
|
||||
|
||||
def _gen_basic() -> dict[str, str]:
|
||||
"""Random user:pass pair."""
|
||||
alnum = string.ascii_lowercase + string.digits
|
||||
user = "svc" + "".join(secrets.choice(alnum) for _ in range(5))
|
||||
pw = secrets.token_urlsafe(16)
|
||||
return {"user": user, "pass": pw}
|
||||
|
||||
|
||||
_TYPES = {
|
||||
"token": "API token (40-char hex)",
|
||||
"aws": "AWS keypair (AKIA access + secret)",
|
||||
"basic": "Username:password pair",
|
||||
}
|
||||
|
||||
|
||||
def _load(bot, channel: str) -> dict:
|
||||
"""Load canary store for a channel."""
|
||||
raw = bot.state.get("canary", channel)
|
||||
if not raw:
|
||||
return {}
|
||||
try:
|
||||
return json.loads(raw)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
return {}
|
||||
|
||||
|
||||
def _save(bot, channel: str, store: dict) -> None:
|
||||
"""Persist canary store for a channel."""
|
||||
bot.state.set("canary", channel, json.dumps(store))
|
||||
|
||||
|
||||
def _format_token(entry: dict) -> str:
|
||||
"""Format a canary entry for display."""
|
||||
ttype = entry["type"]
|
||||
value = entry["value"]
|
||||
if ttype == "aws":
|
||||
return f"Access: {value['access_key']} Secret: {value['secret_key']}"
|
||||
if ttype == "basic":
|
||||
return f"{value['user']}:{value['pass']}"
|
||||
return value
|
||||
|
||||
|
||||
@command("canary", help="Canary tokens: !canary gen [type] <label> | list | info | del")
|
||||
async def cmd_canary(bot, message):
|
||||
"""Generate and manage canary tokens (fake credentials for detection).
|
||||
|
||||
Usage:
|
||||
!canary gen [type] <label> Generate token (admin)
|
||||
!canary list List canaries in channel
|
||||
!canary info <label> Show full token details
|
||||
!canary del <label> Delete a canary (admin)
|
||||
"""
|
||||
parts = message.text.split()
|
||||
if len(parts) < 2:
|
||||
await bot.reply(message, "Usage: !canary <gen|list|info|del> [args]")
|
||||
return
|
||||
|
||||
sub = parts[1].lower()
|
||||
channel = message.target if message.is_channel else None
|
||||
|
||||
# ---- gen -----------------------------------------------------------------
|
||||
if sub == "gen":
|
||||
if not bot._is_admin(message):
|
||||
await bot.reply(message, "Permission denied")
|
||||
return
|
||||
if not channel:
|
||||
await bot.reply(message, "Use this command in a channel")
|
||||
return
|
||||
|
||||
# Parse: !canary gen [type] <label>
|
||||
rest = parts[2:]
|
||||
if not rest:
|
||||
types = ", ".join(_TYPES)
|
||||
await bot.reply(message, f"Usage: !canary gen [type] <label> (types: {types})")
|
||||
return
|
||||
|
||||
# Check if first arg is a type
|
||||
ttype = "token"
|
||||
if rest[0].lower() in _TYPES:
|
||||
ttype = rest[0].lower()
|
||||
rest = rest[1:]
|
||||
|
||||
if not rest:
|
||||
await bot.reply(message, "Usage: !canary gen [type] <label>")
|
||||
return
|
||||
|
||||
label = rest[0].lower()
|
||||
if len(label) > 32 or not all(c.isalnum() or c in "-_" for c in label):
|
||||
await bot.reply(message, "Label: 1-32 chars, alphanumeric/hyphens/underscores")
|
||||
return
|
||||
|
||||
store = _load(bot, channel)
|
||||
if label in store:
|
||||
await bot.reply(message, f"Canary '{label}' already exists")
|
||||
return
|
||||
if len(store) >= _MAX_PER_CHANNEL:
|
||||
await bot.reply(message, f"Limit reached ({_MAX_PER_CHANNEL} per channel)")
|
||||
return
|
||||
|
||||
# Generate
|
||||
if ttype == "aws":
|
||||
value = _gen_aws()
|
||||
elif ttype == "basic":
|
||||
value = _gen_basic()
|
||||
else:
|
||||
value = _gen_token()
|
||||
|
||||
now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")
|
||||
store[label] = {"type": ttype, "value": value, "created": now}
|
||||
_save(bot, channel, store)
|
||||
|
||||
display = _format_token(store[label])
|
||||
await bot.reply(message, f"Canary '{label}' ({ttype}): {display}")
|
||||
return
|
||||
|
||||
# ---- list ----------------------------------------------------------------
|
||||
if sub == "list":
|
||||
if not channel:
|
||||
await bot.reply(message, "Use this command in a channel")
|
||||
return
|
||||
store = _load(bot, channel)
|
||||
if not store:
|
||||
await bot.reply(message, "No canaries in this channel")
|
||||
return
|
||||
items = [f"{lbl} ({e['type']})" for lbl, e in sorted(store.items())]
|
||||
await bot.reply(message, f"Canaries: {', '.join(items)}")
|
||||
return
|
||||
|
||||
# ---- info ----------------------------------------------------------------
|
||||
if sub == "info":
|
||||
if not channel:
|
||||
await bot.reply(message, "Use this command in a channel")
|
||||
return
|
||||
if len(parts) < 3:
|
||||
await bot.reply(message, "Usage: !canary info <label>")
|
||||
return
|
||||
label = parts[2].lower()
|
||||
store = _load(bot, channel)
|
||||
entry = store.get(label)
|
||||
if not entry:
|
||||
await bot.reply(message, f"No canary '{label}'")
|
||||
return
|
||||
display = _format_token(entry)
|
||||
await bot.reply(message, f"{label} ({entry['type']}, {entry['created']}): {display}")
|
||||
return
|
||||
|
||||
# ---- del -----------------------------------------------------------------
|
||||
if sub == "del":
|
||||
if not bot._is_admin(message):
|
||||
await bot.reply(message, "Permission denied")
|
||||
return
|
||||
if not channel:
|
||||
await bot.reply(message, "Use this command in a channel")
|
||||
return
|
||||
if len(parts) < 3:
|
||||
await bot.reply(message, "Usage: !canary del <label>")
|
||||
return
|
||||
label = parts[2].lower()
|
||||
store = _load(bot, channel)
|
||||
if label not in store:
|
||||
await bot.reply(message, f"No canary '{label}'")
|
||||
return
|
||||
del store[label]
|
||||
_save(bot, channel, store)
|
||||
await bot.reply(message, f"Deleted canary '{label}'")
|
||||
return
|
||||
|
||||
# ---- unknown -------------------------------------------------------------
|
||||
await bot.reply(message, "Usage: !canary <gen|list|info|del> [args]")
|
||||
105
plugins/internetdb.py
Normal file
105
plugins/internetdb.py
Normal file
@@ -0,0 +1,105 @@
|
||||
"""Plugin: Shodan InternetDB -- free host reconnaissance (no API key)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import ipaddress
|
||||
import json
|
||||
import logging
|
||||
|
||||
from derp.http import urlopen as _urlopen
|
||||
from derp.plugin import command
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
_API_URL = "https://internetdb.shodan.io"
|
||||
_TIMEOUT = 15
|
||||
|
||||
|
||||
def _fetch(addr: str) -> dict | None:
|
||||
"""Fetch InternetDB data for an IP address.
|
||||
|
||||
Returns parsed JSON dict, or None on 404 (no data).
|
||||
Raises on network/server errors.
|
||||
"""
|
||||
import urllib.error
|
||||
|
||||
try:
|
||||
resp = _urlopen(f"{_API_URL}/{addr}", timeout=_TIMEOUT)
|
||||
return json.loads(resp.read())
|
||||
except urllib.error.HTTPError as exc:
|
||||
if exc.code == 404:
|
||||
return None
|
||||
raise
|
||||
|
||||
|
||||
def _format_result(addr: str, data: dict) -> str:
|
||||
"""Format InternetDB response into a compact IRC message."""
|
||||
lines = []
|
||||
|
||||
hostnames = data.get("hostnames", [])
|
||||
if hostnames:
|
||||
lines.append(f"{addr} -- {', '.join(hostnames[:5])}")
|
||||
else:
|
||||
lines.append(addr)
|
||||
|
||||
ports = data.get("ports", [])
|
||||
if ports:
|
||||
lines.append(f"Ports: {', '.join(str(p) for p in sorted(ports))}")
|
||||
|
||||
cpes = data.get("cpes", [])
|
||||
if cpes:
|
||||
lines.append(f"CPEs: {', '.join(cpes[:8])}")
|
||||
|
||||
tags = data.get("tags", [])
|
||||
if tags:
|
||||
lines.append(f"Tags: {', '.join(tags)}")
|
||||
|
||||
vulns = data.get("vulns", [])
|
||||
if vulns:
|
||||
shown = vulns[:10]
|
||||
suffix = f" (+{len(vulns) - 10} more)" if len(vulns) > 10 else ""
|
||||
lines.append(f"CVEs: {', '.join(shown)}{suffix}")
|
||||
|
||||
return " | ".join(lines)
|
||||
|
||||
|
||||
@command("internetdb", help="Shodan InternetDB: !internetdb <ip>")
|
||||
async def cmd_internetdb(bot, message):
|
||||
"""Look up host information from Shodan InternetDB.
|
||||
|
||||
Returns open ports, hostnames, CPEs, tags, and known CVEs.
|
||||
Free API, no key required.
|
||||
|
||||
Usage:
|
||||
!internetdb 8.8.8.8
|
||||
"""
|
||||
parts = message.text.split(None, 2)
|
||||
if len(parts) < 2:
|
||||
await bot.reply(message, "Usage: !internetdb <ip>")
|
||||
return
|
||||
|
||||
addr = parts[1].strip()
|
||||
try:
|
||||
ip = ipaddress.ip_address(addr)
|
||||
except ValueError:
|
||||
await bot.reply(message, f"Invalid IP address: {addr}")
|
||||
return
|
||||
|
||||
if ip.is_private or ip.is_loopback:
|
||||
await bot.reply(message, f"{addr}: private/loopback address")
|
||||
return
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
try:
|
||||
data = await loop.run_in_executor(None, _fetch, str(ip))
|
||||
except Exception as exc:
|
||||
log.error("internetdb: lookup failed for %s: %s", addr, exc)
|
||||
await bot.reply(message, f"{addr}: lookup failed ({exc})")
|
||||
return
|
||||
|
||||
if data is None:
|
||||
await bot.reply(message, f"{addr}: no data available")
|
||||
return
|
||||
|
||||
await bot.reply(message, _format_result(str(ip), data))
|
||||
115
plugins/resolve.py
Normal file
115
plugins/resolve.py
Normal file
@@ -0,0 +1,115 @@
|
||||
"""Plugin: bulk DNS resolution over TCP (SOCKS5-proxied)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import ipaddress
|
||||
import struct
|
||||
|
||||
from derp.dns import (
|
||||
QTYPES,
|
||||
RCODES,
|
||||
build_query,
|
||||
parse_response,
|
||||
reverse_name,
|
||||
)
|
||||
from derp.http import open_connection as _open_connection
|
||||
from derp.plugin import command
|
||||
|
||||
_DEFAULT_SERVER = "1.1.1.1"
|
||||
_TIMEOUT = 5.0
|
||||
_MAX_HOSTS = 10
|
||||
|
||||
|
||||
async def _query_tcp(name: str, qtype: int, server: str,
|
||||
timeout: float = _TIMEOUT) -> tuple[int, list[str]]:
|
||||
"""Send a DNS query over TCP and return (rcode, [values])."""
|
||||
reader, writer = await asyncio.wait_for(
|
||||
_open_connection(server, 53, timeout=timeout), timeout=timeout,
|
||||
)
|
||||
try:
|
||||
pkt = build_query(name, qtype)
|
||||
writer.write(struct.pack("!H", len(pkt)) + pkt)
|
||||
await writer.drain()
|
||||
length = struct.unpack("!H", await reader.readexactly(2))[0]
|
||||
data = await reader.readexactly(length)
|
||||
return parse_response(data)
|
||||
finally:
|
||||
writer.close()
|
||||
await writer.wait_closed()
|
||||
|
||||
|
||||
async def _resolve_one(host: str, qtype_str: str,
|
||||
server: str) -> str:
|
||||
"""Resolve a single host, return formatted result line."""
|
||||
qtype = QTYPES.get(qtype_str)
|
||||
lookup = host
|
||||
|
||||
if qtype_str == "PTR":
|
||||
try:
|
||||
lookup = reverse_name(host)
|
||||
except ValueError:
|
||||
return f"{host} -> invalid IP for PTR"
|
||||
|
||||
try:
|
||||
rcode, results = await _query_tcp(lookup, qtype, server)
|
||||
except (TimeoutError, asyncio.TimeoutError):
|
||||
return f"{host} -> timeout"
|
||||
except OSError as exc:
|
||||
return f"{host} -> error: {exc}"
|
||||
|
||||
if rcode != 0:
|
||||
err = RCODES.get(rcode, f"error {rcode}")
|
||||
return f"{host} -> {err}"
|
||||
if not results:
|
||||
return f"{host} -> no records"
|
||||
return f"{host} -> {', '.join(results)}"
|
||||
|
||||
|
||||
@command("resolve", help="Bulk DNS: !resolve <host> [host2 ...] [type]")
|
||||
async def cmd_resolve(bot, message):
|
||||
"""Bulk DNS resolution via TCP through SOCKS5 proxy.
|
||||
|
||||
Usage:
|
||||
!resolve example.com github.com (A records)
|
||||
!resolve example.com AAAA (specific type)
|
||||
!resolve 1.2.3.4 8.8.8.8 (auto PTR)
|
||||
"""
|
||||
parts = message.text.split()
|
||||
if len(parts) < 2:
|
||||
await bot.reply(message, "Usage: !resolve <host> [host2 ...] [type]")
|
||||
return
|
||||
|
||||
args = parts[1:]
|
||||
|
||||
# Check if last arg is a record type
|
||||
qtype_str = None
|
||||
if args[-1].upper() in QTYPES:
|
||||
qtype_str = args[-1].upper()
|
||||
args = args[:-1]
|
||||
|
||||
if not args:
|
||||
await bot.reply(message, "Usage: !resolve <host> [host2 ...] [type]")
|
||||
return
|
||||
|
||||
hosts = args[:_MAX_HOSTS]
|
||||
|
||||
# Auto-detect type per host if not specified
|
||||
async def _do(host: str) -> str:
|
||||
qt = qtype_str
|
||||
if qt is None:
|
||||
try:
|
||||
ipaddress.ip_address(host)
|
||||
qt = "PTR"
|
||||
except ValueError:
|
||||
qt = "A"
|
||||
return await _resolve_one(host, qt, _DEFAULT_SERVER)
|
||||
|
||||
results = await asyncio.gather(*[_do(h) for h in hosts])
|
||||
|
||||
lines = list(results)
|
||||
if len(args) > _MAX_HOSTS:
|
||||
lines.append(f"(showing first {_MAX_HOSTS} of {len(args)})")
|
||||
|
||||
for line in lines:
|
||||
await bot.reply(message, line)
|
||||
116
plugins/tcping.py
Normal file
116
plugins/tcping.py
Normal file
@@ -0,0 +1,116 @@
|
||||
"""Plugin: TCP connect latency probe (SOCKS5-proxied)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import ipaddress
|
||||
import time
|
||||
|
||||
from derp.http import open_connection as _open_connection
|
||||
from derp.plugin import command
|
||||
|
||||
_TIMEOUT = 10.0
|
||||
_MAX_COUNT = 10
|
||||
_DEFAULT_COUNT = 3
|
||||
_DEFAULT_PORT = 443
|
||||
|
||||
|
||||
def _is_internal(host: str) -> bool:
|
||||
"""Check if host is a private/reserved address."""
|
||||
try:
|
||||
ip = ipaddress.ip_address(host)
|
||||
return ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
|
||||
def _validate_host(host: str) -> bool:
|
||||
"""Check that host is an IP or looks like a domain."""
|
||||
try:
|
||||
ipaddress.ip_address(host)
|
||||
return True
|
||||
except ValueError:
|
||||
pass
|
||||
return "." in host and all(c.isalnum() or c in ".-" for c in host)
|
||||
|
||||
|
||||
async def _probe(host: str, port: int, timeout: float) -> float | None:
|
||||
"""Single TCP connect probe. Returns RTT in ms or None on failure."""
|
||||
t0 = time.perf_counter()
|
||||
try:
|
||||
_, writer = await asyncio.wait_for(
|
||||
_open_connection(host, port, timeout=timeout), timeout=timeout,
|
||||
)
|
||||
rtt = (time.perf_counter() - t0) * 1000
|
||||
writer.close()
|
||||
await writer.wait_closed()
|
||||
return rtt
|
||||
except (OSError, asyncio.TimeoutError, TimeoutError):
|
||||
return None
|
||||
|
||||
|
||||
@command("tcping", help="TCP latency: !tcping <host> [port] [count]")
|
||||
async def cmd_tcping(bot, message):
|
||||
"""Measure TCP connect latency to a host:port through SOCKS5 proxy.
|
||||
|
||||
Usage:
|
||||
!tcping example.com (port 443, 3 probes)
|
||||
!tcping example.com 22 (port 22, 3 probes)
|
||||
!tcping example.com 80 5 (port 80, 5 probes)
|
||||
"""
|
||||
parts = message.text.split()
|
||||
if len(parts) < 2:
|
||||
await bot.reply(message, "Usage: !tcping <host> [port] [count]")
|
||||
return
|
||||
|
||||
host = parts[1]
|
||||
if not _validate_host(host):
|
||||
await bot.reply(message, f"Invalid host: {host}")
|
||||
return
|
||||
|
||||
if _is_internal(host):
|
||||
await bot.reply(message, f"Refused: {host} is an internal/reserved address")
|
||||
return
|
||||
|
||||
port = _DEFAULT_PORT
|
||||
count = _DEFAULT_COUNT
|
||||
|
||||
if len(parts) > 2:
|
||||
try:
|
||||
port = int(parts[2])
|
||||
if port < 1 or port > 65535:
|
||||
raise ValueError
|
||||
except ValueError:
|
||||
await bot.reply(message, f"Invalid port: {parts[2]}")
|
||||
return
|
||||
|
||||
if len(parts) > 3:
|
||||
try:
|
||||
count = int(parts[3])
|
||||
count = max(1, min(count, _MAX_COUNT))
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
results: list[float | None] = []
|
||||
for _ in range(count):
|
||||
rtt = await _probe(host, port, _TIMEOUT)
|
||||
results.append(rtt)
|
||||
|
||||
successes = [r for r in results if r is not None]
|
||||
|
||||
if not successes:
|
||||
await bot.reply(message, f"tcping {host}:{port} -- {count} probes, all timed out")
|
||||
return
|
||||
|
||||
probe_strs = []
|
||||
for i, r in enumerate(results, 1):
|
||||
probe_strs.append(f"{i}: {r:.0f}ms" if r is not None else f"{i}: timeout")
|
||||
|
||||
mn = min(successes)
|
||||
avg = sum(successes) / len(successes)
|
||||
mx = max(successes)
|
||||
|
||||
header = f"tcping {host}:{port} -- {count} probes"
|
||||
probes = " ".join(probe_strs)
|
||||
summary = f"min/avg/max: {mn:.0f}/{avg:.0f}/{mx:.0f} ms"
|
||||
await bot.reply(message, f"{header} {probes} {summary}")
|
||||
@@ -24,7 +24,9 @@ _VIDEO_ID_RE = re.compile(r"(?:v=|youtu\.be/|/embed/|/shorts/)([A-Za-z0-9_-]{11}
|
||||
_YT_DOMAINS = {"youtube.com", "www.youtube.com", "m.youtube.com", "youtu.be"}
|
||||
_YT_FEED_URL = "https://www.youtube.com/feeds/videos.xml?channel_id={}"
|
||||
_YT_PLAYER_URL = "https://www.youtube.com/youtubei/v1/player"
|
||||
_YT_CLIENT_VERSION = "2.20250101.00.00"
|
||||
_ANDROID_VERSION = "19.29.37"
|
||||
_ANDROID_SDK = 33
|
||||
_ANDROID_UA = f"com.google.android.youtube/{_ANDROID_VERSION} (Linux; U; Android 13)"
|
||||
_ATOM_NS = "{http://www.w3.org/2005/Atom}"
|
||||
_YT_NS = "{http://www.youtube.com/xml/schemas/2015}"
|
||||
_MEDIA_NS = "{http://search.yahoo.com/mrss/}"
|
||||
@@ -107,33 +109,41 @@ def _extract_video_id(url: str) -> str | None:
|
||||
|
||||
# -- Blocking helpers (for executor) -----------------------------------------
|
||||
|
||||
def _resolve_via_innertube(video_id: str) -> str | None:
|
||||
"""Resolve video ID to channel ID via InnerTube player API. Blocking.
|
||||
def _innertube_player(video_id: str) -> dict:
|
||||
"""Fetch videoDetails via InnerTube ANDROID client. Blocking.
|
||||
|
||||
Small JSON request/response -- much more resilient to transient proxy
|
||||
issues than fetching the full 1MB watch page.
|
||||
Uses ANDROID client -- WEB client returns LOGIN_REQUIRED since ~2026-02.
|
||||
Returns videoDetails dict, or {} on failure.
|
||||
"""
|
||||
payload = json.dumps({
|
||||
"context": {
|
||||
"client": {
|
||||
"clientName": "WEB",
|
||||
"clientVersion": _YT_CLIENT_VERSION,
|
||||
"clientName": "ANDROID",
|
||||
"clientVersion": _ANDROID_VERSION,
|
||||
"androidSdkVersion": _ANDROID_SDK,
|
||||
},
|
||||
},
|
||||
"videoId": video_id,
|
||||
}).encode()
|
||||
req = urllib.request.Request(_YT_PLAYER_URL, data=payload, method="POST")
|
||||
req.add_header("Content-Type", "application/json")
|
||||
req.add_header("User-Agent", _ANDROID_UA)
|
||||
try:
|
||||
resp = _urlopen(req, timeout=_FETCH_TIMEOUT)
|
||||
raw = resp.read()
|
||||
resp.close()
|
||||
data = json.loads(raw)
|
||||
channel_id = (data.get("videoDetails") or {}).get("channelId", "")
|
||||
if channel_id and _CHANNEL_ID_RE.fullmatch(channel_id):
|
||||
return channel_id
|
||||
return data.get("videoDetails") or {}
|
||||
except Exception:
|
||||
pass
|
||||
return {}
|
||||
|
||||
|
||||
def _resolve_via_innertube(video_id: str) -> str | None:
|
||||
"""Resolve video ID to channel ID via InnerTube player API. Blocking."""
|
||||
details = _innertube_player(video_id)
|
||||
channel_id = details.get("channelId", "")
|
||||
if channel_id and _CHANNEL_ID_RE.fullmatch(channel_id):
|
||||
return channel_id
|
||||
return None
|
||||
|
||||
|
||||
@@ -142,28 +152,14 @@ def _fetch_duration(video_id: str) -> int:
|
||||
|
||||
Returns 0 on failure or for live content.
|
||||
"""
|
||||
payload = json.dumps({
|
||||
"context": {
|
||||
"client": {
|
||||
"clientName": "WEB",
|
||||
"clientVersion": _YT_CLIENT_VERSION,
|
||||
},
|
||||
},
|
||||
"videoId": video_id,
|
||||
}).encode()
|
||||
req = urllib.request.Request(_YT_PLAYER_URL, data=payload, method="POST")
|
||||
req.add_header("Content-Type", "application/json")
|
||||
details = _innertube_player(video_id)
|
||||
if not details:
|
||||
return 0
|
||||
if details.get("isLiveContent") and details.get("isLive"):
|
||||
return 0
|
||||
try:
|
||||
resp = _urlopen(req, timeout=_FETCH_TIMEOUT)
|
||||
raw = resp.read()
|
||||
resp.close()
|
||||
data = json.loads(raw)
|
||||
details = data.get("videoDetails") or {}
|
||||
if details.get("isLiveContent") and details.get("isLive"):
|
||||
return 0
|
||||
secs = int(details.get("lengthSeconds", 0))
|
||||
return secs
|
||||
except Exception:
|
||||
return int(details.get("lengthSeconds", 0))
|
||||
except (ValueError, TypeError):
|
||||
return 0
|
||||
|
||||
|
||||
|
||||
@@ -124,6 +124,26 @@ update_geolite2() {
|
||||
done
|
||||
}
|
||||
|
||||
# -- iptoasn ASN database -----------------------------------------------------
|
||||
update_asn() {
|
||||
local dest="$DATA_DIR/ip2asn-v4.tsv"
|
||||
local url="https://iptoasn.com/data/ip2asn-v4.tsv.gz"
|
||||
mkdir -p "$DATA_DIR"
|
||||
dim "Downloading iptoasn database..."
|
||||
if curl -sS -fL --max-time 60 -o "$dest.gz" "$url" ||
|
||||
curl -sS -fL --socks5-hostname 127.0.0.1:1080 --max-time 60 \
|
||||
-o "$dest.gz" "$url"; then
|
||||
gunzip -f "$dest.gz"
|
||||
local count
|
||||
count=$(wc -l < "$dest")
|
||||
info "iptoasn: $count ranges"
|
||||
else
|
||||
rm -f "$dest.gz"
|
||||
err "Failed to download iptoasn database"
|
||||
((FAILURES++)) || true
|
||||
fi
|
||||
}
|
||||
|
||||
# -- Exploit-DB CSV -----------------------------------------------------------
|
||||
update_exploitdb() {
|
||||
local dest_dir="$DATA_DIR/exploitdb"
|
||||
@@ -151,6 +171,7 @@ echo
|
||||
update_tor
|
||||
update_iprep
|
||||
update_oui
|
||||
update_asn
|
||||
update_exploitdb
|
||||
update_geolite2
|
||||
|
||||
|
||||
150
tests/test_archive.py
Normal file
150
tests/test_archive.py
Normal file
@@ -0,0 +1,150 @@
|
||||
"""Tests for the Wayback Machine archive plugin."""
|
||||
|
||||
import asyncio
|
||||
import importlib.util
|
||||
import sys
|
||||
import urllib.error
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from derp.irc import Message
|
||||
|
||||
# plugins/ is not a Python package -- load the module from file path
|
||||
_spec = importlib.util.spec_from_file_location(
|
||||
"plugins.archive", Path(__file__).resolve().parent.parent / "plugins" / "archive.py",
|
||||
)
|
||||
_mod = importlib.util.module_from_spec(_spec)
|
||||
sys.modules[_spec.name] = _mod
|
||||
_spec.loader.exec_module(_mod)
|
||||
|
||||
from plugins.archive import _save_page, cmd_archive # noqa: E402
|
||||
|
||||
# -- Helpers -----------------------------------------------------------------
|
||||
|
||||
class _FakeBot:
|
||||
def __init__(self):
|
||||
self.replied: list[str] = []
|
||||
|
||||
async def reply(self, message, text: str) -> None:
|
||||
self.replied.append(text)
|
||||
|
||||
|
||||
def _msg(text: str) -> Message:
|
||||
return Message(
|
||||
raw="", prefix="alice!~alice@host", nick="alice",
|
||||
command="PRIVMSG", params=["#test", text], tags={},
|
||||
)
|
||||
|
||||
|
||||
# -- _save_page --------------------------------------------------------------
|
||||
|
||||
class TestSavePage:
|
||||
def test_content_location_header(self):
|
||||
resp = MagicMock()
|
||||
resp.headers = {"Content-Location": "/web/20260220/https://example.com"}
|
||||
resp.read.return_value = b""
|
||||
resp.geturl.return_value = "https://web.archive.org/save/https://example.com"
|
||||
|
||||
with patch.object(_mod, "_urlopen", return_value=resp):
|
||||
result = _save_page("https://example.com")
|
||||
|
||||
assert "url" in result
|
||||
assert "/web/20260220" in result["url"]
|
||||
|
||||
def test_final_url_redirect(self):
|
||||
resp = MagicMock()
|
||||
resp.headers = {}
|
||||
resp.read.return_value = b""
|
||||
resp.geturl.return_value = "https://web.archive.org/web/20260220/https://example.com"
|
||||
|
||||
with patch.object(_mod, "_urlopen", return_value=resp):
|
||||
result = _save_page("https://example.com")
|
||||
|
||||
assert "url" in result
|
||||
assert "/web/20260220" in result["url"]
|
||||
|
||||
def test_fallback_url(self):
|
||||
resp = MagicMock()
|
||||
resp.headers = {}
|
||||
resp.read.return_value = b""
|
||||
resp.geturl.return_value = "https://web.archive.org/save/ok"
|
||||
|
||||
with patch.object(_mod, "_urlopen", return_value=resp):
|
||||
result = _save_page("https://example.com")
|
||||
|
||||
assert "url" in result
|
||||
assert "/web/*/" in result["url"]
|
||||
|
||||
def test_rate_limit(self):
|
||||
exc = urllib.error.HTTPError(
|
||||
"url", 429, "Too Many Requests", {}, None,
|
||||
)
|
||||
with patch.object(_mod, "_urlopen", side_effect=exc):
|
||||
result = _save_page("https://example.com")
|
||||
|
||||
assert "error" in result
|
||||
assert "rate limit" in result["error"]
|
||||
|
||||
def test_origin_unreachable(self):
|
||||
exc = urllib.error.HTTPError(
|
||||
"url", 523, "Origin Unreachable", {}, None,
|
||||
)
|
||||
with patch.object(_mod, "_urlopen", side_effect=exc):
|
||||
result = _save_page("https://example.com")
|
||||
|
||||
assert "error" in result
|
||||
assert "unreachable" in result["error"]
|
||||
|
||||
def test_generic_http_error(self):
|
||||
exc = urllib.error.HTTPError(
|
||||
"url", 500, "Server Error", {}, None,
|
||||
)
|
||||
with patch.object(_mod, "_urlopen", side_effect=exc):
|
||||
result = _save_page("https://example.com")
|
||||
|
||||
assert "error" in result
|
||||
assert "500" in result["error"]
|
||||
|
||||
def test_timeout(self):
|
||||
with patch.object(_mod, "_urlopen", side_effect=TimeoutError("timed out")):
|
||||
result = _save_page("https://example.com")
|
||||
|
||||
assert "error" in result
|
||||
assert "timeout" in result["error"]
|
||||
|
||||
|
||||
# -- Command handler ---------------------------------------------------------
|
||||
|
||||
class TestCmdArchive:
|
||||
def test_no_args(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_archive(bot, _msg("!archive")))
|
||||
assert "Usage" in bot.replied[0]
|
||||
|
||||
def test_no_scheme(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_archive(bot, _msg("!archive example.com")))
|
||||
assert "http://" in bot.replied[0]
|
||||
|
||||
def test_success(self):
|
||||
bot = _FakeBot()
|
||||
result = {"url": "https://web.archive.org/web/20260220/https://example.com"}
|
||||
|
||||
with patch.object(_mod, "_save_page", return_value=result):
|
||||
asyncio.run(cmd_archive(bot, _msg("!archive https://example.com")))
|
||||
|
||||
assert len(bot.replied) == 2
|
||||
assert "Archiving" in bot.replied[0]
|
||||
assert "Archived:" in bot.replied[1]
|
||||
assert "/web/20260220" in bot.replied[1]
|
||||
|
||||
def test_error(self):
|
||||
bot = _FakeBot()
|
||||
result = {"error": "rate limited -- try again later"}
|
||||
|
||||
with patch.object(_mod, "_save_page", return_value=result):
|
||||
asyncio.run(cmd_archive(bot, _msg("!archive https://example.com")))
|
||||
|
||||
assert len(bot.replied) == 2
|
||||
assert "failed" in bot.replied[1].lower()
|
||||
assert "rate limit" in bot.replied[1]
|
||||
258
tests/test_asn.py
Normal file
258
tests/test_asn.py
Normal file
@@ -0,0 +1,258 @@
|
||||
"""Tests for the ASN lookup plugin (iptoasn.com TSV backend)."""
|
||||
|
||||
import asyncio
|
||||
import importlib.util
|
||||
import sys
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
from derp.irc import Message
|
||||
|
||||
# plugins/ is not a Python package -- load the module from file path
|
||||
_spec = importlib.util.spec_from_file_location(
|
||||
"plugins.asn", Path(__file__).resolve().parent.parent / "plugins" / "asn.py",
|
||||
)
|
||||
_mod = importlib.util.module_from_spec(_spec)
|
||||
sys.modules[_spec.name] = _mod
|
||||
_spec.loader.exec_module(_mod)
|
||||
|
||||
from plugins.asn import ( # noqa: E402
|
||||
_ip_to_int,
|
||||
_load_db,
|
||||
_lookup,
|
||||
cmd_asn,
|
||||
)
|
||||
|
||||
# -- Sample TSV data ---------------------------------------------------------
|
||||
|
||||
SAMPLE_TSV = """\
|
||||
1.0.0.0\t1.0.0.255\t13335\tUS\tCLOUDFLARENET
|
||||
1.0.1.0\t1.0.3.255\t0\tNone\tNot routed
|
||||
1.0.4.0\t1.0.7.255\t56203\tAU\tGTELECOM
|
||||
8.8.8.0\t8.8.8.255\t15169\tUS\tGOOGLE
|
||||
"""
|
||||
|
||||
|
||||
# -- Helpers -----------------------------------------------------------------
|
||||
|
||||
class _FakeBot:
|
||||
def __init__(self):
|
||||
self.replied: list[str] = []
|
||||
|
||||
async def reply(self, message, text: str) -> None:
|
||||
self.replied.append(text)
|
||||
|
||||
|
||||
def _msg(text: str, nick: str = "alice", target: str = "#test") -> Message:
|
||||
return Message(
|
||||
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
|
||||
command="PRIVMSG", params=[target, text], tags={},
|
||||
)
|
||||
|
||||
|
||||
def _reset_db():
|
||||
"""Clear module-level DB state between tests."""
|
||||
_mod._starts.clear()
|
||||
_mod._ends.clear()
|
||||
_mod._asns.clear()
|
||||
_mod._countries.clear()
|
||||
_mod._orgs.clear()
|
||||
_mod._loaded = False
|
||||
|
||||
|
||||
def _load_sample(tsv: str = SAMPLE_TSV) -> Path:
|
||||
"""Write TSV to a temp file, load it, return the path."""
|
||||
_reset_db()
|
||||
tmp = tempfile.NamedTemporaryFile(mode="w", suffix=".tsv", delete=False)
|
||||
tmp.write(tsv)
|
||||
tmp.flush()
|
||||
tmp.close()
|
||||
p = Path(tmp.name)
|
||||
_load_db(p)
|
||||
return p
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestIpToInt
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestIpToInt:
|
||||
def test_zero(self):
|
||||
assert _ip_to_int("0.0.0.0") == 0
|
||||
|
||||
def test_one(self):
|
||||
assert _ip_to_int("0.0.0.1") == 1
|
||||
|
||||
def test_max(self):
|
||||
assert _ip_to_int("255.255.255.255") == 0xFFFFFFFF
|
||||
|
||||
def test_known(self):
|
||||
assert _ip_to_int("1.0.0.0") == 0x01000000
|
||||
|
||||
def test_google_dns(self):
|
||||
assert _ip_to_int("8.8.8.8") == 0x08080808
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestLoad
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestLoad:
|
||||
def test_loads_rows(self):
|
||||
_load_sample()
|
||||
# 4 rows in TSV, but ASN 0 is skipped -> 3 entries
|
||||
assert len(_mod._starts) == 3
|
||||
|
||||
def test_skips_asn_zero(self):
|
||||
_load_sample()
|
||||
for asn in _mod._asns:
|
||||
assert asn != "AS0"
|
||||
|
||||
def test_first_entry(self):
|
||||
_load_sample()
|
||||
assert _mod._asns[0] == "AS13335"
|
||||
assert _mod._orgs[0] == "CLOUDFLARENET"
|
||||
assert _mod._countries[0] == "US"
|
||||
|
||||
def test_missing_file_returns_false(self):
|
||||
_reset_db()
|
||||
result = _load_db(Path("/nonexistent/path.tsv"))
|
||||
assert result is False
|
||||
assert not _mod._loaded
|
||||
|
||||
def test_empty_file(self):
|
||||
_reset_db()
|
||||
tmp = tempfile.NamedTemporaryFile(mode="w", suffix=".tsv", delete=False)
|
||||
tmp.write("")
|
||||
tmp.close()
|
||||
result = _load_db(Path(tmp.name))
|
||||
assert result is True
|
||||
assert len(_mod._starts) == 0
|
||||
|
||||
def test_skips_comments_and_blanks(self):
|
||||
tsv = "# comment\n\n1.0.0.0\t1.0.0.255\t13335\tUS\tCLOUD\n"
|
||||
_load_sample(tsv)
|
||||
assert len(_mod._starts) == 1
|
||||
|
||||
def test_skips_malformed_rows(self):
|
||||
tsv = "bad\tdata\n1.0.0.0\t1.0.0.255\t13335\tUS\tCLOUD\n"
|
||||
_load_sample(tsv)
|
||||
assert len(_mod._starts) == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestLookup
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestLookup:
|
||||
def setup_method(self):
|
||||
_load_sample()
|
||||
|
||||
def test_exact_start(self):
|
||||
result = _lookup("1.0.0.0")
|
||||
assert result is not None
|
||||
asn, org, country = result
|
||||
assert asn == "AS13335"
|
||||
assert org == "CLOUDFLARENET"
|
||||
assert country == "US"
|
||||
|
||||
def test_mid_range(self):
|
||||
result = _lookup("1.0.0.128")
|
||||
assert result is not None
|
||||
assert result[0] == "AS13335"
|
||||
|
||||
def test_exact_end(self):
|
||||
result = _lookup("1.0.0.255")
|
||||
assert result is not None
|
||||
assert result[0] == "AS13335"
|
||||
|
||||
def test_second_range(self):
|
||||
result = _lookup("1.0.5.0")
|
||||
assert result is not None
|
||||
assert result[0] == "AS56203"
|
||||
assert result[2] == "AU"
|
||||
|
||||
def test_google_dns(self):
|
||||
result = _lookup("8.8.8.8")
|
||||
assert result is not None
|
||||
assert result[0] == "AS15169"
|
||||
assert result[1] == "GOOGLE"
|
||||
|
||||
def test_miss_gap(self):
|
||||
"""IP in the not-routed gap (ASN 0 range) returns None."""
|
||||
result = _lookup("1.0.1.0")
|
||||
assert result is None
|
||||
|
||||
def test_miss_below_first(self):
|
||||
result = _lookup("0.255.255.255")
|
||||
assert result is None
|
||||
|
||||
def test_miss_above_last(self):
|
||||
result = _lookup("8.8.9.0")
|
||||
assert result is None
|
||||
|
||||
def test_db_not_loaded(self):
|
||||
_reset_db()
|
||||
with patch.object(_mod, "_DB_PATH", Path("/nonexistent")):
|
||||
result = _lookup("1.0.0.0")
|
||||
assert result is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestCommand
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCommand:
|
||||
def setup_method(self):
|
||||
_load_sample()
|
||||
|
||||
def test_valid_ip(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_asn(bot, _msg("!asn 1.0.0.1")))
|
||||
assert "AS13335" in bot.replied[0]
|
||||
assert "CLOUDFLARENET" in bot.replied[0]
|
||||
assert "(US)" in bot.replied[0]
|
||||
|
||||
def test_google_dns(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_asn(bot, _msg("!asn 8.8.8.8")))
|
||||
assert "AS15169" in bot.replied[0]
|
||||
assert "GOOGLE" in bot.replied[0]
|
||||
|
||||
def test_private_ip(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_asn(bot, _msg("!asn 192.168.1.1")))
|
||||
assert "private/loopback" in bot.replied[0]
|
||||
|
||||
def test_loopback(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_asn(bot, _msg("!asn 127.0.0.1")))
|
||||
assert "private/loopback" in bot.replied[0]
|
||||
|
||||
def test_invalid_input(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_asn(bot, _msg("!asn notanip")))
|
||||
assert "Invalid IP" in bot.replied[0]
|
||||
|
||||
def test_no_args(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_asn(bot, _msg("!asn")))
|
||||
assert "Usage:" in bot.replied[0]
|
||||
|
||||
def test_ipv6_rejected(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_asn(bot, _msg("!asn 2606:4700::1")))
|
||||
assert "only IPv4" in bot.replied[0]
|
||||
|
||||
def test_no_match(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_asn(bot, _msg("!asn 200.200.200.200")))
|
||||
assert "no ASN data" in bot.replied[0]
|
||||
|
||||
def test_db_missing(self):
|
||||
_reset_db()
|
||||
bot = _FakeBot()
|
||||
with patch.object(_mod, "_DB_PATH", Path("/nonexistent")):
|
||||
asyncio.run(cmd_asn(bot, _msg("!asn 1.0.0.1")))
|
||||
assert "not available" in bot.replied[0]
|
||||
302
tests/test_canary.py
Normal file
302
tests/test_canary.py
Normal file
@@ -0,0 +1,302 @@
|
||||
"""Tests for the canary token generator plugin."""
|
||||
|
||||
import asyncio
|
||||
import importlib.util
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
from derp.irc import Message
|
||||
|
||||
# plugins/ is not a Python package -- load the module from file path
|
||||
_spec = importlib.util.spec_from_file_location(
|
||||
"plugins.canary", Path(__file__).resolve().parent.parent / "plugins" / "canary.py",
|
||||
)
|
||||
_mod = importlib.util.module_from_spec(_spec)
|
||||
sys.modules[_spec.name] = _mod
|
||||
_spec.loader.exec_module(_mod)
|
||||
|
||||
from plugins.canary import ( # noqa: E402
|
||||
_gen_aws,
|
||||
_gen_basic,
|
||||
_gen_token,
|
||||
_load,
|
||||
_save,
|
||||
cmd_canary,
|
||||
)
|
||||
|
||||
# -- Helpers -----------------------------------------------------------------
|
||||
|
||||
class _FakeState:
|
||||
"""In-memory stand-in for bot.state."""
|
||||
|
||||
def __init__(self):
|
||||
self._store: dict[str, dict[str, str]] = {}
|
||||
|
||||
def get(self, plugin: str, key: str, default: str | None = None) -> str | None:
|
||||
return self._store.get(plugin, {}).get(key, default)
|
||||
|
||||
def set(self, plugin: str, key: str, value: str) -> None:
|
||||
self._store.setdefault(plugin, {})[key] = value
|
||||
|
||||
def delete(self, plugin: str, key: str) -> bool:
|
||||
try:
|
||||
del self._store[plugin][key]
|
||||
return True
|
||||
except KeyError:
|
||||
return False
|
||||
|
||||
def keys(self, plugin: str) -> list[str]:
|
||||
return sorted(self._store.get(plugin, {}).keys())
|
||||
|
||||
|
||||
class _FakeBot:
|
||||
"""Minimal bot stand-in that captures sent/replied messages."""
|
||||
|
||||
def __init__(self, *, admin: bool = False):
|
||||
self.sent: list[tuple[str, str]] = []
|
||||
self.replied: list[str] = []
|
||||
self.state = _FakeState()
|
||||
self._admin = admin
|
||||
|
||||
async def send(self, target: str, text: str) -> None:
|
||||
self.sent.append((target, text))
|
||||
|
||||
async def reply(self, message, text: str) -> None:
|
||||
self.replied.append(text)
|
||||
|
||||
def _is_admin(self, message) -> bool:
|
||||
return self._admin
|
||||
|
||||
|
||||
def _msg(text: str, nick: str = "alice", target: str = "#ops") -> Message:
|
||||
"""Create a channel PRIVMSG."""
|
||||
return Message(
|
||||
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
|
||||
command="PRIVMSG", params=[target, text], tags={},
|
||||
)
|
||||
|
||||
|
||||
def _pm(text: str, nick: str = "alice") -> Message:
|
||||
"""Create a private PRIVMSG."""
|
||||
return Message(
|
||||
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
|
||||
command="PRIVMSG", params=["botname", text], tags={},
|
||||
)
|
||||
|
||||
|
||||
# -- Token generators -------------------------------------------------------
|
||||
|
||||
class TestGenToken:
|
||||
def test_length(self):
|
||||
tok = _gen_token()
|
||||
assert len(tok) == 40
|
||||
|
||||
def test_hex(self):
|
||||
tok = _gen_token()
|
||||
int(tok, 16) # Should not raise
|
||||
|
||||
def test_unique(self):
|
||||
assert _gen_token() != _gen_token()
|
||||
|
||||
|
||||
class TestGenAws:
|
||||
def test_access_key_format(self):
|
||||
pair = _gen_aws()
|
||||
assert pair["access_key"].startswith("AKIA")
|
||||
assert len(pair["access_key"]) == 20
|
||||
|
||||
def test_secret_key_present(self):
|
||||
pair = _gen_aws()
|
||||
assert len(pair["secret_key"]) > 20
|
||||
|
||||
|
||||
class TestGenBasic:
|
||||
def test_user_format(self):
|
||||
pair = _gen_basic()
|
||||
assert pair["user"].startswith("svc")
|
||||
assert len(pair["user"]) == 8
|
||||
|
||||
def test_pass_present(self):
|
||||
pair = _gen_basic()
|
||||
assert len(pair["pass"]) > 10
|
||||
|
||||
|
||||
# -- State helpers -----------------------------------------------------------
|
||||
|
||||
class TestStateHelpers:
|
||||
def test_save_and_load(self):
|
||||
bot = _FakeBot()
|
||||
store = {"mykey": {"type": "token", "value": "abc", "created": "now"}}
|
||||
_save(bot, "#ops", store)
|
||||
loaded = _load(bot, "#ops")
|
||||
assert loaded == store
|
||||
|
||||
def test_load_empty(self):
|
||||
bot = _FakeBot()
|
||||
assert _load(bot, "#ops") == {}
|
||||
|
||||
def test_load_bad_json(self):
|
||||
bot = _FakeBot()
|
||||
bot.state.set("canary", "#ops", "not json{{{")
|
||||
assert _load(bot, "#ops") == {}
|
||||
|
||||
|
||||
# -- Command: gen ------------------------------------------------------------
|
||||
|
||||
class TestCmdGen:
|
||||
def test_gen_default_token(self):
|
||||
bot = _FakeBot(admin=True)
|
||||
asyncio.run(cmd_canary(bot, _msg("!canary gen db-cred")))
|
||||
assert len(bot.replied) == 1
|
||||
assert "db-cred" in bot.replied[0]
|
||||
assert "token" in bot.replied[0]
|
||||
store = _load(bot, "#ops")
|
||||
assert "db-cred" in store
|
||||
assert store["db-cred"]["type"] == "token"
|
||||
assert len(store["db-cred"]["value"]) == 40
|
||||
|
||||
def test_gen_aws(self):
|
||||
bot = _FakeBot(admin=True)
|
||||
asyncio.run(cmd_canary(bot, _msg("!canary gen aws staging-key")))
|
||||
assert "staging-key" in bot.replied[0]
|
||||
assert "AKIA" in bot.replied[0]
|
||||
store = _load(bot, "#ops")
|
||||
assert store["staging-key"]["type"] == "aws"
|
||||
|
||||
def test_gen_basic(self):
|
||||
bot = _FakeBot(admin=True)
|
||||
asyncio.run(cmd_canary(bot, _msg("!canary gen basic svc-login")))
|
||||
assert "svc-login" in bot.replied[0]
|
||||
store = _load(bot, "#ops")
|
||||
assert store["svc-login"]["type"] == "basic"
|
||||
assert "user" in store["svc-login"]["value"]
|
||||
|
||||
def test_gen_requires_admin(self):
|
||||
bot = _FakeBot(admin=False)
|
||||
asyncio.run(cmd_canary(bot, _msg("!canary gen mytoken")))
|
||||
assert "Permission denied" in bot.replied[0]
|
||||
|
||||
def test_gen_requires_channel(self):
|
||||
bot = _FakeBot(admin=True)
|
||||
asyncio.run(cmd_canary(bot, _pm("!canary gen mytoken")))
|
||||
assert "channel" in bot.replied[0].lower()
|
||||
|
||||
def test_gen_duplicate(self):
|
||||
bot = _FakeBot(admin=True)
|
||||
asyncio.run(cmd_canary(bot, _msg("!canary gen dup-test")))
|
||||
bot.replied.clear()
|
||||
asyncio.run(cmd_canary(bot, _msg("!canary gen dup-test")))
|
||||
assert "already exists" in bot.replied[0]
|
||||
|
||||
def test_gen_no_label(self):
|
||||
bot = _FakeBot(admin=True)
|
||||
asyncio.run(cmd_canary(bot, _msg("!canary gen")))
|
||||
assert "Usage" in bot.replied[0]
|
||||
|
||||
def test_gen_type_no_label(self):
|
||||
bot = _FakeBot(admin=True)
|
||||
asyncio.run(cmd_canary(bot, _msg("!canary gen aws")))
|
||||
assert "Usage" in bot.replied[0]
|
||||
|
||||
def test_gen_invalid_label(self):
|
||||
bot = _FakeBot(admin=True)
|
||||
asyncio.run(cmd_canary(bot, _msg("!canary gen b@d!")))
|
||||
assert "Label" in bot.replied[0]
|
||||
|
||||
|
||||
# -- Command: list -----------------------------------------------------------
|
||||
|
||||
class TestCmdList:
|
||||
def test_list_empty(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_canary(bot, _msg("!canary list")))
|
||||
assert "No canaries" in bot.replied[0]
|
||||
|
||||
def test_list_populated(self):
|
||||
bot = _FakeBot()
|
||||
store = {
|
||||
"api-key": {"type": "token", "value": "abc", "created": "now"},
|
||||
"db-cred": {"type": "basic", "value": {"user": "x", "pass": "y"}, "created": "now"},
|
||||
}
|
||||
_save(bot, "#ops", store)
|
||||
asyncio.run(cmd_canary(bot, _msg("!canary list")))
|
||||
assert "api-key" in bot.replied[0]
|
||||
assert "db-cred" in bot.replied[0]
|
||||
|
||||
def test_list_requires_channel(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_canary(bot, _pm("!canary list")))
|
||||
assert "channel" in bot.replied[0].lower()
|
||||
|
||||
|
||||
# -- Command: info -----------------------------------------------------------
|
||||
|
||||
class TestCmdInfo:
|
||||
def test_info_exists(self):
|
||||
bot = _FakeBot()
|
||||
store = {"mykey": {"type": "token", "value": "a" * 40, "created": "2026-02-20T14:00:00"}}
|
||||
_save(bot, "#ops", store)
|
||||
asyncio.run(cmd_canary(bot, _msg("!canary info mykey")))
|
||||
assert "mykey" in bot.replied[0]
|
||||
assert "a" * 40 in bot.replied[0]
|
||||
|
||||
def test_info_missing(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_canary(bot, _msg("!canary info nope")))
|
||||
assert "No canary" in bot.replied[0]
|
||||
|
||||
def test_info_no_label(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_canary(bot, _msg("!canary info")))
|
||||
assert "Usage" in bot.replied[0]
|
||||
|
||||
def test_info_requires_channel(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_canary(bot, _pm("!canary info mykey")))
|
||||
assert "channel" in bot.replied[0].lower()
|
||||
|
||||
|
||||
# -- Command: del ------------------------------------------------------------
|
||||
|
||||
class TestCmdDel:
|
||||
def test_del_success(self):
|
||||
bot = _FakeBot(admin=True)
|
||||
store = {"victim": {"type": "token", "value": "x", "created": "now"}}
|
||||
_save(bot, "#ops", store)
|
||||
asyncio.run(cmd_canary(bot, _msg("!canary del victim")))
|
||||
assert "Deleted" in bot.replied[0]
|
||||
assert _load(bot, "#ops") == {}
|
||||
|
||||
def test_del_nonexistent(self):
|
||||
bot = _FakeBot(admin=True)
|
||||
asyncio.run(cmd_canary(bot, _msg("!canary del nope")))
|
||||
assert "No canary" in bot.replied[0]
|
||||
|
||||
def test_del_requires_admin(self):
|
||||
bot = _FakeBot(admin=False)
|
||||
asyncio.run(cmd_canary(bot, _msg("!canary del something")))
|
||||
assert "Permission denied" in bot.replied[0]
|
||||
|
||||
def test_del_requires_channel(self):
|
||||
bot = _FakeBot(admin=True)
|
||||
asyncio.run(cmd_canary(bot, _pm("!canary del something")))
|
||||
assert "channel" in bot.replied[0].lower()
|
||||
|
||||
def test_del_no_label(self):
|
||||
bot = _FakeBot(admin=True)
|
||||
asyncio.run(cmd_canary(bot, _msg("!canary del")))
|
||||
assert "Usage" in bot.replied[0]
|
||||
|
||||
|
||||
# -- Command: usage ----------------------------------------------------------
|
||||
|
||||
class TestCmdUsage:
|
||||
def test_no_args(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_canary(bot, _msg("!canary")))
|
||||
assert "Usage" in bot.replied[0]
|
||||
|
||||
def test_unknown_subcommand(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_canary(bot, _msg("!canary foobar")))
|
||||
assert "Usage" in bot.replied[0]
|
||||
281
tests/test_internetdb.py
Normal file
281
tests/test_internetdb.py
Normal file
@@ -0,0 +1,281 @@
|
||||
"""Tests for the InternetDB plugin (Shodan free host recon)."""
|
||||
|
||||
import asyncio
|
||||
import importlib.util
|
||||
import json
|
||||
import sys
|
||||
import urllib.error
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
from derp.irc import Message
|
||||
|
||||
# plugins/ is not a Python package -- load the module from file path
|
||||
_spec = importlib.util.spec_from_file_location(
|
||||
"plugins.internetdb",
|
||||
Path(__file__).resolve().parent.parent / "plugins" / "internetdb.py",
|
||||
)
|
||||
_mod = importlib.util.module_from_spec(_spec)
|
||||
sys.modules[_spec.name] = _mod
|
||||
_spec.loader.exec_module(_mod)
|
||||
|
||||
from plugins.internetdb import ( # noqa: E402
|
||||
_fetch,
|
||||
_format_result,
|
||||
cmd_internetdb,
|
||||
)
|
||||
|
||||
# -- Sample API responses ----------------------------------------------------
|
||||
|
||||
SAMPLE_FULL = {
|
||||
"cpes": ["cpe:/a:apache:http_server:2.4.41", "cpe:/a:openssl:openssl:1.1.1"],
|
||||
"hostnames": ["dns.google"],
|
||||
"ip": "8.8.8.8",
|
||||
"ports": [443, 53],
|
||||
"tags": ["cloud"],
|
||||
"vulns": ["CVE-2021-23017", "CVE-2021-3449"],
|
||||
}
|
||||
|
||||
SAMPLE_MINIMAL = {
|
||||
"cpes": [],
|
||||
"hostnames": [],
|
||||
"ip": "203.0.113.1",
|
||||
"ports": [80],
|
||||
"tags": [],
|
||||
"vulns": [],
|
||||
}
|
||||
|
||||
SAMPLE_EMPTY = {
|
||||
"cpes": [],
|
||||
"hostnames": [],
|
||||
"ip": "198.51.100.1",
|
||||
"ports": [],
|
||||
"tags": [],
|
||||
"vulns": [],
|
||||
}
|
||||
|
||||
SAMPLE_MANY_VULNS = {
|
||||
"cpes": [],
|
||||
"hostnames": ["vuln.example.com"],
|
||||
"ip": "192.0.2.1",
|
||||
"ports": [22, 80, 443],
|
||||
"tags": ["self-signed", "eol-os"],
|
||||
"vulns": [f"CVE-2021-{i}" for i in range(15)],
|
||||
}
|
||||
|
||||
SAMPLE_MANY_HOSTNAMES = {
|
||||
"cpes": [],
|
||||
"hostnames": [f"host{i}.example.com" for i in range(8)],
|
||||
"ip": "192.0.2.2",
|
||||
"ports": [80],
|
||||
"tags": [],
|
||||
"vulns": [],
|
||||
}
|
||||
|
||||
|
||||
# -- Helpers -----------------------------------------------------------------
|
||||
|
||||
class _FakeBot:
|
||||
def __init__(self):
|
||||
self.replied: list[str] = []
|
||||
|
||||
async def reply(self, message, text: str) -> None:
|
||||
self.replied.append(text)
|
||||
|
||||
|
||||
def _msg(text: str, nick: str = "alice", target: str = "#test") -> Message:
|
||||
return Message(
|
||||
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
|
||||
command="PRIVMSG", params=[target, text], tags={},
|
||||
)
|
||||
|
||||
|
||||
class _FakeResp:
|
||||
"""Minimal file-like response for urlopen mocking."""
|
||||
|
||||
def __init__(self, data: bytes, status: int = 200):
|
||||
self._data = data
|
||||
self.status = status
|
||||
|
||||
def read(self):
|
||||
return self._data
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, *a):
|
||||
pass
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestFetch
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestFetch:
|
||||
def test_success(self):
|
||||
body = json.dumps(SAMPLE_FULL).encode()
|
||||
with patch.object(_mod, "_urlopen", return_value=_FakeResp(body)):
|
||||
result = _fetch("8.8.8.8")
|
||||
assert result == SAMPLE_FULL
|
||||
|
||||
def test_not_found(self):
|
||||
exc = urllib.error.HTTPError(
|
||||
"https://internetdb.shodan.io/192.0.2.1", 404, "Not Found", {}, None,
|
||||
)
|
||||
with patch.object(_mod, "_urlopen", side_effect=exc):
|
||||
result = _fetch("192.0.2.1")
|
||||
assert result is None
|
||||
|
||||
def test_server_error_raises(self):
|
||||
exc = urllib.error.HTTPError(
|
||||
"https://internetdb.shodan.io/192.0.2.1", 500, "Server Error", {}, None,
|
||||
)
|
||||
with patch.object(_mod, "_urlopen", side_effect=exc):
|
||||
try:
|
||||
_fetch("192.0.2.1")
|
||||
assert False, "Expected HTTPError"
|
||||
except urllib.error.HTTPError as e:
|
||||
assert e.code == 500
|
||||
|
||||
def test_builds_correct_url(self):
|
||||
calls = []
|
||||
|
||||
def _mock_urlopen(url, **kw):
|
||||
calls.append(url)
|
||||
return _FakeResp(json.dumps(SAMPLE_MINIMAL).encode())
|
||||
|
||||
with patch.object(_mod, "_urlopen", side_effect=_mock_urlopen):
|
||||
_fetch("203.0.113.1")
|
||||
assert calls[0] == "https://internetdb.shodan.io/203.0.113.1"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestFormatResult
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestFormatResult:
|
||||
def test_full_response(self):
|
||||
result = _format_result("8.8.8.8", SAMPLE_FULL)
|
||||
assert "8.8.8.8 -- dns.google" in result
|
||||
assert "Ports: 53, 443" in result
|
||||
assert "cpe:/a:apache:http_server:2.4.41" in result
|
||||
assert "Tags: cloud" in result
|
||||
assert "CVE-2021-23017" in result
|
||||
|
||||
def test_minimal_response(self):
|
||||
result = _format_result("203.0.113.1", SAMPLE_MINIMAL)
|
||||
assert "203.0.113.1" in result
|
||||
assert "Ports: 80" in result
|
||||
# No hostnames, CPEs, tags, or vulns sections
|
||||
assert "--" not in result
|
||||
assert "CPEs:" not in result
|
||||
assert "Tags:" not in result
|
||||
assert "CVEs:" not in result
|
||||
|
||||
def test_empty_response(self):
|
||||
result = _format_result("198.51.100.1", SAMPLE_EMPTY)
|
||||
assert result == "198.51.100.1"
|
||||
|
||||
def test_many_vulns_truncated(self):
|
||||
result = _format_result("192.0.2.1", SAMPLE_MANY_VULNS)
|
||||
assert "+5 more" in result
|
||||
# First 10 shown
|
||||
assert "CVE-2021-0" in result
|
||||
assert "CVE-2021-9" in result
|
||||
|
||||
def test_many_hostnames_truncated(self):
|
||||
result = _format_result("192.0.2.2", SAMPLE_MANY_HOSTNAMES)
|
||||
# Only first 5 hostnames shown
|
||||
assert "host0.example.com" in result
|
||||
assert "host4.example.com" in result
|
||||
assert "host5.example.com" not in result
|
||||
|
||||
def test_ports_sorted(self):
|
||||
data = {**SAMPLE_FULL, "ports": [8080, 22, 443, 80]}
|
||||
result = _format_result("8.8.8.8", data)
|
||||
assert "Ports: 22, 80, 443, 8080" in result
|
||||
|
||||
def test_many_cpes_truncated(self):
|
||||
data = {**SAMPLE_EMPTY, "cpes": [f"cpe:/a:vendor{i}:prod" for i in range(12)]}
|
||||
result = _format_result("198.51.100.1", data)
|
||||
assert "cpe:/a:vendor0:prod" in result
|
||||
assert "cpe:/a:vendor7:prod" in result
|
||||
assert "cpe:/a:vendor8:prod" not in result
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestCommand
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCommand:
|
||||
def _run(self, bot, msg, data=None, side_effect=None):
|
||||
"""Run command with mocked _fetch."""
|
||||
if side_effect is not None:
|
||||
with patch.object(_mod, "_fetch", side_effect=side_effect):
|
||||
asyncio.run(cmd_internetdb(bot, msg))
|
||||
else:
|
||||
with patch.object(_mod, "_fetch", return_value=data):
|
||||
asyncio.run(cmd_internetdb(bot, msg))
|
||||
|
||||
def test_valid_ip(self):
|
||||
bot = _FakeBot()
|
||||
self._run(bot, _msg("!internetdb 8.8.8.8"), data=SAMPLE_FULL)
|
||||
assert "dns.google" in bot.replied[0]
|
||||
assert "Ports:" in bot.replied[0]
|
||||
|
||||
def test_no_data(self):
|
||||
bot = _FakeBot()
|
||||
self._run(bot, _msg("!internetdb 4.4.4.4"), data=None)
|
||||
assert "no data available" in bot.replied[0]
|
||||
|
||||
def test_no_args(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_internetdb(bot, _msg("!internetdb")))
|
||||
assert "Usage:" in bot.replied[0]
|
||||
|
||||
def test_invalid_ip(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_internetdb(bot, _msg("!internetdb notanip")))
|
||||
assert "Invalid IP" in bot.replied[0]
|
||||
|
||||
def test_private_ip(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_internetdb(bot, _msg("!internetdb 192.168.1.1")))
|
||||
assert "private/loopback" in bot.replied[0]
|
||||
|
||||
def test_loopback(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_internetdb(bot, _msg("!internetdb 127.0.0.1")))
|
||||
assert "private/loopback" in bot.replied[0]
|
||||
|
||||
def test_ipv6(self):
|
||||
bot = _FakeBot()
|
||||
self._run(bot, _msg("!internetdb 2606:4700::1"), data=SAMPLE_MINIMAL)
|
||||
assert "Ports:" in bot.replied[0]
|
||||
|
||||
def test_network_error(self):
|
||||
bot = _FakeBot()
|
||||
self._run(
|
||||
bot, _msg("!internetdb 8.8.8.8"),
|
||||
side_effect=ConnectionError("timeout"),
|
||||
)
|
||||
assert "lookup failed" in bot.replied[0]
|
||||
|
||||
def test_normalized_ip(self):
|
||||
"""ipaddress normalization (e.g. ::ffff:8.8.8.8 -> mapped)."""
|
||||
calls = []
|
||||
|
||||
def _mock_fetch(addr):
|
||||
calls.append(addr)
|
||||
return SAMPLE_MINIMAL
|
||||
|
||||
bot = _FakeBot()
|
||||
with patch.object(_mod, "_fetch", side_effect=_mock_fetch):
|
||||
asyncio.run(cmd_internetdb(bot, _msg("!internetdb 8.008.8.8")))
|
||||
# Leading zeros rejected by Python's strict parser -> "Invalid IP"
|
||||
assert "Invalid IP" in bot.replied[0]
|
||||
|
||||
def test_empty_result(self):
|
||||
bot = _FakeBot()
|
||||
self._run(bot, _msg("!internetdb 198.51.100.1"), data=SAMPLE_EMPTY)
|
||||
assert "198.51.100.1" in bot.replied[0]
|
||||
228
tests/test_resolve.py
Normal file
228
tests/test_resolve.py
Normal file
@@ -0,0 +1,228 @@
|
||||
"""Tests for the bulk DNS resolve plugin."""
|
||||
|
||||
import asyncio
|
||||
import importlib.util
|
||||
import struct
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
from derp.dns import encode_name
|
||||
from derp.irc import Message
|
||||
|
||||
# plugins/ is not a Python package -- load the module from file path
|
||||
_spec = importlib.util.spec_from_file_location(
|
||||
"plugins.resolve", Path(__file__).resolve().parent.parent / "plugins" / "resolve.py",
|
||||
)
|
||||
_mod = importlib.util.module_from_spec(_spec)
|
||||
sys.modules[_spec.name] = _mod
|
||||
_spec.loader.exec_module(_mod)
|
||||
|
||||
from plugins.resolve import _query_tcp, _resolve_one, cmd_resolve # noqa: E402
|
||||
|
||||
# -- Helpers -----------------------------------------------------------------
|
||||
|
||||
def _make_a_response(ip_bytes: bytes = b"\x01\x02\x03\x04") -> bytes:
|
||||
"""Build a minimal A-record DNS response."""
|
||||
tid = b"\x00\x01"
|
||||
flags = struct.pack("!H", 0x8180)
|
||||
counts = struct.pack("!HHHH", 1, 1, 0, 0)
|
||||
qname = encode_name("example.com")
|
||||
question = qname + struct.pack("!HH", 1, 1)
|
||||
answer = qname + struct.pack("!HHIH", 1, 1, 300, len(ip_bytes)) + ip_bytes
|
||||
return tid + flags + counts + question + answer
|
||||
|
||||
|
||||
def _make_nxdomain_response() -> bytes:
|
||||
"""Build a minimal NXDOMAIN DNS response."""
|
||||
tid = b"\x00\x01"
|
||||
flags = struct.pack("!H", 0x8183) # rcode=3
|
||||
counts = struct.pack("!HHHH", 1, 0, 0, 0)
|
||||
qname = encode_name("nope.invalid")
|
||||
question = qname + struct.pack("!HH", 1, 1)
|
||||
return tid + flags + counts + question
|
||||
|
||||
|
||||
class _FakeBot:
|
||||
def __init__(self):
|
||||
self.replied: list[str] = []
|
||||
|
||||
async def reply(self, message, text: str) -> None:
|
||||
self.replied.append(text)
|
||||
|
||||
|
||||
def _msg(text: str) -> Message:
|
||||
return Message(
|
||||
raw="", prefix="alice!~alice@host", nick="alice",
|
||||
command="PRIVMSG", params=["#test", text], tags={},
|
||||
)
|
||||
|
||||
|
||||
# -- _query_tcp --------------------------------------------------------------
|
||||
|
||||
class TestQueryTcp:
|
||||
def test_a_record(self):
|
||||
response = _make_a_response()
|
||||
framed = struct.pack("!H", len(response)) + response
|
||||
|
||||
reader = AsyncMock()
|
||||
reader.readexactly = AsyncMock(side_effect=[framed[:2], framed[2:]])
|
||||
writer = MagicMock()
|
||||
writer.drain = AsyncMock()
|
||||
writer.wait_closed = AsyncMock()
|
||||
|
||||
mock_open = AsyncMock(return_value=(reader, writer))
|
||||
with patch.object(_mod, "_open_connection", mock_open):
|
||||
rcode, results = asyncio.run(_query_tcp("example.com", 1, "1.1.1.1"))
|
||||
|
||||
assert rcode == 0
|
||||
assert results == ["1.2.3.4"]
|
||||
|
||||
def test_nxdomain(self):
|
||||
response = _make_nxdomain_response()
|
||||
framed = struct.pack("!H", len(response)) + response
|
||||
|
||||
reader = AsyncMock()
|
||||
reader.readexactly = AsyncMock(side_effect=[framed[:2], framed[2:]])
|
||||
writer = MagicMock()
|
||||
writer.drain = AsyncMock()
|
||||
writer.wait_closed = AsyncMock()
|
||||
|
||||
mock_open = AsyncMock(return_value=(reader, writer))
|
||||
with patch.object(_mod, "_open_connection", mock_open):
|
||||
rcode, results = asyncio.run(_query_tcp("nope.invalid", 1, "1.1.1.1"))
|
||||
|
||||
assert rcode == 3
|
||||
assert results == []
|
||||
|
||||
|
||||
# -- _resolve_one ------------------------------------------------------------
|
||||
|
||||
class TestResolveOne:
|
||||
def test_success(self):
|
||||
mock_tcp = AsyncMock(return_value=(0, ["1.2.3.4"]))
|
||||
with patch.object(_mod, "_query_tcp", mock_tcp):
|
||||
result = asyncio.run(_resolve_one("example.com", "A", "1.1.1.1"))
|
||||
assert "example.com -> 1.2.3.4" == result
|
||||
|
||||
def test_nxdomain(self):
|
||||
mock_tcp = AsyncMock(return_value=(3, []))
|
||||
with patch.object(_mod, "_query_tcp", mock_tcp):
|
||||
result = asyncio.run(_resolve_one("bad.invalid", "A", "1.1.1.1"))
|
||||
assert "NXDOMAIN" in result
|
||||
|
||||
def test_timeout(self):
|
||||
mock_tcp = AsyncMock(side_effect=asyncio.TimeoutError())
|
||||
with patch.object(_mod, "_query_tcp", mock_tcp):
|
||||
result = asyncio.run(_resolve_one("slow.example.com", "A", "1.1.1.1"))
|
||||
assert "timeout" in result
|
||||
|
||||
def test_error(self):
|
||||
mock_tcp = AsyncMock(side_effect=OSError("connection refused"))
|
||||
with patch.object(_mod, "_query_tcp", mock_tcp):
|
||||
result = asyncio.run(_resolve_one("down.example.com", "A", "1.1.1.1"))
|
||||
assert "error" in result
|
||||
|
||||
def test_ptr_auto(self):
|
||||
mock_tcp = AsyncMock(return_value=(0, ["dns.google"]))
|
||||
with patch.object(_mod, "_query_tcp", mock_tcp):
|
||||
result = asyncio.run(_resolve_one("8.8.8.8", "PTR", "1.1.1.1"))
|
||||
assert "dns.google" in result
|
||||
|
||||
def test_ptr_invalid_ip(self):
|
||||
result = asyncio.run(_resolve_one("not-an-ip", "PTR", "1.1.1.1"))
|
||||
assert "invalid IP" in result
|
||||
|
||||
def test_no_records(self):
|
||||
mock_tcp = AsyncMock(return_value=(0, []))
|
||||
with patch.object(_mod, "_query_tcp", mock_tcp):
|
||||
result = asyncio.run(_resolve_one("empty.example.com", "A", "1.1.1.1"))
|
||||
assert "no records" in result
|
||||
|
||||
def test_multiple_results(self):
|
||||
mock_tcp = AsyncMock(return_value=(0, ["1.1.1.1", "1.0.0.1"]))
|
||||
with patch.object(_mod, "_query_tcp", mock_tcp):
|
||||
result = asyncio.run(_resolve_one("multi.example.com", "A", "1.1.1.1"))
|
||||
assert "1.1.1.1, 1.0.0.1" in result
|
||||
|
||||
|
||||
# -- Command handler ---------------------------------------------------------
|
||||
|
||||
class TestCmdResolve:
|
||||
def test_no_args(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_resolve(bot, _msg("!resolve")))
|
||||
assert "Usage" in bot.replied[0]
|
||||
|
||||
def test_single_host(self):
|
||||
bot = _FakeBot()
|
||||
mock_tcp = AsyncMock(return_value=(0, ["93.184.216.34"]))
|
||||
|
||||
with patch.object(_mod, "_query_tcp", mock_tcp):
|
||||
asyncio.run(cmd_resolve(bot, _msg("!resolve example.com")))
|
||||
|
||||
assert len(bot.replied) == 1
|
||||
assert "example.com -> 93.184.216.34" in bot.replied[0]
|
||||
|
||||
def test_multiple_hosts(self):
|
||||
bot = _FakeBot()
|
||||
|
||||
async def fake_tcp(name, qtype, server, timeout=5.0):
|
||||
if "example" in name:
|
||||
return 0, ["93.184.216.34"]
|
||||
return 0, ["140.82.121.3"]
|
||||
|
||||
with patch.object(_mod, "_query_tcp", fake_tcp):
|
||||
asyncio.run(cmd_resolve(bot, _msg("!resolve example.com github.com")))
|
||||
|
||||
assert len(bot.replied) == 2
|
||||
assert "93.184.216.34" in bot.replied[0]
|
||||
assert "140.82.121.3" in bot.replied[1]
|
||||
|
||||
def test_explicit_type(self):
|
||||
bot = _FakeBot()
|
||||
mock_tcp = AsyncMock(return_value=(0, ["2606:2800:220:1:248:1893:25c8:1946"]))
|
||||
|
||||
with patch.object(_mod, "_query_tcp", mock_tcp):
|
||||
asyncio.run(cmd_resolve(bot, _msg("!resolve example.com AAAA")))
|
||||
|
||||
assert "2606:" in bot.replied[0]
|
||||
# Verify AAAA qtype (28) was used
|
||||
call_args = mock_tcp.call_args[0]
|
||||
assert call_args[1] == 28
|
||||
|
||||
def test_ip_auto_ptr(self):
|
||||
bot = _FakeBot()
|
||||
mock_tcp = AsyncMock(return_value=(0, ["dns.google"]))
|
||||
|
||||
with patch.object(_mod, "_query_tcp", mock_tcp):
|
||||
asyncio.run(cmd_resolve(bot, _msg("!resolve 8.8.8.8")))
|
||||
|
||||
assert "dns.google" in bot.replied[0]
|
||||
|
||||
def test_type_only_no_hosts(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_resolve(bot, _msg("!resolve AAAA")))
|
||||
assert "Usage" in bot.replied[0]
|
||||
|
||||
def test_nxdomain(self):
|
||||
bot = _FakeBot()
|
||||
mock_tcp = AsyncMock(return_value=(3, []))
|
||||
|
||||
with patch.object(_mod, "_query_tcp", mock_tcp):
|
||||
asyncio.run(cmd_resolve(bot, _msg("!resolve bad.invalid")))
|
||||
|
||||
assert "NXDOMAIN" in bot.replied[0]
|
||||
|
||||
def test_max_hosts(self):
|
||||
"""Hosts beyond MAX_HOSTS are truncated."""
|
||||
bot = _FakeBot()
|
||||
hosts = " ".join(f"h{i}.example.com" for i in range(15))
|
||||
mock_tcp = AsyncMock(return_value=(0, ["1.2.3.4"]))
|
||||
|
||||
with patch.object(_mod, "_query_tcp", mock_tcp):
|
||||
asyncio.run(cmd_resolve(bot, _msg(f"!resolve {hosts}")))
|
||||
|
||||
# 10 results + 1 truncation note
|
||||
assert len(bot.replied) == 11
|
||||
assert "showing first 10" in bot.replied[-1]
|
||||
203
tests/test_tcping.py
Normal file
203
tests/test_tcping.py
Normal file
@@ -0,0 +1,203 @@
|
||||
"""Tests for the TCP ping plugin."""
|
||||
|
||||
import asyncio
|
||||
import importlib.util
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
from derp.irc import Message
|
||||
|
||||
# plugins/ is not a Python package -- load the module from file path
|
||||
_spec = importlib.util.spec_from_file_location(
|
||||
"plugins.tcping", Path(__file__).resolve().parent.parent / "plugins" / "tcping.py",
|
||||
)
|
||||
_mod = importlib.util.module_from_spec(_spec)
|
||||
sys.modules[_spec.name] = _mod
|
||||
_spec.loader.exec_module(_mod)
|
||||
|
||||
from plugins.tcping import ( # noqa: E402
|
||||
_is_internal,
|
||||
_probe,
|
||||
_validate_host,
|
||||
cmd_tcping,
|
||||
)
|
||||
|
||||
# -- Helpers -----------------------------------------------------------------
|
||||
|
||||
class _FakeBot:
|
||||
def __init__(self):
|
||||
self.replied: list[str] = []
|
||||
|
||||
async def reply(self, message, text: str) -> None:
|
||||
self.replied.append(text)
|
||||
|
||||
|
||||
def _msg(text: str) -> Message:
|
||||
return Message(
|
||||
raw="", prefix="alice!~alice@host", nick="alice",
|
||||
command="PRIVMSG", params=["#test", text], tags={},
|
||||
)
|
||||
|
||||
|
||||
# -- Validation --------------------------------------------------------------
|
||||
|
||||
class TestValidateHost:
|
||||
def test_valid_ip(self):
|
||||
assert _validate_host("93.184.216.34") is True
|
||||
|
||||
def test_valid_domain(self):
|
||||
assert _validate_host("example.com") is True
|
||||
|
||||
def test_invalid_no_dot(self):
|
||||
assert _validate_host("localhost") is False
|
||||
|
||||
def test_invalid_chars(self):
|
||||
assert _validate_host("bad host!") is False
|
||||
|
||||
|
||||
class TestIsInternal:
|
||||
def test_private(self):
|
||||
assert _is_internal("192.168.1.1") is True
|
||||
|
||||
def test_loopback(self):
|
||||
assert _is_internal("127.0.0.1") is True
|
||||
|
||||
def test_public(self):
|
||||
assert _is_internal("8.8.8.8") is False
|
||||
|
||||
def test_domain(self):
|
||||
assert _is_internal("example.com") is False
|
||||
|
||||
|
||||
# -- Probe -------------------------------------------------------------------
|
||||
|
||||
class TestProbe:
|
||||
def test_success(self):
|
||||
writer = MagicMock()
|
||||
writer.wait_closed = AsyncMock()
|
||||
mock_open = AsyncMock(return_value=(MagicMock(), writer))
|
||||
|
||||
with patch.object(_mod, "_open_connection", mock_open):
|
||||
rtt = asyncio.run(_probe("example.com", 443, 5.0))
|
||||
|
||||
assert rtt is not None
|
||||
assert rtt >= 0
|
||||
writer.close.assert_called_once()
|
||||
|
||||
def test_timeout(self):
|
||||
mock_open = AsyncMock(side_effect=asyncio.TimeoutError())
|
||||
|
||||
with patch.object(_mod, "_open_connection", mock_open):
|
||||
rtt = asyncio.run(_probe("example.com", 443, 0.1))
|
||||
|
||||
assert rtt is None
|
||||
|
||||
def test_connection_error(self):
|
||||
mock_open = AsyncMock(side_effect=OSError("refused"))
|
||||
|
||||
with patch.object(_mod, "_open_connection", mock_open):
|
||||
rtt = asyncio.run(_probe("example.com", 443, 5.0))
|
||||
|
||||
assert rtt is None
|
||||
|
||||
|
||||
# -- Command -----------------------------------------------------------------
|
||||
|
||||
class TestCmdTcping:
|
||||
def test_no_args(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_tcping(bot, _msg("!tcping")))
|
||||
assert "Usage" in bot.replied[0]
|
||||
|
||||
def test_invalid_host(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_tcping(bot, _msg("!tcping notahost")))
|
||||
assert "Invalid host" in bot.replied[0]
|
||||
|
||||
def test_internal_host(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_tcping(bot, _msg("!tcping 192.168.1.1")))
|
||||
assert "internal" in bot.replied[0].lower()
|
||||
|
||||
def test_invalid_port(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_tcping(bot, _msg("!tcping example.com 99999")))
|
||||
assert "Invalid port" in bot.replied[0]
|
||||
|
||||
def test_invalid_port_string(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_tcping(bot, _msg("!tcping example.com abc")))
|
||||
assert "Invalid port" in bot.replied[0]
|
||||
|
||||
def test_success_default(self):
|
||||
"""Default 3 probes, all succeed."""
|
||||
bot = _FakeBot()
|
||||
writer = MagicMock()
|
||||
writer.wait_closed = AsyncMock()
|
||||
mock_open = AsyncMock(return_value=(MagicMock(), writer))
|
||||
|
||||
with patch.object(_mod, "_open_connection", mock_open):
|
||||
asyncio.run(cmd_tcping(bot, _msg("!tcping example.com")))
|
||||
|
||||
assert len(bot.replied) == 1
|
||||
reply = bot.replied[0]
|
||||
assert "example.com:443" in reply
|
||||
assert "3 probes" in reply
|
||||
assert "min/avg/max" in reply
|
||||
|
||||
def test_custom_port_and_count(self):
|
||||
bot = _FakeBot()
|
||||
writer = MagicMock()
|
||||
writer.wait_closed = AsyncMock()
|
||||
mock_open = AsyncMock(return_value=(MagicMock(), writer))
|
||||
|
||||
with patch.object(_mod, "_open_connection", mock_open):
|
||||
asyncio.run(cmd_tcping(bot, _msg("!tcping example.com 22 5")))
|
||||
|
||||
reply = bot.replied[0]
|
||||
assert "example.com:22" in reply
|
||||
assert "5 probes" in reply
|
||||
|
||||
def test_all_timeout(self):
|
||||
bot = _FakeBot()
|
||||
mock_open = AsyncMock(side_effect=asyncio.TimeoutError())
|
||||
|
||||
with patch.object(_mod, "_open_connection", mock_open):
|
||||
asyncio.run(cmd_tcping(bot, _msg("!tcping example.com")))
|
||||
|
||||
assert "timed out" in bot.replied[0]
|
||||
|
||||
def test_count_clamped(self):
|
||||
"""Count > MAX_COUNT gets clamped."""
|
||||
bot = _FakeBot()
|
||||
writer = MagicMock()
|
||||
writer.wait_closed = AsyncMock()
|
||||
mock_open = AsyncMock(return_value=(MagicMock(), writer))
|
||||
|
||||
with patch.object(_mod, "_open_connection", mock_open):
|
||||
asyncio.run(cmd_tcping(bot, _msg("!tcping example.com 443 99")))
|
||||
|
||||
# MAX_COUNT is 10
|
||||
assert "10 probes" in bot.replied[0]
|
||||
|
||||
def test_partial_timeout(self):
|
||||
"""Some probes succeed, some fail."""
|
||||
bot = _FakeBot()
|
||||
call_count = 0
|
||||
writer = MagicMock()
|
||||
writer.wait_closed = AsyncMock()
|
||||
|
||||
async def mock_open(host, port, timeout=None):
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
if call_count == 2:
|
||||
raise asyncio.TimeoutError()
|
||||
return (MagicMock(), writer)
|
||||
|
||||
with patch.object(_mod, "_open_connection", mock_open):
|
||||
asyncio.run(cmd_tcping(bot, _msg("!tcping example.com 443 3")))
|
||||
|
||||
reply = bot.replied[0]
|
||||
assert "timeout" in reply
|
||||
assert "min/avg/max" in reply
|
||||
Reference in New Issue
Block a user