From 8fdeeaed9ca3b449f242d52d83d3e1e74b665081 Mon Sep 17 00:00:00 2001 From: Username Date: Sat, 20 Dec 2025 04:03:59 +0100 Subject: [PATCH] add proof-of-work spam prevention Clients must solve a SHA256 hash puzzle before paste creation. Configurable via FLASKPASTE_POW_DIFFICULTY (0 = disabled, 16 = default). Challenge tokens expire after FLASKPASTE_POW_TTL seconds (default 300). --- app/api/routes.py | 148 +++++++++++++++++++++++++++++++++++++++ app/config.py | 11 +++ fpaste | 59 ++++++++++++++++ tests/test_pow.py | 174 ++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 392 insertions(+) create mode 100644 tests/test_pow.py diff --git a/app/api/routes.py b/app/api/routes.py index bd0b30e..c4ca379 100644 --- a/app/api/routes.py +++ b/app/api/routes.py @@ -3,7 +3,9 @@ import hashlib import hmac import json +import os import re +import secrets import time from flask import Response, current_app, request @@ -14,6 +16,9 @@ from app.database import check_content_hash, get_db # Valid paste ID pattern (hexadecimal only) PASTE_ID_PATTERN = re.compile(r"^[a-f0-9]+$") +# Runtime-generated PoW secret (used if not configured) +_pow_secret_cache = None + # Valid client certificate SHA1 pattern (40 hex chars) CLIENT_ID_PATTERN = re.compile(r"^[a-f0-9]{40}$") @@ -30,6 +35,108 @@ MAGIC_SIGNATURES = { } +def _get_pow_secret() -> bytes: + """Get or generate the PoW signing secret.""" + global _pow_secret_cache + + configured = current_app.config.get("POW_SECRET", "") + if configured: + return configured.encode() + + if _pow_secret_cache is None: + _pow_secret_cache = secrets.token_bytes(32) + return _pow_secret_cache + + +def _generate_challenge() -> dict: + """Generate a new PoW challenge.""" + difficulty = current_app.config["POW_DIFFICULTY"] + ttl = current_app.config["POW_CHALLENGE_TTL"] + expires = int(time.time()) + ttl + nonce = secrets.token_hex(16) + + # Sign the challenge to prevent tampering + msg = f"{nonce}:{expires}:{difficulty}".encode() + sig = hmac.new(_get_pow_secret(), msg, hashlib.sha256).hexdigest() + + return { + "nonce": nonce, + "difficulty": difficulty, + "expires": expires, + "signature": sig, + } + + +def _verify_pow(challenge: str, nonce: str, solution: str) -> tuple[bool, str]: + """Verify a proof-of-work solution. + + Args: + challenge: The challenge nonce from /challenge + nonce: Combined "nonce:expires:difficulty:signature" string + solution: The solution number found by client + + Returns: + Tuple of (valid, error_message) + """ + difficulty = current_app.config["POW_DIFFICULTY"] + + # PoW disabled + if difficulty == 0: + return True, "" + + # Parse challenge components + try: + parts = nonce.split(":") + if len(parts) != 4: + return False, "Invalid challenge format" + ch_nonce, ch_expires, ch_difficulty, ch_sig = parts + ch_expires = int(ch_expires) + ch_difficulty = int(ch_difficulty) + except (ValueError, TypeError): + return False, "Invalid challenge format" + + # Verify signature + msg = f"{ch_nonce}:{ch_expires}:{ch_difficulty}".encode() + expected_sig = hmac.new(_get_pow_secret(), msg, hashlib.sha256).hexdigest() + if not hmac.compare_digest(ch_sig, expected_sig): + return False, "Invalid challenge signature" + + # Check expiry + if int(time.time()) > ch_expires: + return False, "Challenge expired" + + # Verify difficulty matches current config + if ch_difficulty != difficulty: + return False, "Difficulty mismatch" + + # Verify solution + try: + solution_int = int(solution) + if solution_int < 0: + return False, "Invalid solution" + except (ValueError, TypeError): + return False, "Invalid solution" + + # Check hash meets difficulty requirement + work = f"{ch_nonce}:{solution}".encode() + hash_bytes = hashlib.sha256(work).digest() + + # Count leading zero bits + zero_bits = 0 + for byte in hash_bytes: + if byte == 0: + zero_bits += 8 + else: + # Count leading zeros in this byte + zero_bits += (8 - byte.bit_length()) + break + + if zero_bits < difficulty: + return False, f"Insufficient work: {zero_bits} < {difficulty} bits" + + return True, "" + + def _is_valid_paste_id(paste_id: str) -> bool: """Validate paste ID format (hexadecimal, correct length).""" expected_length = current_app.config["PASTE_ID_LENGTH"] @@ -142,6 +249,23 @@ def health(): return _json_response({"status": "unhealthy", "database": "error"}, 503) +@bp.route("/challenge", methods=["GET"]) +def challenge(): + """Get a proof-of-work challenge for paste creation.""" + difficulty = current_app.config["POW_DIFFICULTY"] + if difficulty == 0: + return _json_response({"enabled": False, "difficulty": 0}) + + ch = _generate_challenge() + return _json_response({ + "enabled": True, + "nonce": ch["nonce"], + "difficulty": ch["difficulty"], + "expires": ch["expires"], + "token": f"{ch['nonce']}:{ch['expires']}:{ch['difficulty']}:{ch['signature']}", + }) + + @bp.route("/", methods=["GET", "POST"]) def index(): """Handle API info (GET) and paste creation (POST).""" @@ -190,6 +314,30 @@ def create_paste(): owner = _get_client_id() + # Verify proof-of-work (if enabled) + difficulty = current_app.config["POW_DIFFICULTY"] + if difficulty > 0: + pow_token = request.headers.get("X-PoW-Token", "") + pow_solution = request.headers.get("X-PoW-Solution", "") + + if not pow_token or not pow_solution: + return _json_response({ + "error": "Proof-of-work required", + "hint": "GET /challenge for a new challenge", + }, 400) + + # Extract nonce from token for verification + parts = pow_token.split(":") + pow_nonce = parts[0] if parts else "" + + valid, err = _verify_pow(pow_nonce, pow_token, pow_solution) + if not valid: + current_app.logger.warning( + "PoW verification failed: %s from=%s", + err, request.remote_addr + ) + return _json_response({"error": f"Proof-of-work failed: {err}"}, 400) + # Enforce size limits based on authentication content_size = len(content) if owner: diff --git a/app/config.py b/app/config.py index 15c1a28..4ea9c13 100644 --- a/app/config.py +++ b/app/config.py @@ -33,6 +33,14 @@ class Config: # X-Proxy-Secret header, providing defense-in-depth against header spoofing. TRUSTED_PROXY_SECRET = os.environ.get("FLASKPASTE_PROXY_SECRET", "") + # Proof-of-work spam prevention + # Clients must solve a computational puzzle before paste creation. + # Difficulty is number of leading zero bits required in hash (0 = disabled). + POW_DIFFICULTY = int(os.environ.get("FLASKPASTE_POW_DIFFICULTY", "16")) + POW_CHALLENGE_TTL = int(os.environ.get("FLASKPASTE_POW_TTL", "300")) # 5 minutes + # Secret key for signing challenges (auto-generated if not set) + POW_SECRET = os.environ.get("FLASKPASTE_POW_SECRET", "") + class DevelopmentConfig(Config): """Development configuration.""" @@ -56,6 +64,9 @@ class TestingConfig(Config): CONTENT_DEDUP_WINDOW = 1 CONTENT_DEDUP_MAX = 100 + # Disable PoW for most tests (easier testing) + POW_DIFFICULTY = 0 + config = { "development": DevelopmentConfig, diff --git a/fpaste b/fpaste index cc02652..02e8d0d 100755 --- a/fpaste +++ b/fpaste @@ -2,6 +2,7 @@ """FlaskPaste command-line client.""" import argparse +import hashlib import json import os import sys @@ -54,6 +55,53 @@ def die(msg, code=1): sys.exit(code) +def solve_pow(nonce, difficulty): + """Solve proof-of-work challenge. + + Find a number N such that SHA256(nonce:N) has `difficulty` leading zero bits. + """ + n = 0 + target_bytes = (difficulty + 7) // 8 # Bytes to check + + while True: + work = f"{nonce}:{n}".encode() + hash_bytes = hashlib.sha256(work).digest() + + # Count leading zero bits + zero_bits = 0 + for byte in hash_bytes[:target_bytes + 1]: + if byte == 0: + zero_bits += 8 + else: + zero_bits += (8 - byte.bit_length()) + break + + if zero_bits >= difficulty: + return n + + n += 1 + # Progress indicator for high difficulty + if n % 100000 == 0: + print(f"\rsolving pow: {n} attempts...", end="", file=sys.stderr) + + return n + + +def get_challenge(config): + """Fetch PoW challenge from server.""" + url = config["server"].rstrip("/") + "/challenge" + status, body, _ = request(url) + + if status != 200: + return None + + data = json.loads(body) + if not data.get("enabled"): + return None + + return data + + def cmd_create(args, config): """Create a new paste.""" # Read content from file or stdin @@ -78,6 +126,17 @@ def cmd_create(args, config): if config["cert_sha1"]: headers["X-SSL-Client-SHA1"] = config["cert_sha1"] + # Get and solve PoW challenge if required + challenge = get_challenge(config) + if challenge: + if not args.quiet: + print(f"solving pow (difficulty={challenge['difficulty']})...", end="", file=sys.stderr) + solution = solve_pow(challenge["nonce"], challenge["difficulty"]) + if not args.quiet: + print(f" done", file=sys.stderr) + headers["X-PoW-Token"] = challenge["token"] + headers["X-PoW-Solution"] = str(solution) + url = config["server"].rstrip("/") + "/" status, body, _ = request(url, method="POST", data=content, headers=headers) diff --git a/tests/test_pow.py b/tests/test_pow.py new file mode 100644 index 0000000..a51f426 --- /dev/null +++ b/tests/test_pow.py @@ -0,0 +1,174 @@ +"""Tests for proof-of-work system.""" + +import hashlib +import json +import time + + +class TestPowChallenge: + """Tests for GET /challenge endpoint.""" + + def test_challenge_disabled_by_default_in_testing(self, client): + """Challenge endpoint returns disabled when POW_DIFFICULTY=0.""" + response = client.get("/challenge") + assert response.status_code == 200 + data = json.loads(response.data) + assert data["enabled"] is False + assert data["difficulty"] == 0 + + def test_challenge_returns_token_when_enabled(self, app, client): + """Challenge endpoint returns valid token when enabled.""" + app.config["POW_DIFFICULTY"] = 8 + + response = client.get("/challenge") + assert response.status_code == 200 + data = json.loads(response.data) + + assert data["enabled"] is True + assert data["difficulty"] == 8 + assert "nonce" in data + assert "expires" in data + assert "token" in data + assert len(data["nonce"]) == 32 # 16 bytes hex + assert data["expires"] > int(time.time()) + + +class TestPowVerification: + """Tests for PoW verification in paste creation.""" + + def test_paste_without_pow_when_disabled(self, client, sample_text): + """Paste creation works without PoW when disabled.""" + response = client.post("/", data=sample_text, content_type="text/plain") + assert response.status_code == 201 + + def test_paste_requires_pow_when_enabled(self, app, client, sample_text): + """Paste creation fails without PoW when enabled.""" + app.config["POW_DIFFICULTY"] = 8 + + response = client.post("/", data=sample_text, content_type="text/plain") + assert response.status_code == 400 + data = json.loads(response.data) + assert "Proof-of-work required" in data["error"] + + def test_paste_with_valid_pow(self, app, client, sample_text): + """Paste creation succeeds with valid PoW.""" + app.config["POW_DIFFICULTY"] = 8 + + # Get challenge + ch_response = client.get("/challenge") + ch_data = json.loads(ch_response.data) + + # Solve PoW + nonce = ch_data["nonce"] + difficulty = ch_data["difficulty"] + solution = solve_pow(nonce, difficulty) + + # Submit with solution + response = client.post( + "/", + data=sample_text, + content_type="text/plain", + headers={ + "X-PoW-Token": ch_data["token"], + "X-PoW-Solution": str(solution), + }, + ) + assert response.status_code == 201 + + def test_paste_with_invalid_solution(self, app, client, sample_text): + """Paste creation fails with wrong solution.""" + app.config["POW_DIFFICULTY"] = 8 + + # Get challenge + ch_response = client.get("/challenge") + ch_data = json.loads(ch_response.data) + + # Submit with wrong solution + response = client.post( + "/", + data=sample_text, + content_type="text/plain", + headers={ + "X-PoW-Token": ch_data["token"], + "X-PoW-Solution": "999999999", + }, + ) + assert response.status_code == 400 + data = json.loads(response.data) + assert "Insufficient work" in data["error"] + + def test_paste_with_expired_challenge(self, app, client, sample_text): + """Paste creation fails with expired challenge.""" + app.config["POW_DIFFICULTY"] = 8 + app.config["POW_CHALLENGE_TTL"] = 1 # 1 second + + # Get challenge + ch_response = client.get("/challenge") + ch_data = json.loads(ch_response.data) + + # Solve PoW + solution = solve_pow(ch_data["nonce"], ch_data["difficulty"]) + + # Wait for expiry + time.sleep(2) + + # Submit with expired challenge + response = client.post( + "/", + data=sample_text, + content_type="text/plain", + headers={ + "X-PoW-Token": ch_data["token"], + "X-PoW-Solution": str(solution), + }, + ) + assert response.status_code == 400 + data = json.loads(response.data) + assert "expired" in data["error"].lower() + + def test_paste_with_tampered_token(self, app, client, sample_text): + """Paste creation fails with tampered token.""" + app.config["POW_DIFFICULTY"] = 8 + + # Get challenge + ch_response = client.get("/challenge") + ch_data = json.loads(ch_response.data) + + # Tamper with token (change difficulty) + parts = ch_data["token"].split(":") + parts[2] = "1" # Lower difficulty + tampered_token = ":".join(parts) + + # Submit with tampered token + response = client.post( + "/", + data=sample_text, + content_type="text/plain", + headers={ + "X-PoW-Token": tampered_token, + "X-PoW-Solution": "0", + }, + ) + assert response.status_code == 400 + data = json.loads(response.data) + assert "signature" in data["error"].lower() or "mismatch" in data["error"].lower() + + +def solve_pow(nonce, difficulty): + """Solve proof-of-work challenge (test helper).""" + n = 0 + while True: + work = f"{nonce}:{n}".encode() + hash_bytes = hashlib.sha256(work).digest() + + zero_bits = 0 + for byte in hash_bytes: + if byte == 0: + zero_bits += 8 + else: + zero_bits += (8 - byte.bit_length()) + break + + if zero_bits >= difficulty: + return n + n += 1