Split authentication into two functions: - get_client_fingerprint(): Identity for ownership (any cert) - get_client_id(): Elevated privileges (trusted certs only) Behavior: - Anonymous: Create only, strict limits - Untrusted cert: Create + delete/update/list own pastes, strict limits - Trusted cert: All operations, relaxed limits (50MB, 5x rate) Updated tests to reflect new behavior where revoked certs can still manage their own pastes.
327 lines
12 KiB
Python
327 lines
12 KiB
Python
"""Security-focused tests for FlaskPaste."""
|
|
|
|
import json
|
|
|
|
|
|
class TestSecurityHeaders:
|
|
"""Tests for security headers."""
|
|
|
|
def test_x_content_type_options(self, client):
|
|
"""X-Content-Type-Options header is set."""
|
|
response = client.get("/")
|
|
assert response.headers.get("X-Content-Type-Options") == "nosniff"
|
|
|
|
def test_x_frame_options(self, client):
|
|
"""X-Frame-Options header is set."""
|
|
response = client.get("/")
|
|
assert response.headers.get("X-Frame-Options") == "DENY"
|
|
|
|
def test_x_xss_protection(self, client):
|
|
"""X-XSS-Protection header is set."""
|
|
response = client.get("/")
|
|
assert response.headers.get("X-XSS-Protection") == "1; mode=block"
|
|
|
|
def test_content_security_policy(self, client):
|
|
"""Content-Security-Policy header is set."""
|
|
response = client.get("/")
|
|
csp = response.headers.get("Content-Security-Policy")
|
|
assert csp is not None
|
|
assert "default-src 'none'" in csp
|
|
|
|
def test_referrer_policy(self, client):
|
|
"""Referrer-Policy header is set."""
|
|
response = client.get("/")
|
|
assert response.headers.get("Referrer-Policy") == "strict-origin-when-cross-origin"
|
|
|
|
def test_permissions_policy(self, client):
|
|
"""Permissions-Policy header is set."""
|
|
response = client.get("/")
|
|
assert response.headers.get("Permissions-Policy") is not None
|
|
|
|
def test_security_headers_on_error_responses(self, client):
|
|
"""Security headers are present on error responses."""
|
|
response = client.get("/nonexist1234") # 12 chars, valid format but not found
|
|
assert response.headers.get("X-Content-Type-Options") == "nosniff"
|
|
assert response.headers.get("X-Frame-Options") == "DENY"
|
|
|
|
def test_hsts_header(self, client):
|
|
"""Strict-Transport-Security header is set."""
|
|
response = client.get("/")
|
|
hsts = response.headers.get("Strict-Transport-Security")
|
|
assert hsts is not None
|
|
assert "max-age=" in hsts
|
|
assert "includeSubDomains" in hsts
|
|
|
|
def test_cache_control_header(self, client):
|
|
"""Cache-Control header prevents caching of sensitive data."""
|
|
response = client.get("/")
|
|
cache = response.headers.get("Cache-Control")
|
|
assert cache is not None
|
|
assert "no-store" in cache
|
|
assert "private" in cache
|
|
|
|
def test_pragma_header(self, client):
|
|
"""Pragma header set for HTTP/1.0 compatibility."""
|
|
response = client.get("/")
|
|
assert response.headers.get("Pragma") == "no-cache"
|
|
|
|
|
|
class TestRequestIdTracking:
|
|
"""Tests for X-Request-ID tracking."""
|
|
|
|
def test_request_id_generated(self, client):
|
|
"""Request ID is generated when not provided."""
|
|
response = client.get("/")
|
|
request_id = response.headers.get("X-Request-ID")
|
|
assert request_id is not None
|
|
# Should be a valid UUID format
|
|
assert len(request_id) == 36
|
|
assert request_id.count("-") == 4
|
|
|
|
def test_request_id_passed_through(self, client):
|
|
"""Request ID from client is echoed back."""
|
|
custom_id = "my-custom-request-id-12345"
|
|
response = client.get("/", headers={"X-Request-ID": custom_id})
|
|
assert response.headers.get("X-Request-ID") == custom_id
|
|
|
|
def test_request_id_on_error_responses(self, client):
|
|
"""Request ID is present on error responses."""
|
|
custom_id = "error-request-id-67890"
|
|
response = client.get("/nonexist1234", headers={"X-Request-ID": custom_id})
|
|
assert response.headers.get("X-Request-ID") == custom_id
|
|
|
|
|
|
class TestProxyTrustValidation:
|
|
"""Tests for reverse proxy trust validation."""
|
|
|
|
def test_auth_works_without_proxy_secret_configured(self, client, sample_text, auth_header):
|
|
"""Auth header trusted when no proxy secret is configured (default)."""
|
|
response = client.post(
|
|
"/",
|
|
data=sample_text,
|
|
content_type="text/plain",
|
|
headers=auth_header,
|
|
)
|
|
assert response.status_code == 201
|
|
import json
|
|
|
|
data = json.loads(response.data)
|
|
assert "owner" in data
|
|
|
|
def test_auth_ignored_with_wrong_proxy_secret(self, app, client, sample_text, auth_header):
|
|
"""Auth header ignored when proxy secret doesn't match."""
|
|
# Configure a proxy secret
|
|
app.config["TRUSTED_PROXY_SECRET"] = "correct-secret-value"
|
|
|
|
try:
|
|
# Request with wrong secret - auth should be ignored
|
|
headers = dict(auth_header)
|
|
headers["X-Proxy-Secret"] = "wrong-secret"
|
|
response = client.post(
|
|
"/",
|
|
data=sample_text,
|
|
content_type="text/plain",
|
|
headers=headers,
|
|
)
|
|
assert response.status_code == 201
|
|
import json
|
|
|
|
data = json.loads(response.data)
|
|
# Owner should NOT be set because proxy wasn't trusted
|
|
assert "owner" not in data
|
|
finally:
|
|
# Reset config
|
|
app.config["TRUSTED_PROXY_SECRET"] = ""
|
|
|
|
def test_auth_works_with_correct_proxy_secret(self, app, client, sample_text, auth_header):
|
|
"""Auth header trusted when proxy secret matches."""
|
|
# Configure a proxy secret
|
|
app.config["TRUSTED_PROXY_SECRET"] = "correct-secret-value"
|
|
|
|
try:
|
|
# Request with correct secret - auth should work
|
|
headers = dict(auth_header)
|
|
headers["X-Proxy-Secret"] = "correct-secret-value"
|
|
response = client.post(
|
|
"/",
|
|
data=sample_text,
|
|
content_type="text/plain",
|
|
headers=headers,
|
|
)
|
|
assert response.status_code == 201
|
|
import json
|
|
|
|
data = json.loads(response.data)
|
|
assert "owner" in data
|
|
finally:
|
|
# Reset config
|
|
app.config["TRUSTED_PROXY_SECRET"] = ""
|
|
|
|
|
|
class TestInputValidation:
|
|
"""Tests for input validation and sanitization."""
|
|
|
|
def test_paste_id_hex_only(self, client):
|
|
"""Paste IDs must be hexadecimal."""
|
|
# IDs that match the route pattern but fail validation (12 chars)
|
|
invalid_ids = [
|
|
"ABCD12345678", # uppercase
|
|
"abcd-2345678", # dash
|
|
"abcd_2345678", # underscore
|
|
"abcd<>345678", # angle brackets
|
|
]
|
|
for invalid_id in invalid_ids:
|
|
response = client.get(f"/{invalid_id}")
|
|
assert response.status_code == 400, f"Expected 400 for ID: {invalid_id}"
|
|
|
|
# IDs with slashes are routed differently (404 from Flask routing)
|
|
slash_ids = ["abcd/2345678", "../abcd12345"]
|
|
for invalid_id in slash_ids:
|
|
response = client.get(f"/{invalid_id}")
|
|
assert response.status_code in (400, 404), f"Unexpected for ID: {invalid_id}"
|
|
|
|
def test_paste_id_length_enforced(self, client):
|
|
"""Paste IDs must be exactly the configured length (12 chars)."""
|
|
too_short = "abcd1234" # 8 chars
|
|
too_long = "abcdef1234567890abcd" # 20 chars
|
|
|
|
assert client.get(f"/{too_short}").status_code == 400
|
|
assert client.get(f"/{too_long}").status_code == 400
|
|
|
|
def test_auth_header_format_validated(self, client, sample_text):
|
|
"""Auth header must be valid SHA1 format."""
|
|
invalid_headers = [
|
|
"a" * 39, # too short
|
|
"a" * 41, # too long
|
|
"g" * 40, # invalid hex
|
|
"A" * 40, # uppercase (should be lowercased internally)
|
|
"a-a" * 13 + "a", # dashes
|
|
]
|
|
|
|
for invalid in invalid_headers:
|
|
response = client.post(
|
|
"/",
|
|
data=sample_text,
|
|
content_type="text/plain",
|
|
headers={"X-SSL-Client-SHA1": invalid},
|
|
)
|
|
# Invalid auth is ignored, not rejected (treated as anonymous)
|
|
data = json.loads(response.data)
|
|
assert "owner" not in data or data.get("owner") != invalid
|
|
|
|
def test_mime_type_sanitized(self, client):
|
|
"""MIME types are sanitized to prevent injection."""
|
|
# Flask/Werkzeug rejects newlines in headers at the framework level
|
|
# Test that MIME type parameters are stripped to base type
|
|
response = client.post(
|
|
"/",
|
|
data="test content",
|
|
content_type="text/plain; charset=utf-8; boundary=something",
|
|
)
|
|
data = json.loads(response.data)
|
|
# Should be sanitized to just the base MIME type
|
|
assert data["mime_type"] == "text/plain"
|
|
assert "charset" not in data["mime_type"]
|
|
assert "boundary" not in data["mime_type"]
|
|
|
|
|
|
class TestSizeLimits:
|
|
"""Tests for paste size limits."""
|
|
|
|
def test_anonymous_size_limit(self, app, client):
|
|
"""Anonymous pastes are limited in size."""
|
|
max_size = app.config["MAX_PASTE_SIZE_ANON"]
|
|
oversized = "x" * (max_size + 1)
|
|
|
|
response = client.post("/", data=oversized, content_type="text/plain")
|
|
assert response.status_code == 413
|
|
data = json.loads(response.data)
|
|
assert "error" in data
|
|
assert data["trusted"] is False
|
|
|
|
def test_authenticated_larger_limit(self, app, client, auth_header):
|
|
"""Authenticated users have larger size limit."""
|
|
anon_max = app.config["MAX_PASTE_SIZE_ANON"]
|
|
auth_max = app.config["MAX_PASTE_SIZE_AUTH"]
|
|
|
|
# Size that exceeds anon but within auth limit
|
|
# (only test if auth limit is larger)
|
|
if auth_max > anon_max:
|
|
size = anon_max + 100
|
|
content = "x" * size
|
|
|
|
response = client.post(
|
|
"/",
|
|
data=content,
|
|
content_type="text/plain",
|
|
headers=auth_header,
|
|
)
|
|
assert response.status_code == 201
|
|
|
|
|
|
class TestOwnershipEnforcement:
|
|
"""Tests for paste ownership and access control."""
|
|
|
|
def test_cannot_delete_others_paste(self, client, sample_text, auth_header, other_auth_header):
|
|
"""Users cannot delete pastes they don't own."""
|
|
create = client.post(
|
|
"/",
|
|
data=sample_text,
|
|
content_type="text/plain",
|
|
headers=auth_header,
|
|
)
|
|
paste_id = json.loads(create.data)["id"]
|
|
|
|
response = client.delete(f"/{paste_id}", headers=other_auth_header)
|
|
assert response.status_code == 403
|
|
|
|
# Paste should still exist
|
|
assert client.get(f"/{paste_id}").status_code == 200
|
|
|
|
def test_anonymous_paste_undeletable(self, client, sample_text, auth_header):
|
|
"""Anonymous pastes cannot be deleted by anyone."""
|
|
create = client.post("/", data=sample_text, content_type="text/plain")
|
|
paste_id = json.loads(create.data)["id"]
|
|
|
|
response = client.delete(f"/{paste_id}", headers=auth_header)
|
|
assert response.status_code == 403
|
|
|
|
def test_owner_can_delete(self, client, sample_text, auth_header):
|
|
"""Owners can delete their own pastes."""
|
|
create = client.post(
|
|
"/",
|
|
data=sample_text,
|
|
content_type="text/plain",
|
|
headers=auth_header,
|
|
)
|
|
paste_id = json.loads(create.data)["id"]
|
|
|
|
response = client.delete(f"/{paste_id}", headers=auth_header)
|
|
assert response.status_code == 200
|
|
|
|
|
|
class TestJsonResponses:
|
|
"""Tests for JSON response format and encoding."""
|
|
|
|
def test_json_content_type(self, client):
|
|
"""API responses have correct Content-Type."""
|
|
response = client.get("/")
|
|
assert "application/json" in response.content_type
|
|
|
|
def test_unicode_in_response(self, client):
|
|
"""Unicode content is properly encoded in responses."""
|
|
unicode_text = "Hello 你好 مرحبا 🎉"
|
|
create = client.post("/", data=unicode_text.encode("utf-8"))
|
|
assert create.status_code == 201
|
|
|
|
paste_id = json.loads(create.data)["id"]
|
|
raw = client.get(f"/{paste_id}/raw")
|
|
assert raw.data.decode("utf-8") == unicode_text
|
|
|
|
def test_error_responses_are_json(self, client):
|
|
"""Error responses are valid JSON."""
|
|
response = client.get("/nonexist1234") # 12 chars
|
|
assert response.status_code in (400, 404)
|
|
data = json.loads(response.data)
|
|
assert "error" in data
|