Files
flaskpaste/tests/security/dos_memory_test.py
Username 3be2fd6cf6
Some checks failed
CI / Lint & Format (push) Successful in 22s
CI / Security Scan (push) Successful in 21s
CI / Unit Tests (push) Failing after 17s
CI / Advanced Security Tests (push) Failing after 14s
CI / Memory Leak Check (push) Successful in 20s
CI / Security Tests (push) Successful in 25s
CI / Build & Push Image (push) Has been skipped
CI / SBOM Generation (push) Successful in 20s
tests: fix mypy type errors in security tests
2026-01-18 10:18:09 +01:00

259 lines
7.6 KiB
Python

#!/usr/bin/env python3
"""DoS memory exhaustion tests for FlaskPaste."""
import sys
import threading
import time
sys.path.insert(0, ".")
from app import create_app
def test_antiflood_memory():
    """Check that the anti-flood request list stays bounded under load.

    Records at least twice ``ANTIFLOOD_MAX_ENTRIES`` requests (and never
    fewer than 20000) and verifies that the tracking list does not end up
    larger than the configured cap. The anti-flood state is reset before the
    run and again afterwards, whatever the outcome.

    Returns:
        bool: True when the list size is within the configured maximum,
        False when it grew beyond it.
    """
    print("\n[1] Anti-Flood List Growth")
    print("=" * 50)

    app = create_app("testing")

    # Import the antiflood internals
    from app.api.routes import (
        _antiflood_requests,
        record_antiflood_request,
        reset_antiflood,
    )

    with app.app_context():
        reset_antiflood()
        try:
            max_entries = app.config.get("ANTIFLOOD_MAX_ENTRIES", 10000)
            print(f" Max entries config: {max_entries}")

            # Always push past the cap: 2x the configured maximum, but never
            # less than the 20000 requests this check has always issued.
            request_count = max(20000, 2 * max_entries)
            for _ in range(request_count):
                record_antiflood_request()

            # NOTE(review): this measures the list object bound at import
            # time; if the routes module rebinds the name while trimming,
            # the size read here would be stale -- confirm.
            list_size = len(_antiflood_requests)
            print(f" After {request_count} requests: {list_size} entries")

            # The only hard requirement is that the cap is never exceeded;
            # how far below the cap the list is trimmed is not asserted.
            if list_size > max_entries:
                print(f" FAIL: List grew beyond max ({list_size} > {max_entries})")
                return False

            print(f" PASS: List properly bounded ({list_size} <= {max_entries})")
            return True
        finally:
            # Leave no state behind for later checks, on PASS and FAIL alike.
            reset_antiflood()
def test_rate_limit_memory():
    """Drive the per-IP rate limiter with many distinct client addresses.

    Sends 15000 unique IPs through ``check_rate_limit`` and compares the size
    of the tracking dict against ``RATE_LIMIT_MAX_ENTRIES`` (default 10000).
    Rate-limit state is reset before the run and before returning.

    Returns:
        bool: True when the dict size is within the limit, False otherwise.
    """
    print("\n[2] Rate Limit Dict Growth (per-IP)")
    print("=" * 50)

    flask_app = create_app("testing")

    from app.api.routes import (
        _rate_limit_requests,
        check_rate_limit,
        reset_rate_limits,
    )

    with flask_app.app_context():
        reset_rate_limits()

        limit = flask_app.config.get("RATE_LIMIT_MAX_ENTRIES", 10000)
        print(f" Max entries config: {limit}")

        # 15000 distinct addresses: 192.168.0.0 through 192.168.58.151
        for n in range(15000):
            third, fourth = divmod(n, 256)
            check_rate_limit(f"192.168.{third}.{fourth}", authenticated=False)

        tracked = len(_rate_limit_requests)
        print(f" After 15000 unique IPs: {tracked} entries")

        overflowed = tracked > limit
        if overflowed:
            print(f" FAIL: Dict grew beyond max ({tracked} > {limit})")
        else:
            print(f" PASS: Dict properly bounded ({tracked} <= {limit})")

        # Both outcomes clear the limiter state before reporting.
        reset_rate_limits()
        return not overflowed
def test_lookup_rate_limit_memory():
    """Drive the lookup rate limiter with many distinct client addresses.

    Sends 15000 unique IPs through ``check_lookup_rate_limit`` and then
    inspects the tracking dict. The check fails when no (truthy)
    ``LOOKUP_RATE_LIMIT_MAX_ENTRIES`` is configured, or when the dict is
    larger than that limit. Lookup rate-limit state is reset before the run
    and before returning.

    Returns:
        bool: True when a limit is configured and respected, False otherwise.
    """
    print("\n[3] Lookup Rate Limit Dict Growth (per-IP)")
    print("=" * 50)

    flask_app = create_app("testing")

    from app.api.routes import (
        _lookup_rate_limit_requests,
        check_lookup_rate_limit,
        reset_lookup_rate_limits,
    )

    with flask_app.app_context():
        reset_lookup_rate_limits()

        # 15000 distinct addresses spread across the 10.0.0.0/8 range
        for n in range(15000):
            second, remainder = divmod(n, 65536)
            third, fourth = divmod(remainder, 256)
            check_lookup_rate_limit(f"10.{second}.{third}.{fourth}")

        tracked = len(_lookup_rate_limit_requests)
        print(f" After 15000 unique IPs: {tracked} entries")

        # A missing (or falsy) limit is itself reported as a failure.
        limit = flask_app.config.get("LOOKUP_RATE_LIMIT_MAX_ENTRIES")
        if not limit:
            print(" WARN: No max entries limit configured!")
            print(f" Dict has {tracked} entries and could grow unbounded")
            print(" FAIL: Memory exhaustion vulnerability")
            verdict = False
        elif tracked > limit:
            print(f" FAIL: Dict grew beyond max ({tracked} > {limit})")
            verdict = False
        else:
            print(f" PASS: Dict properly bounded ({tracked} <= {limit})")
            verdict = True

        reset_lookup_rate_limits()
        return verdict
def test_dedup_memory():
    """Report the content-dedup configuration.

    This check generates no load: it prints ``DEDUP_WINDOW`` (default 3600)
    and either the configured ``DEDUP_MAX_ENTRIES`` or an explanatory note,
    then reports PASS.

    Returns:
        bool: Always True.
    """
    print("\n[4] Content Dedup Growth")
    print("=" * 50)

    flask_app = create_app("testing")

    # Per the original author's note, content-hash dedup lives in the
    # database rather than in process memory, so only the configuration is
    # inspected here.
    with flask_app.app_context():
        settings = flask_app.config
        limit = settings.get("DEDUP_MAX_ENTRIES")
        window = settings.get("DEDUP_WINDOW", 3600)
        print(f" Dedup window: {window}s")

        if limit:
            report = [f" Max entries config: {limit}"]
        else:
            report = [
                " NOTE: Dedup is stored in database (SQLite)",
                " Entries expire after window elapses",
                " Mitigated by PoW requirement for creation",
            ]
        for line in report:
            print(line)

        print(" PASS: Database-backed with expiry")
        return True
def test_concurrent_memory_pressure():
    """Hit the per-IP rate limiter from several threads at once.

    Starts 10 threads that each send 1000 unique IPs through
    ``check_rate_limit``, then checks that no thread raised and that the
    tracking dict is not larger than ``RATE_LIMIT_MAX_ENTRIES`` (default
    10000). Rate-limit state is reset before the run and before the verdict.

    Returns:
        bool: True when no errors occurred and the dict stayed within the
        limit, False otherwise.
    """
    print("\n[5] Concurrent Memory Pressure")
    print("=" * 50)

    app = create_app("testing")

    from app.api.routes import (
        _rate_limit_requests,
        check_rate_limit,
        reset_rate_limits,
    )

    with app.app_context():
        reset_rate_limits()
        errors = []

        def make_requests(thread_id: int) -> None:
            """Send 1000 unique 172.<thread_id>.x.y addresses to the limiter."""
            # Each thread needs its own app context
            with app.app_context():
                try:
                    for i in range(1000):
                        ip = f"172.{thread_id}.{i // 256}.{i % 256}"
                        check_rate_limit(ip, authenticated=False)
                except Exception as e:
                    # Thread boundary: record instead of losing the failure.
                    # Keep the exception type so that errors with an empty
                    # message are still identifiable in the report.
                    errors.append(f"{type(e).__name__}: {e}")

        threads = [threading.Thread(target=make_requests, args=(t,)) for t in range(10)]

        # perf_counter is monotonic, so the elapsed time cannot be skewed by
        # wall-clock adjustments while the threads run.
        start = time.perf_counter()
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        elapsed = time.perf_counter() - start

        dict_size = len(_rate_limit_requests)
        max_entries = app.config.get("RATE_LIMIT_MAX_ENTRIES", 10000)
        # NOTE(review): with the default limit of 10000 the 10000 unique IPs
        # sent here can never exceed it, so the bound check below only bites
        # when the testing config sets a lower limit -- confirm intent.

        print(" 10 threads x 1000 IPs = 10000 unique IPs")
        print(f" Elapsed: {elapsed:.2f}s")
        print(f" Final dict size: {dict_size}")
        print(f" Errors: {len(errors)}")

        reset_rate_limits()

        if errors:
            print(" FAIL: Errors during concurrent access")
            for e in errors[:5]:
                print(f" {e}")
            return False

        if dict_size > max_entries:
            print(" FAIL: Dict exceeded max under concurrency")
            return False

        print(" PASS: Concurrent access handled correctly")
        return True
def main():
    """Run every memory-exhaustion check in order and print a summary.

    Each check is called once, in a fixed order; its boolean result is then
    listed as PASS or FAIL, followed by the names of any failing checks.

    Returns:
        int: 0 when every check passed, 1 otherwise.
    """
    banner = "=" * 60
    print(banner)
    print("DoS MEMORY EXHAUSTION TESTS")
    print(banner)

    checks = (
        ("Anti-Flood List Growth", test_antiflood_memory),
        ("Rate Limit Dict Growth", test_rate_limit_memory),
        ("Lookup Rate Limit Growth", test_lookup_rate_limit_memory),
        ("Content Dedup Growth", test_dedup_memory),
        ("Concurrent Memory Pressure", test_concurrent_memory_pressure),
    )
    # Run sequentially so each check's output appears in listing order.
    results = [(label, check()) for label, check in checks]

    print("\n" + banner)
    print("SUMMARY")
    print(banner)

    for label, ok in results:
        status = "PASS" if ok else "FAIL"
        print(f" {status}: {label}")

    failed = [label for label, ok in results if not ok]
    total = len(results)
    passed = total - len(failed)
    print(f"\n{passed}/{total} checks passed")

    # Report vulnerabilities
    if failed:
        print("\nVULNERABILITIES:")
        for label in failed:
            print(f" - {label}")

    return 1 if failed else 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)