feat: make all operational constants configurable via bouncer.toml

Replace hardcoded values across network, captcha, email, and cert
modules with BouncerConfig fields. All values have safe defaults
and are overridable in the [bouncer] section of the config file.

Configurable: probation_seconds, backoff_steps, nick_timeout,
rejoin_delay, http_timeout, captcha_poll_interval/timeout,
email_poll_interval/max_polls/request_timeout, cert_validity_days.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
user
2026-02-21 16:33:08 +01:00
parent ed576b002d
commit d13d090e8e
14 changed files with 506 additions and 97 deletions

View File

@@ -115,14 +115,14 @@ own-nick # own nicks shown without suffix
## Connection States
```
DISCONNECTED -> CONNECTING -> REGISTERING -> PROBATION (15s) -> READY
DISCONNECTED -> CONNECTING -> REGISTERING -> PROBATION (45s) -> READY
```
| State | What happens |
|-------|-------------|
| CONNECTING | TCP + SOCKS5 + TLS handshake |
| REGISTERING | Random nick/user/realname sent to server |
| PROBATION | 15s wait, watching for K-line |
| PROBATION | 45s wait (configurable), watching for K-line |
| READY | Switch to configured nick, join channels |
## Auth Cascade
@@ -142,6 +142,12 @@ SASL EXTERNAL (cert + creds) > SASL PLAIN (creds) > NickServ IDENTIFY
```toml
[bouncer]
bind / port / password
captcha_api_key # NoCaptchaAI key (optional)
captcha_poll_interval / captcha_poll_timeout
probation_seconds / nick_timeout / rejoin_delay
backoff_steps / http_timeout
email_poll_interval / email_max_polls / email_request_timeout
cert_validity_days
[bouncer.backlog]
max_messages / replay_on_connect
@@ -189,6 +195,7 @@ src/bouncer/
network.py # server connection + state machine + SASL
client.py # client session handler
cert.py # client certificate generation + management
captcha.py # hCaptcha solver via NoCaptchaAI
commands.py # 25 bouncer control commands (/msg *bouncer)
router.py # message routing + backlog trigger
server.py # TCP listener

View File

@@ -35,8 +35,8 @@ No fixed prefix or pattern -- each attempt looks like a different person.
### 2. Probation (15 seconds)
After registration succeeds (001 RPL_WELCOME), the bouncer enters a 15-second
probation window. During this time it watches for:
After registration succeeds (001 RPL_WELCOME), the bouncer enters a probation
window (default 45s, configurable via `probation_seconds`). During this time it watches for:
- `ERROR` messages (K-line, ban)
- Server closing the connection
@@ -54,10 +54,11 @@ Once probation passes without incident:
### 4. Reconnection
On any disconnection, the bouncer reconnects with exponential backoff:
On any disconnection, the bouncer reconnects with exponential backoff
(configurable via `backoff_steps`):
| Attempt | Delay |
|---------|-------|
| Attempt | Default Delay |
|---------|---------------|
| 1 | 5s |
| 2 | 10s |
| 3 | 30s |
@@ -178,6 +179,26 @@ bind = "127.0.0.1" # listen address
port = 6667 # listen port
password = "changeme" # client authentication password
# Captcha solving (NoCaptchaAI)
captcha_api_key = "" # API key (optional, for auto-verification)
captcha_poll_interval = 3 # seconds between solve polls
captcha_poll_timeout = 120 # max seconds to wait for solve
# Connection tuning
probation_seconds = 45 # post-connect watch period for k-lines
backoff_steps = [5, 10, 30, 60, 120, 300] # reconnect delays
nick_timeout = 10 # seconds to wait for nick change
rejoin_delay = 3 # seconds before rejoin after kick
http_timeout = 15 # per-request HTTP timeout
# Email verification
email_poll_interval = 15 # seconds between inbox checks
email_max_polls = 30 # max inbox checks (~7.5 min)
email_request_timeout = 20 # per-request timeout for email APIs
# Certificate generation
cert_validity_days = 3650 # client cert validity (~10 years)
[bouncer.backlog]
max_messages = 10000 # per network, 0 = unlimited
replay_on_connect = true # replay missed messages on client connect
@@ -196,6 +217,38 @@ autojoin = true # auto-join channels on ready (default: true)
password = "" # IRC server password (optional, for PASS command)
```
## Automatic Captcha Solving
Some IRC networks (e.g. OFTC) require visiting a URL with hCaptcha to verify
nick registration. The bouncer can solve these automatically using NoCaptchaAI.
### Setup
1. Sign up at [dash.nocaptchaai.com](https://dash.nocaptchaai.com) (free tier: 6000 solves/month)
2. Copy your API key from the dashboard
3. Add to config:
```toml
[bouncer]
captcha_api_key = "your-api-key-here"
```
4. Reload config:
```
/msg *bouncer REHASH
```
### How It Works
When NickServ sends a verification URL containing `/verify/`:
1. The bouncer fetches the page via the SOCKS proxy
2. If hCaptcha is detected and an API key is configured, it submits the
challenge to NoCaptchaAI for solving (all traffic routed through the proxy)
3. The solved token is submitted with the verification form
4. On success, the nick is promoted from `pending` to `verified` status
If no API key is set, or solving fails, the URL is stored as `pending` and
shown via the `CREDS` command for manual verification.
## CertFP Authentication
The bouncer supports client certificate fingerprint (CertFP) authentication

View File

@@ -38,6 +38,7 @@ CREATE TABLE IF NOT EXISTS nickserv_creds (
registered_at REAL NOT NULL,
host TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT 'verified',
verify_url TEXT NOT NULL DEFAULT '',
PRIMARY KEY (network, nick)
);
"""
@@ -45,6 +46,7 @@ CREATE TABLE IF NOT EXISTS nickserv_creds (
# Migration: add status column if missing (existing DBs)
_MIGRATIONS = [
"ALTER TABLE nickserv_creds ADD COLUMN status TEXT NOT NULL DEFAULT 'verified'",
"ALTER TABLE nickserv_creds ADD COLUMN verify_url TEXT NOT NULL DEFAULT ''",
]
@@ -176,6 +178,7 @@ class Backlog:
email: str,
host: str,
status: str = "verified",
verify_url: str = "",
) -> None:
"""Save NickServ credentials.
@@ -184,13 +187,13 @@ class Backlog:
"""
assert self._db is not None
await self._db.execute(
"INSERT INTO nickserv_creds (network, nick, password, email, registered_at, host, status) "
"VALUES (?, ?, ?, ?, ?, ?, ?) "
"INSERT INTO nickserv_creds (network, nick, password, email, registered_at, host, status, verify_url) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?) "
"ON CONFLICT(network, nick) DO UPDATE SET "
"password = excluded.password, email = excluded.email, "
"registered_at = excluded.registered_at, host = excluded.host, "
"status = excluded.status",
(network, nick, password, email, time.time(), host, status),
"status = excluded.status, verify_url = excluded.verify_url",
(network, nick, password, email, time.time(), host, status, verify_url),
)
await self._db.commit()
log.info("saved NickServ creds: %s/%s (host=%s, status=%s)", network, nick, host, status)
@@ -268,21 +271,21 @@ class Backlog:
async def list_nickserv_creds(
self, network: str | None = None,
) -> list[tuple[str, str, str, str, float, str]]:
) -> list[tuple[str, str, str, str, float, str, str]]:
"""List NickServ credentials, optionally filtered by network.
Returns list of (network, nick, email, host, registered_at, status).
Returns list of (network, nick, email, host, registered_at, status, verify_url).
"""
assert self._db is not None
if network:
cursor = await self._db.execute(
"SELECT network, nick, email, host, registered_at, status "
"SELECT network, nick, email, host, registered_at, status, verify_url "
"FROM nickserv_creds WHERE network = ? ORDER BY registered_at DESC",
(network,),
)
else:
cursor = await self._db.execute(
"SELECT network, nick, email, host, registered_at, status "
"SELECT network, nick, email, host, registered_at, status, verify_url "
"FROM nickserv_creds ORDER BY network, registered_at DESC",
)
return await cursor.fetchall()

124
src/bouncer/captcha.py Normal file
View File

@@ -0,0 +1,124 @@
"""hCaptcha solver via NoCaptchaAI token service."""
from __future__ import annotations
import asyncio
import logging
import re
import aiohttp
from aiohttp_socks import ProxyConnector
log = logging.getLogger(__name__)
_TOKEN_URL = "https://token.nocaptchaai.com/token"
_USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/124.0.0.0 Safari/537.36"
)
def _extract_sitekey(html: str) -> str | None:
"""Extract hCaptcha sitekey from page HTML."""
match = re.search(r'data-sitekey=["\']([a-f0-9-]+)["\']', html)
return match.group(1) if match else None
async def solve_hcaptcha(
page_url: str,
page_html: str,
api_key: str,
proxy_host: str = "127.0.0.1",
proxy_port: int = 1080,
poll_interval: int = 3,
poll_timeout: int = 120,
) -> str | None:
"""Solve an hCaptcha challenge via NoCaptchaAI.
Extracts the sitekey from the page HTML, submits to the token service,
polls for the result, and returns the hCaptcha response token.
All HTTP traffic goes through the SOCKS5 proxy.
Returns the solved token string, or None on failure.
"""
sitekey = _extract_sitekey(page_html)
if not sitekey:
log.warning("captcha: no hCaptcha sitekey found in page")
return None
log.info("captcha: solving hCaptcha (sitekey=%s, url=%s)", sitekey, page_url)
connector = ProxyConnector.from_url(
f"socks5://{proxy_host}:{proxy_port}",
)
headers = {
"Content-Type": "application/json",
"apikey": api_key,
}
async with aiohttp.ClientSession(connector=connector) as session:
# Step 1: create task
payload = {
"type": "hcaptcha",
"url": page_url,
"sitekey": sitekey,
"useragent": _USER_AGENT,
}
try:
resp = await session.post(
_TOKEN_URL, json=payload, headers=headers,
timeout=aiohttp.ClientTimeout(total=30),
)
data = await resp.json()
except Exception as e:
log.warning("captcha: failed to create task: %s", e)
return None
status = data.get("status", "")
if status == "processed":
token = data.get("token", "")
log.info("captcha: solved immediately")
return token or None
poll_url = data.get("url")
task_id = data.get("id", "")
if not poll_url:
log.warning("captcha: no poll URL in response: %s", data)
return None
log.info("captcha: task created (id=%s), polling...", task_id)
# Step 2: poll for result
elapsed = 0
await asyncio.sleep(7) # initial wait per API docs
elapsed += 7
while elapsed < poll_timeout:
try:
resp = await session.get(
poll_url, headers=headers,
timeout=aiohttp.ClientTimeout(total=15),
)
data = await resp.json()
except Exception as e:
log.warning("captcha: poll error: %s", e)
await asyncio.sleep(poll_interval)
elapsed += poll_interval
continue
status = data.get("status", "")
if status == "processed":
token = data.get("token", "")
log.info("captcha: solved (elapsed=%ds)", elapsed)
return token or None
if status == "failed":
log.warning("captcha: solve failed: %s", data.get("message", ""))
return None
await asyncio.sleep(poll_interval)
elapsed += poll_interval
log.warning("captcha: timed out after %ds", poll_timeout)
return None

View File

@@ -14,7 +14,7 @@ from cryptography.x509.oid import NameOID
log = logging.getLogger(__name__)
_VALIDITY_DAYS = 3650 # ~10 years
DEFAULT_VALIDITY_DAYS = 3650 # ~10 years
def cert_path(data_dir: Path, network: str, nick: str) -> Path:
@@ -22,7 +22,10 @@ def cert_path(data_dir: Path, network: str, nick: str) -> Path:
return data_dir / "certs" / network / f"{nick}.pem"
def generate_cert(data_dir: Path, network: str, nick: str) -> Path:
def generate_cert(
data_dir: Path, network: str, nick: str,
validity_days: int = DEFAULT_VALIDITY_DAYS,
) -> Path:
"""Generate a self-signed EC P-256 client certificate.
Creates a combined PEM file (cert + key) at the standard path.
@@ -45,7 +48,7 @@ def generate_cert(data_dir: Path, network: str, nick: str) -> Path:
.public_key(key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(now)
.not_valid_after(now + datetime.timedelta(days=_VALIDITY_DAYS))
.not_valid_after(now + datetime.timedelta(days=validity_days))
.sign(key, hashes.SHA256())
)

View File

@@ -183,7 +183,7 @@ async def _cmd_info(router: Router, network_name: str) -> list[str]:
if router.backlog:
rows = await router.backlog.list_nickserv_creds(net.cfg.name)
if rows:
_net, nick, email, _host, _ts, status = rows[0]
_net, nick, email, _host, _ts, status, _url = rows[0]
lines.append(f" NickServ {nick} ({status})")
else:
lines.append(" NickServ (no credentials)")
@@ -244,10 +244,12 @@ async def _cmd_creds(router: Router, network_name: str | None) -> list[str]:
return [f"[CREDS] no stored credentials for {scope}"]
lines = ["[CREDS]"]
for net, nick, email, host, registered_at, status in rows:
for net, nick, email, host, registered_at, status, verify_url in rows:
indicator = "+" if status == "verified" else "~"
email_display = email if email else "--"
lines.append(f" {indicator} {net} {nick} {status} {email_display}")
if verify_url and status == "pending":
lines.append(f" verify: {verify_url}")
lines.append("")
lines.append(" + verified ~ pending")
@@ -704,7 +706,8 @@ async def _cmd_gencert(router: Router, arg: str) -> list[str]:
return [f"No nick available for {net.cfg.name}",
"Usage: GENCERT <network> <nick>"]
pem = generate_cert(DATA_DIR, net.cfg.name, nick)
validity_days = router.config.bouncer.cert_validity_days
pem = generate_cert(DATA_DIR, net.cfg.name, nick, validity_days=validity_days)
fp = fingerprint(pem)
lines = [f"[GENCERT] {net.cfg.name}/{nick}"]

View File

@@ -59,6 +59,26 @@ class BouncerConfig:
password: str = "changeme"
backlog: BacklogConfig = field(default_factory=BacklogConfig)
# Captcha solving (NoCaptchaAI)
captcha_api_key: str = ""
captcha_poll_interval: int = 3
captcha_poll_timeout: int = 120
# Connection tuning
probation_seconds: int = 45
backoff_steps: list[int] = field(default_factory=lambda: [5, 10, 30, 60, 120, 300])
nick_timeout: int = 10
rejoin_delay: int = 3
http_timeout: int = 15
# Email verification
email_poll_interval: int = 15
email_max_polls: int = 30
email_request_timeout: int = 20
# Certificate generation
cert_validity_days: int = 3650
@dataclass(slots=True)
class Config:
@@ -82,6 +102,18 @@ def load(path: Path) -> Config:
port=bouncer_raw.get("port", 6667),
password=bouncer_raw.get("password", "changeme"),
backlog=BacklogConfig(**backlog_raw),
captcha_api_key=bouncer_raw.get("captcha_api_key", ""),
captcha_poll_interval=bouncer_raw.get("captcha_poll_interval", 3),
captcha_poll_timeout=bouncer_raw.get("captcha_poll_timeout", 120),
probation_seconds=bouncer_raw.get("probation_seconds", 45),
backoff_steps=bouncer_raw.get("backoff_steps", [5, 10, 30, 60, 120, 300]),
nick_timeout=bouncer_raw.get("nick_timeout", 10),
rejoin_delay=bouncer_raw.get("rejoin_delay", 3),
http_timeout=bouncer_raw.get("http_timeout", 15),
email_poll_interval=bouncer_raw.get("email_poll_interval", 15),
email_max_polls=bouncer_raw.get("email_max_polls", 30),
email_request_timeout=bouncer_raw.get("email_request_timeout", 20),
cert_validity_days=bouncer_raw.get("cert_validity_days", 3650),
)
proxy_raw = raw.get("proxy", {})

View File

@@ -28,12 +28,11 @@ from aiohttp_socks import ProxyConnector
log = logging.getLogger(__name__)
POLL_INTERVAL = 15 # seconds between inbox checks
MAX_POLLS = 30 # ~7.5 minutes total
REQUEST_TIMEOUT = 20 # per-request timeout
DEFAULT_POLL_INTERVAL = 15 # seconds between inbox checks
DEFAULT_MAX_POLLS = 30 # ~7.5 minutes total
DEFAULT_REQUEST_TIMEOUT = 20 # per-request timeout
REQUEST_RETRIES = 4 # retries per API call
RETRY_BACKOFF = [2, 5, 10, 20] # seconds between retries
PW_PAGE_TIMEOUT = 20000 # playwright page load timeout (ms)
# ---------------------------------------------------------------------------
# Domain registry
@@ -100,6 +99,9 @@ async def verify_email(
email_addr: str,
proxy_host: str = "127.0.0.1",
proxy_port: int = 1080,
poll_interval: int = DEFAULT_POLL_INTERVAL,
max_polls: int = DEFAULT_MAX_POLLS,
request_timeout: int = DEFAULT_REQUEST_TIMEOUT,
) -> VerifyResult | None:
"""Poll temp email provider for a NickServ verification code.
@@ -110,17 +112,19 @@ async def verify_email(
_, domain = email_addr.rsplit("@", 1)
kw = dict(poll_interval=poll_interval, max_polls=max_polls,
request_timeout=request_timeout)
if domain in GUERRILLA_DOMAINS:
return await _guerrilla_verify(email_addr, proxy_host, proxy_port)
return await _guerrilla_verify(email_addr, proxy_host, proxy_port, **kw)
elif domain in YOPMAIL_DOMAINS:
return await _yopmail_verify(email_addr, proxy_host, proxy_port)
return await _yopmail_verify(email_addr, proxy_host, proxy_port, **kw)
elif domain in TRASHMAILR_DOMAINS:
return await _trashmailr_verify(email_addr, proxy_host, proxy_port)
return await _trashmailr_verify(email_addr, proxy_host, proxy_port, **kw)
elif domain in _mailtm_domains:
api_base = _mailtm_domain_api.get(domain, MAILTM_APIS[0])
return await _mailtm_verify(email_addr, api_base, proxy_host, proxy_port)
return await _mailtm_verify(email_addr, api_base, proxy_host, proxy_port, **kw)
elif domain in _tempmail_domains:
return await _tempmail_verify(email_addr, proxy_host, proxy_port)
return await _tempmail_verify(email_addr, proxy_host, proxy_port, **kw)
else:
log.warning("unsupported email domain: %s", domain)
return None
@@ -135,6 +139,7 @@ async def _proxy_json(
method: str = "GET", params: dict | None = None,
json_body: dict | None = None, headers: dict | None = None,
bearer: str | None = None,
request_timeout: int = DEFAULT_REQUEST_TIMEOUT,
) -> dict | list | None:
"""HTTP request through SOCKS5 proxy with retries and fresh connector.
@@ -145,7 +150,7 @@ async def _proxy_json(
try:
proxy_url = f"socks5://{proxy_host}:{proxy_port}"
connector = ProxyConnector.from_url(proxy_url)
timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT)
timeout = aiohttp.ClientTimeout(total=request_timeout)
hdrs = dict(headers or {})
if bearer:
hdrs["Authorization"] = f"Bearer {bearer}"
@@ -199,6 +204,9 @@ async def _pw_launch(pw, proxy_host: str, proxy_port: int):
async def _guerrilla_verify(
email_addr: str, proxy_host: str, proxy_port: int,
poll_interval: int = DEFAULT_POLL_INTERVAL,
max_polls: int = DEFAULT_MAX_POLLS,
request_timeout: int = DEFAULT_REQUEST_TIMEOUT,
) -> VerifyResult | None:
"""Poll guerrillamail API for verification code."""
local, domain = email_addr.rsplit("@", 1)
@@ -210,19 +218,19 @@ async def _guerrilla_verify(
log.info("polling guerrillamail for %s (sid=%s...)", email_addr, sid[:8])
for attempt in range(MAX_POLLS):
await asyncio.sleep(POLL_INTERVAL)
for attempt in range(max_polls):
await asyncio.sleep(poll_interval)
try:
result = await _gm_check(proxy_host, proxy_port, sid)
if result:
log.info("found verification code for %s: %s", email_addr, result)
return result
log.debug("poll %d/%d: no verification email yet", attempt + 1, MAX_POLLS)
log.debug("poll %d/%d: no verification email yet", attempt + 1, max_polls)
except asyncio.CancelledError:
raise
except Exception:
log.debug("poll %d/%d failed, will retry next cycle",
attempt + 1, MAX_POLLS, exc_info=True)
attempt + 1, max_polls, exc_info=True)
log.warning("gave up polling guerrillamail for %s", email_addr)
return None
@@ -303,6 +311,9 @@ async def _gm_check(
async def _mailtm_verify(
email_addr: str, api_base: str, proxy_host: str, proxy_port: int,
poll_interval: int = DEFAULT_POLL_INTERVAL,
max_polls: int = DEFAULT_MAX_POLLS,
request_timeout: int = DEFAULT_REQUEST_TIMEOUT,
) -> VerifyResult | None:
"""Poll mail.tm/gw API for verification code."""
local, domain = email_addr.rsplit("@", 1)
@@ -313,6 +324,7 @@ async def _mailtm_verify(
proxy_host, proxy_port, f"{api_base}/accounts",
method="POST",
json_body={"address": email_addr, "password": password},
request_timeout=request_timeout,
)
if not data or not isinstance(data, dict) or not data.get("id"):
log.warning("failed to create mail.tm account for %s", email_addr)
@@ -323,6 +335,7 @@ async def _mailtm_verify(
proxy_host, proxy_port, f"{api_base}/token",
method="POST",
json_body={"address": email_addr, "password": password},
request_timeout=request_timeout,
)
if not token_data or not isinstance(token_data, dict):
log.warning("failed to get mail.tm token for %s", email_addr)
@@ -333,12 +346,13 @@ async def _mailtm_verify(
log.info("polling mail.tm for %s (api=%s)", email_addr, api_base)
for attempt in range(MAX_POLLS):
await asyncio.sleep(POLL_INTERVAL)
for attempt in range(max_polls):
await asyncio.sleep(poll_interval)
try:
msgs = await _proxy_json(
proxy_host, proxy_port, f"{api_base}/messages",
bearer=token,
request_timeout=request_timeout,
)
if not msgs or not isinstance(msgs, dict):
continue
@@ -358,6 +372,7 @@ async def _mailtm_verify(
detail = await _proxy_json(
proxy_host, proxy_port, f"{api_base}/messages/{msg_id}",
bearer=token,
request_timeout=request_timeout,
)
if not detail or not isinstance(detail, dict):
continue
@@ -368,11 +383,11 @@ async def _mailtm_verify(
log.info("found verification code for %s: %s", email_addr, result)
return result
log.debug("poll %d/%d: no verification email yet", attempt + 1, MAX_POLLS)
log.debug("poll %d/%d: no verification email yet", attempt + 1, max_polls)
except asyncio.CancelledError:
raise
except Exception:
log.debug("mail.tm poll %d/%d failed", attempt + 1, MAX_POLLS, exc_info=True)
log.debug("mail.tm poll %d/%d failed", attempt + 1, max_polls, exc_info=True)
log.warning("gave up polling mail.tm for %s", email_addr)
return None
@@ -384,6 +399,9 @@ async def _mailtm_verify(
async def _yopmail_verify(
email_addr: str, proxy_host: str, proxy_port: int,
poll_interval: int = DEFAULT_POLL_INTERVAL,
max_polls: int = DEFAULT_MAX_POLLS,
request_timeout: int = DEFAULT_REQUEST_TIMEOUT,
) -> VerifyResult | None:
"""Poll yopmail via Playwright for verification code."""
try:
@@ -393,19 +411,20 @@ async def _yopmail_verify(
return None
local = email_addr.rsplit("@", 1)[0]
pw_timeout = request_timeout * 1000 # Playwright uses milliseconds
try:
async with async_playwright() as p:
browser, page = await _pw_launch(p, proxy_host, proxy_port)
log.info("polling yopmail for %s", email_addr)
for attempt in range(MAX_POLLS):
await asyncio.sleep(POLL_INTERVAL)
for attempt in range(max_polls):
await asyncio.sleep(poll_interval)
try:
await page.goto(
f"https://yopmail.com/en/?login={local}",
wait_until="networkidle",
timeout=PW_PAGE_TIMEOUT,
timeout=pw_timeout,
)
inbox_frame = page.frame("ifinbox")
@@ -414,7 +433,7 @@ async def _yopmail_verify(
first_mail = inbox_frame.locator(".m")
if await first_mail.count() == 0:
log.debug("poll %d/%d: yopmail inbox empty", attempt + 1, MAX_POLLS)
log.debug("poll %d/%d: yopmail inbox empty", attempt + 1, max_polls)
continue
await first_mail.first.click()
@@ -451,6 +470,9 @@ async def _yopmail_verify(
async def _trashmailr_verify(
email_addr: str, proxy_host: str, proxy_port: int,
poll_interval: int = DEFAULT_POLL_INTERVAL,
max_polls: int = DEFAULT_MAX_POLLS,
request_timeout: int = DEFAULT_REQUEST_TIMEOUT,
) -> VerifyResult | None:
"""Poll trashmailr.com via Playwright for verification code.
@@ -463,18 +485,19 @@ async def _trashmailr_verify(
return None
local, domain = email_addr.rsplit("@", 1)
pw_timeout = request_timeout * 1000 # Playwright uses milliseconds
try:
async with async_playwright() as p:
browser, page = await _pw_launch(p, proxy_host, proxy_port)
log.info("polling trashmailr for %s", email_addr)
for attempt in range(MAX_POLLS):
await asyncio.sleep(POLL_INTERVAL)
for attempt in range(max_polls):
await asyncio.sleep(poll_interval)
try:
# Navigate to inbox list for this address
url = f"https://trashmailr.com/inbox/list.htm?mailAddress={local}@{domain}"
await page.goto(url, wait_until="networkidle", timeout=PW_PAGE_TIMEOUT)
await page.goto(url, wait_until="networkidle", timeout=pw_timeout)
# Wait for mail list to render
try:
@@ -489,7 +512,7 @@ async def _trashmailr_verify(
rows = page.locator(".mailList tr, .mail-item, [data-mail-id]")
count = await rows.count()
if count == 0:
log.debug("poll %d/%d: trashmailr inbox empty", attempt + 1, MAX_POLLS)
log.debug("poll %d/%d: trashmailr inbox empty", attempt + 1, max_polls)
continue
# Check each email
@@ -541,6 +564,9 @@ async def _trashmailr_verify(
async def _tempmail_verify(
email_addr: str, proxy_host: str, proxy_port: int,
poll_interval: int = DEFAULT_POLL_INTERVAL,
max_polls: int = DEFAULT_MAX_POLLS,
request_timeout: int = DEFAULT_REQUEST_TIMEOUT,
) -> VerifyResult | None:
"""Poll temp-mail.org via Playwright for verification code."""
try:
@@ -550,6 +576,7 @@ async def _tempmail_verify(
return None
local, domain = email_addr.rsplit("@", 1)
pw_timeout = request_timeout * 1000 # Playwright uses milliseconds
try:
async with async_playwright() as p:
@@ -558,7 +585,7 @@ async def _tempmail_verify(
# Set the email address
await page.goto("https://temp-mail.org/en/",
wait_until="networkidle", timeout=PW_PAGE_TIMEOUT)
wait_until="networkidle", timeout=pw_timeout)
# Try to set custom address via the change button
change_btn = page.locator("#click-to-edit, .click-to-edit, [data-clipboard-action]")
@@ -580,8 +607,8 @@ async def _tempmail_verify(
await save_btn.first.click()
await asyncio.sleep(2)
for attempt in range(MAX_POLLS):
await asyncio.sleep(POLL_INTERVAL)
for attempt in range(max_polls):
await asyncio.sleep(poll_interval)
try:
# Refresh the inbox
refresh_btn = page.locator("#refresh, .refresh, [data-type='refresh']")
@@ -589,13 +616,13 @@ async def _tempmail_verify(
await refresh_btn.first.click()
await asyncio.sleep(3)
else:
await page.reload(wait_until="networkidle", timeout=PW_PAGE_TIMEOUT)
await page.reload(wait_until="networkidle", timeout=pw_timeout)
# Check for emails in the inbox
mail_items = page.locator(".inbox-dataList li, .mail-item, .message-list-item")
count = await mail_items.count()
if count == 0:
log.debug("poll %d/%d: temp-mail inbox empty", attempt + 1, MAX_POLLS)
log.debug("poll %d/%d: temp-mail inbox empty", attempt + 1, max_polls)
continue
for i in range(count):
@@ -631,7 +658,10 @@ async def _tempmail_verify(
return None
async def fetch_tempmail_domains(proxy_host: str, proxy_port: int) -> set[str]:
async def fetch_tempmail_domains(
proxy_host: str, proxy_port: int,
request_timeout: int = DEFAULT_REQUEST_TIMEOUT,
) -> set[str]:
"""Fetch available domains from temp-mail.org via Playwright.
Updates the module-level _tempmail_domains cache.
@@ -641,12 +671,13 @@ async def fetch_tempmail_domains(proxy_host: str, proxy_port: int) -> set[str]:
except ImportError:
return set()
pw_timeout = request_timeout * 1000 # Playwright uses milliseconds
discovered: set[str] = set()
try:
async with async_playwright() as p:
browser, page = await _pw_launch(p, proxy_host, proxy_port)
await page.goto("https://temp-mail.org/en/",
wait_until="networkidle", timeout=PW_PAGE_TIMEOUT)
wait_until="networkidle", timeout=pw_timeout)
# The domain is shown in the email display or a dropdown
# Try to find domain options

View File

@@ -12,14 +12,13 @@ from pathlib import Path
from typing import Callable
from bouncer.backlog import Backlog
from bouncer.config import NetworkConfig, ProxyConfig
from bouncer.config import BouncerConfig, NetworkConfig, ProxyConfig
from bouncer.email import fetch_extra_domains, get_all_domains, verify_email
from bouncer.irc import IRCMessage, parse
log = logging.getLogger(__name__)
BACKOFF_STEPS = [5, 10, 30, 60, 120, 300]
PROBATION_SECONDS = 45
_DEFAULT_BOUNCER_CFG = BouncerConfig()
class State(Enum):
@@ -197,6 +196,7 @@ class Network:
on_message: Callable[[str, IRCMessage], None] | None = None,
on_status: Callable[[str, str], None] | None = None,
data_dir: Path | None = None,
bouncer_cfg: BouncerConfig | None = None,
) -> None:
self.cfg = cfg
self.proxy_cfg = proxy_cfg
@@ -204,6 +204,7 @@ class Network:
self.on_message = on_message
self.on_status = on_status # (network_name, status_text)
self.data_dir = data_dir
self.bouncer_cfg = bouncer_cfg or _DEFAULT_BOUNCER_CFG
self.nick: str = cfg.nick or "*"
self.channels: set[str] = set()
self.state: State = State.DISCONNECTED
@@ -235,6 +236,8 @@ class Network:
self._sasl_pass: str = ""
self._sasl_mechanism: str = "" # "EXTERNAL" or "PLAIN"
self._sasl_complete: asyncio.Event = asyncio.Event()
# URL for manual verification (e.g. OFTC captcha)
self._verify_url: str = ""
def _status(self, text: str) -> None:
"""Emit a status message to attached clients."""
@@ -381,7 +384,8 @@ class Network:
async def _reconnect_wait(self) -> None:
"""Wait with exponential backoff, then reconnect."""
delay = BACKOFF_STEPS[min(self._reconnect_attempt, len(BACKOFF_STEPS) - 1)]
steps = self.bouncer_cfg.backoff_steps
delay = steps[min(self._reconnect_attempt, len(steps) - 1)]
self._reconnect_attempt += 1
log.info(
"[%s] reconnecting in %ds (attempt %d)",
@@ -430,14 +434,14 @@ class Network:
self.state = State.PROBATION
log.info(
"[%s] probation started (%ds), watching for k-line...",
self.cfg.name, PROBATION_SECONDS,
self.cfg.name, self.bouncer_cfg.probation_seconds,
)
self._probation_task = asyncio.create_task(self._probation_timer())
async def _probation_timer(self) -> None:
"""Wait out the probation period, then transition to ready."""
try:
await asyncio.sleep(PROBATION_SECONDS)
await asyncio.sleep(self.bouncer_cfg.probation_seconds)
except asyncio.CancelledError:
return
@@ -495,11 +499,7 @@ class Network:
if self.cfg.auth_service == "qbot":
self._nickserv_done = asyncio.Event()
await self._qbot_auth()
try:
await asyncio.wait_for(self._nickserv_done.wait(), timeout=30)
except asyncio.TimeoutError:
log.warning("[%s] Q bot did not respond in 30s", self.cfg.name)
await self._nickserv_complete()
await self._nickserv_done.wait()
return
# Check for a pending registration from a previous session
@@ -511,14 +511,7 @@ class Network:
# Try NickServ: IDENTIFY first (previous session), else REGISTER
self._nickserv_done = asyncio.Event()
await self._nickserv_identify()
# If NickServ doesn't respond within 30s, move on
try:
await asyncio.wait_for(self._nickserv_done.wait(), timeout=30)
except asyncio.TimeoutError:
log.warning("[%s] NickServ did not respond in 30s", self.cfg.name)
self._nickserv_pending = ""
await self._nickserv_complete()
await self._nickserv_done.wait()
async def _nickserv_identify(self) -> None:
"""Attempt to IDENTIFY with NickServ using stored credentials.
@@ -539,7 +532,7 @@ class Network:
self._nick_confirmed.clear()
await self.send_raw("NICK", stored_nick)
try:
await asyncio.wait_for(self._nick_confirmed.wait(), timeout=10)
await asyncio.wait_for(self._nick_confirmed.wait(), timeout=self.bouncer_cfg.nick_timeout)
except asyncio.TimeoutError:
log.warning("[%s] nick change to %s not confirmed", self.cfg.name, stored_nick)
@@ -635,7 +628,7 @@ class Network:
self._nick_confirmed.clear()
await self.send_raw("NICK", self.cfg.nick)
try:
await asyncio.wait_for(self._nick_confirmed.wait(), timeout=10)
await asyncio.wait_for(self._nick_confirmed.wait(), timeout=self.bouncer_cfg.nick_timeout)
except asyncio.TimeoutError:
log.warning("[%s] nick change to %s not confirmed",
self.cfg.name, self.cfg.nick)
@@ -657,6 +650,9 @@ class Network:
self._nickserv_email,
proxy_host=self.proxy_cfg.host,
proxy_port=self.proxy_cfg.port,
poll_interval=self.bouncer_cfg.email_poll_interval,
max_polls=self.bouncer_cfg.email_max_polls,
request_timeout=self.bouncer_cfg.email_request_timeout,
)
if not result:
self._status("no verification code found in email")
@@ -696,6 +692,7 @@ class Network:
self.cfg.name, self.nick,
self._nickserv_password, "",
self.visible_host or "",
verify_url="",
)
self._nickserv_pending = ""
await self._nickserv_complete()
@@ -760,15 +757,17 @@ class Network:
async def _visit_verify_url(self, text: str) -> None:
"""Extract and visit a verification URL (e.g. OFTC) via SOCKS proxy.
Attempts a POST first (OFTC form), falls back to GET.
If the page requires a captcha, saves creds as pending and logs the URL.
Flow:
1. GET the page to check for captcha
2. If captcha found and API key configured, solve via NoCaptchaAI
3. POST the form with the solved token
4. If no API key, save as pending and log URL for manual verification
"""
import re
match = re.search(r'(https?://\S+/verify/\S+)', text)
if not match:
return
url = match.group(1)
# Extract token from URL path (after /verify/)
token = url.rsplit("/verify/", 1)[-1] if "/verify/" in url else ""
log.info("[%s] visiting verification URL: %s", self.cfg.name, url)
self._status(f"visiting verification URL...")
@@ -779,27 +778,75 @@ class Network:
f"socks5://{self.proxy_cfg.host}:{self.proxy_cfg.port}",
)
async with aiohttp.ClientSession(connector=connector) as session:
# Try POST with token (OFTC form submission)
resp = await session.post(
url, data={"token": token},
timeout=aiohttp.ClientTimeout(total=15),
# GET the page to inspect for captcha
resp = await session.get(
url, timeout=aiohttp.ClientTimeout(total=self.bouncer_cfg.http_timeout),
)
body = await resp.text()
if resp.status == 200 and "captcha" not in body.lower():
log.info("[%s] verification URL accepted (POST %d)",
self.cfg.name, resp.status)
if resp.status != 200:
log.warning("[%s] verify page returned %d", self.cfg.name, resp.status)
self._status(f"verify manually: {url}")
self._verify_url = url
await self._on_register_success()
return
# No captcha -- submit directly
if "captcha" not in body.lower() and "h-captcha" not in body.lower():
resp = await session.post(
url, data={"token": token},
timeout=aiohttp.ClientTimeout(total=self.bouncer_cfg.http_timeout),
)
if resp.status == 200:
log.info("[%s] verification accepted (no captcha)", self.cfg.name)
self._status("verification accepted")
await self._on_verify_success()
return
# Captcha or unexpected response -- save as pending, log URL
log.warning("[%s] verification requires captcha, visit manually: %s",
self.cfg.name, url)
# Captcha detected -- try solving with NoCaptchaAI
if self.bouncer_cfg.captcha_api_key:
from bouncer.captcha import solve_hcaptcha
self._status("solving captcha...")
solved = await solve_hcaptcha(
page_url=url,
page_html=body,
api_key=self.bouncer_cfg.captcha_api_key,
poll_interval=self.bouncer_cfg.captcha_poll_interval,
poll_timeout=self.bouncer_cfg.captcha_poll_timeout,
proxy_host=self.proxy_cfg.host,
proxy_port=self.proxy_cfg.port,
)
if solved:
log.info("[%s] captcha solved, submitting form", self.cfg.name)
self._status("captcha solved, verifying...")
resp = await session.post(
url,
data={
"token": token,
"h-captcha-response": solved,
},
timeout=aiohttp.ClientTimeout(total=self.bouncer_cfg.http_timeout),
)
result_body = await resp.text()
if resp.status == 200 and "verified" in result_body.lower():
log.info("[%s] verification accepted", self.cfg.name)
self._status("verification accepted")
await self._on_verify_success()
return
log.warning("[%s] form submission failed (%d): %s",
self.cfg.name, resp.status, result_body[:200])
else:
log.warning("[%s] captcha solve failed", self.cfg.name)
# Fallback: save as pending with URL
log.warning("[%s] verify manually: %s", self.cfg.name, url)
self._status(f"verify manually: {url}")
self._verify_url = url
await self._on_register_success()
except Exception as e:
log.warning("[%s] failed to visit verification URL: %s", self.cfg.name, e)
self._status(f"verify manually: {url}")
self._verify_url = url
await self._on_register_success()
def _registration_confirmed(self, lower: str) -> bool:
@@ -838,6 +885,7 @@ class Network:
self._nickserv_password, self._nickserv_email,
self.visible_host or "",
status="pending",
verify_url=self._verify_url,
)
self._nickserv_pending = "verify"
# Start email verification in the background (if not already running)
@@ -888,7 +936,7 @@ class Network:
self._nick_confirmed.clear()
await self.send_raw("NICK", p_nick)
try:
await asyncio.wait_for(self._nick_confirmed.wait(), timeout=10)
await asyncio.wait_for(self._nick_confirmed.wait(), timeout=self.bouncer_cfg.nick_timeout)
except asyncio.TimeoutError:
log.warning("[%s] could not switch to pending nick %s",
self.cfg.name, p_nick)
@@ -1087,7 +1135,7 @@ class Network:
self.channels.discard(channel)
log.warning("[%s] kicked from %s", self.cfg.name, channel)
# Rejoin after a brief delay
await asyncio.sleep(3)
await asyncio.sleep(self.bouncer_cfg.rejoin_delay)
if channel in set(self.cfg.channels) and self._running and self.ready:
await self.send_raw("JOIN", channel)

View File

@@ -110,6 +110,7 @@ class Router:
on_message=self._on_network_message,
on_status=self._on_network_status,
data_dir=self.data_dir,
bouncer_cfg=self.config.bouncer,
)
self.networks[name] = network
asyncio.create_task(network.start())
@@ -294,6 +295,7 @@ class Router:
on_message=self._on_network_message,
on_status=self._on_network_status,
data_dir=self.data_dir,
bouncer_cfg=self.config.bouncer,
)
self.networks[cfg.name] = network
asyncio.create_task(network.start())

37
tests/test_captcha.py Normal file
View File

@@ -0,0 +1,37 @@
"""Tests for bouncer.captcha module."""
from bouncer.captcha import _extract_sitekey
class TestExtractSitekey:
"""Test hCaptcha sitekey extraction from HTML."""
def test_extracts_sitekey_from_div(self) -> None:
html = '<div class="h-captcha" data-sitekey="a1b2c3d4-e5f6-7890-abcd-ef1234567890"></div>'
assert _extract_sitekey(html) == "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
def test_extracts_sitekey_single_quotes(self) -> None:
html = "<div class='h-captcha' data-sitekey='10000000-ffff-ffff-ffff-000000000001'></div>"
assert _extract_sitekey(html) == "10000000-ffff-ffff-ffff-000000000001"
def test_returns_none_no_sitekey(self) -> None:
html = "<div>No captcha here</div>"
assert _extract_sitekey(html) is None
def test_returns_none_empty_html(self) -> None:
assert _extract_sitekey("") is None
def test_extracts_from_full_page(self) -> None:
html = """<!DOCTYPE html>
<html>
<head><title>Verify</title></head>
<body>
<form action="" method="POST">
<input type="hidden" name="token" value="abc123">
<div class="h-captcha" data-sitekey="abcdef01-2345-6789-abcd-ef0123456789"
data-callback="on_success"></div>
<input type="submit" value="Verify">
</form>
</body>
</html>"""
assert _extract_sitekey(html) == "abcdef01-2345-6789-abcd-ef0123456789"

View File

@@ -53,6 +53,15 @@ class TestGenerateCert:
assert pem1 == pem2
assert fp1 != fp2 # New cert = new fingerprint
def test_custom_validity_days(self, data_dir: Path) -> None:
import datetime
from cryptography import x509 as x509_mod
pem = generate_cert(data_dir, "libera", "testnick", validity_days=365)
cert_data = pem.read_bytes()
cert_obj = x509_mod.load_pem_x509_certificate(cert_data)
delta = cert_obj.not_valid_after_utc - cert_obj.not_valid_before_utc
assert 364 <= delta.days <= 366
class TestFingerprint:
def test_format(self, data_dir: Path) -> None:

View File

@@ -51,6 +51,7 @@ def _make_router(*networks: MagicMock) -> MagicMock:
router.add_network = AsyncMock()
router.remove_network = AsyncMock(return_value=True)
router.config = MagicMock()
router.config.bouncer.cert_validity_days = 3650
return router
@@ -143,7 +144,7 @@ class TestInfo:
host="user/fabesune", channels={"#test", "#dev"})
router = _make_router(net)
router.backlog.list_nickserv_creds.return_value = [
("libera", "fabesune", "test@mail.tm", "user/fabesune", 1700000000.0, "verified"),
("libera", "fabesune", "test@mail.tm", "user/fabesune", 1700000000.0, "verified", ""),
]
client = _make_client()
lines = await commands.dispatch("INFO libera", router, client)
@@ -218,14 +219,15 @@ class TestCreds:
net = _make_network("libera", State.READY)
router = _make_router(net)
router.backlog.list_nickserv_creds.return_value = [
("libera", "fabesune", "test@mail.tm", "user/fabesune", 1700000000.0, "verified"),
("libera", "oldnick", "old@mail.tm", "old/host", 1699000000.0, "pending"),
("libera", "fabesune", "test@mail.tm", "user/fabesune", 1700000000.0, "verified", ""),
("libera", "oldnick", "old@mail.tm", "old/host", 1699000000.0, "pending", "https://example.com/verify/abc"),
]
client = _make_client()
lines = await commands.dispatch("CREDS libera", router, client)
assert lines[0] == "[CREDS]"
assert any("+" in line and "fabesune" in line and "verified" in line for line in lines)
assert any("~" in line and "oldnick" in line and "pending" in line for line in lines)
assert any("verify: https://example.com/verify/abc" in line for line in lines)
@pytest.mark.asyncio
async def test_creds_unknown_network(self) -> None:
@@ -758,8 +760,8 @@ class TestDropCreds:
net = _make_network("libera", State.READY)
router = _make_router(net)
router.backlog.list_nickserv_creds.return_value = [
("libera", "nick1", "a@b.c", "", 0.0, "verified"),
("libera", "nick2", "d@e.f", "", 0.0, "pending"),
("libera", "nick1", "a@b.c", "", 0.0, "verified", ""),
("libera", "nick2", "d@e.f", "", 0.0, "pending", ""),
]
client = _make_client()
lines = await commands.dispatch("DROPCREDS libera", router, client)

View File

@@ -123,3 +123,58 @@ tls = true
"""
cfg = load(_write_config(config))
assert cfg.networks["test"].port == 6697
def test_operational_defaults(self):
"""Ensure all operational values have sane defaults."""
cfg = load(_write_config(MINIMAL_CONFIG))
b = cfg.bouncer
assert b.probation_seconds == 45
assert b.backoff_steps == [5, 10, 30, 60, 120, 300]
assert b.nick_timeout == 10
assert b.rejoin_delay == 3
assert b.http_timeout == 15
assert b.captcha_api_key == ""
assert b.captcha_poll_interval == 3
assert b.captcha_poll_timeout == 120
assert b.email_poll_interval == 15
assert b.email_max_polls == 30
assert b.email_request_timeout == 20
assert b.cert_validity_days == 3650
def test_operational_overrides(self):
"""Configurable operational values are parsed from TOML."""
config = """\
[bouncer]
password = "x"
probation_seconds = 60
backoff_steps = [10, 30, 90]
nick_timeout = 20
rejoin_delay = 5
http_timeout = 30
captcha_api_key = "test-key"
captcha_poll_interval = 5
captcha_poll_timeout = 60
email_poll_interval = 10
email_max_polls = 20
email_request_timeout = 25
cert_validity_days = 365
[proxy]
[networks.test]
host = "irc.example.com"
"""
cfg = load(_write_config(config))
b = cfg.bouncer
assert b.probation_seconds == 60
assert b.backoff_steps == [10, 30, 90]
assert b.nick_timeout == 20
assert b.rejoin_delay == 5
assert b.http_timeout == 30
assert b.captcha_api_key == "test-key"
assert b.captcha_poll_interval == 5
assert b.captcha_poll_timeout == 60
assert b.email_poll_interval == 10
assert b.email_max_polls == 20
assert b.email_request_timeout == 25
assert b.cert_validity_days == 365