Files
flaskpaste/app/audit.py
2026-02-16 20:59:55 +01:00

208 lines
6.0 KiB
Python

"""Audit logging for security events."""
from __future__ import annotations
import json
import time
from enum import StrEnum
from typing import TYPE_CHECKING, Any
from flask import current_app, g
from app.database import get_db
if TYPE_CHECKING:
from sqlite3 import Row
class AuditEvent(StrEnum):
"""Security-relevant event types for audit logging."""
PASTE_CREATE = "paste_create"
PASTE_ACCESS = "paste_access"
PASTE_DELETE = "paste_delete"
PASTE_UPDATE = "paste_update"
RATE_LIMIT = "rate_limit"
POW_FAILURE = "pow_failure"
DEDUP_BLOCK = "dedup_block"
CERT_ISSUED = "cert_issued"
CERT_REVOKED = "cert_revoked"
AUTH_FAILURE = "auth_failure"
URL_CREATE = "url_create"
URL_ACCESS = "url_access"
URL_DELETE = "url_delete"
class AuditOutcome(StrEnum):
"""Outcome types for audit events."""
SUCCESS = "success"
FAILURE = "failure"
BLOCKED = "blocked"
def log_event(
    event_type: AuditEvent | str,
    outcome: AuditOutcome | str,
    *,
    paste_id: str | None = None,
    client_id: str | None = None,
    client_ip: str | None = None,
    details: dict[str, Any] | None = None,
) -> None:
    """Write one security-relevant event to the ``audit_log`` table.

    Does nothing when the ``AUDIT_ENABLED`` config flag is falsy (it defaults
    to enabled). Otherwise a single row is inserted and committed immediately.

    Args:
        event_type: Kind of event, as an ``AuditEvent`` member or a raw string.
        outcome: Result of the event, as an ``AuditOutcome`` member or a raw
            string.
        paste_id: Related paste ID, if any.
        client_id: Client certificate fingerprint, if authenticated.
        client_ip: Client IP address.
        details: Extra event-specific data, stored as a JSON string. ``None``
            or an empty dict is stored as NULL.
    """
    audit_enabled = current_app.config.get("AUDIT_ENABLED", True)
    if not audit_enabled:
        return

    timestamp = int(time.time())
    # The request ID is optional: it is only recorded when ``g`` carries one.
    req_id = getattr(g, "request_id", None)

    # Enum members are stored by their string value; raw strings pass through.
    if isinstance(event_type, AuditEvent):
        event_name = event_type.value
    else:
        event_name = event_type

    if isinstance(outcome, AuditOutcome):
        outcome_name = outcome.value
    else:
        outcome_name = outcome

    # Only a non-empty mapping is serialized.
    serialized_details = None
    if details:
        serialized_details = json.dumps(details, ensure_ascii=False)

    insert_sql = (
        "INSERT INTO audit_log\n"
        "(timestamp, event_type, client_id, client_ip, paste_id, request_id, outcome, details)\n"
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
    )
    values = (
        timestamp,
        event_name,
        client_id,
        client_ip,
        paste_id,
        req_id,
        outcome_name,
        serialized_details,
    )

    conn = get_db()
    conn.execute(insert_sql, values)
    conn.commit()
def query_audit_log(
    *,
    event_type: str | None = None,
    client_id: str | None = None,
    paste_id: str | None = None,
    outcome: str | None = None,
    since: int | None = None,
    until: int | None = None,
    limit: int = 100,
    offset: int = 0,
) -> tuple[list[dict[str, Any]], int]:
    """Query the audit log with optional filters and pagination.

    All supplied filters are combined with AND. With no filters, every row
    matches.

    Args:
        event_type: Filter by event type. ``None`` or ``""`` disables it.
        client_id: Filter by client fingerprint. ``None`` or ``""`` disables it.
        paste_id: Filter by paste ID. ``None`` or ``""`` disables it.
        outcome: Filter by outcome (success, failure, blocked). ``None`` or
            ``""`` disables it.
        since: Keep entries with ``timestamp >= since``. Only ``None`` disables
            this bound; ``0`` is a real bound.
        until: Keep entries with ``timestamp <= until``. Only ``None`` disables
            this bound; ``0`` is a real bound.
        limit: Maximum number of entries to return.
        offset: Number of matching entries to skip (pagination offset).

    Returns:
        Tuple of (entries, total). ``entries`` is the requested page, newest
        first; ``total`` is the number of rows matching the filters regardless
        of ``limit``/``offset``. Each entry always has ``id``, ``timestamp``,
        ``event_type`` and ``outcome``; ``client_id``, ``client_ip``,
        ``paste_id``, ``request_id`` and ``details`` appear only when the
        stored value is non-empty. ``details`` is decoded from JSON when
        possible, otherwise returned as the raw stored string.
    """
    db = get_db()

    where_clauses: list[str] = []
    params: list[Any] = []

    # Equality filters. Column names are fixed literals from this tuple, never
    # caller input; values are always bound as parameters.
    equality_filters = (
        ("event_type", event_type),
        ("client_id", client_id),
        ("paste_id", paste_id),
        ("outcome", outcome),
    )
    for column, value in equality_filters:
        if value:
            where_clauses.append(f"{column} = ?")
            params.append(value)

    # Time bounds are compared against None rather than tested for truthiness,
    # so an explicit 0 is applied as a bound instead of being silently dropped.
    if since is not None:
        where_clauses.append("timestamp >= ?")
        params.append(since)
    if until is not None:
        where_clauses.append("timestamp <= ?")
        params.append(until)

    where_sql = " AND ".join(where_clauses) if where_clauses else "1=1"

    # Get total count
    count_row = db.execute(
        f"SELECT COUNT(*) as total FROM audit_log WHERE {where_sql}",  # noqa: S608  # nosec B608
        params,
    ).fetchone()
    total = count_row["total"] if count_row else 0

    # Fetch entries (where_sql built from trusted column names only).
    # Timestamps are whole seconds, so several rows can share one; ordering by
    # id as a tie-breaker keeps LIMIT/OFFSET pages from repeating or skipping
    # rows (assumes id is unique per row, e.g. the table's primary key).
    query = f"""SELECT id, timestamp, event_type, client_id, client_ip,
                  paste_id, request_id, outcome, details
           FROM audit_log
           WHERE {where_sql}
           ORDER BY timestamp DESC, id DESC
           LIMIT ? OFFSET ?"""  # noqa: S608  # nosec B608
    rows: list[Row] = db.execute(query, [*params, limit, offset]).fetchall()

    optional_fields = ("client_id", "client_ip", "paste_id", "request_id")
    entries: list[dict[str, Any]] = []
    for row in rows:
        entry: dict[str, Any] = {
            "id": row["id"],
            "timestamp": row["timestamp"],
            "event_type": row["event_type"],
            "outcome": row["outcome"],
        }
        # Optional columns are omitted from the entry when NULL/empty.
        for field in optional_fields:
            if row[field]:
                entry[field] = row[field]

        raw_details = row["details"]
        if raw_details:
            try:
                entry["details"] = json.loads(raw_details)
            except json.JSONDecodeError:
                # Stored text is not valid JSON: hand it back unchanged
                # rather than failing the whole query.
                entry["details"] = raw_details
        entries.append(entry)

    return entries, total
def cleanup_old_audit_logs(retention_days: int | None = None) -> int:
    """Purge audit log rows that are older than the retention window.

    Args:
        retention_days: How many days of entries to keep. When ``None``, the
            ``AUDIT_RETENTION_DAYS`` config value is used (90 if unset).

    Returns:
        The number of rows removed, as reported by the DELETE cursor.
    """
    days_to_keep = retention_days
    if days_to_keep is None:
        days_to_keep = current_app.config.get("AUDIT_RETENTION_DAYS", 90)

    # Rows whose timestamp is strictly before this Unix time are removed.
    now = int(time.time())
    oldest_kept = now - (days_to_keep * 24 * 60 * 60)

    conn = get_db()
    result = conn.execute("DELETE FROM audit_log WHERE timestamp < ?", (oldest_kept,))
    conn.commit()
    return result.rowcount