feat: initial implementation

Asyncio IRC bot with decorator-based plugin system.
Zero external dependencies, Python 3.11+.

- IRC protocol: message parsing, formatting, async TCP/TLS connection
- Plugin system: @command and @event decorators, file-based loading
- Bot orchestrator: connect, dispatch, reconnect, nick recovery
- CLI: argparse entry point with TOML config
- Built-in plugins: ping, help, version, echo
- 28 unit tests for parser and plugin system
This commit is contained in:
user
2026-02-15 00:37:31 +01:00
commit bf45abcbad
23 changed files with 1398 additions and 0 deletions

3
src/derp/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
"""derp - asyncio IRC bot with plugin system."""
__version__ = "0.1.0"

5
src/derp/__main__.py Normal file
View File

@@ -0,0 +1,5 @@
"""Allow running as `python -m derp`."""
from derp.cli import main
raise SystemExit(main())

159
src/derp/bot.py Normal file
View File

@@ -0,0 +1,159 @@
"""Bot orchestrator: connect, run loop, dispatch events."""
from __future__ import annotations
import asyncio
import logging
from pathlib import Path
from derp.irc import IRCConnection, Message, format_msg, parse
from derp.plugin import PluginRegistry
log = logging.getLogger(__name__)
RECONNECT_DELAY = 30
class Bot:
"""IRC bot: ties connection, config, and plugins together."""
def __init__(self, config: dict, registry: PluginRegistry) -> None:
self.config = config
self.registry = registry
self.conn = IRCConnection(
host=config["server"]["host"],
port=config["server"]["port"],
tls=config["server"]["tls"],
)
self.nick: str = config["server"]["nick"]
self.prefix: str = config["bot"]["prefix"]
self._running = False
async def start(self) -> None:
"""Connect, register, join channels, and enter the main loop."""
self._running = True
while self._running:
try:
await self._connect_and_run()
except (OSError, ConnectionError) as exc:
log.error("connection lost: %s", exc)
if self._running:
log.info("reconnecting in %ds...", RECONNECT_DELAY)
await asyncio.sleep(RECONNECT_DELAY)
async def _connect_and_run(self) -> None:
"""Single connection lifecycle."""
await self.conn.connect()
try:
await self._register()
await self._loop()
finally:
await self.conn.close()
async def _register(self) -> None:
"""Send NICK/USER registration to the server."""
srv = self.config["server"]
if srv.get("password"):
await self.conn.send(format_msg("PASS", srv["password"]))
await self.conn.send(format_msg("NICK", self.nick))
await self.conn.send(format_msg("USER", srv["user"], "0", "*", srv["realname"]))
async def _loop(self) -> None:
"""Read and dispatch messages until disconnect."""
while self._running:
line = await self.conn.readline()
if line is None:
log.warning("server closed connection")
return
msg = parse(line)
await self._handle(msg)
async def _handle(self, msg: Message) -> None:
"""Dispatch a parsed message to the appropriate handlers."""
# Protocol-level PING/PONG
if msg.command == "PING":
await self.conn.send(format_msg("PONG", msg.params[0] if msg.params else ""))
return
# RPL_WELCOME (001) — join channels
if msg.command == "001":
self.nick = msg.params[0] if msg.params else self.nick
for channel in self.config["bot"]["channels"]:
await self.join(channel)
# Nick already in use (433) — append underscore
if msg.command == "433":
self.nick = self.nick + "_"
await self.conn.send(format_msg("NICK", self.nick))
return
# Dispatch to event handlers
event_type = msg.command
for handler in self.registry.events.get(event_type, []):
try:
await handler.callback(self, msg)
except Exception:
log.exception("error in event handler %s", handler.name)
# Dispatch to command handlers (PRIVMSG only)
if msg.command == "PRIVMSG" and msg.text:
await self._dispatch_command(msg)
async def _dispatch_command(self, msg: Message) -> None:
"""Check if a PRIVMSG is a bot command and dispatch it."""
text = msg.text
if not text or not text.startswith(self.prefix):
return
parts = text[len(self.prefix):].split(None, 1)
cmd_name = parts[0].lower() if parts else ""
handler = self.registry.commands.get(cmd_name)
if handler is None:
return
try:
await handler.callback(self, msg)
except Exception:
log.exception("error in command handler '%s'", cmd_name)
# -- Public API for plugins --
async def send(self, target: str, text: str) -> None:
"""Send a PRIVMSG to a target (channel or nick)."""
for line in text.split("\n"):
await self.conn.send(format_msg("PRIVMSG", target, line))
async def reply(self, msg: Message, text: str) -> None:
"""Reply to the source of a message (channel or PM)."""
target = msg.target if msg.is_channel else msg.nick
if target:
await self.send(target, text)
async def action(self, target: str, text: str) -> None:
"""Send a CTCP ACTION (/me) to a target."""
await self.send(target, f"\x01ACTION {text}\x01")
async def join(self, channel: str) -> None:
"""Join an IRC channel."""
await self.conn.send(format_msg("JOIN", channel))
log.info("joining %s", channel)
async def part(self, channel: str, reason: str = "") -> None:
"""Part an IRC channel."""
if reason:
await self.conn.send(format_msg("PART", channel, reason))
else:
await self.conn.send(format_msg("PART", channel))
async def quit(self, reason: str = "bye") -> None:
"""Quit the IRC server and stop the bot."""
self._running = False
await self.conn.send(format_msg("QUIT", reason))
await self.conn.close()
def load_plugins(self, plugins_dir: str | Path | None = None) -> None:
"""Load plugins from the configured directory."""
if plugins_dir is None:
plugins_dir = self.config["bot"].get("plugins_dir", "plugins")
path = Path(plugins_dir)
self.registry.load_directory(path)

69
src/derp/cli.py Normal file
View File

@@ -0,0 +1,69 @@
"""CLI entry point for derp."""
from __future__ import annotations
import argparse
import asyncio
import logging
import sys
from derp import __version__
from derp.bot import Bot
from derp.config import resolve_config
from derp.plugin import PluginRegistry
LOG_FORMAT = "%(asctime)s %(levelname)-5s %(name)s %(message)s"
LOG_DATE = "%H:%M:%S"
def build_parser() -> argparse.ArgumentParser:
"""Build the argument parser."""
p = argparse.ArgumentParser(
prog="derp",
description="Asyncio IRC bot with plugin system",
)
p.add_argument(
"-c", "--config",
metavar="PATH",
help="path to config file [config/derp.toml]",
)
p.add_argument(
"-v", "--verbose",
action="store_true",
help="enable debug logging",
)
p.add_argument(
"-V", "--version",
action="version",
version=f"derp {__version__}",
)
return p
def main(argv: list[str] | None = None) -> int:
"""Main entry point."""
parser = build_parser()
args = parser.parse_args(argv)
level = logging.DEBUG if args.verbose else logging.INFO
logging.basicConfig(format=LOG_FORMAT, datefmt=LOG_DATE, level=level)
config = resolve_config(args.config)
log = logging.getLogger("derp")
log.info("derp %s starting", __version__)
registry = PluginRegistry()
bot = Bot(config, registry)
bot.load_plugins()
try:
asyncio.run(bot.start())
except KeyboardInterrupt:
log.info("interrupted, shutting down")
return 0
if __name__ == "__main__":
sys.exit(main())

57
src/derp/config.py Normal file
View File

@@ -0,0 +1,57 @@
"""TOML configuration loader."""
from __future__ import annotations
import tomllib
from pathlib import Path
DEFAULTS: dict = {
"server": {
"host": "irc.libera.chat",
"port": 6697,
"tls": True,
"nick": "derp",
"user": "derp",
"realname": "derp IRC bot",
"password": "",
},
"bot": {
"prefix": "!",
"channels": ["#test"],
"plugins_dir": "plugins",
},
"logging": {
"level": "info",
},
}
def _merge(base: dict, override: dict) -> dict:
"""Deep-merge override into base, returning a new dict."""
merged = base.copy()
for key, val in override.items():
if key in merged and isinstance(merged[key], dict) and isinstance(val, dict):
merged[key] = _merge(merged[key], val)
else:
merged[key] = val
return merged
def load(path: Path) -> dict:
"""Load TOML config from path, merged with defaults."""
with open(path, "rb") as f:
user_config = tomllib.load(f)
return _merge(DEFAULTS, user_config)
def resolve_config(path: str | None) -> dict:
"""Resolve config path and load. Falls back to defaults if no file found."""
search_paths = [
Path(path) if path else None,
Path("config/derp.toml"),
Path.home() / ".config" / "derp" / "derp.toml",
]
for p in search_paths:
if p and p.is_file():
return load(p)
return DEFAULTS.copy()

144
src/derp/irc.py Normal file
View File

@@ -0,0 +1,144 @@
"""IRC protocol: message parsing, formatting, and async connection."""
from __future__ import annotations
import asyncio
import logging
import ssl
from dataclasses import dataclass
log = logging.getLogger(__name__)
@dataclass(slots=True)
class Message:
"""Parsed IRC message (RFC 1459)."""
raw: str
prefix: str | None
nick: str | None
command: str
params: list[str]
@property
def target(self) -> str | None:
"""First param — typically channel or nick."""
return self.params[0] if self.params else None
@property
def text(self) -> str | None:
"""Trailing text (last param if prefixed with ':' in raw)."""
return self.params[-1] if self.params else None
@property
def is_channel(self) -> bool:
"""Whether the target is a channel."""
return bool(self.target and self.target.startswith(("#", "&")))
def parse(line: str) -> Message:
"""Parse a raw IRC line into a Message.
Format: [:prefix] command [params...] [:trailing]
"""
raw = line
prefix = None
nick = None
if line.startswith(":"):
prefix, line = line[1:].split(" ", 1)
if "!" in prefix:
nick = prefix.split("!")[0]
else:
nick = None
trailing = None
if " :" in line:
line, trailing = line.split(" :", 1)
parts = line.split()
command = parts[0].upper() if parts else ""
params = parts[1:]
if trailing is not None:
params.append(trailing)
return Message(raw=raw, prefix=prefix, nick=nick, command=command, params=params)
def format_msg(command: str, *params: str) -> str:
"""Format an IRC command into a raw line.
The last param is automatically prefixed with ':' if it contains spaces
or is the trailing argument.
"""
if not params:
return command
*head, tail = params
if " " in tail or tail.startswith(":") or not head:
prefix = f"{command} {' '.join(head)}" if head else command
return f"{prefix} :{tail}"
return f"{command} {' '.join(params)}"
class IRCConnection:
"""Async TCP/TLS connection to an IRC server."""
def __init__(self, host: str, port: int, tls: bool = True) -> None:
self.host = host
self.port = port
self.tls = tls
self._reader: asyncio.StreamReader | None = None
self._writer: asyncio.StreamWriter | None = None
async def connect(self) -> None:
"""Open connection to the IRC server."""
ssl_ctx = None
if self.tls:
ssl_ctx = ssl.create_default_context()
log.info("connecting to %s:%d (tls=%s)", self.host, self.port, self.tls)
self._reader, self._writer = await asyncio.open_connection(
self.host, self.port, ssl=ssl_ctx
)
log.info("connected")
async def send(self, line: str) -> None:
"""Send a raw IRC line (appends CRLF)."""
if self._writer is None:
raise RuntimeError("not connected")
data = f"{line}\r\n".encode("utf-8")
self._writer.write(data)
await self._writer.drain()
log.debug(">>> %s", line)
async def readline(self) -> str | None:
"""Read one IRC line. Returns None on EOF."""
if self._reader is None:
raise RuntimeError("not connected")
try:
data = await self._reader.readline()
except (ConnectionResetError, asyncio.IncompleteReadError):
return None
if not data:
return None
line = data.decode("utf-8", errors="replace").strip()
log.debug("<<< %s", line)
return line
async def close(self) -> None:
"""Close the connection."""
if self._writer:
self._writer.close()
try:
await self._writer.wait_closed()
except Exception:
pass
self._writer = None
self._reader = None
log.info("connection closed")
@property
def connected(self) -> bool:
"""Whether the connection is open."""
return self._writer is not None and not self._writer.is_closing()

125
src/derp/plugin.py Normal file
View File

@@ -0,0 +1,125 @@
"""Plugin system: decorators, registry, and loader."""
from __future__ import annotations
import importlib.util
import inspect
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable
log = logging.getLogger(__name__)
@dataclass(slots=True)
class Handler:
"""A registered command or event handler."""
name: str
callback: Callable
help: str = ""
plugin: str = ""
def command(name: str, help: str = "") -> Callable:
"""Decorator to register an async function as a bot command.
Usage::
@command("ping", help="Check if the bot is alive")
async def cmd_ping(bot, message):
await bot.reply(message, "pong")
"""
def decorator(func: Callable) -> Callable:
func._derp_command = name # type: ignore[attr-defined]
func._derp_help = help # type: ignore[attr-defined]
return func
return decorator
def event(event_type: str) -> Callable:
"""Decorator to register an async function as an event handler.
Usage::
@event("join")
async def on_join(bot, message):
await bot.send(message.target, f"Welcome, {message.nick}!")
"""
def decorator(func: Callable) -> Callable:
func._derp_event = event_type.upper() # type: ignore[attr-defined]
return func
return decorator
class PluginRegistry:
"""Collects and manages command/event handlers from plugin modules."""
def __init__(self) -> None:
self.commands: dict[str, Handler] = {}
self.events: dict[str, list[Handler]] = {}
self._modules: dict[str, Any] = {}
def register_command(self, name: str, callback: Callable, help: str = "",
plugin: str = "") -> None:
"""Register a command handler."""
if name in self.commands:
log.warning("command '%s' already registered, overwriting", name)
self.commands[name] = Handler(name=name, callback=callback, help=help, plugin=plugin)
log.debug("registered command: %s (%s)", name, plugin)
def register_event(self, event_type: str, callback: Callable, plugin: str = "") -> None:
"""Register an event handler."""
event_type = event_type.upper()
if event_type not in self.events:
self.events[event_type] = []
handler = Handler(name=event_type, callback=callback, plugin=plugin)
self.events[event_type].append(handler)
log.debug("registered event: %s (%s)", event_type, plugin)
def _scan_module(self, module: Any, plugin_name: str) -> int:
"""Scan a module for decorated handlers. Returns count of handlers found."""
count = 0
for _name, obj in inspect.getmembers(module, inspect.isfunction):
if hasattr(obj, "_derp_command"):
self.register_command(
obj._derp_command, obj,
help=getattr(obj, "_derp_help", ""),
plugin=plugin_name,
)
count += 1
if hasattr(obj, "_derp_event"):
self.register_event(obj._derp_event, obj, plugin=plugin_name)
count += 1
return count
def load_plugin(self, path: Path) -> None:
"""Load a single plugin from a .py file."""
plugin_name = path.stem
if plugin_name.startswith("_"):
return
try:
spec = importlib.util.spec_from_file_location(f"derp.plugins.{plugin_name}", path)
if spec is None or spec.loader is None:
log.error("failed to create spec for %s", path)
return
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
count = self._scan_module(module, plugin_name)
self._modules[plugin_name] = module
log.info("loaded plugin: %s (%d handlers)", plugin_name, count)
except Exception:
log.exception("failed to load plugin: %s", path)
def load_directory(self, dir_path: Path) -> None:
"""Load all .py plugin files from a directory."""
if not dir_path.is_dir():
log.warning("plugins directory not found: %s", dir_path)
return
for path in sorted(dir_path.glob("*.py")):
self.load_plugin(path)