fix lint errors (unused vars, line length, formatting)

This commit is contained in:
Username
2025-12-25 20:43:28 +01:00
parent 0496a39a91
commit 8408fedf5a
3 changed files with 30 additions and 36 deletions

View File

@@ -104,9 +104,7 @@ class Config:
# Maximum unique IPs tracked in rate limit storage (RATE-001: memory DoS protection) # Maximum unique IPs tracked in rate limit storage (RATE-001: memory DoS protection)
RATE_LIMIT_MAX_ENTRIES = int(os.environ.get("FLASKPASTE_RATE_MAX_ENTRIES", "10000")) RATE_LIMIT_MAX_ENTRIES = int(os.environ.get("FLASKPASTE_RATE_MAX_ENTRIES", "10000"))
# RATE-002: Cleanup threshold (0.0-1.0) - trigger cleanup when entries exceed this ratio # RATE-002: Cleanup threshold (0.0-1.0) - trigger cleanup when entries exceed this ratio
RATE_LIMIT_CLEANUP_THRESHOLD = float( RATE_LIMIT_CLEANUP_THRESHOLD = float(os.environ.get("FLASKPASTE_RATE_CLEANUP_THRESHOLD", "0.8"))
os.environ.get("FLASKPASTE_RATE_CLEANUP_THRESHOLD", "0.8")
)
# ENUM-001: Rate limiting for paste lookups (prevents enumeration attacks) # ENUM-001: Rate limiting for paste lookups (prevents enumeration attacks)
# Separate from creation limits - allows more reads but prevents brute-force # Separate from creation limits - allows more reads but prevents brute-force

View File

@@ -276,9 +276,7 @@ class PKI:
Raises: Raises:
PKIError: If unable to generate unique serial after max retries PKIError: If unable to generate unique serial after max retries
""" """
existing_serials = { existing_serials = {cert["serial"] for cert in self._certificates.values()}
cert["serial"] for cert in self._certificates.values()
}
for _ in range(_SERIAL_MAX_RETRIES): for _ in range(_SERIAL_MAX_RETRIES):
serial = x509.random_serial_number() serial = x509.random_serial_number()

View File

@@ -13,15 +13,13 @@ Examples:
./run_fuzz.py --phases 1,2,3 # Run specific phases ./run_fuzz.py --phases 1,2,3 # Run specific phases
./run_fuzz.py --quick # Quick smoke test ./run_fuzz.py --quick # Quick smoke test
""" """
from __future__ import annotations from __future__ import annotations
import argparse import argparse
import asyncio
import hashlib
import json import json
import os import os
import random import random
import shutil
import signal import signal
import socket import socket
import string import string
@@ -114,9 +112,7 @@ class FlaskPasteFuzzer:
def log(self, msg: str, level: str = "INFO") -> None: def log(self, msg: str, level: str = "INFO") -> None:
"""Log message.""" """Log message."""
if self.verbose or level in ("ERROR", "CRITICAL", "FINDING"): if self.verbose or level in ("ERROR", "CRITICAL", "FINDING"):
symbol = {"INFO": "", "ERROR": "", "FINDING": "", "OK": ""}.get( symbol = {"INFO": "", "ERROR": "", "FINDING": "", "OK": ""}.get(level, "")
level, ""
)
print(f"{symbol} {msg}") print(f"{symbol} {msg}")
def start_server(self) -> bool: def start_server(self) -> bool:
@@ -139,9 +135,11 @@ class FlaskPasteFuzzer:
) )
project_root = Path(__file__).parent.parent.parent project_root = Path(__file__).parent.parent.parent
log_file = open(self.fuzz_dir / "logs" / "server.log", "w") self.log_file = open( # noqa: SIM115
self.fuzz_dir / "logs" / "server.log", "w"
)
self.server_proc = subprocess.Popen( self.server_proc = subprocess.Popen( # noqa: S603
[ [
str(project_root / "venv" / "bin" / "python"), str(project_root / "venv" / "bin" / "python"),
"run.py", "run.py",
@@ -152,7 +150,7 @@ class FlaskPasteFuzzer:
], ],
cwd=project_root, cwd=project_root,
env=env, env=env,
stdout=log_file, stdout=self.log_file,
stderr=subprocess.STDOUT, stderr=subprocess.STDOUT,
) )
@@ -165,7 +163,7 @@ class FlaskPasteFuzzer:
sock.close() sock.close()
self.log("Server started successfully", "OK") self.log("Server started successfully", "OK")
return True return True
except (ConnectionRefusedError, socket.timeout): except (TimeoutError, ConnectionRefusedError):
time.sleep(0.5) time.sleep(0.5)
self.log("Server failed to start", "ERROR") self.log("Server failed to start", "ERROR")
@@ -283,7 +281,7 @@ class FlaskPasteFuzzer:
# HTTP method enumeration # HTTP method enumeration
methods = ["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD", "TRACE"] methods = ["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD", "TRACE"]
for method in methods: for method in methods:
status, body, _ = self.http_request(method, "/") status, _body, _ = self.http_request(method, "/")
requests += 1 requests += 1
if method == "TRACE" and status == 200: if method == "TRACE" and status == 200:
findings.append( findings.append(
@@ -333,7 +331,7 @@ class FlaskPasteFuzzer:
] ]
for payload in payloads: for payload in payloads:
status, body, duration = self.http_request( status, body, _duration = self.http_request(
"POST", "POST",
"/", "/",
data=payload, data=payload,
@@ -427,7 +425,7 @@ class FlaskPasteFuzzer:
encoded = quote(payload, safe="") encoded = quote(payload, safe="")
# Test in paste ID # Test in paste ID
status, body, duration = self.http_request("GET", f"/{encoded}") _status, body, duration = self.http_request("GET", f"/{encoded}")
requests += 1 requests += 1
# Timing-based detection # Timing-based detection
@@ -469,7 +467,7 @@ class FlaskPasteFuzzer:
] ]
for payload in traversal_payloads: for payload in traversal_payloads:
status, body, _ = self.http_request("GET", f"/{payload}") _status, body, _ = self.http_request("GET", f"/{payload}")
requests += 1 requests += 1
if b"root:" in body: if b"root:" in body:
@@ -494,7 +492,7 @@ class FlaskPasteFuzzer:
] ]
for payload, indicator in ssti_payloads: for payload, indicator in ssti_payloads:
status, body, _ = self.http_request( _status, body, _ = self.http_request(
"POST", "POST",
"/", "/",
data=payload.encode(), data=payload.encode(),
@@ -578,9 +576,7 @@ class FlaskPasteFuzzer:
] ]
for fp in fake_fingerprints: for fp in fake_fingerprints:
status, body, _ = self.http_request( status, body, _ = self.http_request("GET", "/pastes", headers={"X-SSL-Client-SHA1": fp})
"GET", "/pastes", headers={"X-SSL-Client-SHA1": fp}
)
requests += 1 requests += 1
if status == 200: if status == 200:
@@ -609,9 +605,7 @@ class FlaskPasteFuzzer:
] ]
for headers, desc in pow_bypasses: for headers, desc in pow_bypasses:
status, body, _ = self.http_request( status, body, _ = self.http_request("POST", "/", data=b"test", headers=headers)
"POST", "/", data=b"test", headers=headers
)
requests += 1 requests += 1
if status == 201: if status == 201:
@@ -705,7 +699,9 @@ class FlaskPasteFuzzer:
endpoint="POST /", endpoint="POST /",
vector="X-Expiry", vector="X-Expiry",
description=f"Expiry manipulation ({desc}) accepted", description=f"Expiry manipulation ({desc}) accepted",
reproduction=f'curl -X POST {self.target}/ -d test -H "X-Expiry: {value}"', reproduction=(
f'curl -X POST {self.target}/ -d test -H "X-Expiry: {value}"'
),
impact="Paste lifetime manipulation", impact="Paste lifetime manipulation",
) )
) )
@@ -734,7 +730,10 @@ class FlaskPasteFuzzer:
endpoint="POST /", endpoint="POST /",
vector="Body size", vector="Body size",
description=f"Large payload ({size // 1024 // 1024}MB) accepted", description=f"Large payload ({size // 1024 // 1024}MB) accepted",
reproduction=f"dd if=/dev/urandom bs=1M count={size // 1024 // 1024} | curl -X POST {self.target}/ --data-binary @-", reproduction=(
f"dd if=/dev/urandom bs=1M count={size // 1024 // 1024} | "
f"curl -X POST {self.target}/ --data-binary @-"
),
impact="Resource exhaustion", impact="Resource exhaustion",
evidence=f"Upload time: {duration:.2f}s", evidence=f"Upload time: {duration:.2f}s",
) )
@@ -747,7 +746,10 @@ class FlaskPasteFuzzer:
endpoint="POST /", endpoint="POST /",
vector="Body size", vector="Body size",
description=f"Server crash on large payload ({size // 1024 // 1024}MB)", description=f"Server crash on large payload ({size // 1024 // 1024}MB)",
reproduction=f"dd if=/dev/urandom bs=1M count={size // 1024 // 1024} | curl -X POST {self.target}/ --data-binary @-", reproduction=(
f"dd if=/dev/urandom bs=1M count={size // 1024 // 1024} | "
f"curl -X POST {self.target}/ --data-binary @-"
),
impact="Denial of service", impact="Denial of service",
) )
) )
@@ -821,9 +823,7 @@ class FlaskPasteFuzzer:
def generate_report(self) -> str: def generate_report(self) -> str:
"""Generate final report.""" """Generate final report."""
severity_order = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3, "INFO": 4} severity_order = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3, "INFO": 4}
sorted_findings = sorted( sorted_findings = sorted(self.findings, key=lambda f: severity_order.get(f.severity, 5))
self.findings, key=lambda f: severity_order.get(f.severity, 5)
)
report = [] report = []
report.append("=" * 79) report.append("=" * 79)
@@ -931,9 +931,7 @@ class FlaskPasteFuzzer:
def main() -> None: def main() -> None:
parser = argparse.ArgumentParser(description="FlaskPaste Offensive Security Tester") parser = argparse.ArgumentParser(description="FlaskPaste Offensive Security Tester")
parser.add_argument( parser.add_argument("--phases", type=str, help="Comma-separated phase numbers (e.g., 1,2,3)")
"--phases", type=str, help="Comma-separated phase numbers (e.g., 1,2,3)"
)
parser.add_argument("--quick", action="store_true", help="Quick smoke test") parser.add_argument("--quick", action="store_true", help="Quick smoke test")
parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output") parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
parser.add_argument("--fuzz-dir", type=str, help="Fuzzing workspace directory") parser.add_argument("--fuzz-dir", type=str, help="Fuzzing workspace directory")