#!/usr/bin/env python3
"""DoS memory exhaustion tests for FlaskPaste.

Each ``test_*`` function exercises one of the request-tracking structures
imported from ``app.api.routes``, prints a human-readable report and returns
``True`` on pass or ``False`` on fail. ``main`` runs them all, prints a
summary and returns a process exit status.
"""

import sys
import threading
import time

# Put the current directory first on the import path so ``app`` resolves
# (presumably the script is meant to be run from the project root).
sys.path.insert(0, ".")

from app import create_app  # noqa: E402  (must follow the sys.path tweak)


def test_antiflood_memory():
    """Check that the anti-flood request list stays bounded.

    Records 20000 anti-flood requests and verifies that the tracking list is
    not larger than ``ANTIFLOOD_MAX_ENTRIES`` (default 10000) afterwards.

    Returns:
        bool: True if the list stayed within the limit, False otherwise.
    """
    print("\n[1] Anti-Flood List Growth")
    print("=" * 50)

    app = create_app("testing")

    # Import the antiflood internals
    from app.api.routes import (
        _antiflood_requests,
        record_antiflood_request,
        reset_antiflood,
    )

    with app.app_context():
        reset_antiflood()
        try:
            # Simulate 20000 requests (2x the default max_entries)
            max_entries = app.config.get("ANTIFLOOD_MAX_ENTRIES", 10000)
            print(f" Max entries config: {max_entries}")

            for _ in range(20000):
                record_antiflood_request()

            list_size = len(_antiflood_requests)
            print(f" After 20000 requests: {list_size} entries")

            # Only the upper bound is asserted; how far below the cap the
            # implementation trims (reportedly to max_entries/2) is not checked.
            if list_size > max_entries:
                print(f" FAIL: List grew beyond max ({list_size} > {max_entries})")
                return False

            print(f" PASS: List properly bounded ({list_size} <= {max_entries})")
            return True
        finally:
            # Always clear the shared list, including on failure or error,
            # so later checks do not inherit this check's entries.
            reset_antiflood()


def test_rate_limit_memory():
    """Check that the per-IP rate limit dict stays bounded with unique IPs.

    Issues unauthenticated rate-limit checks from 15000 distinct IP strings
    and verifies the tracking dict is not larger than
    ``RATE_LIMIT_MAX_ENTRIES`` (default 10000) afterwards.

    Returns:
        bool: True if the dict stayed within the limit, False otherwise.
    """
    print("\n[2] Rate Limit Dict Growth (per-IP)")
    print("=" * 50)

    app = create_app("testing")

    from app.api.routes import (
        _rate_limit_requests,
        check_rate_limit,
        reset_rate_limits,
    )

    with app.app_context():
        reset_rate_limits()
        try:
            max_entries = app.config.get("RATE_LIMIT_MAX_ENTRIES", 10000)
            print(f" Max entries config: {max_entries}")

            # Simulate requests from 15000 unique IPs
            for i in range(15000):
                ip = f"192.168.{i // 256}.{i % 256}"
                check_rate_limit(ip, authenticated=False)

            dict_size = len(_rate_limit_requests)
            print(f" After 15000 unique IPs: {dict_size} entries")

            if dict_size > max_entries:
                print(f" FAIL: Dict grew beyond max ({dict_size} > {max_entries})")
                return False

            print(f" PASS: Dict properly bounded ({dict_size} <= {max_entries})")
            return True
        finally:
            # Clear shared state even if check_rate_limit raised.
            reset_rate_limits()


def test_lookup_rate_limit_memory():
    """Check the lookup rate limit dict for memory exhaustion.

    Issues lookup rate-limit checks from 15000 distinct IP strings. The check
    fails if ``LOOKUP_RATE_LIMIT_MAX_ENTRIES`` is missing (or falsy) in the
    app config, or if the tracking dict ends up larger than that limit.

    Returns:
        bool: True if a limit is configured and respected, False otherwise.
    """
    print("\n[3] Lookup Rate Limit Dict Growth (per-IP)")
    print("=" * 50)

    app = create_app("testing")

    from app.api.routes import (
        _lookup_rate_limit_requests,
        check_lookup_rate_limit,
        reset_lookup_rate_limits,
    )

    with app.app_context():
        reset_lookup_rate_limits()
        try:
            # Simulate requests from 15000 unique IPs
            for i in range(15000):
                ip = f"10.{i // 65536}.{(i // 256) % 256}.{i % 256}"
                check_lookup_rate_limit(ip)

            dict_size = len(_lookup_rate_limit_requests)
            print(f" After 15000 unique IPs: {dict_size} entries")

            # Check if there's a max entries config
            max_entries = app.config.get("LOOKUP_RATE_LIMIT_MAX_ENTRIES")
            if not max_entries:
                print(" WARN: No max entries limit configured!")
                print(f" Dict has {dict_size} entries and could grow unbounded")
                print(" FAIL: Memory exhaustion vulnerability")
                return False

            if dict_size > max_entries:
                print(f" FAIL: Dict grew beyond max ({dict_size} > {max_entries})")
                return False

            print(f" PASS: Dict properly bounded ({dict_size} <= {max_entries})")
            return True
        finally:
            # Clear shared state even if check_lookup_rate_limit raised.
            reset_lookup_rate_limits()


def test_dedup_memory():
    """Report the content dedup configuration.

    This check is informational: it reads ``DEDUP_MAX_ENTRIES`` and
    ``DEDUP_WINDOW`` (default 3600) from the app config, prints them and
    always passes. It does not exercise any dedup code.

    Returns:
        bool: Always True.
    """
    print("\n[4] Content Dedup Growth")
    print("=" * 50)

    app = create_app("testing")

    # Content hash dedup is stored in database, not memory
    # Check if there's a cleanup mechanism
    with app.app_context():
        max_entries = app.config.get("DEDUP_MAX_ENTRIES")
        dedup_window = app.config.get("DEDUP_WINDOW", 3600)

        print(f" Dedup window: {dedup_window}s")

        if max_entries:
            print(f" Max entries config: {max_entries}")
        else:
            print(" NOTE: Dedup is stored in database (SQLite)")
            print(" Entries expire after window elapses")
            print(" Mitigated by PoW requirement for creation")

        print(" PASS: Database-backed with expiry")
        return True


def test_concurrent_memory_pressure():
    """Check rate limit dict behavior under concurrent load.

    Starts 10 threads that each issue unauthenticated rate-limit checks from
    1000 distinct IP strings, then verifies that no worker raised and that
    the tracking dict is not larger than ``RATE_LIMIT_MAX_ENTRIES``
    (default 10000).

    Returns:
        bool: True if there were no errors and the dict stayed within the
        limit, False otherwise.
    """
    print("\n[5] Concurrent Memory Pressure")
    print("=" * 50)

    app = create_app("testing")

    from app.api.routes import (
        _rate_limit_requests,
        check_rate_limit,
        reset_rate_limits,
    )

    with app.app_context():
        reset_rate_limits()
        try:
            errors = []

            def make_requests(thread_id: int) -> None:
                # The try wraps the context manager as well, so a failure
                # while entering the app context is recorded instead of
                # escaping the thread unnoticed.
                try:
                    # Each thread needs its own app context
                    with app.app_context():
                        for i in range(1000):
                            ip = f"172.{thread_id}.{i // 256}.{i % 256}"
                            check_rate_limit(ip, authenticated=False)
                except Exception as e:
                    errors.append(str(e))

            threads = [
                threading.Thread(target=make_requests, args=(t,))
                for t in range(10)
            ]

            # perf_counter is monotonic, so the measured duration cannot be
            # skewed by system clock adjustments.
            start = time.perf_counter()
            for t in threads:
                t.start()
            for t in threads:
                t.join()
            elapsed = time.perf_counter() - start

            dict_size = len(_rate_limit_requests)
            max_entries = app.config.get("RATE_LIMIT_MAX_ENTRIES", 10000)

            print(" 10 threads x 1000 IPs = 10000 unique IPs")
            print(f" Elapsed: {elapsed:.2f}s")
            print(f" Final dict size: {dict_size}")
            print(f" Errors: {len(errors)}")

            if errors:
                print(" FAIL: Errors during concurrent access")
                # Show at most the first five error messages.
                for e in errors[:5]:
                    print(f" {e}")
                return False

            if dict_size > max_entries:
                print(" FAIL: Dict exceeded max under concurrency")
                return False

            print(" PASS: Concurrent access handled correctly")
            return True
        finally:
            # Clear shared state regardless of the outcome.
            reset_rate_limits()


def main():
    """Run every memory-exhaustion check and print a summary.

    Returns:
        int: 0 if all checks passed, 1 otherwise (used as the exit status).
    """
    print("=" * 60)
    print("DoS MEMORY EXHAUSTION TESTS")
    print("=" * 60)

    # Checks run in this order; each returns True on pass.
    checks = [
        ("Anti-Flood List Growth", test_antiflood_memory),
        ("Rate Limit Dict Growth", test_rate_limit_memory),
        ("Lookup Rate Limit Growth", test_lookup_rate_limit_memory),
        ("Content Dedup Growth", test_dedup_memory),
        ("Concurrent Memory Pressure", test_concurrent_memory_pressure),
    ]
    results = [(name, check()) for name, check in checks]

    print("\n" + "=" * 60)
    print("SUMMARY")
    print("=" * 60)

    passed = sum(1 for _, ok in results if ok)
    total = len(results)

    for name, ok in results:
        status = "PASS" if ok else "FAIL"
        print(f" {status}: {name}")

    print(f"\n{passed}/{total} checks passed")

    # Report vulnerabilities
    if passed < total:
        print("\nVULNERABILITIES:")
        for name, ok in results:
            if not ok:
                print(f" - {name}")

    return 0 if passed == total else 1


if __name__ == "__main__":
    sys.exit(main())