"""RF Environment Scanner - WiFi and Bluetooth device discovery""" import subprocess import re import json import asyncio import os from datetime import datetime from pathlib import Path from dataclasses import dataclass, asdict from bleak import BleakScanner def is_termux() -> bool: """Detect if running in Termux (Android).""" return os.environ.get('TERMUX_VERSION') is not None or \ os.path.exists('/data/data/com.termux') from .oui import OUILookup from .bluetooth_class import BluetoothClassDecoder from .distance import estimate_distance, rssi_to_quality, rssi_bar from .bluetooth_identify import ( identify_device, infer_device_type_from_name, infer_device_type_from_manufacturer, is_random_mac, ) @dataclass class WifiNetwork: """Represents a discovered WiFi network""" ssid: str bssid: str rssi: int # dBm channel: int frequency: int # MHz encryption: str manufacturer: str = "" floor: int | None = None # Floor number (0=ground) height_m: float | None = None # Height in meters @property def estimated_distance(self) -> float: """Estimate distance in meters""" return estimate_distance(self.rssi) @property def signal_quality(self) -> str: """Human-readable signal quality""" return rssi_to_quality(self.rssi) @dataclass class BluetoothDevice: """Represents a discovered Bluetooth device""" address: str name: str rssi: int # dBm device_class: str device_type: str manufacturer: str = "" floor: int | None = None # Floor number (0=ground) height_m: float | None = None # Height in meters @property def estimated_distance(self) -> float: """Estimate distance in meters (BT has lower TX power)""" return estimate_distance(self.rssi, tx_power=-65) @property def signal_quality(self) -> str: """Human-readable signal quality""" return rssi_to_quality(self.rssi) @dataclass class ScanResult: """Container for scan results""" timestamp: str location_label: str wifi_networks: list bluetooth_devices: list class RFScanner: """Main RF scanning class for WiFi and Bluetooth""" def __init__(self, data_dir: Path | None = None): self.oui_lookup = OUILookup() self.bt_decoder = BluetoothClassDecoder() self.scan_history: list[ScanResult] = [] self.data_dir = data_dir or Path.home() / "git" / "rf-mapper" / "data" self.data_dir.mkdir(parents=True, exist_ok=True) def scan_wifi(self, interface: str = "wlan0") -> list[WifiNetwork]: """ Scan for WiFi networks using iw. Args: interface: WiFi interface name (default: wlan0) Returns: List of discovered WiFi networks """ networks = [] try: result = subprocess.run( ['sudo', 'iw', 'dev', interface, 'scan'], capture_output=True, text=True, timeout=30 ) if result.returncode != 0: print(f"WiFi scan error: {result.stderr}") return networks current_network: dict = {} for line in result.stdout.split('\n'): line = line.strip() if line.startswith('BSS '): if current_network and 'bssid' in current_network: networks.append(self._create_wifi_network(current_network)) match = re.match(r'BSS ([0-9a-fA-F:]+)', line) if match: current_network = {'bssid': match.group(1)} elif line.startswith('signal:'): match = re.search(r'(-?\d+\.?\d*)\s*dBm', line) if match: current_network['rssi'] = int(float(match.group(1))) elif line.startswith('freq:'): match = re.search(r'(\d+)', line) if match: current_network['frequency'] = int(match.group(1)) elif line.startswith('SSID:'): ssid = line.replace('SSID:', '').strip() current_network['ssid'] = ssid if ssid else '' elif line.startswith('DS Parameter set: channel'): match = re.search(r'channel\s+(\d+)', line) if match: current_network['channel'] = int(match.group(1)) elif 'RSN:' in line or 'WPA:' in line: current_network['encryption'] = 'WPA/WPA2' elif 'WEP' in line: current_network['encryption'] = 'WEP' elif 'Privacy' in line and 'encryption' not in current_network: current_network['encryption'] = 'Encrypted' if current_network and 'bssid' in current_network: networks.append(self._create_wifi_network(current_network)) except subprocess.TimeoutExpired: print("WiFi scan timed out") except Exception as e: print(f"WiFi scan error: {e}") return networks def _create_wifi_network(self, data: dict) -> WifiNetwork: """Create WifiNetwork from parsed data""" bssid = data.get('bssid', '') return WifiNetwork( ssid=data.get('ssid', ''), bssid=bssid, rssi=data.get('rssi', -100), channel=data.get('channel', 0), frequency=data.get('frequency', 0), encryption=data.get('encryption', 'Open'), manufacturer=self.oui_lookup.lookup(bssid) ) def scan_bluetooth(self, timeout: int = 10, auto_identify: bool = True) -> list[BluetoothDevice]: """ Scan for Bluetooth devices (Classic and BLE). Args: timeout: Scan duration in seconds auto_identify: Automatically identify unknown devices Returns: List of discovered Bluetooth devices """ devices = [] # Classic Bluetooth scan (not supported on Termux/Android) if is_termux(): print("Skipping Classic BT scan (not supported on Termux)") else: try: print(f"Scanning Classic Bluetooth ({timeout} seconds)...") result = subprocess.run( ['sudo', 'hcitool', 'inq', '--flush'], capture_output=True, text=True, timeout=timeout + 10 ) for line in result.stdout.split('\n'): match = re.match( r'\s*([0-9A-Fa-f:]+)\s+clock offset:\s*\S+\s+class:\s*(\S+)', line ) if match: addr = match.group(1) device_class = match.group(2) name = self._get_bt_name(addr) rssi = self._get_bt_rssi(addr) dev_type, dev_subtype = self.bt_decoder.decode(device_class) devices.append(BluetoothDevice( address=addr, name=name, rssi=rssi, device_class=device_class, device_type=f"{dev_type}" + (f" ({dev_subtype})" if dev_subtype else ""), manufacturer=self.oui_lookup.lookup(addr) )) except Exception as e: print(f"Classic BT scan error: {e}") # BLE scan using bleak (not supported on Termux/Android) if is_termux(): print("Skipping BLE scan (not supported on Termux)") else: try: print(f"Scanning BLE devices ({timeout} seconds)...") async def _ble_scan(): return await BleakScanner.discover( timeout=timeout, return_adv=True ) ble_results = asyncio.run(_ble_scan()) seen_addrs = {d.address for d in devices} for device, adv_data in ble_results.values(): addr = device.address if addr not in seen_addrs and addr != 'LE': seen_addrs.add(addr) name = device.name or adv_data.local_name or '' rssi = adv_data.rssi # Real RSSI from advertisement manufacturer = self.oui_lookup.lookup(addr) # Extract manufacturer data if available if adv_data.manufacturer_data and not manufacturer: # First 2 bytes are company ID for company_id in adv_data.manufacturer_data.keys(): manufacturer = f"Company ID: {company_id}" break # Infer device type inferred_type = infer_device_type_from_name(name) if not inferred_type: inferred_type = infer_device_type_from_manufacturer(manufacturer) if not inferred_type: if is_random_mac(addr): device_type = "BLE Device (Random MAC)" else: device_type = "Low Energy Device" else: device_type = inferred_type devices.append(BluetoothDevice( address=addr, name=name, rssi=rssi, # Real RSSI instead of -70 device_class="BLE", device_type=device_type, manufacturer=manufacturer )) except Exception as e: print(f"BLE scan error: {e}") # Auto-identify unknown devices if auto_identify and devices: devices = self._auto_identify_devices(devices) return devices def _auto_identify_devices(self, devices: list[BluetoothDevice]) -> list[BluetoothDevice]: """Automatically identify devices with unknown names or types""" identified = [] # Types that are considered "unidentified" and need deeper lookup generic_types = {'Low Energy Device', 'Unknown', '', 'BLE Device (Random MAC)'} for dev in devices: # Skip if already well-identified (has good name and specific type) is_name_known = dev.name and dev.name not in ('', '(unknown)', 'Unknown', '') is_type_known = dev.device_type and dev.device_type not in generic_types if is_name_known and is_type_known: identified.append(dev) continue # Try to identify via bluetoothctl try: print(f" Identifying {dev.address}...") info = identify_device(dev.address, is_ble=(dev.device_class == "BLE")) # Update device with identified info new_name = info.name or info.alias or dev.name new_type = dev.device_type # Keep existing type as fallback # Use bluetoothctl-discovered type if better than what we have if info.device_type and info.device_type != "Unknown": new_type = info.device_type elif dev.device_type in generic_types: # Try name-based inference with potentially better name inferred = infer_device_type_from_name(new_name) if not inferred: # Try manufacturer-based inference inferred = infer_device_type_from_manufacturer(dev.manufacturer) if inferred: new_type = inferred elif is_random_mac(dev.address): new_type = "BLE Device (Random MAC)" # Build services string if available services_str = "" if info.services: services_str = f" [{', '.join(info.services[:3])}]" identified.append(BluetoothDevice( address=dev.address, name=new_name, rssi=dev.rssi, device_class=dev.device_class, device_type=new_type + services_str if services_str else new_type, manufacturer=dev.manufacturer )) except Exception as e: # Keep original device if identification fails identified.append(dev) return identified def _get_bt_name(self, address: str) -> str: """Get Bluetooth device name""" try: result = subprocess.run( ['sudo', 'hcitool', 'name', address], capture_output=True, text=True, timeout=10 ) name = result.stdout.strip() return name if name else '' except: return '' def _get_bt_rssi(self, address: str) -> int: """Get Bluetooth RSSI for a device""" try: result = subprocess.run( ['sudo', 'hcitool', 'rssi', address], capture_output=True, text=True, timeout=5 ) match = re.search(r'RSSI return value:\s*(-?\d+)', result.stdout) if match: return int(match.group(1)) except: pass return -80 def full_scan( self, location_label: str = "default", auto_identify_bt: bool = True ) -> tuple[ScanResult, list[WifiNetwork], list[BluetoothDevice]]: """ Perform a full WiFi and Bluetooth scan. Args: location_label: Label for this scan location auto_identify_bt: Automatically identify Bluetooth devices Returns: Tuple of (ScanResult, wifi_networks, bluetooth_devices) """ print(f"\n{'='*60}") print(f"Starting RF Environment Scan at {datetime.now().isoformat()}") print(f"Location: {location_label}") print('='*60) print("\n[1/2] Scanning WiFi networks...") wifi_networks = self.scan_wifi() print(f" Found {len(wifi_networks)} WiFi networks") print("\n[2/2] Scanning Bluetooth devices...") bt_devices = self.scan_bluetooth(auto_identify=auto_identify_bt) print(f" Found {len(bt_devices)} Bluetooth devices") result = ScanResult( timestamp=datetime.now().isoformat(), location_label=location_label, wifi_networks=[asdict(n) for n in wifi_networks], bluetooth_devices=[asdict(d) for d in bt_devices] ) self.scan_history.append(result) # Save to file filename = self.data_dir / f"scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{location_label}.json" with open(filename, 'w') as f: json.dump(asdict(result), f, indent=2) print(f"\nScan saved to: {filename}") return result, wifi_networks, bt_devices def print_results(self, wifi_networks: list[WifiNetwork], bt_devices: list[BluetoothDevice]): """Pretty print scan results""" print(f"\n{'='*80}") print("WiFi NETWORKS") print('='*80) wifi_sorted = sorted(wifi_networks, key=lambda x: x.rssi, reverse=True) print(f"{'SSID':<25} {'BSSID':<18} {'RSSI':>6} {'Ch':>4} {'Manufacturer':<25}") print('-'*80) for net in wifi_sorted: bar = rssi_bar(net.rssi) print(f"{net.ssid[:24]:<25} {net.bssid:<18} {net.rssi:>4}dB {net.channel:>4} {net.manufacturer[:24]:<25}") print(f" Signal: {bar} ({net.signal_quality})") print(f"\n{'='*80}") print("BLUETOOTH DEVICES") print('='*80) bt_sorted = sorted(bt_devices, key=lambda x: x.rssi, reverse=True) print(f"{'Name':<20} {'Address':<18} {'RSSI':>6} {'Type':<20} {'Manufacturer':<20}") print('-'*80) for dev in bt_sorted: print(f"{dev.name[:19]:<20} {dev.address:<18} {dev.rssi:>4}dB {dev.device_type[:19]:<20} {dev.manufacturer[:19]:<20}")