Add PySocks dependency and shared src/derp/http.py module providing proxy-aware urlopen() and build_opener() that route through socks5h://127.0.0.1:1080. Subclassed SocksiPyHandler passes SSL context through to HTTPS connections. Swapped 14 external-facing plugins to use the proxied helpers. Local-only traffic (SearXNG, raw DNS/TLS sockets) stays direct. Updated test mocks in test_twitch and test_alert accordingly.
216 lines
7.0 KiB
Python
216 lines
7.0 KiB
Python
"""Plugin: search local exploit-db CSV mirror."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import csv
|
|
import logging
|
|
import time
|
|
from pathlib import Path
|
|
|
|
from derp.http import urlopen as _urlopen
|
|
from derp.plugin import command
|
|
|
|
log = logging.getLogger(__name__)

# Local mirror: directory, CSV file inside it, and the upstream URL it is
# downloaded from.
_DATA_DIR = Path("data", "exploitdb")
_CSV_FILE = _DATA_DIR.joinpath("files_exploits.csv")
_CSV_URL = "https://gitlab.com/exploit-database/exploitdb/-/raw/main/files_exploits.csv"

# Seconds the in-memory index is considered fresh before it is reloaded.
_MAX_AGE = 24 * 60 * 60
# Maximum number of entries sent back for a single query.
_MAX_RESULTS = 5

# In-memory index (one dict per exploit) and the time.monotonic() reading
# taken when it was last loaded.
_index: list[dict] = []
_loaded_at: float = 0
|
|
|
|
|
|
def _load_index() -> list[dict]:
    """Load the exploit-db CSV into memory.

    Reads ``_CSV_FILE`` with :class:`csv.DictReader` and keeps only the
    columns this plugin uses.

    Returns:
        One dict per CSV row with the keys ``id``, ``description``, ``date``,
        ``author``, ``type``, ``platform`` and ``codes``.  Every value is a
        ``str``; a column that is absent, or left unfilled by a short row,
        becomes ``""``.  An empty list is returned when the file does not
        exist or cannot be read or parsed (the failure is logged).
    """
    if not _CSV_FILE.is_file():
        return []

    # Output key -> CSV column name.
    columns = {
        "id": "id",
        "description": "description",
        "date": "date_published",
        "author": "author",
        "type": "type",
        "platform": "platform",
        "codes": "codes",
    }
    try:
        # newline="" lets the csv module handle line endings itself, so
        # newlines embedded in quoted fields are parsed correctly.
        with open(_CSV_FILE, encoding="utf-8", errors="replace", newline="") as f:
            # DictReader fills the missing trailing fields of a short row with
            # None, which row.get(col, "") would hand back unchanged.  The
            # "or ''" keeps every value a str so callers can safely use
            # .lower(), .upper() and len() on it.
            entries = [
                {key: row.get(col) or "" for key, col in columns.items()}
                for row in csv.DictReader(f)
            ]
    except (OSError, csv.Error) as exc:
        log.error("exploitdb: failed to load CSV: %s", exc)
        return []
    log.info("exploitdb: indexed %d exploits", len(entries))
    return entries
|
|
|
|
|
|
def _refresh_if_stale() -> None:
    """Reload the index from disk when it is empty or older than ``_MAX_AGE``.

    A reload that yields no entries leaves the current index and its load
    timestamp untouched.
    """
    global _index, _loaded_at

    started = time.monotonic()
    is_fresh = bool(_index) and started - _loaded_at < _MAX_AGE
    if not is_fresh:
        reloaded = _load_index()
        if reloaded:
            _index, _loaded_at = reloaded, started
|
|
|
|
|
|
def _format_entry(entry: dict) -> str:
    """Render one exploit entry as a single ``" | "``-separated IRC line.

    The line starts with ``EDB-<id>``, followed by the date, type and
    platform (each left out when empty), and ends with the description.
    Descriptions longer than 180 characters are cut to 177 plus ``...``.
    """
    fields = [f"EDB-{entry['id']}"]
    fields.extend(
        entry[key] for key in ("date", "type", "platform") if entry[key]
    )

    summary = entry["description"]
    if len(summary) > 180:
        summary = f"{summary[:177]}..."
    fields.append(summary)

    return " | ".join(fields)
|
|
|
|
|
|
async def _download_csv() -> tuple[int, str]:
    """Download the exploit-db CSV and rebuild the in-memory index.

    The HTTP fetch and the file write both run in the default executor so the
    event loop is not blocked.  The payload is written to a temporary sibling
    file and then moved over ``_CSV_FILE``, so a failed or partial write does
    not clobber the existing mirror.

    Returns:
        ``(count, "")`` on success, where ``count`` is the number of indexed
        exploits after the reload.  ``(0, error)`` on any failure to create
        the data directory, fetch, or store the file; ``error`` is a
        non-empty message of at most 100 characters.
    """
    import asyncio
    import urllib.request

    global _index, _loaded_at

    def _fetch_and_store() -> None:
        _DATA_DIR.mkdir(parents=True, exist_ok=True)
        req = urllib.request.Request(_CSV_URL, headers={"User-Agent": "derp-bot"})
        with _urlopen(req, timeout=60) as resp:
            data = resp.read()
        tmp = _CSV_FILE.with_name(_CSV_FILE.name + ".tmp")
        try:
            tmp.write_bytes(data)
            tmp.replace(_CSV_FILE)
        except OSError:
            # Do not leave a half-written temp file behind.
            tmp.unlink(missing_ok=True)
            raise

    loop = asyncio.get_running_loop()
    try:
        await loop.run_in_executor(None, _fetch_and_store)
    except Exception as exc:
        # Some exceptions (e.g. a bare TimeoutError) stringify to "".  The
        # caller treats an empty error as success, so fall back to the
        # exception class name to keep the failure visible.
        return 0, str(exc)[:100] or type(exc).__name__

    # Force reload
    _index = []
    _loaded_at = 0
    _refresh_if_stale()
    return len(_index), ""
|
|
|
|
|
|
async def _reply_matches(bot, message, matches: list[dict]) -> None:
    """Send the first ``_MAX_RESULTS`` matches, plus a total if truncated."""
    for entry in matches[:_MAX_RESULTS]:
        await bot.reply(message, _format_entry(entry))
    if len(matches) > _MAX_RESULTS:
        await bot.reply(message, f"({len(matches)} total, showing {_MAX_RESULTS})")


@command("exploitdb", help="Exploit-DB: !exploitdb <search|id|update|stats>")
async def cmd_exploitdb(bot, message):
    """Search the local exploit-db CSV mirror.

    Usage:
        !exploitdb search <term>     Search by keyword
        !exploitdb <edb-id>          Lookup by EDB ID
        !exploitdb cve <CVE-ID>      Search by CVE identifier
        !exploitdb update            Download latest CSV
        !exploitdb stats             Show index statistics

    Subcommand names are matched case-insensitively.  Anything that is not a
    known subcommand or a numeric ID is treated as a keyword search over the
    whole argument string.
    """
    parts = message.text.split(None, 2)
    if len(parts) < 2:
        await bot.reply(message, "Usage: !exploitdb <search <term>|<id>|cve <id>|update|stats>")
        return

    sub = parts[1].strip()
    # Normalise once so every subcommand is matched the same way.
    action = sub.lower()
    rest = parts[2].strip() if len(parts) > 2 else ""

    if action == "update":
        await bot.reply(message, "Downloading exploit-db CSV...")
        count, err = await _download_csv()
        if err:
            await bot.reply(message, f"Failed: {err}")
        else:
            await bot.reply(message, f"Loaded {count} exploits")
        return

    # Usage errors are reported before touching the index.
    if action == "search" and not rest:
        await bot.reply(message, "Usage: !exploitdb search <term>")
        return
    if action == "cve" and not rest:
        await bot.reply(message, "Usage: !exploitdb cve <CVE-ID>")
        return

    # Every remaining path needs a loaded index.
    _refresh_if_stale()
    if not _index:
        await bot.reply(message, "No data loaded (run !exploitdb update)")
        return

    if action == "stats":
        types: dict[str, int] = {}
        for e in _index:
            t = e["type"] or "unknown"
            types[t] = types.get(t, 0) + 1
        breakdown = ", ".join(f"{v} {k}" for k, v in sorted(types.items()))
        await bot.reply(message, f"Exploit-DB: {len(_index)} exploits ({breakdown})")
        return

    if action == "search":
        needle = rest.lower()
        matches = [e for e in _index if needle in e["description"].lower()]
        if not matches:
            await bot.reply(message, f"No exploits matching '{rest}'")
            return
        await _reply_matches(bot, message, matches)
        return

    if action == "cve":
        cve_id = rest.upper()
        matches = [e for e in _index if cve_id in e["codes"].upper()]
        if not matches:
            await bot.reply(message, f"No exploits for {cve_id}")
            return
        await _reply_matches(bot, message, matches)
        return

    # Direct ID lookup
    if sub.isdigit():
        for entry in _index:
            if entry["id"] == sub:
                await bot.reply(message, _format_entry(entry))
                if entry["codes"]:
                    await bot.reply(message, f"  Refs: {entry['codes']}")
                return
        await bot.reply(message, f"EDB-{sub}: not found")
        return

    # Fallback: treat the whole argument string as a search term
    term = f"{sub} {rest}".strip().lower()
    matches = [e for e in _index if term in e["description"].lower()]
    if not matches:
        await bot.reply(message, f"No exploits matching '{term}'")
        return
    await _reply_matches(bot, message, matches)
|