add proof-of-work spam prevention

Clients must solve a SHA256 hash puzzle before paste creation.
Configurable via FLASKPASTE_POW_DIFFICULTY (0 = disabled, 16 = default).
Challenge tokens expire after FLASKPASTE_POW_TTL seconds (default 300).
This commit is contained in:
Username
2025-12-20 04:03:59 +01:00
parent 682df17257
commit 8fdeeaed9c
4 changed files with 392 additions and 0 deletions

View File

@@ -3,7 +3,9 @@
import hashlib
import hmac
import json
import os
import re
import secrets
import time
from flask import Response, current_app, request
@@ -14,6 +16,9 @@ from app.database import check_content_hash, get_db
# Valid paste ID pattern (hexadecimal only)
PASTE_ID_PATTERN = re.compile(r"^[a-f0-9]+$")
# Runtime-generated PoW secret (used if not configured)
_pow_secret_cache = None
# Valid client certificate SHA1 pattern (40 hex chars)
CLIENT_ID_PATTERN = re.compile(r"^[a-f0-9]{40}$")
@@ -30,6 +35,108 @@ MAGIC_SIGNATURES = {
}
def _get_pow_secret() -> bytes:
"""Get or generate the PoW signing secret."""
global _pow_secret_cache
configured = current_app.config.get("POW_SECRET", "")
if configured:
return configured.encode()
if _pow_secret_cache is None:
_pow_secret_cache = secrets.token_bytes(32)
return _pow_secret_cache
def _generate_challenge() -> dict:
"""Generate a new PoW challenge."""
difficulty = current_app.config["POW_DIFFICULTY"]
ttl = current_app.config["POW_CHALLENGE_TTL"]
expires = int(time.time()) + ttl
nonce = secrets.token_hex(16)
# Sign the challenge to prevent tampering
msg = f"{nonce}:{expires}:{difficulty}".encode()
sig = hmac.new(_get_pow_secret(), msg, hashlib.sha256).hexdigest()
return {
"nonce": nonce,
"difficulty": difficulty,
"expires": expires,
"signature": sig,
}
def _verify_pow(challenge: str, nonce: str, solution: str) -> tuple[bool, str]:
"""Verify a proof-of-work solution.
Args:
challenge: The challenge nonce from /challenge
nonce: Combined "nonce:expires:difficulty:signature" string
solution: The solution number found by client
Returns:
Tuple of (valid, error_message)
"""
difficulty = current_app.config["POW_DIFFICULTY"]
# PoW disabled
if difficulty == 0:
return True, ""
# Parse challenge components
try:
parts = nonce.split(":")
if len(parts) != 4:
return False, "Invalid challenge format"
ch_nonce, ch_expires, ch_difficulty, ch_sig = parts
ch_expires = int(ch_expires)
ch_difficulty = int(ch_difficulty)
except (ValueError, TypeError):
return False, "Invalid challenge format"
# Verify signature
msg = f"{ch_nonce}:{ch_expires}:{ch_difficulty}".encode()
expected_sig = hmac.new(_get_pow_secret(), msg, hashlib.sha256).hexdigest()
if not hmac.compare_digest(ch_sig, expected_sig):
return False, "Invalid challenge signature"
# Check expiry
if int(time.time()) > ch_expires:
return False, "Challenge expired"
# Verify difficulty matches current config
if ch_difficulty != difficulty:
return False, "Difficulty mismatch"
# Verify solution
try:
solution_int = int(solution)
if solution_int < 0:
return False, "Invalid solution"
except (ValueError, TypeError):
return False, "Invalid solution"
# Check hash meets difficulty requirement
work = f"{ch_nonce}:{solution}".encode()
hash_bytes = hashlib.sha256(work).digest()
# Count leading zero bits
zero_bits = 0
for byte in hash_bytes:
if byte == 0:
zero_bits += 8
else:
# Count leading zeros in this byte
zero_bits += (8 - byte.bit_length())
break
if zero_bits < difficulty:
return False, f"Insufficient work: {zero_bits} < {difficulty} bits"
return True, ""
def _is_valid_paste_id(paste_id: str) -> bool:
"""Validate paste ID format (hexadecimal, correct length)."""
expected_length = current_app.config["PASTE_ID_LENGTH"]
@@ -142,6 +249,23 @@ def health():
return _json_response({"status": "unhealthy", "database": "error"}, 503)
@bp.route("/challenge", methods=["GET"])
def challenge():
"""Get a proof-of-work challenge for paste creation."""
difficulty = current_app.config["POW_DIFFICULTY"]
if difficulty == 0:
return _json_response({"enabled": False, "difficulty": 0})
ch = _generate_challenge()
return _json_response({
"enabled": True,
"nonce": ch["nonce"],
"difficulty": ch["difficulty"],
"expires": ch["expires"],
"token": f"{ch['nonce']}:{ch['expires']}:{ch['difficulty']}:{ch['signature']}",
})
@bp.route("/", methods=["GET", "POST"])
def index():
"""Handle API info (GET) and paste creation (POST)."""
@@ -190,6 +314,30 @@ def create_paste():
owner = _get_client_id()
# Verify proof-of-work (if enabled)
difficulty = current_app.config["POW_DIFFICULTY"]
if difficulty > 0:
pow_token = request.headers.get("X-PoW-Token", "")
pow_solution = request.headers.get("X-PoW-Solution", "")
if not pow_token or not pow_solution:
return _json_response({
"error": "Proof-of-work required",
"hint": "GET /challenge for a new challenge",
}, 400)
# Extract nonce from token for verification
parts = pow_token.split(":")
pow_nonce = parts[0] if parts else ""
valid, err = _verify_pow(pow_nonce, pow_token, pow_solution)
if not valid:
current_app.logger.warning(
"PoW verification failed: %s from=%s",
err, request.remote_addr
)
return _json_response({"error": f"Proof-of-work failed: {err}"}, 400)
# Enforce size limits based on authentication
content_size = len(content)
if owner:

View File

@@ -33,6 +33,14 @@ class Config:
# X-Proxy-Secret header, providing defense-in-depth against header spoofing.
TRUSTED_PROXY_SECRET = os.environ.get("FLASKPASTE_PROXY_SECRET", "")
# Proof-of-work spam prevention
# Clients must solve a computational puzzle before paste creation.
# Difficulty is number of leading zero bits required in hash (0 = disabled).
POW_DIFFICULTY = int(os.environ.get("FLASKPASTE_POW_DIFFICULTY", "16"))
POW_CHALLENGE_TTL = int(os.environ.get("FLASKPASTE_POW_TTL", "300")) # 5 minutes
# Secret key for signing challenges (auto-generated if not set)
POW_SECRET = os.environ.get("FLASKPASTE_POW_SECRET", "")
class DevelopmentConfig(Config):
"""Development configuration."""
@@ -56,6 +64,9 @@ class TestingConfig(Config):
CONTENT_DEDUP_WINDOW = 1
CONTENT_DEDUP_MAX = 100
# Disable PoW for most tests (easier testing)
POW_DIFFICULTY = 0
config = {
"development": DevelopmentConfig,