Flask API backend for ESP32 sensor fleet: - App factory pattern with blueprints - SQLAlchemy 2.x models (Sensor, Device, Sighting, Alert, Event, Probe) - UDP collector for sensor data streams - REST API endpoints for sensors, devices, alerts, events, probes, stats - pytest setup with fixtures - Containerfile for podman deployment - Makefile for common tasks
36 lines
1.2 KiB
Python
36 lines
1.2 KiB
Python
"""Parser tests."""
|
|
from esp32_web.collector.parsers import parse_packet
|
|
from esp32_web.models import Sensor, Device, Alert
|
|
|
|
|
|
def test_parse_ble_data(app):
    """Check that a BLE_DATA packet yields the expected Sensor and Device rows."""
    packet = 'BLE_DATA,test-sensor,aa:bb:cc:dd:ee:ff,-50,pub,TestDevice,0x004C,10,0'
    sender = ('192.168.1.100', 5500)

    with app.app_context():
        # Imported inside the app context, as in the rest of this test module.
        from esp32_web.extensions import db

        parse_packet(packet, sender)

        # The sensor is looked up by the hostname carried in the packet and
        # must report the address the packet was handed in with.
        sensor_query = db.select(Sensor).where(Sensor.hostname == 'test-sensor')
        found_sensor = db.session.scalar(sensor_query)
        assert found_sensor is not None
        assert found_sensor.ip == '192.168.1.100'

        # The device is looked up by the MAC address carried in the packet.
        device_query = db.select(Device).where(Device.mac == 'aa:bb:cc:dd:ee:ff')
        found_device = db.session.scalar(device_query)
        assert found_device is not None
        assert found_device.device_type == 'ble'
        assert found_device.name == 'TestDevice'
|
|
|
|
|
|
def test_parse_alert_data(app):
    """Check that an ALERT_DATA packet yields an Alert row with the expected fields."""
    packet = 'ALERT_DATA,test-sensor,deauth,aa:bb:cc:dd:ee:ff,11:22:33:44:55:66,-40'
    sender = ('192.168.1.100', 5500)

    with app.app_context():
        # Imported inside the app context, as in the rest of this test module.
        from esp32_web.extensions import db

        parse_packet(packet, sender)

        # No filter: the first Alert row returned is the one inspected.
        alert_query = db.select(Alert)
        found_alert = db.session.scalar(alert_query)
        assert found_alert is not None
        assert found_alert.alert_type == 'deauth'
        assert found_alert.source_mac == 'aa:bb:cc:dd:ee:ff'
|