Files
flaskpaste/app/api/routes.py
Username 2ccbfcbfaa ci: update linting and security checks
- Fix bandit suppressions (use # nosec B608 for bandit)
- Add # noqa: S608 for ruff compatibility
- CI workflow: add coverage reporting (informational)
- CI workflow: track mypy error baseline
- CI workflow: improve documentation
2025-12-21 13:39:30 +01:00

1749 lines
64 KiB
Python

"""API route handlers using modern Flask patterns."""
from __future__ import annotations
import hashlib
import hmac
import json
import math
import re
import secrets
import threading
import time
from collections import defaultdict
from typing import TYPE_CHECKING, Any
from flask import Response, current_app, g, request
from flask.views import MethodView
from app.api import bp
from app.config import VERSION
from app.database import check_content_hash, get_db, hash_password, verify_password
if TYPE_CHECKING:
from sqlite3 import Row
# Compiled patterns for validation
# Paste IDs are lowercase hex; the expected length is checked separately in
# validate_paste_id().
PASTE_ID_PATTERN = re.compile(r"^[a-f0-9]+$")
# Client IDs are exactly 40 lowercase hex characters; get_client_fingerprint()
# applies this to the X-SSL-Client-SHA1 header value.
CLIENT_ID_PATTERN = re.compile(r"^[a-f0-9]{40}$")
# "type/subtype" shape accepted for client-supplied MIME types.
MIME_PATTERN = re.compile(r"^[a-z0-9][a-z0-9!#$&\-^_.+]*/[a-z0-9][a-z0-9!#$&\-^_.+]*$")
# Magic bytes for binary format detection
# Maps a leading byte signature to the MIME type reported for it. The table is
# scanned in insertion order by detect_mime_type() and is_recognizable_format();
# both additionally inspect bytes 8-12 of a b"RIFF" match for b"WEBP".
MAGIC_SIGNATURES: dict[bytes, str] = {
b"\x89PNG\r\n\x1a\n": "image/png",
b"\xff\xd8\xff": "image/jpeg",
b"GIF87a": "image/gif",
b"GIF89a": "image/gif",
b"RIFF": "image/webp",
b"PK\x03\x04": "application/zip",
b"%PDF": "application/pdf",
b"\x1f\x8b": "application/gzip",
}
# Generic MIME types to override with detection
# detect_mime_type() ignores a client-supplied Content-Type that is in this set
# and falls through to content-based detection instead.
GENERIC_MIME_TYPES = frozenset(
{
"application/octet-stream",
"application/x-www-form-urlencoded",
"text/plain",
}
)
# Runtime PoW secret cache
# Filled lazily by get_pow_secret() when no POW_SECRET is configured.
_pow_secret_cache: bytes | None = None
# ─────────────────────────────────────────────────────────────────────────────
# Anti-flood: dynamic PoW difficulty adjustment
# ─────────────────────────────────────────────────────────────────────────────
# All four names below are module-level state; the functions in this section
# access the list and the two counters while holding _antiflood_lock.
_antiflood_lock = threading.Lock()
_antiflood_requests: list[float] = [] # Timestamps (time.time()) of recorded requests, all clients
_antiflood_difficulty: int = 0 # Current difficulty boost (added to base POW_DIFFICULTY)
_antiflood_last_increase: float = 0 # Time of the last boost change (increase or decay step)
def get_dynamic_difficulty() -> int:
    """Return the PoW difficulty currently in force.

    The configured ``POW_DIFFICULTY`` is returned unchanged when it is 0 or
    when ``ANTIFLOOD_ENABLED`` is false. Otherwise the current anti-flood
    boost is added to it and the sum is capped at ``ANTIFLOOD_MAX``.
    """
    config = current_app.config
    base_difficulty = config["POW_DIFFICULTY"]
    antiflood_on = config.get("ANTIFLOOD_ENABLED", True)
    if base_difficulty == 0 or not antiflood_on:
        return base_difficulty

    # Read the shared boost under the lock so a concurrent update is not torn.
    with _antiflood_lock:
        boosted = base_difficulty + _antiflood_difficulty
        return min(boosted, config["ANTIFLOOD_MAX"])
def record_antiflood_request() -> None:
    """Register one request with the anti-flood tracker and retune the boost.

    Does nothing when ``ANTIFLOOD_ENABLED`` is false or ``POW_DIFFICULTY`` is 0.
    Otherwise the request timestamp is added to the sliding window and:

    * if the window now holds more than ``ANTIFLOOD_THRESHOLD`` entries, the
      boost grows by ``ANTIFLOOD_STEP`` (unless base + boost already reached
      ``ANTIFLOOD_MAX``);
    * else, if a boost is active and more than ``ANTIFLOOD_DECAY`` seconds have
      passed since it last changed, the boost shrinks by one step (not below 0).
    """
    global _antiflood_difficulty, _antiflood_last_increase

    config = current_app.config
    if not config.get("ANTIFLOOD_ENABLED", True):
        return
    if config["POW_DIFFICULTY"] == 0:
        return

    timestamp = time.time()
    window = config["ANTIFLOOD_WINDOW"]
    threshold = config["ANTIFLOOD_THRESHOLD"]
    step = config["ANTIFLOOD_STEP"]
    ceiling = config["ANTIFLOOD_MAX"]
    decay = config["ANTIFLOOD_DECAY"]
    base = config["POW_DIFFICULTY"]

    with _antiflood_lock:
        # Drop timestamps that fell out of the window, then add this request.
        oldest_allowed = timestamp - window
        _antiflood_requests[:] = [seen for seen in _antiflood_requests if seen > oldest_allowed]
        _antiflood_requests.append(timestamp)

        if len(_antiflood_requests) > threshold:
            # Under load: raise the boost unless the ceiling is already reached.
            if base + _antiflood_difficulty < ceiling:
                _antiflood_difficulty += step
                _antiflood_last_increase = timestamp
            return

        quiet_for = timestamp - _antiflood_last_increase
        if _antiflood_difficulty > 0 and quiet_for > decay:
            # Load has subsided: step the boost down and restart the decay timer.
            _antiflood_difficulty = max(0, _antiflood_difficulty - step)
            _antiflood_last_increase = timestamp
def reset_antiflood() -> None:
    """Return the anti-flood tracker to its initial state (intended for tests).

    Empties the recorded timestamps and zeroes both the difficulty boost and
    the time of its last change, all under the anti-flood lock.
    """
    global _antiflood_difficulty, _antiflood_last_increase

    with _antiflood_lock:
        del _antiflood_requests[:]
        _antiflood_difficulty = 0
        _antiflood_last_increase = 0
# ─────────────────────────────────────────────────────────────────────────────
# Rate Limiting (in-memory sliding window)
# ─────────────────────────────────────────────────────────────────────────────
# Guards every read and write of _rate_limit_requests below.
_rate_limit_lock = threading.Lock()
# Maps a client IP string to the timestamps (time.time()) of its requests that
# are still inside the rate-limit window. Because this is a defaultdict, merely
# looking up an unseen IP creates an (empty) entry; cleanup_rate_limits()
# removes entries whose list has become empty.
_rate_limit_requests: dict[str, list[float]] = defaultdict(list)
def get_client_ip() -> str:
    """Return the client IP address used as the rate-limit key.

    When the request passes ``is_trusted_proxy()``, the first entry of the
    ``X-Forwarded-For`` chain is used. If that header is absent, or its first
    entry is blank (for example ``", 10.0.0.1"``), the socket peer address is
    used instead so that an empty string is never returned as a client key.

    Returns:
        The client IP as a string, or ``"unknown"`` when no address is known.
    """
    if is_trusted_proxy():
        forwarded = request.headers.get("X-Forwarded-For", "")
        # Take the first (client) hop of the chain; ignore it if it is blank.
        first_hop = forwarded.split(",", 1)[0].strip()
        if first_hop:
            return first_hop
    return request.remote_addr or "unknown"
def check_rate_limit(client_ip: str, authenticated: bool = False) -> tuple[bool, int, int]:
    """Apply the per-IP sliding-window rate limit to one request.

    Args:
        client_ip: Key under which the request is counted.
        authenticated: When true, the limit is multiplied by
            ``RATE_LIMIT_AUTH_MULTIPLIER`` (default 5).

    Returns:
        ``(allowed, remaining, reset_seconds)``. With rate limiting disabled
        the result is ``(True, -1, 0)``. A rejected request is not recorded
        and yields ``(False, 0, seconds until the oldest entry expires)``.
        An accepted request is recorded and yields
        ``(True, requests left, window length)``.
    """
    config = current_app.config
    if not config.get("RATE_LIMIT_ENABLED", True):
        return True, -1, 0

    window = config["RATE_LIMIT_WINDOW"]
    limit = config["RATE_LIMIT_MAX"]
    if authenticated:
        limit *= config.get("RATE_LIMIT_AUTH_MULTIPLIER", 5)

    now = time.time()
    oldest_allowed = now - window

    with _rate_limit_lock:
        # Prune this client's history in place so the stored list stays shared.
        history = _rate_limit_requests[client_ip]
        history[:] = [stamp for stamp in history if stamp > oldest_allowed]

        if len(history) < limit:
            history.append(now)
            return True, limit - len(history), window

        # Over the limit: report when the oldest recorded request leaves the window.
        if history:
            retry_in = int(history[0] + window - now) + 1
        else:
            retry_in = window
        return False, 0, retry_in
def cleanup_rate_limits(window: int | None = None) -> int:
    """Prune expired rate-limit timestamps and drop clients with none left.

    Meant to be invoked periodically (for example from a cleanup task).

    Args:
        window: Window length in seconds. When ``None``, ``RATE_LIMIT_WINDOW``
            from the app config is used (default 60).

    Returns:
        Number of client entries that were removed.
    """
    if window is None:
        window = current_app.config.get("RATE_LIMIT_WINDOW", 60)
    oldest_allowed = time.time() - window

    with _rate_limit_lock:
        # First pass: trim every history in place.
        for stamps in _rate_limit_requests.values():
            stamps[:] = [stamp for stamp in stamps if stamp > oldest_allowed]
        # Second pass: collect the now-empty clients before deleting, so the
        # dict is not mutated while it is being iterated.
        idle_clients = [ip for ip, stamps in _rate_limit_requests.items() if not stamps]
        for ip in idle_clients:
            del _rate_limit_requests[ip]

    return len(idle_clients)
def reset_rate_limits() -> None:
    """Forget every recorded request for every client (test helper).

    The shared map is emptied while holding the rate-limit lock.
    """
    with _rate_limit_lock:
        _rate_limit_requests.clear()
# ─────────────────────────────────────────────────────────────────────────────
# Response Helpers
# ─────────────────────────────────────────────────────────────────────────────
def json_response(data: dict[str, Any], status: int = 200) -> Response:
    """Serialize *data* to JSON and wrap it in a Flask response.

    Non-ASCII characters are emitted as-is rather than ``\\u`` escaped.

    Args:
        data: JSON-serializable mapping to send.
        status: HTTP status code (default 200).
    """
    body = json.dumps(data, ensure_ascii=False)
    return Response(body, status=status, mimetype="application/json")
def error_response(message: str, status: int, **extra: Any) -> Response:
    """Build a JSON error response of the form ``{"error": message, ...extra}``.

    Args:
        message: Human-readable error text stored under ``"error"``.
        status: HTTP status code of the response.
        **extra: Additional fields merged into the JSON body after ``"error"``.
    """
    payload: dict[str, Any] = {"error": message}
    payload.update(extra)
    return json_response(payload, status)
# ─────────────────────────────────────────────────────────────────────────────
# URL Helpers
# ─────────────────────────────────────────────────────────────────────────────
def url_prefix() -> str:
    """Return the ``URL_PREFIX`` config value, or ``""`` when it is not set."""
    config = current_app.config
    return config.get("URL_PREFIX", "")
def prefixed_url(path: str) -> str:
    """Return *path* with the configured URL prefix placed in front of it."""
    return "{}{}".format(url_prefix(), path)
def base_url() -> str:
    """Build the externally visible base URL (scheme, host and URL prefix).

    The scheme and host default to what the request itself carries. The
    proxy-supplied overrides (``X-Forwarded-Proto`` / ``X-Scheme`` and
    ``X-Forwarded-Host``) are honoured only when the request passes
    ``is_trusted_proxy()``, mirroring how ``get_client_ip()`` treats
    ``X-Forwarded-For``. Without that check any client could dictate the URL
    that ends up in API responses and in the served CLI client.

    Returns:
        ``"<scheme>://<host><url_prefix>"`` with no trailing slash added.
    """
    headers = request.headers
    scheme = request.scheme
    host = headers.get("Host") or request.host

    if is_trusted_proxy():
        scheme = headers.get("X-Forwarded-Proto") or headers.get("X-Scheme") or scheme
        host = headers.get("X-Forwarded-Host") or host

    return f"{scheme}://{host}{url_prefix()}"
# ─────────────────────────────────────────────────────────────────────────────
# Validation Helpers (used within views)
# ─────────────────────────────────────────────────────────────────────────────
def validate_paste_id(paste_id: str) -> Response | None:
    """Check that *paste_id* is lowercase hex of the configured length.

    Args:
        paste_id: Identifier taken from the request URL.

    Returns:
        ``None`` when the ID is well-formed, otherwise a 400 error response.
    """
    expected_length = current_app.config["PASTE_ID_LENGTH"]
    # fullmatch() rather than match(): the pattern ends in "$", which also
    # matches just before a trailing newline, so match() would accept "abc…\n".
    if len(paste_id) != expected_length or PASTE_ID_PATTERN.fullmatch(paste_id) is None:
        return error_response("Invalid paste ID", 400)
    return None
def fetch_paste(paste_id: str, check_password: bool = True) -> Response | None:
    """Load a paste into ``g.paste`` after touching its access time.

    The ``last_accessed`` column is updated before the row is read. On any
    failure the transaction is committed and an error response is returned.
    On success the row is stored in ``g.paste`` and the connection in ``g.db``;
    the commit is left to the caller.

    Args:
        paste_id: ID of the paste to load.
        check_password: When true and the paste has a password hash, the
            ``X-Paste-Password`` header must be present and must verify.

    Returns:
        ``None`` on success; otherwise a 404 (not found), 401 (password
        missing) or 403 (password wrong) error response.
    """
    db = get_db()
    accessed_at = int(time.time())

    # Update access time
    db.execute("UPDATE pastes SET last_accessed = ? WHERE id = ?", (accessed_at, paste_id))
    cursor = db.execute(
        """SELECT id, content, mime_type, owner, created_at,
length(content) as size, burn_after_read, expires_at, password_hash
FROM pastes WHERE id = ?""",
        (paste_id,),
    )
    row = cursor.fetchone()

    # Work out which failure (if any) applies: (message, status, extra fields).
    failure: tuple[str, int, dict[str, Any]] | None = None
    if row is None:
        failure = ("Paste not found", 404, {})
    elif check_password and row["password_hash"]:
        supplied = request.headers.get("X-Paste-Password", "")
        if not supplied:
            failure = ("Password required", 401, {"password_protected": True})
        elif not verify_password(supplied, row["password_hash"]):
            failure = ("Invalid password", 403, {})

    if failure is not None:
        # Persist the access-time update even though the request is refused.
        db.commit()
        message, status, extra = failure
        return error_response(message, status, **extra)

    g.paste = row
    g.db = db
    return None
def require_auth() -> Response | None:
    """Require a client fingerprint for an ownership operation.

    Identity comes from ``get_client_fingerprint()``, so a fingerprint is
    accepted here without the extra validity check that ``get_client_id()``
    applies. On success the fingerprint is stored in ``g.client_id``.

    Returns:
        ``None`` when a fingerprint is present, otherwise a 401 error response.
    """
    fingerprint = get_client_fingerprint()
    if fingerprint:
        g.client_id = fingerprint
        return None
    return error_response("Authentication required", 401)
# ─────────────────────────────────────────────────────────────────────────────
# Authentication & Security
# ─────────────────────────────────────────────────────────────────────────────
def is_trusted_proxy() -> bool:
    """Tell whether the request carries the shared trusted-proxy secret.

    When ``TRUSTED_PROXY_SECRET`` is empty or unset, every request is treated
    as trusted. Otherwise the ``X-Proxy-Secret`` header must equal the
    configured secret; the comparison is constant-time.

    Returns:
        ``True`` if the request is considered to come from the trusted proxy.
    """
    expected = current_app.config.get("TRUSTED_PROXY_SECRET", "")
    if not expected:
        return True
    provided = request.headers.get("X-Proxy-Secret", "")
    # compare_digest() raises TypeError for str arguments containing non-ASCII
    # characters; a client-controlled header must not be able to trigger that,
    # so compare the encoded bytes instead.
    return hmac.compare_digest(expected.encode("utf-8"), provided.encode("utf-8"))
def get_client_fingerprint() -> str | None:
    """Return the client certificate fingerprint used for identity/ownership.

    The value is read from the ``X-SSL-Client-SHA1`` header, trimmed and
    lowercased, and must be exactly 40 hex characters. No validity check is
    performed here, so this is what paste ownership and the
    delete/update/list operations rely on.

    Returns:
        The normalized fingerprint, or ``None`` when the request fails
        ``is_trusted_proxy()`` or the header is missing or malformed.
    """
    if not is_trusted_proxy():
        return None

    raw_header = request.headers.get("X-SSL-Client-SHA1", "")
    candidate = raw_header.strip().lower()
    if not candidate or CLIENT_ID_PATTERN.match(candidate) is None:
        return None
    return candidate
def get_client_id() -> str | None:
    """Return the client fingerprint only when it qualifies for elevated limits.

    Starts from ``get_client_fingerprint()``. When ``PKI_ENABLED`` is set, the
    fingerprint must additionally pass ``app.pki.is_certificate_valid``; a
    rejected fingerprint is logged (truncated) and ``None`` is returned.
    Callers use the result to grant higher rate and size limits; ownership
    operations keep working through ``get_client_fingerprint()``.

    Returns:
        The fingerprint, or ``None`` if absent or rejected by the PKI check.
    """
    fingerprint = get_client_fingerprint()
    if not fingerprint:
        return None

    if not current_app.config.get("PKI_ENABLED"):
        return fingerprint

    # Imported lazily, only when PKI is actually enabled.
    from app.pki import is_certificate_valid

    if is_certificate_valid(fingerprint):
        return fingerprint

    current_app.logger.warning(
        "Elevated auth rejected (revoked/expired): %s", fingerprint[:12] + "..."
    )
    return None
# ─────────────────────────────────────────────────────────────────────────────
# Proof-of-Work
# ─────────────────────────────────────────────────────────────────────────────
def get_pow_secret() -> bytes:
    """Return the key used to sign PoW challenge tokens.

    A configured ``POW_SECRET`` takes precedence and is returned encoded.
    Otherwise a random 32-byte secret is generated on first use and cached in
    a module-level variable for subsequent calls.
    """
    global _pow_secret_cache

    configured = current_app.config.get("POW_SECRET", "")
    if not configured:
        if _pow_secret_cache is None:
            _pow_secret_cache = secrets.token_bytes(32)
        return _pow_secret_cache
    return configured.encode()
def generate_challenge(difficulty_override: int | None = None) -> dict[str, Any]:
    """Create a signed proof-of-work challenge.

    Args:
        difficulty_override: Fixed difficulty to embed (used for
            registration). When ``None``, the current dynamic difficulty
            from ``get_dynamic_difficulty()`` is used.

    Returns:
        A dict with ``nonce`` (32 hex chars), ``difficulty``, ``expires``
        (Unix time, now + ``POW_CHALLENGE_TTL``) and ``token``, where the
        token is ``"nonce:expires:difficulty:signature"`` and the signature
        is an HMAC-SHA256 hex digest over the first three fields.
    """
    if difficulty_override is None:
        difficulty = get_dynamic_difficulty()
    else:
        difficulty = difficulty_override

    lifetime = current_app.config["POW_CHALLENGE_TTL"]
    expires = int(time.time()) + lifetime
    nonce = secrets.token_hex(16)

    # The signed payload is exactly the token without its signature field.
    payload = f"{nonce}:{expires}:{difficulty}"
    signature = hmac.new(get_pow_secret(), payload.encode(), hashlib.sha256).hexdigest()

    return {
        "nonce": nonce,
        "difficulty": difficulty,
        "expires": expires,
        "token": f"{payload}:{signature}",
    }
def verify_pow(token: str, solution: str, min_difficulty: int | None = None) -> tuple[bool, str]:
    """Verify a proof-of-work solution against a signed challenge token.

    The token must be correctly signed, unexpired, and carry a difficulty of
    at least the required one. The solution is then checked against the
    difficulty embedded in the token (which may be higher than the required
    one), not against the current dynamic difficulty.

    Args:
        token: Challenge token of the form ``nonce:expires:difficulty:sig``.
        solution: Non-negative integer, as a string, chosen by the client.
        min_difficulty: Required difficulty; defaults to ``POW_DIFFICULTY``.

    Returns:
        ``(True, "")`` on success (including when the required difficulty is
        0), otherwise ``(False, reason)``.
    """
    base_difficulty = current_app.config["POW_DIFFICULTY"]
    required_difficulty = min_difficulty if min_difficulty is not None else base_difficulty
    if required_difficulty == 0:
        return True, ""

    # Parse token
    parts = token.split(":")
    if len(parts) != 4:
        return False, "Invalid challenge format"
    nonce, expires_str, diff_str, sig = parts
    try:
        expires = int(expires_str)
        token_diff = int(diff_str)
    except (ValueError, TypeError):
        return False, "Invalid challenge format"

    # Verify signature. The comparison is done on bytes because
    # compare_digest() raises TypeError for str arguments containing non-ASCII
    # characters, which a client could otherwise use to provoke a 500.
    msg = f"{nonce}:{expires}:{token_diff}".encode()
    expected_sig = hmac.new(get_pow_secret(), msg, hashlib.sha256).hexdigest()
    if not hmac.compare_digest(sig.encode("utf-8"), expected_sig.encode("ascii")):
        return False, "Invalid challenge signature"

    # Check expiry
    if int(time.time()) > expires:
        return False, "Challenge expired"

    # Token difficulty must be at least the required difficulty
    if token_diff < required_difficulty:
        return False, f"Difficulty too low: {token_diff} < {required_difficulty}"

    # The solution must parse as a non-negative integer; the hash below is
    # computed over the string exactly as submitted.
    try:
        if int(solution) < 0:
            return False, "Invalid solution"
    except (ValueError, TypeError):
        return False, "Invalid solution"

    # Count leading zero bits of SHA-256("nonce:solution").
    digest = hashlib.sha256(f"{nonce}:{solution}".encode()).digest()
    zero_bits = 0
    for byte in digest:
        if byte == 0:
            zero_bits += 8
        else:
            # bit_length() is the position of the highest set bit, so the
            # remainder of the byte is its count of leading zeros.
            zero_bits += 8 - byte.bit_length()
            break

    if zero_bits < token_diff:
        return False, f"Insufficient work: {zero_bits} < {token_diff} bits"
    return True, ""
# ─────────────────────────────────────────────────────────────────────────────
# Content Processing
# ─────────────────────────────────────────────────────────────────────────────
def calculate_entropy(data: bytes) -> float:
    """Return the Shannon entropy of *data* in bits per byte.

    The result lies between 0.0 (empty input or a single repeated byte value)
    and 8.0 (all 256 byte values equally frequent).
    """
    if not data:
        return 0.0

    # Histogram of byte values, indexed by the value itself.
    histogram = [0] * 256
    for value in data:
        histogram[value] += 1

    total = len(data)
    bits = 0.0
    # filter(None, ...) skips the byte values that never occur.
    for occurrences in filter(None, histogram):
        share = occurrences / total
        bits -= share * math.log2(share)
    return bits
def detect_mime_type(content: bytes, content_type: str | None = None) -> str:
    """Determine the MIME type of *content*.

    Detection order:

    1. A known magic-byte signature at the start of the content.
    2. The client-supplied ``content_type`` (parameters stripped, lowercased),
       if it is well-formed and not one of the generic types.
    3. ``text/plain`` if the content decodes as UTF-8, otherwise
       ``application/octet-stream``.

    Args:
        content: Raw paste bytes.
        content_type: Value of the request's Content-Type header, if any.
    """
    # Magic byte detection (highest priority)
    for magic, mime in MAGIC_SIGNATURES.items():
        if not content.startswith(magic):
            continue
        # RIFF is a generic container: only report WebP when the form type at
        # bytes 8-12 says so. Slicing a shorter input yields a non-matching
        # value, so truncated or merely "RIFF"-prefixed data is not misreported.
        if magic == b"RIFF" and content[8:12] != b"WEBP":
            continue
        return mime

    # Explicit Content-Type (if specific)
    if content_type:
        mime = content_type.split(";")[0].strip().lower()
        if mime not in GENERIC_MIME_TYPES and MIME_PATTERN.match(mime):
            return mime

    # UTF-8 text detection
    try:
        content.decode("utf-8")
    except UnicodeDecodeError:
        return "application/octet-stream"
    return "text/plain"
def is_recognizable_format(content: bytes) -> tuple[bool, str | None]:
    """Check whether *content* looks like a known, likely unencrypted format.

    Used to enforce encryption by rejecting recognizable uploads. Content is
    recognizable when it starts with a known magic-byte signature or when it
    decodes as UTF-8 text.

    Returns:
        ``(True, mime_type)`` when recognized, otherwise ``(False, None)``.
    """
    # Check magic bytes
    for magic, mime in MAGIC_SIGNATURES.items():
        if not content.startswith(magic):
            continue
        # RIFF is a generic container: require the WEBP form type at bytes
        # 8-12. A shorter input slices to a non-matching value and is skipped
        # rather than being reported as WebP.
        if magic == b"RIFF" and content[8:12] != b"WEBP":
            continue
        return True, mime

    # Check if valid UTF-8 text (plaintext)
    try:
        content.decode("utf-8")
    except UnicodeDecodeError:
        return False, None
    return True, "text/plain"
def generate_paste_id(content: bytes) -> str:
    """Derive a paste ID from the content and the current time.

    The ID is the first ``PASTE_ID_LENGTH`` hex characters of the SHA-256 of
    the content followed by the current ``time.time_ns()`` value in decimal.
    """
    salt = str(time.time_ns()).encode()
    id_length = current_app.config["PASTE_ID_LENGTH"]
    digest = hashlib.sha256(content + salt).hexdigest()
    return digest[:id_length]
# ─────────────────────────────────────────────────────────────────────────────
# Class-Based Views
# ─────────────────────────────────────────────────────────────────────────────
class IndexView(MethodView):
    """Handle API info (GET) and paste creation (POST)."""

    def get(self) -> Response:
        """Return API information: endpoints, limits, PoW status and usage hints."""
        prefix = url_prefix() or "/"
        url = base_url()
        difficulty = get_dynamic_difficulty()
        pki_enabled = current_app.config.get("PKI_ENABLED", False)

        # Build endpoints dict
        endpoints: dict[str, str] = {
            f"GET {prefixed_url('/')}": "API information",
            f"GET {prefixed_url('/health')}": "Health check",
            f"GET {prefixed_url('/client')}": "Download CLI client (fpaste)",
            f"GET {prefixed_url('/challenge')}": "Get proof-of-work challenge",
            f"POST {prefixed_url('/')}": "Create paste (PoW required)",
            f"GET {prefixed_url('/pastes')}": "List your pastes (cert required)",
            f"GET {prefixed_url('/<id>')}": "Get paste metadata",
            f"GET {prefixed_url('/<id>/raw')}": "Get raw paste content",
            f"PUT {prefixed_url('/<id>')}": "Update paste (owner only)",
            f"DELETE {prefixed_url('/<id>')}": "Delete paste (owner only)",
            f"GET {prefixed_url('/register/challenge')}": "Get registration challenge",
            f"POST {prefixed_url('/register')}": "Register for client certificate",
        }
        if pki_enabled:
            endpoints.update(
                {
                    f"GET {prefixed_url('/pki')}": "PKI status",
                    f"GET {prefixed_url('/pki/ca.crt')}": "Download CA certificate",
                    f"POST {prefixed_url('/pki/issue')}": "Issue client certificate",
                    f"GET {prefixed_url('/pki/certs')}": "List certificates",
                    f"POST {prefixed_url('/pki/revoke/<serial>')}": "Revoke certificate",
                }
            )

        anon_rate = current_app.config["RATE_LIMIT_MAX"]
        trusted_rate = anon_rate * current_app.config.get("RATE_LIMIT_AUTH_MULTIPLIER", 5)

        # Build response
        response_data: dict[str, Any] = {
            "name": "FlaskPaste",
            "version": VERSION,
            "prefix": prefix,
            "endpoints": endpoints,
            "authentication": {
                "anonymous": "Create pastes only (strict limits)",
                "client_cert": "Create + manage own pastes (strict limits)",
                "trusted_cert": "All operations (relaxed limits)",
            },
            "limits": {
                "anonymous": {
                    "max_size": current_app.config["MAX_PASTE_SIZE_ANON"],
                    "rate": f"{anon_rate}/min",
                },
                "trusted": {
                    "max_size": current_app.config["MAX_PASTE_SIZE_AUTH"],
                    "rate": f"{trusted_rate}/min",
                },
            },
            "pow": {
                "enabled": difficulty > 0,
                "difficulty": difficulty,
                "hint": "GET /challenge, solve, submit with X-PoW-Token + X-PoW-Solution",
            },
            "cli": {
                "install": f"curl -o fpaste {url}/client && chmod +x fpaste",
                "usage": "fpaste file.txt # encrypts by default",
            },
            "usage": {
                "create": f"curl -X POST --data-binary @file.txt {url}/ (with PoW headers)",
                "get": f"curl {url}/<id>/raw",
                "delete": f"curl -X DELETE {url}/<id> (with X-SSL-Client-SHA1)",
            },
        }
        return json_response(response_data)

    def post(self) -> Response:
        """Create a new paste.

        The body is either JSON (an object with a string ``"content"`` field,
        stored as ``text/plain``) or raw bytes (MIME type detected). The
        request then passes, in order: rate limiting, proof-of-work, maximum
        and minimum size, entropy, the recognizable-format check, and content
        deduplication. Optional headers: ``X-Burn-After-Read``, ``X-Expiry``
        (seconds; invalid values are ignored) and ``X-Paste-Password``.

        Returns:
            201 with the paste metadata on success; otherwise a 400, 413 or
            429 error response describing the failed check.
        """
        # Parse content
        content: bytes | None = None
        mime_type: str | None = None
        if request.is_json:
            data = request.get_json(silent=True)
            # A JSON body may be any JSON value (list, string, number, ...).
            # Only an object can carry "content"; anything else must fall
            # through to "No content provided" instead of raising on .get().
            if isinstance(data, dict) and isinstance(data.get("content"), str):
                content = data["content"].encode("utf-8")
                mime_type = "text/plain"
        else:
            content = request.get_data(as_text=False)
            if content:
                mime_type = detect_mime_type(content, request.content_type)
        if not content:
            return error_response("No content provided", 400)

        # Separate trusted (for limits) from fingerprint (for ownership)
        trusted_client = get_client_id()  # Only trusted certs get elevated limits
        owner = get_client_fingerprint()  # Any cert can own pastes

        # Rate limiting (check before expensive operations)
        client_ip = get_client_ip()
        allowed, _remaining, reset_seconds = check_rate_limit(
            client_ip, authenticated=bool(trusted_client)
        )
        if not allowed:
            current_app.logger.warning(
                "Rate limit exceeded: ip=%s trusted=%s", client_ip, bool(trusted_client)
            )
            response = error_response(
                "Rate limit exceeded",
                429,
                retry_after=reset_seconds,
            )
            response.headers["Retry-After"] = str(reset_seconds)
            response.headers["X-RateLimit-Remaining"] = "0"
            response.headers["X-RateLimit-Reset"] = str(reset_seconds)
            return response

        # Proof-of-work verification
        difficulty = current_app.config["POW_DIFFICULTY"]
        if difficulty > 0:
            token = request.headers.get("X-PoW-Token", "")
            solution = request.headers.get("X-PoW-Solution", "")
            if not token or not solution:
                return error_response(
                    "Proof-of-work required", 400, hint="GET /challenge for a new challenge"
                )
            valid, err = verify_pow(token, solution)
            if not valid:
                current_app.logger.warning(
                    "PoW verification failed: %s from=%s", err, request.remote_addr
                )
                return error_response(f"Proof-of-work failed: {err}", 400)

        # Size limits (only trusted clients get elevated limits)
        content_size = len(content)
        max_size = (
            current_app.config["MAX_PASTE_SIZE_AUTH"]
            if trusted_client
            else current_app.config["MAX_PASTE_SIZE_ANON"]
        )
        if content_size > max_size:
            return error_response(
                "Paste too large",
                413,
                size=content_size,
                max_size=max_size,
                trusted=trusted_client is not None,
            )

        # Minimum size check (enforces encryption overhead)
        min_size = current_app.config.get("MIN_PASTE_SIZE", 0)
        if min_size > 0 and content_size < min_size:
            return error_response(
                "Paste too small",
                400,
                size=content_size,
                min_size=min_size,
                hint="Encrypt content before uploading (fpaste encrypts by default)",
            )

        # Entropy check (skipped for content smaller than MIN_ENTROPY_SIZE)
        min_entropy = current_app.config.get("MIN_ENTROPY", 0)
        min_entropy_size = current_app.config.get("MIN_ENTROPY_SIZE", 256)
        if min_entropy > 0 and content_size >= min_entropy_size:
            entropy = calculate_entropy(content)
            if entropy < min_entropy:
                current_app.logger.warning(
                    "Low entropy rejected: %.2f < %.2f from=%s",
                    entropy,
                    min_entropy,
                    request.remote_addr,
                )
                return error_response(
                    "Content entropy too low",
                    400,
                    entropy=round(entropy, 2),
                    min_entropy=min_entropy,
                    hint="Encrypt content before uploading (fpaste encrypts by default)",
                )

        # Binary content requirement (reject recognizable formats)
        if current_app.config.get("REQUIRE_BINARY", False):
            is_recognized, detected_format = is_recognizable_format(content)
            if is_recognized:
                current_app.logger.warning(
                    "Recognizable format rejected: %s from=%s",
                    detected_format,
                    request.remote_addr,
                )
                return error_response(
                    "Recognizable format not allowed",
                    400,
                    detected=detected_format,
                    hint="Encrypt content before uploading (fpaste encrypts by default)",
                )

        # Deduplication check
        content_hash = hashlib.sha256(content).hexdigest()
        is_allowed, dedup_count = check_content_hash(content_hash)
        if not is_allowed:
            window = current_app.config["CONTENT_DEDUP_WINDOW"]
            current_app.logger.warning(
                "Dedup threshold exceeded: hash=%s count=%d from=%s",
                content_hash[:16],
                dedup_count,
                request.remote_addr,
            )
            return error_response(
                "Duplicate content rate limit exceeded",
                429,
                count=dedup_count,
                window_seconds=window,
            )

        # Parse optional headers
        burn_header = request.headers.get("X-Burn-After-Read", "").strip().lower()
        burn_after_read = burn_header in ("true", "1", "yes")

        expires_at = None
        expiry_header = request.headers.get("X-Expiry", "").strip()
        if expiry_header:
            try:
                expiry_seconds = int(expiry_header)
            except ValueError:
                # Deliberate best-effort: an unparsable expiry is ignored and
                # the paste is created without one.
                expiry_seconds = 0
            if expiry_seconds > 0:
                max_expiry = current_app.config.get("MAX_EXPIRY_SECONDS", 0)
                if max_expiry > 0:
                    expiry_seconds = min(expiry_seconds, max_expiry)
                expires_at = int(time.time()) + expiry_seconds

        password_hash = None
        password_header = request.headers.get("X-Paste-Password", "")
        if password_header:
            if len(password_header) > 1024:
                return error_response("Password too long (max 1024 chars)", 400)
            password_hash = hash_password(password_header)

        # Insert paste
        paste_id = generate_paste_id(content)
        now = int(time.time())
        db = get_db()
        db.execute(
            """INSERT INTO pastes
(id, content, mime_type, owner, created_at, last_accessed,
burn_after_read, expires_at, password_hash)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                paste_id,
                content,
                mime_type,
                owner,
                now,
                now,
                1 if burn_after_read else 0,
                expires_at,
                password_hash,
            ),
        )
        db.commit()

        # Build response; optional fields appear only when they apply.
        response_data: dict[str, Any] = {
            "id": paste_id,
            "url": f"/{paste_id}",
            "raw": f"/{paste_id}/raw",
            "mime_type": mime_type,
            "created_at": now,
        }
        if owner:
            response_data["owner"] = owner
        if burn_after_read:
            response_data["burn_after_read"] = True
        if expires_at:
            response_data["expires_at"] = expires_at
        if password_hash:
            response_data["password_protected"] = True

        # Record successful paste for anti-flood tracking
        record_antiflood_request()
        return json_response(response_data, 201)
class HealthView(MethodView):
    """Liveness probe that also exercises the database connection."""

    def get(self) -> Response:
        """Report ``healthy`` (200) if a trivial query runs, else ``unhealthy`` (503)."""
        try:
            get_db().execute("SELECT 1")
        except Exception:
            # Any failure to obtain a connection or run the query counts as unhealthy.
            return json_response({"status": "unhealthy", "database": "error"}, 503)
        return json_response({"status": "healthy", "database": "ok"})
class ChallengeView(MethodView):
    """Issue proof-of-work challenges for paste creation."""

    def get(self) -> Response:
        """Return a fresh PoW challenge, or a disabled marker when PoW is off.

        When the issued difficulty exceeds the configured base (anti-flood
        boost active), the response also carries ``elevated`` and
        ``base_difficulty``.
        """
        base_difficulty = current_app.config["POW_DIFFICULTY"]
        if base_difficulty == 0:
            return json_response({"enabled": False, "difficulty": 0})

        challenge = generate_challenge()
        payload: dict[str, Any] = {"enabled": True}
        for field in ("nonce", "difficulty", "expires", "token"):
            payload[field] = challenge[field]

        # Indicate if difficulty is elevated due to anti-flood
        if challenge["difficulty"] > base_difficulty:
            payload["elevated"] = True
            payload["base_difficulty"] = base_difficulty
        return json_response(payload)
class RegisterChallengeView(MethodView):
    """Issue proof-of-work challenges for certificate registration."""

    def get(self) -> Response:
        """Return a challenge at ``REGISTER_POW_DIFFICULTY`` (default 24).

        A difficulty of 0 disables registration PoW and yields a disabled
        marker instead of a challenge.
        """
        difficulty = current_app.config.get("REGISTER_POW_DIFFICULTY", 24)
        if difficulty == 0:
            return json_response({"enabled": False, "difficulty": 0})

        challenge = generate_challenge(difficulty_override=difficulty)
        payload: dict[str, Any] = {"enabled": True}
        for field in ("nonce", "difficulty", "expires", "token"):
            payload[field] = challenge[field]
        payload["purpose"] = "registration"
        return json_response(payload)
class RegisterView(MethodView):
    """Public client certificate registration endpoint."""

    def post(self) -> Response:
        """Register and obtain a client certificate.

        Requires PoW (at ``REGISTER_POW_DIFFICULTY``) to prevent abuse. An
        optional JSON object body may supply ``common_name`` (truncated to 64
        characters); otherwise a random name is generated. The CA is
        auto-generated if not present and ``PKI_CA_PASSWORD`` is configured.

        Returns:
            A PKCS#12 bundle (client certificate, client private key, CA
            certificate) as a binary download on success; otherwise a 400,
            500 or 503 error response.
        """
        from cryptography import x509
        from cryptography.hazmat.primitives import serialization

        from app.pki import (
            CANotFoundError,
            PKIError,
            create_pkcs12,
            generate_ca,
            get_ca_info,
            issue_certificate,
        )

        # Check PKI configuration
        password = current_app.config.get("PKI_CA_PASSWORD", "")
        if not password:
            return error_response(
                "Registration not available",
                503,
                hint="PKI_CA_PASSWORD not configured",
            )

        # Verify PoW
        register_difficulty = current_app.config.get("REGISTER_POW_DIFFICULTY", 24)
        if register_difficulty > 0:
            token = request.headers.get("X-PoW-Token", "")
            solution = request.headers.get("X-PoW-Solution", "")
            if not token or not solution:
                return error_response(
                    "Proof-of-work required",
                    400,
                    hint="GET /register/challenge for a registration challenge",
                    difficulty=register_difficulty,
                )
            valid, err = verify_pow(token, solution, min_difficulty=register_difficulty)
            if not valid:
                current_app.logger.warning(
                    "Registration PoW failed: %s from=%s", err, request.remote_addr
                )
                return error_response(f"Proof-of-work failed: {err}", 400)

        # Parse common_name from request
        common_name = None
        if request.is_json:
            data = request.get_json(silent=True)
            # The body may be any JSON value; only an object has .get(), so
            # check the type instead of raising on a list or scalar.
            if isinstance(data, dict) and isinstance(data.get("common_name"), str):
                common_name = data["common_name"][:64].strip()
        if not common_name:
            # Generate random common name if not provided
            common_name = f"client-{secrets.token_hex(4)}"

        # Auto-generate CA if needed (skip PKI_ENABLED check for registration)
        ca_info = get_ca_info(skip_enabled_check=True)
        if ca_info is None:
            ca_days = current_app.config.get("PKI_CA_DAYS", 3650)
            try:
                ca_info = generate_ca("FlaskPaste CA", password, days=ca_days)
                current_app.logger.info(
                    "CA auto-generated for registration: fingerprint=%s",
                    ca_info["fingerprint_sha1"][:12],
                )
            except PKIError as e:
                current_app.logger.error("CA auto-generation failed: %s", e)
                return error_response("CA generation failed", 500)

        # Issue certificate
        try:
            cert_days = current_app.config.get("PKI_CERT_DAYS", 365)
            cert_info = issue_certificate(common_name, password, days=cert_days)
        except CANotFoundError:
            return error_response("CA not available", 500)
        except PKIError as e:
            current_app.logger.error("Certificate issuance failed: %s", e)
            return error_response("Certificate issuance failed", 500)

        # Load CA cert for PKCS#12 (reuse ca_info from above, or refresh if it
        # lacks the PEM). If it is still unavailable after the refresh, fail
        # cleanly instead of raising on the subscript below.
        if ca_info is None or "certificate_pem" not in ca_info:
            ca_info = get_ca_info(skip_enabled_check=True)
        if ca_info is None or "certificate_pem" not in ca_info:
            current_app.logger.error("CA certificate unavailable after issuance")
            return error_response("CA not available", 500)

        ca_cert = x509.load_pem_x509_certificate(ca_info["certificate_pem"].encode())
        client_cert = x509.load_pem_x509_certificate(cert_info["certificate_pem"].encode())
        client_key = serialization.load_pem_private_key(
            cert_info["private_key_pem"].encode(), password=None
        )

        # Create PKCS#12 bundle (no password for easy import)
        p12_data = create_pkcs12(
            private_key=client_key,
            certificate=client_cert,
            ca_certificate=ca_cert,
            friendly_name=common_name,
            password=None,
        )
        current_app.logger.info(
            "Client registered: cn=%s fingerprint=%s from=%s",
            common_name,
            cert_info["fingerprint_sha1"][:12],
            request.remote_addr,
        )

        # common_name is client-supplied: restrict the download filename to a
        # conservative character set so quotes, control characters or
        # non-Latin-1 text cannot break or alter the Content-Disposition header.
        safe_filename = re.sub(r"[^A-Za-z0-9._-]", "_", common_name)

        # Return PKCS#12 as binary download
        response = Response(p12_data, mimetype="application/x-pkcs12")
        response.headers["Content-Disposition"] = f'attachment; filename="{safe_filename}.p12"'
        response.headers["X-Fingerprint-SHA1"] = cert_info["fingerprint_sha1"]
        response.headers["X-Certificate-Expires"] = str(cert_info["expires_at"])
        return response
class ClientView(MethodView):
    """CLI client download endpoint."""

    def get(self) -> Response:
        """Serve the fpaste CLI script with this server's URL pre-configured.

        The script is read from ``<app root>/../fpaste`` and its default
        server URL is replaced by ``base_url()``. Because that URL is derived
        from request headers and is spliced into Python source, it is only
        substituted when it consists of characters that cannot terminate the
        surrounding string literal; otherwise the script is served unmodified.

        Returns:
            The script as a ``text/x-python`` attachment, or a 404 error
            response when the client file does not exist.
        """
        import os

        server_url = base_url()
        client_path = os.path.join(current_app.root_path, "..", "fpaste")
        try:
            # Explicit encoding: do not depend on the platform default.
            with open(client_path, encoding="utf-8") as f:
                content = f.read()
        except FileNotFoundError:
            return error_response("Client not available", 404)

        # Accept only scheme://host[:port][/path] built from URL-safe
        # characters (no quotes, backslashes, parentheses, whitespace or
        # control characters).
        url_is_safe = re.fullmatch(
            r"[A-Za-z][A-Za-z0-9+.-]*://[A-Za-z0-9._~:/\[\]%-]+", server_url
        )
        if url_is_safe:
            # Replace default server URL
            content = content.replace(
                '"server": os.environ.get("FLASKPASTE_SERVER", "http://localhost:5000")',
                f'"server": os.environ.get("FLASKPASTE_SERVER", "{server_url}")',
            )
            content = content.replace(
                "http://localhost:5000)",
                f"{server_url})",
            )
        else:
            current_app.logger.warning(
                "Refusing to embed unsafe server URL in client from=%s", request.remote_addr
            )

        response = Response(content, mimetype="text/x-python")
        response.headers["Content-Disposition"] = "attachment; filename=fpaste"
        return response
class PasteView(MethodView):
"""Paste metadata operations."""
def get(self, paste_id: str) -> Response:
    """Return the metadata of a paste as JSON.

    The ID is validated and the paste fetched (including the password
    check); the first failing step's error response is returned as-is.
    The fields ``burn_after_read``, ``expires_at`` and
    ``password_protected`` appear only when they apply.
    """
    # Validate and fetch, stopping at the first step that reports an error.
    for step in (validate_paste_id, fetch_paste):
        if failure := step(paste_id):
            return failure

    paste: Row = g.paste
    # fetch_paste() leaves its access-time update uncommitted on success.
    g.db.commit()

    payload: dict[str, Any] = {
        "id": paste["id"],
        "mime_type": paste["mime_type"],
        "size": paste["size"],
        "created_at": paste["created_at"],
        "raw": f"/{paste_id}/raw",
    }
    if paste["burn_after_read"]:
        payload["burn_after_read"] = True
    expiry = paste["expires_at"]
    if expiry:
        payload["expires_at"] = expiry
    if paste["password_hash"]:
        payload["password_protected"] = True
    return json_response(payload)
def head(self, paste_id: str) -> Response:
    """Handle HEAD by delegating to ``get()`` and returning its response unchanged."""
    response = self.get(paste_id)
    return response
def put(self, paste_id: str) -> Response:
"""Update paste content and/or metadata.
Requires authentication and ownership.
Content update: Send raw body with Content-Type header
Metadata update: Use headers with empty body
Headers:
- X-Paste-Password: Set/change password
- X-Remove-Password: true to remove password
- X-Extend-Expiry: Seconds to add to current expiry
"""
# Validate paste ID format
if err := validate_paste_id(paste_id):
return err
if err := require_auth():
return err
db = get_db()
# Fetch current paste
row = db.execute(
"""SELECT id, owner, content, mime_type, expires_at, password_hash
FROM pastes WHERE id = ?""",
(paste_id,),
).fetchone()
if row is None:
return error_response("Paste not found", 404)
if row["owner"] != g.client_id:
return error_response("Permission denied", 403)
# Check for burn-after-read (cannot update)
burn_check = db.execute(
"SELECT burn_after_read FROM pastes WHERE id = ?", (paste_id,)
).fetchone()
if burn_check and burn_check["burn_after_read"]:
return error_response("Cannot update burn-after-read paste", 400)
# Parse update parameters
new_password = request.headers.get("X-Paste-Password", "").strip() or None
remove_password = request.headers.get("X-Remove-Password", "").lower() in (
"true",
"1",
"yes",
)
extend_expiry_str = request.headers.get("X-Extend-Expiry", "").strip()
# Prepare update fields
update_fields = []
update_params: list[Any] = []
# Content update (if body provided)
content = request.get_data()
if content:
mime_type = request.content_type or "application/octet-stream"
# Sanitize MIME type
if not MIME_PATTERN.match(mime_type.split(";")[0].strip()):
mime_type = "application/octet-stream"
update_fields.append("content = ?")
update_params.append(content)
update_fields.append("mime_type = ?")
update_params.append(mime_type.split(";")[0].strip())
# Password update
if remove_password:
update_fields.append("password_hash = NULL")
elif new_password:
update_fields.append("password_hash = ?")
update_params.append(hash_password(new_password))
# Expiry extension
if extend_expiry_str:
try:
extend_seconds = int(extend_expiry_str)
if extend_seconds > 0:
current_expiry = row["expires_at"]
if current_expiry:
new_expiry = current_expiry + extend_seconds
else:
# If no expiry set, create one from now
new_expiry = int(time.time()) + extend_seconds
update_fields.append("expires_at = ?")
update_params.append(new_expiry)
except ValueError:
return error_response("Invalid X-Extend-Expiry value", 400)
if not update_fields:
return error_response("No updates provided", 400)
# Execute update (fields are hardcoded strings, safe from injection)
update_sql = f"UPDATE pastes SET {', '.join(update_fields)} WHERE id = ?" # noqa: S608 # nosec B608
update_params.append(paste_id)
db.execute(update_sql, update_params)
db.commit()
# Fetch updated paste for response
updated = db.execute(
"""SELECT id, mime_type, length(content) as size, expires_at,
CASE WHEN password_hash IS NOT NULL THEN 1 ELSE 0 END as password_protected
FROM pastes WHERE id = ?""",
(paste_id,),
).fetchone()
response_data: dict[str, Any] = {
"id": updated["id"],
"size": updated["size"],
"mime_type": updated["mime_type"],
}
if updated["expires_at"]:
response_data["expires_at"] = updated["expires_at"]
if updated["password_protected"]:
response_data["password_protected"] = True
return json_response(response_data)
class PasteRawView(MethodView):
    """Serves the stored bytes of a paste with its recorded MIME type."""

    def get(self, paste_id: str) -> Response:
        """Return the paste body, consuming it if it is burn-after-read.

        A burn-after-read paste is deleted from the database before the
        response is built, and the reply carries ``X-Burn-After-Read: true``.
        Image and text types are marked for inline display.
        """
        # Second helper only runs when the first one reported no problem.
        failure = validate_paste_id(paste_id) or fetch_paste(paste_id)
        if failure:
            return failure

        paste: Row = g.paste
        conn = g.db

        is_burn = paste["burn_after_read"]
        if is_burn:
            conn.execute("DELETE FROM pastes WHERE id = ?", (paste_id,))
            current_app.logger.info("Burn-after-read paste deleted: %s", paste_id)
        conn.commit()

        mime = paste["mime_type"]
        reply = Response(paste["content"], mimetype=mime)
        if mime.startswith(("image/", "text/")):
            reply.headers["Content-Disposition"] = "inline"
        if is_burn:
            reply.headers["X-Burn-After-Read"] = "true"
        return reply

    def head(self, paste_id: str) -> Response:
        """Return the headers of the raw paste without deleting anything.

        Unlike :meth:`get`, a burn-after-read paste is left in place; only
        the ``X-Burn-After-Read`` header advertises its nature.
        """
        failure = validate_paste_id(paste_id) or fetch_paste(paste_id)
        if failure:
            return failure

        paste: Row = g.paste
        g.db.commit()

        mime = paste["mime_type"]
        reply = Response(mimetype=mime)
        # No body is sent, so advertise the stored size explicitly.
        reply.headers["Content-Length"] = str(paste["size"])
        if mime.startswith(("image/", "text/")):
            reply.headers["Content-Disposition"] = "inline"
        if paste["burn_after_read"]:
            reply.headers["X-Burn-After-Read"] = "true"
        return reply
class PasteDeleteView(MethodView):
    """Owner-only removal of a paste."""

    def delete(self, paste_id: str) -> Response:
        """Delete the paste if the authenticated client owns it.

        Returns a confirmation message on success, 404 when the paste does
        not exist, 403 when it belongs to someone else, or the error response
        from ID validation / authentication.
        """
        # Authentication is only attempted once the ID has passed validation.
        failure = validate_paste_id(paste_id) or require_auth()
        if failure:
            return failure

        conn = get_db()
        owner_row = conn.execute(
            "SELECT owner FROM pastes WHERE id = ?", (paste_id,)
        ).fetchone()

        if owner_row is None:
            return error_response("Paste not found", 404)
        if owner_row["owner"] != g.client_id:
            return error_response("Permission denied", 403)

        conn.execute("DELETE FROM pastes WHERE id = ?", (paste_id,))
        conn.commit()
        return json_response({"message": "Paste deleted"})
class PastesListView(MethodView):
    """List authenticated user's pastes (privacy-focused)."""

    def get(self) -> Response:
        """List pastes owned by authenticated user.

        Privacy guarantees:
        - Requires authentication (mTLS client certificate)
        - Users can ONLY see their own pastes
        - No admin bypass or cross-user visibility
        - Content is never returned, only metadata

        Query parameters:
        - limit: max results (default 50, clamped to 0..200)
        - offset: pagination offset (default 0, never negative)
        - type: filter by MIME type (glob pattern, e.g., "image/*")
        - after: filter by created_at >= timestamp
        - before: filter by created_at <= timestamp

        Unparsable ``limit``/``offset`` reset both to their defaults;
        unparsable ``after``/``before`` are ignored.

        Note: the ``type`` filter is applied to the page of rows already
        fetched, so ``total`` counts all pastes matching the owner/time
        filters and a page may hold fewer than ``limit`` entries.

        Returns:
            JSON with ``pastes`` (metadata dicts), ``count`` (entries in this
            response), ``total``, ``limit`` and ``offset``; or the
            authentication error response.
        """
        import fnmatch

        # Strict authentication requirement
        if err := require_auth():
            return err
        client_id = g.client_id

        # Largest integer SQLite can store; binding anything bigger makes
        # the sqlite3 module raise OverflowError.
        sqlite_max_int = 2**63 - 1

        # Parse pagination parameters
        try:
            # Lower-bound the limit: SQLite treats a negative LIMIT as
            # "no limit", which would bypass the 200-row cap.
            limit = max(0, min(int(request.args.get("limit", 50)), 200))
            offset = min(max(int(request.args.get("offset", 0)), 0), sqlite_max_int)
        except (ValueError, TypeError):
            limit, offset = 50, 0

        # Parse filter parameters
        type_filter = request.args.get("type", "").strip()
        try:
            after_ts = min(int(request.args.get("after", 0)), sqlite_max_int)
        except (ValueError, TypeError):
            after_ts = 0
        try:
            before_ts = min(int(request.args.get("before", 0)), sqlite_max_int)
        except (ValueError, TypeError):
            before_ts = 0

        db = get_db()

        # Build query with filters; only constant SQL fragments are joined,
        # every user-supplied value travels as a bound parameter.
        where_clauses = ["owner = ?"]
        params: list[Any] = [client_id]
        if after_ts > 0:
            where_clauses.append("created_at >= ?")
            params.append(after_ts)
        if before_ts > 0:
            where_clauses.append("created_at <= ?")
            params.append(before_ts)
        where_sql = " AND ".join(where_clauses)

        # Count total pastes matching filters (where_sql is safe, built from constants)
        count_row = db.execute(
            f"SELECT COUNT(*) as total FROM pastes WHERE {where_sql}",  # noqa: S608  # nosec B608
            params,
        ).fetchone()
        total = count_row["total"] if count_row else 0

        # Fetch pastes with metadata only (where_sql is safe, built from constants)
        rows = db.execute(
            f"""SELECT id, mime_type, length(content) as size, created_at,
                       last_accessed, burn_after_read, expires_at,
                       CASE WHEN password_hash IS NOT NULL THEN 1 ELSE 0 END as password_protected
                FROM pastes
                WHERE {where_sql}
                ORDER BY created_at DESC
                LIMIT ? OFFSET ?""",  # noqa: S608  # nosec B608
            [*params, limit, offset],
        ).fetchall()

        # Apply MIME type filter (glob pattern matching done in Python for flexibility)
        if type_filter:
            rows = [r for r in rows if fnmatch.fnmatch(r["mime_type"], type_filter)]

        pastes = []
        for row in rows:
            paste: dict[str, Any] = {
                "id": row["id"],
                "mime_type": row["mime_type"],
                "size": row["size"],
                "created_at": row["created_at"],
                "last_accessed": row["last_accessed"],
                "url": f"/{row['id']}",
                "raw": f"/{row['id']}/raw",
            }
            # Optional flags are emitted only when set.
            if row["burn_after_read"]:
                paste["burn_after_read"] = True
            if row["expires_at"]:
                paste["expires_at"] = row["expires_at"]
            if row["password_protected"]:
                paste["password_protected"] = True
            pastes.append(paste)

        return json_response(
            {
                "pastes": pastes,
                "count": len(pastes),
                "total": total,
                "limit": limit,
                "offset": offset,
            }
        )
# ─────────────────────────────────────────────────────────────────────────────
# PKI Views (Certificate Authority)
# ─────────────────────────────────────────────────────────────────────────────
def require_pki_enabled() -> Response | None:
    """Gate for PKI endpoints.

    Returns ``None`` when the ``PKI_ENABLED`` config value is truthy,
    otherwise a 404 error response the caller should return as-is.
    """
    if current_app.config.get("PKI_ENABLED"):
        return None
    return error_response("PKI not enabled", 404)
class PKIStatusView(MethodView):
    """Reports whether PKI is switched on and describes the CA, if any."""

    def get(self) -> Response:
        """Describe the PKI state as JSON.

        Three shapes are possible: PKI disabled (``enabled`` false), PKI
        enabled without a CA (with a hint on how to create one), or PKI
        enabled with the CA's identifying fields.
        """
        if not current_app.config.get("PKI_ENABLED"):
            return json_response({"enabled": False})

        # Imported lazily, only once PKI is known to be enabled.
        from app.pki import get_ca_info

        ca = get_ca_info()
        if ca is None:
            payload = {
                "enabled": True,
                "ca_exists": False,
                "hint": "POST /pki/ca to generate CA",
            }
            return json_response(payload)

        # CA fields exposed to clients, copied through in this order.
        exposed = (
            "common_name",
            "fingerprint_sha1",
            "created_at",
            "expires_at",
            "key_algorithm",
        )
        payload = {"enabled": True, "ca_exists": True}
        for field in exposed:
            payload[field] = ca[field]
        return json_response(payload)
class PKICAGenerateView(MethodView):
    """CA generation endpoint (first-run only)."""

    def post(self) -> Response:
        """Generate CA certificate. Only works if no CA exists.

        An optional JSON object body may carry ``common_name`` (string,
        truncated to 64 characters); any other body shape is ignored and
        the default name ``"FlaskPaste CA"`` is used.

        Returns:
            201 with the new CA's details and a download URL on success;
            404 when PKI is disabled; 409 when a CA already exists; 500 when
            ``PKI_CA_PASSWORD`` is not configured or generation fails.
        """
        if err := require_pki_enabled():
            return err

        from app.pki import (
            CAExistsError,
            PKIError,
            generate_ca,
            get_ca_info,
        )

        # Check if CA already exists
        if get_ca_info() is not None:
            return error_response("CA already exists", 409)

        # Get CA password from config
        password = current_app.config.get("PKI_CA_PASSWORD", "")
        if not password:
            return error_response(
                "PKI_CA_PASSWORD not configured",
                500,
                hint="Set FLASKPASTE_PKI_CA_PASSWORD environment variable",
            )

        # Parse request for optional common name
        common_name = "FlaskPaste CA"
        if request.is_json:
            data = request.get_json(silent=True)
            # A JSON body may legally be a list, string or number; only a
            # dict has .get(), so check the type instead of truthiness to
            # avoid an AttributeError (HTTP 500) on e.g. `[1]`.
            if isinstance(data, dict) and isinstance(data.get("common_name"), str):
                common_name = data["common_name"][:64]

        # Generate CA
        try:
            days = current_app.config.get("PKI_CA_DAYS", 3650)
            owner = get_client_fingerprint()
            ca_info = generate_ca(common_name, password, days=days, owner=owner)
        except CAExistsError:
            # Reported by generate_ca itself even though the check above passed.
            return error_response("CA already exists", 409)
        except PKIError as e:
            current_app.logger.error("CA generation failed: %s", e)
            return error_response("CA generation failed", 500)

        current_app.logger.info(
            "CA generated: cn=%s fingerprint=%s", common_name, ca_info["fingerprint_sha1"][:12]
        )
        return json_response(
            {
                "message": "CA generated",
                "common_name": ca_info["common_name"],
                "fingerprint_sha1": ca_info["fingerprint_sha1"],
                "created_at": ca_info["created_at"],
                "expires_at": ca_info["expires_at"],
                "download": prefixed_url("/pki/ca.crt"),
            },
            201,
        )
class PKICADownloadView(MethodView):
    """CA certificate download endpoint."""

    def get(self) -> Response:
        """Download CA certificate in PEM format.

        Returns:
            The certificate as an ``application/x-pem-file`` attachment whose
            filename is derived from the CA common name, or a 404 error
            response when PKI is disabled or no CA has been generated.
        """
        if err := require_pki_enabled():
            return err

        from app.pki import get_ca_info

        ca_info = get_ca_info()
        if ca_info is None:
            return error_response("CA not initialized", 404)

        # The common name can be supplied by a client when the CA is created,
        # so reduce it to a conservative character set before placing it in
        # a header: spaces, quotes, semicolons, control and non-ASCII
        # characters all become underscores.
        safe_name = re.sub(r"[^A-Za-z0-9._-]", "_", ca_info["common_name"]) or "ca"

        response = Response(ca_info["certificate_pem"], mimetype="application/x-pem-file")
        response.headers["Content-Disposition"] = f"attachment; filename={safe_name}.crt"
        return response
class PKIIssueView(MethodView):
    """Certificate issuance endpoint (open registration)."""

    def post(self) -> Response:
        """Issue a new client certificate.

        Expects a JSON object body with a non-empty string ``common_name``
        (truncated to 64 characters).

        Returns:
            201 with the certificate bundle, including the private key, on
            success; 400 when ``common_name`` is missing; 404 when PKI is
            disabled or the CA has not been created; 500 when the CA
            password is not configured or issuance fails.
        """
        if err := require_pki_enabled():
            return err

        from app.pki import (
            CANotFoundError,
            PKIError,
            issue_certificate,
        )

        # Parse request
        common_name = None
        if request.is_json:
            data = request.get_json(silent=True)
            # A JSON body may legally be a list, string or number; only a
            # dict has .get(), so check the type instead of truthiness to
            # avoid an AttributeError (HTTP 500) on e.g. `[1]`.
            if isinstance(data, dict) and isinstance(data.get("common_name"), str):
                common_name = data["common_name"][:64]
        if not common_name:
            return error_response(
                "common_name required", 400, hint='POST {"common_name": "your-name"}'
            )

        # Get CA password from config
        password = current_app.config.get("PKI_CA_PASSWORD", "")
        if not password:
            return error_response("PKI not properly configured", 500)

        # Issue certificate
        try:
            days = current_app.config.get("PKI_CERT_DAYS", 365)
            # Fingerprint of the requesting client's certificate, if any;
            # logged as "anonymous" below when falsy.
            issued_to = get_client_fingerprint()
            cert_info = issue_certificate(common_name, password, days=days, issued_to=issued_to)
        except CANotFoundError:
            return error_response("CA not initialized", 404)
        except PKIError as e:
            current_app.logger.error("Certificate issuance failed: %s", e)
            return error_response("Certificate issuance failed", 500)

        # Log truncated identifiers only.
        current_app.logger.info(
            "Certificate issued: cn=%s serial=%s fingerprint=%s to=%s",
            common_name,
            cert_info["serial"][:8],
            cert_info["fingerprint_sha1"][:12],
            issued_to or "anonymous",
        )

        # Return certificate bundle
        return json_response(
            {
                "message": "Certificate issued",
                "serial": cert_info["serial"],
                "common_name": cert_info["common_name"],
                "fingerprint_sha1": cert_info["fingerprint_sha1"],
                "created_at": cert_info["created_at"],
                "expires_at": cert_info["expires_at"],
                "certificate_pem": cert_info["certificate_pem"],
                "private_key_pem": cert_info["private_key_pem"],
            },
            201,
        )
class PKICertsView(MethodView):
    """Lists certificates visible to the calling client."""

    def get(self) -> Response:
        """Return the caller's certificates as JSON.

        A client presenting a certificate sees rows where it is either the
        issuer (``issued_to``) or the certificate itself
        (``fingerprint_sha1``). Callers without a fingerprint get an empty
        list.
        """
        if err := require_pki_enabled():
            return err

        fingerprint = get_client_fingerprint()
        conn = get_db()

        rows = []
        if fingerprint:
            query = """SELECT serial, common_name, fingerprint_sha1,
                              created_at, expires_at, issued_to, status, revoked_at
                       FROM issued_certificates
                       WHERE issued_to = ? OR fingerprint_sha1 = ?
                       ORDER BY created_at DESC"""
            rows = conn.execute(query, (fingerprint, fingerprint)).fetchall()

        always = (
            "serial",
            "common_name",
            "fingerprint_sha1",
            "created_at",
            "expires_at",
            "status",
        )
        when_set = ("issued_to", "revoked_at")

        certs = []
        for record in rows:
            entry = {key: record[key] for key in always}
            # Optional columns appear in the output only when they hold a value.
            for key in when_set:
                if record[key]:
                    entry[key] = record[key]
            certs.append(entry)

        return json_response({"certificates": certs, "count": len(certs)})
class PKIRevokeView(MethodView):
    """Revokes an issued certificate."""

    def post(self, serial: str) -> Response:
        """Revoke the certificate identified by ``serial``.

        The caller must be authenticated and be either the client the
        certificate was issued by (``issued_to``) or the certificate itself
        (matching ``fingerprint_sha1``).

        Returns a confirmation on success; 404 for an unknown serial, 409
        when already revoked, 403 when the caller lacks permission, 500 when
        revocation fails.
        """
        # Authentication is only checked once PKI is known to be enabled.
        failure = require_pki_enabled() or require_auth()
        if failure:
            return failure

        from app.pki import CertificateNotFoundError, PKIError, revoke_certificate

        conn = get_db()
        # Look up the certificate together with the fields that decide ownership.
        record = conn.execute(
            "SELECT issued_to, fingerprint_sha1, status FROM issued_certificates WHERE serial = ?",
            (serial,),
        ).fetchone()

        if record is None:
            return error_response("Certificate not found", 404)
        if record["status"] == "revoked":
            return error_response("Certificate already revoked", 409)

        # Neither the issuer nor the certificate itself: refuse.
        client_id = g.client_id
        if record["issued_to"] != client_id and record["fingerprint_sha1"] != client_id:
            return error_response("Permission denied", 403)

        try:
            revoke_certificate(serial)
        except CertificateNotFoundError:
            return error_response("Certificate not found", 404)
        except PKIError as exc:
            current_app.logger.error("Revocation failed: %s", exc)
            return error_response("Revocation failed", 500)

        current_app.logger.info("Certificate revoked: serial=%s by=%s", serial[:8], client_id[:12])
        return json_response({"message": "Certificate revoked", "serial": serial})
# ─────────────────────────────────────────────────────────────────────────────
# Route Registration
# ─────────────────────────────────────────────────────────────────────────────
# Bind every class-based view to its URL on the `bp` blueprint. The string
# passed to as_view() becomes the endpoint name. Rules registered without an
# explicit `methods=` list rely on Flask's defaults for the view class.

# Index and paste creation
bp.add_url_rule("/", view_func=IndexView.as_view("index"))
# Utility endpoints
bp.add_url_rule("/health", view_func=HealthView.as_view("health"))
bp.add_url_rule("/challenge", view_func=ChallengeView.as_view("challenge"))
bp.add_url_rule("/client", view_func=ClientView.as_view("client"))
# Registration endpoints (public certificate issuance with PoW)
bp.add_url_rule(
"/register/challenge", view_func=RegisterChallengeView.as_view("register_challenge")
)
bp.add_url_rule("/register", view_func=RegisterView.as_view("register"))
# Paste operations
# "/pastes" is a fixed path that also fits the "/<paste_id>" pattern below.
# NOTE(review): presumably the router prefers the static rule - confirm.
bp.add_url_rule("/pastes", view_func=PastesListView.as_view("pastes_list"))
# "/<paste_id>" is registered twice, split by HTTP method: reads and updates
# (GET/HEAD/PUT) go to PasteView, DELETE goes to PasteDeleteView further down.
bp.add_url_rule("/<paste_id>", view_func=PasteView.as_view("paste"), methods=["GET", "HEAD", "PUT"])
bp.add_url_rule(
"/<paste_id>/raw", view_func=PasteRawView.as_view("paste_raw"), methods=["GET", "HEAD"]
)
bp.add_url_rule(
"/<paste_id>", view_func=PasteDeleteView.as_view("paste_delete"), methods=["DELETE"]
)
# PKI endpoints
bp.add_url_rule("/pki", view_func=PKIStatusView.as_view("pki_status"))
bp.add_url_rule("/pki/ca", view_func=PKICAGenerateView.as_view("pki_ca_generate"))
bp.add_url_rule("/pki/ca.crt", view_func=PKICADownloadView.as_view("pki_ca_download"))
bp.add_url_rule("/pki/issue", view_func=PKIIssueView.as_view("pki_issue"))
bp.add_url_rule("/pki/certs", view_func=PKICertsView.as_view("pki_certs"))
# Revocation is limited to POST explicitly.
bp.add_url_rule(
"/pki/revoke/<serial>", view_func=PKIRevokeView.as_view("pki_revoke"), methods=["POST"]
)