diff --git a/fpaste b/fpaste index ebd026f..1e3af80 100755 --- a/fpaste +++ b/fpaste @@ -38,7 +38,7 @@ except ImportError: # Constants CONFIG_DIR = Path.home() / ".config" / "fpaste" CONFIG_FILE = CONFIG_DIR / "config" -CONFIG_KEYS = frozenset({"server", "cert_sha1", "client_cert", "client_key", "ca_cert"}) +CONFIG_KEYS = frozenset({"server", "endpoint", "cert_sha1", "client_cert", "client_key", "ca_cert"}) MIME_EXTENSIONS: dict[str, str] = { "text/plain": ".txt", @@ -215,7 +215,8 @@ def write_config_file( def get_config() -> dict[str, Any]: """Load configuration from environment and config file.""" config: dict[str, Any] = { - "server": os.environ.get("FLASKPASTE_SERVER", "http://localhost:5000"), + "server": os.environ.get("FLASKPASTE_SERVER", "https://paste.mymx.me"), + "endpoint": os.environ.get("FLASKPASTE_ENDPOINT", ""), "cert_sha1": os.environ.get("FLASKPASTE_CERT_SHA1", ""), "client_cert": os.environ.get("FLASKPASTE_CLIENT_CERT", ""), "client_key": os.environ.get("FLASKPASTE_CLIENT_KEY", ""), @@ -256,6 +257,24 @@ def create_ssl_context(config: Mapping[str, Any]) -> ssl.SSLContext | None: return ctx +def build_url(config: Mapping[str, Any], path: str = "") -> str: + """Build full URL from server, endpoint, and path.""" + server = config["server"].rstrip("/") + endpoint = config.get("endpoint", "").strip("/") + # Preserve trailing slash for root path "/" + trailing_slash = path == "/" or (path.endswith("/") and path != "/") + path = path.strip("/") + + if endpoint: + base = f"{server}/{endpoint}" + if path: + return f"{base}/{path}/" if trailing_slash else f"{base}/{path}" + return f"{base}/" if trailing_slash else base + if path: + return f"{server}/{path}/" if trailing_slash else f"{server}/{path}" + return f"{server}/" if trailing_slash else server + + # ----------------------------------------------------------------------------- # Encryption # ----------------------------------------------------------------------------- @@ -334,10 +353,10 @@ def solve_pow(nonce: str, difficulty: int) -> int: def get_challenge( config: Mapping[str, Any], - endpoint: str = "/challenge", + path: str = "/challenge", ) -> dict[str, Any] | None: """Fetch PoW challenge from server.""" - url = config["server"].rstrip("/") + endpoint + url = build_url(config, path) status, body, _ = request(url, ssl_context=config.get("ssl_context")) if status != 200: @@ -670,7 +689,7 @@ def cmd_create(args: argparse.Namespace, config: dict[str, Any]) -> None: if args.password: base_headers["X-Paste-Password"] = args.password - url = config["server"].rstrip("/") + "/" + url = build_url(config, "/") max_retries = 5 last_error = "" @@ -706,14 +725,13 @@ def cmd_create(args: argparse.Namespace, config: dict[str, Any]) -> None: if status == 201: data = json.loads(body) key_fragment = f"#{encode_key(encryption_key)}" if encryption_key else "" - base_url = config["server"].rstrip("/") if args.raw: - result_url = base_url + data["raw"] + key_fragment + result_url = build_url(config, data["raw"]) + key_fragment elif args.quiet: result_url = data["id"] + key_fragment else: - result_url = base_url + data["url"] + key_fragment + result_url = build_url(config, data["url"]) + key_fragment print(result_url) @@ -740,14 +758,13 @@ def cmd_create(args: argparse.Namespace, config: dict[str, Any]) -> None: def cmd_get(args: argparse.Namespace, config: dict[str, Any]) -> None: """Retrieve a paste.""" paste_id, encryption_key = extract_paste_id(args.id) - base = config["server"].rstrip("/") headers: dict[str, str] = {} if args.password: headers["X-Paste-Password"] = args.password if args.meta: - url = f"{base}/{paste_id}" + url = build_url(config, paste_id) status, body, _ = request(url, headers=headers, ssl_context=config.get("ssl_context")) if status == 200: @@ -767,7 +784,7 @@ def cmd_get(args: argparse.Namespace, config: dict[str, Any]) -> None: else: die(f"not found: {paste_id}") else: - url = f"{base}/{paste_id}/raw" + url = build_url(config, f"{paste_id}/raw") status, body, _ = request(url, headers=headers, ssl_context=config.get("ssl_context")) if status == 200: @@ -809,7 +826,7 @@ def cmd_delete(args: argparse.Namespace, config: dict[str, Any]) -> None: if delete_all: # Fetch all pastes to get count and IDs - url = f"{config['server'].rstrip('/')}/pastes?all=1&limit=1000" + url = build_url(config, "/pastes") + "?all=1&limit=1000" status, body, _ = request( url, headers=auth_headers(config), ssl_context=config.get("ssl_context") ) @@ -838,7 +855,7 @@ def cmd_delete(args: argparse.Namespace, config: dict[str, Any]) -> None: deleted = 0 failed = 0 for paste_id in paste_ids: - url = f"{config['server'].rstrip('/')}/{paste_id}" + url = build_url(config, paste_id) status, _, _ = request( url, method="DELETE", @@ -865,7 +882,7 @@ def cmd_delete(args: argparse.Namespace, config: dict[str, Any]) -> None: def cmd_info(args: argparse.Namespace, config: dict[str, Any]) -> None: """Show server info.""" - url = config["server"].rstrip("/") + "/" + url = build_url(config, "/") status, body, _ = request(url, ssl_context=config.get("ssl_context")) if status != 200: @@ -899,7 +916,7 @@ def cmd_list(args: argparse.Namespace, config: dict[str, Any]) -> None: if args.offset: params.append(f"offset={args.offset}") - url = f"{config['server'].rstrip('/')}/pastes" + url = build_url(config, "/pastes") if params: url += "?" + "&".join(params) @@ -932,7 +949,7 @@ def cmd_search(args: argparse.Namespace, config: dict[str, Any]) -> None: if args.limit: params.append(f"limit={args.limit}") - url = f"{config['server'].rstrip('/')}/pastes" + url = build_url(config, "/pastes") if params: url += "?" + "&".join(params) @@ -956,7 +973,7 @@ def cmd_update(args: argparse.Namespace, config: dict[str, Any]) -> None: require_auth(config) paste_id, _ = extract_paste_id(args.id) - url = f"{config['server'].rstrip('/')}/{paste_id}" + url = build_url(config, paste_id) headers = auth_headers(config) content: bytes | None = None @@ -997,8 +1014,7 @@ def cmd_update(args: argparse.Namespace, config: dict[str, Any]) -> None: print(" password: protected") if content and encryption_key: - base = config["server"].rstrip("/") - print(f" key: {base}/{paste_id}#{encode_key(encryption_key)}") + print(f" key: {build_url(config, paste_id)}#{encode_key(encryption_key)}") elif status == 400: die(parse_error(body, "bad request")) elif status == 401: @@ -1031,7 +1047,7 @@ def cmd_export(args: argparse.Namespace, config: dict[str, Any]) -> None: keys[paste_id.strip()] = key_encoded.strip() # Fetch paste list - url = f"{config['server'].rstrip('/')}/pastes?limit=1000" + url = build_url(config, "/pastes") + "?limit=1000" status, body, _ = request( url, headers=auth_headers(config), ssl_context=config.get("ssl_context") ) @@ -1068,7 +1084,7 @@ def cmd_export(args: argparse.Namespace, config: dict[str, Any]) -> None: skipped += 1 continue - raw_url = f"{config['server'].rstrip('/')}/{paste_id}/raw" + raw_url = build_url(config, f"{paste_id}/raw") status, content, _ = request( raw_url, headers=auth_headers(config), ssl_context=config.get("ssl_context") ) @@ -1122,7 +1138,7 @@ def cmd_export(args: argparse.Namespace, config: dict[str, Any]) -> None: def cmd_pki_status(args: argparse.Namespace, config: dict[str, Any]) -> None: """Show PKI status and CA information.""" - url = config["server"].rstrip("/") + "/pki" + url = build_url(config, "/pki") status, body, _ = request(url, ssl_context=config.get("ssl_context")) if status == 404: @@ -1141,14 +1157,14 @@ def cmd_pki_status(args: argparse.Namespace, config: dict[str, Any]) -> None: print(f"created: {data.get('created_at')}") if data.get("expires_at"): print(f"expires: {data.get('expires_at')}") - print(f"download: {config['server'].rstrip('/')}{data.get('download', '/pki/ca.crt')}") + print(f"download: {build_url(config, data.get('download', '/pki/ca.crt'))}") elif hint := data.get("hint"): print(f"hint: {hint}") def cmd_pki_issue(args: argparse.Namespace, config: dict[str, Any]) -> None: """Request a new client certificate from the server CA.""" - url = config["server"].rstrip("/") + "/pki/issue" + url = build_url(config, "/pki/issue") headers = {"Content-Type": "application/json", **auth_headers(config)} payload = json.dumps({"common_name": args.name}).encode() @@ -1205,7 +1221,7 @@ def cmd_pki_issue(args: argparse.Namespace, config: dict[str, Any]) -> None: def cmd_pki_download(args: argparse.Namespace, config: dict[str, Any]) -> None: """Download the CA certificate from the server.""" - url = config["server"].rstrip("/") + "/pki/ca.crt" + url = build_url(config, "/pki/ca.crt") status, body, _ = request(url, ssl_context=config.get("ssl_context")) if status == 404: @@ -1239,14 +1255,14 @@ def cmd_register(args: argparse.Namespace, config: dict[str, Any]) -> None: from cryptography.hazmat.primitives.serialization import pkcs12 - url = config["server"].rstrip("/") + "/register" + url = build_url(config, "/register") headers = {"Content-Type": "application/json"} payload: dict[str, str] = {} if args.name: payload["common_name"] = args.name - if challenge := get_challenge(config, endpoint="/register/challenge"): + if challenge := get_challenge(config, path="/register/challenge"): if not args.quiet: print(f"solving pow ({challenge['difficulty']} bits)...", end="", file=sys.stderr) solution = solve_pow(challenge["nonce"], challenge["difficulty"])