feat: Add OSINT features (v0.1.2)
- MAC vendor lookup (IEEE OUI database) - BLE company_id to manufacturer mapping - Device profile enrichment in API responses - Export endpoints: devices.csv, devices.json, alerts.csv, probes.csv - Auto-populate vendor on device creation - CLI command: flask download-oui - Makefile target: make oui 13 tests passing
This commit is contained in:
87
src/esp32_web/utils/oui.py
Normal file
87
src/esp32_web/utils/oui.py
Normal file
@@ -0,0 +1,87 @@
|
||||
"""MAC OUI vendor lookup."""
|
||||
import csv
|
||||
import logging
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
# OUI database path
|
||||
OUI_DB_PATH = Path(os.environ.get('OUI_DB_PATH', '/var/lib/esp32-web/oui.csv'))
|
||||
OUI_DB_URL = 'https://standards-oui.ieee.org/oui/oui.csv'
|
||||
|
||||
# In-memory cache
|
||||
_oui_cache: dict[str, str] = {}
|
||||
_loaded = False
|
||||
|
||||
|
||||
def _normalize_mac(mac: str) -> str:
|
||||
"""Extract OUI prefix (first 6 hex chars) from MAC."""
|
||||
clean = mac.upper().replace(':', '').replace('-', '').replace('.', '')
|
||||
return clean[:6] if len(clean) >= 6 else ''
|
||||
|
||||
|
||||
def load_oui_db(path: Path | None = None) -> int:
|
||||
"""Load OUI database from CSV file."""
|
||||
global _oui_cache, _loaded
|
||||
|
||||
if path is None:
|
||||
path = OUI_DB_PATH
|
||||
|
||||
if not path.exists():
|
||||
log.warning('OUI database not found: %s', path)
|
||||
return 0
|
||||
|
||||
_oui_cache.clear()
|
||||
count = 0
|
||||
|
||||
try:
|
||||
with open(path, 'r', encoding='utf-8') as f:
|
||||
reader = csv.DictReader(f)
|
||||
for row in reader:
|
||||
# IEEE CSV format: Registry,Assignment,Organization Name,...
|
||||
assignment = row.get('Assignment', '').strip().upper()
|
||||
org = row.get('Organization Name', '').strip()
|
||||
if assignment and org:
|
||||
_oui_cache[assignment] = org
|
||||
count += 1
|
||||
except Exception as e:
|
||||
log.exception('Error loading OUI database: %s', e)
|
||||
return 0
|
||||
|
||||
_loaded = True
|
||||
log.info('Loaded %d OUI entries from %s', count, path)
|
||||
return count
|
||||
|
||||
|
||||
def lookup_vendor(mac: str) -> str | None:
|
||||
"""Lookup vendor by MAC address."""
|
||||
global _loaded
|
||||
|
||||
if not _loaded:
|
||||
load_oui_db()
|
||||
|
||||
oui = _normalize_mac(mac)
|
||||
if not oui:
|
||||
return None
|
||||
|
||||
return _oui_cache.get(oui)
|
||||
|
||||
|
||||
def download_oui_db(path: Path | None = None) -> bool:
|
||||
"""Download OUI database from IEEE."""
|
||||
import urllib.request
|
||||
|
||||
if path is None:
|
||||
path = OUI_DB_PATH
|
||||
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
try:
|
||||
log.info('Downloading OUI database from %s', OUI_DB_URL)
|
||||
urllib.request.urlretrieve(OUI_DB_URL, path)
|
||||
log.info('OUI database saved to %s', path)
|
||||
return True
|
||||
except Exception as e:
|
||||
log.exception('Error downloading OUI database: %s', e)
|
||||
return False
|
||||
Reference in New Issue
Block a user