From d884d2bb55a9331da88f2ba07036d700dac302ee Mon Sep 17 00:00:00 2001 From: user Date: Sat, 21 Feb 2026 23:15:42 +0100 Subject: [PATCH] refactor: switch Mumble voice to pymumble transport asyncio's SSL memory-BIO transport silently drops voice packets even though text works fine. pymumble uses blocking ssl.SSLSocket.send() which reliably delivers voice data. - Rewrite MumbleBot to use pymumble for connection, SSL, ping, and voice encoding/sending - Bridge pymumble thread callbacks to asyncio via run_coroutine_threadsafe for text dispatch - Voice via sound_output.add_sound(pcm) -- pymumble handles Opus encoding, packetization, and timing - Remove custom protobuf codec, voice varint, and opus ctypes wrapper - Add container patches for pymumble ssl.wrap_socket (Python 3.13) and opuslib find_library (musl/Alpine) Co-Authored-By: Claude Opus 4.6 --- Containerfile | 7 +- patches/apply_pymumble_ssl.py | 45 +++ plugins/music.py | 11 + requirements.txt | 1 + src/derp/mumble.py | 667 +++++++++------------------------- src/derp/opus.py | 100 ----- tests/test_mumble.py | 558 ++-------------------------- tests/test_opus.py | 154 -------- 8 files changed, 261 insertions(+), 1282 deletions(-) create mode 100644 patches/apply_pymumble_ssl.py delete mode 100644 src/derp/opus.py delete mode 100644 tests/test_opus.py diff --git a/Containerfile b/Containerfile index 12cd5b4..d49424e 100644 --- a/Containerfile +++ b/Containerfile @@ -1,12 +1,17 @@ FROM python:3.13-alpine -RUN apk add --no-cache opus ffmpeg yt-dlp +RUN apk add --no-cache opus ffmpeg yt-dlp && \ + ln -s /usr/lib/libopus.so.0 /usr/lib/libopus.so WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt +# Patch pymumble for Python 3.13 (ssl.wrap_socket was removed) +COPY patches/apply_pymumble_ssl.py /tmp/apply_pymumble_ssl.py +RUN python3 /tmp/apply_pymumble_ssl.py && rm /tmp/apply_pymumble_ssl.py + ENV PYTHONPATH=/app/src ENV PYTHONUNBUFFERED=1 ENTRYPOINT ["python", "-m", "derp"] diff --git a/patches/apply_pymumble_ssl.py b/patches/apply_pymumble_ssl.py new file mode 100644 index 0000000..2f866ef --- /dev/null +++ b/patches/apply_pymumble_ssl.py @@ -0,0 +1,45 @@ +"""Patch pymumble deps for Python 3.13+ / musl (Alpine). + +1. pymumble: ssl.wrap_socket was removed in 3.13 +2. opuslib: ctypes.util.find_library fails on musl-based distros +""" + +import pathlib +import sysconfig + +site = sysconfig.get_path("purelib") + +# -- pymumble: replace ssl.wrap_socket with SSLContext -- +p = pathlib.Path(f"{site}/pymumble_py3/mumble.py") +src = p.read_text() + +old = """\ + try: + self.control_socket = ssl.wrap_socket(std_sock, certfile=self.certfile, keyfile=self.keyfile, ssl_version=ssl.PROTOCOL_TLS) + except AttributeError: + self.control_socket = ssl.wrap_socket(std_sock, certfile=self.certfile, keyfile=self.keyfile, ssl_version=ssl.PROTOCOL_TLSv1) + try:""" + +new = """\ + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + if self.certfile: + ctx.load_cert_chain(certfile=self.certfile, keyfile=self.keyfile) + self.control_socket = ctx.wrap_socket(std_sock, server_hostname=self.host) + try:""" + +assert old in src, "pymumble ssl patch target not found" +p.write_text(src.replace(old, new)) +print("pymumble ssl patch applied") + +# -- opuslib: find_library fails on musl, use direct CDLL fallback -- +p = pathlib.Path(f"{site}/opuslib/api/__init__.py") +src = p.read_text() + +old_opus = "lib_location = find_library('opus')" +new_opus = "lib_location = find_library('opus') or 'libopus.so.0'" + +assert old_opus in src, "opuslib find_library patch target not found" +p.write_text(src.replace(old_opus, new_opus)) +print("opuslib musl patch applied") diff --git a/plugins/music.py b/plugins/music.py index fcfbe40..2a29c00 100644 --- a/plugins/music.py +++ b/plugins/music.py @@ -252,6 +252,17 @@ async def cmd_np(bot, message): ) +@command("testtone", help="Music: !testtone -- debug sine wave") +async def cmd_testtone(bot, message): + """Send a 3-second test tone for voice debugging.""" + if not _is_mumble(bot): + await bot.reply(message, "Mumble-only feature") + return + await bot.reply(message, "Sending 440Hz test tone (3s)...") + await bot.test_tone(3.0) + await bot.reply(message, "Test tone complete") + + @command("volume", help="Music: !volume [0-100]") async def cmd_volume(bot, message): """Get or set playback volume. diff --git a/requirements.txt b/requirements.txt index 47fe5f1..2f15259 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ maxminddb>=2.0 +pymumble>=1.6 PySocks>=1.7.1 urllib3[socks]>=2.0 diff --git a/src/derp/mumble.py b/src/derp/mumble.py index 0676c55..703b397 100644 --- a/src/derp/mumble.py +++ b/src/derp/mumble.py @@ -1,4 +1,4 @@ -"""Mumble adapter: TLS/TCP over SOCKS5, protobuf control channel + voice.""" +"""Mumble adapter: pymumble transport with asyncio plugin dispatch.""" from __future__ import annotations @@ -6,15 +6,19 @@ import array import asyncio import html import logging -import random import re -import ssl import struct import time from dataclasses import dataclass, field from pathlib import Path -from derp import http +import pymumble_py3 as pymumble +from pymumble_py3.constants import ( + PYMUMBLE_CLBK_CONNECTED, + PYMUMBLE_CLBK_DISCONNECTED, + PYMUMBLE_CLBK_TEXTMESSAGERECEIVED, +) + from derp.bot import _TokenBucket from derp.plugin import TIERS, PluginRegistry from derp.state import StateStore @@ -23,117 +27,10 @@ log = logging.getLogger(__name__) _AMBIGUOUS = object() # sentinel for ambiguous prefix matches -# -- Mumble message types ---------------------------------------------------- - -MSG_VERSION = 0 -MSG_UDPTUNNEL = 1 -MSG_AUTHENTICATE = 2 -MSG_PING = 3 -MSG_SERVER_SYNC = 5 -MSG_CHANNEL_REMOVE = 6 -MSG_CHANNEL_STATE = 7 -MSG_USER_REMOVE = 8 -MSG_USER_STATE = 9 -MSG_TEXT_MESSAGE = 11 - -# -- Protobuf wire helpers (minimal, no external dep) ------------------------ - -_WIRE_VARINT = 0 -_WIRE_LEN = 2 - - -def _encode_varint(value: int) -> bytes: - """Encode an unsigned integer as a protobuf varint.""" - buf = bytearray() - while value > 0x7F: - buf.append((value & 0x7F) | 0x80) - value >>= 7 - buf.append(value & 0x7F) - return bytes(buf) - - -def _decode_varint(data: bytes, offset: int) -> tuple[int, int]: - """Decode a protobuf varint, returning (value, new_offset).""" - result = 0 - shift = 0 - while offset < len(data): - byte = data[offset] - offset += 1 - result |= (byte & 0x7F) << shift - if not (byte & 0x80): - return result, offset - shift += 7 - return result, offset - - -def _encode_field(field_num: int, wire_type: int, value: int | bytes) -> bytes: - """Encode a single protobuf field.""" - tag = _encode_varint((field_num << 3) | wire_type) - if wire_type == _WIRE_VARINT: - return tag + _encode_varint(value) - # wire_type == _WIRE_LEN - if isinstance(value, str): - value = value.encode("utf-8") - return tag + _encode_varint(len(value)) + value - - -def _decode_fields(data: bytes) -> dict[int, list]: - """Decode protobuf fields, returning field_num -> list of values.""" - fields: dict[int, list] = {} - offset = 0 - while offset < len(data): - tag, offset = _decode_varint(data, offset) - field_num = tag >> 3 - wire_type = tag & 0x07 - if wire_type == _WIRE_VARINT: - value, offset = _decode_varint(data, offset) - elif wire_type == _WIRE_LEN: - length, offset = _decode_varint(data, offset) - value = data[offset:offset + length] - offset += length - elif wire_type == 5: # 32-bit fixed - value = data[offset:offset + 4] - offset += 4 - elif wire_type == 1: # 64-bit fixed - value = data[offset:offset + 8] - offset += 8 - else: - break # unknown wire type, stop parsing - fields.setdefault(field_num, []).append(value) - return fields - - -def _pack_msg(msg_type: int, payload: bytes = b"") -> bytes: - """Wrap a protobuf payload in a Mumble 6-byte header.""" - return struct.pack("!HI", msg_type, len(payload)) + payload - - -def _unpack_header(data: bytes) -> tuple[int, int]: - """Unpack a 6-byte Mumble header into (msg_type, payload_length).""" - return struct.unpack("!HI", data) - - -def _field_str(fields: dict[int, list], num: int) -> str | None: - """Extract a string field.""" - vals = fields.get(num) - if vals and isinstance(vals[0], bytes): - return vals[0].decode("utf-8", errors="replace") - return None - - -def _field_int(fields: dict[int, list], num: int) -> int | None: - """Extract an integer field.""" - vals = fields.get(num) - if vals and isinstance(vals[0], int): - return vals[0] - return None - - -def _field_ints(fields: dict[int, list], num: int) -> list[int]: - """Extract repeated integer fields.""" - vals = fields.get(num, []) - return [v for v in vals if isinstance(v, int)] - +# PCM constants for audio streaming +_SAMPLE_RATE = 48000 +_FRAME_SIZE = 960 # 20ms at 48kHz +_FRAME_BYTES = 1920 # 960 samples * 2 bytes (s16le) # -- HTML helpers ------------------------------------------------------------ @@ -155,66 +52,6 @@ def _shell_quote(s: str) -> str: return "'" + s.replace("'", "'\\''") + "'" -# -- Mumble voice helpers ---------------------------------------------------- - - -def _encode_voice_varint(value: int) -> bytes: - """Encode an integer using Mumble's voice varint format. - - NOT the same as protobuf varint. Mumble voice varints use a prefix - code based on leading bits: - 0xxxxxxx -- 7-bit (0-127) - 10xxxxxx yyyyyyyy -- 14-bit - 110xxxxx yyyyyyyy yyyyyyyy -- 21-bit - 1110xxxx yyyyyyyy yyyyyyyy yyyyyyyy -- 28-bit - 11110000 + 8 bytes -- 64-bit - """ - if value < 0: - raise ValueError("voice varint must be non-negative") - if value < 0x80: - return bytes([value]) - if value < 0x4000: - return bytes([0x80 | (value >> 8), value & 0xFF]) - if value < 0x200000: - return bytes([ - 0xC0 | (value >> 16), - (value >> 8) & 0xFF, - value & 0xFF, - ]) - if value < 0x10000000: - return bytes([ - 0xE0 | (value >> 24), - (value >> 16) & 0xFF, - (value >> 8) & 0xFF, - value & 0xFF, - ]) - # 64-bit fallback - return b"\xf0" + value.to_bytes(8, "big") - - -def _build_voice_packet( - sequence: int, - opus_data: bytes, - *, - last: bool = False, -) -> bytes: - """Build a Mumble voice packet for client-to-server Opus audio. - - Format (client-to-server, no session ID): - 1 byte : header (type=4 << 5 | target=0 -> 0x80) - varint : sequence number (increments by 1 per frame) - varint : opus frame length (bit 13 = terminator on last) - N bytes : raw opus data - """ - header = bytes([0x80]) # type=4 (Opus), target=0 - seq = _encode_voice_varint(sequence) - length = len(opus_data) - if last: - length |= 0x2000 # bit 13 = terminator flag - size = _encode_voice_varint(length) - return header + seq + size + opus_data - - def _scale_pcm(data: bytes, volume: float) -> bytes: """Scale s16le PCM samples by a volume factor, clamped to [-32768, 32767].""" samples = array.array("h") @@ -241,7 +78,7 @@ class MumbleMessage: and ``msg.tags`` work without modification. """ - raw: dict # decoded protobuf fields + raw: dict # original message data nick: str | None # sender username (from session lookup) prefix: str | None # sender username (for ACL) text: str | None # message text (HTML stripped) @@ -252,115 +89,16 @@ class MumbleMessage: tags: dict[str, str] = field(default_factory=dict) -# -- Message builders -------------------------------------------------------- - - -def _build_version_payload() -> bytes: - """Build a Version message payload.""" - payload = b"" - # field 1: version_v1 (uint32) -- legacy: 1.5.0 - payload += _encode_field(1, _WIRE_VARINT, (1 << 16) | (5 << 8)) - # field 2: release (string) - payload += _encode_field(2, _WIRE_LEN, "derp 1.5.0") - # field 3: os (string) - payload += _encode_field(3, _WIRE_LEN, "Linux") - # field 4: os_version (string) - payload += _encode_field(4, _WIRE_LEN, "") - # field 5: version_v2 (uint64) -- semantic: 1.5.0 - payload += _encode_field(5, _WIRE_VARINT, (1 << 48) | (5 << 32)) - return payload - - -def _build_authenticate_payload(username: str, password: str) -> bytes: - """Build an Authenticate message payload.""" - payload = b"" - # field 1: username (string) - payload += _encode_field(1, _WIRE_LEN, username) - # field 2: password (string) - if password: - payload += _encode_field(2, _WIRE_LEN, password) - # field 5: opus (bool/varint) -- True - payload += _encode_field(5, _WIRE_VARINT, 1) - return payload - - -def _build_ping_payload(timestamp: int) -> bytes: - """Build a Ping message payload.""" - return _encode_field(1, _WIRE_VARINT, timestamp) - - -def _build_text_message_payload( - channel_ids: list[int] | None = None, - session_ids: list[int] | None = None, - tree_ids: list[int] | None = None, - message: str = "", -) -> bytes: - """Build a TextMessage payload.""" - payload = b"" - for sid in (session_ids or []): - payload += _encode_field(2, _WIRE_VARINT, sid) - for cid in (channel_ids or []): - payload += _encode_field(3, _WIRE_VARINT, cid) - for tid in (tree_ids or []): - payload += _encode_field(4, _WIRE_VARINT, tid) - # field 5: message (string) - payload += _encode_field(5, _WIRE_LEN, message) - return payload - - -def _build_mumble_message( - fields: dict[int, list], - users: dict[int, str], - our_session: int, -) -> MumbleMessage | None: - """Build a MumbleMessage from decoded TextMessage fields.""" - # field 5: message text - raw_text = _field_str(fields, 5) - if raw_text is None: - return None - - text = _strip_html(raw_text) - - # field 1: actor (sender session) - actor = _field_int(fields, 1) - nick = users.get(actor) if actor is not None else None - prefix = nick # use username for ACL - - # Determine target: channel_id (field 3) or DM (field 2) - channel_ids = _field_ints(fields, 3) - session_ids = _field_ints(fields, 2) - - if channel_ids: - target = str(channel_ids[0]) - is_channel = True - elif session_ids: - target = "dm" - is_channel = False - else: - target = None - is_channel = True - - return MumbleMessage( - raw=dict(fields), - nick=nick, - prefix=prefix, - text=text, - target=target, - is_channel=is_channel, - params=[target or "", text], - ) - - # -- MumbleBot -------------------------------------------------------------- class MumbleBot: - """Mumble bot adapter via TCP/TLS protobuf control channel (text only). + """Mumble bot adapter using pymumble for connection and voice. Exposes the same public API as :class:`derp.bot.Bot` so that protocol-agnostic plugins work without modification. - TCP is routed through ``derp.http.create_connection`` (SOCKS5 - optional via ``mumble.proxy`` config). + Voice uses pymumble's sound_output (blocking SSL socket, proven + reliable for audio delivery). """ def __init__(self, name: str, config: dict, registry: PluginRegistry) -> None: @@ -370,12 +108,10 @@ class MumbleBot: self._pstate: dict = {} mu_cfg = config.get("mumble", {}) - self._proxy: bool = mu_cfg.get("proxy", True) self._host: str = mu_cfg.get("host", "127.0.0.1") self._port: int = mu_cfg.get("port", 64738) self._username: str = mu_cfg.get("username", "derp") self._password: str = mu_cfg.get("password", "") - self._tls_verify: bool = mu_cfg.get("tls_verify", False) self.nick: str = self._username self.prefix: str = ( mu_cfg.get("prefix") @@ -384,19 +120,14 @@ class MumbleBot: self._running = False self._started: float = time.monotonic() self._tasks: set[asyncio.Task] = set() - self._reconnect_delay: float = 5.0 self._admins: list[str] = [str(x) for x in mu_cfg.get("admins", [])] self._operators: list[str] = [str(x) for x in mu_cfg.get("operators", [])] self._trusted: list[str] = [str(x) for x in mu_cfg.get("trusted", [])] self.state = StateStore(f"data/state-{name}.db") - # Protocol state - self._session: int = 0 # our session ID (from ServerSync) - self._channels: dict[int, str] = {} # channel_id -> name - self._users: dict[int, str] = {} # session_id -> username - self._user_channels: dict[int, int] = {} # session_id -> channel_id - self._reader: asyncio.StreamReader | None = None - self._writer: asyncio.StreamWriter | None = None + # pymumble state + self._mumble: pymumble.Mumble | None = None + self._loop: asyncio.AbstractEventLoop | None = None rate_cfg = config.get("bot", {}) self._bucket = _TokenBucket( @@ -406,163 +137,99 @@ class MumbleBot: # -- Connection ---------------------------------------------------------- - def _create_ssl_context(self) -> ssl.SSLContext: - """Build an SSL context for Mumble TLS.""" - ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) - if not self._tls_verify: - ctx.check_hostname = False - ctx.verify_mode = ssl.CERT_NONE - return ctx - - async def _connect(self) -> None: - """Establish TLS connection, optionally through SOCKS5 proxy.""" - loop = asyncio.get_running_loop() - sock = await loop.run_in_executor( - None, lambda: http.create_connection( - (self._host, self._port), proxy=self._proxy, - ), + def _connect_sync(self) -> None: + """Create and start pymumble connection (blocking, run in executor).""" + self._mumble = pymumble.Mumble( + self._host, self._username, + port=self._port, password=self._password, + reconnect=True, ) - ssl_ctx = self._create_ssl_context() - self._reader, self._writer = await asyncio.open_connection( - sock=sock, ssl=ssl_ctx, server_hostname=self._host, + self._mumble.callbacks.set_callback( + PYMUMBLE_CLBK_TEXTMESSAGERECEIVED, + self._on_text_message, ) + self._mumble.callbacks.set_callback( + PYMUMBLE_CLBK_CONNECTED, + self._on_connected, + ) + self._mumble.callbacks.set_callback( + PYMUMBLE_CLBK_DISCONNECTED, + self._on_disconnected, + ) + self._mumble.set_receive_sound(False) + self._mumble.start() + self._mumble.is_ready() - async def _send_msg(self, msg_type: int, payload: bytes = b"") -> None: - """Send a framed Mumble message.""" - if self._writer is None: + def _on_connected(self) -> None: + """Callback from pymumble thread: connection established.""" + session = getattr(self._mumble.users, "myself_session", "?") + log.info("mumble: connected as %s on %s:%d (session=%s)", + self._username, self._host, self._port, session) + + def _on_disconnected(self) -> None: + """Callback from pymumble thread: connection lost.""" + log.warning("mumble: disconnected") + + def _on_text_message(self, message) -> None: + """Callback from pymumble thread: text message received. + + Bridges to the asyncio event loop for command dispatch. + """ + if self._loop is None: return - data = _pack_msg(msg_type, payload) - self._writer.write(data) - await self._writer.drain() + asyncio.run_coroutine_threadsafe( + self._handle_text(message), self._loop, + ) - async def _read_msg(self) -> tuple[int, bytes] | None: - """Read one framed Mumble message. Returns (msg_type, payload).""" - if self._reader is None: - return None - header = await self._reader.readexactly(6) - msg_type, length = _unpack_header(header) - payload = await self._reader.readexactly(length) if length else b"" - return msg_type, payload + async def _handle_text(self, pb_msg) -> None: + """Process a text message from pymumble (runs on asyncio loop).""" + text = _strip_html(pb_msg.message) + actor = pb_msg.actor - async def _close(self) -> None: - """Close the connection.""" - if self._writer: - try: - self._writer.close() - await self._writer.wait_closed() - except Exception: - pass - self._reader = None - self._writer = None + # Look up sender username + nick = None + try: + nick = self._mumble.users[actor]["name"] + except (KeyError, TypeError): + pass + + # Determine target: channel or DM + if pb_msg.channel_id: + target = str(pb_msg.channel_id[0]) + is_channel = True + elif pb_msg.session: + target = "dm" + is_channel = False + else: + target = None + is_channel = True + + msg = MumbleMessage( + raw={}, + nick=nick, + prefix=nick, + text=text, + target=target, + is_channel=is_channel, + params=[target or "", text], + ) + await self._dispatch_command(msg) # -- Lifecycle ----------------------------------------------------------- async def start(self) -> None: - """Connect and enter the message loop with reconnect backoff.""" + """Connect via pymumble and run until stopped.""" self._running = True - while self._running: - try: - await self._connect_and_run() - self._reconnect_delay = 5.0 - except (OSError, ConnectionError, asyncio.IncompleteReadError) as exc: - log.error("mumble: connection lost: %s", exc) - if self._running: - jitter = self._reconnect_delay * 0.25 * (2 * random.random() - 1) - delay = self._reconnect_delay + jitter - log.info("mumble: reconnecting in %.0fs...", delay) - await asyncio.sleep(delay) - self._reconnect_delay = min(self._reconnect_delay * 2, 300.0) + self._loop = asyncio.get_running_loop() - async def _connect_and_run(self) -> None: - """Single connection lifecycle.""" - await self._connect() + await self._loop.run_in_executor(None, self._connect_sync) try: - await self._handshake() - await self._loop() + while self._running: + await asyncio.sleep(1) finally: - await self._close() - - async def _handshake(self) -> None: - """Send Version and Authenticate messages.""" - # Version - await self._send_msg(MSG_VERSION, _build_version_payload()) - # Authenticate - await self._send_msg( - MSG_AUTHENTICATE, - _build_authenticate_payload(self._username, self._password), - ) - log.info("mumble: authenticating as %s on %s:%d", - self._username, self._host, self._port) - - async def _loop(self) -> None: - """Read and dispatch messages until disconnect.""" - while self._running: - result = await self._read_msg() - if result is None: - log.warning("mumble: server closed connection") - return - msg_type, payload = result - await self._handle(msg_type, payload) - - async def _handle(self, msg_type: int, payload: bytes) -> None: - """Dispatch a received Mumble message by type.""" - if msg_type == MSG_PING: - fields = _decode_fields(payload) - ts = _field_int(fields, 1) or 0 - await self._send_msg(MSG_PING, _build_ping_payload(ts)) - return - - if msg_type == MSG_SERVER_SYNC: - fields = _decode_fields(payload) - session = _field_int(fields, 1) - if session is not None: - self._session = session - welcome = _field_str(fields, 3) or "" - log.info("mumble: connected, session=%d, welcome=%s", - self._session, _strip_html(welcome)[:80]) - return - - if msg_type == MSG_CHANNEL_STATE: - fields = _decode_fields(payload) - cid = _field_int(fields, 1) - name = _field_str(fields, 3) - if cid is not None and name is not None: - self._channels[cid] = name - return - - if msg_type == MSG_CHANNEL_REMOVE: - fields = _decode_fields(payload) - cid = _field_int(fields, 1) - if cid is not None: - self._channels.pop(cid, None) - return - - if msg_type == MSG_USER_STATE: - fields = _decode_fields(payload) - session = _field_int(fields, 1) - name = _field_str(fields, 3) - channel_id = _field_int(fields, 5) - if session is not None: - if name is not None: - self._users[session] = name - if channel_id is not None: - self._user_channels[session] = channel_id - return - - if msg_type == MSG_USER_REMOVE: - fields = _decode_fields(payload) - session = _field_int(fields, 1) - if session is not None: - self._users.pop(session, None) - self._user_channels.pop(session, None) - return - - if msg_type == MSG_TEXT_MESSAGE: - fields = _decode_fields(payload) - msg = _build_mumble_message(fields, self._users, self._session) - if msg is not None: - await self._dispatch_command(msg) - return + if self._mumble: + self._mumble.stop() + self._mumble = None # -- Command dispatch ---------------------------------------------------- @@ -607,11 +274,7 @@ class MumbleBot: log.exception("mumble: error in command handler '%s'", cmd_name) def _resolve_command(self, name: str): - """Resolve command name with unambiguous prefix matching. - - Returns the Handler on exact or unique prefix match, the sentinel - ``_AMBIGUOUS`` if multiple commands match, or None if nothing matches. - """ + """Resolve command name with unambiguous prefix matching.""" handler = self.registry.commands.get(name) if handler is not None: return handler @@ -630,10 +293,7 @@ class MumbleBot: # -- Permission tiers ---------------------------------------------------- def _get_tier(self, msg: MumbleMessage) -> str: - """Determine permission tier from username. - - Matches exact string comparison of username against config lists. - """ + """Determine permission tier from username.""" if not msg.prefix: return "user" for name in self._admins: @@ -653,25 +313,31 @@ class MumbleBot: # -- Public API for plugins ---------------------------------------------- + def _send_text_sync(self, channel_id: int, html_text: str) -> None: + """Send a text message via pymumble (blocking, thread-safe).""" + try: + channel = self._mumble.channels[channel_id] + channel.send_text_message(html_text) + except Exception: + log.exception("mumble: failed to send text to channel %d", + channel_id) + async def _send_html(self, target: str, html_text: str) -> None: """Send a TextMessage with pre-formatted HTML (no escaping).""" + if self._mumble is None: + return await self._bucket.acquire() try: channel_id = int(target) except (ValueError, TypeError): - channel_id = 0 # root channel fallback - payload = _build_text_message_payload( - channel_ids=[channel_id], message=html_text, + channel_id = 0 + loop = asyncio.get_running_loop() + await loop.run_in_executor( + None, self._send_text_sync, channel_id, html_text, ) - await self._send_msg(MSG_TEXT_MESSAGE, payload) async def send(self, target: str, text: str) -> None: - """Send a TextMessage to a channel (HTML-escaped, rate-limited). - - ``target`` is a channel_id as string. For DMs, sends to the - current channel instead (Mumble DMs require session IDs which - are not stable). - """ + """Send a TextMessage to a channel (HTML-escaped, rate-limited).""" await self._send_html(target, _escape_html(text)) async def reply(self, msg, text: str) -> None: @@ -679,17 +345,13 @@ class MumbleBot: if msg.target and msg.target != "dm": await self.send(msg.target, text) elif msg.target == "dm": - # Best-effort: send to root channel await self.send("0", text) async def long_reply( self, msg, lines: list[str], *, label: str = "", ) -> None: - """Reply with a list of lines; paste overflow to FlaskPaste. - - Same overflow logic as :meth:`derp.bot.Bot.long_reply`. - """ + """Reply with a list of lines; paste overflow to FlaskPaste.""" threshold = self.config.get("bot", {}).get("paste_threshold", 4) if not lines or not msg.target: return @@ -699,7 +361,6 @@ class MumbleBot: await self.send(msg.target, line) return - # Attempt paste overflow fp = self.registry._modules.get("flaskpaste") paste_url = None if fp: @@ -729,9 +390,32 @@ class MumbleBot: # -- Voice streaming ----------------------------------------------------- - async def _send_voice_packet(self, packet: bytes) -> None: - """Send a voice packet via UDPTunnel (msg type 1).""" - await self._send_msg(MSG_UDPTUNNEL, packet) + async def test_tone(self, duration: float = 3.0) -> None: + """Send a 440Hz sine test tone for debugging voice output.""" + import math + + if self._mumble is None: + return + + log.info("test_tone: sending %.1fs of 440Hz sine", duration) + total_frames = int(duration / 0.02) + + for i in range(total_frames): + samples = [] + for j in range(_FRAME_SIZE): + t = (i * _FRAME_SIZE + j) / _SAMPLE_RATE + samples.append(int(16000 * math.sin(2 * math.pi * 440 * t))) + pcm = struct.pack(f"<{_FRAME_SIZE}h", *samples) + self._mumble.sound_output.add_sound(pcm) + + # Keep buffer shallow so we can cancel promptly + while self._mumble.sound_output.get_buffer_size() > 0.5: + await asyncio.sleep(0.05) + + # Wait for buffer to drain + while self._mumble.sound_output.get_buffer_size() > 0: + await asyncio.sleep(0.1) + log.info("test_tone: done") async def stream_audio( self, @@ -746,16 +430,14 @@ class MumbleBot: yt-dlp -o - -f bestaudio | ffmpeg -i pipe:0 -f s16le -ar 48000 -ac 1 pipe:1 - Reads 1920 bytes (20ms frames), scales volume, encodes Opus, - wraps in voice packets, sends at 20ms intervals. Sets terminator - flag on the last frame. - - Args: - url: Audio URL (YouTube, SoundCloud, etc.) - volume: Volume scale factor (0.0 to 1.0). - on_done: Optional asyncio.Event to set when playback ends. + Feeds raw PCM to pymumble's sound_output which handles Opus + encoding, packetization, and timing. """ - from derp.opus import FRAME_BYTES, OpusEncoder + if self._mumble is None: + return + + log.info("stream_audio: starting pipeline for %s (vol=%.0f%%)", + url, volume * 100) proc = await asyncio.create_subprocess_exec( "sh", "-c", @@ -763,44 +445,50 @@ class MumbleBot: f" | ffmpeg -i pipe:0 -f s16le -ar 48000 -ac 1" f" -loglevel error pipe:1", stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.PIPE, ) - encoder = OpusEncoder() - sequence = 0 + frames = 0 try: while True: - pcm = await proc.stdout.read(FRAME_BYTES) + pcm = await proc.stdout.read(_FRAME_BYTES) if not pcm: break - if len(pcm) < FRAME_BYTES: - pcm += b"\x00" * (FRAME_BYTES - len(pcm)) + if len(pcm) < _FRAME_BYTES: + pcm += b"\x00" * (_FRAME_BYTES - len(pcm)) if volume != 1.0: pcm = _scale_pcm(pcm, volume) - opus_data = encoder.encode(pcm) - pkt = _build_voice_packet(sequence, opus_data) - await self._send_voice_packet(pkt) - sequence += 1 + self._mumble.sound_output.add_sound(pcm) + frames += 1 - # Pace at 20ms per frame - await asyncio.sleep(0.02) + if frames == 1: + log.info("stream_audio: first frame fed to pymumble") - # Send terminator frame (silence) - silence = b"\x00" * FRAME_BYTES - opus_data = encoder.encode(silence) - pkt = _build_voice_packet(sequence, opus_data, last=True) - await self._send_voice_packet(pkt) + # Keep buffer at most 1 second ahead + while self._mumble.sound_output.get_buffer_size() > 1.0: + await asyncio.sleep(0.05) + + # Wait for buffer to drain + while self._mumble.sound_output.get_buffer_size() > 0: + await asyncio.sleep(0.1) + log.info("stream_audio: finished, %d frames", frames) except asyncio.CancelledError: - pass + self._mumble.sound_output.clear_buffer() + log.info("stream_audio: cancelled at frame %d", frames) + except Exception: + log.exception("stream_audio: error at frame %d", frames) finally: - encoder.close() try: proc.kill() except ProcessLookupError: pass + stderr_out = await proc.stderr.read() await proc.wait() + if stderr_out: + log.warning("stream_audio: subprocess stderr: %s", + stderr_out.decode(errors="replace")[:500]) if on_done is not None: on_done.set() @@ -819,11 +507,9 @@ class MumbleBot: async def join(self, channel: str) -> None: """No-op: IRC-only concept.""" - log.debug("mumble: join() is a no-op") async def part(self, channel: str, reason: str = "") -> None: """No-op: IRC-only concept.""" - log.debug("mumble: part() is a no-op") async def quit(self, reason: str = "bye") -> None: """Stop the Mumble adapter.""" @@ -831,15 +517,12 @@ class MumbleBot: async def kick(self, channel: str, nick: str, reason: str = "") -> None: """No-op: IRC-only concept.""" - log.debug("mumble: kick() is a no-op") async def mode(self, target: str, mode_str: str, *args: str) -> None: """No-op: IRC-only concept.""" - log.debug("mumble: mode() is a no-op") async def set_topic(self, channel: str, topic: str) -> None: """No-op: IRC-only concept.""" - log.debug("mumble: set_topic() is a no-op") # -- Plugin management (delegated to registry) --------------------------- diff --git a/src/derp/opus.py b/src/derp/opus.py deleted file mode 100644 index f751d17..0000000 --- a/src/derp/opus.py +++ /dev/null @@ -1,100 +0,0 @@ -"""Minimal ctypes wrapper around system libopus for encoding only.""" - -from __future__ import annotations - -import ctypes -import ctypes.util -from ctypes import POINTER, c_char_p, c_int, c_int32 - -SAMPLE_RATE = 48000 -CHANNELS = 1 -FRAME_SIZE = 960 # 20ms at 48kHz mono -FRAME_BYTES = 1920 # FRAME_SIZE * CHANNELS * 2 (s16le) - -_APPLICATION_AUDIO = 2049 - -_OPUS_SET_BITRATE_REQUEST = 4002 -_OPUS_OK = 0 - -_lib: ctypes.CDLL | None = None - - -def _load_lib() -> ctypes.CDLL: - """Find and load libopus, cached after first call.""" - global _lib - if _lib is not None: - return _lib - - path = ctypes.util.find_library("opus") - if path is None: - path = "libopus.so.0" - - lib = ctypes.cdll.LoadLibrary(path) - - lib.opus_encoder_get_size.argtypes = [c_int] - lib.opus_encoder_get_size.restype = c_int - - lib.opus_encoder_init.argtypes = [c_char_p, c_int32, c_int, c_int] - lib.opus_encoder_init.restype = c_int - - lib.opus_encode.argtypes = [ - c_char_p, # encoder state - c_char_p, # pcm input - c_int, # frame_size (samples per channel) - POINTER(ctypes.c_ubyte), # output buffer - c_int32, # max output bytes - ] - lib.opus_encode.restype = c_int - - lib.opus_encoder_ctl.argtypes = [c_char_p, c_int] - lib.opus_encoder_ctl.restype = c_int - - _lib = lib - return lib - - -class OpusEncoder: - """Opus encoder for 48kHz mono s16le PCM -> Opus frames.""" - - def __init__(self, bitrate: int = 64000) -> None: - lib = _load_lib() - size = lib.opus_encoder_get_size(CHANNELS) - self._state = ctypes.create_string_buffer(size) - rc = lib.opus_encoder_init( - self._state, SAMPLE_RATE, CHANNELS, _APPLICATION_AUDIO, - ) - if rc != _OPUS_OK: - raise RuntimeError(f"opus_encoder_init failed: {rc}") - - rc = lib.opus_encoder_ctl( - self._state, _OPUS_SET_BITRATE_REQUEST, c_int32(bitrate), - ) - if rc != _OPUS_OK: - raise RuntimeError(f"opus_encoder_ctl set bitrate failed: {rc}") - - self._lib = lib - self._out = (ctypes.c_ubyte * 4000)() - - def encode(self, pcm: bytes) -> bytes: - """Encode one 20ms frame of s16le PCM to an Opus packet. - - Args: - pcm: Exactly 1920 bytes (960 samples, 48kHz mono s16le). - - Returns: - Opus-encoded frame bytes. - """ - if len(pcm) != FRAME_BYTES: - raise ValueError( - f"expected {FRAME_BYTES} bytes, got {len(pcm)}" - ) - n = self._lib.opus_encode( - self._state, pcm, FRAME_SIZE, self._out, len(self._out), - ) - if n < 0: - raise RuntimeError(f"opus_encode failed: {n}") - return bytes(self._out[:n]) - - def close(self) -> None: - """Release encoder state.""" - self._state = None diff --git a/tests/test_mumble.py b/tests/test_mumble.py index e69dec6..aa8f70a 100644 --- a/tests/test_mumble.py +++ b/tests/test_mumble.py @@ -1,41 +1,16 @@ """Tests for the Mumble adapter.""" import asyncio -from unittest.mock import patch +import struct +from unittest.mock import patch, MagicMock from derp.mumble import ( - _WIRE_LEN, - _WIRE_VARINT, - MSG_CHANNEL_REMOVE, - MSG_CHANNEL_STATE, - MSG_PING, - MSG_SERVER_SYNC, - MSG_TEXT_MESSAGE, - MSG_UDPTUNNEL, - MSG_USER_REMOVE, - MSG_USER_STATE, MumbleBot, MumbleMessage, - _build_authenticate_payload, - _build_mumble_message, - _build_ping_payload, - _build_text_message_payload, - _build_version_payload, - _build_voice_packet, - _decode_fields, - _decode_varint, - _encode_field, - _encode_varint, - _encode_voice_varint, _escape_html, - _field_int, - _field_ints, - _field_str, - _pack_msg, _scale_pcm, _shell_quote, _strip_html, - _unpack_header, ) from derp.plugin import PluginRegistry @@ -51,7 +26,6 @@ def _make_bot(admins=None, operators=None, trusted=None, prefix=None): "port": 64738, "username": "derp", "password": "", - "tls_verify": False, "admins": admins or [], "operators": operators or [], "trusted": trusted or [], @@ -94,120 +68,6 @@ async def _admin_handler(bot, msg): await bot.reply(msg, "admin action done") -# --------------------------------------------------------------------------- -# TestProtobufCodec -# --------------------------------------------------------------------------- - - -class TestProtobufCodec: - def test_encode_varint_zero(self): - assert _encode_varint(0) == b"\x00" - - def test_encode_varint_small(self): - assert _encode_varint(1) == b"\x01" - assert _encode_varint(127) == b"\x7f" - - def test_encode_varint_two_byte(self): - assert _encode_varint(128) == b"\x80\x01" - assert _encode_varint(300) == b"\xac\x02" - - def test_encode_varint_large(self): - # 16384 = 0x4000 - encoded = _encode_varint(16384) - val, _ = _decode_varint(encoded, 0) - assert val == 16384 - - def test_decode_varint_zero(self): - val, off = _decode_varint(b"\x00", 0) - assert val == 0 - assert off == 1 - - def test_decode_varint_small(self): - val, off = _decode_varint(b"\x01", 0) - assert val == 1 - assert off == 1 - - def test_decode_varint_multi_byte(self): - val, off = _decode_varint(b"\xac\x02", 0) - assert val == 300 - assert off == 2 - - def test_varint_roundtrip(self): - for n in [0, 1, 127, 128, 300, 16384, 2**21, 2**28]: - encoded = _encode_varint(n) - decoded, _ = _decode_varint(encoded, 0) - assert decoded == n, f"roundtrip failed for {n}" - - def test_encode_field_varint(self): - # field 1, wire type 0, value 42 - data = _encode_field(1, _WIRE_VARINT, 42) - fields = _decode_fields(data) - assert fields[1] == [42] - - def test_encode_field_string(self): - data = _encode_field(5, _WIRE_LEN, "hello") - fields = _decode_fields(data) - assert fields[5] == [b"hello"] - - def test_encode_field_bytes(self): - data = _encode_field(3, _WIRE_LEN, b"\x00\x01\x02") - fields = _decode_fields(data) - assert fields[3] == [b"\x00\x01\x02"] - - def test_decode_multiple_fields(self): - data = ( - _encode_field(1, _WIRE_VARINT, 10) - + _encode_field(2, _WIRE_LEN, "test") - + _encode_field(3, _WIRE_VARINT, 99) - ) - fields = _decode_fields(data) - assert fields[1] == [10] - assert fields[2] == [b"test"] - assert fields[3] == [99] - - def test_decode_repeated_fields(self): - data = ( - _encode_field(3, _WIRE_VARINT, 1) - + _encode_field(3, _WIRE_VARINT, 2) - + _encode_field(3, _WIRE_VARINT, 3) - ) - fields = _decode_fields(data) - assert fields[3] == [1, 2, 3] - - def test_pack_unpack_header(self): - packed = _pack_msg(11, b"hello") - msg_type, length = _unpack_header(packed[:6]) - assert msg_type == 11 - assert length == 5 - assert packed[6:] == b"hello" - - def test_pack_empty_payload(self): - packed = _pack_msg(3) - assert len(packed) == 6 - msg_type, length = _unpack_header(packed) - assert msg_type == 3 - assert length == 0 - - def test_field_str(self): - fields = {5: [b"hello"]} - assert _field_str(fields, 5) == "hello" - assert _field_str(fields, 1) is None - - def test_field_int(self): - fields = {1: [42]} - assert _field_int(fields, 1) == 42 - assert _field_int(fields, 2) is None - - def test_field_ints(self): - fields = {3: [1, 2, 3]} - assert _field_ints(fields, 3) == [1, 2, 3] - assert _field_ints(fields, 9) == [] - - def test_decode_empty(self): - fields = _decode_fields(b"") - assert fields == {} - - # --------------------------------------------------------------------------- # TestMumbleMessage # --------------------------------------------------------------------------- @@ -253,78 +113,6 @@ class TestMumbleMessage: assert msg.prefix == "admin_user" -# --------------------------------------------------------------------------- -# TestBuildMumbleMessage -# --------------------------------------------------------------------------- - - -class TestBuildMumbleMessage: - def test_channel_message(self): - fields = { - 1: [42], # actor session - 3: [5], # channel_id - 5: [b"Hello"], # message HTML - } - users = {42: "Alice"} - msg = _build_mumble_message(fields, users, our_session=1) - assert msg is not None - assert msg.nick == "Alice" - assert msg.prefix == "Alice" - assert msg.text == "Hello" - assert msg.target == "5" - assert msg.is_channel is True - - def test_dm_message(self): - fields = { - 1: [42], # actor session - 2: [1], # target session (DM) - 5: [b"secret"], # message - } - users = {42: "Bob"} - msg = _build_mumble_message(fields, users, our_session=1) - assert msg is not None - assert msg.target == "dm" - assert msg.is_channel is False - - def test_missing_sender(self): - fields = { - 3: [0], # channel_id - 5: [b"anonymous"], # message - } - msg = _build_mumble_message(fields, {}, our_session=1) - assert msg is not None - assert msg.nick is None - assert msg.prefix is None - - def test_missing_message(self): - fields = { - 1: [42], - 3: [0], - } - msg = _build_mumble_message(fields, {42: "Alice"}, our_session=1) - assert msg is None - - def test_html_stripped(self): - fields = { - 1: [1], - 3: [0], - 5: [b"click & go"], - } - msg = _build_mumble_message(fields, {1: "User"}, our_session=0) - assert msg is not None - assert msg.text == "click & go" - - def test_no_target(self): - fields = { - 1: [42], - 5: [b"orphan message"], - } - msg = _build_mumble_message(fields, {42: "Alice"}, our_session=1) - assert msg is not None - assert msg.target is None - assert msg.is_channel is True - - # --------------------------------------------------------------------------- # TestHtmlHelpers # --------------------------------------------------------------------------- @@ -356,36 +144,27 @@ class TestHtmlHelpers: class TestMumbleBotReply: - def test_send_builds_text_message(self): + def test_send_calls_send_html(self): bot = _make_bot() - sent: list[tuple[int, bytes]] = [] + sent: list[tuple[str, str]] = [] - async def _fake_send_msg(msg_type, payload=b""): - sent.append((msg_type, payload)) + async def _fake_send_html(target, html_text): + sent.append((target, html_text)) - with patch.object(bot, "_send_msg", side_effect=_fake_send_msg): + with patch.object(bot, "_send_html", side_effect=_fake_send_html): asyncio.run(bot.send("5", "hello")) - assert len(sent) == 1 - assert sent[0][0] == MSG_TEXT_MESSAGE - # Verify payload contains the message - fields = _decode_fields(sent[0][1]) - text = _field_str(fields, 5) - assert text == "hello" - assert _field_ints(fields, 3) == [5] + assert sent == [("5", "hello")] def test_send_escapes_html(self): bot = _make_bot() - sent: list[tuple[int, bytes]] = [] + sent: list[tuple[str, str]] = [] - async def _fake_send_msg(msg_type, payload=b""): - sent.append((msg_type, payload)) + async def _fake_send_html(target, html_text): + sent.append((target, html_text)) - with patch.object(bot, "_send_msg", side_effect=_fake_send_msg): + with patch.object(bot, "_send_html", side_effect=_fake_send_html): asyncio.run(bot.send("0", "