#!/usr/bin/env python3
"""Comprehensive penetration testing session for FlaskPaste."""

import hashlib
import json
import os
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Optional

BASE_URL = "http://127.0.0.1:5099"


def quote_segment(value: str) -> str:
    """Percent-encode *value* so it can be used as a single URL path segment.

    Every character outside the unreserved set (letters, digits, ``_.-~``)
    is escaped, including ``/``, spaces and quotes.  Attack payloads must go
    through this before being placed in a URL: ``http.client`` refuses
    request targets containing spaces or control characters, so an unescaped
    payload would fail client-side and never reach the server.
    """
    return urllib.parse.quote(value, safe="")


def request(
    url: str,
    method: str = "GET",
    data: Optional[bytes] = None,
    headers: Optional[dict[str, str]] = None,
) -> tuple[int, bytes, dict[str, str]]:
    """Make an HTTP request and return ``(status, body, headers)``.

    HTTP error responses (4xx/5xx) are returned like any other response.
    Any other failure raised while sending (connection refused, timeout,
    invalid URL, ...) is reported as status ``0`` with the error text as the
    body and an empty header dict, so callers never need to catch exceptions.
    """
    headers = headers or {}
    req = urllib.request.Request(url, data=data, headers=headers, method=method)  # noqa: S310
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:  # noqa: S310
            return resp.status, resp.read(), dict(resp.headers)
    except urllib.error.HTTPError as e:
        return e.code, e.read(), dict(e.headers)
    except Exception as e:
        # Deliberate catch-all: a transport failure is a test result, not a crash.
        return 0, str(e).encode(), {}


def solve_pow(nonce: str, difficulty: int) -> int:
    """Solve a proof-of-work challenge.

    Returns the smallest non-negative integer ``n`` such that
    ``sha256(f"{nonce}:{n}")`` starts with at least *difficulty* zero bits.

    Raises:
        ValueError: if *difficulty* exceeds 256, which no SHA-256 digest can
            satisfy (the search would otherwise never terminate).
    """
    digest_bits = hashlib.sha256().digest_size * 8
    if difficulty > digest_bits:
        raise ValueError(f"difficulty {difficulty} exceeds {digest_bits} bits")

    n = 0
    while True:
        digest = hashlib.sha256(f"{nonce}:{n}".encode()).digest()
        # Leading zero bits == total bits minus the bit length of the digest
        # read as one big-endian integer.
        leading_zero_bits = digest_bits - int.from_bytes(digest, "big").bit_length()
        if leading_zero_bits >= difficulty:
            return n
        n += 1


def get_pow_headers() -> dict[str, str]:
    """Fetch a PoW challenge from ``/challenge`` and return solved headers.

    Returns an empty dict when the challenge endpoint does not answer 200 or
    reports that proof-of-work is not enabled.
    """
    status, body, _ = request(f"{BASE_URL}/challenge")
    if status != 200:
        return {}
    data = json.loads(body)
    if not data.get("enabled"):
        return {}
    solution = solve_pow(data["nonce"], data["difficulty"])
    return {
        "X-PoW-Token": data["token"],
        "X-PoW-Solution": str(solution),
    }


def random_content(size: int = 1024) -> bytes:
    """Return *size* random bytes."""
    return os.urandom(size)


def run_tests() -> dict[str, Any]:
    """Run the pentest suite against ``BASE_URL`` and print a report.

    Returns a dict with ``"passed"`` and ``"failed"`` counters and a
    ``"tests"`` list holding one ``{"name", "passed", "details"}`` entry per
    executed check.
    """
    results: dict[str, Any] = {"passed": 0, "failed": 0, "tests": []}
    paste_ids = []

    def log_test(name, passed, details=""):
        """Record one check in ``results`` and print its outcome."""
        status = "PASS" if passed else "FAIL"
        results["passed" if passed else "failed"] += 1
        results["tests"].append({"name": name, "passed": passed, "details": details})
        print(f" [{status}] {name}")
        if details and not passed:
            print(f" {details[:100]}")

    print("\n" + "=" * 60)
    print("PENETRATION TESTING SESSION")
    print("=" * 60)

    # Phase 1: Reconnaissance
    print("\n[Phase 1] Reconnaissance")
    print("-" * 40)

    status, body, _ = request(f"{BASE_URL}/")
    log_test("GET / returns API info", status == 200)

    status, body, _ = request(f"{BASE_URL}/health")
    log_test("GET /health returns ok", status == 200)

    status, body, _ = request(f"{BASE_URL}/challenge")
    log_test("GET /challenge returns PoW data", status == 200)

    status, body, _ = request(f"{BASE_URL}/client")
    log_test("GET /client returns CLI", status == 200 and len(body) > 10000)

    status, body, _ = request(f"{BASE_URL}/metrics")
    log_test("GET /metrics returns prometheus data", status == 200)

    # Phase 2: Paste Creation
    print("\n[Phase 2] Paste Creation")
    print("-" * 40)

    # Create normal paste
    pow_headers = get_pow_headers()
    content = b"test paste content"
    status, body, _ = request(f"{BASE_URL}/", "POST", content, pow_headers)
    if status == 201:
        data = json.loads(body)
        paste_ids.append(data["id"])
        log_test("Create paste with PoW", True)
    else:
        # errors="replace": an error body is not guaranteed to be valid UTF-8.
        log_test("Create paste with PoW", False, body.decode(errors="replace")[:100])

    # Create burn-after-read paste (not tracked in paste_ids: it is consumed below)
    pow_headers = get_pow_headers()
    pow_headers["X-Burn-After-Read"] = "true"
    status, body, _ = request(f"{BASE_URL}/", "POST", b"burn content", pow_headers)
    burn_id = None
    if status == 201:
        data = json.loads(body)
        burn_id = data["id"]
        log_test("Create burn-after-read paste", True)
    else:
        log_test("Create burn-after-read paste", False)

    # Create password-protected paste
    pow_headers = get_pow_headers()
    pow_headers["X-Paste-Password"] = "secret123"
    status, body, _ = request(f"{BASE_URL}/", "POST", b"protected content", pow_headers)
    pw_id = None
    if status == 201:
        data = json.loads(body)
        pw_id = data["id"]
        paste_ids.append(pw_id)
        log_test("Create password-protected paste", True)
    else:
        log_test("Create password-protected paste", False)

    # Create expiring paste
    pow_headers = get_pow_headers()
    pow_headers["X-Expiry"] = "300"
    status, body, _ = request(f"{BASE_URL}/", "POST", b"expiring content", pow_headers)
    if status == 201:
        data = json.loads(body)
        paste_ids.append(data["id"])
        log_test("Create paste with expiry", True)
    else:
        log_test("Create paste with expiry", False)

    # Phase 3: Paste Retrieval
    print("\n[Phase 3] Paste Retrieval")
    print("-" * 40)

    if paste_ids:
        pid = paste_ids[0]
        status, body, _ = request(f"{BASE_URL}/{pid}")
        log_test("GET paste metadata", status == 200)

        status, body, _ = request(f"{BASE_URL}/{pid}/raw")
        log_test("GET paste raw content", status == 200)

        status, body, _ = request(f"{BASE_URL}/{pid}", "HEAD")
        log_test("HEAD request for paste", status == 200)

    # Test burn-after-read: first read succeeds, second must find nothing
    if burn_id:
        status, body, _ = request(f"{BASE_URL}/{burn_id}/raw")
        first_read = status == 200
        status, body, _ = request(f"{BASE_URL}/{burn_id}/raw")
        second_read = status == 404
        log_test("Burn-after-read works", first_read and second_read)

    # Test password protection
    if pw_id:
        status, body, _ = request(f"{BASE_URL}/{pw_id}/raw")
        log_test("Password-protected paste requires auth", status == 401)

        status, body, _ = request(
            f"{BASE_URL}/{pw_id}/raw", headers={"X-Paste-Password": "wrongpassword"}
        )
        log_test("Wrong password rejected", status == 403)

        status, body, _ = request(
            f"{BASE_URL}/{pw_id}/raw", headers={"X-Paste-Password": "secret123"}
        )
        log_test("Correct password accepted", status == 200)

    # Phase 4: Error Handling
    print("\n[Phase 4] Error Handling")
    print("-" * 40)

    status, body, _ = request(f"{BASE_URL}/nonexistent123")
    log_test("Non-existent paste returns 404", status == 404)

    status, body, _ = request(f"{BASE_URL}/!!!invalid!!!")
    log_test("Invalid paste ID rejected", status in (400, 404))

    status, body, _ = request(f"{BASE_URL}/", "POST", b"no pow")
    log_test("POST without PoW rejected", status in (400, 429))

    status, body, _ = request(
        f"{BASE_URL}/", "POST", b"x", {"X-PoW-Token": "invalid", "X-PoW-Solution": "0"}
    )
    log_test("Invalid PoW token rejected", status == 400)

    # Phase 5: Injection Attacks
    print("\n[Phase 5] Injection Attacks")
    print("-" * 40)

    # SQL injection in paste ID
    sqli_payloads = ["1' OR '1'='1", "1; DROP TABLE pastes;--", "1 UNION SELECT * FROM users"]
    for payload in sqli_payloads:
        # Percent-encode the payload: sent raw, the spaces make http.client
        # reject the URL locally (status 0) and the server is never probed.
        status, body, _ = request(f"{BASE_URL}/{quote_segment(payload)}")
        log_test(f"SQLi rejected: {payload[:20]}", status in (400, 404))

    # SSTI attempts
    ssti_payloads = ["{{7*7}}", "${7*7}", "<%=7*7%>", "#{7*7}"]
    for payload in ssti_payloads:
        # Solve a fresh challenge per payload so a rejected POST cannot leave
        # the remaining payloads retrying with an already-used token.
        pow_headers = get_pow_headers()
        status, body, _ = request(f"{BASE_URL}/", "POST", payload.encode(), pow_headers)
        if status == 201:
            data = json.loads(body)
            _, content, _ = request(f"{BASE_URL}/{data['id']}/raw")
            # "49" in the stored content would mean 7*7 was evaluated.
            log_test("SSTI payload stored safely", b"49" not in content)
            paste_ids.append(data["id"])

    # XSS in content
    # NOTE(review): the payload literal was empty in the source, which tests
    # nothing; a representative script payload is used — confirm the intended one.
    xss_payload = b"<script>alert('xss')</script>"
    pow_headers = get_pow_headers()
    status, body, _ = request(f"{BASE_URL}/", "POST", xss_payload, pow_headers)
    if status == 201:
        data = json.loads(body)
        status, content, resp_headers = request(f"{BASE_URL}/{data['id']}/raw")
        # Header names are case-insensitive; normalise before looking them up.
        lowered = {key.lower(): value for key, value in resp_headers.items()}
        csp = lowered.get("content-security-policy", "")
        xco = lowered.get("x-content-type-options", "")
        log_test("XSS mitigated by headers", "nosniff" in xco and "default-src" in csp)
        paste_ids.append(data["id"])

    # Phase 6: Header Injection
    print("\n[Phase 6] Header Injection")
    print("-" * 40)

    pow_headers = get_pow_headers()
    pow_headers["X-Forwarded-For"] = "1.2.3.4, 5.6.7.8"
    status, body, _ = request(f"{BASE_URL}/", "POST", b"xff test", pow_headers)
    log_test("X-Forwarded-For handled safely", status in (201, 400, 429))

    pow_headers = get_pow_headers()
    pow_headers["Host"] = "evil.com"
    status, body, _ = request(f"{BASE_URL}/", "POST", b"host test", pow_headers)
    log_test("Host header override handled", status in (201, 400, 429))

    # Phase 7: Rate Limiting
    print("\n[Phase 7] Rate Limiting")
    print("-" * 40)

    # Make many rapid requests; stop early once the server starts throttling
    for _ in range(100):
        status, _, _ = request(f"{BASE_URL}/health")
        if status == 429:
            break
    # Informational only: the limiter may or may not trigger within 100 reads.
    log_test("Rate limiting active on reads", True)

    # Phase 8: Size Limits
    print("\n[Phase 8] Size Limits")
    print("-" * 40)

    # Try to exceed size limit
    pow_headers = get_pow_headers()
    large_content = random_content(4 * 1024 * 1024)  # 4MB
    status, body, _ = request(f"{BASE_URL}/", "POST", large_content, pow_headers)
    log_test("Size limit enforced", status in (413, 400))

    # Phase 9: Concurrent Access
    print("\n[Phase 9] Concurrent Access")
    print("-" * 40)

    def concurrent_request(_):
        """Solve a challenge and create one paste; returns request()'s tuple."""
        pow_h = get_pow_headers()
        return request(f"{BASE_URL}/", "POST", b"concurrent", pow_h)

    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(concurrent_request, i) for i in range(10)]
        statuses = [f.result()[0] for f in as_completed(futures)]

    success_count = sum(1 for s in statuses if s == 201)
    log_test("Concurrent requests handled", success_count > 0)

    # Phase 10: MIME Type Detection
    print("\n[Phase 10] MIME Type Detection")
    print("-" * 40)

    # (magic-number prefix, MIME type the server is expected to report)
    mime_tests = [
        (b"\x89PNG\r\n\x1a\n", "image/png"),
        (b"GIF89a", "image/gif"),
        (b"%PDF-1.4", "application/pdf"),
        (b"PK\x03\x04", "application/zip"),
        (b"\x1f\x8b\x08", "application/gzip"),
    ]
    for magic, expected_mime in mime_tests:
        pow_headers = get_pow_headers()
        status, body, _ = request(f"{BASE_URL}/", "POST", magic + b"\x00" * 100, pow_headers)
        if status == 201:
            data = json.loads(body)
            status, info_body, _ = request(f"{BASE_URL}/{data['id']}")
            if status == 200:
                info = json.loads(info_body)
                detected = info.get("mime_type", "")
                log_test(f"MIME detection: {expected_mime}", detected == expected_mime)
            paste_ids.append(data["id"])

    # Summary
    print("\n" + "=" * 60)
    print("SUMMARY")
    print("=" * 60)
    print(f" Passed: {results['passed']}")
    print(f" Failed: {results['failed']}")
    print(f" Total: {results['passed'] + results['failed']}")
    print(f" Pastes created: {len(paste_ids)}")

    return results


def main() -> int:
    """Run the suite, print the elapsed time, and return the exit code.

    The exit code is 0 when no check failed and 1 otherwise.
    """
    # perf_counter is monotonic, so the elapsed time cannot be skewed by
    # wall-clock adjustments during the run.
    start = time.perf_counter()
    results = run_tests()
    elapsed = time.perf_counter() - start
    print(f"\n Elapsed: {elapsed:.2f}s")
    return 0 if results["failed"] == 0 else 1


if __name__ == "__main__":
    sys.exit(main())