"""Tests for content-hash based abuse prevention.""" import hashlib import time import pytest from app import create_app from app.database import check_content_hash, cleanup_expired_hashes, get_db class TestContentDedup: """Test content deduplication throttling.""" @pytest.fixture def strict_app(self): """Create app with strict dedup settings for testing.""" app = create_app("testing") app.config["CONTENT_DEDUP_WINDOW"] = 3600 # 1 hour app.config["CONTENT_DEDUP_MAX"] = 3 # max 3 per window return app @pytest.fixture def strict_client(self, strict_app): """Create test client with strict dedup.""" return strict_app.test_client() def test_first_submission_allowed(self, strict_client): """First submission of content should always succeed.""" response = strict_client.post("/", data=b"unique content 1") assert response.status_code == 201 def test_duplicate_within_threshold_allowed(self, strict_client): """Duplicate submissions within threshold should succeed.""" content = b"unique content 2" # First 3 submissions should succeed for i in range(3): response = strict_client.post("/", data=content) assert response.status_code == 201, f"Submission {i+1} failed" def test_duplicate_exceeds_threshold_rejected(self, strict_client): """Fourth duplicate within window should be rejected.""" content = b"unique content 3" # First 3 succeed for i in range(3): response = strict_client.post("/", data=content) assert response.status_code == 201 # Fourth should fail with 429 response = strict_client.post("/", data=content) assert response.status_code == 429 data = response.get_json() assert data["error"] == "Duplicate content rate limit exceeded" assert data["count"] == 3 assert "window_seconds" in data def test_different_content_not_affected(self, strict_client): """Different content should not be affected by other dedup limits.""" # Max out one content content1 = b"content type A" for _ in range(3): strict_client.post("/", data=content1) # Different content should still work content2 = b"content type B" response = strict_client.post("/", data=content2) assert response.status_code == 201 def test_dedup_response_format(self, strict_client): """Verify 429 response format for dedup errors.""" content = b"unique content 4" # Exhaust limit for _ in range(3): strict_client.post("/", data=content) response = strict_client.post("/", data=content) assert response.status_code == 429 assert response.content_type == "application/json" data = response.get_json() assert "error" in data assert "count" in data assert "window_seconds" in data class TestContentHashDatabase: """Test content hash database operations.""" @pytest.fixture def app_context(self): """Create app context for database tests.""" app = create_app("testing") app.config["CONTENT_DEDUP_WINDOW"] = 3600 app.config["CONTENT_DEDUP_MAX"] = 3 with app.app_context(): yield app def test_check_content_hash_first_time(self, app_context): """First check for a hash should return allowed with count 1.""" content_hash = hashlib.sha256(b"new content").hexdigest() is_allowed, count = check_content_hash(content_hash) assert is_allowed is True assert count == 1 def test_check_content_hash_increments(self, app_context): """Subsequent checks should increment counter.""" content_hash = hashlib.sha256(b"incrementing content").hexdigest() is_allowed1, count1 = check_content_hash(content_hash) assert is_allowed1 is True assert count1 == 1 is_allowed2, count2 = check_content_hash(content_hash) assert is_allowed2 is True assert count2 == 2 is_allowed3, count3 = check_content_hash(content_hash) assert is_allowed3 is True assert count3 == 3 def test_check_content_hash_threshold(self, app_context): """Check should fail after threshold exceeded.""" content_hash = hashlib.sha256(b"threshold content").hexdigest() # Use up threshold for _ in range(3): check_content_hash(content_hash) # Fourth should fail is_allowed, count = check_content_hash(content_hash) assert is_allowed is False assert count == 3 # Count stays at 3, not incremented def test_hash_record_persists(self, app_context): """Hash records should persist in database.""" content_hash = hashlib.sha256(b"persistent content").hexdigest() check_content_hash(content_hash) # Query database directly db = get_db() row = db.execute( "SELECT hash, count FROM content_hashes WHERE hash = ?", (content_hash,) ).fetchone() assert row is not None assert row["hash"] == content_hash assert row["count"] == 1 class TestContentHashCleanup: """Test cleanup of expired content hashes.""" @pytest.fixture def app_context(self): """Create app context for cleanup tests.""" app = create_app("testing") app.config["CONTENT_DEDUP_WINDOW"] = 1 # 1 second window app.config["CONTENT_DEDUP_MAX"] = 3 with app.app_context(): yield app def test_cleanup_expired_hashes(self, app_context): """Expired hashes should be cleaned up.""" content_hash = hashlib.sha256(b"expiring content").hexdigest() check_content_hash(content_hash) # Wait for expiry (2 seconds to be safe) time.sleep(2) # Cleanup should remove it deleted = cleanup_expired_hashes() assert deleted >= 1 # Verify removed db = get_db() row = db.execute( "SELECT * FROM content_hashes WHERE hash = ?", (content_hash,) ).fetchone() assert row is None def test_cleanup_keeps_recent(self, app_context): """Recent hashes should not be cleaned up.""" app_context.config["CONTENT_DEDUP_WINDOW"] = 3600 # 1 hour content_hash = hashlib.sha256(b"recent content").hexdigest() check_content_hash(content_hash) # Cleanup should not remove it deleted = cleanup_expired_hashes() # Verify still present db = get_db() row = db.execute( "SELECT * FROM content_hashes WHERE hash = ?", (content_hash,) ).fetchone() assert row is not None class TestWindowReset: """Test that dedup counter resets after window expires.""" def test_counter_resets_after_window(self): """Counter should reset after window expires.""" app = create_app("testing") app.config["CONTENT_DEDUP_WINDOW"] = 1 # 1 second window app.config["CONTENT_DEDUP_MAX"] = 2 with app.app_context(): content_hash = hashlib.sha256(b"resetting content").hexdigest() # Use up threshold check_content_hash(content_hash) check_content_hash(content_hash) # Should be blocked now is_allowed, _ = check_content_hash(content_hash) assert is_allowed is False # Wait for window to expire (2 seconds to be safe) time.sleep(2) # Should be allowed again is_allowed, count = check_content_hash(content_hash) assert is_allowed is True assert count == 1 # Counter reset