- MAC vendor lookup (IEEE OUI database)
- BLE company_id to manufacturer mapping
- Device profile enrichment in API responses
- Export endpoints: devices.csv, devices.json, alerts.csv, probes.csv
- Auto-populate vendor on device creation
- CLI command: flask download-oui
- Makefile target: make oui

13 tests passing
88 lines
2.3 KiB
Python
88 lines
2.3 KiB
Python
"""MAC OUI vendor lookup."""
|
|
import csv
|
|
import logging
|
|
import os
|
|
from pathlib import Path
|
|
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# OUI database path. The OUI_DB_PATH environment variable overrides the
# built-in default location.
OUI_DB_PATH = Path(os.environ.get('OUI_DB_PATH', '/var/lib/esp32-web/oui.csv'))
# URL the database is fetched from by download_oui_db().
OUI_DB_URL = 'https://standards-oui.ieee.org/oui/oui.csv'

# In-memory cache, filled by load_oui_db(): upper-cased value of the CSV
# "Assignment" column -> value of the "Organization Name" column.
_oui_cache: dict[str, str] = {}
# Set to True by load_oui_db() once a load has completed without error.
_loaded = False
|
|
|
|
|
|
def _normalize_mac(mac: str) -> str:
|
|
"""Extract OUI prefix (first 6 hex chars) from MAC."""
|
|
clean = mac.upper().replace(':', '').replace('-', '').replace('.', '')
|
|
return clean[:6] if len(clean) >= 6 else ''
|
|
|
|
|
|
def load_oui_db(path: Path | None = None) -> int:
    """Load the OUI database from a CSV file into the in-memory cache.

    The file is read as UTF-8 CSV with a header row; the ``Assignment`` and
    ``Organization Name`` columns are used. Rows missing either value are
    skipped.

    The cache is only replaced after the whole file has been parsed, so a
    failed load leaves previously loaded entries untouched.

    Args:
        path: CSV file to read. Defaults to ``OUI_DB_PATH``.

    Returns:
        Number of rows loaded, or 0 when the file does not exist or could
        not be read (the problem is logged, never raised).
    """
    global _loaded

    if path is None:
        path = OUI_DB_PATH

    if not path.exists():
        log.warning('OUI database not found: %s', path)
        return 0

    # Parse into a scratch dict first; the live cache is swapped at the end.
    entries: dict[str, str] = {}
    count = 0

    try:
        # newline='' lets the csv module handle line endings itself, which
        # matters for quoted fields that contain embedded newlines.
        with open(path, 'r', encoding='utf-8', newline='') as f:
            # IEEE CSV format: Registry,Assignment,Organization Name,...
            for row in csv.DictReader(f):
                # DictReader fills columns missing from a short row with
                # None, so coerce to '' before stripping.
                assignment = (row.get('Assignment') or '').strip().upper()
                org = (row.get('Organization Name') or '').strip()
                if assignment and org:
                    entries[assignment] = org
                    count += 1
    except Exception as e:
        log.exception('Error loading OUI database: %s', e)
        return 0

    # Update in place so any other reference to the cache dict stays valid.
    _oui_cache.clear()
    _oui_cache.update(entries)
    _loaded = True
    log.info('Loaded %d OUI entries from %s', count, path)
    return count
|
|
|
|
|
|
def lookup_vendor(mac: str) -> str | None:
    """Look up the vendor registered for a MAC address's OUI.

    The OUI database is loaded on first use. While it has not been loaded
    successfully, each call with a usable MAC retries the load.

    Args:
        mac: MAC address in any format accepted by ``_normalize_mac``.

    Returns:
        The organization name, or ``None`` when *mac* is empty or ``None``,
        no OUI prefix can be extracted from it, or the prefix is not in the
        database.
    """
    # Reject missing input before touching the database at all.
    if not mac:
        return None

    oui = _normalize_mac(mac)
    if not oui:
        return None

    # Lazy load; only reads the module flag, so no ``global`` is needed.
    if not _loaded:
        load_oui_db()

    return _oui_cache.get(oui)
|
|
|
|
|
|
def download_oui_db(path: Path | None = None, *, timeout: float = 60.0) -> bool:
    """Download the OUI database from IEEE and store it at *path*.

    The response is streamed to a sibling ``<name>.part`` file and moved
    into place only after the transfer completes, so an interrupted or
    failed download never truncates an existing database.

    Args:
        path: Destination file. Defaults to ``OUI_DB_PATH``.
        timeout: Timeout in seconds passed to the HTTP request.

    Returns:
        ``True`` on success; ``False`` if the destination directory could
        not be created or the download failed (the error is logged).
    """
    import shutil
    import urllib.request

    if path is None:
        path = OUI_DB_PATH

    # Temporary file next to the destination so the final rename stays on
    # the same filesystem.
    part_path = path.with_name(path.name + '.part')

    try:
        path.parent.mkdir(parents=True, exist_ok=True)
        log.info('Downloading OUI database from %s', OUI_DB_URL)
        with urllib.request.urlopen(OUI_DB_URL, timeout=timeout) as response:
            expected = response.headers.get('Content-Length')
            with open(part_path, 'wb') as out:
                shutil.copyfileobj(response, out)
                written = out.tell()
        # Refuse a short body instead of installing a truncated database.
        if expected is not None and written < int(expected):
            raise OSError(
                f'incomplete download: got {written} of {expected} bytes'
            )
        os.replace(part_path, path)
        log.info('OUI database saved to %s', path)
        return True
    except Exception as e:
        log.exception('Error downloading OUI database: %s', e)
        # Best-effort cleanup of whatever was written so far.
        try:
            part_path.unlink()
        except OSError:
            pass
        return False
|