From 3bcba8b0a94199ed3af4ca98aca3cd01d8c99b4e Mon Sep 17 00:00:00 2001 From: user Date: Sat, 21 Feb 2026 20:06:25 +0100 Subject: [PATCH] feat: add Telegram bot support via long-polling TelegramBot adapter with getUpdates long-polling, all HTTP through SOCKS5 proxy. Duck-typed TelegramMessage compatible with IRC Message. Message splitting at 4096 chars, @botusername suffix stripping, permission tiers via user IDs. 75 test cases. --- src/derp/cli.py | 7 + src/derp/config.py | 8 + src/derp/telegram.py | 488 ++++++++++++++++++++++++++ tests/test_telegram.py | 766 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 1269 insertions(+) create mode 100644 src/derp/telegram.py create mode 100644 tests/test_telegram.py diff --git a/src/derp/cli.py b/src/derp/cli.py index 9d25736..c1f0d4c 100644 --- a/src/derp/cli.py +++ b/src/derp/cli.py @@ -141,6 +141,13 @@ def main(argv: list[str] | None = None) -> int: teams_bot = TeamsBot("teams", config, registry) bots.append(teams_bot) + # Telegram adapter (optional) + if config.get("telegram", {}).get("enabled"): + from derp.telegram import TelegramBot + + tg_bot = TelegramBot("telegram", config, registry) + bots.append(tg_bot) + names = ", ".join(b.name for b in bots) log.info("servers: %s", names) diff --git a/src/derp/config.py b/src/derp/config.py index 8dc4ded..c4d40a0 100644 --- a/src/derp/config.py +++ b/src/derp/config.py @@ -50,6 +50,14 @@ DEFAULTS: dict = { "operators": [], "trusted": [], }, + "telegram": { + "enabled": False, + "bot_token": "", + "poll_timeout": 30, + "admins": [], + "operators": [], + "trusted": [], + }, "logging": { "level": "info", "format": "text", diff --git a/src/derp/telegram.py b/src/derp/telegram.py new file mode 100644 index 0000000..128c935 --- /dev/null +++ b/src/derp/telegram.py @@ -0,0 +1,488 @@ +"""Telegram adapter: long-polling via getUpdates, all HTTP through SOCKS5.""" + +from __future__ import annotations + +import asyncio +import json +import logging +import time +import urllib.request +from dataclasses import dataclass, field +from pathlib import Path + +from derp import http +from derp.bot import _TokenBucket +from derp.plugin import TIERS, PluginRegistry +from derp.state import StateStore + +log = logging.getLogger(__name__) + +_API_BASE = "https://api.telegram.org/bot" +_MAX_MSG_LEN = 4096 +_AMBIGUOUS = object() # sentinel for ambiguous prefix matches + + +@dataclass(slots=True) +class TelegramMessage: + """Parsed Telegram update, duck-typed with IRC Message. + + Plugins that use only ``msg.nick``, ``msg.text``, ``msg.target``, + ``msg.is_channel``, ``msg.prefix``, ``msg.command``, ``msg.params``, + and ``msg.tags`` work without modification. + """ + + raw: dict # original Telegram Update + nick: str | None # first_name (or username fallback) + prefix: str | None # user_id as string (for ACL) + text: str | None # message text + target: str | None # chat_id as string + is_channel: bool = True # True for groups, False for DMs + command: str = "PRIVMSG" # compat shim + params: list[str] = field(default_factory=list) + tags: dict[str, str] = field(default_factory=dict) + + +# -- Helpers ----------------------------------------------------------------- + + +def _strip_bot_suffix(text: str, bot_username: str) -> str: + """Strip ``@botusername`` suffix from command text. + + ``!help@mybot`` -> ``!help`` + """ + if not bot_username: + return text + suffix = f"@{bot_username}" + if " " in text: + first, rest = text.split(" ", 1) + if first.lower().endswith(suffix.lower()): + return first[: -len(suffix)] + " " + rest + return text + if text.lower().endswith(suffix.lower()): + return text[: -len(suffix)] + return text + + +def _build_telegram_message( + update: dict, bot_username: str, +) -> TelegramMessage | None: + """Build a TelegramMessage from a Telegram Update dict. + + Returns None if the update has no usable message. + """ + msg = update.get("message") or update.get("edited_message") + if not msg or not isinstance(msg, dict): + return None + + sender = msg.get("from", {}) + chat = msg.get("chat", {}) + + nick = sender.get("first_name") or sender.get("username") + user_id = sender.get("id") + prefix = str(user_id) if user_id is not None else None + + raw_text = msg.get("text", "") + text = _strip_bot_suffix(raw_text, bot_username) if raw_text else raw_text + + chat_id = chat.get("id") + target = str(chat_id) if chat_id is not None else None + + chat_type = chat.get("type", "private") + is_channel = chat_type in ("group", "supergroup", "channel") + + return TelegramMessage( + raw=update, + nick=nick, + prefix=prefix, + text=text, + target=target, + is_channel=is_channel, + params=[target or "", text] if target else [text], + ) + + +def _split_message(text: str, max_len: int = _MAX_MSG_LEN) -> list[str]: + """Split text at line boundaries to fit within max_len.""" + if len(text.encode("utf-8")) <= max_len: + return [text] + + chunks: list[str] = [] + current: list[str] = [] + current_len = 0 + + for line in text.split("\n"): + line_len = len(line.encode("utf-8")) + 1 # +1 for newline + if current and current_len + line_len > max_len: + chunks.append("\n".join(current)) + current = [] + current_len = 0 + current.append(line) + current_len += line_len + + if current: + chunks.append("\n".join(current)) + return chunks + + +# -- TelegramBot ------------------------------------------------------------- + + +class TelegramBot: + """Telegram bot adapter via long-polling (getUpdates). + + Exposes the same public API as :class:`derp.bot.Bot` so that + protocol-agnostic plugins work without modification. + All HTTP goes through ``derp.http.urlopen`` (SOCKS5 proxy). + """ + + def __init__(self, name: str, config: dict, registry: PluginRegistry) -> None: + self.name = name + self.config = config + self.registry = registry + self._pstate: dict = {} + + tg_cfg = config.get("telegram", {}) + self._token: str = tg_cfg.get("bot_token", "") + self._poll_timeout: int = tg_cfg.get("poll_timeout", 30) + self.nick: str = "" # set by getMe + self._bot_username: str = "" # set by getMe + self.prefix: str = ( + tg_cfg.get("prefix") + or config.get("bot", {}).get("prefix", "!") + ) + self._running = False + self._started: float = time.monotonic() + self._tasks: set[asyncio.Task] = set() + self._admins: list[str] = [str(x) for x in tg_cfg.get("admins", [])] + self._operators: list[str] = [str(x) for x in tg_cfg.get("operators", [])] + self._trusted: list[str] = [str(x) for x in tg_cfg.get("trusted", [])] + self.state = StateStore(f"data/state-{name}.db") + self._offset: int = 0 + + rate_cfg = config.get("bot", {}) + self._bucket = _TokenBucket( + rate=rate_cfg.get("rate_limit", 2.0), + burst=rate_cfg.get("rate_burst", 5), + ) + + # -- Telegram API -------------------------------------------------------- + + def _api_url(self, method: str) -> str: + """Build Telegram Bot API URL.""" + return f"{_API_BASE}{self._token}/{method}" + + def _api_call(self, method: str, payload: dict | None = None) -> dict: + """Synchronous Telegram API call through SOCKS5 proxy. + + Meant to run in a thread executor. + """ + url = self._api_url(method) + if payload: + data = json.dumps(payload).encode("utf-8") + req = urllib.request.Request( + url, data=data, + headers={"Content-Type": "application/json"}, + method="POST", + ) + else: + req = urllib.request.Request(url, method="GET") + + timeout = self._poll_timeout + 5 if method == "getUpdates" else 30 + resp = http.urlopen(req, timeout=timeout) + body = resp.read() if hasattr(resp, "read") else resp.data + return json.loads(body) + + # -- Lifecycle ----------------------------------------------------------- + + async def start(self) -> None: + """Verify token, then enter long-poll loop.""" + self._running = True + loop = asyncio.get_running_loop() + + # Verify token via getMe + try: + result = await loop.run_in_executor(None, self._api_call, "getMe") + if not result.get("ok"): + log.error("telegram: getMe failed: %s", result) + return + me = result.get("result", {}) + self.nick = me.get("first_name", "bot") + self._bot_username = me.get("username", "") + log.info("telegram: authenticated as @%s", self._bot_username) + except Exception: + log.exception("telegram: failed to authenticate") + return + + # Long-poll loop + while self._running: + try: + updates = await loop.run_in_executor( + None, self._poll_updates, + ) + for update in updates: + msg = _build_telegram_message(update, self._bot_username) + if msg is not None: + await self._dispatch_command(msg) + except Exception: + if self._running: + log.exception("telegram: poll error, backing off 5s") + await asyncio.sleep(5) + + log.info("telegram: stopped") + + def _poll_updates(self) -> list[dict]: + """Fetch updates from Telegram (blocking, run in executor).""" + payload = { + "offset": self._offset, + "timeout": self._poll_timeout, + } + result = self._api_call("getUpdates", payload) + if not result.get("ok"): + log.warning("telegram: getUpdates failed: %s", result) + return [] + updates = result.get("result", []) + if updates: + self._offset = updates[-1]["update_id"] + 1 + return updates + + # -- Command dispatch ---------------------------------------------------- + + async def _dispatch_command(self, msg: TelegramMessage) -> None: + """Parse and dispatch a command from a Telegram message.""" + text = msg.text + if not text or not text.startswith(self.prefix): + return + + parts = text[len(self.prefix):].split(None, 1) + cmd_name = parts[0].lower() if parts else "" + handler = self._resolve_command(cmd_name) + if handler is None: + return + if handler is _AMBIGUOUS: + matches = [k for k in self.registry.commands + if k.startswith(cmd_name)] + names = ", ".join(self.prefix + m for m in sorted(matches)) + await self.reply( + msg, + f"Ambiguous command '{self.prefix}{cmd_name}': {names}", + ) + return + + if not self._plugin_allowed(handler.plugin, msg.target): + return + + required = handler.tier + if required != "user": + sender = self._get_tier(msg) + if TIERS.index(sender) < TIERS.index(required): + await self.reply( + msg, + f"Permission denied: {self.prefix}{cmd_name} " + f"requires {required}", + ) + return + + try: + await handler.callback(self, msg) + except Exception: + log.exception("telegram: error in command handler '%s'", cmd_name) + + def _resolve_command(self, name: str): + """Resolve command name with unambiguous prefix matching. + + Returns the Handler on exact or unique prefix match, the sentinel + ``_AMBIGUOUS`` if multiple commands match, or None if nothing matches. + """ + handler = self.registry.commands.get(name) + if handler is not None: + return handler + matches = [v for k, v in self.registry.commands.items() + if k.startswith(name)] + if len(matches) == 1: + return matches[0] + if len(matches) > 1: + return _AMBIGUOUS + return None + + def _plugin_allowed(self, plugin_name: str, channel: str | None) -> bool: + """Channel filtering is IRC-only; all plugins are allowed on Telegram.""" + return True + + # -- Permission tiers ---------------------------------------------------- + + def _get_tier(self, msg: TelegramMessage) -> str: + """Determine permission tier from user_id. + + Matches exact string comparison of user_id against config lists. + """ + if not msg.prefix: + return "user" + for uid in self._admins: + if msg.prefix == uid: + return "admin" + for uid in self._operators: + if msg.prefix == uid: + return "oper" + for uid in self._trusted: + if msg.prefix == uid: + return "trusted" + return "user" + + def _is_admin(self, msg: TelegramMessage) -> bool: + """Check if the message sender is a bot admin.""" + return self._get_tier(msg) == "admin" + + # -- Public API for plugins ---------------------------------------------- + + async def send(self, target: str, text: str) -> None: + """Send a message via sendMessage API (proxied, rate-limited). + + Long messages are split at line boundaries to fit Telegram's + 4096-character limit. + """ + await self._bucket.acquire() + loop = asyncio.get_running_loop() + for chunk in _split_message(text): + payload = { + "chat_id": target, + "text": chunk, + } + try: + await loop.run_in_executor( + None, self._api_call, "sendMessage", payload, + ) + except Exception: + log.exception("telegram: failed to send message") + + async def reply(self, msg, text: str) -> None: + """Reply to the source chat.""" + if msg.target: + await self.send(msg.target, text) + + async def long_reply( + self, msg, lines: list[str], *, + label: str = "", + ) -> None: + """Reply with a list of lines; paste overflow to FlaskPaste. + + Same overflow logic as :meth:`derp.bot.Bot.long_reply`. + """ + threshold = self.config.get("bot", {}).get("paste_threshold", 4) + if not lines or not msg.target: + return + + if len(lines) <= threshold: + for line in lines: + await self.send(msg.target, line) + return + + # Attempt paste overflow + fp = self.registry._modules.get("flaskpaste") + paste_url = None + if fp: + full_text = "\n".join(lines) + loop = asyncio.get_running_loop() + paste_url = await loop.run_in_executor( + None, fp.create_paste, self, full_text, + ) + + if paste_url: + preview_count = min(2, threshold - 1) + for line in lines[:preview_count]: + await self.send(msg.target, line) + remaining = len(lines) - preview_count + suffix = f" ({label})" if label else "" + await self.send( + msg.target, + f"... {remaining} more lines{suffix}: {paste_url}", + ) + else: + for line in lines: + await self.send(msg.target, line) + + async def action(self, target: str, text: str) -> None: + """Send an action as italic Markdown text.""" + await self.send(target, f"_{text}_") + + async def shorten_url(self, url: str) -> str: + """Shorten a URL via FlaskPaste. Returns original on failure.""" + fp = self.registry._modules.get("flaskpaste") + if not fp: + return url + loop = asyncio.get_running_loop() + try: + return await loop.run_in_executor(None, fp.shorten_url, self, url) + except Exception: + return url + + # -- IRC no-ops ---------------------------------------------------------- + + async def join(self, channel: str) -> None: + """No-op: IRC-only concept.""" + log.debug("telegram: join() is a no-op") + + async def part(self, channel: str, reason: str = "") -> None: + """No-op: IRC-only concept.""" + log.debug("telegram: part() is a no-op") + + async def quit(self, reason: str = "bye") -> None: + """Stop the Telegram adapter.""" + self._running = False + + async def kick(self, channel: str, nick: str, reason: str = "") -> None: + """No-op: IRC-only concept.""" + log.debug("telegram: kick() is a no-op") + + async def mode(self, target: str, mode_str: str, *args: str) -> None: + """No-op: IRC-only concept.""" + log.debug("telegram: mode() is a no-op") + + async def set_topic(self, channel: str, topic: str) -> None: + """No-op: IRC-only concept.""" + log.debug("telegram: set_topic() is a no-op") + + # -- Plugin management (delegated to registry) --------------------------- + + def load_plugins(self, plugins_dir: str | Path | None = None) -> None: + """Load plugins from the configured directory.""" + if plugins_dir is None: + plugins_dir = self.config.get("bot", {}).get( + "plugins_dir", "plugins") + path = Path(plugins_dir) + self.registry.load_directory(path) + + @property + def plugins_dir(self) -> Path: + """Resolved path to the plugins directory.""" + return Path(self.config.get("bot", {}).get("plugins_dir", "plugins")) + + def load_plugin(self, name: str) -> tuple[bool, str]: + """Hot-load a new plugin by name from the plugins directory.""" + if name in self.registry._modules: + return False, f"plugin already loaded: {name}" + path = self.plugins_dir / f"{name}.py" + if not path.is_file(): + return False, f"{name}.py not found" + count = self.registry.load_plugin(path) + if count < 0: + return False, f"failed to load {name}" + return True, f"{count} handlers" + + def reload_plugin(self, name: str) -> tuple[bool, str]: + """Reload a plugin, picking up any file changes.""" + return self.registry.reload_plugin(name) + + def unload_plugin(self, name: str) -> tuple[bool, str]: + """Unload a plugin, removing all its handlers.""" + if self.registry.unload_plugin(name): + return True, "" + if name == "core": + return False, "cannot unload core" + return False, f"plugin not loaded: {name}" + + def _spawn(self, coro, *, name: str | None = None) -> asyncio.Task: + """Spawn a background task and track it for cleanup.""" + task = asyncio.create_task(coro, name=name) + self._tasks.add(task) + task.add_done_callback(self._tasks.discard) + return task diff --git a/tests/test_telegram.py b/tests/test_telegram.py new file mode 100644 index 0000000..ba75436 --- /dev/null +++ b/tests/test_telegram.py @@ -0,0 +1,766 @@ +"""Tests for the Telegram adapter.""" + +import asyncio +from unittest.mock import MagicMock, patch + +from derp.plugin import PluginRegistry +from derp.telegram import ( + _MAX_MSG_LEN, + TelegramBot, + TelegramMessage, + _build_telegram_message, + _split_message, + _strip_bot_suffix, +) + +# -- Helpers ----------------------------------------------------------------- + + +def _make_bot(token="test:token", admins=None, operators=None, trusted=None, + prefix=None): + """Create a TelegramBot with test config.""" + config = { + "telegram": { + "enabled": True, + "bot_token": token, + "poll_timeout": 1, + "admins": admins or [], + "operators": operators or [], + "trusted": trusted or [], + }, + "bot": { + "prefix": prefix or "!", + "paste_threshold": 4, + "plugins_dir": "plugins", + "rate_limit": 2.0, + "rate_burst": 5, + }, + } + registry = PluginRegistry() + bot = TelegramBot("tg-test", config, registry) + bot.nick = "TestBot" + bot._bot_username = "testbot" + return bot + + +def _update(text="!ping", nick="Alice", user_id=123, + chat_id=-456, chat_type="group", username="alice"): + """Build a minimal Telegram Update dict.""" + return { + "update_id": 1000, + "message": { + "message_id": 1, + "from": { + "id": user_id, + "first_name": nick, + "username": username, + }, + "chat": { + "id": chat_id, + "type": chat_type, + }, + "text": text, + }, + } + + +def _tg_msg(text="!ping", nick="Alice", user_id="123", + target="-456", is_channel=True): + """Create a TelegramMessage for command testing.""" + return TelegramMessage( + raw={}, nick=nick, prefix=user_id, text=text, target=target, + is_channel=is_channel, + params=[target, text], + ) + + +# -- Test helpers for registering commands ----------------------------------- + + +async def _echo_handler(bot, msg): + """Simple command handler that echoes text.""" + args = msg.text.split(None, 1) + reply = args[1] if len(args) > 1 else "no args" + await bot.reply(msg, reply) + + +async def _admin_handler(bot, msg): + """Admin-only command handler.""" + await bot.reply(msg, "admin action done") + + +# --------------------------------------------------------------------------- +# TestTelegramMessage +# --------------------------------------------------------------------------- + + +class TestTelegramMessage: + def test_defaults(self): + msg = TelegramMessage(raw={}, nick=None, prefix=None, text=None, + target=None) + assert msg.is_channel is True + assert msg.command == "PRIVMSG" + assert msg.params == [] + assert msg.tags == {} + + def test_custom_values(self): + msg = TelegramMessage( + raw={"update_id": 1}, nick="Alice", prefix="123", + text="hello", target="-456", is_channel=True, + command="PRIVMSG", params=["-456", "hello"], + tags={"key": "val"}, + ) + assert msg.nick == "Alice" + assert msg.prefix == "123" + assert msg.text == "hello" + assert msg.target == "-456" + assert msg.tags == {"key": "val"} + + def test_duck_type_compat(self): + """TelegramMessage has the same attribute names as IRC Message.""" + msg = _tg_msg() + attrs = ["raw", "nick", "prefix", "text", "target", + "is_channel", "command", "params", "tags"] + for attr in attrs: + assert hasattr(msg, attr), f"missing attribute: {attr}" + + def test_dm_message(self): + msg = _tg_msg(is_channel=False) + assert msg.is_channel is False + + def test_prefix_is_user_id(self): + msg = _tg_msg(user_id="999888777") + assert msg.prefix == "999888777" + + +# --------------------------------------------------------------------------- +# TestBuildTelegramMessage +# --------------------------------------------------------------------------- + + +class TestBuildTelegramMessage: + def test_group_message(self): + update = _update(text="!ping", chat_type="group") + msg = _build_telegram_message(update, "testbot") + assert msg is not None + assert msg.nick == "Alice" + assert msg.prefix == "123" + assert msg.text == "!ping" + assert msg.target == "-456" + assert msg.is_channel is True + + def test_dm_message(self): + update = _update(chat_type="private", chat_id=789) + msg = _build_telegram_message(update, "testbot") + assert msg is not None + assert msg.is_channel is False + assert msg.target == "789" + + def test_supergroup_message(self): + update = _update(chat_type="supergroup") + msg = _build_telegram_message(update, "testbot") + assert msg is not None + assert msg.is_channel is True + + def test_missing_from(self): + update = {"update_id": 1, "message": { + "message_id": 1, + "chat": {"id": -456, "type": "group"}, + "text": "hello", + }} + msg = _build_telegram_message(update, "testbot") + assert msg is not None + assert msg.nick is None + assert msg.prefix is None + + def test_missing_text(self): + update = {"update_id": 1, "message": { + "message_id": 1, + "from": {"id": 123, "first_name": "Alice"}, + "chat": {"id": -456, "type": "group"}, + }} + msg = _build_telegram_message(update, "testbot") + assert msg is not None + assert msg.text == "" + + def test_no_message(self): + update = {"update_id": 1} + msg = _build_telegram_message(update, "testbot") + assert msg is None + + def test_strips_bot_suffix(self): + update = _update(text="!help@testbot") + msg = _build_telegram_message(update, "testbot") + assert msg is not None + assert msg.text == "!help" + + def test_edited_message(self): + update = { + "update_id": 1, + "edited_message": { + "message_id": 1, + "from": {"id": 123, "first_name": "Alice"}, + "chat": {"id": -456, "type": "group"}, + "text": "!ping", + }, + } + msg = _build_telegram_message(update, "testbot") + assert msg is not None + assert msg.text == "!ping" + + def test_raw_preserved(self): + update = _update() + msg = _build_telegram_message(update, "testbot") + assert msg.raw is update + + def test_username_fallback_for_nick(self): + update = _update() + # Remove first_name, keep username + update["message"]["from"] = {"id": 123, "username": "alice_u"} + msg = _build_telegram_message(update, "testbot") + assert msg.nick == "alice_u" + + +# --------------------------------------------------------------------------- +# TestStripBotSuffix +# --------------------------------------------------------------------------- + + +class TestStripBotSuffix: + def test_strip_command(self): + assert _strip_bot_suffix("!help@mybot", "mybot") == "!help" + + def test_strip_with_args(self): + assert _strip_bot_suffix("!echo@mybot hello", "mybot") == "!echo hello" + + def test_no_suffix(self): + assert _strip_bot_suffix("!help", "mybot") == "!help" + + def test_case_insensitive(self): + assert _strip_bot_suffix("!help@MyBot", "mybot") == "!help" + + def test_empty_username(self): + assert _strip_bot_suffix("!help@bot", "") == "!help@bot" + + def test_plain_text(self): + assert _strip_bot_suffix("hello world", "mybot") == "hello world" + + +# --------------------------------------------------------------------------- +# TestTelegramBotReply +# --------------------------------------------------------------------------- + + +class TestTelegramBotReply: + def test_send_calls_api(self): + bot = _make_bot() + with patch.object(bot, "_api_call", return_value={"ok": True}): + asyncio.run(bot.send("-456", "hello")) + bot._api_call.assert_called_once_with( + "sendMessage", {"chat_id": "-456", "text": "hello"}) + + def test_reply_sends_to_target(self): + bot = _make_bot() + msg = _tg_msg(target="-456") + sent: list[tuple[str, str]] = [] + + async def _fake_send(target, text): + sent.append((target, text)) + + with patch.object(bot, "send", side_effect=_fake_send): + asyncio.run(bot.reply(msg, "pong")) + assert sent == [("-456", "pong")] + + def test_reply_no_target(self): + bot = _make_bot() + msg = _tg_msg(target=None) + msg.target = None + with patch.object(bot, "send") as mock_send: + asyncio.run(bot.reply(msg, "pong")) + mock_send.assert_not_called() + + def test_long_reply_under_threshold(self): + bot = _make_bot() + msg = _tg_msg() + sent: list[str] = [] + + async def _fake_send(target, text): + sent.append(text) + + with patch.object(bot, "send", side_effect=_fake_send): + asyncio.run(bot.long_reply(msg, ["a", "b", "c"])) + assert sent == ["a", "b", "c"] + + def test_long_reply_over_threshold_no_paste(self): + bot = _make_bot() + msg = _tg_msg() + sent: list[str] = [] + + async def _fake_send(target, text): + sent.append(text) + + with patch.object(bot, "send", side_effect=_fake_send): + asyncio.run(bot.long_reply(msg, ["a", "b", "c", "d", "e"])) + assert sent == ["a", "b", "c", "d", "e"] + + def test_long_reply_empty(self): + bot = _make_bot() + msg = _tg_msg() + with patch.object(bot, "send") as mock_send: + asyncio.run(bot.long_reply(msg, [])) + mock_send.assert_not_called() + + def test_action_format(self): + bot = _make_bot() + sent: list[tuple[str, str]] = [] + + async def _fake_send(target, text): + sent.append((target, text)) + + with patch.object(bot, "send", side_effect=_fake_send): + asyncio.run(bot.action("-456", "does a thing")) + assert sent == [("-456", "_does a thing_")] + + +# --------------------------------------------------------------------------- +# TestTelegramBotDispatch +# --------------------------------------------------------------------------- + + +class TestTelegramBotDispatch: + def test_dispatch_known_command(self): + bot = _make_bot() + bot.registry.register_command( + "echo", _echo_handler, help="echo", plugin="test") + msg = _tg_msg(text="!echo world") + sent: list[str] = [] + + async def _fake_send(target, text): + sent.append(text) + + with patch.object(bot, "send", side_effect=_fake_send): + asyncio.run(bot._dispatch_command(msg)) + assert sent == ["world"] + + def test_dispatch_unknown_command(self): + bot = _make_bot() + msg = _tg_msg(text="!nonexistent") + with patch.object(bot, "send") as mock_send: + asyncio.run(bot._dispatch_command(msg)) + mock_send.assert_not_called() + + def test_dispatch_no_prefix(self): + bot = _make_bot() + msg = _tg_msg(text="just a message") + with patch.object(bot, "send") as mock_send: + asyncio.run(bot._dispatch_command(msg)) + mock_send.assert_not_called() + + def test_dispatch_empty_text(self): + bot = _make_bot() + msg = _tg_msg(text="") + with patch.object(bot, "send") as mock_send: + asyncio.run(bot._dispatch_command(msg)) + mock_send.assert_not_called() + + def test_dispatch_none_text(self): + bot = _make_bot() + msg = _tg_msg() + msg.text = None + with patch.object(bot, "send") as mock_send: + asyncio.run(bot._dispatch_command(msg)) + mock_send.assert_not_called() + + def test_dispatch_ambiguous(self): + bot = _make_bot() + bot.registry.register_command("ping", _echo_handler, plugin="test") + bot.registry.register_command("plugins", _echo_handler, plugin="test") + msg = _tg_msg(text="!p") + sent: list[str] = [] + + async def _fake_send(target, text): + sent.append(text) + + with patch.object(bot, "send", side_effect=_fake_send): + asyncio.run(bot._dispatch_command(msg)) + assert len(sent) == 1 + assert "Ambiguous" in sent[0] + + def test_dispatch_tier_denied(self): + bot = _make_bot() + bot.registry.register_command( + "secret", _admin_handler, plugin="test", tier="admin") + msg = _tg_msg(text="!secret", user_id="999") + sent: list[str] = [] + + async def _fake_send(target, text): + sent.append(text) + + with patch.object(bot, "send", side_effect=_fake_send): + asyncio.run(bot._dispatch_command(msg)) + assert len(sent) == 1 + assert "Permission denied" in sent[0] + + def test_dispatch_tier_allowed(self): + bot = _make_bot(admins=[123]) + bot.registry.register_command( + "secret", _admin_handler, plugin="test", tier="admin") + msg = _tg_msg(text="!secret", user_id="123") + sent: list[str] = [] + + async def _fake_send(target, text): + sent.append(text) + + with patch.object(bot, "send", side_effect=_fake_send): + asyncio.run(bot._dispatch_command(msg)) + assert sent == ["admin action done"] + + def test_dispatch_prefix_match(self): + bot = _make_bot() + bot.registry.register_command("echo", _echo_handler, plugin="test") + msg = _tg_msg(text="!ec hello") + sent: list[str] = [] + + async def _fake_send(target, text): + sent.append(text) + + with patch.object(bot, "send", side_effect=_fake_send): + asyncio.run(bot._dispatch_command(msg)) + assert sent == ["hello"] + + +# --------------------------------------------------------------------------- +# TestTelegramBotTier +# --------------------------------------------------------------------------- + + +class TestTelegramBotTier: + def test_admin_tier(self): + bot = _make_bot(admins=[111]) + msg = _tg_msg(user_id="111") + assert bot._get_tier(msg) == "admin" + + def test_oper_tier(self): + bot = _make_bot(operators=[222]) + msg = _tg_msg(user_id="222") + assert bot._get_tier(msg) == "oper" + + def test_trusted_tier(self): + bot = _make_bot(trusted=[333]) + msg = _tg_msg(user_id="333") + assert bot._get_tier(msg) == "trusted" + + def test_user_tier_default(self): + bot = _make_bot() + msg = _tg_msg(user_id="999") + assert bot._get_tier(msg) == "user" + + def test_no_prefix(self): + bot = _make_bot(admins=[111]) + msg = _tg_msg() + msg.prefix = None + assert bot._get_tier(msg) == "user" + + def test_is_admin_true(self): + bot = _make_bot(admins=[111]) + msg = _tg_msg(user_id="111") + assert bot._is_admin(msg) is True + + def test_is_admin_false(self): + bot = _make_bot() + msg = _tg_msg(user_id="999") + assert bot._is_admin(msg) is False + + def test_priority_order(self): + """Admin takes priority over oper and trusted.""" + bot = _make_bot(admins=[111], operators=[111], trusted=[111]) + msg = _tg_msg(user_id="111") + assert bot._get_tier(msg) == "admin" + + +# --------------------------------------------------------------------------- +# TestTelegramBotNoOps +# --------------------------------------------------------------------------- + + +class TestTelegramBotNoOps: + def test_join_noop(self): + bot = _make_bot() + asyncio.run(bot.join("#channel")) + + def test_part_noop(self): + bot = _make_bot() + asyncio.run(bot.part("#channel", "reason")) + + def test_kick_noop(self): + bot = _make_bot() + asyncio.run(bot.kick("#channel", "nick", "reason")) + + def test_mode_noop(self): + bot = _make_bot() + asyncio.run(bot.mode("#channel", "+o", "nick")) + + def test_set_topic_noop(self): + bot = _make_bot() + asyncio.run(bot.set_topic("#channel", "new topic")) + + def test_quit_stops(self): + bot = _make_bot() + bot._running = True + asyncio.run(bot.quit()) + assert bot._running is False + + +# --------------------------------------------------------------------------- +# TestTelegramBotPoll +# --------------------------------------------------------------------------- + + +class TestTelegramBotPoll: + def test_poll_updates_parses(self): + bot = _make_bot() + result = { + "ok": True, + "result": [ + {"update_id": 100, "message": { + "message_id": 1, + "from": {"id": 123, "first_name": "Alice"}, + "chat": {"id": -456, "type": "group"}, + "text": "hello", + }}, + ], + } + with patch.object(bot, "_api_call", return_value=result): + updates = bot._poll_updates() + assert len(updates) == 1 + assert bot._offset == 101 + + def test_poll_updates_empty(self): + bot = _make_bot() + with patch.object(bot, "_api_call", + return_value={"ok": True, "result": []}): + updates = bot._poll_updates() + assert updates == [] + assert bot._offset == 0 + + def test_poll_updates_failed(self): + bot = _make_bot() + with patch.object(bot, "_api_call", + return_value={"ok": False, "description": "err"}): + updates = bot._poll_updates() + assert updates == [] + + def test_offset_advances(self): + bot = _make_bot() + result = { + "ok": True, + "result": [ + {"update_id": 50, "message": { + "message_id": 1, + "from": {"id": 1, "first_name": "A"}, + "chat": {"id": -1, "type": "group"}, + "text": "a", + }}, + {"update_id": 51, "message": { + "message_id": 2, + "from": {"id": 2, "first_name": "B"}, + "chat": {"id": -2, "type": "group"}, + "text": "b", + }}, + ], + } + with patch.object(bot, "_api_call", return_value=result): + bot._poll_updates() + assert bot._offset == 52 + + def test_start_getme_failure(self): + config = { + "telegram": { + "enabled": True, "bot_token": "t", "poll_timeout": 1, + "admins": [], "operators": [], "trusted": [], + }, + "bot": {"prefix": "!", "rate_limit": 2.0, "rate_burst": 5}, + } + bot = TelegramBot("tg-test", config, PluginRegistry()) + with patch.object(bot, "_api_call", + return_value={"ok": False}): + asyncio.run(bot.start()) + assert bot.nick == "" + + def test_start_getme_exception(self): + config = { + "telegram": { + "enabled": True, "bot_token": "t", "poll_timeout": 1, + "admins": [], "operators": [], "trusted": [], + }, + "bot": {"prefix": "!", "rate_limit": 2.0, "rate_burst": 5}, + } + bot = TelegramBot("tg-test", config, PluginRegistry()) + with patch.object(bot, "_api_call", + side_effect=Exception("network")): + asyncio.run(bot.start()) + assert bot.nick == "" + + +# --------------------------------------------------------------------------- +# TestTelegramApiCall +# --------------------------------------------------------------------------- + + +class TestTelegramApiCall: + def test_api_url(self): + bot = _make_bot(token="123:ABC") + url = bot._api_url("getMe") + assert url == "https://api.telegram.org/bot123:ABC/getMe" + + def test_api_call_get(self): + bot = _make_bot() + mock_resp = MagicMock() + mock_resp.read.return_value = b'{"ok": true, "result": {}}' + with patch("derp.telegram.http.urlopen", return_value=mock_resp): + result = bot._api_call("getMe") + assert result["ok"] is True + + def test_api_call_post(self): + bot = _make_bot() + mock_resp = MagicMock() + mock_resp.read.return_value = b'{"ok": true, "result": {}}' + with patch("derp.telegram.http.urlopen", return_value=mock_resp): + result = bot._api_call("sendMessage", {"chat_id": "1", "text": "hi"}) + assert result["ok"] is True + + def test_split_long_message(self): + # Build a message that exceeds 4096 bytes + lines = [f"line {i}: {'x' * 100}" for i in range(50)] + text = "\n".join(lines) + chunks = _split_message(text) + assert len(chunks) > 1 + for chunk in chunks: + assert len(chunk.encode("utf-8")) <= _MAX_MSG_LEN + + def test_short_message_no_split(self): + chunks = _split_message("hello world") + assert chunks == ["hello world"] + + def test_send_splits_long_text(self): + bot = _make_bot() + lines = [f"line {i}: {'x' * 100}" for i in range(50)] + text = "\n".join(lines) + calls: list[dict] = [] + + def _fake_api_call(method, payload=None): + if method == "sendMessage" and payload: + calls.append(payload) + return {"ok": True} + + with patch.object(bot, "_api_call", side_effect=_fake_api_call): + asyncio.run(bot.send("-456", text)) + assert len(calls) > 1 + for call in calls: + assert len(call["text"].encode("utf-8")) <= _MAX_MSG_LEN + + +# --------------------------------------------------------------------------- +# TestPluginManagement +# --------------------------------------------------------------------------- + + +class TestPluginManagement: + def test_load_plugin_not_found(self): + bot = _make_bot() + ok, msg = bot.load_plugin("nonexistent_xyz") + assert ok is False + assert "not found" in msg + + def test_load_plugin_already_loaded(self): + bot = _make_bot() + bot.registry._modules["test"] = object() + ok, msg = bot.load_plugin("test") + assert ok is False + assert "already loaded" in msg + + def test_unload_core_refused(self): + bot = _make_bot() + ok, msg = bot.unload_plugin("core") + assert ok is False + assert "cannot unload core" in msg + + def test_unload_not_loaded(self): + bot = _make_bot() + ok, msg = bot.unload_plugin("nonexistent") + assert ok is False + assert "not loaded" in msg + + def test_reload_delegates(self): + bot = _make_bot() + ok, msg = bot.reload_plugin("nonexistent") + assert ok is False + assert "not loaded" in msg + + +# --------------------------------------------------------------------------- +# TestSplitMessage +# --------------------------------------------------------------------------- + + +class TestSplitMessage: + def test_short_text(self): + assert _split_message("hi") == ["hi"] + + def test_exact_boundary(self): + text = "a" * 4096 + result = _split_message(text) + assert len(result) == 1 + + def test_multi_line_split(self): + lines = ["line " + str(i) for i in range(1000)] + text = "\n".join(lines) + chunks = _split_message(text) + assert len(chunks) > 1 + reassembled = "\n".join(chunks) + assert reassembled == text + + def test_empty_text(self): + assert _split_message("") == [""] + + +# --------------------------------------------------------------------------- +# TestTelegramBotConfig +# --------------------------------------------------------------------------- + + +class TestTelegramBotConfig: + def test_prefix_from_telegram_section(self): + config = { + "telegram": { + "enabled": True, + "bot_token": "t", + "poll_timeout": 1, + "prefix": "/", + "admins": [], + "operators": [], + "trusted": [], + }, + "bot": {"prefix": "!", "rate_limit": 2.0, "rate_burst": 5}, + } + bot = TelegramBot("test", config, PluginRegistry()) + assert bot.prefix == "/" + + def test_prefix_falls_back_to_bot(self): + config = { + "telegram": { + "enabled": True, + "bot_token": "t", + "poll_timeout": 1, + "admins": [], + "operators": [], + "trusted": [], + }, + "bot": {"prefix": "!", "rate_limit": 2.0, "rate_burst": 5}, + } + bot = TelegramBot("test", config, PluginRegistry()) + assert bot.prefix == "!" + + def test_admins_coerced_to_str(self): + bot = _make_bot(admins=[111, 222]) + assert bot._admins == ["111", "222"]