feat: Initial project scaffold

Flask API backend for ESP32 sensor fleet:
- App factory pattern with blueprints
- SQLAlchemy 2.x models (Sensor, Device, Sighting, Alert, Event, Probe)
- UDP collector for sensor data streams
- REST API endpoints for sensors, devices, alerts, events, probes, stats
- pytest setup with fixtures
- Containerfile for podman deployment
- Makefile for common tasks
This commit is contained in:
user
2026-02-05 20:56:52 +01:00
commit a676136f5d
34 changed files with 1054 additions and 0 deletions

33
src/esp32_web/__init__.py Normal file
View File

@@ -0,0 +1,33 @@
"""ESP32-Web Flask Application."""
from flask import Flask
from .config import Config
from .extensions import db, migrate
def create_app(config_class=Config):
"""Application factory."""
app = Flask(__name__)
app.config.from_object(config_class)
# Initialize extensions
db.init_app(app)
migrate.init_app(app, db)
# Register blueprints
from .api import bp as api_bp
app.register_blueprint(api_bp, url_prefix='/api/v1')
# Health check
@app.route('/health')
def health():
return {'status': 'ok'}
# Start UDP collector in non-testing mode
if not app.config.get('TESTING'):
from .collector import collector
collector.init_app(app)
with app.app_context():
collector.start()
return app

View File

@@ -0,0 +1,6 @@
"""API Blueprint."""
from flask import Blueprint
bp = Blueprint('api', __name__)
from . import sensors, devices, alerts, events, probes, stats # noqa: E402, F401

View File

@@ -0,0 +1,29 @@
"""Alert endpoints."""
from datetime import datetime, timedelta, UTC
from flask import request
from . import bp
from ..models import Alert
from ..extensions import db
@bp.route('/alerts')
def list_alerts():
"""List alerts with filters."""
alert_type = request.args.get('type')
sensor_id = request.args.get('sensor_id', type=int)
hours = request.args.get('hours', 24, type=int)
limit = min(int(request.args.get('limit', 100)), 1000)
offset = int(request.args.get('offset', 0))
since = datetime.now(UTC) - timedelta(hours=hours)
query = db.select(Alert).where(Alert.timestamp >= since).order_by(Alert.timestamp.desc())
if alert_type:
query = query.where(Alert.alert_type == alert_type)
if sensor_id:
query = query.where(Alert.sensor_id == sensor_id)
query = query.limit(limit).offset(offset)
alerts = db.session.scalars(query).all()
return {'alerts': [a.to_dict() for a in alerts], 'limit': limit, 'offset': offset}

View File

@@ -0,0 +1,42 @@
"""Device endpoints."""
from flask import request
from . import bp
from ..models import Device, Sighting
from ..extensions import db
@bp.route('/devices')
def list_devices():
"""List all devices."""
device_type = request.args.get('type') # 'ble' or 'wifi'
limit = min(int(request.args.get('limit', 100)), 1000)
offset = int(request.args.get('offset', 0))
query = db.select(Device).order_by(Device.last_seen.desc())
if device_type:
query = query.where(Device.device_type == device_type)
query = query.limit(limit).offset(offset)
devices = db.session.scalars(query).all()
return {'devices': [d.to_dict() for d in devices], 'limit': limit, 'offset': offset}
@bp.route('/devices/<mac>')
def get_device(mac):
"""Get device by MAC."""
mac = mac.lower()
device = db.session.scalar(db.select(Device).where(Device.mac == mac))
if not device:
return {'error': 'Device not found'}, 404
# Include recent sightings
sightings = db.session.scalars(
db.select(Sighting)
.where(Sighting.device_id == device.id)
.order_by(Sighting.timestamp.desc())
.limit(20)
).all()
result = device.to_dict()
result['sightings'] = [s.to_dict() for s in sightings]
return result

View File

@@ -0,0 +1,29 @@
"""Event endpoints."""
from datetime import datetime, timedelta, UTC
from flask import request
from . import bp
from ..models import Event
from ..extensions import db
@bp.route('/events')
def list_events():
"""List sensor events."""
event_type = request.args.get('type')
sensor_id = request.args.get('sensor_id', type=int)
hours = request.args.get('hours', 24, type=int)
limit = min(int(request.args.get('limit', 100)), 1000)
offset = int(request.args.get('offset', 0))
since = datetime.now(UTC) - timedelta(hours=hours)
query = db.select(Event).where(Event.timestamp >= since).order_by(Event.timestamp.desc())
if event_type:
query = query.where(Event.event_type == event_type)
if sensor_id:
query = query.where(Event.sensor_id == sensor_id)
query = query.limit(limit).offset(offset)
events = db.session.scalars(query).all()
return {'events': [e.to_dict() for e in events], 'limit': limit, 'offset': offset}

View File

@@ -0,0 +1,44 @@
"""Probe request endpoints."""
from datetime import datetime, timedelta, UTC
from flask import request
from sqlalchemy import func
from . import bp
from ..models import Probe, Device
from ..extensions import db
@bp.route('/probes')
def list_probes():
"""List probe requests."""
ssid = request.args.get('ssid')
hours = request.args.get('hours', 24, type=int)
limit = min(int(request.args.get('limit', 100)), 1000)
offset = int(request.args.get('offset', 0))
since = datetime.now(UTC) - timedelta(hours=hours)
query = db.select(Probe).where(Probe.timestamp >= since).order_by(Probe.timestamp.desc())
if ssid:
query = query.where(Probe.ssid == ssid)
query = query.limit(limit).offset(offset)
probes = db.session.scalars(query).all()
return {'probes': [p.to_dict() for p in probes], 'limit': limit, 'offset': offset}
@bp.route('/probes/ssids')
def list_ssids():
"""List SSIDs with counts."""
hours = request.args.get('hours', 24, type=int)
since = datetime.now(UTC) - timedelta(hours=hours)
results = db.session.execute(
db.select(Probe.ssid, func.count(Probe.id).label('count'))
.where(Probe.timestamp >= since)
.group_by(Probe.ssid)
.order_by(func.count(Probe.id).desc())
.limit(100)
).all()
return {'ssids': [{'ssid': r.ssid, 'count': r.count} for r in results]}

View File

@@ -0,0 +1,53 @@
"""Sensor endpoints."""
import socket
from flask import request, current_app
from . import bp
from ..models import Sensor
from ..extensions import db
@bp.route('/sensors')
def list_sensors():
"""List all sensors."""
sensors = db.session.scalars(db.select(Sensor).order_by(Sensor.hostname)).all()
return {'sensors': [s.to_dict() for s in sensors]}
@bp.route('/sensors/<hostname>')
def get_sensor(hostname):
"""Get sensor by hostname."""
sensor = db.session.scalar(db.select(Sensor).where(Sensor.hostname == hostname))
if not sensor:
return {'error': 'Sensor not found'}, 404
return sensor.to_dict()
@bp.route('/sensors/<hostname>/command', methods=['POST'])
def send_command(hostname):
"""Send UDP command to sensor."""
sensor = db.session.scalar(db.select(Sensor).where(Sensor.hostname == hostname))
if not sensor:
return {'error': 'Sensor not found'}, 404
data = request.get_json()
if not data or 'command' not in data:
return {'error': 'Missing command'}, 400
command = data['command'].strip().upper()
# Whitelist allowed commands
allowed = ('STATUS', 'REBOOT', 'IDENTIFY', 'BLE', 'ADAPTIVE', 'RATE', 'POWER',
'CSIMODE', 'PRESENCE', 'CALIBRATE', 'CHANSCAN')
if not any(command.startswith(a) for a in allowed):
return {'error': 'Command not allowed'}, 403
# Send UDP command
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(2.0)
sock.sendto(command.encode(), (sensor.ip, current_app.config['SENSOR_CMD_PORT']))
sock.close()
except socket.error as e:
return {'error': f'Socket error: {e}'}, 500
return {'status': 'sent', 'command': command}

View File

@@ -0,0 +1,43 @@
"""Statistics endpoints."""
from datetime import datetime, timedelta, UTC
from flask import request
from sqlalchemy import func
from . import bp
from ..models import Sensor, Device, Alert, Event, Probe
from ..extensions import db
@bp.route('/stats')
def get_stats():
"""Get aggregate statistics."""
hours = request.args.get('hours', 24, type=int)
since = datetime.now(UTC) - timedelta(hours=hours)
sensors_total = db.session.scalar(db.select(func.count(Sensor.id)))
sensors_online = db.session.scalar(
db.select(func.count(Sensor.id)).where(Sensor.status == 'online')
)
devices_total = db.session.scalar(db.select(func.count(Device.id)))
devices_ble = db.session.scalar(
db.select(func.count(Device.id)).where(Device.device_type == 'ble')
)
devices_wifi = db.session.scalar(
db.select(func.count(Device.id)).where(Device.device_type == 'wifi')
)
alerts_count = db.session.scalar(
db.select(func.count(Alert.id)).where(Alert.timestamp >= since)
)
events_count = db.session.scalar(
db.select(func.count(Event.id)).where(Event.timestamp >= since)
)
probes_count = db.session.scalar(
db.select(func.count(Probe.id)).where(Probe.timestamp >= since)
)
return {
'sensors': {'total': sensors_total, 'online': sensors_online},
'devices': {'total': devices_total, 'ble': devices_ble, 'wifi': devices_wifi},
'alerts': {'count': alerts_count, 'hours': hours},
'events': {'count': events_count, 'hours': hours},
'probes': {'count': probes_count, 'hours': hours},
}

View File

@@ -0,0 +1,6 @@
"""UDP Collector."""
from .listener import UDPCollector
collector = UDPCollector()
__all__ = ['collector']

View File

@@ -0,0 +1,67 @@
"""UDP listener for sensor data."""
import logging
import socket
import threading
log = logging.getLogger(__name__)
class UDPCollector:
"""Threaded UDP collector for sensor data streams."""
def __init__(self, app=None):
self.app = app
self._thread = None
self._running = False
self._sock = None
def init_app(self, app):
"""Initialize with Flask app."""
self.app = app
app.extensions['collector'] = self
def start(self):
"""Start the collector thread."""
if self._running:
return
self._running = True
self._thread = threading.Thread(target=self._listen, daemon=True, name='udp-collector')
self._thread.start()
log.info('UDP collector started on port %d', self.app.config['UDP_LISTEN_PORT'])
def stop(self):
"""Stop the collector thread."""
self._running = False
if self._sock:
self._sock.close()
def _listen(self):
"""Main listen loop."""
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._sock.bind(('0.0.0.0', self.app.config['UDP_LISTEN_PORT']))
self._sock.settimeout(1.0)
while self._running:
try:
data, addr = self._sock.recvfrom(2048)
self._handle_packet(data.decode('utf-8', errors='replace'), addr)
except socket.timeout:
continue
except Exception as e:
log.exception('Error receiving packet: %s', e)
self._sock.close()
def _handle_packet(self, data: str, addr: tuple):
"""Handle incoming packet."""
data = data.strip()
if not data:
return
with self.app.app_context():
from .parsers import parse_packet
try:
parse_packet(data, addr)
except Exception as e:
log.exception('Error parsing packet: %s', e)

View File

@@ -0,0 +1,180 @@
"""Packet parsers for sensor data streams."""
import json
import logging
import re
from datetime import datetime, UTC
from ..extensions import db
from ..models import Sensor, Device, Sighting, Alert, Event, Probe
log = logging.getLogger(__name__)
def get_or_create_sensor(hostname: str, ip: str) -> Sensor:
"""Get or create sensor by hostname."""
sensor = db.session.scalar(db.select(Sensor).where(Sensor.hostname == hostname))
if not sensor:
sensor = Sensor(hostname=hostname, ip=ip, status='online')
db.session.add(sensor)
db.session.flush()
else:
sensor.ip = ip
sensor.last_seen = datetime.now(UTC)
sensor.status = 'online'
return sensor
def get_or_create_device(mac: str, device_type: str) -> Device:
"""Get or create device by MAC."""
mac = mac.lower()
device = db.session.scalar(db.select(Device).where(Device.mac == mac))
if not device:
device = Device(mac=mac, device_type=device_type)
db.session.add(device)
db.session.flush()
else:
device.last_seen = datetime.now(UTC)
return device
def parse_packet(data: str, addr: tuple):
"""Parse and store incoming packet."""
ip = addr[0]
if data.startswith('CSI_DATA,'):
parse_csi_data(data, ip)
elif data.startswith('BLE_DATA,'):
parse_ble_data(data, ip)
elif data.startswith('PROBE_DATA,'):
parse_probe_data(data, ip)
elif data.startswith('ALERT_DATA,'):
parse_alert_data(data, ip)
elif data.startswith('EVENT,'):
parse_event(data, ip)
else:
log.debug('Unknown packet type: %s', data[:50])
def parse_csi_data(data: str, ip: str):
"""Parse CSI_DATA packet (just update sensor heartbeat)."""
parts = data.split(',')
if len(parts) < 3:
return
hostname = parts[1]
get_or_create_sensor(hostname, ip)
db.session.commit()
def parse_ble_data(data: str, ip: str):
"""Parse BLE_DATA packet."""
# BLE_DATA,hostname,mac,rssi,type,name,company_id,tx_power,flags
parts = data.split(',')
if len(parts) < 5:
return
hostname = parts[1]
mac = parts[2].lower()
rssi = int(parts[3])
# addr_type = parts[4] # pub/rnd
name = parts[5] if len(parts) > 5 else None
company_id = int(parts[6], 16) if len(parts) > 6 and parts[6].startswith('0x') else None
tx_power = int(parts[7]) if len(parts) > 7 and parts[7] not in ('127', '') else None
sensor = get_or_create_sensor(hostname, ip)
device = get_or_create_device(mac, 'ble')
if name:
device.name = name
if company_id:
device.company_id = company_id
if tx_power:
device.tx_power = tx_power
sighting = Sighting(device_id=device.id, sensor_id=sensor.id, rssi=rssi)
db.session.add(sighting)
db.session.commit()
def parse_probe_data(data: str, ip: str):
"""Parse PROBE_DATA packet."""
# PROBE_DATA,hostname,mac,rssi,ssid,channel
parts = data.split(',')
if len(parts) < 6:
return
hostname = parts[1]
mac = parts[2].lower()
rssi = int(parts[3])
ssid = parts[4]
channel = int(parts[5])
sensor = get_or_create_sensor(hostname, ip)
device = get_or_create_device(mac, 'wifi')
probe = Probe(device_id=device.id, sensor_id=sensor.id, ssid=ssid, rssi=rssi, channel=channel)
db.session.add(probe)
db.session.commit()
def parse_alert_data(data: str, ip: str):
"""Parse ALERT_DATA packet."""
# ALERT_DATA,hostname,type,source,target,rssi OR ALERT_DATA,hostname,deauth_flood,count,window
parts = data.split(',')
if len(parts) < 4:
return
hostname = parts[1]
alert_type = parts[2]
sensor = get_or_create_sensor(hostname, ip)
if alert_type == 'deauth_flood':
flood_count = int(parts[3])
flood_window = int(parts[4]) if len(parts) > 4 else None
alert = Alert(
sensor_id=sensor.id, alert_type=alert_type,
flood_count=flood_count, flood_window=flood_window
)
else:
source_mac = parts[3].lower() if len(parts) > 3 else None
target_mac = parts[4].lower() if len(parts) > 4 else None
rssi = int(parts[5]) if len(parts) > 5 else None
alert = Alert(
sensor_id=sensor.id, alert_type=alert_type,
source_mac=source_mac, target_mac=target_mac, rssi=rssi
)
db.session.add(alert)
db.session.commit()
log.info('Alert: %s from %s', alert_type, hostname)
def parse_event(data: str, ip: str):
"""Parse EVENT packet."""
# EVENT,hostname,key=value key=value ...
parts = data.split(',')
if len(parts) < 3:
return
hostname = parts[1]
payload_str = ','.join(parts[2:])
# Parse key=value pairs
payload = {}
for match in re.finditer(r'(\w+)=([^\s,]+)', payload_str):
key, value = match.groups()
# Try to convert to number
try:
payload[key] = int(value)
except ValueError:
try:
payload[key] = float(value)
except ValueError:
payload[key] = value
# Determine event type from first key
event_type = next(iter(payload.keys()), 'unknown')
sensor = get_or_create_sensor(hostname, ip)
event = Event(sensor_id=sensor.id, event_type=event_type, payload_json=json.dumps(payload))
db.session.add(event)
db.session.commit()

25
src/esp32_web/config.py Normal file
View File

@@ -0,0 +1,25 @@
"""Application configuration."""
import os
class Config:
"""Base configuration."""
SECRET_KEY = os.environ.get('SECRET_KEY', 'dev-key-change-me')
SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL', 'sqlite:///esp32.db')
SQLALCHEMY_TRACK_MODIFICATIONS = False
# Network
UDP_LISTEN_PORT = int(os.environ.get('UDP_PORT', 5500))
SENSOR_CMD_PORT = int(os.environ.get('CMD_PORT', 5501))
SENSOR_TIMEOUT = int(os.environ.get('SENSOR_TIMEOUT', 60))
class TestConfig(Config):
"""Testing configuration."""
TESTING = True
SQLALCHEMY_DATABASE_URI = 'sqlite:///:memory:'
class ProdConfig(Config):
"""Production configuration."""
pass

View File

@@ -0,0 +1,6 @@
"""Flask extensions."""
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
db = SQLAlchemy()
migrate = Migrate()

View File

@@ -0,0 +1,9 @@
"""Database models."""
from .sensor import Sensor
from .device import Device
from .sighting import Sighting
from .alert import Alert
from .event import Event
from .probe import Probe
__all__ = ['Sensor', 'Device', 'Sighting', 'Alert', 'Event', 'Probe']

View File

@@ -0,0 +1,37 @@
"""Alert model."""
from datetime import datetime, UTC
from ..extensions import db
class Alert(db.Model):
"""Security alert (deauth, disassoc, flood)."""
__tablename__ = 'alerts'
id: db.Mapped[int] = db.mapped_column(primary_key=True)
sensor_id: db.Mapped[int] = db.mapped_column(db.ForeignKey('sensors.id'), index=True)
alert_type: db.Mapped[str] = db.mapped_column(db.String(16), index=True) # deauth, disassoc, deauth_flood
source_mac: db.Mapped[str | None] = db.mapped_column(db.String(17), nullable=True)
target_mac: db.Mapped[str | None] = db.mapped_column(db.String(17), nullable=True)
rssi: db.Mapped[int | None] = db.mapped_column(nullable=True)
flood_count: db.Mapped[int | None] = db.mapped_column(nullable=True)
flood_window: db.Mapped[int | None] = db.mapped_column(nullable=True)
timestamp: db.Mapped[datetime] = db.mapped_column(default=lambda: datetime.now(UTC), index=True)
# Relationships
sensor = db.relationship('Sensor', back_populates='alerts')
def to_dict(self):
d = {
'id': self.id,
'sensor_id': self.sensor_id,
'type': self.alert_type,
'timestamp': self.timestamp.isoformat(),
}
if self.alert_type == 'deauth_flood':
d['flood_count'] = self.flood_count
d['flood_window'] = self.flood_window
else:
d['source_mac'] = self.source_mac
d['target_mac'] = self.target_mac
d['rssi'] = self.rssi
return d

View File

@@ -0,0 +1,37 @@
"""Device model."""
from datetime import datetime, UTC
from ..extensions import db
class Device(db.Model):
"""Discovered BLE/WiFi device."""
__tablename__ = 'devices'
id: db.Mapped[int] = db.mapped_column(primary_key=True)
mac: db.Mapped[str] = db.mapped_column(db.String(17), unique=True, index=True)
device_type: db.Mapped[str] = db.mapped_column(db.String(8)) # 'ble' or 'wifi'
vendor: db.Mapped[str | None] = db.mapped_column(db.String(64), nullable=True)
name: db.Mapped[str | None] = db.mapped_column(db.String(64), nullable=True)
first_seen: db.Mapped[datetime] = db.mapped_column(default=lambda: datetime.now(UTC))
last_seen: db.Mapped[datetime] = db.mapped_column(default=lambda: datetime.now(UTC))
# BLE-specific fields
company_id: db.Mapped[int | None] = db.mapped_column(nullable=True)
tx_power: db.Mapped[int | None] = db.mapped_column(nullable=True)
# Relationships
sightings = db.relationship('Sighting', back_populates='device', lazy='dynamic')
probes = db.relationship('Probe', back_populates='device', lazy='dynamic')
def to_dict(self):
return {
'id': self.id,
'mac': self.mac,
'type': self.device_type,
'vendor': self.vendor,
'name': self.name,
'first_seen': self.first_seen.isoformat(),
'last_seen': self.last_seen.isoformat(),
'company_id': self.company_id,
'tx_power': self.tx_power,
}

View File

@@ -0,0 +1,27 @@
"""Event model."""
from datetime import datetime, UTC
from ..extensions import db
class Event(db.Model):
"""Sensor event (motion, presence, calibration)."""
__tablename__ = 'events'
id: db.Mapped[int] = db.mapped_column(primary_key=True)
sensor_id: db.Mapped[int] = db.mapped_column(db.ForeignKey('sensors.id'), index=True)
event_type: db.Mapped[str] = db.mapped_column(db.String(32), index=True)
payload_json: db.Mapped[str | None] = db.mapped_column(db.Text, nullable=True)
timestamp: db.Mapped[datetime] = db.mapped_column(default=lambda: datetime.now(UTC), index=True)
# Relationships
sensor = db.relationship('Sensor', back_populates='events')
def to_dict(self):
import json
return {
'id': self.id,
'sensor_id': self.sensor_id,
'type': self.event_type,
'payload': json.loads(self.payload_json) if self.payload_json else {},
'timestamp': self.timestamp.isoformat(),
}

View File

@@ -0,0 +1,31 @@
"""Probe request model."""
from datetime import datetime, UTC
from ..extensions import db
class Probe(db.Model):
"""WiFi probe request."""
__tablename__ = 'probes'
id: db.Mapped[int] = db.mapped_column(primary_key=True)
device_id: db.Mapped[int] = db.mapped_column(db.ForeignKey('devices.id'), index=True)
sensor_id: db.Mapped[int] = db.mapped_column(db.ForeignKey('sensors.id'), index=True)
ssid: db.Mapped[str] = db.mapped_column(db.String(32), index=True)
rssi: db.Mapped[int] = db.mapped_column()
channel: db.Mapped[int] = db.mapped_column()
timestamp: db.Mapped[datetime] = db.mapped_column(default=lambda: datetime.now(UTC), index=True)
# Relationships
device = db.relationship('Device', back_populates='probes')
sensor = db.relationship('Sensor', back_populates='probes')
def to_dict(self):
return {
'id': self.id,
'device_id': self.device_id,
'sensor_id': self.sensor_id,
'ssid': self.ssid,
'rssi': self.rssi,
'channel': self.channel,
'timestamp': self.timestamp.isoformat(),
}

View File

@@ -0,0 +1,30 @@
"""Sensor model."""
from datetime import datetime, UTC
from ..extensions import db
class Sensor(db.Model):
"""ESP32 sensor node."""
__tablename__ = 'sensors'
id: db.Mapped[int] = db.mapped_column(primary_key=True)
hostname: db.Mapped[str] = db.mapped_column(db.String(32), unique=True, index=True)
ip: db.Mapped[str] = db.mapped_column(db.String(15))
last_seen: db.Mapped[datetime] = db.mapped_column(default=lambda: datetime.now(UTC))
status: db.Mapped[str] = db.mapped_column(db.String(16), default='unknown')
config_json: db.Mapped[str | None] = db.mapped_column(db.Text, nullable=True)
# Relationships
sightings = db.relationship('Sighting', back_populates='sensor', lazy='dynamic')
alerts = db.relationship('Alert', back_populates='sensor', lazy='dynamic')
events = db.relationship('Event', back_populates='sensor', lazy='dynamic')
probes = db.relationship('Probe', back_populates='sensor', lazy='dynamic')
def to_dict(self):
return {
'id': self.id,
'hostname': self.hostname,
'ip': self.ip,
'last_seen': self.last_seen.isoformat(),
'status': self.status,
}

View File

@@ -0,0 +1,27 @@
"""Sighting model."""
from datetime import datetime, UTC
from ..extensions import db
class Sighting(db.Model):
"""Device sighting by a sensor."""
__tablename__ = 'sightings'
id: db.Mapped[int] = db.mapped_column(primary_key=True)
device_id: db.Mapped[int] = db.mapped_column(db.ForeignKey('devices.id'), index=True)
sensor_id: db.Mapped[int] = db.mapped_column(db.ForeignKey('sensors.id'), index=True)
rssi: db.Mapped[int] = db.mapped_column()
timestamp: db.Mapped[datetime] = db.mapped_column(default=lambda: datetime.now(UTC), index=True)
# Relationships
device = db.relationship('Device', back_populates='sightings')
sensor = db.relationship('Sensor', back_populates='sightings')
def to_dict(self):
return {
'id': self.id,
'device_id': self.device_id,
'sensor_id': self.sensor_id,
'rssi': self.rssi,
'timestamp': self.timestamp.isoformat(),
}

View File

@@ -0,0 +1 @@
"""Business logic services."""

View File

@@ -0,0 +1 @@
"""Utility modules."""