Files
flaskpaste/fpaste
Username 9ccd4225dd fpaste: add E2E encryption support
-e/--encrypt flag encrypts content with AES-256-GCM before upload.
Key is appended to URL fragment (#...), never sent to server.
Auto-detects key fragment on retrieval and decrypts locally.
2025-12-20 06:51:35 +01:00

372 lines
11 KiB
Python
Executable File

#!/usr/bin/env python3
"""FlaskPaste command-line client."""
import argparse
import base64
import hashlib
import json
import os
import sys
import urllib.error
import urllib.request
from pathlib import Path
# Optional encryption support
try:
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
HAS_CRYPTO = True
except ImportError:
HAS_CRYPTO = False
def get_config():
"""Load configuration from environment or config file."""
config = {
"server": os.environ.get("FLASKPASTE_SERVER", "http://localhost:5000"),
"cert_sha1": os.environ.get("FLASKPASTE_CERT_SHA1", ""),
}
# Try config file
config_file = Path.home() / ".config" / "fpaste" / "config"
if config_file.exists():
for line in config_file.read_text().splitlines():
line = line.strip()
if line and not line.startswith("#") and "=" in line:
key, value = line.split("=", 1)
key = key.strip().lower()
value = value.strip().strip('"').strip("'")
if key == "server":
config["server"] = value
elif key == "cert_sha1":
config["cert_sha1"] = value
return config
def request(url, method="GET", data=None, headers=None):
"""Make HTTP request and return response."""
headers = headers or {}
req = urllib.request.Request(url, data=data, headers=headers, method=method)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return resp.status, resp.read(), dict(resp.headers)
except urllib.error.HTTPError as e:
return e.code, e.read(), dict(e.headers)
except urllib.error.URLError as e:
die(f"Connection failed: {e.reason}")
def die(msg, code=1):
"""Print error and exit."""
print(f"error: {msg}", file=sys.stderr)
sys.exit(code)
def encrypt_content(plaintext):
"""Encrypt content with AES-256-GCM. Returns (ciphertext, key)."""
if not HAS_CRYPTO:
die("encryption requires 'cryptography' package: pip install cryptography")
key = os.urandom(32)
nonce = os.urandom(12) # 96-bit nonce for GCM
aesgcm = AESGCM(key)
ciphertext = aesgcm.encrypt(nonce, plaintext, None)
return nonce + ciphertext, key
def decrypt_content(blob, key):
"""Decrypt AES-256-GCM encrypted content."""
if not HAS_CRYPTO:
die("decryption requires 'cryptography' package: pip install cryptography")
if len(blob) < 12:
die("encrypted content too short")
nonce, ciphertext = blob[:12], blob[12:]
aesgcm = AESGCM(key)
try:
return aesgcm.decrypt(nonce, ciphertext, None)
except Exception:
die("decryption failed (wrong key or corrupted data)")
def encode_key(key):
"""Encode key as URL-safe base64."""
return base64.urlsafe_b64encode(key).decode().rstrip("=")
def decode_key(encoded):
"""Decode URL-safe base64 key."""
# Add padding if needed
padding = 4 - (len(encoded) % 4)
if padding != 4:
encoded += "=" * padding
try:
return base64.urlsafe_b64decode(encoded)
except Exception:
die("invalid encryption key in URL")
def solve_pow(nonce, difficulty):
"""Solve proof-of-work challenge.
Find a number N such that SHA256(nonce:N) has `difficulty` leading zero bits.
"""
n = 0
target_bytes = (difficulty + 7) // 8 # Bytes to check
while True:
work = f"{nonce}:{n}".encode()
hash_bytes = hashlib.sha256(work).digest()
# Count leading zero bits
zero_bits = 0
for byte in hash_bytes[:target_bytes + 1]:
if byte == 0:
zero_bits += 8
else:
zero_bits += (8 - byte.bit_length())
break
if zero_bits >= difficulty:
return n
n += 1
# Progress indicator for high difficulty
if n % 100000 == 0:
print(f"\rsolving pow: {n} attempts...", end="", file=sys.stderr)
return n
def get_challenge(config):
"""Fetch PoW challenge from server."""
url = config["server"].rstrip("/") + "/challenge"
status, body, _ = request(url)
if status != 200:
return None
data = json.loads(body)
if not data.get("enabled"):
return None
return data
def cmd_create(args, config):
"""Create a new paste."""
# Read content from file or stdin
if args.file:
if args.file == "-":
content = sys.stdin.buffer.read()
else:
path = Path(args.file)
if not path.exists():
die(f"file not found: {args.file}")
content = path.read_bytes()
else:
# No file specified, read from stdin
if sys.stdin.isatty():
die("no input provided (pipe data or specify file)")
content = sys.stdin.buffer.read()
if not content:
die("empty content")
# Encrypt content if requested
encryption_key = None
if args.encrypt:
if not args.quiet:
print("encrypting...", end="", file=sys.stderr)
content, encryption_key = encrypt_content(content)
if not args.quiet:
print(" done", file=sys.stderr)
headers = {}
if config["cert_sha1"]:
headers["X-SSL-Client-SHA1"] = config["cert_sha1"]
# Get and solve PoW challenge if required
challenge = get_challenge(config)
if challenge:
if not args.quiet:
print(f"solving pow (difficulty={challenge['difficulty']})...", end="", file=sys.stderr)
solution = solve_pow(challenge["nonce"], challenge["difficulty"])
if not args.quiet:
print(" done", file=sys.stderr)
headers["X-PoW-Token"] = challenge["token"]
headers["X-PoW-Solution"] = str(solution)
url = config["server"].rstrip("/") + "/"
status, body, _ = request(url, method="POST", data=content, headers=headers)
if status == 201:
data = json.loads(body)
# Append encryption key to URL fragment if encrypted
key_fragment = ""
if encryption_key:
key_fragment = "#" + encode_key(encryption_key)
if args.raw:
print(config["server"].rstrip("/") + data["raw"] + key_fragment)
elif args.quiet:
print(data["id"] + key_fragment)
else:
print(config["server"].rstrip("/") + data["url"] + key_fragment)
else:
try:
err = json.loads(body).get("error", body.decode())
except (json.JSONDecodeError, UnicodeDecodeError):
err = body.decode(errors="replace")
die(f"create failed ({status}): {err}")
def cmd_get(args, config):
"""Retrieve a paste."""
# Parse URL for paste ID and optional encryption key fragment
url_input = args.id
encryption_key = None
# Extract key from URL fragment (#...)
if "#" in url_input:
url_input, key_encoded = url_input.rsplit("#", 1)
if key_encoded:
encryption_key = decode_key(key_encoded)
paste_id = url_input.split("/")[-1] # Handle full URLs
base = config["server"].rstrip("/")
if args.meta:
url = f"{base}/{paste_id}"
status, body, _ = request(url)
if status == 200:
data = json.loads(body)
print(f"id: {data['id']}")
print(f"mime_type: {data['mime_type']}")
print(f"size: {data['size']}")
print(f"created_at: {data['created_at']}")
if encryption_key:
print(f"encrypted: yes (key in URL)")
else:
die(f"not found: {paste_id}")
else:
url = f"{base}/{paste_id}/raw"
status, body, headers = request(url)
if status == 200:
# Decrypt if encryption key was provided
if encryption_key:
body = decrypt_content(body, encryption_key)
if args.output:
Path(args.output).write_bytes(body)
print(f"saved: {args.output}", file=sys.stderr)
else:
# Write binary to stdout
sys.stdout.buffer.write(body)
# Add newline if content doesn't end with one and stdout is tty
if sys.stdout.isatty() and body and not body.endswith(b"\n"):
sys.stdout.buffer.write(b"\n")
else:
die(f"not found: {paste_id}")
def cmd_delete(args, config):
"""Delete a paste."""
if not config["cert_sha1"]:
die("authentication required (set FLASKPASTE_CERT_SHA1)")
paste_id = args.id.split("/")[-1]
base = config["server"].rstrip("/")
url = f"{base}/{paste_id}"
headers = {"X-SSL-Client-SHA1": config["cert_sha1"]}
status, body, _ = request(url, method="DELETE", headers=headers)
if status == 200:
print(f"deleted: {paste_id}")
elif status == 404:
die(f"not found: {paste_id}")
elif status == 403:
die("permission denied (not owner)")
elif status == 401:
die("authentication failed")
else:
die(f"delete failed ({status})")
def cmd_info(args, config):
"""Show server info."""
url = config["server"].rstrip("/") + "/"
status, body, _ = request(url)
if status == 200:
data = json.loads(body)
print(f"server: {config['server']}")
print(f"name: {data.get('name', 'unknown')}")
print(f"version: {data.get('version', 'unknown')}")
else:
die("failed to connect to server")
def main():
parser = argparse.ArgumentParser(
prog="fpaste",
description="FlaskPaste command-line client",
)
parser.add_argument(
"-s", "--server",
help="server URL (default: $FLASKPASTE_SERVER or http://localhost:5000)",
)
subparsers = parser.add_subparsers(dest="command", metavar="command")
# create
p_create = subparsers.add_parser("create", aliases=["c", "new"], help="create paste")
p_create.add_argument("file", nargs="?", help="file to upload (- for stdin)")
p_create.add_argument("-e", "--encrypt", action="store_true", help="encrypt content (E2E)")
p_create.add_argument("-r", "--raw", action="store_true", help="output raw URL")
p_create.add_argument("-q", "--quiet", action="store_true", help="output ID only")
# get
p_get = subparsers.add_parser("get", aliases=["g"], help="retrieve paste")
p_get.add_argument("id", help="paste ID or URL")
p_get.add_argument("-o", "--output", help="save to file")
p_get.add_argument("-m", "--meta", action="store_true", help="show metadata only")
# delete
p_delete = subparsers.add_parser("delete", aliases=["d", "rm"], help="delete paste")
p_delete.add_argument("id", help="paste ID or URL")
# info
subparsers.add_parser("info", aliases=["i"], help="show server info")
args = parser.parse_args()
config = get_config()
if args.server:
config["server"] = args.server
if not args.command:
# Default: create from stdin if data is piped
if not sys.stdin.isatty():
args.command = "create"
args.file = None
args.encrypt = False
args.raw = False
args.quiet = False
else:
parser.print_help()
sys.exit(0)
if args.command in ("create", "c", "new"):
cmd_create(args, config)
elif args.command in ("get", "g"):
cmd_get(args, config)
elif args.command in ("delete", "d", "rm"):
cmd_delete(args, config)
elif args.command in ("info", "i"):
cmd_info(args, config)
if __name__ == "__main__":
main()