Files
derp/plugins/exploitdb.py
user 4a2960b288 feat: add exploitdb and payload plugins, complete wave 4
ExploitDB: search local exploit-db CSV mirror by keyword, EDB ID,
or CVE identifier. In-bot update command downloads the latest CSV
from GitLab. Also added to the update-data.sh script.

Payload: built-in template library with 52 payloads across 6
categories (sqli, xss, ssti, lfi, cmdi, xxe). Supports browsing,
numeric index, and keyword search within categories.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 02:54:38 +01:00

215 lines
7.0 KiB
Python

"""Plugin: search local exploit-db CSV mirror."""
from __future__ import annotations
import csv
import logging
import time
from pathlib import Path
from derp.plugin import command
log = logging.getLogger(__name__)
_DATA_DIR = Path("data/exploitdb")
_CSV_FILE = _DATA_DIR / "files_exploits.csv"
_CSV_URL = "https://gitlab.com/exploit-database/exploitdb/-/raw/main/files_exploits.csv"
_MAX_AGE = 86400
_MAX_RESULTS = 5
# In-memory index: list of dicts
_index: list[dict] = []
_loaded_at: float = 0
def _load_index() -> list[dict]:
"""Load the exploit-db CSV into memory."""
if not _CSV_FILE.is_file():
return []
entries = []
try:
with open(_CSV_FILE, encoding="utf-8", errors="replace") as f:
reader = csv.DictReader(f)
for row in reader:
entries.append({
"id": row.get("id", ""),
"description": row.get("description", ""),
"date": row.get("date_published", ""),
"author": row.get("author", ""),
"type": row.get("type", ""),
"platform": row.get("platform", ""),
"codes": row.get("codes", ""),
})
except (OSError, csv.Error) as exc:
log.error("exploitdb: failed to load CSV: %s", exc)
return []
log.info("exploitdb: indexed %d exploits", len(entries))
return entries
def _refresh_if_stale() -> None:
"""Reload the index if stale or empty."""
global _index, _loaded_at
now = time.monotonic()
if _index and (now - _loaded_at) < _MAX_AGE:
return
idx = _load_index()
if idx:
_index = idx
_loaded_at = now
def _format_entry(entry: dict) -> str:
"""Format a single exploit entry for IRC output."""
parts = [f"EDB-{entry['id']}"]
if entry["date"]:
parts.append(entry["date"])
if entry["type"]:
parts.append(entry["type"])
if entry["platform"]:
parts.append(entry["platform"])
desc = entry["description"]
if len(desc) > 180:
desc = desc[:177] + "..."
parts.append(desc)
return " | ".join(parts)
async def _download_csv() -> tuple[int, str]:
"""Download the exploit-db CSV. Returns (count, error)."""
import asyncio
import urllib.request
_DATA_DIR.mkdir(parents=True, exist_ok=True)
loop = asyncio.get_running_loop()
def _fetch():
req = urllib.request.Request(_CSV_URL, headers={"User-Agent": "derp-bot"})
with urllib.request.urlopen(req, timeout=60) as resp: # noqa: S310
return resp.read()
try:
data = await loop.run_in_executor(None, _fetch)
except Exception as exc:
return 0, str(exc)[:100]
_CSV_FILE.write_bytes(data)
# Force reload
global _index, _loaded_at
_index = []
_loaded_at = 0
_refresh_if_stale()
return len(_index), ""
@command("exploitdb", help="Exploit-DB: !exploitdb <search|id|update|stats>")
async def cmd_exploitdb(bot, message):
"""Search the local exploit-db CSV mirror.
Usage:
!exploitdb search <term> Search by keyword
!exploitdb <edb-id> Lookup by EDB ID
!exploitdb cve <CVE-ID> Search by CVE identifier
!exploitdb update Download latest CSV
!exploitdb stats Show index statistics
"""
parts = message.text.split(None, 2)
if len(parts) < 2:
await bot.reply(message, "Usage: !exploitdb <search <term>|<id>|cve <id>|update|stats>")
return
sub = parts[1].strip()
if sub == "update":
await bot.reply(message, "Downloading exploit-db CSV...")
count, err = await _download_csv()
if err:
await bot.reply(message, f"Failed: {err}")
else:
await bot.reply(message, f"Loaded {count} exploits")
return
if sub == "stats":
_refresh_if_stale()
if not _index:
await bot.reply(message, "No data loaded (run !exploitdb update)")
else:
types: dict[str, int] = {}
for e in _index:
t = e["type"] or "unknown"
types[t] = types.get(t, 0) + 1
breakdown = ", ".join(f"{v} {k}" for k, v in sorted(types.items()))
await bot.reply(message, f"Exploit-DB: {len(_index)} exploits ({breakdown})")
return
if sub.lower() == "search":
term = parts[2].strip() if len(parts) > 2 else ""
if not term:
await bot.reply(message, "Usage: !exploitdb search <term>")
return
_refresh_if_stale()
if not _index:
await bot.reply(message, "No data loaded (run !exploitdb update)")
return
term_lower = term.lower()
matches = [e for e in _index if term_lower in e["description"].lower()]
if not matches:
await bot.reply(message, f"No exploits matching '{term}'")
return
for entry in matches[:_MAX_RESULTS]:
await bot.reply(message, _format_entry(entry))
if len(matches) > _MAX_RESULTS:
await bot.reply(message, f"({len(matches)} total, showing {_MAX_RESULTS})")
return
if sub.lower() == "cve":
cve_id = parts[2].strip().upper() if len(parts) > 2 else ""
if not cve_id:
await bot.reply(message, "Usage: !exploitdb cve <CVE-ID>")
return
_refresh_if_stale()
if not _index:
await bot.reply(message, "No data loaded (run !exploitdb update)")
return
matches = [e for e in _index if cve_id in e["codes"].upper()]
if not matches:
await bot.reply(message, f"No exploits for {cve_id}")
return
for entry in matches[:_MAX_RESULTS]:
await bot.reply(message, _format_entry(entry))
if len(matches) > _MAX_RESULTS:
await bot.reply(message, f"({len(matches)} total, showing {_MAX_RESULTS})")
return
# Direct ID lookup
if sub.isdigit():
_refresh_if_stale()
if not _index:
await bot.reply(message, "No data loaded (run !exploitdb update)")
return
for entry in _index:
if entry["id"] == sub:
await bot.reply(message, _format_entry(entry))
if entry["codes"]:
await bot.reply(message, f" Refs: {entry['codes']}")
return
await bot.reply(message, f"EDB-{sub}: not found")
return
# Fallback: treat as search term
_refresh_if_stale()
if not _index:
await bot.reply(message, "No data loaded (run !exploitdb update)")
return
rest = parts[2].strip() if len(parts) > 2 else ""
term = f"{sub} {rest}".strip().lower()
matches = [e for e in _index if term in e["description"].lower()]
if not matches:
await bot.reply(message, f"No exploits matching '{term}'")
return
for entry in matches[:_MAX_RESULTS]:
await bot.reply(message, _format_entry(entry))
if len(matches) > _MAX_RESULTS:
await bot.reply(message, f"({len(matches)} total, showing {_MAX_RESULTS})")