fpaste: add register command for public certificate enrollment
Some checks failed
CI / Lint & Format (push) Failing after 15s
CI / Tests (push) Has been skipped
CI / Security Scan (push) Failing after 19s

- Add register command to obtain client cert from server
- Solve PoW challenge, receive PKCS#12 bundle
- Extract cert/key, optionally update config (--configure)
- Fix registration to work without PKI_ENABLED (only needs PKI_CA_PASSWORD)
- Add skip_enabled_check param to get_ca_info() for registration path
- Update docs: README examples, API header name fix (X-Fingerprint-SHA1)
This commit is contained in:
Username
2025-12-21 10:59:09 +01:00
parent 5849c7406f
commit 880bf631e3
5 changed files with 191 additions and 10 deletions

View File

@@ -143,6 +143,17 @@ echo "Hello" | ./fpaste
# Show server info
./fpaste info
# Register and get client certificate (if server supports it)
./fpaste register
# Saves certificate to ~/.config/fpaste/
# Register with auto-configuration
./fpaste register --configure
# Creates cert files and updates config with fingerprint
# Register with custom name
./fpaste register -n my-laptop --configure
```
### End-to-End Encryption

View File

@@ -912,8 +912,9 @@ class RegisterView(MethodView):
# Generate random common name if not provided
common_name = f"client-{secrets.token_hex(4)}"
# Auto-generate CA if needed
if get_ca_info() is None:
# Auto-generate CA if needed (skip PKI_ENABLED check for registration)
ca_info = get_ca_info(skip_enabled_check=True)
if ca_info is None:
ca_days = current_app.config.get("PKI_CA_DAYS", 3650)
try:
ca_info = generate_ca("FlaskPaste CA", password, days=ca_days)
@@ -935,8 +936,9 @@ class RegisterView(MethodView):
current_app.logger.error("Certificate issuance failed: %s", e)
return error_response("Certificate issuance failed", 500)
# Load certificates for PKCS#12 creation
ca_info = get_ca_info()
# Load CA cert for PKCS#12 (reuse ca_info from above, or refresh if it was just generated)
if ca_info is None or "certificate_pem" not in ca_info:
ca_info = get_ca_info(skip_enabled_check=True)
ca_cert = x509.load_pem_x509_certificate(ca_info["certificate_pem"].encode())
client_cert = x509.load_pem_x509_certificate(
cert_info["certificate_pem"].encode()

View File

@@ -736,9 +736,12 @@ def is_certificate_valid(fingerprint: str) -> bool:
# ─────────────────────────────────────────────────────────────────────────────
def get_ca_info() -> dict | None:
def get_ca_info(skip_enabled_check: bool = False) -> dict | None:
"""Get CA information for status endpoint.
Args:
skip_enabled_check: If True, skip the PKI_ENABLED check (for registration)
Returns:
Dict with CA info or None if no CA exists
"""
@@ -746,7 +749,7 @@ def get_ca_info() -> dict | None:
from app.database import get_db
if not current_app.config.get("PKI_ENABLED"):
if not skip_enabled_check and not current_app.config.get("PKI_ENABLED"):
return None
db = get_db()

View File

@@ -1119,7 +1119,8 @@ Content-Type: application/json
|--------|-------------|
| `X-Fingerprint-SHA1` | SHA1 fingerprint for `X-SSL-Client-SHA1` header |
| `X-Certificate-Expires` | Unix timestamp when certificate expires |
- Client certificate (signed by CA)
**PKCS#12 Bundle Contents:**
- Client certificate (signed by CA)
- Client private key (EC secp384r1)
- CA certificate (for trust chain)
@@ -1127,7 +1128,8 @@ X-Certificate-Fingerprint: b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3
**Errors:**
| Code | Description |
|------|-------------|
| 400 | Proof-of-work failed (invalid/expired challenge) |
| 400 | Proof-of-work required (when enabled) |
| 400 | Proof-of-work failed (invalid/expired challenge) |
| 500 | PKI_CA_PASSWORD not configured |
| 500 | Certificate generation failed |

167
fpaste
View File

@@ -181,9 +181,9 @@ def solve_pow(nonce, difficulty):
return n
def get_challenge(config):
def get_challenge(config, endpoint="/challenge"):
"""Fetch PoW challenge from server."""
url = config["server"].rstrip("/") + "/challenge"
url = config["server"].rstrip("/") + endpoint
status, body, _ = request(url, ssl_context=config.get("ssl_context"))
if status != 200:
@@ -196,6 +196,11 @@ def get_challenge(config):
return data
def get_register_challenge(config):
"""Fetch registration PoW challenge from server."""
return get_challenge(config, endpoint="/register/challenge")
def cmd_create(args, config):
"""Create a new paste."""
# Read content from file or stdin
@@ -1010,6 +1015,146 @@ def cmd_pki_download(args, config):
sys.stdout.buffer.write(body)
def cmd_register(args, config):
"""Register and obtain a client certificate from the server."""
if not HAS_CRYPTO:
die("register requires 'cryptography' package: pip install cryptography")
# Import pkcs12 for parsing the response
from cryptography.hazmat.primitives.serialization import pkcs12
url = config["server"].rstrip("/") + "/register"
# Build headers
headers = {"Content-Type": "application/json"}
# Prepare payload
payload = {}
if args.name:
payload["common_name"] = args.name
# Get and solve PoW challenge if required
challenge = get_register_challenge(config)
if challenge:
if not args.quiet:
diff = challenge["difficulty"]
print(f"solving pow ({diff} bits)...", end="", file=sys.stderr)
solution = solve_pow(challenge["nonce"], challenge["difficulty"])
if not args.quiet:
print(" done", file=sys.stderr)
headers["X-PoW-Token"] = challenge["token"]
headers["X-PoW-Solution"] = str(solution)
# Make request
data = json.dumps(payload).encode() if payload else b"{}"
status, body, resp_headers = request(
url, method="POST", data=data, headers=headers, ssl_context=config.get("ssl_context")
)
if status == 400:
try:
err = json.loads(body).get("error", "bad request")
except (json.JSONDecodeError, UnicodeDecodeError):
err = "bad request"
die(err)
elif status == 500:
try:
err = json.loads(body).get("error", "server error")
except (json.JSONDecodeError, UnicodeDecodeError):
err = "server error"
die(err)
elif status != 200:
die(f"registration failed ({status})")
# Get fingerprint from response header
fingerprint = resp_headers.get("X-Fingerprint-SHA1", "unknown")
# Determine output directory
out_dir = Path(args.output) if args.output else Path.home() / ".config" / "fpaste"
out_dir.mkdir(parents=True, exist_ok=True)
# File paths
p12_file = out_dir / "client.p12"
key_file = out_dir / "client.key"
cert_file = out_dir / "client.crt"
# Check for existing files
if not args.force:
if p12_file.exists():
die(f"p12 file exists: {p12_file} (use --force)")
if not args.p12_only:
if key_file.exists():
die(f"key file exists: {key_file} (use --force)")
if cert_file.exists():
die(f"cert file exists: {cert_file} (use --force)")
# Save PKCS#12 bundle
p12_file.write_bytes(body)
p12_file.chmod(0o600)
print(f"pkcs12: {p12_file}", file=sys.stderr)
# Extract certificate and key unless --p12-only
if not args.p12_only:
# Parse PKCS#12 bundle (no password)
private_key, certificate, _additional_certs = pkcs12.load_key_and_certificates(body, None)
if private_key is None or certificate is None:
die("failed to parse PKCS#12 bundle")
# Serialize private key
key_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
# Serialize certificate
cert_pem = certificate.public_bytes(serialization.Encoding.PEM)
# Write files
key_file.write_bytes(key_pem)
key_file.chmod(0o600)
cert_file.write_bytes(cert_pem)
print(f"key: {key_file}", file=sys.stderr)
print(f"certificate: {cert_file}", file=sys.stderr)
print(f"fingerprint: {fingerprint}", file=sys.stderr)
# Get common name from certificate
if not args.p12_only:
cn = certificate.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
if cn:
print(f"common name: {cn[0].value}", file=sys.stderr)
# Update config file if requested
if args.configure and not args.p12_only:
config_file = Path.home() / ".config" / "fpaste" / "config"
config_file.parent.mkdir(parents=True, exist_ok=True)
# Read existing config
existing = {}
if config_file.exists():
for line in config_file.read_text().splitlines():
line = line.strip()
if line and not line.startswith("#") and "=" in line:
k, v = line.split("=", 1)
existing[k.strip().lower()] = v.strip()
# Update values
existing["client_cert"] = str(cert_file)
existing["client_key"] = str(key_file)
existing["cert_sha1"] = fingerprint
# Write config
lines = [f"{k} = {v}" for k, v in sorted(existing.items())]
config_file.write_text("\n".join(lines) + "\n")
print(f"config: {config_file} (updated)", file=sys.stderr)
# Output fingerprint to stdout for easy capture
print(fingerprint)
def cmd_cert(args, config):
"""Generate a self-signed client certificate for mTLS authentication."""
if not HAS_CRYPTO:
@@ -1234,6 +1379,7 @@ def main():
"update",
"u",
"export",
"register",
"cert",
"pki",
}
@@ -1338,6 +1484,21 @@ def main():
p_export.add_argument("--manifest", action="store_true", help="write manifest.json")
p_export.add_argument("-q", "--quiet", action="store_true", help="minimal output")
# register
p_register = subparsers.add_parser("register", help="register and get client certificate")
p_register.add_argument("-n", "--name", metavar="CN", help="common name (optional)")
p_register.add_argument(
"-o", "--output", metavar="DIR", help="output directory (default: ~/.config/fpaste)"
)
p_register.add_argument(
"--configure", action="store_true", help="update config file with cert paths"
)
p_register.add_argument(
"--p12-only", action="store_true", help="save only PKCS#12, don't extract cert/key"
)
p_register.add_argument("-f", "--force", action="store_true", help="overwrite existing files")
p_register.add_argument("-q", "--quiet", action="store_true", help="minimal output")
# cert
p_cert = subparsers.add_parser("cert", help="generate client certificate")
p_cert.add_argument("-o", "--output", metavar="DIR", help="output directory")
@@ -1427,6 +1588,8 @@ def main():
cmd_update(args, config)
elif args.command == "export":
cmd_export(args, config)
elif args.command == "register":
cmd_register(args, config)
elif args.command == "cert":
cmd_cert(args, config)
elif args.command == "pki":