feat: initial IRC bouncer implementation

Async Python IRC bouncer with SOCKS5 proxy support, multi-network
connections, password auth, and persistent SQLite backlog with replay.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
user
2026-02-19 11:29:59 +01:00
commit ced6232373
28 changed files with 2079 additions and 0 deletions

17
.gitignore vendored Normal file
View File

@@ -0,0 +1,17 @@
__pycache__/
*.py[cod]
*$py.class
*.so
.venv/
*.egg-info/
*.egg
dist/
build/
.eggs/
*.db
*.sqlite
*.sqlite3
.ruff_cache/
.pytest_cache/
.mypy_cache/
*.log

36
Makefile Normal file
View File

@@ -0,0 +1,36 @@
APP_NAME := bouncer
VENV := .venv
PIP := $(VENV)/bin/pip
PYTHON := $(VENV)/bin/python
BOUNCER := $(VENV)/bin/bouncer
.PHONY: help venv install dev lint fmt test run clean
help: ## Show this help
@grep -E '^[a-zA-Z_-]+:.*##' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*## "}; {printf " \033[38;5;110m%-12s\033[0m %s\n", $$1, $$2}'
venv: ## Create virtual environment
python3 -m venv $(VENV)
$(PIP) install --upgrade pip
install: venv ## Install package (editable)
$(PIP) install -e .
dev: venv ## Install with dev dependencies
$(PIP) install -e ".[dev]"
lint: ## Run linter
$(VENV)/bin/ruff check src/ tests/
fmt: ## Format code
$(VENV)/bin/black src/ tests/
$(VENV)/bin/ruff check --fix src/ tests/
test: ## Run tests
$(VENV)/bin/pytest -v
run: ## Run bouncer
$(BOUNCER) --config config/bouncer.toml
clean: ## Remove build artifacts
rm -rf $(VENV) dist build *.egg-info src/*.egg-info

46
PROJECT.md Normal file
View File

@@ -0,0 +1,46 @@
# Project: bouncer
## Purpose
IRC bouncer that maintains persistent connections to IRC servers through a SOCKS5 proxy, allowing IRC clients to connect/disconnect while keeping the session alive and replaying missed messages.
## Architecture
```
IRC Client(s) --> [bouncer:6667] --> Router --> [SOCKS5:1080] --> IRC Server(s)
|
Backlog
(SQLite)
```
### Components
| Module | Responsibility |
|--------|---------------|
| `irc.py` | IRC protocol parser/formatter (RFC 2812 subset) |
| `config.py` | TOML configuration loading and validation |
| `proxy.py` | SOCKS5 async connection wrapper |
| `network.py` | Persistent IRC server connection per network |
| `server.py` | TCP listener accepting IRC client connections |
| `client.py` | Per-client session and IRC handshake |
| `router.py` | Message routing between clients and networks |
| `backlog.py` | SQLite message storage and replay |
### Key Decisions
- **asyncio**: Single-threaded async for all I/O
- **python-socks**: Async SOCKS5 proxy support
- **aiosqlite**: Non-blocking SQLite for backlog
- **No IRC library**: Manual protocol handling (IRC is simple line-based)
## Dependencies
| Package | Version | Purpose |
|---------|---------|---------|
| python-socks[asyncio] | >=2.4 | SOCKS5 proxy |
| aiosqlite | >=0.19 | Async SQLite |
## Requirements
- Python 3.10+
- SOCKS5 proxy running on 127.0.0.1:1080

65
README.md Normal file
View File

@@ -0,0 +1,65 @@
# bouncer
IRC bouncer with SOCKS5 proxy support and persistent message backlog.
## Features
- Connect to multiple IRC networks simultaneously
- All outbound connections routed through SOCKS5 proxy
- Persistent message backlog (SQLite) with replay on reconnect
- Multiple clients can attach to the same network session
- Password authentication
- TLS support for IRC server connections
- Automatic reconnection with exponential backoff
- Nick collision handling
## Quick Start
```bash
# Clone and install
cd ~/git/bouncer
make dev
# Copy and edit config
cp config/bouncer.example.toml config/bouncer.toml
$EDITOR config/bouncer.toml
# Run
bouncer -c config/bouncer.toml -v
```
## Connect
From your IRC client, connect to `127.0.0.1:6667` with:
```
PASS networkname:yourpassword
```
Where `networkname` matches a `[networks.NAME]` section in your config.
## Configuration
See [config/bouncer.example.toml](config/bouncer.example.toml) for a full example.
## Documentation
| Document | Description |
|----------|-------------|
| [docs/INSTALL.md](docs/INSTALL.md) | Prerequisites and setup |
| [docs/USAGE.md](docs/USAGE.md) | Comprehensive guide |
| [docs/CHEATSHEET.md](docs/CHEATSHEET.md) | Quick reference |
| [docs/DEBUG.md](docs/DEBUG.md) | Troubleshooting |
## Development
```bash
make dev # Install with dev deps
make test # Run tests
make lint # Run linter
make fmt # Format code
```
## License
MIT

38
ROADMAP.md Normal file
View File

@@ -0,0 +1,38 @@
# Roadmap
## v0.1.0 (current)
- [x] IRC protocol parser/formatter
- [x] TOML configuration
- [x] SOCKS5 proxy connector
- [x] Multi-network support
- [x] Client authentication (password)
- [x] Persistent backlog (SQLite)
- [x] Backlog replay on reconnect
- [x] Automatic reconnection with backoff
- [x] Nick collision handling
- [x] TLS support
## v0.2.0
- [ ] Client-side TLS (accept TLS from clients)
- [ ] Per-network password support
- [ ] CTCP version/ping response
- [ ] Channel key support (JOIN #channel key)
- [ ] SASL authentication to IRC servers
- [ ] Configurable backlog format (timestamps)
## v0.3.0
- [ ] Web status page
- [ ] Hot config reload (SIGHUP)
- [ ] Systemd service file
- [ ] Per-client backlog tracking (multi-user)
- [ ] DCC passthrough
## v1.0.0
- [ ] Stable API
- [ ] Comprehensive test coverage
- [ ] Documentation complete
- [ ] Packaged for PyPI

16
TASKS.md Normal file
View File

@@ -0,0 +1,16 @@
# Tasks
## Current
- [x] P0: Core implementation (irc, config, proxy, network, client, server, router, backlog)
- [x] P0: Unit tests (irc, config, backlog)
- [x] P0: CLI and entry point
- [x] P0: Documentation
- [ ] P1: Integration testing with live IRC server
- [ ] P1: Verify SOCKS5 proxy connectivity end-to-end
## Next
- [ ] P2: Client-side TLS support
- [ ] P2: SASL authentication
- [ ] P3: Systemd service file

29
TODO.md Normal file
View File

@@ -0,0 +1,29 @@
# TODO
## Features
- [ ] Client TLS (accept encrypted client connections)
- [ ] SASL PLAIN/EXTERNAL for IRC server auth
- [ ] Channel key support
- [ ] CTCP VERSION/PING responses
- [ ] Hot config reload on SIGHUP
- [ ] Web status dashboard
- [ ] DCC passthrough
## Infrastructure
- [ ] Systemd unit file
- [ ] Containerfile for podman deployment
- [ ] PyPI packaging
## Testing
- [ ] Integration tests with mock IRC server
- [ ] SOCKS5 proxy failure tests
- [ ] Backlog replay edge cases
- [ ] Concurrent client attach/detach
## Documentation
- [ ] Architecture diagram
- [ ] Sequence diagrams for connection flow

View File

@@ -0,0 +1,29 @@
[bouncer]
bind = "127.0.0.1"
port = 6667
password = "changeme"
[bouncer.backlog]
max_messages = 10000
replay_on_connect = true
[proxy]
host = "127.0.0.1"
port = 1080
[networks.libera]
host = "irc.libera.chat"
port = 6697
tls = true
nick = "mynick"
user = "mynick"
realname = "bouncer user"
channels = ["#test"]
autojoin = true
# [networks.oftc]
# host = "irc.oftc.net"
# port = 6697
# tls = true
# nick = "mynick"
# channels = ["#debian"]

53
docs/CHEATSHEET.md Normal file
View File

@@ -0,0 +1,53 @@
# Cheatsheet
## Commands
```bash
bouncer -c config/bouncer.toml # Start with config
bouncer -c config/bouncer.toml -v # Start with debug output
bouncer --version # Show version
bouncer --help # Show help
```
## Development
```bash
make dev # Install with dev deps
make test # Run pytest
make lint # Run ruff
make fmt # Format with black + ruff
make run # Run with default config
make clean # Remove .venv and build artifacts
```
## Client Connection
```
PASS <network>:<password> # Authenticate + select network
PASS <password> # Authenticate, use first network
```
## Config Structure
```toml
[bouncer] # Listener settings
bind / port / password
[bouncer.backlog] # Backlog settings
max_messages / replay_on_connect
[proxy] # SOCKS5 proxy
host / port
[networks.<name>] # IRC server (repeatable)
host / port / tls
nick / user / realname
channels / autojoin / password
```
## Files
| Path | Purpose |
|------|---------|
| `config/bouncer.toml` | Active configuration |
| `config/bouncer.db` | SQLite backlog database |
| `config/bouncer.example.toml` | Example config template |

75
docs/DEBUG.md Normal file
View File

@@ -0,0 +1,75 @@
# Debugging
## Verbose Mode
```bash
bouncer -c config/bouncer.toml -v
```
Debug logging shows:
- SOCKS5 proxy connection attempts
- IRC server registration
- Client connect/disconnect events
- Message routing
- Backlog replay counts
## Common Issues
### "config not found"
Ensure the config path is correct:
```bash
bouncer -c /full/path/to/bouncer.toml
```
### Connection refused (SOCKS5 proxy)
Verify the proxy is running:
```bash
ss -tlnp | grep 1080
```
### Connection timeout to IRC server
Check the SOCKS5 proxy can reach the IRC server:
```bash
curl --socks5 127.0.0.1:1080 -v telnet://irc.libera.chat:6697
```
### Nick already in use
The bouncer appends `_` to the nick and retries. Check logs for:
```
WARNING bouncer.network [libera] nick in use, trying mynick_
```
### TLS certificate errors
If connecting to a server with a self-signed cert, this is currently not supported. All TLS connections use the system CA store.
## Inspecting the Backlog Database
```bash
sqlite3 config/bouncer.db
-- Recent messages
SELECT * FROM messages ORDER BY id DESC LIMIT 20;
-- Messages per network
SELECT network, COUNT(*) FROM messages GROUP BY network;
-- Client state
SELECT * FROM client_state;
```
## Log Format
```
HH:MM:SS LEVEL module message
```
Levels: `DEBUG`, `INFO`, `WARNING`, `ERROR`

42
docs/INSTALL.md Normal file
View File

@@ -0,0 +1,42 @@
# Installation
## Prerequisites
- Python 3.10+
- SOCKS5 proxy running on `127.0.0.1:1080`
## Setup
```bash
cd ~/git/bouncer
make dev
```
This creates `.venv/`, installs dependencies, and registers the `bouncer` command.
## Verify
```bash
bouncer --version
```
## Configuration
```bash
cp config/bouncer.example.toml config/bouncer.toml
```
Edit `config/bouncer.toml` with your network details. At minimum, set:
- `bouncer.password` -- client authentication password
- `networks.<name>.host` -- IRC server hostname
- `networks.<name>.nick` -- your IRC nickname
- `networks.<name>.channels` -- channels to auto-join
## Symlink
The `make dev` editable install registers `bouncer` in `.venv/bin/`. To make it available system-wide:
```bash
ln -sf ~/git/bouncer/.venv/bin/bouncer ~/.local/bin/bouncer
```

77
docs/USAGE.md Normal file
View File

@@ -0,0 +1,77 @@
# Usage
## Starting the Bouncer
```bash
bouncer -c config/bouncer.toml -v
```
| Flag | Description |
|------|-------------|
| `-c, --config PATH` | Config file (default: `config/bouncer.toml`) |
| `-v, --verbose` | Debug logging |
| `--version` | Show version |
## Connecting with an IRC Client
Configure your IRC client to connect to the bouncer:
| Setting | Value |
|---------|-------|
| Server | `127.0.0.1` |
| Port | `6667` (or as configured) |
| Password | `networkname:yourpassword` |
### Password Format
```
PASS <network>:<password>
```
- `network` -- matches a `[networks.NAME]` section in config
- `password` -- the `bouncer.password` value from config
If you omit the network prefix (`PASS yourpassword`), the first configured network is used.
### Client Examples
**irssi:**
```
/connect -password libera:mypassword 127.0.0.1 6667
```
**weechat:**
```
/server add bouncer 127.0.0.1/6667 -password=libera:mypassword
/connect bouncer
```
**hexchat:**
Set server password to `libera:mypassword` in the network settings.
## Multiple Networks
Define multiple `[networks.*]` sections in the config. Connect with different passwords to access each:
```
PASS libera:mypassword # connects to libera
PASS oftc:mypassword # connects to oftc
```
Multiple clients can attach to the same network simultaneously.
## Backlog
Messages are stored in `bouncer.db` (SQLite) next to the config file. When you reconnect, missed messages are automatically replayed.
Configure backlog in `bouncer.toml`:
```toml
[bouncer.backlog]
max_messages = 10000 # per network, 0 = unlimited
replay_on_connect = true # set false to disable replay
```
## Stopping
Press `Ctrl+C` or send `SIGTERM`. The bouncer shuts down gracefully.

42
pyproject.toml Normal file
View File

@@ -0,0 +1,42 @@
[build-system]
requires = ["setuptools>=68.0", "setuptools-scm"]
build-backend = "setuptools.build_meta"
[project]
name = "bouncer"
version = "0.1.0"
description = "IRC bouncer with SOCKS5 proxy support and persistent backlog"
requires-python = ">=3.10"
dependencies = [
"python-socks[asyncio]>=2.4",
"aiosqlite>=0.19",
]
[project.optional-dependencies]
dev = [
"ruff>=0.4",
"black>=24.0",
"pytest>=8.0",
"pytest-asyncio>=0.23",
]
[project.scripts]
bouncer = "bouncer.__main__:main"
[tool.setuptools.packages.find]
where = ["src"]
[tool.ruff]
target-version = "py310"
line-length = 100
[tool.ruff.lint]
select = ["E", "F", "W", "I"]
[tool.black]
line-length = 100
target-version = ["py310"]
[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "auto"

3
src/bouncer/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
"""IRC bouncer with SOCKS5 proxy support and persistent backlog."""
__version__ = "0.1.0"

80
src/bouncer/__main__.py Normal file
View File

@@ -0,0 +1,80 @@
"""Entry point for bouncer."""
from __future__ import annotations
import asyncio
import logging
import signal
import sys
from pathlib import Path
from bouncer.backlog import Backlog
from bouncer.cli import parse_args
from bouncer.config import load
from bouncer.router import Router
from bouncer.server import start
log = logging.getLogger("bouncer")
def _setup_logging(verbose: bool) -> None:
level = logging.DEBUG if verbose else logging.INFO
fmt = "\033[2m%(asctime)s\033[0m %(levelname)-5s \033[38;5;110m%(name)s\033[0m %(message)s"
datefmt = "%H:%M:%S"
logging.basicConfig(level=level, format=fmt, datefmt=datefmt)
async def _run(config_path: Path, verbose: bool) -> None:
_setup_logging(verbose)
cfg = load(config_path)
log.info("loaded config: %d network(s)", len(cfg.networks))
# Data directory alongside config
data_dir = config_path.parent
db_path = data_dir / "bouncer.db"
backlog = Backlog(db_path)
await backlog.open()
router = Router(cfg, backlog)
await router.start_networks()
server = await start(cfg.bouncer, router)
# Graceful shutdown on SIGINT/SIGTERM
loop = asyncio.get_running_loop()
stop_event = asyncio.Event()
def _signal_handler() -> None:
log.info("shutting down...")
stop_event.set()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, _signal_handler)
await stop_event.wait()
server.close()
await server.wait_closed()
await router.stop_networks()
await backlog.close()
log.info("shutdown complete")
def main() -> None:
"""CLI entry point."""
args = parse_args()
if not args.config.exists():
print(f"error: config not found: {args.config}", file=sys.stderr)
sys.exit(1)
try:
asyncio.run(_run(args.config, args.verbose))
except KeyboardInterrupt:
pass
if __name__ == "__main__":
main()

152
src/bouncer/backlog.py Normal file
View File

@@ -0,0 +1,152 @@
"""SQLite-backed persistent message backlog."""
from __future__ import annotations
import logging
import time
from dataclasses import dataclass
from pathlib import Path
import aiosqlite
log = logging.getLogger(__name__)
SCHEMA = """\
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
network TEXT NOT NULL,
target TEXT NOT NULL,
sender TEXT NOT NULL,
command TEXT NOT NULL,
content TEXT NOT NULL,
timestamp REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_net_ts ON messages(network, timestamp);
CREATE TABLE IF NOT EXISTS client_state (
network TEXT PRIMARY KEY,
last_seen_id INTEGER NOT NULL DEFAULT 0,
last_disconnect REAL
);
"""
@dataclass(slots=True)
class BacklogEntry:
"""A stored message."""
id: int
network: str
target: str
sender: str
command: str
content: str
timestamp: float
class Backlog:
"""Async SQLite backlog storage."""
def __init__(self, db_path: Path) -> None:
self._path = db_path
self._db: aiosqlite.Connection | None = None
async def open(self) -> None:
"""Open database and ensure schema exists."""
self._db = await aiosqlite.connect(self._path)
await self._db.executescript(SCHEMA)
await self._db.commit()
log.debug("backlog database opened: %s", self._path)
async def close(self) -> None:
"""Close the database connection."""
if self._db:
await self._db.close()
self._db = None
async def store(
self,
network: str,
target: str,
sender: str,
command: str,
content: str,
) -> int:
"""Store a message. Returns the message ID."""
assert self._db is not None
cursor = await self._db.execute(
"INSERT INTO messages (network, target, sender, command, content, timestamp) "
"VALUES (?, ?, ?, ?, ?, ?)",
(network, target, sender, command, content, time.time()),
)
await self._db.commit()
return cursor.lastrowid # type: ignore[return-value]
async def replay(self, network: str, since_id: int = 0) -> list[BacklogEntry]:
"""Fetch messages for a network since a given message ID."""
assert self._db is not None
cursor = await self._db.execute(
"SELECT id, network, target, sender, command, content, timestamp "
"FROM messages WHERE network = ? AND id > ? ORDER BY id",
(network, since_id),
)
rows = await cursor.fetchall()
return [BacklogEntry(*row) for row in rows]
async def get_last_seen(self, network: str) -> int:
"""Get the last seen message ID for a network."""
assert self._db is not None
cursor = await self._db.execute(
"SELECT last_seen_id FROM client_state WHERE network = ?",
(network,),
)
row = await cursor.fetchone()
return row[0] if row else 0
async def mark_seen(self, network: str, message_id: int) -> None:
"""Update the last seen message ID for a network."""
assert self._db is not None
await self._db.execute(
"INSERT INTO client_state (network, last_seen_id, last_disconnect) "
"VALUES (?, ?, ?) "
"ON CONFLICT(network) DO UPDATE SET last_seen_id = excluded.last_seen_id",
(network, message_id, time.time()),
)
await self._db.commit()
async def record_disconnect(self, network: str) -> None:
"""Record when the last client disconnected from a network."""
assert self._db is not None
last_id = await self._max_id(network)
await self._db.execute(
"INSERT INTO client_state (network, last_seen_id, last_disconnect) "
"VALUES (?, ?, ?) "
"ON CONFLICT(network) DO UPDATE SET "
"last_seen_id = excluded.last_seen_id, last_disconnect = excluded.last_disconnect",
(network, last_id, time.time()),
)
await self._db.commit()
async def prune(self, network: str, keep: int) -> int:
"""Delete old messages, keeping the most recent `keep` entries. Returns count deleted."""
if keep <= 0:
return 0
assert self._db is not None
cursor = await self._db.execute(
"DELETE FROM messages WHERE network = ? AND id NOT IN "
"(SELECT id FROM messages WHERE network = ? ORDER BY id DESC LIMIT ?)",
(network, network, keep),
)
await self._db.commit()
return cursor.rowcount # type: ignore[return-value]
async def _max_id(self, network: str) -> int:
"""Get the maximum message ID for a network."""
assert self._db is not None
cursor = await self._db.execute(
"SELECT COALESCE(MAX(id), 0) FROM messages WHERE network = ?",
(network,),
)
row = await cursor.fetchone()
return row[0] if row else 0

33
src/bouncer/cli.py Normal file
View File

@@ -0,0 +1,33 @@
"""Command-line interface."""
from __future__ import annotations
import argparse
from pathlib import Path
from bouncer import __version__
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
"""Parse command-line arguments."""
parser = argparse.ArgumentParser(
prog="bouncer",
description="IRC bouncer with SOCKS5 proxy support",
)
parser.add_argument(
"-c", "--config",
type=Path,
default=Path("config/bouncer.toml"),
help="path to configuration file (default: config/bouncer.toml)",
)
parser.add_argument(
"-v", "--verbose",
action="store_true",
help="enable debug logging",
)
parser.add_argument(
"--version",
action="version",
version=f"%(prog)s {__version__}",
)
return parser.parse_args(argv)

250
src/bouncer/client.py Normal file
View File

@@ -0,0 +1,250 @@
"""IRC client session handler."""
from __future__ import annotations
import asyncio
import logging
from typing import TYPE_CHECKING
from bouncer.irc import IRCMessage, parse
if TYPE_CHECKING:
from bouncer.network import Network
from bouncer.router import Router
log = logging.getLogger(__name__)
# Numeric replies for synthetic welcome
RPL_WELCOME = "001"
RPL_YOURHOST = "002"
RPL_CREATED = "003"
RPL_MYINFO = "004"
RPL_TOPIC = "332"
RPL_NAMREPLY = "353"
RPL_ENDOFNAMES = "366"
# Commands to forward from client to network
FORWARD_COMMANDS = {
"PRIVMSG", "NOTICE", "JOIN", "PART", "QUIT", "KICK", "MODE",
"TOPIC", "INVITE", "WHO", "WHOIS", "WHOWAS", "LIST", "NAMES",
"AWAY", "USERHOST", "ISON", "PING", "PONG",
}
class Client:
"""Handles a single IRC client connection to the bouncer."""
def __init__(
self,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
router: Router,
password: str,
) -> None:
self._reader = reader
self._writer = writer
self._router = router
self._password = password
self._network_name: str | None = None
self._network: Network | None = None
self._nick: str = "*"
self._user: str = "unknown"
self._realname: str = ""
self._authenticated: bool = False
self._registered: bool = False
self._got_pass: bool = False
self._got_nick: bool = False
self._got_user: bool = False
self._pass_raw: str = ""
self._addr = writer.get_extra_info("peername", ("?", 0))
async def handle(self) -> None:
"""Main client session loop."""
log.info("client connected from %s", self._addr)
buf = b""
try:
while True:
data = await self._reader.read(4096)
if not data:
break
buf += data
while b"\r\n" in buf:
line, buf = buf.split(b"\r\n", 1)
if not line:
continue
try:
msg = parse(line)
await self._handle_message(msg)
except Exception:
log.exception("error handling client message: %r", line)
except asyncio.CancelledError:
pass
except ConnectionResetError:
pass
except Exception:
log.exception("client error from %s", self._addr)
finally:
await self._cleanup()
def write(self, data: bytes) -> None:
"""Write raw bytes to the client."""
if not self._writer.is_closing():
self._writer.write(data)
async def _handle_message(self, msg: IRCMessage) -> None:
"""Process an IRC message from the client."""
if not self._registered:
await self._handle_registration(msg)
return
if msg.command == "PING":
self._send_msg(IRCMessage(command="PONG", params=msg.params, prefix="bouncer"))
return
if msg.command == "QUIT":
return
if msg.command in FORWARD_COMMANDS and self._network_name:
await self._router.client_to_network(self._network_name, msg)
async def _handle_registration(self, msg: IRCMessage) -> None:
"""Handle PASS/NICK/USER registration sequence."""
if msg.command == "CAP":
# Minimal CAP handling - just reject capabilities
if msg.params and msg.params[0] == "LS":
self._send_msg(IRCMessage(command="CAP", params=["*", "LS", ""]))
elif msg.params and msg.params[0] == "END":
pass
return
if msg.command == "PASS" and msg.params:
self._pass_raw = msg.params[0]
self._got_pass = True
elif msg.command == "NICK" and msg.params:
self._nick = msg.params[0]
self._got_nick = True
elif msg.command == "USER" and len(msg.params) >= 4:
self._user = msg.params[0]
self._realname = msg.params[3]
self._got_user = True
# Check if we have all three
if self._got_nick and self._got_user:
await self._complete_registration()
async def _complete_registration(self) -> None:
"""Validate credentials and attach to network."""
# Parse PASS: "network:password" or just "password" (use first network)
network_name: str | None = None
password: str = ""
if self._got_pass and ":" in self._pass_raw:
network_name, password = self._pass_raw.split(":", 1)
elif self._got_pass:
password = self._pass_raw
else:
self._send_error("Password required (PASS network:password)")
self._writer.close()
return
if password != self._password:
self._send_error("Invalid password")
self._writer.close()
return
# Resolve network
if not network_name:
# Default to first configured network
names = self._router.network_names()
if names:
network_name = names[0]
if not network_name:
self._send_error("No network specified and none configured")
self._writer.close()
return
self._authenticated = True
self._registered = True
self._network_name = network_name
# Attach to network
self._network = await self._router.attach(self, network_name)
if not self._network:
self._send_error(f"Unknown network: {network_name}")
self._writer.close()
return
log.info(
"client %s authenticated for network %s (nick=%s)",
self._addr, network_name, self._nick,
)
# Send synthetic welcome
await self._send_welcome()
async def _send_welcome(self) -> None:
"""Send IRC welcome sequence and channel state to client."""
assert self._network is not None
server_name = "bouncer"
nick = self._network.nick
self._send_msg(IRCMessage(
command=RPL_WELCOME, prefix=server_name,
params=[nick, f"Welcome to bouncer ({self._network_name})"],
))
self._send_msg(IRCMessage(
command=RPL_YOURHOST, prefix=server_name,
params=[nick, f"Your host is {server_name}"],
))
self._send_msg(IRCMessage(
command=RPL_CREATED, prefix=server_name,
params=[nick, "This server was created by bouncer"],
))
self._send_msg(IRCMessage(
command=RPL_MYINFO, prefix=server_name,
params=[nick, server_name, "bouncer-0.1", "o", "o"],
))
# Send channel state for joined channels
for channel in self._network.channels:
# Topic
topic = self._network.topics.get(channel, "")
if topic:
self._send_msg(IRCMessage(
command=RPL_TOPIC, prefix=server_name,
params=[nick, channel, topic],
))
# Names
names = self._network.names.get(channel, set())
if names:
name_str = " ".join(sorted(names))
self._send_msg(IRCMessage(
command=RPL_NAMREPLY, prefix=server_name,
params=[nick, "=", channel, name_str],
))
self._send_msg(IRCMessage(
command=RPL_ENDOFNAMES, prefix=server_name,
params=[nick, channel, "End of /NAMES list"],
))
def _send_msg(self, msg: IRCMessage) -> None:
"""Send an IRCMessage to this client."""
self.write(msg.format())
def _send_error(self, text: str) -> None:
"""Send an ERROR message to the client."""
self._send_msg(IRCMessage(command="ERROR", params=[text]))
async def _cleanup(self) -> None:
"""Detach from network and close connection."""
log.info("client disconnected from %s", self._addr)
if self._network_name:
await self._router.detach(self, self._network_name)
if not self._writer.is_closing():
self._writer.close()

108
src/bouncer/config.py Normal file
View File

@@ -0,0 +1,108 @@
"""Configuration loader and validation."""
from __future__ import annotations
import sys
from dataclasses import dataclass, field
from pathlib import Path
if sys.version_info >= (3, 11):
import tomllib
else:
try:
import tomllib
except ModuleNotFoundError:
import tomli as tomllib # type: ignore[no-redef]
@dataclass(slots=True)
class BacklogConfig:
"""Backlog storage settings."""
max_messages: int = 10000
replay_on_connect: bool = True
@dataclass(slots=True)
class ProxyConfig:
"""SOCKS5 proxy settings."""
host: str = "127.0.0.1"
port: int = 1080
@dataclass(slots=True)
class NetworkConfig:
"""IRC network connection settings."""
name: str
host: str
port: int = 6667
tls: bool = False
nick: str = "bouncer"
user: str = "bouncer"
realname: str = "bouncer"
channels: list[str] = field(default_factory=list)
autojoin: bool = True
password: str | None = None
@dataclass(slots=True)
class BouncerConfig:
"""Main bouncer settings."""
bind: str = "127.0.0.1"
port: int = 6667
password: str = "changeme"
backlog: BacklogConfig = field(default_factory=BacklogConfig)
@dataclass(slots=True)
class Config:
"""Top-level configuration."""
bouncer: BouncerConfig
proxy: ProxyConfig
networks: dict[str, NetworkConfig]
def load(path: Path) -> Config:
"""Load and validate configuration from a TOML file."""
with open(path, "rb") as f:
raw = tomllib.load(f)
bouncer_raw = raw.get("bouncer", {})
backlog_raw = bouncer_raw.pop("backlog", {})
bouncer = BouncerConfig(
bind=bouncer_raw.get("bind", "127.0.0.1"),
port=bouncer_raw.get("port", 6667),
password=bouncer_raw.get("password", "changeme"),
backlog=BacklogConfig(**backlog_raw),
)
proxy_raw = raw.get("proxy", {})
proxy = ProxyConfig(
host=proxy_raw.get("host", "127.0.0.1"),
port=proxy_raw.get("port", 1080),
)
networks: dict[str, NetworkConfig] = {}
for name, net_raw in raw.get("networks", {}).items():
networks[name] = NetworkConfig(
name=name,
host=net_raw["host"],
port=net_raw.get("port", 6697 if net_raw.get("tls", False) else 6667),
tls=net_raw.get("tls", False),
nick=net_raw.get("nick", "bouncer"),
user=net_raw.get("user", net_raw.get("nick", "bouncer")),
realname=net_raw.get("realname", "bouncer"),
channels=net_raw.get("channels", []),
autojoin=net_raw.get("autojoin", True),
password=net_raw.get("password"),
)
if not networks:
raise ValueError("at least one network must be configured")
return Config(bouncer=bouncer, proxy=proxy, networks=networks)

108
src/bouncer/irc.py Normal file
View File

@@ -0,0 +1,108 @@
"""IRC message parser and formatter (RFC 2812 subset)."""
from __future__ import annotations
from dataclasses import dataclass, field
@dataclass(slots=True)
class IRCMessage:
"""Parsed IRC protocol message."""
command: str
params: list[str] = field(default_factory=list)
prefix: str | None = None
tags: dict[str, str | None] = field(default_factory=dict)
@property
def trailing(self) -> str | None:
"""Return the trailing parameter (last param), or None."""
return self.params[-1] if self.params else None
def format(self) -> bytes:
"""Serialize to IRC wire format (with CRLF)."""
parts: list[str] = []
if self.tags:
tag_str = ";".join(
f"{k}={v}" if v is not None else k for k, v in self.tags.items()
)
parts.append(f"@{tag_str}")
if self.prefix:
parts.append(f":{self.prefix}")
parts.append(self.command)
if self.params:
for i, param in enumerate(self.params):
if i == len(self.params) - 1 and (
" " in param or param.startswith(":") or not param
):
parts.append(f":{param}")
else:
parts.append(param)
return (_encode(" ".join(parts)) + b"\r\n")
def parse(data: bytes) -> IRCMessage:
"""Parse a single IRC line (without trailing CRLF) into an IRCMessage."""
line = _decode(data.rstrip(b"\r\n"))
pos = 0
# Parse IRCv3 tags (@key=value;key2)
tags: dict[str, str | None] = {}
if line.startswith("@"):
space = line.index(" ", 1)
tag_str = line[1:space]
for tag in tag_str.split(";"):
if "=" in tag:
k, v = tag.split("=", 1)
tags[k] = v
else:
tags[tag] = None
pos = space + 1
# Parse prefix (:nick!user@host)
prefix: str | None = None
if pos < len(line) and line[pos] == ":":
space = line.index(" ", pos + 1)
prefix = line[pos + 1 : space]
pos = space + 1
# Parse command and params
rest = line[pos:]
if " :" in rest:
head, trailing = rest.split(" :", 1)
parts = head.split()
command = parts[0].upper()
params = parts[1:] + [trailing]
else:
parts = rest.split()
command = parts[0].upper()
params = parts[1:]
return IRCMessage(command=command, params=params, prefix=prefix, tags=tags)
def parse_prefix(prefix: str) -> tuple[str, str | None, str | None]:
"""Split prefix into (nick, user, host). User/host may be None."""
if "!" in prefix:
nick, rest = prefix.split("!", 1)
if "@" in rest:
user, host = rest.split("@", 1)
return nick, user, host
return nick, rest, None
if "@" in prefix:
nick, host = prefix.split("@", 1)
return nick, None, host
return prefix, None, None
def _decode(data: bytes) -> str:
"""Decode IRC bytes, UTF-8 with latin-1 fallback."""
try:
return data.decode("utf-8")
except UnicodeDecodeError:
return data.decode("latin-1")
def _encode(text: str) -> bytes:
"""Encode IRC string to bytes."""
return text.encode("utf-8")

222
src/bouncer/network.py Normal file
View File

@@ -0,0 +1,222 @@
"""Persistent IRC server connection manager."""
from __future__ import annotations
import asyncio
import logging
from typing import Callable
from bouncer.config import NetworkConfig, ProxyConfig
from bouncer.irc import IRCMessage, parse
log = logging.getLogger(__name__)
BACKOFF_STEPS = [5, 10, 30, 60, 120, 300]
class Network:
"""Manages a persistent connection to a single IRC server."""
def __init__(
self,
cfg: NetworkConfig,
proxy_cfg: ProxyConfig,
on_message: Callable[[str, IRCMessage], None] | None = None,
) -> None:
self.cfg = cfg
self.proxy_cfg = proxy_cfg
self.on_message = on_message
self.nick: str = cfg.nick
self.channels: set[str] = set()
self.connected: bool = False
self.registered: bool = False
self._reader: asyncio.StreamReader | None = None
self._writer: asyncio.StreamWriter | None = None
self._reconnect_attempt: int = 0
self._running: bool = False
self._read_task: asyncio.Task[None] | None = None
# Channel state: topic + names per channel
self.topics: dict[str, str] = {}
self.names: dict[str, set[str]] = {}
async def start(self) -> None:
"""Start the network connection loop."""
self._running = True
await self._connect()
async def stop(self) -> None:
"""Disconnect and stop reconnection."""
self._running = False
if self._read_task and not self._read_task.done():
self._read_task.cancel()
await self._disconnect()
async def send(self, msg: IRCMessage) -> None:
"""Send an IRC message to the server."""
if self._writer and not self._writer.is_closing():
self._writer.write(msg.format())
await self._writer.drain()
async def send_raw(self, command: str, *params: str) -> None:
"""Build and send an IRC message."""
await self.send(IRCMessage(command=command, params=list(params)))
async def _connect(self) -> None:
"""Establish connection via SOCKS5 proxy and register."""
from bouncer.proxy import connect
try:
log.info(
"[%s] connecting to %s:%d (tls=%s)",
self.cfg.name, self.cfg.host, self.cfg.port, self.cfg.tls,
)
self._reader, self._writer = await connect(
self.cfg.host,
self.cfg.port,
self.proxy_cfg,
tls=self.cfg.tls,
)
self.connected = True
self._reconnect_attempt = 0
log.info("[%s] connected", self.cfg.name)
# IRC registration
if self.cfg.password:
await self.send_raw("PASS", self.cfg.password)
await self.send_raw("NICK", self.cfg.nick)
await self.send_raw(
"USER", self.cfg.user, "0", "*", self.cfg.realname,
)
# Start reading
self._read_task = asyncio.create_task(self._read_loop())
except Exception:
log.exception("[%s] connection failed", self.cfg.name)
self.connected = False
if self._running:
await self._schedule_reconnect()
async def _disconnect(self) -> None:
"""Close the connection."""
self.connected = False
self.registered = False
if self._writer and not self._writer.is_closing():
try:
self._writer.close()
await self._writer.wait_closed()
except Exception:
pass
self._reader = None
self._writer = None
async def _schedule_reconnect(self) -> None:
"""Wait with exponential backoff, then reconnect."""
delay = BACKOFF_STEPS[min(self._reconnect_attempt, len(BACKOFF_STEPS) - 1)]
self._reconnect_attempt += 1
log.info(
"[%s] reconnecting in %ds (attempt %d)",
self.cfg.name, delay, self._reconnect_attempt,
)
await asyncio.sleep(delay)
if self._running:
await self._connect()
async def _read_loop(self) -> None:
"""Read and dispatch messages from the IRC server."""
assert self._reader is not None
buf = b""
try:
while self._running and self.connected:
data = await self._reader.read(4096)
if not data:
log.warning("[%s] server closed connection", self.cfg.name)
break
buf += data
while b"\r\n" in buf:
line, buf = buf.split(b"\r\n", 1)
if not line:
continue
try:
msg = parse(line)
await self._handle(msg)
except Exception:
log.exception("[%s] failed to parse: %r", self.cfg.name, line)
except asyncio.CancelledError:
return
except Exception:
log.exception("[%s] read loop error", self.cfg.name)
finally:
await self._disconnect()
if self._running:
await self._schedule_reconnect()
async def _handle(self, msg: IRCMessage) -> None:
"""Handle an IRC message from the server."""
if msg.command == "PING":
await self.send_raw("PONG", *msg.params)
return
if msg.command == "001":
# RPL_WELCOME - registration complete
self.registered = True
self.nick = msg.params[0] if msg.params else self.cfg.nick
log.info("[%s] registered as %s", self.cfg.name, self.nick)
if self.cfg.autojoin and self.cfg.channels:
for ch in self.cfg.channels:
await self.send_raw("JOIN", ch)
elif msg.command == "JOIN" and msg.prefix:
nick = msg.prefix.split("!")[0]
channel = msg.params[0] if msg.params else ""
if nick == self.nick:
self.channels.add(channel)
self.names.setdefault(channel, set())
log.info("[%s] joined %s", self.cfg.name, channel)
elif msg.command == "PART" and msg.prefix:
nick = msg.prefix.split("!")[0]
channel = msg.params[0] if msg.params else ""
if nick == self.nick:
self.channels.discard(channel)
self.names.pop(channel, None)
self.topics.pop(channel, None)
elif msg.command == "332":
# RPL_TOPIC
if len(msg.params) >= 3:
self.topics[msg.params[1]] = msg.params[2]
elif msg.command == "353":
# RPL_NAMREPLY
if len(msg.params) >= 4:
channel = msg.params[2]
nicks = msg.params[3].split()
self.names.setdefault(channel, set()).update(nicks)
elif msg.command == "366":
# RPL_ENDOFNAMES
pass
elif msg.command == "433":
# ERR_NICKNAMEINUSE - append underscore
self.nick = self.nick + "_"
await self.send_raw("NICK", self.nick)
log.warning("[%s] nick in use, trying %s", self.cfg.name, self.nick)
elif msg.command == "KICK" and msg.params:
channel = msg.params[0]
kicked = msg.params[1] if len(msg.params) > 1 else ""
if kicked == self.nick:
self.channels.discard(channel)
log.warning("[%s] kicked from %s", self.cfg.name, channel)
# Rejoin after a brief delay
await asyncio.sleep(3)
if channel in {c for c in self.cfg.channels} and self._running:
await self.send_raw("JOIN", channel)
# Forward to router
if self.on_message:
self.on_message(self.cfg.name, msg)

46
src/bouncer/proxy.py Normal file
View File

@@ -0,0 +1,46 @@
"""SOCKS5 async connection wrapper."""
from __future__ import annotations
import asyncio
import logging
import ssl
from python_socks.async_.asyncio import Proxy
from bouncer.config import ProxyConfig
log = logging.getLogger(__name__)
async def connect(
host: str,
port: int,
proxy_cfg: ProxyConfig,
tls: bool = False,
) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
"""Open a TCP connection through the SOCKS5 proxy.
Returns an (asyncio.StreamReader, asyncio.StreamWriter) pair.
If tls=True, the connection is wrapped in SSL after the SOCKS5 handshake.
"""
proxy = Proxy.from_url(f"socks5://{proxy_cfg.host}:{proxy_cfg.port}")
log.debug("connecting to %s:%d via socks5://%s:%d", host, port, proxy_cfg.host, proxy_cfg.port)
sock = await proxy.connect(dest_host=host, dest_port=port)
ssl_ctx: ssl.SSLContext | None = None
if tls:
ssl_ctx = ssl.create_default_context()
reader, writer = await asyncio.open_connection(
host=None,
port=None,
sock=sock.socket,
ssl=ssl_ctx,
server_hostname=host if tls else None,
)
log.debug("connected to %s:%d (tls=%s)", host, port, tls)
return reader, writer

148
src/bouncer/router.py Normal file
View File

@@ -0,0 +1,148 @@
"""Message router between IRC clients and network connections."""
from __future__ import annotations
import asyncio
import logging
from typing import TYPE_CHECKING
from bouncer.backlog import Backlog
from bouncer.config import Config
from bouncer.irc import IRCMessage, parse_prefix
from bouncer.network import Network
if TYPE_CHECKING:
from bouncer.client import Client
log = logging.getLogger(__name__)
# Commands worth storing in backlog
BACKLOG_COMMANDS = {"PRIVMSG", "NOTICE", "TOPIC", "KICK", "MODE"}
class Router:
"""Central message hub linking clients to networks."""
def __init__(self, config: Config, backlog: Backlog) -> None:
self.config = config
self.backlog = backlog
self.networks: dict[str, Network] = {}
self.clients: dict[str, list[Client]] = {} # network_name -> clients
self._lock = asyncio.Lock()
async def start_networks(self) -> None:
"""Connect to all configured networks."""
for name, net_cfg in self.config.networks.items():
network = Network(
cfg=net_cfg,
proxy_cfg=self.config.proxy,
on_message=self._on_network_message,
)
self.networks[name] = network
self.clients[name] = []
asyncio.create_task(network.start())
async def stop_networks(self) -> None:
"""Disconnect all networks."""
for network in self.networks.values():
await network.stop()
async def attach(self, client: Client, network_name: str) -> Network | None:
"""Attach a client to a network. Returns the network or None if not found."""
if network_name not in self.networks:
return None
async with self._lock:
self.clients[network_name].append(client)
network = self.networks[network_name]
client_count = len(self.clients[network_name])
log.info("client attached to %s (%d clients)", network_name, client_count)
# Replay backlog
if self.config.bouncer.backlog.replay_on_connect:
await self._replay_backlog(client, network_name)
return network
async def detach(self, client: Client, network_name: str) -> None:
"""Detach a client from a network."""
async with self._lock:
if network_name in self.clients:
try:
self.clients[network_name].remove(client)
except ValueError:
pass
remaining = len(self.clients.get(network_name, []))
log.info("client detached from %s (%d remaining)", network_name, remaining)
if remaining == 0:
await self.backlog.record_disconnect(network_name)
async def client_to_network(self, network_name: str, msg: IRCMessage) -> None:
"""Forward a client command to the network."""
network = self.networks.get(network_name)
if network and network.connected:
await network.send(msg)
def _on_network_message(self, network_name: str, msg: IRCMessage) -> None:
"""Handle a message from an IRC server (called synchronously from network)."""
asyncio.create_task(self._dispatch(network_name, msg))
async def _dispatch(self, network_name: str, msg: IRCMessage) -> None:
"""Dispatch a network message to attached clients and backlog."""
# Store in backlog for relevant commands
if msg.command in BACKLOG_COMMANDS and msg.params:
target = msg.params[0]
sender = parse_prefix(msg.prefix)[0] if msg.prefix else ""
content = msg.params[1] if len(msg.params) > 1 else ""
await self.backlog.store(network_name, target, sender, msg.command, content)
# Prune if configured
max_msgs = self.config.bouncer.backlog.max_messages
if max_msgs > 0:
await self.backlog.prune(network_name, keep=max_msgs)
# Forward to all attached clients
clients = self.clients.get(network_name, [])
data = msg.format()
for client in clients:
try:
client.write(data)
except Exception:
log.exception("failed to write to client")
async def _replay_backlog(self, client: Client, network_name: str) -> None:
"""Replay missed messages to a newly connected client."""
since_id = await self.backlog.get_last_seen(network_name)
entries = await self.backlog.replay(network_name, since_id=since_id)
if not entries:
return
log.info("replaying %d messages for %s", len(entries), network_name)
for entry in entries:
msg = IRCMessage(
command=entry.command,
params=[entry.target, entry.content],
prefix=entry.sender,
)
try:
client.write(msg.format())
except Exception:
log.exception("failed to replay to client")
break
# Mark the latest as seen
if entries:
await self.backlog.mark_seen(network_name, entries[-1].id)
def network_names(self) -> list[str]:
"""Return available network names."""
return list(self.networks.keys())
def get_network(self, name: str) -> Network | None:
"""Get a network by name."""
return self.networks.get(name)

34
src/bouncer/server.py Normal file
View File

@@ -0,0 +1,34 @@
"""TCP server accepting IRC client connections."""
from __future__ import annotations
import asyncio
import logging
from bouncer.client import Client
from bouncer.config import BouncerConfig
from bouncer.router import Router
log = logging.getLogger(__name__)
async def start(config: BouncerConfig, router: Router) -> asyncio.Server:
"""Start the client listener and return the server object."""
async def _handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
client = Client(reader, writer, router, config.password)
try:
await client.handle()
except Exception:
log.exception("unhandled client error")
server = await asyncio.start_server(
_handle,
host=config.bind,
port=config.port,
)
addrs = ", ".join(str(s.getsockname()) for s in server.sockets)
log.info("listening on %s", addrs)
return server

0
tests/__init__.py Normal file
View File

89
tests/test_backlog.py Normal file
View File

@@ -0,0 +1,89 @@
"""Tests for SQLite backlog storage."""
import tempfile
from pathlib import Path
import pytest
from bouncer.backlog import Backlog
@pytest.fixture
async def backlog():
db_path = Path(tempfile.mktemp(suffix=".db"))
bl = Backlog(db_path)
await bl.open()
yield bl
await bl.close()
db_path.unlink(missing_ok=True)
class TestBacklog:
async def test_store_and_replay(self, backlog: Backlog):
msg_id = await backlog.store("libera", "#test", "nick", "PRIVMSG", "hello")
assert msg_id > 0
entries = await backlog.replay("libera")
assert len(entries) == 1
assert entries[0].content == "hello"
assert entries[0].sender == "nick"
assert entries[0].target == "#test"
async def test_replay_since_id(self, backlog: Backlog):
id1 = await backlog.store("libera", "#test", "a", "PRIVMSG", "first")
id2 = await backlog.store("libera", "#test", "b", "PRIVMSG", "second")
id3 = await backlog.store("libera", "#test", "c", "PRIVMSG", "third")
entries = await backlog.replay("libera", since_id=id1)
assert len(entries) == 2
assert entries[0].id == id2
assert entries[1].id == id3
async def test_replay_empty(self, backlog: Backlog):
entries = await backlog.replay("nonexistent")
assert entries == []
async def test_network_isolation(self, backlog: Backlog):
await backlog.store("libera", "#test", "a", "PRIVMSG", "libera msg")
await backlog.store("oftc", "#test", "b", "PRIVMSG", "oftc msg")
libera = await backlog.replay("libera")
oftc = await backlog.replay("oftc")
assert len(libera) == 1
assert len(oftc) == 1
assert libera[0].content == "libera msg"
assert oftc[0].content == "oftc msg"
async def test_mark_and_get_last_seen(self, backlog: Backlog):
assert await backlog.get_last_seen("libera") == 0
await backlog.mark_seen("libera", 42)
assert await backlog.get_last_seen("libera") == 42
await backlog.mark_seen("libera", 100)
assert await backlog.get_last_seen("libera") == 100
async def test_prune(self, backlog: Backlog):
for i in range(10):
await backlog.store("libera", "#test", "n", "PRIVMSG", f"msg{i}")
deleted = await backlog.prune("libera", keep=3)
assert deleted == 7
entries = await backlog.replay("libera")
assert len(entries) == 3
assert entries[0].content == "msg7"
async def test_prune_zero_keeps_all(self, backlog: Backlog):
for i in range(5):
await backlog.store("libera", "#test", "n", "PRIVMSG", f"msg{i}")
deleted = await backlog.prune("libera", keep=0)
assert deleted == 0
assert len(await backlog.replay("libera")) == 5
async def test_record_disconnect(self, backlog: Backlog):
await backlog.store("libera", "#test", "n", "PRIVMSG", "msg")
await backlog.record_disconnect("libera")
last = await backlog.get_last_seen("libera")
assert last > 0

112
tests/test_config.py Normal file
View File

@@ -0,0 +1,112 @@
"""Tests for configuration loading."""
import tempfile
from pathlib import Path
import pytest
from bouncer.config import load
MINIMAL_CONFIG = """\
[bouncer]
password = "secret"
[proxy]
host = "127.0.0.1"
port = 1080
[networks.test]
host = "irc.example.com"
"""
FULL_CONFIG = """\
[bouncer]
bind = "0.0.0.0"
port = 6668
password = "hunter2"
[bouncer.backlog]
max_messages = 5000
replay_on_connect = false
[proxy]
host = "10.0.0.1"
port = 9050
[networks.libera]
host = "irc.libera.chat"
port = 6697
tls = true
nick = "testbot"
user = "testuser"
realname = "Test Bot"
channels = ["#test", "#dev"]
autojoin = true
[networks.oftc]
host = "irc.oftc.net"
port = 6697
tls = true
nick = "testbot"
channels = ["#debian"]
"""
def _write_config(content: str) -> Path:
f = tempfile.NamedTemporaryFile(mode="w", suffix=".toml", delete=False)
f.write(content)
f.close()
return Path(f.name)
class TestLoad:
def test_minimal(self):
cfg = load(_write_config(MINIMAL_CONFIG))
assert cfg.bouncer.password == "secret"
assert cfg.bouncer.bind == "127.0.0.1"
assert cfg.bouncer.port == 6667
assert cfg.bouncer.backlog.max_messages == 10000
assert cfg.proxy.host == "127.0.0.1"
assert "test" in cfg.networks
net = cfg.networks["test"]
assert net.host == "irc.example.com"
assert net.port == 6667
assert net.tls is False
def test_full(self):
cfg = load(_write_config(FULL_CONFIG))
assert cfg.bouncer.bind == "0.0.0.0"
assert cfg.bouncer.port == 6668
assert cfg.bouncer.backlog.max_messages == 5000
assert cfg.bouncer.backlog.replay_on_connect is False
assert cfg.proxy.port == 9050
assert len(cfg.networks) == 2
libera = cfg.networks["libera"]
assert libera.tls is True
assert libera.port == 6697
assert libera.channels == ["#test", "#dev"]
assert libera.nick == "testbot"
def test_no_networks_raises(self):
config = """\
[bouncer]
password = "x"
[proxy]
"""
with pytest.raises(ValueError, match="at least one network"):
load(_write_config(config))
def test_tls_default_port(self):
config = """\
[bouncer]
password = "x"
[proxy]
[networks.test]
host = "irc.example.com"
tls = true
"""
cfg = load(_write_config(config))
assert cfg.networks["test"].port == 6697

129
tests/test_irc.py Normal file
View File

@@ -0,0 +1,129 @@
"""Tests for IRC message parsing and formatting."""
from bouncer.irc import IRCMessage, parse, parse_prefix
class TestParse:
def test_simple_command(self):
msg = parse(b"PING")
assert msg.command == "PING"
assert msg.params == []
assert msg.prefix is None
def test_command_with_param(self):
msg = parse(b"PING :server.example.com")
assert msg.command == "PING"
assert msg.params == ["server.example.com"]
def test_privmsg(self):
msg = parse(b":nick!user@host PRIVMSG #channel :Hello world")
assert msg.prefix == "nick!user@host"
assert msg.command == "PRIVMSG"
assert msg.params == ["#channel", "Hello world"]
def test_numeric_reply(self):
msg = parse(b":server 001 nick :Welcome to the network")
assert msg.prefix == "server"
assert msg.command == "001"
assert msg.params == ["nick", "Welcome to the network"]
def test_join(self):
msg = parse(b":nick!user@host JOIN #channel")
assert msg.command == "JOIN"
assert msg.params == ["#channel"]
def test_nick_user_registration(self):
msg = parse(b"NICK mynick")
assert msg.command == "NICK"
assert msg.params == ["mynick"]
def test_user_command(self):
msg = parse(b"USER myuser 0 * :Real Name")
assert msg.command == "USER"
assert msg.params == ["myuser", "0", "*", "Real Name"]
def test_pass_with_network(self):
msg = parse(b"PASS libera:secretpass")
assert msg.command == "PASS"
assert msg.params == ["libera:secretpass"]
def test_quit_with_message(self):
msg = parse(b":nick!user@host QUIT :Gone fishing")
assert msg.command == "QUIT"
assert msg.params == ["Gone fishing"]
def test_mode(self):
msg = parse(b":nick!user@host MODE #channel +o othernick")
assert msg.command == "MODE"
assert msg.params == ["#channel", "+o", "othernick"]
def test_crlf_stripped(self):
msg = parse(b"PING :test\r\n")
assert msg.command == "PING"
assert msg.params == ["test"]
def test_ircv3_tags(self):
msg = parse(b"@time=2024-01-01T00:00:00Z :nick!user@host PRIVMSG #ch :hi")
assert msg.tags == {"time": "2024-01-01T00:00:00Z"}
assert msg.command == "PRIVMSG"
def test_ircv3_tag_no_value(self):
msg = parse(b"@draft/reply;+example :nick PRIVMSG #ch :test")
assert msg.tags == {"draft/reply": None, "+example": None}
def test_latin1_fallback(self):
msg = parse(b":nick PRIVMSG #ch :\xe9\xe8\xea")
assert msg.params == ["#ch", "\xe9\xe8\xea"]
def test_trailing(self):
msg = parse(b":nick PRIVMSG #ch :hello world")
assert msg.trailing == "hello world"
def test_trailing_empty(self):
msg = parse(b"PING")
assert msg.trailing is None
class TestFormat:
def test_simple_command(self):
msg = IRCMessage(command="PING")
assert msg.format() == b"PING\r\n"
def test_with_params(self):
msg = IRCMessage(command="NICK", params=["mynick"])
assert msg.format() == b"NICK mynick\r\n"
def test_with_trailing(self):
msg = IRCMessage(command="PRIVMSG", params=["#channel", "Hello world"])
assert msg.format() == b"PRIVMSG #channel :Hello world\r\n"
def test_with_prefix(self):
msg = IRCMessage(command="PRIVMSG", params=["#ch", "hi"], prefix="nick!user@host")
assert msg.format() == b":nick!user@host PRIVMSG #ch hi\r\n"
def test_with_tags(self):
msg = IRCMessage(
command="PRIVMSG",
params=["#ch", "hi"],
tags={"time": "2024-01-01T00:00:00Z"},
)
assert msg.format() == b"@time=2024-01-01T00:00:00Z PRIVMSG #ch hi\r\n"
def test_roundtrip(self):
original = b":nick!user@host PRIVMSG #channel :Hello world"
msg = parse(original)
assert msg.format() == original + b"\r\n"
class TestParsePrefix:
def test_full(self):
assert parse_prefix("nick!user@host") == ("nick", "user", "host")
def test_nick_only(self):
assert parse_prefix("server.example.com") == ("server.example.com", None, None)
def test_nick_host(self):
assert parse_prefix("nick@host") == ("nick", None, "host")
def test_nick_user(self):
assert parse_prefix("nick!user") == ("nick", "user", None)