forked from claw/flaskpaste
RATE-002: Proactive rate limit cleanup when entries exceed threshold - Add RATE_LIMIT_CLEANUP_THRESHOLD config (default 0.8) - Trigger cleanup before hitting hard limit - Prevents memory exhaustion under sustained load CLI-001: Validate clipboard tool paths against trusted directories - Add TRUSTED_CLIPBOARD_DIRS for Unix system paths - Add TRUSTED_WINDOWS_PATTERNS for Windows validation - Reject tools in user-writable locations (PATH hijack prevention) - Use absolute paths in subprocess calls
2200 lines
80 KiB
Python
2200 lines
80 KiB
Python
"""API route handlers using modern Flask patterns."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import math
|
|
import re
|
|
import secrets
|
|
import threading
|
|
import time
|
|
from collections import defaultdict
|
|
from typing import TYPE_CHECKING, Any
|
|
|
|
from flask import Response, current_app, g, request
|
|
from flask.views import MethodView
|
|
|
|
from app.api import bp
|
|
from app.audit import AuditEvent, AuditOutcome, log_event
|
|
from app.config import VERSION
|
|
from app.database import check_content_hash, get_db, hash_password, verify_password
|
|
from app.metrics import (
|
|
record_dedup,
|
|
record_paste_accessed,
|
|
record_paste_created,
|
|
record_paste_deleted,
|
|
record_pow,
|
|
record_rate_limit,
|
|
)
|
|
|
|
if TYPE_CHECKING:
|
|
from sqlite3 import Row
|
|
|
|
# Compiled patterns for validation
|
|
PASTE_ID_PATTERN = re.compile(r"^[a-f0-9]+$")
|
|
CLIENT_ID_PATTERN = re.compile(r"^[a-f0-9]{40}$")
|
|
MIME_PATTERN = re.compile(r"^[a-z0-9][a-z0-9!#$&\-^_.+]*/[a-z0-9][a-z0-9!#$&\-^_.+]*$")
|
|
|
|
# Magic bytes for binary format detection
|
|
MAGIC_SIGNATURES: dict[bytes, str] = {
|
|
b"\x89PNG\r\n\x1a\n": "image/png",
|
|
b"\xff\xd8\xff": "image/jpeg",
|
|
b"GIF87a": "image/gif",
|
|
b"GIF89a": "image/gif",
|
|
b"RIFF": "image/webp",
|
|
b"PK\x03\x04": "application/zip",
|
|
b"%PDF": "application/pdf",
|
|
b"\x1f\x8b": "application/gzip",
|
|
}
|
|
|
|
# Generic MIME types to override with detection
|
|
GENERIC_MIME_TYPES = frozenset(
|
|
{
|
|
"application/octet-stream",
|
|
"application/x-www-form-urlencoded",
|
|
"text/plain",
|
|
}
|
|
)
|
|
|
|
# Runtime PoW secret cache
|
|
_pow_secret_cache: bytes | None = None
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Anti-flood: dynamic PoW difficulty adjustment
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
_antiflood_lock = threading.Lock()
|
|
_antiflood_requests: list[float] = [] # Global request timestamps
|
|
_antiflood_difficulty: int = 0 # Current difficulty boost (added to base)
|
|
_antiflood_last_increase: float = 0 # Last time difficulty was increased
|
|
|
|
|
|
def get_dynamic_difficulty() -> int:
|
|
"""Get current PoW difficulty including anti-flood adjustment."""
|
|
base: int = current_app.config["POW_DIFFICULTY"]
|
|
if base == 0 or not current_app.config.get("ANTIFLOOD_ENABLED", True):
|
|
return base
|
|
with _antiflood_lock:
|
|
max_diff: int = current_app.config["ANTIFLOOD_MAX"]
|
|
return min(base + _antiflood_difficulty, max_diff)
|
|
|
|
|
|
def record_antiflood_request() -> None:
|
|
"""Record a request for anti-flood tracking and adjust difficulty."""
|
|
if not current_app.config.get("ANTIFLOOD_ENABLED", True):
|
|
return
|
|
if current_app.config["POW_DIFFICULTY"] == 0:
|
|
return
|
|
|
|
global _antiflood_difficulty, _antiflood_last_increase
|
|
|
|
now = time.time()
|
|
window = current_app.config["ANTIFLOOD_WINDOW"]
|
|
threshold = current_app.config["ANTIFLOOD_THRESHOLD"]
|
|
step = current_app.config["ANTIFLOOD_STEP"]
|
|
max_diff = current_app.config["ANTIFLOOD_MAX"]
|
|
decay = current_app.config["ANTIFLOOD_DECAY"]
|
|
base = current_app.config["POW_DIFFICULTY"]
|
|
|
|
with _antiflood_lock:
|
|
# Clean old requests
|
|
cutoff = now - window
|
|
_antiflood_requests[:] = [t for t in _antiflood_requests if t > cutoff]
|
|
|
|
# Record this request
|
|
_antiflood_requests.append(now)
|
|
count = len(_antiflood_requests)
|
|
|
|
# Check if we should increase difficulty
|
|
if count > threshold:
|
|
# Increase difficulty if not already at max
|
|
if base + _antiflood_difficulty < max_diff:
|
|
_antiflood_difficulty += step
|
|
_antiflood_last_increase = now
|
|
elif _antiflood_difficulty > 0 and (now - _antiflood_last_increase) > decay:
|
|
# Decay difficulty if abuse has stopped
|
|
_antiflood_difficulty = max(0, _antiflood_difficulty - step)
|
|
_antiflood_last_increase = now # Reset timer
|
|
|
|
|
|
def reset_antiflood() -> None:
|
|
"""Reset anti-flood state (for testing)."""
|
|
global _antiflood_difficulty, _antiflood_last_increase
|
|
with _antiflood_lock:
|
|
_antiflood_requests.clear()
|
|
_antiflood_difficulty = 0
|
|
_antiflood_last_increase = 0
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Rate Limiting (in-memory sliding window)
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
_rate_limit_lock = threading.Lock()
|
|
_rate_limit_requests: dict[str, list[float]] = defaultdict(list)
|
|
|
|
|
|
def get_client_ip() -> str:
|
|
"""Get client IP address, respecting X-Forwarded-For from trusted proxy."""
|
|
if is_trusted_proxy():
|
|
forwarded = request.headers.get("X-Forwarded-For", "")
|
|
if forwarded:
|
|
# Take the first (client) IP from the chain
|
|
return forwarded.split(",")[0].strip()
|
|
return request.remote_addr or "unknown"
|
|
|
|
|
|
def check_rate_limit(client_ip: str, authenticated: bool = False) -> tuple[bool, int, int, int]:
|
|
"""Check if request is within rate limit.
|
|
|
|
Args:
|
|
client_ip: Client IP address
|
|
authenticated: Whether client is authenticated (higher limits)
|
|
|
|
Returns:
|
|
Tuple of (allowed, remaining, limit, reset_timestamp)
|
|
- allowed: Whether request is within rate limit
|
|
- remaining: Requests remaining in current window
|
|
- limit: Maximum requests per window
|
|
- reset_timestamp: Unix timestamp when window resets
|
|
"""
|
|
if not current_app.config.get("RATE_LIMIT_ENABLED", True):
|
|
return True, -1, -1, 0
|
|
|
|
window = current_app.config["RATE_LIMIT_WINDOW"]
|
|
max_requests = current_app.config["RATE_LIMIT_MAX"]
|
|
max_entries = current_app.config.get("RATE_LIMIT_MAX_ENTRIES", 10000)
|
|
cleanup_threshold = current_app.config.get("RATE_LIMIT_CLEANUP_THRESHOLD", 0.8)
|
|
|
|
if authenticated:
|
|
max_requests *= current_app.config.get("RATE_LIMIT_AUTH_MULTIPLIER", 5)
|
|
|
|
now = time.time()
|
|
cutoff = now - window
|
|
|
|
with _rate_limit_lock:
|
|
entry_count = len(_rate_limit_requests)
|
|
|
|
# RATE-002: Proactive cleanup when exceeding threshold
|
|
threshold_count = int(max_entries * cleanup_threshold)
|
|
if entry_count >= threshold_count:
|
|
# Clean up expired entries first
|
|
_prune_rate_limit_entries(threshold_count // 2, cutoff)
|
|
|
|
# RATE-001: Hard limit enforcement (fallback if threshold cleanup wasn't enough)
|
|
if len(_rate_limit_requests) >= max_entries and client_ip not in _rate_limit_requests:
|
|
# Evict oldest entries (those with oldest last request time)
|
|
_prune_rate_limit_entries(max_entries // 2, cutoff)
|
|
|
|
# Clean old requests and get current list
|
|
requests = _rate_limit_requests[client_ip]
|
|
requests[:] = [t for t in requests if t > cutoff]
|
|
|
|
current_count = len(requests)
|
|
|
|
# Calculate reset timestamp (when oldest request expires, or now + window if empty)
|
|
reset_timestamp = int(requests[0] + window) if requests else int(now + window)
|
|
|
|
if current_count >= max_requests:
|
|
return False, 0, max_requests, reset_timestamp
|
|
|
|
# Record this request
|
|
requests.append(now)
|
|
remaining = max_requests - len(requests)
|
|
|
|
return True, remaining, max_requests, reset_timestamp
|
|
|
|
|
|
def _prune_rate_limit_entries(target_size: int, cutoff: float) -> None:
|
|
"""Prune rate limit entries to target size. Must hold _rate_limit_lock.
|
|
|
|
Removes entries with no recent activity first, then oldest entries.
|
|
"""
|
|
# First pass: remove entries with all expired requests
|
|
to_remove = []
|
|
for ip, requests in _rate_limit_requests.items():
|
|
if not requests or all(t <= cutoff for t in requests):
|
|
to_remove.append(ip)
|
|
|
|
for ip in to_remove:
|
|
del _rate_limit_requests[ip]
|
|
|
|
# Second pass: if still over target, remove entries with oldest last activity
|
|
if len(_rate_limit_requests) > target_size:
|
|
# Sort by most recent request timestamp (ascending = oldest first)
|
|
entries = sorted(
|
|
_rate_limit_requests.items(),
|
|
key=lambda x: max(x[1]) if x[1] else 0,
|
|
)
|
|
# Remove oldest until we're at target size
|
|
remove_count = len(entries) - target_size
|
|
for ip, _ in entries[:remove_count]:
|
|
del _rate_limit_requests[ip]
|
|
|
|
|
|
def cleanup_rate_limits(window: int | None = None) -> int:
|
|
"""Remove expired rate limit entries. Returns count of cleaned entries.
|
|
|
|
Args:
|
|
window: Rate limit window in seconds. If None, uses app config.
|
|
"""
|
|
# This should be called periodically (e.g., via cleanup task)
|
|
if window is None:
|
|
window = current_app.config.get("RATE_LIMIT_WINDOW", 60)
|
|
cutoff = time.time() - window
|
|
|
|
cleaned = 0
|
|
with _rate_limit_lock:
|
|
to_remove = []
|
|
for ip, requests in _rate_limit_requests.items():
|
|
requests[:] = [t for t in requests if t > cutoff]
|
|
if not requests:
|
|
to_remove.append(ip)
|
|
|
|
for ip in to_remove:
|
|
del _rate_limit_requests[ip]
|
|
cleaned += 1
|
|
|
|
return cleaned
|
|
|
|
|
|
def reset_rate_limits() -> None:
|
|
"""Clear all rate limit state. For testing only."""
|
|
with _rate_limit_lock:
|
|
_rate_limit_requests.clear()
|
|
|
|
|
|
def add_rate_limit_headers(
|
|
response: Response, remaining: int, limit: int, reset_timestamp: int
|
|
) -> Response:
|
|
"""Add standard rate limit headers to response.
|
|
|
|
Headers follow draft-ietf-httpapi-ratelimit-headers convention:
|
|
- X-RateLimit-Limit: Maximum requests per window
|
|
- X-RateLimit-Remaining: Requests remaining in current window
|
|
- X-RateLimit-Reset: Unix timestamp when window resets
|
|
"""
|
|
if limit > 0: # Only add headers if rate limiting is enabled
|
|
response.headers["X-RateLimit-Limit"] = str(limit)
|
|
response.headers["X-RateLimit-Remaining"] = str(max(0, remaining))
|
|
response.headers["X-RateLimit-Reset"] = str(reset_timestamp)
|
|
return response
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Response Helpers
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def json_response(data: dict[str, Any], status: int = 200) -> Response:
|
|
"""Create JSON response with proper encoding."""
|
|
return Response(
|
|
json.dumps(data, ensure_ascii=False),
|
|
status=status,
|
|
mimetype="application/json",
|
|
)
|
|
|
|
|
|
def error_response(message: str, status: int, **extra: Any) -> Response:
|
|
"""Create standardized error response."""
|
|
data = {"error": message, **extra}
|
|
return json_response(data, status)
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# URL Helpers
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def url_prefix() -> str:
|
|
"""Get configured URL prefix for reverse proxy deployments."""
|
|
prefix: str = current_app.config.get("URL_PREFIX", "")
|
|
return prefix
|
|
|
|
|
|
def prefixed_url(path: str) -> str:
|
|
"""Generate URL with configured prefix."""
|
|
return f"{url_prefix()}{path}"
|
|
|
|
|
|
def paste_url(paste_id: str) -> str:
|
|
"""Generate URL for paste metadata endpoint."""
|
|
return prefixed_url(f"/{paste_id}")
|
|
|
|
|
|
def paste_raw_url(paste_id: str) -> str:
|
|
"""Generate URL for raw paste content endpoint."""
|
|
return prefixed_url(f"/{paste_id}/raw")
|
|
|
|
|
|
def base_url() -> str:
|
|
"""Detect full base URL from request headers."""
|
|
scheme = (
|
|
request.headers.get("X-Forwarded-Proto")
|
|
or request.headers.get("X-Scheme")
|
|
or request.scheme
|
|
)
|
|
host = request.headers.get("X-Forwarded-Host") or request.headers.get("Host") or request.host
|
|
return f"{scheme}://{host}{url_prefix()}"
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Response Builders
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def build_paste_metadata(
|
|
paste_id: str,
|
|
mime_type: str,
|
|
size: int,
|
|
created_at: int,
|
|
*,
|
|
owner: str | None = None,
|
|
burn_after_read: bool = False,
|
|
expires_at: int | None = None,
|
|
password_protected: bool = False,
|
|
include_owner: bool = False,
|
|
last_accessed: int | None = None,
|
|
) -> dict[str, Any]:
|
|
"""Build standardized paste metadata response dict.
|
|
|
|
Args:
|
|
paste_id: Paste identifier
|
|
mime_type: Content MIME type
|
|
size: Content size in bytes
|
|
created_at: Creation timestamp
|
|
owner: Owner fingerprint (included only if include_owner=True)
|
|
burn_after_read: Whether paste is burn-after-read
|
|
expires_at: Expiration timestamp
|
|
password_protected: Whether paste has password
|
|
include_owner: Whether to include owner in response
|
|
last_accessed: Last access timestamp (optional)
|
|
"""
|
|
data: dict[str, Any] = {
|
|
"id": paste_id,
|
|
"mime_type": mime_type,
|
|
"size": size,
|
|
"created_at": created_at,
|
|
"url": paste_url(paste_id),
|
|
"raw": paste_raw_url(paste_id),
|
|
}
|
|
if last_accessed is not None:
|
|
data["last_accessed"] = last_accessed
|
|
if include_owner and owner:
|
|
data["owner"] = owner
|
|
if burn_after_read:
|
|
data["burn_after_read"] = True
|
|
if expires_at:
|
|
data["expires_at"] = expires_at
|
|
if password_protected:
|
|
data["password_protected"] = True
|
|
return data
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Validation Helpers (used within views)
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def validate_paste_id(paste_id: str) -> Response | None:
|
|
"""Validate paste ID format. Returns error response or None if valid."""
|
|
expected_length = current_app.config["PASTE_ID_LENGTH"]
|
|
if len(paste_id) != expected_length or not PASTE_ID_PATTERN.match(paste_id):
|
|
return error_response("Invalid paste ID", 400)
|
|
return None
|
|
|
|
|
|
def fetch_paste(paste_id: str, check_password: bool = True) -> Response | None:
|
|
"""Fetch paste and store in g.paste. Returns error response or None if OK."""
|
|
db = get_db()
|
|
now = int(time.time())
|
|
|
|
# Update access time
|
|
db.execute("UPDATE pastes SET last_accessed = ? WHERE id = ?", (now, paste_id))
|
|
|
|
row = db.execute(
|
|
"""SELECT id, content, mime_type, owner, created_at,
|
|
length(content) as size, burn_after_read, expires_at, password_hash
|
|
FROM pastes WHERE id = ?""",
|
|
(paste_id,),
|
|
).fetchone()
|
|
|
|
if row is None:
|
|
db.commit()
|
|
return error_response("Paste not found", 404)
|
|
|
|
# Password verification
|
|
if check_password and row["password_hash"]:
|
|
provided = request.headers.get("X-Paste-Password", "")
|
|
if not provided:
|
|
db.commit()
|
|
return error_response("Password required", 401, password_protected=True)
|
|
if not verify_password(provided, row["password_hash"]):
|
|
db.commit()
|
|
return error_response("Invalid password", 403)
|
|
|
|
g.paste = row
|
|
g.db = db
|
|
return None
|
|
|
|
|
|
def require_auth() -> Response | None:
|
|
"""Check authentication for ownership operations.
|
|
|
|
Uses get_client_fingerprint() to allow both trusted and untrusted
|
|
certificate holders to manage their own pastes.
|
|
|
|
Returns error response or None if authenticated.
|
|
"""
|
|
client_id = get_client_fingerprint()
|
|
if not client_id:
|
|
return error_response("Authentication required", 401)
|
|
g.client_id = client_id
|
|
return None
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Authentication & Security
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def is_trusted_proxy() -> bool:
|
|
"""Verify request comes from trusted reverse proxy via shared secret."""
|
|
expected = current_app.config.get("TRUSTED_PROXY_SECRET", "")
|
|
if not expected:
|
|
return True
|
|
provided = request.headers.get("X-Proxy-Secret", "")
|
|
return hmac.compare_digest(expected, provided)
|
|
|
|
|
|
def get_client_fingerprint() -> str | None:
|
|
"""Extract client certificate fingerprint for identity/ownership.
|
|
|
|
Returns fingerprint regardless of trust status. Used for:
|
|
- Paste ownership tracking
|
|
- Delete/update/list operations (user manages their own pastes)
|
|
|
|
Returns None if no valid fingerprint provided or proxy not trusted.
|
|
"""
|
|
if not is_trusted_proxy():
|
|
return None
|
|
|
|
sha1 = request.headers.get("X-SSL-Client-SHA1", "").strip().lower()
|
|
if sha1 and CLIENT_ID_PATTERN.match(sha1):
|
|
return sha1
|
|
return None
|
|
|
|
|
|
def get_client_id() -> str | None:
|
|
"""Get trusted client certificate fingerprint for elevated privileges.
|
|
|
|
Returns fingerprint only if certificate is valid and not revoked.
|
|
Used for:
|
|
- Rate limit benefits (higher limits for trusted users)
|
|
- Size limit benefits (larger pastes for trusted users)
|
|
|
|
Untrusted certificates return None here but still work via
|
|
get_client_fingerprint() for ownership operations.
|
|
"""
|
|
fingerprint = get_client_fingerprint()
|
|
if not fingerprint:
|
|
return None
|
|
|
|
# Check if PKI is enabled and certificate is revoked
|
|
if current_app.config.get("PKI_ENABLED"):
|
|
from app.pki import is_certificate_valid
|
|
|
|
if not is_certificate_valid(fingerprint):
|
|
current_app.logger.warning(
|
|
"Elevated auth rejected (revoked/expired): %s", fingerprint[:12] + "..."
|
|
)
|
|
log_event(
|
|
AuditEvent.AUTH_FAILURE,
|
|
AuditOutcome.BLOCKED,
|
|
client_id=fingerprint,
|
|
client_ip=get_client_ip(),
|
|
details={"reason": "revoked_or_expired"},
|
|
)
|
|
return None
|
|
return fingerprint
|
|
|
|
|
|
def is_admin() -> bool:
|
|
"""Check if current authenticated user is an admin.
|
|
|
|
Returns True only if:
|
|
- User has a valid client certificate
|
|
- Certificate is marked as admin in the PKI database
|
|
"""
|
|
client_id = get_client_id()
|
|
if not client_id:
|
|
return False
|
|
|
|
if not current_app.config.get("PKI_ENABLED"):
|
|
return False
|
|
|
|
from app.pki import is_admin_certificate
|
|
|
|
return is_admin_certificate(client_id)
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Proof-of-Work
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def get_pow_secret() -> bytes:
|
|
"""Get or generate PoW signing secret."""
|
|
global _pow_secret_cache
|
|
configured: str = current_app.config.get("POW_SECRET", "")
|
|
if configured:
|
|
return configured.encode()
|
|
if _pow_secret_cache is None:
|
|
_pow_secret_cache = secrets.token_bytes(32)
|
|
return _pow_secret_cache
|
|
|
|
|
|
def generate_challenge(difficulty_override: int | None = None) -> dict[str, Any]:
|
|
"""Generate new PoW challenge with signed token.
|
|
|
|
Uses dynamic difficulty which may be elevated during high load,
|
|
unless difficulty_override is specified.
|
|
|
|
Args:
|
|
difficulty_override: Optional fixed difficulty (for registration)
|
|
"""
|
|
if difficulty_override is not None:
|
|
difficulty = difficulty_override
|
|
else:
|
|
difficulty = get_dynamic_difficulty()
|
|
ttl = current_app.config["POW_CHALLENGE_TTL"]
|
|
expires = int(time.time()) + ttl
|
|
nonce = secrets.token_hex(16)
|
|
|
|
msg = f"{nonce}:{expires}:{difficulty}".encode()
|
|
sig = hmac.new(get_pow_secret(), msg, hashlib.sha256).hexdigest()
|
|
|
|
return {
|
|
"nonce": nonce,
|
|
"difficulty": difficulty,
|
|
"expires": expires,
|
|
"token": f"{nonce}:{expires}:{difficulty}:{sig}",
|
|
}
|
|
|
|
|
|
def verify_pow(token: str, solution: str, min_difficulty: int | None = None) -> tuple[bool, str]:
|
|
"""Verify proof-of-work solution. Returns (valid, error_message).
|
|
|
|
Accepts tokens with difficulty >= min_difficulty. The solution must meet the
|
|
token's embedded difficulty (which may be elevated due to anti-flood).
|
|
|
|
Args:
|
|
token: PoW challenge token
|
|
solution: Nonce solution
|
|
min_difficulty: Minimum required difficulty (defaults to POW_DIFFICULTY)
|
|
"""
|
|
base_difficulty = current_app.config["POW_DIFFICULTY"]
|
|
if base_difficulty == 0 and min_difficulty is None:
|
|
return True, ""
|
|
|
|
required_difficulty = min_difficulty if min_difficulty is not None else base_difficulty
|
|
if required_difficulty == 0:
|
|
return True, ""
|
|
|
|
# Parse token
|
|
try:
|
|
parts = token.split(":")
|
|
if len(parts) != 4:
|
|
return False, "Invalid challenge format"
|
|
nonce, expires_str, diff_str, sig = parts
|
|
expires = int(expires_str)
|
|
token_diff = int(diff_str)
|
|
except (ValueError, TypeError):
|
|
return False, "Invalid challenge format"
|
|
|
|
# Verify signature
|
|
msg = f"{nonce}:{expires}:{token_diff}".encode()
|
|
expected_sig = hmac.new(get_pow_secret(), msg, hashlib.sha256).hexdigest()
|
|
if not hmac.compare_digest(sig, expected_sig):
|
|
return False, "Invalid challenge signature"
|
|
|
|
# Check expiry
|
|
if int(time.time()) > expires:
|
|
return False, "Challenge expired"
|
|
|
|
# Token difficulty must be at least the required difficulty
|
|
if token_diff < required_difficulty:
|
|
return False, f"Difficulty too low: {token_diff} < {required_difficulty}"
|
|
|
|
# Verify solution
|
|
try:
|
|
solution_int = int(solution)
|
|
if solution_int < 0:
|
|
return False, "Invalid solution"
|
|
except (ValueError, TypeError):
|
|
return False, "Invalid solution"
|
|
|
|
# Check hash meets the token's difficulty (not current dynamic difficulty)
|
|
work = f"{nonce}:{solution}".encode()
|
|
hash_bytes = hashlib.sha256(work).digest()
|
|
|
|
zero_bits = 0
|
|
for byte in hash_bytes:
|
|
if byte == 0:
|
|
zero_bits += 8
|
|
else:
|
|
zero_bits += 8 - byte.bit_length()
|
|
break
|
|
|
|
if zero_bits < token_diff:
|
|
return False, f"Insufficient work: {zero_bits} < {token_diff} bits"
|
|
|
|
return True, ""
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Content Processing
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def calculate_entropy(data: bytes) -> float:
|
|
"""Calculate Shannon entropy in bits per byte (0-8 range)."""
|
|
if not data:
|
|
return 0.0
|
|
|
|
freq = [0] * 256
|
|
for byte in data:
|
|
freq[byte] += 1
|
|
|
|
length = len(data)
|
|
entropy = 0.0
|
|
for count in freq:
|
|
if count > 0:
|
|
p = count / length
|
|
entropy -= p * math.log2(p)
|
|
|
|
return entropy
|
|
|
|
|
|
def detect_mime_type(content: bytes, content_type: str | None = None) -> str:
|
|
"""Detect MIME type using magic bytes, headers, or content analysis."""
|
|
# Magic byte detection (highest priority)
|
|
for magic, mime in MAGIC_SIGNATURES.items():
|
|
if content.startswith(magic):
|
|
# RIFF container: verify WEBP subtype
|
|
if magic == b"RIFF" and len(content) >= 12 and content[8:12] != b"WEBP":
|
|
continue
|
|
return mime
|
|
|
|
# Explicit Content-Type (if specific)
|
|
if content_type:
|
|
mime = content_type.split(";")[0].strip().lower()
|
|
if mime not in GENERIC_MIME_TYPES and MIME_PATTERN.match(mime):
|
|
return mime
|
|
|
|
# UTF-8 text detection
|
|
try:
|
|
content.decode("utf-8")
|
|
return "text/plain"
|
|
except UnicodeDecodeError:
|
|
return "application/octet-stream"
|
|
|
|
|
|
def is_recognizable_format(content: bytes) -> tuple[bool, str | None]:
|
|
"""Check if content is a recognizable (likely unencrypted) format.
|
|
|
|
Returns (is_recognizable, detected_format).
|
|
Used to enforce encryption by rejecting known formats.
|
|
"""
|
|
# Check magic bytes
|
|
for magic, mime in MAGIC_SIGNATURES.items():
|
|
if content.startswith(magic):
|
|
if magic == b"RIFF" and len(content) >= 12 and content[8:12] != b"WEBP":
|
|
continue
|
|
return True, mime
|
|
|
|
# Check if valid UTF-8 text (plaintext)
|
|
try:
|
|
content.decode("utf-8")
|
|
return True, "text/plain"
|
|
except UnicodeDecodeError:
|
|
pass
|
|
|
|
return False, None
|
|
|
|
|
|
def generate_paste_id(content: bytes) -> str:
|
|
"""Generate unique paste ID from content hash and timestamp."""
|
|
data = content + str(time.time_ns()).encode()
|
|
length = current_app.config["PASTE_ID_LENGTH"]
|
|
return hashlib.sha256(data).hexdigest()[:length]
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Class-Based Views
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
class IndexView(MethodView):
|
|
"""Handle API info and paste creation."""
|
|
|
|
def get(self) -> Response:
|
|
"""Return API information and usage examples."""
|
|
prefix = url_prefix() or "/"
|
|
url = base_url()
|
|
difficulty = get_dynamic_difficulty()
|
|
pki_enabled = current_app.config.get("PKI_ENABLED", False)
|
|
|
|
# Build endpoints dict
|
|
endpoints: dict[str, str] = {
|
|
f"GET {prefixed_url('/')}": "API information",
|
|
f"GET {prefixed_url('/health')}": "Health check",
|
|
f"GET {prefixed_url('/client')}": "Download CLI client (fpaste)",
|
|
f"GET {prefixed_url('/challenge')}": "Get proof-of-work challenge",
|
|
f"POST {prefixed_url('/')}": "Create paste (PoW required)",
|
|
f"GET {prefixed_url('/pastes')}": "List your pastes (cert required)",
|
|
f"GET {prefixed_url('/<id>')}": "Get paste metadata",
|
|
f"GET {prefixed_url('/<id>/raw')}": "Get raw paste content",
|
|
f"PUT {prefixed_url('/<id>')}": "Update paste (owner only)",
|
|
f"DELETE {prefixed_url('/<id>')}": "Delete paste (owner only)",
|
|
f"GET {prefixed_url('/register/challenge')}": "Get registration challenge",
|
|
f"POST {prefixed_url('/register')}": "Register for client certificate",
|
|
}
|
|
|
|
if pki_enabled:
|
|
endpoints.update(
|
|
{
|
|
f"GET {prefixed_url('/pki')}": "PKI status",
|
|
f"GET {prefixed_url('/pki/ca.crt')}": "Download CA certificate",
|
|
f"POST {prefixed_url('/pki/issue')}": "Issue client certificate",
|
|
f"GET {prefixed_url('/pki/certs')}": "List certificates",
|
|
f"POST {prefixed_url('/pki/revoke/<serial>')}": "Revoke certificate",
|
|
}
|
|
)
|
|
|
|
# Build response
|
|
response_data: dict[str, Any] = {
|
|
"name": "FlaskPaste",
|
|
"version": VERSION,
|
|
"prefix": prefix,
|
|
"endpoints": endpoints,
|
|
"authentication": {
|
|
"anonymous": "Create pastes only (strict limits)",
|
|
"client_cert": "Create + manage own pastes (strict limits)",
|
|
"trusted_cert": "All operations (relaxed limits)",
|
|
},
|
|
"limits": {
|
|
"anonymous": {
|
|
"max_size": current_app.config["MAX_PASTE_SIZE_ANON"],
|
|
"rate": f"{current_app.config['RATE_LIMIT_MAX']}/min",
|
|
},
|
|
"trusted": {
|
|
"max_size": current_app.config["MAX_PASTE_SIZE_AUTH"],
|
|
"rate": f"{current_app.config['RATE_LIMIT_MAX'] * current_app.config.get('RATE_LIMIT_AUTH_MULTIPLIER', 5)}/min", # noqa: E501
|
|
},
|
|
},
|
|
"pow": {
|
|
"enabled": difficulty > 0,
|
|
"difficulty": difficulty,
|
|
"hint": "GET /challenge, solve, submit with X-PoW-Token + X-PoW-Solution",
|
|
},
|
|
"cli": {
|
|
"install": f"curl -o fpaste {url}/client && chmod +x fpaste",
|
|
"usage": "fpaste file.txt # encrypts by default",
|
|
},
|
|
"usage": {
|
|
"create": f"curl -X POST --data-binary @file.txt {url}/ (with PoW headers)",
|
|
"get": f"curl {url}/<id>/raw",
|
|
"delete": f"curl -X DELETE {url}/<id> (with X-SSL-Client-SHA1)",
|
|
},
|
|
}
|
|
|
|
return json_response(response_data)
|
|
|
|
def post(self) -> Response:
|
|
"""Create a new paste."""
|
|
# Parse content
|
|
content: bytes | None = None
|
|
mime_type: str | None = None
|
|
|
|
if request.is_json:
|
|
data = request.get_json(silent=True)
|
|
if data and isinstance(data.get("content"), str):
|
|
content = data["content"].encode("utf-8")
|
|
mime_type = "text/plain"
|
|
else:
|
|
content = request.get_data(as_text=False)
|
|
if content:
|
|
mime_type = detect_mime_type(content, request.content_type)
|
|
|
|
if not content:
|
|
return error_response("No content provided", 400)
|
|
|
|
# Separate trusted (for limits) from fingerprint (for ownership)
|
|
trusted_client = get_client_id() # Only trusted certs get elevated limits
|
|
owner = get_client_fingerprint() # Any cert can own pastes
|
|
|
|
# Rate limiting (check before expensive operations)
|
|
client_ip = get_client_ip()
|
|
allowed, remaining, limit, reset_timestamp = check_rate_limit(
|
|
client_ip, authenticated=bool(trusted_client)
|
|
)
|
|
|
|
# Store rate limit info for response headers
|
|
g.rate_limit_remaining = remaining
|
|
g.rate_limit_limit = limit
|
|
g.rate_limit_reset = reset_timestamp
|
|
|
|
if not allowed:
|
|
current_app.logger.warning(
|
|
"Rate limit exceeded: ip=%s trusted=%s", client_ip, bool(trusted_client)
|
|
)
|
|
# Audit log rate limit event
|
|
if current_app.config.get("AUDIT_ENABLED", True):
|
|
from app.audit import AuditEvent, AuditOutcome, log_event
|
|
|
|
log_event(
|
|
AuditEvent.RATE_LIMIT,
|
|
AuditOutcome.BLOCKED,
|
|
client_id=owner,
|
|
client_ip=client_ip,
|
|
)
|
|
record_rate_limit("blocked")
|
|
retry_after = max(1, reset_timestamp - int(time.time()))
|
|
response = error_response(
|
|
"Rate limit exceeded",
|
|
429,
|
|
retry_after=retry_after,
|
|
)
|
|
response.headers["Retry-After"] = str(retry_after)
|
|
add_rate_limit_headers(response, 0, limit, reset_timestamp)
|
|
return response
|
|
|
|
# Proof-of-work verification
|
|
difficulty = current_app.config["POW_DIFFICULTY"]
|
|
if difficulty > 0:
|
|
token = request.headers.get("X-PoW-Token", "")
|
|
solution = request.headers.get("X-PoW-Solution", "")
|
|
|
|
if not token or not solution:
|
|
return error_response(
|
|
"Proof-of-work required", 400, hint="GET /challenge for a new challenge"
|
|
)
|
|
|
|
valid, err = verify_pow(token, solution)
|
|
if not valid:
|
|
current_app.logger.warning(
|
|
"PoW verification failed: %s from=%s", err, request.remote_addr
|
|
)
|
|
# Audit log PoW failure
|
|
if current_app.config.get("AUDIT_ENABLED", True):
|
|
from app.audit import AuditEvent, AuditOutcome, log_event
|
|
|
|
log_event(
|
|
AuditEvent.POW_FAILURE,
|
|
AuditOutcome.FAILURE,
|
|
client_id=owner,
|
|
client_ip=client_ip,
|
|
details={"error": err},
|
|
)
|
|
record_pow("failure")
|
|
return error_response(f"Proof-of-work failed: {err}", 400)
|
|
|
|
record_pow("success")
|
|
|
|
# Size limits (only trusted clients get elevated limits)
|
|
content_size = len(content)
|
|
max_size = (
|
|
current_app.config["MAX_PASTE_SIZE_AUTH"]
|
|
if trusted_client
|
|
else current_app.config["MAX_PASTE_SIZE_ANON"]
|
|
)
|
|
|
|
if content_size > max_size:
|
|
return error_response(
|
|
"Paste too large",
|
|
413,
|
|
size=content_size,
|
|
max_size=max_size,
|
|
trusted=trusted_client is not None,
|
|
)
|
|
|
|
# Minimum size check (enforces encryption overhead)
|
|
min_size = current_app.config.get("MIN_PASTE_SIZE", 0)
|
|
if min_size > 0 and content_size < min_size:
|
|
return error_response(
|
|
"Paste too small",
|
|
400,
|
|
size=content_size,
|
|
min_size=min_size,
|
|
hint="Encrypt content before uploading (fpaste encrypts by default)",
|
|
)
|
|
|
|
# Entropy check
|
|
min_entropy = current_app.config.get("MIN_ENTROPY", 0)
|
|
min_entropy_size = current_app.config.get("MIN_ENTROPY_SIZE", 256)
|
|
if min_entropy > 0 and content_size >= min_entropy_size:
|
|
entropy = calculate_entropy(content)
|
|
if entropy < min_entropy:
|
|
current_app.logger.warning(
|
|
"Low entropy rejected: %.2f < %.2f from=%s",
|
|
entropy,
|
|
min_entropy,
|
|
request.remote_addr,
|
|
)
|
|
return error_response(
|
|
"Content entropy too low",
|
|
400,
|
|
entropy=round(entropy, 2),
|
|
min_entropy=min_entropy,
|
|
hint="Encrypt content before uploading (fpaste encrypts by default)",
|
|
)
|
|
|
|
# Binary content requirement (reject recognizable formats)
|
|
if current_app.config.get("REQUIRE_BINARY", False):
|
|
is_recognized, detected_format = is_recognizable_format(content)
|
|
if is_recognized:
|
|
current_app.logger.warning(
|
|
"Recognizable format rejected: %s from=%s",
|
|
detected_format,
|
|
request.remote_addr,
|
|
)
|
|
return error_response(
|
|
"Recognizable format not allowed",
|
|
400,
|
|
detected=detected_format,
|
|
hint="Encrypt content before uploading (fpaste encrypts by default)",
|
|
)
|
|
|
|
# Deduplication check
|
|
content_hash = hashlib.sha256(content).hexdigest()
|
|
is_allowed, dedup_count = check_content_hash(content_hash)
|
|
|
|
if not is_allowed:
|
|
window = current_app.config["CONTENT_DEDUP_WINDOW"]
|
|
current_app.logger.warning(
|
|
"Dedup threshold exceeded: hash=%s count=%d from=%s",
|
|
content_hash[:16],
|
|
dedup_count,
|
|
request.remote_addr,
|
|
)
|
|
# Audit log dedup block
|
|
if current_app.config.get("AUDIT_ENABLED", True):
|
|
from app.audit import AuditEvent, AuditOutcome, log_event
|
|
|
|
log_event(
|
|
AuditEvent.DEDUP_BLOCK,
|
|
AuditOutcome.BLOCKED,
|
|
client_id=owner,
|
|
client_ip=client_ip,
|
|
details={"hash": content_hash[:16], "count": dedup_count},
|
|
)
|
|
record_dedup("blocked")
|
|
return error_response(
|
|
"Duplicate content rate limit exceeded",
|
|
429,
|
|
count=dedup_count,
|
|
window_seconds=window,
|
|
)
|
|
|
|
record_dedup("allowed")
|
|
|
|
# Parse optional headers
|
|
burn_header = request.headers.get("X-Burn-After-Read", "").strip().lower()
|
|
burn_after_read = burn_header in ("true", "1", "yes")
|
|
|
|
# Determine default expiry based on authentication level
|
|
# Anonymous < Untrusted cert < Trusted cert (registered in PKI)
|
|
if owner is None:
|
|
# Anonymous user
|
|
default_expiry = current_app.config.get("EXPIRY_ANON", 86400)
|
|
elif trusted_client:
|
|
# Trusted certificate (registered in PKI)
|
|
from app.pki import is_trusted_certificate
|
|
|
|
if is_trusted_certificate(owner):
|
|
default_expiry = current_app.config.get("EXPIRY_TRUSTED", 2592000)
|
|
else:
|
|
default_expiry = current_app.config.get("EXPIRY_UNTRUSTED", 604800)
|
|
else:
|
|
# Has cert but not trusted
|
|
default_expiry = current_app.config.get("EXPIRY_UNTRUSTED", 604800)
|
|
|
|
expires_at = None
|
|
expiry_header = request.headers.get("X-Expiry", "").strip()
|
|
if expiry_header:
|
|
try:
|
|
expiry_seconds = int(expiry_header)
|
|
if expiry_seconds > 0:
|
|
max_expiry = current_app.config.get("MAX_EXPIRY_SECONDS", 0)
|
|
if max_expiry > 0:
|
|
expiry_seconds = min(expiry_seconds, max_expiry)
|
|
expires_at = int(time.time()) + expiry_seconds
|
|
except ValueError:
|
|
pass
|
|
|
|
# Apply default expiry if none specified (0 = no expiry for trusted)
|
|
if expires_at is None and default_expiry > 0:
|
|
expires_at = int(time.time()) + default_expiry
|
|
|
|
password_hash = None
|
|
password_header = request.headers.get("X-Paste-Password", "")
|
|
if password_header:
|
|
if len(password_header) > 1024:
|
|
return error_response("Password too long (max 1024 chars)", 400)
|
|
password_hash = hash_password(password_header)
|
|
|
|
# Insert paste
|
|
paste_id = generate_paste_id(content)
|
|
now = int(time.time())
|
|
|
|
db = get_db()
|
|
db.execute(
|
|
"""INSERT INTO pastes
|
|
(id, content, mime_type, owner, created_at, last_accessed,
|
|
burn_after_read, expires_at, password_hash)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
|
|
(
|
|
paste_id,
|
|
content,
|
|
mime_type,
|
|
owner,
|
|
now,
|
|
now,
|
|
1 if burn_after_read else 0,
|
|
expires_at,
|
|
password_hash,
|
|
),
|
|
)
|
|
db.commit()
|
|
|
|
# Build response (creation response is intentionally different - no size)
|
|
response_data: dict[str, Any] = {
|
|
"id": paste_id,
|
|
"url": paste_url(paste_id),
|
|
"raw": paste_raw_url(paste_id),
|
|
"mime_type": mime_type,
|
|
"created_at": now,
|
|
}
|
|
if owner:
|
|
response_data["owner"] = owner
|
|
if burn_after_read:
|
|
response_data["burn_after_read"] = True
|
|
if expires_at:
|
|
response_data["expires_at"] = expires_at
|
|
if password_hash:
|
|
response_data["password_protected"] = True
|
|
|
|
# Record successful paste for anti-flood tracking
|
|
record_antiflood_request()
|
|
|
|
# Audit log paste creation
|
|
if current_app.config.get("AUDIT_ENABLED", True):
|
|
from app.audit import AuditEvent, AuditOutcome, log_event
|
|
|
|
log_event(
|
|
AuditEvent.PASTE_CREATE,
|
|
AuditOutcome.SUCCESS,
|
|
paste_id=paste_id,
|
|
client_id=owner,
|
|
client_ip=client_ip,
|
|
details={"size": content_size, "mime_type": mime_type},
|
|
)
|
|
|
|
record_paste_created("authenticated" if owner else "anonymous", "success")
|
|
|
|
response = json_response(response_data, 201)
|
|
|
|
# Add rate limit headers to successful response
|
|
rl_remaining = getattr(g, "rate_limit_remaining", -1)
|
|
rl_limit = getattr(g, "rate_limit_limit", -1)
|
|
rl_reset = getattr(g, "rate_limit_reset", 0)
|
|
if rl_limit > 0:
|
|
add_rate_limit_headers(response, rl_remaining, rl_limit, rl_reset)
|
|
|
|
return response
|
|
|
|
|
|
class HealthView(MethodView):
|
|
"""Health check endpoint."""
|
|
|
|
def get(self) -> Response:
|
|
"""Return health status with database check."""
|
|
try:
|
|
db = get_db()
|
|
db.execute("SELECT 1")
|
|
return json_response({"status": "healthy", "database": "ok"})
|
|
except Exception:
|
|
return json_response({"status": "unhealthy", "database": "error"}, 503)
|
|
|
|
|
|
class ChallengeView(MethodView):
|
|
"""Proof-of-work challenge endpoint."""
|
|
|
|
def get(self) -> Response:
|
|
"""Generate and return PoW challenge."""
|
|
base_difficulty = current_app.config["POW_DIFFICULTY"]
|
|
if base_difficulty == 0:
|
|
return json_response({"enabled": False, "difficulty": 0})
|
|
|
|
ch = generate_challenge()
|
|
response = {
|
|
"enabled": True,
|
|
"nonce": ch["nonce"],
|
|
"difficulty": ch["difficulty"],
|
|
"expires": ch["expires"],
|
|
"token": ch["token"],
|
|
}
|
|
# Indicate if difficulty is elevated due to anti-flood
|
|
if ch["difficulty"] > base_difficulty:
|
|
response["elevated"] = True
|
|
response["base_difficulty"] = base_difficulty
|
|
return json_response(response)
|
|
|
|
|
|
class RegisterChallengeView(MethodView):
|
|
"""Registration PoW challenge endpoint (higher difficulty)."""
|
|
|
|
def get(self) -> Response:
|
|
"""Generate PoW challenge for registration (higher difficulty)."""
|
|
register_difficulty = current_app.config.get("REGISTER_POW_DIFFICULTY", 24)
|
|
if register_difficulty == 0:
|
|
return json_response({"enabled": False, "difficulty": 0})
|
|
|
|
ch = generate_challenge(difficulty_override=register_difficulty)
|
|
return json_response(
|
|
{
|
|
"enabled": True,
|
|
"nonce": ch["nonce"],
|
|
"difficulty": ch["difficulty"],
|
|
"expires": ch["expires"],
|
|
"token": ch["token"],
|
|
"purpose": "registration",
|
|
}
|
|
)
|
|
|
|
|
|
class RegisterView(MethodView):
|
|
"""Public client certificate registration endpoint."""
|
|
|
|
def post(self) -> Response:
|
|
"""Register and obtain a client certificate.
|
|
|
|
Requires PoW to prevent abuse. Returns PKCS#12 bundle with:
|
|
- Client certificate
|
|
- Client private key
|
|
- CA certificate
|
|
|
|
Auto-generates CA if not present and PKI_CA_PASSWORD is configured.
|
|
"""
|
|
from cryptography import x509
|
|
from cryptography.hazmat.primitives import serialization
|
|
|
|
from app.pki import (
|
|
CANotFoundError,
|
|
PKIError,
|
|
create_pkcs12,
|
|
generate_ca,
|
|
get_ca_info,
|
|
issue_certificate,
|
|
)
|
|
|
|
# Check PKI configuration
|
|
password = current_app.config.get("PKI_CA_PASSWORD", "")
|
|
if not password:
|
|
return error_response(
|
|
"Registration not available",
|
|
503,
|
|
hint="PKI_CA_PASSWORD not configured",
|
|
)
|
|
|
|
# Verify PoW
|
|
register_difficulty = current_app.config.get("REGISTER_POW_DIFFICULTY", 24)
|
|
if register_difficulty > 0:
|
|
token = request.headers.get("X-PoW-Token", "")
|
|
solution = request.headers.get("X-PoW-Solution", "")
|
|
|
|
if not token or not solution:
|
|
return error_response(
|
|
"Proof-of-work required",
|
|
400,
|
|
hint="GET /register/challenge for a registration challenge",
|
|
difficulty=register_difficulty,
|
|
)
|
|
|
|
valid, err = verify_pow(token, solution, min_difficulty=register_difficulty)
|
|
if not valid:
|
|
current_app.logger.warning(
|
|
"Registration PoW failed: %s from=%s", err, request.remote_addr
|
|
)
|
|
return error_response(f"Proof-of-work failed: {err}", 400)
|
|
|
|
# Parse common_name from request
|
|
common_name = None
|
|
if request.is_json:
|
|
data = request.get_json(silent=True)
|
|
if data and isinstance(data.get("common_name"), str):
|
|
common_name = data["common_name"][:64].strip()
|
|
|
|
if not common_name:
|
|
# Generate random common name if not provided
|
|
common_name = f"client-{secrets.token_hex(4)}"
|
|
|
|
# Auto-generate CA if needed (skip PKI_ENABLED check for registration)
|
|
ca_info = get_ca_info(skip_enabled_check=True)
|
|
if ca_info is None:
|
|
ca_days = current_app.config.get("PKI_CA_DAYS", 3650)
|
|
try:
|
|
ca_info = generate_ca("FlaskPaste CA", password, days=ca_days)
|
|
current_app.logger.info(
|
|
"CA auto-generated for registration: fingerprint=%s",
|
|
ca_info["fingerprint_sha1"][:12],
|
|
)
|
|
except PKIError as e:
|
|
current_app.logger.error("CA auto-generation failed: %s", e)
|
|
return error_response("CA generation failed", 500)
|
|
|
|
# Issue certificate
|
|
try:
|
|
cert_days = current_app.config.get("PKI_CERT_DAYS", 365)
|
|
cert_info = issue_certificate(common_name, password, days=cert_days)
|
|
except CANotFoundError:
|
|
return error_response("CA not available", 500)
|
|
except PKIError as e:
|
|
current_app.logger.error("Certificate issuance failed: %s", e)
|
|
return error_response("Certificate issuance failed", 500)
|
|
|
|
# Load CA cert for PKCS#12 (reuse ca_info from above, or refresh if it was just generated)
|
|
if ca_info is None or "certificate_pem" not in ca_info:
|
|
ca_info = get_ca_info(skip_enabled_check=True)
|
|
assert ca_info is not None # CA was just generated or exists
|
|
ca_cert = x509.load_pem_x509_certificate(ca_info["certificate_pem"].encode())
|
|
client_cert = x509.load_pem_x509_certificate(cert_info["certificate_pem"].encode())
|
|
client_key = serialization.load_pem_private_key(
|
|
cert_info["private_key_pem"].encode(), password=None
|
|
)
|
|
|
|
# Create PKCS#12 bundle (no password for easy import)
|
|
p12_data = create_pkcs12(
|
|
private_key=client_key,
|
|
certificate=client_cert,
|
|
ca_certificate=ca_cert,
|
|
friendly_name=common_name,
|
|
password=None,
|
|
)
|
|
|
|
current_app.logger.info(
|
|
"Client registered: cn=%s fingerprint=%s from=%s",
|
|
common_name,
|
|
cert_info["fingerprint_sha1"][:12],
|
|
request.remote_addr,
|
|
)
|
|
log_event(
|
|
AuditEvent.CERT_ISSUED,
|
|
AuditOutcome.SUCCESS,
|
|
client_id=cert_info["fingerprint_sha1"],
|
|
client_ip=request.remote_addr,
|
|
details={
|
|
"type": "registration",
|
|
"common_name": common_name,
|
|
"expires_at": cert_info["expires_at"],
|
|
},
|
|
)
|
|
|
|
# Return PKCS#12 as binary download
|
|
response = Response(p12_data, mimetype="application/x-pkcs12")
|
|
response.headers["Content-Disposition"] = f'attachment; filename="{common_name}.p12"'
|
|
response.headers["X-Fingerprint-SHA1"] = cert_info["fingerprint_sha1"]
|
|
response.headers["X-Certificate-Expires"] = str(cert_info["expires_at"])
|
|
response.headers["X-Is-Admin"] = "1" if cert_info.get("is_admin") else "0"
|
|
return response
|
|
|
|
|
|
class ClientView(MethodView):
|
|
"""CLI client download endpoint."""
|
|
|
|
def get(self) -> Response:
|
|
"""Serve fpaste CLI with server URL pre-configured."""
|
|
import os
|
|
|
|
server_url = base_url()
|
|
client_path = os.path.join(current_app.root_path, "..", "fpaste")
|
|
|
|
try:
|
|
with open(client_path) as f:
|
|
content = f.read()
|
|
|
|
# Replace default server URL
|
|
content = content.replace(
|
|
'"server": os.environ.get("FLASKPASTE_SERVER", "http://localhost:5000")',
|
|
f'"server": os.environ.get("FLASKPASTE_SERVER", "{server_url}")',
|
|
)
|
|
content = content.replace(
|
|
"http://localhost:5000)",
|
|
f"{server_url})",
|
|
)
|
|
|
|
response = Response(content, mimetype="text/x-python")
|
|
response.headers["Content-Disposition"] = "attachment; filename=fpaste"
|
|
return response
|
|
except FileNotFoundError:
|
|
return error_response("Client not available", 404)
|
|
|
|
|
|
class PasteView(MethodView):
|
|
"""Paste metadata operations."""
|
|
|
|
def get(self, paste_id: str) -> Response:
|
|
"""Retrieve paste metadata."""
|
|
# Validate and fetch
|
|
if err := validate_paste_id(paste_id):
|
|
return err
|
|
if err := fetch_paste(paste_id):
|
|
return err
|
|
|
|
row: Row = g.paste
|
|
g.db.commit()
|
|
|
|
return json_response(
|
|
build_paste_metadata(
|
|
paste_id=row["id"],
|
|
mime_type=row["mime_type"],
|
|
size=row["size"],
|
|
created_at=row["created_at"],
|
|
burn_after_read=bool(row["burn_after_read"]),
|
|
expires_at=row["expires_at"],
|
|
password_protected=bool(row["password_hash"]),
|
|
)
|
|
)
|
|
|
|
def head(self, paste_id: str) -> Response:
|
|
"""Return paste metadata headers only."""
|
|
return self.get(paste_id)
|
|
|
|
def put(self, paste_id: str) -> Response:
|
|
"""Update paste content and/or metadata.
|
|
|
|
Requires authentication and ownership.
|
|
|
|
Content update: Send raw body with Content-Type header
|
|
Metadata update: Use headers with empty body
|
|
|
|
Headers:
|
|
- X-Paste-Password: Set/change password
|
|
- X-Remove-Password: true to remove password
|
|
- X-Extend-Expiry: Seconds to add to current expiry
|
|
"""
|
|
# Validate paste ID format
|
|
if err := validate_paste_id(paste_id):
|
|
return err
|
|
if err := require_auth():
|
|
return err
|
|
|
|
db = get_db()
|
|
|
|
# Fetch current paste
|
|
row = db.execute(
|
|
"""SELECT id, owner, content, mime_type, expires_at, password_hash
|
|
FROM pastes WHERE id = ?""",
|
|
(paste_id,),
|
|
).fetchone()
|
|
|
|
if row is None:
|
|
return error_response("Paste not found", 404)
|
|
|
|
if row["owner"] != g.client_id:
|
|
return error_response("Permission denied", 403)
|
|
|
|
# Check for burn-after-read (cannot update)
|
|
burn_check = db.execute(
|
|
"SELECT burn_after_read FROM pastes WHERE id = ?", (paste_id,)
|
|
).fetchone()
|
|
if burn_check and burn_check["burn_after_read"]:
|
|
return error_response("Cannot update burn-after-read paste", 400)
|
|
|
|
# Parse update parameters
|
|
new_password = request.headers.get("X-Paste-Password", "").strip() or None
|
|
remove_password = request.headers.get("X-Remove-Password", "").lower() in (
|
|
"true",
|
|
"1",
|
|
"yes",
|
|
)
|
|
extend_expiry_str = request.headers.get("X-Extend-Expiry", "").strip()
|
|
|
|
# Prepare update fields
|
|
update_fields = []
|
|
update_params: list[Any] = []
|
|
|
|
# Content update (if body provided)
|
|
content = request.get_data()
|
|
if content:
|
|
mime_type = request.content_type or "application/octet-stream"
|
|
# Sanitize MIME type
|
|
if not MIME_PATTERN.match(mime_type.split(";")[0].strip()):
|
|
mime_type = "application/octet-stream"
|
|
|
|
update_fields.append("content = ?")
|
|
update_params.append(content)
|
|
update_fields.append("mime_type = ?")
|
|
update_params.append(mime_type.split(";")[0].strip())
|
|
|
|
# Password update
|
|
if remove_password:
|
|
update_fields.append("password_hash = NULL")
|
|
elif new_password:
|
|
update_fields.append("password_hash = ?")
|
|
update_params.append(hash_password(new_password))
|
|
|
|
# Expiry extension
|
|
if extend_expiry_str:
|
|
try:
|
|
extend_seconds = int(extend_expiry_str)
|
|
if extend_seconds > 0:
|
|
current_expiry = row["expires_at"]
|
|
if current_expiry:
|
|
new_expiry = current_expiry + extend_seconds
|
|
else:
|
|
# If no expiry set, create one from now
|
|
new_expiry = int(time.time()) + extend_seconds
|
|
update_fields.append("expires_at = ?")
|
|
update_params.append(new_expiry)
|
|
except ValueError:
|
|
return error_response("Invalid X-Extend-Expiry value", 400)
|
|
|
|
if not update_fields:
|
|
return error_response("No updates provided", 400)
|
|
|
|
# Execute update (fields are hardcoded strings, safe from injection)
|
|
update_sql = f"UPDATE pastes SET {', '.join(update_fields)} WHERE id = ?" # noqa: S608 # nosec B608
|
|
update_params.append(paste_id)
|
|
db.execute(update_sql, update_params)
|
|
db.commit()
|
|
|
|
# Audit log paste update
|
|
if current_app.config.get("AUDIT_ENABLED", True):
|
|
from app.audit import AuditEvent, AuditOutcome, log_event
|
|
|
|
log_event(
|
|
AuditEvent.PASTE_UPDATE,
|
|
AuditOutcome.SUCCESS,
|
|
paste_id=paste_id,
|
|
client_id=g.client_id,
|
|
client_ip=get_client_ip(),
|
|
details={"fields": [f.split(" = ")[0] for f in update_fields]},
|
|
)
|
|
|
|
# Fetch updated paste for response
|
|
updated = db.execute(
|
|
"""SELECT id, mime_type, length(content) as size, expires_at,
|
|
CASE WHEN password_hash IS NOT NULL THEN 1 ELSE 0 END as password_protected
|
|
FROM pastes WHERE id = ?""",
|
|
(paste_id,),
|
|
).fetchone()
|
|
|
|
response_data: dict[str, Any] = {
|
|
"id": updated["id"],
|
|
"size": updated["size"],
|
|
"mime_type": updated["mime_type"],
|
|
}
|
|
if updated["expires_at"]:
|
|
response_data["expires_at"] = updated["expires_at"]
|
|
if updated["password_protected"]:
|
|
response_data["password_protected"] = True
|
|
|
|
return json_response(response_data)
|
|
|
|
|
|
class PasteRawView(MethodView):
|
|
"""Raw paste content retrieval."""
|
|
|
|
def get(self, paste_id: str) -> Response:
|
|
"""Retrieve raw paste content."""
|
|
# Validate and fetch
|
|
if err := validate_paste_id(paste_id):
|
|
return err
|
|
if err := fetch_paste(paste_id):
|
|
return err
|
|
|
|
row: Row = g.paste
|
|
db = g.db
|
|
|
|
burn_after_read = row["burn_after_read"]
|
|
if burn_after_read:
|
|
db.execute("DELETE FROM pastes WHERE id = ?", (paste_id,))
|
|
current_app.logger.info("Burn-after-read paste deleted: %s", paste_id)
|
|
|
|
db.commit()
|
|
|
|
# Audit log paste access
|
|
if current_app.config.get("AUDIT_ENABLED", True):
|
|
from app.audit import AuditEvent, AuditOutcome, log_event
|
|
|
|
log_event(
|
|
AuditEvent.PASTE_ACCESS,
|
|
AuditOutcome.SUCCESS,
|
|
paste_id=paste_id,
|
|
client_id=get_client_fingerprint(),
|
|
client_ip=get_client_ip(),
|
|
details={"burn": bool(burn_after_read)},
|
|
)
|
|
|
|
record_paste_accessed(
|
|
"authenticated" if get_client_fingerprint() else "anonymous",
|
|
bool(burn_after_read),
|
|
)
|
|
|
|
response = Response(row["content"], mimetype=row["mime_type"])
|
|
if row["mime_type"].startswith(("image/", "text/")):
|
|
response.headers["Content-Disposition"] = "inline"
|
|
if burn_after_read:
|
|
response.headers["X-Burn-After-Read"] = "true"
|
|
|
|
return response
|
|
|
|
def head(self, paste_id: str) -> Response:
|
|
"""Return raw paste headers. HEAD triggers burn-after-read deletion.
|
|
|
|
Security note: HEAD requests count as paste access for burn-after-read
|
|
to prevent attackers from probing paste existence before retrieval.
|
|
"""
|
|
# Validate and fetch
|
|
if err := validate_paste_id(paste_id):
|
|
return err
|
|
if err := fetch_paste(paste_id):
|
|
return err
|
|
|
|
row: Row = g.paste
|
|
db = g.db
|
|
|
|
# BURN-001: HEAD triggers burn-after-read like GET
|
|
burn_after_read = row["burn_after_read"]
|
|
if burn_after_read:
|
|
db.execute("DELETE FROM pastes WHERE id = ?", (paste_id,))
|
|
current_app.logger.info("Burn-after-read paste deleted via HEAD: %s", paste_id)
|
|
|
|
db.commit()
|
|
|
|
response = Response(mimetype=row["mime_type"])
|
|
response.headers["Content-Length"] = str(row["size"])
|
|
if row["mime_type"].startswith(("image/", "text/")):
|
|
response.headers["Content-Disposition"] = "inline"
|
|
if burn_after_read:
|
|
response.headers["X-Burn-After-Read"] = "true"
|
|
|
|
return response
|
|
|
|
|
|
class PasteDeleteView(MethodView):
|
|
"""Paste deletion with authentication."""
|
|
|
|
def delete(self, paste_id: str) -> Response:
|
|
"""Delete paste. Requires ownership or admin rights."""
|
|
# Validate
|
|
if err := validate_paste_id(paste_id):
|
|
return err
|
|
if err := require_auth():
|
|
return err
|
|
|
|
db = get_db()
|
|
|
|
row = db.execute("SELECT owner FROM pastes WHERE id = ?", (paste_id,)).fetchone()
|
|
|
|
if row is None:
|
|
return error_response("Paste not found", 404)
|
|
|
|
# Allow if owner or admin
|
|
if row["owner"] != g.client_id and not is_admin():
|
|
return error_response("Permission denied", 403)
|
|
|
|
db.execute("DELETE FROM pastes WHERE id = ?", (paste_id,))
|
|
db.commit()
|
|
|
|
# Audit log paste deletion
|
|
if current_app.config.get("AUDIT_ENABLED", True):
|
|
from app.audit import AuditEvent, AuditOutcome, log_event
|
|
|
|
log_event(
|
|
AuditEvent.PASTE_DELETE,
|
|
AuditOutcome.SUCCESS,
|
|
paste_id=paste_id,
|
|
client_id=g.client_id,
|
|
client_ip=get_client_ip(),
|
|
)
|
|
|
|
record_paste_deleted("authenticated", "success")
|
|
|
|
return json_response({"message": "Paste deleted"})
|
|
|
|
|
|
class PastesListView(MethodView):
|
|
"""List pastes with authentication."""
|
|
|
|
def get(self) -> Response:
|
|
"""List pastes owned by authenticated user, or all pastes for admins.
|
|
|
|
Privacy guarantees:
|
|
- Requires authentication (mTLS client certificate)
|
|
- Regular users can ONLY see their own pastes
|
|
- Admins can see all pastes (with optional owner filter)
|
|
- Content is never returned, only metadata
|
|
|
|
Query parameters:
|
|
- limit: max results (default 50, max 200)
|
|
- offset: pagination offset (default 0)
|
|
- type: filter by MIME type (glob pattern, e.g., "image/*")
|
|
- after: filter by created_at >= timestamp
|
|
- before: filter by created_at <= timestamp
|
|
- all: (admin only) if "1", list all pastes instead of own
|
|
- owner: (admin only) filter by owner fingerprint
|
|
"""
|
|
import fnmatch
|
|
|
|
# Strict authentication requirement
|
|
if err := require_auth():
|
|
return err
|
|
|
|
client_id = g.client_id
|
|
user_is_admin = is_admin()
|
|
|
|
# Parse pagination parameters
|
|
try:
|
|
limit = min(int(request.args.get("limit", 50)), 200)
|
|
offset = max(int(request.args.get("offset", 0)), 0)
|
|
except (ValueError, TypeError):
|
|
limit, offset = 50, 0
|
|
|
|
# Parse filter parameters
|
|
type_filter = request.args.get("type", "").strip()
|
|
try:
|
|
after_ts = int(request.args.get("after", 0))
|
|
except (ValueError, TypeError):
|
|
after_ts = 0
|
|
try:
|
|
before_ts = int(request.args.get("before", 0))
|
|
except (ValueError, TypeError):
|
|
before_ts = 0
|
|
|
|
# Admin-only parameters
|
|
show_all = request.args.get("all", "0") == "1" and user_is_admin
|
|
owner_filter = request.args.get("owner", "").strip() if user_is_admin else ""
|
|
|
|
db = get_db()
|
|
|
|
# Build query with filters
|
|
where_clauses: list[str] = []
|
|
params: list[Any] = []
|
|
|
|
# Owner filtering logic
|
|
if show_all:
|
|
# Admin viewing all pastes (with optional owner filter)
|
|
if owner_filter:
|
|
where_clauses.append("owner = ?")
|
|
params.append(owner_filter)
|
|
# else: no owner filter, show all
|
|
else:
|
|
# Regular user or admin without ?all=1: show only own pastes
|
|
where_clauses.append("owner = ?")
|
|
params.append(client_id)
|
|
|
|
if after_ts > 0:
|
|
where_clauses.append("created_at >= ?")
|
|
params.append(after_ts)
|
|
if before_ts > 0:
|
|
where_clauses.append("created_at <= ?")
|
|
params.append(before_ts)
|
|
|
|
# Build WHERE clause (may be empty for admin viewing all)
|
|
where_sql = " AND ".join(where_clauses) if where_clauses else "1=1"
|
|
|
|
# Count total pastes matching filters (where_sql is safe, built from constants)
|
|
count_row = db.execute(
|
|
f"SELECT COUNT(*) as total FROM pastes WHERE {where_sql}", # noqa: S608 # nosec B608
|
|
params,
|
|
).fetchone()
|
|
total = count_row["total"] if count_row else 0
|
|
|
|
# Fetch pastes with metadata only (where_sql is safe, built from constants)
|
|
# Include owner for admin view
|
|
rows = db.execute(
|
|
f"""SELECT id, owner, mime_type, length(content) as size, created_at,
|
|
last_accessed, burn_after_read, expires_at,
|
|
CASE WHEN password_hash IS NOT NULL THEN 1 ELSE 0 END as password_protected
|
|
FROM pastes
|
|
WHERE {where_sql}
|
|
ORDER BY created_at DESC
|
|
LIMIT ? OFFSET ?""", # noqa: S608 # nosec B608
|
|
[*params, limit, offset],
|
|
).fetchall()
|
|
|
|
# Apply MIME type filter (glob pattern matching done in Python for flexibility)
|
|
if type_filter:
|
|
rows = [r for r in rows if fnmatch.fnmatch(r["mime_type"], type_filter)]
|
|
|
|
pastes = [
|
|
build_paste_metadata(
|
|
paste_id=row["id"],
|
|
mime_type=row["mime_type"],
|
|
size=row["size"],
|
|
created_at=row["created_at"],
|
|
owner=row["owner"],
|
|
burn_after_read=bool(row["burn_after_read"]),
|
|
expires_at=row["expires_at"],
|
|
password_protected=bool(row["password_protected"]),
|
|
include_owner=show_all,
|
|
last_accessed=row["last_accessed"],
|
|
)
|
|
for row in rows
|
|
]
|
|
|
|
response_data: dict[str, Any] = {
|
|
"pastes": pastes,
|
|
"count": len(pastes),
|
|
"total": total,
|
|
"limit": limit,
|
|
"offset": offset,
|
|
}
|
|
if user_is_admin:
|
|
response_data["is_admin"] = True
|
|
return json_response(response_data)
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# PKI Views (Certificate Authority)
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def require_pki_enabled() -> Response | None:
|
|
"""Check if PKI is enabled. Returns error response or None if enabled."""
|
|
if not current_app.config.get("PKI_ENABLED"):
|
|
return error_response("PKI not enabled", 404)
|
|
return None
|
|
|
|
|
|
class PKIStatusView(MethodView):
|
|
"""PKI status endpoint."""
|
|
|
|
def get(self) -> Response:
|
|
"""Return PKI status and CA info if available."""
|
|
if not current_app.config.get("PKI_ENABLED"):
|
|
return json_response({"enabled": False})
|
|
|
|
from app.pki import get_ca_info
|
|
|
|
ca_info = get_ca_info()
|
|
if ca_info is None:
|
|
return json_response(
|
|
{
|
|
"enabled": True,
|
|
"ca_exists": False,
|
|
"hint": "POST /pki/ca to generate CA",
|
|
}
|
|
)
|
|
|
|
return json_response(
|
|
{
|
|
"enabled": True,
|
|
"ca_exists": True,
|
|
"common_name": ca_info["common_name"],
|
|
"fingerprint_sha1": ca_info["fingerprint_sha1"],
|
|
"created_at": ca_info["created_at"],
|
|
"expires_at": ca_info["expires_at"],
|
|
"key_algorithm": ca_info["key_algorithm"],
|
|
}
|
|
)
|
|
|
|
|
|
class PKICAGenerateView(MethodView):
|
|
"""CA generation endpoint (first-run only)."""
|
|
|
|
def post(self) -> Response:
|
|
"""Generate CA certificate. Only works if no CA exists."""
|
|
if err := require_pki_enabled():
|
|
return err
|
|
|
|
from app.pki import (
|
|
CAExistsError,
|
|
PKIError,
|
|
generate_ca,
|
|
get_ca_info,
|
|
)
|
|
|
|
# Check if CA already exists
|
|
if get_ca_info() is not None:
|
|
return error_response("CA already exists", 409)
|
|
|
|
# Get CA password from config
|
|
password = current_app.config.get("PKI_CA_PASSWORD", "")
|
|
if not password:
|
|
return error_response(
|
|
"PKI_CA_PASSWORD not configured",
|
|
500,
|
|
hint="Set FLASKPASTE_PKI_CA_PASSWORD environment variable",
|
|
)
|
|
|
|
# Parse request for optional common name
|
|
common_name = "FlaskPaste CA"
|
|
if request.is_json:
|
|
data = request.get_json(silent=True)
|
|
if data and isinstance(data.get("common_name"), str):
|
|
common_name = data["common_name"][:64]
|
|
|
|
# Generate CA
|
|
try:
|
|
days = current_app.config.get("PKI_CA_DAYS", 3650)
|
|
owner = get_client_fingerprint()
|
|
ca_info = generate_ca(common_name, password, days=days, owner=owner)
|
|
except CAExistsError:
|
|
return error_response("CA already exists", 409)
|
|
except PKIError as e:
|
|
current_app.logger.error("CA generation failed: %s", e)
|
|
return error_response("CA generation failed", 500)
|
|
|
|
current_app.logger.info(
|
|
"CA generated: cn=%s fingerprint=%s", common_name, ca_info["fingerprint_sha1"][:12]
|
|
)
|
|
log_event(
|
|
AuditEvent.CERT_ISSUED,
|
|
AuditOutcome.SUCCESS,
|
|
client_id=owner,
|
|
client_ip=request.remote_addr,
|
|
details={
|
|
"type": "ca",
|
|
"fingerprint": ca_info["fingerprint_sha1"][:16],
|
|
"common_name": common_name,
|
|
"expires_at": ca_info["expires_at"],
|
|
},
|
|
)
|
|
|
|
return json_response(
|
|
{
|
|
"message": "CA generated",
|
|
"common_name": ca_info["common_name"],
|
|
"fingerprint_sha1": ca_info["fingerprint_sha1"],
|
|
"created_at": ca_info["created_at"],
|
|
"expires_at": ca_info["expires_at"],
|
|
"download": prefixed_url("/pki/ca.crt"),
|
|
},
|
|
201,
|
|
)
|
|
|
|
|
|
class PKICADownloadView(MethodView):
|
|
"""CA certificate download endpoint."""
|
|
|
|
def get(self) -> Response:
|
|
"""Download CA certificate in PEM format."""
|
|
if err := require_pki_enabled():
|
|
return err
|
|
|
|
from app.pki import get_ca_info
|
|
|
|
ca_info = get_ca_info()
|
|
if ca_info is None:
|
|
return error_response("CA not initialized", 404)
|
|
|
|
response = Response(ca_info["certificate_pem"], mimetype="application/x-pem-file")
|
|
response.headers["Content-Disposition"] = (
|
|
f"attachment; filename={ca_info['common_name'].replace(' ', '_')}.crt"
|
|
)
|
|
return response
|
|
|
|
|
|
class PKIIssueView(MethodView):
|
|
"""Certificate issuance endpoint (open registration)."""
|
|
|
|
def post(self) -> Response:
|
|
"""Issue a new client certificate."""
|
|
if err := require_pki_enabled():
|
|
return err
|
|
|
|
from app.pki import (
|
|
CANotFoundError,
|
|
PKIError,
|
|
issue_certificate,
|
|
)
|
|
|
|
# Parse request
|
|
common_name = None
|
|
if request.is_json:
|
|
data = request.get_json(silent=True)
|
|
if data and isinstance(data.get("common_name"), str):
|
|
common_name = data["common_name"][:64]
|
|
|
|
if not common_name:
|
|
return error_response(
|
|
"common_name required", 400, hint='POST {"common_name": "your-name"}'
|
|
)
|
|
|
|
# Get CA password from config
|
|
password = current_app.config.get("PKI_CA_PASSWORD", "")
|
|
if not password:
|
|
return error_response("PKI not properly configured", 500)
|
|
|
|
# Issue certificate
|
|
try:
|
|
days = current_app.config.get("PKI_CERT_DAYS", 365)
|
|
issued_to = get_client_fingerprint()
|
|
cert_info = issue_certificate(common_name, password, days=days, issued_to=issued_to)
|
|
except CANotFoundError:
|
|
return error_response("CA not initialized", 404)
|
|
except PKIError as e:
|
|
current_app.logger.error("Certificate issuance failed: %s", e)
|
|
return error_response("Certificate issuance failed", 500)
|
|
|
|
current_app.logger.info(
|
|
"Certificate issued: cn=%s serial=%s fingerprint=%s to=%s",
|
|
common_name,
|
|
cert_info["serial"][:8],
|
|
cert_info["fingerprint_sha1"][:12],
|
|
issued_to or "anonymous",
|
|
)
|
|
log_event(
|
|
AuditEvent.CERT_ISSUED,
|
|
AuditOutcome.SUCCESS,
|
|
client_id=cert_info["fingerprint_sha1"],
|
|
client_ip=request.remote_addr,
|
|
details={
|
|
"type": "client",
|
|
"serial": cert_info["serial"][:16],
|
|
"common_name": common_name,
|
|
"issued_by": issued_to,
|
|
"expires_at": cert_info["expires_at"],
|
|
},
|
|
)
|
|
|
|
# Return certificate bundle
|
|
return json_response(
|
|
{
|
|
"message": "Certificate issued",
|
|
"serial": cert_info["serial"],
|
|
"common_name": cert_info["common_name"],
|
|
"fingerprint_sha1": cert_info["fingerprint_sha1"],
|
|
"created_at": cert_info["created_at"],
|
|
"expires_at": cert_info["expires_at"],
|
|
"certificate_pem": cert_info["certificate_pem"],
|
|
"private_key_pem": cert_info["private_key_pem"],
|
|
"is_admin": cert_info.get("is_admin", False),
|
|
},
|
|
201,
|
|
)
|
|
|
|
|
|
class PKICertsView(MethodView):
|
|
"""Certificate listing endpoint."""
|
|
|
|
def get(self) -> Response:
|
|
"""List issued certificates."""
|
|
if err := require_pki_enabled():
|
|
return err
|
|
|
|
client_id = get_client_fingerprint()
|
|
|
|
db = get_db()
|
|
|
|
# Users with certificates can see their own certs or certs they issued
|
|
# Anonymous users (no cert) see nothing
|
|
if client_id:
|
|
rows = db.execute(
|
|
"""SELECT serial, common_name, fingerprint_sha1,
|
|
created_at, expires_at, issued_to, status, revoked_at
|
|
FROM issued_certificates
|
|
WHERE issued_to = ? OR fingerprint_sha1 = ?
|
|
ORDER BY created_at DESC""",
|
|
(client_id, client_id),
|
|
).fetchall()
|
|
else:
|
|
# Anonymous: empty list
|
|
rows = []
|
|
|
|
certs = []
|
|
for row in rows:
|
|
cert = {
|
|
"serial": row["serial"],
|
|
"common_name": row["common_name"],
|
|
"fingerprint_sha1": row["fingerprint_sha1"],
|
|
"created_at": row["created_at"],
|
|
"expires_at": row["expires_at"],
|
|
"status": row["status"],
|
|
}
|
|
if row["issued_to"]:
|
|
cert["issued_to"] = row["issued_to"]
|
|
if row["revoked_at"]:
|
|
cert["revoked_at"] = row["revoked_at"]
|
|
certs.append(cert)
|
|
|
|
return json_response({"certificates": certs, "count": len(certs)})
|
|
|
|
|
|
class PKIRevokeView(MethodView):
|
|
"""Certificate revocation endpoint."""
|
|
|
|
def post(self, serial: str) -> Response:
|
|
"""Revoke a certificate by serial number."""
|
|
if err := require_pki_enabled():
|
|
return err
|
|
if err := require_auth():
|
|
return err
|
|
|
|
from app.pki import CertificateNotFoundError, PKIError, revoke_certificate
|
|
|
|
db = get_db()
|
|
|
|
# Check certificate exists and get ownership info
|
|
row = db.execute(
|
|
"SELECT issued_to, fingerprint_sha1, status FROM issued_certificates WHERE serial = ?",
|
|
(serial,),
|
|
).fetchone()
|
|
|
|
if row is None:
|
|
return error_response("Certificate not found", 404)
|
|
|
|
if row["status"] == "revoked":
|
|
return error_response("Certificate already revoked", 409)
|
|
|
|
# Check permission: must be issuer or the certificate itself
|
|
client_id = g.client_id
|
|
can_revoke = row["issued_to"] == client_id or row["fingerprint_sha1"] == client_id
|
|
|
|
if not can_revoke:
|
|
return error_response("Permission denied", 403)
|
|
|
|
# Revoke
|
|
try:
|
|
revoke_certificate(serial)
|
|
except CertificateNotFoundError:
|
|
return error_response("Certificate not found", 404)
|
|
except PKIError as e:
|
|
current_app.logger.error("Revocation failed: %s", e)
|
|
return error_response("Revocation failed", 500)
|
|
|
|
current_app.logger.info("Certificate revoked: serial=%s by=%s", serial[:8], client_id[:12])
|
|
log_event(
|
|
AuditEvent.CERT_REVOKED,
|
|
AuditOutcome.SUCCESS,
|
|
client_id=client_id,
|
|
client_ip=get_client_ip(),
|
|
details={
|
|
"serial": serial[:16],
|
|
"fingerprint": row["fingerprint_sha1"][:16],
|
|
},
|
|
)
|
|
|
|
return json_response({"message": "Certificate revoked", "serial": serial})
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Audit Log View
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
class AuditLogView(MethodView):
|
|
"""Audit log query endpoint (admin only)."""
|
|
|
|
def get(self) -> Response:
|
|
"""Query audit log with filters.
|
|
|
|
Query parameters:
|
|
- event_type: Filter by event type
|
|
- client_id: Filter by client fingerprint
|
|
- paste_id: Filter by paste ID
|
|
- outcome: Filter by outcome (success, failure, blocked)
|
|
- since: Filter by timestamp >= since
|
|
- until: Filter by timestamp <= until
|
|
- limit: Maximum results (default 100, max 500)
|
|
- offset: Pagination offset
|
|
"""
|
|
if err := require_auth():
|
|
return err
|
|
if not is_admin():
|
|
return error_response("Admin access required", 403)
|
|
|
|
from app.audit import query_audit_log
|
|
|
|
# Parse query parameters
|
|
event_type = request.args.get("event_type", "").strip() or None
|
|
client_id = request.args.get("client_id", "").strip() or None
|
|
paste_id = request.args.get("paste_id", "").strip() or None
|
|
outcome = request.args.get("outcome", "").strip() or None
|
|
|
|
try:
|
|
since = int(request.args.get("since", 0)) or None
|
|
except (ValueError, TypeError):
|
|
since = None
|
|
try:
|
|
until = int(request.args.get("until", 0)) or None
|
|
except (ValueError, TypeError):
|
|
until = None
|
|
try:
|
|
limit = min(int(request.args.get("limit", 100)), 500)
|
|
except (ValueError, TypeError):
|
|
limit = 100
|
|
try:
|
|
offset = max(int(request.args.get("offset", 0)), 0)
|
|
except (ValueError, TypeError):
|
|
offset = 0
|
|
|
|
entries, total = query_audit_log(
|
|
event_type=event_type,
|
|
client_id=client_id,
|
|
paste_id=paste_id,
|
|
outcome=outcome,
|
|
since=since,
|
|
until=until,
|
|
limit=limit,
|
|
offset=offset,
|
|
)
|
|
|
|
return json_response(
|
|
{
|
|
"entries": entries,
|
|
"count": len(entries),
|
|
"total": total,
|
|
"limit": limit,
|
|
"offset": offset,
|
|
}
|
|
)
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Route Registration
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
# Index and paste creation
|
|
bp.add_url_rule("/", view_func=IndexView.as_view("index"))
|
|
|
|
# Utility endpoints
|
|
bp.add_url_rule("/health", view_func=HealthView.as_view("health"))
|
|
bp.add_url_rule("/challenge", view_func=ChallengeView.as_view("challenge"))
|
|
bp.add_url_rule("/client", view_func=ClientView.as_view("client"))
|
|
|
|
# Registration endpoints (public certificate issuance with PoW)
|
|
bp.add_url_rule(
|
|
"/register/challenge", view_func=RegisterChallengeView.as_view("register_challenge")
|
|
)
|
|
bp.add_url_rule("/register", view_func=RegisterView.as_view("register"))
|
|
|
|
# Paste operations
|
|
bp.add_url_rule("/pastes", view_func=PastesListView.as_view("pastes_list"))
|
|
bp.add_url_rule("/<paste_id>", view_func=PasteView.as_view("paste"), methods=["GET", "HEAD", "PUT"])
|
|
bp.add_url_rule(
|
|
"/<paste_id>/raw", view_func=PasteRawView.as_view("paste_raw"), methods=["GET", "HEAD"]
|
|
)
|
|
bp.add_url_rule(
|
|
"/<paste_id>", view_func=PasteDeleteView.as_view("paste_delete"), methods=["DELETE"]
|
|
)
|
|
|
|
# PKI endpoints
|
|
bp.add_url_rule("/pki", view_func=PKIStatusView.as_view("pki_status"))
|
|
bp.add_url_rule("/pki/ca", view_func=PKICAGenerateView.as_view("pki_ca_generate"))
|
|
bp.add_url_rule("/pki/ca.crt", view_func=PKICADownloadView.as_view("pki_ca_download"))
|
|
bp.add_url_rule("/pki/issue", view_func=PKIIssueView.as_view("pki_issue"))
|
|
bp.add_url_rule("/pki/certs", view_func=PKICertsView.as_view("pki_certs"))
|
|
bp.add_url_rule(
|
|
"/pki/revoke/<serial>", view_func=PKIRevokeView.as_view("pki_revoke"), methods=["POST"]
|
|
)
|
|
|
|
# Audit log endpoint (admin only)
|
|
bp.add_url_rule("/audit", view_func=AuditLogView.as_view("audit_log"))
|