Add shared paginate() helper with total count to all list endpoints. Add request logging middleware (method, path, status, duration, IP). Add data retention service with configurable thresholds and CLI command.
341 lines
12 KiB
Python
341 lines
12 KiB
Python
"""Sensor API tests."""
|
|
from unittest.mock import patch, MagicMock
|
|
from esp32_web.extensions import db
|
|
from esp32_web.models import Sensor, Event
|
|
|
|
|
|
def test_list_sensors_empty(client):
|
|
"""Test listing sensors when empty."""
|
|
response = client.get('/api/v1/sensors')
|
|
assert response.status_code == 200
|
|
assert response.json['sensors'] == []
|
|
assert response.json['total'] == 0
|
|
assert response.json['limit'] == 100
|
|
assert response.json['offset'] == 0
|
|
|
|
|
|
def test_get_sensor_not_found(client):
|
|
"""Test getting non-existent sensor."""
|
|
response = client.get('/api/v1/sensors/nonexistent')
|
|
assert response.status_code == 404
|
|
|
|
|
|
def test_health_check(client):
|
|
"""Test health endpoint."""
|
|
response = client.get('/health')
|
|
assert response.status_code == 200
|
|
assert response.json['status'] == 'ok'
|
|
assert 'uptime' in response.json
|
|
assert 'uptime_seconds' in response.json
|
|
|
|
|
|
# Fleet Management Tests
|
|
|
|
def test_get_config_not_found(client):
|
|
"""Test getting config for non-existent sensor."""
|
|
response = client.get('/api/v1/sensors/nonexistent/config')
|
|
assert response.status_code == 404
|
|
|
|
|
|
def test_get_config_timeout(client, app):
|
|
"""Test config endpoint when sensor times out."""
|
|
with app.app_context():
|
|
sensor = Sensor(hostname='test-sensor', ip='192.168.1.100')
|
|
db.session.add(sensor)
|
|
db.session.commit()
|
|
|
|
with patch('esp32_web.api.sensors.socket.socket') as mock_socket:
|
|
mock_sock = MagicMock()
|
|
mock_sock.recvfrom.side_effect = TimeoutError()
|
|
mock_socket.return_value = mock_sock
|
|
|
|
response = client.get('/api/v1/sensors/test-sensor/config')
|
|
assert response.status_code == 504
|
|
assert 'not responding' in response.json['error']
|
|
|
|
|
|
def test_get_config_success(client, app):
|
|
"""Test successful config retrieval."""
|
|
with app.app_context():
|
|
sensor = Sensor(hostname='test-sensor', ip='192.168.1.100')
|
|
db.session.add(sensor)
|
|
db.session.commit()
|
|
|
|
with patch('esp32_web.api.sensors.socket.socket') as mock_socket:
|
|
mock_sock = MagicMock()
|
|
mock_sock.recvfrom.return_value = (
|
|
b'OK STATUS rate=10 power=20 adaptive=on presence=off',
|
|
('192.168.1.100', 5501)
|
|
)
|
|
mock_socket.return_value = mock_sock
|
|
|
|
response = client.get('/api/v1/sensors/test-sensor/config')
|
|
assert response.status_code == 200
|
|
assert response.json['config']['rate'] == 10
|
|
assert response.json['config']['power'] == 20
|
|
assert response.json['config']['adaptive'] is True
|
|
assert response.json['config']['presence'] is False
|
|
|
|
|
|
def test_update_config_not_found(client):
|
|
"""Test updating config for non-existent sensor."""
|
|
response = client.put('/api/v1/sensors/nonexistent/config',
|
|
json={'rate': 5})
|
|
assert response.status_code == 404
|
|
|
|
|
|
def test_update_config_unknown_key(client, app):
|
|
"""Test updating config with unknown key."""
|
|
with app.app_context():
|
|
sensor = Sensor(hostname='test-sensor', ip='192.168.1.100')
|
|
db.session.add(sensor)
|
|
db.session.commit()
|
|
|
|
with patch('esp32_web.api.sensors.socket.socket'):
|
|
response = client.put('/api/v1/sensors/test-sensor/config',
|
|
json={'invalid_key': 123})
|
|
assert response.status_code == 200
|
|
assert 'Unknown config key' in response.json['errors'][0]
|
|
|
|
|
|
def test_update_config_success(client, app):
|
|
"""Test successful config update."""
|
|
with app.app_context():
|
|
sensor = Sensor(hostname='test-sensor', ip='192.168.1.100')
|
|
db.session.add(sensor)
|
|
db.session.commit()
|
|
|
|
with patch('esp32_web.api.sensors.socket.socket') as mock_socket:
|
|
mock_sock = MagicMock()
|
|
mock_socket.return_value = mock_sock
|
|
|
|
response = client.put('/api/v1/sensors/test-sensor/config',
|
|
json={'rate': 5, 'adaptive': True})
|
|
assert response.status_code == 200
|
|
assert response.json['results']['rate'] == 'ok'
|
|
assert response.json['results']['adaptive'] == 'ok'
|
|
|
|
|
|
def test_trigger_ota_not_found(client):
|
|
"""Test OTA trigger for non-existent sensor."""
|
|
response = client.post('/api/v1/sensors/nonexistent/ota',
|
|
json={'url': 'http://example.com/fw.bin'})
|
|
assert response.status_code == 404
|
|
|
|
|
|
def test_trigger_ota_missing_url(client, app):
|
|
"""Test OTA trigger without URL."""
|
|
with app.app_context():
|
|
sensor = Sensor(hostname='test-sensor', ip='192.168.1.100')
|
|
db.session.add(sensor)
|
|
db.session.commit()
|
|
|
|
response = client.post('/api/v1/sensors/test-sensor/ota', json={})
|
|
assert response.status_code == 400
|
|
assert 'Missing OTA URL' in response.json['error']
|
|
|
|
|
|
def test_trigger_ota_invalid_url(client, app):
|
|
"""Test OTA trigger with invalid URL scheme."""
|
|
with app.app_context():
|
|
sensor = Sensor(hostname='test-sensor', ip='192.168.1.100')
|
|
db.session.add(sensor)
|
|
db.session.commit()
|
|
|
|
response = client.post('/api/v1/sensors/test-sensor/ota',
|
|
json={'url': 'ftp://example.com/fw.bin'})
|
|
assert response.status_code == 400
|
|
assert 'Invalid URL scheme' in response.json['error']
|
|
|
|
|
|
def test_trigger_ota_success(client, app):
|
|
"""Test successful OTA trigger."""
|
|
with app.app_context():
|
|
sensor = Sensor(hostname='test-sensor', ip='192.168.1.100')
|
|
db.session.add(sensor)
|
|
db.session.commit()
|
|
|
|
with patch('esp32_web.api.sensors.socket.socket') as mock_socket:
|
|
mock_sock = MagicMock()
|
|
mock_socket.return_value = mock_sock
|
|
|
|
response = client.post('/api/v1/sensors/test-sensor/ota',
|
|
json={'url': 'https://example.com/fw.bin'})
|
|
assert response.status_code == 200
|
|
assert response.json['status'] == 'ota_triggered'
|
|
assert response.json['url'] == 'https://example.com/fw.bin'
|
|
|
|
|
|
def test_trigger_calibrate_not_found(client):
|
|
"""Test calibration trigger for non-existent sensor."""
|
|
response = client.post('/api/v1/sensors/nonexistent/calibrate', json={})
|
|
assert response.status_code == 404
|
|
|
|
|
|
def test_trigger_calibrate_invalid_seconds(client, app):
|
|
"""Test calibration with invalid seconds."""
|
|
with app.app_context():
|
|
sensor = Sensor(hostname='test-sensor', ip='192.168.1.100')
|
|
db.session.add(sensor)
|
|
db.session.commit()
|
|
|
|
response = client.post('/api/v1/sensors/test-sensor/calibrate',
|
|
json={'seconds': 100})
|
|
assert response.status_code == 400
|
|
assert 'seconds must be 3-60' in response.json['error']
|
|
|
|
|
|
def test_trigger_calibrate_success(client, app):
|
|
"""Test successful calibration trigger."""
|
|
with app.app_context():
|
|
sensor = Sensor(hostname='test-sensor', ip='192.168.1.100')
|
|
db.session.add(sensor)
|
|
db.session.commit()
|
|
|
|
with patch('esp32_web.api.sensors.socket.socket') as mock_socket:
|
|
mock_sock = MagicMock()
|
|
mock_socket.return_value = mock_sock
|
|
|
|
response = client.post('/api/v1/sensors/test-sensor/calibrate',
|
|
json={'seconds': 15})
|
|
assert response.status_code == 200
|
|
assert response.json['status'] == 'calibration_started'
|
|
assert response.json['seconds'] == 15
|
|
|
|
|
|
def test_trigger_calibrate_default_seconds(client, app):
|
|
"""Test calibration with default seconds."""
|
|
with app.app_context():
|
|
sensor = Sensor(hostname='test-sensor', ip='192.168.1.100')
|
|
db.session.add(sensor)
|
|
db.session.commit()
|
|
|
|
with patch('esp32_web.api.sensors.socket.socket') as mock_socket:
|
|
mock_sock = MagicMock()
|
|
mock_socket.return_value = mock_sock
|
|
|
|
response = client.post('/api/v1/sensors/test-sensor/calibrate',
|
|
json={})
|
|
assert response.status_code == 200
|
|
assert response.json['seconds'] == 10
|
|
|
|
|
|
# Heartbeat Tests
|
|
|
|
def test_heartbeat_status_empty(client):
|
|
"""Test heartbeat status with no sensors."""
|
|
response = client.get('/api/v1/sensors/heartbeat')
|
|
assert response.status_code == 200
|
|
assert response.json['total'] == 0
|
|
assert response.json['sensors'] == []
|
|
|
|
|
|
def test_heartbeat_status_with_sensors(client, app):
|
|
"""Test heartbeat status with sensors."""
|
|
from datetime import datetime, UTC, timedelta
|
|
|
|
with app.app_context():
|
|
# Online sensor (just now)
|
|
s1 = Sensor(hostname='sensor-online', ip='192.168.1.1',
|
|
last_seen=datetime.now(UTC))
|
|
# Stale sensor (3 minutes ago)
|
|
s2 = Sensor(hostname='sensor-stale', ip='192.168.1.2',
|
|
last_seen=datetime.now(UTC) - timedelta(minutes=3))
|
|
# Offline sensor (10 minutes ago)
|
|
s3 = Sensor(hostname='sensor-offline', ip='192.168.1.3',
|
|
last_seen=datetime.now(UTC) - timedelta(minutes=10))
|
|
db.session.add_all([s1, s2, s3])
|
|
db.session.commit()
|
|
|
|
response = client.get('/api/v1/sensors/heartbeat')
|
|
assert response.status_code == 200
|
|
assert response.json['total'] == 3
|
|
assert response.json['online'] == 1
|
|
assert response.json['stale'] == 1
|
|
assert response.json['offline'] == 1
|
|
|
|
|
|
def test_refresh_heartbeats(client, app):
|
|
"""Test refreshing heartbeat status."""
|
|
from datetime import datetime, UTC, timedelta
|
|
|
|
with app.app_context():
|
|
# Offline sensor but status still says 'online'
|
|
sensor = Sensor(hostname='test-sensor', ip='192.168.1.1',
|
|
last_seen=datetime.now(UTC) - timedelta(minutes=10),
|
|
status='online')
|
|
db.session.add(sensor)
|
|
db.session.commit()
|
|
|
|
response = client.post('/api/v1/sensors/heartbeat')
|
|
assert response.status_code == 200
|
|
assert response.json['status'] == 'updated'
|
|
assert response.json['offline'] == 1
|
|
|
|
# Verify status was updated
|
|
with app.app_context():
|
|
sensor = db.session.scalar(db.select(Sensor).where(Sensor.hostname == 'test-sensor'))
|
|
assert sensor.status == 'offline'
|
|
|
|
|
|
# Metrics Tests
|
|
|
|
def test_metrics_not_found(client):
|
|
"""Test metrics for non-existent sensor."""
|
|
response = client.get('/api/v1/sensors/nonexistent/metrics')
|
|
assert response.status_code == 404
|
|
|
|
|
|
def test_metrics_empty(client, app):
|
|
"""Test metrics for sensor with no activity."""
|
|
with app.app_context():
|
|
sensor = Sensor(hostname='test-sensor', ip='192.168.1.100')
|
|
db.session.add(sensor)
|
|
db.session.commit()
|
|
|
|
response = client.get('/api/v1/sensors/test-sensor/metrics')
|
|
assert response.status_code == 200
|
|
assert response.json['hostname'] == 'test-sensor'
|
|
assert response.json['hours'] == 24
|
|
assert response.json['activity']['sightings'] == 0
|
|
assert response.json['activity']['alerts'] == 0
|
|
assert response.json['activity']['events'] == 0
|
|
assert response.json['recent_events'] == []
|
|
|
|
|
|
def test_metrics_with_events(client, app):
|
|
"""Test metrics with sensor events."""
|
|
from datetime import datetime, UTC
|
|
|
|
with app.app_context():
|
|
sensor = Sensor(hostname='test-sensor', ip='192.168.1.100')
|
|
db.session.add(sensor)
|
|
db.session.flush()
|
|
|
|
# Add some events
|
|
event1 = Event(sensor_id=sensor.id, event_type='presence',
|
|
payload_json='{"state": "detected"}',
|
|
timestamp=datetime.now(UTC))
|
|
event2 = Event(sensor_id=sensor.id, event_type='calibration',
|
|
payload_json='{"nsub": 52}',
|
|
timestamp=datetime.now(UTC))
|
|
db.session.add_all([event1, event2])
|
|
db.session.commit()
|
|
|
|
response = client.get('/api/v1/sensors/test-sensor/metrics')
|
|
assert response.status_code == 200
|
|
assert response.json['activity']['events'] == 2
|
|
assert len(response.json['recent_events']) == 2
|
|
|
|
|
|
def test_metrics_custom_hours(client, app):
|
|
"""Test metrics with custom time range."""
|
|
with app.app_context():
|
|
sensor = Sensor(hostname='test-sensor', ip='192.168.1.100')
|
|
db.session.add(sensor)
|
|
db.session.commit()
|
|
|
|
response = client.get('/api/v1/sensors/test-sensor/metrics?hours=48')
|
|
assert response.status_code == 200
|
|
assert response.json['hours'] == 48
|