security: implement HASH-001 and ENUM-001 remediations

HASH-001: Add threading lock to content hash deduplication
- Prevents race condition between SELECT and UPDATE
- Ensures accurate dedup counting under concurrent load

ENUM-001: Add rate limiting to paste lookups
- Separate rate limiter for GET/HEAD on paste endpoints
- Default 60 requests/minute per IP (configurable)
- Prevents brute-force paste ID enumeration attacks
This commit is contained in:
Username
2025-12-24 23:12:28 +01:00
parent da1beca893
commit c130020ab8
5 changed files with 116 additions and 36 deletions

View File

@@ -273,6 +273,50 @@ def reset_rate_limits() -> None:
_rate_limit_requests.clear()
# ─────────────────────────────────────────────────────────────────────────────
# ENUM-001: Lookup Rate Limiting (prevents paste ID enumeration)
# ─────────────────────────────────────────────────────────────────────────────
_lookup_rate_limit_lock = threading.Lock()
_lookup_rate_limit_requests: dict[str, list[float]] = defaultdict(list)
def check_lookup_rate_limit(client_ip: str) -> tuple[bool, int]:
"""Check if lookup request is within rate limit.
Args:
client_ip: Client IP address
Returns:
Tuple of (allowed, retry_after_seconds)
"""
if not current_app.config.get("LOOKUP_RATE_LIMIT_ENABLED", True):
return True, 0
window = current_app.config.get("LOOKUP_RATE_LIMIT_WINDOW", 60)
max_requests = current_app.config.get("LOOKUP_RATE_LIMIT_MAX", 60)
now = time.time()
cutoff = now - window
with _lookup_rate_limit_lock:
requests = _lookup_rate_limit_requests[client_ip]
requests[:] = [t for t in requests if t > cutoff]
if len(requests) >= max_requests:
retry_after = int(requests[0] + window - now) + 1
return False, max(1, retry_after)
requests.append(now)
return True, 0
def reset_lookup_rate_limits() -> None:
"""Clear lookup rate limit state. For testing only."""
with _lookup_rate_limit_lock:
_lookup_rate_limit_requests.clear()
def add_rate_limit_headers(
response: Response, remaining: int, limit: int, reset_timestamp: int
) -> Response:
@@ -415,6 +459,18 @@ def validate_paste_id(paste_id: str) -> Response | None:
def fetch_paste(paste_id: str, check_password: bool = True) -> Response | None:
"""Fetch paste and store in g.paste. Returns error response or None if OK."""
# ENUM-001: Rate limit lookups to prevent enumeration attacks
client_ip = get_client_ip()
allowed, retry_after = check_lookup_rate_limit(client_ip)
if not allowed:
response = error_response(
f"Lookup rate limit exceeded. Retry after {retry_after} seconds.",
429,
retry_after=retry_after,
)
response.headers["Retry-After"] = str(retry_after)
return response
db = get_db()
now = int(time.time())