"""Tests for the Telegram adapter.""" import asyncio from unittest.mock import MagicMock, patch from derp.plugin import PluginRegistry from derp.telegram import ( _MAX_MSG_LEN, TelegramBot, TelegramMessage, _build_telegram_message, _split_message, _strip_bot_suffix, ) # -- Helpers ----------------------------------------------------------------- def _make_bot(token="test:token", admins=None, operators=None, trusted=None, prefix=None): """Create a TelegramBot with test config.""" config = { "telegram": { "enabled": True, "bot_token": token, "poll_timeout": 1, "admins": admins or [], "operators": operators or [], "trusted": trusted or [], }, "bot": { "prefix": prefix or "!", "paste_threshold": 4, "plugins_dir": "plugins", "rate_limit": 2.0, "rate_burst": 5, }, } registry = PluginRegistry() bot = TelegramBot("tg-test", config, registry) bot.nick = "TestBot" bot._bot_username = "testbot" return bot def _update(text="!ping", nick="Alice", user_id=123, chat_id=-456, chat_type="group", username="alice"): """Build a minimal Telegram Update dict.""" return { "update_id": 1000, "message": { "message_id": 1, "from": { "id": user_id, "first_name": nick, "username": username, }, "chat": { "id": chat_id, "type": chat_type, }, "text": text, }, } def _tg_msg(text="!ping", nick="Alice", user_id="123", target="-456", is_channel=True): """Create a TelegramMessage for command testing.""" return TelegramMessage( raw={}, nick=nick, prefix=user_id, text=text, target=target, is_channel=is_channel, params=[target, text], ) # -- Test helpers for registering commands ----------------------------------- async def _echo_handler(bot, msg): """Simple command handler that echoes text.""" args = msg.text.split(None, 1) reply = args[1] if len(args) > 1 else "no args" await bot.reply(msg, reply) async def _admin_handler(bot, msg): """Admin-only command handler.""" await bot.reply(msg, "admin action done") # --------------------------------------------------------------------------- # TestTelegramMessage # --------------------------------------------------------------------------- class TestTelegramMessage: def test_defaults(self): msg = TelegramMessage(raw={}, nick=None, prefix=None, text=None, target=None) assert msg.is_channel is True assert msg.command == "PRIVMSG" assert msg.params == [] assert msg.tags == {} def test_custom_values(self): msg = TelegramMessage( raw={"update_id": 1}, nick="Alice", prefix="123", text="hello", target="-456", is_channel=True, command="PRIVMSG", params=["-456", "hello"], tags={"key": "val"}, ) assert msg.nick == "Alice" assert msg.prefix == "123" assert msg.text == "hello" assert msg.target == "-456" assert msg.tags == {"key": "val"} def test_duck_type_compat(self): """TelegramMessage has the same attribute names as IRC Message.""" msg = _tg_msg() attrs = ["raw", "nick", "prefix", "text", "target", "is_channel", "command", "params", "tags"] for attr in attrs: assert hasattr(msg, attr), f"missing attribute: {attr}" def test_dm_message(self): msg = _tg_msg(is_channel=False) assert msg.is_channel is False def test_prefix_is_user_id(self): msg = _tg_msg(user_id="999888777") assert msg.prefix == "999888777" # --------------------------------------------------------------------------- # TestBuildTelegramMessage # --------------------------------------------------------------------------- class TestBuildTelegramMessage: def test_group_message(self): update = _update(text="!ping", chat_type="group") msg = _build_telegram_message(update, "testbot") assert msg is not None assert msg.nick == "Alice" assert msg.prefix == "123" assert msg.text == "!ping" assert msg.target == "-456" assert msg.is_channel is True def test_dm_message(self): update = _update(chat_type="private", chat_id=789) msg = _build_telegram_message(update, "testbot") assert msg is not None assert msg.is_channel is False assert msg.target == "789" def test_supergroup_message(self): update = _update(chat_type="supergroup") msg = _build_telegram_message(update, "testbot") assert msg is not None assert msg.is_channel is True def test_missing_from(self): update = {"update_id": 1, "message": { "message_id": 1, "chat": {"id": -456, "type": "group"}, "text": "hello", }} msg = _build_telegram_message(update, "testbot") assert msg is not None assert msg.nick is None assert msg.prefix is None def test_missing_text(self): update = {"update_id": 1, "message": { "message_id": 1, "from": {"id": 123, "first_name": "Alice"}, "chat": {"id": -456, "type": "group"}, }} msg = _build_telegram_message(update, "testbot") assert msg is not None assert msg.text == "" def test_no_message(self): update = {"update_id": 1} msg = _build_telegram_message(update, "testbot") assert msg is None def test_strips_bot_suffix(self): update = _update(text="!help@testbot") msg = _build_telegram_message(update, "testbot") assert msg is not None assert msg.text == "!help" def test_edited_message(self): update = { "update_id": 1, "edited_message": { "message_id": 1, "from": {"id": 123, "first_name": "Alice"}, "chat": {"id": -456, "type": "group"}, "text": "!ping", }, } msg = _build_telegram_message(update, "testbot") assert msg is not None assert msg.text == "!ping" def test_raw_preserved(self): update = _update() msg = _build_telegram_message(update, "testbot") assert msg.raw is update def test_username_fallback_for_nick(self): update = _update() # Remove first_name, keep username update["message"]["from"] = {"id": 123, "username": "alice_u"} msg = _build_telegram_message(update, "testbot") assert msg.nick == "alice_u" # --------------------------------------------------------------------------- # TestStripBotSuffix # --------------------------------------------------------------------------- class TestStripBotSuffix: def test_strip_command(self): assert _strip_bot_suffix("!help@mybot", "mybot") == "!help" def test_strip_with_args(self): assert _strip_bot_suffix("!echo@mybot hello", "mybot") == "!echo hello" def test_no_suffix(self): assert _strip_bot_suffix("!help", "mybot") == "!help" def test_case_insensitive(self): assert _strip_bot_suffix("!help@MyBot", "mybot") == "!help" def test_empty_username(self): assert _strip_bot_suffix("!help@bot", "") == "!help@bot" def test_plain_text(self): assert _strip_bot_suffix("hello world", "mybot") == "hello world" # --------------------------------------------------------------------------- # TestTelegramBotReply # --------------------------------------------------------------------------- class TestTelegramBotReply: def test_send_calls_api(self): bot = _make_bot() with patch.object(bot, "_api_call", return_value={"ok": True}): asyncio.run(bot.send("-456", "hello")) bot._api_call.assert_called_once_with( "sendMessage", {"chat_id": "-456", "text": "hello"}) def test_reply_sends_to_target(self): bot = _make_bot() msg = _tg_msg(target="-456") sent: list[tuple[str, str]] = [] async def _fake_send(target, text): sent.append((target, text)) with patch.object(bot, "send", side_effect=_fake_send): asyncio.run(bot.reply(msg, "pong")) assert sent == [("-456", "pong")] def test_reply_no_target(self): bot = _make_bot() msg = _tg_msg(target=None) msg.target = None with patch.object(bot, "send") as mock_send: asyncio.run(bot.reply(msg, "pong")) mock_send.assert_not_called() def test_long_reply_under_threshold(self): bot = _make_bot() msg = _tg_msg() sent: list[str] = [] async def _fake_send(target, text): sent.append(text) with patch.object(bot, "send", side_effect=_fake_send): asyncio.run(bot.long_reply(msg, ["a", "b", "c"])) assert sent == ["a", "b", "c"] def test_long_reply_over_threshold_no_paste(self): bot = _make_bot() msg = _tg_msg() sent: list[str] = [] async def _fake_send(target, text): sent.append(text) with patch.object(bot, "send", side_effect=_fake_send): asyncio.run(bot.long_reply(msg, ["a", "b", "c", "d", "e"])) assert sent == ["a", "b", "c", "d", "e"] def test_long_reply_empty(self): bot = _make_bot() msg = _tg_msg() with patch.object(bot, "send") as mock_send: asyncio.run(bot.long_reply(msg, [])) mock_send.assert_not_called() def test_action_format(self): bot = _make_bot() sent: list[tuple[str, str]] = [] async def _fake_send(target, text): sent.append((target, text)) with patch.object(bot, "send", side_effect=_fake_send): asyncio.run(bot.action("-456", "does a thing")) assert sent == [("-456", "_does a thing_")] # --------------------------------------------------------------------------- # TestTelegramBotDispatch # --------------------------------------------------------------------------- class TestTelegramBotDispatch: def test_dispatch_known_command(self): bot = _make_bot() bot.registry.register_command( "echo", _echo_handler, help="echo", plugin="test") msg = _tg_msg(text="!echo world") sent: list[str] = [] async def _fake_send(target, text): sent.append(text) with patch.object(bot, "send", side_effect=_fake_send): asyncio.run(bot._dispatch_command(msg)) assert sent == ["world"] def test_dispatch_unknown_command(self): bot = _make_bot() msg = _tg_msg(text="!nonexistent") with patch.object(bot, "send") as mock_send: asyncio.run(bot._dispatch_command(msg)) mock_send.assert_not_called() def test_dispatch_no_prefix(self): bot = _make_bot() msg = _tg_msg(text="just a message") with patch.object(bot, "send") as mock_send: asyncio.run(bot._dispatch_command(msg)) mock_send.assert_not_called() def test_dispatch_empty_text(self): bot = _make_bot() msg = _tg_msg(text="") with patch.object(bot, "send") as mock_send: asyncio.run(bot._dispatch_command(msg)) mock_send.assert_not_called() def test_dispatch_none_text(self): bot = _make_bot() msg = _tg_msg() msg.text = None with patch.object(bot, "send") as mock_send: asyncio.run(bot._dispatch_command(msg)) mock_send.assert_not_called() def test_dispatch_ambiguous(self): bot = _make_bot() bot.registry.register_command("ping", _echo_handler, plugin="test") bot.registry.register_command("plugins", _echo_handler, plugin="test") msg = _tg_msg(text="!p") sent: list[str] = [] async def _fake_send(target, text): sent.append(text) with patch.object(bot, "send", side_effect=_fake_send): asyncio.run(bot._dispatch_command(msg)) assert len(sent) == 1 assert "Ambiguous" in sent[0] def test_dispatch_tier_denied(self): bot = _make_bot() bot.registry.register_command( "secret", _admin_handler, plugin="test", tier="admin") msg = _tg_msg(text="!secret", user_id="999") sent: list[str] = [] async def _fake_send(target, text): sent.append(text) with patch.object(bot, "send", side_effect=_fake_send): asyncio.run(bot._dispatch_command(msg)) assert len(sent) == 1 assert "Permission denied" in sent[0] def test_dispatch_tier_allowed(self): bot = _make_bot(admins=[123]) bot.registry.register_command( "secret", _admin_handler, plugin="test", tier="admin") msg = _tg_msg(text="!secret", user_id="123") sent: list[str] = [] async def _fake_send(target, text): sent.append(text) with patch.object(bot, "send", side_effect=_fake_send): asyncio.run(bot._dispatch_command(msg)) assert sent == ["admin action done"] def test_dispatch_prefix_match(self): bot = _make_bot() bot.registry.register_command("echo", _echo_handler, plugin="test") msg = _tg_msg(text="!ec hello") sent: list[str] = [] async def _fake_send(target, text): sent.append(text) with patch.object(bot, "send", side_effect=_fake_send): asyncio.run(bot._dispatch_command(msg)) assert sent == ["hello"] # --------------------------------------------------------------------------- # TestTelegramBotTier # --------------------------------------------------------------------------- class TestTelegramBotTier: def test_admin_tier(self): bot = _make_bot(admins=[111]) msg = _tg_msg(user_id="111") assert bot._get_tier(msg) == "admin" def test_oper_tier(self): bot = _make_bot(operators=[222]) msg = _tg_msg(user_id="222") assert bot._get_tier(msg) == "oper" def test_trusted_tier(self): bot = _make_bot(trusted=[333]) msg = _tg_msg(user_id="333") assert bot._get_tier(msg) == "trusted" def test_user_tier_default(self): bot = _make_bot() msg = _tg_msg(user_id="999") assert bot._get_tier(msg) == "user" def test_no_prefix(self): bot = _make_bot(admins=[111]) msg = _tg_msg() msg.prefix = None assert bot._get_tier(msg) == "user" def test_is_admin_true(self): bot = _make_bot(admins=[111]) msg = _tg_msg(user_id="111") assert bot._is_admin(msg) is True def test_is_admin_false(self): bot = _make_bot() msg = _tg_msg(user_id="999") assert bot._is_admin(msg) is False def test_priority_order(self): """Admin takes priority over oper and trusted.""" bot = _make_bot(admins=[111], operators=[111], trusted=[111]) msg = _tg_msg(user_id="111") assert bot._get_tier(msg) == "admin" # --------------------------------------------------------------------------- # TestTelegramBotNoOps # --------------------------------------------------------------------------- class TestTelegramBotNoOps: def test_join_noop(self): bot = _make_bot() asyncio.run(bot.join("#channel")) def test_part_noop(self): bot = _make_bot() asyncio.run(bot.part("#channel", "reason")) def test_kick_noop(self): bot = _make_bot() asyncio.run(bot.kick("#channel", "nick", "reason")) def test_mode_noop(self): bot = _make_bot() asyncio.run(bot.mode("#channel", "+o", "nick")) def test_set_topic_noop(self): bot = _make_bot() asyncio.run(bot.set_topic("#channel", "new topic")) def test_quit_stops(self): bot = _make_bot() bot._running = True asyncio.run(bot.quit()) assert bot._running is False # --------------------------------------------------------------------------- # TestTelegramBotPoll # --------------------------------------------------------------------------- class TestTelegramBotPoll: def test_poll_updates_parses(self): bot = _make_bot() result = { "ok": True, "result": [ {"update_id": 100, "message": { "message_id": 1, "from": {"id": 123, "first_name": "Alice"}, "chat": {"id": -456, "type": "group"}, "text": "hello", }}, ], } with patch.object(bot, "_api_call", return_value=result): updates = bot._poll_updates() assert len(updates) == 1 assert bot._offset == 101 def test_poll_updates_empty(self): bot = _make_bot() with patch.object(bot, "_api_call", return_value={"ok": True, "result": []}): updates = bot._poll_updates() assert updates == [] assert bot._offset == 0 def test_poll_updates_failed(self): bot = _make_bot() with patch.object(bot, "_api_call", return_value={"ok": False, "description": "err"}): updates = bot._poll_updates() assert updates == [] def test_offset_advances(self): bot = _make_bot() result = { "ok": True, "result": [ {"update_id": 50, "message": { "message_id": 1, "from": {"id": 1, "first_name": "A"}, "chat": {"id": -1, "type": "group"}, "text": "a", }}, {"update_id": 51, "message": { "message_id": 2, "from": {"id": 2, "first_name": "B"}, "chat": {"id": -2, "type": "group"}, "text": "b", }}, ], } with patch.object(bot, "_api_call", return_value=result): bot._poll_updates() assert bot._offset == 52 def test_start_getme_failure(self): config = { "telegram": { "enabled": True, "bot_token": "t", "poll_timeout": 1, "admins": [], "operators": [], "trusted": [], }, "bot": {"prefix": "!", "rate_limit": 2.0, "rate_burst": 5}, } bot = TelegramBot("tg-test", config, PluginRegistry()) with patch.object(bot, "_api_call", return_value={"ok": False}): asyncio.run(bot.start()) assert bot.nick == "" def test_start_getme_exception(self): config = { "telegram": { "enabled": True, "bot_token": "t", "poll_timeout": 1, "admins": [], "operators": [], "trusted": [], }, "bot": {"prefix": "!", "rate_limit": 2.0, "rate_burst": 5}, } bot = TelegramBot("tg-test", config, PluginRegistry()) with patch.object(bot, "_api_call", side_effect=Exception("network")): asyncio.run(bot.start()) assert bot.nick == "" # --------------------------------------------------------------------------- # TestTelegramApiCall # --------------------------------------------------------------------------- class TestTelegramApiCall: def test_api_url(self): bot = _make_bot(token="123:ABC") url = bot._api_url("getMe") assert url == "https://api.telegram.org/bot123:ABC/getMe" def test_api_call_get(self): bot = _make_bot() mock_resp = MagicMock() mock_resp.read.return_value = b'{"ok": true, "result": {}}' with patch("derp.telegram.http.urlopen", return_value=mock_resp): result = bot._api_call("getMe") assert result["ok"] is True def test_api_call_post(self): bot = _make_bot() mock_resp = MagicMock() mock_resp.read.return_value = b'{"ok": true, "result": {}}' with patch("derp.telegram.http.urlopen", return_value=mock_resp): result = bot._api_call("sendMessage", {"chat_id": "1", "text": "hi"}) assert result["ok"] is True def test_split_long_message(self): # Build a message that exceeds 4096 bytes lines = [f"line {i}: {'x' * 100}" for i in range(50)] text = "\n".join(lines) chunks = _split_message(text) assert len(chunks) > 1 for chunk in chunks: assert len(chunk.encode("utf-8")) <= _MAX_MSG_LEN def test_short_message_no_split(self): chunks = _split_message("hello world") assert chunks == ["hello world"] def test_send_splits_long_text(self): bot = _make_bot() lines = [f"line {i}: {'x' * 100}" for i in range(50)] text = "\n".join(lines) calls: list[dict] = [] def _fake_api_call(method, payload=None): if method == "sendMessage" and payload: calls.append(payload) return {"ok": True} with patch.object(bot, "_api_call", side_effect=_fake_api_call): asyncio.run(bot.send("-456", text)) assert len(calls) > 1 for call in calls: assert len(call["text"].encode("utf-8")) <= _MAX_MSG_LEN # --------------------------------------------------------------------------- # TestPluginManagement # --------------------------------------------------------------------------- class TestPluginManagement: def test_load_plugin_not_found(self): bot = _make_bot() ok, msg = bot.load_plugin("nonexistent_xyz") assert ok is False assert "not found" in msg def test_load_plugin_already_loaded(self): bot = _make_bot() bot.registry._modules["test"] = object() ok, msg = bot.load_plugin("test") assert ok is False assert "already loaded" in msg def test_unload_core_refused(self): bot = _make_bot() ok, msg = bot.unload_plugin("core") assert ok is False assert "cannot unload core" in msg def test_unload_not_loaded(self): bot = _make_bot() ok, msg = bot.unload_plugin("nonexistent") assert ok is False assert "not loaded" in msg def test_reload_delegates(self): bot = _make_bot() ok, msg = bot.reload_plugin("nonexistent") assert ok is False assert "not loaded" in msg # --------------------------------------------------------------------------- # TestSplitMessage # --------------------------------------------------------------------------- class TestSplitMessage: def test_short_text(self): assert _split_message("hi") == ["hi"] def test_exact_boundary(self): text = "a" * 4096 result = _split_message(text) assert len(result) == 1 def test_multi_line_split(self): lines = ["line " + str(i) for i in range(1000)] text = "\n".join(lines) chunks = _split_message(text) assert len(chunks) > 1 reassembled = "\n".join(chunks) assert reassembled == text def test_empty_text(self): assert _split_message("") == [""] # --------------------------------------------------------------------------- # TestTelegramBotConfig # --------------------------------------------------------------------------- class TestTelegramBotConfig: def test_prefix_from_telegram_section(self): config = { "telegram": { "enabled": True, "bot_token": "t", "poll_timeout": 1, "prefix": "/", "admins": [], "operators": [], "trusted": [], }, "bot": {"prefix": "!", "rate_limit": 2.0, "rate_burst": 5}, } bot = TelegramBot("test", config, PluginRegistry()) assert bot.prefix == "/" def test_prefix_falls_back_to_bot(self): config = { "telegram": { "enabled": True, "bot_token": "t", "poll_timeout": 1, "admins": [], "operators": [], "trusted": [], }, "bot": {"prefix": "!", "rate_limit": 2.0, "rate_burst": 5}, } bot = TelegramBot("test", config, PluginRegistry()) assert bot.prefix == "!" def test_admins_coerced_to_str(self): bot = _make_bot(admins=[111, 222]) assert bot._admins == ["111", "222"] def test_proxy_default_true(self): bot = _make_bot() assert bot._proxy is True def test_proxy_disabled(self): config = { "telegram": { "enabled": True, "bot_token": "t", "poll_timeout": 1, "proxy": False, "admins": [], "operators": [], "trusted": [], }, "bot": {"prefix": "!", "rate_limit": 2.0, "rate_burst": 5}, } bot = TelegramBot("test", config, PluginRegistry()) assert bot._proxy is False