Add shared paginate() helper with total count to all list endpoints. Add request logging middleware (method, path, status, duration, IP). Add data retention service with configurable thresholds and CLI command.
128 lines
4.1 KiB
Python
128 lines
4.1 KiB
Python
"""Data retention service tests."""
|
|
from datetime import datetime, UTC, timedelta
|
|
from esp32_web.extensions import db
|
|
from esp32_web.models import Sensor, Device, Sighting, Probe, Event, Alert
|
|
from esp32_web.services.retention import cleanup_old_data
|
|
|
|
|
|
def _setup_sensor_and_device(app):
|
|
"""Create a sensor and device for FK references."""
|
|
with app.app_context():
|
|
sensor = Sensor(hostname='s1', ip='10.0.0.1')
|
|
device = Device(mac='aa:bb:cc:dd:ee:ff', device_type='wifi',
|
|
last_seen=datetime.now(UTC))
|
|
db.session.add_all([sensor, device])
|
|
db.session.commit()
|
|
return sensor.id, device.id
|
|
|
|
|
|
def test_cleanup_deletes_old_sightings(app):
|
|
"""Sightings older than retention period are deleted."""
|
|
sensor_id, device_id = _setup_sensor_and_device(app)
|
|
with app.app_context():
|
|
# Old sighting (30 days ago, retention=14)
|
|
db.session.add(Sighting(
|
|
device_id=device_id, sensor_id=sensor_id, rssi=-50,
|
|
timestamp=datetime.now(UTC) - timedelta(days=30),
|
|
))
|
|
# Recent sighting (1 day ago)
|
|
db.session.add(Sighting(
|
|
device_id=device_id, sensor_id=sensor_id, rssi=-60,
|
|
timestamp=datetime.now(UTC) - timedelta(days=1),
|
|
))
|
|
db.session.commit()
|
|
|
|
counts = cleanup_old_data()
|
|
assert counts['sightings'] == 1
|
|
|
|
remaining = db.session.scalar(
|
|
db.select(db.func.count(Sighting.id))
|
|
)
|
|
assert remaining == 1
|
|
|
|
|
|
def test_cleanup_deletes_old_probes(app):
|
|
"""Probes older than retention period are deleted."""
|
|
sensor_id, device_id = _setup_sensor_and_device(app)
|
|
with app.app_context():
|
|
db.session.add(Probe(
|
|
device_id=device_id, sensor_id=sensor_id,
|
|
ssid='OldNet', rssi=-50, channel=6,
|
|
timestamp=datetime.now(UTC) - timedelta(days=30),
|
|
))
|
|
db.session.add(Probe(
|
|
device_id=device_id, sensor_id=sensor_id,
|
|
ssid='NewNet', rssi=-40, channel=1,
|
|
timestamp=datetime.now(UTC) - timedelta(days=1),
|
|
))
|
|
db.session.commit()
|
|
|
|
counts = cleanup_old_data()
|
|
assert counts['probes'] == 1
|
|
|
|
remaining = db.session.scalar(
|
|
db.select(db.func.count(Probe.id))
|
|
)
|
|
assert remaining == 1
|
|
|
|
|
|
def test_cleanup_deletes_old_events(app):
|
|
"""Events older than 60 days are deleted."""
|
|
sensor_id, _ = _setup_sensor_and_device(app)
|
|
with app.app_context():
|
|
db.session.add(Event(
|
|
sensor_id=sensor_id, event_type='presence',
|
|
timestamp=datetime.now(UTC) - timedelta(days=90),
|
|
))
|
|
db.session.add(Event(
|
|
sensor_id=sensor_id, event_type='presence',
|
|
timestamp=datetime.now(UTC) - timedelta(days=10),
|
|
))
|
|
db.session.commit()
|
|
|
|
counts = cleanup_old_data()
|
|
assert counts['events'] == 1
|
|
|
|
remaining = db.session.scalar(
|
|
db.select(db.func.count(Event.id))
|
|
)
|
|
assert remaining == 1
|
|
|
|
|
|
def test_cleanup_deletes_old_alerts(app):
|
|
"""Alerts older than 365 days are deleted."""
|
|
sensor_id, _ = _setup_sensor_and_device(app)
|
|
with app.app_context():
|
|
db.session.add(Alert(
|
|
sensor_id=sensor_id, alert_type='deauth',
|
|
timestamp=datetime.now(UTC) - timedelta(days=400),
|
|
))
|
|
db.session.add(Alert(
|
|
sensor_id=sensor_id, alert_type='deauth',
|
|
timestamp=datetime.now(UTC) - timedelta(days=100),
|
|
))
|
|
db.session.commit()
|
|
|
|
counts = cleanup_old_data()
|
|
assert counts['alerts'] == 1
|
|
|
|
remaining = db.session.scalar(
|
|
db.select(db.func.count(Alert.id))
|
|
)
|
|
assert remaining == 1
|
|
|
|
|
|
def test_cleanup_no_expired_data(app):
|
|
"""Cleanup with no expired data returns zero counts."""
|
|
with app.app_context():
|
|
counts = cleanup_old_data()
|
|
assert all(v == 0 for v in counts.values())
|
|
|
|
|
|
def test_cleanup_cli_command(app):
|
|
"""CLI command runs and outputs results."""
|
|
runner = app.test_cli_runner()
|
|
result = runner.invoke(args=['cleanup-data'])
|
|
assert result.exit_code == 0
|
|
assert 'No expired data found' in result.output
|