Files
flaskpaste/tests/test_paste_management.py
2025-12-25 00:19:21 +01:00

451 lines
18 KiB
Python

"""Tests for paste management API endpoints (list, search, update)."""
from __future__ import annotations
import time
from typing import TYPE_CHECKING
import pytest
from app import create_app
from app.database import get_db
if TYPE_CHECKING:
from flask import Flask
from flask.testing import FlaskClient
# ─────────────────────────────────────────────────────────────────────────────
# Fixtures
# ─────────────────────────────────────────────────────────────────────────────
@pytest.fixture
def app() -> Flask:
"""Create application configured for testing."""
app = create_app("testing")
app.config["POW_DIFFICULTY"] = 0 # Disable PoW for tests
app.config["RATE_LIMIT_ENABLED"] = False
return app
@pytest.fixture
def client(app: Flask) -> FlaskClient:
"""Create test client."""
return app.test_client()
@pytest.fixture
def auth_client(app: Flask) -> FlaskClient:
"""Create test client with authentication headers."""
client = app.test_client()
# Simulate mTLS client certificate fingerprint
client.environ_base["HTTP_X_SSL_CLIENT_SHA1"] = "a" * 40
return client
@pytest.fixture
def admin_client(app: Flask) -> FlaskClient:
"""Create test client with admin authentication."""
client = app.test_client()
fingerprint = "b" * 40
client.environ_base["HTTP_X_SSL_CLIENT_SHA1"] = fingerprint
# Mark as admin in PKI
with app.app_context():
app.config["PKI_ENABLED"] = True
db = get_db()
now = int(time.time())
# Create CA first (required by foreign key)
db.execute(
"""INSERT OR REPLACE INTO certificate_authority
(id, common_name, certificate_pem, private_key_encrypted,
key_salt, created_at, expires_at, key_algorithm)
VALUES ('default', 'Test CA', 'CERT', X'00', X'00', ?, ?, 'rsa')""",
(now, now + 86400 * 365),
)
db.execute(
"""INSERT OR REPLACE INTO issued_certificates
(serial, ca_id, common_name, fingerprint_sha1, certificate_pem,
created_at, expires_at, status, is_admin)
VALUES (?, 'default', 'admin', ?, 'CERT', ?, ?, 'valid', 1)""",
("admin001", fingerprint, now, now + 86400),
)
db.commit()
return client
def create_paste(
client: FlaskClient,
content: bytes = b"test content",
owner: str | None = None,
burn_after_read: bool = False,
password: str | None = None,
expires_in: int | None = None,
) -> str:
"""Helper to create a paste and return its ID."""
headers = {"Content-Type": "text/plain"}
if owner:
headers["X-SSL-Client-SHA1"] = owner
if burn_after_read:
headers["X-Burn-After-Read"] = "true"
if password:
headers["X-Paste-Password"] = password
if expires_in:
headers["X-Expiry"] = str(expires_in)
response = client.post("/", data=content, headers=headers)
assert response.status_code == 201
data = response.get_json()
assert data is not None
return str(data["id"])
# ─────────────────────────────────────────────────────────────────────────────
# Test: List Pastes
# ─────────────────────────────────────────────────────────────────────────────
class TestPastesList:
"""Test GET /pastes endpoint."""
def test_list_requires_auth(self, client: FlaskClient) -> None:
"""Listing pastes requires authentication."""
response = client.get("/pastes")
assert response.status_code == 401
def test_list_own_pastes(self, app: Flask, auth_client: FlaskClient) -> None:
"""Authenticated users can list their own pastes."""
with app.app_context():
# Create pastes owned by authenticated user
paste1 = create_paste(auth_client, b"paste 1", owner="a" * 40)
paste2 = create_paste(auth_client, b"paste 2", owner="a" * 40)
# Create paste owned by different user
create_paste(auth_client, b"other paste", owner="c" * 40)
response = auth_client.get("/pastes")
assert response.status_code == 200
data = response.get_json()
assert "pastes" in data
assert data["count"] >= 2
paste_ids = [p["id"] for p in data["pastes"]]
assert paste1 in paste_ids
assert paste2 in paste_ids
def test_list_pagination(self, app: Flask, auth_client: FlaskClient) -> None:
"""List endpoint supports pagination."""
with app.app_context():
for i in range(5):
create_paste(auth_client, f"paste {i}".encode(), owner="a" * 40)
# First page
response = auth_client.get("/pastes?limit=2&offset=0")
assert response.status_code == 200
data = response.get_json()
assert data["count"] == 2
assert data["limit"] == 2
assert data["offset"] == 0
# Second page
response = auth_client.get("/pastes?limit=2&offset=2")
data = response.get_json()
assert data["count"] == 2
assert data["offset"] == 2
def test_list_returns_metadata_only(self, app: Flask, auth_client: FlaskClient) -> None:
"""List should return metadata, not content."""
with app.app_context():
create_paste(auth_client, b"secret content", owner="a" * 40)
response = auth_client.get("/pastes")
data = response.get_json()
for paste in data["pastes"]:
assert "id" in paste
assert "mime_type" in paste
assert "size" in paste
assert "created_at" in paste
assert "content" not in paste
def test_admin_list_all(self, app: Flask, admin_client: FlaskClient) -> None:
"""Admin can list all pastes with ?all=1."""
with app.app_context():
create_paste(admin_client, b"user paste", owner="c" * 40)
create_paste(admin_client, b"admin paste", owner="b" * 40)
response = admin_client.get("/pastes?all=1")
assert response.status_code == 200
data = response.get_json()
assert data.get("is_admin") is True
# Should include pastes from different owners
assert data["total"] >= 2
# ─────────────────────────────────────────────────────────────────────────────
# Test: Search Pastes
# ─────────────────────────────────────────────────────────────────────────────
class TestPastesSearch:
"""Test search functionality on GET /pastes."""
def test_search_by_type(self, app: Flask, auth_client: FlaskClient) -> None:
"""Search can filter by MIME type glob pattern."""
with app.app_context():
# Create pastes with different types
# text/plain
create_paste(auth_client, b"text content", owner="a" * 40)
# Will be detected as application/octet-stream (binary)
create_paste(auth_client, b"\x89PNG\r\n\x1a\nimage", owner="a" * 40)
response = auth_client.get("/pastes?type=text/*")
assert response.status_code == 200
data = response.get_json()
for paste in data["pastes"]:
assert paste["mime_type"].startswith("text/")
def test_search_by_date_after(self, app: Flask, auth_client: FlaskClient) -> None:
"""Search can filter by created_at >= timestamp."""
with app.app_context():
create_paste(auth_client, b"paste", owner="a" * 40)
# Search for pastes created in the last hour
one_hour_ago = int(time.time()) - 3600
response = auth_client.get(f"/pastes?after={one_hour_ago}")
assert response.status_code == 200
data = response.get_json()
for paste in data["pastes"]:
assert paste["created_at"] >= one_hour_ago
def test_search_by_date_before(self, app: Flask, auth_client: FlaskClient) -> None:
"""Search can filter by created_at <= timestamp."""
with app.app_context():
create_paste(auth_client, b"paste", owner="a" * 40)
future = int(time.time()) + 3600
response = auth_client.get(f"/pastes?before={future}")
assert response.status_code == 200
data = response.get_json()
for paste in data["pastes"]:
assert paste["created_at"] <= future
def test_search_combined_filters(self, app: Flask, auth_client: FlaskClient) -> None:
"""Multiple filters can be combined."""
with app.app_context():
create_paste(auth_client, b"test", owner="a" * 40)
now = int(time.time())
response = auth_client.get(
f"/pastes?type=text/*&after={now - 60}&before={now + 60}&limit=10"
)
assert response.status_code == 200
# ─────────────────────────────────────────────────────────────────────────────
# Test: Update Paste
# ─────────────────────────────────────────────────────────────────────────────
class TestPasteUpdate:
"""Test PUT /<id> endpoint."""
def test_update_requires_auth(self, app: Flask, client: FlaskClient) -> None:
"""Update requires authentication."""
with app.app_context():
paste_id = create_paste(client, b"content")
response = client.put(f"/{paste_id}", data=b"new content")
assert response.status_code == 401
def test_update_requires_ownership(self, app: Flask, auth_client: FlaskClient) -> None:
"""Update requires paste ownership."""
with app.app_context():
# Create paste owned by different user
paste_id = create_paste(auth_client, b"content", owner="c" * 40)
response = auth_client.put(f"/{paste_id}", data=b"new content")
assert response.status_code == 403
def test_update_content(self, app: Flask, auth_client: FlaskClient) -> None:
"""Owner can update paste content."""
with app.app_context():
paste_id = create_paste(auth_client, b"original", owner="a" * 40)
response = auth_client.put(
f"/{paste_id}",
data=b"updated content",
content_type="text/plain",
)
assert response.status_code == 200
data = response.get_json()
assert data["id"] == paste_id
assert data["size"] == len(b"updated content")
# Verify content changed
raw_response = auth_client.get(f"/{paste_id}/raw")
assert raw_response.data == b"updated content"
def test_update_password_set(self, app: Flask, auth_client: FlaskClient) -> None:
"""Owner can set password on paste."""
with app.app_context():
paste_id = create_paste(auth_client, b"content", owner="a" * 40)
response = auth_client.put(
f"/{paste_id}",
headers={"X-Paste-Password": "newpass"},
)
assert response.status_code == 200
data = response.get_json()
assert data.get("password_protected") is True
# Verify password required
raw_response = auth_client.get(f"/{paste_id}/raw")
assert raw_response.status_code == 401
def test_update_password_remove(self, app: Flask, auth_client: FlaskClient) -> None:
"""Owner can remove password from paste."""
with app.app_context():
paste_id = create_paste(auth_client, b"content", owner="a" * 40, password="secret")
response = auth_client.put(
f"/{paste_id}",
headers={"X-Remove-Password": "true"},
)
assert response.status_code == 200
data = response.get_json()
assert data.get("password_protected") is not True
# Verify no password required
raw_response = auth_client.get(f"/{paste_id}/raw")
assert raw_response.status_code == 200
def test_update_extend_expiry(self, app: Flask, auth_client: FlaskClient) -> None:
"""Owner can extend paste expiry."""
with app.app_context():
paste_id = create_paste(auth_client, b"content", owner="a" * 40, expires_in=3600)
# Get original expiry
info_response = auth_client.get(f"/{paste_id}")
original_expiry = info_response.get_json().get("expires_at")
response = auth_client.put(
f"/{paste_id}",
headers={"X-Extend-Expiry": "7200"},
)
assert response.status_code == 200
data = response.get_json()
# New expiry should be later
assert data["expires_at"] > original_expiry
def test_update_no_changes(self, app: Flask, auth_client: FlaskClient) -> None:
"""Update with no changes returns 400."""
with app.app_context():
paste_id = create_paste(auth_client, b"content", owner="a" * 40)
response = auth_client.put(f"/{paste_id}")
assert response.status_code == 400
assert "No updates" in response.get_json().get("error", "")
def test_update_not_found(self, auth_client: FlaskClient) -> None:
"""Update non-existent paste returns 404."""
response = auth_client.put("/000000000000", data=b"content")
assert response.status_code == 404
def test_update_burn_after_read_forbidden(self, app: Flask, auth_client: FlaskClient) -> None:
"""Cannot update burn-after-read pastes."""
with app.app_context():
paste_id = create_paste(auth_client, b"content", owner="a" * 40, burn_after_read=True)
response = auth_client.put(f"/{paste_id}", data=b"new content")
assert response.status_code == 400
assert "burn-after-read" in response.get_json().get("error", "").lower()
# ─────────────────────────────────────────────────────────────────────────────
# Test: Delete Paste
# ─────────────────────────────────────────────────────────────────────────────
class TestPasteDelete:
"""Test DELETE /<id> endpoint."""
def test_delete_requires_auth(self, app: Flask, client: FlaskClient) -> None:
"""Delete requires authentication."""
with app.app_context():
paste_id = create_paste(client, b"content")
response = client.delete(f"/{paste_id}")
assert response.status_code == 401
def test_delete_own_paste(self, app: Flask, auth_client: FlaskClient) -> None:
"""Owner can delete their paste."""
with app.app_context():
paste_id = create_paste(auth_client, b"content", owner="a" * 40)
response = auth_client.delete(f"/{paste_id}")
assert response.status_code == 200
assert "deleted" in response.get_json().get("message", "").lower()
# Verify deleted
get_response = auth_client.get(f"/{paste_id}")
assert get_response.status_code == 404
def test_delete_requires_ownership(self, app: Flask, auth_client: FlaskClient) -> None:
"""Cannot delete paste owned by others."""
with app.app_context():
paste_id = create_paste(auth_client, b"content", owner="c" * 40)
response = auth_client.delete(f"/{paste_id}")
assert response.status_code == 403
def test_admin_delete_any(self, app: Flask, admin_client: FlaskClient) -> None:
"""Admin can delete any paste."""
with app.app_context():
paste_id = create_paste(admin_client, b"content", owner="c" * 40)
response = admin_client.delete(f"/{paste_id}")
assert response.status_code == 200
# ─────────────────────────────────────────────────────────────────────────────
# Test: Edge Cases
# ─────────────────────────────────────────────────────────────────────────────
class TestPasteManagementEdgeCases:
"""Test edge cases and error handling."""
def test_invalid_paste_id_format(self, auth_client: FlaskClient) -> None:
"""Invalid paste ID format returns 400."""
response = auth_client.get("/invalid!")
assert response.status_code == 400
def test_list_invalid_pagination(self, auth_client: FlaskClient) -> None:
"""Invalid pagination values are handled gracefully."""
response = auth_client.get("/pastes?limit=invalid&offset=-5")
assert response.status_code == 200
data = response.get_json()
# Should use defaults
assert data["limit"] == 50
assert data["offset"] == 0
def test_search_invalid_timestamp(self, auth_client: FlaskClient) -> None:
"""Invalid timestamp values are handled gracefully."""
response = auth_client.get("/pastes?after=notanumber")
assert response.status_code == 200
# Should treat as 0 (no filter)
def test_update_invalid_expiry(self, app: Flask, auth_client: FlaskClient) -> None:
"""Invalid X-Extend-Expiry returns 400."""
with app.app_context():
paste_id = create_paste(auth_client, b"content", owner="a" * 40)
response = auth_client.put(
f"/{paste_id}",
headers={"X-Extend-Expiry": "notanumber"},
)
assert response.status_code == 400