add security testing suite and update docs
Some checks failed
CI / Lint & Format (push) Failing after 15s
CI / Unit Tests (push) Has been skipped
CI / Memory Leak Check (push) Has been skipped
CI / SBOM Generation (push) Has been skipped
CI / Security Scan (push) Successful in 19s
CI / Security Tests (push) Has been skipped
- tests/security/pentest_session.py: comprehensive 10-phase pentest - tests/security/profiled_server.py: cProfile-enabled server - tests/security/cli_security_audit.py: CLI security checks - tests/security/dos_memory_test.py: memory exhaustion tests - tests/security/race_condition_test.py: concurrency tests - docs: add pentest results, profiling analysis, new test commands
This commit is contained in:
258
tests/security/dos_memory_test.py
Normal file
258
tests/security/dos_memory_test.py
Normal file
@@ -0,0 +1,258 @@
|
||||
#!/usr/bin/env python3
|
||||
"""DoS memory exhaustion tests for FlaskPaste."""
|
||||
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
|
||||
sys.path.insert(0, ".")
|
||||
|
||||
from app import create_app
|
||||
|
||||
|
||||
def test_antiflood_memory():
    """Test that the anti-flood request list stays bounded.

    Records 2x the configured maximum number of requests and verifies the
    internal list is trimmed rather than growing without limit.

    Returns:
        bool: True when the list stays within ANTIFLOOD_MAX_ENTRIES.
    """
    print("\n[1] Anti-Flood List Growth")
    print("=" * 50)

    app = create_app("testing")

    # Import the antiflood internals lazily, after the app exists.
    from app.api.routes import (
        _antiflood_requests,
        record_antiflood_request,
        reset_antiflood,
    )

    with app.app_context():
        reset_antiflood()

        max_entries = app.config.get("ANTIFLOOD_MAX_ENTRIES", 10000)
        print(f" Max entries config: {max_entries}")

        # Simulate 20000 requests (2x max_entries) to force trimming.
        for _ in range(20000):
            record_antiflood_request()

        list_size = len(_antiflood_requests)
        print(f" After 20000 requests: {list_size} entries")

        # Fix: the original had an unreachable "Unexpected list size" branch —
        # once `list_size > max_entries` is false, `list_size <= max_entries`
        # is necessarily true. Collapse to a single bounded/unbounded check.
        # Also reset on the FAIL path for consistency with the other tests.
        if list_size > max_entries:
            print(f" FAIL: List grew beyond max ({list_size} > {max_entries})")
            reset_antiflood()
            return False

        print(f" PASS: List properly bounded ({list_size} <= {max_entries})")
        reset_antiflood()
        return True
|
||||
|
||||
|
||||
def test_rate_limit_memory():
    """Verify the per-IP rate-limit dict is trimmed instead of growing forever.

    Returns:
        bool: True when the dict stays within RATE_LIMIT_MAX_ENTRIES.
    """
    print("\n[2] Rate Limit Dict Growth (per-IP)")
    print("=" * 50)

    app = create_app("testing")

    from app.api.routes import (
        _rate_limit_requests,
        check_rate_limit,
        reset_rate_limits,
    )

    with app.app_context():
        reset_rate_limits()

        max_entries = app.config.get("RATE_LIMIT_MAX_ENTRIES", 10000)
        print(f" Max entries config: {max_entries}")

        # Hammer the limiter with 15000 distinct client addresses.
        for i in range(15000):
            check_rate_limit(f"192.168.{i // 256}.{i % 256}", authenticated=False)

        dict_size = len(_rate_limit_requests)
        print(f" After 15000 unique IPs: {dict_size} entries")

        bounded = dict_size <= max_entries
        if bounded:
            print(f" PASS: Dict properly bounded ({dict_size} <= {max_entries})")
        else:
            print(f" FAIL: Dict grew beyond max ({dict_size} > {max_entries})")
        reset_rate_limits()
        return bounded
|
||||
|
||||
|
||||
def test_lookup_rate_limit_memory():
    """Probe the lookup rate-limit dict for unbounded per-IP growth.

    Returns:
        bool: True when a max-entries cap exists and is respected.
    """
    print("\n[3] Lookup Rate Limit Dict Growth (per-IP)")
    print("=" * 50)

    app = create_app("testing")

    from app.api.routes import (
        _lookup_rate_limit_requests,
        check_lookup_rate_limit,
        reset_lookup_rate_limits,
    )

    with app.app_context():
        reset_lookup_rate_limits()

        # Drive the limiter with 15000 distinct addresses in 10.x.y.z space.
        for i in range(15000):
            check_lookup_rate_limit(f"10.{i // 65536}.{(i // 256) % 256}.{i % 256}")

        dict_size = len(_lookup_rate_limit_requests)
        print(f" After 15000 unique IPs: {dict_size} entries")

        # A missing cap is itself the vulnerability being tested for.
        max_entries = app.config.get("LOOKUP_RATE_LIMIT_MAX_ENTRIES", None)

        if not max_entries:
            print(" WARN: No max entries limit configured!")
            print(f" Dict has {dict_size} entries and could grow unbounded")
            print(" FAIL: Memory exhaustion vulnerability")
            reset_lookup_rate_limits()
            return False

        if dict_size > max_entries:
            print(f" FAIL: Dict grew beyond max ({dict_size} > {max_entries})")
            reset_lookup_rate_limits()
            return False

        print(f" PASS: Dict properly bounded ({dict_size} <= {max_entries})")
        reset_lookup_rate_limits()
        return True
|
||||
|
||||
|
||||
def test_dedup_memory():
    """Confirm content deduplication state cannot exhaust process memory.

    Dedup entries live in the SQLite database rather than in-process, so the
    check here is informational: report the configured window/cap.

    Returns:
        bool: Always True (database-backed storage with expiry).
    """
    print("\n[4] Content Dedup Growth")
    print("=" * 50)

    app = create_app("testing")

    # Inspect the relevant config inside an app context; no in-memory
    # structure exists to measure for this subsystem.
    with app.app_context():
        dedup_window = app.config.get("DEDUP_WINDOW", 3600)
        max_entries = app.config.get("DEDUP_MAX_ENTRIES", None)

        print(f" Dedup window: {dedup_window}s")
        if not max_entries:
            print(" NOTE: Dedup is stored in database (SQLite)")
            print(" Entries expire after window elapses")
            print(" Mitigated by PoW requirement for creation")
        else:
            print(f" Max entries config: {max_entries}")

    print(" PASS: Database-backed with expiry")
    return True
|
||||
|
||||
|
||||
def test_concurrent_memory_pressure():
    """Exercise the per-IP rate limiter from many threads at once.

    Ten threads each submit 1000 unique IPs; the test fails on any raised
    exception or if the tracking dict exceeds its configured maximum.

    Returns:
        bool: True when concurrent access stays error-free and bounded.
    """
    print("\n[5] Concurrent Memory Pressure")
    print("=" * 50)

    app = create_app("testing")

    from app.api.routes import (
        _rate_limit_requests,
        check_rate_limit,
        reset_rate_limits,
    )

    with app.app_context():
        reset_rate_limits()
        errors = []

        def worker(thread_id: int):
            # Threads do not inherit the outer app context; push a fresh one.
            with app.app_context():
                try:
                    for n in range(1000):
                        check_rate_limit(
                            f"172.{thread_id}.{n // 256}.{n % 256}",
                            authenticated=False,
                        )
                except Exception as exc:
                    errors.append(str(exc))

        pool = [threading.Thread(target=worker, args=(t,)) for t in range(10)]

        start = time.time()
        for t in pool:
            t.start()
        for t in pool:
            t.join()
        elapsed = time.time() - start

        dict_size = len(_rate_limit_requests)
        max_entries = app.config.get("RATE_LIMIT_MAX_ENTRIES", 10000)

        print(" 10 threads x 1000 IPs = 10000 unique IPs")
        print(f" Elapsed: {elapsed:.2f}s")
        print(f" Final dict size: {dict_size}")
        print(f" Errors: {len(errors)}")

        reset_rate_limits()

        if errors:
            print(" FAIL: Errors during concurrent access")
            for e in errors[:5]:
                print(f" {e}")
            return False

        if dict_size > max_entries:
            print(" FAIL: Dict exceeded max under concurrency")
            return False

        print(" PASS: Concurrent access handled correctly")
        return True
|
||||
|
||||
|
||||
def main():
    """Run every DoS memory test and print a pass/fail summary.

    Returns:
        int: 0 when all checks pass, 1 otherwise (used as exit status).
    """
    print("=" * 60)
    print("DoS MEMORY EXHAUSTION TESTS")
    print("=" * 60)

    # Table-driven: (label, test callable), executed in order.
    checks = [
        ("Anti-Flood List Growth", test_antiflood_memory),
        ("Rate Limit Dict Growth", test_rate_limit_memory),
        ("Lookup Rate Limit Growth", test_lookup_rate_limit_memory),
        ("Content Dedup Growth", test_dedup_memory),
        ("Concurrent Memory Pressure", test_concurrent_memory_pressure),
    ]
    results = [(name, fn()) for name, fn in checks]

    print("\n" + "=" * 60)
    print("SUMMARY")
    print("=" * 60)

    total = len(results)
    passed = sum(1 for _, ok in results if ok)

    for name, ok in results:
        print(f" {'PASS' if ok else 'FAIL'}: {name}")

    print(f"\n{passed}/{total} checks passed")

    # List every failed check as a vulnerability finding.
    if passed < total:
        print("\nVULNERABILITIES:")
        for name, ok in results:
            if not ok:
                print(f" - {name}")

    return 0 if passed == total else 1
|
||||
|
||||
|
||||
# Propagate main()'s status as the process exit code so CI detects failures.
if __name__ == "__main__":
    sys.exit(main())
|
||||
Reference in New Issue
Block a user