add /register endpoint for public certificate registration
Public endpoint allows anyone to obtain a client certificate for authentication. Features: - Higher PoW difficulty than paste creation (24 vs 20 bits) - Auto-generates CA on first registration if not present - Returns PKCS#12 bundle with cert, key, and CA - Configurable via FLASKPASTE_REGISTER_POW Endpoints: - GET /register/challenge - Get registration PoW challenge - POST /register - Register and receive PKCS#12 bundle
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
"""Tests for PKI (Certificate Authority) functionality."""
|
||||
|
||||
from datetime import UTC
|
||||
from datetime import UTC, datetime, timedelta
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -369,3 +369,228 @@ class TestPKICryptoFunctions:
|
||||
|
||||
assert len(fingerprint) == 40
|
||||
assert all(c in "0123456789abcdef" for c in fingerprint)
|
||||
|
||||
|
||||
class TestRegistration:
|
||||
"""Test public certificate registration via /register endpoint."""
|
||||
|
||||
def test_register_challenge_returns_token(self, client, app):
|
||||
"""Registration challenge endpoint returns PoW token."""
|
||||
with app.app_context():
|
||||
app.config["REGISTER_POW_DIFFICULTY"] = 10
|
||||
|
||||
response = client.get("/register/challenge")
|
||||
assert response.status_code == 200
|
||||
data = response.get_json()
|
||||
assert data["enabled"] is True
|
||||
assert "nonce" in data
|
||||
assert "token" in data
|
||||
assert data["difficulty"] == 10
|
||||
assert data["purpose"] == "registration"
|
||||
|
||||
def test_register_challenge_disabled(self, client, app):
|
||||
"""Registration challenge shows disabled when difficulty is 0."""
|
||||
with app.app_context():
|
||||
app.config["REGISTER_POW_DIFFICULTY"] = 0
|
||||
|
||||
response = client.get("/register/challenge")
|
||||
assert response.status_code == 200
|
||||
data = response.get_json()
|
||||
assert data["enabled"] is False
|
||||
|
||||
def test_register_requires_pow(self, client, app):
|
||||
"""Registration fails without PoW when difficulty > 0."""
|
||||
with app.app_context():
|
||||
app.config["REGISTER_POW_DIFFICULTY"] = 10
|
||||
|
||||
response = client.post("/register", json={"common_name": "test"})
|
||||
assert response.status_code == 400
|
||||
assert "Proof-of-work required" in response.get_json()["error"]
|
||||
|
||||
def test_register_with_pow_disabled_succeeds(self, client, app):
|
||||
"""Registration succeeds without PoW when difficulty is 0."""
|
||||
with app.app_context():
|
||||
app.config["REGISTER_POW_DIFFICULTY"] = 0
|
||||
|
||||
response = client.post("/register", json={"common_name": "test-client"})
|
||||
assert response.status_code == 200
|
||||
assert response.content_type == "application/x-pkcs12"
|
||||
assert "X-Fingerprint-SHA1" in response.headers
|
||||
assert len(response.headers["X-Fingerprint-SHA1"]) == 40
|
||||
|
||||
def test_register_auto_generates_ca(self, client, app):
|
||||
"""Registration auto-generates CA if not present."""
|
||||
with app.app_context():
|
||||
app.config["REGISTER_POW_DIFFICULTY"] = 0
|
||||
|
||||
# Verify no CA exists
|
||||
from app.pki import get_ca_info
|
||||
|
||||
assert get_ca_info() is None
|
||||
|
||||
response = client.post("/register", json={"common_name": "first-client"})
|
||||
assert response.status_code == 200
|
||||
|
||||
# Verify CA now exists
|
||||
with app.app_context():
|
||||
from app.pki import get_ca_info
|
||||
|
||||
ca_info = get_ca_info()
|
||||
assert ca_info is not None
|
||||
assert ca_info["common_name"] == "FlaskPaste CA"
|
||||
|
||||
def test_register_returns_pkcs12(self, client, app):
|
||||
"""Registration returns valid PKCS#12 bundle."""
|
||||
from cryptography.hazmat.primitives.serialization import pkcs12
|
||||
|
||||
with app.app_context():
|
||||
app.config["REGISTER_POW_DIFFICULTY"] = 0
|
||||
|
||||
response = client.post("/register", json={"common_name": "my-client"})
|
||||
assert response.status_code == 200
|
||||
|
||||
# Verify PKCS#12 can be loaded
|
||||
p12_data = response.data
|
||||
private_key, certificate, additional_certs = pkcs12.load_key_and_certificates(
|
||||
p12_data, password=None
|
||||
)
|
||||
|
||||
assert private_key is not None
|
||||
assert certificate is not None
|
||||
# Should include CA certificate
|
||||
assert additional_certs is not None
|
||||
assert len(additional_certs) == 1
|
||||
|
||||
def test_register_generates_common_name(self, client, app):
|
||||
"""Registration generates random CN if not provided."""
|
||||
with app.app_context():
|
||||
app.config["REGISTER_POW_DIFFICULTY"] = 0
|
||||
|
||||
response = client.post("/register")
|
||||
assert response.status_code == 200
|
||||
|
||||
# CN is in the Content-Disposition header
|
||||
disposition = response.headers["Content-Disposition"]
|
||||
assert "client-" in disposition
|
||||
assert ".p12" in disposition
|
||||
|
||||
def test_register_respects_custom_common_name(self, client, app):
|
||||
"""Registration uses provided common name."""
|
||||
with app.app_context():
|
||||
app.config["REGISTER_POW_DIFFICULTY"] = 0
|
||||
|
||||
response = client.post("/register", json={"common_name": "custom-name"})
|
||||
assert response.status_code == 200
|
||||
|
||||
disposition = response.headers["Content-Disposition"]
|
||||
assert "custom-name.p12" in disposition
|
||||
|
||||
def test_register_without_pki_password_fails(self, client, app):
|
||||
"""Registration fails when PKI_CA_PASSWORD not configured."""
|
||||
with app.app_context():
|
||||
app.config["PKI_CA_PASSWORD"] = ""
|
||||
app.config["REGISTER_POW_DIFFICULTY"] = 0
|
||||
|
||||
response = client.post("/register", json={"common_name": "test"})
|
||||
assert response.status_code == 503
|
||||
assert "not available" in response.get_json()["error"]
|
||||
|
||||
|
||||
class TestPKCS12Creation:
|
||||
"""Test PKCS#12 bundle creation function."""
|
||||
|
||||
def test_create_pkcs12_without_password(self):
|
||||
"""PKCS#12 created without password can be loaded."""
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from cryptography.hazmat.primitives.serialization import pkcs12
|
||||
from cryptography.x509.oid import NameOID
|
||||
|
||||
from app.pki import create_pkcs12
|
||||
|
||||
# Generate test keys and certs
|
||||
ca_key = ec.generate_private_key(ec.SECP384R1())
|
||||
client_key = ec.generate_private_key(ec.SECP384R1())
|
||||
|
||||
now = datetime.now(UTC)
|
||||
ca_cert = (
|
||||
x509.CertificateBuilder()
|
||||
.subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "Test CA")]))
|
||||
.issuer_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "Test CA")]))
|
||||
.public_key(ca_key.public_key())
|
||||
.serial_number(1)
|
||||
.not_valid_before(now)
|
||||
.not_valid_after(now + timedelta(days=365))
|
||||
.sign(ca_key, hashes.SHA256())
|
||||
)
|
||||
|
||||
client_cert = (
|
||||
x509.CertificateBuilder()
|
||||
.subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "Client")]))
|
||||
.issuer_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "Test CA")]))
|
||||
.public_key(client_key.public_key())
|
||||
.serial_number(2)
|
||||
.not_valid_before(now)
|
||||
.not_valid_after(now + timedelta(days=30))
|
||||
.sign(ca_key, hashes.SHA256())
|
||||
)
|
||||
|
||||
# Create PKCS#12
|
||||
p12_data = create_pkcs12(
|
||||
private_key=client_key,
|
||||
certificate=client_cert,
|
||||
ca_certificate=ca_cert,
|
||||
friendly_name="test-client",
|
||||
password=None,
|
||||
)
|
||||
|
||||
# Load and verify
|
||||
loaded_key, loaded_cert, loaded_cas = pkcs12.load_key_and_certificates(
|
||||
p12_data, password=None
|
||||
)
|
||||
|
||||
assert loaded_key is not None
|
||||
assert loaded_cert is not None
|
||||
assert loaded_cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value == "Client"
|
||||
assert loaded_cas is not None
|
||||
assert len(loaded_cas) == 1
|
||||
|
||||
def test_create_pkcs12_with_password(self):
|
||||
"""PKCS#12 created with password requires password to load."""
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from cryptography.hazmat.primitives.serialization import pkcs12
|
||||
from cryptography.x509.oid import NameOID
|
||||
|
||||
from app.pki import create_pkcs12
|
||||
|
||||
key = ec.generate_private_key(ec.SECP384R1())
|
||||
now = datetime.now(UTC)
|
||||
cert = (
|
||||
x509.CertificateBuilder()
|
||||
.subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "Test")]))
|
||||
.issuer_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "Test")]))
|
||||
.public_key(key.public_key())
|
||||
.serial_number(1)
|
||||
.not_valid_before(now)
|
||||
.not_valid_after(now + timedelta(days=1))
|
||||
.sign(key, hashes.SHA256())
|
||||
)
|
||||
|
||||
p12_data = create_pkcs12(
|
||||
private_key=key,
|
||||
certificate=cert,
|
||||
ca_certificate=cert,
|
||||
friendly_name="test",
|
||||
password=b"secret123",
|
||||
)
|
||||
|
||||
# Should fail without password
|
||||
with pytest.raises(ValueError):
|
||||
pkcs12.load_key_and_certificates(p12_data, password=None)
|
||||
|
||||
# Should succeed with correct password
|
||||
loaded_key, _, _ = pkcs12.load_key_and_certificates(p12_data, password=b"secret123")
|
||||
assert loaded_key is not None
|
||||
|
||||
Reference in New Issue
Block a user