"""Tests for content-hash based abuse prevention."""

import hashlib
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import pytest

from app import create_app
from app.database import check_content_hash, cleanup_expired_hashes, get_db


class TestContentDedup:
    """Test content deduplication throttling."""

    @pytest.fixture
    def strict_app(self):
        """Create app with strict dedup settings for testing."""
        app = create_app("testing")
        app.config["CONTENT_DEDUP_WINDOW"] = 3600  # 1 hour
        app.config["CONTENT_DEDUP_MAX"] = 3  # max 3 per window
        return app

    @pytest.fixture
    def strict_client(self, strict_app):
        """Create test client with strict dedup."""
        return strict_app.test_client()

    def test_first_submission_allowed(self, strict_client):
        """First submission of content should always succeed."""
        response = strict_client.post("/", data=b"unique content 1")
        assert response.status_code == 201

    def test_duplicate_within_threshold_allowed(self, strict_client):
        """Duplicate submissions within threshold should succeed."""
        content = b"unique content 2"

        # First 3 submissions should succeed
        for i in range(3):
            response = strict_client.post("/", data=content)
            assert response.status_code == 201, f"Submission {i + 1} failed"

    def test_duplicate_exceeds_threshold_rejected(self, strict_client):
        """Fourth duplicate within window should be rejected."""
        content = b"unique content 3"

        # First 3 succeed
        for _ in range(3):
            response = strict_client.post("/", data=content)
            assert response.status_code == 201

        # Fourth should fail with 429
        response = strict_client.post("/", data=content)
        assert response.status_code == 429

        data = response.get_json()
        assert data["error"] == "Duplicate content rate limit exceeded"
        assert data["count"] == 3
        assert "window_seconds" in data

    def test_different_content_not_affected(self, strict_client):
        """Different content should not be affected by other dedup limits."""
        # Max out one content
        content1 = b"content type A"
        for _ in range(3):
            strict_client.post("/", data=content1)

        # Different content should still work
        content2 = b"content type B"
        response = strict_client.post("/", data=content2)
        assert response.status_code == 201

    def test_dedup_response_format(self, strict_client):
        """Verify 429 response format for dedup errors."""
        content = b"unique content 4"

        # Exhaust limit
        for _ in range(3):
            strict_client.post("/", data=content)

        response = strict_client.post("/", data=content)
        assert response.status_code == 429
        assert response.content_type == "application/json"

        data = response.get_json()
        assert "error" in data
        assert "count" in data
        assert "window_seconds" in data


class TestContentHashDatabase:
    """Test content hash database operations."""

    @pytest.fixture
    def app_context(self):
        """Create app context for database tests."""
        app = create_app("testing")
        app.config["CONTENT_DEDUP_WINDOW"] = 3600
        app.config["CONTENT_DEDUP_MAX"] = 3
        with app.app_context():
            yield app

    def test_check_content_hash_first_time(self, app_context):
        """First check for a hash should return allowed with count 1."""
        content_hash = hashlib.sha256(b"new content").hexdigest()
        is_allowed, count = check_content_hash(content_hash)

        assert is_allowed is True
        assert count == 1

    def test_check_content_hash_increments(self, app_context):
        """Subsequent checks should increment counter."""
        content_hash = hashlib.sha256(b"incrementing content").hexdigest()

        is_allowed1, count1 = check_content_hash(content_hash)
        assert is_allowed1 is True
        assert count1 == 1

        is_allowed2, count2 = check_content_hash(content_hash)
        assert is_allowed2 is True
        assert count2 == 2

        is_allowed3, count3 = check_content_hash(content_hash)
        assert is_allowed3 is True
        assert count3 == 3

    def test_check_content_hash_threshold(self, app_context):
        """Check should fail after threshold exceeded."""
        content_hash = hashlib.sha256(b"threshold content").hexdigest()

        # Use up threshold
        for _ in range(3):
            check_content_hash(content_hash)

        # Fourth should fail
        is_allowed, count = check_content_hash(content_hash)
        assert is_allowed is False
        assert count == 3  # Count stays at 3, not incremented

    def test_hash_record_persists(self, app_context):
        """Hash records should persist in database."""
        content_hash = hashlib.sha256(b"persistent content").hexdigest()
        check_content_hash(content_hash)

        # Query database directly
        db = get_db()
        row = db.execute(
            "SELECT hash, count FROM content_hashes WHERE hash = ?", (content_hash,)
        ).fetchone()

        assert row is not None
        assert row["hash"] == content_hash
        assert row["count"] == 1


class TestContentHashCleanup:
    """Test cleanup of expired content hashes."""

    @pytest.fixture
    def app_context(self):
        """Create app context for cleanup tests."""
        app = create_app("testing")
        app.config["CONTENT_DEDUP_WINDOW"] = 1  # 1 second window
        app.config["CONTENT_DEDUP_MAX"] = 3
        with app.app_context():
            yield app

    def test_cleanup_expired_hashes(self, app_context):
        """Expired hashes should be cleaned up."""
        content_hash = hashlib.sha256(b"expiring content").hexdigest()
        check_content_hash(content_hash)

        # Wait for expiry (2 seconds to be safe)
        time.sleep(2)

        # Cleanup should remove it
        deleted = cleanup_expired_hashes()
        assert deleted >= 1

        # Verify removed
        db = get_db()
        row = db.execute(
            "SELECT * FROM content_hashes WHERE hash = ?", (content_hash,)
        ).fetchone()
        assert row is None

    def test_cleanup_keeps_recent(self, app_context):
        """Recent hashes should not be cleaned up."""
        # Override the fixture's 1-second window so the record is still fresh.
        app_context.config["CONTENT_DEDUP_WINDOW"] = 3600  # 1 hour

        content_hash = hashlib.sha256(b"recent content").hexdigest()
        check_content_hash(content_hash)

        # Cleanup should not remove it
        cleanup_expired_hashes()

        # Verify still present
        db = get_db()
        row = db.execute(
            "SELECT * FROM content_hashes WHERE hash = ?", (content_hash,)
        ).fetchone()
        assert row is not None


class TestWindowReset:
    """Test that dedup counter resets after window expires."""

    def test_counter_resets_after_window(self):
        """Counter should reset after window expires."""
        app = create_app("testing")
        app.config["CONTENT_DEDUP_WINDOW"] = 1  # 1 second window
        app.config["CONTENT_DEDUP_MAX"] = 2

        with app.app_context():
            content_hash = hashlib.sha256(b"resetting content").hexdigest()

            # Use up threshold
            check_content_hash(content_hash)
            check_content_hash(content_hash)

            # Should be blocked now
            is_allowed, _ = check_content_hash(content_hash)
            assert is_allowed is False

            # Wait for window to expire (2 seconds to be safe)
            time.sleep(2)

            # Should be allowed again
            is_allowed, count = check_content_hash(content_hash)
            assert is_allowed is True
            assert count == 1  # Counter reset


class TestMinimumSizeEnforcement:
    """Test minimum paste size requirement."""

    @pytest.fixture
    def minsize_app(self):
        """Create app with minimum size requirement enabled."""
        app = create_app("testing")
        app.config["MIN_PASTE_SIZE"] = 64  # Require 64 bytes minimum
        return app

    @pytest.fixture
    def minsize_client(self, minsize_app):
        """Create test client with minimum size requirement."""
        return minsize_app.test_client()

    def test_small_content_rejected(self, minsize_client):
        """Content below minimum size should be rejected."""
        response = minsize_client.post("/", data=b"too small")
        assert response.status_code == 400

        data = response.get_json()
        assert data["error"] == "Paste too small"
        assert data["size"] == 9
        assert data["min_size"] == 64
        assert "hint" in data

    def test_content_at_minimum_accepted(self, minsize_client):
        """Content at minimum size should be accepted."""
        content = b"x" * 64
        response = minsize_client.post("/", data=content)
        assert response.status_code == 201

    def test_content_above_minimum_accepted(self, minsize_client):
        """Content above minimum size should be accepted."""
        content = b"x" * 128
        response = minsize_client.post("/", data=content)
        assert response.status_code == 201

    def test_minsize_disabled_by_default(self, client):
        """Minimum size check should be disabled by default (MIN_PASTE_SIZE=0)."""
        response = client.post("/", data=b"x")
        assert response.status_code == 201


class TestBinaryRequirement:
    """Test binary content requirement (MIME-based encryption enforcement)."""

    @pytest.fixture
    def binary_app(self):
        """Create app with binary requirement enabled."""
        app = create_app("testing")
        app.config["REQUIRE_BINARY"] = True
        return app

    @pytest.fixture
    def binary_client(self, binary_app):
        """Create test client with binary requirement."""
        return binary_app.test_client()

    def test_plaintext_rejected(self, binary_client):
        """UTF-8 text should be rejected when binary required."""
        response = binary_client.post("/", data=b"Hello, this is plaintext")
        assert response.status_code == 400

        data = response.get_json()
        assert data["error"] == "Recognizable format not allowed"
        assert data["detected"] == "text/plain"
        assert "hint" in data

    def test_png_accepted_as_binary(self, binary_client):
        """PNG content accepted as unrecognized binary (magic detection disabled)."""
        # PNG signature: 89 50 4E 47 0D 0A 1A 0A
        png_content = b"\x89PNG\r\n\x1a\n" + b"\x00" * 100
        response = binary_client.post("/", data=png_content)

        # With magic detection disabled, PNG bytes are just binary
        assert response.status_code == 201
        data = response.get_json()
        assert data["mime_type"] == "application/octet-stream"

    def test_jpeg_accepted_as_binary(self, binary_client):
        """JPEG content accepted as unrecognized binary (magic detection disabled)."""
        jpeg_content = b"\xff\xd8\xff" + b"\x00" * 100
        response = binary_client.post("/", data=jpeg_content)

        # With magic detection disabled, JPEG bytes are just binary
        assert response.status_code == 201
        data = response.get_json()
        assert data["mime_type"] == "application/octet-stream"

    def test_random_binary_accepted(self, binary_client):
        """Random binary data (encrypted) should be accepted."""
        random_data = os.urandom(256)  # High entropy, no magic bytes
        response = binary_client.post("/", data=random_data)
        assert response.status_code == 201

    def test_binary_disabled_by_default(self, client):
        """Binary requirement should be disabled by default."""
        response = client.post("/", data=b"plaintext is fine by default")
        assert response.status_code == 201


class TestEntropyEnforcement:
    """Test minimum entropy requirement."""

    @pytest.fixture
    def entropy_app(self):
        """Create app with entropy requirement enabled."""
        app = create_app("testing")
        app.config["MIN_ENTROPY"] = 6.0  # Require high entropy
        return app

    @pytest.fixture
    def entropy_client(self, entropy_app):
        """Create test client with entropy requirement."""
        return entropy_app.test_client()

    def test_plaintext_rejected(self, entropy_client):
        """Plaintext content should be rejected when entropy required."""
        # Must be >= MIN_ENTROPY_SIZE (256 bytes) to trigger check
        plaintext = b"Hello, this is plain English text. " * 10  # ~350 bytes
        response = entropy_client.post(
            "/",
            data=plaintext,
            content_type="text/plain",
        )
        assert response.status_code == 400

        data = response.get_json()
        assert data["error"] == "Content entropy too low"
        assert "entropy" in data
        assert "min_entropy" in data
        assert "hint" in data

    def test_random_data_accepted(self, entropy_client):
        """Random/encrypted data should pass entropy check."""
        random_data = os.urandom(512)  # High entropy random bytes
        response = entropy_client.post(
            "/",
            data=random_data,
            content_type="application/octet-stream",
        )
        assert response.status_code == 201

    def test_entropy_disabled_by_default(self, client, sample_text):
        """Entropy check should be disabled by default (MIN_ENTROPY=0)."""
        # Default testing config has MIN_ENTROPY=0
        response = client.post(
            "/",
            data=sample_text,
            content_type="text/plain",
        )
        assert response.status_code == 201

    def test_repeated_bytes_rejected(self, entropy_client):
        """Repeated bytes have zero entropy and should be rejected."""
        # Must be >= MIN_ENTROPY_SIZE (256 bytes) to trigger check
        response = entropy_client.post(
            "/",
            data=b"a" * 500,
            content_type="text/plain",
        )
        assert response.status_code == 400

        data = response.get_json()
        assert data["entropy"] == 0.0

    def test_small_content_exempt(self, entropy_client):
        """Small content should be exempt from entropy check."""
        # Content < MIN_ENTROPY_SIZE (256 bytes) should pass
        response = entropy_client.post(
            "/",
            data=b"Small plaintext content",
            content_type="text/plain",
        )
        assert response.status_code == 201


class TestEntropyConfigValidation:
    """Test entropy config validation and bounds checking."""

    def test_min_entropy_clamped_to_valid_range(self):
        """MIN_ENTROPY should be clamped to [0, 8] range."""
        from app.config import Config

        # Verify clamping logic works (config uses max(0, min(8, value)))
        assert 0.0 <= Config.MIN_ENTROPY <= 8.0

    def test_min_entropy_size_positive(self):
        """MIN_ENTROPY_SIZE should be at least 1."""
        from app.config import Config

        assert Config.MIN_ENTROPY_SIZE >= 1


class TestConcurrentSubmissions:
    """Test concurrent identical submissions handling.

    Note: SQLite with in-memory shared cache has limited concurrency support.
    These tests verify that:
    1. The dedup system doesn't allow MORE than threshold (security)
    2. Database integrity is maintained under concurrent load
    3. Sequential access properly enforces thresholds

    Production deployments using file-based SQLite with WAL mode have better
    concurrent write handling.
    """

    @pytest.fixture
    def concurrent_app(self):
        """Create app with strict dedup for concurrency testing."""
        app = create_app("testing")
        app.config["CONTENT_DEDUP_WINDOW"] = 3600
        app.config["CONTENT_DEDUP_MAX"] = 5
        return app

    @pytest.mark.skip(
        reason="SQLite in-memory shared cache has severe concurrent write limitations. "
        "This test documents expected behavior with file-based SQLite + WAL mode."
    )
    def test_concurrent_identical_submissions_limited(self, concurrent_app):
        """Concurrent identical submissions should not exceed threshold.

        Under concurrent load with proper database configuration (file-based
        SQLite with WAL mode), the dedup system should properly limit
        concurrent identical submissions.

        Note: This test is skipped because in-memory SQLite shared cache
        cannot handle concurrent writes.
        """
        content = b"concurrent test content xyz"
        num_threads = 10

        def submit_paste():
            """Submit paste."""
            with concurrent_app.test_client() as client:
                response = client.post("/", data=content)
                return response.status_code

        with ThreadPoolExecutor(max_workers=num_threads) as executor:
            futures = [executor.submit(submit_paste) for _ in range(num_threads)]
            results = [f.result() for f in as_completed(futures)]

        # Count outcomes
        successes = results.count(201)
        rejections = results.count(429)
        errors = results.count(500)

        # Critical security property: never exceed threshold
        max_allowed = concurrent_app.config["CONTENT_DEDUP_MAX"]
        assert successes <= max_allowed, f"Exceeded threshold: {successes} > {max_allowed}"
        assert successes >= 1, "At least one submission should succeed"
        assert successes + rejections + errors == num_threads

    @pytest.mark.skip(
        reason="SQLite in-memory shared cache has severe concurrent write limitations. "
        "This test documents expected behavior with file-based SQLite + WAL mode."
    )
    def test_concurrent_different_content_mostly_succeed(self, concurrent_app):
        """Concurrent submissions of different content should mostly succeed.

        With proper database configuration, unique content submissions should
        succeed without blocking each other.

        Note: This test is skipped because in-memory SQLite shared cache
        cannot handle concurrent writes.
        """
        num_threads = 8

        def submit_unique_paste(thread_id):
            """Submit unique content per thread."""
            with concurrent_app.test_client() as client:
                content = f"unique content for thread {thread_id}".encode()
                response = client.post("/", data=content)
                return response.status_code

        with ThreadPoolExecutor(max_workers=num_threads) as executor:
            futures = [executor.submit(submit_unique_paste, i) for i in range(num_threads)]
            results = [f.result() for f in as_completed(futures)]

        # All unique content should succeed
        successes = results.count(201)
        assert successes == num_threads, f"Expected {num_threads} successes, got {successes}"

    def test_sequential_check_content_hash_database_integrity(self, concurrent_app):
        """Sequential check_content_hash calls should maintain database integrity."""
        content_hash = hashlib.sha256(b"sequential db test").hexdigest()
        max_allowed = concurrent_app.config["CONTENT_DEDUP_MAX"]

        with concurrent_app.app_context():
            # Sequential checks should work correctly; each entry is the
            # (is_allowed, count) tuple returned by check_content_hash.
            results = [check_content_hash(content_hash) for _ in range(max_allowed + 5)]

        # First max_allowed should succeed
        for attempt, (is_allowed, count) in enumerate(results[:max_allowed], start=1):
            assert is_allowed is True, f"Check {attempt} should be allowed"
            assert count == attempt, f"Check {attempt} should have count {attempt}"

        # Rest should be denied
        for attempt, (is_allowed, count) in enumerate(
            results[max_allowed:], start=max_allowed + 1
        ):
            assert is_allowed is False, f"Check {attempt} should be denied"
            assert count == max_allowed, f"Count should stay at {max_allowed}"

        # Verify final database state
        with concurrent_app.app_context():
            db = get_db()
            row = db.execute(
                "SELECT count FROM content_hashes WHERE hash = ?", (content_hash,)
            ).fetchone()
            assert row is not None
            assert row["count"] == max_allowed

    def test_rapid_sequential_submissions(self, concurrent_app):
        """Rapid sequential submissions should properly enforce threshold."""
        content = b"rapid sequential content"
        num_submissions = 10

        # Derive expectations from the fixture config so the test stays in
        # step with CONTENT_DEDUP_MAX instead of duplicating its value.
        max_allowed = concurrent_app.config["CONTENT_DEDUP_MAX"]
        expected_rejections = num_submissions - max_allowed

        with concurrent_app.test_client() as client:
            results = [
                client.post("/", data=content).status_code for _ in range(num_submissions)
            ]

        successes = results.count(201)
        rejections = results.count(429)

        assert successes == max_allowed, f"Expected {max_allowed} successes, got {successes}"
        assert (
            rejections == expected_rejections
        ), f"Expected {expected_rejections} rejections, got {rejections}"

        # First max_allowed should succeed, rest should fail
        assert results[:max_allowed] == [201] * max_allowed
        assert results[max_allowed:] == [429] * expected_rejections

    def test_staggered_concurrent_submissions(self, concurrent_app):
        """Staggered concurrent submissions with slight delays."""
        content = b"staggered concurrent content"
        num_submissions = 10

        def submit_with_delay(delay_ms):
            """Submit paste after small delay."""
            time.sleep(delay_ms / 1000)
            with concurrent_app.test_client() as client:
                response = client.post("/", data=content)
                return response.status_code

        with ThreadPoolExecutor(max_workers=num_submissions) as executor:
            # Stagger submissions by 10ms each
            futures = [
                executor.submit(submit_with_delay, i * 10) for i in range(num_submissions)
            ]
            results = [f.result() for f in as_completed(futures)]

        successes = results.count(201)
        rejections = results.count(429)
        errors = results.count(500)

        # With staggered timing, most should complete successfully
        max_allowed = concurrent_app.config["CONTENT_DEDUP_MAX"]
        assert successes <= max_allowed, f"Exceeded threshold: {successes} > {max_allowed}"
        assert successes >= 1, "At least one submission should succeed"

        # Verify reasonable behavior
        assert successes + rejections + errors == num_submissions