#!/usr/bin/env python3 """FlaskPaste command-line client.""" from __future__ import annotations import argparse import base64 import hashlib import json import os import shutil import ssl import stat import subprocess import sys import time import urllib.error import urllib.request from datetime import UTC, datetime, timedelta from pathlib import Path from typing import TYPE_CHECKING, Any, NoReturn if TYPE_CHECKING: from collections.abc import Mapping # Optional cryptography support (for encryption and cert generation) try: from cryptography import x509 from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import ec, rsa from cryptography.hazmat.primitives.ciphers.aead import AESGCM from cryptography.x509.oid import NameOID HAS_CRYPTO = True except ImportError: HAS_CRYPTO = False # Constants CONFIG_DIR = Path.home() / ".config" / "fpaste" CONFIG_FILE = CONFIG_DIR / "config" CONFIG_KEYS = frozenset({"server", "endpoint", "cert_sha1", "client_cert", "client_key", "ca_cert"}) MIME_EXTENSIONS: dict[str, str] = { "text/plain": ".txt", "text/html": ".html", "text/css": ".css", "text/javascript": ".js", "text/markdown": ".md", "text/x-python": ".py", "application/json": ".json", "application/xml": ".xml", "application/javascript": ".js", "application/octet-stream": ".bin", "image/png": ".png", "image/jpeg": ".jpg", "image/gif": ".gif", "image/webp": ".webp", "image/svg+xml": ".svg", "application/pdf": ".pdf", "application/zip": ".zip", "application/gzip": ".gz", "application/x-tar": ".tar", } FILE_EXTENSIONS = frozenset( { "txt", "md", "py", "js", "json", "yaml", "yml", "xml", "html", "css", "sh", "bash", "c", "cpp", "h", "go", "rs", "java", "rb", "php", "sql", "log", "conf", "cfg", "ini", "png", "jpg", "jpeg", "gif", "pdf", "zip", "tar", "gz", } ) DATE_FORMATS = ( "%Y-%m-%d", "%Y-%m-%d %H:%M", "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%SZ", ) # ----------------------------------------------------------------------------- # Core utilities # ----------------------------------------------------------------------------- def die(msg: str, code: int = 1) -> NoReturn: """Print error and exit.""" print(f"error: {msg}", file=sys.stderr) sys.exit(code) def warn(msg: str) -> None: """Print warning to stderr.""" print(f"warning: {msg}", file=sys.stderr) def request( url: str, method: str = "GET", data: bytes | None = None, headers: dict[str, str] | None = None, ssl_context: ssl.SSLContext | None = None, ) -> tuple[int, bytes, dict[str, str]]: """Make HTTP request and return (status, body, headers).""" headers = headers or {} req = urllib.request.Request(url, data=data, headers=headers, method=method) # noqa: S310 try: with urllib.request.urlopen(req, timeout=30, context=ssl_context) as resp: # noqa: S310 # nosec B310 return resp.status, resp.read(), dict(resp.headers) except urllib.error.HTTPError as e: return e.code, e.read(), dict(e.headers) except urllib.error.URLError as e: die(f"Connection failed: {e.reason}") def parse_error(body: bytes, default: str = "request failed") -> str: """Parse error message from JSON response body.""" try: result = json.loads(body).get("error", default) return str(result) if result is not None else default except (json.JSONDecodeError, UnicodeDecodeError): return default # ----------------------------------------------------------------------------- # Configuration # ----------------------------------------------------------------------------- def check_config_permissions(path: Path) -> None: """CLI-003: Warn if config file has insecure permissions.""" try: mode = path.stat().st_mode # Warn if group or others can read (should be 600 or 640) if mode & stat.S_IROTH: warn(f"config file {path} is world-readable (mode {stat.filemode(mode)})") elif mode & stat.S_IRGRP: # Group-readable is less severe, only warn if also has secrets pass # Silent for group-readable, common in shared setups except OSError: pass # File may not exist yet or permission denied def read_config_file(path: Path | None = None) -> dict[str, str]: """Read config file and return key-value pairs.""" path = path or CONFIG_FILE result: dict[str, str] = {} if not path.exists(): return result # CLI-003: Check file permissions before reading check_config_permissions(path) for line in path.read_text().splitlines(): line = line.strip() if not line or line.startswith("#") or "=" not in line: continue key, value = line.split("=", 1) key = key.strip().lower() if key in CONFIG_KEYS: result[key] = value.strip().strip('"').strip("'") return result def write_config_file( updates: dict[str, str], path: Path | None = None, ) -> Path: """Update config file with new values, preserving existing entries.""" path = path or CONFIG_FILE path.parent.mkdir(parents=True, exist_ok=True) existing = read_config_file(path) existing.update(updates) lines = [f"{k} = {v}" for k, v in sorted(existing.items())] path.write_text("\n".join(lines) + "\n") return path def get_config() -> dict[str, Any]: """Load configuration from environment and config file.""" config: dict[str, Any] = { "server": os.environ.get("FLASKPASTE_SERVER", "https://paste.mymx.me"), "endpoint": os.environ.get("FLASKPASTE_ENDPOINT", ""), "cert_sha1": os.environ.get("FLASKPASTE_CERT_SHA1", ""), "client_cert": os.environ.get("FLASKPASTE_CLIENT_CERT", ""), "client_key": os.environ.get("FLASKPASTE_CLIENT_KEY", ""), "ca_cert": os.environ.get("FLASKPASTE_CA_CERT", ""), } # Config file values (lower priority than environment) file_config = read_config_file() for key in CONFIG_KEYS: if not config.get(key) and key in file_config: config[key] = file_config[key] return config def create_ssl_context(config: Mapping[str, Any]) -> ssl.SSLContext | None: """Create SSL context for mTLS if certificates are configured.""" client_cert = config.get("client_cert", "") if not client_cert: return None ctx = ssl.create_default_context() # CLI-002: Explicitly enable hostname verification (defense in depth) # create_default_context() sets these, but explicit is safer ctx.check_hostname = True ctx.verify_mode = ssl.CERT_REQUIRED if ca_cert := config.get("ca_cert", ""): ctx.load_verify_locations(ca_cert) try: ctx.load_cert_chain(certfile=client_cert, keyfile=config.get("client_key") or None) except ssl.SSLError as e: die(f"failed to load client certificate: {e}") except FileNotFoundError as e: die(f"certificate file not found: {e}") return ctx def build_url(config: Mapping[str, Any], path: str = "") -> str: """Build full URL from server, endpoint, and path.""" server = config["server"].rstrip("/") endpoint = config.get("endpoint", "").strip("/") # Preserve trailing slash for root path "/" trailing_slash = path == "/" or (path.endswith("/") and path != "/") path = path.strip("/") if endpoint: base = f"{server}/{endpoint}" if path: return f"{base}/{path}/" if trailing_slash else f"{base}/{path}" return f"{base}/" if trailing_slash else base if path: return f"{server}/{path}/" if trailing_slash else f"{server}/{path}" return f"{server}/" if trailing_slash else server # ----------------------------------------------------------------------------- # Encryption # ----------------------------------------------------------------------------- def encrypt_content(plaintext: bytes) -> tuple[bytes, bytes]: """Encrypt content with AES-256-GCM. Returns (ciphertext, key).""" if not HAS_CRYPTO: die("encryption requires 'cryptography' package: pip install cryptography") key = os.urandom(32) nonce = os.urandom(12) aesgcm = AESGCM(key) ciphertext = aesgcm.encrypt(nonce, plaintext, None) return nonce + ciphertext, key def decrypt_content(blob: bytes, key: bytes) -> bytes: """Decrypt AES-256-GCM encrypted content.""" if not HAS_CRYPTO: die("decryption requires 'cryptography' package: pip install cryptography") if len(blob) < 12: die("encrypted content too short") nonce, ciphertext = blob[:12], blob[12:] aesgcm = AESGCM(key) try: return aesgcm.decrypt(nonce, ciphertext, None) except Exception: die("decryption failed (wrong key or corrupted data)") def encode_key(key: bytes) -> str: """Encode key as URL-safe base64.""" return base64.urlsafe_b64encode(key).decode().rstrip("=") def decode_key(encoded: str) -> bytes: """Decode URL-safe base64 key.""" padding = 4 - (len(encoded) % 4) if padding != 4: encoded += "=" * padding try: return base64.urlsafe_b64decode(encoded) except Exception: die("invalid encryption key in URL") # ----------------------------------------------------------------------------- # Proof-of-work # ----------------------------------------------------------------------------- def solve_pow(nonce: str, difficulty: int) -> int: """Solve proof-of-work: find N where SHA256(nonce:N) has `difficulty` leading zero bits.""" n = 0 target_bytes = (difficulty + 7) // 8 while True: work = f"{nonce}:{n}".encode() hash_bytes = hashlib.sha256(work).digest() zero_bits = 0 for byte in hash_bytes[: target_bytes + 1]: if byte == 0: zero_bits += 8 else: zero_bits += 8 - byte.bit_length() break if zero_bits >= difficulty: return n n += 1 if n % 100000 == 0: print(f"\rsolving pow: {n} attempts...", end="", file=sys.stderr) def get_challenge( config: Mapping[str, Any], path: str = "/challenge", ) -> dict[str, Any] | None: """Fetch PoW challenge from server.""" url = build_url(config, path) status, body, _ = request(url, ssl_context=config.get("ssl_context")) if status != 200: return None data = json.loads(body) return data if data.get("enabled") else None # ----------------------------------------------------------------------------- # Formatting utilities # ----------------------------------------------------------------------------- def format_size(size: int) -> str: """Format byte size as human-readable string.""" if size < 1024: return f"{size}B" if size < 1024 * 1024: return f"{size / 1024:.1f}K" return f"{size / (1024 * 1024):.1f}M" def format_timestamp(ts: int | float) -> str: """Format Unix timestamp as human-readable date.""" dt = datetime.fromtimestamp(ts, tz=UTC) return dt.strftime("%Y-%m-%d %H:%M") def format_time_remaining(expires_at: int | float | None) -> str: """Format time remaining until expiry.""" if not expires_at: return "" now = time.time() remaining = expires_at - now if remaining <= 0: return "expired" if remaining < 60: return f"{int(remaining)}s" if remaining < 3600: return f"{int(remaining / 60)}m" if remaining < 86400: hours = int(remaining / 3600) return f"{hours}h" days = int(remaining / 86400) if days >= 365: years = days // 365 return f"{years}y" return f"{days}d" def parse_date(date_str: str) -> int: """Parse date string to Unix timestamp.""" if not date_str: return 0 for fmt in DATE_FORMATS: try: dt = datetime.strptime(date_str, fmt).replace(tzinfo=UTC) return int(dt.timestamp()) except ValueError: continue try: return int(date_str) except ValueError: die(f"invalid date format: {date_str}") def get_extension_for_mime(mime_type: str) -> str: """Get file extension for MIME type.""" return MIME_EXTENSIONS.get(mime_type, ".bin") def format_paste_row(paste: dict[str, Any], show_owner: bool = False) -> str: """Format a paste as a table row.""" paste_id = paste["id"] mime_type = paste.get("mime_type", "unknown")[:16] size = format_size(paste.get("size", 0)) created = format_timestamp(paste.get("created_at", 0)) # Time remaining until expiry expires = format_time_remaining(paste.get("expires_at")) flags = [] if paste.get("burn_after_read"): flags.append("burn") if paste.get("password_protected"): flags.append("pass") flags_str = " ".join(flags) row = f"{paste_id:<12} {mime_type:<16} {size:>6} {created:<16} {expires:<8} {flags_str}" if show_owner and paste.get("owner"): row += f" {paste['owner'][:12]}" return row def print_paste_list( pastes: list[dict[str, Any]], summary: str, as_json: bool = False, data: dict[str, Any] | None = None, ) -> None: """Print a list of pastes in table or JSON format.""" if as_json: print(json.dumps(data or {"pastes": pastes}, indent=2)) return if not pastes: print("no pastes found") return # Check if owner data is present (admin view) show_owner = any(paste.get("owner") for paste in pastes) header = f"{'ID':<12} {'TYPE':<16} {'SIZE':>6} {'CREATED':<16} {'EXPIRES':<8} FLAGS" if show_owner: header += " OWNER" print(header) for paste in pastes: print(format_paste_row(paste, show_owner=show_owner)) print(f"\n{summary}") # ----------------------------------------------------------------------------- # Clipboard integration # ----------------------------------------------------------------------------- # Clipboard read commands (tool name, command args) CLIPBOARD_READ_COMMANDS = [ ("xclip", ["xclip", "-selection", "clipboard", "-o"]), ("xsel", ["xsel", "--clipboard", "--output"]), ("pbpaste", ["pbpaste"]), ("powershell.exe", ["powershell.exe", "-command", "Get-Clipboard"]), ("wl-paste", ["wl-paste"]), ] # Clipboard write commands (tool name, command args) CLIPBOARD_WRITE_COMMANDS = [ ("xclip", ["xclip", "-selection", "clipboard", "-i"]), ("xsel", ["xsel", "--clipboard", "--input"]), ("pbcopy", ["pbcopy"]), ("clip.exe", ["clip.exe"]), ("wl-copy", ["wl-copy"]), ] # CLI-001: Trusted directories for clipboard tools (system paths only) # Prevents command injection via malicious PATH manipulation TRUSTED_CLIPBOARD_DIRS = frozenset( { "/usr/bin", "/usr/local/bin", "/bin", "/opt/homebrew/bin", # macOS Homebrew "/usr/X11/bin", "/usr/X11R6/bin", } ) # Windows system directories (checked case-insensitively) TRUSTED_WINDOWS_PATTERNS = ( "\\windows\\", "\\system32\\", "\\syswow64\\", "\\windowsapps\\", ) def is_trusted_clipboard_path(path: str) -> bool: """Check if clipboard tool path is in a trusted system directory. CLI-001: Validates that resolved clipboard tool paths are in expected system locations to prevent command injection via PATH manipulation. """ if not path: return False resolved = Path(path).resolve() parent = str(resolved.parent) # Check Unix trusted directories if parent in TRUSTED_CLIPBOARD_DIRS: return True # Check Windows paths (case-insensitive) parent_lower = parent.lower() for pattern in TRUSTED_WINDOWS_PATTERNS: if pattern in parent_lower: return True # Also allow Windows Program Files paths return "\\program files" in parent_lower def find_clipboard_command(commands: list[tuple[str, list[str]]]) -> list[str] | None: """Find first available clipboard command in trusted directories. CLI-001: Validates that found commands are in trusted system directories to prevent command injection via PATH manipulation. """ for tool_name, cmd in commands: tool_path = shutil.which(tool_name) if tool_path and is_trusted_clipboard_path(tool_path): # Use absolute path for security return [tool_path, *cmd[1:]] return None def read_clipboard() -> bytes: """Read content from system clipboard.""" cmd = find_clipboard_command(CLIPBOARD_READ_COMMANDS) if not cmd: die("no clipboard tool found (install xclip, xsel, or wl-paste)") try: result = subprocess.run(cmd, capture_output=True, check=True) return result.stdout except subprocess.CalledProcessError as e: die(f"clipboard read failed: {e.stderr.decode(errors='replace')}") except FileNotFoundError: die(f"clipboard tool not found: {cmd[0]}") def write_clipboard(data: bytes) -> None: """Write content to system clipboard.""" cmd = find_clipboard_command(CLIPBOARD_WRITE_COMMANDS) if not cmd: die("no clipboard tool found (install xclip, xsel, or wl-copy)") try: subprocess.run(cmd, input=data, check=True) except subprocess.CalledProcessError as e: die(f"clipboard write failed: {e.stderr.decode(errors='replace') if e.stderr else ''}") except FileNotFoundError: die(f"clipboard tool not found: {cmd[0]}") # ----------------------------------------------------------------------------- # Content helpers # ----------------------------------------------------------------------------- def read_content(file_arg: str | None, from_clipboard: bool = False) -> bytes: """Read content from file, stdin, or clipboard.""" if from_clipboard: return read_clipboard() if file_arg: if file_arg == "-": return sys.stdin.buffer.read() path = Path(file_arg) if not path.exists(): die(f"file not found: {file_arg}") return path.read_bytes() if sys.stdin.isatty(): die("no input provided (pipe data, specify file, or use -C for clipboard)") return sys.stdin.buffer.read() def prepare_content( content: bytes, encrypt: bool, quiet: bool = False, ) -> tuple[bytes, bytes | None]: """Optionally encrypt content. Returns (content, encryption_key or None).""" if not encrypt: return content, None if not HAS_CRYPTO: die("encryption requires 'cryptography' package (use -E to disable)") if not quiet: print("encrypting...", end="", file=sys.stderr) encrypted, key = encrypt_content(content) if not quiet: print(" done", file=sys.stderr) return encrypted, key def extract_paste_id(url_or_id: str) -> tuple[str, bytes | None]: """Extract paste ID and optional encryption key from URL or ID.""" encryption_key = None if "#" in url_or_id: url_or_id, key_encoded = url_or_id.rsplit("#", 1) if key_encoded: encryption_key = decode_key(key_encoded) paste_id = url_or_id.split("/")[-1] return paste_id, encryption_key def auth_headers(config: Mapping[str, Any]) -> dict[str, str]: """Build authentication headers.""" if cert_sha1 := config.get("cert_sha1"): return {"X-SSL-Client-SHA1": cert_sha1} return {} def require_auth(config: Mapping[str, Any]) -> None: """Ensure authentication is configured.""" if not config.get("cert_sha1"): die("authentication required (set FLASKPASTE_CERT_SHA1)") # ----------------------------------------------------------------------------- # Commands # ----------------------------------------------------------------------------- def cmd_create(args: argparse.Namespace, config: dict[str, Any]) -> None: """Create a new paste.""" from_clipboard = getattr(args, "clipboard", False) content = read_content(args.file, from_clipboard=from_clipboard) if not content: die("empty content") content, encryption_key = prepare_content( content, encrypt=not getattr(args, "no_encrypt", False), quiet=args.quiet, ) # Build headers base_headers = auth_headers(config) if args.burn: base_headers["X-Burn-After-Read"] = "true" if args.expiry: base_headers["X-Expiry"] = str(args.expiry) if args.password: base_headers["X-Paste-Password"] = args.password url = build_url(config, "/") max_retries = 5 last_error = "" for attempt in range(max_retries): headers = dict(base_headers) if challenge := get_challenge(config): if attempt > 0 and not args.quiet: print(f"retry {attempt}/{max_retries - 1}...", file=sys.stderr) if not args.quiet: diff = challenge["difficulty"] base_diff = challenge.get("base_difficulty", diff) elevated = challenge.get("elevated", False) msg = ( f"solving pow ({diff} bits, elevated from {base_diff})..." if elevated else f"solving pow ({diff} bits)..." ) print(msg, end="", file=sys.stderr) solution = solve_pow(challenge["nonce"], challenge["difficulty"]) if not args.quiet: print(" done", file=sys.stderr) headers["X-PoW-Token"] = challenge["token"] headers["X-PoW-Solution"] = str(solution) status, body, _ = request( url, method="POST", data=content, headers=headers, ssl_context=config.get("ssl_context") ) if status == 201: data = json.loads(body) key_fragment = f"#{encode_key(encryption_key)}" if encryption_key else "" if args.raw: result_url = build_url(config, data["raw"]) + key_fragment elif args.quiet: result_url = data["id"] + key_fragment else: result_url = build_url(config, data["url"]) + key_fragment print(result_url) # Copy URL to clipboard if requested if getattr(args, "copy_url", False): write_clipboard(result_url.encode()) if not args.quiet: print("(copied to clipboard)", file=sys.stderr) return last_error = parse_error(body, body.decode(errors="replace")) err_lower = last_error.lower() is_pow_error = status == 400 and ("pow" in err_lower or "proof-of-work" in err_lower) if not is_pow_error: die(f"create failed ({status}): {last_error}") if not args.quiet: print(f"pow rejected: {last_error}", file=sys.stderr) die(f"create failed after {max_retries} attempts: {last_error}") def cmd_get(args: argparse.Namespace, config: dict[str, Any]) -> None: """Retrieve a paste.""" paste_id, encryption_key = extract_paste_id(args.id) headers: dict[str, str] = {} if args.password: headers["X-Paste-Password"] = args.password if args.meta: url = build_url(config, paste_id) status, body, _ = request(url, headers=headers, ssl_context=config.get("ssl_context")) if status == 200: data = json.loads(body) print(f"id: {data['id']}") print(f"mime_type: {data['mime_type']}") print(f"size: {data['size']}") print(f"created_at: {data['created_at']}") if encryption_key: print("encrypted: yes (key in URL)") if data.get("password_protected"): print("protected: yes (password required)") elif status == 401: die("password required (-p)") elif status == 403: die("invalid password") else: die(f"not found: {paste_id}") else: url = build_url(config, f"{paste_id}/raw") status, body, _ = request(url, headers=headers, ssl_context=config.get("ssl_context")) if status == 200: if encryption_key: body = decrypt_content(body, encryption_key) # Copy to clipboard if requested if getattr(args, "copy", False): write_clipboard(body) print("(copied to clipboard)", file=sys.stderr) elif args.output: Path(args.output).write_bytes(body) print(f"saved: {args.output}", file=sys.stderr) else: sys.stdout.buffer.write(body) if sys.stdout.isatty() and body and not body.endswith(b"\n"): sys.stdout.buffer.write(b"\n") elif status == 401: die("password required (-p)") elif status == 403: die("invalid password") else: die(f"not found: {paste_id}") def cmd_delete(args: argparse.Namespace, config: dict[str, Any]) -> None: """Delete paste(s).""" require_auth(config) delete_all = getattr(args, "all", False) confirm_count = getattr(args, "confirm", None) paste_ids = [paste_id.split("/")[-1] for paste_id in (args.ids or [])] # Validate arguments if delete_all and paste_ids: die("cannot specify both --all and paste IDs") if not delete_all and not paste_ids: die("specify paste ID(s) or use --all") if delete_all: # Fetch all pastes to get count and IDs url = build_url(config, "/pastes") + "?all=1&limit=1000" status, body, _ = request( url, headers=auth_headers(config), ssl_context=config.get("ssl_context") ) if status == 401: die("authentication failed") if status != 200: die(f"failed to list pastes ({status})") data = json.loads(body) pastes = data.get("pastes", []) total = len(pastes) if total == 0: print("no pastes to delete") return # Require confirmation with expected count if confirm_count is None: die(f"--all requires --confirm {total} (found {total} pastes)") if confirm_count != total: die(f"confirmation mismatch: expected {confirm_count}, found {total}") paste_ids = [p["id"] for p in pastes] # Delete pastes deleted = 0 failed = 0 for paste_id in paste_ids: url = build_url(config, paste_id) status, _, _ = request( url, method="DELETE", headers=auth_headers(config), ssl_context=config.get("ssl_context"), ) if status == 200: print(f"deleted: {paste_id}") deleted += 1 elif status == 404: print(f"not found: {paste_id}", file=sys.stderr) failed += 1 elif status == 403: print(f"permission denied: {paste_id}", file=sys.stderr) failed += 1 else: print(f"failed ({status}): {paste_id}", file=sys.stderr) failed += 1 # Summary for batch operations if len(paste_ids) > 1: print(f"\n{deleted} deleted, {failed} failed") def cmd_info(args: argparse.Namespace, config: dict[str, Any]) -> None: """Show server info.""" url = build_url(config, "/") status, body, _ = request(url, ssl_context=config.get("ssl_context")) if status != 200: die("failed to connect to server") data = json.loads(body) print(f"server: {config['server']}") print(f"name: {data.get('name', 'unknown')}") print(f"version: {data.get('version', 'unknown')}") if challenge := get_challenge(config): difficulty = challenge.get("difficulty", 0) base_diff = challenge.get("base_difficulty", difficulty) if challenge.get("elevated"): print(f"pow: {difficulty} bits (elevated from {base_diff})") else: print(f"pow: {difficulty} bits") else: print("pow: disabled") def cmd_list(args: argparse.Namespace, config: dict[str, Any]) -> None: """List user's pastes.""" require_auth(config) params = [] if getattr(args, "all", False): params.append("all=1") if args.limit: params.append(f"limit={args.limit}") if args.offset: params.append(f"offset={args.offset}") url = build_url(config, "/pastes") if params: url += "?" + "&".join(params) status, body, _ = request( url, headers=auth_headers(config), ssl_context=config.get("ssl_context") ) if status == 401: die("authentication failed") if status != 200: die(f"failed to list pastes ({status})") data = json.loads(body) pastes = data.get("pastes", []) summary = f"{data.get('count', 0)} of {data.get('total', 0)} pastes shown" print_paste_list(pastes, summary, as_json=args.json, data=data) def cmd_search(args: argparse.Namespace, config: dict[str, Any]) -> None: """Search user's pastes.""" require_auth(config) params = [] if args.type: params.append(f"type={args.type}") if args.after: params.append(f"after={parse_date(args.after)}") if args.before: params.append(f"before={parse_date(args.before)}") if args.limit: params.append(f"limit={args.limit}") url = build_url(config, "/pastes") if params: url += "?" + "&".join(params) status, body, _ = request( url, headers=auth_headers(config), ssl_context=config.get("ssl_context") ) if status == 401: die("authentication failed") if status != 200: die(f"failed to search pastes ({status})") data = json.loads(body) pastes = data.get("pastes", []) summary = f"{data.get('count', 0)} matching pastes found" print_paste_list(pastes, summary, as_json=args.json, data=data) def cmd_update(args: argparse.Namespace, config: dict[str, Any]) -> None: """Update an existing paste.""" require_auth(config) paste_id, _ = extract_paste_id(args.id) url = build_url(config, paste_id) headers = auth_headers(config) content: bytes | None = None encryption_key: bytes | None = None if args.file: raw_content = read_content(args.file) if not raw_content: die("empty content") content, encryption_key = prepare_content( raw_content, encrypt=not getattr(args, "no_encrypt", False), quiet=args.quiet, ) if args.password: headers["X-Paste-Password"] = args.password if args.remove_password: headers["X-Remove-Password"] = "true" if args.expiry: headers["X-Extend-Expiry"] = str(args.expiry) status, body, _ = request( url, method="PUT", data=content, headers=headers, ssl_context=config.get("ssl_context") ) if status == 200: data = json.loads(body) if args.quiet: print(paste_id) else: print(f"updated: {paste_id}") print(f" size: {data.get('size', 'unknown')}") print(f" type: {data.get('mime_type', 'unknown')}") if data.get("expires_at"): print(f" expires: {data.get('expires_at')}") if data.get("password_protected"): print(" password: protected") if content and encryption_key: print(f" key: {build_url(config, paste_id)}#{encode_key(encryption_key)}") elif status == 400: die(parse_error(body, "bad request")) elif status == 401: die("authentication failed") elif status == 403: die("permission denied (not owner)") elif status == 404: die(f"not found: {paste_id}") else: die(f"update failed ({status})") def cmd_export(args: argparse.Namespace, config: dict[str, Any]) -> None: """Export user's pastes to a directory.""" require_auth(config) out_dir = Path(args.output) if args.output else Path("fpaste-export") out_dir.mkdir(parents=True, exist_ok=True) # Load key file keys: dict[str, str] = {} if args.keyfile: keyfile_path = Path(args.keyfile) if not keyfile_path.exists(): die(f"key file not found: {args.keyfile}") for line in keyfile_path.read_text().splitlines(): line = line.strip() if line and not line.startswith("#") and "=" in line: paste_id, key_encoded = line.split("=", 1) keys[paste_id.strip()] = key_encoded.strip() # Fetch paste list url = build_url(config, "/pastes") + "?limit=1000" status, body, _ = request( url, headers=auth_headers(config), ssl_context=config.get("ssl_context") ) if status == 401: die("authentication failed") if status != 200: die(f"failed to list pastes ({status})") pastes = json.loads(body).get("pastes", []) if not pastes: print("no pastes to export") return exported, skipped, errors = 0, 0, 0 manifest: list[dict[str, Any]] = [] for paste in pastes: paste_id = paste["id"] mime_type = paste.get("mime_type", "application/octet-stream") if not args.quiet: print(f"exporting {paste_id}...", end=" ", file=sys.stderr) if paste.get("burn_after_read"): if not args.quiet: print("skipped (burn-after-read)", file=sys.stderr) skipped += 1 continue if paste.get("password_protected"): if not args.quiet: print("skipped (password-protected)", file=sys.stderr) skipped += 1 continue raw_url = build_url(config, f"{paste_id}/raw") status, content, _ = request( raw_url, headers=auth_headers(config), ssl_context=config.get("ssl_context") ) if status != 200: if not args.quiet: print(f"error ({status})", file=sys.stderr) errors += 1 continue decrypted = False if paste_id in keys: try: key = decode_key(keys[paste_id]) content = decrypt_content(content, key) decrypted = True except SystemExit: if not args.quiet: print("decryption failed, keeping encrypted", file=sys.stderr, end=" ") filename = f"{paste_id}{get_extension_for_mime(mime_type)}" (out_dir / filename).write_bytes(content) manifest.append( { "id": paste_id, "filename": filename, "mime_type": mime_type, "size": len(content), "created_at": paste.get("created_at"), "decrypted": decrypted, "encrypted": paste_id in keys and not decrypted, } ) if not args.quiet: status_msg = "decrypted" if decrypted else ("encrypted" if paste_id in keys else "ok") print(status_msg, file=sys.stderr) exported += 1 if args.manifest: manifest_path = out_dir / "manifest.json" manifest_path.write_text(json.dumps(manifest, indent=2)) if not args.quiet: print(f"manifest: {manifest_path}", file=sys.stderr) print(f"\nexported: {exported}, skipped: {skipped}, errors: {errors}") print(f"output: {out_dir}") def cmd_pki_status(args: argparse.Namespace, config: dict[str, Any]) -> None: """Show PKI status and CA information.""" url = build_url(config, "/pki") status, body, _ = request(url, ssl_context=config.get("ssl_context")) if status == 404: die("PKI not enabled on this server") if status != 200: die(f"failed to get PKI status ({status})") data = json.loads(body) print(f"pki enabled: {data.get('enabled', False)}") print(f"ca exists: {data.get('ca_exists', False)}") if data.get("ca_exists"): print(f"common name: {data.get('common_name', 'unknown')}") print(f"fingerprint: {data.get('fingerprint_sha1', 'unknown')}") if data.get("created_at"): print(f"created: {data.get('created_at')}") if data.get("expires_at"): print(f"expires: {data.get('expires_at')}") print(f"download: {build_url(config, data.get('download', '/pki/ca.crt'))}") elif hint := data.get("hint"): print(f"hint: {hint}") def cmd_pki_issue(args: argparse.Namespace, config: dict[str, Any]) -> None: """Request a new client certificate from the server CA.""" url = build_url(config, "/pki/issue") headers = {"Content-Type": "application/json", **auth_headers(config)} payload = json.dumps({"common_name": args.name}).encode() status, body, _ = request( url, method="POST", data=payload, headers=headers, ssl_context=config.get("ssl_context") ) if status == 404: die(parse_error(body, "PKI not available")) if status == 400: die(parse_error(body, "bad request")) if status != 201: die(f"certificate issuance failed ({status})") result = json.loads(body) out_dir = Path(args.output) if args.output else CONFIG_DIR out_dir.mkdir(parents=True, exist_ok=True) key_file = out_dir / "client.key" cert_file = out_dir / "client.crt" if not args.force: if key_file.exists(): die(f"key file exists: {key_file} (use --force)") if cert_file.exists(): die(f"cert file exists: {cert_file} (use --force)") key_file.write_text(result["private_key_pem"]) key_file.chmod(0o600) cert_file.write_text(result["certificate_pem"]) fingerprint = result.get("fingerprint_sha1", "unknown") print(f"key: {key_file}", file=sys.stderr) print(f"certificate: {cert_file}", file=sys.stderr) print(f"fingerprint: {fingerprint}", file=sys.stderr) print(f"serial: {result.get('serial', 'unknown')}", file=sys.stderr) print(f"common name: {result.get('common_name', args.name)}", file=sys.stderr) if args.configure: cfg_path = write_config_file( { "client_cert": str(cert_file), "client_key": str(key_file), "cert_sha1": fingerprint, } ) print(f"config: {cfg_path} (updated)", file=sys.stderr) print(fingerprint) def cmd_pki_download(args: argparse.Namespace, config: dict[str, Any]) -> None: """Download the CA certificate from the server.""" url = build_url(config, "/pki/ca.crt") status, body, _ = request(url, ssl_context=config.get("ssl_context")) if status == 404: die("CA certificate not available (PKI disabled or CA not generated)") if status != 200: die(f"failed to download CA certificate ({status})") if args.output: out_path = Path(args.output) out_path.write_bytes(body) print(f"saved: {out_path}", file=sys.stderr) if HAS_CRYPTO: cert = x509.load_pem_x509_certificate(body) fp = hashlib.sha1( cert.public_bytes(serialization.Encoding.DER), usedforsecurity=False ).hexdigest() print(f"fingerprint: {fp}", file=sys.stderr) if args.configure: cfg_path = write_config_file({"ca_cert": str(out_path)}) print(f"config: {cfg_path} (updated)", file=sys.stderr) else: sys.stdout.buffer.write(body) def cmd_register(args: argparse.Namespace, config: dict[str, Any]) -> None: """Register and obtain a client certificate from the server.""" if not HAS_CRYPTO: die("register requires 'cryptography' package: pip install cryptography") from cryptography.hazmat.primitives.serialization import pkcs12 url = build_url(config, "/register") headers = {"Content-Type": "application/json"} payload: dict[str, str] = {} if args.name: payload["common_name"] = args.name if challenge := get_challenge(config, path="/register/challenge"): if not args.quiet: print(f"solving pow ({challenge['difficulty']} bits)...", end="", file=sys.stderr) solution = solve_pow(challenge["nonce"], challenge["difficulty"]) if not args.quiet: print(" done", file=sys.stderr) headers["X-PoW-Token"] = challenge["token"] headers["X-PoW-Solution"] = str(solution) data = json.dumps(payload).encode() if payload else b"{}" status, body, resp_headers = request( url, method="POST", data=data, headers=headers, ssl_context=config.get("ssl_context") ) if status == 400: die(parse_error(body, "bad request")) if status == 500: die(parse_error(body, "server error")) if status != 200: die(f"registration failed ({status})") fingerprint = resp_headers.get("X-Fingerprint-SHA1", "unknown") out_dir = Path(args.output) if args.output else CONFIG_DIR out_dir.mkdir(parents=True, exist_ok=True) p12_file = out_dir / "client.p12" key_file = out_dir / "client.key" cert_file = out_dir / "client.crt" if not args.force: if p12_file.exists(): die(f"p12 file exists: {p12_file} (use --force)") if not args.p12_only: if key_file.exists(): die(f"key file exists: {key_file} (use --force)") if cert_file.exists(): die(f"cert file exists: {cert_file} (use --force)") p12_file.write_bytes(body) p12_file.chmod(0o600) print(f"pkcs12: {p12_file}", file=sys.stderr) if not args.p12_only: private_key, certificate, _ = pkcs12.load_key_and_certificates(body, None) if private_key is None or certificate is None: die("failed to parse PKCS#12 bundle") key_pem = private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption(), ) cert_pem = certificate.public_bytes(serialization.Encoding.PEM) key_file.write_bytes(key_pem) key_file.chmod(0o600) cert_file.write_bytes(cert_pem) print(f"key: {key_file}", file=sys.stderr) print(f"certificate: {cert_file}", file=sys.stderr) cn = certificate.subject.get_attributes_for_oid(NameOID.COMMON_NAME) if cn: cn_value = cn[0].value if isinstance(cn_value, bytes): cn_value = cn_value.decode("utf-8", errors="replace") print(f"common name: {cn_value}", file=sys.stderr) print(f"fingerprint: {fingerprint}", file=sys.stderr) if args.configure and not args.p12_only: cfg_path = write_config_file( { "client_cert": str(cert_file), "client_key": str(key_file), "cert_sha1": fingerprint, } ) print(f"config: {cfg_path} (updated)", file=sys.stderr) print(fingerprint) def cmd_cert(args: argparse.Namespace, config: dict[str, Any]) -> None: """Generate a self-signed client certificate for mTLS authentication.""" if not HAS_CRYPTO: die("certificate generation requires 'cryptography' package: pip install cryptography") out_dir = Path(args.output) if args.output else CONFIG_DIR out_dir.mkdir(parents=True, exist_ok=True) key_file = out_dir / "client.key" cert_file = out_dir / "client.crt" if not args.force: if key_file.exists(): die(f"key file exists: {key_file} (use --force)") if cert_file.exists(): die(f"cert file exists: {cert_file} (use --force)") # Generate private key private_key: rsa.RSAPrivateKey | ec.EllipticCurvePrivateKey if args.algorithm == "rsa": key_size = args.bits or 4096 print(f"generating {key_size}-bit RSA key...", file=sys.stderr) private_key = rsa.generate_private_key(public_exponent=65537, key_size=key_size) elif args.algorithm == "ec": curve_name = args.curve or "secp384r1" curves = { "secp256r1": ec.SECP256R1(), "secp384r1": ec.SECP384R1(), "secp521r1": ec.SECP521R1(), } if curve_name not in curves: die(f"unsupported curve: {curve_name} (use: secp256r1, secp384r1, secp521r1)") print(f"generating EC key ({curve_name})...", file=sys.stderr) private_key = ec.generate_private_key(curves[curve_name]) else: die(f"unsupported algorithm: {args.algorithm}") cn = args.name or os.environ.get("USER", "fpaste-client") subject = issuer = x509.Name( [ x509.NameAttribute(NameOID.COMMON_NAME, cn), x509.NameAttribute(NameOID.ORGANIZATION_NAME, "FlaskPaste Client"), ] ) days = args.days or 365 now = datetime.now(UTC) cert_builder = ( x509.CertificateBuilder() .subject_name(subject) .issuer_name(issuer) .public_key(private_key.public_key()) .serial_number(x509.random_serial_number()) .not_valid_before(now) .not_valid_after(now + timedelta(days=days)) .add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True) .add_extension( x509.KeyUsage( digital_signature=True, key_encipherment=True, content_commitment=False, data_encipherment=False, key_agreement=False, key_cert_sign=False, crl_sign=False, encipher_only=False, decipher_only=False, ), critical=True, ) .add_extension( x509.ExtendedKeyUsage([x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH]), critical=False, ) ) print("signing certificate...", file=sys.stderr) certificate = cert_builder.sign(private_key, hashes.SHA256()) cert_der = certificate.public_bytes(serialization.Encoding.DER) fingerprint = hashlib.sha1(cert_der, usedforsecurity=False).hexdigest() key_encryption = ( serialization.BestAvailableEncryption(args.password_key.encode()) if args.password_key else serialization.NoEncryption() ) key_pem = private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=key_encryption, ) cert_pem = certificate.public_bytes(serialization.Encoding.PEM) key_file.write_bytes(key_pem) key_file.chmod(0o600) cert_file.write_bytes(cert_pem) print(f"key: {key_file}", file=sys.stderr) print(f"certificate: {cert_file}", file=sys.stderr) print(f"fingerprint: {fingerprint}", file=sys.stderr) print(f"valid for: {days} days", file=sys.stderr) print(f"common name: {cn}", file=sys.stderr) if args.configure: cfg_path = write_config_file( { "client_cert": str(cert_file), "client_key": str(key_file), "cert_sha1": fingerprint, } ) print(f"config: {cfg_path} (updated)", file=sys.stderr) print(fingerprint) # ----------------------------------------------------------------------------- # Argument parsing # ----------------------------------------------------------------------------- def is_file_path(arg: str) -> bool: """Check if argument looks like a file path.""" if not arg or arg.startswith("-"): return False if Path(arg).exists(): return True if "/" in arg or "\\" in arg: return True if "." in arg and not arg.startswith("."): ext = arg.rsplit(".", 1)[-1].lower() return ext in FILE_EXTENSIONS return False def build_parser() -> argparse.ArgumentParser: """Build and return the argument parser.""" parser = argparse.ArgumentParser( prog="fpaste", description="FlaskPaste command-line client", epilog="Shortcut: fpaste is equivalent to fpaste create ", ) parser.add_argument("-s", "--server", help="server URL (env: FLASKPASTE_SERVER)") subparsers = parser.add_subparsers(dest="command", metavar="command") # create p_create = subparsers.add_parser("create", aliases=["c", "new"], help="create paste") p_create.add_argument("file", nargs="?", help="file to upload (- for stdin)") p_create.add_argument("-E", "--no-encrypt", action="store_true", help="disable encryption") p_create.add_argument("-b", "--burn", action="store_true", help="burn after read") p_create.add_argument("-x", "--expiry", type=int, metavar="SEC", help="expiry in seconds") p_create.add_argument("-p", "--password", metavar="PASS", help="password protect") p_create.add_argument("-r", "--raw", action="store_true", help="output raw URL") p_create.add_argument("-q", "--quiet", action="store_true", help="output ID only") p_create.add_argument("-C", "--clipboard", action="store_true", help="read from clipboard") p_create.add_argument("--copy-url", action="store_true", help="copy result URL to clipboard") # get p_get = subparsers.add_parser("get", aliases=["g"], help="retrieve paste") p_get.add_argument("id", help="paste ID or URL") p_get.add_argument("-o", "--output", help="save to file") p_get.add_argument("-c", "--copy", action="store_true", help="copy content to clipboard") p_get.add_argument("-p", "--password", metavar="PASS", help="password for protected paste") p_get.add_argument("-m", "--meta", action="store_true", help="show metadata only") # delete p_delete = subparsers.add_parser("delete", aliases=["d", "rm"], help="delete paste(s)") p_delete.add_argument("ids", nargs="*", metavar="ID", help="paste ID(s) or URL(s)") p_delete.add_argument("-a", "--all", action="store_true", help="delete all pastes (admin)") p_delete.add_argument( "-c", "--confirm", type=int, metavar="N", help="confirm expected delete count" ) # info subparsers.add_parser("info", aliases=["i"], help="show server info") # list p_list = subparsers.add_parser("list", aliases=["ls"], help="list your pastes") p_list.add_argument("-a", "--all", action="store_true", help="list all pastes (admin only)") p_list.add_argument("-l", "--limit", type=int, metavar="N", help="max pastes (default: 50)") p_list.add_argument("-o", "--offset", type=int, metavar="N", help="skip first N pastes") p_list.add_argument("--json", action="store_true", help="output as JSON") # search p_search = subparsers.add_parser("search", aliases=["s", "find"], help="search your pastes") p_search.add_argument("-t", "--type", metavar="PATTERN", help="filter by MIME type (image/*)") p_search.add_argument("--after", metavar="DATE", help="created after (YYYY-MM-DD or timestamp)") p_search.add_argument("--before", metavar="DATE", help="created before (YYYY-MM-DD)") p_search.add_argument("-l", "--limit", type=int, metavar="N", help="max results (default: 50)") p_search.add_argument("--json", action="store_true", help="output as JSON") # update p_update = subparsers.add_parser("update", aliases=["u"], help="update existing paste") p_update.add_argument("id", help="paste ID or URL") p_update.add_argument("file", nargs="?", help="new content (- for stdin)") p_update.add_argument("-E", "--no-encrypt", action="store_true", help="disable encryption") p_update.add_argument("-p", "--password", metavar="PASS", help="set/change password") p_update.add_argument("--remove-password", action="store_true", help="remove password") p_update.add_argument("-x", "--expiry", type=int, metavar="SEC", help="extend expiry (seconds)") p_update.add_argument("-q", "--quiet", action="store_true", help="minimal output") # export p_export = subparsers.add_parser("export", help="export all pastes to directory") p_export.add_argument("-o", "--output", metavar="DIR", help="output directory") p_export.add_argument("-k", "--keyfile", metavar="FILE", help="key file (paste_id=key format)") p_export.add_argument("--manifest", action="store_true", help="write manifest.json") p_export.add_argument("-q", "--quiet", action="store_true", help="minimal output") # register p_register = subparsers.add_parser("register", help="register and get client certificate") p_register.add_argument("-n", "--name", metavar="CN", help="common name (optional)") p_register.add_argument("-o", "--output", metavar="DIR", help="output directory") p_register.add_argument("--configure", action="store_true", help="update config file") p_register.add_argument("--p12-only", action="store_true", help="save only PKCS#12") p_register.add_argument("-f", "--force", action="store_true", help="overwrite existing files") p_register.add_argument("-q", "--quiet", action="store_true", help="minimal output") # cert p_cert = subparsers.add_parser("cert", help="generate client certificate") p_cert.add_argument("-o", "--output", metavar="DIR", help="output directory") p_cert.add_argument( "-a", "--algorithm", choices=["rsa", "ec"], default="ec", help="key algorithm" ) p_cert.add_argument("-b", "--bits", type=int, metavar="N", help="RSA key size (default: 4096)") p_cert.add_argument( "-c", "--curve", metavar="CURVE", help="EC curve (secp256r1/secp384r1/secp521r1)" ) p_cert.add_argument("-d", "--days", type=int, metavar="N", help="validity period in days") p_cert.add_argument("-n", "--name", metavar="CN", help="common name (default: $USER)") p_cert.add_argument("--password-key", metavar="PASS", help="encrypt private key") p_cert.add_argument("--configure", action="store_true", help="update config file") p_cert.add_argument("-f", "--force", action="store_true", help="overwrite existing files") # pki p_pki = subparsers.add_parser("pki", help="PKI operations (server-issued certificates)") pki_sub = p_pki.add_subparsers(dest="pki_command", metavar="subcommand") pki_sub.add_parser("status", help="show PKI status and CA info") p_pki_issue = pki_sub.add_parser("issue", help="request certificate from server CA") p_pki_issue.add_argument( "-n", "--name", required=True, metavar="CN", help="common name (required)" ) p_pki_issue.add_argument("-o", "--output", metavar="DIR", help="output directory") p_pki_issue.add_argument("--configure", action="store_true", help="update config file") p_pki_issue.add_argument("-f", "--force", action="store_true", help="overwrite existing files") p_pki_download = pki_sub.add_parser("download", aliases=["dl"], help="download CA certificate") p_pki_download.add_argument("-o", "--output", metavar="FILE", help="save to file") p_pki_download.add_argument( "--configure", action="store_true", help="update config file (requires -o)" ) # completion p_completion = subparsers.add_parser("completion", help="generate shell completion") p_completion.add_argument( "--shell", choices=["bash", "zsh", "fish"], default="bash", help="shell type" ) return parser def cmd_completion(args: argparse.Namespace, config: dict[str, Any]) -> None: """Output shell completion script.""" shell = getattr(args, "shell", None) or "bash" # Bash completion - full featured bash_completion = """\ # Bash completion for fpaste # Install: source this file or copy to /etc/bash_completion.d/fpaste _fpaste_completions() { local cur prev words cword _init_completion || return local commands="create c new get g delete d rm info i list ls" commands+=" search s find update u export register cert pki completion" local pki_commands="status issue download dl" if [[ $cword -eq 1 ]]; then COMPREPLY=($(compgen -W "$commands" -- "$cur")) return fi local cmd="${words[1]}" if [[ "$cmd" == "pki" && $cword -eq 2 ]]; then COMPREPLY=($(compgen -W "$pki_commands" -- "$cur")) return fi case "$cmd" in create|c|new) local opts="-E --no-encrypt -b --burn -x --expiry -p --password" opts+=" -r --raw -q --quiet -C --clipboard --copy-url" [[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur")) || _filedir ;; get|g) local opts="-o --output -c --copy -p --password -m --meta" [[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur")) ;; delete|d|rm) [[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "-a --all -c --confirm" -- "$cur")) ;; list|ls) local opts="-a --all -l --limit -o --offset --json" [[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur")) ;; search|s|find) local opts="-t --type --after --before -l --limit --json" [[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur")) ;; update|u) local opts="-E --no-encrypt -p --password --remove-password -x --expiry -q --quiet" [[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur")) || _filedir ;; export) local opts="-o --output -k --keyfile --manifest -q --quiet" [[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur")) ;; register) local opts="-n --name -o --output --configure --p12-only -f --force -q --quiet" [[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur")) ;; cert) local opts="-o --output -a --algorithm -b --bits -c --curve" opts+=" -d --days -n --name --password-key --configure -f --force" [[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur")) ;; completion) [[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "--shell" -- "$cur")) [[ "$prev" == "--shell" ]] && COMPREPLY=($(compgen -W "bash zsh fish" -- "$cur")) ;; esac } complete -F _fpaste_completions fpaste """ # Zsh completion - compact format zsh_completion = """\ #compdef fpaste _fpaste() { local curcontext="$curcontext" state line typeset -A opt_args _arguments -C '-s[Server]:url:' '--server[Server]:url:' '1: :->cmd' '*:: :->args' case $state in cmd) local cmds=('create:Create paste' 'get:Get paste' 'delete:Delete' 'info:Server info' 'list:List pastes' 'search:Search' 'update:Update paste' 'export:Export' 'register:Register' 'cert:Generate cert' 'pki:PKI ops' 'completion:Completions') _describe -t commands 'commands' cmds ;; args) case $line[1] in create|c|new) _arguments '-E[No encrypt]' '-b[Burn]' '-x[Expiry]:sec:' \\ '-p[Pass]:p:' '-r[Raw]' '-q[Quiet]' '-C[Clipboard]' \\ '--copy-url' '*:file:_files' ;; get|g) _arguments '-o[Out]:f:_files' '-c[Copy]' '-p[Pass]:p:' \\ '-m[Meta]' '1:ID:' ;; delete|d|rm) _arguments '-a[All]' '-c[Confirm]:n:' '*:ID:' ;; list|ls) _arguments '-a[All]' '-l[Limit]:n:' '-o[Off]:n:' '--json' ;; search|s|find) _arguments '-t[Type]:p:' '--after:d:' '--before:d:' \\ '-l[Limit]:n:' '--json' ;; update|u) _arguments '-E[No encrypt]' '-p[Pass]:p:' '--remove-password' \\ '-x[Expiry]:s:' '-q[Quiet]' '1:ID:' '*:file:_files' ;; export) _arguments '-o[Dir]:d:_files -/' '-k[Keys]:f:_files' \\ '--manifest' '-q[Quiet]' ;; register) _arguments '-n[Name]:cn:' '-o[Dir]:d:_files -/' '--configure' \\ '--p12-only' '-f[Force]' '-q[Quiet]' ;; cert) _arguments '-o[Dir]:d:_files -/' '-a[Algo]:(rsa ec)' \\ '-b[Bits]:n:' '-c[Curve]:(secp256r1 secp384r1 secp521r1)' \\ '-d[Days]:n:' '-n[Name]:cn:' '--configure' '-f[Force]' ;; pki) (( CURRENT == 2 )) && _describe 'cmd' '(status issue download)' ;; completion) _arguments '--shell:(bash zsh fish)' ;; esac ;; esac } _fpaste "$@" """ # Fish completion - compact fish_completion = """\ # Fish completion for fpaste complete -c fpaste -f complete -c fpaste -n __fish_use_subcommand -a 'create c new' -d 'Create' complete -c fpaste -n __fish_use_subcommand -a 'get g' -d 'Get paste' complete -c fpaste -n __fish_use_subcommand -a 'delete d rm' -d 'Delete' complete -c fpaste -n __fish_use_subcommand -a 'info i' -d 'Server info' complete -c fpaste -n __fish_use_subcommand -a 'list ls' -d 'List' complete -c fpaste -n __fish_use_subcommand -a 'search s find' -d 'Search' complete -c fpaste -n __fish_use_subcommand -a 'update u' -d 'Update' complete -c fpaste -n __fish_use_subcommand -a 'export' -d 'Export' complete -c fpaste -n __fish_use_subcommand -a 'register' -d 'Register' complete -c fpaste -n __fish_use_subcommand -a 'cert' -d 'Gen cert' complete -c fpaste -n __fish_use_subcommand -a 'pki' -d 'PKI' complete -c fpaste -n __fish_use_subcommand -a 'completion' -d 'Completions' set -l cr '__fish_seen_subcommand_from create c new' complete -c fpaste -n $cr -s E -l no-encrypt -d 'No encrypt' complete -c fpaste -n $cr -s b -l burn -d 'Burn' complete -c fpaste -n $cr -s x -l expiry -d 'Expiry' -x complete -c fpaste -n $cr -s p -l password -d 'Password' -x complete -c fpaste -n $cr -s r -l raw -d 'Raw URL' complete -c fpaste -n $cr -s q -l quiet -d 'Quiet' complete -c fpaste -n $cr -s C -l clipboard -d 'Clipboard' complete -c fpaste -n $cr -l copy-url -d 'Copy URL' complete -c fpaste -n $cr -F set -l gt '__fish_seen_subcommand_from get g' complete -c fpaste -n $gt -s o -l output -d 'Output' -r complete -c fpaste -n $gt -s c -l copy -d 'Copy' complete -c fpaste -n $gt -s p -l password -d 'Password' -x complete -c fpaste -n $gt -s m -l meta -d 'Metadata' set -l dl '__fish_seen_subcommand_from delete d rm' complete -c fpaste -n $dl -s a -l all -d 'All' complete -c fpaste -n $dl -s c -l confirm -d 'Confirm' -x set -l ls '__fish_seen_subcommand_from list ls' complete -c fpaste -n $ls -s a -l all -d 'All' complete -c fpaste -n $ls -s l -l limit -d 'Limit' -x complete -c fpaste -n $ls -s o -l offset -d 'Offset' -x complete -c fpaste -n $ls -l json -d 'JSON' set -l cp '__fish_seen_subcommand_from completion' complete -c fpaste -n $cp -l shell -d 'Shell' -xa 'bash zsh fish' """ completions = {"bash": bash_completion, "zsh": zsh_completion, "fish": fish_completion} if shell not in completions: die(f"unsupported shell: {shell} (use: bash, zsh, fish)") print(completions[shell]) # Command dispatch table COMMANDS: dict[str, Any] = { "create": cmd_create, "c": cmd_create, "new": cmd_create, "get": cmd_get, "g": cmd_get, "delete": cmd_delete, "d": cmd_delete, "rm": cmd_delete, "info": cmd_info, "i": cmd_info, "list": cmd_list, "ls": cmd_list, "search": cmd_search, "s": cmd_search, "find": cmd_search, "update": cmd_update, "u": cmd_update, "export": cmd_export, "register": cmd_register, "cert": cmd_cert, "completion": cmd_completion, } PKI_COMMANDS: dict[str, Any] = { "status": cmd_pki_status, "issue": cmd_pki_issue, "download": cmd_pki_download, "dl": cmd_pki_download, } def main() -> None: """Main entry point.""" args_to_parse = sys.argv[1:] command_names = set(COMMANDS.keys()) | {"pki"} # Auto-insert "create" if first positional looks like a file insert_pos = 0 has_command = False file_pos = -1 i = 0 while i < len(args_to_parse): arg = args_to_parse[i] if arg in ("-s", "--server"): insert_pos = i + 2 i += 2 continue if arg in ("-h", "--help"): i += 1 insert_pos = i continue if arg.startswith("-"): i += 1 continue if arg in command_names: has_command = True break if is_file_path(arg): file_pos = i break i += 1 if not has_command and (file_pos >= 0 or not sys.stdin.isatty()): args_to_parse.insert(insert_pos, "create") parser = build_parser() args = parser.parse_args(args_to_parse) config = get_config() if args.server: config["server"] = args.server config["ssl_context"] = create_ssl_context(config) if not args.command: if not sys.stdin.isatty(): args.command = "create" args.file = None args.no_encrypt = False args.burn = False args.expiry = None args.password = None args.raw = False args.quiet = False else: parser.print_help() sys.exit(0) if args.command == "pki": if args.pki_command in PKI_COMMANDS: PKI_COMMANDS[args.pki_command](args, config) else: parser.parse_args(["pki", "--help"]) elif args.command in COMMANDS: COMMANDS[args.command](args, config) if __name__ == "__main__": main()