"""Plugin: search local exploit-db CSV mirror.""" from __future__ import annotations import csv import logging import time from pathlib import Path from derp.http import urlopen as _urlopen from derp.plugin import command log = logging.getLogger(__name__) _DATA_DIR = Path("data/exploitdb") _CSV_FILE = _DATA_DIR / "files_exploits.csv" _CSV_URL = "https://gitlab.com/exploit-database/exploitdb/-/raw/main/files_exploits.csv" _MAX_AGE = 86400 _MAX_RESULTS = 5 # In-memory index: list of dicts _index: list[dict] = [] _loaded_at: float = 0 def _load_index() -> list[dict]: """Load the exploit-db CSV into memory.""" if not _CSV_FILE.is_file(): return [] entries = [] try: with open(_CSV_FILE, encoding="utf-8", errors="replace") as f: reader = csv.DictReader(f) for row in reader: entries.append({ "id": row.get("id", ""), "description": row.get("description", ""), "date": row.get("date_published", ""), "author": row.get("author", ""), "type": row.get("type", ""), "platform": row.get("platform", ""), "codes": row.get("codes", ""), }) except (OSError, csv.Error) as exc: log.error("exploitdb: failed to load CSV: %s", exc) return [] log.info("exploitdb: indexed %d exploits", len(entries)) return entries def _refresh_if_stale() -> None: """Reload the index if stale or empty.""" global _index, _loaded_at now = time.monotonic() if _index and (now - _loaded_at) < _MAX_AGE: return idx = _load_index() if idx: _index = idx _loaded_at = now def _format_entry(entry: dict) -> str: """Format a single exploit entry for IRC output.""" parts = [f"EDB-{entry['id']}"] if entry["date"]: parts.append(entry["date"]) if entry["type"]: parts.append(entry["type"]) if entry["platform"]: parts.append(entry["platform"]) desc = entry["description"] if len(desc) > 180: desc = desc[:177] + "..." parts.append(desc) return " | ".join(parts) async def _download_csv() -> tuple[int, str]: """Download the exploit-db CSV. Returns (count, error).""" import asyncio import urllib.request _DATA_DIR.mkdir(parents=True, exist_ok=True) loop = asyncio.get_running_loop() def _fetch(): req = urllib.request.Request(_CSV_URL, headers={"User-Agent": "derp-bot"}) with _urlopen(req, timeout=60) as resp: return resp.read() try: data = await loop.run_in_executor(None, _fetch) except Exception as exc: return 0, str(exc)[:100] _CSV_FILE.write_bytes(data) # Force reload global _index, _loaded_at _index = [] _loaded_at = 0 _refresh_if_stale() return len(_index), "" @command("exploitdb", help="Exploit-DB: !exploitdb ") async def cmd_exploitdb(bot, message): """Search the local exploit-db CSV mirror. Usage: !exploitdb search Search by keyword !exploitdb Lookup by EDB ID !exploitdb cve Search by CVE identifier !exploitdb update Download latest CSV !exploitdb stats Show index statistics """ parts = message.text.split(None, 2) if len(parts) < 2: await bot.reply(message, "Usage: !exploitdb ||cve |update|stats>") return sub = parts[1].strip() if sub == "update": await bot.reply(message, "Downloading exploit-db CSV...") count, err = await _download_csv() if err: await bot.reply(message, f"Failed: {err}") else: await bot.reply(message, f"Loaded {count} exploits") return if sub == "stats": _refresh_if_stale() if not _index: await bot.reply(message, "No data loaded (run !exploitdb update)") else: types: dict[str, int] = {} for e in _index: t = e["type"] or "unknown" types[t] = types.get(t, 0) + 1 breakdown = ", ".join(f"{v} {k}" for k, v in sorted(types.items())) await bot.reply(message, f"Exploit-DB: {len(_index)} exploits ({breakdown})") return if sub.lower() == "search": term = parts[2].strip() if len(parts) > 2 else "" if not term: await bot.reply(message, "Usage: !exploitdb search ") return _refresh_if_stale() if not _index: await bot.reply(message, "No data loaded (run !exploitdb update)") return term_lower = term.lower() matches = [e for e in _index if term_lower in e["description"].lower()] if not matches: await bot.reply(message, f"No exploits matching '{term}'") return lines = [_format_entry(e) for e in matches[:_MAX_RESULTS]] if len(matches) > _MAX_RESULTS: lines.append(f"({len(matches)} total, showing {_MAX_RESULTS})") await bot.long_reply(message, lines, label="exploits") return if sub.lower() == "cve": cve_id = parts[2].strip().upper() if len(parts) > 2 else "" if not cve_id: await bot.reply(message, "Usage: !exploitdb cve ") return _refresh_if_stale() if not _index: await bot.reply(message, "No data loaded (run !exploitdb update)") return matches = [e for e in _index if cve_id in e["codes"].upper()] if not matches: await bot.reply(message, f"No exploits for {cve_id}") return lines = [_format_entry(e) for e in matches[:_MAX_RESULTS]] if len(matches) > _MAX_RESULTS: lines.append(f"({len(matches)} total, showing {_MAX_RESULTS})") await bot.long_reply(message, lines, label="exploits") return # Direct ID lookup if sub.isdigit(): _refresh_if_stale() if not _index: await bot.reply(message, "No data loaded (run !exploitdb update)") return for entry in _index: if entry["id"] == sub: await bot.reply(message, _format_entry(entry)) if entry["codes"]: await bot.reply(message, f" Refs: {entry['codes']}") return await bot.reply(message, f"EDB-{sub}: not found") return # Fallback: treat as search term _refresh_if_stale() if not _index: await bot.reply(message, "No data loaded (run !exploitdb update)") return rest = parts[2].strip() if len(parts) > 2 else "" term = f"{sub} {rest}".strip().lower() matches = [e for e in _index if term in e["description"].lower()] if not matches: await bot.reply(message, f"No exploits matching '{term}'") return lines = [_format_entry(e) for e in matches[:_MAX_RESULTS]] if len(matches) > _MAX_RESULTS: lines.append(f"({len(matches)} total, showing {_MAX_RESULTS})") await bot.long_reply(message, lines, label="exploits")