diff --git a/docs/CHEATSHEET.md b/docs/CHEATSHEET.md index 095c87b..db8885c 100644 --- a/docs/CHEATSHEET.md +++ b/docs/CHEATSHEET.md @@ -250,12 +250,33 @@ Categories: sqli, xss, ssti, lfi, cmdi, xxe !cve search apache rce # Search CVE descriptions !cve update # Download NVD feed (slow) !cve stats # Show index size +!mac AA:BB:CC:DD:EE:FF # MAC OUI vendor lookup +!mac random # Generate random MAC +!mac update # Download IEEE OUI database ``` +## Security Intelligence (API) + +``` +!abuse 8.8.8.8 # AbuseIPDB reputation check +!abuse 8.8.8.8 1.1.1.1 # Batch check (max 5) +!abuse 8.8.8.8 report 14 Spam # Report IP (admin) +!vt # VirusTotal file hash lookup +!vt 8.8.8.8 # VirusTotal IP lookup +!vt example.com # VirusTotal domain lookup +!vt https://evil.com # VirusTotal URL lookup +!jwt eyJhbG... # Decode JWT token +!emailcheck user@example.com # SMTP verification (admin) +``` + +API keys: set `ABUSEIPDB_API_KEY` / `VIRUSTOTAL_API_KEY` env vars or +configure in `config/derp.toml` under `[abuseipdb]` / `[virustotal]`. +VT rate limit: 4 req/min. Email check: max 5, admin only. + ### Data Setup ```bash -./scripts/update-data.sh # Update tor + iprep +./scripts/update-data.sh # Update tor + iprep + oui MAXMIND_LICENSE_KEY=xxx ./scripts/update-data.sh # + GeoLite2 ``` diff --git a/docs/USAGE.md b/docs/USAGE.md index 9abfbf6..4d732d1 100644 --- a/docs/USAGE.md +++ b/docs/USAGE.md @@ -125,6 +125,12 @@ format = "text" # Log format: "text" (default) or "json" | `!username list` | Show available services by category | | `!alert ` | Keyword alert subscriptions across platforms | | `!searx ` | Search SearXNG and show top results | +| `!jwt ` | Decode JWT header, claims, and flag issues | +| `!mac ` | MAC OUI vendor lookup / random MAC | +| `!abuse [ip2 ...]` | AbuseIPDB reputation check | +| `!abuse report ` | Report IP to AbuseIPDB (admin) | +| `!vt ` | VirusTotal lookup | +| `!emailcheck [email2 ...]` | SMTP email verification (admin) | ### Command Shorthand @@ -726,3 +732,116 @@ Polling and announcements: - `list` shows error status indicators next to each alert - `check` forces an immediate poll across all platforms - `history` queries stored results, most recent first + +### `!jwt` -- JWT Decoder + +Decode JSON Web Token header and payload, flag common issues. + +``` +!jwt eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ1c2VyMTIzIn0.sig +``` + +Output format: + +``` +Header: alg=RS256 typ=JWT | sig=43 bytes +sub=user123 +WARN: expired (2026-03-01 12:00 UTC) +``` + +Issues detected: +- `alg=none` (unsigned token) +- Expired tokens (`exp` in the past) +- Not-yet-valid tokens (`nbf` in the future) + +No external dependencies -- pure base64/JSON decoding. + +### `!mac` -- MAC Address Lookup + +OUI vendor lookup from IEEE database, random MAC generation. + +``` +!mac AA:BB:CC:DD:EE:FF Vendor lookup +!mac AABB.CCDD.EEFF Cisco-style format also accepted +!mac random Generate random locally-administered MAC +!mac update Download IEEE OUI database +``` + +Output format: + +``` +AA:BB:CC:DD:EE:FF -- Cisco Systems, Inc (OUI: AA:BB:CC) +Random MAC: 02:4A:F7:3C:91:E2 (locally administered) +``` + +- Accepts any common MAC format (colon, dash, dot, no separator) +- Random MACs have the locally-administered bit set and multicast bit cleared +- OUI database stored at `data/oui.txt`, also downloadable via `scripts/update-data.sh` + +### `!abuse` -- AbuseIPDB + +Check IP reputation or report abuse via the AbuseIPDB API. + +``` +!abuse 8.8.8.8 Check single IP +!abuse 8.8.8.8 1.1.1.1 Check multiple (max 5) +!abuse 8.8.8.8 report 14,22 Brute force Report IP (admin) +``` + +Output format: + +``` +8.8.8.8 -- Abuse: 0% (0 reports) | ISP: Google LLC | Usage: Data Center | Country: US +``` + +- API key: set `ABUSEIPDB_API_KEY` env var or `api_key` under `[abuseipdb]` in config +- Private/loopback IPs are rejected +- Reporting requires admin privileges +- Categories are comma-separated numbers per AbuseIPDB docs + +### `!vt` -- VirusTotal + +Query VirusTotal API v3 for file hashes, IPs, domains, or URLs. + +``` +!vt 44d88612fea8a8f36de82e12... File hash (MD5/SHA1/SHA256) +!vt 8.8.8.8 IP address +!vt example.com Domain +!vt https://example.com/page URL +``` + +Output format: + +``` +44d88612fea8a8... -- 62/72 detected | trojan, malware | first seen: 2024-01-15 +8.8.8.8 -- 0/94 | AS15169 GOOGLE | Country: US | Reputation: 0 +example.com -- 0/94 | Registrar: Example Inc | Reputation: 0 +``` + +- API key: set `VIRUSTOTAL_API_KEY` env var or `api_key` under `[virustotal]` in config +- Auto-detects input type from format (hash length, URL scheme, IP, domain) +- Rate limited to 4 requests per minute (VT free tier) +- URL IDs are base64url-encoded per VT API spec + +### `!emailcheck` -- SMTP Email Verification (admin) + +Verify email deliverability via MX resolution and raw SMTP RCPT TO conversation +through the SOCKS5 proxy. + +``` +!emailcheck user@example.com Single check +!emailcheck user@example.com user2@test.org Batch (max 5) +``` + +Output format: + +``` +user@example.com -- SMTP 250 OK (mx: mail.example.com) +bad@example.com -- SMTP 550 User unknown (mx: mail.example.com) +``` + +- Admin only (prevents enumeration abuse) +- Resolves MX records via Tor DNS, falls back to A record +- Raw SMTP via SOCKS5 proxy: EHLO, MAIL FROM:<>, RCPT TO, QUIT +- 15-second timeout per connection +- Max 5 emails per invocation diff --git a/plugins/abuseipdb.py b/plugins/abuseipdb.py new file mode 100644 index 0000000..fa625b4 --- /dev/null +++ b/plugins/abuseipdb.py @@ -0,0 +1,146 @@ +"""Plugin: AbuseIPDB IP reputation check and reporting.""" + +from __future__ import annotations + +import asyncio +import ipaddress +import json +import logging +import os +import urllib.parse +import urllib.request + +from derp.http import urlopen as _urlopen +from derp.plugin import command + +log = logging.getLogger(__name__) + +_API_BASE = "https://api.abuseipdb.com/api/v2" +_MAX_BATCH = 5 + + +def _get_api_key(bot) -> str: + """Resolve API key from environment or config.""" + return (os.environ.get("ABUSEIPDB_API_KEY", "") + or bot.config.get("abuseipdb", {}).get("api_key", "")) + + +def _validate_ip(addr: str) -> str | None: + """Validate IP address. Returns error string or None if valid.""" + try: + ip = ipaddress.ip_address(addr) + except ValueError: + return f"invalid IP: {addr}" + if ip.is_private or ip.is_loopback: + return f"{addr}: private/loopback address" + return None + + +def _check_ip(api_key: str, addr: str) -> dict: + """Query AbuseIPDB check endpoint (blocking).""" + url = f"{_API_BASE}/check?{urllib.parse.urlencode({'ipAddress': addr, 'maxAgeInDays': 90})}" + req = urllib.request.Request(url, headers={ + "Key": api_key, + "Accept": "application/json", + "User-Agent": "derp-bot", + }) + with _urlopen(req, timeout=15) as resp: + return json.loads(resp.read()) + + +def _report_ip(api_key: str, addr: str, categories: str, comment: str) -> dict: + """Submit abuse report (blocking).""" + data = urllib.parse.urlencode({ + "ip": addr, + "categories": categories, + "comment": comment, + }).encode() + req = urllib.request.Request(f"{_API_BASE}/report", data=data, headers={ + "Key": api_key, + "Accept": "application/json", + "User-Agent": "derp-bot", + }) + with _urlopen(req, timeout=15) as resp: + return json.loads(resp.read()) + + +def _format_check(addr: str, data: dict) -> str: + """Format check response into single-line output.""" + d = data.get("data", {}) + score = d.get("abuseConfidenceScore", "?") + reports = d.get("totalReports", 0) + isp = d.get("isp", "?") + usage = d.get("usageType", "?") + country = d.get("countryCode", "?") + return (f"{addr} -- Abuse: {score}% ({reports} reports) | ISP: {isp}" + f" | Usage: {usage} | Country: {country}") + + +@command("abuse", help="AbuseIPDB: !abuse [ip2 ...] | !abuse report ") +async def cmd_abuse(bot, message): + """Check or report IP addresses via AbuseIPDB. + + Usage: + !abuse 8.8.8.8 Check single IP + !abuse 8.8.8.8 1.1.1.1 Check multiple IPs (max 5) + !abuse 8.8.8.8 report 14,22 Brute force Report IP (admin) + """ + parts = message.text.split() + if len(parts) < 2: + usage = "Usage: !abuse [ip2 ...] | !abuse report " + await bot.reply(message, usage) + return + + api_key = _get_api_key(bot) + if not api_key: + await bot.reply(message, "AbuseIPDB API key not configured") + return + + # Detect report mode: !abuse report + if len(parts) >= 5 and parts[2].lower() == "report": + if not bot._is_admin(message): + await bot.reply(message, "Permission denied: reporting requires admin") + return + + addr = parts[1] + err = _validate_ip(addr) + if err: + await bot.reply(message, err) + return + + categories = parts[3] + comment = " ".join(parts[4:]) + loop = asyncio.get_running_loop() + + try: + result = await loop.run_in_executor( + None, _report_ip, api_key, addr, categories, comment, + ) + score = result.get("data", {}).get("abuseConfidenceScore", "?") + await bot.reply(message, f"{addr} -- reported (confidence: {score}%)") + except Exception as exc: + await bot.reply(message, f"{addr} -- report failed: {exc}") + return + + # Check mode: collect IPs from arguments + addrs = parts[1:1 + _MAX_BATCH] + + # Validate all + for addr in addrs: + err = _validate_ip(addr) + if err: + await bot.reply(message, err) + return + + loop = asyncio.get_running_loop() + + async def _query(addr: str) -> str: + try: + result = await loop.run_in_executor(None, _check_ip, api_key, addr) + return _format_check(addr, result) + except Exception as exc: + return f"{addr} -- error: {exc}" + + results = await asyncio.gather(*[_query(a) for a in addrs]) + for line in results: + await bot.reply(message, line) diff --git a/plugins/emailcheck.py b/plugins/emailcheck.py new file mode 100644 index 0000000..ae0c23b --- /dev/null +++ b/plugins/emailcheck.py @@ -0,0 +1,187 @@ +"""Plugin: SMTP email verification via MX resolution and raw SMTP conversation.""" + +from __future__ import annotations + +import asyncio +import logging +import re +import socket + +from derp.dns import QTYPES, build_query, parse_response +from derp.http import create_connection +from derp.plugin import command + +log = logging.getLogger(__name__) + +_MAX_BATCH = 5 +_SMTP_TIMEOUT = 15 +_EMAIL_RE = re.compile(r"^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$") + +# Tor DNS resolver for MX lookups +_DNS_ADDR = "10.200.1.13" +_DNS_PORT = 9053 + + +def _resolve_mx(domain: str) -> list[str]: + """Resolve MX records for a domain via Tor DNS. Returns list of MX hosts.""" + query = build_query(domain, QTYPES["MX"]) + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.settimeout(5) + try: + sock.sendto(query, (_DNS_ADDR, _DNS_PORT)) + data, _ = sock.recvfrom(4096) + finally: + sock.close() + + rcode, results = parse_response(data) + if rcode != 0 or not results: + return [] + + # MX results are "priority host" format + mx_hosts = [] + for r in results: + parts = r.split(None, 1) + if len(parts) == 2: + mx_hosts.append(parts[1].rstrip(".")) + else: + mx_hosts.append(r.rstrip(".")) + + # Sort by priority (already parsed as "prio host") + return mx_hosts + + +def _resolve_a(domain: str) -> str | None: + """Resolve A record for a domain as fallback. Returns first IP or None.""" + query = build_query(domain, QTYPES["A"]) + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.settimeout(5) + try: + sock.sendto(query, (_DNS_ADDR, _DNS_PORT)) + data, _ = sock.recvfrom(4096) + finally: + sock.close() + + rcode, results = parse_response(data) + if rcode == 0 and results: + return results[0] + return None + + +def _smtp_check(email: str, mx_host: str) -> tuple[int, str]: + """Raw SMTP conversation via SOCKS proxy. Returns (response_code, response_text).""" + sock = create_connection((mx_host, 25), timeout=_SMTP_TIMEOUT) + try: + fp = sock.makefile("rb") + + def _read_reply() -> tuple[int, str]: + lines = [] + while True: + line = fp.readline() + if not line: + break + text = line.decode("utf-8", errors="replace").rstrip() + lines.append(text) + # Multi-line: "250-..." continues, "250 ..." ends + if len(text) >= 4 and text[3] == " ": + break + if not lines: + return 0, "no response" + try: + code = int(lines[-1][:3]) + except (ValueError, IndexError): + code = 0 + return code, lines[-1][4:] if len(lines[-1]) > 4 else lines[-1] + + # Read banner + code, text = _read_reply() + if code != 220: + return code, f"banner: {text}" + + # EHLO + sock.sendall(b"EHLO derp.bot\r\n") + code, text = _read_reply() + if code != 250: + return code, f"EHLO: {text}" + + # MAIL FROM + sock.sendall(b"MAIL FROM:<>\r\n") + code, text = _read_reply() + if code != 250: + return code, f"MAIL FROM: {text}" + + # RCPT TO + sock.sendall(f"RCPT TO:<{email}>\r\n".encode()) + code, text = _read_reply() + + # QUIT + try: + sock.sendall(b"QUIT\r\n") + except OSError: + pass + + return code, text + finally: + sock.close() + + +@command("emailcheck", help="SMTP verify: !emailcheck [email2 ...]", admin=True) +async def cmd_emailcheck(bot, message): + """Check email deliverability via SMTP RCPT TO verification. + + Usage: + !emailcheck user@example.com Single check + !emailcheck user@example.com user2@test.org Batch (max 5) + """ + parts = message.text.split() + if len(parts) < 2: + await bot.reply(message, "Usage: !emailcheck [email2 ...]") + return + + emails = parts[1:1 + _MAX_BATCH] + + # Validate format + for email in emails: + if not _EMAIL_RE.match(email): + await bot.reply(message, f"Invalid email format: {email}") + return + + loop = asyncio.get_running_loop() + + async def _check_one(email: str) -> str: + domain = email.split("@", 1)[1] + + # Resolve MX + try: + mx_hosts = await loop.run_in_executor(None, _resolve_mx, domain) + except Exception as exc: + log.debug("emailcheck: MX lookup failed for %s: %s", domain, exc) + mx_hosts = [] + + # Fallback to A record + if not mx_hosts: + try: + a_record = await loop.run_in_executor(None, _resolve_a, domain) + except Exception: + a_record = None + if a_record: + mx_hosts = [domain] + else: + return f"{email} -- no MX or A record for {domain}" + + # Try each MX host + for mx in mx_hosts: + try: + code, text = await loop.run_in_executor(None, _smtp_check, email, mx) + return f"{email} -- SMTP {code} {text} (mx: {mx})" + except Exception as exc: + log.debug("emailcheck: SMTP failed for %s via %s: %s", email, mx, exc) + continue + + return f"{email} -- all MX hosts unreachable" + + if len(emails) > 1: + await bot.reply(message, f"Checking {len(emails)} addresses...") + + results = await asyncio.gather(*[_check_one(e) for e in emails]) + for line in results: + await bot.reply(message, line) diff --git a/plugins/jwt.py b/plugins/jwt.py new file mode 100644 index 0000000..71419e4 --- /dev/null +++ b/plugins/jwt.py @@ -0,0 +1,128 @@ +"""Plugin: decode and inspect JSON Web Tokens.""" + +from __future__ import annotations + +import base64 +import json +import logging +import time + +from derp.plugin import command + +log = logging.getLogger(__name__) + +_DANGEROUS_ALGS = {"none", ""} + + +def _b64url_decode(s: str) -> bytes: + """Base64url decode with padding correction.""" + s = s.replace("-", "+").replace("_", "/") + pad = 4 - len(s) % 4 + if pad != 4: + s += "=" * pad + return base64.b64decode(s) + + +def _decode_jwt(token: str) -> tuple[dict, dict, bytes]: + """Decode JWT into (header, payload, signature_bytes). + + Raises ValueError on malformed tokens. + """ + parts = token.split(".") + if len(parts) != 3: + raise ValueError(f"expected 3 parts, got {len(parts)}") + + try: + header = json.loads(_b64url_decode(parts[0])) + except (json.JSONDecodeError, Exception) as exc: + raise ValueError(f"invalid header: {exc}") from exc + + try: + payload = json.loads(_b64url_decode(parts[1])) + except (json.JSONDecodeError, Exception) as exc: + raise ValueError(f"invalid payload: {exc}") from exc + + try: + sig = _b64url_decode(parts[2]) if parts[2] else b"" + except Exception: + sig = b"" + + return header, payload, sig + + +def _check_issues(header: dict, payload: dict) -> list[str]: + """Return list of warning strings for common JWT issues.""" + issues = [] + now = time.time() + + alg = str(header.get("alg", "")).lower() + if alg in _DANGEROUS_ALGS: + issues.append(f'alg="{header.get("alg", "")}" (unsigned)') + + exp = payload.get("exp") + if isinstance(exp, (int, float)): + from datetime import datetime, timezone + exp_dt = datetime.fromtimestamp(exp, tz=timezone.utc) + if exp < now: + issues.append(f"expired ({exp_dt:%Y-%m-%d %H:%M} UTC)") + + nbf = payload.get("nbf") + if isinstance(nbf, (int, float)): + from datetime import datetime, timezone + nbf_dt = datetime.fromtimestamp(nbf, tz=timezone.utc) + if nbf > now: + issues.append(f"not yet valid (nbf={nbf_dt:%Y-%m-%d %H:%M} UTC)") + + return issues + + +def _format_claims(payload: dict) -> str: + """Format payload claims as compact key=value pairs.""" + parts = [] + for key, val in payload.items(): + if key in ("exp", "nbf", "iat") and isinstance(val, (int, float)): + from datetime import datetime, timezone + dt = datetime.fromtimestamp(val, tz=timezone.utc) + parts.append(f"{key}={dt:%Y-%m-%d %H:%M} UTC") + elif isinstance(val, str): + parts.append(f"{key}={val}") + else: + parts.append(f"{key}={json.dumps(val, separators=(',', ':'))}") + return " | ".join(parts) + + +@command("jwt", help="Decode JWT: !jwt ") +async def cmd_jwt(bot, message): + """Decode a JSON Web Token and display header, claims, and issues.""" + parts = message.text.split(None, 2) + if len(parts) < 2: + await bot.reply(message, "Usage: !jwt ") + return + + token = parts[1].strip() + + try: + header, payload, sig = _decode_jwt(token) + except ValueError as exc: + await bot.reply(message, f"Invalid JWT: {exc}") + return + + # Line 1: header + alg = header.get("alg", "?") + typ = header.get("typ", "?") + sig_len = len(sig) + hdr_line = f"Header: alg={alg} typ={typ} | sig={sig_len} bytes" + + # Line 2: claims + if payload: + claims_line = _format_claims(payload) + else: + claims_line = "(empty payload)" + + await bot.reply(message, hdr_line) + await bot.reply(message, claims_line) + + # Line 3: warnings + issues = _check_issues(header, payload) + if issues: + await bot.reply(message, "WARN: " + " | ".join(issues)) diff --git a/plugins/mac.py b/plugins/mac.py new file mode 100644 index 0000000..fb8ca56 --- /dev/null +++ b/plugins/mac.py @@ -0,0 +1,149 @@ +"""Plugin: MAC address OUI vendor lookup using IEEE database.""" + +from __future__ import annotations + +import asyncio +import logging +import os +import re +import urllib.request +from pathlib import Path + +from derp.http import urlopen as _urlopen +from derp.plugin import command + +log = logging.getLogger(__name__) + +_OUI_PATH = Path("data/oui.txt") +_OUI_URL = "https://standards-oui.ieee.org/oui/oui.txt" + +# Module-level lazy-loaded OUI dict: prefix -> vendor name +_oui_db: dict[str, str] | None = None + +# Regex: lines like "AA-BB-CC (hex)\t\tVendor Name" +_OUI_RE = re.compile(r"^([0-9A-F]{2}-[0-9A-F]{2}-[0-9A-F]{2})\s+\(hex\)\s+(.+)$") + + +def _parse_oui(path: Path) -> dict[str, str]: + """Parse IEEE oui.txt into {prefix: vendor} dict.""" + db: dict[str, str] = {} + try: + for line in path.read_text(encoding="utf-8", errors="replace").splitlines(): + m = _OUI_RE.match(line.strip()) + if m: + prefix = m.group(1).replace("-", ":").upper() + db[prefix] = m.group(2).strip() + except OSError as exc: + log.error("mac: failed to read %s: %s", path, exc) + return db + + +def _get_oui_db() -> dict[str, str]: + """Lazy-load OUI database.""" + global _oui_db + if _oui_db is not None: + return _oui_db + if not _OUI_PATH.is_file(): + log.warning("mac: OUI database not found at %s", _OUI_PATH) + return {} + _oui_db = _parse_oui(_OUI_PATH) + log.info("mac: loaded %d OUI entries from %s", len(_oui_db), _OUI_PATH) + return _oui_db + + +def _normalize_mac(raw: str) -> tuple[str, str]: + """Normalize MAC address input. + + Returns (formatted_mac, oui_prefix) or raises ValueError. + """ + # Strip common separators + cleaned = re.sub(r"[:\-.]", "", raw.strip().upper()) + if len(cleaned) != 12 or not re.fullmatch(r"[0-9A-F]{12}", cleaned): + raise ValueError(f"invalid MAC address: {raw}") + + # Format as AA:BB:CC:DD:EE:FF + formatted = ":".join(cleaned[i:i + 2] for i in range(0, 12, 2)) + oui_prefix = ":".join(cleaned[i:i + 2] for i in range(0, 6, 2)) + return formatted, oui_prefix + + +def _random_mac() -> str: + """Generate a random locally-administered unicast MAC address.""" + octets = list(os.urandom(6)) + # Set locally administered bit (bit 1 of first octet) + octets[0] |= 0x02 + # Clear multicast bit (bit 0 of first octet) + octets[0] &= 0xFE + return ":".join(f"{b:02X}" for b in octets) + + +async def _download_oui() -> tuple[bool, int]: + """Download IEEE OUI database. Returns (success, entry_count).""" + global _oui_db + loop = asyncio.get_running_loop() + + def _fetch(): + _OUI_PATH.parent.mkdir(parents=True, exist_ok=True) + req = urllib.request.Request(_OUI_URL, headers={"User-Agent": "derp-bot"}) + with _urlopen(req, timeout=60) as resp: + return resp.read() + + try: + data = await loop.run_in_executor(None, _fetch) + _OUI_PATH.write_bytes(data) + except Exception as exc: + log.error("mac: failed to download OUI database: %s", exc) + return False, 0 + + # Force reload + _oui_db = _parse_oui(_OUI_PATH) + return True, len(_oui_db) + + +@command("mac", help="MAC lookup: !mac ") +async def cmd_mac(bot, message): + """Look up MAC address vendor, generate random MAC, or update OUI database. + + Usage: + !mac AA:BB:CC:DD:EE:FF Vendor lookup + !mac random Generate random MAC + !mac update Download OUI database + """ + parts = message.text.split(None, 2) + if len(parts) < 2: + await bot.reply(message, "Usage: !mac ") + return + + arg = parts[1].strip() + + if arg.lower() == "update": + await bot.reply(message, "Downloading IEEE OUI database...") + ok, count = await _download_oui() + if ok: + await bot.reply(message, f"OUI database updated: {count} vendors") + else: + await bot.reply(message, "Failed to download OUI database") + return + + if arg.lower() == "random": + mac = _random_mac() + await bot.reply(message, f"Random MAC: {mac} (locally administered)") + return + + # Vendor lookup + try: + formatted, oui_prefix = _normalize_mac(arg) + except ValueError as exc: + await bot.reply(message, str(exc)) + return + + db = _get_oui_db() + if not db: + await bot.reply(message, "OUI database not loaded (run !mac update)") + return + + vendor = db.get(oui_prefix) + if vendor: + await bot.reply(message, f"{formatted} -- {vendor} (OUI: {oui_prefix})") + else: + await bot.reply(message, f"{formatted} -- unknown vendor (OUI: {oui_prefix})") diff --git a/plugins/virustotal.py b/plugins/virustotal.py new file mode 100644 index 0000000..fff11ca --- /dev/null +++ b/plugins/virustotal.py @@ -0,0 +1,210 @@ +"""Plugin: VirusTotal API v3 lookup for hashes, IPs, domains, and URLs.""" + +from __future__ import annotations + +import asyncio +import base64 +import json +import logging +import os +import re +import time +import urllib.request + +from derp.http import urlopen as _urlopen +from derp.plugin import command + +log = logging.getLogger(__name__) + +_API_BASE = "https://www.virustotal.com/api/v3" +_RATE_WINDOW = 60 +_RATE_LIMIT = 4 + +# Module-level rate tracking +_request_times: list[float] = [] + +# Detection regexes +_RE_MD5 = re.compile(r"^[0-9a-fA-F]{32}$") +_RE_SHA1 = re.compile(r"^[0-9a-fA-F]{40}$") +_RE_SHA256 = re.compile(r"^[0-9a-fA-F]{64}$") +_RE_URL = re.compile(r"^https?://", re.IGNORECASE) +_RE_IP = re.compile(r"^(\d{1,3}\.){3}\d{1,3}$") +_RE_DOMAIN = re.compile(r"^[a-zA-Z0-9]([a-zA-Z0-9\-]*[a-zA-Z0-9])?(\.[a-zA-Z]{2,})+$") + + +def _get_api_key(bot) -> str: + """Resolve API key from environment or config.""" + return (os.environ.get("VIRUSTOTAL_API_KEY", "") + or bot.config.get("virustotal", {}).get("api_key", "")) + + +def _rate_check() -> bool: + """Check rate limit. Returns True if request is allowed.""" + now = time.monotonic() + # Prune old timestamps + _request_times[:] = [t for t in _request_times if (now - t) < _RATE_WINDOW] + if len(_request_times) >= _RATE_LIMIT: + return False + _request_times.append(now) + return True + + +def _detect_type(query: str) -> tuple[str, str]: + """Detect input type. Returns (type, endpoint_path) or raises ValueError.""" + if _RE_MD5.match(query) or _RE_SHA1.match(query) or _RE_SHA256.match(query): + return "file", f"/files/{query.lower()}" + if _RE_URL.match(query): + url_id = base64.urlsafe_b64encode(query.encode()).decode().rstrip("=") + return "url", f"/urls/{url_id}" + if _RE_IP.match(query): + return "ip", f"/ip_addresses/{query}" + if _RE_DOMAIN.match(query): + return "domain", f"/domains/{query.lower()}" + raise ValueError(f"cannot determine type for: {query}") + + +def _vt_request(api_key: str, path: str) -> dict: + """Make VT API request (blocking).""" + req = urllib.request.Request(f"{_API_BASE}{path}", headers={ + "x-apikey": api_key, + "Accept": "application/json", + "User-Agent": "derp-bot", + }) + with _urlopen(req, timeout=20) as resp: + return json.loads(resp.read()) + + +def _format_file(query: str, data: dict) -> str: + """Format file/hash result.""" + attrs = data.get("data", {}).get("attributes", {}) + stats = attrs.get("last_analysis_stats", {}) + malicious = stats.get("malicious", 0) + total = sum(stats.values()) + + tags = attrs.get("popular_threat_classification", {}) + labels = [] + for entry in tags.get("suggested_threat_label", [])[:3]: + labels.append(entry.get("value", "")) + for entry in tags.get("popular_threat_category", [])[:2]: + val = entry.get("value", "") + if val and val not in labels: + labels.append(val) + + first_seen = attrs.get("first_submission_date", "") + if isinstance(first_seen, int): + from datetime import datetime, timezone + first_seen = datetime.fromtimestamp(first_seen, tz=timezone.utc).strftime("%Y-%m-%d") + + short_hash = query[:16] + "..." if len(query) > 16 else query + parts = [f"{short_hash} -- {malicious}/{total} detected"] + if labels: + parts.append(", ".join(labels)) + if first_seen: + parts.append(f"first seen: {first_seen}") + return " | ".join(parts) + + +def _format_ip(query: str, data: dict) -> str: + """Format IP address result.""" + attrs = data.get("data", {}).get("attributes", {}) + stats = attrs.get("last_analysis_stats", {}) + malicious = stats.get("malicious", 0) + total = sum(stats.values()) + asn = attrs.get("asn", "?") + as_owner = attrs.get("as_owner", "?") + country = attrs.get("country", "?") + reputation = attrs.get("reputation", 0) + return (f"{query} -- {malicious}/{total} | AS{asn} {as_owner}" + f" | Country: {country} | Reputation: {reputation}") + + +def _format_domain(query: str, data: dict) -> str: + """Format domain result.""" + attrs = data.get("data", {}).get("attributes", {}) + stats = attrs.get("last_analysis_stats", {}) + malicious = stats.get("malicious", 0) + total = sum(stats.values()) + registrar = attrs.get("registrar", "?") + reputation = attrs.get("reputation", 0) + categories = attrs.get("categories", {}) + cats = list(set(categories.values()))[:3] + parts = [f"{query} -- {malicious}/{total}"] + if cats: + parts.append(", ".join(cats)) + parts.append(f"Registrar: {registrar}") + parts.append(f"Reputation: {reputation}") + return " | ".join(parts) + + +def _format_url(query: str, data: dict) -> str: + """Format URL result.""" + attrs = data.get("data", {}).get("attributes", {}) + stats = attrs.get("last_analysis_stats", {}) + malicious = stats.get("malicious", 0) + total = sum(stats.values()) + title = attrs.get("title", "") + final_url = attrs.get("last_final_url", query) + parts = [f"{final_url} -- {malicious}/{total}"] + if title: + parts.append(title[:60]) + return " | ".join(parts) + + +_FORMATTERS = { + "file": _format_file, + "ip": _format_ip, + "domain": _format_domain, + "url": _format_url, +} + + +@command("vt", help="VirusTotal: !vt ") +async def cmd_vt(bot, message): + """Query VirusTotal API for file hashes, IPs, domains, or URLs. + + Usage: + !vt 44d88612fea8a8f36de82e12... File hash (MD5/SHA1/SHA256) + !vt 8.8.8.8 IP address + !vt example.com Domain + !vt https://example.com/page URL + """ + parts = message.text.split(None, 2) + if len(parts) < 2: + await bot.reply(message, "Usage: !vt ") + return + + api_key = _get_api_key(bot) + if not api_key: + await bot.reply(message, "VirusTotal API key not configured") + return + + query = parts[1].strip() + + try: + qtype, path = _detect_type(query) + except ValueError as exc: + await bot.reply(message, str(exc)) + return + + if not _rate_check(): + await bot.reply(message, "Rate limited (4 requests/min) -- try again shortly") + return + + loop = asyncio.get_running_loop() + + try: + result = await loop.run_in_executor(None, _vt_request, api_key, path) + except urllib.request.HTTPError as exc: + if exc.code == 404: + await bot.reply(message, f"{query} -- not found in VirusTotal") + elif exc.code == 429: + await bot.reply(message, "VirusTotal API quota exceeded") + else: + await bot.reply(message, f"{query} -- API error: HTTP {exc.code}") + return + except Exception as exc: + await bot.reply(message, f"{query} -- error: {exc}") + return + + formatter = _FORMATTERS.get(qtype, _format_file) + await bot.reply(message, formatter(query, result)) diff --git a/scripts/update-data.sh b/scripts/update-data.sh index 90333ac..67550a0 100755 --- a/scripts/update-data.sh +++ b/scripts/update-data.sh @@ -78,6 +78,24 @@ update_iprep() { fi } +# -- IEEE OUI database -------------------------------------------------------- +update_oui() { + local dest="$DATA_DIR/oui.txt" + local url="https://standards-oui.ieee.org/oui/oui.txt" + mkdir -p "$DATA_DIR" + dim "Downloading IEEE OUI database..." + if curl -sS -fL --max-time 60 -o "$dest.tmp" "$url"; then + local count + count=$(grep -cE '^[0-9A-F]{2}-' "$dest.tmp" || true) + mv "$dest.tmp" "$dest" + info "OUI database: $count vendors" + else + rm -f "$dest.tmp" + err "Failed to download OUI database" + ((FAILURES++)) || true + fi +} + # -- GeoLite2 databases ------------------------------------------------------- update_geolite2() { # Requires MAXMIND_LICENSE_KEY env var @@ -132,6 +150,7 @@ echo update_tor update_iprep +update_oui update_exploitdb update_geolite2