forked from claw/flaskpaste
PROXY-001: Add startup warning when TRUSTED_PROXY_SECRET empty in production - validate_security_config() checks for missing proxy secret - Additional warning when PKI enabled without proxy secret - Tests for security configuration validation BURN-001: HEAD requests now trigger burn-after-read deletion - Prevents attacker from probing paste existence before retrieval - Updated test to verify new behavior RATE-001: Add RATE_LIMIT_MAX_ENTRIES to cap memory usage - Default 10000 unique IPs tracked - Prunes oldest entries when limit exceeded - Protects against memory exhaustion DoS Test count: 284 -> 291 (7 new security tests)
426 lines
16 KiB
Python
426 lines
16 KiB
Python
"""Security-focused tests for FlaskPaste."""
|
|
|
|
import json
|
|
import logging
|
|
|
|
|
|
class TestSecurityConfigValidation:
|
|
"""Tests for security configuration validation at startup."""
|
|
|
|
def test_proxy_secret_warning_in_production(self, app, caplog):
|
|
"""Warning logged when TRUSTED_PROXY_SECRET not set in production mode."""
|
|
from app import validate_security_config
|
|
|
|
# Simulate production: debug=False, TESTING=False
|
|
original_debug = app.debug
|
|
original_testing = app.config.get("TESTING")
|
|
original_secret = app.config.get("TRUSTED_PROXY_SECRET", "")
|
|
|
|
try:
|
|
app.debug = False
|
|
app.config["TESTING"] = False
|
|
app.config["TRUSTED_PROXY_SECRET"] = ""
|
|
|
|
with caplog.at_level(logging.WARNING):
|
|
validate_security_config(app)
|
|
|
|
assert "TRUSTED_PROXY_SECRET is not set" in caplog.text
|
|
assert "SECURITY WARNING" in caplog.text
|
|
finally:
|
|
app.debug = original_debug
|
|
app.config["TESTING"] = original_testing
|
|
app.config["TRUSTED_PROXY_SECRET"] = original_secret
|
|
|
|
def test_no_warning_when_proxy_secret_set(self, app, caplog):
|
|
"""No warning when TRUSTED_PROXY_SECRET is properly configured."""
|
|
from app import validate_security_config
|
|
|
|
original_debug = app.debug
|
|
original_testing = app.config.get("TESTING")
|
|
original_secret = app.config.get("TRUSTED_PROXY_SECRET", "")
|
|
|
|
try:
|
|
app.debug = False
|
|
app.config["TESTING"] = False
|
|
app.config["TRUSTED_PROXY_SECRET"] = "my-secure-secret"
|
|
|
|
with caplog.at_level(logging.WARNING):
|
|
validate_security_config(app)
|
|
|
|
assert "TRUSTED_PROXY_SECRET is not set" not in caplog.text
|
|
finally:
|
|
app.debug = original_debug
|
|
app.config["TESTING"] = original_testing
|
|
app.config["TRUSTED_PROXY_SECRET"] = original_secret
|
|
|
|
def test_no_warning_in_debug_mode(self, app, caplog):
|
|
"""No warning in debug mode even without proxy secret."""
|
|
from app import validate_security_config
|
|
|
|
original_debug = app.debug
|
|
original_secret = app.config.get("TRUSTED_PROXY_SECRET", "")
|
|
original_pki = app.config.get("PKI_ENABLED", False)
|
|
|
|
try:
|
|
app.debug = True
|
|
app.config["TRUSTED_PROXY_SECRET"] = ""
|
|
app.config["PKI_ENABLED"] = False
|
|
|
|
caplog.clear()
|
|
with caplog.at_level(logging.WARNING):
|
|
validate_security_config(app)
|
|
|
|
assert "TRUSTED_PROXY_SECRET is not set" not in caplog.text
|
|
finally:
|
|
app.debug = original_debug
|
|
app.config["TRUSTED_PROXY_SECRET"] = original_secret
|
|
app.config["PKI_ENABLED"] = original_pki
|
|
|
|
def test_pki_warning_without_proxy_secret(self, app, caplog):
|
|
"""Warning when PKI enabled but no proxy secret."""
|
|
from app import validate_security_config
|
|
|
|
original_debug = app.debug
|
|
original_testing = app.config.get("TESTING")
|
|
original_secret = app.config.get("TRUSTED_PROXY_SECRET", "")
|
|
original_pki = app.config.get("PKI_ENABLED", False)
|
|
|
|
try:
|
|
app.debug = False
|
|
app.config["TESTING"] = False
|
|
app.config["TRUSTED_PROXY_SECRET"] = ""
|
|
app.config["PKI_ENABLED"] = True
|
|
|
|
with caplog.at_level(logging.WARNING):
|
|
validate_security_config(app)
|
|
|
|
assert "PKI is enabled but TRUSTED_PROXY_SECRET is not set" in caplog.text
|
|
finally:
|
|
app.debug = original_debug
|
|
app.config["TESTING"] = original_testing
|
|
app.config["TRUSTED_PROXY_SECRET"] = original_secret
|
|
app.config["PKI_ENABLED"] = original_pki
|
|
|
|
|
|
class TestSecurityHeaders:
|
|
"""Tests for security headers."""
|
|
|
|
def test_x_content_type_options(self, client):
|
|
"""X-Content-Type-Options header is set."""
|
|
response = client.get("/")
|
|
assert response.headers.get("X-Content-Type-Options") == "nosniff"
|
|
|
|
def test_x_frame_options(self, client):
|
|
"""X-Frame-Options header is set."""
|
|
response = client.get("/")
|
|
assert response.headers.get("X-Frame-Options") == "DENY"
|
|
|
|
def test_x_xss_protection_not_present(self, client):
|
|
"""X-XSS-Protection header is not set (deprecated, superseded by CSP)."""
|
|
response = client.get("/")
|
|
assert response.headers.get("X-XSS-Protection") is None
|
|
|
|
def test_content_security_policy(self, client):
|
|
"""Content-Security-Policy header is set."""
|
|
response = client.get("/")
|
|
csp = response.headers.get("Content-Security-Policy")
|
|
assert csp is not None
|
|
assert "default-src 'none'" in csp
|
|
|
|
def test_referrer_policy(self, client):
|
|
"""Referrer-Policy header is set."""
|
|
response = client.get("/")
|
|
assert response.headers.get("Referrer-Policy") == "strict-origin-when-cross-origin"
|
|
|
|
def test_permissions_policy(self, client):
|
|
"""Permissions-Policy header is set."""
|
|
response = client.get("/")
|
|
assert response.headers.get("Permissions-Policy") is not None
|
|
|
|
def test_security_headers_on_error_responses(self, client):
|
|
"""Security headers are present on error responses."""
|
|
response = client.get("/nonexist1234") # 12 chars, valid format but not found
|
|
assert response.headers.get("X-Content-Type-Options") == "nosniff"
|
|
assert response.headers.get("X-Frame-Options") == "DENY"
|
|
|
|
def test_hsts_header(self, client):
|
|
"""Strict-Transport-Security header is set."""
|
|
response = client.get("/")
|
|
hsts = response.headers.get("Strict-Transport-Security")
|
|
assert hsts is not None
|
|
assert "max-age=" in hsts
|
|
assert "includeSubDomains" in hsts
|
|
|
|
def test_cache_control_header(self, client):
|
|
"""Cache-Control header prevents caching of sensitive data."""
|
|
response = client.get("/")
|
|
cache = response.headers.get("Cache-Control")
|
|
assert cache is not None
|
|
assert "no-store" in cache
|
|
assert "private" in cache
|
|
|
|
def test_pragma_header(self, client):
|
|
"""Pragma header set for HTTP/1.0 compatibility."""
|
|
response = client.get("/")
|
|
assert response.headers.get("Pragma") == "no-cache"
|
|
|
|
|
|
class TestRequestIdTracking:
|
|
"""Tests for X-Request-ID tracking."""
|
|
|
|
def test_request_id_generated(self, client):
|
|
"""Request ID is generated when not provided."""
|
|
response = client.get("/")
|
|
request_id = response.headers.get("X-Request-ID")
|
|
assert request_id is not None
|
|
# Should be a valid UUID format
|
|
assert len(request_id) == 36
|
|
assert request_id.count("-") == 4
|
|
|
|
def test_request_id_passed_through(self, client):
|
|
"""Request ID from client is echoed back."""
|
|
custom_id = "my-custom-request-id-12345"
|
|
response = client.get("/", headers={"X-Request-ID": custom_id})
|
|
assert response.headers.get("X-Request-ID") == custom_id
|
|
|
|
def test_request_id_on_error_responses(self, client):
|
|
"""Request ID is present on error responses."""
|
|
custom_id = "error-request-id-67890"
|
|
response = client.get("/nonexist1234", headers={"X-Request-ID": custom_id})
|
|
assert response.headers.get("X-Request-ID") == custom_id
|
|
|
|
|
|
class TestProxyTrustValidation:
|
|
"""Tests for reverse proxy trust validation."""
|
|
|
|
def test_auth_works_without_proxy_secret_configured(self, client, sample_text, auth_header):
|
|
"""Auth header trusted when no proxy secret is configured (default)."""
|
|
response = client.post(
|
|
"/",
|
|
data=sample_text,
|
|
content_type="text/plain",
|
|
headers=auth_header,
|
|
)
|
|
assert response.status_code == 201
|
|
import json
|
|
|
|
data = json.loads(response.data)
|
|
assert "owner" in data
|
|
|
|
def test_auth_ignored_with_wrong_proxy_secret(self, app, client, sample_text, auth_header):
|
|
"""Auth header ignored when proxy secret doesn't match."""
|
|
# Configure a proxy secret
|
|
app.config["TRUSTED_PROXY_SECRET"] = "correct-secret-value"
|
|
|
|
try:
|
|
# Request with wrong secret - auth should be ignored
|
|
headers = dict(auth_header)
|
|
headers["X-Proxy-Secret"] = "wrong-secret"
|
|
response = client.post(
|
|
"/",
|
|
data=sample_text,
|
|
content_type="text/plain",
|
|
headers=headers,
|
|
)
|
|
assert response.status_code == 201
|
|
import json
|
|
|
|
data = json.loads(response.data)
|
|
# Owner should NOT be set because proxy wasn't trusted
|
|
assert "owner" not in data
|
|
finally:
|
|
# Reset config
|
|
app.config["TRUSTED_PROXY_SECRET"] = ""
|
|
|
|
def test_auth_works_with_correct_proxy_secret(self, app, client, sample_text, auth_header):
|
|
"""Auth header trusted when proxy secret matches."""
|
|
# Configure a proxy secret
|
|
app.config["TRUSTED_PROXY_SECRET"] = "correct-secret-value"
|
|
|
|
try:
|
|
# Request with correct secret - auth should work
|
|
headers = dict(auth_header)
|
|
headers["X-Proxy-Secret"] = "correct-secret-value"
|
|
response = client.post(
|
|
"/",
|
|
data=sample_text,
|
|
content_type="text/plain",
|
|
headers=headers,
|
|
)
|
|
assert response.status_code == 201
|
|
import json
|
|
|
|
data = json.loads(response.data)
|
|
assert "owner" in data
|
|
finally:
|
|
# Reset config
|
|
app.config["TRUSTED_PROXY_SECRET"] = ""
|
|
|
|
|
|
class TestInputValidation:
|
|
"""Tests for input validation and sanitization."""
|
|
|
|
def test_paste_id_hex_only(self, client):
|
|
"""Paste IDs must be hexadecimal."""
|
|
# IDs that match the route pattern but fail validation (12 chars)
|
|
invalid_ids = [
|
|
"ABCD12345678", # uppercase
|
|
"abcd-2345678", # dash
|
|
"abcd_2345678", # underscore
|
|
"abcd<>345678", # angle brackets
|
|
]
|
|
for invalid_id in invalid_ids:
|
|
response = client.get(f"/{invalid_id}")
|
|
assert response.status_code == 400, f"Expected 400 for ID: {invalid_id}"
|
|
|
|
# IDs with slashes are routed differently (404 from Flask routing)
|
|
slash_ids = ["abcd/2345678", "../abcd12345"]
|
|
for invalid_id in slash_ids:
|
|
response = client.get(f"/{invalid_id}")
|
|
assert response.status_code in (400, 404), f"Unexpected for ID: {invalid_id}"
|
|
|
|
def test_paste_id_length_enforced(self, client):
|
|
"""Paste IDs must be exactly the configured length (12 chars)."""
|
|
too_short = "abcd1234" # 8 chars
|
|
too_long = "abcdef1234567890abcd" # 20 chars
|
|
|
|
assert client.get(f"/{too_short}").status_code == 400
|
|
assert client.get(f"/{too_long}").status_code == 400
|
|
|
|
def test_auth_header_format_validated(self, client, sample_text):
|
|
"""Auth header must be valid SHA1 format."""
|
|
invalid_headers = [
|
|
"a" * 39, # too short
|
|
"a" * 41, # too long
|
|
"g" * 40, # invalid hex
|
|
"A" * 40, # uppercase (should be lowercased internally)
|
|
"a-a" * 13 + "a", # dashes
|
|
]
|
|
|
|
for invalid in invalid_headers:
|
|
response = client.post(
|
|
"/",
|
|
data=sample_text,
|
|
content_type="text/plain",
|
|
headers={"X-SSL-Client-SHA1": invalid},
|
|
)
|
|
# Invalid auth is ignored, not rejected (treated as anonymous)
|
|
data = json.loads(response.data)
|
|
assert "owner" not in data or data.get("owner") != invalid
|
|
|
|
def test_mime_type_sanitized(self, client):
|
|
"""MIME types are sanitized to prevent injection."""
|
|
# Flask/Werkzeug rejects newlines in headers at the framework level
|
|
# Test that MIME type parameters are stripped to base type
|
|
response = client.post(
|
|
"/",
|
|
data="test content",
|
|
content_type="text/plain; charset=utf-8; boundary=something",
|
|
)
|
|
data = json.loads(response.data)
|
|
# Should be sanitized to just the base MIME type
|
|
assert data["mime_type"] == "text/plain"
|
|
assert "charset" not in data["mime_type"]
|
|
assert "boundary" not in data["mime_type"]
|
|
|
|
|
|
class TestSizeLimits:
|
|
"""Tests for paste size limits."""
|
|
|
|
def test_anonymous_size_limit(self, app, client):
|
|
"""Anonymous pastes are limited in size."""
|
|
max_size = app.config["MAX_PASTE_SIZE_ANON"]
|
|
oversized = "x" * (max_size + 1)
|
|
|
|
response = client.post("/", data=oversized, content_type="text/plain")
|
|
assert response.status_code == 413
|
|
data = json.loads(response.data)
|
|
assert "error" in data
|
|
assert data["trusted"] is False
|
|
|
|
def test_authenticated_larger_limit(self, app, client, auth_header):
|
|
"""Authenticated users have larger size limit."""
|
|
anon_max = app.config["MAX_PASTE_SIZE_ANON"]
|
|
auth_max = app.config["MAX_PASTE_SIZE_AUTH"]
|
|
|
|
# Size that exceeds anon but within auth limit
|
|
# (only test if auth limit is larger)
|
|
if auth_max > anon_max:
|
|
size = anon_max + 100
|
|
content = "x" * size
|
|
|
|
response = client.post(
|
|
"/",
|
|
data=content,
|
|
content_type="text/plain",
|
|
headers=auth_header,
|
|
)
|
|
assert response.status_code == 201
|
|
|
|
|
|
class TestOwnershipEnforcement:
|
|
"""Tests for paste ownership and access control."""
|
|
|
|
def test_cannot_delete_others_paste(self, client, sample_text, auth_header, other_auth_header):
|
|
"""Users cannot delete pastes they don't own."""
|
|
create = client.post(
|
|
"/",
|
|
data=sample_text,
|
|
content_type="text/plain",
|
|
headers=auth_header,
|
|
)
|
|
paste_id = json.loads(create.data)["id"]
|
|
|
|
response = client.delete(f"/{paste_id}", headers=other_auth_header)
|
|
assert response.status_code == 403
|
|
|
|
# Paste should still exist
|
|
assert client.get(f"/{paste_id}").status_code == 200
|
|
|
|
def test_anonymous_paste_undeletable(self, client, sample_text, auth_header):
|
|
"""Anonymous pastes cannot be deleted by anyone."""
|
|
create = client.post("/", data=sample_text, content_type="text/plain")
|
|
paste_id = json.loads(create.data)["id"]
|
|
|
|
response = client.delete(f"/{paste_id}", headers=auth_header)
|
|
assert response.status_code == 403
|
|
|
|
def test_owner_can_delete(self, client, sample_text, auth_header):
|
|
"""Owners can delete their own pastes."""
|
|
create = client.post(
|
|
"/",
|
|
data=sample_text,
|
|
content_type="text/plain",
|
|
headers=auth_header,
|
|
)
|
|
paste_id = json.loads(create.data)["id"]
|
|
|
|
response = client.delete(f"/{paste_id}", headers=auth_header)
|
|
assert response.status_code == 200
|
|
|
|
|
|
class TestJsonResponses:
|
|
"""Tests for JSON response format and encoding."""
|
|
|
|
def test_json_content_type(self, client):
|
|
"""API responses have correct Content-Type."""
|
|
response = client.get("/")
|
|
assert "application/json" in response.content_type
|
|
|
|
def test_unicode_in_response(self, client):
|
|
"""Unicode content is properly encoded in responses."""
|
|
unicode_text = "Hello 你好 مرحبا 🎉"
|
|
create = client.post("/", data=unicode_text.encode("utf-8"))
|
|
assert create.status_code == 201
|
|
|
|
paste_id = json.loads(create.data)["id"]
|
|
raw = client.get(f"/{paste_id}/raw")
|
|
assert raw.data.decode("utf-8") == unicode_text
|
|
|
|
def test_error_responses_are_json(self, client):
|
|
"""Error responses are valid JSON."""
|
|
response = client.get("/nonexist1234") # 12 chars
|
|
assert response.status_code in (400, 404)
|
|
data = json.loads(response.data)
|
|
assert "error" in data
|