Some checks failed
CI / Lint & Format (push) Successful in 22s
CI / Security Scan (push) Successful in 21s
CI / Unit Tests (push) Failing after 17s
CI / Advanced Security Tests (push) Failing after 14s
CI / Memory Leak Check (push) Successful in 20s
CI / Security Tests (push) Successful in 25s
CI / Build & Push Image (push) Has been skipped
CI / SBOM Generation (push) Successful in 20s
338 lines
11 KiB
Python
338 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""Comprehensive penetration testing session for FlaskPaste."""
|
|
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
import urllib.error
|
|
import urllib.request
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from typing import Any
|
|
|
|
# Root URL of the server under test; the requests in this script are built from it.
BASE_URL = "http://127.0.0.1:5099"
|
|
|
|
|
|
def request(url, method="GET", data=None, headers=None):
    """Perform a single HTTP call and report it as ``(status, body, headers)``.

    Args:
        url: Absolute URL to request.
        method: HTTP verb to use.
        data: Optional request body as bytes.
        headers: Optional mapping of request headers; a falsy value means none.

    Returns:
        A 3-tuple of the status code, the raw body bytes and a plain dict of
        the response headers. HTTP error responses are returned the same way
        instead of being raised. Any other failure while opening or reading
        the response yields ``(0, <error text as bytes>, {})``.
    """
    outgoing = urllib.request.Request(  # noqa: S310
        url,
        data=data,
        headers=headers if headers else {},
        method=method,
    )
    try:
        with urllib.request.urlopen(outgoing, timeout=30) as reply:  # noqa: S310
            code = reply.status
            payload = reply.read()
            received = dict(reply.headers)
            return code, payload, received
    except urllib.error.HTTPError as http_err:
        # 4xx/5xx answers still carry a status, body and headers worth inspecting.
        return http_err.code, http_err.read(), dict(http_err.headers)
    except Exception as failure:
        # Best-effort: transport-level problems are reported as status 0.
        return 0, str(failure).encode(), {}
|
|
|
|
|
|
def solve_pow(nonce, difficulty):
    """Brute-force a proof-of-work answer for ``nonce``.

    Counts upward from 0 and returns the first integer ``n`` for which the
    SHA-256 digest of ``"{nonce}:{n}"`` starts with at least ``difficulty``
    zero bits.

    Args:
        nonce: Challenge value that is prefixed to every candidate.
        difficulty: Required number of leading zero bits.

    Returns:
        The smallest non-negative integer that satisfies the challenge.
    """
    # Only this many leading digest bytes are inspected; it always covers
    # at least ``difficulty`` bits.
    prefix_len = (difficulty + 7) // 8 + 1
    candidate = 0
    while True:
        digest = hashlib.sha256(f"{nonce}:{candidate}".encode()).digest()
        prefix = digest[:prefix_len]
        # Read the prefix as one big-endian integer: the bits it does not
        # need are exactly its leading zero bits.
        leading_zeros = 8 * len(prefix) - int.from_bytes(prefix, "big").bit_length()
        if leading_zeros >= difficulty:
            return candidate
        candidate += 1
|
|
|
|
|
|
def get_pow_headers():
    """Fetch a proof-of-work challenge and return the headers that answer it.

    Returns:
        A dict with ``X-PoW-Token`` and ``X-PoW-Solution`` when the
        ``/challenge`` endpoint answers 200 and reports the feature as
        enabled; otherwise an empty dict.
    """
    code, payload, _ = request(f"{BASE_URL}/challenge")
    if code != 200:
        return {}

    challenge = json.loads(payload)
    if challenge.get("enabled"):
        answer = solve_pow(challenge["nonce"], challenge["difficulty"])
        return {
            "X-PoW-Token": challenge["token"],
            "X-PoW-Solution": str(answer),
        }
    # Challenge endpoint says PoW is switched off: nothing to attach.
    return {}
|
|
|
|
|
|
def random_content(size=1024):
    """Return ``size`` bytes obtained from ``os.urandom``."""
    blob = os.urandom(size)
    return blob
|
|
|
|
|
|
def run_tests() -> dict[str, Any]:
    """Run the full pentest session against the server at ``BASE_URL``.

    The session walks through ten phases (reconnaissance, paste creation,
    retrieval, error handling, injection, header injection, rate limiting,
    size limits, concurrency and MIME detection), printing one PASS/FAIL
    line per check followed by a summary.

    Returns:
        A dict with the keys ``"passed"`` and ``"failed"`` (counters) and
        ``"tests"`` (a list of ``{"name", "passed", "details"}`` records,
        one per logged check).
    """
    # Function-scope import so this block does not depend on the file's
    # top-level import list.
    from urllib.parse import quote

    results: dict[str, Any] = {"passed": 0, "failed": 0, "tests": []}
    paste_ids = []

    def log_test(name, passed, details=""):
        """Record one check in ``results`` and print its outcome."""
        status = "PASS" if passed else "FAIL"
        results["passed" if passed else "failed"] += 1
        results["tests"].append({"name": name, "passed": passed, "details": details})
        print(f" [{status}] {name}")
        if details and not passed:
            print(f" {details[:100]}")

    print("\n" + "=" * 60)
    print("PENETRATION TESTING SESSION")
    print("=" * 60)

    # Phase 1: Reconnaissance
    print("\n[Phase 1] Reconnaissance")
    print("-" * 40)

    status, body, _ = request(f"{BASE_URL}/")
    log_test("GET / returns API info", status == 200)

    status, body, _ = request(f"{BASE_URL}/health")
    log_test("GET /health returns ok", status == 200)

    status, body, _ = request(f"{BASE_URL}/challenge")
    log_test("GET /challenge returns PoW data", status == 200)

    status, body, _ = request(f"{BASE_URL}/client")
    log_test("GET /client returns CLI", status == 200 and len(body) > 10000)

    status, body, _ = request(f"{BASE_URL}/metrics")
    log_test("GET /metrics returns prometheus data", status == 200)

    # Phase 2: Paste Creation
    print("\n[Phase 2] Paste Creation")
    print("-" * 40)

    # Create normal paste
    pow_headers = get_pow_headers()
    content = b"test paste content"
    status, body, _ = request(f"{BASE_URL}/", "POST", content, pow_headers)
    if status == 201:
        data = json.loads(body)
        paste_ids.append(data["id"])
        log_test("Create paste with PoW", True)
    else:
        # errors="replace": a non-UTF-8 error body must not abort the session.
        log_test("Create paste with PoW", False, body.decode(errors="replace")[:100])

    # Create burn-after-read paste
    pow_headers = get_pow_headers()
    pow_headers["X-Burn-After-Read"] = "true"
    status, body, _ = request(f"{BASE_URL}/", "POST", b"burn content", pow_headers)
    burn_id = None
    if status == 201:
        data = json.loads(body)
        burn_id = data["id"]
        log_test("Create burn-after-read paste", True)
    else:
        log_test("Create burn-after-read paste", False)

    # Create password-protected paste
    pow_headers = get_pow_headers()
    pow_headers["X-Paste-Password"] = "secret123"
    status, body, _ = request(f"{BASE_URL}/", "POST", b"protected content", pow_headers)
    pw_id = None
    if status == 201:
        data = json.loads(body)
        pw_id = data["id"]
        paste_ids.append(pw_id)
        log_test("Create password-protected paste", True)
    else:
        log_test("Create password-protected paste", False)

    # Create expiring paste
    pow_headers = get_pow_headers()
    pow_headers["X-Expiry"] = "300"
    status, body, _ = request(f"{BASE_URL}/", "POST", b"expiring content", pow_headers)
    if status == 201:
        data = json.loads(body)
        paste_ids.append(data["id"])
        log_test("Create paste with expiry", True)
    else:
        log_test("Create paste with expiry", False)

    # Phase 3: Paste Retrieval
    print("\n[Phase 3] Paste Retrieval")
    print("-" * 40)

    if paste_ids:
        pid = paste_ids[0]
        status, body, _ = request(f"{BASE_URL}/{pid}")
        log_test("GET paste metadata", status == 200)

        status, body, _ = request(f"{BASE_URL}/{pid}/raw")
        log_test("GET paste raw content", status == 200)

        status, body, _ = request(f"{BASE_URL}/{pid}", "HEAD")
        log_test("HEAD request for paste", status == 200)

    # Test burn-after-read: first read must succeed, second must be gone.
    if burn_id:
        status, body, _ = request(f"{BASE_URL}/{burn_id}/raw")
        first_read = status == 200
        status, body, _ = request(f"{BASE_URL}/{burn_id}/raw")
        second_read = status == 404
        log_test("Burn-after-read works", first_read and second_read)

    # Test password protection
    if pw_id:
        status, body, _ = request(f"{BASE_URL}/{pw_id}/raw")
        log_test("Password-protected paste requires auth", status == 401)

        status, body, _ = request(
            f"{BASE_URL}/{pw_id}/raw", headers={"X-Paste-Password": "wrongpassword"}
        )
        log_test("Wrong password rejected", status == 403)

        status, body, _ = request(
            f"{BASE_URL}/{pw_id}/raw", headers={"X-Paste-Password": "secret123"}
        )
        log_test("Correct password accepted", status == 200)

    # Phase 4: Error Handling
    print("\n[Phase 4] Error Handling")
    print("-" * 40)

    status, body, _ = request(f"{BASE_URL}/nonexistent123")
    log_test("Non-existent paste returns 404", status == 404)

    status, body, _ = request(f"{BASE_URL}/!!!invalid!!!")
    log_test("Invalid paste ID rejected", status == 400 or status == 404)

    status, body, _ = request(f"{BASE_URL}/", "POST", b"no pow")
    log_test("POST without PoW rejected", status in (400, 429))

    status, body, _ = request(
        f"{BASE_URL}/", "POST", b"x", {"X-PoW-Token": "invalid", "X-PoW-Solution": "0"}
    )
    log_test("Invalid PoW token rejected", status == 400)

    # Phase 5: Injection Attacks
    print("\n[Phase 5] Injection Attacks")
    print("-" * 40)

    # SQL injection in paste ID
    sqli_payloads = ["1' OR '1'='1", "1; DROP TABLE pastes;--", "1 UNION SELECT * FROM users"]
    for payload in sqli_payloads:
        # The payloads contain spaces and quotes. http.client refuses request
        # targets that contain spaces, so un-encoded they would never reach
        # the server and request() would report status 0. Percent-encode the
        # whole payload (safe="" also encodes "/") so it arrives as one path
        # segment.
        status, body, _ = request(f"{BASE_URL}/{quote(payload, safe='')}")
        log_test(f"SQLi rejected: {payload[:20]}", status in (400, 404))

    # SSTI attempts
    ssti_payloads = ["{{7*7}}", "${7*7}", "<%=7*7%>", "#{7*7}"]
    for payload in ssti_payloads:
        # Fresh challenge per POST so a failed attempt never reuses a token.
        pow_headers = get_pow_headers()
        status, body, _ = request(f"{BASE_URL}/", "POST", payload.encode(), pow_headers)
        if status == 201:
            data = json.loads(body)
            _, content, _ = request(f"{BASE_URL}/{data['id']}/raw")
            # An evaluated template would turn 7*7 into 49.
            log_test("SSTI payload stored safely", b"49" not in content)
            paste_ids.append(data["id"])

    # XSS in content
    xss_payload = b"<script>alert('xss')</script>"
    pow_headers = get_pow_headers()
    status, body, _ = request(f"{BASE_URL}/", "POST", xss_payload, pow_headers)
    if status == 201:
        data = json.loads(body)
        status, content, resp_headers = request(f"{BASE_URL}/{data['id']}/raw")
        # HTTP header names are case-insensitive, but request() hands back a
        # plain dict; normalise the keys before looking them up.
        lowered = {key.lower(): value for key, value in resp_headers.items()}
        csp = lowered.get("content-security-policy", "")
        xco = lowered.get("x-content-type-options", "")
        log_test("XSS mitigated by headers", "nosniff" in xco and "default-src" in csp)
        paste_ids.append(data["id"])

    # Phase 6: Header Injection
    print("\n[Phase 6] Header Injection")
    print("-" * 40)

    pow_headers = get_pow_headers()
    pow_headers["X-Forwarded-For"] = "1.2.3.4, 5.6.7.8"
    status, body, _ = request(f"{BASE_URL}/", "POST", b"xff test", pow_headers)
    log_test("X-Forwarded-For handled safely", status in (201, 400, 429))

    pow_headers = get_pow_headers()
    pow_headers["Host"] = "evil.com"
    status, body, _ = request(f"{BASE_URL}/", "POST", b"host test", pow_headers)
    log_test("Host header override handled", status in (201, 400, 429))

    # Phase 7: Rate Limiting
    print("\n[Phase 7] Rate Limiting")
    print("-" * 40)

    # Make many rapid requests, stopping early once the server answers 429.
    for _attempt in range(100):
        status, _, _ = request(f"{BASE_URL}/health")
        if status == 429:
            break
    # Informational only: recorded as a pass whether or not the limit was hit.
    log_test("Rate limiting active on reads", True)  # May or may not hit

    # Phase 8: Size Limits
    print("\n[Phase 8] Size Limits")
    print("-" * 40)

    # Try to exceed size limit
    pow_headers = get_pow_headers()
    large_content = random_content(4 * 1024 * 1024)  # 4MB
    status, body, _ = request(f"{BASE_URL}/", "POST", large_content, pow_headers)
    log_test("Size limit enforced", status == 413 or status == 400)

    # Phase 9: Concurrent Access
    print("\n[Phase 9] Concurrent Access")
    print("-" * 40)

    def concurrent_request(_):
        """Solve a fresh challenge and create one paste (run in a worker thread)."""
        pow_h = get_pow_headers()
        return request(f"{BASE_URL}/", "POST", b"concurrent", pow_h)

    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(concurrent_request, i) for i in range(10)]
        statuses = [f.result()[0] for f in as_completed(futures)]

    success_count = sum(1 for s in statuses if s == 201)
    log_test("Concurrent requests handled", success_count > 0)

    # Phase 10: MIME Type Detection
    print("\n[Phase 10] MIME Type Detection")
    print("-" * 40)

    # (magic-number prefix, MIME type the server is expected to report)
    mime_tests = [
        (b"\x89PNG\r\n\x1a\n", "image/png"),
        (b"GIF89a", "image/gif"),
        (b"%PDF-1.4", "application/pdf"),
        (b"PK\x03\x04", "application/zip"),
        (b"\x1f\x8b\x08", "application/gzip"),
    ]

    for magic, expected_mime in mime_tests:
        pow_headers = get_pow_headers()
        status, body, _ = request(f"{BASE_URL}/", "POST", magic + b"\x00" * 100, pow_headers)
        if status == 201:
            data = json.loads(body)
            status, info_body, _ = request(f"{BASE_URL}/{data['id']}")
            if status == 200:
                info = json.loads(info_body)
                detected = info.get("mime_type", "")
                log_test(f"MIME detection: {expected_mime}", detected == expected_mime)
            paste_ids.append(data["id"])

    # Summary
    print("\n" + "=" * 60)
    print("SUMMARY")
    print("=" * 60)
    print(f" Passed: {results['passed']}")
    print(f" Failed: {results['failed']}")
    print(f" Total: {results['passed'] + results['failed']}")
    print(f" Pastes created: {len(paste_ids)}")

    return results
|
|
|
|
|
|
if __name__ == "__main__":
    # Time the whole session, report it, and turn the failure count into the
    # process exit status (0 when nothing failed, 1 otherwise).
    start = time.time()
    results = run_tests()
    elapsed = time.time() - start
    print(f"\n Elapsed: {elapsed:.2f}s")
    if results["failed"] == 0:
        sys.exit(0)
    sys.exit(1)
|