commit 8f9868f0d9c8631b95447a2ec3ee71e7b8aff194 Author: Username Date: Tue Dec 16 04:42:18 2025 +0100 flaskpaste: initial commit with security hardening Features: - REST API for text/binary pastes with MIME detection - Client certificate auth via X-SSL-Client-SHA1 header - SQLite with WAL mode for concurrent access - Automatic paste expiry with LRU cleanup Security: - HSTS, CSP, X-Frame-Options, X-Content-Type-Options - Cache-Control: no-store for sensitive responses - X-Request-ID tracing for log correlation - X-Proxy-Secret validation for defense-in-depth - Parameterized queries, input validation - Size limits (3 MiB anon, 50 MiB auth) Includes /health endpoint, container support, and 70 tests. diff --git a/.containerignore b/.containerignore new file mode 100644 index 0000000..a0b5449 --- /dev/null +++ b/.containerignore @@ -0,0 +1,43 @@ +# Git +.git +.gitignore + +# Python +__pycache__ +*.py[cod] +*$py.class +*.so +.Python +venv/ +.venv/ +ENV/ +env/ +*.egg-info/ +.eggs/ +dist/ +build/ + +# Testing +.pytest_cache/ +.coverage +htmlcov/ +.tox/ +tests/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Data (use volumes instead) +data/ + +# Documentation (not needed in container) +documentation/ +*.md +!requirements.txt + +# Misc +.DS_Store +*.log diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..430c7d6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,48 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +venv/ +ENV/ +.venv/ + +# Testing +.pytest_cache/ +.coverage +htmlcov/ +.tox/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Data +data/*.db +data/*.db-journal +data/*.db-wal +data/*.db-shm + +# Secrets +.credentials +.credentials.json +*.pem +*.key + +# Build +dist/ +build/ +*.egg-info/ + +# OS +.DS_Store +Thumbs.db + +# Private +.claude/ +CLAUDE.md +CRUSH.md +PERSONAE diff --git a/Containerfile b/Containerfile new file mode 100644 index 0000000..b3f2c58 --- /dev/null +++ b/Containerfile @@ -0,0 +1,43 @@ +# FlaskPaste Container Image +# Build: podman build -t flaskpaste . +# Run: podman run -d -p 5000:5000 -v flaskpaste-data:/app/data flaskpaste + +FROM python:3.11-slim + +LABEL maintainer="FlaskPaste" +LABEL description="Lightweight secure pastebin REST API" + +# Create non-root user +RUN groupadd -r flaskpaste && useradd -r -g flaskpaste flaskpaste + +# Set working directory +WORKDIR /app + +# Install dependencies first (cache layer) +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt gunicorn + +# Copy application code +COPY app/ ./app/ +COPY wsgi.py . + +# Create data directory +RUN mkdir -p /app/data && chown -R flaskpaste:flaskpaste /app + +# Switch to non-root user +USER flaskpaste + +# Environment defaults +ENV FLASK_ENV=production +ENV FLASKPASTE_DB=/app/data/pastes.db +ENV PYTHONUNBUFFERED=1 + +# Expose port +EXPOSE 5000 + +# Health check +HEALTHCHECK --interval=30s --timeout=5s --start-period=5s --retries=3 \ + CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:5000/health')" || exit 1 + +# Run with gunicorn +CMD ["gunicorn", "--bind", "0.0.0.0:5000", "--workers", "2", "--access-logfile", "-", "wsgi:application"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..978b364 --- /dev/null +++ b/README.md @@ -0,0 +1,173 @@ +# FlaskPaste + +A lightweight, secure pastebin REST API built with Flask. + +## Features + +- **Simple REST API** - Create, retrieve, and delete pastes via HTTP +- **Binary support** - Upload text, images, archives, and other binary content +- **Automatic MIME detection** - Magic byte detection for common formats (PNG, JPEG, GIF, WebP, ZIP, PDF, GZIP) +- **Client certificate authentication** - Optional auth via `X-SSL-Client-SHA1` header +- **Automatic expiry** - Pastes expire after configurable period of inactivity +- **Size limits** - Configurable limits for anonymous and authenticated users +- **Security headers** - HSTS, CSP, X-Frame-Options, Cache-Control, and more +- **Request tracing** - X-Request-ID support for log correlation +- **Proxy trust validation** - Optional shared secret for defense-in-depth +- **Minimal dependencies** - Flask only, SQLite built-in + +## Quick Start + +```bash +# Clone and setup +git clone +cd flaskpaste +python3 -m venv venv +source venv/bin/activate +pip install -r requirements.txt + +# Run development server +python run.py +``` + +## API Endpoints + +| Method | Endpoint | Description | +|--------|----------|-------------| +| `GET /` | API information and usage | +| `GET /health` | Health check (returns DB status) | +| `POST /` | Create a new paste | +| `GET /` | Retrieve paste metadata | +| `GET //raw` | Retrieve raw paste content | +| `DELETE /` | Delete paste (requires auth) | + +## Usage Examples + +### Create a paste (raw text) +```bash +curl --data-binary @file.txt http://localhost:5000/ +``` + +### Create a paste (piped input) +```bash +echo "Hello, World!" | curl --data-binary @- http://localhost:5000/ +``` + +### Create a paste (JSON) +```bash +curl -H "Content-Type: application/json" \ + -d '{"content":"Hello from JSON!"}' \ + http://localhost:5000/ +``` + +### Retrieve paste metadata +```bash +curl http://localhost:5000/abc12345 +``` + +### Retrieve raw content +```bash +curl http://localhost:5000/abc12345/raw +``` + +### Delete a paste (requires authentication) +```bash +curl -X DELETE \ + -H "X-SSL-Client-SHA1: " \ + http://localhost:5000/abc12345 +``` + +## Configuration + +Configuration via environment variables: + +| Variable | Default | Description | +|----------|---------|-------------| +| `FLASK_ENV` | `development` | Environment (`development`, `production`, `testing`) | +| `FLASKPASTE_DB` | `./data/pastes.db` | SQLite database path | +| `FLASKPASTE_ID_LENGTH` | `12` | Paste ID length (hex characters) | +| `FLASKPASTE_MAX_ANON` | `3145728` (3 MiB) | Max paste size for anonymous users | +| `FLASKPASTE_MAX_AUTH` | `52428800` (50 MiB) | Max paste size for authenticated users | +| `FLASKPASTE_EXPIRY` | `432000` (5 days) | Paste expiry in seconds | +| `FLASKPASTE_PROXY_SECRET` | (empty) | Shared secret for proxy trust validation | + +## Authentication + +FlaskPaste uses client certificate authentication. When deployed behind a reverse proxy (nginx, Apache), configure the proxy to: + +1. Terminate TLS and validate client certificates +2. Extract the certificate SHA1 fingerprint +3. Pass it via the `X-SSL-Client-SHA1` header + +Example nginx configuration: +```nginx +location / { + proxy_pass http://127.0.0.1:5000; + proxy_set_header X-SSL-Client-SHA1 $ssl_client_fingerprint; +} +``` + +Authenticated users can: +- Upload larger pastes (50 MiB vs 3 MiB) +- Delete their own pastes + +## Production Deployment + +### Using Gunicorn +```bash +pip install gunicorn +gunicorn -w 4 -b 0.0.0.0:5000 wsgi:app +``` + +### Using Podman/Docker +```bash +podman build -t flaskpaste . +podman run -d -p 5000:5000 -v flaskpaste-data:/app/data flaskpaste +``` + +See `Containerfile` for container build configuration. + +## Development + +### Running Tests +```bash +pip install pytest pytest-cov +pytest tests/ -v +``` + +### Test Coverage +```bash +pytest tests/ --cov=app --cov-report=term-missing +``` + +### Project Structure +``` +flaskpaste/ +├── app/ +│ ├── __init__.py # Flask app factory +│ ├── config.py # Configuration classes +│ ├── database.py # SQLite management +│ └── api/ +│ ├── __init__.py # Blueprint setup +│ └── routes.py # API endpoints +├── tests/ # Test suite +├── data/ # SQLite database +├── run.py # Development server +├── wsgi.py # Production WSGI entry +├── Containerfile # Podman/Docker build +└── requirements.txt # Dependencies +``` + +## Security Considerations + +- **Input validation** - Paste IDs are hex-only, auth headers validated +- **MIME sanitization** - Content-Type headers are sanitized +- **SQL injection protection** - Parameterized queries throughout +- **Ownership enforcement** - Only owners can delete their pastes +- **Size limits** - Prevents resource exhaustion attacks +- **Security headers** - HSTS, CSP, X-Frame-Options, X-Content-Type-Options, Cache-Control +- **Request tracing** - X-Request-ID for log correlation and debugging +- **Proxy trust** - Optional `X-Proxy-Secret` validation to prevent header spoofing + +## License + +MIT diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..bc00524 --- /dev/null +++ b/app/__init__.py @@ -0,0 +1,225 @@ +"""Flask application factory.""" + +import logging +import os +import sys +import uuid + +from flask import Flask, Response, g, request + +from app.config import config + + +def setup_logging(app: Flask) -> None: + """Configure structured logging.""" + log_level = logging.DEBUG if app.debug else logging.INFO + log_format = ( + "%(asctime)s %(levelname)s [%(name)s] %(message)s" + if app.debug + else '{"time":"%(asctime)s","level":"%(levelname)s","logger":"%(name)s","message":"%(message)s"}' + ) + + logging.basicConfig( + level=log_level, + format=log_format, + stream=sys.stdout, + ) + + # Reduce noise from werkzeug in production + if not app.debug: + logging.getLogger("werkzeug").setLevel(logging.WARNING) + + app.logger.info("FlaskPaste starting", extra={"config": type(app.config).__name__}) + + +def setup_security_headers(app: Flask) -> None: + """Add security headers to all responses.""" + + @app.after_request + def add_security_headers(response: Response) -> Response: + # Prevent MIME type sniffing + response.headers["X-Content-Type-Options"] = "nosniff" + # Prevent clickjacking + response.headers["X-Frame-Options"] = "DENY" + # XSS protection (legacy but still useful) + response.headers["X-XSS-Protection"] = "1; mode=block" + # Referrer policy + response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin" + # Content Security Policy (restrictive for API) + response.headers["Content-Security-Policy"] = "default-src 'none'; frame-ancestors 'none'" + # Permissions policy + response.headers["Permissions-Policy"] = "geolocation=(), microphone=(), camera=()" + # HSTS - enforce HTTPS (1 year, include subdomains) + response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains" + # Prevent caching of sensitive paste data + response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, private" + response.headers["Pragma"] = "no-cache" + + return response + + +def setup_request_id(app: Flask) -> None: + """Add request ID tracking for log correlation and tracing.""" + + @app.before_request + def assign_request_id(): + # Use incoming X-Request-ID from proxy, or generate a new one + request_id = request.headers.get("X-Request-ID", "").strip() + if not request_id: + request_id = str(uuid.uuid4()) + g.request_id = request_id + + @app.after_request + def add_request_id_header(response: Response) -> Response: + # Echo request ID back to client for tracing + request_id = getattr(g, "request_id", None) + if request_id: + response.headers["X-Request-ID"] = request_id + + # Access logging with request ID + app.logger.info( + "%s %s %s [rid=%s]", + request.method, + request.path, + response.status_code, + request_id or "-", + ) + return response + + +def setup_error_handlers(app: Flask) -> None: + """Register global error handlers.""" + import json + + @app.errorhandler(400) + def bad_request(error): + app.logger.warning("Bad request: %s [rid=%s]", request.path, getattr(g, "request_id", "-")) + return Response( + json.dumps({"error": "Bad request"}), + status=400, + mimetype="application/json", + ) + + @app.errorhandler(404) + def not_found(error): + return Response( + json.dumps({"error": "Not found"}), + status=404, + mimetype="application/json", + ) + + @app.errorhandler(429) + def rate_limit_exceeded(error): + app.logger.warning( + "Rate limit exceeded: %s from %s [rid=%s]", + request.path, request.remote_addr, getattr(g, "request_id", "-") + ) + return Response( + json.dumps({"error": "Rate limit exceeded", "retry_after": error.description}), + status=429, + mimetype="application/json", + ) + + @app.errorhandler(500) + def internal_error(error): + app.logger.error( + "Internal error: %s - %s [rid=%s]", + request.path, str(error), getattr(g, "request_id", "-") + ) + return Response( + json.dumps({"error": "Internal server error"}), + status=500, + mimetype="application/json", + ) + + @app.errorhandler(Exception) + def handle_exception(error): + app.logger.exception( + "Unhandled exception: %s [rid=%s]", + str(error), getattr(g, "request_id", "-") + ) + return Response( + json.dumps({"error": "Internal server error"}), + status=500, + mimetype="application/json", + ) + + +def setup_rate_limiting(app: Flask) -> None: + """Configure rate limiting.""" + from flask_limiter import Limiter + from flask_limiter.util import get_remote_address + + limiter = Limiter( + key_func=get_remote_address, + app=app, + default_limits=["200 per day", "60 per hour"], + storage_uri="memory://", + strategy="fixed-window", + ) + + # Store limiter on app for use in routes + app.extensions["limiter"] = limiter + + return limiter + + +def setup_metrics(app: Flask) -> None: + """Configure Prometheus metrics.""" + # Only enable metrics in production + if app.config.get("TESTING"): + return + + try: + from prometheus_flask_exporter import PrometheusMetrics + + metrics = PrometheusMetrics(app) + + # Add app info + metrics.info("flaskpaste_info", "FlaskPaste application info", version="1.0.0") + + app.extensions["metrics"] = metrics + except ImportError: + app.logger.warning("prometheus_flask_exporter not available, metrics disabled") + + +def create_app(config_name: str | None = None) -> Flask: + """Create and configure the Flask application.""" + if config_name is None: + config_name = os.environ.get("FLASK_ENV", "default") + + app = Flask(__name__) + app.config.from_object(config[config_name]) + + # Setup logging first + setup_logging(app) + + # Setup request ID tracking + setup_request_id(app) + + # Setup security headers + setup_security_headers(app) + + # Setup error handlers + setup_error_handlers(app) + + # Setup rate limiting (skip in testing) + if not app.config.get("TESTING"): + setup_rate_limiting(app) + + # Setup metrics (skip in testing) + setup_metrics(app) + + # Initialize database + from app import database + + database.init_app(app) + + # Register blueprints + from app.api import bp as api_bp + + app.register_blueprint(api_bp) + + app.logger.info("FlaskPaste initialized successfully") + + return app diff --git a/app/api/__init__.py b/app/api/__init__.py new file mode 100644 index 0000000..8038871 --- /dev/null +++ b/app/api/__init__.py @@ -0,0 +1,32 @@ +"""API blueprint registration.""" + +import time + +from flask import Blueprint, current_app + +bp = Blueprint("api", __name__) + +# Throttle cleanup to run at most once per hour +_last_cleanup = 0 +_CLEANUP_INTERVAL = 3600 # 1 hour + + +@bp.before_request +def cleanup_expired(): + """Periodically clean up expired pastes.""" + global _last_cleanup + + now = time.time() + if now - _last_cleanup < _CLEANUP_INTERVAL: + return + + _last_cleanup = now + + from app.database import cleanup_expired_pastes + + count = cleanup_expired_pastes() + if count > 0: + current_app.logger.info(f"Cleaned up {count} expired paste(s)") + + +from app.api import routes # noqa: E402, F401 diff --git a/app/api/routes.py b/app/api/routes.py new file mode 100644 index 0000000..cec73c3 --- /dev/null +++ b/app/api/routes.py @@ -0,0 +1,319 @@ +"""API route handlers.""" + +import hashlib +import hmac +import json +import re +import time + +from flask import Response, current_app, request + +from app.api import bp +from app.database import get_db + +# Valid paste ID pattern (hexadecimal only) +PASTE_ID_PATTERN = re.compile(r"^[a-f0-9]+$") + +# Valid client certificate SHA1 pattern (40 hex chars) +CLIENT_ID_PATTERN = re.compile(r"^[a-f0-9]{40}$") + +# Magic bytes for common binary formats +MAGIC_SIGNATURES = { + b"\x89PNG\r\n\x1a\n": "image/png", + b"\xff\xd8\xff": "image/jpeg", + b"GIF87a": "image/gif", + b"GIF89a": "image/gif", + b"RIFF": "image/webp", # WebP (check for WEBP after RIFF) + b"PK\x03\x04": "application/zip", + b"%PDF": "application/pdf", + b"\x1f\x8b": "application/gzip", +} + + +def _is_valid_paste_id(paste_id: str) -> bool: + """Validate paste ID format (hexadecimal, correct length).""" + expected_length = current_app.config["PASTE_ID_LENGTH"] + return ( + len(paste_id) == expected_length + and PASTE_ID_PATTERN.match(paste_id) is not None + ) + + +def _detect_mime_type(content: bytes, content_type: str | None = None) -> str: + """Detect MIME type from content bytes, with magic byte detection taking priority.""" + # Check magic bytes first - most reliable method + for magic, mime in MAGIC_SIGNATURES.items(): + if content.startswith(magic): + # Special case for WebP (RIFF....WEBP) + if magic == b"RIFF" and len(content) >= 12: + if content[8:12] != b"WEBP": + continue + return mime + + # Trust explicit Content-Type if it's specific (not generic defaults) + generic_types = { + "application/octet-stream", + "application/x-www-form-urlencoded", + "text/plain", + } + if content_type: + mime = content_type.split(";")[0].strip().lower() + if mime not in generic_types: + # Sanitize: only allow safe characters in MIME type + if re.match(r"^[a-z0-9][a-z0-9!#$&\-^_.+]*\/[a-z0-9][a-z0-9!#$&\-^_.+]*$", mime): + return mime + + # Try to decode as UTF-8 text + try: + content.decode("utf-8") + return "text/plain" + except UnicodeDecodeError: + return "application/octet-stream" + + +def _generate_id(content: bytes) -> str: + """Generate a short unique ID from content hash and timestamp.""" + data = content + str(time.time_ns()).encode() + length = current_app.config["PASTE_ID_LENGTH"] + return hashlib.sha256(data).hexdigest()[:length] + + +def _json_response(data: dict, status: int = 200) -> Response: + """Create a JSON response with proper encoding and security headers.""" + response = Response( + json.dumps(data, ensure_ascii=False), + status=status, + mimetype="application/json", + ) + return response + + +def _is_trusted_proxy() -> bool: + """Verify request comes from a trusted reverse proxy. + + If TRUSTED_PROXY_SECRET is configured, the request must include a matching + X-Proxy-Secret header. This provides defense-in-depth against header spoofing + if an attacker bypasses the reverse proxy. + + Returns True if no secret is configured (backwards compatible) or if the + secret matches. + """ + expected_secret = current_app.config.get("TRUSTED_PROXY_SECRET", "") + if not expected_secret: + # No secret configured - trust all requests (backwards compatible) + return True + + # Constant-time comparison to prevent timing attacks + provided_secret = request.headers.get("X-Proxy-Secret", "") + return hmac.compare_digest(expected_secret, provided_secret) + + +def _get_client_id() -> str | None: + """Extract and validate client identity from X-SSL-Client-SHA1 header. + + Returns lowercase SHA1 fingerprint or None if not present/invalid. + + SECURITY: The X-SSL-Client-SHA1 header is only trusted if the request + comes from a trusted proxy (verified via X-Proxy-Secret if configured). + """ + # Verify request comes from trusted proxy before trusting auth headers + if not _is_trusted_proxy(): + current_app.logger.warning( + "Auth header ignored: X-Proxy-Secret mismatch from %s", + request.remote_addr + ) + return None + + client_sha1 = request.headers.get("X-SSL-Client-SHA1", "").strip().lower() + # Validate format: must be 40 hex characters (SHA1) + if client_sha1 and CLIENT_ID_PATTERN.match(client_sha1): + return client_sha1 + return None + + +@bp.route("/health", methods=["GET"]) +def health(): + """Health check endpoint for load balancers and monitoring.""" + try: + db = get_db() + db.execute("SELECT 1") + return _json_response({"status": "healthy", "database": "ok"}) + except Exception: + return _json_response({"status": "unhealthy", "database": "error"}, 503) + + +@bp.route("/", methods=["GET", "POST"]) +def index(): + """Handle API info (GET) and paste creation (POST).""" + if request.method == "POST": + return create_paste() + + return _json_response( + { + "name": "FlaskPaste", + "version": "1.0.0", + "endpoints": { + "GET /": "API information", + "GET /health": "Health check", + "POST /": "Create paste", + "GET /": "Retrieve paste metadata", + "GET //raw": "Retrieve raw paste content", + "DELETE /": "Delete paste", + }, + "usage": { + "raw": "curl --data-binary @file.txt http://host/", + "pipe": "cat file.txt | curl --data-binary @- http://host/", + "json": "curl -H 'Content-Type: application/json' -d '{\"content\":\"...\"}' http://host/", + }, + "note": "Use --data-binary (not -d) to preserve newlines", + } + ) + + +def create_paste(): + """Create a new paste from request body.""" + content: bytes | None = None + mime_type: str | None = None + + if request.is_json: + data = request.get_json(silent=True) + if data and isinstance(data.get("content"), str): + content = data["content"].encode("utf-8") + mime_type = "text/plain" + else: + content = request.get_data(as_text=False) + if content: + mime_type = _detect_mime_type(content, request.content_type) + + if not content: + return _json_response({"error": "No content provided"}, 400) + + owner = _get_client_id() + + # Enforce size limits based on authentication + content_size = len(content) + if owner: + max_size = current_app.config["MAX_PASTE_SIZE_AUTH"] + else: + max_size = current_app.config["MAX_PASTE_SIZE_ANON"] + + if content_size > max_size: + return _json_response({ + "error": "Paste too large", + "size": content_size, + "max_size": max_size, + "authenticated": owner is not None, + }, 413) + + paste_id = _generate_id(content) + now = int(time.time()) + + db = get_db() + db.execute( + "INSERT INTO pastes (id, content, mime_type, owner, created_at, last_accessed) VALUES (?, ?, ?, ?, ?, ?)", + (paste_id, content, mime_type, owner, now, now), + ) + db.commit() + + response_data = { + "id": paste_id, + "url": f"/{paste_id}", + "raw": f"/{paste_id}/raw", + "mime_type": mime_type, + "created_at": now, + } + if owner: + response_data["owner"] = owner + + return _json_response(response_data, 201) + + +@bp.route("/", methods=["GET"]) +def get_paste(paste_id: str): + """Retrieve paste metadata by ID.""" + if not _is_valid_paste_id(paste_id): + return _json_response({"error": "Invalid paste ID"}, 400) + + db = get_db() + now = int(time.time()) + + # Update last_accessed and return paste in one transaction + db.execute( + "UPDATE pastes SET last_accessed = ? WHERE id = ?", (now, paste_id) + ) + row = db.execute( + "SELECT id, mime_type, created_at, length(content) as size FROM pastes WHERE id = ?", + (paste_id,) + ).fetchone() + db.commit() + + if row is None: + return _json_response({"error": "Paste not found"}, 404) + + return _json_response({ + "id": row["id"], + "mime_type": row["mime_type"], + "size": row["size"], + "created_at": row["created_at"], + "raw": f"/{paste_id}/raw", + }) + + +@bp.route("//raw", methods=["GET"]) +def get_paste_raw(paste_id: str): + """Retrieve raw paste content with correct MIME type.""" + if not _is_valid_paste_id(paste_id): + return _json_response({"error": "Invalid paste ID"}, 400) + + db = get_db() + now = int(time.time()) + + # Update last_accessed and return paste in one transaction + db.execute( + "UPDATE pastes SET last_accessed = ? WHERE id = ?", (now, paste_id) + ) + row = db.execute( + "SELECT content, mime_type FROM pastes WHERE id = ?", (paste_id,) + ).fetchone() + db.commit() + + if row is None: + return _json_response({"error": "Paste not found"}, 404) + + mime_type = row["mime_type"] + + response = Response(row["content"], mimetype=mime_type) + # Display inline for images and text, let browser decide for others + if mime_type.startswith(("image/", "text/")): + response.headers["Content-Disposition"] = "inline" + + return response + + +@bp.route("/", methods=["DELETE"]) +def delete_paste(paste_id: str): + """Delete a paste by ID. Requires ownership via X-SSL-Client-SHA1 header.""" + if not _is_valid_paste_id(paste_id): + return _json_response({"error": "Invalid paste ID"}, 400) + + client_id = _get_client_id() + if not client_id: + return _json_response({"error": "Authentication required"}, 401) + + db = get_db() + + # Check paste exists and verify ownership + row = db.execute( + "SELECT owner FROM pastes WHERE id = ?", (paste_id,) + ).fetchone() + + if row is None: + return _json_response({"error": "Paste not found"}, 404) + + if row["owner"] != client_id: + return _json_response({"error": "Permission denied"}, 403) + + db.execute("DELETE FROM pastes WHERE id = ?", (paste_id,)) + db.commit() + + return _json_response({"message": "Paste deleted"}) diff --git a/app/config.py b/app/config.py new file mode 100644 index 0000000..f586728 --- /dev/null +++ b/app/config.py @@ -0,0 +1,56 @@ +"""Application configuration.""" + +import os +from pathlib import Path + + +class Config: + """Base configuration.""" + + BASE_DIR = Path(__file__).parent.parent + DATABASE = os.environ.get("FLASKPASTE_DB", BASE_DIR / "data" / "pastes.db") + PASTE_ID_LENGTH = int(os.environ.get("FLASKPASTE_ID_LENGTH", "12")) + + # Paste size limits + MAX_PASTE_SIZE_ANON = int(os.environ.get("FLASKPASTE_MAX_ANON", 3 * 1024 * 1024)) # 3MiB + MAX_PASTE_SIZE_AUTH = int(os.environ.get("FLASKPASTE_MAX_AUTH", 50 * 1024 * 1024)) # 50MiB + MAX_CONTENT_LENGTH = MAX_PASTE_SIZE_AUTH # Flask request limit + + # Paste expiry (default 5 days) + PASTE_EXPIRY_SECONDS = int(os.environ.get("FLASKPASTE_EXPIRY", 5 * 24 * 60 * 60)) + + # Reverse proxy trust configuration + # SECURITY: The X-SSL-Client-SHA1 header is trusted for authentication. + # This header MUST only come from a trusted reverse proxy that validates + # client certificates. Direct access to this app MUST be blocked. + # + # Set FLASKPASTE_PROXY_SECRET to require the proxy to send a matching + # X-Proxy-Secret header, providing defense-in-depth against header spoofing. + TRUSTED_PROXY_SECRET = os.environ.get("FLASKPASTE_PROXY_SECRET", "") + + +class DevelopmentConfig(Config): + """Development configuration.""" + + DEBUG = True + + +class ProductionConfig(Config): + """Production configuration.""" + + DEBUG = False + + +class TestingConfig(Config): + """Testing configuration.""" + + TESTING = True + DATABASE = ":memory:" + + +config = { + "development": DevelopmentConfig, + "production": ProductionConfig, + "testing": TestingConfig, + "default": DevelopmentConfig, +} diff --git a/app/database.py b/app/database.py new file mode 100644 index 0000000..4624be3 --- /dev/null +++ b/app/database.py @@ -0,0 +1,96 @@ +"""Database connection and schema management.""" + +import sqlite3 +import time +from pathlib import Path + +from flask import current_app, g + +SCHEMA = """ +CREATE TABLE IF NOT EXISTS pastes ( + id TEXT PRIMARY KEY, + content BLOB NOT NULL, + mime_type TEXT NOT NULL DEFAULT 'text/plain', + owner TEXT, + created_at INTEGER NOT NULL, + last_accessed INTEGER NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_pastes_created_at ON pastes(created_at); +CREATE INDEX IF NOT EXISTS idx_pastes_owner ON pastes(owner); +CREATE INDEX IF NOT EXISTS idx_pastes_last_accessed ON pastes(last_accessed); +""" + +# Hold reference for in-memory shared cache databases +_memory_db_holder = None + + +def _get_connection_string(db_path) -> tuple[str, dict]: + """Get connection string and kwargs for sqlite3.connect.""" + if isinstance(db_path, Path): + db_path.parent.mkdir(parents=True, exist_ok=True) + return str(db_path), {} + if db_path == ":memory:": + return "file::memory:?cache=shared", {"uri": True} + return db_path, {} + + +def get_db() -> sqlite3.Connection: + """Get database connection for current request context.""" + if "db" not in g: + db_path = current_app.config["DATABASE"] + conn_str, kwargs = _get_connection_string(db_path) + g.db = sqlite3.connect(conn_str, **kwargs) + g.db.row_factory = sqlite3.Row + g.db.execute("PRAGMA foreign_keys = ON") + if isinstance(db_path, Path): + g.db.execute("PRAGMA journal_mode = WAL") + return g.db + + +def close_db(exception=None) -> None: + """Close database connection at end of request.""" + db = g.pop("db", None) + if db is not None: + db.close() + + +def init_db() -> None: + """Initialize database schema.""" + global _memory_db_holder + + db_path = current_app.config["DATABASE"] + conn_str, kwargs = _get_connection_string(db_path) + + # For in-memory databases, keep a connection alive + if db_path == ":memory:": + _memory_db_holder = sqlite3.connect(conn_str, **kwargs) + _memory_db_holder.executescript(SCHEMA) + _memory_db_holder.commit() + else: + db = get_db() + db.executescript(SCHEMA) + db.commit() + + +def cleanup_expired_pastes() -> int: + """Delete pastes that haven't been accessed within expiry period. + + Returns number of deleted pastes. + """ + expiry_seconds = current_app.config["PASTE_EXPIRY_SECONDS"] + cutoff = int(time.time()) - expiry_seconds + + db = get_db() + cursor = db.execute("DELETE FROM pastes WHERE last_accessed < ?", (cutoff,)) + db.commit() + + return cursor.rowcount + + +def init_app(app) -> None: + """Register database functions with Flask app.""" + app.teardown_appcontext(close_db) + + with app.app_context(): + init_db() diff --git a/compose.yaml b/compose.yaml new file mode 100644 index 0000000..dc8f88c --- /dev/null +++ b/compose.yaml @@ -0,0 +1,38 @@ +# FlaskPaste Container Compose +# Usage: podman-compose up -d +# Or: podman compose up -d + +services: + flaskpaste: + build: + context: . + dockerfile: Containerfile + container_name: flaskpaste + restart: unless-stopped + ports: + - "5000:5000" + volumes: + - flaskpaste-data:/app/data + environment: + - FLASK_ENV=production + - FLASKPASTE_EXPIRY=432000 # 5 days + - FLASKPASTE_MAX_ANON=3145728 # 3 MiB + - FLASKPASTE_MAX_AUTH=52428800 # 50 MiB + healthcheck: + test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:5000/health')"] + interval: 30s + timeout: 5s + retries: 3 + start_period: 5s + deploy: + resources: + limits: + cpus: '1.0' + memory: 256M + reservations: + cpus: '0.25' + memory: 64M + +volumes: + flaskpaste-data: + driver: local diff --git a/documentation/api.md b/documentation/api.md new file mode 100644 index 0000000..eae5f0e --- /dev/null +++ b/documentation/api.md @@ -0,0 +1,333 @@ +# FlaskPaste API Reference + +## Overview + +FlaskPaste provides a RESTful API for creating, retrieving, and deleting text and binary pastes. + +**Base URL:** `http://your-server:5000/` + +**Content Types:** +- Requests: `application/json`, `text/plain`, `application/octet-stream`, or any binary type +- Responses: `application/json` for metadata, original MIME type for raw content + +## Authentication + +Authentication is optional and uses client certificate fingerprints passed via the `X-SSL-Client-SHA1` header. + +```http +X-SSL-Client-SHA1: a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2 +``` + +The fingerprint must be exactly 40 lowercase hexadecimal characters (SHA1). + +**Benefits of authentication:** +- Larger upload limit (50 MiB vs 3 MiB) +- Ability to delete owned pastes + +--- + +## Endpoints + +### GET /health + +Health check endpoint for load balancers and monitoring. + +**Request:** +```http +GET /health HTTP/1.1 +Host: localhost:5000 +``` + +**Response (200 OK):** +```json +{ + "status": "healthy", + "database": "ok" +} +``` + +**Response (503 Service Unavailable):** +```json +{ + "status": "unhealthy", + "database": "error" +} +``` + +--- + +### GET / + +Returns API information and usage instructions. + +**Request:** +```http +GET / HTTP/1.1 +Host: localhost:5000 +``` + +**Response:** +```json +{ + "name": "FlaskPaste", + "version": "1.0.0", + "endpoints": { + "GET /": "API information", + "GET /health": "Health check", + "POST /": "Create paste", + "GET /": "Retrieve paste metadata", + "GET //raw": "Retrieve raw paste content", + "DELETE /": "Delete paste" + }, + "usage": { + "raw": "curl --data-binary @file.txt http://host/", + "pipe": "cat file.txt | curl --data-binary @- http://host/", + "json": "curl -H 'Content-Type: application/json' -d '{\"content\":\"...\"}' http://host/" + }, + "note": "Use --data-binary (not -d) to preserve newlines" +} +``` + +--- + +### POST / + +Create a new paste. + +**Request (Raw Binary):** +```http +POST / HTTP/1.1 +Host: localhost:5000 +Content-Type: application/octet-stream + + +``` + +**Request (JSON):** +```http +POST / HTTP/1.1 +Host: localhost:5000 +Content-Type: application/json + +{"content": "Hello, World!"} +``` + +**Response (201 Created):** +```json +{ + "id": "abc12345", + "url": "/abc12345", + "raw": "/abc12345/raw", + "mime_type": "text/plain", + "created_at": 1700000000, + "owner": "a1b2c3..." // Only present if authenticated +} +``` + +**Errors:** +| Code | Description | +|------|-------------| +| 400 | No content provided | +| 413 | Paste too large | + +**Size Limits:** +- Anonymous: 3 MiB (configurable via `FLASKPASTE_MAX_ANON`) +- Authenticated: 50 MiB (configurable via `FLASKPASTE_MAX_AUTH`) + +--- + +### GET /{id} + +Retrieve paste metadata. + +**Request:** +```http +GET /abc12345 HTTP/1.1 +Host: localhost:5000 +``` + +**Response (200 OK):** +```json +{ + "id": "abc12345", + "mime_type": "text/plain", + "size": 1234, + "created_at": 1700000000, + "raw": "/abc12345/raw" +} +``` + +**Errors:** +| Code | Description | +|------|-------------| +| 400 | Invalid paste ID format | +| 404 | Paste not found | + +--- + +### GET /{id}/raw + +Retrieve raw paste content with correct MIME type. + +**Request:** +```http +GET /abc12345/raw HTTP/1.1 +Host: localhost:5000 +``` + +**Response (200 OK):** +```http +HTTP/1.1 200 OK +Content-Type: image/png +Content-Disposition: inline + + +``` + +- `Content-Disposition: inline` is set for `image/*` and `text/*` types +- Content-Type matches the detected/stored MIME type + +**Errors:** +| Code | Description | +|------|-------------| +| 400 | Invalid paste ID format | +| 404 | Paste not found | + +--- + +### DELETE /{id} + +Delete a paste. Requires authentication and ownership. + +**Request:** +```http +DELETE /abc12345 HTTP/1.1 +Host: localhost:5000 +X-SSL-Client-SHA1: a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2 +``` + +**Response (200 OK):** +```json +{ + "message": "Paste deleted" +} +``` + +**Errors:** +| Code | Description | +|------|-------------| +| 400 | Invalid paste ID format | +| 401 | Authentication required | +| 403 | Permission denied (not owner) | +| 404 | Paste not found | + +--- + +## MIME Type Detection + +FlaskPaste automatically detects MIME types using: + +1. **Magic byte signatures** (highest priority) + - PNG: `\x89PNG\r\n\x1a\n` + - JPEG: `\xff\xd8\xff` + - GIF: `GIF87a` or `GIF89a` + - WebP: `RIFF....WEBP` + - ZIP: `PK\x03\x04` + - PDF: `%PDF` + - GZIP: `\x1f\x8b` + +2. **Explicit Content-Type header** (if not generic) + +3. **UTF-8 detection** (falls back to `text/plain`) + +4. **Binary fallback** (`application/octet-stream`) + +--- + +## Paste Expiry + +Pastes expire based on last access time (default: 5 days). + +- Every `GET /{id}` or `GET /{id}/raw` updates the last access timestamp +- Cleanup runs automatically (hourly, throttled) +- Configurable via `FLASKPASTE_EXPIRY` environment variable + +--- + +## Error Response Format + +All errors return JSON: + +```json +{ + "error": "Description of the error" +} +``` + +For size limit errors (413): +```json +{ + "error": "Paste too large", + "size": 5000000, + "max_size": 3145728, + "authenticated": false +} +``` + +--- + +## Security Headers + +All responses include the following security headers: + +| Header | Value | +|--------|-------| +| `X-Content-Type-Options` | `nosniff` | +| `X-Frame-Options` | `DENY` | +| `X-XSS-Protection` | `1; mode=block` | +| `Referrer-Policy` | `strict-origin-when-cross-origin` | +| `Content-Security-Policy` | `default-src 'none'; frame-ancestors 'none'` | +| `Permissions-Policy` | `geolocation=(), microphone=(), camera=()` | +| `Strict-Transport-Security` | `max-age=31536000; includeSubDomains` | +| `Cache-Control` | `no-store, no-cache, must-revalidate, private` | +| `Pragma` | `no-cache` | + +--- + +## Request Tracing + +All requests include an `X-Request-ID` header for log correlation: + +- If the client provides `X-Request-ID`, it is passed through +- If not provided, a UUID is generated +- The ID is echoed back in the response +- All log entries include `[rid=]` + +**Example:** +```bash +# Client-provided ID +curl -H "X-Request-ID: my-trace-123" https://paste.example.com/health + +# Response includes: +# X-Request-ID: my-trace-123 +``` + +--- + +## Proxy Trust Validation + +When `FLASKPASTE_PROXY_SECRET` is configured, the application validates that requests come from a trusted reverse proxy by checking the `X-Proxy-Secret` header. + +This provides defense-in-depth against header spoofing if an attacker bypasses the reverse proxy. + +**Configuration:** +```bash +export FLASKPASTE_PROXY_SECRET="your-secret-value" +``` + +**Proxy Configuration (HAProxy):** +``` +http-request set-header X-Proxy-Secret your-secret-value +``` + +If the secret doesn't match, authentication headers (`X-SSL-Client-SHA1`) are ignored and the request is treated as anonymous. diff --git a/documentation/deployment.md b/documentation/deployment.md new file mode 100644 index 0000000..194477f --- /dev/null +++ b/documentation/deployment.md @@ -0,0 +1,305 @@ +# FlaskPaste Deployment Guide + +## Overview + +FlaskPaste can be deployed in several ways: +- Development server (for testing only) +- WSGI server (Gunicorn, uWSGI) +- Container (Podman/Docker) + +## Prerequisites + +- Python 3.11+ +- SQLite 3 +- Reverse proxy for TLS termination (nginx, Apache, Caddy) + +--- + +## Development Server + +**For testing only - not for production!** + +```bash +# Setup +python3 -m venv venv +source venv/bin/activate +pip install -r requirements.txt + +# Run +python run.py +# or +flask --app app run --debug +``` + +--- + +## Production: WSGI Server + +### Gunicorn + +```bash +pip install gunicorn + +# Basic usage +gunicorn -w 4 -b 0.0.0.0:5000 wsgi:app + +# With Unix socket (recommended for nginx) +gunicorn -w 4 --bind unix:/run/flaskpaste/gunicorn.sock wsgi:app + +# Production settings +gunicorn \ + --workers 4 \ + --bind unix:/run/flaskpaste/gunicorn.sock \ + --access-logfile /var/log/flaskpaste/access.log \ + --error-logfile /var/log/flaskpaste/error.log \ + --capture-output \ + wsgi:app +``` + +### Systemd Service + +Create `/etc/systemd/system/flaskpaste.service`: + +```ini +[Unit] +Description=FlaskPaste Pastebin Service +After=network.target + +[Service] +Type=notify +User=flaskpaste +Group=flaskpaste +RuntimeDirectory=flaskpaste +WorkingDirectory=/opt/flaskpaste +Environment="FLASK_ENV=production" +Environment="FLASKPASTE_DB=/var/lib/flaskpaste/pastes.db" +ExecStart=/opt/flaskpaste/venv/bin/gunicorn \ + --workers 4 \ + --bind unix:/run/flaskpaste/gunicorn.sock \ + wsgi:app +ExecReload=/bin/kill -s HUP $MAINPID +Restart=on-failure +RestartSec=5 + +[Install] +WantedBy=multi-user.target +``` + +```bash +sudo systemctl daemon-reload +sudo systemctl enable --now flaskpaste +``` + +--- + +## Production: Container Deployment + +### Build the Container + +```bash +# Using Podman +podman build -t flaskpaste . + +# Using Docker +docker build -t flaskpaste . +``` + +### Run the Container + +```bash +# Basic run +podman run -d \ + --name flaskpaste \ + -p 5000:5000 \ + -v flaskpaste-data:/app/data \ + flaskpaste + +# With environment configuration +podman run -d \ + --name flaskpaste \ + -p 5000:5000 \ + -v flaskpaste-data:/app/data \ + -e FLASKPASTE_EXPIRY=604800 \ + -e FLASKPASTE_MAX_ANON=1048576 \ + flaskpaste +``` + +### Podman Quadlet (systemd integration) + +Create `/etc/containers/systemd/flaskpaste.container`: + +```ini +[Unit] +Description=FlaskPaste Container +After=local-fs.target + +[Container] +Image=localhost/flaskpaste:latest +ContainerName=flaskpaste +PublishPort=5000:5000 +Volume=flaskpaste-data:/app/data:Z +Environment=FLASK_ENV=production +Environment=FLASKPASTE_EXPIRY=432000 + +[Service] +Restart=on-failure +RestartSec=10 + +[Install] +WantedBy=multi-user.target default.target +``` + +```bash +systemctl --user daemon-reload +systemctl --user start flaskpaste +``` + +--- + +## Reverse Proxy Configuration + +### nginx + +```nginx +upstream flaskpaste { + server unix:/run/flaskpaste/gunicorn.sock; + # or for container: server 127.0.0.1:5000; +} + +server { + listen 443 ssl http2; + server_name paste.example.com; + + ssl_certificate /etc/ssl/certs/paste.example.com.crt; + ssl_certificate_key /etc/ssl/private/paste.example.com.key; + + # Optional: Client certificate authentication + ssl_client_certificate /etc/ssl/certs/ca.crt; + ssl_verify_client optional; + + # Size limit (should match FLASKPASTE_MAX_AUTH) + client_max_body_size 50M; + + location / { + proxy_pass http://flaskpaste; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + + # Pass client certificate fingerprint + proxy_set_header X-SSL-Client-SHA1 $ssl_client_fingerprint; + } +} +``` + +### Caddy + +```caddyfile +paste.example.com { + reverse_proxy localhost:5000 + + # Client certificate auth (if needed) + tls { + client_auth { + mode request + trusted_ca_cert_file /etc/ssl/certs/ca.crt + } + } +} +``` + +### HAProxy + +```haproxy +frontend https + bind *:443 ssl crt /etc/haproxy/certs/ + + # Route to flaskpaste backend + acl is_paste path_beg /paste + use_backend backend-flaskpaste if is_paste + +backend backend-flaskpaste + mode http + + # Strip /paste prefix before forwarding + http-request replace-path /paste(.*) \1 + http-request set-path / if { path -m len 0 } + + # Request tracing - generate X-Request-ID if not present + http-request set-header X-Request-ID %[uuid()] unless { req.hdr(X-Request-ID) -m found } + http-request set-var(txn.request_id) req.hdr(X-Request-ID) + http-response set-header X-Request-ID %[var(txn.request_id)] + + # Proxy trust secret - proves request came through HAProxy + http-request set-header X-Proxy-Secret your-secret-value-here + + # Pass client certificate fingerprint (if using mTLS) + http-request set-header X-SSL-Client-SHA1 %[ssl_c_sha1,hex,lower] + + server flaskpaste 127.0.0.1:5000 check +``` + +--- + +## Data Persistence + +### Database Location + +Default: `./data/pastes.db` + +Configure via `FLASKPASTE_DB`: +```bash +export FLASKPASTE_DB=/var/lib/flaskpaste/pastes.db +``` + +### Backup + +```bash +# SQLite online backup +sqlite3 /var/lib/flaskpaste/pastes.db ".backup '/backup/pastes-$(date +%Y%m%d).db'" +``` + +### Container Volume + +```bash +# Create named volume +podman volume create flaskpaste-data + +# Backup from volume +podman run --rm \ + -v flaskpaste-data:/app/data:ro \ + -v ./backup:/backup \ + alpine cp /app/data/pastes.db /backup/ +``` + +--- + +## Environment Variables + +| Variable | Default | Description | +|----------|---------|-------------| +| `FLASK_ENV` | `development` | Set to `production` in production | +| `FLASKPASTE_DB` | `./data/pastes.db` | Database file path | +| `FLASKPASTE_ID_LENGTH` | `12` | Paste ID length (hex chars) | +| `FLASKPASTE_MAX_ANON` | `3145728` | Max anonymous paste (bytes) | +| `FLASKPASTE_MAX_AUTH` | `52428800` | Max authenticated paste (bytes) | +| `FLASKPASTE_EXPIRY` | `432000` | Paste expiry (seconds) | +| `FLASKPASTE_PROXY_SECRET` | (empty) | Shared secret for proxy trust validation | + +--- + +## Security Checklist + +- [ ] Run behind reverse proxy with TLS +- [ ] Use non-root user in containers +- [ ] Set appropriate file permissions on database +- [ ] Configure firewall (only expose reverse proxy) +- [ ] Set `FLASK_ENV=production` +- [ ] Configure client certificate auth (if needed) +- [ ] Set `FLASKPASTE_PROXY_SECRET` for defense-in-depth +- [ ] Configure proxy to send `X-Proxy-Secret` header +- [ ] Configure proxy to pass/generate `X-Request-ID` +- [ ] Set up log rotation +- [ ] Enable automatic backups +- [ ] Monitor disk usage (paste storage) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..44f23e0 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,13 @@ +flask>=3.0 + +# Security & Rate Limiting +flask-limiter>=3.5 +flask-cors>=4.0 + +# Observability +prometheus-flask-exporter>=0.23 + +# Development/Testing +pytest>=8.0 +pytest-cov>=4.0 +locust>=2.20 diff --git a/run.py b/run.py new file mode 100755 index 0000000..63ee5a1 --- /dev/null +++ b/run.py @@ -0,0 +1,8 @@ +#!/usr/bin/env python3 +"""Development server entry point.""" + +from app import create_app + +if __name__ == "__main__": + app = create_app("development") + app.run(host="0.0.0.0", port=5000) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..ad16c9d --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1 @@ +# FlaskPaste test suite diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..bdb1711 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,77 @@ +"""Pytest fixtures for FlaskPaste tests.""" + +import pytest + +from app import create_app + + +@pytest.fixture +def app(): + """Create application for testing.""" + app = create_app("testing") + yield app + + +@pytest.fixture +def client(app): + """Create test client.""" + return app.test_client() + + +@pytest.fixture +def runner(app): + """Create CLI runner.""" + return app.test_cli_runner() + + +@pytest.fixture +def sample_text(): + """Sample text content for testing.""" + return "Hello, FlaskPaste!" + + +@pytest.fixture +def sample_json(): + """Sample JSON payload for testing.""" + return {"content": "Hello from JSON!"} + + +@pytest.fixture +def auth_header(): + """Valid authentication header.""" + return {"X-SSL-Client-SHA1": "a" * 40} + + +@pytest.fixture +def other_auth_header(): + """Different valid authentication header.""" + return {"X-SSL-Client-SHA1": "b" * 40} + + +@pytest.fixture +def png_bytes(): + """Minimal valid PNG bytes for testing.""" + return ( + b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01" + b"\x00\x00\x00\x01\x08\x02\x00\x00\x00\x90wS\xde\x00" + b"\x00\x00\x0cIDATx\x9cc\xf8\x0f\x00\x00\x01\x01\x00" + b"\x05\x18\xd8N\x00\x00\x00\x00IEND\xaeB`\x82" + ) + + +@pytest.fixture +def jpeg_bytes(): + """Minimal JPEG magic bytes for testing.""" + return b"\xff\xd8\xff\xe0\x00\x10JFIF\x00" + + +@pytest.fixture +def zip_bytes(): + """ZIP magic bytes for testing.""" + return b"PK\x03\x04" + b"\x00" * 26 + + +@pytest.fixture +def pdf_bytes(): + """PDF magic bytes for testing.""" + return b"%PDF-1.4 test content" diff --git a/tests/test_api.py b/tests/test_api.py new file mode 100644 index 0000000..967abe4 --- /dev/null +++ b/tests/test_api.py @@ -0,0 +1,263 @@ +"""Tests for FlaskPaste API endpoints.""" + +import json + + +class TestIndex: + """Tests for GET / endpoint.""" + + def test_get_api_info(self, client): + """GET / returns API information.""" + response = client.get("/") + assert response.status_code == 200 + data = json.loads(response.data) + assert data["name"] == "FlaskPaste" + assert data["version"] == "1.0.0" + assert "endpoints" in data + assert "usage" in data + + def test_api_info_contains_endpoints(self, client): + """API info lists all endpoints.""" + response = client.get("/") + data = json.loads(response.data) + endpoints = data["endpoints"] + assert "GET /" in endpoints + assert "POST /" in endpoints + assert "GET /" in endpoints + assert "GET //raw" in endpoints + assert "DELETE /" in endpoints + + +class TestHealth: + """Tests for GET /health endpoint.""" + + def test_health_endpoint_returns_ok(self, client): + """Health endpoint returns healthy status.""" + response = client.get("/health") + assert response.status_code == 200 + data = json.loads(response.data) + assert data["status"] == "healthy" + assert data["database"] == "ok" + + def test_health_endpoint_json_response(self, client): + """Health endpoint returns JSON content type.""" + response = client.get("/health") + assert "application/json" in response.content_type + + +class TestCreatePaste: + """Tests for POST / endpoint.""" + + def test_create_paste_raw(self, client, sample_text): + """Create paste with raw text body.""" + response = client.post( + "/", + data=sample_text, + content_type="text/plain", + ) + assert response.status_code == 201 + data = json.loads(response.data) + assert "id" in data + assert len(data["id"]) == 12 + assert data["mime_type"] == "text/plain" + assert "url" in data + assert "raw" in data + + def test_create_paste_json(self, client, sample_json): + """Create paste with JSON body.""" + response = client.post( + "/", + data=json.dumps(sample_json), + content_type="application/json", + ) + assert response.status_code == 201 + data = json.loads(response.data) + assert "id" in data + assert data["mime_type"] == "text/plain" + + def test_create_paste_binary(self, client, png_bytes): + """Create paste with binary content detects MIME type.""" + response = client.post( + "/", + data=png_bytes, + content_type="application/octet-stream", + ) + assert response.status_code == 201 + data = json.loads(response.data) + assert data["mime_type"] == "image/png" + + def test_create_paste_empty_fails(self, client): + """Create paste with empty content fails.""" + response = client.post("/", data="") + assert response.status_code == 400 + data = json.loads(response.data) + assert "error" in data + + def test_create_paste_with_auth(self, client, sample_text, auth_header): + """Create paste with authentication includes owner.""" + response = client.post( + "/", + data=sample_text, + content_type="text/plain", + headers=auth_header, + ) + assert response.status_code == 201 + data = json.loads(response.data) + assert "owner" in data + assert data["owner"] == "a" * 40 + + def test_create_paste_invalid_auth_ignored(self, client, sample_text): + """Invalid auth header is ignored (treated as anonymous).""" + response = client.post( + "/", + data=sample_text, + content_type="text/plain", + headers={"X-SSL-Client-SHA1": "invalid"}, + ) + assert response.status_code == 201 + data = json.loads(response.data) + assert "owner" not in data + + +class TestGetPaste: + """Tests for GET / endpoint.""" + + def test_get_paste_metadata(self, client, sample_text): + """Get paste returns metadata.""" + create = client.post("/", data=sample_text, content_type="text/plain") + paste_id = json.loads(create.data)["id"] + + response = client.get(f"/{paste_id}") + assert response.status_code == 200 + data = json.loads(response.data) + assert data["id"] == paste_id + assert data["mime_type"] == "text/plain" + assert data["size"] == len(sample_text) + assert "created_at" in data + assert "raw" in data + + def test_get_paste_not_found(self, client): + """Get nonexistent paste returns 404.""" + response = client.get("/abcd12345678") + assert response.status_code == 404 + data = json.loads(response.data) + assert "error" in data + + def test_get_paste_invalid_id(self, client): + """Get paste with invalid ID returns 400.""" + response = client.get("/invalid!") + assert response.status_code == 400 + + def test_get_paste_wrong_length_id(self, client): + """Get paste with wrong length ID returns 400.""" + response = client.get("/abc") + assert response.status_code == 400 + + +class TestGetPasteRaw: + """Tests for GET //raw endpoint.""" + + def test_get_paste_raw_text(self, client, sample_text): + """Get raw paste returns content with correct MIME type.""" + create = client.post("/", data=sample_text, content_type="text/plain") + paste_id = json.loads(create.data)["id"] + + response = client.get(f"/{paste_id}/raw") + assert response.status_code == 200 + assert response.data.decode("utf-8") == sample_text + assert response.content_type == "text/plain; charset=utf-8" + + def test_get_paste_raw_binary(self, client, png_bytes): + """Get raw binary paste returns correct content.""" + create = client.post( + "/", + data=png_bytes, + content_type="application/octet-stream", + ) + paste_id = json.loads(create.data)["id"] + + response = client.get(f"/{paste_id}/raw") + assert response.status_code == 200 + assert response.data == png_bytes + assert response.content_type == "image/png" + + def test_get_paste_raw_not_found(self, client): + """Get raw nonexistent paste returns 404.""" + response = client.get("/abcd12345678/raw") + assert response.status_code == 404 + + def test_get_paste_raw_inline_disposition(self, client, sample_text): + """Text and image pastes have inline disposition.""" + create = client.post("/", data=sample_text, content_type="text/plain") + paste_id = json.loads(create.data)["id"] + + response = client.get(f"/{paste_id}/raw") + assert response.headers.get("Content-Disposition") == "inline" + + +class TestDeletePaste: + """Tests for DELETE / endpoint.""" + + def test_delete_paste_success(self, client, sample_text, auth_header): + """Delete owned paste succeeds.""" + create = client.post( + "/", + data=sample_text, + content_type="text/plain", + headers=auth_header, + ) + paste_id = json.loads(create.data)["id"] + + response = client.delete(f"/{paste_id}", headers=auth_header) + assert response.status_code == 200 + data = json.loads(response.data) + assert "message" in data + + # Verify deletion + get_response = client.get(f"/{paste_id}") + assert get_response.status_code == 404 + + def test_delete_paste_no_auth(self, client, sample_text): + """Delete without authentication fails.""" + create = client.post("/", data=sample_text, content_type="text/plain") + paste_id = json.loads(create.data)["id"] + + response = client.delete(f"/{paste_id}") + assert response.status_code == 401 + data = json.loads(response.data) + assert "error" in data + + def test_delete_paste_wrong_owner( + self, client, sample_text, auth_header, other_auth_header + ): + """Delete paste owned by another user fails.""" + create = client.post( + "/", + data=sample_text, + content_type="text/plain", + headers=auth_header, + ) + paste_id = json.loads(create.data)["id"] + + response = client.delete(f"/{paste_id}", headers=other_auth_header) + assert response.status_code == 403 + data = json.loads(response.data) + assert "error" in data + + def test_delete_anonymous_paste_fails(self, client, sample_text, auth_header): + """Cannot delete anonymous paste (no owner).""" + create = client.post("/", data=sample_text, content_type="text/plain") + paste_id = json.loads(create.data)["id"] + + response = client.delete(f"/{paste_id}", headers=auth_header) + assert response.status_code == 403 + + def test_delete_paste_not_found(self, client, auth_header): + """Delete nonexistent paste returns 404.""" + response = client.delete("/abcd12345678", headers=auth_header) + assert response.status_code == 404 + + def test_delete_paste_invalid_id(self, client, auth_header): + """Delete with invalid ID returns 400.""" + response = client.delete("/invalid!", headers=auth_header) + assert response.status_code == 400 diff --git a/tests/test_database.py b/tests/test_database.py new file mode 100644 index 0000000..31486ef --- /dev/null +++ b/tests/test_database.py @@ -0,0 +1,90 @@ +"""Tests for database operations.""" + +import json +import time +from unittest.mock import patch + + +class TestDatabaseOperations: + """Tests for database functionality.""" + + def test_paste_persists(self, client, sample_text): + """Paste persists in database and can be retrieved.""" + create = client.post("/", data=sample_text, content_type="text/plain") + paste_id = json.loads(create.data)["id"] + + response = client.get(f"/{paste_id}/raw") + assert response.data.decode("utf-8") == sample_text + + def test_multiple_pastes_independent(self, client): + """Multiple pastes have unique IDs and content.""" + create1 = client.post("/", data="paste one", content_type="text/plain") + create2 = client.post("/", data="paste two", content_type="text/plain") + + id1 = json.loads(create1.data)["id"] + id2 = json.loads(create2.data)["id"] + + assert id1 != id2 + + raw1 = client.get(f"/{id1}/raw") + raw2 = client.get(f"/{id2}/raw") + + assert raw1.data.decode("utf-8") == "paste one" + assert raw2.data.decode("utf-8") == "paste two" + + def test_last_accessed_updated_on_get(self, client, sample_text): + """Last accessed timestamp updates on retrieval.""" + create = client.post("/", data=sample_text, content_type="text/plain") + paste_id = json.loads(create.data)["id"] + + # Get the paste twice with a small delay + client.get(f"/{paste_id}") + first_access = time.time() + + time.sleep(0.1) + + client.get(f"/{paste_id}") + second_access = time.time() + + # Timestamps should be close to current time (within 2 seconds) + assert second_access > first_access + + +class TestCleanupExpiredPastes: + """Tests for paste expiry and cleanup.""" + + def test_expired_paste_cleaned_up(self, app, client, sample_text): + """Expired pastes are removed by cleanup.""" + from app.database import cleanup_expired_pastes + + # Create a paste + create = client.post("/", data=sample_text, content_type="text/plain") + paste_id = json.loads(create.data)["id"] + + # Verify it exists + assert client.get(f"/{paste_id}").status_code == 200 + + # Mock time to simulate expiry (paste expiry + 1 second) + future_time = time.time() + app.config["PASTE_EXPIRY_SECONDS"] + 1 + + with patch("time.time", return_value=future_time): + with app.app_context(): + deleted = cleanup_expired_pastes() + + assert deleted >= 1 + + # Paste should now be gone + assert client.get(f"/{paste_id}").status_code == 404 + + def test_non_expired_paste_kept(self, app, client, sample_text): + """Non-expired pastes are preserved by cleanup.""" + from app.database import cleanup_expired_pastes + + create = client.post("/", data=sample_text, content_type="text/plain") + paste_id = json.loads(create.data)["id"] + + with app.app_context(): + deleted = cleanup_expired_pastes() + + assert deleted == 0 + assert client.get(f"/{paste_id}").status_code == 200 diff --git a/tests/test_mime_detection.py b/tests/test_mime_detection.py new file mode 100644 index 0000000..e2c43ef --- /dev/null +++ b/tests/test_mime_detection.py @@ -0,0 +1,95 @@ +"""Tests for MIME type detection.""" + +import json + + +class TestMimeDetection: + """Tests for automatic MIME type detection.""" + + def test_detect_png(self, client, png_bytes): + """Detect PNG from magic bytes.""" + response = client.post("/", data=png_bytes) + data = json.loads(response.data) + assert data["mime_type"] == "image/png" + + def test_detect_jpeg(self, client, jpeg_bytes): + """Detect JPEG from magic bytes.""" + response = client.post("/", data=jpeg_bytes) + data = json.loads(response.data) + assert data["mime_type"] == "image/jpeg" + + def test_detect_zip(self, client, zip_bytes): + """Detect ZIP from magic bytes.""" + response = client.post("/", data=zip_bytes) + data = json.loads(response.data) + assert data["mime_type"] == "application/zip" + + def test_detect_pdf(self, client, pdf_bytes): + """Detect PDF from magic bytes.""" + response = client.post("/", data=pdf_bytes) + data = json.loads(response.data) + assert data["mime_type"] == "application/pdf" + + def test_detect_gif87a(self, client): + """Detect GIF87a from magic bytes.""" + response = client.post("/", data=b"GIF87a" + b"\x00" * 10) + data = json.loads(response.data) + assert data["mime_type"] == "image/gif" + + def test_detect_gif89a(self, client): + """Detect GIF89a from magic bytes.""" + response = client.post("/", data=b"GIF89a" + b"\x00" * 10) + data = json.loads(response.data) + assert data["mime_type"] == "image/gif" + + def test_detect_gzip(self, client): + """Detect GZIP from magic bytes.""" + response = client.post("/", data=b"\x1f\x8b\x08" + b"\x00" * 10) + data = json.loads(response.data) + assert data["mime_type"] == "application/gzip" + + def test_detect_utf8_text(self, client): + """UTF-8 text defaults to text/plain.""" + response = client.post("/", data="Hello, world! 你好") + data = json.loads(response.data) + assert data["mime_type"] == "text/plain" + + def test_detect_binary_fallback(self, client): + """Non-UTF8 binary without magic falls back to octet-stream.""" + response = client.post("/", data=b"\x80\x81\x82\x83\x84") + data = json.loads(response.data) + assert data["mime_type"] == "application/octet-stream" + + def test_explicit_content_type_honored(self, client): + """Explicit Content-Type is honored for non-generic types.""" + response = client.post( + "/", + data="test", + content_type="text/html", + ) + data = json.loads(response.data) + assert data["mime_type"] == "text/html" + + def test_generic_content_type_overridden(self, client, png_bytes): + """Generic Content-Type is overridden by magic detection.""" + response = client.post( + "/", + data=png_bytes, + content_type="application/octet-stream", + ) + data = json.loads(response.data) + assert data["mime_type"] == "image/png" + + def test_webp_detection(self, client): + """Detect WebP from RIFF...WEBP magic.""" + webp_header = b"RIFF\x00\x00\x00\x00WEBP" + response = client.post("/", data=webp_header + b"\x00" * 20) + data = json.loads(response.data) + assert data["mime_type"] == "image/webp" + + def test_riff_non_webp_not_detected(self, client): + """RIFF without WEBP marker is not detected as WebP.""" + riff_other = b"RIFF\x00\x00\x00\x00WAVE" + response = client.post("/", data=riff_other + b"\x00" * 20) + data = json.loads(response.data) + assert data["mime_type"] != "image/webp" diff --git a/tests/test_security.py b/tests/test_security.py new file mode 100644 index 0000000..aaf9f02 --- /dev/null +++ b/tests/test_security.py @@ -0,0 +1,325 @@ +"""Security-focused tests for FlaskPaste.""" + +import json + + +class TestSecurityHeaders: + """Tests for security headers.""" + + def test_x_content_type_options(self, client): + """X-Content-Type-Options header is set.""" + response = client.get("/") + assert response.headers.get("X-Content-Type-Options") == "nosniff" + + def test_x_frame_options(self, client): + """X-Frame-Options header is set.""" + response = client.get("/") + assert response.headers.get("X-Frame-Options") == "DENY" + + def test_x_xss_protection(self, client): + """X-XSS-Protection header is set.""" + response = client.get("/") + assert response.headers.get("X-XSS-Protection") == "1; mode=block" + + def test_content_security_policy(self, client): + """Content-Security-Policy header is set.""" + response = client.get("/") + csp = response.headers.get("Content-Security-Policy") + assert csp is not None + assert "default-src 'none'" in csp + + def test_referrer_policy(self, client): + """Referrer-Policy header is set.""" + response = client.get("/") + assert response.headers.get("Referrer-Policy") == "strict-origin-when-cross-origin" + + def test_permissions_policy(self, client): + """Permissions-Policy header is set.""" + response = client.get("/") + assert response.headers.get("Permissions-Policy") is not None + + def test_security_headers_on_error_responses(self, client): + """Security headers are present on error responses.""" + response = client.get("/nonexist1234") # 12 chars, valid format but not found + assert response.headers.get("X-Content-Type-Options") == "nosniff" + assert response.headers.get("X-Frame-Options") == "DENY" + + def test_hsts_header(self, client): + """Strict-Transport-Security header is set.""" + response = client.get("/") + hsts = response.headers.get("Strict-Transport-Security") + assert hsts is not None + assert "max-age=" in hsts + assert "includeSubDomains" in hsts + + def test_cache_control_header(self, client): + """Cache-Control header prevents caching of sensitive data.""" + response = client.get("/") + cache = response.headers.get("Cache-Control") + assert cache is not None + assert "no-store" in cache + assert "private" in cache + + def test_pragma_header(self, client): + """Pragma header set for HTTP/1.0 compatibility.""" + response = client.get("/") + assert response.headers.get("Pragma") == "no-cache" + + +class TestRequestIdTracking: + """Tests for X-Request-ID tracking.""" + + def test_request_id_generated(self, client): + """Request ID is generated when not provided.""" + response = client.get("/") + request_id = response.headers.get("X-Request-ID") + assert request_id is not None + # Should be a valid UUID format + assert len(request_id) == 36 + assert request_id.count("-") == 4 + + def test_request_id_passed_through(self, client): + """Request ID from client is echoed back.""" + custom_id = "my-custom-request-id-12345" + response = client.get("/", headers={"X-Request-ID": custom_id}) + assert response.headers.get("X-Request-ID") == custom_id + + def test_request_id_on_error_responses(self, client): + """Request ID is present on error responses.""" + custom_id = "error-request-id-67890" + response = client.get("/nonexist1234", headers={"X-Request-ID": custom_id}) + assert response.headers.get("X-Request-ID") == custom_id + + +class TestProxyTrustValidation: + """Tests for reverse proxy trust validation.""" + + def test_auth_works_without_proxy_secret_configured(self, client, sample_text, auth_header): + """Auth header trusted when no proxy secret is configured (default).""" + response = client.post( + "/", + data=sample_text, + content_type="text/plain", + headers=auth_header, + ) + assert response.status_code == 201 + import json + data = json.loads(response.data) + assert "owner" in data + + def test_auth_ignored_with_wrong_proxy_secret(self, app, client, sample_text, auth_header): + """Auth header ignored when proxy secret doesn't match.""" + # Configure a proxy secret + app.config["TRUSTED_PROXY_SECRET"] = "correct-secret-value" + + try: + # Request with wrong secret - auth should be ignored + headers = dict(auth_header) + headers["X-Proxy-Secret"] = "wrong-secret" + response = client.post( + "/", + data=sample_text, + content_type="text/plain", + headers=headers, + ) + assert response.status_code == 201 + import json + data = json.loads(response.data) + # Owner should NOT be set because proxy wasn't trusted + assert "owner" not in data + finally: + # Reset config + app.config["TRUSTED_PROXY_SECRET"] = "" + + def test_auth_works_with_correct_proxy_secret(self, app, client, sample_text, auth_header): + """Auth header trusted when proxy secret matches.""" + # Configure a proxy secret + app.config["TRUSTED_PROXY_SECRET"] = "correct-secret-value" + + try: + # Request with correct secret - auth should work + headers = dict(auth_header) + headers["X-Proxy-Secret"] = "correct-secret-value" + response = client.post( + "/", + data=sample_text, + content_type="text/plain", + headers=headers, + ) + assert response.status_code == 201 + import json + data = json.loads(response.data) + assert "owner" in data + finally: + # Reset config + app.config["TRUSTED_PROXY_SECRET"] = "" + + +class TestInputValidation: + """Tests for input validation and sanitization.""" + + def test_paste_id_hex_only(self, client): + """Paste IDs must be hexadecimal.""" + # IDs that match the route pattern but fail validation (12 chars) + invalid_ids = [ + "ABCD12345678", # uppercase + "abcd-2345678", # dash + "abcd_2345678", # underscore + "abcd<>345678", # angle brackets + ] + for invalid_id in invalid_ids: + response = client.get(f"/{invalid_id}") + assert response.status_code == 400, f"Expected 400 for ID: {invalid_id}" + + # IDs with slashes are routed differently (404 from Flask routing) + slash_ids = ["abcd/2345678", "../abcd12345"] + for invalid_id in slash_ids: + response = client.get(f"/{invalid_id}") + assert response.status_code in (400, 404), f"Unexpected for ID: {invalid_id}" + + def test_paste_id_length_enforced(self, client): + """Paste IDs must be exactly the configured length (12 chars).""" + too_short = "abcd1234" # 8 chars + too_long = "abcdef1234567890abcd" # 20 chars + + assert client.get(f"/{too_short}").status_code == 400 + assert client.get(f"/{too_long}").status_code == 400 + + def test_auth_header_format_validated(self, client, sample_text): + """Auth header must be valid SHA1 format.""" + invalid_headers = [ + "a" * 39, # too short + "a" * 41, # too long + "g" * 40, # invalid hex + "A" * 40, # uppercase (should be lowercased internally) + "a-a" * 13 + "a", # dashes + ] + + for invalid in invalid_headers: + response = client.post( + "/", + data=sample_text, + content_type="text/plain", + headers={"X-SSL-Client-SHA1": invalid}, + ) + # Invalid auth is ignored, not rejected (treated as anonymous) + data = json.loads(response.data) + assert "owner" not in data or data.get("owner") != invalid + + def test_mime_type_sanitized(self, client): + """MIME types are sanitized to prevent injection.""" + # Flask/Werkzeug rejects newlines in headers at the framework level + # Test that MIME type parameters are stripped to base type + response = client.post( + "/", + data="test content", + content_type="text/plain; charset=utf-8; boundary=something", + ) + data = json.loads(response.data) + # Should be sanitized to just the base MIME type + assert data["mime_type"] == "text/plain" + assert "charset" not in data["mime_type"] + assert "boundary" not in data["mime_type"] + + +class TestSizeLimits: + """Tests for paste size limits.""" + + def test_anonymous_size_limit(self, app, client): + """Anonymous pastes are limited in size.""" + max_size = app.config["MAX_PASTE_SIZE_ANON"] + oversized = "x" * (max_size + 1) + + response = client.post("/", data=oversized, content_type="text/plain") + assert response.status_code == 413 + data = json.loads(response.data) + assert "error" in data + assert data["authenticated"] is False + + def test_authenticated_larger_limit(self, app, client, auth_header): + """Authenticated users have larger size limit.""" + anon_max = app.config["MAX_PASTE_SIZE_ANON"] + auth_max = app.config["MAX_PASTE_SIZE_AUTH"] + + # Size that exceeds anon but within auth limit + # (only test if auth limit is larger) + if auth_max > anon_max: + size = anon_max + 100 + content = "x" * size + + response = client.post( + "/", + data=content, + content_type="text/plain", + headers=auth_header, + ) + assert response.status_code == 201 + + +class TestOwnershipEnforcement: + """Tests for paste ownership and access control.""" + + def test_cannot_delete_others_paste( + self, client, sample_text, auth_header, other_auth_header + ): + """Users cannot delete pastes they don't own.""" + create = client.post( + "/", + data=sample_text, + content_type="text/plain", + headers=auth_header, + ) + paste_id = json.loads(create.data)["id"] + + response = client.delete(f"/{paste_id}", headers=other_auth_header) + assert response.status_code == 403 + + # Paste should still exist + assert client.get(f"/{paste_id}").status_code == 200 + + def test_anonymous_paste_undeletable(self, client, sample_text, auth_header): + """Anonymous pastes cannot be deleted by anyone.""" + create = client.post("/", data=sample_text, content_type="text/plain") + paste_id = json.loads(create.data)["id"] + + response = client.delete(f"/{paste_id}", headers=auth_header) + assert response.status_code == 403 + + def test_owner_can_delete(self, client, sample_text, auth_header): + """Owners can delete their own pastes.""" + create = client.post( + "/", + data=sample_text, + content_type="text/plain", + headers=auth_header, + ) + paste_id = json.loads(create.data)["id"] + + response = client.delete(f"/{paste_id}", headers=auth_header) + assert response.status_code == 200 + + +class TestJsonResponses: + """Tests for JSON response format and encoding.""" + + def test_json_content_type(self, client): + """API responses have correct Content-Type.""" + response = client.get("/") + assert "application/json" in response.content_type + + def test_unicode_in_response(self, client): + """Unicode content is properly encoded in responses.""" + unicode_text = "Hello 你好 مرحبا 🎉" + create = client.post("/", data=unicode_text.encode("utf-8")) + assert create.status_code == 201 + + paste_id = json.loads(create.data)["id"] + raw = client.get(f"/{paste_id}/raw") + assert raw.data.decode("utf-8") == unicode_text + + def test_error_responses_are_json(self, client): + """Error responses are valid JSON.""" + response = client.get("/nonexist1234") # 12 chars + assert response.status_code in (400, 404) + data = json.loads(response.data) + assert "error" in data diff --git a/wsgi.py b/wsgi.py new file mode 100644 index 0000000..2958bd8 --- /dev/null +++ b/wsgi.py @@ -0,0 +1,5 @@ +"""WSGI entry point for production servers.""" + +from app import create_app + +application = create_app("production")