routes: skip rate limiting for trusted certificate holders

This commit is contained in:
Username
2026-02-18 08:42:25 +01:00
parent 283f87b9c4
commit c69290af2d
3 changed files with 102 additions and 87 deletions

View File

@@ -504,16 +504,18 @@ def validate_paste_id(paste_id: str) -> Response | None:
def fetch_paste(paste_id: str, check_password: bool = True) -> Response | None:
"""Fetch paste and store in g.paste. Returns error response or None if OK."""
# ENUM-001: Rate limit lookups to prevent enumeration attacks
client_ip = get_client_ip()
allowed, retry_after = check_lookup_rate_limit(client_ip)
if not allowed:
response = error_response(
f"Lookup rate limit exceeded. Retry after {retry_after} seconds.",
429,
retry_after=retry_after,
)
response.headers["Retry-After"] = str(retry_after)
return response
# Trusted certificate holders are exempt
if not get_client_id():
client_ip = get_client_ip()
allowed, retry_after = check_lookup_rate_limit(client_ip)
if not allowed:
response = error_response(
f"Lookup rate limit exceeded. Retry after {retry_after} seconds.",
429,
retry_after=retry_after,
)
response.headers["Retry-After"] = str(retry_after)
return response
db = get_db()
now = int(time.time())
@@ -588,16 +590,18 @@ def validate_target_url(url: str) -> Response | None:
def fetch_short_url(short_id: str, increment_counter: bool = True) -> Response | None:
"""Fetch short URL and store in g.short_url. Returns error response or None if OK."""
client_ip = get_client_ip()
allowed, retry_after = check_lookup_rate_limit(client_ip)
if not allowed:
response = error_response(
f"Lookup rate limit exceeded. Retry after {retry_after} seconds.",
429,
retry_after=retry_after,
)
response.headers["Retry-After"] = str(retry_after)
return response
# Trusted certificate holders are exempt from lookup rate limiting
if not get_client_id():
client_ip = get_client_ip()
allowed, retry_after = check_lookup_rate_limit(client_ip)
if not allowed:
response = error_response(
f"Lookup rate limit exceeded. Retry after {retry_after} seconds.",
429,
retry_after=retry_after,
)
response.headers["Retry-After"] = str(retry_after)
return response
db = get_db()
now = int(time.time())
@@ -983,7 +987,7 @@ class IndexView(MethodView):
"authentication": {
"anonymous": "Create pastes only (strict limits)",
"client_cert": "Create + manage own pastes (strict limits)",
"trusted_cert": "All operations (relaxed limits)",
"trusted_cert": "All operations (no rate limits)",
},
"limits": {
"anonymous": {
@@ -992,7 +996,7 @@ class IndexView(MethodView):
},
"trusted": {
"max_size": current_app.config["MAX_PASTE_SIZE_AUTH"],
"rate": f"{current_app.config['RATE_LIMIT_MAX'] * current_app.config.get('RATE_LIMIT_AUTH_MULTIPLIER', 5)}/min", # noqa: E501
"rate": "unlimited",
},
},
"pow": {
@@ -1036,41 +1040,42 @@ class IndexView(MethodView):
trusted_client = get_client_id() # Only trusted certs get elevated limits
owner = get_client_fingerprint() # Any cert can own pastes
# Rate limiting (check before expensive operations)
# Rate limiting (trusted certs exempt)
client_ip = get_client_ip()
allowed, remaining, limit, reset_timestamp = check_rate_limit(
client_ip, authenticated=bool(trusted_client)
)
# Store rate limit info for response headers
g.rate_limit_remaining = remaining
g.rate_limit_limit = limit
g.rate_limit_reset = reset_timestamp
if not allowed:
current_app.logger.warning(
"Rate limit exceeded: ip=%s trusted=%s", client_ip, bool(trusted_client)
if not trusted_client:
allowed, remaining, limit, reset_timestamp = check_rate_limit(
client_ip, authenticated=False
)
# Audit log rate limit event
if current_app.config.get("AUDIT_ENABLED", True):
from app.audit import AuditEvent, AuditOutcome, log_event
log_event(
AuditEvent.RATE_LIMIT,
AuditOutcome.BLOCKED,
client_id=owner,
client_ip=client_ip,
# Store rate limit info for response headers
g.rate_limit_remaining = remaining
g.rate_limit_limit = limit
g.rate_limit_reset = reset_timestamp
if not allowed:
current_app.logger.warning(
"Rate limit exceeded: ip=%s", client_ip
)
record_rate_limit("blocked")
retry_after = max(1, reset_timestamp - int(time.time()))
response = error_response(
"Rate limit exceeded",
429,
retry_after=retry_after,
)
response.headers["Retry-After"] = str(retry_after)
add_rate_limit_headers(response, 0, limit, reset_timestamp)
return response
# Audit log rate limit event
if current_app.config.get("AUDIT_ENABLED", True):
from app.audit import AuditEvent, AuditOutcome, log_event
log_event(
AuditEvent.RATE_LIMIT,
AuditOutcome.BLOCKED,
client_id=owner,
client_ip=client_ip,
)
record_rate_limit("blocked")
retry_after = max(1, reset_timestamp - int(time.time()))
response = error_response(
"Rate limit exceeded",
429,
retry_after=retry_after,
)
response.headers["Retry-After"] = str(retry_after)
add_rate_limit_headers(response, 0, limit, reset_timestamp)
return response
# Proof-of-work verification (trusted certs exempt)
difficulty = current_app.config["POW_DIFFICULTY"]
@@ -1994,16 +1999,22 @@ class ShortURLCreateView(MethodView):
owner = get_client_fingerprint()
client_ip = get_client_ip()
allowed, remaining, limit, reset_timestamp = check_rate_limit(
client_ip, authenticated=bool(trusted_client)
)
if not allowed:
record_rate_limit("blocked")
retry_after = max(1, reset_timestamp - int(time.time()))
response = error_response("Rate limit exceeded", 429, retry_after=retry_after)
response.headers["Retry-After"] = str(retry_after)
add_rate_limit_headers(response, 0, limit, reset_timestamp)
return response
# Rate limiting (trusted certs exempt)
if not trusted_client:
allowed, remaining, limit, reset_timestamp = check_rate_limit(
client_ip, authenticated=False
)
if not allowed:
record_rate_limit("blocked")
retry_after = max(1, reset_timestamp - int(time.time()))
response = error_response(
"Rate limit exceeded", 429, retry_after=retry_after
)
response.headers["Retry-After"] = str(retry_after)
add_rate_limit_headers(response, 0, limit, reset_timestamp)
return response
else:
remaining, limit, reset_timestamp = -1, -1, 0
# Proof-of-work (trusted certs exempt)
difficulty = current_app.config["POW_DIFFICULTY"]