- PoW-authenticated paste creation and URL shortening via FlaskPaste
- !paste <text> creates a paste, !shorten <url> shortens a URL
- Module-level shorten_url/create_paste helpers for cross-plugin use
- Alert plugin auto-shortens URLs in announcements and history output
- Custom TLS CA cert support via secrets/flaskpaste/derp.crt
- No SOCKS proxy -- direct urllib.request to FlaskPaste instance
219 lines
6.3 KiB
Python
219 lines
6.3 KiB
Python
"""Plugin: FlaskPaste integration -- paste creation and URL shortening."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import os
|
|
import ssl
|
|
import urllib.request
|
|
from pathlib import Path
|
|
|
|
from derp.plugin import command
|
|
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# FlaskPaste instance used when neither the environment nor the bot config
# supplies a URL.
_DEFAULT_URL = "https://paste.mymx.me"

# Timeout handed to every urlopen() call, in seconds.
_TIMEOUT = 15

# Directory searched for the custom CA certificate:
# <grandparent directory of this file>/secrets/flaskpaste
_CERT_DIR = Path(__file__).resolve().parent.parent.joinpath(
    "secrets", "flaskpaste",
)
|
|
|
|
|
|
# -- Internal helpers --------------------------------------------------------
|
|
|
|
def _get_base_url(bot) -> str:
    """Resolve FlaskPaste base URL from env or config, strip trailing slash.

    Lookup order: the ``FLASKPASTE_URL`` environment variable, then
    ``bot.config["flaskpaste"]["url"]``, then the built-in default.
    Empty or missing values at any level fall through to the next source,
    so a blank ``url`` entry or a null ``flaskpaste`` section no longer
    yields an empty base URL or an AttributeError.

    Args:
        bot: Bot object exposing a dict-like ``config`` attribute. Only
            consulted when the environment variable is unset or empty.

    Returns:
        The base URL without trailing slashes.
    """
    url = os.environ.get("FLASKPASTE_URL")
    if not url:
        # `or {}` covers a section that is present but null in the config.
        section = bot.config.get("flaskpaste") or {}
        url = section.get("url") or _DEFAULT_URL
    return url.rstrip("/")
|
|
|
|
|
|
def _ssl_context() -> ssl.SSLContext:
    """Return a default SSL context, using ``derp.crt`` as CA file if present.

    The certificate is looked up inside the module's certificate directory;
    when the file does not exist a plain default context is returned.
    """
    ca_file = _CERT_DIR / "derp.crt"
    if not ca_file.exists():
        return ssl.create_default_context()
    return ssl.create_default_context(cafile=str(ca_file))
|
|
|
|
|
|
def _solve_pow(nonce: str, difficulty: int) -> int:
    """Find N where SHA256(nonce:N) has `difficulty` leading zero bits.

    Blocking -- run in executor.

    Args:
        nonce: Challenge nonce; each candidate is hashed as ``"<nonce>:<N>"``.
        difficulty: Required number of leading zero bits in the digest.
            Values <= 0 are satisfied immediately by N = 0.

    Returns:
        The smallest non-negative N whose digest meets the difficulty.

    Raises:
        ValueError: If `difficulty` exceeds the 256 bits of a SHA-256
            digest. No digest can satisfy that, so the search would
            otherwise never terminate.
    """
    digest_bits = 256
    if difficulty > digest_bits:
        raise ValueError(
            f"PoW difficulty {difficulty} exceeds {digest_bits}-bit digest"
        )

    prefix = f"{nonce}:".encode()
    n = 0
    while True:
        digest = hashlib.sha256(prefix + str(n).encode()).digest()
        # Read as a big-endian integer, the digest's leading zero bits are
        # exactly the bits missing from its bit length.
        leading = digest_bits - int.from_bytes(digest, "big").bit_length()
        if leading >= difficulty:
            return n
        n += 1
|
|
|
|
|
|
def _get_challenge(base_url: str) -> dict:
    """GET /challenge -- returns {nonce, difficulty, token}.

    Performs a blocking HTTPS request against ``<base_url>/challenge`` and
    returns the decoded JSON response body.
    """
    tls = _ssl_context()
    request_headers = {"Accept": "application/json", "User-Agent": "derp-bot"}
    request = urllib.request.Request(
        f"{base_url}/challenge",
        headers=request_headers,
    )
    with urllib.request.urlopen(
        request, timeout=_TIMEOUT, context=tls,
    ) as response:
        raw = response.read()
    return json.loads(raw)
|
|
|
|
|
|
def _pow_post(base_url: str, endpoint: str, payload: dict) -> dict:
    """Fetch a PoW challenge, solve it, and POST `payload` as JSON.

    Shared by paste creation and URL shortening, which differ only in the
    endpoint and the JSON payload.

    Args:
        base_url: FlaskPaste base URL; the challenge is fetched from it.
        endpoint: Full URL the JSON payload is POSTed to.
        payload: JSON-serialisable request body.

    Returns:
        The decoded JSON response body.
    """
    ch = _get_challenge(base_url)
    # Read every field before solving so a malformed challenge fails fast
    # instead of after the (potentially expensive) PoW search.
    nonce = ch["nonce"]
    difficulty = ch["difficulty"]
    token = ch["token"]
    solution = _solve_pow(nonce, difficulty)

    req = urllib.request.Request(
        endpoint,
        data=json.dumps(payload).encode(),
        headers={
            "Content-Type": "application/json",
            "X-PoW-Token": token,
            "X-PoW-Solution": str(solution),
            "User-Agent": "derp-bot",
        },
    )
    ctx = _ssl_context()
    with urllib.request.urlopen(req, timeout=_TIMEOUT, context=ctx) as resp:
        return json.loads(resp.read())


def _create_paste(base_url: str, content: str) -> str:
    """Challenge + solve + POST / to create a paste. Returns paste URL.

    The URL is built from the response's ``id`` field when present;
    otherwise the response's ``url`` field is returned, or an empty string
    if neither is set.
    """
    body = _pow_post(base_url, base_url, {"content": content})
    paste_id = body.get("id", "")
    if paste_id:
        return f"{base_url}/{paste_id}"
    # Fallback: check for url field
    return body.get("url", "")


def _shorten_url(base_url: str, url: str) -> str:
    """Challenge + solve + POST /s to shorten a URL. Returns short URL.

    The short URL is built from the response's ``id`` field when present;
    otherwise the response's ``url`` field is returned, or an empty string
    if neither is set.
    """
    body = _pow_post(base_url, f"{base_url}/s", {"url": url})
    short_id = body.get("id", "")
    if short_id:
        return f"{base_url}/s/{short_id}"
    return body.get("url", "")
|
|
|
|
|
|
# -- Public helpers (importable by other plugins) ----------------------------
|
|
|
|
def shorten_url(bot, url: str) -> str:
    """Shorten a URL via FlaskPaste. Returns short URL or original on failure.

    Best-effort helper for other plugins: any error while resolving the
    base URL or talking to FlaskPaste is logged at debug level and the
    original `url` is returned. An empty result from the service (response
    carried neither an id nor a url) is also treated as a failure, so
    callers never receive an empty string in place of their link.

    Args:
        bot: Bot object used to resolve the FlaskPaste base URL.
        url: The URL to shorten.

    Returns:
        The short URL, or `url` unchanged if shortening failed.
    """
    try:
        short = _shorten_url(_get_base_url(bot), url)
    except Exception as exc:
        log.debug("flaskpaste shorten failed: %s", exc)
        return url
    if not short:
        log.debug("flaskpaste shorten failed: no URL returned")
        return url
    return short
|
|
|
|
|
|
def create_paste(bot, content: str) -> str | None:
|
|
"""Create a paste via FlaskPaste. Returns paste URL or None on failure."""
|
|
base_url = _get_base_url(bot)
|
|
try:
|
|
return _create_paste(base_url, content)
|
|
except Exception as exc:
|
|
log.debug("flaskpaste paste failed: %s", exc)
|
|
return None
|
|
|
|
|
|
# -- Commands ----------------------------------------------------------------
|
|
|
|
@command("paste", help="Create a paste: !paste <text>")
async def cmd_paste(bot, message):
    """Upload the text following the command to FlaskPaste and reply with its URL.

    Usage:
        !paste some long text here
    """
    tokens = message.text.split(maxsplit=1)
    if len(tokens) != 2:
        await bot.reply(message, "Usage: !paste <text>")
        return

    text = tokens[1]
    base_url = _get_base_url(bot)
    loop = asyncio.get_running_loop()

    try:
        # _create_paste is blocking, so it runs in the default executor.
        paste_url = await loop.run_in_executor(
            None, _create_paste, base_url, text,
        )
    except Exception as exc:
        await bot.reply(message, f"paste failed: {exc}")
    else:
        outcome = paste_url if paste_url else "paste failed: no URL returned"
        await bot.reply(message, outcome)
|
|
|
|
|
|
@command("shorten", help="Shorten a URL: !shorten <url>")
async def cmd_shorten(bot, message):
    """Shorten the URL following the command via FlaskPaste and reply with it.

    Usage:
        !shorten https://very-long-url.example.com/path
    """
    tokens = message.text.split(maxsplit=1)
    if len(tokens) != 2:
        await bot.reply(message, "Usage: !shorten <url>")
        return

    long_url = tokens[1].strip()
    has_http_scheme = long_url.startswith(("http://", "https://"))
    if not has_http_scheme:
        await bot.reply(message, "URL must start with http:// or https://")
        return

    base_url = _get_base_url(bot)
    loop = asyncio.get_running_loop()

    try:
        # _shorten_url is blocking, so it runs in the default executor.
        short_url = await loop.run_in_executor(
            None, _shorten_url, base_url, long_url,
        )
    except Exception as exc:
        await bot.reply(message, f"shorten failed: {exc}")
    else:
        outcome = short_url if short_url else "shorten failed: no URL returned"
        await bot.reply(message, outcome)
|