Files
flaskpaste/tests/test_rate_limiting.py
Username 1fbb69d7f9 security: implement pentest remediation (RATE-002, CLI-001)
RATE-002: Proactive rate limit cleanup when entries exceed threshold
- Add RATE_LIMIT_CLEANUP_THRESHOLD config (default 0.8)
- Trigger cleanup before hitting hard limit
- Prevents memory exhaustion under sustained load

CLI-001: Validate clipboard tool paths against trusted directories
- Add TRUSTED_CLIPBOARD_DIRS for Unix system paths
- Add TRUSTED_WINDOWS_PATTERNS for Windows validation
- Reject tools in user-writable locations (PATH hijack prevention)
- Use absolute paths in subprocess calls
2025-12-24 22:03:17 +01:00

342 lines
12 KiB
Python

"""Tests for IP-based rate limiting."""
import json
class TestRateLimiting:
    """Tests for rate limiting on paste creation.

    Tests that change ``app.config`` save the previous value first and put
    it back in a ``finally`` clause, so an override never outlives the test
    that made it, even when an assertion fails.
    """

    def test_rate_limit_allows_normal_usage(self, client, sample_text):
        """Normal usage within rate limit succeeds."""
        # NOTE: the sample_text fixture is requested but not used here; the
        # signature is kept as-is.
        # TestingConfig has RATE_LIMIT_MAX=100
        for i in range(5):
            response = client.post(
                "/",
                data=f"paste {i}",
                content_type="text/plain",
            )
            assert response.status_code == 201

    def test_rate_limit_exceeded_returns_429(self, client, app):
        """Exceeding rate limit returns 429 with a JSON error body."""
        # Temporarily lower rate limit for test
        original_max = app.config["RATE_LIMIT_MAX"]
        app.config["RATE_LIMIT_MAX"] = 3
        try:
            # Make requests up to limit
            for i in range(3):
                response = client.post(
                    "/",
                    data=f"paste {i}",
                    content_type="text/plain",
                )
                assert response.status_code == 201

            # Next request should be rate limited
            response = client.post(
                "/",
                data="one more",
                content_type="text/plain",
            )
            assert response.status_code == 429

            data = json.loads(response.data)
            assert "error" in data
            assert "Rate limit" in data["error"]
            assert "retry_after" in data
        finally:
            app.config["RATE_LIMIT_MAX"] = original_max

    def test_rate_limit_headers(self, client, app):
        """Rate limit response includes proper headers."""
        original_max = app.config["RATE_LIMIT_MAX"]
        app.config["RATE_LIMIT_MAX"] = 1
        try:
            # First request succeeds; assert it so that a broken setup is
            # reported here rather than as a missing 429 below.
            first = client.post("/", data="first", content_type="text/plain")
            assert first.status_code == 201

            # Second request is rate limited
            response = client.post("/", data="second", content_type="text/plain")
            assert response.status_code == 429
            assert "Retry-After" in response.headers
            assert "X-RateLimit-Remaining" in response.headers
            assert response.headers["X-RateLimit-Remaining"] == "0"
        finally:
            app.config["RATE_LIMIT_MAX"] = original_max

    def test_rate_limit_headers_on_success(self, client, app):
        """Successful responses include rate limit headers."""
        import time

        original_max = app.config["RATE_LIMIT_MAX"]
        app.config["RATE_LIMIT_MAX"] = 5
        try:
            # First request should include rate limit headers
            response = client.post("/", data="first", content_type="text/plain")
            assert response.status_code == 201

            # Check rate limit headers
            assert "X-RateLimit-Limit" in response.headers
            assert "X-RateLimit-Remaining" in response.headers
            assert "X-RateLimit-Reset" in response.headers

            # Verify values
            assert response.headers["X-RateLimit-Limit"] == "5"
            assert response.headers["X-RateLimit-Remaining"] == "4"  # 5 - 1 = 4

            # Reset timestamp should be a valid unix timestamp
            reset = int(response.headers["X-RateLimit-Reset"])
            assert reset > int(time.time())  # Should be in the future
        finally:
            app.config["RATE_LIMIT_MAX"] = original_max

    def test_rate_limit_auth_multiplier(self, client, app, auth_header):
        """Authenticated users get higher rate limits."""
        original_max = app.config["RATE_LIMIT_MAX"]
        original_mult = app.config["RATE_LIMIT_AUTH_MULTIPLIER"]
        app.config["RATE_LIMIT_MAX"] = 2
        app.config["RATE_LIMIT_AUTH_MULTIPLIER"] = 3  # 2 * 3 = 6 for auth users
        try:
            # Authenticated user can make more requests than base limit
            for i in range(5):
                response = client.post(
                    "/",
                    data=f"auth {i}",
                    content_type="text/plain",
                    headers=auth_header,
                )
                assert response.status_code == 201

            # 6th request should succeed (limit is 2*3=6)
            response = client.post(
                "/",
                data="auth 6",
                content_type="text/plain",
                headers=auth_header,
            )
            assert response.status_code == 201

            # 7th should fail
            response = client.post(
                "/",
                data="auth 7",
                content_type="text/plain",
                headers=auth_header,
            )
            assert response.status_code == 429
        finally:
            app.config["RATE_LIMIT_MAX"] = original_max
            app.config["RATE_LIMIT_AUTH_MULTIPLIER"] = original_mult

    def test_rate_limit_can_be_disabled(self, client, app):
        """Rate limiting can be disabled via config."""
        original_enabled = app.config["RATE_LIMIT_ENABLED"]
        original_max = app.config["RATE_LIMIT_MAX"]
        app.config["RATE_LIMIT_ENABLED"] = False
        # A limit of 1 would reject the second request if limiting were on.
        app.config["RATE_LIMIT_MAX"] = 1
        try:
            # Should be able to make many requests
            for i in range(5):
                response = client.post(
                    "/",
                    data=f"paste {i}",
                    content_type="text/plain",
                )
                assert response.status_code == 201
        finally:
            app.config["RATE_LIMIT_ENABLED"] = original_enabled
            app.config["RATE_LIMIT_MAX"] = original_max

    def test_rate_limit_only_affects_paste_creation(self, client, app, sample_text):
        """Rate limiting only affects POST /, not GET endpoints."""
        original_max = app.config["RATE_LIMIT_MAX"]
        app.config["RATE_LIMIT_MAX"] = 2
        try:
            # Create paste
            create = client.post("/", data=sample_text, content_type="text/plain")
            assert create.status_code == 201
            paste_id = json.loads(create.data)["id"]

            # Use up rate limit; assert success so the 429 below is known to
            # come from the third request exceeding the limit of 2.
            second = client.post("/", data="second", content_type="text/plain")
            assert second.status_code == 201

            # Should be rate limited for creation
            response = client.post("/", data="third", content_type="text/plain")
            assert response.status_code == 429

            # But GET should still work
            response = client.get(f"/{paste_id}")
            assert response.status_code == 200
            response = client.get(f"/{paste_id}/raw")
            assert response.status_code == 200
            response = client.get("/health")
            assert response.status_code == 200
        finally:
            app.config["RATE_LIMIT_MAX"] = original_max
class TestRateLimitCleanup:
    """Tests for rate limit cleanup."""

    def test_rate_limit_window_expiry(self, app):
        """cleanup_rate_limits(window=...) returns a non-negative int."""
        from app.api.routes import cleanup_rate_limits

        # No app context is entered here: the window is passed explicitly,
        # which is expected to be enough for the cleanup to run.
        removed = cleanup_rate_limits(window=60)

        assert isinstance(removed, int)
        assert removed >= 0
class TestRateLimitMaxEntries:
    """Tests for RATE-001: rate limit storage cap to prevent memory DoS."""

    def test_max_entries_config_exists(self, app):
        """RATE_LIMIT_MAX_ENTRIES is present and is a positive int."""
        assert "RATE_LIMIT_MAX_ENTRIES" in app.config
        max_entries = app.config["RATE_LIMIT_MAX_ENTRIES"]
        assert isinstance(max_entries, int)
        assert max_entries > 0

    def test_rate_limit_storage_bounded(self, app):
        """Tracking 20 distinct clients never stores more than the cap of 10."""
        from app.api.routes import (
            _rate_limit_requests,
            check_rate_limit,
            reset_rate_limits,
        )

        # Start from empty limiter state.
        reset_rate_limits()

        # Shrink the cap so the test can exceed it with a handful of calls.
        saved_cap = app.config.get("RATE_LIMIT_MAX_ENTRIES", 10000)
        app.config["RATE_LIMIT_MAX_ENTRIES"] = 10
        try:
            # Twice as many distinct addresses as the cap allows.
            addresses = [f"192.168.1.{n}" for n in range(20)]
            with app.app_context():
                for address in addresses:
                    check_rate_limit(address, authenticated=False)

                # The store must have stayed within the cap.
                assert len(_rate_limit_requests) <= 10
        finally:
            app.config["RATE_LIMIT_MAX_ENTRIES"] = saved_cap
            reset_rate_limits()

    def test_prune_keeps_recent_entries(self, app):
        """After exceeding the cap, at least one of the newest keys survives."""
        import time

        from app.api.routes import (
            _rate_limit_requests,
            check_rate_limit,
            reset_rate_limits,
        )

        reset_rate_limits()
        saved_cap = app.config.get("RATE_LIMIT_MAX_ENTRIES", 10000)
        app.config["RATE_LIMIT_MAX_ENTRIES"] = 5
        try:
            with app.app_context():
                # Three "old" clients first, then three "new" ones: six
                # entries against a cap of five. The short sleep after each
                # call keeps the timestamps distinct.
                for generation in ("old", "new"):
                    for index in range(3):
                        check_rate_limit(
                            f"{generation}-{index}.0.0.1",
                            authenticated=False,
                        )
                        time.sleep(0.01)

                # The store stays within the cap ...
                assert len(_rate_limit_requests) <= 5
                # ... and a recently added key is still tracked.
                assert any("new" in key for key in _rate_limit_requests)
        finally:
            app.config["RATE_LIMIT_MAX_ENTRIES"] = saved_cap
            reset_rate_limits()
class TestRateLimitCleanupThreshold:
    """Tests for RATE-002: automatic cleanup trigger when threshold exceeded."""

    def test_cleanup_threshold_config_exists(self, app):
        """RATE_LIMIT_CLEANUP_THRESHOLD is a float in the range (0.0, 1.0]."""
        assert "RATE_LIMIT_CLEANUP_THRESHOLD" in app.config
        fraction = app.config["RATE_LIMIT_CLEANUP_THRESHOLD"]
        assert isinstance(fraction, float)
        assert 0.0 < fraction <= 1.0

    def test_proactive_cleanup_at_threshold(self, app):
        """Crossing the cleanup threshold reduces the number of stored entries."""
        from app.api.routes import (
            _rate_limit_requests,
            check_rate_limit,
            reset_rate_limits,
        )

        reset_rate_limits()
        saved = {
            "RATE_LIMIT_MAX_ENTRIES": app.config.get(
                "RATE_LIMIT_MAX_ENTRIES", 10000
            ),
            "RATE_LIMIT_CLEANUP_THRESHOLD": app.config.get(
                "RATE_LIMIT_CLEANUP_THRESHOLD", 0.8
            ),
        }
        # Cap of 10 with a 0.5 threshold: cleanup is expected at 5 entries.
        app.config.update(
            RATE_LIMIT_MAX_ENTRIES=10,
            RATE_LIMIT_CLEANUP_THRESHOLD=0.5,
        )
        try:
            # Six distinct clients, one more than the 50% threshold of 10.
            addresses = [f"10.0.0.{n}" for n in range(6)]
            with app.app_context():
                for address in addresses:
                    check_rate_limit(address, authenticated=False)

                # Proactive cleanup should have run, so fewer than the six
                # inserted entries remain (original author's note: cleanup
                # prunes to threshold/2 = 2-3 entries).
                assert len(_rate_limit_requests) < 6
        finally:
            app.config.update(saved)
            reset_rate_limits()

    def test_no_cleanup_below_threshold(self, app):
        """Entries are left untouched while the count stays under the threshold."""
        from app.api.routes import (
            _rate_limit_requests,
            check_rate_limit,
            reset_rate_limits,
        )

        reset_rate_limits()
        saved = {
            "RATE_LIMIT_MAX_ENTRIES": app.config.get(
                "RATE_LIMIT_MAX_ENTRIES", 10000
            ),
            "RATE_LIMIT_CLEANUP_THRESHOLD": app.config.get(
                "RATE_LIMIT_CLEANUP_THRESHOLD", 0.8
            ),
        }
        # Cap of 20 with a 0.8 threshold: cleanup is expected at 16 entries.
        app.config.update(
            RATE_LIMIT_MAX_ENTRIES=20,
            RATE_LIMIT_CLEANUP_THRESHOLD=0.8,
        )
        try:
            # Five distinct clients, well under the threshold of 16.
            addresses = [f"10.0.0.{n}" for n in range(5)]
            with app.app_context():
                for address in addresses:
                    check_rate_limit(address, authenticated=False)

                # Nothing should have been pruned: all five are still stored.
                assert len(_rate_limit_requests) == 5
        finally:
            app.config.update(saved)
            reset_rate_limits()