forked from username/flaskpaste
451 lines
18 KiB
Python
451 lines
18 KiB
Python
"""Tests for paste management API endpoints (list, search, update, delete)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import time
|
|
from typing import TYPE_CHECKING
|
|
|
|
import pytest
|
|
|
|
from app import create_app
|
|
from app.database import get_db
|
|
|
|
if TYPE_CHECKING:
|
|
from flask import Flask
|
|
from flask.testing import FlaskClient
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Fixtures
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.fixture
def app() -> Flask:
    """Build the Flask application used by the tests.

    The app is created from the ``"testing"`` configuration, after which
    proof-of-work is disabled (difficulty 0) and rate limiting is turned off.
    """
    flask_app = create_app("testing")
    flask_app.config.update(
        POW_DIFFICULTY=0,  # Disable PoW for tests
        RATE_LIMIT_ENABLED=False,
    )
    return flask_app
|
|
|
|
|
|
@pytest.fixture
def client(app: Flask) -> FlaskClient:
    """Return a plain test client for ``app`` (no client-certificate header set)."""
    plain_client = app.test_client()
    return plain_client
|
|
|
|
|
|
@pytest.fixture
def auth_client(app: Flask) -> FlaskClient:
    """Return a test client that presents a client-certificate fingerprint.

    Every request made through this client carries
    ``HTTP_X_SSL_CLIENT_SHA1`` set to forty ``a`` characters, simulating an
    mTLS client certificate fingerprint.
    """
    authed = app.test_client()
    authed.environ_base.update({"HTTP_X_SSL_CLIENT_SHA1": "a" * 40})
    return authed
|
|
|
|
|
|
@pytest.fixture
def admin_client(app: Flask) -> FlaskClient:
    """Return a test client whose certificate fingerprint is registered as admin.

    The client sends forty ``b`` characters as ``HTTP_X_SSL_CLIENT_SHA1``.
    Inside an application context the fixture enables ``PKI_ENABLED``, inserts
    a ``'default'`` certificate authority row, and then inserts an issued
    certificate row for that fingerprint with ``is_admin`` set to 1.
    """
    admin_fp = "b" * 40
    admin = app.test_client()
    admin.environ_base["HTTP_X_SSL_CLIENT_SHA1"] = admin_fp

    # The CA row is written first because issued certificates reference it
    # through a foreign key.
    ca_sql = """INSERT OR REPLACE INTO certificate_authority
               (id, common_name, certificate_pem, private_key_encrypted,
                key_salt, created_at, expires_at, key_algorithm)
               VALUES ('default', 'Test CA', 'CERT', X'00', X'00', ?, ?, 'rsa')"""
    cert_sql = """INSERT OR REPLACE INTO issued_certificates
               (serial, ca_id, common_name, fingerprint_sha1, certificate_pem,
                created_at, expires_at, status, is_admin)
               VALUES (?, 'default', 'admin', ?, 'CERT', ?, ?, 'valid', 1)"""

    # Mark as admin in PKI
    with app.app_context():
        app.config["PKI_ENABLED"] = True
        conn = get_db()
        issued_at = int(time.time())
        conn.execute(ca_sql, (issued_at, issued_at + 86400 * 365))
        conn.execute(cert_sql, ("admin001", admin_fp, issued_at, issued_at + 86400))
        conn.commit()
    return admin
|
|
|
|
|
|
def create_paste(
    client: FlaskClient,
    content: bytes = b"test content",
    owner: str | None = None,
    burn_after_read: bool = False,
    password: str | None = None,
    expires_in: int | None = None,
) -> str:
    """POST a new paste to ``/`` and return the ID from the JSON response.

    Args:
        client: Test client used to send the request.
        content: Raw request body for the paste.
        owner: If truthy, sent as the ``X-SSL-Client-SHA1`` header.
        burn_after_read: If true, sends ``X-Burn-After-Read: true``.
        password: If truthy, sent as the ``X-Paste-Password`` header.
        expires_in: If truthy, sent (stringified) as the ``X-Expiry`` header.

    Returns:
        The ``id`` field of the response JSON, converted to ``str``.

    Raises:
        AssertionError: If the response status is not 201 or the response
            carries no JSON body.
    """
    # Optional headers are only sent when their value is truthy.
    optional = {
        "X-SSL-Client-SHA1": owner,
        "X-Burn-After-Read": "true" if burn_after_read else None,
        "X-Paste-Password": password,
        "X-Expiry": str(expires_in) if expires_in else None,
    }
    request_headers = {"Content-Type": "text/plain"}
    request_headers.update({name: value for name, value in optional.items() if value})

    response = client.post("/", data=content, headers=request_headers)
    assert response.status_code == 201
    payload = response.get_json()
    assert payload is not None
    return str(payload["id"])
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Test: List Pastes
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
class TestPastesList:
    """Test GET /pastes endpoint."""

    def test_list_requires_auth(self, client: FlaskClient) -> None:
        """Listing pastes requires authentication."""
        response = client.get("/pastes")
        assert response.status_code == 401

    def test_list_own_pastes(self, app: Flask, auth_client: FlaskClient) -> None:
        """Authenticated users can list their own pastes, and only those."""
        with app.app_context():
            # Create pastes owned by authenticated user
            paste1 = create_paste(auth_client, b"paste 1", owner="a" * 40)
            paste2 = create_paste(auth_client, b"paste 2", owner="a" * 40)
            # Create paste owned by different user
            other_paste = create_paste(auth_client, b"other paste", owner="c" * 40)

        response = auth_client.get("/pastes")
        assert response.status_code == 200
        data = response.get_json()

        assert "pastes" in data
        assert data["count"] >= 2
        paste_ids = {p["id"] for p in data["pastes"]}
        assert paste1 in paste_ids
        assert paste2 in paste_ids
        # The other owner's paste was created precisely to check that it does
        # not show up in this user's listing.
        assert other_paste not in paste_ids

    def test_list_pagination(self, app: Flask, auth_client: FlaskClient) -> None:
        """List endpoint supports pagination."""
        with app.app_context():
            for i in range(5):
                create_paste(auth_client, f"paste {i}".encode(), owner="a" * 40)

        # First page
        response = auth_client.get("/pastes?limit=2&offset=0")
        assert response.status_code == 200
        data = response.get_json()
        assert data["count"] == 2
        assert data["limit"] == 2
        assert data["offset"] == 0

        # Second page
        response = auth_client.get("/pastes?limit=2&offset=2")
        assert response.status_code == 200
        data = response.get_json()
        assert data["count"] == 2
        assert data["offset"] == 2

    def test_list_returns_metadata_only(self, app: Flask, auth_client: FlaskClient) -> None:
        """List should return metadata, not content."""
        with app.app_context():
            create_paste(auth_client, b"secret content", owner="a" * 40)

        response = auth_client.get("/pastes")
        assert response.status_code == 200
        data = response.get_json()

        # Guard against a vacuous pass: a paste was just created for this
        # user, so the listing must contain at least one entry to inspect.
        assert data["pastes"]
        for paste in data["pastes"]:
            assert "id" in paste
            assert "mime_type" in paste
            assert "size" in paste
            assert "created_at" in paste
            assert "content" not in paste

    def test_admin_list_all(self, app: Flask, admin_client: FlaskClient) -> None:
        """Admin can list all pastes with ?all=1."""
        with app.app_context():
            create_paste(admin_client, b"user paste", owner="c" * 40)
            create_paste(admin_client, b"admin paste", owner="b" * 40)

        response = admin_client.get("/pastes?all=1")
        assert response.status_code == 200
        data = response.get_json()
        assert data.get("is_admin") is True
        # Should include pastes from different owners
        assert data["total"] >= 2
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Test: Search Pastes
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
class TestPastesSearch:
    """Test search functionality on GET /pastes."""

    def test_search_by_type(self, app: Flask, auth_client: FlaskClient) -> None:
        """Search can filter by MIME type glob pattern."""
        with app.app_context():
            # Create pastes with different types
            # text/plain
            create_paste(auth_client, b"text content", owner="a" * 40)
            # Binary payload starting with the PNG signature; it is expected
            # to be classified as a non-text type.
            create_paste(auth_client, b"\x89PNG\r\n\x1a\nimage", owner="a" * 40)

        response = auth_client.get("/pastes?type=text/*")
        assert response.status_code == 200
        data = response.get_json()

        # A text paste was just created, so an empty result would make the
        # loop below pass without checking anything.
        assert data["pastes"]
        for paste in data["pastes"]:
            assert paste["mime_type"].startswith("text/")

    def test_search_by_date_after(self, app: Flask, auth_client: FlaskClient) -> None:
        """Search can filter by created_at >= timestamp."""
        with app.app_context():
            create_paste(auth_client, b"paste", owner="a" * 40)

        # Search for pastes created in the last hour
        one_hour_ago = int(time.time()) - 3600
        response = auth_client.get(f"/pastes?after={one_hour_ago}")
        assert response.status_code == 200
        data = response.get_json()

        # The paste created above falls inside the window; require a
        # non-empty result so the per-item check is meaningful.
        assert data["pastes"]
        for paste in data["pastes"]:
            assert paste["created_at"] >= one_hour_ago

    def test_search_by_date_before(self, app: Flask, auth_client: FlaskClient) -> None:
        """Search can filter by created_at <= timestamp."""
        with app.app_context():
            create_paste(auth_client, b"paste", owner="a" * 40)

        future = int(time.time()) + 3600
        response = auth_client.get(f"/pastes?before={future}")
        assert response.status_code == 200
        data = response.get_json()

        # The paste created above predates ``future``; require a non-empty
        # result so the per-item check is meaningful.
        assert data["pastes"]
        for paste in data["pastes"]:
            assert paste["created_at"] <= future

    def test_search_combined_filters(self, app: Flask, auth_client: FlaskClient) -> None:
        """Multiple filters can be combined."""
        with app.app_context():
            create_paste(auth_client, b"test", owner="a" * 40)

        now = int(time.time())
        lower, upper = now - 60, now + 60
        response = auth_client.get(
            f"/pastes?type=text/*&after={lower}&before={upper}&limit=10"
        )
        assert response.status_code == 200
        data = response.get_json()

        # Whatever is returned must satisfy every filter at once.
        for paste in data["pastes"]:
            assert paste["mime_type"].startswith("text/")
            assert lower <= paste["created_at"] <= upper
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Test: Update Paste
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
class TestPasteUpdate:
    """Test PUT /<id> endpoint."""

    def test_update_requires_auth(self, app: Flask, client: FlaskClient) -> None:
        """Update requires authentication."""
        with app.app_context():
            paste_id = create_paste(client, b"content")

        response = client.put(f"/{paste_id}", data=b"new content")
        assert response.status_code == 401

    def test_update_requires_ownership(self, app: Flask, auth_client: FlaskClient) -> None:
        """Update requires paste ownership."""
        with app.app_context():
            # Create paste owned by different user
            paste_id = create_paste(auth_client, b"content", owner="c" * 40)

        response = auth_client.put(f"/{paste_id}", data=b"new content")
        assert response.status_code == 403

    def test_update_content(self, app: Flask, auth_client: FlaskClient) -> None:
        """Owner can update paste content."""
        with app.app_context():
            paste_id = create_paste(auth_client, b"original", owner="a" * 40)

        response = auth_client.put(
            f"/{paste_id}",
            data=b"updated content",
            content_type="text/plain",
        )
        assert response.status_code == 200
        data = response.get_json()
        assert data["id"] == paste_id
        assert data["size"] == len(b"updated content")

        # Verify content changed; check the status first so a failed fetch is
        # reported as such rather than as a body mismatch.
        raw_response = auth_client.get(f"/{paste_id}/raw")
        assert raw_response.status_code == 200
        assert raw_response.data == b"updated content"

    def test_update_password_set(self, app: Flask, auth_client: FlaskClient) -> None:
        """Owner can set password on paste."""
        with app.app_context():
            paste_id = create_paste(auth_client, b"content", owner="a" * 40)

        response = auth_client.put(
            f"/{paste_id}",
            headers={"X-Paste-Password": "newpass"},
        )
        assert response.status_code == 200
        data = response.get_json()
        assert data.get("password_protected") is True

        # Verify password required
        raw_response = auth_client.get(f"/{paste_id}/raw")
        assert raw_response.status_code == 401

    def test_update_password_remove(self, app: Flask, auth_client: FlaskClient) -> None:
        """Owner can remove password from paste."""
        with app.app_context():
            paste_id = create_paste(auth_client, b"content", owner="a" * 40, password="secret")

        response = auth_client.put(
            f"/{paste_id}",
            headers={"X-Remove-Password": "true"},
        )
        assert response.status_code == 200
        data = response.get_json()
        assert data.get("password_protected") is not True

        # Verify no password required
        raw_response = auth_client.get(f"/{paste_id}/raw")
        assert raw_response.status_code == 200

    def test_update_extend_expiry(self, app: Flask, auth_client: FlaskClient) -> None:
        """Owner can extend paste expiry."""
        with app.app_context():
            paste_id = create_paste(auth_client, b"content", owner="a" * 40, expires_in=3600)
            # Get original expiry
            info_response = auth_client.get(f"/{paste_id}")
            original_expiry = info_response.get_json().get("expires_at")

        # Without a baseline the comparison below would raise TypeError
        # instead of producing a readable test failure.
        assert original_expiry is not None

        response = auth_client.put(
            f"/{paste_id}",
            headers={"X-Extend-Expiry": "7200"},
        )
        assert response.status_code == 200
        data = response.get_json()

        # New expiry should be later
        assert data["expires_at"] > original_expiry

    def test_update_no_changes(self, app: Flask, auth_client: FlaskClient) -> None:
        """Update with no changes returns 400."""
        with app.app_context():
            paste_id = create_paste(auth_client, b"content", owner="a" * 40)

        response = auth_client.put(f"/{paste_id}")
        assert response.status_code == 400
        assert "No updates" in response.get_json().get("error", "")

    def test_update_not_found(self, auth_client: FlaskClient) -> None:
        """Update non-existent paste returns 404."""
        response = auth_client.put("/000000000000", data=b"content")
        assert response.status_code == 404

    def test_update_burn_after_read_forbidden(self, app: Flask, auth_client: FlaskClient) -> None:
        """Cannot update burn-after-read pastes."""
        with app.app_context():
            paste_id = create_paste(auth_client, b"content", owner="a" * 40, burn_after_read=True)

        response = auth_client.put(f"/{paste_id}", data=b"new content")
        assert response.status_code == 400
        assert "burn-after-read" in response.get_json().get("error", "").lower()
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Test: Delete Paste
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
class TestPasteDelete:
    """Exercise the DELETE /<id> endpoint."""

    def test_delete_requires_auth(self, app: Flask, client: FlaskClient) -> None:
        """A DELETE sent without a client fingerprint is answered with 401."""
        with app.app_context():
            target = create_paste(client, b"content")

        outcome = client.delete(f"/{target}")
        assert outcome.status_code == 401

    def test_delete_own_paste(self, app: Flask, auth_client: FlaskClient) -> None:
        """The owner's DELETE succeeds and the paste is gone afterwards."""
        with app.app_context():
            target = create_paste(auth_client, b"content", owner="a" * 40)

        outcome = auth_client.delete(f"/{target}")
        assert outcome.status_code == 200
        message = outcome.get_json().get("message", "")
        assert "deleted" in message.lower()

        # A follow-up lookup of the same ID must now report 404.
        lookup = auth_client.get(f"/{target}")
        assert lookup.status_code == 404

    def test_delete_requires_ownership(self, app: Flask, auth_client: FlaskClient) -> None:
        """Deleting a paste created under another fingerprint yields 403."""
        with app.app_context():
            foreign = create_paste(auth_client, b"content", owner="c" * 40)

        outcome = auth_client.delete(f"/{foreign}")
        assert outcome.status_code == 403

    def test_admin_delete_any(self, app: Flask, admin_client: FlaskClient) -> None:
        """The admin client may delete a paste it does not own."""
        with app.app_context():
            foreign = create_paste(admin_client, b"content", owner="c" * 40)

        outcome = admin_client.delete(f"/{foreign}")
        assert outcome.status_code == 200
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Test: Edge Cases
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
class TestPasteManagementEdgeCases:
    """Cover malformed input and other error-handling paths."""

    def test_invalid_paste_id_format(self, auth_client: FlaskClient) -> None:
        """A paste ID containing an illegal character is rejected with 400."""
        outcome = auth_client.get("/invalid!")
        assert outcome.status_code == 400

    def test_list_invalid_pagination(self, auth_client: FlaskClient) -> None:
        """Unusable ``limit``/``offset`` values fall back to the defaults."""
        outcome = auth_client.get("/pastes?limit=invalid&offset=-5")
        assert outcome.status_code == 200
        listing = outcome.get_json()
        # Defaults expected by this suite: limit 50, offset 0.
        assert (listing["limit"], listing["offset"]) == (50, 0)

    def test_search_invalid_timestamp(self, auth_client: FlaskClient) -> None:
        """A non-numeric ``after`` value does not cause an error response."""
        outcome = auth_client.get("/pastes?after=notanumber")
        # Should treat as 0 (no filter)
        assert outcome.status_code == 200

    def test_update_invalid_expiry(self, app: Flask, auth_client: FlaskClient) -> None:
        """A non-numeric ``X-Extend-Expiry`` header is rejected with 400."""
        with app.app_context():
            target = create_paste(auth_client, b"content", owner="a" * 40)

        bad_headers = {"X-Extend-Expiry": "notanumber"}
        outcome = auth_client.put(f"/{target}", headers=bad_headers)
        assert outcome.status_code == 400
|