pki: add minimal certificate authority

- CA generation with encrypted private key storage (AES-256-GCM)
- Client certificate issuance with configurable validity
- Certificate revocation with status tracking
- SHA1 fingerprint integration with existing mTLS auth
- API endpoints: /pki/status, /pki/ca, /pki/issue, /pki/revoke
- CLI commands: fpaste pki status/issue/revoke
- Comprehensive test coverage
This commit is contained in:
Username
2025-12-20 17:20:15 +01:00
parent 7deba711d4
commit 4e38517faf
9 changed files with 3815 additions and 481 deletions

View File

@@ -1,7 +1,27 @@
# FlaskPaste Container Image # FlaskPaste Container Image (Multi-Stage Build)
# Build: podman build -t flaskpaste . # Build: podman build -t flaskpaste .
# Run: podman run -d -p 5000:5000 -v flaskpaste-data:/app/data flaskpaste # Run: podman run -d -p 5000:5000 -v flaskpaste-data:/app/data flaskpaste
# Stage 1: Build dependencies
FROM python:3.11-slim AS builder
WORKDIR /build
# Install build dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc \
&& rm -rf /var/lib/apt/lists/*
# Create virtual environment
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt gunicorn
# Stage 2: Runtime image
FROM python:3.11-slim FROM python:3.11-slim
LABEL maintainer="FlaskPaste" LABEL maintainer="FlaskPaste"
@@ -10,19 +30,19 @@ LABEL description="Lightweight secure pastebin REST API"
# Create non-root user # Create non-root user
RUN groupadd -r flaskpaste && useradd -r -g flaskpaste flaskpaste RUN groupadd -r flaskpaste && useradd -r -g flaskpaste flaskpaste
# Copy virtual environment from builder
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Set working directory # Set working directory
WORKDIR /app WORKDIR /app
# Install dependencies first (cache layer) # Copy only necessary application files
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt gunicorn
# Copy application code
COPY app/ ./app/ COPY app/ ./app/
COPY wsgi.py . COPY wsgi.py .
COPY fpaste . COPY fpaste .
# Create data directory # Create data directory with correct ownership
RUN mkdir -p /app/data && chown -R flaskpaste:flaskpaste /app RUN mkdir -p /app/data && chown -R flaskpaste:flaskpaste /app
# Switch to non-root user # Switch to non-root user
@@ -32,6 +52,7 @@ USER flaskpaste
ENV FLASK_ENV=production ENV FLASK_ENV=production
ENV FLASKPASTE_DB=/app/data/pastes.db ENV FLASKPASTE_DB=/app/data/pastes.db
ENV PYTHONUNBUFFERED=1 ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
# Expose port # Expose port
EXPOSE 5000 EXPOSE 5000

File diff suppressed because it is too large Load Diff

View File

@@ -4,7 +4,7 @@ import os
from pathlib import Path from pathlib import Path
# Application version # Application version
VERSION = "1.1.0" VERSION = "1.2.0"
class Config: class Config:
@@ -21,6 +21,8 @@ class Config:
# Paste expiry (default 5 days) # Paste expiry (default 5 days)
PASTE_EXPIRY_SECONDS = int(os.environ.get("FLASKPASTE_EXPIRY", 5 * 24 * 60 * 60)) PASTE_EXPIRY_SECONDS = int(os.environ.get("FLASKPASTE_EXPIRY", 5 * 24 * 60 * 60))
# Maximum custom expiry (default 30 days, 0 = use default expiry as max)
MAX_EXPIRY_SECONDS = int(os.environ.get("FLASKPASTE_MAX_EXPIRY", 30 * 24 * 60 * 60))
# Content deduplication / abuse prevention # Content deduplication / abuse prevention
# Throttle repeated submissions of identical content # Throttle repeated submissions of identical content
@@ -54,6 +56,16 @@ class Config:
# URL prefix for reverse proxy deployments (e.g., "/paste" for mymx.me/paste) # URL prefix for reverse proxy deployments (e.g., "/paste" for mymx.me/paste)
URL_PREFIX = os.environ.get("FLASKPASTE_URL_PREFIX", "").rstrip("/") URL_PREFIX = os.environ.get("FLASKPASTE_URL_PREFIX", "").rstrip("/")
# PKI Configuration
# Enable PKI endpoints for certificate authority and issuance
PKI_ENABLED = os.environ.get("FLASKPASTE_PKI_ENABLED", "0").lower() in ("1", "true", "yes")
# CA password for signing operations (REQUIRED when PKI is enabled)
PKI_CA_PASSWORD = os.environ.get("FLASKPASTE_PKI_CA_PASSWORD", "")
# Default validity period for issued certificates (days)
PKI_CERT_DAYS = int(os.environ.get("FLASKPASTE_PKI_CERT_DAYS", "365"))
# CA certificate validity period (days)
PKI_CA_DAYS = int(os.environ.get("FLASKPASTE_PKI_CA_DAYS", "3650")) # 10 years
class DevelopmentConfig(Config): class DevelopmentConfig(Config):
"""Development configuration.""" """Development configuration."""
@@ -80,6 +92,12 @@ class TestingConfig(Config):
# Disable PoW for most tests (easier testing) # Disable PoW for most tests (easier testing)
POW_DIFFICULTY = 0 POW_DIFFICULTY = 0
# PKI testing configuration
PKI_ENABLED = True
PKI_CA_PASSWORD = "test-ca-password"
PKI_CERT_DAYS = 30
PKI_CA_DAYS = 365
config = { config = {
"development": DevelopmentConfig, "development": DevelopmentConfig,

View File

@@ -1,5 +1,7 @@
"""Database connection and schema management.""" """Database connection and schema management."""
import hashlib
import secrets
import sqlite3 import sqlite3
import time import time
from pathlib import Path from pathlib import Path
@@ -13,7 +15,10 @@ CREATE TABLE IF NOT EXISTS pastes (
mime_type TEXT NOT NULL DEFAULT 'text/plain', mime_type TEXT NOT NULL DEFAULT 'text/plain',
owner TEXT, owner TEXT,
created_at INTEGER NOT NULL, created_at INTEGER NOT NULL,
last_accessed INTEGER NOT NULL last_accessed INTEGER NOT NULL,
burn_after_read INTEGER NOT NULL DEFAULT 0,
expires_at INTEGER,
password_hash TEXT
); );
CREATE INDEX IF NOT EXISTS idx_pastes_created_at ON pastes(created_at); CREATE INDEX IF NOT EXISTS idx_pastes_created_at ON pastes(created_at);
@@ -29,8 +34,86 @@ CREATE TABLE IF NOT EXISTS content_hashes (
); );
CREATE INDEX IF NOT EXISTS idx_content_hashes_last_seen ON content_hashes(last_seen); CREATE INDEX IF NOT EXISTS idx_content_hashes_last_seen ON content_hashes(last_seen);
-- PKI: Certificate Authority storage
CREATE TABLE IF NOT EXISTS certificate_authority (
id TEXT PRIMARY KEY DEFAULT 'default',
common_name TEXT NOT NULL,
certificate_pem TEXT NOT NULL,
private_key_encrypted BLOB NOT NULL,
key_salt BLOB NOT NULL,
created_at INTEGER NOT NULL,
expires_at INTEGER NOT NULL,
key_algorithm TEXT NOT NULL,
owner TEXT
);
-- PKI: Issued client certificates
CREATE TABLE IF NOT EXISTS issued_certificates (
serial TEXT PRIMARY KEY,
ca_id TEXT NOT NULL DEFAULT 'default',
common_name TEXT NOT NULL,
fingerprint_sha1 TEXT NOT NULL UNIQUE,
certificate_pem TEXT NOT NULL,
created_at INTEGER NOT NULL,
expires_at INTEGER NOT NULL,
issued_to TEXT,
status TEXT NOT NULL DEFAULT 'valid',
revoked_at INTEGER,
FOREIGN KEY(ca_id) REFERENCES certificate_authority(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_certs_fingerprint ON issued_certificates(fingerprint_sha1);
CREATE INDEX IF NOT EXISTS idx_certs_status ON issued_certificates(status);
CREATE INDEX IF NOT EXISTS idx_certs_ca_id ON issued_certificates(ca_id);
""" """
# Password hashing constants
_HASH_ITERATIONS = 600000 # OWASP 2023 recommendation for PBKDF2-SHA256
_SALT_LENGTH = 32
def hash_password(password: str) -> str:
"""Hash password using PBKDF2-HMAC-SHA256.
Returns format: $pbkdf2-sha256$iterations$salt$hash
All values are hex-encoded.
"""
if not password:
return None
salt = secrets.token_bytes(_SALT_LENGTH)
dk = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, _HASH_ITERATIONS)
return f"$pbkdf2-sha256${_HASH_ITERATIONS}${salt.hex()}${dk.hex()}"
def verify_password(password: str, password_hash: str) -> bool:
"""Verify password against stored hash.
Uses constant-time comparison to prevent timing attacks.
"""
if not password or not password_hash:
return False
try:
# Parse hash format: $pbkdf2-sha256$iterations$salt$hash
parts = password_hash.split("$")
if len(parts) != 5 or parts[1] != "pbkdf2-sha256":
return False
iterations = int(parts[2])
salt = bytes.fromhex(parts[3])
stored_hash = bytes.fromhex(parts[4])
# Compute hash of provided password
dk = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations)
# Constant-time comparison
return secrets.compare_digest(dk, stored_hash)
except (ValueError, IndexError):
return False
# Hold reference for in-memory shared cache databases # Hold reference for in-memory shared cache databases
_memory_db_holder = None _memory_db_holder = None
@@ -98,15 +181,27 @@ def init_db() -> None:
def cleanup_expired_pastes() -> int: def cleanup_expired_pastes() -> int:
"""Delete pastes that haven't been accessed within expiry period. """Delete pastes that have expired.
Pastes expire based on:
- Custom expires_at timestamp if set
- Default expiry from last_accessed if expires_at is NULL
Returns number of deleted pastes. Returns number of deleted pastes.
""" """
expiry_seconds = current_app.config["PASTE_EXPIRY_SECONDS"] expiry_seconds = current_app.config["PASTE_EXPIRY_SECONDS"]
cutoff = int(time.time()) - expiry_seconds now = int(time.time())
default_cutoff = now - expiry_seconds
db = get_db() db = get_db()
cursor = db.execute("DELETE FROM pastes WHERE last_accessed < ?", (cutoff,)) # Delete pastes with custom expiry that have passed,
# OR pastes without custom expiry that exceed default window
cursor = db.execute(
"""DELETE FROM pastes WHERE
(expires_at IS NOT NULL AND expires_at < ?)
OR (expires_at IS NULL AND last_accessed < ?)""",
(now, default_cutoff),
)
db.commit() db.commit()
return cursor.rowcount return cursor.rowcount
@@ -146,15 +241,14 @@ def check_content_hash(content_hash: str) -> tuple[bool, int]:
# Check existing hash record # Check existing hash record
row = db.execute( row = db.execute(
"SELECT count, last_seen FROM content_hashes WHERE hash = ?", "SELECT count, last_seen FROM content_hashes WHERE hash = ?", (content_hash,)
(content_hash,)
).fetchone() ).fetchone()
if row is None: if row is None:
# First time seeing this content # First time seeing this content
db.execute( db.execute(
"INSERT INTO content_hashes (hash, first_seen, last_seen, count) VALUES (?, ?, ?, 1)", "INSERT INTO content_hashes (hash, first_seen, last_seen, count) VALUES (?, ?, ?, 1)",
(content_hash, now, now) (content_hash, now, now),
) )
db.commit() db.commit()
return True, 1 return True, 1
@@ -163,7 +257,7 @@ def check_content_hash(content_hash: str) -> tuple[bool, int]:
# Outside window, reset counter # Outside window, reset counter
db.execute( db.execute(
"UPDATE content_hashes SET first_seen = ?, last_seen = ?, count = 1 WHERE hash = ?", "UPDATE content_hashes SET first_seen = ?, last_seen = ?, count = 1 WHERE hash = ?",
(now, now, content_hash) (now, now, content_hash),
) )
db.commit() db.commit()
return True, 1 return True, 1
@@ -178,7 +272,7 @@ def check_content_hash(content_hash: str) -> tuple[bool, int]:
# Update counter # Update counter
db.execute( db.execute(
"UPDATE content_hashes SET last_seen = ?, count = ? WHERE hash = ?", "UPDATE content_hashes SET last_seen = ?, count = ? WHERE hash = ?",
(now, current_count, content_hash) (now, current_count, content_hash),
) )
db.commit() db.commit()

1019
app/pki.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -103,7 +103,7 @@ Host: localhost:5000
```json ```json
{ {
"name": "FlaskPaste", "name": "FlaskPaste",
"version": "1.1.0", "version": "1.2.0",
"endpoints": { "endpoints": {
"GET /": "API information", "GET /": "API information",
"GET /health": "Health check", "GET /health": "Health check",
@@ -159,6 +159,45 @@ X-PoW-Solution: 12345678
**Request (Burn-After-Read):** **Request (Burn-After-Read):**
Create a paste that deletes itself after first retrieval: Create a paste that deletes itself after first retrieval:
```http
POST / HTTP/1.1
Host: localhost:5000
Content-Type: text/plain
X-Burn-After-Read: true
```
**Request (Custom Expiry):**
Create a paste with custom expiry time (in seconds):
```http
POST / HTTP/1.1
Host: localhost:5000
Content-Type: text/plain
X-Expiry: 3600
```
**Request (Password Protected):**
Create a paste that requires a password to access:
```http
POST / HTTP/1.1
Host: localhost:5000
Content-Type: text/plain
X-Paste-Password: secretpassword
```
**Response (201 Created):**
```json
{
"id": "abc12345",
"url": "/abc12345",
"raw": "/abc12345/raw",
"mime_type": "text/plain", "mime_type": "text/plain",
"created_at": 1700000000, "created_at": 1700000000,
"owner": "a1b2c3...", // Only present if authenticated "owner": "a1b2c3...", // Only present if authenticated
@@ -167,7 +206,10 @@ Hello, World!
"password_protected": true // Only present if password set "password_protected": true // Only present if password set
} }
``` ```
**Errors:**
| Code | Description |
|------|-------------|
| 400 | No content provided | | 400 | No content provided |
| 400 | Password too long (max 1024 chars) | | 400 | Password too long (max 1024 chars) |
| 400 | Proof-of-work required (when PoW enabled) | | 400 | Proof-of-work required (when PoW enabled) |
@@ -175,6 +217,7 @@ Hello, World!
| 413 | Paste too large | | 413 | Paste too large |
| 429 | Duplicate content rate limit exceeded | | 429 | Duplicate content rate limit exceeded |
**Size Limits:**
- Anonymous: 3 MiB (configurable via `FLASKPASTE_MAX_ANON`) - Anonymous: 3 MiB (configurable via `FLASKPASTE_MAX_ANON`)
- Authenticated: 50 MiB (configurable via `FLASKPASTE_MAX_AUTH`) - Authenticated: 50 MiB (configurable via `FLASKPASTE_MAX_AUTH`)
@@ -198,6 +241,13 @@ GET /abc12345 HTTP/1.1
Host: localhost:5000 Host: localhost:5000
X-Paste-Password: secretpassword X-Paste-Password: secretpassword
``` ```
**Response (200 OK):**
```json
{
"id": "abc12345",
"mime_type": "text/plain",
"size": 1234,
"created_at": 1700000000, "created_at": 1700000000,
"raw": "/abc12345/raw", "raw": "/abc12345/raw",
"password_protected": true // Only present if protected "password_protected": true // Only present if protected
@@ -205,7 +255,8 @@ Host: localhost:5000
``` ```
**Errors:** **Errors:**
| Code | Description |
|------|-------------|
| 400 | Invalid paste ID format | | 400 | Invalid paste ID format |
| 401 | Password required | | 401 | Password required |
| 403 | Invalid password | | 403 | Invalid password |
@@ -213,6 +264,8 @@ Host: localhost:5000
--- ---
### GET /{id}/raw
### HEAD /{id}/raw ### HEAD /{id}/raw
Retrieve raw paste content with correct MIME type. HEAD returns headers only (useful for checking MIME type and size without downloading content). Retrieve raw paste content with correct MIME type. HEAD returns headers only (useful for checking MIME type and size without downloading content).
@@ -229,6 +282,13 @@ GET /abc12345/raw HTTP/1.1
Host: localhost:5000 Host: localhost:5000
X-Paste-Password: secretpassword X-Paste-Password: secretpassword
``` ```
**Response (200 OK):**
```http
HTTP/1.1 200 OK
Content-Type: image/png
Content-Disposition: inline
<binary content> <binary content>
``` ```
@@ -245,6 +305,8 @@ Content-Disposition: inline
--- ---
### DELETE /{id}
Delete a paste. Requires authentication and ownership. Delete a paste. Requires authentication and ownership.
**Request:** **Request:**
@@ -306,6 +368,135 @@ Pastes expire based on last access time (default: 5 days).
```bash ```bash
# Paste expires in 1 hour # Paste expires in 1 hour
curl -H "X-Expiry: 3600" --data-binary @file.txt http://host/
```
**Configuration:**
```bash
export FLASKPASTE_EXPIRY=432000 # Default expiry: 5 days
export FLASKPASTE_MAX_EXPIRY=2592000 # Max custom expiry: 30 days
```
**Notes:**
- Custom expiry is capped at `FLASKPASTE_MAX_EXPIRY`
- Invalid or negative values are ignored (uses default)
- Response includes `expires_at` timestamp when custom expiry is set
---
## Burn-After-Read
Single-access pastes that delete themselves after first retrieval.
**How it works:**
- Set `X-Burn-After-Read: true` header on creation
- First `GET /{id}/raw` returns content and deletes paste
- Subsequent requests return 404
- Metadata `GET /{id}` does not trigger burn
- `HEAD` requests do not trigger burn
**Usage:**
```bash
# Create burn-after-read paste
curl -H "X-Burn-After-Read: true" --data-binary @secret.txt http://host/
# Response indicates burn is enabled
{
"id": "abc12345",
"burn_after_read": true,
...
}
```
**Notes:**
- Response includes `X-Burn-After-Read: true` header when content is retrieved
- Can be combined with custom expiry (paste expires OR burns, whichever first)
- Accepted values: `true`, `1`, `yes` (case-insensitive)
---
## Password Protection
Pastes can be protected with a password using PBKDF2-HMAC-SHA256 hashing.
**Creating a protected paste:**
```http
POST / HTTP/1.1
Host: localhost:5000
Content-Type: text/plain
X-Paste-Password: mysecretpassword
```
**Response (201 Created):**
```json
{
"id": "abc12345",
"url": "/abc12345",
"raw": "/abc12345/raw",
"mime_type": "text/plain",
"created_at": 1700000000,
"password_protected": true
}
```
**Accessing protected paste:**
```http
GET /abc12345 HTTP/1.1
Host: localhost:5000
X-Paste-Password: mysecretpassword
```
**Errors:**
| Code | Description |
|------|-------------|
| 400 | Password too long (max 1024 chars) |
| 401 | Password required |
| 403 | Invalid password |
**Response (401 Unauthorized):**
```json
{
"error": "Password required",
"password_protected": true
}
```
**Response (403 Forbidden):**
```json
{
"error": "Invalid password"
}
```
**Security Implementation:**
- PBKDF2-HMAC-SHA256 with 600,000 iterations (OWASP 2023)
- 32-byte random salt per password
- Constant-time comparison prevents timing attacks
- Passwords never logged or stored in plaintext
- Maximum 1024 characters to prevent DoS via expensive hashing
**CLI Usage:**
```bash
# Create protected paste
./fpaste create -p "mypassword" secret.txt
# Retrieve protected paste
./fpaste get -p "mypassword" abc12345
```
**Notes:**
- Password protection can be combined with burn-after-read and custom expiry
- Unicode passwords are supported
- Special characters are allowed
---
## Abuse Prevention
FlaskPaste includes content-hash based deduplication to prevent spam and abuse.
**How it works:**
- Each paste's SHA256 content hash is tracked - Each paste's SHA256 content hash is tracked
- Repeated submissions of identical content are throttled - Repeated submissions of identical content are throttled
- After exceeding the threshold, further duplicates are rejected with 429 - After exceeding the threshold, further duplicates are rejected with 429
@@ -513,3 +704,235 @@ http-request set-header X-Proxy-Secret your-secret-value
FlaskPaste includes an optional minimal PKI for issuing client certificates. FlaskPaste includes an optional minimal PKI for issuing client certificates.
### Configuration
Enable PKI via environment variables:
```bash
export FLASKPASTE_PKI_ENABLED=1 # Enable PKI endpoints
export FLASKPASTE_PKI_CA_PASSWORD="secret" # Required: CA password
export FLASKPASTE_PKI_CERT_DAYS=365 # Client certificate validity (days)
export FLASKPASTE_PKI_CA_DAYS=3650 # CA certificate validity (days)
```
### Certificate Revocation
When PKI is enabled, certificates issued by the CA are tracked. Revoked certificates are rejected during authentication (treated as anonymous).
---
### GET /pki
Get PKI status and CA information.
**Request:**
```http
GET /pki HTTP/1.1
Host: localhost:5000
```
**Response (PKI disabled):**
```json
{
"enabled": false
}
```
**Response (PKI enabled, no CA):**
```json
{
"enabled": true,
"ca_exists": false,
"hint": "POST /pki/ca to generate CA"
}
```
**Response (PKI enabled with CA):**
```json
{
"enabled": true,
"ca_exists": true,
"common_name": "FlaskPaste CA",
"fingerprint_sha1": "a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2",
"created_at": 1700000000,
"expires_at": 2015000000,
"key_algorithm": "ec:secp384r1"
}
```
---
### POST /pki/ca
Generate a new Certificate Authority. Only works once (first-run bootstrap).
**Request:**
```http
POST /pki/ca HTTP/1.1
Host: localhost:5000
Content-Type: application/json
```
**Response (201 Created):**
```json
{
"message": "CA generated",
"common_name": "My CA",
"fingerprint_sha1": "a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2",
"created_at": 1700000000,
"expires_at": 2015000000,
"download": "/pki/ca.crt"
}
```
**Errors:**
| Code | Description |
|------|-------------|
| 404 | PKI not enabled |
| 409 | CA already exists |
| 500 | PKI_CA_PASSWORD not configured |
---
### GET /pki/ca.crt
Download CA certificate in PEM format (for trust store).
**Request:**
```http
GET /pki/ca.crt HTTP/1.1
Host: localhost:5000
```
**Response (200 OK):**
```
-----BEGIN CERTIFICATE-----
MIICxDCCAaygAwIBAgIUY...
-----END CERTIFICATE-----
```
Content-Type: `application/x-pem-file`
**Errors:**
| Code | Description |
|------|-------------|
| 404 | PKI not enabled or CA not initialized |
---
### POST /pki/issue
Issue a new client certificate (open registration).
**Request:**
```http
POST /pki/issue HTTP/1.1
Host: localhost:5000
Content-Type: application/json
```
**Response (201 Created):**
```json
{
"message": "Certificate issued",
"serial": "00000000000000000000000000000001",
"common_name": "alice",
"fingerprint_sha1": "b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3",
"created_at": 1700000000,
"expires_at": 1731536000,
"certificate_pem": "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----\n",
"private_key_pem": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n"
}
```
**Errors:**
| Code | Description |
|------|-------------|
| 400 | common_name required |
| 404 | PKI not enabled or CA not initialized |
| 500 | Certificate issuance failed |
**Security Notes:**
- Private key is generated server-side and returned in response
- Store the private key securely; it is not recoverable
- The certificate can be used with nginx, HAProxy, or curl for mTLS
---
### GET /pki/certs
List certificates (own certificates only).
**Request (authenticated):**
```http
GET /pki/certs HTTP/1.1
Host: localhost:5000
X-SSL-Client-SHA1: a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2
```
**Response (200 OK):**
```json
{
"certificates": [
{
"serial": "00000000000000000000000000000001",
"common_name": "alice",
"fingerprint_sha1": "b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3",
"created_at": 1700000000,
"expires_at": 1731536000,
"status": "valid",
"issued_to": "a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2"
}
],
"count": 1
}
```
**Notes:**
- Anonymous users receive an empty list
- Users see certificates they issued or certificates that match their fingerprint
---
### POST /pki/revoke/{serial}
Revoke a certificate by serial number. Requires authentication and ownership.
**Request:**
```http
POST /pki/revoke/00000000000000000000000000000001 HTTP/1.1
Host: localhost:5000
X-SSL-Client-SHA1: a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2
```
**Response (200 OK):**
```json
{
"message": "Certificate revoked",
"serial": "00000000000000000000000000000001"
}
```
**Errors:**
| Code | Description |
|------|-------------|
| 401 | Authentication required |
| 403 | Permission denied (not owner) |
| 404 | Certificate not found or PKI not enabled |
| 409 | Certificate already revoked |
**Authorization:**
- Must be authenticated
- Can revoke certificates you issued (issued_to matches your fingerprint)
- Can revoke your own certificate (fingerprint matches)
| 401 | Authentication required |
| 403 | Permission denied (not owner) |
| 404 | Certificate not found or PKI not enabled |
| 409 | Certificate already revoked |
**Authorization:**
- Must be authenticated
- Can revoke certificates you issued (issued_to matches your fingerprint)
- Can revoke your own certificate (fingerprint matches)

491
fpaste
View File

@@ -6,14 +6,20 @@ import base64
import hashlib import hashlib
import json import json
import os import os
import ssl
import sys import sys
import urllib.error import urllib.error
import urllib.request import urllib.request
from datetime import UTC, datetime, timedelta
from pathlib import Path from pathlib import Path
# Optional encryption support # Optional cryptography support (for encryption and cert generation)
try: try:
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, rsa
from cryptography.hazmat.primitives.ciphers.aead import AESGCM from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.x509.oid import NameOID
HAS_CRYPTO = True HAS_CRYPTO = True
except ImportError: except ImportError:
@@ -25,6 +31,9 @@ def get_config():
config = { config = {
"server": os.environ.get("FLASKPASTE_SERVER", "http://localhost:5000"), "server": os.environ.get("FLASKPASTE_SERVER", "http://localhost:5000"),
"cert_sha1": os.environ.get("FLASKPASTE_CERT_SHA1", ""), "cert_sha1": os.environ.get("FLASKPASTE_CERT_SHA1", ""),
"client_cert": os.environ.get("FLASKPASTE_CLIENT_CERT", ""),
"client_key": os.environ.get("FLASKPASTE_CLIENT_KEY", ""),
"ca_cert": os.environ.get("FLASKPASTE_CA_CERT", ""),
} }
# Try config file # Try config file
@@ -40,17 +49,51 @@ def get_config():
config["server"] = value config["server"] = value
elif key == "cert_sha1": elif key == "cert_sha1":
config["cert_sha1"] = value config["cert_sha1"] = value
elif key == "client_cert":
config["client_cert"] = value
elif key == "client_key":
config["client_key"] = value
elif key == "ca_cert":
config["ca_cert"] = value
return config return config
def request(url, method="GET", data=None, headers=None): def create_ssl_context(config):
"""Create SSL context for mTLS if certificates are configured."""
client_cert = config.get("client_cert", "")
client_key = config.get("client_key", "")
ca_cert = config.get("ca_cert", "")
if not client_cert:
return None
ctx = ssl.create_default_context()
# Load CA certificate if specified
if ca_cert:
ctx.load_verify_locations(ca_cert)
# Load client certificate and key
try:
ctx.load_cert_chain(certfile=client_cert, keyfile=client_key or None)
except ssl.SSLError as e:
die(f"failed to load client certificate: {e}")
except FileNotFoundError as e:
die(f"certificate file not found: {e}")
return ctx
def request(url, method="GET", data=None, headers=None, ssl_context=None):
"""Make HTTP request and return response.""" """Make HTTP request and return response."""
headers = headers or {} headers = headers or {}
req = urllib.request.Request(url, data=data, headers=headers, method=method) # User-configured server URL, audit is expected
req = urllib.request.Request(url, data=data, headers=headers, method=method) # noqa: S310
try: try:
with urllib.request.urlopen(req, timeout=30) as resp: # User-configured server URL, audit is expected
with urllib.request.urlopen(req, timeout=30, context=ssl_context) as resp: # noqa: S310
return resp.status, resp.read(), dict(resp.headers) return resp.status, resp.read(), dict(resp.headers)
except urllib.error.HTTPError as e: except urllib.error.HTTPError as e:
return e.code, e.read(), dict(e.headers) return e.code, e.read(), dict(e.headers)
@@ -120,11 +163,11 @@ def solve_pow(nonce, difficulty):
# Count leading zero bits # Count leading zero bits
zero_bits = 0 zero_bits = 0
for byte in hash_bytes[:target_bytes + 1]: for byte in hash_bytes[: target_bytes + 1]:
if byte == 0: if byte == 0:
zero_bits += 8 zero_bits += 8
else: else:
zero_bits += (8 - byte.bit_length()) zero_bits += 8 - byte.bit_length()
break break
if zero_bits >= difficulty: if zero_bits >= difficulty:
@@ -141,7 +184,7 @@ def solve_pow(nonce, difficulty):
def get_challenge(config): def get_challenge(config):
"""Fetch PoW challenge from server.""" """Fetch PoW challenge from server."""
url = config["server"].rstrip("/") + "/challenge" url = config["server"].rstrip("/") + "/challenge"
status, body, _ = request(url) status, body, _ = request(url, ssl_context=config.get("ssl_context"))
if status != 200: if status != 200:
return None return None
@@ -186,6 +229,18 @@ def cmd_create(args, config):
if config["cert_sha1"]: if config["cert_sha1"]:
headers["X-SSL-Client-SHA1"] = config["cert_sha1"] headers["X-SSL-Client-SHA1"] = config["cert_sha1"]
# Add burn-after-read header
if args.burn:
headers["X-Burn-After-Read"] = "true"
# Add custom expiry header
if args.expiry:
headers["X-Expiry"] = str(args.expiry)
# Add password header
if args.password:
headers["X-Paste-Password"] = args.password
# Get and solve PoW challenge if required # Get and solve PoW challenge if required
challenge = get_challenge(config) challenge = get_challenge(config)
if challenge: if challenge:
@@ -198,7 +253,9 @@ def cmd_create(args, config):
headers["X-PoW-Solution"] = str(solution) headers["X-PoW-Solution"] = str(solution)
url = config["server"].rstrip("/") + "/" url = config["server"].rstrip("/") + "/"
status, body, _ = request(url, method="POST", data=content, headers=headers) status, body, _ = request(
url, method="POST", data=content, headers=headers, ssl_context=config.get("ssl_context")
)
if status == 201: if status == 201:
data = json.loads(body) data = json.loads(body)
@@ -236,9 +293,14 @@ def cmd_get(args, config):
paste_id = url_input.split("/")[-1] # Handle full URLs paste_id = url_input.split("/")[-1] # Handle full URLs
base = config["server"].rstrip("/") base = config["server"].rstrip("/")
# Build headers for password-protected pastes
headers = {}
if args.password:
headers["X-Paste-Password"] = args.password
if args.meta: if args.meta:
url = f"{base}/{paste_id}" url = f"{base}/{paste_id}"
status, body, _ = request(url) status, body, _ = request(url, headers=headers, ssl_context=config.get("ssl_context"))
if status == 200: if status == 200:
data = json.loads(body) data = json.loads(body)
print(f"id: {data['id']}") print(f"id: {data['id']}")
@@ -246,12 +308,19 @@ def cmd_get(args, config):
print(f"size: {data['size']}") print(f"size: {data['size']}")
print(f"created_at: {data['created_at']}") print(f"created_at: {data['created_at']}")
if encryption_key: if encryption_key:
print(f"encrypted: yes (key in URL)") print("encrypted: yes (key in URL)")
if data.get("password_protected"):
print("protected: yes (password required)")
elif status == 401:
die("password required (-p)")
elif status == 403:
die("invalid password")
else: else:
die(f"not found: {paste_id}") die(f"not found: {paste_id}")
else: else:
url = f"{base}/{paste_id}/raw" url = f"{base}/{paste_id}/raw"
status, body, headers = request(url) ssl_ctx = config.get("ssl_context")
status, body, _ = request(url, headers=headers, ssl_context=ssl_ctx)
if status == 200: if status == 200:
# Decrypt if encryption key was provided # Decrypt if encryption key was provided
if encryption_key: if encryption_key:
@@ -266,6 +335,10 @@ def cmd_get(args, config):
# Add newline if content doesn't end with one and stdout is tty # Add newline if content doesn't end with one and stdout is tty
if sys.stdout.isatty() and body and not body.endswith(b"\n"): if sys.stdout.isatty() and body and not body.endswith(b"\n"):
sys.stdout.buffer.write(b"\n") sys.stdout.buffer.write(b"\n")
elif status == 401:
die("password required (-p)")
elif status == 403:
die("invalid password")
else: else:
die(f"not found: {paste_id}") die(f"not found: {paste_id}")
@@ -280,7 +353,9 @@ def cmd_delete(args, config):
url = f"{base}/{paste_id}" url = f"{base}/{paste_id}"
headers = {"X-SSL-Client-SHA1": config["cert_sha1"]} headers = {"X-SSL-Client-SHA1": config["cert_sha1"]}
status, body, _ = request(url, method="DELETE", headers=headers) status, _, _ = request(
url, method="DELETE", headers=headers, ssl_context=config.get("ssl_context")
)
if status == 200: if status == 200:
print(f"deleted: {paste_id}") print(f"deleted: {paste_id}")
@@ -297,7 +372,7 @@ def cmd_delete(args, config):
def cmd_info(args, config): def cmd_info(args, config):
"""Show server info.""" """Show server info."""
url = config["server"].rstrip("/") + "/" url = config["server"].rstrip("/") + "/"
status, body, _ = request(url) status, body, _ = request(url, ssl_context=config.get("ssl_context"))
if status == 200: if status == 200:
data = json.loads(body) data = json.loads(body)
@@ -308,21 +383,337 @@ def cmd_info(args, config):
die("failed to connect to server") die("failed to connect to server")
def cmd_pki_status(args, config):
"""Show PKI status and CA information."""
url = config["server"].rstrip("/") + "/pki"
status, body, _ = request(url, ssl_context=config.get("ssl_context"))
if status == 404:
die("PKI not enabled on this server")
elif status != 200:
die(f"failed to get PKI status ({status})")
data = json.loads(body)
print(f"pki enabled: {data.get('enabled', False)}")
print(f"ca exists: {data.get('ca_exists', False)}")
if data.get("ca_exists"):
print(f"common name: {data.get('common_name', 'unknown')}")
print(f"fingerprint: {data.get('fingerprint_sha1', 'unknown')}")
if data.get("created_at"):
print(f"created: {data.get('created_at')}")
if data.get("expires_at"):
print(f"expires: {data.get('expires_at')}")
print(f"download: {config['server'].rstrip('/')}{data.get('download', '/pki/ca.crt')}")
elif data.get("hint"):
print(f"hint: {data.get('hint')}")
def cmd_pki_issue(args, config):
"""Request a new client certificate from the server CA."""
url = config["server"].rstrip("/") + "/pki/issue"
headers = {"Content-Type": "application/json"}
if config["cert_sha1"]:
headers["X-SSL-Client-SHA1"] = config["cert_sha1"]
payload = {"common_name": args.name}
data = json.dumps(payload).encode()
status, body, _ = request(
url, method="POST", data=data, headers=headers, ssl_context=config.get("ssl_context")
)
if status == 404:
# Could be PKI disabled or no CA
try:
err = json.loads(body).get("error", "PKI not available")
except (json.JSONDecodeError, UnicodeDecodeError):
err = "PKI not available"
die(err)
elif status == 400:
try:
err = json.loads(body).get("error", "bad request")
except (json.JSONDecodeError, UnicodeDecodeError):
err = "bad request"
die(err)
elif status != 201:
die(f"certificate issuance failed ({status})")
result = json.loads(body)
# Determine output directory
out_dir = Path(args.output) if args.output else Path.home() / ".config" / "fpaste"
out_dir.mkdir(parents=True, exist_ok=True)
# File paths
key_file = out_dir / "client.key"
cert_file = out_dir / "client.crt"
# Check for existing files
if not args.force:
if key_file.exists():
die(f"key file exists: {key_file} (use --force)")
if cert_file.exists():
die(f"cert file exists: {cert_file} (use --force)")
# Write files
key_file.write_text(result["private_key_pem"])
key_file.chmod(0o600)
cert_file.write_text(result["certificate_pem"])
fingerprint = result.get("fingerprint_sha1", "unknown")
print(f"key: {key_file}", file=sys.stderr)
print(f"certificate: {cert_file}", file=sys.stderr)
print(f"fingerprint: {fingerprint}", file=sys.stderr)
print(f"serial: {result.get('serial', 'unknown')}", file=sys.stderr)
print(f"common name: {result.get('common_name', args.name)}", file=sys.stderr)
# Update config file if requested
if args.configure:
config_file = Path.home() / ".config" / "fpaste" / "config"
config_file.parent.mkdir(parents=True, exist_ok=True)
# Read existing config
existing = {}
if config_file.exists():
for line in config_file.read_text().splitlines():
line = line.strip()
if line and not line.startswith("#") and "=" in line:
k, v = line.split("=", 1)
existing[k.strip().lower()] = v.strip()
# Update values
existing["client_cert"] = str(cert_file)
existing["client_key"] = str(key_file)
existing["cert_sha1"] = fingerprint
# Write config
lines = [f"{k} = {v}" for k, v in sorted(existing.items())]
config_file.write_text("\n".join(lines) + "\n")
print(f"config: {config_file} (updated)", file=sys.stderr)
# Output fingerprint to stdout for easy capture
print(fingerprint)
def cmd_pki_download(args, config):
"""Download the CA certificate from the server."""
url = config["server"].rstrip("/") + "/pki/ca.crt"
status, body, _ = request(url, ssl_context=config.get("ssl_context"))
if status == 404:
die("CA certificate not available (PKI disabled or CA not generated)")
elif status != 200:
die(f"failed to download CA certificate ({status})")
# Determine output
if args.output:
out_path = Path(args.output)
out_path.write_bytes(body)
print(f"saved: {out_path}", file=sys.stderr)
# Calculate and show fingerprint if cryptography available
if HAS_CRYPTO:
cert = x509.load_pem_x509_certificate(body)
# SHA1 is standard for X.509 fingerprints
fp = hashlib.sha1(cert.public_bytes(serialization.Encoding.DER)).hexdigest() # noqa: S324
print(f"fingerprint: {fp}", file=sys.stderr)
# Update config if requested
if args.configure:
config_file = Path.home() / ".config" / "fpaste" / "config"
config_file.parent.mkdir(parents=True, exist_ok=True)
existing = {}
if config_file.exists():
for line in config_file.read_text().splitlines():
line = line.strip()
if line and not line.startswith("#") and "=" in line:
k, v = line.split("=", 1)
existing[k.strip().lower()] = v.strip()
existing["ca_cert"] = str(out_path)
lines = [f"{k} = {v}" for k, v in sorted(existing.items())]
config_file.write_text("\n".join(lines) + "\n")
print(f"config: {config_file} (updated)", file=sys.stderr)
else:
# Output to stdout
sys.stdout.buffer.write(body)
def cmd_cert(args, config):
"""Generate a self-signed client certificate for mTLS authentication."""
if not HAS_CRYPTO:
die("certificate generation requires 'cryptography' package: pip install cryptography")
# Determine output directory
out_dir = Path(args.output) if args.output else Path.home() / ".config" / "fpaste"
out_dir.mkdir(parents=True, exist_ok=True)
# File paths
key_file = out_dir / "client.key"
cert_file = out_dir / "client.crt"
# Check for existing files
if not args.force:
if key_file.exists():
die(f"key file exists: {key_file} (use --force)")
if cert_file.exists():
die(f"cert file exists: {cert_file} (use --force)")
# Generate private key
if args.algorithm == "rsa":
key_size = args.bits or 4096
print(f"generating {key_size}-bit RSA key...", file=sys.stderr)
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=key_size,
)
elif args.algorithm == "ec":
curve_name = args.curve or "secp384r1"
curves = {
"secp256r1": ec.SECP256R1(),
"secp384r1": ec.SECP384R1(),
"secp521r1": ec.SECP521R1(),
}
if curve_name not in curves:
die(f"unsupported curve: {curve_name} (use: secp256r1, secp384r1, secp521r1)")
print(f"generating EC key ({curve_name})...", file=sys.stderr)
private_key = ec.generate_private_key(curves[curve_name])
else:
die(f"unsupported algorithm: {args.algorithm}")
# Certificate subject
cn = args.name or os.environ.get("USER", "fpaste-client")
subject = issuer = x509.Name(
[
x509.NameAttribute(NameOID.COMMON_NAME, cn),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "FlaskPaste Client"),
]
)
# Validity period
days = args.days or 365
now = datetime.now(UTC)
# Build certificate
cert_builder = (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(issuer)
.public_key(private_key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(now)
.not_valid_after(now + timedelta(days=days))
.add_extension(
x509.BasicConstraints(ca=False, path_length=None),
critical=True,
)
.add_extension(
x509.KeyUsage(
digital_signature=True,
key_encipherment=True,
content_commitment=False,
data_encipherment=False,
key_agreement=False,
key_cert_sign=False,
crl_sign=False,
encipher_only=False,
decipher_only=False,
),
critical=True,
)
.add_extension(
x509.ExtendedKeyUsage([x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH]),
critical=False,
)
)
# Sign certificate
print("signing certificate...", file=sys.stderr)
certificate = cert_builder.sign(private_key, hashes.SHA256())
# Calculate SHA1 fingerprint (standard for X.509)
cert_der = certificate.public_bytes(serialization.Encoding.DER)
fingerprint = hashlib.sha1(cert_der).hexdigest() # noqa: S324
# Serialize private key
if args.password_key:
key_encryption = serialization.BestAvailableEncryption(args.password_key.encode("utf-8"))
else:
key_encryption = serialization.NoEncryption()
key_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=key_encryption,
)
# Serialize certificate
cert_pem = certificate.public_bytes(serialization.Encoding.PEM)
# Write files
key_file.write_bytes(key_pem)
key_file.chmod(0o600) # Restrict permissions
cert_file.write_bytes(cert_pem)
print(f"key: {key_file}", file=sys.stderr)
print(f"certificate: {cert_file}", file=sys.stderr)
print(f"fingerprint: {fingerprint}", file=sys.stderr)
print(f"valid for: {days} days", file=sys.stderr)
print(f"common name: {cn}", file=sys.stderr)
# Update config file if requested
if args.configure:
config_file = Path.home() / ".config" / "fpaste" / "config"
config_file.parent.mkdir(parents=True, exist_ok=True)
# Read existing config
existing = {}
if config_file.exists():
for line in config_file.read_text().splitlines():
line = line.strip()
if line and not line.startswith("#") and "=" in line:
k, v = line.split("=", 1)
existing[k.strip().lower()] = v.strip()
# Update values
existing["client_cert"] = str(cert_file)
existing["client_key"] = str(key_file)
existing["cert_sha1"] = fingerprint
# Write config
lines = [f"{k} = {v}" for k, v in sorted(existing.items())]
config_file.write_text("\n".join(lines) + "\n")
print(f"config: {config_file} (updated)", file=sys.stderr)
# Output fingerprint to stdout for easy capture
print(fingerprint)
def main(): def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
prog="fpaste", prog="fpaste",
description="FlaskPaste command-line client", description="FlaskPaste command-line client",
) )
parser.add_argument( parser.add_argument(
"-s", "--server", "-s",
help="server URL (default: $FLASKPASTE_SERVER or http://localhost:5000)", "--server",
help="server URL (env: FLASKPASTE_SERVER)",
) )
subparsers = parser.add_subparsers(dest="command", metavar="command") subparsers = parser.add_subparsers(dest="command", metavar="command")
# create # create
p_create = subparsers.add_parser("create", aliases=["c", "new"], help="create paste") p_create = subparsers.add_parser("create", aliases=["c", "new"], help="create paste")
p_create.add_argument("file", nargs="?", help="file to upload (- for stdin)") p_create.add_argument("file", nargs="?", help="file to upload (- for stdin)")
p_create.add_argument("-e", "--encrypt", action="store_true", help="encrypt content (E2E)") p_create.add_argument("-e", "--encrypt", action="store_true", help="encrypt content")
p_create.add_argument("-b", "--burn", action="store_true", help="burn after read")
p_create.add_argument("-x", "--expiry", type=int, metavar="SEC", help="expiry in seconds")
p_create.add_argument("-p", "--password", metavar="PASS", help="password protect")
p_create.add_argument("-r", "--raw", action="store_true", help="output raw URL") p_create.add_argument("-r", "--raw", action="store_true", help="output raw URL")
p_create.add_argument("-q", "--quiet", action="store_true", help="output ID only") p_create.add_argument("-q", "--quiet", action="store_true", help="output ID only")
@@ -330,6 +721,7 @@ def main():
p_get = subparsers.add_parser("get", aliases=["g"], help="retrieve paste") p_get = subparsers.add_parser("get", aliases=["g"], help="retrieve paste")
p_get.add_argument("id", help="paste ID or URL") p_get.add_argument("id", help="paste ID or URL")
p_get.add_argument("-o", "--output", help="save to file") p_get.add_argument("-o", "--output", help="save to file")
p_get.add_argument("-p", "--password", metavar="PASS", help="password for protected paste")
p_get.add_argument("-m", "--meta", action="store_true", help="show metadata only") p_get.add_argument("-m", "--meta", action="store_true", help="show metadata only")
# delete # delete
@@ -339,18 +731,73 @@ def main():
# info # info
subparsers.add_parser("info", aliases=["i"], help="show server info") subparsers.add_parser("info", aliases=["i"], help="show server info")
# cert
p_cert = subparsers.add_parser("cert", help="generate client certificate")
p_cert.add_argument("-o", "--output", metavar="DIR", help="output directory")
p_cert.add_argument(
"-a", "--algorithm", choices=["rsa", "ec"], default="ec", help="key algorithm (default: ec)"
)
p_cert.add_argument("-b", "--bits", type=int, metavar="N", help="RSA key size (default: 4096)")
p_cert.add_argument(
"-c", "--curve", metavar="CURVE", help="EC curve: secp256r1, secp384r1, secp521r1"
)
p_cert.add_argument("-d", "--days", type=int, metavar="N", help="validity period in days")
p_cert.add_argument("-n", "--name", metavar="CN", help="common name (default: $USER)")
p_cert.add_argument("--password-key", metavar="PASS", help="encrypt private key with password")
p_cert.add_argument(
"--configure", action="store_true", help="update config file with generated cert paths"
)
p_cert.add_argument("-f", "--force", action="store_true", help="overwrite existing files")
# pki (with subcommands)
p_pki = subparsers.add_parser("pki", help="PKI operations (server-issued certificates)")
pki_sub = p_pki.add_subparsers(dest="pki_command", metavar="subcommand")
# pki status
pki_sub.add_parser("status", help="show PKI status and CA info")
# pki issue
p_pki_issue = pki_sub.add_parser("issue", help="request certificate from server CA")
p_pki_issue.add_argument(
"-n", "--name", required=True, metavar="CN", help="common name for certificate (required)"
)
p_pki_issue.add_argument(
"-o", "--output", metavar="DIR", help="output directory (default: ~/.config/fpaste)"
)
p_pki_issue.add_argument(
"--configure", action="store_true", help="update config file with issued cert paths"
)
p_pki_issue.add_argument("-f", "--force", action="store_true", help="overwrite existing files")
# pki download
p_pki_download = pki_sub.add_parser("download", aliases=["dl"], help="download CA certificate")
p_pki_download.add_argument(
"-o", "--output", metavar="FILE", help="save to file (default: stdout)"
)
p_pki_download.add_argument(
"--configure",
action="store_true",
help="update config file with CA cert path (requires -o)",
)
args = parser.parse_args() args = parser.parse_args()
config = get_config() config = get_config()
if args.server: if args.server:
config["server"] = args.server config["server"] = args.server
# Create SSL context for mTLS if configured
config["ssl_context"] = create_ssl_context(config)
if not args.command: if not args.command:
# Default: create from stdin if data is piped # Default: create from stdin if data is piped
if not sys.stdin.isatty(): if not sys.stdin.isatty():
args.command = "create" args.command = "create"
args.file = None args.file = None
args.encrypt = False args.encrypt = False
args.burn = False
args.expiry = None
args.password = None
args.raw = False args.raw = False
args.quiet = False args.quiet = False
else: else:
@@ -365,6 +812,18 @@ def main():
cmd_delete(args, config) cmd_delete(args, config)
elif args.command in ("info", "i"): elif args.command in ("info", "i"):
cmd_info(args, config) cmd_info(args, config)
elif args.command == "cert":
cmd_cert(args, config)
elif args.command == "pki":
if args.pki_command == "status":
cmd_pki_status(args, config)
elif args.pki_command == "issue":
cmd_pki_issue(args, config)
elif args.pki_command in ("download", "dl"):
cmd_pki_download(args, config)
else:
# Show pki help if no subcommand
parser.parse_args(["pki", "--help"])
if __name__ == "__main__": if __name__ == "__main__":

500
tests/test_paste_options.py Normal file
View File

@@ -0,0 +1,500 @@
"""Tests for burn-after-read and custom expiry features."""
import time
import pytest
from app import create_app
from app.database import cleanup_expired_pastes
class TestBurnAfterRead:
"""Test burn-after-read paste functionality."""
@pytest.fixture
def app(self):
"""Create app for burn-after-read tests."""
return create_app("testing")
@pytest.fixture
def client(self, app):
"""Create test client."""
return app.test_client()
def test_create_burn_paste(self, client):
"""Creating a burn-after-read paste should succeed."""
response = client.post(
"/",
data=b"secret message",
headers={"X-Burn-After-Read": "true"},
)
assert response.status_code == 201
data = response.get_json()
assert data["burn_after_read"] is True
def test_burn_paste_deleted_after_raw_get(self, client):
"""Burn paste should be deleted after first GET /raw."""
# Create burn paste
response = client.post(
"/",
data=b"one-time secret",
headers={"X-Burn-After-Read": "true"},
)
paste_id = response.get_json()["id"]
# First GET should succeed
response = client.get(f"/{paste_id}/raw")
assert response.status_code == 200
assert response.data == b"one-time secret"
assert response.headers.get("X-Burn-After-Read") == "true"
# Second GET should fail (paste deleted)
response = client.get(f"/{paste_id}/raw")
assert response.status_code == 404
def test_burn_paste_metadata_does_not_trigger_burn(self, client):
"""GET metadata should not delete burn paste."""
# Create burn paste
response = client.post(
"/",
data=b"secret",
headers={"X-Burn-After-Read": "true"},
)
paste_id = response.get_json()["id"]
# Metadata GET should succeed and show burn flag
response = client.get(f"/{paste_id}")
assert response.status_code == 200
data = response.get_json()
assert data["burn_after_read"] is True
# Paste should still exist
response = client.get(f"/{paste_id}")
assert response.status_code == 200
# Raw GET should delete it
response = client.get(f"/{paste_id}/raw")
assert response.status_code == 200
# Now it's gone
response = client.get(f"/{paste_id}")
assert response.status_code == 404
def test_head_does_not_trigger_burn(self, client):
"""HEAD request should not delete burn paste."""
# Create burn paste
response = client.post(
"/",
data=b"secret",
headers={"X-Burn-After-Read": "true"},
)
paste_id = response.get_json()["id"]
# HEAD should succeed
response = client.head(f"/{paste_id}/raw")
assert response.status_code == 200
# Paste should still exist
response = client.get(f"/{paste_id}/raw")
assert response.status_code == 200
def test_burn_header_variations(self, client):
"""Different true values for X-Burn-After-Read should work."""
for value in ["true", "TRUE", "1", "yes", "YES"]:
response = client.post(
"/",
data=b"content",
headers={"X-Burn-After-Read": value},
)
data = response.get_json()
assert data.get("burn_after_read") is True, f"Failed for value: {value}"
def test_burn_header_false_values(self, client):
"""False values should not enable burn-after-read."""
for value in ["false", "0", "no", ""]:
response = client.post(
"/",
data=b"content",
headers={"X-Burn-After-Read": value},
)
data = response.get_json()
assert "burn_after_read" not in data, f"Should not be burn for: {value}"
class TestCustomExpiry:
"""Test custom expiry functionality."""
@pytest.fixture
def app(self):
"""Create app with short max expiry for testing."""
app = create_app("testing")
app.config["MAX_EXPIRY_SECONDS"] = 3600 # 1 hour max
app.config["PASTE_EXPIRY_SECONDS"] = 60 # 1 minute default
return app
@pytest.fixture
def client(self, app):
"""Create test client."""
return app.test_client()
def test_create_paste_with_custom_expiry(self, client):
"""Creating a paste with X-Expiry should set expires_at."""
response = client.post(
"/",
data=b"temporary content",
headers={"X-Expiry": "300"}, # 5 minutes
)
assert response.status_code == 201
data = response.get_json()
assert "expires_at" in data
# Should be approximately now + 300
now = int(time.time())
assert abs(data["expires_at"] - (now + 300)) < 5
def test_custom_expiry_capped_at_max(self, client):
"""Custom expiry should be capped at MAX_EXPIRY_SECONDS."""
response = client.post(
"/",
data=b"content",
headers={"X-Expiry": "999999"}, # Way more than max
)
assert response.status_code == 201
data = response.get_json()
assert "expires_at" in data
# Should be capped at 3600 seconds from now
now = int(time.time())
assert abs(data["expires_at"] - (now + 3600)) < 5
def test_expiry_shown_in_metadata(self, client):
"""Custom expiry should appear in paste metadata."""
response = client.post(
"/",
data=b"content",
headers={"X-Expiry": "600"},
)
paste_id = response.get_json()["id"]
response = client.get(f"/{paste_id}")
data = response.get_json()
assert "expires_at" in data
def test_invalid_expiry_ignored(self, client):
"""Invalid X-Expiry values should be ignored."""
for value in ["invalid", "-100", "0", ""]:
response = client.post(
"/",
data=b"content",
headers={"X-Expiry": value},
)
assert response.status_code == 201
data = response.get_json()
assert "expires_at" not in data, f"Should not have expiry for: {value}"
def test_paste_without_custom_expiry(self, client):
"""Paste without X-Expiry should not have expires_at."""
response = client.post("/", data=b"content")
assert response.status_code == 201
data = response.get_json()
assert "expires_at" not in data
class TestExpiryCleanup:
"""Test cleanup of expired pastes."""
@pytest.fixture
def app(self):
"""Create app with very short expiry for testing."""
app = create_app("testing")
app.config["PASTE_EXPIRY_SECONDS"] = 1 # 1 second default
app.config["MAX_EXPIRY_SECONDS"] = 10
return app
@pytest.fixture
def client(self, app):
"""Create test client."""
return app.test_client()
def test_cleanup_custom_expired_paste(self, app, client):
"""Paste with expired custom expiry should be cleaned up."""
# Create paste with 1 second expiry
response = client.post(
"/",
data=b"expiring soon",
headers={"X-Expiry": "1"},
)
paste_id = response.get_json()["id"]
# Should exist immediately
response = client.get(f"/{paste_id}")
assert response.status_code == 200
# Wait for expiry
time.sleep(2)
# Run cleanup
with app.app_context():
deleted = cleanup_expired_pastes()
assert deleted >= 1
# Should be gone
response = client.get(f"/{paste_id}")
assert response.status_code == 404
def test_cleanup_respects_default_expiry(self, app, client):
"""Paste without custom expiry should use default expiry."""
# Create paste without custom expiry
response = client.post("/", data=b"default expiry")
paste_id = response.get_json()["id"]
# Wait for default expiry (1 second in test config)
time.sleep(2)
# Run cleanup
with app.app_context():
deleted = cleanup_expired_pastes()
assert deleted >= 1
# Should be gone
response = client.get(f"/{paste_id}")
assert response.status_code == 404
def test_cleanup_keeps_unexpired_paste(self, app, client):
"""Paste with future custom expiry should not be cleaned up."""
# Create paste with long expiry
response = client.post(
"/",
data=b"not expiring soon",
headers={"X-Expiry": "10"}, # 10 seconds
)
paste_id = response.get_json()["id"]
# Run cleanup immediately
with app.app_context():
cleanup_expired_pastes()
# Should still exist
response = client.get(f"/{paste_id}")
assert response.status_code == 200
class TestCombinedOptions:
"""Test combinations of burn-after-read and custom expiry."""
@pytest.fixture
def app(self):
"""Create app for combined tests."""
return create_app("testing")
@pytest.fixture
def client(self, app):
"""Create test client."""
return app.test_client()
def test_burn_and_expiry_together(self, client):
"""Paste can have both burn-after-read and custom expiry."""
response = client.post(
"/",
data=b"secret with expiry",
headers={
"X-Burn-After-Read": "true",
"X-Expiry": "3600",
},
)
assert response.status_code == 201
data = response.get_json()
assert data["burn_after_read"] is True
assert "expires_at" in data
class TestPasswordProtection:
"""Test password-protected paste functionality."""
@pytest.fixture
def app(self):
"""Create app for password tests."""
return create_app("testing")
@pytest.fixture
def client(self, app):
"""Create test client."""
return app.test_client()
def test_create_password_protected_paste(self, client):
"""Creating a password-protected paste should succeed."""
response = client.post(
"/",
data=b"secret content",
headers={"X-Paste-Password": "mypassword123"},
)
assert response.status_code == 201
data = response.get_json()
assert data["password_protected"] is True
def test_get_protected_paste_without_password(self, client):
"""Accessing protected paste without password should return 401."""
# Create protected paste
response = client.post(
"/",
data=b"protected content",
headers={"X-Paste-Password": "secret"},
)
paste_id = response.get_json()["id"]
# Try to access without password
response = client.get(f"/{paste_id}")
assert response.status_code == 401
data = response.get_json()
assert data["password_protected"] is True
assert "Password required" in data["error"]
def test_get_protected_paste_with_wrong_password(self, client):
"""Accessing protected paste with wrong password should return 403."""
# Create protected paste
response = client.post(
"/",
data=b"protected content",
headers={"X-Paste-Password": "correctpassword"},
)
paste_id = response.get_json()["id"]
# Try with wrong password
response = client.get(
f"/{paste_id}",
headers={"X-Paste-Password": "wrongpassword"},
)
assert response.status_code == 403
data = response.get_json()
assert "Invalid password" in data["error"]
def test_get_protected_paste_with_correct_password(self, client):
"""Accessing protected paste with correct password should succeed."""
password = "supersecret123"
# Create protected paste
response = client.post(
"/",
data=b"protected content",
headers={"X-Paste-Password": password},
)
paste_id = response.get_json()["id"]
# Access with correct password
response = client.get(
f"/{paste_id}",
headers={"X-Paste-Password": password},
)
assert response.status_code == 200
data = response.get_json()
assert data["password_protected"] is True
def test_get_raw_protected_paste_without_password(self, client):
"""Getting raw content without password should return 401."""
response = client.post(
"/",
data=b"secret raw content",
headers={"X-Paste-Password": "secret"},
)
paste_id = response.get_json()["id"]
response = client.get(f"/{paste_id}/raw")
assert response.status_code == 401
def test_get_raw_protected_paste_with_correct_password(self, client):
"""Getting raw content with correct password should succeed."""
password = "mypassword"
response = client.post(
"/",
data=b"secret raw content",
headers={"X-Paste-Password": password},
)
paste_id = response.get_json()["id"]
response = client.get(
f"/{paste_id}/raw",
headers={"X-Paste-Password": password},
)
assert response.status_code == 200
assert response.data == b"secret raw content"
def test_password_too_long_rejected(self, client):
"""Password longer than 1024 chars should be rejected."""
long_password = "x" * 1025
response = client.post(
"/",
data=b"content",
headers={"X-Paste-Password": long_password},
)
assert response.status_code == 400
data = response.get_json()
assert "too long" in data["error"]
def test_unprotected_paste_accessible(self, client):
"""Unprotected paste should be accessible without password."""
response = client.post("/", data=b"public content")
paste_id = response.get_json()["id"]
response = client.get(f"/{paste_id}")
assert response.status_code == 200
assert "password_protected" not in response.get_json()
def test_password_with_special_chars(self, client):
"""Password with special characters should work."""
password = "p@ssw0rd!#$%^&*()_+-=[]{}|;':\",./<>?"
response = client.post(
"/",
data=b"special content",
headers={"X-Paste-Password": password},
)
paste_id = response.get_json()["id"]
response = client.get(
f"/{paste_id}",
headers={"X-Paste-Password": password},
)
assert response.status_code == 200
def test_password_with_unicode(self, client):
"""Password with unicode characters should work."""
password = "пароль密码🔐"
response = client.post(
"/",
data=b"unicode content",
headers={"X-Paste-Password": password},
)
paste_id = response.get_json()["id"]
response = client.get(
f"/{paste_id}",
headers={"X-Paste-Password": password},
)
assert response.status_code == 200
def test_password_combined_with_burn(self, client):
"""Password protection can be combined with burn-after-read."""
password = "secret"
response = client.post(
"/",
data=b"protected burn content",
headers={
"X-Paste-Password": password,
"X-Burn-After-Read": "true",
},
)
assert response.status_code == 201
data = response.get_json()
assert data["password_protected"] is True
assert data["burn_after_read"] is True
paste_id = data["id"]
# First access with password should succeed
response = client.get(
f"/{paste_id}/raw",
headers={"X-Paste-Password": password},
)
assert response.status_code == 200
# Second access should fail (burned)
response = client.get(
f"/{paste_id}/raw",
headers={"X-Paste-Password": password},
)
assert response.status_code == 404

371
tests/test_pki.py Normal file
View File

@@ -0,0 +1,371 @@
"""Tests for PKI (Certificate Authority) functionality."""
from datetime import UTC
import pytest
from app.pki import reset_pki
@pytest.fixture(autouse=True)
def reset_pki_state(app):
"""Reset PKI state and clear PKI database tables before each test."""
reset_pki()
# Clear PKI tables in database
with app.app_context():
from app.database import get_db
db = get_db()
db.execute("DELETE FROM issued_certificates")
db.execute("DELETE FROM certificate_authority")
db.commit()
yield
reset_pki()
class TestPKIStatus:
"""Test GET /pki endpoint."""
def test_pki_status_when_enabled(self, client):
"""PKI status shows enabled with no CA initially."""
response = client.get("/pki")
assert response.status_code == 200
data = response.get_json()
assert data["enabled"] is True
assert data["ca_exists"] is False
assert "hint" in data
def test_pki_status_after_ca_generation(self, client):
"""PKI status shows CA info after generation."""
# Generate CA first
client.post("/pki/ca", json={"common_name": "Test CA"})
response = client.get("/pki")
assert response.status_code == 200
data = response.get_json()
assert data["enabled"] is True
assert data["ca_exists"] is True
assert data["common_name"] == "Test CA"
assert "fingerprint_sha1" in data
assert len(data["fingerprint_sha1"]) == 40
class TestCAGeneration:
"""Test POST /pki/ca endpoint."""
def test_generate_ca_success(self, client):
"""CA can be generated with default name."""
response = client.post("/pki/ca")
assert response.status_code == 201
data = response.get_json()
assert data["message"] == "CA generated"
assert data["common_name"] == "FlaskPaste CA"
assert "fingerprint_sha1" in data
assert "created_at" in data
assert "expires_at" in data
assert data["download"] == "/pki/ca.crt"
def test_generate_ca_custom_name(self, client):
"""CA can be generated with custom name."""
response = client.post("/pki/ca", json={"common_name": "My Custom CA"})
assert response.status_code == 201
data = response.get_json()
assert data["common_name"] == "My Custom CA"
def test_generate_ca_twice_fails(self, client):
"""CA cannot be generated twice."""
# First generation succeeds
response = client.post("/pki/ca")
assert response.status_code == 201
# Second generation fails
response = client.post("/pki/ca")
assert response.status_code == 409
data = response.get_json()
assert "already exists" in data["error"]
class TestCADownload:
"""Test GET /pki/ca.crt endpoint."""
def test_download_ca_not_initialized(self, client):
"""Download fails when no CA exists."""
response = client.get("/pki/ca.crt")
assert response.status_code == 404
def test_download_ca_success(self, client):
"""CA certificate can be downloaded."""
# Generate CA first
client.post("/pki/ca", json={"common_name": "Test CA"})
response = client.get("/pki/ca.crt")
assert response.status_code == 200
assert response.content_type == "application/x-pem-file"
assert b"-----BEGIN CERTIFICATE-----" in response.data
assert b"-----END CERTIFICATE-----" in response.data
class TestCertificateIssuance:
"""Test POST /pki/issue endpoint."""
def test_issue_without_ca_fails(self, client):
"""Issuance fails when no CA exists."""
response = client.post("/pki/issue", json={"common_name": "alice"})
assert response.status_code == 404
def test_issue_without_name_fails(self, client):
"""Issuance fails without common_name."""
client.post("/pki/ca")
response = client.post("/pki/issue", json={})
assert response.status_code == 400
assert "common_name required" in response.get_json()["error"]
def test_issue_certificate_success(self, client):
"""Certificate issuance succeeds."""
client.post("/pki/ca")
response = client.post("/pki/issue", json={"common_name": "alice"})
assert response.status_code == 201
data = response.get_json()
assert data["message"] == "Certificate issued"
assert data["common_name"] == "alice"
assert "serial" in data
assert "fingerprint_sha1" in data
assert len(data["fingerprint_sha1"]) == 40
assert "certificate_pem" in data
assert "private_key_pem" in data
assert "-----BEGIN CERTIFICATE-----" in data["certificate_pem"]
assert "-----BEGIN PRIVATE KEY-----" in data["private_key_pem"]
def test_issue_multiple_certificates(self, client):
"""Multiple certificates can be issued."""
client.post("/pki/ca")
response1 = client.post("/pki/issue", json={"common_name": "alice"})
response2 = client.post("/pki/issue", json={"common_name": "bob"})
assert response1.status_code == 201
assert response2.status_code == 201
data1 = response1.get_json()
data2 = response2.get_json()
# Different serials and fingerprints
assert data1["serial"] != data2["serial"]
assert data1["fingerprint_sha1"] != data2["fingerprint_sha1"]
class TestCertificateListing:
"""Test GET /pki/certs endpoint."""
def test_list_anonymous_empty(self, client):
"""Anonymous users see empty list."""
client.post("/pki/ca")
response = client.get("/pki/certs")
assert response.status_code == 200
data = response.get_json()
assert data["certificates"] == []
assert data["count"] == 0
def test_list_authenticated_sees_own(self, client):
"""Authenticated users see certificates they issued."""
client.post("/pki/ca")
# Issue certificate as authenticated user
issuer_fingerprint = "a" * 40
client.post(
"/pki/issue",
json={"common_name": "alice"},
headers={"X-SSL-Client-SHA1": issuer_fingerprint},
)
# List as same user
response = client.get("/pki/certs", headers={"X-SSL-Client-SHA1": issuer_fingerprint})
assert response.status_code == 200
data = response.get_json()
assert data["count"] == 1
assert data["certificates"][0]["common_name"] == "alice"
class TestCertificateRevocation:
"""Test POST /pki/revoke/<serial> endpoint."""
def test_revoke_unauthenticated_fails(self, client):
"""Revocation requires authentication."""
client.post("/pki/ca")
issue_resp = client.post("/pki/issue", json={"common_name": "alice"})
serial = issue_resp.get_json()["serial"]
response = client.post(f"/pki/revoke/{serial}")
assert response.status_code == 401
def test_revoke_unauthorized_fails(self, client):
"""Revocation requires ownership."""
client.post("/pki/ca")
# Issue as one user
issue_resp = client.post(
"/pki/issue", json={"common_name": "alice"}, headers={"X-SSL-Client-SHA1": "a" * 40}
)
serial = issue_resp.get_json()["serial"]
# Try to revoke as different user
response = client.post(f"/pki/revoke/{serial}", headers={"X-SSL-Client-SHA1": "b" * 40})
assert response.status_code == 403
def test_revoke_as_issuer_succeeds(self, client):
"""Issuer can revoke certificate."""
client.post("/pki/ca")
issuer = "a" * 40
issue_resp = client.post(
"/pki/issue", json={"common_name": "alice"}, headers={"X-SSL-Client-SHA1": issuer}
)
serial = issue_resp.get_json()["serial"]
response = client.post(f"/pki/revoke/{serial}", headers={"X-SSL-Client-SHA1": issuer})
assert response.status_code == 200
assert response.get_json()["message"] == "Certificate revoked"
def test_revoke_nonexistent_fails(self, client):
"""Revoking nonexistent certificate fails."""
client.post("/pki/ca")
response = client.post("/pki/revoke/0" * 32, headers={"X-SSL-Client-SHA1": "a" * 40})
assert response.status_code == 404
def test_revoke_twice_fails(self, client):
"""Certificate cannot be revoked twice."""
client.post("/pki/ca")
issuer = "a" * 40
issue_resp = client.post(
"/pki/issue", json={"common_name": "alice"}, headers={"X-SSL-Client-SHA1": issuer}
)
serial = issue_resp.get_json()["serial"]
# First revocation succeeds
response = client.post(f"/pki/revoke/{serial}", headers={"X-SSL-Client-SHA1": issuer})
assert response.status_code == 200
# Second revocation fails
response = client.post(f"/pki/revoke/{serial}", headers={"X-SSL-Client-SHA1": issuer})
assert response.status_code == 409
class TestRevocationIntegration:
"""Test revocation affects authentication."""
def test_revoked_cert_treated_as_anonymous(self, client):
"""Revoked certificate is treated as anonymous."""
client.post("/pki/ca")
# Issue certificate
issuer = "a" * 40
issue_resp = client.post(
"/pki/issue", json={"common_name": "alice"}, headers={"X-SSL-Client-SHA1": issuer}
)
cert_fingerprint = issue_resp.get_json()["fingerprint_sha1"]
serial = issue_resp.get_json()["serial"]
# Create paste as authenticated user
create_resp = client.post(
"/", data=b"test content", headers={"X-SSL-Client-SHA1": cert_fingerprint}
)
assert create_resp.status_code == 201
paste_id = create_resp.get_json()["id"]
assert "owner" in create_resp.get_json()
# Revoke the certificate
client.post(f"/pki/revoke/{serial}", headers={"X-SSL-Client-SHA1": issuer})
# Try to delete paste with revoked cert - should fail
delete_resp = client.delete(f"/{paste_id}", headers={"X-SSL-Client-SHA1": cert_fingerprint})
assert delete_resp.status_code == 401
class TestPKICryptoFunctions:
"""Test standalone PKI cryptographic functions."""
def test_derive_key_consistency(self):
"""Key derivation produces consistent results."""
from app.pki import derive_key
password = "test-password"
salt = b"x" * 32
key1 = derive_key(password, salt)
key2 = derive_key(password, salt)
assert key1 == key2
assert len(key1) == 32
def test_encrypt_decrypt_roundtrip(self):
"""Private key encryption/decryption roundtrip."""
from cryptography.hazmat.primitives.asymmetric import ec
from app.pki import decrypt_private_key, encrypt_private_key
# Generate a test key
private_key = ec.generate_private_key(ec.SECP384R1())
password = "test-password"
# Encrypt
encrypted, salt = encrypt_private_key(private_key, password)
# Decrypt
decrypted = decrypt_private_key(encrypted, salt, password)
# Verify same key
assert private_key.private_numbers() == decrypted.private_numbers()
def test_wrong_password_fails(self):
"""Decryption with wrong password fails."""
from cryptography.hazmat.primitives.asymmetric import ec
from app.pki import (
InvalidPasswordError,
decrypt_private_key,
encrypt_private_key,
)
private_key = ec.generate_private_key(ec.SECP384R1())
encrypted, salt = encrypt_private_key(private_key, "correct")
with pytest.raises(InvalidPasswordError):
decrypt_private_key(encrypted, salt, "wrong")
def test_fingerprint_calculation(self):
"""Certificate fingerprint is calculated correctly."""
from datetime import datetime, timedelta
from cryptography import x509
# Minimal self-signed cert for testing
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.x509.oid import NameOID
from app.pki import calculate_fingerprint
key = ec.generate_private_key(ec.SECP256R1())
subject = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "test")])
cert = (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(subject)
.public_key(key.public_key())
.serial_number(1)
.not_valid_before(datetime.now(UTC))
.not_valid_after(datetime.now(UTC) + timedelta(days=1))
.sign(key, hashes.SHA256())
)
fingerprint = calculate_fingerprint(cert)
assert len(fingerprint) == 40
assert all(c in "0123456789abcdef" for c in fingerprint)