feat: add CertFP authentication with SASL EXTERNAL
Per-network, per-nick client certificates (EC P-256, self-signed, 10-year validity) stored as combined PEM files. Authentication cascade: SASL EXTERNAL > SASL PLAIN > NickServ IDENTIFY. New commands: GENCERT, CERTFP, DELCERT. GENCERT auto-registers the fingerprint with NickServ CERT ADD when the network is connected. Includes email verification module for NickServ registration and expanded NickServ interaction (IDENTIFY, REGISTER, VERIFY).
This commit is contained in:
135
tests/test_cert.py
Normal file
135
tests/test_cert.py
Normal file
@@ -0,0 +1,135 @@
|
||||
"""Tests for client certificate management."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from bouncer.cert import (
|
||||
cert_path,
|
||||
delete_cert,
|
||||
fingerprint,
|
||||
generate_cert,
|
||||
has_cert,
|
||||
list_certs,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def data_dir(tmp_path: Path) -> Path:
|
||||
"""Provide a temporary data directory."""
|
||||
return tmp_path
|
||||
|
||||
|
||||
class TestCertPath:
|
||||
def test_standard_path(self, data_dir: Path) -> None:
|
||||
p = cert_path(data_dir, "libera", "fabesune")
|
||||
assert p == data_dir / "certs" / "libera" / "fabesune.pem"
|
||||
|
||||
|
||||
class TestGenerateCert:
|
||||
def test_creates_pem_file(self, data_dir: Path) -> None:
|
||||
pem = generate_cert(data_dir, "libera", "testnick")
|
||||
assert pem.is_file()
|
||||
assert pem == cert_path(data_dir, "libera", "testnick")
|
||||
|
||||
def test_pem_contains_cert_and_key(self, data_dir: Path) -> None:
|
||||
pem = generate_cert(data_dir, "libera", "testnick")
|
||||
content = pem.read_text()
|
||||
assert "BEGIN CERTIFICATE" in content
|
||||
assert "BEGIN PRIVATE KEY" in content
|
||||
|
||||
def test_file_permissions(self, data_dir: Path) -> None:
|
||||
pem = generate_cert(data_dir, "libera", "testnick")
|
||||
mode = pem.stat().st_mode & 0o777
|
||||
assert mode == 0o600
|
||||
|
||||
def test_overwrites_existing(self, data_dir: Path) -> None:
|
||||
pem1 = generate_cert(data_dir, "libera", "testnick")
|
||||
fp1 = fingerprint(pem1)
|
||||
pem2 = generate_cert(data_dir, "libera", "testnick")
|
||||
fp2 = fingerprint(pem2)
|
||||
assert pem1 == pem2
|
||||
assert fp1 != fp2 # New cert = new fingerprint
|
||||
|
||||
|
||||
class TestFingerprint:
|
||||
def test_format(self, data_dir: Path) -> None:
|
||||
pem = generate_cert(data_dir, "libera", "testnick")
|
||||
fp = fingerprint(pem)
|
||||
parts = fp.split(":")
|
||||
assert len(parts) == 32 # SHA-256 = 32 bytes
|
||||
for part in parts:
|
||||
assert len(part) == 2
|
||||
int(part, 16) # Must be valid hex
|
||||
|
||||
def test_uppercase_hex(self, data_dir: Path) -> None:
|
||||
pem = generate_cert(data_dir, "libera", "testnick")
|
||||
fp = fingerprint(pem)
|
||||
assert fp == fp.upper()
|
||||
|
||||
def test_deterministic_for_same_cert(self, data_dir: Path) -> None:
|
||||
pem = generate_cert(data_dir, "libera", "testnick")
|
||||
assert fingerprint(pem) == fingerprint(pem)
|
||||
|
||||
|
||||
class TestHasCert:
|
||||
def test_exists(self, data_dir: Path) -> None:
|
||||
generate_cert(data_dir, "libera", "testnick")
|
||||
assert has_cert(data_dir, "libera", "testnick") is True
|
||||
|
||||
def test_not_exists(self, data_dir: Path) -> None:
|
||||
assert has_cert(data_dir, "libera", "testnick") is False
|
||||
|
||||
|
||||
class TestDeleteCert:
|
||||
def test_delete_existing(self, data_dir: Path) -> None:
|
||||
generate_cert(data_dir, "libera", "testnick")
|
||||
assert delete_cert(data_dir, "libera", "testnick") is True
|
||||
assert has_cert(data_dir, "libera", "testnick") is False
|
||||
|
||||
def test_delete_nonexistent(self, data_dir: Path) -> None:
|
||||
assert delete_cert(data_dir, "libera", "testnick") is False
|
||||
|
||||
def test_cleans_empty_dir(self, data_dir: Path) -> None:
|
||||
generate_cert(data_dir, "libera", "testnick")
|
||||
delete_cert(data_dir, "libera", "testnick")
|
||||
assert not (data_dir / "certs" / "libera").exists()
|
||||
|
||||
|
||||
class TestListCerts:
|
||||
def test_empty(self, data_dir: Path) -> None:
|
||||
assert list_certs(data_dir) == []
|
||||
|
||||
def test_list_all(self, data_dir: Path) -> None:
|
||||
generate_cert(data_dir, "libera", "nick1")
|
||||
generate_cert(data_dir, "oftc", "nick2")
|
||||
certs = list_certs(data_dir)
|
||||
assert len(certs) == 2
|
||||
networks = {c[0] for c in certs}
|
||||
assert networks == {"libera", "oftc"}
|
||||
|
||||
def test_list_by_network(self, data_dir: Path) -> None:
|
||||
generate_cert(data_dir, "libera", "nick1")
|
||||
generate_cert(data_dir, "oftc", "nick2")
|
||||
certs = list_certs(data_dir, network="libera")
|
||||
assert len(certs) == 1
|
||||
assert certs[0][0] == "libera"
|
||||
assert certs[0][1] == "nick1"
|
||||
|
||||
def test_list_multiple_per_network(self, data_dir: Path) -> None:
|
||||
generate_cert(data_dir, "libera", "nick1")
|
||||
generate_cert(data_dir, "libera", "nick2")
|
||||
certs = list_certs(data_dir, network="libera")
|
||||
assert len(certs) == 2
|
||||
nicks = {c[1] for c in certs}
|
||||
assert nicks == {"nick1", "nick2"}
|
||||
|
||||
def test_fingerprints_present(self, data_dir: Path) -> None:
|
||||
generate_cert(data_dir, "libera", "testnick")
|
||||
certs = list_certs(data_dir)
|
||||
assert len(certs) == 1
|
||||
_, _, fp = certs[0]
|
||||
assert ":" in fp
|
||||
assert len(fp.split(":")) == 32
|
||||
@@ -774,6 +774,185 @@ class TestDropCreds:
|
||||
assert "Unknown network" in lines[0]
|
||||
|
||||
|
||||
class TestGencert:
|
||||
@pytest.mark.asyncio
|
||||
async def test_gencert_no_data_dir(self) -> None:
|
||||
commands.DATA_DIR = None
|
||||
router = _make_router()
|
||||
client = _make_client()
|
||||
lines = await commands.dispatch("GENCERT libera", router, client)
|
||||
assert "not available" in lines[0]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_gencert_missing_arg(self) -> None:
|
||||
commands.DATA_DIR = Path("/tmp")
|
||||
router = _make_router()
|
||||
client = _make_client()
|
||||
lines = await commands.dispatch("GENCERT", router, client)
|
||||
assert "Usage" in lines[0]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_gencert_unknown_network(self) -> None:
|
||||
commands.DATA_DIR = Path("/tmp")
|
||||
router = _make_router()
|
||||
client = _make_client()
|
||||
lines = await commands.dispatch("GENCERT fakenet", router, client)
|
||||
assert "Unknown network" in lines[0]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_gencert_with_nick(self, tmp_path: Path) -> None:
|
||||
commands.DATA_DIR = tmp_path
|
||||
net = _make_network("libera", State.READY, nick="fabesune")
|
||||
router = _make_router(net)
|
||||
client = _make_client()
|
||||
lines = await commands.dispatch("GENCERT libera testnick", router, client)
|
||||
assert "[GENCERT]" in lines[0]
|
||||
assert "testnick" in lines[0]
|
||||
assert any("fingerprint" in line for line in lines)
|
||||
# Should auto-send CERT ADD since network is ready
|
||||
net.send_raw.assert_awaited()
|
||||
calls = net.send_raw.await_args_list
|
||||
assert any(
|
||||
c.args[0] == "PRIVMSG" and c.args[1] == "NickServ"
|
||||
and "CERT ADD" in c.args[2]
|
||||
for c in calls
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_gencert_uses_current_nick(self, tmp_path: Path) -> None:
|
||||
commands.DATA_DIR = tmp_path
|
||||
net = _make_network("libera", State.READY, nick="fabesune")
|
||||
router = _make_router(net)
|
||||
client = _make_client()
|
||||
lines = await commands.dispatch("GENCERT libera", router, client)
|
||||
assert "[GENCERT]" in lines[0]
|
||||
assert "fabesune" in lines[0]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_gencert_not_ready(self, tmp_path: Path) -> None:
|
||||
commands.DATA_DIR = tmp_path
|
||||
net = _make_network("libera", State.CONNECTING, nick="fabesune")
|
||||
router = _make_router(net)
|
||||
client = _make_client()
|
||||
lines = await commands.dispatch("GENCERT libera", router, client)
|
||||
assert "[GENCERT]" in lines[0]
|
||||
assert any("not ready" in line or "manually" in line for line in lines)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_gencert_no_nick(self, tmp_path: Path) -> None:
|
||||
commands.DATA_DIR = tmp_path
|
||||
net = _make_network("libera", State.READY, nick="*")
|
||||
router = _make_router(net)
|
||||
router.backlog.get_nickserv_creds_by_network.return_value = None
|
||||
client = _make_client()
|
||||
lines = await commands.dispatch("GENCERT libera", router, client)
|
||||
assert "No nick available" in lines[0]
|
||||
|
||||
|
||||
class TestCertfp:
|
||||
def test_certfp_no_data_dir(self) -> None:
|
||||
commands.DATA_DIR = None
|
||||
router = _make_router()
|
||||
lines = _cmd_certfp_sync(router, None)
|
||||
assert "not available" in lines[0]
|
||||
|
||||
def test_certfp_empty(self, tmp_path: Path) -> None:
|
||||
commands.DATA_DIR = tmp_path
|
||||
router = _make_router()
|
||||
lines = _cmd_certfp_sync(router, None)
|
||||
assert "no certificates" in lines[0]
|
||||
|
||||
def test_certfp_lists_certs(self, tmp_path: Path) -> None:
|
||||
from bouncer.cert import generate_cert
|
||||
commands.DATA_DIR = tmp_path
|
||||
generate_cert(tmp_path, "libera", "fabesune")
|
||||
net = _make_network("libera", State.READY)
|
||||
router = _make_router(net)
|
||||
lines = _cmd_certfp_sync(router, None)
|
||||
assert lines[0] == "[CERTFP]"
|
||||
assert any("libera" in line and "fabesune" in line for line in lines)
|
||||
|
||||
def test_certfp_filter_network(self, tmp_path: Path) -> None:
|
||||
from bouncer.cert import generate_cert
|
||||
commands.DATA_DIR = tmp_path
|
||||
generate_cert(tmp_path, "libera", "nick1")
|
||||
generate_cert(tmp_path, "oftc", "nick2")
|
||||
libera = _make_network("libera", State.READY)
|
||||
oftc = _make_network("oftc", State.READY)
|
||||
router = _make_router(libera, oftc)
|
||||
lines = _cmd_certfp_sync(router, "libera")
|
||||
assert lines[0] == "[CERTFP]"
|
||||
assert any("nick1" in line for line in lines)
|
||||
assert not any("nick2" in line for line in lines)
|
||||
|
||||
def test_certfp_unknown_network(self, tmp_path: Path) -> None:
|
||||
commands.DATA_DIR = tmp_path
|
||||
router = _make_router()
|
||||
lines = _cmd_certfp_sync(router, "fakenet")
|
||||
assert "Unknown network" in lines[0]
|
||||
|
||||
|
||||
class TestDelcert:
|
||||
def test_delcert_no_data_dir(self) -> None:
|
||||
commands.DATA_DIR = None
|
||||
router = _make_router()
|
||||
lines = _cmd_delcert_sync(router, "libera")
|
||||
assert "not available" in lines[0]
|
||||
|
||||
def test_delcert_missing_arg(self) -> None:
|
||||
commands.DATA_DIR = Path("/tmp")
|
||||
router = _make_router()
|
||||
lines = _cmd_delcert_sync(router, "")
|
||||
assert "Usage" in lines[0]
|
||||
|
||||
def test_delcert_unknown_network(self) -> None:
|
||||
commands.DATA_DIR = Path("/tmp")
|
||||
router = _make_router()
|
||||
lines = _cmd_delcert_sync(router, "fakenet")
|
||||
assert "Unknown network" in lines[0]
|
||||
|
||||
def test_delcert_removes_cert(self, tmp_path: Path) -> None:
|
||||
from bouncer.cert import generate_cert, has_cert
|
||||
commands.DATA_DIR = tmp_path
|
||||
generate_cert(tmp_path, "libera", "testnick")
|
||||
assert has_cert(tmp_path, "libera", "testnick")
|
||||
net = _make_network("libera", State.READY, nick="testnick")
|
||||
router = _make_router(net)
|
||||
lines = _cmd_delcert_sync(router, "libera testnick")
|
||||
assert "[DELCERT]" in lines[0]
|
||||
assert "deleted" in lines[0]
|
||||
assert not has_cert(tmp_path, "libera", "testnick")
|
||||
|
||||
def test_delcert_nonexistent(self, tmp_path: Path) -> None:
|
||||
commands.DATA_DIR = tmp_path
|
||||
net = _make_network("libera", State.READY, nick="testnick")
|
||||
router = _make_router(net)
|
||||
lines = _cmd_delcert_sync(router, "libera testnick")
|
||||
assert "no cert found" in lines[0]
|
||||
|
||||
def test_delcert_uses_current_nick(self, tmp_path: Path) -> None:
|
||||
from bouncer.cert import generate_cert, has_cert
|
||||
commands.DATA_DIR = tmp_path
|
||||
generate_cert(tmp_path, "libera", "fabesune")
|
||||
net = _make_network("libera", State.READY, nick="fabesune")
|
||||
router = _make_router(net)
|
||||
lines = _cmd_delcert_sync(router, "libera")
|
||||
assert "deleted" in lines[0]
|
||||
assert not has_cert(tmp_path, "libera", "fabesune")
|
||||
|
||||
|
||||
def _cmd_certfp_sync(router: MagicMock, network_name: str | None) -> list[str]:
|
||||
"""Call _cmd_certfp synchronously (it's not async)."""
|
||||
from bouncer.commands import _cmd_certfp
|
||||
return _cmd_certfp(router, network_name)
|
||||
|
||||
|
||||
def _cmd_delcert_sync(router: MagicMock, arg: str) -> list[str]:
|
||||
"""Call _cmd_delcert synchronously (it's not async)."""
|
||||
from bouncer.commands import _cmd_delcert
|
||||
return _cmd_delcert(router, arg)
|
||||
|
||||
|
||||
class TestUnknownCommand:
|
||||
@pytest.mark.asyncio
|
||||
async def test_unknown_command(self) -> None:
|
||||
|
||||
Reference in New Issue
Block a user