"""API route handlers using modern Flask patterns."""

from __future__ import annotations

import hashlib
import hmac
import json
import math
import re
import secrets
import threading
import time
from collections import defaultdict
from typing import TYPE_CHECKING, Any

from flask import Response, current_app, g, request
from flask.views import MethodView

from app.api import bp
from app.config import VERSION
from app.database import check_content_hash, get_db, hash_password, verify_password

if TYPE_CHECKING:
    from sqlite3 import Row

# Compiled patterns for validation
PASTE_ID_PATTERN = re.compile(r"^[a-f0-9]+$")
CLIENT_ID_PATTERN = re.compile(r"^[a-f0-9]{40}$")
MIME_PATTERN = re.compile(r"^[a-z0-9][a-z0-9!#$&\-^_.+]*/[a-z0-9][a-z0-9!#$&\-^_.+]*$")

# Magic bytes for binary format detection
MAGIC_SIGNATURES: dict[bytes, str] = {
    b"\x89PNG\r\n\x1a\n": "image/png",
    b"\xff\xd8\xff": "image/jpeg",
    b"GIF87a": "image/gif",
    b"GIF89a": "image/gif",
    b"RIFF": "image/webp",
    b"PK\x03\x04": "application/zip",
    b"%PDF": "application/pdf",
    b"\x1f\x8b": "application/gzip",
}

# Generic MIME types to override with detection
GENERIC_MIME_TYPES = frozenset(
    {
        "application/octet-stream",
        "application/x-www-form-urlencoded",
        "text/plain",
    }
)

# Runtime PoW secret cache
_pow_secret_cache: bytes | None = None

# ─────────────────────────────────────────────────────────────────────────────
# Anti-flood: dynamic PoW difficulty adjustment
# ─────────────────────────────────────────────────────────────────────────────

_antiflood_lock = threading.Lock()
_antiflood_requests: list[float] = []  # Global request timestamps
_antiflood_difficulty: int = 0  # Current difficulty boost (added to base)
_antiflood_last_increase: float = 0  # Last time difficulty was increased


def get_dynamic_difficulty() -> int:
    """Get current PoW difficulty including anti-flood adjustment.

    Returns:
        The base ``POW_DIFFICULTY`` when PoW or anti-flood is disabled.
        Otherwise the base plus the current boost, capped at ``ANTIFLOOD_MAX``
        but never lower than the base.
    """
    base = current_app.config["POW_DIFFICULTY"]
    if base == 0 or not current_app.config.get("ANTIFLOOD_ENABLED", True):
        return base
    with _antiflood_lock:
        capped = min(base + _antiflood_difficulty, current_app.config["ANTIFLOOD_MAX"])
    # A cap configured below the base must not lower the difficulty: verify_pow
    # rejects tokens whose difficulty is below the base, so issuing such a
    # challenge would make every submission fail.
    return max(base, capped)


def record_antiflood_request() -> None:
    """Record a request for anti-flood tracking and adjust difficulty.

    Does nothing when anti-flood is disabled or the base PoW difficulty is 0.
    Otherwise appends the current time to the global sliding window, raises the
    boost by ``ANTIFLOOD_STEP`` while the window holds more than
    ``ANTIFLOOD_THRESHOLD`` requests, and lowers it by one step once
    ``ANTIFLOOD_DECAY`` seconds have passed since the last adjustment.
    """
    global _antiflood_difficulty, _antiflood_last_increase

    if not current_app.config.get("ANTIFLOOD_ENABLED", True):
        return
    if current_app.config["POW_DIFFICULTY"] == 0:
        return

    now = time.time()
    window = current_app.config["ANTIFLOOD_WINDOW"]
    threshold = current_app.config["ANTIFLOOD_THRESHOLD"]
    step = current_app.config["ANTIFLOOD_STEP"]
    max_diff = current_app.config["ANTIFLOOD_MAX"]
    decay = current_app.config["ANTIFLOOD_DECAY"]
    base = current_app.config["POW_DIFFICULTY"]

    with _antiflood_lock:
        # Clean old requests
        cutoff = now - window
        _antiflood_requests[:] = [t for t in _antiflood_requests if t > cutoff]

        # Record this request
        _antiflood_requests.append(now)
        count = len(_antiflood_requests)

        # Check if we should increase difficulty
        if count > threshold:
            # Increase difficulty if not already at max
            if base + _antiflood_difficulty < max_diff:
                # Clamp so the boost never exceeds the cap; otherwise decay
                # would start from a value above what is ever served.
                _antiflood_difficulty = min(_antiflood_difficulty + step, max_diff - base)
                _antiflood_last_increase = now
        elif _antiflood_difficulty > 0 and (now - _antiflood_last_increase) > decay:
            # Decay difficulty if abuse has stopped
            _antiflood_difficulty = max(0, _antiflood_difficulty - step)
            _antiflood_last_increase = now  # Reset timer


def reset_antiflood() -> None:
    """Reset anti-flood state (for testing)."""
    global _antiflood_difficulty, _antiflood_last_increase
    with _antiflood_lock:
        _antiflood_requests.clear()
        _antiflood_difficulty = 0
        _antiflood_last_increase = 0


# ─────────────────────────────────────────────────────────────────────────────
# Rate Limiting (in-memory sliding window)
# ─────────────────────────────────────────────────────────────────────────────

_rate_limit_lock = threading.Lock()
_rate_limit_requests: dict[str, list[float]] = defaultdict(list)


def get_client_ip() -> str:
    """Get client IP address, respecting X-Forwarded-For from trusted proxy.

    Returns:
        The first non-empty entry of ``X-Forwarded-For`` when the request
        passes ``is_trusted_proxy()``, else ``request.remote_addr``, else the
        literal ``"unknown"``.
    """
    if is_trusted_proxy():
        forwarded = request.headers.get("X-Forwarded-For", "")
        # Take the first (client) IP from the chain. A blank first element
        # (e.g. ", 1.2.3.4") must not become the rate-limit key, so fall
        # through to remote_addr in that case.
        first_hop = forwarded.split(",")[0].strip()
        if first_hop:
            return first_hop
    return request.remote_addr or "unknown"


def check_rate_limit(client_ip: str, authenticated: bool = False) -> tuple[bool, int, int]:
    """Check if request is within rate limit.

    An allowed request is recorded in the sliding window as a side effect.

    Args:
        client_ip: Client IP address
        authenticated: Whether client is authenticated (higher limits)

    Returns:
        Tuple of (allowed, remaining, reset_seconds). When rate limiting is
        disabled the tuple is ``(True, -1, 0)``.
    """
    if not current_app.config.get("RATE_LIMIT_ENABLED", True):
        return True, -1, 0

    window = current_app.config["RATE_LIMIT_WINDOW"]
    max_requests = current_app.config["RATE_LIMIT_MAX"]
    if authenticated:
        max_requests *= current_app.config.get("RATE_LIMIT_AUTH_MULTIPLIER", 5)

    now = time.time()
    cutoff = now - window

    with _rate_limit_lock:
        # Clean old requests and get current list
        requests = _rate_limit_requests[client_ip]
        requests[:] = [t for t in requests if t > cutoff]
        current_count = len(requests)

        if current_count >= max_requests:
            # Calculate reset time (when oldest request expires)
            reset_at = int(requests[0] + window - now) + 1 if requests else window
            return False, 0, reset_at

        # Record this request
        requests.append(now)
        remaining = max_requests - len(requests)
        return True, remaining, window


def cleanup_rate_limits(window: int | None = None) -> int:
    """Remove expired rate limit entries. Returns count of cleaned entries.

    Args:
        window: Rate limit window in seconds. If None, uses app config.
    """
    # This should be called periodically (e.g., via cleanup task)
    if window is None:
        window = current_app.config.get("RATE_LIMIT_WINDOW", 60)

    cutoff = time.time() - window
    cleaned = 0

    with _rate_limit_lock:
        to_remove = []
        for ip, requests in _rate_limit_requests.items():
            requests[:] = [t for t in requests if t > cutoff]
            if not requests:
                to_remove.append(ip)
        # Delete after the scan: a dict must not change size while iterated.
        for ip in to_remove:
            del _rate_limit_requests[ip]
            cleaned += 1

    return cleaned


def reset_rate_limits() -> None:
    """Clear all rate limit state. For testing only."""
    with _rate_limit_lock:
        _rate_limit_requests.clear()


# ─────────────────────────────────────────────────────────────────────────────
# Response Helpers
# ─────────────────────────────────────────────────────────────────────────────


def json_response(data: dict[str, Any], status: int = 200) -> Response:
    """Create JSON response with proper encoding."""
    return Response(
        json.dumps(data, ensure_ascii=False),
        status=status,
        mimetype="application/json",
    )


def error_response(message: str, status: int, **extra: Any) -> Response:
    """Create standardized error response.

    The JSON body is ``{"error": message}`` merged with any ``extra`` fields.
    """
    data = {"error": message, **extra}
    return json_response(data, status)


# ─────────────────────────────────────────────────────────────────────────────
# URL Helpers
# ─────────────────────────────────────────────────────────────────────────────


def url_prefix() -> str:
    """Get configured URL prefix for reverse proxy deployments."""
    return current_app.config.get("URL_PREFIX", "")


def prefixed_url(path: str) -> str:
    """Generate URL with configured prefix."""
    return f"{url_prefix()}{path}"


def base_url() -> str:
    """Detect full base URL from request headers.

    Scheme comes from ``X-Forwarded-Proto``, then ``X-Scheme``, then the
    request itself; host from ``X-Forwarded-Host``, then ``Host``, then the
    request itself. These headers are read without a trusted-proxy check, so
    the result is client-influenced.
    """
    scheme = (
        request.headers.get("X-Forwarded-Proto")
        or request.headers.get("X-Scheme")
        or request.scheme
    )
    host = request.headers.get("X-Forwarded-Host") or request.headers.get("Host") or request.host
    return f"{scheme}://{host}{url_prefix()}"


# ─────────────────────────────────────────────────────────────────────────────
# Validation Helpers
# (used within views)
# ─────────────────────────────────────────────────────────────────────────────


def validate_paste_id(paste_id: str) -> Response | None:
    """Validate paste ID format. Returns error response or None if valid.

    The ID must be exactly ``PASTE_ID_LENGTH`` characters of lowercase hex.
    """
    expected_length = current_app.config["PASTE_ID_LENGTH"]
    # fullmatch, not match: "$" in the pattern would accept a trailing newline.
    if len(paste_id) != expected_length or not PASTE_ID_PATTERN.fullmatch(paste_id):
        return error_response("Invalid paste ID", 400)
    return None


def fetch_paste(paste_id: str, check_password: bool = True) -> Response | None:
    """Fetch paste and store in g.paste. Returns error response or None if OK.

    Side effects: updates the paste's ``last_accessed`` timestamp, and on
    success stores the row in ``g.paste`` and the connection in ``g.db``.
    The transaction is committed here only on the error paths; on success the
    caller is expected to commit via ``g.db``.

    Args:
        paste_id: ID of the paste to load.
        check_password: When true, a paste with a password hash requires a
            matching ``X-Paste-Password`` request header.
    """
    db = get_db()
    now = int(time.time())

    # Update access time
    db.execute("UPDATE pastes SET last_accessed = ? WHERE id = ?", (now, paste_id))

    row = db.execute(
        """SELECT id, content, mime_type, owner, created_at,
                  length(content) as size, burn_after_read, expires_at, password_hash
           FROM pastes WHERE id = ?""",
        (paste_id,),
    ).fetchone()

    if row is None:
        db.commit()
        return error_response("Paste not found", 404)

    # Password verification
    if check_password and row["password_hash"]:
        provided = request.headers.get("X-Paste-Password", "")
        if not provided:
            db.commit()
            return error_response("Password required", 401, password_protected=True)
        if not verify_password(provided, row["password_hash"]):
            db.commit()
            return error_response("Invalid password", 403)

    g.paste = row
    g.db = db
    return None


def require_auth() -> Response | None:
    """Check authentication for ownership operations.

    Uses get_client_fingerprint() to allow both trusted and untrusted
    certificate holders to manage their own pastes.

    Returns error response or None if authenticated. On success the
    fingerprint is stored in ``g.client_id``.
    """
    client_id = get_client_fingerprint()
    if not client_id:
        return error_response("Authentication required", 401)
    g.client_id = client_id
    return None


# ─────────────────────────────────────────────────────────────────────────────
# Authentication & Security
# ─────────────────────────────────────────────────────────────────────────────


def is_trusted_proxy() -> bool:
    """Verify request comes from trusted reverse proxy via shared secret.

    Returns True when no ``TRUSTED_PROXY_SECRET`` is configured (every request
    is then treated as trusted); otherwise compares the ``X-Proxy-Secret``
    header against the configured secret in constant time.
    """
    expected = current_app.config.get("TRUSTED_PROXY_SECRET", "")
    if not expected:
        return True
    provided = request.headers.get("X-Proxy-Secret", "")
    # Compare as bytes: hmac.compare_digest raises TypeError for str arguments
    # containing non-ASCII characters, which would let a client trigger a 500
    # with a crafted header.
    return hmac.compare_digest(expected.encode("utf-8"), provided.encode("utf-8"))


def get_client_fingerprint() -> str | None:
    """Extract client certificate fingerprint for identity/ownership.

    Returns fingerprint regardless of trust status. Used for:
    - Paste ownership tracking
    - Delete/update/list operations (user manages their own pastes)

    Returns None if no valid fingerprint provided or proxy not trusted.
    """
    if not is_trusted_proxy():
        return None

    sha1 = request.headers.get("X-SSL-Client-SHA1", "").strip().lower()
    if sha1 and CLIENT_ID_PATTERN.match(sha1):
        return sha1
    return None


def get_client_id() -> str | None:
    """Get trusted client certificate fingerprint for elevated privileges.

    Returns fingerprint only if certificate is valid and not revoked. Used for:
    - Rate limit benefits (higher limits for trusted users)
    - Size limit benefits (larger pastes for trusted users)

    Untrusted certificates return None here but still work via
    get_client_fingerprint() for ownership operations.
    """
    fingerprint = get_client_fingerprint()
    if not fingerprint:
        return None

    # Check if PKI is enabled and certificate is revoked
    if current_app.config.get("PKI_ENABLED"):
        from app.pki import is_certificate_valid

        if not is_certificate_valid(fingerprint):
            current_app.logger.warning(
                "Elevated auth rejected (revoked/expired): %s", fingerprint[:12] + "..."
            )
            return None

    return fingerprint


def is_admin() -> bool:
    """Check if current authenticated user is an admin.

    Returns True only if:
    - User has a valid client certificate
    - Certificate is marked as admin in the PKI database
    """
    client_id = get_client_id()
    if not client_id:
        return False

    if not current_app.config.get("PKI_ENABLED"):
        return False

    from app.pki import is_admin_certificate

    return is_admin_certificate(client_id)


# ─────────────────────────────────────────────────────────────────────────────
# Proof-of-Work
# ─────────────────────────────────────────────────────────────────────────────


def get_pow_secret() -> bytes:
    """Get or generate PoW signing secret.

    A configured ``POW_SECRET`` takes precedence. Otherwise a random 32-byte
    secret is generated once and cached in this module for later calls.
    """
    global _pow_secret_cache

    configured = current_app.config.get("POW_SECRET", "")
    if configured:
        return configured.encode()

    if _pow_secret_cache is None:
        _pow_secret_cache = secrets.token_bytes(32)
    return _pow_secret_cache


def generate_challenge(difficulty_override: int | None = None) -> dict[str, Any]:
    """Generate new PoW challenge with signed token.

    Uses dynamic difficulty which may be elevated during high load,
    unless difficulty_override is specified.

    Args:
        difficulty_override: Optional fixed difficulty (for registration)

    Returns:
        Dict with ``nonce``, ``difficulty``, ``expires`` and ``token``, where
        the token is ``nonce:expires:difficulty:signature``.
    """
    if difficulty_override is not None:
        difficulty = difficulty_override
    else:
        difficulty = get_dynamic_difficulty()

    ttl = current_app.config["POW_CHALLENGE_TTL"]
    expires = int(time.time()) + ttl
    nonce = secrets.token_hex(16)

    msg = f"{nonce}:{expires}:{difficulty}".encode()
    sig = hmac.new(get_pow_secret(), msg, hashlib.sha256).hexdigest()

    return {
        "nonce": nonce,
        "difficulty": difficulty,
        "expires": expires,
        "token": f"{nonce}:{expires}:{difficulty}:{sig}",
    }


def verify_pow(token: str, solution: str, min_difficulty: int | None = None) -> tuple[bool, str]:
    """Verify proof-of-work solution. Returns (valid, error_message).

    Accepts tokens with difficulty >= min_difficulty. The solution must meet
    the token's embedded difficulty (which may be elevated due to anti-flood).

    Args:
        token: PoW challenge token
        solution: Nonce solution
        min_difficulty: Minimum required difficulty (defaults to POW_DIFFICULTY)
    """
    base_difficulty = current_app.config["POW_DIFFICULTY"]
    if base_difficulty == 0 and min_difficulty is None:
        return True, ""

    required_difficulty = min_difficulty if min_difficulty is not None else base_difficulty
    if required_difficulty == 0:
        return True, ""

    # Parse token
    try:
        parts = token.split(":")
        if len(parts) != 4:
            return False, "Invalid challenge format"
        nonce, expires_str, diff_str, sig = parts
        expires = int(expires_str)
        token_diff = int(diff_str)
    except (ValueError, TypeError):
        return False, "Invalid challenge format"

    # Verify signature
    msg = f"{nonce}:{expires}:{token_diff}".encode()
    expected_sig = hmac.new(get_pow_secret(), msg, hashlib.sha256).hexdigest()
    # hmac.compare_digest raises TypeError on non-ASCII str input. A genuine
    # signature is always hex, so a non-ASCII one is rejected up front.
    if not sig.isascii() or not hmac.compare_digest(sig, expected_sig):
        return False, "Invalid challenge signature"

    # Check expiry
    if int(time.time()) > expires:
        return False, "Challenge expired"

    # Token difficulty must be at least the required difficulty
    if token_diff < required_difficulty:
        return False, f"Difficulty too low: {token_diff} < {required_difficulty}"

    # Verify solution
    try:
        solution_int = int(solution)
        if solution_int < 0:
            return False, "Invalid solution"
    except (ValueError, TypeError):
        return False, "Invalid solution"

    # Check hash meets the token's difficulty (not current dynamic difficulty)
    work = f"{nonce}:{solution}".encode()
    hash_bytes = hashlib.sha256(work).digest()

    # Count leading zero bits of the digest.
    zero_bits = 0
    for byte in hash_bytes:
        if byte == 0:
            zero_bits += 8
        else:
            zero_bits += 8 - byte.bit_length()
            break

    if zero_bits < token_diff:
        return False, f"Insufficient work: {zero_bits} < {token_diff} bits"

    return True, ""


# ─────────────────────────────────────────────────────────────────────────────
# Content Processing
# ─────────────────────────────────────────────────────────────────────────────


def calculate_entropy(data: bytes) -> float:
    """Calculate Shannon entropy in bits per byte (0-8 range)."""
    if not data:
        return 0.0

    freq = [0] * 256
    for byte in data:
        freq[byte] += 1

    length = len(data)
    entropy = 0.0
    for count in freq:
        if count > 0:
            p = count / length
            entropy -= p * math.log2(p)

    return entropy


def _match_magic(content: bytes) -> str | None:
    """Return the MIME type whose magic-byte signature prefixes content, if any."""
    for magic, mime in MAGIC_SIGNATURES.items():
        if not content.startswith(magic):
            continue
        # RIFF is a generic container: only call it WebP when the form-type
        # field at bytes 8-12 is present and says so. Content shorter than
        # 12 bytes cannot carry that field.
        if magic == b"RIFF" and content[8:12] != b"WEBP":
            continue
        return mime
    return None


def detect_mime_type(content: bytes, content_type: str | None = None) -> str:
    """Detect MIME type using magic bytes, headers, or content analysis.

    Priority: magic-byte signature, then a specific (non-generic, well-formed)
    ``content_type``, then ``text/plain`` if the content decodes as UTF-8,
    otherwise ``application/octet-stream``.
    """
    # Magic byte detection (highest priority)
    detected = _match_magic(content)
    if detected is not None:
        return detected

    # Explicit Content-Type (if specific)
    if content_type:
        mime = content_type.split(";")[0].strip().lower()
        if mime not in GENERIC_MIME_TYPES and MIME_PATTERN.match(mime):
            return mime

    # UTF-8 text detection
    try:
        content.decode("utf-8")
        return "text/plain"
    except UnicodeDecodeError:
        return "application/octet-stream"


def is_recognizable_format(content: bytes) -> tuple[bool, str | None]:
    """Check if content is a recognizable (likely unencrypted) format.

    Returns (is_recognizable, detected_format).
    Used to enforce encryption by rejecting known formats.
    """
    # Check magic bytes
    detected = _match_magic(content)
    if detected is not None:
        return True, detected

    # Check if valid UTF-8 text (plaintext)
    try:
        content.decode("utf-8")
        return True, "text/plain"
    except UnicodeDecodeError:
        pass

    return False, None


def generate_paste_id(content: bytes) -> str:
    """Generate unique paste ID from content hash and timestamp.

    The ID is the first ``PASTE_ID_LENGTH`` hex characters of the SHA-256 of
    the content concatenated with the current nanosecond timestamp.
    """
    data = content + str(time.time_ns()).encode()
    length = current_app.config["PASTE_ID_LENGTH"]
    return hashlib.sha256(data).hexdigest()[:length]


# ─────────────────────────────────────────────────────────────────────────────
# Class-Based Views
# ─────────────────────────────────────────────────────────────────────────────


class IndexView(MethodView):
    """Handle API info and paste creation."""

    def get(self) -> Response:
        """Return API information and usage examples."""
        prefix = url_prefix() or "/"
        url = base_url()
        difficulty = get_dynamic_difficulty()
        pki_enabled = current_app.config.get("PKI_ENABLED", False)

        # Build endpoints dict. Per-paste routes carry an "<id>" placeholder so
        # their keys do not collide with the root routes ("GET /" would
        # otherwise appear twice and silently drop "API information").
        endpoints: dict[str, str] = {
            f"GET {prefixed_url('/')}": "API information",
            f"GET {prefixed_url('/health')}": "Health check",
            f"GET {prefixed_url('/client')}": "Download CLI client (fpaste)",
            f"GET {prefixed_url('/challenge')}": "Get proof-of-work challenge",
            f"POST {prefixed_url('/')}": "Create paste (PoW required)",
            f"GET {prefixed_url('/pastes')}": "List your pastes (cert required)",
            f"GET {prefixed_url('/<id>')}": "Get paste metadata",
            f"GET {prefixed_url('/<id>/raw')}": "Get raw paste content",
            f"PUT {prefixed_url('/<id>')}": "Update paste (owner only)",
            f"DELETE {prefixed_url('/<id>')}": "Delete paste (owner only)",
            f"GET {prefixed_url('/register/challenge')}": "Get registration challenge",
            f"POST {prefixed_url('/register')}": "Register for client certificate",
        }

        if pki_enabled:
            endpoints.update(
                {
                    f"GET {prefixed_url('/pki')}": "PKI status",
                    f"GET {prefixed_url('/pki/ca.crt')}": "Download CA certificate",
                    f"POST {prefixed_url('/pki/issue')}": "Issue client certificate",
                    f"GET {prefixed_url('/pki/certs')}": "List certificates",
                    f"POST {prefixed_url('/pki/revoke/')}": "Revoke certificate",
                }
            )

        # Build response
        response_data: dict[str, Any] = {
            "name": "FlaskPaste",
            "version": VERSION,
            "prefix": prefix,
            "endpoints": endpoints,
            "authentication": {
                "anonymous": "Create pastes only (strict limits)",
                "client_cert": "Create + manage own pastes (strict limits)",
                "trusted_cert": "All operations (relaxed limits)",
            },
            "limits": {
                "anonymous": {
                    "max_size": current_app.config["MAX_PASTE_SIZE_ANON"],
                    "rate": f"{current_app.config['RATE_LIMIT_MAX']}/min",
                },
                "trusted": {
                    "max_size": current_app.config["MAX_PASTE_SIZE_AUTH"],
                    "rate": f"{current_app.config['RATE_LIMIT_MAX'] * current_app.config.get('RATE_LIMIT_AUTH_MULTIPLIER', 5)}/min",  # noqa: E501
                },
            },
            "pow": {
                "enabled": difficulty > 0,
                "difficulty": difficulty,
                "hint": "GET /challenge, solve, submit with X-PoW-Token + X-PoW-Solution",
            },
            "cli": {
                "install": f"curl -o fpaste {url}/client && chmod +x fpaste",
                "usage": "fpaste file.txt  # encrypts by default",
            },
            "usage": {
                "create": f"curl -X POST --data-binary @file.txt {url}/ (with PoW headers)",
                "get": f"curl {url}/<id>/raw",
                "delete": f"curl -X DELETE {url}/<id> (with X-SSL-Client-SHA1)",
            },
        }

        return json_response(response_data)

    def post(self) -> Response:
        """Create a new paste.

        Accepts either a JSON object with a string ``content`` field or a raw
        request body. Checks, in order: rate limit, proof-of-work, size limits,
        entropy, recognizable-format rejection and content deduplication; then
        stores the paste and returns its metadata with status 201.
        """
        # Parse content
        content: bytes | None = None
        mime_type: str | None = None

        if request.is_json:
            data = request.get_json(silent=True)
            # get_json may return a list/str/number; only objects carry "content".
            if isinstance(data, dict) and isinstance(data.get("content"), str):
                try:
                    content = data["content"].encode("utf-8")
                except UnicodeEncodeError:
                    # JSON escapes can produce lone surrogates that are not
                    # encodable as UTF-8.
                    return error_response("Invalid content encoding", 400)
                mime_type = "text/plain"
        else:
            content = request.get_data(as_text=False)
            if content:
                mime_type = detect_mime_type(content, request.content_type)

        if not content:
            return error_response("No content provided", 400)

        # Separate trusted (for limits) from fingerprint (for ownership)
        trusted_client = get_client_id()  # Only trusted certs get elevated limits
        owner = get_client_fingerprint()  # Any cert can own pastes

        # Rate limiting (check before expensive operations)
        client_ip = get_client_ip()
        allowed, _remaining, reset_seconds = check_rate_limit(
            client_ip, authenticated=bool(trusted_client)
        )
        if not allowed:
            current_app.logger.warning(
                "Rate limit exceeded: ip=%s trusted=%s", client_ip, bool(trusted_client)
            )
            response = error_response(
                "Rate limit exceeded",
                429,
                retry_after=reset_seconds,
            )
            response.headers["Retry-After"] = str(reset_seconds)
            response.headers["X-RateLimit-Remaining"] = "0"
            response.headers["X-RateLimit-Reset"] = str(reset_seconds)
            return response

        # Proof-of-work verification
        difficulty = current_app.config["POW_DIFFICULTY"]
        if difficulty > 0:
            token = request.headers.get("X-PoW-Token", "")
            solution = request.headers.get("X-PoW-Solution", "")

            if not token or not solution:
                return error_response(
                    "Proof-of-work required", 400, hint="GET /challenge for a new challenge"
                )

            valid, err = verify_pow(token, solution)
            if not valid:
                current_app.logger.warning(
                    "PoW verification failed: %s from=%s", err, request.remote_addr
                )
                return error_response(f"Proof-of-work failed: {err}", 400)

        # Size limits (only trusted clients get elevated limits)
        content_size = len(content)
        max_size = (
            current_app.config["MAX_PASTE_SIZE_AUTH"]
            if trusted_client
            else current_app.config["MAX_PASTE_SIZE_ANON"]
        )

        if content_size > max_size:
            return error_response(
                "Paste too large",
                413,
                size=content_size,
                max_size=max_size,
                trusted=trusted_client is not None,
            )

        # Minimum size check (enforces encryption overhead)
        min_size = current_app.config.get("MIN_PASTE_SIZE", 0)
        if min_size > 0 and content_size < min_size:
            return error_response(
                "Paste too small",
                400,
                size=content_size,
                min_size=min_size,
                hint="Encrypt content before uploading (fpaste encrypts by default)",
            )

        # Entropy check
        min_entropy = current_app.config.get("MIN_ENTROPY", 0)
        min_entropy_size = current_app.config.get("MIN_ENTROPY_SIZE", 256)
        if min_entropy > 0 and content_size >= min_entropy_size:
            entropy = calculate_entropy(content)
            if entropy < min_entropy:
                current_app.logger.warning(
                    "Low entropy rejected: %.2f < %.2f from=%s",
                    entropy,
                    min_entropy,
                    request.remote_addr,
                )
                return error_response(
                    "Content entropy too low",
                    400,
                    entropy=round(entropy, 2),
                    min_entropy=min_entropy,
                    hint="Encrypt content before uploading (fpaste encrypts by default)",
                )

        # Binary content requirement (reject recognizable formats)
        if current_app.config.get("REQUIRE_BINARY", False):
            is_recognized, detected_format = is_recognizable_format(content)
            if is_recognized:
                current_app.logger.warning(
                    "Recognizable format rejected: %s from=%s",
                    detected_format,
                    request.remote_addr,
                )
                return error_response(
                    "Recognizable format not allowed",
                    400,
                    detected=detected_format,
                    hint="Encrypt content before uploading (fpaste encrypts by default)",
                )

        # Deduplication check
        content_hash = hashlib.sha256(content).hexdigest()
        is_allowed, dedup_count = check_content_hash(content_hash)

        if not is_allowed:
            window = current_app.config["CONTENT_DEDUP_WINDOW"]
            current_app.logger.warning(
                "Dedup threshold exceeded: hash=%s count=%d from=%s",
                content_hash[:16],
                dedup_count,
                request.remote_addr,
            )
            return error_response(
                "Duplicate content rate limit exceeded",
                429,
                count=dedup_count,
                window_seconds=window,
            )

        # Parse optional headers
        burn_header = request.headers.get("X-Burn-After-Read", "").strip().lower()
        burn_after_read = burn_header in ("true", "1", "yes")

        # Determine default expiry based on authentication level
        # Anonymous < Untrusted cert < Trusted cert (registered in PKI)
        if owner is None:
            # Anonymous user
            default_expiry = current_app.config.get("EXPIRY_ANON", 86400)
        elif trusted_client:
            # Trusted certificate (registered in PKI)
            from app.pki import is_trusted_certificate

            if is_trusted_certificate(owner):
                default_expiry = current_app.config.get("EXPIRY_TRUSTED", 2592000)
            else:
                default_expiry = current_app.config.get("EXPIRY_UNTRUSTED", 604800)
        else:
            # Has cert but not trusted
            default_expiry = current_app.config.get("EXPIRY_UNTRUSTED", 604800)

        expires_at = None
        expiry_header = request.headers.get("X-Expiry", "").strip()
        if expiry_header:
            try:
                expiry_seconds = int(expiry_header)
            except ValueError:
                # A malformed X-Expiry is ignored; the default expiry applies.
                expiry_seconds = 0
            if expiry_seconds > 0:
                max_expiry = current_app.config.get("MAX_EXPIRY_SECONDS", 0)
                if max_expiry > 0:
                    expiry_seconds = min(expiry_seconds, max_expiry)
                expires_at = int(time.time()) + expiry_seconds

        # Apply default expiry if none specified (0 = no expiry for trusted)
        if expires_at is None and default_expiry > 0:
            expires_at = int(time.time()) + default_expiry

        password_hash = None
        password_header = request.headers.get("X-Paste-Password", "")
        if password_header:
            if len(password_header) > 1024:
                return error_response("Password too long (max 1024 chars)", 400)
            password_hash = hash_password(password_header)

        # Insert paste
        paste_id = generate_paste_id(content)
        now = int(time.time())

        db = get_db()
        db.execute(
            """INSERT INTO pastes
               (id, content, mime_type, owner, created_at, last_accessed,
                burn_after_read, expires_at, password_hash)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                paste_id,
                content,
                mime_type,
                owner,
                now,
                now,
                1 if burn_after_read else 0,
                expires_at,
                password_hash,
            ),
        )
        db.commit()

        # Build response
        response_data: dict[str, Any] = {
            "id": paste_id,
            "url": f"/{paste_id}",
            "raw": f"/{paste_id}/raw",
            "mime_type": mime_type,
            "created_at": now,
        }
        if owner:
            response_data["owner"] = owner
        if burn_after_read:
            response_data["burn_after_read"] = True
        if expires_at:
            response_data["expires_at"] = expires_at
        if password_hash:
            response_data["password_protected"] = True

        # Record successful paste for anti-flood tracking
        record_antiflood_request()

        return json_response(response_data, 201)


class HealthView(MethodView):
    """Health check endpoint."""

    def get(self) -> Response:
        """Return health status with database check."""
        try:
            db = get_db()
            db.execute("SELECT 1")
            return json_response({"status": "healthy", "database": "ok"})
        except Exception:
            # Deliberately broad: any database failure reports "unhealthy"
            # (503) rather than crashing the health probe.
            return json_response({"status": "unhealthy", "database": "error"}, 503)


class ChallengeView(MethodView):
    """Proof-of-work challenge endpoint."""

    def get(self) -> Response:
        """Generate and return PoW challenge."""
        base_difficulty = current_app.config["POW_DIFFICULTY"]
        if base_difficulty == 0:
            return json_response({"enabled": False, "difficulty": 0})

        ch = generate_challenge()
        response = {
            "enabled": True,
            "nonce": ch["nonce"],
            "difficulty": ch["difficulty"],
            "expires": ch["expires"],
            "token": ch["token"],
        }
        # Indicate if difficulty is elevated due to anti-flood
        if ch["difficulty"] > base_difficulty:
            response["elevated"] = True
            response["base_difficulty"] = base_difficulty
        return json_response(response)


class RegisterChallengeView(MethodView):
    """Registration PoW challenge endpoint (higher difficulty)."""

    def get(self) -> Response:
        """Generate PoW challenge for registration (higher difficulty)."""
        register_difficulty = current_app.config.get("REGISTER_POW_DIFFICULTY", 24)
        if register_difficulty == 0:
            return json_response({"enabled": False, "difficulty": 0})

        ch = generate_challenge(difficulty_override=register_difficulty)
        return json_response(
            {
                "enabled": True,
                "nonce": ch["nonce"],
                "difficulty": ch["difficulty"],
                "expires": ch["expires"],
                "token": ch["token"],
                "purpose": "registration",
            }
        )


class RegisterView(MethodView):
    """Public client certificate registration endpoint."""

    def post(self) -> Response:
        """Register and obtain a client certificate.

        Requires PoW to prevent abuse. Returns PKCS#12 bundle with:
        - Client certificate
        - Client private key
        - CA certificate

        Auto-generates CA if not present and PKI_CA_PASSWORD is configured.
        """
        from cryptography import x509
        from cryptography.hazmat.primitives import serialization

        from app.pki import (
            CANotFoundError,
            PKIError,
            create_pkcs12,
            generate_ca,
            get_ca_info,
            issue_certificate,
        )

        # Check PKI configuration
        password = current_app.config.get("PKI_CA_PASSWORD", "")
        if not password:
            return error_response(
                "Registration not available",
                503,
                hint="PKI_CA_PASSWORD not configured",
            )

        # Verify PoW
        register_difficulty = current_app.config.get("REGISTER_POW_DIFFICULTY", 24)
        if register_difficulty > 0:
            token = request.headers.get("X-PoW-Token", "")
            solution = request.headers.get("X-PoW-Solution", "")

            if not token or not solution:
                return error_response(
                    "Proof-of-work required",
                    400,
                    hint="GET /register/challenge for a registration challenge",
                    difficulty=register_difficulty,
                )

            valid, err = verify_pow(token, solution, min_difficulty=register_difficulty)
            if not valid:
                current_app.logger.warning(
                    "Registration PoW failed: %s from=%s", err, request.remote_addr
                )
                return error_response(f"Proof-of-work failed: {err}", 400)

        # Parse common_name from request
        common_name = None
        if request.is_json:
            data = request.get_json(silent=True)
            # get_json may return a non-object; only dicts carry "common_name".
            if isinstance(data, dict) and isinstance(data.get("common_name"), str):
                common_name = data["common_name"][:64].strip()

        if not common_name:
            # Generate random common name if not provided
            common_name = f"client-{secrets.token_hex(4)}"

        # Auto-generate CA if needed (skip PKI_ENABLED check for registration)
        ca_info = get_ca_info(skip_enabled_check=True)
        if ca_info is None:
            ca_days = current_app.config.get("PKI_CA_DAYS", 3650)
            try:
                ca_info = generate_ca("FlaskPaste CA", password, days=ca_days)
                current_app.logger.info(
                    "CA auto-generated for registration: fingerprint=%s",
                    ca_info["fingerprint_sha1"][:12],
                )
            except PKIError as e:
                current_app.logger.error("CA auto-generation failed: %s", e)
                return error_response("CA generation failed", 500)

        # Issue certificate
        try:
            cert_days = current_app.config.get("PKI_CERT_DAYS", 365)
            cert_info = issue_certificate(common_name, password, days=cert_days)
        except CANotFoundError:
            return error_response("CA not available", 500)
        except PKIError as e:
            current_app.logger.error("Certificate issuance failed: %s", e)
            return error_response("Certificate issuance failed", 500)

        # Load CA cert for PKCS#12 (reuse ca_info from above, or refresh if it was just generated)
        if ca_info is None or "certificate_pem" not in ca_info:
            ca_info = get_ca_info(skip_enabled_check=True)
        ca_cert = x509.load_pem_x509_certificate(ca_info["certificate_pem"].encode())
        client_cert = x509.load_pem_x509_certificate(cert_info["certificate_pem"].encode())
        client_key = serialization.load_pem_private_key(
            cert_info["private_key_pem"].encode(), password=None
        )

        # Create PKCS#12 bundle (no password for easy import)
        p12_data = create_pkcs12(
            private_key=client_key,
            certificate=client_cert,
            ca_certificate=ca_cert,
            friendly_name=common_name,
            password=None,
        )

        current_app.logger.info(
            "Client registered: cn=%s fingerprint=%s from=%s",
            common_name,
            cert_info["fingerprint_sha1"][:12],
            request.remote_addr,
        )

        # common_name is client-supplied: restrict the download filename to a
        # safe character set so quotes or control characters cannot break out
        # of the Content-Disposition header.
        safe_filename = re.sub(r"[^A-Za-z0-9._-]", "_", common_name) or "client"

        # Return PKCS#12 as binary download
        response = Response(p12_data, mimetype="application/x-pkcs12")
        response.headers["Content-Disposition"] = f'attachment; filename="{safe_filename}.p12"'
        response.headers["X-Fingerprint-SHA1"] = cert_info["fingerprint_sha1"]
        response.headers["X-Certificate-Expires"] = str(cert_info["expires_at"])
        response.headers["X-Is-Admin"] = "1" if cert_info.get("is_admin") else "0"
        return response


class ClientView(MethodView):
    """CLI client download endpoint."""

    def get(self) -> Response:
        """Serve fpaste CLI with server URL pre-configured."""
        import os

        server_url = base_url()
        # server_url is derived from request headers and gets spliced into
        # Python source below. Reject characters that could terminate the
        # string literal or inject extra lines into the served script.
        if any(ch in '"\\' or ch.isspace() or ord(ch) < 0x20 for ch in server_url):
            return error_response("Invalid host header", 400)

        client_path = os.path.join(current_app.root_path, "..", "fpaste")

        try:
            with open(client_path, encoding="utf-8") as f:
                content = f.read()
        except FileNotFoundError:
            return error_response("Client not available", 404)

        # Replace default server URL
        content = content.replace(
            '"server": os.environ.get("FLASKPASTE_SERVER", "http://localhost:5000")',
            f'"server": os.environ.get("FLASKPASTE_SERVER", "{server_url}")',
        )
        content = content.replace(
            "http://localhost:5000)",
            f"{server_url})",
        )

        response = Response(content, mimetype="text/x-python")
        response.headers["Content-Disposition"] = "attachment; filename=fpaste"
        return response


class PasteView(MethodView):
    """Paste metadata operations."""

    def get(self, paste_id: str) -> Response:
        """Retrieve paste metadata."""
        # Validate and fetch
        if err := validate_paste_id(paste_id):
            return err
        if err := fetch_paste(paste_id):
            return err

        row: Row = g.paste
        # Persist the last_accessed update made by fetch_paste.
        g.db.commit()

        response_data: dict[str, Any] = {
            "id": row["id"],
            "mime_type": row["mime_type"],
            "size": row["size"],
            "created_at": row["created_at"],
            "raw": f"/{paste_id}/raw",
        }
        if row["burn_after_read"]:
            response_data["burn_after_read"] = True
        if row["expires_at"]:
            response_data["expires_at"] = row["expires_at"]
        if row["password_hash"]:
            response_data["password_protected"] = True

        return json_response(response_data)

    def head(self, paste_id: str) -> Response:
        """Return paste metadata headers only."""
        return self.get(paste_id)

    def put(self, paste_id: str) -> Response:
        """Update paste content and/or metadata.

        Requires authentication and ownership.

        Content update: Send raw body with Content-Type header
        Metadata update: Use headers with empty body

        Headers:
        - X-Paste-Password: Set/change password
        - X-Remove-Password: true to remove password
        - X-Extend-Expiry: Seconds to add to current expiry
        """
        # Validate paste ID format
        if err := validate_paste_id(paste_id):
            return err
        if err := require_auth():
            return err

        db = get_db()

        # Fetch current paste (only the columns needed for the checks below;
        # the content blob is not loaded).
        row = db.execute(
            """SELECT id, owner, expires_at, burn_after_read
               FROM pastes WHERE id = ?""",
            (paste_id,),
        ).fetchone()

        if row is None:
            return error_response("Paste not found", 404)

        if row["owner"] != g.client_id:
            return error_response("Permission denied", 403)

        # Check for burn-after-read (cannot update)
        if row["burn_after_read"]:
            return error_response("Cannot update burn-after-read paste", 400)

        # Parse update parameters
        new_password = request.headers.get("X-Paste-Password", "").strip() or None
        remove_password = request.headers.get("X-Remove-Password", "").lower() in (
            "true",
            "1",
            "yes",
        )
        extend_expiry_str = request.headers.get("X-Extend-Expiry", "").strip()

        # Same limit as paste creation.
        if new_password and len(new_password) > 1024:
            return error_response("Password too long (max 1024 chars)", 400)

        # Prepare update fields
        update_fields = []
        update_params: list[Any] = []

        # Content update (if body provided)
        content = request.get_data()
        if content:
            raw_type = request.content_type or "application/octet-stream"
            # Sanitize MIME type. Lower-case first: the pattern only accepts
            # lowercase, matching how detect_mime_type normalizes the header.
            mime_type = raw_type.split(";")[0].strip().lower()
            if not MIME_PATTERN.match(mime_type):
                mime_type = "application/octet-stream"

            update_fields.append("content = ?")
            update_params.append(content)
            update_fields.append("mime_type = ?")
            update_params.append(mime_type)

        # Password update
        if remove_password:
            update_fields.append("password_hash = NULL")
        elif new_password:
            update_fields.append("password_hash = ?")
            update_params.append(hash_password(new_password))

        # Expiry extension
        if extend_expiry_str:
            try:
                extend_seconds = int(extend_expiry_str)
            except ValueError:
                return error_response("Invalid X-Extend-Expiry value", 400)
            if extend_seconds > 0:
                current_expiry = row["expires_at"]
                if current_expiry:
                    new_expiry = current_expiry + extend_seconds
                else:
                    # If no expiry set, create one from now
                    new_expiry = int(time.time()) + extend_seconds
                update_fields.append("expires_at = ?")
                update_params.append(new_expiry)

        if not update_fields:
            return error_response("No updates provided", 400)

        # Execute update (fields are hardcoded strings, safe from injection)
        update_sql = f"UPDATE pastes SET {', '.join(update_fields)} WHERE id = ?"  # noqa: S608 # nosec B608
        update_params.append(paste_id)
        db.execute(update_sql, update_params)
        db.commit()

        # Fetch updated paste for response
        updated = db.execute(
            """SELECT id, mime_type, length(content) as size, expires_at,
                      CASE WHEN password_hash IS NOT NULL THEN 1 ELSE 0 END as password_protected
               FROM pastes WHERE id = ?""",
            (paste_id,),
        ).fetchone()

        response_data: dict[str, Any] = {
            "id": updated["id"],
            "size": updated["size"],
            "mime_type": updated["mime_type"],
        }
        if updated["expires_at"]:
            response_data["expires_at"] = updated["expires_at"]
        if updated["password_protected"]:
            response_data["password_protected"] = True

        return json_response(response_data)


class PasteRawView(MethodView):
    """Raw paste content retrieval."""

    def get(self, paste_id: str) -> Response:
        """Retrieve raw paste content.

        A burn-after-read paste is deleted in the same transaction that
        records the access, before the response is returned.
        """
        # Validate and fetch
        if err := validate_paste_id(paste_id):
            return err
        if err := fetch_paste(paste_id):
            return err

        row: Row = g.paste
        db = g.db

        burn_after_read = row["burn_after_read"]
        if burn_after_read:
            db.execute("DELETE FROM pastes WHERE id = ?", (paste_id,))
            current_app.logger.info("Burn-after-read paste deleted: %s", paste_id)

        db.commit()

        response = Response(row["content"], mimetype=row["mime_type"])
        if row["mime_type"].startswith(("image/", "text/")):
            response.headers["Content-Disposition"] = "inline"
        if burn_after_read:
            response.headers["X-Burn-After-Read"] = "true"

        return response
paste_id: str) -> Response: """Return raw paste headers without triggering burn.""" # Validate and fetch if err := validate_paste_id(paste_id): return err if err := fetch_paste(paste_id): return err row: Row = g.paste g.db.commit() response = Response(mimetype=row["mime_type"]) response.headers["Content-Length"] = str(row["size"]) if row["mime_type"].startswith(("image/", "text/")): response.headers["Content-Disposition"] = "inline" if row["burn_after_read"]: response.headers["X-Burn-After-Read"] = "true" return response class PasteDeleteView(MethodView): """Paste deletion with authentication.""" def delete(self, paste_id: str) -> Response: """Delete paste. Requires ownership or admin rights.""" # Validate if err := validate_paste_id(paste_id): return err if err := require_auth(): return err db = get_db() row = db.execute("SELECT owner FROM pastes WHERE id = ?", (paste_id,)).fetchone() if row is None: return error_response("Paste not found", 404) # Allow if owner or admin if row["owner"] != g.client_id and not is_admin(): return error_response("Permission denied", 403) db.execute("DELETE FROM pastes WHERE id = ?", (paste_id,)) db.commit() return json_response({"message": "Paste deleted"}) class PastesListView(MethodView): """List pastes with authentication.""" def get(self) -> Response: """List pastes owned by authenticated user, or all pastes for admins. 
Privacy guarantees: - Requires authentication (mTLS client certificate) - Regular users can ONLY see their own pastes - Admins can see all pastes (with optional owner filter) - Content is never returned, only metadata Query parameters: - limit: max results (default 50, max 200) - offset: pagination offset (default 0) - type: filter by MIME type (glob pattern, e.g., "image/*") - after: filter by created_at >= timestamp - before: filter by created_at <= timestamp - all: (admin only) if "1", list all pastes instead of own - owner: (admin only) filter by owner fingerprint """ import fnmatch # Strict authentication requirement if err := require_auth(): return err client_id = g.client_id user_is_admin = is_admin() # Parse pagination parameters try: limit = min(int(request.args.get("limit", 50)), 200) offset = max(int(request.args.get("offset", 0)), 0) except (ValueError, TypeError): limit, offset = 50, 0 # Parse filter parameters type_filter = request.args.get("type", "").strip() try: after_ts = int(request.args.get("after", 0)) except (ValueError, TypeError): after_ts = 0 try: before_ts = int(request.args.get("before", 0)) except (ValueError, TypeError): before_ts = 0 # Admin-only parameters show_all = request.args.get("all", "0") == "1" and user_is_admin owner_filter = request.args.get("owner", "").strip() if user_is_admin else "" db = get_db() # Build query with filters where_clauses: list[str] = [] params: list[Any] = [] # Owner filtering logic if show_all: # Admin viewing all pastes (with optional owner filter) if owner_filter: where_clauses.append("owner = ?") params.append(owner_filter) # else: no owner filter, show all else: # Regular user or admin without ?all=1: show only own pastes where_clauses.append("owner = ?") params.append(client_id) if after_ts > 0: where_clauses.append("created_at >= ?") params.append(after_ts) if before_ts > 0: where_clauses.append("created_at <= ?") params.append(before_ts) # Build WHERE clause (may be empty for admin viewing all) 
where_sql = " AND ".join(where_clauses) if where_clauses else "1=1" # Count total pastes matching filters (where_sql is safe, built from constants) count_row = db.execute( f"SELECT COUNT(*) as total FROM pastes WHERE {where_sql}", # noqa: S608 # nosec B608 params, ).fetchone() total = count_row["total"] if count_row else 0 # Fetch pastes with metadata only (where_sql is safe, built from constants) # Include owner for admin view rows = db.execute( f"""SELECT id, owner, mime_type, length(content) as size, created_at, last_accessed, burn_after_read, expires_at, CASE WHEN password_hash IS NOT NULL THEN 1 ELSE 0 END as password_protected FROM pastes WHERE {where_sql} ORDER BY created_at DESC LIMIT ? OFFSET ?""", # noqa: S608 # nosec B608 [*params, limit, offset], ).fetchall() # Apply MIME type filter (glob pattern matching done in Python for flexibility) if type_filter: rows = [r for r in rows if fnmatch.fnmatch(r["mime_type"], type_filter)] pastes = [] for row in rows: paste: dict[str, Any] = { "id": row["id"], "mime_type": row["mime_type"], "size": row["size"], "created_at": row["created_at"], "last_accessed": row["last_accessed"], "url": f"/{row['id']}", "raw": f"/{row['id']}/raw", } # Include owner for admin view if show_all and row["owner"]: paste["owner"] = row["owner"] if row["burn_after_read"]: paste["burn_after_read"] = True if row["expires_at"]: paste["expires_at"] = row["expires_at"] if row["password_protected"]: paste["password_protected"] = True pastes.append(paste) response_data: dict[str, Any] = { "pastes": pastes, "count": len(pastes), "total": total, "limit": limit, "offset": offset, } if user_is_admin: response_data["is_admin"] = True return json_response(response_data) # ───────────────────────────────────────────────────────────────────────────── # PKI Views (Certificate Authority) # ───────────────────────────────────────────────────────────────────────────── def require_pki_enabled() -> Response | None: """Check if PKI is enabled. 
Returns error response or None if enabled.""" if not current_app.config.get("PKI_ENABLED"): return error_response("PKI not enabled", 404) return None class PKIStatusView(MethodView): """PKI status endpoint.""" def get(self) -> Response: """Return PKI status and CA info if available.""" if not current_app.config.get("PKI_ENABLED"): return json_response({"enabled": False}) from app.pki import get_ca_info ca_info = get_ca_info() if ca_info is None: return json_response( { "enabled": True, "ca_exists": False, "hint": "POST /pki/ca to generate CA", } ) return json_response( { "enabled": True, "ca_exists": True, "common_name": ca_info["common_name"], "fingerprint_sha1": ca_info["fingerprint_sha1"], "created_at": ca_info["created_at"], "expires_at": ca_info["expires_at"], "key_algorithm": ca_info["key_algorithm"], } ) class PKICAGenerateView(MethodView): """CA generation endpoint (first-run only).""" def post(self) -> Response: """Generate CA certificate. Only works if no CA exists.""" if err := require_pki_enabled(): return err from app.pki import ( CAExistsError, PKIError, generate_ca, get_ca_info, ) # Check if CA already exists if get_ca_info() is not None: return error_response("CA already exists", 409) # Get CA password from config password = current_app.config.get("PKI_CA_PASSWORD", "") if not password: return error_response( "PKI_CA_PASSWORD not configured", 500, hint="Set FLASKPASTE_PKI_CA_PASSWORD environment variable", ) # Parse request for optional common name common_name = "FlaskPaste CA" if request.is_json: data = request.get_json(silent=True) if data and isinstance(data.get("common_name"), str): common_name = data["common_name"][:64] # Generate CA try: days = current_app.config.get("PKI_CA_DAYS", 3650) owner = get_client_fingerprint() ca_info = generate_ca(common_name, password, days=days, owner=owner) except CAExistsError: return error_response("CA already exists", 409) except PKIError as e: current_app.logger.error("CA generation failed: %s", e) return 
error_response("CA generation failed", 500) current_app.logger.info( "CA generated: cn=%s fingerprint=%s", common_name, ca_info["fingerprint_sha1"][:12] ) return json_response( { "message": "CA generated", "common_name": ca_info["common_name"], "fingerprint_sha1": ca_info["fingerprint_sha1"], "created_at": ca_info["created_at"], "expires_at": ca_info["expires_at"], "download": prefixed_url("/pki/ca.crt"), }, 201, ) class PKICADownloadView(MethodView): """CA certificate download endpoint.""" def get(self) -> Response: """Download CA certificate in PEM format.""" if err := require_pki_enabled(): return err from app.pki import get_ca_info ca_info = get_ca_info() if ca_info is None: return error_response("CA not initialized", 404) response = Response(ca_info["certificate_pem"], mimetype="application/x-pem-file") response.headers["Content-Disposition"] = ( f"attachment; filename={ca_info['common_name'].replace(' ', '_')}.crt" ) return response class PKIIssueView(MethodView): """Certificate issuance endpoint (open registration).""" def post(self) -> Response: """Issue a new client certificate.""" if err := require_pki_enabled(): return err from app.pki import ( CANotFoundError, PKIError, issue_certificate, ) # Parse request common_name = None if request.is_json: data = request.get_json(silent=True) if data and isinstance(data.get("common_name"), str): common_name = data["common_name"][:64] if not common_name: return error_response( "common_name required", 400, hint='POST {"common_name": "your-name"}' ) # Get CA password from config password = current_app.config.get("PKI_CA_PASSWORD", "") if not password: return error_response("PKI not properly configured", 500) # Issue certificate try: days = current_app.config.get("PKI_CERT_DAYS", 365) issued_to = get_client_fingerprint() cert_info = issue_certificate(common_name, password, days=days, issued_to=issued_to) except CANotFoundError: return error_response("CA not initialized", 404) except PKIError as e: 
current_app.logger.error("Certificate issuance failed: %s", e) return error_response("Certificate issuance failed", 500) current_app.logger.info( "Certificate issued: cn=%s serial=%s fingerprint=%s to=%s", common_name, cert_info["serial"][:8], cert_info["fingerprint_sha1"][:12], issued_to or "anonymous", ) # Return certificate bundle return json_response( { "message": "Certificate issued", "serial": cert_info["serial"], "common_name": cert_info["common_name"], "fingerprint_sha1": cert_info["fingerprint_sha1"], "created_at": cert_info["created_at"], "expires_at": cert_info["expires_at"], "certificate_pem": cert_info["certificate_pem"], "private_key_pem": cert_info["private_key_pem"], "is_admin": cert_info.get("is_admin", False), }, 201, ) class PKICertsView(MethodView): """Certificate listing endpoint.""" def get(self) -> Response: """List issued certificates.""" if err := require_pki_enabled(): return err client_id = get_client_fingerprint() db = get_db() # Users with certificates can see their own certs or certs they issued # Anonymous users (no cert) see nothing if client_id: rows = db.execute( """SELECT serial, common_name, fingerprint_sha1, created_at, expires_at, issued_to, status, revoked_at FROM issued_certificates WHERE issued_to = ? OR fingerprint_sha1 = ? 
ORDER BY created_at DESC""", (client_id, client_id), ).fetchall() else: # Anonymous: empty list rows = [] certs = [] for row in rows: cert = { "serial": row["serial"], "common_name": row["common_name"], "fingerprint_sha1": row["fingerprint_sha1"], "created_at": row["created_at"], "expires_at": row["expires_at"], "status": row["status"], } if row["issued_to"]: cert["issued_to"] = row["issued_to"] if row["revoked_at"]: cert["revoked_at"] = row["revoked_at"] certs.append(cert) return json_response({"certificates": certs, "count": len(certs)}) class PKIRevokeView(MethodView): """Certificate revocation endpoint.""" def post(self, serial: str) -> Response: """Revoke a certificate by serial number.""" if err := require_pki_enabled(): return err if err := require_auth(): return err from app.pki import CertificateNotFoundError, PKIError, revoke_certificate db = get_db() # Check certificate exists and get ownership info row = db.execute( "SELECT issued_to, fingerprint_sha1, status FROM issued_certificates WHERE serial = ?", (serial,), ).fetchone() if row is None: return error_response("Certificate not found", 404) if row["status"] == "revoked": return error_response("Certificate already revoked", 409) # Check permission: must be issuer or the certificate itself client_id = g.client_id can_revoke = row["issued_to"] == client_id or row["fingerprint_sha1"] == client_id if not can_revoke: return error_response("Permission denied", 403) # Revoke try: revoke_certificate(serial) except CertificateNotFoundError: return error_response("Certificate not found", 404) except PKIError as e: current_app.logger.error("Revocation failed: %s", e) return error_response("Revocation failed", 500) current_app.logger.info("Certificate revoked: serial=%s by=%s", serial[:8], client_id[:12]) return json_response({"message": "Certificate revoked", "serial": serial}) # ───────────────────────────────────────────────────────────────────────────── # Route Registration # 
───────────────────────────────────────────────────────────────────────────── # Index and paste creation bp.add_url_rule("/", view_func=IndexView.as_view("index")) # Utility endpoints bp.add_url_rule("/health", view_func=HealthView.as_view("health")) bp.add_url_rule("/challenge", view_func=ChallengeView.as_view("challenge")) bp.add_url_rule("/client", view_func=ClientView.as_view("client")) # Registration endpoints (public certificate issuance with PoW) bp.add_url_rule( "/register/challenge", view_func=RegisterChallengeView.as_view("register_challenge") ) bp.add_url_rule("/register", view_func=RegisterView.as_view("register")) # Paste operations bp.add_url_rule("/pastes", view_func=PastesListView.as_view("pastes_list")) bp.add_url_rule("/", view_func=PasteView.as_view("paste"), methods=["GET", "HEAD", "PUT"]) bp.add_url_rule( "//raw", view_func=PasteRawView.as_view("paste_raw"), methods=["GET", "HEAD"] ) bp.add_url_rule( "/", view_func=PasteDeleteView.as_view("paste_delete"), methods=["DELETE"] ) # PKI endpoints bp.add_url_rule("/pki", view_func=PKIStatusView.as_view("pki_status")) bp.add_url_rule("/pki/ca", view_func=PKICAGenerateView.as_view("pki_ca_generate")) bp.add_url_rule("/pki/ca.crt", view_func=PKICADownloadView.as_view("pki_ca_download")) bp.add_url_rule("/pki/issue", view_func=PKIIssueView.as_view("pki_issue")) bp.add_url_rule("/pki/certs", view_func=PKICertsView.as_view("pki_certs")) bp.add_url_rule( "/pki/revoke/", view_func=PKIRevokeView.as_view("pki_revoke"), methods=["POST"] )