forked from username/flaskpaste
Authenticated users can tag pastes with a human-readable label via X-Display-Name header. Supports create, update, remove, and listing. Max 128 chars, control characters rejected.
2738 lines
100 KiB
Python
"""API route handlers using modern Flask patterns."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import math
|
|
import re
|
|
import secrets
|
|
import string
|
|
import threading
|
|
import time
|
|
from collections import defaultdict
|
|
from typing import TYPE_CHECKING, Any
|
|
from urllib.parse import urlparse
|
|
|
|
from flask import Response, current_app, g, request
|
|
from flask.views import MethodView
|
|
|
|
from app.api import bp
|
|
from app.audit import AuditEvent, AuditOutcome, log_event
|
|
from app.config import VERSION
|
|
from app.database import check_content_hash, get_db, hash_password, verify_password
|
|
from app.metrics import (
|
|
record_dedup,
|
|
record_paste_accessed,
|
|
record_paste_created,
|
|
record_paste_deleted,
|
|
record_pow,
|
|
record_rate_limit,
|
|
record_url_accessed,
|
|
record_url_created,
|
|
record_url_deleted,
|
|
)
|
|
|
|
if TYPE_CHECKING:
|
|
from sqlite3 import Row
|
|
|
|
# Compiled patterns for validation
|
|
PASTE_ID_PATTERN = re.compile(r"^[a-f0-9]+$")
|
|
CLIENT_ID_PATTERN = re.compile(r"^[a-f0-9]{40}$")
|
|
MIME_PATTERN = re.compile(r"^[a-z0-9][a-z0-9!#$&\-^_.+]*/[a-z0-9][a-z0-9!#$&\-^_.+]*$")
|
|
SHORT_ID_PATTERN = re.compile(r"^[a-zA-Z0-9]+$")
|
|
CONTROL_CHAR_PATTERN = re.compile(r"[\x00-\x1f\x7f]")
|
|
SHORT_ID_ALPHABET = string.ascii_letters + string.digits
|
|
ALLOWED_URL_SCHEMES = frozenset({"http", "https"})
|
|
|
|
# NOTE: Magic byte detection commented out - using text/binary detection only.
|
|
# Security headers (X-Content-Type-Options: nosniff, CSP) prevent MIME confusion.
|
|
# For full MIME detection, consider using the `filetype` library.
|
|
#
|
|
# MAGIC_SIGNATURES: dict[bytes, str] = {
|
|
# b"\x89PNG\r\n\x1a\n": "image/png",
|
|
# b"\xff\xd8\xff": "image/jpeg",
|
|
# b"GIF87a": "image/gif",
|
|
# b"GIF89a": "image/gif",
|
|
# b"%PDF": "application/pdf",
|
|
# b"PK\x03\x04": "application/zip",
|
|
# # ... (see git history for full list)
|
|
# }
|
|
|
|
# Generic MIME types to override with detection
|
|
GENERIC_MIME_TYPES = frozenset(
|
|
{
|
|
"application/octet-stream",
|
|
"application/x-www-form-urlencoded",
|
|
"text/plain",
|
|
}
|
|
)
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Anti-flood: dynamic PoW difficulty adjustment
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
_antiflood_lock = threading.Lock()
|
|
_antiflood_requests: list[float] = [] # Global request timestamps
|
|
_antiflood_difficulty: int = 0 # Current difficulty boost (added to base)
|
|
_antiflood_last_increase: float = 0 # Last time difficulty was increased
|
|
|
|
|
|
def get_dynamic_difficulty() -> int:
    """Return the effective PoW difficulty (base plus any anti-flood boost).

    The boost is capped at ANTIFLOOD_MAX; when PoW is disabled (base 0) or
    anti-flood is off, the unmodified base value is returned.
    """
    base: int = current_app.config["POW_DIFFICULTY"]
    antiflood_on = current_app.config.get("ANTIFLOOD_ENABLED", True)
    if base == 0 or not antiflood_on:
        return base
    with _antiflood_lock:
        ceiling: int = current_app.config["ANTIFLOOD_MAX"]
        return min(base + _antiflood_difficulty, ceiling)
|
|
|
|
|
|
def record_antiflood_request() -> None:
    """Track one request for anti-flood purposes and tune PoW difficulty.

    Maintains a global sliding window of request timestamps. When the window
    holds more than ANTIFLOOD_THRESHOLD requests, the difficulty boost grows
    by ANTIFLOOD_STEP (never exceeding ANTIFLOOD_MAX); after ANTIFLOOD_DECAY
    seconds of calm it steps back down. No-op when anti-flood or PoW is
    disabled.
    """
    if not current_app.config.get("ANTIFLOOD_ENABLED", True):
        return
    if current_app.config["POW_DIFFICULTY"] == 0:
        return

    global _antiflood_difficulty, _antiflood_last_increase

    cfg = current_app.config
    now = time.time()
    window = cfg["ANTIFLOOD_WINDOW"]
    threshold = cfg["ANTIFLOOD_THRESHOLD"]
    step = cfg["ANTIFLOOD_STEP"]
    max_diff = cfg["ANTIFLOOD_MAX"]
    decay = cfg["ANTIFLOOD_DECAY"]
    base = cfg["POW_DIFFICULTY"]
    max_entries = cfg.get("ANTIFLOOD_MAX_ENTRIES", 10000)

    cutoff = now - window
    with _antiflood_lock:
        # Expire timestamps that fell out of the sliding window.
        _antiflood_requests[:] = [t for t in _antiflood_requests if t > cutoff]

        # FLOOD-001: bound memory — once full, keep only the newest half.
        if len(_antiflood_requests) >= max_entries:
            _antiflood_requests[:] = _antiflood_requests[-(max_entries // 2):]

        _antiflood_requests.append(now)

        if len(_antiflood_requests) > threshold:
            # Under flood: raise the boost unless already at the ceiling.
            if base + _antiflood_difficulty < max_diff:
                _antiflood_difficulty += step
                _antiflood_last_increase = now
        elif _antiflood_difficulty > 0 and (now - _antiflood_last_increase) > decay:
            # Quiet again: step the boost down and restart the decay timer.
            _antiflood_difficulty = max(0, _antiflood_difficulty - step)
            _antiflood_last_increase = now
|
|
|
|
|
|
def reset_antiflood() -> None:
    """Zero out all anti-flood tracking state (test helper)."""
    global _antiflood_difficulty, _antiflood_last_increase
    with _antiflood_lock:
        del _antiflood_requests[:]
        _antiflood_difficulty = 0
        _antiflood_last_increase = 0
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Rate Limiting (in-memory sliding window)
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
_rate_limit_lock = threading.Lock()
|
|
_rate_limit_requests: dict[str, list[float]] = defaultdict(list)
|
|
|
|
|
|
def get_client_ip() -> str:
    """Return the client IP, honoring X-Forwarded-For only behind a trusted proxy."""
    if is_trusted_proxy():
        chain = request.headers.get("X-Forwarded-For", "")
        if chain:
            # The leftmost entry of the chain is the originating client.
            first, _, _ = chain.partition(",")
            return first.strip()
    return request.remote_addr or "unknown"
|
|
|
|
|
|
def check_rate_limit(client_ip: str, authenticated: bool = False) -> tuple[bool, int, int, int]:
    """Sliding-window rate-limit check for *client_ip*.

    Args:
        client_ip: Client IP address.
        authenticated: Authenticated clients get their limit multiplied by
            RATE_LIMIT_AUTH_MULTIPLIER.

    Returns:
        (allowed, remaining, limit, reset_timestamp):
        - allowed: whether this request fits in the window
        - remaining: requests left in the current window
        - limit: maximum requests per window
        - reset_timestamp: Unix time the window rolls over
        When rate limiting is disabled, returns (True, -1, -1, 0).
    """
    cfg = current_app.config
    if not cfg.get("RATE_LIMIT_ENABLED", True):
        return True, -1, -1, 0

    window = cfg["RATE_LIMIT_WINDOW"]
    limit = cfg["RATE_LIMIT_MAX"]
    max_entries = cfg.get("RATE_LIMIT_MAX_ENTRIES", 10000)
    cleanup_threshold = cfg.get("RATE_LIMIT_CLEANUP_THRESHOLD", 0.8)

    if authenticated:
        limit *= cfg.get("RATE_LIMIT_AUTH_MULTIPLIER", 5)

    now = time.time()
    cutoff = now - window

    with _rate_limit_lock:
        # RATE-002: proactively prune once the table crosses the soft threshold.
        soft_cap = int(max_entries * cleanup_threshold)
        if len(_rate_limit_requests) >= soft_cap:
            _prune_rate_limit_entries(soft_cap // 2, cutoff)

        # RATE-001: hard cap — make room before admitting a brand-new IP.
        if len(_rate_limit_requests) >= max_entries and client_ip not in _rate_limit_requests:
            _prune_rate_limit_entries(max_entries // 2, cutoff)

        # Expire this IP's old timestamps in place.
        history = _rate_limit_requests[client_ip]
        history[:] = [t for t in history if t > cutoff]

        # Window resets when the oldest surviving request expires.
        if history:
            reset_at = int(history[0] + window)
        else:
            reset_at = int(now + window)

        if len(history) >= limit:
            return False, 0, limit, reset_at

        history.append(now)
        return True, limit - len(history), limit, reset_at
|
|
|
|
|
|
def _prune_rate_limit_entries(target_size: int, cutoff: float) -> None:
    """Shrink the rate-limit table toward *target_size*. Caller must hold
    _rate_limit_lock.

    Fully-expired entries are dropped first; if the table is still over the
    target, the entries with the oldest most-recent activity are evicted.
    """
    # Pass 1: drop IPs whose every timestamp is at or before the cutoff.
    stale = [
        ip
        for ip, stamps in _rate_limit_requests.items()
        if not stamps or all(t <= cutoff for t in stamps)
    ]
    for ip in stale:
        del _rate_limit_requests[ip]

    # Pass 2: still too big — evict least-recently-active IPs first.
    excess = len(_rate_limit_requests) - target_size
    if excess > 0:
        by_recency = sorted(
            _rate_limit_requests.items(),
            key=lambda item: max(item[1]) if item[1] else 0,
        )
        for ip, _ in by_recency[:excess]:
            del _rate_limit_requests[ip]
|
|
|
|
|
|
def cleanup_rate_limits(window: int | None = None) -> int:
    """Drop fully-expired rate-limit entries; return how many were removed.

    Intended to run periodically (e.g. from a cleanup task).

    Args:
        window: Window length in seconds; defaults to the app's
            RATE_LIMIT_WINDOW (60 if unset).
    """
    if window is None:
        window = current_app.config.get("RATE_LIMIT_WINDOW", 60)
    cutoff = time.time() - window

    removed = 0
    with _rate_limit_lock:
        empty_ips = []
        for ip, stamps in _rate_limit_requests.items():
            # Trim expired timestamps in place; note IPs left empty.
            stamps[:] = [t for t in stamps if t > cutoff]
            if not stamps:
                empty_ips.append(ip)

        for ip in empty_ips:
            del _rate_limit_requests[ip]
            removed += 1

    return removed
|
|
|
|
|
|
def reset_rate_limits() -> None:
    """Drop every tracked rate-limit entry (test helper)."""
    with _rate_limit_lock:
        _rate_limit_requests.clear()
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# ENUM-001: Lookup Rate Limiting (prevents paste ID enumeration)
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
_lookup_rate_limit_lock = threading.Lock()
|
|
_lookup_rate_limit_requests: dict[str, list[float]] = defaultdict(list)
|
|
|
|
|
|
def check_lookup_rate_limit(client_ip: str) -> tuple[bool, int]:
    """Sliding-window rate limit for paste/URL lookups (anti-enumeration).

    Args:
        client_ip: Client IP address.

    Returns:
        (allowed, retry_after_seconds); retry_after is 0 when allowed and
        at least 1 when blocked.
    """
    cfg = current_app.config
    if not cfg.get("LOOKUP_RATE_LIMIT_ENABLED", True):
        return True, 0

    window = cfg.get("LOOKUP_RATE_LIMIT_WINDOW", 60)
    limit = cfg.get("LOOKUP_RATE_LIMIT_MAX", 60)
    max_entries = cfg.get("LOOKUP_RATE_LIMIT_MAX_ENTRIES", 10000)

    now = time.time()
    cutoff = now - window

    with _lookup_rate_limit_lock:
        # ENUM-002: table full and this IP is new — make room before admitting.
        table_full = len(_lookup_rate_limit_requests) >= max_entries
        if table_full and client_ip not in _lookup_rate_limit_requests:
            # First drop entries whose newest timestamp already expired.
            dead = [
                ip
                for ip, stamps in _lookup_rate_limit_requests.items()
                if not stamps or stamps[-1] <= cutoff
            ]
            for ip in dead:
                del _lookup_rate_limit_requests[ip]

            # Still full: evict the quarter with the oldest activity.
            if len(_lookup_rate_limit_requests) >= max_entries:
                oldest_first = sorted(
                    _lookup_rate_limit_requests.items(),
                    key=lambda item: item[1][-1] if item[1] else 0,
                )
                for ip, _ in oldest_first[: max_entries // 4]:
                    del _lookup_rate_limit_requests[ip]

        stamps = _lookup_rate_limit_requests[client_ip]
        stamps[:] = [t for t in stamps if t > cutoff]

        if len(stamps) >= limit:
            # Blocked: tell the client when the oldest request rolls off.
            wait = int(stamps[0] + window - now) + 1
            return False, max(1, wait)

        stamps.append(now)
        return True, 0
|
|
|
|
|
|
def reset_lookup_rate_limits() -> None:
    """Drop every tracked lookup rate-limit entry (test helper)."""
    with _lookup_rate_limit_lock:
        _lookup_rate_limit_requests.clear()
|
|
|
|
|
|
def add_rate_limit_headers(
    response: Response, remaining: int, limit: int, reset_timestamp: int
) -> Response:
    """Attach X-RateLimit-* headers following the
    draft-ietf-httpapi-ratelimit-headers convention.

    - X-RateLimit-Limit: maximum requests per window
    - X-RateLimit-Remaining: requests left (clamped to 0)
    - X-RateLimit-Reset: Unix timestamp when the window resets

    Headers are added only when *limit* is positive, i.e. rate limiting is
    actually enabled.
    """
    if limit > 0:
        headers = response.headers
        headers["X-RateLimit-Limit"] = str(limit)
        headers["X-RateLimit-Remaining"] = str(max(0, remaining))
        headers["X-RateLimit-Reset"] = str(reset_timestamp)
    return response
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Response Helpers
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def json_response(data: dict[str, Any], status: int = 200) -> Response:
    """Serialize *data* as UTF-8 JSON and wrap it in a Flask Response."""
    body = json.dumps(data, ensure_ascii=False)
    return Response(body, status=status, mimetype="application/json")
|
|
|
|
|
|
def error_response(message: str, status: int, **extra: Any) -> Response:
    """Build the standard {"error": ...} JSON error payload.

    Extra keyword arguments are merged into the payload (e.g. retry_after).
    """
    payload: dict[str, Any] = {"error": message}
    payload.update(extra)
    return json_response(payload, status)
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# URL Helpers
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def url_prefix() -> str:
    """Return the URL prefix configured for reverse-proxy deployments ("" if unset)."""
    configured: str = current_app.config.get("URL_PREFIX", "")
    return configured
|
|
|
|
|
|
def prefixed_url(path: str) -> str:
    """Prepend the configured URL prefix to *path*."""
    return url_prefix() + path
|
|
|
|
|
|
def paste_url(paste_id: str) -> str:
    """Build the metadata endpoint URL for a paste."""
    return prefixed_url("/" + paste_id)
|
|
|
|
|
|
def paste_raw_url(paste_id: str) -> str:
    """Build the raw-content endpoint URL for a paste."""
    return prefixed_url("/" + paste_id + "/raw")
|
|
|
|
|
|
def short_url_path(short_id: str) -> str:
    """Build the redirect path for a short URL."""
    return prefixed_url("/s/" + short_id)
|
|
|
|
|
|
def short_url_info_path(short_id: str) -> str:
    """Build the info/metadata path for a short URL."""
    return prefixed_url("/s/" + short_id + "/info")
|
|
|
|
|
|
def base_url() -> str:
    """Detect the full external base URL for this request.

    The X-Forwarded-Proto / X-Scheme / X-Forwarded-Host headers are honored
    only when the request comes through the trusted reverse proxy — the same
    gate get_client_ip() applies. Otherwise the values from the direct
    connection are used, so a client cannot spoof the scheme or host that
    ends up embedded in generated URLs.
    """
    scheme = request.scheme
    host = request.host
    if is_trusted_proxy():
        scheme = (
            request.headers.get("X-Forwarded-Proto")
            or request.headers.get("X-Scheme")
            or request.scheme
        )
        host = (
            request.headers.get("X-Forwarded-Host")
            or request.headers.get("Host")
            or request.host
        )
    return f"{scheme}://{host}{url_prefix()}"
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Response Builders
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def build_paste_metadata(
    paste_id: str,
    mime_type: str,
    size: int,
    created_at: int,
    *,
    owner: str | None = None,
    burn_after_read: bool = False,
    expires_at: int | None = None,
    password_protected: bool = False,
    include_owner: bool = False,
    last_accessed: int | None = None,
    display_name: str | None = None,
) -> dict[str, Any]:
    """Assemble the canonical paste metadata dict for API responses.

    Args:
        paste_id: Paste identifier.
        mime_type: Content MIME type.
        size: Content size in bytes.
        created_at: Creation timestamp.
        owner: Owner fingerprint (emitted only when include_owner is True).
        burn_after_read: Whether paste is burn-after-read.
        expires_at: Expiration timestamp.
        password_protected: Whether paste has a password.
        include_owner: Whether to include owner in the response.
        last_accessed: Last access timestamp (optional).
        display_name: Human-readable label (optional).

    Optional fields are emitted only when set (truthy / not None), keeping
    responses compact.
    """
    meta: dict[str, Any] = {
        "id": paste_id,
        "mime_type": mime_type,
        "size": size,
        "created_at": created_at,
        "url": paste_url(paste_id),
        "raw": paste_raw_url(paste_id),
    }
    if last_accessed is not None:
        meta["last_accessed"] = last_accessed
    if include_owner and owner:
        meta["owner"] = owner
    if burn_after_read:
        meta["burn_after_read"] = True
    if expires_at:
        meta["expires_at"] = expires_at
    if password_protected:
        meta["password_protected"] = True
    if display_name:
        meta["display_name"] = display_name
    return meta
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Validation Helpers (used within views)
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def validate_paste_id(paste_id: str) -> Response | None:
    """Check paste ID length and hex format; return a 400 response when invalid."""
    required_len = current_app.config["PASTE_ID_LENGTH"]
    well_formed = (
        len(paste_id) == required_len and PASTE_ID_PATTERN.match(paste_id) is not None
    )
    return None if well_formed else error_response("Invalid paste ID", 400)
|
|
|
|
|
|
def fetch_paste(paste_id: str, check_password: bool = True) -> Response | None:
    """Fetch paste and store in g.paste. Returns error response or None if OK.

    Args:
        paste_id: Paste identifier (format-validated separately by
            validate_paste_id).
        check_password: When True, enforce the X-Paste-Password header for
            password-protected pastes and equalize timing on misses.

    Returns:
        None on success (row in ``g.paste``, DB handle in ``g.db``),
        otherwise an error Response: 429 (lookup rate limit), 404 (not
        found), 401 (password required) or 403 (wrong password).

    Note:
        On success the last_accessed UPDATE is left uncommitted and the DB
        handle is exposed as ``g.db`` — presumably the caller commits later;
        NOTE(review): confirm against callers.
    """
    # ENUM-001: Rate limit lookups to prevent enumeration attacks
    # Trusted certificate holders are exempt
    if not get_client_id():
        client_ip = get_client_ip()
        allowed, retry_after = check_lookup_rate_limit(client_ip)
        if not allowed:
            response = error_response(
                f"Lookup rate limit exceeded. Retry after {retry_after} seconds.",
                429,
                retry_after=retry_after,
            )
            # Standard header so well-behaved clients know when to retry.
            response.headers["Retry-After"] = str(retry_after)
            return response

    db = get_db()
    now = int(time.time())

    # Update access time (committed explicitly on the error paths below;
    # the success path defers the commit via g.db)
    db.execute("UPDATE pastes SET last_accessed = ? WHERE id = ?", (now, paste_id))

    row = db.execute(
        """SELECT id, content, mime_type, owner, created_at,
               length(content) as size, burn_after_read, expires_at,
               password_hash, display_name
           FROM pastes WHERE id = ?""",
        (paste_id,),
    ).fetchone()

    if row is None:
        # TIMING-001: Perform dummy password verification to prevent timing-based
        # enumeration (attacker can't distinguish "not found" from "wrong password"
        # by measuring response time)
        if check_password:
            dummy_hash = (
                "$pbkdf2-sha256$600000$"
                "0000000000000000000000000000000000000000000000000000000000000000$"
                "0000000000000000000000000000000000000000000000000000000000000000"
            )
            verify_password("dummy", dummy_hash)
        db.commit()
        return error_response("Paste not found", 404)

    # Password verification
    if check_password and row["password_hash"]:
        provided = request.headers.get("X-Paste-Password", "")
        if not provided:
            db.commit()
            return error_response("Password required", 401, password_protected=True)
        if not verify_password(provided, row["password_hash"]):
            db.commit()
            return error_response("Invalid password", 403)

    # Success: expose the row and open handle for the caller.
    g.paste = row
    g.db = db
    return None
|
|
|
|
|
|
def generate_short_id() -> str:
    """Produce a cryptographically random base62 ID of SHORT_ID_LENGTH chars."""
    n = current_app.config["SHORT_ID_LENGTH"]
    chars = [secrets.choice(SHORT_ID_ALPHABET) for _ in range(n)]
    return "".join(chars)
|
|
|
|
|
|
def validate_short_id(short_id: str) -> Response | None:
    """Check short URL ID length and alphanumeric format; 400 response when invalid."""
    required_len = current_app.config["SHORT_ID_LENGTH"]
    if len(short_id) == required_len and SHORT_ID_PATTERN.match(short_id):
        return None
    return error_response("Invalid short URL ID", 400)
|
|
|
|
|
|
def validate_target_url(url: str) -> Response | None:
    """Validate a URL for shortening: length cap, http(s) scheme, non-empty host.

    Returns an error Response describing the first failed check, or None
    when the URL is acceptable.
    """
    limit = current_app.config["SHORT_URL_MAX_LENGTH"]
    if len(url) > limit:
        return error_response("URL too long", 400, max_length=limit, length=len(url))

    parts = urlparse(url)
    if parts.scheme not in ALLOWED_URL_SCHEMES:
        return error_response("Invalid URL scheme", 400, allowed=list(ALLOWED_URL_SCHEMES))
    if not parts.netloc:
        return error_response("Invalid URL: missing host", 400)
    return None
|
|
|
|
|
|
def fetch_short_url(short_id: str, increment_counter: bool = True) -> Response | None:
    """Fetch short URL and store in g.short_url. Returns error response or None if OK.

    Args:
        short_id: Short URL identifier (format-validated separately).
        increment_counter: When True, bump access_count alongside
            last_accessed (pass False for info-only lookups).

    Returns:
        None on success (row in ``g.short_url``), otherwise an error
        Response: 429 (lookup rate limit) or 404 (missing or expired).
    """
    # Trusted certificate holders are exempt from lookup rate limiting
    if not get_client_id():
        client_ip = get_client_ip()
        allowed, retry_after = check_lookup_rate_limit(client_ip)
        if not allowed:
            response = error_response(
                f"Lookup rate limit exceeded. Retry after {retry_after} seconds.",
                429,
                retry_after=retry_after,
            )
            # Standard header so well-behaved clients know when to retry.
            response.headers["Retry-After"] = str(retry_after)
            return response

    db = get_db()
    now = int(time.time())

    # Touch access bookkeeping before the read; the counter bump is optional.
    if increment_counter:
        db.execute(
            "UPDATE short_urls SET last_accessed = ?, access_count = access_count + 1 WHERE id = ?",
            (now, short_id),
        )
    else:
        db.execute(
            "UPDATE short_urls SET last_accessed = ? WHERE id = ?",
            (now, short_id),
        )

    row = db.execute(
        """SELECT id, target_url, owner, created_at, last_accessed,
               access_count, expires_at
           FROM short_urls WHERE id = ?""",
        (short_id,),
    ).fetchone()

    if row is None:
        db.commit()
        return error_response("Short URL not found", 404)

    # Check expiry — lazy deletion: an expired row is removed on first access
    if row["expires_at"] and row["expires_at"] < now:
        db.execute("DELETE FROM short_urls WHERE id = ?", (short_id,))
        db.commit()
        return error_response("Short URL expired", 404)

    db.commit()
    g.short_url = row
    return None
|
|
|
|
|
|
def require_auth() -> Response | None:
    """Require a client certificate fingerprint and stash it in g.client_id.

    Uses get_client_fingerprint() so both trusted and untrusted certificate
    holders can manage their own pastes.

    Returns a 401 response when no fingerprint is present, otherwise None.
    """
    fingerprint = get_client_fingerprint()
    if fingerprint:
        g.client_id = fingerprint
        return None
    return error_response("Authentication required", 401)
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Authentication & Security
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def is_trusted_proxy() -> bool:
    """Whether the request comes from the trusted reverse proxy.

    Compares X-Proxy-Secret against TRUSTED_PROXY_SECRET in constant time;
    an empty configured secret trusts every caller. The verdict is cached
    on Flask's g for the remainder of the request.
    """
    cached = getattr(g, "_trusted_proxy", None)
    if cached is not None:
        return bool(cached)

    expected = current_app.config.get("TRUSTED_PROXY_SECRET", "")
    if not expected:
        g._trusted_proxy = True
        return True

    provided = request.headers.get("X-Proxy-Secret", "")
    g._trusted_proxy = hmac.compare_digest(expected, provided)
    return bool(g._trusted_proxy)
|
|
|
|
|
|
def get_client_fingerprint() -> str | None:
    """SHA-1 fingerprint of the client certificate, for identity/ownership.

    Certificate trust status is not checked here: any syntactically valid
    fingerprint delivered by the trusted proxy is returned, so users can
    delete/update/list their own pastes.

    Returns None when the proxy is untrusted or the X-SSL-Client-SHA1
    header is missing/malformed.
    """
    if not is_trusted_proxy():
        return None

    candidate = request.headers.get("X-SSL-Client-SHA1", "").strip().lower()
    if candidate and CLIENT_ID_PATTERN.match(candidate):
        return candidate
    return None
|
|
|
|
|
|
def get_client_id() -> str | None:
    """Fingerprint of a *valid* client certificate, for elevated privileges.

    Grants rate-limit and size-limit benefits to trusted users. When PKI is
    enabled, revoked or expired certificates are rejected (with a warning
    log and an audit event); such holders can still manage their own pastes
    via get_client_fingerprint().
    """
    fingerprint = get_client_fingerprint()
    if not fingerprint:
        return None

    if not current_app.config.get("PKI_ENABLED"):
        return fingerprint

    from app.pki import is_certificate_valid

    if is_certificate_valid(fingerprint):
        return fingerprint

    # Revoked/expired: log, audit, and deny elevated privileges.
    current_app.logger.warning(
        "Elevated auth rejected (revoked/expired): %s", fingerprint[:12] + "..."
    )
    log_event(
        AuditEvent.AUTH_FAILURE,
        AuditOutcome.BLOCKED,
        client_id=fingerprint,
        client_ip=get_client_ip(),
        details={"reason": "revoked_or_expired"},
    )
    return None
|
|
|
|
|
|
def is_admin() -> bool:
    """True only when the requester holds a valid certificate that the PKI
    database marks as admin.

    Requires PKI_ENABLED; anonymous or invalid-certificate requests are
    never admin.
    """
    client_id = get_client_id()
    if not client_id or not current_app.config.get("PKI_ENABLED"):
        return False

    from app.pki import is_admin_certificate

    return is_admin_certificate(client_id)
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Proof-of-Work
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def get_pow_secret() -> bytes:
    """Return the PoW HMAC signing secret from app config, as bytes."""
    configured: str = current_app.config["POW_SECRET"]
    return configured.encode()
|
|
|
|
|
|
def generate_challenge(difficulty_override: int | None = None) -> dict[str, Any]:
    """Create a signed proof-of-work challenge.

    Difficulty comes from the anti-flood-adjusted dynamic value unless
    *difficulty_override* pins it (used for registration). The returned
    token is "nonce:expires:difficulty:hmac" and is validated later by
    verify_pow using the PoW secret.

    Args:
        difficulty_override: Optional fixed difficulty.
    """
    difficulty = (
        difficulty_override
        if difficulty_override is not None
        else get_dynamic_difficulty()
    )
    ttl = current_app.config["POW_CHALLENGE_TTL"]
    expires = int(time.time()) + ttl
    nonce = secrets.token_hex(16)

    payload = f"{nonce}:{expires}:{difficulty}".encode()
    signature = hmac.new(get_pow_secret(), payload, hashlib.sha256).hexdigest()

    return {
        "nonce": nonce,
        "difficulty": difficulty,
        "expires": expires,
        "token": f"{nonce}:{expires}:{difficulty}:{signature}",
    }
|
|
|
|
|
|
def verify_pow(token: str, solution: str, min_difficulty: int | None = None) -> tuple[bool, str]:
    """Validate a proof-of-work solution against a signed challenge token.

    The token embeds its own difficulty (possibly elevated by anti-flood);
    the solution must satisfy that embedded difficulty, and the embedded
    difficulty must be at least *min_difficulty*.

    Args:
        token: Challenge token "nonce:expires:difficulty:signature".
        solution: Candidate solution (non-negative integer string).
        min_difficulty: Minimum acceptable token difficulty (defaults to
            POW_DIFFICULTY).

    Returns:
        (valid, error_message); error_message is "" on success.
    """
    base = current_app.config["POW_DIFFICULTY"]
    if base == 0 and min_difficulty is None:
        return True, ""

    required = base if min_difficulty is None else min_difficulty
    if required == 0:
        return True, ""

    # Parse "nonce:expires:difficulty:signature" (wrong field count raises too).
    try:
        nonce, expires_str, diff_str, sig = token.split(":")
        expires = int(expires_str)
        token_diff = int(diff_str)
    except (ValueError, TypeError):
        return False, "Invalid challenge format"

    # The signature must match what we would have issued for these fields.
    payload = f"{nonce}:{expires}:{token_diff}".encode()
    expected = hmac.new(get_pow_secret(), payload, hashlib.sha256).hexdigest()
    if not hmac.compare_digest(sig, expected):
        return False, "Invalid challenge signature"

    if int(time.time()) > expires:
        return False, "Challenge expired"

    # The token's embedded difficulty must satisfy the caller's minimum.
    if token_diff < required:
        return False, f"Difficulty too low: {token_diff} < {required}"

    # Solution must be a non-negative integer.
    try:
        if int(solution) < 0:
            return False, "Invalid solution"
    except (ValueError, TypeError):
        return False, "Invalid solution"

    # Count leading zero bits of sha256(nonce:solution); the check is against
    # the token's embedded difficulty, not the current dynamic difficulty.
    digest = hashlib.sha256(f"{nonce}:{solution}".encode()).digest()
    leading_zero_bits = 0
    for byte in digest:
        if byte:
            leading_zero_bits += 8 - byte.bit_length()
            break
        leading_zero_bits += 8

    if leading_zero_bits < token_diff:
        return False, f"Insufficient work: {leading_zero_bits} < {token_diff} bits"

    return True, ""
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Content Processing
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def calculate_entropy(data: bytes) -> float:
    """Return the Shannon entropy of *data* in bits per byte (0.0 to 8.0).

    Empty input has zero entropy by definition.
    """
    if not data:
        return 0.0

    # Byte-value histogram.
    counts = [0] * 256
    for b in data:
        counts[b] += 1

    total = len(data)
    probabilities = (c / total for c in counts if c)
    return -sum(p * math.log2(p) for p in probabilities)
|
|
|
|
|
|
def detect_mime_type(content: bytes, content_type: str | None = None) -> str:
    """Resolve the MIME type to record for *content*.

    A specific, well-formed declared Content-Type wins; generic types
    (octet-stream, form-urlencoded, text/plain) are ignored. Failing that,
    content that decodes as UTF-8 is text/plain and anything else is
    application/octet-stream. Security headers (X-Content-Type-Options:
    nosniff, CSP) guard against browser MIME sniffing downstream.
    """
    if content_type:
        declared = content_type.split(";")[0].strip().lower()
        if declared not in GENERIC_MIME_TYPES and MIME_PATTERN.match(declared):
            return declared

    # Fall back to a simple text/binary split.
    try:
        content.decode("utf-8")
    except UnicodeDecodeError:
        return "application/octet-stream"
    return "text/plain"
|
|
|
|
|
|
def is_recognizable_format(content: bytes) -> tuple[bool, str | None]:
|
|
"""Check if content is a recognizable (likely unencrypted) format.
|
|
|
|
Returns (is_recognizable, detected_format).
|
|
Used to enforce encryption by rejecting known formats.
|
|
|
|
Simple approach: valid UTF-8 text is recognizable (plaintext).
|
|
Binary content is considered potentially encrypted (not recognizable).
|
|
"""
|
|
# Check if valid UTF-8 text (plaintext)
|
|
try:
|
|
content.decode("utf-8")
|
|
return True, "text/plain"
|
|
except UnicodeDecodeError:
|
|
pass
|
|
|
|
return False, None
|
|
|
|
|
|
def generate_paste_id(content: bytes) -> str:
    """Derive a paste ID by hashing content together with a nanosecond timestamp."""
    salted = content + str(time.time_ns()).encode()
    digest = hashlib.sha256(salted).hexdigest()
    return digest[: current_app.config["PASTE_ID_LENGTH"]]
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Class-Based Views
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
class IndexView(MethodView):
    """Handle API info (GET) and paste creation (POST) at the API root."""

    def get(self) -> Response:
        """Return API information and usage examples.

        The response advertises all endpoints, authentication tiers,
        size/rate limits, current PoW difficulty, and curl/CLI examples.
        """
        url = base_url()
        difficulty = get_dynamic_difficulty()
        pki_enabled = current_app.config.get("PKI_ENABLED", False)

        # Build endpoints dict
        endpoints: dict[str, str] = {
            f"GET {prefixed_url('/')}": "API information",
            f"GET {prefixed_url('/health')}": "Health check",
            f"GET {prefixed_url('/client')}": "Download CLI client (fpaste)",
            f"GET {prefixed_url('/challenge')}": "Get proof-of-work challenge",
            f"POST {prefixed_url('/')}": "Create paste (PoW required unless trusted cert)",
            f"GET {prefixed_url('/pastes')}": "List your pastes (cert required)",
            f"GET {prefixed_url('/<id>')}": "Get paste metadata",
            f"GET {prefixed_url('/<id>/raw')}": "Get raw paste content",
            f"PUT {prefixed_url('/<id>')}": "Update paste (owner only)",
            f"DELETE {prefixed_url('/<id>')}": "Delete paste (owner only)",
            f"GET {prefixed_url('/register/challenge')}": "Get registration challenge",
            f"POST {prefixed_url('/register')}": "Register for client certificate",
            f"POST {prefixed_url('/s')}": "Create short URL (PoW required unless trusted cert)",
            f"GET {prefixed_url('/s')}": "List your short URLs (cert required)",
            f"GET {prefixed_url('/s/<id>')}": "Redirect to target URL",
            f"GET {prefixed_url('/s/<id>/info')}": "Short URL metadata",
            f"DELETE {prefixed_url('/s/<id>')}": "Delete short URL (owner only)",
        }

        # PKI management endpoints are only advertised when PKI is enabled.
        if pki_enabled:
            endpoints.update(
                {
                    f"GET {prefixed_url('/pki')}": "PKI status",
                    f"GET {prefixed_url('/pki/ca.crt')}": "Download CA certificate",
                    f"POST {prefixed_url('/pki/issue')}": "Issue client certificate",
                    f"GET {prefixed_url('/pki/certs')}": "List certificates",
                    f"POST {prefixed_url('/pki/revoke/<serial>')}": "Revoke certificate",
                }
            )

        # Build response
        response_data: dict[str, Any] = {
            "version": VERSION,
            "endpoints": endpoints,
            "authentication": {
                "anonymous": "Create pastes only (strict limits)",
                "client_cert": "Create + manage own pastes (strict limits)",
                "trusted_cert": "All operations (no rate limits)",
            },
            "limits": {
                "anonymous": {
                    "max_size": current_app.config["MAX_PASTE_SIZE_ANON"],
                    "rate": f"{current_app.config['RATE_LIMIT_MAX']}/min",
                },
                "trusted": {
                    "max_size": current_app.config["MAX_PASTE_SIZE_AUTH"],
                    "rate": "unlimited",
                },
            },
            "pow": {
                "enabled": difficulty > 0,
                "difficulty": difficulty,
                "hint": "GET /challenge, solve, submit with X-PoW-Token + X-PoW-Solution",
            },
            "cli": {
                "install": f"curl -o fpaste {url}/client && chmod +x fpaste",
                "usage": "fpaste file.txt # encrypts by default",
            },
            "usage": {
                "create": f"curl -X POST --data-binary @file.txt {url}/ (with PoW headers)",
                "get": f"curl {url}/<id>/raw",
                "delete": f"curl -X DELETE {url}/<id> (with X-SSL-Client-SHA1)",
            },
        }

        return json_response(response_data)

    def post(self) -> Response:
        """Create a new paste.

        Pipeline (order matters — cheap checks reject early):
        1. Parse content (JSON body or raw bytes) and detect MIME type.
        2. Rate limiting (trusted certs exempt).
        3. Proof-of-work verification (trusted certs exempt).
        4. Size limits, minimum size, entropy, and recognizable-format
           checks (the latter three enforce client-side encryption).
        5. Content-hash deduplication throttle.
        6. Parse optional headers (burn-after-read, expiry, password,
           display name), insert the row, and build the 201 response.
        """
        # Parse content
        content: bytes | None = None
        mime_type: str | None = None

        if request.is_json:
            data = request.get_json(silent=True)
            if data and isinstance(data.get("content"), str):
                content = data["content"].encode("utf-8")
                mime_type = "text/plain"
        else:
            content = request.get_data(as_text=False)
            if content:
                mime_type = detect_mime_type(content, request.content_type)

        if not content:
            return error_response("No content provided", 400)

        # Separate trusted (for limits) from fingerprint (for ownership)
        trusted_client = get_client_id()  # Only trusted certs get elevated limits
        owner = get_client_fingerprint()  # Any cert can own pastes

        # Rate limiting (trusted certs exempt)
        client_ip = get_client_ip()
        if not trusted_client:
            allowed, remaining, limit, reset_timestamp = check_rate_limit(
                client_ip, authenticated=False
            )

            # Store rate limit info for response headers
            g.rate_limit_remaining = remaining
            g.rate_limit_limit = limit
            g.rate_limit_reset = reset_timestamp

            if not allowed:
                current_app.logger.warning("Rate limit exceeded: ip=%s", client_ip)
                # Audit log rate limit event
                if current_app.config.get("AUDIT_ENABLED", True):
                    from app.audit import AuditEvent, AuditOutcome, log_event

                    log_event(
                        AuditEvent.RATE_LIMIT,
                        AuditOutcome.BLOCKED,
                        client_id=owner,
                        client_ip=client_ip,
                    )
                record_rate_limit("blocked")
                # Retry-After is clamped to at least 1 second.
                retry_after = max(1, reset_timestamp - int(time.time()))
                response = error_response(
                    "Rate limit exceeded",
                    429,
                    retry_after=retry_after,
                )
                response.headers["Retry-After"] = str(retry_after)
                add_rate_limit_headers(response, 0, limit, reset_timestamp)
                return response

        # Proof-of-work verification (trusted certs exempt)
        difficulty = current_app.config["POW_DIFFICULTY"]
        if difficulty > 0 and not trusted_client:
            token = request.headers.get("X-PoW-Token", "")
            solution = request.headers.get("X-PoW-Solution", "")

            if not token or not solution:
                return error_response(
                    "Proof-of-work required", 400, hint="GET /challenge for a new challenge"
                )

            valid, err = verify_pow(token, solution)
            if not valid:
                current_app.logger.warning(
                    "PoW verification failed: %s from=%s", err, request.remote_addr
                )
                # Audit log PoW failure
                if current_app.config.get("AUDIT_ENABLED", True):
                    from app.audit import AuditEvent, AuditOutcome, log_event

                    log_event(
                        AuditEvent.POW_FAILURE,
                        AuditOutcome.FAILURE,
                        client_id=owner,
                        client_ip=client_ip,
                        details={"error": err},
                    )
                record_pow("failure")
                return error_response(f"Proof-of-work failed: {err}", 400)

            record_pow("success")

        # Size limits (only trusted clients get elevated limits)
        content_size = len(content)
        max_size = (
            current_app.config["MAX_PASTE_SIZE_AUTH"]
            if trusted_client
            else current_app.config["MAX_PASTE_SIZE_ANON"]
        )

        if content_size > max_size:
            return error_response(
                "Paste too large",
                413,
                size=content_size,
                max_size=max_size,
                trusted=trusted_client is not None,
            )

        # Minimum size check (enforces encryption overhead)
        min_size = current_app.config.get("MIN_PASTE_SIZE", 0)
        if min_size > 0 and content_size < min_size:
            return error_response(
                "Paste too small",
                400,
                size=content_size,
                min_size=min_size,
                hint="Encrypt content before uploading (fpaste encrypts by default)",
            )

        # Entropy check: low-entropy payloads are unlikely to be ciphertext.
        # Only applied above MIN_ENTROPY_SIZE since small samples are noisy.
        min_entropy = current_app.config.get("MIN_ENTROPY", 0)
        min_entropy_size = current_app.config.get("MIN_ENTROPY_SIZE", 256)
        if min_entropy > 0 and content_size >= min_entropy_size:
            entropy = calculate_entropy(content)
            if entropy < min_entropy:
                current_app.logger.warning(
                    "Low entropy rejected: %.2f < %.2f from=%s",
                    entropy,
                    min_entropy,
                    request.remote_addr,
                )
                return error_response(
                    "Content entropy too low",
                    400,
                    entropy=round(entropy, 2),
                    min_entropy=min_entropy,
                    hint="Encrypt content before uploading (fpaste encrypts by default)",
                )

        # Binary content requirement (reject recognizable formats)
        if current_app.config.get("REQUIRE_BINARY", False):
            is_recognized, detected_format = is_recognizable_format(content)
            if is_recognized:
                current_app.logger.warning(
                    "Recognizable format rejected: %s from=%s",
                    detected_format,
                    request.remote_addr,
                )
                return error_response(
                    "Recognizable format not allowed",
                    400,
                    detected=detected_format,
                    hint="Encrypt content before uploading (fpaste encrypts by default)",
                )

        # Deduplication check: throttle repeated uploads of identical bytes.
        content_hash = hashlib.sha256(content).hexdigest()
        is_allowed, dedup_count = check_content_hash(content_hash)

        if not is_allowed:
            window = current_app.config["CONTENT_DEDUP_WINDOW"]
            current_app.logger.warning(
                "Dedup threshold exceeded: hash=%s count=%d from=%s",
                content_hash[:16],
                dedup_count,
                request.remote_addr,
            )
            # Audit log dedup block
            if current_app.config.get("AUDIT_ENABLED", True):
                from app.audit import AuditEvent, AuditOutcome, log_event

                log_event(
                    AuditEvent.DEDUP_BLOCK,
                    AuditOutcome.BLOCKED,
                    client_id=owner,
                    client_ip=client_ip,
                    details={"hash": content_hash[:16], "count": dedup_count},
                )
            record_dedup("blocked")
            return error_response(
                "Duplicate content rate limit exceeded",
                429,
                count=dedup_count,
                window_seconds=window,
            )

        record_dedup("allowed")

        # Parse optional headers
        burn_header = request.headers.get("X-Burn-After-Read", "").strip().lower()
        burn_after_read = burn_header in ("true", "1", "yes")

        # Determine default expiry based on authentication level
        # Anonymous < Untrusted cert < Trusted cert (registered in PKI)
        if owner is None:
            # Anonymous user
            default_expiry = current_app.config.get("EXPIRY_ANON", 86400)
        elif trusted_client:
            # Trusted certificate (registered in PKI)
            # NOTE(review): trusted_client already implies a trusted cert per
            # get_client_id()'s comment above, yet is_trusted_certificate() is
            # checked again here — presumably the two notions of "trusted"
            # differ; confirm against app.pki before simplifying.
            from app.pki import is_trusted_certificate

            if is_trusted_certificate(owner):
                default_expiry = current_app.config.get("EXPIRY_TRUSTED", 2592000)
            else:
                default_expiry = current_app.config.get("EXPIRY_UNTRUSTED", 604800)
        else:
            # Has cert but not trusted
            default_expiry = current_app.config.get("EXPIRY_UNTRUSTED", 604800)

        expires_at = None
        expiry_header = request.headers.get("X-Expiry", "").strip()
        if expiry_header:
            try:
                expiry_seconds = int(expiry_header)
                if expiry_seconds > 0:
                    # Requested expiry is silently capped at MAX_EXPIRY_SECONDS.
                    max_expiry = current_app.config.get("MAX_EXPIRY_SECONDS", 0)
                    if max_expiry > 0:
                        expiry_seconds = min(expiry_seconds, max_expiry)
                    expires_at = int(time.time()) + expiry_seconds
            except ValueError:
                # Malformed X-Expiry is ignored; the default expiry applies.
                pass

        # Apply default expiry if none specified (0 = no expiry for trusted)
        if expires_at is None and default_expiry > 0:
            expires_at = int(time.time()) + default_expiry

        password_hash = None
        password_header = request.headers.get("X-Paste-Password", "")
        if password_header:
            if len(password_header) > 1024:
                return error_response("Password too long (max 1024 chars)", 400)
            password_hash = hash_password(password_header)

        # Display name (authenticated users only, silently ignored for anonymous)
        display_name: str | None = None
        display_name_header = request.headers.get("X-Display-Name", "").strip()
        if display_name_header and owner:
            # Max 128 chars, control characters rejected.
            if len(display_name_header) > 128:
                return error_response("Display name too long (max 128 chars)", 400)
            if CONTROL_CHAR_PATTERN.search(display_name_header):
                return error_response("Display name contains invalid characters", 400)
            display_name = display_name_header

        # Insert paste
        paste_id = generate_paste_id(content)
        now = int(time.time())

        db = get_db()
        db.execute(
            """INSERT INTO pastes
               (id, content, mime_type, owner, created_at, last_accessed,
                burn_after_read, expires_at, password_hash, display_name)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                paste_id,
                content,
                mime_type,
                owner,
                now,
                now,
                1 if burn_after_read else 0,
                expires_at,
                password_hash,
                display_name,
            ),
        )
        db.commit()

        # Build response (creation response is intentionally different - no size)
        response_data: dict[str, Any] = {
            "id": paste_id,
            "url": paste_url(paste_id),
            "raw": paste_raw_url(paste_id),
            "mime_type": mime_type,
            "created_at": now,
        }
        # Optional fields are included only when set.
        if owner:
            response_data["owner"] = owner
        if burn_after_read:
            response_data["burn_after_read"] = True
        if expires_at:
            response_data["expires_at"] = expires_at
        if password_hash:
            response_data["password_protected"] = True
        if display_name:
            response_data["display_name"] = display_name

        # Record successful paste for anti-flood tracking
        record_antiflood_request()

        # Audit log paste creation
        if current_app.config.get("AUDIT_ENABLED", True):
            from app.audit import AuditEvent, AuditOutcome, log_event

            log_event(
                AuditEvent.PASTE_CREATE,
                AuditOutcome.SUCCESS,
                paste_id=paste_id,
                client_id=owner,
                client_ip=client_ip,
                details={"size": content_size, "mime_type": mime_type},
            )

        record_paste_created("authenticated" if owner else "anonymous", "success")

        response = json_response(response_data, 201)

        # Add rate limit headers to successful response
        # (defaults mean "no limit info" when the trusted path skipped limiting).
        rl_remaining = getattr(g, "rate_limit_remaining", -1)
        rl_limit = getattr(g, "rate_limit_limit", -1)
        rl_reset = getattr(g, "rate_limit_reset", 0)
        if rl_limit > 0:
            add_rate_limit_headers(response, rl_remaining, rl_limit, rl_reset)

        return response
|
|
|
|
|
|
class HealthView(MethodView):
    """Health check endpoint."""

    def get(self) -> Response:
        """Report service health, probing the database with a trivial query."""
        try:
            get_db().execute("SELECT 1")
            return json_response({"status": "healthy", "database": "ok"})
        except Exception:
            # Any failure reaching the database means we are unhealthy;
            # 503 lets load balancers / probes take the instance out.
            return json_response({"status": "unhealthy", "database": "error"}, 503)
|
|
|
|
|
|
class ChallengeView(MethodView):
    """Proof-of-work challenge endpoint."""

    def get(self) -> Response:
        """Issue a fresh PoW challenge, or report PoW as disabled."""
        configured = current_app.config["POW_DIFFICULTY"]
        if configured == 0:
            # PoW is switched off entirely.
            return json_response({"enabled": False, "difficulty": 0})

        challenge = generate_challenge()
        payload: dict[str, Any] = {
            "enabled": True,
            "nonce": challenge["nonce"],
            "difficulty": challenge["difficulty"],
            "expires": challenge["expires"],
            "token": challenge["token"],
        }
        # Anti-flood protection may raise difficulty above the configured
        # base; surface that so clients know why the challenge is harder.
        if challenge["difficulty"] > configured:
            payload["elevated"] = True
            payload["base_difficulty"] = configured
        return json_response(payload)
|
|
|
|
|
|
class RegisterChallengeView(MethodView):
    """Registration PoW challenge endpoint (higher difficulty)."""

    def get(self) -> Response:
        """Issue a registration PoW challenge at the elevated difficulty."""
        difficulty = current_app.config.get("REGISTER_POW_DIFFICULTY", 24)
        if difficulty == 0:
            # Registration PoW disabled.
            return json_response({"enabled": False, "difficulty": 0})

        challenge = generate_challenge(difficulty_override=difficulty)
        payload = {
            "enabled": True,
            "nonce": challenge["nonce"],
            "difficulty": challenge["difficulty"],
            "expires": challenge["expires"],
            "token": challenge["token"],
            "purpose": "registration",
        }
        return json_response(payload)
|
|
|
|
|
|
class RegisterView(MethodView):
    """Public client certificate registration endpoint."""

    def post(self) -> Response:
        """Register and obtain a client certificate.

        Requires PoW to prevent abuse. Returns PKCS#12 bundle with:
        - Client certificate
        - Client private key
        - CA certificate

        Auto-generates CA if not present and PKI_CA_PASSWORD is configured.

        Flow: config check -> PoW check -> parse common name ->
        ensure CA exists -> issue cert -> bundle as passwordless PKCS#12.
        """
        # Heavy crypto imports are deferred to request time so the module
        # loads without the cryptography package being exercised.
        from cryptography import x509
        from cryptography.hazmat.primitives import serialization

        from app.pki import (
            CANotFoundError,
            PKIError,
            create_pkcs12,
            generate_ca,
            get_ca_info,
            issue_certificate,
        )

        # Check PKI configuration
        password = current_app.config.get("PKI_CA_PASSWORD", "")
        if not password:
            return error_response(
                "Registration not available",
                503,
                hint="PKI_CA_PASSWORD not configured",
            )

        # Verify PoW (registration uses its own, higher difficulty)
        register_difficulty = current_app.config.get("REGISTER_POW_DIFFICULTY", 24)
        if register_difficulty > 0:
            token = request.headers.get("X-PoW-Token", "")
            solution = request.headers.get("X-PoW-Solution", "")

            if not token or not solution:
                return error_response(
                    "Proof-of-work required",
                    400,
                    hint="GET /register/challenge for a registration challenge",
                    difficulty=register_difficulty,
                )

            valid, err = verify_pow(token, solution, min_difficulty=register_difficulty)
            if not valid:
                current_app.logger.warning(
                    "Registration PoW failed: %s from=%s", err, request.remote_addr
                )
                return error_response(f"Proof-of-work failed: {err}", 400)

        # Parse common_name from request (optional; truncated to 64 chars)
        common_name = None
        if request.is_json:
            data = request.get_json(silent=True)
            if data and isinstance(data.get("common_name"), str):
                common_name = data["common_name"][:64].strip()

        if not common_name:
            # Generate random common name if not provided
            common_name = f"client-{secrets.token_hex(4)}"

        # Auto-generate CA if needed (skip PKI_ENABLED check for registration)
        ca_info = get_ca_info(skip_enabled_check=True)
        if ca_info is None:
            ca_days = current_app.config.get("PKI_CA_DAYS", 3650)
            try:
                ca_info = generate_ca("FlaskPaste CA", password, days=ca_days)
                current_app.logger.info(
                    "CA auto-generated for registration: fingerprint=%s",
                    ca_info["fingerprint_sha1"][:12],
                )
            except PKIError as e:
                current_app.logger.error("CA auto-generation failed: %s", e)
                return error_response("CA generation failed", 500)

        # Issue certificate
        try:
            cert_days = current_app.config.get("PKI_CERT_DAYS", 365)
            cert_info = issue_certificate(common_name, password, days=cert_days)
        except CANotFoundError:
            return error_response("CA not available", 500)
        except PKIError as e:
            current_app.logger.error("Certificate issuance failed: %s", e)
            return error_response("Certificate issuance failed", 500)

        # Load CA cert for PKCS#12 (reuse ca_info from above, or refresh if it was just generated)
        if ca_info is None or "certificate_pem" not in ca_info:
            ca_info = get_ca_info(skip_enabled_check=True)
            # NOTE(review): assert is stripped under `python -O`; if a missing
            # CA here is actually possible, raise explicitly instead — confirm.
            assert ca_info is not None  # CA was just generated or exists
        ca_cert = x509.load_pem_x509_certificate(ca_info["certificate_pem"].encode())
        client_cert = x509.load_pem_x509_certificate(cert_info["certificate_pem"].encode())
        client_key = serialization.load_pem_private_key(
            cert_info["private_key_pem"].encode(), password=None
        )

        # Create PKCS#12 bundle (no password for easy import)
        p12_data = create_pkcs12(
            private_key=client_key,
            certificate=client_cert,
            ca_certificate=ca_cert,
            friendly_name=common_name,
            password=None,
        )

        current_app.logger.info(
            "Client registered: cn=%s fingerprint=%s from=%s",
            common_name,
            cert_info["fingerprint_sha1"][:12],
            request.remote_addr,
        )
        log_event(
            AuditEvent.CERT_ISSUED,
            AuditOutcome.SUCCESS,
            client_id=cert_info["fingerprint_sha1"],
            client_ip=request.remote_addr,
            details={
                "type": "registration",
                "common_name": common_name,
                "expires_at": cert_info["expires_at"],
            },
        )

        # Return PKCS#12 as binary download; fingerprint/expiry/admin status
        # are exposed as headers so the client can record them.
        response = Response(p12_data, mimetype="application/x-pkcs12")
        response.headers["Content-Disposition"] = f'attachment; filename="{common_name}.p12"'
        response.headers["X-Fingerprint-SHA1"] = cert_info["fingerprint_sha1"]
        response.headers["X-Certificate-Expires"] = str(cert_info["expires_at"])
        response.headers["X-Is-Admin"] = "1" if cert_info.get("is_admin") else "0"
        return response
|
|
|
|
|
|
class ClientView(MethodView):
    """CLI client download endpoint."""

    def get(self) -> Response:
        """Serve the fpaste CLI script with this server's URL baked in."""
        import os

        server_url = base_url()
        script_path = os.path.join(current_app.root_path, "..", "fpaste")

        try:
            with open(script_path) as fh:
                script = fh.read()
        except FileNotFoundError:
            return error_response("Client not available", 404)

        # Point the script's default server at this deployment by rewriting
        # the known default-URL occurrences in the source text.
        script = script.replace(
            '"server": os.environ.get("FLASKPASTE_SERVER", "http://localhost:5000")',
            f'"server": os.environ.get("FLASKPASTE_SERVER", "{server_url}")',
        ).replace(
            "http://localhost:5000)",
            f"{server_url})",
        )

        response = Response(script, mimetype="text/x-python")
        response.headers["Content-Disposition"] = "attachment; filename=fpaste"
        return response
|
|
|
|
|
|
class PasteView(MethodView):
    """Paste metadata operations: GET/HEAD metadata and PUT updates."""

    def get(self, paste_id: str) -> Response:
        """Retrieve paste metadata (never the content itself)."""
        # Validate and fetch (fetch_paste stores the row on g.paste)
        if err := validate_paste_id(paste_id):
            return err
        if err := fetch_paste(paste_id):
            return err

        row: Row = g.paste
        g.db.commit()

        return json_response(
            build_paste_metadata(
                paste_id=row["id"],
                mime_type=row["mime_type"],
                size=row["size"],
                created_at=row["created_at"],
                burn_after_read=bool(row["burn_after_read"]),
                expires_at=row["expires_at"],
                password_protected=bool(row["password_hash"]),
                display_name=row["display_name"],
            )
        )

    def head(self, paste_id: str) -> Response:
        """Return paste metadata headers only (Flask strips the body)."""
        return self.get(paste_id)

    def put(self, paste_id: str) -> Response:
        """Update paste content and/or metadata.

        Requires authentication and ownership.

        Content update: Send raw body with Content-Type header
        Metadata update: Use headers with empty body

        Headers:
        - X-Paste-Password: Set/change password
        - X-Remove-Password: true to remove password
        - X-Extend-Expiry: Seconds to add to current expiry
        - X-Display-Name: Set/change display name
        - X-Remove-Display-Name: true to remove display name
        """
        # Validate paste ID format
        if err := validate_paste_id(paste_id):
            return err
        if err := require_auth():
            return err

        db = get_db()

        # Fetch only the columns this handler actually reads. The previous
        # version selected the full content BLOB (never used — request body
        # replaces it) and ran a second query just for burn_after_read.
        row = db.execute(
            """SELECT id, owner, expires_at, burn_after_read
               FROM pastes WHERE id = ?""",
            (paste_id,),
        ).fetchone()

        if row is None:
            return error_response("Paste not found", 404)

        if row["owner"] != g.client_id:
            return error_response("Permission denied", 403)

        # Burn-after-read pastes are immutable (cannot update)
        if row["burn_after_read"]:
            return error_response("Cannot update burn-after-read paste", 400)

        # Parse update parameters
        new_password = request.headers.get("X-Paste-Password", "").strip() or None
        remove_password = request.headers.get("X-Remove-Password", "").lower() in (
            "true",
            "1",
            "yes",
        )
        extend_expiry_str = request.headers.get("X-Extend-Expiry", "").strip()

        # Prepare update fields
        update_fields = []
        update_params: list[Any] = []

        # Content update (if body provided)
        content = request.get_data()
        if content:
            mime_type = request.content_type or "application/octet-stream"
            # Sanitize MIME type: fall back to octet-stream if malformed
            if not MIME_PATTERN.match(mime_type.split(";")[0].strip()):
                mime_type = "application/octet-stream"

            update_fields.append("content = ?")
            update_params.append(content)
            update_fields.append("mime_type = ?")
            update_params.append(mime_type.split(";")[0].strip())

        # Password update (removal wins over setting)
        if remove_password:
            update_fields.append("password_hash = NULL")
        elif new_password:
            update_fields.append("password_hash = ?")
            update_params.append(hash_password(new_password))

        # Expiry extension
        if extend_expiry_str:
            try:
                extend_seconds = int(extend_expiry_str)
                if extend_seconds > 0:
                    current_expiry = row["expires_at"]
                    if current_expiry:
                        new_expiry = current_expiry + extend_seconds
                    else:
                        # If no expiry set, create one from now
                        new_expiry = int(time.time()) + extend_seconds
                    update_fields.append("expires_at = ?")
                    update_params.append(new_expiry)
            except ValueError:
                return error_response("Invalid X-Extend-Expiry value", 400)

        # Display name update (removal wins over setting)
        remove_display_name = request.headers.get("X-Remove-Display-Name", "").lower() in (
            "true",
            "1",
            "yes",
        )
        new_display_name = request.headers.get("X-Display-Name", "").strip()
        if remove_display_name:
            update_fields.append("display_name = NULL")
        elif new_display_name:
            # Same validation as creation: max 128 chars, no control chars
            if len(new_display_name) > 128:
                return error_response("Display name too long (max 128 chars)", 400)
            if CONTROL_CHAR_PATTERN.search(new_display_name):
                return error_response("Display name contains invalid characters", 400)
            update_fields.append("display_name = ?")
            update_params.append(new_display_name)

        if not update_fields:
            return error_response("No updates provided", 400)

        # Execute update (fields are hardcoded strings, safe from injection)
        update_sql = f"UPDATE pastes SET {', '.join(update_fields)} WHERE id = ?"  # noqa: S608 # nosec B608
        update_params.append(paste_id)
        db.execute(update_sql, update_params)
        db.commit()

        # Audit log paste update
        if current_app.config.get("AUDIT_ENABLED", True):
            from app.audit import AuditEvent, AuditOutcome, log_event

            log_event(
                AuditEvent.PASTE_UPDATE,
                AuditOutcome.SUCCESS,
                paste_id=paste_id,
                client_id=g.client_id,
                client_ip=get_client_ip(),
                details={"fields": [f.split(" = ")[0] for f in update_fields]},
            )

        # Fetch updated paste for response
        updated = db.execute(
            """SELECT id, mime_type, length(content) as size, expires_at,
                      display_name,
                      CASE WHEN password_hash IS NOT NULL THEN 1 ELSE 0 END as password_protected
               FROM pastes WHERE id = ?""",
            (paste_id,),
        ).fetchone()

        response_data: dict[str, Any] = {
            "id": updated["id"],
            "size": updated["size"],
            "mime_type": updated["mime_type"],
        }
        # Optional fields are included only when set.
        if updated["expires_at"]:
            response_data["expires_at"] = updated["expires_at"]
        if updated["password_protected"]:
            response_data["password_protected"] = True
        if updated["display_name"]:
            response_data["display_name"] = updated["display_name"]

        return json_response(response_data)
|
|
|
|
|
|
class PasteRawView(MethodView):
    """Raw paste content retrieval (GET and HEAD)."""

    def get(self, paste_id: str) -> Response:
        """Retrieve raw paste content.

        For burn-after-read pastes the row is deleted BEFORE the response is
        built; the content already held in `row` is still served once.
        """
        # Validate and fetch (fetch_paste stores the row on g.paste)
        if err := validate_paste_id(paste_id):
            return err
        if err := fetch_paste(paste_id):
            return err

        row: Row = g.paste
        db = g.db

        burn_after_read = row["burn_after_read"]
        if burn_after_read:
            # Delete first so a concurrent/second fetch cannot read it again.
            db.execute("DELETE FROM pastes WHERE id = ?", (paste_id,))
            current_app.logger.info("Burn-after-read paste deleted: %s", paste_id)

        db.commit()

        # Audit log paste access
        if current_app.config.get("AUDIT_ENABLED", True):
            from app.audit import AuditEvent, AuditOutcome, log_event

            log_event(
                AuditEvent.PASTE_ACCESS,
                AuditOutcome.SUCCESS,
                paste_id=paste_id,
                client_id=get_client_fingerprint(),
                client_ip=get_client_ip(),
                details={"burn": bool(burn_after_read)},
            )

        record_paste_accessed(
            "authenticated" if get_client_fingerprint() else "anonymous",
            bool(burn_after_read),
        )

        response = Response(row["content"], mimetype=row["mime_type"])
        # Images and text render inline; everything else keeps the default
        # disposition (protected by nosniff/CSP headers elsewhere).
        if row["mime_type"].startswith(("image/", "text/")):
            response.headers["Content-Disposition"] = "inline"
        if burn_after_read:
            # Tell the client this was the only read.
            response.headers["X-Burn-After-Read"] = "true"

        return response

    def head(self, paste_id: str) -> Response:
        """Return raw paste headers. HEAD triggers burn-after-read deletion.

        Security note: HEAD requests count as paste access for burn-after-read
        to prevent attackers from probing paste existence before retrieval.
        """
        # Validate and fetch
        if err := validate_paste_id(paste_id):
            return err
        if err := fetch_paste(paste_id):
            return err

        row: Row = g.paste
        db = g.db

        # BURN-001: HEAD triggers burn-after-read like GET
        burn_after_read = row["burn_after_read"]
        if burn_after_read:
            db.execute("DELETE FROM pastes WHERE id = ?", (paste_id,))
            current_app.logger.info("Burn-after-read paste deleted via HEAD: %s", paste_id)

        db.commit()

        # Empty body; advertise the size the GET would have returned.
        response = Response(mimetype=row["mime_type"])
        response.headers["Content-Length"] = str(row["size"])
        if row["mime_type"].startswith(("image/", "text/")):
            response.headers["Content-Disposition"] = "inline"
        if burn_after_read:
            response.headers["X-Burn-After-Read"] = "true"

        return response
|
|
|
|
|
|
class PasteDeleteView(MethodView):
    """Paste deletion with authentication."""

    def delete(self, paste_id: str) -> Response:
        """Delete paste. Requires ownership or admin rights."""
        # Guard clauses: well-formed ID and an authenticated client.
        if err := validate_paste_id(paste_id):
            return err
        if err := require_auth():
            return err

        db = get_db()
        existing = db.execute(
            "SELECT owner FROM pastes WHERE id = ?", (paste_id,)
        ).fetchone()
        if existing is None:
            return error_response("Paste not found", 404)

        # Only the owning certificate or an admin may delete.
        is_owner = existing["owner"] == g.client_id
        if not (is_owner or is_admin()):
            return error_response("Permission denied", 403)

        db.execute("DELETE FROM pastes WHERE id = ?", (paste_id,))
        db.commit()

        # Audit log paste deletion
        if current_app.config.get("AUDIT_ENABLED", True):
            from app.audit import AuditEvent, AuditOutcome, log_event

            log_event(
                AuditEvent.PASTE_DELETE,
                AuditOutcome.SUCCESS,
                paste_id=paste_id,
                client_id=g.client_id,
                client_ip=get_client_ip(),
            )

        record_paste_deleted("authenticated", "success")
        return json_response({"message": "Paste deleted"})
|
|
|
|
|
|
class PastesListView(MethodView):
    """List pastes with authentication."""

    def get(self) -> Response:
        """List pastes owned by authenticated user, or all pastes for admins.

        Privacy guarantees:
        - Requires authentication (mTLS client certificate)
        - Regular users can ONLY see their own pastes
        - Admins can see all pastes (with optional owner filter)
        - Content is never returned, only metadata

        Query parameters:
        - limit: max results (default 50, clamped to 0..200)
        - offset: pagination offset (default 0)
        - type: filter by MIME type (glob pattern, e.g., "image/*")
        - after: filter by created_at >= timestamp
        - before: filter by created_at <= timestamp
        - all: (admin only) if "1", list all pastes instead of own
        - owner: (admin only) filter by owner fingerprint
        """
        import fnmatch

        # Strict authentication requirement
        if err := require_auth():
            return err

        client_id = g.client_id
        user_is_admin = is_admin()

        # Parse pagination parameters.  The lower bound on limit matters:
        # a negative value would reach SQLite as "LIMIT -1", which means
        # *unlimited* and would defeat the 200-row cap.
        try:
            limit = max(0, min(int(request.args.get("limit", 50)), 200))
            offset = max(int(request.args.get("offset", 0)), 0)
        except (ValueError, TypeError):
            limit, offset = 50, 0

        # Parse filter parameters (invalid timestamps fall back to 0 = unset)
        type_filter = request.args.get("type", "").strip()
        try:
            after_ts = int(request.args.get("after", 0))
        except (ValueError, TypeError):
            after_ts = 0
        try:
            before_ts = int(request.args.get("before", 0))
        except (ValueError, TypeError):
            before_ts = 0

        # Admin-only parameters (silently ignored for regular users)
        show_all = request.args.get("all", "0") == "1" and user_is_admin
        owner_filter = request.args.get("owner", "").strip() if user_is_admin else ""

        db = get_db()

        # Build query with filters
        where_clauses: list[str] = []
        params: list[Any] = []

        # Owner filtering logic
        if show_all:
            # Admin viewing all pastes (with optional owner filter)
            if owner_filter:
                where_clauses.append("owner = ?")
                params.append(owner_filter)
            # else: no owner filter, show all
        else:
            # Regular user or admin without ?all=1: show only own pastes
            where_clauses.append("owner = ?")
            params.append(client_id)

        if after_ts > 0:
            where_clauses.append("created_at >= ?")
            params.append(after_ts)
        if before_ts > 0:
            where_clauses.append("created_at <= ?")
            params.append(before_ts)

        # Build WHERE clause (may be empty for admin viewing all)
        where_sql = " AND ".join(where_clauses) if where_clauses else "1=1"

        # Count total pastes matching filters (where_sql is safe, built from constants)
        count_row = db.execute(
            f"SELECT COUNT(*) as total FROM pastes WHERE {where_sql}",  # noqa: S608 # nosec B608
            params,
        ).fetchone()
        total = count_row["total"] if count_row else 0

        # Fetch pastes with metadata only (where_sql is safe, built from constants)
        # Include owner for admin view
        rows = db.execute(
            f"""SELECT id, owner, mime_type, length(content) as size, created_at,
                       last_accessed, burn_after_read, expires_at, display_name,
                       CASE WHEN password_hash IS NOT NULL THEN 1 ELSE 0 END as password_protected
                FROM pastes
                WHERE {where_sql}
                ORDER BY created_at DESC
                LIMIT ? OFFSET ?""",  # noqa: S608 # nosec B608
            [*params, limit, offset],
        ).fetchall()

        # Apply MIME type filter (glob pattern matching done in Python for flexibility).
        # NOTE(review): this runs AFTER SQL pagination, so a filtered page may
        # contain fewer than `limit` rows and `total` still counts unfiltered
        # matches — confirm whether that trade-off is intended.
        if type_filter:
            rows = [r for r in rows if fnmatch.fnmatch(r["mime_type"], type_filter)]

        pastes = [
            build_paste_metadata(
                paste_id=row["id"],
                mime_type=row["mime_type"],
                size=row["size"],
                created_at=row["created_at"],
                owner=row["owner"],
                burn_after_read=bool(row["burn_after_read"]),
                expires_at=row["expires_at"],
                password_protected=bool(row["password_protected"]),
                include_owner=show_all,
                last_accessed=row["last_accessed"],
                display_name=row["display_name"],
            )
            for row in rows
        ]

        response_data: dict[str, Any] = {
            "pastes": pastes,
            "count": len(pastes),
            "total": total,
            "limit": limit,
            "offset": offset,
        }
        if user_is_admin:
            response_data["is_admin"] = True
        return json_response(response_data)
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# URL Shortener Views
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
class ShortURLCreateView(MethodView):
    """Create short URLs."""

    def post(self) -> Response:
        """Create a new short URL.

        Accepts either a JSON body ({"url": "..."}) or a raw text body whose
        content is the target URL.  Anonymous clients go through rate
        limiting and (when POW_DIFFICULTY > 0) proof-of-work; clients with a
        trusted certificate are exempt from both.  Duplicate targets within
        the dedup window are rejected with 429.

        Returns 201 with the short id, path, and target on success.
        """
        # Parse URL from request body: JSON takes precedence; otherwise the
        # raw request body (stripped) is treated as the URL itself.
        target_url: str | None = None

        if request.is_json:
            data = request.get_json(silent=True)
            if data and isinstance(data.get("url"), str):
                target_url = data["url"].strip()
        else:
            raw = request.get_data(as_text=True).strip()
            if raw:
                target_url = raw

        if not target_url:
            return error_response("No URL provided", 400)

        # Validate URL (shape/scheme rules live in validate_target_url)
        if err := validate_target_url(target_url):
            return err

        # Auth and rate limiting
        trusted_client = get_client_id()
        owner = get_client_fingerprint()
        client_ip = get_client_ip()

        # Rate limiting (trusted certs exempt)
        if not trusted_client:
            allowed, remaining, limit, reset_timestamp = check_rate_limit(
                client_ip, authenticated=False
            )
            if not allowed:
                record_rate_limit("blocked")
                # Retry-After is at least 1 second even if the window just reset.
                retry_after = max(1, reset_timestamp - int(time.time()))
                response = error_response("Rate limit exceeded", 429, retry_after=retry_after)
                response.headers["Retry-After"] = str(retry_after)
                add_rate_limit_headers(response, 0, limit, reset_timestamp)
                return response
        else:
            # Sentinel values meaning "no limit applies" for the headers below.
            remaining, limit, reset_timestamp = -1, -1, 0

        # Proof-of-work (trusted certs exempt)
        difficulty = current_app.config["POW_DIFFICULTY"]
        if difficulty > 0 and not trusted_client:
            token = request.headers.get("X-PoW-Token", "")
            solution = request.headers.get("X-PoW-Solution", "")

            if not token or not solution:
                return error_response(
                    "Proof-of-work required", 400, hint="GET /challenge for a new challenge"
                )

            valid, err_msg = verify_pow(token, solution)
            if not valid:
                record_pow("failure")
                return error_response(f"Proof-of-work failed: {err_msg}", 400)
            record_pow("success")

        # Dedup check (same URL within window), keyed on the SHA-256 of the
        # stripped target URL.
        url_hash = hashlib.sha256(target_url.encode("utf-8")).hexdigest()
        is_allowed, dedup_count = check_content_hash(url_hash)
        if not is_allowed:
            record_dedup("blocked")
            window = current_app.config["CONTENT_DEDUP_WINDOW"]
            return error_response(
                "Duplicate URL rate limit exceeded",
                429,
                count=dedup_count,
                window_seconds=window,
            )
        record_dedup("allowed")

        # Parse optional expiry from the X-Expiry header (seconds).  Invalid
        # values are silently ignored (best-effort by design); positive values
        # are capped at MAX_EXPIRY_SECONDS when that cap is configured.
        expires_at = None
        expiry_header = request.headers.get("X-Expiry", "").strip()
        if expiry_header:
            try:
                expiry_seconds = int(expiry_header)
                if expiry_seconds > 0:
                    max_expiry = current_app.config.get("MAX_EXPIRY_SECONDS", 0)
                    if max_expiry > 0:
                        expiry_seconds = min(expiry_seconds, max_expiry)
                    expires_at = int(time.time()) + expiry_seconds
            except ValueError:
                pass

        # Generate short ID and insert
        short_id = generate_short_id()
        now = int(time.time())

        db = get_db()
        db.execute(
            """INSERT INTO short_urls
               (id, target_url, url_hash, owner, created_at, last_accessed, expires_at)
               VALUES (?, ?, ?, ?, ?, ?, ?)""",
            (short_id, target_url, url_hash, owner, now, now, expires_at),
        )
        db.commit()

        record_antiflood_request()

        log_event(
            AuditEvent.URL_CREATE,
            AuditOutcome.SUCCESS,
            client_id=owner,
            client_ip=client_ip,
            # The target is truncated to 128 chars in the audit trail.
            details={"short_id": short_id, "target": target_url[:128]},
        )

        record_url_created("authenticated" if owner else "anonymous", "success")

        response_data: dict[str, Any] = {
            "id": short_id,
            "url": short_url_path(short_id),
            "target_url": target_url,
            "created_at": now,
        }
        # Optional fields are only included when present.
        if owner:
            response_data["owner"] = owner
        if expires_at:
            response_data["expires_at"] = expires_at

        response = json_response(response_data, 201)
        add_rate_limit_headers(response, remaining, limit, reset_timestamp)
        return response
|
|
|
|
|
|
class ShortURLRedirectView(MethodView):
    """Redirect short URLs to their targets."""

    def get(self, short_id: str) -> Response:
        """302 redirect to target URL."""
        # Guard clauses: bad id format, then unknown/expired short id.
        if invalid := validate_short_id(short_id):
            return invalid
        if failure := fetch_short_url(short_id, increment_counter=True):
            return failure

        target = g.short_url["target_url"]

        log_event(
            AuditEvent.URL_ACCESS,
            AuditOutcome.SUCCESS,
            client_ip=get_client_ip(),
            details={"short_id": short_id},
        )
        visitor_kind = "authenticated" if get_client_fingerprint() else "anonymous"
        record_url_accessed(visitor_kind)

        # Hand-built 302 so we control the Cache-Control header explicitly.
        redirect_response = Response(status=302)
        redirect_response.headers["Location"] = target
        redirect_response.headers["Cache-Control"] = "no-cache"
        return redirect_response

    def head(self, short_id: str) -> Response:
        """HEAD redirect: same status and headers as GET."""
        return self.get(short_id)
|
|
|
|
|
|
class ShortURLInfoView(MethodView):
    """Short URL metadata."""

    def get(self, short_id: str) -> Response:
        """Return short URL metadata without incrementing counter."""
        if invalid := validate_short_id(short_id):
            return invalid
        # Metadata lookups must not bump access_count.
        if failure := fetch_short_url(short_id, increment_counter=False):
            return failure

        record = g.short_url

        payload: dict[str, Any] = {
            "id": record["id"],
            "target_url": record["target_url"],
            "created_at": record["created_at"],
            "last_accessed": record["last_accessed"],
            "access_count": record["access_count"],
            "url": short_url_path(short_id),
        }
        # Optional fields appear only when set.
        for optional_key in ("owner", "expires_at"):
            if record[optional_key]:
                payload[optional_key] = record[optional_key]

        return json_response(payload)
|
|
|
|
|
|
class ShortURLDeleteView(MethodView):
    """Delete short URLs."""

    def delete(self, short_id: str) -> Response:
        """Delete a short URL. Requires ownership or admin."""
        if invalid := validate_short_id(short_id):
            return invalid
        if unauthenticated := require_auth():
            return unauthenticated

        db = get_db()
        existing = db.execute(
            "SELECT owner FROM short_urls WHERE id = ?", (short_id,)
        ).fetchone()

        if existing is None:
            return error_response("Short URL not found", 404)

        # Only the owner or an admin may remove the mapping.
        caller_owns_it = existing["owner"] == g.client_id
        if not (caller_owns_it or is_admin()):
            return error_response("Permission denied", 403)

        db.execute("DELETE FROM short_urls WHERE id = ?", (short_id,))
        db.commit()

        log_event(
            AuditEvent.URL_DELETE,
            AuditOutcome.SUCCESS,
            client_id=g.client_id,
            client_ip=get_client_ip(),
            details={"short_id": short_id},
        )
        record_url_deleted("authenticated", "success")

        return json_response({"message": "Short URL deleted"})
|
|
|
|
|
|
class ShortURLsListView(MethodView):
    """List short URLs owned by authenticated user."""

    def get(self) -> Response:
        """List owned short URLs with pagination.

        Query parameters:
        - limit: max results (default 50, clamped to 0..200)
        - offset: pagination offset (default 0)
        """
        if err := require_auth():
            return err

        client_id = g.client_id

        # Clamp pagination inputs.  The lower bound on limit matters: a
        # negative value would reach SQLite as "LIMIT -1", which means
        # *unlimited* and would defeat the 200-row cap.
        try:
            limit = max(0, min(int(request.args.get("limit", 50)), 200))
            offset = max(int(request.args.get("offset", 0)), 0)
        except (ValueError, TypeError):
            limit, offset = 50, 0

        db = get_db()

        # Total matching rows (for pagination metadata)
        count_row = db.execute(
            "SELECT COUNT(*) as total FROM short_urls WHERE owner = ?",
            (client_id,),
        ).fetchone()
        total = count_row["total"] if count_row else 0

        rows = db.execute(
            """SELECT id, target_url, created_at, last_accessed, access_count, expires_at
               FROM short_urls
               WHERE owner = ?
               ORDER BY created_at DESC
               LIMIT ? OFFSET ?""",
            (client_id, limit, offset),
        ).fetchall()

        urls = []
        for row in rows:
            entry: dict[str, Any] = {
                "id": row["id"],
                "target_url": row["target_url"],
                "created_at": row["created_at"],
                "access_count": row["access_count"],
                "url": short_url_path(row["id"]),
            }
            # expires_at is optional and omitted when unset
            if row["expires_at"]:
                entry["expires_at"] = row["expires_at"]
            urls.append(entry)

        return json_response(
            {
                "urls": urls,
                "count": len(urls),
                "total": total,
                "limit": limit,
                "offset": offset,
            }
        )
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# PKI Views (Certificate Authority)
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def require_pki_enabled() -> Response | None:
    """Check if PKI is enabled. Returns error response or None if enabled."""
    if current_app.config.get("PKI_ENABLED"):
        return None
    # PKI disabled: present the endpoints as if they do not exist.
    return error_response("PKI not enabled", 404)
|
|
|
|
|
|
class PKIStatusView(MethodView):
    """PKI status endpoint."""

    def get(self) -> Response:
        """Return PKI status and CA info if available."""
        if not current_app.config.get("PKI_ENABLED"):
            return json_response({"enabled": False})

        from app.pki import get_ca_info

        ca_info = get_ca_info()
        if ca_info is None:
            # PKI is on, but the CA has not been generated yet.
            return json_response(
                {
                    "enabled": True,
                    "ca_exists": False,
                    "hint": "POST /pki/ca to generate CA",
                }
            )

        # CA exists: surface the public summary fields.
        status: dict[str, Any] = {"enabled": True, "ca_exists": True}
        for field in (
            "common_name",
            "fingerprint_sha1",
            "created_at",
            "expires_at",
            "key_algorithm",
        ):
            status[field] = ca_info[field]
        return json_response(status)
|
|
|
|
|
|
class PKICAGenerateView(MethodView):
    """CA generation endpoint (first-run only)."""

    def post(self) -> Response:
        """Generate CA certificate. Only works if no CA exists.

        Requires PKI to be enabled and PKI_CA_PASSWORD to be configured.
        Accepts an optional JSON body with "common_name" (truncated to
        64 chars).  Returns 409 if a CA already exists.
        """
        if err := require_pki_enabled():
            return err

        from app.pki import (
            CAExistsError,
            PKIError,
            generate_ca,
            get_ca_info,
        )

        # Check if CA already exists.  A concurrent creation race is still
        # handled by the CAExistsError branch below.
        if get_ca_info() is not None:
            return error_response("CA already exists", 409)

        # Get CA password from config
        password = current_app.config.get("PKI_CA_PASSWORD", "")
        if not password:
            return error_response(
                "PKI_CA_PASSWORD not configured",
                500,
                hint="Set FLASKPASTE_PKI_CA_PASSWORD environment variable",
            )

        # Parse request for optional common name
        common_name = "FlaskPaste CA"
        if request.is_json:
            data = request.get_json(silent=True)
            if data and isinstance(data.get("common_name"), str):
                common_name = data["common_name"][:64]

        # Generate CA
        try:
            days = current_app.config.get("PKI_CA_DAYS", 3650)
            owner = get_client_fingerprint()
            ca_info = generate_ca(common_name, password, days=days, owner=owner)
        except CAExistsError:
            return error_response("CA already exists", 409)
        except PKIError as e:
            current_app.logger.error("CA generation failed: %s", e)
            return error_response("CA generation failed", 500)

        current_app.logger.info(
            "CA generated: cn=%s fingerprint=%s", common_name, ca_info["fingerprint_sha1"][:12]
        )
        log_event(
            AuditEvent.CERT_ISSUED,
            AuditOutcome.SUCCESS,
            client_id=owner,
            # Consistency fix: every other audit call site in this module
            # resolves the address through get_client_ip(); the previous
            # request.remote_addr bypassed that helper.
            client_ip=get_client_ip(),
            details={
                "type": "ca",
                "fingerprint": ca_info["fingerprint_sha1"][:16],
                "common_name": common_name,
                "expires_at": ca_info["expires_at"],
            },
        )

        return json_response(
            {
                "message": "CA generated",
                "common_name": ca_info["common_name"],
                "fingerprint_sha1": ca_info["fingerprint_sha1"],
                "created_at": ca_info["created_at"],
                "expires_at": ca_info["expires_at"],
                "download": prefixed_url("/pki/ca.crt"),
            },
            201,
        )
|
|
|
|
|
|
class PKICADownloadView(MethodView):
    """CA certificate download endpoint."""

    def get(self) -> Response:
        """Download CA certificate in PEM format."""
        if err := require_pki_enabled():
            return err

        from app.pki import get_ca_info

        info = get_ca_info()
        if info is None:
            return error_response("CA not initialized", 404)

        # Derive the download filename from the CA common name.
        filename = info["common_name"].replace(" ", "_")
        pem_response = Response(info["certificate_pem"], mimetype="application/x-pem-file")
        pem_response.headers["Content-Disposition"] = f"attachment; filename={filename}.crt"
        return pem_response
|
|
|
|
|
|
class PKIIssueView(MethodView):
    """Certificate issuance endpoint (open registration)."""

    def post(self) -> Response:
        """Issue a new client certificate.

        Expects a JSON body with "common_name" (truncated to 64 chars).
        Returns 201 with the certificate bundle, including the private key
        PEM produced by the issuance call.
        """
        if err := require_pki_enabled():
            return err

        from app.pki import (
            CANotFoundError,
            PKIError,
            issue_certificate,
        )

        # Parse request
        common_name = None
        if request.is_json:
            data = request.get_json(silent=True)
            if data and isinstance(data.get("common_name"), str):
                common_name = data["common_name"][:64]

        if not common_name:
            return error_response(
                "common_name required", 400, hint='POST {"common_name": "your-name"}'
            )

        # Get CA password from config
        password = current_app.config.get("PKI_CA_PASSWORD", "")
        if not password:
            return error_response("PKI not properly configured", 500)

        # Issue certificate; issued_to records the issuer's fingerprint (or
        # None for anonymous callers).
        try:
            days = current_app.config.get("PKI_CERT_DAYS", 365)
            issued_to = get_client_fingerprint()
            cert_info = issue_certificate(common_name, password, days=days, issued_to=issued_to)
        except CANotFoundError:
            return error_response("CA not initialized", 404)
        except PKIError as e:
            current_app.logger.error("Certificate issuance failed: %s", e)
            return error_response("Certificate issuance failed", 500)

        current_app.logger.info(
            "Certificate issued: cn=%s serial=%s fingerprint=%s to=%s",
            common_name,
            cert_info["serial"][:8],
            cert_info["fingerprint_sha1"][:12],
            issued_to or "anonymous",
        )
        log_event(
            AuditEvent.CERT_ISSUED,
            AuditOutcome.SUCCESS,
            client_id=cert_info["fingerprint_sha1"],
            # Consistency fix: every other audit call site in this module
            # resolves the address through get_client_ip(); the previous
            # request.remote_addr bypassed that helper.
            client_ip=get_client_ip(),
            details={
                "type": "client",
                "serial": cert_info["serial"][:16],
                "common_name": common_name,
                "issued_by": issued_to,
                "expires_at": cert_info["expires_at"],
            },
        )

        # Return certificate bundle
        return json_response(
            {
                "message": "Certificate issued",
                "serial": cert_info["serial"],
                "common_name": cert_info["common_name"],
                "fingerprint_sha1": cert_info["fingerprint_sha1"],
                "created_at": cert_info["created_at"],
                "expires_at": cert_info["expires_at"],
                "certificate_pem": cert_info["certificate_pem"],
                "private_key_pem": cert_info["private_key_pem"],
                "is_admin": cert_info.get("is_admin", False),
            },
            201,
        )
|
|
|
|
|
|
class PKICertsView(MethodView):
    """Certificate listing endpoint."""

    def get(self) -> Response:
        """List issued certificates."""
        if err := require_pki_enabled():
            return err

        client_id = get_client_fingerprint()

        db = get_db()

        # Authenticated callers see certificates they hold or certificates
        # they issued; anonymous callers (no client cert) see nothing.
        rows: list[Any] = []
        if client_id:
            rows = db.execute(
                """SELECT serial, common_name, fingerprint_sha1,
                          created_at, expires_at, issued_to, status, revoked_at
                   FROM issued_certificates
                   WHERE issued_to = ? OR fingerprint_sha1 = ?
                   ORDER BY created_at DESC""",
                (client_id, client_id),
            ).fetchall()

        certs = []
        for record in rows:
            entry = {
                key: record[key]
                for key in (
                    "serial",
                    "common_name",
                    "fingerprint_sha1",
                    "created_at",
                    "expires_at",
                    "status",
                )
            }
            # Optional fields are included only when present.
            for optional in ("issued_to", "revoked_at"):
                if record[optional]:
                    entry[optional] = record[optional]
            certs.append(entry)

        return json_response({"certificates": certs, "count": len(certs)})
|
|
|
|
|
|
class PKIRevokeView(MethodView):
    """Certificate revocation endpoint."""

    def post(self, serial: str) -> Response:
        """Revoke a certificate by serial number.

        Requires authentication.  Permission is granted to the client that
        issued the certificate (issued_to) or to the certificate's own
        holder (matching fingerprint).

        Returns 404 for unknown serials, 409 if already revoked, 403 on
        permission denial.
        """
        if err := require_pki_enabled():
            return err
        if err := require_auth():
            return err

        from app.pki import CertificateNotFoundError, PKIError, revoke_certificate

        db = get_db()

        # Check certificate exists and get ownership info
        row = db.execute(
            "SELECT issued_to, fingerprint_sha1, status FROM issued_certificates WHERE serial = ?",
            (serial,),
        ).fetchone()

        if row is None:
            return error_response("Certificate not found", 404)

        if row["status"] == "revoked":
            return error_response("Certificate already revoked", 409)

        # Check permission: must be issuer or the certificate itself.
        # NOTE(review): unlike paste/short-URL deletion, there is no
        # is_admin() override here — confirm whether admins should also be
        # able to revoke.
        client_id = g.client_id
        can_revoke = row["issued_to"] == client_id or row["fingerprint_sha1"] == client_id

        if not can_revoke:
            return error_response("Permission denied", 403)

        # Revoke.  CertificateNotFoundError can still occur if the row was
        # deleted between the check above and this call.
        try:
            revoke_certificate(serial)
        except CertificateNotFoundError:
            return error_response("Certificate not found", 404)
        except PKIError as e:
            current_app.logger.error("Revocation failed: %s", e)
            return error_response("Revocation failed", 500)

        current_app.logger.info("Certificate revoked: serial=%s by=%s", serial[:8], client_id[:12])
        log_event(
            AuditEvent.CERT_REVOKED,
            AuditOutcome.SUCCESS,
            client_id=client_id,
            client_ip=get_client_ip(),
            # Serial and fingerprint are truncated in the audit trail.
            details={
                "serial": serial[:16],
                "fingerprint": row["fingerprint_sha1"][:16],
            },
        )

        return json_response({"message": "Certificate revoked", "serial": serial})
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Audit Log View
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
class AuditLogView(MethodView):
    """Audit log query endpoint (admin only)."""

    def get(self) -> Response:
        """Query audit log with filters.

        Query parameters:
        - event_type: Filter by event type
        - client_id: Filter by client fingerprint
        - paste_id: Filter by paste ID
        - outcome: Filter by outcome (success, failure, blocked)
        - since: Filter by timestamp >= since
        - until: Filter by timestamp <= until
        - limit: Maximum results (default 100, clamped to 0..500)
        - offset: Pagination offset

        Returns 403 for non-admin callers.
        """
        if err := require_auth():
            return err
        if not is_admin():
            return error_response("Admin access required", 403)

        from app.audit import query_audit_log

        # Parse query parameters (empty strings become None = "no filter")
        event_type = request.args.get("event_type", "").strip() or None
        client_id = request.args.get("client_id", "").strip() or None
        paste_id = request.args.get("paste_id", "").strip() or None
        outcome = request.args.get("outcome", "").strip() or None

        # Timestamp filters: 0 and invalid values both mean "unset" (None).
        try:
            since = int(request.args.get("since", 0)) or None
        except (ValueError, TypeError):
            since = None
        try:
            until = int(request.args.get("until", 0)) or None
        except (ValueError, TypeError):
            until = None
        # Clamp limit on both ends: without the lower bound a negative value
        # would be forwarded to query_audit_log unchecked and could disable
        # the 500-row cap at the SQL level.
        try:
            limit = max(0, min(int(request.args.get("limit", 100)), 500))
        except (ValueError, TypeError):
            limit = 100
        try:
            offset = max(int(request.args.get("offset", 0)), 0)
        except (ValueError, TypeError):
            offset = 0

        entries, total = query_audit_log(
            event_type=event_type,
            client_id=client_id,
            paste_id=paste_id,
            outcome=outcome,
            since=since,
            until=until,
            limit=limit,
            offset=offset,
        )

        return json_response(
            {
                "entries": entries,
                "count": len(entries),
                "total": total,
                "limit": limit,
                "offset": offset,
            }
        )
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Route Registration
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
# Table-driven route registration.  Each entry is
# (rule, endpoint name, view class, HTTP methods or None for the
# MethodView defaults).  Order matches the original registration order.
_ROUTES: list[tuple[str, str, Any, list[str] | None]] = [
    # Index and paste creation
    ("/", "index", IndexView, None),
    # Utility endpoints
    ("/health", "health", HealthView, None),
    ("/challenge", "challenge", ChallengeView, None),
    ("/client", "client", ClientView, None),
    # Registration endpoints (public certificate issuance with PoW)
    ("/register/challenge", "register_challenge", RegisterChallengeView, None),
    ("/register", "register", RegisterView, None),
    # Paste operations
    ("/pastes", "pastes_list", PastesListView, None),
    ("/<paste_id>", "paste", PasteView, ["GET", "HEAD", "PUT"]),
    ("/<paste_id>/raw", "paste_raw", PasteRawView, ["GET", "HEAD"]),
    ("/<paste_id>", "paste_delete", PasteDeleteView, ["DELETE"]),
    # URL shortener endpoints
    ("/s", "short_url_create", ShortURLCreateView, ["POST"]),
    ("/s", "short_urls_list", ShortURLsListView, ["GET"]),
    ("/s/<short_id>", "short_url_redirect", ShortURLRedirectView, ["GET", "HEAD"]),
    ("/s/<short_id>/info", "short_url_info", ShortURLInfoView, None),
    ("/s/<short_id>", "short_url_delete", ShortURLDeleteView, ["DELETE"]),
    # PKI endpoints
    ("/pki", "pki_status", PKIStatusView, None),
    ("/pki/ca", "pki_ca_generate", PKICAGenerateView, None),
    ("/pki/ca.crt", "pki_ca_download", PKICADownloadView, None),
    ("/pki/issue", "pki_issue", PKIIssueView, None),
    ("/pki/certs", "pki_certs", PKICertsView, None),
    ("/pki/revoke/<serial>", "pki_revoke", PKIRevokeView, ["POST"]),
    # Audit log endpoint (admin only)
    ("/audit", "audit_log", AuditLogView, None),
]

for _rule, _endpoint, _view, _methods in _ROUTES:
    bp.add_url_rule(_rule, view_func=_view.as_view(_endpoint), methods=_methods)
|