pki: first registered user gets admin rights
All checks were successful
CI / Lint & Format (push) Successful in 17s
CI / Security Scan (push) Successful in 21s
CI / Tests (push) Successful in 1m2s

Auto-detect first certificate issuance and grant admin flag.
Add is_admin column to issued_certificates table.
Add is_admin_certificate() helper function.
Include is_admin in /pki/issue response and X-Is-Admin header in registration.
This commit is contained in:
Username
2025-12-21 21:13:30 +01:00
parent 99e6a019f4
commit 2acf640d91
4 changed files with 61 additions and 2 deletions

View File

@@ -1043,6 +1043,7 @@ class RegisterView(MethodView):
response.headers["Content-Disposition"] = f'attachment; filename="{common_name}.p12"'
response.headers["X-Fingerprint-SHA1"] = cert_info["fingerprint_sha1"]
response.headers["X-Certificate-Expires"] = str(cert_info["expires_at"])
response.headers["X-Is-Admin"] = "1" if cert_info.get("is_admin") else "0"
return response
@@ -1611,6 +1612,7 @@ class PKIIssueView(MethodView):
"expires_at": cert_info["expires_at"],
"certificate_pem": cert_info["certificate_pem"],
"private_key_pem": cert_info["private_key_pem"],
"is_admin": cert_info.get("is_admin", False),
},
201,
)

View File

@@ -60,6 +60,7 @@ CREATE TABLE IF NOT EXISTS issued_certificates (
issued_to TEXT,
status TEXT NOT NULL DEFAULT 'valid',
revoked_at INTEGER,
is_admin INTEGER NOT NULL DEFAULT 0,
FOREIGN KEY(ca_id) REFERENCES certificate_authority(id) ON DELETE CASCADE
);

View File

@@ -898,6 +898,7 @@ def issue_certificate(
password: str,
days: int = 365,
issued_to: str | None = None,
is_admin: bool | None = None,
) -> dict:
"""Issue a client certificate signed by the CA.
@@ -906,6 +907,7 @@ def issue_certificate(
password: CA password for signing
days: Validity period in days
issued_to: Optional fingerprint of issuing user
is_admin: Admin flag (None = auto-detect first user)
Returns:
Dict with certificate and private key PEM
@@ -918,6 +920,13 @@ def issue_certificate(
db = get_db()
# Auto-detect: first registered user becomes admin
if is_admin is None:
count = db.execute(
"SELECT COUNT(*) FROM issued_certificates WHERE status = 'valid'"
).fetchone()[0]
is_admin = count == 0
# Load CA
ca_row = db.execute(
"""SELECT certificate_pem, private_key_encrypted, key_salt
@@ -1001,8 +1010,8 @@ def issue_certificate(
db.execute(
"""INSERT INTO issued_certificates
(serial, ca_id, common_name, fingerprint_sha1, certificate_pem,
created_at, expires_at, issued_to, status, revoked_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
created_at, expires_at, issued_to, status, revoked_at, is_admin)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
serial_hex,
"default",
@@ -1014,6 +1023,7 @@ def issue_certificate(
issued_to,
"valid",
None,
1 if is_admin else 0,
),
)
db.commit()
@@ -1026,9 +1036,31 @@ def issue_certificate(
"private_key_pem": key_pem,
"created_at": created_at,
"expires_at": expires_at,
"is_admin": is_admin,
}
def is_admin_certificate(fingerprint: str) -> bool:
"""Check if a certificate fingerprint belongs to an admin.
Args:
fingerprint: SHA1 fingerprint of the certificate
Returns:
True if the certificate holder is an admin
"""
from app.database import get_db
db = get_db()
row = db.execute(
"""SELECT is_admin FROM issued_certificates
WHERE fingerprint_sha1 = ? AND status = 'valid'""",
(fingerprint,),
).fetchone()
return bool(row and row["is_admin"])
def revoke_certificate(serial: str) -> bool:
"""Revoke a certificate by serial number.