diff --git a/README.md b/README.md index 39ab0ec..4ccbcd9 100644 --- a/README.md +++ b/README.md @@ -143,6 +143,17 @@ echo "Hello" | ./fpaste # Show server info ./fpaste info + +# Register and get client certificate (if server supports it) +./fpaste register +# Saves certificate to ~/.config/fpaste/ + +# Register with auto-configuration +./fpaste register --configure +# Creates cert files and updates config with fingerprint + +# Register with custom name +./fpaste register -n my-laptop --configure ``` ### End-to-End Encryption diff --git a/app/api/routes.py b/app/api/routes.py index 94ea0d2..a6cf49c 100644 --- a/app/api/routes.py +++ b/app/api/routes.py @@ -912,8 +912,9 @@ class RegisterView(MethodView): # Generate random common name if not provided common_name = f"client-{secrets.token_hex(4)}" - # Auto-generate CA if needed - if get_ca_info() is None: + # Auto-generate CA if needed (skip PKI_ENABLED check for registration) + ca_info = get_ca_info(skip_enabled_check=True) + if ca_info is None: ca_days = current_app.config.get("PKI_CA_DAYS", 3650) try: ca_info = generate_ca("FlaskPaste CA", password, days=ca_days) @@ -935,8 +936,9 @@ class RegisterView(MethodView): current_app.logger.error("Certificate issuance failed: %s", e) return error_response("Certificate issuance failed", 500) - # Load certificates for PKCS#12 creation - ca_info = get_ca_info() + # Load CA cert for PKCS#12 (reuse ca_info from above, or refresh if it was just generated) + if ca_info is None or "certificate_pem" not in ca_info: + ca_info = get_ca_info(skip_enabled_check=True) ca_cert = x509.load_pem_x509_certificate(ca_info["certificate_pem"].encode()) client_cert = x509.load_pem_x509_certificate( cert_info["certificate_pem"].encode() diff --git a/app/pki.py b/app/pki.py index fe8dd70..20b0571 100644 --- a/app/pki.py +++ b/app/pki.py @@ -736,9 +736,12 @@ def is_certificate_valid(fingerprint: str) -> bool: # ───────────────────────────────────────────────────────────────────────────── -def get_ca_info() -> dict | None: +def get_ca_info(skip_enabled_check: bool = False) -> dict | None: """Get CA information for status endpoint. + Args: + skip_enabled_check: If True, skip the PKI_ENABLED check (for registration) + Returns: Dict with CA info or None if no CA exists """ @@ -746,7 +749,7 @@ def get_ca_info() -> dict | None: from app.database import get_db - if not current_app.config.get("PKI_ENABLED"): + if not skip_enabled_check and not current_app.config.get("PKI_ENABLED"): return None db = get_db() diff --git a/documentation/api.md b/documentation/api.md index 5472a1e..39cdca7 100644 --- a/documentation/api.md +++ b/documentation/api.md @@ -1119,7 +1119,8 @@ Content-Type: application/json HTTP/1.1 200 OK Content-Type: application/x-pkcs12 Content-Disposition: attachment; filename="client.p12" -X-Certificate-Fingerprint: b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3 +X-Fingerprint-SHA1: b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3 +X-Certificate-Expires: 1731533400 ``` @@ -1127,7 +1128,8 @@ X-Certificate-Fingerprint: b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3 **Response Headers:** | Header | Description | |--------|-------------| -| `X-Certificate-Fingerprint` | SHA1 fingerprint for `X-SSL-Client-SHA1` header | +| `X-Fingerprint-SHA1` | SHA1 fingerprint for `X-SSL-Client-SHA1` header | +| `X-Certificate-Expires` | Unix timestamp when certificate expires | **PKCS#12 Bundle Contents:** - Client certificate (signed by CA) diff --git a/fpaste b/fpaste index 026368d..7684818 100755 --- a/fpaste +++ b/fpaste @@ -181,9 +181,9 @@ def solve_pow(nonce, difficulty): return n -def get_challenge(config): +def get_challenge(config, endpoint="/challenge"): """Fetch PoW challenge from server.""" - url = config["server"].rstrip("/") + "/challenge" + url = config["server"].rstrip("/") + endpoint status, body, _ = request(url, ssl_context=config.get("ssl_context")) if status != 200: @@ -196,6 +196,11 @@ def get_challenge(config): return data +def get_register_challenge(config): + """Fetch registration PoW challenge from server.""" + return get_challenge(config, endpoint="/register/challenge") + + def cmd_create(args, config): """Create a new paste.""" # Read content from file or stdin @@ -1010,6 +1015,146 @@ def cmd_pki_download(args, config): sys.stdout.buffer.write(body) +def cmd_register(args, config): + """Register and obtain a client certificate from the server.""" + if not HAS_CRYPTO: + die("register requires 'cryptography' package: pip install cryptography") + + # Import pkcs12 for parsing the response + from cryptography.hazmat.primitives.serialization import pkcs12 + + url = config["server"].rstrip("/") + "/register" + + # Build headers + headers = {"Content-Type": "application/json"} + + # Prepare payload + payload = {} + if args.name: + payload["common_name"] = args.name + + # Get and solve PoW challenge if required + challenge = get_register_challenge(config) + if challenge: + if not args.quiet: + diff = challenge["difficulty"] + print(f"solving pow ({diff} bits)...", end="", file=sys.stderr) + solution = solve_pow(challenge["nonce"], challenge["difficulty"]) + if not args.quiet: + print(" done", file=sys.stderr) + headers["X-PoW-Token"] = challenge["token"] + headers["X-PoW-Solution"] = str(solution) + + # Make request + data = json.dumps(payload).encode() if payload else b"{}" + status, body, resp_headers = request( + url, method="POST", data=data, headers=headers, ssl_context=config.get("ssl_context") + ) + + if status == 400: + try: + err = json.loads(body).get("error", "bad request") + except (json.JSONDecodeError, UnicodeDecodeError): + err = "bad request" + die(err) + elif status == 500: + try: + err = json.loads(body).get("error", "server error") + except (json.JSONDecodeError, UnicodeDecodeError): + err = "server error" + die(err) + elif status != 200: + die(f"registration failed ({status})") + + # Get fingerprint from response header + fingerprint = resp_headers.get("X-Fingerprint-SHA1", "unknown") + + # Determine output directory + out_dir = Path(args.output) if args.output else Path.home() / ".config" / "fpaste" + out_dir.mkdir(parents=True, exist_ok=True) + + # File paths + p12_file = out_dir / "client.p12" + key_file = out_dir / "client.key" + cert_file = out_dir / "client.crt" + + # Check for existing files + if not args.force: + if p12_file.exists(): + die(f"p12 file exists: {p12_file} (use --force)") + if not args.p12_only: + if key_file.exists(): + die(f"key file exists: {key_file} (use --force)") + if cert_file.exists(): + die(f"cert file exists: {cert_file} (use --force)") + + # Save PKCS#12 bundle + p12_file.write_bytes(body) + p12_file.chmod(0o600) + print(f"pkcs12: {p12_file}", file=sys.stderr) + + # Extract certificate and key unless --p12-only + if not args.p12_only: + # Parse PKCS#12 bundle (no password) + private_key, certificate, _additional_certs = pkcs12.load_key_and_certificates(body, None) + + if private_key is None or certificate is None: + die("failed to parse PKCS#12 bundle") + + # Serialize private key + key_pem = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) + + # Serialize certificate + cert_pem = certificate.public_bytes(serialization.Encoding.PEM) + + # Write files + key_file.write_bytes(key_pem) + key_file.chmod(0o600) + cert_file.write_bytes(cert_pem) + + print(f"key: {key_file}", file=sys.stderr) + print(f"certificate: {cert_file}", file=sys.stderr) + + print(f"fingerprint: {fingerprint}", file=sys.stderr) + + # Get common name from certificate + if not args.p12_only: + cn = certificate.subject.get_attributes_for_oid(NameOID.COMMON_NAME) + if cn: + print(f"common name: {cn[0].value}", file=sys.stderr) + + # Update config file if requested + if args.configure and not args.p12_only: + config_file = Path.home() / ".config" / "fpaste" / "config" + config_file.parent.mkdir(parents=True, exist_ok=True) + + # Read existing config + existing = {} + if config_file.exists(): + for line in config_file.read_text().splitlines(): + line = line.strip() + if line and not line.startswith("#") and "=" in line: + k, v = line.split("=", 1) + existing[k.strip().lower()] = v.strip() + + # Update values + existing["client_cert"] = str(cert_file) + existing["client_key"] = str(key_file) + existing["cert_sha1"] = fingerprint + + # Write config + lines = [f"{k} = {v}" for k, v in sorted(existing.items())] + config_file.write_text("\n".join(lines) + "\n") + print(f"config: {config_file} (updated)", file=sys.stderr) + + # Output fingerprint to stdout for easy capture + print(fingerprint) + + def cmd_cert(args, config): """Generate a self-signed client certificate for mTLS authentication.""" if not HAS_CRYPTO: @@ -1234,6 +1379,7 @@ def main(): "update", "u", "export", + "register", "cert", "pki", } @@ -1338,6 +1484,21 @@ def main(): p_export.add_argument("--manifest", action="store_true", help="write manifest.json") p_export.add_argument("-q", "--quiet", action="store_true", help="minimal output") + # register + p_register = subparsers.add_parser("register", help="register and get client certificate") + p_register.add_argument("-n", "--name", metavar="CN", help="common name (optional)") + p_register.add_argument( + "-o", "--output", metavar="DIR", help="output directory (default: ~/.config/fpaste)" + ) + p_register.add_argument( + "--configure", action="store_true", help="update config file with cert paths" + ) + p_register.add_argument( + "--p12-only", action="store_true", help="save only PKCS#12, don't extract cert/key" + ) + p_register.add_argument("-f", "--force", action="store_true", help="overwrite existing files") + p_register.add_argument("-q", "--quiet", action="store_true", help="minimal output") + # cert p_cert = subparsers.add_parser("cert", help="generate client certificate") p_cert.add_argument("-o", "--output", metavar="DIR", help="output directory") @@ -1427,6 +1588,8 @@ def main(): cmd_update(args, config) elif args.command == "export": cmd_export(args, config) + elif args.command == "register": + cmd_register(args, config) elif args.command == "cert": cmd_cert(args, config) elif args.command == "pki":