Initial commit: RF Mapper v0.3.0-dev

WiFi & Bluetooth signal mapping tool for Raspberry Pi with:
- WiFi scanning via iw command
- Bluetooth Classic/BLE device discovery
- RSSI-based distance estimation
- OUI manufacturer lookup
- Web dashboard with multiple views:
  - Radar view (polar plot)
  - 2D Map (Leaflet/OpenStreetMap)
  - 3D Map (MapLibre GL JS with building extrusion)
- Floor-based device positioning
- Live BT tracking mode (auto-starts on page load)
- SQLite database for historical device tracking:
  - RSSI time-series history
  - Device statistics (avg/min/max)
  - Movement detection and velocity estimation
  - Activity patterns (hourly/daily)
  - New device alerts
  - Automatic data retention/cleanup
- REST API for all functionality

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
User
2026-02-01 00:08:21 +01:00
commit 52df6421be
33 changed files with 8939 additions and 0 deletions

428
src/rf_mapper/scanner.py Normal file
View File

@@ -0,0 +1,428 @@
"""RF Environment Scanner - WiFi and Bluetooth device discovery"""
import subprocess
import re
import json
from datetime import datetime
from pathlib import Path
from dataclasses import dataclass, asdict
from .oui import OUILookup
from .bluetooth_class import BluetoothClassDecoder
from .distance import estimate_distance, rssi_to_quality, rssi_bar
from .bluetooth_identify import (
identify_device,
infer_device_type_from_name,
infer_device_type_from_manufacturer,
is_random_mac,
)
@dataclass
class WifiNetwork:
"""Represents a discovered WiFi network"""
ssid: str
bssid: str
rssi: int # dBm
channel: int
frequency: int # MHz
encryption: str
manufacturer: str = ""
floor: int | None = None # Floor number (0=ground)
height_m: float | None = None # Height in meters
@property
def estimated_distance(self) -> float:
"""Estimate distance in meters"""
return estimate_distance(self.rssi)
@property
def signal_quality(self) -> str:
"""Human-readable signal quality"""
return rssi_to_quality(self.rssi)
@dataclass
class BluetoothDevice:
"""Represents a discovered Bluetooth device"""
address: str
name: str
rssi: int # dBm
device_class: str
device_type: str
manufacturer: str = ""
floor: int | None = None # Floor number (0=ground)
height_m: float | None = None # Height in meters
@property
def estimated_distance(self) -> float:
"""Estimate distance in meters (BT has lower TX power)"""
return estimate_distance(self.rssi, tx_power=-65)
@property
def signal_quality(self) -> str:
"""Human-readable signal quality"""
return rssi_to_quality(self.rssi)
@dataclass
class ScanResult:
"""Container for scan results"""
timestamp: str
location_label: str
wifi_networks: list
bluetooth_devices: list
class RFScanner:
"""Main RF scanning class for WiFi and Bluetooth"""
def __init__(self, data_dir: Path | None = None):
self.oui_lookup = OUILookup()
self.bt_decoder = BluetoothClassDecoder()
self.scan_history: list[ScanResult] = []
self.data_dir = data_dir or Path.home() / "git" / "rf-mapper" / "data"
self.data_dir.mkdir(parents=True, exist_ok=True)
def scan_wifi(self, interface: str = "wlan0") -> list[WifiNetwork]:
"""
Scan for WiFi networks using iw.
Args:
interface: WiFi interface name (default: wlan0)
Returns:
List of discovered WiFi networks
"""
networks = []
try:
result = subprocess.run(
['sudo', 'iw', 'dev', interface, 'scan'],
capture_output=True,
text=True,
timeout=30
)
if result.returncode != 0:
print(f"WiFi scan error: {result.stderr}")
return networks
current_network: dict = {}
for line in result.stdout.split('\n'):
line = line.strip()
if line.startswith('BSS '):
if current_network and 'bssid' in current_network:
networks.append(self._create_wifi_network(current_network))
match = re.match(r'BSS ([0-9a-fA-F:]+)', line)
if match:
current_network = {'bssid': match.group(1)}
elif line.startswith('signal:'):
match = re.search(r'(-?\d+\.?\d*)\s*dBm', line)
if match:
current_network['rssi'] = int(float(match.group(1)))
elif line.startswith('freq:'):
match = re.search(r'(\d+)', line)
if match:
current_network['frequency'] = int(match.group(1))
elif line.startswith('SSID:'):
ssid = line.replace('SSID:', '').strip()
current_network['ssid'] = ssid if ssid else '<hidden>'
elif line.startswith('DS Parameter set: channel'):
match = re.search(r'channel\s+(\d+)', line)
if match:
current_network['channel'] = int(match.group(1))
elif 'RSN:' in line or 'WPA:' in line:
current_network['encryption'] = 'WPA/WPA2'
elif 'WEP' in line:
current_network['encryption'] = 'WEP'
elif 'Privacy' in line and 'encryption' not in current_network:
current_network['encryption'] = 'Encrypted'
if current_network and 'bssid' in current_network:
networks.append(self._create_wifi_network(current_network))
except subprocess.TimeoutExpired:
print("WiFi scan timed out")
except Exception as e:
print(f"WiFi scan error: {e}")
return networks
def _create_wifi_network(self, data: dict) -> WifiNetwork:
"""Create WifiNetwork from parsed data"""
bssid = data.get('bssid', '')
return WifiNetwork(
ssid=data.get('ssid', '<unknown>'),
bssid=bssid,
rssi=data.get('rssi', -100),
channel=data.get('channel', 0),
frequency=data.get('frequency', 0),
encryption=data.get('encryption', 'Open'),
manufacturer=self.oui_lookup.lookup(bssid)
)
def scan_bluetooth(self, timeout: int = 10, auto_identify: bool = True) -> list[BluetoothDevice]:
"""
Scan for Bluetooth devices (Classic and BLE).
Args:
timeout: Scan duration in seconds
auto_identify: Automatically identify unknown devices
Returns:
List of discovered Bluetooth devices
"""
devices = []
# Classic Bluetooth scan
try:
print(f"Scanning Classic Bluetooth ({timeout} seconds)...")
result = subprocess.run(
['sudo', 'hcitool', 'inq', '--flush'],
capture_output=True,
text=True,
timeout=timeout + 10
)
for line in result.stdout.split('\n'):
match = re.match(
r'\s*([0-9A-Fa-f:]+)\s+clock offset:\s*\S+\s+class:\s*(\S+)',
line
)
if match:
addr = match.group(1)
device_class = match.group(2)
name = self._get_bt_name(addr)
rssi = self._get_bt_rssi(addr)
dev_type, dev_subtype = self.bt_decoder.decode(device_class)
devices.append(BluetoothDevice(
address=addr,
name=name,
rssi=rssi,
device_class=device_class,
device_type=f"{dev_type}" + (f" ({dev_subtype})" if dev_subtype else ""),
manufacturer=self.oui_lookup.lookup(addr)
))
except Exception as e:
print(f"Classic BT scan error: {e}")
# BLE scan
try:
print(f"Scanning BLE devices ({timeout} seconds)...")
result = subprocess.run(
['sudo', 'timeout', str(timeout), 'hcitool', 'lescan', '--duplicates'],
capture_output=True,
text=True,
timeout=timeout + 5
)
seen_addrs = {d.address for d in devices}
for line in result.stdout.split('\n'):
match = re.match(r'([0-9A-Fa-f:]+)\s*(.*)', line)
if match:
addr = match.group(1)
name = match.group(2).strip() or '<unknown>'
if addr not in seen_addrs and addr != 'LE':
seen_addrs.add(addr)
manufacturer = self.oui_lookup.lookup(addr)
# Try to infer device type from name first, then manufacturer
inferred_type = infer_device_type_from_name(name)
if not inferred_type:
inferred_type = infer_device_type_from_manufacturer(manufacturer)
# Mark randomized MAC devices if still unknown
if not inferred_type:
if is_random_mac(addr):
device_type = "BLE Device (Random MAC)"
else:
device_type = "Low Energy Device"
else:
device_type = inferred_type
devices.append(BluetoothDevice(
address=addr,
name=name,
rssi=-70, # Default estimate for BLE
device_class="BLE",
device_type=device_type,
manufacturer=manufacturer
))
except Exception as e:
print(f"BLE scan error: {e}")
# Auto-identify unknown devices
if auto_identify and devices:
devices = self._auto_identify_devices(devices)
return devices
def _auto_identify_devices(self, devices: list[BluetoothDevice]) -> list[BluetoothDevice]:
"""Automatically identify devices with unknown names or types"""
identified = []
# Types that are considered "unidentified" and need deeper lookup
generic_types = {'Low Energy Device', 'Unknown', '', 'BLE Device (Random MAC)'}
for dev in devices:
# Skip if already well-identified (has good name and specific type)
is_name_known = dev.name and dev.name not in ('<unknown>', '(unknown)', 'Unknown', '')
is_type_known = dev.device_type and dev.device_type not in generic_types
if is_name_known and is_type_known:
identified.append(dev)
continue
# Try to identify via bluetoothctl
try:
print(f" Identifying {dev.address}...")
info = identify_device(dev.address, is_ble=(dev.device_class == "BLE"))
# Update device with identified info
new_name = info.name or info.alias or dev.name
new_type = dev.device_type # Keep existing type as fallback
# Use bluetoothctl-discovered type if better than what we have
if info.device_type and info.device_type != "Unknown":
new_type = info.device_type
elif dev.device_type in generic_types:
# Try name-based inference with potentially better name
inferred = infer_device_type_from_name(new_name)
if not inferred:
# Try manufacturer-based inference
inferred = infer_device_type_from_manufacturer(dev.manufacturer)
if inferred:
new_type = inferred
elif is_random_mac(dev.address):
new_type = "BLE Device (Random MAC)"
# Build services string if available
services_str = ""
if info.services:
services_str = f" [{', '.join(info.services[:3])}]"
identified.append(BluetoothDevice(
address=dev.address,
name=new_name,
rssi=dev.rssi,
device_class=dev.device_class,
device_type=new_type + services_str if services_str else new_type,
manufacturer=dev.manufacturer
))
except Exception as e:
# Keep original device if identification fails
identified.append(dev)
return identified
def _get_bt_name(self, address: str) -> str:
"""Get Bluetooth device name"""
try:
result = subprocess.run(
['sudo', 'hcitool', 'name', address],
capture_output=True,
text=True,
timeout=10
)
name = result.stdout.strip()
return name if name else '<unknown>'
except:
return '<unknown>'
def _get_bt_rssi(self, address: str) -> int:
"""Get Bluetooth RSSI for a device"""
try:
result = subprocess.run(
['sudo', 'hcitool', 'rssi', address],
capture_output=True,
text=True,
timeout=5
)
match = re.search(r'RSSI return value:\s*(-?\d+)', result.stdout)
if match:
return int(match.group(1))
except:
pass
return -80
def full_scan(
self,
location_label: str = "default",
auto_identify_bt: bool = True
) -> tuple[ScanResult, list[WifiNetwork], list[BluetoothDevice]]:
"""
Perform a full WiFi and Bluetooth scan.
Args:
location_label: Label for this scan location
auto_identify_bt: Automatically identify Bluetooth devices
Returns:
Tuple of (ScanResult, wifi_networks, bluetooth_devices)
"""
print(f"\n{'='*60}")
print(f"Starting RF Environment Scan at {datetime.now().isoformat()}")
print(f"Location: {location_label}")
print('='*60)
print("\n[1/2] Scanning WiFi networks...")
wifi_networks = self.scan_wifi()
print(f" Found {len(wifi_networks)} WiFi networks")
print("\n[2/2] Scanning Bluetooth devices...")
bt_devices = self.scan_bluetooth(auto_identify=auto_identify_bt)
print(f" Found {len(bt_devices)} Bluetooth devices")
result = ScanResult(
timestamp=datetime.now().isoformat(),
location_label=location_label,
wifi_networks=[asdict(n) for n in wifi_networks],
bluetooth_devices=[asdict(d) for d in bt_devices]
)
self.scan_history.append(result)
# Save to file
filename = self.data_dir / f"scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{location_label}.json"
with open(filename, 'w') as f:
json.dump(asdict(result), f, indent=2)
print(f"\nScan saved to: {filename}")
return result, wifi_networks, bt_devices
def print_results(self, wifi_networks: list[WifiNetwork], bt_devices: list[BluetoothDevice]):
"""Pretty print scan results"""
print(f"\n{'='*80}")
print("WiFi NETWORKS")
print('='*80)
wifi_sorted = sorted(wifi_networks, key=lambda x: x.rssi, reverse=True)
print(f"{'SSID':<25} {'BSSID':<18} {'RSSI':>6} {'Ch':>4} {'Manufacturer':<25}")
print('-'*80)
for net in wifi_sorted:
bar = rssi_bar(net.rssi)
print(f"{net.ssid[:24]:<25} {net.bssid:<18} {net.rssi:>4}dB {net.channel:>4} {net.manufacturer[:24]:<25}")
print(f" Signal: {bar} ({net.signal_quality})")
print(f"\n{'='*80}")
print("BLUETOOTH DEVICES")
print('='*80)
bt_sorted = sorted(bt_devices, key=lambda x: x.rssi, reverse=True)
print(f"{'Name':<20} {'Address':<18} {'RSSI':>6} {'Type':<20} {'Manufacturer':<20}")
print('-'*80)
for dev in bt_sorted:
print(f"{dev.name[:19]:<20} {dev.address:<18} {dev.rssi:>4}dB {dev.device_type[:19]:<20} {dev.manufacturer[:19]:<20}")