Files
derp/plugins/flaskpaste.py
user ffa75670e2 fix: use mTLS client cert to bypass PoW on flaskpaste
When secrets/flaskpaste/derp.crt and derp.key are present, load them
into the SSL context for mutual TLS auth and skip the PoW challenge
entirely. Fall back to PoW only when no client cert is available.
2026-02-16 23:13:09 +01:00

214 lines
6.4 KiB
Python

"""Plugin: FlaskPaste integration -- paste creation and URL shortening."""
from __future__ import annotations
import asyncio
import hashlib
import json
import logging
import os
import ssl
import urllib.request
from pathlib import Path
from derp.plugin import command
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Base URL used when neither FLASKPASTE_URL nor the bot config supplies one.
_DEFAULT_URL = "https://paste.mymx.me"
# Timeout (seconds) passed to every urlopen() call in this module.
_TIMEOUT = 15
# Directory searched for the mTLS client credential (derp.crt / derp.key):
# two levels above this file, then secrets/flaskpaste.
_CERT_DIR = Path(__file__).resolve().parent.parent / "secrets" / "flaskpaste"
# -- Internal helpers --------------------------------------------------------
def _get_base_url(bot) -> str:
    """Resolve the FlaskPaste base URL, without trailing slashes.

    Lookup order: the ``FLASKPASTE_URL`` environment variable, then
    ``bot.config["flaskpaste"]["url"]``, then ``_DEFAULT_URL``.  An empty or
    missing value at any level falls through to the next source, so a blank
    (or null) config entry can no longer yield an empty base URL or crash
    on ``None.rstrip``.

    Args:
        bot: Object exposing a dict-like ``config`` attribute.

    Returns:
        The base URL with any trailing ``/`` characters removed.
    """
    # The section itself may be absent or explicitly null in the config.
    section = bot.config.get("flaskpaste") or {}
    url = (
        os.environ.get("FLASKPASTE_URL")
        or section.get("url")
        or _DEFAULT_URL
    )
    return url.rstrip("/")
def _has_client_cert() -> bool:
    """Report whether both the mTLS certificate and its key exist on disk."""
    required = ("derp.crt", "derp.key")
    # all() short-circuits, so the key is only probed when the cert exists.
    return all((_CERT_DIR / name).exists() for name in required)
def _ssl_context() -> ssl.SSLContext:
    """Return a default TLS context, with the client cert attached if present.

    The certificate chain is loaded only when both ``derp.crt`` and
    ``derp.key`` exist under ``_CERT_DIR``; otherwise the plain default
    context is returned unchanged.
    """
    context = ssl.create_default_context()
    crt, key = _CERT_DIR / "derp.crt", _CERT_DIR / "derp.key"
    if not (crt.exists() and key.exists()):
        # No client credential available: server-auth-only TLS.
        return context
    context.load_cert_chain(str(crt), str(key))
    return context
def _solve_pow(nonce: str, difficulty: int) -> int:
    """Find the smallest N >= 0 where SHA256("nonce:N") has enough zero bits.

    Blocking, CPU-bound search -- run in an executor.

    Args:
        nonce: Challenge nonce; the hashed input is the encoded text
            ``f"{nonce}:{N}"``.
        difficulty: Required number of leading zero bits in the digest.
            Values <= 0 are satisfied immediately by N = 0.

    Returns:
        The first non-negative integer N whose digest meets the difficulty.

    Raises:
        ValueError: If ``difficulty`` exceeds the bit length of a SHA-256
            digest.  No N could ever satisfy it, so the search would
            otherwise spin forever.
    """
    digest_bits = hashlib.sha256().digest_size * 8
    if difficulty > digest_bits:
        raise ValueError(
            f"difficulty {difficulty} exceeds {digest_bits}-bit digest"
        )
    prefix = f"{nonce}:".encode()
    n = 0
    while True:
        digest = hashlib.sha256(prefix + str(n).encode()).digest()
        # Read the digest as a big-endian integer: its leading zero bits are
        # exactly the bits missing from that integer's bit length.
        leading = digest_bits - int.from_bytes(digest, "big").bit_length()
        if leading >= difficulty:
            return n
        n += 1
def _get_challenge(base_url: str) -> dict:
    """Fetch a PoW challenge with GET {base_url}/challenge.

    Returns the decoded JSON body; callers read its ``nonce``,
    ``difficulty`` and ``token`` entries.
    """
    tls = _ssl_context()
    request_headers = {"Accept": "application/json", "User-Agent": "derp-bot"}
    request = urllib.request.Request(
        f"{base_url}/challenge", headers=request_headers,
    )
    with urllib.request.urlopen(request, timeout=_TIMEOUT, context=tls) as resp:
        raw = resp.read()
    return json.loads(raw)
def _pow_headers(base_url: str) -> dict:
    """Return the PoW auth headers for a request, or ``{}`` when mTLS applies.

    When the client certificate and key are present no challenge is fetched
    at all.  Otherwise a challenge is requested from ``base_url``, solved,
    and returned as the ``X-PoW-Token`` / ``X-PoW-Solution`` header pair.
    """
    if not _has_client_cert():
        challenge = _get_challenge(base_url)
        answer = _solve_pow(challenge["nonce"], challenge["difficulty"])
        return {
            "X-PoW-Token": challenge["token"],
            "X-PoW-Solution": str(answer),
        }
    # Client cert available: no PoW headers are sent.
    return {}
def _create_paste(base_url: str, content: str) -> str:
    """Create a paste by POSTing JSON to ``base_url``; return the paste URL.

    Authenticates with mTLS or PoW headers.  The URL is built from the
    reply's ``id`` when present; otherwise the reply's ``url`` entry is
    returned, or ``""`` if that is missing too.
    """
    headers = {"Content-Type": "application/json", "User-Agent": "derp-bot"}
    headers.update(_pow_headers(base_url))
    payload = json.dumps({"content": content}).encode()
    # Supplying a body makes urllib issue a POST.
    request = urllib.request.Request(base_url, data=payload, headers=headers)
    tls = _ssl_context()
    with urllib.request.urlopen(request, timeout=_TIMEOUT, context=tls) as resp:
        reply = json.loads(resp.read())
    paste_id = reply.get("id", "")
    return f"{base_url}/{paste_id}" if paste_id else reply.get("url", "")
def _shorten_url(base_url: str, url: str) -> str:
    """Shorten ``url`` by POSTing JSON to ``{base_url}/s``; return the short URL.

    Authenticates with mTLS or PoW headers.  The short URL is built from the
    reply's ``id`` when present; otherwise the reply's ``url`` entry is
    returned, or ``""`` if that is missing too.
    """
    headers = {"Content-Type": "application/json", "User-Agent": "derp-bot"}
    headers.update(_pow_headers(base_url))
    payload = json.dumps({"url": url}).encode()
    # Supplying a body makes urllib issue a POST.
    request = urllib.request.Request(
        f"{base_url}/s", data=payload, headers=headers,
    )
    tls = _ssl_context()
    with urllib.request.urlopen(request, timeout=_TIMEOUT, context=tls) as resp:
        reply = json.loads(resp.read())
    short_id = reply.get("id", "")
    return f"{base_url}/s/{short_id}" if short_id else reply.get("url", "")
# -- Public helpers (importable by other plugins) ----------------------------
def shorten_url(bot, url: str) -> str:
    """Shorten a URL via FlaskPaste, falling back to the original URL.

    Synchronous: performs the HTTP request (and any PoW solving) in the
    calling thread.

    Args:
        bot: Bot instance used to resolve the FlaskPaste base URL.
        url: The URL to shorten.

    Returns:
        The short URL, or ``url`` unchanged when the request fails or the
        service reply carries no usable URL.
    """
    base_url = _get_base_url(bot)
    try:
        short = _shorten_url(base_url, url)
    except Exception as exc:  # deliberate best-effort: never raise to callers
        log.debug("flaskpaste shorten failed: %s", exc)
        return url
    # An empty result means the reply had neither "id" nor "url"; treat that
    # as a failure so callers never receive an empty string.
    return short or url
def create_paste(bot, content: str) -> str | None:
    """Create a paste via FlaskPaste.

    Synchronous: performs the HTTP request (and any PoW solving) in the
    calling thread.

    Args:
        bot: Bot instance used to resolve the FlaskPaste base URL.
        content: Text to store in the paste.

    Returns:
        The paste URL, or ``None`` when the request fails or the service
        reply carries no usable URL.
    """
    base_url = _get_base_url(bot)
    try:
        paste_url = _create_paste(base_url, content)
    except Exception as exc:  # deliberate best-effort: never raise to callers
        log.debug("flaskpaste paste failed: %s", exc)
        return None
    # Normalise an empty result (reply had neither "id" nor "url") to None so
    # the documented "None on failure" contract holds.
    return paste_url or None
# -- Commands ----------------------------------------------------------------
@command("paste", help="Create a paste: !paste <text>")
async def cmd_paste(bot, message):
    """Create a paste on FlaskPaste and return the URL.

    Usage:
        !paste some long text here
    """
    # With maxsplit=1 the split yields at most [command, remainder].
    pieces = message.text.split(None, 1)
    if len(pieces) != 2:
        await bot.reply(message, "Usage: !paste <text>")
        return

    text = pieces[1]
    base_url = _get_base_url(bot)
    # The paste helper blocks on network I/O (and possibly PoW), so it runs
    # in the default executor instead of on the event loop.
    loop = asyncio.get_running_loop()
    try:
        url = await loop.run_in_executor(None, _create_paste, base_url, text)
    except Exception as exc:
        await bot.reply(message, f"paste failed: {exc}")
        return

    answer = url if url else "paste failed: no URL returned"
    await bot.reply(message, answer)
@command("shorten", help="Shorten a URL: !shorten <url>")
async def cmd_shorten(bot, message):
    """Shorten a URL via FlaskPaste.

    Usage:
        !shorten https://very-long-url.example.com/path
    """
    # With maxsplit=1 the split yields at most [command, remainder].
    pieces = message.text.split(None, 1)
    if len(pieces) != 2:
        await bot.reply(message, "Usage: !shorten <url>")
        return

    target_url = pieces[1].strip()
    accepted_schemes = ("http://", "https://")
    if not target_url.startswith(accepted_schemes):
        await bot.reply(message, "URL must start with http:// or https://")
        return

    base_url = _get_base_url(bot)
    # The shorten helper blocks on network I/O (and possibly PoW), so it runs
    # in the default executor instead of on the event loop.
    loop = asyncio.get_running_loop()
    try:
        short = await loop.run_in_executor(
            None, _shorten_url, base_url, target_url,
        )
    except Exception as exc:
        await bot.reply(message, f"shorten failed: {exc}")
        return

    answer = short if short else "shorten failed: no URL returned"
    await bot.reply(message, answer)