Files
derp/tests/test_pastemoni.py
user 073659607e feat: add multi-server support
Connect to multiple IRC servers concurrently from a single config file.
Plugins are loaded once and shared; per-server state is isolated via
separate SQLite databases and per-bot runtime state (bot._pstate).

- Add build_server_configs() for [servers.*] config layout
- Bot.__init__ gains name parameter, _pstate dict for plugin isolation
- cli.py runs multiple bots via asyncio.gather
- 9 stateful plugins migrated from module-level dicts to _ps(bot) pattern
- Backward compatible: legacy [server] config works unchanged

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 19:04:20 +01:00

983 lines
32 KiB
Python

"""Tests for the paste site keyword monitor plugin."""
import asyncio
import importlib.util
import json
import sys
from pathlib import Path
from unittest.mock import patch
from derp.irc import Message
# plugins/ is not a Python package -- load the module from file path
# Build an import spec for ../plugins/pastemoni.py (relative to this test
# file) under the dotted module name "plugins.pastemoni".
_spec = importlib.util.spec_from_file_location(
    "plugins.pastemoni",
    Path(__file__).resolve().parent.parent / "plugins" / "pastemoni.py",
)
_mod = importlib.util.module_from_spec(_spec)
# Register the module object under its dotted name before executing it, so
# the `from plugins.pastemoni import ...` statement that follows can find it.
sys.modules[_spec.name] = _mod
_spec.loader.exec_module(_mod)
from plugins.pastemoni import ( # noqa: E402
_MAX_SEEN,
_ArchiveParser,
_delete,
_fetch_gists,
_fetch_pastebin,
_load,
_poll_once,
_ps,
_restore,
_save,
_snippet_around,
_start_poller,
_state_key,
_stop_poller,
_truncate,
_validate_name,
cmd_pastemoni,
on_connect,
)
# -- Fixtures ----------------------------------------------------------------
# Archive-style listing with three paste links; only the first link's title
# contains "api_key".
ARCHIVE_HTML = """\
<html><body>
<table class="maintable">
<tr><td><a href="/AbCd1234">Leaked api_key dump</a></td><td>10 sec ago</td></tr>
<tr><td><a href="/EfGh5678">Random notes</a></td><td>30 sec ago</td></tr>
<tr><td><a href="/IjKl9012">Config file</a></td><td>1 min ago</td></tr>
</table>
</body></html>"""
# Raw paste bodies keyed by paste id, served by _mock_pb_urlopen. Ids that
# are missing here fall back to a non-matching body in that mock.
RAW_PASTES = {
    "EfGh5678": b"this paste has api_key = ABCDEF123456 inside",
    "IjKl9012": b"nothing to see here",
}
# Gist listing payload: gist1 carries "aws_secret_key" in its description,
# gist3 carries it in a filename, gist2 does not contain it at all.
GISTS_RESPONSE = [
    {
        "id": "gist1",
        "description": "contains aws_secret_key configuration",
        "html_url": "https://gist.github.com/user1/gist1",
        "files": {"config.py": {}},
    },
    {
        "id": "gist2",
        "description": "hello world example",
        "html_url": "https://gist.github.com/user2/gist2",
        "files": {"hello.py": {}},
    },
    {
        "id": "gist3",
        "description": "utility scripts",
        "html_url": "https://gist.github.com/user3/gist3",
        "files": {"aws_secret_key.env": {}},
    },
]
# -- Helpers -----------------------------------------------------------------
class _FakeResp:
"""Mock HTTP response."""
def __init__(self, body):
self._body = body if isinstance(body, bytes) else body.encode()
def read(self):
return self._body
def close(self):
pass
class _FakeState:
"""In-memory stand-in for bot.state."""
def __init__(self):
self._store: dict[str, dict[str, str]] = {}
def get(self, plugin: str, key: str, default: str | None = None) -> str | None:
return self._store.get(plugin, {}).get(key, default)
def set(self, plugin: str, key: str, value: str) -> None:
self._store.setdefault(plugin, {})[key] = value
def delete(self, plugin: str, key: str) -> bool:
try:
del self._store[plugin][key]
return True
except KeyError:
return False
def keys(self, plugin: str) -> list[str]:
return sorted(self._store.get(plugin, {}).keys())
class _FakeRegistry:
"""Minimal registry stand-in."""
def __init__(self):
self._modules: dict = {}
class _FakeBot:
"""Minimal bot stand-in that captures sent/replied messages."""
def __init__(self, *, admin: bool = False):
self.sent: list[tuple[str, str]] = []
self.replied: list[str] = []
self.state = _FakeState()
self.registry = _FakeRegistry()
self._pstate: dict = {}
self._admin = admin
async def send(self, target: str, text: str) -> None:
self.sent.append((target, text))
async def reply(self, message, text: str) -> None:
self.replied.append(text)
async def shorten_url(self, url: str) -> str:
return url
def _is_admin(self, message) -> bool:
return self._admin
def _msg(text: str, nick: str = "alice", target: str = "#test") -> Message:
    """Build a PRIVMSG from ``nick`` addressed to ``target`` (a channel by default)."""
    hostmask = f"{nick}!~{nick}@host"
    params = [target, text]
    return Message(
        raw="",
        prefix=hostmask,
        nick=nick,
        command="PRIVMSG",
        params=params,
        tags={},
    )
def _pm(text: str, nick: str = "alice") -> Message:
    """Build a PRIVMSG from ``nick`` sent directly to the bot ("botname")."""
    # A private message is the same PRIVMSG with the bot's nick as target.
    return _msg(text, nick=nick, target="botname")
def _clear(bot=None) -> None:
    """Cancel running pollers and empty the plugin state held for ``bot``.

    Does nothing when no bot is supplied.
    """
    if bot is not None:
        state = _ps(bot)
        for poller in state["pollers"].values():
            # Entries may be falsy or already finished; only cancel live tasks.
            if poller and not poller.done():
                poller.cancel()
        for section in ("pollers", "monitors", "errors"):
            state[section].clear()
def _fake_pb(keyword):
"""Fake Pastebin backend returning two results."""
return [
{"id": "pb1", "title": "Test paste",
"url": "https://pastebin.com/pb1", "snippet": "...test keyword..."},
{"id": "pb2", "title": "Another paste",
"url": "https://pastebin.com/pb2", "snippet": "...test data..."},
]
def _fake_gh(keyword):
"""Fake Gists backend returning one result."""
return [
{"id": "gh1", "title": "Test gist",
"url": "https://gist.github.com/gh1", "snippet": "...test content..."},
]
def _fake_pb_error(keyword):
"""Fake Pastebin backend that raises."""
raise ConnectionError("Pastebin down")
def _fake_gh_error(keyword):
"""Fake Gists backend that raises."""
raise ConnectionError("GitHub down")
# Backend table (tag -> fake fetcher) that tests patch over the plugin
# module's _BACKENDS attribute.
_FAKE_BACKENDS = {"pb": _fake_pb, "gh": _fake_gh}
def _mock_pb_urlopen(req, **kw):
    """Serve canned Pastebin responses for the archive page and raw pastes.

    ``req`` may be an object with a ``full_url`` attribute or anything whose
    ``str()`` is the URL. Any URL outside the two known shapes raises
    ``ValueError``.
    """
    try:
        url = req.full_url
    except AttributeError:
        url = str(req)

    if "pastebin.com/archive" in url:
        return _FakeResp(ARCHIVE_HTML)
    if "pastebin.com/raw/" not in url:
        raise ValueError(f"unexpected URL: {url}")

    # The paste id is the last path segment; unknown ids get a body that
    # contains none of the keywords used by the tests.
    _, _, paste_id = url.rpartition("/")
    return _FakeResp(RAW_PASTES.get(paste_id, b"no match here"))
# ---------------------------------------------------------------------------
# TestArchiveParser
# ---------------------------------------------------------------------------
class TestArchiveParser:
    """Link extraction from archive-page HTML."""

    @staticmethod
    def _links_from(html):
        """Feed ``html`` to a fresh parser and return its collected links."""
        parser = _ArchiveParser()
        parser.feed(html)
        return parser.links

    def test_extracts_paste_links(self):
        found = self._links_from(ARCHIVE_HTML)
        assert len(found) == 3
        wanted = (
            ("AbCd1234", "Leaked api_key dump"),
            ("EfGh5678", "Random notes"),
            ("IjKl9012", "Config file"),
        )
        for position, pair in enumerate(wanted):
            assert found[position] == pair

    def test_ignores_non_paste_links(self):
        markup = '<a href="/archive">Archive</a><a href="/about">About</a>'
        assert self._links_from(markup) == []

    def test_empty_html(self):
        assert self._links_from("<html></html>") == []
# ---------------------------------------------------------------------------
# TestFetchPastebin
# ---------------------------------------------------------------------------
class TestFetchPastebin:
    """Pastebin backend, driven through the canned ``_mock_pb_urlopen``."""

    @staticmethod
    def _search(keyword):
        """Run ``_fetch_pastebin`` with the module's ``_urlopen`` mocked."""
        with patch.object(_mod, "_urlopen", side_effect=_mock_pb_urlopen):
            return _fetch_pastebin(keyword)

    def test_keyword_in_title(self):
        hits = self._search("api_key")
        # AbCd1234 carries the keyword in its title.
        assert "AbCd1234" in [hit["id"] for hit in hits]
        by_title = next(hit for hit in hits if hit["id"] == "AbCd1234")
        # A title hit is reported with an empty snippet.
        assert by_title["snippet"] == ""
        assert "pastebin.com/AbCd1234" in by_title["url"]

    def test_keyword_in_content(self):
        hits = self._search("api_key")
        # EfGh5678 only matches through its raw body.
        by_body = next(hit for hit in hits if hit["id"] == "EfGh5678")
        assert "api_key" in by_body["snippet"].lower()
        assert "pastebin.com/EfGh5678" in by_body["url"]

    def test_no_match(self):
        assert self._search("nonexistent_keyword_xyz") == []

    def test_network_error(self):
        import pytest

        broken = patch.object(_mod, "_urlopen", side_effect=ConnectionError("fail"))
        with broken, pytest.raises(ConnectionError):
            _fetch_pastebin("test")
# ---------------------------------------------------------------------------
# TestFetchGists
# ---------------------------------------------------------------------------
class TestFetchGists:
    """Gists backend, fed JSON payloads through a mocked ``_urlopen``."""

    @staticmethod
    def _search(keyword, payload):
        """Run ``_fetch_gists`` while ``_urlopen`` returns ``payload`` as JSON."""
        body = json.dumps(payload).encode()
        with patch.object(_mod, "_urlopen", return_value=_FakeResp(body)):
            return _fetch_gists(keyword)

    def test_keyword_in_description(self):
        hits = self._search("aws_secret_key", GISTS_RESPONSE)
        assert "gist1" in [hit["id"] for hit in hits]
        described = next(hit for hit in hits if hit["id"] == "gist1")
        assert "aws_secret_key" in described["title"].lower()

    def test_keyword_in_filename(self):
        hits = self._search("aws_secret_key", GISTS_RESPONSE)
        assert "gist3" in [hit["id"] for hit in hits]

    def test_no_match(self):
        assert self._search("nonexistent_keyword_xyz", GISTS_RESPONSE) == []

    def test_api_error(self):
        import pytest

        broken = patch.object(_mod, "_urlopen", side_effect=ConnectionError("fail"))
        with broken, pytest.raises(ConnectionError):
            _fetch_gists("test")

    def test_non_list_response(self):
        """A JSON object instead of a list produces no results."""
        assert self._search("test", {"error": "rate limited"}) == []
# ---------------------------------------------------------------------------
# TestSnippetAround
# ---------------------------------------------------------------------------
class TestSnippetAround:
    """Snippet extraction around a keyword."""

    def test_short_text_returned_as_is(self):
        snippet = _snippet_around("hello world", "hello")
        assert snippet == "hello world"

    def test_long_text_shows_context(self):
        haystack = "".join(("x" * 50, "KEYWORD", "y" * 50))
        snippet = _snippet_around(haystack, "KEYWORD", max_len=40)
        assert "KEYWORD" in snippet
        assert "..." in snippet

    def test_empty_text(self):
        snippet = _snippet_around("", "test")
        assert snippet == ""

    def test_keyword_not_found(self):
        snippet = _snippet_around("a" * 100, "missing", max_len=40)
        assert snippet.endswith("...")
        # max_len characters plus the three-character ellipsis
        assert len(snippet) <= 43
# ---------------------------------------------------------------------------
# TestValidateName
# ---------------------------------------------------------------------------
class TestValidateName:
    """Accepted and rejected monitor names."""

    def test_valid(self):
        verdict = _validate_name("leak-watch")
        assert verdict is True

    def test_valid_numbers(self):
        verdict = _validate_name("test123")
        assert verdict is True

    def test_invalid_uppercase(self):
        verdict = _validate_name("LeakWatch")
        assert verdict is False

    def test_invalid_too_long(self):
        verdict = _validate_name("a" * 21)
        assert verdict is False

    def test_invalid_starts_with_hyphen(self):
        verdict = _validate_name("-bad")
        assert verdict is False

    def test_invalid_empty(self):
        verdict = _validate_name("")
        assert verdict is False
# ---------------------------------------------------------------------------
# TestTruncate
# ---------------------------------------------------------------------------
class TestTruncate:
    """Length-limited truncation with a trailing ellipsis."""

    def test_short_unchanged(self):
        shortened = _truncate("hello", 60)
        assert shortened == "hello"

    def test_exact_length(self):
        at_limit = "a" * 60
        assert _truncate(at_limit, 60) == at_limit

    def test_long_truncated(self):
        shortened = _truncate("a" * 80, 60)
        assert len(shortened) == 60
        assert shortened.endswith("...")
# ---------------------------------------------------------------------------
# TestStateHelpers
# ---------------------------------------------------------------------------
class TestStateHelpers:
    """Round-trips through the plugin's persistence helpers."""

    def test_save_and_load(self):
        bot = _FakeBot()
        payload = {"keyword": "test", "name": "t"}
        _save(bot, "#ch:t", payload)
        assert _load(bot, "#ch:t") == payload

    def test_load_missing(self):
        bot = _FakeBot()
        missing = _load(bot, "nonexistent")
        assert missing is None

    def test_delete(self):
        bot = _FakeBot()
        state_key = "#ch:t"
        _save(bot, state_key, {"name": "t"})
        _delete(bot, state_key)
        assert _load(bot, state_key) is None

    def test_state_key(self):
        composed = _state_key("#ops", "leak-watch")
        assert composed == "#ops:leak-watch"

    def test_load_invalid_json(self):
        bot = _FakeBot()
        # Write an unparseable value straight into the backing store.
        bot.state.set("pastemoni", "bad", "not json{{{")
        assert _load(bot, "bad") is None
# ---------------------------------------------------------------------------
# TestPollOnce
# ---------------------------------------------------------------------------
class TestPollOnce:
    """Single polling passes of ``_poll_once`` against fake backends."""

    @staticmethod
    def _install(bot, name, seen):
        """Persist a monitor on ``#test`` and register it as active.

        The same dict object is saved and placed in the per-bot ``monitors``
        table. Returns the state key ``"#test:<name>"``.
        """
        key = f"#test:{name}"
        data = {
            "keyword": "test", "name": name, "channel": "#test",
            "interval": 300, "seen": seen,
            "last_poll": "", "last_errors": {},
        }
        _save(bot, key, data)
        _ps(bot)["monitors"][key] = data
        return key

    def test_new_items_announced(self):
        bot = _FakeBot()
        key = self._install(bot, "poll", {"pb": [], "gh": []})

        async def inner():
            with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS):
                await _poll_once(bot, key, announce=True)
            msgs = [s for t, s in bot.sent if t == "#test"]
            pb_msgs = [m for m in msgs if "[pb]" in m]
            gh_msgs = [m for m in msgs if "[gh]" in m]
            assert len(pb_msgs) == 2
            assert len(gh_msgs) == 1

        asyncio.run(inner())

    def test_seen_items_deduped(self):
        bot = _FakeBot()
        # Every id the fake backends return is already marked as seen.
        key = self._install(bot, "dedup", {"pb": ["pb1", "pb2"], "gh": ["gh1"]})

        async def inner():
            with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS):
                await _poll_once(bot, key, announce=True)
            assert len(bot.sent) == 0

        asyncio.run(inner())

    def test_error_increments_counter(self):
        """All backends failing increments the error counter."""
        bot = _FakeBot()
        key = self._install(bot, "errs", {"pb": [], "gh": []})
        all_fail = {"pb": _fake_pb_error, "gh": _fake_gh_error}

        async def inner():
            with patch.object(_mod, "_BACKENDS", all_fail):
                await _poll_once(bot, key, announce=True)
            assert _ps(bot)["errors"][key] == 1
            assert len(bot.sent) == 0

        asyncio.run(inner())

    def test_partial_failure_resets_counter(self):
        """One backend succeeding resets the error counter."""
        bot = _FakeBot()
        key = self._install(bot, "partial", {"pb": [], "gh": []})
        # Start from a non-zero count so a reset is observable.
        _ps(bot)["errors"][key] = 3
        partial_fail = {"pb": _fake_pb_error, "gh": _fake_gh}

        async def inner():
            with patch.object(_mod, "_BACKENDS", partial_fail):
                await _poll_once(bot, key, announce=True)
            assert _ps(bot)["errors"][key] == 0
            gh_msgs = [s for t, s in bot.sent if t == "#test" and "[gh]" in s]
            assert len(gh_msgs) == 1

        asyncio.run(inner())

    def test_max_announce_cap(self):
        """Only MAX_ANNOUNCE items announced per backend."""
        bot = _FakeBot()

        def _fake_many(keyword):
            return [
                {"id": f"p{i}", "title": f"Paste {i}",
                 "url": f"https://example.com/{i}", "snippet": ""}
                for i in range(10)
            ]

        key = self._install(bot, "cap", {"pb": [], "gh": []})

        async def inner():
            with patch.object(_mod, "_BACKENDS", {"pb": _fake_many}):
                await _poll_once(bot, key, announce=True)
            msgs = [s for t, s in bot.sent if t == "#test"]
            # 5 items + 1 "... and 5 more"
            assert len(msgs) == 6
            assert "5 more" in msgs[-1]

        asyncio.run(inner())

    def test_no_announce_flag(self):
        """With announce=False nothing is sent, but ids are still recorded."""
        bot = _FakeBot()
        key = self._install(bot, "quiet", {"pb": [], "gh": []})

        async def inner():
            with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS):
                await _poll_once(bot, key, announce=False)
            assert len(bot.sent) == 0
            updated = _load(bot, key)
            assert len(updated["seen"]["pb"]) == 2
            assert len(updated["seen"]["gh"]) == 1

        asyncio.run(inner())

    def test_seen_cap(self):
        """Seen list capped at MAX_SEEN per backend."""
        bot = _FakeBot()
        # Produce exactly `overflow` ids more than the cap, so the expected
        # first surviving id does not depend on the concrete _MAX_SEEN value.
        overflow = 50
        total = _MAX_SEEN + overflow

        def _fake_many(keyword):
            return [
                {"id": f"v{i}", "title": f"V{i}", "url": "", "snippet": ""}
                for i in range(total)
            ]

        key = self._install(bot, "seencap", {"pb": []})

        async def inner():
            with patch.object(_mod, "_BACKENDS", {"pb": _fake_many}):
                await _poll_once(bot, key, announce=False)
            updated = _load(bot, key)
            assert len(updated["seen"]["pb"]) == _MAX_SEEN
            # The oldest `overflow` ids are dropped; the newest ones remain.
            assert updated["seen"]["pb"][0] == f"v{overflow}"

        asyncio.run(inner())
# ---------------------------------------------------------------------------
# TestCommands
# ---------------------------------------------------------------------------
class TestCmdAdd:
    """``!pastemoni add`` subcommand."""

    def test_add_success(self):
        bot = _FakeBot(admin=True)
        key = "#test:leak-watch"

        async def scenario():
            with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS):
                await cmd_pastemoni(bot, _msg("!pastemoni add leak-watch api_key"))
                # Yield to the loop while the fake backends are still patched in.
                await asyncio.sleep(0.2)
            assert len(bot.replied) == 1
            reply = bot.replied[0]
            assert "Monitor 'leak-watch' added" in reply
            assert "api_key" in reply
            stored = _load(bot, key)
            assert stored is not None
            assert stored["keyword"] == "api_key"
            assert stored["channel"] == "#test"
            assert len(stored["seen"]["pb"]) == 2
            assert len(stored["seen"]["gh"]) == 1
            assert key in _ps(bot)["pollers"]
            _stop_poller(bot, key)
            await asyncio.sleep(0)

        asyncio.run(scenario())

    def test_add_requires_admin(self):
        bot = _FakeBot(admin=False)
        request = _msg("!pastemoni add test keyword")
        asyncio.run(cmd_pastemoni(bot, request))
        assert "Permission denied" in bot.replied[0]

    def test_add_requires_channel(self):
        bot = _FakeBot(admin=True)
        request = _pm("!pastemoni add test keyword")
        asyncio.run(cmd_pastemoni(bot, request))
        assert "Use this command in a channel" in bot.replied[0]

    def test_add_invalid_name(self):
        bot = _FakeBot(admin=True)
        request = _msg("!pastemoni add BAD! keyword")
        asyncio.run(cmd_pastemoni(bot, request))
        assert "Invalid name" in bot.replied[0]

    def test_add_missing_keyword(self):
        bot = _FakeBot(admin=True)
        request = _msg("!pastemoni add myname")
        asyncio.run(cmd_pastemoni(bot, request))
        assert "Usage:" in bot.replied[0]

    def test_add_duplicate(self):
        bot = _FakeBot(admin=True)

        async def scenario():
            with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS):
                await cmd_pastemoni(bot, _msg("!pastemoni add dupe keyword"))
                await asyncio.sleep(0.1)
            # Drop the reply of the first add so index 0 is the second reply.
            bot.replied.clear()
            with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS):
                await cmd_pastemoni(bot, _msg("!pastemoni add dupe other"))
            assert "already exists" in bot.replied[0]
            _stop_poller(bot, "#test:dupe")
            await asyncio.sleep(0)

        asyncio.run(scenario())

    def test_add_limit(self):
        bot = _FakeBot(admin=True)
        # Pre-populate twenty monitors on the channel before adding one more.
        for index in range(20):
            name = f"mon{index}"
            _save(bot, f"#test:{name}", {"name": name, "channel": "#test"})

        async def scenario():
            with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS):
                await cmd_pastemoni(bot, _msg("!pastemoni add overflow keyword"))
            assert "limit reached" in bot.replied[0]

        asyncio.run(scenario())
class TestCmdDel:
    """``!pastemoni del`` subcommand."""

    def test_del_success(self):
        bot = _FakeBot(admin=True)
        key = "#test:todel"

        async def scenario():
            with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS):
                await cmd_pastemoni(bot, _msg("!pastemoni add todel keyword"))
                await asyncio.sleep(0.1)
            # Only the reply to "del" should be inspected below.
            bot.replied.clear()
            await cmd_pastemoni(bot, _msg("!pastemoni del todel"))
            assert "Removed 'todel'" in bot.replied[0]
            assert _load(bot, key) is None
            assert key not in _ps(bot)["pollers"]
            await asyncio.sleep(0)

        asyncio.run(scenario())

    def test_del_requires_admin(self):
        bot = _FakeBot(admin=False)
        request = _msg("!pastemoni del test")
        asyncio.run(cmd_pastemoni(bot, request))
        assert "Permission denied" in bot.replied[0]

    def test_del_nonexistent(self):
        bot = _FakeBot(admin=True)
        request = _msg("!pastemoni del nosuch")
        asyncio.run(cmd_pastemoni(bot, request))
        assert "No monitor" in bot.replied[0]

    def test_del_no_name(self):
        bot = _FakeBot(admin=True)
        request = _msg("!pastemoni del")
        asyncio.run(cmd_pastemoni(bot, request))
        assert "Usage:" in bot.replied[0]
class TestCmdList:
    """``!pastemoni list`` subcommand."""

    @staticmethod
    def _seed(bot, channel, name, keyword, errors):
        """Persist a minimal monitor record under ``"<channel>:<name>"``."""
        record = {
            "name": name, "channel": channel, "keyword": keyword,
            "last_errors": errors,
        }
        _save(bot, f"{channel}:{name}", record)

    def test_list_empty(self):
        bot = _FakeBot()
        asyncio.run(cmd_pastemoni(bot, _msg("!pastemoni list")))
        assert "No monitors" in bot.replied[0]

    def test_list_populated(self):
        bot = _FakeBot()
        self._seed(bot, "#test", "leaks", "api_key", {})
        self._seed(bot, "#test", "creds", "password", {})
        asyncio.run(cmd_pastemoni(bot, _msg("!pastemoni list")))
        listing = bot.replied[0]
        assert "Monitors:" in listing
        assert "leaks" in listing
        assert "creds" in listing

    def test_list_shows_errors(self):
        bot = _FakeBot()
        self._seed(bot, "#test", "broken", "test", {"pb": "Connection refused"})
        asyncio.run(cmd_pastemoni(bot, _msg("!pastemoni list")))
        listing = bot.replied[0]
        assert "broken" in listing
        assert "1 errors" in listing

    def test_list_requires_channel(self):
        bot = _FakeBot()
        asyncio.run(cmd_pastemoni(bot, _pm("!pastemoni list")))
        assert "Use this command in a channel" in bot.replied[0]

    def test_list_channel_isolation(self):
        bot = _FakeBot()
        self._seed(bot, "#test", "mine", "test", {})
        self._seed(bot, "#other", "theirs", "test", {})
        # The request comes from #test, so only that channel's monitor shows.
        asyncio.run(cmd_pastemoni(bot, _msg("!pastemoni list")))
        listing = bot.replied[0]
        assert "mine" in listing
        assert "theirs" not in listing
class TestCmdCheck:
    """``!pastemoni check`` subcommand."""

    @staticmethod
    def _seed(bot, name, seen):
        """Persist a full monitor record for ``#test`` under ``"#test:<name>"``."""
        record = {
            "keyword": "test", "name": name, "channel": "#test",
            "interval": 300, "seen": seen,
            "last_poll": "", "last_errors": {},
        }
        _save(bot, f"#test:{name}", record)

    def test_check_success(self):
        bot = _FakeBot()
        self._seed(bot, "chk", {"pb": ["pb1", "pb2"], "gh": ["gh1"]})

        async def scenario():
            with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS):
                await cmd_pastemoni(bot, _msg("!pastemoni check chk"))
            assert "chk: checked" in bot.replied[0]

        asyncio.run(scenario())

    def test_check_nonexistent(self):
        bot = _FakeBot()
        asyncio.run(cmd_pastemoni(bot, _msg("!pastemoni check nope")))
        assert "No monitor" in bot.replied[0]

    def test_check_requires_channel(self):
        bot = _FakeBot()
        asyncio.run(cmd_pastemoni(bot, _pm("!pastemoni check test")))
        assert "Use this command in a channel" in bot.replied[0]

    def test_check_shows_error(self):
        bot = _FakeBot()
        self._seed(bot, "errchk", {"pb": [], "gh": []})
        failing = {"pb": _fake_pb_error, "gh": _fake_gh_error}

        async def scenario():
            with patch.object(_mod, "_BACKENDS", failing):
                await cmd_pastemoni(bot, _msg("!pastemoni check errchk"))
            assert "error" in bot.replied[0].lower()

        asyncio.run(scenario())

    def test_check_announces_new_items(self):
        bot = _FakeBot()
        self._seed(bot, "news", {"pb": ["pb1"], "gh": []})

        async def scenario():
            with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS):
                await cmd_pastemoni(bot, _msg("!pastemoni check news"))
            channel_lines = [text for target, text in bot.sent if target == "#test"]
            from_pb = [line for line in channel_lines if "[pb]" in line]
            from_gh = [line for line in channel_lines if "[gh]" in line]
            # pb1 was already seen, so only pb2 is announced.
            assert len(from_pb) == 1
            # gh1 had not been seen before.
            assert len(from_gh) == 1

        asyncio.run(scenario())

    def test_check_no_name(self):
        bot = _FakeBot()
        asyncio.run(cmd_pastemoni(bot, _msg("!pastemoni check")))
        assert "Usage:" in bot.replied[0]
class TestCmdUsage:
    """Fallback usage text for bare or unrecognised invocations."""

    def test_no_args(self):
        bot = _FakeBot()
        request = _msg("!pastemoni")
        asyncio.run(cmd_pastemoni(bot, request))
        assert "Usage:" in bot.replied[0]

    def test_unknown_subcommand(self):
        bot = _FakeBot()
        request = _msg("!pastemoni foobar")
        asyncio.run(cmd_pastemoni(bot, request))
        assert "Usage:" in bot.replied[0]
# ---------------------------------------------------------------------------
# TestRestore
# ---------------------------------------------------------------------------
class TestRestore:
    """Rebuilding poller tasks from persisted monitor state."""

    @staticmethod
    def _persist(bot, name):
        """Save an unpolled monitor for ``#test`` and return its state key."""
        key = f"#test:{name}"
        record = {
            "keyword": "test", "name": name, "channel": "#test",
            "interval": 300, "seen": {"pb": [], "gh": []},
            "last_poll": "", "last_errors": {},
        }
        _save(bot, key, record)
        return key

    def test_pollers_rebuilt_from_state(self):
        bot = _FakeBot()
        key = self._persist(bot, "restored")

        async def scenario():
            _restore(bot)
            assert key in _ps(bot)["pollers"]
            assert not _ps(bot)["pollers"][key].done()
            _stop_poller(bot, key)
            await asyncio.sleep(0)

        asyncio.run(scenario())

    def test_restore_skips_active(self):
        bot = _FakeBot()
        key = self._persist(bot, "active")

        async def scenario():
            # A long sleep stands in for a poller that is still running.
            placeholder = asyncio.create_task(asyncio.sleep(9999))
            _ps(bot)["pollers"][key] = placeholder
            _restore(bot)
            assert _ps(bot)["pollers"][key] is placeholder
            placeholder.cancel()
            await asyncio.sleep(0)

        asyncio.run(scenario())

    def test_restore_replaces_done_task(self):
        bot = _FakeBot()
        key = self._persist(bot, "done")

        async def scenario():
            # Awaiting the task first guarantees it is finished when registered.
            finished = asyncio.create_task(asyncio.sleep(0))
            await finished
            _ps(bot)["pollers"][key] = finished
            _restore(bot)
            replacement = _ps(bot)["pollers"][key]
            assert replacement is not finished
            assert not replacement.done()
            _stop_poller(bot, key)
            await asyncio.sleep(0)

        asyncio.run(scenario())

    def test_restore_skips_bad_json(self):
        bot = _FakeBot()
        # Unparseable state written straight into the backing store.
        bot.state.set("pastemoni", "#test:bad", "not json{{{")

        async def scenario():
            _restore(bot)
            assert "#test:bad" not in _ps(bot)["pollers"]

        asyncio.run(scenario())

    def test_on_connect_calls_restore(self):
        bot = _FakeBot()
        key = self._persist(bot, "conn")

        async def scenario():
            event = _msg("", target="botname")
            await on_connect(bot, event)
            assert key in _ps(bot)["pollers"]
            _stop_poller(bot, key)
            await asyncio.sleep(0)

        asyncio.run(scenario())
# ---------------------------------------------------------------------------
# TestPollerManagement
# ---------------------------------------------------------------------------
class TestPollerManagement:
    """Starting and stopping poller tasks directly."""

    @staticmethod
    def _install(bot, name):
        """Persist a monitor for ``#test``, register it as active, return its key."""
        key = f"#test:{name}"
        record = {
            "keyword": "test", "name": name, "channel": "#test",
            "interval": 300, "seen": {"pb": [], "gh": []},
            "last_poll": "", "last_errors": {},
        }
        _save(bot, key, record)
        _ps(bot)["monitors"][key] = record
        return key

    def test_start_and_stop(self):
        bot = _FakeBot()
        key = self._install(bot, "mgmt")

        async def scenario():
            _start_poller(bot, key)
            assert key in _ps(bot)["pollers"]
            assert not _ps(bot)["pollers"][key].done()
            _stop_poller(bot, key)
            await asyncio.sleep(0)
            # Stopping removes both the task and the active-monitor entry.
            assert key not in _ps(bot)["pollers"]
            assert key not in _ps(bot)["monitors"]

        asyncio.run(scenario())

    def test_start_idempotent(self):
        bot = _FakeBot()
        key = self._install(bot, "idem")

        async def scenario():
            _start_poller(bot, key)
            original = _ps(bot)["pollers"][key]
            # A second start must keep the task created by the first one.
            _start_poller(bot, key)
            assert _ps(bot)["pollers"][key] is original
            _stop_poller(bot, key)
            await asyncio.sleep(0)

        asyncio.run(scenario())

    def test_stop_nonexistent(self):
        """Stopping an unknown key must not raise."""
        bot = _FakeBot()
        _stop_poller(bot, "#test:nonexistent")