add anti-flood: dynamic PoW difficulty under load

When paste creation rate exceeds threshold, PoW difficulty
increases to slow down attackers. Decays back to base when
abuse stops.

Config:
- ANTIFLOOD_THRESHOLD: requests/window before increase (30)
- ANTIFLOOD_STEP: difficulty bits per step (2)
- ANTIFLOOD_MAX: maximum difficulty cap (28)
- ANTIFLOOD_DECAY: seconds before reducing (30)
This commit is contained in:
Username
2025-12-20 20:45:58 +01:00
parent a6812af027
commit 45712ea93f
2 changed files with 115 additions and 22 deletions

View File

@@ -52,6 +52,72 @@ GENERIC_MIME_TYPES = frozenset(
# Runtime PoW secret cache
_pow_secret_cache: bytes | None = None
# ─────────────────────────────────────────────────────────────────────────────
# Anti-flood: dynamic PoW difficulty adjustment
# ─────────────────────────────────────────────────────────────────────────────
_antiflood_lock = threading.Lock()
_antiflood_requests: list[float] = [] # Global request timestamps
_antiflood_difficulty: int = 0 # Current difficulty boost (added to base)
_antiflood_last_increase: float = 0 # Last time difficulty was increased
def get_dynamic_difficulty() -> int:
"""Get current PoW difficulty including anti-flood adjustment."""
base = current_app.config["POW_DIFFICULTY"]
if base == 0 or not current_app.config.get("ANTIFLOOD_ENABLED", True):
return base
with _antiflood_lock:
return min(base + _antiflood_difficulty, current_app.config["ANTIFLOOD_MAX"])
def record_antiflood_request() -> None:
"""Record a request for anti-flood tracking and adjust difficulty."""
if not current_app.config.get("ANTIFLOOD_ENABLED", True):
return
if current_app.config["POW_DIFFICULTY"] == 0:
return
global _antiflood_difficulty, _antiflood_last_increase
now = time.time()
window = current_app.config["ANTIFLOOD_WINDOW"]
threshold = current_app.config["ANTIFLOOD_THRESHOLD"]
step = current_app.config["ANTIFLOOD_STEP"]
max_diff = current_app.config["ANTIFLOOD_MAX"]
decay = current_app.config["ANTIFLOOD_DECAY"]
base = current_app.config["POW_DIFFICULTY"]
with _antiflood_lock:
# Clean old requests
cutoff = now - window
_antiflood_requests[:] = [t for t in _antiflood_requests if t > cutoff]
# Record this request
_antiflood_requests.append(now)
count = len(_antiflood_requests)
# Check if we should increase difficulty
if count > threshold:
# Increase difficulty if not already at max
if base + _antiflood_difficulty < max_diff:
_antiflood_difficulty += step
_antiflood_last_increase = now
elif _antiflood_difficulty > 0 and (now - _antiflood_last_increase) > decay:
# Decay difficulty if abuse has stopped
_antiflood_difficulty = max(0, _antiflood_difficulty - step)
_antiflood_last_increase = now # Reset timer
def reset_antiflood() -> None:
"""Reset anti-flood state (for testing)."""
global _antiflood_difficulty, _antiflood_last_increase
with _antiflood_lock:
_antiflood_requests.clear()
_antiflood_difficulty = 0
_antiflood_last_increase = 0
# ─────────────────────────────────────────────────────────────────────────────
# Rate Limiting (in-memory sliding window)
# ─────────────────────────────────────────────────────────────────────────────
@@ -299,8 +365,11 @@ def get_pow_secret() -> bytes:
def generate_challenge() -> dict[str, Any]:
"""Generate new PoW challenge with signed token."""
difficulty = current_app.config["POW_DIFFICULTY"]
"""Generate new PoW challenge with signed token.
Uses dynamic difficulty which may be elevated during high load.
"""
difficulty = get_dynamic_difficulty()
ttl = current_app.config["POW_CHALLENGE_TTL"]
expires = int(time.time()) + ttl
nonce = secrets.token_hex(16)
@@ -317,9 +386,13 @@ def generate_challenge() -> dict[str, Any]:
def verify_pow(token: str, solution: str) -> tuple[bool, str]:
"""Verify proof-of-work solution. Returns (valid, error_message)."""
difficulty = current_app.config["POW_DIFFICULTY"]
if difficulty == 0:
"""Verify proof-of-work solution. Returns (valid, error_message).
Accepts tokens with difficulty >= base. The solution must meet the
token's embedded difficulty (which may be elevated due to anti-flood).
"""
base_difficulty = current_app.config["POW_DIFFICULTY"]
if base_difficulty == 0:
return True, ""
# Parse token
@@ -339,11 +412,13 @@ def verify_pow(token: str, solution: str) -> tuple[bool, str]:
if not hmac.compare_digest(sig, expected_sig):
return False, "Invalid challenge signature"
# Check expiry and difficulty
# Check expiry
if int(time.time()) > expires:
return False, "Challenge expired"
if token_diff != difficulty:
return False, "Difficulty mismatch"
# Token difficulty must be at least base (anti-flood may have raised it)
if token_diff < base_difficulty:
return False, "Difficulty too low"
# Verify solution
try:
@@ -353,7 +428,7 @@ def verify_pow(token: str, solution: str) -> tuple[bool, str]:
except (ValueError, TypeError):
return False, "Invalid solution"
# Check hash meets difficulty
# Check hash meets the token's difficulty (not current dynamic difficulty)
work = f"{nonce}:{solution}".encode()
hash_bytes = hashlib.sha256(work).digest()
@@ -365,8 +440,8 @@ def verify_pow(token: str, solution: str) -> tuple[bool, str]:
zero_bits += 8 - byte.bit_length()
break
if zero_bits < difficulty:
return False, f"Insufficient work: {zero_bits} < {difficulty} bits"
if zero_bits < token_diff:
return False, f"Insufficient work: {zero_bits} < {token_diff} bits"
return True, ""
@@ -688,6 +763,9 @@ class IndexView(MethodView):
if password_hash:
response_data["password_protected"] = True
# Record successful paste for anti-flood tracking
record_antiflood_request()
return json_response(response_data, 201)
@@ -709,20 +787,23 @@ class ChallengeView(MethodView):
def get(self) -> Response:
"""Generate and return PoW challenge."""
difficulty = current_app.config["POW_DIFFICULTY"]
if difficulty == 0:
base_difficulty = current_app.config["POW_DIFFICULTY"]
if base_difficulty == 0:
return json_response({"enabled": False, "difficulty": 0})
ch = generate_challenge()
return json_response(
{
response = {
"enabled": True,
"nonce": ch["nonce"],
"difficulty": ch["difficulty"],
"expires": ch["expires"],
"token": ch["token"],
}
)
# Indicate if difficulty is elevated due to anti-flood
if ch["difficulty"] > base_difficulty:
response["elevated"] = True
response["base_difficulty"] = base_difficulty
return json_response(response)
class ClientView(MethodView):

View File

@@ -64,6 +64,18 @@ class Config:
# Secret key for signing challenges (auto-generated if not set)
POW_SECRET = os.environ.get("FLASKPASTE_POW_SECRET", "")
# Anti-flood: dynamically increase PoW difficulty under load
ANTIFLOOD_ENABLED = os.environ.get("FLASKPASTE_ANTIFLOOD", "1").lower() in (
"1",
"true",
"yes",
)
ANTIFLOOD_WINDOW = int(os.environ.get("FLASKPASTE_ANTIFLOOD_WINDOW", "60")) # seconds
ANTIFLOOD_THRESHOLD = int(os.environ.get("FLASKPASTE_ANTIFLOOD_THRESHOLD", "30")) # req/window
ANTIFLOOD_STEP = int(os.environ.get("FLASKPASTE_ANTIFLOOD_STEP", "2")) # bits per step
ANTIFLOOD_MAX = int(os.environ.get("FLASKPASTE_ANTIFLOOD_MAX", "28")) # max difficulty
ANTIFLOOD_DECAY = int(os.environ.get("FLASKPASTE_ANTIFLOOD_DECAY", "30")) # seconds to decay
# URL prefix for reverse proxy deployments (e.g., "/paste" for mymx.me/paste)
URL_PREFIX = os.environ.get("FLASKPASTE_URL_PREFIX", "").rstrip("/")