Files
derp/tests/test_cron.py
user 073659607e feat: add multi-server support
Connect to multiple IRC servers concurrently from a single config file.
Plugins are loaded once and shared; per-server state is isolated via
separate SQLite databases and per-bot runtime state (bot._pstate).

- Add build_server_configs() for [servers.*] config layout
- Bot.__init__ gains name parameter, _pstate dict for plugin isolation
- cli.py runs multiple bots via asyncio.gather
- 9 stateful plugins migrated from module-level dicts to _ps(bot) pattern
- Backward compatible: legacy [server] config works unchanged

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 19:04:20 +01:00

621 lines
19 KiB
Python

"""Tests for the cron plugin."""
import asyncio
import importlib.util
import json
import sys
from pathlib import Path
from derp.irc import Message
# plugins/ is not a Python package -- load the module from file path
# (plugins/cron.py, resolved two directories above this test file).
_spec = importlib.util.spec_from_file_location(
"plugins.cron", Path(__file__).resolve().parent.parent / "plugins" / "cron.py",
)
_mod = importlib.util.module_from_spec(_spec)
# Register under the dotted name before executing the module so the
# ``from plugins.cron import ...`` statement that follows can find it.
sys.modules[_spec.name] = _mod
_spec.loader.exec_module(_mod)
from plugins.cron import ( # noqa: E402
_MAX_JOBS,
_delete,
_format_duration,
_load,
_make_id,
_parse_duration,
_ps,
_restore,
_save,
_start_job,
_state_key,
_stop_job,
cmd_cron,
on_connect,
)
# -- Helpers -----------------------------------------------------------------
class _FakeState:
"""In-memory stand-in for bot.state."""
def __init__(self):
self._store: dict[str, dict[str, str]] = {}
def get(self, plugin: str, key: str, default: str | None = None) -> str | None:
return self._store.get(plugin, {}).get(key, default)
def set(self, plugin: str, key: str, value: str) -> None:
self._store.setdefault(plugin, {})[key] = value
def delete(self, plugin: str, key: str) -> bool:
try:
del self._store[plugin][key]
return True
except KeyError:
return False
def keys(self, plugin: str) -> list[str]:
return sorted(self._store.get(plugin, {}).keys())
class _FakeBot:
    """Recording test double for the bot: stores every send/reply/dispatch."""

    def __init__(self, *, admin: bool = True):
        self.prefix = "!"
        self._admin = admin
        # Per-bot plugin runtime state, accessed by the plugin through _ps(bot).
        self._pstate: dict = {}
        self.state = _FakeState()
        # Capture buffers inspected by the tests.
        self.sent: list[tuple[str, str]] = []
        self.replied: list[str] = []
        self.dispatched: list[Message] = []

    async def send(self, target: str, text: str) -> None:
        """Record an outgoing message as a (target, text) pair."""
        outgoing = (target, text)
        self.sent.append(outgoing)

    async def reply(self, message, text: str) -> None:
        """Record only the reply text; the triggering message is ignored."""
        self.replied.append(text)

    def _is_admin(self, message) -> bool:
        """Report the admin flag chosen at construction, whatever the message."""
        return self._admin

    def _dispatch_command(self, msg: Message) -> None:
        """Record a dispatched message instead of running a command."""
        self.dispatched.append(msg)
def _msg(text: str, nick: str = "admin", target: str = "#test") -> Message:
    """Build a PRIVMSG from *nick* to *target* (a channel by default)."""
    hostmask = f"{nick}!~{nick}@host"
    return Message(
        raw="",
        prefix=hostmask,
        nick=nick,
        command="PRIVMSG",
        params=[target, text],
        tags={},
    )
def _pm(text: str, nick: str = "admin") -> Message:
    """Build a private PRIVMSG, i.e. one addressed to the bot's own nick."""
    # Same shape as a channel message; only the target differs.
    return _msg(text, nick, "botname")
def _clear(bot=None) -> None:
"""Reset per-bot plugin state between tests."""
if bot is None:
return
ps = _ps(bot)
for task in ps["tasks"].values():
if task and not task.done():
task.cancel()
ps["tasks"].clear()
ps["jobs"].clear()
# ---------------------------------------------------------------------------
# TestParseDuration
# ---------------------------------------------------------------------------
class TestParseDuration:
    """_parse_duration: interval text to seconds, None for rejected input."""

    def test_minutes(self):
        seconds = _parse_duration("5m")
        assert seconds == 5 * 60

    def test_hours(self):
        seconds = _parse_duration("1h")
        assert seconds == 60 * 60

    def test_combined(self):
        seconds = _parse_duration("1h30m")
        assert seconds == 3600 + 30 * 60

    def test_days(self):
        seconds = _parse_duration("2d")
        assert seconds == 2 * 86400

    def test_seconds(self):
        seconds = _parse_duration("90s")
        assert seconds == 90

    def test_raw_int(self):
        # A bare number is accepted as seconds.
        seconds = _parse_duration("300")
        assert seconds == 300

    def test_zero(self):
        result = _parse_duration("0")
        assert result is None

    def test_negative(self):
        result = _parse_duration("-5")
        assert result is None

    def test_invalid(self):
        result = _parse_duration("abc")
        assert result is None

    def test_empty(self):
        result = _parse_duration("")
        assert result is None

    def test_full_combo(self):
        expected = 1 * 86400 + 2 * 3600 + 3 * 60 + 4
        assert _parse_duration("1d2h3m4s") == expected
# ---------------------------------------------------------------------------
# TestFormatDuration
# ---------------------------------------------------------------------------
class TestFormatDuration:
    """_format_duration: seconds to compact d/h/m/s text."""

    def test_seconds(self):
        rendered = _format_duration(45)
        assert rendered == "45s"

    def test_minutes(self):
        rendered = _format_duration(5 * 60)
        assert rendered == "5m"

    def test_hours(self):
        rendered = _format_duration(60 * 60)
        assert rendered == "1h"

    def test_combined(self):
        rendered = _format_duration(3600 + 30 * 60)
        assert rendered == "1h30m"

    def test_days(self):
        rendered = _format_duration(24 * 3600)
        assert rendered == "1d"

    def test_zero(self):
        rendered = _format_duration(0)
        assert rendered == "0s"

    def test_full_combo(self):
        # One of each unit: 86400 + 3600 + 60 + 1 == 90061.
        rendered = _format_duration(86400 + 3600 + 60 + 1)
        assert rendered == "1d1h1m1s"
# ---------------------------------------------------------------------------
# TestMakeId
# ---------------------------------------------------------------------------
class TestMakeId:
    """_make_id: short hexadecimal identifiers for cron jobs."""

    def test_returns_hex(self):
        job_id = _make_id("#test", "!ping")
        assert len(job_id) == 6
        # Parsing as base 16 raises ValueError if any character is not hex.
        int(job_id, 16)

    def test_unique(self):
        generated = [_make_id("#test", f"!cmd{n}") for n in range(10)]
        distinct = set(generated)
        assert len(distinct) == 10
# ---------------------------------------------------------------------------
# TestStateHelpers
# ---------------------------------------------------------------------------
class TestStateHelpers:
    """_save / _load / _delete / _state_key round-trips against a fake bot."""

    def test_save_and_load(self):
        bot = _FakeBot()
        payload = {"id": "abc123", "channel": "#test"}
        _save(bot, "#test:abc123", payload)
        assert _load(bot, "#test:abc123") == payload

    def test_load_missing(self):
        bot = _FakeBot()
        missing = _load(bot, "nonexistent")
        assert missing is None

    def test_delete(self):
        bot = _FakeBot()
        key = "#test:del"
        _save(bot, key, {"id": "del"})
        _delete(bot, key)
        assert _load(bot, key) is None

    def test_state_key(self):
        key = _state_key("#ops", "abc123")
        assert key == "#ops:abc123"

    def test_load_invalid_json(self):
        bot = _FakeBot()
        # Write a corrupt value directly into the plugin's "cron" namespace.
        bot.state.set("cron", "bad", "not json{{{")
        assert _load(bot, "bad") is None
# ---------------------------------------------------------------------------
# TestCmdCronAdd
# ---------------------------------------------------------------------------
class TestCmdCronAdd:
    """`!cron add`: the success path and each rejection message."""

    def test_add_success(self):
        """A valid add replies once, persists the job and starts one task."""
        bot = _FakeBot()

        async def scenario():
            await cmd_cron(bot, _msg("!cron add 5m #ops !rss check news"))
            await asyncio.sleep(0)
            assert len(bot.replied) == 1
            confirmation = bot.replied[0]
            for fragment in ("Cron #", "!rss check news", "5m", "#ops"):
                assert fragment in confirmation
            # Verify state persisted
            stored_keys = bot.state.keys("cron")
            assert len(stored_keys) == 1
            record = json.loads(bot.state.get("cron", stored_keys[0]))
            assert record["command"] == "!rss check news"
            assert record["interval"] == 300
            assert record["channel"] == "#ops"
            # Verify task started
            assert len(_ps(bot)["tasks"]) == 1
            _clear(bot)
            await asyncio.sleep(0)

        asyncio.run(scenario())

    def test_add_requires_channel(self):
        bot = _FakeBot()
        request = _pm("!cron add 5m #ops !ping")
        asyncio.run(cmd_cron(bot, request))
        first_reply = bot.replied[0]
        assert "Use this command in a channel" in first_reply

    def test_add_missing_args(self):
        bot = _FakeBot()
        request = _msg("!cron add 5m")
        asyncio.run(cmd_cron(bot, request))
        first_reply = bot.replied[0]
        assert "Usage:" in first_reply

    def test_add_invalid_interval(self):
        bot = _FakeBot()
        request = _msg("!cron add abc #ops !ping")
        asyncio.run(cmd_cron(bot, request))
        first_reply = bot.replied[0]
        assert "Invalid interval" in first_reply

    def test_add_interval_too_short(self):
        bot = _FakeBot()
        request = _msg("!cron add 30s #ops !ping")
        asyncio.run(cmd_cron(bot, request))
        first_reply = bot.replied[0]
        assert "Minimum interval" in first_reply

    def test_add_interval_too_long(self):
        bot = _FakeBot()
        request = _msg("!cron add 30d #ops !ping")
        asyncio.run(cmd_cron(bot, request))
        first_reply = bot.replied[0]
        assert "Maximum interval" in first_reply

    def test_add_bad_target(self):
        bot = _FakeBot()
        request = _msg("!cron add 5m alice !ping")
        asyncio.run(cmd_cron(bot, request))
        first_reply = bot.replied[0]
        assert "Target must be a channel" in first_reply

    def test_add_job_limit(self):
        bot = _FakeBot()
        # Fill the store up to the limit so the next add is refused.
        for n in range(_MAX_JOBS):
            _save(bot, f"#ops:job{n}", {"id": f"job{n}", "channel": "#ops"})
        request = _msg("!cron add 5m #ops !ping")
        asyncio.run(cmd_cron(bot, request))
        first_reply = bot.replied[0]
        assert "limit reached" in first_reply

    def test_add_admin_required(self):
        """Smoke test: a direct call with a non-admin bot must not raise."""
        bot = _FakeBot(admin=False)
        request = _msg("!cron add 5m #ops !ping")
        asyncio.run(cmd_cron(bot, request))
        # No assertions on purpose. Per the original author's notes, admin
        # gating belongs to the @command(admin=True) decorator and is applied
        # via bot._dispatch_command, so calling cmd_cron directly bypasses it.
        # Framework-level denial is tested in integration tests.
# ---------------------------------------------------------------------------
# TestCmdCronDel
# ---------------------------------------------------------------------------
class TestCmdCronDel:
    """`!cron del`: removal by id, with or without a leading '#'."""

    @staticmethod
    def _cron_id(reply: str) -> str:
        """Return the text between the first '#' and the following ':'."""
        after_hash = reply.split("#")[1]
        return after_hash.split(":")[0]

    def test_del_success(self):
        bot = _FakeBot()

        async def scenario():
            await cmd_cron(bot, _msg("!cron add 5m #test !ping"))
            await asyncio.sleep(0)
            # Extract ID from reply
            cron_id = self._cron_id(bot.replied[0])
            bot.replied.clear()
            await cmd_cron(bot, _msg(f"!cron del {cron_id}"))
            removal = bot.replied[0]
            assert "Removed" in removal
            assert cron_id in removal
            assert len(bot.state.keys("cron")) == 0
            _clear(bot)
            await asyncio.sleep(0)

        asyncio.run(scenario())

    def test_del_nonexistent(self):
        bot = _FakeBot()
        request = _msg("!cron del nosuch")
        asyncio.run(cmd_cron(bot, request))
        first_reply = bot.replied[0]
        assert "No cron job" in first_reply

    def test_del_missing_id(self):
        bot = _FakeBot()
        request = _msg("!cron del")
        asyncio.run(cmd_cron(bot, request))
        first_reply = bot.replied[0]
        assert "Usage:" in first_reply

    def test_del_with_hash_prefix(self):
        bot = _FakeBot()

        async def scenario():
            await cmd_cron(bot, _msg("!cron add 5m #test !ping"))
            await asyncio.sleep(0)
            cron_id = self._cron_id(bot.replied[0])
            bot.replied.clear()
            # Same deletion, but with the id written as "#<id>".
            await cmd_cron(bot, _msg(f"!cron del #{cron_id}"))
            assert "Removed" in bot.replied[0]
            _clear(bot)
            await asyncio.sleep(0)

        asyncio.run(scenario())
# ---------------------------------------------------------------------------
# TestCmdCronList
# ---------------------------------------------------------------------------
class TestCmdCronList:
    """`!cron list`: empty and populated output, channel-only, per-channel."""

    def test_list_empty(self):
        bot = _FakeBot()
        asyncio.run(cmd_cron(bot, _msg("!cron list")))
        assert "No cron jobs" in bot.replied[0]

    def test_list_populated(self):
        bot = _FakeBot()
        _save(bot, "#test:abc123", {
            "id": "abc123", "channel": "#test",
            "command": "!rss check news", "interval": 300,
        })
        asyncio.run(cmd_cron(bot, _msg("!cron list")))
        assert "#abc123" in bot.replied[0]
        assert "5m" in bot.replied[0]
        assert "!rss check news" in bot.replied[0]

    def test_list_requires_channel(self):
        bot = _FakeBot()
        asyncio.run(cmd_cron(bot, _pm("!cron list")))
        assert "Use this command in a channel" in bot.replied[0]

    def test_list_channel_isolation(self):
        """Listing from #test shows the #test job and not the #other one."""
        bot = _FakeBot()
        _save(bot, "#test:mine", {
            "id": "mine", "channel": "#test",
            "command": "!ping", "interval": 300,
        })
        _save(bot, "#other:theirs", {
            "id": "theirs", "channel": "#other",
            "command": "!ping", "interval": 600,
        })
        asyncio.run(cmd_cron(bot, _msg("!cron list")))
        assert "mine" in bot.replied[0]
        assert len(bot.replied) == 1  # only the #test job
        # The reply count alone cannot prove isolation if every job is
        # rendered into one reply, so check the foreign job is absent.
        assert all("theirs" not in line for line in bot.replied)
# ---------------------------------------------------------------------------
# TestCmdCronUsage
# ---------------------------------------------------------------------------
class TestCmdCronUsage:
    """Bare or unrecognised `!cron` invocations reply with the usage text."""

    def test_no_args(self):
        bot = _FakeBot()
        request = _msg("!cron")
        asyncio.run(cmd_cron(bot, request))
        first_reply = bot.replied[0]
        assert "Usage:" in first_reply

    def test_unknown_subcommand(self):
        bot = _FakeBot()
        request = _msg("!cron foobar")
        asyncio.run(cmd_cron(bot, request))
        first_reply = bot.replied[0]
        assert "Usage:" in first_reply
# ---------------------------------------------------------------------------
# TestRestore
# ---------------------------------------------------------------------------
class TestRestore:
    """_restore / on_connect: rebuilding tasks from persisted jobs."""

    @staticmethod
    def _job(job_id: str) -> dict:
        """Return the persisted payload of a 5-minute !ping job in #test."""
        return {
            "id": job_id, "channel": "#test",
            "command": "!ping", "interval": 300,
            "prefix": "admin!~admin@host", "nick": "admin",
            "added_by": "admin",
        }

    def test_restore_spawns_tasks(self):
        bot = _FakeBot()
        key = "#test:abc123"
        _save(bot, key, self._job("abc123"))

        async def scenario():
            _restore(bot)
            tasks = _ps(bot)["tasks"]
            assert key in tasks
            assert not tasks[key].done()
            _clear(bot)
            await asyncio.sleep(0)

        asyncio.run(scenario())

    def test_restore_skips_active(self):
        bot = _FakeBot()
        key = "#test:active"
        _save(bot, key, self._job("active"))

        async def scenario():
            tasks = _ps(bot)["tasks"]
            # A still-running task already registered under the job's key.
            running = asyncio.create_task(asyncio.sleep(9999))
            tasks[key] = running
            _restore(bot)
            assert tasks[key] is running
            running.cancel()
            await asyncio.sleep(0)

        asyncio.run(scenario())

    def test_restore_replaces_done_task(self):
        bot = _FakeBot()
        key = "#test:done"
        _save(bot, key, self._job("done"))

        async def scenario():
            tasks = _ps(bot)["tasks"]
            finished = asyncio.create_task(asyncio.sleep(0))
            await finished
            tasks[key] = finished
            _restore(bot)
            replacement = tasks[key]
            assert replacement is not finished
            assert not replacement.done()
            _clear(bot)
            await asyncio.sleep(0)

        asyncio.run(scenario())

    def test_restore_skips_bad_json(self):
        bot = _FakeBot()
        bot.state.set("cron", "#test:bad", "not json{{{")

        async def scenario():
            _restore(bot)
            assert "#test:bad" not in _ps(bot)["tasks"]

        asyncio.run(scenario())

    def test_on_connect_calls_restore(self):
        bot = _FakeBot()
        key = "#test:conn"
        _save(bot, key, self._job("conn"))

        async def scenario():
            event = _msg("", target="botname")
            await on_connect(bot, event)
            assert key in _ps(bot)["tasks"]
            _clear(bot)
            await asyncio.sleep(0)

        asyncio.run(scenario())
# ---------------------------------------------------------------------------
# TestCronLoop
# ---------------------------------------------------------------------------
class TestCronLoop:
    """Behaviour of the per-job loop started by _start_job."""

    @staticmethod
    def _fast_job(job_id: str) -> dict:
        """Return a !ping job for #test with a 0.05 s interval."""
        return {
            "id": job_id, "channel": "#test",
            "command": "!ping", "interval": 0.05,
            "prefix": "admin!~admin@host", "nick": "admin",
            "added_by": "admin",
        }

    def test_dispatches_command(self):
        """Cron loop dispatches a synthetic message after interval."""
        bot = _FakeBot()

        async def scenario():
            job = self._fast_job("loop1")
            key = "#test:loop1"
            _ps(bot)["jobs"][key] = job
            _start_job(bot, key)
            # Wait for roughly three intervals before stopping the job.
            await asyncio.sleep(0.15)
            _stop_job(bot, key)
            await asyncio.sleep(0)
            # Should have dispatched at least once
            assert len(bot.dispatched) >= 1
            first = bot.dispatched[0]
            assert first.nick == "admin"
            assert first.params[0] == "#test"
            assert first.params[1] == "!ping"

        asyncio.run(scenario())

    def test_loop_stops_on_job_removal(self):
        """Cron loop exits when job is removed from jobs dict."""
        bot = _FakeBot()

        async def scenario():
            job = self._fast_job("loop2")
            key = "#test:loop2"
            state = _ps(bot)
            state["jobs"][key] = job
            _start_job(bot, key)
            await asyncio.sleep(0.02)
            # Drop the job while its task is still registered.
            state["jobs"].pop(key, None)
            await asyncio.sleep(0.1)
            leftover = state["tasks"].get(key)
            # Either the task entry is already gone, or the task has finished.
            assert not leftover or leftover.done()

        asyncio.run(scenario())
# ---------------------------------------------------------------------------
# TestJobManagement
# ---------------------------------------------------------------------------
class TestJobManagement:
    """_start_job / _stop_job bookkeeping of the tasks and jobs tables."""

    @staticmethod
    def _job(job_id: str) -> dict:
        """Return a 5-minute !ping job for #test with the given id."""
        return {
            "id": job_id, "channel": "#test",
            "command": "!ping", "interval": 300,
            "prefix": "admin!~admin@host", "nick": "admin",
            "added_by": "admin",
        }

    def test_start_and_stop(self):
        bot = _FakeBot()
        key = "#test:mgmt"
        state = _ps(bot)
        state["jobs"][key] = self._job("mgmt")

        async def scenario():
            _start_job(bot, key)
            assert key in state["tasks"]
            assert not state["tasks"][key].done()
            _stop_job(bot, key)
            await asyncio.sleep(0)
            # Stopping removes both the task entry and the job entry.
            assert key not in state["tasks"]
            assert key not in state["jobs"]

        asyncio.run(scenario())

    def test_start_idempotent(self):
        bot = _FakeBot()
        key = "#test:idem"
        state = _ps(bot)
        state["jobs"][key] = self._job("idem")

        async def scenario():
            _start_job(bot, key)
            original = state["tasks"][key]
            # A second start must keep the task that is already there.
            _start_job(bot, key)
            assert state["tasks"][key] is original
            _stop_job(bot, key)
            await asyncio.sleep(0)

        asyncio.run(scenario())

    def test_stop_nonexistent(self):
        """Stopping a key that was never started must not raise."""
        bot = _FakeBot()
        _stop_job(bot, "#test:nonexistent")