From c69290af2d9beb789f6ef2396e19bbf5177c6915 Mon Sep 17 00:00:00 2001 From: Username Date: Wed, 18 Feb 2026 08:42:25 +0100 Subject: [PATCH] routes: skip rate limiting for trusted certificate holders --- app/__init__.py | 21 ++++-- app/api/routes.py | 137 +++++++++++++++++++----------------- tests/test_rate_limiting.py | 31 ++++---- 3 files changed, 102 insertions(+), 87 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index 902e9f0..228e780 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -226,12 +226,23 @@ def setup_rate_limiting(app: Flask) -> None: from flask_limiter import Limiter from flask_limiter.util import get_remote_address - def is_health_endpoint() -> bool: - """Check if request is to health endpoint (exempt from rate limiting).""" - # Get configured URL prefix (e.g., "/paste") + def is_rate_limit_exempt() -> bool: + """Check if request is exempt from global rate limiting. + + Exempt: health endpoint, trusted certificate holders. + """ prefix = app.config.get("URL_PREFIX", "") health_path = f"{prefix}/health" if prefix else "/health" - return request.path == health_path + if request.path == health_path: + return True + + # Trusted certificate holders bypass rate limiting + try: + from app.api.routes import get_client_id + + return get_client_id() is not None + except Exception: + return False limiter = Limiter( key_func=get_remote_address, @@ -239,7 +250,7 @@ def setup_rate_limiting(app: Flask) -> None: default_limits=["200 per day", "60 per hour"], storage_uri="memory://", strategy="fixed-window", - default_limits_exempt_when=is_health_endpoint, + default_limits_exempt_when=is_rate_limit_exempt, ) # Store limiter on app for use in routes diff --git a/app/api/routes.py b/app/api/routes.py index 2ff9e16..4f5bbf8 100644 --- a/app/api/routes.py +++ b/app/api/routes.py @@ -504,16 +504,18 @@ def validate_paste_id(paste_id: str) -> Response | None: def fetch_paste(paste_id: str, check_password: bool = True) -> Response | None: """Fetch paste and store in g.paste. Returns error response or None if OK.""" # ENUM-001: Rate limit lookups to prevent enumeration attacks - client_ip = get_client_ip() - allowed, retry_after = check_lookup_rate_limit(client_ip) - if not allowed: - response = error_response( - f"Lookup rate limit exceeded. Retry after {retry_after} seconds.", - 429, - retry_after=retry_after, - ) - response.headers["Retry-After"] = str(retry_after) - return response + # Trusted certificate holders are exempt + if not get_client_id(): + client_ip = get_client_ip() + allowed, retry_after = check_lookup_rate_limit(client_ip) + if not allowed: + response = error_response( + f"Lookup rate limit exceeded. Retry after {retry_after} seconds.", + 429, + retry_after=retry_after, + ) + response.headers["Retry-After"] = str(retry_after) + return response db = get_db() now = int(time.time()) @@ -588,16 +590,18 @@ def validate_target_url(url: str) -> Response | None: def fetch_short_url(short_id: str, increment_counter: bool = True) -> Response | None: """Fetch short URL and store in g.short_url. Returns error response or None if OK.""" - client_ip = get_client_ip() - allowed, retry_after = check_lookup_rate_limit(client_ip) - if not allowed: - response = error_response( - f"Lookup rate limit exceeded. Retry after {retry_after} seconds.", - 429, - retry_after=retry_after, - ) - response.headers["Retry-After"] = str(retry_after) - return response + # Trusted certificate holders are exempt from lookup rate limiting + if not get_client_id(): + client_ip = get_client_ip() + allowed, retry_after = check_lookup_rate_limit(client_ip) + if not allowed: + response = error_response( + f"Lookup rate limit exceeded. Retry after {retry_after} seconds.", + 429, + retry_after=retry_after, + ) + response.headers["Retry-After"] = str(retry_after) + return response db = get_db() now = int(time.time()) @@ -983,7 +987,7 @@ class IndexView(MethodView): "authentication": { "anonymous": "Create pastes only (strict limits)", "client_cert": "Create + manage own pastes (strict limits)", - "trusted_cert": "All operations (relaxed limits)", + "trusted_cert": "All operations (no rate limits)", }, "limits": { "anonymous": { @@ -992,7 +996,7 @@ class IndexView(MethodView): }, "trusted": { "max_size": current_app.config["MAX_PASTE_SIZE_AUTH"], - "rate": f"{current_app.config['RATE_LIMIT_MAX'] * current_app.config.get('RATE_LIMIT_AUTH_MULTIPLIER', 5)}/min", # noqa: E501 + "rate": "unlimited", }, }, "pow": { @@ -1036,41 +1040,42 @@ class IndexView(MethodView): trusted_client = get_client_id() # Only trusted certs get elevated limits owner = get_client_fingerprint() # Any cert can own pastes - # Rate limiting (check before expensive operations) + # Rate limiting (trusted certs exempt) client_ip = get_client_ip() - allowed, remaining, limit, reset_timestamp = check_rate_limit( - client_ip, authenticated=bool(trusted_client) - ) - - # Store rate limit info for response headers - g.rate_limit_remaining = remaining - g.rate_limit_limit = limit - g.rate_limit_reset = reset_timestamp - - if not allowed: - current_app.logger.warning( - "Rate limit exceeded: ip=%s trusted=%s", client_ip, bool(trusted_client) + if not trusted_client: + allowed, remaining, limit, reset_timestamp = check_rate_limit( + client_ip, authenticated=False ) - # Audit log rate limit event - if current_app.config.get("AUDIT_ENABLED", True): - from app.audit import AuditEvent, AuditOutcome, log_event - log_event( - AuditEvent.RATE_LIMIT, - AuditOutcome.BLOCKED, - client_id=owner, - client_ip=client_ip, + # Store rate limit info for response headers + g.rate_limit_remaining = remaining + g.rate_limit_limit = limit + g.rate_limit_reset = reset_timestamp + + if not allowed: + current_app.logger.warning( + "Rate limit exceeded: ip=%s", client_ip ) - record_rate_limit("blocked") - retry_after = max(1, reset_timestamp - int(time.time())) - response = error_response( - "Rate limit exceeded", - 429, - retry_after=retry_after, - ) - response.headers["Retry-After"] = str(retry_after) - add_rate_limit_headers(response, 0, limit, reset_timestamp) - return response + # Audit log rate limit event + if current_app.config.get("AUDIT_ENABLED", True): + from app.audit import AuditEvent, AuditOutcome, log_event + + log_event( + AuditEvent.RATE_LIMIT, + AuditOutcome.BLOCKED, + client_id=owner, + client_ip=client_ip, + ) + record_rate_limit("blocked") + retry_after = max(1, reset_timestamp - int(time.time())) + response = error_response( + "Rate limit exceeded", + 429, + retry_after=retry_after, + ) + response.headers["Retry-After"] = str(retry_after) + add_rate_limit_headers(response, 0, limit, reset_timestamp) + return response # Proof-of-work verification (trusted certs exempt) difficulty = current_app.config["POW_DIFFICULTY"] @@ -1994,16 +1999,22 @@ class ShortURLCreateView(MethodView): owner = get_client_fingerprint() client_ip = get_client_ip() - allowed, remaining, limit, reset_timestamp = check_rate_limit( - client_ip, authenticated=bool(trusted_client) - ) - if not allowed: - record_rate_limit("blocked") - retry_after = max(1, reset_timestamp - int(time.time())) - response = error_response("Rate limit exceeded", 429, retry_after=retry_after) - response.headers["Retry-After"] = str(retry_after) - add_rate_limit_headers(response, 0, limit, reset_timestamp) - return response + # Rate limiting (trusted certs exempt) + if not trusted_client: + allowed, remaining, limit, reset_timestamp = check_rate_limit( + client_ip, authenticated=False + ) + if not allowed: + record_rate_limit("blocked") + retry_after = max(1, reset_timestamp - int(time.time())) + response = error_response( + "Rate limit exceeded", 429, retry_after=retry_after + ) + response.headers["Retry-After"] = str(retry_after) + add_rate_limit_headers(response, 0, limit, reset_timestamp) + return response + else: + remaining, limit, reset_timestamp = -1, -1, 0 # Proof-of-work (trusted certs exempt) difficulty = current_app.config["POW_DIFFICULTY"] diff --git a/tests/test_rate_limiting.py b/tests/test_rate_limiting.py index eae9471..c5406e8 100644 --- a/tests/test_rate_limiting.py +++ b/tests/test_rate_limiting.py @@ -92,44 +92,37 @@ class TestRateLimiting: finally: app.config["RATE_LIMIT_MAX"] = original_max - def test_rate_limit_auth_multiplier(self, client, app, auth_header): - """Authenticated users get higher rate limits.""" + def test_trusted_cert_exempt_from_rate_limit(self, client, app, auth_header): + """Trusted certificate holders are exempt from rate limiting.""" original_max = app.config["RATE_LIMIT_MAX"] - original_mult = app.config["RATE_LIMIT_AUTH_MULTIPLIER"] app.config["RATE_LIMIT_MAX"] = 2 - app.config["RATE_LIMIT_AUTH_MULTIPLIER"] = 3 # 2 * 3 = 6 for auth users try: - # Authenticated user can make more requests than base limit + # Trusted client can exceed the base limit without being blocked for i in range(5): response = client.post( "/", - data=f"auth {i}", + data=f"trusted {i}", content_type="text/plain", headers=auth_header, ) assert response.status_code == 201 - # 6th request should succeed (limit is 2*3=6) + # Anonymous user hits the limit + for i in range(2): + client.post( + "/", + data=f"anon {i}", + content_type="text/plain", + ) response = client.post( "/", - data="auth 6", + data="anon overflow", content_type="text/plain", - headers=auth_header, - ) - assert response.status_code == 201 - - # 7th should fail - response = client.post( - "/", - data="auth 7", - content_type="text/plain", - headers=auth_header, ) assert response.status_code == 429 finally: app.config["RATE_LIMIT_MAX"] = original_max - app.config["RATE_LIMIT_AUTH_MULTIPLIER"] = original_mult def test_rate_limit_can_be_disabled(self, client, app): """Rate limiting can be disabled via config."""