forked from claw/flaskpaste
Audit logging: - audit_log table with event tracking - app/audit.py module with log_event(), query_audit_log() - GET /audit endpoint (admin only) - configurable retention and cleanup Prometheus metrics: - app/metrics.py with custom counters - paste create/access/delete, rate limit, PoW, dedup metrics - instrumentation in API routes CLI clipboard integration: - fpaste create -C/--clipboard (read from clipboard) - fpaste create --copy-url (copy result URL) - fpaste get -c/--copy (copy content) - cross-platform: xclip, xsel, pbcopy, wl-copy Shell completions: - completions/ directory with bash/zsh/fish scripts - fpaste completion --shell command
313 lines
11 KiB
Python
313 lines
11 KiB
Python
"""Tests for audit logging system."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import time
|
|
from typing import TYPE_CHECKING
|
|
|
|
import pytest
|
|
|
|
from app import create_app
|
|
from app.audit import AuditEvent, AuditOutcome, cleanup_old_audit_logs, log_event, query_audit_log
|
|
from app.database import get_db
|
|
|
|
if TYPE_CHECKING:
|
|
from flask import Flask
|
|
from flask.testing import FlaskClient
|
|
|
|
|
|
@pytest.fixture
|
|
def app() -> Flask:
|
|
"""Create application configured for testing."""
|
|
app = create_app("testing")
|
|
app.config["POW_DIFFICULTY"] = 0
|
|
app.config["RATE_LIMIT_ENABLED"] = False
|
|
app.config["AUDIT_ENABLED"] = True
|
|
app.config["AUDIT_RETENTION_DAYS"] = 90
|
|
return app
|
|
|
|
|
|
@pytest.fixture
|
|
def client(app: Flask) -> FlaskClient:
|
|
"""Create test client."""
|
|
return app.test_client()
|
|
|
|
|
|
@pytest.fixture
|
|
def admin_client(app: Flask) -> FlaskClient:
|
|
"""Create test client with admin authentication."""
|
|
client = app.test_client()
|
|
fingerprint = "b" * 40
|
|
client.environ_base["HTTP_X_SSL_CLIENT_SHA1"] = fingerprint
|
|
with app.app_context():
|
|
app.config["PKI_ENABLED"] = True
|
|
db = get_db()
|
|
now = int(time.time())
|
|
db.execute(
|
|
"""INSERT OR REPLACE INTO certificate_authority
|
|
(id, common_name, certificate_pem, private_key_encrypted,
|
|
key_salt, created_at, expires_at, key_algorithm)
|
|
VALUES ('default', 'Test CA', 'CERT', X'00', X'00', ?, ?, 'rsa')""",
|
|
(now, now + 86400 * 365),
|
|
)
|
|
db.execute(
|
|
"""INSERT OR REPLACE INTO issued_certificates
|
|
(serial, ca_id, common_name, fingerprint_sha1, certificate_pem,
|
|
created_at, expires_at, status, is_admin)
|
|
VALUES (?, 'default', 'admin', ?, 'CERT', ?, ?, 'valid', 1)""",
|
|
("admin001", fingerprint, now, now + 86400),
|
|
)
|
|
db.commit()
|
|
return client
|
|
|
|
|
|
class TestAuditLogging:
|
|
"""Test audit logging functions."""
|
|
|
|
def test_log_event_creates_record(self, app: Flask) -> None:
|
|
"""log_event should create audit log entry."""
|
|
with app.app_context():
|
|
log_event(
|
|
AuditEvent.PASTE_CREATE,
|
|
AuditOutcome.SUCCESS,
|
|
paste_id="abc123",
|
|
client_id="a" * 40,
|
|
client_ip="192.168.1.1",
|
|
details={"size": 1024},
|
|
)
|
|
|
|
entries, total = query_audit_log(paste_id="abc123")
|
|
|
|
assert total == 1
|
|
assert len(entries) == 1
|
|
assert entries[0]["event_type"] == "paste_create"
|
|
assert entries[0]["outcome"] == "success"
|
|
assert entries[0]["paste_id"] == "abc123"
|
|
assert entries[0]["client_id"] == "a" * 40
|
|
assert entries[0]["client_ip"] == "192.168.1.1"
|
|
assert entries[0]["details"]["size"] == 1024
|
|
|
|
def test_log_event_with_string_types(self, app: Flask) -> None:
|
|
"""log_event should accept string types as well as enums."""
|
|
with app.app_context():
|
|
log_event(
|
|
"custom_event",
|
|
"custom_outcome",
|
|
paste_id="xyz789",
|
|
)
|
|
|
|
entries, total = query_audit_log(event_type="custom_event")
|
|
|
|
assert total == 1
|
|
assert entries[0]["event_type"] == "custom_event"
|
|
assert entries[0]["outcome"] == "custom_outcome"
|
|
|
|
def test_log_event_disabled(self, app: Flask) -> None:
|
|
"""log_event should skip logging when disabled."""
|
|
app.config["AUDIT_ENABLED"] = False
|
|
with app.app_context():
|
|
log_event(
|
|
AuditEvent.PASTE_CREATE,
|
|
AuditOutcome.SUCCESS,
|
|
paste_id="disabled123",
|
|
)
|
|
|
|
_entries, total = query_audit_log(paste_id="disabled123")
|
|
assert total == 0
|
|
|
|
|
|
class TestAuditQuery:
|
|
"""Test audit log query functionality."""
|
|
|
|
def test_query_by_event_type(self, app: Flask) -> None:
|
|
"""Query should filter by event type."""
|
|
with app.app_context():
|
|
log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="evt_p1")
|
|
log_event(AuditEvent.PASTE_DELETE, AuditOutcome.SUCCESS, paste_id="evt_p2")
|
|
log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="evt_p3")
|
|
|
|
entries, _total = query_audit_log(event_type="paste_create")
|
|
|
|
# Verify entries matching our test are present
|
|
our_entries = [e for e in entries if e.get("paste_id", "").startswith("evt_")]
|
|
assert len(our_entries) == 2
|
|
assert all(e["event_type"] == "paste_create" for e in entries)
|
|
|
|
def test_query_by_client_id(self, app: Flask) -> None:
|
|
"""Query should filter by client ID."""
|
|
with app.app_context():
|
|
# Use unique client IDs for this test
|
|
client_a = "1" * 40
|
|
client_b = "2" * 40
|
|
log_event(
|
|
AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, client_id=client_a, paste_id="cid_p1"
|
|
)
|
|
log_event(
|
|
AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, client_id=client_b, paste_id="cid_p2"
|
|
)
|
|
|
|
entries, total = query_audit_log(client_id=client_a)
|
|
|
|
assert total >= 1
|
|
assert all(e["client_id"] == client_a for e in entries)
|
|
|
|
def test_query_by_outcome(self, app: Flask) -> None:
|
|
"""Query should filter by outcome."""
|
|
with app.app_context():
|
|
log_event(AuditEvent.RATE_LIMIT, AuditOutcome.BLOCKED, paste_id="out_test")
|
|
log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS)
|
|
log_event(AuditEvent.POW_FAILURE, AuditOutcome.FAILURE)
|
|
|
|
entries, total = query_audit_log(outcome="blocked")
|
|
|
|
assert total >= 1
|
|
assert all(e["outcome"] == "blocked" for e in entries)
|
|
|
|
def test_query_by_timestamp_range(self, app: Flask) -> None:
|
|
"""Query should filter by timestamp range."""
|
|
with app.app_context():
|
|
now = int(time.time())
|
|
|
|
log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="recent")
|
|
|
|
entries, total = query_audit_log(since=now - 60, until=now + 60)
|
|
|
|
assert total >= 1
|
|
assert any(e["paste_id"] == "recent" for e in entries)
|
|
|
|
def test_query_pagination(self, app: Flask) -> None:
|
|
"""Query should support pagination."""
|
|
with app.app_context():
|
|
for i in range(5):
|
|
log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id=f"p{i}")
|
|
|
|
entries1, total = query_audit_log(limit=2, offset=0)
|
|
entries2, _ = query_audit_log(limit=2, offset=2)
|
|
|
|
assert total >= 5
|
|
assert len(entries1) == 2
|
|
assert len(entries2) == 2
|
|
assert entries1[0]["paste_id"] != entries2[0]["paste_id"]
|
|
|
|
|
|
class TestAuditCleanup:
|
|
"""Test audit log cleanup functionality."""
|
|
|
|
def test_cleanup_old_logs(self, app: Flask) -> None:
|
|
"""Cleanup should delete old entries."""
|
|
with app.app_context():
|
|
db = get_db()
|
|
old_ts = int(time.time()) - (100 * 24 * 60 * 60) # 100 days ago
|
|
|
|
# Insert old entry directly
|
|
db.execute(
|
|
"""INSERT INTO audit_log
|
|
(timestamp, event_type, outcome)
|
|
VALUES (?, 'old_event', 'success')""",
|
|
(old_ts,),
|
|
)
|
|
db.commit()
|
|
|
|
# Insert recent entry
|
|
log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="recent")
|
|
|
|
# Cleanup with 90 day retention
|
|
deleted = cleanup_old_audit_logs(retention_days=90)
|
|
|
|
assert deleted == 1
|
|
|
|
entries, _total = query_audit_log()
|
|
assert all(e["event_type"] != "old_event" for e in entries)
|
|
|
|
|
|
class TestAuditEndpoint:
|
|
"""Test GET /audit endpoint."""
|
|
|
|
def test_audit_requires_auth(self, client: FlaskClient) -> None:
|
|
"""Audit endpoint should require authentication."""
|
|
response = client.get("/audit")
|
|
assert response.status_code == 401
|
|
|
|
def test_audit_requires_admin(self, app: Flask) -> None:
|
|
"""Audit endpoint should require admin access."""
|
|
client = app.test_client()
|
|
client.environ_base["HTTP_X_SSL_CLIENT_SHA1"] = "c" * 40 # Non-admin
|
|
|
|
response = client.get("/audit")
|
|
assert response.status_code == 403
|
|
|
|
def test_audit_admin_access(self, admin_client: FlaskClient, app: Flask) -> None:
|
|
"""Admin should be able to query audit log."""
|
|
with app.app_context():
|
|
log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="test123")
|
|
|
|
response = admin_client.get("/audit")
|
|
assert response.status_code == 200
|
|
|
|
data = response.get_json()
|
|
assert "entries" in data
|
|
assert "total" in data
|
|
assert "count" in data
|
|
|
|
def test_audit_query_params(self, admin_client: FlaskClient, app: Flask) -> None:
|
|
"""Audit endpoint should accept query parameters."""
|
|
with app.app_context():
|
|
log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="filtered")
|
|
log_event(AuditEvent.PASTE_DELETE, AuditOutcome.SUCCESS, paste_id="other")
|
|
|
|
response = admin_client.get("/audit?event_type=paste_create")
|
|
assert response.status_code == 200
|
|
|
|
data = response.get_json()
|
|
assert all(e["event_type"] == "paste_create" for e in data["entries"])
|
|
|
|
|
|
class TestAuditIntegration:
|
|
"""Test audit logging integration with paste operations."""
|
|
|
|
def test_paste_create_logs_event(self, client: FlaskClient, app: Flask) -> None:
|
|
"""Paste creation should log audit event."""
|
|
response = client.post("/", data=b"test content")
|
|
assert response.status_code == 201
|
|
|
|
paste_id = response.get_json()["id"]
|
|
|
|
with app.app_context():
|
|
entries, _ = query_audit_log(paste_id=paste_id, event_type="paste_create")
|
|
assert len(entries) == 1
|
|
assert entries[0]["outcome"] == "success"
|
|
|
|
def test_paste_access_logs_event(self, client: FlaskClient, app: Flask) -> None:
|
|
"""Paste access should log audit event."""
|
|
# Create paste
|
|
create_response = client.post("/", data=b"access test")
|
|
paste_id = create_response.get_json()["id"]
|
|
|
|
# Access paste
|
|
client.get(f"/{paste_id}/raw")
|
|
|
|
with app.app_context():
|
|
entries, _ = query_audit_log(paste_id=paste_id, event_type="paste_access")
|
|
assert len(entries) == 1
|
|
assert entries[0]["outcome"] == "success"
|
|
|
|
def test_rate_limit_logs_event(self, app: Flask) -> None:
|
|
"""Rate limit block should log audit event."""
|
|
app.config["RATE_LIMIT_ENABLED"] = True
|
|
app.config["RATE_LIMIT_WINDOW"] = 60
|
|
app.config["RATE_LIMIT_MAX"] = 1
|
|
|
|
client = app.test_client()
|
|
|
|
# First request should succeed
|
|
client.post("/", data=b"first")
|
|
|
|
# Second request should be rate limited
|
|
response = client.post("/", data=b"second")
|
|
assert response.status_code == 429
|
|
|
|
with app.app_context():
|
|
entries, _ = query_audit_log(event_type="rate_limit")
|
|
assert len(entries) >= 1
|
|
assert entries[0]["outcome"] == "blocked"
|