Files
derp/tests/test_webhook.py
user c483beb555 feat: add webhook listener for push events to channels
HTTP POST endpoint for external services (CI, monitoring, GitHub).
HMAC-SHA256 auth, JSON body, single POST endpoint at /.

- asyncio.start_server with raw HTTP parsing (zero deps)
- Body validation: channel prefix, non-empty text, 64KB cap
- !webhook admin command shows address, request count, uptime
- Module-level server guard prevents duplicates on reconnect
- 22 test cases in test_webhook.py
2026-02-21 17:59:14 +01:00

396 lines
13 KiB
Python

"""Tests for the webhook listener plugin."""
import asyncio
import hashlib
import hmac
import importlib.util
import json
import sys
import time
from pathlib import Path
from derp.irc import Message
# plugins/ is not a Python package -- load the module from file path
_spec = importlib.util.spec_from_file_location(
"plugins.webhook", Path(__file__).resolve().parent.parent / "plugins" / "webhook.py",
)
_mod = importlib.util.module_from_spec(_spec)
sys.modules[_spec.name] = _mod
_spec.loader.exec_module(_mod)
from plugins.webhook import ( # noqa: E402
_MAX_BODY,
_handle_request,
_http_response,
_verify_signature,
cmd_webhook,
on_connect,
)
# -- Helpers -----------------------------------------------------------------
class _FakeState:
"""In-memory stand-in for bot.state."""
def __init__(self):
self._store: dict[str, dict[str, str]] = {}
def get(self, plugin: str, key: str, default: str | None = None) -> str | None:
return self._store.get(plugin, {}).get(key, default)
def set(self, plugin: str, key: str, value: str) -> None:
self._store.setdefault(plugin, {})[key] = value
def delete(self, plugin: str, key: str) -> bool:
try:
del self._store[plugin][key]
return True
except KeyError:
return False
def keys(self, plugin: str) -> list[str]:
return sorted(self._store.get(plugin, {}).keys())
class _FakeBot:
"""Minimal bot stand-in that captures sent/action messages."""
def __init__(self, *, admin: bool = True, webhook_cfg: dict | None = None):
self.sent: list[tuple[str, str]] = []
self.replied: list[str] = []
self.actions: list[tuple[str, str]] = []
self.state = _FakeState()
self._admin = admin
self.prefix = "!"
self.config = {
"webhook": webhook_cfg or {"enabled": False},
}
async def send(self, target: str, text: str) -> None:
self.sent.append((target, text))
async def reply(self, message, text: str) -> None:
self.replied.append(text)
async def action(self, target: str, text: str) -> None:
self.actions.append((target, text))
def _is_admin(self, message) -> bool:
return self._admin
def _msg(text: str, nick: str = "admin", target: str = "#test") -> Message:
return Message(
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
command="PRIVMSG", params=[target, text], tags={},
)
def _sign(secret: str, body: bytes) -> str:
"""Generate HMAC-SHA256 signature."""
sig = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
return f"sha256={sig}"
class _FakeReader:
"""Mock asyncio.StreamReader from raw HTTP bytes."""
def __init__(self, data: bytes) -> None:
self._data = data
self._pos = 0
async def readline(self) -> bytes:
start = self._pos
idx = self._data.find(b"\n", start)
if idx == -1:
self._pos = len(self._data)
return self._data[start:]
self._pos = idx + 1
return self._data[start:self._pos]
async def readexactly(self, n: int) -> bytes:
chunk = self._data[self._pos:self._pos + n]
self._pos += n
return chunk
class _FakeWriter:
"""Mock asyncio.StreamWriter that captures output."""
def __init__(self) -> None:
self.data = b""
self._closed = False
def write(self, data: bytes) -> None:
self.data += data
def close(self) -> None:
self._closed = True
async def wait_closed(self) -> None:
pass
def _build_request(method: str, body: bytes, headers: dict[str, str] | None = None) -> bytes:
"""Build raw HTTP request bytes."""
hdrs = headers or {}
if "Content-Length" not in hdrs:
hdrs["Content-Length"] = str(len(body))
lines = [f"{method} / HTTP/1.1"]
for k, v in hdrs.items():
lines.append(f"{k}: {v}")
lines.append("")
lines.append("")
return "\r\n".join(lines).encode("utf-8") + body
# ---------------------------------------------------------------------------
# TestVerifySignature
# ---------------------------------------------------------------------------
class TestVerifySignature:
def test_valid_signature(self):
body = b'{"channel":"#test","text":"hello"}'
sig = _sign("secret", body)
assert _verify_signature("secret", body, sig) is True
def test_invalid_signature(self):
body = b'{"channel":"#test","text":"hello"}'
assert _verify_signature("secret", body, "sha256=bad") is False
def test_empty_secret_allows_all(self):
body = b'{"channel":"#test","text":"hello"}'
assert _verify_signature("", body, "") is True
def test_missing_prefix(self):
body = b'{"channel":"#test","text":"hello"}'
sig = hmac.new(b"secret", body, hashlib.sha256).hexdigest()
assert _verify_signature("secret", body, sig) is False
# ---------------------------------------------------------------------------
# TestHttpResponse
# ---------------------------------------------------------------------------
class TestHttpResponse:
def test_200_response(self):
resp = _http_response(200, "OK", "sent")
assert b"200 OK" in resp
assert b"sent" in resp
def test_400_with_body(self):
resp = _http_response(400, "Bad Request", "invalid JSON")
assert b"400 Bad Request" in resp
assert b"invalid JSON" in resp
def test_405_response(self):
resp = _http_response(405, "Method Not Allowed", "POST only")
assert b"405 Method Not Allowed" in resp
# ---------------------------------------------------------------------------
# TestRequestHandler
# ---------------------------------------------------------------------------
class TestRequestHandler:
def test_valid_post(self):
bot = _FakeBot()
body = json.dumps({"channel": "#ops", "text": "deploy done"}).encode()
sig = _sign("secret", body)
raw = _build_request("POST", body, {
"Content-Length": str(len(body)),
"Content-Type": "application/json",
"X-Signature": sig,
})
reader = _FakeReader(raw)
writer = _FakeWriter()
asyncio.run(_handle_request(reader, writer, bot, "secret"))
assert b"200 OK" in writer.data
assert ("#ops", "deploy done") in bot.sent
def test_action_post(self):
bot = _FakeBot()
body = json.dumps({
"channel": "#ops", "text": "deployed", "action": True,
}).encode()
sig = _sign("s", body)
raw = _build_request("POST", body, {
"Content-Length": str(len(body)),
"X-Signature": sig,
})
reader = _FakeReader(raw)
writer = _FakeWriter()
asyncio.run(_handle_request(reader, writer, bot, "s"))
assert b"200 OK" in writer.data
assert ("#ops", "deployed") in bot.actions
def test_get_405(self):
bot = _FakeBot()
raw = _build_request("GET", b"")
reader = _FakeReader(raw)
writer = _FakeWriter()
asyncio.run(_handle_request(reader, writer, bot, ""))
assert b"405" in writer.data
def test_bad_signature_401(self):
bot = _FakeBot()
body = json.dumps({"channel": "#test", "text": "x"}).encode()
raw = _build_request("POST", body, {
"Content-Length": str(len(body)),
"X-Signature": "sha256=wrong",
})
reader = _FakeReader(raw)
writer = _FakeWriter()
asyncio.run(_handle_request(reader, writer, bot, "real-secret"))
assert b"401" in writer.data
assert len(bot.sent) == 0
def test_bad_json_400(self):
bot = _FakeBot()
body = b"not json"
raw = _build_request("POST", body, {"Content-Length": str(len(body))})
reader = _FakeReader(raw)
writer = _FakeWriter()
asyncio.run(_handle_request(reader, writer, bot, ""))
assert b"400" in writer.data
assert b"invalid JSON" in writer.data
def test_missing_channel_400(self):
bot = _FakeBot()
body = json.dumps({"text": "no channel"}).encode()
raw = _build_request("POST", body, {"Content-Length": str(len(body))})
reader = _FakeReader(raw)
writer = _FakeWriter()
asyncio.run(_handle_request(reader, writer, bot, ""))
assert b"400" in writer.data
assert b"invalid channel" in writer.data
def test_invalid_channel_400(self):
bot = _FakeBot()
body = json.dumps({"channel": "nochanprefix", "text": "x"}).encode()
raw = _build_request("POST", body, {"Content-Length": str(len(body))})
reader = _FakeReader(raw)
writer = _FakeWriter()
asyncio.run(_handle_request(reader, writer, bot, ""))
assert b"400" in writer.data
def test_empty_text_400(self):
bot = _FakeBot()
body = json.dumps({"channel": "#test", "text": ""}).encode()
raw = _build_request("POST", body, {"Content-Length": str(len(body))})
reader = _FakeReader(raw)
writer = _FakeWriter()
asyncio.run(_handle_request(reader, writer, bot, ""))
assert b"400" in writer.data
assert b"empty text" in writer.data
def test_body_too_large_413(self):
bot = _FakeBot()
raw = _build_request("POST", b"", {
"Content-Length": str(_MAX_BODY + 1),
})
reader = _FakeReader(raw)
writer = _FakeWriter()
asyncio.run(_handle_request(reader, writer, bot, ""))
assert b"413" in writer.data
def test_counter_increments(self):
bot = _FakeBot()
# Reset counter
_mod._request_count = 0
body = json.dumps({"channel": "#test", "text": "hi"}).encode()
raw = _build_request("POST", body, {"Content-Length": str(len(body))})
reader = _FakeReader(raw)
writer = _FakeWriter()
asyncio.run(_handle_request(reader, writer, bot, ""))
assert _mod._request_count == 1
# ---------------------------------------------------------------------------
# TestServerLifecycle
# ---------------------------------------------------------------------------
class TestServerLifecycle:
def test_disabled_config(self):
"""Server does not start when webhook is disabled."""
bot = _FakeBot(webhook_cfg={"enabled": False})
msg = _msg("", target="")
msg = Message(raw="", prefix="", nick="", command="001",
params=["test", "Welcome"], tags={})
# Reset global state
_mod._server = None
asyncio.run(on_connect(bot, msg))
assert _mod._server is None
def test_duplicate_guard(self):
"""Second on_connect does not create a second server."""
sentinel = object()
_mod._server = sentinel
bot = _FakeBot(webhook_cfg={"enabled": True, "port": 0})
msg = Message(raw="", prefix="", nick="", command="001",
params=["test", "Welcome"], tags={})
asyncio.run(on_connect(bot, msg))
assert _mod._server is sentinel
_mod._server = None # cleanup
def test_on_connect_starts(self):
"""on_connect starts the server when enabled."""
_mod._server = None
bot = _FakeBot(webhook_cfg={
"enabled": True, "host": "127.0.0.1", "port": 0, "secret": "",
})
msg = Message(raw="", prefix="", nick="", command="001",
params=["test", "Welcome"], tags={})
async def _run():
await on_connect(bot, msg)
assert _mod._server is not None
_mod._server.close()
await _mod._server.wait_closed()
_mod._server = None
asyncio.run(_run())
# ---------------------------------------------------------------------------
# TestWebhookCommand
# ---------------------------------------------------------------------------
class TestWebhookCommand:
def test_not_running(self):
bot = _FakeBot()
_mod._server = None
asyncio.run(cmd_webhook(bot, _msg("!webhook")))
assert any("not running" in r for r in bot.replied)
def test_running_shows_status(self):
bot = _FakeBot()
_mod._request_count = 42
_mod._started = time.monotonic() - 90 # 1m 30s ago
async def _run():
# Start a real server on port 0 to get a valid socket
srv = await asyncio.start_server(lambda r, w: None,
"127.0.0.1", 0)
_mod._server = srv
try:
await cmd_webhook(bot, _msg("!webhook"))
finally:
srv.close()
await srv.wait_closed()
_mod._server = None
asyncio.run(_run())
assert len(bot.replied) == 1
reply = bot.replied[0]
assert "Webhook:" in reply
assert "42 requests" in reply
assert "127.0.0.1:" in reply