"""API route handlers using modern Flask patterns.""" from __future__ import annotations import hashlib import hmac import json import math import re import secrets import string import threading import time from collections import defaultdict from typing import TYPE_CHECKING, Any from urllib.parse import urlparse from flask import Response, current_app, g, request from flask.views import MethodView from app.api import bp from app.audit import AuditEvent, AuditOutcome, log_event from app.config import VERSION from app.database import check_content_hash, get_db, hash_password, verify_password from app.metrics import ( record_dedup, record_paste_accessed, record_paste_created, record_paste_deleted, record_pow, record_rate_limit, record_url_accessed, record_url_created, record_url_deleted, ) if TYPE_CHECKING: from sqlite3 import Row # Compiled patterns for validation PASTE_ID_PATTERN = re.compile(r"^[a-f0-9]+$") CLIENT_ID_PATTERN = re.compile(r"^[a-f0-9]{40}$") MIME_PATTERN = re.compile(r"^[a-z0-9][a-z0-9!#$&\-^_.+]*/[a-z0-9][a-z0-9!#$&\-^_.+]*$") SHORT_ID_PATTERN = re.compile(r"^[a-zA-Z0-9]+$") CONTROL_CHAR_PATTERN = re.compile(r"[\x00-\x1f\x7f]") SHORT_ID_ALPHABET = string.ascii_letters + string.digits ALLOWED_URL_SCHEMES = frozenset({"http", "https"}) # NOTE: Magic byte detection commented out - using text/binary detection only. # Security headers (X-Content-Type-Options: nosniff, CSP) prevent MIME confusion. # For full MIME detection, consider using the `filetype` library. # # MAGIC_SIGNATURES: dict[bytes, str] = { # b"\x89PNG\r\n\x1a\n": "image/png", # b"\xff\xd8\xff": "image/jpeg", # b"GIF87a": "image/gif", # b"GIF89a": "image/gif", # b"%PDF": "application/pdf", # b"PK\x03\x04": "application/zip", # # ... (see git history for full list) # } # Generic MIME types to override with detection GENERIC_MIME_TYPES = frozenset( { "application/octet-stream", "application/x-www-form-urlencoded", "text/plain", } ) # ───────────────────────────────────────────────────────────────────────────── # Anti-flood: dynamic PoW difficulty adjustment # ───────────────────────────────────────────────────────────────────────────── _antiflood_lock = threading.Lock() _antiflood_requests: list[float] = [] # Global request timestamps _antiflood_difficulty: int = 0 # Current difficulty boost (added to base) _antiflood_last_increase: float = 0 # Last time difficulty was increased def get_dynamic_difficulty() -> int: """Get current PoW difficulty including anti-flood adjustment.""" base: int = current_app.config["POW_DIFFICULTY"] if base == 0 or not current_app.config.get("ANTIFLOOD_ENABLED", True): return base with _antiflood_lock: max_diff: int = current_app.config["ANTIFLOOD_MAX"] return min(base + _antiflood_difficulty, max_diff) def record_antiflood_request() -> None: """Record a request for anti-flood tracking and adjust difficulty.""" if not current_app.config.get("ANTIFLOOD_ENABLED", True): return if current_app.config["POW_DIFFICULTY"] == 0: return global _antiflood_difficulty, _antiflood_last_increase now = time.time() window = current_app.config["ANTIFLOOD_WINDOW"] threshold = current_app.config["ANTIFLOOD_THRESHOLD"] step = current_app.config["ANTIFLOOD_STEP"] max_diff = current_app.config["ANTIFLOOD_MAX"] decay = current_app.config["ANTIFLOOD_DECAY"] base = current_app.config["POW_DIFFICULTY"] max_entries = current_app.config.get("ANTIFLOOD_MAX_ENTRIES", 10000) with _antiflood_lock: # Clean old requests cutoff = now - window _antiflood_requests[:] = [t for t in _antiflood_requests if t > cutoff] # FLOOD-001: Cap list size to prevent memory exhaustion if len(_antiflood_requests) >= max_entries: # Keep only the most recent half _antiflood_requests[:] = _antiflood_requests[-(max_entries // 2) :] # Record this request _antiflood_requests.append(now) count = len(_antiflood_requests) # Check if we should increase difficulty if count > threshold: # Increase difficulty if not already at max if base + _antiflood_difficulty < max_diff: _antiflood_difficulty += step _antiflood_last_increase = now elif _antiflood_difficulty > 0 and (now - _antiflood_last_increase) > decay: # Decay difficulty if abuse has stopped _antiflood_difficulty = max(0, _antiflood_difficulty - step) _antiflood_last_increase = now # Reset timer def reset_antiflood() -> None: """Reset anti-flood state (for testing).""" global _antiflood_difficulty, _antiflood_last_increase with _antiflood_lock: _antiflood_requests.clear() _antiflood_difficulty = 0 _antiflood_last_increase = 0 # ───────────────────────────────────────────────────────────────────────────── # Rate Limiting (in-memory sliding window) # ───────────────────────────────────────────────────────────────────────────── _rate_limit_lock = threading.Lock() _rate_limit_requests: dict[str, list[float]] = defaultdict(list) def get_client_ip() -> str: """Get client IP address, respecting X-Forwarded-For from trusted proxy.""" if is_trusted_proxy(): forwarded = request.headers.get("X-Forwarded-For", "") if forwarded: # Take the first (client) IP from the chain return forwarded.split(",")[0].strip() return request.remote_addr or "unknown" def check_rate_limit(client_ip: str, authenticated: bool = False) -> tuple[bool, int, int, int]: """Check if request is within rate limit. Args: client_ip: Client IP address authenticated: Whether client is authenticated (higher limits) Returns: Tuple of (allowed, remaining, limit, reset_timestamp) - allowed: Whether request is within rate limit - remaining: Requests remaining in current window - limit: Maximum requests per window - reset_timestamp: Unix timestamp when window resets """ if not current_app.config.get("RATE_LIMIT_ENABLED", True): return True, -1, -1, 0 window = current_app.config["RATE_LIMIT_WINDOW"] max_requests = current_app.config["RATE_LIMIT_MAX"] max_entries = current_app.config.get("RATE_LIMIT_MAX_ENTRIES", 10000) cleanup_threshold = current_app.config.get("RATE_LIMIT_CLEANUP_THRESHOLD", 0.8) if authenticated: max_requests *= current_app.config.get("RATE_LIMIT_AUTH_MULTIPLIER", 5) now = time.time() cutoff = now - window with _rate_limit_lock: entry_count = len(_rate_limit_requests) # RATE-002: Proactive cleanup when exceeding threshold threshold_count = int(max_entries * cleanup_threshold) if entry_count >= threshold_count: # Clean up expired entries first _prune_rate_limit_entries(threshold_count // 2, cutoff) # RATE-001: Hard limit enforcement (fallback if threshold cleanup wasn't enough) if len(_rate_limit_requests) >= max_entries and client_ip not in _rate_limit_requests: # Evict oldest entries (those with oldest last request time) _prune_rate_limit_entries(max_entries // 2, cutoff) # Clean old requests and get current list requests = _rate_limit_requests[client_ip] requests[:] = [t for t in requests if t > cutoff] current_count = len(requests) # Calculate reset timestamp (when oldest request expires, or now + window if empty) reset_timestamp = int(requests[0] + window) if requests else int(now + window) if current_count >= max_requests: return False, 0, max_requests, reset_timestamp # Record this request requests.append(now) remaining = max_requests - len(requests) return True, remaining, max_requests, reset_timestamp def _prune_rate_limit_entries(target_size: int, cutoff: float) -> None: """Prune rate limit entries to target size. Must hold _rate_limit_lock. Removes entries with no recent activity first, then oldest entries. """ # First pass: remove entries with all expired requests to_remove = [] for ip, requests in _rate_limit_requests.items(): if not requests or all(t <= cutoff for t in requests): to_remove.append(ip) for ip in to_remove: del _rate_limit_requests[ip] # Second pass: if still over target, remove entries with oldest last activity if len(_rate_limit_requests) > target_size: # Sort by most recent request timestamp (ascending = oldest first) entries = sorted( _rate_limit_requests.items(), key=lambda x: max(x[1]) if x[1] else 0, ) # Remove oldest until we're at target size remove_count = len(entries) - target_size for ip, _ in entries[:remove_count]: del _rate_limit_requests[ip] def cleanup_rate_limits(window: int | None = None) -> int: """Remove expired rate limit entries. Returns count of cleaned entries. Args: window: Rate limit window in seconds. If None, uses app config. """ # This should be called periodically (e.g., via cleanup task) if window is None: window = current_app.config.get("RATE_LIMIT_WINDOW", 60) cutoff = time.time() - window cleaned = 0 with _rate_limit_lock: to_remove = [] for ip, requests in _rate_limit_requests.items(): requests[:] = [t for t in requests if t > cutoff] if not requests: to_remove.append(ip) for ip in to_remove: del _rate_limit_requests[ip] cleaned += 1 return cleaned def reset_rate_limits() -> None: """Clear all rate limit state. For testing only.""" with _rate_limit_lock: _rate_limit_requests.clear() # ───────────────────────────────────────────────────────────────────────────── # ENUM-001: Lookup Rate Limiting (prevents paste ID enumeration) # ───────────────────────────────────────────────────────────────────────────── _lookup_rate_limit_lock = threading.Lock() _lookup_rate_limit_requests: dict[str, list[float]] = defaultdict(list) def check_lookup_rate_limit(client_ip: str) -> tuple[bool, int]: """Check if lookup request is within rate limit. Args: client_ip: Client IP address Returns: Tuple of (allowed, retry_after_seconds) """ if not current_app.config.get("LOOKUP_RATE_LIMIT_ENABLED", True): return True, 0 window = current_app.config.get("LOOKUP_RATE_LIMIT_WINDOW", 60) max_requests = current_app.config.get("LOOKUP_RATE_LIMIT_MAX", 60) max_entries = current_app.config.get("LOOKUP_RATE_LIMIT_MAX_ENTRIES", 10000) now = time.time() cutoff = now - window with _lookup_rate_limit_lock: # ENUM-002: Memory protection - prune if at capacity if ( len(_lookup_rate_limit_requests) >= max_entries and client_ip not in _lookup_rate_limit_requests ): # Evict expired entries first expired = [ ip for ip, reqs in _lookup_rate_limit_requests.items() if not reqs or reqs[-1] <= cutoff ] for ip in expired: del _lookup_rate_limit_requests[ip] # If still at capacity, evict oldest entries if len(_lookup_rate_limit_requests) >= max_entries: sorted_ips = sorted( _lookup_rate_limit_requests.items(), key=lambda x: x[1][-1] if x[1] else 0, ) for ip, _ in sorted_ips[: max_entries // 4]: del _lookup_rate_limit_requests[ip] requests = _lookup_rate_limit_requests[client_ip] requests[:] = [t for t in requests if t > cutoff] if len(requests) >= max_requests: retry_after = int(requests[0] + window - now) + 1 return False, max(1, retry_after) requests.append(now) return True, 0 def reset_lookup_rate_limits() -> None: """Clear lookup rate limit state. For testing only.""" with _lookup_rate_limit_lock: _lookup_rate_limit_requests.clear() def add_rate_limit_headers( response: Response, remaining: int, limit: int, reset_timestamp: int ) -> Response: """Add standard rate limit headers to response. Headers follow draft-ietf-httpapi-ratelimit-headers convention: - X-RateLimit-Limit: Maximum requests per window - X-RateLimit-Remaining: Requests remaining in current window - X-RateLimit-Reset: Unix timestamp when window resets """ if limit > 0: # Only add headers if rate limiting is enabled response.headers["X-RateLimit-Limit"] = str(limit) response.headers["X-RateLimit-Remaining"] = str(max(0, remaining)) response.headers["X-RateLimit-Reset"] = str(reset_timestamp) return response # ───────────────────────────────────────────────────────────────────────────── # Response Helpers # ───────────────────────────────────────────────────────────────────────────── def json_response(data: dict[str, Any], status: int = 200) -> Response: """Create JSON response with proper encoding.""" return Response( json.dumps(data, ensure_ascii=False), status=status, mimetype="application/json", ) def error_response(message: str, status: int, **extra: Any) -> Response: """Create standardized error response.""" data = {"error": message, **extra} return json_response(data, status) # ───────────────────────────────────────────────────────────────────────────── # URL Helpers # ───────────────────────────────────────────────────────────────────────────── def url_prefix() -> str: """Get configured URL prefix for reverse proxy deployments.""" prefix: str = current_app.config.get("URL_PREFIX", "") return prefix def prefixed_url(path: str) -> str: """Generate URL with configured prefix.""" return f"{url_prefix()}{path}" def paste_url(paste_id: str) -> str: """Generate URL for paste metadata endpoint.""" return prefixed_url(f"/{paste_id}") def paste_raw_url(paste_id: str) -> str: """Generate URL for raw paste content endpoint.""" return prefixed_url(f"/{paste_id}/raw") def short_url_path(short_id: str) -> str: """Generate path for short URL redirect endpoint.""" return prefixed_url(f"/s/{short_id}") def short_url_info_path(short_id: str) -> str: """Generate path for short URL info endpoint.""" return prefixed_url(f"/s/{short_id}/info") def base_url() -> str: """Detect full base URL from request headers.""" scheme = ( request.headers.get("X-Forwarded-Proto") or request.headers.get("X-Scheme") or request.scheme ) host = request.headers.get("X-Forwarded-Host") or request.headers.get("Host") or request.host return f"{scheme}://{host}{url_prefix()}" # ───────────────────────────────────────────────────────────────────────────── # Response Builders # ───────────────────────────────────────────────────────────────────────────── def build_paste_metadata( paste_id: str, mime_type: str, size: int, created_at: int, *, owner: str | None = None, burn_after_read: bool = False, expires_at: int | None = None, password_protected: bool = False, include_owner: bool = False, last_accessed: int | None = None, display_name: str | None = None, ) -> dict[str, Any]: """Build standardized paste metadata response dict. Args: paste_id: Paste identifier mime_type: Content MIME type size: Content size in bytes created_at: Creation timestamp owner: Owner fingerprint (included only if include_owner=True) burn_after_read: Whether paste is burn-after-read expires_at: Expiration timestamp password_protected: Whether paste has password include_owner: Whether to include owner in response last_accessed: Last access timestamp (optional) display_name: Human-readable label (optional) """ data: dict[str, Any] = { "id": paste_id, "mime_type": mime_type, "size": size, "created_at": created_at, "url": paste_url(paste_id), "raw": paste_raw_url(paste_id), } if last_accessed is not None: data["last_accessed"] = last_accessed if include_owner and owner: data["owner"] = owner if burn_after_read: data["burn_after_read"] = True if expires_at: data["expires_at"] = expires_at if password_protected: data["password_protected"] = True if display_name: data["display_name"] = display_name return data # ───────────────────────────────────────────────────────────────────────────── # Validation Helpers (used within views) # ───────────────────────────────────────────────────────────────────────────── def validate_paste_id(paste_id: str) -> Response | None: """Validate paste ID format. Returns error response or None if valid.""" expected_length = current_app.config["PASTE_ID_LENGTH"] if len(paste_id) != expected_length or not PASTE_ID_PATTERN.match(paste_id): return error_response("Invalid paste ID", 400) return None def fetch_paste(paste_id: str, check_password: bool = True) -> Response | None: """Fetch paste and store in g.paste. Returns error response or None if OK.""" # ENUM-001: Rate limit lookups to prevent enumeration attacks # Trusted certificate holders are exempt if not get_client_id(): client_ip = get_client_ip() allowed, retry_after = check_lookup_rate_limit(client_ip) if not allowed: response = error_response( f"Lookup rate limit exceeded. Retry after {retry_after} seconds.", 429, retry_after=retry_after, ) response.headers["Retry-After"] = str(retry_after) return response db = get_db() now = int(time.time()) # Update access time db.execute("UPDATE pastes SET last_accessed = ? WHERE id = ?", (now, paste_id)) row = db.execute( """SELECT id, content, mime_type, owner, created_at, length(content) as size, burn_after_read, expires_at, password_hash, display_name FROM pastes WHERE id = ?""", (paste_id,), ).fetchone() if row is None: # TIMING-001: Perform dummy password verification to prevent timing-based # enumeration (attacker can't distinguish "not found" from "wrong password" # by measuring response time) if check_password: dummy_hash = ( "$pbkdf2-sha256$600000$" "0000000000000000000000000000000000000000000000000000000000000000$" "0000000000000000000000000000000000000000000000000000000000000000" ) verify_password("dummy", dummy_hash) db.commit() return error_response("Paste not found", 404) # Password verification if check_password and row["password_hash"]: provided = request.headers.get("X-Paste-Password", "") if not provided: db.commit() return error_response("Password required", 401, password_protected=True) if not verify_password(provided, row["password_hash"]): db.commit() return error_response("Invalid password", 403) g.paste = row g.db = db return None def generate_short_id() -> str: """Generate a random base62 short ID.""" length = current_app.config["SHORT_ID_LENGTH"] return "".join(secrets.choice(SHORT_ID_ALPHABET) for _ in range(length)) def validate_short_id(short_id: str) -> Response | None: """Validate short URL ID format. Returns error response or None if valid.""" expected_length = current_app.config["SHORT_ID_LENGTH"] if len(short_id) != expected_length or not SHORT_ID_PATTERN.match(short_id): return error_response("Invalid short URL ID", 400) return None def validate_target_url(url: str) -> Response | None: """Validate target URL for shortening. Returns error response or None if valid.""" max_length = current_app.config["SHORT_URL_MAX_LENGTH"] if len(url) > max_length: return error_response("URL too long", 400, max_length=max_length, length=len(url)) parsed = urlparse(url) if parsed.scheme not in ALLOWED_URL_SCHEMES: return error_response("Invalid URL scheme", 400, allowed=list(ALLOWED_URL_SCHEMES)) if not parsed.netloc: return error_response("Invalid URL: missing host", 400) return None def fetch_short_url(short_id: str, increment_counter: bool = True) -> Response | None: """Fetch short URL and store in g.short_url. Returns error response or None if OK.""" # Trusted certificate holders are exempt from lookup rate limiting if not get_client_id(): client_ip = get_client_ip() allowed, retry_after = check_lookup_rate_limit(client_ip) if not allowed: response = error_response( f"Lookup rate limit exceeded. Retry after {retry_after} seconds.", 429, retry_after=retry_after, ) response.headers["Retry-After"] = str(retry_after) return response db = get_db() now = int(time.time()) if increment_counter: db.execute( "UPDATE short_urls SET last_accessed = ?, access_count = access_count + 1 WHERE id = ?", (now, short_id), ) else: db.execute( "UPDATE short_urls SET last_accessed = ? WHERE id = ?", (now, short_id), ) row = db.execute( """SELECT id, target_url, owner, created_at, last_accessed, access_count, expires_at FROM short_urls WHERE id = ?""", (short_id,), ).fetchone() if row is None: db.commit() return error_response("Short URL not found", 404) # Check expiry if row["expires_at"] and row["expires_at"] < now: db.execute("DELETE FROM short_urls WHERE id = ?", (short_id,)) db.commit() return error_response("Short URL expired", 404) db.commit() g.short_url = row return None def require_auth() -> Response | None: """Check authentication for ownership operations. Uses get_client_fingerprint() to allow both trusted and untrusted certificate holders to manage their own pastes. Returns error response or None if authenticated. """ client_id = get_client_fingerprint() if not client_id: return error_response("Authentication required", 401) g.client_id = client_id return None # ───────────────────────────────────────────────────────────────────────────── # Authentication & Security # ───────────────────────────────────────────────────────────────────────────── def is_trusted_proxy() -> bool: """Verify request comes from trusted reverse proxy via shared secret. Result is cached per-request in Flask's g object for efficiency. """ if hasattr(g, "_trusted_proxy"): return bool(g._trusted_proxy) expected = current_app.config.get("TRUSTED_PROXY_SECRET", "") if not expected: g._trusted_proxy = True return True provided = request.headers.get("X-Proxy-Secret", "") g._trusted_proxy = hmac.compare_digest(expected, provided) return bool(g._trusted_proxy) def get_client_fingerprint() -> str | None: """Extract client certificate fingerprint for identity/ownership. Returns fingerprint regardless of trust status. Used for: - Paste ownership tracking - Delete/update/list operations (user manages their own pastes) Returns None if no valid fingerprint provided or proxy not trusted. """ if not is_trusted_proxy(): return None sha1 = request.headers.get("X-SSL-Client-SHA1", "").strip().lower() if sha1 and CLIENT_ID_PATTERN.match(sha1): return sha1 return None def get_client_id() -> str | None: """Get trusted client certificate fingerprint for elevated privileges. Returns fingerprint only if certificate is valid and not revoked. Used for: - Rate limit benefits (higher limits for trusted users) - Size limit benefits (larger pastes for trusted users) Untrusted certificates return None here but still work via get_client_fingerprint() for ownership operations. """ fingerprint = get_client_fingerprint() if not fingerprint: return None # Check if PKI is enabled and certificate is revoked if current_app.config.get("PKI_ENABLED"): from app.pki import is_certificate_valid if not is_certificate_valid(fingerprint): current_app.logger.warning( "Elevated auth rejected (revoked/expired): %s", fingerprint[:12] + "..." ) log_event( AuditEvent.AUTH_FAILURE, AuditOutcome.BLOCKED, client_id=fingerprint, client_ip=get_client_ip(), details={"reason": "revoked_or_expired"}, ) return None return fingerprint def is_admin() -> bool: """Check if current authenticated user is an admin. Returns True only if: - User has a valid client certificate - Certificate is marked as admin in the PKI database """ client_id = get_client_id() if not client_id: return False if not current_app.config.get("PKI_ENABLED"): return False from app.pki import is_admin_certificate return is_admin_certificate(client_id) # ───────────────────────────────────────────────────────────────────────────── # Proof-of-Work # ───────────────────────────────────────────────────────────────────────────── def get_pow_secret() -> bytes: """Get PoW signing secret from app config.""" secret: str = current_app.config["POW_SECRET"] return secret.encode() def generate_challenge(difficulty_override: int | None = None) -> dict[str, Any]: """Generate new PoW challenge with signed token. Uses dynamic difficulty which may be elevated during high load, unless difficulty_override is specified. Args: difficulty_override: Optional fixed difficulty (for registration) """ if difficulty_override is not None: difficulty = difficulty_override else: difficulty = get_dynamic_difficulty() ttl = current_app.config["POW_CHALLENGE_TTL"] expires = int(time.time()) + ttl nonce = secrets.token_hex(16) msg = f"{nonce}:{expires}:{difficulty}".encode() sig = hmac.new(get_pow_secret(), msg, hashlib.sha256).hexdigest() return { "nonce": nonce, "difficulty": difficulty, "expires": expires, "token": f"{nonce}:{expires}:{difficulty}:{sig}", } def verify_pow(token: str, solution: str, min_difficulty: int | None = None) -> tuple[bool, str]: """Verify proof-of-work solution. Returns (valid, error_message). Accepts tokens with difficulty >= min_difficulty. The solution must meet the token's embedded difficulty (which may be elevated due to anti-flood). Args: token: PoW challenge token solution: Nonce solution min_difficulty: Minimum required difficulty (defaults to POW_DIFFICULTY) """ base_difficulty = current_app.config["POW_DIFFICULTY"] if base_difficulty == 0 and min_difficulty is None: return True, "" required_difficulty = min_difficulty if min_difficulty is not None else base_difficulty if required_difficulty == 0: return True, "" # Parse token try: parts = token.split(":") if len(parts) != 4: return False, "Invalid challenge format" nonce, expires_str, diff_str, sig = parts expires = int(expires_str) token_diff = int(diff_str) except (ValueError, TypeError): return False, "Invalid challenge format" # Verify signature msg = f"{nonce}:{expires}:{token_diff}".encode() expected_sig = hmac.new(get_pow_secret(), msg, hashlib.sha256).hexdigest() if not hmac.compare_digest(sig, expected_sig): return False, "Invalid challenge signature" # Check expiry if int(time.time()) > expires: return False, "Challenge expired" # Token difficulty must be at least the required difficulty if token_diff < required_difficulty: return False, f"Difficulty too low: {token_diff} < {required_difficulty}" # Verify solution try: solution_int = int(solution) if solution_int < 0: return False, "Invalid solution" except (ValueError, TypeError): return False, "Invalid solution" # Check hash meets the token's difficulty (not current dynamic difficulty) work = f"{nonce}:{solution}".encode() hash_bytes = hashlib.sha256(work).digest() zero_bits = 0 for byte in hash_bytes: if byte == 0: zero_bits += 8 else: zero_bits += 8 - byte.bit_length() break if zero_bits < token_diff: return False, f"Insufficient work: {zero_bits} < {token_diff} bits" return True, "" # ───────────────────────────────────────────────────────────────────────────── # Content Processing # ───────────────────────────────────────────────────────────────────────────── def calculate_entropy(data: bytes) -> float: """Calculate Shannon entropy in bits per byte (0-8 range).""" if not data: return 0.0 freq = [0] * 256 for byte in data: freq[byte] += 1 length = len(data) entropy = 0.0 for count in freq: if count > 0: p = count / length entropy -= p * math.log2(p) return entropy def detect_mime_type(content: bytes, content_type: str | None = None) -> str: """Detect MIME type based on text/binary analysis. Simple approach: if content is valid UTF-8, it's text/plain. Otherwise, it's application/octet-stream (binary). Security headers (X-Content-Type-Options: nosniff, CSP) prevent browsers from MIME-sniffing and executing embedded scripts. """ # Honor explicit Content-Type if specific (not generic) if content_type: mime = content_type.split(";")[0].strip().lower() if mime not in GENERIC_MIME_TYPES and MIME_PATTERN.match(mime): return mime # Text vs binary detection try: content.decode("utf-8") return "text/plain" except UnicodeDecodeError: return "application/octet-stream" def is_recognizable_format(content: bytes) -> tuple[bool, str | None]: """Check if content is a recognizable (likely unencrypted) format. Returns (is_recognizable, detected_format). Used to enforce encryption by rejecting known formats. Simple approach: valid UTF-8 text is recognizable (plaintext). Binary content is considered potentially encrypted (not recognizable). """ # Check if valid UTF-8 text (plaintext) try: content.decode("utf-8") return True, "text/plain" except UnicodeDecodeError: pass return False, None def generate_paste_id(content: bytes) -> str: """Generate unique paste ID from content hash and timestamp.""" data = content + str(time.time_ns()).encode() length = current_app.config["PASTE_ID_LENGTH"] return hashlib.sha256(data).hexdigest()[:length] # ───────────────────────────────────────────────────────────────────────────── # Class-Based Views # ───────────────────────────────────────────────────────────────────────────── class IndexView(MethodView): """Handle API info and paste creation.""" def get(self) -> Response: """Return API information and usage examples.""" url = base_url() difficulty = get_dynamic_difficulty() pki_enabled = current_app.config.get("PKI_ENABLED", False) # Build endpoints dict endpoints: dict[str, str] = { f"GET {prefixed_url('/')}": "API information", f"GET {prefixed_url('/health')}": "Health check", f"GET {prefixed_url('/client')}": "Download CLI client (fpaste)", f"GET {prefixed_url('/challenge')}": "Get proof-of-work challenge", f"POST {prefixed_url('/')}": "Create paste (PoW required unless trusted cert)", f"GET {prefixed_url('/pastes')}": "List your pastes (cert required)", f"GET {prefixed_url('/')}": "Get paste metadata", f"GET {prefixed_url('//raw')}": "Get raw paste content", f"PUT {prefixed_url('/')}": "Update paste (owner only)", f"DELETE {prefixed_url('/')}": "Delete paste (owner only)", f"GET {prefixed_url('/register/challenge')}": "Get registration challenge", f"POST {prefixed_url('/register')}": "Register for client certificate", f"POST {prefixed_url('/s')}": "Create short URL (PoW required unless trusted cert)", f"GET {prefixed_url('/s')}": "List your short URLs (cert required)", f"GET {prefixed_url('/s/')}": "Redirect to target URL", f"GET {prefixed_url('/s//info')}": "Short URL metadata", f"DELETE {prefixed_url('/s/')}": "Delete short URL (owner only)", } if pki_enabled: endpoints.update( { f"GET {prefixed_url('/pki')}": "PKI status", f"GET {prefixed_url('/pki/ca.crt')}": "Download CA certificate", f"POST {prefixed_url('/pki/issue')}": "Issue client certificate", f"GET {prefixed_url('/pki/certs')}": "List certificates", f"POST {prefixed_url('/pki/revoke/')}": "Revoke certificate", } ) # Build response response_data: dict[str, Any] = { "version": VERSION, "endpoints": endpoints, "authentication": { "anonymous": "Create pastes only (strict limits)", "client_cert": "Create + manage own pastes (strict limits)", "trusted_cert": "All operations (no rate limits)", }, "limits": { "anonymous": { "max_size": current_app.config["MAX_PASTE_SIZE_ANON"], "rate": f"{current_app.config['RATE_LIMIT_MAX']}/min", }, "trusted": { "max_size": current_app.config["MAX_PASTE_SIZE_AUTH"], "rate": "unlimited", }, }, "pow": { "enabled": difficulty > 0, "difficulty": difficulty, "hint": "GET /challenge, solve, submit with X-PoW-Token + X-PoW-Solution", }, "cli": { "install": f"curl -o fpaste {url}/client && chmod +x fpaste", "usage": "fpaste file.txt # encrypts by default", }, "usage": { "create": f"curl -X POST --data-binary @file.txt {url}/ (with PoW headers)", "get": f"curl {url}//raw", "delete": f"curl -X DELETE {url}/ (with X-SSL-Client-SHA1)", }, } return json_response(response_data) def post(self) -> Response: """Create a new paste.""" # Parse content content: bytes | None = None mime_type: str | None = None if request.is_json: data = request.get_json(silent=True) if data and isinstance(data.get("content"), str): content = data["content"].encode("utf-8") mime_type = "text/plain" else: content = request.get_data(as_text=False) if content: mime_type = detect_mime_type(content, request.content_type) if not content: return error_response("No content provided", 400) # Separate trusted (for limits) from fingerprint (for ownership) trusted_client = get_client_id() # Only trusted certs get elevated limits owner = get_client_fingerprint() # Any cert can own pastes # Rate limiting (trusted certs exempt) client_ip = get_client_ip() if not trusted_client: allowed, remaining, limit, reset_timestamp = check_rate_limit( client_ip, authenticated=False ) # Store rate limit info for response headers g.rate_limit_remaining = remaining g.rate_limit_limit = limit g.rate_limit_reset = reset_timestamp if not allowed: current_app.logger.warning("Rate limit exceeded: ip=%s", client_ip) # Audit log rate limit event if current_app.config.get("AUDIT_ENABLED", True): from app.audit import AuditEvent, AuditOutcome, log_event log_event( AuditEvent.RATE_LIMIT, AuditOutcome.BLOCKED, client_id=owner, client_ip=client_ip, ) record_rate_limit("blocked") retry_after = max(1, reset_timestamp - int(time.time())) response = error_response( "Rate limit exceeded", 429, retry_after=retry_after, ) response.headers["Retry-After"] = str(retry_after) add_rate_limit_headers(response, 0, limit, reset_timestamp) return response # Proof-of-work verification (trusted certs exempt) difficulty = current_app.config["POW_DIFFICULTY"] if difficulty > 0 and not trusted_client: token = request.headers.get("X-PoW-Token", "") solution = request.headers.get("X-PoW-Solution", "") if not token or not solution: return error_response( "Proof-of-work required", 400, hint="GET /challenge for a new challenge" ) valid, err = verify_pow(token, solution) if not valid: current_app.logger.warning( "PoW verification failed: %s from=%s", err, request.remote_addr ) # Audit log PoW failure if current_app.config.get("AUDIT_ENABLED", True): from app.audit import AuditEvent, AuditOutcome, log_event log_event( AuditEvent.POW_FAILURE, AuditOutcome.FAILURE, client_id=owner, client_ip=client_ip, details={"error": err}, ) record_pow("failure") return error_response(f"Proof-of-work failed: {err}", 400) record_pow("success") # Size limits (only trusted clients get elevated limits) content_size = len(content) max_size = ( current_app.config["MAX_PASTE_SIZE_AUTH"] if trusted_client else current_app.config["MAX_PASTE_SIZE_ANON"] ) if content_size > max_size: return error_response( "Paste too large", 413, size=content_size, max_size=max_size, trusted=trusted_client is not None, ) # Minimum size check (enforces encryption overhead) min_size = current_app.config.get("MIN_PASTE_SIZE", 0) if min_size > 0 and content_size < min_size: return error_response( "Paste too small", 400, size=content_size, min_size=min_size, hint="Encrypt content before uploading (fpaste encrypts by default)", ) # Entropy check min_entropy = current_app.config.get("MIN_ENTROPY", 0) min_entropy_size = current_app.config.get("MIN_ENTROPY_SIZE", 256) if min_entropy > 0 and content_size >= min_entropy_size: entropy = calculate_entropy(content) if entropy < min_entropy: current_app.logger.warning( "Low entropy rejected: %.2f < %.2f from=%s", entropy, min_entropy, request.remote_addr, ) return error_response( "Content entropy too low", 400, entropy=round(entropy, 2), min_entropy=min_entropy, hint="Encrypt content before uploading (fpaste encrypts by default)", ) # Binary content requirement (reject recognizable formats) if current_app.config.get("REQUIRE_BINARY", False): is_recognized, detected_format = is_recognizable_format(content) if is_recognized: current_app.logger.warning( "Recognizable format rejected: %s from=%s", detected_format, request.remote_addr, ) return error_response( "Recognizable format not allowed", 400, detected=detected_format, hint="Encrypt content before uploading (fpaste encrypts by default)", ) # Deduplication check content_hash = hashlib.sha256(content).hexdigest() is_allowed, dedup_count = check_content_hash(content_hash) if not is_allowed: window = current_app.config["CONTENT_DEDUP_WINDOW"] current_app.logger.warning( "Dedup threshold exceeded: hash=%s count=%d from=%s", content_hash[:16], dedup_count, request.remote_addr, ) # Audit log dedup block if current_app.config.get("AUDIT_ENABLED", True): from app.audit import AuditEvent, AuditOutcome, log_event log_event( AuditEvent.DEDUP_BLOCK, AuditOutcome.BLOCKED, client_id=owner, client_ip=client_ip, details={"hash": content_hash[:16], "count": dedup_count}, ) record_dedup("blocked") return error_response( "Duplicate content rate limit exceeded", 429, count=dedup_count, window_seconds=window, ) record_dedup("allowed") # Parse optional headers burn_header = request.headers.get("X-Burn-After-Read", "").strip().lower() burn_after_read = burn_header in ("true", "1", "yes") # Determine default expiry based on authentication level # Anonymous < Untrusted cert < Trusted cert (registered in PKI) if owner is None: # Anonymous user default_expiry = current_app.config.get("EXPIRY_ANON", 86400) elif trusted_client: # Trusted certificate (registered in PKI) from app.pki import is_trusted_certificate if is_trusted_certificate(owner): default_expiry = current_app.config.get("EXPIRY_TRUSTED", 2592000) else: default_expiry = current_app.config.get("EXPIRY_UNTRUSTED", 604800) else: # Has cert but not trusted default_expiry = current_app.config.get("EXPIRY_UNTRUSTED", 604800) expires_at = None expiry_header = request.headers.get("X-Expiry", "").strip() if expiry_header: try: expiry_seconds = int(expiry_header) if expiry_seconds > 0: max_expiry = current_app.config.get("MAX_EXPIRY_SECONDS", 0) if max_expiry > 0: expiry_seconds = min(expiry_seconds, max_expiry) expires_at = int(time.time()) + expiry_seconds except ValueError: pass # Apply default expiry if none specified (0 = no expiry for trusted) if expires_at is None and default_expiry > 0: expires_at = int(time.time()) + default_expiry password_hash = None password_header = request.headers.get("X-Paste-Password", "") if password_header: if len(password_header) > 1024: return error_response("Password too long (max 1024 chars)", 400) password_hash = hash_password(password_header) # Display name (authenticated users only, silently ignored for anonymous) display_name: str | None = None display_name_header = request.headers.get("X-Display-Name", "").strip() if display_name_header and owner: if len(display_name_header) > 128: return error_response("Display name too long (max 128 chars)", 400) if CONTROL_CHAR_PATTERN.search(display_name_header): return error_response("Display name contains invalid characters", 400) display_name = display_name_header # Insert paste paste_id = generate_paste_id(content) now = int(time.time()) db = get_db() db.execute( """INSERT INTO pastes (id, content, mime_type, owner, created_at, last_accessed, burn_after_read, expires_at, password_hash, display_name) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( paste_id, content, mime_type, owner, now, now, 1 if burn_after_read else 0, expires_at, password_hash, display_name, ), ) db.commit() # Build response (creation response is intentionally different - no size) response_data: dict[str, Any] = { "id": paste_id, "url": paste_url(paste_id), "raw": paste_raw_url(paste_id), "mime_type": mime_type, "created_at": now, } if owner: response_data["owner"] = owner if burn_after_read: response_data["burn_after_read"] = True if expires_at: response_data["expires_at"] = expires_at if password_hash: response_data["password_protected"] = True if display_name: response_data["display_name"] = display_name # Record successful paste for anti-flood tracking record_antiflood_request() # Audit log paste creation if current_app.config.get("AUDIT_ENABLED", True): from app.audit import AuditEvent, AuditOutcome, log_event log_event( AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id=paste_id, client_id=owner, client_ip=client_ip, details={"size": content_size, "mime_type": mime_type}, ) record_paste_created("authenticated" if owner else "anonymous", "success") response = json_response(response_data, 201) # Add rate limit headers to successful response rl_remaining = getattr(g, "rate_limit_remaining", -1) rl_limit = getattr(g, "rate_limit_limit", -1) rl_reset = getattr(g, "rate_limit_reset", 0) if rl_limit > 0: add_rate_limit_headers(response, rl_remaining, rl_limit, rl_reset) return response class HealthView(MethodView): """Health check endpoint.""" def get(self) -> Response: """Return health status with database check.""" try: db = get_db() db.execute("SELECT 1") return json_response({"status": "healthy", "database": "ok"}) except Exception: return json_response({"status": "unhealthy", "database": "error"}, 503) class ChallengeView(MethodView): """Proof-of-work challenge endpoint.""" def get(self) -> Response: """Generate and return PoW challenge.""" base_difficulty = current_app.config["POW_DIFFICULTY"] if base_difficulty == 0: return json_response({"enabled": False, "difficulty": 0}) ch = generate_challenge() response = { "enabled": True, "nonce": ch["nonce"], "difficulty": ch["difficulty"], "expires": ch["expires"], "token": ch["token"], } # Indicate if difficulty is elevated due to anti-flood if ch["difficulty"] > base_difficulty: response["elevated"] = True response["base_difficulty"] = base_difficulty return json_response(response) class RegisterChallengeView(MethodView): """Registration PoW challenge endpoint (higher difficulty).""" def get(self) -> Response: """Generate PoW challenge for registration (higher difficulty).""" register_difficulty = current_app.config.get("REGISTER_POW_DIFFICULTY", 24) if register_difficulty == 0: return json_response({"enabled": False, "difficulty": 0}) ch = generate_challenge(difficulty_override=register_difficulty) return json_response( { "enabled": True, "nonce": ch["nonce"], "difficulty": ch["difficulty"], "expires": ch["expires"], "token": ch["token"], "purpose": "registration", } ) class RegisterView(MethodView): """Public client certificate registration endpoint.""" def post(self) -> Response: """Register and obtain a client certificate. Requires PoW to prevent abuse. Returns PKCS#12 bundle with: - Client certificate - Client private key - CA certificate Auto-generates CA if not present and PKI_CA_PASSWORD is configured. """ from cryptography import x509 from cryptography.hazmat.primitives import serialization from app.pki import ( CANotFoundError, PKIError, create_pkcs12, generate_ca, get_ca_info, issue_certificate, ) # Check PKI configuration password = current_app.config.get("PKI_CA_PASSWORD", "") if not password: return error_response( "Registration not available", 503, hint="PKI_CA_PASSWORD not configured", ) # Verify PoW register_difficulty = current_app.config.get("REGISTER_POW_DIFFICULTY", 24) if register_difficulty > 0: token = request.headers.get("X-PoW-Token", "") solution = request.headers.get("X-PoW-Solution", "") if not token or not solution: return error_response( "Proof-of-work required", 400, hint="GET /register/challenge for a registration challenge", difficulty=register_difficulty, ) valid, err = verify_pow(token, solution, min_difficulty=register_difficulty) if not valid: current_app.logger.warning( "Registration PoW failed: %s from=%s", err, request.remote_addr ) return error_response(f"Proof-of-work failed: {err}", 400) # Parse common_name from request common_name = None if request.is_json: data = request.get_json(silent=True) if data and isinstance(data.get("common_name"), str): common_name = data["common_name"][:64].strip() if not common_name: # Generate random common name if not provided common_name = f"client-{secrets.token_hex(4)}" # Auto-generate CA if needed (skip PKI_ENABLED check for registration) ca_info = get_ca_info(skip_enabled_check=True) if ca_info is None: ca_days = current_app.config.get("PKI_CA_DAYS", 3650) try: ca_info = generate_ca("FlaskPaste CA", password, days=ca_days) current_app.logger.info( "CA auto-generated for registration: fingerprint=%s", ca_info["fingerprint_sha1"][:12], ) except PKIError as e: current_app.logger.error("CA auto-generation failed: %s", e) return error_response("CA generation failed", 500) # Issue certificate try: cert_days = current_app.config.get("PKI_CERT_DAYS", 365) cert_info = issue_certificate(common_name, password, days=cert_days) except CANotFoundError: return error_response("CA not available", 500) except PKIError as e: current_app.logger.error("Certificate issuance failed: %s", e) return error_response("Certificate issuance failed", 500) # Load CA cert for PKCS#12 (reuse ca_info from above, or refresh if it was just generated) if ca_info is None or "certificate_pem" not in ca_info: ca_info = get_ca_info(skip_enabled_check=True) assert ca_info is not None # CA was just generated or exists ca_cert = x509.load_pem_x509_certificate(ca_info["certificate_pem"].encode()) client_cert = x509.load_pem_x509_certificate(cert_info["certificate_pem"].encode()) client_key = serialization.load_pem_private_key( cert_info["private_key_pem"].encode(), password=None ) # Create PKCS#12 bundle (no password for easy import) p12_data = create_pkcs12( private_key=client_key, certificate=client_cert, ca_certificate=ca_cert, friendly_name=common_name, password=None, ) current_app.logger.info( "Client registered: cn=%s fingerprint=%s from=%s", common_name, cert_info["fingerprint_sha1"][:12], request.remote_addr, ) log_event( AuditEvent.CERT_ISSUED, AuditOutcome.SUCCESS, client_id=cert_info["fingerprint_sha1"], client_ip=request.remote_addr, details={ "type": "registration", "common_name": common_name, "expires_at": cert_info["expires_at"], }, ) # Return PKCS#12 as binary download response = Response(p12_data, mimetype="application/x-pkcs12") response.headers["Content-Disposition"] = f'attachment; filename="{common_name}.p12"' response.headers["X-Fingerprint-SHA1"] = cert_info["fingerprint_sha1"] response.headers["X-Certificate-Expires"] = str(cert_info["expires_at"]) response.headers["X-Is-Admin"] = "1" if cert_info.get("is_admin") else "0" return response class ClientView(MethodView): """CLI client download endpoint.""" def get(self) -> Response: """Serve fpaste CLI with server URL pre-configured.""" import os server_url = base_url() client_path = os.path.join(current_app.root_path, "..", "fpaste") try: with open(client_path) as f: content = f.read() # Replace default server URL content = content.replace( '"server": os.environ.get("FLASKPASTE_SERVER", "http://localhost:5000")', f'"server": os.environ.get("FLASKPASTE_SERVER", "{server_url}")', ) content = content.replace( "http://localhost:5000)", f"{server_url})", ) response = Response(content, mimetype="text/x-python") response.headers["Content-Disposition"] = "attachment; filename=fpaste" return response except FileNotFoundError: return error_response("Client not available", 404) class PasteView(MethodView): """Paste metadata operations.""" def get(self, paste_id: str) -> Response: """Retrieve paste metadata.""" # Validate and fetch if err := validate_paste_id(paste_id): return err if err := fetch_paste(paste_id): return err row: Row = g.paste g.db.commit() return json_response( build_paste_metadata( paste_id=row["id"], mime_type=row["mime_type"], size=row["size"], created_at=row["created_at"], burn_after_read=bool(row["burn_after_read"]), expires_at=row["expires_at"], password_protected=bool(row["password_hash"]), display_name=row["display_name"], ) ) def head(self, paste_id: str) -> Response: """Return paste metadata headers only.""" return self.get(paste_id) def put(self, paste_id: str) -> Response: """Update paste content and/or metadata. Requires authentication and ownership. Content update: Send raw body with Content-Type header Metadata update: Use headers with empty body Headers: - X-Paste-Password: Set/change password - X-Remove-Password: true to remove password - X-Extend-Expiry: Seconds to add to current expiry - X-Display-Name: Set/change display name - X-Remove-Display-Name: true to remove display name """ # Validate paste ID format if err := validate_paste_id(paste_id): return err if err := require_auth(): return err db = get_db() # Fetch current paste row = db.execute( """SELECT id, owner, content, mime_type, expires_at, password_hash FROM pastes WHERE id = ?""", (paste_id,), ).fetchone() if row is None: return error_response("Paste not found", 404) if row["owner"] != g.client_id: return error_response("Permission denied", 403) # Check for burn-after-read (cannot update) burn_check = db.execute( "SELECT burn_after_read FROM pastes WHERE id = ?", (paste_id,) ).fetchone() if burn_check and burn_check["burn_after_read"]: return error_response("Cannot update burn-after-read paste", 400) # Parse update parameters new_password = request.headers.get("X-Paste-Password", "").strip() or None remove_password = request.headers.get("X-Remove-Password", "").lower() in ( "true", "1", "yes", ) extend_expiry_str = request.headers.get("X-Extend-Expiry", "").strip() # Prepare update fields update_fields = [] update_params: list[Any] = [] # Content update (if body provided) content = request.get_data() if content: mime_type = request.content_type or "application/octet-stream" # Sanitize MIME type if not MIME_PATTERN.match(mime_type.split(";")[0].strip()): mime_type = "application/octet-stream" update_fields.append("content = ?") update_params.append(content) update_fields.append("mime_type = ?") update_params.append(mime_type.split(";")[0].strip()) # Password update if remove_password: update_fields.append("password_hash = NULL") elif new_password: update_fields.append("password_hash = ?") update_params.append(hash_password(new_password)) # Expiry extension if extend_expiry_str: try: extend_seconds = int(extend_expiry_str) if extend_seconds > 0: current_expiry = row["expires_at"] if current_expiry: new_expiry = current_expiry + extend_seconds else: # If no expiry set, create one from now new_expiry = int(time.time()) + extend_seconds update_fields.append("expires_at = ?") update_params.append(new_expiry) except ValueError: return error_response("Invalid X-Extend-Expiry value", 400) # Display name update remove_display_name = request.headers.get("X-Remove-Display-Name", "").lower() in ( "true", "1", "yes", ) new_display_name = request.headers.get("X-Display-Name", "").strip() if remove_display_name: update_fields.append("display_name = NULL") elif new_display_name: if len(new_display_name) > 128: return error_response("Display name too long (max 128 chars)", 400) if CONTROL_CHAR_PATTERN.search(new_display_name): return error_response("Display name contains invalid characters", 400) update_fields.append("display_name = ?") update_params.append(new_display_name) if not update_fields: return error_response("No updates provided", 400) # Execute update (fields are hardcoded strings, safe from injection) update_sql = f"UPDATE pastes SET {', '.join(update_fields)} WHERE id = ?" # noqa: S608 # nosec B608 update_params.append(paste_id) db.execute(update_sql, update_params) db.commit() # Audit log paste update if current_app.config.get("AUDIT_ENABLED", True): from app.audit import AuditEvent, AuditOutcome, log_event log_event( AuditEvent.PASTE_UPDATE, AuditOutcome.SUCCESS, paste_id=paste_id, client_id=g.client_id, client_ip=get_client_ip(), details={"fields": [f.split(" = ")[0] for f in update_fields]}, ) # Fetch updated paste for response updated = db.execute( """SELECT id, mime_type, length(content) as size, expires_at, display_name, CASE WHEN password_hash IS NOT NULL THEN 1 ELSE 0 END as password_protected FROM pastes WHERE id = ?""", (paste_id,), ).fetchone() response_data: dict[str, Any] = { "id": updated["id"], "size": updated["size"], "mime_type": updated["mime_type"], } if updated["expires_at"]: response_data["expires_at"] = updated["expires_at"] if updated["password_protected"]: response_data["password_protected"] = True if updated["display_name"]: response_data["display_name"] = updated["display_name"] return json_response(response_data) class PasteRawView(MethodView): """Raw paste content retrieval.""" def get(self, paste_id: str) -> Response: """Retrieve raw paste content.""" # Validate and fetch if err := validate_paste_id(paste_id): return err if err := fetch_paste(paste_id): return err row: Row = g.paste db = g.db burn_after_read = row["burn_after_read"] if burn_after_read: db.execute("DELETE FROM pastes WHERE id = ?", (paste_id,)) current_app.logger.info("Burn-after-read paste deleted: %s", paste_id) db.commit() # Audit log paste access if current_app.config.get("AUDIT_ENABLED", True): from app.audit import AuditEvent, AuditOutcome, log_event log_event( AuditEvent.PASTE_ACCESS, AuditOutcome.SUCCESS, paste_id=paste_id, client_id=get_client_fingerprint(), client_ip=get_client_ip(), details={"burn": bool(burn_after_read)}, ) record_paste_accessed( "authenticated" if get_client_fingerprint() else "anonymous", bool(burn_after_read), ) response = Response(row["content"], mimetype=row["mime_type"]) if row["mime_type"].startswith(("image/", "text/")): response.headers["Content-Disposition"] = "inline" if burn_after_read: response.headers["X-Burn-After-Read"] = "true" return response def head(self, paste_id: str) -> Response: """Return raw paste headers. HEAD triggers burn-after-read deletion. Security note: HEAD requests count as paste access for burn-after-read to prevent attackers from probing paste existence before retrieval. """ # Validate and fetch if err := validate_paste_id(paste_id): return err if err := fetch_paste(paste_id): return err row: Row = g.paste db = g.db # BURN-001: HEAD triggers burn-after-read like GET burn_after_read = row["burn_after_read"] if burn_after_read: db.execute("DELETE FROM pastes WHERE id = ?", (paste_id,)) current_app.logger.info("Burn-after-read paste deleted via HEAD: %s", paste_id) db.commit() response = Response(mimetype=row["mime_type"]) response.headers["Content-Length"] = str(row["size"]) if row["mime_type"].startswith(("image/", "text/")): response.headers["Content-Disposition"] = "inline" if burn_after_read: response.headers["X-Burn-After-Read"] = "true" return response class PasteDeleteView(MethodView): """Paste deletion with authentication.""" def delete(self, paste_id: str) -> Response: """Delete paste. Requires ownership or admin rights.""" # Validate if err := validate_paste_id(paste_id): return err if err := require_auth(): return err db = get_db() row = db.execute("SELECT owner FROM pastes WHERE id = ?", (paste_id,)).fetchone() if row is None: return error_response("Paste not found", 404) # Allow if owner or admin if row["owner"] != g.client_id and not is_admin(): return error_response("Permission denied", 403) db.execute("DELETE FROM pastes WHERE id = ?", (paste_id,)) db.commit() # Audit log paste deletion if current_app.config.get("AUDIT_ENABLED", True): from app.audit import AuditEvent, AuditOutcome, log_event log_event( AuditEvent.PASTE_DELETE, AuditOutcome.SUCCESS, paste_id=paste_id, client_id=g.client_id, client_ip=get_client_ip(), ) record_paste_deleted("authenticated", "success") return json_response({"message": "Paste deleted"}) class PastesListView(MethodView): """List pastes with authentication.""" def get(self) -> Response: """List pastes owned by authenticated user, or all pastes for admins. Privacy guarantees: - Requires authentication (mTLS client certificate) - Regular users can ONLY see their own pastes - Admins can see all pastes (with optional owner filter) - Content is never returned, only metadata Query parameters: - limit: max results (default 50, max 200) - offset: pagination offset (default 0) - type: filter by MIME type (glob pattern, e.g., "image/*") - after: filter by created_at >= timestamp - before: filter by created_at <= timestamp - all: (admin only) if "1", list all pastes instead of own - owner: (admin only) filter by owner fingerprint """ import fnmatch # Strict authentication requirement if err := require_auth(): return err client_id = g.client_id user_is_admin = is_admin() # Parse pagination parameters try: limit = min(int(request.args.get("limit", 50)), 200) offset = max(int(request.args.get("offset", 0)), 0) except (ValueError, TypeError): limit, offset = 50, 0 # Parse filter parameters type_filter = request.args.get("type", "").strip() try: after_ts = int(request.args.get("after", 0)) except (ValueError, TypeError): after_ts = 0 try: before_ts = int(request.args.get("before", 0)) except (ValueError, TypeError): before_ts = 0 # Admin-only parameters show_all = request.args.get("all", "0") == "1" and user_is_admin owner_filter = request.args.get("owner", "").strip() if user_is_admin else "" db = get_db() # Build query with filters where_clauses: list[str] = [] params: list[Any] = [] # Owner filtering logic if show_all: # Admin viewing all pastes (with optional owner filter) if owner_filter: where_clauses.append("owner = ?") params.append(owner_filter) # else: no owner filter, show all else: # Regular user or admin without ?all=1: show only own pastes where_clauses.append("owner = ?") params.append(client_id) if after_ts > 0: where_clauses.append("created_at >= ?") params.append(after_ts) if before_ts > 0: where_clauses.append("created_at <= ?") params.append(before_ts) # Build WHERE clause (may be empty for admin viewing all) where_sql = " AND ".join(where_clauses) if where_clauses else "1=1" # Count total pastes matching filters (where_sql is safe, built from constants) count_row = db.execute( f"SELECT COUNT(*) as total FROM pastes WHERE {where_sql}", # noqa: S608 # nosec B608 params, ).fetchone() total = count_row["total"] if count_row else 0 # Fetch pastes with metadata only (where_sql is safe, built from constants) # Include owner for admin view rows = db.execute( f"""SELECT id, owner, mime_type, length(content) as size, created_at, last_accessed, burn_after_read, expires_at, display_name, CASE WHEN password_hash IS NOT NULL THEN 1 ELSE 0 END as password_protected FROM pastes WHERE {where_sql} ORDER BY created_at DESC LIMIT ? OFFSET ?""", # noqa: S608 # nosec B608 [*params, limit, offset], ).fetchall() # Apply MIME type filter (glob pattern matching done in Python for flexibility) if type_filter: rows = [r for r in rows if fnmatch.fnmatch(r["mime_type"], type_filter)] pastes = [ build_paste_metadata( paste_id=row["id"], mime_type=row["mime_type"], size=row["size"], created_at=row["created_at"], owner=row["owner"], burn_after_read=bool(row["burn_after_read"]), expires_at=row["expires_at"], password_protected=bool(row["password_protected"]), include_owner=show_all, last_accessed=row["last_accessed"], display_name=row["display_name"], ) for row in rows ] response_data: dict[str, Any] = { "pastes": pastes, "count": len(pastes), "total": total, "limit": limit, "offset": offset, } if user_is_admin: response_data["is_admin"] = True return json_response(response_data) # ───────────────────────────────────────────────────────────────────────────── # URL Shortener Views # ───────────────────────────────────────────────────────────────────────────── class ShortURLCreateView(MethodView): """Create short URLs.""" def post(self) -> Response: """Create a new short URL.""" # Parse URL from request body target_url: str | None = None if request.is_json: data = request.get_json(silent=True) if data and isinstance(data.get("url"), str): target_url = data["url"].strip() else: raw = request.get_data(as_text=True).strip() if raw: target_url = raw if not target_url: return error_response("No URL provided", 400) # Validate URL if err := validate_target_url(target_url): return err # Auth and rate limiting trusted_client = get_client_id() owner = get_client_fingerprint() client_ip = get_client_ip() # Rate limiting (trusted certs exempt) if not trusted_client: allowed, remaining, limit, reset_timestamp = check_rate_limit( client_ip, authenticated=False ) if not allowed: record_rate_limit("blocked") retry_after = max(1, reset_timestamp - int(time.time())) response = error_response("Rate limit exceeded", 429, retry_after=retry_after) response.headers["Retry-After"] = str(retry_after) add_rate_limit_headers(response, 0, limit, reset_timestamp) return response else: remaining, limit, reset_timestamp = -1, -1, 0 # Proof-of-work (trusted certs exempt) difficulty = current_app.config["POW_DIFFICULTY"] if difficulty > 0 and not trusted_client: token = request.headers.get("X-PoW-Token", "") solution = request.headers.get("X-PoW-Solution", "") if not token or not solution: return error_response( "Proof-of-work required", 400, hint="GET /challenge for a new challenge" ) valid, err_msg = verify_pow(token, solution) if not valid: record_pow("failure") return error_response(f"Proof-of-work failed: {err_msg}", 400) record_pow("success") # Dedup check (same URL within window) url_hash = hashlib.sha256(target_url.encode("utf-8")).hexdigest() is_allowed, dedup_count = check_content_hash(url_hash) if not is_allowed: record_dedup("blocked") window = current_app.config["CONTENT_DEDUP_WINDOW"] return error_response( "Duplicate URL rate limit exceeded", 429, count=dedup_count, window_seconds=window, ) record_dedup("allowed") # Parse optional expiry expires_at = None expiry_header = request.headers.get("X-Expiry", "").strip() if expiry_header: try: expiry_seconds = int(expiry_header) if expiry_seconds > 0: max_expiry = current_app.config.get("MAX_EXPIRY_SECONDS", 0) if max_expiry > 0: expiry_seconds = min(expiry_seconds, max_expiry) expires_at = int(time.time()) + expiry_seconds except ValueError: pass # Generate short ID and insert short_id = generate_short_id() now = int(time.time()) db = get_db() db.execute( """INSERT INTO short_urls (id, target_url, url_hash, owner, created_at, last_accessed, expires_at) VALUES (?, ?, ?, ?, ?, ?, ?)""", (short_id, target_url, url_hash, owner, now, now, expires_at), ) db.commit() record_antiflood_request() log_event( AuditEvent.URL_CREATE, AuditOutcome.SUCCESS, client_id=owner, client_ip=client_ip, details={"short_id": short_id, "target": target_url[:128]}, ) record_url_created("authenticated" if owner else "anonymous", "success") response_data: dict[str, Any] = { "id": short_id, "url": short_url_path(short_id), "target_url": target_url, "created_at": now, } if owner: response_data["owner"] = owner if expires_at: response_data["expires_at"] = expires_at response = json_response(response_data, 201) add_rate_limit_headers(response, remaining, limit, reset_timestamp) return response class ShortURLRedirectView(MethodView): """Redirect short URLs to their targets.""" def get(self, short_id: str) -> Response: """302 redirect to target URL.""" if err := validate_short_id(short_id): return err if err := fetch_short_url(short_id, increment_counter=True): return err row = g.short_url log_event( AuditEvent.URL_ACCESS, AuditOutcome.SUCCESS, client_ip=get_client_ip(), details={"short_id": short_id}, ) record_url_accessed("authenticated" if get_client_fingerprint() else "anonymous") response = Response(status=302) response.headers["Location"] = row["target_url"] response.headers["Cache-Control"] = "no-cache" return response def head(self, short_id: str) -> Response: """HEAD redirect to target URL.""" return self.get(short_id) class ShortURLInfoView(MethodView): """Short URL metadata.""" def get(self, short_id: str) -> Response: """Return short URL metadata without incrementing counter.""" if err := validate_short_id(short_id): return err if err := fetch_short_url(short_id, increment_counter=False): return err row = g.short_url data: dict[str, Any] = { "id": row["id"], "target_url": row["target_url"], "created_at": row["created_at"], "last_accessed": row["last_accessed"], "access_count": row["access_count"], "url": short_url_path(short_id), } if row["owner"]: data["owner"] = row["owner"] if row["expires_at"]: data["expires_at"] = row["expires_at"] return json_response(data) class ShortURLDeleteView(MethodView): """Delete short URLs.""" def delete(self, short_id: str) -> Response: """Delete a short URL. Requires ownership or admin.""" if err := validate_short_id(short_id): return err if err := require_auth(): return err db = get_db() row = db.execute("SELECT owner FROM short_urls WHERE id = ?", (short_id,)).fetchone() if row is None: return error_response("Short URL not found", 404) if row["owner"] != g.client_id and not is_admin(): return error_response("Permission denied", 403) db.execute("DELETE FROM short_urls WHERE id = ?", (short_id,)) db.commit() log_event( AuditEvent.URL_DELETE, AuditOutcome.SUCCESS, client_id=g.client_id, client_ip=get_client_ip(), details={"short_id": short_id}, ) record_url_deleted("authenticated", "success") return json_response({"message": "Short URL deleted"}) class ShortURLsListView(MethodView): """List short URLs owned by authenticated user.""" def get(self) -> Response: """List owned short URLs with pagination.""" if err := require_auth(): return err client_id = g.client_id try: limit = min(int(request.args.get("limit", 50)), 200) offset = max(int(request.args.get("offset", 0)), 0) except (ValueError, TypeError): limit, offset = 50, 0 db = get_db() count_row = db.execute( "SELECT COUNT(*) as total FROM short_urls WHERE owner = ?", (client_id,), ).fetchone() total = count_row["total"] if count_row else 0 rows = db.execute( """SELECT id, target_url, created_at, last_accessed, access_count, expires_at FROM short_urls WHERE owner = ? ORDER BY created_at DESC LIMIT ? OFFSET ?""", (client_id, limit, offset), ).fetchall() urls = [] for row in rows: entry: dict[str, Any] = { "id": row["id"], "target_url": row["target_url"], "created_at": row["created_at"], "access_count": row["access_count"], "url": short_url_path(row["id"]), } if row["expires_at"]: entry["expires_at"] = row["expires_at"] urls.append(entry) return json_response( { "urls": urls, "count": len(urls), "total": total, "limit": limit, "offset": offset, } ) # ───────────────────────────────────────────────────────────────────────────── # PKI Views (Certificate Authority) # ───────────────────────────────────────────────────────────────────────────── def require_pki_enabled() -> Response | None: """Check if PKI is enabled. Returns error response or None if enabled.""" if not current_app.config.get("PKI_ENABLED"): return error_response("PKI not enabled", 404) return None class PKIStatusView(MethodView): """PKI status endpoint.""" def get(self) -> Response: """Return PKI status and CA info if available.""" if not current_app.config.get("PKI_ENABLED"): return json_response({"enabled": False}) from app.pki import get_ca_info ca_info = get_ca_info() if ca_info is None: return json_response( { "enabled": True, "ca_exists": False, "hint": "POST /pki/ca to generate CA", } ) return json_response( { "enabled": True, "ca_exists": True, "common_name": ca_info["common_name"], "fingerprint_sha1": ca_info["fingerprint_sha1"], "created_at": ca_info["created_at"], "expires_at": ca_info["expires_at"], "key_algorithm": ca_info["key_algorithm"], } ) class PKICAGenerateView(MethodView): """CA generation endpoint (first-run only).""" def post(self) -> Response: """Generate CA certificate. Only works if no CA exists.""" if err := require_pki_enabled(): return err from app.pki import ( CAExistsError, PKIError, generate_ca, get_ca_info, ) # Check if CA already exists if get_ca_info() is not None: return error_response("CA already exists", 409) # Get CA password from config password = current_app.config.get("PKI_CA_PASSWORD", "") if not password: return error_response( "PKI_CA_PASSWORD not configured", 500, hint="Set FLASKPASTE_PKI_CA_PASSWORD environment variable", ) # Parse request for optional common name common_name = "FlaskPaste CA" if request.is_json: data = request.get_json(silent=True) if data and isinstance(data.get("common_name"), str): common_name = data["common_name"][:64] # Generate CA try: days = current_app.config.get("PKI_CA_DAYS", 3650) owner = get_client_fingerprint() ca_info = generate_ca(common_name, password, days=days, owner=owner) except CAExistsError: return error_response("CA already exists", 409) except PKIError as e: current_app.logger.error("CA generation failed: %s", e) return error_response("CA generation failed", 500) current_app.logger.info( "CA generated: cn=%s fingerprint=%s", common_name, ca_info["fingerprint_sha1"][:12] ) log_event( AuditEvent.CERT_ISSUED, AuditOutcome.SUCCESS, client_id=owner, client_ip=request.remote_addr, details={ "type": "ca", "fingerprint": ca_info["fingerprint_sha1"][:16], "common_name": common_name, "expires_at": ca_info["expires_at"], }, ) return json_response( { "message": "CA generated", "common_name": ca_info["common_name"], "fingerprint_sha1": ca_info["fingerprint_sha1"], "created_at": ca_info["created_at"], "expires_at": ca_info["expires_at"], "download": prefixed_url("/pki/ca.crt"), }, 201, ) class PKICADownloadView(MethodView): """CA certificate download endpoint.""" def get(self) -> Response: """Download CA certificate in PEM format.""" if err := require_pki_enabled(): return err from app.pki import get_ca_info ca_info = get_ca_info() if ca_info is None: return error_response("CA not initialized", 404) response = Response(ca_info["certificate_pem"], mimetype="application/x-pem-file") response.headers["Content-Disposition"] = ( f"attachment; filename={ca_info['common_name'].replace(' ', '_')}.crt" ) return response class PKIIssueView(MethodView): """Certificate issuance endpoint (open registration).""" def post(self) -> Response: """Issue a new client certificate.""" if err := require_pki_enabled(): return err from app.pki import ( CANotFoundError, PKIError, issue_certificate, ) # Parse request common_name = None if request.is_json: data = request.get_json(silent=True) if data and isinstance(data.get("common_name"), str): common_name = data["common_name"][:64] if not common_name: return error_response( "common_name required", 400, hint='POST {"common_name": "your-name"}' ) # Get CA password from config password = current_app.config.get("PKI_CA_PASSWORD", "") if not password: return error_response("PKI not properly configured", 500) # Issue certificate try: days = current_app.config.get("PKI_CERT_DAYS", 365) issued_to = get_client_fingerprint() cert_info = issue_certificate(common_name, password, days=days, issued_to=issued_to) except CANotFoundError: return error_response("CA not initialized", 404) except PKIError as e: current_app.logger.error("Certificate issuance failed: %s", e) return error_response("Certificate issuance failed", 500) current_app.logger.info( "Certificate issued: cn=%s serial=%s fingerprint=%s to=%s", common_name, cert_info["serial"][:8], cert_info["fingerprint_sha1"][:12], issued_to or "anonymous", ) log_event( AuditEvent.CERT_ISSUED, AuditOutcome.SUCCESS, client_id=cert_info["fingerprint_sha1"], client_ip=request.remote_addr, details={ "type": "client", "serial": cert_info["serial"][:16], "common_name": common_name, "issued_by": issued_to, "expires_at": cert_info["expires_at"], }, ) # Return certificate bundle return json_response( { "message": "Certificate issued", "serial": cert_info["serial"], "common_name": cert_info["common_name"], "fingerprint_sha1": cert_info["fingerprint_sha1"], "created_at": cert_info["created_at"], "expires_at": cert_info["expires_at"], "certificate_pem": cert_info["certificate_pem"], "private_key_pem": cert_info["private_key_pem"], "is_admin": cert_info.get("is_admin", False), }, 201, ) class PKICertsView(MethodView): """Certificate listing endpoint.""" def get(self) -> Response: """List issued certificates.""" if err := require_pki_enabled(): return err client_id = get_client_fingerprint() db = get_db() # Users with certificates can see their own certs or certs they issued # Anonymous users (no cert) see nothing if client_id: rows = db.execute( """SELECT serial, common_name, fingerprint_sha1, created_at, expires_at, issued_to, status, revoked_at FROM issued_certificates WHERE issued_to = ? OR fingerprint_sha1 = ? ORDER BY created_at DESC""", (client_id, client_id), ).fetchall() else: # Anonymous: empty list rows = [] certs = [] for row in rows: cert = { "serial": row["serial"], "common_name": row["common_name"], "fingerprint_sha1": row["fingerprint_sha1"], "created_at": row["created_at"], "expires_at": row["expires_at"], "status": row["status"], } if row["issued_to"]: cert["issued_to"] = row["issued_to"] if row["revoked_at"]: cert["revoked_at"] = row["revoked_at"] certs.append(cert) return json_response({"certificates": certs, "count": len(certs)}) class PKIRevokeView(MethodView): """Certificate revocation endpoint.""" def post(self, serial: str) -> Response: """Revoke a certificate by serial number.""" if err := require_pki_enabled(): return err if err := require_auth(): return err from app.pki import CertificateNotFoundError, PKIError, revoke_certificate db = get_db() # Check certificate exists and get ownership info row = db.execute( "SELECT issued_to, fingerprint_sha1, status FROM issued_certificates WHERE serial = ?", (serial,), ).fetchone() if row is None: return error_response("Certificate not found", 404) if row["status"] == "revoked": return error_response("Certificate already revoked", 409) # Check permission: must be issuer or the certificate itself client_id = g.client_id can_revoke = row["issued_to"] == client_id or row["fingerprint_sha1"] == client_id if not can_revoke: return error_response("Permission denied", 403) # Revoke try: revoke_certificate(serial) except CertificateNotFoundError: return error_response("Certificate not found", 404) except PKIError as e: current_app.logger.error("Revocation failed: %s", e) return error_response("Revocation failed", 500) current_app.logger.info("Certificate revoked: serial=%s by=%s", serial[:8], client_id[:12]) log_event( AuditEvent.CERT_REVOKED, AuditOutcome.SUCCESS, client_id=client_id, client_ip=get_client_ip(), details={ "serial": serial[:16], "fingerprint": row["fingerprint_sha1"][:16], }, ) return json_response({"message": "Certificate revoked", "serial": serial}) # ───────────────────────────────────────────────────────────────────────────── # Audit Log View # ───────────────────────────────────────────────────────────────────────────── class AuditLogView(MethodView): """Audit log query endpoint (admin only).""" def get(self) -> Response: """Query audit log with filters. Query parameters: - event_type: Filter by event type - client_id: Filter by client fingerprint - paste_id: Filter by paste ID - outcome: Filter by outcome (success, failure, blocked) - since: Filter by timestamp >= since - until: Filter by timestamp <= until - limit: Maximum results (default 100, max 500) - offset: Pagination offset """ if err := require_auth(): return err if not is_admin(): return error_response("Admin access required", 403) from app.audit import query_audit_log # Parse query parameters event_type = request.args.get("event_type", "").strip() or None client_id = request.args.get("client_id", "").strip() or None paste_id = request.args.get("paste_id", "").strip() or None outcome = request.args.get("outcome", "").strip() or None try: since = int(request.args.get("since", 0)) or None except (ValueError, TypeError): since = None try: until = int(request.args.get("until", 0)) or None except (ValueError, TypeError): until = None try: limit = min(int(request.args.get("limit", 100)), 500) except (ValueError, TypeError): limit = 100 try: offset = max(int(request.args.get("offset", 0)), 0) except (ValueError, TypeError): offset = 0 entries, total = query_audit_log( event_type=event_type, client_id=client_id, paste_id=paste_id, outcome=outcome, since=since, until=until, limit=limit, offset=offset, ) return json_response( { "entries": entries, "count": len(entries), "total": total, "limit": limit, "offset": offset, } ) # ───────────────────────────────────────────────────────────────────────────── # Route Registration # ───────────────────────────────────────────────────────────────────────────── # Index and paste creation bp.add_url_rule("/", view_func=IndexView.as_view("index")) # Utility endpoints bp.add_url_rule("/health", view_func=HealthView.as_view("health")) bp.add_url_rule("/challenge", view_func=ChallengeView.as_view("challenge")) bp.add_url_rule("/client", view_func=ClientView.as_view("client")) # Registration endpoints (public certificate issuance with PoW) bp.add_url_rule( "/register/challenge", view_func=RegisterChallengeView.as_view("register_challenge") ) bp.add_url_rule("/register", view_func=RegisterView.as_view("register")) # Paste operations bp.add_url_rule("/pastes", view_func=PastesListView.as_view("pastes_list")) bp.add_url_rule("/", view_func=PasteView.as_view("paste"), methods=["GET", "HEAD", "PUT"]) bp.add_url_rule( "//raw", view_func=PasteRawView.as_view("paste_raw"), methods=["GET", "HEAD"] ) bp.add_url_rule( "/", view_func=PasteDeleteView.as_view("paste_delete"), methods=["DELETE"] ) # URL shortener endpoints bp.add_url_rule("/s", view_func=ShortURLCreateView.as_view("short_url_create"), methods=["POST"]) bp.add_url_rule("/s", view_func=ShortURLsListView.as_view("short_urls_list"), methods=["GET"]) bp.add_url_rule( "/s/", view_func=ShortURLRedirectView.as_view("short_url_redirect"), methods=["GET", "HEAD"], ) bp.add_url_rule("/s//info", view_func=ShortURLInfoView.as_view("short_url_info")) bp.add_url_rule( "/s/", view_func=ShortURLDeleteView.as_view("short_url_delete"), methods=["DELETE"], ) # PKI endpoints bp.add_url_rule("/pki", view_func=PKIStatusView.as_view("pki_status")) bp.add_url_rule("/pki/ca", view_func=PKICAGenerateView.as_view("pki_ca_generate")) bp.add_url_rule("/pki/ca.crt", view_func=PKICADownloadView.as_view("pki_ca_download")) bp.add_url_rule("/pki/issue", view_func=PKIIssueView.as_view("pki_issue")) bp.add_url_rule("/pki/certs", view_func=PKICertsView.as_view("pki_certs")) bp.add_url_rule( "/pki/revoke/", view_func=PKIRevokeView.as_view("pki_revoke"), methods=["POST"] ) # Audit log endpoint (admin only) bp.add_url_rule("/audit", view_func=AuditLogView.as_view("audit_log"))