#!/usr/bin/env python3 """FlaskPaste command-line client.""" from __future__ import annotations import argparse import base64 import hashlib import json import os import ssl import sys import time import urllib.error import urllib.request from datetime import UTC, datetime, timedelta from pathlib import Path from typing import TYPE_CHECKING, Any, NoReturn if TYPE_CHECKING: from collections.abc import Mapping # Optional cryptography support (for encryption and cert generation) try: from cryptography import x509 from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import ec, rsa from cryptography.hazmat.primitives.ciphers.aead import AESGCM from cryptography.x509.oid import NameOID HAS_CRYPTO = True except ImportError: HAS_CRYPTO = False # Constants CONFIG_DIR = Path.home() / ".config" / "fpaste" CONFIG_FILE = CONFIG_DIR / "config" CONFIG_KEYS = frozenset({"server", "cert_sha1", "client_cert", "client_key", "ca_cert"}) MIME_EXTENSIONS: dict[str, str] = { "text/plain": ".txt", "text/html": ".html", "text/css": ".css", "text/javascript": ".js", "text/markdown": ".md", "text/x-python": ".py", "application/json": ".json", "application/xml": ".xml", "application/javascript": ".js", "application/octet-stream": ".bin", "image/png": ".png", "image/jpeg": ".jpg", "image/gif": ".gif", "image/webp": ".webp", "image/svg+xml": ".svg", "application/pdf": ".pdf", "application/zip": ".zip", "application/gzip": ".gz", "application/x-tar": ".tar", } FILE_EXTENSIONS = frozenset( { "txt", "md", "py", "js", "json", "yaml", "yml", "xml", "html", "css", "sh", "bash", "c", "cpp", "h", "go", "rs", "java", "rb", "php", "sql", "log", "conf", "cfg", "ini", "png", "jpg", "jpeg", "gif", "pdf", "zip", "tar", "gz", } ) DATE_FORMATS = ( "%Y-%m-%d", "%Y-%m-%d %H:%M", "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%SZ", ) # ----------------------------------------------------------------------------- # Core utilities # ----------------------------------------------------------------------------- def die(msg: str, code: int = 1) -> NoReturn: """Print error and exit.""" print(f"error: {msg}", file=sys.stderr) sys.exit(code) def request( url: str, method: str = "GET", data: bytes | None = None, headers: dict[str, str] | None = None, ssl_context: ssl.SSLContext | None = None, ) -> tuple[int, bytes, dict[str, str]]: """Make HTTP request and return (status, body, headers).""" headers = headers or {} req = urllib.request.Request(url, data=data, headers=headers, method=method) # noqa: S310 try: with urllib.request.urlopen(req, timeout=30, context=ssl_context) as resp: # noqa: S310 return resp.status, resp.read(), dict(resp.headers) except urllib.error.HTTPError as e: return e.code, e.read(), dict(e.headers) except urllib.error.URLError as e: die(f"Connection failed: {e.reason}") def parse_error(body: bytes, default: str = "request failed") -> str: """Parse error message from JSON response body.""" try: return json.loads(body).get("error", default) except (json.JSONDecodeError, UnicodeDecodeError): return default # ----------------------------------------------------------------------------- # Configuration # ----------------------------------------------------------------------------- def read_config_file(path: Path | None = None) -> dict[str, str]: """Read config file and return key-value pairs.""" path = path or CONFIG_FILE result: dict[str, str] = {} if not path.exists(): return result for line in path.read_text().splitlines(): line = line.strip() if not line or line.startswith("#") or "=" not in line: continue key, value = line.split("=", 1) key = key.strip().lower() if key in CONFIG_KEYS: result[key] = value.strip().strip('"').strip("'") return result def write_config_file( updates: dict[str, str], path: Path | None = None, ) -> Path: """Update config file with new values, preserving existing entries.""" path = path or CONFIG_FILE path.parent.mkdir(parents=True, exist_ok=True) existing = read_config_file(path) existing.update(updates) lines = [f"{k} = {v}" for k, v in sorted(existing.items())] path.write_text("\n".join(lines) + "\n") return path def get_config() -> dict[str, Any]: """Load configuration from environment and config file.""" config: dict[str, Any] = { "server": os.environ.get("FLASKPASTE_SERVER", "http://localhost:5000"), "cert_sha1": os.environ.get("FLASKPASTE_CERT_SHA1", ""), "client_cert": os.environ.get("FLASKPASTE_CLIENT_CERT", ""), "client_key": os.environ.get("FLASKPASTE_CLIENT_KEY", ""), "ca_cert": os.environ.get("FLASKPASTE_CA_CERT", ""), } # Config file values (lower priority than environment) file_config = read_config_file() for key in CONFIG_KEYS: if not config.get(key) and key in file_config: config[key] = file_config[key] return config def create_ssl_context(config: Mapping[str, Any]) -> ssl.SSLContext | None: """Create SSL context for mTLS if certificates are configured.""" client_cert = config.get("client_cert", "") if not client_cert: return None ctx = ssl.create_default_context() if ca_cert := config.get("ca_cert", ""): ctx.load_verify_locations(ca_cert) try: ctx.load_cert_chain(certfile=client_cert, keyfile=config.get("client_key") or None) except ssl.SSLError as e: die(f"failed to load client certificate: {e}") except FileNotFoundError as e: die(f"certificate file not found: {e}") return ctx # ----------------------------------------------------------------------------- # Encryption # ----------------------------------------------------------------------------- def encrypt_content(plaintext: bytes) -> tuple[bytes, bytes]: """Encrypt content with AES-256-GCM. Returns (ciphertext, key).""" if not HAS_CRYPTO: die("encryption requires 'cryptography' package: pip install cryptography") key = os.urandom(32) nonce = os.urandom(12) aesgcm = AESGCM(key) ciphertext = aesgcm.encrypt(nonce, plaintext, None) return nonce + ciphertext, key def decrypt_content(blob: bytes, key: bytes) -> bytes: """Decrypt AES-256-GCM encrypted content.""" if not HAS_CRYPTO: die("decryption requires 'cryptography' package: pip install cryptography") if len(blob) < 12: die("encrypted content too short") nonce, ciphertext = blob[:12], blob[12:] aesgcm = AESGCM(key) try: return aesgcm.decrypt(nonce, ciphertext, None) except Exception: die("decryption failed (wrong key or corrupted data)") def encode_key(key: bytes) -> str: """Encode key as URL-safe base64.""" return base64.urlsafe_b64encode(key).decode().rstrip("=") def decode_key(encoded: str) -> bytes: """Decode URL-safe base64 key.""" padding = 4 - (len(encoded) % 4) if padding != 4: encoded += "=" * padding try: return base64.urlsafe_b64decode(encoded) except Exception: die("invalid encryption key in URL") # ----------------------------------------------------------------------------- # Proof-of-work # ----------------------------------------------------------------------------- def solve_pow(nonce: str, difficulty: int) -> int: """Solve proof-of-work: find N where SHA256(nonce:N) has `difficulty` leading zero bits.""" n = 0 target_bytes = (difficulty + 7) // 8 while True: work = f"{nonce}:{n}".encode() hash_bytes = hashlib.sha256(work).digest() zero_bits = 0 for byte in hash_bytes[: target_bytes + 1]: if byte == 0: zero_bits += 8 else: zero_bits += 8 - byte.bit_length() break if zero_bits >= difficulty: return n n += 1 if n % 100000 == 0: print(f"\rsolving pow: {n} attempts...", end="", file=sys.stderr) def get_challenge( config: Mapping[str, Any], endpoint: str = "/challenge", ) -> dict[str, Any] | None: """Fetch PoW challenge from server.""" url = config["server"].rstrip("/") + endpoint status, body, _ = request(url, ssl_context=config.get("ssl_context")) if status != 200: return None data = json.loads(body) return data if data.get("enabled") else None # ----------------------------------------------------------------------------- # Formatting utilities # ----------------------------------------------------------------------------- def format_size(size: int) -> str: """Format byte size as human-readable string.""" if size < 1024: return f"{size}B" if size < 1024 * 1024: return f"{size / 1024:.1f}K" return f"{size / (1024 * 1024):.1f}M" def format_timestamp(ts: int | float) -> str: """Format Unix timestamp as human-readable date.""" dt = datetime.fromtimestamp(ts, tz=UTC) return dt.strftime("%Y-%m-%d %H:%M") def format_time_remaining(expires_at: int | float | None) -> str: """Format time remaining until expiry.""" if not expires_at: return "" now = time.time() remaining = expires_at - now if remaining <= 0: return "expired" if remaining < 60: return f"{int(remaining)}s" if remaining < 3600: return f"{int(remaining / 60)}m" if remaining < 86400: hours = int(remaining / 3600) return f"{hours}h" days = int(remaining / 86400) if days >= 365: years = days // 365 return f"{years}y" return f"{days}d" def parse_date(date_str: str) -> int: """Parse date string to Unix timestamp.""" if not date_str: return 0 for fmt in DATE_FORMATS: try: dt = datetime.strptime(date_str, fmt).replace(tzinfo=UTC) return int(dt.timestamp()) except ValueError: continue try: return int(date_str) except ValueError: die(f"invalid date format: {date_str}") def get_extension_for_mime(mime_type: str) -> str: """Get file extension for MIME type.""" return MIME_EXTENSIONS.get(mime_type, ".bin") def format_paste_row(paste: dict[str, Any], show_owner: bool = False) -> str: """Format a paste as a table row.""" paste_id = paste["id"] mime_type = paste.get("mime_type", "unknown")[:16] size = format_size(paste.get("size", 0)) created = format_timestamp(paste.get("created_at", 0)) # Time remaining until expiry expires = format_time_remaining(paste.get("expires_at")) flags = [] if paste.get("burn_after_read"): flags.append("burn") if paste.get("password_protected"): flags.append("pass") flags_str = " ".join(flags) row = f"{paste_id:<12} {mime_type:<16} {size:>6} {created:<16} {expires:<8} {flags_str}" if show_owner and paste.get("owner"): row += f" {paste['owner'][:12]}" return row def print_paste_list( pastes: list[dict[str, Any]], summary: str, as_json: bool = False, data: dict[str, Any] | None = None, ) -> None: """Print a list of pastes in table or JSON format.""" if as_json: print(json.dumps(data or {"pastes": pastes}, indent=2)) return if not pastes: print("no pastes found") return # Check if owner data is present (admin view) show_owner = any(paste.get("owner") for paste in pastes) header = f"{'ID':<12} {'TYPE':<16} {'SIZE':>6} {'CREATED':<16} {'EXPIRES':<8} FLAGS" if show_owner: header += " OWNER" print(header) for paste in pastes: print(format_paste_row(paste, show_owner=show_owner)) print(f"\n{summary}") # ----------------------------------------------------------------------------- # Content helpers # ----------------------------------------------------------------------------- def read_content(file_arg: str | None) -> bytes: """Read content from file or stdin.""" if file_arg: if file_arg == "-": return sys.stdin.buffer.read() path = Path(file_arg) if not path.exists(): die(f"file not found: {file_arg}") return path.read_bytes() if sys.stdin.isatty(): die("no input provided (pipe data or specify file)") return sys.stdin.buffer.read() def prepare_content( content: bytes, encrypt: bool, quiet: bool = False, ) -> tuple[bytes, bytes | None]: """Optionally encrypt content. Returns (content, encryption_key or None).""" if not encrypt: return content, None if not HAS_CRYPTO: die("encryption requires 'cryptography' package (use -E to disable)") if not quiet: print("encrypting...", end="", file=sys.stderr) encrypted, key = encrypt_content(content) if not quiet: print(" done", file=sys.stderr) return encrypted, key def extract_paste_id(url_or_id: str) -> tuple[str, bytes | None]: """Extract paste ID and optional encryption key from URL or ID.""" encryption_key = None if "#" in url_or_id: url_or_id, key_encoded = url_or_id.rsplit("#", 1) if key_encoded: encryption_key = decode_key(key_encoded) paste_id = url_or_id.split("/")[-1] return paste_id, encryption_key def auth_headers(config: Mapping[str, Any]) -> dict[str, str]: """Build authentication headers.""" if cert_sha1 := config.get("cert_sha1"): return {"X-SSL-Client-SHA1": cert_sha1} return {} def require_auth(config: Mapping[str, Any]) -> None: """Ensure authentication is configured.""" if not config.get("cert_sha1"): die("authentication required (set FLASKPASTE_CERT_SHA1)") # ----------------------------------------------------------------------------- # Commands # ----------------------------------------------------------------------------- def cmd_create(args: argparse.Namespace, config: dict[str, Any]) -> None: """Create a new paste.""" content = read_content(args.file) if not content: die("empty content") content, encryption_key = prepare_content( content, encrypt=not getattr(args, "no_encrypt", False), quiet=args.quiet, ) # Build headers base_headers = auth_headers(config) if args.burn: base_headers["X-Burn-After-Read"] = "true" if args.expiry: base_headers["X-Expiry"] = str(args.expiry) if args.password: base_headers["X-Paste-Password"] = args.password url = config["server"].rstrip("/") + "/" max_retries = 5 last_error = "" for attempt in range(max_retries): headers = dict(base_headers) if challenge := get_challenge(config): if attempt > 0 and not args.quiet: print(f"retry {attempt}/{max_retries - 1}...", file=sys.stderr) if not args.quiet: diff = challenge["difficulty"] base_diff = challenge.get("base_difficulty", diff) elevated = challenge.get("elevated", False) msg = ( f"solving pow ({diff} bits, elevated from {base_diff})..." if elevated else f"solving pow ({diff} bits)..." ) print(msg, end="", file=sys.stderr) solution = solve_pow(challenge["nonce"], challenge["difficulty"]) if not args.quiet: print(" done", file=sys.stderr) headers["X-PoW-Token"] = challenge["token"] headers["X-PoW-Solution"] = str(solution) status, body, _ = request( url, method="POST", data=content, headers=headers, ssl_context=config.get("ssl_context") ) if status == 201: data = json.loads(body) key_fragment = f"#{encode_key(encryption_key)}" if encryption_key else "" base_url = config["server"].rstrip("/") if args.raw: print(base_url + data["raw"] + key_fragment) elif args.quiet: print(data["id"] + key_fragment) else: print(base_url + data["url"] + key_fragment) return last_error = parse_error(body, body.decode(errors="replace")) err_lower = last_error.lower() is_pow_error = status == 400 and ("pow" in err_lower or "proof-of-work" in err_lower) if not is_pow_error: die(f"create failed ({status}): {last_error}") if not args.quiet: print(f"pow rejected: {last_error}", file=sys.stderr) die(f"create failed after {max_retries} attempts: {last_error}") def cmd_get(args: argparse.Namespace, config: dict[str, Any]) -> None: """Retrieve a paste.""" paste_id, encryption_key = extract_paste_id(args.id) base = config["server"].rstrip("/") headers: dict[str, str] = {} if args.password: headers["X-Paste-Password"] = args.password if args.meta: url = f"{base}/{paste_id}" status, body, _ = request(url, headers=headers, ssl_context=config.get("ssl_context")) if status == 200: data = json.loads(body) print(f"id: {data['id']}") print(f"mime_type: {data['mime_type']}") print(f"size: {data['size']}") print(f"created_at: {data['created_at']}") if encryption_key: print("encrypted: yes (key in URL)") if data.get("password_protected"): print("protected: yes (password required)") elif status == 401: die("password required (-p)") elif status == 403: die("invalid password") else: die(f"not found: {paste_id}") else: url = f"{base}/{paste_id}/raw" status, body, _ = request(url, headers=headers, ssl_context=config.get("ssl_context")) if status == 200: if encryption_key: body = decrypt_content(body, encryption_key) if args.output: Path(args.output).write_bytes(body) print(f"saved: {args.output}", file=sys.stderr) else: sys.stdout.buffer.write(body) if sys.stdout.isatty() and body and not body.endswith(b"\n"): sys.stdout.buffer.write(b"\n") elif status == 401: die("password required (-p)") elif status == 403: die("invalid password") else: die(f"not found: {paste_id}") def cmd_delete(args: argparse.Namespace, config: dict[str, Any]) -> None: """Delete paste(s).""" require_auth(config) delete_all = getattr(args, "all", False) confirm_count = getattr(args, "confirm", None) paste_ids = [paste_id.split("/")[-1] for paste_id in (args.ids or [])] # Validate arguments if delete_all and paste_ids: die("cannot specify both --all and paste IDs") if not delete_all and not paste_ids: die("specify paste ID(s) or use --all") if delete_all: # Fetch all pastes to get count and IDs url = f"{config['server'].rstrip('/')}/pastes?all=1&limit=1000" status, body, _ = request( url, headers=auth_headers(config), ssl_context=config.get("ssl_context") ) if status == 401: die("authentication failed") if status != 200: die(f"failed to list pastes ({status})") data = json.loads(body) pastes = data.get("pastes", []) total = len(pastes) if total == 0: print("no pastes to delete") return # Require confirmation with expected count if confirm_count is None: die(f"--all requires --confirm {total} (found {total} pastes)") if confirm_count != total: die(f"confirmation mismatch: expected {confirm_count}, found {total}") paste_ids = [p["id"] for p in pastes] # Delete pastes deleted = 0 failed = 0 for paste_id in paste_ids: url = f"{config['server'].rstrip('/')}/{paste_id}" status, _, _ = request( url, method="DELETE", headers=auth_headers(config), ssl_context=config.get("ssl_context"), ) if status == 200: print(f"deleted: {paste_id}") deleted += 1 elif status == 404: print(f"not found: {paste_id}", file=sys.stderr) failed += 1 elif status == 403: print(f"permission denied: {paste_id}", file=sys.stderr) failed += 1 else: print(f"failed ({status}): {paste_id}", file=sys.stderr) failed += 1 # Summary for batch operations if len(paste_ids) > 1: print(f"\n{deleted} deleted, {failed} failed") def cmd_info(args: argparse.Namespace, config: dict[str, Any]) -> None: """Show server info.""" url = config["server"].rstrip("/") + "/" status, body, _ = request(url, ssl_context=config.get("ssl_context")) if status != 200: die("failed to connect to server") data = json.loads(body) print(f"server: {config['server']}") print(f"name: {data.get('name', 'unknown')}") print(f"version: {data.get('version', 'unknown')}") if challenge := get_challenge(config): difficulty = challenge.get("difficulty", 0) base_diff = challenge.get("base_difficulty", difficulty) if challenge.get("elevated"): print(f"pow: {difficulty} bits (elevated from {base_diff})") else: print(f"pow: {difficulty} bits") else: print("pow: disabled") def cmd_list(args: argparse.Namespace, config: dict[str, Any]) -> None: """List user's pastes.""" require_auth(config) params = [] if getattr(args, "all", False): params.append("all=1") if args.limit: params.append(f"limit={args.limit}") if args.offset: params.append(f"offset={args.offset}") url = f"{config['server'].rstrip('/')}/pastes" if params: url += "?" + "&".join(params) status, body, _ = request( url, headers=auth_headers(config), ssl_context=config.get("ssl_context") ) if status == 401: die("authentication failed") if status != 200: die(f"failed to list pastes ({status})") data = json.loads(body) pastes = data.get("pastes", []) summary = f"{data.get('count', 0)} of {data.get('total', 0)} pastes shown" print_paste_list(pastes, summary, as_json=args.json, data=data) def cmd_search(args: argparse.Namespace, config: dict[str, Any]) -> None: """Search user's pastes.""" require_auth(config) params = [] if args.type: params.append(f"type={args.type}") if args.after: params.append(f"after={parse_date(args.after)}") if args.before: params.append(f"before={parse_date(args.before)}") if args.limit: params.append(f"limit={args.limit}") url = f"{config['server'].rstrip('/')}/pastes" if params: url += "?" + "&".join(params) status, body, _ = request( url, headers=auth_headers(config), ssl_context=config.get("ssl_context") ) if status == 401: die("authentication failed") if status != 200: die(f"failed to search pastes ({status})") data = json.loads(body) pastes = data.get("pastes", []) summary = f"{data.get('count', 0)} matching pastes found" print_paste_list(pastes, summary, as_json=args.json, data=data) def cmd_update(args: argparse.Namespace, config: dict[str, Any]) -> None: """Update an existing paste.""" require_auth(config) paste_id, _ = extract_paste_id(args.id) url = f"{config['server'].rstrip('/')}/{paste_id}" headers = auth_headers(config) content: bytes | None = None encryption_key: bytes | None = None if args.file: raw_content = read_content(args.file) if not raw_content: die("empty content") content, encryption_key = prepare_content( raw_content, encrypt=not getattr(args, "no_encrypt", False), quiet=args.quiet, ) if args.password: headers["X-Paste-Password"] = args.password if args.remove_password: headers["X-Remove-Password"] = "true" if args.expiry: headers["X-Extend-Expiry"] = str(args.expiry) status, body, _ = request( url, method="PUT", data=content, headers=headers, ssl_context=config.get("ssl_context") ) if status == 200: data = json.loads(body) if args.quiet: print(paste_id) else: print(f"updated: {paste_id}") print(f" size: {data.get('size', 'unknown')}") print(f" type: {data.get('mime_type', 'unknown')}") if data.get("expires_at"): print(f" expires: {data.get('expires_at')}") if data.get("password_protected"): print(" password: protected") if content and encryption_key: base = config["server"].rstrip("/") print(f" key: {base}/{paste_id}#{encode_key(encryption_key)}") elif status == 400: die(parse_error(body, "bad request")) elif status == 401: die("authentication failed") elif status == 403: die("permission denied (not owner)") elif status == 404: die(f"not found: {paste_id}") else: die(f"update failed ({status})") def cmd_export(args: argparse.Namespace, config: dict[str, Any]) -> None: """Export user's pastes to a directory.""" require_auth(config) out_dir = Path(args.output) if args.output else Path("fpaste-export") out_dir.mkdir(parents=True, exist_ok=True) # Load key file keys: dict[str, str] = {} if args.keyfile: keyfile_path = Path(args.keyfile) if not keyfile_path.exists(): die(f"key file not found: {args.keyfile}") for line in keyfile_path.read_text().splitlines(): line = line.strip() if line and not line.startswith("#") and "=" in line: paste_id, key_encoded = line.split("=", 1) keys[paste_id.strip()] = key_encoded.strip() # Fetch paste list url = f"{config['server'].rstrip('/')}/pastes?limit=1000" status, body, _ = request( url, headers=auth_headers(config), ssl_context=config.get("ssl_context") ) if status == 401: die("authentication failed") if status != 200: die(f"failed to list pastes ({status})") pastes = json.loads(body).get("pastes", []) if not pastes: print("no pastes to export") return exported, skipped, errors = 0, 0, 0 manifest: list[dict[str, Any]] = [] for paste in pastes: paste_id = paste["id"] mime_type = paste.get("mime_type", "application/octet-stream") if not args.quiet: print(f"exporting {paste_id}...", end=" ", file=sys.stderr) if paste.get("burn_after_read"): if not args.quiet: print("skipped (burn-after-read)", file=sys.stderr) skipped += 1 continue if paste.get("password_protected"): if not args.quiet: print("skipped (password-protected)", file=sys.stderr) skipped += 1 continue raw_url = f"{config['server'].rstrip('/')}/{paste_id}/raw" status, content, _ = request( raw_url, headers=auth_headers(config), ssl_context=config.get("ssl_context") ) if status != 200: if not args.quiet: print(f"error ({status})", file=sys.stderr) errors += 1 continue decrypted = False if paste_id in keys: try: key = decode_key(keys[paste_id]) content = decrypt_content(content, key) decrypted = True except SystemExit: if not args.quiet: print("decryption failed, keeping encrypted", file=sys.stderr, end=" ") filename = f"{paste_id}{get_extension_for_mime(mime_type)}" (out_dir / filename).write_bytes(content) manifest.append( { "id": paste_id, "filename": filename, "mime_type": mime_type, "size": len(content), "created_at": paste.get("created_at"), "decrypted": decrypted, "encrypted": paste_id in keys and not decrypted, } ) if not args.quiet: status_msg = "decrypted" if decrypted else ("encrypted" if paste_id in keys else "ok") print(status_msg, file=sys.stderr) exported += 1 if args.manifest: manifest_path = out_dir / "manifest.json" manifest_path.write_text(json.dumps(manifest, indent=2)) if not args.quiet: print(f"manifest: {manifest_path}", file=sys.stderr) print(f"\nexported: {exported}, skipped: {skipped}, errors: {errors}") print(f"output: {out_dir}") def cmd_pki_status(args: argparse.Namespace, config: dict[str, Any]) -> None: """Show PKI status and CA information.""" url = config["server"].rstrip("/") + "/pki" status, body, _ = request(url, ssl_context=config.get("ssl_context")) if status == 404: die("PKI not enabled on this server") if status != 200: die(f"failed to get PKI status ({status})") data = json.loads(body) print(f"pki enabled: {data.get('enabled', False)}") print(f"ca exists: {data.get('ca_exists', False)}") if data.get("ca_exists"): print(f"common name: {data.get('common_name', 'unknown')}") print(f"fingerprint: {data.get('fingerprint_sha1', 'unknown')}") if data.get("created_at"): print(f"created: {data.get('created_at')}") if data.get("expires_at"): print(f"expires: {data.get('expires_at')}") print(f"download: {config['server'].rstrip('/')}{data.get('download', '/pki/ca.crt')}") elif hint := data.get("hint"): print(f"hint: {hint}") def cmd_pki_issue(args: argparse.Namespace, config: dict[str, Any]) -> None: """Request a new client certificate from the server CA.""" url = config["server"].rstrip("/") + "/pki/issue" headers = {"Content-Type": "application/json", **auth_headers(config)} payload = json.dumps({"common_name": args.name}).encode() status, body, _ = request( url, method="POST", data=payload, headers=headers, ssl_context=config.get("ssl_context") ) if status == 404: die(parse_error(body, "PKI not available")) if status == 400: die(parse_error(body, "bad request")) if status != 201: die(f"certificate issuance failed ({status})") result = json.loads(body) out_dir = Path(args.output) if args.output else CONFIG_DIR out_dir.mkdir(parents=True, exist_ok=True) key_file = out_dir / "client.key" cert_file = out_dir / "client.crt" if not args.force: if key_file.exists(): die(f"key file exists: {key_file} (use --force)") if cert_file.exists(): die(f"cert file exists: {cert_file} (use --force)") key_file.write_text(result["private_key_pem"]) key_file.chmod(0o600) cert_file.write_text(result["certificate_pem"]) fingerprint = result.get("fingerprint_sha1", "unknown") print(f"key: {key_file}", file=sys.stderr) print(f"certificate: {cert_file}", file=sys.stderr) print(f"fingerprint: {fingerprint}", file=sys.stderr) print(f"serial: {result.get('serial', 'unknown')}", file=sys.stderr) print(f"common name: {result.get('common_name', args.name)}", file=sys.stderr) if args.configure: cfg_path = write_config_file( { "client_cert": str(cert_file), "client_key": str(key_file), "cert_sha1": fingerprint, } ) print(f"config: {cfg_path} (updated)", file=sys.stderr) print(fingerprint) def cmd_pki_download(args: argparse.Namespace, config: dict[str, Any]) -> None: """Download the CA certificate from the server.""" url = config["server"].rstrip("/") + "/pki/ca.crt" status, body, _ = request(url, ssl_context=config.get("ssl_context")) if status == 404: die("CA certificate not available (PKI disabled or CA not generated)") if status != 200: die(f"failed to download CA certificate ({status})") if args.output: out_path = Path(args.output) out_path.write_bytes(body) print(f"saved: {out_path}", file=sys.stderr) if HAS_CRYPTO: cert = x509.load_pem_x509_certificate(body) fp = hashlib.sha1(cert.public_bytes(serialization.Encoding.DER)).hexdigest() # noqa: S324 print(f"fingerprint: {fp}", file=sys.stderr) if args.configure: cfg_path = write_config_file({"ca_cert": str(out_path)}) print(f"config: {cfg_path} (updated)", file=sys.stderr) else: sys.stdout.buffer.write(body) def cmd_register(args: argparse.Namespace, config: dict[str, Any]) -> None: """Register and obtain a client certificate from the server.""" if not HAS_CRYPTO: die("register requires 'cryptography' package: pip install cryptography") from cryptography.hazmat.primitives.serialization import pkcs12 url = config["server"].rstrip("/") + "/register" headers = {"Content-Type": "application/json"} payload: dict[str, str] = {} if args.name: payload["common_name"] = args.name if challenge := get_challenge(config, endpoint="/register/challenge"): if not args.quiet: print(f"solving pow ({challenge['difficulty']} bits)...", end="", file=sys.stderr) solution = solve_pow(challenge["nonce"], challenge["difficulty"]) if not args.quiet: print(" done", file=sys.stderr) headers["X-PoW-Token"] = challenge["token"] headers["X-PoW-Solution"] = str(solution) data = json.dumps(payload).encode() if payload else b"{}" status, body, resp_headers = request( url, method="POST", data=data, headers=headers, ssl_context=config.get("ssl_context") ) if status == 400: die(parse_error(body, "bad request")) if status == 500: die(parse_error(body, "server error")) if status != 200: die(f"registration failed ({status})") fingerprint = resp_headers.get("X-Fingerprint-SHA1", "unknown") out_dir = Path(args.output) if args.output else CONFIG_DIR out_dir.mkdir(parents=True, exist_ok=True) p12_file = out_dir / "client.p12" key_file = out_dir / "client.key" cert_file = out_dir / "client.crt" if not args.force: if p12_file.exists(): die(f"p12 file exists: {p12_file} (use --force)") if not args.p12_only: if key_file.exists(): die(f"key file exists: {key_file} (use --force)") if cert_file.exists(): die(f"cert file exists: {cert_file} (use --force)") p12_file.write_bytes(body) p12_file.chmod(0o600) print(f"pkcs12: {p12_file}", file=sys.stderr) if not args.p12_only: private_key, certificate, _ = pkcs12.load_key_and_certificates(body, None) if private_key is None or certificate is None: die("failed to parse PKCS#12 bundle") key_pem = private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption(), ) cert_pem = certificate.public_bytes(serialization.Encoding.PEM) key_file.write_bytes(key_pem) key_file.chmod(0o600) cert_file.write_bytes(cert_pem) print(f"key: {key_file}", file=sys.stderr) print(f"certificate: {cert_file}", file=sys.stderr) cn = certificate.subject.get_attributes_for_oid(NameOID.COMMON_NAME) if cn: print(f"common name: {cn[0].value}", file=sys.stderr) print(f"fingerprint: {fingerprint}", file=sys.stderr) if args.configure and not args.p12_only: cfg_path = write_config_file( { "client_cert": str(cert_file), "client_key": str(key_file), "cert_sha1": fingerprint, } ) print(f"config: {cfg_path} (updated)", file=sys.stderr) print(fingerprint) def cmd_cert(args: argparse.Namespace, config: dict[str, Any]) -> None: """Generate a self-signed client certificate for mTLS authentication.""" if not HAS_CRYPTO: die("certificate generation requires 'cryptography' package: pip install cryptography") out_dir = Path(args.output) if args.output else CONFIG_DIR out_dir.mkdir(parents=True, exist_ok=True) key_file = out_dir / "client.key" cert_file = out_dir / "client.crt" if not args.force: if key_file.exists(): die(f"key file exists: {key_file} (use --force)") if cert_file.exists(): die(f"cert file exists: {cert_file} (use --force)") # Generate private key if args.algorithm == "rsa": key_size = args.bits or 4096 print(f"generating {key_size}-bit RSA key...", file=sys.stderr) private_key = rsa.generate_private_key(public_exponent=65537, key_size=key_size) elif args.algorithm == "ec": curve_name = args.curve or "secp384r1" curves = { "secp256r1": ec.SECP256R1(), "secp384r1": ec.SECP384R1(), "secp521r1": ec.SECP521R1(), } if curve_name not in curves: die(f"unsupported curve: {curve_name} (use: secp256r1, secp384r1, secp521r1)") print(f"generating EC key ({curve_name})...", file=sys.stderr) private_key = ec.generate_private_key(curves[curve_name]) else: die(f"unsupported algorithm: {args.algorithm}") cn = args.name or os.environ.get("USER", "fpaste-client") subject = issuer = x509.Name( [ x509.NameAttribute(NameOID.COMMON_NAME, cn), x509.NameAttribute(NameOID.ORGANIZATION_NAME, "FlaskPaste Client"), ] ) days = args.days or 365 now = datetime.now(UTC) cert_builder = ( x509.CertificateBuilder() .subject_name(subject) .issuer_name(issuer) .public_key(private_key.public_key()) .serial_number(x509.random_serial_number()) .not_valid_before(now) .not_valid_after(now + timedelta(days=days)) .add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True) .add_extension( x509.KeyUsage( digital_signature=True, key_encipherment=True, content_commitment=False, data_encipherment=False, key_agreement=False, key_cert_sign=False, crl_sign=False, encipher_only=False, decipher_only=False, ), critical=True, ) .add_extension( x509.ExtendedKeyUsage([x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH]), critical=False, ) ) print("signing certificate...", file=sys.stderr) certificate = cert_builder.sign(private_key, hashes.SHA256()) cert_der = certificate.public_bytes(serialization.Encoding.DER) fingerprint = hashlib.sha1(cert_der).hexdigest() # noqa: S324 key_encryption = ( serialization.BestAvailableEncryption(args.password_key.encode()) if args.password_key else serialization.NoEncryption() ) key_pem = private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=key_encryption, ) cert_pem = certificate.public_bytes(serialization.Encoding.PEM) key_file.write_bytes(key_pem) key_file.chmod(0o600) cert_file.write_bytes(cert_pem) print(f"key: {key_file}", file=sys.stderr) print(f"certificate: {cert_file}", file=sys.stderr) print(f"fingerprint: {fingerprint}", file=sys.stderr) print(f"valid for: {days} days", file=sys.stderr) print(f"common name: {cn}", file=sys.stderr) if args.configure: cfg_path = write_config_file( { "client_cert": str(cert_file), "client_key": str(key_file), "cert_sha1": fingerprint, } ) print(f"config: {cfg_path} (updated)", file=sys.stderr) print(fingerprint) # ----------------------------------------------------------------------------- # Argument parsing # ----------------------------------------------------------------------------- def is_file_path(arg: str) -> bool: """Check if argument looks like a file path.""" if not arg or arg.startswith("-"): return False if Path(arg).exists(): return True if "/" in arg or "\\" in arg: return True if "." in arg and not arg.startswith("."): ext = arg.rsplit(".", 1)[-1].lower() return ext in FILE_EXTENSIONS return False def build_parser() -> argparse.ArgumentParser: """Build and return the argument parser.""" parser = argparse.ArgumentParser( prog="fpaste", description="FlaskPaste command-line client", epilog="Shortcut: fpaste is equivalent to fpaste create ", ) parser.add_argument("-s", "--server", help="server URL (env: FLASKPASTE_SERVER)") subparsers = parser.add_subparsers(dest="command", metavar="command") # create p_create = subparsers.add_parser("create", aliases=["c", "new"], help="create paste") p_create.add_argument("file", nargs="?", help="file to upload (- for stdin)") p_create.add_argument("-E", "--no-encrypt", action="store_true", help="disable encryption") p_create.add_argument("-b", "--burn", action="store_true", help="burn after read") p_create.add_argument("-x", "--expiry", type=int, metavar="SEC", help="expiry in seconds") p_create.add_argument("-p", "--password", metavar="PASS", help="password protect") p_create.add_argument("-r", "--raw", action="store_true", help="output raw URL") p_create.add_argument("-q", "--quiet", action="store_true", help="output ID only") # get p_get = subparsers.add_parser("get", aliases=["g"], help="retrieve paste") p_get.add_argument("id", help="paste ID or URL") p_get.add_argument("-o", "--output", help="save to file") p_get.add_argument("-p", "--password", metavar="PASS", help="password for protected paste") p_get.add_argument("-m", "--meta", action="store_true", help="show metadata only") # delete p_delete = subparsers.add_parser("delete", aliases=["d", "rm"], help="delete paste(s)") p_delete.add_argument("ids", nargs="*", metavar="ID", help="paste ID(s) or URL(s)") p_delete.add_argument("-a", "--all", action="store_true", help="delete all pastes (admin)") p_delete.add_argument( "-c", "--confirm", type=int, metavar="N", help="confirm expected delete count" ) # info subparsers.add_parser("info", aliases=["i"], help="show server info") # list p_list = subparsers.add_parser("list", aliases=["ls"], help="list your pastes") p_list.add_argument("-a", "--all", action="store_true", help="list all pastes (admin only)") p_list.add_argument("-l", "--limit", type=int, metavar="N", help="max pastes (default: 50)") p_list.add_argument("-o", "--offset", type=int, metavar="N", help="skip first N pastes") p_list.add_argument("--json", action="store_true", help="output as JSON") # search p_search = subparsers.add_parser("search", aliases=["s", "find"], help="search your pastes") p_search.add_argument("-t", "--type", metavar="PATTERN", help="filter by MIME type (image/*)") p_search.add_argument("--after", metavar="DATE", help="created after (YYYY-MM-DD or timestamp)") p_search.add_argument("--before", metavar="DATE", help="created before (YYYY-MM-DD)") p_search.add_argument("-l", "--limit", type=int, metavar="N", help="max results (default: 50)") p_search.add_argument("--json", action="store_true", help="output as JSON") # update p_update = subparsers.add_parser("update", aliases=["u"], help="update existing paste") p_update.add_argument("id", help="paste ID or URL") p_update.add_argument("file", nargs="?", help="new content (- for stdin)") p_update.add_argument("-E", "--no-encrypt", action="store_true", help="disable encryption") p_update.add_argument("-p", "--password", metavar="PASS", help="set/change password") p_update.add_argument("--remove-password", action="store_true", help="remove password") p_update.add_argument("-x", "--expiry", type=int, metavar="SEC", help="extend expiry (seconds)") p_update.add_argument("-q", "--quiet", action="store_true", help="minimal output") # export p_export = subparsers.add_parser("export", help="export all pastes to directory") p_export.add_argument("-o", "--output", metavar="DIR", help="output directory") p_export.add_argument("-k", "--keyfile", metavar="FILE", help="key file (paste_id=key format)") p_export.add_argument("--manifest", action="store_true", help="write manifest.json") p_export.add_argument("-q", "--quiet", action="store_true", help="minimal output") # register p_register = subparsers.add_parser("register", help="register and get client certificate") p_register.add_argument("-n", "--name", metavar="CN", help="common name (optional)") p_register.add_argument("-o", "--output", metavar="DIR", help="output directory") p_register.add_argument("--configure", action="store_true", help="update config file") p_register.add_argument("--p12-only", action="store_true", help="save only PKCS#12") p_register.add_argument("-f", "--force", action="store_true", help="overwrite existing files") p_register.add_argument("-q", "--quiet", action="store_true", help="minimal output") # cert p_cert = subparsers.add_parser("cert", help="generate client certificate") p_cert.add_argument("-o", "--output", metavar="DIR", help="output directory") p_cert.add_argument( "-a", "--algorithm", choices=["rsa", "ec"], default="ec", help="key algorithm" ) p_cert.add_argument("-b", "--bits", type=int, metavar="N", help="RSA key size (default: 4096)") p_cert.add_argument( "-c", "--curve", metavar="CURVE", help="EC curve (secp256r1/secp384r1/secp521r1)" ) p_cert.add_argument("-d", "--days", type=int, metavar="N", help="validity period in days") p_cert.add_argument("-n", "--name", metavar="CN", help="common name (default: $USER)") p_cert.add_argument("--password-key", metavar="PASS", help="encrypt private key") p_cert.add_argument("--configure", action="store_true", help="update config file") p_cert.add_argument("-f", "--force", action="store_true", help="overwrite existing files") # pki p_pki = subparsers.add_parser("pki", help="PKI operations (server-issued certificates)") pki_sub = p_pki.add_subparsers(dest="pki_command", metavar="subcommand") pki_sub.add_parser("status", help="show PKI status and CA info") p_pki_issue = pki_sub.add_parser("issue", help="request certificate from server CA") p_pki_issue.add_argument( "-n", "--name", required=True, metavar="CN", help="common name (required)" ) p_pki_issue.add_argument("-o", "--output", metavar="DIR", help="output directory") p_pki_issue.add_argument("--configure", action="store_true", help="update config file") p_pki_issue.add_argument("-f", "--force", action="store_true", help="overwrite existing files") p_pki_download = pki_sub.add_parser("download", aliases=["dl"], help="download CA certificate") p_pki_download.add_argument("-o", "--output", metavar="FILE", help="save to file") p_pki_download.add_argument( "--configure", action="store_true", help="update config file (requires -o)" ) return parser # Command dispatch table COMMANDS: dict[str, Any] = { "create": cmd_create, "c": cmd_create, "new": cmd_create, "get": cmd_get, "g": cmd_get, "delete": cmd_delete, "d": cmd_delete, "rm": cmd_delete, "info": cmd_info, "i": cmd_info, "list": cmd_list, "ls": cmd_list, "search": cmd_search, "s": cmd_search, "find": cmd_search, "update": cmd_update, "u": cmd_update, "export": cmd_export, "register": cmd_register, "cert": cmd_cert, } PKI_COMMANDS: dict[str, Any] = { "status": cmd_pki_status, "issue": cmd_pki_issue, "download": cmd_pki_download, "dl": cmd_pki_download, } def main() -> None: """Main entry point.""" args_to_parse = sys.argv[1:] command_names = set(COMMANDS.keys()) | {"pki"} # Auto-insert "create" if first positional looks like a file insert_pos = 0 has_command = False file_pos = -1 i = 0 while i < len(args_to_parse): arg = args_to_parse[i] if arg in ("-s", "--server"): insert_pos = i + 2 i += 2 continue if arg in ("-h", "--help"): i += 1 insert_pos = i continue if arg.startswith("-"): i += 1 continue if arg in command_names: has_command = True break if is_file_path(arg): file_pos = i break i += 1 if not has_command and (file_pos >= 0 or not sys.stdin.isatty()): args_to_parse.insert(insert_pos, "create") parser = build_parser() args = parser.parse_args(args_to_parse) config = get_config() if args.server: config["server"] = args.server config["ssl_context"] = create_ssl_context(config) if not args.command: if not sys.stdin.isatty(): args.command = "create" args.file = None args.no_encrypt = False args.burn = False args.expiry = None args.password = None args.raw = False args.quiet = False else: parser.print_help() sys.exit(0) if args.command == "pki": if args.pki_command in PKI_COMMANDS: PKI_COMMANDS[args.pki_command](args, config) else: parser.parse_args(["pki", "--help"]) elif args.command in COMMANDS: COMMANDS[args.command](args, config) if __name__ == "__main__": main()