Files
flaskpaste/fpaste
Username bfc238b5cf
Some checks failed
CI / Lint & Format (push) Successful in 16s
CI / Security Scan (push) Failing after 19s
CI / Tests (push) Successful in 34s
add CLI enhancements and scheduled cleanup
CLI commands:
- list: show user's pastes with pagination
- search: filter by type (glob), after/before timestamps
- update: modify content, password, or extend expiry
- export: save pastes to directory with optional decryption

API changes:
- PUT /<id>: update paste content and metadata
- GET /pastes: add type, after, before query params

Scheduled tasks:
- Thread-safe cleanup with per-task intervals
- Activate cleanup_expired_hashes (15min)
- Activate cleanup_rate_limits (5min)

Tests: 205 passing
2025-12-20 20:13:00 +01:00

1402 lines
46 KiB
Python
Executable File

#!/usr/bin/env python3
"""FlaskPaste command-line client."""
import argparse
import base64
import hashlib
import json
import os
import ssl
import sys
import urllib.error
import urllib.request
from datetime import UTC, datetime, timedelta
from pathlib import Path
# Optional cryptography support (for encryption and cert generation)
try:
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, rsa
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.x509.oid import NameOID
HAS_CRYPTO = True
except ImportError:
HAS_CRYPTO = False
def get_config():
"""Load configuration from environment or config file."""
config = {
"server": os.environ.get("FLASKPASTE_SERVER", "http://localhost:5000"),
"cert_sha1": os.environ.get("FLASKPASTE_CERT_SHA1", ""),
"client_cert": os.environ.get("FLASKPASTE_CLIENT_CERT", ""),
"client_key": os.environ.get("FLASKPASTE_CLIENT_KEY", ""),
"ca_cert": os.environ.get("FLASKPASTE_CA_CERT", ""),
}
# Try config file
config_file = Path.home() / ".config" / "fpaste" / "config"
if config_file.exists():
for line in config_file.read_text().splitlines():
line = line.strip()
if line and not line.startswith("#") and "=" in line:
key, value = line.split("=", 1)
key = key.strip().lower()
value = value.strip().strip('"').strip("'")
if key == "server":
config["server"] = value
elif key == "cert_sha1":
config["cert_sha1"] = value
elif key == "client_cert":
config["client_cert"] = value
elif key == "client_key":
config["client_key"] = value
elif key == "ca_cert":
config["ca_cert"] = value
return config
def create_ssl_context(config):
"""Create SSL context for mTLS if certificates are configured."""
client_cert = config.get("client_cert", "")
client_key = config.get("client_key", "")
ca_cert = config.get("ca_cert", "")
if not client_cert:
return None
ctx = ssl.create_default_context()
# Load CA certificate if specified
if ca_cert:
ctx.load_verify_locations(ca_cert)
# Load client certificate and key
try:
ctx.load_cert_chain(certfile=client_cert, keyfile=client_key or None)
except ssl.SSLError as e:
die(f"failed to load client certificate: {e}")
except FileNotFoundError as e:
die(f"certificate file not found: {e}")
return ctx
def request(url, method="GET", data=None, headers=None, ssl_context=None):
"""Make HTTP request and return response."""
headers = headers or {}
# User-configured server URL, audit is expected
req = urllib.request.Request(url, data=data, headers=headers, method=method) # noqa: S310
try:
# User-configured server URL, audit is expected
with urllib.request.urlopen(req, timeout=30, context=ssl_context) as resp: # noqa: S310
return resp.status, resp.read(), dict(resp.headers)
except urllib.error.HTTPError as e:
return e.code, e.read(), dict(e.headers)
except urllib.error.URLError as e:
die(f"Connection failed: {e.reason}")
def die(msg, code=1):
"""Print error and exit."""
print(f"error: {msg}", file=sys.stderr)
sys.exit(code)
def encrypt_content(plaintext):
"""Encrypt content with AES-256-GCM. Returns (ciphertext, key)."""
if not HAS_CRYPTO:
die("encryption requires 'cryptography' package: pip install cryptography")
key = os.urandom(32)
nonce = os.urandom(12) # 96-bit nonce for GCM
aesgcm = AESGCM(key)
ciphertext = aesgcm.encrypt(nonce, plaintext, None)
return nonce + ciphertext, key
def decrypt_content(blob, key):
"""Decrypt AES-256-GCM encrypted content."""
if not HAS_CRYPTO:
die("decryption requires 'cryptography' package: pip install cryptography")
if len(blob) < 12:
die("encrypted content too short")
nonce, ciphertext = blob[:12], blob[12:]
aesgcm = AESGCM(key)
try:
return aesgcm.decrypt(nonce, ciphertext, None)
except Exception:
die("decryption failed (wrong key or corrupted data)")
def encode_key(key):
"""Encode key as URL-safe base64."""
return base64.urlsafe_b64encode(key).decode().rstrip("=")
def decode_key(encoded):
"""Decode URL-safe base64 key."""
# Add padding if needed
padding = 4 - (len(encoded) % 4)
if padding != 4:
encoded += "=" * padding
try:
return base64.urlsafe_b64decode(encoded)
except Exception:
die("invalid encryption key in URL")
def solve_pow(nonce, difficulty):
"""Solve proof-of-work challenge.
Find a number N such that SHA256(nonce:N) has `difficulty` leading zero bits.
"""
n = 0
target_bytes = (difficulty + 7) // 8 # Bytes to check
while True:
work = f"{nonce}:{n}".encode()
hash_bytes = hashlib.sha256(work).digest()
# Count leading zero bits
zero_bits = 0
for byte in hash_bytes[: target_bytes + 1]:
if byte == 0:
zero_bits += 8
else:
zero_bits += 8 - byte.bit_length()
break
if zero_bits >= difficulty:
return n
n += 1
# Progress indicator for high difficulty
if n % 100000 == 0:
print(f"\rsolving pow: {n} attempts...", end="", file=sys.stderr)
return n
def get_challenge(config):
"""Fetch PoW challenge from server."""
url = config["server"].rstrip("/") + "/challenge"
status, body, _ = request(url, ssl_context=config.get("ssl_context"))
if status != 200:
return None
data = json.loads(body)
if not data.get("enabled"):
return None
return data
def cmd_create(args, config):
"""Create a new paste."""
# Read content from file or stdin
if args.file:
if args.file == "-":
content = sys.stdin.buffer.read()
else:
path = Path(args.file)
if not path.exists():
die(f"file not found: {args.file}")
content = path.read_bytes()
else:
# No file specified, read from stdin
if sys.stdin.isatty():
die("no input provided (pipe data or specify file)")
content = sys.stdin.buffer.read()
if not content:
die("empty content")
# Encrypt by default (unless --no-encrypt)
encryption_key = None
if not getattr(args, "no_encrypt", False):
if not HAS_CRYPTO:
die("encryption requires 'cryptography' package (use -E to disable)")
if not args.quiet:
print("encrypting...", end="", file=sys.stderr)
content, encryption_key = encrypt_content(content)
if not args.quiet:
print(" done", file=sys.stderr)
headers = {}
if config["cert_sha1"]:
headers["X-SSL-Client-SHA1"] = config["cert_sha1"]
# Add burn-after-read header
if args.burn:
headers["X-Burn-After-Read"] = "true"
# Add custom expiry header
if args.expiry:
headers["X-Expiry"] = str(args.expiry)
# Add password header
if args.password:
headers["X-Paste-Password"] = args.password
# Get and solve PoW challenge if required
challenge = get_challenge(config)
if challenge:
if not args.quiet:
print(f"solving pow (difficulty={challenge['difficulty']})...", end="", file=sys.stderr)
solution = solve_pow(challenge["nonce"], challenge["difficulty"])
if not args.quiet:
print(" done", file=sys.stderr)
headers["X-PoW-Token"] = challenge["token"]
headers["X-PoW-Solution"] = str(solution)
url = config["server"].rstrip("/") + "/"
status, body, _ = request(
url, method="POST", data=content, headers=headers, ssl_context=config.get("ssl_context")
)
if status == 201:
data = json.loads(body)
# Append encryption key to URL fragment if encrypted
key_fragment = ""
if encryption_key:
key_fragment = "#" + encode_key(encryption_key)
if args.raw:
print(config["server"].rstrip("/") + data["raw"] + key_fragment)
elif args.quiet:
print(data["id"] + key_fragment)
else:
print(config["server"].rstrip("/") + data["url"] + key_fragment)
else:
try:
err = json.loads(body).get("error", body.decode())
except (json.JSONDecodeError, UnicodeDecodeError):
err = body.decode(errors="replace")
die(f"create failed ({status}): {err}")
def cmd_get(args, config):
"""Retrieve a paste."""
# Parse URL for paste ID and optional encryption key fragment
url_input = args.id
encryption_key = None
# Extract key from URL fragment (#...)
if "#" in url_input:
url_input, key_encoded = url_input.rsplit("#", 1)
if key_encoded:
encryption_key = decode_key(key_encoded)
paste_id = url_input.split("/")[-1] # Handle full URLs
base = config["server"].rstrip("/")
# Build headers for password-protected pastes
headers = {}
if args.password:
headers["X-Paste-Password"] = args.password
if args.meta:
url = f"{base}/{paste_id}"
status, body, _ = request(url, headers=headers, ssl_context=config.get("ssl_context"))
if status == 200:
data = json.loads(body)
print(f"id: {data['id']}")
print(f"mime_type: {data['mime_type']}")
print(f"size: {data['size']}")
print(f"created_at: {data['created_at']}")
if encryption_key:
print("encrypted: yes (key in URL)")
if data.get("password_protected"):
print("protected: yes (password required)")
elif status == 401:
die("password required (-p)")
elif status == 403:
die("invalid password")
else:
die(f"not found: {paste_id}")
else:
url = f"{base}/{paste_id}/raw"
ssl_ctx = config.get("ssl_context")
status, body, _ = request(url, headers=headers, ssl_context=ssl_ctx)
if status == 200:
# Decrypt if encryption key was provided
if encryption_key:
body = decrypt_content(body, encryption_key)
if args.output:
Path(args.output).write_bytes(body)
print(f"saved: {args.output}", file=sys.stderr)
else:
# Write binary to stdout
sys.stdout.buffer.write(body)
# Add newline if content doesn't end with one and stdout is tty
if sys.stdout.isatty() and body and not body.endswith(b"\n"):
sys.stdout.buffer.write(b"\n")
elif status == 401:
die("password required (-p)")
elif status == 403:
die("invalid password")
else:
die(f"not found: {paste_id}")
def cmd_delete(args, config):
"""Delete a paste."""
if not config["cert_sha1"]:
die("authentication required (set FLASKPASTE_CERT_SHA1)")
paste_id = args.id.split("/")[-1]
base = config["server"].rstrip("/")
url = f"{base}/{paste_id}"
headers = {"X-SSL-Client-SHA1": config["cert_sha1"]}
status, _, _ = request(
url, method="DELETE", headers=headers, ssl_context=config.get("ssl_context")
)
if status == 200:
print(f"deleted: {paste_id}")
elif status == 404:
die(f"not found: {paste_id}")
elif status == 403:
die("permission denied (not owner)")
elif status == 401:
die("authentication failed")
else:
die(f"delete failed ({status})")
def cmd_info(args, config):
"""Show server info."""
url = config["server"].rstrip("/") + "/"
status, body, _ = request(url, ssl_context=config.get("ssl_context"))
if status == 200:
data = json.loads(body)
print(f"server: {config['server']}")
print(f"name: {data.get('name', 'unknown')}")
print(f"version: {data.get('version', 'unknown')}")
else:
die("failed to connect to server")
def format_size(size):
"""Format byte size as human-readable string."""
if size < 1024:
return f"{size}B"
elif size < 1024 * 1024:
return f"{size / 1024:.1f}K"
else:
return f"{size / (1024 * 1024):.1f}M"
def format_timestamp(ts):
"""Format Unix timestamp as human-readable date."""
from datetime import datetime
dt = datetime.fromtimestamp(ts, tz=UTC)
return dt.strftime("%Y-%m-%d %H:%M")
def cmd_list(args, config):
"""List user's pastes."""
if not config["cert_sha1"]:
die("authentication required (set FLASKPASTE_CERT_SHA1)")
base = config["server"].rstrip("/")
params = []
if args.limit:
params.append(f"limit={args.limit}")
if args.offset:
params.append(f"offset={args.offset}")
url = f"{base}/pastes"
if params:
url += "?" + "&".join(params)
headers = {"X-SSL-Client-SHA1": config["cert_sha1"]}
status, body, _ = request(url, headers=headers, ssl_context=config.get("ssl_context"))
if status == 401:
die("authentication failed")
elif status != 200:
die(f"failed to list pastes ({status})")
data = json.loads(body)
pastes = data.get("pastes", [])
if args.json:
print(json.dumps(data, indent=2))
return
if not pastes:
print("no pastes found")
return
# Print header
print(f"{'ID':<12} {'TYPE':<16} {'SIZE':>6} {'CREATED':<16} FLAGS")
for p in pastes:
paste_id = p["id"]
mime_type = p.get("mime_type", "unknown")[:16]
size = format_size(p.get("size", 0))
created = format_timestamp(p.get("created_at", 0))
flags = []
if p.get("burn_after_read"):
flags.append("burn")
if p.get("password_protected"):
flags.append("pass")
if p.get("expires_at"):
flags.append("exp")
flag_str = " ".join(flags)
print(f"{paste_id:<12} {mime_type:<16} {size:>6} {created:<16} {flag_str}")
# Print summary
print(f"\n{data.get('count', 0)} of {data.get('total', 0)} pastes shown")
def parse_date(date_str):
"""Parse date string to Unix timestamp."""
from datetime import datetime
if not date_str:
return 0
# Try various formats
formats = [
"%Y-%m-%d",
"%Y-%m-%d %H:%M",
"%Y-%m-%d %H:%M:%S",
"%Y-%m-%dT%H:%M:%S",
"%Y-%m-%dT%H:%M:%SZ",
]
for fmt in formats:
try:
dt = datetime.strptime(date_str, fmt)
dt = dt.replace(tzinfo=UTC)
return int(dt.timestamp())
except ValueError:
continue
# Try as Unix timestamp
try:
return int(date_str)
except ValueError:
pass
die(f"invalid date format: {date_str}")
def cmd_search(args, config):
"""Search user's pastes."""
if not config["cert_sha1"]:
die("authentication required (set FLASKPASTE_CERT_SHA1)")
base = config["server"].rstrip("/")
params = []
if args.type:
params.append(f"type={args.type}")
if args.after:
ts = parse_date(args.after)
params.append(f"after={ts}")
if args.before:
ts = parse_date(args.before)
params.append(f"before={ts}")
if args.limit:
params.append(f"limit={args.limit}")
url = f"{base}/pastes"
if params:
url += "?" + "&".join(params)
headers = {"X-SSL-Client-SHA1": config["cert_sha1"]}
status, body, _ = request(url, headers=headers, ssl_context=config.get("ssl_context"))
if status == 401:
die("authentication failed")
elif status != 200:
die(f"failed to search pastes ({status})")
data = json.loads(body)
pastes = data.get("pastes", [])
if args.json:
print(json.dumps(data, indent=2))
return
if not pastes:
print("no matching pastes found")
return
# Print header
print(f"{'ID':<12} {'TYPE':<16} {'SIZE':>6} {'CREATED':<16} FLAGS")
for p in pastes:
paste_id = p["id"]
mime_type = p.get("mime_type", "unknown")[:16]
size = format_size(p.get("size", 0))
created = format_timestamp(p.get("created_at", 0))
flags = []
if p.get("burn_after_read"):
flags.append("burn")
if p.get("password_protected"):
flags.append("pass")
if p.get("expires_at"):
flags.append("exp")
flag_str = " ".join(flags)
print(f"{paste_id:<12} {mime_type:<16} {size:>6} {created:<16} {flag_str}")
# Print summary
print(f"\n{data.get('count', 0)} matching pastes found")
def cmd_update(args, config):
"""Update an existing paste."""
if not config["cert_sha1"]:
die("authentication required (set FLASKPASTE_CERT_SHA1)")
paste_id = args.id.split("/")[-1] # Handle full URLs
if "#" in paste_id:
paste_id = paste_id.split("#")[0] # Remove key fragment
base = config["server"].rstrip("/")
url = f"{base}/{paste_id}"
headers = {"X-SSL-Client-SHA1": config["cert_sha1"]}
content = None
# Read content from file if provided
if args.file:
if args.file == "-":
content = sys.stdin.buffer.read()
else:
path = Path(args.file)
if not path.exists():
die(f"file not found: {args.file}")
content = path.read_bytes()
if not content:
die("empty content")
# Encrypt if requested (default is to encrypt)
if not getattr(args, "no_encrypt", False):
if not HAS_CRYPTO:
die("encryption requires 'cryptography' package (use -E to disable)")
if not args.quiet:
print("encrypting...", end="", file=sys.stderr)
content, encryption_key = encrypt_content(content)
if not args.quiet:
print(" done", file=sys.stderr)
else:
encryption_key = None
# Set metadata update headers
if args.password:
headers["X-Paste-Password"] = args.password
if args.remove_password:
headers["X-Remove-Password"] = "true"
if args.expiry:
headers["X-Extend-Expiry"] = str(args.expiry)
# Make request
status, body, _ = request(
url, method="PUT", data=content, headers=headers, ssl_context=config.get("ssl_context")
)
if status == 200:
data = json.loads(body)
if args.quiet:
print(paste_id)
else:
print(f"updated: {paste_id}")
print(f" size: {data.get('size', 'unknown')}")
print(f" type: {data.get('mime_type', 'unknown')}")
if data.get("expires_at"):
print(f" expires: {data.get('expires_at')}")
if data.get("password_protected"):
print(" password: protected")
# Show new encryption key if content was updated and encrypted
if content and "encryption_key" in dir() and encryption_key:
key_fragment = "#" + encode_key(encryption_key)
print(f" key: {base}/{paste_id}{key_fragment}")
elif status == 400:
try:
err = json.loads(body).get("error", "bad request")
except (json.JSONDecodeError, UnicodeDecodeError):
err = "bad request"
die(err)
elif status == 401:
die("authentication failed")
elif status == 403:
die("permission denied (not owner)")
elif status == 404:
die(f"not found: {paste_id}")
else:
die(f"update failed ({status})")
def cmd_export(args, config):
"""Export user's pastes to a directory."""
if not config["cert_sha1"]:
die("authentication required (set FLASKPASTE_CERT_SHA1)")
base = config["server"].rstrip("/")
out_dir = Path(args.output) if args.output else Path("fpaste-export")
# Create output directory
out_dir.mkdir(parents=True, exist_ok=True)
# Load key file if provided
keys = {}
if args.keyfile:
keyfile_path = Path(args.keyfile)
if not keyfile_path.exists():
die(f"key file not found: {args.keyfile}")
for line in keyfile_path.read_text().splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" not in line:
continue
paste_id, key_encoded = line.split("=", 1)
keys[paste_id.strip()] = key_encoded.strip()
# Fetch paste list
headers = {"X-SSL-Client-SHA1": config["cert_sha1"]}
url = f"{base}/pastes?limit=1000" # Fetch all pastes
status, body, _ = request(url, headers=headers, ssl_context=config.get("ssl_context"))
if status == 401:
die("authentication failed")
elif status != 200:
die(f"failed to list pastes ({status})")
data = json.loads(body)
pastes = data.get("pastes", [])
if not pastes:
print("no pastes to export")
return
# Export each paste
exported = 0
skipped = 0
errors = 0
manifest = []
for p in pastes:
paste_id = p["id"]
mime_type = p.get("mime_type", "application/octet-stream")
if not args.quiet:
print(f"exporting {paste_id}...", end=" ", file=sys.stderr)
# Skip burn-after-read pastes
if p.get("burn_after_read"):
if not args.quiet:
print("skipped (burn-after-read)", file=sys.stderr)
skipped += 1
continue
# Fetch raw content
raw_url = f"{base}/{paste_id}/raw"
req_headers = dict(headers)
if p.get("password_protected"):
if not args.quiet:
print("skipped (password-protected)", file=sys.stderr)
skipped += 1
continue
ssl_ctx = config.get("ssl_context")
status, content, _ = request(raw_url, headers=req_headers, ssl_context=ssl_ctx)
if status != 200:
if not args.quiet:
print(f"error ({status})", file=sys.stderr)
errors += 1
continue
# Decrypt if key available
decrypted = False
if paste_id in keys:
try:
key = decode_key(keys[paste_id])
content = decrypt_content(content, key)
decrypted = True
except SystemExit:
# Decryption failed, keep encrypted content
if not args.quiet:
print("decryption failed, keeping encrypted", file=sys.stderr, end=" ")
# Determine file extension from MIME type
ext = get_extension_for_mime(mime_type)
filename = f"{paste_id}{ext}"
filepath = out_dir / filename
# Write content
filepath.write_bytes(content)
# Add to manifest
manifest.append(
{
"id": paste_id,
"filename": filename,
"mime_type": mime_type,
"size": len(content),
"created_at": p.get("created_at"),
"decrypted": decrypted,
"encrypted": paste_id in keys and not decrypted,
}
)
if not args.quiet:
status_msg = "decrypted" if decrypted else ("encrypted" if paste_id in keys else "ok")
print(status_msg, file=sys.stderr)
exported += 1
# Write manifest
if args.manifest:
manifest_path = out_dir / "manifest.json"
manifest_path.write_text(json.dumps(manifest, indent=2))
if not args.quiet:
print(f"manifest: {manifest_path}", file=sys.stderr)
# Summary
print(f"\nexported: {exported}, skipped: {skipped}, errors: {errors}")
print(f"output: {out_dir}")
def get_extension_for_mime(mime_type):
"""Get file extension for MIME type."""
mime_map = {
"text/plain": ".txt",
"text/html": ".html",
"text/css": ".css",
"text/javascript": ".js",
"text/markdown": ".md",
"text/x-python": ".py",
"application/json": ".json",
"application/xml": ".xml",
"application/javascript": ".js",
"application/octet-stream": ".bin",
"image/png": ".png",
"image/jpeg": ".jpg",
"image/gif": ".gif",
"image/webp": ".webp",
"image/svg+xml": ".svg",
"application/pdf": ".pdf",
"application/zip": ".zip",
"application/gzip": ".gz",
"application/x-tar": ".tar",
}
return mime_map.get(mime_type, ".bin")
def cmd_pki_status(args, config):
"""Show PKI status and CA information."""
url = config["server"].rstrip("/") + "/pki"
status, body, _ = request(url, ssl_context=config.get("ssl_context"))
if status == 404:
die("PKI not enabled on this server")
elif status != 200:
die(f"failed to get PKI status ({status})")
data = json.loads(body)
print(f"pki enabled: {data.get('enabled', False)}")
print(f"ca exists: {data.get('ca_exists', False)}")
if data.get("ca_exists"):
print(f"common name: {data.get('common_name', 'unknown')}")
print(f"fingerprint: {data.get('fingerprint_sha1', 'unknown')}")
if data.get("created_at"):
print(f"created: {data.get('created_at')}")
if data.get("expires_at"):
print(f"expires: {data.get('expires_at')}")
print(f"download: {config['server'].rstrip('/')}{data.get('download', '/pki/ca.crt')}")
elif data.get("hint"):
print(f"hint: {data.get('hint')}")
def cmd_pki_issue(args, config):
"""Request a new client certificate from the server CA."""
url = config["server"].rstrip("/") + "/pki/issue"
headers = {"Content-Type": "application/json"}
if config["cert_sha1"]:
headers["X-SSL-Client-SHA1"] = config["cert_sha1"]
payload = {"common_name": args.name}
data = json.dumps(payload).encode()
status, body, _ = request(
url, method="POST", data=data, headers=headers, ssl_context=config.get("ssl_context")
)
if status == 404:
# Could be PKI disabled or no CA
try:
err = json.loads(body).get("error", "PKI not available")
except (json.JSONDecodeError, UnicodeDecodeError):
err = "PKI not available"
die(err)
elif status == 400:
try:
err = json.loads(body).get("error", "bad request")
except (json.JSONDecodeError, UnicodeDecodeError):
err = "bad request"
die(err)
elif status != 201:
die(f"certificate issuance failed ({status})")
result = json.loads(body)
# Determine output directory
out_dir = Path(args.output) if args.output else Path.home() / ".config" / "fpaste"
out_dir.mkdir(parents=True, exist_ok=True)
# File paths
key_file = out_dir / "client.key"
cert_file = out_dir / "client.crt"
# Check for existing files
if not args.force:
if key_file.exists():
die(f"key file exists: {key_file} (use --force)")
if cert_file.exists():
die(f"cert file exists: {cert_file} (use --force)")
# Write files
key_file.write_text(result["private_key_pem"])
key_file.chmod(0o600)
cert_file.write_text(result["certificate_pem"])
fingerprint = result.get("fingerprint_sha1", "unknown")
print(f"key: {key_file}", file=sys.stderr)
print(f"certificate: {cert_file}", file=sys.stderr)
print(f"fingerprint: {fingerprint}", file=sys.stderr)
print(f"serial: {result.get('serial', 'unknown')}", file=sys.stderr)
print(f"common name: {result.get('common_name', args.name)}", file=sys.stderr)
# Update config file if requested
if args.configure:
config_file = Path.home() / ".config" / "fpaste" / "config"
config_file.parent.mkdir(parents=True, exist_ok=True)
# Read existing config
existing = {}
if config_file.exists():
for line in config_file.read_text().splitlines():
line = line.strip()
if line and not line.startswith("#") and "=" in line:
k, v = line.split("=", 1)
existing[k.strip().lower()] = v.strip()
# Update values
existing["client_cert"] = str(cert_file)
existing["client_key"] = str(key_file)
existing["cert_sha1"] = fingerprint
# Write config
lines = [f"{k} = {v}" for k, v in sorted(existing.items())]
config_file.write_text("\n".join(lines) + "\n")
print(f"config: {config_file} (updated)", file=sys.stderr)
# Output fingerprint to stdout for easy capture
print(fingerprint)
def cmd_pki_download(args, config):
"""Download the CA certificate from the server."""
url = config["server"].rstrip("/") + "/pki/ca.crt"
status, body, _ = request(url, ssl_context=config.get("ssl_context"))
if status == 404:
die("CA certificate not available (PKI disabled or CA not generated)")
elif status != 200:
die(f"failed to download CA certificate ({status})")
# Determine output
if args.output:
out_path = Path(args.output)
out_path.write_bytes(body)
print(f"saved: {out_path}", file=sys.stderr)
# Calculate and show fingerprint if cryptography available
if HAS_CRYPTO:
cert = x509.load_pem_x509_certificate(body)
# SHA1 is standard for X.509 fingerprints
fp = hashlib.sha1(cert.public_bytes(serialization.Encoding.DER)).hexdigest() # noqa: S324
print(f"fingerprint: {fp}", file=sys.stderr)
# Update config if requested
if args.configure:
config_file = Path.home() / ".config" / "fpaste" / "config"
config_file.parent.mkdir(parents=True, exist_ok=True)
existing = {}
if config_file.exists():
for line in config_file.read_text().splitlines():
line = line.strip()
if line and not line.startswith("#") and "=" in line:
k, v = line.split("=", 1)
existing[k.strip().lower()] = v.strip()
existing["ca_cert"] = str(out_path)
lines = [f"{k} = {v}" for k, v in sorted(existing.items())]
config_file.write_text("\n".join(lines) + "\n")
print(f"config: {config_file} (updated)", file=sys.stderr)
else:
# Output to stdout
sys.stdout.buffer.write(body)
def cmd_cert(args, config):
"""Generate a self-signed client certificate for mTLS authentication."""
if not HAS_CRYPTO:
die("certificate generation requires 'cryptography' package: pip install cryptography")
# Determine output directory
out_dir = Path(args.output) if args.output else Path.home() / ".config" / "fpaste"
out_dir.mkdir(parents=True, exist_ok=True)
# File paths
key_file = out_dir / "client.key"
cert_file = out_dir / "client.crt"
# Check for existing files
if not args.force:
if key_file.exists():
die(f"key file exists: {key_file} (use --force)")
if cert_file.exists():
die(f"cert file exists: {cert_file} (use --force)")
# Generate private key
if args.algorithm == "rsa":
key_size = args.bits or 4096
print(f"generating {key_size}-bit RSA key...", file=sys.stderr)
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=key_size,
)
elif args.algorithm == "ec":
curve_name = args.curve or "secp384r1"
curves = {
"secp256r1": ec.SECP256R1(),
"secp384r1": ec.SECP384R1(),
"secp521r1": ec.SECP521R1(),
}
if curve_name not in curves:
die(f"unsupported curve: {curve_name} (use: secp256r1, secp384r1, secp521r1)")
print(f"generating EC key ({curve_name})...", file=sys.stderr)
private_key = ec.generate_private_key(curves[curve_name])
else:
die(f"unsupported algorithm: {args.algorithm}")
# Certificate subject
cn = args.name or os.environ.get("USER", "fpaste-client")
subject = issuer = x509.Name(
[
x509.NameAttribute(NameOID.COMMON_NAME, cn),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "FlaskPaste Client"),
]
)
# Validity period
days = args.days or 365
now = datetime.now(UTC)
# Build certificate
cert_builder = (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(issuer)
.public_key(private_key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(now)
.not_valid_after(now + timedelta(days=days))
.add_extension(
x509.BasicConstraints(ca=False, path_length=None),
critical=True,
)
.add_extension(
x509.KeyUsage(
digital_signature=True,
key_encipherment=True,
content_commitment=False,
data_encipherment=False,
key_agreement=False,
key_cert_sign=False,
crl_sign=False,
encipher_only=False,
decipher_only=False,
),
critical=True,
)
.add_extension(
x509.ExtendedKeyUsage([x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH]),
critical=False,
)
)
# Sign certificate
print("signing certificate...", file=sys.stderr)
certificate = cert_builder.sign(private_key, hashes.SHA256())
# Calculate SHA1 fingerprint (standard for X.509)
cert_der = certificate.public_bytes(serialization.Encoding.DER)
fingerprint = hashlib.sha1(cert_der).hexdigest() # noqa: S324
# Serialize private key
if args.password_key:
key_encryption = serialization.BestAvailableEncryption(args.password_key.encode("utf-8"))
else:
key_encryption = serialization.NoEncryption()
key_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=key_encryption,
)
# Serialize certificate
cert_pem = certificate.public_bytes(serialization.Encoding.PEM)
# Write files
key_file.write_bytes(key_pem)
key_file.chmod(0o600) # Restrict permissions
cert_file.write_bytes(cert_pem)
print(f"key: {key_file}", file=sys.stderr)
print(f"certificate: {cert_file}", file=sys.stderr)
print(f"fingerprint: {fingerprint}", file=sys.stderr)
print(f"valid for: {days} days", file=sys.stderr)
print(f"common name: {cn}", file=sys.stderr)
# Update config file if requested
if args.configure:
config_file = Path.home() / ".config" / "fpaste" / "config"
config_file.parent.mkdir(parents=True, exist_ok=True)
# Read existing config
existing = {}
if config_file.exists():
for line in config_file.read_text().splitlines():
line = line.strip()
if line and not line.startswith("#") and "=" in line:
k, v = line.split("=", 1)
existing[k.strip().lower()] = v.strip()
# Update values
existing["client_cert"] = str(cert_file)
existing["client_key"] = str(key_file)
existing["cert_sha1"] = fingerprint
# Write config
lines = [f"{k} = {v}" for k, v in sorted(existing.items())]
config_file.write_text("\n".join(lines) + "\n")
print(f"config: {config_file} (updated)", file=sys.stderr)
# Output fingerprint to stdout for easy capture
print(fingerprint)
def is_file_path(arg):
"""Check if argument looks like a file path."""
if not arg or arg.startswith("-"):
return False
# Check if it's an existing file
if Path(arg).exists():
return True
# Check if it looks like a path (contains / or \ or common extensions)
if "/" in arg or "\\" in arg:
return True
# Check for common file extensions
if "." in arg and not arg.startswith("."):
ext = arg.rsplit(".", 1)[-1].lower()
if ext in (
"txt",
"md",
"py",
"js",
"json",
"yaml",
"yml",
"xml",
"html",
"css",
"sh",
"bash",
"c",
"cpp",
"h",
"go",
"rs",
"java",
"rb",
"php",
"sql",
"log",
"conf",
"cfg",
"ini",
"png",
"jpg",
"jpeg",
"gif",
"pdf",
"zip",
"tar",
"gz",
):
return True
return False
def main():
# Pre-process arguments: if first positional looks like a file, insert "create"
args_to_parse = sys.argv[1:]
commands = {
"create",
"c",
"new",
"get",
"g",
"delete",
"d",
"rm",
"info",
"i",
"list",
"ls",
"search",
"s",
"find",
"update",
"u",
"export",
"cert",
"pki",
}
# Find insertion point for "create" command
insert_pos = 0
has_command = False
file_pos = -1
i = 0
while i < len(args_to_parse):
arg = args_to_parse[i]
if arg in ("-s", "--server"):
insert_pos = i + 2 # After -s value
i += 2
continue
if arg in ("-h", "--help"):
i += 1
insert_pos = i
continue
if arg.startswith("-"):
# Unknown option - might be for create subcommand
i += 1
continue
# Found positional argument
if arg in commands:
has_command = True
break
elif is_file_path(arg):
file_pos = i
break
i += 1
# Insert "create" if no command found and we have input (file path or piped stdin)
if not has_command and (file_pos >= 0 or not sys.stdin.isatty()):
args_to_parse.insert(insert_pos, "create")
parser = argparse.ArgumentParser(
prog="fpaste",
description="FlaskPaste command-line client",
epilog="Shortcut: fpaste <file> is equivalent to fpaste create <file>",
)
parser.add_argument(
"-s",
"--server",
help="server URL (env: FLASKPASTE_SERVER)",
)
subparsers = parser.add_subparsers(dest="command", metavar="command")
# create
p_create = subparsers.add_parser("create", aliases=["c", "new"], help="create paste")
p_create.add_argument("file", nargs="?", help="file to upload (- for stdin)")
p_create.add_argument("-E", "--no-encrypt", action="store_true", help="disable encryption")
p_create.add_argument("-b", "--burn", action="store_true", help="burn after read")
p_create.add_argument("-x", "--expiry", type=int, metavar="SEC", help="expiry in seconds")
p_create.add_argument("-p", "--password", metavar="PASS", help="password protect")
p_create.add_argument("-r", "--raw", action="store_true", help="output raw URL")
p_create.add_argument("-q", "--quiet", action="store_true", help="output ID only")
# get
p_get = subparsers.add_parser("get", aliases=["g"], help="retrieve paste")
p_get.add_argument("id", help="paste ID or URL")
p_get.add_argument("-o", "--output", help="save to file")
p_get.add_argument("-p", "--password", metavar="PASS", help="password for protected paste")
p_get.add_argument("-m", "--meta", action="store_true", help="show metadata only")
# delete
p_delete = subparsers.add_parser("delete", aliases=["d", "rm"], help="delete paste")
p_delete.add_argument("id", help="paste ID or URL")
# info
subparsers.add_parser("info", aliases=["i"], help="show server info")
# list
p_list = subparsers.add_parser("list", aliases=["ls"], help="list your pastes")
p_list.add_argument("-l", "--limit", type=int, metavar="N", help="max pastes (default: 50)")
p_list.add_argument("-o", "--offset", type=int, metavar="N", help="skip first N pastes")
p_list.add_argument("--json", action="store_true", help="output as JSON")
# search
p_search = subparsers.add_parser("search", aliases=["s", "find"], help="search your pastes")
p_search.add_argument("-t", "--type", metavar="PATTERN", help="filter by MIME type (image/*)")
p_search.add_argument("--after", metavar="DATE", help="created after (YYYY-MM-DD or timestamp)")
p_search.add_argument("--before", metavar="DATE", help="created before (YYYY-MM-DD)")
p_search.add_argument("-l", "--limit", type=int, metavar="N", help="max results (default: 50)")
p_search.add_argument("--json", action="store_true", help="output as JSON")
# update
p_update = subparsers.add_parser("update", aliases=["u"], help="update existing paste")
p_update.add_argument("id", help="paste ID or URL")
p_update.add_argument("file", nargs="?", help="new content (- for stdin)")
p_update.add_argument("-E", "--no-encrypt", action="store_true", help="disable encryption")
p_update.add_argument("-p", "--password", metavar="PASS", help="set/change password")
p_update.add_argument("--remove-password", action="store_true", help="remove password")
p_update.add_argument("-x", "--expiry", type=int, metavar="SEC", help="extend expiry (seconds)")
p_update.add_argument("-q", "--quiet", action="store_true", help="minimal output")
# export
p_export = subparsers.add_parser("export", help="export all pastes to directory")
p_export.add_argument("-o", "--output", metavar="DIR", help="output directory")
p_export.add_argument("-k", "--keyfile", metavar="FILE", help="key file (paste_id=key format)")
p_export.add_argument("--manifest", action="store_true", help="write manifest.json")
p_export.add_argument("-q", "--quiet", action="store_true", help="minimal output")
# cert
p_cert = subparsers.add_parser("cert", help="generate client certificate")
p_cert.add_argument("-o", "--output", metavar="DIR", help="output directory")
p_cert.add_argument(
"-a", "--algorithm", choices=["rsa", "ec"], default="ec", help="key algorithm (default: ec)"
)
p_cert.add_argument("-b", "--bits", type=int, metavar="N", help="RSA key size (default: 4096)")
p_cert.add_argument(
"-c", "--curve", metavar="CURVE", help="EC curve: secp256r1, secp384r1, secp521r1"
)
p_cert.add_argument("-d", "--days", type=int, metavar="N", help="validity period in days")
p_cert.add_argument("-n", "--name", metavar="CN", help="common name (default: $USER)")
p_cert.add_argument("--password-key", metavar="PASS", help="encrypt private key with password")
p_cert.add_argument(
"--configure", action="store_true", help="update config file with generated cert paths"
)
p_cert.add_argument("-f", "--force", action="store_true", help="overwrite existing files")
# pki (with subcommands)
p_pki = subparsers.add_parser("pki", help="PKI operations (server-issued certificates)")
pki_sub = p_pki.add_subparsers(dest="pki_command", metavar="subcommand")
# pki status
pki_sub.add_parser("status", help="show PKI status and CA info")
# pki issue
p_pki_issue = pki_sub.add_parser("issue", help="request certificate from server CA")
p_pki_issue.add_argument(
"-n", "--name", required=True, metavar="CN", help="common name for certificate (required)"
)
p_pki_issue.add_argument(
"-o", "--output", metavar="DIR", help="output directory (default: ~/.config/fpaste)"
)
p_pki_issue.add_argument(
"--configure", action="store_true", help="update config file with issued cert paths"
)
p_pki_issue.add_argument("-f", "--force", action="store_true", help="overwrite existing files")
# pki download
p_pki_download = pki_sub.add_parser("download", aliases=["dl"], help="download CA certificate")
p_pki_download.add_argument(
"-o", "--output", metavar="FILE", help="save to file (default: stdout)"
)
p_pki_download.add_argument(
"--configure",
action="store_true",
help="update config file with CA cert path (requires -o)",
)
args = parser.parse_args(args_to_parse)
config = get_config()
if args.server:
config["server"] = args.server
# Create SSL context for mTLS if configured
config["ssl_context"] = create_ssl_context(config)
if not args.command:
# Default: create from stdin if data is piped
if not sys.stdin.isatty():
args.command = "create"
args.file = None
args.no_encrypt = False # Encrypt by default
args.burn = False
args.expiry = None
args.password = None
args.raw = False
args.quiet = False
else:
parser.print_help()
sys.exit(0)
if args.command in ("create", "c", "new"):
cmd_create(args, config)
elif args.command in ("get", "g"):
cmd_get(args, config)
elif args.command in ("delete", "d", "rm"):
cmd_delete(args, config)
elif args.command in ("info", "i"):
cmd_info(args, config)
elif args.command in ("list", "ls"):
cmd_list(args, config)
elif args.command in ("search", "s", "find"):
cmd_search(args, config)
elif args.command in ("update", "u"):
cmd_update(args, config)
elif args.command == "export":
cmd_export(args, config)
elif args.command == "cert":
cmd_cert(args, config)
elif args.command == "pki":
if args.pki_command == "status":
cmd_pki_status(args, config)
elif args.pki_command == "issue":
cmd_pki_issue(args, config)
elif args.pki_command in ("download", "dl"):
cmd_pki_download(args, config)
else:
# Show pki help if no subcommand
parser.parse_args(["pki", "--help"])
if __name__ == "__main__":
main()