add /csr endpoint for CSR signing
Some checks failed
CI / Lint & Format (push) Failing after 16s
CI / Unit Tests (push) Has been skipped
CI / Memory Leak Check (push) Has been skipped
CI / SBOM Generation (push) Has been skipped
CI / Security Scan (push) Successful in 20s
CI / Security Tests (push) Has been skipped
CI / Advanced Security Tests (push) Has been skipped

Allow clients to submit Certificate Signing Requests instead of
having the server generate private keys. Client keeps key local.

- sign_csr() in pki.py validates and signs CSRs
- POST /csr endpoint with PoW protection
- 10 new tests for CSR functionality
- API documentation updated
This commit is contained in:
Username
2025-12-26 21:44:29 +01:00
parent 6da80aec76
commit 30bd02663b
4 changed files with 562 additions and 0 deletions

View File

@@ -1111,6 +1111,157 @@ def issue_certificate(
}
def sign_csr(
csr_pem: str,
password: str,
days: int = 365,
issued_to: str | None = None,
is_admin: bool | None = None,
) -> dict:
"""Sign a Certificate Signing Request with the CA.
Unlike issue_certificate(), this function does not generate a private key.
The client keeps their private key and only submits the CSR.
Args:
csr_pem: PEM-encoded CSR string
password: CA password for signing
days: Validity period in days
issued_to: Optional fingerprint of issuing user
is_admin: Admin flag (None = auto-detect first user)
Returns:
Dict with signed certificate info (no private key)
Raises:
CANotFoundError: If no CA exists
PKIError: If CSR is invalid or signing fails
"""
_require_crypto()
from app.database import get_db
db = get_db()
# Auto-detect: first registered user becomes admin
if is_admin is None:
count = db.execute(
"SELECT COUNT(*) FROM issued_certificates WHERE status = 'valid'"
).fetchone()[0]
is_admin = count == 0
# Load and validate CSR
try:
csr = x509.load_pem_x509_csr(csr_pem.encode("utf-8"))
except Exception as e:
raise PKIError(f"Invalid CSR format: {e}") from None
# Verify CSR signature (proves client has the private key)
if not csr.is_signature_valid:
raise PKIError("CSR signature is invalid")
# Extract common name from CSR
try:
common_name = csr.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
except (IndexError, AttributeError):
raise PKIError("CSR must contain a Common Name (CN)") from None
# Load CA
ca_row = db.execute(
"""SELECT certificate_pem, private_key_encrypted, key_salt
FROM certificate_authority WHERE id = 'default'"""
).fetchone()
if ca_row is None:
raise CANotFoundError("No CA configured")
# Decrypt CA private key
ca_key = decrypt_private_key(
ca_row["private_key_encrypted"],
ca_row["key_salt"],
password,
)
ca_cert = x509.load_pem_x509_certificate(ca_row["certificate_pem"].encode("utf-8"))
# Build certificate from CSR
now = datetime.now(UTC)
serial = _generate_unique_serial(db)
cert_builder = (
x509.CertificateBuilder()
.subject_name(csr.subject)
.issuer_name(ca_cert.subject)
.public_key(csr.public_key())
.serial_number(serial)
.not_valid_before(now)
.not_valid_after(now + timedelta(days=days))
.add_extension(
x509.BasicConstraints(ca=False, path_length=None),
critical=True,
)
.add_extension(
x509.KeyUsage(
digital_signature=True,
key_encipherment=True,
content_commitment=False,
data_encipherment=False,
key_agreement=False,
key_cert_sign=False,
crl_sign=False,
encipher_only=False,
decipher_only=False,
),
critical=True,
)
.add_extension(
x509.ExtendedKeyUsage([ExtendedKeyUsageOID.CLIENT_AUTH]),
critical=False,
)
)
certificate = cert_builder.sign(ca_key, hashes.SHA256())
# Serialize
cert_pem = certificate.public_bytes(serialization.Encoding.PEM).decode("utf-8")
# Calculate fingerprint
fingerprint = calculate_fingerprint(certificate)
serial_hex = format(serial, "032x")
created_at = int(now.timestamp())
expires_at = int((now + timedelta(days=days)).timestamp())
# Save to database
db.execute(
"""INSERT INTO issued_certificates
(serial, ca_id, common_name, fingerprint_sha1, certificate_pem,
created_at, expires_at, issued_to, status, revoked_at, is_admin)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
serial_hex,
"default",
common_name,
fingerprint,
cert_pem,
created_at,
expires_at,
issued_to,
"valid",
None,
1 if is_admin else 0,
),
)
db.commit()
return {
"serial": serial_hex,
"common_name": common_name,
"fingerprint_sha1": fingerprint,
"certificate_pem": cert_pem,
"created_at": created_at,
"expires_at": expires_at,
"is_admin": is_admin,
}
def is_admin_certificate(fingerprint: str) -> bool:
"""Check if a certificate fingerprint belongs to an admin.