Files
derp/plugins/voice.py
user 3c475107e3 refactor: simplify audition to single-bot playback
derp now handles both listening and speaking, so audition no longer
needs cross-bot lookup or dual-play through merlin.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 23:07:08 +01:00

618 lines
20 KiB
Python

"""Plugin: voice STT/TTS for Mumble channels.
Listens for voice audio via pymumble's sound callback, buffers PCM per
user, transcribes via Whisper STT on silence, and provides TTS playback
via Piper. Commands: !listen, !say.
"""
from __future__ import annotations
import asyncio
import io
import json
import logging
import math
import struct
import threading
import time
import urllib.request
import wave
from derp.http import urlopen as _urlopen
from derp.plugin import command
log = logging.getLogger(__name__)
# -- Constants ---------------------------------------------------------------
_SAMPLE_RATE = 48000  # Hz, Mumble-native sample rate
_CHANNELS = 1  # mono
_SAMPLE_WIDTH = 2 # s16le = 2 bytes per sample
_SILENCE_GAP = 1.5 # seconds of silence before flushing
_MIN_DURATION = 0.5 # discard utterances shorter than this
_MAX_DURATION = 30.0 # cap buffer at this many seconds
# Byte equivalents of the duration limits (mono s16le at 48 kHz).
_MIN_BYTES = int(_MIN_DURATION * _SAMPLE_RATE * _SAMPLE_WIDTH)
_MAX_BYTES = int(_MAX_DURATION * _SAMPLE_RATE * _SAMPLE_WIDTH)
_FLUSH_INTERVAL = 0.5 # flush monitor poll interval
_MAX_SAY_LEN = 500 # max characters for !say
# Default service endpoints; overridable via the "voice" config section.
_WHISPER_URL = "http://192.168.129.9:8080/inference"
_PIPER_URL = "http://192.168.129.9:5100/"
# -- Per-bot state -----------------------------------------------------------
def _ps(bot):
    """Per-bot plugin runtime state."""
    voice_cfg = getattr(bot, "config", {}).get("voice", {})
    wake_word = voice_cfg.get("trigger", "")
    # Seed Whisper's prompt with the trigger word so it is recognised
    # more reliably, unless an explicit prompt is configured.
    prompt_default = f"{wake_word.capitalize()}, " if wake_word else ""
    defaults = {
        "listen": False,
        "trigger": wake_word,
        "buffers": {},  # {username: bytearray of raw PCM}
        "last_ts": {},  # {username: monotonic time of last chunk}
        "flush_task": None,
        "lock": threading.Lock(),
        "silence_gap": voice_cfg.get("silence_gap", _SILENCE_GAP),
        "whisper_url": voice_cfg.get("whisper_url", _WHISPER_URL),
        "piper_url": voice_cfg.get("piper_url", _PIPER_URL),
        "voice": voice_cfg.get("voice", ""),
        "length_scale": voice_cfg.get("length_scale", 1.0),
        "noise_scale": voice_cfg.get("noise_scale", 0.667),
        "noise_w": voice_cfg.get("noise_w", 0.8),
        "fx": voice_cfg.get("fx", ""),
        "initial_prompt": voice_cfg.get("initial_prompt", prompt_default),
        "_listener_registered": False,
    }
    # setdefault keeps any previously-created state for this bot.
    return bot._pstate.setdefault("voice", defaults)
# -- Helpers -----------------------------------------------------------------
def _is_mumble(bot) -> bool:
"""Check if bot supports voice streaming."""
return hasattr(bot, "stream_audio")
def _pcm_to_wav(pcm: bytes) -> bytes:
    """Wrap raw s16le 48 kHz mono PCM in a WAV container and return it."""
    out = io.BytesIO()
    writer = wave.open(out, "wb")
    try:
        writer.setnchannels(_CHANNELS)
        writer.setsampwidth(_SAMPLE_WIDTH)
        writer.setframerate(_SAMPLE_RATE)
        writer.writeframes(pcm)
    finally:
        writer.close()
    return out.getvalue()
# -- Acknowledge tone --------------------------------------------------------
_ACK_FREQ = (880, 1320) # A5 -> E6 ascending
_ACK_NOTE_DUR = 0.15 # seconds per note
_ACK_AMP = 12000 # gentle amplitude
_ACK_FRAME = 960 # 20ms at 48kHz, matches Mumble native
async def _ack_tone(bot) -> None:
    """Play a short two-tone ascending chime via pymumble sound_output.

    Synthesizes sine-wave PCM frames inline and feeds them to the Mumble
    sound output, throttling so its buffer stays small, then waits for
    the buffer to drain. No-op when the bot has no pymumble connection
    or no sound output.
    """
    mu = getattr(bot, "_mumble", None)
    if mu is None:
        return
    so = mu.sound_output
    if so is None:
        return
    # Unmute if self-muted (stream_audio handles re-mute later)
    if getattr(bot, "_self_mute", False):
        # Cancel any pending re-mute task so it doesn't race with us.
        if bot._mute_task and not bot._mute_task.done():
            bot._mute_task.cancel()
            bot._mute_task = None
        try:
            mu.users.myself.unmute()
        except Exception:
            # Best effort: failing to unmute only means a silent chime.
            pass
    frames_per_note = int(_ACK_NOTE_DUR / 0.02) # 0.02s per frame
    for freq in _ACK_FREQ:
        for i in range(frames_per_note):
            # Build one 20ms frame of sine samples at this note's pitch.
            samples = []
            for j in range(_ACK_FRAME):
                t = (i * _ACK_FRAME + j) / _SAMPLE_RATE
                samples.append(int(_ACK_AMP * math.sin(2 * math.pi * freq * t)))
            pcm = struct.pack(f"<{_ACK_FRAME}h", *samples)
            so.add_sound(pcm)
            # Throttle feeding so the output buffer stays under ~0.5s.
            while so.get_buffer_size() > 0.5:
                await asyncio.sleep(0.02)
    # Wait for tone to finish
    while so.get_buffer_size() > 0:
        await asyncio.sleep(0.05)
# -- STT: Sound listener (pymumble thread) ----------------------------------
def _on_voice(bot, user, sound_chunk):
    """Buffer incoming voice PCM per user. Runs on pymumble thread.

    Appends the chunk's PCM to the speaker's buffer and stamps the last
    activity time; the asyncio flush monitor picks the buffer up after a
    silence gap. All buffer access is guarded by the per-bot lock since
    this runs off the event loop.
    """
    ps = _ps(bot)
    # Nothing to buffer unless listening or a wake word is configured.
    if not ps["listen"] and not ps["trigger"]:
        return
    try:
        name = user["name"]
    except (KeyError, TypeError):
        name = None
    # Skip anonymous chunks and our own audio.
    if not name or name == bot.nick:
        return
    pcm = sound_chunk.pcm
    if not pcm:
        return
    with ps["lock"]:
        if name not in ps["buffers"]:
            ps["buffers"][name] = bytearray()
        buf = ps["buffers"][name]
        buf.extend(pcm)
        if len(buf) > _MAX_BYTES:
            # Keep only the newest _MAX_DURATION seconds of audio.
            ps["buffers"][name] = bytearray(buf[-_MAX_BYTES:])
        ps["last_ts"][name] = time.monotonic()
# -- STT: Whisper transcription ---------------------------------------------
def _transcribe(ps, pcm: bytes) -> str:
    """POST PCM (as WAV) to Whisper and return transcribed text. Blocking.

    Builds a multipart/form-data body by hand (no third-party deps) with
    the WAV file, the response format, and an optional initial prompt
    that biases Whisper toward the configured trigger word.

    Raises whatever the HTTP layer or JSON parsing raises; the caller
    (flush monitor) logs and continues.
    """
    wav_data = _pcm_to_wav(pcm)
    boundary = "----derp_voice_boundary"
    body = (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="file"; filename="audio.wav"\r\n'
        f"Content-Type: audio/wav\r\n\r\n"
    ).encode() + wav_data + (
        f"\r\n--{boundary}\r\n"
        f'Content-Disposition: form-data; name="response_format"\r\n\r\n'
        f"json"
    ).encode()
    # Bias Whisper toward the trigger word when configured
    prompt = ps.get("initial_prompt", "")
    if prompt:
        body += (
            f"\r\n--{boundary}\r\n"
            f'Content-Disposition: form-data; name="initial_prompt"\r\n\r\n'
            f"{prompt}"
        ).encode()
    body += f"\r\n--{boundary}--\r\n".encode()
    req = urllib.request.Request(ps["whisper_url"], data=body, method="POST")
    req.add_header("Content-Type", f"multipart/form-data; boundary={boundary}")
    resp = _urlopen(req, timeout=30, proxy=False)
    try:
        # Close the response even when read() or JSON parsing raises.
        data = json.loads(resp.read())
    finally:
        resp.close()
    return data.get("text", "").strip()
# -- STT: Flush monitor (asyncio background task) ---------------------------
async def _flush_monitor(bot):
    """Poll for silence gaps and transcribe completed utterances.

    Background task: every _FLUSH_INTERVAL seconds, pops any per-user
    buffer that has been silent for the configured gap (under the lock),
    transcribes it off-loop, then either speaks a trigger response or
    reports the transcript, depending on mode. Exits when both listen
    mode and the trigger word are disabled.
    """
    ps = _ps(bot)
    loop = asyncio.get_running_loop()
    try:
        while ps["listen"] or ps["trigger"]:
            await asyncio.sleep(_FLUSH_INTERVAL)
            now = time.monotonic()
            to_flush: list[tuple[str, bytes]] = []
            # Collect finished utterances under the lock, but transcribe
            # outside it so the pymumble thread is never blocked long.
            with ps["lock"]:
                for name in list(ps["last_ts"]):
                    elapsed = now - ps["last_ts"][name]
                    if elapsed >= ps["silence_gap"] and name in ps["buffers"]:
                        pcm = bytes(ps["buffers"].pop(name))
                        del ps["last_ts"][name]
                        to_flush.append((name, pcm))
            for name, pcm in to_flush:
                # Ignore blips shorter than _MIN_DURATION seconds.
                if len(pcm) < _MIN_BYTES:
                    continue
                try:
                    # Blocking HTTP call; run in the default executor.
                    text = await loop.run_in_executor(
                        None, _transcribe, ps, pcm,
                    )
                except Exception:
                    log.exception("voice: transcription failed for %s", name)
                    continue
                # Drop empty results and pure punctuation noise.
                if not text or text.strip("., ") == "":
                    continue
                trigger = ps["trigger"]
                if trigger and text.lower().startswith(trigger.lower()):
                    # Strip the wake word and leading punctuation.
                    remainder = text[len(trigger):].strip().lstrip(",.;:!?")
                    if remainder:
                        log.info("voice: trigger from %s: %s", name, remainder)
                        bot._spawn(
                            _tts_play(bot, remainder), name="voice-tts",
                        )
                    continue
                if ps["listen"]:
                    log.info("voice: %s said: %s", name, text)
                    await bot.action("0", f"heard {name} say: {text}")
    except asyncio.CancelledError:
        pass
    except Exception:
        log.exception("voice: flush monitor error")
# -- TTS: Piper fetch + playback --------------------------------------------
def _fetch_tts(piper_url: str, text: str) -> str | None:
    """POST *text* to Piper TTS and save the WAV reply to a temp file.

    Returns the temp file path (caller unlinks it after playback), or
    None when the request fails or returns an empty body. Blocking; run
    in an executor.
    """
    import tempfile
    try:
        payload = json.dumps({"text": text}).encode()
        req = urllib.request.Request(
            piper_url, data=payload, method="POST",
        )
        req.add_header("Content-Type", "application/json")
        resp = _urlopen(req, timeout=30, proxy=False)
        try:
            # Close the response even when read() raises.
            data = resp.read()
        finally:
            resp.close()
        if not data:
            return None
        tmp = tempfile.NamedTemporaryFile(
            suffix=".wav", prefix="derp_tts_", delete=False,
        )
        tmp.write(data)
        tmp.close()
        return tmp.name
    except Exception:
        log.exception("voice: TTS fetch failed")
        return None
async def _tts_play(bot, text: str):
    """Fetch TTS audio and play it via stream_audio.

    Uses the configured voice profile (voice, fx, piper params) when set,
    otherwise falls back to Piper's default voice. Sets the registry's
    _tts_active flag around playback so the music plugin can duck, and
    always deletes the temp WAV afterwards.
    """
    from pathlib import Path
    ps = _ps(bot)
    loop = asyncio.get_running_loop()
    if ps["voice"] or ps["fx"]:
        # Full profile: explicit voice + piper tuning + FX chain.
        wav_path = await loop.run_in_executor(
            None, lambda: _fetch_tts_voice(
                ps["piper_url"], text,
                voice=ps["voice"],
                length_scale=ps["length_scale"],
                noise_scale=ps["noise_scale"],
                noise_w=ps["noise_w"],
                fx=ps["fx"],
            ),
        )
    else:
        # No profile configured: plain Piper default voice.
        wav_path = await loop.run_in_executor(
            None, _fetch_tts, ps["piper_url"], text,
        )
    if wav_path is None:
        return
    try:
        # Signal music plugin to duck, wait for it to take effect
        bot.registry._tts_active = True
        await asyncio.sleep(1.5)
        await _ack_tone(bot)
        done = asyncio.Event()
        await bot.stream_audio(str(wav_path), volume=1.0, on_done=done)
        await done.wait()
    finally:
        bot.registry._tts_active = False
        Path(wav_path).unlink(missing_ok=True)
# -- Listener lifecycle -----------------------------------------------------
def _ensure_listener(bot):
    """Register the sound listener callback (idempotent)."""
    state = _ps(bot)
    if state["_listener_registered"]:
        return  # already hooked up
    if not hasattr(bot, "_sound_listeners"):
        return  # backend has no sound-listener support

    def _forward(user, chunk):
        _on_voice(bot, user, chunk)

    bot._sound_listeners.append(_forward)
    state["_listener_registered"] = True
    log.info("voice: registered sound listener")
def _ensure_flush_task(bot):
    """Start the flush monitor if not running."""
    state = _ps(bot)
    existing = state.get("flush_task")
    if existing and not existing.done():
        return  # monitor already alive
    state["flush_task"] = bot._spawn(
        _flush_monitor(bot), name="voice-flush-monitor",
    )
def _stop_flush_task(bot):
    """Cancel the flush monitor."""
    state = _ps(bot)
    running = state.get("flush_task")
    if running and not running.done():
        running.cancel()
    state["flush_task"] = None
# -- Commands ----------------------------------------------------------------
@command("listen", help="Voice: !listen [on|off] -- toggle STT", tier="admin")
async def cmd_listen(bot, message):
    """Toggle voice-to-text transcription."""
    if not _is_mumble(bot):
        await bot.reply(message, "Voice is Mumble-only")
        return
    state = _ps(bot)
    args = message.text.split()
    if len(args) < 2:
        # No argument: report current status.
        status = "on" if state["listen"] else "off"
        summary = f"Listen: {status}"
        if state["trigger"]:
            summary += f" | Trigger: {state['trigger']}"
        await bot.reply(message, summary)
        return
    mode = args[1].lower()
    if mode == "on":
        state["listen"] = True
        _ensure_listener(bot)
        _ensure_flush_task(bot)
        await bot.reply(message, "Listening for voice")
    elif mode == "off":
        state["listen"] = False
        if not state["trigger"]:
            # No wake word either: drop buffered audio and stop polling.
            with state["lock"]:
                state["buffers"].clear()
                state["last_ts"].clear()
            _stop_flush_task(bot)
        await bot.reply(message, "Stopped listening")
    else:
        await bot.reply(message, "Usage: !listen [on|off]")
@command("say", help="Voice: !say <text> -- text-to-speech")
async def cmd_say(bot, message):
    """Speak text aloud via Piper TTS."""
    if not _is_mumble(bot):
        await bot.reply(message, "Voice is Mumble-only")
        return
    split = message.text.split(None, 1)
    if len(split) < 2:
        await bot.reply(message, "Usage: !say <text>")
        return
    spoken = split[1].strip()
    if len(spoken) > _MAX_SAY_LEN:
        await bot.reply(message, f"Text too long (max {_MAX_SAY_LEN} chars)")
        return
    # Fire and forget; playback/cleanup is handled by the task.
    bot._spawn(_tts_play(bot, spoken), name="voice-tts")
def _split_fx(fx: str) -> tuple[list[str], str]:
"""Split FX chain into rubberband CLI args and ffmpeg filter string.
Alpine's ffmpeg lacks librubberband, so pitch shifting is handled by
the ``rubberband`` CLI tool and remaining filters by ffmpeg.
"""
import math
parts = fx.split(",")
rb_args: list[str] = []
ff_parts: list[str] = []
for part in parts:
if part.startswith("rubberband="):
opts: dict[str, str] = {}
for kv in part[len("rubberband="):].split(":"):
k, _, v = kv.partition("=")
opts[k] = v
if "pitch" in opts:
semitones = 12 * math.log2(float(opts["pitch"]))
rb_args += ["--pitch", f"{semitones:.2f}"]
if opts.get("formant") == "1":
rb_args.append("--formant")
else:
ff_parts.append(part)
return rb_args, ",".join(ff_parts)
def _fetch_tts_voice(piper_url: str, text: str, *, voice: str = "",
                     speaker_id: int = 0, length_scale: float = 1.0,
                     noise_scale: float = 0.667, noise_w: float = 0.8,
                     fx: str = "") -> str | None:
    """Fetch TTS with explicit voice params and optional FX. Blocking.

    Pitch shifting uses the ``rubberband`` CLI (Alpine ffmpeg has no
    librubberband); remaining audio filters go through ffmpeg.

    Returns the path of the final processed WAV temp file (caller must
    unlink it), or None on empty response or failed FX processing.
    Intermediate temp files are cleaned up along the way.
    """
    import os
    import subprocess
    import tempfile
    payload = {"text": text}
    if voice:
        payload["voice"] = voice
    if speaker_id:
        payload["speaker_id"] = speaker_id
    payload["length_scale"] = length_scale
    payload["noise_scale"] = noise_scale
    payload["noise_w"] = noise_w
    data = json.dumps(payload).encode()
    req = urllib.request.Request(piper_url, data=data, method="POST")
    req.add_header("Content-Type", "application/json")
    resp = _urlopen(req, timeout=30, proxy=False)
    wav_data = resp.read()
    resp.close()
    if not wav_data:
        return None
    tmp = tempfile.NamedTemporaryFile(suffix=".wav", prefix="derp_aud_", delete=False)
    tmp.write(wav_data)
    tmp.close()
    if not fx:
        return tmp.name
    rb_args, ff_filters = _split_fx(fx)
    # "current" tracks the newest stage of the processing pipeline.
    current = tmp.name
    # Pitch shift via rubberband CLI
    if rb_args:
        rb_out = tempfile.NamedTemporaryFile(
            suffix=".wav", prefix="derp_aud_", delete=False,
        )
        rb_out.close()
        r = subprocess.run(
            ["rubberband"] + rb_args + [current, rb_out.name],
            capture_output=True, timeout=15,
        )
        # Input stage is no longer needed regardless of outcome.
        os.unlink(current)
        if r.returncode != 0:
            log.warning("voice: rubberband failed: %s", r.stderr[:200])
            os.unlink(rb_out.name)
            return None
        current = rb_out.name
    # Remaining filters via ffmpeg
    if ff_filters:
        ff_out = tempfile.NamedTemporaryFile(
            suffix=".wav", prefix="derp_aud_", delete=False,
        )
        ff_out.close()
        r = subprocess.run(
            ["ffmpeg", "-y", "-i", current, "-af", ff_filters, ff_out.name],
            capture_output=True, timeout=15,
        )
        os.unlink(current)
        if r.returncode != 0:
            log.warning("voice: ffmpeg failed: %s", r.stderr[:200])
            os.unlink(ff_out.name)
            return None
        current = ff_out.name
    return current
@command("audition", help="Voice: !audition -- play voice samples", tier="admin")
async def cmd_audition(bot, message):
    """Play voice samples through Mumble for comparison.

    Renders a fixed phrase with each candidate voice/FX combination and
    plays them back-to-back, announcing each entry in channel "0".
    """
    if not _is_mumble(bot):
        return
    ps = _ps(bot)
    piper_url = ps["piper_url"]
    phrase = "The sorcerer has arrived. I have seen things beyond your understanding."
    # FX building blocks
    _deep = "rubberband=pitch=0.87:formant=1"
    _bass = "bass=g=6:f=110:w=0.6"
    _bass_heavy = "equalizer=f=80:t=h:w=150:g=8"
    _echo_subtle = "aecho=0.8:0.6:25|40:0.25|0.15"
    _echo_chamber = "aecho=0.8:0.88:60:0.35"
    _echo_cave = "aecho=0.8:0.7:40|70|100:0.3|0.2|0.1"
    # Each entry: (label, piper voice name, speaker id, FX chain).
    samples = [
        # -- Base voices (no FX) for reference
        ("ryan-high raw", "en_US-ryan-high", 0, ""),
        ("lessac-high raw", "en_US-lessac-high", 0, ""),
        # -- Deep pitch only
        ("ryan deep", "en_US-ryan-high", 0,
         _deep),
        ("lessac deep", "en_US-lessac-high", 0,
         _deep),
        # -- Deep + bass boost
        ("ryan deep+bass", "en_US-ryan-high", 0,
         f"{_deep},{_bass}"),
        ("lessac deep+bass", "en_US-lessac-high", 0,
         f"{_deep},{_bass}"),
        # -- Deep + heavy bass
        ("ryan deep+heavy bass", "en_US-ryan-high", 0,
         f"{_deep},{_bass_heavy}"),
        # -- Deep + bass + subtle echo
        ("ryan deep+bass+echo", "en_US-ryan-high", 0,
         f"{_deep},{_bass},{_echo_subtle}"),
        ("lessac deep+bass+echo", "en_US-lessac-high", 0,
         f"{_deep},{_bass},{_echo_subtle}"),
        # -- Deep + bass + chamber reverb
        ("ryan deep+bass+chamber", "en_US-ryan-high", 0,
         f"{_deep},{_bass},{_echo_chamber}"),
        ("lessac deep+bass+chamber", "en_US-lessac-high", 0,
         f"{_deep},{_bass},{_echo_chamber}"),
        # -- Deep + heavy bass + cave reverb
        ("ryan deep+heavybass+cave", "en_US-ryan-high", 0,
         f"{_deep},{_bass_heavy},{_echo_cave}"),
        # -- Libritts best candidates with full sorcerer chain
        ("libritts #20 deep+bass+echo", "en_US-libritts_r-medium", 20,
         f"{_deep},{_bass},{_echo_subtle}"),
        ("libritts #22 deep+bass+echo", "en_US-libritts_r-medium", 22,
         f"{_deep},{_bass},{_echo_subtle}"),
        ("libritts #79 deep+bass+chamber", "en_US-libritts_r-medium", 79,
         f"{_deep},{_bass},{_echo_chamber}"),
    ]
    await bot.reply(message, f"Auditioning {len(samples)} voice samples...")
    loop = asyncio.get_running_loop()
    from pathlib import Path
    for i, (label, voice, sid, fx) in enumerate(samples, 1):
        await bot.send("0", f"[{i}/{len(samples)}] {label}")
        await asyncio.sleep(1)
        # Bind loop variables as lambda defaults to avoid late binding.
        sample_wav = await loop.run_in_executor(
            None, lambda v=voice, s=sid, f=fx: _fetch_tts_voice(
                piper_url, phrase, voice=v, speaker_id=s,
                length_scale=1.15, noise_scale=0.4, noise_w=0.5, fx=f,
            ),
        )
        if sample_wav is None:
            await bot.send("0", " (failed)")
            continue
        try:
            done = asyncio.Event()
            await bot.stream_audio(sample_wav, volume=1.0, on_done=done)
            await done.wait()
        finally:
            # Always remove the temp WAV, even if playback fails.
            Path(sample_wav).unlink(missing_ok=True)
        await asyncio.sleep(2)
    await bot.send("0", "Audition complete.")
# -- Plugin lifecycle --------------------------------------------------------
async def on_connected(bot) -> None:
    """Re-register listener after reconnect; play TTS greeting on first connect."""
    if not _is_mumble(bot):
        return
    state = _ps(bot)
    if state["listen"] or state["trigger"]:
        _ensure_listener(bot)
        _ensure_flush_task(bot)
    # Greet via TTS on the very first connection only.
    greeting = getattr(bot, "config", {}).get("mumble", {}).get("greet")
    if not greeting or state.get("_greeted"):
        return
    state["_greeted"] = True
    audio_ready = getattr(bot, "_is_audio_ready", None)
    if audio_ready:
        # Poll up to ~10s (20 x 0.5s) for the audio pipeline to come up.
        for _ in range(20):
            if audio_ready():
                break
            await asyncio.sleep(0.5)
    bot._spawn(_tts_play(bot, greeting), name="voice-greet")