diff --git a/config.yaml b/config.yaml index 3c68797..0943220 100644 --- a/config.yaml +++ b/config.yaml @@ -1,70 +1,55 @@ -# RF Mapper Configuration -# ======================== - -# GPS Reference Position -# Set this to your Home Assistant or scanner location gps: - # Belgium (Brussels area) - Update with your actual coordinates - latitude: 50.8585853 - longitude: 4.3978724 - # You can get precise coordinates from Home Assistant at: - # http://192.168.129.10:8123/config/zone - -# Web Server Settings + latitude: 50.85852913116932 + longitude: 4.3976058563598315 web: - host: "0.0.0.0" + host: 0.0.0.0 port: 5000 debug: false - -# Scanner Settings scanner: - # WiFi interface name - wifi_interface: "wlan0" - # Bluetooth scan timeout in seconds + id: '' + name: '' + latitude: null + longitude: null + floor: null + wifi_interface: wlan0 bt_scan_timeout: 10 - # Path loss exponent for distance estimation - # 2.0 = free space, 2.5 = light indoor, 3.0-4.0 = walls path_loss_exponent: 2.5 - # Automatically identify Bluetooth devices (queries device services) - auto_identify_bluetooth: true - -# Data Storage + wifi_tx_power: -59 + bt_tx_power: -72 + # Peer sync settings + peers: [] # List of peer scanner URLs (auto-populated via registration) + sync_interval_seconds: 30 # How often to sync with peers (0 = disabled) + accept_registrations: true # Allow other scanners to register with this one data: - # Directory for scan data (relative to project root or absolute path) - directory: "data" - # Maximum number of scans to keep (0 = unlimited) + directory: data max_scans: 100 - -# SQLite Database for Device History database: - # Enable historical tracking enabled: true - # Database filename (stored in data directory) - filename: "devices.db" - # Data retention period in days (auto-cleanup older data) + filename: devices.db retention_days: 30 - # Enable automatic daily cleanup auto_cleanup: true - -# Home Assistant Integration (optional) home_assistant: - enabled: false - url: "http://192.168.129.10:8123" - # Token can be set here or via HA_TOKEN environment variable - # Generate at: http://192.168.129.10:8123/profile -> Long-Lived Access Tokens - token: "" - -# Building Configuration (for 3D map view) -building: - # Enable 3D building visualization enabled: true - # Building name (displayed in UI) - name: "Home" - # Number of floors in the building + url: http://192.168.129.10:8123 + token: '' + webhook_scan: rf_mapper_scan + webhook_new_device: rf_mapper_new_device + webhook_device_gone: rf_mapper_device_gone + device_timeout_minutes: 5 +profiling: + enabled: false + cpu: true + memory: false + output_dir: data/profiles + sort_by: cumtime +auto_scan: + enabled: false + interval_minutes: 5 + location_label: auto_scan +building: + enabled: true + name: Home floors: 12 - # Height of each floor in meters floor_height_m: 3.0 - # Ground floor number (0 in most countries, 1 in US) ground_floor_number: 0 - # Scanner's current floor (devices scanned will be assigned to this floor) current_floor: 11 diff --git a/src/rf_mapper/config.py b/src/rf_mapper/config.py index 00826a5..d69b509 100644 --- a/src/rf_mapper/config.py +++ b/src/rf_mapper/config.py @@ -22,6 +22,14 @@ class WebConfig: @dataclass class ScannerConfig: + # Identity fields (for multi-scanner support) + id: str = "" # Unique scanner ID (auto-generated from hostname if empty) + name: str = "" # Human-readable display name (optional) + latitude: float | None = None # Scanner position (falls back to gps.latitude) + longitude: float | None = None # Scanner position (falls back to gps.longitude) + floor: int | None = None # Scanner's floor (falls back to building.current_floor) + + # Scanning configuration wifi_interface: str = "wlan0" bt_scan_timeout: int = 10 path_loss_exponent: float = 2.5 @@ -29,6 +37,11 @@ class ScannerConfig: bt_tx_power: float = -72 # Calibrated TX power at 1m for Bluetooth (dBm) auto_identify_bluetooth: bool = True + # Peer sync configuration + peers: list = field(default_factory=list) # List of peer scanner URLs + sync_interval_seconds: int = 30 # How often to sync with peers + accept_registrations: bool = True # Allow other scanners to register + @dataclass class DataConfig: @@ -49,6 +62,10 @@ class HomeAssistantConfig: enabled: bool = False url: str = "http://192.168.129.10:8123" token: str = "" + webhook_scan: str = "rf_mapper_scan" + webhook_new_device: str = "rf_mapper_new_device" + webhook_device_gone: str = "rf_mapper_device_gone" + device_timeout_minutes: int = 5 @dataclass @@ -156,12 +173,23 @@ class Config: # Scanner if "scanner" in data: config.scanner = ScannerConfig( + # Identity fields + id=data["scanner"].get("id", config.scanner.id), + name=data["scanner"].get("name", config.scanner.name), + latitude=data["scanner"].get("latitude", config.scanner.latitude), + longitude=data["scanner"].get("longitude", config.scanner.longitude), + floor=data["scanner"].get("floor", config.scanner.floor), + # Scanning configuration wifi_interface=data["scanner"].get("wifi_interface", config.scanner.wifi_interface), bt_scan_timeout=data["scanner"].get("bt_scan_timeout", config.scanner.bt_scan_timeout), path_loss_exponent=data["scanner"].get("path_loss_exponent", config.scanner.path_loss_exponent), wifi_tx_power=data["scanner"].get("wifi_tx_power", config.scanner.wifi_tx_power), bt_tx_power=data["scanner"].get("bt_tx_power", config.scanner.bt_tx_power), - auto_identify_bluetooth=data["scanner"].get("auto_identify_bluetooth", config.scanner.auto_identify_bluetooth) + auto_identify_bluetooth=data["scanner"].get("auto_identify_bluetooth", config.scanner.auto_identify_bluetooth), + # Peer sync configuration + peers=data["scanner"].get("peers", config.scanner.peers), + sync_interval_seconds=data["scanner"].get("sync_interval_seconds", config.scanner.sync_interval_seconds), + accept_registrations=data["scanner"].get("accept_registrations", config.scanner.accept_registrations) ) # Data @@ -185,7 +213,11 @@ class Config: config.home_assistant = HomeAssistantConfig( enabled=data["home_assistant"].get("enabled", config.home_assistant.enabled), url=data["home_assistant"].get("url", config.home_assistant.url), - token=data["home_assistant"].get("token", config.home_assistant.token) + token=data["home_assistant"].get("token", config.home_assistant.token), + webhook_scan=data["home_assistant"].get("webhook_scan", config.home_assistant.webhook_scan), + webhook_new_device=data["home_assistant"].get("webhook_new_device", config.home_assistant.webhook_new_device), + webhook_device_gone=data["home_assistant"].get("webhook_device_gone", config.home_assistant.webhook_device_gone), + device_timeout_minutes=data["home_assistant"].get("device_timeout_minutes", config.home_assistant.device_timeout_minutes) ) # Profiling @@ -254,6 +286,27 @@ class Config: """Get the database file path""" return self.get_data_dir() / self.database.filename + def get_scanner_identity(self) -> dict: + """Get scanner identity dict for webhooks and multi-scanner support. + + Returns dict with: + - id: Unique scanner ID (from config or auto-generated from hostname) + - name: Display name (from config or same as id) + - latitude: Scanner position (from scanner config or gps config) + - longitude: Scanner position (from scanner config or gps config) + - floor: Scanner's floor (from scanner config or building config) + """ + import socket + + scanner_id = self.scanner.id or socket.gethostname() + return { + "id": scanner_id, + "name": self.scanner.name or scanner_id, + "latitude": self.scanner.latitude if self.scanner.latitude is not None else self.gps.latitude, + "longitude": self.scanner.longitude if self.scanner.longitude is not None else self.gps.longitude, + "floor": self.scanner.floor if self.scanner.floor is not None else self.building.current_floor + } + def save(self, path: Path | None = None): """Save current configuration to file""" save_path = path or self._config_path @@ -271,11 +324,22 @@ class Config: "debug": self.web.debug }, "scanner": { + # Identity fields + "id": self.scanner.id, + "name": self.scanner.name, + "latitude": self.scanner.latitude, + "longitude": self.scanner.longitude, + "floor": self.scanner.floor, + # Scanning configuration "wifi_interface": self.scanner.wifi_interface, "bt_scan_timeout": self.scanner.bt_scan_timeout, "path_loss_exponent": self.scanner.path_loss_exponent, "wifi_tx_power": self.scanner.wifi_tx_power, - "bt_tx_power": self.scanner.bt_tx_power + "bt_tx_power": self.scanner.bt_tx_power, + # Peer sync configuration + "peers": self.scanner.peers, + "sync_interval_seconds": self.scanner.sync_interval_seconds, + "accept_registrations": self.scanner.accept_registrations }, "data": { "directory": self.data.directory, @@ -290,7 +354,11 @@ class Config: "home_assistant": { "enabled": self.home_assistant.enabled, "url": self.home_assistant.url, - "token": self.home_assistant.token + "token": self.home_assistant.token, + "webhook_scan": self.home_assistant.webhook_scan, + "webhook_new_device": self.home_assistant.webhook_new_device, + "webhook_device_gone": self.home_assistant.webhook_device_gone, + "device_timeout_minutes": self.home_assistant.device_timeout_minutes }, "profiling": { "enabled": self.profiling.enabled, diff --git a/src/rf_mapper/database.py b/src/rf_mapper/database.py index 5e541e3..262a3f7 100644 --- a/src/rf_mapper/database.py +++ b/src/rf_mapper/database.py @@ -167,26 +167,102 @@ class DeviceDatabase: except sqlite3.OperationalError: pass # Column already exists + # Add custom position columns for manual position override (migration) + try: + cursor.execute("ALTER TABLE devices ADD COLUMN custom_lat_offset REAL") + except sqlite3.OperationalError: + pass # Column already exists + + try: + cursor.execute("ALTER TABLE devices ADD COLUMN custom_lon_offset REAL") + except sqlite3.OperationalError: + pass # Column already exists + + # Add departure_notified column for HA integration (migration) + try: + cursor.execute("ALTER TABLE devices ADD COLUMN departure_notified INTEGER DEFAULT 0") + except sqlite3.OperationalError: + pass # Column already exists + + # Add scanner_id column to scans table for multi-scanner support (migration) + try: + cursor.execute("ALTER TABLE scans ADD COLUMN scanner_id TEXT") + except sqlite3.OperationalError: + pass # Column already exists + + # Add scanner_id column to rssi_history for multi-scanner support (migration) + try: + cursor.execute("ALTER TABLE rssi_history ADD COLUMN scanner_id TEXT") + except sqlite3.OperationalError: + pass # Column already exists + + # Peers table - known scanner peers for sync + cursor.execute(""" + CREATE TABLE IF NOT EXISTS peers ( + scanner_id TEXT PRIMARY KEY, + name TEXT, + url TEXT NOT NULL, + floor INTEGER, + latitude REAL, + longitude REAL, + last_seen TEXT, + registered_at TEXT + ) + """) + + # Add notes column to devices table if missing (for sync) + try: + cursor.execute("ALTER TABLE devices ADD COLUMN notes TEXT") + except sqlite3.OperationalError: + pass # Column already exists + conn.commit() def record_scan(self, scan_id: str, timestamp: str, location_label: str, - lat: float, lon: float, wifi_count: int, bt_count: int): - """Record a scan event""" + lat: float, lon: float, wifi_count: int, bt_count: int, + scanner_id: Optional[str] = None): + """Record a scan event + + Args: + scan_id: Unique identifier for this scan + timestamp: ISO timestamp of the scan + location_label: User-defined location label + lat: Latitude of scan location + lon: Longitude of scan location + wifi_count: Number of WiFi networks detected + bt_count: Number of Bluetooth devices detected + scanner_id: ID of the scanner that performed this scan + """ conn = self._get_connection() cursor = conn.cursor() cursor.execute(""" - INSERT OR REPLACE INTO scans (scan_id, timestamp, location_label, lat, lon, wifi_count, bt_count) - VALUES (?, ?, ?, ?, ?, ?, ?) - """, (scan_id, timestamp, location_label, lat, lon, wifi_count, bt_count)) + INSERT OR REPLACE INTO scans (scan_id, timestamp, location_label, lat, lon, wifi_count, bt_count, scanner_id) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, (scan_id, timestamp, location_label, lat, lon, wifi_count, bt_count, scanner_id)) conn.commit() def record_wifi_observation(self, bssid: str, ssid: str, rssi: int, distance_m: float, channel: int, frequency: int, encryption: str, manufacturer: str, floor: Optional[int] = None, - scan_id: Optional[str] = None): - """Record a WiFi network observation""" + scan_id: Optional[str] = None, + scanner_id: Optional[str] = None): + """Record a WiFi network observation + + Args: + bssid: MAC address of the WiFi network + ssid: Network name + rssi: Signal strength in dBm + distance_m: Estimated distance in meters + channel: WiFi channel + frequency: Frequency in MHz + encryption: Encryption type + manufacturer: Manufacturer from OUI lookup + floor: Floor where the device was detected + scan_id: ID of the scan this observation belongs to + scanner_id: ID of the scanner that detected this network + """ conn = self._get_connection() cursor = conn.cursor() timestamp = datetime.now().isoformat() @@ -209,9 +285,9 @@ class DeviceDatabase: # Insert RSSI observation cursor.execute(""" - INSERT INTO rssi_history (device_id, timestamp, rssi, distance_m, floor, scan_id) - VALUES (?, ?, ?, ?, ?, ?) - """, (bssid, timestamp, rssi, distance_m, floor, scan_id)) + INSERT INTO rssi_history (device_id, timestamp, rssi, distance_m, floor, scan_id, scanner_id) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, (bssid, timestamp, rssi, distance_m, floor, scan_id, scanner_id)) conn.commit() @@ -223,8 +299,22 @@ class DeviceDatabase: def record_bluetooth_observation(self, address: str, name: str, rssi: int, distance_m: float, device_class: str, device_type: str, manufacturer: str, - floor: Optional[int] = None, scan_id: Optional[str] = None): - """Record a Bluetooth device observation""" + floor: Optional[int] = None, scan_id: Optional[str] = None, + scanner_id: Optional[str] = None): + """Record a Bluetooth device observation + + Args: + address: MAC address of the Bluetooth device + name: Device name + rssi: Signal strength in dBm + distance_m: Estimated distance in meters + device_class: Bluetooth device class + device_type: Inferred device type (Phone, Headphones, etc.) + manufacturer: Manufacturer from OUI lookup + floor: Floor where the device was detected + scan_id: ID of the scan this observation belongs to + scanner_id: ID of the scanner that detected this device + """ conn = self._get_connection() cursor = conn.cursor() timestamp = datetime.now().isoformat() @@ -252,9 +342,9 @@ class DeviceDatabase: # Insert RSSI observation cursor.execute(""" - INSERT INTO rssi_history (device_id, timestamp, rssi, distance_m, floor, scan_id) - VALUES (?, ?, ?, ?, ?, ?) - """, (address, timestamp, rssi, distance_m, floor, scan_id)) + INSERT INTO rssi_history (device_id, timestamp, rssi, distance_m, floor, scan_id, scanner_id) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, (address, timestamp, rssi, distance_m, floor, scan_id, scanner_id)) conn.commit() @@ -498,6 +588,103 @@ class DeviceDatabase: cursor.execute("SELECT device_id, assigned_floor FROM devices WHERE assigned_floor IS NOT NULL") return {row['device_id']: row['assigned_floor'] for row in cursor.fetchall()} + def set_device_position(self, device_id: str, lat_offset: float, lon_offset: float): + """Set custom position offset for a device (relative to scanner position)""" + conn = self._get_connection() + cursor = conn.cursor() + + cursor.execute(""" + UPDATE devices SET custom_lat_offset = ?, custom_lon_offset = ?, updated_at = CURRENT_TIMESTAMP + WHERE device_id = ? + """, (lat_offset, lon_offset, device_id)) + conn.commit() + + def get_device_position(self, device_id: str) -> tuple | None: + """Get custom position offset for a device, returns (lat_offset, lon_offset) or None""" + conn = self._get_connection() + cursor = conn.cursor() + + cursor.execute( + "SELECT custom_lat_offset, custom_lon_offset FROM devices WHERE device_id = ?", + (device_id,) + ) + row = cursor.fetchone() + if row and row['custom_lat_offset'] is not None and row['custom_lon_offset'] is not None: + return (row['custom_lat_offset'], row['custom_lon_offset']) + return None + + def get_all_device_positions(self) -> dict: + """Get all device position offsets as a dict: {device_id: {lat_offset, lon_offset}}""" + conn = self._get_connection() + cursor = conn.cursor() + + cursor.execute(""" + SELECT device_id, custom_lat_offset, custom_lon_offset + FROM devices + WHERE custom_lat_offset IS NOT NULL AND custom_lon_offset IS NOT NULL + """) + return { + row['device_id']: { + 'lat_offset': row['custom_lat_offset'], + 'lon_offset': row['custom_lon_offset'] + } + for row in cursor.fetchall() + } + + def clear_device_position(self, device_id: str): + """Clear custom position for a device (reset to RSSI-based)""" + conn = self._get_connection() + cursor = conn.cursor() + + cursor.execute(""" + UPDATE devices SET custom_lat_offset = NULL, custom_lon_offset = NULL, updated_at = CURRENT_TIMESTAMP + WHERE device_id = ? + """, (device_id,)) + conn.commit() + + def get_recently_departed(self, timeout_minutes: int) -> list[dict]: + """Get devices not seen within timeout that haven't been notified. + + Args: + timeout_minutes: Minutes since last_seen to consider departed + + Returns: + List of device dicts that have departed but not yet notified + """ + conn = self._get_connection() + cursor = conn.cursor() + + cutoff = (datetime.now() - timedelta(minutes=timeout_minutes)).isoformat() + cursor.execute(""" + SELECT device_id, device_type, name, ssid, manufacturer, last_seen + FROM devices + WHERE last_seen < ? AND (departure_notified = 0 OR departure_notified IS NULL) + """, (cutoff,)) + + return [dict(row) for row in cursor.fetchall()] + + def mark_departure_notified(self, device_id: str): + """Mark device as notified about departure.""" + conn = self._get_connection() + cursor = conn.cursor() + + cursor.execute(""" + UPDATE devices SET departure_notified = 1, updated_at = CURRENT_TIMESTAMP + WHERE device_id = ? + """, (device_id,)) + conn.commit() + + def reset_departure_notified(self, device_id: str): + """Reset departure notification flag when device returns.""" + conn = self._get_connection() + cursor = conn.cursor() + + cursor.execute(""" + UPDATE devices SET departure_notified = 0, updated_at = CURRENT_TIMESTAMP + WHERE device_id = ? + """, (device_id,)) + conn.commit() + def get_recent_activity(self, hours: int = 24) -> dict: """Get activity summary for the last N hours""" conn = self._get_connection() @@ -650,6 +837,208 @@ class DeviceDatabase: "database_size_mb": round(db_size / 1024 / 1024, 2) } + # ==================== Peer Sync Methods ==================== + + def register_peer(self, scanner_id: str, name: str, url: str, + floor: Optional[int] = None, latitude: Optional[float] = None, + longitude: Optional[float] = None) -> bool: + """Register a peer scanner. + + Args: + scanner_id: Unique identifier for the peer scanner + name: Human-readable name + url: Base URL of the peer (e.g., http://192.168.129.9:5000) + floor: Floor where peer scanner is located + latitude: GPS latitude of peer + longitude: GPS longitude of peer + + Returns: + True if newly registered, False if updated existing + """ + conn = self._get_connection() + cursor = conn.cursor() + timestamp = datetime.now().isoformat() + + # Check if peer already exists + cursor.execute("SELECT scanner_id FROM peers WHERE scanner_id = ?", (scanner_id,)) + exists = cursor.fetchone() is not None + + cursor.execute(""" + INSERT INTO peers (scanner_id, name, url, floor, latitude, longitude, last_seen, registered_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(scanner_id) DO UPDATE SET + name = excluded.name, + url = excluded.url, + floor = excluded.floor, + latitude = excluded.latitude, + longitude = excluded.longitude, + last_seen = excluded.last_seen + """, (scanner_id, name, url, floor, latitude, longitude, timestamp, timestamp)) + + conn.commit() + return not exists + + def get_peers(self) -> list[dict]: + """Get all registered peers. + + Returns: + List of peer dictionaries with scanner_id, name, url, floor, etc. + """ + conn = self._get_connection() + cursor = conn.cursor() + + cursor.execute("SELECT * FROM peers ORDER BY registered_at") + return [dict(row) for row in cursor.fetchall()] + + def get_peer(self, scanner_id: str) -> Optional[dict]: + """Get a specific peer by scanner_id.""" + conn = self._get_connection() + cursor = conn.cursor() + + cursor.execute("SELECT * FROM peers WHERE scanner_id = ?", (scanner_id,)) + row = cursor.fetchone() + return dict(row) if row else None + + def remove_peer(self, scanner_id: str) -> bool: + """Remove a peer scanner. + + Returns: + True if peer was removed, False if not found + """ + conn = self._get_connection() + cursor = conn.cursor() + + cursor.execute("DELETE FROM peers WHERE scanner_id = ?", (scanner_id,)) + conn.commit() + return cursor.rowcount > 0 + + def update_peer_last_seen(self, scanner_id: str): + """Update the last_seen timestamp for a peer.""" + conn = self._get_connection() + cursor = conn.cursor() + timestamp = datetime.now().isoformat() + + cursor.execute( + "UPDATE peers SET last_seen = ? WHERE scanner_id = ?", + (timestamp, scanner_id) + ) + conn.commit() + + def get_devices_since(self, since: Optional[str] = None) -> list[dict]: + """Get devices updated since a given timestamp for sync. + + Args: + since: ISO timestamp. If None, returns all devices with sync-relevant data. + + Returns: + List of device dicts with sync-relevant fields + """ + conn = self._get_connection() + cursor = conn.cursor() + + query = """ + SELECT device_id, device_type, name, ssid, manufacturer, + custom_label, assigned_floor, custom_lat_offset, custom_lon_offset, + is_favorite, notes, updated_at + FROM devices + WHERE (custom_label IS NOT NULL OR assigned_floor IS NOT NULL + OR custom_lat_offset IS NOT NULL OR is_favorite = 1 OR notes IS NOT NULL) + """ + params = [] + + if since: + query += " AND updated_at > ?" + params.append(since) + + query += " ORDER BY updated_at" + + cursor.execute(query, params) + return [dict(row) for row in cursor.fetchall()] + + def bulk_update_devices(self, devices: list[dict], source_scanner: str) -> int: + """Bulk update device metadata from peer sync. + + Uses timestamp-based conflict resolution: newer wins. + Only updates non-null fields from peer. + + Args: + devices: List of device dicts from peer + source_scanner: Scanner ID that sent the update + + Returns: + Number of devices updated + """ + conn = self._get_connection() + cursor = conn.cursor() + updated_count = 0 + + for dev in devices: + device_id = dev.get("device_id") + if not device_id: + continue + + # Get existing device + cursor.execute( + "SELECT updated_at FROM devices WHERE device_id = ?", + (device_id,) + ) + existing = cursor.fetchone() + + if existing: + # Check timestamp - skip if local is newer + local_updated = existing["updated_at"] or "" + peer_updated = dev.get("updated_at", "") + if local_updated > peer_updated: + continue # Local is newer, skip + + # Merge non-null fields from peer + updates = [] + params = [] + + if dev.get("custom_label") is not None: + updates.append("custom_label = ?") + params.append(dev["custom_label"]) + + if dev.get("assigned_floor") is not None: + updates.append("assigned_floor = ?") + params.append(dev["assigned_floor"]) + + if dev.get("custom_lat_offset") is not None: + updates.append("custom_lat_offset = ?") + params.append(dev["custom_lat_offset"]) + + if dev.get("custom_lon_offset") is not None: + updates.append("custom_lon_offset = ?") + params.append(dev["custom_lon_offset"]) + + if dev.get("is_favorite") is not None: + updates.append("is_favorite = ?") + params.append(1 if dev["is_favorite"] else 0) + + if dev.get("notes") is not None: + updates.append("notes = ?") + params.append(dev["notes"]) + + if updates: + # Keep the peer's updated_at to preserve timeline + updates.append("updated_at = ?") + params.append(peer_updated) + params.append(device_id) + + cursor.execute( + f"UPDATE devices SET {', '.join(updates)} WHERE device_id = ?", + params + ) + if cursor.rowcount > 0: + updated_count += 1 + else: + # Device doesn't exist locally - we can only sync metadata for + # devices we've seen, so skip unknown devices + pass + + conn.commit() + return updated_count + def close(self): """Close database connection""" if hasattr(self._local, 'conn') and self._local.conn: diff --git a/src/rf_mapper/sync.py b/src/rf_mapper/sync.py new file mode 100644 index 0000000..2c65b1e --- /dev/null +++ b/src/rf_mapper/sync.py @@ -0,0 +1,208 @@ +"""Peer synchronization for RF Mapper multi-scanner deployments""" + +import socket +import threading +import time +from datetime import datetime +from typing import Optional + +import requests + +from .config import Config +from .database import DeviceDatabase + + +def get_local_ip() -> str: + """Get the local IP address of this machine.""" + try: + # Create a socket and connect to an external address + # This doesn't actually send data, just determines the local IP + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.connect(("8.8.8.8", 80)) + ip = s.getsockname()[0] + s.close() + return ip + except Exception: + return "127.0.0.1" + + +class PeerSync: + """Manages peer discovery and device metadata synchronization. + + Handles: + - Registration with peer scanners + - Background sync thread for pulling/pushing device metadata + - Conflict resolution using timestamps + """ + + def __init__(self, config: Config, db: DeviceDatabase): + """Initialize peer sync manager. + + Args: + config: RF Mapper configuration + db: Device database instance + """ + self.config = config + self.db = db + self.scanner_identity = config.get_scanner_identity() + self._running = False + self._thread: Optional[threading.Thread] = None + self._last_sync: dict[str, str] = {} # peer_id -> last sync timestamp + + @property + def local_url(self) -> str: + """Get this scanner's URL for peer registration.""" + return f"http://{get_local_ip()}:{self.config.web.port}" + + def start(self): + """Start the background sync thread.""" + if self._thread and self._thread.is_alive(): + return # Already running + + self._running = True + self._thread = threading.Thread(target=self._sync_loop, daemon=True) + self._thread.start() + print(f"[Sync] Background sync started (interval: {self.config.scanner.sync_interval_seconds}s)") + + def stop(self): + """Stop the background sync thread.""" + self._running = False + if self._thread: + self._thread.join(timeout=5) + self._thread = None + print("[Sync] Background sync stopped") + + def register_with_peer(self, peer_url: str) -> dict: + """Register this scanner with a peer. + + Args: + peer_url: Base URL of the peer scanner + + Returns: + Response from peer containing peer info and known peers + + Raises: + requests.RequestException on network errors + """ + payload = { + "id": self.scanner_identity["id"], + "name": self.scanner_identity["name"], + "url": self.local_url, + "floor": self.scanner_identity["floor"], + "latitude": self.scanner_identity["latitude"], + "longitude": self.scanner_identity["longitude"] + } + + resp = requests.post( + f"{peer_url.rstrip('/')}/api/peers/register", + json=payload, + timeout=10 + ) + resp.raise_for_status() + return resp.json() + + def sync_devices_from_peer(self, peer_url: str, since: Optional[str] = None) -> int: + """Pull device updates from a peer. + + Args: + peer_url: Base URL of the peer scanner + since: ISO timestamp to get updates since (None = all) + + Returns: + Number of devices updated locally + """ + params = {"since": since} if since else {} + + resp = requests.get( + f"{peer_url.rstrip('/')}/api/sync/devices", + params=params, + timeout=15 + ) + resp.raise_for_status() + data = resp.json() + + devices = data.get("devices", []) + source_scanner = data.get("scanner_id", "unknown") + + updated = self.db.bulk_update_devices(devices, source_scanner) + return updated + + def push_devices_to_peer(self, peer_url: str, since: Optional[str] = None) -> dict: + """Push device updates to a peer. + + Args: + peer_url: Base URL of the peer scanner + since: ISO timestamp to send updates since (None = all) + + Returns: + Response from peer with sync status + """ + devices = self.db.get_devices_since(since) + + payload = { + "source_scanner": self.scanner_identity["id"], + "devices": devices + } + + resp = requests.post( + f"{peer_url.rstrip('/')}/api/sync/devices", + json=payload, + timeout=15 + ) + resp.raise_for_status() + return resp.json() + + def _sync_loop(self): + """Background sync loop - runs every sync_interval_seconds.""" + # Initial delay to let app fully start + time.sleep(5) + + while self._running: + peers = self.db.get_peers() + + for peer in peers: + peer_id = peer["scanner_id"] + peer_url = peer["url"] + + try: + # Pull updates from peer + since = self._last_sync.get(peer_id) + updated = self.sync_devices_from_peer(peer_url, since) + + # Push our updates to peer + self.push_devices_to_peer(peer_url, since) + + # Update last sync time and peer last_seen + self._last_sync[peer_id] = datetime.now().isoformat() + self.db.update_peer_last_seen(peer_id) + + if updated > 0: + print(f"[Sync] Synced with {peer_id}: updated {updated} devices") + + except requests.exceptions.ConnectionError: + print(f"[Sync] Peer {peer_id} unreachable at {peer_url}") + except requests.exceptions.Timeout: + print(f"[Sync] Sync with {peer_id} timed out") + except Exception as e: + print(f"[Sync] Error syncing with {peer_id}: {e}") + + # Wait for next sync interval + for _ in range(self.config.scanner.sync_interval_seconds): + if not self._running: + break + time.sleep(1) + + def get_status(self) -> dict: + """Get current sync status. + + Returns: + Dict with running state, peer count, last sync times + """ + peers = self.db.get_peers() + return { + "running": self._running, + "sync_interval_seconds": self.config.scanner.sync_interval_seconds, + "peer_count": len(peers), + "last_sync": self._last_sync.copy(), + "local_url": self.local_url + } diff --git a/src/rf_mapper/web/app.py b/src/rf_mapper/web/app.py index ae879ff..6790381 100644 --- a/src/rf_mapper/web/app.py +++ b/src/rf_mapper/web/app.py @@ -14,6 +14,7 @@ from ..distance import estimate_distance from ..config import Config, get_config from ..bluetooth_identify import identify_single_device, identify_device from ..database import DeviceDatabase, init_database, get_database +from ..homeassistant import HAWebhooks, HAWebhookConfig class AutoScanner: @@ -251,6 +252,67 @@ def create_app(config: Config | None = None) -> Flask: else: app.config["DATABASE"] = None + # Initialize peer sync if enabled + if config.database.enabled and config.scanner.sync_interval_seconds > 0: + from ..sync import PeerSync + peer_sync = PeerSync(config, db) + app.config["PEER_SYNC"] = peer_sync + peer_sync.start() + print(f"[Sync] Peer sync enabled (interval: {config.scanner.sync_interval_seconds}s)") + else: + app.config["PEER_SYNC"] = None + + # Initialize Home Assistant webhooks if enabled + ha_webhook_config = HAWebhookConfig( + enabled=config.home_assistant.enabled, + url=config.home_assistant.url, + webhook_scan=config.home_assistant.webhook_scan, + webhook_new_device=config.home_assistant.webhook_new_device, + webhook_device_gone=config.home_assistant.webhook_device_gone, + device_timeout_minutes=config.home_assistant.device_timeout_minutes + ) + ha_webhooks = HAWebhooks(ha_webhook_config) + app.config["HA_WEBHOOKS"] = ha_webhooks + + # Build and store scanner identity for webhooks + scanner_identity = config.get_scanner_identity() + app.config["SCANNER_IDENTITY"] = scanner_identity + + if config.home_assistant.enabled: + print(f"[Home Assistant] Webhooks enabled -> {config.home_assistant.url}") + print(f"[Scanner] ID: {scanner_identity['id']} @ floor {scanner_identity['floor']}") + + # Start absence checker thread if database is enabled + if config.database.enabled: + def absence_checker(): + """Background thread to detect departed devices.""" + while True: + time.sleep(60) # Check every minute + try: + db = app.config.get("DATABASE") + scanner_identity = app.config.get("SCANNER_IDENTITY", {}) + if db and ha_webhooks.config.enabled: + departed = db.get_recently_departed( + config.home_assistant.device_timeout_minutes + ) + for device in departed: + name = device.get("custom_label") or device.get("name") or device.get("ssid") or device["device_id"] + ha_webhooks.send_device_gone( + device_id=device["device_id"], + name=name, + last_seen=device["last_seen"], + device_type=device["device_type"], + last_scanner=scanner_identity + ) + db.mark_departure_notified(device["device_id"]) + print(f"[HA Webhook] Device departed: {name}") + except Exception as e: + print(f"[HA Webhook] Absence checker error: {e}") + + absence_thread = threading.Thread(target=absence_checker, daemon=True) + absence_thread.start() + print(f"[Home Assistant] Absence checker started (timeout: {config.home_assistant.device_timeout_minutes} min)") + # Start auto-scanner if enabled in config if config.auto_scan.enabled: auto_scanner.start( @@ -810,11 +872,66 @@ def create_app(config: Config | None = None) -> Flask: @app.route("/api/device/floors", methods=["GET"]) def api_device_floors(): - """Get all saved floor assignments""" + """Get all saved floor assignments and position offsets""" db = app.config.get("DATABASE") if not db: - return jsonify({}) - return jsonify(db.get_all_device_floors()) + return jsonify({"floors": {}, "positions": {}}) + + floors = db.get_all_device_floors() + positions = db.get_all_device_positions() + + return jsonify({ + "floors": floors, + "positions": positions + }) + + @app.route("/api/device//position", methods=["POST"]) + def api_device_position(device_id: str): + """Set manual position for a floor-assigned device""" + db = app.config.get("DATABASE") + if not db: + return jsonify({"error": "Database not enabled"}), 503 + + data = request.get_json() or {} + lat_offset = data.get("lat_offset") + lon_offset = data.get("lon_offset") + + # Check if device has an assigned floor (required for manual positioning) + device_floor = db.get_device_floor(device_id) + if device_floor is None: + return jsonify({ + "error": "Device must have an assigned floor before manual positioning" + }), 400 + + # If both offsets are None/null, clear the position + if lat_offset is None and lon_offset is None: + db.clear_device_position(device_id) + return jsonify({ + "status": "cleared", + "device_id": device_id, + "message": "Position reset to auto (RSSI-based)" + }) + + # Validate offsets + if lat_offset is None or lon_offset is None: + return jsonify({ + "error": "Both lat_offset and lon_offset are required" + }), 400 + + try: + lat_offset = float(lat_offset) + lon_offset = float(lon_offset) + except (TypeError, ValueError): + return jsonify({"error": "Invalid offset values"}), 400 + + db.set_device_position(device_id, lat_offset, lon_offset) + + return jsonify({ + "status": "updated", + "device_id": device_id, + "lat_offset": lat_offset, + "lon_offset": lon_offset + }) @app.route("/api/scan/bt", methods=["POST"]) def api_scan_bt(): @@ -881,6 +998,10 @@ def create_app(config: Config | None = None) -> Flask: # Record to database for historical tracking if db: + # Check if this is a new device (for HA webhook) + existing = db.get_device(addr) + is_new_device = existing is None + db.record_bluetooth_observation( address=addr, name=name, @@ -893,6 +1014,22 @@ def create_app(config: Config | None = None) -> Flask: scan_id=None # Live tracking, no scan_id ) + # Reset departure notification flag (device is present) + db.reset_departure_notified(addr) + + # Send new device webhook to HA + ha_webhooks = app.config.get("HA_WEBHOOKS") + scanner_identity = app.config.get("SCANNER_IDENTITY", {}) + if ha_webhooks and ha_webhooks.config.enabled and is_new_device: + ha_webhooks.send_new_device( + device_id=addr, + name=name, + device_type="bluetooth", + scanner=scanner_identity, + rssi=rssi, + distance_m=dist + ) + # Get saved floor from database saved_floor = saved_floors.get(addr) @@ -909,6 +1046,22 @@ def create_app(config: Config | None = None) -> Flask: "height_m": None }) + # Send scan results to Home Assistant + ha_webhooks = app.config.get("HA_WEBHOOKS") + scanner_identity = app.config.get("SCANNER_IDENTITY", {}) + if ha_webhooks and ha_webhooks.config.enabled and response_data["bluetooth_devices"]: + ha_webhooks.send_scan_results( + devices=[{ + "id": d["address"], + "name": d["name"], + "rssi": d["rssi"], + "distance": d["estimated_distance_m"], + "floor": d.get("floor") + } for d in response_data["bluetooth_devices"]], + scanner=scanner_identity, + scan_type="bluetooth" + ) + return jsonify(response_data) # ==================== Historical Data API ==================== @@ -1099,6 +1252,154 @@ def create_app(config: Config | None = None) -> Flask: result = db.cleanup_old_data(retention_days) return jsonify(result) + # ==================== Peer Sync API ==================== + + @app.route("/api/peers", methods=["GET"]) + def api_get_peers(): + """Get list of known peers and this scanner's identity""" + db = app.config.get("DATABASE") + if not db: + return jsonify({"error": "Database not enabled"}), 503 + + peers = db.get_peers() + peer_sync = app.config.get("PEER_SYNC") + + return jsonify({ + "this_scanner": app.config["SCANNER_IDENTITY"], + "peers": peers, + "sync_status": peer_sync.get_status() if peer_sync else None + }) + + @app.route("/api/peers/register", methods=["POST"]) + def api_register_peer(): + """Register a peer scanner (called by other scanners)""" + rf_config = app.config["RF_CONFIG"] + if not rf_config.scanner.accept_registrations: + return jsonify({"error": "Registration disabled on this scanner"}), 403 + + db = app.config.get("DATABASE") + if not db: + return jsonify({"error": "Database not enabled"}), 503 + + data = request.get_json() or {} + + # Validate required fields + peer_id = data.get("id") + peer_url = data.get("url") + if not peer_id or not peer_url: + return jsonify({"error": "Missing required fields: id, url"}), 400 + + # Register the peer + is_new = db.register_peer( + scanner_id=peer_id, + name=data.get("name", peer_id), + url=peer_url, + floor=data.get("floor"), + latitude=data.get("latitude"), + longitude=data.get("longitude") + ) + + action = "registered" if is_new else "updated" + print(f"[Sync] Peer {action}: {peer_id} at {peer_url}") + + # Auto-register back with the peer (mutual registration) + peer_sync = app.config.get("PEER_SYNC") + if peer_sync and is_new: + try: + peer_sync.register_with_peer(peer_url) + print(f"[Sync] Registered back with peer {peer_id}") + except Exception as e: + print(f"[Sync] Failed to register back with {peer_id}: {e}") + + return jsonify({ + "status": action, + "this_scanner": app.config["SCANNER_IDENTITY"], + "known_peers": db.get_peers() + }) + + @app.route("/api/peers/", methods=["DELETE"]) + def api_remove_peer(scanner_id: str): + """Remove a peer scanner""" + db = app.config.get("DATABASE") + if not db: + return jsonify({"error": "Database not enabled"}), 503 + + removed = db.remove_peer(scanner_id) + if removed: + print(f"[Sync] Peer removed: {scanner_id}") + return jsonify({"status": "removed", "scanner_id": scanner_id}) + else: + return jsonify({"error": "Peer not found"}), 404 + + @app.route("/api/sync/devices", methods=["GET"]) + def api_sync_devices_get(): + """Get devices for sync (called by peers)""" + db = app.config.get("DATABASE") + if not db: + return jsonify({"error": "Database not enabled"}), 503 + + since = request.args.get("since") + devices = db.get_devices_since(since) + + return jsonify({ + "scanner_id": app.config["SCANNER_IDENTITY"]["id"], + "timestamp": datetime.now().isoformat(), + "devices": devices + }) + + @app.route("/api/sync/devices", methods=["POST"]) + def api_sync_devices_post(): + """Receive device updates from a peer""" + db = app.config.get("DATABASE") + if not db: + return jsonify({"error": "Database not enabled"}), 503 + + data = request.get_json() or {} + devices = data.get("devices", []) + source_scanner = data.get("source_scanner", "unknown") + + updated = db.bulk_update_devices(devices, source_scanner) + + return jsonify({ + "status": "synced", + "updated": updated, + "received": len(devices) + }) + + @app.route("/api/sync/trigger", methods=["POST"]) + def api_sync_trigger(): + """Manually trigger a sync with all peers""" + peer_sync = app.config.get("PEER_SYNC") + if not peer_sync: + return jsonify({"error": "Peer sync not enabled"}), 503 + + db = app.config.get("DATABASE") + peers = db.get_peers() if db else [] + + results = [] + for peer in peers: + peer_id = peer["scanner_id"] + peer_url = peer["url"] + try: + updated = peer_sync.sync_devices_from_peer(peer_url) + peer_sync.push_devices_to_peer(peer_url) + results.append({ + "peer_id": peer_id, + "status": "success", + "devices_updated": updated + }) + except Exception as e: + results.append({ + "peer_id": peer_id, + "status": "error", + "error": str(e) + }) + + return jsonify({ + "status": "completed", + "results": results + }) + return app @@ -1133,11 +1434,16 @@ def run_server( log_dir = config.get_data_dir() / "logs" add_request_logging_middleware(app, log_dir) + scanner_identity = config.get_scanner_identity() + print(f"\n{'='*60}") print("RF Mapper Web Interface") print(f"{'='*60}") + print(f"Scanner ID: {scanner_identity['id']}") + print(f"Scanner Name: {scanner_identity['name']}") + print(f"Scanner Floor: {scanner_identity['floor']}") print(f"Config file: {config._config_path}") - print(f"GPS Position: {config.gps.latitude}, {config.gps.longitude}") + print(f"GPS Position: {scanner_identity['latitude']}, {scanner_identity['longitude']}") print(f"Data directory: {config.get_data_dir()}") if config.auto_scan.enabled: print(f"Auto-scan: ENABLED (every {config.auto_scan.interval_minutes} min)")