"""Hypothesis-based fuzzing tests for FlaskPaste API.""" import json import string from typing import ClassVar import pytest from hypothesis import HealthCheck, given, settings from hypothesis import strategies as st # Common health check suppressions for pytest fixtures FIXTURE_HEALTH_CHECKS = [HealthCheck.function_scoped_fixture, HealthCheck.too_slow] # Strategies for generating test data PRINTABLE = string.printable HEX_CHARS = string.hexdigits.lower() # Custom strategies paste_content = st.binary(min_size=0, max_size=50000) text_content = st.text(min_size=0, max_size=10000, alphabet=st.characters()) unicode_content = st.text( min_size=1, max_size=5000, alphabet=st.characters( whitelist_categories=("L", "N", "P", "S", "Z"), blacklist_characters="\x00", ), ) paste_id = st.text(min_size=1, max_size=50, alphabet=HEX_CHARS + "/<>\"'%;()&+-_.") header_value = st.text( min_size=0, max_size=500, alphabet=PRINTABLE.replace("\r", "").replace("\n", "") ) json_primitive = st.one_of( st.text(), st.integers(), st.floats(allow_nan=False), st.none(), st.booleans() ) class TestContentFuzzing: """Fuzz paste content handling.""" @settings(max_examples=200, suppress_health_check=FIXTURE_HEALTH_CHECKS) @given(content=paste_content) def test_binary_content_no_crash(self, client, content): """Binary content should never crash the server.""" response = client.post( "/", data=content, content_type="application/octet-stream", ) assert response.status_code in (201, 400, 413, 429, 503) @settings(max_examples=200, suppress_health_check=FIXTURE_HEALTH_CHECKS) @given(content=text_content) def test_text_content_no_crash(self, client, content): """Text content should never crash the server.""" response = client.post( "/", data=content.encode("utf-8", errors="replace"), content_type="text/plain", ) assert response.status_code in (201, 400, 413, 429, 503) @settings(max_examples=100, suppress_health_check=FIXTURE_HEALTH_CHECKS) @given(content=unicode_content) def test_unicode_roundtrip(self, client, content): """Unicode content should survive storage and retrieval.""" response = client.post( "/", data=content.encode("utf-8"), content_type="text/plain; charset=utf-8", ) if response.status_code == 201: data = json.loads(response.data) paste_id = data["id"] raw = client.get(f"/{paste_id}/raw") assert raw.status_code == 200 # Content should match (may have charset normalization) assert raw.data.decode("utf-8") == content class TestPasteIdFuzzing: """Fuzz paste ID handling.""" @settings(max_examples=300, suppress_health_check=FIXTURE_HEALTH_CHECKS) @given(paste_id=paste_id) def test_malformed_paste_id(self, client, paste_id): """Malformed paste IDs should return 400 or 404, never crash.""" try: response = client.get(f"/{paste_id}") except UnicodeError: # Some malformed paths cause IDNA encoding issues in URL normalization # This is expected behavior for invalid paths return # Should never return 500 assert response.status_code != 500 # Valid responses: 200 (found), 400 (invalid), 404 (not found), 308 (redirect) assert response.status_code in (200, 400, 404, 308) @settings(max_examples=200, suppress_health_check=FIXTURE_HEALTH_CHECKS) @given(paste_id=st.text(min_size=1, max_size=100, alphabet="/<>\"'%;()&+\\")) def test_special_chars_in_id(self, client, paste_id): """Special characters in paste ID should be handled safely.""" response = client.get(f"/{paste_id}") assert response.status_code in (200, 400, 404, 405, 308) # Verify response is valid JSON or HTML if response.content_type and "json" in response.content_type: try: json.loads(response.data) except json.JSONDecodeError: pytest.fail("Invalid JSON response") class TestHeaderFuzzing: """Fuzz HTTP header handling.""" @settings(max_examples=150, suppress_health_check=FIXTURE_HEALTH_CHECKS) @given(sha1=header_value) def test_auth_header_fuzz(self, client, sha1): """Arbitrary X-SSL-Client-SHA1 values should not crash.""" response = client.get("/", headers={"X-SSL-Client-SHA1": sha1}) assert response.status_code in (200, 400, 401, 403) @settings(max_examples=150, suppress_health_check=FIXTURE_HEALTH_CHECKS) @given(secret=header_value) def test_proxy_secret_fuzz(self, client, secret): """Arbitrary X-Proxy-Secret values should not crash.""" response = client.get("/", headers={"X-Proxy-Secret": secret}) assert response.status_code in (200, 400, 401, 403) @settings(max_examples=100, suppress_health_check=FIXTURE_HEALTH_CHECKS) @given(xff=st.lists(st.ip_addresses(), min_size=1, max_size=10)) def test_xff_chain_fuzz(self, client, xff): """X-Forwarded-For chains should be handled safely.""" xff_header = ", ".join(str(ip) for ip in xff) response = client.get("/", headers={"X-Forwarded-For": xff_header}) assert response.status_code in (200, 400, 429) @settings(max_examples=100, suppress_health_check=FIXTURE_HEALTH_CHECKS) @given(request_id=header_value) def test_request_id_fuzz(self, client, request_id): """Arbitrary X-Request-ID values should be echoed safely.""" response = client.get("/", headers={"X-Request-ID": request_id}) assert response.status_code == 200 # Should echo back or generate new one assert response.headers.get("X-Request-ID") is not None class TestMimeTypeFuzzing: """Fuzz MIME type handling.""" @settings(max_examples=150, suppress_health_check=FIXTURE_HEALTH_CHECKS) @given( mime=st.text( min_size=1, max_size=200, alphabet=PRINTABLE.replace("\r", "").replace("\n", "") ) ) def test_arbitrary_content_type(self, client, mime): """Arbitrary Content-Type headers should not crash.""" try: response = client.post( "/", data=b"test content", content_type=mime, ) assert response.status_code in (201, 400, 413, 429, 503) except UnicodeEncodeError: # Some strings can't be encoded as headers - that's fine pass class TestMimeDetectionFuzzing: """Property-based tests for MIME magic byte detection.""" # Known magic signatures mapped to expected MIME types MAGIC_SIGNATURES: ClassVar[list[tuple[bytes, str]]] = [ (b"\x89PNG\r\n\x1a\n", "image/png"), (b"\xff\xd8\xff", "image/jpeg"), (b"GIF87a", "image/gif"), (b"GIF89a", "image/gif"), (b"%PDF", "application/pdf"), (b"PK\x03\x04", "application/zip"), (b"\x1f\x8b", "application/gzip"), (b"fLaC", "audio/flac"), (b"OggS", "audio/ogg"), (b"ID3", "audio/mpeg"), (b"\x7fELF", "application/x-executable"), (b"MZ", "application/x-msdownload"), (b"BZh", "application/x-bzip2"), (b"7z\xbc\xaf\x27\x1c", "application/x-7z-compressed"), (b"SQLite format 3\x00", "application/x-sqlite3"), # HEIC/HEIF/AVIF (ftyp box format) (b"\x00\x00\x00\x18\x66\x74\x79\x70\x68\x65\x69\x63", "image/heic"), (b"\x00\x00\x00\x18\x66\x74\x79\x70\x6d\x69\x66\x31", "image/heif"), (b"\x00\x00\x00\x1c\x66\x74\x79\x70\x61\x76\x69\x66", "image/avif"), ] @settings(max_examples=100, suppress_health_check=FIXTURE_HEALTH_CHECKS) @given(suffix=st.binary(min_size=0, max_size=1000)) def test_magic_prefix_detection(self, client, suffix): """Magic bytes followed by arbitrary data should detect correctly.""" for magic, expected_mime in self.MAGIC_SIGNATURES: content = magic + suffix response = client.post( "/", data=content, content_type="application/octet-stream", ) if response.status_code == 201: data = json.loads(response.data) assert data["mime_type"] == expected_mime, ( f"Expected {expected_mime} for magic {magic!r}, got {data['mime_type']}" ) @settings(max_examples=200, suppress_health_check=FIXTURE_HEALTH_CHECKS) @given(content=st.binary(min_size=1, max_size=5000)) def test_random_binary_never_crashes(self, client, content): """Random binary content should never crash MIME detection.""" response = client.post( "/", data=content, content_type="application/octet-stream", ) assert response.status_code in (201, 400, 413, 429, 503) if response.status_code == 201: data = json.loads(response.data) # MIME type should always be a valid format assert "/" in data["mime_type"] assert len(data["mime_type"]) < 100 @settings(max_examples=100, suppress_health_check=FIXTURE_HEALTH_CHECKS) @given( magic=st.sampled_from([m for m, _ in MAGIC_SIGNATURES]), truncate=st.integers(min_value=1, max_value=10), ) def test_partial_magic_no_false_match(self, client, magic, truncate): """Truncated magic bytes should not produce false positive matches.""" if truncate >= len(magic): return # Skip if we'd use full magic partial = magic[:truncate] # Add random suffix that's clearly not the rest of the magic content = partial + b"\xff\xfe\xfd\xfc" * 10 response = client.post( "/", data=content, content_type="application/octet-stream", ) # Partial magic should not crash - may match different signature or fallback assert response.status_code in (201, 400, 413, 429, 503) @settings(max_examples=50, suppress_health_check=FIXTURE_HEALTH_CHECKS) @given( content=st.binary(min_size=100, max_size=1000), inject_pos=st.integers(min_value=20, max_value=80), ) def test_magic_not_at_start_ignored(self, client, content, inject_pos): """Magic bytes not at offset 0 should not trigger detection.""" # Inject PNG magic in middle of random data png_magic = b"\x89PNG\r\n\x1a\n" if inject_pos < len(content): modified = content[:inject_pos] + png_magic + content[inject_pos:] response = client.post( "/", data=modified, content_type="application/octet-stream", ) if response.status_code == 201: data = json.loads(response.data) # Should NOT detect as PNG (magic not at start) assert data["mime_type"] != "image/png" class TestJsonFuzzing: """Fuzz JSON endpoint handling.""" @settings(max_examples=100, suppress_health_check=FIXTURE_HEALTH_CHECKS, deadline=None) @given( data=st.dictionaries( st.text(min_size=1, max_size=50), json_primitive, min_size=0, max_size=20, ) ) def test_pki_ca_json_fuzz(self, client, data): """PKI CA endpoint should handle arbitrary JSON.""" response = client.post( "/pki/ca", data=json.dumps(data), content_type="application/json", ) # Should return structured response, not crash # 201 = CA created, 200 = already exists, 400 = bad request, 404 = PKI disabled assert response.status_code in (200, 201, 400, 401, 403, 404, 409) assert response.status_code != 500 @settings(max_examples=100, suppress_health_check=FIXTURE_HEALTH_CHECKS) @given( data=st.recursive( json_primitive, lambda children: st.lists(children, max_size=5) | st.dictionaries(st.text(max_size=10), children, max_size=5), max_leaves=20, ) ) def test_nested_json_fuzz(self, client, data): """Deeply nested JSON should not cause stack overflow.""" try: payload = json.dumps({"data": data}) except (ValueError, RecursionError): return # Can't serialize - skip response = client.post( "/pki/issue", data=payload, content_type="application/json", ) assert response.status_code != 500 or "error" in response.data.decode() class TestSizeLimitFuzzing: """Fuzz size limit enforcement.""" @settings( max_examples=50, suppress_health_check=[*FIXTURE_HEALTH_CHECKS, HealthCheck.data_too_large], ) @given(size=st.integers(min_value=0, max_value=2_000_000)) def test_size_boundary(self, client, size): """Size limits should be enforced consistently.""" # Generate content of specific size (capped to avoid memory issues) actual_size = min(size, 500_000) content = b"x" * actual_size response = client.post( "/", data=content, content_type="text/plain", ) # Should reject if too large, accept if within limits assert response.status_code in (201, 400, 413, 429, 503) class TestConcurrencyFuzzing: """Fuzz concurrent access patterns.""" @settings(max_examples=20, suppress_health_check=FIXTURE_HEALTH_CHECKS, deadline=None) @given(content=st.binary(min_size=10, max_size=1000)) def test_rapid_create_delete(self, client, content, auth_header): """Rapid create/delete should not corrupt state.""" # Create create = client.post( "/", data=content, content_type="application/octet-stream", headers=auth_header, ) if create.status_code != 201: return # Rate limited or other - skip data = json.loads(create.data) paste_id = data["id"] # Immediate delete delete = client.delete(f"/{paste_id}", headers=auth_header) assert delete.status_code in (200, 404) # Verify gone get = client.get(f"/{paste_id}") assert get.status_code in (404, 410) class TestInjectionFuzzing: """Fuzz for injection vulnerabilities.""" INJECTION_PAYLOADS: ClassVar[list[str]] = [ "' OR '1'='1", "'; DROP TABLE pastes; --", "{{7*7}}", "${7*7}", "<%=7*7%>", "", "{{constructor.constructor('return this')()}}", "../../../etc/passwd", "..\\..\\..\\windows\\system32\\config\\sam", "\x00", "\r\nX-Injected: header", "$(whoami)", "`whoami`", "; sleep 5", ] def test_sql_injection_in_content(self, client): """SQL injection payloads in content should be stored literally.""" for payload in self.INJECTION_PAYLOADS: response = client.post( "/", data=payload.encode("utf-8", errors="replace"), content_type="text/plain", ) if response.status_code == 201: data = json.loads(response.data) paste_id = data["id"] raw = client.get(f"/{paste_id}/raw") # Content should be stored literally, not executed assert raw.data.decode("utf-8", errors="replace") == payload def test_ssti_in_content(self, client): """SSTI payloads should not be evaluated.""" for payload in ["{{7*7}}", "${7*7}", "<%=7*7%>"]: response = client.post( "/", data=payload.encode(), content_type="text/plain", ) if response.status_code == 201: data = json.loads(response.data) paste_id = data["id"] raw = client.get(f"/{paste_id}/raw") # Should NOT contain "49" (7*7 evaluated) assert raw.data == payload.encode() class TestErrorHandlingFuzzing: """Fuzz error handling paths.""" @settings(max_examples=50, suppress_health_check=FIXTURE_HEALTH_CHECKS) @given(method=st.sampled_from(["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"])) def test_method_handling(self, client, method): """All HTTP methods should be handled without crash.""" response = client.open("/", method=method) # Should never return 500 assert response.status_code != 500 assert response.status_code in (200, 201, 204, 400, 404, 405, 413, 429) @settings(max_examples=50, suppress_health_check=FIXTURE_HEALTH_CHECKS) @given( path=st.text( min_size=1, max_size=500, alphabet=string.ascii_letters + string.digits + "/-_." ) ) def test_path_handling(self, client, path): """Arbitrary paths should not crash.""" response = client.get(f"/{path}") # Should never return 500 assert response.status_code != 500 assert response.status_code in (200, 400, 404, 405, 308)