feat: add Microsoft Teams support via outgoing webhooks

TeamsBot adapter exposes the same plugin API as IRC Bot so ~90% of
plugins work without modification.  Uses raw asyncio HTTP server
(no MS SDK dependency) with HMAC-SHA256 signature validation.

- TeamsMessage dataclass duck-typed with IRC Message
- Permission tiers via AAD object IDs (exact match)
- Reply buffer collected and returned as HTTP JSON response
- Incoming webhook support for proactive send()
- IRC-only methods (join/part/kick/mode) as no-ops
- 74 new tests (1302 total)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
user
2026-02-21 19:52:33 +01:00
parent c8879f6089
commit 014b609686
4 changed files with 1285 additions and 4 deletions

View File

@@ -56,7 +56,7 @@ def build_parser() -> argparse.ArgumentParser:
return p
def _run(bots: list[Bot]) -> None:
def _run(bots: list) -> None:
"""Run all bots concurrently with graceful SIGTERM handling."""
import signal
@@ -71,11 +71,12 @@ def _run(bots: list[Bot]) -> None:
logging.getLogger("derp").info("interrupted, shutting down")
def _shutdown(bots: list[Bot]) -> None:
def _shutdown(bots: list) -> None:
"""Signal handler: stop all bot loops so cProfile can flush."""
logging.getLogger("derp").info("SIGTERM received, shutting down")
for bot in bots:
bot._running = False
if hasattr(bot, "conn"):
asyncio.get_running_loop().create_task(bot.conn.close())
@@ -125,7 +126,7 @@ def main(argv: list[str] | None = None) -> int:
server_configs = build_server_configs(config)
registry = PluginRegistry()
bots: list[Bot] = []
bots: list = []
for name, srv_config in server_configs.items():
bot = Bot(name, srv_config, registry)
bots.append(bot)
@@ -133,6 +134,13 @@ def main(argv: list[str] | None = None) -> int:
# Load plugins once (shared registry)
bots[0].load_plugins()
# Teams adapter (optional)
if config.get("teams", {}).get("enabled"):
from derp.teams import TeamsBot
teams_bot = TeamsBot("teams", config, registry)
bots.append(teams_bot)
names = ", ".join(b.name for b in bots)
log.info("servers: %s", names)

View File

@@ -39,6 +39,17 @@ DEFAULTS: dict = {
"port": 8080,
"secret": "",
},
"teams": {
"enabled": False,
"bot_name": "derp",
"bind": "127.0.0.1",
"port": 8081,
"webhook_secret": "",
"incoming_webhook_url": "",
"admins": [],
"operators": [],
"trusted": [],
},
"logging": {
"level": "info",
"format": "text",

528
src/derp/teams.py Normal file
View File

@@ -0,0 +1,528 @@
"""Microsoft Teams adapter: outgoing webhook receiver + incoming webhook sender."""
from __future__ import annotations
import asyncio
import base64
import hashlib
import hmac
import json
import logging
import re
import time
import urllib.request
from dataclasses import dataclass, field
from pathlib import Path
from derp.bot import _TokenBucket
from derp.plugin import TIERS, PluginRegistry
from derp.state import StateStore
log = logging.getLogger(__name__)
_MAX_BODY = 65536 # 64 KB
_AMBIGUOUS = object() # sentinel for ambiguous prefix matches
@dataclass(slots=True)
class TeamsMessage:
"""Parsed Teams Activity message, duck-typed with IRC Message.
Plugins that use only ``msg.nick``, ``msg.text``, ``msg.target``,
``msg.is_channel``, ``msg.prefix``, ``msg.command``, ``msg.params``,
and ``msg.tags`` work without modification.
"""
raw: dict
nick: str | None
prefix: str | None # AAD object ID (for ACL matching)
text: str | None
target: str | None # conversation/channel ID
is_channel: bool = True # outgoing webhooks are always channels
command: str = "PRIVMSG" # compatibility shim
params: list[str] = field(default_factory=list)
tags: dict[str, str] = field(default_factory=dict)
_replies: list[str] = field(default_factory=list, repr=False)
# -- Helpers -----------------------------------------------------------------
def _verify_hmac(secret: str, body: bytes, auth_header: str) -> bool:
"""Verify Teams outgoing webhook HMAC-SHA256 signature.
The secret is base64-encoded. The Authorization header format is
``HMAC <base64(hmac-sha256(b64decode(secret), body))>``.
"""
if not secret:
return True # no secret configured = open access
if not auth_header.startswith("HMAC "):
return False
try:
key = base64.b64decode(secret)
except Exception:
log.error("teams: invalid base64 webhook secret")
return False
expected = base64.b64encode(
hmac.new(key, body, hashlib.sha256).digest(),
).decode("ascii")
return hmac.compare_digest(expected, auth_header[5:])
def _strip_mention(text: str, bot_name: str) -> str:
"""Strip ``<at>BotName</at>`` prefix from message text."""
return re.sub(r"<at>[^<]*</at>\s*", "", text).strip()
def _parse_activity(body: bytes) -> dict | None:
"""Parse Teams Activity JSON. Returns None on failure."""
try:
data = json.loads(body)
except (json.JSONDecodeError, UnicodeDecodeError):
return None
if not isinstance(data, dict):
return None
return data
def _build_teams_message(activity: dict, bot_name: str) -> TeamsMessage:
"""Build a TeamsMessage from a Teams Activity dict."""
sender = activity.get("from", {})
conv = activity.get("conversation", {})
nick = sender.get("name")
prefix = sender.get("aadObjectId")
raw_text = activity.get("text", "")
text = _strip_mention(raw_text, bot_name)
target = conv.get("id")
return TeamsMessage(
raw=activity,
nick=nick,
prefix=prefix,
text=text,
target=target,
params=[target or "", text] if target else [text],
)
def _http_response(status: int, reason: str, body: str = "",
content_type: str = "text/plain; charset=utf-8") -> bytes:
"""Build a minimal HTTP/1.1 response."""
body_bytes = body.encode("utf-8") if body else b""
lines = [
f"HTTP/1.1 {status} {reason}",
f"Content-Type: {content_type}",
f"Content-Length: {len(body_bytes)}",
"Connection: close",
"",
"",
]
return "\r\n".join(lines).encode("utf-8") + body_bytes
def _json_response(status: int, reason: str, data: dict) -> bytes:
"""Build an HTTP/1.1 JSON response."""
body = json.dumps(data)
return _http_response(status, reason, body, "application/json")
# -- TeamsBot ----------------------------------------------------------------
class TeamsBot:
"""Microsoft Teams bot adapter via outgoing/incoming webhooks.
Exposes the same public API as :class:`derp.bot.Bot` so that
protocol-agnostic plugins work without modification.
"""
def __init__(self, name: str, config: dict, registry: PluginRegistry) -> None:
self.name = name
self.config = config
self.registry = registry
self._pstate: dict = {}
teams_cfg = config.get("teams", {})
self.nick: str = teams_cfg.get("bot_name", "derp")
self.prefix: str = config.get("bot", {}).get("prefix", "!")
self._running = False
self._started: float = time.monotonic()
self._tasks: set[asyncio.Task] = set()
self._admins: list[str] = teams_cfg.get("admins", [])
self._operators: list[str] = teams_cfg.get("operators", [])
self._trusted: list[str] = teams_cfg.get("trusted", [])
self.state = StateStore(f"data/state-{name}.db")
self._server: asyncio.Server | None = None
self._webhook_secret: str = teams_cfg.get("webhook_secret", "")
self._incoming_url: str = teams_cfg.get("incoming_webhook_url", "")
self._bind: str = teams_cfg.get("bind", "127.0.0.1")
self._port: int = teams_cfg.get("port", 8081)
rate_cfg = config.get("bot", {})
self._bucket = _TokenBucket(
rate=rate_cfg.get("rate_limit", 2.0),
burst=rate_cfg.get("rate_burst", 5),
)
# -- Lifecycle -----------------------------------------------------------
async def start(self) -> None:
"""Start the HTTP server for receiving outgoing webhooks."""
self._running = True
try:
self._server = await asyncio.start_server(
self._handle_connection, self._bind, self._port,
)
except OSError as exc:
log.error("teams: failed to bind %s:%d: %s",
self._bind, self._port, exc)
return
log.info("teams: listening on %s:%d", self._bind, self._port)
try:
while self._running:
await asyncio.sleep(1)
finally:
self._server.close()
await self._server.wait_closed()
log.info("teams: stopped")
# -- HTTP server ---------------------------------------------------------
async def _handle_connection(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter,
) -> None:
"""Handle a single HTTP connection from Teams."""
try:
# Read request line
request_line = await asyncio.wait_for(
reader.readline(), timeout=10.0)
if not request_line:
return
parts = request_line.decode("utf-8", errors="replace").strip().split()
if len(parts) < 2:
writer.write(_http_response(400, "Bad Request",
"malformed request"))
return
method, path = parts[0], parts[1]
# Read headers
headers: dict[str, str] = {}
while True:
line = await asyncio.wait_for(reader.readline(), timeout=10.0)
if not line or line == b"\r\n" or line == b"\n":
break
decoded = line.decode("utf-8", errors="replace").strip()
if ":" in decoded:
key, val = decoded.split(":", 1)
headers[key.strip().lower()] = val.strip()
# Method check
if method != "POST":
writer.write(_http_response(405, "Method Not Allowed",
"POST only"))
return
# Path check
if path != "/api/messages":
writer.write(_http_response(404, "Not Found"))
return
# Read body
content_length = int(headers.get("content-length", "0"))
if content_length > _MAX_BODY:
writer.write(_http_response(413, "Payload Too Large",
f"max {_MAX_BODY} bytes"))
return
body = await asyncio.wait_for(
reader.readexactly(content_length), timeout=10.0)
# Verify HMAC signature
auth = headers.get("authorization", "")
if not _verify_hmac(self._webhook_secret, body, auth):
writer.write(_http_response(401, "Unauthorized",
"bad signature"))
return
# Parse Activity JSON
activity = _parse_activity(body)
if activity is None:
writer.write(_http_response(400, "Bad Request",
"invalid JSON"))
return
# Only handle message activities
if activity.get("type") != "message":
writer.write(_json_response(200, "OK",
{"type": "message", "text": ""}))
return
# Build message and dispatch
msg = _build_teams_message(activity, self.nick)
await self._dispatch_command(msg)
# Collect replies
reply_text = "\n".join(msg._replies) if msg._replies else ""
writer.write(_json_response(200, "OK", {
"type": "message",
"text": reply_text,
}))
except (asyncio.TimeoutError, asyncio.IncompleteReadError,
ConnectionError):
log.debug("teams: client disconnected")
except Exception:
log.exception("teams: error handling request")
try:
writer.write(_http_response(500, "Internal Server Error"))
except Exception:
pass
finally:
try:
writer.close()
await writer.wait_closed()
except Exception:
pass
# -- Command dispatch ----------------------------------------------------
async def _dispatch_command(self, msg: TeamsMessage) -> None:
"""Parse and dispatch a command from a Teams message."""
text = msg.text
if not text or not text.startswith(self.prefix):
return
parts = text[len(self.prefix):].split(None, 1)
cmd_name = parts[0].lower() if parts else ""
handler = self._resolve_command(cmd_name)
if handler is None:
return
if handler is _AMBIGUOUS:
matches = [k for k in self.registry.commands
if k.startswith(cmd_name)]
names = ", ".join(self.prefix + m for m in sorted(matches))
msg._replies.append(
f"Ambiguous command '{self.prefix}{cmd_name}': {names}")
return
if not self._plugin_allowed(handler.plugin, msg.target):
return
required = handler.tier
if required != "user":
sender = self._get_tier(msg)
if TIERS.index(sender) < TIERS.index(required):
msg._replies.append(
f"Permission denied: {self.prefix}{cmd_name} "
f"requires {required}")
return
try:
await handler.callback(self, msg)
except Exception:
log.exception("teams: error in command handler '%s'", cmd_name)
def _resolve_command(self, name: str):
"""Resolve command name with unambiguous prefix matching.
Returns the Handler on exact or unique prefix match, the sentinel
``_AMBIGUOUS`` if multiple commands match, or None if nothing matches.
"""
handler = self.registry.commands.get(name)
if handler is not None:
return handler
matches = [v for k, v in self.registry.commands.items()
if k.startswith(name)]
if len(matches) == 1:
return matches[0]
if len(matches) > 1:
return _AMBIGUOUS
return None
def _plugin_allowed(self, plugin_name: str, channel: str | None) -> bool:
"""Channel filtering is IRC-only; all plugins are allowed on Teams."""
return True
# -- Permission tiers ----------------------------------------------------
def _get_tier(self, msg: TeamsMessage) -> str:
"""Determine permission tier from AAD object ID.
Unlike IRC (fnmatch hostmask patterns), Teams matches exact
AAD object IDs from the ``teams.admins``, ``teams.operators``,
and ``teams.trusted`` config lists.
"""
if not msg.prefix:
return "user"
for aad_id in self._admins:
if msg.prefix == aad_id:
return "admin"
for aad_id in self._operators:
if msg.prefix == aad_id:
return "oper"
for aad_id in self._trusted:
if msg.prefix == aad_id:
return "trusted"
return "user"
def _is_admin(self, msg: TeamsMessage) -> bool:
"""Check if the message sender is a bot admin."""
return self._get_tier(msg) == "admin"
# -- Public API for plugins ----------------------------------------------
async def send(self, target: str, text: str) -> None:
"""Send a message via incoming webhook (proactive messages).
Requires ``teams.incoming_webhook_url`` to be configured.
Does nothing if no URL is set.
"""
if not self._incoming_url:
log.debug("teams: send() skipped, no incoming_webhook_url")
return
await self._bucket.acquire()
payload = json.dumps({"text": text}).encode("utf-8")
req = urllib.request.Request(
self._incoming_url,
data=payload,
headers={"Content-Type": "application/json"},
method="POST",
)
loop = asyncio.get_running_loop()
try:
await loop.run_in_executor(None, urllib.request.urlopen, req)
except Exception:
log.exception("teams: failed to send via incoming webhook")
async def reply(self, msg, text: str) -> None:
"""Reply by appending to the message reply buffer.
Collected replies are returned as the HTTP response body.
"""
msg._replies.append(text)
async def long_reply(
self, msg, lines: list[str], *,
label: str = "",
) -> None:
"""Reply with a list of lines; paste overflow to FlaskPaste.
Same overflow logic as :meth:`derp.bot.Bot.long_reply` but
appends to the reply buffer instead of sending via IRC.
"""
threshold = self.config.get("bot", {}).get("paste_threshold", 4)
if not lines:
return
if len(lines) <= threshold:
for line in lines:
msg._replies.append(line)
return
# Attempt paste overflow
fp = self.registry._modules.get("flaskpaste")
paste_url = None
if fp:
full_text = "\n".join(lines)
loop = asyncio.get_running_loop()
paste_url = await loop.run_in_executor(
None, fp.create_paste, self, full_text,
)
if paste_url:
preview_count = min(2, threshold - 1)
for line in lines[:preview_count]:
msg._replies.append(line)
remaining = len(lines) - preview_count
suffix = f" ({label})" if label else ""
msg._replies.append(
f"... {remaining} more lines{suffix}: {paste_url}")
else:
for line in lines:
msg._replies.append(line)
async def action(self, target: str, text: str) -> None:
"""Send an action as italic text via incoming webhook."""
await self.send(target, f"_{text}_")
async def shorten_url(self, url: str) -> str:
"""Shorten a URL via FlaskPaste. Returns original on failure."""
fp = self.registry._modules.get("flaskpaste")
if not fp:
return url
loop = asyncio.get_running_loop()
try:
return await loop.run_in_executor(None, fp.shorten_url, self, url)
except Exception:
return url
# -- IRC no-ops ----------------------------------------------------------
async def join(self, channel: str) -> None:
"""No-op: IRC-only concept."""
log.debug("teams: join() is a no-op")
async def part(self, channel: str, reason: str = "") -> None:
"""No-op: IRC-only concept."""
log.debug("teams: part() is a no-op")
async def quit(self, reason: str = "bye") -> None:
"""Stop the Teams adapter."""
self._running = False
async def kick(self, channel: str, nick: str, reason: str = "") -> None:
"""No-op: IRC-only concept."""
log.debug("teams: kick() is a no-op")
async def mode(self, target: str, mode_str: str, *args: str) -> None:
"""No-op: IRC-only concept."""
log.debug("teams: mode() is a no-op")
async def set_topic(self, channel: str, topic: str) -> None:
"""No-op: IRC-only concept."""
log.debug("teams: set_topic() is a no-op")
# -- Plugin management (delegated to registry) ---------------------------
def load_plugins(self, plugins_dir: str | Path | None = None) -> None:
"""Load plugins from the configured directory."""
if plugins_dir is None:
plugins_dir = self.config.get("bot", {}).get(
"plugins_dir", "plugins")
path = Path(plugins_dir)
self.registry.load_directory(path)
@property
def plugins_dir(self) -> Path:
"""Resolved path to the plugins directory."""
return Path(self.config.get("bot", {}).get("plugins_dir", "plugins"))
def load_plugin(self, name: str) -> tuple[bool, str]:
"""Hot-load a new plugin by name from the plugins directory."""
if name in self.registry._modules:
return False, f"plugin already loaded: {name}"
path = self.plugins_dir / f"{name}.py"
if not path.is_file():
return False, f"{name}.py not found"
count = self.registry.load_plugin(path)
if count < 0:
return False, f"failed to load {name}"
return True, f"{count} handlers"
def reload_plugin(self, name: str) -> tuple[bool, str]:
"""Reload a plugin, picking up any file changes."""
return self.registry.reload_plugin(name)
def unload_plugin(self, name: str) -> tuple[bool, str]:
"""Unload a plugin, removing all its handlers."""
if self.registry.unload_plugin(name):
return True, ""
if name == "core":
return False, "cannot unload core"
return False, f"plugin not loaded: {name}"
def _spawn(self, coro, *, name: str | None = None) -> asyncio.Task:
"""Spawn a background task and track it for cleanup."""
task = asyncio.create_task(coro, name=name)
self._tasks.add(task)
task.add_done_callback(self._tasks.discard)
return task

734
tests/test_teams.py Normal file
View File

@@ -0,0 +1,734 @@
"""Tests for the Microsoft Teams adapter."""
import asyncio
import base64
import hashlib
import hmac
import json
from derp.plugin import PluginRegistry
from derp.teams import (
_MAX_BODY,
TeamsBot,
TeamsMessage,
_build_teams_message,
_http_response,
_json_response,
_parse_activity,
_strip_mention,
_verify_hmac,
)
# -- Helpers -----------------------------------------------------------------
def _make_bot(secret="", admins=None, operators=None, trusted=None,
incoming_url=""):
"""Create a TeamsBot with test config."""
config = {
"teams": {
"enabled": True,
"bot_name": "derp",
"bind": "127.0.0.1",
"port": 0,
"webhook_secret": secret,
"incoming_webhook_url": incoming_url,
"admins": admins or [],
"operators": operators or [],
"trusted": trusted or [],
},
"bot": {
"prefix": "!",
"paste_threshold": 4,
"plugins_dir": "plugins",
"rate_limit": 2.0,
"rate_burst": 5,
},
}
registry = PluginRegistry()
return TeamsBot("teams-test", config, registry)
def _activity(text="hello", nick="Alice", aad_id="aad-123",
conv_id="conv-456", msg_type="message"):
"""Build a minimal Teams Activity dict."""
return {
"type": msg_type,
"from": {"name": nick, "aadObjectId": aad_id},
"conversation": {"id": conv_id},
"text": text,
}
def _teams_msg(text="!ping", nick="Alice", aad_id="aad-123",
target="conv-456"):
"""Create a TeamsMessage for command testing."""
return TeamsMessage(
raw={}, nick=nick, prefix=aad_id, text=text, target=target,
params=[target, text],
)
def _sign_teams(secret: str, body: bytes) -> str:
"""Generate Teams HMAC-SHA256 Authorization header value."""
key = base64.b64decode(secret)
sig = base64.b64encode(
hmac.new(key, body, hashlib.sha256).digest(),
).decode("ascii")
return f"HMAC {sig}"
class _FakeReader:
"""Mock asyncio.StreamReader from raw HTTP bytes."""
def __init__(self, data: bytes) -> None:
self._data = data
self._pos = 0
async def readline(self) -> bytes:
start = self._pos
idx = self._data.find(b"\n", start)
if idx == -1:
self._pos = len(self._data)
return self._data[start:]
self._pos = idx + 1
return self._data[start:self._pos]
async def readexactly(self, n: int) -> bytes:
chunk = self._data[self._pos:self._pos + n]
self._pos += n
return chunk
class _FakeWriter:
"""Mock asyncio.StreamWriter that captures output."""
def __init__(self) -> None:
self.data = b""
self._closed = False
def write(self, data: bytes) -> None:
self.data += data
def close(self) -> None:
self._closed = True
async def wait_closed(self) -> None:
pass
def _build_request(method: str, path: str, body: bytes,
headers: dict[str, str] | None = None) -> bytes:
"""Build raw HTTP request bytes."""
hdrs = headers or {}
if "Content-Length" not in hdrs:
hdrs["Content-Length"] = str(len(body))
lines = [f"{method} {path} HTTP/1.1"]
for k, v in hdrs.items():
lines.append(f"{k}: {v}")
lines.append("")
lines.append("")
return "\r\n".join(lines).encode("utf-8") + body
# -- Test helpers for registering commands -----------------------------------
async def _echo_handler(bot, msg):
"""Simple command handler that echoes text."""
args = msg.text.split(None, 1)
reply = args[1] if len(args) > 1 else "no args"
await bot.reply(msg, reply)
async def _admin_handler(bot, msg):
"""Admin-only command handler."""
await bot.reply(msg, "admin action done")
# ---------------------------------------------------------------------------
# TestTeamsMessage
# ---------------------------------------------------------------------------
class TestTeamsMessage:
def test_defaults(self):
msg = TeamsMessage(raw={}, nick=None, prefix=None, text=None,
target=None)
assert msg.is_channel is True
assert msg.command == "PRIVMSG"
assert msg.params == []
assert msg.tags == {}
assert msg._replies == []
def test_custom_values(self):
msg = TeamsMessage(
raw={"type": "message"}, nick="Alice", prefix="aad-123",
text="hello", target="conv-456", is_channel=True,
command="PRIVMSG", params=["conv-456", "hello"],
tags={"key": "val"},
)
assert msg.nick == "Alice"
assert msg.prefix == "aad-123"
assert msg.text == "hello"
assert msg.target == "conv-456"
assert msg.tags == {"key": "val"}
def test_duck_type_compat(self):
"""TeamsMessage has the same attribute names as IRC Message."""
msg = _teams_msg()
attrs = ["raw", "nick", "prefix", "text", "target",
"is_channel", "command", "params", "tags"]
for attr in attrs:
assert hasattr(msg, attr), f"missing attribute: {attr}"
def test_replies_buffer(self):
msg = _teams_msg()
assert msg._replies == []
msg._replies.append("pong")
msg._replies.append("line2")
assert len(msg._replies) == 2
def test_raw_dict(self):
activity = {"type": "message", "id": "123"}
msg = TeamsMessage(raw=activity, nick=None, prefix=None,
text=None, target=None)
assert msg.raw is activity
def test_prefix_is_aad_id(self):
msg = _teams_msg(aad_id="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx")
assert msg.prefix == "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
# ---------------------------------------------------------------------------
# TestVerifyHmac
# ---------------------------------------------------------------------------
class TestVerifyHmac:
def test_valid_signature(self):
# base64-encoded secret
secret = base64.b64encode(b"test-secret").decode()
body = b'{"type":"message","text":"hello"}'
auth = _sign_teams(secret, body)
assert _verify_hmac(secret, body, auth) is True
def test_invalid_signature(self):
secret = base64.b64encode(b"test-secret").decode()
body = b'{"type":"message","text":"hello"}'
assert _verify_hmac(secret, body, "HMAC badsignature") is False
def test_missing_hmac_prefix(self):
secret = base64.b64encode(b"test-secret").decode()
body = b'{"text":"hello"}'
# No "HMAC " prefix
key = base64.b64decode(secret)
sig = base64.b64encode(
hmac.new(key, body, hashlib.sha256).digest()
).decode()
assert _verify_hmac(secret, body, sig) is False
def test_empty_secret_allows_all(self):
assert _verify_hmac("", b"any body", "") is True
assert _verify_hmac("", b"any body", "HMAC whatever") is True
def test_invalid_base64_secret(self):
assert _verify_hmac("not-valid-b64!!!", b"body", "HMAC x") is False
# ---------------------------------------------------------------------------
# TestStripMention
# ---------------------------------------------------------------------------
class TestStripMention:
def test_strip_at_mention(self):
assert _strip_mention("<at>derp</at> !help", "derp") == "!help"
def test_strip_with_extra_spaces(self):
assert _strip_mention("<at>derp</at> !ping", "derp") == "!ping"
def test_no_mention(self):
assert _strip_mention("!help", "derp") == "!help"
def test_multiple_mentions(self):
text = "<at>derp</at> hello <at>other</at> world"
assert _strip_mention(text, "derp") == "hello world"
def test_empty_text(self):
assert _strip_mention("", "derp") == ""
def test_mention_only(self):
assert _strip_mention("<at>derp</at>", "derp") == ""
# ---------------------------------------------------------------------------
# TestParseActivity
# ---------------------------------------------------------------------------
class TestParseActivity:
def test_valid_activity(self):
body = json.dumps({"type": "message", "text": "hello"}).encode()
result = _parse_activity(body)
assert result == {"type": "message", "text": "hello"}
def test_invalid_json(self):
assert _parse_activity(b"not json") is None
def test_not_a_dict(self):
assert _parse_activity(b'["array"]') is None
def test_empty_body(self):
assert _parse_activity(b"") is None
def test_unicode_error(self):
assert _parse_activity(b"\xff\xfe") is None
# ---------------------------------------------------------------------------
# TestBuildTeamsMessage
# ---------------------------------------------------------------------------
class TestBuildTeamsMessage:
def test_basic_message(self):
activity = _activity(text="<at>derp</at> !ping")
msg = _build_teams_message(activity, "derp")
assert msg.nick == "Alice"
assert msg.prefix == "aad-123"
assert msg.text == "!ping"
assert msg.target == "conv-456"
assert msg.is_channel is True
assert msg.command == "PRIVMSG"
def test_strips_mention(self):
activity = _activity(text="<at>Bot</at> !help commands")
msg = _build_teams_message(activity, "Bot")
assert msg.text == "!help commands"
def test_missing_from(self):
activity = {"type": "message", "text": "hello",
"conversation": {"id": "conv"}}
msg = _build_teams_message(activity, "derp")
assert msg.nick is None
assert msg.prefix is None
def test_missing_conversation(self):
activity = {"type": "message", "text": "hello",
"from": {"name": "Alice", "aadObjectId": "aad"}}
msg = _build_teams_message(activity, "derp")
assert msg.target is None
def test_raw_preserved(self):
activity = _activity()
msg = _build_teams_message(activity, "derp")
assert msg.raw is activity
def test_params_populated(self):
activity = _activity(text="<at>derp</at> !test arg")
msg = _build_teams_message(activity, "derp")
assert msg.params[0] == "conv-456"
assert msg.params[1] == "!test arg"
# ---------------------------------------------------------------------------
# TestTeamsBotReply
# ---------------------------------------------------------------------------
class TestTeamsBotReply:
def test_reply_appends(self):
bot = _make_bot()
msg = _teams_msg()
asyncio.run(bot.reply(msg, "pong"))
assert msg._replies == ["pong"]
def test_multi_reply(self):
bot = _make_bot()
msg = _teams_msg()
async def _run():
await bot.reply(msg, "line 1")
await bot.reply(msg, "line 2")
await bot.reply(msg, "line 3")
asyncio.run(_run())
assert msg._replies == ["line 1", "line 2", "line 3"]
def test_long_reply_under_threshold(self):
bot = _make_bot()
msg = _teams_msg()
lines = ["a", "b", "c"]
asyncio.run(bot.long_reply(msg, lines))
assert msg._replies == ["a", "b", "c"]
def test_long_reply_over_threshold_no_paste(self):
"""Over threshold with no FlaskPaste sends all lines."""
bot = _make_bot()
msg = _teams_msg()
lines = ["a", "b", "c", "d", "e", "f"] # 6 > threshold of 4
asyncio.run(bot.long_reply(msg, lines))
assert msg._replies == lines
def test_long_reply_empty(self):
bot = _make_bot()
msg = _teams_msg()
asyncio.run(bot.long_reply(msg, []))
assert msg._replies == []
def test_action_format(self):
"""action() maps to italic text via send()."""
bot = _make_bot(incoming_url="http://example.com/hook")
# action sends to incoming webhook; without actual URL it logs debug
bot._incoming_url = ""
asyncio.run(bot.action("conv", "does a thing"))
# No incoming URL, so send() is a no-op (debug log)
def test_send_no_incoming_url(self):
"""send() is a no-op when no incoming_webhook_url is configured."""
bot = _make_bot()
# Should not raise
asyncio.run(bot.send("target", "text"))
# ---------------------------------------------------------------------------
# TestTeamsBotTier
# ---------------------------------------------------------------------------
class TestTeamsBotTier:
def test_admin_tier(self):
bot = _make_bot(admins=["aad-admin"])
msg = _teams_msg(aad_id="aad-admin")
assert bot._get_tier(msg) == "admin"
def test_oper_tier(self):
bot = _make_bot(operators=["aad-oper"])
msg = _teams_msg(aad_id="aad-oper")
assert bot._get_tier(msg) == "oper"
def test_trusted_tier(self):
bot = _make_bot(trusted=["aad-trusted"])
msg = _teams_msg(aad_id="aad-trusted")
assert bot._get_tier(msg) == "trusted"
def test_user_tier_default(self):
bot = _make_bot()
msg = _teams_msg(aad_id="aad-unknown")
assert bot._get_tier(msg) == "user"
def test_no_prefix(self):
bot = _make_bot(admins=["aad-admin"])
msg = _teams_msg(aad_id=None)
msg.prefix = None
assert bot._get_tier(msg) == "user"
def test_is_admin_true(self):
bot = _make_bot(admins=["aad-admin"])
msg = _teams_msg(aad_id="aad-admin")
assert bot._is_admin(msg) is True
def test_is_admin_false(self):
bot = _make_bot()
msg = _teams_msg(aad_id="aad-nobody")
assert bot._is_admin(msg) is False
def test_priority_order(self):
"""Admin takes priority over oper and trusted."""
bot = _make_bot(admins=["aad-x"], operators=["aad-x"],
trusted=["aad-x"])
msg = _teams_msg(aad_id="aad-x")
assert bot._get_tier(msg) == "admin"
# ---------------------------------------------------------------------------
# TestTeamsBotDispatch
# ---------------------------------------------------------------------------
class TestTeamsBotDispatch:
def test_dispatch_known_command(self):
bot = _make_bot()
bot.registry.register_command(
"echo", _echo_handler, help="echo", plugin="test")
msg = _teams_msg(text="!echo world")
asyncio.run(bot._dispatch_command(msg))
assert msg._replies == ["world"]
def test_dispatch_unknown_command(self):
bot = _make_bot()
msg = _teams_msg(text="!nonexistent")
asyncio.run(bot._dispatch_command(msg))
assert msg._replies == []
def test_dispatch_no_prefix(self):
bot = _make_bot()
msg = _teams_msg(text="just a message")
asyncio.run(bot._dispatch_command(msg))
assert msg._replies == []
def test_dispatch_empty_text(self):
bot = _make_bot()
msg = _teams_msg(text="")
asyncio.run(bot._dispatch_command(msg))
assert msg._replies == []
def test_dispatch_none_text(self):
bot = _make_bot()
msg = _teams_msg(text=None)
msg.text = None
asyncio.run(bot._dispatch_command(msg))
assert msg._replies == []
def test_dispatch_ambiguous(self):
bot = _make_bot()
bot.registry.register_command(
"ping", _echo_handler, plugin="test")
bot.registry.register_command(
"plugins", _echo_handler, plugin="test")
msg = _teams_msg(text="!p")
asyncio.run(bot._dispatch_command(msg))
assert len(msg._replies) == 1
assert "Ambiguous" in msg._replies[0]
def test_dispatch_tier_denied(self):
bot = _make_bot()
bot.registry.register_command(
"secret", _admin_handler, plugin="test", tier="admin")
msg = _teams_msg(text="!secret", aad_id="aad-nobody")
asyncio.run(bot._dispatch_command(msg))
assert len(msg._replies) == 1
assert "Permission denied" in msg._replies[0]
def test_dispatch_tier_allowed(self):
bot = _make_bot(admins=["aad-admin"])
bot.registry.register_command(
"secret", _admin_handler, plugin="test", tier="admin")
msg = _teams_msg(text="!secret", aad_id="aad-admin")
asyncio.run(bot._dispatch_command(msg))
assert msg._replies == ["admin action done"]
def test_dispatch_prefix_match(self):
"""Unambiguous prefix resolves to the full command."""
bot = _make_bot()
bot.registry.register_command(
"echo", _echo_handler, plugin="test")
msg = _teams_msg(text="!ec hello")
asyncio.run(bot._dispatch_command(msg))
assert msg._replies == ["hello"]
# ---------------------------------------------------------------------------
# TestTeamsBotNoOps
# ---------------------------------------------------------------------------
class TestTeamsBotNoOps:
def test_join_noop(self):
bot = _make_bot()
asyncio.run(bot.join("#channel"))
def test_part_noop(self):
bot = _make_bot()
asyncio.run(bot.part("#channel", "reason"))
def test_kick_noop(self):
bot = _make_bot()
asyncio.run(bot.kick("#channel", "nick", "reason"))
def test_mode_noop(self):
bot = _make_bot()
asyncio.run(bot.mode("#channel", "+o", "nick"))
def test_set_topic_noop(self):
bot = _make_bot()
asyncio.run(bot.set_topic("#channel", "new topic"))
def test_quit_stops(self):
bot = _make_bot()
bot._running = True
asyncio.run(bot.quit())
assert bot._running is False
# ---------------------------------------------------------------------------
# TestHTTPHandler
# ---------------------------------------------------------------------------
class TestHTTPHandler:
def _b64_secret(self):
return base64.b64encode(b"test-secret-key").decode()
def test_valid_post_with_reply(self):
secret = self._b64_secret()
bot = _make_bot(secret=secret)
bot.registry.register_command(
"ping", _echo_handler, plugin="test")
activity = _activity(text="<at>derp</at> !ping")
body = json.dumps(activity).encode()
auth = _sign_teams(secret, body)
raw = _build_request("POST", "/api/messages", body, {
"Content-Length": str(len(body)),
"Content-Type": "application/json",
"Authorization": auth,
})
reader = _FakeReader(raw)
writer = _FakeWriter()
asyncio.run(bot._handle_connection(reader, writer))
assert b"200 OK" in writer.data
resp_body = writer.data.split(b"\r\n\r\n", 1)[1]
data = json.loads(resp_body)
assert data["type"] == "message"
def test_get_405(self):
bot = _make_bot()
raw = _build_request("GET", "/api/messages", b"")
reader = _FakeReader(raw)
writer = _FakeWriter()
asyncio.run(bot._handle_connection(reader, writer))
assert b"405" in writer.data
def test_wrong_path_404(self):
bot = _make_bot()
raw = _build_request("POST", "/wrong/path", b"")
reader = _FakeReader(raw)
writer = _FakeWriter()
asyncio.run(bot._handle_connection(reader, writer))
assert b"404" in writer.data
def test_bad_signature_401(self):
secret = self._b64_secret()
bot = _make_bot(secret=secret)
body = json.dumps(_activity()).encode()
raw = _build_request("POST", "/api/messages", body, {
"Content-Length": str(len(body)),
"Authorization": "HMAC badsignature",
})
reader = _FakeReader(raw)
writer = _FakeWriter()
asyncio.run(bot._handle_connection(reader, writer))
assert b"401" in writer.data
def test_bad_json_400(self):
bot = _make_bot()
body = b"not json at all"
raw = _build_request("POST", "/api/messages", body, {
"Content-Length": str(len(body)),
})
reader = _FakeReader(raw)
writer = _FakeWriter()
asyncio.run(bot._handle_connection(reader, writer))
assert b"400" in writer.data
assert b"invalid JSON" in writer.data
def test_non_message_activity(self):
bot = _make_bot()
body = json.dumps({"type": "conversationUpdate"}).encode()
raw = _build_request("POST", "/api/messages", body, {
"Content-Length": str(len(body)),
})
reader = _FakeReader(raw)
writer = _FakeWriter()
asyncio.run(bot._handle_connection(reader, writer))
assert b"200 OK" in writer.data
resp_body = writer.data.split(b"\r\n\r\n", 1)[1]
data = json.loads(resp_body)
assert data["text"] == ""
def test_body_too_large_413(self):
bot = _make_bot()
raw = _build_request("POST", "/api/messages", b"", {
"Content-Length": str(_MAX_BODY + 1),
})
reader = _FakeReader(raw)
writer = _FakeWriter()
asyncio.run(bot._handle_connection(reader, writer))
assert b"413" in writer.data
def test_command_dispatch_full_cycle(self):
"""Full request lifecycle: receive, dispatch, reply."""
bot = _make_bot()
async def _pong(b, m):
await b.reply(m, "pong")
bot.registry.register_command("ping", _pong, plugin="test")
activity = _activity(text="<at>derp</at> !ping")
body = json.dumps(activity).encode()
raw = _build_request("POST", "/api/messages", body, {
"Content-Length": str(len(body)),
"Content-Type": "application/json",
})
reader = _FakeReader(raw)
writer = _FakeWriter()
asyncio.run(bot._handle_connection(reader, writer))
assert b"200 OK" in writer.data
resp_body = writer.data.split(b"\r\n\r\n", 1)[1]
data = json.loads(resp_body)
assert data["text"] == "pong"
# ---------------------------------------------------------------------------
# TestHttpResponse
# ---------------------------------------------------------------------------
class TestHttpResponse:
def test_plain_200(self):
resp = _http_response(200, "OK", "sent")
assert b"200 OK" in resp
assert b"sent" in resp
assert b"text/plain" in resp
def test_json_response(self):
resp = _json_response(200, "OK", {"type": "message", "text": "hi"})
assert b"200 OK" in resp
assert b"application/json" in resp
body = resp.split(b"\r\n\r\n", 1)[1]
data = json.loads(body)
assert data["text"] == "hi"
def test_404_response(self):
resp = _http_response(404, "Not Found")
assert b"404 Not Found" in resp
assert b"Content-Length: 0" in resp
# ---------------------------------------------------------------------------
# TestTeamsBotPluginManagement
# ---------------------------------------------------------------------------
class TestTeamsBotPluginManagement:
def test_load_plugin_not_found(self):
bot = _make_bot()
ok, msg = bot.load_plugin("nonexistent_xyz")
assert ok is False
assert "not found" in msg
def test_load_plugin_already_loaded(self):
bot = _make_bot()
bot.registry._modules["test"] = object()
ok, msg = bot.load_plugin("test")
assert ok is False
assert "already loaded" in msg
def test_unload_core_refused(self):
bot = _make_bot()
ok, msg = bot.unload_plugin("core")
assert ok is False
assert "cannot unload core" in msg
def test_unload_not_loaded(self):
bot = _make_bot()
ok, msg = bot.unload_plugin("nonexistent")
assert ok is False
assert "not loaded" in msg
def test_reload_delegates(self):
bot = _make_bot()
ok, msg = bot.reload_plugin("nonexistent")
assert ok is False
assert "not loaded" in msg