diff --git a/plugins/voice.py b/plugins/voice.py new file mode 100644 index 0000000..f6f4d0e --- /dev/null +++ b/plugins/voice.py @@ -0,0 +1,309 @@ +"""Plugin: voice STT/TTS for Mumble channels. + +Listens for voice audio via pymumble's sound callback, buffers PCM per +user, transcribes via Whisper STT on silence, and provides TTS playback +via Piper. Commands: !listen, !say. +""" + +from __future__ import annotations + +import asyncio +import io +import json +import logging +import threading +import time +import wave +from urllib.parse import quote_plus +import urllib.request + +from derp.http import urlopen as _urlopen +from derp.plugin import command + +log = logging.getLogger(__name__) + +# -- Constants --------------------------------------------------------------- + +_SAMPLE_RATE = 48000 +_CHANNELS = 1 +_SAMPLE_WIDTH = 2 # s16le = 2 bytes per sample + +_SILENCE_GAP = 1.5 # seconds of silence before flushing +_MIN_DURATION = 0.5 # discard utterances shorter than this +_MAX_DURATION = 30.0 # cap buffer at this many seconds +_MIN_BYTES = int(_MIN_DURATION * _SAMPLE_RATE * _SAMPLE_WIDTH) +_MAX_BYTES = int(_MAX_DURATION * _SAMPLE_RATE * _SAMPLE_WIDTH) +_FLUSH_INTERVAL = 0.5 # flush monitor poll interval +_MAX_SAY_LEN = 500 # max characters for !say + +_WHISPER_URL = "http://192.168.122.1:8080/inference" +_PIPER_URL = "http://192.168.122.1:5000/" + +# -- Per-bot state ----------------------------------------------------------- + + +def _ps(bot): + """Per-bot plugin runtime state.""" + cfg = getattr(bot, "config", {}).get("voice", {}) + return bot._pstate.setdefault("voice", { + "listen": False, + "buffers": {}, # {username: bytearray} + "last_ts": {}, # {username: float monotonic} + "flush_task": None, + "lock": threading.Lock(), + "silence_gap": cfg.get("silence_gap", _SILENCE_GAP), + "whisper_url": cfg.get("whisper_url", _WHISPER_URL), + "piper_url": cfg.get("piper_url", _PIPER_URL), + "_listener_registered": False, + }) + + +# -- Helpers ----------------------------------------------------------------- + + +def _is_mumble(bot) -> bool: + """Check if bot supports voice streaming.""" + return hasattr(bot, "stream_audio") + + +def _pcm_to_wav(pcm: bytes) -> bytes: + """Wrap raw s16le 48kHz mono PCM in a WAV container.""" + buf = io.BytesIO() + with wave.open(buf, "wb") as wf: + wf.setnchannels(_CHANNELS) + wf.setsampwidth(_SAMPLE_WIDTH) + wf.setframerate(_SAMPLE_RATE) + wf.writeframes(pcm) + return buf.getvalue() + + +# -- STT: Sound listener (pymumble thread) ---------------------------------- + + +def _on_voice(bot, user, sound_chunk): + """Buffer incoming voice PCM per user. Runs on pymumble thread.""" + ps = _ps(bot) + if not ps["listen"]: + return + name = user["name"] if isinstance(user, dict) else None + if not name or name == bot.nick: + return + pcm = sound_chunk.pcm + if not pcm: + return + with ps["lock"]: + if name not in ps["buffers"]: + ps["buffers"][name] = bytearray() + buf = ps["buffers"][name] + buf.extend(pcm) + if len(buf) > _MAX_BYTES: + ps["buffers"][name] = bytearray(buf[-_MAX_BYTES:]) + ps["last_ts"][name] = time.monotonic() + + +# -- STT: Whisper transcription --------------------------------------------- + + +def _transcribe(ps, pcm: bytes) -> str: + """POST PCM (as WAV) to Whisper and return transcribed text. Blocking.""" + wav_data = _pcm_to_wav(pcm) + boundary = "----derp_voice_boundary" + body = ( + f"--{boundary}\r\n" + f'Content-Disposition: form-data; name="file"; filename="audio.wav"\r\n' + f"Content-Type: audio/wav\r\n\r\n" + ).encode() + wav_data + ( + f"\r\n--{boundary}\r\n" + f'Content-Disposition: form-data; name="response_format"\r\n\r\n' + f"json\r\n--{boundary}--\r\n" + ).encode() + req = urllib.request.Request(ps["whisper_url"], data=body, method="POST") + req.add_header("Content-Type", f"multipart/form-data; boundary={boundary}") + resp = _urlopen(req, timeout=30, proxy=False) + data = json.loads(resp.read()) + resp.close() + return data.get("text", "").strip() + + +# -- STT: Flush monitor (asyncio background task) --------------------------- + + +async def _flush_monitor(bot): + """Poll for silence gaps and transcribe completed utterances.""" + ps = _ps(bot) + loop = asyncio.get_running_loop() + try: + while ps["listen"]: + await asyncio.sleep(_FLUSH_INTERVAL) + now = time.monotonic() + to_flush: list[tuple[str, bytes]] = [] + + with ps["lock"]: + for name in list(ps["last_ts"]): + elapsed = now - ps["last_ts"][name] + if elapsed >= ps["silence_gap"] and name in ps["buffers"]: + pcm = bytes(ps["buffers"].pop(name)) + del ps["last_ts"][name] + to_flush.append((name, pcm)) + + for name, pcm in to_flush: + if len(pcm) < _MIN_BYTES: + continue + try: + text = await loop.run_in_executor( + None, _transcribe, ps, pcm, + ) + except Exception: + log.exception("voice: transcription failed for %s", name) + continue + if not text or text.strip("., ") == "": + continue + log.info("voice: %s said: %s", name, text) + await bot.action("0", f"heard {name} say: {text}") + except asyncio.CancelledError: + pass + except Exception: + log.exception("voice: flush monitor error") + + +# -- TTS: Piper fetch + playback -------------------------------------------- + + +def _fetch_tts(url: str) -> str | None: + """Fetch TTS WAV from Piper to a temp file. Blocking.""" + import tempfile + try: + resp = _urlopen(url, timeout=30, proxy=False) + data = resp.read() + resp.close() + if not data: + return None + tmp = tempfile.NamedTemporaryFile( + suffix=".wav", prefix="derp_tts_", delete=False, + ) + tmp.write(data) + tmp.close() + return tmp.name + except Exception: + log.exception("voice: TTS fetch failed") + return None + + +async def _tts_play(bot, text: str): + """Fetch TTS audio and play it via stream_audio.""" + from pathlib import Path + + ps = _ps(bot) + url = ps["piper_url"] + "?text=" + quote_plus(text) + loop = asyncio.get_running_loop() + wav_path = await loop.run_in_executor(None, _fetch_tts, url) + if wav_path is None: + return + try: + done = asyncio.Event() + await bot.stream_audio(str(wav_path), volume=1.0, on_done=done) + await done.wait() + finally: + Path(wav_path).unlink(missing_ok=True) + + +# -- Listener lifecycle ----------------------------------------------------- + + +def _ensure_listener(bot): + """Register the sound listener callback (idempotent).""" + ps = _ps(bot) + if ps["_listener_registered"]: + return + if not hasattr(bot, "_sound_listeners"): + return + bot._sound_listeners.append(lambda user, chunk: _on_voice(bot, user, chunk)) + ps["_listener_registered"] = True + log.info("voice: registered sound listener") + + +def _ensure_flush_task(bot): + """Start the flush monitor if not running.""" + ps = _ps(bot) + task = ps.get("flush_task") + if task and not task.done(): + return + ps["flush_task"] = bot._spawn( + _flush_monitor(bot), name="voice-flush-monitor", + ) + + +def _stop_flush_task(bot): + """Cancel the flush monitor.""" + ps = _ps(bot) + task = ps.get("flush_task") + if task and not task.done(): + task.cancel() + ps["flush_task"] = None + + +# -- Commands ---------------------------------------------------------------- + + +@command("listen", help="Voice: !listen [on|off] -- toggle STT", tier="admin") +async def cmd_listen(bot, message): + """Toggle voice-to-text transcription.""" + if not _is_mumble(bot): + await bot.reply(message, "Voice is Mumble-only") + return + + ps = _ps(bot) + parts = message.text.split() + if len(parts) < 2: + state = "on" if ps["listen"] else "off" + await bot.reply(message, f"Listen: {state}") + return + + sub = parts[1].lower() + if sub == "on": + ps["listen"] = True + _ensure_listener(bot) + _ensure_flush_task(bot) + await bot.reply(message, "Listening for voice") + elif sub == "off": + ps["listen"] = False + with ps["lock"]: + ps["buffers"].clear() + ps["last_ts"].clear() + _stop_flush_task(bot) + await bot.reply(message, "Stopped listening") + else: + await bot.reply(message, "Usage: !listen [on|off]") + + +@command("say", help="Voice: !say -- text-to-speech") +async def cmd_say(bot, message): + """Speak text aloud via Piper TTS.""" + if not _is_mumble(bot): + await bot.reply(message, "Voice is Mumble-only") + return + + parts = message.text.split(None, 1) + if len(parts) < 2: + await bot.reply(message, "Usage: !say ") + return + + text = parts[1].strip() + if len(text) > _MAX_SAY_LEN: + await bot.reply(message, f"Text too long (max {_MAX_SAY_LEN} chars)") + return + + bot._spawn(_tts_play(bot, text), name="voice-tts") + + +# -- Plugin lifecycle -------------------------------------------------------- + + +async def on_connected(bot) -> None: + """Re-register listener after reconnect if listen was on.""" + if not _is_mumble(bot): + return + ps = _ps(bot) + if ps["listen"]: + _ensure_listener(bot) + _ensure_flush_task(bot) diff --git a/tests/test_voice.py b/tests/test_voice.py new file mode 100644 index 0000000..dc8d679 --- /dev/null +++ b/tests/test_voice.py @@ -0,0 +1,534 @@ +"""Tests for the voice STT/TTS plugin.""" + +import asyncio +import importlib.util +import io +import sys +import time +import wave +from unittest.mock import AsyncMock, MagicMock, patch + +# -- Load plugin module directly --------------------------------------------- + +_spec = importlib.util.spec_from_file_location("voice", "plugins/voice.py") +_mod = importlib.util.module_from_spec(_spec) +sys.modules["voice"] = _mod +_spec.loader.exec_module(_mod) + + +# -- Fakes ------------------------------------------------------------------- + + +class _FakeState: + def __init__(self): + self._store: dict[str, dict[str, str]] = {} + + def get(self, ns: str, key: str) -> str | None: + return self._store.get(ns, {}).get(key) + + def set(self, ns: str, key: str, value: str) -> None: + self._store.setdefault(ns, {})[key] = value + + def delete(self, ns: str, key: str) -> None: + self._store.get(ns, {}).pop(key, None) + + def keys(self, ns: str) -> list[str]: + return list(self._store.get(ns, {}).keys()) + + +class _FakeBot: + """Minimal bot for voice plugin testing.""" + + def __init__(self, *, mumble: bool = True): + self.sent: list[tuple[str, str]] = [] + self.replied: list[str] = [] + self.actions: list[tuple[str, str]] = [] + self.state = _FakeState() + self.config: dict = {} + self._pstate: dict = {} + self._tasks: set[asyncio.Task] = set() + self.nick = "derp" + self._sound_listeners: list = [] + if mumble: + self.stream_audio = AsyncMock() + + async def send(self, target: str, text: str) -> None: + self.sent.append((target, text)) + + async def reply(self, message, text: str) -> None: + self.replied.append(text) + + async def action(self, target: str, text: str) -> None: + self.actions.append((target, text)) + + def _spawn(self, coro, *, name=None): + task = asyncio.ensure_future(coro) + self._tasks.add(task) + task.add_done_callback(self._tasks.discard) + return task + + +class _Msg: + """Minimal message object.""" + + def __init__(self, text="!listen", nick="Alice", target="0", + is_channel=True): + self.text = text + self.nick = nick + self.target = target + self.is_channel = is_channel + self.prefix = nick + self.command = "PRIVMSG" + self.params = [target, text] + self.tags = {} + self.raw = {} + + +class _FakeSoundChunk: + """Minimal sound chunk with PCM data.""" + + def __init__(self, pcm: bytes = b"\x00\x00" * 960): + self.pcm = pcm + + +# --------------------------------------------------------------------------- +# TestMumbleGuard +# --------------------------------------------------------------------------- + + +class TestMumbleGuard: + def test_is_mumble_true(self): + bot = _FakeBot(mumble=True) + assert _mod._is_mumble(bot) is True + + def test_is_mumble_false(self): + bot = _FakeBot(mumble=False) + assert _mod._is_mumble(bot) is False + + def test_listen_non_mumble(self): + bot = _FakeBot(mumble=False) + msg = _Msg(text="!listen on") + asyncio.run(_mod.cmd_listen(bot, msg)) + assert any("Mumble-only" in r for r in bot.replied) + + def test_say_non_mumble(self): + bot = _FakeBot(mumble=False) + msg = _Msg(text="!say hello") + asyncio.run(_mod.cmd_say(bot, msg)) + assert any("Mumble-only" in r for r in bot.replied) + + +# --------------------------------------------------------------------------- +# TestListenCommand +# --------------------------------------------------------------------------- + + +class TestListenCommand: + def test_listen_status(self): + bot = _FakeBot() + msg = _Msg(text="!listen") + asyncio.run(_mod.cmd_listen(bot, msg)) + assert any("off" in r.lower() for r in bot.replied) + + def test_listen_on(self): + bot = _FakeBot() + msg = _Msg(text="!listen on") + asyncio.run(_mod.cmd_listen(bot, msg)) + ps = _mod._ps(bot) + assert ps["listen"] is True + assert any("Listening" in r for r in bot.replied) + + def test_listen_off(self): + bot = _FakeBot() + ps = _mod._ps(bot) + ps["listen"] = True + ps["buffers"]["Alice"] = bytearray(b"\x00" * 100) + ps["last_ts"]["Alice"] = time.monotonic() + msg = _Msg(text="!listen off") + asyncio.run(_mod.cmd_listen(bot, msg)) + assert ps["listen"] is False + assert ps["buffers"] == {} + assert ps["last_ts"] == {} + assert any("Stopped" in r for r in bot.replied) + + def test_listen_invalid(self): + bot = _FakeBot() + msg = _Msg(text="!listen maybe") + asyncio.run(_mod.cmd_listen(bot, msg)) + assert any("Usage" in r for r in bot.replied) + + +# --------------------------------------------------------------------------- +# TestSayCommand +# --------------------------------------------------------------------------- + + +class TestSayCommand: + def test_say_no_text(self): + bot = _FakeBot() + msg = _Msg(text="!say") + asyncio.run(_mod.cmd_say(bot, msg)) + assert any("Usage" in r for r in bot.replied) + + def test_say_too_long(self): + bot = _FakeBot() + text = "x" * 501 + msg = _Msg(text=f"!say {text}") + asyncio.run(_mod.cmd_say(bot, msg)) + assert any("too long" in r.lower() for r in bot.replied) + + def test_say_spawns_task(self): + bot = _FakeBot() + msg = _Msg(text="!say hello world") + + spawned = [] + original_spawn = bot._spawn + + def track_spawn(coro, *, name=None): + spawned.append(name) + coro.close() + task = MagicMock() + task.done.return_value = False + return task + + bot._spawn = track_spawn + asyncio.run(_mod.cmd_say(bot, msg)) + assert "voice-tts" in spawned + + +# --------------------------------------------------------------------------- +# TestAudioBuffering +# --------------------------------------------------------------------------- + + +class TestAudioBuffering: + def test_accumulates_pcm(self): + bot = _FakeBot() + ps = _mod._ps(bot) + ps["listen"] = True + user = {"name": "Alice"} + chunk = _FakeSoundChunk(b"\x01\x02" * 480) + _mod._on_voice(bot, user, chunk) + assert "Alice" in ps["buffers"] + assert len(ps["buffers"]["Alice"]) == 960 + + def test_ignores_own_nick(self): + bot = _FakeBot() + ps = _mod._ps(bot) + ps["listen"] = True + user = {"name": "derp"} + chunk = _FakeSoundChunk(b"\x01\x02" * 480) + _mod._on_voice(bot, user, chunk) + assert "derp" not in ps["buffers"] + + def test_respects_listen_false(self): + bot = _FakeBot() + ps = _mod._ps(bot) + ps["listen"] = False + user = {"name": "Alice"} + chunk = _FakeSoundChunk(b"\x01\x02" * 480) + _mod._on_voice(bot, user, chunk) + assert ps["buffers"] == {} + + def test_caps_at_max_bytes(self): + bot = _FakeBot() + ps = _mod._ps(bot) + ps["listen"] = True + user = {"name": "Alice"} + # Fill beyond max + big_chunk = _FakeSoundChunk(b"\x00\x01" * (_mod._MAX_BYTES // 2 + 100)) + _mod._on_voice(bot, user, big_chunk) + assert len(ps["buffers"]["Alice"]) <= _mod._MAX_BYTES + + def test_empty_pcm_ignored(self): + bot = _FakeBot() + ps = _mod._ps(bot) + ps["listen"] = True + user = {"name": "Alice"} + chunk = _FakeSoundChunk(b"") + _mod._on_voice(bot, user, chunk) + assert "Alice" not in ps["buffers"] + + def test_none_user_ignored(self): + bot = _FakeBot() + ps = _mod._ps(bot) + ps["listen"] = True + chunk = _FakeSoundChunk(b"\x01\x02" * 480) + _mod._on_voice(bot, "not_a_dict", chunk) + assert ps["buffers"] == {} + + def test_updates_timestamp(self): + bot = _FakeBot() + ps = _mod._ps(bot) + ps["listen"] = True + user = {"name": "Alice"} + chunk = _FakeSoundChunk(b"\x01\x02" * 480) + _mod._on_voice(bot, user, chunk) + assert "Alice" in ps["last_ts"] + ts1 = ps["last_ts"]["Alice"] + _mod._on_voice(bot, user, chunk) + assert ps["last_ts"]["Alice"] >= ts1 + + +# --------------------------------------------------------------------------- +# TestFlushLogic +# --------------------------------------------------------------------------- + + +class TestFlushLogic: + def test_silence_gap_triggers_flush(self): + """Buffer is flushed and transcribed after silence gap.""" + bot = _FakeBot() + ps = _mod._ps(bot) + ps["listen"] = True + ps["silence_gap"] = 0.1 # very short for testing + + # Pre-populate buffer with enough PCM (> _MIN_BYTES) + pcm = b"\x00\x01" * (_mod._MIN_BYTES // 2 + 100) + with ps["lock"]: + ps["buffers"]["Alice"] = bytearray(pcm) + ps["last_ts"]["Alice"] = time.monotonic() - 1.0 # already silent + + async def _check(): + with patch.object(_mod, "_transcribe", return_value="hello"): + task = asyncio.create_task(_mod._flush_monitor(bot)) + await asyncio.sleep(1.0) + ps["listen"] = False # stop the monitor + await asyncio.sleep(0.2) + try: + await asyncio.wait_for(task, timeout=2) + except (asyncio.CancelledError, asyncio.TimeoutError): + pass + assert any("hello" in a[1] for a in bot.actions) + + asyncio.run(_check()) + + def test_min_duration_filter(self): + """Short utterances (< _MIN_BYTES) are discarded.""" + bot = _FakeBot() + ps = _mod._ps(bot) + ps["listen"] = True + ps["silence_gap"] = 0.1 + + # Buffer too small + with ps["lock"]: + ps["buffers"]["Alice"] = bytearray(b"\x00\x01" * 10) + ps["last_ts"]["Alice"] = time.monotonic() - 1.0 + + async def _check(): + with patch.object(_mod, "_transcribe", return_value="x") as mock_t: + task = asyncio.create_task(_mod._flush_monitor(bot)) + await asyncio.sleep(0.5) + ps["listen"] = False + await asyncio.sleep(0.2) + try: + await asyncio.wait_for(task, timeout=2) + except (asyncio.CancelledError, asyncio.TimeoutError): + pass + mock_t.assert_not_called() + + asyncio.run(_check()) + + def test_buffer_cleared_after_flush(self): + """Buffer and timestamp are removed after flushing.""" + bot = _FakeBot() + ps = _mod._ps(bot) + ps["listen"] = True + ps["silence_gap"] = 0.1 + + pcm = b"\x00\x01" * (_mod._MIN_BYTES // 2 + 100) + with ps["lock"]: + ps["buffers"]["Alice"] = bytearray(pcm) + ps["last_ts"]["Alice"] = time.monotonic() - 1.0 + + async def _check(): + with patch.object(_mod, "_transcribe", return_value="test"): + task = asyncio.create_task(_mod._flush_monitor(bot)) + await asyncio.sleep(0.5) + ps["listen"] = False + await asyncio.sleep(0.2) + try: + await asyncio.wait_for(task, timeout=2) + except (asyncio.CancelledError, asyncio.TimeoutError): + pass + assert "Alice" not in ps["buffers"] + assert "Alice" not in ps["last_ts"] + + asyncio.run(_check()) + + +# --------------------------------------------------------------------------- +# TestPcmToWav +# --------------------------------------------------------------------------- + + +class TestPcmToWav: + def test_valid_wav(self): + pcm = b"\x00\x00" * 48000 # 1 second of silence + wav_data = _mod._pcm_to_wav(pcm) + # Should start with RIFF header + assert wav_data[:4] == b"RIFF" + # Parse it back + buf = io.BytesIO(wav_data) + with wave.open(buf, "rb") as wf: + assert wf.getnchannels() == 1 + assert wf.getsampwidth() == 2 + assert wf.getframerate() == 48000 + assert wf.getnframes() == 48000 + + def test_empty_pcm(self): + wav_data = _mod._pcm_to_wav(b"") + buf = io.BytesIO(wav_data) + with wave.open(buf, "rb") as wf: + assert wf.getnframes() == 0 + + +# --------------------------------------------------------------------------- +# TestTranscribe +# --------------------------------------------------------------------------- + + +class TestTranscribe: + def test_parse_json_response(self): + ps = {"whisper_url": "http://localhost:8080/inference"} + pcm = b"\x00\x00" * 4800 # 0.1s + resp = MagicMock() + resp.read.return_value = b'{"text": "hello world"}' + with patch.object(_mod, "_urlopen", return_value=resp): + text = _mod._transcribe(ps, pcm) + assert text == "hello world" + + def test_empty_text(self): + ps = {"whisper_url": "http://localhost:8080/inference"} + pcm = b"\x00\x00" * 4800 + resp = MagicMock() + resp.read.return_value = b'{"text": ""}' + with patch.object(_mod, "_urlopen", return_value=resp): + text = _mod._transcribe(ps, pcm) + assert text == "" + + def test_missing_text_key(self): + ps = {"whisper_url": "http://localhost:8080/inference"} + pcm = b"\x00\x00" * 4800 + resp = MagicMock() + resp.read.return_value = b'{"result": "something"}' + with patch.object(_mod, "_urlopen", return_value=resp): + text = _mod._transcribe(ps, pcm) + assert text == "" + + +# --------------------------------------------------------------------------- +# TestPerBotState +# --------------------------------------------------------------------------- + + +class TestPerBotState: + def test_ps_initializes(self): + bot = _FakeBot() + ps = _mod._ps(bot) + assert ps["listen"] is False + assert ps["buffers"] == {} + assert ps["last_ts"] == {} + + def test_ps_stable_reference(self): + bot = _FakeBot() + ps1 = _mod._ps(bot) + ps2 = _mod._ps(bot) + assert ps1 is ps2 + + def test_ps_isolated_per_bot(self): + bot1 = _FakeBot() + bot2 = _FakeBot() + _mod._ps(bot1)["listen"] = True + assert _mod._ps(bot2)["listen"] is False + + def test_ps_config_override(self): + bot = _FakeBot() + bot.config = {"voice": {"silence_gap": 3.0}} + ps = _mod._ps(bot) + assert ps["silence_gap"] == 3.0 + + +# --------------------------------------------------------------------------- +# TestEnsureListener +# --------------------------------------------------------------------------- + + +class TestEnsureListener: + def test_registers_callback(self): + bot = _FakeBot() + _mod._ps(bot) # init state + _mod._ensure_listener(bot) + assert len(bot._sound_listeners) == 1 + ps = _mod._ps(bot) + assert ps["_listener_registered"] is True + + def test_idempotent(self): + bot = _FakeBot() + _mod._ps(bot) + _mod._ensure_listener(bot) + _mod._ensure_listener(bot) + assert len(bot._sound_listeners) == 1 + + def test_no_listener_without_attr(self): + bot = _FakeBot() + del bot._sound_listeners + _mod._ps(bot) + _mod._ensure_listener(bot) + # Should not raise, just skip + + def test_callback_calls_on_voice(self): + bot = _FakeBot() + ps = _mod._ps(bot) + ps["listen"] = True + _mod._ensure_listener(bot) + user = {"name": "Alice"} + chunk = _FakeSoundChunk(b"\x01\x02" * 480) + bot._sound_listeners[0](user, chunk) + assert "Alice" in ps["buffers"] + + +# --------------------------------------------------------------------------- +# TestOnConnected +# --------------------------------------------------------------------------- + + +class TestOnConnected: + def test_reregisters_when_listening(self): + bot = _FakeBot() + ps = _mod._ps(bot) + ps["listen"] = True + + spawned = [] + + def fake_spawn(coro, *, name=None): + task = MagicMock() + task.done.return_value = False + spawned.append(name) + coro.close() + return task + + bot._spawn = fake_spawn + asyncio.run(_mod.on_connected(bot)) + assert ps["_listener_registered"] is True + assert "voice-flush-monitor" in spawned + + def test_noop_when_not_listening(self): + bot = _FakeBot() + _mod._ps(bot) # init but listen=False + + spawned = [] + + def fake_spawn(coro, *, name=None): + spawned.append(name) + coro.close() + return MagicMock() + + bot._spawn = fake_spawn + asyncio.run(_mod.on_connected(bot)) + assert "voice-flush-monitor" not in spawned + + def test_noop_non_mumble(self): + bot = _FakeBot(mumble=False) + asyncio.run(_mod.on_connected(bot)) + # Should not raise or register anything