"""Tests for the cron plugin.""" import asyncio import importlib.util import json import sys from pathlib import Path from derp.irc import Message # plugins/ is not a Python package -- load the module from file path _spec = importlib.util.spec_from_file_location( "plugins.cron", Path(__file__).resolve().parent.parent / "plugins" / "cron.py", ) _mod = importlib.util.module_from_spec(_spec) sys.modules[_spec.name] = _mod _spec.loader.exec_module(_mod) from plugins.cron import ( # noqa: E402 _MAX_JOBS, _delete, _format_duration, _load, _make_id, _parse_duration, _ps, _restore, _save, _start_job, _state_key, _stop_job, cmd_cron, on_connect, ) # -- Helpers ----------------------------------------------------------------- class _FakeState: """In-memory stand-in for bot.state.""" def __init__(self): self._store: dict[str, dict[str, str]] = {} def get(self, plugin: str, key: str, default: str | None = None) -> str | None: return self._store.get(plugin, {}).get(key, default) def set(self, plugin: str, key: str, value: str) -> None: self._store.setdefault(plugin, {})[key] = value def delete(self, plugin: str, key: str) -> bool: try: del self._store[plugin][key] return True except KeyError: return False def keys(self, plugin: str) -> list[str]: return sorted(self._store.get(plugin, {}).keys()) class _FakeBot: """Minimal bot stand-in that captures sent/replied messages.""" def __init__(self, *, admin: bool = True): self.sent: list[tuple[str, str]] = [] self.replied: list[str] = [] self.dispatched: list[Message] = [] self.state = _FakeState() self._pstate: dict = {} self._admin = admin self.prefix = "!" async def send(self, target: str, text: str) -> None: self.sent.append((target, text)) async def reply(self, message, text: str) -> None: self.replied.append(text) def _is_admin(self, message) -> bool: return self._admin def _dispatch_command(self, msg: Message) -> None: self.dispatched.append(msg) def _msg(text: str, nick: str = "admin", target: str = "#test") -> Message: """Create a channel PRIVMSG.""" return Message( raw="", prefix=f"{nick}!~{nick}@host", nick=nick, command="PRIVMSG", params=[target, text], tags={}, ) def _pm(text: str, nick: str = "admin") -> Message: """Create a private PRIVMSG.""" return Message( raw="", prefix=f"{nick}!~{nick}@host", nick=nick, command="PRIVMSG", params=["botname", text], tags={}, ) def _clear(bot=None) -> None: """Reset per-bot plugin state between tests.""" if bot is None: return ps = _ps(bot) for task in ps["tasks"].values(): if task and not task.done(): task.cancel() ps["tasks"].clear() ps["jobs"].clear() # --------------------------------------------------------------------------- # TestParseDuration # --------------------------------------------------------------------------- class TestParseDuration: def test_minutes(self): assert _parse_duration("5m") == 300 def test_hours(self): assert _parse_duration("1h") == 3600 def test_combined(self): assert _parse_duration("1h30m") == 5400 def test_days(self): assert _parse_duration("2d") == 172800 def test_seconds(self): assert _parse_duration("90s") == 90 def test_raw_int(self): assert _parse_duration("300") == 300 def test_zero(self): assert _parse_duration("0") is None def test_negative(self): assert _parse_duration("-5") is None def test_invalid(self): assert _parse_duration("abc") is None def test_empty(self): assert _parse_duration("") is None def test_full_combo(self): assert _parse_duration("1d2h3m4s") == 86400 + 7200 + 180 + 4 # --------------------------------------------------------------------------- # TestFormatDuration # --------------------------------------------------------------------------- class TestFormatDuration: def test_seconds(self): assert _format_duration(45) == "45s" def test_minutes(self): assert _format_duration(300) == "5m" def test_hours(self): assert _format_duration(3600) == "1h" def test_combined(self): assert _format_duration(5400) == "1h30m" def test_days(self): assert _format_duration(86400) == "1d" def test_zero(self): assert _format_duration(0) == "0s" def test_full_combo(self): assert _format_duration(90061) == "1d1h1m1s" # --------------------------------------------------------------------------- # TestMakeId # --------------------------------------------------------------------------- class TestMakeId: def test_returns_hex(self): result = _make_id("#test", "!ping") assert len(result) == 6 int(result, 16) # should not raise def test_unique(self): ids = {_make_id("#test", f"!cmd{i}") for i in range(10)} assert len(ids) == 10 # --------------------------------------------------------------------------- # TestStateHelpers # --------------------------------------------------------------------------- class TestStateHelpers: def test_save_and_load(self): bot = _FakeBot() data = {"id": "abc123", "channel": "#test"} _save(bot, "#test:abc123", data) loaded = _load(bot, "#test:abc123") assert loaded == data def test_load_missing(self): bot = _FakeBot() assert _load(bot, "nonexistent") is None def test_delete(self): bot = _FakeBot() _save(bot, "#test:del", {"id": "del"}) _delete(bot, "#test:del") assert _load(bot, "#test:del") is None def test_state_key(self): assert _state_key("#ops", "abc123") == "#ops:abc123" def test_load_invalid_json(self): bot = _FakeBot() bot.state.set("cron", "bad", "not json{{{") assert _load(bot, "bad") is None # --------------------------------------------------------------------------- # TestCmdCronAdd # --------------------------------------------------------------------------- class TestCmdCronAdd: def test_add_success(self): bot = _FakeBot() async def inner(): await cmd_cron(bot, _msg("!cron add 5m #ops !rss check news")) await asyncio.sleep(0) assert len(bot.replied) == 1 assert "Cron #" in bot.replied[0] assert "!rss check news" in bot.replied[0] assert "5m" in bot.replied[0] assert "#ops" in bot.replied[0] # Verify state persisted keys = bot.state.keys("cron") assert len(keys) == 1 data = json.loads(bot.state.get("cron", keys[0])) assert data["command"] == "!rss check news" assert data["interval"] == 300 assert data["channel"] == "#ops" # Verify task started assert len(_ps(bot)["tasks"]) == 1 _clear(bot) await asyncio.sleep(0) asyncio.run(inner()) def test_add_requires_channel(self): bot = _FakeBot() asyncio.run(cmd_cron(bot, _pm("!cron add 5m #ops !ping"))) assert "Use this command in a channel" in bot.replied[0] def test_add_missing_args(self): bot = _FakeBot() asyncio.run(cmd_cron(bot, _msg("!cron add 5m"))) assert "Usage:" in bot.replied[0] def test_add_invalid_interval(self): bot = _FakeBot() asyncio.run(cmd_cron(bot, _msg("!cron add abc #ops !ping"))) assert "Invalid interval" in bot.replied[0] def test_add_interval_too_short(self): bot = _FakeBot() asyncio.run(cmd_cron(bot, _msg("!cron add 30s #ops !ping"))) assert "Minimum interval" in bot.replied[0] def test_add_interval_too_long(self): bot = _FakeBot() asyncio.run(cmd_cron(bot, _msg("!cron add 30d #ops !ping"))) assert "Maximum interval" in bot.replied[0] def test_add_bad_target(self): bot = _FakeBot() asyncio.run(cmd_cron(bot, _msg("!cron add 5m alice !ping"))) assert "Target must be a channel" in bot.replied[0] def test_add_job_limit(self): bot = _FakeBot() for i in range(_MAX_JOBS): _save(bot, f"#ops:job{i}", {"id": f"job{i}", "channel": "#ops"}) asyncio.run(cmd_cron(bot, _msg("!cron add 5m #ops !ping"))) assert "limit reached" in bot.replied[0] def test_add_admin_required(self): bot = _FakeBot(admin=False) asyncio.run(cmd_cron(bot, _msg("!cron add 5m #ops !ping"))) # The @command(admin=True) decorator handles this via bot._dispatch_command, # but since we call cmd_cron directly, the check is at the decorator level. # In direct call tests, admin check is already handled by the framework. # This test just verifies the command runs without error for non-admin. # Framework-level denial is tested in integration tests. # --------------------------------------------------------------------------- # TestCmdCronDel # --------------------------------------------------------------------------- class TestCmdCronDel: def test_del_success(self): bot = _FakeBot() async def inner(): await cmd_cron(bot, _msg("!cron add 5m #test !ping")) await asyncio.sleep(0) # Extract ID from reply reply = bot.replied[0] cron_id = reply.split("#")[1].split(":")[0] bot.replied.clear() await cmd_cron(bot, _msg(f"!cron del {cron_id}")) assert "Removed" in bot.replied[0] assert cron_id in bot.replied[0] assert len(bot.state.keys("cron")) == 0 _clear(bot) await asyncio.sleep(0) asyncio.run(inner()) def test_del_nonexistent(self): bot = _FakeBot() asyncio.run(cmd_cron(bot, _msg("!cron del nosuch"))) assert "No cron job" in bot.replied[0] def test_del_missing_id(self): bot = _FakeBot() asyncio.run(cmd_cron(bot, _msg("!cron del"))) assert "Usage:" in bot.replied[0] def test_del_with_hash_prefix(self): bot = _FakeBot() async def inner(): await cmd_cron(bot, _msg("!cron add 5m #test !ping")) await asyncio.sleep(0) reply = bot.replied[0] cron_id = reply.split("#")[1].split(":")[0] bot.replied.clear() await cmd_cron(bot, _msg(f"!cron del #{cron_id}")) assert "Removed" in bot.replied[0] _clear(bot) await asyncio.sleep(0) asyncio.run(inner()) # --------------------------------------------------------------------------- # TestCmdCronList # --------------------------------------------------------------------------- class TestCmdCronList: def test_list_empty(self): bot = _FakeBot() asyncio.run(cmd_cron(bot, _msg("!cron list"))) assert "No cron jobs" in bot.replied[0] def test_list_populated(self): bot = _FakeBot() _save(bot, "#test:abc123", { "id": "abc123", "channel": "#test", "command": "!rss check news", "interval": 300, }) asyncio.run(cmd_cron(bot, _msg("!cron list"))) assert "#abc123" in bot.replied[0] assert "5m" in bot.replied[0] assert "!rss check news" in bot.replied[0] def test_list_requires_channel(self): bot = _FakeBot() asyncio.run(cmd_cron(bot, _pm("!cron list"))) assert "Use this command in a channel" in bot.replied[0] def test_list_channel_isolation(self): bot = _FakeBot() _save(bot, "#test:mine", { "id": "mine", "channel": "#test", "command": "!ping", "interval": 300, }) _save(bot, "#other:theirs", { "id": "theirs", "channel": "#other", "command": "!ping", "interval": 600, }) asyncio.run(cmd_cron(bot, _msg("!cron list"))) assert "mine" in bot.replied[0] assert len(bot.replied) == 1 # only the #test job # --------------------------------------------------------------------------- # TestCmdCronUsage # --------------------------------------------------------------------------- class TestCmdCronUsage: def test_no_args(self): bot = _FakeBot() asyncio.run(cmd_cron(bot, _msg("!cron"))) assert "Usage:" in bot.replied[0] def test_unknown_subcommand(self): bot = _FakeBot() asyncio.run(cmd_cron(bot, _msg("!cron foobar"))) assert "Usage:" in bot.replied[0] # --------------------------------------------------------------------------- # TestRestore # --------------------------------------------------------------------------- class TestRestore: def test_restore_spawns_tasks(self): bot = _FakeBot() data = { "id": "abc123", "channel": "#test", "command": "!ping", "interval": 300, "prefix": "admin!~admin@host", "nick": "admin", "added_by": "admin", } _save(bot, "#test:abc123", data) async def inner(): _restore(bot) ps = _ps(bot) assert "#test:abc123" in ps["tasks"] assert not ps["tasks"]["#test:abc123"].done() _clear(bot) await asyncio.sleep(0) asyncio.run(inner()) def test_restore_skips_active(self): bot = _FakeBot() data = { "id": "active", "channel": "#test", "command": "!ping", "interval": 300, "prefix": "admin!~admin@host", "nick": "admin", "added_by": "admin", } _save(bot, "#test:active", data) async def inner(): ps = _ps(bot) dummy = asyncio.create_task(asyncio.sleep(9999)) ps["tasks"]["#test:active"] = dummy _restore(bot) assert ps["tasks"]["#test:active"] is dummy dummy.cancel() await asyncio.sleep(0) asyncio.run(inner()) def test_restore_replaces_done_task(self): bot = _FakeBot() data = { "id": "done", "channel": "#test", "command": "!ping", "interval": 300, "prefix": "admin!~admin@host", "nick": "admin", "added_by": "admin", } _save(bot, "#test:done", data) async def inner(): ps = _ps(bot) done_task = asyncio.create_task(asyncio.sleep(0)) await done_task ps["tasks"]["#test:done"] = done_task _restore(bot) new_task = ps["tasks"]["#test:done"] assert new_task is not done_task assert not new_task.done() _clear(bot) await asyncio.sleep(0) asyncio.run(inner()) def test_restore_skips_bad_json(self): bot = _FakeBot() bot.state.set("cron", "#test:bad", "not json{{{") async def inner(): _restore(bot) assert "#test:bad" not in _ps(bot)["tasks"] asyncio.run(inner()) def test_on_connect_calls_restore(self): bot = _FakeBot() data = { "id": "conn", "channel": "#test", "command": "!ping", "interval": 300, "prefix": "admin!~admin@host", "nick": "admin", "added_by": "admin", } _save(bot, "#test:conn", data) async def inner(): msg = _msg("", target="botname") await on_connect(bot, msg) assert "#test:conn" in _ps(bot)["tasks"] _clear(bot) await asyncio.sleep(0) asyncio.run(inner()) # --------------------------------------------------------------------------- # TestCronLoop # --------------------------------------------------------------------------- class TestCronLoop: def test_dispatches_command(self): """Cron loop dispatches a synthetic message after interval.""" bot = _FakeBot() async def inner(): data = { "id": "loop1", "channel": "#test", "command": "!ping", "interval": 0.05, "prefix": "admin!~admin@host", "nick": "admin", "added_by": "admin", } key = "#test:loop1" _ps(bot)["jobs"][key] = data _start_job(bot, key) await asyncio.sleep(0.15) _stop_job(bot, key) await asyncio.sleep(0) # Should have dispatched at least once assert len(bot.dispatched) >= 1 msg = bot.dispatched[0] assert msg.nick == "admin" assert msg.params[0] == "#test" assert msg.params[1] == "!ping" asyncio.run(inner()) def test_loop_stops_on_job_removal(self): """Cron loop exits when job is removed from jobs dict.""" bot = _FakeBot() async def inner(): data = { "id": "loop2", "channel": "#test", "command": "!ping", "interval": 0.05, "prefix": "admin!~admin@host", "nick": "admin", "added_by": "admin", } key = "#test:loop2" ps = _ps(bot) ps["jobs"][key] = data _start_job(bot, key) await asyncio.sleep(0.02) ps["jobs"].pop(key, None) await asyncio.sleep(0.1) task = ps["tasks"].get(key) if task: assert task.done() asyncio.run(inner()) # --------------------------------------------------------------------------- # TestJobManagement # --------------------------------------------------------------------------- class TestJobManagement: def test_start_and_stop(self): bot = _FakeBot() data = { "id": "mgmt", "channel": "#test", "command": "!ping", "interval": 300, "prefix": "admin!~admin@host", "nick": "admin", "added_by": "admin", } key = "#test:mgmt" ps = _ps(bot) ps["jobs"][key] = data async def inner(): _start_job(bot, key) assert key in ps["tasks"] assert not ps["tasks"][key].done() _stop_job(bot, key) await asyncio.sleep(0) assert key not in ps["tasks"] assert key not in ps["jobs"] asyncio.run(inner()) def test_start_idempotent(self): bot = _FakeBot() data = { "id": "idem", "channel": "#test", "command": "!ping", "interval": 300, "prefix": "admin!~admin@host", "nick": "admin", "added_by": "admin", } key = "#test:idem" ps = _ps(bot) ps["jobs"][key] = data async def inner(): _start_job(bot, key) first = ps["tasks"][key] _start_job(bot, key) assert ps["tasks"][key] is first _stop_job(bot, key) await asyncio.sleep(0) asyncio.run(inner()) def test_stop_nonexistent(self): bot = _FakeBot() _stop_job(bot, "#test:nonexistent")