From 2acf640d91756f9562dc8ba44dd8e111431e9853 Mon Sep 17 00:00:00 2001 From: Username Date: Sun, 21 Dec 2025 21:13:30 +0100 Subject: [PATCH] pki: first registered user gets admin rights Auto-detect first certificate issuance and grant admin flag. Add is_admin column to issued_certificates table. Add is_admin_certificate() helper function. Include is_admin in /pki/issue response and X-Is-Admin header in registration. --- app/api/routes.py | 2 ++ app/database.py | 1 + app/pki.py | 36 ++++++++++++++++++++++++++++++++++-- tests/test_pki.py | 24 ++++++++++++++++++++++++ 4 files changed, 61 insertions(+), 2 deletions(-) diff --git a/app/api/routes.py b/app/api/routes.py index 5e7cb4a..684141c 100644 --- a/app/api/routes.py +++ b/app/api/routes.py @@ -1043,6 +1043,7 @@ class RegisterView(MethodView): response.headers["Content-Disposition"] = f'attachment; filename="{common_name}.p12"' response.headers["X-Fingerprint-SHA1"] = cert_info["fingerprint_sha1"] response.headers["X-Certificate-Expires"] = str(cert_info["expires_at"]) + response.headers["X-Is-Admin"] = "1" if cert_info.get("is_admin") else "0" return response @@ -1611,6 +1612,7 @@ class PKIIssueView(MethodView): "expires_at": cert_info["expires_at"], "certificate_pem": cert_info["certificate_pem"], "private_key_pem": cert_info["private_key_pem"], + "is_admin": cert_info.get("is_admin", False), }, 201, ) diff --git a/app/database.py b/app/database.py index 0a33223..ec8b0c6 100644 --- a/app/database.py +++ b/app/database.py @@ -60,6 +60,7 @@ CREATE TABLE IF NOT EXISTS issued_certificates ( issued_to TEXT, status TEXT NOT NULL DEFAULT 'valid', revoked_at INTEGER, + is_admin INTEGER NOT NULL DEFAULT 0, FOREIGN KEY(ca_id) REFERENCES certificate_authority(id) ON DELETE CASCADE ); diff --git a/app/pki.py b/app/pki.py index 20b0571..3330397 100644 --- a/app/pki.py +++ b/app/pki.py @@ -898,6 +898,7 @@ def issue_certificate( password: str, days: int = 365, issued_to: str | None = None, + is_admin: bool | None = None, ) -> dict: """Issue a client certificate signed by the CA. @@ -906,6 +907,7 @@ def issue_certificate( password: CA password for signing days: Validity period in days issued_to: Optional fingerprint of issuing user + is_admin: Admin flag (None = auto-detect first user) Returns: Dict with certificate and private key PEM @@ -918,6 +920,13 @@ def issue_certificate( db = get_db() + # Auto-detect: first registered user becomes admin + if is_admin is None: + count = db.execute( + "SELECT COUNT(*) FROM issued_certificates WHERE status = 'valid'" + ).fetchone()[0] + is_admin = count == 0 + # Load CA ca_row = db.execute( """SELECT certificate_pem, private_key_encrypted, key_salt @@ -1001,8 +1010,8 @@ def issue_certificate( db.execute( """INSERT INTO issued_certificates (serial, ca_id, common_name, fingerprint_sha1, certificate_pem, - created_at, expires_at, issued_to, status, revoked_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", + created_at, expires_at, issued_to, status, revoked_at, is_admin) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( serial_hex, "default", @@ -1014,6 +1023,7 @@ def issue_certificate( issued_to, "valid", None, + 1 if is_admin else 0, ), ) db.commit() @@ -1026,9 +1036,31 @@ def issue_certificate( "private_key_pem": key_pem, "created_at": created_at, "expires_at": expires_at, + "is_admin": is_admin, } +def is_admin_certificate(fingerprint: str) -> bool: + """Check if a certificate fingerprint belongs to an admin. + + Args: + fingerprint: SHA1 fingerprint of the certificate + + Returns: + True if the certificate holder is an admin + """ + from app.database import get_db + + db = get_db() + row = db.execute( + """SELECT is_admin FROM issued_certificates + WHERE fingerprint_sha1 = ? AND status = 'valid'""", + (fingerprint,), + ).fetchone() + + return bool(row and row["is_admin"]) + + def revoke_certificate(serial: str) -> bool: """Revoke a certificate by serial number. diff --git a/tests/test_pki.py b/tests/test_pki.py index 20be0b7..67329bf 100644 --- a/tests/test_pki.py +++ b/tests/test_pki.py @@ -157,6 +157,30 @@ class TestCertificateIssuance: assert data1["serial"] != data2["serial"] assert data1["fingerprint_sha1"] != data2["fingerprint_sha1"] + def test_first_user_is_admin(self, app, client): + """First issued certificate gets admin rights.""" + from app.pki import is_admin_certificate + + client.post("/pki/ca") + + # First user becomes admin + response1 = client.post("/pki/issue", json={"common_name": "admin"}) + assert response1.status_code == 201 + data1 = response1.get_json() + assert data1.get("is_admin") is True + + with app.app_context(): + assert is_admin_certificate(data1["fingerprint_sha1"]) is True + + # Second user is not admin + response2 = client.post("/pki/issue", json={"common_name": "user"}) + assert response2.status_code == 201 + data2 = response2.get_json() + assert data2.get("is_admin") is False + + with app.app_context(): + assert is_admin_certificate(data2["fingerprint_sha1"]) is False + class TestCertificateListing: """Test GET /pki/certs endpoint."""