diff --git a/.gitignore b/.gitignore index 548cdb8..dfb9ff0 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,6 @@ build/ # Personal config (keep example only) config/bouncer.toml + +# Client certificates (generated per-network) +certs/ diff --git a/Containerfile b/Containerfile index e1e6907..c6cf545 100644 --- a/Containerfile +++ b/Containerfile @@ -4,7 +4,10 @@ WORKDIR /app RUN pip install --no-cache-dir \ "python-socks[asyncio]>=2.4" \ - "aiosqlite>=0.19" + "aiosqlite>=0.19" \ + "aiohttp>=3.9" \ + "aiohttp-socks>=0.8" \ + "cryptography>=41.0" ENV PYTHONUNBUFFERED=1 ENV PYTHONDONTWRITEBYTECODE=1 diff --git a/docs/CHEATSHEET.md b/docs/CHEATSHEET.md index 3637da1..54dd716 100644 --- a/docs/CHEATSHEET.md +++ b/docs/CHEATSHEET.md @@ -87,6 +87,17 @@ PASS # authenticate (all networks) /msg *bouncer DROPCREDS libera nick # delete one nick's creds ``` +### CertFP + +``` +/msg *bouncer GENCERT libera # generate cert (current nick) +/msg *bouncer GENCERT libera nick # generate cert (specific nick) +/msg *bouncer CERTFP # list all cert fingerprints +/msg *bouncer CERTFP libera # list certs for one network +/msg *bouncer DELCERT libera # delete cert (current nick) +/msg *bouncer DELCERT libera nick # delete cert (specific nick) +``` + ## Namespacing ``` @@ -114,6 +125,12 @@ DISCONNECTED -> CONNECTING -> REGISTERING -> PROBATION (15s) -> READY | PROBATION | 15s wait, watching for K-line | | READY | Switch to configured nick, join channels | +## Auth Cascade + +``` +SASL EXTERNAL (cert + creds) > SASL PLAIN (creds) > NickServ IDENTIFY +``` + ## Reconnect Backoff ``` @@ -144,6 +161,7 @@ password # optional, IRC server PASS | `config/bouncer.toml` | Active config (gitignored) | | `config/bouncer.example.toml` | Example template | | `config/bouncer.db` | SQLite backlog (auto-created) | +| `{data_dir}/certs/{net}/{nick}.pem` | Client certificates (auto-created) | ## Backlog Queries @@ -167,10 +185,11 @@ src/bouncer/ config.py # TOML loader irc.py # IRC message parse/format namespace.py # /network encode/decode for multiplexing - proxy.py # SOCKS5 connector (local DNS, multi-IP) - network.py # server connection + state machine + proxy.py # SOCKS5 connector (local DNS, multi-IP, CertFP) + network.py # server connection + state machine + SASL client.py # client session handler - commands.py # 22 bouncer control commands (/msg *bouncer) + cert.py # client certificate generation + management + commands.py # 25 bouncer control commands (/msg *bouncer) router.py # message routing + backlog trigger server.py # TCP listener backlog.py # SQLite store/replay/prune diff --git a/docs/USAGE.md b/docs/USAGE.md index 1b4a464..717b0a0 100644 --- a/docs/USAGE.md +++ b/docs/USAGE.md @@ -196,6 +196,55 @@ autojoin = true # auto-join channels on ready (default: true) password = "" # IRC server password (optional, for PASS command) ``` +## CertFP Authentication + +The bouncer supports client certificate fingerprint (CertFP) authentication +via SASL EXTERNAL. Each certificate is unique per (network, nick) pair and +stored as a combined PEM file at `{data_dir}/certs/{network}/{nick}.pem`. + +### Authentication Cascade + +When connecting, the bouncer selects the strongest available method: + +| Priority | Method | Condition | +|----------|--------|-----------| +| 1 | SASL EXTERNAL | Stored creds + cert file exists | +| 2 | SASL PLAIN | Stored creds, no cert | +| 3 | NickServ IDENTIFY | Fallback after SASL failure | + +### Setup + +1. Generate a certificate: + ``` + /msg *bouncer GENCERT libera + ``` + This creates an EC P-256 self-signed cert (10-year validity) and + auto-sends `NickServ CERT ADD ` if the network is connected. + +2. Reconnect to use CertFP: + ``` + /msg *bouncer RECONNECT libera + ``` + The bouncer will now present the client certificate during TLS and + authenticate via SASL EXTERNAL. + +3. Verify the fingerprint is registered: + ``` + /msg *bouncer CERTFP libera + ``` + +### Certificate Storage + +Certificates are stored alongside the config file: + +``` +{data_dir}/certs/ + libera/ + fabesune.pem # cert + private key (chmod 600) + oftc/ + mynick.pem +``` + ## Bouncer Commands Send a PRIVMSG to `*bouncer` (or `bouncer`) from your IRC client to inspect @@ -248,6 +297,14 @@ Responses arrive as NOTICE messages from `*bouncer`. | `REGISTER ` | Trigger NickServ registration attempt | | `DROPCREDS [nick]` | Delete stored NickServ credentials | +### CertFP + +| Command | Description | +|---------|-------------| +| `GENCERT [nick]` | Generate client cert, auto-register with NickServ | +| `CERTFP [network]` | Show certificate fingerprints (all or per-network) | +| `DELCERT [nick]` | Delete a client certificate | + ### Examples ``` @@ -272,6 +329,12 @@ Responses arrive as NOTICE messages from `*bouncer`. /msg *bouncer REGISTER libera /msg *bouncer DROPCREDS libera /msg *bouncer DROPCREDS libera oldnick +/msg *bouncer GENCERT libera +/msg *bouncer GENCERT libera fabesune +/msg *bouncer CERTFP +/msg *bouncer CERTFP libera +/msg *bouncer DELCERT libera +/msg *bouncer DELCERT libera fabesune ``` ### Example Output diff --git a/pyproject.toml b/pyproject.toml index 8075c3f..92b6769 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,6 +10,9 @@ requires-python = ">=3.10" dependencies = [ "python-socks[asyncio]>=2.4", "aiosqlite>=0.19", + "aiohttp>=3.9", + "aiohttp-socks>=0.8", + "cryptography>=41.0", ] [project.optional-dependencies] @@ -19,6 +22,9 @@ dev = [ "pytest>=8.0", "pytest-asyncio>=0.23", ] +browser = [ + "playwright>=1.40", +] [project.scripts] bouncer = "bouncer.__main__:main" diff --git a/src/bouncer/__main__.py b/src/bouncer/__main__.py index 1971023..4ce55b8 100644 --- a/src/bouncer/__main__.py +++ b/src/bouncer/__main__.py @@ -46,8 +46,9 @@ async def _run(config_path: Path, verbose: bool) -> None: commands.STARTUP_TIME = time.time() commands.CONFIG_PATH = config_path + commands.DATA_DIR = data_dir - router = Router(cfg, backlog) + router = Router(cfg, backlog, data_dir=data_dir) await router.start_networks() server = await start(cfg.bouncer, router) diff --git a/src/bouncer/cert.py b/src/bouncer/cert.py new file mode 100644 index 0000000..44649ff --- /dev/null +++ b/src/bouncer/cert.py @@ -0,0 +1,126 @@ +"""Client certificate management for CertFP authentication.""" + +from __future__ import annotations + +import datetime +import logging +import os +from pathlib import Path + +from cryptography import x509 +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.x509.oid import NameOID + +log = logging.getLogger(__name__) + +_VALIDITY_DAYS = 3650 # ~10 years + + +def cert_path(data_dir: Path, network: str, nick: str) -> Path: + """Return the PEM file path for a (network, nick) pair.""" + return data_dir / "certs" / network / f"{nick}.pem" + + +def generate_cert(data_dir: Path, network: str, nick: str) -> Path: + """Generate a self-signed EC P-256 client certificate. + + Creates a combined PEM file (cert + key) at the standard path. + Returns the path to the generated file. + """ + pem = cert_path(data_dir, network, nick) + pem.parent.mkdir(parents=True, exist_ok=True) + + key = ec.generate_private_key(ec.SECP256R1()) + + subject = issuer = x509.Name([ + x509.NameAttribute(NameOID.COMMON_NAME, f"{nick}@{network}"), + ]) + + now = datetime.datetime.now(datetime.timezone.utc) + cert = ( + x509.CertificateBuilder() + .subject_name(subject) + .issuer_name(issuer) + .public_key(key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(now) + .not_valid_after(now + datetime.timedelta(days=_VALIDITY_DAYS)) + .sign(key, hashes.SHA256()) + ) + + key_bytes = key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) + cert_bytes = cert.public_bytes(serialization.Encoding.PEM) + + pem.write_bytes(cert_bytes + key_bytes) + os.chmod(pem, 0o600) + + log.info("generated cert %s (CN=%s@%s)", pem, nick, network) + return pem + + +def fingerprint(pem_path: Path) -> str: + """Return the SHA-256 fingerprint in colon-separated uppercase hex. + + This is the format NickServ expects for CERT ADD. + """ + cert_data = pem_path.read_bytes() + cert = x509.load_pem_x509_certificate(cert_data) + digest = cert.fingerprint(hashes.SHA256()) + return ":".join(f"{b:02X}" for b in digest) + + +def has_cert(data_dir: Path, network: str, nick: str) -> bool: + """Check whether a certificate exists for (network, nick).""" + return cert_path(data_dir, network, nick).is_file() + + +def delete_cert(data_dir: Path, network: str, nick: str) -> bool: + """Delete the certificate for (network, nick). Returns True if removed.""" + pem = cert_path(data_dir, network, nick) + if pem.is_file(): + pem.unlink() + log.info("deleted cert %s", pem) + # Clean up empty directories + try: + pem.parent.rmdir() + except OSError: + pass + return True + return False + + +def list_certs( + data_dir: Path, network: str | None = None, +) -> list[tuple[str, str, str]]: + """List certificates as (network, nick, fingerprint) tuples. + + If network is given, only lists certs for that network. + """ + certs_dir = data_dir / "certs" + if not certs_dir.is_dir(): + return [] + + results: list[tuple[str, str, str]] = [] + + if network: + net_dir = certs_dir / network + if net_dir.is_dir(): + for pem_file in sorted(net_dir.glob("*.pem")): + nick = pem_file.stem + fp = fingerprint(pem_file) + results.append((network, nick, fp)) + else: + for net_dir in sorted(certs_dir.iterdir()): + if not net_dir.is_dir(): + continue + for pem_file in sorted(net_dir.glob("*.pem")): + nick = pem_file.stem + fp = fingerprint(pem_file) + results.append((net_dir.name, nick, fp)) + + return results diff --git a/src/bouncer/commands.py b/src/bouncer/commands.py index 278edfb..751818d 100644 --- a/src/bouncer/commands.py +++ b/src/bouncer/commands.py @@ -17,6 +17,7 @@ if TYPE_CHECKING: # Set by __main__.py before entering the event loop STARTUP_TIME: float = 0.0 CONFIG_PATH: Path | None = None +DATA_DIR: Path | None = None _COMMANDS: dict[str, str] = { "HELP": "List available commands", @@ -41,6 +42,9 @@ _COMMANDS: dict[str, str] = { "IDENTIFY": "Force NickServ IDENTIFY (IDENTIFY )", "REGISTER": "Trigger NickServ registration (REGISTER )", "DROPCREDS": "Delete stored NickServ creds (DROPCREDS [nick])", + "GENCERT": "Generate client cert (GENCERT [nick])", + "CERTFP": "Show cert fingerprints (CERTFP [network])", + "DELCERT": "Delete client cert (DELCERT [nick])", } @@ -97,6 +101,12 @@ async def dispatch(text: str, router: Router, client: Client) -> list[str]: return await _cmd_register(router, arg) if cmd == "DROPCREDS": return await _cmd_dropcreds(router, arg) + if cmd == "GENCERT": + return await _cmd_gencert(router, arg) + if cmd == "CERTFP": + return _cmd_certfp(router, arg or None) + if cmd == "DELCERT": + return _cmd_delcert(router, arg) return [f"Unknown command: {cmd}", "Use HELP for available commands."] @@ -655,3 +665,111 @@ async def _cmd_dropcreds(router: Router, arg: str) -> list[str]: lines.append(f" deleted: {nick}") return lines + + +# --- CertFP --- + + +async def _cmd_gencert(router: Router, arg: str) -> list[str]: + """Generate a client certificate for CertFP authentication.""" + from bouncer.cert import fingerprint, generate_cert + + if not DATA_DIR: + return ["[GENCERT] data directory not available"] + + parts = arg.split(None, 1) + if not parts: + return ["Usage: GENCERT [nick]"] + + net, err = _resolve_network(router, parts[0]) + if err: + return err + + # Determine nick: explicit arg > current network nick > stored creds + if len(parts) >= 2: + nick = parts[1] + elif net.nick and net.nick != "*": + nick = net.nick + elif router.backlog: + creds = await router.backlog.get_nickserv_creds_by_network(net.cfg.name) + if creds: + nick = creds[0] + else: + return [f"No nick available for {net.cfg.name}", + "Usage: GENCERT "] + else: + return [f"No nick available for {net.cfg.name}", + "Usage: GENCERT "] + + pem = generate_cert(DATA_DIR, net.cfg.name, nick) + fp = fingerprint(pem) + + lines = [f"[GENCERT] {net.cfg.name}/{nick}"] + lines.append(f" fingerprint: {fp}") + lines.append(f" path: {pem}") + + # Auto-register with NickServ if connected + if net.ready: + await net.send_raw("PRIVMSG", "NickServ", f"CERT ADD {fp}") + lines.append(" sent: NickServ CERT ADD") + else: + lines.append(" (network not ready, register fingerprint manually)") + lines.append(f" /msg NickServ CERT ADD {fp}") + + return lines + + +def _cmd_certfp(router: Router, network_name: str | None) -> list[str]: + """List client certificate fingerprints.""" + from bouncer.cert import list_certs + + if not DATA_DIR: + return ["[CERTFP] data directory not available"] + + net_filter = network_name.lower() if network_name else None + if net_filter and net_filter not in router.networks: + names = ", ".join(sorted(router.networks)) + return [f"Unknown network: {network_name}", f"Available: {names}"] + + certs = list_certs(DATA_DIR, network=net_filter) + if not certs: + scope = net_filter or "any network" + return [f"[CERTFP] no certificates for {scope}"] + + lines = ["[CERTFP]"] + for net, nick, fp in certs: + lines.append(f" {net} {nick} {fp}") + + return lines + + +def _cmd_delcert(router: Router, arg: str) -> list[str]: + """Delete a client certificate.""" + from bouncer.cert import delete_cert + + if not DATA_DIR: + return ["[DELCERT] data directory not available"] + + parts = arg.split(None, 1) + if not parts: + return ["Usage: DELCERT [nick]"] + + net_name = parts[0].lower() + if net_name not in router.networks: + names = ", ".join(sorted(router.networks)) + return [f"Unknown network: {net_name}", f"Available: {names}"] + + net = router.networks[net_name] + + if len(parts) >= 2: + nick = parts[1] + elif net.nick and net.nick != "*": + nick = net.nick + else: + return [f"No nick specified for {net_name}", + "Usage: DELCERT "] + + if delete_cert(DATA_DIR, net_name, nick): + return [f"[DELCERT] deleted cert for {net_name}/{nick}"] + else: + return [f"[DELCERT] no cert found for {net_name}/{nick}"] diff --git a/src/bouncer/email.py b/src/bouncer/email.py new file mode 100644 index 0000000..a7b0e39 --- /dev/null +++ b/src/bouncer/email.py @@ -0,0 +1,736 @@ +"""Temp email client for NickServ verification. + +Supports multiple providers: +- Guerrilla Mail (API) -- sharklasers.com, guerrillamail.com, grr.la, etc. +- Mail.tm / Mail.gw (API) -- domains fetched dynamically +- YOPmail (Playwright) -- yopmail.com, yopmail.fr, yopmail.net +- TrashMailr (Playwright) -- discard.email, discardmail.com, tempr.email +- Temp-Mail.org (Playwright) -- domains fetched dynamically + +All HTTP requests go through the SOCKS5 proxy. Proxy circuits may drop at +any time, so every individual request is retried with exponential backoff +and a fresh connector on each attempt. + +Playwright providers use the proxy for all browser traffic and retry +individual poll attempts on failure. +""" + +from __future__ import annotations + +import asyncio +import html +import json +import logging +import re + +import aiohttp +from aiohttp_socks import ProxyConnector + +log = logging.getLogger(__name__) + +POLL_INTERVAL = 15 # seconds between inbox checks +MAX_POLLS = 30 # ~7.5 minutes total +REQUEST_TIMEOUT = 20 # per-request timeout +REQUEST_RETRIES = 4 # retries per API call +RETRY_BACKOFF = [2, 5, 10, 20] # seconds between retries +PW_PAGE_TIMEOUT = 20000 # playwright page load timeout (ms) + +# --------------------------------------------------------------------------- +# Domain registry +# --------------------------------------------------------------------------- + +# Guerrilla Mail API domains +GUERRILLA_DOMAINS = { + "sharklasers.com", "guerrillamail.com", "grr.la", "guerrillamail.de", + "guerrillamail.net", "guerrillamail.org", "guerrillamailblock.com", + "pokemail.net", "spam4.me", +} + +# YOPmail domains (Playwright) +YOPMAIL_DOMAINS = {"yopmail.com", "yopmail.fr", "yopmail.net"} + +# TrashMailr / discard.email domains (Playwright) +TRASHMAILR_DOMAINS = {"discard.email", "discardmail.com", "tempr.email"} + +# Mail.tm / Mail.gw REST API +MAILTM_APIS = ["https://api.mail.tm", "https://api.mail.gw"] +_mailtm_domains: set[str] = set() +_mailtm_domain_api: dict[str, str] = {} # domain -> api_base + +# Temp-Mail.org (Playwright) -- domains fetched at runtime +_tempmail_domains: set[str] = set() + +GUERRILLA_API = "https://api.guerrillamail.com/ajax.php" + + +def get_all_domains() -> list[str]: + """Return all currently known email domains (static + dynamically fetched).""" + return sorted( + GUERRILLA_DOMAINS | YOPMAIL_DOMAINS | TRASHMAILR_DOMAINS + | _mailtm_domains | _tempmail_domains + ) + + +async def fetch_extra_domains(proxy_host: str, proxy_port: int) -> set[str]: + """Fetch additional domains from mail.tm/gw APIs. + + Returns newly discovered domains and updates the module-level cache. + """ + discovered: set[str] = set() + for api_base in MAILTM_APIS: + data = await _proxy_json( + proxy_host, proxy_port, f"{api_base}/domains", method="GET", + ) + if not data: + continue + members = data.get("hydra:member", []) if isinstance(data, dict) else [] + for entry in members: + domain = entry.get("domain", "") + if domain and entry.get("isActive", True): + discovered.add(domain) + _mailtm_domain_api[domain] = api_base + + if discovered: + _mailtm_domains.update(discovered) + log.info("mail.tm/gw domains: %s", ", ".join(sorted(discovered))) + return discovered + + +async def verify_email( + email_addr: str, + proxy_host: str = "127.0.0.1", + proxy_port: int = 1080, +) -> VerifyResult | None: + """Poll temp email provider for a NickServ verification code. + + Routes to the correct provider based on email domain. + """ + if "@" not in email_addr: + return None + + _, domain = email_addr.rsplit("@", 1) + + if domain in GUERRILLA_DOMAINS: + return await _guerrilla_verify(email_addr, proxy_host, proxy_port) + elif domain in YOPMAIL_DOMAINS: + return await _yopmail_verify(email_addr, proxy_host, proxy_port) + elif domain in TRASHMAILR_DOMAINS: + return await _trashmailr_verify(email_addr, proxy_host, proxy_port) + elif domain in _mailtm_domains: + api_base = _mailtm_domain_api.get(domain, MAILTM_APIS[0]) + return await _mailtm_verify(email_addr, api_base, proxy_host, proxy_port) + elif domain in _tempmail_domains: + return await _tempmail_verify(email_addr, proxy_host, proxy_port) + else: + log.warning("unsupported email domain: %s", domain) + return None + + +# --------------------------------------------------------------------------- +# Resilient HTTP helper +# --------------------------------------------------------------------------- + +async def _proxy_json( + proxy_host: str, proxy_port: int, url: str, *, + method: str = "GET", params: dict | None = None, + json_body: dict | None = None, headers: dict | None = None, + bearer: str | None = None, +) -> dict | list | None: + """HTTP request through SOCKS5 proxy with retries and fresh connector. + + Returns parsed JSON on success, None on total failure. + """ + for attempt in range(REQUEST_RETRIES): + connector = None + try: + proxy_url = f"socks5://{proxy_host}:{proxy_port}" + connector = ProxyConnector.from_url(proxy_url) + timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT) + hdrs = dict(headers or {}) + if bearer: + hdrs["Authorization"] = f"Bearer {bearer}" + async with aiohttp.ClientSession( + connector=connector, timeout=timeout, + ) as session: + req_kwargs: dict = {"headers": hdrs} + if params: + req_kwargs["params"] = params + if json_body is not None: + req_kwargs["json"] = json_body + async with session.request(method, url, **req_kwargs) as resp: + raw = await resp.read() + if not raw: + raise aiohttp.ClientError("empty response") + text = raw.decode("utf-8", errors="replace") + if text.startswith("Could not load"): + raise aiohttp.ClientError(f"bad response: {text[:80]}") + return json.loads(raw) + except asyncio.CancelledError: + raise + except Exception: + delay = RETRY_BACKOFF[min(attempt, len(RETRY_BACKOFF) - 1)] + log.debug("request %s failed (attempt %d/%d), retrying in %ds", + url, attempt + 1, REQUEST_RETRIES, delay, exc_info=True) + await asyncio.sleep(delay) + finally: + if connector: + await connector.close() + + log.warning("request failed after %d attempts: %s", REQUEST_RETRIES, url) + return None + + +def _pw_proxy(proxy_host: str, proxy_port: int) -> dict: + """Build Playwright proxy config.""" + return {"server": f"socks5://{proxy_host}:{proxy_port}"} + + +async def _pw_launch(pw, proxy_host: str, proxy_port: int): + """Launch a headless Chromium browser with SOCKS5 proxy.""" + browser = await pw.chromium.launch(headless=True) + context = await browser.new_context(proxy=_pw_proxy(proxy_host, proxy_port)) + page = await context.new_page() + return browser, page + + +# --------------------------------------------------------------------------- +# Guerrilla Mail (JSON API) +# --------------------------------------------------------------------------- + +async def _guerrilla_verify( + email_addr: str, proxy_host: str, proxy_port: int, +) -> VerifyResult | None: + """Poll guerrillamail API for verification code.""" + local, domain = email_addr.rsplit("@", 1) + + sid = await _gm_init(proxy_host, proxy_port, local, domain) + if not sid: + log.warning("failed to init guerrillamail session for %s", email_addr) + return None + + log.info("polling guerrillamail for %s (sid=%s...)", email_addr, sid[:8]) + + for attempt in range(MAX_POLLS): + await asyncio.sleep(POLL_INTERVAL) + try: + result = await _gm_check(proxy_host, proxy_port, sid) + if result: + log.info("found verification code for %s: %s", email_addr, result) + return result + log.debug("poll %d/%d: no verification email yet", attempt + 1, MAX_POLLS) + except asyncio.CancelledError: + raise + except Exception: + log.debug("poll %d/%d failed, will retry next cycle", + attempt + 1, MAX_POLLS, exc_info=True) + + log.warning("gave up polling guerrillamail for %s", email_addr) + return None + + +async def _gm_init( + proxy_host: str, proxy_port: int, local: str, domain: str, +) -> str | None: + """Initialize guerrillamail session and claim address.""" + data = await _proxy_json(proxy_host, proxy_port, GUERRILLA_API, params={ + "f": "get_email_address", "lang": "en", "site": domain, + }) + if not data or not isinstance(data, dict): + return None + sid = data.get("sid_token", "") + if not sid: + return None + + data = await _proxy_json(proxy_host, proxy_port, GUERRILLA_API, params={ + "f": "set_email_user", + "email_user": local, + "site": domain, + "lang": "en", + "sid_token": sid, + }) + if data and isinstance(data, dict): + sid = data.get("sid_token", sid) + claimed = data.get("email_addr", "") + log.debug("claimed email: %s (sid=%s...)", claimed, sid[:8]) + + return sid + + +async def _gm_check( + proxy_host: str, proxy_port: int, sid: str, +) -> VerifyResult | None: + """Check guerrillamail inbox for verification code.""" + data = await _proxy_json(proxy_host, proxy_port, GUERRILLA_API, params={ + "f": "check_email", "seq": "0", "sid_token": sid, + }) + if not data or not isinstance(data, dict): + return None + + emails = data.get("list", []) + if not emails: + return None + + sid_new = data.get("sid_token", sid) + + for email in emails: + subject = email.get("mail_subject", "").lower() + mail_from = email.get("mail_from", "").lower() + + if not _is_nickserv_email(subject, mail_from): + continue + + mail_id = email.get("mail_id") + if not mail_id: + continue + + mail_data = await _proxy_json(proxy_host, proxy_port, GUERRILLA_API, params={ + "f": "fetch_email", "email_id": mail_id, "sid_token": sid_new, + }) + if not mail_data or not isinstance(mail_data, dict): + continue + + body = mail_data.get("mail_body", "") + result = extract_code(body) + if result: + return result + + return None + + +# --------------------------------------------------------------------------- +# Mail.tm / Mail.gw (REST API) +# --------------------------------------------------------------------------- + +async def _mailtm_verify( + email_addr: str, api_base: str, proxy_host: str, proxy_port: int, +) -> VerifyResult | None: + """Poll mail.tm/gw API for verification code.""" + local, domain = email_addr.rsplit("@", 1) + password = f"{local}Pass1!" # mail.tm requires 6+ chars + + # Create account + data = await _proxy_json( + proxy_host, proxy_port, f"{api_base}/accounts", + method="POST", + json_body={"address": email_addr, "password": password}, + ) + if not data or not isinstance(data, dict) or not data.get("id"): + log.warning("failed to create mail.tm account for %s", email_addr) + return None + + # Get auth token + token_data = await _proxy_json( + proxy_host, proxy_port, f"{api_base}/token", + method="POST", + json_body={"address": email_addr, "password": password}, + ) + if not token_data or not isinstance(token_data, dict): + log.warning("failed to get mail.tm token for %s", email_addr) + return None + token = token_data.get("token", "") + if not token: + return None + + log.info("polling mail.tm for %s (api=%s)", email_addr, api_base) + + for attempt in range(MAX_POLLS): + await asyncio.sleep(POLL_INTERVAL) + try: + msgs = await _proxy_json( + proxy_host, proxy_port, f"{api_base}/messages", + bearer=token, + ) + if not msgs or not isinstance(msgs, dict): + continue + + for msg in msgs.get("hydra:member", []): + subject = msg.get("subject", "").lower() + from_addr = "" + if msg.get("from"): + from_addr = msg["from"].get("address", "").lower() + if not _is_nickserv_email(subject, from_addr): + continue + + msg_id = msg.get("id") + if not msg_id: + continue + + detail = await _proxy_json( + proxy_host, proxy_port, f"{api_base}/messages/{msg_id}", + bearer=token, + ) + if not detail or not isinstance(detail, dict): + continue + + body = detail.get("html", "") or detail.get("text", "") or "" + result = extract_code(body) + if result: + log.info("found verification code for %s: %s", email_addr, result) + return result + + log.debug("poll %d/%d: no verification email yet", attempt + 1, MAX_POLLS) + except asyncio.CancelledError: + raise + except Exception: + log.debug("mail.tm poll %d/%d failed", attempt + 1, MAX_POLLS, exc_info=True) + + log.warning("gave up polling mail.tm for %s", email_addr) + return None + + +# --------------------------------------------------------------------------- +# YOPmail (Playwright) +# --------------------------------------------------------------------------- + +async def _yopmail_verify( + email_addr: str, proxy_host: str, proxy_port: int, +) -> VerifyResult | None: + """Poll yopmail via Playwright for verification code.""" + try: + from playwright.async_api import async_playwright + except ImportError: + log.error("playwright not installed, cannot check yopmail") + return None + + local = email_addr.rsplit("@", 1)[0] + + try: + async with async_playwright() as p: + browser, page = await _pw_launch(p, proxy_host, proxy_port) + log.info("polling yopmail for %s", email_addr) + + for attempt in range(MAX_POLLS): + await asyncio.sleep(POLL_INTERVAL) + try: + await page.goto( + f"https://yopmail.com/en/?login={local}", + wait_until="networkidle", + timeout=PW_PAGE_TIMEOUT, + ) + + inbox_frame = page.frame("ifinbox") + if not inbox_frame: + continue + + first_mail = inbox_frame.locator(".m") + if await first_mail.count() == 0: + log.debug("poll %d/%d: yopmail inbox empty", attempt + 1, MAX_POLLS) + continue + + await first_mail.first.click() + await asyncio.sleep(2) + + mail_frame = page.frame("ifmail") + if not mail_frame: + continue + + body = await mail_frame.locator("body").inner_html() + result = extract_code(body) + if result: + log.info("found verification code for %s: %s", email_addr, result) + await browser.close() + return result + + except Exception: + log.debug("yopmail poll %d failed", attempt + 1, exc_info=True) + + log.warning("gave up polling yopmail for %s", email_addr) + await browser.close() + return None + + except asyncio.CancelledError: + raise + except Exception: + log.exception("yopmail verification failed for %s", email_addr) + return None + + +# --------------------------------------------------------------------------- +# TrashMailr / discard.email (Playwright) +# --------------------------------------------------------------------------- + +async def _trashmailr_verify( + email_addr: str, proxy_host: str, proxy_port: int, +) -> VerifyResult | None: + """Poll trashmailr.com via Playwright for verification code. + + Works for discard.email, discardmail.com, tempr.email domains. + """ + try: + from playwright.async_api import async_playwright + except ImportError: + log.error("playwright not installed, cannot check trashmailr") + return None + + local, domain = email_addr.rsplit("@", 1) + + try: + async with async_playwright() as p: + browser, page = await _pw_launch(p, proxy_host, proxy_port) + log.info("polling trashmailr for %s", email_addr) + + for attempt in range(MAX_POLLS): + await asyncio.sleep(POLL_INTERVAL) + try: + # Navigate to inbox list for this address + url = f"https://trashmailr.com/inbox/list.htm?mailAddress={local}@{domain}" + await page.goto(url, wait_until="networkidle", timeout=PW_PAGE_TIMEOUT) + + # Wait for mail list to render + try: + await page.wait_for_selector( + ".mailList, .mail-list, .noMails, .no-mails", + timeout=5000, + ) + except Exception: + pass + + # Check for email rows -- trashmailr uses table rows or divs + rows = page.locator(".mailList tr, .mail-item, [data-mail-id]") + count = await rows.count() + if count == 0: + log.debug("poll %d/%d: trashmailr inbox empty", attempt + 1, MAX_POLLS) + continue + + # Check each email + for i in range(count): + row = rows.nth(i) + row_text = (await row.inner_text()).lower() + if not _is_nickserv_email(row_text, ""): + continue + + # Click to open the email + await row.click() + await asyncio.sleep(2) + + # Read the email body + body_el = page.locator(".mailBody, .mail-body, .mailContent, .mail-content") + if await body_el.count() > 0: + body = await body_el.first.inner_html() + result = extract_code(body) + if result: + log.info("found verification code for %s: %s", email_addr, result) + await browser.close() + return result + + # Also try the full page body as fallback + body = await page.locator("body").inner_html() + result = extract_code(body) + if result: + log.info("found verification code for %s: %s", email_addr, result) + await browser.close() + return result + + except Exception: + log.debug("trashmailr poll %d failed", attempt + 1, exc_info=True) + + log.warning("gave up polling trashmailr for %s", email_addr) + await browser.close() + return None + + except asyncio.CancelledError: + raise + except Exception: + log.exception("trashmailr verification failed for %s", email_addr) + return None + + +# --------------------------------------------------------------------------- +# Temp-Mail.org (Playwright) +# --------------------------------------------------------------------------- + +async def _tempmail_verify( + email_addr: str, proxy_host: str, proxy_port: int, +) -> VerifyResult | None: + """Poll temp-mail.org via Playwright for verification code.""" + try: + from playwright.async_api import async_playwright + except ImportError: + log.error("playwright not installed, cannot check temp-mail.org") + return None + + local, domain = email_addr.rsplit("@", 1) + + try: + async with async_playwright() as p: + browser, page = await _pw_launch(p, proxy_host, proxy_port) + log.info("polling temp-mail.org for %s", email_addr) + + # Set the email address + await page.goto("https://temp-mail.org/en/", + wait_until="networkidle", timeout=PW_PAGE_TIMEOUT) + + # Try to set custom address via the change button + change_btn = page.locator("#click-to-edit, .click-to-edit, [data-clipboard-action]") + if await change_btn.count() > 0: + await change_btn.first.click() + await asyncio.sleep(1) + + input_field = page.locator("#mail-input, input[name='mail']") + if await input_field.count() > 0: + await input_field.first.fill(local) + + # Select domain from dropdown if available + domain_select = page.locator("select, .domain-select") + if await domain_select.count() > 0: + await domain_select.first.select_option(label=domain) + + save_btn = page.locator("#save, .save-btn, button[type='submit']") + if await save_btn.count() > 0: + await save_btn.first.click() + await asyncio.sleep(2) + + for attempt in range(MAX_POLLS): + await asyncio.sleep(POLL_INTERVAL) + try: + # Refresh the inbox + refresh_btn = page.locator("#refresh, .refresh, [data-type='refresh']") + if await refresh_btn.count() > 0: + await refresh_btn.first.click() + await asyncio.sleep(3) + else: + await page.reload(wait_until="networkidle", timeout=PW_PAGE_TIMEOUT) + + # Check for emails in the inbox + mail_items = page.locator(".inbox-dataList li, .mail-item, .message-list-item") + count = await mail_items.count() + if count == 0: + log.debug("poll %d/%d: temp-mail inbox empty", attempt + 1, MAX_POLLS) + continue + + for i in range(count): + item = mail_items.nth(i) + item_text = (await item.inner_text()).lower() + if not _is_nickserv_email(item_text, ""): + continue + + await item.click() + await asyncio.sleep(2) + + # Read email body + body_el = page.locator(".inbox-data-content, .mail-text, .message-body") + if await body_el.count() > 0: + body = await body_el.first.inner_html() + result = extract_code(body) + if result: + log.info("found verification code for %s: %s", email_addr, result) + await browser.close() + return result + + except Exception: + log.debug("temp-mail poll %d failed", attempt + 1, exc_info=True) + + log.warning("gave up polling temp-mail.org for %s", email_addr) + await browser.close() + return None + + except asyncio.CancelledError: + raise + except Exception: + log.exception("temp-mail.org verification failed for %s", email_addr) + return None + + +async def fetch_tempmail_domains(proxy_host: str, proxy_port: int) -> set[str]: + """Fetch available domains from temp-mail.org via Playwright. + + Updates the module-level _tempmail_domains cache. + """ + try: + from playwright.async_api import async_playwright + except ImportError: + return set() + + discovered: set[str] = set() + try: + async with async_playwright() as p: + browser, page = await _pw_launch(p, proxy_host, proxy_port) + await page.goto("https://temp-mail.org/en/", + wait_until="networkidle", timeout=PW_PAGE_TIMEOUT) + + # The domain is shown in the email display or a dropdown + # Try to find domain options + options = page.locator("select option, .domain-item") + count = await options.count() + for i in range(count): + text = (await options.nth(i).inner_text()).strip() + if "." in text and "@" not in text: + # Clean up domain (remove leading @) + domain = text.lstrip("@").strip() + if domain: + discovered.add(domain) + + # Also try to read the current email address for its domain + addr_el = page.locator("#mail-address, .mail-address, #click-to-copy") + if await addr_el.count() > 0: + addr_text = (await addr_el.first.inner_text()).strip() + if "@" in addr_text: + domain = addr_text.rsplit("@", 1)[1] + discovered.add(domain) + + await browser.close() + + except Exception: + log.debug("failed to fetch temp-mail.org domains", exc_info=True) + + if discovered: + _tempmail_domains.update(discovered) + log.info("temp-mail.org domains: %s", ", ".join(sorted(discovered))) + return discovered + + +# --------------------------------------------------------------------------- +# Shared helpers +# --------------------------------------------------------------------------- + +def _is_nickserv_email(subject: str, mail_from: str) -> bool: + """Check if an email looks like a NickServ verification.""" + combined = subject + " " + mail_from + return any(kw in combined for kw in ( + "nickserv", "verify", "registration", "confirm", "activate", + )) + + +class VerifyResult: + """Parsed verification code with the command format to use.""" + + __slots__ = ("code", "style") + + def __init__(self, code: str, style: str) -> None: + self.code = code + self.style = style # "atheme" or "anope" + + def __repr__(self) -> str: + return f"VerifyResult({self.code!r}, {self.style!r})" + + +def extract_code(body: str) -> VerifyResult | None: + """Extract NickServ verification code from email body. + + Returns a VerifyResult with the code and which style to use: + - atheme: /msg NickServ VERIFY REGISTER nick code + - anope: /msg NickServ CONFIRM code + """ + text = html.unescape(body) + text = re.sub(r"<[^>]+>", " ", text) + text = re.sub(r"\s+", " ", text) + + # Atheme-style: VERIFY REGISTER + m = re.search(r"VERIFY\s+REGISTER\s+\S+\s+(\S+)", text, re.IGNORECASE) + if m: + return VerifyResult(m.group(1), "atheme") + + # Anope-style: CONFIRM + m = re.search(r"CONFIRM\s+(\S+)", text, re.IGNORECASE) + if m: + return VerifyResult(m.group(1), "anope") + + # Generic: verification code is/: + m = re.search( + r"(?:verification|confirm|activation)\s+(?:code|token)[:\s]+(\S+)", + text, re.IGNORECASE, + ) + if m: + return VerifyResult(m.group(1), "anope") + + return None diff --git a/src/bouncer/network.py b/src/bouncer/network.py index acbffcd..8c2beef 100644 --- a/src/bouncer/network.py +++ b/src/bouncer/network.py @@ -3,19 +3,23 @@ from __future__ import annotations import asyncio +import base64 import hashlib import logging import random from enum import Enum, auto +from pathlib import Path from typing import Callable +from bouncer.backlog import Backlog from bouncer.config import NetworkConfig, ProxyConfig +from bouncer.email import fetch_extra_domains, get_all_domains, verify_email from bouncer.irc import IRCMessage, parse log = logging.getLogger(__name__) BACKOFF_STEPS = [5, 10, 30, 60, 120, 300] -PROBATION_SECONDS = 15 +PROBATION_SECONDS = 45 class State(Enum): @@ -99,20 +103,24 @@ def _random_nick() -> str: return base -_GENERIC_IDENTS = ["user", "ident"] -_GENERIC_REALNAMES = ["realname", "unknown"] +_GENERIC_IDENTS = [ + "user", "me", "usr", "x", "i", "u", "id", "anon", + "client", "irc", "chat", "net", "null", "void", +] + +_GENERIC_REALNAMES = [ + "...", "-", ".", "New User", "realname", "me", + "IRC User", "Unknown", "Anonymous", "user", +] + +def _email_domains() -> list[str]: + """Return all currently available email domains.""" + return get_all_domains() -def _nick_for_host(host: str) -> str: - """Generate a deterministic pronounceable nick from a hostname. - - The same hostname always produces the same nick. Uses the host string - as a seed for the markov generator so nicks are stable across reconnects - to known endpoints. - """ - seed = int(hashlib.sha256(host.encode()).hexdigest(), 16) - rng = random.Random(seed) - length = rng.randint(5, 8) +def _seeded_markov(rng: random.Random, min_len: int, max_len: int) -> str: + """Generate a pronounceable word using a seeded RNG.""" + length = rng.randint(min_len, max_len) ch = rng.choice(_STARTERS) word = [ch] consonant_run = 0 if ch in _VOWELS else 1 @@ -129,13 +137,55 @@ def _nick_for_host(host: str) -> str: else: consonant_run += 1 word.append(ch) + return "".join(word) - base = "".join(word) + +def _rng_for_key(key: str) -> random.Random: + """Create a seeded RNG from an arbitrary string key.""" + seed = int(hashlib.sha256(key.encode()).hexdigest(), 16) + return random.Random(seed) + + +def _nick_for_host(host: str) -> str: + """Generate a deterministic pronounceable nick from a hostname. + + The same hostname always produces the same nick, stable across reconnects. + """ + rng = _rng_for_key(host) + base = _seeded_markov(rng, 5, 8) if rng.random() < 0.3: base += str(rng.randint(0, 99)) return base +def _password_for_host(host: str) -> str: + """Derive a deterministic NickServ password from a hostname. + + Uses a different hash domain than the nick so they're independent. + """ + raw = hashlib.sha256(f"nickserv:{host}".encode()).hexdigest() + return raw[:16] + + +def _email_for_host( + host: str, excluded: set[str] | None = None, +) -> str | None: + """Generate a random-looking email from a hostname. + + Deterministic local part, random domain from available (non-excluded) list. + Returns None if all domains are excluded. + """ + available = [d for d in _email_domains() if d not in (excluded or set())] + if not available: + return None + rng = _rng_for_key(f"email:{host}") + local = _seeded_markov(rng, 5, 9) + if rng.random() < 0.5: + local += str(rng.randint(10, 99)) + domain = random.choice(available) + return f"{local}{random.randint(1, 999)}@{domain}" + + class Network: """Manages a persistent connection to a single IRC server.""" @@ -143,11 +193,17 @@ class Network: self, cfg: NetworkConfig, proxy_cfg: ProxyConfig, + backlog: Backlog | None = None, on_message: Callable[[str, IRCMessage], None] | None = None, + on_status: Callable[[str, str], None] | None = None, + data_dir: Path | None = None, ) -> None: self.cfg = cfg self.proxy_cfg = proxy_cfg + self.backlog = backlog self.on_message = on_message + self.on_status = on_status # (network_name, status_text) + self.data_dir = data_dir self.nick: str = cfg.nick or "*" self.channels: set[str] = set() self.state: State = State.DISCONNECTED @@ -167,6 +223,23 @@ class Network: # Channel state: topic + names per channel self.topics: dict[str, str] = {} self.names: dict[str, set[str]] = {} + # NickServ registration state + self._nickserv_pending: str = "" # "identify" or "register" + self._nickserv_password: str = "" + self._nickserv_email: str = "" + self._nickserv_done: asyncio.Event = asyncio.Event() + self._verify_task: asyncio.Task[None] | None = None + self._rejected_email_domains: set[str] = set() + # SASL authentication state + self._sasl_nick: str = "" + self._sasl_pass: str = "" + self._sasl_mechanism: str = "" # "EXTERNAL" or "PLAIN" + self._sasl_complete: asyncio.Event = asyncio.Event() + + def _status(self, text: str) -> None: + """Emit a status message to attached clients.""" + if self.on_status: + self.on_status(self.cfg.name, text) @property def connected(self) -> bool: @@ -188,7 +261,7 @@ class Network: async def stop(self) -> None: """Disconnect and stop reconnection.""" self._running = False - for task in (self._read_task, self._reconnect_task, self._probation_task): + for task in (self._read_task, self._reconnect_task, self._probation_task, self._verify_task): if task and not task.done(): task.cancel() await self._disconnect() @@ -204,29 +277,72 @@ class Network: await self.send(IRCMessage(command=command, params=list(params))) async def _connect(self) -> None: - """Establish connection via SOCKS5 proxy and register with random nick.""" + """Establish connection via SOCKS5 proxy and register. + + Authentication cascade: + 1. Stored creds + client cert -> SASL EXTERNAL (CertFP) + 2. Stored creds, no cert -> SASL PLAIN + 3. No creds -> random nick, no SASL + """ + from bouncer.cert import cert_path, has_cert from bouncer.proxy import connect self.state = State.CONNECTING - self._connect_nick = _random_nick() self.visible_host = None + self._sasl_nick = "" + self._sasl_pass = "" + self._sasl_mechanism = "" + self._sasl_complete = asyncio.Event() + + # Check for stored creds to decide SASL strategy + use_sasl = False + client_cert = None + if self.backlog: + creds = await self.backlog.get_nickserv_creds_by_network(self.cfg.name) + if creds: + self._sasl_nick, self._sasl_pass = creds + self._connect_nick = self._sasl_nick + use_sasl = True + + # Prefer EXTERNAL if a cert exists for this nick + if self.data_dir and has_cert(self.data_dir, self.cfg.name, self._sasl_nick): + self._sasl_mechanism = "EXTERNAL" + client_cert = cert_path(self.data_dir, self.cfg.name, self._sasl_nick) + log.info("[%s] stored creds + cert for %s, will use SASL EXTERNAL", + self.cfg.name, self._sasl_nick) + else: + self._sasl_mechanism = "PLAIN" + log.info("[%s] stored creds for %s, will use SASL PLAIN", + self.cfg.name, self._sasl_nick) + + if not use_sasl: + self._connect_nick = _random_nick() try: log.info( - "[%s] connecting to %s:%d (tls=%s)", + "[%s] connecting to %s:%d (tls=%s, sasl=%s)", self.cfg.name, self.cfg.host, self.cfg.port, self.cfg.tls, + self._sasl_mechanism or "none", ) self._reader, self._writer = await connect( self.cfg.host, self.cfg.port, self.proxy_cfg, tls=self.cfg.tls, + client_cert=client_cert, ) self.state = State.REGISTERING - self._reconnect_attempt = 0 - log.info("[%s] connected, registering as %s", self.cfg.name, self._connect_nick) - # IRC registration with generic identity + if use_sasl: + self._status( + f"connected, authenticating as {self._connect_nick}" + f" (SASL {self._sasl_mechanism})" + ) + await self.send_raw("CAP", "REQ", "sasl") + else: + self._status(f"connected, registering as {self._connect_nick}") + + # IRC registration if self.cfg.password: await self.send_raw("PASS", self.cfg.password) await self.send_raw("NICK", self._connect_nick) @@ -328,35 +444,319 @@ class Network: if self.state != State.PROBATION: return + self._status("probation passed, connection stable") log.info("[%s] probation passed, connection stable", self.cfg.name) + self._reconnect_attempt = 0 await self._go_ready() async def _go_ready(self) -> None: - """Transition to ready: switch to host-derived nick, then join channels.""" + """Transition to ready: skip NickServ if SASL succeeded, otherwise register. + + Also checks for pending (unverified) registrations from a previous + session and resumes email verification if found. + """ self.state = State.READY - - # Derive a stable nick from the exit endpoint - if self.visible_host: - desired = _nick_for_host(self.visible_host) - elif self.cfg.nick: - desired = self.cfg.nick - else: - desired = _random_nick() - log.info("[%s] switching nick: %s -> %s (host=%s)", self.cfg.name, self.nick, desired, + log.info("[%s] ready as %s (host=%s)", self.cfg.name, self.nick, self.visible_host or "unknown") - self._nick_confirmed.clear() - await self.send_raw("NICK", desired) - # Wait for server to confirm the nick change before joining + # SASL already authenticated -- skip NickServ entirely + if self._sasl_complete.is_set(): + self._status(f"ready as {self.nick} (SASL)") + # Still check for pending registrations to resume verification + await self._resume_pending_verification() + await self._nickserv_complete() + return + + # Check for a pending registration from a previous session + if await self._resume_pending_verification(): + # Pending verification resumed -- skip normal NickServ flow + await self._nickserv_complete() + return + + # Try NickServ: IDENTIFY first (previous session), else REGISTER + self._nickserv_done = asyncio.Event() + await self._nickserv_identify() + + # If NickServ doesn't respond within 15s, move on try: - await asyncio.wait_for(self._nick_confirmed.wait(), timeout=10) + await asyncio.wait_for(self._nickserv_done.wait(), timeout=15) except asyncio.TimeoutError: - log.warning("[%s] nick change not confirmed in 10s, joining anyway", self.cfg.name) + log.warning("[%s] NickServ did not respond in 15s", self.cfg.name) + self._nickserv_pending = "" + await self._nickserv_complete() - # Join configured channels - if self.cfg.autojoin and self.cfg.channels: - for ch in self.cfg.channels: - await self.send_raw("JOIN", ch) + async def _nickserv_identify(self) -> None: + """Attempt to IDENTIFY with NickServ using stored credentials. + + If no stored creds exist for this network, skip straight to REGISTER. + """ + host = self.visible_host or "" + + # Look up stored credentials by network + host + if self.backlog and host: + creds = await self.backlog.get_nickserv_creds_by_host( + self.cfg.name, host, + ) + if creds: + stored_nick, stored_pass = creds + log.info("[%s] found stored creds for nick %s, switching", self.cfg.name, stored_nick) + # Switch to the registered nick first + self._nick_confirmed.clear() + await self.send_raw("NICK", stored_nick) + try: + await asyncio.wait_for(self._nick_confirmed.wait(), timeout=10) + except asyncio.TimeoutError: + log.warning("[%s] nick change to %s not confirmed", self.cfg.name, stored_nick) + + self._nickserv_password = stored_pass + self._nickserv_pending = "identify" + log.info("[%s] attempting NickServ IDENTIFY as %s", self.cfg.name, self.nick) + await self.send_raw("PRIVMSG", "NickServ", f"IDENTIFY {stored_pass}") + return + + # No stored creds — register the current random nick + await self._nickserv_register() + + async def _nickserv_register(self) -> None: + """Attempt to REGISTER with NickServ using a generated email. + + Picks a domain not yet rejected by this server. If all static domains + are exhausted, fetches additional domains from mail.tm/gw before giving up. + """ + host = self.visible_host or self.nick + password = _password_for_host(host) + email = _email_for_host(host, excluded=self._rejected_email_domains) + + if not email: + # All known domains rejected -- try fetching more from mail.tm/gw + self._status("all email domains rejected, fetching more...") + log.info("[%s] all domains rejected, fetching mail.tm/gw domains", self.cfg.name) + new_domains = await fetch_extra_domains( + self.proxy_cfg.host, self.proxy_cfg.port, + ) + if new_domains: + email = _email_for_host(host, excluded=self._rejected_email_domains) + + if not email: + self._status("all email domains exhausted") + log.warning("[%s] no email domains left to try", self.cfg.name) + self._nickserv_pending = "" + await self._nickserv_complete() + return + + self._nickserv_password = password + self._nickserv_email = email + self._nickserv_pending = "register" + log.info("[%s] attempting NickServ REGISTER (email=%s)", self.cfg.name, email) + await self.send_raw("PRIVMSG", "NickServ", f"REGISTER {password} {email}") + + async def _nickserv_complete(self) -> None: + """Signal that NickServ interaction is finished.""" + self._nickserv_done.set() + + async def _verify_email_code(self) -> None: + """Poll temp email for NickServ verification code and confirm.""" + if not self._nickserv_email: + return + + self._status(f"checking email {self._nickserv_email} for verification code...") + nick = self.nick + result = await verify_email( + self._nickserv_email, + proxy_host=self.proxy_cfg.host, + proxy_port=self.proxy_cfg.port, + ) + if not result: + self._status("no verification code found in email") + return + + if self.state != State.READY or not self._running: + return + + self._status(f"verifying {nick} with code {result.code}") + if result.style == "atheme": + cmd = f"VERIFY REGISTER {nick} {result.code}" + else: + cmd = f"CONFIRM {result.code}" + log.info("[%s] sending NickServ %s", self.cfg.name, cmd) + await self.send_raw("PRIVMSG", "NickServ", cmd) + + async def _handle_nickserv(self, text: str) -> None: + """Process NickServ NOTICE responses. + + Handles both immediate responses (while _nickserv_pending is set) and + late-arriving responses (after the 15s timeout cleared pending state). + """ + lower = text.lower() + log.info("[%s] NickServ: %s", self.cfg.name, text) + + if self._nickserv_pending == "identify": + if "you are now identified" in lower: + self._status(f"identified as {self.nick}") + log.info("[%s] NickServ IDENTIFY succeeded", self.cfg.name) + if self.backlog and self._nickserv_password: + await self.backlog.save_nickserv_creds( + self.cfg.name, self.nick, + self._nickserv_password, "", + self.visible_host or "", + ) + self._nickserv_pending = "" + await self._nickserv_complete() + elif "is not a registered nickname" in lower or "not registered" in lower: + self._status(f"{self.nick} not registered, attempting REGISTER") + log.info("[%s] nick not registered, attempting REGISTER", self.cfg.name) + self._nickserv_pending = "" + await self._nickserv_register() + elif "invalid password" in lower or "password incorrect" in lower: + self._status(f"IDENTIFY failed for {self.nick} (wrong password)") + log.warning("[%s] NickServ IDENTIFY failed (wrong password)", self.cfg.name) + self._nickserv_pending = "" + await self._nickserv_complete() + + elif self._nickserv_pending == "register": + if self._registration_immediate(lower): + # Some servers register immediately without email + await self._on_verify_success() + elif self._registration_confirmed(lower): + await self._on_register_success() + elif "is already registered" in lower: + self._status(f"{self.nick} already registered by someone else") + log.warning("[%s] nick already registered by someone else", self.cfg.name) + self._nickserv_pending = "" + await self._nickserv_complete() + elif "do not accept" in lower or "not allowed" in lower: + # Blacklist this domain and try the next one + rejected = "" + if self._nickserv_email and "@" in self._nickserv_email: + rejected = self._nickserv_email.rsplit("@", 1)[1] + self._rejected_email_domains.add(rejected) + remaining = len(_email_domains()) - len(self._rejected_email_domains) + self._status(f"email domain {rejected} rejected, {remaining} left") + log.warning("[%s] NickServ rejected %s, trying next domain (%d left)", + self.cfg.name, rejected, remaining) + self._nickserv_pending = "" + await self._nickserv_register() + elif "too soon" in lower or "wait" in lower or "too many" in lower: + self._status(f"REGISTER rejected (too soon/rate limited)") + log.warning("[%s] NickServ rate limited: %s", self.cfg.name, text) + self._nickserv_pending = "" + await self._nickserv_complete() + + elif self._nickserv_pending == "verify": + # Waiting for VERIFY/CONFIRM response + if self._verification_succeeded(lower): + await self._on_verify_success() + elif "invalid" in lower or "unknown" in lower: + self._status(f"verification failed: {text}") + log.warning("[%s] verification failed: %s", self.cfg.name, text) + self._nickserv_pending = "" + + else: + # Late-arriving messages (after 15s timeout cleared pending state) + if self._registration_confirmed(lower) and self._nickserv_password: + log.info("[%s] late NickServ registration confirmation", self.cfg.name) + await self._on_register_success() + elif self._verification_succeeded(lower) and self._nickserv_password: + log.info("[%s] late NickServ verification confirmation", self.cfg.name) + await self._on_verify_success() + + def _registration_confirmed(self, lower: str) -> bool: + """Check if a NickServ message indicates registration accepted.""" + return any(kw in lower for kw in ( + "has been sent to", "passcode has been sent", + "activation instructions", "already been requested", + )) + + def _registration_immediate(self, lower: str) -> bool: + """Check if registration completed without email verification.""" + return "nickname registered" in lower and "email" not in lower + + def _verification_succeeded(self, lower: str) -> bool: + """Check if email verification / nick activation succeeded.""" + return any(kw in lower for kw in ( + "has been verified", "has now been verified", + "has been activated", "has now been activated", + "now identified", "registration complete", + "nickname confirmed", "account confirmed", + "you are now identified", + )) + + async def _on_register_success(self) -> None: + """Handle NickServ REGISTER accepted (email verification pending). + + Saves credentials as 'pending' so verification can resume across + reconnects. SASL only uses 'verified' creds, so this is safe. + """ + self._status(f"registered {self.nick} (verification email sent)") + log.info("[%s] NickServ REGISTER accepted, awaiting verification", self.cfg.name) + # Persist pending state for cross-session resume + if self.backlog and self._nickserv_password and self._nickserv_email: + await self.backlog.save_nickserv_creds( + self.cfg.name, self.nick, + self._nickserv_password, self._nickserv_email, + self.visible_host or "", + status="pending", + ) + self._nickserv_pending = "verify" + # Start email verification in the background (if not already running) + if not self._verify_task or self._verify_task.done(): + self._verify_task = asyncio.create_task( + self._verify_email_code() + ) + await self._nickserv_complete() + + async def _on_verify_success(self) -> None: + """Handle verified registration -- promote to verified for SASL.""" + self._status(f"verified {self.nick} -- SASL ready") + log.info("[%s] nick %s fully verified, saving credentials", self.cfg.name, self.nick) + if self.backlog and self._nickserv_password: + await self.backlog.mark_nickserv_verified(self.cfg.name, self.nick) + self._nickserv_pending = "" + + async def _resume_pending_verification(self) -> bool: + """Check for a pending registration from a previous session and resume. + + If the pending nick matches the current nick (or we can switch to it), + resumes email verification in the background. + + Returns True if a pending verification was resumed. + """ + if not self.backlog: + return False + + pending = await self.backlog.get_pending_registration(self.cfg.name) + if not pending: + return False + + p_nick, p_pass, p_email, p_host = pending + log.info("[%s] found pending registration: nick=%s email=%s", + self.cfg.name, p_nick, p_email) + + # If we're already SASL'd as a different nick, we can't verify + # for the pending nick on this connection -- just resume email check + # The verification code doesn't require being connected as that nick + + self._nickserv_password = p_pass + self._nickserv_email = p_email + self._nickserv_pending = "verify" + self._status(f"resuming verification for {p_nick} ({p_email})") + + # Switch to the pending nick if possible (needed for VERIFY command) + if self.nick != p_nick and not self._sasl_complete.is_set(): + self._nick_confirmed.clear() + await self.send_raw("NICK", p_nick) + try: + await asyncio.wait_for(self._nick_confirmed.wait(), timeout=10) + except asyncio.TimeoutError: + log.warning("[%s] could not switch to pending nick %s", + self.cfg.name, p_nick) + + # Resume email verification in the background + if not self._verify_task or self._verify_task.done(): + self._verify_task = asyncio.create_task( + self._verify_email_code() + ) + return True async def _handle(self, msg: IRCMessage) -> None: """Handle an IRC message from the server.""" @@ -366,8 +766,66 @@ class Network: if msg.command == "ERROR": reason = msg.params[0] if msg.params else "unknown" + self._status(f"ERROR: {reason}") log.warning("[%s] server ERROR: %s", self.cfg.name, reason) - # Connection will be closed by server; read_loop handles reconnect + return + + # --- SASL capability negotiation --- + if msg.command == "CAP" and len(msg.params) >= 3: + subcommand = msg.params[1].upper() + caps = msg.params[2].strip().lower() + if subcommand == "ACK" and "sasl" in caps: + log.info("[%s] SASL capability acknowledged, using %s", + self.cfg.name, self._sasl_mechanism) + await self.send_raw("AUTHENTICATE", self._sasl_mechanism or "PLAIN") + elif subcommand == "NAK" and "sasl" in caps: + log.warning("[%s] SASL not supported by server", self.cfg.name) + self._status("SASL not supported, falling back") + self._sasl_nick = "" + self._sasl_pass = "" + self._sasl_mechanism = "" + await self.send_raw("CAP", "END") + return + + if msg.command == "AUTHENTICATE" and msg.params and msg.params[0] == "+": + if self._sasl_mechanism == "EXTERNAL": + # EXTERNAL: send account name (nick) base64-encoded + encoded = base64.b64encode(self._sasl_nick.encode()).decode() + await self.send_raw("AUTHENTICATE", encoded) + log.debug("[%s] sent SASL EXTERNAL identity", self.cfg.name) + elif self._sasl_nick and self._sasl_pass: + # PLAIN: nick\0nick\0pass + cred = f"{self._sasl_nick}\0{self._sasl_nick}\0{self._sasl_pass}" + encoded = base64.b64encode(cred.encode()).decode() + await self.send_raw("AUTHENTICATE", encoded) + log.debug("[%s] sent SASL PLAIN credentials", self.cfg.name) + else: + await self.send_raw("AUTHENTICATE", "*") # abort + return + + if msg.command == "903": + # RPL_SASLSUCCESS + log.info("[%s] SASL %s authentication successful", self.cfg.name, self._sasl_mechanism) + self._status(f"SASL {self._sasl_mechanism} authenticated as {self._sasl_nick}") + self._sasl_complete.set() + await self.send_raw("CAP", "END") + return + + if msg.command in ("902", "904", "905"): + # ERR_NICKLOCKED / ERR_SASLFAIL / ERR_SASLTOOLONG + reason = msg.params[-1] if msg.params else msg.command + log.warning("[%s] SASL %s failed (%s): %s", + self.cfg.name, self._sasl_mechanism, msg.command, reason) + self._status(f"SASL {self._sasl_mechanism} failed, falling back") + self._sasl_nick = "" + self._sasl_pass = "" + self._sasl_mechanism = "" + await self.send_raw("CAP", "END") + return + + if msg.command in ("906", "908"): + # ERR_SASLABORTED / RPL_SASLMECHS + await self.send_raw("CAP", "END") return if msg.command == "001": @@ -391,9 +849,15 @@ class Network: log.info("[%s] visible host (396): %s", self.cfg.name, self.visible_host) elif msg.command == "NOTICE" and msg.params: - # Extract hostname from server notices during connect text = msg.params[-1] if msg.params else "" - if "Found your hostname" in text: + sender = msg.prefix.split("!")[0].lower() if msg.prefix else "" + + # NickServ response handling (always route, even after timeout) + if sender == "nickserv": + await self._handle_nickserv(text) + + # Extract hostname from server notices during connect + elif "Found your hostname" in text: # "*** Found your hostname: some.host.example.com" parts = text.rsplit(": ", 1) if len(parts) == 2: diff --git a/src/bouncer/proxy.py b/src/bouncer/proxy.py index 33669fd..8541125 100644 --- a/src/bouncer/proxy.py +++ b/src/bouncer/proxy.py @@ -6,6 +6,7 @@ import asyncio import logging import socket import ssl +from pathlib import Path from python_socks.async_.asyncio import Proxy @@ -39,6 +40,7 @@ async def connect( port: int, proxy_cfg: ProxyConfig, tls: bool = False, + client_cert: Path | None = None, ) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]: """Open a TCP connection through the SOCKS5 proxy. @@ -47,6 +49,7 @@ async def connect( Returns an (asyncio.StreamReader, asyncio.StreamWriter) pair. If tls=True, the connection is wrapped in SSL after the SOCKS5 handshake. + If client_cert is given (Path to a combined PEM), it is loaded for CertFP. """ addrs = await _resolve_all(host, port) last_err: Exception | None = None @@ -67,6 +70,9 @@ async def connect( ssl_ctx: ssl.SSLContext | None = None if tls: ssl_ctx = ssl.create_default_context() + if client_cert: + ssl_ctx.load_cert_chain(certfile=str(client_cert)) + log.debug("loaded client cert %s", client_cert) reader, writer = await asyncio.open_connection( host=None, diff --git a/src/bouncer/router.py b/src/bouncer/router.py index d6a75aa..f7065fc 100644 --- a/src/bouncer/router.py +++ b/src/bouncer/router.py @@ -4,6 +4,7 @@ from __future__ import annotations import asyncio import logging +from pathlib import Path from typing import TYPE_CHECKING from bouncer.backlog import Backlog @@ -82,9 +83,10 @@ def _suppress(msg: IRCMessage) -> bool: class Router: """Central message hub linking clients to networks.""" - def __init__(self, config: Config, backlog: Backlog) -> None: + def __init__(self, config: Config, backlog: Backlog, data_dir: Path | None = None) -> None: self.config = config self.backlog = backlog + self.data_dir = data_dir self.networks: dict[str, Network] = {} self.clients: list[Client] = [] self._lock = asyncio.Lock() @@ -98,6 +100,7 @@ class Router: backlog=self.backlog, on_message=self._on_network_message, on_status=self._on_network_status, + data_dir=self.data_dir, ) self.networks[name] = network asyncio.create_task(network.start()) @@ -281,6 +284,7 @@ class Router: backlog=self.backlog, on_message=self._on_network_message, on_status=self._on_network_status, + data_dir=self.data_dir, ) self.networks[cfg.name] = network asyncio.create_task(network.start()) diff --git a/tests/test_cert.py b/tests/test_cert.py new file mode 100644 index 0000000..4d7552b --- /dev/null +++ b/tests/test_cert.py @@ -0,0 +1,135 @@ +"""Tests for client certificate management.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from bouncer.cert import ( + cert_path, + delete_cert, + fingerprint, + generate_cert, + has_cert, + list_certs, +) + + +@pytest.fixture +def data_dir(tmp_path: Path) -> Path: + """Provide a temporary data directory.""" + return tmp_path + + +class TestCertPath: + def test_standard_path(self, data_dir: Path) -> None: + p = cert_path(data_dir, "libera", "fabesune") + assert p == data_dir / "certs" / "libera" / "fabesune.pem" + + +class TestGenerateCert: + def test_creates_pem_file(self, data_dir: Path) -> None: + pem = generate_cert(data_dir, "libera", "testnick") + assert pem.is_file() + assert pem == cert_path(data_dir, "libera", "testnick") + + def test_pem_contains_cert_and_key(self, data_dir: Path) -> None: + pem = generate_cert(data_dir, "libera", "testnick") + content = pem.read_text() + assert "BEGIN CERTIFICATE" in content + assert "BEGIN PRIVATE KEY" in content + + def test_file_permissions(self, data_dir: Path) -> None: + pem = generate_cert(data_dir, "libera", "testnick") + mode = pem.stat().st_mode & 0o777 + assert mode == 0o600 + + def test_overwrites_existing(self, data_dir: Path) -> None: + pem1 = generate_cert(data_dir, "libera", "testnick") + fp1 = fingerprint(pem1) + pem2 = generate_cert(data_dir, "libera", "testnick") + fp2 = fingerprint(pem2) + assert pem1 == pem2 + assert fp1 != fp2 # New cert = new fingerprint + + +class TestFingerprint: + def test_format(self, data_dir: Path) -> None: + pem = generate_cert(data_dir, "libera", "testnick") + fp = fingerprint(pem) + parts = fp.split(":") + assert len(parts) == 32 # SHA-256 = 32 bytes + for part in parts: + assert len(part) == 2 + int(part, 16) # Must be valid hex + + def test_uppercase_hex(self, data_dir: Path) -> None: + pem = generate_cert(data_dir, "libera", "testnick") + fp = fingerprint(pem) + assert fp == fp.upper() + + def test_deterministic_for_same_cert(self, data_dir: Path) -> None: + pem = generate_cert(data_dir, "libera", "testnick") + assert fingerprint(pem) == fingerprint(pem) + + +class TestHasCert: + def test_exists(self, data_dir: Path) -> None: + generate_cert(data_dir, "libera", "testnick") + assert has_cert(data_dir, "libera", "testnick") is True + + def test_not_exists(self, data_dir: Path) -> None: + assert has_cert(data_dir, "libera", "testnick") is False + + +class TestDeleteCert: + def test_delete_existing(self, data_dir: Path) -> None: + generate_cert(data_dir, "libera", "testnick") + assert delete_cert(data_dir, "libera", "testnick") is True + assert has_cert(data_dir, "libera", "testnick") is False + + def test_delete_nonexistent(self, data_dir: Path) -> None: + assert delete_cert(data_dir, "libera", "testnick") is False + + def test_cleans_empty_dir(self, data_dir: Path) -> None: + generate_cert(data_dir, "libera", "testnick") + delete_cert(data_dir, "libera", "testnick") + assert not (data_dir / "certs" / "libera").exists() + + +class TestListCerts: + def test_empty(self, data_dir: Path) -> None: + assert list_certs(data_dir) == [] + + def test_list_all(self, data_dir: Path) -> None: + generate_cert(data_dir, "libera", "nick1") + generate_cert(data_dir, "oftc", "nick2") + certs = list_certs(data_dir) + assert len(certs) == 2 + networks = {c[0] for c in certs} + assert networks == {"libera", "oftc"} + + def test_list_by_network(self, data_dir: Path) -> None: + generate_cert(data_dir, "libera", "nick1") + generate_cert(data_dir, "oftc", "nick2") + certs = list_certs(data_dir, network="libera") + assert len(certs) == 1 + assert certs[0][0] == "libera" + assert certs[0][1] == "nick1" + + def test_list_multiple_per_network(self, data_dir: Path) -> None: + generate_cert(data_dir, "libera", "nick1") + generate_cert(data_dir, "libera", "nick2") + certs = list_certs(data_dir, network="libera") + assert len(certs) == 2 + nicks = {c[1] for c in certs} + assert nicks == {"nick1", "nick2"} + + def test_fingerprints_present(self, data_dir: Path) -> None: + generate_cert(data_dir, "libera", "testnick") + certs = list_certs(data_dir) + assert len(certs) == 1 + _, _, fp = certs[0] + assert ":" in fp + assert len(fp.split(":")) == 32 diff --git a/tests/test_commands.py b/tests/test_commands.py index 9c4f876..5b10a90 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -774,6 +774,185 @@ class TestDropCreds: assert "Unknown network" in lines[0] +class TestGencert: + @pytest.mark.asyncio + async def test_gencert_no_data_dir(self) -> None: + commands.DATA_DIR = None + router = _make_router() + client = _make_client() + lines = await commands.dispatch("GENCERT libera", router, client) + assert "not available" in lines[0] + + @pytest.mark.asyncio + async def test_gencert_missing_arg(self) -> None: + commands.DATA_DIR = Path("/tmp") + router = _make_router() + client = _make_client() + lines = await commands.dispatch("GENCERT", router, client) + assert "Usage" in lines[0] + + @pytest.mark.asyncio + async def test_gencert_unknown_network(self) -> None: + commands.DATA_DIR = Path("/tmp") + router = _make_router() + client = _make_client() + lines = await commands.dispatch("GENCERT fakenet", router, client) + assert "Unknown network" in lines[0] + + @pytest.mark.asyncio + async def test_gencert_with_nick(self, tmp_path: Path) -> None: + commands.DATA_DIR = tmp_path + net = _make_network("libera", State.READY, nick="fabesune") + router = _make_router(net) + client = _make_client() + lines = await commands.dispatch("GENCERT libera testnick", router, client) + assert "[GENCERT]" in lines[0] + assert "testnick" in lines[0] + assert any("fingerprint" in line for line in lines) + # Should auto-send CERT ADD since network is ready + net.send_raw.assert_awaited() + calls = net.send_raw.await_args_list + assert any( + c.args[0] == "PRIVMSG" and c.args[1] == "NickServ" + and "CERT ADD" in c.args[2] + for c in calls + ) + + @pytest.mark.asyncio + async def test_gencert_uses_current_nick(self, tmp_path: Path) -> None: + commands.DATA_DIR = tmp_path + net = _make_network("libera", State.READY, nick="fabesune") + router = _make_router(net) + client = _make_client() + lines = await commands.dispatch("GENCERT libera", router, client) + assert "[GENCERT]" in lines[0] + assert "fabesune" in lines[0] + + @pytest.mark.asyncio + async def test_gencert_not_ready(self, tmp_path: Path) -> None: + commands.DATA_DIR = tmp_path + net = _make_network("libera", State.CONNECTING, nick="fabesune") + router = _make_router(net) + client = _make_client() + lines = await commands.dispatch("GENCERT libera", router, client) + assert "[GENCERT]" in lines[0] + assert any("not ready" in line or "manually" in line for line in lines) + + @pytest.mark.asyncio + async def test_gencert_no_nick(self, tmp_path: Path) -> None: + commands.DATA_DIR = tmp_path + net = _make_network("libera", State.READY, nick="*") + router = _make_router(net) + router.backlog.get_nickserv_creds_by_network.return_value = None + client = _make_client() + lines = await commands.dispatch("GENCERT libera", router, client) + assert "No nick available" in lines[0] + + +class TestCertfp: + def test_certfp_no_data_dir(self) -> None: + commands.DATA_DIR = None + router = _make_router() + lines = _cmd_certfp_sync(router, None) + assert "not available" in lines[0] + + def test_certfp_empty(self, tmp_path: Path) -> None: + commands.DATA_DIR = tmp_path + router = _make_router() + lines = _cmd_certfp_sync(router, None) + assert "no certificates" in lines[0] + + def test_certfp_lists_certs(self, tmp_path: Path) -> None: + from bouncer.cert import generate_cert + commands.DATA_DIR = tmp_path + generate_cert(tmp_path, "libera", "fabesune") + net = _make_network("libera", State.READY) + router = _make_router(net) + lines = _cmd_certfp_sync(router, None) + assert lines[0] == "[CERTFP]" + assert any("libera" in line and "fabesune" in line for line in lines) + + def test_certfp_filter_network(self, tmp_path: Path) -> None: + from bouncer.cert import generate_cert + commands.DATA_DIR = tmp_path + generate_cert(tmp_path, "libera", "nick1") + generate_cert(tmp_path, "oftc", "nick2") + libera = _make_network("libera", State.READY) + oftc = _make_network("oftc", State.READY) + router = _make_router(libera, oftc) + lines = _cmd_certfp_sync(router, "libera") + assert lines[0] == "[CERTFP]" + assert any("nick1" in line for line in lines) + assert not any("nick2" in line for line in lines) + + def test_certfp_unknown_network(self, tmp_path: Path) -> None: + commands.DATA_DIR = tmp_path + router = _make_router() + lines = _cmd_certfp_sync(router, "fakenet") + assert "Unknown network" in lines[0] + + +class TestDelcert: + def test_delcert_no_data_dir(self) -> None: + commands.DATA_DIR = None + router = _make_router() + lines = _cmd_delcert_sync(router, "libera") + assert "not available" in lines[0] + + def test_delcert_missing_arg(self) -> None: + commands.DATA_DIR = Path("/tmp") + router = _make_router() + lines = _cmd_delcert_sync(router, "") + assert "Usage" in lines[0] + + def test_delcert_unknown_network(self) -> None: + commands.DATA_DIR = Path("/tmp") + router = _make_router() + lines = _cmd_delcert_sync(router, "fakenet") + assert "Unknown network" in lines[0] + + def test_delcert_removes_cert(self, tmp_path: Path) -> None: + from bouncer.cert import generate_cert, has_cert + commands.DATA_DIR = tmp_path + generate_cert(tmp_path, "libera", "testnick") + assert has_cert(tmp_path, "libera", "testnick") + net = _make_network("libera", State.READY, nick="testnick") + router = _make_router(net) + lines = _cmd_delcert_sync(router, "libera testnick") + assert "[DELCERT]" in lines[0] + assert "deleted" in lines[0] + assert not has_cert(tmp_path, "libera", "testnick") + + def test_delcert_nonexistent(self, tmp_path: Path) -> None: + commands.DATA_DIR = tmp_path + net = _make_network("libera", State.READY, nick="testnick") + router = _make_router(net) + lines = _cmd_delcert_sync(router, "libera testnick") + assert "no cert found" in lines[0] + + def test_delcert_uses_current_nick(self, tmp_path: Path) -> None: + from bouncer.cert import generate_cert, has_cert + commands.DATA_DIR = tmp_path + generate_cert(tmp_path, "libera", "fabesune") + net = _make_network("libera", State.READY, nick="fabesune") + router = _make_router(net) + lines = _cmd_delcert_sync(router, "libera") + assert "deleted" in lines[0] + assert not has_cert(tmp_path, "libera", "fabesune") + + +def _cmd_certfp_sync(router: MagicMock, network_name: str | None) -> list[str]: + """Call _cmd_certfp synchronously (it's not async).""" + from bouncer.commands import _cmd_certfp + return _cmd_certfp(router, network_name) + + +def _cmd_delcert_sync(router: MagicMock, arg: str) -> list[str]: + """Call _cmd_delcert synchronously (it's not async).""" + from bouncer.commands import _cmd_delcert + return _cmd_delcert(router, arg) + + class TestUnknownCommand: @pytest.mark.asyncio async def test_unknown_command(self) -> None: