forked from username/flaskpaste
add command-line client
This commit is contained in:
229
fpaste
Executable file
229
fpaste
Executable file
@@ -0,0 +1,229 @@
|
||||
#!/usr/bin/env python3
|
||||
"""FlaskPaste command-line client."""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def get_config():
|
||||
"""Load configuration from environment or config file."""
|
||||
config = {
|
||||
"server": os.environ.get("FLASKPASTE_SERVER", "http://localhost:5000"),
|
||||
"cert_sha1": os.environ.get("FLASKPASTE_CERT_SHA1", ""),
|
||||
}
|
||||
|
||||
# Try config file
|
||||
config_file = Path.home() / ".config" / "fpaste" / "config"
|
||||
if config_file.exists():
|
||||
for line in config_file.read_text().splitlines():
|
||||
line = line.strip()
|
||||
if line and not line.startswith("#") and "=" in line:
|
||||
key, value = line.split("=", 1)
|
||||
key = key.strip().lower()
|
||||
value = value.strip().strip('"').strip("'")
|
||||
if key == "server":
|
||||
config["server"] = value
|
||||
elif key == "cert_sha1":
|
||||
config["cert_sha1"] = value
|
||||
|
||||
return config
|
||||
|
||||
|
||||
def request(url, method="GET", data=None, headers=None):
|
||||
"""Make HTTP request and return response."""
|
||||
headers = headers or {}
|
||||
req = urllib.request.Request(url, data=data, headers=headers, method=method)
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return resp.status, resp.read(), dict(resp.headers)
|
||||
except urllib.error.HTTPError as e:
|
||||
return e.code, e.read(), dict(e.headers)
|
||||
except urllib.error.URLError as e:
|
||||
die(f"Connection failed: {e.reason}")
|
||||
|
||||
|
||||
def die(msg, code=1):
|
||||
"""Print error and exit."""
|
||||
print(f"error: {msg}", file=sys.stderr)
|
||||
sys.exit(code)
|
||||
|
||||
|
||||
def cmd_create(args, config):
|
||||
"""Create a new paste."""
|
||||
# Read content from file or stdin
|
||||
if args.file:
|
||||
if args.file == "-":
|
||||
content = sys.stdin.buffer.read()
|
||||
else:
|
||||
path = Path(args.file)
|
||||
if not path.exists():
|
||||
die(f"file not found: {args.file}")
|
||||
content = path.read_bytes()
|
||||
else:
|
||||
# No file specified, read from stdin
|
||||
if sys.stdin.isatty():
|
||||
die("no input provided (pipe data or specify file)")
|
||||
content = sys.stdin.buffer.read()
|
||||
|
||||
if not content:
|
||||
die("empty content")
|
||||
|
||||
headers = {}
|
||||
if config["cert_sha1"]:
|
||||
headers["X-SSL-Client-SHA1"] = config["cert_sha1"]
|
||||
|
||||
url = config["server"].rstrip("/") + "/"
|
||||
status, body, _ = request(url, method="POST", data=content, headers=headers)
|
||||
|
||||
if status == 201:
|
||||
data = json.loads(body)
|
||||
if args.raw:
|
||||
print(config["server"].rstrip("/") + data["raw"])
|
||||
elif args.quiet:
|
||||
print(data["id"])
|
||||
else:
|
||||
print(config["server"].rstrip("/") + data["url"])
|
||||
else:
|
||||
try:
|
||||
err = json.loads(body).get("error", body.decode())
|
||||
except (json.JSONDecodeError, UnicodeDecodeError):
|
||||
err = body.decode(errors="replace")
|
||||
die(f"create failed ({status}): {err}")
|
||||
|
||||
|
||||
def cmd_get(args, config):
|
||||
"""Retrieve a paste."""
|
||||
paste_id = args.id.split("/")[-1] # Handle full URLs
|
||||
base = config["server"].rstrip("/")
|
||||
|
||||
if args.meta:
|
||||
url = f"{base}/{paste_id}"
|
||||
status, body, _ = request(url)
|
||||
if status == 200:
|
||||
data = json.loads(body)
|
||||
print(f"id: {data['id']}")
|
||||
print(f"mime_type: {data['mime_type']}")
|
||||
print(f"size: {data['size']}")
|
||||
print(f"created_at: {data['created_at']}")
|
||||
else:
|
||||
die(f"not found: {paste_id}")
|
||||
else:
|
||||
url = f"{base}/{paste_id}/raw"
|
||||
status, body, headers = request(url)
|
||||
if status == 200:
|
||||
if args.output:
|
||||
Path(args.output).write_bytes(body)
|
||||
print(f"saved: {args.output}", file=sys.stderr)
|
||||
else:
|
||||
# Write binary to stdout
|
||||
sys.stdout.buffer.write(body)
|
||||
# Add newline if content doesn't end with one and stdout is tty
|
||||
if sys.stdout.isatty() and body and not body.endswith(b"\n"):
|
||||
sys.stdout.buffer.write(b"\n")
|
||||
else:
|
||||
die(f"not found: {paste_id}")
|
||||
|
||||
|
||||
def cmd_delete(args, config):
|
||||
"""Delete a paste."""
|
||||
if not config["cert_sha1"]:
|
||||
die("authentication required (set FLASKPASTE_CERT_SHA1)")
|
||||
|
||||
paste_id = args.id.split("/")[-1]
|
||||
base = config["server"].rstrip("/")
|
||||
url = f"{base}/{paste_id}"
|
||||
|
||||
headers = {"X-SSL-Client-SHA1": config["cert_sha1"]}
|
||||
status, body, _ = request(url, method="DELETE", headers=headers)
|
||||
|
||||
if status == 200:
|
||||
print(f"deleted: {paste_id}")
|
||||
elif status == 404:
|
||||
die(f"not found: {paste_id}")
|
||||
elif status == 403:
|
||||
die("permission denied (not owner)")
|
||||
elif status == 401:
|
||||
die("authentication failed")
|
||||
else:
|
||||
die(f"delete failed ({status})")
|
||||
|
||||
|
||||
def cmd_info(args, config):
|
||||
"""Show server info."""
|
||||
url = config["server"].rstrip("/") + "/"
|
||||
status, body, _ = request(url)
|
||||
|
||||
if status == 200:
|
||||
data = json.loads(body)
|
||||
print(f"server: {config['server']}")
|
||||
print(f"name: {data.get('name', 'unknown')}")
|
||||
print(f"version: {data.get('version', 'unknown')}")
|
||||
else:
|
||||
die("failed to connect to server")
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
prog="fpaste",
|
||||
description="FlaskPaste command-line client",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-s", "--server",
|
||||
help="server URL (default: $FLASKPASTE_SERVER or http://localhost:5000)",
|
||||
)
|
||||
subparsers = parser.add_subparsers(dest="command", metavar="command")
|
||||
|
||||
# create
|
||||
p_create = subparsers.add_parser("create", aliases=["c", "new"], help="create paste")
|
||||
p_create.add_argument("file", nargs="?", help="file to upload (- for stdin)")
|
||||
p_create.add_argument("-r", "--raw", action="store_true", help="output raw URL")
|
||||
p_create.add_argument("-q", "--quiet", action="store_true", help="output ID only")
|
||||
|
||||
# get
|
||||
p_get = subparsers.add_parser("get", aliases=["g"], help="retrieve paste")
|
||||
p_get.add_argument("id", help="paste ID or URL")
|
||||
p_get.add_argument("-o", "--output", help="save to file")
|
||||
p_get.add_argument("-m", "--meta", action="store_true", help="show metadata only")
|
||||
|
||||
# delete
|
||||
p_delete = subparsers.add_parser("delete", aliases=["d", "rm"], help="delete paste")
|
||||
p_delete.add_argument("id", help="paste ID or URL")
|
||||
|
||||
# info
|
||||
subparsers.add_parser("info", aliases=["i"], help="show server info")
|
||||
|
||||
args = parser.parse_args()
|
||||
config = get_config()
|
||||
|
||||
if args.server:
|
||||
config["server"] = args.server
|
||||
|
||||
if not args.command:
|
||||
# Default: create from stdin if data is piped
|
||||
if not sys.stdin.isatty():
|
||||
args.command = "create"
|
||||
args.file = None
|
||||
args.raw = False
|
||||
args.quiet = False
|
||||
else:
|
||||
parser.print_help()
|
||||
sys.exit(0)
|
||||
|
||||
if args.command in ("create", "c", "new"):
|
||||
cmd_create(args, config)
|
||||
elif args.command in ("get", "g"):
|
||||
cmd_get(args, config)
|
||||
elif args.command in ("delete", "d", "rm"):
|
||||
cmd_delete(args, config)
|
||||
elif args.command in ("info", "i"):
|
||||
cmd_info(args, config)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user