diff --git a/tests/test_paste_management.py b/tests/test_paste_management.py new file mode 100644 index 0000000..be46797 --- /dev/null +++ b/tests/test_paste_management.py @@ -0,0 +1,464 @@ +"""Tests for paste management API endpoints (list, search, update).""" + +from __future__ import annotations + +import time +from typing import TYPE_CHECKING + +import pytest + +from app import create_app +from app.database import get_db + +if TYPE_CHECKING: + from flask import Flask + from flask.testing import FlaskClient + + +# ───────────────────────────────────────────────────────────────────────────── +# Fixtures +# ───────────────────────────────────────────────────────────────────────────── + + +@pytest.fixture +def app() -> Flask: + """Create application configured for testing.""" + app = create_app("testing") + app.config["POW_DIFFICULTY"] = 0 # Disable PoW for tests + app.config["RATE_LIMIT_ENABLED"] = False + return app + + +@pytest.fixture +def client(app: Flask) -> FlaskClient: + """Create test client.""" + return app.test_client() + + +@pytest.fixture +def auth_client(app: Flask) -> FlaskClient: + """Create test client with authentication headers.""" + client = app.test_client() + # Simulate mTLS client certificate fingerprint + client.environ_base["HTTP_X_SSL_CLIENT_SHA1"] = "a" * 40 + return client + + +@pytest.fixture +def admin_client(app: Flask) -> FlaskClient: + """Create test client with admin authentication.""" + client = app.test_client() + fingerprint = "b" * 40 + client.environ_base["HTTP_X_SSL_CLIENT_SHA1"] = fingerprint + # Mark as admin in PKI + with app.app_context(): + app.config["PKI_ENABLED"] = True + db = get_db() + now = int(time.time()) + # Create CA first (required by foreign key) + db.execute( + """INSERT OR REPLACE INTO certificate_authority + (id, common_name, certificate_pem, private_key_encrypted, + key_salt, created_at, expires_at, key_algorithm) + VALUES ('default', 'Test CA', 'CERT', X'00', X'00', ?, ?, 'rsa')""", + (now, now + 86400 * 365), + ) + db.execute( + """INSERT OR REPLACE INTO issued_certificates + (serial, ca_id, common_name, fingerprint_sha1, certificate_pem, + created_at, expires_at, status, is_admin) + VALUES (?, 'default', 'admin', ?, 'CERT', ?, ?, 'valid', 1)""", + ("admin001", fingerprint, now, now + 86400), + ) + db.commit() + return client + + +def create_paste( + client: FlaskClient, + content: bytes = b"test content", + owner: str | None = None, + burn_after_read: bool = False, + password: str | None = None, + expires_in: int | None = None, +) -> str: + """Helper to create a paste and return its ID.""" + headers = {"Content-Type": "text/plain"} + if owner: + headers["X-SSL-Client-SHA1"] = owner + if burn_after_read: + headers["X-Burn-After-Read"] = "true" + if password: + headers["X-Paste-Password"] = password + if expires_in: + headers["X-Expiry"] = str(expires_in) + + response = client.post("/", data=content, headers=headers) + assert response.status_code == 201 + return response.get_json()["id"] + + +# ───────────────────────────────────────────────────────────────────────────── +# Test: List Pastes +# ───────────────────────────────────────────────────────────────────────────── + + +class TestPastesList: + """Test GET /pastes endpoint.""" + + def test_list_requires_auth(self, client: FlaskClient) -> None: + """Listing pastes requires authentication.""" + response = client.get("/pastes") + assert response.status_code == 401 + + def test_list_own_pastes(self, app: Flask, auth_client: FlaskClient) -> None: + """Authenticated users can list their own pastes.""" + with app.app_context(): + # Create pastes owned by authenticated user + paste1 = create_paste(auth_client, b"paste 1", owner="a" * 40) + paste2 = create_paste(auth_client, b"paste 2", owner="a" * 40) + # Create paste owned by different user + create_paste(auth_client, b"other paste", owner="c" * 40) + + response = auth_client.get("/pastes") + assert response.status_code == 200 + data = response.get_json() + + assert "pastes" in data + assert data["count"] >= 2 + paste_ids = [p["id"] for p in data["pastes"]] + assert paste1 in paste_ids + assert paste2 in paste_ids + + def test_list_pagination(self, app: Flask, auth_client: FlaskClient) -> None: + """List endpoint supports pagination.""" + with app.app_context(): + for i in range(5): + create_paste(auth_client, f"paste {i}".encode(), owner="a" * 40) + + # First page + response = auth_client.get("/pastes?limit=2&offset=0") + assert response.status_code == 200 + data = response.get_json() + assert data["count"] == 2 + assert data["limit"] == 2 + assert data["offset"] == 0 + + # Second page + response = auth_client.get("/pastes?limit=2&offset=2") + data = response.get_json() + assert data["count"] == 2 + assert data["offset"] == 2 + + def test_list_returns_metadata_only( + self, app: Flask, auth_client: FlaskClient + ) -> None: + """List should return metadata, not content.""" + with app.app_context(): + create_paste(auth_client, b"secret content", owner="a" * 40) + + response = auth_client.get("/pastes") + data = response.get_json() + + for paste in data["pastes"]: + assert "id" in paste + assert "mime_type" in paste + assert "size" in paste + assert "created_at" in paste + assert "content" not in paste + + def test_admin_list_all(self, app: Flask, admin_client: FlaskClient) -> None: + """Admin can list all pastes with ?all=1.""" + with app.app_context(): + create_paste(admin_client, b"user paste", owner="c" * 40) + create_paste(admin_client, b"admin paste", owner="b" * 40) + + response = admin_client.get("/pastes?all=1") + assert response.status_code == 200 + data = response.get_json() + assert data.get("is_admin") is True + # Should include pastes from different owners + assert data["total"] >= 2 + + +# ───────────────────────────────────────────────────────────────────────────── +# Test: Search Pastes +# ───────────────────────────────────────────────────────────────────────────── + + +class TestPastesSearch: + """Test search functionality on GET /pastes.""" + + def test_search_by_type(self, app: Flask, auth_client: FlaskClient) -> None: + """Search can filter by MIME type glob pattern.""" + with app.app_context(): + # Create pastes with different types + # text/plain + create_paste(auth_client, b"text content", owner="a" * 40) + # Will be detected as application/octet-stream (binary) + create_paste(auth_client, b"\x89PNG\r\n\x1a\nimage", owner="a" * 40) + + response = auth_client.get("/pastes?type=text/*") + assert response.status_code == 200 + data = response.get_json() + + for paste in data["pastes"]: + assert paste["mime_type"].startswith("text/") + + def test_search_by_date_after(self, app: Flask, auth_client: FlaskClient) -> None: + """Search can filter by created_at >= timestamp.""" + with app.app_context(): + create_paste(auth_client, b"paste", owner="a" * 40) + + # Search for pastes created in the last hour + one_hour_ago = int(time.time()) - 3600 + response = auth_client.get(f"/pastes?after={one_hour_ago}") + assert response.status_code == 200 + data = response.get_json() + + for paste in data["pastes"]: + assert paste["created_at"] >= one_hour_ago + + def test_search_by_date_before(self, app: Flask, auth_client: FlaskClient) -> None: + """Search can filter by created_at <= timestamp.""" + with app.app_context(): + create_paste(auth_client, b"paste", owner="a" * 40) + + future = int(time.time()) + 3600 + response = auth_client.get(f"/pastes?before={future}") + assert response.status_code == 200 + data = response.get_json() + + for paste in data["pastes"]: + assert paste["created_at"] <= future + + def test_search_combined_filters( + self, app: Flask, auth_client: FlaskClient + ) -> None: + """Multiple filters can be combined.""" + with app.app_context(): + create_paste(auth_client, b"test", owner="a" * 40) + + now = int(time.time()) + response = auth_client.get( + f"/pastes?type=text/*&after={now - 60}&before={now + 60}&limit=10" + ) + assert response.status_code == 200 + + +# ───────────────────────────────────────────────────────────────────────────── +# Test: Update Paste +# ───────────────────────────────────────────────────────────────────────────── + + +class TestPasteUpdate: + """Test PUT / endpoint.""" + + def test_update_requires_auth(self, app: Flask, client: FlaskClient) -> None: + """Update requires authentication.""" + with app.app_context(): + paste_id = create_paste(client, b"content") + + response = client.put(f"/{paste_id}", data=b"new content") + assert response.status_code == 401 + + def test_update_requires_ownership( + self, app: Flask, auth_client: FlaskClient + ) -> None: + """Update requires paste ownership.""" + with app.app_context(): + # Create paste owned by different user + paste_id = create_paste(auth_client, b"content", owner="c" * 40) + + response = auth_client.put(f"/{paste_id}", data=b"new content") + assert response.status_code == 403 + + def test_update_content(self, app: Flask, auth_client: FlaskClient) -> None: + """Owner can update paste content.""" + with app.app_context(): + paste_id = create_paste(auth_client, b"original", owner="a" * 40) + + response = auth_client.put( + f"/{paste_id}", + data=b"updated content", + content_type="text/plain", + ) + assert response.status_code == 200 + data = response.get_json() + assert data["id"] == paste_id + assert data["size"] == len(b"updated content") + + # Verify content changed + raw_response = auth_client.get(f"/{paste_id}/raw") + assert raw_response.data == b"updated content" + + def test_update_password_set(self, app: Flask, auth_client: FlaskClient) -> None: + """Owner can set password on paste.""" + with app.app_context(): + paste_id = create_paste(auth_client, b"content", owner="a" * 40) + + response = auth_client.put( + f"/{paste_id}", + headers={"X-Paste-Password": "newpass"}, + ) + assert response.status_code == 200 + data = response.get_json() + assert data.get("password_protected") is True + + # Verify password required + raw_response = auth_client.get(f"/{paste_id}/raw") + assert raw_response.status_code == 401 + + def test_update_password_remove(self, app: Flask, auth_client: FlaskClient) -> None: + """Owner can remove password from paste.""" + with app.app_context(): + paste_id = create_paste( + auth_client, b"content", owner="a" * 40, password="secret" + ) + + response = auth_client.put( + f"/{paste_id}", + headers={"X-Remove-Password": "true"}, + ) + assert response.status_code == 200 + data = response.get_json() + assert data.get("password_protected") is not True + + # Verify no password required + raw_response = auth_client.get(f"/{paste_id}/raw") + assert raw_response.status_code == 200 + + def test_update_extend_expiry(self, app: Flask, auth_client: FlaskClient) -> None: + """Owner can extend paste expiry.""" + with app.app_context(): + paste_id = create_paste( + auth_client, b"content", owner="a" * 40, expires_in=3600 + ) + # Get original expiry + info_response = auth_client.get(f"/{paste_id}") + original_expiry = info_response.get_json().get("expires_at") + + response = auth_client.put( + f"/{paste_id}", + headers={"X-Extend-Expiry": "7200"}, + ) + assert response.status_code == 200 + data = response.get_json() + + # New expiry should be later + assert data["expires_at"] > original_expiry + + def test_update_no_changes(self, app: Flask, auth_client: FlaskClient) -> None: + """Update with no changes returns 400.""" + with app.app_context(): + paste_id = create_paste(auth_client, b"content", owner="a" * 40) + + response = auth_client.put(f"/{paste_id}") + assert response.status_code == 400 + assert "No updates" in response.get_json().get("error", "") + + def test_update_not_found(self, auth_client: FlaskClient) -> None: + """Update non-existent paste returns 404.""" + response = auth_client.put("/000000000000", data=b"content") + assert response.status_code == 404 + + def test_update_burn_after_read_forbidden( + self, app: Flask, auth_client: FlaskClient + ) -> None: + """Cannot update burn-after-read pastes.""" + with app.app_context(): + paste_id = create_paste( + auth_client, b"content", owner="a" * 40, burn_after_read=True + ) + + response = auth_client.put(f"/{paste_id}", data=b"new content") + assert response.status_code == 400 + assert "burn-after-read" in response.get_json().get("error", "").lower() + + +# ───────────────────────────────────────────────────────────────────────────── +# Test: Delete Paste +# ───────────────────────────────────────────────────────────────────────────── + + +class TestPasteDelete: + """Test DELETE / endpoint.""" + + def test_delete_requires_auth(self, app: Flask, client: FlaskClient) -> None: + """Delete requires authentication.""" + with app.app_context(): + paste_id = create_paste(client, b"content") + + response = client.delete(f"/{paste_id}") + assert response.status_code == 401 + + def test_delete_own_paste(self, app: Flask, auth_client: FlaskClient) -> None: + """Owner can delete their paste.""" + with app.app_context(): + paste_id = create_paste(auth_client, b"content", owner="a" * 40) + + response = auth_client.delete(f"/{paste_id}") + assert response.status_code == 200 + assert "deleted" in response.get_json().get("message", "").lower() + + # Verify deleted + get_response = auth_client.get(f"/{paste_id}") + assert get_response.status_code == 404 + + def test_delete_requires_ownership( + self, app: Flask, auth_client: FlaskClient + ) -> None: + """Cannot delete paste owned by others.""" + with app.app_context(): + paste_id = create_paste(auth_client, b"content", owner="c" * 40) + + response = auth_client.delete(f"/{paste_id}") + assert response.status_code == 403 + + def test_admin_delete_any(self, app: Flask, admin_client: FlaskClient) -> None: + """Admin can delete any paste.""" + with app.app_context(): + paste_id = create_paste(admin_client, b"content", owner="c" * 40) + + response = admin_client.delete(f"/{paste_id}") + assert response.status_code == 200 + + +# ───────────────────────────────────────────────────────────────────────────── +# Test: Edge Cases +# ───────────────────────────────────────────────────────────────────────────── + + +class TestPasteManagementEdgeCases: + """Test edge cases and error handling.""" + + def test_invalid_paste_id_format(self, auth_client: FlaskClient) -> None: + """Invalid paste ID format returns 400.""" + response = auth_client.get("/invalid!") + assert response.status_code == 400 + + def test_list_invalid_pagination(self, auth_client: FlaskClient) -> None: + """Invalid pagination values are handled gracefully.""" + response = auth_client.get("/pastes?limit=invalid&offset=-5") + assert response.status_code == 200 + data = response.get_json() + # Should use defaults + assert data["limit"] == 50 + assert data["offset"] == 0 + + def test_search_invalid_timestamp(self, auth_client: FlaskClient) -> None: + """Invalid timestamp values are handled gracefully.""" + response = auth_client.get("/pastes?after=notanumber") + assert response.status_code == 200 + # Should treat as 0 (no filter) + + def test_update_invalid_expiry(self, app: Flask, auth_client: FlaskClient) -> None: + """Invalid X-Extend-Expiry returns 400.""" + with app.app_context(): + paste_id = create_paste(auth_client, b"content", owner="a" * 40) + + response = auth_client.put( + f"/{paste_id}", + headers={"X-Extend-Expiry": "notanumber"}, + ) + assert response.status_code == 400