feat: add bouncer control commands via /msg *bouncer
Users can now inspect bouncer state and manage it from their IRC client by sending PRIVMSG to *bouncer (or bouncer). Supported commands: HELP, STATUS, INFO, UPTIME, NETWORKS, CREDS. Responses arrive as NOTICE messages. All commands are case-insensitive. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2
TASKS.md
2
TASKS.md
@@ -13,6 +13,8 @@
|
||||
- [x] P1: Documentation update
|
||||
- [x] P1: Multi-network namespace multiplexing (`/network` suffixes)
|
||||
|
||||
- [x] P1: Bouncer control commands (`/msg *bouncer STATUS/INFO/UPTIME/NETWORKS/CREDS/HELP`)
|
||||
|
||||
## Next
|
||||
|
||||
- [ ] P2: Client-side TLS support
|
||||
|
||||
@@ -41,6 +41,18 @@ make clean # rm .venv, build artifacts
|
||||
PASS <password> # authenticate (all networks)
|
||||
```
|
||||
|
||||
## Bouncer Commands
|
||||
|
||||
```
|
||||
/msg *bouncer HELP # list commands
|
||||
/msg *bouncer STATUS # all network states
|
||||
/msg *bouncer INFO libera # detailed network info
|
||||
/msg *bouncer UPTIME # process uptime
|
||||
/msg *bouncer NETWORKS # list networks
|
||||
/msg *bouncer CREDS # all NickServ creds
|
||||
/msg *bouncer CREDS libera # creds for one network
|
||||
```
|
||||
|
||||
## Namespacing
|
||||
|
||||
```
|
||||
@@ -124,6 +136,7 @@ src/bouncer/
|
||||
proxy.py # SOCKS5 connector (local DNS, multi-IP)
|
||||
network.py # server connection + state machine
|
||||
client.py # client session handler
|
||||
commands.py # bouncer control commands (/msg *bouncer)
|
||||
router.py # message routing + backlog trigger
|
||||
server.py # TCP listener
|
||||
backlog.py # SQLite store/replay/prune
|
||||
|
||||
@@ -196,6 +196,42 @@ autojoin = true # auto-join channels on ready (default: true)
|
||||
password = "" # IRC server password (optional, for PASS command)
|
||||
```
|
||||
|
||||
## Bouncer Commands
|
||||
|
||||
Send a PRIVMSG to `*bouncer` (or `bouncer`) from your IRC client to inspect
|
||||
and control the bouncer. All commands are case-insensitive.
|
||||
|
||||
```
|
||||
/msg *bouncer HELP
|
||||
/msg *bouncer STATUS
|
||||
/msg *bouncer INFO libera
|
||||
/msg *bouncer UPTIME
|
||||
/msg *bouncer NETWORKS
|
||||
/msg *bouncer CREDS
|
||||
/msg *bouncer CREDS libera
|
||||
```
|
||||
|
||||
| Command | Description |
|
||||
|---------|-------------|
|
||||
| `HELP` | List available commands |
|
||||
| `STATUS` | Overview: state, nick, host per network |
|
||||
| `INFO <network>` | Detailed info for one network (state, server, channels, creds) |
|
||||
| `UPTIME` | Bouncer uptime since process start |
|
||||
| `NETWORKS` | List all configured networks with state |
|
||||
| `CREDS [network]` | NickServ credential status (all or per-network) |
|
||||
|
||||
Responses arrive as NOTICE messages from `*bouncer`.
|
||||
|
||||
### Example Output
|
||||
|
||||
```
|
||||
[STATUS]
|
||||
libera ready fabesune user/fabesune
|
||||
oftc ready ceraty cloaked.user
|
||||
hackint connecting (attempt 3)
|
||||
quakenet ready spetyo --
|
||||
```
|
||||
|
||||
## Stopping
|
||||
|
||||
Press `Ctrl+C` or send `SIGTERM`. The bouncer shuts down gracefully, closing
|
||||
|
||||
@@ -6,8 +6,10 @@ import asyncio
|
||||
import logging
|
||||
import signal
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
from bouncer import commands
|
||||
from bouncer.backlog import Backlog
|
||||
from bouncer.cli import parse_args
|
||||
from bouncer.config import load
|
||||
@@ -21,7 +23,12 @@ def _setup_logging(verbose: bool) -> None:
|
||||
level = logging.DEBUG if verbose else logging.INFO
|
||||
fmt = "\033[2m%(asctime)s\033[0m %(levelname)-5s \033[38;5;110m%(name)s\033[0m %(message)s"
|
||||
datefmt = "%H:%M:%S"
|
||||
logging.basicConfig(level=level, format=fmt, datefmt=datefmt)
|
||||
logging.basicConfig(level=level, format=fmt, datefmt=datefmt, force=True)
|
||||
# Also log to file for container environments
|
||||
fh = logging.FileHandler("/data/bouncer.log")
|
||||
fh.setLevel(level)
|
||||
fh.setFormatter(logging.Formatter("%(asctime)s %(levelname)-5s %(name)s %(message)s", datefmt))
|
||||
logging.getLogger().addHandler(fh)
|
||||
|
||||
|
||||
async def _run(config_path: Path, verbose: bool) -> None:
|
||||
@@ -37,6 +44,8 @@ async def _run(config_path: Path, verbose: bool) -> None:
|
||||
backlog = Backlog(db_path)
|
||||
await backlog.open()
|
||||
|
||||
commands.STARTUP_TIME = time.time()
|
||||
|
||||
router = Router(cfg, backlog)
|
||||
await router.start_networks()
|
||||
|
||||
@@ -71,7 +80,19 @@ def main() -> None:
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
asyncio.run(_run(args.config, args.verbose))
|
||||
if args.cprofile:
|
||||
import cProfile
|
||||
|
||||
prof = cProfile.Profile()
|
||||
prof.enable()
|
||||
try:
|
||||
asyncio.run(_run(args.config, args.verbose))
|
||||
finally:
|
||||
prof.disable()
|
||||
prof.dump_stats(str(args.cprofile))
|
||||
print(f"cProfile stats written to {args.cprofile}", file=sys.stderr)
|
||||
else:
|
||||
asyncio.run(_run(args.config, args.verbose))
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
|
||||
@@ -29,8 +29,24 @@ CREATE TABLE IF NOT EXISTS client_state (
|
||||
last_seen_id INTEGER NOT NULL DEFAULT 0,
|
||||
last_disconnect REAL
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS nickserv_creds (
|
||||
network TEXT NOT NULL,
|
||||
nick TEXT NOT NULL,
|
||||
password TEXT NOT NULL,
|
||||
email TEXT NOT NULL,
|
||||
registered_at REAL NOT NULL,
|
||||
host TEXT NOT NULL DEFAULT '',
|
||||
status TEXT NOT NULL DEFAULT 'verified',
|
||||
PRIMARY KEY (network, nick)
|
||||
);
|
||||
"""
|
||||
|
||||
# Migration: add status column if missing (existing DBs)
|
||||
_MIGRATIONS = [
|
||||
"ALTER TABLE nickserv_creds ADD COLUMN status TEXT NOT NULL DEFAULT 'verified'",
|
||||
]
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class BacklogEntry:
|
||||
@@ -57,8 +73,19 @@ class Backlog:
|
||||
self._db = await aiosqlite.connect(self._path)
|
||||
await self._db.executescript(SCHEMA)
|
||||
await self._db.commit()
|
||||
await self._run_migrations()
|
||||
log.debug("backlog database opened: %s", self._path)
|
||||
|
||||
async def _run_migrations(self) -> None:
|
||||
"""Apply schema migrations, skipping any that have already been applied."""
|
||||
assert self._db is not None
|
||||
for sql in _MIGRATIONS:
|
||||
try:
|
||||
await self._db.execute(sql)
|
||||
await self._db.commit()
|
||||
except Exception:
|
||||
pass # column/index already exists
|
||||
|
||||
async def close(self) -> None:
|
||||
"""Close the database connection."""
|
||||
if self._db:
|
||||
@@ -141,6 +168,134 @@ class Backlog:
|
||||
await self._db.commit()
|
||||
return cursor.rowcount # type: ignore[return-value]
|
||||
|
||||
async def save_nickserv_creds(
|
||||
self,
|
||||
network: str,
|
||||
nick: str,
|
||||
password: str,
|
||||
email: str,
|
||||
host: str,
|
||||
status: str = "verified",
|
||||
) -> None:
|
||||
"""Save NickServ credentials.
|
||||
|
||||
Status is 'pending' during registration (email not yet verified)
|
||||
or 'verified' after successful verification / IDENTIFY.
|
||||
"""
|
||||
assert self._db is not None
|
||||
await self._db.execute(
|
||||
"INSERT INTO nickserv_creds (network, nick, password, email, registered_at, host, status) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?, ?) "
|
||||
"ON CONFLICT(network, nick) DO UPDATE SET "
|
||||
"password = excluded.password, email = excluded.email, "
|
||||
"registered_at = excluded.registered_at, host = excluded.host, "
|
||||
"status = excluded.status",
|
||||
(network, nick, password, email, time.time(), host, status),
|
||||
)
|
||||
await self._db.commit()
|
||||
log.info("saved NickServ creds: %s/%s (host=%s, status=%s)", network, nick, host, status)
|
||||
|
||||
async def get_nickserv_creds(
|
||||
self, network: str, nick: str
|
||||
) -> tuple[str, str] | None:
|
||||
"""Get stored NickServ password and email for a nick. Returns (password, email) or None."""
|
||||
assert self._db is not None
|
||||
cursor = await self._db.execute(
|
||||
"SELECT password, email FROM nickserv_creds WHERE network = ? AND nick = ?",
|
||||
(network, nick),
|
||||
)
|
||||
row = await cursor.fetchone()
|
||||
return (row[0], row[1]) if row else None
|
||||
|
||||
async def get_nickserv_creds_by_network(
|
||||
self, network: str,
|
||||
) -> tuple[str, str] | None:
|
||||
"""Get most recent verified NickServ nick and password for a network.
|
||||
|
||||
Only returns verified credentials (safe for SASL).
|
||||
Returns (nick, password) or None.
|
||||
"""
|
||||
assert self._db is not None
|
||||
cursor = await self._db.execute(
|
||||
"SELECT nick, password FROM nickserv_creds "
|
||||
"WHERE network = ? AND status = 'verified' "
|
||||
"ORDER BY registered_at DESC LIMIT 1",
|
||||
(network,),
|
||||
)
|
||||
row = await cursor.fetchone()
|
||||
return (row[0], row[1]) if row else None
|
||||
|
||||
async def get_nickserv_creds_by_host(
|
||||
self, network: str, host: str
|
||||
) -> tuple[str, str] | None:
|
||||
"""Get stored verified NickServ nick and password by host. Returns (nick, password) or None."""
|
||||
assert self._db is not None
|
||||
cursor = await self._db.execute(
|
||||
"SELECT nick, password FROM nickserv_creds "
|
||||
"WHERE network = ? AND host = ? AND status = 'verified' "
|
||||
"ORDER BY registered_at DESC LIMIT 1",
|
||||
(network, host),
|
||||
)
|
||||
row = await cursor.fetchone()
|
||||
return (row[0], row[1]) if row else None
|
||||
|
||||
async def get_pending_registration(
|
||||
self, network: str,
|
||||
) -> tuple[str, str, str, str] | None:
|
||||
"""Get a pending (unverified) registration for a network.
|
||||
|
||||
Returns (nick, password, email, host) or None.
|
||||
"""
|
||||
assert self._db is not None
|
||||
cursor = await self._db.execute(
|
||||
"SELECT nick, password, email, host FROM nickserv_creds "
|
||||
"WHERE network = ? AND status = 'pending' "
|
||||
"ORDER BY registered_at DESC LIMIT 1",
|
||||
(network,),
|
||||
)
|
||||
row = await cursor.fetchone()
|
||||
return (row[0], row[1], row[2], row[3]) if row else None
|
||||
|
||||
async def mark_nickserv_verified(self, network: str, nick: str) -> None:
|
||||
"""Promote a pending registration to verified."""
|
||||
assert self._db is not None
|
||||
await self._db.execute(
|
||||
"UPDATE nickserv_creds SET status = 'verified' WHERE network = ? AND nick = ?",
|
||||
(network, nick),
|
||||
)
|
||||
await self._db.commit()
|
||||
log.info("marked verified: %s/%s", network, nick)
|
||||
|
||||
async def list_nickserv_creds(
|
||||
self, network: str | None = None,
|
||||
) -> list[tuple[str, str, str, str, float, str]]:
|
||||
"""List NickServ credentials, optionally filtered by network.
|
||||
|
||||
Returns list of (network, nick, email, host, registered_at, status).
|
||||
"""
|
||||
assert self._db is not None
|
||||
if network:
|
||||
cursor = await self._db.execute(
|
||||
"SELECT network, nick, email, host, registered_at, status "
|
||||
"FROM nickserv_creds WHERE network = ? ORDER BY registered_at DESC",
|
||||
(network,),
|
||||
)
|
||||
else:
|
||||
cursor = await self._db.execute(
|
||||
"SELECT network, nick, email, host, registered_at, status "
|
||||
"FROM nickserv_creds ORDER BY network, registered_at DESC",
|
||||
)
|
||||
return await cursor.fetchall()
|
||||
|
||||
async def delete_nickserv_creds(self, network: str, nick: str) -> None:
|
||||
"""Remove stored credentials for a nick."""
|
||||
assert self._db is not None
|
||||
await self._db.execute(
|
||||
"DELETE FROM nickserv_creds WHERE network = ? AND nick = ?",
|
||||
(network, nick),
|
||||
)
|
||||
await self._db.commit()
|
||||
|
||||
async def _max_id(self, network: str) -> int:
|
||||
"""Get the maximum message ID for a network."""
|
||||
assert self._db is not None
|
||||
|
||||
@@ -6,6 +6,7 @@ import asyncio
|
||||
import logging
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from bouncer import commands
|
||||
from bouncer.irc import IRCMessage, parse
|
||||
from bouncer.namespace import encode_channel
|
||||
|
||||
@@ -109,6 +110,13 @@ class Client:
|
||||
if msg.command == "QUIT":
|
||||
return
|
||||
|
||||
# Intercept bouncer control commands
|
||||
if msg.command == "PRIVMSG" and len(msg.params) >= 2:
|
||||
target = msg.params[0].lower()
|
||||
if target in ("*bouncer", "bouncer"):
|
||||
await self._handle_bouncer_command(msg.params[1])
|
||||
return
|
||||
|
||||
if msg.command in FORWARD_COMMANDS:
|
||||
await self._router.route_client_message(msg)
|
||||
|
||||
@@ -220,6 +228,16 @@ class Client:
|
||||
params=[nick, ns_channel, "End of /NAMES list"],
|
||||
))
|
||||
|
||||
async def _handle_bouncer_command(self, text: str) -> None:
|
||||
"""Dispatch a bouncer control command and send responses."""
|
||||
lines = await commands.dispatch(text, self._router, self)
|
||||
for line in lines:
|
||||
self._send_msg(IRCMessage(
|
||||
command="NOTICE",
|
||||
params=[self._nick, line],
|
||||
prefix="*bouncer!bouncer@bouncer",
|
||||
))
|
||||
|
||||
def _send_msg(self, msg: IRCMessage) -> None:
|
||||
"""Send an IRCMessage to this client."""
|
||||
self.write(msg.format())
|
||||
|
||||
192
src/bouncer/commands.py
Normal file
192
src/bouncer/commands.py
Normal file
@@ -0,0 +1,192 @@
|
||||
"""Bouncer control commands via /msg *bouncer."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from bouncer.network import State
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bouncer.client import Client
|
||||
from bouncer.router import Router
|
||||
|
||||
# Set by __main__.py before entering the event loop
|
||||
STARTUP_TIME: float = 0.0
|
||||
|
||||
_COMMANDS: dict[str, str] = {
|
||||
"HELP": "List available commands",
|
||||
"STATUS": "Overview of all networks",
|
||||
"INFO": "Detailed info for a network (INFO <network>)",
|
||||
"UPTIME": "Bouncer uptime since start",
|
||||
"NETWORKS": "List configured networks",
|
||||
"CREDS": "NickServ credential status (CREDS [network])",
|
||||
}
|
||||
|
||||
|
||||
async def dispatch(text: str, router: Router, client: Client) -> list[str]:
|
||||
"""Parse and execute a bouncer command. Returns response lines."""
|
||||
parts = text.strip().split(None, 1)
|
||||
if not parts:
|
||||
return _cmd_help()
|
||||
|
||||
cmd = parts[0].upper()
|
||||
arg = parts[1].strip() if len(parts) > 1 else ""
|
||||
|
||||
if cmd == "HELP":
|
||||
return _cmd_help()
|
||||
if cmd == "STATUS":
|
||||
return _cmd_status(router)
|
||||
if cmd == "INFO":
|
||||
return await _cmd_info(router, arg)
|
||||
if cmd == "UPTIME":
|
||||
return _cmd_uptime()
|
||||
if cmd == "NETWORKS":
|
||||
return _cmd_networks(router)
|
||||
if cmd == "CREDS":
|
||||
return await _cmd_creds(router, arg or None)
|
||||
|
||||
return [f"Unknown command: {cmd}", "Use HELP for available commands."]
|
||||
|
||||
|
||||
def _cmd_help() -> list[str]:
|
||||
"""List available commands."""
|
||||
lines = ["[HELP]"]
|
||||
# Align command names
|
||||
width = max(len(c) for c in _COMMANDS)
|
||||
for cmd, desc in _COMMANDS.items():
|
||||
lines.append(f" {cmd:<{width}} {desc}")
|
||||
return lines
|
||||
|
||||
|
||||
def _state_label(state: State) -> str:
|
||||
"""Human-readable state label."""
|
||||
return {
|
||||
State.DISCONNECTED: "disconnected",
|
||||
State.CONNECTING: "connecting",
|
||||
State.REGISTERING: "registering",
|
||||
State.PROBATION: "probation",
|
||||
State.READY: "ready",
|
||||
}.get(state, str(state))
|
||||
|
||||
|
||||
def _cmd_status(router: Router) -> list[str]:
|
||||
"""Overview: state, nick, host per network."""
|
||||
lines = ["[STATUS]"]
|
||||
if not router.networks:
|
||||
lines.append(" (no networks configured)")
|
||||
return lines
|
||||
|
||||
# Calculate column widths
|
||||
name_w = max(len(n) for n in router.networks)
|
||||
state_w = max(len(_state_label(net.state)) for net in router.networks.values())
|
||||
|
||||
for name in sorted(router.networks):
|
||||
net = router.networks[name]
|
||||
label = _state_label(net.state)
|
||||
if net.state == State.READY:
|
||||
host = net.visible_host or "--"
|
||||
lines.append(f" {name:<{name_w}} {label:<{state_w}} {net.nick:<14} {host}")
|
||||
elif net.state == State.CONNECTING:
|
||||
attempt = net._reconnect_attempt
|
||||
lines.append(f" {name:<{name_w}} {label} (attempt {attempt})")
|
||||
else:
|
||||
lines.append(f" {name:<{name_w}} {label}")
|
||||
|
||||
return lines
|
||||
|
||||
|
||||
async def _cmd_info(router: Router, network_name: str) -> list[str]:
|
||||
"""Detailed info for a single network."""
|
||||
if not network_name:
|
||||
return ["Usage: INFO <network>"]
|
||||
|
||||
net = router.get_network(network_name.lower())
|
||||
if not net:
|
||||
names = ", ".join(sorted(router.networks))
|
||||
return [f"Unknown network: {network_name}", f"Available: {names}"]
|
||||
|
||||
lines = [f"[INFO] {net.cfg.name}"]
|
||||
lines.append(f" State {_state_label(net.state)}")
|
||||
lines.append(f" Server {net.cfg.host}:{net.cfg.port} (tls={'yes' if net.cfg.tls else 'no'})")
|
||||
lines.append(f" Nick {net.nick}")
|
||||
lines.append(f" Host {net.visible_host or '--'}")
|
||||
|
||||
if net.channels:
|
||||
lines.append(f" Channels {', '.join(sorted(net.channels))}")
|
||||
else:
|
||||
lines.append(" Channels (none)")
|
||||
|
||||
if router.backlog:
|
||||
rows = await router.backlog.list_nickserv_creds(net.cfg.name)
|
||||
if rows:
|
||||
_net, nick, email, _host, _ts, status = rows[0]
|
||||
lines.append(f" NickServ {nick} ({status})")
|
||||
else:
|
||||
lines.append(" NickServ (no credentials)")
|
||||
|
||||
return lines
|
||||
|
||||
|
||||
def _cmd_uptime() -> list[str]:
|
||||
"""Bouncer uptime since process start."""
|
||||
if not STARTUP_TIME:
|
||||
return ["[UPTIME] unknown"]
|
||||
|
||||
elapsed = time.time() - STARTUP_TIME
|
||||
days, rem = divmod(int(elapsed), 86400)
|
||||
hours, rem = divmod(rem, 3600)
|
||||
minutes, secs = divmod(rem, 60)
|
||||
|
||||
parts = []
|
||||
if days:
|
||||
parts.append(f"{days}d")
|
||||
if hours:
|
||||
parts.append(f"{hours}h")
|
||||
if minutes:
|
||||
parts.append(f"{minutes}m")
|
||||
parts.append(f"{secs}s")
|
||||
|
||||
return [f"[UPTIME] {' '.join(parts)}"]
|
||||
|
||||
|
||||
def _cmd_networks(router: Router) -> list[str]:
|
||||
"""List all configured networks."""
|
||||
lines = ["[NETWORKS]"]
|
||||
if not router.networks:
|
||||
lines.append(" (none)")
|
||||
return lines
|
||||
|
||||
for name in sorted(router.networks):
|
||||
net = router.networks[name]
|
||||
label = _state_label(net.state)
|
||||
lines.append(f" {name} {label}")
|
||||
|
||||
return lines
|
||||
|
||||
|
||||
async def _cmd_creds(router: Router, network_name: str | None) -> list[str]:
|
||||
"""Show stored NickServ credentials and their status."""
|
||||
if not router.backlog:
|
||||
return ["[CREDS] backlog not available"]
|
||||
|
||||
net_filter = network_name.lower() if network_name else None
|
||||
if net_filter and net_filter not in router.networks:
|
||||
names = ", ".join(sorted(router.networks))
|
||||
return [f"Unknown network: {network_name}", f"Available: {names}"]
|
||||
|
||||
rows = await router.backlog.list_nickserv_creds(net_filter)
|
||||
if not rows:
|
||||
scope = net_filter or "any network"
|
||||
return [f"[CREDS] no stored credentials for {scope}"]
|
||||
|
||||
lines = ["[CREDS]"]
|
||||
for net, nick, email, host, registered_at, status in rows:
|
||||
indicator = "+" if status == "verified" else "~"
|
||||
email_display = email if email else "--"
|
||||
lines.append(f" {indicator} {net} {nick} {status} {email_display}")
|
||||
|
||||
lines.append("")
|
||||
lines.append(" + verified ~ pending")
|
||||
|
||||
return lines
|
||||
227
tests/test_commands.py
Normal file
227
tests/test_commands.py
Normal file
@@ -0,0 +1,227 @@
|
||||
"""Tests for bouncer control commands."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from bouncer import commands
|
||||
from bouncer.network import State
|
||||
|
||||
|
||||
def _make_network(name: str, state: State, nick: str = "testnick",
|
||||
host: str | None = None, channels: set[str] | None = None) -> MagicMock:
|
||||
"""Create a mock Network."""
|
||||
net = MagicMock()
|
||||
net.cfg.name = name
|
||||
net.cfg.host = f"irc.{name}.chat"
|
||||
net.cfg.port = 6697
|
||||
net.cfg.tls = True
|
||||
net.state = state
|
||||
net.nick = nick
|
||||
net.visible_host = host
|
||||
net.channels = channels or set()
|
||||
net._reconnect_attempt = 0
|
||||
return net
|
||||
|
||||
|
||||
def _make_router(*networks: MagicMock) -> MagicMock:
|
||||
"""Create a mock Router with the given networks."""
|
||||
router = MagicMock()
|
||||
router.networks = {n.cfg.name: n for n in networks}
|
||||
router.network_names.return_value = [n.cfg.name for n in networks]
|
||||
router.get_network = lambda name: router.networks.get(name)
|
||||
router.backlog = AsyncMock()
|
||||
return router
|
||||
|
||||
|
||||
def _make_client(nick: str = "testuser") -> MagicMock:
|
||||
"""Create a mock Client."""
|
||||
client = MagicMock()
|
||||
client.nick = nick
|
||||
return client
|
||||
|
||||
|
||||
class TestHelp:
|
||||
@pytest.mark.asyncio
|
||||
async def test_help_lists_commands(self) -> None:
|
||||
router = _make_router()
|
||||
client = _make_client()
|
||||
lines = await commands.dispatch("HELP", router, client)
|
||||
assert lines[0] == "[HELP]"
|
||||
assert any("STATUS" in line for line in lines)
|
||||
assert any("UPTIME" in line for line in lines)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_empty_input_shows_help(self) -> None:
|
||||
router = _make_router()
|
||||
client = _make_client()
|
||||
lines = await commands.dispatch("", router, client)
|
||||
assert lines[0] == "[HELP]"
|
||||
|
||||
|
||||
class TestStatus:
|
||||
@pytest.mark.asyncio
|
||||
async def test_status_no_networks(self) -> None:
|
||||
router = _make_router()
|
||||
client = _make_client()
|
||||
lines = await commands.dispatch("STATUS", router, client)
|
||||
assert lines[0] == "[STATUS]"
|
||||
assert "(no networks configured)" in lines[1]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_status_ready_network(self) -> None:
|
||||
net = _make_network("libera", State.READY, nick="fabesune", host="user/fabesune")
|
||||
router = _make_router(net)
|
||||
client = _make_client()
|
||||
lines = await commands.dispatch("STATUS", router, client)
|
||||
assert lines[0] == "[STATUS]"
|
||||
assert "ready" in lines[1]
|
||||
assert "fabesune" in lines[1]
|
||||
assert "user/fabesune" in lines[1]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_status_connecting_shows_attempt(self) -> None:
|
||||
net = _make_network("hackint", State.CONNECTING)
|
||||
net._reconnect_attempt = 3
|
||||
router = _make_router(net)
|
||||
client = _make_client()
|
||||
lines = await commands.dispatch("STATUS", router, client)
|
||||
assert "connecting" in lines[1]
|
||||
assert "attempt 3" in lines[1]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_status_case_insensitive(self) -> None:
|
||||
net = _make_network("libera", State.READY, nick="testnick")
|
||||
router = _make_router(net)
|
||||
client = _make_client()
|
||||
lines = await commands.dispatch("status", router, client)
|
||||
assert lines[0] == "[STATUS]"
|
||||
|
||||
|
||||
class TestInfo:
|
||||
@pytest.mark.asyncio
|
||||
async def test_info_missing_arg(self) -> None:
|
||||
router = _make_router()
|
||||
client = _make_client()
|
||||
lines = await commands.dispatch("INFO", router, client)
|
||||
assert "Usage" in lines[0]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_info_unknown_network(self) -> None:
|
||||
net = _make_network("libera", State.READY)
|
||||
router = _make_router(net)
|
||||
client = _make_client()
|
||||
lines = await commands.dispatch("INFO fakenet", router, client)
|
||||
assert "Unknown network" in lines[0]
|
||||
assert "libera" in lines[1]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_info_valid_network(self) -> None:
|
||||
net = _make_network("libera", State.READY, nick="fabesune",
|
||||
host="user/fabesune", channels={"#test", "#dev"})
|
||||
router = _make_router(net)
|
||||
router.backlog.list_nickserv_creds.return_value = [
|
||||
("libera", "fabesune", "test@mail.tm", "user/fabesune", 1700000000.0, "verified"),
|
||||
]
|
||||
client = _make_client()
|
||||
lines = await commands.dispatch("INFO libera", router, client)
|
||||
assert lines[0] == "[INFO] libera"
|
||||
assert any("ready" in line for line in lines)
|
||||
assert any("fabesune" in line for line in lines)
|
||||
assert any("#dev" in line or "#test" in line for line in lines)
|
||||
assert any("verified" in line for line in lines)
|
||||
|
||||
|
||||
class TestUptime:
|
||||
@pytest.mark.asyncio
|
||||
async def test_uptime(self) -> None:
|
||||
commands.STARTUP_TIME = time.time() - 3661 # 1h 1m 1s
|
||||
router = _make_router()
|
||||
client = _make_client()
|
||||
lines = await commands.dispatch("UPTIME", router, client)
|
||||
assert lines[0].startswith("[UPTIME]")
|
||||
assert "1h" in lines[0]
|
||||
assert "1m" in lines[0]
|
||||
assert "1s" in lines[0]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_uptime_unknown(self) -> None:
|
||||
commands.STARTUP_TIME = 0.0
|
||||
router = _make_router()
|
||||
client = _make_client()
|
||||
lines = await commands.dispatch("UPTIME", router, client)
|
||||
assert "unknown" in lines[0]
|
||||
|
||||
|
||||
class TestNetworks:
|
||||
@pytest.mark.asyncio
|
||||
async def test_networks_empty(self) -> None:
|
||||
router = _make_router()
|
||||
client = _make_client()
|
||||
lines = await commands.dispatch("NETWORKS", router, client)
|
||||
assert lines[0] == "[NETWORKS]"
|
||||
assert "(none)" in lines[1]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_networks_lists_all(self) -> None:
|
||||
libera = _make_network("libera", State.READY)
|
||||
oftc = _make_network("oftc", State.CONNECTING)
|
||||
router = _make_router(libera, oftc)
|
||||
client = _make_client()
|
||||
lines = await commands.dispatch("NETWORKS", router, client)
|
||||
assert lines[0] == "[NETWORKS]"
|
||||
assert any("libera" in line and "ready" in line for line in lines[1:])
|
||||
assert any("oftc" in line and "connecting" in line for line in lines[1:])
|
||||
|
||||
|
||||
class TestCreds:
|
||||
@pytest.mark.asyncio
|
||||
async def test_creds_no_backlog(self) -> None:
|
||||
router = _make_router()
|
||||
router.backlog = None
|
||||
client = _make_client()
|
||||
lines = await commands.dispatch("CREDS", router, client)
|
||||
assert "not available" in lines[0]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_creds_empty(self) -> None:
|
||||
router = _make_router()
|
||||
router.backlog.list_nickserv_creds.return_value = []
|
||||
client = _make_client()
|
||||
lines = await commands.dispatch("CREDS", router, client)
|
||||
assert "no stored credentials" in lines[0]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_creds_lists_entries(self) -> None:
|
||||
net = _make_network("libera", State.READY)
|
||||
router = _make_router(net)
|
||||
router.backlog.list_nickserv_creds.return_value = [
|
||||
("libera", "fabesune", "test@mail.tm", "user/fabesune", 1700000000.0, "verified"),
|
||||
("libera", "oldnick", "old@mail.tm", "old/host", 1699000000.0, "pending"),
|
||||
]
|
||||
client = _make_client()
|
||||
lines = await commands.dispatch("CREDS libera", router, client)
|
||||
assert lines[0] == "[CREDS]"
|
||||
assert any("+" in line and "fabesune" in line and "verified" in line for line in lines)
|
||||
assert any("~" in line and "oldnick" in line and "pending" in line for line in lines)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_creds_unknown_network(self) -> None:
|
||||
net = _make_network("libera", State.READY)
|
||||
router = _make_router(net)
|
||||
client = _make_client()
|
||||
lines = await commands.dispatch("CREDS fakenet", router, client)
|
||||
assert "Unknown network" in lines[0]
|
||||
|
||||
|
||||
class TestUnknownCommand:
|
||||
@pytest.mark.asyncio
|
||||
async def test_unknown_command(self) -> None:
|
||||
router = _make_router()
|
||||
client = _make_client()
|
||||
lines = await commands.dispatch("FOOBAR", router, client)
|
||||
assert "Unknown command" in lines[0]
|
||||
assert "HELP" in lines[1]
|
||||
Reference in New Issue
Block a user