Files
flaskpaste/tests/test_abuse_prevention.py
Username 202e927918 add content-hash dedup for abuse prevention
Throttle repeated submissions of identical content using SHA256 hash
tracking. Configurable via FLASKPASTE_DEDUP_WINDOW and FLASKPASTE_DEDUP_MAX.
2025-12-20 03:31:20 +01:00

234 lines
7.7 KiB
Python

"""Tests for content-hash based abuse prevention."""
import hashlib
import time
import pytest
from app import create_app
from app.database import check_content_hash, cleanup_expired_hashes, get_db
class TestContentDedup:
    """Test content deduplication throttling through the HTTP endpoint.

    The fixtures configure a one-hour window allowing at most ``DEDUP_MAX``
    identical submissions; each test posts its own distinct payload to ``/``.
    """

    # Limits written into the app config by ``strict_app``. The tests reuse
    # them so loop counts and assertions cannot drift from the configuration.
    DEDUP_WINDOW = 3600  # 1 hour
    DEDUP_MAX = 3  # max 3 per window

    @pytest.fixture
    def strict_app(self):
        """Create app with strict dedup settings for testing."""
        app = create_app("testing")
        app.config["CONTENT_DEDUP_WINDOW"] = self.DEDUP_WINDOW
        app.config["CONTENT_DEDUP_MAX"] = self.DEDUP_MAX
        return app

    @pytest.fixture
    def strict_client(self, strict_app):
        """Create test client with strict dedup."""
        return strict_app.test_client()

    def test_first_submission_allowed(self, strict_client):
        """First submission of content should always succeed."""
        response = strict_client.post("/", data=b"unique content 1")
        assert response.status_code == 201

    def test_duplicate_within_threshold_allowed(self, strict_client):
        """Duplicate submissions within threshold should succeed."""
        content = b"unique content 2"

        # Every submission up to the configured maximum should succeed.
        for attempt in range(1, self.DEDUP_MAX + 1):
            response = strict_client.post("/", data=content)
            assert response.status_code == 201, f"Submission {attempt} failed"

    def test_duplicate_exceeds_threshold_rejected(self, strict_client):
        """One duplicate beyond the maximum within the window is rejected."""
        content = b"unique content 3"

        # Submissions up to the maximum succeed.
        for _ in range(self.DEDUP_MAX):
            response = strict_client.post("/", data=content)
            assert response.status_code == 201

        # The next one should fail with 429.
        response = strict_client.post("/", data=content)
        assert response.status_code == 429

        data = response.get_json()
        assert data["error"] == "Duplicate content rate limit exceeded"
        assert data["count"] == self.DEDUP_MAX
        assert "window_seconds" in data

    def test_different_content_not_affected(self, strict_client):
        """Different content should not be affected by other dedup limits."""
        # Max out one content.
        content1 = b"content type A"
        for _ in range(self.DEDUP_MAX):
            strict_client.post("/", data=content1)

        # Different content should still work.
        content2 = b"content type B"
        response = strict_client.post("/", data=content2)
        assert response.status_code == 201

    def test_dedup_response_format(self, strict_client):
        """Verify 429 response format for dedup errors."""
        content = b"unique content 4"

        # Exhaust limit.
        for _ in range(self.DEDUP_MAX):
            strict_client.post("/", data=content)

        response = strict_client.post("/", data=content)
        assert response.status_code == 429
        assert response.content_type == "application/json"

        data = response.get_json()
        assert "error" in data
        assert "count" in data
        assert "window_seconds" in data
class TestContentHashDatabase:
    """Exercise ``check_content_hash`` and the rows it writes directly."""

    @pytest.fixture
    def app_context(self):
        """Yield a testing app (1 h window, max 3) inside its app context."""
        flask_app = create_app("testing")
        flask_app.config.update(
            CONTENT_DEDUP_WINDOW=3600,
            CONTENT_DEDUP_MAX=3,
        )
        with flask_app.app_context():
            yield flask_app

    def test_check_content_hash_first_time(self, app_context):
        """A hash seen for the first time is allowed and counted as 1."""
        digest = hashlib.sha256(b"new content").hexdigest()

        allowed, seen = check_content_hash(digest)

        assert allowed is True
        assert seen == 1

    def test_check_content_hash_increments(self, app_context):
        """Each further check within the limit bumps the counter by one."""
        digest = hashlib.sha256(b"incrementing content").hexdigest()

        # Three consecutive checks must report counts 1, 2 and 3, all allowed.
        for expected in (1, 2, 3):
            allowed, seen = check_content_hash(digest)
            assert allowed is True
            assert seen == expected

    def test_check_content_hash_threshold(self, app_context):
        """Once the limit is used up, the next check is refused."""
        digest = hashlib.sha256(b"threshold content").hexdigest()

        # Consume the three allowed checks.
        for _ in range(3):
            check_content_hash(digest)

        # The fourth check is refused and reports the count unchanged at 3.
        outcome = check_content_hash(digest)
        allowed, seen = outcome
        assert allowed is False
        assert seen == 3

    def test_hash_record_persists(self, app_context):
        """A checked hash is stored as a row in ``content_hashes``."""
        digest = hashlib.sha256(b"persistent content").hexdigest()
        check_content_hash(digest)

        # Read the row back straight from the database.
        query = "SELECT hash, count FROM content_hashes WHERE hash = ?"
        record = get_db().execute(query, (digest,)).fetchone()

        assert record is not None
        assert record["hash"] == digest
        assert record["count"] == 1
class TestContentHashCleanup:
    """Test cleanup of expired content hashes."""

    @pytest.fixture
    def app_context(self):
        """Create app context for cleanup tests (1 second dedup window)."""
        app = create_app("testing")
        app.config["CONTENT_DEDUP_WINDOW"] = 1  # 1 second window
        app.config["CONTENT_DEDUP_MAX"] = 3
        with app.app_context():
            yield app

    def test_cleanup_expired_hashes(self, app_context):
        """Expired hashes should be cleaned up."""
        content_hash = hashlib.sha256(b"expiring content").hexdigest()
        check_content_hash(content_hash)

        # Wait for expiry (2 seconds to be safe with the 1 second window).
        time.sleep(2)

        # Cleanup should remove it.
        deleted = cleanup_expired_hashes()
        assert deleted >= 1

        # Verify removed.
        db = get_db()
        row = db.execute(
            "SELECT * FROM content_hashes WHERE hash = ?",
            (content_hash,),
        ).fetchone()
        assert row is None

    def test_cleanup_keeps_recent(self, app_context):
        """Recent hashes should not be cleaned up."""
        # Override the fixture's 1 second window so the hash is still fresh.
        app_context.config["CONTENT_DEDUP_WINDOW"] = 3600  # 1 hour
        content_hash = hashlib.sha256(b"recent content").hexdigest()
        check_content_hash(content_hash)

        # Cleanup should not remove it. The deleted-row count is not asserted
        # here; only the survival of this hash is checked below.
        cleanup_expired_hashes()

        # Verify still present.
        db = get_db()
        row = db.execute(
            "SELECT * FROM content_hashes WHERE hash = ?",
            (content_hash,),
        ).fetchone()
        assert row is not None
class TestWindowReset:
    """Check that the dedup counter starts over once the window has passed."""

    def test_counter_resets_after_window(self):
        """A blocked hash is allowed again, with count 1, after the window."""
        flask_app = create_app("testing")
        flask_app.config.update(
            CONTENT_DEDUP_WINDOW=1,  # 1 second window
            CONTENT_DEDUP_MAX=2,
        )

        with flask_app.app_context():
            digest = hashlib.sha256(b"resetting content").hexdigest()

            # Consume both allowed checks.
            for _ in range(2):
                check_content_hash(digest)

            # The third check inside the window must be refused.
            blocked_result = check_content_hash(digest)
            allowed = blocked_result[0]
            assert allowed is False

            # Let the 1 second window lapse (2 seconds to be safe).
            time.sleep(2)

            # The hash is accepted again and its counter is back at 1.
            allowed, seen = check_content_hash(digest)
            assert allowed is True
            assert seen == 1