# Forked from username/flaskpaste (208 lines, 6.0 KiB, Python).
"""Audit logging for security events."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import time
|
|
from enum import StrEnum
|
|
from typing import TYPE_CHECKING, Any
|
|
|
|
from flask import current_app, g
|
|
|
|
from app.database import get_db
|
|
|
|
if TYPE_CHECKING:
|
|
from sqlite3 import Row
|
|
|
|
|
|
class AuditEvent(StrEnum):
|
|
"""Security-relevant event types for audit logging."""
|
|
|
|
PASTE_CREATE = "paste_create"
|
|
PASTE_ACCESS = "paste_access"
|
|
PASTE_DELETE = "paste_delete"
|
|
PASTE_UPDATE = "paste_update"
|
|
RATE_LIMIT = "rate_limit"
|
|
POW_FAILURE = "pow_failure"
|
|
DEDUP_BLOCK = "dedup_block"
|
|
CERT_ISSUED = "cert_issued"
|
|
CERT_REVOKED = "cert_revoked"
|
|
AUTH_FAILURE = "auth_failure"
|
|
URL_CREATE = "url_create"
|
|
URL_ACCESS = "url_access"
|
|
URL_DELETE = "url_delete"
|
|
|
|
|
|
class AuditOutcome(StrEnum):
|
|
"""Outcome types for audit events."""
|
|
|
|
SUCCESS = "success"
|
|
FAILURE = "failure"
|
|
BLOCKED = "blocked"
|
|
|
|
|
|
def log_event(
    event_type: AuditEvent | str,
    outcome: AuditOutcome | str,
    *,
    paste_id: str | None = None,
    client_id: str | None = None,
    client_ip: str | None = None,
    details: dict[str, Any] | None = None,
) -> None:
    """Write one security event as a row in the ``audit_log`` table.

    Does nothing when the app config sets ``AUDIT_ENABLED`` to a falsy value
    (the setting defaults to enabled). Otherwise the row is inserted and
    committed immediately.

    Args:
        event_type: Event kind, as an ``AuditEvent`` member or a plain string.
        outcome: Event result, as an ``AuditOutcome`` member or a plain string.
        paste_id: ID of the paste involved, if any.
        client_id: Client certificate fingerprint, if the client is
            authenticated.
        client_ip: IP address of the client.
        details: Extra event-specific data; stored as a JSON string. An empty
            or missing dict is stored as NULL.
    """
    if not current_app.config.get("AUDIT_ENABLED", True):
        return

    # Whole-second Unix timestamp of the moment the event is recorded.
    recorded_at = int(time.time())
    # ``g.request_id`` is optional; rows without it get a NULL request_id.
    correlation_id = getattr(g, "request_id", None)

    # Store plain strings rather than enum members.
    if isinstance(event_type, AuditEvent):
        event_name = event_type.value
    else:
        event_name = event_type

    if isinstance(outcome, AuditOutcome):
        outcome_name = outcome.value
    else:
        outcome_name = outcome

    # ensure_ascii=False keeps non-ASCII text readable in the stored JSON.
    if details:
        details_payload = json.dumps(details, ensure_ascii=False)
    else:
        details_payload = None

    # Order must match the column list in the INSERT statement below.
    row_values = (
        recorded_at,
        event_name,
        client_id,
        client_ip,
        paste_id,
        correlation_id,
        outcome_name,
        details_payload,
    )

    connection = get_db()
    connection.execute(
        """INSERT INTO audit_log
           (timestamp, event_type, client_id, client_ip, paste_id, request_id, outcome, details)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
        row_values,
    )
    connection.commit()
|
|
|
|
|
|
def query_audit_log(
    *,
    event_type: str | None = None,
    client_id: str | None = None,
    paste_id: str | None = None,
    outcome: str | None = None,
    since: int | None = None,
    until: int | None = None,
    limit: int = 100,
    offset: int = 0,
) -> tuple[list[dict[str, Any]], int]:
    """Query the audit log with optional filters and pagination.

    All supplied filters are combined with AND. Results are ordered by
    timestamp, newest first.

    Args:
        event_type: Filter by event type. Empty string means no filter.
        client_id: Filter by client fingerprint. Empty string means no filter.
        paste_id: Filter by paste ID. Empty string means no filter.
        outcome: Filter by outcome (success, failure, blocked). Empty string
            means no filter.
        since: Keep entries with timestamp >= since. ``None`` disables the
            bound; ``0`` is honoured as a real bound.
        until: Keep entries with timestamp <= until. ``None`` disables the
            bound; ``0`` is honoured as a real bound.
        limit: Maximum number of entries to return.
        offset: Number of matching entries to skip (pagination).

    Returns:
        Tuple of (entries, total). ``entries`` is the requested page as
        dicts that always contain ``id``, ``timestamp``, ``event_type`` and
        ``outcome``; ``client_id``, ``client_ip``, ``paste_id``,
        ``request_id`` and ``details`` are included only when the stored
        value is non-empty. ``details`` is decoded from JSON when possible,
        otherwise returned as the raw stored string. ``total`` is the number
        of rows matching the filters, ignoring ``limit`` and ``offset``.
    """
    db = get_db()

    where_clauses: list[str] = []
    params: list[Any] = []

    # Equality filters. Column names are fixed literals; only the values are
    # caller-supplied, and those are always bound as parameters.
    equality_filters = (
        ("event_type", event_type),
        ("client_id", client_id),
        ("paste_id", paste_id),
        ("outcome", outcome),
    )
    for column, value in equality_filters:
        if value:
            where_clauses.append(f"{column} = ?")
            params.append(value)

    # Time bounds are compared against None rather than tested for truth, so
    # that a bound of 0 is applied instead of being silently dropped.
    if since is not None:
        where_clauses.append("timestamp >= ?")
        params.append(since)
    if until is not None:
        where_clauses.append("timestamp <= ?")
        params.append(until)

    # "1=1" keeps the WHERE clause valid when no filter was given.
    where_sql = " AND ".join(where_clauses) if where_clauses else "1=1"

    # Get total count
    count_row = db.execute(
        f"SELECT COUNT(*) as total FROM audit_log WHERE {where_sql}",  # noqa: S608  # nosec B608
        params,
    ).fetchone()
    total = count_row["total"] if count_row else 0

    # Fetch entries (where_sql built from trusted column names only)
    query = f"""SELECT id, timestamp, event_type, client_id, client_ip,
                       paste_id, request_id, outcome, details
                FROM audit_log
                WHERE {where_sql}
                ORDER BY timestamp DESC
                LIMIT ? OFFSET ?"""  # noqa: S608  # nosec B608
    rows: list[Row] = db.execute(query, [*params, limit, offset]).fetchall()

    entries = []
    for row in rows:
        entry: dict[str, Any] = {
            "id": row["id"],
            "timestamp": row["timestamp"],
            "event_type": row["event_type"],
            "outcome": row["outcome"],
        }
        # Optional columns are omitted from the entry when NULL or empty.
        for column in ("client_id", "client_ip", "paste_id", "request_id"):
            if row[column]:
                entry[column] = row[column]
        if row["details"]:
            try:
                entry["details"] = json.loads(row["details"])
            except json.JSONDecodeError:
                # Not valid JSON: hand back the stored text unchanged.
                entry["details"] = row["details"]
        entries.append(entry)

    return entries, total
|
|
|
|
|
|
def cleanup_old_audit_logs(retention_days: int | None = None) -> int:
    """Remove audit log rows that are older than the retention window.

    Rows whose timestamp is strictly before ``now - retention_days`` (in
    whole seconds) are deleted and the deletion is committed.

    Args:
        retention_days: How many days of entries to keep. When ``None``, the
            app config value ``AUDIT_RETENTION_DAYS`` is used, falling back
            to 90.

    Returns:
        The number of rows deleted.
    """
    days_to_keep = retention_days
    if days_to_keep is None:
        days_to_keep = current_app.config.get("AUDIT_RETENTION_DAYS", 90)

    # Oldest Unix timestamp that is still inside the retention window.
    current_time = int(time.time())
    oldest_kept = current_time - (days_to_keep * 24 * 60 * 60)

    connection = get_db()
    delete_cursor = connection.execute(
        "DELETE FROM audit_log WHERE timestamp < ?",
        (oldest_kept,),
    )
    connection.commit()

    deleted_count = delete_cursor.rowcount
    return deleted_count
|