From b0e6d1107c8454b3afbb229b1f34eb6c72c66919 Mon Sep 17 00:00:00 2001 From: User Date: Sun, 1 Feb 2026 13:13:28 +0100 Subject: [PATCH] feat: replace hcitool BLE scanning with bleak library Use bleak's BleakScanner for BLE device discovery instead of hcitool lescan subprocess. Provides real RSSI values from advertisement packets instead of hardcoded -70dB estimate. Co-Authored-By: Claude Opus 4.5 --- src/rf_mapper/scanner.py | 78 ++++++++++++++++++++++------------------ 1 file changed, 44 insertions(+), 34 deletions(-) diff --git a/src/rf_mapper/scanner.py b/src/rf_mapper/scanner.py index ec5b995..4a17eb0 100644 --- a/src/rf_mapper/scanner.py +++ b/src/rf_mapper/scanner.py @@ -3,10 +3,13 @@ import subprocess import re import json +import asyncio from datetime import datetime from pathlib import Path from dataclasses import dataclass, asdict +from bleak import BleakScanner + from .oui import OUILookup from .bluetooth_class import BluetoothClassDecoder from .distance import estimate_distance, rssi_to_quality, rssi_bar @@ -214,49 +217,56 @@ class RFScanner: except Exception as e: print(f"Classic BT scan error: {e}") - # BLE scan + # BLE scan using bleak try: print(f"Scanning BLE devices ({timeout} seconds)...") - result = subprocess.run( - ['sudo', 'timeout', str(timeout), 'hcitool', 'lescan', '--duplicates'], - capture_output=True, - text=True, - timeout=timeout + 5 - ) + + async def _ble_scan(): + return await BleakScanner.discover( + timeout=timeout, + return_adv=True + ) + + ble_results = asyncio.run(_ble_scan()) seen_addrs = {d.address for d in devices} - for line in result.stdout.split('\n'): - match = re.match(r'([0-9A-Fa-f:]+)\s*(.*)', line) - if match: - addr = match.group(1) - name = match.group(2).strip() or '' + for device, adv_data in ble_results.values(): + addr = device.address + if addr not in seen_addrs and addr != 'LE': + seen_addrs.add(addr) - if addr not in seen_addrs and addr != 'LE': - seen_addrs.add(addr) - manufacturer = self.oui_lookup.lookup(addr) + name = device.name or adv_data.local_name or '' + rssi = adv_data.rssi # Real RSSI from advertisement + manufacturer = self.oui_lookup.lookup(addr) - # Try to infer device type from name first, then manufacturer - inferred_type = infer_device_type_from_name(name) - if not inferred_type: - inferred_type = infer_device_type_from_manufacturer(manufacturer) + # Extract manufacturer data if available + if adv_data.manufacturer_data and not manufacturer: + # First 2 bytes are company ID + for company_id in adv_data.manufacturer_data.keys(): + manufacturer = f"Company ID: {company_id}" + break - # Mark randomized MAC devices if still unknown - if not inferred_type: - if is_random_mac(addr): - device_type = "BLE Device (Random MAC)" - else: - device_type = "Low Energy Device" + # Infer device type + inferred_type = infer_device_type_from_name(name) + if not inferred_type: + inferred_type = infer_device_type_from_manufacturer(manufacturer) + + if not inferred_type: + if is_random_mac(addr): + device_type = "BLE Device (Random MAC)" else: - device_type = inferred_type + device_type = "Low Energy Device" + else: + device_type = inferred_type - devices.append(BluetoothDevice( - address=addr, - name=name, - rssi=-70, # Default estimate for BLE - device_class="BLE", - device_type=device_type, - manufacturer=manufacturer - )) + devices.append(BluetoothDevice( + address=addr, + name=name, + rssi=rssi, # Real RSSI instead of -70 + device_class="BLE", + device_type=device_type, + manufacturer=manufacturer + )) except Exception as e: print(f"BLE scan error: {e}")