diff --git a/src/derp/cli.py b/src/derp/cli.py index 0670aac..9d25736 100644 --- a/src/derp/cli.py +++ b/src/derp/cli.py @@ -56,7 +56,7 @@ def build_parser() -> argparse.ArgumentParser: return p -def _run(bots: list[Bot]) -> None: +def _run(bots: list) -> None: """Run all bots concurrently with graceful SIGTERM handling.""" import signal @@ -71,12 +71,13 @@ def _run(bots: list[Bot]) -> None: logging.getLogger("derp").info("interrupted, shutting down") -def _shutdown(bots: list[Bot]) -> None: +def _shutdown(bots: list) -> None: """Signal handler: stop all bot loops so cProfile can flush.""" logging.getLogger("derp").info("SIGTERM received, shutting down") for bot in bots: bot._running = False - asyncio.get_running_loop().create_task(bot.conn.close()) + if hasattr(bot, "conn"): + asyncio.get_running_loop().create_task(bot.conn.close()) def _dump_tracemalloc(log: logging.Logger, path: str, limit: int = 25) -> None: @@ -125,7 +126,7 @@ def main(argv: list[str] | None = None) -> int: server_configs = build_server_configs(config) registry = PluginRegistry() - bots: list[Bot] = [] + bots: list = [] for name, srv_config in server_configs.items(): bot = Bot(name, srv_config, registry) bots.append(bot) @@ -133,6 +134,13 @@ def main(argv: list[str] | None = None) -> int: # Load plugins once (shared registry) bots[0].load_plugins() + # Teams adapter (optional) + if config.get("teams", {}).get("enabled"): + from derp.teams import TeamsBot + + teams_bot = TeamsBot("teams", config, registry) + bots.append(teams_bot) + names = ", ".join(b.name for b in bots) log.info("servers: %s", names) diff --git a/src/derp/config.py b/src/derp/config.py index 15b6dce..8dc4ded 100644 --- a/src/derp/config.py +++ b/src/derp/config.py @@ -39,6 +39,17 @@ DEFAULTS: dict = { "port": 8080, "secret": "", }, + "teams": { + "enabled": False, + "bot_name": "derp", + "bind": "127.0.0.1", + "port": 8081, + "webhook_secret": "", + "incoming_webhook_url": "", + "admins": [], + "operators": [], + "trusted": [], + }, "logging": { "level": "info", "format": "text", diff --git a/src/derp/teams.py b/src/derp/teams.py new file mode 100644 index 0000000..4ff322a --- /dev/null +++ b/src/derp/teams.py @@ -0,0 +1,528 @@ +"""Microsoft Teams adapter: outgoing webhook receiver + incoming webhook sender.""" + +from __future__ import annotations + +import asyncio +import base64 +import hashlib +import hmac +import json +import logging +import re +import time +import urllib.request +from dataclasses import dataclass, field +from pathlib import Path + +from derp.bot import _TokenBucket +from derp.plugin import TIERS, PluginRegistry +from derp.state import StateStore + +log = logging.getLogger(__name__) + +_MAX_BODY = 65536 # 64 KB +_AMBIGUOUS = object() # sentinel for ambiguous prefix matches + + +@dataclass(slots=True) +class TeamsMessage: + """Parsed Teams Activity message, duck-typed with IRC Message. + + Plugins that use only ``msg.nick``, ``msg.text``, ``msg.target``, + ``msg.is_channel``, ``msg.prefix``, ``msg.command``, ``msg.params``, + and ``msg.tags`` work without modification. + """ + + raw: dict + nick: str | None + prefix: str | None # AAD object ID (for ACL matching) + text: str | None + target: str | None # conversation/channel ID + is_channel: bool = True # outgoing webhooks are always channels + command: str = "PRIVMSG" # compatibility shim + params: list[str] = field(default_factory=list) + tags: dict[str, str] = field(default_factory=dict) + _replies: list[str] = field(default_factory=list, repr=False) + + +# -- Helpers ----------------------------------------------------------------- + + +def _verify_hmac(secret: str, body: bytes, auth_header: str) -> bool: + """Verify Teams outgoing webhook HMAC-SHA256 signature. + + The secret is base64-encoded. The Authorization header format is + ``HMAC ``. + """ + if not secret: + return True # no secret configured = open access + if not auth_header.startswith("HMAC "): + return False + try: + key = base64.b64decode(secret) + except Exception: + log.error("teams: invalid base64 webhook secret") + return False + expected = base64.b64encode( + hmac.new(key, body, hashlib.sha256).digest(), + ).decode("ascii") + return hmac.compare_digest(expected, auth_header[5:]) + + +def _strip_mention(text: str, bot_name: str) -> str: + """Strip ``BotName`` prefix from message text.""" + return re.sub(r"[^<]*\s*", "", text).strip() + + +def _parse_activity(body: bytes) -> dict | None: + """Parse Teams Activity JSON. Returns None on failure.""" + try: + data = json.loads(body) + except (json.JSONDecodeError, UnicodeDecodeError): + return None + if not isinstance(data, dict): + return None + return data + + +def _build_teams_message(activity: dict, bot_name: str) -> TeamsMessage: + """Build a TeamsMessage from a Teams Activity dict.""" + sender = activity.get("from", {}) + conv = activity.get("conversation", {}) + nick = sender.get("name") + prefix = sender.get("aadObjectId") + raw_text = activity.get("text", "") + text = _strip_mention(raw_text, bot_name) + target = conv.get("id") + return TeamsMessage( + raw=activity, + nick=nick, + prefix=prefix, + text=text, + target=target, + params=[target or "", text] if target else [text], + ) + + +def _http_response(status: int, reason: str, body: str = "", + content_type: str = "text/plain; charset=utf-8") -> bytes: + """Build a minimal HTTP/1.1 response.""" + body_bytes = body.encode("utf-8") if body else b"" + lines = [ + f"HTTP/1.1 {status} {reason}", + f"Content-Type: {content_type}", + f"Content-Length: {len(body_bytes)}", + "Connection: close", + "", + "", + ] + return "\r\n".join(lines).encode("utf-8") + body_bytes + + +def _json_response(status: int, reason: str, data: dict) -> bytes: + """Build an HTTP/1.1 JSON response.""" + body = json.dumps(data) + return _http_response(status, reason, body, "application/json") + + +# -- TeamsBot ---------------------------------------------------------------- + + +class TeamsBot: + """Microsoft Teams bot adapter via outgoing/incoming webhooks. + + Exposes the same public API as :class:`derp.bot.Bot` so that + protocol-agnostic plugins work without modification. + """ + + def __init__(self, name: str, config: dict, registry: PluginRegistry) -> None: + self.name = name + self.config = config + self.registry = registry + self._pstate: dict = {} + + teams_cfg = config.get("teams", {}) + self.nick: str = teams_cfg.get("bot_name", "derp") + self.prefix: str = config.get("bot", {}).get("prefix", "!") + self._running = False + self._started: float = time.monotonic() + self._tasks: set[asyncio.Task] = set() + self._admins: list[str] = teams_cfg.get("admins", []) + self._operators: list[str] = teams_cfg.get("operators", []) + self._trusted: list[str] = teams_cfg.get("trusted", []) + self.state = StateStore(f"data/state-{name}.db") + self._server: asyncio.Server | None = None + + self._webhook_secret: str = teams_cfg.get("webhook_secret", "") + self._incoming_url: str = teams_cfg.get("incoming_webhook_url", "") + self._bind: str = teams_cfg.get("bind", "127.0.0.1") + self._port: int = teams_cfg.get("port", 8081) + + rate_cfg = config.get("bot", {}) + self._bucket = _TokenBucket( + rate=rate_cfg.get("rate_limit", 2.0), + burst=rate_cfg.get("rate_burst", 5), + ) + + # -- Lifecycle ----------------------------------------------------------- + + async def start(self) -> None: + """Start the HTTP server for receiving outgoing webhooks.""" + self._running = True + try: + self._server = await asyncio.start_server( + self._handle_connection, self._bind, self._port, + ) + except OSError as exc: + log.error("teams: failed to bind %s:%d: %s", + self._bind, self._port, exc) + return + log.info("teams: listening on %s:%d", self._bind, self._port) + try: + while self._running: + await asyncio.sleep(1) + finally: + self._server.close() + await self._server.wait_closed() + log.info("teams: stopped") + + # -- HTTP server --------------------------------------------------------- + + async def _handle_connection( + self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, + ) -> None: + """Handle a single HTTP connection from Teams.""" + try: + # Read request line + request_line = await asyncio.wait_for( + reader.readline(), timeout=10.0) + if not request_line: + return + parts = request_line.decode("utf-8", errors="replace").strip().split() + if len(parts) < 2: + writer.write(_http_response(400, "Bad Request", + "malformed request")) + return + method, path = parts[0], parts[1] + + # Read headers + headers: dict[str, str] = {} + while True: + line = await asyncio.wait_for(reader.readline(), timeout=10.0) + if not line or line == b"\r\n" or line == b"\n": + break + decoded = line.decode("utf-8", errors="replace").strip() + if ":" in decoded: + key, val = decoded.split(":", 1) + headers[key.strip().lower()] = val.strip() + + # Method check + if method != "POST": + writer.write(_http_response(405, "Method Not Allowed", + "POST only")) + return + + # Path check + if path != "/api/messages": + writer.write(_http_response(404, "Not Found")) + return + + # Read body + content_length = int(headers.get("content-length", "0")) + if content_length > _MAX_BODY: + writer.write(_http_response(413, "Payload Too Large", + f"max {_MAX_BODY} bytes")) + return + body = await asyncio.wait_for( + reader.readexactly(content_length), timeout=10.0) + + # Verify HMAC signature + auth = headers.get("authorization", "") + if not _verify_hmac(self._webhook_secret, body, auth): + writer.write(_http_response(401, "Unauthorized", + "bad signature")) + return + + # Parse Activity JSON + activity = _parse_activity(body) + if activity is None: + writer.write(_http_response(400, "Bad Request", + "invalid JSON")) + return + + # Only handle message activities + if activity.get("type") != "message": + writer.write(_json_response(200, "OK", + {"type": "message", "text": ""})) + return + + # Build message and dispatch + msg = _build_teams_message(activity, self.nick) + await self._dispatch_command(msg) + + # Collect replies + reply_text = "\n".join(msg._replies) if msg._replies else "" + writer.write(_json_response(200, "OK", { + "type": "message", + "text": reply_text, + })) + + except (asyncio.TimeoutError, asyncio.IncompleteReadError, + ConnectionError): + log.debug("teams: client disconnected") + except Exception: + log.exception("teams: error handling request") + try: + writer.write(_http_response(500, "Internal Server Error")) + except Exception: + pass + finally: + try: + writer.close() + await writer.wait_closed() + except Exception: + pass + + # -- Command dispatch ---------------------------------------------------- + + async def _dispatch_command(self, msg: TeamsMessage) -> None: + """Parse and dispatch a command from a Teams message.""" + text = msg.text + if not text or not text.startswith(self.prefix): + return + + parts = text[len(self.prefix):].split(None, 1) + cmd_name = parts[0].lower() if parts else "" + handler = self._resolve_command(cmd_name) + if handler is None: + return + if handler is _AMBIGUOUS: + matches = [k for k in self.registry.commands + if k.startswith(cmd_name)] + names = ", ".join(self.prefix + m for m in sorted(matches)) + msg._replies.append( + f"Ambiguous command '{self.prefix}{cmd_name}': {names}") + return + + if not self._plugin_allowed(handler.plugin, msg.target): + return + + required = handler.tier + if required != "user": + sender = self._get_tier(msg) + if TIERS.index(sender) < TIERS.index(required): + msg._replies.append( + f"Permission denied: {self.prefix}{cmd_name} " + f"requires {required}") + return + + try: + await handler.callback(self, msg) + except Exception: + log.exception("teams: error in command handler '%s'", cmd_name) + + def _resolve_command(self, name: str): + """Resolve command name with unambiguous prefix matching. + + Returns the Handler on exact or unique prefix match, the sentinel + ``_AMBIGUOUS`` if multiple commands match, or None if nothing matches. + """ + handler = self.registry.commands.get(name) + if handler is not None: + return handler + matches = [v for k, v in self.registry.commands.items() + if k.startswith(name)] + if len(matches) == 1: + return matches[0] + if len(matches) > 1: + return _AMBIGUOUS + return None + + def _plugin_allowed(self, plugin_name: str, channel: str | None) -> bool: + """Channel filtering is IRC-only; all plugins are allowed on Teams.""" + return True + + # -- Permission tiers ---------------------------------------------------- + + def _get_tier(self, msg: TeamsMessage) -> str: + """Determine permission tier from AAD object ID. + + Unlike IRC (fnmatch hostmask patterns), Teams matches exact + AAD object IDs from the ``teams.admins``, ``teams.operators``, + and ``teams.trusted`` config lists. + """ + if not msg.prefix: + return "user" + for aad_id in self._admins: + if msg.prefix == aad_id: + return "admin" + for aad_id in self._operators: + if msg.prefix == aad_id: + return "oper" + for aad_id in self._trusted: + if msg.prefix == aad_id: + return "trusted" + return "user" + + def _is_admin(self, msg: TeamsMessage) -> bool: + """Check if the message sender is a bot admin.""" + return self._get_tier(msg) == "admin" + + # -- Public API for plugins ---------------------------------------------- + + async def send(self, target: str, text: str) -> None: + """Send a message via incoming webhook (proactive messages). + + Requires ``teams.incoming_webhook_url`` to be configured. + Does nothing if no URL is set. + """ + if not self._incoming_url: + log.debug("teams: send() skipped, no incoming_webhook_url") + return + await self._bucket.acquire() + payload = json.dumps({"text": text}).encode("utf-8") + req = urllib.request.Request( + self._incoming_url, + data=payload, + headers={"Content-Type": "application/json"}, + method="POST", + ) + loop = asyncio.get_running_loop() + try: + await loop.run_in_executor(None, urllib.request.urlopen, req) + except Exception: + log.exception("teams: failed to send via incoming webhook") + + async def reply(self, msg, text: str) -> None: + """Reply by appending to the message reply buffer. + + Collected replies are returned as the HTTP response body. + """ + msg._replies.append(text) + + async def long_reply( + self, msg, lines: list[str], *, + label: str = "", + ) -> None: + """Reply with a list of lines; paste overflow to FlaskPaste. + + Same overflow logic as :meth:`derp.bot.Bot.long_reply` but + appends to the reply buffer instead of sending via IRC. + """ + threshold = self.config.get("bot", {}).get("paste_threshold", 4) + if not lines: + return + + if len(lines) <= threshold: + for line in lines: + msg._replies.append(line) + return + + # Attempt paste overflow + fp = self.registry._modules.get("flaskpaste") + paste_url = None + if fp: + full_text = "\n".join(lines) + loop = asyncio.get_running_loop() + paste_url = await loop.run_in_executor( + None, fp.create_paste, self, full_text, + ) + + if paste_url: + preview_count = min(2, threshold - 1) + for line in lines[:preview_count]: + msg._replies.append(line) + remaining = len(lines) - preview_count + suffix = f" ({label})" if label else "" + msg._replies.append( + f"... {remaining} more lines{suffix}: {paste_url}") + else: + for line in lines: + msg._replies.append(line) + + async def action(self, target: str, text: str) -> None: + """Send an action as italic text via incoming webhook.""" + await self.send(target, f"_{text}_") + + async def shorten_url(self, url: str) -> str: + """Shorten a URL via FlaskPaste. Returns original on failure.""" + fp = self.registry._modules.get("flaskpaste") + if not fp: + return url + loop = asyncio.get_running_loop() + try: + return await loop.run_in_executor(None, fp.shorten_url, self, url) + except Exception: + return url + + # -- IRC no-ops ---------------------------------------------------------- + + async def join(self, channel: str) -> None: + """No-op: IRC-only concept.""" + log.debug("teams: join() is a no-op") + + async def part(self, channel: str, reason: str = "") -> None: + """No-op: IRC-only concept.""" + log.debug("teams: part() is a no-op") + + async def quit(self, reason: str = "bye") -> None: + """Stop the Teams adapter.""" + self._running = False + + async def kick(self, channel: str, nick: str, reason: str = "") -> None: + """No-op: IRC-only concept.""" + log.debug("teams: kick() is a no-op") + + async def mode(self, target: str, mode_str: str, *args: str) -> None: + """No-op: IRC-only concept.""" + log.debug("teams: mode() is a no-op") + + async def set_topic(self, channel: str, topic: str) -> None: + """No-op: IRC-only concept.""" + log.debug("teams: set_topic() is a no-op") + + # -- Plugin management (delegated to registry) --------------------------- + + def load_plugins(self, plugins_dir: str | Path | None = None) -> None: + """Load plugins from the configured directory.""" + if plugins_dir is None: + plugins_dir = self.config.get("bot", {}).get( + "plugins_dir", "plugins") + path = Path(plugins_dir) + self.registry.load_directory(path) + + @property + def plugins_dir(self) -> Path: + """Resolved path to the plugins directory.""" + return Path(self.config.get("bot", {}).get("plugins_dir", "plugins")) + + def load_plugin(self, name: str) -> tuple[bool, str]: + """Hot-load a new plugin by name from the plugins directory.""" + if name in self.registry._modules: + return False, f"plugin already loaded: {name}" + path = self.plugins_dir / f"{name}.py" + if not path.is_file(): + return False, f"{name}.py not found" + count = self.registry.load_plugin(path) + if count < 0: + return False, f"failed to load {name}" + return True, f"{count} handlers" + + def reload_plugin(self, name: str) -> tuple[bool, str]: + """Reload a plugin, picking up any file changes.""" + return self.registry.reload_plugin(name) + + def unload_plugin(self, name: str) -> tuple[bool, str]: + """Unload a plugin, removing all its handlers.""" + if self.registry.unload_plugin(name): + return True, "" + if name == "core": + return False, "cannot unload core" + return False, f"plugin not loaded: {name}" + + def _spawn(self, coro, *, name: str | None = None) -> asyncio.Task: + """Spawn a background task and track it for cleanup.""" + task = asyncio.create_task(coro, name=name) + self._tasks.add(task) + task.add_done_callback(self._tasks.discard) + return task diff --git a/tests/test_teams.py b/tests/test_teams.py new file mode 100644 index 0000000..831a19f --- /dev/null +++ b/tests/test_teams.py @@ -0,0 +1,734 @@ +"""Tests for the Microsoft Teams adapter.""" + +import asyncio +import base64 +import hashlib +import hmac +import json + +from derp.plugin import PluginRegistry +from derp.teams import ( + _MAX_BODY, + TeamsBot, + TeamsMessage, + _build_teams_message, + _http_response, + _json_response, + _parse_activity, + _strip_mention, + _verify_hmac, +) + +# -- Helpers ----------------------------------------------------------------- + + +def _make_bot(secret="", admins=None, operators=None, trusted=None, + incoming_url=""): + """Create a TeamsBot with test config.""" + config = { + "teams": { + "enabled": True, + "bot_name": "derp", + "bind": "127.0.0.1", + "port": 0, + "webhook_secret": secret, + "incoming_webhook_url": incoming_url, + "admins": admins or [], + "operators": operators or [], + "trusted": trusted or [], + }, + "bot": { + "prefix": "!", + "paste_threshold": 4, + "plugins_dir": "plugins", + "rate_limit": 2.0, + "rate_burst": 5, + }, + } + registry = PluginRegistry() + return TeamsBot("teams-test", config, registry) + + +def _activity(text="hello", nick="Alice", aad_id="aad-123", + conv_id="conv-456", msg_type="message"): + """Build a minimal Teams Activity dict.""" + return { + "type": msg_type, + "from": {"name": nick, "aadObjectId": aad_id}, + "conversation": {"id": conv_id}, + "text": text, + } + + +def _teams_msg(text="!ping", nick="Alice", aad_id="aad-123", + target="conv-456"): + """Create a TeamsMessage for command testing.""" + return TeamsMessage( + raw={}, nick=nick, prefix=aad_id, text=text, target=target, + params=[target, text], + ) + + +def _sign_teams(secret: str, body: bytes) -> str: + """Generate Teams HMAC-SHA256 Authorization header value.""" + key = base64.b64decode(secret) + sig = base64.b64encode( + hmac.new(key, body, hashlib.sha256).digest(), + ).decode("ascii") + return f"HMAC {sig}" + + +class _FakeReader: + """Mock asyncio.StreamReader from raw HTTP bytes.""" + + def __init__(self, data: bytes) -> None: + self._data = data + self._pos = 0 + + async def readline(self) -> bytes: + start = self._pos + idx = self._data.find(b"\n", start) + if idx == -1: + self._pos = len(self._data) + return self._data[start:] + self._pos = idx + 1 + return self._data[start:self._pos] + + async def readexactly(self, n: int) -> bytes: + chunk = self._data[self._pos:self._pos + n] + self._pos += n + return chunk + + +class _FakeWriter: + """Mock asyncio.StreamWriter that captures output.""" + + def __init__(self) -> None: + self.data = b"" + self._closed = False + + def write(self, data: bytes) -> None: + self.data += data + + def close(self) -> None: + self._closed = True + + async def wait_closed(self) -> None: + pass + + +def _build_request(method: str, path: str, body: bytes, + headers: dict[str, str] | None = None) -> bytes: + """Build raw HTTP request bytes.""" + hdrs = headers or {} + if "Content-Length" not in hdrs: + hdrs["Content-Length"] = str(len(body)) + lines = [f"{method} {path} HTTP/1.1"] + for k, v in hdrs.items(): + lines.append(f"{k}: {v}") + lines.append("") + lines.append("") + return "\r\n".join(lines).encode("utf-8") + body + + +# -- Test helpers for registering commands ----------------------------------- + + +async def _echo_handler(bot, msg): + """Simple command handler that echoes text.""" + args = msg.text.split(None, 1) + reply = args[1] if len(args) > 1 else "no args" + await bot.reply(msg, reply) + + +async def _admin_handler(bot, msg): + """Admin-only command handler.""" + await bot.reply(msg, "admin action done") + + +# --------------------------------------------------------------------------- +# TestTeamsMessage +# --------------------------------------------------------------------------- + + +class TestTeamsMessage: + def test_defaults(self): + msg = TeamsMessage(raw={}, nick=None, prefix=None, text=None, + target=None) + assert msg.is_channel is True + assert msg.command == "PRIVMSG" + assert msg.params == [] + assert msg.tags == {} + assert msg._replies == [] + + def test_custom_values(self): + msg = TeamsMessage( + raw={"type": "message"}, nick="Alice", prefix="aad-123", + text="hello", target="conv-456", is_channel=True, + command="PRIVMSG", params=["conv-456", "hello"], + tags={"key": "val"}, + ) + assert msg.nick == "Alice" + assert msg.prefix == "aad-123" + assert msg.text == "hello" + assert msg.target == "conv-456" + assert msg.tags == {"key": "val"} + + def test_duck_type_compat(self): + """TeamsMessage has the same attribute names as IRC Message.""" + msg = _teams_msg() + attrs = ["raw", "nick", "prefix", "text", "target", + "is_channel", "command", "params", "tags"] + for attr in attrs: + assert hasattr(msg, attr), f"missing attribute: {attr}" + + def test_replies_buffer(self): + msg = _teams_msg() + assert msg._replies == [] + msg._replies.append("pong") + msg._replies.append("line2") + assert len(msg._replies) == 2 + + def test_raw_dict(self): + activity = {"type": "message", "id": "123"} + msg = TeamsMessage(raw=activity, nick=None, prefix=None, + text=None, target=None) + assert msg.raw is activity + + def test_prefix_is_aad_id(self): + msg = _teams_msg(aad_id="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx") + assert msg.prefix == "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" + + +# --------------------------------------------------------------------------- +# TestVerifyHmac +# --------------------------------------------------------------------------- + + +class TestVerifyHmac: + def test_valid_signature(self): + # base64-encoded secret + secret = base64.b64encode(b"test-secret").decode() + body = b'{"type":"message","text":"hello"}' + auth = _sign_teams(secret, body) + assert _verify_hmac(secret, body, auth) is True + + def test_invalid_signature(self): + secret = base64.b64encode(b"test-secret").decode() + body = b'{"type":"message","text":"hello"}' + assert _verify_hmac(secret, body, "HMAC badsignature") is False + + def test_missing_hmac_prefix(self): + secret = base64.b64encode(b"test-secret").decode() + body = b'{"text":"hello"}' + # No "HMAC " prefix + key = base64.b64decode(secret) + sig = base64.b64encode( + hmac.new(key, body, hashlib.sha256).digest() + ).decode() + assert _verify_hmac(secret, body, sig) is False + + def test_empty_secret_allows_all(self): + assert _verify_hmac("", b"any body", "") is True + assert _verify_hmac("", b"any body", "HMAC whatever") is True + + def test_invalid_base64_secret(self): + assert _verify_hmac("not-valid-b64!!!", b"body", "HMAC x") is False + + +# --------------------------------------------------------------------------- +# TestStripMention +# --------------------------------------------------------------------------- + + +class TestStripMention: + def test_strip_at_mention(self): + assert _strip_mention("derp !help", "derp") == "!help" + + def test_strip_with_extra_spaces(self): + assert _strip_mention("derp !ping", "derp") == "!ping" + + def test_no_mention(self): + assert _strip_mention("!help", "derp") == "!help" + + def test_multiple_mentions(self): + text = "derp hello other world" + assert _strip_mention(text, "derp") == "hello world" + + def test_empty_text(self): + assert _strip_mention("", "derp") == "" + + def test_mention_only(self): + assert _strip_mention("derp", "derp") == "" + + +# --------------------------------------------------------------------------- +# TestParseActivity +# --------------------------------------------------------------------------- + + +class TestParseActivity: + def test_valid_activity(self): + body = json.dumps({"type": "message", "text": "hello"}).encode() + result = _parse_activity(body) + assert result == {"type": "message", "text": "hello"} + + def test_invalid_json(self): + assert _parse_activity(b"not json") is None + + def test_not_a_dict(self): + assert _parse_activity(b'["array"]') is None + + def test_empty_body(self): + assert _parse_activity(b"") is None + + def test_unicode_error(self): + assert _parse_activity(b"\xff\xfe") is None + + +# --------------------------------------------------------------------------- +# TestBuildTeamsMessage +# --------------------------------------------------------------------------- + + +class TestBuildTeamsMessage: + def test_basic_message(self): + activity = _activity(text="derp !ping") + msg = _build_teams_message(activity, "derp") + assert msg.nick == "Alice" + assert msg.prefix == "aad-123" + assert msg.text == "!ping" + assert msg.target == "conv-456" + assert msg.is_channel is True + assert msg.command == "PRIVMSG" + + def test_strips_mention(self): + activity = _activity(text="Bot !help commands") + msg = _build_teams_message(activity, "Bot") + assert msg.text == "!help commands" + + def test_missing_from(self): + activity = {"type": "message", "text": "hello", + "conversation": {"id": "conv"}} + msg = _build_teams_message(activity, "derp") + assert msg.nick is None + assert msg.prefix is None + + def test_missing_conversation(self): + activity = {"type": "message", "text": "hello", + "from": {"name": "Alice", "aadObjectId": "aad"}} + msg = _build_teams_message(activity, "derp") + assert msg.target is None + + def test_raw_preserved(self): + activity = _activity() + msg = _build_teams_message(activity, "derp") + assert msg.raw is activity + + def test_params_populated(self): + activity = _activity(text="derp !test arg") + msg = _build_teams_message(activity, "derp") + assert msg.params[0] == "conv-456" + assert msg.params[1] == "!test arg" + + +# --------------------------------------------------------------------------- +# TestTeamsBotReply +# --------------------------------------------------------------------------- + + +class TestTeamsBotReply: + def test_reply_appends(self): + bot = _make_bot() + msg = _teams_msg() + asyncio.run(bot.reply(msg, "pong")) + assert msg._replies == ["pong"] + + def test_multi_reply(self): + bot = _make_bot() + msg = _teams_msg() + + async def _run(): + await bot.reply(msg, "line 1") + await bot.reply(msg, "line 2") + await bot.reply(msg, "line 3") + + asyncio.run(_run()) + assert msg._replies == ["line 1", "line 2", "line 3"] + + def test_long_reply_under_threshold(self): + bot = _make_bot() + msg = _teams_msg() + lines = ["a", "b", "c"] + asyncio.run(bot.long_reply(msg, lines)) + assert msg._replies == ["a", "b", "c"] + + def test_long_reply_over_threshold_no_paste(self): + """Over threshold with no FlaskPaste sends all lines.""" + bot = _make_bot() + msg = _teams_msg() + lines = ["a", "b", "c", "d", "e", "f"] # 6 > threshold of 4 + asyncio.run(bot.long_reply(msg, lines)) + assert msg._replies == lines + + def test_long_reply_empty(self): + bot = _make_bot() + msg = _teams_msg() + asyncio.run(bot.long_reply(msg, [])) + assert msg._replies == [] + + def test_action_format(self): + """action() maps to italic text via send().""" + bot = _make_bot(incoming_url="http://example.com/hook") + # action sends to incoming webhook; without actual URL it logs debug + bot._incoming_url = "" + asyncio.run(bot.action("conv", "does a thing")) + # No incoming URL, so send() is a no-op (debug log) + + def test_send_no_incoming_url(self): + """send() is a no-op when no incoming_webhook_url is configured.""" + bot = _make_bot() + # Should not raise + asyncio.run(bot.send("target", "text")) + + +# --------------------------------------------------------------------------- +# TestTeamsBotTier +# --------------------------------------------------------------------------- + + +class TestTeamsBotTier: + def test_admin_tier(self): + bot = _make_bot(admins=["aad-admin"]) + msg = _teams_msg(aad_id="aad-admin") + assert bot._get_tier(msg) == "admin" + + def test_oper_tier(self): + bot = _make_bot(operators=["aad-oper"]) + msg = _teams_msg(aad_id="aad-oper") + assert bot._get_tier(msg) == "oper" + + def test_trusted_tier(self): + bot = _make_bot(trusted=["aad-trusted"]) + msg = _teams_msg(aad_id="aad-trusted") + assert bot._get_tier(msg) == "trusted" + + def test_user_tier_default(self): + bot = _make_bot() + msg = _teams_msg(aad_id="aad-unknown") + assert bot._get_tier(msg) == "user" + + def test_no_prefix(self): + bot = _make_bot(admins=["aad-admin"]) + msg = _teams_msg(aad_id=None) + msg.prefix = None + assert bot._get_tier(msg) == "user" + + def test_is_admin_true(self): + bot = _make_bot(admins=["aad-admin"]) + msg = _teams_msg(aad_id="aad-admin") + assert bot._is_admin(msg) is True + + def test_is_admin_false(self): + bot = _make_bot() + msg = _teams_msg(aad_id="aad-nobody") + assert bot._is_admin(msg) is False + + def test_priority_order(self): + """Admin takes priority over oper and trusted.""" + bot = _make_bot(admins=["aad-x"], operators=["aad-x"], + trusted=["aad-x"]) + msg = _teams_msg(aad_id="aad-x") + assert bot._get_tier(msg) == "admin" + + +# --------------------------------------------------------------------------- +# TestTeamsBotDispatch +# --------------------------------------------------------------------------- + + +class TestTeamsBotDispatch: + def test_dispatch_known_command(self): + bot = _make_bot() + bot.registry.register_command( + "echo", _echo_handler, help="echo", plugin="test") + msg = _teams_msg(text="!echo world") + asyncio.run(bot._dispatch_command(msg)) + assert msg._replies == ["world"] + + def test_dispatch_unknown_command(self): + bot = _make_bot() + msg = _teams_msg(text="!nonexistent") + asyncio.run(bot._dispatch_command(msg)) + assert msg._replies == [] + + def test_dispatch_no_prefix(self): + bot = _make_bot() + msg = _teams_msg(text="just a message") + asyncio.run(bot._dispatch_command(msg)) + assert msg._replies == [] + + def test_dispatch_empty_text(self): + bot = _make_bot() + msg = _teams_msg(text="") + asyncio.run(bot._dispatch_command(msg)) + assert msg._replies == [] + + def test_dispatch_none_text(self): + bot = _make_bot() + msg = _teams_msg(text=None) + msg.text = None + asyncio.run(bot._dispatch_command(msg)) + assert msg._replies == [] + + def test_dispatch_ambiguous(self): + bot = _make_bot() + bot.registry.register_command( + "ping", _echo_handler, plugin="test") + bot.registry.register_command( + "plugins", _echo_handler, plugin="test") + msg = _teams_msg(text="!p") + asyncio.run(bot._dispatch_command(msg)) + assert len(msg._replies) == 1 + assert "Ambiguous" in msg._replies[0] + + def test_dispatch_tier_denied(self): + bot = _make_bot() + bot.registry.register_command( + "secret", _admin_handler, plugin="test", tier="admin") + msg = _teams_msg(text="!secret", aad_id="aad-nobody") + asyncio.run(bot._dispatch_command(msg)) + assert len(msg._replies) == 1 + assert "Permission denied" in msg._replies[0] + + def test_dispatch_tier_allowed(self): + bot = _make_bot(admins=["aad-admin"]) + bot.registry.register_command( + "secret", _admin_handler, plugin="test", tier="admin") + msg = _teams_msg(text="!secret", aad_id="aad-admin") + asyncio.run(bot._dispatch_command(msg)) + assert msg._replies == ["admin action done"] + + def test_dispatch_prefix_match(self): + """Unambiguous prefix resolves to the full command.""" + bot = _make_bot() + bot.registry.register_command( + "echo", _echo_handler, plugin="test") + msg = _teams_msg(text="!ec hello") + asyncio.run(bot._dispatch_command(msg)) + assert msg._replies == ["hello"] + + +# --------------------------------------------------------------------------- +# TestTeamsBotNoOps +# --------------------------------------------------------------------------- + + +class TestTeamsBotNoOps: + def test_join_noop(self): + bot = _make_bot() + asyncio.run(bot.join("#channel")) + + def test_part_noop(self): + bot = _make_bot() + asyncio.run(bot.part("#channel", "reason")) + + def test_kick_noop(self): + bot = _make_bot() + asyncio.run(bot.kick("#channel", "nick", "reason")) + + def test_mode_noop(self): + bot = _make_bot() + asyncio.run(bot.mode("#channel", "+o", "nick")) + + def test_set_topic_noop(self): + bot = _make_bot() + asyncio.run(bot.set_topic("#channel", "new topic")) + + def test_quit_stops(self): + bot = _make_bot() + bot._running = True + asyncio.run(bot.quit()) + assert bot._running is False + + +# --------------------------------------------------------------------------- +# TestHTTPHandler +# --------------------------------------------------------------------------- + + +class TestHTTPHandler: + def _b64_secret(self): + return base64.b64encode(b"test-secret-key").decode() + + def test_valid_post_with_reply(self): + secret = self._b64_secret() + bot = _make_bot(secret=secret) + bot.registry.register_command( + "ping", _echo_handler, plugin="test") + activity = _activity(text="derp !ping") + body = json.dumps(activity).encode() + auth = _sign_teams(secret, body) + raw = _build_request("POST", "/api/messages", body, { + "Content-Length": str(len(body)), + "Content-Type": "application/json", + "Authorization": auth, + }) + reader = _FakeReader(raw) + writer = _FakeWriter() + asyncio.run(bot._handle_connection(reader, writer)) + assert b"200 OK" in writer.data + resp_body = writer.data.split(b"\r\n\r\n", 1)[1] + data = json.loads(resp_body) + assert data["type"] == "message" + + def test_get_405(self): + bot = _make_bot() + raw = _build_request("GET", "/api/messages", b"") + reader = _FakeReader(raw) + writer = _FakeWriter() + asyncio.run(bot._handle_connection(reader, writer)) + assert b"405" in writer.data + + def test_wrong_path_404(self): + bot = _make_bot() + raw = _build_request("POST", "/wrong/path", b"") + reader = _FakeReader(raw) + writer = _FakeWriter() + asyncio.run(bot._handle_connection(reader, writer)) + assert b"404" in writer.data + + def test_bad_signature_401(self): + secret = self._b64_secret() + bot = _make_bot(secret=secret) + body = json.dumps(_activity()).encode() + raw = _build_request("POST", "/api/messages", body, { + "Content-Length": str(len(body)), + "Authorization": "HMAC badsignature", + }) + reader = _FakeReader(raw) + writer = _FakeWriter() + asyncio.run(bot._handle_connection(reader, writer)) + assert b"401" in writer.data + + def test_bad_json_400(self): + bot = _make_bot() + body = b"not json at all" + raw = _build_request("POST", "/api/messages", body, { + "Content-Length": str(len(body)), + }) + reader = _FakeReader(raw) + writer = _FakeWriter() + asyncio.run(bot._handle_connection(reader, writer)) + assert b"400" in writer.data + assert b"invalid JSON" in writer.data + + def test_non_message_activity(self): + bot = _make_bot() + body = json.dumps({"type": "conversationUpdate"}).encode() + raw = _build_request("POST", "/api/messages", body, { + "Content-Length": str(len(body)), + }) + reader = _FakeReader(raw) + writer = _FakeWriter() + asyncio.run(bot._handle_connection(reader, writer)) + assert b"200 OK" in writer.data + resp_body = writer.data.split(b"\r\n\r\n", 1)[1] + data = json.loads(resp_body) + assert data["text"] == "" + + def test_body_too_large_413(self): + bot = _make_bot() + raw = _build_request("POST", "/api/messages", b"", { + "Content-Length": str(_MAX_BODY + 1), + }) + reader = _FakeReader(raw) + writer = _FakeWriter() + asyncio.run(bot._handle_connection(reader, writer)) + assert b"413" in writer.data + + def test_command_dispatch_full_cycle(self): + """Full request lifecycle: receive, dispatch, reply.""" + bot = _make_bot() + + async def _pong(b, m): + await b.reply(m, "pong") + + bot.registry.register_command("ping", _pong, plugin="test") + activity = _activity(text="derp !ping") + body = json.dumps(activity).encode() + raw = _build_request("POST", "/api/messages", body, { + "Content-Length": str(len(body)), + "Content-Type": "application/json", + }) + reader = _FakeReader(raw) + writer = _FakeWriter() + asyncio.run(bot._handle_connection(reader, writer)) + assert b"200 OK" in writer.data + resp_body = writer.data.split(b"\r\n\r\n", 1)[1] + data = json.loads(resp_body) + assert data["text"] == "pong" + + +# --------------------------------------------------------------------------- +# TestHttpResponse +# --------------------------------------------------------------------------- + + +class TestHttpResponse: + def test_plain_200(self): + resp = _http_response(200, "OK", "sent") + assert b"200 OK" in resp + assert b"sent" in resp + assert b"text/plain" in resp + + def test_json_response(self): + resp = _json_response(200, "OK", {"type": "message", "text": "hi"}) + assert b"200 OK" in resp + assert b"application/json" in resp + body = resp.split(b"\r\n\r\n", 1)[1] + data = json.loads(body) + assert data["text"] == "hi" + + def test_404_response(self): + resp = _http_response(404, "Not Found") + assert b"404 Not Found" in resp + assert b"Content-Length: 0" in resp + + +# --------------------------------------------------------------------------- +# TestTeamsBotPluginManagement +# --------------------------------------------------------------------------- + + +class TestTeamsBotPluginManagement: + def test_load_plugin_not_found(self): + bot = _make_bot() + ok, msg = bot.load_plugin("nonexistent_xyz") + assert ok is False + assert "not found" in msg + + def test_load_plugin_already_loaded(self): + bot = _make_bot() + bot.registry._modules["test"] = object() + ok, msg = bot.load_plugin("test") + assert ok is False + assert "already loaded" in msg + + def test_unload_core_refused(self): + bot = _make_bot() + ok, msg = bot.unload_plugin("core") + assert ok is False + assert "cannot unload core" in msg + + def test_unload_not_loaded(self): + bot = _make_bot() + ok, msg = bot.unload_plugin("nonexistent") + assert ok is False + assert "not loaded" in msg + + def test_reload_delegates(self): + bot = _make_bot() + ok, msg = bot.reload_plugin("nonexistent") + assert ok is False + assert "not loaded" in msg