"""Intelligence API tests.""" from datetime import datetime, UTC, timedelta from esp32_web.extensions import db from esp32_web.models import Device, Probe, Sighting, Sensor def _seed_devices(app): """Create test devices, probes, and sightings.""" with app.app_context(): sensor = Sensor(hostname='test-sensor', ip='192.168.1.1') db.session.add(sensor) db.session.flush() d1 = Device(mac='aa:bb:cc:dd:ee:01', device_type='wifi', vendor='Apple, Inc.', first_seen=datetime.now(UTC), last_seen=datetime.now(UTC)) d2 = Device(mac='aa:bb:cc:dd:ee:02', device_type='wifi', vendor='Samsung', first_seen=datetime.now(UTC), last_seen=datetime.now(UTC)) d3 = Device(mac='aa:bb:cc:dd:ee:03', device_type='ble', company_id=0x004C, first_seen=datetime.now(UTC), last_seen=datetime.now(UTC)) db.session.add_all([d1, d2, d3]) db.session.flush() now = datetime.now(UTC) # Both d1 and d2 probe "HomeNet" db.session.add_all([ Probe(device_id=d1.id, sensor_id=sensor.id, ssid='HomeNet', rssi=-50, channel=6, timestamp=now), Probe(device_id=d2.id, sensor_id=sensor.id, ssid='HomeNet', rssi=-60, channel=6, timestamp=now), Probe(device_id=d1.id, sensor_id=sensor.id, ssid='Office', rssi=-55, channel=1, timestamp=now), ]) # Sightings db.session.add_all([ Sighting(device_id=d1.id, sensor_id=sensor.id, rssi=-50, timestamp=now), Sighting(device_id=d3.id, sensor_id=sensor.id, rssi=-70, timestamp=now), ]) db.session.commit() # -- Vendor Treemap -- def test_vendor_treemap_empty(client): """Test treemap with no devices.""" response = client.get('/api/v1/intelligence/vendor-treemap') assert response.status_code == 200 assert response.json['name'] == 'devices' assert response.json['children'] == [] def test_vendor_treemap_with_data(client, app): """Test treemap returns grouped vendor data.""" _seed_devices(app) response = client.get('/api/v1/intelligence/vendor-treemap') assert response.status_code == 200 data = response.json assert data['name'] == 'devices' assert len(data['children']) > 0 # Find wifi group wifi = next((c for c in data['children'] if c['name'] == 'wifi'), None) assert wifi is not None assert len(wifi['children']) >= 1 # Find ble group ble = next((c for c in data['children'] if c['name'] == 'ble'), None) assert ble is not None assert len(ble['children']) >= 1 # -- SSID Graph -- def test_ssid_graph_empty(client): """Test graph with no probes.""" response = client.get('/api/v1/intelligence/ssid-graph') assert response.status_code == 200 assert response.json['nodes'] == [] assert response.json['links'] == [] def test_ssid_graph_with_data(client, app): """Test graph returns nodes and links for shared SSIDs.""" _seed_devices(app) response = client.get('/api/v1/intelligence/ssid-graph?hours=24') assert response.status_code == 200 data = response.json assert len(data['nodes']) >= 2 # d1 and d2 share "HomeNet" → at least one link assert len(data['links']) >= 1 link = data['links'][0] assert 'HomeNet' in link['shared_ssids'] assert link['weight'] >= 1 # SSID summary present assert len(data['ssids']) >= 1 def test_ssid_graph_params(client, app): """Test graph with custom parameters.""" _seed_devices(app) response = client.get('/api/v1/intelligence/ssid-graph?hours=1&min_shared=5&limit=10') assert response.status_code == 200 # With min_shared=5, no pairs should match assert response.json['links'] == [] # -- Fingerprint Clusters -- def test_fingerprint_clusters_empty(client): """Test clusters with no activity.""" response = client.get('/api/v1/intelligence/fingerprint-clusters') assert response.status_code == 200 assert response.json['clusters'] == [] assert response.json['total_devices'] == 0 def test_fingerprint_clusters_with_data(client, app): """Test clusters group devices by behavior.""" _seed_devices(app) response = client.get('/api/v1/intelligence/fingerprint-clusters?hours=24') assert response.status_code == 200 data = response.json assert data['total_devices'] >= 2 assert data['total_clusters'] >= 1 # Each cluster has expected structure cluster = data['clusters'][0] assert 'label' in cluster assert 'device_count' in cluster assert 'devices' in cluster assert 'centroid' in cluster assert 'probe_rate' in cluster['centroid'] assert 'sighting_rate' in cluster['centroid']