Files
flaskpaste/tests/test_audit.py
Username 7063f8718e feat: add observability and CLI enhancements
Audit logging:
- audit_log table with event tracking
- app/audit.py module with log_event(), query_audit_log()
- GET /audit endpoint (admin only)
- configurable retention and cleanup

Prometheus metrics:
- app/metrics.py with custom counters
- paste create/access/delete, rate limit, PoW, dedup metrics
- instrumentation in API routes

CLI clipboard integration:
- fpaste create -C/--clipboard (read from clipboard)
- fpaste create --copy-url (copy result URL)
- fpaste get -c/--copy (copy content)
- cross-platform: xclip, xsel, pbcopy, wl-copy

Shell completions:
- completions/ directory with bash/zsh/fish scripts
- fpaste completion --shell command
2025-12-23 22:39:50 +01:00

313 lines
11 KiB
Python

"""Tests for audit logging system."""
from __future__ import annotations
import time
from typing import TYPE_CHECKING
import pytest
from app import create_app
from app.audit import AuditEvent, AuditOutcome, cleanup_old_audit_logs, log_event, query_audit_log
from app.database import get_db
if TYPE_CHECKING:
from flask import Flask
from flask.testing import FlaskClient
@pytest.fixture
def app() -> Flask:
"""Create application configured for testing."""
app = create_app("testing")
app.config["POW_DIFFICULTY"] = 0
app.config["RATE_LIMIT_ENABLED"] = False
app.config["AUDIT_ENABLED"] = True
app.config["AUDIT_RETENTION_DAYS"] = 90
return app
@pytest.fixture
def client(app: Flask) -> FlaskClient:
"""Create test client."""
return app.test_client()
@pytest.fixture
def admin_client(app: Flask) -> FlaskClient:
"""Create test client with admin authentication."""
client = app.test_client()
fingerprint = "b" * 40
client.environ_base["HTTP_X_SSL_CLIENT_SHA1"] = fingerprint
with app.app_context():
app.config["PKI_ENABLED"] = True
db = get_db()
now = int(time.time())
db.execute(
"""INSERT OR REPLACE INTO certificate_authority
(id, common_name, certificate_pem, private_key_encrypted,
key_salt, created_at, expires_at, key_algorithm)
VALUES ('default', 'Test CA', 'CERT', X'00', X'00', ?, ?, 'rsa')""",
(now, now + 86400 * 365),
)
db.execute(
"""INSERT OR REPLACE INTO issued_certificates
(serial, ca_id, common_name, fingerprint_sha1, certificate_pem,
created_at, expires_at, status, is_admin)
VALUES (?, 'default', 'admin', ?, 'CERT', ?, ?, 'valid', 1)""",
("admin001", fingerprint, now, now + 86400),
)
db.commit()
return client
class TestAuditLogging:
"""Test audit logging functions."""
def test_log_event_creates_record(self, app: Flask) -> None:
"""log_event should create audit log entry."""
with app.app_context():
log_event(
AuditEvent.PASTE_CREATE,
AuditOutcome.SUCCESS,
paste_id="abc123",
client_id="a" * 40,
client_ip="192.168.1.1",
details={"size": 1024},
)
entries, total = query_audit_log(paste_id="abc123")
assert total == 1
assert len(entries) == 1
assert entries[0]["event_type"] == "paste_create"
assert entries[0]["outcome"] == "success"
assert entries[0]["paste_id"] == "abc123"
assert entries[0]["client_id"] == "a" * 40
assert entries[0]["client_ip"] == "192.168.1.1"
assert entries[0]["details"]["size"] == 1024
def test_log_event_with_string_types(self, app: Flask) -> None:
"""log_event should accept string types as well as enums."""
with app.app_context():
log_event(
"custom_event",
"custom_outcome",
paste_id="xyz789",
)
entries, total = query_audit_log(event_type="custom_event")
assert total == 1
assert entries[0]["event_type"] == "custom_event"
assert entries[0]["outcome"] == "custom_outcome"
def test_log_event_disabled(self, app: Flask) -> None:
"""log_event should skip logging when disabled."""
app.config["AUDIT_ENABLED"] = False
with app.app_context():
log_event(
AuditEvent.PASTE_CREATE,
AuditOutcome.SUCCESS,
paste_id="disabled123",
)
_entries, total = query_audit_log(paste_id="disabled123")
assert total == 0
class TestAuditQuery:
"""Test audit log query functionality."""
def test_query_by_event_type(self, app: Flask) -> None:
"""Query should filter by event type."""
with app.app_context():
log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="evt_p1")
log_event(AuditEvent.PASTE_DELETE, AuditOutcome.SUCCESS, paste_id="evt_p2")
log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="evt_p3")
entries, _total = query_audit_log(event_type="paste_create")
# Verify entries matching our test are present
our_entries = [e for e in entries if e.get("paste_id", "").startswith("evt_")]
assert len(our_entries) == 2
assert all(e["event_type"] == "paste_create" for e in entries)
def test_query_by_client_id(self, app: Flask) -> None:
"""Query should filter by client ID."""
with app.app_context():
# Use unique client IDs for this test
client_a = "1" * 40
client_b = "2" * 40
log_event(
AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, client_id=client_a, paste_id="cid_p1"
)
log_event(
AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, client_id=client_b, paste_id="cid_p2"
)
entries, total = query_audit_log(client_id=client_a)
assert total >= 1
assert all(e["client_id"] == client_a for e in entries)
def test_query_by_outcome(self, app: Flask) -> None:
"""Query should filter by outcome."""
with app.app_context():
log_event(AuditEvent.RATE_LIMIT, AuditOutcome.BLOCKED, paste_id="out_test")
log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS)
log_event(AuditEvent.POW_FAILURE, AuditOutcome.FAILURE)
entries, total = query_audit_log(outcome="blocked")
assert total >= 1
assert all(e["outcome"] == "blocked" for e in entries)
def test_query_by_timestamp_range(self, app: Flask) -> None:
"""Query should filter by timestamp range."""
with app.app_context():
now = int(time.time())
log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="recent")
entries, total = query_audit_log(since=now - 60, until=now + 60)
assert total >= 1
assert any(e["paste_id"] == "recent" for e in entries)
def test_query_pagination(self, app: Flask) -> None:
"""Query should support pagination."""
with app.app_context():
for i in range(5):
log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id=f"p{i}")
entries1, total = query_audit_log(limit=2, offset=0)
entries2, _ = query_audit_log(limit=2, offset=2)
assert total >= 5
assert len(entries1) == 2
assert len(entries2) == 2
assert entries1[0]["paste_id"] != entries2[0]["paste_id"]
class TestAuditCleanup:
"""Test audit log cleanup functionality."""
def test_cleanup_old_logs(self, app: Flask) -> None:
"""Cleanup should delete old entries."""
with app.app_context():
db = get_db()
old_ts = int(time.time()) - (100 * 24 * 60 * 60) # 100 days ago
# Insert old entry directly
db.execute(
"""INSERT INTO audit_log
(timestamp, event_type, outcome)
VALUES (?, 'old_event', 'success')""",
(old_ts,),
)
db.commit()
# Insert recent entry
log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="recent")
# Cleanup with 90 day retention
deleted = cleanup_old_audit_logs(retention_days=90)
assert deleted == 1
entries, _total = query_audit_log()
assert all(e["event_type"] != "old_event" for e in entries)
class TestAuditEndpoint:
"""Test GET /audit endpoint."""
def test_audit_requires_auth(self, client: FlaskClient) -> None:
"""Audit endpoint should require authentication."""
response = client.get("/audit")
assert response.status_code == 401
def test_audit_requires_admin(self, app: Flask) -> None:
"""Audit endpoint should require admin access."""
client = app.test_client()
client.environ_base["HTTP_X_SSL_CLIENT_SHA1"] = "c" * 40 # Non-admin
response = client.get("/audit")
assert response.status_code == 403
def test_audit_admin_access(self, admin_client: FlaskClient, app: Flask) -> None:
"""Admin should be able to query audit log."""
with app.app_context():
log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="test123")
response = admin_client.get("/audit")
assert response.status_code == 200
data = response.get_json()
assert "entries" in data
assert "total" in data
assert "count" in data
def test_audit_query_params(self, admin_client: FlaskClient, app: Flask) -> None:
"""Audit endpoint should accept query parameters."""
with app.app_context():
log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="filtered")
log_event(AuditEvent.PASTE_DELETE, AuditOutcome.SUCCESS, paste_id="other")
response = admin_client.get("/audit?event_type=paste_create")
assert response.status_code == 200
data = response.get_json()
assert all(e["event_type"] == "paste_create" for e in data["entries"])
class TestAuditIntegration:
"""Test audit logging integration with paste operations."""
def test_paste_create_logs_event(self, client: FlaskClient, app: Flask) -> None:
"""Paste creation should log audit event."""
response = client.post("/", data=b"test content")
assert response.status_code == 201
paste_id = response.get_json()["id"]
with app.app_context():
entries, _ = query_audit_log(paste_id=paste_id, event_type="paste_create")
assert len(entries) == 1
assert entries[0]["outcome"] == "success"
def test_paste_access_logs_event(self, client: FlaskClient, app: Flask) -> None:
"""Paste access should log audit event."""
# Create paste
create_response = client.post("/", data=b"access test")
paste_id = create_response.get_json()["id"]
# Access paste
client.get(f"/{paste_id}/raw")
with app.app_context():
entries, _ = query_audit_log(paste_id=paste_id, event_type="paste_access")
assert len(entries) == 1
assert entries[0]["outcome"] == "success"
def test_rate_limit_logs_event(self, app: Flask) -> None:
"""Rate limit block should log audit event."""
app.config["RATE_LIMIT_ENABLED"] = True
app.config["RATE_LIMIT_WINDOW"] = 60
app.config["RATE_LIMIT_MAX"] = 1
client = app.test_client()
# First request should succeed
client.post("/", data=b"first")
# Second request should be rate limited
response = client.post("/", data=b"second")
assert response.status_code == 429
with app.app_context():
entries, _ = query_audit_log(event_type="rate_limit")
assert len(entries) >= 1
assert entries[0]["outcome"] == "blocked"