171 lines
5.4 KiB
Python
171 lines
5.4 KiB
Python
"""Tests for scheduled cleanup functionality."""
|
|
|
|
import time
|
|
|
|
|
|
class TestScheduledCleanup:
|
|
"""Tests for scheduled cleanup in before_request hook."""
|
|
|
|
def test_cleanup_times_reset(self, app, client):
|
|
"""Verify cleanup times can be reset for testing."""
|
|
from app.api import _cleanup_times, reset_cleanup_times
|
|
|
|
# Set some cleanup times
|
|
with app.app_context():
|
|
reset_cleanup_times()
|
|
for key in _cleanup_times:
|
|
assert _cleanup_times[key] == 0
|
|
|
|
def test_cleanup_runs_on_request(self, app, client, auth_header):
|
|
"""Verify cleanup runs during request handling."""
|
|
from app.api import _cleanup_times, reset_cleanup_times
|
|
|
|
with app.app_context():
|
|
reset_cleanup_times()
|
|
|
|
# Make a request to trigger cleanup
|
|
client.get("/health")
|
|
|
|
# Cleanup times should be set
|
|
with app.app_context():
|
|
for key in _cleanup_times:
|
|
assert _cleanup_times[key] > 0
|
|
|
|
def test_cleanup_respects_intervals(self, app, client):
|
|
"""Verify cleanup respects configured intervals."""
|
|
from app.api import _cleanup_times, reset_cleanup_times
|
|
|
|
with app.app_context():
|
|
reset_cleanup_times()
|
|
|
|
# First request triggers cleanup
|
|
client.get("/health")
|
|
|
|
with app.app_context():
|
|
first_times = {k: v for k, v in _cleanup_times.items()}
|
|
|
|
# Immediate second request should not reset times
|
|
client.get("/health")
|
|
|
|
with app.app_context():
|
|
for key in _cleanup_times:
|
|
assert _cleanup_times[key] == first_times[key]
|
|
|
|
def test_expired_paste_cleanup(self, app, client, auth_header):
|
|
"""Test that expired pastes are cleaned up."""
|
|
import json
|
|
|
|
from app.database import get_db
|
|
|
|
# Create paste with short expiry
|
|
response = client.post(
|
|
"/",
|
|
data="test content",
|
|
content_type="text/plain",
|
|
headers={**auth_header, "X-Expiry": "2"}, # 2 seconds
|
|
)
|
|
assert response.status_code == 201
|
|
paste_id = json.loads(response.data)["id"]
|
|
|
|
# Wait for expiry (extra buffer for CI timing)
|
|
time.sleep(3)
|
|
|
|
# Trigger cleanup via database function
|
|
with app.app_context():
|
|
from app.database import cleanup_expired_pastes
|
|
|
|
count = cleanup_expired_pastes()
|
|
# At least our paste should be cleaned up
|
|
assert count >= 1, f"Expected cleanup to remove at least 1 paste, got {count}"
|
|
|
|
# Verify paste is gone
|
|
db = get_db()
|
|
row = db.execute("SELECT id FROM pastes WHERE id = ?", (paste_id,)).fetchone()
|
|
assert row is None, f"Paste {paste_id} should have been deleted"
|
|
|
|
def test_rate_limit_cleanup(self, app, client):
|
|
"""Test that rate limit entries are cleaned up."""
|
|
from app.api.routes import (
|
|
_rate_limit_requests,
|
|
check_rate_limit,
|
|
cleanup_rate_limits,
|
|
reset_rate_limits,
|
|
)
|
|
|
|
with app.app_context():
|
|
reset_rate_limits()
|
|
|
|
# Add some rate limit entries
|
|
check_rate_limit("192.168.1.1", authenticated=False)
|
|
check_rate_limit("192.168.1.2", authenticated=False)
|
|
|
|
assert len(_rate_limit_requests) == 2
|
|
|
|
# Cleanup with very large window (nothing should be removed)
|
|
count = cleanup_rate_limits(window=3600)
|
|
assert count == 0
|
|
|
|
# Wait a bit and cleanup with tiny window
|
|
time.sleep(0.1)
|
|
count = cleanup_rate_limits(window=0) # Immediate cleanup
|
|
assert count == 2
|
|
assert len(_rate_limit_requests) == 0
|
|
|
|
|
|
class TestCleanupThreadSafety:
|
|
"""Tests for thread-safety of cleanup operations."""
|
|
|
|
def test_cleanup_lock_exists(self, app):
|
|
"""Verify cleanup lock exists."""
|
|
import threading
|
|
|
|
from app.api import _cleanup_lock
|
|
|
|
assert isinstance(_cleanup_lock, type(threading.Lock()))
|
|
|
|
def test_concurrent_cleanup_access(self, app, client):
|
|
"""Test that concurrent requests don't corrupt cleanup state."""
|
|
import threading
|
|
|
|
from app.api import reset_cleanup_times
|
|
|
|
with app.app_context():
|
|
reset_cleanup_times()
|
|
|
|
errors = []
|
|
results = []
|
|
|
|
def make_request():
|
|
try:
|
|
resp = client.get("/health")
|
|
results.append(resp.status_code)
|
|
except Exception as e:
|
|
errors.append(str(e))
|
|
|
|
# Simulate concurrent requests
|
|
threads = [threading.Thread(target=make_request) for _ in range(10)]
|
|
for t in threads:
|
|
t.start()
|
|
for t in threads:
|
|
t.join()
|
|
|
|
assert not errors
|
|
assert all(r == 200 for r in results)
|
|
|
|
|
|
class TestCleanupConfiguration:
|
|
"""Tests for cleanup configuration."""
|
|
|
|
def test_cleanup_intervals_configured(self, app):
|
|
"""Verify cleanup intervals are properly configured."""
|
|
from app.api import _CLEANUP_INTERVALS
|
|
|
|
assert "pastes" in _CLEANUP_INTERVALS
|
|
assert "hashes" in _CLEANUP_INTERVALS
|
|
assert "rate_limits" in _CLEANUP_INTERVALS
|
|
|
|
# Verify reasonable intervals
|
|
assert _CLEANUP_INTERVALS["pastes"] >= 60 # At least 1 minute
|
|
assert _CLEANUP_INTERVALS["hashes"] >= 60
|
|
assert _CLEANUP_INTERVALS["rate_limits"] >= 60
|