forked from username/flaskpaste
security: implement CRYPTO-001 and TIMING-001 remediations
CRYPTO-001: Certificate serial collision detection - Add _generate_unique_serial() helper for database-backed PKI - Add _generate_unique_serial() method for in-memory PKI class - Check database for existing serial before certificate issuance - Retry with new random serial if collision detected (max 5 attempts) TIMING-001: Constant-time database lookups for sensitive queries - Add dummy PBKDF2 verification when paste not found - Prevents timing-based enumeration (attackers can't distinguish 'not found' from 'wrong password' by measuring response time)
This commit is contained in:
@@ -259,8 +259,8 @@ Testing uses specialized Claude subagents for different security domains, with f
|
||||
|
||||
### Long-term (Low)
|
||||
|
||||
- [ ] **CRYPTO-001**: Add certificate serial collision detection
|
||||
- [ ] **TIMING-001**: Add constant-time database lookups for sensitive queries
|
||||
- [x] **CRYPTO-001**: Add certificate serial collision detection
|
||||
- [x] **TIMING-001**: Add constant-time database lookups for sensitive queries
|
||||
|
||||
---
|
||||
|
||||
|
||||
@@ -485,6 +485,16 @@ def fetch_paste(paste_id: str, check_password: bool = True) -> Response | None:
|
||||
).fetchone()
|
||||
|
||||
if row is None:
|
||||
# TIMING-001: Perform dummy password verification to prevent timing-based
|
||||
# enumeration (attacker can't distinguish "not found" from "wrong password"
|
||||
# by measuring response time)
|
||||
if check_password:
|
||||
dummy_hash = (
|
||||
"$pbkdf2-sha256$600000$"
|
||||
"0000000000000000000000000000000000000000000000000000000000000000$"
|
||||
"0000000000000000000000000000000000000000000000000000000000000000"
|
||||
)
|
||||
verify_password("dummy", dummy_hash)
|
||||
db.commit()
|
||||
return error_response("Paste not found", 404)
|
||||
|
||||
|
||||
61
app/pki.py
61
app/pki.py
@@ -43,6 +43,7 @@ except ImportError:
|
||||
_KDF_ITERATIONS = 600000 # OWASP 2023 recommendation
|
||||
_SALT_LENGTH = 32
|
||||
_KEY_LENGTH = 32 # AES-256
|
||||
_SERIAL_MAX_RETRIES = 5 # CRYPTO-001: Max attempts for unique serial generation
|
||||
|
||||
|
||||
class PKIError(Exception):
|
||||
@@ -264,6 +265,29 @@ class PKI:
|
||||
"""Check if CA exists."""
|
||||
return self._ca_store is not None
|
||||
|
||||
def _generate_unique_serial(self) -> int:
|
||||
"""Generate a unique certificate serial number.
|
||||
|
||||
CRYPTO-001: Checks existing certificates for collision.
|
||||
|
||||
Returns:
|
||||
Unique serial number as integer
|
||||
|
||||
Raises:
|
||||
PKIError: If unable to generate unique serial after max retries
|
||||
"""
|
||||
existing_serials = {
|
||||
cert["serial"] for cert in self._certificates.values()
|
||||
}
|
||||
|
||||
for _ in range(_SERIAL_MAX_RETRIES):
|
||||
serial = x509.random_serial_number()
|
||||
serial_hex = format(serial, "032x")
|
||||
if serial_hex not in existing_serials:
|
||||
return serial
|
||||
|
||||
raise PKIError("Failed to generate unique serial after max retries")
|
||||
|
||||
def generate_ca(
|
||||
self,
|
||||
common_name: str,
|
||||
@@ -524,7 +548,8 @@ class PKI:
|
||||
|
||||
# Build certificate
|
||||
now = datetime.now(UTC)
|
||||
serial = x509.random_serial_number()
|
||||
# CRYPTO-001: Use collision-safe serial generation
|
||||
serial = self._generate_unique_serial()
|
||||
|
||||
subject = x509.Name(
|
||||
[
|
||||
@@ -784,6 +809,37 @@ def get_ca_info(skip_enabled_check: bool = False) -> dict | None:
|
||||
}
|
||||
|
||||
|
||||
def _generate_unique_serial(db: Any) -> int:
|
||||
"""Generate a unique certificate serial number.
|
||||
|
||||
CRYPTO-001: Checks database for collision before returning.
|
||||
|
||||
Args:
|
||||
db: Database connection
|
||||
|
||||
Returns:
|
||||
Unique serial number as integer
|
||||
|
||||
Raises:
|
||||
PKIError: If unable to generate unique serial after max retries
|
||||
"""
|
||||
_require_crypto()
|
||||
|
||||
for _ in range(_SERIAL_MAX_RETRIES):
|
||||
serial = x509.random_serial_number()
|
||||
serial_hex = format(serial, "032x")
|
||||
|
||||
# Check for collision
|
||||
existing = db.execute(
|
||||
"SELECT 1 FROM issued_certificates WHERE serial = ?", (serial_hex,)
|
||||
).fetchone()
|
||||
|
||||
if existing is None:
|
||||
return serial
|
||||
|
||||
raise PKIError("Failed to generate unique serial after max retries")
|
||||
|
||||
|
||||
def generate_ca(
|
||||
common_name: str,
|
||||
password: str,
|
||||
@@ -958,7 +1014,8 @@ def issue_certificate(
|
||||
|
||||
# Build certificate
|
||||
now = datetime.now(UTC)
|
||||
serial = x509.random_serial_number()
|
||||
# CRYPTO-001: Use collision-safe serial generation
|
||||
serial = _generate_unique_serial(db)
|
||||
|
||||
subject = x509.Name(
|
||||
[
|
||||
|
||||
@@ -43,8 +43,8 @@ class TestClipboardPathValidation:
|
||||
def test_untrusted_unix_paths(self, fpaste):
|
||||
"""Paths in user-writable directories should be rejected."""
|
||||
assert fpaste.is_trusted_clipboard_path("/home/user/bin/xclip") is False
|
||||
assert fpaste.is_trusted_clipboard_path("/tmp/xclip") is False
|
||||
assert fpaste.is_trusted_clipboard_path("/var/tmp/malicious") is False
|
||||
assert fpaste.is_trusted_clipboard_path("/tmp/xclip") is False # noqa: S108
|
||||
assert fpaste.is_trusted_clipboard_path("/var/tmp/malicious") is False # noqa: S108
|
||||
assert fpaste.is_trusted_clipboard_path("./xclip") is False
|
||||
assert fpaste.is_trusted_clipboard_path("") is False
|
||||
|
||||
@@ -70,7 +70,7 @@ class TestClipboardPathValidation:
|
||||
"""find_clipboard_command should reject tools in untrusted paths."""
|
||||
with patch("shutil.which") as mock_which:
|
||||
# Untrusted path should be rejected
|
||||
mock_which.return_value = "/tmp/malicious/xclip"
|
||||
mock_which.return_value = "/tmp/malicious/xclip" # noqa: S108
|
||||
result = fpaste.find_clipboard_command(fpaste.CLIPBOARD_READ_COMMANDS)
|
||||
assert result is None
|
||||
|
||||
|
||||
Reference in New Issue
Block a user