"""API route handlers."""

import hashlib
import hmac
import json
import math
import os
import re
import secrets
import time

from flask import Response, current_app, request

from app.api import bp
from app.config import VERSION
from app.database import check_content_hash, get_db

# Valid paste ID pattern (hexadecimal only)
PASTE_ID_PATTERN = re.compile(r"^[a-f0-9]+$")

# Runtime-generated PoW secret (used if not configured)
_pow_secret_cache = None

# Valid client certificate SHA1 pattern (40 hex chars)
CLIENT_ID_PATTERN = re.compile(r"^[a-f0-9]{40}$")

# Magic bytes for common binary formats
MAGIC_SIGNATURES = {
    b"\x89PNG\r\n\x1a\n": "image/png",
    b"\xff\xd8\xff": "image/jpeg",
    b"GIF87a": "image/gif",
    b"GIF89a": "image/gif",
    b"RIFF": "image/webp",  # WebP (check for WEBP after RIFF)
    b"PK\x03\x04": "application/zip",
    b"%PDF": "application/pdf",
    b"\x1f\x8b": "application/gzip",
}

# Content-Type values that clients send by default; they carry no real
# information about the payload, so they are never trusted as the MIME type.
_GENERIC_MIME_TYPES = frozenset(
    {
        "application/octet-stream",
        "application/x-www-form-urlencoded",
        "text/plain",
    }
)

# Conservative "type/subtype" syntax accepted from a client Content-Type.
_MIME_TYPE_PATTERN = re.compile(
    r"^[a-z0-9][a-z0-9!#$&\-^_.+]*\/[a-z0-9][a-z0-9!#$&\-^_.+]*$"
)


def _url(path: str) -> str:
    """Generate URL with configured prefix for reverse proxy deployments."""
    prefix = current_app.config.get("URL_PREFIX", "")
    return f"{prefix}{path}"


def _base_url() -> str:
    """Detect base URL from request headers (for reverse proxy deployments).

    The scheme and host are taken from the forwarding headers when present,
    falling back to the values Flask derived from the request itself. The
    configured URL_PREFIX is appended.

    NOTE(review): the forwarding headers are used without checking
    _is_trusted_proxy(); confirm the reverse proxy always overwrites them.
    """
    scheme = (
        request.headers.get("X-Forwarded-Proto")
        or request.headers.get("X-Scheme")
        or request.scheme
    )
    host = (
        request.headers.get("X-Forwarded-Host")
        or request.headers.get("Host")
        or request.host
    )
    prefix = current_app.config.get("URL_PREFIX", "")
    return f"{scheme}://{host}{prefix}"


def _constant_time_equals(left: str, right: str) -> bool:
    """Compare two strings in constant time.

    hmac.compare_digest() raises TypeError for str arguments containing
    non-ASCII characters, which untrusted header values may contain.
    Comparing the UTF-8 encodings avoids that failure mode.
    """
    return hmac.compare_digest(left.encode("utf-8"), right.encode("utf-8"))


def _calculate_entropy(data: bytes) -> float:
    """Calculate Shannon entropy in bits per byte.

    Returns a value between 0 (every byte identical, or empty input) and
    8 (all 256 byte values equally frequent).

    Typical values:
        Encrypted/compressed data: ~7.5-8.0
        English text: ~4.0-5.0
        Binary executables: ~5.0-6.5
    """
    if not data:
        return 0.0

    # Count byte frequencies
    freq = [0] * 256
    for byte in data:
        freq[byte] += 1

    # Calculate entropy
    length = len(data)
    entropy = 0.0
    for count in freq:
        if count > 0:
            p = count / length
            entropy -= p * math.log2(p)
    return entropy


def _get_pow_secret() -> bytes:
    """Get or generate the PoW signing secret.

    Uses the configured POW_SECRET when set; otherwise a random secret is
    generated once and cached in this module for later calls.
    """
    global _pow_secret_cache
    configured = current_app.config.get("POW_SECRET", "")
    if configured:
        return configured.encode()
    if _pow_secret_cache is None:
        _pow_secret_cache = secrets.token_bytes(32)
    return _pow_secret_cache


def _generate_challenge() -> dict:
    """Generate a new PoW challenge.

    Returns a dict with the keys "nonce", "difficulty", "expires" (Unix
    timestamp) and "signature" (hex HMAC-SHA256 over the other three).
    """
    difficulty = current_app.config["POW_DIFFICULTY"]
    ttl = current_app.config["POW_CHALLENGE_TTL"]
    expires = int(time.time()) + ttl
    nonce = secrets.token_hex(16)

    # Sign the challenge to prevent tampering
    msg = f"{nonce}:{expires}:{difficulty}".encode()
    sig = hmac.new(_get_pow_secret(), msg, hashlib.sha256).hexdigest()

    return {
        "nonce": nonce,
        "difficulty": difficulty,
        "expires": expires,
        "signature": sig,
    }


def _verify_pow(challenge: str, nonce: str, solution: str) -> tuple[bool, str]:
    """Verify a proof-of-work solution.

    Args:
        challenge: The bare challenge nonce as extracted by the caller.
            Currently unused: the nonce embedded in the signed token is
            the one that is verified.
        nonce: Combined "nonce:expires:difficulty:signature" token string
        solution: The solution number found by client

    Returns:
        Tuple of (valid, error_message)
    """
    difficulty = current_app.config["POW_DIFFICULTY"]

    # PoW disabled
    if difficulty == 0:
        return True, ""

    # Parse challenge components
    try:
        parts = nonce.split(":")
        if len(parts) != 4:
            return False, "Invalid challenge format"
        ch_nonce, ch_expires, ch_difficulty, ch_sig = parts
        ch_expires = int(ch_expires)
        ch_difficulty = int(ch_difficulty)
    except (ValueError, TypeError):
        return False, "Invalid challenge format"

    # Verify signature (bytes comparison: the token is untrusted input and
    # may contain non-ASCII characters)
    msg = f"{ch_nonce}:{ch_expires}:{ch_difficulty}".encode()
    expected_sig = hmac.new(_get_pow_secret(), msg, hashlib.sha256).hexdigest()
    if not _constant_time_equals(ch_sig, expected_sig):
        return False, "Invalid challenge signature"

    # Check expiry
    if int(time.time()) > ch_expires:
        return False, "Challenge expired"

    # Verify difficulty matches current config
    if ch_difficulty != difficulty:
        return False, "Difficulty mismatch"

    # Verify solution
    try:
        solution_int = int(solution)
        if solution_int < 0:
            return False, "Invalid solution"
    except (ValueError, TypeError):
        return False, "Invalid solution"

    # Check hash meets difficulty requirement. The solution is hashed exactly
    # as the client sent it, not in its parsed integer form.
    work = f"{ch_nonce}:{solution}".encode()
    hash_bytes = hashlib.sha256(work).digest()

    # Count leading zero bits
    zero_bits = 0
    for byte in hash_bytes:
        if byte == 0:
            zero_bits += 8
        else:
            # Count leading zeros in this byte
            zero_bits += 8 - byte.bit_length()
            break

    if zero_bits < difficulty:
        return False, f"Insufficient work: {zero_bits} < {difficulty} bits"

    return True, ""


def _is_valid_paste_id(paste_id: str) -> bool:
    """Validate paste ID format (hexadecimal, correct length)."""
    expected_length = current_app.config["PASTE_ID_LENGTH"]
    # fullmatch: "$" combined with match() would accept a trailing newline.
    return (
        len(paste_id) == expected_length
        and PASTE_ID_PATTERN.fullmatch(paste_id) is not None
    )


def _detect_mime_type(content: bytes, content_type: str | None = None) -> str:
    """Detect MIME type from content bytes, with magic byte detection taking priority.

    Order of precedence:
        1. A known magic-byte signature at the start of the content.
        2. A specific (non-generic), syntactically safe client Content-Type.
        3. "text/plain" if the content decodes as UTF-8.
        4. "application/octet-stream" otherwise.
    """
    # Check magic bytes first - most reliable method
    for magic, mime in MAGIC_SIGNATURES.items():
        if not content.startswith(magic):
            continue
        # RIFF is a generic container (WAV, AVI, ...); it is only WebP when
        # bytes 8-12 carry the "WEBP" tag. Slicing keeps this safe for
        # content shorter than 12 bytes.
        if magic == b"RIFF" and content[8:12] != b"WEBP":
            continue
        return mime

    # Trust explicit Content-Type if it's specific (not generic defaults)
    if content_type:
        mime = content_type.split(";")[0].strip().lower()
        # Sanitize: only allow safe characters in MIME type
        if mime not in _GENERIC_MIME_TYPES and _MIME_TYPE_PATTERN.fullmatch(mime):
            return mime

    # Try to decode as UTF-8 text
    try:
        content.decode("utf-8")
    except UnicodeDecodeError:
        return "application/octet-stream"
    return "text/plain"


def _generate_id(content: bytes) -> str:
    """Generate a short ID from the hash of the content plus a nanosecond timestamp."""
    data = content + str(time.time_ns()).encode()
    length = current_app.config["PASTE_ID_LENGTH"]
    return hashlib.sha256(data).hexdigest()[:length]


def _json_response(data: dict, status: int = 200) -> Response:
    """Create a JSON response (non-ASCII characters are emitted unescaped)."""
    return Response(
        json.dumps(data, ensure_ascii=False),
        status=status,
        mimetype="application/json",
    )


def _is_trusted_proxy() -> bool:
    """Verify request comes from a trusted reverse proxy.

    If TRUSTED_PROXY_SECRET is configured, the request must include a matching
    X-Proxy-Secret header. This provides defense-in-depth against header
    spoofing if an attacker bypasses the reverse proxy.

    Returns True if no secret is configured (backwards compatible) or if the
    secret matches.
    """
    expected_secret = current_app.config.get("TRUSTED_PROXY_SECRET", "")
    if not expected_secret:
        # No secret configured - trust all requests (backwards compatible)
        return True

    # Constant-time comparison to prevent timing attacks
    provided_secret = request.headers.get("X-Proxy-Secret", "")
    return _constant_time_equals(expected_secret, provided_secret)


def _get_client_id() -> str | None:
    """Extract and validate client identity from X-SSL-Client-SHA1 header.

    Returns lowercase SHA1 fingerprint or None if not present/invalid.

    SECURITY: The X-SSL-Client-SHA1 header is only trusted if the request
    comes from a trusted proxy (verified via X-Proxy-Secret if configured).
    """
    # Verify request comes from trusted proxy before trusting auth headers
    if not _is_trusted_proxy():
        current_app.logger.warning(
            "Auth header ignored: X-Proxy-Secret mismatch from %s",
            request.remote_addr,
        )
        return None

    client_sha1 = request.headers.get("X-SSL-Client-SHA1", "").strip().lower()
    # Validate format: must be 40 hex characters (SHA1)
    if client_sha1 and CLIENT_ID_PATTERN.fullmatch(client_sha1):
        return client_sha1
    return None


@bp.route("/client", methods=["GET"])
def client():
    """Download the fpaste CLI client with server URL pre-configured."""
    # Scheme, host and prefix detection is shared with the API info endpoint.
    # NOTE(review): server_url is derived from request headers and is spliced
    # into the downloaded script verbatim - confirm the proxy sanitizes them.
    server_url = _base_url()

    client_path = os.path.join(current_app.root_path, "..", "fpaste")
    try:
        with open(client_path, encoding="utf-8") as f:
            content = f.read()
    except FileNotFoundError:
        return _json_response({"error": "Client not available"}, 404)

    # Replace default server URL
    content = content.replace(
        '"server": os.environ.get("FLASKPASTE_SERVER", "http://localhost:5000")',
        f'"server": os.environ.get("FLASKPASTE_SERVER", "{server_url}")',
    )
    content = content.replace(
        "http://localhost:5000)",
        f"{server_url})",
    )

    response = Response(content, mimetype="text/x-python")
    response.headers["Content-Disposition"] = "attachment; filename=fpaste"
    return response


@bp.route("/health", methods=["GET"])
def health():
    """Health check endpoint for load balancers and monitoring."""
    try:
        db = get_db()
        db.execute("SELECT 1")
    except Exception:
        # Boundary handler: report unhealthy, but keep the cause in the logs.
        current_app.logger.exception("Health check failed")
        return _json_response({"status": "unhealthy", "database": "error"}, 503)
    return _json_response({"status": "healthy", "database": "ok"})


@bp.route("/challenge", methods=["GET"])
def challenge():
    """Get a proof-of-work challenge for paste creation."""
    difficulty = current_app.config["POW_DIFFICULTY"]
    if difficulty == 0:
        return _json_response({"enabled": False, "difficulty": 0})

    ch = _generate_challenge()
    return _json_response({
        "enabled": True,
        "nonce": ch["nonce"],
        "difficulty": ch["difficulty"],
        "expires": ch["expires"],
        "token": f"{ch['nonce']}:{ch['expires']}:{ch['difficulty']}:{ch['signature']}",
    })


@bp.route("/", methods=["GET", "POST"])
def index():
    """Handle API info (GET) and paste creation (POST)."""
    if request.method == "POST":
        return create_paste()

    prefix = current_app.config.get("URL_PREFIX", "")
    return _json_response(
        {
            "name": "FlaskPaste",
            "version": VERSION,
            "prefix": prefix or "/",
            "endpoints": {
                f"GET {_url('/')}": "API information",
                f"GET {_url('/health')}": "Health check",
                f"GET {_url('/client')}": "Download CLI client",
                f"GET {_url('/challenge')}": "Get PoW challenge",
                f"POST {_url('/')}": "Create paste",
                f"GET {_url('/<id>')}": "Retrieve paste metadata",
                f"GET {_url('/<id>/raw')}": "Retrieve raw paste content",
                f"DELETE {_url('/<id>')}": "Delete paste",
            },
            "usage": {
                "raw": f"curl --data-binary @file.txt {_base_url()}/",
                "pipe": f"cat file.txt | curl --data-binary @- {_base_url()}/",
                "json": f"curl -H 'Content-Type: application/json' -d '{{\"content\":\"...\"}}' {_base_url()}/",
            },
            "note": "Use --data-binary (not -d) to preserve newlines",
        }
    )


def create_paste():
    """Create a new paste from request body.

    Accepts either a JSON object with a string "content" field or a raw
    request body. Checks, in order: content present, proof-of-work (if
    enabled), size limit, minimum entropy (if enabled), duplicate-content
    rate limit. Returns 201 with the paste metadata on success.
    """
    content: bytes | None = None
    mime_type: str | None = None

    if request.is_json:
        data = request.get_json(silent=True)
        # Valid JSON need not be an object (e.g. a list); only dicts have
        # a "content" field to read.
        if isinstance(data, dict) and isinstance(data.get("content"), str):
            content = data["content"].encode("utf-8")
            mime_type = "text/plain"
    else:
        content = request.get_data(as_text=False)
        if content:
            mime_type = _detect_mime_type(content, request.content_type)

    if not content:
        return _json_response({"error": "No content provided"}, 400)

    owner = _get_client_id()

    # Verify proof-of-work (if enabled)
    difficulty = current_app.config["POW_DIFFICULTY"]
    if difficulty > 0:
        pow_token = request.headers.get("X-PoW-Token", "")
        pow_solution = request.headers.get("X-PoW-Solution", "")

        if not pow_token or not pow_solution:
            return _json_response({
                "error": "Proof-of-work required",
                "hint": "GET /challenge for a new challenge",
            }, 400)

        # Extract nonce from token for verification
        parts = pow_token.split(":")
        pow_nonce = parts[0] if parts else ""

        valid, err = _verify_pow(pow_nonce, pow_token, pow_solution)
        if not valid:
            current_app.logger.warning(
                "PoW verification failed: %s from=%s", err, request.remote_addr
            )
            return _json_response({"error": f"Proof-of-work failed: {err}"}, 400)

    # Enforce size limits based on authentication
    content_size = len(content)
    if owner:
        max_size = current_app.config["MAX_PASTE_SIZE_AUTH"]
    else:
        max_size = current_app.config["MAX_PASTE_SIZE_ANON"]

    if content_size > max_size:
        return _json_response({
            "error": "Paste too large",
            "size": content_size,
            "max_size": max_size,
            "authenticated": owner is not None,
        }, 413)

    # Check minimum entropy requirement (encryption enforcement)
    min_entropy = current_app.config.get("MIN_ENTROPY", 0)
    if min_entropy > 0:
        entropy = _calculate_entropy(content)
        if entropy < min_entropy:
            current_app.logger.warning(
                "Low entropy rejected: %.2f < %.2f from=%s",
                entropy, min_entropy, request.remote_addr
            )
            return _json_response({
                "error": "Content entropy too low",
                "entropy": round(entropy, 2),
                "min_entropy": min_entropy,
                "hint": "Encrypt content before uploading (-e flag in fpaste)",
            }, 400)

    # Check content deduplication threshold
    content_hash = hashlib.sha256(content).hexdigest()
    is_allowed, dedup_count = check_content_hash(content_hash)

    if not is_allowed:
        window = current_app.config["CONTENT_DEDUP_WINDOW"]
        current_app.logger.warning(
            "Dedup threshold exceeded: hash=%s count=%d from=%s",
            content_hash[:16], dedup_count, request.remote_addr
        )
        return _json_response({
            "error": "Duplicate content rate limit exceeded",
            "count": dedup_count,
            "window_seconds": window,
        }, 429)

    paste_id = _generate_id(content)
    now = int(time.time())

    db = get_db()
    db.execute(
        "INSERT INTO pastes (id, content, mime_type, owner, created_at, last_accessed) VALUES (?, ?, ?, ?, ?, ?)",
        (paste_id, content, mime_type, owner, now, now),
    )
    db.commit()

    response_data = {
        "id": paste_id,
        "url": f"/{paste_id}",
        "raw": f"/{paste_id}/raw",
        "mime_type": mime_type,
        "created_at": now,
    }
    if owner:
        response_data["owner"] = owner

    return _json_response(response_data, 201)


@bp.route("/<paste_id>", methods=["GET", "HEAD"])
def get_paste(paste_id: str):
    """Retrieve paste metadata by ID. HEAD returns headers only."""
    if not _is_valid_paste_id(paste_id):
        return _json_response({"error": "Invalid paste ID"}, 400)

    db = get_db()
    now = int(time.time())

    # Update last_accessed and return paste in one transaction
    db.execute(
        "UPDATE pastes SET last_accessed = ? WHERE id = ?", (now, paste_id)
    )
    row = db.execute(
        "SELECT id, mime_type, created_at, length(content) as size FROM pastes WHERE id = ?",
        (paste_id,)
    ).fetchone()
    db.commit()

    if row is None:
        return _json_response({"error": "Paste not found"}, 404)

    return _json_response({
        "id": row["id"],
        "mime_type": row["mime_type"],
        "size": row["size"],
        "created_at": row["created_at"],
        "raw": f"/{paste_id}/raw",
    })


@bp.route("/<paste_id>/raw", methods=["GET", "HEAD"])
def get_paste_raw(paste_id: str):
    """Retrieve raw paste content with correct MIME type. HEAD returns headers only."""
    if not _is_valid_paste_id(paste_id):
        return _json_response({"error": "Invalid paste ID"}, 400)

    db = get_db()
    now = int(time.time())

    # Update last_accessed and return paste in one transaction
    db.execute(
        "UPDATE pastes SET last_accessed = ? WHERE id = ?", (now, paste_id)
    )
    row = db.execute(
        "SELECT content, mime_type FROM pastes WHERE id = ?", (paste_id,)
    ).fetchone()
    db.commit()

    if row is None:
        return _json_response({"error": "Paste not found"}, 404)

    mime_type = row["mime_type"]
    response = Response(row["content"], mimetype=mime_type)

    # Display inline for images and text, let browser decide for others
    if mime_type.startswith(("image/", "text/")):
        response.headers["Content-Disposition"] = "inline"

    return response


@bp.route("/<paste_id>", methods=["DELETE"])
def delete_paste(paste_id: str):
    """Delete a paste by ID. Requires ownership via X-SSL-Client-SHA1 header."""
    if not _is_valid_paste_id(paste_id):
        return _json_response({"error": "Invalid paste ID"}, 400)

    client_id = _get_client_id()
    if not client_id:
        return _json_response({"error": "Authentication required"}, 401)

    db = get_db()

    # Check paste exists and verify ownership
    row = db.execute(
        "SELECT owner FROM pastes WHERE id = ?", (paste_id,)
    ).fetchone()

    if row is None:
        return _json_response({"error": "Paste not found"}, 404)

    if row["owner"] != client_id:
        return _json_response({"error": "Permission denied"}, 403)

    db.execute("DELETE FROM pastes WHERE id = ?", (paste_id,))
    db.commit()

    return _json_response({"message": "Paste deleted"})