Files
flaskpaste/app/api/routes.py
Username 8fdeeaed9c
All checks were successful
CI / test (push) Successful in 37s
add proof-of-work spam prevention
Clients must solve a SHA256 hash puzzle before paste creation.
Configurable via FLASKPASTE_POW_DIFFICULTY (0 = disabled, 16 = default).
Challenge tokens expire after FLASKPASTE_POW_TTL seconds (default 300).
2025-12-20 04:03:59 +01:00

484 lines
15 KiB
Python

"""API route handlers."""
import hashlib
import hmac
import json
import os
import re
import secrets
import time
from flask import Response, current_app, request
from app.api import bp
from app.database import check_content_hash, get_db
# Valid paste ID pattern (lowercase hexadecimal only). The length is not
# encoded here; _is_valid_paste_id() checks it against PASTE_ID_LENGTH.
PASTE_ID_PATTERN = re.compile(r"^[a-f0-9]+$")
# Runtime-generated PoW secret (used if not configured).
# Filled in lazily by _get_pow_secret() and then reused for later calls.
_pow_secret_cache = None
# Valid client certificate SHA1 pattern (exactly 40 lowercase hex chars)
CLIENT_ID_PATTERN = re.compile(r"^[a-f0-9]{40}$")
# Magic bytes for common binary formats, mapping a leading byte signature to
# the MIME type reported for content that starts with it.
MAGIC_SIGNATURES = {
    b"\x89PNG\r\n\x1a\n": "image/png",
    b"\xff\xd8\xff": "image/jpeg",
    b"GIF87a": "image/gif",
    b"GIF89a": "image/gif",
    b"RIFF": "image/webp", # WebP (check for WEBP after RIFF)
    b"PK\x03\x04": "application/zip",
    b"%PDF": "application/pdf",
    b"\x1f\x8b": "application/gzip",
}
def _get_pow_secret() -> bytes:
    """Return the key used to sign PoW challenges.

    A non-empty ``POW_SECRET`` config value takes precedence and is returned
    encoded as bytes. Otherwise a random 32-byte secret is generated on the
    first call and reused from the module-level cache afterwards.
    """
    global _pow_secret_cache
    from_config = current_app.config.get("POW_SECRET", "")
    if not from_config:
        if _pow_secret_cache is None:
            # Nothing configured and nothing cached yet: mint a secret now.
            _pow_secret_cache = secrets.token_bytes(32)
        return _pow_secret_cache
    return from_config.encode()
def _generate_challenge() -> dict:
    """Build a fresh, signed PoW challenge.

    Returns:
        Dict with keys ``nonce`` (32 hex chars), ``difficulty`` (from the
        ``POW_DIFFICULTY`` config), ``expires`` (Unix timestamp, now plus
        ``POW_CHALLENGE_TTL``) and ``signature`` (hex HMAC-SHA256 over
        "nonce:expires:difficulty").
    """
    config = current_app.config
    difficulty = config["POW_DIFFICULTY"]
    lifetime = config["POW_CHALLENGE_TTL"]
    expires = int(time.time()) + lifetime
    nonce = secrets.token_hex(16)
    # The HMAC lets _verify_pow() detect a client altering any of the fields.
    payload = f"{nonce}:{expires}:{difficulty}".encode()
    signer = hmac.new(_get_pow_secret(), digestmod=hashlib.sha256)
    signer.update(payload)
    return dict(
        nonce=nonce,
        difficulty=difficulty,
        expires=expires,
        signature=signer.hexdigest(),
    )
def _verify_pow(challenge: str, nonce: str, solution: str) -> tuple[bool, str]:
    """Verify a proof-of-work solution.

    Args:
        challenge: The bare challenge nonce. Kept for interface
            compatibility; verification uses the nonce embedded in the
            signed token instead.
        nonce: The full token string "nonce:expires:difficulty:signature"
            as handed out by /challenge.
        solution: The solution number found by the client, as a string.

    Returns:
        Tuple of (valid, error_message); error_message is "" on success.
    """
    difficulty = current_app.config["POW_DIFFICULTY"]
    # PoW disabled
    if difficulty == 0:
        return True, ""

    # Parse challenge components
    parts = nonce.split(":")
    if len(parts) != 4:
        return False, "Invalid challenge format"
    ch_nonce, raw_expires, raw_difficulty, ch_sig = parts
    try:
        ch_expires = int(raw_expires)
        ch_difficulty = int(raw_difficulty)
    except (ValueError, TypeError):
        return False, "Invalid challenge format"

    # Verify signature. Compare as bytes: hmac.compare_digest() raises
    # TypeError for str arguments containing non-ASCII characters, and the
    # signature is client-controlled, so a str comparison could be made to
    # crash the request instead of rejecting it.
    msg = f"{ch_nonce}:{ch_expires}:{ch_difficulty}".encode()
    expected_sig = hmac.new(_get_pow_secret(), msg, hashlib.sha256).hexdigest()
    if not hmac.compare_digest(ch_sig.encode(), expected_sig.encode()):
        return False, "Invalid challenge signature"

    # Check expiry
    if int(time.time()) > ch_expires:
        return False, "Challenge expired"

    # Verify difficulty matches current config
    if ch_difficulty != difficulty:
        return False, "Difficulty mismatch"

    # Verify solution is a non-negative integer
    try:
        if int(solution) < 0:
            return False, "Invalid solution"
    except (ValueError, TypeError):
        return False, "Invalid solution"

    # Check hash meets difficulty requirement. The solution is hashed exactly
    # as the client sent it, so the client must hash the same text.
    work = f"{ch_nonce}:{solution}".encode()
    hash_bytes = hashlib.sha256(work).digest()
    # Leading zero bits = total bits minus the bit length of the digest
    # interpreted as a big-endian integer.
    zero_bits = len(hash_bytes) * 8 - int.from_bytes(hash_bytes, "big").bit_length()
    if zero_bits < difficulty:
        return False, f"Insufficient work: {zero_bits} < {difficulty} bits"

    # NOTE(review): nothing here records used tokens, so a solved token
    # appears reusable until it expires - confirm whether that is intended.
    return True, ""
def _is_valid_paste_id(paste_id: str) -> bool:
    """Validate paste ID format (hexadecimal, correct length).

    Args:
        paste_id: Candidate ID taken from the URL.

    Returns:
        True when the ID has exactly ``PASTE_ID_LENGTH`` characters and
        consists solely of lowercase hexadecimal digits.
    """
    expected_length = current_app.config["PASTE_ID_LENGTH"]
    # fullmatch() rather than match(): the pattern's "$" also matches just
    # before a trailing newline, so match() would accept an ID whose last
    # character is a line feed.
    return (
        len(paste_id) == expected_length
        and PASTE_ID_PATTERN.fullmatch(paste_id) is not None
    )
def _detect_mime_type(content: bytes, content_type: str | None = None) -> str:
    """Detect MIME type from content bytes, with magic byte detection taking priority.

    Args:
        content: Raw paste bytes.
        content_type: Optional Content-Type header value sent by the client.

    Returns:
        The MIME type from a matching magic signature; otherwise the
        client-supplied type when it is specific and well-formed; otherwise
        "text/plain" for valid UTF-8 and "application/octet-stream" for
        anything else.
    """
    # Check magic bytes first - most reliable method
    for magic, mime in MAGIC_SIGNATURES.items():
        if not content.startswith(magic):
            continue
        # RIFF is a generic container (WAV, AVI, ...); it is only WebP when
        # bytes 8-11 spell "WEBP". Content too short to hold that tag yields
        # a shorter slice and is therefore not treated as WebP either.
        if magic == b"RIFF" and content[8:12] != b"WEBP":
            continue
        return mime

    # Trust explicit Content-Type if it's specific (not generic defaults)
    generic_types = {
        "application/octet-stream",
        "application/x-www-form-urlencoded",
        "text/plain",
    }
    if content_type:
        # Drop parameters such as "; charset=utf-8" before comparing.
        mime = content_type.split(";")[0].strip().lower()
        if mime not in generic_types:
            # Sanitize: only allow safe characters in MIME type
            if re.match(r"^[a-z0-9][a-z0-9!#$&\-^_.+]*\/[a-z0-9][a-z0-9!#$&\-^_.+]*$", mime):
                return mime

    # Try to decode as UTF-8 text
    try:
        content.decode("utf-8")
    except UnicodeDecodeError:
        return "application/octet-stream"
    return "text/plain"
def _generate_id(content: bytes) -> str:
    """Derive a paste ID from the content plus the current time.

    The ID is the first ``PASTE_ID_LENGTH`` hex characters of the SHA-256
    digest of the content followed by the nanosecond timestamp.
    """
    stamp = str(time.time_ns()).encode()
    id_length = current_app.config["PASTE_ID_LENGTH"]
    hasher = hashlib.sha256()
    hasher.update(content)
    # Mixing in the timestamp makes repeated uploads of the same content
    # hash to different values.
    hasher.update(stamp)
    return hasher.hexdigest()[:id_length]
def _json_response(data: dict, status: int = 200) -> Response:
    """Serialize ``data`` to JSON and wrap it in a Flask response.

    Non-ASCII characters are emitted as-is rather than escaped, and the
    response carries the ``application/json`` mimetype.

    Args:
        data: JSON-serializable payload.
        status: HTTP status code for the response.
    """
    body = json.dumps(data, ensure_ascii=False)
    return Response(body, status=status, mimetype="application/json")
def _is_trusted_proxy() -> bool:
    """Verify request comes from a trusted reverse proxy.

    If TRUSTED_PROXY_SECRET is configured, the request must include a matching
    X-Proxy-Secret header. This provides defense-in-depth against header spoofing
    if an attacker bypasses the reverse proxy.

    Returns:
        True if no secret is configured (backwards compatible) or if the
        secret matches; False otherwise.
    """
    expected_secret = current_app.config.get("TRUSTED_PROXY_SECRET", "")
    if not expected_secret:
        # No secret configured - trust all requests (backwards compatible)
        return True

    provided_secret = request.headers.get("X-Proxy-Secret", "")
    # Constant-time comparison to prevent timing attacks. Both sides are
    # compared as bytes: hmac.compare_digest() raises TypeError for str
    # arguments containing non-ASCII characters, so a crafted header value
    # could otherwise crash the request instead of being rejected.
    if isinstance(expected_secret, str):
        expected_secret = expected_secret.encode("utf-8")
    return hmac.compare_digest(expected_secret, provided_secret.encode("utf-8"))
def _get_client_id() -> str | None:
    """Return the caller's client-certificate fingerprint, if trustworthy.

    Reads the X-SSL-Client-SHA1 header, trims and lowercases it, and returns
    it only when it is a 40-character hex string.

    SECURITY: the header is honoured only when _is_trusted_proxy() accepts
    the request; otherwise a warning is logged and None is returned.
    """
    if not _is_trusted_proxy():
        # Untrusted source: the auth header could be spoofed, so ignore it.
        current_app.logger.warning(
            "Auth header ignored: X-Proxy-Secret mismatch from %s",
            request.remote_addr
        )
        return None

    raw_header = request.headers.get("X-SSL-Client-SHA1", "")
    fingerprint = raw_header.strip().lower()
    if not fingerprint:
        return None
    # Must look like a SHA1 digest: exactly 40 hex characters.
    if CLIENT_ID_PATTERN.match(fingerprint) is None:
        return None
    return fingerprint
@bp.route("/health", methods=["GET"])
def health():
    """Report service health for load balancers and monitoring.

    Runs a trivial query against the database; responds 200 when it
    succeeds and 503 when anything raises.
    """
    try:
        get_db().execute("SELECT 1")
        return _json_response({"status": "healthy", "database": "ok"})
    except Exception:
        # Any failure while probing the database counts as unhealthy.
        failure = {"status": "unhealthy", "database": "error"}
        return _json_response(failure, 503)
@bp.route("/challenge", methods=["GET"])
def challenge():
    """Hand out a proof-of-work challenge for paste creation.

    When ``POW_DIFFICULTY`` is 0 the response only states that PoW is
    disabled. Otherwise it contains the challenge fields plus a ``token``
    string of the form "nonce:expires:difficulty:signature".
    """
    if current_app.config["POW_DIFFICULTY"] == 0:
        return _json_response({"enabled": False, "difficulty": 0})

    issued = _generate_challenge()
    nonce = issued["nonce"]
    difficulty = issued["difficulty"]
    expires = issued["expires"]
    # The token bundles the signed fields into the single string that
    # create_paste() reads from the X-PoW-Token header.
    token = f"{nonce}:{expires}:{difficulty}:{issued['signature']}"
    payload = {
        "enabled": True,
        "nonce": nonce,
        "difficulty": difficulty,
        "expires": expires,
        "token": token,
    }
    return _json_response(payload)
@bp.route("/", methods=["GET", "POST"])
def index():
    """Handle API info (GET) and paste creation (POST).

    POST requests are delegated to create_paste(). GET requests return a
    static JSON description of the API: name, version, endpoint list and
    curl usage examples.
    """
    if request.method == "POST":
        return create_paste()
    return _json_response(
        {
            "name": "FlaskPaste",
            "version": "1.0.0",
            "endpoints": {
                "GET /": "API information",
                "GET /health": "Health check",
                # Listed so clients can discover where to obtain the
                # proof-of-work challenge that paste creation may require.
                "GET /challenge": "Get proof-of-work challenge",
                "POST /": "Create paste",
                "GET /<id>": "Retrieve paste metadata",
                "GET /<id>/raw": "Retrieve raw paste content",
                "DELETE /<id>": "Delete paste",
            },
            "usage": {
                "raw": "curl --data-binary @file.txt http://host/",
                "pipe": "cat file.txt | curl --data-binary @- http://host/",
                "json": "curl -H 'Content-Type: application/json' -d '{\"content\":\"...\"}' http://host/",
            },
            "note": "Use --data-binary (not -d) to preserve newlines",
        }
    )
def create_paste():
    """Create a new paste from the request body.

    Accepts either a JSON object with a string "content" field (stored as
    text/plain) or a raw request body (MIME type detected from the bytes and
    the Content-Type header).

    Returns:
        201 with the paste metadata on success; 400 when no usable content
        is supplied or proof-of-work is missing/invalid; 413 when the content
        exceeds the size limit for the caller; 429 when the duplicate-content
        threshold is exceeded.
    """
    content: bytes | None = None
    mime_type: str | None = None
    if request.is_json:
        data = request.get_json(silent=True)
        # A JSON body may be any JSON value (list, string, number, ...);
        # only an object can carry a "content" field, so check the type
        # before calling .get() on it.
        if isinstance(data, dict) and isinstance(data.get("content"), str):
            try:
                content = data["content"].encode("utf-8")
            except UnicodeEncodeError:
                # JSON escapes can produce lone surrogates, which cannot be
                # encoded as UTF-8; reject instead of failing the request.
                return _json_response({"error": "Content is not valid Unicode text"}, 400)
            mime_type = "text/plain"
    else:
        content = request.get_data(as_text=False)
        if content:
            mime_type = _detect_mime_type(content, request.content_type)
    if not content:
        return _json_response({"error": "No content provided"}, 400)

    owner = _get_client_id()

    # Verify proof-of-work (if enabled)
    difficulty = current_app.config["POW_DIFFICULTY"]
    if difficulty > 0:
        pow_token = request.headers.get("X-PoW-Token", "")
        pow_solution = request.headers.get("X-PoW-Solution", "")
        if not pow_token or not pow_solution:
            return _json_response({
                "error": "Proof-of-work required",
                "hint": "GET /challenge for a new challenge",
            }, 400)
        # Extract nonce from token for verification (first ":"-separated field)
        pow_nonce = pow_token.split(":")[0]
        valid, err = _verify_pow(pow_nonce, pow_token, pow_solution)
        if not valid:
            current_app.logger.warning(
                "PoW verification failed: %s from=%s",
                err, request.remote_addr
            )
            return _json_response({"error": f"Proof-of-work failed: {err}"}, 400)

    # Enforce size limits based on authentication
    content_size = len(content)
    if owner:
        max_size = current_app.config["MAX_PASTE_SIZE_AUTH"]
    else:
        max_size = current_app.config["MAX_PASTE_SIZE_ANON"]
    if content_size > max_size:
        return _json_response({
            "error": "Paste too large",
            "size": content_size,
            "max_size": max_size,
            "authenticated": owner is not None,
        }, 413)

    # Check content deduplication threshold
    content_hash = hashlib.sha256(content).hexdigest()
    is_allowed, dedup_count = check_content_hash(content_hash)
    if not is_allowed:
        window = current_app.config["CONTENT_DEDUP_WINDOW"]
        current_app.logger.warning(
            "Dedup threshold exceeded: hash=%s count=%d from=%s",
            content_hash[:16], dedup_count, request.remote_addr
        )
        return _json_response({
            "error": "Duplicate content rate limit exceeded",
            "count": dedup_count,
            "window_seconds": window,
        }, 429)

    paste_id = _generate_id(content)
    now = int(time.time())
    db = get_db()
    db.execute(
        "INSERT INTO pastes (id, content, mime_type, owner, created_at, last_accessed) VALUES (?, ?, ?, ?, ?, ?)",
        (paste_id, content, mime_type, owner, now, now),
    )
    db.commit()

    response_data = {
        "id": paste_id,
        "url": f"/{paste_id}",
        "raw": f"/{paste_id}/raw",
        "mime_type": mime_type,
        "created_at": now,
    }
    # Only authenticated uploads echo their owner fingerprint back.
    if owner:
        response_data["owner"] = owner
    return _json_response(response_data, 201)
@bp.route("/<paste_id>", methods=["GET", "HEAD"])
def get_paste(paste_id: str):
    """Return metadata for a paste; the route also accepts HEAD.

    Responds 400 for a malformed ID, 404 when no such paste exists, and
    otherwise 200 with id, MIME type, size, creation time and raw URL.
    The paste's ``last_accessed`` timestamp is refreshed on every lookup.
    """
    if not _is_valid_paste_id(paste_id):
        return _json_response({"error": "Invalid paste ID"}, 400)

    db = get_db()
    accessed_at = int(time.time())
    # Touch the access time first, then read the row; both statements are
    # committed together below.
    db.execute(
        "UPDATE pastes SET last_accessed = ? WHERE id = ?", (accessed_at, paste_id)
    )
    cursor = db.execute(
        "SELECT id, mime_type, created_at, length(content) as size FROM pastes WHERE id = ?",
        (paste_id,)
    )
    paste = cursor.fetchone()
    db.commit()

    if paste is None:
        return _json_response({"error": "Paste not found"}, 404)

    metadata = {
        "id": paste["id"],
        "mime_type": paste["mime_type"],
        "size": paste["size"],
        "created_at": paste["created_at"],
        "raw": f"/{paste_id}/raw",
    }
    return _json_response(metadata)
@bp.route("/<paste_id>/raw", methods=["GET", "HEAD"])
def get_paste_raw(paste_id: str):
    """Serve a paste's stored bytes under its stored MIME type.

    Responds 400 for a malformed ID and 404 when no such paste exists.
    The paste's ``last_accessed`` timestamp is refreshed on every lookup.
    The route also accepts HEAD.
    """
    if not _is_valid_paste_id(paste_id):
        return _json_response({"error": "Invalid paste ID"}, 400)

    db = get_db()
    accessed_at = int(time.time())
    # Touch the access time first, then read the row; both statements are
    # committed together below.
    db.execute(
        "UPDATE pastes SET last_accessed = ? WHERE id = ?", (accessed_at, paste_id)
    )
    cursor = db.execute(
        "SELECT content, mime_type FROM pastes WHERE id = ?", (paste_id,)
    )
    paste = cursor.fetchone()
    db.commit()

    if paste is None:
        return _json_response({"error": "Paste not found"}, 404)

    mime_type = paste["mime_type"]
    response = Response(paste["content"], mimetype=mime_type)
    # Images and text are marked for inline display; every other type gets
    # no Content-Disposition header at all.
    # NOTE(review): the stored type can originate from the uploader's
    # Content-Type header, so "text/html" content would be served inline -
    # confirm this is acceptable.
    shown_inline = mime_type.startswith(("image/", "text/"))
    if shown_inline:
        response.headers["Content-Disposition"] = "inline"
    return response
@bp.route("/<paste_id>", methods=["DELETE"])
def delete_paste(paste_id: str):
    """Delete a paste on behalf of its owner.

    The caller is identified via _get_client_id(). Responds 400 for a
    malformed ID, 401 without a valid client identity, 404 when the paste
    does not exist, 403 when it belongs to someone else, and 200 once the
    row has been removed.
    """
    if not _is_valid_paste_id(paste_id):
        return _json_response({"error": "Invalid paste ID"}, 400)

    requester = _get_client_id()
    if not requester:
        return _json_response({"error": "Authentication required"}, 401)

    db = get_db()
    # Look up the recorded owner before touching anything.
    lookup = db.execute(
        "SELECT owner FROM pastes WHERE id = ?", (paste_id,)
    )
    paste = lookup.fetchone()
    if paste is None:
        return _json_response({"error": "Paste not found"}, 404)
    # Pastes with no owner (NULL) never equal a fingerprint, so they cannot
    # be deleted through this endpoint.
    if paste["owner"] != requester:
        return _json_response({"error": "Permission denied"}, 403)

    db.execute("DELETE FROM pastes WHERE id = ?", (paste_id,))
    db.commit()
    return _json_response({"message": "Paste deleted"})