Files
flaskpaste/tests/test_fuzz.py
Username debdc8478e add hypothesis-based fuzzing test suite
18 property-based tests covering:
- Content handling (binary, text, unicode)
- Paste ID validation and path traversal
- Header fuzzing (auth, proxy, XFF)
- JSON endpoint fuzzing
- Size limit enforcement
- Injection detection (SQLi, SSTI, XSS)
- Error handling paths
2025-12-25 19:20:16 +01:00

346 lines
13 KiB
Python

"""Hypothesis-based fuzzing tests for FlaskPaste API."""
import json
import string
from typing import ClassVar
import pytest
from hypothesis import HealthCheck, given, settings
from hypothesis import strategies as st
# Common health check suppressions for pytest fixtures
FIXTURE_HEALTH_CHECKS = [HealthCheck.function_scoped_fixture, HealthCheck.too_slow]
# Strategies for generating test data
PRINTABLE = string.printable
HEX_CHARS = string.hexdigits.lower()
# Custom strategies
paste_content = st.binary(min_size=0, max_size=50000)
text_content = st.text(min_size=0, max_size=10000, alphabet=st.characters())
unicode_content = st.text(
min_size=1,
max_size=5000,
alphabet=st.characters(
whitelist_categories=("L", "N", "P", "S", "Z"),
blacklist_characters="\x00",
),
)
paste_id = st.text(min_size=1, max_size=50, alphabet=HEX_CHARS + "/<>\"'%;()&+-_.")
header_value = st.text(
min_size=0, max_size=500, alphabet=PRINTABLE.replace("\r", "").replace("\n", "")
)
json_primitive = st.one_of(
st.text(), st.integers(), st.floats(allow_nan=False), st.none(), st.booleans()
)
class TestContentFuzzing:
"""Fuzz paste content handling."""
@settings(max_examples=200, suppress_health_check=FIXTURE_HEALTH_CHECKS)
@given(content=paste_content)
def test_binary_content_no_crash(self, client, content):
"""Binary content should never crash the server."""
response = client.post(
"/",
data=content,
content_type="application/octet-stream",
)
assert response.status_code in (201, 400, 413, 429, 503)
@settings(max_examples=200, suppress_health_check=FIXTURE_HEALTH_CHECKS)
@given(content=text_content)
def test_text_content_no_crash(self, client, content):
"""Text content should never crash the server."""
response = client.post(
"/",
data=content.encode("utf-8", errors="replace"),
content_type="text/plain",
)
assert response.status_code in (201, 400, 413, 429, 503)
@settings(max_examples=100, suppress_health_check=FIXTURE_HEALTH_CHECKS)
@given(content=unicode_content)
def test_unicode_roundtrip(self, client, content):
"""Unicode content should survive storage and retrieval."""
response = client.post(
"/",
data=content.encode("utf-8"),
content_type="text/plain; charset=utf-8",
)
if response.status_code == 201:
data = json.loads(response.data)
paste_id = data["id"]
raw = client.get(f"/{paste_id}/raw")
assert raw.status_code == 200
# Content should match (may have charset normalization)
assert raw.data.decode("utf-8") == content
class TestPasteIdFuzzing:
"""Fuzz paste ID handling."""
@settings(max_examples=300, suppress_health_check=FIXTURE_HEALTH_CHECKS)
@given(paste_id=paste_id)
def test_malformed_paste_id(self, client, paste_id):
"""Malformed paste IDs should return 400 or 404, never crash."""
try:
response = client.get(f"/{paste_id}")
except UnicodeError:
# Some malformed paths cause IDNA encoding issues in URL normalization
# This is expected behavior for invalid paths
return
# Should never return 500
assert response.status_code != 500
# Valid responses: 200 (found), 400 (invalid), 404 (not found), 308 (redirect)
assert response.status_code in (200, 400, 404, 308)
@settings(max_examples=200, suppress_health_check=FIXTURE_HEALTH_CHECKS)
@given(paste_id=st.text(min_size=1, max_size=100, alphabet="/<>\"'%;()&+\\"))
def test_special_chars_in_id(self, client, paste_id):
"""Special characters in paste ID should be handled safely."""
response = client.get(f"/{paste_id}")
assert response.status_code in (200, 400, 404, 405, 308)
# Verify response is valid JSON or HTML
if response.content_type and "json" in response.content_type:
try:
json.loads(response.data)
except json.JSONDecodeError:
pytest.fail("Invalid JSON response")
class TestHeaderFuzzing:
"""Fuzz HTTP header handling."""
@settings(max_examples=150, suppress_health_check=FIXTURE_HEALTH_CHECKS)
@given(sha1=header_value)
def test_auth_header_fuzz(self, client, sha1):
"""Arbitrary X-SSL-Client-SHA1 values should not crash."""
response = client.get("/", headers={"X-SSL-Client-SHA1": sha1})
assert response.status_code in (200, 400, 401, 403)
@settings(max_examples=150, suppress_health_check=FIXTURE_HEALTH_CHECKS)
@given(secret=header_value)
def test_proxy_secret_fuzz(self, client, secret):
"""Arbitrary X-Proxy-Secret values should not crash."""
response = client.get("/", headers={"X-Proxy-Secret": secret})
assert response.status_code in (200, 400, 401, 403)
@settings(max_examples=100, suppress_health_check=FIXTURE_HEALTH_CHECKS)
@given(xff=st.lists(st.ip_addresses(), min_size=1, max_size=10))
def test_xff_chain_fuzz(self, client, xff):
"""X-Forwarded-For chains should be handled safely."""
xff_header = ", ".join(str(ip) for ip in xff)
response = client.get("/", headers={"X-Forwarded-For": xff_header})
assert response.status_code in (200, 400, 429)
@settings(max_examples=100, suppress_health_check=FIXTURE_HEALTH_CHECKS)
@given(request_id=header_value)
def test_request_id_fuzz(self, client, request_id):
"""Arbitrary X-Request-ID values should be echoed safely."""
response = client.get("/", headers={"X-Request-ID": request_id})
assert response.status_code == 200
# Should echo back or generate new one
assert response.headers.get("X-Request-ID") is not None
class TestMimeTypeFuzzing:
"""Fuzz MIME type handling."""
@settings(max_examples=150, suppress_health_check=FIXTURE_HEALTH_CHECKS)
@given(
mime=st.text(
min_size=1, max_size=200, alphabet=PRINTABLE.replace("\r", "").replace("\n", "")
)
)
def test_arbitrary_content_type(self, client, mime):
"""Arbitrary Content-Type headers should not crash."""
try:
response = client.post(
"/",
data=b"test content",
content_type=mime,
)
assert response.status_code in (201, 400, 413, 429, 503)
except UnicodeEncodeError:
# Some strings can't be encoded as headers - that's fine
pass
class TestJsonFuzzing:
"""Fuzz JSON endpoint handling."""
@settings(max_examples=100, suppress_health_check=FIXTURE_HEALTH_CHECKS, deadline=None)
@given(
data=st.dictionaries(
st.text(min_size=1, max_size=50),
json_primitive,
min_size=0,
max_size=20,
)
)
def test_pki_ca_json_fuzz(self, client, data):
"""PKI CA endpoint should handle arbitrary JSON."""
response = client.post(
"/pki/ca",
data=json.dumps(data),
content_type="application/json",
)
# Should return structured response, not crash
# 201 = CA created, 200 = already exists, 400 = bad request, 404 = PKI disabled
assert response.status_code in (200, 201, 400, 401, 403, 404, 409)
assert response.status_code != 500
@settings(max_examples=100, suppress_health_check=FIXTURE_HEALTH_CHECKS)
@given(
data=st.recursive(
json_primitive,
lambda children: st.lists(children, max_size=5)
| st.dictionaries(st.text(max_size=10), children, max_size=5),
max_leaves=20,
)
)
def test_nested_json_fuzz(self, client, data):
"""Deeply nested JSON should not cause stack overflow."""
try:
payload = json.dumps({"data": data})
except (ValueError, RecursionError):
return # Can't serialize - skip
response = client.post(
"/pki/issue",
data=payload,
content_type="application/json",
)
assert response.status_code != 500 or "error" in response.data.decode()
class TestSizeLimitFuzzing:
"""Fuzz size limit enforcement."""
@settings(
max_examples=50,
suppress_health_check=[*FIXTURE_HEALTH_CHECKS, HealthCheck.data_too_large],
)
@given(size=st.integers(min_value=0, max_value=2_000_000))
def test_size_boundary(self, client, size):
"""Size limits should be enforced consistently."""
# Generate content of specific size (capped to avoid memory issues)
actual_size = min(size, 500_000)
content = b"x" * actual_size
response = client.post(
"/",
data=content,
content_type="text/plain",
)
# Should reject if too large, accept if within limits
assert response.status_code in (201, 400, 413, 429, 503)
class TestConcurrencyFuzzing:
"""Fuzz concurrent access patterns."""
@settings(max_examples=20, suppress_health_check=FIXTURE_HEALTH_CHECKS, deadline=None)
@given(content=st.binary(min_size=10, max_size=1000))
def test_rapid_create_delete(self, client, content, auth_header):
"""Rapid create/delete should not corrupt state."""
# Create
create = client.post(
"/",
data=content,
content_type="application/octet-stream",
headers=auth_header,
)
if create.status_code != 201:
return # Rate limited or other - skip
data = json.loads(create.data)
paste_id = data["id"]
# Immediate delete
delete = client.delete(f"/{paste_id}", headers=auth_header)
assert delete.status_code in (200, 404)
# Verify gone
get = client.get(f"/{paste_id}")
assert get.status_code in (404, 410)
class TestInjectionFuzzing:
"""Fuzz for injection vulnerabilities."""
INJECTION_PAYLOADS: ClassVar[list[str]] = [
"' OR '1'='1",
"'; DROP TABLE pastes; --",
"{{7*7}}",
"${7*7}",
"<%=7*7%>",
"<script>alert(1)</script>",
"{{constructor.constructor('return this')()}}",
"../../../etc/passwd",
"..\\..\\..\\windows\\system32\\config\\sam",
"\x00",
"\r\nX-Injected: header",
"$(whoami)",
"`whoami`",
"; sleep 5",
]
def test_sql_injection_in_content(self, client):
"""SQL injection payloads in content should be stored literally."""
for payload in self.INJECTION_PAYLOADS:
response = client.post(
"/",
data=payload.encode("utf-8", errors="replace"),
content_type="text/plain",
)
if response.status_code == 201:
data = json.loads(response.data)
paste_id = data["id"]
raw = client.get(f"/{paste_id}/raw")
# Content should be stored literally, not executed
assert raw.data.decode("utf-8", errors="replace") == payload
def test_ssti_in_content(self, client):
"""SSTI payloads should not be evaluated."""
for payload in ["{{7*7}}", "${7*7}", "<%=7*7%>"]:
response = client.post(
"/",
data=payload.encode(),
content_type="text/plain",
)
if response.status_code == 201:
data = json.loads(response.data)
paste_id = data["id"]
raw = client.get(f"/{paste_id}/raw")
# Should NOT contain "49" (7*7 evaluated)
assert raw.data == payload.encode()
class TestErrorHandlingFuzzing:
"""Fuzz error handling paths."""
@settings(max_examples=50, suppress_health_check=FIXTURE_HEALTH_CHECKS)
@given(method=st.sampled_from(["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"]))
def test_method_handling(self, client, method):
"""All HTTP methods should be handled without crash."""
response = client.open("/", method=method)
# Should never return 500
assert response.status_code != 500
assert response.status_code in (200, 201, 204, 400, 404, 405, 413, 429)
@settings(max_examples=50, suppress_health_check=FIXTURE_HEALTH_CHECKS)
@given(
path=st.text(
min_size=1, max_size=500, alphabet=string.ascii_letters + string.digits + "/-_."
)
)
def test_path_handling(self, client, path):
"""Arbitrary paths should not crash."""
response = client.get(f"/{path}")
# Should never return 500
assert response.status_code != 500
assert response.status_code in (200, 400, 404, 405, 308)