flaskpaste: initial commit with security hardening

Features:
- REST API for text/binary pastes with MIME detection
- Client certificate auth via X-SSL-Client-SHA1 header
- SQLite with WAL mode for concurrent access
- Automatic paste expiry with LRU cleanup

Security:
- HSTS, CSP, X-Frame-Options, X-Content-Type-Options
- Cache-Control: no-store for sensitive responses
- X-Request-ID tracing for log correlation
- X-Proxy-Secret validation for defense-in-depth
- Parameterized queries, input validation
- Size limits (3 MiB anon, 50 MiB auth)

Includes /health endpoint, container support, and 70 tests.
This commit is contained in:
Username
2025-12-16 04:42:18 +01:00
commit 8f9868f0d9
21 changed files with 2588 additions and 0 deletions

43
.containerignore Normal file
View File

@@ -0,0 +1,43 @@
# Git
.git
.gitignore
# Python
__pycache__
*.py[cod]
*$py.class
*.so
.Python
venv/
.venv/
ENV/
env/
*.egg-info/
.eggs/
dist/
build/
# Testing
.pytest_cache/
.coverage
htmlcov/
.tox/
tests/
# IDE
.idea/
.vscode/
*.swp
*.swo
# Data (use volumes instead)
data/
# Documentation (not needed in container)
documentation/
*.md
!requirements.txt
# Misc
.DS_Store
*.log

48
.gitignore vendored Normal file
View File

@@ -0,0 +1,48 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
venv/
ENV/
.venv/
# Testing
.pytest_cache/
.coverage
htmlcov/
.tox/
# IDE
.idea/
.vscode/
*.swp
*.swo
# Data
data/*.db
data/*.db-journal
data/*.db-wal
data/*.db-shm
# Secrets
.credentials
.credentials.json
*.pem
*.key
# Build
dist/
build/
*.egg-info/
# OS
.DS_Store
Thumbs.db
# Private
.claude/
CLAUDE.md
CRUSH.md
PERSONAE

43
Containerfile Normal file
View File

@@ -0,0 +1,43 @@
# FlaskPaste Container Image
# Build: podman build -t flaskpaste .
# Run: podman run -d -p 5000:5000 -v flaskpaste-data:/app/data flaskpaste
FROM python:3.11-slim
LABEL maintainer="FlaskPaste"
LABEL description="Lightweight secure pastebin REST API"
# Create non-root user
RUN groupadd -r flaskpaste && useradd -r -g flaskpaste flaskpaste
# Set working directory
WORKDIR /app
# Install dependencies first (cache layer)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt gunicorn
# Copy application code
COPY app/ ./app/
COPY wsgi.py .
# Create data directory
RUN mkdir -p /app/data && chown -R flaskpaste:flaskpaste /app
# Switch to non-root user
USER flaskpaste
# Environment defaults
ENV FLASK_ENV=production
ENV FLASKPASTE_DB=/app/data/pastes.db
ENV PYTHONUNBUFFERED=1
# Expose port
EXPOSE 5000
# Health check
HEALTHCHECK --interval=30s --timeout=5s --start-period=5s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:5000/health')" || exit 1
# Run with gunicorn
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "--workers", "2", "--access-logfile", "-", "wsgi:application"]

173
README.md Normal file
View File

@@ -0,0 +1,173 @@
# FlaskPaste
A lightweight, secure pastebin REST API built with Flask.
## Features
- **Simple REST API** - Create, retrieve, and delete pastes via HTTP
- **Binary support** - Upload text, images, archives, and other binary content
- **Automatic MIME detection** - Magic byte detection for common formats (PNG, JPEG, GIF, WebP, ZIP, PDF, GZIP)
- **Client certificate authentication** - Optional auth via `X-SSL-Client-SHA1` header
- **Automatic expiry** - Pastes expire after configurable period of inactivity
- **Size limits** - Configurable limits for anonymous and authenticated users
- **Security headers** - HSTS, CSP, X-Frame-Options, Cache-Control, and more
- **Request tracing** - X-Request-ID support for log correlation
- **Proxy trust validation** - Optional shared secret for defense-in-depth
- **Minimal dependencies** - Flask only, SQLite built-in
## Quick Start
```bash
# Clone and setup
git clone <repository>
cd flaskpaste
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
# Run development server
python run.py
```
## API Endpoints
| Method | Endpoint | Description |
|--------|----------|-------------|
| `GET /` | API information and usage |
| `GET /health` | Health check (returns DB status) |
| `POST /` | Create a new paste |
| `GET /<id>` | Retrieve paste metadata |
| `GET /<id>/raw` | Retrieve raw paste content |
| `DELETE /<id>` | Delete paste (requires auth) |
## Usage Examples
### Create a paste (raw text)
```bash
curl --data-binary @file.txt http://localhost:5000/
```
### Create a paste (piped input)
```bash
echo "Hello, World!" | curl --data-binary @- http://localhost:5000/
```
### Create a paste (JSON)
```bash
curl -H "Content-Type: application/json" \
-d '{"content":"Hello from JSON!"}' \
http://localhost:5000/
```
### Retrieve paste metadata
```bash
curl http://localhost:5000/abc12345
```
### Retrieve raw content
```bash
curl http://localhost:5000/abc12345/raw
```
### Delete a paste (requires authentication)
```bash
curl -X DELETE \
-H "X-SSL-Client-SHA1: <your-cert-fingerprint>" \
http://localhost:5000/abc12345
```
## Configuration
Configuration via environment variables:
| Variable | Default | Description |
|----------|---------|-------------|
| `FLASK_ENV` | `development` | Environment (`development`, `production`, `testing`) |
| `FLASKPASTE_DB` | `./data/pastes.db` | SQLite database path |
| `FLASKPASTE_ID_LENGTH` | `12` | Paste ID length (hex characters) |
| `FLASKPASTE_MAX_ANON` | `3145728` (3 MiB) | Max paste size for anonymous users |
| `FLASKPASTE_MAX_AUTH` | `52428800` (50 MiB) | Max paste size for authenticated users |
| `FLASKPASTE_EXPIRY` | `432000` (5 days) | Paste expiry in seconds |
| `FLASKPASTE_PROXY_SECRET` | (empty) | Shared secret for proxy trust validation |
## Authentication
FlaskPaste uses client certificate authentication. When deployed behind a reverse proxy (nginx, Apache), configure the proxy to:
1. Terminate TLS and validate client certificates
2. Extract the certificate SHA1 fingerprint
3. Pass it via the `X-SSL-Client-SHA1` header
Example nginx configuration:
```nginx
location / {
proxy_pass http://127.0.0.1:5000;
proxy_set_header X-SSL-Client-SHA1 $ssl_client_fingerprint;
}
```
Authenticated users can:
- Upload larger pastes (50 MiB vs 3 MiB)
- Delete their own pastes
## Production Deployment
### Using Gunicorn
```bash
pip install gunicorn
gunicorn -w 4 -b 0.0.0.0:5000 wsgi:app
```
### Using Podman/Docker
```bash
podman build -t flaskpaste .
podman run -d -p 5000:5000 -v flaskpaste-data:/app/data flaskpaste
```
See `Containerfile` for container build configuration.
## Development
### Running Tests
```bash
pip install pytest pytest-cov
pytest tests/ -v
```
### Test Coverage
```bash
pytest tests/ --cov=app --cov-report=term-missing
```
### Project Structure
```
flaskpaste/
├── app/
│ ├── __init__.py # Flask app factory
│ ├── config.py # Configuration classes
│ ├── database.py # SQLite management
│ └── api/
│ ├── __init__.py # Blueprint setup
│ └── routes.py # API endpoints
├── tests/ # Test suite
├── data/ # SQLite database
├── run.py # Development server
├── wsgi.py # Production WSGI entry
├── Containerfile # Podman/Docker build
└── requirements.txt # Dependencies
```
## Security Considerations
- **Input validation** - Paste IDs are hex-only, auth headers validated
- **MIME sanitization** - Content-Type headers are sanitized
- **SQL injection protection** - Parameterized queries throughout
- **Ownership enforcement** - Only owners can delete their pastes
- **Size limits** - Prevents resource exhaustion attacks
- **Security headers** - HSTS, CSP, X-Frame-Options, X-Content-Type-Options, Cache-Control
- **Request tracing** - X-Request-ID for log correlation and debugging
- **Proxy trust** - Optional `X-Proxy-Secret` validation to prevent header spoofing
## License
MIT

225
app/__init__.py Normal file
View File

@@ -0,0 +1,225 @@
"""Flask application factory."""
import logging
import os
import sys
import uuid
from flask import Flask, Response, g, request
from app.config import config
def setup_logging(app: Flask) -> None:
"""Configure structured logging."""
log_level = logging.DEBUG if app.debug else logging.INFO
log_format = (
"%(asctime)s %(levelname)s [%(name)s] %(message)s"
if app.debug
else '{"time":"%(asctime)s","level":"%(levelname)s","logger":"%(name)s","message":"%(message)s"}'
)
logging.basicConfig(
level=log_level,
format=log_format,
stream=sys.stdout,
)
# Reduce noise from werkzeug in production
if not app.debug:
logging.getLogger("werkzeug").setLevel(logging.WARNING)
app.logger.info("FlaskPaste starting", extra={"config": type(app.config).__name__})
def setup_security_headers(app: Flask) -> None:
"""Add security headers to all responses."""
@app.after_request
def add_security_headers(response: Response) -> Response:
# Prevent MIME type sniffing
response.headers["X-Content-Type-Options"] = "nosniff"
# Prevent clickjacking
response.headers["X-Frame-Options"] = "DENY"
# XSS protection (legacy but still useful)
response.headers["X-XSS-Protection"] = "1; mode=block"
# Referrer policy
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
# Content Security Policy (restrictive for API)
response.headers["Content-Security-Policy"] = "default-src 'none'; frame-ancestors 'none'"
# Permissions policy
response.headers["Permissions-Policy"] = "geolocation=(), microphone=(), camera=()"
# HSTS - enforce HTTPS (1 year, include subdomains)
response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
# Prevent caching of sensitive paste data
response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, private"
response.headers["Pragma"] = "no-cache"
return response
def setup_request_id(app: Flask) -> None:
"""Add request ID tracking for log correlation and tracing."""
@app.before_request
def assign_request_id():
# Use incoming X-Request-ID from proxy, or generate a new one
request_id = request.headers.get("X-Request-ID", "").strip()
if not request_id:
request_id = str(uuid.uuid4())
g.request_id = request_id
@app.after_request
def add_request_id_header(response: Response) -> Response:
# Echo request ID back to client for tracing
request_id = getattr(g, "request_id", None)
if request_id:
response.headers["X-Request-ID"] = request_id
# Access logging with request ID
app.logger.info(
"%s %s %s [rid=%s]",
request.method,
request.path,
response.status_code,
request_id or "-",
)
return response
def setup_error_handlers(app: Flask) -> None:
"""Register global error handlers."""
import json
@app.errorhandler(400)
def bad_request(error):
app.logger.warning("Bad request: %s [rid=%s]", request.path, getattr(g, "request_id", "-"))
return Response(
json.dumps({"error": "Bad request"}),
status=400,
mimetype="application/json",
)
@app.errorhandler(404)
def not_found(error):
return Response(
json.dumps({"error": "Not found"}),
status=404,
mimetype="application/json",
)
@app.errorhandler(429)
def rate_limit_exceeded(error):
app.logger.warning(
"Rate limit exceeded: %s from %s [rid=%s]",
request.path, request.remote_addr, getattr(g, "request_id", "-")
)
return Response(
json.dumps({"error": "Rate limit exceeded", "retry_after": error.description}),
status=429,
mimetype="application/json",
)
@app.errorhandler(500)
def internal_error(error):
app.logger.error(
"Internal error: %s - %s [rid=%s]",
request.path, str(error), getattr(g, "request_id", "-")
)
return Response(
json.dumps({"error": "Internal server error"}),
status=500,
mimetype="application/json",
)
@app.errorhandler(Exception)
def handle_exception(error):
app.logger.exception(
"Unhandled exception: %s [rid=%s]",
str(error), getattr(g, "request_id", "-")
)
return Response(
json.dumps({"error": "Internal server error"}),
status=500,
mimetype="application/json",
)
def setup_rate_limiting(app: Flask) -> None:
"""Configure rate limiting."""
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
limiter = Limiter(
key_func=get_remote_address,
app=app,
default_limits=["200 per day", "60 per hour"],
storage_uri="memory://",
strategy="fixed-window",
)
# Store limiter on app for use in routes
app.extensions["limiter"] = limiter
return limiter
def setup_metrics(app: Flask) -> None:
"""Configure Prometheus metrics."""
# Only enable metrics in production
if app.config.get("TESTING"):
return
try:
from prometheus_flask_exporter import PrometheusMetrics
metrics = PrometheusMetrics(app)
# Add app info
metrics.info("flaskpaste_info", "FlaskPaste application info", version="1.0.0")
app.extensions["metrics"] = metrics
except ImportError:
app.logger.warning("prometheus_flask_exporter not available, metrics disabled")
def create_app(config_name: str | None = None) -> Flask:
"""Create and configure the Flask application."""
if config_name is None:
config_name = os.environ.get("FLASK_ENV", "default")
app = Flask(__name__)
app.config.from_object(config[config_name])
# Setup logging first
setup_logging(app)
# Setup request ID tracking
setup_request_id(app)
# Setup security headers
setup_security_headers(app)
# Setup error handlers
setup_error_handlers(app)
# Setup rate limiting (skip in testing)
if not app.config.get("TESTING"):
setup_rate_limiting(app)
# Setup metrics (skip in testing)
setup_metrics(app)
# Initialize database
from app import database
database.init_app(app)
# Register blueprints
from app.api import bp as api_bp
app.register_blueprint(api_bp)
app.logger.info("FlaskPaste initialized successfully")
return app

32
app/api/__init__.py Normal file
View File

@@ -0,0 +1,32 @@
"""API blueprint registration."""
import time
from flask import Blueprint, current_app
bp = Blueprint("api", __name__)
# Throttle cleanup to run at most once per hour
_last_cleanup = 0
_CLEANUP_INTERVAL = 3600 # 1 hour
@bp.before_request
def cleanup_expired():
"""Periodically clean up expired pastes."""
global _last_cleanup
now = time.time()
if now - _last_cleanup < _CLEANUP_INTERVAL:
return
_last_cleanup = now
from app.database import cleanup_expired_pastes
count = cleanup_expired_pastes()
if count > 0:
current_app.logger.info(f"Cleaned up {count} expired paste(s)")
from app.api import routes # noqa: E402, F401

319
app/api/routes.py Normal file
View File

@@ -0,0 +1,319 @@
"""API route handlers."""
import hashlib
import hmac
import json
import re
import time
from flask import Response, current_app, request
from app.api import bp
from app.database import get_db
# Valid paste ID pattern (hexadecimal only)
PASTE_ID_PATTERN = re.compile(r"^[a-f0-9]+$")
# Valid client certificate SHA1 pattern (40 hex chars)
CLIENT_ID_PATTERN = re.compile(r"^[a-f0-9]{40}$")
# Magic bytes for common binary formats
MAGIC_SIGNATURES = {
b"\x89PNG\r\n\x1a\n": "image/png",
b"\xff\xd8\xff": "image/jpeg",
b"GIF87a": "image/gif",
b"GIF89a": "image/gif",
b"RIFF": "image/webp", # WebP (check for WEBP after RIFF)
b"PK\x03\x04": "application/zip",
b"%PDF": "application/pdf",
b"\x1f\x8b": "application/gzip",
}
def _is_valid_paste_id(paste_id: str) -> bool:
"""Validate paste ID format (hexadecimal, correct length)."""
expected_length = current_app.config["PASTE_ID_LENGTH"]
return (
len(paste_id) == expected_length
and PASTE_ID_PATTERN.match(paste_id) is not None
)
def _detect_mime_type(content: bytes, content_type: str | None = None) -> str:
"""Detect MIME type from content bytes, with magic byte detection taking priority."""
# Check magic bytes first - most reliable method
for magic, mime in MAGIC_SIGNATURES.items():
if content.startswith(magic):
# Special case for WebP (RIFF....WEBP)
if magic == b"RIFF" and len(content) >= 12:
if content[8:12] != b"WEBP":
continue
return mime
# Trust explicit Content-Type if it's specific (not generic defaults)
generic_types = {
"application/octet-stream",
"application/x-www-form-urlencoded",
"text/plain",
}
if content_type:
mime = content_type.split(";")[0].strip().lower()
if mime not in generic_types:
# Sanitize: only allow safe characters in MIME type
if re.match(r"^[a-z0-9][a-z0-9!#$&\-^_.+]*\/[a-z0-9][a-z0-9!#$&\-^_.+]*$", mime):
return mime
# Try to decode as UTF-8 text
try:
content.decode("utf-8")
return "text/plain"
except UnicodeDecodeError:
return "application/octet-stream"
def _generate_id(content: bytes) -> str:
"""Generate a short unique ID from content hash and timestamp."""
data = content + str(time.time_ns()).encode()
length = current_app.config["PASTE_ID_LENGTH"]
return hashlib.sha256(data).hexdigest()[:length]
def _json_response(data: dict, status: int = 200) -> Response:
"""Create a JSON response with proper encoding and security headers."""
response = Response(
json.dumps(data, ensure_ascii=False),
status=status,
mimetype="application/json",
)
return response
def _is_trusted_proxy() -> bool:
"""Verify request comes from a trusted reverse proxy.
If TRUSTED_PROXY_SECRET is configured, the request must include a matching
X-Proxy-Secret header. This provides defense-in-depth against header spoofing
if an attacker bypasses the reverse proxy.
Returns True if no secret is configured (backwards compatible) or if the
secret matches.
"""
expected_secret = current_app.config.get("TRUSTED_PROXY_SECRET", "")
if not expected_secret:
# No secret configured - trust all requests (backwards compatible)
return True
# Constant-time comparison to prevent timing attacks
provided_secret = request.headers.get("X-Proxy-Secret", "")
return hmac.compare_digest(expected_secret, provided_secret)
def _get_client_id() -> str | None:
"""Extract and validate client identity from X-SSL-Client-SHA1 header.
Returns lowercase SHA1 fingerprint or None if not present/invalid.
SECURITY: The X-SSL-Client-SHA1 header is only trusted if the request
comes from a trusted proxy (verified via X-Proxy-Secret if configured).
"""
# Verify request comes from trusted proxy before trusting auth headers
if not _is_trusted_proxy():
current_app.logger.warning(
"Auth header ignored: X-Proxy-Secret mismatch from %s",
request.remote_addr
)
return None
client_sha1 = request.headers.get("X-SSL-Client-SHA1", "").strip().lower()
# Validate format: must be 40 hex characters (SHA1)
if client_sha1 and CLIENT_ID_PATTERN.match(client_sha1):
return client_sha1
return None
@bp.route("/health", methods=["GET"])
def health():
"""Health check endpoint for load balancers and monitoring."""
try:
db = get_db()
db.execute("SELECT 1")
return _json_response({"status": "healthy", "database": "ok"})
except Exception:
return _json_response({"status": "unhealthy", "database": "error"}, 503)
@bp.route("/", methods=["GET", "POST"])
def index():
"""Handle API info (GET) and paste creation (POST)."""
if request.method == "POST":
return create_paste()
return _json_response(
{
"name": "FlaskPaste",
"version": "1.0.0",
"endpoints": {
"GET /": "API information",
"GET /health": "Health check",
"POST /": "Create paste",
"GET /<id>": "Retrieve paste metadata",
"GET /<id>/raw": "Retrieve raw paste content",
"DELETE /<id>": "Delete paste",
},
"usage": {
"raw": "curl --data-binary @file.txt http://host/",
"pipe": "cat file.txt | curl --data-binary @- http://host/",
"json": "curl -H 'Content-Type: application/json' -d '{\"content\":\"...\"}' http://host/",
},
"note": "Use --data-binary (not -d) to preserve newlines",
}
)
def create_paste():
"""Create a new paste from request body."""
content: bytes | None = None
mime_type: str | None = None
if request.is_json:
data = request.get_json(silent=True)
if data and isinstance(data.get("content"), str):
content = data["content"].encode("utf-8")
mime_type = "text/plain"
else:
content = request.get_data(as_text=False)
if content:
mime_type = _detect_mime_type(content, request.content_type)
if not content:
return _json_response({"error": "No content provided"}, 400)
owner = _get_client_id()
# Enforce size limits based on authentication
content_size = len(content)
if owner:
max_size = current_app.config["MAX_PASTE_SIZE_AUTH"]
else:
max_size = current_app.config["MAX_PASTE_SIZE_ANON"]
if content_size > max_size:
return _json_response({
"error": "Paste too large",
"size": content_size,
"max_size": max_size,
"authenticated": owner is not None,
}, 413)
paste_id = _generate_id(content)
now = int(time.time())
db = get_db()
db.execute(
"INSERT INTO pastes (id, content, mime_type, owner, created_at, last_accessed) VALUES (?, ?, ?, ?, ?, ?)",
(paste_id, content, mime_type, owner, now, now),
)
db.commit()
response_data = {
"id": paste_id,
"url": f"/{paste_id}",
"raw": f"/{paste_id}/raw",
"mime_type": mime_type,
"created_at": now,
}
if owner:
response_data["owner"] = owner
return _json_response(response_data, 201)
@bp.route("/<paste_id>", methods=["GET"])
def get_paste(paste_id: str):
"""Retrieve paste metadata by ID."""
if not _is_valid_paste_id(paste_id):
return _json_response({"error": "Invalid paste ID"}, 400)
db = get_db()
now = int(time.time())
# Update last_accessed and return paste in one transaction
db.execute(
"UPDATE pastes SET last_accessed = ? WHERE id = ?", (now, paste_id)
)
row = db.execute(
"SELECT id, mime_type, created_at, length(content) as size FROM pastes WHERE id = ?",
(paste_id,)
).fetchone()
db.commit()
if row is None:
return _json_response({"error": "Paste not found"}, 404)
return _json_response({
"id": row["id"],
"mime_type": row["mime_type"],
"size": row["size"],
"created_at": row["created_at"],
"raw": f"/{paste_id}/raw",
})
@bp.route("/<paste_id>/raw", methods=["GET"])
def get_paste_raw(paste_id: str):
"""Retrieve raw paste content with correct MIME type."""
if not _is_valid_paste_id(paste_id):
return _json_response({"error": "Invalid paste ID"}, 400)
db = get_db()
now = int(time.time())
# Update last_accessed and return paste in one transaction
db.execute(
"UPDATE pastes SET last_accessed = ? WHERE id = ?", (now, paste_id)
)
row = db.execute(
"SELECT content, mime_type FROM pastes WHERE id = ?", (paste_id,)
).fetchone()
db.commit()
if row is None:
return _json_response({"error": "Paste not found"}, 404)
mime_type = row["mime_type"]
response = Response(row["content"], mimetype=mime_type)
# Display inline for images and text, let browser decide for others
if mime_type.startswith(("image/", "text/")):
response.headers["Content-Disposition"] = "inline"
return response
@bp.route("/<paste_id>", methods=["DELETE"])
def delete_paste(paste_id: str):
"""Delete a paste by ID. Requires ownership via X-SSL-Client-SHA1 header."""
if not _is_valid_paste_id(paste_id):
return _json_response({"error": "Invalid paste ID"}, 400)
client_id = _get_client_id()
if not client_id:
return _json_response({"error": "Authentication required"}, 401)
db = get_db()
# Check paste exists and verify ownership
row = db.execute(
"SELECT owner FROM pastes WHERE id = ?", (paste_id,)
).fetchone()
if row is None:
return _json_response({"error": "Paste not found"}, 404)
if row["owner"] != client_id:
return _json_response({"error": "Permission denied"}, 403)
db.execute("DELETE FROM pastes WHERE id = ?", (paste_id,))
db.commit()
return _json_response({"message": "Paste deleted"})

56
app/config.py Normal file
View File

@@ -0,0 +1,56 @@
"""Application configuration."""
import os
from pathlib import Path
class Config:
"""Base configuration."""
BASE_DIR = Path(__file__).parent.parent
DATABASE = os.environ.get("FLASKPASTE_DB", BASE_DIR / "data" / "pastes.db")
PASTE_ID_LENGTH = int(os.environ.get("FLASKPASTE_ID_LENGTH", "12"))
# Paste size limits
MAX_PASTE_SIZE_ANON = int(os.environ.get("FLASKPASTE_MAX_ANON", 3 * 1024 * 1024)) # 3MiB
MAX_PASTE_SIZE_AUTH = int(os.environ.get("FLASKPASTE_MAX_AUTH", 50 * 1024 * 1024)) # 50MiB
MAX_CONTENT_LENGTH = MAX_PASTE_SIZE_AUTH # Flask request limit
# Paste expiry (default 5 days)
PASTE_EXPIRY_SECONDS = int(os.environ.get("FLASKPASTE_EXPIRY", 5 * 24 * 60 * 60))
# Reverse proxy trust configuration
# SECURITY: The X-SSL-Client-SHA1 header is trusted for authentication.
# This header MUST only come from a trusted reverse proxy that validates
# client certificates. Direct access to this app MUST be blocked.
#
# Set FLASKPASTE_PROXY_SECRET to require the proxy to send a matching
# X-Proxy-Secret header, providing defense-in-depth against header spoofing.
TRUSTED_PROXY_SECRET = os.environ.get("FLASKPASTE_PROXY_SECRET", "")
class DevelopmentConfig(Config):
"""Development configuration."""
DEBUG = True
class ProductionConfig(Config):
"""Production configuration."""
DEBUG = False
class TestingConfig(Config):
"""Testing configuration."""
TESTING = True
DATABASE = ":memory:"
config = {
"development": DevelopmentConfig,
"production": ProductionConfig,
"testing": TestingConfig,
"default": DevelopmentConfig,
}

96
app/database.py Normal file
View File

@@ -0,0 +1,96 @@
"""Database connection and schema management."""
import sqlite3
import time
from pathlib import Path
from flask import current_app, g
SCHEMA = """
CREATE TABLE IF NOT EXISTS pastes (
id TEXT PRIMARY KEY,
content BLOB NOT NULL,
mime_type TEXT NOT NULL DEFAULT 'text/plain',
owner TEXT,
created_at INTEGER NOT NULL,
last_accessed INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_pastes_created_at ON pastes(created_at);
CREATE INDEX IF NOT EXISTS idx_pastes_owner ON pastes(owner);
CREATE INDEX IF NOT EXISTS idx_pastes_last_accessed ON pastes(last_accessed);
"""
# Hold reference for in-memory shared cache databases
_memory_db_holder = None
def _get_connection_string(db_path) -> tuple[str, dict]:
"""Get connection string and kwargs for sqlite3.connect."""
if isinstance(db_path, Path):
db_path.parent.mkdir(parents=True, exist_ok=True)
return str(db_path), {}
if db_path == ":memory:":
return "file::memory:?cache=shared", {"uri": True}
return db_path, {}
def get_db() -> sqlite3.Connection:
"""Get database connection for current request context."""
if "db" not in g:
db_path = current_app.config["DATABASE"]
conn_str, kwargs = _get_connection_string(db_path)
g.db = sqlite3.connect(conn_str, **kwargs)
g.db.row_factory = sqlite3.Row
g.db.execute("PRAGMA foreign_keys = ON")
if isinstance(db_path, Path):
g.db.execute("PRAGMA journal_mode = WAL")
return g.db
def close_db(exception=None) -> None:
"""Close database connection at end of request."""
db = g.pop("db", None)
if db is not None:
db.close()
def init_db() -> None:
"""Initialize database schema."""
global _memory_db_holder
db_path = current_app.config["DATABASE"]
conn_str, kwargs = _get_connection_string(db_path)
# For in-memory databases, keep a connection alive
if db_path == ":memory:":
_memory_db_holder = sqlite3.connect(conn_str, **kwargs)
_memory_db_holder.executescript(SCHEMA)
_memory_db_holder.commit()
else:
db = get_db()
db.executescript(SCHEMA)
db.commit()
def cleanup_expired_pastes() -> int:
"""Delete pastes that haven't been accessed within expiry period.
Returns number of deleted pastes.
"""
expiry_seconds = current_app.config["PASTE_EXPIRY_SECONDS"]
cutoff = int(time.time()) - expiry_seconds
db = get_db()
cursor = db.execute("DELETE FROM pastes WHERE last_accessed < ?", (cutoff,))
db.commit()
return cursor.rowcount
def init_app(app) -> None:
"""Register database functions with Flask app."""
app.teardown_appcontext(close_db)
with app.app_context():
init_db()

38
compose.yaml Normal file
View File

@@ -0,0 +1,38 @@
# FlaskPaste Container Compose
# Usage: podman-compose up -d
# Or: podman compose up -d
services:
flaskpaste:
build:
context: .
dockerfile: Containerfile
container_name: flaskpaste
restart: unless-stopped
ports:
- "5000:5000"
volumes:
- flaskpaste-data:/app/data
environment:
- FLASK_ENV=production
- FLASKPASTE_EXPIRY=432000 # 5 days
- FLASKPASTE_MAX_ANON=3145728 # 3 MiB
- FLASKPASTE_MAX_AUTH=52428800 # 50 MiB
healthcheck:
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:5000/health')"]
interval: 30s
timeout: 5s
retries: 3
start_period: 5s
deploy:
resources:
limits:
cpus: '1.0'
memory: 256M
reservations:
cpus: '0.25'
memory: 64M
volumes:
flaskpaste-data:
driver: local

333
documentation/api.md Normal file
View File

@@ -0,0 +1,333 @@
# FlaskPaste API Reference
## Overview
FlaskPaste provides a RESTful API for creating, retrieving, and deleting text and binary pastes.
**Base URL:** `http://your-server:5000/`
**Content Types:**
- Requests: `application/json`, `text/plain`, `application/octet-stream`, or any binary type
- Responses: `application/json` for metadata, original MIME type for raw content
## Authentication
Authentication is optional and uses client certificate fingerprints passed via the `X-SSL-Client-SHA1` header.
```http
X-SSL-Client-SHA1: a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2
```
The fingerprint must be exactly 40 lowercase hexadecimal characters (SHA1).
**Benefits of authentication:**
- Larger upload limit (50 MiB vs 3 MiB)
- Ability to delete owned pastes
---
## Endpoints
### GET /health
Health check endpoint for load balancers and monitoring.
**Request:**
```http
GET /health HTTP/1.1
Host: localhost:5000
```
**Response (200 OK):**
```json
{
"status": "healthy",
"database": "ok"
}
```
**Response (503 Service Unavailable):**
```json
{
"status": "unhealthy",
"database": "error"
}
```
---
### GET /
Returns API information and usage instructions.
**Request:**
```http
GET / HTTP/1.1
Host: localhost:5000
```
**Response:**
```json
{
"name": "FlaskPaste",
"version": "1.0.0",
"endpoints": {
"GET /": "API information",
"GET /health": "Health check",
"POST /": "Create paste",
"GET /<id>": "Retrieve paste metadata",
"GET /<id>/raw": "Retrieve raw paste content",
"DELETE /<id>": "Delete paste"
},
"usage": {
"raw": "curl --data-binary @file.txt http://host/",
"pipe": "cat file.txt | curl --data-binary @- http://host/",
"json": "curl -H 'Content-Type: application/json' -d '{\"content\":\"...\"}' http://host/"
},
"note": "Use --data-binary (not -d) to preserve newlines"
}
```
---
### POST /
Create a new paste.
**Request (Raw Binary):**
```http
POST / HTTP/1.1
Host: localhost:5000
Content-Type: application/octet-stream
```
**Request (JSON):**
```http
POST / HTTP/1.1
Host: localhost:5000
Content-Type: application/json
```
**Response (201 Created):**
```json
{
"id": "abc12345",
"url": "/abc12345",
"raw": "/abc12345/raw",
"mime_type": "text/plain",
"created_at": 1700000000,
"owner": "a1b2c3..." // Only present if authenticated
}
```
**Errors:**
| Code | Description |
|------|-------------|
| 400 | No content provided |
| 413 | Paste too large |
**Size Limits:**
- Anonymous: 3 MiB (configurable via `FLASKPASTE_MAX_ANON`)
- Authenticated: 50 MiB (configurable via `FLASKPASTE_MAX_AUTH`)
---
### GET /{id}
Retrieve paste metadata.
**Request:**
```http
GET /abc12345 HTTP/1.1
Host: localhost:5000
```
**Response (200 OK):**
```json
{
"id": "abc12345",
"mime_type": "text/plain",
"size": 1234,
"created_at": 1700000000,
"raw": "/abc12345/raw"
}
```
**Errors:**
| Code | Description |
|------|-------------|
| 400 | Invalid paste ID format |
| 404 | Paste not found |
---
### GET /{id}/raw
Retrieve raw paste content with correct MIME type.
**Request:**
```http
GET /abc12345/raw HTTP/1.1
Host: localhost:5000
```
**Response (200 OK):**
```http
HTTP/1.1 200 OK
Content-Type: image/png
Content-Disposition: inline
<binary content>
```
- `Content-Disposition: inline` is set for `image/*` and `text/*` types
- Content-Type matches the detected/stored MIME type
**Errors:**
| Code | Description |
|------|-------------|
| 400 | Invalid paste ID format |
| 404 | Paste not found |
---
### DELETE /{id}
Delete a paste. Requires authentication and ownership.
**Request:**
```http
DELETE /abc12345 HTTP/1.1
Host: localhost:5000
X-SSL-Client-SHA1: a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2
```
**Response (200 OK):**
```json
{
"message": "Paste deleted"
}
```
**Errors:**
| Code | Description |
|------|-------------|
| 400 | Invalid paste ID format |
| 401 | Authentication required |
| 403 | Permission denied (not owner) |
| 404 | Paste not found |
---
## MIME Type Detection
FlaskPaste automatically detects MIME types using:
1. **Magic byte signatures** (highest priority)
- PNG: `\x89PNG\r\n\x1a\n`
- JPEG: `\xff\xd8\xff`
- GIF: `GIF87a` or `GIF89a`
- WebP: `RIFF....WEBP`
- ZIP: `PK\x03\x04`
- PDF: `%PDF`
- GZIP: `\x1f\x8b`
2. **Explicit Content-Type header** (if not generic)
3. **UTF-8 detection** (falls back to `text/plain`)
4. **Binary fallback** (`application/octet-stream`)
---
## Paste Expiry
Pastes expire based on last access time (default: 5 days).
- Every `GET /{id}` or `GET /{id}/raw` updates the last access timestamp
- Cleanup runs automatically (hourly, throttled)
- Configurable via `FLASKPASTE_EXPIRY` environment variable
---
## Error Response Format
All errors return JSON:
```json
{
"error": "Description of the error"
}
```
For size limit errors (413):
```json
{
"error": "Paste too large",
"size": 5000000,
"max_size": 3145728,
"authenticated": false
}
```
---
## Security Headers
All responses include the following security headers:
| Header | Value |
|--------|-------|
| `X-Content-Type-Options` | `nosniff` |
| `X-Frame-Options` | `DENY` |
| `X-XSS-Protection` | `1; mode=block` |
| `Referrer-Policy` | `strict-origin-when-cross-origin` |
| `Content-Security-Policy` | `default-src 'none'; frame-ancestors 'none'` |
| `Permissions-Policy` | `geolocation=(), microphone=(), camera=()` |
| `Strict-Transport-Security` | `max-age=31536000; includeSubDomains` |
| `Cache-Control` | `no-store, no-cache, must-revalidate, private` |
| `Pragma` | `no-cache` |
---
## Request Tracing
All requests include an `X-Request-ID` header for log correlation:
- If the client provides `X-Request-ID`, it is passed through
- If not provided, a UUID is generated
- The ID is echoed back in the response
- All log entries include `[rid=<request-id>]`
**Example:**
```bash
# Client-provided ID
curl -H "X-Request-ID: my-trace-123" https://paste.example.com/health
# Response includes:
# X-Request-ID: my-trace-123
```
---
## Proxy Trust Validation
When `FLASKPASTE_PROXY_SECRET` is configured, the application validates that requests come from a trusted reverse proxy by checking the `X-Proxy-Secret` header.
This provides defense-in-depth against header spoofing if an attacker bypasses the reverse proxy.
**Configuration:**
```bash
export FLASKPASTE_PROXY_SECRET="your-secret-value"
```
**Proxy Configuration (HAProxy):**
```
http-request set-header X-Proxy-Secret your-secret-value
```
If the secret doesn't match, authentication headers (`X-SSL-Client-SHA1`) are ignored and the request is treated as anonymous.
If the secret doesn't match, authentication headers (`X-SSL-Client-SHA1`) are ignored and the request is treated as anonymous.

305
documentation/deployment.md Normal file
View File

@@ -0,0 +1,305 @@
# FlaskPaste Deployment Guide
## Overview
FlaskPaste can be deployed in several ways:
- Development server (for testing only)
- WSGI server (Gunicorn, uWSGI)
- Container (Podman/Docker)
## Prerequisites
- Python 3.11+
- SQLite 3
- Reverse proxy for TLS termination (nginx, Apache, Caddy)
---
## Development Server
**For testing only - not for production!**
```bash
# Setup
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
# Run
python run.py
# or
flask --app app run --debug
```
---
## Production: WSGI Server
### Gunicorn
```bash
pip install gunicorn
# Basic usage
gunicorn -w 4 -b 0.0.0.0:5000 wsgi:app
# With Unix socket (recommended for nginx)
gunicorn -w 4 --bind unix:/run/flaskpaste/gunicorn.sock wsgi:app
# Production settings
gunicorn \
--workers 4 \
--bind unix:/run/flaskpaste/gunicorn.sock \
--access-logfile /var/log/flaskpaste/access.log \
--error-logfile /var/log/flaskpaste/error.log \
--capture-output \
wsgi:app
```
### Systemd Service
Create `/etc/systemd/system/flaskpaste.service`:
```ini
[Unit]
Description=FlaskPaste Pastebin Service
After=network.target
[Service]
Type=notify
User=flaskpaste
Group=flaskpaste
RuntimeDirectory=flaskpaste
WorkingDirectory=/opt/flaskpaste
Environment="FLASK_ENV=production"
Environment="FLASKPASTE_DB=/var/lib/flaskpaste/pastes.db"
ExecStart=/opt/flaskpaste/venv/bin/gunicorn \
--workers 4 \
--bind unix:/run/flaskpaste/gunicorn.sock \
wsgi:app
ExecReload=/bin/kill -s HUP $MAINPID
Restart=on-failure
RestartSec=5
[Install]
WantedBy=multi-user.target
```
```bash
sudo systemctl daemon-reload
sudo systemctl enable --now flaskpaste
```
---
## Production: Container Deployment
### Build the Container
```bash
# Using Podman
podman build -t flaskpaste .
# Using Docker
docker build -t flaskpaste .
```
### Run the Container
```bash
# Basic run
podman run -d \
--name flaskpaste \
-p 5000:5000 \
-v flaskpaste-data:/app/data \
flaskpaste
# With environment configuration
podman run -d \
--name flaskpaste \
-p 5000:5000 \
-v flaskpaste-data:/app/data \
-e FLASKPASTE_EXPIRY=604800 \
-e FLASKPASTE_MAX_ANON=1048576 \
flaskpaste
```
### Podman Quadlet (systemd integration)
Create `/etc/containers/systemd/flaskpaste.container`:
```ini
[Unit]
Description=FlaskPaste Container
After=local-fs.target
[Container]
Image=localhost/flaskpaste:latest
ContainerName=flaskpaste
PublishPort=5000:5000
Volume=flaskpaste-data:/app/data:Z
Environment=FLASK_ENV=production
Environment=FLASKPASTE_EXPIRY=432000
[Service]
Restart=on-failure
RestartSec=10
[Install]
WantedBy=multi-user.target default.target
```
```bash
systemctl --user daemon-reload
systemctl --user start flaskpaste
```
---
## Reverse Proxy Configuration
### nginx
```nginx
upstream flaskpaste {
server unix:/run/flaskpaste/gunicorn.sock;
# or for container: server 127.0.0.1:5000;
}
server {
listen 443 ssl http2;
server_name paste.example.com;
ssl_certificate /etc/ssl/certs/paste.example.com.crt;
ssl_certificate_key /etc/ssl/private/paste.example.com.key;
# Optional: Client certificate authentication
ssl_client_certificate /etc/ssl/certs/ca.crt;
ssl_verify_client optional;
# Size limit (should match FLASKPASTE_MAX_AUTH)
client_max_body_size 50M;
location / {
proxy_pass http://flaskpaste;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# Pass client certificate fingerprint
proxy_set_header X-SSL-Client-SHA1 $ssl_client_fingerprint;
}
}
```
### Caddy
```caddyfile
paste.example.com {
reverse_proxy localhost:5000
# Client certificate auth (if needed)
tls {
client_auth {
mode request
trusted_ca_cert_file /etc/ssl/certs/ca.crt
}
}
}
```
### HAProxy
```haproxy
frontend https
bind *:443 ssl crt /etc/haproxy/certs/
# Route to flaskpaste backend
acl is_paste path_beg /paste
use_backend backend-flaskpaste if is_paste
backend backend-flaskpaste
mode http
# Strip /paste prefix before forwarding
http-request replace-path /paste(.*) \1
http-request set-path / if { path -m len 0 }
# Request tracing - generate X-Request-ID if not present
http-request set-header X-Request-ID %[uuid()] unless { req.hdr(X-Request-ID) -m found }
http-request set-var(txn.request_id) req.hdr(X-Request-ID)
http-response set-header X-Request-ID %[var(txn.request_id)]
# Proxy trust secret - proves request came through HAProxy
http-request set-header X-Proxy-Secret your-secret-value-here
# Pass client certificate fingerprint (if using mTLS)
http-request set-header X-SSL-Client-SHA1 %[ssl_c_sha1,hex,lower]
server flaskpaste 127.0.0.1:5000 check
```
---
## Data Persistence
### Database Location
Default: `./data/pastes.db`
Configure via `FLASKPASTE_DB`:
```bash
export FLASKPASTE_DB=/var/lib/flaskpaste/pastes.db
```
### Backup
```bash
# SQLite online backup
sqlite3 /var/lib/flaskpaste/pastes.db ".backup '/backup/pastes-$(date +%Y%m%d).db'"
```
### Container Volume
```bash
# Create named volume
podman volume create flaskpaste-data
# Backup from volume
podman run --rm \
-v flaskpaste-data:/app/data:ro \
-v ./backup:/backup \
alpine cp /app/data/pastes.db /backup/
```
---
## Environment Variables
| Variable | Default | Description |
|----------|---------|-------------|
| `FLASK_ENV` | `development` | Set to `production` in production |
| `FLASKPASTE_DB` | `./data/pastes.db` | Database file path |
| `FLASKPASTE_ID_LENGTH` | `12` | Paste ID length (hex chars) |
| `FLASKPASTE_MAX_ANON` | `3145728` | Max anonymous paste (bytes) |
| `FLASKPASTE_MAX_AUTH` | `52428800` | Max authenticated paste (bytes) |
| `FLASKPASTE_EXPIRY` | `432000` | Paste expiry (seconds) |
| `FLASKPASTE_PROXY_SECRET` | (empty) | Shared secret for proxy trust validation |
---
## Security Checklist
- [ ] Run behind reverse proxy with TLS
- [ ] Use non-root user in containers
- [ ] Set appropriate file permissions on database
- [ ] Configure firewall (only expose reverse proxy)
- [ ] Set `FLASK_ENV=production`
- [ ] Configure client certificate auth (if needed)
- [ ] Set `FLASKPASTE_PROXY_SECRET` for defense-in-depth
- [ ] Configure proxy to send `X-Proxy-Secret` header
- [ ] Configure proxy to pass/generate `X-Request-ID`
- [ ] Set up log rotation
- [ ] Enable automatic backups
- [ ] Monitor disk usage (paste storage)

13
requirements.txt Normal file
View File

@@ -0,0 +1,13 @@
flask>=3.0
# Security & Rate Limiting
flask-limiter>=3.5
flask-cors>=4.0
# Observability
prometheus-flask-exporter>=0.23
# Development/Testing
pytest>=8.0
pytest-cov>=4.0
locust>=2.20

8
run.py Executable file
View File

@@ -0,0 +1,8 @@
#!/usr/bin/env python3
"""Development server entry point."""
from app import create_app
if __name__ == "__main__":
app = create_app("development")
app.run(host="0.0.0.0", port=5000)

1
tests/__init__.py Normal file
View File

@@ -0,0 +1 @@
# FlaskPaste test suite

77
tests/conftest.py Normal file
View File

@@ -0,0 +1,77 @@
"""Pytest fixtures for FlaskPaste tests."""
import pytest
from app import create_app
@pytest.fixture
def app():
"""Create application for testing."""
app = create_app("testing")
yield app
@pytest.fixture
def client(app):
"""Create test client."""
return app.test_client()
@pytest.fixture
def runner(app):
"""Create CLI runner."""
return app.test_cli_runner()
@pytest.fixture
def sample_text():
"""Sample text content for testing."""
return "Hello, FlaskPaste!"
@pytest.fixture
def sample_json():
"""Sample JSON payload for testing."""
return {"content": "Hello from JSON!"}
@pytest.fixture
def auth_header():
"""Valid authentication header."""
return {"X-SSL-Client-SHA1": "a" * 40}
@pytest.fixture
def other_auth_header():
"""Different valid authentication header."""
return {"X-SSL-Client-SHA1": "b" * 40}
@pytest.fixture
def png_bytes():
"""Minimal valid PNG bytes for testing."""
return (
b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01"
b"\x00\x00\x00\x01\x08\x02\x00\x00\x00\x90wS\xde\x00"
b"\x00\x00\x0cIDATx\x9cc\xf8\x0f\x00\x00\x01\x01\x00"
b"\x05\x18\xd8N\x00\x00\x00\x00IEND\xaeB`\x82"
)
@pytest.fixture
def jpeg_bytes():
"""Minimal JPEG magic bytes for testing."""
return b"\xff\xd8\xff\xe0\x00\x10JFIF\x00"
@pytest.fixture
def zip_bytes():
"""ZIP magic bytes for testing."""
return b"PK\x03\x04" + b"\x00" * 26
@pytest.fixture
def pdf_bytes():
"""PDF magic bytes for testing."""
return b"%PDF-1.4 test content"

263
tests/test_api.py Normal file
View File

@@ -0,0 +1,263 @@
"""Tests for FlaskPaste API endpoints."""
import json
class TestIndex:
"""Tests for GET / endpoint."""
def test_get_api_info(self, client):
"""GET / returns API information."""
response = client.get("/")
assert response.status_code == 200
data = json.loads(response.data)
assert data["name"] == "FlaskPaste"
assert data["version"] == "1.0.0"
assert "endpoints" in data
assert "usage" in data
def test_api_info_contains_endpoints(self, client):
"""API info lists all endpoints."""
response = client.get("/")
data = json.loads(response.data)
endpoints = data["endpoints"]
assert "GET /" in endpoints
assert "POST /" in endpoints
assert "GET /<id>" in endpoints
assert "GET /<id>/raw" in endpoints
assert "DELETE /<id>" in endpoints
class TestHealth:
"""Tests for GET /health endpoint."""
def test_health_endpoint_returns_ok(self, client):
"""Health endpoint returns healthy status."""
response = client.get("/health")
assert response.status_code == 200
data = json.loads(response.data)
assert data["status"] == "healthy"
assert data["database"] == "ok"
def test_health_endpoint_json_response(self, client):
"""Health endpoint returns JSON content type."""
response = client.get("/health")
assert "application/json" in response.content_type
class TestCreatePaste:
"""Tests for POST / endpoint."""
def test_create_paste_raw(self, client, sample_text):
"""Create paste with raw text body."""
response = client.post(
"/",
data=sample_text,
content_type="text/plain",
)
assert response.status_code == 201
data = json.loads(response.data)
assert "id" in data
assert len(data["id"]) == 12
assert data["mime_type"] == "text/plain"
assert "url" in data
assert "raw" in data
def test_create_paste_json(self, client, sample_json):
"""Create paste with JSON body."""
response = client.post(
"/",
data=json.dumps(sample_json),
content_type="application/json",
)
assert response.status_code == 201
data = json.loads(response.data)
assert "id" in data
assert data["mime_type"] == "text/plain"
def test_create_paste_binary(self, client, png_bytes):
"""Create paste with binary content detects MIME type."""
response = client.post(
"/",
data=png_bytes,
content_type="application/octet-stream",
)
assert response.status_code == 201
data = json.loads(response.data)
assert data["mime_type"] == "image/png"
def test_create_paste_empty_fails(self, client):
"""Create paste with empty content fails."""
response = client.post("/", data="")
assert response.status_code == 400
data = json.loads(response.data)
assert "error" in data
def test_create_paste_with_auth(self, client, sample_text, auth_header):
"""Create paste with authentication includes owner."""
response = client.post(
"/",
data=sample_text,
content_type="text/plain",
headers=auth_header,
)
assert response.status_code == 201
data = json.loads(response.data)
assert "owner" in data
assert data["owner"] == "a" * 40
def test_create_paste_invalid_auth_ignored(self, client, sample_text):
"""Invalid auth header is ignored (treated as anonymous)."""
response = client.post(
"/",
data=sample_text,
content_type="text/plain",
headers={"X-SSL-Client-SHA1": "invalid"},
)
assert response.status_code == 201
data = json.loads(response.data)
assert "owner" not in data
class TestGetPaste:
"""Tests for GET /<id> endpoint."""
def test_get_paste_metadata(self, client, sample_text):
"""Get paste returns metadata."""
create = client.post("/", data=sample_text, content_type="text/plain")
paste_id = json.loads(create.data)["id"]
response = client.get(f"/{paste_id}")
assert response.status_code == 200
data = json.loads(response.data)
assert data["id"] == paste_id
assert data["mime_type"] == "text/plain"
assert data["size"] == len(sample_text)
assert "created_at" in data
assert "raw" in data
def test_get_paste_not_found(self, client):
"""Get nonexistent paste returns 404."""
response = client.get("/abcd12345678")
assert response.status_code == 404
data = json.loads(response.data)
assert "error" in data
def test_get_paste_invalid_id(self, client):
"""Get paste with invalid ID returns 400."""
response = client.get("/invalid!")
assert response.status_code == 400
def test_get_paste_wrong_length_id(self, client):
"""Get paste with wrong length ID returns 400."""
response = client.get("/abc")
assert response.status_code == 400
class TestGetPasteRaw:
"""Tests for GET /<id>/raw endpoint."""
def test_get_paste_raw_text(self, client, sample_text):
"""Get raw paste returns content with correct MIME type."""
create = client.post("/", data=sample_text, content_type="text/plain")
paste_id = json.loads(create.data)["id"]
response = client.get(f"/{paste_id}/raw")
assert response.status_code == 200
assert response.data.decode("utf-8") == sample_text
assert response.content_type == "text/plain; charset=utf-8"
def test_get_paste_raw_binary(self, client, png_bytes):
"""Get raw binary paste returns correct content."""
create = client.post(
"/",
data=png_bytes,
content_type="application/octet-stream",
)
paste_id = json.loads(create.data)["id"]
response = client.get(f"/{paste_id}/raw")
assert response.status_code == 200
assert response.data == png_bytes
assert response.content_type == "image/png"
def test_get_paste_raw_not_found(self, client):
"""Get raw nonexistent paste returns 404."""
response = client.get("/abcd12345678/raw")
assert response.status_code == 404
def test_get_paste_raw_inline_disposition(self, client, sample_text):
"""Text and image pastes have inline disposition."""
create = client.post("/", data=sample_text, content_type="text/plain")
paste_id = json.loads(create.data)["id"]
response = client.get(f"/{paste_id}/raw")
assert response.headers.get("Content-Disposition") == "inline"
class TestDeletePaste:
"""Tests for DELETE /<id> endpoint."""
def test_delete_paste_success(self, client, sample_text, auth_header):
"""Delete owned paste succeeds."""
create = client.post(
"/",
data=sample_text,
content_type="text/plain",
headers=auth_header,
)
paste_id = json.loads(create.data)["id"]
response = client.delete(f"/{paste_id}", headers=auth_header)
assert response.status_code == 200
data = json.loads(response.data)
assert "message" in data
# Verify deletion
get_response = client.get(f"/{paste_id}")
assert get_response.status_code == 404
def test_delete_paste_no_auth(self, client, sample_text):
"""Delete without authentication fails."""
create = client.post("/", data=sample_text, content_type="text/plain")
paste_id = json.loads(create.data)["id"]
response = client.delete(f"/{paste_id}")
assert response.status_code == 401
data = json.loads(response.data)
assert "error" in data
def test_delete_paste_wrong_owner(
self, client, sample_text, auth_header, other_auth_header
):
"""Delete paste owned by another user fails."""
create = client.post(
"/",
data=sample_text,
content_type="text/plain",
headers=auth_header,
)
paste_id = json.loads(create.data)["id"]
response = client.delete(f"/{paste_id}", headers=other_auth_header)
assert response.status_code == 403
data = json.loads(response.data)
assert "error" in data
def test_delete_anonymous_paste_fails(self, client, sample_text, auth_header):
"""Cannot delete anonymous paste (no owner)."""
create = client.post("/", data=sample_text, content_type="text/plain")
paste_id = json.loads(create.data)["id"]
response = client.delete(f"/{paste_id}", headers=auth_header)
assert response.status_code == 403
def test_delete_paste_not_found(self, client, auth_header):
"""Delete nonexistent paste returns 404."""
response = client.delete("/abcd12345678", headers=auth_header)
assert response.status_code == 404
def test_delete_paste_invalid_id(self, client, auth_header):
"""Delete with invalid ID returns 400."""
response = client.delete("/invalid!", headers=auth_header)
assert response.status_code == 400

90
tests/test_database.py Normal file
View File

@@ -0,0 +1,90 @@
"""Tests for database operations."""
import json
import time
from unittest.mock import patch
class TestDatabaseOperations:
"""Tests for database functionality."""
def test_paste_persists(self, client, sample_text):
"""Paste persists in database and can be retrieved."""
create = client.post("/", data=sample_text, content_type="text/plain")
paste_id = json.loads(create.data)["id"]
response = client.get(f"/{paste_id}/raw")
assert response.data.decode("utf-8") == sample_text
def test_multiple_pastes_independent(self, client):
"""Multiple pastes have unique IDs and content."""
create1 = client.post("/", data="paste one", content_type="text/plain")
create2 = client.post("/", data="paste two", content_type="text/plain")
id1 = json.loads(create1.data)["id"]
id2 = json.loads(create2.data)["id"]
assert id1 != id2
raw1 = client.get(f"/{id1}/raw")
raw2 = client.get(f"/{id2}/raw")
assert raw1.data.decode("utf-8") == "paste one"
assert raw2.data.decode("utf-8") == "paste two"
def test_last_accessed_updated_on_get(self, client, sample_text):
"""Last accessed timestamp updates on retrieval."""
create = client.post("/", data=sample_text, content_type="text/plain")
paste_id = json.loads(create.data)["id"]
# Get the paste twice with a small delay
client.get(f"/{paste_id}")
first_access = time.time()
time.sleep(0.1)
client.get(f"/{paste_id}")
second_access = time.time()
# Timestamps should be close to current time (within 2 seconds)
assert second_access > first_access
class TestCleanupExpiredPastes:
"""Tests for paste expiry and cleanup."""
def test_expired_paste_cleaned_up(self, app, client, sample_text):
"""Expired pastes are removed by cleanup."""
from app.database import cleanup_expired_pastes
# Create a paste
create = client.post("/", data=sample_text, content_type="text/plain")
paste_id = json.loads(create.data)["id"]
# Verify it exists
assert client.get(f"/{paste_id}").status_code == 200
# Mock time to simulate expiry (paste expiry + 1 second)
future_time = time.time() + app.config["PASTE_EXPIRY_SECONDS"] + 1
with patch("time.time", return_value=future_time):
with app.app_context():
deleted = cleanup_expired_pastes()
assert deleted >= 1
# Paste should now be gone
assert client.get(f"/{paste_id}").status_code == 404
def test_non_expired_paste_kept(self, app, client, sample_text):
"""Non-expired pastes are preserved by cleanup."""
from app.database import cleanup_expired_pastes
create = client.post("/", data=sample_text, content_type="text/plain")
paste_id = json.loads(create.data)["id"]
with app.app_context():
deleted = cleanup_expired_pastes()
assert deleted == 0
assert client.get(f"/{paste_id}").status_code == 200

View File

@@ -0,0 +1,95 @@
"""Tests for MIME type detection."""
import json
class TestMimeDetection:
"""Tests for automatic MIME type detection."""
def test_detect_png(self, client, png_bytes):
"""Detect PNG from magic bytes."""
response = client.post("/", data=png_bytes)
data = json.loads(response.data)
assert data["mime_type"] == "image/png"
def test_detect_jpeg(self, client, jpeg_bytes):
"""Detect JPEG from magic bytes."""
response = client.post("/", data=jpeg_bytes)
data = json.loads(response.data)
assert data["mime_type"] == "image/jpeg"
def test_detect_zip(self, client, zip_bytes):
"""Detect ZIP from magic bytes."""
response = client.post("/", data=zip_bytes)
data = json.loads(response.data)
assert data["mime_type"] == "application/zip"
def test_detect_pdf(self, client, pdf_bytes):
"""Detect PDF from magic bytes."""
response = client.post("/", data=pdf_bytes)
data = json.loads(response.data)
assert data["mime_type"] == "application/pdf"
def test_detect_gif87a(self, client):
"""Detect GIF87a from magic bytes."""
response = client.post("/", data=b"GIF87a" + b"\x00" * 10)
data = json.loads(response.data)
assert data["mime_type"] == "image/gif"
def test_detect_gif89a(self, client):
"""Detect GIF89a from magic bytes."""
response = client.post("/", data=b"GIF89a" + b"\x00" * 10)
data = json.loads(response.data)
assert data["mime_type"] == "image/gif"
def test_detect_gzip(self, client):
"""Detect GZIP from magic bytes."""
response = client.post("/", data=b"\x1f\x8b\x08" + b"\x00" * 10)
data = json.loads(response.data)
assert data["mime_type"] == "application/gzip"
def test_detect_utf8_text(self, client):
"""UTF-8 text defaults to text/plain."""
response = client.post("/", data="Hello, world! 你好")
data = json.loads(response.data)
assert data["mime_type"] == "text/plain"
def test_detect_binary_fallback(self, client):
"""Non-UTF8 binary without magic falls back to octet-stream."""
response = client.post("/", data=b"\x80\x81\x82\x83\x84")
data = json.loads(response.data)
assert data["mime_type"] == "application/octet-stream"
def test_explicit_content_type_honored(self, client):
"""Explicit Content-Type is honored for non-generic types."""
response = client.post(
"/",
data="<html><body>test</body></html>",
content_type="text/html",
)
data = json.loads(response.data)
assert data["mime_type"] == "text/html"
def test_generic_content_type_overridden(self, client, png_bytes):
"""Generic Content-Type is overridden by magic detection."""
response = client.post(
"/",
data=png_bytes,
content_type="application/octet-stream",
)
data = json.loads(response.data)
assert data["mime_type"] == "image/png"
def test_webp_detection(self, client):
"""Detect WebP from RIFF...WEBP magic."""
webp_header = b"RIFF\x00\x00\x00\x00WEBP"
response = client.post("/", data=webp_header + b"\x00" * 20)
data = json.loads(response.data)
assert data["mime_type"] == "image/webp"
def test_riff_non_webp_not_detected(self, client):
"""RIFF without WEBP marker is not detected as WebP."""
riff_other = b"RIFF\x00\x00\x00\x00WAVE"
response = client.post("/", data=riff_other + b"\x00" * 20)
data = json.loads(response.data)
assert data["mime_type"] != "image/webp"

325
tests/test_security.py Normal file
View File

@@ -0,0 +1,325 @@
"""Security-focused tests for FlaskPaste."""
import json
class TestSecurityHeaders:
"""Tests for security headers."""
def test_x_content_type_options(self, client):
"""X-Content-Type-Options header is set."""
response = client.get("/")
assert response.headers.get("X-Content-Type-Options") == "nosniff"
def test_x_frame_options(self, client):
"""X-Frame-Options header is set."""
response = client.get("/")
assert response.headers.get("X-Frame-Options") == "DENY"
def test_x_xss_protection(self, client):
"""X-XSS-Protection header is set."""
response = client.get("/")
assert response.headers.get("X-XSS-Protection") == "1; mode=block"
def test_content_security_policy(self, client):
"""Content-Security-Policy header is set."""
response = client.get("/")
csp = response.headers.get("Content-Security-Policy")
assert csp is not None
assert "default-src 'none'" in csp
def test_referrer_policy(self, client):
"""Referrer-Policy header is set."""
response = client.get("/")
assert response.headers.get("Referrer-Policy") == "strict-origin-when-cross-origin"
def test_permissions_policy(self, client):
"""Permissions-Policy header is set."""
response = client.get("/")
assert response.headers.get("Permissions-Policy") is not None
def test_security_headers_on_error_responses(self, client):
"""Security headers are present on error responses."""
response = client.get("/nonexist1234") # 12 chars, valid format but not found
assert response.headers.get("X-Content-Type-Options") == "nosniff"
assert response.headers.get("X-Frame-Options") == "DENY"
def test_hsts_header(self, client):
"""Strict-Transport-Security header is set."""
response = client.get("/")
hsts = response.headers.get("Strict-Transport-Security")
assert hsts is not None
assert "max-age=" in hsts
assert "includeSubDomains" in hsts
def test_cache_control_header(self, client):
"""Cache-Control header prevents caching of sensitive data."""
response = client.get("/")
cache = response.headers.get("Cache-Control")
assert cache is not None
assert "no-store" in cache
assert "private" in cache
def test_pragma_header(self, client):
"""Pragma header set for HTTP/1.0 compatibility."""
response = client.get("/")
assert response.headers.get("Pragma") == "no-cache"
class TestRequestIdTracking:
"""Tests for X-Request-ID tracking."""
def test_request_id_generated(self, client):
"""Request ID is generated when not provided."""
response = client.get("/")
request_id = response.headers.get("X-Request-ID")
assert request_id is not None
# Should be a valid UUID format
assert len(request_id) == 36
assert request_id.count("-") == 4
def test_request_id_passed_through(self, client):
"""Request ID from client is echoed back."""
custom_id = "my-custom-request-id-12345"
response = client.get("/", headers={"X-Request-ID": custom_id})
assert response.headers.get("X-Request-ID") == custom_id
def test_request_id_on_error_responses(self, client):
"""Request ID is present on error responses."""
custom_id = "error-request-id-67890"
response = client.get("/nonexist1234", headers={"X-Request-ID": custom_id})
assert response.headers.get("X-Request-ID") == custom_id
class TestProxyTrustValidation:
"""Tests for reverse proxy trust validation."""
def test_auth_works_without_proxy_secret_configured(self, client, sample_text, auth_header):
"""Auth header trusted when no proxy secret is configured (default)."""
response = client.post(
"/",
data=sample_text,
content_type="text/plain",
headers=auth_header,
)
assert response.status_code == 201
import json
data = json.loads(response.data)
assert "owner" in data
def test_auth_ignored_with_wrong_proxy_secret(self, app, client, sample_text, auth_header):
"""Auth header ignored when proxy secret doesn't match."""
# Configure a proxy secret
app.config["TRUSTED_PROXY_SECRET"] = "correct-secret-value"
try:
# Request with wrong secret - auth should be ignored
headers = dict(auth_header)
headers["X-Proxy-Secret"] = "wrong-secret"
response = client.post(
"/",
data=sample_text,
content_type="text/plain",
headers=headers,
)
assert response.status_code == 201
import json
data = json.loads(response.data)
# Owner should NOT be set because proxy wasn't trusted
assert "owner" not in data
finally:
# Reset config
app.config["TRUSTED_PROXY_SECRET"] = ""
def test_auth_works_with_correct_proxy_secret(self, app, client, sample_text, auth_header):
"""Auth header trusted when proxy secret matches."""
# Configure a proxy secret
app.config["TRUSTED_PROXY_SECRET"] = "correct-secret-value"
try:
# Request with correct secret - auth should work
headers = dict(auth_header)
headers["X-Proxy-Secret"] = "correct-secret-value"
response = client.post(
"/",
data=sample_text,
content_type="text/plain",
headers=headers,
)
assert response.status_code == 201
import json
data = json.loads(response.data)
assert "owner" in data
finally:
# Reset config
app.config["TRUSTED_PROXY_SECRET"] = ""
class TestInputValidation:
"""Tests for input validation and sanitization."""
def test_paste_id_hex_only(self, client):
"""Paste IDs must be hexadecimal."""
# IDs that match the route pattern but fail validation (12 chars)
invalid_ids = [
"ABCD12345678", # uppercase
"abcd-2345678", # dash
"abcd_2345678", # underscore
"abcd<>345678", # angle brackets
]
for invalid_id in invalid_ids:
response = client.get(f"/{invalid_id}")
assert response.status_code == 400, f"Expected 400 for ID: {invalid_id}"
# IDs with slashes are routed differently (404 from Flask routing)
slash_ids = ["abcd/2345678", "../abcd12345"]
for invalid_id in slash_ids:
response = client.get(f"/{invalid_id}")
assert response.status_code in (400, 404), f"Unexpected for ID: {invalid_id}"
def test_paste_id_length_enforced(self, client):
"""Paste IDs must be exactly the configured length (12 chars)."""
too_short = "abcd1234" # 8 chars
too_long = "abcdef1234567890abcd" # 20 chars
assert client.get(f"/{too_short}").status_code == 400
assert client.get(f"/{too_long}").status_code == 400
def test_auth_header_format_validated(self, client, sample_text):
"""Auth header must be valid SHA1 format."""
invalid_headers = [
"a" * 39, # too short
"a" * 41, # too long
"g" * 40, # invalid hex
"A" * 40, # uppercase (should be lowercased internally)
"a-a" * 13 + "a", # dashes
]
for invalid in invalid_headers:
response = client.post(
"/",
data=sample_text,
content_type="text/plain",
headers={"X-SSL-Client-SHA1": invalid},
)
# Invalid auth is ignored, not rejected (treated as anonymous)
data = json.loads(response.data)
assert "owner" not in data or data.get("owner") != invalid
def test_mime_type_sanitized(self, client):
"""MIME types are sanitized to prevent injection."""
# Flask/Werkzeug rejects newlines in headers at the framework level
# Test that MIME type parameters are stripped to base type
response = client.post(
"/",
data="test content",
content_type="text/plain; charset=utf-8; boundary=something",
)
data = json.loads(response.data)
# Should be sanitized to just the base MIME type
assert data["mime_type"] == "text/plain"
assert "charset" not in data["mime_type"]
assert "boundary" not in data["mime_type"]
class TestSizeLimits:
"""Tests for paste size limits."""
def test_anonymous_size_limit(self, app, client):
"""Anonymous pastes are limited in size."""
max_size = app.config["MAX_PASTE_SIZE_ANON"]
oversized = "x" * (max_size + 1)
response = client.post("/", data=oversized, content_type="text/plain")
assert response.status_code == 413
data = json.loads(response.data)
assert "error" in data
assert data["authenticated"] is False
def test_authenticated_larger_limit(self, app, client, auth_header):
"""Authenticated users have larger size limit."""
anon_max = app.config["MAX_PASTE_SIZE_ANON"]
auth_max = app.config["MAX_PASTE_SIZE_AUTH"]
# Size that exceeds anon but within auth limit
# (only test if auth limit is larger)
if auth_max > anon_max:
size = anon_max + 100
content = "x" * size
response = client.post(
"/",
data=content,
content_type="text/plain",
headers=auth_header,
)
assert response.status_code == 201
class TestOwnershipEnforcement:
"""Tests for paste ownership and access control."""
def test_cannot_delete_others_paste(
self, client, sample_text, auth_header, other_auth_header
):
"""Users cannot delete pastes they don't own."""
create = client.post(
"/",
data=sample_text,
content_type="text/plain",
headers=auth_header,
)
paste_id = json.loads(create.data)["id"]
response = client.delete(f"/{paste_id}", headers=other_auth_header)
assert response.status_code == 403
# Paste should still exist
assert client.get(f"/{paste_id}").status_code == 200
def test_anonymous_paste_undeletable(self, client, sample_text, auth_header):
"""Anonymous pastes cannot be deleted by anyone."""
create = client.post("/", data=sample_text, content_type="text/plain")
paste_id = json.loads(create.data)["id"]
response = client.delete(f"/{paste_id}", headers=auth_header)
assert response.status_code == 403
def test_owner_can_delete(self, client, sample_text, auth_header):
"""Owners can delete their own pastes."""
create = client.post(
"/",
data=sample_text,
content_type="text/plain",
headers=auth_header,
)
paste_id = json.loads(create.data)["id"]
response = client.delete(f"/{paste_id}", headers=auth_header)
assert response.status_code == 200
class TestJsonResponses:
"""Tests for JSON response format and encoding."""
def test_json_content_type(self, client):
"""API responses have correct Content-Type."""
response = client.get("/")
assert "application/json" in response.content_type
def test_unicode_in_response(self, client):
"""Unicode content is properly encoded in responses."""
unicode_text = "Hello 你好 مرحبا 🎉"
create = client.post("/", data=unicode_text.encode("utf-8"))
assert create.status_code == 201
paste_id = json.loads(create.data)["id"]
raw = client.get(f"/{paste_id}/raw")
assert raw.data.decode("utf-8") == unicode_text
def test_error_responses_are_json(self, client):
"""Error responses are valid JSON."""
response = client.get("/nonexist1234") # 12 chars
assert response.status_code in (400, 404)
data = json.loads(response.data)
assert "error" in data

5
wsgi.py Normal file
View File

@@ -0,0 +1,5 @@
"""WSGI entry point for production servers."""
from app import create_app
application = create_app("production")