diff --git a/app/api/routes.py b/app/api/routes.py index 684141c..8e9e934 100644 --- a/app/api/routes.py +++ b/app/api/routes.py @@ -376,6 +376,25 @@ def get_client_id() -> str | None: return fingerprint +def is_admin() -> bool: + """Check if current authenticated user is an admin. + + Returns True only if: + - User has a valid client certificate + - Certificate is marked as admin in the PKI database + """ + client_id = get_client_id() + if not client_id: + return False + + if not current_app.config.get("PKI_ENABLED"): + return False + + from app.pki import is_admin_certificate + + return is_admin_certificate(client_id) + + # ───────────────────────────────────────────────────────────────────────────── # Proof-of-Work # ───────────────────────────────────────────────────────────────────────────── @@ -1286,7 +1305,7 @@ class PasteDeleteView(MethodView): """Paste deletion with authentication.""" def delete(self, paste_id: str) -> Response: - """Delete paste. Requires ownership.""" + """Delete paste. Requires ownership or admin rights.""" # Validate if err := validate_paste_id(paste_id): return err @@ -1300,7 +1319,8 @@ class PasteDeleteView(MethodView): if row is None: return error_response("Paste not found", 404) - if row["owner"] != g.client_id: + # Allow if owner or admin + if row["owner"] != g.client_id and not is_admin(): return error_response("Permission denied", 403) db.execute("DELETE FROM pastes WHERE id = ?", (paste_id,)) @@ -1310,15 +1330,15 @@ class PasteDeleteView(MethodView): class PastesListView(MethodView): - """List authenticated user's pastes (privacy-focused).""" + """List pastes with authentication.""" def get(self) -> Response: - """List pastes owned by authenticated user. + """List pastes owned by authenticated user, or all pastes for admins. Privacy guarantees: - Requires authentication (mTLS client certificate) - - Users can ONLY see their own pastes - - No admin bypass or cross-user visibility + - Regular users can ONLY see their own pastes + - Admins can see all pastes (with optional owner filter) - Content is never returned, only metadata Query parameters: @@ -1327,6 +1347,8 @@ class PastesListView(MethodView): - type: filter by MIME type (glob pattern, e.g., "image/*") - after: filter by created_at >= timestamp - before: filter by created_at <= timestamp + - all: (admin only) if "1", list all pastes instead of own + - owner: (admin only) filter by owner fingerprint """ import fnmatch @@ -1335,6 +1357,7 @@ class PastesListView(MethodView): return err client_id = g.client_id + user_is_admin = is_admin() # Parse pagination parameters try: @@ -1354,11 +1377,27 @@ class PastesListView(MethodView): except (ValueError, TypeError): before_ts = 0 + # Admin-only parameters + show_all = request.args.get("all", "0") == "1" and user_is_admin + owner_filter = request.args.get("owner", "").strip() if user_is_admin else "" + db = get_db() # Build query with filters - where_clauses = ["owner = ?"] - params: list[Any] = [client_id] + where_clauses: list[str] = [] + params: list[Any] = [] + + # Owner filtering logic + if show_all: + # Admin viewing all pastes (with optional owner filter) + if owner_filter: + where_clauses.append("owner = ?") + params.append(owner_filter) + # else: no owner filter, show all + else: + # Regular user or admin without ?all=1: show only own pastes + where_clauses.append("owner = ?") + params.append(client_id) if after_ts > 0: where_clauses.append("created_at >= ?") @@ -1367,7 +1406,8 @@ class PastesListView(MethodView): where_clauses.append("created_at <= ?") params.append(before_ts) - where_sql = " AND ".join(where_clauses) + # Build WHERE clause (may be empty for admin viewing all) + where_sql = " AND ".join(where_clauses) if where_clauses else "1=1" # Count total pastes matching filters (where_sql is safe, built from constants) count_row = db.execute( @@ -1377,8 +1417,9 @@ class PastesListView(MethodView): total = count_row["total"] if count_row else 0 # Fetch pastes with metadata only (where_sql is safe, built from constants) + # Include owner for admin view rows = db.execute( - f"""SELECT id, mime_type, length(content) as size, created_at, + f"""SELECT id, owner, mime_type, length(content) as size, created_at, last_accessed, burn_after_read, expires_at, CASE WHEN password_hash IS NOT NULL THEN 1 ELSE 0 END as password_protected FROM pastes @@ -1403,6 +1444,9 @@ class PastesListView(MethodView): "url": f"/{row['id']}", "raw": f"/{row['id']}/raw", } + # Include owner for admin view + if show_all and row["owner"]: + paste["owner"] = row["owner"] if row["burn_after_read"]: paste["burn_after_read"] = True if row["expires_at"]: @@ -1411,15 +1455,16 @@ class PastesListView(MethodView): paste["password_protected"] = True pastes.append(paste) - return json_response( - { - "pastes": pastes, - "count": len(pastes), - "total": total, - "limit": limit, - "offset": offset, - } - ) + response_data: dict[str, Any] = { + "pastes": pastes, + "count": len(pastes), + "total": total, + "limit": limit, + "offset": offset, + } + if user_is_admin: + response_data["is_admin"] = True + return json_response(response_data) # ───────────────────────────────────────────────────────────────────────────── diff --git a/tests/test_pki.py b/tests/test_pki.py index 67329bf..807c9ff 100644 --- a/tests/test_pki.py +++ b/tests/test_pki.py @@ -619,3 +619,98 @@ class TestPKCS12Creation: # Should succeed with correct password loaded_key, _, _ = pkcs12.load_key_and_certificates(p12_data, password=b"secret123") assert loaded_key is not None + + +class TestAdminPrivileges: + """Test admin privileges for list/delete operations.""" + + def test_admin_can_list_all_pastes(self, client): + """Admin can list all pastes with ?all=1.""" + # Setup: Create CA and issue admin cert (first user) + client.post("/pki/ca") + admin_resp = client.post("/pki/issue", json={"common_name": "admin"}) + admin_fp = admin_resp.get_json()["fingerprint_sha1"] + + # Issue non-admin cert (second user) + user_resp = client.post("/pki/issue", json={"common_name": "user"}) + user_fp = user_resp.get_json()["fingerprint_sha1"] + + # User creates a paste + paste_resp = client.post("/", data=b"user content", headers={"X-SSL-Client-SHA1": user_fp}) + assert paste_resp.status_code == 201 + + # User can only see their own pastes + user_list = client.get("/pastes", headers={"X-SSL-Client-SHA1": user_fp}) + assert user_list.status_code == 200 + assert user_list.get_json()["count"] == 1 + assert "is_admin" not in user_list.get_json() + + # Admin lists only their own by default (no pastes) + admin_list = client.get("/pastes", headers={"X-SSL-Client-SHA1": admin_fp}) + assert admin_list.status_code == 200 + assert admin_list.get_json()["count"] == 0 + assert admin_list.get_json()["is_admin"] is True + + # Admin lists all with ?all=1 + admin_all = client.get("/pastes?all=1", headers={"X-SSL-Client-SHA1": admin_fp}) + assert admin_all.status_code == 200 + assert admin_all.get_json()["count"] == 1 + # Includes owner info + assert "owner" in admin_all.get_json()["pastes"][0] + assert admin_all.get_json()["pastes"][0]["owner"] == user_fp + + def test_non_admin_cannot_use_all_param(self, client): + """Non-admin ?all=1 is ignored.""" + client.post("/pki/ca") + # First user is admin + client.post("/pki/issue", json={"common_name": "admin"}) + # Second user is not + user_resp = client.post("/pki/issue", json={"common_name": "user"}) + user_fp = user_resp.get_json()["fingerprint_sha1"] + + # User tries ?all=1, should be ignored + resp = client.get("/pastes?all=1", headers={"X-SSL-Client-SHA1": user_fp}) + assert resp.status_code == 200 + assert "is_admin" not in resp.get_json() + + def test_admin_can_delete_any_paste(self, client): + """Admin can delete pastes owned by others.""" + client.post("/pki/ca") + admin_resp = client.post("/pki/issue", json={"common_name": "admin"}) + admin_fp = admin_resp.get_json()["fingerprint_sha1"] + + user_resp = client.post("/pki/issue", json={"common_name": "user"}) + user_fp = user_resp.get_json()["fingerprint_sha1"] + + # User creates a paste + paste_resp = client.post("/", data=b"user content", headers={"X-SSL-Client-SHA1": user_fp}) + paste_id = paste_resp.get_json()["id"] + + # Admin deletes it + delete_resp = client.delete(f"/{paste_id}", headers={"X-SSL-Client-SHA1": admin_fp}) + assert delete_resp.status_code == 200 + assert delete_resp.get_json()["message"] == "Paste deleted" + + # Verify gone + get_resp = client.get(f"/{paste_id}") + assert get_resp.status_code == 404 + + def test_non_admin_cannot_delete_others_paste(self, client): + """Non-admin cannot delete pastes owned by others.""" + client.post("/pki/ca") + # First user is admin (create so second is not) + admin_resp = client.post("/pki/issue", json={"common_name": "admin"}) + admin_fp = admin_resp.get_json()["fingerprint_sha1"] + + user_resp = client.post("/pki/issue", json={"common_name": "user"}) + user_fp = user_resp.get_json()["fingerprint_sha1"] + + # Admin creates a paste + paste_resp = client.post( + "/", data=b"admin content", headers={"X-SSL-Client-SHA1": admin_fp} + ) + paste_id = paste_resp.get_json()["id"] + + # User tries to delete it + delete_resp = client.delete(f"/{paste_id}", headers={"X-SSL-Client-SHA1": user_fp}) + assert delete_resp.status_code == 403