Files
derp/tests/test_alert.py
user 073659607e feat: add multi-server support
Connect to multiple IRC servers concurrently from a single config file.
Plugins are loaded once and shared; per-server state is isolated via
separate SQLite databases and per-bot runtime state (bot._pstate).

- Add build_server_configs() for [servers.*] config layout
- Bot.__init__ gains name parameter, _pstate dict for plugin isolation
- cli.py runs multiple bots via asyncio.gather
- 9 stateful plugins migrated from module-level dicts to _ps(bot) pattern
- Backward compatible: legacy [server] config works unchanged

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 19:04:20 +01:00

1393 lines
46 KiB
Python

"""Tests for the keyword alert subscription plugin."""
import asyncio
import importlib.util
import json
import sys
from pathlib import Path
from unittest.mock import patch
from derp.irc import Message
# plugins/ is not a Python package -- load the module from file path
# The spec is named "plugins.alert" and points at plugins/alert.py, resolved
# relative to the parent of this file's directory.
_spec = importlib.util.spec_from_file_location(
    "plugins.alert", Path(__file__).resolve().parent.parent / "plugins" / "alert.py",
)
_mod = importlib.util.module_from_spec(_spec)
# Register under the spec name before executing the module, so that the
# `from plugins.alert import ...` statement that follows finds this module
# object in sys.modules.
sys.modules[_spec.name] = _mod
_spec.loader.exec_module(_mod)
from plugins.alert import ( # noqa: E402
_MAX_SEEN,
_compact_num,
_delete,
_extract_videos,
_load,
_poll_once,
_ps,
_restore,
_save,
_save_result,
_search_searx,
_search_twitch,
_search_youtube,
_start_poller,
_state_key,
_stop_poller,
_truncate,
_validate_name,
cmd_alert,
on_connect,
)
# -- Fixtures ----------------------------------------------------------------
# Minimal InnerTube-style search payload: a single item section holding two
# videoRenderer entries ("abc123" and "def456"), each with a one-run title.
YT_RESPONSE = {
    "contents": {
        "twoColumnSearchResultsRenderer": {
            "primaryContents": {
                "sectionListRenderer": {
                    "contents": [
                        {
                            "itemSectionRenderer": {
                                "contents": [
                                    {"videoRenderer": {
                                        "videoId": "abc123",
                                        "title": {"runs": [{"text": "First Video"}]},
                                    }},
                                    {"videoRenderer": {
                                        "videoId": "def456",
                                        "title": {"runs": [{"text": "Second Video"}]},
                                    }},
                                ],
                            },
                        },
                    ],
                },
            },
        },
    },
}
# Variant where the video dict sits under extra wrapper layers
# ("wrapper" -> "inner" list) instead of the standard renderer path.
YT_NESTED = {
    "wrapper": {
        "inner": [
            {
                "videoId": "nested1",
                "title": {"runs": [{"text": "Nested"}]},
            },
        ],
    },
}
# GQL search payload containing one live stream item and one VOD item
GQL_RESPONSE = {
    "data": {
        "searchFor": {
            "streams": {"items": [{
                "id": "111",
                "broadcaster": {"login": "streamer1", "displayName": "Streamer1"},
                "title": "Live now!",
                "game": {"name": "Minecraft"},
                "viewersCount": 500,
            }]},
            "videos": {"items": [{
                "id": "222",
                "owner": {"login": "creator1", "displayName": "Creator1"},
                "title": "Cool VOD",
                "game": {"name": "Fortnite"},
                "viewCount": 1000,
            }]},
        },
    },
}
# SearXNG search payload: three results numbered 1..3, each with a title,
# an example.com URL and a content snippet.
SEARX_RESPONSE = {
    "results": [
        {
            "title": f"SearX Result {n}",
            "url": f"https://example.com/sx{n}",
            "content": f"Snippet {n}",
        }
        for n in (1, 2, 3)
    ],
}
# -- Helpers -----------------------------------------------------------------
class _FakeState:
"""In-memory stand-in for bot.state."""
def __init__(self):
self._store: dict[str, dict[str, str]] = {}
def get(self, plugin: str, key: str, default: str | None = None) -> str | None:
return self._store.get(plugin, {}).get(key, default)
def set(self, plugin: str, key: str, value: str) -> None:
self._store.setdefault(plugin, {})[key] = value
def delete(self, plugin: str, key: str) -> bool:
try:
del self._store[plugin][key]
return True
except KeyError:
return False
def keys(self, plugin: str) -> list[str]:
return sorted(self._store.get(plugin, {}).keys())
class _FakeRegistry:
"""Minimal registry stand-in."""
def __init__(self):
self._modules: dict = {}
class _FakeBot:
"""Minimal bot stand-in that captures sent/replied messages."""
def __init__(self, *, admin: bool = False):
self.sent: list[tuple[str, str]] = []
self.actions: list[tuple[str, str]] = []
self.replied: list[str] = []
self.state = _FakeState()
self._pstate: dict = {}
self.registry = _FakeRegistry()
self._admin = admin
async def send(self, target: str, text: str) -> None:
self.sent.append((target, text))
async def action(self, target: str, text: str) -> None:
self.actions.append((target, text))
async def reply(self, message, text: str) -> None:
self.replied.append(text)
async def long_reply(self, message, lines, *, label: str = "") -> None:
for line in lines:
self.replied.append(line)
def _is_admin(self, message) -> bool:
return self._admin
def _msg(text: str, nick: str = "alice", target: str = "#test") -> Message:
    """Build a PRIVMSG from *nick* addressed to *target* (``#test`` by default)."""
    hostmask = f"{nick}!~{nick}@host"
    return Message(
        raw="",
        prefix=hostmask,
        nick=nick,
        command="PRIVMSG",
        params=[target, text],
        tags={},
    )
def _pm(text: str, nick: str = "alice") -> Message:
    """Build a PRIVMSG from *nick* whose target is ``botname`` rather than a channel."""
    hostmask = f"{nick}!~{nick}@host"
    return Message(
        raw="",
        prefix=hostmask,
        nick=nick,
        command="PRIVMSG",
        params=["botname", text],
        tags={},
    )
def _clear(bot=None) -> None:
"""Reset per-bot plugin state between tests."""
if bot is None:
return
ps = _ps(bot)
for task in ps["pollers"].values():
if task and not task.done():
task.cancel()
ps["pollers"].clear()
ps["subs"].clear()
ps["errors"].clear()
ps["poll_count"].clear()
def _fake_yt(keyword):
"""Fake YouTube backend returning two results (keyword in title)."""
return [
{"id": "yt1", "title": "YT test Result 1",
"url": "https://www.youtube.com/watch?v=yt1", "extra": ""},
{"id": "yt2", "title": "YT test Result 2",
"url": "https://www.youtube.com/watch?v=yt2", "extra": ""},
]
def _fake_tw(keyword):
"""Fake Twitch backend returning two results (keyword in title)."""
return [
{"id": "stream:tw1", "title": "TW test Stream 1",
"url": "https://twitch.tv/user1", "extra": "500 viewers"},
{"id": "vod:tw2", "title": "TW test VOD 1",
"url": "https://twitch.tv/videos/tw2", "extra": "1k views"},
]
def _fake_yt_error(keyword):
"""Fake YouTube backend that raises."""
raise ConnectionError("YouTube down")
def _fake_tw_error(keyword):
"""Fake Twitch backend that raises."""
raise ConnectionError("Twitch down")
def _fake_sx(keyword):
"""Fake SearX backend returning two results (keyword in title)."""
return [
{"id": "https://example.com/sx1", "title": "SX test Result 1",
"url": "https://example.com/sx1", "extra": ""},
{"id": "https://example.com/sx2", "title": "SX test Result 2",
"url": "https://example.com/sx2", "extra": ""},
]
def _fake_sx_error(keyword):
"""Fake SearX backend that raises."""
raise ConnectionError("SearX down")
# Backend tag -> working fake; patched over the plugin's _BACKENDS in tests.
_FAKE_BACKENDS = dict(yt=_fake_yt, tw=_fake_tw, sx=_fake_sx)
# ---------------------------------------------------------------------------
# TestValidateName
# ---------------------------------------------------------------------------
class TestValidateName:
    """Accepted and rejected inputs for ``_validate_name``."""

    def test_valid_simple(self):
        """A lowercase name containing a hyphen is accepted."""
        verdict = _validate_name("mc-speed")
        assert verdict is True

    def test_valid_with_numbers(self):
        """Trailing digits are accepted."""
        verdict = _validate_name("alert123")
        assert verdict is True

    def test_valid_single_char(self):
        """A one-character name is accepted."""
        verdict = _validate_name("a")
        assert verdict is True

    def test_valid_max_length(self):
        """A 20-character name is accepted."""
        name = "a" * 20
        assert _validate_name(name) is True

    def test_invalid_too_long(self):
        """A 21-character name is rejected."""
        name = "a" * 21
        assert _validate_name(name) is False

    def test_invalid_uppercase(self):
        """A name with an uppercase letter is rejected."""
        verdict = _validate_name("Alert")
        assert verdict is False

    def test_invalid_starts_with_hyphen(self):
        """A leading hyphen is rejected."""
        verdict = _validate_name("-alert")
        assert verdict is False

    def test_invalid_special_chars(self):
        """Punctuation such as ``!`` is rejected."""
        verdict = _validate_name("alert!")
        assert verdict is False

    def test_invalid_spaces(self):
        """A name containing a space is rejected."""
        verdict = _validate_name("my alert")
        assert verdict is False

    def test_invalid_empty(self):
        """The empty string is rejected."""
        verdict = _validate_name("")
        assert verdict is False
# ---------------------------------------------------------------------------
# TestTruncate
# ---------------------------------------------------------------------------
class TestTruncate:
    """Length handling of ``_truncate``."""

    def test_short_text_unchanged(self):
        """Text shorter than the limit comes back untouched."""
        shortened = _truncate("hello", 80)
        assert shortened == "hello"

    def test_exact_length_unchanged(self):
        """Text exactly at the limit comes back untouched."""
        original = "a" * 80
        assert _truncate(original, 80) == original

    def test_long_text_truncated(self):
        """Over-long text is cut to the limit and ends with an ellipsis."""
        shortened = _truncate("a" * 100, 80)
        assert len(shortened) == 80
        assert shortened.endswith("...")

    def test_default_max_length(self):
        """Without an explicit limit the result is 80 characters long."""
        shortened = _truncate("a" * 100)
        assert len(shortened) == 80

    def test_trailing_space_stripped(self):
        """No space is left directly in front of the ellipsis."""
        shortened = _truncate("word " * 20, 20)
        assert not shortened.endswith(" ...")
# ---------------------------------------------------------------------------
# TestCompactNum
# ---------------------------------------------------------------------------
class TestCompactNum:
    """Number abbreviation produced by ``_compact_num``."""

    def test_zero(self):
        """Zero is rendered as-is."""
        rendered = _compact_num(0)
        assert rendered == "0"

    def test_small(self):
        """Values below one thousand are rendered as-is."""
        rendered = _compact_num(999)
        assert rendered == "999"

    def test_one_k(self):
        """Exactly one thousand becomes ``1k``."""
        rendered = _compact_num(1000)
        assert rendered == "1k"

    def test_one_point_five_k(self):
        """A fractional thousand keeps one decimal."""
        rendered = _compact_num(1500)
        assert rendered == "1.5k"

    def test_one_m(self):
        """Exactly one million becomes ``1M``."""
        rendered = _compact_num(1000000)
        assert rendered == "1M"

    def test_two_point_five_m(self):
        """A fractional million keeps one decimal."""
        rendered = _compact_num(2500000)
        assert rendered == "2.5M"

    def test_exact_boundary(self):
        """Ten thousand is rendered without a decimal."""
        rendered = _compact_num(10000)
        assert rendered == "10k"

    def test_large_millions(self):
        """Two-digit millions keep one decimal."""
        rendered = _compact_num(12300000)
        assert rendered == "12.3M"
# ---------------------------------------------------------------------------
# TestExtractVideos
# ---------------------------------------------------------------------------
class TestExtractVideos:
    """Video extraction from InnerTube-style payloads via ``_extract_videos``."""

    def test_standard_response(self):
        """Both renderers in ``YT_RESPONSE`` are returned, first one first."""
        found = _extract_videos(YT_RESPONSE)
        assert len(found) == 2
        first, second = found
        assert first["id"] == "abc123"
        assert first["title"] == "First Video"
        assert "watch?v=abc123" in first["url"]
        assert second["id"] == "def456"

    def test_nested_response(self):
        """A video under non-standard wrapper keys is still found."""
        found = _extract_videos(YT_NESTED)
        assert len(found) == 1
        only = found[0]
        assert only["id"] == "nested1"
        assert only["title"] == "Nested"

    def test_empty_response(self):
        """An empty payload yields an empty list."""
        found = _extract_videos({})
        assert found == []

    def test_depth_limit(self):
        """Deeply nested structure stops at depth 20."""
        payload = {"videoId": "deep1", "title": {"runs": [{"text": "Deep"}]}}
        # Bury the video under 25 single-key wrapper dicts
        layers = 0
        while layers < 25:
            payload = {"child": payload}
            layers += 1
        found = _extract_videos(payload)
        assert len(found) == 0

    def test_title_as_string(self):
        """A plain-string title is used verbatim."""
        payload = {"videoId": "str1", "title": "String Title"}
        found = _extract_videos(payload)
        assert len(found) == 1
        assert found[0]["title"] == "String Title"

    def test_empty_title_skipped(self):
        """A video whose title has no runs is not returned."""
        payload = {"videoId": "empty1", "title": {"runs": []}}
        found = _extract_videos(payload)
        assert len(found) == 0

    def test_dedup_in_search_youtube(self):
        """_search_youtube deduplicates by videoId."""
        # The same videoId appears twice in the payload
        payload = {
            "a": [
                {"videoId": "dup1", "title": "Video A"},
                {"videoId": "dup1", "title": "Video A Copy"},
            ],
        }

        class FakeResp:
            def read(self):
                return json.dumps(payload).encode()

            def close(self):
                pass

        canned = FakeResp()
        with patch("urllib.request.urlopen", return_value=canned):
            hits = _search_youtube("test")
        assert len(hits) == 1
        assert hits[0]["id"] == "dup1"
# ---------------------------------------------------------------------------
# TestSearchYoutube
# ---------------------------------------------------------------------------
class TestSearchYoutube:
    """``_search_youtube`` with ``urllib.request.urlopen`` stubbed out."""

    def test_parses_response(self):
        """The canned InnerTube payload yields both videos."""

        class FakeResp:
            def read(self):
                return json.dumps(YT_RESPONSE).encode()

            def close(self):
                pass

        canned = FakeResp()
        with patch("urllib.request.urlopen", return_value=canned):
            hits = _search_youtube("test query")
        assert len(hits) == 2
        top = hits[0]
        assert top["id"] == "abc123"
        assert top["title"] == "First Video"

    def test_http_error_propagates(self):
        """An error raised by ``urlopen`` is not swallowed."""
        import pytest

        boom = ConnectionError("fail")
        with patch("urllib.request.urlopen", side_effect=boom), \
                pytest.raises(ConnectionError):
            _search_youtube("test")
# ---------------------------------------------------------------------------
# TestSearchTwitch
# ---------------------------------------------------------------------------
class TestSearchTwitch:
    """``_search_twitch`` parsing of GQL payloads, with ``_urlopen`` stubbed out."""

    @staticmethod
    def _canned(payload):
        """Return a response double whose ``read()`` yields *payload* as JSON bytes."""

        class FakeResp:
            def read(self):
                return json.dumps(payload).encode()

            def close(self):
                pass

        return FakeResp()

    def test_parses_streams_and_vods(self):
        """One stream and one VOD are returned, stream first."""
        with patch.object(_mod, "_urlopen", return_value=self._canned(GQL_RESPONSE)):
            hits = _search_twitch("minecraft")
        assert len(hits) == 2
        stream, vod = hits
        # Stream
        assert stream["id"] == "stream:111"
        assert "Streamer1 is live:" in stream["title"]
        assert "(Minecraft)" in stream["title"]
        assert stream["url"] == "https://twitch.tv/streamer1"
        # VOD
        assert vod["id"] == "vod:222"
        assert vod["title"] == "Cool VOD"
        assert "videos/222" in vod["url"]

    def test_empty_search_results(self):
        """Empty stream and video lists give an empty result."""
        payload = {"data": {"searchFor": {"streams": {"items": []}, "videos": {"items": []}}}}
        with patch.object(_mod, "_urlopen", return_value=self._canned(payload)):
            hits = _search_twitch("nothing")
        assert hits == []

    def test_bad_gql_response(self):
        """A null ``searchFor`` node gives an empty result."""
        payload = {"data": {"searchFor": None}}
        with patch.object(_mod, "_urlopen", return_value=self._canned(payload)):
            hits = _search_twitch("bad")
        assert hits == []

    def test_http_error_propagates(self):
        """An error raised by ``_urlopen`` is not swallowed."""
        import pytest

        boom = ConnectionError("fail")
        with patch.object(_mod, "_urlopen", side_effect=boom), \
                pytest.raises(ConnectionError):
            _search_twitch("test")

    def test_stream_without_game(self):
        """A stream whose game is null produces no empty ``()`` in the title."""
        stream = {
            "id": "333",
            "broadcaster": {"login": "nogame", "displayName": "NoGame"},
            "title": "Just chatting",
            "game": None,
            "viewersCount": 10,
        }
        payload = {
            "data": {
                "searchFor": {
                    "streams": {"items": [stream]},
                    "videos": {"items": []},
                },
            },
        }
        with patch.object(_mod, "_urlopen", return_value=self._canned(payload)):
            hits = _search_twitch("chat")
        assert len(hits) == 1
        assert "()" not in hits[0]["title"]
# ---------------------------------------------------------------------------
# TestStateHelpers
# ---------------------------------------------------------------------------
class TestStateHelpers:
    """Round-trips through ``_save`` / ``_load`` / ``_delete`` and key building."""

    def test_save_and_load(self):
        """What is saved under a key is what is loaded back."""
        bot = _FakeBot()
        payload = dict(keyword="test", name="t")
        _save(bot, "#ch:t", payload)
        assert _load(bot, "#ch:t") == payload

    def test_load_missing(self):
        """Loading an unknown key gives ``None``."""
        bot = _FakeBot()
        missing = _load(bot, "nonexistent")
        assert missing is None

    def test_delete(self):
        """A deleted entry can no longer be loaded."""
        bot = _FakeBot()
        key = "#ch:t"
        _save(bot, key, {"name": "t"})
        _delete(bot, key)
        assert _load(bot, key) is None

    def test_state_key(self):
        """Channel and name are joined with a colon."""
        key = _state_key("#ops", "mc-speed")
        assert key == "#ops:mc-speed"

    def test_load_invalid_json(self):
        """A stored value that is not valid JSON loads as ``None``."""
        bot = _FakeBot()
        bot.state.set("alert", "bad", "not json{{{")
        loaded = _load(bot, "bad")
        assert loaded is None
# ---------------------------------------------------------------------------
# TestCmdAlertAdd
# ---------------------------------------------------------------------------
class TestCmdAlertAdd:
    """``!alert add``: happy path, permission/argument checks, limits, seeding.

    Adding a subscription starts background seeding, so every test that waits
    for seeding keeps both ``_BACKENDS`` and ``_fetch_og_batch`` patched for
    the whole wait. Otherwise the seeding task could run against the real
    backends.
    """

    def test_add_success(self):
        """A valid add replies once, persists the sub, seeds it and starts a poller."""
        _clear()
        bot = _FakeBot(admin=True)

        async def inner():
            with (
                patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS),
                patch.object(_mod, "_fetch_og_batch", return_value={}),
            ):
                await cmd_alert(bot, _msg("!alert add mc-speed minecraft speedrun"))
                # Allow background seeding task to complete (patches must stay active)
                await asyncio.sleep(0.2)
            assert len(bot.replied) == 1
            assert "Alert 'mc-speed' added" in bot.replied[0]
            assert "minecraft speedrun" in bot.replied[0]
            data = _load(bot, "#test:mc-speed")
            assert data is not None
            assert data["name"] == "mc-speed"
            assert data["keyword"] == "minecraft speedrun"
            assert data["channel"] == "#test"
            # Seeding happens in background; verify seen lists populated
            assert len(data["seen"]["yt"]) == 2
            assert len(data["seen"]["tw"]) == 2
            assert len(data["seen"]["sx"]) == 2
            assert "#test:mc-speed" in _ps(bot)["pollers"]
            _stop_poller(bot, "#test:mc-speed")
            await asyncio.sleep(0)

        asyncio.run(inner())

    def test_add_requires_admin(self):
        """A non-admin caller is refused."""
        _clear()
        bot = _FakeBot(admin=False)
        asyncio.run(cmd_alert(bot, _msg("!alert add test keyword")))
        assert "Permission denied" in bot.replied[0]

    def test_add_requires_channel(self):
        """Adding from a private message is refused."""
        _clear()
        bot = _FakeBot(admin=True)
        asyncio.run(cmd_alert(bot, _pm("!alert add test keyword")))
        assert "Use this command in a channel" in bot.replied[0]

    def test_add_invalid_name(self):
        """A name that fails validation is refused."""
        _clear()
        bot = _FakeBot(admin=True)
        asyncio.run(cmd_alert(bot, _msg("!alert add BAD! keyword")))
        assert "Invalid name" in bot.replied[0]

    def test_add_missing_keyword(self):
        """A name without a keyword prints the usage text."""
        _clear()
        bot = _FakeBot(admin=True)
        asyncio.run(cmd_alert(bot, _msg("!alert add myname")))
        assert "Usage:" in bot.replied[0]

    def test_add_keyword_too_long(self):
        """A 101-character keyword is refused."""
        _clear()
        bot = _FakeBot(admin=True)
        long_kw = "x" * 101
        asyncio.run(cmd_alert(bot, _msg(f"!alert add test {long_kw}")))
        assert "too long" in bot.replied[0]

    def test_add_duplicate(self):
        """Adding an existing name a second time is refused."""
        _clear()
        bot = _FakeBot(admin=True)

        async def inner():
            # One patch scope covers the first add, the seeding wait and the
            # second add, so the background seeding task only sees the fakes.
            with (
                patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS),
                patch.object(_mod, "_fetch_og_batch", return_value={}),
            ):
                await cmd_alert(bot, _msg("!alert add dupe some keyword"))
                # Allow background seeding task to complete (patches must stay active)
                await asyncio.sleep(0.1)
                bot.replied.clear()
                await cmd_alert(bot, _msg("!alert add dupe other keyword"))
            assert "already exists" in bot.replied[0]
            _stop_poller(bot, "#test:dupe")
            await asyncio.sleep(0)

        asyncio.run(inner())

    def test_add_limit(self):
        """With 20 subscriptions already stored for the channel, another add is refused."""
        _clear()
        bot = _FakeBot(admin=True)
        for i in range(20):
            _save(bot, f"#test:sub{i}", {"name": f"sub{i}", "channel": "#test"})

        async def inner():
            with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS):
                await cmd_alert(bot, _msg("!alert add overflow keyword"))
            assert "limit reached" in bot.replied[0]

        asyncio.run(inner())

    def test_add_seed_error_still_creates(self):
        """If a backend fails during seeding, seen list is empty for that backend."""
        _clear()
        bot = _FakeBot(admin=True)
        backends = {"yt": _fake_yt, "tw": _fake_tw_error, "sx": _fake_sx}

        async def inner():
            with (
                patch.object(_mod, "_BACKENDS", backends),
                patch.object(_mod, "_fetch_og_batch", return_value={}),
            ):
                await cmd_alert(bot, _msg("!alert add partial test keyword"))
                # Allow background seeding task to complete (patches must stay active)
                await asyncio.sleep(0.2)
            data = _load(bot, "#test:partial")
            assert data is not None
            assert len(data["seen"]["yt"]) == 2
            assert len(data["seen"].get("tw", [])) == 0
            assert len(data["seen"]["sx"]) == 2
            _stop_poller(bot, "#test:partial")
            await asyncio.sleep(0)

        asyncio.run(inner())
# ---------------------------------------------------------------------------
# TestCmdAlertDel
# ---------------------------------------------------------------------------
class TestCmdAlertDel:
    """``!alert del``: removal plus permission and argument checks."""

    def test_del_success(self):
        """Deleting an existing sub confirms, removes stored data and drops its poller."""
        _clear()
        bot = _FakeBot(admin=True)

        async def inner():
            # Keep the fakes patched in until background seeding from the add
            # has finished, so the seeding task never sees the real backends.
            with (
                patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS),
                patch.object(_mod, "_fetch_og_batch", return_value={}),
            ):
                await cmd_alert(bot, _msg("!alert add todel some keyword"))
                # Allow background seeding task to complete (patches must stay active)
                await asyncio.sleep(0.1)
                bot.replied.clear()
                await cmd_alert(bot, _msg("!alert del todel"))
            assert "Removed 'todel'" in bot.replied[0]
            assert _load(bot, "#test:todel") is None
            assert "#test:todel" not in _ps(bot)["pollers"]
            await asyncio.sleep(0)

        asyncio.run(inner())

    def test_del_requires_admin(self):
        """A non-admin caller is refused."""
        _clear()
        bot = _FakeBot(admin=False)
        asyncio.run(cmd_alert(bot, _msg("!alert del test")))
        assert "Permission denied" in bot.replied[0]

    def test_del_requires_channel(self):
        """Deleting from a private message is refused."""
        _clear()
        bot = _FakeBot(admin=True)
        asyncio.run(cmd_alert(bot, _pm("!alert del test")))
        assert "Use this command in a channel" in bot.replied[0]

    def test_del_nonexistent(self):
        """Deleting an unknown name reports that no such alert exists."""
        _clear()
        bot = _FakeBot(admin=True)
        asyncio.run(cmd_alert(bot, _msg("!alert del nosuch")))
        assert "No alert" in bot.replied[0]

    def test_del_no_name(self):
        """Omitting the name prints the usage text."""
        _clear()
        bot = _FakeBot(admin=True)
        asyncio.run(cmd_alert(bot, _msg("!alert del")))
        assert "Usage:" in bot.replied[0]
# ---------------------------------------------------------------------------
# TestCmdAlertList
# ---------------------------------------------------------------------------
class TestCmdAlertList:
    """``!alert list`` output for empty, populated, failing and other-channel subs."""

    def test_list_empty(self):
        """With nothing stored the reply says there are no alerts."""
        _clear()
        bot = _FakeBot()
        request = _msg("!alert list")
        asyncio.run(cmd_alert(bot, request))
        assert "No alerts" in bot.replied[0]

    def test_list_populated(self):
        """Every stored sub of the channel is named in the listing."""
        _clear()
        bot = _FakeBot()
        for name, keyword in (("mc", "minecraft"), ("rl", "rocket league")):
            _save(bot, f"#test:{name}", {
                "name": name, "channel": "#test", "keyword": keyword,
                "last_error": "",
            })
        asyncio.run(cmd_alert(bot, _msg("!alert list")))
        listing = bot.replied[0]
        assert "Alerts:" in listing
        assert "mc" in listing
        assert "rl" in listing

    def test_list_shows_error(self):
        """A sub with a recorded backend error is flagged in the listing."""
        _clear()
        bot = _FakeBot()
        broken = {
            "name": "broken", "channel": "#test", "keyword": "test",
            "last_errors": {"yt": "Connection refused"},
        }
        _save(bot, "#test:broken", broken)
        asyncio.run(cmd_alert(bot, _msg("!alert list")))
        listing = bot.replied[0]
        assert "broken" in listing
        assert "backend error" in listing

    def test_list_requires_channel(self):
        """Listing from a private message is refused."""
        _clear()
        bot = _FakeBot()
        request = _pm("!alert list")
        asyncio.run(cmd_alert(bot, request))
        assert "Use this command in a channel" in bot.replied[0]

    def test_list_channel_isolation(self):
        """Subs stored for another channel do not show up."""
        _clear()
        bot = _FakeBot()
        for channel, name in (("#test", "mine"), ("#other", "theirs")):
            _save(bot, f"{channel}:{name}", {
                "name": name, "channel": channel, "keyword": "test",
                "last_error": "",
            })
        asyncio.run(cmd_alert(bot, _msg("!alert list")))
        listing = bot.replied[0]
        assert "mine" in listing
        assert "theirs" not in listing
# ---------------------------------------------------------------------------
# TestCmdAlertCheck
# ---------------------------------------------------------------------------
class TestCmdAlertCheck:
    """``!alert check``: manual poll of a single subscription."""

    @staticmethod
    def _sub(name, seen):
        """Build a ``#test`` subscription record called *name* with the given *seen* map."""
        return {
            "keyword": "test", "name": name, "channel": "#test",
            "interval": 300, "seen": seen,
            "last_poll": "", "last_error": "",
        }

    def test_check_success(self):
        """Checking a sub whose items are all seen reports that it was checked."""
        _clear()
        bot = _FakeBot()
        seen = {
            "yt": ["yt1", "yt2"], "tw": ["stream:tw1", "vod:tw2"],
            "sx": ["https://example.com/sx1", "https://example.com/sx2"],
        }
        _save(bot, "#test:chk", self._sub("chk", seen))

        async def scenario():
            with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS):
                await cmd_alert(bot, _msg("!alert check chk"))
            assert "chk: checked" in bot.replied[0]

        asyncio.run(scenario())

    def test_check_nonexistent(self):
        """Checking an unknown name reports that no such alert exists."""
        _clear()
        bot = _FakeBot()
        request = _msg("!alert check nope")
        asyncio.run(cmd_alert(bot, request))
        assert "No alert" in bot.replied[0]

    def test_check_requires_channel(self):
        """Checking from a private message is refused."""
        _clear()
        bot = _FakeBot()
        request = _pm("!alert check test")
        asyncio.run(cmd_alert(bot, request))
        assert "Use this command in a channel" in bot.replied[0]

    def test_check_shows_error(self):
        """A failing backend is mentioned in the reply."""
        _clear()
        bot = _FakeBot()
        _save(bot, "#test:errchk", self._sub("errchk", {"yt": [], "tw": [], "sx": []}))
        failing = {"yt": _fake_yt_error, "tw": _fake_tw, "sx": _fake_sx}

        async def scenario():
            with patch.object(_mod, "_BACKENDS", failing):
                await cmd_alert(bot, _msg("!alert check errchk"))
            assert "error" in bot.replied[0].lower()

        asyncio.run(scenario())

    def test_check_announces_new_items(self):
        """Only items missing from the seen lists are announced."""
        _clear()
        bot = _FakeBot()
        _save(bot, "#test:news", self._sub("news", {"yt": ["yt1"], "tw": [], "sx": []}))

        async def scenario():
            with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS):
                await cmd_alert(bot, _msg("!alert check news"))
            # yt2 is new for yt, both tw and sx results are new
            # Metadata (with backend tags) goes to action(), titles to send()
            tagged = [text for target, text in bot.actions if target == "#test"]

            def count(tag):
                return sum(1 for text in tagged if tag in text)

            assert count("/yt/") == 1  # yt2 only
            assert count("/tw/") == 2  # both tw results
            assert count("/sx/") == 2  # both sx results

        asyncio.run(scenario())

    def test_check_no_name(self):
        """Omitting the name prints the usage text."""
        _clear()
        bot = _FakeBot()
        request = _msg("!alert check")
        asyncio.run(cmd_alert(bot, request))
        assert "Usage:" in bot.replied[0]
# ---------------------------------------------------------------------------
# TestPollOnce
# ---------------------------------------------------------------------------
class TestPollOnce:
    """Behaviour of a single ``_poll_once`` pass over one subscription."""

    @staticmethod
    def _all_seen():
        """Return a fresh seen map listing every id the working fakes produce."""
        return {
            "yt": ["yt1", "yt2"], "tw": ["stream:tw1", "vod:tw2"],
            "sx": ["https://example.com/sx1", "https://example.com/sx2"],
        }

    @staticmethod
    def _register(bot, name, seen=None, last_errors=None):
        """Store a ``#test`` subscription called *name* and return its state key.

        The record is saved via ``_save`` and also placed in the bot's
        in-memory ``subs`` map. *seen* defaults to empty lists for all three
        backends. When *last_errors* is given the record carries it under
        ``"last_errors"``; otherwise it carries an empty ``"last_error"``.
        """
        record = {
            "keyword": "test", "name": name, "channel": "#test",
            "interval": 300,
            "seen": {"yt": [], "tw": [], "sx": []} if seen is None else seen,
            "last_poll": "",
        }
        if last_errors is None:
            record["last_error"] = ""
        else:
            record["last_errors"] = last_errors
        key = f"#test:{name}"
        _save(bot, key, record)
        _ps(bot)["subs"][key] = record
        return key

    def test_new_items_announced(self):
        """All six fake results are announced: titles via send, metadata via action."""
        _clear()
        bot = _FakeBot()
        key = self._register(bot, "poll")

        async def inner():
            with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS):
                await _poll_once(bot, key, announce=True)
            # Titles go to send(), metadata goes to action()
            titles = [s for t, s in bot.sent if t == "#test"]
            actions = [s for t, s in bot.actions if t == "#test"]
            assert len(titles) == 6  # 2 yt + 2 tw + 2 sx
            assert len(actions) == 6
            assert "[poll/yt/" in actions[0]
            assert "[poll/tw/" in actions[2]
            assert "[poll/sx/" in actions[4]
            # Twitch fakes have extra metadata; verify it appears in titles
            tw_titles = [s for t, s in bot.sent if t == "#test" and "TW test" in s]
            assert any("| 500 viewers" in t for t in tw_titles)
            assert any("| 1k views" in t for t in tw_titles)
            # YouTube fakes have no extra; verify no pipe suffix
            yt_titles = [s for t, s in bot.sent if t == "#test" and "YT test" in s]
            assert all("|" not in t for t in yt_titles)

        asyncio.run(inner())

    def test_dedup_no_repeat(self):
        """Nothing is sent when every result is already in the seen lists."""
        _clear()
        bot = _FakeBot()
        key = self._register(bot, "dedup", seen=self._all_seen())

        async def inner():
            with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS):
                await _poll_once(bot, key, announce=True)
            assert len(bot.sent) == 0

        asyncio.run(inner())

    def test_partial_backend_failure(self):
        """One backend fails, other still works. Error counter increments."""
        _clear()
        bot = _FakeBot()
        key = self._register(bot, "partial")
        backends = {"yt": _fake_yt_error, "tw": _fake_tw, "sx": _fake_sx}

        async def inner():
            with patch.object(_mod, "_BACKENDS", backends):
                await _poll_once(bot, key, announce=True)
            # Twitch and SearX results should still be announced
            tw_msgs = [s for t, s in bot.actions if t == "#test" and "/tw/" in s]
            sx_msgs = [s for t, s in bot.actions if t == "#test" and "/sx/" in s]
            assert len(tw_msgs) == 2
            assert len(sx_msgs) == 2
            # Error counter should be incremented for yt backend
            assert _ps(bot)["errors"][key]["yt"] == 1
            updated = _load(bot, key)
            assert "yt" in updated.get("last_errors", {})

        asyncio.run(inner())

    def test_no_announce_flag(self):
        """With ``announce=False`` nothing is sent but the seen lists are still filled."""
        _clear()
        bot = _FakeBot()
        key = self._register(bot, "quiet")

        async def inner():
            with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS):
                await _poll_once(bot, key, announce=False)
            assert len(bot.sent) == 0
            updated = _load(bot, key)
            assert len(updated["seen"]["yt"]) == 2
            assert len(updated["seen"]["tw"]) == 2
            assert len(updated["seen"]["sx"]) == 2

        asyncio.run(inner())

    def test_seen_cap(self):
        """Seen list is capped at MAX_SEEN per platform."""
        _clear()
        bot = _FakeBot()
        # Produce `overflow` more ids than the cap so the expected first
        # survivor follows from _MAX_SEEN instead of assuming its value.
        overflow = 50
        total = _MAX_SEEN + overflow

        def fake_many(keyword):
            return [
                {"id": f"v{i}", "title": f"V{i}", "url": "", "extra": ""}
                for i in range(total)
            ]

        key = self._register(bot, "cap")

        async def inner():
            with patch.object(_mod, "_BACKENDS", {"yt": fake_many, "tw": _fake_tw}):
                await _poll_once(bot, key, announce=False)
            updated = _load(bot, key)
            assert len(updated["seen"]["yt"]) == _MAX_SEEN
            # Oldest entries should have been evicted
            assert updated["seen"]["yt"][0] == f"v{overflow}"

        asyncio.run(inner())

    def test_all_backends_error(self):
        """When every backend fails each error counter is 1 and nothing is sent."""
        _clear()
        bot = _FakeBot()
        key = self._register(bot, "allerr")
        backends = {"yt": _fake_yt_error, "tw": _fake_tw_error, "sx": _fake_sx_error}

        async def inner():
            with patch.object(_mod, "_BACKENDS", backends):
                await _poll_once(bot, key, announce=True)
            assert all(v == 1 for v in _ps(bot)["errors"][key].values())
            assert len(bot.sent) == 0

        asyncio.run(inner())

    def test_success_clears_error(self):
        """A fully successful poll resets error counters and stored error text."""
        _clear()
        bot = _FakeBot()
        key = self._register(
            bot, "clrerr", seen=self._all_seen(), last_errors={"yt": "old error"},
        )
        _ps(bot)["errors"][key] = {"yt": 3, "tw": 3, "sx": 3}

        async def inner():
            with patch.object(_mod, "_BACKENDS", _FAKE_BACKENDS):
                await _poll_once(bot, key, announce=True)
            assert all(v == 0 for v in _ps(bot)["errors"][key].values())
            updated = _load(bot, key)
            assert updated.get("last_errors", {}) == {}

        asyncio.run(inner())
# ---------------------------------------------------------------------------
# TestRestore
# ---------------------------------------------------------------------------
class TestRestore:
    """Poller re-creation from stored subscriptions via ``_restore`` / ``on_connect``."""

    @staticmethod
    def _store_sub(bot, name):
        """Save an empty-seen ``#test`` subscription called *name*; return its key."""
        key = f"#test:{name}"
        _save(bot, key, {
            "keyword": "test", "name": name, "channel": "#test",
            "interval": 300, "seen": {"yt": [], "tw": [], "sx": []},
            "last_poll": "", "last_error": "",
        })
        return key

    def test_restore_spawns_pollers(self):
        """A stored sub gets a running poller task."""
        _clear()
        bot = _FakeBot()
        key = self._store_sub(bot, "restored")

        async def scenario():
            _restore(bot)
            pollers = _ps(bot)["pollers"]
            assert key in pollers
            assert not pollers[key].done()
            _stop_poller(bot, key)
            await asyncio.sleep(0)

        asyncio.run(scenario())

    def test_restore_skips_active(self):
        """A poller that is still running is left in place."""
        _clear()
        bot = _FakeBot()
        key = self._store_sub(bot, "active")

        async def scenario():
            state = _ps(bot)
            running = asyncio.create_task(asyncio.sleep(9999))
            state["pollers"][key] = running
            _restore(bot)
            assert state["pollers"][key] is running
            running.cancel()
            await asyncio.sleep(0)

        asyncio.run(scenario())

    def test_restore_replaces_done_task(self):
        """A finished poller task is swapped for a fresh, running one."""
        _clear()
        bot = _FakeBot()
        key = self._store_sub(bot, "done")

        async def scenario():
            state = _ps(bot)
            finished = asyncio.create_task(asyncio.sleep(0))
            await finished
            state["pollers"][key] = finished
            _restore(bot)
            replacement = state["pollers"][key]
            assert replacement is not finished
            assert not replacement.done()
            _stop_poller(bot, key)
            await asyncio.sleep(0)

        asyncio.run(scenario())

    def test_restore_skips_bad_json(self):
        """An entry holding invalid JSON gets no poller."""
        _clear()
        bot = _FakeBot()
        bot.state.set("alert", "#test:bad", "not json{{{")

        async def scenario():
            _restore(bot)
            assert "#test:bad" not in _ps(bot)["pollers"]

        asyncio.run(scenario())

    def test_on_connect_calls_restore(self):
        """After ``on_connect`` a stored sub has a poller."""
        _clear()
        bot = _FakeBot()
        key = self._store_sub(bot, "conn")

        async def scenario():
            welcome = _msg("", target="botname")
            await on_connect(bot, welcome)
            assert key in _ps(bot)["pollers"]
            _stop_poller(bot, key)
            await asyncio.sleep(0)

        asyncio.run(scenario())
# ---------------------------------------------------------------------------
# TestPollerManagement
# ---------------------------------------------------------------------------
class TestPollerManagement:
    """Starting and stopping poller tasks with ``_start_poller`` / ``_stop_poller``."""

    @staticmethod
    def _register(bot, name):
        """Save a ``#test`` subscription, mirror it into ``subs`` and return its key."""
        record = {
            "keyword": "test", "name": name, "channel": "#test",
            "interval": 300, "seen": {"yt": [], "tw": [], "sx": []},
            "last_poll": "", "last_error": "",
        }
        key = f"#test:{name}"
        _save(bot, key, record)
        _ps(bot)["subs"][key] = record
        return key

    def test_start_and_stop(self):
        """Starting adds a live task; stopping removes both the task and the sub."""
        _clear()
        bot = _FakeBot()
        key = self._register(bot, "mgmt")

        async def scenario():
            state = _ps(bot)
            _start_poller(bot, key)
            assert key in state["pollers"]
            assert not state["pollers"][key].done()
            _stop_poller(bot, key)
            await asyncio.sleep(0)
            assert key not in state["pollers"]
            assert key not in state["subs"]

        asyncio.run(scenario())

    def test_start_idempotent(self):
        """Starting twice keeps the original task object."""
        _clear()
        bot = _FakeBot()
        key = self._register(bot, "idem")

        async def scenario():
            state = _ps(bot)
            _start_poller(bot, key)
            original = state["pollers"][key]
            _start_poller(bot, key)
            assert state["pollers"][key] is original
            _stop_poller(bot, key)
            await asyncio.sleep(0)

        asyncio.run(scenario())

    def test_stop_nonexistent(self):
        """Stopping an unknown key does not raise."""
        bot = _FakeBot()
        _stop_poller(bot, "#test:nonexistent")
# ---------------------------------------------------------------------------
# TestCmdAlertUsage
# ---------------------------------------------------------------------------
class TestCmdAlertUsage:
    """`cmd_alert` should answer with usage text for unusable input."""

    @staticmethod
    def _replies_for(text):
        """Run `cmd_alert` on *text* with a fresh bot; return its replies."""
        _clear()
        bot = _FakeBot()
        asyncio.run(cmd_alert(bot, _msg(text)))
        return bot.replied

    def test_no_args(self):
        """A bare `!alert` produces the usage line as the first reply."""
        first_reply = self._replies_for("!alert")[0]
        assert "Usage:" in first_reply

    def test_unknown_subcommand(self):
        """An unrecognised subcommand produces the usage line as well."""
        first_reply = self._replies_for("!alert foobar")[0]
        assert "Usage:" in first_reply
# ---------------------------------------------------------------------------
# TestSearchSearx
# ---------------------------------------------------------------------------
class TestSearchSearx:
    """Tests for `_search_searx` with `urllib.request.urlopen` patched out."""

    @staticmethod
    def _fake_response(payload):
        """Build a stand-in response whose `read()` yields *payload* as JSON."""

        class _CannedResp:
            def read(self):
                # Serialise on every read, like a fresh HTTP body each time.
                return json.dumps(payload).encode()

            def close(self):
                pass

        return _CannedResp()

    def test_parses_response(self):
        """Result fields are mapped from the canned SearX payload."""
        canned = self._fake_response(SEARX_RESPONSE)
        with patch("urllib.request.urlopen", return_value=canned):
            hits = _search_searx("test query")

        # Same response served for all categories; deduped by URL
        assert len(hits) == 3
        first = hits[0]
        assert first["id"] == "https://example.com/sx1"
        assert first["title"] == "SearX Result 1"
        assert first["url"] == "https://example.com/sx1"
        assert first["extra"] == ""

    def test_empty_results(self):
        """A payload with an empty result list yields an empty list."""
        canned = self._fake_response({"results": []})
        with patch("urllib.request.urlopen", return_value=canned):
            hits = _search_searx("nothing")

        assert hits == []

    def test_http_error_returns_empty(self):
        """SearXNG catches per-category errors; all failing returns empty."""
        failure = ConnectionError("fail")
        with patch("urllib.request.urlopen", side_effect=failure):
            hits = _search_searx("test")

        assert hits == []
# ---------------------------------------------------------------------------
# TestExtraInHistory
# ---------------------------------------------------------------------------
class TestExtraInHistory:
    """Checks how `!alert history` renders a result's `extra` field."""

    def test_history_shows_extra(self):
        """A non-empty extra appears in the history output after a pipe."""
        _clear()
        bot = _FakeBot()
        subscription = {
            "keyword": "test",
            "name": "hist",
            "channel": "#test",
            "interval": 300,
            "seen": {},
            "last_poll": "",
            "last_error": "",
        }
        _save(bot, "#test:hist", subscription)

        # Store one result that carries extra metadata.
        result = {
            "id": "h1",
            "title": "Cool HN Post",
            "url": "https://hn.example.com/1",
            "date": "2026-01-15",
            "extra": "42pt 10c",
        }
        _save_result(bot, "#test", "hist", "hn", result)

        async def scenario():
            await cmd_alert(bot, _msg("!alert history hist"))
            assert len(bot.replied) >= 1
            has_extra = any("| 42pt 10c" in line for line in bot.replied)
            assert has_extra, f"Expected extra in history, got: {bot.replied}"

        asyncio.run(scenario())

    def test_history_no_extra(self):
        """With an empty extra, no reply line contains a pipe."""
        _clear()
        bot = _FakeBot()
        subscription = {
            "keyword": "test",
            "name": "hist2",
            "channel": "#test",
            "interval": 300,
            "seen": {},
            "last_poll": "",
            "last_error": "",
        }
        _save(bot, "#test:hist2", subscription)

        result = {
            "id": "y1",
            "title": "Plain Video",
            "url": "https://yt.example.com/1",
            "date": "",
            "extra": "",
        }
        _save_result(bot, "#test", "hist2", "yt", result)

        async def scenario():
            await cmd_alert(bot, _msg("!alert history hist2"))
            assert len(bot.replied) >= 1
            assert all("|" not in line for line in bot.replied)

        asyncio.run(scenario())
# ---------------------------------------------------------------------------
# TestExtraInInfo
# ---------------------------------------------------------------------------
class TestExtraInInfo:
    """Checks how `!alert info` renders a result's `extra` field."""

    def test_info_shows_extra(self):
        """A non-empty extra appears in the first info reply after a pipe."""
        _clear()
        bot = _FakeBot()
        subscription = {
            "keyword": "test",
            "name": "inf",
            "channel": "#test",
            "interval": 300,
            "seen": {},
            "last_poll": "",
            "last_error": "",
        }
        _save(bot, "#test:inf", subscription)

        result = {
            "id": "g1",
            "title": "cool/repo: A cool project",
            "url": "https://github.com/cool/repo",
            "date": "2026-01-10",
            "extra": "42* 5fk",
        }
        # The value returned by _save_result is what `!alert info` is given.
        short_id = _save_result(bot, "#test", "inf", "gh", result)

        async def scenario():
            await cmd_alert(bot, _msg(f"!alert info {short_id}"))
            assert len(bot.replied) >= 1
            assert "| 42* 5fk" in bot.replied[0]

        asyncio.run(scenario())

    def test_info_no_extra(self):
        """With an empty extra, the first info reply contains no pipe."""
        _clear()
        bot = _FakeBot()
        subscription = {
            "keyword": "test",
            "name": "inf2",
            "channel": "#test",
            "interval": 300,
            "seen": {},
            "last_poll": "",
            "last_error": "",
        }
        _save(bot, "#test:inf2", subscription)

        result = {
            "id": "y2",
            "title": "Some Video",
            "url": "https://youtube.com/watch?v=y2",
            "date": "",
            "extra": "",
        }
        short_id = _save_result(bot, "#test", "inf2", "yt", result)

        async def scenario():
            await cmd_alert(bot, _msg(f"!alert info {short_id}"))
            assert len(bot.replied) >= 1
            assert "|" not in bot.replied[0]

        asyncio.run(scenario())