- Heartbeat service: check_sensor_status (online/stale/offline)
- GET /sensors/heartbeat: status summary for all sensors
- POST /sensors/heartbeat: refresh heartbeat status
- GET /sensors/<hostname>/metrics: activity counts, recent events
- CLI command: flask check-heartbeats
- Added 7 new tests (34 total)
338 lines
12 KiB
Python
338 lines
12 KiB
Python
"""Sensor API tests."""
|
|
from unittest.mock import patch, MagicMock
|
|
from esp32_web.extensions import db
|
|
from esp32_web.models import Sensor, Event
|
|
|
|
|
|
def test_list_sensors_empty(client):
    """GET /api/v1/sensors returns 200 with an empty ``sensors`` list.

    No sensor rows are created by this test, so the listing is expected
    to be empty.
    """
    expected_body = {'sensors': []}

    resp = client.get('/api/v1/sensors')

    assert resp.status_code == 200
    assert resp.json == expected_body
|
|
|
|
|
|
def test_get_sensor_not_found(client):
    """Looking up a hostname that was never registered yields a 404."""
    resp = client.get('/api/v1/sensors/nonexistent')

    assert resp.status_code == 404
|
|
|
|
|
|
def test_health_check(client):
    """GET /health answers 200 with status 'ok' and both uptime fields."""
    resp = client.get('/health')
    assert resp.status_code == 200

    body = resp.json
    assert body['status'] == 'ok'
    # Both uptime keys must be present in the response body.
    for field in ('uptime', 'uptime_seconds'):
        assert field in body
|
|
|
|
|
|
# Fleet Management Tests
|
|
|
|
def test_get_config_not_found(client):
    """Fetching the config of an unknown sensor yields a 404."""
    resp = client.get('/api/v1/sensors/nonexistent/config')

    assert resp.status_code == 404
|
|
|
|
|
|
def test_get_config_timeout(client, app):
    """Config request returns 504 when the sensor socket times out.

    The socket class used by the sensors API is patched so that
    ``recvfrom`` raises ``TimeoutError``.
    """
    with app.app_context():
        db.session.add(Sensor(hostname='test-sensor', ip='192.168.1.100'))
        db.session.commit()

    # Fake socket whose receive call always times out.
    fake_sock = MagicMock()
    fake_sock.recvfrom.side_effect = TimeoutError()

    with patch('esp32_web.api.sensors.socket.socket',
               return_value=fake_sock):
        resp = client.get('/api/v1/sensors/test-sensor/config')
        assert resp.status_code == 504
        assert 'not responding' in resp.json['error']
|
|
|
|
|
|
def test_get_config_success(client, app):
    """Config request returns 200 and the parsed sensor status.

    The patched socket replies with an ``OK STATUS`` datagram; the test
    checks the numeric and boolean values exposed under ``config``.
    """
    with app.app_context():
        db.session.add(Sensor(hostname='test-sensor', ip='192.168.1.100'))
        db.session.commit()

    # Canned (payload, address) pair returned by the fake socket.
    reply = (
        b'OK STATUS rate=10 power=20 adaptive=on presence=off',
        ('192.168.1.100', 5501),
    )
    fake_sock = MagicMock()
    fake_sock.recvfrom.return_value = reply

    with patch('esp32_web.api.sensors.socket.socket',
               return_value=fake_sock):
        resp = client.get('/api/v1/sensors/test-sensor/config')
        assert resp.status_code == 200

        config = resp.json['config']
        assert config['rate'] == 10
        assert config['power'] == 20
        assert config['adaptive'] is True
        assert config['presence'] is False
|
|
|
|
|
|
def test_update_config_not_found(client):
    """Updating the config of an unknown sensor yields a 404."""
    payload = {'rate': 5}

    resp = client.put('/api/v1/sensors/nonexistent/config', json=payload)

    assert resp.status_code == 404
|
|
|
|
|
|
def test_update_config_unknown_key(client, app):
    """An unrecognised config key is reported in ``errors`` with a 200."""
    with app.app_context():
        db.session.add(Sensor(hostname='test-sensor', ip='192.168.1.100'))
        db.session.commit()

    payload = {'invalid_key': 123}

    # The socket class is patched for the duration of the request.
    with patch('esp32_web.api.sensors.socket.socket'):
        resp = client.put('/api/v1/sensors/test-sensor/config', json=payload)
        assert resp.status_code == 200
        first_error = resp.json['errors'][0]
        assert 'Unknown config key' in first_error
|
|
|
|
|
|
def test_update_config_success(client, app):
    """Valid config keys are each acknowledged with 'ok' in ``results``."""
    with app.app_context():
        db.session.add(Sensor(hostname='test-sensor', ip='192.168.1.100'))
        db.session.commit()

    payload = {'rate': 5, 'adaptive': True}

    with patch('esp32_web.api.sensors.socket.socket',
               return_value=MagicMock()):
        resp = client.put('/api/v1/sensors/test-sensor/config', json=payload)
        assert resp.status_code == 200

        results = resp.json['results']
        assert results['rate'] == 'ok'
        assert results['adaptive'] == 'ok'
|
|
|
|
|
|
def test_trigger_ota_not_found(client):
    """Triggering OTA on an unknown sensor yields a 404."""
    payload = {'url': 'http://example.com/fw.bin'}

    resp = client.post('/api/v1/sensors/nonexistent/ota', json=payload)

    assert resp.status_code == 404
|
|
|
|
|
|
def test_trigger_ota_missing_url(client, app):
    """An OTA request with an empty JSON body is rejected with a 400."""
    with app.app_context():
        db.session.add(Sensor(hostname='test-sensor', ip='192.168.1.100'))
        db.session.commit()

    resp = client.post('/api/v1/sensors/test-sensor/ota', json={})

    assert resp.status_code == 400
    assert 'Missing OTA URL' in resp.json['error']
|
|
|
|
|
|
def test_trigger_ota_invalid_url(client, app):
    """An OTA request with an ``ftp://`` URL is rejected with a 400."""
    with app.app_context():
        db.session.add(Sensor(hostname='test-sensor', ip='192.168.1.100'))
        db.session.commit()

    payload = {'url': 'ftp://example.com/fw.bin'}
    resp = client.post('/api/v1/sensors/test-sensor/ota', json=payload)

    assert resp.status_code == 400
    assert 'Invalid URL scheme' in resp.json['error']
|
|
|
|
|
|
def test_trigger_ota_success(client, app):
    """A valid HTTPS OTA request returns 200 and echoes the firmware URL."""
    with app.app_context():
        db.session.add(Sensor(hostname='test-sensor', ip='192.168.1.100'))
        db.session.commit()

    payload = {'url': 'https://example.com/fw.bin'}

    with patch('esp32_web.api.sensors.socket.socket',
               return_value=MagicMock()):
        resp = client.post('/api/v1/sensors/test-sensor/ota', json=payload)
        assert resp.status_code == 200

        body = resp.json
        assert body['status'] == 'ota_triggered'
        assert body['url'] == 'https://example.com/fw.bin'
|
|
|
|
|
|
def test_trigger_calibrate_not_found(client):
    """Triggering calibration on an unknown sensor yields a 404."""
    resp = client.post('/api/v1/sensors/nonexistent/calibrate', json={})

    assert resp.status_code == 404
|
|
|
|
|
|
def test_trigger_calibrate_invalid_seconds(client, app):
    """A calibration request with ``seconds=100`` is rejected with a 400."""
    with app.app_context():
        db.session.add(Sensor(hostname='test-sensor', ip='192.168.1.100'))
        db.session.commit()

    payload = {'seconds': 100}
    resp = client.post('/api/v1/sensors/test-sensor/calibrate', json=payload)

    assert resp.status_code == 400
    assert 'seconds must be 3-60' in resp.json['error']
|
|
|
|
|
|
def test_trigger_calibrate_success(client, app):
    """A 15-second calibration request returns 200 and echoes the duration."""
    with app.app_context():
        db.session.add(Sensor(hostname='test-sensor', ip='192.168.1.100'))
        db.session.commit()

    payload = {'seconds': 15}

    with patch('esp32_web.api.sensors.socket.socket',
               return_value=MagicMock()):
        resp = client.post('/api/v1/sensors/test-sensor/calibrate',
                           json=payload)
        assert resp.status_code == 200

        body = resp.json
        assert body['status'] == 'calibration_started'
        assert body['seconds'] == 15
|
|
|
|
|
|
def test_trigger_calibrate_default_seconds(client, app):
    """Omitting ``seconds`` makes the response report a duration of 10."""
    with app.app_context():
        db.session.add(Sensor(hostname='test-sensor', ip='192.168.1.100'))
        db.session.commit()

    with patch('esp32_web.api.sensors.socket.socket',
               return_value=MagicMock()):
        # Empty JSON body: no explicit duration is supplied.
        resp = client.post('/api/v1/sensors/test-sensor/calibrate', json={})
        assert resp.status_code == 200
        assert resp.json['seconds'] == 10
|
|
|
|
|
|
# Heartbeat Tests
|
|
|
|
def test_heartbeat_status_empty(client):
    """Heartbeat summary reports zero sensors when none were created."""
    resp = client.get('/api/v1/sensors/heartbeat')
    assert resp.status_code == 200

    summary = resp.json
    assert summary['total'] == 0
    assert summary['sensors'] == []
|
|
|
|
|
|
def test_heartbeat_status_with_sensors(client, app):
    """Heartbeat summary counts one online, one stale and one offline sensor.

    Three sensors are stored with ``last_seen`` set to now, 3 minutes ago
    and 10 minutes ago respectively.
    """
    from datetime import datetime, UTC, timedelta

    # (hostname, ip, age of last contact)
    fixtures = (
        ('sensor-online', '192.168.1.1', timedelta(0)),            # just now
        ('sensor-stale', '192.168.1.2', timedelta(minutes=3)),     # stale
        ('sensor-offline', '192.168.1.3', timedelta(minutes=10)),  # offline
    )

    with app.app_context():
        db.session.add_all([
            Sensor(hostname=name, ip=addr,
                   last_seen=datetime.now(UTC) - age)
            for name, addr, age in fixtures
        ])
        db.session.commit()

    resp = client.get('/api/v1/sensors/heartbeat')
    assert resp.status_code == 200

    summary = resp.json
    assert summary['total'] == 3
    assert summary['online'] == 1
    assert summary['stale'] == 1
    assert summary['offline'] == 1
|
|
|
|
|
|
def test_refresh_heartbeats(client, app):
    """POSTing to the heartbeat endpoint rewrites an outdated sensor status.

    The sensor is stored as 'online' although it was last seen 10 minutes
    ago; after the refresh its stored status must be 'offline'.
    """
    from datetime import datetime, UTC, timedelta

    with app.app_context():
        # Last contact 10 minutes ago, yet the stored status says 'online'.
        last_contact = datetime.now(UTC) - timedelta(minutes=10)
        db.session.add(Sensor(hostname='test-sensor', ip='192.168.1.1',
                              last_seen=last_contact, status='online'))
        db.session.commit()

    resp = client.post('/api/v1/sensors/heartbeat')
    assert resp.status_code == 200

    body = resp.json
    assert body['status'] == 'updated'
    assert body['offline'] == 1

    # Re-read the row to confirm the new status was stored.
    with app.app_context():
        lookup = db.select(Sensor).where(Sensor.hostname == 'test-sensor')
        refreshed = db.session.scalar(lookup)
        assert refreshed.status == 'offline'
|
|
|
|
|
|
# Metrics Tests
|
|
|
|
def test_metrics_not_found(client):
    """Requesting metrics for an unknown sensor yields a 404."""
    resp = client.get('/api/v1/sensors/nonexistent/metrics')

    assert resp.status_code == 404
|
|
|
|
|
|
def test_metrics_empty(client, app):
    """Metrics for a sensor with no recorded activity are all zero.

    Also checks that the response reports ``hours`` as 24 when no
    ``hours`` query parameter is given.
    """
    with app.app_context():
        db.session.add(Sensor(hostname='test-sensor', ip='192.168.1.100'))
        db.session.commit()

    resp = client.get('/api/v1/sensors/test-sensor/metrics')
    assert resp.status_code == 200

    metrics = resp.json
    assert metrics['hostname'] == 'test-sensor'
    assert metrics['hours'] == 24

    activity = metrics['activity']
    assert activity['sightings'] == 0
    assert activity['alerts'] == 0
    assert activity['events'] == 0

    assert metrics['recent_events'] == []
|
|
|
|
|
|
def test_metrics_with_events(client, app):
    """Metrics count and list the events stored for a sensor.

    Two events (presence and calibration) are stored for the sensor; the
    response must count both and return both under ``recent_events``.
    """
    from datetime import datetime, UTC

    # (event_type, payload_json) for each event to store.
    samples = (
        ('presence', '{"state": "detected"}'),
        ('calibration', '{"nsub": 52}'),
    )

    with app.app_context():
        sensor = Sensor(hostname='test-sensor', ip='192.168.1.100')
        db.session.add(sensor)
        # Flush before building the events, which reference sensor.id.
        db.session.flush()

        db.session.add_all([
            Event(sensor_id=sensor.id, event_type=kind,
                  payload_json=raw,
                  timestamp=datetime.now(UTC))
            for kind, raw in samples
        ])
        db.session.commit()

    resp = client.get('/api/v1/sensors/test-sensor/metrics')
    assert resp.status_code == 200

    metrics = resp.json
    assert metrics['activity']['events'] == 2
    assert len(metrics['recent_events']) == 2
|
|
|
|
|
|
def test_metrics_custom_hours(client, app):
    """The ``hours`` query parameter is echoed back in the metrics response."""
    with app.app_context():
        db.session.add(Sensor(hostname='test-sensor', ip='192.168.1.100'))
        db.session.commit()

    resp = client.get('/api/v1/sensors/test-sensor/metrics?hours=48')

    assert resp.status_code == 200
    assert resp.json['hours'] == 48
|