#!/usr/bin/env python3 """ FlaskPaste Offensive Security Testing Harness Runs comprehensive fuzzing and penetration testing against an isolated FlaskPaste instance. Usage: ./run_fuzz.py [--phases PHASES] [--quick] [--verbose] Examples: ./run_fuzz.py # Run all phases ./run_fuzz.py --phases 1,2,3 # Run specific phases ./run_fuzz.py --quick # Quick smoke test """ from __future__ import annotations import argparse import json import os import random import signal import socket import string import subprocess import sys import tempfile import time from dataclasses import dataclass, field from pathlib import Path from typing import Any from urllib.parse import quote # Ensure we can import the app sys.path.insert(0, str(Path(__file__).parent.parent.parent)) @dataclass class Finding: """Security finding.""" severity: str # CRITICAL, HIGH, MEDIUM, LOW, INFO category: str # Injection, Auth, Logic, Crypto, DoS, Info endpoint: str vector: str description: str reproduction: str impact: str evidence: str = "" def __str__(self) -> str: return f""" ┌─────────────────────────────────────────────────────────────────────────────┐ │ FINDING: {self.category}-{self.severity[:4]} ├─────────────────────────────────────────────────────────────────────────────┤ │ Severity: {self.severity} │ Category: {self.category} │ Endpoint: {self.endpoint} │ Vector: {self.vector} ├─────────────────────────────────────────────────────────────────────────────┤ │ Description: │ {self.description} │ │ Reproduction: │ {self.reproduction} │ │ Impact: │ {self.impact} │ │ Evidence: │ {self.evidence[:200]} └─────────────────────────────────────────────────────────────────────────────┘ """ @dataclass class FuzzResult: """Result of a fuzz test.""" phase: str test: str status: str # PASS, FAIL, ERROR, TIMEOUT duration: float requests: int = 0 findings: list[Finding] = field(default_factory=list) errors: list[str] = field(default_factory=list) class FlaskPasteFuzzer: """Comprehensive fuzzer for FlaskPaste.""" def __init__( self, target: str = "http://127.0.0.1:5099", fuzz_dir: Path | None = None, verbose: bool = False, ): self.target = target self.fuzz_dir = fuzz_dir or Path(tempfile.mkdtemp(prefix="flaskpaste-fuzz-")) self.verbose = verbose self.findings: list[Finding] = [] self.results: list[FuzzResult] = [] self.server_proc: subprocess.Popen | None = None # Create directories (self.fuzz_dir / "db").mkdir(parents=True, exist_ok=True) (self.fuzz_dir / "logs").mkdir(exist_ok=True) (self.fuzz_dir / "crashes").mkdir(exist_ok=True) (self.fuzz_dir / "results").mkdir(exist_ok=True) def log(self, msg: str, level: str = "INFO") -> None: """Log message.""" if self.verbose or level in ("ERROR", "CRITICAL", "FINDING"): symbol = {"INFO": "●", "ERROR": "✗", "FINDING": "⚠", "OK": "✓"}.get(level, "●") print(f"{symbol} {msg}") def start_server(self) -> bool: """Start isolated FlaskPaste instance.""" self.log("Starting isolated FlaskPaste instance...") env = os.environ.copy() env.update( { "FLASKPASTE_DATABASE": str(self.fuzz_dir / "db" / "fuzz.db"), "FLASKPASTE_SECRET_KEY": f"fuzz-{os.urandom(16).hex()}", "FLASKPASTE_PKI_PASSWORD": "fuzz-pki-password", "FLASKPASTE_REGISTRATION_ENABLED": "true", "FLASKPASTE_POW_DIFFICULTY": "1", "FLASKPASTE_RATE_LIMIT_ENABLED": "false", "FLASKPASTE_PROXY_SECRET": "", "FLASK_ENV": "development", "FLASK_DEBUG": "0", } ) project_root = Path(__file__).parent.parent.parent self.log_file = open( # noqa: SIM115 self.fuzz_dir / "logs" / "server.log", "w" ) self.server_proc = subprocess.Popen( # noqa: S603 [ str(project_root / "venv" / "bin" / "python"), "run.py", "--host", "127.0.0.1", "--port", "5099", ], cwd=project_root, env=env, stdout=self.log_file, stderr=subprocess.STDOUT, ) # Wait for server to start for _ in range(30): try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(1) sock.connect(("127.0.0.1", 5099)) sock.close() self.log("Server started successfully", "OK") return True except (TimeoutError, ConnectionRefusedError): time.sleep(0.5) self.log("Server failed to start", "ERROR") return False def stop_server(self) -> None: """Stop the fuzzing server.""" if self.server_proc: self.server_proc.terminate() try: self.server_proc.wait(timeout=5) except subprocess.TimeoutExpired: self.server_proc.kill() self.log("Server stopped") def http_request( self, method: str, path: str, data: bytes | None = None, headers: dict[str, str] | None = None, timeout: float = 5.0, ) -> tuple[int | None, bytes, float]: """Make HTTP request, return (status, body, duration).""" import http.client import urllib.parse parsed = urllib.parse.urlparse(self.target) start = time.perf_counter() try: assert parsed.hostname is not None conn = http.client.HTTPConnection(parsed.hostname, parsed.port, timeout=timeout) hdrs = headers or {} if data: hdrs.setdefault("Content-Length", str(len(data))) conn.request(method, path, body=data, headers=hdrs) resp = conn.getresponse() body = resp.read() duration = time.perf_counter() - start conn.close() return resp.status, body, duration except Exception as e: duration = time.perf_counter() - start return None, str(e).encode(), duration # ───────────────────────────────────────────────────────────────────────── # Phase 1: Reconnaissance # ───────────────────────────────────────────────────────────────────────── def phase1_recon(self) -> FuzzResult: """Phase 1: Reconnaissance and enumeration.""" self.log("Phase 1: Reconnaissance") start = time.time() requests = 0 findings = [] # Known endpoints endpoints = [ "/", "/health", "/challenge", "/client", "/register", "/pastes", "/pki", "/pki/ca", "/pki/ca.crt", "/pki/issue", "/pki/certs", "/audit", ] # Hidden endpoint discovery hidden_tests = [ "/admin", "/debug", "/config", "/env", "/.env", "/robots.txt", "/sitemap.xml", "/.git/config", "/api", "/api/v1", "/swagger", "/docs", "/graphql", "/metrics", "/prometheus", "/__debug__", "/server-status", "/phpinfo.php", "/wp-admin", ] for path in endpoints + hidden_tests: status, _, _ = self.http_request("GET", path) requests += 1 if status and status not in (404, 405): self.log(f" {status} {path}") if path in hidden_tests and status == 200: findings.append( Finding( severity="MEDIUM", category="Info", endpoint=f"GET {path}", vector="URL", description=f"Hidden endpoint accessible: {path}", reproduction=f"curl {self.target}{path}", impact="Information disclosure, potential attack surface", ) ) # HTTP method enumeration methods = ["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD", "TRACE"] for method in methods: status, _body, _ = self.http_request(method, "/") requests += 1 if method == "TRACE" and status == 200: findings.append( Finding( severity="LOW", category="Info", endpoint="TRACE /", vector="HTTP Method", description="TRACE method enabled", reproduction=f"curl -X TRACE {self.target}/", impact="Cross-site tracing (XST) possible", ) ) self.findings.extend(findings) return FuzzResult( phase="1", test="Reconnaissance", status="PASS", duration=time.time() - start, requests=requests, findings=findings, ) # ───────────────────────────────────────────────────────────────────────── # Phase 2: Input Fuzzing # ───────────────────────────────────────────────────────────────────────── def phase2_input_fuzzing(self) -> FuzzResult: """Phase 2: Input fuzzing.""" self.log("Phase 2: Input Fuzzing") start = time.time() requests = 0 findings = [] errors = [] # Paste content fuzzing payloads: list[bytes] = [ b"normal text", b"\x00" * 100, # Null bytes b"\xff" * 100, # High bytes os.urandom(1000), # Random binary b"A" * 100000, # Large payload "".join(random.choices(string.printable, k=1000)).encode(), ("\u202e" * 100).encode("utf-8"), # RTL override ("A\u0300" * 100).encode("utf-8"), # Combining characters ] for payload in payloads: status, body, _duration = self.http_request( "POST", "/", data=payload, headers={"X-PoW-Token": "test", "X-PoW-Solution": "0"}, ) requests += 1 if status is None: errors.append(f"Request failed: {body.decode(errors='replace')[:100]}") elif status == 500: findings.append( Finding( severity="HIGH", category="DoS", endpoint="POST /", vector="Body", description=f"Server error with payload: {payload[:50]!r}", reproduction=f"curl -X POST {self.target}/ --data-binary ...", impact="Potential denial of service", evidence=body.decode(errors="replace")[:200], ) ) # Save crash payload crash_file = self.fuzz_dir / "crashes" / f"{int(time.time())}.payload" crash_file.write_bytes(payload) # Header fuzzing header_tests = [ ("X-Forwarded-For", "' OR 1=1--"), ("X-Forwarded-For", "127.0.0.1, 10.0.0.1, evil.com"), ("X-SSL-Client-SHA1", "' OR 1=1--"), ("X-SSL-Client-SHA1", "../../../etc/passwd"), ("X-PoW-Token", "A" * 10000), ("Host", "evil.com"), ("Host", "evil.com\r\nX-Injected: true"), ("X-Expiry", "-1"), ("X-Expiry", "99999999999999999999"), ("Content-Type", "text/html\r\nX-Injected: true"), ] for header, value in header_tests: status, body, _ = self.http_request("GET", "/", headers={header: value}) requests += 1 if b"X-Injected" in body or b"true" in body: findings.append( Finding( severity="HIGH", category="Injection", endpoint="GET /", vector=f"Header: {header}", description="Header injection vulnerability", reproduction=f'curl {self.target}/ -H "{header}: {value}"', impact="HTTP response splitting, cache poisoning", ) ) self.findings.extend(findings) return FuzzResult( phase="2", test="Input Fuzzing", status="PASS" if not errors else "ERROR", duration=time.time() - start, requests=requests, findings=findings, errors=errors, ) # ───────────────────────────────────────────────────────────────────────── # Phase 3: Injection Attacks # ───────────────────────────────────────────────────────────────────────── def phase3_injection(self) -> FuzzResult: """Phase 3: Injection attacks.""" self.log("Phase 3: Injection Attacks") start = time.time() requests = 0 findings = [] # SQL injection payloads sqli_payloads = [ "' OR '1'='1", "1' OR '1'='1'--", "1; DROP TABLE pastes;--", "1 UNION SELECT * FROM sqlite_master--", "1' AND SLEEP(5)--", "1/**/OR/**/1=1", ] for payload in sqli_payloads: encoded = quote(payload, safe="") # Test in paste ID _status, body, duration = self.http_request("GET", f"/{encoded}") requests += 1 # Timing-based detection if duration > 4.5: findings.append( Finding( severity="CRITICAL", category="Injection", endpoint=f"GET /{payload[:20]}...", vector="paste_id", description="Possible blind SQL injection (time-based)", reproduction=f"curl {self.target}/{encoded}", impact="Full database compromise", evidence=f"Response time: {duration:.2f}s", ) ) # Error-based detection if b"sqlite" in body.lower() or b"syntax" in body.lower(): findings.append( Finding( severity="CRITICAL", category="Injection", endpoint=f"GET /{payload[:20]}...", vector="paste_id", description="SQL injection with error disclosure", reproduction=f"curl {self.target}/{encoded}", impact="Full database compromise", evidence=body.decode(errors="replace")[:200], ) ) # Path traversal traversal_payloads = [ "../../../etc/passwd", "....//....//....//etc/passwd", "..%2f..%2f..%2fetc/passwd", "..%252f..%252f..%252fetc/passwd", ] for payload in traversal_payloads: _status, body, _ = self.http_request("GET", f"/{payload}") requests += 1 if b"root:" in body: findings.append( Finding( severity="CRITICAL", category="Injection", endpoint=f"GET /{payload}", vector="paste_id", description="Path traversal vulnerability", reproduction=f"curl {self.target}/{payload}", impact="Arbitrary file read", evidence=body.decode(errors="replace")[:200], ) ) # SSTI ssti_payloads = [ ("{{7*7}}", b"49"), ("{{config}}", b"SECRET_KEY"), ("${7*7}", b"49"), ] for payload, indicator in ssti_payloads: _status, body, _ = self.http_request( "POST", "/", data=payload.encode(), headers={"X-PoW-Token": "test", "X-PoW-Solution": "0"}, ) requests += 1 if indicator in body: findings.append( Finding( severity="CRITICAL", category="Injection", endpoint="POST /", vector="Body", description="Server-side template injection", reproduction=f"curl -X POST {self.target}/ -d '{payload}'", impact="Remote code execution", evidence=body.decode(errors="replace")[:200], ) ) # Command injection cmd_payloads = [ "; sleep 5", "| sleep 5", "$(sleep 5)", "`sleep 5`", ] for payload in cmd_payloads: _, _, duration = self.http_request( "POST", "/", data=payload.encode(), headers={"X-PoW-Token": "test", "X-PoW-Solution": "0"}, ) requests += 1 if duration > 4.5: findings.append( Finding( severity="CRITICAL", category="Injection", endpoint="POST /", vector="Body", description="Command injection vulnerability", reproduction=f"curl -X POST {self.target}/ -d '{payload}'", impact="Remote code execution", evidence=f"Response time: {duration:.2f}s", ) ) self.findings.extend(findings) return FuzzResult( phase="3", test="Injection Attacks", status="PASS", duration=time.time() - start, requests=requests, findings=findings, ) # ───────────────────────────────────────────────────────────────────────── # Phase 4: Authentication & Authorization # ───────────────────────────────────────────────────────────────────────── def phase4_auth(self) -> FuzzResult: """Phase 4: Authentication and authorization attacks.""" self.log("Phase 4: Auth/Authz") start = time.time() requests = 0 findings = [] # Certificate fingerprint spoofing fake_fingerprints = [ "0000000000000000000000000000000000000000", "da39a3ee5e6b4b0d3255bfef95601890afd80709", # SHA1 of empty "ffffffffffffffffffffffffffffffffffffffff", "' OR 1=1--", "../../../etc/passwd", ] for fp in fake_fingerprints: status, body, _ = self.http_request("GET", "/pastes", headers={"X-SSL-Client-SHA1": fp}) requests += 1 if status == 200: findings.append( Finding( severity="CRITICAL", category="Auth", endpoint="GET /pastes", vector="X-SSL-Client-SHA1", description="Certificate fingerprint bypass", reproduction=f'curl {self.target}/pastes -H "X-SSL-Client-SHA1: {fp}"', impact="Authentication bypass", evidence=body.decode(errors="replace")[:200], ) ) # PoW bypass pow_bypasses = [ ({"X-PoW-Token": "", "X-PoW-Solution": ""}, "empty"), ({"X-PoW-Token": "invalid", "X-PoW-Solution": "0"}, "invalid token"), ({"X-PoW-Token": "test", "X-PoW-Solution": "-1"}, "negative solution"), ( {"X-PoW-Token": "test", "X-PoW-Solution": "999999999999999999999"}, "overflow", ), ] for headers, desc in pow_bypasses: status, body, _ = self.http_request("POST", "/", data=b"test", headers=headers) requests += 1 if status == 201: findings.append( Finding( severity="HIGH", category="Auth", endpoint="POST /", vector="PoW headers", description=f"PoW bypass via {desc}", reproduction=f"curl -X POST {self.target}/ -d test -H ...", impact="Spam protection bypass", ) ) # Admin endpoint access without auth admin_endpoints = [ ("GET", "/pki/certs"), ("POST", "/pki/issue"), ("GET", "/audit"), ] for method, path in admin_endpoints: status, body, _ = self.http_request(method, path) requests += 1 if status == 200: findings.append( Finding( severity="HIGH", category="Auth", endpoint=f"{method} {path}", vector="Missing auth", description="Admin endpoint accessible without authentication", reproduction=f"curl -X {method} {self.target}{path}", impact="Unauthorized admin access", ) ) self.findings.extend(findings) return FuzzResult( phase="4", test="Auth/Authz", status="PASS", duration=time.time() - start, requests=requests, findings=findings, ) # ───────────────────────────────────────────────────────────────────────── # Phase 5: Business Logic # ───────────────────────────────────────────────────────────────────────── def phase5_logic(self) -> FuzzResult: """Phase 5: Business logic attacks.""" self.log("Phase 5: Business Logic") start = time.time() requests = 0 findings = [] # Expiry manipulation expiry_tests = [ ("-1", "negative"), ("0", "zero"), ("99999999999999999999", "overflow"), ("abc", "non-numeric"), ] for value, desc in expiry_tests: status, body, _ = self.http_request( "POST", "/", data=b"expiry-test", headers={ "X-Expiry": value, "X-PoW-Token": "test", "X-PoW-Solution": "0", }, ) requests += 1 if status == 201: # Check if it was accepted without error try: data = json.loads(body) if "id" in data: findings.append( Finding( severity="MEDIUM", category="Logic", endpoint="POST /", vector="X-Expiry", description=f"Expiry manipulation ({desc}) accepted", reproduction=( f'curl -X POST {self.target}/ -d test -H "X-Expiry: {value}"' ), impact="Paste lifetime manipulation", ) ) except json.JSONDecodeError: pass # Large payload DoS sizes = [1024 * 1024, 10 * 1024 * 1024] # 1MB, 10MB for size in sizes: payload = os.urandom(size) status, body, duration = self.http_request( "POST", "/", data=payload, headers={"X-PoW-Token": "test", "X-PoW-Solution": "0"}, timeout=30, ) requests += 1 if status == 201: findings.append( Finding( severity="MEDIUM", category="DoS", endpoint="POST /", vector="Body size", description=f"Large payload ({size // 1024 // 1024}MB) accepted", reproduction=( f"dd if=/dev/urandom bs=1M count={size // 1024 // 1024} | " f"curl -X POST {self.target}/ --data-binary @-" ), impact="Resource exhaustion", evidence=f"Upload time: {duration:.2f}s", ) ) elif status == 500: findings.append( Finding( severity="HIGH", category="DoS", endpoint="POST /", vector="Body size", description=f"Server crash on large payload ({size // 1024 // 1024}MB)", reproduction=( f"dd if=/dev/urandom bs=1M count={size // 1024 // 1024} | " f"curl -X POST {self.target}/ --data-binary @-" ), impact="Denial of service", ) ) self.findings.extend(findings) return FuzzResult( phase="5", test="Business Logic", status="PASS", duration=time.time() - start, requests=requests, findings=findings, ) # ───────────────────────────────────────────────────────────────────────── # Phase 6: Crypto # ───────────────────────────────────────────────────────────────────────── def phase6_crypto(self) -> FuzzResult: """Phase 6: Cryptographic attacks.""" self.log("Phase 6: Cryptography") start = time.time() requests = 0 findings = [] # Collect PoW tokens for randomness analysis tokens = [] for _ in range(100): status, body, _ = self.http_request("GET", "/challenge") requests += 1 if status == 200: try: data = json.loads(body) if "token" in data: tokens.append(data["token"]) except json.JSONDecodeError: pass # Check for duplicate tokens if len(tokens) != len(set(tokens)): findings.append( Finding( severity="HIGH", category="Crypto", endpoint="GET /challenge", vector="token", description="Duplicate PoW tokens generated", reproduction=f"for i in {{1..100}}; do curl -s {self.target}/challenge; done", impact="PoW replay attack possible", ) ) # Password timing attack simulation # Create a paste with password first would be needed # Skipping for now as it requires state setup self.findings.extend(findings) return FuzzResult( phase="6", test="Cryptography", status="PASS", duration=time.time() - start, requests=requests, findings=findings, ) # ───────────────────────────────────────────────────────────────────────── # Report Generation # ───────────────────────────────────────────────────────────────────────── def generate_report(self) -> str: """Generate final report.""" severity_order = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3, "INFO": 4} sorted_findings = sorted(self.findings, key=lambda f: severity_order.get(f.severity, 5)) report = [] report.append("=" * 79) report.append("FLASKPASTE OFFENSIVE SECURITY TEST REPORT") report.append("=" * 79) report.append(f"Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}") report.append(f"Target: {self.target}") report.append("") # Summary report.append("SUMMARY") report.append("-" * 79) total_requests = sum(r.requests for r in self.results) report.append(f"Total phases run: {len(self.results)}") report.append(f"Total requests: {total_requests}") report.append(f"Total findings: {len(self.findings)}") by_severity: dict[str, int] = {} for f in self.findings: by_severity[f.severity] = by_severity.get(f.severity, 0) + 1 for sev in ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"]: if sev in by_severity: report.append(f" {sev}: {by_severity[sev]}") report.append("") # Phase Results report.append("PHASE RESULTS") report.append("-" * 79) for r in self.results: status_sym = "✓" if r.status == "PASS" else "✗" report.append( f"{status_sym} Phase {r.phase}: {r.test} ({r.requests} requests, {r.duration:.1f}s)" ) if r.findings: report.append(f" → {len(r.findings)} findings") if r.errors: report.append(f" → {len(r.errors)} errors") report.append("") # Findings if sorted_findings: report.append("FINDINGS") report.append("-" * 79) for f in sorted_findings: report.append(str(f)) return "\n".join(report) def run(self, phases: list[int] | None = None, quick: bool = False) -> None: """Run fuzzing campaign.""" all_phases = [ (1, self.phase1_recon), (2, self.phase2_input_fuzzing), (3, self.phase3_injection), (4, self.phase4_auth), (5, self.phase5_logic), (6, self.phase6_crypto), ] if quick: phases = [1, 3] # Just recon and injection for quick test if phases: all_phases = [(n, f) for n, f in all_phases if n in phases] try: if not self.start_server(): print("Failed to start server, aborting") return for phase_num, phase_func in all_phases: try: result = phase_func() self.results.append(result) status = "✓" if result.status == "PASS" else "✗" self.log( f" {status} {result.test}: {len(result.findings)} findings", "OK" if result.status == "PASS" else "ERROR", ) except Exception as e: self.log(f" Phase {phase_num} failed: {e}", "ERROR") self.results.append( FuzzResult( phase=str(phase_num), test=phase_func.__name__, status="ERROR", duration=0, errors=[str(e)], ) ) # Generate and save report report = self.generate_report() report_file = self.fuzz_dir / "results" / "REPORT.txt" report_file.write_text(report) print(report) print(f"\nReport saved to: {report_file}") finally: self.stop_server() def main() -> None: parser = argparse.ArgumentParser(description="FlaskPaste Offensive Security Tester") parser.add_argument("--phases", type=str, help="Comma-separated phase numbers (e.g., 1,2,3)") parser.add_argument("--quick", action="store_true", help="Quick smoke test") parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output") parser.add_argument("--fuzz-dir", type=str, help="Fuzzing workspace directory") args = parser.parse_args() phases = None if args.phases: phases = [int(p.strip()) for p in args.phases.split(",")] fuzz_dir = Path(args.fuzz_dir) if args.fuzz_dir else None fuzzer = FlaskPasteFuzzer(verbose=args.verbose, fuzz_dir=fuzz_dir) # Handle Ctrl+C gracefully def signal_handler(sig: int, frame: Any) -> None: print("\nInterrupted, cleaning up...") fuzzer.stop_server() sys.exit(1) signal.signal(signal.SIGINT, signal_handler) fuzzer.run(phases=phases, quick=args.quick) if __name__ == "__main__": main()