"""Audit logging for security events.""" from __future__ import annotations import json import time from enum import StrEnum from typing import TYPE_CHECKING, Any from flask import current_app, g from app.database import get_db if TYPE_CHECKING: from sqlite3 import Row class AuditEvent(StrEnum): """Security-relevant event types for audit logging.""" PASTE_CREATE = "paste_create" PASTE_ACCESS = "paste_access" PASTE_DELETE = "paste_delete" PASTE_UPDATE = "paste_update" RATE_LIMIT = "rate_limit" POW_FAILURE = "pow_failure" DEDUP_BLOCK = "dedup_block" CERT_ISSUED = "cert_issued" CERT_REVOKED = "cert_revoked" AUTH_FAILURE = "auth_failure" URL_CREATE = "url_create" URL_ACCESS = "url_access" URL_DELETE = "url_delete" class AuditOutcome(StrEnum): """Outcome types for audit events.""" SUCCESS = "success" FAILURE = "failure" BLOCKED = "blocked" def log_event( event_type: AuditEvent | str, outcome: AuditOutcome | str, *, paste_id: str | None = None, client_id: str | None = None, client_ip: str | None = None, details: dict[str, Any] | None = None, ) -> None: """Log a security-relevant event to the audit log. Args: event_type: Type of event (from AuditEvent enum or string) outcome: Result of the event (success, failure, blocked) paste_id: Related paste ID (if applicable) client_id: Client certificate fingerprint (if authenticated) client_ip: Client IP address details: Additional event-specific details (stored as JSON) """ if not current_app.config.get("AUDIT_ENABLED", True): return now = int(time.time()) request_id = getattr(g, "request_id", None) # Convert enums to strings event_type_str = event_type.value if isinstance(event_type, AuditEvent) else event_type outcome_str = outcome.value if isinstance(outcome, AuditOutcome) else outcome # Serialize details to JSON if provided details_json = json.dumps(details, ensure_ascii=False) if details else None db = get_db() db.execute( """INSERT INTO audit_log (timestamp, event_type, client_id, client_ip, paste_id, request_id, outcome, details) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", ( now, event_type_str, client_id, client_ip, paste_id, request_id, outcome_str, details_json, ), ) db.commit() def query_audit_log( *, event_type: str | None = None, client_id: str | None = None, paste_id: str | None = None, outcome: str | None = None, since: int | None = None, until: int | None = None, limit: int = 100, offset: int = 0, ) -> tuple[list[dict[str, Any]], int]: """Query audit log with filters. Args: event_type: Filter by event type client_id: Filter by client fingerprint paste_id: Filter by paste ID outcome: Filter by outcome (success, failure, blocked) since: Filter by timestamp >= since until: Filter by timestamp <= until limit: Maximum results to return offset: Pagination offset Returns: Tuple of (list of audit entries, total count matching filters) """ db = get_db() where_clauses: list[str] = [] params: list[Any] = [] if event_type: where_clauses.append("event_type = ?") params.append(event_type) if client_id: where_clauses.append("client_id = ?") params.append(client_id) if paste_id: where_clauses.append("paste_id = ?") params.append(paste_id) if outcome: where_clauses.append("outcome = ?") params.append(outcome) if since: where_clauses.append("timestamp >= ?") params.append(since) if until: where_clauses.append("timestamp <= ?") params.append(until) where_sql = " AND ".join(where_clauses) if where_clauses else "1=1" # Get total count count_row = db.execute( f"SELECT COUNT(*) as total FROM audit_log WHERE {where_sql}", # noqa: S608 # nosec B608 params, ).fetchone() total = count_row["total"] if count_row else 0 # Fetch entries (where_sql built from trusted column names only) query = f"""SELECT id, timestamp, event_type, client_id, client_ip, paste_id, request_id, outcome, details FROM audit_log WHERE {where_sql} ORDER BY timestamp DESC LIMIT ? OFFSET ?""" # noqa: S608 # nosec B608 rows: list[Row] = db.execute(query, [*params, limit, offset]).fetchall() entries = [] for row in rows: entry: dict[str, Any] = { "id": row["id"], "timestamp": row["timestamp"], "event_type": row["event_type"], "outcome": row["outcome"], } if row["client_id"]: entry["client_id"] = row["client_id"] if row["client_ip"]: entry["client_ip"] = row["client_ip"] if row["paste_id"]: entry["paste_id"] = row["paste_id"] if row["request_id"]: entry["request_id"] = row["request_id"] if row["details"]: try: entry["details"] = json.loads(row["details"]) except json.JSONDecodeError: entry["details"] = row["details"] entries.append(entry) return entries, total def cleanup_old_audit_logs(retention_days: int | None = None) -> int: """Delete audit log entries older than retention period. Args: retention_days: Number of days to keep. If None, uses config. Returns: Number of deleted entries. """ if retention_days is None: retention_days = current_app.config.get("AUDIT_RETENTION_DAYS", 90) cutoff = int(time.time()) - (retention_days * 24 * 60 * 60) db = get_db() cursor = db.execute("DELETE FROM audit_log WHERE timestamp < ?", (cutoff,)) db.commit() return cursor.rowcount