feat: pagination totals, request logging, data retention
Add shared paginate() helper with total count to all list endpoints. Add request logging middleware (method, path, status, duration, IP). Add data retention service with configurable thresholds and CLI command.
This commit is contained in:
6
Makefile
6
Makefile
@@ -1,4 +1,4 @@
|
||||
.PHONY: help build run dev stop logs test migrate clean install start restart status oui
|
||||
.PHONY: help build run dev stop logs test migrate clean install start restart status oui cleanup
|
||||
|
||||
APP_NAME := esp32-web
|
||||
PORT := 5500
|
||||
@@ -26,6 +26,7 @@ help:
|
||||
@echo " make install Install with dev dependencies"
|
||||
@echo " make test Run tests"
|
||||
@echo " make oui Download OUI database"
|
||||
@echo " make cleanup Delete expired data"
|
||||
@echo " make clean Remove cache files"
|
||||
@echo ""
|
||||
@echo "Container:"
|
||||
@@ -85,6 +86,9 @@ test:
|
||||
oui:
|
||||
flask --app src/esp32_web download-oui
|
||||
|
||||
cleanup:
|
||||
flask --app src/esp32_web cleanup-data
|
||||
|
||||
migrate:
|
||||
flask --app src/esp32_web db upgrade
|
||||
|
||||
|
||||
@@ -1,12 +1,16 @@
|
||||
"""ESP32-Web Flask Application."""
|
||||
import click
|
||||
import logging
|
||||
import time
|
||||
from datetime import datetime, UTC
|
||||
from flask import Flask, Response, send_from_directory
|
||||
from flask import Flask, Response, request, send_from_directory
|
||||
from pathlib import Path
|
||||
|
||||
from .config import Config
|
||||
from .extensions import db, migrate
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Track app start time
|
||||
_start_time = None
|
||||
|
||||
@@ -27,6 +31,21 @@ def create_app(config_class=Config):
|
||||
from .api import bp as api_bp
|
||||
app.register_blueprint(api_bp, url_prefix='/api/v1')
|
||||
|
||||
# Request logging
|
||||
@app.before_request
|
||||
def _start_timer():
|
||||
request._start_time = time.monotonic()
|
||||
|
||||
@app.after_request
|
||||
def _log_request(response):
|
||||
if request.path == '/health':
|
||||
return response
|
||||
duration_ms = (time.monotonic() - getattr(request, '_start_time', time.monotonic())) * 1000
|
||||
logger.info('%s %s %s %.0fms %s',
|
||||
request.method, request.path, response.status_code,
|
||||
duration_ms, request.remote_addr)
|
||||
return response
|
||||
|
||||
# Health check with uptime
|
||||
@app.route('/health')
|
||||
def health():
|
||||
@@ -108,4 +127,16 @@ def create_app(config_class=Config):
|
||||
counts = update_all_heartbeats()
|
||||
click.echo(f"Sensors: {counts['online']} online, {counts['stale']} stale, {counts['offline']} offline")
|
||||
|
||||
@app.cli.command('cleanup-data')
|
||||
def cleanup_data_cmd():
|
||||
"""Delete data older than retention thresholds."""
|
||||
from .services.retention import cleanup_old_data
|
||||
counts = cleanup_old_data()
|
||||
total = sum(counts.values())
|
||||
parts = [f"{name}: {n}" for name, n in counts.items() if n > 0]
|
||||
if parts:
|
||||
click.echo(f"Deleted {total} rows ({', '.join(parts)})")
|
||||
else:
|
||||
click.echo("No expired data found")
|
||||
|
||||
return app
|
||||
|
||||
@@ -1,6 +1,27 @@
|
||||
"""API Blueprint."""
|
||||
from flask import Blueprint
|
||||
from flask import Blueprint, request
|
||||
from ..extensions import db
|
||||
|
||||
bp = Blueprint('api', __name__)
|
||||
|
||||
|
||||
def paginate(query, schema_fn):
|
||||
"""Apply limit/offset pagination to a query and return items with metadata.
|
||||
|
||||
Returns dict with 'items', 'total', 'limit', 'offset'.
|
||||
"""
|
||||
limit = min(request.args.get('limit', 100, type=int), 1000)
|
||||
offset = request.args.get('offset', 0, type=int)
|
||||
total = db.session.scalar(
|
||||
db.select(db.func.count()).select_from(query.subquery())
|
||||
)
|
||||
results = db.session.scalars(query.limit(limit).offset(offset)).all()
|
||||
return {
|
||||
'items': [schema_fn(r) for r in results],
|
||||
'total': total,
|
||||
'limit': limit,
|
||||
'offset': offset,
|
||||
}
|
||||
|
||||
|
||||
from . import sensors, devices, alerts, events, probes, stats, export # noqa: E402, F401
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
"""Alert endpoints."""
|
||||
from datetime import datetime, timedelta, UTC
|
||||
from flask import request
|
||||
from . import bp
|
||||
from . import bp, paginate
|
||||
from ..models import Alert
|
||||
from ..extensions import db
|
||||
|
||||
@@ -12,8 +12,6 @@ def list_alerts():
|
||||
alert_type = request.args.get('type')
|
||||
sensor_id = request.args.get('sensor_id', type=int)
|
||||
hours = request.args.get('hours', 24, type=int)
|
||||
limit = min(int(request.args.get('limit', 100)), 1000)
|
||||
offset = int(request.args.get('offset', 0))
|
||||
|
||||
since = datetime.now(UTC) - timedelta(hours=hours)
|
||||
query = db.select(Alert).where(Alert.timestamp >= since).order_by(Alert.timestamp.desc())
|
||||
@@ -23,7 +21,6 @@ def list_alerts():
|
||||
if sensor_id:
|
||||
query = query.where(Alert.sensor_id == sensor_id)
|
||||
|
||||
query = query.limit(limit).offset(offset)
|
||||
alerts = db.session.scalars(query).all()
|
||||
|
||||
return {'alerts': [a.to_dict() for a in alerts], 'limit': limit, 'offset': offset}
|
||||
result = paginate(query, Alert.to_dict)
|
||||
return {'alerts': result['items'], 'total': result['total'],
|
||||
'limit': result['limit'], 'offset': result['offset']}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""Device endpoints."""
|
||||
from flask import request
|
||||
from . import bp
|
||||
from . import bp, paginate
|
||||
from ..models import Device, Sighting
|
||||
from ..extensions import db
|
||||
from ..services.device_service import enrich_device
|
||||
@@ -10,16 +10,14 @@ from ..services.device_service import enrich_device
|
||||
def list_devices():
|
||||
"""List all devices."""
|
||||
device_type = request.args.get('type') # 'ble' or 'wifi'
|
||||
limit = min(int(request.args.get('limit', 100)), 1000)
|
||||
offset = int(request.args.get('offset', 0))
|
||||
|
||||
query = db.select(Device).order_by(Device.last_seen.desc())
|
||||
if device_type:
|
||||
query = query.where(Device.device_type == device_type)
|
||||
query = query.limit(limit).offset(offset)
|
||||
|
||||
devices = db.session.scalars(query).all()
|
||||
return {'devices': [enrich_device(d) for d in devices], 'limit': limit, 'offset': offset}
|
||||
result = paginate(query, enrich_device)
|
||||
return {'devices': result['items'], 'total': result['total'],
|
||||
'limit': result['limit'], 'offset': result['offset']}
|
||||
|
||||
|
||||
@bp.route('/devices/<mac>')
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
"""Event endpoints."""
|
||||
from datetime import datetime, timedelta, UTC
|
||||
from flask import request
|
||||
from . import bp
|
||||
from . import bp, paginate
|
||||
from ..models import Event
|
||||
from ..extensions import db
|
||||
|
||||
@@ -12,8 +12,6 @@ def list_events():
|
||||
event_type = request.args.get('type')
|
||||
sensor_id = request.args.get('sensor_id', type=int)
|
||||
hours = request.args.get('hours', 24, type=int)
|
||||
limit = min(int(request.args.get('limit', 100)), 1000)
|
||||
offset = int(request.args.get('offset', 0))
|
||||
|
||||
since = datetime.now(UTC) - timedelta(hours=hours)
|
||||
query = db.select(Event).where(Event.timestamp >= since).order_by(Event.timestamp.desc())
|
||||
@@ -23,7 +21,6 @@ def list_events():
|
||||
if sensor_id:
|
||||
query = query.where(Event.sensor_id == sensor_id)
|
||||
|
||||
query = query.limit(limit).offset(offset)
|
||||
events = db.session.scalars(query).all()
|
||||
|
||||
return {'events': [e.to_dict() for e in events], 'limit': limit, 'offset': offset}
|
||||
result = paginate(query, Event.to_dict)
|
||||
return {'events': result['items'], 'total': result['total'],
|
||||
'limit': result['limit'], 'offset': result['offset']}
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
from datetime import datetime, timedelta, UTC
|
||||
from flask import request
|
||||
from sqlalchemy import func
|
||||
from . import bp
|
||||
from . import bp, paginate
|
||||
from ..models import Probe, Device
|
||||
from ..extensions import db
|
||||
|
||||
@@ -12,8 +12,6 @@ def list_probes():
|
||||
"""List probe requests."""
|
||||
ssid = request.args.get('ssid')
|
||||
hours = request.args.get('hours', 24, type=int)
|
||||
limit = min(int(request.args.get('limit', 100)), 1000)
|
||||
offset = int(request.args.get('offset', 0))
|
||||
|
||||
since = datetime.now(UTC) - timedelta(hours=hours)
|
||||
query = db.select(Probe).where(Probe.timestamp >= since).order_by(Probe.timestamp.desc())
|
||||
@@ -21,10 +19,9 @@ def list_probes():
|
||||
if ssid:
|
||||
query = query.where(Probe.ssid == ssid)
|
||||
|
||||
query = query.limit(limit).offset(offset)
|
||||
probes = db.session.scalars(query).all()
|
||||
|
||||
return {'probes': [p.to_dict() for p in probes], 'limit': limit, 'offset': offset}
|
||||
result = paginate(query, Probe.to_dict)
|
||||
return {'probes': result['items'], 'total': result['total'],
|
||||
'limit': result['limit'], 'offset': result['offset']}
|
||||
|
||||
|
||||
@bp.route('/probes/ssids')
|
||||
|
||||
@@ -3,7 +3,7 @@ import json
|
||||
import socket
|
||||
from datetime import datetime, timedelta, UTC
|
||||
from flask import request, current_app
|
||||
from . import bp
|
||||
from . import bp, paginate
|
||||
from ..models import Sensor, Event, Sighting, Alert
|
||||
from ..extensions import db
|
||||
from ..services.heartbeat import get_heartbeat_summary, update_all_heartbeats
|
||||
@@ -12,8 +12,10 @@ from ..services.heartbeat import get_heartbeat_summary, update_all_heartbeats
|
||||
@bp.route('/sensors')
|
||||
def list_sensors():
|
||||
"""List all sensors."""
|
||||
sensors = db.session.scalars(db.select(Sensor).order_by(Sensor.hostname)).all()
|
||||
return {'sensors': [s.to_dict() for s in sensors]}
|
||||
query = db.select(Sensor).order_by(Sensor.hostname)
|
||||
result = paginate(query, Sensor.to_dict)
|
||||
return {'sensors': result['items'], 'total': result['total'],
|
||||
'limit': result['limit'], 'offset': result['offset']}
|
||||
|
||||
|
||||
@bp.route('/sensors/<hostname>')
|
||||
|
||||
@@ -13,6 +13,12 @@ class Config:
|
||||
SENSOR_CMD_PORT = int(os.environ.get('CMD_PORT', 5501))
|
||||
SENSOR_TIMEOUT = int(os.environ.get('SENSOR_TIMEOUT', 60))
|
||||
|
||||
# Data retention (days)
|
||||
RETENTION_SIGHTINGS_DAYS = int(os.environ.get('RETENTION_SIGHTINGS_DAYS', 14))
|
||||
RETENTION_PROBES_DAYS = int(os.environ.get('RETENTION_PROBES_DAYS', 14))
|
||||
RETENTION_EVENTS_DAYS = int(os.environ.get('RETENTION_EVENTS_DAYS', 60))
|
||||
RETENTION_ALERTS_DAYS = int(os.environ.get('RETENTION_ALERTS_DAYS', 365))
|
||||
|
||||
|
||||
class TestConfig(Config):
|
||||
"""Testing configuration."""
|
||||
|
||||
35
src/esp32_web/services/retention.py
Normal file
35
src/esp32_web/services/retention.py
Normal file
@@ -0,0 +1,35 @@
|
||||
"""Data retention service."""
|
||||
from datetime import datetime, UTC, timedelta
|
||||
from flask import current_app
|
||||
from ..extensions import db
|
||||
from ..models import Sighting, Probe, Event, Alert
|
||||
|
||||
|
||||
def cleanup_old_data() -> dict:
|
||||
"""Delete rows older than configured retention periods.
|
||||
|
||||
Returns dict with counts deleted per table.
|
||||
"""
|
||||
now = datetime.now(UTC)
|
||||
counts = {}
|
||||
|
||||
tables = [
|
||||
('sightings', Sighting, Sighting.timestamp,
|
||||
current_app.config['RETENTION_SIGHTINGS_DAYS']),
|
||||
('probes', Probe, Probe.timestamp,
|
||||
current_app.config['RETENTION_PROBES_DAYS']),
|
||||
('events', Event, Event.timestamp,
|
||||
current_app.config['RETENTION_EVENTS_DAYS']),
|
||||
('alerts', Alert, Alert.timestamp,
|
||||
current_app.config['RETENTION_ALERTS_DAYS']),
|
||||
]
|
||||
|
||||
for name, model, ts_col, days in tables:
|
||||
cutoff = now - timedelta(days=days)
|
||||
result = db.session.execute(
|
||||
db.delete(model).where(ts_col < cutoff)
|
||||
)
|
||||
counts[name] = result.rowcount
|
||||
|
||||
db.session.commit()
|
||||
return counts
|
||||
128
tests/test_api/test_pagination.py
Normal file
128
tests/test_api/test_pagination.py
Normal file
@@ -0,0 +1,128 @@
|
||||
"""Pagination tests for all list endpoints."""
|
||||
from datetime import datetime, UTC
|
||||
from esp32_web.extensions import db
|
||||
from esp32_web.models import Sensor, Device, Alert, Event, Probe
|
||||
|
||||
|
||||
def _create_sensors(app, n):
|
||||
with app.app_context():
|
||||
for i in range(n):
|
||||
db.session.add(Sensor(hostname=f'sensor-{i:03d}', ip=f'192.168.1.{i}'))
|
||||
db.session.commit()
|
||||
|
||||
|
||||
def test_sensors_pagination_defaults(client, app):
|
||||
"""Sensors endpoint returns total, limit, offset."""
|
||||
_create_sensors(app, 3)
|
||||
resp = client.get('/api/v1/sensors')
|
||||
assert resp.status_code == 200
|
||||
assert resp.json['total'] == 3
|
||||
assert resp.json['limit'] == 100
|
||||
assert resp.json['offset'] == 0
|
||||
assert len(resp.json['sensors']) == 3
|
||||
|
||||
|
||||
def test_sensors_pagination_limit(client, app):
|
||||
"""Sensors limit param restricts returned items."""
|
||||
_create_sensors(app, 5)
|
||||
resp = client.get('/api/v1/sensors?limit=2')
|
||||
assert resp.json['total'] == 5
|
||||
assert resp.json['limit'] == 2
|
||||
assert len(resp.json['sensors']) == 2
|
||||
|
||||
|
||||
def test_sensors_pagination_offset(client, app):
|
||||
"""Sensors offset param skips items."""
|
||||
_create_sensors(app, 5)
|
||||
resp = client.get('/api/v1/sensors?limit=2&offset=3')
|
||||
assert resp.json['total'] == 5
|
||||
assert resp.json['offset'] == 3
|
||||
assert len(resp.json['sensors']) == 2
|
||||
|
||||
|
||||
def test_sensors_pagination_max_limit(client, app):
|
||||
"""Limit is capped at 1000."""
|
||||
_create_sensors(app, 1)
|
||||
resp = client.get('/api/v1/sensors?limit=5000')
|
||||
assert resp.json['limit'] == 1000
|
||||
|
||||
|
||||
def test_devices_pagination(client, app):
|
||||
"""Devices endpoint includes total count."""
|
||||
with app.app_context():
|
||||
for i in range(3):
|
||||
db.session.add(Device(
|
||||
mac=f'aa:bb:cc:dd:ee:{i:02x}',
|
||||
device_type='wifi',
|
||||
last_seen=datetime.now(UTC),
|
||||
))
|
||||
db.session.commit()
|
||||
|
||||
resp = client.get('/api/v1/devices?limit=2')
|
||||
assert resp.status_code == 200
|
||||
assert resp.json['total'] == 3
|
||||
assert len(resp.json['devices']) == 2
|
||||
|
||||
|
||||
def test_alerts_pagination(client, app):
|
||||
"""Alerts endpoint includes total count."""
|
||||
with app.app_context():
|
||||
sensor = Sensor(hostname='s1', ip='10.0.0.1')
|
||||
db.session.add(sensor)
|
||||
db.session.flush()
|
||||
for _ in range(4):
|
||||
db.session.add(Alert(
|
||||
sensor_id=sensor.id,
|
||||
alert_type='deauth',
|
||||
timestamp=datetime.now(UTC),
|
||||
))
|
||||
db.session.commit()
|
||||
|
||||
resp = client.get('/api/v1/alerts?limit=2&hours=1')
|
||||
assert resp.status_code == 200
|
||||
assert resp.json['total'] == 4
|
||||
assert len(resp.json['alerts']) == 2
|
||||
|
||||
|
||||
def test_events_pagination(client, app):
|
||||
"""Events endpoint includes total count."""
|
||||
with app.app_context():
|
||||
sensor = Sensor(hostname='s1', ip='10.0.0.1')
|
||||
db.session.add(sensor)
|
||||
db.session.flush()
|
||||
for _ in range(3):
|
||||
db.session.add(Event(
|
||||
sensor_id=sensor.id,
|
||||
event_type='presence',
|
||||
timestamp=datetime.now(UTC),
|
||||
))
|
||||
db.session.commit()
|
||||
|
||||
resp = client.get('/api/v1/events?hours=1')
|
||||
assert resp.status_code == 200
|
||||
assert resp.json['total'] == 3
|
||||
|
||||
|
||||
def test_probes_pagination(client, app):
|
||||
"""Probes endpoint includes total count."""
|
||||
with app.app_context():
|
||||
sensor = Sensor(hostname='s1', ip='10.0.0.1')
|
||||
device = Device(mac='aa:bb:cc:dd:ee:ff', device_type='wifi',
|
||||
last_seen=datetime.now(UTC))
|
||||
db.session.add_all([sensor, device])
|
||||
db.session.flush()
|
||||
for _ in range(3):
|
||||
db.session.add(Probe(
|
||||
device_id=device.id,
|
||||
sensor_id=sensor.id,
|
||||
ssid='TestNet',
|
||||
rssi=-50,
|
||||
channel=6,
|
||||
timestamp=datetime.now(UTC),
|
||||
))
|
||||
db.session.commit()
|
||||
|
||||
resp = client.get('/api/v1/probes?hours=1&limit=2')
|
||||
assert resp.status_code == 200
|
||||
assert resp.json['total'] == 3
|
||||
assert len(resp.json['probes']) == 2
|
||||
@@ -8,7 +8,10 @@ def test_list_sensors_empty(client):
|
||||
"""Test listing sensors when empty."""
|
||||
response = client.get('/api/v1/sensors')
|
||||
assert response.status_code == 200
|
||||
assert response.json == {'sensors': []}
|
||||
assert response.json['sensors'] == []
|
||||
assert response.json['total'] == 0
|
||||
assert response.json['limit'] == 100
|
||||
assert response.json['offset'] == 0
|
||||
|
||||
|
||||
def test_get_sensor_not_found(client):
|
||||
|
||||
0
tests/test_services/__init__.py
Normal file
0
tests/test_services/__init__.py
Normal file
127
tests/test_services/test_retention.py
Normal file
127
tests/test_services/test_retention.py
Normal file
@@ -0,0 +1,127 @@
|
||||
"""Data retention service tests."""
|
||||
from datetime import datetime, UTC, timedelta
|
||||
from esp32_web.extensions import db
|
||||
from esp32_web.models import Sensor, Device, Sighting, Probe, Event, Alert
|
||||
from esp32_web.services.retention import cleanup_old_data
|
||||
|
||||
|
||||
def _setup_sensor_and_device(app):
|
||||
"""Create a sensor and device for FK references."""
|
||||
with app.app_context():
|
||||
sensor = Sensor(hostname='s1', ip='10.0.0.1')
|
||||
device = Device(mac='aa:bb:cc:dd:ee:ff', device_type='wifi',
|
||||
last_seen=datetime.now(UTC))
|
||||
db.session.add_all([sensor, device])
|
||||
db.session.commit()
|
||||
return sensor.id, device.id
|
||||
|
||||
|
||||
def test_cleanup_deletes_old_sightings(app):
|
||||
"""Sightings older than retention period are deleted."""
|
||||
sensor_id, device_id = _setup_sensor_and_device(app)
|
||||
with app.app_context():
|
||||
# Old sighting (30 days ago, retention=14)
|
||||
db.session.add(Sighting(
|
||||
device_id=device_id, sensor_id=sensor_id, rssi=-50,
|
||||
timestamp=datetime.now(UTC) - timedelta(days=30),
|
||||
))
|
||||
# Recent sighting (1 day ago)
|
||||
db.session.add(Sighting(
|
||||
device_id=device_id, sensor_id=sensor_id, rssi=-60,
|
||||
timestamp=datetime.now(UTC) - timedelta(days=1),
|
||||
))
|
||||
db.session.commit()
|
||||
|
||||
counts = cleanup_old_data()
|
||||
assert counts['sightings'] == 1
|
||||
|
||||
remaining = db.session.scalar(
|
||||
db.select(db.func.count(Sighting.id))
|
||||
)
|
||||
assert remaining == 1
|
||||
|
||||
|
||||
def test_cleanup_deletes_old_probes(app):
|
||||
"""Probes older than retention period are deleted."""
|
||||
sensor_id, device_id = _setup_sensor_and_device(app)
|
||||
with app.app_context():
|
||||
db.session.add(Probe(
|
||||
device_id=device_id, sensor_id=sensor_id,
|
||||
ssid='OldNet', rssi=-50, channel=6,
|
||||
timestamp=datetime.now(UTC) - timedelta(days=30),
|
||||
))
|
||||
db.session.add(Probe(
|
||||
device_id=device_id, sensor_id=sensor_id,
|
||||
ssid='NewNet', rssi=-40, channel=1,
|
||||
timestamp=datetime.now(UTC) - timedelta(days=1),
|
||||
))
|
||||
db.session.commit()
|
||||
|
||||
counts = cleanup_old_data()
|
||||
assert counts['probes'] == 1
|
||||
|
||||
remaining = db.session.scalar(
|
||||
db.select(db.func.count(Probe.id))
|
||||
)
|
||||
assert remaining == 1
|
||||
|
||||
|
||||
def test_cleanup_deletes_old_events(app):
|
||||
"""Events older than 60 days are deleted."""
|
||||
sensor_id, _ = _setup_sensor_and_device(app)
|
||||
with app.app_context():
|
||||
db.session.add(Event(
|
||||
sensor_id=sensor_id, event_type='presence',
|
||||
timestamp=datetime.now(UTC) - timedelta(days=90),
|
||||
))
|
||||
db.session.add(Event(
|
||||
sensor_id=sensor_id, event_type='presence',
|
||||
timestamp=datetime.now(UTC) - timedelta(days=10),
|
||||
))
|
||||
db.session.commit()
|
||||
|
||||
counts = cleanup_old_data()
|
||||
assert counts['events'] == 1
|
||||
|
||||
remaining = db.session.scalar(
|
||||
db.select(db.func.count(Event.id))
|
||||
)
|
||||
assert remaining == 1
|
||||
|
||||
|
||||
def test_cleanup_deletes_old_alerts(app):
|
||||
"""Alerts older than 365 days are deleted."""
|
||||
sensor_id, _ = _setup_sensor_and_device(app)
|
||||
with app.app_context():
|
||||
db.session.add(Alert(
|
||||
sensor_id=sensor_id, alert_type='deauth',
|
||||
timestamp=datetime.now(UTC) - timedelta(days=400),
|
||||
))
|
||||
db.session.add(Alert(
|
||||
sensor_id=sensor_id, alert_type='deauth',
|
||||
timestamp=datetime.now(UTC) - timedelta(days=100),
|
||||
))
|
||||
db.session.commit()
|
||||
|
||||
counts = cleanup_old_data()
|
||||
assert counts['alerts'] == 1
|
||||
|
||||
remaining = db.session.scalar(
|
||||
db.select(db.func.count(Alert.id))
|
||||
)
|
||||
assert remaining == 1
|
||||
|
||||
|
||||
def test_cleanup_no_expired_data(app):
|
||||
"""Cleanup with no expired data returns zero counts."""
|
||||
with app.app_context():
|
||||
counts = cleanup_old_data()
|
||||
assert all(v == 0 for v in counts.values())
|
||||
|
||||
|
||||
def test_cleanup_cli_command(app):
|
||||
"""CLI command runs and outputs results."""
|
||||
runner = app.test_cli_runner()
|
||||
result = runner.invoke(args=['cleanup-data'])
|
||||
assert result.exit_code == 0
|
||||
assert 'No expired data found' in result.output
|
||||
Reference in New Issue
Block a user