feat: replace hcitool BLE scanning with bleak library

Use bleak's BleakScanner for BLE device discovery instead of
hcitool lescan subprocess. Provides real RSSI values from
advertisement packets instead of hardcoded -70dB estimate.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
User
2026-02-01 13:13:28 +01:00
parent 4fef21c06f
commit b0e6d1107c

View File

@@ -3,10 +3,13 @@
import subprocess import subprocess
import re import re
import json import json
import asyncio
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from dataclasses import dataclass, asdict from dataclasses import dataclass, asdict
from bleak import BleakScanner
from .oui import OUILookup from .oui import OUILookup
from .bluetooth_class import BluetoothClassDecoder from .bluetooth_class import BluetoothClassDecoder
from .distance import estimate_distance, rssi_to_quality, rssi_bar from .distance import estimate_distance, rssi_to_quality, rssi_bar
@@ -214,49 +217,56 @@ class RFScanner:
except Exception as e: except Exception as e:
print(f"Classic BT scan error: {e}") print(f"Classic BT scan error: {e}")
# BLE scan # BLE scan using bleak
try: try:
print(f"Scanning BLE devices ({timeout} seconds)...") print(f"Scanning BLE devices ({timeout} seconds)...")
result = subprocess.run(
['sudo', 'timeout', str(timeout), 'hcitool', 'lescan', '--duplicates'], async def _ble_scan():
capture_output=True, return await BleakScanner.discover(
text=True, timeout=timeout,
timeout=timeout + 5 return_adv=True
) )
ble_results = asyncio.run(_ble_scan())
seen_addrs = {d.address for d in devices} seen_addrs = {d.address for d in devices}
for line in result.stdout.split('\n'): for device, adv_data in ble_results.values():
match = re.match(r'([0-9A-Fa-f:]+)\s*(.*)', line) addr = device.address
if match: if addr not in seen_addrs and addr != 'LE':
addr = match.group(1) seen_addrs.add(addr)
name = match.group(2).strip() or '<unknown>'
if addr not in seen_addrs and addr != 'LE': name = device.name or adv_data.local_name or '<unknown>'
seen_addrs.add(addr) rssi = adv_data.rssi # Real RSSI from advertisement
manufacturer = self.oui_lookup.lookup(addr) manufacturer = self.oui_lookup.lookup(addr)
# Try to infer device type from name first, then manufacturer # Extract manufacturer data if available
inferred_type = infer_device_type_from_name(name) if adv_data.manufacturer_data and not manufacturer:
if not inferred_type: # First 2 bytes are company ID
inferred_type = infer_device_type_from_manufacturer(manufacturer) for company_id in adv_data.manufacturer_data.keys():
manufacturer = f"Company ID: {company_id}"
break
# Mark randomized MAC devices if still unknown # Infer device type
if not inferred_type: inferred_type = infer_device_type_from_name(name)
if is_random_mac(addr): if not inferred_type:
device_type = "BLE Device (Random MAC)" inferred_type = infer_device_type_from_manufacturer(manufacturer)
else:
device_type = "Low Energy Device" if not inferred_type:
if is_random_mac(addr):
device_type = "BLE Device (Random MAC)"
else: else:
device_type = inferred_type device_type = "Low Energy Device"
else:
device_type = inferred_type
devices.append(BluetoothDevice( devices.append(BluetoothDevice(
address=addr, address=addr,
name=name, name=name,
rssi=-70, # Default estimate for BLE rssi=rssi, # Real RSSI instead of -70
device_class="BLE", device_class="BLE",
device_type=device_type, device_type=device_type,
manufacturer=manufacturer manufacturer=manufacturer
)) ))
except Exception as e: except Exception as e:
print(f"BLE scan error: {e}") print(f"BLE scan error: {e}")