Files
flaskpaste/fpaste
2025-12-20 18:22:59 +01:00

895 lines
31 KiB
Python
Executable File

#!/usr/bin/env python3
"""FlaskPaste command-line client."""
import argparse
import base64
import hashlib
import json
import os
import ssl
import sys
import urllib.error
import urllib.request
from datetime import UTC, datetime, timedelta
from pathlib import Path
# Optional cryptography support (for encryption and cert generation)
try:
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, rsa
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.x509.oid import NameOID
HAS_CRYPTO = True
except ImportError:
HAS_CRYPTO = False
def get_config():
"""Load configuration from environment or config file."""
config = {
"server": os.environ.get("FLASKPASTE_SERVER", "http://localhost:5000"),
"cert_sha1": os.environ.get("FLASKPASTE_CERT_SHA1", ""),
"client_cert": os.environ.get("FLASKPASTE_CLIENT_CERT", ""),
"client_key": os.environ.get("FLASKPASTE_CLIENT_KEY", ""),
"ca_cert": os.environ.get("FLASKPASTE_CA_CERT", ""),
}
# Try config file
config_file = Path.home() / ".config" / "fpaste" / "config"
if config_file.exists():
for line in config_file.read_text().splitlines():
line = line.strip()
if line and not line.startswith("#") and "=" in line:
key, value = line.split("=", 1)
key = key.strip().lower()
value = value.strip().strip('"').strip("'")
if key == "server":
config["server"] = value
elif key == "cert_sha1":
config["cert_sha1"] = value
elif key == "client_cert":
config["client_cert"] = value
elif key == "client_key":
config["client_key"] = value
elif key == "ca_cert":
config["ca_cert"] = value
return config
def create_ssl_context(config):
"""Create SSL context for mTLS if certificates are configured."""
client_cert = config.get("client_cert", "")
client_key = config.get("client_key", "")
ca_cert = config.get("ca_cert", "")
if not client_cert:
return None
ctx = ssl.create_default_context()
# Load CA certificate if specified
if ca_cert:
ctx.load_verify_locations(ca_cert)
# Load client certificate and key
try:
ctx.load_cert_chain(certfile=client_cert, keyfile=client_key or None)
except ssl.SSLError as e:
die(f"failed to load client certificate: {e}")
except FileNotFoundError as e:
die(f"certificate file not found: {e}")
return ctx
def request(url, method="GET", data=None, headers=None, ssl_context=None):
"""Make HTTP request and return response."""
headers = headers or {}
# User-configured server URL, audit is expected
req = urllib.request.Request(url, data=data, headers=headers, method=method) # noqa: S310
try:
# User-configured server URL, audit is expected
with urllib.request.urlopen(req, timeout=30, context=ssl_context) as resp: # noqa: S310
return resp.status, resp.read(), dict(resp.headers)
except urllib.error.HTTPError as e:
return e.code, e.read(), dict(e.headers)
except urllib.error.URLError as e:
die(f"Connection failed: {e.reason}")
def die(msg, code=1):
"""Print error and exit."""
print(f"error: {msg}", file=sys.stderr)
sys.exit(code)
def encrypt_content(plaintext):
"""Encrypt content with AES-256-GCM. Returns (ciphertext, key)."""
if not HAS_CRYPTO:
die("encryption requires 'cryptography' package: pip install cryptography")
key = os.urandom(32)
nonce = os.urandom(12) # 96-bit nonce for GCM
aesgcm = AESGCM(key)
ciphertext = aesgcm.encrypt(nonce, plaintext, None)
return nonce + ciphertext, key
def decrypt_content(blob, key):
"""Decrypt AES-256-GCM encrypted content."""
if not HAS_CRYPTO:
die("decryption requires 'cryptography' package: pip install cryptography")
if len(blob) < 12:
die("encrypted content too short")
nonce, ciphertext = blob[:12], blob[12:]
aesgcm = AESGCM(key)
try:
return aesgcm.decrypt(nonce, ciphertext, None)
except Exception:
die("decryption failed (wrong key or corrupted data)")
def encode_key(key):
"""Encode key as URL-safe base64."""
return base64.urlsafe_b64encode(key).decode().rstrip("=")
def decode_key(encoded):
"""Decode URL-safe base64 key."""
# Add padding if needed
padding = 4 - (len(encoded) % 4)
if padding != 4:
encoded += "=" * padding
try:
return base64.urlsafe_b64decode(encoded)
except Exception:
die("invalid encryption key in URL")
def solve_pow(nonce, difficulty):
"""Solve proof-of-work challenge.
Find a number N such that SHA256(nonce:N) has `difficulty` leading zero bits.
"""
n = 0
target_bytes = (difficulty + 7) // 8 # Bytes to check
while True:
work = f"{nonce}:{n}".encode()
hash_bytes = hashlib.sha256(work).digest()
# Count leading zero bits
zero_bits = 0
for byte in hash_bytes[: target_bytes + 1]:
if byte == 0:
zero_bits += 8
else:
zero_bits += 8 - byte.bit_length()
break
if zero_bits >= difficulty:
return n
n += 1
# Progress indicator for high difficulty
if n % 100000 == 0:
print(f"\rsolving pow: {n} attempts...", end="", file=sys.stderr)
return n
def get_challenge(config):
"""Fetch PoW challenge from server."""
url = config["server"].rstrip("/") + "/challenge"
status, body, _ = request(url, ssl_context=config.get("ssl_context"))
if status != 200:
return None
data = json.loads(body)
if not data.get("enabled"):
return None
return data
def cmd_create(args, config):
"""Create a new paste."""
# Read content from file or stdin
if args.file:
if args.file == "-":
content = sys.stdin.buffer.read()
else:
path = Path(args.file)
if not path.exists():
die(f"file not found: {args.file}")
content = path.read_bytes()
else:
# No file specified, read from stdin
if sys.stdin.isatty():
die("no input provided (pipe data or specify file)")
content = sys.stdin.buffer.read()
if not content:
die("empty content")
# Encrypt by default (unless --no-encrypt)
encryption_key = None
if not getattr(args, "no_encrypt", False):
if not HAS_CRYPTO:
die("encryption requires 'cryptography' package (use -E to disable)")
if not args.quiet:
print("encrypting...", end="", file=sys.stderr)
content, encryption_key = encrypt_content(content)
if not args.quiet:
print(" done", file=sys.stderr)
headers = {}
if config["cert_sha1"]:
headers["X-SSL-Client-SHA1"] = config["cert_sha1"]
# Add burn-after-read header
if args.burn:
headers["X-Burn-After-Read"] = "true"
# Add custom expiry header
if args.expiry:
headers["X-Expiry"] = str(args.expiry)
# Add password header
if args.password:
headers["X-Paste-Password"] = args.password
# Get and solve PoW challenge if required
challenge = get_challenge(config)
if challenge:
if not args.quiet:
print(f"solving pow (difficulty={challenge['difficulty']})...", end="", file=sys.stderr)
solution = solve_pow(challenge["nonce"], challenge["difficulty"])
if not args.quiet:
print(" done", file=sys.stderr)
headers["X-PoW-Token"] = challenge["token"]
headers["X-PoW-Solution"] = str(solution)
url = config["server"].rstrip("/") + "/"
status, body, _ = request(
url, method="POST", data=content, headers=headers, ssl_context=config.get("ssl_context")
)
if status == 201:
data = json.loads(body)
# Append encryption key to URL fragment if encrypted
key_fragment = ""
if encryption_key:
key_fragment = "#" + encode_key(encryption_key)
if args.raw:
print(config["server"].rstrip("/") + data["raw"] + key_fragment)
elif args.quiet:
print(data["id"] + key_fragment)
else:
print(config["server"].rstrip("/") + data["url"] + key_fragment)
else:
try:
err = json.loads(body).get("error", body.decode())
except (json.JSONDecodeError, UnicodeDecodeError):
err = body.decode(errors="replace")
die(f"create failed ({status}): {err}")
def cmd_get(args, config):
"""Retrieve a paste."""
# Parse URL for paste ID and optional encryption key fragment
url_input = args.id
encryption_key = None
# Extract key from URL fragment (#...)
if "#" in url_input:
url_input, key_encoded = url_input.rsplit("#", 1)
if key_encoded:
encryption_key = decode_key(key_encoded)
paste_id = url_input.split("/")[-1] # Handle full URLs
base = config["server"].rstrip("/")
# Build headers for password-protected pastes
headers = {}
if args.password:
headers["X-Paste-Password"] = args.password
if args.meta:
url = f"{base}/{paste_id}"
status, body, _ = request(url, headers=headers, ssl_context=config.get("ssl_context"))
if status == 200:
data = json.loads(body)
print(f"id: {data['id']}")
print(f"mime_type: {data['mime_type']}")
print(f"size: {data['size']}")
print(f"created_at: {data['created_at']}")
if encryption_key:
print("encrypted: yes (key in URL)")
if data.get("password_protected"):
print("protected: yes (password required)")
elif status == 401:
die("password required (-p)")
elif status == 403:
die("invalid password")
else:
die(f"not found: {paste_id}")
else:
url = f"{base}/{paste_id}/raw"
ssl_ctx = config.get("ssl_context")
status, body, _ = request(url, headers=headers, ssl_context=ssl_ctx)
if status == 200:
# Decrypt if encryption key was provided
if encryption_key:
body = decrypt_content(body, encryption_key)
if args.output:
Path(args.output).write_bytes(body)
print(f"saved: {args.output}", file=sys.stderr)
else:
# Write binary to stdout
sys.stdout.buffer.write(body)
# Add newline if content doesn't end with one and stdout is tty
if sys.stdout.isatty() and body and not body.endswith(b"\n"):
sys.stdout.buffer.write(b"\n")
elif status == 401:
die("password required (-p)")
elif status == 403:
die("invalid password")
else:
die(f"not found: {paste_id}")
def cmd_delete(args, config):
"""Delete a paste."""
if not config["cert_sha1"]:
die("authentication required (set FLASKPASTE_CERT_SHA1)")
paste_id = args.id.split("/")[-1]
base = config["server"].rstrip("/")
url = f"{base}/{paste_id}"
headers = {"X-SSL-Client-SHA1": config["cert_sha1"]}
status, _, _ = request(
url, method="DELETE", headers=headers, ssl_context=config.get("ssl_context")
)
if status == 200:
print(f"deleted: {paste_id}")
elif status == 404:
die(f"not found: {paste_id}")
elif status == 403:
die("permission denied (not owner)")
elif status == 401:
die("authentication failed")
else:
die(f"delete failed ({status})")
def cmd_info(args, config):
"""Show server info."""
url = config["server"].rstrip("/") + "/"
status, body, _ = request(url, ssl_context=config.get("ssl_context"))
if status == 200:
data = json.loads(body)
print(f"server: {config['server']}")
print(f"name: {data.get('name', 'unknown')}")
print(f"version: {data.get('version', 'unknown')}")
else:
die("failed to connect to server")
def cmd_pki_status(args, config):
"""Show PKI status and CA information."""
url = config["server"].rstrip("/") + "/pki"
status, body, _ = request(url, ssl_context=config.get("ssl_context"))
if status == 404:
die("PKI not enabled on this server")
elif status != 200:
die(f"failed to get PKI status ({status})")
data = json.loads(body)
print(f"pki enabled: {data.get('enabled', False)}")
print(f"ca exists: {data.get('ca_exists', False)}")
if data.get("ca_exists"):
print(f"common name: {data.get('common_name', 'unknown')}")
print(f"fingerprint: {data.get('fingerprint_sha1', 'unknown')}")
if data.get("created_at"):
print(f"created: {data.get('created_at')}")
if data.get("expires_at"):
print(f"expires: {data.get('expires_at')}")
print(f"download: {config['server'].rstrip('/')}{data.get('download', '/pki/ca.crt')}")
elif data.get("hint"):
print(f"hint: {data.get('hint')}")
def cmd_pki_issue(args, config):
"""Request a new client certificate from the server CA."""
url = config["server"].rstrip("/") + "/pki/issue"
headers = {"Content-Type": "application/json"}
if config["cert_sha1"]:
headers["X-SSL-Client-SHA1"] = config["cert_sha1"]
payload = {"common_name": args.name}
data = json.dumps(payload).encode()
status, body, _ = request(
url, method="POST", data=data, headers=headers, ssl_context=config.get("ssl_context")
)
if status == 404:
# Could be PKI disabled or no CA
try:
err = json.loads(body).get("error", "PKI not available")
except (json.JSONDecodeError, UnicodeDecodeError):
err = "PKI not available"
die(err)
elif status == 400:
try:
err = json.loads(body).get("error", "bad request")
except (json.JSONDecodeError, UnicodeDecodeError):
err = "bad request"
die(err)
elif status != 201:
die(f"certificate issuance failed ({status})")
result = json.loads(body)
# Determine output directory
out_dir = Path(args.output) if args.output else Path.home() / ".config" / "fpaste"
out_dir.mkdir(parents=True, exist_ok=True)
# File paths
key_file = out_dir / "client.key"
cert_file = out_dir / "client.crt"
# Check for existing files
if not args.force:
if key_file.exists():
die(f"key file exists: {key_file} (use --force)")
if cert_file.exists():
die(f"cert file exists: {cert_file} (use --force)")
# Write files
key_file.write_text(result["private_key_pem"])
key_file.chmod(0o600)
cert_file.write_text(result["certificate_pem"])
fingerprint = result.get("fingerprint_sha1", "unknown")
print(f"key: {key_file}", file=sys.stderr)
print(f"certificate: {cert_file}", file=sys.stderr)
print(f"fingerprint: {fingerprint}", file=sys.stderr)
print(f"serial: {result.get('serial', 'unknown')}", file=sys.stderr)
print(f"common name: {result.get('common_name', args.name)}", file=sys.stderr)
# Update config file if requested
if args.configure:
config_file = Path.home() / ".config" / "fpaste" / "config"
config_file.parent.mkdir(parents=True, exist_ok=True)
# Read existing config
existing = {}
if config_file.exists():
for line in config_file.read_text().splitlines():
line = line.strip()
if line and not line.startswith("#") and "=" in line:
k, v = line.split("=", 1)
existing[k.strip().lower()] = v.strip()
# Update values
existing["client_cert"] = str(cert_file)
existing["client_key"] = str(key_file)
existing["cert_sha1"] = fingerprint
# Write config
lines = [f"{k} = {v}" for k, v in sorted(existing.items())]
config_file.write_text("\n".join(lines) + "\n")
print(f"config: {config_file} (updated)", file=sys.stderr)
# Output fingerprint to stdout for easy capture
print(fingerprint)
def cmd_pki_download(args, config):
"""Download the CA certificate from the server."""
url = config["server"].rstrip("/") + "/pki/ca.crt"
status, body, _ = request(url, ssl_context=config.get("ssl_context"))
if status == 404:
die("CA certificate not available (PKI disabled or CA not generated)")
elif status != 200:
die(f"failed to download CA certificate ({status})")
# Determine output
if args.output:
out_path = Path(args.output)
out_path.write_bytes(body)
print(f"saved: {out_path}", file=sys.stderr)
# Calculate and show fingerprint if cryptography available
if HAS_CRYPTO:
cert = x509.load_pem_x509_certificate(body)
# SHA1 is standard for X.509 fingerprints
fp = hashlib.sha1(cert.public_bytes(serialization.Encoding.DER)).hexdigest() # noqa: S324
print(f"fingerprint: {fp}", file=sys.stderr)
# Update config if requested
if args.configure:
config_file = Path.home() / ".config" / "fpaste" / "config"
config_file.parent.mkdir(parents=True, exist_ok=True)
existing = {}
if config_file.exists():
for line in config_file.read_text().splitlines():
line = line.strip()
if line and not line.startswith("#") and "=" in line:
k, v = line.split("=", 1)
existing[k.strip().lower()] = v.strip()
existing["ca_cert"] = str(out_path)
lines = [f"{k} = {v}" for k, v in sorted(existing.items())]
config_file.write_text("\n".join(lines) + "\n")
print(f"config: {config_file} (updated)", file=sys.stderr)
else:
# Output to stdout
sys.stdout.buffer.write(body)
def cmd_cert(args, config):
"""Generate a self-signed client certificate for mTLS authentication."""
if not HAS_CRYPTO:
die("certificate generation requires 'cryptography' package: pip install cryptography")
# Determine output directory
out_dir = Path(args.output) if args.output else Path.home() / ".config" / "fpaste"
out_dir.mkdir(parents=True, exist_ok=True)
# File paths
key_file = out_dir / "client.key"
cert_file = out_dir / "client.crt"
# Check for existing files
if not args.force:
if key_file.exists():
die(f"key file exists: {key_file} (use --force)")
if cert_file.exists():
die(f"cert file exists: {cert_file} (use --force)")
# Generate private key
if args.algorithm == "rsa":
key_size = args.bits or 4096
print(f"generating {key_size}-bit RSA key...", file=sys.stderr)
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=key_size,
)
elif args.algorithm == "ec":
curve_name = args.curve or "secp384r1"
curves = {
"secp256r1": ec.SECP256R1(),
"secp384r1": ec.SECP384R1(),
"secp521r1": ec.SECP521R1(),
}
if curve_name not in curves:
die(f"unsupported curve: {curve_name} (use: secp256r1, secp384r1, secp521r1)")
print(f"generating EC key ({curve_name})...", file=sys.stderr)
private_key = ec.generate_private_key(curves[curve_name])
else:
die(f"unsupported algorithm: {args.algorithm}")
# Certificate subject
cn = args.name or os.environ.get("USER", "fpaste-client")
subject = issuer = x509.Name(
[
x509.NameAttribute(NameOID.COMMON_NAME, cn),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "FlaskPaste Client"),
]
)
# Validity period
days = args.days or 365
now = datetime.now(UTC)
# Build certificate
cert_builder = (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(issuer)
.public_key(private_key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(now)
.not_valid_after(now + timedelta(days=days))
.add_extension(
x509.BasicConstraints(ca=False, path_length=None),
critical=True,
)
.add_extension(
x509.KeyUsage(
digital_signature=True,
key_encipherment=True,
content_commitment=False,
data_encipherment=False,
key_agreement=False,
key_cert_sign=False,
crl_sign=False,
encipher_only=False,
decipher_only=False,
),
critical=True,
)
.add_extension(
x509.ExtendedKeyUsage([x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH]),
critical=False,
)
)
# Sign certificate
print("signing certificate...", file=sys.stderr)
certificate = cert_builder.sign(private_key, hashes.SHA256())
# Calculate SHA1 fingerprint (standard for X.509)
cert_der = certificate.public_bytes(serialization.Encoding.DER)
fingerprint = hashlib.sha1(cert_der).hexdigest() # noqa: S324
# Serialize private key
if args.password_key:
key_encryption = serialization.BestAvailableEncryption(args.password_key.encode("utf-8"))
else:
key_encryption = serialization.NoEncryption()
key_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=key_encryption,
)
# Serialize certificate
cert_pem = certificate.public_bytes(serialization.Encoding.PEM)
# Write files
key_file.write_bytes(key_pem)
key_file.chmod(0o600) # Restrict permissions
cert_file.write_bytes(cert_pem)
print(f"key: {key_file}", file=sys.stderr)
print(f"certificate: {cert_file}", file=sys.stderr)
print(f"fingerprint: {fingerprint}", file=sys.stderr)
print(f"valid for: {days} days", file=sys.stderr)
print(f"common name: {cn}", file=sys.stderr)
# Update config file if requested
if args.configure:
config_file = Path.home() / ".config" / "fpaste" / "config"
config_file.parent.mkdir(parents=True, exist_ok=True)
# Read existing config
existing = {}
if config_file.exists():
for line in config_file.read_text().splitlines():
line = line.strip()
if line and not line.startswith("#") and "=" in line:
k, v = line.split("=", 1)
existing[k.strip().lower()] = v.strip()
# Update values
existing["client_cert"] = str(cert_file)
existing["client_key"] = str(key_file)
existing["cert_sha1"] = fingerprint
# Write config
lines = [f"{k} = {v}" for k, v in sorted(existing.items())]
config_file.write_text("\n".join(lines) + "\n")
print(f"config: {config_file} (updated)", file=sys.stderr)
# Output fingerprint to stdout for easy capture
print(fingerprint)
def is_file_path(arg):
"""Check if argument looks like a file path."""
if not arg or arg.startswith("-"):
return False
# Check if it's an existing file
if Path(arg).exists():
return True
# Check if it looks like a path (contains / or \ or common extensions)
if "/" in arg or "\\" in arg:
return True
# Check for common file extensions
if "." in arg and not arg.startswith("."):
ext = arg.rsplit(".", 1)[-1].lower()
if ext in ("txt", "md", "py", "js", "json", "yaml", "yml", "xml", "html",
"css", "sh", "bash", "c", "cpp", "h", "go", "rs", "java",
"rb", "php", "sql", "log", "conf", "cfg", "ini", "png",
"jpg", "jpeg", "gif", "pdf", "zip", "tar", "gz"):
return True
return False
def main():
# Pre-process arguments: if first positional looks like a file, insert "create"
args_to_parse = sys.argv[1:]
commands = {"create", "c", "new", "get", "g", "delete", "d", "rm", "info", "i", "cert", "pki"}
top_level_opts = {"-s", "--server", "-h", "--help"}
# Find insertion point for "create" command
insert_pos = 0
has_command = False
file_pos = -1
i = 0
while i < len(args_to_parse):
arg = args_to_parse[i]
if arg in ("-s", "--server"):
insert_pos = i + 2 # After -s value
i += 2
continue
if arg in ("-h", "--help"):
i += 1
insert_pos = i
continue
if arg.startswith("-"):
# Unknown option - might be for create subcommand
i += 1
continue
# Found positional argument
if arg in commands:
has_command = True
break
elif is_file_path(arg):
file_pos = i
break
i += 1
# Insert "create" if no command found and we have input
# (either a file path or piped stdin)
if not has_command:
if file_pos >= 0 or not sys.stdin.isatty():
args_to_parse.insert(insert_pos, "create")
parser = argparse.ArgumentParser(
prog="fpaste",
description="FlaskPaste command-line client",
epilog="Shortcut: fpaste <file> is equivalent to fpaste create <file>",
)
parser.add_argument(
"-s",
"--server",
help="server URL (env: FLASKPASTE_SERVER)",
)
subparsers = parser.add_subparsers(dest="command", metavar="command")
# create
p_create = subparsers.add_parser("create", aliases=["c", "new"], help="create paste")
p_create.add_argument("file", nargs="?", help="file to upload (- for stdin)")
p_create.add_argument("-E", "--no-encrypt", action="store_true", help="disable encryption")
p_create.add_argument("-b", "--burn", action="store_true", help="burn after read")
p_create.add_argument("-x", "--expiry", type=int, metavar="SEC", help="expiry in seconds")
p_create.add_argument("-p", "--password", metavar="PASS", help="password protect")
p_create.add_argument("-r", "--raw", action="store_true", help="output raw URL")
p_create.add_argument("-q", "--quiet", action="store_true", help="output ID only")
# get
p_get = subparsers.add_parser("get", aliases=["g"], help="retrieve paste")
p_get.add_argument("id", help="paste ID or URL")
p_get.add_argument("-o", "--output", help="save to file")
p_get.add_argument("-p", "--password", metavar="PASS", help="password for protected paste")
p_get.add_argument("-m", "--meta", action="store_true", help="show metadata only")
# delete
p_delete = subparsers.add_parser("delete", aliases=["d", "rm"], help="delete paste")
p_delete.add_argument("id", help="paste ID or URL")
# info
subparsers.add_parser("info", aliases=["i"], help="show server info")
# cert
p_cert = subparsers.add_parser("cert", help="generate client certificate")
p_cert.add_argument("-o", "--output", metavar="DIR", help="output directory")
p_cert.add_argument(
"-a", "--algorithm", choices=["rsa", "ec"], default="ec", help="key algorithm (default: ec)"
)
p_cert.add_argument("-b", "--bits", type=int, metavar="N", help="RSA key size (default: 4096)")
p_cert.add_argument(
"-c", "--curve", metavar="CURVE", help="EC curve: secp256r1, secp384r1, secp521r1"
)
p_cert.add_argument("-d", "--days", type=int, metavar="N", help="validity period in days")
p_cert.add_argument("-n", "--name", metavar="CN", help="common name (default: $USER)")
p_cert.add_argument("--password-key", metavar="PASS", help="encrypt private key with password")
p_cert.add_argument(
"--configure", action="store_true", help="update config file with generated cert paths"
)
p_cert.add_argument("-f", "--force", action="store_true", help="overwrite existing files")
# pki (with subcommands)
p_pki = subparsers.add_parser("pki", help="PKI operations (server-issued certificates)")
pki_sub = p_pki.add_subparsers(dest="pki_command", metavar="subcommand")
# pki status
pki_sub.add_parser("status", help="show PKI status and CA info")
# pki issue
p_pki_issue = pki_sub.add_parser("issue", help="request certificate from server CA")
p_pki_issue.add_argument(
"-n", "--name", required=True, metavar="CN", help="common name for certificate (required)"
)
p_pki_issue.add_argument(
"-o", "--output", metavar="DIR", help="output directory (default: ~/.config/fpaste)"
)
p_pki_issue.add_argument(
"--configure", action="store_true", help="update config file with issued cert paths"
)
p_pki_issue.add_argument("-f", "--force", action="store_true", help="overwrite existing files")
# pki download
p_pki_download = pki_sub.add_parser("download", aliases=["dl"], help="download CA certificate")
p_pki_download.add_argument(
"-o", "--output", metavar="FILE", help="save to file (default: stdout)"
)
p_pki_download.add_argument(
"--configure",
action="store_true",
help="update config file with CA cert path (requires -o)",
)
args = parser.parse_args(args_to_parse)
config = get_config()
if args.server:
config["server"] = args.server
# Create SSL context for mTLS if configured
config["ssl_context"] = create_ssl_context(config)
if not args.command:
# Default: create from stdin if data is piped
if not sys.stdin.isatty():
args.command = "create"
args.file = None
args.no_encrypt = False # Encrypt by default
args.burn = False
args.expiry = None
args.password = None
args.raw = False
args.quiet = False
else:
parser.print_help()
sys.exit(0)
if args.command in ("create", "c", "new"):
cmd_create(args, config)
elif args.command in ("get", "g"):
cmd_get(args, config)
elif args.command in ("delete", "d", "rm"):
cmd_delete(args, config)
elif args.command in ("info", "i"):
cmd_info(args, config)
elif args.command == "cert":
cmd_cert(args, config)
elif args.command == "pki":
if args.pki_command == "status":
cmd_pki_status(args, config)
elif args.pki_command == "issue":
cmd_pki_issue(args, config)
elif args.pki_command in ("download", "dl"):
cmd_pki_download(args, config)
else:
# Show pki help if no subcommand
parser.parse_args(["pki", "--help"])
if __name__ == "__main__":
main()