diff --git a/README.md b/README.md index 978b364..7a2c488 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,7 @@ A lightweight, secure pastebin REST API built with Flask. - **Client certificate authentication** - Optional auth via `X-SSL-Client-SHA1` header - **Automatic expiry** - Pastes expire after configurable period of inactivity - **Size limits** - Configurable limits for anonymous and authenticated users +- **Abuse prevention** - Content-hash deduplication throttles repeated identical submissions - **Security headers** - HSTS, CSP, X-Frame-Options, Cache-Control, and more - **Request tracing** - X-Request-ID support for log correlation - **Proxy trust validation** - Optional shared secret for defense-in-depth @@ -88,6 +89,8 @@ Configuration via environment variables: | `FLASKPASTE_MAX_ANON` | `3145728` (3 MiB) | Max paste size for anonymous users | | `FLASKPASTE_MAX_AUTH` | `52428800` (50 MiB) | Max paste size for authenticated users | | `FLASKPASTE_EXPIRY` | `432000` (5 days) | Paste expiry in seconds | +| `FLASKPASTE_DEDUP_WINDOW` | `3600` (1 hour) | Dedup throttle window in seconds | +| `FLASKPASTE_DEDUP_MAX` | `3` | Max identical submissions per window | | `FLASKPASTE_PROXY_SECRET` | (empty) | Shared secret for proxy trust validation | ## Authentication @@ -164,6 +167,7 @@ flaskpaste/ - **SQL injection protection** - Parameterized queries throughout - **Ownership enforcement** - Only owners can delete their pastes - **Size limits** - Prevents resource exhaustion attacks +- **Abuse prevention** - Content-hash deduplication prevents spam flooding - **Security headers** - HSTS, CSP, X-Frame-Options, X-Content-Type-Options, Cache-Control - **Request tracing** - X-Request-ID for log correlation and debugging - **Proxy trust** - Optional `X-Proxy-Secret` validation to prevent header spoofing diff --git a/app/api/routes.py b/app/api/routes.py index cec73c3..bdd6881 100644 --- a/app/api/routes.py +++ b/app/api/routes.py @@ -9,7 +9,7 @@ import time from flask 
import Response, current_app, request from app.api import bp -from app.database import get_db +from app.database import check_content_hash, get_db # Valid paste ID pattern (hexadecimal only) PASTE_ID_PATTERN = re.compile(r"^[a-f0-9]+$") @@ -205,6 +205,22 @@ def create_paste(): "authenticated": owner is not None, }, 413) + # Check content deduplication threshold + content_hash = hashlib.sha256(content).hexdigest() + is_allowed, dedup_count = check_content_hash(content_hash) + + if not is_allowed: + window = current_app.config["CONTENT_DEDUP_WINDOW"] + current_app.logger.warning( + "Dedup threshold exceeded: hash=%s count=%d from=%s", + content_hash[:16], dedup_count, request.remote_addr + ) + return _json_response({ + "error": "Duplicate content rate limit exceeded", + "count": dedup_count, + "window_seconds": window, + }, 429) + paste_id = _generate_id(content) now = int(time.time()) diff --git a/app/config.py b/app/config.py index f586728..15c1a28 100644 --- a/app/config.py +++ b/app/config.py @@ -19,6 +19,11 @@ class Config: # Paste expiry (default 5 days) PASTE_EXPIRY_SECONDS = int(os.environ.get("FLASKPASTE_EXPIRY", 5 * 24 * 60 * 60)) + # Content deduplication / abuse prevention + # Throttle repeated submissions of identical content + CONTENT_DEDUP_WINDOW = int(os.environ.get("FLASKPASTE_DEDUP_WINDOW", 3600)) # 1 hour + CONTENT_DEDUP_MAX = int(os.environ.get("FLASKPASTE_DEDUP_MAX", 3)) # max 3 per window + # Reverse proxy trust configuration # SECURITY: The X-SSL-Client-SHA1 header is trusted for authentication. 
def cleanup_expired_hashes() -> int:
    """Delete content-hash records that have aged out of the dedup window.

    A record is stale when its ``last_seen`` timestamp is older than
    ``now - CONTENT_DEDUP_WINDOW``. ``check_content_hash`` uses the same
    cutoff to decide that a counter has to restart, so removing these rows
    never changes the outcome of a later check.

    Returns:
        Number of rows removed from ``content_hashes``.
    """
    cutoff = int(time.time()) - current_app.config["CONTENT_DEDUP_WINDOW"]

    db = get_db()
    cursor = db.execute("DELETE FROM content_hashes WHERE last_seen < ?", (cutoff,))
    db.commit()

    return cursor.rowcount


def check_content_hash(content_hash: str) -> tuple[bool, int]:
    """Record one submission of ``content_hash`` and report if it is allowed.

    The counter is maintained with conditional SQL statements instead of a
    Python-side read-modify-write. The previous SELECT-then-INSERT/UPDATE
    sequence let two concurrent first submissions both attempt the INSERT
    (the second one violating the ``hash`` primary key) and let concurrent
    increments overwrite each other, so the limit could be exceeded.

    Args:
        content_hash: SHA256 hex digest of the submitted content.

    Returns:
        Tuple of ``(is_allowed, current_count)``. ``is_allowed`` is False
        when the hash already reached ``CONTENT_DEDUP_MAX`` submissions
        within ``CONTENT_DEDUP_WINDOW`` seconds of its last accepted
        submission; in that case the stored counter is returned unchanged.
    """
    window = current_app.config["CONTENT_DEDUP_WINDOW"]
    max_count = current_app.config["CONTENT_DEDUP_MAX"]
    now = int(time.time())
    cutoff = now - window

    # The first submission inside a window has always been accepted, even
    # with a non-positive configured maximum; keep that contract.
    limit = max(max_count, 1)

    db = get_db()

    # NOTE(review): assumes get_db() returns a sqlite3.Connection, whose
    # context manager commits on success and rolls back on an exception.
    with db:
        # Make sure a row exists. A concurrent insert of the same hash is
        # ignored rather than raising a primary-key conflict.
        db.execute(
            "INSERT OR IGNORE INTO content_hashes "
            "(hash, first_seen, last_seen, count) VALUES (?, ?, ?, 0)",
            (content_hash, now, now),
        )
        # Outside the window: restart the counter before counting this hit.
        db.execute(
            "UPDATE content_hashes SET first_seen = ?, last_seen = ?, count = 0 "
            "WHERE hash = ? AND last_seen < ?",
            (now, now, content_hash, cutoff),
        )
        # Count this submission only while below the limit. A rejected
        # submission leaves both count and last_seen untouched, so the
        # counter cannot grow without bound and the window is not extended.
        bump = db.execute(
            "UPDATE content_hashes SET last_seen = ?, count = count + 1 "
            "WHERE hash = ? AND count < ?",
            (now, content_hash, limit),
        )
        row = db.execute(
            "SELECT count FROM content_hashes WHERE hash = ?",
            (content_hash,),
        ).fetchone()

    is_allowed = bump.rowcount == 1
    # The row can only be missing if it was deleted between statements
    # (e.g. by a cleanup on another connection running in autocommit mode).
    current_count = row[0] if row is not None else 0

    return is_allowed, current_count
"""Tests for content-hash based abuse prevention.

Window expiry is simulated by backdating ``last_seen`` in the
``content_hashes`` table instead of sleeping, so the suite stays fast and
does not depend on wall-clock timing.
"""

import hashlib
import time

import pytest

from app import create_app
from app.database import check_content_hash, cleanup_expired_hashes, get_db

# Dedup settings applied on top of the "testing" configuration.
DEDUP_WINDOW = 3600  # seconds
DEDUP_MAX = 3  # identical submissions allowed per window


def _sha256(data: bytes) -> str:
    """Return the SHA256 hex digest used as the dedup key for ``data``."""
    return hashlib.sha256(data).hexdigest()


def _expire_hash(content_hash: str, window: int) -> None:
    """Backdate a hash record so it lies just outside the dedup window.

    Must be called inside an application context. The record's
    ``last_seen`` is moved one second past the window so that both
    ``check_content_hash`` and ``cleanup_expired_hashes`` treat it as stale.
    """
    stale = int(time.time()) - window - 1
    db = get_db()
    db.execute(
        "UPDATE content_hashes SET last_seen = ? WHERE hash = ?",
        (stale, content_hash),
    )
    db.commit()


def _fetch_hash_row(content_hash: str):
    """Return the ``content_hashes`` row for ``content_hash`` or None."""
    return get_db().execute(
        "SELECT hash, count FROM content_hashes WHERE hash = ?",
        (content_hash,),
    ).fetchone()


class TestContentDedup:
    """Test content deduplication throttling through the HTTP API."""

    @pytest.fixture
    def strict_app(self):
        """Create app with strict dedup settings for testing."""
        app = create_app("testing")
        app.config["CONTENT_DEDUP_WINDOW"] = DEDUP_WINDOW
        app.config["CONTENT_DEDUP_MAX"] = DEDUP_MAX
        return app

    @pytest.fixture
    def strict_client(self, strict_app):
        """Create test client with strict dedup."""
        return strict_app.test_client()

    def test_first_submission_allowed(self, strict_client):
        """First submission of content should always succeed."""
        response = strict_client.post("/", data=b"unique content 1")
        assert response.status_code == 201

    def test_duplicate_within_threshold_allowed(self, strict_client):
        """Duplicate submissions within threshold should succeed."""
        content = b"unique content 2"

        for attempt in range(1, DEDUP_MAX + 1):
            response = strict_client.post("/", data=content)
            assert response.status_code == 201, f"Submission {attempt} failed"

    def test_duplicate_exceeds_threshold_rejected(self, strict_client):
        """Fourth duplicate within window should be rejected."""
        content = b"unique content 3"

        for _ in range(DEDUP_MAX):
            response = strict_client.post("/", data=content)
            assert response.status_code == 201

        # One more than the limit must fail with 429
        response = strict_client.post("/", data=content)
        assert response.status_code == 429

        data = response.get_json()
        assert data["error"] == "Duplicate content rate limit exceeded"
        assert data["count"] == DEDUP_MAX
        assert "window_seconds" in data

    def test_different_content_not_affected(self, strict_client):
        """Different content should not be affected by other dedup limits."""
        # Max out one content; check the setup so a broken endpoint cannot
        # make this test pass vacuously.
        for _ in range(DEDUP_MAX):
            assert strict_client.post("/", data=b"content type A").status_code == 201

        # Different content should still work
        response = strict_client.post("/", data=b"content type B")
        assert response.status_code == 201

    def test_dedup_response_format(self, strict_client):
        """Verify 429 response format for dedup errors."""
        content = b"unique content 4"

        # Exhaust limit
        for _ in range(DEDUP_MAX):
            assert strict_client.post("/", data=content).status_code == 201

        response = strict_client.post("/", data=content)
        assert response.status_code == 429
        assert response.content_type == "application/json"

        data = response.get_json()
        assert "error" in data
        assert "count" in data
        assert data["window_seconds"] == DEDUP_WINDOW


class TestContentHashDatabase:
    """Test content hash database operations."""

    @pytest.fixture
    def app_context(self):
        """Create app context for database tests."""
        app = create_app("testing")
        app.config["CONTENT_DEDUP_WINDOW"] = DEDUP_WINDOW
        app.config["CONTENT_DEDUP_MAX"] = DEDUP_MAX
        with app.app_context():
            yield app

    def test_check_content_hash_first_time(self, app_context):
        """First check for a hash should return allowed with count 1."""
        is_allowed, count = check_content_hash(_sha256(b"new content"))

        assert is_allowed is True
        assert count == 1

    def test_check_content_hash_increments(self, app_context):
        """Subsequent checks should increment counter."""
        content_hash = _sha256(b"incrementing content")

        for expected in range(1, DEDUP_MAX + 1):
            is_allowed, count = check_content_hash(content_hash)
            assert is_allowed is True
            assert count == expected

    def test_check_content_hash_threshold(self, app_context):
        """Check should fail after threshold exceeded."""
        content_hash = _sha256(b"threshold content")

        # Use up threshold
        for _ in range(DEDUP_MAX):
            check_content_hash(content_hash)

        # The next one should fail
        is_allowed, count = check_content_hash(content_hash)
        assert is_allowed is False
        assert count == DEDUP_MAX  # Count stays at the limit, not incremented

    def test_hash_record_persists(self, app_context):
        """Hash records should persist in database."""
        content_hash = _sha256(b"persistent content")
        check_content_hash(content_hash)

        # Query database directly
        row = _fetch_hash_row(content_hash)

        assert row is not None
        assert row["hash"] == content_hash
        assert row["count"] == 1


class TestContentHashCleanup:
    """Test cleanup of expired content hashes."""

    @pytest.fixture
    def app_context(self):
        """Create app context for cleanup tests."""
        app = create_app("testing")
        app.config["CONTENT_DEDUP_WINDOW"] = DEDUP_WINDOW
        app.config["CONTENT_DEDUP_MAX"] = DEDUP_MAX
        with app.app_context():
            yield app

    def test_cleanup_expired_hashes(self, app_context):
        """Expired hashes should be cleaned up."""
        content_hash = _sha256(b"expiring content")
        check_content_hash(content_hash)

        # Age the record past the window instead of sleeping
        _expire_hash(content_hash, DEDUP_WINDOW)

        # Cleanup should remove it
        deleted = cleanup_expired_hashes()
        assert deleted >= 1

        # Verify removed
        assert _fetch_hash_row(content_hash) is None

    def test_cleanup_keeps_recent(self, app_context):
        """Recent hashes should not be cleaned up."""
        content_hash = _sha256(b"recent content")
        check_content_hash(content_hash)

        # Cleanup should not remove it; the number of other rows deleted is
        # irrelevant here, so the return value is deliberately ignored.
        cleanup_expired_hashes()

        # Verify still present
        assert _fetch_hash_row(content_hash) is not None


class TestWindowReset:
    """Test that dedup counter resets after window expires."""

    def test_counter_resets_after_window(self):
        """Counter should reset after window expires."""
        app = create_app("testing")
        app.config["CONTENT_DEDUP_WINDOW"] = DEDUP_WINDOW
        app.config["CONTENT_DEDUP_MAX"] = 2

        with app.app_context():
            content_hash = _sha256(b"resetting content")

            # Use up threshold
            check_content_hash(content_hash)
            check_content_hash(content_hash)

            # Should be blocked now
            is_allowed, _ = check_content_hash(content_hash)
            assert is_allowed is False

            # Move the record outside the window instead of sleeping
            _expire_hash(content_hash, DEDUP_WINDOW)

            # Should be allowed again
            is_allowed, count = check_content_hash(content_hash)
            assert is_allowed is True
            assert count == 1  # Counter reset