forked from claw/flaskpaste
pki: admin can list/delete any paste
Add is_admin() helper to check if current user is admin. Update DELETE /<id> to allow admin to delete any paste. Update GET /pastes to support ?all=1 for admin to list all pastes. Admin view includes owner fingerprint in paste metadata.
This commit is contained in:
@@ -376,6 +376,25 @@ def get_client_id() -> str | None:
|
|||||||
return fingerprint
|
return fingerprint
|
||||||
|
|
||||||
|
|
||||||
|
def is_admin() -> bool:
|
||||||
|
"""Check if current authenticated user is an admin.
|
||||||
|
|
||||||
|
Returns True only if:
|
||||||
|
- User has a valid client certificate
|
||||||
|
- Certificate is marked as admin in the PKI database
|
||||||
|
"""
|
||||||
|
client_id = get_client_id()
|
||||||
|
if not client_id:
|
||||||
|
return False
|
||||||
|
|
||||||
|
if not current_app.config.get("PKI_ENABLED"):
|
||||||
|
return False
|
||||||
|
|
||||||
|
from app.pki import is_admin_certificate
|
||||||
|
|
||||||
|
return is_admin_certificate(client_id)
|
||||||
|
|
||||||
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
# Proof-of-Work
|
# Proof-of-Work
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
@@ -1286,7 +1305,7 @@ class PasteDeleteView(MethodView):
|
|||||||
"""Paste deletion with authentication."""
|
"""Paste deletion with authentication."""
|
||||||
|
|
||||||
def delete(self, paste_id: str) -> Response:
|
def delete(self, paste_id: str) -> Response:
|
||||||
"""Delete paste. Requires ownership."""
|
"""Delete paste. Requires ownership or admin rights."""
|
||||||
# Validate
|
# Validate
|
||||||
if err := validate_paste_id(paste_id):
|
if err := validate_paste_id(paste_id):
|
||||||
return err
|
return err
|
||||||
@@ -1300,7 +1319,8 @@ class PasteDeleteView(MethodView):
|
|||||||
if row is None:
|
if row is None:
|
||||||
return error_response("Paste not found", 404)
|
return error_response("Paste not found", 404)
|
||||||
|
|
||||||
if row["owner"] != g.client_id:
|
# Allow if owner or admin
|
||||||
|
if row["owner"] != g.client_id and not is_admin():
|
||||||
return error_response("Permission denied", 403)
|
return error_response("Permission denied", 403)
|
||||||
|
|
||||||
db.execute("DELETE FROM pastes WHERE id = ?", (paste_id,))
|
db.execute("DELETE FROM pastes WHERE id = ?", (paste_id,))
|
||||||
@@ -1310,15 +1330,15 @@ class PasteDeleteView(MethodView):
|
|||||||
|
|
||||||
|
|
||||||
class PastesListView(MethodView):
|
class PastesListView(MethodView):
|
||||||
"""List authenticated user's pastes (privacy-focused)."""
|
"""List pastes with authentication."""
|
||||||
|
|
||||||
def get(self) -> Response:
|
def get(self) -> Response:
|
||||||
"""List pastes owned by authenticated user.
|
"""List pastes owned by authenticated user, or all pastes for admins.
|
||||||
|
|
||||||
Privacy guarantees:
|
Privacy guarantees:
|
||||||
- Requires authentication (mTLS client certificate)
|
- Requires authentication (mTLS client certificate)
|
||||||
- Users can ONLY see their own pastes
|
- Regular users can ONLY see their own pastes
|
||||||
- No admin bypass or cross-user visibility
|
- Admins can see all pastes (with optional owner filter)
|
||||||
- Content is never returned, only metadata
|
- Content is never returned, only metadata
|
||||||
|
|
||||||
Query parameters:
|
Query parameters:
|
||||||
@@ -1327,6 +1347,8 @@ class PastesListView(MethodView):
|
|||||||
- type: filter by MIME type (glob pattern, e.g., "image/*")
|
- type: filter by MIME type (glob pattern, e.g., "image/*")
|
||||||
- after: filter by created_at >= timestamp
|
- after: filter by created_at >= timestamp
|
||||||
- before: filter by created_at <= timestamp
|
- before: filter by created_at <= timestamp
|
||||||
|
- all: (admin only) if "1", list all pastes instead of own
|
||||||
|
- owner: (admin only) filter by owner fingerprint
|
||||||
"""
|
"""
|
||||||
import fnmatch
|
import fnmatch
|
||||||
|
|
||||||
@@ -1335,6 +1357,7 @@ class PastesListView(MethodView):
|
|||||||
return err
|
return err
|
||||||
|
|
||||||
client_id = g.client_id
|
client_id = g.client_id
|
||||||
|
user_is_admin = is_admin()
|
||||||
|
|
||||||
# Parse pagination parameters
|
# Parse pagination parameters
|
||||||
try:
|
try:
|
||||||
@@ -1354,11 +1377,27 @@ class PastesListView(MethodView):
|
|||||||
except (ValueError, TypeError):
|
except (ValueError, TypeError):
|
||||||
before_ts = 0
|
before_ts = 0
|
||||||
|
|
||||||
|
# Admin-only parameters
|
||||||
|
show_all = request.args.get("all", "0") == "1" and user_is_admin
|
||||||
|
owner_filter = request.args.get("owner", "").strip() if user_is_admin else ""
|
||||||
|
|
||||||
db = get_db()
|
db = get_db()
|
||||||
|
|
||||||
# Build query with filters
|
# Build query with filters
|
||||||
where_clauses = ["owner = ?"]
|
where_clauses: list[str] = []
|
||||||
params: list[Any] = [client_id]
|
params: list[Any] = []
|
||||||
|
|
||||||
|
# Owner filtering logic
|
||||||
|
if show_all:
|
||||||
|
# Admin viewing all pastes (with optional owner filter)
|
||||||
|
if owner_filter:
|
||||||
|
where_clauses.append("owner = ?")
|
||||||
|
params.append(owner_filter)
|
||||||
|
# else: no owner filter, show all
|
||||||
|
else:
|
||||||
|
# Regular user or admin without ?all=1: show only own pastes
|
||||||
|
where_clauses.append("owner = ?")
|
||||||
|
params.append(client_id)
|
||||||
|
|
||||||
if after_ts > 0:
|
if after_ts > 0:
|
||||||
where_clauses.append("created_at >= ?")
|
where_clauses.append("created_at >= ?")
|
||||||
@@ -1367,7 +1406,8 @@ class PastesListView(MethodView):
|
|||||||
where_clauses.append("created_at <= ?")
|
where_clauses.append("created_at <= ?")
|
||||||
params.append(before_ts)
|
params.append(before_ts)
|
||||||
|
|
||||||
where_sql = " AND ".join(where_clauses)
|
# Build WHERE clause (may be empty for admin viewing all)
|
||||||
|
where_sql = " AND ".join(where_clauses) if where_clauses else "1=1"
|
||||||
|
|
||||||
# Count total pastes matching filters (where_sql is safe, built from constants)
|
# Count total pastes matching filters (where_sql is safe, built from constants)
|
||||||
count_row = db.execute(
|
count_row = db.execute(
|
||||||
@@ -1377,8 +1417,9 @@ class PastesListView(MethodView):
|
|||||||
total = count_row["total"] if count_row else 0
|
total = count_row["total"] if count_row else 0
|
||||||
|
|
||||||
# Fetch pastes with metadata only (where_sql is safe, built from constants)
|
# Fetch pastes with metadata only (where_sql is safe, built from constants)
|
||||||
|
# Include owner for admin view
|
||||||
rows = db.execute(
|
rows = db.execute(
|
||||||
f"""SELECT id, mime_type, length(content) as size, created_at,
|
f"""SELECT id, owner, mime_type, length(content) as size, created_at,
|
||||||
last_accessed, burn_after_read, expires_at,
|
last_accessed, burn_after_read, expires_at,
|
||||||
CASE WHEN password_hash IS NOT NULL THEN 1 ELSE 0 END as password_protected
|
CASE WHEN password_hash IS NOT NULL THEN 1 ELSE 0 END as password_protected
|
||||||
FROM pastes
|
FROM pastes
|
||||||
@@ -1403,6 +1444,9 @@ class PastesListView(MethodView):
|
|||||||
"url": f"/{row['id']}",
|
"url": f"/{row['id']}",
|
||||||
"raw": f"/{row['id']}/raw",
|
"raw": f"/{row['id']}/raw",
|
||||||
}
|
}
|
||||||
|
# Include owner for admin view
|
||||||
|
if show_all and row["owner"]:
|
||||||
|
paste["owner"] = row["owner"]
|
||||||
if row["burn_after_read"]:
|
if row["burn_after_read"]:
|
||||||
paste["burn_after_read"] = True
|
paste["burn_after_read"] = True
|
||||||
if row["expires_at"]:
|
if row["expires_at"]:
|
||||||
@@ -1411,15 +1455,16 @@ class PastesListView(MethodView):
|
|||||||
paste["password_protected"] = True
|
paste["password_protected"] = True
|
||||||
pastes.append(paste)
|
pastes.append(paste)
|
||||||
|
|
||||||
return json_response(
|
response_data: dict[str, Any] = {
|
||||||
{
|
"pastes": pastes,
|
||||||
"pastes": pastes,
|
"count": len(pastes),
|
||||||
"count": len(pastes),
|
"total": total,
|
||||||
"total": total,
|
"limit": limit,
|
||||||
"limit": limit,
|
"offset": offset,
|
||||||
"offset": offset,
|
}
|
||||||
}
|
if user_is_admin:
|
||||||
)
|
response_data["is_admin"] = True
|
||||||
|
return json_response(response_data)
|
||||||
|
|
||||||
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|||||||
@@ -619,3 +619,98 @@ class TestPKCS12Creation:
|
|||||||
# Should succeed with correct password
|
# Should succeed with correct password
|
||||||
loaded_key, _, _ = pkcs12.load_key_and_certificates(p12_data, password=b"secret123")
|
loaded_key, _, _ = pkcs12.load_key_and_certificates(p12_data, password=b"secret123")
|
||||||
assert loaded_key is not None
|
assert loaded_key is not None
|
||||||
|
|
||||||
|
|
||||||
|
class TestAdminPrivileges:
|
||||||
|
"""Test admin privileges for list/delete operations."""
|
||||||
|
|
||||||
|
def test_admin_can_list_all_pastes(self, client):
|
||||||
|
"""Admin can list all pastes with ?all=1."""
|
||||||
|
# Setup: Create CA and issue admin cert (first user)
|
||||||
|
client.post("/pki/ca")
|
||||||
|
admin_resp = client.post("/pki/issue", json={"common_name": "admin"})
|
||||||
|
admin_fp = admin_resp.get_json()["fingerprint_sha1"]
|
||||||
|
|
||||||
|
# Issue non-admin cert (second user)
|
||||||
|
user_resp = client.post("/pki/issue", json={"common_name": "user"})
|
||||||
|
user_fp = user_resp.get_json()["fingerprint_sha1"]
|
||||||
|
|
||||||
|
# User creates a paste
|
||||||
|
paste_resp = client.post("/", data=b"user content", headers={"X-SSL-Client-SHA1": user_fp})
|
||||||
|
assert paste_resp.status_code == 201
|
||||||
|
|
||||||
|
# User can only see their own pastes
|
||||||
|
user_list = client.get("/pastes", headers={"X-SSL-Client-SHA1": user_fp})
|
||||||
|
assert user_list.status_code == 200
|
||||||
|
assert user_list.get_json()["count"] == 1
|
||||||
|
assert "is_admin" not in user_list.get_json()
|
||||||
|
|
||||||
|
# Admin lists only their own by default (no pastes)
|
||||||
|
admin_list = client.get("/pastes", headers={"X-SSL-Client-SHA1": admin_fp})
|
||||||
|
assert admin_list.status_code == 200
|
||||||
|
assert admin_list.get_json()["count"] == 0
|
||||||
|
assert admin_list.get_json()["is_admin"] is True
|
||||||
|
|
||||||
|
# Admin lists all with ?all=1
|
||||||
|
admin_all = client.get("/pastes?all=1", headers={"X-SSL-Client-SHA1": admin_fp})
|
||||||
|
assert admin_all.status_code == 200
|
||||||
|
assert admin_all.get_json()["count"] == 1
|
||||||
|
# Includes owner info
|
||||||
|
assert "owner" in admin_all.get_json()["pastes"][0]
|
||||||
|
assert admin_all.get_json()["pastes"][0]["owner"] == user_fp
|
||||||
|
|
||||||
|
def test_non_admin_cannot_use_all_param(self, client):
|
||||||
|
"""Non-admin ?all=1 is ignored."""
|
||||||
|
client.post("/pki/ca")
|
||||||
|
# First user is admin
|
||||||
|
client.post("/pki/issue", json={"common_name": "admin"})
|
||||||
|
# Second user is not
|
||||||
|
user_resp = client.post("/pki/issue", json={"common_name": "user"})
|
||||||
|
user_fp = user_resp.get_json()["fingerprint_sha1"]
|
||||||
|
|
||||||
|
# User tries ?all=1, should be ignored
|
||||||
|
resp = client.get("/pastes?all=1", headers={"X-SSL-Client-SHA1": user_fp})
|
||||||
|
assert resp.status_code == 200
|
||||||
|
assert "is_admin" not in resp.get_json()
|
||||||
|
|
||||||
|
def test_admin_can_delete_any_paste(self, client):
|
||||||
|
"""Admin can delete pastes owned by others."""
|
||||||
|
client.post("/pki/ca")
|
||||||
|
admin_resp = client.post("/pki/issue", json={"common_name": "admin"})
|
||||||
|
admin_fp = admin_resp.get_json()["fingerprint_sha1"]
|
||||||
|
|
||||||
|
user_resp = client.post("/pki/issue", json={"common_name": "user"})
|
||||||
|
user_fp = user_resp.get_json()["fingerprint_sha1"]
|
||||||
|
|
||||||
|
# User creates a paste
|
||||||
|
paste_resp = client.post("/", data=b"user content", headers={"X-SSL-Client-SHA1": user_fp})
|
||||||
|
paste_id = paste_resp.get_json()["id"]
|
||||||
|
|
||||||
|
# Admin deletes it
|
||||||
|
delete_resp = client.delete(f"/{paste_id}", headers={"X-SSL-Client-SHA1": admin_fp})
|
||||||
|
assert delete_resp.status_code == 200
|
||||||
|
assert delete_resp.get_json()["message"] == "Paste deleted"
|
||||||
|
|
||||||
|
# Verify gone
|
||||||
|
get_resp = client.get(f"/{paste_id}")
|
||||||
|
assert get_resp.status_code == 404
|
||||||
|
|
||||||
|
def test_non_admin_cannot_delete_others_paste(self, client):
|
||||||
|
"""Non-admin cannot delete pastes owned by others."""
|
||||||
|
client.post("/pki/ca")
|
||||||
|
# First user is admin (create so second is not)
|
||||||
|
admin_resp = client.post("/pki/issue", json={"common_name": "admin"})
|
||||||
|
admin_fp = admin_resp.get_json()["fingerprint_sha1"]
|
||||||
|
|
||||||
|
user_resp = client.post("/pki/issue", json={"common_name": "user"})
|
||||||
|
user_fp = user_resp.get_json()["fingerprint_sha1"]
|
||||||
|
|
||||||
|
# Admin creates a paste
|
||||||
|
paste_resp = client.post(
|
||||||
|
"/", data=b"admin content", headers={"X-SSL-Client-SHA1": admin_fp}
|
||||||
|
)
|
||||||
|
paste_id = paste_resp.get_json()["id"]
|
||||||
|
|
||||||
|
# User tries to delete it
|
||||||
|
delete_resp = client.delete(f"/{paste_id}", headers={"X-SSL-Client-SHA1": user_fp})
|
||||||
|
assert delete_resp.status_code == 403
|
||||||
|
|||||||
Reference in New Issue
Block a user