add offensive security testing framework
Some checks failed
CI / Lint & Format (push) Failing after 16s
CI / Unit Tests (push) Has been skipped
CI / Memory Leak Check (push) Has been skipped
CI / SBOM Generation (push) Has been skipped
CI / Security Scan (push) Failing after 21s
CI / Security Tests (push) Has been skipped
Some checks failed
CI / Lint & Format (push) Failing after 16s
CI / Unit Tests (push) Has been skipped
CI / Memory Leak Check (push) Has been skipped
CI / SBOM Generation (push) Has been skipped
CI / Security Scan (push) Failing after 21s
CI / Security Tests (push) Has been skipped
- FUZZING.md: comprehensive attack methodology covering 10 phases - tests/fuzz/run_fuzz.py: automated fuzzing harness with 6 test phases Phases: recon, input fuzzing, injection (SQLi, SSTI, path traversal, command injection), auth bypass, business logic, crypto attacks. Includes: radamsa mutations, hypothesis property testing, atheris coverage-guided fuzzing, HTTP smuggling, slowloris, nuclei templates.
This commit is contained in:
1033
FUZZING.md
Normal file
1033
FUZZING.md
Normal file
File diff suppressed because it is too large
Load Diff
962
tests/fuzz/run_fuzz.py
Executable file
962
tests/fuzz/run_fuzz.py
Executable file
@@ -0,0 +1,962 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
FlaskPaste Offensive Security Testing Harness
|
||||
|
||||
Runs comprehensive fuzzing and penetration testing against
|
||||
an isolated FlaskPaste instance.
|
||||
|
||||
Usage:
|
||||
./run_fuzz.py [--phases PHASES] [--quick] [--verbose]
|
||||
|
||||
Examples:
|
||||
./run_fuzz.py # Run all phases
|
||||
./run_fuzz.py --phases 1,2,3 # Run specific phases
|
||||
./run_fuzz.py --quick # Quick smoke test
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import random
|
||||
import shutil
|
||||
import signal
|
||||
import socket
|
||||
import string
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from urllib.parse import quote
|
||||
|
||||
# Ensure we can import the app
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
||||
|
||||
|
||||
@dataclass
|
||||
class Finding:
|
||||
"""Security finding."""
|
||||
|
||||
severity: str # CRITICAL, HIGH, MEDIUM, LOW, INFO
|
||||
category: str # Injection, Auth, Logic, Crypto, DoS, Info
|
||||
endpoint: str
|
||||
vector: str
|
||||
description: str
|
||||
reproduction: str
|
||||
impact: str
|
||||
evidence: str = ""
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"""
|
||||
┌─────────────────────────────────────────────────────────────────────────────┐
|
||||
│ FINDING: {self.category}-{self.severity[:4]}
|
||||
├─────────────────────────────────────────────────────────────────────────────┤
|
||||
│ Severity: {self.severity}
|
||||
│ Category: {self.category}
|
||||
│ Endpoint: {self.endpoint}
|
||||
│ Vector: {self.vector}
|
||||
├─────────────────────────────────────────────────────────────────────────────┤
|
||||
│ Description:
|
||||
│ {self.description}
|
||||
│
|
||||
│ Reproduction:
|
||||
│ {self.reproduction}
|
||||
│
|
||||
│ Impact:
|
||||
│ {self.impact}
|
||||
│
|
||||
│ Evidence:
|
||||
│ {self.evidence[:200]}
|
||||
└─────────────────────────────────────────────────────────────────────────────┘
|
||||
"""
|
||||
|
||||
|
||||
@dataclass
|
||||
class FuzzResult:
|
||||
"""Result of a fuzz test."""
|
||||
|
||||
phase: str
|
||||
test: str
|
||||
status: str # PASS, FAIL, ERROR, TIMEOUT
|
||||
duration: float
|
||||
requests: int = 0
|
||||
findings: list[Finding] = field(default_factory=list)
|
||||
errors: list[str] = field(default_factory=list)
|
||||
|
||||
|
||||
class FlaskPasteFuzzer:
|
||||
"""Comprehensive fuzzer for FlaskPaste."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
target: str = "http://127.0.0.1:5099",
|
||||
fuzz_dir: Path | None = None,
|
||||
verbose: bool = False,
|
||||
):
|
||||
self.target = target
|
||||
self.fuzz_dir = fuzz_dir or Path(tempfile.mkdtemp(prefix="flaskpaste-fuzz-"))
|
||||
self.verbose = verbose
|
||||
self.findings: list[Finding] = []
|
||||
self.results: list[FuzzResult] = []
|
||||
self.server_proc: subprocess.Popen | None = None
|
||||
|
||||
# Create directories
|
||||
(self.fuzz_dir / "db").mkdir(parents=True, exist_ok=True)
|
||||
(self.fuzz_dir / "logs").mkdir(exist_ok=True)
|
||||
(self.fuzz_dir / "crashes").mkdir(exist_ok=True)
|
||||
(self.fuzz_dir / "results").mkdir(exist_ok=True)
|
||||
|
||||
def log(self, msg: str, level: str = "INFO") -> None:
|
||||
"""Log message."""
|
||||
if self.verbose or level in ("ERROR", "CRITICAL", "FINDING"):
|
||||
symbol = {"INFO": "●", "ERROR": "✗", "FINDING": "⚠", "OK": "✓"}.get(
|
||||
level, "●"
|
||||
)
|
||||
print(f"{symbol} {msg}")
|
||||
|
||||
def start_server(self) -> bool:
|
||||
"""Start isolated FlaskPaste instance."""
|
||||
self.log("Starting isolated FlaskPaste instance...")
|
||||
|
||||
env = os.environ.copy()
|
||||
env.update(
|
||||
{
|
||||
"FLASKPASTE_DATABASE": str(self.fuzz_dir / "db" / "fuzz.db"),
|
||||
"FLASKPASTE_SECRET_KEY": f"fuzz-{os.urandom(16).hex()}",
|
||||
"FLASKPASTE_PKI_PASSWORD": "fuzz-pki-password",
|
||||
"FLASKPASTE_REGISTRATION_ENABLED": "true",
|
||||
"FLASKPASTE_POW_DIFFICULTY": "1",
|
||||
"FLASKPASTE_RATE_LIMIT_ENABLED": "false",
|
||||
"FLASKPASTE_PROXY_SECRET": "",
|
||||
"FLASK_ENV": "development",
|
||||
"FLASK_DEBUG": "0",
|
||||
}
|
||||
)
|
||||
|
||||
project_root = Path(__file__).parent.parent.parent
|
||||
log_file = open(self.fuzz_dir / "logs" / "server.log", "w")
|
||||
|
||||
self.server_proc = subprocess.Popen(
|
||||
[
|
||||
str(project_root / "venv" / "bin" / "python"),
|
||||
"run.py",
|
||||
"--host",
|
||||
"127.0.0.1",
|
||||
"--port",
|
||||
"5099",
|
||||
],
|
||||
cwd=project_root,
|
||||
env=env,
|
||||
stdout=log_file,
|
||||
stderr=subprocess.STDOUT,
|
||||
)
|
||||
|
||||
# Wait for server to start
|
||||
for _ in range(30):
|
||||
try:
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.settimeout(1)
|
||||
sock.connect(("127.0.0.1", 5099))
|
||||
sock.close()
|
||||
self.log("Server started successfully", "OK")
|
||||
return True
|
||||
except (ConnectionRefusedError, socket.timeout):
|
||||
time.sleep(0.5)
|
||||
|
||||
self.log("Server failed to start", "ERROR")
|
||||
return False
|
||||
|
||||
def stop_server(self) -> None:
|
||||
"""Stop the fuzzing server."""
|
||||
if self.server_proc:
|
||||
self.server_proc.terminate()
|
||||
try:
|
||||
self.server_proc.wait(timeout=5)
|
||||
except subprocess.TimeoutExpired:
|
||||
self.server_proc.kill()
|
||||
self.log("Server stopped")
|
||||
|
||||
def http_request(
|
||||
self,
|
||||
method: str,
|
||||
path: str,
|
||||
data: bytes | None = None,
|
||||
headers: dict[str, str] | None = None,
|
||||
timeout: float = 5.0,
|
||||
) -> tuple[int | None, bytes, float]:
|
||||
"""Make HTTP request, return (status, body, duration)."""
|
||||
import http.client
|
||||
import urllib.parse
|
||||
|
||||
parsed = urllib.parse.urlparse(self.target)
|
||||
start = time.perf_counter()
|
||||
|
||||
try:
|
||||
conn = http.client.HTTPConnection(parsed.hostname, parsed.port, timeout=timeout)
|
||||
hdrs = headers or {}
|
||||
if data:
|
||||
hdrs.setdefault("Content-Length", str(len(data)))
|
||||
|
||||
conn.request(method, path, body=data, headers=hdrs)
|
||||
resp = conn.getresponse()
|
||||
body = resp.read()
|
||||
duration = time.perf_counter() - start
|
||||
conn.close()
|
||||
return resp.status, body, duration
|
||||
except Exception as e:
|
||||
duration = time.perf_counter() - start
|
||||
return None, str(e).encode(), duration
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
# Phase 1: Reconnaissance
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
|
||||
def phase1_recon(self) -> FuzzResult:
|
||||
"""Phase 1: Reconnaissance and enumeration."""
|
||||
self.log("Phase 1: Reconnaissance")
|
||||
start = time.time()
|
||||
requests = 0
|
||||
findings = []
|
||||
|
||||
# Known endpoints
|
||||
endpoints = [
|
||||
"/",
|
||||
"/health",
|
||||
"/challenge",
|
||||
"/client",
|
||||
"/register",
|
||||
"/pastes",
|
||||
"/pki",
|
||||
"/pki/ca",
|
||||
"/pki/ca.crt",
|
||||
"/pki/issue",
|
||||
"/pki/certs",
|
||||
"/audit",
|
||||
]
|
||||
|
||||
# Hidden endpoint discovery
|
||||
hidden_tests = [
|
||||
"/admin",
|
||||
"/debug",
|
||||
"/config",
|
||||
"/env",
|
||||
"/.env",
|
||||
"/robots.txt",
|
||||
"/sitemap.xml",
|
||||
"/.git/config",
|
||||
"/api",
|
||||
"/api/v1",
|
||||
"/swagger",
|
||||
"/docs",
|
||||
"/graphql",
|
||||
"/metrics",
|
||||
"/prometheus",
|
||||
"/__debug__",
|
||||
"/server-status",
|
||||
"/phpinfo.php",
|
||||
"/wp-admin",
|
||||
]
|
||||
|
||||
for path in endpoints + hidden_tests:
|
||||
status, _, _ = self.http_request("GET", path)
|
||||
requests += 1
|
||||
if status and status not in (404, 405):
|
||||
self.log(f" {status} {path}")
|
||||
if path in hidden_tests and status == 200:
|
||||
findings.append(
|
||||
Finding(
|
||||
severity="MEDIUM",
|
||||
category="Info",
|
||||
endpoint=f"GET {path}",
|
||||
vector="URL",
|
||||
description=f"Hidden endpoint accessible: {path}",
|
||||
reproduction=f"curl {self.target}{path}",
|
||||
impact="Information disclosure, potential attack surface",
|
||||
)
|
||||
)
|
||||
|
||||
# HTTP method enumeration
|
||||
methods = ["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD", "TRACE"]
|
||||
for method in methods:
|
||||
status, body, _ = self.http_request(method, "/")
|
||||
requests += 1
|
||||
if method == "TRACE" and status == 200:
|
||||
findings.append(
|
||||
Finding(
|
||||
severity="LOW",
|
||||
category="Info",
|
||||
endpoint="TRACE /",
|
||||
vector="HTTP Method",
|
||||
description="TRACE method enabled",
|
||||
reproduction=f"curl -X TRACE {self.target}/",
|
||||
impact="Cross-site tracing (XST) possible",
|
||||
)
|
||||
)
|
||||
|
||||
self.findings.extend(findings)
|
||||
return FuzzResult(
|
||||
phase="1",
|
||||
test="Reconnaissance",
|
||||
status="PASS",
|
||||
duration=time.time() - start,
|
||||
requests=requests,
|
||||
findings=findings,
|
||||
)
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
# Phase 2: Input Fuzzing
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
|
||||
def phase2_input_fuzzing(self) -> FuzzResult:
|
||||
"""Phase 2: Input fuzzing."""
|
||||
self.log("Phase 2: Input Fuzzing")
|
||||
start = time.time()
|
||||
requests = 0
|
||||
findings = []
|
||||
errors = []
|
||||
|
||||
# Paste content fuzzing
|
||||
payloads = [
|
||||
b"normal text",
|
||||
b"\x00" * 100, # Null bytes
|
||||
b"\xff" * 100, # High bytes
|
||||
os.urandom(1000), # Random binary
|
||||
b"A" * 100000, # Large payload
|
||||
"".join(random.choices(string.printable, k=1000)).encode(),
|
||||
"\u202e" * 100, # RTL override
|
||||
"A\u0300" * 100, # Combining characters
|
||||
]
|
||||
|
||||
for payload in payloads:
|
||||
status, body, duration = self.http_request(
|
||||
"POST",
|
||||
"/",
|
||||
data=payload,
|
||||
headers={"X-PoW-Token": "test", "X-PoW-Solution": "0"},
|
||||
)
|
||||
requests += 1
|
||||
|
||||
if status is None:
|
||||
errors.append(f"Request failed: {body.decode(errors='replace')[:100]}")
|
||||
elif status == 500:
|
||||
findings.append(
|
||||
Finding(
|
||||
severity="HIGH",
|
||||
category="DoS",
|
||||
endpoint="POST /",
|
||||
vector="Body",
|
||||
description=f"Server error with payload: {payload[:50]!r}",
|
||||
reproduction=f"curl -X POST {self.target}/ --data-binary ...",
|
||||
impact="Potential denial of service",
|
||||
evidence=body.decode(errors="replace")[:200],
|
||||
)
|
||||
)
|
||||
# Save crash payload
|
||||
crash_file = self.fuzz_dir / "crashes" / f"{int(time.time())}.payload"
|
||||
crash_file.write_bytes(payload)
|
||||
|
||||
# Header fuzzing
|
||||
header_tests = [
|
||||
("X-Forwarded-For", "' OR 1=1--"),
|
||||
("X-Forwarded-For", "127.0.0.1, 10.0.0.1, evil.com"),
|
||||
("X-SSL-Client-SHA1", "' OR 1=1--"),
|
||||
("X-SSL-Client-SHA1", "../../../etc/passwd"),
|
||||
("X-PoW-Token", "A" * 10000),
|
||||
("Host", "evil.com"),
|
||||
("Host", "evil.com\r\nX-Injected: true"),
|
||||
("X-Expiry", "-1"),
|
||||
("X-Expiry", "99999999999999999999"),
|
||||
("Content-Type", "text/html\r\nX-Injected: true"),
|
||||
]
|
||||
|
||||
for header, value in header_tests:
|
||||
status, body, _ = self.http_request("GET", "/", headers={header: value})
|
||||
requests += 1
|
||||
|
||||
if b"X-Injected" in body or b"true" in body:
|
||||
findings.append(
|
||||
Finding(
|
||||
severity="HIGH",
|
||||
category="Injection",
|
||||
endpoint="GET /",
|
||||
vector=f"Header: {header}",
|
||||
description="Header injection vulnerability",
|
||||
reproduction=f'curl {self.target}/ -H "{header}: {value}"',
|
||||
impact="HTTP response splitting, cache poisoning",
|
||||
)
|
||||
)
|
||||
|
||||
self.findings.extend(findings)
|
||||
return FuzzResult(
|
||||
phase="2",
|
||||
test="Input Fuzzing",
|
||||
status="PASS" if not errors else "ERROR",
|
||||
duration=time.time() - start,
|
||||
requests=requests,
|
||||
findings=findings,
|
||||
errors=errors,
|
||||
)
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
# Phase 3: Injection Attacks
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
|
||||
def phase3_injection(self) -> FuzzResult:
|
||||
"""Phase 3: Injection attacks."""
|
||||
self.log("Phase 3: Injection Attacks")
|
||||
start = time.time()
|
||||
requests = 0
|
||||
findings = []
|
||||
|
||||
# SQL injection payloads
|
||||
sqli_payloads = [
|
||||
"' OR '1'='1",
|
||||
"1' OR '1'='1'--",
|
||||
"1; DROP TABLE pastes;--",
|
||||
"1 UNION SELECT * FROM sqlite_master--",
|
||||
"1' AND SLEEP(5)--",
|
||||
"1/**/OR/**/1=1",
|
||||
]
|
||||
|
||||
for payload in sqli_payloads:
|
||||
encoded = quote(payload, safe="")
|
||||
|
||||
# Test in paste ID
|
||||
status, body, duration = self.http_request("GET", f"/{encoded}")
|
||||
requests += 1
|
||||
|
||||
# Timing-based detection
|
||||
if duration > 4.5:
|
||||
findings.append(
|
||||
Finding(
|
||||
severity="CRITICAL",
|
||||
category="Injection",
|
||||
endpoint=f"GET /{payload[:20]}...",
|
||||
vector="paste_id",
|
||||
description="Possible blind SQL injection (time-based)",
|
||||
reproduction=f"curl {self.target}/{encoded}",
|
||||
impact="Full database compromise",
|
||||
evidence=f"Response time: {duration:.2f}s",
|
||||
)
|
||||
)
|
||||
|
||||
# Error-based detection
|
||||
if b"sqlite" in body.lower() or b"syntax" in body.lower():
|
||||
findings.append(
|
||||
Finding(
|
||||
severity="CRITICAL",
|
||||
category="Injection",
|
||||
endpoint=f"GET /{payload[:20]}...",
|
||||
vector="paste_id",
|
||||
description="SQL injection with error disclosure",
|
||||
reproduction=f"curl {self.target}/{encoded}",
|
||||
impact="Full database compromise",
|
||||
evidence=body.decode(errors="replace")[:200],
|
||||
)
|
||||
)
|
||||
|
||||
# Path traversal
|
||||
traversal_payloads = [
|
||||
"../../../etc/passwd",
|
||||
"....//....//....//etc/passwd",
|
||||
"..%2f..%2f..%2fetc/passwd",
|
||||
"..%252f..%252f..%252fetc/passwd",
|
||||
]
|
||||
|
||||
for payload in traversal_payloads:
|
||||
status, body, _ = self.http_request("GET", f"/{payload}")
|
||||
requests += 1
|
||||
|
||||
if b"root:" in body:
|
||||
findings.append(
|
||||
Finding(
|
||||
severity="CRITICAL",
|
||||
category="Injection",
|
||||
endpoint=f"GET /{payload}",
|
||||
vector="paste_id",
|
||||
description="Path traversal vulnerability",
|
||||
reproduction=f"curl {self.target}/{payload}",
|
||||
impact="Arbitrary file read",
|
||||
evidence=body.decode(errors="replace")[:200],
|
||||
)
|
||||
)
|
||||
|
||||
# SSTI
|
||||
ssti_payloads = [
|
||||
("{{7*7}}", b"49"),
|
||||
("{{config}}", b"SECRET_KEY"),
|
||||
("${7*7}", b"49"),
|
||||
]
|
||||
|
||||
for payload, indicator in ssti_payloads:
|
||||
status, body, _ = self.http_request(
|
||||
"POST",
|
||||
"/",
|
||||
data=payload.encode(),
|
||||
headers={"X-PoW-Token": "test", "X-PoW-Solution": "0"},
|
||||
)
|
||||
requests += 1
|
||||
|
||||
if indicator in body:
|
||||
findings.append(
|
||||
Finding(
|
||||
severity="CRITICAL",
|
||||
category="Injection",
|
||||
endpoint="POST /",
|
||||
vector="Body",
|
||||
description="Server-side template injection",
|
||||
reproduction=f"curl -X POST {self.target}/ -d '{payload}'",
|
||||
impact="Remote code execution",
|
||||
evidence=body.decode(errors="replace")[:200],
|
||||
)
|
||||
)
|
||||
|
||||
# Command injection
|
||||
cmd_payloads = [
|
||||
"; sleep 5",
|
||||
"| sleep 5",
|
||||
"$(sleep 5)",
|
||||
"`sleep 5`",
|
||||
]
|
||||
|
||||
for payload in cmd_payloads:
|
||||
_, _, duration = self.http_request(
|
||||
"POST",
|
||||
"/",
|
||||
data=payload.encode(),
|
||||
headers={"X-PoW-Token": "test", "X-PoW-Solution": "0"},
|
||||
)
|
||||
requests += 1
|
||||
|
||||
if duration > 4.5:
|
||||
findings.append(
|
||||
Finding(
|
||||
severity="CRITICAL",
|
||||
category="Injection",
|
||||
endpoint="POST /",
|
||||
vector="Body",
|
||||
description="Command injection vulnerability",
|
||||
reproduction=f"curl -X POST {self.target}/ -d '{payload}'",
|
||||
impact="Remote code execution",
|
||||
evidence=f"Response time: {duration:.2f}s",
|
||||
)
|
||||
)
|
||||
|
||||
self.findings.extend(findings)
|
||||
return FuzzResult(
|
||||
phase="3",
|
||||
test="Injection Attacks",
|
||||
status="PASS",
|
||||
duration=time.time() - start,
|
||||
requests=requests,
|
||||
findings=findings,
|
||||
)
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
# Phase 4: Authentication & Authorization
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
|
||||
def phase4_auth(self) -> FuzzResult:
|
||||
"""Phase 4: Authentication and authorization attacks."""
|
||||
self.log("Phase 4: Auth/Authz")
|
||||
start = time.time()
|
||||
requests = 0
|
||||
findings = []
|
||||
|
||||
# Certificate fingerprint spoofing
|
||||
fake_fingerprints = [
|
||||
"0000000000000000000000000000000000000000",
|
||||
"da39a3ee5e6b4b0d3255bfef95601890afd80709", # SHA1 of empty
|
||||
"ffffffffffffffffffffffffffffffffffffffff",
|
||||
"' OR 1=1--",
|
||||
"../../../etc/passwd",
|
||||
]
|
||||
|
||||
for fp in fake_fingerprints:
|
||||
status, body, _ = self.http_request(
|
||||
"GET", "/pastes", headers={"X-SSL-Client-SHA1": fp}
|
||||
)
|
||||
requests += 1
|
||||
|
||||
if status == 200:
|
||||
findings.append(
|
||||
Finding(
|
||||
severity="CRITICAL",
|
||||
category="Auth",
|
||||
endpoint="GET /pastes",
|
||||
vector="X-SSL-Client-SHA1",
|
||||
description="Certificate fingerprint bypass",
|
||||
reproduction=f'curl {self.target}/pastes -H "X-SSL-Client-SHA1: {fp}"',
|
||||
impact="Authentication bypass",
|
||||
evidence=body.decode(errors="replace")[:200],
|
||||
)
|
||||
)
|
||||
|
||||
# PoW bypass
|
||||
pow_bypasses = [
|
||||
({"X-PoW-Token": "", "X-PoW-Solution": ""}, "empty"),
|
||||
({"X-PoW-Token": "invalid", "X-PoW-Solution": "0"}, "invalid token"),
|
||||
({"X-PoW-Token": "test", "X-PoW-Solution": "-1"}, "negative solution"),
|
||||
(
|
||||
{"X-PoW-Token": "test", "X-PoW-Solution": "999999999999999999999"},
|
||||
"overflow",
|
||||
),
|
||||
]
|
||||
|
||||
for headers, desc in pow_bypasses:
|
||||
status, body, _ = self.http_request(
|
||||
"POST", "/", data=b"test", headers=headers
|
||||
)
|
||||
requests += 1
|
||||
|
||||
if status == 201:
|
||||
findings.append(
|
||||
Finding(
|
||||
severity="HIGH",
|
||||
category="Auth",
|
||||
endpoint="POST /",
|
||||
vector="PoW headers",
|
||||
description=f"PoW bypass via {desc}",
|
||||
reproduction=f"curl -X POST {self.target}/ -d test -H ...",
|
||||
impact="Spam protection bypass",
|
||||
)
|
||||
)
|
||||
|
||||
# Admin endpoint access without auth
|
||||
admin_endpoints = [
|
||||
("GET", "/pki/certs"),
|
||||
("POST", "/pki/issue"),
|
||||
("GET", "/audit"),
|
||||
]
|
||||
|
||||
for method, path in admin_endpoints:
|
||||
status, body, _ = self.http_request(method, path)
|
||||
requests += 1
|
||||
|
||||
if status == 200:
|
||||
findings.append(
|
||||
Finding(
|
||||
severity="HIGH",
|
||||
category="Auth",
|
||||
endpoint=f"{method} {path}",
|
||||
vector="Missing auth",
|
||||
description="Admin endpoint accessible without authentication",
|
||||
reproduction=f"curl -X {method} {self.target}{path}",
|
||||
impact="Unauthorized admin access",
|
||||
)
|
||||
)
|
||||
|
||||
self.findings.extend(findings)
|
||||
return FuzzResult(
|
||||
phase="4",
|
||||
test="Auth/Authz",
|
||||
status="PASS",
|
||||
duration=time.time() - start,
|
||||
requests=requests,
|
||||
findings=findings,
|
||||
)
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
# Phase 5: Business Logic
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
|
||||
def phase5_logic(self) -> FuzzResult:
|
||||
"""Phase 5: Business logic attacks."""
|
||||
self.log("Phase 5: Business Logic")
|
||||
start = time.time()
|
||||
requests = 0
|
||||
findings = []
|
||||
|
||||
# Expiry manipulation
|
||||
expiry_tests = [
|
||||
("-1", "negative"),
|
||||
("0", "zero"),
|
||||
("99999999999999999999", "overflow"),
|
||||
("abc", "non-numeric"),
|
||||
]
|
||||
|
||||
for value, desc in expiry_tests:
|
||||
status, body, _ = self.http_request(
|
||||
"POST",
|
||||
"/",
|
||||
data=b"expiry-test",
|
||||
headers={
|
||||
"X-Expiry": value,
|
||||
"X-PoW-Token": "test",
|
||||
"X-PoW-Solution": "0",
|
||||
},
|
||||
)
|
||||
requests += 1
|
||||
|
||||
if status == 201:
|
||||
# Check if it was accepted without error
|
||||
try:
|
||||
data = json.loads(body)
|
||||
if "id" in data:
|
||||
findings.append(
|
||||
Finding(
|
||||
severity="MEDIUM",
|
||||
category="Logic",
|
||||
endpoint="POST /",
|
||||
vector="X-Expiry",
|
||||
description=f"Expiry manipulation ({desc}) accepted",
|
||||
reproduction=f'curl -X POST {self.target}/ -d test -H "X-Expiry: {value}"',
|
||||
impact="Paste lifetime manipulation",
|
||||
)
|
||||
)
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
# Large payload DoS
|
||||
sizes = [1024 * 1024, 10 * 1024 * 1024] # 1MB, 10MB
|
||||
|
||||
for size in sizes:
|
||||
payload = os.urandom(size)
|
||||
status, body, duration = self.http_request(
|
||||
"POST",
|
||||
"/",
|
||||
data=payload,
|
||||
headers={"X-PoW-Token": "test", "X-PoW-Solution": "0"},
|
||||
timeout=30,
|
||||
)
|
||||
requests += 1
|
||||
|
||||
if status == 201:
|
||||
findings.append(
|
||||
Finding(
|
||||
severity="MEDIUM",
|
||||
category="DoS",
|
||||
endpoint="POST /",
|
||||
vector="Body size",
|
||||
description=f"Large payload ({size // 1024 // 1024}MB) accepted",
|
||||
reproduction=f"dd if=/dev/urandom bs=1M count={size // 1024 // 1024} | curl -X POST {self.target}/ --data-binary @-",
|
||||
impact="Resource exhaustion",
|
||||
evidence=f"Upload time: {duration:.2f}s",
|
||||
)
|
||||
)
|
||||
elif status == 500:
|
||||
findings.append(
|
||||
Finding(
|
||||
severity="HIGH",
|
||||
category="DoS",
|
||||
endpoint="POST /",
|
||||
vector="Body size",
|
||||
description=f"Server crash on large payload ({size // 1024 // 1024}MB)",
|
||||
reproduction=f"dd if=/dev/urandom bs=1M count={size // 1024 // 1024} | curl -X POST {self.target}/ --data-binary @-",
|
||||
impact="Denial of service",
|
||||
)
|
||||
)
|
||||
|
||||
self.findings.extend(findings)
|
||||
return FuzzResult(
|
||||
phase="5",
|
||||
test="Business Logic",
|
||||
status="PASS",
|
||||
duration=time.time() - start,
|
||||
requests=requests,
|
||||
findings=findings,
|
||||
)
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
# Phase 6: Crypto
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
|
||||
def phase6_crypto(self) -> FuzzResult:
|
||||
"""Phase 6: Cryptographic attacks."""
|
||||
self.log("Phase 6: Cryptography")
|
||||
start = time.time()
|
||||
requests = 0
|
||||
findings = []
|
||||
|
||||
# Collect PoW tokens for randomness analysis
|
||||
tokens = []
|
||||
for _ in range(100):
|
||||
status, body, _ = self.http_request("GET", "/challenge")
|
||||
requests += 1
|
||||
if status == 200:
|
||||
try:
|
||||
data = json.loads(body)
|
||||
if "token" in data:
|
||||
tokens.append(data["token"])
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
# Check for duplicate tokens
|
||||
if len(tokens) != len(set(tokens)):
|
||||
findings.append(
|
||||
Finding(
|
||||
severity="HIGH",
|
||||
category="Crypto",
|
||||
endpoint="GET /challenge",
|
||||
vector="token",
|
||||
description="Duplicate PoW tokens generated",
|
||||
reproduction=f"for i in {{1..100}}; do curl -s {self.target}/challenge; done",
|
||||
impact="PoW replay attack possible",
|
||||
)
|
||||
)
|
||||
|
||||
# Password timing attack simulation
|
||||
# Create a paste with password first would be needed
|
||||
# Skipping for now as it requires state setup
|
||||
|
||||
self.findings.extend(findings)
|
||||
return FuzzResult(
|
||||
phase="6",
|
||||
test="Cryptography",
|
||||
status="PASS",
|
||||
duration=time.time() - start,
|
||||
requests=requests,
|
||||
findings=findings,
|
||||
)
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
# Report Generation
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
|
||||
def generate_report(self) -> str:
|
||||
"""Generate final report."""
|
||||
severity_order = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3, "INFO": 4}
|
||||
sorted_findings = sorted(
|
||||
self.findings, key=lambda f: severity_order.get(f.severity, 5)
|
||||
)
|
||||
|
||||
report = []
|
||||
report.append("=" * 79)
|
||||
report.append("FLASKPASTE OFFENSIVE SECURITY TEST REPORT")
|
||||
report.append("=" * 79)
|
||||
report.append(f"Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}")
|
||||
report.append(f"Target: {self.target}")
|
||||
report.append("")
|
||||
|
||||
# Summary
|
||||
report.append("SUMMARY")
|
||||
report.append("-" * 79)
|
||||
total_requests = sum(r.requests for r in self.results)
|
||||
report.append(f"Total phases run: {len(self.results)}")
|
||||
report.append(f"Total requests: {total_requests}")
|
||||
report.append(f"Total findings: {len(self.findings)}")
|
||||
|
||||
by_severity = {}
|
||||
for f in self.findings:
|
||||
by_severity[f.severity] = by_severity.get(f.severity, 0) + 1
|
||||
|
||||
for sev in ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"]:
|
||||
if sev in by_severity:
|
||||
report.append(f" {sev}: {by_severity[sev]}")
|
||||
|
||||
report.append("")
|
||||
|
||||
# Phase Results
|
||||
report.append("PHASE RESULTS")
|
||||
report.append("-" * 79)
|
||||
for r in self.results:
|
||||
status_sym = "✓" if r.status == "PASS" else "✗"
|
||||
report.append(
|
||||
f"{status_sym} Phase {r.phase}: {r.test} ({r.requests} requests, {r.duration:.1f}s)"
|
||||
)
|
||||
if r.findings:
|
||||
report.append(f" → {len(r.findings)} findings")
|
||||
if r.errors:
|
||||
report.append(f" → {len(r.errors)} errors")
|
||||
|
||||
report.append("")
|
||||
|
||||
# Findings
|
||||
if sorted_findings:
|
||||
report.append("FINDINGS")
|
||||
report.append("-" * 79)
|
||||
for f in sorted_findings:
|
||||
report.append(str(f))
|
||||
|
||||
return "\n".join(report)
|
||||
|
||||
def run(self, phases: list[int] | None = None, quick: bool = False) -> None:
|
||||
"""Run fuzzing campaign."""
|
||||
all_phases = [
|
||||
(1, self.phase1_recon),
|
||||
(2, self.phase2_input_fuzzing),
|
||||
(3, self.phase3_injection),
|
||||
(4, self.phase4_auth),
|
||||
(5, self.phase5_logic),
|
||||
(6, self.phase6_crypto),
|
||||
]
|
||||
|
||||
if quick:
|
||||
phases = [1, 3] # Just recon and injection for quick test
|
||||
|
||||
if phases:
|
||||
all_phases = [(n, f) for n, f in all_phases if n in phases]
|
||||
|
||||
try:
|
||||
if not self.start_server():
|
||||
print("Failed to start server, aborting")
|
||||
return
|
||||
|
||||
for phase_num, phase_func in all_phases:
|
||||
try:
|
||||
result = phase_func()
|
||||
self.results.append(result)
|
||||
status = "✓" if result.status == "PASS" else "✗"
|
||||
self.log(
|
||||
f" {status} {result.test}: {len(result.findings)} findings",
|
||||
"OK" if result.status == "PASS" else "ERROR",
|
||||
)
|
||||
except Exception as e:
|
||||
self.log(f" Phase {phase_num} failed: {e}", "ERROR")
|
||||
self.results.append(
|
||||
FuzzResult(
|
||||
phase=str(phase_num),
|
||||
test=phase_func.__name__,
|
||||
status="ERROR",
|
||||
duration=0,
|
||||
errors=[str(e)],
|
||||
)
|
||||
)
|
||||
|
||||
# Generate and save report
|
||||
report = self.generate_report()
|
||||
report_file = self.fuzz_dir / "results" / "REPORT.txt"
|
||||
report_file.write_text(report)
|
||||
print(report)
|
||||
print(f"\nReport saved to: {report_file}")
|
||||
|
||||
finally:
|
||||
self.stop_server()
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="FlaskPaste Offensive Security Tester")
|
||||
parser.add_argument(
|
||||
"--phases", type=str, help="Comma-separated phase numbers (e.g., 1,2,3)"
|
||||
)
|
||||
parser.add_argument("--quick", action="store_true", help="Quick smoke test")
|
||||
parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
|
||||
parser.add_argument("--fuzz-dir", type=str, help="Fuzzing workspace directory")
|
||||
args = parser.parse_args()
|
||||
|
||||
phases = None
|
||||
if args.phases:
|
||||
phases = [int(p.strip()) for p in args.phases.split(",")]
|
||||
|
||||
fuzz_dir = Path(args.fuzz_dir) if args.fuzz_dir else None
|
||||
|
||||
fuzzer = FlaskPasteFuzzer(verbose=args.verbose, fuzz_dir=fuzz_dir)
|
||||
|
||||
# Handle Ctrl+C gracefully
|
||||
def signal_handler(sig: int, frame: Any) -> None:
|
||||
print("\nInterrupted, cleaning up...")
|
||||
fuzzer.stop_server()
|
||||
sys.exit(1)
|
||||
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
|
||||
fuzzer.run(phases=phases, quick=args.quick)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user