"""Tests for audit logging system.""" from __future__ import annotations import time from typing import TYPE_CHECKING import pytest from app import create_app from app.audit import AuditEvent, AuditOutcome, cleanup_old_audit_logs, log_event, query_audit_log from app.database import get_db if TYPE_CHECKING: from flask import Flask from flask.testing import FlaskClient @pytest.fixture def app() -> Flask: """Create application configured for testing.""" app = create_app("testing") app.config["POW_DIFFICULTY"] = 0 app.config["RATE_LIMIT_ENABLED"] = False app.config["AUDIT_ENABLED"] = True app.config["AUDIT_RETENTION_DAYS"] = 90 return app @pytest.fixture def client(app: Flask) -> FlaskClient: """Create test client.""" return app.test_client() @pytest.fixture def admin_client(app: Flask) -> FlaskClient: """Create test client with admin authentication.""" client = app.test_client() fingerprint = "b" * 40 client.environ_base["HTTP_X_SSL_CLIENT_SHA1"] = fingerprint with app.app_context(): app.config["PKI_ENABLED"] = True db = get_db() now = int(time.time()) db.execute( """INSERT OR REPLACE INTO certificate_authority (id, common_name, certificate_pem, private_key_encrypted, key_salt, created_at, expires_at, key_algorithm) VALUES ('default', 'Test CA', 'CERT', X'00', X'00', ?, ?, 'rsa')""", (now, now + 86400 * 365), ) db.execute( """INSERT OR REPLACE INTO issued_certificates (serial, ca_id, common_name, fingerprint_sha1, certificate_pem, created_at, expires_at, status, is_admin) VALUES (?, 'default', 'admin', ?, 'CERT', ?, ?, 'valid', 1)""", ("admin001", fingerprint, now, now + 86400), ) db.commit() return client class TestAuditLogging: """Test audit logging functions.""" def test_log_event_creates_record(self, app: Flask) -> None: """log_event should create audit log entry.""" with app.app_context(): log_event( AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="abc123", client_id="a" * 40, client_ip="192.168.1.1", details={"size": 1024}, ) entries, total = query_audit_log(paste_id="abc123") assert total == 1 assert len(entries) == 1 assert entries[0]["event_type"] == "paste_create" assert entries[0]["outcome"] == "success" assert entries[0]["paste_id"] == "abc123" assert entries[0]["client_id"] == "a" * 40 assert entries[0]["client_ip"] == "192.168.1.1" assert entries[0]["details"]["size"] == 1024 def test_log_event_with_string_types(self, app: Flask) -> None: """log_event should accept string types as well as enums.""" with app.app_context(): log_event( "custom_event", "custom_outcome", paste_id="xyz789", ) entries, total = query_audit_log(event_type="custom_event") assert total == 1 assert entries[0]["event_type"] == "custom_event" assert entries[0]["outcome"] == "custom_outcome" def test_log_event_disabled(self, app: Flask) -> None: """log_event should skip logging when disabled.""" app.config["AUDIT_ENABLED"] = False with app.app_context(): log_event( AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="disabled123", ) _entries, total = query_audit_log(paste_id="disabled123") assert total == 0 class TestAuditQuery: """Test audit log query functionality.""" def test_query_by_event_type(self, app: Flask) -> None: """Query should filter by event type.""" with app.app_context(): log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="evt_p1") log_event(AuditEvent.PASTE_DELETE, AuditOutcome.SUCCESS, paste_id="evt_p2") log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="evt_p3") entries, _total = query_audit_log(event_type="paste_create") # Verify entries matching our test are present our_entries = [e for e in entries if e.get("paste_id", "").startswith("evt_")] assert len(our_entries) == 2 assert all(e["event_type"] == "paste_create" for e in entries) def test_query_by_client_id(self, app: Flask) -> None: """Query should filter by client ID.""" with app.app_context(): # Use unique client IDs for this test client_a = "1" * 40 client_b = "2" * 40 log_event( AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, client_id=client_a, paste_id="cid_p1" ) log_event( AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, client_id=client_b, paste_id="cid_p2" ) entries, total = query_audit_log(client_id=client_a) assert total >= 1 assert all(e["client_id"] == client_a for e in entries) def test_query_by_outcome(self, app: Flask) -> None: """Query should filter by outcome.""" with app.app_context(): log_event(AuditEvent.RATE_LIMIT, AuditOutcome.BLOCKED, paste_id="out_test") log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS) log_event(AuditEvent.POW_FAILURE, AuditOutcome.FAILURE) entries, total = query_audit_log(outcome="blocked") assert total >= 1 assert all(e["outcome"] == "blocked" for e in entries) def test_query_by_timestamp_range(self, app: Flask) -> None: """Query should filter by timestamp range.""" with app.app_context(): now = int(time.time()) log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="recent") entries, total = query_audit_log(since=now - 60, until=now + 60) assert total >= 1 assert any(e["paste_id"] == "recent" for e in entries) def test_query_pagination(self, app: Flask) -> None: """Query should support pagination.""" with app.app_context(): for i in range(5): log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id=f"p{i}") entries1, total = query_audit_log(limit=2, offset=0) entries2, _ = query_audit_log(limit=2, offset=2) assert total >= 5 assert len(entries1) == 2 assert len(entries2) == 2 assert entries1[0]["paste_id"] != entries2[0]["paste_id"] class TestAuditCleanup: """Test audit log cleanup functionality.""" def test_cleanup_old_logs(self, app: Flask) -> None: """Cleanup should delete old entries.""" with app.app_context(): db = get_db() old_ts = int(time.time()) - (100 * 24 * 60 * 60) # 100 days ago # Insert old entry directly db.execute( """INSERT INTO audit_log (timestamp, event_type, outcome) VALUES (?, 'old_event', 'success')""", (old_ts,), ) db.commit() # Insert recent entry log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="recent") # Cleanup with 90 day retention deleted = cleanup_old_audit_logs(retention_days=90) assert deleted == 1 entries, _total = query_audit_log() assert all(e["event_type"] != "old_event" for e in entries) class TestAuditEndpoint: """Test GET /audit endpoint.""" def test_audit_requires_auth(self, client: FlaskClient) -> None: """Audit endpoint should require authentication.""" response = client.get("/audit") assert response.status_code == 401 def test_audit_requires_admin(self, app: Flask) -> None: """Audit endpoint should require admin access.""" client = app.test_client() client.environ_base["HTTP_X_SSL_CLIENT_SHA1"] = "c" * 40 # Non-admin response = client.get("/audit") assert response.status_code == 403 def test_audit_admin_access(self, admin_client: FlaskClient, app: Flask) -> None: """Admin should be able to query audit log.""" with app.app_context(): log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="test123") response = admin_client.get("/audit") assert response.status_code == 200 data = response.get_json() assert "entries" in data assert "total" in data assert "count" in data def test_audit_query_params(self, admin_client: FlaskClient, app: Flask) -> None: """Audit endpoint should accept query parameters.""" with app.app_context(): log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="filtered") log_event(AuditEvent.PASTE_DELETE, AuditOutcome.SUCCESS, paste_id="other") response = admin_client.get("/audit?event_type=paste_create") assert response.status_code == 200 data = response.get_json() assert all(e["event_type"] == "paste_create" for e in data["entries"]) class TestAuditIntegration: """Test audit logging integration with paste operations.""" def test_paste_create_logs_event(self, client: FlaskClient, app: Flask) -> None: """Paste creation should log audit event.""" response = client.post("/", data=b"test content") assert response.status_code == 201 paste_id = response.get_json()["id"] with app.app_context(): entries, _ = query_audit_log(paste_id=paste_id, event_type="paste_create") assert len(entries) == 1 assert entries[0]["outcome"] == "success" def test_paste_access_logs_event(self, client: FlaskClient, app: Flask) -> None: """Paste access should log audit event.""" # Create paste create_response = client.post("/", data=b"access test") paste_id = create_response.get_json()["id"] # Access paste client.get(f"/{paste_id}/raw") with app.app_context(): entries, _ = query_audit_log(paste_id=paste_id, event_type="paste_access") assert len(entries) == 1 assert entries[0]["outcome"] == "success" def test_rate_limit_logs_event(self, app: Flask) -> None: """Rate limit block should log audit event.""" app.config["RATE_LIMIT_ENABLED"] = True app.config["RATE_LIMIT_WINDOW"] = 60 app.config["RATE_LIMIT_MAX"] = 1 client = app.test_client() # First request should succeed client.post("/", data=b"first") # Second request should be rate limited response = client.post("/", data=b"second") assert response.status_code == 429 with app.app_context(): entries, _ = query_audit_log(event_type="rate_limit") assert len(entries) >= 1 assert entries[0]["outcome"] == "blocked"