#!/usr/bin/env python3 """FlaskPaste command-line client.""" import argparse import base64 import hashlib import json import os import sys import urllib.error import urllib.request from pathlib import Path # Optional encryption support try: from cryptography.hazmat.primitives.ciphers.aead import AESGCM HAS_CRYPTO = True except ImportError: HAS_CRYPTO = False def get_config(): """Load configuration from environment or config file.""" config = { "server": os.environ.get("FLASKPASTE_SERVER", "http://localhost:5000"), "cert_sha1": os.environ.get("FLASKPASTE_CERT_SHA1", ""), } # Try config file config_file = Path.home() / ".config" / "fpaste" / "config" if config_file.exists(): for line in config_file.read_text().splitlines(): line = line.strip() if line and not line.startswith("#") and "=" in line: key, value = line.split("=", 1) key = key.strip().lower() value = value.strip().strip('"').strip("'") if key == "server": config["server"] = value elif key == "cert_sha1": config["cert_sha1"] = value return config def request(url, method="GET", data=None, headers=None): """Make HTTP request and return response.""" headers = headers or {} req = urllib.request.Request(url, data=data, headers=headers, method=method) try: with urllib.request.urlopen(req, timeout=30) as resp: return resp.status, resp.read(), dict(resp.headers) except urllib.error.HTTPError as e: return e.code, e.read(), dict(e.headers) except urllib.error.URLError as e: die(f"Connection failed: {e.reason}") def die(msg, code=1): """Print error and exit.""" print(f"error: {msg}", file=sys.stderr) sys.exit(code) def encrypt_content(plaintext): """Encrypt content with AES-256-GCM. Returns (ciphertext, key).""" if not HAS_CRYPTO: die("encryption requires 'cryptography' package: pip install cryptography") key = os.urandom(32) nonce = os.urandom(12) # 96-bit nonce for GCM aesgcm = AESGCM(key) ciphertext = aesgcm.encrypt(nonce, plaintext, None) return nonce + ciphertext, key def decrypt_content(blob, key): """Decrypt AES-256-GCM encrypted content.""" if not HAS_CRYPTO: die("decryption requires 'cryptography' package: pip install cryptography") if len(blob) < 12: die("encrypted content too short") nonce, ciphertext = blob[:12], blob[12:] aesgcm = AESGCM(key) try: return aesgcm.decrypt(nonce, ciphertext, None) except Exception: die("decryption failed (wrong key or corrupted data)") def encode_key(key): """Encode key as URL-safe base64.""" return base64.urlsafe_b64encode(key).decode().rstrip("=") def decode_key(encoded): """Decode URL-safe base64 key.""" # Add padding if needed padding = 4 - (len(encoded) % 4) if padding != 4: encoded += "=" * padding try: return base64.urlsafe_b64decode(encoded) except Exception: die("invalid encryption key in URL") def solve_pow(nonce, difficulty): """Solve proof-of-work challenge. Find a number N such that SHA256(nonce:N) has `difficulty` leading zero bits. """ n = 0 target_bytes = (difficulty + 7) // 8 # Bytes to check while True: work = f"{nonce}:{n}".encode() hash_bytes = hashlib.sha256(work).digest() # Count leading zero bits zero_bits = 0 for byte in hash_bytes[:target_bytes + 1]: if byte == 0: zero_bits += 8 else: zero_bits += (8 - byte.bit_length()) break if zero_bits >= difficulty: return n n += 1 # Progress indicator for high difficulty if n % 100000 == 0: print(f"\rsolving pow: {n} attempts...", end="", file=sys.stderr) return n def get_challenge(config): """Fetch PoW challenge from server.""" url = config["server"].rstrip("/") + "/challenge" status, body, _ = request(url) if status != 200: return None data = json.loads(body) if not data.get("enabled"): return None return data def cmd_create(args, config): """Create a new paste.""" # Read content from file or stdin if args.file: if args.file == "-": content = sys.stdin.buffer.read() else: path = Path(args.file) if not path.exists(): die(f"file not found: {args.file}") content = path.read_bytes() else: # No file specified, read from stdin if sys.stdin.isatty(): die("no input provided (pipe data or specify file)") content = sys.stdin.buffer.read() if not content: die("empty content") # Encrypt content if requested encryption_key = None if args.encrypt: if not args.quiet: print("encrypting...", end="", file=sys.stderr) content, encryption_key = encrypt_content(content) if not args.quiet: print(" done", file=sys.stderr) headers = {} if config["cert_sha1"]: headers["X-SSL-Client-SHA1"] = config["cert_sha1"] # Get and solve PoW challenge if required challenge = get_challenge(config) if challenge: if not args.quiet: print(f"solving pow (difficulty={challenge['difficulty']})...", end="", file=sys.stderr) solution = solve_pow(challenge["nonce"], challenge["difficulty"]) if not args.quiet: print(" done", file=sys.stderr) headers["X-PoW-Token"] = challenge["token"] headers["X-PoW-Solution"] = str(solution) url = config["server"].rstrip("/") + "/" status, body, _ = request(url, method="POST", data=content, headers=headers) if status == 201: data = json.loads(body) # Append encryption key to URL fragment if encrypted key_fragment = "" if encryption_key: key_fragment = "#" + encode_key(encryption_key) if args.raw: print(config["server"].rstrip("/") + data["raw"] + key_fragment) elif args.quiet: print(data["id"] + key_fragment) else: print(config["server"].rstrip("/") + data["url"] + key_fragment) else: try: err = json.loads(body).get("error", body.decode()) except (json.JSONDecodeError, UnicodeDecodeError): err = body.decode(errors="replace") die(f"create failed ({status}): {err}") def cmd_get(args, config): """Retrieve a paste.""" # Parse URL for paste ID and optional encryption key fragment url_input = args.id encryption_key = None # Extract key from URL fragment (#...) if "#" in url_input: url_input, key_encoded = url_input.rsplit("#", 1) if key_encoded: encryption_key = decode_key(key_encoded) paste_id = url_input.split("/")[-1] # Handle full URLs base = config["server"].rstrip("/") if args.meta: url = f"{base}/{paste_id}" status, body, _ = request(url) if status == 200: data = json.loads(body) print(f"id: {data['id']}") print(f"mime_type: {data['mime_type']}") print(f"size: {data['size']}") print(f"created_at: {data['created_at']}") if encryption_key: print(f"encrypted: yes (key in URL)") else: die(f"not found: {paste_id}") else: url = f"{base}/{paste_id}/raw" status, body, headers = request(url) if status == 200: # Decrypt if encryption key was provided if encryption_key: body = decrypt_content(body, encryption_key) if args.output: Path(args.output).write_bytes(body) print(f"saved: {args.output}", file=sys.stderr) else: # Write binary to stdout sys.stdout.buffer.write(body) # Add newline if content doesn't end with one and stdout is tty if sys.stdout.isatty() and body and not body.endswith(b"\n"): sys.stdout.buffer.write(b"\n") else: die(f"not found: {paste_id}") def cmd_delete(args, config): """Delete a paste.""" if not config["cert_sha1"]: die("authentication required (set FLASKPASTE_CERT_SHA1)") paste_id = args.id.split("/")[-1] base = config["server"].rstrip("/") url = f"{base}/{paste_id}" headers = {"X-SSL-Client-SHA1": config["cert_sha1"]} status, body, _ = request(url, method="DELETE", headers=headers) if status == 200: print(f"deleted: {paste_id}") elif status == 404: die(f"not found: {paste_id}") elif status == 403: die("permission denied (not owner)") elif status == 401: die("authentication failed") else: die(f"delete failed ({status})") def cmd_info(args, config): """Show server info.""" url = config["server"].rstrip("/") + "/" status, body, _ = request(url) if status == 200: data = json.loads(body) print(f"server: {config['server']}") print(f"name: {data.get('name', 'unknown')}") print(f"version: {data.get('version', 'unknown')}") else: die("failed to connect to server") def main(): parser = argparse.ArgumentParser( prog="fpaste", description="FlaskPaste command-line client", ) parser.add_argument( "-s", "--server", help="server URL (default: $FLASKPASTE_SERVER or http://localhost:5000)", ) subparsers = parser.add_subparsers(dest="command", metavar="command") # create p_create = subparsers.add_parser("create", aliases=["c", "new"], help="create paste") p_create.add_argument("file", nargs="?", help="file to upload (- for stdin)") p_create.add_argument("-e", "--encrypt", action="store_true", help="encrypt content (E2E)") p_create.add_argument("-r", "--raw", action="store_true", help="output raw URL") p_create.add_argument("-q", "--quiet", action="store_true", help="output ID only") # get p_get = subparsers.add_parser("get", aliases=["g"], help="retrieve paste") p_get.add_argument("id", help="paste ID or URL") p_get.add_argument("-o", "--output", help="save to file") p_get.add_argument("-m", "--meta", action="store_true", help="show metadata only") # delete p_delete = subparsers.add_parser("delete", aliases=["d", "rm"], help="delete paste") p_delete.add_argument("id", help="paste ID or URL") # info subparsers.add_parser("info", aliases=["i"], help="show server info") args = parser.parse_args() config = get_config() if args.server: config["server"] = args.server if not args.command: # Default: create from stdin if data is piped if not sys.stdin.isatty(): args.command = "create" args.file = None args.encrypt = False args.raw = False args.quiet = False else: parser.print_help() sys.exit(0) if args.command in ("create", "c", "new"): cmd_create(args, config) elif args.command in ("get", "g"): cmd_get(args, config) elif args.command in ("delete", "d", "rm"): cmd_delete(args, config) elif args.command in ("info", "i"): cmd_info(args, config) if __name__ == "__main__": main()