Files
rf-mapper/src/rf_mapper/web/app.py
User 0e99232582 Switch to bleak for reliable BLE scanning with RSSI
- Replace hcitool-based BT scanning with bleak Python library
- Bleak provides reliable RSSI values via D-Bus/BlueZ
- BLE scan now finds devices that hcitool missed
- Update project docs to reflect resolved BT RSSI blocker
- Add bleak>=0.21.0 to dependencies

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-01 00:17:47 +01:00

1148 lines
41 KiB
Python

"""Flask application factory for RF Mapper web interface"""
import json
import os
import threading
import time
from datetime import datetime
from pathlib import Path
from flask import Flask, jsonify, render_template, request
from ..scanner import RFScanner
from ..distance import estimate_distance
from ..config import Config, get_config
from ..bluetooth_identify import identify_single_device, identify_device
from ..database import DeviceDatabase, init_database, get_database
class AutoScanner:
"""Background scanner that runs periodic scans"""
def __init__(self, app: Flask):
self.app = app
self._thread: threading.Thread | None = None
self._stop_event = threading.Event()
self._enabled = False
self._interval_minutes = 5
self._location_label = "auto_scan"
self._scan_wifi = True
self._scan_bluetooth = True
self._last_scan_time: datetime | None = None
self._last_scan_result: dict | None = None
self._scan_count = 0
self._lock = threading.Lock()
def start(self, interval_minutes: int | None = None, location_label: str | None = None,
scan_wifi: bool = True, scan_bluetooth: bool = True):
"""Start the background scanner"""
with self._lock:
if self._thread and self._thread.is_alive():
return # Already running
if interval_minutes:
self._interval_minutes = interval_minutes
if location_label:
self._location_label = location_label
self._scan_wifi = scan_wifi
self._scan_bluetooth = scan_bluetooth
self._stop_event.clear()
self._enabled = True
self._thread = threading.Thread(target=self._scan_loop, daemon=True)
self._thread.start()
services = []
if self._scan_wifi:
services.append("WiFi")
if self._scan_bluetooth:
services.append("BT")
print(f"[AutoScanner] Started - scanning {'+'.join(services)} every {self._interval_minutes} minutes")
def stop(self):
"""Stop the background scanner"""
with self._lock:
self._enabled = False
self._stop_event.set()
if self._thread:
self._thread.join(timeout=5)
self._thread = None
print("[AutoScanner] Stopped")
def _scan_loop(self):
"""Main scanning loop"""
while not self._stop_event.is_set():
try:
self._perform_scan()
except Exception as e:
print(f"[AutoScanner] Scan error: {e}")
# Wait for interval or stop signal
self._stop_event.wait(timeout=self._interval_minutes * 60)
def _perform_scan(self):
"""Perform a single scan"""
with self.app.app_context():
rf_config = self.app.config["RF_CONFIG"]
data_dir = self.app.config["DATA_DIR"]
services = []
if self._scan_wifi:
services.append("WiFi")
if self._scan_bluetooth:
services.append("BT")
print(f"[AutoScanner] Starting {'+'.join(services)} scan...")
scanner = RFScanner(data_dir)
wifi = []
bt = []
if self._scan_wifi:
wifi = scanner.scan_wifi()
if self._scan_bluetooth:
bt = scanner.scan_bluetooth()
if rf_config.scanner.auto_identify_bluetooth:
from ..bluetooth_identify import infer_device_type_from_name, infer_device_type_from_manufacturer
for dev in bt:
if dev.device_type == "Unknown":
inferred = infer_device_type_from_name(dev.name)
if inferred == "Unknown":
inferred = infer_device_type_from_manufacturer(dev.manufacturer)
dev.device_type = inferred
# Save the scan result
from dataclasses import asdict
from ..scanner import ScanResult
timestamp = datetime.now().isoformat()
result = ScanResult(
timestamp=timestamp,
location_label=self._location_label,
wifi_networks=[asdict(n) for n in wifi],
bluetooth_devices=[asdict(d) for d in bt]
)
# Save to file
scan_id = f"scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{self._location_label}"
filename = data_dir / f"{scan_id}.json"
with open(filename, 'w') as f:
json.dump(asdict(result), f, indent=2)
# Record to database if enabled
db = self.app.config.get("DATABASE")
if db:
lat = self.app.config.get("CURRENT_LAT", 0)
lon = self.app.config.get("CURRENT_LON", 0)
db.record_scan(scan_id, timestamp, self._location_label, lat, lon, len(wifi), len(bt))
for net in wifi:
dist = estimate_distance(net.rssi)
db.record_wifi_observation(
bssid=net.bssid,
ssid=net.ssid,
rssi=net.rssi,
distance_m=dist,
channel=net.channel,
frequency=net.frequency,
encryption=net.encryption,
manufacturer=net.manufacturer,
floor=net.floor,
scan_id=scan_id
)
for dev in bt:
dist = estimate_distance(dev.rssi, tx_power=-65)
db.record_bluetooth_observation(
address=dev.address,
name=dev.name,
rssi=dev.rssi,
distance_m=dist,
device_class=dev.device_class,
device_type=dev.device_type,
manufacturer=dev.manufacturer,
floor=dev.floor,
scan_id=scan_id
)
self._last_scan_time = datetime.now()
self._scan_count += 1
self._last_scan_result = {
"timestamp": timestamp,
"wifi_count": len(wifi),
"bt_count": len(bt)
}
print(f"[AutoScanner] Scan complete - WiFi: {len(wifi)}, BT: {len(bt)}")
@property
def status(self) -> dict:
"""Get current status"""
return {
"enabled": self._enabled,
"running": self._thread is not None and self._thread.is_alive(),
"interval_minutes": self._interval_minutes,
"location_label": self._location_label,
"scan_wifi": self._scan_wifi,
"scan_bluetooth": self._scan_bluetooth,
"last_scan_time": self._last_scan_time.isoformat() if self._last_scan_time else None,
"last_scan_result": self._last_scan_result,
"scan_count": self._scan_count
}
def update_settings(self, interval_minutes: int | None = None, location_label: str | None = None):
"""Update scanner settings"""
with self._lock:
if interval_minutes and interval_minutes > 0:
self._interval_minutes = interval_minutes
if location_label:
self._location_label = location_label
def create_app(config: Config | None = None) -> Flask:
"""Create and configure the Flask application"""
if config is None:
config = get_config()
app = Flask(
__name__,
template_folder=str(Path(__file__).parent / "templates"),
static_folder=str(Path(__file__).parent / "static")
)
# Store config reference
app.config["RF_CONFIG"] = config
# Data directory from config
app.config["DATA_DIR"] = config.get_data_dir()
app.config["DATA_DIR"].mkdir(parents=True, exist_ok=True)
# GPS position from config
app.config["DEFAULT_LAT"] = config.gps.latitude
app.config["DEFAULT_LON"] = config.gps.longitude
# Store current GPS position (can be updated via API)
app.config["CURRENT_LAT"] = app.config["DEFAULT_LAT"]
app.config["CURRENT_LON"] = app.config["DEFAULT_LON"]
# Initialize auto-scanner
auto_scanner = AutoScanner(app)
app.config["AUTO_SCANNER"] = auto_scanner
# Initialize database if enabled
if config.database.enabled:
db = init_database(config.get_database_path())
app.config["DATABASE"] = db
print(f"[Database] Initialized at {config.get_database_path()}")
# Schedule cleanup if auto_cleanup is enabled
if config.database.auto_cleanup:
def cleanup_task():
import time
while True:
time.sleep(3600 * 24) # Run daily
try:
db.cleanup_old_data(config.database.retention_days)
print(f"[Database] Cleaned up data older than {config.database.retention_days} days")
except Exception as e:
print(f"[Database] Cleanup error: {e}")
cleanup_thread = threading.Thread(target=cleanup_task, daemon=True)
cleanup_thread.start()
else:
app.config["DATABASE"] = None
# Start auto-scanner if enabled in config
if config.auto_scan.enabled:
auto_scanner.start(
interval_minutes=config.auto_scan.interval_minutes,
location_label=config.auto_scan.location_label
)
@app.route("/")
def index():
"""Main dashboard page"""
rf_config = app.config["RF_CONFIG"]
return render_template(
"index.html",
lat=app.config["CURRENT_LAT"],
lon=app.config["CURRENT_LON"],
building={
"enabled": rf_config.building.enabled,
"name": rf_config.building.name,
"floors": rf_config.building.floors,
"floor_height_m": rf_config.building.floor_height_m,
"ground_floor_number": rf_config.building.ground_floor_number,
"current_floor": rf_config.building.current_floor
}
)
@app.route("/api/scan", methods=["POST"])
def api_scan():
"""Trigger a new RF scan"""
data = request.get_json() or {}
location_label = data.get("location", "web_scan")
lat = data.get("lat", app.config["CURRENT_LAT"])
lon = data.get("lon", app.config["CURRENT_LON"])
scan_wifi = data.get("scan_wifi", True)
scan_bluetooth = data.get("scan_bluetooth", True)
# Update current position
app.config["CURRENT_LAT"] = lat
app.config["CURRENT_LON"] = lon
rf_config = app.config["RF_CONFIG"]
scanner = RFScanner(app.config["DATA_DIR"])
# Only scan enabled services
wifi = []
bt = []
if scan_wifi:
wifi = scanner.scan_wifi()
if scan_bluetooth:
bt = scanner.scan_bluetooth()
if rf_config.scanner.auto_identify_bluetooth:
from ..bluetooth_identify import infer_device_type_from_name, infer_device_type_from_manufacturer
for dev in bt:
if dev.device_type == "Unknown":
inferred = infer_device_type_from_name(dev.name)
if inferred == "Unknown":
inferred = infer_device_type_from_manufacturer(dev.manufacturer)
dev.device_type = inferred
# Create and save result
from dataclasses import asdict
from ..scanner import ScanResult
timestamp = datetime.now().isoformat()
result = ScanResult(
timestamp=timestamp,
location_label=location_label,
wifi_networks=[asdict(n) for n in wifi],
bluetooth_devices=[asdict(d) for d in bt]
)
scan_id = f"scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{location_label}"
if scan_wifi or scan_bluetooth:
# Save to file
filename = app.config["DATA_DIR"] / f"{scan_id}.json"
with open(filename, 'w') as f:
json.dump(asdict(result), f, indent=2)
# Record to database if enabled
db = app.config.get("DATABASE")
if db:
db.record_scan(scan_id, timestamp, location_label, lat, lon, len(wifi), len(bt))
for net in wifi:
dist = estimate_distance(net.rssi)
db.record_wifi_observation(
bssid=net.bssid,
ssid=net.ssid,
rssi=net.rssi,
distance_m=dist,
channel=net.channel,
frequency=net.frequency,
encryption=net.encryption,
manufacturer=net.manufacturer,
floor=net.floor,
scan_id=scan_id
)
for dev in bt:
dist = estimate_distance(dev.rssi, tx_power=-65)
db.record_bluetooth_observation(
address=dev.address,
name=dev.name,
rssi=dev.rssi,
distance_m=dist,
device_class=dev.device_class,
device_type=dev.device_type,
manufacturer=dev.manufacturer,
floor=dev.floor,
scan_id=scan_id
)
# Enrich with GPS and distance data
response_data = {
"timestamp": timestamp,
"location": location_label,
"gps": {"lat": lat, "lon": lon},
"wifi_networks": [],
"bluetooth_devices": []
}
# Don't assign default floor to detected devices - their floor is unknown
# Only the scanner position has a known floor (from building config)
for net in wifi:
dist = estimate_distance(net.rssi)
response_data["wifi_networks"].append({
"ssid": net.ssid,
"bssid": net.bssid,
"rssi": net.rssi,
"channel": net.channel,
"frequency": net.frequency,
"encryption": net.encryption,
"manufacturer": net.manufacturer,
"estimated_distance_m": round(dist, 2),
"signal_quality": net.signal_quality,
"floor": net.floor, # None = unknown floor
"height_m": net.height_m
})
for dev in bt:
dist = estimate_distance(dev.rssi, tx_power=-65)
response_data["bluetooth_devices"].append({
"address": dev.address,
"name": dev.name,
"rssi": dev.rssi,
"device_class": dev.device_class,
"device_type": dev.device_type,
"manufacturer": dev.manufacturer,
"estimated_distance_m": round(dist, 2),
"signal_quality": dev.signal_quality,
"floor": dev.floor, # None = unknown floor
"height_m": dev.height_m
})
return jsonify(response_data)
@app.route("/api/scans")
def api_list_scans():
"""List all saved scans"""
data_dir = app.config["DATA_DIR"]
scan_files = sorted(data_dir.glob("scan_*.json"), reverse=True)
scans = []
for f in scan_files[:50]: # Limit to 50 most recent
try:
with open(f) as fh:
scan = json.load(fh)
scans.append({
"filename": f.name,
"timestamp": scan.get("timestamp", ""),
"location": scan.get("location_label", "unknown"),
"wifi_count": len(scan.get("wifi_networks", [])),
"bt_count": len(scan.get("bluetooth_devices", []))
})
except Exception:
pass
return jsonify(scans)
@app.route("/api/scans/<filename>")
def api_get_scan(filename: str):
"""Get a specific scan by filename"""
data_dir = app.config["DATA_DIR"]
filepath = data_dir / filename
if not filepath.exists() or not filepath.name.startswith("scan_"):
return jsonify({"error": "Scan not found"}), 404
with open(filepath) as f:
scan = json.load(f)
# Enrich with distance estimates and ensure floor fields exist
for net in scan.get("wifi_networks", []):
net["estimated_distance_m"] = round(estimate_distance(net["rssi"]), 2)
if "floor" not in net:
net["floor"] = None
if "height_m" not in net:
net["height_m"] = None
for dev in scan.get("bluetooth_devices", []):
dev["estimated_distance_m"] = round(estimate_distance(dev["rssi"], tx_power=-65), 2)
if "floor" not in dev:
dev["floor"] = None
if "height_m" not in dev:
dev["height_m"] = None
return jsonify(scan)
@app.route("/api/latest")
def api_latest_scan():
"""Get the most recent scan"""
data_dir = app.config["DATA_DIR"]
scan_files = sorted(data_dir.glob("scan_*.json"), reverse=True)
if not scan_files:
return jsonify({"error": "No scans found"}), 404
with open(scan_files[0]) as f:
scan = json.load(f)
# Enrich with distance estimates and ensure floor fields exist
for net in scan.get("wifi_networks", []):
net["estimated_distance_m"] = round(estimate_distance(net["rssi"]), 2)
if "floor" not in net:
net["floor"] = None
if "height_m" not in net:
net["height_m"] = None
for dev in scan.get("bluetooth_devices", []):
dev["estimated_distance_m"] = round(estimate_distance(dev["rssi"], tx_power=-65), 2)
if "floor" not in dev:
dev["floor"] = None
if "height_m" not in dev:
dev["height_m"] = None
scan["gps"] = {
"lat": app.config["CURRENT_LAT"],
"lon": app.config["CURRENT_LON"]
}
return jsonify(scan)
@app.route("/api/position", methods=["GET", "POST"])
def api_position():
"""Get or set current GPS position"""
if request.method == "POST":
data = request.get_json() or {}
if "lat" in data and "lon" in data:
app.config["CURRENT_LAT"] = float(data["lat"])
app.config["CURRENT_LON"] = float(data["lon"])
return jsonify({
"lat": app.config["CURRENT_LAT"],
"lon": app.config["CURRENT_LON"]
})
@app.route("/api/config", methods=["GET", "POST"])
def api_config():
"""Get or update configuration"""
rf_config = app.config["RF_CONFIG"]
if request.method == "POST":
data = request.get_json() or {}
# Update GPS
if "gps" in data:
if "latitude" in data["gps"]:
rf_config.gps.latitude = float(data["gps"]["latitude"])
app.config["DEFAULT_LAT"] = rf_config.gps.latitude
app.config["CURRENT_LAT"] = rf_config.gps.latitude
if "longitude" in data["gps"]:
rf_config.gps.longitude = float(data["gps"]["longitude"])
app.config["DEFAULT_LON"] = rf_config.gps.longitude
app.config["CURRENT_LON"] = rf_config.gps.longitude
# Update scanner settings
if "scanner" in data:
if "path_loss_exponent" in data["scanner"]:
rf_config.scanner.path_loss_exponent = float(data["scanner"]["path_loss_exponent"])
# Optionally save to file
if data.get("save", False):
rf_config.save()
return jsonify({
"gps": {
"latitude": rf_config.gps.latitude,
"longitude": rf_config.gps.longitude
},
"web": {
"host": rf_config.web.host,
"port": rf_config.web.port
},
"scanner": {
"wifi_interface": rf_config.scanner.wifi_interface,
"bt_scan_timeout": rf_config.scanner.bt_scan_timeout,
"path_loss_exponent": rf_config.scanner.path_loss_exponent
},
"config_file": str(rf_config._config_path) if rf_config._config_path else None
})
@app.route("/api/bluetooth/identify/<address>")
def api_bluetooth_identify(address: str):
"""Identify a Bluetooth device by address"""
try:
info = identify_single_device(address)
return jsonify(info)
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/bluetooth/identify", methods=["POST"])
def api_bluetooth_identify_batch():
"""Identify multiple Bluetooth devices"""
data = request.get_json() or {}
addresses = data.get("addresses", [])
results = {}
for addr in addresses[:10]: # Limit to 10 devices
try:
results[addr] = identify_single_device(addr)
except Exception as e:
results[addr] = {"error": str(e)}
return jsonify(results)
@app.route("/api/autoscan", methods=["GET"])
def api_autoscan_status():
"""Get auto-scan status"""
auto_scanner = app.config["AUTO_SCANNER"]
return jsonify(auto_scanner.status)
@app.route("/api/autoscan/start", methods=["POST"])
def api_autoscan_start():
"""Start auto-scanning"""
auto_scanner = app.config["AUTO_SCANNER"]
rf_config = app.config["RF_CONFIG"]
data = request.get_json() or {}
interval = data.get("interval_minutes", rf_config.auto_scan.interval_minutes)
location = data.get("location_label", rf_config.auto_scan.location_label)
scan_wifi = data.get("scan_wifi", True)
scan_bluetooth = data.get("scan_bluetooth", True)
auto_scanner.start(
interval_minutes=interval,
location_label=location,
scan_wifi=scan_wifi,
scan_bluetooth=scan_bluetooth
)
# Update config
rf_config.auto_scan.enabled = True
rf_config.auto_scan.interval_minutes = interval
rf_config.auto_scan.location_label = location
# Save if requested
if data.get("save", False):
rf_config.save()
return jsonify({"status": "started", **auto_scanner.status})
@app.route("/api/autoscan/stop", methods=["POST"])
def api_autoscan_stop():
"""Stop auto-scanning"""
auto_scanner = app.config["AUTO_SCANNER"]
rf_config = app.config["RF_CONFIG"]
data = request.get_json() or {}
auto_scanner.stop()
# Update config
rf_config.auto_scan.enabled = False
# Save if requested
if data.get("save", False):
rf_config.save()
return jsonify({"status": "stopped", **auto_scanner.status})
@app.route("/api/autoscan/settings", methods=["POST"])
def api_autoscan_settings():
"""Update auto-scan settings"""
auto_scanner = app.config["AUTO_SCANNER"]
rf_config = app.config["RF_CONFIG"]
data = request.get_json() or {}
interval = data.get("interval_minutes")
location = data.get("location_label")
auto_scanner.update_settings(interval_minutes=interval, location_label=location)
# Update config
if interval:
rf_config.auto_scan.interval_minutes = interval
if location:
rf_config.auto_scan.location_label = location
# Save if requested
if data.get("save", False):
rf_config.save()
return jsonify(auto_scanner.status)
@app.route("/api/building", methods=["GET", "POST"])
def api_building():
"""Get or update building configuration"""
rf_config = app.config["RF_CONFIG"]
if request.method == "POST":
data = request.get_json() or {}
if "enabled" in data:
rf_config.building.enabled = bool(data["enabled"])
if "name" in data:
rf_config.building.name = str(data["name"])
if "floors" in data:
rf_config.building.floors = int(data["floors"])
if "floor_height_m" in data:
rf_config.building.floor_height_m = float(data["floor_height_m"])
if "ground_floor_number" in data:
rf_config.building.ground_floor_number = int(data["ground_floor_number"])
if "current_floor" in data:
rf_config.building.current_floor = int(data["current_floor"])
# Save if requested
if data.get("save", False):
rf_config.save()
return jsonify({
"enabled": rf_config.building.enabled,
"name": rf_config.building.name,
"floors": rf_config.building.floors,
"floor_height_m": rf_config.building.floor_height_m,
"ground_floor_number": rf_config.building.ground_floor_number,
"current_floor": rf_config.building.current_floor
})
@app.route("/api/device/<device_id>/floor", methods=["POST"])
def api_device_floor(device_id: str):
"""Assign floor to a device in the most recent scan"""
data = request.get_json() or {}
floor = data.get("floor")
height_m = data.get("height_m")
if floor is None and height_m is None:
return jsonify({"error": "Must provide floor or height_m"}), 400
# Load the most recent scan
data_dir = app.config["DATA_DIR"]
scan_files = sorted(data_dir.glob("scan_*.json"), reverse=True)
if not scan_files:
return jsonify({"error": "No scans found"}), 404
scan_file = scan_files[0]
with open(scan_file) as f:
scan = json.load(f)
# Find and update the device
updated = False
device_type = None
# Check WiFi networks (by BSSID)
for net in scan.get("wifi_networks", []):
if net.get("bssid") == device_id:
if floor is not None:
net["floor"] = int(floor)
if height_m is not None:
net["height_m"] = float(height_m)
updated = True
device_type = "wifi"
break
# Check Bluetooth devices (by address)
if not updated:
for dev in scan.get("bluetooth_devices", []):
if dev.get("address") == device_id:
if floor is not None:
dev["floor"] = int(floor)
if height_m is not None:
dev["height_m"] = float(height_m)
updated = True
device_type = "bluetooth"
break
if not updated:
return jsonify({"error": "Device not found"}), 404
# Save the updated scan
with open(scan_file, "w") as f:
json.dump(scan, f, indent=2)
return jsonify({
"status": "updated",
"device_id": device_id,
"device_type": device_type,
"floor": floor,
"height_m": height_m
})
@app.route("/api/device/<device_id>/distance", methods=["POST"])
def api_device_distance(device_id: str):
"""Set custom distance for a device in the most recent scan"""
data = request.get_json() or {}
distance = data.get("distance")
if distance is None:
return jsonify({"error": "Must provide distance"}), 400
# Load the most recent scan
data_dir = app.config["DATA_DIR"]
scan_files = sorted(data_dir.glob("scan_*.json"), reverse=True)
if not scan_files:
return jsonify({"error": "No scans found"}), 404
scan_file = scan_files[0]
with open(scan_file) as f:
scan = json.load(f)
# Find and update the device
updated = False
device_type = None
# Check WiFi networks (by BSSID)
for net in scan.get("wifi_networks", []):
if net.get("bssid") == device_id:
net["custom_distance_m"] = float(distance) if distance else None
updated = True
device_type = "wifi"
break
# Check Bluetooth devices (by address)
if not updated:
for dev in scan.get("bluetooth_devices", []):
if dev.get("address") == device_id:
dev["custom_distance_m"] = float(distance) if distance else None
updated = True
device_type = "bluetooth"
break
if not updated:
return jsonify({"error": "Device not found"}), 404
# Save the updated scan
with open(scan_file, "w") as f:
json.dump(scan, f, indent=2)
return jsonify({
"status": "updated",
"device_id": device_id,
"device_type": device_type,
"custom_distance_m": distance
})
# Store previous distances for movement detection logging
_bt_previous_distances: dict = {}
@app.route("/api/scan/bt", methods=["POST"])
def api_scan_bt():
"""Quick Bluetooth-only scan for real-time tracking using bleak (BLE)"""
import asyncio
from bleak import BleakScanner
bt = []
try:
async def do_scan():
devices = await BleakScanner.discover(timeout=3.0, return_adv=True)
results = []
for addr, (device, adv) in devices.items():
rssi = adv.rssi if adv else -70
name = device.name or '<unknown>'
results.append({
'address': addr,
'name': name,
'rssi': rssi,
'device_class': 'BLE'
})
return results
# Run async scan in sync context
bt = asyncio.run(do_scan())
print(f"[BT] Bleak scan found {len(bt)} devices")
except Exception as e:
print(f"[BT] Bleak scan error: {e}")
bt = []
# The scanner already does auto-identification if enabled
# Just use the device_type already assigned by the scanner
timestamp = datetime.now().isoformat()
response_data = {
"timestamp": timestamp,
"bluetooth_devices": []
}
# Get database reference
db = app.config.get("DATABASE")
for dev in bt:
rssi = dev['rssi']
addr = dev['address']
name = dev['name']
device_class = dev.get('device_class', 'BLE')
dist = estimate_distance(rssi, tx_power=-65)
# Log distance changes for debugging
prev_dist = _bt_previous_distances.get(addr)
if prev_dist is not None:
delta = abs(dist - prev_dist)
if delta > 0.3: # Log if changed by more than 0.3m
print(f"[BT] {name[:20]:<20} RSSI:{rssi:>4} dist:{dist:>5.1f}m (Δ{delta:+.1f}m)")
else:
print(f"[BT] {name[:20]:<20} RSSI:{rssi:>4} dist:{dist:>5.1f}m (new)")
_bt_previous_distances[addr] = dist
# Record to database for historical tracking
if db:
db.record_bluetooth_observation(
address=addr,
name=name,
rssi=rssi,
distance_m=dist,
device_class=device_class,
device_type="Unknown",
manufacturer="",
floor=None,
scan_id=None # Live tracking, no scan_id
)
response_data["bluetooth_devices"].append({
"address": addr,
"name": name,
"rssi": rssi,
"device_class": device_class,
"device_type": "Unknown",
"manufacturer": "",
"estimated_distance_m": round(dist, 2),
"signal_quality": "Fair",
"floor": None,
"height_m": None
})
return jsonify(response_data)
# ==================== Historical Data API ====================
@app.route("/api/history/devices")
def api_history_devices():
"""Get all tracked devices with statistics"""
db = app.config.get("DATABASE")
if not db:
return jsonify({"error": "Database not enabled"}), 503
device_type = request.args.get("type") # 'wifi' or 'bluetooth'
since = request.args.get("since") # ISO timestamp
limit = int(request.args.get("limit", 100))
devices = db.get_all_devices(device_type=device_type, since=since, limit=limit)
return jsonify({"devices": devices, "count": len(devices)})
@app.route("/api/history/devices/<device_id>")
def api_history_device(device_id: str):
"""Get detailed info and stats for a specific device"""
db = app.config.get("DATABASE")
if not db:
return jsonify({"error": "Database not enabled"}), 503
device = db.get_device(device_id)
if not device:
return jsonify({"error": "Device not found"}), 404
stats = db.get_device_stats(device_id)
return jsonify({
"device": device,
"stats": {
"avg_rssi": stats.avg_rssi if stats else None,
"min_rssi": stats.min_rssi if stats else None,
"max_rssi": stats.max_rssi if stats else None,
"avg_distance_m": stats.avg_distance_m if stats else None,
"min_distance_m": stats.min_distance_m if stats else None,
"max_distance_m": stats.max_distance_m if stats else None,
"total_observations": stats.total_observations if stats else 0
}
})
@app.route("/api/history/devices/<device_id>/rssi")
def api_history_device_rssi(device_id: str):
"""Get RSSI history for a device (time series)"""
db = app.config.get("DATABASE")
if not db:
return jsonify({"error": "Database not enabled"}), 503
since = request.args.get("since")
limit = int(request.args.get("limit", 1000))
history = db.get_device_rssi_history(device_id, since=since, limit=limit)
return jsonify({
"device_id": device_id,
"observations": [
{
"timestamp": obs.timestamp,
"rssi": obs.rssi,
"distance_m": obs.distance_m,
"floor": obs.floor
}
for obs in history
],
"count": len(history)
})
@app.route("/api/history/devices/<device_id>/activity")
def api_history_device_activity(device_id: str):
"""Get activity patterns for a device (hourly/daily)"""
db = app.config.get("DATABASE")
if not db:
return jsonify({"error": "Database not enabled"}), 503
days = int(request.args.get("days", 7))
pattern = db.get_device_activity_pattern(device_id, days=days)
return jsonify(pattern)
@app.route("/api/history/devices/<device_id>/label", methods=["POST"])
def api_history_device_label(device_id: str):
"""Set a custom label for a device"""
db = app.config.get("DATABASE")
if not db:
return jsonify({"error": "Database not enabled"}), 503
data = request.get_json() or {}
label = data.get("label", "")
db.set_device_label(device_id, label)
return jsonify({"status": "updated", "device_id": device_id, "label": label})
@app.route("/api/history/devices/<device_id>/favorite", methods=["POST"])
def api_history_device_favorite(device_id: str):
"""Toggle favorite status for a device"""
db = app.config.get("DATABASE")
if not db:
return jsonify({"error": "Database not enabled"}), 503
data = request.get_json() or {}
is_favorite = data.get("favorite", True)
db.set_device_favorite(device_id, is_favorite)
return jsonify({"status": "updated", "device_id": device_id, "favorite": is_favorite})
@app.route("/api/history/movement")
def api_history_movement():
"""Get movement events"""
db = app.config.get("DATABASE")
if not db:
return jsonify({"error": "Database not enabled"}), 503
device_id = request.args.get("device_id")
since = request.args.get("since")
limit = int(request.args.get("limit", 100))
events = db.get_movement_events(device_id=device_id, since=since, limit=limit)
return jsonify({"events": events, "count": len(events)})
@app.route("/api/history/alerts")
def api_history_alerts():
"""Get alerts (new devices, absences, etc.)"""
db = app.config.get("DATABASE")
if not db:
return jsonify({"error": "Database not enabled"}), 503
acknowledged = request.args.get("acknowledged")
if acknowledged is not None:
acknowledged = acknowledged.lower() == "true"
alert_type = request.args.get("type")
limit = int(request.args.get("limit", 50))
alerts = db.get_alerts(acknowledged=acknowledged, alert_type=alert_type, limit=limit)
return jsonify({"alerts": alerts, "count": len(alerts)})
@app.route("/api/history/alerts/<int:alert_id>/acknowledge", methods=["POST"])
def api_history_alert_acknowledge(alert_id: int):
"""Acknowledge an alert"""
db = app.config.get("DATABASE")
if not db:
return jsonify({"error": "Database not enabled"}), 503
db.acknowledge_alert(alert_id)
return jsonify({"status": "acknowledged", "alert_id": alert_id})
@app.route("/api/history/activity")
def api_history_activity():
"""Get recent activity summary"""
db = app.config.get("DATABASE")
if not db:
return jsonify({"error": "Database not enabled"}), 503
hours = int(request.args.get("hours", 24))
activity = db.get_recent_activity(hours=hours)
return jsonify(activity)
@app.route("/api/history/stats")
def api_history_stats():
"""Get database statistics"""
db = app.config.get("DATABASE")
if not db:
return jsonify({"error": "Database not enabled"}), 503
stats = db.get_database_stats()
return jsonify(stats)
@app.route("/api/history/cleanup", methods=["POST"])
def api_history_cleanup():
"""Manually trigger data cleanup"""
db = app.config.get("DATABASE")
if not db:
return jsonify({"error": "Database not enabled"}), 503
rf_config = app.config["RF_CONFIG"]
data = request.get_json() or {}
retention_days = data.get("retention_days", rf_config.database.retention_days)
result = db.cleanup_old_data(retention_days)
return jsonify(result)
return app
def run_server(
host: str | None = None,
port: int | None = None,
debug: bool | None = None,
config: Config | None = None,
profile_requests: bool = False,
log_requests: bool = False
):
"""Run the Flask development server"""
if config is None:
config = get_config()
# Use config values as defaults, allow overrides
host = host or config.web.host
port = port or config.web.port
debug = debug if debug is not None else config.web.debug
app = create_app(config)
# Add request profiling middleware if enabled
if profile_requests:
from ..profiling import add_profiler_middleware
profile_dir = config.get_data_dir() / "profiles"
add_profiler_middleware(app, profile_dir)
# Add request logging if enabled
if log_requests:
from ..profiling import add_request_logging_middleware
log_dir = config.get_data_dir() / "logs"
add_request_logging_middleware(app, log_dir)
print(f"\n{'='*60}")
print("RF Mapper Web Interface")
print(f"{'='*60}")
print(f"Config file: {config._config_path}")
print(f"GPS Position: {config.gps.latitude}, {config.gps.longitude}")
print(f"Data directory: {config.get_data_dir()}")
if config.auto_scan.enabled:
print(f"Auto-scan: ENABLED (every {config.auto_scan.interval_minutes} min)")
else:
print(f"Auto-scan: disabled")
if profile_requests:
print(f"Request profiling: ENABLED")
print(f"Profile output: {config.get_data_dir() / 'profiles'}")
if log_requests:
print(f"Request logging: ENABLED")
print(f"Log output: {config.get_data_dir() / 'logs'}")
print(f"{'='*60}")
print(f"Server running at: http://{host}:{port}")
print(f"Local access: http://localhost:{port}")
print(f"Network access: http://<your-ip>:{port}")
print(f"{'='*60}\n")
app.run(host=host, port=port, debug=debug)