diff --git a/app/api/routes.py b/app/api/routes.py index 7e3499d..8e5ba95 100644 --- a/app/api/routes.py +++ b/app/api/routes.py @@ -863,6 +863,7 @@ class IndexView(MethodView): f"DELETE {prefixed_url('/')}": "Delete paste (owner only)", f"GET {prefixed_url('/register/challenge')}": "Get registration challenge", f"POST {prefixed_url('/register')}": "Register for client certificate", + f"POST {prefixed_url('/csr')}": "Sign CSR (bring your own key)", } if pki_enabled: @@ -1414,6 +1415,121 @@ class RegisterView(MethodView): return response +class CSRView(MethodView): + """CSR signing endpoint for client-generated keys.""" + + def post(self) -> Response: + """Sign a Certificate Signing Request. + + Accepts a PEM-encoded CSR and returns a signed certificate. + Client keeps their private key - only the CSR is submitted. + + Requires PoW to prevent abuse. + + Request body: PEM-encoded CSR (Content-Type: application/x-pem-file or text/plain) + + Returns: + Signed certificate PEM with headers for fingerprint and expiry + """ + from app.pki import ( + CANotFoundError, + PKIError, + generate_ca, + get_ca_info, + sign_csr, + ) + + # Check PKI configuration + password = current_app.config.get("PKI_CA_PASSWORD", "") + if not password: + return error_response( + "CSR signing not available", + 503, + hint="PKI_CA_PASSWORD not configured", + ) + + # Verify PoW (same difficulty as registration) + register_difficulty = current_app.config.get("REGISTER_POW_DIFFICULTY", 24) + if register_difficulty > 0: + token = request.headers.get("X-PoW-Token", "") + solution = request.headers.get("X-PoW-Solution", "") + + if not token or not solution: + return error_response( + "Proof-of-work required", + 400, + hint="GET /register/challenge for a challenge", + difficulty=register_difficulty, + ) + + valid, err = verify_pow(token, solution, min_difficulty=register_difficulty) + if not valid: + current_app.logger.warning("CSR PoW failed: %s from=%s", err, request.remote_addr) + return error_response(f"Proof-of-work failed: {err}", 400) + + # Get CSR from request body + csr_pem = request.get_data(as_text=True) + if not csr_pem or "BEGIN CERTIFICATE REQUEST" not in csr_pem: + return error_response( + "CSR required", + 400, + hint="Submit PEM-encoded CSR in request body", + ) + + # Auto-generate CA if needed + ca_info = get_ca_info(skip_enabled_check=True) + if ca_info is None: + ca_days = current_app.config.get("PKI_CA_DAYS", 3650) + try: + ca_info = generate_ca("FlaskPaste CA", password, days=ca_days) + current_app.logger.info( + "CA auto-generated for CSR signing: fingerprint=%s", + ca_info["fingerprint_sha1"][:12], + ) + except PKIError as e: + current_app.logger.error("CA auto-generation failed: %s", e) + return error_response("CA generation failed", 500) + + # Sign CSR + try: + cert_days = current_app.config.get("PKI_CERT_DAYS", 365) + cert_info = sign_csr(csr_pem, password, days=cert_days) + except CANotFoundError: + return error_response("CA not available", 500) + except PKIError as e: + current_app.logger.warning("CSR signing failed: %s from=%s", e, request.remote_addr) + return error_response(f"CSR signing failed: {e}", 400) + + current_app.logger.info( + "CSR signed: cn=%s fingerprint=%s from=%s", + cert_info["common_name"], + cert_info["fingerprint_sha1"][:12], + request.remote_addr, + ) + log_event( + AuditEvent.CERT_ISSUED, + AuditOutcome.SUCCESS, + client_id=cert_info["fingerprint_sha1"], + client_ip=request.remote_addr, + details={ + "type": "csr", + "common_name": cert_info["common_name"], + "expires_at": cert_info["expires_at"], + }, + ) + + # Return certificate as PEM + response = Response(cert_info["certificate_pem"], mimetype="application/x-pem-file") + response.headers["Content-Disposition"] = ( + f'attachment; filename="{cert_info["common_name"]}.crt"' + ) + response.headers["X-Fingerprint-SHA1"] = cert_info["fingerprint_sha1"] + response.headers["X-Certificate-Expires"] = str(cert_info["expires_at"]) + response.headers["X-Certificate-Serial"] = cert_info["serial"] + response.headers["X-Is-Admin"] = "1" if cert_info.get("is_admin") else "0" + return response + + class ClientView(MethodView): """CLI client download endpoint.""" @@ -2276,6 +2392,7 @@ bp.add_url_rule( "/register/challenge", view_func=RegisterChallengeView.as_view("register_challenge") ) bp.add_url_rule("/register", view_func=RegisterView.as_view("register")) +bp.add_url_rule("/csr", view_func=CSRView.as_view("csr")) # Paste operations bp.add_url_rule("/pastes", view_func=PastesListView.as_view("pastes_list")) diff --git a/app/pki.py b/app/pki.py index d28d3ce..16ef095 100644 --- a/app/pki.py +++ b/app/pki.py @@ -1111,6 +1111,157 @@ def issue_certificate( } +def sign_csr( + csr_pem: str, + password: str, + days: int = 365, + issued_to: str | None = None, + is_admin: bool | None = None, +) -> dict: + """Sign a Certificate Signing Request with the CA. + + Unlike issue_certificate(), this function does not generate a private key. + The client keeps their private key and only submits the CSR. + + Args: + csr_pem: PEM-encoded CSR string + password: CA password for signing + days: Validity period in days + issued_to: Optional fingerprint of issuing user + is_admin: Admin flag (None = auto-detect first user) + + Returns: + Dict with signed certificate info (no private key) + + Raises: + CANotFoundError: If no CA exists + PKIError: If CSR is invalid or signing fails + """ + _require_crypto() + from app.database import get_db + + db = get_db() + + # Auto-detect: first registered user becomes admin + if is_admin is None: + count = db.execute( + "SELECT COUNT(*) FROM issued_certificates WHERE status = 'valid'" + ).fetchone()[0] + is_admin = count == 0 + + # Load and validate CSR + try: + csr = x509.load_pem_x509_csr(csr_pem.encode("utf-8")) + except Exception as e: + raise PKIError(f"Invalid CSR format: {e}") from None + + # Verify CSR signature (proves client has the private key) + if not csr.is_signature_valid: + raise PKIError("CSR signature is invalid") + + # Extract common name from CSR + try: + common_name = csr.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value + except (IndexError, AttributeError): + raise PKIError("CSR must contain a Common Name (CN)") from None + + # Load CA + ca_row = db.execute( + """SELECT certificate_pem, private_key_encrypted, key_salt + FROM certificate_authority WHERE id = 'default'""" + ).fetchone() + + if ca_row is None: + raise CANotFoundError("No CA configured") + + # Decrypt CA private key + ca_key = decrypt_private_key( + ca_row["private_key_encrypted"], + ca_row["key_salt"], + password, + ) + ca_cert = x509.load_pem_x509_certificate(ca_row["certificate_pem"].encode("utf-8")) + + # Build certificate from CSR + now = datetime.now(UTC) + serial = _generate_unique_serial(db) + + cert_builder = ( + x509.CertificateBuilder() + .subject_name(csr.subject) + .issuer_name(ca_cert.subject) + .public_key(csr.public_key()) + .serial_number(serial) + .not_valid_before(now) + .not_valid_after(now + timedelta(days=days)) + .add_extension( + x509.BasicConstraints(ca=False, path_length=None), + critical=True, + ) + .add_extension( + x509.KeyUsage( + digital_signature=True, + key_encipherment=True, + content_commitment=False, + data_encipherment=False, + key_agreement=False, + key_cert_sign=False, + crl_sign=False, + encipher_only=False, + decipher_only=False, + ), + critical=True, + ) + .add_extension( + x509.ExtendedKeyUsage([ExtendedKeyUsageOID.CLIENT_AUTH]), + critical=False, + ) + ) + + certificate = cert_builder.sign(ca_key, hashes.SHA256()) + + # Serialize + cert_pem = certificate.public_bytes(serialization.Encoding.PEM).decode("utf-8") + + # Calculate fingerprint + fingerprint = calculate_fingerprint(certificate) + serial_hex = format(serial, "032x") + created_at = int(now.timestamp()) + expires_at = int((now + timedelta(days=days)).timestamp()) + + # Save to database + db.execute( + """INSERT INTO issued_certificates + (serial, ca_id, common_name, fingerprint_sha1, certificate_pem, + created_at, expires_at, issued_to, status, revoked_at, is_admin) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", + ( + serial_hex, + "default", + common_name, + fingerprint, + cert_pem, + created_at, + expires_at, + issued_to, + "valid", + None, + 1 if is_admin else 0, + ), + ) + db.commit() + + return { + "serial": serial_hex, + "common_name": common_name, + "fingerprint_sha1": fingerprint, + "certificate_pem": cert_pem, + "created_at": created_at, + "expires_at": expires_at, + "is_admin": is_admin, + } + + def is_admin_certificate(fingerprint: str) -> bool: """Check if a certificate fingerprint belongs to an admin. diff --git a/documentation/api.md b/documentation/api.md index b97f066..aa4a0b1 100644 --- a/documentation/api.md +++ b/documentation/api.md @@ -1229,6 +1229,122 @@ curl -H "X-SSL-Client-SHA1: $(openssl x509 -in client.crt -fingerprint -sha1 -no --- +### POST /csr + +Sign a Certificate Signing Request (CSR). Unlike `/register`, this endpoint allows clients to keep their private keys local. + +**How it works:** +1. Client generates a key pair locally +2. Client creates a CSR with their public key +3. Client submits CSR to server (with PoW if enabled) +4. Server signs CSR and returns the certificate +5. Client combines their private key with the signed certificate + +**Request (with PoW):** +```http +POST /csr HTTP/1.1 +Host: localhost:5000 +Content-Type: application/x-pem-file +X-PoW-Token: a1b2c3d4...:1700000300:24:signature +X-PoW-Solution: 12345678 + +-----BEGIN CERTIFICATE REQUEST----- +MIIBhDCB7gIBADBFMQswCQYDVQQGEwJVUzELMAkGA1UECAwCQ0ExEjAQBgNVBAcM +... +-----END CERTIFICATE REQUEST----- +``` + +**Request (PoW disabled):** +```http +POST /csr HTTP/1.1 +Host: localhost:5000 +Content-Type: application/x-pem-file + +-----BEGIN CERTIFICATE REQUEST----- +... +-----END CERTIFICATE REQUEST----- +``` + +**Response (200 OK):** +```http +HTTP/1.1 200 OK +Content-Type: application/x-pem-file +Content-Disposition: attachment; filename="alice.crt" +X-Fingerprint-SHA1: b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3 +X-Certificate-Expires: 1731533400 +X-Certificate-Serial: 00000000000000000000000000000002 +X-Is-Admin: 0 + +-----BEGIN CERTIFICATE----- +MIICxDCCAaygAwIBAgIUY... +-----END CERTIFICATE----- +``` + +**Response Headers:** +| Header | Description | +|--------|-------------| +| `X-Fingerprint-SHA1` | SHA1 fingerprint for `X-SSL-Client-SHA1` header | +| `X-Certificate-Expires` | Unix timestamp when certificate expires | +| `X-Certificate-Serial` | Certificate serial number | +| `X-Is-Admin` | `1` if first user (admin), `0` otherwise | + +**Errors:** +| Code | Description | +|------|-------------| +| 400 | CSR required (empty body) | +| 400 | Invalid CSR format | +| 400 | CSR must contain a Common Name (CN) | +| 400 | CSR signature is invalid | +| 400 | Proof-of-work required (when enabled) | +| 400 | Proof-of-work failed (invalid/expired challenge) | +| 503 | PKI_CA_PASSWORD not configured | + +**Generating a CSR:** + +```bash +# Generate key pair and CSR with OpenSSL +openssl ecparam -genkey -name secp384r1 -out client.key +openssl req -new -key client.key -out client.csr -subj "/CN=alice" + +# Submit CSR to server +curl -X POST --data-binary @client.csr \ + -H "Content-Type: application/x-pem-file" \ + https://paste.example.com/csr > client.crt + +# Verify the certificate +openssl x509 -in client.crt -noout -subject -issuer +``` + +**Creating a PKCS#12 bundle:** + +```bash +# Combine key and certificate for browser/curl use +openssl pkcs12 -export -out client.p12 \ + -inkey client.key -in client.crt \ + -certfile ca.crt -passout pass: + +# Use with curl +curl --cert client.crt --key client.key https://paste.example.com/ +``` + +**Comparison with /register:** + +| Feature | /register | /csr | +|---------|-----------|------| +| Private key | Generated server-side | Generated client-side | +| Response format | PKCS#12 bundle | PEM certificate only | +| Key security | Transmitted over network | Never leaves client | +| Complexity | Simple (one step) | More steps required | + +**Notes:** +- Uses same PoW difficulty as `/register` (`FLASKPASTE_REGISTER_POW`) +- CA is auto-generated on first CSR if not present +- First user (via `/register` or `/csr`) becomes admin +- CSR must be signed (server validates signature) +- Common Name (CN) in CSR becomes certificate subject + +--- + ## Audit Logging FlaskPaste logs PKI certificate lifecycle events for compliance and forensics. diff --git a/tests/test_pki.py b/tests/test_pki.py index 807c9ff..514bcf0 100644 --- a/tests/test_pki.py +++ b/tests/test_pki.py @@ -621,6 +621,184 @@ class TestPKCS12Creation: assert loaded_key is not None +class TestCSRSigning: + """Test POST /csr endpoint for signing Certificate Signing Requests.""" + + def _generate_csr(self, common_name: str = "test-client") -> str: + """Generate a test CSR and return PEM string.""" + from cryptography import x509 + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.asymmetric import ec + from cryptography.hazmat.primitives.serialization import Encoding + from cryptography.x509.oid import NameOID + + key = ec.generate_private_key(ec.SECP384R1()) + csr = ( + x509.CertificateSigningRequestBuilder() + .subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, common_name)])) + .sign(key, hashes.SHA256()) + ) + return csr.public_bytes(Encoding.PEM).decode("utf-8") + + def test_csr_requires_pow(self, client, app): + """CSR signing fails without PoW when difficulty > 0.""" + with app.app_context(): + app.config["REGISTER_POW_DIFFICULTY"] = 10 + + csr_pem = self._generate_csr() + response = client.post("/csr", data=csr_pem, content_type="application/x-pem-file") + assert response.status_code == 400 + assert "Proof-of-work required" in response.get_json()["error"] + + def test_csr_with_pow_disabled_succeeds(self, client, app): + """CSR signing succeeds without PoW when difficulty is 0.""" + with app.app_context(): + app.config["REGISTER_POW_DIFFICULTY"] = 0 + + csr_pem = self._generate_csr("my-client") + response = client.post("/csr", data=csr_pem, content_type="application/x-pem-file") + assert response.status_code == 200 + assert response.content_type == "application/x-pem-file" + assert b"-----BEGIN CERTIFICATE-----" in response.data + assert b"-----END CERTIFICATE-----" in response.data + assert "X-Fingerprint-SHA1" in response.headers + assert len(response.headers["X-Fingerprint-SHA1"]) == 40 + + def test_csr_auto_generates_ca(self, client, app): + """CSR signing auto-generates CA if not present.""" + with app.app_context(): + app.config["REGISTER_POW_DIFFICULTY"] = 0 + + # Verify no CA exists + from app.pki import get_ca_info + + assert get_ca_info() is None + + csr_pem = self._generate_csr() + response = client.post("/csr", data=csr_pem, content_type="application/x-pem-file") + assert response.status_code == 200 + + # Verify CA now exists + with app.app_context(): + from app.pki import get_ca_info + + ca_info = get_ca_info() + assert ca_info is not None + assert ca_info["common_name"] == "FlaskPaste CA" + + def test_csr_returns_valid_certificate(self, client, app): + """CSR signing returns certificate signed by CA.""" + from cryptography import x509 + from cryptography.x509.oid import NameOID + + with app.app_context(): + app.config["REGISTER_POW_DIFFICULTY"] = 0 + + csr_pem = self._generate_csr("verified-client") + response = client.post("/csr", data=csr_pem, content_type="application/x-pem-file") + assert response.status_code == 200 + + # Parse and verify certificate + cert = x509.load_pem_x509_certificate(response.data) + cn = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value + assert cn == "verified-client" + + # Verify response headers match certificate + from app.pki import calculate_fingerprint + + fingerprint = calculate_fingerprint(cert) + assert response.headers["X-Fingerprint-SHA1"] == fingerprint + + def test_csr_without_body_fails(self, client, app): + """CSR signing fails without CSR in body.""" + with app.app_context(): + app.config["REGISTER_POW_DIFFICULTY"] = 0 + + response = client.post("/csr", data="", content_type="application/x-pem-file") + assert response.status_code == 400 + assert "CSR required" in response.get_json()["error"] + + def test_csr_with_invalid_pem_fails(self, client, app): + """CSR signing fails with invalid PEM.""" + with app.app_context(): + app.config["REGISTER_POW_DIFFICULTY"] = 0 + + response = client.post( + "/csr", + data="-----BEGIN CERTIFICATE REQUEST-----\ninvalid\n-----END CERTIFICATE REQUEST-----", + content_type="application/x-pem-file", + ) + assert response.status_code == 400 + assert "Invalid CSR format" in response.get_json()["error"] + + def test_csr_without_common_name_fails(self, client, app): + """CSR signing fails when CSR lacks Common Name.""" + from cryptography import x509 + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.asymmetric import ec + from cryptography.hazmat.primitives.serialization import Encoding + + with app.app_context(): + app.config["REGISTER_POW_DIFFICULTY"] = 0 + + # Generate CSR without CN + key = ec.generate_private_key(ec.SECP384R1()) + csr = ( + x509.CertificateSigningRequestBuilder() + .subject_name(x509.Name([])) # Empty subject + .sign(key, hashes.SHA256()) + ) + csr_pem = csr.public_bytes(Encoding.PEM).decode("utf-8") + + response = client.post("/csr", data=csr_pem, content_type="application/x-pem-file") + assert response.status_code == 400 + assert "Common Name" in response.get_json()["error"] + + def test_csr_without_pki_password_fails(self, client, app): + """CSR signing fails when PKI_CA_PASSWORD not configured.""" + with app.app_context(): + app.config["PKI_CA_PASSWORD"] = "" + app.config["REGISTER_POW_DIFFICULTY"] = 0 + + csr_pem = self._generate_csr() + response = client.post("/csr", data=csr_pem, content_type="application/x-pem-file") + assert response.status_code == 503 + assert "not available" in response.get_json()["error"] + + def test_csr_first_user_is_admin(self, client, app): + """First CSR-signed certificate gets admin rights.""" + with app.app_context(): + app.config["REGISTER_POW_DIFFICULTY"] = 0 + + csr_pem = self._generate_csr("admin-client") + response = client.post("/csr", data=csr_pem, content_type="application/x-pem-file") + assert response.status_code == 200 + assert response.headers["X-Is-Admin"] == "1" + + # Second CSR is not admin + csr_pem2 = self._generate_csr("user-client") + response2 = client.post("/csr", data=csr_pem2, content_type="application/x-pem-file") + assert response2.status_code == 200 + assert response2.headers["X-Is-Admin"] == "0" + + def test_csr_response_headers(self, client, app): + """CSR signing returns appropriate headers.""" + with app.app_context(): + app.config["REGISTER_POW_DIFFICULTY"] = 0 + + csr_pem = self._generate_csr("header-test") + response = client.post("/csr", data=csr_pem, content_type="application/x-pem-file") + assert response.status_code == 200 + + # Check all expected headers + assert "X-Fingerprint-SHA1" in response.headers + assert "X-Certificate-Expires" in response.headers + assert "X-Certificate-Serial" in response.headers + assert "X-Is-Admin" in response.headers + assert "Content-Disposition" in response.headers + assert "header-test.crt" in response.headers["Content-Disposition"] + + class TestAdminPrivileges: """Test admin privileges for list/delete operations."""