Files
derp/plugins/mac.py
user eb37fef730 feat: add jwt, mac, abuseipdb, virustotal, and emailcheck plugins
v2.0.0 sprint 1 -- five standalone plugins requiring no core changes:

- jwt: decode JWT header/payload, flag alg=none/expired/nbf issues
- mac: IEEE OUI vendor lookup, random MAC generation, OUI download
- abuseipdb: IP reputation check + abuse reporting (admin) via API
- virustotal: hash/IP/domain/URL lookup via VT APIv3, 4/min rate limit
- emailcheck: SMTP RCPT TO verification via MX + SOCKS proxy (admin)

Also adds update_oui() to update-data.sh and documents all five
plugins in USAGE.md and CHEATSHEET.md.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 21:04:43 +01:00

150 lines
4.6 KiB
Python

"""Plugin: MAC address OUI vendor lookup using IEEE database."""
from __future__ import annotations
import asyncio
import logging
import os
import re
import urllib.request
from pathlib import Path
from derp.http import urlopen as _urlopen
from derp.plugin import command
log = logging.getLogger(__name__)
_OUI_PATH = Path("data/oui.txt")
_OUI_URL = "https://standards-oui.ieee.org/oui/oui.txt"
# Module-level lazy-loaded OUI dict: prefix -> vendor name
_oui_db: dict[str, str] | None = None
# Regex: lines like "AA-BB-CC (hex)\t\tVendor Name"
_OUI_RE = re.compile(r"^([0-9A-F]{2}-[0-9A-F]{2}-[0-9A-F]{2})\s+\(hex\)\s+(.+)$")
def _parse_oui(path: Path) -> dict[str, str]:
"""Parse IEEE oui.txt into {prefix: vendor} dict."""
db: dict[str, str] = {}
try:
for line in path.read_text(encoding="utf-8", errors="replace").splitlines():
m = _OUI_RE.match(line.strip())
if m:
prefix = m.group(1).replace("-", ":").upper()
db[prefix] = m.group(2).strip()
except OSError as exc:
log.error("mac: failed to read %s: %s", path, exc)
return db
def _get_oui_db() -> dict[str, str]:
"""Lazy-load OUI database."""
global _oui_db
if _oui_db is not None:
return _oui_db
if not _OUI_PATH.is_file():
log.warning("mac: OUI database not found at %s", _OUI_PATH)
return {}
_oui_db = _parse_oui(_OUI_PATH)
log.info("mac: loaded %d OUI entries from %s", len(_oui_db), _OUI_PATH)
return _oui_db
def _normalize_mac(raw: str) -> tuple[str, str]:
"""Normalize MAC address input.
Returns (formatted_mac, oui_prefix) or raises ValueError.
"""
# Strip common separators
cleaned = re.sub(r"[:\-.]", "", raw.strip().upper())
if len(cleaned) != 12 or not re.fullmatch(r"[0-9A-F]{12}", cleaned):
raise ValueError(f"invalid MAC address: {raw}")
# Format as AA:BB:CC:DD:EE:FF
formatted = ":".join(cleaned[i:i + 2] for i in range(0, 12, 2))
oui_prefix = ":".join(cleaned[i:i + 2] for i in range(0, 6, 2))
return formatted, oui_prefix
def _random_mac() -> str:
"""Generate a random locally-administered unicast MAC address."""
octets = list(os.urandom(6))
# Set locally administered bit (bit 1 of first octet)
octets[0] |= 0x02
# Clear multicast bit (bit 0 of first octet)
octets[0] &= 0xFE
return ":".join(f"{b:02X}" for b in octets)
async def _download_oui() -> tuple[bool, int]:
"""Download IEEE OUI database. Returns (success, entry_count)."""
global _oui_db
loop = asyncio.get_running_loop()
def _fetch():
_OUI_PATH.parent.mkdir(parents=True, exist_ok=True)
req = urllib.request.Request(_OUI_URL, headers={"User-Agent": "derp-bot"})
with _urlopen(req, timeout=60) as resp:
return resp.read()
try:
data = await loop.run_in_executor(None, _fetch)
_OUI_PATH.write_bytes(data)
except Exception as exc:
log.error("mac: failed to download OUI database: %s", exc)
return False, 0
# Force reload
_oui_db = _parse_oui(_OUI_PATH)
return True, len(_oui_db)
@command("mac", help="MAC lookup: !mac <address|random|update>")
async def cmd_mac(bot, message):
"""Look up MAC address vendor, generate random MAC, or update OUI database.
Usage:
!mac AA:BB:CC:DD:EE:FF Vendor lookup
!mac random Generate random MAC
!mac update Download OUI database
"""
parts = message.text.split(None, 2)
if len(parts) < 2:
await bot.reply(message, "Usage: !mac <address|random|update>")
return
arg = parts[1].strip()
if arg.lower() == "update":
await bot.reply(message, "Downloading IEEE OUI database...")
ok, count = await _download_oui()
if ok:
await bot.reply(message, f"OUI database updated: {count} vendors")
else:
await bot.reply(message, "Failed to download OUI database")
return
if arg.lower() == "random":
mac = _random_mac()
await bot.reply(message, f"Random MAC: {mac} (locally administered)")
return
# Vendor lookup
try:
formatted, oui_prefix = _normalize_mac(arg)
except ValueError as exc:
await bot.reply(message, str(exc))
return
db = _get_oui_db()
if not db:
await bot.reply(message, "OUI database not loaded (run !mac update)")
return
vendor = db.get(oui_prefix)
if vendor:
await bot.reply(message, f"{formatted} -- {vendor} (OUI: {oui_prefix})")
else:
await bot.reply(message, f"{formatted} -- unknown vendor (OUI: {oui_prefix})")