feat: add peer sync for multi-scanner deployments
Enable scanner instances to discover each other and synchronize device metadata (floors, positions, labels, favorites) automatically. New features: - Peer registration API with mutual auto-registration - Background sync thread with configurable interval - Timestamp-based conflict resolution (newest wins) - Config options: peers, sync_interval_seconds, accept_registrations API endpoints: - GET/POST /api/peers - list peers, register new peer - DELETE /api/peers/<id> - remove peer - GET/POST /api/sync/devices - device sync for peers - POST /api/sync/trigger - manual sync trigger Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
208
src/rf_mapper/sync.py
Normal file
208
src/rf_mapper/sync.py
Normal file
@@ -0,0 +1,208 @@
|
||||
"""Peer synchronization for RF Mapper multi-scanner deployments"""
|
||||
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
import requests
|
||||
|
||||
from .config import Config
|
||||
from .database import DeviceDatabase
|
||||
|
||||
|
||||
def get_local_ip() -> str:
|
||||
"""Get the local IP address of this machine."""
|
||||
try:
|
||||
# Create a socket and connect to an external address
|
||||
# This doesn't actually send data, just determines the local IP
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
s.connect(("8.8.8.8", 80))
|
||||
ip = s.getsockname()[0]
|
||||
s.close()
|
||||
return ip
|
||||
except Exception:
|
||||
return "127.0.0.1"
|
||||
|
||||
|
||||
class PeerSync:
|
||||
"""Manages peer discovery and device metadata synchronization.
|
||||
|
||||
Handles:
|
||||
- Registration with peer scanners
|
||||
- Background sync thread for pulling/pushing device metadata
|
||||
- Conflict resolution using timestamps
|
||||
"""
|
||||
|
||||
def __init__(self, config: Config, db: DeviceDatabase):
|
||||
"""Initialize peer sync manager.
|
||||
|
||||
Args:
|
||||
config: RF Mapper configuration
|
||||
db: Device database instance
|
||||
"""
|
||||
self.config = config
|
||||
self.db = db
|
||||
self.scanner_identity = config.get_scanner_identity()
|
||||
self._running = False
|
||||
self._thread: Optional[threading.Thread] = None
|
||||
self._last_sync: dict[str, str] = {} # peer_id -> last sync timestamp
|
||||
|
||||
@property
|
||||
def local_url(self) -> str:
|
||||
"""Get this scanner's URL for peer registration."""
|
||||
return f"http://{get_local_ip()}:{self.config.web.port}"
|
||||
|
||||
def start(self):
|
||||
"""Start the background sync thread."""
|
||||
if self._thread and self._thread.is_alive():
|
||||
return # Already running
|
||||
|
||||
self._running = True
|
||||
self._thread = threading.Thread(target=self._sync_loop, daemon=True)
|
||||
self._thread.start()
|
||||
print(f"[Sync] Background sync started (interval: {self.config.scanner.sync_interval_seconds}s)")
|
||||
|
||||
def stop(self):
|
||||
"""Stop the background sync thread."""
|
||||
self._running = False
|
||||
if self._thread:
|
||||
self._thread.join(timeout=5)
|
||||
self._thread = None
|
||||
print("[Sync] Background sync stopped")
|
||||
|
||||
def register_with_peer(self, peer_url: str) -> dict:
|
||||
"""Register this scanner with a peer.
|
||||
|
||||
Args:
|
||||
peer_url: Base URL of the peer scanner
|
||||
|
||||
Returns:
|
||||
Response from peer containing peer info and known peers
|
||||
|
||||
Raises:
|
||||
requests.RequestException on network errors
|
||||
"""
|
||||
payload = {
|
||||
"id": self.scanner_identity["id"],
|
||||
"name": self.scanner_identity["name"],
|
||||
"url": self.local_url,
|
||||
"floor": self.scanner_identity["floor"],
|
||||
"latitude": self.scanner_identity["latitude"],
|
||||
"longitude": self.scanner_identity["longitude"]
|
||||
}
|
||||
|
||||
resp = requests.post(
|
||||
f"{peer_url.rstrip('/')}/api/peers/register",
|
||||
json=payload,
|
||||
timeout=10
|
||||
)
|
||||
resp.raise_for_status()
|
||||
return resp.json()
|
||||
|
||||
def sync_devices_from_peer(self, peer_url: str, since: Optional[str] = None) -> int:
|
||||
"""Pull device updates from a peer.
|
||||
|
||||
Args:
|
||||
peer_url: Base URL of the peer scanner
|
||||
since: ISO timestamp to get updates since (None = all)
|
||||
|
||||
Returns:
|
||||
Number of devices updated locally
|
||||
"""
|
||||
params = {"since": since} if since else {}
|
||||
|
||||
resp = requests.get(
|
||||
f"{peer_url.rstrip('/')}/api/sync/devices",
|
||||
params=params,
|
||||
timeout=15
|
||||
)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
|
||||
devices = data.get("devices", [])
|
||||
source_scanner = data.get("scanner_id", "unknown")
|
||||
|
||||
updated = self.db.bulk_update_devices(devices, source_scanner)
|
||||
return updated
|
||||
|
||||
def push_devices_to_peer(self, peer_url: str, since: Optional[str] = None) -> dict:
|
||||
"""Push device updates to a peer.
|
||||
|
||||
Args:
|
||||
peer_url: Base URL of the peer scanner
|
||||
since: ISO timestamp to send updates since (None = all)
|
||||
|
||||
Returns:
|
||||
Response from peer with sync status
|
||||
"""
|
||||
devices = self.db.get_devices_since(since)
|
||||
|
||||
payload = {
|
||||
"source_scanner": self.scanner_identity["id"],
|
||||
"devices": devices
|
||||
}
|
||||
|
||||
resp = requests.post(
|
||||
f"{peer_url.rstrip('/')}/api/sync/devices",
|
||||
json=payload,
|
||||
timeout=15
|
||||
)
|
||||
resp.raise_for_status()
|
||||
return resp.json()
|
||||
|
||||
def _sync_loop(self):
|
||||
"""Background sync loop - runs every sync_interval_seconds."""
|
||||
# Initial delay to let app fully start
|
||||
time.sleep(5)
|
||||
|
||||
while self._running:
|
||||
peers = self.db.get_peers()
|
||||
|
||||
for peer in peers:
|
||||
peer_id = peer["scanner_id"]
|
||||
peer_url = peer["url"]
|
||||
|
||||
try:
|
||||
# Pull updates from peer
|
||||
since = self._last_sync.get(peer_id)
|
||||
updated = self.sync_devices_from_peer(peer_url, since)
|
||||
|
||||
# Push our updates to peer
|
||||
self.push_devices_to_peer(peer_url, since)
|
||||
|
||||
# Update last sync time and peer last_seen
|
||||
self._last_sync[peer_id] = datetime.now().isoformat()
|
||||
self.db.update_peer_last_seen(peer_id)
|
||||
|
||||
if updated > 0:
|
||||
print(f"[Sync] Synced with {peer_id}: updated {updated} devices")
|
||||
|
||||
except requests.exceptions.ConnectionError:
|
||||
print(f"[Sync] Peer {peer_id} unreachable at {peer_url}")
|
||||
except requests.exceptions.Timeout:
|
||||
print(f"[Sync] Sync with {peer_id} timed out")
|
||||
except Exception as e:
|
||||
print(f"[Sync] Error syncing with {peer_id}: {e}")
|
||||
|
||||
# Wait for next sync interval
|
||||
for _ in range(self.config.scanner.sync_interval_seconds):
|
||||
if not self._running:
|
||||
break
|
||||
time.sleep(1)
|
||||
|
||||
def get_status(self) -> dict:
|
||||
"""Get current sync status.
|
||||
|
||||
Returns:
|
||||
Dict with running state, peer count, last sync times
|
||||
"""
|
||||
peers = self.db.get_peers()
|
||||
return {
|
||||
"running": self._running,
|
||||
"sync_interval_seconds": self.config.scanner.sync_interval_seconds,
|
||||
"peer_count": len(peers),
|
||||
"last_sync": self._last_sync.copy(),
|
||||
"local_url": self.local_url
|
||||
}
|
||||
Reference in New Issue
Block a user