From 45712ea93f33b09d8039a6c31d53cb9b620dde6d Mon Sep 17 00:00:00 2001 From: Username Date: Sat, 20 Dec 2025 20:45:58 +0100 Subject: [PATCH] add anti-flood: dynamic PoW difficulty under load When paste creation rate exceeds threshold, PoW difficulty increases to slow down attackers. Decays back to base when abuse stops. Config: - ANTIFLOOD_THRESHOLD: requests/window before increase (30) - ANTIFLOOD_STEP: difficulty bits per step (2) - ANTIFLOOD_MAX: maximum difficulty cap (28) - ANTIFLOOD_DECAY: seconds before reducing (30) --- app/api/routes.py | 125 ++++++++++++++++++++++++++++++++++++++-------- app/config.py | 12 +++++ 2 files changed, 115 insertions(+), 22 deletions(-) diff --git a/app/api/routes.py b/app/api/routes.py index 807e0cf..63bb13d 100644 --- a/app/api/routes.py +++ b/app/api/routes.py @@ -52,6 +52,72 @@ GENERIC_MIME_TYPES = frozenset( # Runtime PoW secret cache _pow_secret_cache: bytes | None = None +# ───────────────────────────────────────────────────────────────────────────── +# Anti-flood: dynamic PoW difficulty adjustment +# ───────────────────────────────────────────────────────────────────────────── + +_antiflood_lock = threading.Lock() +_antiflood_requests: list[float] = [] # Global request timestamps +_antiflood_difficulty: int = 0 # Current difficulty boost (added to base) +_antiflood_last_increase: float = 0 # Last time difficulty was increased + + +def get_dynamic_difficulty() -> int: + """Get current PoW difficulty including anti-flood adjustment.""" + base = current_app.config["POW_DIFFICULTY"] + if base == 0 or not current_app.config.get("ANTIFLOOD_ENABLED", True): + return base + with _antiflood_lock: + return min(base + _antiflood_difficulty, current_app.config["ANTIFLOOD_MAX"]) + + +def record_antiflood_request() -> None: + """Record a request for anti-flood tracking and adjust difficulty.""" + if not current_app.config.get("ANTIFLOOD_ENABLED", True): + return + if current_app.config["POW_DIFFICULTY"] == 0: + return + + global _antiflood_difficulty, _antiflood_last_increase + + now = time.time() + window = current_app.config["ANTIFLOOD_WINDOW"] + threshold = current_app.config["ANTIFLOOD_THRESHOLD"] + step = current_app.config["ANTIFLOOD_STEP"] + max_diff = current_app.config["ANTIFLOOD_MAX"] + decay = current_app.config["ANTIFLOOD_DECAY"] + base = current_app.config["POW_DIFFICULTY"] + + with _antiflood_lock: + # Clean old requests + cutoff = now - window + _antiflood_requests[:] = [t for t in _antiflood_requests if t > cutoff] + + # Record this request + _antiflood_requests.append(now) + count = len(_antiflood_requests) + + # Check if we should increase difficulty + if count > threshold: + # Increase difficulty if not already at max + if base + _antiflood_difficulty < max_diff: + _antiflood_difficulty += step + _antiflood_last_increase = now + elif _antiflood_difficulty > 0 and (now - _antiflood_last_increase) > decay: + # Decay difficulty if abuse has stopped + _antiflood_difficulty = max(0, _antiflood_difficulty - step) + _antiflood_last_increase = now # Reset timer + + +def reset_antiflood() -> None: + """Reset anti-flood state (for testing).""" + global _antiflood_difficulty, _antiflood_last_increase + with _antiflood_lock: + _antiflood_requests.clear() + _antiflood_difficulty = 0 + _antiflood_last_increase = 0 + + # ───────────────────────────────────────────────────────────────────────────── # Rate Limiting (in-memory sliding window) # ───────────────────────────────────────────────────────────────────────────── @@ -299,8 +365,11 @@ def get_pow_secret() -> bytes: def generate_challenge() -> dict[str, Any]: - """Generate new PoW challenge with signed token.""" - difficulty = current_app.config["POW_DIFFICULTY"] + """Generate new PoW challenge with signed token. + + Uses dynamic difficulty which may be elevated during high load. + """ + difficulty = get_dynamic_difficulty() ttl = current_app.config["POW_CHALLENGE_TTL"] expires = int(time.time()) + ttl nonce = secrets.token_hex(16) @@ -317,9 +386,13 @@ def generate_challenge() -> dict[str, Any]: def verify_pow(token: str, solution: str) -> tuple[bool, str]: - """Verify proof-of-work solution. Returns (valid, error_message).""" - difficulty = current_app.config["POW_DIFFICULTY"] - if difficulty == 0: + """Verify proof-of-work solution. Returns (valid, error_message). + + Accepts tokens with difficulty >= base. The solution must meet the + token's embedded difficulty (which may be elevated due to anti-flood). + """ + base_difficulty = current_app.config["POW_DIFFICULTY"] + if base_difficulty == 0: return True, "" # Parse token @@ -339,11 +412,13 @@ def verify_pow(token: str, solution: str) -> tuple[bool, str]: if not hmac.compare_digest(sig, expected_sig): return False, "Invalid challenge signature" - # Check expiry and difficulty + # Check expiry if int(time.time()) > expires: return False, "Challenge expired" - if token_diff != difficulty: - return False, "Difficulty mismatch" + + # Token difficulty must be at least base (anti-flood may have raised it) + if token_diff < base_difficulty: + return False, "Difficulty too low" # Verify solution try: @@ -353,7 +428,7 @@ def verify_pow(token: str, solution: str) -> tuple[bool, str]: except (ValueError, TypeError): return False, "Invalid solution" - # Check hash meets difficulty + # Check hash meets the token's difficulty (not current dynamic difficulty) work = f"{nonce}:{solution}".encode() hash_bytes = hashlib.sha256(work).digest() @@ -365,8 +440,8 @@ def verify_pow(token: str, solution: str) -> tuple[bool, str]: zero_bits += 8 - byte.bit_length() break - if zero_bits < difficulty: - return False, f"Insufficient work: {zero_bits} < {difficulty} bits" + if zero_bits < token_diff: + return False, f"Insufficient work: {zero_bits} < {token_diff} bits" return True, "" @@ -688,6 +763,9 @@ class IndexView(MethodView): if password_hash: response_data["password_protected"] = True + # Record successful paste for anti-flood tracking + record_antiflood_request() + return json_response(response_data, 201) @@ -709,20 +787,23 @@ class ChallengeView(MethodView): def get(self) -> Response: """Generate and return PoW challenge.""" - difficulty = current_app.config["POW_DIFFICULTY"] - if difficulty == 0: + base_difficulty = current_app.config["POW_DIFFICULTY"] + if base_difficulty == 0: return json_response({"enabled": False, "difficulty": 0}) ch = generate_challenge() - return json_response( - { - "enabled": True, - "nonce": ch["nonce"], - "difficulty": ch["difficulty"], - "expires": ch["expires"], - "token": ch["token"], - } - ) + response = { + "enabled": True, + "nonce": ch["nonce"], + "difficulty": ch["difficulty"], + "expires": ch["expires"], + "token": ch["token"], + } + # Indicate if difficulty is elevated due to anti-flood + if ch["difficulty"] > base_difficulty: + response["elevated"] = True + response["base_difficulty"] = base_difficulty + return json_response(response) class ClientView(MethodView): diff --git a/app/config.py b/app/config.py index 62d5031..31fb2d7 100644 --- a/app/config.py +++ b/app/config.py @@ -64,6 +64,18 @@ class Config: # Secret key for signing challenges (auto-generated if not set) POW_SECRET = os.environ.get("FLASKPASTE_POW_SECRET", "") + # Anti-flood: dynamically increase PoW difficulty under load + ANTIFLOOD_ENABLED = os.environ.get("FLASKPASTE_ANTIFLOOD", "1").lower() in ( + "1", + "true", + "yes", + ) + ANTIFLOOD_WINDOW = int(os.environ.get("FLASKPASTE_ANTIFLOOD_WINDOW", "60")) # seconds + ANTIFLOOD_THRESHOLD = int(os.environ.get("FLASKPASTE_ANTIFLOOD_THRESHOLD", "30")) # req/window + ANTIFLOOD_STEP = int(os.environ.get("FLASKPASTE_ANTIFLOOD_STEP", "2")) # bits per step + ANTIFLOOD_MAX = int(os.environ.get("FLASKPASTE_ANTIFLOOD_MAX", "28")) # max difficulty + ANTIFLOOD_DECAY = int(os.environ.get("FLASKPASTE_ANTIFLOOD_DECAY", "30")) # seconds to decay + # URL prefix for reverse proxy deployments (e.g., "/paste" for mymx.me/paste) URL_PREFIX = os.environ.get("FLASKPASTE_URL_PREFIX", "").rstrip("/")