"""Plugin: CVE lookup against local NVD JSON feed."""

from __future__ import annotations

import asyncio
import json
import logging
import re
import time
import urllib.request
from itertools import islice
from pathlib import Path

from derp.http import urlopen as _urlopen
from derp.plugin import command

log = logging.getLogger(__name__)

_DATA_DIR = Path("data/nvd")
_MAX_AGE = 86400  # seconds before the in-memory index is reloaded from disk
_CVE_RE = re.compile(r"^CVE-\d{4}-\d{4,}$", re.IGNORECASE)
_MAX_RESULTS = 5
_MAX_DESC_LEN = 200  # descriptions longer than this are truncated with "..."
_NO_DATA_MSG = "No CVE data loaded (run !cve update)"
_USAGE = "Usage: !cve <id|search <term>|update|stats>"

# CVSS metric keys, in order of preference (v3.1, then v3.0, then v2).
_CVSS_KEYS = ("cvssMetricV31", "cvssMetricV30", "cvssMetricV2")

# In-memory index: cve_id -> {description, severity, score, published}
_index: dict[str, dict] = {}
_loaded_at: float = 0


def _pick_description(cve: dict) -> str:
    """Return the English description of *cve*, else the first one, else ""."""
    descs = cve.get("descriptions") or []
    desc = next(
        ((d.get("value") or "") for d in descs if d.get("lang") == "en"),
        "",
    )
    if not desc and descs:
        desc = descs[0].get("value") or ""
    return desc


def _pick_cvss(cve: dict) -> tuple[float | str, str]:
    """Return ``(score, severity)`` from the most preferred CVSS metric.

    Both values are ``""`` when the record carries no known metric.
    """
    metrics = cve.get("metrics") or {}
    for key in _CVSS_KEYS:
        metric_list = metrics.get(key) or []
        if not metric_list:
            continue
        metric = metric_list[0]
        cvss = metric.get("cvssData") or {}
        score = cvss.get("baseScore", "")
        # v3.x keeps baseSeverity inside cvssData; v2 entries carry it on the
        # metric object itself, so fall back to that level.
        severity = cvss.get("baseSeverity") or metric.get("baseSeverity") or ""
        return score, severity
    return "", ""


def _load_index() -> dict[str, dict]:
    """Load NVD JSON files into a searchable index.

    Reads every ``*.json`` file in ``_DATA_DIR`` (sorted by name). Files that
    cannot be read, decoded or parsed, or whose top level is not a JSON
    object, are logged and skipped. Returns a mapping of upper-cased CVE id
    to a record with ``description``, ``severity``, ``score`` and
    ``published`` keys; empty if the data directory does not exist.
    """
    idx: dict[str, dict] = {}
    if not _DATA_DIR.is_dir():
        return idx

    for path in sorted(_DATA_DIR.glob("*.json")):
        try:
            data = json.loads(path.read_text(encoding="utf-8"))
        except (ValueError, OSError) as exc:
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            log.warning("cve: skipping %s: %s", path.name, exc)
            continue
        if not isinstance(data, dict):
            log.warning("cve: skipping %s: top-level JSON is not an object",
                        path.name)
            continue

        for entry in data.get("vulnerabilities") or []:
            cve = entry.get("cve") or {}
            cve_id = cve.get("id", "")
            if not cve_id:
                continue
            score, severity = _pick_cvss(cve)
            idx[cve_id.upper()] = {
                "description": _pick_description(cve),
                "severity": severity,
                "score": score,
                # Keep only the date part (YYYY-MM-DD) of the timestamp.
                "published": (cve.get("published") or "")[:10],
            }

    log.info("cve: indexed %d CVEs from %s", len(idx), _DATA_DIR)
    return idx


def _refresh_if_stale() -> None:
    """Reload the index if it is empty or older than ``_MAX_AGE`` seconds.

    A reload that yields no entries leaves the current index untouched.
    """
    global _index, _loaded_at
    now = time.monotonic()
    if _index and (now - _loaded_at) < _MAX_AGE:
        return
    idx = _load_index()
    if idx:
        _index = idx
        _loaded_at = now


def _format_cve(cve_id: str, rec: dict) -> str:
    """Format a single CVE entry for IRC output.

    Produces ``id | CVSS score severity | published | description``; the
    score and date segments are omitted when empty and the description is
    truncated to ``_MAX_DESC_LEN`` characters.
    """
    parts = [cve_id]
    if rec["score"]:
        sev = f" {rec['severity']}" if rec["severity"] else ""
        parts.append(f"CVSS {rec['score']}{sev}")
    if rec["published"]:
        parts.append(rec["published"])
    desc = rec["description"]
    if len(desc) > _MAX_DESC_LEN:
        desc = desc[: _MAX_DESC_LEN - 3] + "..."
    parts.append(desc)
    return " | ".join(parts)


async def _download_nvd() -> tuple[int, str]:
    """Download NVD CVE JSON feed. Returns (count, error).

    Pages through the NVD 2.0 API and writes each page to
    ``_DATA_DIR/nvd_NNNN.json``. ``error`` is ``""`` on success,
    ``"partial (...)"`` when a failure happened after at least one page was
    stored, or the failure text (with ``count == 0``) when nothing was stored.
    Whenever files may have been written, the in-memory index is invalidated
    so the next lookup reloads from disk.
    """
    global _index, _loaded_at

    # NVD 2.0 API: paginated, 2000 per request
    base_url = "https://services.nvd.nist.gov/rest/json/cves/2.0"
    page_size = 2000
    start_index = 0
    total = 0
    file_num = 0
    error = ""

    def _fetch(url: str) -> bytes:
        req = urllib.request.Request(url, headers={"User-Agent": "derp-bot"})
        with _urlopen(req, timeout=120) as resp:
            return resp.read()

    try:
        _DATA_DIR.mkdir(parents=True, exist_ok=True)
        loop = asyncio.get_running_loop()
        while True:
            url = (
                f"{base_url}?startIndex={start_index}"
                f"&resultsPerPage={page_size}"
            )
            # The HTTP request blocks, so it runs in the default executor.
            data = await loop.run_in_executor(None, _fetch, url)
            parsed = json.loads(data)
            total_results = parsed.get("totalResults", 0)
            vulns = parsed.get("vulnerabilities", [])
            if not vulns:
                break
            dest = _DATA_DIR / f"nvd_{file_num:04d}.json"
            dest.write_bytes(data)
            total += len(vulns)
            file_num += 1
            start_index += page_size
            if start_index >= total_results:
                break
            # Rate limit: NVD allows ~5 req/30s without API key
            await asyncio.sleep(6)
    except Exception as exc:
        # Boundary handler: report the failure to the caller as text.
        log.warning("cve: download failed after %d CVEs: %s", total, exc)
        if total == 0:
            return 0, str(exc)
        error = f"partial ({exc})"

    # Files on disk changed (fully or partially): force a reload on next use.
    _index = {}
    _loaded_at = 0
    return total, error


def _search(term: str, *, include_ids: bool) -> list[tuple[str, dict]]:
    """Return up to ``_MAX_RESULTS`` index entries matching *term*.

    Matching is a case-insensitive substring test against the description
    and, when *include_ids* is true, against the CVE id as well.
    """
    needle = term.lower()
    hits = (
        (cve_id, rec)
        for cve_id, rec in _index.items()
        if needle in rec["description"].lower()
        or (include_ids and needle in cve_id.lower())
    )
    return list(islice(hits, _MAX_RESULTS))


async def _ensure_index(bot, message) -> bool:
    """Refresh the index; reply with a hint and return False if it is empty."""
    _refresh_if_stale()
    if _index:
        return True
    await bot.reply(message, _NO_DATA_MSG)
    return False


async def _reply_matches(bot, message, term: str, matches) -> None:
    """Reply with one line per match, or a "no match" notice for *term*."""
    if not matches:
        await bot.reply(message, f"No CVEs matching '{term}'")
        return
    for cve_id, rec in matches:
        await bot.reply(message, _format_cve(cve_id, rec))


@command("cve", help="CVE lookup: !cve <id|search <term>|update|stats>")
async def cmd_cve(bot, message):
    """Look up CVE details or search by keyword.

    Usage:
        !cve CVE-2024-1234       Lookup specific CVE
        !cve search apache rce   Search descriptions
        !cve update              Download NVD feed (slow)
        !cve stats               Show index statistics

    Subcommands are matched case-insensitively. Any other argument that is
    not a well-formed CVE id is treated as a description search term.
    """
    parts = message.text.split(None, 2)
    if len(parts) < 2:
        await bot.reply(message, _USAGE)
        return

    arg = parts[1].strip()
    rest = parts[2].strip() if len(parts) > 2 else ""
    subcommand = arg.lower()

    if subcommand == "update":
        await bot.reply(message, "Downloading NVD feed (this takes a while)...")
        count, err = await _download_nvd()
        if err and count == 0:
            await bot.reply(message, f"Failed: {err}")
        elif err:
            await bot.reply(message, f"Downloaded {count} CVEs ({err})")
        else:
            await bot.reply(message, f"Downloaded {count} CVEs")
        return

    if subcommand == "stats":
        if await _ensure_index(bot, message):
            await bot.reply(message, f"CVE index: {len(_index)} entries")
        return

    if subcommand == "search":
        if not rest:
            await bot.reply(message, "Usage: !cve search <term>")
            return
        if await _ensure_index(bot, message):
            matches = _search(rest, include_ids=True)
            await _reply_matches(bot, message, rest, matches)
        return

    cve_id = arg.upper()
    if not _CVE_RE.match(cve_id):
        # Maybe it's a search term without "search" prefix
        if await _ensure_index(bot, message):
            term = f"{arg} {rest}" if rest else arg
            matches = _search(term, include_ids=False)
            # Echo the complete term so the user sees what was searched.
            await _reply_matches(bot, message, term, matches)
        return

    # Direct CVE-ID lookup
    if not await _ensure_index(bot, message):
        return
    rec = _index.get(cve_id)
    if not rec:
        await bot.reply(message, f"{cve_id}: not found in local index")
        return
    await bot.reply(message, _format_cve(cve_id, rec))