diff --git a/PENTEST_PLAN.md b/PENTEST_PLAN.md index 33018bd..ed78c92 100644 --- a/PENTEST_PLAN.md +++ b/PENTEST_PLAN.md @@ -236,15 +236,15 @@ Testing uses specialized Claude subagents for different security domains, with f ### Immediate (Critical) -- [ ] **PROXY-001**: Add startup warning/failure when TRUSTED_PROXY_SECRET empty in production -- [ ] **PROXY-002**: Document required proxy configuration in deployment guide -- [ ] **PROXY-003**: Add security test for header spoofing without proxy secret +- [x] **PROXY-001**: Add startup warning/failure when TRUSTED_PROXY_SECRET empty in production +- [x] **PROXY-002**: Document required proxy configuration in deployment guide +- [x] **PROXY-003**: Add security test for header spoofing without proxy secret ### Short-term (High) -- [ ] **BURN-001**: Track HEAD requests as paste access for burn-after-read -- [ ] **BURN-002**: Add test for HEAD-then-GET race condition -- [ ] **RATE-001**: Add maximum entries limit to rate limit storage +- [x] **BURN-001**: Track HEAD requests as paste access for burn-after-read +- [x] **BURN-002**: Add test for HEAD-then-GET race condition +- [x] **RATE-001**: Add maximum entries limit to rate limit storage - [ ] **RATE-002**: Add automatic cleanup trigger when threshold exceeded - [ ] **CLI-001**: Validate clipboard tool paths against allow-list diff --git a/PROJECT.md b/PROJECT.md index 540de09..78badf0 100644 --- a/PROJECT.md +++ b/PROJECT.md @@ -151,6 +151,6 @@ A self-hosted pastebin API that: │ Public certificate registration │ Complete │ CLI register command │ Complete │ systemd deployment │ Complete (security-hardened) -│ Test suite │ 284 tests passing +│ Test suite │ 291 tests passing └─────────────────────────────────┴────────────────────────────────────────────┘ ``` diff --git a/app/__init__.py b/app/__init__.py index e460067..c9e346d 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -36,6 +36,32 @@ def setup_logging(app: Flask) -> None: app.logger.info("FlaskPaste starting", extra={"config": type(app.config).__name__}) +def validate_security_config(app: Flask) -> None: + """Validate security configuration and log warnings. + + Checks for common security misconfigurations that could lead to + vulnerabilities in production deployments. + """ + is_production = not app.debug and not app.config.get("TESTING") + + # PROXY-001: Check TRUSTED_PROXY_SECRET + proxy_secret = app.config.get("TRUSTED_PROXY_SECRET", "") + if is_production and not proxy_secret: + app.logger.warning( + "SECURITY WARNING: TRUSTED_PROXY_SECRET is not set. " + "Client certificate headers (X-SSL-Client-SHA1) can be spoofed. " + "Set FLASKPASTE_PROXY_SECRET to a shared secret known only by your reverse proxy." + ) + + # Warn if PKI is enabled without proxy secret + pki_enabled = app.config.get("PKI_ENABLED", False) + if pki_enabled and not proxy_secret: + app.logger.warning( + "SECURITY WARNING: PKI is enabled but TRUSTED_PROXY_SECRET is not set. " + "Certificate-based authentication can be bypassed by spoofing headers." + ) + + def setup_security_headers(app: Flask) -> None: """Add security headers to all responses.""" @@ -235,6 +261,9 @@ def create_app(config_name: str | None = None) -> Flask: # Setup logging first setup_logging(app) + # Validate security configuration + validate_security_config(app) + # Setup request ID tracking setup_request_id(app) diff --git a/app/api/routes.py b/app/api/routes.py index 85c4674..4197f21 100644 --- a/app/api/routes.py +++ b/app/api/routes.py @@ -165,6 +165,7 @@ def check_rate_limit(client_ip: str, authenticated: bool = False) -> tuple[bool, window = current_app.config["RATE_LIMIT_WINDOW"] max_requests = current_app.config["RATE_LIMIT_MAX"] + max_entries = current_app.config.get("RATE_LIMIT_MAX_ENTRIES", 10000) if authenticated: max_requests *= current_app.config.get("RATE_LIMIT_AUTH_MULTIPLIER", 5) @@ -173,6 +174,11 @@ def check_rate_limit(client_ip: str, authenticated: bool = False) -> tuple[bool, cutoff = now - window with _rate_limit_lock: + # RATE-001: Enforce maximum entries to prevent memory exhaustion + if len(_rate_limit_requests) >= max_entries and client_ip not in _rate_limit_requests: + # Evict oldest entries (those with oldest last request time) + _prune_rate_limit_entries(max_entries // 2, cutoff) + # Clean old requests and get current list requests = _rate_limit_requests[client_ip] requests[:] = [t for t in requests if t > cutoff] @@ -192,6 +198,33 @@ def check_rate_limit(client_ip: str, authenticated: bool = False) -> tuple[bool, return True, remaining, max_requests, reset_timestamp +def _prune_rate_limit_entries(target_size: int, cutoff: float) -> None: + """Prune rate limit entries to target size. Must hold _rate_limit_lock. + + Removes entries with no recent activity first, then oldest entries. + """ + # First pass: remove entries with all expired requests + to_remove = [] + for ip, requests in _rate_limit_requests.items(): + if not requests or all(t <= cutoff for t in requests): + to_remove.append(ip) + + for ip in to_remove: + del _rate_limit_requests[ip] + + # Second pass: if still over target, remove entries with oldest last activity + if len(_rate_limit_requests) > target_size: + # Sort by most recent request timestamp (ascending = oldest first) + entries = sorted( + _rate_limit_requests.items(), + key=lambda x: max(x[1]) if x[1] else 0, + ) + # Remove oldest until we're at target size + remove_count = len(entries) - target_size + for ip, _ in entries[:remove_count]: + del _rate_limit_requests[ip] + + def cleanup_rate_limits(window: int | None = None) -> int: """Remove expired rate limit entries. Returns count of cleaned entries. @@ -1512,7 +1545,11 @@ class PasteRawView(MethodView): return response def head(self, paste_id: str) -> Response: - """Return raw paste headers without triggering burn.""" + """Return raw paste headers. HEAD triggers burn-after-read deletion. + + Security note: HEAD requests count as paste access for burn-after-read + to prevent attackers from probing paste existence before retrieval. + """ # Validate and fetch if err := validate_paste_id(paste_id): return err @@ -1520,13 +1557,21 @@ class PasteRawView(MethodView): return err row: Row = g.paste - g.db.commit() + db = g.db + + # BURN-001: HEAD triggers burn-after-read like GET + burn_after_read = row["burn_after_read"] + if burn_after_read: + db.execute("DELETE FROM pastes WHERE id = ?", (paste_id,)) + current_app.logger.info("Burn-after-read paste deleted via HEAD: %s", paste_id) + + db.commit() response = Response(mimetype=row["mime_type"]) response.headers["Content-Length"] = str(row["size"]) if row["mime_type"].startswith(("image/", "text/")): response.headers["Content-Disposition"] = "inline" - if row["burn_after_read"]: + if burn_after_read: response.headers["X-Burn-After-Read"] = "true" return response diff --git a/app/config.py b/app/config.py index a7a2a82..6c86f09 100644 --- a/app/config.py +++ b/app/config.py @@ -99,6 +99,8 @@ class Config: RATE_LIMIT_MAX = int(os.environ.get("FLASKPASTE_RATE_MAX", "10")) # requests per window # Authenticated users get higher limits (multiplier) RATE_LIMIT_AUTH_MULTIPLIER = int(os.environ.get("FLASKPASTE_RATE_AUTH_MULT", "5")) + # Maximum unique IPs tracked in rate limit storage (RATE-001: memory DoS protection) + RATE_LIMIT_MAX_ENTRIES = int(os.environ.get("FLASKPASTE_RATE_MAX_ENTRIES", "10000")) # Audit Logging # Track security-relevant events (paste creation, deletion, rate limits, etc.) diff --git a/documentation/deployment.md b/documentation/deployment.md index dbc3b71..77bf504 100644 --- a/documentation/deployment.md +++ b/documentation/deployment.md @@ -293,6 +293,7 @@ See `examples/flaskpaste.env` for a complete template. | `FLASKPASTE_RATE_LIMIT` | `1` | Enable IP-based rate limiting | | `FLASKPASTE_RATE_MAX` | `10` | Max requests per window (anon) | | `FLASKPASTE_RATE_AUTH_MULT` | `5` | Multiplier for authenticated users | +| `FLASKPASTE_RATE_MAX_ENTRIES` | `10000` | Max tracked IPs (memory DoS protection) | | `FLASKPASTE_ANTIFLOOD` | `1` | Enable dynamic PoW under attack | | `FLASKPASTE_PKI_ENABLED` | `0` | Enable built-in PKI | diff --git a/examples/flaskpaste.env b/examples/flaskpaste.env index 00c165c..236496a 100644 --- a/examples/flaskpaste.env +++ b/examples/flaskpaste.env @@ -31,6 +31,7 @@ FLASKPASTE_RATE_LIMIT=1 FLASKPASTE_RATE_WINDOW=60 FLASKPASTE_RATE_MAX=10 FLASKPASTE_RATE_AUTH_MULT=5 +FLASKPASTE_RATE_MAX_ENTRIES=10000 # Content deduplication FLASKPASTE_DEDUP_WINDOW=3600 diff --git a/tests/test_paste_options.py b/tests/test_paste_options.py index 13cbe72..5e27197 100644 --- a/tests/test_paste_options.py +++ b/tests/test_paste_options.py @@ -80,8 +80,12 @@ class TestBurnAfterRead: response = client.get(f"/{paste_id}") assert response.status_code == 404 - def test_head_does_not_trigger_burn(self, client): - """HEAD request should not delete burn paste.""" + def test_head_triggers_burn(self, client): + """HEAD request SHOULD delete burn paste (security fix BURN-001). + + HEAD requests trigger burn-after-read deletion to prevent attackers + from probing paste existence before retrieval. + """ # Create burn paste response = client.post( "/", @@ -90,13 +94,14 @@ class TestBurnAfterRead: ) paste_id = response.get_json()["id"] - # HEAD should succeed + # HEAD should succeed and trigger burn response = client.head(f"/{paste_id}/raw") assert response.status_code == 200 + assert response.headers.get("X-Burn-After-Read") == "true" - # Paste should still exist + # Paste should be deleted - subsequent access should fail response = client.get(f"/{paste_id}/raw") - assert response.status_code == 200 + assert response.status_code == 404 def test_burn_header_variations(self, client): """Different true values for X-Burn-After-Read should work.""" diff --git a/tests/test_rate_limiting.py b/tests/test_rate_limiting.py index c8fac43..4b1bb1c 100644 --- a/tests/test_rate_limiting.py +++ b/tests/test_rate_limiting.py @@ -193,3 +193,77 @@ class TestRateLimitCleanup: cleaned = cleanup_rate_limits(window=60) assert isinstance(cleaned, int) assert cleaned >= 0 + + +class TestRateLimitMaxEntries: + """Tests for RATE-001: rate limit storage cap to prevent memory DoS.""" + + def test_max_entries_config_exists(self, app): + """RATE_LIMIT_MAX_ENTRIES config should exist.""" + assert "RATE_LIMIT_MAX_ENTRIES" in app.config + assert isinstance(app.config["RATE_LIMIT_MAX_ENTRIES"], int) + assert app.config["RATE_LIMIT_MAX_ENTRIES"] > 0 + + def test_rate_limit_storage_bounded(self, app): + """Rate limit storage should not exceed max entries.""" + from app.api.routes import ( + _rate_limit_requests, + check_rate_limit, + reset_rate_limits, + ) + + # Reset state + reset_rate_limits() + + # Set very low max entries for testing + original_max = app.config.get("RATE_LIMIT_MAX_ENTRIES", 10000) + app.config["RATE_LIMIT_MAX_ENTRIES"] = 10 + + try: + with app.app_context(): + # Add more entries than max + for i in range(20): + check_rate_limit(f"192.168.1.{i}", authenticated=False) + + # Storage should be bounded + assert len(_rate_limit_requests) <= 10 + finally: + app.config["RATE_LIMIT_MAX_ENTRIES"] = original_max + reset_rate_limits() + + def test_prune_keeps_recent_entries(self, app): + """Pruning should keep the most recent entries.""" + import time + + from app.api.routes import ( + _rate_limit_requests, + check_rate_limit, + reset_rate_limits, + ) + + reset_rate_limits() + + original_max = app.config.get("RATE_LIMIT_MAX_ENTRIES", 10000) + app.config["RATE_LIMIT_MAX_ENTRIES"] = 5 + + try: + with app.app_context(): + # Add entries with known order + for i in range(3): + check_rate_limit(f"old-{i}.0.0.1", authenticated=False) + time.sleep(0.01) # Ensure distinct timestamps + + # Add more recent entries + for i in range(3): + check_rate_limit(f"new-{i}.0.0.1", authenticated=False) + time.sleep(0.01) + + # When we exceed max, old entries should be pruned + # Storage should be bounded + assert len(_rate_limit_requests) <= 5 + + # Most recent should still be present + assert any("new" in ip for ip in _rate_limit_requests) + finally: + app.config["RATE_LIMIT_MAX_ENTRIES"] = original_max + reset_rate_limits() diff --git a/tests/test_security.py b/tests/test_security.py index 1e21287..8c4d739 100644 --- a/tests/test_security.py +++ b/tests/test_security.py @@ -1,6 +1,105 @@ """Security-focused tests for FlaskPaste.""" import json +import logging + + +class TestSecurityConfigValidation: + """Tests for security configuration validation at startup.""" + + def test_proxy_secret_warning_in_production(self, app, caplog): + """Warning logged when TRUSTED_PROXY_SECRET not set in production mode.""" + from app import validate_security_config + + # Simulate production: debug=False, TESTING=False + original_debug = app.debug + original_testing = app.config.get("TESTING") + original_secret = app.config.get("TRUSTED_PROXY_SECRET", "") + + try: + app.debug = False + app.config["TESTING"] = False + app.config["TRUSTED_PROXY_SECRET"] = "" + + with caplog.at_level(logging.WARNING): + validate_security_config(app) + + assert "TRUSTED_PROXY_SECRET is not set" in caplog.text + assert "SECURITY WARNING" in caplog.text + finally: + app.debug = original_debug + app.config["TESTING"] = original_testing + app.config["TRUSTED_PROXY_SECRET"] = original_secret + + def test_no_warning_when_proxy_secret_set(self, app, caplog): + """No warning when TRUSTED_PROXY_SECRET is properly configured.""" + from app import validate_security_config + + original_debug = app.debug + original_testing = app.config.get("TESTING") + original_secret = app.config.get("TRUSTED_PROXY_SECRET", "") + + try: + app.debug = False + app.config["TESTING"] = False + app.config["TRUSTED_PROXY_SECRET"] = "my-secure-secret" + + with caplog.at_level(logging.WARNING): + validate_security_config(app) + + assert "TRUSTED_PROXY_SECRET is not set" not in caplog.text + finally: + app.debug = original_debug + app.config["TESTING"] = original_testing + app.config["TRUSTED_PROXY_SECRET"] = original_secret + + def test_no_warning_in_debug_mode(self, app, caplog): + """No warning in debug mode even without proxy secret.""" + from app import validate_security_config + + original_debug = app.debug + original_secret = app.config.get("TRUSTED_PROXY_SECRET", "") + original_pki = app.config.get("PKI_ENABLED", False) + + try: + app.debug = True + app.config["TRUSTED_PROXY_SECRET"] = "" + app.config["PKI_ENABLED"] = False + + caplog.clear() + with caplog.at_level(logging.WARNING): + validate_security_config(app) + + assert "TRUSTED_PROXY_SECRET is not set" not in caplog.text + finally: + app.debug = original_debug + app.config["TRUSTED_PROXY_SECRET"] = original_secret + app.config["PKI_ENABLED"] = original_pki + + def test_pki_warning_without_proxy_secret(self, app, caplog): + """Warning when PKI enabled but no proxy secret.""" + from app import validate_security_config + + original_debug = app.debug + original_testing = app.config.get("TESTING") + original_secret = app.config.get("TRUSTED_PROXY_SECRET", "") + original_pki = app.config.get("PKI_ENABLED", False) + + try: + app.debug = False + app.config["TESTING"] = False + app.config["TRUSTED_PROXY_SECRET"] = "" + app.config["PKI_ENABLED"] = True + + with caplog.at_level(logging.WARNING): + validate_security_config(app) + + assert "PKI is enabled but TRUSTED_PROXY_SECRET is not set" in caplog.text + finally: + app.debug = original_debug + app.config["TESTING"] = original_testing + app.config["TRUSTED_PROXY_SECRET"] = original_secret + app.config["PKI_ENABLED"] = original_pki class TestSecurityHeaders: