feat: v0.1.3 — fleet management endpoints

- GET /sensors/<hostname>/config: query sensor STATUS, parse response
- PUT /sensors/<hostname>/config: update rate, power, adaptive, etc.
- POST /sensors/<hostname>/ota: trigger OTA update with URL
- POST /sensors/<hostname>/calibrate: trigger baseline calibration

Added 14 new tests for fleet management endpoints.
This commit is contained in:
user
2026-02-05 21:21:35 +01:00
parent 58c974b535
commit 4b72b3293e
5 changed files with 367 additions and 11 deletions

View File

@@ -24,12 +24,12 @@
- [x] Device profile enrichment
- [x] Export endpoints (CSV, JSON)
## v0.1.3 - Fleet Management
## v0.1.3 - Fleet Management [DONE]
- [ ] Sensor config endpoint (GET/PUT)
- [ ] OTA trigger endpoint
- [ ] Calibration trigger endpoint
- [ ] Sensor history/metrics
- [x] Sensor config endpoint (GET/PUT)
- [x] OTA trigger endpoint
- [x] Calibration trigger endpoint
- [ ] Sensor history/metrics (moved to v0.1.4)
## v0.1.4 - Zones & Presence

View File

@@ -5,10 +5,10 @@
## Current Sprint: v0.1.3 — Fleet Management
### P1 - High
- [ ] `GET /api/v1/sensors/<id>/config` — read sensor config
- [ ] `PUT /api/v1/sensors/<id>/config` — update sensor config
- [ ] `POST /api/v1/sensors/<id>/ota` — trigger OTA update
- [ ] `POST /api/v1/sensors/<id>/calibrate` — trigger calibration
- [x] `GET /api/v1/sensors/<id>/config` — read sensor config
- [x] `PUT /api/v1/sensors/<id>/config` — update sensor config
- [x] `POST /api/v1/sensors/<id>/ota` — trigger OTA update
- [x] `POST /api/v1/sensors/<id>/calibrate` — trigger calibration
### P2 - Normal
- [ ] Sensor heartbeat timeout detection

View File

@@ -1,6 +1,6 @@
[project]
name = "esp32-web"
version = "0.1.2"
version = "0.1.3"
description = "REST API backend for ESP32 sensor fleet"
requires-python = ">=3.11"
dependencies = [

View File

@@ -1,4 +1,5 @@
"""Sensor endpoints."""
import json
import socket
from flask import request, current_app
from . import bp
@@ -37,7 +38,8 @@ def send_command(hostname):
# Whitelist allowed commands
allowed = ('STATUS', 'REBOOT', 'IDENTIFY', 'BLE', 'ADAPTIVE', 'RATE', 'POWER',
'CSIMODE', 'PRESENCE', 'CALIBRATE', 'CHANSCAN')
'CSIMODE', 'PRESENCE', 'CALIBRATE', 'CHANSCAN', 'OTA', 'TARGET',
'THRESHOLD', 'SCANRATE', 'PROBERATE', 'POWERSAVE', 'FLOODTHRESH')
if not any(command.startswith(a) for a in allowed):
return {'error': 'Command not allowed'}, 403
@@ -51,3 +53,163 @@ def send_command(hostname):
return {'error': f'Socket error: {e}'}, 500
return {'status': 'sent', 'command': command}
def _send_command_with_response(ip: str, command: str, timeout: float = 2.0) -> str | None:
"""Send UDP command and wait for response."""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(timeout)
sock.sendto(command.encode(), (ip, current_app.config['SENSOR_CMD_PORT']))
data, _ = sock.recvfrom(1400)
sock.close()
return data.decode('utf-8', errors='replace').strip()
except socket.timeout:
return None
except socket.error:
return None
def _parse_status_response(response: str) -> dict:
"""Parse STATUS response into dict."""
result = {}
if not response or not response.startswith('OK STATUS'):
return result
# Parse key=value pairs
for part in response.split():
if '=' in part:
key, value = part.split('=', 1)
# Try to convert to appropriate type
if value.isdigit():
result[key] = int(value)
elif value.replace('.', '').replace('-', '').isdigit():
try:
result[key] = float(value)
except ValueError:
result[key] = value
elif value in ('on', 'true'):
result[key] = True
elif value in ('off', 'false'):
result[key] = False
else:
result[key] = value
return result
@bp.route('/sensors/<hostname>/config')
def get_sensor_config(hostname):
"""Get sensor configuration by querying STATUS."""
sensor = db.session.scalar(db.select(Sensor).where(Sensor.hostname == hostname))
if not sensor:
return {'error': 'Sensor not found'}, 404
response = _send_command_with_response(sensor.ip, 'STATUS')
if not response:
return {'error': 'Sensor not responding'}, 504
config = _parse_status_response(response)
config['hostname'] = sensor.hostname
config['ip'] = sensor.ip
# Store config in database
sensor.config_json = json.dumps(config)
db.session.commit()
return {'config': config}
@bp.route('/sensors/<hostname>/config', methods=['PUT'])
def update_sensor_config(hostname):
"""Update sensor configuration."""
sensor = db.session.scalar(db.select(Sensor).where(Sensor.hostname == hostname))
if not sensor:
return {'error': 'Sensor not found'}, 404
data = request.get_json()
if not data:
return {'error': 'No configuration provided'}, 400
results = {}
errors = []
# Map config keys to commands
config_commands = {
'rate': lambda v: f'RATE {v}',
'power': lambda v: f'POWER {v}',
'adaptive': lambda v: f'ADAPTIVE {"ON" if v else "OFF"}',
'threshold': lambda v: f'THRESHOLD {v}',
'ble': lambda v: f'BLE {"ON" if v else "OFF"}',
'csi_mode': lambda v: f'CSIMODE {v.upper()}',
'presence': lambda v: f'PRESENCE {"ON" if v else "OFF"}',
'powersave': lambda v: f'POWERSAVE {"ON" if v else "OFF"}',
'chanscan': lambda v: f'CHANSCAN {"ON" if v else "OFF"}',
}
for key, value in data.items():
if key not in config_commands:
errors.append(f'Unknown config key: {key}')
continue
command = config_commands[key](value)
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(2.0)
sock.sendto(command.encode(), (sensor.ip, current_app.config['SENSOR_CMD_PORT']))
sock.close()
results[key] = 'ok'
except socket.error as e:
errors.append(f'{key}: {e}')
return {'results': results, 'errors': errors}
@bp.route('/sensors/<hostname>/ota', methods=['POST'])
def trigger_ota(hostname):
"""Trigger OTA update on sensor."""
sensor = db.session.scalar(db.select(Sensor).where(Sensor.hostname == hostname))
if not sensor:
return {'error': 'Sensor not found'}, 404
data = request.get_json()
if not data or 'url' not in data:
return {'error': 'Missing OTA URL'}, 400
url = data['url']
if not url.startswith(('http://', 'https://')):
return {'error': 'Invalid URL scheme'}, 400
command = f'OTA {url}'
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(2.0)
sock.sendto(command.encode(), (sensor.ip, current_app.config['SENSOR_CMD_PORT']))
sock.close()
except socket.error as e:
return {'error': f'Socket error: {e}'}, 500
return {'status': 'ota_triggered', 'url': url}
@bp.route('/sensors/<hostname>/calibrate', methods=['POST'])
def trigger_calibrate(hostname):
"""Trigger baseline calibration on sensor."""
sensor = db.session.scalar(db.select(Sensor).where(Sensor.hostname == hostname))
if not sensor:
return {'error': 'Sensor not found'}, 404
data = request.get_json() or {}
seconds = data.get('seconds', 10)
if not isinstance(seconds, int) or seconds < 3 or seconds > 60:
return {'error': 'seconds must be 3-60'}, 400
command = f'CALIBRATE {seconds}'
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(2.0)
sock.sendto(command.encode(), (sensor.ip, current_app.config['SENSOR_CMD_PORT']))
sock.close()
except socket.error as e:
return {'error': f'Socket error: {e}'}, 500
return {'status': 'calibration_started', 'seconds': seconds}

View File

@@ -1,4 +1,7 @@
"""Sensor API tests."""
from unittest.mock import patch, MagicMock
from esp32_web.extensions import db
from esp32_web.models import Sensor
def test_list_sensors_empty(client):
@@ -21,3 +24,194 @@ def test_health_check(client):
assert response.json['status'] == 'ok'
assert 'uptime' in response.json
assert 'uptime_seconds' in response.json
# Fleet Management Tests
def test_get_config_not_found(client):
"""Test getting config for non-existent sensor."""
response = client.get('/api/v1/sensors/nonexistent/config')
assert response.status_code == 404
def test_get_config_timeout(client, app):
"""Test config endpoint when sensor times out."""
with app.app_context():
sensor = Sensor(hostname='test-sensor', ip='192.168.1.100')
db.session.add(sensor)
db.session.commit()
with patch('esp32_web.api.sensors.socket.socket') as mock_socket:
mock_sock = MagicMock()
mock_sock.recvfrom.side_effect = TimeoutError()
mock_socket.return_value = mock_sock
response = client.get('/api/v1/sensors/test-sensor/config')
assert response.status_code == 504
assert 'not responding' in response.json['error']
def test_get_config_success(client, app):
"""Test successful config retrieval."""
with app.app_context():
sensor = Sensor(hostname='test-sensor', ip='192.168.1.100')
db.session.add(sensor)
db.session.commit()
with patch('esp32_web.api.sensors.socket.socket') as mock_socket:
mock_sock = MagicMock()
mock_sock.recvfrom.return_value = (
b'OK STATUS rate=10 power=20 adaptive=on presence=off',
('192.168.1.100', 5501)
)
mock_socket.return_value = mock_sock
response = client.get('/api/v1/sensors/test-sensor/config')
assert response.status_code == 200
assert response.json['config']['rate'] == 10
assert response.json['config']['power'] == 20
assert response.json['config']['adaptive'] is True
assert response.json['config']['presence'] is False
def test_update_config_not_found(client):
"""Test updating config for non-existent sensor."""
response = client.put('/api/v1/sensors/nonexistent/config',
json={'rate': 5})
assert response.status_code == 404
def test_update_config_unknown_key(client, app):
"""Test updating config with unknown key."""
with app.app_context():
sensor = Sensor(hostname='test-sensor', ip='192.168.1.100')
db.session.add(sensor)
db.session.commit()
with patch('esp32_web.api.sensors.socket.socket'):
response = client.put('/api/v1/sensors/test-sensor/config',
json={'invalid_key': 123})
assert response.status_code == 200
assert 'Unknown config key' in response.json['errors'][0]
def test_update_config_success(client, app):
"""Test successful config update."""
with app.app_context():
sensor = Sensor(hostname='test-sensor', ip='192.168.1.100')
db.session.add(sensor)
db.session.commit()
with patch('esp32_web.api.sensors.socket.socket') as mock_socket:
mock_sock = MagicMock()
mock_socket.return_value = mock_sock
response = client.put('/api/v1/sensors/test-sensor/config',
json={'rate': 5, 'adaptive': True})
assert response.status_code == 200
assert response.json['results']['rate'] == 'ok'
assert response.json['results']['adaptive'] == 'ok'
def test_trigger_ota_not_found(client):
"""Test OTA trigger for non-existent sensor."""
response = client.post('/api/v1/sensors/nonexistent/ota',
json={'url': 'http://example.com/fw.bin'})
assert response.status_code == 404
def test_trigger_ota_missing_url(client, app):
"""Test OTA trigger without URL."""
with app.app_context():
sensor = Sensor(hostname='test-sensor', ip='192.168.1.100')
db.session.add(sensor)
db.session.commit()
response = client.post('/api/v1/sensors/test-sensor/ota', json={})
assert response.status_code == 400
assert 'Missing OTA URL' in response.json['error']
def test_trigger_ota_invalid_url(client, app):
"""Test OTA trigger with invalid URL scheme."""
with app.app_context():
sensor = Sensor(hostname='test-sensor', ip='192.168.1.100')
db.session.add(sensor)
db.session.commit()
response = client.post('/api/v1/sensors/test-sensor/ota',
json={'url': 'ftp://example.com/fw.bin'})
assert response.status_code == 400
assert 'Invalid URL scheme' in response.json['error']
def test_trigger_ota_success(client, app):
"""Test successful OTA trigger."""
with app.app_context():
sensor = Sensor(hostname='test-sensor', ip='192.168.1.100')
db.session.add(sensor)
db.session.commit()
with patch('esp32_web.api.sensors.socket.socket') as mock_socket:
mock_sock = MagicMock()
mock_socket.return_value = mock_sock
response = client.post('/api/v1/sensors/test-sensor/ota',
json={'url': 'https://example.com/fw.bin'})
assert response.status_code == 200
assert response.json['status'] == 'ota_triggered'
assert response.json['url'] == 'https://example.com/fw.bin'
def test_trigger_calibrate_not_found(client):
"""Test calibration trigger for non-existent sensor."""
response = client.post('/api/v1/sensors/nonexistent/calibrate', json={})
assert response.status_code == 404
def test_trigger_calibrate_invalid_seconds(client, app):
"""Test calibration with invalid seconds."""
with app.app_context():
sensor = Sensor(hostname='test-sensor', ip='192.168.1.100')
db.session.add(sensor)
db.session.commit()
response = client.post('/api/v1/sensors/test-sensor/calibrate',
json={'seconds': 100})
assert response.status_code == 400
assert 'seconds must be 3-60' in response.json['error']
def test_trigger_calibrate_success(client, app):
"""Test successful calibration trigger."""
with app.app_context():
sensor = Sensor(hostname='test-sensor', ip='192.168.1.100')
db.session.add(sensor)
db.session.commit()
with patch('esp32_web.api.sensors.socket.socket') as mock_socket:
mock_sock = MagicMock()
mock_socket.return_value = mock_sock
response = client.post('/api/v1/sensors/test-sensor/calibrate',
json={'seconds': 15})
assert response.status_code == 200
assert response.json['status'] == 'calibration_started'
assert response.json['seconds'] == 15
def test_trigger_calibrate_default_seconds(client, app):
"""Test calibration with default seconds."""
with app.app_context():
sensor = Sensor(hostname='test-sensor', ip='192.168.1.100')
db.session.add(sensor)
db.session.commit()
with patch('esp32_web.api.sensors.socket.socket') as mock_socket:
mock_sock = MagicMock()
mock_socket.return_value = mock_sock
response = client.post('/api/v1/sensors/test-sensor/calibrate',
json={})
assert response.status_code == 200
assert response.json['seconds'] == 10