Files
flaskpaste/fpaste
Username 0c8bdacfd2
All checks were successful
CI / Lint & Format (push) Successful in 24s
CI / Security Scan (push) Successful in 23s
CI / Memory Leak Check (push) Successful in 21s
CI / SBOM Generation (push) Successful in 22s
CI / Security Tests (push) Successful in 27s
CI / Unit Tests (push) Successful in 36s
fix ruff S310 audit warnings in fpaste
2025-12-25 21:08:48 +01:00

1884 lines
64 KiB
Python
Executable File

#!/usr/bin/env python3
"""FlaskPaste command-line client."""
from __future__ import annotations
import argparse
import base64
import hashlib
import json
import os
import shutil
import ssl
import stat
import subprocess
import sys
import time
import urllib.error
import urllib.request
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import TYPE_CHECKING, Any, NoReturn
if TYPE_CHECKING:
from collections.abc import Mapping
# Optional cryptography support (for encryption and cert generation)
try:
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, rsa
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.x509.oid import NameOID
HAS_CRYPTO = True
except ImportError:
HAS_CRYPTO = False
# Constants
# Per-user configuration directory and the "key = value" file stored inside it.
CONFIG_DIR = Path.home() / ".config" / "fpaste"
CONFIG_FILE = CONFIG_DIR / "config"
# Keys accepted from the config file; read_config_file() drops any other key.
CONFIG_KEYS = frozenset({"server", "cert_sha1", "client_cert", "client_key", "ca_cert"})
# MIME type -> file extension (leading dot included) used when a paste is
# saved to disk. get_extension_for_mime() falls back to ".bin" for types
# that are not listed here.
MIME_EXTENSIONS: dict[str, str] = {
"text/plain": ".txt",
"text/html": ".html",
"text/css": ".css",
"text/javascript": ".js",
"text/markdown": ".md",
"text/x-python": ".py",
"application/json": ".json",
"application/xml": ".xml",
"application/javascript": ".js",
"application/octet-stream": ".bin",
"image/png": ".png",
"image/jpeg": ".jpg",
"image/gif": ".gif",
"image/webp": ".webp",
"image/svg+xml": ".svg",
"application/pdf": ".pdf",
"application/zip": ".zip",
"application/gzip": ".gz",
"application/x-tar": ".tar",
}
# Set of bare file extensions (no leading dot).
# NOTE(review): no consumer of this set is visible in this part of the file;
# presumably it is used to tell file names from paste IDs -- confirm at the
# call site before changing it.
FILE_EXTENSIONS = frozenset(
{
"txt",
"md",
"py",
"js",
"json",
"yaml",
"yml",
"xml",
"html",
"css",
"sh",
"bash",
"c",
"cpp",
"h",
"go",
"rs",
"java",
"rb",
"php",
"sql",
"log",
"conf",
"cfg",
"ini",
"png",
"jpg",
"jpeg",
"gif",
"pdf",
"zip",
"tar",
"gz",
}
)
# strptime() patterns tried in order by parse_date(); a successfully parsed
# value is interpreted as UTC. Input matching none of them is then tried as a
# plain integer Unix timestamp.
DATE_FORMATS = (
"%Y-%m-%d",
"%Y-%m-%d %H:%M",
"%Y-%m-%d %H:%M:%S",
"%Y-%m-%dT%H:%M:%S",
"%Y-%m-%dT%H:%M:%SZ",
)
# -----------------------------------------------------------------------------
# Core utilities
# -----------------------------------------------------------------------------
def die(msg: str, code: int = 1) -> NoReturn:
    """Report a fatal error on stderr and terminate the process.

    Args:
        msg: Text shown after the ``error: `` prefix.
        code: Process exit status (defaults to 1).

    Raises:
        SystemExit: Always, carrying ``code``.
    """
    print("error:", msg, file=sys.stderr)
    sys.exit(code)
def warn(msg: str) -> None:
    """Emit a non-fatal ``warning: ...`` line on stderr."""
    print("warning:", msg, file=sys.stderr)
def request(
    url: str,
    method: str = "GET",
    data: bytes | None = None,
    headers: dict[str, str] | None = None,
    ssl_context: ssl.SSLContext | None = None,
) -> tuple[int, bytes, dict[str, str]]:
    """Perform one HTTP request with a 30 second timeout.

    HTTP error statuses are returned like successful ones, so callers branch
    on the status code instead of catching exceptions.

    Args:
        url: Absolute URL to request.
        method: HTTP verb.
        data: Optional request body.
        headers: Optional request headers.
        ssl_context: Optional TLS context passed to ``urlopen``.

    Returns:
        ``(status, body, response_headers)``.

    Raises:
        SystemExit: Via ``die()`` when the connection itself fails.
    """
    outgoing = urllib.request.Request(  # noqa: S310
        url,
        data=data,
        headers=headers or {},
        method=method,
    )
    try:
        with urllib.request.urlopen(outgoing, timeout=30, context=ssl_context) as reply:  # noqa: S310  # nosec B310
            return reply.status, reply.read(), dict(reply.headers)
    except urllib.error.HTTPError as http_err:
        # The server answered, just not with 2xx: hand the response back.
        return http_err.code, http_err.read(), dict(http_err.headers)
    except urllib.error.URLError as net_err:
        die(f"Connection failed: {net_err.reason}")
def parse_error(body: bytes, default: str = "request failed") -> str:
    """Extract the ``error`` message from a JSON response body.

    Args:
        body: Raw response body.
        default: Message to return when no usable error text is found.

    Returns:
        The ``error`` field converted to ``str``, or ``default`` when the body
        is not valid JSON, is not a JSON object, has no ``error`` field, or
        the field is ``null``.
    """
    try:
        payload = json.loads(body)
    except (json.JSONDecodeError, UnicodeDecodeError):
        return default
    # Valid JSON that is not an object (list, string, number) has no .get().
    if not isinstance(payload, dict):
        return default
    message = payload.get("error", default)
    return default if message is None else str(message)
# -----------------------------------------------------------------------------
# Configuration
# -----------------------------------------------------------------------------
def check_config_permissions(path: Path) -> None:
    """CLI-003: Warn when the config file can be read by other users.

    Only the world-readable bit triggers a warning; a group-readable file is
    accepted silently. A file that cannot be stat'ed (missing, permission
    denied) is ignored.
    """
    try:
        file_mode = path.stat().st_mode
        world_readable = bool(file_mode & stat.S_IROTH)
        if world_readable:
            warn(f"config file {path} is world-readable (mode {stat.filemode(file_mode)})")
    except OSError:
        # Best-effort check: nothing to report if the file is unavailable.
        return
def read_config_file(path: Path | None = None) -> dict[str, str]:
"""Read config file and return key-value pairs."""
path = path or CONFIG_FILE
result: dict[str, str] = {}
if not path.exists():
return result
# CLI-003: Check file permissions before reading
check_config_permissions(path)
for line in path.read_text().splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, value = line.split("=", 1)
key = key.strip().lower()
if key in CONFIG_KEYS:
result[key] = value.strip().strip('"').strip("'")
return result
def write_config_file(
    updates: dict[str, str],
    path: Path | None = None,
) -> Path:
    """Merge ``updates`` into the config file and rewrite it.

    The file is regenerated from the settings returned by
    ``read_config_file()``, so comments and unrecognised keys in an existing
    file are not carried over. Entries are written sorted by key.

    A config file that does not exist yet is created with mode 0600 so the
    CLI-003 permission check does not warn about a file this tool produced
    itself. Permissions of an existing file are left untouched.

    Args:
        updates: Settings to add or overwrite.
        path: File to write; defaults to ``CONFIG_FILE``.

    Returns:
        The path that was written.
    """
    path = path or CONFIG_FILE
    path.parent.mkdir(parents=True, exist_ok=True)

    merged = read_config_file(path)
    merged.update(updates)
    lines = [f"{k} = {v}" for k, v in sorted(merged.items())]

    if not path.exists():
        # Create owner-only before any content is written; the mode argument
        # is only honoured when the file is actually created.
        path.touch(mode=0o600)
    path.write_text("\n".join(lines) + "\n")
    return path
def get_config() -> dict[str, Any]:
    """Load configuration from the environment and the config file.

    Precedence per setting: non-empty environment variable, then config file,
    then the built-in default. The ``server`` default is applied last so that
    a ``server`` entry in the config file is honoured; seeding the dict with
    the default up front made the value truthy and masked the file entry.

    Returns:
        Dict with the keys ``server``, ``cert_sha1``, ``client_cert``,
        ``client_key`` and ``ca_cert``; unset values are empty strings,
        except ``server`` which falls back to ``http://localhost:5000``.
    """
    env_names = {
        "server": "FLASKPASTE_SERVER",
        "cert_sha1": "FLASKPASTE_CERT_SHA1",
        "client_cert": "FLASKPASTE_CLIENT_CERT",
        "client_key": "FLASKPASTE_CLIENT_KEY",
        "ca_cert": "FLASKPASTE_CA_CERT",
    }
    # Config file values (lower priority than environment)
    file_config = read_config_file()

    config: dict[str, Any] = {
        key: os.environ.get(env_name, "") or file_config.get(key, "")
        for key, env_name in env_names.items()
    }
    if not config["server"]:
        config["server"] = "http://localhost:5000"
    return config
def create_ssl_context(config: Mapping[str, Any]) -> ssl.SSLContext | None:
"""Create SSL context for mTLS if certificates are configured."""
client_cert = config.get("client_cert", "")
if not client_cert:
return None
ctx = ssl.create_default_context()
# CLI-002: Explicitly enable hostname verification (defense in depth)
# create_default_context() sets these, but explicit is safer
ctx.check_hostname = True
ctx.verify_mode = ssl.CERT_REQUIRED
if ca_cert := config.get("ca_cert", ""):
ctx.load_verify_locations(ca_cert)
try:
ctx.load_cert_chain(certfile=client_cert, keyfile=config.get("client_key") or None)
except ssl.SSLError as e:
die(f"failed to load client certificate: {e}")
except FileNotFoundError as e:
die(f"certificate file not found: {e}")
return ctx
# -----------------------------------------------------------------------------
# Encryption
# -----------------------------------------------------------------------------
def encrypt_content(plaintext: bytes) -> tuple[bytes, bytes]:
    """Encrypt ``plaintext`` with AES-256-GCM under a fresh random key.

    Returns:
        ``(blob, key)`` where ``blob`` is the 12-byte nonce followed by the
        AESGCM output and ``key`` is the 32-byte key.

    Raises:
        SystemExit: Via ``die()`` when the ``cryptography`` package is missing.
    """
    if not HAS_CRYPTO:
        die("encryption requires 'cryptography' package: pip install cryptography")
    secret = os.urandom(32)
    iv = os.urandom(12)
    sealed = AESGCM(secret).encrypt(iv, plaintext, None)
    return iv + sealed, secret
def decrypt_content(blob: bytes, key: bytes) -> bytes:
    """Decrypt a blob produced by ``encrypt_content()``.

    Args:
        blob: 12-byte nonce followed by the AES-GCM ciphertext.
        key: AES key, normally decoded from the URL fragment.

    Returns:
        The decrypted plaintext.

    Raises:
        SystemExit: Via ``die()`` when the ``cryptography`` package is
            missing, the blob is shorter than a nonce, the key has an invalid
            length, or authentication fails.
    """
    if not HAS_CRYPTO:
        die("decryption requires 'cryptography' package: pip install cryptography")
    if len(blob) < 12:
        die("encrypted content too short")
    nonce, ciphertext = blob[:12], blob[12:]
    try:
        # AESGCM() itself rejects keys of the wrong size, so it must sit
        # inside the try: the key comes from user-supplied URL text.
        return AESGCM(key).decrypt(nonce, ciphertext, None)
    except Exception:
        # Deliberately broad: covers both the bad-key ValueError and the
        # library's authentication failure with one user-facing message.
        die("decryption failed (wrong key or corrupted data)")
def encode_key(key: bytes) -> str:
    """Return ``key`` as URL-safe base64 text without ``=`` padding."""
    encoded = base64.urlsafe_b64encode(key)
    return encoded.rstrip(b"=").decode()
def decode_key(encoded: str) -> bytes:
    """Decode an unpadded URL-safe base64 key.

    The ``=`` padding removed by ``encode_key()`` is restored before decoding.

    Raises:
        SystemExit: Via ``die()`` when the text cannot be decoded.
    """
    # Number of "=" needed to reach a multiple of four characters (0-3).
    missing = -len(encoded) % 4
    padded = encoded + "=" * missing
    try:
        return base64.urlsafe_b64decode(padded)
    except Exception:
        die("invalid encryption key in URL")
# -----------------------------------------------------------------------------
# Proof-of-work
# -----------------------------------------------------------------------------
def solve_pow(nonce: str, difficulty: int) -> int:
    """Find the smallest N where SHA256("nonce:N") has enough leading zero bits.

    Candidates are tried from 0 upwards. A progress line is written to stderr
    every 100000 attempts.

    Args:
        nonce: Challenge nonce.
        difficulty: Required number of leading zero bits in the digest.

    Returns:
        The first counter value whose digest meets the difficulty.
    """
    candidate = 0
    while True:
        digest = hashlib.sha256(f"{nonce}:{candidate}".encode()).digest()
        # Read as one big-endian integer, the digest's leading zero bits are
        # its total width minus the integer's bit length.
        leading_zero_bits = 8 * len(digest) - int.from_bytes(digest, "big").bit_length()
        if leading_zero_bits >= difficulty:
            return candidate
        candidate += 1
        if candidate % 100000 == 0:
            print(f"\rsolving pow: {candidate} attempts...", end="", file=sys.stderr)
def get_challenge(
    config: Mapping[str, Any],
    endpoint: str = "/challenge",
) -> dict[str, Any] | None:
    """Fetch a proof-of-work challenge from the server.

    Args:
        config: Settings mapping; reads ``server`` and ``ssl_context``.
        endpoint: Path of the challenge endpoint, appended to the server URL.

    Returns:
        The challenge object when the server reports ``enabled``; ``None``
        when the endpoint does not answer 200, the body is not a JSON object,
        or proof-of-work is disabled.
    """
    url = config["server"].rstrip("/") + endpoint
    status, body, _ = request(url, ssl_context=config.get("ssl_context"))
    if status != 200:
        return None
    try:
        data = json.loads(body)
    except ValueError:
        # Covers JSONDecodeError and UnicodeDecodeError: a 200 with a
        # non-JSON body (e.g. an HTML page) is treated as "no challenge".
        return None
    if not isinstance(data, dict):
        return None
    return data if data.get("enabled") else None
# -----------------------------------------------------------------------------
# Formatting utilities
# -----------------------------------------------------------------------------
def format_size(size: int) -> str:
    """Render a byte count as ``<n>B``, ``<n.n>K`` or ``<n.n>M`` (1024-based)."""
    kib = 1024
    mib = kib * kib
    if size >= mib:
        return f"{size / mib:.1f}M"
    if size >= kib:
        return f"{size / kib:.1f}K"
    return f"{size}B"
def format_timestamp(ts: int | float) -> str:
"""Format Unix timestamp as human-readable date."""
dt = datetime.fromtimestamp(ts, tz=UTC)
return dt.strftime("%Y-%m-%d %H:%M")
def format_time_remaining(expires_at: int | float | None) -> str:
"""Format time remaining until expiry."""
if not expires_at:
return ""
now = time.time()
remaining = expires_at - now
if remaining <= 0:
return "expired"
if remaining < 60:
return f"{int(remaining)}s"
if remaining < 3600:
return f"{int(remaining / 60)}m"
if remaining < 86400:
hours = int(remaining / 3600)
return f"{hours}h"
days = int(remaining / 86400)
if days >= 365:
years = days // 365
return f"{years}y"
return f"{days}d"
def parse_date(date_str: str) -> int:
"""Parse date string to Unix timestamp."""
if not date_str:
return 0
for fmt in DATE_FORMATS:
try:
dt = datetime.strptime(date_str, fmt).replace(tzinfo=UTC)
return int(dt.timestamp())
except ValueError:
continue
try:
return int(date_str)
except ValueError:
die(f"invalid date format: {date_str}")
def get_extension_for_mime(mime_type: str) -> str:
    """Get the file extension for a MIME type.

    The lookup ignores case, surrounding whitespace and any parameters after
    ``;`` (for example ``text/plain; charset=utf-8`` maps like ``text/plain``).

    Args:
        mime_type: MIME type, optionally with parameters. ``None`` or an
            empty value is treated as unknown.

    Returns:
        The extension including the leading dot, or ``".bin"`` when the type
        is not in ``MIME_EXTENSIONS``.
    """
    essence = (mime_type or "").split(";", 1)[0].strip().lower()
    return MIME_EXTENSIONS.get(essence, ".bin")
def format_paste_row(paste: dict[str, Any], show_owner: bool = False) -> str:
    """Format one paste record as a fixed-width table row.

    Args:
        paste: Paste metadata; ``id`` is required, the other fields are optional.
        show_owner: Append the owner (first 12 characters) when the record has one.

    Returns:
        The row text: ID, type (first 16 characters), size, creation time,
        time to expiry and flags.
    """
    flag_labels = (("burn_after_read", "burn"), ("password_protected", "pass"))
    active_flags = [label for field, label in flag_labels if paste.get(field)]

    columns = [
        format(paste["id"], "<12"),
        format(paste.get("mime_type", "unknown")[:16], "<16"),
        format(format_size(paste.get("size", 0)), ">6"),
        format(format_timestamp(paste.get("created_at", 0)), "<16"),
        # Time remaining until expiry
        format(format_time_remaining(paste.get("expires_at")), "<8"),
        " ".join(active_flags),
    ]
    row = " ".join(columns)
    if show_owner and paste.get("owner"):
        row += f" {paste['owner'][:12]}"
    return row
def print_paste_list(
pastes: list[dict[str, Any]],
summary: str,
as_json: bool = False,
data: dict[str, Any] | None = None,
) -> None:
"""Print a list of pastes in table or JSON format."""
if as_json:
print(json.dumps(data or {"pastes": pastes}, indent=2))
return
if not pastes:
print("no pastes found")
return
# Check if owner data is present (admin view)
show_owner = any(paste.get("owner") for paste in pastes)
header = f"{'ID':<12} {'TYPE':<16} {'SIZE':>6} {'CREATED':<16} {'EXPIRES':<8} FLAGS"
if show_owner:
header += " OWNER"
print(header)
for paste in pastes:
print(format_paste_row(paste, show_owner=show_owner))
print(f"\n{summary}")
# -----------------------------------------------------------------------------
# Clipboard integration
# -----------------------------------------------------------------------------
# Clipboard read commands as (tool name, argv) pairs.
# find_clipboard_command() walks the list in order, resolves the tool name
# with shutil.which(), and returns the first entry whose resolved path is
# trusted, with argv[0] replaced by that absolute path.
CLIPBOARD_READ_COMMANDS = [
("xclip", ["xclip", "-selection", "clipboard", "-o"]),
("xsel", ["xsel", "--clipboard", "--output"]),
("pbpaste", ["pbpaste"]),
("powershell.exe", ["powershell.exe", "-command", "Get-Clipboard"]),
("wl-paste", ["wl-paste"]),
]
# Clipboard write commands as (tool name, argv) pairs; selected the same way.
# write_clipboard() feeds the data to the chosen tool on stdin.
CLIPBOARD_WRITE_COMMANDS = [
("xclip", ["xclip", "-selection", "clipboard", "-i"]),
("xsel", ["xsel", "--clipboard", "--input"]),
("pbcopy", ["pbcopy"]),
("clip.exe", ["clip.exe"]),
("wl-copy", ["wl-copy"]),
]
# CLI-001: Trusted directories for clipboard tools (system paths only)
# Prevents command injection via malicious PATH manipulation
# is_trusted_clipboard_path() compares the resolved parent directory of a
# tool against these entries for exact equality.
TRUSTED_CLIPBOARD_DIRS = frozenset(
{
"/usr/bin",
"/usr/local/bin",
"/bin",
"/opt/homebrew/bin", # macOS Homebrew
"/usr/X11/bin",
"/usr/X11R6/bin",
}
)
# Windows system directories (checked case-insensitively)
# These are matched as substrings of the lower-cased parent directory, so
# each pattern needs a path separator on both sides to match.
TRUSTED_WINDOWS_PATTERNS = (
"\\windows\\",
"\\system32\\",
"\\syswow64\\",
"\\windowsapps\\",
)
def is_trusted_clipboard_path(path: str) -> bool:
    """Check if clipboard tool path is in a trusted system directory.

    CLI-001: Validates that resolved clipboard tool paths are in expected
    system locations to prevent command injection via PATH manipulation.

    The path is resolved (symlinks followed) and its parent directory must
    either equal an entry of ``TRUSTED_CLIPBOARD_DIRS`` or, compared in lower
    case, contain one of ``TRUSTED_WINDOWS_PATTERNS`` or ``\\program files``.
    An empty path is never trusted.
    """
    if not path:
        return False
    directory = str(Path(path).resolve().parent)

    # Unix: exact directory match.
    if directory in TRUSTED_CLIPBOARD_DIRS:
        return True

    # Windows: case-insensitive substring match.
    lowered = directory.lower()
    if any(marker in lowered for marker in TRUSTED_WINDOWS_PATTERNS):
        return True
    # Also allow Windows Program Files paths
    return "\\program files" in lowered
def find_clipboard_command(commands: list[tuple[str, list[str]]]) -> list[str] | None:
"""Find first available clipboard command in trusted directories.
CLI-001: Validates that found commands are in trusted system directories
to prevent command injection via PATH manipulation.
"""
for tool_name, cmd in commands:
tool_path = shutil.which(tool_name)
if tool_path and is_trusted_clipboard_path(tool_path):
# Use absolute path for security
return [tool_path, *cmd[1:]]
return None
def read_clipboard() -> bytes:
    """Return the system clipboard content as raw bytes.

    Raises:
        SystemExit: Via ``die()`` when no trusted clipboard tool is available
            or the tool fails.
    """
    argv = find_clipboard_command(CLIPBOARD_READ_COMMANDS)
    if not argv:
        die("no clipboard tool found (install xclip, xsel, or wl-paste)")
    try:
        completed = subprocess.run(argv, capture_output=True, check=True)
    except subprocess.CalledProcessError as failure:
        die(f"clipboard read failed: {failure.stderr.decode(errors='replace')}")
    except FileNotFoundError:
        die(f"clipboard tool not found: {argv[0]}")
    return completed.stdout
def write_clipboard(data: bytes) -> None:
    """Place ``data`` on the system clipboard by piping it to a clipboard tool.

    Raises:
        SystemExit: Via ``die()`` when no trusted clipboard tool is available
            or the tool fails.
    """
    argv = find_clipboard_command(CLIPBOARD_WRITE_COMMANDS)
    if not argv:
        die("no clipboard tool found (install xclip, xsel, or wl-copy)")
    try:
        subprocess.run(argv, input=data, check=True)
    except subprocess.CalledProcessError as failure:
        # Output is not captured above, so stderr is normally absent here.
        detail = failure.stderr.decode(errors="replace") if failure.stderr else ""
        die(f"clipboard write failed: {detail}")
    except FileNotFoundError:
        die(f"clipboard tool not found: {argv[0]}")
# -----------------------------------------------------------------------------
# Content helpers
# -----------------------------------------------------------------------------
def read_content(file_arg: str | None, from_clipboard: bool = False) -> bytes:
"""Read content from file, stdin, or clipboard."""
if from_clipboard:
return read_clipboard()
if file_arg:
if file_arg == "-":
return sys.stdin.buffer.read()
path = Path(file_arg)
if not path.exists():
die(f"file not found: {file_arg}")
return path.read_bytes()
if sys.stdin.isatty():
die("no input provided (pipe data, specify file, or use -C for clipboard)")
return sys.stdin.buffer.read()
def prepare_content(
content: bytes,
encrypt: bool,
quiet: bool = False,
) -> tuple[bytes, bytes | None]:
"""Optionally encrypt content. Returns (content, encryption_key or None)."""
if not encrypt:
return content, None
if not HAS_CRYPTO:
die("encryption requires 'cryptography' package (use -E to disable)")
if not quiet:
print("encrypting...", end="", file=sys.stderr)
encrypted, key = encrypt_content(content)
if not quiet:
print(" done", file=sys.stderr)
return encrypted, key
def extract_paste_id(url_or_id: str) -> tuple[str, bytes | None]:
"""Extract paste ID and optional encryption key from URL or ID."""
encryption_key = None
if "#" in url_or_id:
url_or_id, key_encoded = url_or_id.rsplit("#", 1)
if key_encoded:
encryption_key = decode_key(key_encoded)
paste_id = url_or_id.split("/")[-1]
return paste_id, encryption_key
def auth_headers(config: Mapping[str, Any]) -> dict[str, str]:
    """Return the ``X-SSL-Client-SHA1`` header when ``cert_sha1`` is configured.

    A new dict is returned on every call (empty when nothing is configured),
    so callers may add their own headers to it.
    """
    fingerprint = config.get("cert_sha1")
    return {"X-SSL-Client-SHA1": fingerprint} if fingerprint else {}
def require_auth(config: Mapping[str, Any]) -> None:
    """Abort unless a certificate fingerprint (``cert_sha1``) is configured.

    Raises:
        SystemExit: Via ``die()`` when ``cert_sha1`` is missing or empty.
    """
    if config.get("cert_sha1"):
        return
    die("authentication required (set FLASKPASTE_CERT_SHA1)")
# -----------------------------------------------------------------------------
# Commands
# -----------------------------------------------------------------------------
def cmd_create(args: argparse.Namespace, config: dict[str, Any]) -> None:
    """Create a new paste and print where to find it.

    Content comes from the clipboard, a file or stdin and is encrypted unless
    ``no_encrypt`` is set. When the server issues a proof-of-work challenge it
    is solved before each attempt; an attempt rejected with a 400 PoW error is
    retried with a fresh challenge, up to five attempts in total.

    On success the full URL is printed (the raw URL with ``args.raw``, only
    the ID with ``args.quiet``), followed by ``#<key>`` when the content was
    encrypted. With ``copy_url`` the same text is placed on the clipboard.

    Raises:
        SystemExit: Via ``die()`` on empty content, a non-PoW error from the
            server, or when all attempts are rejected.
    """
    payload = read_content(args.file, from_clipboard=getattr(args, "clipboard", False))
    if not payload:
        die("empty content")

    quiet = args.quiet
    payload, encryption_key = prepare_content(
        payload,
        encrypt=not getattr(args, "no_encrypt", False),
        quiet=quiet,
    )

    # Headers shared by every attempt; PoW headers are added per attempt.
    static_headers = auth_headers(config)
    if args.burn:
        static_headers["X-Burn-After-Read"] = "true"
    if args.expiry:
        static_headers["X-Expiry"] = str(args.expiry)
    if args.password:
        static_headers["X-Paste-Password"] = args.password

    server = config["server"].rstrip("/")
    endpoint = server + "/"
    max_retries = 5
    last_error = ""

    for attempt in range(max_retries):
        headers = dict(static_headers)

        challenge = get_challenge(config)
        if challenge:
            if not quiet:
                if attempt > 0:
                    print(f"retry {attempt}/{max_retries - 1}...", file=sys.stderr)
                bits = challenge["difficulty"]
                if challenge.get("elevated", False):
                    base_bits = challenge.get("base_difficulty", bits)
                    progress = f"solving pow ({bits} bits, elevated from {base_bits})..."
                else:
                    progress = f"solving pow ({bits} bits)..."
                print(progress, end="", file=sys.stderr)
            solution = solve_pow(challenge["nonce"], challenge["difficulty"])
            if not quiet:
                print(" done", file=sys.stderr)
            headers["X-PoW-Token"] = challenge["token"]
            headers["X-PoW-Solution"] = str(solution)

        status, body, _ = request(
            endpoint,
            method="POST",
            data=payload,
            headers=headers,
            ssl_context=config.get("ssl_context"),
        )

        if status == 201:
            created = json.loads(body)
            fragment = f"#{encode_key(encryption_key)}" if encryption_key else ""
            if args.raw:
                result_url = server + created["raw"] + fragment
            elif quiet:
                result_url = created["id"] + fragment
            else:
                result_url = server + created["url"] + fragment
            print(result_url)

            # Copy URL to clipboard if requested
            if getattr(args, "copy_url", False):
                write_clipboard(result_url.encode())
                if not quiet:
                    print("(copied to clipboard)", file=sys.stderr)
            return

        last_error = parse_error(body, body.decode(errors="replace"))
        lowered = last_error.lower()
        pow_rejected = status == 400 and ("pow" in lowered or "proof-of-work" in lowered)
        if not pow_rejected:
            die(f"create failed ({status}): {last_error}")
        if not quiet:
            print(f"pow rejected: {last_error}", file=sys.stderr)

    die(f"create failed after {max_retries} attempts: {last_error}")
def cmd_get(args: argparse.Namespace, config: dict[str, Any]) -> None:
    """Retrieve a paste's content, or its metadata with ``args.meta``.

    Content is decrypted when the given URL carries a key fragment, then
    copied to the clipboard (``copy``), saved to ``args.output``, or written
    to stdout. On a terminal a trailing newline is added if the content lacks
    one.

    Raises:
        SystemExit: Via ``die()`` on 401 (password required), 403 (invalid
            password) or any other non-200 status (reported as not found).
    """
    paste_id, encryption_key = extract_paste_id(args.id)
    base = config["server"].rstrip("/")

    headers: dict[str, str] = {}
    if args.password:
        headers["X-Paste-Password"] = args.password

    # Metadata lives at /<id>, the content itself at /<id>/raw.
    url = f"{base}/{paste_id}" if args.meta else f"{base}/{paste_id}/raw"
    status, body, _ = request(url, headers=headers, ssl_context=config.get("ssl_context"))

    if status == 401:
        die("password required (-p)")
    if status == 403:
        die("invalid password")
    if status != 200:
        die(f"not found: {paste_id}")

    if args.meta:
        info = json.loads(body)
        print(f"id: {info['id']}")
        print(f"mime_type: {info['mime_type']}")
        print(f"size: {info['size']}")
        print(f"created_at: {info['created_at']}")
        if encryption_key:
            print("encrypted: yes (key in URL)")
        if info.get("password_protected"):
            print("protected: yes (password required)")
        return

    if encryption_key:
        body = decrypt_content(body, encryption_key)

    # Copy to clipboard if requested
    if getattr(args, "copy", False):
        write_clipboard(body)
        print("(copied to clipboard)", file=sys.stderr)
    elif args.output:
        Path(args.output).write_bytes(body)
        print(f"saved: {args.output}", file=sys.stderr)
    else:
        out = sys.stdout.buffer
        out.write(body)
        if sys.stdout.isatty() and body and not body.endswith(b"\n"):
            out.write(b"\n")
def cmd_delete(args: argparse.Namespace, config: dict[str, Any]) -> None:
    """Delete the given pastes, or every listed paste with ``--all``.

    ``--all`` fetches the paste list (at most 1000 entries are requested) and
    requires ``--confirm N`` where N equals the number of pastes found.
    Each deletion is reported individually; a summary line follows when more
    than one paste was processed.

    Raises:
        SystemExit: Via ``die()`` on missing authentication, conflicting or
            missing arguments, a failed listing, or a confirmation mismatch.
    """
    require_auth(config)

    delete_all = getattr(args, "all", False)
    confirm_count = getattr(args, "confirm", None)
    # Accept full URLs as well as bare IDs: keep the last path segment.
    paste_ids = [given.split("/")[-1] for given in (args.ids or [])]

    # Validate arguments
    if delete_all and paste_ids:
        die("cannot specify both --all and paste IDs")
    if not (delete_all or paste_ids):
        die("specify paste ID(s) or use --all")

    server = config["server"].rstrip("/")

    if delete_all:
        # Fetch all pastes to get count and IDs
        status, body, _ = request(
            f"{server}/pastes?all=1&limit=1000",
            headers=auth_headers(config),
            ssl_context=config.get("ssl_context"),
        )
        if status == 401:
            die("authentication failed")
        if status != 200:
            die(f"failed to list pastes ({status})")

        listed = json.loads(body).get("pastes", [])
        total = len(listed)
        if total == 0:
            print("no pastes to delete")
            return

        # Require confirmation with expected count
        if confirm_count is None:
            die(f"--all requires --confirm {total} (found {total} pastes)")
        if confirm_count != total:
            die(f"confirmation mismatch: expected {confirm_count}, found {total}")
        paste_ids = [entry["id"] for entry in listed]

    # Delete pastes
    failure_labels = {404: "not found", 403: "permission denied"}
    deleted = 0
    failed = 0
    for paste_id in paste_ids:
        status, _, _ = request(
            f"{server}/{paste_id}",
            method="DELETE",
            headers=auth_headers(config),
            ssl_context=config.get("ssl_context"),
        )
        if status == 200:
            print(f"deleted: {paste_id}")
            deleted += 1
            continue
        label = failure_labels.get(status, f"failed ({status})")
        print(f"{label}: {paste_id}", file=sys.stderr)
        failed += 1

    # Summary for batch operations
    if len(paste_ids) > 1:
        print(f"\n{deleted} deleted, {failed} failed")
def cmd_info(args: argparse.Namespace, config: dict[str, Any]) -> None:
    """Print the server's name, version and proof-of-work setting.

    Raises:
        SystemExit: Via ``die()`` when the server root does not answer 200.
    """
    root_url = config["server"].rstrip("/") + "/"
    status, body, _ = request(root_url, ssl_context=config.get("ssl_context"))
    if status != 200:
        die("failed to connect to server")

    info = json.loads(body)
    print(f"server: {config['server']}")
    print(f"name: {info.get('name', 'unknown')}")
    print(f"version: {info.get('version', 'unknown')}")

    challenge = get_challenge(config)
    if not challenge:
        print("pow: disabled")
        return
    bits = challenge.get("difficulty", 0)
    if challenge.get("elevated"):
        base_bits = challenge.get("base_difficulty", bits)
        print(f"pow: {bits} bits (elevated from {base_bits})")
    else:
        print(f"pow: {bits} bits")
def cmd_list(args: argparse.Namespace, config: dict[str, Any]) -> None:
    """List pastes, optionally paginated with ``limit`` / ``offset``.

    ``all`` adds ``all=1`` to the request. Output is a table, or the raw
    response object as JSON with ``args.json``.

    Raises:
        SystemExit: Via ``die()`` on missing or rejected authentication or
            any other non-200 status.
    """
    require_auth(config)

    query: list[str] = []
    if getattr(args, "all", False):
        query.append("all=1")
    query += [
        f"{name}={value}"
        for name in ("limit", "offset")
        if (value := getattr(args, name))
    ]

    url = f"{config['server'].rstrip('/')}/pastes"
    if query:
        url = f"{url}?{'&'.join(query)}"

    status, body, _ = request(
        url, headers=auth_headers(config), ssl_context=config.get("ssl_context")
    )
    if status == 401:
        die("authentication failed")
    if status != 200:
        die(f"failed to list pastes ({status})")

    listing = json.loads(body)
    summary = f"{listing.get('count', 0)} of {listing.get('total', 0)} pastes shown"
    print_paste_list(listing.get("pastes", []), summary, as_json=args.json, data=listing)
def cmd_search(args: argparse.Namespace, config: dict[str, Any]) -> None:
    """Search the user's pastes by type and creation date.

    Filters are sent as query parameters: ``type``, ``after`` / ``before``
    (converted to Unix timestamps by ``parse_date()``) and ``limit``. Values
    are URL-encoded so that characters such as ``+`` in ``image/svg+xml`` or
    ``&`` reach the server intact; ``/`` and ``*`` are left readable.

    Raises:
        SystemExit: Via ``die()`` on missing or rejected authentication, an
            unparseable date, or any other non-200 status.
    """
    # Function-scope import: the module itself only imports urllib.request
    # and urllib.error.
    from urllib.parse import urlencode

    require_auth(config)

    params: dict[str, Any] = {}
    if args.type:
        params["type"] = args.type
    if args.after:
        params["after"] = parse_date(args.after)
    if args.before:
        params["before"] = parse_date(args.before)
    if args.limit:
        params["limit"] = args.limit

    url = f"{config['server'].rstrip('/')}/pastes"
    if params:
        url += "?" + urlencode(params, safe="/*")

    status, body, _ = request(
        url, headers=auth_headers(config), ssl_context=config.get("ssl_context")
    )
    if status == 401:
        die("authentication failed")
    if status != 200:
        die(f"failed to search pastes ({status})")

    data = json.loads(body)
    pastes = data.get("pastes", [])
    summary = f"{data.get('count', 0)} matching pastes found"
    print_paste_list(pastes, summary, as_json=args.json, data=data)
def cmd_update(args: argparse.Namespace, config: dict[str, Any]) -> None:
    """Update an existing paste's content and/or settings.

    New content (``args.file``) is encrypted unless ``no_encrypt`` is set,
    which produces a fresh key. That key is always reported: as a full URL in
    normal mode and as ``<id>#<key>`` in quiet mode (the same shape
    ``cmd_create`` prints), because without it the new content cannot be
    decrypted. Password and expiry changes are sent as headers.

    Raises:
        SystemExit: Via ``die()`` on missing authentication, empty content,
            or any non-200 response.
    """
    require_auth(config)
    paste_id, _ = extract_paste_id(args.id)
    url = f"{config['server'].rstrip('/')}/{paste_id}"
    headers = auth_headers(config)

    content: bytes | None = None
    encryption_key: bytes | None = None
    if args.file:
        raw_content = read_content(args.file)
        if not raw_content:
            die("empty content")
        content, encryption_key = prepare_content(
            raw_content,
            encrypt=not getattr(args, "no_encrypt", False),
            quiet=args.quiet,
        )

    if args.password:
        headers["X-Paste-Password"] = args.password
    if args.remove_password:
        headers["X-Remove-Password"] = "true"
    if args.expiry:
        headers["X-Extend-Expiry"] = str(args.expiry)

    status, body, _ = request(
        url, method="PUT", data=content, headers=headers, ssl_context=config.get("ssl_context")
    )

    if status == 200:
        data = json.loads(body)
        # Only present when content was replaced and re-encrypted.
        key_fragment = f"#{encode_key(encryption_key)}" if content and encryption_key else ""
        if args.quiet:
            print(paste_id + key_fragment)
        else:
            print(f"updated: {paste_id}")
            print(f" size: {data.get('size', 'unknown')}")
            print(f" type: {data.get('mime_type', 'unknown')}")
            if data.get("expires_at"):
                print(f" expires: {data.get('expires_at')}")
            if data.get("password_protected"):
                print(" password: protected")
            if key_fragment:
                base = config["server"].rstrip("/")
                print(f" key: {base}/{paste_id}{key_fragment}")
    elif status == 400:
        die(parse_error(body, "bad request"))
    elif status == 401:
        die("authentication failed")
    elif status == 403:
        die("permission denied (not owner)")
    elif status == 404:
        die(f"not found: {paste_id}")
    else:
        die(f"update failed ({status})")
def cmd_export(args: argparse.Namespace, config: dict[str, Any]) -> None:
    """Export the user's pastes to a directory.

    Each listed paste (at most 1000 are requested) is downloaded and saved as
    ``<id><extension>``. Burn-after-read and password-protected pastes are
    skipped. When ``args.keyfile`` supplies a key for a paste
    (``<id> = <key>`` lines), the content is decrypted; if that fails the
    encrypted bytes are kept. With ``args.manifest`` a ``manifest.json``
    describing the exported files is written.

    Paste IDs come from the server and become file names, so an ID that is
    empty, ``.``/``..`` or contains a path separator is skipped rather than
    being allowed to write outside the output directory.

    Raises:
        SystemExit: Via ``die()`` on missing or rejected authentication, a
            missing key file, or a failed paste listing.
    """
    require_auth(config)
    out_dir = Path(args.output) if args.output else Path("fpaste-export")
    out_dir.mkdir(parents=True, exist_ok=True)

    # Load key file
    keys: dict[str, str] = {}
    if args.keyfile:
        keyfile_path = Path(args.keyfile)
        if not keyfile_path.exists():
            die(f"key file not found: {args.keyfile}")
        for line in keyfile_path.read_text().splitlines():
            line = line.strip()
            if line and not line.startswith("#") and "=" in line:
                paste_id, key_encoded = line.split("=", 1)
                keys[paste_id.strip()] = key_encoded.strip()

    # Fetch paste list
    server = config["server"].rstrip("/")
    status, body, _ = request(
        f"{server}/pastes?limit=1000",
        headers=auth_headers(config),
        ssl_context=config.get("ssl_context"),
    )
    if status == 401:
        die("authentication failed")
    if status != 200:
        die(f"failed to list pastes ({status})")

    pastes = json.loads(body).get("pastes", [])
    if not pastes:
        print("no pastes to export")
        return

    exported, skipped, errors = 0, 0, 0
    manifest: list[dict[str, Any]] = []

    for paste in pastes:
        paste_id = paste["id"]
        mime_type = paste.get("mime_type", "application/octet-stream")
        if not args.quiet:
            print(f"exporting {paste_id}...", end=" ", file=sys.stderr)

        # Refuse IDs that would escape out_dir when used as a file name.
        id_text = str(paste_id)
        if not id_text or id_text in {".", ".."} or "/" in id_text or "\\" in id_text:
            if not args.quiet:
                print("skipped (unsafe paste id)", file=sys.stderr)
            skipped += 1
            continue

        if paste.get("burn_after_read"):
            if not args.quiet:
                print("skipped (burn-after-read)", file=sys.stderr)
            skipped += 1
            continue
        if paste.get("password_protected"):
            if not args.quiet:
                print("skipped (password-protected)", file=sys.stderr)
            skipped += 1
            continue

        status, content, _ = request(
            f"{server}/{paste_id}/raw",
            headers=auth_headers(config),
            ssl_context=config.get("ssl_context"),
        )
        if status != 200:
            if not args.quiet:
                print(f"error ({status})", file=sys.stderr)
            errors += 1
            continue

        decrypted = False
        if paste_id in keys:
            try:
                key = decode_key(keys[paste_id])
                content = decrypt_content(content, key)
                decrypted = True
            except SystemExit:
                # decode_key()/decrypt_content() report failure through
                # die(); for an export that is downgraded to a note and the
                # encrypted bytes are saved instead.
                if not args.quiet:
                    print("decryption failed, keeping encrypted", file=sys.stderr, end=" ")

        filename = f"{paste_id}{get_extension_for_mime(mime_type)}"
        (out_dir / filename).write_bytes(content)
        manifest.append(
            {
                "id": paste_id,
                "filename": filename,
                "mime_type": mime_type,
                "size": len(content),
                "created_at": paste.get("created_at"),
                "decrypted": decrypted,
                "encrypted": paste_id in keys and not decrypted,
            }
        )
        if not args.quiet:
            status_msg = "decrypted" if decrypted else ("encrypted" if paste_id in keys else "ok")
            print(status_msg, file=sys.stderr)
        exported += 1

    if args.manifest:
        manifest_path = out_dir / "manifest.json"
        manifest_path.write_text(json.dumps(manifest, indent=2))
        if not args.quiet:
            print(f"manifest: {manifest_path}", file=sys.stderr)

    print(f"\nexported: {exported}, skipped: {skipped}, errors: {errors}")
    print(f"output: {out_dir}")
def cmd_pki_status(args: argparse.Namespace, config: dict[str, Any]) -> None:
    """Print whether PKI is enabled and, if a CA exists, its details.

    Raises:
        SystemExit: Via ``die()`` when the ``/pki`` endpoint answers 404
            (PKI not enabled) or any other non-200 status.
    """
    server = config["server"].rstrip("/")
    status, body, _ = request(server + "/pki", ssl_context=config.get("ssl_context"))
    if status == 404:
        die("PKI not enabled on this server")
    if status != 200:
        die(f"failed to get PKI status ({status})")

    info = json.loads(body)
    print(f"pki enabled: {info.get('enabled', False)}")
    print(f"ca exists: {info.get('ca_exists', False)}")

    if not info.get("ca_exists"):
        # Without a CA the server may explain how to create one.
        hint = info.get("hint")
        if hint:
            print(f"hint: {hint}")
        return

    print(f"common name: {info.get('common_name', 'unknown')}")
    print(f"fingerprint: {info.get('fingerprint_sha1', 'unknown')}")
    for field, label in (("created_at", "created: "), ("expires_at", "expires: ")):
        if info.get(field):
            print(f"{label}{info.get(field)}")
    print(f"download: {server}{info.get('download', '/pki/ca.crt')}")
def cmd_pki_issue(args: argparse.Namespace, config: dict[str, Any]) -> None:
    """Request a new client certificate from the server CA.

    The returned key and certificate are stored as ``client.key`` and
    ``client.crt`` in ``args.output`` (default: the config directory).
    Existing files are only overwritten with ``--force``. With
    ``args.configure`` the config file is pointed at the new files. The
    certificate fingerprint is printed on stdout, details on stderr.

    The private key file is created with mode 0600 from the start, so it is
    never readable by other users, not even between write and chmod.

    Raises:
        SystemExit: Via ``die()`` when the server refuses the request or an
            output file already exists without ``--force``.
    """
    url = config["server"].rstrip("/") + "/pki/issue"
    headers = {"Content-Type": "application/json", **auth_headers(config)}
    payload = json.dumps({"common_name": args.name}).encode()
    status, body, _ = request(
        url, method="POST", data=payload, headers=headers, ssl_context=config.get("ssl_context")
    )
    if status == 404:
        die(parse_error(body, "PKI not available"))
    if status == 400:
        die(parse_error(body, "bad request"))
    if status != 201:
        die(f"certificate issuance failed ({status})")

    result = json.loads(body)
    # Read both PEM fields before touching the disk so that an incomplete
    # response cannot leave a key file without its certificate.
    private_key_pem = result["private_key_pem"]
    certificate_pem = result["certificate_pem"]

    out_dir = Path(args.output) if args.output else CONFIG_DIR
    out_dir.mkdir(parents=True, exist_ok=True)
    key_file = out_dir / "client.key"
    cert_file = out_dir / "client.crt"
    if not args.force:
        if key_file.exists():
            die(f"key file exists: {key_file} (use --force)")
        if cert_file.exists():
            die(f"cert file exists: {cert_file} (use --force)")

    # Create the key owner-only atomically instead of write-then-chmod.
    key_fd = os.open(key_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(key_fd, "w") as key_handle:
        key_handle.write(private_key_pem)
    # The creation mode is ignored for a pre-existing file (--force), so
    # tighten that case explicitly.
    key_file.chmod(0o600)
    cert_file.write_text(certificate_pem)

    fingerprint = result.get("fingerprint_sha1", "unknown")
    print(f"key: {key_file}", file=sys.stderr)
    print(f"certificate: {cert_file}", file=sys.stderr)
    print(f"fingerprint: {fingerprint}", file=sys.stderr)
    print(f"serial: {result.get('serial', 'unknown')}", file=sys.stderr)
    print(f"common name: {result.get('common_name', args.name)}", file=sys.stderr)

    if args.configure:
        cfg_path = write_config_file(
            {
                "client_cert": str(cert_file),
                "client_key": str(key_file),
                "cert_sha1": fingerprint,
            }
        )
        print(f"config: {cfg_path} (updated)", file=sys.stderr)
    print(fingerprint)
def cmd_pki_download(args: argparse.Namespace, config: dict[str, Any]) -> None:
    """Download the CA certificate from the server.

    Without ``-o`` the raw response body is written to stdout. With ``-o`` the
    body is saved to that path, its SHA-1 fingerprint is reported on stderr
    (when the ``cryptography`` package is available) and, if ``--configure``
    is set, the path is recorded as ``ca_cert`` in the config file.

    Terminates through ``die`` when ``--configure`` is used without ``-o``,
    when the server does not return 200, or when the saved data cannot be
    parsed as a PEM certificate.
    """
    # --configure needs a file path to record; fail loudly instead of silently
    # ignoring the flag.
    if args.configure and not args.output:
        die("--configure requires -o/--output")

    url = config["server"].rstrip("/") + "/pki/ca.crt"
    status, body, _ = request(url, ssl_context=config.get("ssl_context"))
    if status == 404:
        die("CA certificate not available (PKI disabled or CA not generated)")
    if status != 200:
        die(f"failed to download CA certificate ({status})")

    if not args.output:
        sys.stdout.buffer.write(body)
        return

    out_path = Path(args.output)
    out_path.write_bytes(body)
    print(f"saved: {out_path}", file=sys.stderr)

    if HAS_CRYPTO:
        try:
            cert = x509.load_pem_x509_certificate(body)
        except ValueError:
            # Report unparsable data as a clean error rather than a traceback.
            die(f"downloaded file is not a valid PEM certificate: {out_path}")
        fp = hashlib.sha1(
            cert.public_bytes(serialization.Encoding.DER), usedforsecurity=False
        ).hexdigest()
        print(f"fingerprint: {fp}", file=sys.stderr)

    if args.configure:
        cfg_path = write_config_file({"ca_cert": str(out_path)})
        print(f"config: {cfg_path} (updated)", file=sys.stderr)
def cmd_register(args: argparse.Namespace, config: dict[str, Any]) -> None:
    """Register and obtain a client certificate from the server.

    POSTs to ``/register`` (solving a proof-of-work challenge first when the
    server hands one out) and saves the returned PKCS#12 bundle as
    ``client.p12``. Unless ``--p12-only`` is set, the bundle is also unpacked
    into ``client.key`` and ``client.crt``. Details go to stderr; only the
    fingerprint taken from the ``X-Fingerprint-SHA1`` response header is
    printed on stdout.

    Terminates through ``die`` when ``cryptography`` is missing, when an
    output file already exists (unless ``--force``), when the server does not
    return 200, or when the bundle cannot be parsed.
    """
    if not HAS_CRYPTO:
        die("register requires 'cryptography' package: pip install cryptography")

    from cryptography.hazmat.primitives.serialization import pkcs12

    out_dir = Path(args.output) if args.output else CONFIG_DIR
    p12_file = out_dir / "client.p12"
    key_file = out_dir / "client.key"
    cert_file = out_dir / "client.crt"

    # Check for existing files up front so we neither burn CPU on the
    # proof-of-work nor register a certificate that would then be discarded.
    if not args.force:
        if p12_file.exists():
            die(f"p12 file exists: {p12_file} (use --force)")
        if not args.p12_only:
            if key_file.exists():
                die(f"key file exists: {key_file} (use --force)")
            if cert_file.exists():
                die(f"cert file exists: {cert_file} (use --force)")

    url = config["server"].rstrip("/") + "/register"
    headers = {"Content-Type": "application/json"}
    payload: dict[str, str] = {}
    if args.name:
        payload["common_name"] = args.name

    if challenge := get_challenge(config, endpoint="/register/challenge"):
        if not args.quiet:
            print(f"solving pow ({challenge['difficulty']} bits)...", end="", file=sys.stderr)
        solution = solve_pow(challenge["nonce"], challenge["difficulty"])
        if not args.quiet:
            print(" done", file=sys.stderr)
        headers["X-PoW-Token"] = challenge["token"]
        headers["X-PoW-Solution"] = str(solution)

    data = json.dumps(payload).encode() if payload else b"{}"
    status, body, resp_headers = request(
        url, method="POST", data=data, headers=headers, ssl_context=config.get("ssl_context")
    )
    if status == 400:
        die(parse_error(body, "bad request"))
    if status == 500:
        die(parse_error(body, "server error"))
    if status != 200:
        die(f"registration failed ({status})")

    fingerprint = resp_headers.get("X-Fingerprint-SHA1", "unknown")
    out_dir.mkdir(parents=True, exist_ok=True)

    # The bundle contains the private key: restrict the file to the owner
    # before any secret bytes are written (touch creates with 0600, chmod
    # tightens a pre-existing file when --force is used).
    p12_file.touch(mode=0o600)
    p12_file.chmod(0o600)
    p12_file.write_bytes(body)
    print(f"pkcs12: {p12_file}", file=sys.stderr)

    if not args.p12_only:
        try:
            private_key, certificate, _ = pkcs12.load_key_and_certificates(body, None)
        except ValueError:
            # Malformed bundle: report it the same way as an empty one.
            die("failed to parse PKCS#12 bundle")
        if private_key is None or certificate is None:
            die("failed to parse PKCS#12 bundle")

        key_pem = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )
        cert_pem = certificate.public_bytes(serialization.Encoding.PEM)

        # Same owner-only-before-write rule for the unencrypted PEM key.
        key_file.touch(mode=0o600)
        key_file.chmod(0o600)
        key_file.write_bytes(key_pem)
        cert_file.write_bytes(cert_pem)
        print(f"key: {key_file}", file=sys.stderr)
        print(f"certificate: {cert_file}", file=sys.stderr)

        cn = certificate.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
        if cn:
            cn_value = cn[0].value
            if isinstance(cn_value, bytes):
                cn_value = cn_value.decode("utf-8", errors="replace")
            print(f"common name: {cn_value}", file=sys.stderr)

    print(f"fingerprint: {fingerprint}", file=sys.stderr)

    # The config can only reference PEM files, so --p12-only skips this step.
    if args.configure and not args.p12_only:
        cfg_path = write_config_file(
            {
                "client_cert": str(cert_file),
                "client_key": str(key_file),
                "cert_sha1": fingerprint,
            }
        )
        print(f"config: {cfg_path} (updated)", file=sys.stderr)

    # stdout carries only the fingerprint so the command can be scripted.
    print(fingerprint)
def cmd_cert(args: argparse.Namespace, config: dict[str, Any]) -> None:
    """Generate a self-signed client certificate for mTLS authentication.

    Creates an RSA or EC private key, signs a client-auth certificate with it
    using SHA-256 and writes ``client.key`` / ``client.crt`` into
    ``args.output`` (or ``CONFIG_DIR``). Details go to stderr; only the SHA-1
    fingerprint of the DER-encoded certificate is printed on stdout.

    Terminates through ``die`` when ``cryptography`` is missing, when an
    output file already exists (unless ``--force``), or when the algorithm,
    curve or validity period is not usable.
    """
    if not HAS_CRYPTO:
        die("certificate generation requires 'cryptography' package: pip install cryptography")

    out_dir = Path(args.output) if args.output else CONFIG_DIR
    out_dir.mkdir(parents=True, exist_ok=True)
    key_file = out_dir / "client.key"
    cert_file = out_dir / "client.crt"
    if not args.force:
        if key_file.exists():
            die(f"key file exists: {key_file} (use --force)")
        if cert_file.exists():
            die(f"cert file exists: {cert_file} (use --force)")

    # Validate the validity period before the (potentially slow) key
    # generation; a non-positive value would otherwise surface later as an
    # exception from the certificate builder.
    days = args.days or 365
    if days < 1:
        die(f"invalid validity period: {days} (must be at least 1 day)")

    # Generate private key
    private_key: rsa.RSAPrivateKey | ec.EllipticCurvePrivateKey
    if args.algorithm == "rsa":
        key_size = args.bits or 4096
        print(f"generating {key_size}-bit RSA key...", file=sys.stderr)
        private_key = rsa.generate_private_key(public_exponent=65537, key_size=key_size)
    elif args.algorithm == "ec":
        curve_name = args.curve or "secp384r1"
        curves = {
            "secp256r1": ec.SECP256R1(),
            "secp384r1": ec.SECP384R1(),
            "secp521r1": ec.SECP521R1(),
        }
        if curve_name not in curves:
            die(f"unsupported curve: {curve_name} (use: secp256r1, secp384r1, secp521r1)")
        print(f"generating EC key ({curve_name})...", file=sys.stderr)
        private_key = ec.generate_private_key(curves[curve_name])
    else:
        die(f"unsupported algorithm: {args.algorithm}")

    # Fall back to the fixed name when $USER is unset *or* empty; an empty
    # common name is not a usable attribute value.
    cn = args.name or os.environ.get("USER") or "fpaste-client"

    # Self-signed: subject and issuer are the same name.
    subject = issuer = x509.Name(
        [
            x509.NameAttribute(NameOID.COMMON_NAME, cn),
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, "FlaskPaste Client"),
        ]
    )
    now = datetime.now(UTC)
    cert_builder = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .public_key(private_key.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(now)
        .not_valid_after(now + timedelta(days=days))
        .add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True)
        .add_extension(
            x509.KeyUsage(
                digital_signature=True,
                key_encipherment=True,
                content_commitment=False,
                data_encipherment=False,
                key_agreement=False,
                key_cert_sign=False,
                crl_sign=False,
                encipher_only=False,
                decipher_only=False,
            ),
            critical=True,
        )
        .add_extension(
            x509.ExtendedKeyUsage([x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH]),
            critical=False,
        )
    )

    print("signing certificate...", file=sys.stderr)
    certificate = cert_builder.sign(private_key, hashes.SHA256())
    cert_der = certificate.public_bytes(serialization.Encoding.DER)
    fingerprint = hashlib.sha1(cert_der, usedforsecurity=False).hexdigest()

    key_encryption = (
        serialization.BestAvailableEncryption(args.password_key.encode())
        if args.password_key
        else serialization.NoEncryption()
    )
    key_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=key_encryption,
    )
    cert_pem = certificate.public_bytes(serialization.Encoding.PEM)

    # Restrict the key file to the owner before the key material is written:
    # touch() creates a missing file with 0600, chmod() tightens a file that
    # already existed (the --force case).
    key_file.touch(mode=0o600)
    key_file.chmod(0o600)
    key_file.write_bytes(key_pem)
    cert_file.write_bytes(cert_pem)

    print(f"key: {key_file}", file=sys.stderr)
    print(f"certificate: {cert_file}", file=sys.stderr)
    print(f"fingerprint: {fingerprint}", file=sys.stderr)
    print(f"valid for: {days} days", file=sys.stderr)
    print(f"common name: {cn}", file=sys.stderr)

    if args.configure:
        cfg_path = write_config_file(
            {
                "client_cert": str(cert_file),
                "client_key": str(key_file),
                "cert_sha1": fingerprint,
            }
        )
        print(f"config: {cfg_path} (updated)", file=sys.stderr)

    # stdout carries only the fingerprint so the command can be scripted.
    print(fingerprint)
# -----------------------------------------------------------------------------
# Argument parsing
# -----------------------------------------------------------------------------
def is_file_path(arg: str) -> bool:
    """Heuristically decide whether *arg* names a file.

    Options (leading ``-``) and empty strings never count. Anything that
    exists on disk or contains a path separator does. Otherwise the text
    after the last dot is looked up in ``FILE_EXTENSIONS``, except for names
    that start with a dot.
    """
    if not arg or arg[0] == "-":
        return False

    on_disk_or_pathlike = Path(arg).exists() or any(sep in arg for sep in ("/", "\\"))
    if on_disk_or_pathlike:
        return True

    _, dot, extension = arg.rpartition(".")
    if not dot or arg[0] == ".":
        return False
    return extension.lower() in FILE_EXTENSIONS
def build_parser() -> argparse.ArgumentParser:
    """Build and return the argument parser.

    The top-level parser takes ``-s/--server`` and one sub-command (stored in
    ``command``); ``pki`` has its own nested sub-commands (stored in
    ``pki_command``).
    """

    def switch(target: argparse.ArgumentParser, *names: str, text: str) -> None:
        """Register a boolean on/off option."""
        target.add_argument(*names, action="store_true", help=text)

    def number(target: argparse.ArgumentParser, *names: str, metavar: str, text: str) -> None:
        """Register an option that takes an integer value."""
        target.add_argument(*names, type=int, metavar=metavar, help=text)

    def value(target: argparse.ArgumentParser, *names: str, metavar: str, text: str) -> None:
        """Register an option that takes a free-form string value."""
        target.add_argument(*names, metavar=metavar, help=text)

    root = argparse.ArgumentParser(
        prog="fpaste",
        description="FlaskPaste command-line client",
        epilog="Shortcut: fpaste <file> is equivalent to fpaste create <file>",
    )
    root.add_argument("-s", "--server", help="server URL (env: FLASKPASTE_SERVER)")
    commands = root.add_subparsers(dest="command", metavar="command")

    # create
    create = commands.add_parser("create", aliases=["c", "new"], help="create paste")
    create.add_argument("file", nargs="?", help="file to upload (- for stdin)")
    switch(create, "-E", "--no-encrypt", text="disable encryption")
    switch(create, "-b", "--burn", text="burn after read")
    number(create, "-x", "--expiry", metavar="SEC", text="expiry in seconds")
    value(create, "-p", "--password", metavar="PASS", text="password protect")
    switch(create, "-r", "--raw", text="output raw URL")
    switch(create, "-q", "--quiet", text="output ID only")
    switch(create, "-C", "--clipboard", text="read from clipboard")
    switch(create, "--copy-url", text="copy result URL to clipboard")

    # get
    get = commands.add_parser("get", aliases=["g"], help="retrieve paste")
    get.add_argument("id", help="paste ID or URL")
    get.add_argument("-o", "--output", help="save to file")
    switch(get, "-c", "--copy", text="copy content to clipboard")
    value(get, "-p", "--password", metavar="PASS", text="password for protected paste")
    switch(get, "-m", "--meta", text="show metadata only")

    # delete
    delete = commands.add_parser("delete", aliases=["d", "rm"], help="delete paste(s)")
    delete.add_argument("ids", nargs="*", metavar="ID", help="paste ID(s) or URL(s)")
    switch(delete, "-a", "--all", text="delete all pastes (admin)")
    number(delete, "-c", "--confirm", metavar="N", text="confirm expected delete count")

    # info
    commands.add_parser("info", aliases=["i"], help="show server info")

    # list
    listing = commands.add_parser("list", aliases=["ls"], help="list your pastes")
    switch(listing, "-a", "--all", text="list all pastes (admin only)")
    number(listing, "-l", "--limit", metavar="N", text="max pastes (default: 50)")
    number(listing, "-o", "--offset", metavar="N", text="skip first N pastes")
    switch(listing, "--json", text="output as JSON")

    # search
    search = commands.add_parser("search", aliases=["s", "find"], help="search your pastes")
    value(search, "-t", "--type", metavar="PATTERN", text="filter by MIME type (image/*)")
    value(search, "--after", metavar="DATE", text="created after (YYYY-MM-DD or timestamp)")
    value(search, "--before", metavar="DATE", text="created before (YYYY-MM-DD)")
    number(search, "-l", "--limit", metavar="N", text="max results (default: 50)")
    switch(search, "--json", text="output as JSON")

    # update
    update = commands.add_parser("update", aliases=["u"], help="update existing paste")
    update.add_argument("id", help="paste ID or URL")
    update.add_argument("file", nargs="?", help="new content (- for stdin)")
    switch(update, "-E", "--no-encrypt", text="disable encryption")
    value(update, "-p", "--password", metavar="PASS", text="set/change password")
    switch(update, "--remove-password", text="remove password")
    number(update, "-x", "--expiry", metavar="SEC", text="extend expiry (seconds)")
    switch(update, "-q", "--quiet", text="minimal output")

    # export
    export = commands.add_parser("export", help="export all pastes to directory")
    value(export, "-o", "--output", metavar="DIR", text="output directory")
    value(export, "-k", "--keyfile", metavar="FILE", text="key file (paste_id=key format)")
    switch(export, "--manifest", text="write manifest.json")
    switch(export, "-q", "--quiet", text="minimal output")

    # register
    register = commands.add_parser("register", help="register and get client certificate")
    value(register, "-n", "--name", metavar="CN", text="common name (optional)")
    value(register, "-o", "--output", metavar="DIR", text="output directory")
    switch(register, "--configure", text="update config file")
    switch(register, "--p12-only", text="save only PKCS#12")
    switch(register, "-f", "--force", text="overwrite existing files")
    switch(register, "-q", "--quiet", text="minimal output")

    # cert
    cert = commands.add_parser("cert", help="generate client certificate")
    value(cert, "-o", "--output", metavar="DIR", text="output directory")
    cert.add_argument(
        "-a", "--algorithm", choices=["rsa", "ec"], default="ec", help="key algorithm"
    )
    number(cert, "-b", "--bits", metavar="N", text="RSA key size (default: 4096)")
    value(cert, "-c", "--curve", metavar="CURVE", text="EC curve (secp256r1/secp384r1/secp521r1)")
    number(cert, "-d", "--days", metavar="N", text="validity period in days")
    value(cert, "-n", "--name", metavar="CN", text="common name (default: $USER)")
    value(cert, "--password-key", metavar="PASS", text="encrypt private key")
    switch(cert, "--configure", text="update config file")
    switch(cert, "-f", "--force", text="overwrite existing files")

    # pki (nested sub-commands)
    pki = commands.add_parser("pki", help="PKI operations (server-issued certificates)")
    pki_commands = pki.add_subparsers(dest="pki_command", metavar="subcommand")
    pki_commands.add_parser("status", help="show PKI status and CA info")

    pki_issue = pki_commands.add_parser("issue", help="request certificate from server CA")
    pki_issue.add_argument(
        "-n", "--name", required=True, metavar="CN", help="common name (required)"
    )
    value(pki_issue, "-o", "--output", metavar="DIR", text="output directory")
    switch(pki_issue, "--configure", text="update config file")
    switch(pki_issue, "-f", "--force", text="overwrite existing files")

    pki_download = pki_commands.add_parser(
        "download", aliases=["dl"], help="download CA certificate"
    )
    value(pki_download, "-o", "--output", metavar="FILE", text="save to file")
    switch(pki_download, "--configure", text="update config file (requires -o)")

    # completion
    completion = commands.add_parser("completion", help="generate shell completion")
    completion.add_argument(
        "--shell", choices=["bash", "zsh", "fish"], default="bash", help="shell type"
    )

    return root
def cmd_completion(args: argparse.Namespace, config: dict[str, Any]) -> None:
    """Print the completion script for the requested shell to stdout.

    The shell is taken from ``args.shell`` and falls back to bash when the
    attribute is missing or empty. An unknown shell name ends the program
    through ``die``. ``config`` is accepted for a uniform handler signature
    and is not used.
    """
    requested = getattr(args, "shell", None) or "bash"

    # Bash: commands, pki subcommands and per-command options
    bash_script = """\
# Bash completion for fpaste
# Install: source this file or copy to /etc/bash_completion.d/fpaste
_fpaste_completions() {
local cur prev words cword
_init_completion || return
local commands="create c new get g delete d rm info i list ls"
commands+=" search s find update u export register cert pki completion"
local pki_commands="status issue download dl"
if [[ $cword -eq 1 ]]; then
COMPREPLY=($(compgen -W "$commands" -- "$cur"))
return
fi
local cmd="${words[1]}"
if [[ "$cmd" == "pki" && $cword -eq 2 ]]; then
COMPREPLY=($(compgen -W "$pki_commands" -- "$cur"))
return
fi
case "$cmd" in
create|c|new)
local opts="-E --no-encrypt -b --burn -x --expiry -p --password"
opts+=" -r --raw -q --quiet -C --clipboard --copy-url"
[[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur")) || _filedir
;;
get|g)
local opts="-o --output -c --copy -p --password -m --meta"
[[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur"))
;;
delete|d|rm)
[[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "-a --all -c --confirm" -- "$cur"))
;;
list|ls)
local opts="-a --all -l --limit -o --offset --json"
[[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur"))
;;
search|s|find)
local opts="-t --type --after --before -l --limit --json"
[[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur"))
;;
update|u)
local opts="-E --no-encrypt -p --password --remove-password -x --expiry -q --quiet"
[[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur")) || _filedir
;;
export)
local opts="-o --output -k --keyfile --manifest -q --quiet"
[[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur"))
;;
register)
local opts="-n --name -o --output --configure --p12-only -f --force -q --quiet"
[[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur"))
;;
cert)
local opts="-o --output -a --algorithm -b --bits -c --curve"
opts+=" -d --days -n --name --password-key --configure -f --force"
[[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur"))
;;
completion)
[[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "--shell" -- "$cur"))
[[ "$prev" == "--shell" ]] && COMPREPLY=($(compgen -W "bash zsh fish" -- "$cur"))
;;
esac
}
complete -F _fpaste_completions fpaste
"""

    # Zsh: compact _arguments specs (the doubled backslashes are Python
    # escapes that emit a single trailing backslash per line)
    zsh_script = """\
#compdef fpaste
_fpaste() {
local curcontext="$curcontext" state line
typeset -A opt_args
_arguments -C '-s[Server]:url:' '--server[Server]:url:' '1: :->cmd' '*:: :->args'
case $state in
cmd)
local cmds=('create:Create paste' 'get:Get paste' 'delete:Delete'
'info:Server info' 'list:List pastes' 'search:Search'
'update:Update paste' 'export:Export' 'register:Register'
'cert:Generate cert' 'pki:PKI ops' 'completion:Completions')
_describe -t commands 'commands' cmds
;;
args)
case $line[1] in
create|c|new)
_arguments '-E[No encrypt]' '-b[Burn]' '-x[Expiry]:sec:' \\
'-p[Pass]:p:' '-r[Raw]' '-q[Quiet]' '-C[Clipboard]' \\
'--copy-url' '*:file:_files' ;;
get|g)
_arguments '-o[Out]:f:_files' '-c[Copy]' '-p[Pass]:p:' \\
'-m[Meta]' '1:ID:' ;;
delete|d|rm) _arguments '-a[All]' '-c[Confirm]:n:' '*:ID:' ;;
list|ls) _arguments '-a[All]' '-l[Limit]:n:' '-o[Off]:n:' '--json' ;;
search|s|find)
_arguments '-t[Type]:p:' '--after:d:' '--before:d:' \\
'-l[Limit]:n:' '--json' ;;
update|u)
_arguments '-E[No encrypt]' '-p[Pass]:p:' '--remove-password' \\
'-x[Expiry]:s:' '-q[Quiet]' '1:ID:' '*:file:_files' ;;
export)
_arguments '-o[Dir]:d:_files -/' '-k[Keys]:f:_files' \\
'--manifest' '-q[Quiet]' ;;
register)
_arguments '-n[Name]:cn:' '-o[Dir]:d:_files -/' '--configure' \\
'--p12-only' '-f[Force]' '-q[Quiet]' ;;
cert)
_arguments '-o[Dir]:d:_files -/' '-a[Algo]:(rsa ec)' \\
'-b[Bits]:n:' '-c[Curve]:(secp256r1 secp384r1 secp521r1)' \\
'-d[Days]:n:' '-n[Name]:cn:' '--configure' '-f[Force]' ;;
pki) (( CURRENT == 2 )) && _describe 'cmd' '(status issue download)' ;;
completion) _arguments '--shell:(bash zsh fish)' ;;
esac ;;
esac
}
_fpaste "$@"
"""

    # Fish: one `complete` line per command / option
    fish_script = """\
# Fish completion for fpaste
complete -c fpaste -f
complete -c fpaste -n __fish_use_subcommand -a 'create c new' -d 'Create'
complete -c fpaste -n __fish_use_subcommand -a 'get g' -d 'Get paste'
complete -c fpaste -n __fish_use_subcommand -a 'delete d rm' -d 'Delete'
complete -c fpaste -n __fish_use_subcommand -a 'info i' -d 'Server info'
complete -c fpaste -n __fish_use_subcommand -a 'list ls' -d 'List'
complete -c fpaste -n __fish_use_subcommand -a 'search s find' -d 'Search'
complete -c fpaste -n __fish_use_subcommand -a 'update u' -d 'Update'
complete -c fpaste -n __fish_use_subcommand -a 'export' -d 'Export'
complete -c fpaste -n __fish_use_subcommand -a 'register' -d 'Register'
complete -c fpaste -n __fish_use_subcommand -a 'cert' -d 'Gen cert'
complete -c fpaste -n __fish_use_subcommand -a 'pki' -d 'PKI'
complete -c fpaste -n __fish_use_subcommand -a 'completion' -d 'Completions'
set -l cr '__fish_seen_subcommand_from create c new'
complete -c fpaste -n $cr -s E -l no-encrypt -d 'No encrypt'
complete -c fpaste -n $cr -s b -l burn -d 'Burn'
complete -c fpaste -n $cr -s x -l expiry -d 'Expiry' -x
complete -c fpaste -n $cr -s p -l password -d 'Password' -x
complete -c fpaste -n $cr -s r -l raw -d 'Raw URL'
complete -c fpaste -n $cr -s q -l quiet -d 'Quiet'
complete -c fpaste -n $cr -s C -l clipboard -d 'Clipboard'
complete -c fpaste -n $cr -l copy-url -d 'Copy URL'
complete -c fpaste -n $cr -F
set -l gt '__fish_seen_subcommand_from get g'
complete -c fpaste -n $gt -s o -l output -d 'Output' -r
complete -c fpaste -n $gt -s c -l copy -d 'Copy'
complete -c fpaste -n $gt -s p -l password -d 'Password' -x
complete -c fpaste -n $gt -s m -l meta -d 'Metadata'
set -l dl '__fish_seen_subcommand_from delete d rm'
complete -c fpaste -n $dl -s a -l all -d 'All'
complete -c fpaste -n $dl -s c -l confirm -d 'Confirm' -x
set -l ls '__fish_seen_subcommand_from list ls'
complete -c fpaste -n $ls -s a -l all -d 'All'
complete -c fpaste -n $ls -s l -l limit -d 'Limit' -x
complete -c fpaste -n $ls -s o -l offset -d 'Offset' -x
complete -c fpaste -n $ls -l json -d 'JSON'
set -l cp '__fish_seen_subcommand_from completion'
complete -c fpaste -n $cp -l shell -d 'Shell' -xa 'bash zsh fish'
"""

    script = {"bash": bash_script, "zsh": zsh_script, "fish": fish_script}.get(requested)
    if script is None:
        die(f"unsupported shell: {requested} (use: bash, zsh, fish)")
    print(script)
# Command dispatch tables: every name and alias accepted on the command line
# maps to the handler that implements it. Aliases are grouped per handler and
# expanded in order, so iteration order matches the listing below.
COMMANDS: dict[str, Any] = {
    alias: handler
    for aliases, handler in (
        (("create", "c", "new"), cmd_create),
        (("get", "g"), cmd_get),
        (("delete", "d", "rm"), cmd_delete),
        (("info", "i"), cmd_info),
        (("list", "ls"), cmd_list),
        (("search", "s", "find"), cmd_search),
        (("update", "u"), cmd_update),
        (("export",), cmd_export),
        (("register",), cmd_register),
        (("cert",), cmd_cert),
        (("completion",), cmd_completion),
    )
    for alias in aliases
}

# Sub-commands of `fpaste pki`.
PKI_COMMANDS: dict[str, Any] = {
    alias: handler
    for aliases, handler in (
        (("status",), cmd_pki_status),
        (("issue",), cmd_pki_issue),
        (("download", "dl"), cmd_pki_download),
    )
    for alias in aliases
}
def main() -> None:
    """Main entry point.

    Pre-scans ``sys.argv`` so that ``fpaste <file>`` and piped input behave
    like ``fpaste create ...``, parses the arguments, builds the runtime
    config (server override and SSL context) and dispatches to the handler in
    ``COMMANDS`` / ``PKI_COMMANDS``.
    """
    args_to_parse = sys.argv[1:]
    command_names = set(COMMANDS) | {"pki"}

    # Auto-insert "create" if first positional looks like a file.
    # insert_pos tracks the index just past the leading *global* options, which
    # must stay in front of the sub-command for argparse to accept them.
    insert_pos = 0
    has_command = False
    file_pos = -1
    i = 0
    while i < len(args_to_parse):
        arg = args_to_parse[i]
        if arg in ("-s", "--server"):
            # Option and its value are two separate arguments.
            insert_pos = i + 2
            i += 2
            continue
        if arg in ("-h", "--help") or arg.startswith(("--server=", "-s")):
            # Single-token global options, including the attached-value forms
            # "--server=URL" and "-sURL" (the bare "-s" is handled above).
            i += 1
            insert_pos = i
            continue
        if arg.startswith("-"):
            # Any other option belongs to the (implicit) sub-command.
            i += 1
            continue
        if arg in command_names:
            has_command = True
            break
        if is_file_path(arg):
            file_pos = i
            break
        i += 1

    if not has_command and (file_pos >= 0 or not sys.stdin.isatty()):
        args_to_parse.insert(insert_pos, "create")

    parser = build_parser()
    args = parser.parse_args(args_to_parse)

    config = get_config()
    if args.server:
        config["server"] = args.server
    config["ssl_context"] = create_ssl_context(config)

    if not args.command:
        if sys.stdin.isatty():
            parser.print_help()
            sys.exit(0)
        # Defensive fallback for piped input that reached this point without a
        # sub-command: emulate a bare "create" with every option the create
        # parser defines set to its default.
        args.command = "create"
        args.file = None
        args.no_encrypt = False
        args.burn = False
        args.expiry = None
        args.password = None
        args.raw = False
        args.quiet = False
        args.clipboard = False
        args.copy_url = False

    if args.command == "pki":
        if args.pki_command in PKI_COMMANDS:
            PKI_COMMANDS[args.pki_command](args, config)
        else:
            # No/unknown pki sub-command: show the pki help (argparse exits).
            parser.parse_args(["pki", "--help"])
    elif args.command in COMMANDS:
        COMMANDS[args.command](args, config)


if __name__ == "__main__":
    main()