From c483beb5551056ac6cd565f939d3b29ca221c877 Mon Sep 17 00:00:00 2001 From: user Date: Sat, 21 Feb 2026 17:59:14 +0100 Subject: [PATCH] feat: add webhook listener for push events to channels HTTP POST endpoint for external services (CI, monitoring, GitHub). HMAC-SHA256 auth, JSON body, single POST endpoint at /. - asyncio.start_server with raw HTTP parsing (zero deps) - Body validation: channel prefix, non-empty text, 64KB cap - !webhook admin command shows address, request count, uptime - Module-level server guard prevents duplicates on reconnect - 22 test cases in test_webhook.py --- plugins/webhook.py | 196 +++++++++++++++++++++ tests/test_webhook.py | 395 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 591 insertions(+) create mode 100644 plugins/webhook.py create mode 100644 tests/test_webhook.py diff --git a/plugins/webhook.py b/plugins/webhook.py new file mode 100644 index 0000000..378c514 --- /dev/null +++ b/plugins/webhook.py @@ -0,0 +1,196 @@ +"""Webhook listener: receive HTTP POST requests and relay messages to IRC.""" + +from __future__ import annotations + +import asyncio +import hashlib +import hmac +import json +import logging +import time + +from derp.plugin import command, event + +log = logging.getLogger(__name__) + +_MAX_BODY = 65536 # 64 KB +_server: asyncio.Server | None = None +_request_count: int = 0 +_started: float = 0.0 + + +def _verify_signature(secret: str, body: bytes, signature: str) -> bool: + """Verify HMAC-SHA256 signature from X-Signature header.""" + if not secret: + return True # no secret configured = open access + if not signature.startswith("sha256="): + return False + expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest() + return hmac.compare_digest(expected, signature[7:]) + + +def _http_response(status: int, reason: str, body: str = "") -> bytes: + """Build a minimal HTTP/1.1 response.""" + body_bytes = body.encode("utf-8") if body else b"" + lines = [ + f"HTTP/1.1 {status} {reason}", + "Content-Type: text/plain; charset=utf-8", + f"Content-Length: {len(body_bytes)}", + "Connection: close", + "", + "", + ] + return "\r\n".join(lines).encode("utf-8") + body_bytes + + +async def _handle_request(reader: asyncio.StreamReader, + writer: asyncio.StreamWriter, + bot, secret: str) -> None: + """Parse one HTTP request and dispatch to IRC.""" + global _request_count + + try: + # Read request line + request_line = await asyncio.wait_for(reader.readline(), timeout=10.0) + if not request_line: + return + parts = request_line.decode("utf-8", errors="replace").strip().split() + if len(parts) < 2: + writer.write(_http_response(400, "Bad Request", "malformed request")) + return + method, _path = parts[0], parts[1] + + # Read headers + headers: dict[str, str] = {} + while True: + line = await asyncio.wait_for(reader.readline(), timeout=10.0) + if not line or line == b"\r\n" or line == b"\n": + break + decoded = line.decode("utf-8", errors="replace").strip() + if ":" in decoded: + key, val = decoded.split(":", 1) + headers[key.strip().lower()] = val.strip() + + # Method check + if method != "POST": + writer.write(_http_response(405, "Method Not Allowed", "POST only")) + return + + # Read body + content_length = int(headers.get("content-length", "0")) + if content_length > _MAX_BODY: + writer.write(_http_response(413, "Payload Too Large", + f"max {_MAX_BODY} bytes")) + return + body = await asyncio.wait_for(reader.readexactly(content_length), + timeout=10.0) + + # Verify HMAC signature + signature = headers.get("x-signature", "") + if not _verify_signature(secret, body, signature): + writer.write(_http_response(401, "Unauthorized", "bad signature")) + return + + # Parse JSON + try: + data = json.loads(body) + except (json.JSONDecodeError, UnicodeDecodeError): + writer.write(_http_response(400, "Bad Request", "invalid JSON")) + return + + # Validate fields + channel = data.get("channel", "") + text = data.get("text", "") + is_action = data.get("action", False) + + if not isinstance(channel, str) or not channel.startswith(("#", "&")): + writer.write(_http_response(400, "Bad Request", "invalid channel")) + return + if not isinstance(text, str) or not text.strip(): + writer.write(_http_response(400, "Bad Request", "empty text")) + return + + # Send to IRC + text = text.strip() + if is_action: + await bot.action(channel, text) + else: + await bot.send(channel, text) + + _request_count += 1 + writer.write(_http_response(200, "OK", "sent")) + log.info("webhook: relayed to %s (%d bytes)", channel, len(text)) + + except (asyncio.TimeoutError, asyncio.IncompleteReadError, ConnectionError): + log.debug("webhook: client disconnected") + except Exception: + log.exception("webhook: error handling request") + try: + writer.write(_http_response(500, "Internal Server Error")) + except Exception: + pass + finally: + try: + writer.close() + await writer.wait_closed() + except Exception: + pass + + +@event("001") +async def on_connect(bot, message): + """Start the webhook HTTP server on connect (if enabled).""" + global _server, _started, _request_count + + if _server is not None: + return # already running + + cfg = bot.config.get("webhook", {}) + if not cfg.get("enabled"): + return + + host = cfg.get("host", "127.0.0.1") + port = cfg.get("port", 8080) + secret = cfg.get("secret", "") + + async def handler(reader, writer): + await _handle_request(reader, writer, bot, secret) + + try: + _server = await asyncio.start_server(handler, host, port) + _started = time.monotonic() + _request_count = 0 + log.info("webhook: listening on %s:%d", host, port) + except OSError as exc: + log.error("webhook: failed to bind %s:%d: %s", host, port, exc) + + +@command("webhook", help="Show webhook listener status", admin=True) +async def cmd_webhook(bot, message): + """Display webhook server status.""" + if _server is None: + await bot.reply(message, "Webhook: not running") + return + + socks = _server.sockets + if socks: + addr = socks[0].getsockname() + address = f"{addr[0]}:{addr[1]}" + else: + address = "unknown" + + elapsed = int(time.monotonic() - _started) + hours, rem = divmod(elapsed, 3600) + minutes, secs = divmod(rem, 60) + parts = [] + if hours: + parts.append(f"{hours}h") + if minutes: + parts.append(f"{minutes}m") + parts.append(f"{secs}s") + uptime = " ".join(parts) + + await bot.reply( + message, + f"Webhook: {address} | {_request_count} requests | up {uptime}", + ) diff --git a/tests/test_webhook.py b/tests/test_webhook.py new file mode 100644 index 0000000..fbf118a --- /dev/null +++ b/tests/test_webhook.py @@ -0,0 +1,395 @@ +"""Tests for the webhook listener plugin.""" + +import asyncio +import hashlib +import hmac +import importlib.util +import json +import sys +import time +from pathlib import Path + +from derp.irc import Message + +# plugins/ is not a Python package -- load the module from file path +_spec = importlib.util.spec_from_file_location( + "plugins.webhook", Path(__file__).resolve().parent.parent / "plugins" / "webhook.py", +) +_mod = importlib.util.module_from_spec(_spec) +sys.modules[_spec.name] = _mod +_spec.loader.exec_module(_mod) + +from plugins.webhook import ( # noqa: E402 + _MAX_BODY, + _handle_request, + _http_response, + _verify_signature, + cmd_webhook, + on_connect, +) + +# -- Helpers ----------------------------------------------------------------- + + +class _FakeState: + """In-memory stand-in for bot.state.""" + + def __init__(self): + self._store: dict[str, dict[str, str]] = {} + + def get(self, plugin: str, key: str, default: str | None = None) -> str | None: + return self._store.get(plugin, {}).get(key, default) + + def set(self, plugin: str, key: str, value: str) -> None: + self._store.setdefault(plugin, {})[key] = value + + def delete(self, plugin: str, key: str) -> bool: + try: + del self._store[plugin][key] + return True + except KeyError: + return False + + def keys(self, plugin: str) -> list[str]: + return sorted(self._store.get(plugin, {}).keys()) + + +class _FakeBot: + """Minimal bot stand-in that captures sent/action messages.""" + + def __init__(self, *, admin: bool = True, webhook_cfg: dict | None = None): + self.sent: list[tuple[str, str]] = [] + self.replied: list[str] = [] + self.actions: list[tuple[str, str]] = [] + self.state = _FakeState() + self._admin = admin + self.prefix = "!" + self.config = { + "webhook": webhook_cfg or {"enabled": False}, + } + + async def send(self, target: str, text: str) -> None: + self.sent.append((target, text)) + + async def reply(self, message, text: str) -> None: + self.replied.append(text) + + async def action(self, target: str, text: str) -> None: + self.actions.append((target, text)) + + def _is_admin(self, message) -> bool: + return self._admin + + +def _msg(text: str, nick: str = "admin", target: str = "#test") -> Message: + return Message( + raw="", prefix=f"{nick}!~{nick}@host", nick=nick, + command="PRIVMSG", params=[target, text], tags={}, + ) + + +def _sign(secret: str, body: bytes) -> str: + """Generate HMAC-SHA256 signature.""" + sig = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest() + return f"sha256={sig}" + + +class _FakeReader: + """Mock asyncio.StreamReader from raw HTTP bytes.""" + + def __init__(self, data: bytes) -> None: + self._data = data + self._pos = 0 + + async def readline(self) -> bytes: + start = self._pos + idx = self._data.find(b"\n", start) + if idx == -1: + self._pos = len(self._data) + return self._data[start:] + self._pos = idx + 1 + return self._data[start:self._pos] + + async def readexactly(self, n: int) -> bytes: + chunk = self._data[self._pos:self._pos + n] + self._pos += n + return chunk + + +class _FakeWriter: + """Mock asyncio.StreamWriter that captures output.""" + + def __init__(self) -> None: + self.data = b"" + self._closed = False + + def write(self, data: bytes) -> None: + self.data += data + + def close(self) -> None: + self._closed = True + + async def wait_closed(self) -> None: + pass + + +def _build_request(method: str, body: bytes, headers: dict[str, str] | None = None) -> bytes: + """Build raw HTTP request bytes.""" + hdrs = headers or {} + if "Content-Length" not in hdrs: + hdrs["Content-Length"] = str(len(body)) + lines = [f"{method} / HTTP/1.1"] + for k, v in hdrs.items(): + lines.append(f"{k}: {v}") + lines.append("") + lines.append("") + return "\r\n".join(lines).encode("utf-8") + body + + +# --------------------------------------------------------------------------- +# TestVerifySignature +# --------------------------------------------------------------------------- + + +class TestVerifySignature: + def test_valid_signature(self): + body = b'{"channel":"#test","text":"hello"}' + sig = _sign("secret", body) + assert _verify_signature("secret", body, sig) is True + + def test_invalid_signature(self): + body = b'{"channel":"#test","text":"hello"}' + assert _verify_signature("secret", body, "sha256=bad") is False + + def test_empty_secret_allows_all(self): + body = b'{"channel":"#test","text":"hello"}' + assert _verify_signature("", body, "") is True + + def test_missing_prefix(self): + body = b'{"channel":"#test","text":"hello"}' + sig = hmac.new(b"secret", body, hashlib.sha256).hexdigest() + assert _verify_signature("secret", body, sig) is False + + +# --------------------------------------------------------------------------- +# TestHttpResponse +# --------------------------------------------------------------------------- + + +class TestHttpResponse: + def test_200_response(self): + resp = _http_response(200, "OK", "sent") + assert b"200 OK" in resp + assert b"sent" in resp + + def test_400_with_body(self): + resp = _http_response(400, "Bad Request", "invalid JSON") + assert b"400 Bad Request" in resp + assert b"invalid JSON" in resp + + def test_405_response(self): + resp = _http_response(405, "Method Not Allowed", "POST only") + assert b"405 Method Not Allowed" in resp + + +# --------------------------------------------------------------------------- +# TestRequestHandler +# --------------------------------------------------------------------------- + + +class TestRequestHandler: + def test_valid_post(self): + bot = _FakeBot() + body = json.dumps({"channel": "#ops", "text": "deploy done"}).encode() + sig = _sign("secret", body) + raw = _build_request("POST", body, { + "Content-Length": str(len(body)), + "Content-Type": "application/json", + "X-Signature": sig, + }) + reader = _FakeReader(raw) + writer = _FakeWriter() + asyncio.run(_handle_request(reader, writer, bot, "secret")) + assert b"200 OK" in writer.data + assert ("#ops", "deploy done") in bot.sent + + def test_action_post(self): + bot = _FakeBot() + body = json.dumps({ + "channel": "#ops", "text": "deployed", "action": True, + }).encode() + sig = _sign("s", body) + raw = _build_request("POST", body, { + "Content-Length": str(len(body)), + "X-Signature": sig, + }) + reader = _FakeReader(raw) + writer = _FakeWriter() + asyncio.run(_handle_request(reader, writer, bot, "s")) + assert b"200 OK" in writer.data + assert ("#ops", "deployed") in bot.actions + + def test_get_405(self): + bot = _FakeBot() + raw = _build_request("GET", b"") + reader = _FakeReader(raw) + writer = _FakeWriter() + asyncio.run(_handle_request(reader, writer, bot, "")) + assert b"405" in writer.data + + def test_bad_signature_401(self): + bot = _FakeBot() + body = json.dumps({"channel": "#test", "text": "x"}).encode() + raw = _build_request("POST", body, { + "Content-Length": str(len(body)), + "X-Signature": "sha256=wrong", + }) + reader = _FakeReader(raw) + writer = _FakeWriter() + asyncio.run(_handle_request(reader, writer, bot, "real-secret")) + assert b"401" in writer.data + assert len(bot.sent) == 0 + + def test_bad_json_400(self): + bot = _FakeBot() + body = b"not json" + raw = _build_request("POST", body, {"Content-Length": str(len(body))}) + reader = _FakeReader(raw) + writer = _FakeWriter() + asyncio.run(_handle_request(reader, writer, bot, "")) + assert b"400" in writer.data + assert b"invalid JSON" in writer.data + + def test_missing_channel_400(self): + bot = _FakeBot() + body = json.dumps({"text": "no channel"}).encode() + raw = _build_request("POST", body, {"Content-Length": str(len(body))}) + reader = _FakeReader(raw) + writer = _FakeWriter() + asyncio.run(_handle_request(reader, writer, bot, "")) + assert b"400" in writer.data + assert b"invalid channel" in writer.data + + def test_invalid_channel_400(self): + bot = _FakeBot() + body = json.dumps({"channel": "nochanprefix", "text": "x"}).encode() + raw = _build_request("POST", body, {"Content-Length": str(len(body))}) + reader = _FakeReader(raw) + writer = _FakeWriter() + asyncio.run(_handle_request(reader, writer, bot, "")) + assert b"400" in writer.data + + def test_empty_text_400(self): + bot = _FakeBot() + body = json.dumps({"channel": "#test", "text": ""}).encode() + raw = _build_request("POST", body, {"Content-Length": str(len(body))}) + reader = _FakeReader(raw) + writer = _FakeWriter() + asyncio.run(_handle_request(reader, writer, bot, "")) + assert b"400" in writer.data + assert b"empty text" in writer.data + + def test_body_too_large_413(self): + bot = _FakeBot() + raw = _build_request("POST", b"", { + "Content-Length": str(_MAX_BODY + 1), + }) + reader = _FakeReader(raw) + writer = _FakeWriter() + asyncio.run(_handle_request(reader, writer, bot, "")) + assert b"413" in writer.data + + def test_counter_increments(self): + bot = _FakeBot() + # Reset counter + _mod._request_count = 0 + body = json.dumps({"channel": "#test", "text": "hi"}).encode() + raw = _build_request("POST", body, {"Content-Length": str(len(body))}) + reader = _FakeReader(raw) + writer = _FakeWriter() + asyncio.run(_handle_request(reader, writer, bot, "")) + assert _mod._request_count == 1 + + +# --------------------------------------------------------------------------- +# TestServerLifecycle +# --------------------------------------------------------------------------- + + +class TestServerLifecycle: + def test_disabled_config(self): + """Server does not start when webhook is disabled.""" + bot = _FakeBot(webhook_cfg={"enabled": False}) + msg = _msg("", target="") + msg = Message(raw="", prefix="", nick="", command="001", + params=["test", "Welcome"], tags={}) + # Reset global state + _mod._server = None + asyncio.run(on_connect(bot, msg)) + assert _mod._server is None + + def test_duplicate_guard(self): + """Second on_connect does not create a second server.""" + sentinel = object() + _mod._server = sentinel + bot = _FakeBot(webhook_cfg={"enabled": True, "port": 0}) + msg = Message(raw="", prefix="", nick="", command="001", + params=["test", "Welcome"], tags={}) + asyncio.run(on_connect(bot, msg)) + assert _mod._server is sentinel + _mod._server = None # cleanup + + def test_on_connect_starts(self): + """on_connect starts the server when enabled.""" + _mod._server = None + bot = _FakeBot(webhook_cfg={ + "enabled": True, "host": "127.0.0.1", "port": 0, "secret": "", + }) + msg = Message(raw="", prefix="", nick="", command="001", + params=["test", "Welcome"], tags={}) + + async def _run(): + await on_connect(bot, msg) + assert _mod._server is not None + _mod._server.close() + await _mod._server.wait_closed() + _mod._server = None + + asyncio.run(_run()) + + +# --------------------------------------------------------------------------- +# TestWebhookCommand +# --------------------------------------------------------------------------- + + +class TestWebhookCommand: + def test_not_running(self): + bot = _FakeBot() + _mod._server = None + asyncio.run(cmd_webhook(bot, _msg("!webhook"))) + assert any("not running" in r for r in bot.replied) + + def test_running_shows_status(self): + bot = _FakeBot() + _mod._request_count = 42 + _mod._started = time.monotonic() - 90 # 1m 30s ago + + async def _run(): + # Start a real server on port 0 to get a valid socket + srv = await asyncio.start_server(lambda r, w: None, + "127.0.0.1", 0) + _mod._server = srv + try: + await cmd_webhook(bot, _msg("!webhook")) + finally: + srv.close() + await srv.wait_closed() + _mod._server = None + + asyncio.run(_run()) + assert len(bot.replied) == 1 + reply = bot.replied[0] + assert "Webhook:" in reply + assert "42 requests" in reply + assert "127.0.0.1:" in reply