forked from username/flaskpaste
962 lines
36 KiB
Python
Executable File
962 lines
36 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
FlaskPaste Offensive Security Testing Harness
|
|
|
|
Runs comprehensive fuzzing and penetration testing against
|
|
an isolated FlaskPaste instance.
|
|
|
|
Usage:
|
|
./run_fuzz.py [--phases PHASES] [--quick] [--verbose]
|
|
|
|
Examples:
|
|
./run_fuzz.py # Run all phases
|
|
./run_fuzz.py --phases 1,2,3 # Run specific phases
|
|
./run_fuzz.py --quick # Quick smoke test
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import random
|
|
import signal
|
|
import socket
|
|
import string
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Any
|
|
from urllib.parse import quote
|
|
|
|
# Ensure we can import the app: prepend the directory two levels above this
# file's own directory (the same path start_server() uses as the project root).
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
|
|
|
|
|
@dataclass
class Finding:
    """A single security issue discovered during a fuzzing phase."""

    severity: str  # CRITICAL, HIGH, MEDIUM, LOW, INFO
    category: str  # Injection, Auth, Logic, Crypto, DoS, Info
    endpoint: str
    vector: str
    description: str
    reproduction: str
    impact: str
    evidence: str = ""

    def __str__(self) -> str:
        """Render the finding as a boxed, human-readable report entry.

        The text starts and ends with a newline; evidence is cut to its
        first 200 characters.
        """
        top_rule = "┌─────────────────────────────────────────────────────────────────────────────┐"
        mid_rule = "├─────────────────────────────────────────────────────────────────────────────┤"
        bottom_rule = "└─────────────────────────────────────────────────────────────────────────────┘"
        short_severity = self.severity[:4]
        clipped_evidence = self.evidence[:200]

        rows = [
            "",  # leading newline
            top_rule,
            f"│ FINDING: {self.category}-{short_severity}",
            mid_rule,
            f"│ Severity: {self.severity}",
            f"│ Category: {self.category}",
            f"│ Endpoint: {self.endpoint}",
            f"│ Vector: {self.vector}",
            mid_rule,
            "│ Description:",
            f"│ {self.description}",
            "│",
            "│ Reproduction:",
            f"│ {self.reproduction}",
            "│",
            "│ Impact:",
            f"│ {self.impact}",
            "│",
            "│ Evidence:",
            f"│ {clipped_evidence}",
            bottom_rule,
            "",  # trailing newline
        ]
        return "\n".join(rows)
|
|
|
|
|
|
@dataclass
class FuzzResult:
    """Outcome record for one fuzzing phase."""

    # Identification of the phase that produced this record.
    phase: str
    test: str

    # One of: PASS, FAIL, ERROR, TIMEOUT
    status: str

    # Elapsed time for the phase, in seconds.
    duration: float

    # Number of HTTP requests the phase issued.
    requests: int = field(default=0)

    # Per-instance lists (never shared between records).
    findings: list[Finding] = field(default_factory=list)
    errors: list[str] = field(default_factory=list)
|
|
|
|
|
|
class FlaskPasteFuzzer:
    """Comprehensive fuzzer for FlaskPaste.

    Launches an isolated FlaskPaste server process, runs the numbered test
    phases against ``target`` over plain HTTP, and collects ``Finding``
    objects plus one ``FuzzResult`` per phase for the final text report.

    Workspace layout under ``fuzz_dir``:
        db/        database file handed to the server
        logs/      ``server.log`` (server stdout and stderr)
        crashes/   request bodies that produced an HTTP 500
        results/   ``REPORT.txt``
    """

    # Address the isolated server is started on and polled at. These agree
    # with the default ``target``.
    SERVER_HOST = "127.0.0.1"
    SERVER_PORT = 5099

    def __init__(
        self,
        target: str = "http://127.0.0.1:5099",
        fuzz_dir: Path | None = None,
        verbose: bool = False,
    ) -> None:
        """Create the workspace directories and initialise empty state.

        Args:
            target: Base URL every request is sent to.
            fuzz_dir: Workspace directory; a fresh temporary directory is
                created when omitted.
            verbose: Print INFO/OK log lines in addition to errors.
        """
        self.target = target
        self.fuzz_dir = fuzz_dir or Path(tempfile.mkdtemp(prefix="flaskpaste-fuzz-"))
        self.verbose = verbose
        self.findings: list[Finding] = []
        self.results: list[FuzzResult] = []
        self.server_proc: subprocess.Popen | None = None
        # Handle of logs/server.log while a server is running, else None.
        self.log_file: Any = None

        # Create directories
        (self.fuzz_dir / "db").mkdir(parents=True, exist_ok=True)
        for name in ("logs", "crashes", "results"):
            (self.fuzz_dir / name).mkdir(exist_ok=True)

    def log(self, msg: str, level: str = "INFO") -> None:
        """Print ``msg`` with a status symbol.

        ERROR, CRITICAL and FINDING messages are always printed; every other
        level is printed only in verbose mode.
        """
        if self.verbose or level in ("ERROR", "CRITICAL", "FINDING"):
            symbol = {"INFO": "●", "ERROR": "✗", "FINDING": "⚠", "OK": "✓"}.get(level, "●")
            print(f"{symbol} {msg}")

    def _close_log_file(self) -> None:
        """Close the server log file if it is open."""
        if self.log_file is not None:
            self.log_file.close()
            self.log_file = None

    def start_server(self) -> bool:
        """Start isolated FlaskPaste instance.

        Returns:
            True once a TCP connection to the server succeeds; False if the
            process cannot be launched, exits early, or never accepts a
            connection within 30 attempts.
        """
        self.log("Starting isolated FlaskPaste instance...")

        env = os.environ.copy()
        env.update(
            {
                "FLASKPASTE_DATABASE": str(self.fuzz_dir / "db" / "fuzz.db"),
                "FLASKPASTE_SECRET_KEY": f"fuzz-{os.urandom(16).hex()}",
                "FLASKPASTE_PKI_PASSWORD": "fuzz-pki-password",
                "FLASKPASTE_REGISTRATION_ENABLED": "true",
                "FLASKPASTE_POW_DIFFICULTY": "1",
                "FLASKPASTE_RATE_LIMIT_ENABLED": "false",
                "FLASKPASTE_PROXY_SECRET": "",
                "FLASK_ENV": "development",
                "FLASK_DEBUG": "0",
            }
        )

        project_root = Path(__file__).parent.parent.parent
        # Kept open for the lifetime of the child process; closed again by
        # stop_server() (or right below if the launch fails).
        self.log_file = open(  # noqa: SIM115
            self.fuzz_dir / "logs" / "server.log", "w"
        )

        try:
            self.server_proc = subprocess.Popen(  # noqa: S603
                [
                    str(project_root / "venv" / "bin" / "python"),
                    "run.py",
                    "--host",
                    self.SERVER_HOST,
                    "--port",
                    str(self.SERVER_PORT),
                ],
                cwd=project_root,
                env=env,
                stdout=self.log_file,
                stderr=subprocess.STDOUT,
            )
        except OSError as exc:
            # e.g. the virtualenv interpreter does not exist
            self.log(f"Could not launch server: {exc}", "ERROR")
            self._close_log_file()
            return False

        # Wait for server to start
        for _ in range(30):
            if self.server_proc.poll() is not None:
                # The child already died; polling the port any longer is pointless.
                self.log(
                    f"Server exited early with code {self.server_proc.returncode}",
                    "ERROR",
                )
                return False
            try:
                # The context manager closes the socket on failure as well.
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                    sock.settimeout(1)
                    sock.connect((self.SERVER_HOST, self.SERVER_PORT))
            except OSError:
                # Covers connection refused and timeouts alike.
                time.sleep(0.5)
            else:
                self.log("Server started successfully", "OK")
                return True

        self.log("Server failed to start", "ERROR")
        return False

    def stop_server(self) -> None:
        """Stop the fuzzing server and release its log file.

        Safe to call repeatedly and when no server was ever started.
        """
        proc = self.server_proc
        self.server_proc = None
        if proc:
            proc.terminate()
            try:
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()  # reap the killed child
            self.log("Server stopped")
        self._close_log_file()

    def http_request(
        self,
        method: str,
        path: str,
        data: bytes | None = None,
        headers: dict[str, str] | None = None,
        timeout: float = 5.0,
    ) -> tuple[int | None, bytes, float]:
        """Make HTTP request, return (status, body, duration).

        Never raises: on any failure the status is ``None`` and the body is
        the encoded exception message. The caller's ``headers`` mapping is
        not modified.
        """
        import http.client
        import urllib.parse

        # Work on a copy so adding Content-Length does not leak into the
        # caller's dict.
        hdrs = dict(headers) if headers else {}
        if data:
            hdrs.setdefault("Content-Length", str(len(data)))

        start = time.perf_counter()
        conn = None
        try:
            parsed = urllib.parse.urlparse(self.target)
            if parsed.hostname is None:
                raise ValueError(f"target has no hostname: {self.target!r}")
            conn = http.client.HTTPConnection(parsed.hostname, parsed.port, timeout=timeout)
            conn.request(method, path, body=data, headers=hdrs)
            resp = conn.getresponse()
            body = resp.read()
            return resp.status, body, time.perf_counter() - start
        except Exception as e:
            # Deliberately broad: hostile payloads are expected to break the
            # request in many ways, and callers handle status=None.
            return None, str(e).encode(), time.perf_counter() - start
        finally:
            if conn is not None:
                conn.close()

    @staticmethod
    def _parse_json_object(body: bytes) -> dict[str, Any] | None:
        """Return ``body`` decoded as a JSON object, or None if it is not one."""
        try:
            parsed = json.loads(body)
        except ValueError:
            # JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            return None
        return parsed if isinstance(parsed, dict) else None

    @staticmethod
    def _header_injected(status: int | None, body: bytes) -> bool:
        """Return True if a real response body contains the injected header name.

        ``status is None`` means no response was received; the body is then
        only the client-side error text, which may echo the payload, so it
        must not be treated as evidence.
        """
        return status is not None and b"X-Injected" in body

    # ─────────────────────────────────────────────────────────────────────────
    # Phase 1: Reconnaissance
    # ─────────────────────────────────────────────────────────────────────────

    def phase1_recon(self) -> FuzzResult:
        """Phase 1: Reconnaissance and enumeration."""
        self.log("Phase 1: Reconnaissance")
        start = time.perf_counter()
        requests = 0
        findings: list[Finding] = []

        # Known endpoints
        endpoints = [
            "/",
            "/health",
            "/challenge",
            "/client",
            "/register",
            "/pastes",
            "/pki",
            "/pki/ca",
            "/pki/ca.crt",
            "/pki/issue",
            "/pki/certs",
            "/audit",
        ]

        # Hidden endpoint discovery
        hidden_tests = [
            "/admin",
            "/debug",
            "/config",
            "/env",
            "/.env",
            "/robots.txt",
            "/sitemap.xml",
            "/.git/config",
            "/api",
            "/api/v1",
            "/swagger",
            "/docs",
            "/graphql",
            "/metrics",
            "/prometheus",
            "/__debug__",
            "/server-status",
            "/phpinfo.php",
            "/wp-admin",
        ]

        for path in endpoints + hidden_tests:
            status, _, _ = self.http_request("GET", path)
            requests += 1
            if status and status not in (404, 405):
                self.log(f" {status} {path}")
                if path in hidden_tests and status == 200:
                    findings.append(
                        Finding(
                            severity="MEDIUM",
                            category="Info",
                            endpoint=f"GET {path}",
                            vector="URL",
                            description=f"Hidden endpoint accessible: {path}",
                            reproduction=f"curl {self.target}{path}",
                            impact="Information disclosure, potential attack surface",
                        )
                    )

        # HTTP method enumeration
        methods = ["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD", "TRACE"]
        for method in methods:
            status, _body, _ = self.http_request(method, "/")
            requests += 1
            if method == "TRACE" and status == 200:
                findings.append(
                    Finding(
                        severity="LOW",
                        category="Info",
                        endpoint="TRACE /",
                        vector="HTTP Method",
                        description="TRACE method enabled",
                        reproduction=f"curl -X TRACE {self.target}/",
                        impact="Cross-site tracing (XST) possible",
                    )
                )

        self.findings.extend(findings)
        return FuzzResult(
            phase="1",
            test="Reconnaissance",
            status="PASS",
            duration=time.perf_counter() - start,
            requests=requests,
            findings=findings,
        )

    # ─────────────────────────────────────────────────────────────────────────
    # Phase 2: Input Fuzzing
    # ─────────────────────────────────────────────────────────────────────────

    def phase2_input_fuzzing(self) -> FuzzResult:
        """Phase 2: Input fuzzing.

        Posts hostile bodies and sends hostile header values. A body that
        yields HTTP 500 is saved under ``crashes/``; the phase status is
        ERROR when any body request got no response at all.
        """
        self.log("Phase 2: Input Fuzzing")
        start = time.perf_counter()
        requests = 0
        findings: list[Finding] = []
        errors: list[str] = []

        # Paste content fuzzing
        payloads: list[bytes] = [
            b"normal text",
            b"\x00" * 100,  # Null bytes
            b"\xff" * 100,  # High bytes
            os.urandom(1000),  # Random binary
            b"A" * 100000,  # Large payload
            "".join(random.choices(string.printable, k=1000)).encode(),
            ("\u202e" * 100).encode("utf-8"),  # RTL override
            ("A\u0300" * 100).encode("utf-8"),  # Combining characters
        ]

        for index, payload in enumerate(payloads):
            status, body, _duration = self.http_request(
                "POST",
                "/",
                data=payload,
                headers={"X-PoW-Token": "test", "X-PoW-Solution": "0"},
            )
            requests += 1

            if status is None:
                errors.append(f"Request failed: {body.decode(errors='replace')[:100]}")
            elif status == 500:
                findings.append(
                    Finding(
                        severity="HIGH",
                        category="DoS",
                        endpoint="POST /",
                        vector="Body",
                        description=f"Server error with payload: {payload[:50]!r}",
                        reproduction=f"curl -X POST {self.target}/ --data-binary ...",
                        impact="Potential denial of service",
                        evidence=body.decode(errors="replace")[:200],
                    )
                )
                # Save crash payload. The payload index keeps the name unique
                # when several crashes land within the same second.
                crash_file = self.fuzz_dir / "crashes" / f"{int(time.time())}-{index}.payload"
                crash_file.write_bytes(payload)

        # Header fuzzing
        header_tests = [
            ("X-Forwarded-For", "' OR 1=1--"),
            ("X-Forwarded-For", "127.0.0.1, 10.0.0.1, evil.com"),
            ("X-SSL-Client-SHA1", "' OR 1=1--"),
            ("X-SSL-Client-SHA1", "../../../etc/passwd"),
            ("X-PoW-Token", "A" * 10000),
            ("Host", "evil.com"),
            ("Host", "evil.com\r\nX-Injected: true"),
            ("X-Expiry", "-1"),
            ("X-Expiry", "99999999999999999999"),
            ("Content-Type", "text/html\r\nX-Injected: true"),
        ]

        for header, value in header_tests:
            status, body, _ = self.http_request("GET", "/", headers={header: value})
            requests += 1

            if status is None:
                # No response (for CRLF values the HTTP client refuses to
                # send the header at all); nothing to analyse.
                self.log(f" {header}: no response ({body.decode(errors='replace')[:100]})")
                continue

            if self._header_injected(status, body):
                findings.append(
                    Finding(
                        severity="HIGH",
                        category="Injection",
                        endpoint="GET /",
                        vector=f"Header: {header}",
                        description="Header injection vulnerability",
                        reproduction=f'curl {self.target}/ -H "{header}: {value}"',
                        impact="HTTP response splitting, cache poisoning",
                    )
                )

        self.findings.extend(findings)
        return FuzzResult(
            phase="2",
            test="Input Fuzzing",
            status="PASS" if not errors else "ERROR",
            duration=time.perf_counter() - start,
            requests=requests,
            findings=findings,
            errors=errors,
        )

    # ─────────────────────────────────────────────────────────────────────────
    # Phase 3: Injection Attacks
    # ─────────────────────────────────────────────────────────────────────────

    def phase3_injection(self) -> FuzzResult:
        """Phase 3: Injection attacks.

        Probes for SQL injection (timing and error text), path traversal,
        server-side template injection and command injection (timing).
        """
        self.log("Phase 3: Injection Attacks")
        start = time.perf_counter()
        requests = 0
        findings: list[Finding] = []

        # SQL injection payloads
        sqli_payloads = [
            "' OR '1'='1",
            "1' OR '1'='1'--",
            "1; DROP TABLE pastes;--",
            "1 UNION SELECT * FROM sqlite_master--",
            "1' AND SLEEP(5)--",
            "1/**/OR/**/1=1",
        ]

        for payload in sqli_payloads:
            encoded = quote(payload, safe="")

            # Test in paste ID
            _status, body, duration = self.http_request("GET", f"/{encoded}")
            requests += 1

            # Timing-based detection. A request that runs into the 5 s
            # client timeout also lands here, which is intended: a sleeping
            # server shows up as exactly that.
            if duration > 4.5:
                findings.append(
                    Finding(
                        severity="CRITICAL",
                        category="Injection",
                        endpoint=f"GET /{payload[:20]}...",
                        vector="paste_id",
                        description="Possible blind SQL injection (time-based)",
                        reproduction=f"curl {self.target}/{encoded}",
                        impact="Full database compromise",
                        evidence=f"Response time: {duration:.2f}s",
                    )
                )

            # Error-based detection
            lowered = body.lower()
            if b"sqlite" in lowered or b"syntax" in lowered:
                findings.append(
                    Finding(
                        severity="CRITICAL",
                        category="Injection",
                        endpoint=f"GET /{payload[:20]}...",
                        vector="paste_id",
                        description="SQL injection with error disclosure",
                        reproduction=f"curl {self.target}/{encoded}",
                        impact="Full database compromise",
                        evidence=body.decode(errors="replace")[:200],
                    )
                )

        # Path traversal
        traversal_payloads = [
            "../../../etc/passwd",
            "....//....//....//etc/passwd",
            "..%2f..%2f..%2fetc/passwd",
            "..%252f..%252f..%252fetc/passwd",
        ]

        for payload in traversal_payloads:
            _status, body, _ = self.http_request("GET", f"/{payload}")
            requests += 1

            if b"root:" in body:
                findings.append(
                    Finding(
                        severity="CRITICAL",
                        category="Injection",
                        endpoint=f"GET /{payload}",
                        vector="paste_id",
                        description="Path traversal vulnerability",
                        reproduction=f"curl {self.target}/{payload}",
                        impact="Arbitrary file read",
                        evidence=body.decode(errors="replace")[:200],
                    )
                )

        # SSTI
        # NOTE(review): the b"49" indicator is short and could also occur in
        # unrelated response data; hits should be confirmed manually.
        ssti_payloads = [
            ("{{7*7}}", b"49"),
            ("{{config}}", b"SECRET_KEY"),
            ("${7*7}", b"49"),
        ]

        for payload, indicator in ssti_payloads:
            _status, body, _ = self.http_request(
                "POST",
                "/",
                data=payload.encode(),
                headers={"X-PoW-Token": "test", "X-PoW-Solution": "0"},
            )
            requests += 1

            if indicator in body:
                findings.append(
                    Finding(
                        severity="CRITICAL",
                        category="Injection",
                        endpoint="POST /",
                        vector="Body",
                        description="Server-side template injection",
                        reproduction=f"curl -X POST {self.target}/ -d '{payload}'",
                        impact="Remote code execution",
                        evidence=body.decode(errors="replace")[:200],
                    )
                )

        # Command injection
        cmd_payloads = [
            "; sleep 5",
            "| sleep 5",
            "$(sleep 5)",
            "`sleep 5`",
        ]

        for payload in cmd_payloads:
            _, _, duration = self.http_request(
                "POST",
                "/",
                data=payload.encode(),
                headers={"X-PoW-Token": "test", "X-PoW-Solution": "0"},
            )
            requests += 1

            if duration > 4.5:
                findings.append(
                    Finding(
                        severity="CRITICAL",
                        category="Injection",
                        endpoint="POST /",
                        vector="Body",
                        description="Command injection vulnerability",
                        reproduction=f"curl -X POST {self.target}/ -d '{payload}'",
                        impact="Remote code execution",
                        evidence=f"Response time: {duration:.2f}s",
                    )
                )

        self.findings.extend(findings)
        return FuzzResult(
            phase="3",
            test="Injection Attacks",
            status="PASS",
            duration=time.perf_counter() - start,
            requests=requests,
            findings=findings,
        )

    # ─────────────────────────────────────────────────────────────────────────
    # Phase 4: Authentication & Authorization
    # ─────────────────────────────────────────────────────────────────────────

    def phase4_auth(self) -> FuzzResult:
        """Phase 4: Authentication and authorization attacks."""
        self.log("Phase 4: Auth/Authz")
        start = time.perf_counter()
        requests = 0
        findings: list[Finding] = []

        # Certificate fingerprint spoofing
        fake_fingerprints = [
            "0000000000000000000000000000000000000000",
            "da39a3ee5e6b4b0d3255bfef95601890afd80709",  # SHA1 of empty
            "ffffffffffffffffffffffffffffffffffffffff",
            "' OR 1=1--",
            "../../../etc/passwd",
        ]

        for fp in fake_fingerprints:
            status, body, _ = self.http_request("GET", "/pastes", headers={"X-SSL-Client-SHA1": fp})
            requests += 1

            if status == 200:
                findings.append(
                    Finding(
                        severity="CRITICAL",
                        category="Auth",
                        endpoint="GET /pastes",
                        vector="X-SSL-Client-SHA1",
                        description="Certificate fingerprint bypass",
                        reproduction=f'curl {self.target}/pastes -H "X-SSL-Client-SHA1: {fp}"',
                        impact="Authentication bypass",
                        evidence=body.decode(errors="replace")[:200],
                    )
                )

        # PoW bypass
        pow_bypasses = [
            ({"X-PoW-Token": "", "X-PoW-Solution": ""}, "empty"),
            ({"X-PoW-Token": "invalid", "X-PoW-Solution": "0"}, "invalid token"),
            ({"X-PoW-Token": "test", "X-PoW-Solution": "-1"}, "negative solution"),
            (
                {"X-PoW-Token": "test", "X-PoW-Solution": "999999999999999999999"},
                "overflow",
            ),
        ]

        for headers, desc in pow_bypasses:
            status, body, _ = self.http_request("POST", "/", data=b"test", headers=headers)
            requests += 1

            if status == 201:
                findings.append(
                    Finding(
                        severity="HIGH",
                        category="Auth",
                        endpoint="POST /",
                        vector="PoW headers",
                        description=f"PoW bypass via {desc}",
                        reproduction=f"curl -X POST {self.target}/ -d test -H ...",
                        impact="Spam protection bypass",
                    )
                )

        # Admin endpoint access without auth
        admin_endpoints = [
            ("GET", "/pki/certs"),
            ("POST", "/pki/issue"),
            ("GET", "/audit"),
        ]

        for method, path in admin_endpoints:
            status, body, _ = self.http_request(method, path)
            requests += 1

            if status == 200:
                findings.append(
                    Finding(
                        severity="HIGH",
                        category="Auth",
                        endpoint=f"{method} {path}",
                        vector="Missing auth",
                        description="Admin endpoint accessible without authentication",
                        reproduction=f"curl -X {method} {self.target}{path}",
                        impact="Unauthorized admin access",
                    )
                )

        self.findings.extend(findings)
        return FuzzResult(
            phase="4",
            test="Auth/Authz",
            status="PASS",
            duration=time.perf_counter() - start,
            requests=requests,
            findings=findings,
        )

    # ─────────────────────────────────────────────────────────────────────────
    # Phase 5: Business Logic
    # ─────────────────────────────────────────────────────────────────────────

    def phase5_logic(self) -> FuzzResult:
        """Phase 5: Business logic attacks."""
        self.log("Phase 5: Business Logic")
        start = time.perf_counter()
        requests = 0
        findings: list[Finding] = []

        # Expiry manipulation
        expiry_tests = [
            ("-1", "negative"),
            ("0", "zero"),
            ("99999999999999999999", "overflow"),
            ("abc", "non-numeric"),
        ]

        for value, desc in expiry_tests:
            status, body, _ = self.http_request(
                "POST",
                "/",
                data=b"expiry-test",
                headers={
                    "X-Expiry": value,
                    "X-PoW-Token": "test",
                    "X-PoW-Solution": "0",
                },
            )
            requests += 1

            if status != 201:
                continue

            # Check if it was accepted without error: a JSON object carrying
            # an "id" key counts as a created paste.
            data = self._parse_json_object(body)
            if data is not None and "id" in data:
                findings.append(
                    Finding(
                        severity="MEDIUM",
                        category="Logic",
                        endpoint="POST /",
                        vector="X-Expiry",
                        description=f"Expiry manipulation ({desc}) accepted",
                        reproduction=(
                            f'curl -X POST {self.target}/ -d test -H "X-Expiry: {value}"'
                        ),
                        impact="Paste lifetime manipulation",
                    )
                )

        # Large payload DoS
        sizes = [1024 * 1024, 10 * 1024 * 1024]  # 1MB, 10MB

        for size in sizes:
            megabytes = size // 1024 // 1024
            payload = os.urandom(size)
            status, body, duration = self.http_request(
                "POST",
                "/",
                data=payload,
                headers={"X-PoW-Token": "test", "X-PoW-Solution": "0"},
                timeout=30,
            )
            requests += 1

            if status == 201:
                findings.append(
                    Finding(
                        severity="MEDIUM",
                        category="DoS",
                        endpoint="POST /",
                        vector="Body size",
                        description=f"Large payload ({megabytes}MB) accepted",
                        reproduction=(
                            f"dd if=/dev/urandom bs=1M count={megabytes} | "
                            f"curl -X POST {self.target}/ --data-binary @-"
                        ),
                        impact="Resource exhaustion",
                        evidence=f"Upload time: {duration:.2f}s",
                    )
                )
            elif status == 500:
                findings.append(
                    Finding(
                        severity="HIGH",
                        category="DoS",
                        endpoint="POST /",
                        vector="Body size",
                        description=f"Server crash on large payload ({megabytes}MB)",
                        reproduction=(
                            f"dd if=/dev/urandom bs=1M count={megabytes} | "
                            f"curl -X POST {self.target}/ --data-binary @-"
                        ),
                        impact="Denial of service",
                    )
                )

        self.findings.extend(findings)
        return FuzzResult(
            phase="5",
            test="Business Logic",
            status="PASS",
            duration=time.perf_counter() - start,
            requests=requests,
            findings=findings,
        )

    # ─────────────────────────────────────────────────────────────────────────
    # Phase 6: Crypto
    # ─────────────────────────────────────────────────────────────────────────

    def phase6_crypto(self) -> FuzzResult:
        """Phase 6: Cryptographic attacks."""
        self.log("Phase 6: Cryptography")
        start = time.perf_counter()
        requests = 0
        findings: list[Finding] = []

        # Collect PoW tokens for randomness analysis
        tokens: list[str] = []
        for _ in range(100):
            status, body, _ = self.http_request("GET", "/challenge")
            requests += 1
            if status != 200:
                continue
            data = self._parse_json_object(body)
            if data is not None and "token" in data:
                # str() keeps the duplicate check below working even if the
                # token is not a hashable JSON value.
                tokens.append(str(data["token"]))

        # Check for duplicate tokens
        if len(tokens) != len(set(tokens)):
            findings.append(
                Finding(
                    severity="HIGH",
                    category="Crypto",
                    endpoint="GET /challenge",
                    vector="token",
                    description="Duplicate PoW tokens generated",
                    reproduction=f"for i in {{1..100}}; do curl -s {self.target}/challenge; done",
                    impact="PoW replay attack possible",
                )
            )

        # Password timing attack simulation
        # Create a paste with password first would be needed
        # Skipping for now as it requires state setup

        self.findings.extend(findings)
        return FuzzResult(
            phase="6",
            test="Cryptography",
            status="PASS",
            duration=time.perf_counter() - start,
            requests=requests,
            findings=findings,
        )

    # ─────────────────────────────────────────────────────────────────────────
    # Report Generation
    # ─────────────────────────────────────────────────────────────────────────

    def generate_report(self) -> str:
        """Generate final report.

        Returns the report text: header, summary with per-severity counts,
        one line per phase, and all findings ordered from most to least
        severe (unknown severities last).
        """
        severity_order = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3, "INFO": 4}
        sorted_findings = sorted(self.findings, key=lambda f: severity_order.get(f.severity, 5))

        report = [
            "=" * 79,
            "FLASKPASTE OFFENSIVE SECURITY TEST REPORT",
            "=" * 79,
            f"Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}",
            f"Target: {self.target}",
            "",
        ]

        # Summary
        report.append("SUMMARY")
        report.append("-" * 79)
        total_requests = sum(r.requests for r in self.results)
        report.append(f"Total phases run: {len(self.results)}")
        report.append(f"Total requests: {total_requests}")
        report.append(f"Total findings: {len(self.findings)}")

        by_severity: dict[str, int] = {}
        for f in self.findings:
            by_severity[f.severity] = by_severity.get(f.severity, 0) + 1

        for sev in ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"]:
            if sev in by_severity:
                report.append(f" {sev}: {by_severity[sev]}")

        report.append("")

        # Phase Results
        report.append("PHASE RESULTS")
        report.append("-" * 79)
        for r in self.results:
            status_sym = "✓" if r.status == "PASS" else "✗"
            report.append(
                f"{status_sym} Phase {r.phase}: {r.test} ({r.requests} requests, {r.duration:.1f}s)"
            )
            if r.findings:
                report.append(f" → {len(r.findings)} findings")
            if r.errors:
                report.append(f" → {len(r.errors)} errors")

        report.append("")

        # Findings
        if sorted_findings:
            report.append("FINDINGS")
            report.append("-" * 79)
            report.extend(str(f) for f in sorted_findings)

        return "\n".join(report)

    def run(self, phases: list[int] | None = None, quick: bool = False) -> None:
        """Run fuzzing campaign.

        Args:
            phases: Phase numbers to run; ``None`` or an empty list runs all.
            quick: Run only phases 1 and 3, overriding ``phases``.

        The server is always stopped on the way out. A phase that raises is
        recorded as an ERROR result and the campaign continues.
        """
        all_phases = [
            (1, self.phase1_recon),
            (2, self.phase2_input_fuzzing),
            (3, self.phase3_injection),
            (4, self.phase4_auth),
            (5, self.phase5_logic),
            (6, self.phase6_crypto),
        ]

        if quick:
            phases = [1, 3]  # Just recon and injection for quick test

        if phases:
            all_phases = [(n, f) for n, f in all_phases if n in phases]

        try:
            if not self.start_server():
                print("Failed to start server, aborting")
                return

            for phase_num, phase_func in all_phases:
                try:
                    result = phase_func()
                    self.results.append(result)
                    status = "✓" if result.status == "PASS" else "✗"
                    self.log(
                        f" {status} {result.test}: {len(result.findings)} findings",
                        "OK" if result.status == "PASS" else "ERROR",
                    )
                except Exception as e:
                    # One broken phase must not abort the whole campaign.
                    self.log(f" Phase {phase_num} failed: {e}", "ERROR")
                    self.results.append(
                        FuzzResult(
                            phase=str(phase_num),
                            test=phase_func.__name__,
                            status="ERROR",
                            duration=0,
                            errors=[str(e)],
                        )
                    )

            # Generate and save report. The report contains non-ASCII
            # symbols, so the encoding is pinned instead of using the locale's.
            report = self.generate_report()
            report_file = self.fuzz_dir / "results" / "REPORT.txt"
            report_file.write_text(report, encoding="utf-8")
            print(report)
            print(f"\nReport saved to: {report_file}")

        finally:
            self.stop_server()
|
|
|
|
|
|
def main() -> None:
    """Parse command-line options and run the fuzzing campaign.

    Exits with status 2 (via argparse) when ``--phases`` is not a
    comma-separated list of integers, and with status 1 when interrupted
    by Ctrl+C.
    """
    parser = argparse.ArgumentParser(description="FlaskPaste Offensive Security Tester")
    parser.add_argument("--phases", type=str, help="Comma-separated phase numbers (e.g., 1,2,3)")
    parser.add_argument("--quick", action="store_true", help="Quick smoke test")
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
    parser.add_argument("--fuzz-dir", type=str, help="Fuzzing workspace directory")
    args = parser.parse_args()

    phases = None
    if args.phases:
        # Tolerate stray commas ("1,2,") but report anything non-numeric as a
        # usage error instead of letting int() raise a traceback.
        parts = [part.strip() for part in args.phases.split(",")]
        try:
            phases = [int(part) for part in parts if part]
        except ValueError:
            parser.error(f"--phases expects comma-separated integers, got {args.phases!r}")
        if not phases:
            parser.error("--phases did not contain any phase numbers")

    fuzz_dir = Path(args.fuzz_dir) if args.fuzz_dir else None

    fuzzer = FlaskPasteFuzzer(verbose=args.verbose, fuzz_dir=fuzz_dir)

    # Handle Ctrl+C gracefully
    def signal_handler(sig: int, frame: Any) -> None:
        print("\nInterrupted, cleaning up...")
        fuzzer.stop_server()
        sys.exit(1)

    signal.signal(signal.SIGINT, signal_handler)

    fuzzer.run(phases=phases, quick=args.quick)
|
|
|
|
|
|
# Run the campaign only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|