Files
flaskpaste/app/api/routes.py
Username 03bcb157cc add HEIC/HEIF/AVIF MIME detection signatures
- Add ftyp box signatures for heic, mif1, and avif brands
- Add tests for new image formats
- Fix nested if lint warning in lookup rate limit
- Update security docs: MKV uses WebM header, TAR needs offset 257
2025-12-26 17:04:51 +01:00

2368 lines
88 KiB
Python

"""API route handlers using modern Flask patterns."""
from __future__ import annotations
import hashlib
import hmac
import json
import math
import re
import secrets
import threading
import time
from collections import defaultdict
from typing import TYPE_CHECKING, Any
from flask import Response, current_app, g, request
from flask.views import MethodView
from app.api import bp
from app.audit import AuditEvent, AuditOutcome, log_event
from app.config import VERSION
from app.database import check_content_hash, get_db, hash_password, verify_password
from app.metrics import (
record_dedup,
record_paste_accessed,
record_paste_created,
record_paste_deleted,
record_pow,
record_rate_limit,
)
if TYPE_CHECKING:
from sqlite3 import Row
# Compiled patterns for validation
PASTE_ID_PATTERN = re.compile(r"^[a-f0-9]+$")  # lowercase hex; exact length checked by caller
CLIENT_ID_PATTERN = re.compile(r"^[a-f0-9]{40}$")  # SHA-1 certificate fingerprint (hex)
MIME_PATTERN = re.compile(r"^[a-z0-9][a-z0-9!#$&\-^_.+]*/[a-z0-9][a-z0-9!#$&\-^_.+]*$")
# Magic bytes for binary format detection.
# NOTE: insertion order matters - detect_mime_type scans entries in order, so
# the specific 12-byte ftyp signatures (heic/mif1/avif/isom) must stay ahead
# of the generic 8-byte ftyp prefixes that map to video/mp4.
MAGIC_SIGNATURES: dict[bytes, str] = {
    # Images
    b"\x89PNG\r\n\x1a\n": "image/png",
    b"\xff\xd8\xff": "image/jpeg",
    b"GIF87a": "image/gif",
    b"GIF89a": "image/gif",
    b"RIFF": "image/webp",  # RIFF container, verified as WEBP in detect_mime_type
    b"BM": "image/bmp",
    b"II\x2a\x00": "image/tiff",  # Little-endian TIFF
    b"MM\x00\x2a": "image/tiff",  # Big-endian TIFF
    b"\x00\x00\x01\x00": "image/x-icon",
    # HEIC/HEIF (ftyp box with heic/mif1 brand) - bytes 4-7 = "ftyp", 8-12 = brand
    b"\x00\x00\x00\x18\x66\x74\x79\x70\x68\x65\x69\x63": "image/heic",  # ftyp heic
    b"\x00\x00\x00\x1c\x66\x74\x79\x70\x68\x65\x69\x63": "image/heic",  # ftyp heic
    b"\x00\x00\x00\x18\x66\x74\x79\x70\x6d\x69\x66\x31": "image/heif",  # ftyp mif1
    b"\x00\x00\x00\x1c\x66\x74\x79\x70\x6d\x69\x66\x31": "image/heif",  # ftyp mif1
    # AVIF (ftyp box with avif brand)
    b"\x00\x00\x00\x1c\x66\x74\x79\x70\x61\x76\x69\x66": "image/avif",  # ftyp avif
    b"\x00\x00\x00\x20\x66\x74\x79\x70\x61\x76\x69\x66": "image/avif",  # ftyp avif
    # Video/Audio containers (checked for subtype in detect_mime_type)
    b"\x1a\x45\xdf\xa3": "video/webm",  # Matroska/WebM (same format)
    b"FLV\x01": "video/x-flv",
    b"\x00\x00\x00\x1c\x66\x74\x79\x70\x69\x73\x6f\x6d": "video/mp4",  # ftyp isom
    b"\x00\x00\x00\x1c\x66\x74\x79\x70": "video/mp4",  # ftyp box at standard offset
    b"\x00\x00\x00\x20\x66\x74\x79\x70": "video/mp4",  # ftyp with different size
    b"\x00\x00\x00\x18\x66\x74\x79\x70": "video/mp4",  # ftyp with different size
    # Audio
    b"ID3": "audio/mpeg",  # MP3 with ID3 tag
    b"\xff\xfb": "audio/mpeg",  # MP3 frame sync
    b"\xff\xfa": "audio/mpeg",
    b"\xff\xf3": "audio/mpeg",
    b"\xff\xf2": "audio/mpeg",
    b"fLaC": "audio/flac",
    b"OggS": "audio/ogg",
    # Documents
    b"%PDF": "application/pdf",
    b"\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1": "application/msword",  # OLE (DOC, XLS, PPT, MSI)
    b"PK\x03\x04": "application/zip",  # ZIP, DOCX, XLSX, PPTX, ODT, JAR, APK
    # Executables
    b"MZ": "application/x-msdownload",  # EXE, DLL
    b"\x7fELF": "application/x-executable",  # ELF (Linux)
    b"\xfe\xed\xfa\xce": "application/x-mach-binary",  # Mach-O 32-bit big-endian
    b"\xce\xfa\xed\xfe": "application/x-mach-binary",  # Mach-O 32-bit little-endian
    b"\xfe\xed\xfa\xcf": "application/x-mach-binary",  # Mach-O 64-bit big-endian
    b"\xcf\xfa\xed\xfe": "application/x-mach-binary",  # Mach-O 64-bit little-endian
    b"\xca\xfe\xba\xbe": "application/x-mach-binary",  # Mach-O fat/universal binary
    b"\x00asm": "application/wasm",  # WebAssembly
    # Compression/Archives
    b"\x1f\x8b": "application/gzip",
    b"BZh": "application/x-bzip2",
    b"\xfd7zXZ\x00": "application/x-xz",
    b"\x28\xb5\x2f\xfd": "application/zstd",
    b"\x04\x22\x4d\x18": "application/x-lz4",
    b"7z\xbc\xaf\x27\x1c": "application/x-7z-compressed",
    b"Rar!\x1a\x07": "application/vnd.rar",
    # Packages
    b"\xed\xab\xee\xdb": "application/x-rpm",
    # Data
    b"SQLite format 3\x00": "application/x-sqlite3",
}
# Maximum magic signature length (for safe prefix slicing)
MAX_MAGIC_LEN = 16  # SQLite signature is longest at 16 bytes
# Generic MIME types that clients commonly send for arbitrary data; content
# detection overrides these rather than trusting the header.
GENERIC_MIME_TYPES = frozenset(
    {
        "application/octet-stream",
        "application/x-www-form-urlencoded",
        "text/plain",
    }
)
# Runtime PoW secret cache (lazily filled by get_pow_secret when no
# POW_SECRET is configured; per-process, so challenges don't survive restarts)
_pow_secret_cache: bytes | None = None
# ─────────────────────────────────────────────────────────────────────────────
# Anti-flood: dynamic PoW difficulty adjustment
# ─────────────────────────────────────────────────────────────────────────────
_antiflood_lock = threading.Lock()  # Guards the three mutable globals below
_antiflood_requests: list[float] = []  # Global request timestamps
_antiflood_difficulty: int = 0  # Current difficulty boost (added to base)
_antiflood_last_increase: float = 0  # Last time difficulty was increased
def get_dynamic_difficulty() -> int:
    """Return the effective PoW difficulty: base plus any anti-flood boost.

    When PoW is disabled (base 0) or anti-flood is off, the base value is
    returned unchanged; otherwise the boosted value is capped at ANTIFLOOD_MAX.
    """
    base: int = current_app.config["POW_DIFFICULTY"]
    if base == 0 or not current_app.config.get("ANTIFLOOD_ENABLED", True):
        return base
    with _antiflood_lock:
        ceiling: int = current_app.config["ANTIFLOOD_MAX"]
        boosted = base + _antiflood_difficulty
        return boosted if boosted < ceiling else ceiling
def record_antiflood_request() -> None:
    """Record a request for anti-flood tracking and adjust difficulty.

    Keeps a global sliding window of request timestamps. When the window
    holds more than ANTIFLOOD_THRESHOLD requests, the difficulty boost is
    raised by ANTIFLOOD_STEP (never pushing base + boost past ANTIFLOOD_MAX).
    Once traffic drops and ANTIFLOOD_DECAY seconds pass since the last
    change, the boost decays one step per call back toward zero.
    No-op when anti-flood is disabled or PoW is off entirely.
    """
    if not current_app.config.get("ANTIFLOOD_ENABLED", True):
        return
    if current_app.config["POW_DIFFICULTY"] == 0:
        return
    global _antiflood_difficulty, _antiflood_last_increase
    now = time.time()
    window = current_app.config["ANTIFLOOD_WINDOW"]
    threshold = current_app.config["ANTIFLOOD_THRESHOLD"]
    step = current_app.config["ANTIFLOOD_STEP"]
    max_diff = current_app.config["ANTIFLOOD_MAX"]
    decay = current_app.config["ANTIFLOOD_DECAY"]
    base = current_app.config["POW_DIFFICULTY"]
    max_entries = current_app.config.get("ANTIFLOOD_MAX_ENTRIES", 10000)
    with _antiflood_lock:
        # Clean requests that have aged out of the sliding window
        cutoff = now - window
        _antiflood_requests[:] = [t for t in _antiflood_requests if t > cutoff]
        # FLOOD-001: Cap list size to prevent memory exhaustion
        if len(_antiflood_requests) >= max_entries:
            # Keep only the most recent half
            _antiflood_requests[:] = _antiflood_requests[-(max_entries // 2) :]
        # Record this request
        _antiflood_requests.append(now)
        count = len(_antiflood_requests)
        # Check if we should increase difficulty
        if count > threshold:
            # Increase difficulty if not already at max
            if base + _antiflood_difficulty < max_diff:
                _antiflood_difficulty += step
                _antiflood_last_increase = now
        elif _antiflood_difficulty > 0 and (now - _antiflood_last_increase) > decay:
            # Decay difficulty if abuse has stopped
            _antiflood_difficulty = max(0, _antiflood_difficulty - step)
            _antiflood_last_increase = now  # Reset timer
def reset_antiflood() -> None:
    """Reset anti-flood state (for testing)."""
    global _antiflood_difficulty, _antiflood_last_increase
    with _antiflood_lock:
        del _antiflood_requests[:]
        _antiflood_difficulty = 0
        _antiflood_last_increase = 0
# ─────────────────────────────────────────────────────────────────────────────
# Rate Limiting (in-memory sliding window)
# ─────────────────────────────────────────────────────────────────────────────
_rate_limit_lock = threading.Lock()  # Guards _rate_limit_requests
_rate_limit_requests: dict[str, list[float]] = defaultdict(list)  # ip -> request timestamps
def get_client_ip() -> str:
    """Resolve the client IP, honoring X-Forwarded-For only behind a trusted proxy."""
    if is_trusted_proxy():
        chain = request.headers.get("X-Forwarded-For", "")
        if chain:
            # The leftmost entry in the chain is the originating client.
            first, _, _ = chain.partition(",")
            return first.strip()
    return request.remote_addr or "unknown"
def check_rate_limit(client_ip: str, authenticated: bool = False) -> tuple[bool, int, int, int]:
    """Check if request is within rate limit.

    Args:
        client_ip: Client IP address
        authenticated: Whether client is authenticated (higher limits)

    Returns:
        Tuple of (allowed, remaining, limit, reset_timestamp)
        - allowed: Whether request is within rate limit
        - remaining: Requests remaining in current window
        - limit: Maximum requests per window (-1 when limiting is disabled)
        - reset_timestamp: Unix timestamp when window resets (0 when disabled)
    """
    if not current_app.config.get("RATE_LIMIT_ENABLED", True):
        return True, -1, -1, 0
    window = current_app.config["RATE_LIMIT_WINDOW"]
    max_requests = current_app.config["RATE_LIMIT_MAX"]
    max_entries = current_app.config.get("RATE_LIMIT_MAX_ENTRIES", 10000)
    cleanup_threshold = current_app.config.get("RATE_LIMIT_CLEANUP_THRESHOLD", 0.8)
    if authenticated:
        # Trusted certificate holders get a multiplied allowance.
        max_requests *= current_app.config.get("RATE_LIMIT_AUTH_MULTIPLIER", 5)
    now = time.time()
    cutoff = now - window
    with _rate_limit_lock:
        entry_count = len(_rate_limit_requests)
        # RATE-002: Proactive cleanup when exceeding threshold
        threshold_count = int(max_entries * cleanup_threshold)
        if entry_count >= threshold_count:
            # Clean up expired entries first
            _prune_rate_limit_entries(threshold_count // 2, cutoff)
        # RATE-001: Hard limit enforcement (fallback if threshold cleanup wasn't enough)
        if len(_rate_limit_requests) >= max_entries and client_ip not in _rate_limit_requests:
            # Evict oldest entries (those with oldest last request time)
            _prune_rate_limit_entries(max_entries // 2, cutoff)
        # Clean old requests and get current list
        # (defaultdict access also creates the entry for a first-time client)
        requests = _rate_limit_requests[client_ip]
        requests[:] = [t for t in requests if t > cutoff]
        current_count = len(requests)
        # Calculate reset timestamp (when oldest request expires, or now + window if empty)
        reset_timestamp = int(requests[0] + window) if requests else int(now + window)
        if current_count >= max_requests:
            # Over the limit: this request is NOT recorded against the window.
            return False, 0, max_requests, reset_timestamp
        # Record this request
        requests.append(now)
        remaining = max_requests - len(requests)
        return True, remaining, max_requests, reset_timestamp
def _prune_rate_limit_entries(target_size: int, cutoff: float) -> None:
    """Shrink _rate_limit_requests toward target_size. Caller holds _rate_limit_lock.

    Drops fully-expired entries first, then the least-recently-active ones.
    """
    # Pass 1: drop entries whose every timestamp is at or before the cutoff.
    stale = [
        ip
        for ip, stamps in _rate_limit_requests.items()
        if not stamps or all(t <= cutoff for t in stamps)
    ]
    for ip in stale:
        del _rate_limit_requests[ip]
    # Pass 2: still too big? Evict the entries whose latest activity is oldest.
    excess = len(_rate_limit_requests) - target_size
    if excess > 0:
        by_recency = sorted(
            _rate_limit_requests,
            key=lambda ip: max(_rate_limit_requests[ip]) if _rate_limit_requests[ip] else 0,
        )
        for ip in by_recency[:excess]:
            del _rate_limit_requests[ip]
def cleanup_rate_limits(window: int | None = None) -> int:
    """Drop expired rate limit entries; return how many IPs were removed.

    Intended to be invoked periodically (e.g. from a cleanup task).

    Args:
        window: Rate limit window in seconds. If None, uses app config.
    """
    if window is None:
        window = current_app.config.get("RATE_LIMIT_WINDOW", 60)
    cutoff = time.time() - window
    with _rate_limit_lock:
        empty_ips = []
        for ip, stamps in _rate_limit_requests.items():
            # Trim expired timestamps in place; empty lists mark dead entries.
            stamps[:] = [t for t in stamps if t > cutoff]
            if not stamps:
                empty_ips.append(ip)
        for ip in empty_ips:
            del _rate_limit_requests[ip]
        return len(empty_ips)
def reset_rate_limits() -> None:
    """Clear all rate limit state. For testing only."""
    with _rate_limit_lock:
        for ip in list(_rate_limit_requests):
            del _rate_limit_requests[ip]
# ─────────────────────────────────────────────────────────────────────────────
# ENUM-001: Lookup Rate Limiting (prevents paste ID enumeration)
# ─────────────────────────────────────────────────────────────────────────────
_lookup_rate_limit_lock = threading.Lock()  # Guards _lookup_rate_limit_requests
_lookup_rate_limit_requests: dict[str, list[float]] = defaultdict(list)  # ip -> lookup timestamps
def check_lookup_rate_limit(client_ip: str) -> tuple[bool, int]:
    """Check if lookup request is within rate limit.

    Separate limiter for paste lookups (ENUM-001) so an attacker cannot
    enumerate paste IDs with brute-force GET requests.

    Args:
        client_ip: Client IP address

    Returns:
        Tuple of (allowed, retry_after_seconds); retry_after is 0 when allowed.
    """
    if not current_app.config.get("LOOKUP_RATE_LIMIT_ENABLED", True):
        return True, 0
    window = current_app.config.get("LOOKUP_RATE_LIMIT_WINDOW", 60)
    max_requests = current_app.config.get("LOOKUP_RATE_LIMIT_MAX", 60)
    max_entries = current_app.config.get("LOOKUP_RATE_LIMIT_MAX_ENTRIES", 10000)
    now = time.time()
    cutoff = now - window
    with _lookup_rate_limit_lock:
        # ENUM-002: Memory protection - prune if at capacity
        if (
            len(_lookup_rate_limit_requests) >= max_entries
            and client_ip not in _lookup_rate_limit_requests
        ):
            # Evict expired entries first (latest timestamp outside the window)
            expired = [
                ip
                for ip, reqs in _lookup_rate_limit_requests.items()
                if not reqs or reqs[-1] <= cutoff
            ]
            for ip in expired:
                del _lookup_rate_limit_requests[ip]
            # If still at capacity, evict the oldest quarter of entries
            if len(_lookup_rate_limit_requests) >= max_entries:
                sorted_ips = sorted(
                    _lookup_rate_limit_requests.items(),
                    key=lambda x: x[1][-1] if x[1] else 0,
                )
                for ip, _ in sorted_ips[: max_entries // 4]:
                    del _lookup_rate_limit_requests[ip]
        requests = _lookup_rate_limit_requests[client_ip]
        requests[:] = [t for t in requests if t > cutoff]
        if len(requests) >= max_requests:
            # +1 rounds up so the client never retries a moment too early
            retry_after = int(requests[0] + window - now) + 1
            return False, max(1, retry_after)
        requests.append(now)
        return True, 0
def reset_lookup_rate_limits() -> None:
    """Clear lookup rate limit state. For testing only."""
    with _lookup_rate_limit_lock:
        for ip in list(_lookup_rate_limit_requests):
            del _lookup_rate_limit_requests[ip]
def add_rate_limit_headers(
    response: Response, remaining: int, limit: int, reset_timestamp: int
) -> Response:
    """Attach rate limit headers (draft-ietf-httpapi-ratelimit-headers style).

    - X-RateLimit-Limit: Maximum requests per window
    - X-RateLimit-Remaining: Requests remaining in current window
    - X-RateLimit-Reset: Unix timestamp when window resets
    """
    # A non-positive limit means rate limiting is disabled; emit nothing.
    if limit > 0:
        headers = response.headers
        headers["X-RateLimit-Limit"] = str(limit)
        headers["X-RateLimit-Remaining"] = str(remaining if remaining > 0 else 0)
        headers["X-RateLimit-Reset"] = str(reset_timestamp)
    return response
# ─────────────────────────────────────────────────────────────────────────────
# Response Helpers
# ─────────────────────────────────────────────────────────────────────────────
def json_response(data: dict[str, Any], status: int = 200) -> Response:
    """Serialize *data* as UTF-8 JSON and wrap it in a Response."""
    body = json.dumps(data, ensure_ascii=False)
    return Response(body, status=status, mimetype="application/json")
def error_response(message: str, status: int, **extra: Any) -> Response:
    """Build a standardized JSON error payload with optional extra fields."""
    payload: dict[str, Any] = {"error": message}
    payload.update(extra)
    return json_response(payload, status)
# ─────────────────────────────────────────────────────────────────────────────
# URL Helpers
# ─────────────────────────────────────────────────────────────────────────────
def url_prefix() -> str:
    """Return the configured URL prefix for reverse proxy deployments."""
    configured: str = current_app.config.get("URL_PREFIX", "")
    return configured
def prefixed_url(path: str) -> str:
    """Join the configured prefix with *path*."""
    return url_prefix() + path
def paste_url(paste_id: str) -> str:
    """URL of the paste metadata endpoint."""
    return prefixed_url("/" + paste_id)
def paste_raw_url(paste_id: str) -> str:
    """URL of the raw paste content endpoint."""
    return prefixed_url("/" + paste_id + "/raw")
def base_url() -> str:
    """Detect the full external base URL for the current request.

    Forwarded headers (X-Forwarded-Proto / X-Scheme / X-Forwarded-Host) are
    client-controlled, so they are only honored when the request arrived via
    the trusted reverse proxy — the same policy get_client_ip applies to
    X-Forwarded-For. Otherwise the WSGI-provided scheme/host are used,
    preventing host-header poisoning of the URLs echoed in API responses.
    (When no TRUSTED_PROXY_SECRET is configured, is_trusted_proxy() is
    always True, so default deployments behave exactly as before.)
    """
    scheme = request.scheme
    host = request.host
    if is_trusted_proxy():
        scheme = (
            request.headers.get("X-Forwarded-Proto")
            or request.headers.get("X-Scheme")
            or scheme
        )
        host = request.headers.get("X-Forwarded-Host") or request.headers.get("Host") or host
    return f"{scheme}://{host}{url_prefix()}"
# ─────────────────────────────────────────────────────────────────────────────
# Response Builders
# ─────────────────────────────────────────────────────────────────────────────
def build_paste_metadata(
    paste_id: str,
    mime_type: str,
    size: int,
    created_at: int,
    *,
    owner: str | None = None,
    burn_after_read: bool = False,
    expires_at: int | None = None,
    password_protected: bool = False,
    include_owner: bool = False,
    last_accessed: int | None = None,
) -> dict[str, Any]:
    """Assemble the standard paste metadata payload.

    Optional fields are appended in a fixed order (last_accessed, owner,
    burn_after_read, expires_at, password_protected) so serialized JSON is
    stable across calls.

    Args:
        paste_id: Paste identifier
        mime_type: Content MIME type
        size: Content size in bytes
        created_at: Creation timestamp
        owner: Owner fingerprint (included only if include_owner=True)
        burn_after_read: Whether paste is burn-after-read
        expires_at: Expiration timestamp
        password_protected: Whether paste has password
        include_owner: Whether to include owner in response
        last_accessed: Last access timestamp (optional)
    """
    meta: dict[str, Any] = dict(
        id=paste_id,
        mime_type=mime_type,
        size=size,
        created_at=created_at,
        url=paste_url(paste_id),
        raw=paste_raw_url(paste_id),
    )
    if last_accessed is not None:
        meta["last_accessed"] = last_accessed
    if include_owner and owner:
        meta["owner"] = owner
    if burn_after_read:
        meta["burn_after_read"] = True
    if expires_at:
        meta["expires_at"] = expires_at
    if password_protected:
        meta["password_protected"] = True
    return meta
# ─────────────────────────────────────────────────────────────────────────────
# Validation Helpers (used within views)
# ─────────────────────────────────────────────────────────────────────────────
def validate_paste_id(paste_id: str) -> Response | None:
    """Validate paste ID format. Returns error response or None if valid."""
    required_len = current_app.config["PASTE_ID_LENGTH"]
    well_formed = len(paste_id) == required_len and PASTE_ID_PATTERN.match(paste_id)
    if well_formed:
        return None
    return error_response("Invalid paste ID", 400)
def fetch_paste(paste_id: str, check_password: bool = True) -> Response | None:
    """Fetch paste and store in g.paste. Returns error response or None if OK.

    On success, sets g.paste (the row) and g.db (the open connection); the
    last_accessed UPDATE is left uncommitted on this path — presumably the
    caller commits once it is done with the row (TODO confirm).

    Args:
        paste_id: Paste identifier (assumed already validated by caller)
        check_password: Verify X-Paste-Password against the stored hash
    """
    # ENUM-001: Rate limit lookups to prevent enumeration attacks
    client_ip = get_client_ip()
    allowed, retry_after = check_lookup_rate_limit(client_ip)
    if not allowed:
        response = error_response(
            f"Lookup rate limit exceeded. Retry after {retry_after} seconds.",
            429,
            retry_after=retry_after,
        )
        response.headers["Retry-After"] = str(retry_after)
        return response
    db = get_db()
    now = int(time.time())
    # Update access time.
    # NOTE(review): this runs before the existence/password checks, so even a
    # failed-password request refreshes last_accessed — confirm that is intended.
    db.execute("UPDATE pastes SET last_accessed = ? WHERE id = ?", (now, paste_id))
    row = db.execute(
        """SELECT id, content, mime_type, owner, created_at,
        length(content) as size, burn_after_read, expires_at, password_hash
        FROM pastes WHERE id = ?""",
        (paste_id,),
    ).fetchone()
    if row is None:
        # TIMING-001: Perform dummy password verification to prevent timing-based
        # enumeration (attacker can't distinguish "not found" from "wrong password"
        # by measuring response time)
        if check_password:
            dummy_hash = (
                "$pbkdf2-sha256$600000$"
                "0000000000000000000000000000000000000000000000000000000000000000$"
                "0000000000000000000000000000000000000000000000000000000000000000"
            )
            verify_password("dummy", dummy_hash)
        db.commit()
        return error_response("Paste not found", 404)
    # Password verification (missing header and wrong password get distinct codes)
    if check_password and row["password_hash"]:
        provided = request.headers.get("X-Paste-Password", "")
        if not provided:
            db.commit()
            return error_response("Password required", 401, password_protected=True)
        if not verify_password(provided, row["password_hash"]):
            db.commit()
            return error_response("Invalid password", 403)
    g.paste = row
    g.db = db
    return None
def require_auth() -> Response | None:
    """Check authentication for ownership operations.

    Uses get_client_fingerprint() so both trusted and untrusted certificate
    holders can manage their own pastes. On success the fingerprint is stored
    on g.client_id and None is returned; otherwise a 401 response is returned.
    """
    fingerprint = get_client_fingerprint()
    if fingerprint:
        g.client_id = fingerprint
        return None
    return error_response("Authentication required", 401)
# ─────────────────────────────────────────────────────────────────────────────
# Authentication & Security
# ─────────────────────────────────────────────────────────────────────────────
def is_trusted_proxy() -> bool:
    """Verify the request comes from the trusted reverse proxy via shared secret.

    The verdict is cached on flask.g so repeated calls within one request are
    cheap. An empty configured secret disables the check entirely.
    """
    cached = getattr(g, "_trusted_proxy", None)
    if cached is not None:
        return cached
    secret = current_app.config.get("TRUSTED_PROXY_SECRET", "")
    if not secret:
        g._trusted_proxy = True
        return True
    # Constant-time comparison to avoid leaking the secret via timing.
    supplied = request.headers.get("X-Proxy-Secret", "")
    g._trusted_proxy = hmac.compare_digest(secret, supplied)
    return g._trusted_proxy
def get_client_fingerprint() -> str | None:
    """Extract the client certificate SHA-1 fingerprint for identity/ownership.

    Works for trusted and untrusted certificates alike; used for paste
    ownership tracking and delete/update/list operations. Returns None when
    the proxy is not trusted or no well-formed fingerprint header is present.
    """
    if not is_trusted_proxy():
        return None
    header = request.headers.get("X-SSL-Client-SHA1", "")
    fingerprint = header.strip().lower()
    if fingerprint and CLIENT_ID_PATTERN.match(fingerprint):
        return fingerprint
    return None
def get_client_id() -> str | None:
    """Get trusted client certificate fingerprint for elevated privileges.

    Returns fingerprint only if certificate is valid and not revoked.
    Used for:
    - Rate limit benefits (higher limits for trusted users)
    - Size limit benefits (larger pastes for trusted users)
    Untrusted certificates return None here but still work via
    get_client_fingerprint() for ownership operations.
    """
    fingerprint = get_client_fingerprint()
    if not fingerprint:
        return None
    # Check if PKI is enabled and certificate is revoked
    if current_app.config.get("PKI_ENABLED"):
        # Imported lazily so PKI support stays optional at import time.
        from app.pki import is_certificate_valid
        if not is_certificate_valid(fingerprint):
            # Log only a prefix of the fingerprint to keep log lines terse.
            current_app.logger.warning(
                "Elevated auth rejected (revoked/expired): %s", fingerprint[:12] + "..."
            )
            log_event(
                AuditEvent.AUTH_FAILURE,
                AuditOutcome.BLOCKED,
                client_id=fingerprint,
                client_ip=get_client_ip(),
                details={"reason": "revoked_or_expired"},
            )
            return None
    return fingerprint
def is_admin() -> bool:
    """Return True when the caller holds a valid admin certificate.

    Requires a valid (non-revoked) client certificate, PKI mode enabled, and
    the admin flag set for that certificate in the PKI database.
    """
    fingerprint = get_client_id()
    if not fingerprint:
        return False
    if not current_app.config.get("PKI_ENABLED"):
        return False
    from app.pki import is_admin_certificate
    return is_admin_certificate(fingerprint)
# ─────────────────────────────────────────────────────────────────────────────
# Proof-of-Work
# ─────────────────────────────────────────────────────────────────────────────
def get_pow_secret() -> bytes:
    """Return the PoW signing secret.

    Prefers the configured POW_SECRET; otherwise lazily generates and caches
    a random per-process secret (challenges won't survive restarts).
    """
    global _pow_secret_cache
    configured: str = current_app.config.get("POW_SECRET", "")
    if configured:
        return configured.encode()
    secret = _pow_secret_cache
    if secret is None:
        secret = _pow_secret_cache = secrets.token_bytes(32)
    return secret
def generate_challenge(difficulty_override: int | None = None) -> dict[str, Any]:
    """Create a signed PoW challenge.

    Difficulty comes from the dynamic (anti-flood adjusted) value unless
    difficulty_override pins it.

    Args:
        difficulty_override: Optional fixed difficulty (for registration)
    """
    difficulty = (
        difficulty_override
        if difficulty_override is not None
        else get_dynamic_difficulty()
    )
    expires = int(time.time()) + current_app.config["POW_CHALLENGE_TTL"]
    nonce = secrets.token_hex(16)
    # Sign nonce/expiry/difficulty so the server stays stateless.
    payload = f"{nonce}:{expires}:{difficulty}"
    sig = hmac.new(get_pow_secret(), payload.encode(), hashlib.sha256).hexdigest()
    return {
        "nonce": nonce,
        "difficulty": difficulty,
        "expires": expires,
        "token": f"{payload}:{sig}",
    }
def verify_pow(token: str, solution: str, min_difficulty: int | None = None) -> tuple[bool, str]:
    """Verify proof-of-work solution. Returns (valid, error_message).

    Accepts tokens with difficulty >= min_difficulty. The solution must meet the
    token's embedded difficulty (which may be elevated due to anti-flood).

    Args:
        token: PoW challenge token ("nonce:expires:difficulty:signature")
        solution: Nonce solution (non-negative decimal integer as a string)
        min_difficulty: Minimum required difficulty (defaults to POW_DIFFICULTY)
    """
    base_difficulty = current_app.config["POW_DIFFICULTY"]
    if base_difficulty == 0 and min_difficulty is None:
        return True, ""
    required_difficulty = min_difficulty if min_difficulty is not None else base_difficulty
    if required_difficulty == 0:
        return True, ""
    # Parse token
    try:
        parts = token.split(":")
        if len(parts) != 4:
            return False, "Invalid challenge format"
        nonce, expires_str, diff_str, sig = parts
        expires = int(expires_str)
        token_diff = int(diff_str)
    except (ValueError, TypeError):
        return False, "Invalid challenge format"
    # Verify signature before trusting any token field (the server stores no
    # challenges; the HMAC proves this token was issued by us)
    msg = f"{nonce}:{expires}:{token_diff}".encode()
    expected_sig = hmac.new(get_pow_secret(), msg, hashlib.sha256).hexdigest()
    if not hmac.compare_digest(sig, expected_sig):
        return False, "Invalid challenge signature"
    # Check expiry
    if int(time.time()) > expires:
        return False, "Challenge expired"
    # Token difficulty must be at least the required difficulty
    if token_diff < required_difficulty:
        return False, f"Difficulty too low: {token_diff} < {required_difficulty}"
    # Verify solution is a non-negative integer (rejects junk early)
    try:
        solution_int = int(solution)
        if solution_int < 0:
            return False, "Invalid solution"
    except (ValueError, TypeError):
        return False, "Invalid solution"
    # Check hash meets the token's difficulty (not current dynamic difficulty)
    work = f"{nonce}:{solution}".encode()
    hash_bytes = hashlib.sha256(work).digest()
    # Count leading zero bits of the digest
    zero_bits = 0
    for byte in hash_bytes:
        if byte == 0:
            zero_bits += 8
        else:
            zero_bits += 8 - byte.bit_length()
            break
    if zero_bits < token_diff:
        return False, f"Insufficient work: {zero_bits} < {token_diff} bits"
    return True, ""
# ─────────────────────────────────────────────────────────────────────────────
# Content Processing
# ─────────────────────────────────────────────────────────────────────────────
def calculate_entropy(data: bytes) -> float:
    """Shannon entropy of *data* in bits per byte (0.0 through 8.0)."""
    total = len(data)
    if total == 0:
        return 0.0
    counts = [0] * 256
    for b in data:
        counts[b] += 1
    # H = -sum(p * log2(p)) over observed byte frequencies.
    return -sum((c / total) * math.log2(c / total) for c in counts if c)
def detect_mime_type(content: bytes, content_type: str | None = None) -> str:
    """Detect MIME type using magic bytes, headers, or content analysis.

    Priority: magic-byte signatures, then a specific (non-generic)
    Content-Type header, then UTF-8 probing; finally falls back to
    application/octet-stream.
    """
    # Magic byte detection (highest priority).
    # Slice once for safety - only examine first MAX_MAGIC_LEN bytes.
    prefix = content[:MAX_MAGIC_LEN]
    for magic, mime in MAGIC_SIGNATURES.items():
        if prefix[: len(magic)] != magic:
            continue
        if magic == b"RIFF":
            # RIFF is just a container; the real format lives at bytes 8-12.
            # Fix: a truncated (<12 byte) or unknown-subtype RIFF previously
            # returned "image/webp" unverified — skip the signature instead
            # and let the later checks decide.
            subtype = content[8:12] if len(content) >= 12 else b""
            riff_types = {
                b"WEBP": "image/webp",
                b"AVI ": "video/x-msvideo",
                b"WAVE": "audio/wav",
            }
            if subtype in riff_types:
                return riff_types[subtype]
            continue  # Unknown/truncated RIFF subtype
        return mime
    # Explicit Content-Type (if specific)
    if content_type:
        mime = content_type.split(";")[0].strip().lower()
        if mime not in GENERIC_MIME_TYPES and MIME_PATTERN.match(mime):
            return mime
    # UTF-8 text detection
    try:
        content.decode("utf-8")
        return "text/plain"
    except UnicodeDecodeError:
        return "application/octet-stream"
def is_recognizable_format(content: bytes) -> tuple[bool, str | None]:
    """Check if content is a recognizable (likely unencrypted) format.

    Returns (is_recognizable, detected_format).
    Used to enforce encryption by rejecting known formats.
    """
    # Check magic bytes - slice once for safety
    prefix = content[:MAX_MAGIC_LEN]
    for magic, mime in MAGIC_SIGNATURES.items():
        if prefix[: len(magic)] != magic:
            continue
        if magic == b"RIFF":
            # Fix: resolve the RIFF subtype so AVI/WAV payloads are also
            # recognized (previously only WebP was, letting plain AVI/WAV
            # bypass the encryption check), and a truncated RIFF header is
            # no longer claimed as WebP.
            subtype = content[8:12] if len(content) >= 12 else b""
            if subtype == b"WEBP":
                return True, "image/webp"
            if subtype == b"AVI ":
                return True, "video/x-msvideo"
            if subtype == b"WAVE":
                return True, "audio/wav"
            continue  # Unknown/truncated RIFF subtype
        return True, mime
    # Check if valid UTF-8 text (plaintext)
    try:
        content.decode("utf-8")
        return True, "text/plain"
    except UnicodeDecodeError:
        pass
    return False, None
def generate_paste_id(content: bytes) -> str:
    """Derive a unique paste ID from the content hash salted with a nanosecond timestamp."""
    salt = str(time.time_ns()).encode()
    digest = hashlib.sha256(content + salt).hexdigest()
    return digest[: current_app.config["PASTE_ID_LENGTH"]]
# ─────────────────────────────────────────────────────────────────────────────
# Class-Based Views
# ─────────────────────────────────────────────────────────────────────────────
class IndexView(MethodView):
"""Handle API info and paste creation."""
    def get(self) -> Response:
        """Return API information and usage examples.

        Public discovery endpoint: lists every route (plus PKI routes when
        PKI is enabled), the per-tier size/rate limits, the live PoW
        difficulty, and copy-pasteable curl/CLI usage built from the
        detected external base URL.
        """
        prefix = url_prefix() or "/"
        url = base_url()
        # Dynamic difficulty reflects any active anti-flood boost.
        difficulty = get_dynamic_difficulty()
        pki_enabled = current_app.config.get("PKI_ENABLED", False)
        # Build endpoints dict
        endpoints: dict[str, str] = {
            f"GET {prefixed_url('/')}": "API information",
            f"GET {prefixed_url('/health')}": "Health check",
            f"GET {prefixed_url('/client')}": "Download CLI client (fpaste)",
            f"GET {prefixed_url('/challenge')}": "Get proof-of-work challenge",
            f"POST {prefixed_url('/')}": "Create paste (PoW required)",
            f"GET {prefixed_url('/pastes')}": "List your pastes (cert required)",
            f"GET {prefixed_url('/<id>')}": "Get paste metadata",
            f"GET {prefixed_url('/<id>/raw')}": "Get raw paste content",
            f"PUT {prefixed_url('/<id>')}": "Update paste (owner only)",
            f"DELETE {prefixed_url('/<id>')}": "Delete paste (owner only)",
            f"GET {prefixed_url('/register/challenge')}": "Get registration challenge",
            f"POST {prefixed_url('/register')}": "Register for client certificate",
        }
        if pki_enabled:
            endpoints.update(
                {
                    f"GET {prefixed_url('/pki')}": "PKI status",
                    f"GET {prefixed_url('/pki/ca.crt')}": "Download CA certificate",
                    f"POST {prefixed_url('/pki/issue')}": "Issue client certificate",
                    f"GET {prefixed_url('/pki/certs')}": "List certificates",
                    f"POST {prefixed_url('/pki/revoke/<serial>')}": "Revoke certificate",
                }
            )
        # Build response
        response_data: dict[str, Any] = {
            "name": "FlaskPaste",
            "version": VERSION,
            "prefix": prefix,
            "endpoints": endpoints,
            "authentication": {
                "anonymous": "Create pastes only (strict limits)",
                "client_cert": "Create + manage own pastes (strict limits)",
                "trusted_cert": "All operations (relaxed limits)",
            },
            "limits": {
                "anonymous": {
                    "max_size": current_app.config["MAX_PASTE_SIZE_ANON"],
                    "rate": f"{current_app.config['RATE_LIMIT_MAX']}/min",
                },
                "trusted": {
                    "max_size": current_app.config["MAX_PASTE_SIZE_AUTH"],
                    "rate": f"{current_app.config['RATE_LIMIT_MAX'] * current_app.config.get('RATE_LIMIT_AUTH_MULTIPLIER', 5)}/min",  # noqa: E501
                },
            },
            "pow": {
                "enabled": difficulty > 0,
                "difficulty": difficulty,
                "hint": "GET /challenge, solve, submit with X-PoW-Token + X-PoW-Solution",
            },
            "cli": {
                "install": f"curl -o fpaste {url}/client && chmod +x fpaste",
                "usage": "fpaste file.txt # encrypts by default",
            },
            "usage": {
                "create": f"curl -X POST --data-binary @file.txt {url}/ (with PoW headers)",
                "get": f"curl {url}/<id>/raw",
                "delete": f"curl -X DELETE {url}/<id> (with X-SSL-Client-SHA1)",
            },
        }
        return json_response(response_data)
def post(self) -> Response:
    """Create a new paste.

    Accepts either a JSON body ({"content": "..."}) or a raw request body.
    Checks run cheapest-first: rate limit, proof-of-work, size limits,
    entropy, recognizable-format rejection, then content deduplication,
    and finally the database insert.

    Optional request headers:
        X-PoW-Token / X-PoW-Solution: proof-of-work (when POW_DIFFICULTY > 0)
        X-Burn-After-Read: delete the paste after its first read
        X-Expiry: requested lifetime in seconds (capped by MAX_EXPIRY_SECONDS)
        X-Paste-Password: password-protect the paste

    Returns:
        201 with paste metadata on success; 400/413/429 on rejection.
    """
    # Parse content
    content: bytes | None = None
    mime_type: str | None = None
    if request.is_json:
        data = request.get_json(silent=True)
        if data and isinstance(data.get("content"), str):
            content = data["content"].encode("utf-8")
            mime_type = "text/plain"
    else:
        content = request.get_data(as_text=False)
        if content:
            mime_type = detect_mime_type(content, request.content_type)
    if not content:
        return error_response("No content provided", 400)
    # Separate trusted (for limits) from fingerprint (for ownership)
    trusted_client = get_client_id()  # Only trusted certs get elevated limits
    owner = get_client_fingerprint()  # Any cert can own pastes
    # Rate limiting (check before expensive operations)
    client_ip = get_client_ip()
    allowed, remaining, limit, reset_timestamp = check_rate_limit(
        client_ip, authenticated=bool(trusted_client)
    )
    # Store rate limit info for response headers
    g.rate_limit_remaining = remaining
    g.rate_limit_limit = limit
    g.rate_limit_reset = reset_timestamp
    if not allowed:
        current_app.logger.warning(
            "Rate limit exceeded: ip=%s trusted=%s", client_ip, bool(trusted_client)
        )
        # Audit log rate limit event
        if current_app.config.get("AUDIT_ENABLED", True):
            from app.audit import AuditEvent, AuditOutcome, log_event

            log_event(
                AuditEvent.RATE_LIMIT,
                AuditOutcome.BLOCKED,
                client_id=owner,
                client_ip=client_ip,
            )
        record_rate_limit("blocked")
        retry_after = max(1, reset_timestamp - int(time.time()))
        response = error_response(
            "Rate limit exceeded",
            429,
            retry_after=retry_after,
        )
        response.headers["Retry-After"] = str(retry_after)
        add_rate_limit_headers(response, 0, limit, reset_timestamp)
        return response
    # Proof-of-work verification
    difficulty = current_app.config["POW_DIFFICULTY"]
    if difficulty > 0:
        token = request.headers.get("X-PoW-Token", "")
        solution = request.headers.get("X-PoW-Solution", "")
        if not token or not solution:
            return error_response(
                "Proof-of-work required", 400, hint="GET /challenge for a new challenge"
            )
        valid, err = verify_pow(token, solution)
        if not valid:
            current_app.logger.warning(
                "PoW verification failed: %s from=%s", err, request.remote_addr
            )
            # Audit log PoW failure
            if current_app.config.get("AUDIT_ENABLED", True):
                from app.audit import AuditEvent, AuditOutcome, log_event

                log_event(
                    AuditEvent.POW_FAILURE,
                    AuditOutcome.FAILURE,
                    client_id=owner,
                    client_ip=client_ip,
                    details={"error": err},
                )
            record_pow("failure")
            return error_response(f"Proof-of-work failed: {err}", 400)
        record_pow("success")
    # Size limits (only trusted clients get elevated limits)
    content_size = len(content)
    max_size = (
        current_app.config["MAX_PASTE_SIZE_AUTH"]
        if trusted_client
        else current_app.config["MAX_PASTE_SIZE_ANON"]
    )
    if content_size > max_size:
        return error_response(
            "Paste too large",
            413,
            size=content_size,
            max_size=max_size,
            trusted=trusted_client is not None,
        )
    # Minimum size check (enforces encryption overhead)
    min_size = current_app.config.get("MIN_PASTE_SIZE", 0)
    if min_size > 0 and content_size < min_size:
        return error_response(
            "Paste too small",
            400,
            size=content_size,
            min_size=min_size,
            hint="Encrypt content before uploading (fpaste encrypts by default)",
        )
    # Entropy check (only applied to payloads >= MIN_ENTROPY_SIZE bytes)
    min_entropy = current_app.config.get("MIN_ENTROPY", 0)
    min_entropy_size = current_app.config.get("MIN_ENTROPY_SIZE", 256)
    if min_entropy > 0 and content_size >= min_entropy_size:
        entropy = calculate_entropy(content)
        if entropy < min_entropy:
            current_app.logger.warning(
                "Low entropy rejected: %.2f < %.2f from=%s",
                entropy,
                min_entropy,
                request.remote_addr,
            )
            return error_response(
                "Content entropy too low",
                400,
                entropy=round(entropy, 2),
                min_entropy=min_entropy,
                hint="Encrypt content before uploading (fpaste encrypts by default)",
            )
    # Binary content requirement (reject recognizable formats)
    if current_app.config.get("REQUIRE_BINARY", False):
        is_recognized, detected_format = is_recognizable_format(content)
        if is_recognized:
            current_app.logger.warning(
                "Recognizable format rejected: %s from=%s",
                detected_format,
                request.remote_addr,
            )
            return error_response(
                "Recognizable format not allowed",
                400,
                detected=detected_format,
                hint="Encrypt content before uploading (fpaste encrypts by default)",
            )
    # Deduplication check
    content_hash = hashlib.sha256(content).hexdigest()
    is_allowed, dedup_count = check_content_hash(content_hash)
    if not is_allowed:
        window = current_app.config["CONTENT_DEDUP_WINDOW"]
        current_app.logger.warning(
            "Dedup threshold exceeded: hash=%s count=%d from=%s",
            content_hash[:16],
            dedup_count,
            request.remote_addr,
        )
        # Audit log dedup block
        if current_app.config.get("AUDIT_ENABLED", True):
            from app.audit import AuditEvent, AuditOutcome, log_event

            log_event(
                AuditEvent.DEDUP_BLOCK,
                AuditOutcome.BLOCKED,
                client_id=owner,
                client_ip=client_ip,
                details={"hash": content_hash[:16], "count": dedup_count},
            )
        record_dedup("blocked")
        return error_response(
            "Duplicate content rate limit exceeded",
            429,
            count=dedup_count,
            window_seconds=window,
        )
    record_dedup("allowed")
    # Parse optional headers
    burn_header = request.headers.get("X-Burn-After-Read", "").strip().lower()
    burn_after_read = burn_header in ("true", "1", "yes")
    # Determine default expiry based on authentication level
    # Anonymous < Untrusted cert < Trusted cert (registered in PKI)
    if owner is None:
        # Anonymous user
        default_expiry = current_app.config.get("EXPIRY_ANON", 86400)
    elif trusted_client:
        # Trusted certificate (registered in PKI)
        from app.pki import is_trusted_certificate

        if is_trusted_certificate(owner):
            default_expiry = current_app.config.get("EXPIRY_TRUSTED", 2592000)
        else:
            default_expiry = current_app.config.get("EXPIRY_UNTRUSTED", 604800)
    else:
        # Has cert but not trusted
        default_expiry = current_app.config.get("EXPIRY_UNTRUSTED", 604800)
    expires_at = None
    expiry_header = request.headers.get("X-Expiry", "").strip()
    if expiry_header:
        try:
            expiry_seconds = int(expiry_header)
            if expiry_seconds > 0:
                max_expiry = current_app.config.get("MAX_EXPIRY_SECONDS", 0)
                if max_expiry > 0:
                    expiry_seconds = min(expiry_seconds, max_expiry)
                expires_at = int(time.time()) + expiry_seconds
        except ValueError:
            # Malformed X-Expiry is ignored; the default expiry applies below.
            pass
    # Apply default expiry if none specified (0 = no expiry for trusted)
    if expires_at is None and default_expiry > 0:
        expires_at = int(time.time()) + default_expiry
    password_hash = None
    password_header = request.headers.get("X-Paste-Password", "")
    if password_header:
        if len(password_header) > 1024:
            return error_response("Password too long (max 1024 chars)", 400)
        password_hash = hash_password(password_header)
    # Insert paste
    paste_id = generate_paste_id(content)
    now = int(time.time())
    db = get_db()
    db.execute(
        """INSERT INTO pastes
        (id, content, mime_type, owner, created_at, last_accessed,
        burn_after_read, expires_at, password_hash)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        (
            paste_id,
            content,
            mime_type,
            owner,
            now,
            now,
            1 if burn_after_read else 0,
            expires_at,
            password_hash,
        ),
    )
    db.commit()
    # Build response (creation response is intentionally different - no size)
    response_data: dict[str, Any] = {
        "id": paste_id,
        "url": paste_url(paste_id),
        "raw": paste_raw_url(paste_id),
        "mime_type": mime_type,
        "created_at": now,
    }
    if owner:
        response_data["owner"] = owner
    if burn_after_read:
        response_data["burn_after_read"] = True
    if expires_at:
        response_data["expires_at"] = expires_at
    if password_hash:
        response_data["password_protected"] = True
    # Record successful paste for anti-flood tracking
    record_antiflood_request()
    # Audit log paste creation
    if current_app.config.get("AUDIT_ENABLED", True):
        from app.audit import AuditEvent, AuditOutcome, log_event

        log_event(
            AuditEvent.PASTE_CREATE,
            AuditOutcome.SUCCESS,
            paste_id=paste_id,
            client_id=owner,
            client_ip=client_ip,
            details={"size": content_size, "mime_type": mime_type},
        )
    record_paste_created("authenticated" if owner else "anonymous", "success")
    response = json_response(response_data, 201)
    # Add rate limit headers to successful response
    rl_remaining = getattr(g, "rate_limit_remaining", -1)
    rl_limit = getattr(g, "rate_limit_limit", -1)
    rl_reset = getattr(g, "rate_limit_reset", 0)
    if rl_limit > 0:
        add_rate_limit_headers(response, rl_remaining, rl_limit, rl_reset)
    return response
class HealthView(MethodView):
    """Liveness/readiness probe for the service."""

    def get(self) -> Response:
        """Report service health, probing the database with a trivial query."""
        try:
            get_db().execute("SELECT 1")
        except Exception:
            # Any database failure marks the service unhealthy.
            return json_response({"status": "unhealthy", "database": "error"}, 503)
        return json_response({"status": "healthy", "database": "ok"})
class ChallengeView(MethodView):
    """Proof-of-work challenge endpoint."""

    def get(self) -> Response:
        """Hand out a fresh PoW challenge, or report that PoW is disabled."""
        configured = current_app.config["POW_DIFFICULTY"]
        if configured == 0:
            return json_response({"enabled": False, "difficulty": 0})
        challenge = generate_challenge()
        payload = {
            "enabled": True,
            "nonce": challenge["nonce"],
            "difficulty": challenge["difficulty"],
            "expires": challenge["expires"],
            "token": challenge["token"],
        }
        # Anti-flood protection may raise difficulty above the configured base.
        if challenge["difficulty"] > configured:
            payload["elevated"] = True
            payload["base_difficulty"] = configured
        return json_response(payload)
class RegisterChallengeView(MethodView):
    """Registration PoW challenge endpoint (higher difficulty)."""

    def get(self) -> Response:
        """Issue a PoW challenge dedicated to client registration."""
        difficulty = current_app.config.get("REGISTER_POW_DIFFICULTY", 24)
        if difficulty == 0:
            return json_response({"enabled": False, "difficulty": 0})
        challenge = generate_challenge(difficulty_override=difficulty)
        payload = {
            "enabled": True,
            "nonce": challenge["nonce"],
            "difficulty": challenge["difficulty"],
            "expires": challenge["expires"],
            "token": challenge["token"],
            "purpose": "registration",
        }
        return json_response(payload)
class RegisterView(MethodView):
    """Public client certificate registration endpoint."""

    def post(self) -> Response:
        """Register and obtain a client certificate.

        Requires PoW to prevent abuse. Returns PKCS#12 bundle with:
        - Client certificate
        - Client private key
        - CA certificate

        Auto-generates CA if not present and PKI_CA_PASSWORD is configured.

        Returns:
            PKCS#12 binary download (application/x-pkcs12) on success;
            503 when PKI_CA_PASSWORD is unset, 400 on PoW failure,
            500 on CA/issuance errors.
        """
        from cryptography import x509
        from cryptography.hazmat.primitives import serialization

        from app.pki import (
            CANotFoundError,
            PKIError,
            create_pkcs12,
            generate_ca,
            get_ca_info,
            issue_certificate,
        )

        # Check PKI configuration
        password = current_app.config.get("PKI_CA_PASSWORD", "")
        if not password:
            return error_response(
                "Registration not available",
                503,
                hint="PKI_CA_PASSWORD not configured",
            )
        # Verify PoW at the (higher) registration difficulty
        register_difficulty = current_app.config.get("REGISTER_POW_DIFFICULTY", 24)
        if register_difficulty > 0:
            token = request.headers.get("X-PoW-Token", "")
            solution = request.headers.get("X-PoW-Solution", "")
            if not token or not solution:
                return error_response(
                    "Proof-of-work required",
                    400,
                    hint="GET /register/challenge for a registration challenge",
                    difficulty=register_difficulty,
                )
            valid, err = verify_pow(token, solution, min_difficulty=register_difficulty)
            if not valid:
                current_app.logger.warning(
                    "Registration PoW failed: %s from=%s", err, request.remote_addr
                )
                return error_response(f"Proof-of-work failed: {err}", 400)
        # Parse common_name from request
        # NOTE(review): common_name is truncated/stripped but not sanitized;
        # it is later embedded in the Content-Disposition header and the
        # PKCS#12 friendly name -- confirm werkzeug rejects control chars.
        common_name = None
        if request.is_json:
            data = request.get_json(silent=True)
            if data and isinstance(data.get("common_name"), str):
                common_name = data["common_name"][:64].strip()
        if not common_name:
            # Generate random common name if not provided
            common_name = f"client-{secrets.token_hex(4)}"
        # Auto-generate CA if needed (skip PKI_ENABLED check for registration)
        ca_info = get_ca_info(skip_enabled_check=True)
        if ca_info is None:
            ca_days = current_app.config.get("PKI_CA_DAYS", 3650)
            try:
                ca_info = generate_ca("FlaskPaste CA", password, days=ca_days)
                current_app.logger.info(
                    "CA auto-generated for registration: fingerprint=%s",
                    ca_info["fingerprint_sha1"][:12],
                )
            except PKIError as e:
                current_app.logger.error("CA auto-generation failed: %s", e)
                return error_response("CA generation failed", 500)
        # Issue certificate
        try:
            cert_days = current_app.config.get("PKI_CERT_DAYS", 365)
            cert_info = issue_certificate(common_name, password, days=cert_days)
        except CANotFoundError:
            return error_response("CA not available", 500)
        except PKIError as e:
            current_app.logger.error("Certificate issuance failed: %s", e)
            return error_response("Certificate issuance failed", 500)
        # Load CA cert for PKCS#12 (reuse ca_info from above, or refresh if it was just generated)
        if ca_info is None or "certificate_pem" not in ca_info:
            ca_info = get_ca_info(skip_enabled_check=True)
        assert ca_info is not None  # CA was just generated or exists
        ca_cert = x509.load_pem_x509_certificate(ca_info["certificate_pem"].encode())
        client_cert = x509.load_pem_x509_certificate(cert_info["certificate_pem"].encode())
        client_key = serialization.load_pem_private_key(
            cert_info["private_key_pem"].encode(), password=None
        )
        # Create PKCS#12 bundle (no password for easy import)
        p12_data = create_pkcs12(
            private_key=client_key,
            certificate=client_cert,
            ca_certificate=ca_cert,
            friendly_name=common_name,
            password=None,
        )
        current_app.logger.info(
            "Client registered: cn=%s fingerprint=%s from=%s",
            common_name,
            cert_info["fingerprint_sha1"][:12],
            request.remote_addr,
        )
        log_event(
            AuditEvent.CERT_ISSUED,
            AuditOutcome.SUCCESS,
            client_id=cert_info["fingerprint_sha1"],
            client_ip=request.remote_addr,
            details={
                "type": "registration",
                "common_name": common_name,
                "expires_at": cert_info["expires_at"],
            },
        )
        # Return PKCS#12 as binary download
        response = Response(p12_data, mimetype="application/x-pkcs12")
        response.headers["Content-Disposition"] = f'attachment; filename="{common_name}.p12"'
        response.headers["X-Fingerprint-SHA1"] = cert_info["fingerprint_sha1"]
        response.headers["X-Certificate-Expires"] = str(cert_info["expires_at"])
        response.headers["X-Is-Admin"] = "1" if cert_info.get("is_admin") else "0"
        return response
class ClientView(MethodView):
    """CLI client download endpoint."""

    def get(self) -> Response:
        """Serve the fpaste CLI script with this server's URL baked in."""
        import os

        url = base_url()
        script_path = os.path.join(current_app.root_path, "..", "fpaste")
        try:
            with open(script_path) as fh:
                script = fh.read()
        except FileNotFoundError:
            return error_response("Client not available", 404)
        # Point the script's default-server fallbacks at this deployment.
        replacements = [
            (
                '"server": os.environ.get("FLASKPASTE_SERVER", "http://localhost:5000")',
                f'"server": os.environ.get("FLASKPASTE_SERVER", "{url}")',
            ),
            ("http://localhost:5000)", f"{url})"),
        ]
        for old, new in replacements:
            script = script.replace(old, new)
        response = Response(script, mimetype="text/x-python")
        response.headers["Content-Disposition"] = "attachment; filename=fpaste"
        return response
class PasteView(MethodView):
    """Paste metadata operations (GET/HEAD metadata, PUT updates)."""

    def get(self, paste_id: str) -> Response:
        """Retrieve paste metadata (never the content itself)."""
        # Validate and fetch
        if err := validate_paste_id(paste_id):
            return err
        if err := fetch_paste(paste_id):
            return err
        row: Row = g.paste
        # Persist whatever bookkeeping fetch_paste performed
        # (presumably a last_accessed update -- confirm in fetch_paste).
        g.db.commit()
        return json_response(
            build_paste_metadata(
                paste_id=row["id"],
                mime_type=row["mime_type"],
                size=row["size"],
                created_at=row["created_at"],
                burn_after_read=bool(row["burn_after_read"]),
                expires_at=row["expires_at"],
                password_protected=bool(row["password_hash"]),
            )
        )

    def head(self, paste_id: str) -> Response:
        """Return paste metadata headers only.

        Delegates to get(); the framework strips the body for HEAD.
        Unlike PasteRawView.head, this does NOT trigger burn-after-read
        (get() performs no delete).
        """
        return self.get(paste_id)

    def put(self, paste_id: str) -> Response:
        """Update paste content and/or metadata.

        Requires authentication and ownership.

        Content update: Send raw body with Content-Type header
        Metadata update: Use headers with empty body

        Headers:
        - X-Paste-Password: Set/change password
        - X-Remove-Password: true to remove password
        - X-Extend-Expiry: Seconds to add to current expiry
        """
        # Validate paste ID format
        if err := validate_paste_id(paste_id):
            return err
        if err := require_auth():
            return err
        db = get_db()
        # Fetch current paste
        row = db.execute(
            """SELECT id, owner, content, mime_type, expires_at, password_hash
            FROM pastes WHERE id = ?""",
            (paste_id,),
        ).fetchone()
        if row is None:
            return error_response("Paste not found", 404)
        if row["owner"] != g.client_id:
            return error_response("Permission denied", 403)
        # Check for burn-after-read (cannot update); fetched in a second
        # query because the first SELECT does not include the column
        burn_check = db.execute(
            "SELECT burn_after_read FROM pastes WHERE id = ?", (paste_id,)
        ).fetchone()
        if burn_check and burn_check["burn_after_read"]:
            return error_response("Cannot update burn-after-read paste", 400)
        # Parse update parameters
        new_password = request.headers.get("X-Paste-Password", "").strip() or None
        remove_password = request.headers.get("X-Remove-Password", "").lower() in (
            "true",
            "1",
            "yes",
        )
        extend_expiry_str = request.headers.get("X-Extend-Expiry", "").strip()
        # Prepare update fields
        update_fields = []
        update_params: list[Any] = []
        # Content update (if body provided)
        content = request.get_data()
        if content:
            mime_type = request.content_type or "application/octet-stream"
            # Sanitize MIME type (reject anything not matching type/subtype)
            if not MIME_PATTERN.match(mime_type.split(";")[0].strip()):
                mime_type = "application/octet-stream"
            update_fields.append("content = ?")
            update_params.append(content)
            update_fields.append("mime_type = ?")
            update_params.append(mime_type.split(";")[0].strip())
        # Password update (removal wins over setting a new password)
        if remove_password:
            update_fields.append("password_hash = NULL")
        elif new_password:
            update_fields.append("password_hash = ?")
            update_params.append(hash_password(new_password))
        # Expiry extension
        if extend_expiry_str:
            try:
                extend_seconds = int(extend_expiry_str)
                if extend_seconds > 0:
                    current_expiry = row["expires_at"]
                    if current_expiry:
                        new_expiry = current_expiry + extend_seconds
                    else:
                        # If no expiry set, create one from now
                        new_expiry = int(time.time()) + extend_seconds
                    update_fields.append("expires_at = ?")
                    update_params.append(new_expiry)
            except ValueError:
                return error_response("Invalid X-Extend-Expiry value", 400)
        if not update_fields:
            return error_response("No updates provided", 400)
        # Execute update (fields are hardcoded strings, safe from injection)
        update_sql = f"UPDATE pastes SET {', '.join(update_fields)} WHERE id = ?"  # noqa: S608 # nosec B608
        update_params.append(paste_id)
        db.execute(update_sql, update_params)
        db.commit()
        # Audit log paste update
        if current_app.config.get("AUDIT_ENABLED", True):
            from app.audit import AuditEvent, AuditOutcome, log_event

            log_event(
                AuditEvent.PASTE_UPDATE,
                AuditOutcome.SUCCESS,
                paste_id=paste_id,
                client_id=g.client_id,
                client_ip=get_client_ip(),
                details={"fields": [f.split(" = ")[0] for f in update_fields]},
            )
        # Fetch updated paste for response
        updated = db.execute(
            """SELECT id, mime_type, length(content) as size, expires_at,
            CASE WHEN password_hash IS NOT NULL THEN 1 ELSE 0 END as password_protected
            FROM pastes WHERE id = ?""",
            (paste_id,),
        ).fetchone()
        response_data: dict[str, Any] = {
            "id": updated["id"],
            "size": updated["size"],
            "mime_type": updated["mime_type"],
        }
        if updated["expires_at"]:
            response_data["expires_at"] = updated["expires_at"]
        if updated["password_protected"]:
            response_data["password_protected"] = True
        return json_response(response_data)
class PasteRawView(MethodView):
    """Raw paste content retrieval."""

    def get(self, paste_id: str) -> Response:
        """Retrieve raw paste content.

        Burn-after-read pastes are deleted (and committed) before the
        response is built; the content survives in the already-fetched row.
        """
        # Validate and fetch
        if err := validate_paste_id(paste_id):
            return err
        if err := fetch_paste(paste_id):
            return err
        row: Row = g.paste
        db = g.db
        burn_after_read = row["burn_after_read"]
        if burn_after_read:
            db.execute("DELETE FROM pastes WHERE id = ?", (paste_id,))
            current_app.logger.info("Burn-after-read paste deleted: %s", paste_id)
        db.commit()
        # Audit log paste access
        if current_app.config.get("AUDIT_ENABLED", True):
            from app.audit import AuditEvent, AuditOutcome, log_event

            log_event(
                AuditEvent.PASTE_ACCESS,
                AuditOutcome.SUCCESS,
                paste_id=paste_id,
                client_id=get_client_fingerprint(),
                client_ip=get_client_ip(),
                details={"burn": bool(burn_after_read)},
            )
        record_paste_accessed(
            "authenticated" if get_client_fingerprint() else "anonymous",
            bool(burn_after_read),
        )
        response = Response(row["content"], mimetype=row["mime_type"])
        # Inline disposition only for image/* and text/* types
        if row["mime_type"].startswith(("image/", "text/")):
            response.headers["Content-Disposition"] = "inline"
        if burn_after_read:
            response.headers["X-Burn-After-Read"] = "true"
        return response

    def head(self, paste_id: str) -> Response:
        """Return raw paste headers. HEAD triggers burn-after-read deletion.

        Security note: HEAD requests count as paste access for burn-after-read
        to prevent attackers from probing paste existence before retrieval.
        """
        # Validate and fetch
        if err := validate_paste_id(paste_id):
            return err
        if err := fetch_paste(paste_id):
            return err
        row: Row = g.paste
        db = g.db
        # BURN-001: HEAD triggers burn-after-read like GET
        burn_after_read = row["burn_after_read"]
        if burn_after_read:
            db.execute("DELETE FROM pastes WHERE id = ?", (paste_id,))
            current_app.logger.info("Burn-after-read paste deleted via HEAD: %s", paste_id)
        db.commit()
        response = Response(mimetype=row["mime_type"])
        response.headers["Content-Length"] = str(row["size"])
        if row["mime_type"].startswith(("image/", "text/")):
            response.headers["Content-Disposition"] = "inline"
        if burn_after_read:
            response.headers["X-Burn-After-Read"] = "true"
        return response
class PasteDeleteView(MethodView):
    """Paste deletion with authentication."""

    def delete(self, paste_id: str) -> Response:
        """Delete a paste. Requires ownership or admin rights.

        Args:
            paste_id: Hex paste identifier from the URL.

        Returns:
            200 with a confirmation message on success, 404 if the paste
            does not exist, 403 if the caller is neither owner nor admin.
        """
        # Validate
        if err := validate_paste_id(paste_id):
            return err
        if err := require_auth():
            return err
        db = get_db()
        row = db.execute("SELECT owner FROM pastes WHERE id = ?", (paste_id,)).fetchone()
        if row is None:
            return error_response("Paste not found", 404)
        # Allow if owner or admin
        if row["owner"] != g.client_id and not is_admin():
            return error_response("Permission denied", 403)
        db.execute("DELETE FROM pastes WHERE id = ?", (paste_id,))
        db.commit()
        # Audit log paste deletion. AuditEvent/AuditOutcome/log_event are
        # module-level imports; the redundant function-local re-import was
        # removed for consistency with PKIRevokeView.
        if current_app.config.get("AUDIT_ENABLED", True):
            log_event(
                AuditEvent.PASTE_DELETE,
                AuditOutcome.SUCCESS,
                paste_id=paste_id,
                client_id=g.client_id,
                client_ip=get_client_ip(),
            )
        record_paste_deleted("authenticated", "success")
        return json_response({"message": "Paste deleted"})
class PastesListView(MethodView):
    """List pastes with authentication."""

    def get(self) -> Response:
        """List pastes owned by authenticated user, or all pastes for admins.

        Privacy guarantees:
        - Requires authentication (mTLS client certificate)
        - Regular users can ONLY see their own pastes
        - Admins can see all pastes (with optional owner filter)
        - Content is never returned, only metadata

        Query parameters:
        - limit: max results (default 50, max 200)
        - offset: pagination offset (default 0)
        - type: filter by MIME type (glob pattern, e.g., "image/*")
        - after: filter by created_at >= timestamp
        - before: filter by created_at <= timestamp
        - all: (admin only) if "1", list all pastes instead of own
        - owner: (admin only) filter by owner fingerprint
        """
        import fnmatch

        # Strict authentication requirement
        if err := require_auth():
            return err
        client_id = g.client_id
        user_is_admin = is_admin()
        # Parse pagination parameters (fall back to defaults on bad input)
        try:
            limit = min(int(request.args.get("limit", 50)), 200)
            offset = max(int(request.args.get("offset", 0)), 0)
        except (ValueError, TypeError):
            limit, offset = 50, 0
        # Parse filter parameters
        type_filter = request.args.get("type", "").strip()
        try:
            after_ts = int(request.args.get("after", 0))
        except (ValueError, TypeError):
            after_ts = 0
        try:
            before_ts = int(request.args.get("before", 0))
        except (ValueError, TypeError):
            before_ts = 0
        # Admin-only parameters (silently ignored for non-admins)
        show_all = request.args.get("all", "0") == "1" and user_is_admin
        owner_filter = request.args.get("owner", "").strip() if user_is_admin else ""
        db = get_db()
        # Build query with filters
        where_clauses: list[str] = []
        params: list[Any] = []
        # Owner filtering logic
        if show_all:
            # Admin viewing all pastes (with optional owner filter)
            if owner_filter:
                where_clauses.append("owner = ?")
                params.append(owner_filter)
            # else: no owner filter, show all
        else:
            # Regular user or admin without ?all=1: show only own pastes
            where_clauses.append("owner = ?")
            params.append(client_id)
        if after_ts > 0:
            where_clauses.append("created_at >= ?")
            params.append(after_ts)
        if before_ts > 0:
            where_clauses.append("created_at <= ?")
            params.append(before_ts)
        # Build WHERE clause (may be empty for admin viewing all)
        where_sql = " AND ".join(where_clauses) if where_clauses else "1=1"
        # Count total pastes matching filters (where_sql is safe, built from constants)
        count_row = db.execute(
            f"SELECT COUNT(*) as total FROM pastes WHERE {where_sql}",  # noqa: S608 # nosec B608
            params,
        ).fetchone()
        total = count_row["total"] if count_row else 0
        # Fetch pastes with metadata only (where_sql is safe, built from constants)
        # Include owner for admin view
        rows = db.execute(
            f"""SELECT id, owner, mime_type, length(content) as size, created_at,
            last_accessed, burn_after_read, expires_at,
            CASE WHEN password_hash IS NOT NULL THEN 1 ELSE 0 END as password_protected
            FROM pastes
            WHERE {where_sql}
            ORDER BY created_at DESC
            LIMIT ? OFFSET ?""",  # noqa: S608 # nosec B608
            [*params, limit, offset],
        ).fetchall()
        # Apply MIME type filter (glob pattern matching done in Python for flexibility)
        # NOTE: this runs after LIMIT/OFFSET and after the COUNT query, so with
        # ?type= a page may hold fewer than `limit` items and `total` ignores
        # the type filter.
        if type_filter:
            rows = [r for r in rows if fnmatch.fnmatch(r["mime_type"], type_filter)]
        pastes = [
            build_paste_metadata(
                paste_id=row["id"],
                mime_type=row["mime_type"],
                size=row["size"],
                created_at=row["created_at"],
                owner=row["owner"],
                burn_after_read=bool(row["burn_after_read"]),
                expires_at=row["expires_at"],
                password_protected=bool(row["password_protected"]),
                include_owner=show_all,
                last_accessed=row["last_accessed"],
            )
            for row in rows
        ]
        response_data: dict[str, Any] = {
            "pastes": pastes,
            "count": len(pastes),
            "total": total,
            "limit": limit,
            "offset": offset,
        }
        if user_is_admin:
            response_data["is_admin"] = True
        return json_response(response_data)
# ─────────────────────────────────────────────────────────────────────────────
# PKI Views (Certificate Authority)
# ─────────────────────────────────────────────────────────────────────────────
def require_pki_enabled() -> Response | None:
    """Gate PKI routes: return a 404 error response when PKI is disabled, else None."""
    if current_app.config.get("PKI_ENABLED"):
        return None
    return error_response("PKI not enabled", 404)
class PKIStatusView(MethodView):
    """PKI status endpoint."""

    def get(self) -> Response:
        """Describe PKI availability and, when present, the CA certificate."""
        if not current_app.config.get("PKI_ENABLED"):
            return json_response({"enabled": False})
        from app.pki import get_ca_info

        info = get_ca_info()
        if info is None:
            payload = {
                "enabled": True,
                "ca_exists": False,
                "hint": "POST /pki/ca to generate CA",
            }
            return json_response(payload)
        payload = {
            "enabled": True,
            "ca_exists": True,
            "common_name": info["common_name"],
            "fingerprint_sha1": info["fingerprint_sha1"],
            "created_at": info["created_at"],
            "expires_at": info["expires_at"],
            "key_algorithm": info["key_algorithm"],
        }
        return json_response(payload)
class PKICAGenerateView(MethodView):
    """CA generation endpoint (first-run only)."""

    def post(self) -> Response:
        """Generate CA certificate. Only works if no CA exists.

        Existence is checked both up front and via CAExistsError from
        generate_ca, so a concurrent request cannot create a second CA.

        Returns:
            201 with CA metadata and a download URL, 409 if a CA exists,
            500 when PKI_CA_PASSWORD is unset or generation fails.
        """
        if err := require_pki_enabled():
            return err
        from app.pki import (
            CAExistsError,
            PKIError,
            generate_ca,
            get_ca_info,
        )

        # Check if CA already exists
        if get_ca_info() is not None:
            return error_response("CA already exists", 409)
        # Get CA password from config
        password = current_app.config.get("PKI_CA_PASSWORD", "")
        if not password:
            return error_response(
                "PKI_CA_PASSWORD not configured",
                500,
                hint="Set FLASKPASTE_PKI_CA_PASSWORD environment variable",
            )
        # Parse request for optional common name (truncated to 64 chars)
        common_name = "FlaskPaste CA"
        if request.is_json:
            data = request.get_json(silent=True)
            if data and isinstance(data.get("common_name"), str):
                common_name = data["common_name"][:64]
        # Generate CA
        try:
            days = current_app.config.get("PKI_CA_DAYS", 3650)
            owner = get_client_fingerprint()
            ca_info = generate_ca(common_name, password, days=days, owner=owner)
        except CAExistsError:
            # Lost the race against a concurrent generation request
            return error_response("CA already exists", 409)
        except PKIError as e:
            current_app.logger.error("CA generation failed: %s", e)
            return error_response("CA generation failed", 500)
        current_app.logger.info(
            "CA generated: cn=%s fingerprint=%s", common_name, ca_info["fingerprint_sha1"][:12]
        )
        log_event(
            AuditEvent.CERT_ISSUED,
            AuditOutcome.SUCCESS,
            client_id=owner,
            client_ip=request.remote_addr,
            details={
                "type": "ca",
                "fingerprint": ca_info["fingerprint_sha1"][:16],
                "common_name": common_name,
                "expires_at": ca_info["expires_at"],
            },
        )
        return json_response(
            {
                "message": "CA generated",
                "common_name": ca_info["common_name"],
                "fingerprint_sha1": ca_info["fingerprint_sha1"],
                "created_at": ca_info["created_at"],
                "expires_at": ca_info["expires_at"],
                "download": prefixed_url("/pki/ca.crt"),
            },
            201,
        )
class PKICADownloadView(MethodView):
    """CA certificate download endpoint."""

    def get(self) -> Response:
        """Stream the CA certificate as a PEM attachment."""
        if err := require_pki_enabled():
            return err
        from app.pki import get_ca_info

        info = get_ca_info()
        if info is None:
            return error_response("CA not initialized", 404)
        # Derive a filesystem-friendly filename from the CA common name.
        filename = info["common_name"].replace(" ", "_")
        response = Response(info["certificate_pem"], mimetype="application/x-pem-file")
        response.headers["Content-Disposition"] = f"attachment; filename={filename}.crt"
        return response
class PKIIssueView(MethodView):
    """Certificate issuance endpoint (open registration)."""

    def post(self) -> Response:
        """Issue a new client certificate.

        Requires a JSON body with "common_name" (truncated to 64 chars).
        The 201 response carries both the certificate PEM and its private
        key PEM.
        """
        if err := require_pki_enabled():
            return err
        from app.pki import (
            CANotFoundError,
            PKIError,
            issue_certificate,
        )

        # Parse request
        common_name = None
        if request.is_json:
            data = request.get_json(silent=True)
            if data and isinstance(data.get("common_name"), str):
                common_name = data["common_name"][:64]
        if not common_name:
            return error_response(
                "common_name required", 400, hint='POST {"common_name": "your-name"}'
            )
        # Get CA password from config
        password = current_app.config.get("PKI_CA_PASSWORD", "")
        if not password:
            return error_response("PKI not properly configured", 500)
        # Issue certificate; the requester's fingerprint (if any) is
        # recorded as the issuer via issued_to
        try:
            days = current_app.config.get("PKI_CERT_DAYS", 365)
            issued_to = get_client_fingerprint()
            cert_info = issue_certificate(common_name, password, days=days, issued_to=issued_to)
        except CANotFoundError:
            return error_response("CA not initialized", 404)
        except PKIError as e:
            current_app.logger.error("Certificate issuance failed: %s", e)
            return error_response("Certificate issuance failed", 500)
        current_app.logger.info(
            "Certificate issued: cn=%s serial=%s fingerprint=%s to=%s",
            common_name,
            cert_info["serial"][:8],
            cert_info["fingerprint_sha1"][:12],
            issued_to or "anonymous",
        )
        log_event(
            AuditEvent.CERT_ISSUED,
            AuditOutcome.SUCCESS,
            client_id=cert_info["fingerprint_sha1"],
            client_ip=request.remote_addr,
            details={
                "type": "client",
                "serial": cert_info["serial"][:16],
                "common_name": common_name,
                "issued_by": issued_to,
                "expires_at": cert_info["expires_at"],
            },
        )
        # Return certificate bundle
        return json_response(
            {
                "message": "Certificate issued",
                "serial": cert_info["serial"],
                "common_name": cert_info["common_name"],
                "fingerprint_sha1": cert_info["fingerprint_sha1"],
                "created_at": cert_info["created_at"],
                "expires_at": cert_info["expires_at"],
                "certificate_pem": cert_info["certificate_pem"],
                "private_key_pem": cert_info["private_key_pem"],
                "is_admin": cert_info.get("is_admin", False),
            },
            201,
        )
class PKICertsView(MethodView):
    """Certificate listing endpoint."""

    def get(self) -> Response:
        """List certificates visible to the caller.

        A client sees the certificates it holds or issued; anonymous
        callers (no client certificate) always receive an empty list.
        """
        if err := require_pki_enabled():
            return err
        client_id = get_client_fingerprint()
        db = get_db()
        rows = []
        if client_id:
            rows = db.execute(
                """SELECT serial, common_name, fingerprint_sha1,
                created_at, expires_at, issued_to, status, revoked_at
                FROM issued_certificates
                WHERE issued_to = ? OR fingerprint_sha1 = ?
                ORDER BY created_at DESC""",
                (client_id, client_id),
            ).fetchall()
        certs = []
        for row in rows:
            entry = {
                "serial": row["serial"],
                "common_name": row["common_name"],
                "fingerprint_sha1": row["fingerprint_sha1"],
                "created_at": row["created_at"],
                "expires_at": row["expires_at"],
                "status": row["status"],
            }
            # Optional columns are included only when set.
            for key in ("issued_to", "revoked_at"):
                if row[key]:
                    entry[key] = row[key]
            certs.append(entry)
        return json_response({"certificates": certs, "count": len(certs)})
class PKIRevokeView(MethodView):
    """Certificate revocation endpoint."""

    def post(self, serial: str) -> Response:
        """Revoke a certificate by serial number.

        Permitted for the certificate's issuer (issued_to matches the
        caller) or for the certificate holder itself (fingerprint matches).

        Returns:
            200 on success, 404 if the serial is unknown, 409 if already
            revoked, 403 if the caller is neither issuer nor holder.
        """
        if err := require_pki_enabled():
            return err
        if err := require_auth():
            return err
        from app.pki import CertificateNotFoundError, PKIError, revoke_certificate

        db = get_db()
        # Check certificate exists and get ownership info
        row = db.execute(
            "SELECT issued_to, fingerprint_sha1, status FROM issued_certificates WHERE serial = ?",
            (serial,),
        ).fetchone()
        if row is None:
            return error_response("Certificate not found", 404)
        if row["status"] == "revoked":
            return error_response("Certificate already revoked", 409)
        # Check permission: must be issuer or the certificate itself
        client_id = g.client_id
        can_revoke = row["issued_to"] == client_id or row["fingerprint_sha1"] == client_id
        if not can_revoke:
            return error_response("Permission denied", 403)
        # Revoke
        try:
            revoke_certificate(serial)
        except CertificateNotFoundError:
            # Row vanished between the SELECT above and the revoke call
            return error_response("Certificate not found", 404)
        except PKIError as e:
            current_app.logger.error("Revocation failed: %s", e)
            return error_response("Revocation failed", 500)
        current_app.logger.info("Certificate revoked: serial=%s by=%s", serial[:8], client_id[:12])
        log_event(
            AuditEvent.CERT_REVOKED,
            AuditOutcome.SUCCESS,
            client_id=client_id,
            client_ip=get_client_ip(),
            details={
                "serial": serial[:16],
                "fingerprint": row["fingerprint_sha1"][:16],
            },
        )
        return json_response({"message": "Certificate revoked", "serial": serial})
# ─────────────────────────────────────────────────────────────────────────────
# Audit Log View
# ─────────────────────────────────────────────────────────────────────────────
class AuditLogView(MethodView):
    """Audit log query endpoint (admin only)."""

    @staticmethod
    def _int_arg(name: str, default: int = 0) -> int:
        """Parse query parameter *name* as an int, returning *default* on any parse failure."""
        try:
            return int(request.args.get(name, default))
        except (ValueError, TypeError):
            return default

    def get(self) -> Response:
        """Query audit log with filters.

        Query parameters:
            - event_type: Filter by event type
            - client_id: Filter by client fingerprint
            - paste_id: Filter by paste ID
            - outcome: Filter by outcome (success, failure, blocked)
            - since: Filter by timestamp >= since
            - until: Filter by timestamp <= until
            - limit: Maximum results (default 100, max 500)
            - offset: Pagination offset

        Returns a JSON body with ``entries``, ``count``, ``total``,
        ``limit`` and ``offset``.
        """
        if err := require_auth():
            return err
        if not is_admin():
            return error_response("Admin access required", 403)
        from app.audit import query_audit_log

        # Empty strings collapse to None so the filters are skipped.
        event_type = request.args.get("event_type", "").strip() or None
        client_id = request.args.get("client_id", "").strip() or None
        paste_id = request.args.get("paste_id", "").strip() or None
        outcome = request.args.get("outcome", "").strip() or None
        # Timestamp 0 is treated as "no bound" (0 -> None).
        since = self._int_arg("since") or None
        until = self._int_arg("until") or None
        # Clamp limit to [0, 500]: a negative value must not reach the query
        # layer, since SQLite interprets a negative LIMIT as "unlimited",
        # which would bypass the 500-row cap (presumably query_audit_log
        # forwards this to SQL — TODO confirm).
        limit = max(0, min(self._int_arg("limit", 100), 500))
        offset = max(self._int_arg("offset"), 0)
        entries, total = query_audit_log(
            event_type=event_type,
            client_id=client_id,
            paste_id=paste_id,
            outcome=outcome,
            since=since,
            until=until,
            limit=limit,
            offset=offset,
        )
        return json_response(
            {
                "entries": entries,
                "count": len(entries),
                "total": total,
                "limit": limit,
                "offset": offset,
            }
        )
# ─────────────────────────────────────────────────────────────────────────────
# Route Registration
# ─────────────────────────────────────────────────────────────────────────────
# All routes are registered on the blueprint `bp` (imported from app.api).
# Each MethodView is converted to a view function via as_view(); the string
# argument becomes the endpoint name used by url_for().
# Index and paste creation
bp.add_url_rule("/", view_func=IndexView.as_view("index"))
# Utility endpoints
bp.add_url_rule("/health", view_func=HealthView.as_view("health"))
bp.add_url_rule("/challenge", view_func=ChallengeView.as_view("challenge"))
bp.add_url_rule("/client", view_func=ClientView.as_view("client"))
# Registration endpoints (public certificate issuance with PoW)
bp.add_url_rule(
    "/register/challenge", view_func=RegisterChallengeView.as_view("register_challenge")
)
bp.add_url_rule("/register", view_func=RegisterView.as_view("register"))
# Paste operations
bp.add_url_rule("/pastes", view_func=PastesListView.as_view("pastes_list"))
# NOTE: "/<paste_id>" is registered twice with disjoint method sets —
# GET/HEAD/PUT dispatch to PasteView, DELETE to PasteDeleteView.
bp.add_url_rule("/<paste_id>", view_func=PasteView.as_view("paste"), methods=["GET", "HEAD", "PUT"])
bp.add_url_rule(
    "/<paste_id>/raw", view_func=PasteRawView.as_view("paste_raw"), methods=["GET", "HEAD"]
)
bp.add_url_rule(
    "/<paste_id>", view_func=PasteDeleteView.as_view("paste_delete"), methods=["DELETE"]
)
# PKI endpoints
bp.add_url_rule("/pki", view_func=PKIStatusView.as_view("pki_status"))
bp.add_url_rule("/pki/ca", view_func=PKICAGenerateView.as_view("pki_ca_generate"))
bp.add_url_rule("/pki/ca.crt", view_func=PKICADownloadView.as_view("pki_ca_download"))
bp.add_url_rule("/pki/issue", view_func=PKIIssueView.as_view("pki_issue"))
bp.add_url_rule("/pki/certs", view_func=PKICertsView.as_view("pki_certs"))
bp.add_url_rule(
    "/pki/revoke/<serial>", view_func=PKIRevokeView.as_view("pki_revoke"), methods=["POST"]
)
# Audit log endpoint (admin only)
bp.add_url_rule("/audit", view_func=AuditLogView.as_view("audit_log"))