forked from claw/flaskpaste
- Add endpoint config key (FLASKPASTE_ENDPOINT env var) - Add build_url() helper for URL construction - Change default server to https://paste.mymx.me - Support endpoint prefix in config file
1900 lines
65 KiB
Python
Executable File
1900 lines
65 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""FlaskPaste command-line client."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import base64
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import shutil
|
|
import ssl
|
|
import stat
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import urllib.error
|
|
import urllib.request
|
|
from datetime import UTC, datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING, Any, NoReturn
|
|
|
|
if TYPE_CHECKING:
|
|
from collections.abc import Mapping
|
|
|
|
# Optional cryptography support (for encryption and cert generation)
|
|
try:
|
|
from cryptography import x509
|
|
from cryptography.hazmat.primitives import hashes, serialization
|
|
from cryptography.hazmat.primitives.asymmetric import ec, rsa
|
|
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
|
from cryptography.x509.oid import NameOID
|
|
|
|
HAS_CRYPTO = True
|
|
except ImportError:
|
|
HAS_CRYPTO = False
|
|
|
|
# Constants
|
|
# Per-user configuration directory and the config file inside it.
CONFIG_DIR = Path.home() / ".config" / "fpaste"
CONFIG_FILE = CONFIG_DIR / "config"

# Keys honoured in the config file; lines using any other key are ignored.
CONFIG_KEYS = frozenset(
    ("server", "endpoint", "cert_sha1", "client_cert", "client_key", "ca_cert")
)

# MIME type -> file extension (leading dot included).
MIME_EXTENSIONS: dict[str, str] = {
    # Text
    "text/plain": ".txt",
    "text/html": ".html",
    "text/css": ".css",
    "text/javascript": ".js",
    "text/markdown": ".md",
    "text/x-python": ".py",
    # Structured data / generic application types
    "application/json": ".json",
    "application/xml": ".xml",
    "application/javascript": ".js",
    "application/octet-stream": ".bin",
    # Images
    "image/png": ".png",
    "image/jpeg": ".jpg",
    "image/gif": ".gif",
    "image/webp": ".webp",
    "image/svg+xml": ".svg",
    # Documents and archives
    "application/pdf": ".pdf",
    "application/zip": ".zip",
    "application/gzip": ".gz",
    "application/x-tar": ".tar",
}

# Known file extensions, stored without the leading dot.
FILE_EXTENSIONS = frozenset(
    """
    txt md py js json yaml yml xml html css
    sh bash c cpp h go rs java rb php
    sql log conf cfg ini
    png jpg jpeg gif pdf zip tar gz
    """.split()
)

# strptime() layouts tried in order by parse_date(): a date, optionally
# followed by a time part.
DATE_FORMATS = tuple(
    "%Y-%m-%d" + time_part
    for time_part in ("", " %H:%M", " %H:%M:%S", "T%H:%M:%S", "T%H:%M:%SZ")
)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Core utilities
|
|
# -----------------------------------------------------------------------------
|
|
|
|
|
|
def die(msg: str, code: int = 1) -> NoReturn:
    """Report a fatal error on stderr and terminate the process.

    Args:
        msg: Description of the failure; printed with an ``error: `` prefix.
        code: Exit status handed to the interpreter (default 1).

    Raises:
        SystemExit: Always.
    """
    print("error:", msg, file=sys.stderr)
    raise SystemExit(code)
|
|
|
|
|
|
def warn(msg: str) -> None:
    """Write *msg* to stderr with a ``warning: `` prefix; execution continues."""
    print("warning:", msg, file=sys.stderr)
|
|
|
|
|
|
def request(
    url: str,
    method: str = "GET",
    data: bytes | None = None,
    headers: dict[str, str] | None = None,
    ssl_context: ssl.SSLContext | None = None,
) -> tuple[int, bytes, dict[str, str]]:
    """Perform one HTTP request with a 30 second timeout.

    Args:
        url: Absolute URL to contact.
        method: HTTP verb.
        data: Optional request body.
        headers: Optional request headers; ``None`` means no extra headers.
        ssl_context: Optional TLS context passed to ``urlopen``.

    Returns:
        ``(status, body, response_headers)``. An HTTP error response is
        returned the same way (its code, body and headers) rather than raised.

    A ``URLError`` (e.g. connection failure) terminates the program via die().
    """
    outgoing = urllib.request.Request(  # noqa: S310
        url,
        data=data,
        headers=headers if headers else {},
        method=method,
    )

    try:
        with urllib.request.urlopen(  # noqa: S310  # nosec B310
            outgoing, timeout=30, context=ssl_context
        ) as response:
            payload = response.read()
            return response.status, payload, dict(response.headers)
    except urllib.error.HTTPError as http_err:
        # Error statuses still carry a body the callers want to inspect.
        return http_err.code, http_err.read(), dict(http_err.headers)
    except urllib.error.URLError as url_err:
        die(f"Connection failed: {url_err.reason}")
|
|
|
|
|
|
def parse_error(body: bytes, default: str = "request failed") -> str:
    """Extract the ``error`` message from a JSON response body.

    Args:
        body: Raw response body, expected to hold a JSON object.
        default: Message to use when no usable error text is present.

    Returns:
        The ``error`` field converted to ``str``; *default* when the body is
        not valid JSON, is not a JSON object, lacks the field, or the field
        is ``null``.
    """
    try:
        payload = json.loads(body)
    except (json.JSONDecodeError, UnicodeDecodeError):
        return default

    # Valid JSON that is not an object (list, string, number) has no ``.get``;
    # treat it like an unparseable body instead of raising AttributeError.
    if not isinstance(payload, dict):
        return default

    message = payload.get("error", default)
    return default if message is None else str(message)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Configuration
|
|
# -----------------------------------------------------------------------------
|
|
|
|
|
|
def check_config_permissions(path: Path) -> None:
    """CLI-003: Warn when the config file can be read by "other" users.

    Only the world-readable bit triggers a warning. Group-readable files are
    accepted silently, and a file that cannot be stat()ed is ignored.

    Args:
        path: Location of the config file to inspect.
    """
    try:
        file_mode = path.stat().st_mode
        world_readable = bool(file_mode & stat.S_IROTH)
        if world_readable:
            warn(f"config file {path} is world-readable (mode {stat.filemode(file_mode)})")
    except OSError:
        # File may not exist yet, or stat() was denied; nothing to report.
        return
|
|
|
|
|
|
def read_config_file(path: Path | None = None) -> dict[str, str]:
    """Parse ``key = value`` lines from the config file.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped. Keys are lower-cased and kept only if listed in CONFIG_KEYS;
    values have surrounding whitespace and quote characters removed.

    Args:
        path: File to read; defaults to CONFIG_FILE.

    Returns:
        Mapping of recognised keys to values (empty when the file is absent).
    """
    path = path or CONFIG_FILE
    settings: dict[str, str] = {}

    if not path.exists():
        return settings

    # CLI-003: Check file permissions before reading
    check_config_permissions(path)

    for raw_line in path.read_text().splitlines():
        entry = raw_line.strip()
        if not entry or entry.startswith("#"):
            continue
        name, separator, raw_value = entry.partition("=")
        if not separator:
            continue
        name = name.strip().lower()
        if name in CONFIG_KEYS:
            settings[name] = raw_value.strip().strip('"').strip("'")

    return settings
|
|
|
|
|
|
def write_config_file(
    updates: dict[str, str],
    path: Path | None = None,
) -> Path:
    """Merge *updates* into the config file and rewrite it.

    Existing recognised entries are preserved unless overridden, and the file
    is rewritten as sorted ``key = value`` lines. A file that does not exist
    yet is created with mode 0600 (further restricted by the umask), so a
    freshly written config is not world-readable. The mode of an existing
    file is left as it is.

    Args:
        updates: Key/value pairs to add or replace.
        path: File to write; defaults to CONFIG_FILE.

    Returns:
        The path that was written.
    """
    path = path or CONFIG_FILE
    path.parent.mkdir(parents=True, exist_ok=True)

    merged = read_config_file(path)
    merged.update(updates)

    # Create the file owner-only before writing. For an existing file touch()
    # only updates the timestamp, so a mode chosen by the user is kept.
    path.touch(mode=0o600, exist_ok=True)

    lines = [f"{key} = {value}" for key, value in sorted(merged.items())]
    path.write_text("\n".join(lines) + "\n")
    return path
|
|
|
|
|
|
def get_config() -> dict[str, Any]:
    """Load configuration from the environment, the config file and defaults.

    For every key the first non-empty value wins, in this order:
    environment variable, config file entry, built-in default. The built-in
    default is applied last so that a ``server`` entry in the config file is
    honoured when FLASKPASTE_SERVER is not set.

    Returns:
        Dict with the keys ``server``, ``endpoint``, ``cert_sha1``,
        ``client_cert``, ``client_key`` and ``ca_cert``.
    """
    # (config key, environment variable, built-in default)
    sources = (
        ("server", "FLASKPASTE_SERVER", "https://paste.mymx.me"),
        ("endpoint", "FLASKPASTE_ENDPOINT", ""),
        ("cert_sha1", "FLASKPASTE_CERT_SHA1", ""),
        ("client_cert", "FLASKPASTE_CLIENT_CERT", ""),
        ("client_key", "FLASKPASTE_CLIENT_KEY", ""),
        ("ca_cert", "FLASKPASTE_CA_CERT", ""),
    )

    # Config file values (lower priority than environment)
    file_config = read_config_file()

    config: dict[str, Any] = {}
    for key, env_var, default in sources:
        config[key] = os.environ.get(env_var) or file_config.get(key) or default

    return config
|
|
|
|
|
|
def create_ssl_context(config: Mapping[str, Any]) -> ssl.SSLContext | None:
    """Create an SSL context when a client certificate or custom CA is configured.

    A context is built when either ``client_cert`` (mTLS) or ``ca_cert`` is
    set, so a custom CA file is honoured even without a client certificate.

    Args:
        config: Mapping that may hold ``client_cert``, ``client_key`` and
            ``ca_cert`` file paths.

    Returns:
        The configured context, or ``None`` when neither ``client_cert`` nor
        ``ca_cert`` is set.

    A missing or unloadable certificate file terminates the program via die().
    """
    client_cert = config.get("client_cert", "")
    ca_cert = config.get("ca_cert", "")
    if not client_cert and not ca_cert:
        return None

    ctx = ssl.create_default_context()
    # CLI-002: Explicitly enable hostname verification (defense in depth)
    # create_default_context() sets these, but explicit is safer
    ctx.check_hostname = True
    ctx.verify_mode = ssl.CERT_REQUIRED

    if ca_cert:
        try:
            ctx.load_verify_locations(ca_cert)
        except FileNotFoundError as e:
            die(f"certificate file not found: {e}")
        except ssl.SSLError as e:
            die(f"failed to load CA certificate: {e}")

    if client_cert:
        try:
            ctx.load_cert_chain(certfile=client_cert, keyfile=config.get("client_key") or None)
        except ssl.SSLError as e:
            die(f"failed to load client certificate: {e}")
        except FileNotFoundError as e:
            die(f"certificate file not found: {e}")

    return ctx
|
|
|
|
|
|
def build_url(config: Mapping[str, Any], path: str = "") -> str:
    """Join server, optional endpoint prefix and path into one URL.

    Slashes around each part are normalised so the parts are separated by
    exactly one ``/``. A trailing slash on *path* (including the bare root
    path ``"/"``) is carried over to the result.

    Args:
        config: Mapping with ``server`` and optionally ``endpoint``.
        path: Path below the server/endpoint; may be empty.

    Returns:
        The assembled URL.
    """
    server = config["server"].rstrip("/")
    endpoint = config.get("endpoint", "").strip("/")
    keep_trailing_slash = path.endswith("/")
    path = path.strip("/")

    # The server part is always present; empty endpoint/path are left out.
    segments = [server]
    segments.extend(part for part in (endpoint, path) if part)
    url = "/".join(segments)
    if keep_trailing_slash:
        url += "/"
    return url
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Encryption
|
|
# -----------------------------------------------------------------------------
|
|
|
|
|
|
def encrypt_content(plaintext: bytes) -> tuple[bytes, bytes]:
    """Encrypt *plaintext* with AES-256-GCM under a fresh random key.

    Args:
        plaintext: Data to protect.

    Returns:
        ``(blob, key)`` where *blob* is the 12-byte nonce followed by the
        AESGCM output and *key* is the 32-byte key.

    Terminates via die() when the ``cryptography`` package is unavailable.
    """
    if not HAS_CRYPTO:
        die("encryption requires 'cryptography' package: pip install cryptography")

    key, nonce = os.urandom(32), os.urandom(12)
    sealed = AESGCM(key).encrypt(nonce, plaintext, None)
    # The nonce is stored in front of the ciphertext so decryption can recover it.
    return nonce + sealed, key
|
|
|
|
|
|
def decrypt_content(blob: bytes, key: bytes) -> bytes:
    """Decrypt content produced by AES-256-GCM encryption.

    Args:
        blob: 12-byte nonce followed by the ciphertext.
        key: Decryption key.

    Returns:
        The decrypted plaintext.

    Terminates via die() when the ``cryptography`` package is unavailable,
    when *blob* is shorter than the nonce, or when decryption fails. A key of
    unsupported length is reported the same way as any other decryption
    failure.
    """
    if not HAS_CRYPTO:
        die("decryption requires 'cryptography' package: pip install cryptography")
    if len(blob) < 12:
        die("encrypted content too short")

    nonce, ciphertext = blob[:12], blob[12:]
    try:
        # AESGCM() rejects keys of the wrong length, so construct it inside
        # the try block; a malformed key then yields the error message below
        # instead of an uncaught exception.
        return AESGCM(key).decrypt(nonce, ciphertext, None)
    except Exception:
        die("decryption failed (wrong key or corrupted data)")
|
|
|
|
|
|
def encode_key(key: bytes) -> str:
    """Return *key* as URL-safe base64 text with the ``=`` padding removed."""
    padded = base64.urlsafe_b64encode(key).decode()
    return padded.rstrip("=")
|
|
|
|
|
|
def decode_key(encoded: str) -> bytes:
    """Decode a URL-safe base64 key whose ``=`` padding may be missing.

    Args:
        encoded: Base64 text, with or without trailing padding.

    Returns:
        The decoded key bytes.

    Terminates via die() when the text cannot be decoded.
    """
    # Re-add however many '=' are needed to reach a multiple of four chars.
    missing = -len(encoded) % 4
    if missing:
        encoded += "=" * missing
    try:
        return base64.urlsafe_b64decode(encoded)
    except Exception:
        die("invalid encryption key in URL")
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Proof-of-work
|
|
# -----------------------------------------------------------------------------
|
|
|
|
|
|
def solve_pow(nonce: str, difficulty: int) -> int:
    """Solve proof-of-work by brute force.

    Tries N = 0, 1, 2, ... until SHA256 of ``"<nonce>:<N>"`` starts with at
    least *difficulty* zero bits. Progress is reported on stderr every
    100000 attempts.

    Args:
        nonce: Challenge nonce.
        difficulty: Required number of leading zero bits.

    Returns:
        The first N that satisfies the requirement.
    """
    # Only the first bytes can matter: enough to hold `difficulty` bits, plus one.
    prefix_len = (difficulty + 7) // 8 + 1
    attempt = 0

    while True:
        digest = hashlib.sha256(f"{nonce}:{attempt}".encode()).digest()

        # Leading zero bits of the prefix = its width minus the bit length of
        # the prefix read as a big-endian integer.
        prefix = digest[:prefix_len]
        leading_zero_bits = 8 * len(prefix) - int.from_bytes(prefix, "big").bit_length()

        if leading_zero_bits >= difficulty:
            return attempt

        attempt += 1
        if attempt % 100000 == 0:
            print(f"\rsolving pow: {attempt} attempts...", end="", file=sys.stderr)
|
|
|
|
|
|
def get_challenge(
    config: Mapping[str, Any],
    path: str = "/challenge",
) -> dict[str, Any] | None:
    """Fetch the proof-of-work challenge from the server.

    Args:
        config: Client configuration; ``ssl_context`` is used when present.
        path: Server path of the challenge resource.

    Returns:
        The decoded challenge, or ``None`` when the response status is not
        200 or the challenge's ``enabled`` field is falsy.
    """
    status, body, _ = request(
        build_url(config, path),
        ssl_context=config.get("ssl_context"),
    )
    if status != 200:
        return None

    challenge = json.loads(body)
    if not challenge.get("enabled"):
        return None
    return challenge
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Formatting utilities
|
|
# -----------------------------------------------------------------------------
|
|
|
|
|
|
def format_size(size: int) -> str:
    """Render a byte count as ``<n>B``, ``<x.x>K`` or ``<x.x>M``.

    Args:
        size: Number of bytes.

    Returns:
        Plain bytes below 1024, otherwise kibibytes or mebibytes with one
        decimal place.
    """
    kib = 1024
    mib = kib * kib
    if size >= mib:
        return f"{size / mib:.1f}M"
    if size >= kib:
        return f"{size / kib:.1f}K"
    return f"{size}B"
|
|
|
|
|
|
def format_timestamp(ts: int | float) -> str:
|
|
"""Format Unix timestamp as human-readable date."""
|
|
dt = datetime.fromtimestamp(ts, tz=UTC)
|
|
return dt.strftime("%Y-%m-%d %H:%M")
|
|
|
|
|
|
def format_time_remaining(expires_at: int | float | None) -> str:
    """Describe how long remains until *expires_at*.

    Args:
        expires_at: Unix timestamp of the expiry, or a falsy value for none.

    Returns:
        ``""`` when no expiry is given, ``"expired"`` when it has passed,
        otherwise the remaining time truncated to the largest fitting unit
        (``s``, ``m``, ``h``, ``d``, or ``y`` counted as 365 days).
    """
    if not expires_at:
        return ""

    remaining = expires_at - time.time()
    if remaining <= 0:
        return "expired"

    # (exclusive upper bound in seconds, seconds per unit, suffix)
    small_units = ((60, 1, "s"), (3600, 60, "m"), (86400, 3600, "h"))
    for upper_bound, unit_seconds, suffix in small_units:
        if remaining < upper_bound:
            return f"{int(remaining / unit_seconds)}{suffix}"

    days = int(remaining / 86400)
    return f"{days // 365}y" if days >= 365 else f"{days}d"
|
|
|
|
|
|
def parse_date(date_str: str) -> int:
|
|
"""Parse date string to Unix timestamp."""
|
|
if not date_str:
|
|
return 0
|
|
|
|
for fmt in DATE_FORMATS:
|
|
try:
|
|
dt = datetime.strptime(date_str, fmt).replace(tzinfo=UTC)
|
|
return int(dt.timestamp())
|
|
except ValueError:
|
|
continue
|
|
|
|
try:
|
|
return int(date_str)
|
|
except ValueError:
|
|
die(f"invalid date format: {date_str}")
|
|
|
|
|
|
def get_extension_for_mime(mime_type: str) -> str:
    """Look up the file extension for *mime_type*; unknown types give ``.bin``."""
    try:
        return MIME_EXTENSIONS[mime_type]
    except KeyError:
        return ".bin"
|
|
|
|
|
|
def format_paste_row(paste: dict[str, Any], show_owner: bool = False) -> str:
    """Render one paste as a fixed-width table row.

    Args:
        paste: Paste metadata; ``id`` is required, the other fields
            (``mime_type``, ``size``, ``created_at``, ``expires_at``,
            ``burn_after_read``, ``password_protected``, ``owner``) are optional.
        show_owner: Append the owner (first 12 characters) when the paste has one.

    Returns:
        The formatted row.
    """
    paste_id = paste["id"]
    mime_type = paste.get("mime_type", "unknown")[:16]
    size = format_size(paste.get("size", 0))
    created = format_timestamp(paste.get("created_at", 0))
    # Time remaining until expiry
    expires = format_time_remaining(paste.get("expires_at"))

    flag_labels = (("burn_after_read", "burn"), ("password_protected", "pass"))
    flags_str = " ".join(label for field, label in flag_labels if paste.get(field))

    columns = [
        f"{paste_id:<12}",
        f"{mime_type:<16}",
        f"{size:>6}",
        f"{created:<16}",
        f"{expires:<8}",
        flags_str,
    ]
    if show_owner and paste.get("owner"):
        columns.append(paste["owner"][:12])
    return " ".join(columns)
|
|
|
|
|
|
def print_paste_list(
    pastes: list[dict[str, Any]],
    summary: str,
    as_json: bool = False,
    data: dict[str, Any] | None = None,
) -> None:
    """Print pastes as a table, or as JSON when requested.

    Args:
        pastes: Paste metadata entries to show.
        summary: Line printed after the table.
        as_json: Emit JSON instead of a table.
        data: Object to dump in JSON mode; when falsy, ``{"pastes": pastes}``
            is dumped instead.
    """
    if as_json:
        document = data or {"pastes": pastes}
        print(json.dumps(document, indent=2))
        return

    if not pastes:
        print("no pastes found")
        return

    # Check if owner data is present (admin view)
    show_owner = any(entry.get("owner") for entry in pastes)

    header = f"{'ID':<12} {'TYPE':<16} {'SIZE':>6} {'CREATED':<16} {'EXPIRES':<8} FLAGS"
    if show_owner:
        header += " OWNER"

    table = [header]
    table.extend(format_paste_row(entry, show_owner=show_owner) for entry in pastes)
    for line in table:
        print(line)
    print(f"\n{summary}")
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Clipboard integration
|
|
# -----------------------------------------------------------------------------
|
|
|
|
# Clipboard read commands (tool name, command args)
|
|
# Each entry is (tool name, argv); the tool name is always argv[0].
CLIPBOARD_READ_COMMANDS = [
    (argv[0], argv)
    for argv in (
        ["xclip", "-selection", "clipboard", "-o"],
        ["xsel", "--clipboard", "--output"],
        ["pbpaste"],
        ["powershell.exe", "-command", "Get-Clipboard"],
        ["wl-paste"],
    )
]

# Clipboard write commands, same (tool name, argv) layout as above.
CLIPBOARD_WRITE_COMMANDS = [
    (argv[0], argv)
    for argv in (
        ["xclip", "-selection", "clipboard", "-i"],
        ["xsel", "--clipboard", "--input"],
        ["pbcopy"],
        ["clip.exe"],
        ["wl-copy"],
    )
]

# CLI-001: Trusted directories for clipboard tools (system paths only)
# Prevents command injection via malicious PATH manipulation
TRUSTED_CLIPBOARD_DIRS = frozenset(
    (
        "/usr/bin",
        "/usr/local/bin",
        "/bin",
        "/opt/homebrew/bin",  # macOS Homebrew
        "/usr/X11/bin",
        "/usr/X11R6/bin",
    )
)

# Windows system directories (checked case-insensitively); each pattern is the
# directory name wrapped in backslashes, e.g. "\windows\".
TRUSTED_WINDOWS_PATTERNS = tuple(
    f"\\{directory}\\" for directory in ("windows", "system32", "syswow64", "windowsapps")
)
|
|
|
|
|
|
def is_trusted_clipboard_path(path: str) -> bool:
    """Decide whether a clipboard tool lives in a trusted system directory.

    CLI-001: The path is resolved and its parent directory is accepted when
    it is one of TRUSTED_CLIPBOARD_DIRS, or when its lower-cased form
    contains one of TRUSTED_WINDOWS_PATTERNS or ``\\program files``.

    Args:
        path: Path of the tool executable; an empty value is never trusted.

    Returns:
        ``True`` when the parent directory passes one of the checks.
    """
    if not path:
        return False

    parent = str(Path(path).resolve().parent)

    # Check Unix trusted directories
    if parent in TRUSTED_CLIPBOARD_DIRS:
        return True

    # Check Windows paths (case-insensitive)
    parent_lower = parent.lower()
    if any(pattern in parent_lower for pattern in TRUSTED_WINDOWS_PATTERNS):
        return True

    # Also allow Windows Program Files paths
    return "\\program files" in parent_lower
|
|
|
|
|
|
def find_clipboard_command(commands: list[tuple[str, list[str]]]) -> list[str] | None:
    """Pick the first clipboard command whose tool is installed and trusted.

    CLI-001: A tool found on PATH is used only if
    is_trusted_clipboard_path() accepts its location.

    Args:
        commands: Candidates as ``(tool name, argv)`` in priority order.

    Returns:
        The argv with its first element replaced by the tool's resolved
        path, or ``None`` when no candidate qualifies.
    """
    for tool_name, argv in commands:
        located = shutil.which(tool_name)
        if not located or not is_trusted_clipboard_path(located):
            continue
        # Use absolute path for security
        return [located, *argv[1:]]
    return None
|
|
|
|
|
|
def read_clipboard() -> bytes:
    """Return the system clipboard content as captured from a clipboard tool.

    Terminates via die() when no trusted tool is available, the tool exits
    with an error, or its executable cannot be started.
    """
    argv = find_clipboard_command(CLIPBOARD_READ_COMMANDS)
    if not argv:
        die("no clipboard tool found (install xclip, xsel, or wl-paste)")

    try:
        completed = subprocess.run(argv, capture_output=True, check=True)
    except subprocess.CalledProcessError as e:
        die(f"clipboard read failed: {e.stderr.decode(errors='replace')}")
    except FileNotFoundError:
        die(f"clipboard tool not found: {argv[0]}")
    return completed.stdout
|
|
|
|
|
|
def write_clipboard(data: bytes) -> None:
    """Send *data* to the system clipboard through a clipboard tool's stdin.

    Terminates via die() when no trusted tool is available, the tool exits
    with an error, or its executable cannot be started.
    """
    argv = find_clipboard_command(CLIPBOARD_WRITE_COMMANDS)
    if not argv:
        die("no clipboard tool found (install xclip, xsel, or wl-copy)")

    try:
        subprocess.run(argv, input=data, check=True)
    except subprocess.CalledProcessError as e:
        # stderr is only populated when it was captured; fall back to empty text.
        die(f"clipboard write failed: {e.stderr.decode(errors='replace') if e.stderr else ''}")
    except FileNotFoundError:
        die(f"clipboard tool not found: {argv[0]}")
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Content helpers
|
|
# -----------------------------------------------------------------------------
|
|
|
|
|
|
def read_content(file_arg: str | None, from_clipboard: bool = False) -> bytes:
    """Obtain paste content from the clipboard, a file, or stdin.

    Args:
        file_arg: File path, ``"-"`` for stdin, or a falsy value to read
            piped stdin.
        from_clipboard: Read the clipboard instead; takes precedence over
            *file_arg*.

    Returns:
        The raw content bytes.

    Terminates via die() when the file does not exist, or when no file is
    given and stdin is a terminal.
    """
    if from_clipboard:
        return read_clipboard()

    if not file_arg:
        if sys.stdin.isatty():
            die("no input provided (pipe data, specify file, or use -C for clipboard)")
        return sys.stdin.buffer.read()

    if file_arg == "-":
        return sys.stdin.buffer.read()

    source = Path(file_arg)
    if not source.exists():
        die(f"file not found: {file_arg}")
    return source.read_bytes()
|
|
|
|
|
|
def prepare_content(
    content: bytes,
    encrypt: bool,
    quiet: bool = False,
) -> tuple[bytes, bytes | None]:
    """Encrypt *content* when requested.

    Args:
        content: Raw content.
        encrypt: Whether to encrypt.
        quiet: Suppress the progress messages on stderr.

    Returns:
        ``(content, None)`` unchanged when *encrypt* is false, otherwise
        ``(encrypted content, key)``.

    Terminates via die() when encryption is requested but the
    ``cryptography`` package is unavailable.
    """
    if not encrypt:
        return content, None

    if not HAS_CRYPTO:
        die("encryption requires 'cryptography' package (use -E to disable)")

    verbose = not quiet
    if verbose:
        print("encrypting...", end="", file=sys.stderr)
    result = encrypt_content(content)
    if verbose:
        print(" done", file=sys.stderr)

    ciphertext, key = result
    return ciphertext, key
|
|
|
|
|
|
def extract_paste_id(url_or_id: str) -> tuple[str, bytes | None]:
    """Extract the paste ID and optional encryption key from a URL or ID.

    The encryption key, when present, is the text after the last ``#``. The
    paste ID is the last path segment of what precedes it; trailing slashes
    are ignored so ``https://host/abc/`` yields ``abc`` rather than an empty
    ID.

    Args:
        url_or_id: A bare paste ID or a paste URL, optionally ending in
            ``#<key>``.

    Returns:
        ``(paste_id, key)`` where *key* is the decoded key bytes, or ``None``
        when there is no non-empty fragment.
    """
    encryption_key = None

    if "#" in url_or_id:
        url_or_id, key_encoded = url_or_id.rsplit("#", 1)
        if key_encoded:
            encryption_key = decode_key(key_encoded)

    # Drop trailing slashes first, otherwise the "last segment" would be empty.
    paste_id = url_or_id.rstrip("/").rsplit("/", 1)[-1]
    return paste_id, encryption_key
|
|
|
|
|
|
def auth_headers(config: Mapping[str, Any]) -> dict[str, str]:
    """Build the authentication headers for a request.

    Returns:
        ``{"X-SSL-Client-SHA1": <cert_sha1>}`` when ``cert_sha1`` is
        configured, otherwise an empty dict.
    """
    fingerprint = config.get("cert_sha1")
    if not fingerprint:
        return {}
    return {"X-SSL-Client-SHA1": fingerprint}
|
|
|
|
|
|
def require_auth(config: Mapping[str, Any]) -> None:
    """Terminate via die() unless ``cert_sha1`` is configured."""
    if config.get("cert_sha1"):
        return
    die("authentication required (set FLASKPASTE_CERT_SHA1)")
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Commands
|
|
# -----------------------------------------------------------------------------
|
|
|
|
|
|
def cmd_create(args: argparse.Namespace, config: dict[str, Any]) -> None:
    """Create a new paste and print its URL or ID.

    Content is read from the clipboard, a file or stdin, encrypted unless
    ``args.no_encrypt`` is set, and POSTed to the server root. When the
    server offers a proof-of-work challenge it is solved first. A 400
    response whose error mentions proof-of-work triggers a retry with a new
    challenge, for up to five attempts; any other failure ends the program.

    Args:
        args: Parsed arguments; reads ``file``, ``quiet``, ``burn``,
            ``expiry``, ``password``, ``raw`` and the optional ``clipboard``,
            ``no_encrypt`` and ``copy_url``.
        config: Client configuration.
    """
    content = read_content(args.file, from_clipboard=getattr(args, "clipboard", False))
    if not content:
        die("empty content")

    content, encryption_key = prepare_content(
        content,
        encrypt=not getattr(args, "no_encrypt", False),
        quiet=args.quiet,
    )
    verbose = not args.quiet

    # Headers shared by every attempt; PoW headers are added per attempt.
    base_headers = auth_headers(config)
    if args.burn:
        base_headers["X-Burn-After-Read"] = "true"
    if args.expiry:
        base_headers["X-Expiry"] = str(args.expiry)
    if args.password:
        base_headers["X-Paste-Password"] = args.password

    url = build_url(config, "/")
    max_retries = 5
    last_error = ""

    for attempt in range(max_retries):
        headers = dict(base_headers)

        challenge = get_challenge(config)
        if challenge:
            if attempt > 0 and verbose:
                print(f"retry {attempt}/{max_retries - 1}...", file=sys.stderr)

            if verbose:
                diff = challenge["difficulty"]
                base_diff = challenge.get("base_difficulty", diff)
                if challenge.get("elevated", False):
                    msg = f"solving pow ({diff} bits, elevated from {base_diff})..."
                else:
                    msg = f"solving pow ({diff} bits)..."
                print(msg, end="", file=sys.stderr)

            solution = solve_pow(challenge["nonce"], challenge["difficulty"])
            if verbose:
                print(" done", file=sys.stderr)

            headers["X-PoW-Token"] = challenge["token"]
            headers["X-PoW-Solution"] = str(solution)

        status, body, _ = request(
            url, method="POST", data=content, headers=headers, ssl_context=config.get("ssl_context")
        )

        if status != 201:
            last_error = parse_error(body, body.decode(errors="replace"))
            err_lower = last_error.lower()
            is_pow_error = status == 400 and ("pow" in err_lower or "proof-of-work" in err_lower)

            # Only a rejected proof-of-work is worth another attempt.
            if not is_pow_error:
                die(f"create failed ({status}): {last_error}")

            if verbose:
                print(f"pow rejected: {last_error}", file=sys.stderr)
            continue

        data = json.loads(body)
        key_fragment = f"#{encode_key(encryption_key)}" if encryption_key else ""

        if args.raw:
            result_url = build_url(config, data["raw"]) + key_fragment
        elif args.quiet:
            result_url = data["id"] + key_fragment
        else:
            result_url = build_url(config, data["url"]) + key_fragment

        print(result_url)

        # Copy URL to clipboard if requested
        if getattr(args, "copy_url", False):
            write_clipboard(result_url.encode())
            if verbose:
                print("(copied to clipboard)", file=sys.stderr)
        return

    die(f"create failed after {max_retries} attempts: {last_error}")
|
|
|
|
|
|
def cmd_get(args: argparse.Namespace, config: dict[str, Any]) -> None:
    """Retrieve a paste's metadata or content.

    With ``args.meta`` the paste's metadata is fetched and printed. Otherwise
    the raw content is fetched from ``<id>/raw``, decrypted when the given
    URL carried a key, and then copied to the clipboard, saved to
    ``args.output`` or written to stdout.

    Status 401 and 403 are reported as password problems; any other non-200
    status is reported as "not found". All end the program via die().

    Args:
        args: Parsed arguments; reads ``id``, ``password``, ``meta``,
            ``output`` and the optional ``copy``.
        config: Client configuration.
    """
    paste_id, encryption_key = extract_paste_id(args.id)

    headers: dict[str, str] = {}
    if args.password:
        headers["X-Paste-Password"] = args.password

    target = paste_id if args.meta else f"{paste_id}/raw"
    status, body, _ = request(
        build_url(config, target), headers=headers, ssl_context=config.get("ssl_context")
    )

    # Both request kinds share the same failure handling.
    if status == 401:
        die("password required (-p)")
    if status == 403:
        die("invalid password")
    if status != 200:
        die(f"not found: {paste_id}")

    if args.meta:
        data = json.loads(body)
        print(f"id: {data['id']}")
        print(f"mime_type: {data['mime_type']}")
        print(f"size: {data['size']}")
        print(f"created_at: {data['created_at']}")
        if encryption_key:
            print("encrypted: yes (key in URL)")
        if data.get("password_protected"):
            print("protected: yes (password required)")
        return

    if encryption_key:
        body = decrypt_content(body, encryption_key)

    # Copy to clipboard if requested
    if getattr(args, "copy", False):
        write_clipboard(body)
        print("(copied to clipboard)", file=sys.stderr)
    elif args.output:
        Path(args.output).write_bytes(body)
        print(f"saved: {args.output}", file=sys.stderr)
    else:
        out = sys.stdout.buffer
        out.write(body)
        # On a terminal, finish with a newline if the content lacks one.
        if sys.stdout.isatty() and body and not body.endswith(b"\n"):
            out.write(b"\n")
|
|
|
|
|
|
def cmd_delete(args: argparse.Namespace, config: dict[str, Any]) -> None:
    """Delete the given pastes, or every listed paste with ``--all``.

    With ``--all`` the paste list is fetched first (up to 1000 entries) and
    deletion only proceeds when ``--confirm`` equals the number of pastes
    found. Each deletion result is reported individually, and a summary line
    is printed when more than one paste was processed.

    Args:
        args: Parsed arguments; reads ``ids`` and the optional ``all`` and
            ``confirm``.
        config: Client configuration; authentication must be configured.
    """
    require_auth(config)

    delete_all = getattr(args, "all", False)
    confirm_count = getattr(args, "confirm", None)
    # Accept full URLs as well as bare IDs: keep only the last path segment.
    paste_ids = [given.split("/")[-1] for given in (args.ids or [])]

    # Validate arguments
    if delete_all and paste_ids:
        die("cannot specify both --all and paste IDs")
    if not delete_all and not paste_ids:
        die("specify paste ID(s) or use --all")

    if delete_all:
        # Fetch all pastes to get count and IDs
        list_url = build_url(config, "/pastes") + "?all=1&limit=1000"
        status, body, _ = request(
            list_url, headers=auth_headers(config), ssl_context=config.get("ssl_context")
        )
        if status == 401:
            die("authentication failed")
        if status != 200:
            die(f"failed to list pastes ({status})")

        pastes = json.loads(body).get("pastes", [])
        total = len(pastes)

        if total == 0:
            print("no pastes to delete")
            return

        # Require confirmation with expected count
        if confirm_count is None:
            die(f"--all requires --confirm {total} (found {total} pastes)")
        if confirm_count != total:
            die(f"confirmation mismatch: expected {confirm_count}, found {total}")

        paste_ids = [entry["id"] for entry in pastes]

    # Delete pastes
    failure_labels = {404: "not found", 403: "permission denied"}
    deleted = 0
    failed = 0
    for paste_id in paste_ids:
        status, _, _ = request(
            build_url(config, paste_id),
            method="DELETE",
            headers=auth_headers(config),
            ssl_context=config.get("ssl_context"),
        )
        if status == 200:
            print(f"deleted: {paste_id}")
            deleted += 1
            continue

        label = failure_labels.get(status, f"failed ({status})")
        print(f"{label}: {paste_id}", file=sys.stderr)
        failed += 1

    # Summary for batch operations
    if len(paste_ids) > 1:
        print(f"\n{deleted} deleted, {failed} failed")
|
|
|
|
|
|
def cmd_info(args: argparse.Namespace, config: dict[str, Any]) -> None:
    """Print the server's name, version and proof-of-work setting.

    Args:
        args: Parsed arguments (not used here).
        config: Client configuration.

    Terminates via die() when the server root does not answer with 200.
    """
    status, body, _ = request(build_url(config, "/"), ssl_context=config.get("ssl_context"))
    if status != 200:
        die("failed to connect to server")

    data = json.loads(body)
    details = (
        ("server", config["server"]),
        ("name", data.get("name", "unknown")),
        ("version", data.get("version", "unknown")),
    )
    for label, value in details:
        print(f"{label}: {value}")

    challenge = get_challenge(config)
    if not challenge:
        print("pow: disabled")
        return

    difficulty = challenge.get("difficulty", 0)
    base_diff = challenge.get("base_difficulty", difficulty)
    suffix = f" (elevated from {base_diff})" if challenge.get("elevated") else ""
    print(f"pow: {difficulty} bits{suffix}")
|
|
|
|
|
|
def cmd_list(args: argparse.Namespace, config: dict[str, Any]) -> None:
    """List pastes from the server's ``/pastes`` resource.

    Args:
        args: Parsed arguments; reads ``limit``, ``offset``, ``json`` and the
            optional ``all``.
        config: Client configuration; authentication must be configured.

    Terminates via die() on a 401 or any other non-200 response.
    """
    require_auth(config)

    query: list[str] = []
    if getattr(args, "all", False):
        query.append("all=1")
    for name, value in (("limit", args.limit), ("offset", args.offset)):
        if value:
            query.append(f"{name}={value}")

    url = build_url(config, "/pastes")
    if query:
        url = f"{url}?{'&'.join(query)}"

    status, body, _ = request(
        url, headers=auth_headers(config), ssl_context=config.get("ssl_context")
    )
    if status == 401:
        die("authentication failed")
    if status != 200:
        die(f"failed to list pastes ({status})")

    data = json.loads(body)
    shown, total = data.get("count", 0), data.get("total", 0)
    print_paste_list(
        data.get("pastes", []),
        f"{shown} of {total} pastes shown",
        as_json=args.json,
        data=data,
    )
|
|
|
|
|
|
def cmd_search(args: argparse.Namespace, config: dict[str, Any]) -> None:
    """Search pastes by type and/or creation date.

    Filters are sent as a URL-encoded query string, so values containing
    characters such as ``+``, ``&`` or spaces (e.g. ``image/svg+xml``) reach
    the server unchanged.

    Args:
        args: Parsed arguments; reads ``type``, ``after``, ``before``,
            ``limit`` and ``json``. ``after``/``before`` are converted with
            parse_date().
        config: Client configuration; authentication must be configured.

    Terminates via die() on a 401 or any other non-200 response.
    """
    from urllib.parse import urlencode

    require_auth(config)

    query: dict[str, Any] = {}
    if args.type:
        query["type"] = args.type
    if args.after:
        query["after"] = parse_date(args.after)
    if args.before:
        query["before"] = parse_date(args.before)
    if args.limit:
        query["limit"] = args.limit

    url = build_url(config, "/pastes")
    if query:
        # urlencode() percent-escapes each value; plain concatenation would
        # let a '+' in a MIME type be decoded as a space by the server.
        url += "?" + urlencode(query)

    status, body, _ = request(
        url, headers=auth_headers(config), ssl_context=config.get("ssl_context")
    )

    if status == 401:
        die("authentication failed")
    if status != 200:
        die(f"failed to search pastes ({status})")

    data = json.loads(body)
    pastes = data.get("pastes", [])
    summary = f"{data.get('count', 0)} matching pastes found"
    print_paste_list(pastes, summary, as_json=args.json, data=data)
|
|
|
|
|
|
def cmd_update(args: argparse.Namespace, config: dict[str, Any]) -> None:
    """Update an existing paste's content and/or settings via PUT.

    New content (when ``args.file`` is given) is encrypted unless
    ``args.no_encrypt`` is set. Password, password removal and expiry
    extension are sent as headers.

    Args:
        args: Parsed arguments; reads ``id``, ``file``, ``quiet``,
            ``password``, ``remove_password``, ``expiry`` and the optional
            ``no_encrypt``.
        config: Client configuration; authentication must be configured.

    Terminates via die() on any non-200 response.
    """
    require_auth(config)

    paste_id, _ = extract_paste_id(args.id)
    url = build_url(config, paste_id)

    headers = auth_headers(config)
    content: bytes | None = None
    encryption_key: bytes | None = None

    if args.file:
        raw_content = read_content(args.file)
        if not raw_content:
            die("empty content")
        content, encryption_key = prepare_content(
            raw_content,
            encrypt=not getattr(args, "no_encrypt", False),
            quiet=args.quiet,
        )

    optional_headers = (
        (args.password, "X-Paste-Password", args.password),
        (args.remove_password, "X-Remove-Password", "true"),
        (args.expiry, "X-Extend-Expiry", str(args.expiry)),
    )
    for enabled, header_name, header_value in optional_headers:
        if enabled:
            headers[header_name] = header_value

    status, body, _ = request(
        url, method="PUT", data=content, headers=headers, ssl_context=config.get("ssl_context")
    )

    if status == 400:
        die(parse_error(body, "bad request"))
    if status == 401:
        die("authentication failed")
    if status == 403:
        die("permission denied (not owner)")
    if status == 404:
        die(f"not found: {paste_id}")
    if status != 200:
        die(f"update failed ({status})")

    data = json.loads(body)
    if args.quiet:
        # NOTE(review): the new encryption key appears to be printed only in
        # the non-quiet branch - confirm against the original indentation.
        print(paste_id)
        return

    print(f"updated: {paste_id}")
    print(f" size: {data.get('size', 'unknown')}")
    print(f" type: {data.get('mime_type', 'unknown')}")
    if data.get("expires_at"):
        print(f" expires: {data.get('expires_at')}")
    if data.get("password_protected"):
        print(" password: protected")

    if content and encryption_key:
        print(f" key: {build_url(config, paste_id)}#{encode_key(encryption_key)}")
|
|
|
|
|
|
def cmd_export(args: argparse.Namespace, config: dict[str, Any]) -> None:
    """Export the authenticated user's pastes to a local directory.

    Requests the paste list from ``/pastes`` (``limit=1000``), downloads the raw
    content of each entry and writes it to ``<output>/<paste_id><extension>``.
    Entries flagged ``burn_after_read`` or ``password_protected`` are skipped.
    When a key file supplies a key for a paste, the content is decrypted before
    it is written; if decryption fails the downloaded bytes are kept as-is.

    Args:
        args: Parsed ``export`` arguments. Reads ``output``, ``keyfile``,
            ``manifest`` and ``quiet``.
        config: Client configuration; ``ssl_context`` is read for the requests.
    """
    require_auth(config)

    out_dir = Path(args.output) if args.output else Path("fpaste-export")
    out_dir.mkdir(parents=True, exist_ok=True)

    # Key file format: one "paste_id=key" entry per line. Blank lines, lines
    # starting with "#" and lines without "=" are ignored.
    keys: dict[str, str] = {}
    if args.keyfile:
        keyfile_path = Path(args.keyfile)
        if not keyfile_path.exists():
            die(f"key file not found: {args.keyfile}")
        for raw_line in keyfile_path.read_text().splitlines():
            entry = raw_line.strip()
            if not entry or entry.startswith("#") or "=" not in entry:
                continue
            key_id, _, key_encoded = entry.partition("=")
            keys[key_id.strip()] = key_encoded.strip()

    ssl_context = config.get("ssl_context")

    # Fetch paste list
    list_url = build_url(config, "/pastes") + "?limit=1000"
    status, body, _ = request(list_url, headers=auth_headers(config), ssl_context=ssl_context)

    if status == 401:
        die("authentication failed")
    if status != 200:
        die(f"failed to list pastes ({status})")

    pastes = json.loads(body).get("pastes", [])
    if not pastes:
        print("no pastes to export")
        return

    verbose = not args.quiet
    exported, skipped, errors = 0, 0, 0
    manifest: list[dict[str, Any]] = []

    for paste in pastes:
        paste_id = paste["id"]
        mime_type = paste.get("mime_type", "application/octet-stream")

        if verbose:
            print(f"exporting {paste_id}...", end=" ", file=sys.stderr)

        # The ID is supplied by the server and becomes a local file name. Only
        # accept a plain single path component so a response such as "../x"
        # cannot make the export write outside of out_dir.
        id_text = str(paste_id)
        if not id_text or id_text in {".", ".."} or Path(id_text).name != id_text:
            if verbose:
                print("error (unsafe paste id)", file=sys.stderr)
            errors += 1
            continue

        if paste.get("burn_after_read"):
            # Downloading would consume the paste, so it is never exported.
            if verbose:
                print("skipped (burn-after-read)", file=sys.stderr)
            skipped += 1
            continue

        if paste.get("password_protected"):
            if verbose:
                print("skipped (password-protected)", file=sys.stderr)
            skipped += 1
            continue

        raw_url = build_url(config, f"{paste_id}/raw")
        status, content, _ = request(raw_url, headers=auth_headers(config), ssl_context=ssl_context)

        if status != 200:
            if verbose:
                print(f"error ({status})", file=sys.stderr)
            errors += 1
            continue

        has_key = paste_id in keys
        decrypted = False
        if has_key:
            try:
                content = decrypt_content(content, decode_key(keys[paste_id]))
                decrypted = True
            except SystemExit:
                # The SystemExit raised by the key/decrypt helpers is caught so
                # one bad key does not abort the export; the paste is written
                # exactly as downloaded.
                if verbose:
                    print("decryption failed, keeping encrypted", file=sys.stderr, end=" ")

        filename = f"{paste_id}{get_extension_for_mime(mime_type)}"
        (out_dir / filename).write_bytes(content)

        manifest.append(
            {
                "id": paste_id,
                "filename": filename,
                "mime_type": mime_type,
                "size": len(content),
                "created_at": paste.get("created_at"),
                "decrypted": decrypted,
                "encrypted": has_key and not decrypted,
            }
        )

        if verbose:
            status_msg = "decrypted" if decrypted else ("encrypted" if has_key else "ok")
            print(status_msg, file=sys.stderr)

        exported += 1

    if args.manifest:
        manifest_path = out_dir / "manifest.json"
        manifest_path.write_text(json.dumps(manifest, indent=2))
        if verbose:
            print(f"manifest: {manifest_path}", file=sys.stderr)

    print(f"\nexported: {exported}, skipped: {skipped}, errors: {errors}")
    print(f"output: {out_dir}")
|
|
|
|
|
|
def cmd_pki_status(args: argparse.Namespace, config: dict[str, Any]) -> None:
    """Print the server's PKI status and, if a CA exists, its details.

    Args:
        args: Parsed arguments (not read by this command).
        config: Client configuration; ``ssl_context`` is read for the request.
    """
    status, body, _ = request(build_url(config, "/pki"), ssl_context=config.get("ssl_context"))

    if status == 404:
        die("PKI not enabled on this server")
    if status != 200:
        die(f"failed to get PKI status ({status})")

    info = json.loads(body)
    print(f"pki enabled: {info.get('enabled', False)}")
    print(f"ca exists: {info.get('ca_exists', False)}")

    if not info.get("ca_exists"):
        # Without a CA the server may include a hint instead of CA details.
        hint = info.get("hint")
        if hint:
            print(f"hint: {hint}")
        return

    print(f"common name: {info.get('common_name', 'unknown')}")
    print(f"fingerprint: {info.get('fingerprint_sha1', 'unknown')}")

    created_at = info.get("created_at")
    if created_at:
        print(f"created: {created_at}")

    expires_at = info.get("expires_at")
    if expires_at:
        print(f"expires: {expires_at}")

    download_path = info.get("download", "/pki/ca.crt")
    print(f"download: {build_url(config, download_path)}")
|
|
|
|
|
|
def cmd_pki_issue(args: argparse.Namespace, config: dict[str, Any]) -> None:
    """Request a new client certificate from the server CA and store it locally.

    Writes ``client.key`` and ``client.crt`` into the output directory
    (``CONFIG_DIR`` by default), reports details on stderr and prints the
    certificate fingerprint on stdout.

    Args:
        args: Parsed ``pki issue`` arguments. Reads ``name``, ``output``,
            ``force`` and ``configure``.
        config: Client configuration; ``ssl_context`` is read for the request.
    """
    out_dir = Path(args.output) if args.output else CONFIG_DIR
    key_file = out_dir / "client.key"
    cert_file = out_dir / "client.crt"

    # Check for existing files BEFORE contacting the server. Otherwise the
    # server issues a certificate whose private key is then thrown away when the
    # overwrite check fails.
    if not args.force:
        if key_file.exists():
            die(f"key file exists: {key_file} (use --force)")
        if cert_file.exists():
            die(f"cert file exists: {cert_file} (use --force)")

    url = build_url(config, "/pki/issue")
    headers = {"Content-Type": "application/json", **auth_headers(config)}
    payload = json.dumps({"common_name": args.name}).encode()

    status, body, _ = request(
        url, method="POST", data=payload, headers=headers, ssl_context=config.get("ssl_context")
    )

    if status == 404:
        die(parse_error(body, "PKI not available"))
    if status == 400:
        die(parse_error(body, "bad request"))
    if status != 201:
        die(f"certificate issuance failed ({status})")

    result = json.loads(body)
    try:
        private_key_pem = result["private_key_pem"]
        certificate_pem = result["certificate_pem"]
    except (KeyError, TypeError):
        die("certificate issuance failed (malformed server response)")

    out_dir.mkdir(parents=True, exist_ok=True)

    # Create the key file with owner-only permissions before any key material
    # is written, so it is never readable by others, not even briefly. chmod
    # covers the --force case where the file already existed with wider bits.
    key_file.touch(mode=0o600)
    key_file.chmod(0o600)
    key_file.write_text(private_key_pem)
    cert_file.write_text(certificate_pem)

    fingerprint = result.get("fingerprint_sha1", "unknown")

    print(f"key: {key_file}", file=sys.stderr)
    print(f"certificate: {cert_file}", file=sys.stderr)
    print(f"fingerprint: {fingerprint}", file=sys.stderr)
    print(f"serial: {result.get('serial', 'unknown')}", file=sys.stderr)
    print(f"common name: {result.get('common_name', args.name)}", file=sys.stderr)

    if args.configure:
        cfg_path = write_config_file(
            {
                "client_cert": str(cert_file),
                "client_key": str(key_file),
                "cert_sha1": fingerprint,
            }
        )
        print(f"config: {cfg_path} (updated)", file=sys.stderr)

    # The bare fingerprint goes to stdout so scripts can capture it.
    print(fingerprint)
|
|
|
|
|
|
def cmd_pki_download(args: argparse.Namespace, config: dict[str, Any]) -> None:
    """Download the server's CA certificate.

    With ``--output`` the certificate is saved to that file (and its SHA-1
    fingerprint is shown when the ``cryptography`` package is available);
    otherwise the raw response body is written to stdout.

    Args:
        args: Parsed ``pki download`` arguments. Reads ``output`` and ``configure``.
        config: Client configuration; ``ssl_context`` is read for the request.
    """
    # --configure records the saved file's path, so it cannot work without -o.
    # Fail up front instead of silently ignoring the flag.
    if args.configure and not args.output:
        die("--configure requires --output")

    url = build_url(config, "/pki/ca.crt")
    status, body, _ = request(url, ssl_context=config.get("ssl_context"))

    if status == 404:
        die("CA certificate not available (PKI disabled or CA not generated)")
    if status != 200:
        die(f"failed to download CA certificate ({status})")

    if not args.output:
        sys.stdout.buffer.write(body)
        return

    out_path = Path(args.output)
    out_path.write_bytes(body)
    print(f"saved: {out_path}", file=sys.stderr)

    if HAS_CRYPTO:
        try:
            cert = x509.load_pem_x509_certificate(body)
        except ValueError:
            # Report an unparseable body cleanly and do not register it as the
            # CA certificate in the config file.
            die(f"downloaded file is not a valid PEM certificate: {out_path}")
        fp = hashlib.sha1(
            cert.public_bytes(serialization.Encoding.DER), usedforsecurity=False
        ).hexdigest()
        print(f"fingerprint: {fp}", file=sys.stderr)

    if args.configure:
        cfg_path = write_config_file({"ca_cert": str(out_path)})
        print(f"config: {cfg_path} (updated)", file=sys.stderr)
|
|
|
|
|
|
def cmd_register(args: argparse.Namespace, config: dict[str, Any]) -> None:
    """Register with the server and store the client certificate it returns.

    Posts to ``/register`` (solving a proof-of-work challenge first when the
    server hands one out), saves the returned PKCS#12 bundle as ``client.p12``
    and, unless ``--p12-only`` is given, also extracts ``client.key`` and
    ``client.crt`` as PEM files. The fingerprint from the ``X-Fingerprint-SHA1``
    response header is printed on stdout.

    Args:
        args: Parsed ``register`` arguments. Reads ``name``, ``output``,
            ``force``, ``p12_only``, ``configure`` and ``quiet``.
        config: Client configuration; ``ssl_context`` is read for the request.
    """
    if not HAS_CRYPTO:
        die("register requires 'cryptography' package: pip install cryptography")

    from cryptography.hazmat.primitives.serialization import pkcs12

    out_dir = Path(args.output) if args.output else CONFIG_DIR
    p12_file = out_dir / "client.p12"
    key_file = out_dir / "client.key"
    cert_file = out_dir / "client.crt"

    # Check for existing files BEFORE solving proof-of-work and registering.
    # Otherwise a successful registration is discarded when the overwrite check
    # fails afterwards.
    if not args.force:
        if p12_file.exists():
            die(f"p12 file exists: {p12_file} (use --force)")
        if not args.p12_only:
            if key_file.exists():
                die(f"key file exists: {key_file} (use --force)")
            if cert_file.exists():
                die(f"cert file exists: {cert_file} (use --force)")

    url = build_url(config, "/register")
    headers = {"Content-Type": "application/json"}

    payload: dict[str, str] = {}
    if args.name:
        payload["common_name"] = args.name

    if challenge := get_challenge(config, path="/register/challenge"):
        if not args.quiet:
            print(f"solving pow ({challenge['difficulty']} bits)...", end="", file=sys.stderr)
        solution = solve_pow(challenge["nonce"], challenge["difficulty"])
        if not args.quiet:
            print(" done", file=sys.stderr)
        headers["X-PoW-Token"] = challenge["token"]
        headers["X-PoW-Solution"] = str(solution)

    data = json.dumps(payload).encode() if payload else b"{}"
    status, body, resp_headers = request(
        url, method="POST", data=data, headers=headers, ssl_context=config.get("ssl_context")
    )

    if status == 400:
        die(parse_error(body, "bad request"))
    if status == 500:
        die(parse_error(body, "server error"))
    if status != 200:
        die(f"registration failed ({status})")

    fingerprint = resp_headers.get("X-Fingerprint-SHA1", "unknown")

    out_dir.mkdir(parents=True, exist_ok=True)

    # The bundle contains the private key: create the file owner-only before
    # writing. chmod covers a pre-existing file being overwritten with --force.
    p12_file.touch(mode=0o600)
    p12_file.chmod(0o600)
    p12_file.write_bytes(body)
    print(f"pkcs12: {p12_file}", file=sys.stderr)

    if not args.p12_only:
        try:
            private_key, certificate, _ = pkcs12.load_key_and_certificates(body, None)
        except ValueError:
            # Report an unparseable bundle the same way as an incomplete one.
            die("failed to parse PKCS#12 bundle")

        if private_key is None or certificate is None:
            die("failed to parse PKCS#12 bundle")

        key_pem = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )
        cert_pem = certificate.public_bytes(serialization.Encoding.PEM)

        # Same owner-only-before-write rule for the unencrypted PEM key.
        key_file.touch(mode=0o600)
        key_file.chmod(0o600)
        key_file.write_bytes(key_pem)
        cert_file.write_bytes(cert_pem)

        print(f"key: {key_file}", file=sys.stderr)
        print(f"certificate: {cert_file}", file=sys.stderr)

        cn = certificate.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
        if cn:
            cn_value = cn[0].value
            if isinstance(cn_value, bytes):
                cn_value = cn_value.decode("utf-8", errors="replace")
            print(f"common name: {cn_value}", file=sys.stderr)

    print(f"fingerprint: {fingerprint}", file=sys.stderr)

    # The config references the PEM files, which only exist without --p12-only.
    if args.configure and not args.p12_only:
        cfg_path = write_config_file(
            {
                "client_cert": str(cert_file),
                "client_key": str(key_file),
                "cert_sha1": fingerprint,
            }
        )
        print(f"config: {cfg_path} (updated)", file=sys.stderr)

    # The bare fingerprint goes to stdout so scripts can capture it.
    print(fingerprint)
|
|
|
|
|
|
def cmd_cert(args: argparse.Namespace, config: dict[str, Any]) -> None:
    """Generate a self-signed client certificate for mTLS authentication.

    Creates an RSA or EC private key, signs a client-auth certificate for it with
    SHA-256, writes ``client.key`` / ``client.crt`` into the output directory
    (``CONFIG_DIR`` by default) and prints the certificate's SHA-1 fingerprint on
    stdout.

    Args:
        args: Parsed ``cert`` arguments. Reads ``output``, ``force``,
            ``algorithm``, ``bits``, ``curve``, ``name``, ``days``,
            ``password_key`` and ``configure``.
        config: Client configuration (not read by this command).
    """
    if not HAS_CRYPTO:
        die("certificate generation requires 'cryptography' package: pip install cryptography")

    # Validate cheap inputs first so a bad value does not surface as a traceback
    # after (potentially slow) key generation.
    days = args.days or 365
    if days < 0:
        die(f"validity period must be a positive number of days: {days}")

    out_dir = Path(args.output) if args.output else CONFIG_DIR
    out_dir.mkdir(parents=True, exist_ok=True)

    key_file = out_dir / "client.key"
    cert_file = out_dir / "client.crt"

    if not args.force:
        if key_file.exists():
            die(f"key file exists: {key_file} (use --force)")
        if cert_file.exists():
            die(f"cert file exists: {cert_file} (use --force)")

    # Generate private key
    private_key: rsa.RSAPrivateKey | ec.EllipticCurvePrivateKey
    if args.algorithm == "rsa":
        key_size = args.bits or 4096
        print(f"generating {key_size}-bit RSA key...", file=sys.stderr)
        try:
            private_key = rsa.generate_private_key(public_exponent=65537, key_size=key_size)
        except ValueError as exc:
            # The library rejects unusable sizes; report it as a normal CLI error.
            die(f"invalid RSA key size {key_size}: {exc}")
    elif args.algorithm == "ec":
        curve_name = args.curve or "secp384r1"
        curves = {
            "secp256r1": ec.SECP256R1(),
            "secp384r1": ec.SECP384R1(),
            "secp521r1": ec.SECP521R1(),
        }
        if curve_name not in curves:
            die(f"unsupported curve: {curve_name} (use: secp256r1, secp384r1, secp521r1)")
        print(f"generating EC key ({curve_name})...", file=sys.stderr)
        private_key = ec.generate_private_key(curves[curve_name])
    else:
        die(f"unsupported algorithm: {args.algorithm}")

    # `or` chain (rather than a .get default) so an empty $USER also falls back.
    cn = args.name or os.environ.get("USER") or "fpaste-client"
    # Self-signed: subject and issuer are the same name.
    subject = issuer = x509.Name(
        [
            x509.NameAttribute(NameOID.COMMON_NAME, cn),
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, "FlaskPaste Client"),
        ]
    )

    now = datetime.now(UTC)

    cert_builder = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .public_key(private_key.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(now)
        .not_valid_after(now + timedelta(days=days))
        .add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True)
        .add_extension(
            x509.KeyUsage(
                digital_signature=True,
                key_encipherment=True,
                content_commitment=False,
                data_encipherment=False,
                key_agreement=False,
                key_cert_sign=False,
                crl_sign=False,
                encipher_only=False,
                decipher_only=False,
            ),
            critical=True,
        )
        .add_extension(
            x509.ExtendedKeyUsage([x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH]),
            critical=False,
        )
    )

    print("signing certificate...", file=sys.stderr)
    certificate = cert_builder.sign(private_key, hashes.SHA256())

    # SHA-1 here is only an identifier for the certificate, not a security control.
    cert_der = certificate.public_bytes(serialization.Encoding.DER)
    fingerprint = hashlib.sha1(cert_der, usedforsecurity=False).hexdigest()

    key_encryption = (
        serialization.BestAvailableEncryption(args.password_key.encode())
        if args.password_key
        else serialization.NoEncryption()
    )

    key_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=key_encryption,
    )
    cert_pem = certificate.public_bytes(serialization.Encoding.PEM)

    # Create the key file owner-only before writing key material, so it is never
    # readable by others. chmod covers a pre-existing file replaced with --force.
    key_file.touch(mode=0o600)
    key_file.chmod(0o600)
    key_file.write_bytes(key_pem)
    cert_file.write_bytes(cert_pem)

    print(f"key: {key_file}", file=sys.stderr)
    print(f"certificate: {cert_file}", file=sys.stderr)
    print(f"fingerprint: {fingerprint}", file=sys.stderr)
    print(f"valid for: {days} days", file=sys.stderr)
    print(f"common name: {cn}", file=sys.stderr)

    if args.configure:
        cfg_path = write_config_file(
            {
                "client_cert": str(cert_file),
                "client_key": str(key_file),
                "cert_sha1": fingerprint,
            }
        )
        print(f"config: {cfg_path} (updated)", file=sys.stderr)

    # The bare fingerprint goes to stdout so scripts can capture it.
    print(fingerprint)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Argument parsing
|
|
# -----------------------------------------------------------------------------
|
|
|
|
|
|
def is_file_path(arg: str) -> bool:
    """Return True if a command-line argument looks like a file path.

    An argument counts as a path when it names an existing filesystem entry,
    contains a path separator (``/`` or ``\\``), or ends in an extension listed
    in ``FILE_EXTENSIONS``. Empty strings, option-like arguments (leading ``-``)
    and non-existing dot-files are not treated as paths.

    Args:
        arg: A single raw command-line argument.

    Returns:
        True if the argument should be treated as a file to upload.
    """
    if not arg or arg.startswith("-"):
        return False
    try:
        if Path(arg).exists():
            return True
    except (OSError, ValueError):
        # The existence probe itself can fail (e.g. name too long for the
        # filesystem). That only means "not an existing path"; fall through to
        # the purely textual checks instead of crashing the CLI.
        pass
    if "/" in arg or "\\" in arg:
        return True
    if "." in arg and not arg.startswith("."):
        ext = arg.rsplit(".", 1)[-1].lower()
        return ext in FILE_EXTENSIONS
    return False
|
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
|
|
"""Build and return the argument parser."""
|
|
parser = argparse.ArgumentParser(
|
|
prog="fpaste",
|
|
description="FlaskPaste command-line client",
|
|
epilog="Shortcut: fpaste <file> is equivalent to fpaste create <file>",
|
|
)
|
|
parser.add_argument("-s", "--server", help="server URL (env: FLASKPASTE_SERVER)")
|
|
subparsers = parser.add_subparsers(dest="command", metavar="command")
|
|
|
|
# create
|
|
p_create = subparsers.add_parser("create", aliases=["c", "new"], help="create paste")
|
|
p_create.add_argument("file", nargs="?", help="file to upload (- for stdin)")
|
|
p_create.add_argument("-E", "--no-encrypt", action="store_true", help="disable encryption")
|
|
p_create.add_argument("-b", "--burn", action="store_true", help="burn after read")
|
|
p_create.add_argument("-x", "--expiry", type=int, metavar="SEC", help="expiry in seconds")
|
|
p_create.add_argument("-p", "--password", metavar="PASS", help="password protect")
|
|
p_create.add_argument("-r", "--raw", action="store_true", help="output raw URL")
|
|
p_create.add_argument("-q", "--quiet", action="store_true", help="output ID only")
|
|
p_create.add_argument("-C", "--clipboard", action="store_true", help="read from clipboard")
|
|
p_create.add_argument("--copy-url", action="store_true", help="copy result URL to clipboard")
|
|
|
|
# get
|
|
p_get = subparsers.add_parser("get", aliases=["g"], help="retrieve paste")
|
|
p_get.add_argument("id", help="paste ID or URL")
|
|
p_get.add_argument("-o", "--output", help="save to file")
|
|
p_get.add_argument("-c", "--copy", action="store_true", help="copy content to clipboard")
|
|
p_get.add_argument("-p", "--password", metavar="PASS", help="password for protected paste")
|
|
p_get.add_argument("-m", "--meta", action="store_true", help="show metadata only")
|
|
|
|
# delete
|
|
p_delete = subparsers.add_parser("delete", aliases=["d", "rm"], help="delete paste(s)")
|
|
p_delete.add_argument("ids", nargs="*", metavar="ID", help="paste ID(s) or URL(s)")
|
|
p_delete.add_argument("-a", "--all", action="store_true", help="delete all pastes (admin)")
|
|
p_delete.add_argument(
|
|
"-c", "--confirm", type=int, metavar="N", help="confirm expected delete count"
|
|
)
|
|
|
|
# info
|
|
subparsers.add_parser("info", aliases=["i"], help="show server info")
|
|
|
|
# list
|
|
p_list = subparsers.add_parser("list", aliases=["ls"], help="list your pastes")
|
|
p_list.add_argument("-a", "--all", action="store_true", help="list all pastes (admin only)")
|
|
p_list.add_argument("-l", "--limit", type=int, metavar="N", help="max pastes (default: 50)")
|
|
p_list.add_argument("-o", "--offset", type=int, metavar="N", help="skip first N pastes")
|
|
p_list.add_argument("--json", action="store_true", help="output as JSON")
|
|
|
|
# search
|
|
p_search = subparsers.add_parser("search", aliases=["s", "find"], help="search your pastes")
|
|
p_search.add_argument("-t", "--type", metavar="PATTERN", help="filter by MIME type (image/*)")
|
|
p_search.add_argument("--after", metavar="DATE", help="created after (YYYY-MM-DD or timestamp)")
|
|
p_search.add_argument("--before", metavar="DATE", help="created before (YYYY-MM-DD)")
|
|
p_search.add_argument("-l", "--limit", type=int, metavar="N", help="max results (default: 50)")
|
|
p_search.add_argument("--json", action="store_true", help="output as JSON")
|
|
|
|
# update
|
|
p_update = subparsers.add_parser("update", aliases=["u"], help="update existing paste")
|
|
p_update.add_argument("id", help="paste ID or URL")
|
|
p_update.add_argument("file", nargs="?", help="new content (- for stdin)")
|
|
p_update.add_argument("-E", "--no-encrypt", action="store_true", help="disable encryption")
|
|
p_update.add_argument("-p", "--password", metavar="PASS", help="set/change password")
|
|
p_update.add_argument("--remove-password", action="store_true", help="remove password")
|
|
p_update.add_argument("-x", "--expiry", type=int, metavar="SEC", help="extend expiry (seconds)")
|
|
p_update.add_argument("-q", "--quiet", action="store_true", help="minimal output")
|
|
|
|
# export
|
|
p_export = subparsers.add_parser("export", help="export all pastes to directory")
|
|
p_export.add_argument("-o", "--output", metavar="DIR", help="output directory")
|
|
p_export.add_argument("-k", "--keyfile", metavar="FILE", help="key file (paste_id=key format)")
|
|
p_export.add_argument("--manifest", action="store_true", help="write manifest.json")
|
|
p_export.add_argument("-q", "--quiet", action="store_true", help="minimal output")
|
|
|
|
# register
|
|
p_register = subparsers.add_parser("register", help="register and get client certificate")
|
|
p_register.add_argument("-n", "--name", metavar="CN", help="common name (optional)")
|
|
p_register.add_argument("-o", "--output", metavar="DIR", help="output directory")
|
|
p_register.add_argument("--configure", action="store_true", help="update config file")
|
|
p_register.add_argument("--p12-only", action="store_true", help="save only PKCS#12")
|
|
p_register.add_argument("-f", "--force", action="store_true", help="overwrite existing files")
|
|
p_register.add_argument("-q", "--quiet", action="store_true", help="minimal output")
|
|
|
|
# cert
|
|
p_cert = subparsers.add_parser("cert", help="generate client certificate")
|
|
p_cert.add_argument("-o", "--output", metavar="DIR", help="output directory")
|
|
p_cert.add_argument(
|
|
"-a", "--algorithm", choices=["rsa", "ec"], default="ec", help="key algorithm"
|
|
)
|
|
p_cert.add_argument("-b", "--bits", type=int, metavar="N", help="RSA key size (default: 4096)")
|
|
p_cert.add_argument(
|
|
"-c", "--curve", metavar="CURVE", help="EC curve (secp256r1/secp384r1/secp521r1)"
|
|
)
|
|
p_cert.add_argument("-d", "--days", type=int, metavar="N", help="validity period in days")
|
|
p_cert.add_argument("-n", "--name", metavar="CN", help="common name (default: $USER)")
|
|
p_cert.add_argument("--password-key", metavar="PASS", help="encrypt private key")
|
|
p_cert.add_argument("--configure", action="store_true", help="update config file")
|
|
p_cert.add_argument("-f", "--force", action="store_true", help="overwrite existing files")
|
|
|
|
# pki
|
|
p_pki = subparsers.add_parser("pki", help="PKI operations (server-issued certificates)")
|
|
pki_sub = p_pki.add_subparsers(dest="pki_command", metavar="subcommand")
|
|
|
|
pki_sub.add_parser("status", help="show PKI status and CA info")
|
|
|
|
p_pki_issue = pki_sub.add_parser("issue", help="request certificate from server CA")
|
|
p_pki_issue.add_argument(
|
|
"-n", "--name", required=True, metavar="CN", help="common name (required)"
|
|
)
|
|
p_pki_issue.add_argument("-o", "--output", metavar="DIR", help="output directory")
|
|
p_pki_issue.add_argument("--configure", action="store_true", help="update config file")
|
|
p_pki_issue.add_argument("-f", "--force", action="store_true", help="overwrite existing files")
|
|
|
|
p_pki_download = pki_sub.add_parser("download", aliases=["dl"], help="download CA certificate")
|
|
p_pki_download.add_argument("-o", "--output", metavar="FILE", help="save to file")
|
|
p_pki_download.add_argument(
|
|
"--configure", action="store_true", help="update config file (requires -o)"
|
|
)
|
|
|
|
# completion
|
|
p_completion = subparsers.add_parser("completion", help="generate shell completion")
|
|
p_completion.add_argument(
|
|
"--shell", choices=["bash", "zsh", "fish"], default="bash", help="shell type"
|
|
)
|
|
|
|
return parser
|
|
|
|
|
|
def cmd_completion(args: argparse.Namespace, config: dict[str, Any]) -> None:
    """Print a shell completion script for ``fpaste`` to stdout.

    Args:
        args: Parsed ``completion`` arguments; ``shell`` selects the script and
            falls back to ``bash`` when missing or empty.
        config: Client configuration (not read by this command).
    """
    requested_shell = getattr(args, "shell", None) or "bash"

    # Bash completion - full featured
    bash_script = """\
# Bash completion for fpaste
# Install: source this file or copy to /etc/bash_completion.d/fpaste

_fpaste_completions() {
    local cur prev words cword
    _init_completion || return

    local commands="create c new get g delete d rm info i list ls"
    commands+=" search s find update u export register cert pki completion"
    local pki_commands="status issue download dl"

    if [[ $cword -eq 1 ]]; then
        COMPREPLY=($(compgen -W "$commands" -- "$cur"))
        return
    fi

    local cmd="${words[1]}"

    if [[ "$cmd" == "pki" && $cword -eq 2 ]]; then
        COMPREPLY=($(compgen -W "$pki_commands" -- "$cur"))
        return
    fi

    case "$cmd" in
        create|c|new)
            local opts="-E --no-encrypt -b --burn -x --expiry -p --password"
            opts+=" -r --raw -q --quiet -C --clipboard --copy-url"
            [[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur")) || _filedir
            ;;
        get|g)
            local opts="-o --output -c --copy -p --password -m --meta"
            [[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur"))
            ;;
        delete|d|rm)
            [[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "-a --all -c --confirm" -- "$cur"))
            ;;
        list|ls)
            local opts="-a --all -l --limit -o --offset --json"
            [[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur"))
            ;;
        search|s|find)
            local opts="-t --type --after --before -l --limit --json"
            [[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur"))
            ;;
        update|u)
            local opts="-E --no-encrypt -p --password --remove-password -x --expiry -q --quiet"
            [[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur")) || _filedir
            ;;
        export)
            local opts="-o --output -k --keyfile --manifest -q --quiet"
            [[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur"))
            ;;
        register)
            local opts="-n --name -o --output --configure --p12-only -f --force -q --quiet"
            [[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur"))
            ;;
        cert)
            local opts="-o --output -a --algorithm -b --bits -c --curve"
            opts+=" -d --days -n --name --password-key --configure -f --force"
            [[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur"))
            ;;
        completion)
            [[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "--shell" -- "$cur"))
            [[ "$prev" == "--shell" ]] && COMPREPLY=($(compgen -W "bash zsh fish" -- "$cur"))
            ;;
    esac
}

complete -F _fpaste_completions fpaste
"""

    # Zsh completion - compact format
    zsh_script = """\
#compdef fpaste
_fpaste() {
    local curcontext="$curcontext" state line
    typeset -A opt_args
    _arguments -C '-s[Server]:url:' '--server[Server]:url:' '1: :->cmd' '*:: :->args'
    case $state in
        cmd)
            local cmds=('create:Create paste' 'get:Get paste' 'delete:Delete'
                'info:Server info' 'list:List pastes' 'search:Search'
                'update:Update paste' 'export:Export' 'register:Register'
                'cert:Generate cert' 'pki:PKI ops' 'completion:Completions')
            _describe -t commands 'commands' cmds
            ;;
        args)
            case $line[1] in
                create|c|new)
                    _arguments '-E[No encrypt]' '-b[Burn]' '-x[Expiry]:sec:' \\
                        '-p[Pass]:p:' '-r[Raw]' '-q[Quiet]' '-C[Clipboard]' \\
                        '--copy-url' '*:file:_files' ;;
                get|g)
                    _arguments '-o[Out]:f:_files' '-c[Copy]' '-p[Pass]:p:' \\
                        '-m[Meta]' '1:ID:' ;;
                delete|d|rm) _arguments '-a[All]' '-c[Confirm]:n:' '*:ID:' ;;
                list|ls) _arguments '-a[All]' '-l[Limit]:n:' '-o[Off]:n:' '--json' ;;
                search|s|find)
                    _arguments '-t[Type]:p:' '--after:d:' '--before:d:' \\
                        '-l[Limit]:n:' '--json' ;;
                update|u)
                    _arguments '-E[No encrypt]' '-p[Pass]:p:' '--remove-password' \\
                        '-x[Expiry]:s:' '-q[Quiet]' '1:ID:' '*:file:_files' ;;
                export)
                    _arguments '-o[Dir]:d:_files -/' '-k[Keys]:f:_files' \\
                        '--manifest' '-q[Quiet]' ;;
                register)
                    _arguments '-n[Name]:cn:' '-o[Dir]:d:_files -/' '--configure' \\
                        '--p12-only' '-f[Force]' '-q[Quiet]' ;;
                cert)
                    _arguments '-o[Dir]:d:_files -/' '-a[Algo]:(rsa ec)' \\
                        '-b[Bits]:n:' '-c[Curve]:(secp256r1 secp384r1 secp521r1)' \\
                        '-d[Days]:n:' '-n[Name]:cn:' '--configure' '-f[Force]' ;;
                pki) (( CURRENT == 2 )) && _describe 'cmd' '(status issue download)' ;;
                completion) _arguments '--shell:(bash zsh fish)' ;;
            esac ;;
    esac
}
_fpaste "$@"
"""

    # Fish completion - compact
    fish_script = """\
# Fish completion for fpaste
complete -c fpaste -f
complete -c fpaste -n __fish_use_subcommand -a 'create c new' -d 'Create'
complete -c fpaste -n __fish_use_subcommand -a 'get g' -d 'Get paste'
complete -c fpaste -n __fish_use_subcommand -a 'delete d rm' -d 'Delete'
complete -c fpaste -n __fish_use_subcommand -a 'info i' -d 'Server info'
complete -c fpaste -n __fish_use_subcommand -a 'list ls' -d 'List'
complete -c fpaste -n __fish_use_subcommand -a 'search s find' -d 'Search'
complete -c fpaste -n __fish_use_subcommand -a 'update u' -d 'Update'
complete -c fpaste -n __fish_use_subcommand -a 'export' -d 'Export'
complete -c fpaste -n __fish_use_subcommand -a 'register' -d 'Register'
complete -c fpaste -n __fish_use_subcommand -a 'cert' -d 'Gen cert'
complete -c fpaste -n __fish_use_subcommand -a 'pki' -d 'PKI'
complete -c fpaste -n __fish_use_subcommand -a 'completion' -d 'Completions'

set -l cr '__fish_seen_subcommand_from create c new'
complete -c fpaste -n $cr -s E -l no-encrypt -d 'No encrypt'
complete -c fpaste -n $cr -s b -l burn -d 'Burn'
complete -c fpaste -n $cr -s x -l expiry -d 'Expiry' -x
complete -c fpaste -n $cr -s p -l password -d 'Password' -x
complete -c fpaste -n $cr -s r -l raw -d 'Raw URL'
complete -c fpaste -n $cr -s q -l quiet -d 'Quiet'
complete -c fpaste -n $cr -s C -l clipboard -d 'Clipboard'
complete -c fpaste -n $cr -l copy-url -d 'Copy URL'
complete -c fpaste -n $cr -F

set -l gt '__fish_seen_subcommand_from get g'
complete -c fpaste -n $gt -s o -l output -d 'Output' -r
complete -c fpaste -n $gt -s c -l copy -d 'Copy'
complete -c fpaste -n $gt -s p -l password -d 'Password' -x
complete -c fpaste -n $gt -s m -l meta -d 'Metadata'

set -l dl '__fish_seen_subcommand_from delete d rm'
complete -c fpaste -n $dl -s a -l all -d 'All'
complete -c fpaste -n $dl -s c -l confirm -d 'Confirm' -x

set -l ls '__fish_seen_subcommand_from list ls'
complete -c fpaste -n $ls -s a -l all -d 'All'
complete -c fpaste -n $ls -s l -l limit -d 'Limit' -x
complete -c fpaste -n $ls -s o -l offset -d 'Offset' -x
complete -c fpaste -n $ls -l json -d 'JSON'

set -l cp '__fish_seen_subcommand_from completion'
complete -c fpaste -n $cp -l shell -d 'Shell' -xa 'bash zsh fish'
"""

    scripts = {"bash": bash_script, "zsh": zsh_script, "fish": fish_script}

    script = scripts.get(requested_shell)
    if script is None:
        die(f"unsupported shell: {requested_shell} (use: bash, zsh, fish)")

    print(script)
|
|
|
|
|
|
# Command dispatch table: maps every accepted spelling of a top-level command
# (canonical name first, then its aliases) to the handler that implements it.
COMMANDS: dict[str, Any] = {
    spelling: handler
    for spellings, handler in (
        (("create", "c", "new"), cmd_create),
        (("get", "g"), cmd_get),
        (("delete", "d", "rm"), cmd_delete),
        (("info", "i"), cmd_info),
        (("list", "ls"), cmd_list),
        (("search", "s", "find"), cmd_search),
        (("update", "u"), cmd_update),
        (("export",), cmd_export),
        (("register",), cmd_register),
        (("cert",), cmd_cert),
        (("completion",), cmd_completion),
    )
    for spelling in spellings
}
|
|
|
|
# Dispatch table for the nested "pki" subcommands ("dl" is an alias of "download").
PKI_COMMANDS: dict[str, Any] = dict(
    status=cmd_pki_status,
    issue=cmd_pki_issue,
    download=cmd_pki_download,
    dl=cmd_pki_download,
)
|
|
|
|
|
|
def _insert_default_command(
    argv: list[str], command_names: set[str], *, stdin_is_tty: bool
) -> list[str]:
    """Return *argv* with an implicit ``create`` inserted when no command is given.

    ``create`` is added when the first positional argument looks like a file, or
    when nothing was found and stdin is piped. It is placed after any leading
    global options (``--server`` in all its spellings, ``--help``) so that those
    are still parsed by the top-level parser; other leading options stay behind
    it and therefore belong to ``create``.
    """
    insert_pos = 0
    found_file = False
    index = 0
    while index < len(argv):
        arg = argv[index]
        if arg in ("-s", "--server"):
            # Option and its value are two separate arguments.
            index += 2
            insert_pos = index
            continue
        if arg.startswith("--server=") or (arg.startswith("-s") and len(arg) > 2):
            # Attached-value spellings (--server=URL, -sURL) are one argument.
            index += 1
            insert_pos = index
            continue
        if arg in ("-h", "--help"):
            index += 1
            insert_pos = index
            continue
        if arg.startswith("-"):
            index += 1
            continue
        if arg in command_names:
            # An explicit command is present: leave the arguments untouched.
            return argv
        if is_file_path(arg):
            found_file = True
            break
        index += 1

    if found_file or not stdin_is_tty:
        return [*argv[:insert_pos], "create", *argv[insert_pos:]]
    return argv


def main() -> None:
    """Parse the command line and dispatch to the selected command handler.

    ``fpaste <file>`` and piped input without a command are treated as
    ``fpaste create``. With no command and an interactive stdin the help text is
    printed and the process exits with status 0.
    """
    args_to_parse = _insert_default_command(
        sys.argv[1:],
        set(COMMANDS) | {"pki"},
        stdin_is_tty=sys.stdin.isatty(),
    )

    parser = build_parser()
    args = parser.parse_args(args_to_parse)
    config = get_config()

    # A server given on the command line overrides the loaded configuration.
    if args.server:
        config["server"] = args.server

    config["ssl_context"] = create_ssl_context(config)

    if not args.command:
        # Only reachable with an interactive stdin: piped input always gets an
        # implicit "create" inserted before parsing.
        parser.print_help()
        sys.exit(0)

    if args.command == "pki":
        if args.pki_command in PKI_COMMANDS:
            PKI_COMMANDS[args.pki_command](args, config)
        else:
            # No (valid) pki subcommand: show the pki help, which exits.
            parser.parse_args(["pki", "--help"])
    elif args.command in COMMANDS:
        COMMANDS[args.command](args, config)
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
|