Files
flaskpaste/tests/fuzz/run_fuzz.py
Username e8b4cd5e77
Some checks failed
CI / Security Scan (push) Failing after 22s
CI / Lint & Format (push) Failing after 24s
CI / Unit Tests (push) Has been skipped
CI / Security Tests (push) Has been skipped
CI / Memory Leak Check (push) Has been skipped
CI / SBOM Generation (push) Has been skipped
ci: install dependencies for mypy type checking
Also fix type errors in fuzz tests.
2025-12-25 20:47:17 +01:00

962 lines
36 KiB
Python
Executable File

#!/usr/bin/env python3
"""
FlaskPaste Offensive Security Testing Harness
Runs comprehensive fuzzing and penetration testing against
an isolated FlaskPaste instance.
Usage:
./run_fuzz.py [--phases PHASES] [--quick] [--verbose]
Examples:
./run_fuzz.py # Run all phases
./run_fuzz.py --phases 1,2,3 # Run specific phases
./run_fuzz.py --quick # Quick smoke test
"""
from __future__ import annotations
import argparse
import json
import os
import random
import signal
import socket
import string
import subprocess
import sys
import tempfile
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from urllib.parse import quote
# Ensure we can import the app
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
@dataclass
class Finding:
"""Security finding."""
severity: str # CRITICAL, HIGH, MEDIUM, LOW, INFO
category: str # Injection, Auth, Logic, Crypto, DoS, Info
endpoint: str
vector: str
description: str
reproduction: str
impact: str
evidence: str = ""
def __str__(self) -> str:
return f"""
┌─────────────────────────────────────────────────────────────────────────────┐
│ FINDING: {self.category}-{self.severity[:4]}
├─────────────────────────────────────────────────────────────────────────────┤
│ Severity: {self.severity}
│ Category: {self.category}
│ Endpoint: {self.endpoint}
│ Vector: {self.vector}
├─────────────────────────────────────────────────────────────────────────────┤
│ Description:
{self.description}
│ Reproduction:
{self.reproduction}
│ Impact:
{self.impact}
│ Evidence:
{self.evidence[:200]}
└─────────────────────────────────────────────────────────────────────────────┘
"""
@dataclass
class FuzzResult:
"""Result of a fuzz test."""
phase: str
test: str
status: str # PASS, FAIL, ERROR, TIMEOUT
duration: float
requests: int = 0
findings: list[Finding] = field(default_factory=list)
errors: list[str] = field(default_factory=list)
class FlaskPasteFuzzer:
"""Comprehensive fuzzer for FlaskPaste."""
def __init__(
self,
target: str = "http://127.0.0.1:5099",
fuzz_dir: Path | None = None,
verbose: bool = False,
):
self.target = target
self.fuzz_dir = fuzz_dir or Path(tempfile.mkdtemp(prefix="flaskpaste-fuzz-"))
self.verbose = verbose
self.findings: list[Finding] = []
self.results: list[FuzzResult] = []
self.server_proc: subprocess.Popen | None = None
# Create directories
(self.fuzz_dir / "db").mkdir(parents=True, exist_ok=True)
(self.fuzz_dir / "logs").mkdir(exist_ok=True)
(self.fuzz_dir / "crashes").mkdir(exist_ok=True)
(self.fuzz_dir / "results").mkdir(exist_ok=True)
def log(self, msg: str, level: str = "INFO") -> None:
"""Log message."""
if self.verbose or level in ("ERROR", "CRITICAL", "FINDING"):
symbol = {"INFO": "", "ERROR": "", "FINDING": "", "OK": ""}.get(level, "")
print(f"{symbol} {msg}")
def start_server(self) -> bool:
"""Start isolated FlaskPaste instance."""
self.log("Starting isolated FlaskPaste instance...")
env = os.environ.copy()
env.update(
{
"FLASKPASTE_DATABASE": str(self.fuzz_dir / "db" / "fuzz.db"),
"FLASKPASTE_SECRET_KEY": f"fuzz-{os.urandom(16).hex()}",
"FLASKPASTE_PKI_PASSWORD": "fuzz-pki-password",
"FLASKPASTE_REGISTRATION_ENABLED": "true",
"FLASKPASTE_POW_DIFFICULTY": "1",
"FLASKPASTE_RATE_LIMIT_ENABLED": "false",
"FLASKPASTE_PROXY_SECRET": "",
"FLASK_ENV": "development",
"FLASK_DEBUG": "0",
}
)
project_root = Path(__file__).parent.parent.parent
self.log_file = open( # noqa: SIM115
self.fuzz_dir / "logs" / "server.log", "w"
)
self.server_proc = subprocess.Popen( # noqa: S603
[
str(project_root / "venv" / "bin" / "python"),
"run.py",
"--host",
"127.0.0.1",
"--port",
"5099",
],
cwd=project_root,
env=env,
stdout=self.log_file,
stderr=subprocess.STDOUT,
)
# Wait for server to start
for _ in range(30):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
sock.connect(("127.0.0.1", 5099))
sock.close()
self.log("Server started successfully", "OK")
return True
except (TimeoutError, ConnectionRefusedError):
time.sleep(0.5)
self.log("Server failed to start", "ERROR")
return False
def stop_server(self) -> None:
"""Stop the fuzzing server."""
if self.server_proc:
self.server_proc.terminate()
try:
self.server_proc.wait(timeout=5)
except subprocess.TimeoutExpired:
self.server_proc.kill()
self.log("Server stopped")
def http_request(
self,
method: str,
path: str,
data: bytes | None = None,
headers: dict[str, str] | None = None,
timeout: float = 5.0,
) -> tuple[int | None, bytes, float]:
"""Make HTTP request, return (status, body, duration)."""
import http.client
import urllib.parse
parsed = urllib.parse.urlparse(self.target)
start = time.perf_counter()
try:
assert parsed.hostname is not None
conn = http.client.HTTPConnection(parsed.hostname, parsed.port, timeout=timeout)
hdrs = headers or {}
if data:
hdrs.setdefault("Content-Length", str(len(data)))
conn.request(method, path, body=data, headers=hdrs)
resp = conn.getresponse()
body = resp.read()
duration = time.perf_counter() - start
conn.close()
return resp.status, body, duration
except Exception as e:
duration = time.perf_counter() - start
return None, str(e).encode(), duration
# ─────────────────────────────────────────────────────────────────────────
# Phase 1: Reconnaissance
# ─────────────────────────────────────────────────────────────────────────
def phase1_recon(self) -> FuzzResult:
"""Phase 1: Reconnaissance and enumeration."""
self.log("Phase 1: Reconnaissance")
start = time.time()
requests = 0
findings = []
# Known endpoints
endpoints = [
"/",
"/health",
"/challenge",
"/client",
"/register",
"/pastes",
"/pki",
"/pki/ca",
"/pki/ca.crt",
"/pki/issue",
"/pki/certs",
"/audit",
]
# Hidden endpoint discovery
hidden_tests = [
"/admin",
"/debug",
"/config",
"/env",
"/.env",
"/robots.txt",
"/sitemap.xml",
"/.git/config",
"/api",
"/api/v1",
"/swagger",
"/docs",
"/graphql",
"/metrics",
"/prometheus",
"/__debug__",
"/server-status",
"/phpinfo.php",
"/wp-admin",
]
for path in endpoints + hidden_tests:
status, _, _ = self.http_request("GET", path)
requests += 1
if status and status not in (404, 405):
self.log(f" {status} {path}")
if path in hidden_tests and status == 200:
findings.append(
Finding(
severity="MEDIUM",
category="Info",
endpoint=f"GET {path}",
vector="URL",
description=f"Hidden endpoint accessible: {path}",
reproduction=f"curl {self.target}{path}",
impact="Information disclosure, potential attack surface",
)
)
# HTTP method enumeration
methods = ["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD", "TRACE"]
for method in methods:
status, _body, _ = self.http_request(method, "/")
requests += 1
if method == "TRACE" and status == 200:
findings.append(
Finding(
severity="LOW",
category="Info",
endpoint="TRACE /",
vector="HTTP Method",
description="TRACE method enabled",
reproduction=f"curl -X TRACE {self.target}/",
impact="Cross-site tracing (XST) possible",
)
)
self.findings.extend(findings)
return FuzzResult(
phase="1",
test="Reconnaissance",
status="PASS",
duration=time.time() - start,
requests=requests,
findings=findings,
)
# ─────────────────────────────────────────────────────────────────────────
# Phase 2: Input Fuzzing
# ─────────────────────────────────────────────────────────────────────────
def phase2_input_fuzzing(self) -> FuzzResult:
"""Phase 2: Input fuzzing."""
self.log("Phase 2: Input Fuzzing")
start = time.time()
requests = 0
findings = []
errors = []
# Paste content fuzzing
payloads: list[bytes] = [
b"normal text",
b"\x00" * 100, # Null bytes
b"\xff" * 100, # High bytes
os.urandom(1000), # Random binary
b"A" * 100000, # Large payload
"".join(random.choices(string.printable, k=1000)).encode(),
("\u202e" * 100).encode("utf-8"), # RTL override
("A\u0300" * 100).encode("utf-8"), # Combining characters
]
for payload in payloads:
status, body, _duration = self.http_request(
"POST",
"/",
data=payload,
headers={"X-PoW-Token": "test", "X-PoW-Solution": "0"},
)
requests += 1
if status is None:
errors.append(f"Request failed: {body.decode(errors='replace')[:100]}")
elif status == 500:
findings.append(
Finding(
severity="HIGH",
category="DoS",
endpoint="POST /",
vector="Body",
description=f"Server error with payload: {payload[:50]!r}",
reproduction=f"curl -X POST {self.target}/ --data-binary ...",
impact="Potential denial of service",
evidence=body.decode(errors="replace")[:200],
)
)
# Save crash payload
crash_file = self.fuzz_dir / "crashes" / f"{int(time.time())}.payload"
crash_file.write_bytes(payload)
# Header fuzzing
header_tests = [
("X-Forwarded-For", "' OR 1=1--"),
("X-Forwarded-For", "127.0.0.1, 10.0.0.1, evil.com"),
("X-SSL-Client-SHA1", "' OR 1=1--"),
("X-SSL-Client-SHA1", "../../../etc/passwd"),
("X-PoW-Token", "A" * 10000),
("Host", "evil.com"),
("Host", "evil.com\r\nX-Injected: true"),
("X-Expiry", "-1"),
("X-Expiry", "99999999999999999999"),
("Content-Type", "text/html\r\nX-Injected: true"),
]
for header, value in header_tests:
status, body, _ = self.http_request("GET", "/", headers={header: value})
requests += 1
if b"X-Injected" in body or b"true" in body:
findings.append(
Finding(
severity="HIGH",
category="Injection",
endpoint="GET /",
vector=f"Header: {header}",
description="Header injection vulnerability",
reproduction=f'curl {self.target}/ -H "{header}: {value}"',
impact="HTTP response splitting, cache poisoning",
)
)
self.findings.extend(findings)
return FuzzResult(
phase="2",
test="Input Fuzzing",
status="PASS" if not errors else "ERROR",
duration=time.time() - start,
requests=requests,
findings=findings,
errors=errors,
)
# ─────────────────────────────────────────────────────────────────────────
# Phase 3: Injection Attacks
# ─────────────────────────────────────────────────────────────────────────
def phase3_injection(self) -> FuzzResult:
"""Phase 3: Injection attacks."""
self.log("Phase 3: Injection Attacks")
start = time.time()
requests = 0
findings = []
# SQL injection payloads
sqli_payloads = [
"' OR '1'='1",
"1' OR '1'='1'--",
"1; DROP TABLE pastes;--",
"1 UNION SELECT * FROM sqlite_master--",
"1' AND SLEEP(5)--",
"1/**/OR/**/1=1",
]
for payload in sqli_payloads:
encoded = quote(payload, safe="")
# Test in paste ID
_status, body, duration = self.http_request("GET", f"/{encoded}")
requests += 1
# Timing-based detection
if duration > 4.5:
findings.append(
Finding(
severity="CRITICAL",
category="Injection",
endpoint=f"GET /{payload[:20]}...",
vector="paste_id",
description="Possible blind SQL injection (time-based)",
reproduction=f"curl {self.target}/{encoded}",
impact="Full database compromise",
evidence=f"Response time: {duration:.2f}s",
)
)
# Error-based detection
if b"sqlite" in body.lower() or b"syntax" in body.lower():
findings.append(
Finding(
severity="CRITICAL",
category="Injection",
endpoint=f"GET /{payload[:20]}...",
vector="paste_id",
description="SQL injection with error disclosure",
reproduction=f"curl {self.target}/{encoded}",
impact="Full database compromise",
evidence=body.decode(errors="replace")[:200],
)
)
# Path traversal
traversal_payloads = [
"../../../etc/passwd",
"....//....//....//etc/passwd",
"..%2f..%2f..%2fetc/passwd",
"..%252f..%252f..%252fetc/passwd",
]
for payload in traversal_payloads:
_status, body, _ = self.http_request("GET", f"/{payload}")
requests += 1
if b"root:" in body:
findings.append(
Finding(
severity="CRITICAL",
category="Injection",
endpoint=f"GET /{payload}",
vector="paste_id",
description="Path traversal vulnerability",
reproduction=f"curl {self.target}/{payload}",
impact="Arbitrary file read",
evidence=body.decode(errors="replace")[:200],
)
)
# SSTI
ssti_payloads = [
("{{7*7}}", b"49"),
("{{config}}", b"SECRET_KEY"),
("${7*7}", b"49"),
]
for payload, indicator in ssti_payloads:
_status, body, _ = self.http_request(
"POST",
"/",
data=payload.encode(),
headers={"X-PoW-Token": "test", "X-PoW-Solution": "0"},
)
requests += 1
if indicator in body:
findings.append(
Finding(
severity="CRITICAL",
category="Injection",
endpoint="POST /",
vector="Body",
description="Server-side template injection",
reproduction=f"curl -X POST {self.target}/ -d '{payload}'",
impact="Remote code execution",
evidence=body.decode(errors="replace")[:200],
)
)
# Command injection
cmd_payloads = [
"; sleep 5",
"| sleep 5",
"$(sleep 5)",
"`sleep 5`",
]
for payload in cmd_payloads:
_, _, duration = self.http_request(
"POST",
"/",
data=payload.encode(),
headers={"X-PoW-Token": "test", "X-PoW-Solution": "0"},
)
requests += 1
if duration > 4.5:
findings.append(
Finding(
severity="CRITICAL",
category="Injection",
endpoint="POST /",
vector="Body",
description="Command injection vulnerability",
reproduction=f"curl -X POST {self.target}/ -d '{payload}'",
impact="Remote code execution",
evidence=f"Response time: {duration:.2f}s",
)
)
self.findings.extend(findings)
return FuzzResult(
phase="3",
test="Injection Attacks",
status="PASS",
duration=time.time() - start,
requests=requests,
findings=findings,
)
# ─────────────────────────────────────────────────────────────────────────
# Phase 4: Authentication & Authorization
# ─────────────────────────────────────────────────────────────────────────
def phase4_auth(self) -> FuzzResult:
"""Phase 4: Authentication and authorization attacks."""
self.log("Phase 4: Auth/Authz")
start = time.time()
requests = 0
findings = []
# Certificate fingerprint spoofing
fake_fingerprints = [
"0000000000000000000000000000000000000000",
"da39a3ee5e6b4b0d3255bfef95601890afd80709", # SHA1 of empty
"ffffffffffffffffffffffffffffffffffffffff",
"' OR 1=1--",
"../../../etc/passwd",
]
for fp in fake_fingerprints:
status, body, _ = self.http_request("GET", "/pastes", headers={"X-SSL-Client-SHA1": fp})
requests += 1
if status == 200:
findings.append(
Finding(
severity="CRITICAL",
category="Auth",
endpoint="GET /pastes",
vector="X-SSL-Client-SHA1",
description="Certificate fingerprint bypass",
reproduction=f'curl {self.target}/pastes -H "X-SSL-Client-SHA1: {fp}"',
impact="Authentication bypass",
evidence=body.decode(errors="replace")[:200],
)
)
# PoW bypass
pow_bypasses = [
({"X-PoW-Token": "", "X-PoW-Solution": ""}, "empty"),
({"X-PoW-Token": "invalid", "X-PoW-Solution": "0"}, "invalid token"),
({"X-PoW-Token": "test", "X-PoW-Solution": "-1"}, "negative solution"),
(
{"X-PoW-Token": "test", "X-PoW-Solution": "999999999999999999999"},
"overflow",
),
]
for headers, desc in pow_bypasses:
status, body, _ = self.http_request("POST", "/", data=b"test", headers=headers)
requests += 1
if status == 201:
findings.append(
Finding(
severity="HIGH",
category="Auth",
endpoint="POST /",
vector="PoW headers",
description=f"PoW bypass via {desc}",
reproduction=f"curl -X POST {self.target}/ -d test -H ...",
impact="Spam protection bypass",
)
)
# Admin endpoint access without auth
admin_endpoints = [
("GET", "/pki/certs"),
("POST", "/pki/issue"),
("GET", "/audit"),
]
for method, path in admin_endpoints:
status, body, _ = self.http_request(method, path)
requests += 1
if status == 200:
findings.append(
Finding(
severity="HIGH",
category="Auth",
endpoint=f"{method} {path}",
vector="Missing auth",
description="Admin endpoint accessible without authentication",
reproduction=f"curl -X {method} {self.target}{path}",
impact="Unauthorized admin access",
)
)
self.findings.extend(findings)
return FuzzResult(
phase="4",
test="Auth/Authz",
status="PASS",
duration=time.time() - start,
requests=requests,
findings=findings,
)
# ─────────────────────────────────────────────────────────────────────────
# Phase 5: Business Logic
# ─────────────────────────────────────────────────────────────────────────
def phase5_logic(self) -> FuzzResult:
"""Phase 5: Business logic attacks."""
self.log("Phase 5: Business Logic")
start = time.time()
requests = 0
findings = []
# Expiry manipulation
expiry_tests = [
("-1", "negative"),
("0", "zero"),
("99999999999999999999", "overflow"),
("abc", "non-numeric"),
]
for value, desc in expiry_tests:
status, body, _ = self.http_request(
"POST",
"/",
data=b"expiry-test",
headers={
"X-Expiry": value,
"X-PoW-Token": "test",
"X-PoW-Solution": "0",
},
)
requests += 1
if status == 201:
# Check if it was accepted without error
try:
data = json.loads(body)
if "id" in data:
findings.append(
Finding(
severity="MEDIUM",
category="Logic",
endpoint="POST /",
vector="X-Expiry",
description=f"Expiry manipulation ({desc}) accepted",
reproduction=(
f'curl -X POST {self.target}/ -d test -H "X-Expiry: {value}"'
),
impact="Paste lifetime manipulation",
)
)
except json.JSONDecodeError:
pass
# Large payload DoS
sizes = [1024 * 1024, 10 * 1024 * 1024] # 1MB, 10MB
for size in sizes:
payload = os.urandom(size)
status, body, duration = self.http_request(
"POST",
"/",
data=payload,
headers={"X-PoW-Token": "test", "X-PoW-Solution": "0"},
timeout=30,
)
requests += 1
if status == 201:
findings.append(
Finding(
severity="MEDIUM",
category="DoS",
endpoint="POST /",
vector="Body size",
description=f"Large payload ({size // 1024 // 1024}MB) accepted",
reproduction=(
f"dd if=/dev/urandom bs=1M count={size // 1024 // 1024} | "
f"curl -X POST {self.target}/ --data-binary @-"
),
impact="Resource exhaustion",
evidence=f"Upload time: {duration:.2f}s",
)
)
elif status == 500:
findings.append(
Finding(
severity="HIGH",
category="DoS",
endpoint="POST /",
vector="Body size",
description=f"Server crash on large payload ({size // 1024 // 1024}MB)",
reproduction=(
f"dd if=/dev/urandom bs=1M count={size // 1024 // 1024} | "
f"curl -X POST {self.target}/ --data-binary @-"
),
impact="Denial of service",
)
)
self.findings.extend(findings)
return FuzzResult(
phase="5",
test="Business Logic",
status="PASS",
duration=time.time() - start,
requests=requests,
findings=findings,
)
# ─────────────────────────────────────────────────────────────────────────
# Phase 6: Crypto
# ─────────────────────────────────────────────────────────────────────────
def phase6_crypto(self) -> FuzzResult:
"""Phase 6: Cryptographic attacks."""
self.log("Phase 6: Cryptography")
start = time.time()
requests = 0
findings = []
# Collect PoW tokens for randomness analysis
tokens = []
for _ in range(100):
status, body, _ = self.http_request("GET", "/challenge")
requests += 1
if status == 200:
try:
data = json.loads(body)
if "token" in data:
tokens.append(data["token"])
except json.JSONDecodeError:
pass
# Check for duplicate tokens
if len(tokens) != len(set(tokens)):
findings.append(
Finding(
severity="HIGH",
category="Crypto",
endpoint="GET /challenge",
vector="token",
description="Duplicate PoW tokens generated",
reproduction=f"for i in {{1..100}}; do curl -s {self.target}/challenge; done",
impact="PoW replay attack possible",
)
)
# Password timing attack simulation
# Create a paste with password first would be needed
# Skipping for now as it requires state setup
self.findings.extend(findings)
return FuzzResult(
phase="6",
test="Cryptography",
status="PASS",
duration=time.time() - start,
requests=requests,
findings=findings,
)
# ─────────────────────────────────────────────────────────────────────────
# Report Generation
# ─────────────────────────────────────────────────────────────────────────
def generate_report(self) -> str:
"""Generate final report."""
severity_order = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3, "INFO": 4}
sorted_findings = sorted(self.findings, key=lambda f: severity_order.get(f.severity, 5))
report = []
report.append("=" * 79)
report.append("FLASKPASTE OFFENSIVE SECURITY TEST REPORT")
report.append("=" * 79)
report.append(f"Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}")
report.append(f"Target: {self.target}")
report.append("")
# Summary
report.append("SUMMARY")
report.append("-" * 79)
total_requests = sum(r.requests for r in self.results)
report.append(f"Total phases run: {len(self.results)}")
report.append(f"Total requests: {total_requests}")
report.append(f"Total findings: {len(self.findings)}")
by_severity: dict[str, int] = {}
for f in self.findings:
by_severity[f.severity] = by_severity.get(f.severity, 0) + 1
for sev in ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"]:
if sev in by_severity:
report.append(f" {sev}: {by_severity[sev]}")
report.append("")
# Phase Results
report.append("PHASE RESULTS")
report.append("-" * 79)
for r in self.results:
status_sym = "" if r.status == "PASS" else ""
report.append(
f"{status_sym} Phase {r.phase}: {r.test} ({r.requests} requests, {r.duration:.1f}s)"
)
if r.findings:
report.append(f"{len(r.findings)} findings")
if r.errors:
report.append(f"{len(r.errors)} errors")
report.append("")
# Findings
if sorted_findings:
report.append("FINDINGS")
report.append("-" * 79)
for f in sorted_findings:
report.append(str(f))
return "\n".join(report)
def run(self, phases: list[int] | None = None, quick: bool = False) -> None:
"""Run fuzzing campaign."""
all_phases = [
(1, self.phase1_recon),
(2, self.phase2_input_fuzzing),
(3, self.phase3_injection),
(4, self.phase4_auth),
(5, self.phase5_logic),
(6, self.phase6_crypto),
]
if quick:
phases = [1, 3] # Just recon and injection for quick test
if phases:
all_phases = [(n, f) for n, f in all_phases if n in phases]
try:
if not self.start_server():
print("Failed to start server, aborting")
return
for phase_num, phase_func in all_phases:
try:
result = phase_func()
self.results.append(result)
status = "" if result.status == "PASS" else ""
self.log(
f" {status} {result.test}: {len(result.findings)} findings",
"OK" if result.status == "PASS" else "ERROR",
)
except Exception as e:
self.log(f" Phase {phase_num} failed: {e}", "ERROR")
self.results.append(
FuzzResult(
phase=str(phase_num),
test=phase_func.__name__,
status="ERROR",
duration=0,
errors=[str(e)],
)
)
# Generate and save report
report = self.generate_report()
report_file = self.fuzz_dir / "results" / "REPORT.txt"
report_file.write_text(report)
print(report)
print(f"\nReport saved to: {report_file}")
finally:
self.stop_server()
def main() -> None:
parser = argparse.ArgumentParser(description="FlaskPaste Offensive Security Tester")
parser.add_argument("--phases", type=str, help="Comma-separated phase numbers (e.g., 1,2,3)")
parser.add_argument("--quick", action="store_true", help="Quick smoke test")
parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
parser.add_argument("--fuzz-dir", type=str, help="Fuzzing workspace directory")
args = parser.parse_args()
phases = None
if args.phases:
phases = [int(p.strip()) for p in args.phases.split(",")]
fuzz_dir = Path(args.fuzz_dir) if args.fuzz_dir else None
fuzzer = FlaskPasteFuzzer(verbose=args.verbose, fuzz_dir=fuzz_dir)
# Handle Ctrl+C gracefully
def signal_handler(sig: int, frame: Any) -> None:
print("\nInterrupted, cleaning up...")
fuzzer.stop_server()
sys.exit(1)
signal.signal(signal.SIGINT, signal_handler)
fuzzer.run(phases=phases, quick=args.quick)
if __name__ == "__main__":
main()