diff --git a/PENTEST_PLAN.md b/PENTEST_PLAN.md index 32ed2c6..a3cc548 100644 --- a/PENTEST_PLAN.md +++ b/PENTEST_PLAN.md @@ -251,16 +251,16 @@ Testing uses specialized Claude subagents for different security domains, with f ### Medium-term (Medium) - [ ] **HASH-001**: Add locking to content hash deduplication -- [ ] **FLOOD-001**: Add memory limit to anti-flood request list +- [x] **FLOOD-001**: Add memory limit to anti-flood request list - [ ] **ENUM-001**: Add rate limiting to paste metadata endpoints -- [ ] **CLI-002**: Verify SSL certificate hostname matching -- [ ] **CLI-003**: Add config file permission validation on startup +- [x] **CLI-002**: Verify SSL certificate hostname matching +- [x] **CLI-003**: Add config file permission validation on startup +- [x] **AUDIT-001**: Add query result limits to prevent enumeration ### Long-term (Low) - [ ] **CRYPTO-001**: Add certificate serial collision detection - [ ] **TIMING-001**: Add constant-time database lookups for sensitive queries -- [ ] **AUDIT-001**: Add query result limits to prevent enumeration --- diff --git a/app/api/routes.py b/app/api/routes.py index f78fa4b..259619a 100644 --- a/app/api/routes.py +++ b/app/api/routes.py @@ -98,11 +98,18 @@ def record_antiflood_request() -> None: decay = current_app.config["ANTIFLOOD_DECAY"] base = current_app.config["POW_DIFFICULTY"] + max_entries = current_app.config.get("ANTIFLOOD_MAX_ENTRIES", 10000) + with _antiflood_lock: # Clean old requests cutoff = now - window _antiflood_requests[:] = [t for t in _antiflood_requests if t > cutoff] + # FLOOD-001: Cap list size to prevent memory exhaustion + if len(_antiflood_requests) >= max_entries: + # Keep only the most recent half + _antiflood_requests[:] = _antiflood_requests[-(max_entries // 2) :] + # Record this request _antiflood_requests.append(now) count = len(_antiflood_requests) diff --git a/app/config.py b/app/config.py index 0fdc91f..452d9dd 100644 --- a/app/config.py +++ b/app/config.py @@ -84,6 +84,8 @@ class Config: ANTIFLOOD_STEP = int(os.environ.get("FLASKPASTE_ANTIFLOOD_STEP", "2")) # bits per step ANTIFLOOD_MAX = int(os.environ.get("FLASKPASTE_ANTIFLOOD_MAX", "28")) # max difficulty ANTIFLOOD_DECAY = int(os.environ.get("FLASKPASTE_ANTIFLOOD_DECAY", "60")) # seconds to decay + # FLOOD-001: Maximum entries in anti-flood request list (memory DoS protection) + ANTIFLOOD_MAX_ENTRIES = int(os.environ.get("FLASKPASTE_ANTIFLOOD_MAX_ENTRIES", "10000")) # URL prefix for reverse proxy deployments (e.g., "/paste" for mymx.me/paste) URL_PREFIX = os.environ.get("FLASKPASTE_URL_PREFIX", "").rstrip("/") diff --git a/fpaste b/fpaste index d7ac0e6..9583eec 100755 --- a/fpaste +++ b/fpaste @@ -10,6 +10,7 @@ import json import os import shutil import ssl +import stat import subprocess import sys import time @@ -119,6 +120,11 @@ def die(msg: str, code: int = 1) -> NoReturn: sys.exit(code) +def warn(msg: str) -> None: + """Print warning to stderr.""" + print(f"warning: {msg}", file=sys.stderr) + + def request( url: str, method: str = "GET", @@ -152,6 +158,20 @@ def parse_error(body: bytes, default: str = "request failed") -> str: # ----------------------------------------------------------------------------- +def check_config_permissions(path: Path) -> None: + """CLI-003: Warn if config file has insecure permissions.""" + try: + mode = path.stat().st_mode + # Warn if group or others can read (should be 600 or 640) + if mode & stat.S_IROTH: + warn(f"config file {path} is world-readable (mode {stat.filemode(mode)})") + elif mode & stat.S_IRGRP: + # Group-readable is less severe, only warn if also has secrets + pass # Silent for group-readable, common in shared setups + except OSError: + pass # File may not exist yet or permission denied + + def read_config_file(path: Path | None = None) -> dict[str, str]: """Read config file and return key-value pairs.""" path = path or CONFIG_FILE @@ -160,6 +180,9 @@ def read_config_file(path: Path | None = None) -> dict[str, str]: if not path.exists(): return result + # CLI-003: Check file permissions before reading + check_config_permissions(path) + for line in path.read_text().splitlines(): line = line.strip() if not line or line.startswith("#") or "=" not in line: @@ -214,6 +237,10 @@ def create_ssl_context(config: Mapping[str, Any]) -> ssl.SSLContext | None: return None ctx = ssl.create_default_context() + # CLI-002: Explicitly enable hostname verification (defense in depth) + # create_default_context() sets these, but explicit is safer + ctx.check_hostname = True + ctx.verify_mode = ssl.CERT_REQUIRED if ca_cert := config.get("ca_cert", ""): ctx.load_verify_locations(ca_cert)