allow untrusted certs to manage own pastes

Split authentication into two functions:
- get_client_fingerprint(): Identity for ownership (any cert)
- get_client_id(): Elevated privileges (trusted certs only)

Behavior:
- Anonymous: Create only, strict limits
- Untrusted cert: Create + delete/update/list own pastes, strict limits
- Trusted cert: All operations, relaxed limits (50MB, 5x rate)

Updated tests to reflect new behavior where revoked certs
can still manage their own pastes.
This commit is contained in:
Username
2025-12-21 12:59:18 +01:00
parent 1f09f2686a
commit 098789ff89
3 changed files with 65 additions and 30 deletions

View File

@@ -303,8 +303,14 @@ def fetch_paste(paste_id: str, check_password: bool = True) -> Response | None:
def require_auth() -> Response | None:
"""Check authentication. Returns error response or None if authenticated."""
client_id = get_client_id()
"""Check authentication for ownership operations.
Uses get_client_fingerprint() to allow both trusted and untrusted
certificate holders to manage their own pastes.
Returns error response or None if authenticated.
"""
client_id = get_client_fingerprint()
if not client_id:
return error_response("Authentication required", 401)
g.client_id = client_id
@@ -325,29 +331,51 @@ def is_trusted_proxy() -> bool:
return hmac.compare_digest(expected, provided)
def get_client_id() -> str | None:
"""Extract and validate client certificate fingerprint."""
def get_client_fingerprint() -> str | None:
"""Extract client certificate fingerprint for identity/ownership.
Returns fingerprint regardless of trust status. Used for:
- Paste ownership tracking
- Delete/update/list operations (user manages their own pastes)
Returns None if no valid fingerprint provided or proxy not trusted.
"""
if not is_trusted_proxy():
current_app.logger.warning(
"Auth header ignored: X-Proxy-Secret mismatch from %s", request.remote_addr
)
return None
sha1 = request.headers.get("X-SSL-Client-SHA1", "").strip().lower()
if sha1 and CLIENT_ID_PATTERN.match(sha1):
# Check if PKI is enabled and certificate is revoked
if current_app.config.get("PKI_ENABLED"):
from app.pki import is_certificate_valid
if not is_certificate_valid(sha1):
current_app.logger.warning(
"Auth rejected: certificate revoked or expired: %s", sha1[:12] + "..."
)
return None
return sha1
return None
def get_client_id() -> str | None:
"""Get trusted client certificate fingerprint for elevated privileges.
Returns fingerprint only if certificate is valid and not revoked.
Used for:
- Rate limit benefits (higher limits for trusted users)
- Size limit benefits (larger pastes for trusted users)
Untrusted certificates return None here but still work via
get_client_fingerprint() for ownership operations.
"""
fingerprint = get_client_fingerprint()
if not fingerprint:
return None
# Check if PKI is enabled and certificate is revoked
if current_app.config.get("PKI_ENABLED"):
from app.pki import is_certificate_valid
if not is_certificate_valid(fingerprint):
current_app.logger.warning(
"Elevated auth rejected (revoked/expired): %s", fingerprint[:12] + "..."
)
return None
return fingerprint
# ─────────────────────────────────────────────────────────────────────────────
# Proof-of-Work
# ─────────────────────────────────────────────────────────────────────────────
@@ -597,14 +625,20 @@ class IndexView(MethodView):
if not content:
return error_response("No content provided", 400)
owner = get_client_id()
# Separate trusted (for limits) from fingerprint (for ownership)
trusted_client = get_client_id() # Only trusted certs get elevated limits
owner = get_client_fingerprint() # Any cert can own pastes
# Rate limiting (check before expensive operations)
client_ip = get_client_ip()
allowed, _remaining, reset_seconds = check_rate_limit(client_ip, authenticated=bool(owner))
allowed, _remaining, reset_seconds = check_rate_limit(
client_ip, authenticated=bool(trusted_client)
)
if not allowed:
current_app.logger.warning("Rate limit exceeded: ip=%s auth=%s", client_ip, bool(owner))
current_app.logger.warning(
"Rate limit exceeded: ip=%s trusted=%s", client_ip, bool(trusted_client)
)
response = error_response(
"Rate limit exceeded",
429,
@@ -633,11 +667,11 @@ class IndexView(MethodView):
)
return error_response(f"Proof-of-work failed: {err}", 400)
# Size limits
# Size limits (only trusted clients get elevated limits)
content_size = len(content)
max_size = (
current_app.config["MAX_PASTE_SIZE_AUTH"]
if owner
if trusted_client
else current_app.config["MAX_PASTE_SIZE_ANON"]
)
@@ -647,7 +681,7 @@ class IndexView(MethodView):
413,
size=content_size,
max_size=max_size,
authenticated=owner is not None,
trusted=trusted_client is not None,
)
# Minimum size check (enforces encryption overhead)
@@ -1431,7 +1465,7 @@ class PKICAGenerateView(MethodView):
# Generate CA
try:
days = current_app.config.get("PKI_CA_DAYS", 3650)
owner = get_client_id()
owner = get_client_fingerprint()
ca_info = generate_ca(common_name, password, days=days, owner=owner)
except CAExistsError:
return error_response("CA already exists", 409)
@@ -1511,7 +1545,7 @@ class PKIIssueView(MethodView):
# Issue certificate
try:
days = current_app.config.get("PKI_CERT_DAYS", 365)
issued_to = get_client_id()
issued_to = get_client_fingerprint()
cert_info = issue_certificate(common_name, password, days=days, issued_to=issued_to)
except CANotFoundError:
return error_response("CA not initialized", 404)
@@ -1551,12 +1585,12 @@ class PKICertsView(MethodView):
if err := require_pki_enabled():
return err
client_id = get_client_id()
client_id = get_client_fingerprint()
db = get_db()
# Authenticated users see their own certs or certs they issued
# Anonymous users see nothing
# Users with certificates can see their own certs or certs they issued
# Anonymous users (no cert) see nothing
if client_id:
rows = db.execute(
"""SELECT serial, common_name, fingerprint_sha1,