Files
rf-mapper/src/rf_mapper/web/app.py
User 0ffd220022 fix: use configured wifi_interface for WiFi scanning
The scan_wifi() calls were using the default "wlan0" instead of
the configured interface from config.yaml. This broke WiFi scanning
on systems with non-standard interface names like wlo1.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-01 03:29:48 +01:00

1465 lines
53 KiB
Python

"""Flask application factory for RF Mapper web interface"""
import json
import os
import threading
import time
from datetime import datetime
from pathlib import Path
from flask import Flask, jsonify, render_template, request
from ..scanner import RFScanner
from ..distance import estimate_distance
from ..config import Config, get_config
from ..bluetooth_identify import identify_single_device, identify_device
from ..database import DeviceDatabase, init_database, get_database
from ..homeassistant import HAWebhooks, HAWebhookConfig
class AutoScanner:
"""Background scanner that runs periodic scans"""
def __init__(self, app: Flask):
self.app = app
self._thread: threading.Thread | None = None
self._stop_event = threading.Event()
self._enabled = False
self._interval_minutes = 5
self._location_label = "auto_scan"
self._scan_wifi = True
self._scan_bluetooth = True
self._last_scan_time: datetime | None = None
self._last_scan_result: dict | None = None
self._scan_count = 0
self._lock = threading.Lock()
def start(self, interval_minutes: int | None = None, location_label: str | None = None,
scan_wifi: bool = True, scan_bluetooth: bool = True):
"""Start the background scanner"""
with self._lock:
if self._thread and self._thread.is_alive():
return # Already running
if interval_minutes:
self._interval_minutes = interval_minutes
if location_label:
self._location_label = location_label
self._scan_wifi = scan_wifi
self._scan_bluetooth = scan_bluetooth
self._stop_event.clear()
self._enabled = True
self._thread = threading.Thread(target=self._scan_loop, daemon=True)
self._thread.start()
services = []
if self._scan_wifi:
services.append("WiFi")
if self._scan_bluetooth:
services.append("BT")
print(f"[AutoScanner] Started - scanning {'+'.join(services)} every {self._interval_minutes} minutes")
def stop(self):
"""Stop the background scanner"""
with self._lock:
self._enabled = False
self._stop_event.set()
if self._thread:
self._thread.join(timeout=5)
self._thread = None
print("[AutoScanner] Stopped")
def _scan_loop(self):
"""Main scanning loop"""
while not self._stop_event.is_set():
try:
self._perform_scan()
except Exception as e:
print(f"[AutoScanner] Scan error: {e}")
# Wait for interval or stop signal
self._stop_event.wait(timeout=self._interval_minutes * 60)
def _perform_scan(self):
"""Perform a single scan"""
with self.app.app_context():
rf_config = self.app.config["RF_CONFIG"]
data_dir = self.app.config["DATA_DIR"]
services = []
if self._scan_wifi:
services.append("WiFi")
if self._scan_bluetooth:
services.append("BT")
print(f"[AutoScanner] Starting {'+'.join(services)} scan...")
scanner = RFScanner(data_dir)
wifi = []
bt = []
if self._scan_wifi:
wifi = scanner.scan_wifi(rf_config.scanner.wifi_interface)
if self._scan_bluetooth:
bt = scanner.scan_bluetooth()
if rf_config.scanner.auto_identify_bluetooth:
from ..bluetooth_identify import infer_device_type_from_name, infer_device_type_from_manufacturer
for dev in bt:
if dev.device_type == "Unknown":
inferred = infer_device_type_from_name(dev.name)
if inferred == "Unknown":
inferred = infer_device_type_from_manufacturer(dev.manufacturer)
dev.device_type = inferred
# Save the scan result
from dataclasses import asdict
from ..scanner import ScanResult
timestamp = datetime.now().isoformat()
result = ScanResult(
timestamp=timestamp,
location_label=self._location_label,
wifi_networks=[asdict(n) for n in wifi],
bluetooth_devices=[asdict(d) for d in bt]
)
# Save to file
scan_id = f"scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{self._location_label}"
filename = data_dir / f"{scan_id}.json"
with open(filename, 'w') as f:
json.dump(asdict(result), f, indent=2)
# Record to database if enabled
db = self.app.config.get("DATABASE")
if db:
lat = self.app.config.get("CURRENT_LAT", 0)
lon = self.app.config.get("CURRENT_LON", 0)
db.record_scan(scan_id, timestamp, self._location_label, lat, lon, len(wifi), len(bt))
for net in wifi:
dist = estimate_distance(net.rssi)
db.record_wifi_observation(
bssid=net.bssid,
ssid=net.ssid,
rssi=net.rssi,
distance_m=dist,
channel=net.channel,
frequency=net.frequency,
encryption=net.encryption,
manufacturer=net.manufacturer,
floor=net.floor,
scan_id=scan_id
)
for dev in bt:
dist = estimate_distance(dev.rssi, tx_power=-65)
db.record_bluetooth_observation(
address=dev.address,
name=dev.name,
rssi=dev.rssi,
distance_m=dist,
device_class=dev.device_class,
device_type=dev.device_type,
manufacturer=dev.manufacturer,
floor=dev.floor,
scan_id=scan_id
)
self._last_scan_time = datetime.now()
self._scan_count += 1
self._last_scan_result = {
"timestamp": timestamp,
"wifi_count": len(wifi),
"bt_count": len(bt)
}
print(f"[AutoScanner] Scan complete - WiFi: {len(wifi)}, BT: {len(bt)}")
@property
def status(self) -> dict:
"""Get current status"""
return {
"enabled": self._enabled,
"running": self._thread is not None and self._thread.is_alive(),
"interval_minutes": self._interval_minutes,
"location_label": self._location_label,
"scan_wifi": self._scan_wifi,
"scan_bluetooth": self._scan_bluetooth,
"last_scan_time": self._last_scan_time.isoformat() if self._last_scan_time else None,
"last_scan_result": self._last_scan_result,
"scan_count": self._scan_count
}
def update_settings(self, interval_minutes: int | None = None, location_label: str | None = None):
"""Update scanner settings"""
with self._lock:
if interval_minutes and interval_minutes > 0:
self._interval_minutes = interval_minutes
if location_label:
self._location_label = location_label
def create_app(config: Config | None = None) -> Flask:
"""Create and configure the Flask application"""
if config is None:
config = get_config()
app = Flask(
__name__,
template_folder=str(Path(__file__).parent / "templates"),
static_folder=str(Path(__file__).parent / "static")
)
# Store config reference
app.config["RF_CONFIG"] = config
# Data directory from config
app.config["DATA_DIR"] = config.get_data_dir()
app.config["DATA_DIR"].mkdir(parents=True, exist_ok=True)
# GPS position from config
app.config["DEFAULT_LAT"] = config.gps.latitude
app.config["DEFAULT_LON"] = config.gps.longitude
# Store current GPS position (can be updated via API)
app.config["CURRENT_LAT"] = app.config["DEFAULT_LAT"]
app.config["CURRENT_LON"] = app.config["DEFAULT_LON"]
# Initialize auto-scanner
auto_scanner = AutoScanner(app)
app.config["AUTO_SCANNER"] = auto_scanner
# Initialize database if enabled
if config.database.enabled:
db = init_database(config.get_database_path())
app.config["DATABASE"] = db
print(f"[Database] Initialized at {config.get_database_path()}")
# Schedule cleanup if auto_cleanup is enabled
if config.database.auto_cleanup:
def cleanup_task():
    """Thread body: once a day, purge database rows older than the retention period.

    Loops forever; a failed cleanup is logged and retried on the next cycle.
    """
    import time
    seconds_per_day = 3600 * 24
    while True:
        time.sleep(seconds_per_day)  # Run daily
        try:
            db.cleanup_old_data(config.database.retention_days)
        except Exception as e:
            print(f"[Database] Cleanup error: {e}")
        else:
            try:
                print(f"[Database] Cleaned up data older than {config.database.retention_days} days")
            except Exception as e:
                print(f"[Database] Cleanup error: {e}")
cleanup_thread = threading.Thread(target=cleanup_task, daemon=True)
cleanup_thread.start()
else:
app.config["DATABASE"] = None
# Initialize peer sync if enabled
if config.database.enabled and config.scanner.sync_interval_seconds > 0:
from ..sync import PeerSync
peer_sync = PeerSync(config, db)
app.config["PEER_SYNC"] = peer_sync
peer_sync.start()
print(f"[Sync] Peer sync enabled (interval: {config.scanner.sync_interval_seconds}s)")
else:
app.config["PEER_SYNC"] = None
# Initialize Home Assistant webhooks if enabled
ha_webhook_config = HAWebhookConfig(
enabled=config.home_assistant.enabled,
url=config.home_assistant.url,
webhook_scan=config.home_assistant.webhook_scan,
webhook_new_device=config.home_assistant.webhook_new_device,
webhook_device_gone=config.home_assistant.webhook_device_gone,
device_timeout_minutes=config.home_assistant.device_timeout_minutes
)
ha_webhooks = HAWebhooks(ha_webhook_config)
app.config["HA_WEBHOOKS"] = ha_webhooks
# Build and store scanner identity for webhooks
scanner_identity = config.get_scanner_identity()
app.config["SCANNER_IDENTITY"] = scanner_identity
if config.home_assistant.enabled:
print(f"[Home Assistant] Webhooks enabled -> {config.home_assistant.url}")
print(f"[Scanner] ID: {scanner_identity['id']} @ floor {scanner_identity['floor']}")
# Start absence checker thread if database is enabled
if config.database.enabled:
def absence_checker():
    """Thread body: report devices that have gone absent to Home Assistant.

    Wakes once a minute.  When a database is configured and webhooks are
    enabled, every device returned by ``get_recently_departed`` triggers a
    ``device_gone`` webhook and is then marked as notified.  Any error is
    logged and the loop carries on with the next cycle.
    """
    poll_seconds = 60
    while True:
        time.sleep(poll_seconds)  # Check every minute
        try:
            database = app.config.get("DATABASE")
            identity = app.config.get("SCANNER_IDENTITY", {})
            if not database or not ha_webhooks.config.enabled:
                continue
            gone_devices = database.get_recently_departed(
                config.home_assistant.device_timeout_minutes
            )
            for gone in gone_devices:
                # Prefer the user's label, then the advertised name / SSID, then the raw id.
                display_name = (
                    gone.get("custom_label")
                    or gone.get("name")
                    or gone.get("ssid")
                    or gone["device_id"]
                )
                ha_webhooks.send_device_gone(
                    device_id=gone["device_id"],
                    name=display_name,
                    last_seen=gone["last_seen"],
                    device_type=gone["device_type"],
                    last_scanner=identity
                )
                database.mark_departure_notified(gone["device_id"])
                print(f"[HA Webhook] Device departed: {display_name}")
        except Exception as e:
            print(f"[HA Webhook] Absence checker error: {e}")
absence_thread = threading.Thread(target=absence_checker, daemon=True)
absence_thread.start()
print(f"[Home Assistant] Absence checker started (timeout: {config.home_assistant.device_timeout_minutes} min)")
# Start auto-scanner if enabled in config
if config.auto_scan.enabled:
auto_scanner.start(
interval_minutes=config.auto_scan.interval_minutes,
location_label=config.auto_scan.location_label
)
@app.route("/")
def index():
    """Render the dashboard with the current position and building settings."""
    building_cfg = app.config["RF_CONFIG"].building
    building_fields = (
        "enabled",
        "name",
        "floors",
        "floor_height_m",
        "ground_floor_number",
        "current_floor",
    )
    building = {field: getattr(building_cfg, field) for field in building_fields}
    return render_template(
        "index.html",
        lat=app.config["CURRENT_LAT"],
        lon=app.config["CURRENT_LON"],
        building=building,
    )
@app.route("/api/scan", methods=["POST"])
def api_scan():
    """Trigger a new RF scan and return the results enriched with distances.

    Optional JSON body fields: ``location`` (label, default ``"web_scan"``),
    ``lat`` / ``lon`` (numbers, default: current position), ``scan_wifi`` and
    ``scan_bluetooth`` (default ``True``).

    Side effects: updates the current position, writes the scan to a JSON
    file in the data directory (when at least one service was scanned) and
    records it in the database when one is configured.

    Returns 400 when the body is not a JSON object or ``lat``/``lon`` are
    not numeric.
    """
    from dataclasses import asdict
    from ..scanner import ScanResult

    # silent=True: every field is optional, so a request without a JSON body
    # must fall back to the defaults instead of being rejected by Flask.
    data = request.get_json(silent=True) or {}
    if not isinstance(data, dict):
        return jsonify({"error": "JSON body must be an object"}), 400

    location_label = str(data.get("location", "web_scan"))
    scan_wifi = data.get("scan_wifi", True)
    scan_bluetooth = data.get("scan_bluetooth", True)

    # Validate client-supplied coordinates before they reach shared app state.
    lat = app.config["CURRENT_LAT"]
    lon = app.config["CURRENT_LON"]
    try:
        if "lat" in data:
            lat = float(data["lat"])
        if "lon" in data:
            lon = float(data["lon"])
    except (TypeError, ValueError):
        return jsonify({"error": "lat and lon must be numbers"}), 400

    # Update current position
    app.config["CURRENT_LAT"] = lat
    app.config["CURRENT_LON"] = lon

    rf_config = app.config["RF_CONFIG"]
    scanner = RFScanner(app.config["DATA_DIR"])

    # Only scan enabled services
    wifi = []
    bt = []
    if scan_wifi:
        wifi = scanner.scan_wifi(rf_config.scanner.wifi_interface)
    if scan_bluetooth:
        bt = scanner.scan_bluetooth()
        if rf_config.scanner.auto_identify_bluetooth:
            from ..bluetooth_identify import infer_device_type_from_name, infer_device_type_from_manufacturer
            for dev in bt:
                if dev.device_type == "Unknown":
                    # Try the device name first, then fall back to the manufacturer.
                    inferred = infer_device_type_from_name(dev.name)
                    if inferred == "Unknown":
                        inferred = infer_device_type_from_manufacturer(dev.manufacturer)
                    dev.device_type = inferred

    # Estimate each distance once; used for both the database and the response.
    wifi_rows = [(net, estimate_distance(net.rssi)) for net in wifi]
    bt_rows = [(dev, estimate_distance(dev.rssi, tx_power=-65)) for dev in bt]

    # Create and save result
    now = datetime.now()
    timestamp = now.isoformat()
    result = ScanResult(
        timestamp=timestamp,
        location_label=location_label,
        wifi_networks=[asdict(n) for n in wifi],
        bluetooth_devices=[asdict(d) for d in bt]
    )
    # The label becomes part of a file name: neutralise path separators so a
    # label such as "a/b" cannot point into a different (or missing) directory.
    safe_label = location_label.translate(
        str.maketrans({"/": "_", "\\": "_", "\x00": "_"})
    )
    scan_id = f"scan_{now.strftime('%Y%m%d_%H%M%S')}_{safe_label}"

    if scan_wifi or scan_bluetooth:
        # Save to file
        filename = app.config["DATA_DIR"] / f"{scan_id}.json"
        with open(filename, 'w') as f:
            json.dump(asdict(result), f, indent=2)

        # Record to database if enabled
        db = app.config.get("DATABASE")
        if db:
            db.record_scan(scan_id, timestamp, location_label, lat, lon, len(wifi), len(bt))
            for net, dist in wifi_rows:
                db.record_wifi_observation(
                    bssid=net.bssid,
                    ssid=net.ssid,
                    rssi=net.rssi,
                    distance_m=dist,
                    channel=net.channel,
                    frequency=net.frequency,
                    encryption=net.encryption,
                    manufacturer=net.manufacturer,
                    floor=net.floor,
                    scan_id=scan_id
                )
            for dev, dist in bt_rows:
                db.record_bluetooth_observation(
                    address=dev.address,
                    name=dev.name,
                    rssi=dev.rssi,
                    distance_m=dist,
                    device_class=dev.device_class,
                    device_type=dev.device_type,
                    manufacturer=dev.manufacturer,
                    floor=dev.floor,
                    scan_id=scan_id
                )

    # Enrich with GPS and distance data.
    # Don't assign default floor to detected devices - their floor is unknown.
    # Only the scanner position has a known floor (from building config).
    response_data = {
        "timestamp": timestamp,
        "location": location_label,
        "gps": {"lat": lat, "lon": lon},
        "wifi_networks": [
            {
                "ssid": net.ssid,
                "bssid": net.bssid,
                "rssi": net.rssi,
                "channel": net.channel,
                "frequency": net.frequency,
                "encryption": net.encryption,
                "manufacturer": net.manufacturer,
                "estimated_distance_m": round(dist, 2),
                "signal_quality": net.signal_quality,
                "floor": net.floor,  # None = unknown floor
                "height_m": net.height_m
            }
            for net, dist in wifi_rows
        ],
        "bluetooth_devices": [
            {
                "address": dev.address,
                "name": dev.name,
                "rssi": dev.rssi,
                "device_class": dev.device_class,
                "device_type": dev.device_type,
                "manufacturer": dev.manufacturer,
                "estimated_distance_m": round(dist, 2),
                "signal_quality": dev.signal_quality,
                "floor": dev.floor,  # None = unknown floor
                "height_m": dev.height_m
            }
            for dev, dist in bt_rows
        ]
    }
    return jsonify(response_data)
@app.route("/api/scans")
def api_list_scans():
    """List the 50 most recent saved scans (newest first, by file name).

    Each entry carries the file name, timestamp, location label and the
    number of WiFi networks / Bluetooth devices.  Files that cannot be read
    or parsed are left out of the listing and reported on the console.
    """
    data_dir = app.config["DATA_DIR"]
    scan_files = sorted(data_dir.glob("scan_*.json"), reverse=True)
    scans = []
    for scan_path in scan_files[:50]:  # Limit to 50 most recent
        try:
            with open(scan_path) as fh:
                scan = json.load(fh)
            scans.append({
                "filename": scan_path.name,
                "timestamp": scan.get("timestamp", ""),
                "location": scan.get("location_label", "unknown"),
                "wifi_count": len(scan.get("wifi_networks", [])),
                "bt_count": len(scan.get("bluetooth_devices", []))
            })
        except Exception as e:
            # Best effort: one corrupt file must not break the listing, but
            # leave a trace so it does not vanish silently.
            print(f"[Scans] Skipping unreadable scan file {scan_path.name}: {e}")
    return jsonify(scans)
@app.route("/api/scans/<filename>")
def api_get_scan(filename: str):
    """Return one saved scan, looked up by file name in the data directory.

    Responds 404 unless the file exists and its name starts with ``scan_``.
    Every entry gets an ``estimated_distance_m`` computed from its RSSI, and
    ``floor`` / ``height_m`` are added as ``None`` when absent.
    """
    filepath = app.config["DATA_DIR"] / filename
    if not (filepath.exists() and filepath.name.startswith("scan_")):
        return jsonify({"error": "Scan not found"}), 404

    with open(filepath) as f:
        scan = json.load(f)

    # (list key, extra estimate_distance arguments) per device family.
    families = (
        ("wifi_networks", {}),
        ("bluetooth_devices", {"tx_power": -65}),
    )
    for list_key, distance_kwargs in families:
        for entry in scan.get(list_key, []):
            entry["estimated_distance_m"] = round(
                estimate_distance(entry["rssi"], **distance_kwargs), 2
            )
            entry.setdefault("floor", None)
            entry.setdefault("height_m", None)
    return jsonify(scan)
@app.route("/api/latest")
def api_latest_scan():
    """Return the newest saved scan (highest file name) plus the current GPS position.

    Responds 404 when no scan file exists.  Entries are enriched with
    ``estimated_distance_m``; missing ``floor`` / ``height_m`` become ``None``.
    """
    newest = max(app.config["DATA_DIR"].glob("scan_*.json"), default=None)
    if newest is None:
        return jsonify({"error": "No scans found"}), 404

    with open(newest) as f:
        scan = json.load(f)

    # (list key, extra estimate_distance arguments) per device family.
    families = (
        ("wifi_networks", {}),
        ("bluetooth_devices", {"tx_power": -65}),
    )
    for list_key, distance_kwargs in families:
        for entry in scan.get(list_key, []):
            entry["estimated_distance_m"] = round(
                estimate_distance(entry["rssi"], **distance_kwargs), 2
            )
            entry.setdefault("floor", None)
            entry.setdefault("height_m", None)

    scan["gps"] = {
        "lat": app.config["CURRENT_LAT"],
        "lon": app.config["CURRENT_LON"]
    }
    return jsonify(scan)
@app.route("/api/position", methods=["GET", "POST"])
def api_position():
    """Get or set the current GPS position.

    POST accepts ``{"lat": <number>, "lon": <number>}``; the position is
    changed only when both keys are present.  Non-numeric values yield a
    400 and leave the position untouched.  Both methods return the
    (possibly updated) position.
    """
    if request.method == "POST":
        data = request.get_json() or {}
        if "lat" in data and "lon" in data:
            # Convert both before storing either, so a bad value cannot
            # leave the position half-updated.
            try:
                new_lat = float(data["lat"])
                new_lon = float(data["lon"])
            except (TypeError, ValueError):
                return jsonify({"error": "lat and lon must be numbers"}), 400
            app.config["CURRENT_LAT"] = new_lat
            app.config["CURRENT_LON"] = new_lon
    return jsonify({
        "lat": app.config["CURRENT_LAT"],
        "lon": app.config["CURRENT_LON"]
    })
@app.route("/api/config", methods=["GET", "POST"])
def api_config():
    """Get or update configuration.

    POST accepts optional ``gps.latitude``, ``gps.longitude`` and
    ``scanner.path_loss_exponent`` (numbers), plus ``save`` to persist the
    configuration to its file.  All values are validated before any of them
    is applied; invalid input yields a 400 and changes nothing.  Both
    methods return the current GPS, web and scanner settings.
    """
    rf_config = app.config["RF_CONFIG"]
    if request.method == "POST":
        data = request.get_json() or {}
        gps = data.get("gps", {})
        scanner_settings = data.get("scanner", {})
        if not isinstance(gps, dict) or not isinstance(scanner_settings, dict):
            return jsonify({"error": "gps and scanner must be objects"}), 400

        # Parse everything first so a bad value cannot leave a partial update.
        try:
            new_lat = float(gps["latitude"]) if "latitude" in gps else None
            new_lon = float(gps["longitude"]) if "longitude" in gps else None
            new_exponent = (
                float(scanner_settings["path_loss_exponent"])
                if "path_loss_exponent" in scanner_settings
                else None
            )
        except (TypeError, ValueError):
            return jsonify({"error": "Configuration values must be numbers"}), 400

        # Update GPS (the configured default also becomes the current position)
        if new_lat is not None:
            rf_config.gps.latitude = new_lat
            app.config["DEFAULT_LAT"] = new_lat
            app.config["CURRENT_LAT"] = new_lat
        if new_lon is not None:
            rf_config.gps.longitude = new_lon
            app.config["DEFAULT_LON"] = new_lon
            app.config["CURRENT_LON"] = new_lon
        # Update scanner settings
        if new_exponent is not None:
            rf_config.scanner.path_loss_exponent = new_exponent
        # Optionally save to file
        if data.get("save", False):
            rf_config.save()
    return jsonify({
        "gps": {
            "latitude": rf_config.gps.latitude,
            "longitude": rf_config.gps.longitude
        },
        "web": {
            "host": rf_config.web.host,
            "port": rf_config.web.port
        },
        "scanner": {
            "wifi_interface": rf_config.scanner.wifi_interface,
            "bt_scan_timeout": rf_config.scanner.bt_scan_timeout,
            "path_loss_exponent": rf_config.scanner.path_loss_exponent
        },
        "config_file": str(rf_config._config_path) if rf_config._config_path else None
    })
@app.route("/api/bluetooth/identify/<address>")
def api_bluetooth_identify(address: str):
    """Look up a single Bluetooth device by address.

    Returns whatever ``identify_single_device`` reports, or a 500 with the
    error text when the lookup raises.
    """
    try:
        device_info = identify_single_device(address)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
    try:
        return jsonify(device_info)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route("/api/bluetooth/identify", methods=["POST"])
def api_bluetooth_identify_batch():
    """Identify several Bluetooth devices in one request.

    Reads ``addresses`` from the JSON body and looks up at most the first
    ten.  The response maps each address to its lookup result, or to an
    ``{"error": ...}`` object when that lookup raised.
    """
    payload = request.get_json() or {}
    requested = payload.get("addresses", [])
    outcome = {}
    for address in requested[:10]:  # Limit to 10 devices
        try:
            outcome[address] = identify_single_device(address)
        except Exception as e:
            outcome[address] = {"error": str(e)}
    return jsonify(outcome)
@app.route("/api/autoscan", methods=["GET"])
def api_autoscan_status():
    """Report the background scanner's current status dictionary."""
    return jsonify(app.config["AUTO_SCANNER"].status)
@app.route("/api/autoscan/start", methods=["POST"])
def api_autoscan_start():
    """Start auto-scanning.

    Optional JSON body fields: ``interval_minutes`` (positive number),
    ``location_label``, ``scan_wifi``, ``scan_bluetooth`` and ``save``
    (persist the configuration).  Missing or null interval / label fall back
    to the configured values.  An invalid interval yields a 400 without
    starting the scanner or touching the configuration.
    """
    auto_scanner = app.config["AUTO_SCANNER"]
    rf_config = app.config["RF_CONFIG"]
    # silent=True: the body is optional, so a request without JSON must use
    # the defaults instead of being rejected by Flask.
    data = request.get_json(silent=True) or {}
    if not isinstance(data, dict):
        return jsonify({"error": "JSON body must be an object"}), 400

    interval = data.get("interval_minutes")
    if interval is None:
        interval = rf_config.auto_scan.interval_minutes
    elif (
        isinstance(interval, bool)
        or not isinstance(interval, (int, float))
        or interval <= 0
    ):
        # A string would break the scanner thread's wait timeout and a
        # non-positive value would remove the pause between scans.
        return jsonify({"error": "interval_minutes must be a positive number"}), 400

    location = data.get("location_label") or rf_config.auto_scan.location_label
    scan_wifi = data.get("scan_wifi", True)
    scan_bluetooth = data.get("scan_bluetooth", True)
    auto_scanner.start(
        interval_minutes=interval,
        location_label=location,
        scan_wifi=scan_wifi,
        scan_bluetooth=scan_bluetooth
    )
    # Update config
    rf_config.auto_scan.enabled = True
    rf_config.auto_scan.interval_minutes = interval
    rf_config.auto_scan.location_label = location
    # Save if requested
    if data.get("save", False):
        rf_config.save()
    return jsonify({"status": "started", **auto_scanner.status})
@app.route("/api/autoscan/stop", methods=["POST"])
def api_autoscan_stop():
    """Stop auto-scanning.

    The JSON body is optional; ``{"save": true}`` additionally persists the
    disabled state to the configuration file.
    """
    auto_scanner = app.config["AUTO_SCANNER"]
    rf_config = app.config["RF_CONFIG"]
    # silent=True: a plain POST without a JSON body must still stop the
    # scanner rather than be rejected for its content type.
    data = request.get_json(silent=True) or {}
    if not isinstance(data, dict):
        data = {}
    auto_scanner.stop()
    # Update config
    rf_config.auto_scan.enabled = False
    # Save if requested
    if data.get("save", False):
        rf_config.save()
    return jsonify({"status": "stopped", **auto_scanner.status})
@app.route("/api/autoscan/settings", methods=["POST"])
def api_autoscan_settings():
    """Update auto-scan settings.

    JSON body fields (all optional): ``interval_minutes`` (positive number),
    ``location_label`` and ``save``.  An invalid interval yields a 400 and
    changes nothing, so the scanner and the stored configuration cannot
    drift apart.  Returns the scanner status.
    """
    auto_scanner = app.config["AUTO_SCANNER"]
    rf_config = app.config["RF_CONFIG"]
    data = request.get_json() or {}
    interval = data.get("interval_minutes")
    location = data.get("location_label")

    if interval is not None and (
        isinstance(interval, bool)
        or not isinstance(interval, (int, float))
        or interval <= 0
    ):
        return jsonify({"error": "interval_minutes must be a positive number"}), 400

    auto_scanner.update_settings(interval_minutes=interval, location_label=location)
    # Update config
    if interval:
        rf_config.auto_scan.interval_minutes = interval
    if location:
        rf_config.auto_scan.location_label = location
    # Save if requested
    if data.get("save", False):
        rf_config.save()
    return jsonify(auto_scanner.status)
@app.route("/api/building", methods=["GET", "POST"])
def api_building():
    """Get or update building configuration.

    POST accepts any subset of ``enabled``, ``name``, ``floors``,
    ``floor_height_m``, ``ground_floor_number`` and ``current_floor``, plus
    ``save`` to persist.  All supplied values are converted before any is
    applied; a value that cannot be converted yields a 400 and changes
    nothing.  Both methods return the current building settings.
    """
    rf_config = app.config["RF_CONFIG"]
    # Field name -> converter applied to the client-supplied value.
    converters = {
        "enabled": bool,
        "name": str,
        "floors": int,
        "floor_height_m": float,
        "ground_floor_number": int,
        "current_floor": int,
    }
    if request.method == "POST":
        data = request.get_json() or {}
        try:
            updates = {
                field: convert(data[field])
                for field, convert in converters.items()
                if field in data
            }
        except (TypeError, ValueError):
            return jsonify({"error": "Invalid building configuration value"}), 400
        for field, value in updates.items():
            setattr(rf_config.building, field, value)
        # Save if requested
        if data.get("save", False):
            rf_config.save()
    return jsonify({field: getattr(rf_config.building, field) for field in converters})
@app.route("/api/device/<device_id>/floor", methods=["POST"])
def api_device_floor(device_id: str):
    """Assign floor to a device - stores in database for persistence.

    JSON body: ``floor`` (integer) and/or ``height_m`` (number); at least
    one is required.  The floor is written to the database when one is
    configured, and the device's entry in the most recent scan file is
    updated if the device appears there (matched by BSSID, then by
    Bluetooth address).

    Returns 400 when neither value is given or a value is not numeric; the
    values are validated before anything is written.
    """
    data = request.get_json() or {}
    floor = data.get("floor")
    height_m = data.get("height_m")
    if floor is None and height_m is None:
        return jsonify({"error": "Must provide floor or height_m"}), 400
    try:
        floor_value = int(floor) if floor is not None else None
        height_value = float(height_m) if height_m is not None else None
    except (TypeError, ValueError):
        return jsonify({"error": "floor must be an integer and height_m a number"}), 400

    # Store floor in database for persistence
    db = app.config.get("DATABASE")
    if db and floor_value is not None:
        db.set_device_floor(device_id, floor_value)

    # Also update the most recent scan file if device exists there
    data_dir = app.config["DATA_DIR"]
    scan_files = sorted(data_dir.glob("scan_*.json"), reverse=True)
    device_type = "unknown"
    if scan_files:
        scan_file = scan_files[0]
        with open(scan_file) as f:
            scan = json.load(f)
        # (list key, id key, reported device type); WiFi is checked first.
        lookups = (
            ("wifi_networks", "bssid", "wifi"),
            ("bluetooth_devices", "address", "bluetooth"),
        )
        entry = None
        for list_key, id_key, kind in lookups:
            entry = next(
                (item for item in scan.get(list_key, []) if item.get(id_key) == device_id),
                None,
            )
            if entry is not None:
                device_type = kind
                break
        if entry is not None:
            if floor_value is not None:
                entry["floor"] = floor_value
            if height_value is not None:
                entry["height_m"] = height_value
            with open(scan_file, "w") as f:
                json.dump(scan, f, indent=2)
    return jsonify({
        "status": "updated",
        "device_id": device_id,
        "device_type": device_type,
        "floor": floor,
        "height_m": height_m
    })
@app.route("/api/device/<device_id>/distance", methods=["POST"])
def api_device_distance(device_id: str):
    """Set custom distance for a device in the most recent scan.

    JSON body: ``distance`` (required).  A falsy value (``0``, ``""``) is
    stored as ``None``; anything else must be numeric.  The device is
    matched by BSSID among WiFi networks first, then by address among
    Bluetooth devices.

    Returns 400 for a missing or non-numeric distance and 404 when there is
    no scan file or the device is not in the latest scan.
    """
    data = request.get_json() or {}
    distance = data.get("distance")
    if distance is None:
        return jsonify({"error": "Must provide distance"}), 400
    try:
        custom_distance = float(distance) if distance else None
    except (TypeError, ValueError):
        return jsonify({"error": "distance must be a number"}), 400

    # Load the most recent scan
    data_dir = app.config["DATA_DIR"]
    scan_files = sorted(data_dir.glob("scan_*.json"), reverse=True)
    if not scan_files:
        return jsonify({"error": "No scans found"}), 404
    scan_file = scan_files[0]
    with open(scan_file) as f:
        scan = json.load(f)

    # Find the device: (list key, id key, reported device type).
    lookups = (
        ("wifi_networks", "bssid", "wifi"),
        ("bluetooth_devices", "address", "bluetooth"),
    )
    entry = None
    device_type = None
    for list_key, id_key, kind in lookups:
        entry = next(
            (item for item in scan.get(list_key, []) if item.get(id_key) == device_id),
            None,
        )
        if entry is not None:
            device_type = kind
            break
    if entry is None:
        return jsonify({"error": "Device not found"}), 404

    entry["custom_distance_m"] = custom_distance
    # Save the updated scan
    with open(scan_file, "w") as f:
        json.dump(scan, f, indent=2)
    return jsonify({
        "status": "updated",
        "device_id": device_id,
        "device_type": device_type,
        "custom_distance_m": distance
    })
# Last estimated distance per Bluetooth address, shared across requests.
# The live Bluetooth scan endpoint (/api/scan/bt) reads and updates it to log
# how far each device appears to have moved since the previous scan.
_bt_previous_distances: dict = {}
@app.route("/api/device/floors", methods=["GET"])
def api_device_floors():
    """Return every stored floor assignment and manual position offset.

    Without a database both mappings are empty.
    """
    database = app.config.get("DATABASE")
    if database:
        payload = {
            "floors": database.get_all_device_floors(),
            "positions": database.get_all_device_positions()
        }
    else:
        payload = {"floors": {}, "positions": {}}
    return jsonify(payload)
@app.route("/api/device/<device_id>/position", methods=["POST"])
def api_device_position(device_id: str):
    """Store or clear a manual position offset for a floor-assigned device.

    JSON body: ``lat_offset`` and ``lon_offset``.  Both null/absent clears
    the stored position; exactly one given, or a non-numeric value, is a
    400.  The device must already have a floor assigned (400 otherwise) and
    a database must be configured (503 otherwise).
    """
    database = app.config.get("DATABASE")
    if not database:
        return jsonify({"error": "Database not enabled"}), 503

    payload = request.get_json() or {}
    raw_lat = payload.get("lat_offset")
    raw_lon = payload.get("lon_offset")

    # Manual positioning only makes sense once the device has a floor.
    if database.get_device_floor(device_id) is None:
        return jsonify({
            "error": "Device must have an assigned floor before manual positioning"
        }), 400

    missing = [value is None for value in (raw_lat, raw_lon)]
    if all(missing):
        # Neither offset supplied: reset to the automatic position.
        database.clear_device_position(device_id)
        return jsonify({
            "status": "cleared",
            "device_id": device_id,
            "message": "Position reset to auto (RSSI-based)"
        })
    if any(missing):
        return jsonify({
            "error": "Both lat_offset and lon_offset are required"
        }), 400

    try:
        lat_offset, lon_offset = float(raw_lat), float(raw_lon)
    except (TypeError, ValueError):
        return jsonify({"error": "Invalid offset values"}), 400

    database.set_device_position(device_id, lat_offset, lon_offset)
    return jsonify({
        "status": "updated",
        "device_id": device_id,
        "lat_offset": lat_offset,
        "lon_offset": lon_offset
    })
@app.route("/api/scan/bt", methods=["POST"])
def api_scan_bt():
    """Quick Bluetooth-only scan for real-time tracking using bleak (BLE).

    Runs a 3-second BLE discovery, estimates a distance per device from its
    RSSI and returns the device list.  When a database is configured each
    sighting is recorded and its departure flag reset; when Home Assistant
    webhooks are enabled, devices not yet in the database trigger a
    new-device webhook and the full result is sent as a scan webhook.  A
    failed discovery is logged and produces an empty device list.
    """
    import asyncio
    from bleak import BleakScanner

    async def do_scan():
        devices = await BleakScanner.discover(timeout=3.0, return_adv=True)
        results = []
        for addr, (device, adv) in devices.items():
            results.append({
                'address': addr,
                'name': device.name or '<unknown>',
                # No advertisement data: fall back to a mid-range RSSI.
                'rssi': adv.rssi if adv else -70,
                'device_class': 'BLE'
            })
        return results

    try:
        # Run async scan in sync context
        bt = asyncio.run(do_scan())
        print(f"[BT] Bleak scan found {len(bt)} devices")
    except Exception as e:
        print(f"[BT] Bleak scan error: {e}")
        bt = []

    # Loop-invariant lookups: database, saved floors and webhook settings.
    db = app.config.get("DATABASE")
    saved_floors = db.get_all_device_floors() if db else {}
    ha_webhooks = app.config.get("HA_WEBHOOKS")
    scanner_identity = app.config.get("SCANNER_IDENTITY", {})
    webhooks_enabled = bool(ha_webhooks and ha_webhooks.config.enabled)

    # This quick scan does no device identification, so device_type and
    # manufacturer are reported with placeholder values.
    timestamp = datetime.now().isoformat()
    response_data = {
        "timestamp": timestamp,
        "bluetooth_devices": []
    }
    for dev in bt:
        rssi = dev['rssi']
        addr = dev['address']
        name = dev['name']
        device_class = dev.get('device_class', 'BLE')
        dist = estimate_distance(rssi, tx_power=-65)

        # Log distance changes for debugging
        prev_dist = _bt_previous_distances.get(addr)
        if prev_dist is not None:
            # Signed change, so the log shows whether the device moved
            # closer (-) or further away (+).
            delta = dist - prev_dist
            if abs(delta) > 0.3:  # Log if changed by more than 0.3m
                print(f"[BT] {name[:20]:<20} RSSI:{rssi:>4} dist:{dist:>5.1f}m (Δ{delta:+.1f}m)")
        else:
            print(f"[BT] {name[:20]:<20} RSSI:{rssi:>4} dist:{dist:>5.1f}m (new)")
        _bt_previous_distances[addr] = dist

        # Record to database for historical tracking
        if db:
            # Must be checked before recording: afterwards the device exists.
            is_new_device = db.get_device(addr) is None
            db.record_bluetooth_observation(
                address=addr,
                name=name,
                rssi=rssi,
                distance_m=dist,
                device_class=device_class,
                device_type="Unknown",
                manufacturer="",
                floor=None,
                scan_id=None  # Live tracking, no scan_id
            )
            # Reset departure notification flag (device is present)
            db.reset_departure_notified(addr)
            # Send new device webhook to HA
            if webhooks_enabled and is_new_device:
                ha_webhooks.send_new_device(
                    device_id=addr,
                    name=name,
                    device_type="bluetooth",
                    scanner=scanner_identity,
                    rssi=rssi,
                    distance_m=dist
                )

        response_data["bluetooth_devices"].append({
            "address": addr,
            "name": name,
            "rssi": rssi,
            "device_class": device_class,
            "device_type": "Unknown",
            "manufacturer": "",
            "estimated_distance_m": round(dist, 2),
            "signal_quality": "Fair",
            "floor": saved_floors.get(addr),  # Saved floor from database, if any
            "height_m": None
        })

    # Send scan results to Home Assistant
    if webhooks_enabled and response_data["bluetooth_devices"]:
        ha_webhooks.send_scan_results(
            devices=[{
                "id": d["address"],
                "name": d["name"],
                "rssi": d["rssi"],
                "distance": d["estimated_distance_m"],
                "floor": d.get("floor")
            } for d in response_data["bluetooth_devices"]],
            scanner=scanner_identity,
            scan_type="bluetooth"
        )
    return jsonify(response_data)
# ==================== Historical Data API ====================
@app.route("/api/history/devices")
def api_history_devices():
    """Get all tracked devices with statistics.

    Query parameters: ``type`` (``wifi`` or ``bluetooth``), ``since`` (ISO
    timestamp) and ``limit`` (integer, default 100; a non-integer value
    falls back to the default).  Returns 503 when no database is configured.
    """
    db = app.config.get("DATABASE")
    if not db:
        return jsonify({"error": "Database not enabled"}), 503
    device_type = request.args.get("type")  # 'wifi' or 'bluetooth'
    since = request.args.get("since")  # ISO timestamp
    # type=int makes a malformed value fall back to the default instead of
    # raising ValueError (which surfaced as a 500).
    limit = request.args.get("limit", 100, type=int)
    devices = db.get_all_devices(device_type=device_type, since=since, limit=limit)
    return jsonify({"devices": devices, "count": len(devices)})
@app.route("/api/history/devices/<device_id>")
def api_history_device(device_id: str):
    """Return one device's stored record together with its aggregate statistics.

    Responds 503 without a database and 404 for an unknown device.  When no
    statistics exist, every figure is ``None`` and ``total_observations`` is 0.
    """
    database = app.config.get("DATABASE")
    if not database:
        return jsonify({"error": "Database not enabled"}), 503

    record = database.get_device(device_id)
    if not record:
        return jsonify({"error": "Device not found"}), 404

    stats = database.get_device_stats(device_id)
    nullable_fields = (
        "avg_rssi",
        "min_rssi",
        "max_rssi",
        "avg_distance_m",
        "min_distance_m",
        "max_distance_m",
    )
    summary = {
        field: (getattr(stats, field) if stats else None)
        for field in nullable_fields
    }
    summary["total_observations"] = stats.total_observations if stats else 0
    return jsonify({"device": record, "stats": summary})
@app.route("/api/history/devices/<device_id>/rssi")
def api_history_device_rssi(device_id: str):
    """Get RSSI history for a device (time series).

    Query parameters: ``since`` and ``limit`` (integer, default 1000; a
    non-integer value falls back to the default).  Returns 503 when no
    database is configured.
    """
    db = app.config.get("DATABASE")
    if not db:
        return jsonify({"error": "Database not enabled"}), 503
    since = request.args.get("since")
    # type=int: malformed input uses the default rather than causing a 500.
    limit = request.args.get("limit", 1000, type=int)
    history = db.get_device_rssi_history(device_id, since=since, limit=limit)
    return jsonify({
        "device_id": device_id,
        "observations": [
            {
                "timestamp": obs.timestamp,
                "rssi": obs.rssi,
                "distance_m": obs.distance_m,
                "floor": obs.floor
            }
            for obs in history
        ],
        "count": len(history)
    })
@app.route("/api/history/devices/<device_id>/activity")
def api_history_device_activity(device_id: str):
    """Get activity patterns for a device (hourly/daily).

    Query parameter ``days`` (integer, default 7; a non-integer value falls
    back to the default).  Returns 503 when no database is configured.
    """
    db = app.config.get("DATABASE")
    if not db:
        return jsonify({"error": "Database not enabled"}), 503
    # type=int: malformed input uses the default rather than causing a 500.
    days = request.args.get("days", 7, type=int)
    pattern = db.get_device_activity_pattern(device_id, days=days)
    return jsonify(pattern)
@app.route("/api/history/devices/<device_id>/label", methods=["POST"])
def api_history_device_label(device_id: str):
    """Store a custom label for a device (empty string when none is given).

    Responds 503 when no database is configured.
    """
    database = app.config.get("DATABASE")
    if not database:
        return jsonify({"error": "Database not enabled"}), 503
    payload = request.get_json() or {}
    new_label = payload.get("label", "")
    database.set_device_label(device_id, new_label)
    response = {"status": "updated", "device_id": device_id, "label": new_label}
    return jsonify(response)
@app.route("/api/history/devices/<device_id>/favorite", methods=["POST"])
def api_history_device_favorite(device_id: str):
    """Set the favorite status for a device.

    Expects a JSON object with an optional ``favorite`` key; the value is
    stored as given and defaults to ``True`` when the key is absent (the
    current state is not read, so this sets rather than toggles).
    Responds with 503 when no database is configured and 400 when the
    JSON body is not an object.
    """
    db = app.config.get("DATABASE")
    if not db:
        return jsonify({"error": "Database not enabled"}), 503

    data = request.get_json() or {}
    # A JSON array/string/number has no .get(); reject it explicitly
    # instead of letting an AttributeError turn into a 500.
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    is_favorite = data.get("favorite", True)
    db.set_device_favorite(device_id, is_favorite)
    return jsonify({"status": "updated", "device_id": device_id, "favorite": is_favorite})
@app.route("/api/history/movement")
def api_history_movement():
    """Get movement events.

    Query parameters:
        device_id: Optional device filter, passed through to the database.
        since: Optional lower bound, passed through to the database.
        limit: Maximum number of events (default 100). A value that is not
            a valid integer falls back to the default instead of failing
            the request.

    Responds with 503 when no database is configured.
    """
    db = app.config.get("DATABASE")
    if not db:
        return jsonify({"error": "Database not enabled"}), 503

    device_id = request.args.get("device_id")
    since = request.args.get("since")
    # type=int returns the default on a conversion error rather than raising
    limit = request.args.get("limit", 100, type=int)

    events = db.get_movement_events(device_id=device_id, since=since, limit=limit)
    return jsonify({"events": events, "count": len(events)})
@app.route("/api/history/alerts")
def api_history_alerts():
    """Get alerts (new devices, absences, etc.).

    Query parameters:
        acknowledged: Optional filter. When present, only the literal
            ``true`` (any letter case) is treated as True; every other
            value is treated as False. When absent, no filter is applied.
        type: Optional alert type filter, passed through to the database.
        limit: Maximum number of alerts (default 50). A value that is not
            a valid integer falls back to the default instead of failing
            the request.

    Responds with 503 when no database is configured.
    """
    db = app.config.get("DATABASE")
    if not db:
        return jsonify({"error": "Database not enabled"}), 503

    acknowledged = request.args.get("acknowledged")
    if acknowledged is not None:
        acknowledged = acknowledged.lower() == "true"

    alert_type = request.args.get("type")
    # type=int returns the default on a conversion error rather than raising
    limit = request.args.get("limit", 50, type=int)

    alerts = db.get_alerts(acknowledged=acknowledged, alert_type=alert_type, limit=limit)
    return jsonify({"alerts": alerts, "count": len(alerts)})
@app.route("/api/history/alerts/<int:alert_id>/acknowledge", methods=["POST"])
def api_history_alert_acknowledge(alert_id: int):
    """Mark the alert identified by ``alert_id`` as acknowledged.

    Responds with 503 when no database is configured; otherwise echoes
    the alert id back with an ``acknowledged`` status.
    """
    database = app.config.get("DATABASE")
    if not database:
        return jsonify({"error": "Database not enabled"}), 503

    database.acknowledge_alert(alert_id)
    response = {"status": "acknowledged", "alert_id": alert_id}
    return jsonify(response)
@app.route("/api/history/activity")
def api_history_activity():
    """Get recent activity summary.

    Query parameters:
        hours: Size of the look-back window in hours (default 24). A value
            that is not a valid integer falls back to the default instead
            of failing the request.

    Responds with 503 when no database is configured.
    """
    db = app.config.get("DATABASE")
    if not db:
        return jsonify({"error": "Database not enabled"}), 503

    # type=int returns the default on a conversion error rather than raising
    hours = request.args.get("hours", 24, type=int)
    activity = db.get_recent_activity(hours=hours)
    return jsonify(activity)
@app.route("/api/history/stats")
def api_history_stats():
    """Return the statistics reported by the database layer as JSON.

    Responds with 503 when no database is configured.
    """
    database = app.config.get("DATABASE")
    if not database:
        return jsonify({"error": "Database not enabled"}), 503

    return jsonify(database.get_database_stats())
@app.route("/api/history/cleanup", methods=["POST"])
def api_history_cleanup():
    """Manually trigger data cleanup.

    Expects a JSON object with an optional ``retention_days`` key; when
    the key is absent the configured ``database.retention_days`` is used.
    Responds with 503 when no database is configured and 400 when the
    JSON body is not an object.
    """
    db = app.config.get("DATABASE")
    if not db:
        return jsonify({"error": "Database not enabled"}), 503

    rf_config = app.config["RF_CONFIG"]
    data = request.get_json() or {}
    # A JSON array/string/number has no .get(); reject it explicitly
    # instead of letting an AttributeError turn into a 500.
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    # NOTE(review): retention_days is forwarded without type or range
    # checks; confirm cleanup_old_data() rejects negative/non-numeric input.
    retention_days = data.get("retention_days", rf_config.database.retention_days)
    result = db.cleanup_old_data(retention_days)
    return jsonify(result)
# ==================== Peer Sync API ====================
@app.route("/api/peers", methods=["GET"])
def api_get_peers():
    """Report this scanner's identity, the stored peers and the sync status.

    ``sync_status`` is ``None`` when no peer-sync component is configured.
    Responds with 503 when no database is configured.
    """
    database = app.config.get("DATABASE")
    if not database:
        return jsonify({"error": "Database not enabled"}), 503

    known_peers = database.get_peers()
    sync = app.config.get("PEER_SYNC")

    body = {
        "this_scanner": app.config["SCANNER_IDENTITY"],
        "peers": known_peers,
        "sync_status": sync.get_status() if sync else None,
    }
    return jsonify(body)
@app.route("/api/peers/register", methods=["POST"])
def api_register_peer():
    """Register a peer scanner (called by other scanners).

    Expects a JSON object with required ``id`` and ``url`` keys and
    optional ``name`` (defaults to the id), ``floor``, ``latitude`` and
    ``longitude``. When the peer is newly registered and peer sync is
    configured, this scanner also attempts to register itself with the
    peer; a failure there is logged and does not fail the request.

    Responds with 403 when registrations are disabled, 503 when no
    database is configured, and 400 when ``id`` or ``url`` is missing
    (which includes a JSON body that is not an object).
    """
    rf_config = app.config["RF_CONFIG"]
    if not rf_config.scanner.accept_registrations:
        return jsonify({"error": "Registration disabled on this scanner"}), 403

    db = app.config.get("DATABASE")
    if not db:
        return jsonify({"error": "Database not enabled"}), 503

    data = request.get_json() or {}
    if not isinstance(data, dict):
        # A JSON array/string/number cannot carry the required fields;
        # treat it as empty so the request gets the 400 below rather than
        # an AttributeError on .get().
        data = {}

    # Validate required fields
    peer_id = data.get("id")
    peer_url = data.get("url")
    if not peer_id or not peer_url:
        return jsonify({"error": "Missing required fields: id, url"}), 400

    # Register the peer
    is_new = db.register_peer(
        scanner_id=peer_id,
        name=data.get("name", peer_id),
        url=peer_url,
        floor=data.get("floor"),
        latitude=data.get("latitude"),
        longitude=data.get("longitude"),
    )

    action = "registered" if is_new else "updated"
    print(f"[Sync] Peer {action}: {peer_id} at {peer_url}")

    # Auto-register back with the peer (mutual registration)
    # NOTE(review): peer_url comes straight from the request body and is
    # handed to register_with_peer(); confirm that helper restricts the
    # schemes/hosts it will contact.
    peer_sync = app.config.get("PEER_SYNC")
    if peer_sync and is_new:
        try:
            peer_sync.register_with_peer(peer_url)
            print(f"[Sync] Registered back with peer {peer_id}")
        except Exception as e:
            # Best effort: the inbound registration already succeeded
            print(f"[Sync] Failed to register back with {peer_id}: {e}")

    return jsonify({
        "status": action,
        "this_scanner": app.config["SCANNER_IDENTITY"],
        "known_peers": db.get_peers(),
    })
@app.route("/api/peers/<scanner_id>", methods=["DELETE"])
def api_remove_peer(scanner_id: str):
    """Delete the peer identified by ``scanner_id``.

    Responds with 503 when no database is configured and 404 when the
    database reports that nothing was removed.
    """
    database = app.config.get("DATABASE")
    if not database:
        return jsonify({"error": "Database not enabled"}), 503

    if not database.remove_peer(scanner_id):
        return jsonify({"error": "Peer not found"}), 404

    print(f"[Sync] Peer removed: {scanner_id}")
    return jsonify({"status": "removed", "scanner_id": scanner_id})
@app.route("/api/sync/devices", methods=["GET"])
def api_sync_devices_get():
    """Serve device records to a peer that is pulling from this scanner.

    The optional ``since`` query parameter is forwarded to the database
    unchanged. The response carries this scanner's id and the current
    local time in ISO format alongside the devices. Responds with 503
    when no database is configured.
    """
    database = app.config.get("DATABASE")
    if not database:
        return jsonify({"error": "Database not enabled"}), 503

    changed = database.get_devices_since(request.args.get("since"))
    scanner_id = app.config["SCANNER_IDENTITY"]["id"]
    generated_at = datetime.now().isoformat()

    return jsonify({
        "scanner_id": scanner_id,
        "timestamp": generated_at,
        "devices": changed,
    })
@app.route("/api/sync/devices", methods=["POST"])
def api_sync_devices_post():
    """Receive device updates from a peer.

    Expects a JSON object with an optional ``devices`` list (default
    empty) and an optional ``source_scanner`` id (default ``"unknown"``).
    Responds with 503 when no database is configured and 400 when the
    body is not a JSON object or ``devices`` is not a list.
    """
    db = app.config.get("DATABASE")
    if not db:
        return jsonify({"error": "Database not enabled"}), 503

    data = request.get_json() or {}
    # This endpoint is fed by remote peers: check the payload shape up
    # front so a malformed body yields a 400 instead of an unhandled error.
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    devices = data.get("devices", [])
    if not isinstance(devices, list):
        return jsonify({"error": "Field 'devices' must be a list"}), 400

    source_scanner = data.get("source_scanner", "unknown")

    updated = db.bulk_update_devices(devices, source_scanner)
    return jsonify({
        "status": "synced",
        "updated": updated,
        "received": len(devices),
    })
@app.route("/api/sync/trigger", methods=["POST"])
def api_sync_trigger():
    """Run one pull-then-push sync against every stored peer.

    Each peer is handled independently: an exception raised while syncing
    one peer is recorded in that peer's result entry and the loop moves
    on. With no database configured the peer list is empty. Responds with
    503 when peer sync is not configured.
    """
    sync = app.config.get("PEER_SYNC")
    if not sync:
        return jsonify({"error": "Peer sync not enabled"}), 503

    database = app.config.get("DATABASE")
    known_peers = database.get_peers() if database else []

    outcomes = []
    for entry in known_peers:
        peer_id = entry["scanner_id"]
        peer_url = entry["url"]
        try:
            # Pull the peer's devices first, then push ours to it
            updated_count = sync.sync_devices_from_peer(peer_url)
            sync.push_devices_to_peer(peer_url)
        except Exception as exc:
            outcome = {
                "peer_id": peer_id,
                "status": "error",
                "error": str(exc),
            }
        else:
            outcome = {
                "peer_id": peer_id,
                "status": "success",
                "devices_updated": updated_count,
            }
        outcomes.append(outcome)

    return jsonify({"status": "completed", "results": outcomes})
return app
def run_server(
    host: str | None = None,
    port: int | None = None,
    debug: bool | None = None,
    config: Config | None = None,
    profile_requests: bool = False,
    log_requests: bool = False
):
    """Create the app, print a startup banner and run Flask's built-in server.

    Args:
        host: Bind address; a falsy value selects ``config.web.host``.
        port: Bind port; a falsy value selects ``config.web.port``.
        debug: Debug flag; ``None`` selects ``config.web.debug``.
        config: Configuration to use; ``None`` loads it via ``get_config()``.
        profile_requests: Install the request profiler middleware, writing
            under ``<data dir>/profiles``.
        log_requests: Install the request logging middleware, writing
            under ``<data dir>/logs``.
    """
    if config is None:
        config = get_config()

    # Explicit arguments win; configuration fills in whatever was left out
    if not host:
        host = config.web.host
    if not port:
        port = config.web.port
    if debug is None:
        debug = config.web.debug

    app = create_app(config)

    # Optional middleware, imported lazily so profiling is only loaded on demand
    if profile_requests:
        from ..profiling import add_profiler_middleware
        add_profiler_middleware(app, config.get_data_dir() / "profiles")

    if log_requests:
        from ..profiling import add_request_logging_middleware
        add_request_logging_middleware(app, config.get_data_dir() / "logs")

    identity = config.get_scanner_identity()
    rule = "=" * 60

    print(f"\n{rule}")
    print("RF Mapper Web Interface")
    print(rule)
    print(f"Scanner ID: {identity['id']}")
    print(f"Scanner Name: {identity['name']}")
    print(f"Scanner Floor: {identity['floor']}")
    print(f"Config file: {config._config_path}")
    print(f"GPS Position: {identity['latitude']}, {identity['longitude']}")
    print(f"Data directory: {config.get_data_dir()}")

    if config.auto_scan.enabled:
        print(f"Auto-scan: ENABLED (every {config.auto_scan.interval_minutes} min)")
    else:
        print("Auto-scan: disabled")

    if profile_requests:
        print("Request profiling: ENABLED")
        print(f"Profile output: {config.get_data_dir() / 'profiles'}")

    if log_requests:
        print("Request logging: ENABLED")
        print(f"Log output: {config.get_data_dir() / 'logs'}")

    print(rule)
    print(f"Server running at: http://{host}:{port}")
    print(f"Local access: http://localhost:{port}")
    print(f"Network access: http://<your-ip>:{port}")
    print(f"{rule}\n")

    app.run(host=host, port=port, debug=debug)