diff --git a/.gitignore b/.gitignore index 430c7d6..20801d6 100644 --- a/.gitignore +++ b/.gitignore @@ -31,6 +31,7 @@ data/*.db-shm .credentials.json *.pem *.key +keys/ # Build dist/ diff --git a/README.md b/README.md index 0d44812..39ab0ec 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,8 @@ python run.py | `GET //raw` | Retrieve raw paste content | | `HEAD //raw` | Retrieve paste headers (no body) | | `DELETE /` | Delete paste (requires auth) | +| `GET /register/challenge` | Get PoW challenge for registration | +| `POST /register` | Register and get client certificate (PKCS#12) | ## Usage Examples @@ -230,6 +232,7 @@ Configuration via environment variables: | `FLASKPASTE_PROXY_SECRET` | (empty) | Shared secret for proxy trust validation | | `FLASKPASTE_POW_DIFFICULTY` | `20` | PoW difficulty (leading zero bits, 0=disabled) | | `FLASKPASTE_POW_TTL` | `300` (5 min) | PoW challenge validity period | +| `FLASKPASTE_REGISTER_POW` | `24` | Registration PoW difficulty (higher than paste creation) | | `FLASKPASTE_POW_SECRET` | (auto) | Secret for signing PoW challenges | | `FLASKPASTE_ANTIFLOOD` | `1` | Enable anti-flood (dynamic PoW difficulty) | | `FLASKPASTE_ANTIFLOOD_WINDOW` | `60` | Anti-flood measurement window (seconds) | diff --git a/app/api/routes.py b/app/api/routes.py index 63bb13d..94ea0d2 100644 --- a/app/api/routes.py +++ b/app/api/routes.py @@ -364,12 +364,19 @@ def get_pow_secret() -> bytes: return _pow_secret_cache -def generate_challenge() -> dict[str, Any]: +def generate_challenge(difficulty_override: int | None = None) -> dict[str, Any]: """Generate new PoW challenge with signed token. - Uses dynamic difficulty which may be elevated during high load. + Uses dynamic difficulty which may be elevated during high load, + unless difficulty_override is specified. + + Args: + difficulty_override: Optional fixed difficulty (for registration) """ - difficulty = get_dynamic_difficulty() + if difficulty_override is not None: + difficulty = difficulty_override + else: + difficulty = get_dynamic_difficulty() ttl = current_app.config["POW_CHALLENGE_TTL"] expires = int(time.time()) + ttl nonce = secrets.token_hex(16) @@ -385,14 +392,25 @@ def generate_challenge() -> dict[str, Any]: } -def verify_pow(token: str, solution: str) -> tuple[bool, str]: +def verify_pow( + token: str, solution: str, min_difficulty: int | None = None +) -> tuple[bool, str]: """Verify proof-of-work solution. Returns (valid, error_message). - Accepts tokens with difficulty >= base. The solution must meet the + Accepts tokens with difficulty >= min_difficulty. The solution must meet the token's embedded difficulty (which may be elevated due to anti-flood). + + Args: + token: PoW challenge token + solution: Nonce solution + min_difficulty: Minimum required difficulty (defaults to POW_DIFFICULTY) """ base_difficulty = current_app.config["POW_DIFFICULTY"] - if base_difficulty == 0: + if base_difficulty == 0 and min_difficulty is None: + return True, "" + + required_difficulty = min_difficulty if min_difficulty is not None else base_difficulty + if required_difficulty == 0: return True, "" # Parse token @@ -416,9 +434,9 @@ def verify_pow(token: str, solution: str) -> tuple[bool, str]: if int(time.time()) > expires: return False, "Challenge expired" - # Token difficulty must be at least base (anti-flood may have raised it) - if token_diff < base_difficulty: - return False, "Difficulty too low" + # Token difficulty must be at least the required difficulty + if token_diff < required_difficulty: + return False, f"Difficulty too low: {token_diff} < {required_difficulty}" # Verify solution try: @@ -806,6 +824,153 @@ class ChallengeView(MethodView): return json_response(response) +class RegisterChallengeView(MethodView): + """Registration PoW challenge endpoint (higher difficulty).""" + + def get(self) -> Response: + """Generate PoW challenge for registration (higher difficulty).""" + register_difficulty = current_app.config.get("REGISTER_POW_DIFFICULTY", 24) + if register_difficulty == 0: + return json_response({"enabled": False, "difficulty": 0}) + + ch = generate_challenge(difficulty_override=register_difficulty) + return json_response( + { + "enabled": True, + "nonce": ch["nonce"], + "difficulty": ch["difficulty"], + "expires": ch["expires"], + "token": ch["token"], + "purpose": "registration", + } + ) + + +class RegisterView(MethodView): + """Public client certificate registration endpoint.""" + + def post(self) -> Response: + """Register and obtain a client certificate. + + Requires PoW to prevent abuse. Returns PKCS#12 bundle with: + - Client certificate + - Client private key + - CA certificate + + Auto-generates CA if not present and PKI_CA_PASSWORD is configured. + """ + from cryptography import x509 + from cryptography.hazmat.primitives import serialization + + from app.pki import ( + CANotFoundError, + PKIError, + create_pkcs12, + generate_ca, + get_ca_info, + issue_certificate, + ) + + # Check PKI configuration + password = current_app.config.get("PKI_CA_PASSWORD", "") + if not password: + return error_response( + "Registration not available", + 503, + hint="PKI_CA_PASSWORD not configured", + ) + + # Verify PoW + register_difficulty = current_app.config.get("REGISTER_POW_DIFFICULTY", 24) + if register_difficulty > 0: + token = request.headers.get("X-PoW-Token", "") + solution = request.headers.get("X-PoW-Solution", "") + + if not token or not solution: + return error_response( + "Proof-of-work required", + 400, + hint="GET /register/challenge for a registration challenge", + difficulty=register_difficulty, + ) + + valid, err = verify_pow(token, solution, min_difficulty=register_difficulty) + if not valid: + current_app.logger.warning( + "Registration PoW failed: %s from=%s", err, request.remote_addr + ) + return error_response(f"Proof-of-work failed: {err}", 400) + + # Parse common_name from request + common_name = None + if request.is_json: + data = request.get_json(silent=True) + if data and isinstance(data.get("common_name"), str): + common_name = data["common_name"][:64].strip() + + if not common_name: + # Generate random common name if not provided + common_name = f"client-{secrets.token_hex(4)}" + + # Auto-generate CA if needed + if get_ca_info() is None: + ca_days = current_app.config.get("PKI_CA_DAYS", 3650) + try: + ca_info = generate_ca("FlaskPaste CA", password, days=ca_days) + current_app.logger.info( + "CA auto-generated for registration: fingerprint=%s", + ca_info["fingerprint_sha1"][:12], + ) + except PKIError as e: + current_app.logger.error("CA auto-generation failed: %s", e) + return error_response("CA generation failed", 500) + + # Issue certificate + try: + cert_days = current_app.config.get("PKI_CERT_DAYS", 365) + cert_info = issue_certificate(common_name, password, days=cert_days) + except CANotFoundError: + return error_response("CA not available", 500) + except PKIError as e: + current_app.logger.error("Certificate issuance failed: %s", e) + return error_response("Certificate issuance failed", 500) + + # Load certificates for PKCS#12 creation + ca_info = get_ca_info() + ca_cert = x509.load_pem_x509_certificate(ca_info["certificate_pem"].encode()) + client_cert = x509.load_pem_x509_certificate( + cert_info["certificate_pem"].encode() + ) + client_key = serialization.load_pem_private_key( + cert_info["private_key_pem"].encode(), password=None + ) + + # Create PKCS#12 bundle (no password for easy import) + p12_data = create_pkcs12( + private_key=client_key, + certificate=client_cert, + ca_certificate=ca_cert, + friendly_name=common_name, + password=None, + ) + + current_app.logger.info( + "Client registered: cn=%s fingerprint=%s from=%s", + common_name, + cert_info["fingerprint_sha1"][:12], + request.remote_addr, + ) + + # Return PKCS#12 as binary download + response = Response(p12_data, mimetype="application/x-pkcs12") + response.headers["Content-Disposition"] = ( + f'attachment; filename="{common_name}.p12"' + ) + response.headers["X-Fingerprint-SHA1"] = cert_info["fingerprint_sha1"] + response.headers["X-Certificate-Expires"] = str(cert_info["expires_at"]) + return response + + class ClientView(MethodView): """CLI client download endpoint.""" @@ -1481,6 +1646,12 @@ bp.add_url_rule("/health", view_func=HealthView.as_view("health")) bp.add_url_rule("/challenge", view_func=ChallengeView.as_view("challenge")) bp.add_url_rule("/client", view_func=ClientView.as_view("client")) +# Registration endpoints (public certificate issuance with PoW) +bp.add_url_rule( + "/register/challenge", view_func=RegisterChallengeView.as_view("register_challenge") +) +bp.add_url_rule("/register", view_func=RegisterView.as_view("register")) + # Paste operations bp.add_url_rule("/pastes", view_func=PastesListView.as_view("pastes_list")) bp.add_url_rule("/", view_func=PasteView.as_view("paste"), methods=["GET", "HEAD", "PUT"]) diff --git a/app/config.py b/app/config.py index 700fac3..96b9b8b 100644 --- a/app/config.py +++ b/app/config.py @@ -63,6 +63,8 @@ class Config: POW_CHALLENGE_TTL = int(os.environ.get("FLASKPASTE_POW_TTL", "300")) # 5 minutes # Secret key for signing challenges (auto-generated if not set) POW_SECRET = os.environ.get("FLASKPASTE_POW_SECRET", "") + # Registration PoW difficulty (higher than paste creation for security) + REGISTER_POW_DIFFICULTY = int(os.environ.get("FLASKPASTE_REGISTER_POW", "24")) # Anti-flood: dynamically increase PoW difficulty under load ANTIFLOOD_ENABLED = os.environ.get("FLASKPASTE_ANTIFLOOD", "1").lower() in ( diff --git a/app/pki.py b/app/pki.py index 01bb982..fe8dd70 100644 --- a/app/pki.py +++ b/app/pki.py @@ -31,6 +31,7 @@ try: from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.ciphers.aead import AESGCM from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC + from cryptography.hazmat.primitives.serialization import pkcs12 from cryptography.x509.oid import ExtendedKeyUsageOID, NameOID HAS_CRYPTO = True @@ -189,6 +190,49 @@ def calculate_fingerprint(certificate: Any) -> str: return hashlib.sha1(cert_der, usedforsecurity=False).hexdigest() +def create_pkcs12( + private_key: Any, + certificate: Any, + ca_certificate: Any, + friendly_name: str, + password: bytes | None = None, +) -> bytes: + """Create PKCS#12 bundle containing certificate, private key, and CA cert. + + Args: + private_key: Client private key object + certificate: Client certificate object + ca_certificate: CA certificate object + friendly_name: Friendly name for the certificate + password: Optional password for PKCS#12 encryption (None = no password) + + Returns: + PKCS#12 bytes (DER encoded) + """ + _require_crypto() + + # Build encryption algorithm - use strong encryption if password provided + if password: + encryption = ( + serialization.BestAvailableEncryption(password) + if password + else serialization.NoEncryption() + ) + else: + encryption = serialization.NoEncryption() + + # Serialize to PKCS#12 + p12_data = pkcs12.serialize_key_and_certificates( + name=friendly_name.encode("utf-8"), + key=private_key, + cert=certificate, + cas=[ca_certificate], + encryption_algorithm=encryption, + ) + + return p12_data + + class PKI: """Standalone PKI manager for CA and certificate operations. diff --git a/documentation/api.md b/documentation/api.md index c6d9691..5472a1e 100644 --- a/documentation/api.md +++ b/documentation/api.md @@ -1049,3 +1049,124 @@ X-SSL-Client-SHA1: a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2 - Must be authenticated - Can revoke certificates you issued (issued_to matches your fingerprint) - Can revoke your own certificate (fingerprint matches) + +--- + +### GET /register/challenge + +Get a proof-of-work challenge for public certificate registration. Returns a challenge with higher difficulty than paste creation. + +**Request:** +```http +GET /register/challenge HTTP/1.1 +Host: localhost:5000 +``` + +**Response (PoW disabled):** +```json +{ + "enabled": false, + "difficulty": 0 +} +``` + +**Response (PoW enabled):** +```json +{ + "enabled": true, + "nonce": "a1b2c3d4e5f6a7b8a1b2c3d4e5f6a7b8", + "difficulty": 24, + "expires": 1700000300, + "token": "a1b2c3d4...:1700000300:24:signature" +} +``` + +**Notes:** +- Registration difficulty defaults to 24 bits (vs 20 for paste creation) +- Higher difficulty protects against automated certificate harvesting +- Configurable via `FLASKPASTE_REGISTER_POW` environment variable + +--- + +### POST /register + +Register for a client certificate (public endpoint with PoW protection). + +This endpoint allows anyone to obtain a client certificate for authentication. The CA is auto-generated on first registration if it doesn't exist. + +**Request (with PoW):** +```http +POST /register HTTP/1.1 +Host: localhost:5000 +Content-Type: application/json +X-PoW-Token: a1b2c3d4...:1700000300:24:signature +X-PoW-Solution: 12345678 + +{"common_name": "alice"} +``` + +**Request (PoW disabled):** +```http +POST /register HTTP/1.1 +Host: localhost:5000 +Content-Type: application/json + +{"common_name": "bob"} +``` + +**Response (200 OK):** +```http +HTTP/1.1 200 OK +Content-Type: application/x-pkcs12 +Content-Disposition: attachment; filename="client.p12" +X-Certificate-Fingerprint: b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3 + + +``` + +**Response Headers:** +| Header | Description | +|--------|-------------| +| `X-Certificate-Fingerprint` | SHA1 fingerprint for `X-SSL-Client-SHA1` header | + +**PKCS#12 Bundle Contents:** +- Client certificate (signed by CA) +- Client private key (EC secp384r1) +- CA certificate (for trust chain) + +**Errors:** +| Code | Description | +|------|-------------| +| 400 | Proof-of-work required (when enabled) | +| 400 | Proof-of-work failed (invalid/expired challenge) | +| 500 | PKI_CA_PASSWORD not configured | +| 500 | Certificate generation failed | + +**Configuration:** +```bash +export FLASKPASTE_REGISTER_POW=24 # Registration PoW difficulty (0=disabled) +export FLASKPASTE_PKI_CA_PASSWORD="..." # Required for certificate signing +export FLASKPASTE_PKI_CERT_DAYS=365 # Client certificate validity +export FLASKPASTE_PKI_CA_DAYS=3650 # CA certificate validity (auto-generated) +``` + +**Using the Certificate:** + +```bash +# Extract certificate and key from PKCS#12 +openssl pkcs12 -in client.p12 -clcerts -nokeys -out client.crt +openssl pkcs12 -in client.p12 -nocerts -nodes -out client.key + +# Use with curl +curl --cert client.crt --key client.key https://paste.example.com/ + +# Use fingerprint for header-based auth (behind reverse proxy) +curl -H "X-SSL-Client-SHA1: $(openssl x509 -in client.crt -fingerprint -sha1 -noout | cut -d= -f2 | tr -d :)" \ + https://paste.example.com/pastes +``` + +**Notes:** +- `common_name` is optional; a random UUID is generated if omitted +- The PKCS#12 bundle has no password (empty password) +- CA is auto-generated on first registration if not present +- Private key is generated server-side and included in response diff --git a/tests/test_pki.py b/tests/test_pki.py index bf896bc..0eb6493 100644 --- a/tests/test_pki.py +++ b/tests/test_pki.py @@ -1,6 +1,6 @@ """Tests for PKI (Certificate Authority) functionality.""" -from datetime import UTC +from datetime import UTC, datetime, timedelta import pytest @@ -369,3 +369,228 @@ class TestPKICryptoFunctions: assert len(fingerprint) == 40 assert all(c in "0123456789abcdef" for c in fingerprint) + + +class TestRegistration: + """Test public certificate registration via /register endpoint.""" + + def test_register_challenge_returns_token(self, client, app): + """Registration challenge endpoint returns PoW token.""" + with app.app_context(): + app.config["REGISTER_POW_DIFFICULTY"] = 10 + + response = client.get("/register/challenge") + assert response.status_code == 200 + data = response.get_json() + assert data["enabled"] is True + assert "nonce" in data + assert "token" in data + assert data["difficulty"] == 10 + assert data["purpose"] == "registration" + + def test_register_challenge_disabled(self, client, app): + """Registration challenge shows disabled when difficulty is 0.""" + with app.app_context(): + app.config["REGISTER_POW_DIFFICULTY"] = 0 + + response = client.get("/register/challenge") + assert response.status_code == 200 + data = response.get_json() + assert data["enabled"] is False + + def test_register_requires_pow(self, client, app): + """Registration fails without PoW when difficulty > 0.""" + with app.app_context(): + app.config["REGISTER_POW_DIFFICULTY"] = 10 + + response = client.post("/register", json={"common_name": "test"}) + assert response.status_code == 400 + assert "Proof-of-work required" in response.get_json()["error"] + + def test_register_with_pow_disabled_succeeds(self, client, app): + """Registration succeeds without PoW when difficulty is 0.""" + with app.app_context(): + app.config["REGISTER_POW_DIFFICULTY"] = 0 + + response = client.post("/register", json={"common_name": "test-client"}) + assert response.status_code == 200 + assert response.content_type == "application/x-pkcs12" + assert "X-Fingerprint-SHA1" in response.headers + assert len(response.headers["X-Fingerprint-SHA1"]) == 40 + + def test_register_auto_generates_ca(self, client, app): + """Registration auto-generates CA if not present.""" + with app.app_context(): + app.config["REGISTER_POW_DIFFICULTY"] = 0 + + # Verify no CA exists + from app.pki import get_ca_info + + assert get_ca_info() is None + + response = client.post("/register", json={"common_name": "first-client"}) + assert response.status_code == 200 + + # Verify CA now exists + with app.app_context(): + from app.pki import get_ca_info + + ca_info = get_ca_info() + assert ca_info is not None + assert ca_info["common_name"] == "FlaskPaste CA" + + def test_register_returns_pkcs12(self, client, app): + """Registration returns valid PKCS#12 bundle.""" + from cryptography.hazmat.primitives.serialization import pkcs12 + + with app.app_context(): + app.config["REGISTER_POW_DIFFICULTY"] = 0 + + response = client.post("/register", json={"common_name": "my-client"}) + assert response.status_code == 200 + + # Verify PKCS#12 can be loaded + p12_data = response.data + private_key, certificate, additional_certs = pkcs12.load_key_and_certificates( + p12_data, password=None + ) + + assert private_key is not None + assert certificate is not None + # Should include CA certificate + assert additional_certs is not None + assert len(additional_certs) == 1 + + def test_register_generates_common_name(self, client, app): + """Registration generates random CN if not provided.""" + with app.app_context(): + app.config["REGISTER_POW_DIFFICULTY"] = 0 + + response = client.post("/register") + assert response.status_code == 200 + + # CN is in the Content-Disposition header + disposition = response.headers["Content-Disposition"] + assert "client-" in disposition + assert ".p12" in disposition + + def test_register_respects_custom_common_name(self, client, app): + """Registration uses provided common name.""" + with app.app_context(): + app.config["REGISTER_POW_DIFFICULTY"] = 0 + + response = client.post("/register", json={"common_name": "custom-name"}) + assert response.status_code == 200 + + disposition = response.headers["Content-Disposition"] + assert "custom-name.p12" in disposition + + def test_register_without_pki_password_fails(self, client, app): + """Registration fails when PKI_CA_PASSWORD not configured.""" + with app.app_context(): + app.config["PKI_CA_PASSWORD"] = "" + app.config["REGISTER_POW_DIFFICULTY"] = 0 + + response = client.post("/register", json={"common_name": "test"}) + assert response.status_code == 503 + assert "not available" in response.get_json()["error"] + + +class TestPKCS12Creation: + """Test PKCS#12 bundle creation function.""" + + def test_create_pkcs12_without_password(self): + """PKCS#12 created without password can be loaded.""" + from cryptography import x509 + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.asymmetric import ec + from cryptography.hazmat.primitives.serialization import pkcs12 + from cryptography.x509.oid import NameOID + + from app.pki import create_pkcs12 + + # Generate test keys and certs + ca_key = ec.generate_private_key(ec.SECP384R1()) + client_key = ec.generate_private_key(ec.SECP384R1()) + + now = datetime.now(UTC) + ca_cert = ( + x509.CertificateBuilder() + .subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "Test CA")])) + .issuer_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "Test CA")])) + .public_key(ca_key.public_key()) + .serial_number(1) + .not_valid_before(now) + .not_valid_after(now + timedelta(days=365)) + .sign(ca_key, hashes.SHA256()) + ) + + client_cert = ( + x509.CertificateBuilder() + .subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "Client")])) + .issuer_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "Test CA")])) + .public_key(client_key.public_key()) + .serial_number(2) + .not_valid_before(now) + .not_valid_after(now + timedelta(days=30)) + .sign(ca_key, hashes.SHA256()) + ) + + # Create PKCS#12 + p12_data = create_pkcs12( + private_key=client_key, + certificate=client_cert, + ca_certificate=ca_cert, + friendly_name="test-client", + password=None, + ) + + # Load and verify + loaded_key, loaded_cert, loaded_cas = pkcs12.load_key_and_certificates( + p12_data, password=None + ) + + assert loaded_key is not None + assert loaded_cert is not None + assert loaded_cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value == "Client" + assert loaded_cas is not None + assert len(loaded_cas) == 1 + + def test_create_pkcs12_with_password(self): + """PKCS#12 created with password requires password to load.""" + from cryptography import x509 + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.asymmetric import ec + from cryptography.hazmat.primitives.serialization import pkcs12 + from cryptography.x509.oid import NameOID + + from app.pki import create_pkcs12 + + key = ec.generate_private_key(ec.SECP384R1()) + now = datetime.now(UTC) + cert = ( + x509.CertificateBuilder() + .subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "Test")])) + .issuer_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "Test")])) + .public_key(key.public_key()) + .serial_number(1) + .not_valid_before(now) + .not_valid_after(now + timedelta(days=1)) + .sign(key, hashes.SHA256()) + ) + + p12_data = create_pkcs12( + private_key=key, + certificate=cert, + ca_certificate=cert, + friendly_name="test", + password=b"secret123", + ) + + # Should fail without password + with pytest.raises(ValueError): + pkcs12.load_key_and_certificates(p12_data, password=None) + + # Should succeed with correct password + loaded_key, _, _ = pkcs12.load_key_and_certificates(p12_data, password=b"secret123") + assert loaded_key is not None