Compare commits

...

23 Commits

Author SHA1 Message Date
user
192ea717a7 feat: split CI into gitleaks, lint, and test jobs
Some checks failed
CI / gitleaks (push) Failing after 15s
CI / lint (push) Failing after 17s
CI / test (3.11) (push) Has been skipped
CI / test (3.12) (push) Has been skipped
CI / test (3.13) (push) Has been skipped
- Add gitleaks secret scanning (full history)
- Separate lint (ruff, Python 3.13 only) from test matrix
- Test job gates on lint; gitleaks runs in parallel

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 05:51:53 +01:00
user
7a4aa65882 fix: align cmd_stop else branch with _play_loop finally cleanup
The else branch (no active task) only cleared current, task, and
duck_vol. Now resets all play state fields to match _play_loop's
finally block: done_event, duck_task, progress, and cur_seek.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 05:49:53 +01:00
user
2cd1d5efb1 fix: race condition in skip/seek/stop losing track state
task.cancel() triggers _play_loop's finally block asynchronously.
When cmd_skip or cmd_seek called _ensure_loop before the finally
block ran, the old task's cleanup would overwrite the new task's
state -- causing !np to report "Nothing playing" while audio
was still streaming.

Now await the cancelled task before restarting the loop, ensuring
the finally block completes first.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 05:45:00 +01:00
user
95981275b5 feat: add OpenRouter LLM chat plugin (!ask, !chat)
Single-shot (!ask) and conversational (!chat) LLM commands backed by
OpenRouter's API. Per-user history (20 msg cap), 5s cooldown, reasoning
model fallback, and model switching via subcommands.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 05:39:11 +01:00
user
66116d2caf docs: update Piper TTS endpoint and document available voices
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 05:01:57 +01:00
user
eded764f6a fix: update Piper TTS endpoint and request format
Piper is on 192.168.129.9:5100, not :5000. It expects POST with
JSON body {"text": "..."}, not GET with query params. Also update
Whisper default to 192.168.129.9.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 04:56:24 +01:00
user
9783365b1e feat: add extra Mumble bot instances and TTS greeting
Support [[mumble.extra]] config for additional Mumble identities that
inherit connection settings from the main [mumble] section. Extra bots
get their own state DB and do not run the voice trigger by default.

Add TTS greeting on first connect via mumble.greet config option.
Merlin joins as a second identity with his own client certificate.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 04:34:10 +01:00
user
165938a801 fix: mumble disconnect loop from stale socket and dead parent thread
Two issues causing ~2min reconnect cycles:

1. pymumble captures threading.current_thread() as parent_thread at
   init. Since we construct it in a run_in_executor thread, the parent
   dies after _connect_sync returns, triggering pymumble's loop exit.
   Fix: point parent_thread at threading.main_thread().

2. pymumble's init_connection() drops the control_socket reference
   without closing it. The lingering TCP connection makes Murmur kick
   the new session with "connected from another device". Fix: patch
   init_connection to close the old socket before reconnecting.
2026-02-22 04:24:23 +01:00
user
221cb1f06b fix: voice trigger not receiving audio from pymumble
pymumble passes a User object, not a dict. The isinstance(user, dict)
check returned False, setting name to None and silently discarding
every voice packet. Use try/except for dict-like access instead.
2026-02-22 04:02:23 +01:00
user
c4908f2a63 docs: document seek command and volume persistence 2026-02-22 03:31:39 +01:00
user
c493583a71 feat: add !seek command and persist volume across restarts
Seek to absolute or relative positions mid-track via !seek. Supports
M:SS and plain seconds with +/- prefixes. Volume is now saved to
bot.state and restored on connect.
2026-02-22 03:31:35 +01:00
user
7c099d8cf0 docs: document voice trigger configuration 2026-02-22 03:24:07 +01:00
user
e127f72660 feat: add always-on voice trigger mode with TTS echo
When [voice] trigger is set in config, the bot continuously listens and
transcribes voice. Speech starting with the trigger word is stripped and
echoed back via TTS. Non-triggered speech is silently discarded unless
!listen is also active.
2026-02-22 03:24:03 +01:00
user
7b9359c152 docs: document voice plugin commands
Add Voice STT/TTS section covering !listen, !say, and optional
[voice] config block with whisper_url, piper_url, silence_gap.
2026-02-22 03:08:10 +01:00
user
9fbf45f67d feat: add voice plugin with STT and TTS
Whisper STT: buffers incoming voice PCM per user, transcribes on
silence gap via local whisper.cpp endpoint, posts results as actions.

Piper TTS: !say fetches WAV from local Piper endpoint and plays via
stream_audio(). 37 tests cover buffering, flush logic, transcription,
WAV encoding, commands, and lifecycle.
2026-02-22 03:08:02 +01:00
user
039f060b50 feat: add sound listener hook to MumbleBot
Allow plugins to register callbacks for incoming voice PCM via
bot._sound_listeners. Empty list by default = zero overhead.
2026-02-22 03:07:55 +01:00
user
df20c154ca feat: download audio before playback, add !keep and !kept commands
Audio is now downloaded to data/music/ before playback begins,
eliminating CDN hiccups mid-stream. Falls back to streaming on
download failure. Files are deleted after playback unless marked
with !keep. stream_audio detects local files and uses a direct
ffmpeg pipeline (no yt-dlp).
2026-02-22 02:52:51 +01:00
user
ab924444de fix: survive mumble disconnects without restarting audio stream
Guard stream_audio with _is_audio_ready() so that PCM frames are
dropped (not crashed on) when pymumble recreates SoundOutput with
encoder=None during reconnect. The ffmpeg pipeline stays alive,
position tracking remains accurate, and audio feeding resumes once
the codec is negotiated. Listeners hear brief silence instead of
a 30+ second restart with URL re-resolution.

Also adds chat messages to _auto_resume so users see what the bot
intends ("Resuming 'X' at M:SS in a moment" / "...aborted").
2026-02-22 02:41:44 +01:00
user
ec55c2aef1 feat: auto-resume music on reconnect, sorcerer tier, cert auth
Auto-resume: save playback position on stream errors and cancellation,
restore automatically after reconnect or container restart once the
channel is silent. Plugin lifecycle hook (on_connected) ensures the
reconnect watcher starts without waiting for user commands.

Sorcerer tier: new permission level between oper and admin. Configured
via [mumble] sorcerers list in derp.toml.

Mumble cert auth: pass certfile/keyfile to pymumble for client
certificate authentication.

Fixes: stream_audio now re-raises CancelledError and Exception so
_play_loop detects failures correctly. Subprocess cleanup uses 3s
timeout. Graceful shutdown cancels background tasks before stopping
pymumble. Safe getattr for _opers in core plugin.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 02:14:43 +01:00
user
f899241d73 feat: support relative volume adjustment (+N/-N)
!volume +10 increases by 10, !volume -5 decreases by 5.
Out-of-range results (below 0 or above 100) are rejected.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 00:18:43 +01:00
user
f189cbd290 feat: add !resume to continue playback from last interruption
Tracks playback position via frame counting in stream_audio().
On stop/skip, saves URL + elapsed time to bot.state (SQLite).
!resume reloads the track and seeks to the saved position via
ffmpeg -ss. State persists across bot restarts.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 00:15:39 +01:00
user
9d58a5d073 fix: slow volume ramp to 1s for smoother transitions
Reduce _max_step from 0.1 to 0.02 per frame, extending the full
0-to-1 volume ramp from ~200ms to ~1 second.
2026-02-21 23:56:04 +01:00
user
e4e1e219f0 feat: add YouTube search to !play and fix NA URL fallback
Non-URL input (e.g. !play classical music) searches YouTube for 10
results and picks one randomly. Also fixes --flat-playlist returning
"NA" as the URL for single videos by falling back to the original
input URL.
2026-02-21 23:52:01 +01:00
18 changed files with 4109 additions and 57 deletions

View File

@@ -4,9 +4,31 @@ on:
branches: [master]
pull_request:
branches: [master]
jobs:
gitleaks:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- uses: gitleaks/gitleaks-action@v2
env:
GITLEAKS_LICENSE: ${{ secrets.GITLEAKS_LICENSE }}
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.13"
- run: pip install -e . && pip install ruff
- run: ruff check src/ tests/ plugins/
test:
runs-on: ubuntu-latest
needs: [lint]
strategy:
matrix:
python-version: ["3.11", "3.12", "3.13"]
@@ -15,6 +37,5 @@ jobs:
- uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- run: pip install -e . && pip install pytest ruff
- run: ruff check src/ tests/ plugins/
- run: pip install -e . && pip install pytest
- run: pytest -v

View File

@@ -16,4 +16,6 @@ services:
- ./config/derp.toml:/app/config/derp.toml:ro,Z
- ./data:/app/data:Z
- ./secrets:/app/secrets:ro,Z
command: ["--verbose"]
environment:
- OPENROUTER_API_KEY
command: ["--verbose", "--cprofile"]

View File

@@ -548,18 +548,28 @@ HTML stripped on receive, escaped on send. IRC-only commands are no-ops.
```
!play <url|playlist> # Play audio (YouTube, SoundCloud, etc.)
!play <playlist-url> # Playlist tracks expanded into queue
!play classical music # YouTube search, random pick from top 10
!stop # Stop playback, clear queue
!skip # Skip current track
!resume # Resume last stopped/skipped track
!queue # Show queue
!queue <url> # Add to queue (alias for !play)
!np # Now playing
!volume # Show current volume
!volume 75 # Set volume (0-100, default 50)
!duck # Show ducking status
!duck on # Enable voice ducking
!duck off # Disable voice ducking
!duck floor 5 # Set duck floor volume (0-100, default 1)
!duck silence 20 # Set silence timeout seconds (default 15)
!duck restore 45 # Set restore ramp duration seconds (default 30)
```
Requires: `yt-dlp`, `ffmpeg`, `libopus` on the host.
Max 50 tracks in queue. Playlists auto-expand; excess truncated at limit.
Volume ramps smoothly over ~200ms (no abrupt jumps mid-playback).
Volume ramps smoothly over ~1s (no abrupt jumps mid-playback).
`!resume` restores position across restarts (persisted via `bot.state`).
Auto-resumes on reconnect if channel is silent (waits up to 60s for silence).
Mumble-only: `!play` replies with error on other adapters, others silently no-op.
## Plugin Template

View File

@@ -187,6 +187,8 @@ unchanged. The server name is derived from the hostname automatically.
| `!username list` | Show available services by category |
| `!alert <add\|del\|list\|check\|info\|history>` | Keyword alert subscriptions across platforms |
| `!searx <query>` | Search SearXNG and show top results |
| `!ask <question>` | Single-shot LLM question via OpenRouter |
| `!chat <msg\|clear\|model\|models>` | Conversational LLM chat with history |
| `!jwt <token>` | Decode JWT header, claims, and flag issues |
| `!mac <address\|random\|update>` | MAC OUI vendor lookup / random MAC |
| `!abuse <ip> [ip2 ...]` | AbuseIPDB reputation check |
@@ -815,6 +817,55 @@ Title Two -- https://example.com/page2
Title Three -- https://example.com/page3
```
### `!ask` / `!chat` -- LLM Chat (OpenRouter)
Chat with large language models via [OpenRouter](https://openrouter.ai/)'s
API. `!ask` is stateless (single question), `!chat` maintains per-user
conversation history.
```
!ask <question> Single-shot question (no history)
!chat <message> Chat with conversation history
!chat clear Clear your history
!chat model Show current model
!chat model <name> Switch model
!chat models List suggested free models
```
Output format:
```
<alice> !ask what is DNS
<derp> DNS (Domain Name System) translates domain names to IP addresses...
<alice> !chat explain TCP
<derp> TCP is a connection-oriented transport protocol...
<alice> !chat how does the handshake work
<derp> The TCP three-way handshake: SYN, SYN-ACK, ACK...
```
- Open to all users, works in channels and PMs
- Per-user cooldown: 5 seconds between requests
- Conversation history capped at 20 messages per user (ephemeral, not
persisted across restarts)
- Responses truncated to 400 characters; multi-line replies use paste overflow
- Default model: `openrouter/auto` (auto-routes to best available free model)
- Reasoning models (DeepSeek R1) are handled transparently -- falls back to
the `reasoning` field when `content` is empty
- Rate limit errors (HTTP 429) produce a clear user-facing message
Configuration:
```toml
[openrouter]
api_key = "" # or set OPENROUTER_API_KEY env var
model = "openrouter/auto" # default model
system_prompt = "You are a helpful IRC bot assistant. Keep responses concise and under 200 words."
```
API key: set `OPENROUTER_API_KEY` env var (preferred) or `api_key` under
`[openrouter]` in config. The env var takes precedence.
### `!alert` -- Keyword Alert Subscriptions
Search keywords across 27 platforms and announce new results. Unlike
@@ -1566,23 +1617,229 @@ and voice transmission.
```
!play <url|playlist> Play audio or add to queue (playlists expanded)
!play <query> Search YouTube, play a random result
!stop Stop playback, clear queue
!skip Skip current track
!seek <offset> Seek to position (1:30, 90, +30, -30)
!resume Resume last stopped/skipped track from saved position
!queue Show queue
!queue <url> Add to queue (alias for !play)
!np Now playing
!volume [0-100] Get/set volume
!volume [0-100] Get/set volume (persisted across restarts)
!keep Keep current track's audio file after playback
!kept [clear] List kept files or clear all
!testtone Play 3-second 440Hz test tone
```
- Queue holds up to 50 tracks
- Non-URL input is treated as a YouTube search; 10 results are fetched
and one is picked randomly
- Playlists are expanded into individual tracks; excess tracks are
truncated at the queue limit
- Volume changes ramp smoothly over ~200ms (no abrupt jumps)
- Default volume: 50%
- Volume changes ramp smoothly over ~1s (no abrupt jumps)
- Default volume: 50%; persisted via `bot.state` across restarts
- Titles resolved via `yt-dlp --flat-playlist` before playback
- Audio pipeline: `yt-dlp | ffmpeg` subprocess, PCM fed to pymumble
- Audio is downloaded before playback (`data/music/`); files are deleted
after playback unless `!keep` is used. Falls back to streaming on
download failure.
- Audio pipeline: `ffmpeg` subprocess for local files, `yt-dlp | ffmpeg`
for streaming fallback, PCM fed to pymumble
- Commands are Mumble-only; `!play` on other adapters replies with an error,
other music commands silently no-op
- Playback runs as an asyncio background task; the bot remains responsive
to text commands during streaming
- `!resume` continues from where playback was interrupted (`!stop`/`!skip`);
position is persisted via `bot.state` and survives bot restarts
### Auto-Resume on Reconnect
If the bot disconnects while music is playing (network hiccup, server
restart), it saves the current track and position. On reconnect, it
automatically resumes playback -- but only after the channel is silent
(using the same silence threshold as voice ducking, default 15s).
- Resume state is saved on both explicit stop/skip and on stream errors
(disconnect)
- Works across container restarts (cold boot) and network reconnections
- The bot waits up to 60s for silence; if the channel stays active, it
aborts and the saved state remains for manual `!resume`
- Chat messages announce resume intentions and abort reasons
- The reconnect watcher starts via the `on_connected` plugin lifecycle hook
### Seeking
Fast-forward or rewind within the currently playing track.
```
!seek 1:30 Seek to 1 minute 30 seconds
!seek 90 Seek to 90 seconds
!seek +30 Jump forward 30 seconds
!seek -30 Jump backward 30 seconds
!seek +1:00 Jump forward 1 minute
```
- Absolute offsets (`1:30`, `90`) seek to that position from the start
- Relative offsets (`+30`, `-1:00`) jump from the current position
- Negative seeks are clamped to the start of the track
- Seeking restarts the audio pipeline at the new position
### Disconnect-Resilient Streaming
During brief network disconnects (~5-15s), the audio stream stays alive.
The ffmpeg pipeline keeps running; PCM frames are read at real-time pace
but dropped while pymumble reconnects. Once the connection re-establishes
and the codec is negotiated, audio feeding resumes automatically. The
listener hears a brief silence instead of a 30+ second restart with URL
re-resolution.
- The `_is_audio_ready()` guard checks: mumble connected, sound_output
exists, Opus encoder initialized
- Frames are counted even during disconnect, so position tracking remains
accurate
- State transitions (connected/disconnected) are logged for diagnostics
### Voice Ducking
When other users speak in the Mumble channel, the music volume automatically
ducks (lowers) to a configurable floor. After a configurable silence period,
volume gradually restores to the user-set level in small steps.
```
!duck Show ducking status and settings
!duck on Enable voice ducking
!duck off Disable voice ducking
!duck floor <0-100> Set floor volume % (default: 1)
!duck silence <sec> Set silence timeout in seconds (default: 15)
!duck restore <sec> Set restore ramp duration in seconds (default: 30)
```
Behavior:
- Enabled by default; voice is detected via pymumble's sound callback
- When someone speaks, volume drops immediately to the floor value
- After `silence` seconds of no voice, volume restores via a single
smooth linear ramp over `restore` seconds (default 30s)
- The per-frame volume ramp in `stream_audio` further smooths the
transition, eliminating audible steps
- Ducking resets when playback stops, skips, or the queue empties
Configuration (optional):
```toml
[music]
duck_enabled = true # Enable voice ducking (default: true)
duck_floor = 1 # Floor volume % during ducking (default: 1)
duck_silence = 15 # Seconds of silence before restoring (default: 15)
duck_restore = 30 # Seconds for smooth volume restore (default: 30)
```
### Download-First Playback
Audio is downloaded to `data/music/` before playback begins. This
eliminates CDN hiccups mid-stream and enables instant seeking. Files
are identified by a hash of the URL so the same URL reuses the same
file (natural dedup).
- If download fails, playback falls back to streaming (`yt-dlp | ffmpeg`)
- After a track finishes, the local file is automatically deleted
- Use `!keep` during playback to preserve the file
- Use `!kept` to list preserved files and their sizes
- Use `!kept clear` to delete all preserved files
- On cancel/error, files are not deleted (needed for `!resume`)
### Extra Mumble Bots
Run additional bot identities on the same Mumble server. Each extra bot
inherits the main `[mumble]` connection settings and overrides only what
differs (username, certificates, greeting). Extra bots share the plugin
registry but get their own state DB and do **not** run the voice trigger
by default (prevents double-processing).
```toml
[[mumble.extra]]
username = "merlin"
certfile = "secrets/mumble/merlin.crt"
keyfile = "secrets/mumble/merlin.key"
greet = "The sorcerer has arrived."
```
- `username`, `certfile`, `keyfile` -- identity overrides
- `greet` -- TTS message spoken on first connect (optional)
- All other `[mumble]` keys (host, port, password, admins, etc.) are inherited
- Voice trigger is disabled unless the extra entry includes a `voice` key
### Voice STT/TTS
Transcribe voice from Mumble users via Whisper STT and speak text aloud
via Piper TTS. Requires local Whisper and Piper services.
```
!listen [on|off] Toggle voice-to-text transcription (admin)
!listen Show current listen status
!say <text> Speak text aloud via TTS (max 500 chars)
```
STT behavior:
- When enabled, the bot buffers incoming voice PCM per user
- After a configurable silence gap (default 1.5s), the buffer is
transcribed via Whisper and posted as an action message
- Utterances shorter than 0.5s are discarded (noise filter)
- Utterances are capped at 30s to bound memory and latency
- Transcription results are posted as: `* derp heard Alice say: hello`
- The listener survives reconnects when `!listen` is on
TTS behavior:
- `!say` fetches WAV from Piper and plays it via `stream_audio()`
- Piper outputs 22050Hz WAV; ffmpeg resamples to 48kHz automatically
- TTS shares the audio output with music playback
- Text is limited to 500 characters
- Set `greet` in `[mumble]` or `[[mumble.extra]]` for automatic TTS on first connect
### Always-On Trigger Mode
Set a trigger word to enable always-on voice listening. The bot
continuously transcribes voice and watches for the trigger word. When
detected, the text after the trigger is spoken back via TTS. No
`!listen` command needed.
```toml
[voice]
trigger = "claude"
```
Behavior:
- Listener starts automatically on connect (no `!listen on` required)
- All speech is transcribed and checked for the trigger word
- Trigger match is case-insensitive: "Claude", "CLAUDE", "claude" all work
- On match, the trigger word is stripped and the remainder is sent to TTS
- Non-triggered speech is silently discarded (unless `!listen` is also on)
- When both trigger and `!listen` are active, triggered speech goes to
TTS and all other speech is posted as the usual "heard X say: ..."
- `!listen` status shows trigger configuration when set
Configuration (optional):
```toml
[voice]
whisper_url = "http://192.168.129.9:8080/inference"
piper_url = "http://192.168.129.9:5100/"
silence_gap = 1.5
trigger = ""
```
Piper TTS accepts POST with JSON body `{"text": "..."}` and returns
22050 Hz 16-bit PCM mono WAV. Default voice: `en_US-lessac-medium`.
Available voices:
| Region | Voices |
|--------|--------|
| en_US | lessac-medium/high, amy-medium, ryan-medium/high, joe-medium, john-medium, kristin-medium, danny-low, ljspeech-high |
| en_GB | alba-medium, cori-medium/high, jenny_dioco-medium, northern_english_male-medium, southern_english_female-low |
| fr_FR | siwis-low/medium, gilles-low, tom-medium, mls-medium, mls_1840-low, upmc-medium |
To switch the active voice, set `piper_tts_voice` (e.g.
`fr_FR-siwis-medium`) and redeploy the TTS service.

View File

@@ -43,3 +43,27 @@ new_opus = "lib_location = find_library('opus') or 'libopus.so.0'"
assert old_opus in src, "opuslib find_library patch target not found"
p.write_text(src.replace(old_opus, new_opus))
print("opuslib musl patch applied")
# -- pymumble: close old socket before reconnecting --
# init_connection() drops control_socket reference without closing it.
# The lingering TCP connection causes Murmur to kick the new session
# with "You connected to the server from another device".
p = pathlib.Path(f"{site}/pymumble_py3/mumble.py")
src = p.read_text()
old_init = """\
self.connected = PYMUMBLE_CONN_STATE_NOT_CONNECTED
self.control_socket = None"""
new_init = """\
self.connected = PYMUMBLE_CONN_STATE_NOT_CONNECTED
if getattr(self, 'control_socket', None) is not None:
try:
self.control_socket.close()
except Exception:
pass
self.control_socket = None"""
assert old_init in src, "pymumble init_connection socket patch target not found"
p.write_text(src.replace(old_init, new_init))
print("pymumble reconnect socket patch applied")

View File

@@ -145,7 +145,8 @@ async def cmd_whoami(bot, message):
prefix = message.prefix or "unknown"
tier = bot._get_tier(message)
tags = [tier]
if message.prefix and message.prefix in bot._opers:
opers = getattr(bot, "_opers", set())
if message.prefix and message.prefix in opers:
tags.append("IRCOP")
await bot.reply(message, f"{prefix} [{', '.join(tags)}]")
@@ -158,12 +159,16 @@ async def cmd_admins(bot, message):
parts.append(f"Admin: {', '.join(bot._admins)}")
else:
parts.append("Admin: (none)")
sorcerers = getattr(bot, "_sorcerers", [])
if sorcerers:
parts.append(f"Sorcerer: {', '.join(sorcerers)}")
if bot._operators:
parts.append(f"Oper: {', '.join(bot._operators)}")
if bot._trusted:
parts.append(f"Trusted: {', '.join(bot._trusted)}")
if bot._opers:
parts.append(f"IRCOPs: {', '.join(sorted(bot._opers))}")
opers = getattr(bot, "_opers", set())
if opers:
parts.append(f"IRCOPs: {', '.join(sorted(opers))}")
else:
parts.append("IRCOPs: (none)")
await bot.reply(message, " | ".join(parts))

298
plugins/llm.py Normal file
View File

@@ -0,0 +1,298 @@
"""Plugin: LLM chat via OpenRouter."""
from __future__ import annotations
import asyncio
import json
import logging
import os
import time
import urllib.request
from derp.http import urlopen as _urlopen
from derp.plugin import command
_log = logging.getLogger(__name__)
# -- Constants ---------------------------------------------------------------
_API_URL = "https://openrouter.ai/api/v1/chat/completions"
_DEFAULT_MODEL = "openrouter/auto"
_TIMEOUT = 30
_MAX_HISTORY = 20
_MAX_REPLY_LEN = 400
_COOLDOWN = 5
_DEFAULT_SYSTEM = (
"You are a helpful IRC bot assistant. Keep responses concise and under 200 words."
)
# -- Per-bot runtime state ---------------------------------------------------
def _ps(bot):
"""Per-bot plugin runtime state."""
return bot._pstate.setdefault("llm", {
"histories": {}, # {nick: [{"role": ..., "content": ...}, ...]}
"cooldowns": {}, # {nick: monotonic_ts}
"model": "", # override per-bot; empty = use default
})
# -- Helpers -----------------------------------------------------------------
def _get_api_key(bot) -> str:
"""Resolve API key from env or config."""
return (
os.environ.get("OPENROUTER_API_KEY", "")
or bot.config.get("openrouter", {}).get("api_key", "")
)
def _get_model(bot) -> str:
"""Resolve current model."""
ps = _ps(bot)
return (
ps["model"]
or bot.config.get("openrouter", {}).get("model", "")
or _DEFAULT_MODEL
)
def _get_system_prompt(bot) -> str:
"""Resolve system prompt from config or default."""
return bot.config.get("openrouter", {}).get("system_prompt", _DEFAULT_SYSTEM)
def _truncate(text: str, max_len: int = _MAX_REPLY_LEN) -> str:
"""Truncate text with ellipsis if needed."""
if len(text) <= max_len:
return text
return text[: max_len - 3].rstrip() + "..."
def _check_cooldown(bot, nick: str) -> bool:
"""Return True if the user is within cooldown period."""
ps = _ps(bot)
last = ps["cooldowns"].get(nick, 0)
return (time.monotonic() - last) < _COOLDOWN
def _set_cooldown(bot, nick: str) -> None:
"""Record a cooldown timestamp for a user."""
_ps(bot)["cooldowns"][nick] = time.monotonic()
# -- Blocking HTTP call ------------------------------------------------------
def _chat_request(api_key: str, model: str, messages: list[dict]) -> dict:
"""Blocking OpenRouter chat completion. Run via executor.
Returns the parsed JSON response dict.
Raises on HTTP or connection errors.
"""
payload = json.dumps({
"model": model,
"messages": messages,
}).encode()
req = urllib.request.Request(_API_URL, data=payload, method="POST")
req.add_header("Authorization", f"Bearer {api_key}")
req.add_header("Content-Type", "application/json")
resp = _urlopen(req, timeout=_TIMEOUT)
raw = resp.read()
resp.close()
return json.loads(raw)
def _extract_reply(data: dict) -> str:
"""Extract reply text from OpenRouter response.
Handles reasoning models that return content="" with a reasoning field.
"""
choices = data.get("choices", [])
if not choices:
return ""
msg = choices[0].get("message", {})
content = (msg.get("content") or "").strip()
if content:
return content
# Fallback for reasoning models
reasoning = (msg.get("reasoning") or "").strip()
return reasoning
# -- Command handlers --------------------------------------------------------
@command("ask", help="Ask: !ask <question>")
async def cmd_ask(bot, message):
"""Single-shot LLM question (no history).
Usage: !ask <question>
"""
parts = message.text.split(None, 1)
if len(parts) < 2 or not parts[1].strip():
await bot.reply(message, "Usage: !ask <question>")
return
api_key = _get_api_key(bot)
if not api_key:
await bot.reply(message, "OpenRouter API key not configured")
return
nick = message.nick
if _check_cooldown(bot, nick):
await bot.reply(message, "Cooldown -- wait a few seconds")
return
prompt = parts[1].strip()
model = _get_model(bot)
system = _get_system_prompt(bot)
messages = [
{"role": "system", "content": system},
{"role": "user", "content": prompt},
]
_set_cooldown(bot, nick)
loop = asyncio.get_running_loop()
try:
data = await loop.run_in_executor(
None, _chat_request, api_key, model, messages,
)
except urllib.error.HTTPError as exc:
if exc.code == 429:
await bot.reply(message, "Rate limited by OpenRouter -- try again later")
else:
await bot.reply(message, f"API error: HTTP {exc.code}")
return
except Exception as exc:
_log.warning("LLM request failed: %s", exc)
await bot.reply(message, f"Request failed: {exc}")
return
reply = _extract_reply(data)
if not reply:
await bot.reply(message, "No response from model")
return
lines = _truncate(reply).split("\n")
await bot.long_reply(message, lines, label="llm")
@command("chat", help="Chat: !chat <msg> | clear | model [name] | models")
async def cmd_chat(bot, message):
"""Conversational LLM chat with per-user history.
Usage:
!chat <message> Send a message (maintains history)
!chat clear Clear your conversation history
!chat model Show current model
!chat model <name> Switch model
!chat models List popular free models
"""
parts = message.text.split(None, 2)
if len(parts) < 2 or not parts[1].strip():
await bot.reply(message, "Usage: !chat <message> | clear | model [name] | models")
return
sub = parts[1].strip().lower()
# -- Subcommands ---------------------------------------------------------
if sub == "clear":
ps = _ps(bot)
nick = message.nick
if nick in ps["histories"]:
del ps["histories"][nick]
await bot.reply(message, "Conversation cleared")
return
if sub == "model":
if len(parts) > 2 and parts[2].strip():
new_model = parts[2].strip()
_ps(bot)["model"] = new_model
await bot.reply(message, f"Model set to: {new_model}")
else:
await bot.reply(message, f"Current model: {_get_model(bot)}")
return
if sub == "models":
models = [
"openrouter/auto -- auto-route to best available",
"google/gemma-3-27b-it:free",
"meta-llama/llama-3.3-70b-instruct:free",
"deepseek/deepseek-r1:free",
"qwen/qwen3-235b-a22b:free",
"mistralai/mistral-small-3.1-24b-instruct:free",
]
await bot.long_reply(message, models, label="models")
return
# -- Chat path -----------------------------------------------------------
api_key = _get_api_key(bot)
if not api_key:
await bot.reply(message, "OpenRouter API key not configured")
return
nick = message.nick
if _check_cooldown(bot, nick):
await bot.reply(message, "Cooldown -- wait a few seconds")
return
# Reconstruct full user text (sub might be part of the message)
user_text = message.text.split(None, 1)[1].strip()
ps = _ps(bot)
history = ps["histories"].setdefault(nick, [])
# Build messages
system = _get_system_prompt(bot)
history.append({"role": "user", "content": user_text})
# Cap history
if len(history) > _MAX_HISTORY:
history[:] = history[-_MAX_HISTORY:]
messages = [{"role": "system", "content": system}] + history
model = _get_model(bot)
_set_cooldown(bot, nick)
loop = asyncio.get_running_loop()
try:
data = await loop.run_in_executor(
None, _chat_request, api_key, model, messages,
)
except urllib.error.HTTPError as exc:
# Remove the failed user message from history
history.pop()
if exc.code == 429:
await bot.reply(message, "Rate limited by OpenRouter -- try again later")
else:
await bot.reply(message, f"API error: HTTP {exc.code}")
return
except Exception as exc:
history.pop()
_log.warning("LLM request failed: %s", exc)
await bot.reply(message, f"Request failed: {exc}")
return
reply = _extract_reply(data)
if not reply:
history.pop()
await bot.reply(message, "No response from model")
return
# Store assistant reply in history
history.append({"role": "assistant", "content": reply})
if len(history) > _MAX_HISTORY:
history[:] = history[-_MAX_HISTORY:]
lines = _truncate(reply).split("\n")
await bot.long_reply(message, lines, label="llm")

View File

@@ -3,9 +3,14 @@
from __future__ import annotations
import asyncio
import hashlib
import json
import logging
import random
import subprocess
import time
from dataclasses import dataclass
from pathlib import Path
from derp.plugin import command
@@ -20,6 +25,9 @@ class _Track:
url: str
title: str
requester: str
origin: str = "" # original user-provided URL for re-resolution
local_path: Path | None = None # set before playback
keep: bool = False # True = don't delete after playback
# -- Per-bot runtime state ---------------------------------------------------
@@ -27,12 +35,20 @@ class _Track:
def _ps(bot):
"""Per-bot plugin runtime state."""
cfg = getattr(bot, "config", {}).get("music", {})
return bot._pstate.setdefault("music", {
"queue": [],
"current": None,
"volume": 50,
"task": None,
"done_event": None,
"duck_enabled": cfg.get("duck_enabled", True),
"duck_floor": cfg.get("duck_floor", 1),
"duck_silence": cfg.get("duck_silence", 15),
"duck_restore": cfg.get("duck_restore", 30),
"duck_vol": None,
"duck_task": None,
"_watcher_task": None,
})
@@ -51,6 +67,86 @@ def _truncate(text: str, max_len: int = _MAX_TITLE_LEN) -> str:
return text[: max_len - 3].rstrip() + "..."
def _is_url(text: str) -> bool:
"""Check if text looks like a URL rather than a search query."""
return text.startswith(("http://", "https://", "ytsearch:"))
def _fmt_time(seconds: float) -> str:
"""Format seconds as M:SS."""
m, s = divmod(int(seconds), 60)
return f"{m}:{s:02d}"
def _parse_seek(arg: str) -> tuple[str, float]:
"""Parse a seek offset string into (mode, seconds).
Returns ``("abs", seconds)`` for absolute seeks (``1:30``, ``90``)
or ``("rel", +/-seconds)`` for relative (``+30``, ``-1:00``).
Raises ``ValueError`` on invalid input.
"""
if not arg:
raise ValueError("empty seek argument")
mode = "abs"
raw = arg
if raw[0] in ("+", "-"):
mode = "rel"
sign = -1 if raw[0] == "-" else 1
raw = raw[1:]
else:
sign = 1
if ":" in raw:
parts = raw.split(":", 1)
try:
minutes = int(parts[0])
seconds = int(parts[1])
except ValueError:
raise ValueError(f"invalid seek format: {arg}")
total = minutes * 60 + seconds
else:
try:
total = int(raw)
except ValueError:
raise ValueError(f"invalid seek format: {arg}")
return (mode, sign * float(total))
# -- Resume state persistence ------------------------------------------------
def _save_resume(bot, track: _Track, elapsed: float) -> None:
"""Persist current track and elapsed position for later resumption."""
data = json.dumps({
"url": track.origin or track.url,
"title": track.title,
"requester": track.requester,
"elapsed": round(elapsed, 2),
})
bot.state.set("music", "resume", data)
def _load_resume(bot) -> dict | None:
"""Load resume data, or None if absent/corrupt."""
raw = bot.state.get("music", "resume")
if not raw:
return None
try:
data = json.loads(raw)
if not isinstance(data, dict) or "url" not in data:
return None
return data
except (json.JSONDecodeError, TypeError):
return None
def _clear_resume(bot) -> None:
"""Remove persisted resume state."""
bot.state.delete("music", "resume")
def _resolve_tracks(url: str, max_tracks: int = _MAX_QUEUE) -> list[tuple[str, str]]:
"""Resolve URL into (url, title) pairs via yt-dlp. Blocking, run in executor.
@@ -73,19 +169,205 @@ def _resolve_tracks(url: str, max_tracks: int = _MAX_QUEUE) -> list[tuple[str, s
for i in range(0, len(lines) - 1, 2):
track_url = lines[i].strip()
track_title = lines[i + 1].strip()
if track_url:
tracks.append((track_url, track_title or track_url))
# --flat-playlist prints "NA" for single videos (no extraction)
if not track_url or track_url == "NA":
track_url = url
tracks.append((track_url, track_title or track_url))
return tracks if tracks else [(url, url)]
except Exception:
return [(url, url)]
# -- Download helpers --------------------------------------------------------
_MUSIC_DIR = Path("data/music")
def _download_track(url: str, track_id: str) -> Path | None:
"""Download audio to data/music/. Blocking -- run in executor."""
_MUSIC_DIR.mkdir(parents=True, exist_ok=True)
template = str(_MUSIC_DIR / f"{track_id}.%(ext)s")
try:
result = subprocess.run(
["yt-dlp", "-f", "bestaudio", "--no-warnings",
"-o", template, "--print", "after_move:filepath", url],
capture_output=True, text=True, timeout=300,
)
filepath = result.stdout.strip().splitlines()[-1] if result.stdout.strip() else ""
if filepath and Path(filepath).is_file():
return Path(filepath)
matches = list(_MUSIC_DIR.glob(f"{track_id}.*"))
return matches[0] if matches else None
except Exception:
log.exception("download failed for %s", url)
return None
def _cleanup_track(track: _Track) -> None:
"""Delete the local audio file unless marked to keep."""
if track.local_path is None or track.keep:
return
try:
track.local_path.unlink(missing_ok=True)
log.info("music: deleted %s", track.local_path.name)
except OSError:
log.warning("music: failed to delete %s", track.local_path)
# -- Duck monitor ------------------------------------------------------------
async def _duck_monitor(bot) -> None:
"""Background task: duck volume when voice is detected, restore on silence.
Ducking is immediate (snap to floor). Restoration is a single smooth
linear ramp from floor to user volume over ``duck_restore`` seconds.
The per-frame volume ramp in ``stream_audio`` further smooths each
1-second update, eliminating audible steps.
"""
ps = _ps(bot)
restore_start: float = 0.0 # monotonic ts when restore began
restore_from: float = 0.0 # duck_vol at restore start
try:
while True:
await asyncio.sleep(1)
if not ps["duck_enabled"]:
if ps["duck_vol"] is not None:
ps["duck_vol"] = None
restore_start = 0.0
continue
ts = getattr(bot, "_last_voice_ts", 0.0)
if ts == 0.0:
continue
silence = time.monotonic() - ts
if silence < ps["duck_silence"]:
# Voice active -- duck immediately
if ps["duck_vol"] is None:
log.info("duck: voice detected, ducking to %d%%",
ps["duck_floor"])
ps["duck_vol"] = float(ps["duck_floor"])
restore_start = 0.0
elif ps["duck_vol"] is not None:
# Silence exceeded -- smooth linear restore
if restore_start == 0.0:
restore_start = time.monotonic()
restore_from = ps["duck_vol"]
log.info("duck: restoring %d%% -> %d%% over %ds",
int(restore_from), ps["volume"],
ps["duck_restore"])
elapsed = time.monotonic() - restore_start
dur = ps["duck_restore"]
if dur <= 0 or elapsed >= dur:
ps["duck_vol"] = None
restore_start = 0.0
else:
target = ps["volume"]
ps["duck_vol"] = restore_from + (target - restore_from) * (elapsed / dur)
except asyncio.CancelledError:
ps["duck_vol"] = None
# -- Auto-resume on reconnect ------------------------------------------------
async def _auto_resume(bot) -> None:
"""Wait for silence after reconnect, then resume saved playback."""
ps = _ps(bot)
if ps["current"] is not None:
return
data = _load_resume(bot)
if data is None:
return
elapsed = data.get("elapsed", 0.0)
title = _truncate(data.get("title", data["url"]))
pos = _fmt_time(elapsed)
# Let pymumble fully stabilize after reconnect
await asyncio.sleep(10)
deadline = time.monotonic() + 60
silence_needed = ps.get("duck_silence", 15)
ts = getattr(bot, "_last_voice_ts", 0.0)
if ts != 0.0 and time.monotonic() - ts < silence_needed:
await bot.send("0",
f"Resuming '{title}' at {pos} once silent for "
f"{silence_needed}s")
else:
await bot.send("0", f"Resuming '{title}' at {pos} in a moment")
while time.monotonic() < deadline:
await asyncio.sleep(2)
ts = getattr(bot, "_last_voice_ts", 0.0)
if ts == 0.0:
break
if time.monotonic() - ts >= silence_needed:
break
else:
log.info("music: auto-resume aborted, channel not silent after 60s")
await bot.send("0", f"Resume of '{title}' aborted -- "
"channel not silent")
return
# Re-check after waiting -- someone may have started playback manually
if ps["current"] is not None:
return
data = _load_resume(bot)
if data is None:
return
elapsed = data.get("elapsed", 0.0)
track = _Track(
url=data["url"],
title=data.get("title", data["url"]),
requester=data.get("requester", "?"),
)
ps["queue"].insert(0, track)
_clear_resume(bot)
log.info("music: auto-resuming '%s' from %s",
track.title, _fmt_time(elapsed))
_ensure_loop(bot, seek=elapsed)
async def _reconnect_watcher(bot) -> None:
"""Poll for reconnections and trigger auto-resume.
Also handles cold-start resume: if saved state exists on first
run, waits for the connection to stabilize then resumes.
"""
last_seen = getattr(bot, "_connect_count", 0)
boot_checked = False
while True:
await asyncio.sleep(2)
count = getattr(bot, "_connect_count", 0)
# Cold-start: resume saved state after first connection
if not boot_checked and count >= 1:
boot_checked = True
if _load_resume(bot) is not None:
log.info("music: saved state found on boot, attempting auto-resume")
await _auto_resume(bot)
continue
if count > last_seen and count > 1:
last_seen = count
log.info("music: reconnection detected, attempting auto-resume")
await _auto_resume(bot)
last_seen = max(last_seen, count)
# -- Play loop ---------------------------------------------------------------
async def _play_loop(bot) -> None:
async def _play_loop(bot, *, seek: float = 0.0) -> None:
"""Pop tracks from queue and stream them sequentially."""
ps = _ps(bot)
duck_task = bot._spawn(_duck_monitor(bot), name="music-duck-monitor")
ps["duck_task"] = duck_task
first = True
try:
while ps["queue"]:
track = ps["queue"].pop(0)
@@ -94,44 +376,92 @@ async def _play_loop(bot) -> None:
done = asyncio.Event()
ps["done_event"] = done
cur_seek = seek if first else 0.0
first = False
progress = [0]
ps["progress"] = progress
ps["cur_seek"] = cur_seek
# Download phase
source = track.url
if track.local_path is None:
loop = asyncio.get_running_loop()
tid = hashlib.md5(track.url.encode()).hexdigest()[:12]
dl_path = await loop.run_in_executor(
None, _download_track, track.url, tid,
)
if dl_path:
track.local_path = dl_path
source = str(dl_path)
else:
log.warning("music: download failed, streaming %s",
track.url)
else:
source = str(track.local_path)
try:
await bot.stream_audio(
track.url,
volume=lambda: ps["volume"] / 100.0,
source,
volume=lambda: (
ps["duck_vol"]
if ps["duck_vol"] is not None
else ps["volume"]
) / 100.0,
on_done=done,
seek=cur_seek,
progress=progress,
)
except asyncio.CancelledError:
elapsed = cur_seek + progress[0] * 0.02
if elapsed > 1.0:
_save_resume(bot, track, elapsed)
raise
except Exception:
log.exception("music: stream error for %s", track.url)
elapsed = cur_seek + progress[0] * 0.02
if elapsed > 1.0:
_save_resume(bot, track, elapsed)
break
await done.wait()
if progress[0] > 0:
_clear_resume(bot)
_cleanup_track(track)
except asyncio.CancelledError:
pass
finally:
if duck_task and not duck_task.done():
duck_task.cancel()
ps["current"] = None
ps["done_event"] = None
ps["task"] = None
ps["duck_vol"] = None
ps["duck_task"] = None
ps["progress"] = None
ps["cur_seek"] = 0.0
def _ensure_loop(bot) -> None:
def _ensure_loop(bot, *, seek: float = 0.0) -> None:
"""Start the play loop if not already running."""
ps = _ps(bot)
task = ps.get("task")
if task and not task.done():
return
ps["task"] = bot._spawn(_play_loop(bot), name="music-play-loop")
ps["task"] = bot._spawn(
_play_loop(bot, seek=seek), name="music-play-loop",
)
# -- Commands ----------------------------------------------------------------
@command("play", help="Music: !play <url|playlist>")
@command("play", help="Music: !play <url|query>")
async def cmd_play(bot, message):
"""Play a URL or add to queue if already playing.
Usage:
!play <url> Play audio from URL (YouTube, SoundCloud, etc.)
!play <url> Play audio from URL (YouTube, SoundCloud, etc.)
!play <query> Search YouTube and play the first result
Playlists are expanded into individual tracks. If the queue is nearly
full, only as many tracks as will fit are enqueued.
@@ -142,10 +472,13 @@ async def cmd_play(bot, message):
parts = message.text.split(None, 1)
if len(parts) < 2:
await bot.reply(message, "Usage: !play <url>")
await bot.reply(message, "Usage: !play <url|query>")
return
url = parts[1].strip()
is_search = not _is_url(url)
if is_search:
url = f"ytsearch10:{url}"
ps = _ps(bot)
if len(ps["queue"]) >= _MAX_QUEUE:
@@ -156,12 +489,19 @@ async def cmd_play(bot, message):
loop = asyncio.get_running_loop()
resolved = await loop.run_in_executor(None, _resolve_tracks, url, remaining)
# Search: pick one random result instead of enqueuing all
if is_search and len(resolved) > 1:
resolved = [random.choice(resolved)]
was_idle = ps["current"] is None
requester = message.nick or "?"
added = 0
# Only set origin for direct URLs (not searches) so resume uses the
# resolved video URL rather than an ephemeral search query
origin = url if not is_search else ""
for track_url, track_title in resolved[:remaining]:
ps["queue"].append(_Track(url=track_url, title=track_title,
requester=requester))
requester=requester, origin=origin))
added += 1
total_resolved = len(resolved)
@@ -197,13 +537,59 @@ async def cmd_stop(bot, message):
task = ps.get("task")
if task and not task.done():
task.cancel()
ps["current"] = None
ps["task"] = None
ps["done_event"] = None
try:
await task
except (asyncio.CancelledError, Exception):
pass
else:
ps["current"] = None
ps["task"] = None
ps["done_event"] = None
ps["duck_vol"] = None
ps["duck_task"] = None
ps["progress"] = None
ps["cur_seek"] = 0.0
await bot.reply(message, "Stopped")
@command("resume", help="Music: !resume -- resume last stopped track")
async def cmd_resume(bot, message):
"""Resume playback from the last interrupted position.
Loads the track URL and elapsed time saved when playback was stopped
or skipped. The position persists across bot restarts.
"""
if not _is_mumble(bot):
await bot.reply(message, "Music playback is Mumble-only")
return
ps = _ps(bot)
if ps["current"] is not None:
await bot.reply(message, "Already playing")
return
data = _load_resume(bot)
if data is None:
await bot.reply(message, "Nothing to resume")
return
elapsed = data.get("elapsed", 0.0)
track = _Track(
url=data["url"],
title=data.get("title", data["url"]),
requester=data.get("requester", "?"),
)
ps["queue"].insert(0, track)
_clear_resume(bot)
await bot.reply(
message,
f"Resuming: {_truncate(track.title)} from {_fmt_time(elapsed)}",
)
_ensure_loop(bot, seek=elapsed)
@command("skip", help="Music: !skip")
async def cmd_skip(bot, message):
"""Skip current track, advance to next in queue."""
@@ -215,13 +601,15 @@ async def cmd_skip(bot, message):
await bot.reply(message, "Nothing playing")
return
skipped = ps["current"]
task = ps.get("task")
if task and not task.done():
task.cancel()
skipped = ps["current"]
ps["current"] = None
ps["task"] = None
try:
await task
except (asyncio.CancelledError, Exception):
pass
if ps["queue"]:
_ensure_loop(bot)
@@ -233,6 +621,64 @@ async def cmd_skip(bot, message):
await bot.reply(message, "Skipped, queue empty")
@command("seek", help="Music: !seek <offset>")
async def cmd_seek(bot, message):
"""Seek to position in current track.
Usage:
!seek 1:30 Seek to 1 minute 30 seconds
!seek 90 Seek to 90 seconds
!seek +30 Jump forward 30 seconds
!seek -30 Jump backward 30 seconds
!seek +1:00 Jump forward 1 minute
"""
if not _is_mumble(bot):
return
ps = _ps(bot)
parts = message.text.split(None, 1)
if len(parts) < 2:
await bot.reply(message, "Usage: !seek <offset> (e.g. 1:30, +30, -30)")
return
try:
mode, seconds = _parse_seek(parts[1].strip())
except ValueError:
await bot.reply(message, "Usage: !seek <offset> (e.g. 1:30, +30, -30)")
return
track = ps["current"]
if track is None:
await bot.reply(message, "Nothing playing")
return
# Compute target position
if mode == "abs":
target = seconds
else:
progress = ps.get("progress")
cur_seek = ps.get("cur_seek", 0.0)
elapsed = cur_seek + (progress[0] * 0.02 if progress else 0.0)
target = elapsed + seconds
target = max(0.0, target)
# Re-insert current track at front of queue (local_path intact)
ps["queue"].insert(0, track)
# Cancel the play loop and wait for cleanup
task = ps.get("task")
if task and not task.done():
task.cancel()
try:
await task
except (asyncio.CancelledError, Exception):
pass
_ensure_loop(bot, seek=target)
await bot.reply(message, f"Seeking to {_fmt_time(target)}")
@command("queue", help="Music: !queue [url]")
async def cmd_queue(bot, message):
"""Show queue or add a URL.
@@ -299,13 +745,14 @@ async def cmd_testtone(bot, message):
await bot.reply(message, "Test tone complete")
@command("volume", help="Music: !volume [0-100]")
@command("volume", help="Music: !volume [0-100|+N|-N]")
async def cmd_volume(bot, message):
"""Get or set playback volume.
Usage:
!volume Show current volume
!volume <0-100> Set volume (takes effect immediately)
!volume +N/-N Adjust volume relatively
"""
if not _is_mumble(bot):
return
@@ -316,15 +763,186 @@ async def cmd_volume(bot, message):
await bot.reply(message, f"Volume: {ps['volume']}%")
return
arg = parts[1].strip()
relative = arg.startswith("+") or (arg.startswith("-") and arg != "-")
try:
val = int(parts[1])
val = int(arg)
except ValueError:
await bot.reply(message, "Usage: !volume <0-100>")
await bot.reply(message, "Usage: !volume <0-100|+N|-N>")
return
if relative:
val = ps["volume"] + val
if val < 0 or val > 100:
await bot.reply(message, "Volume must be 0-100")
return
ps["volume"] = val
bot.state.set("music", "volume", str(val))
await bot.reply(message, f"Volume set to {val}%")
@command("duck", help="Music: !duck [on|off|floor N|silence N|restore N]")
async def cmd_duck(bot, message):
"""Configure voice-activated volume ducking.
Usage:
!duck Show ducking status and settings
!duck on Enable voice ducking
!duck off Disable voice ducking
!duck floor <0-100> Set floor volume %
!duck silence <sec> Set silence timeout (seconds)
!duck restore <sec> Set restore ramp duration (seconds)
"""
if not _is_mumble(bot):
await bot.reply(message, "Mumble-only feature")
return
ps = _ps(bot)
parts = message.text.split()
if len(parts) < 2:
state = "on" if ps["duck_enabled"] else "off"
ducking = ""
if ps["duck_vol"] is not None:
ducking = f", ducked to {int(ps['duck_vol'])}%"
await bot.reply(
message,
f"Duck: {state} | floor={ps['duck_floor']}%"
f" silence={ps['duck_silence']}s"
f" restore={ps['duck_restore']}s{ducking}",
)
return
sub = parts[1].lower()
if sub == "on":
ps["duck_enabled"] = True
await bot.reply(message, "Voice ducking enabled")
elif sub == "off":
ps["duck_enabled"] = False
ps["duck_vol"] = None
await bot.reply(message, "Voice ducking disabled")
elif sub == "floor":
if len(parts) < 3:
await bot.reply(message, "Usage: !duck floor <0-100>")
return
try:
val = int(parts[2])
except ValueError:
await bot.reply(message, "Usage: !duck floor <0-100>")
return
if val < 0 or val > 100:
await bot.reply(message, "Floor must be 0-100")
return
ps["duck_floor"] = val
await bot.reply(message, f"Duck floor set to {val}%")
elif sub == "silence":
if len(parts) < 3:
await bot.reply(message, "Usage: !duck silence <seconds>")
return
try:
val = int(parts[2])
except ValueError:
await bot.reply(message, "Usage: !duck silence <seconds>")
return
if val < 1:
await bot.reply(message, "Silence timeout must be >= 1")
return
ps["duck_silence"] = val
await bot.reply(message, f"Duck silence set to {val}s")
elif sub == "restore":
if len(parts) < 3:
await bot.reply(message, "Usage: !duck restore <seconds>")
return
try:
val = int(parts[2])
except ValueError:
await bot.reply(message, "Usage: !duck restore <seconds>")
return
if val < 1:
await bot.reply(message, "Restore duration must be >= 1")
return
ps["duck_restore"] = val
await bot.reply(message, f"Duck restore set to {val}s")
else:
await bot.reply(
message, "Usage: !duck [on|off|floor N|silence N|restore N]",
)
@command("keep", help="Music: !keep -- keep current track's audio file")
async def cmd_keep(bot, message):
"""Mark the current track's local file to keep after playback."""
if not _is_mumble(bot):
await bot.reply(message, "Mumble-only feature")
return
ps = _ps(bot)
track = ps["current"]
if track is None:
await bot.reply(message, "Nothing playing")
return
if track.local_path is None:
await bot.reply(message, "No local file for current track")
return
track.keep = True
await bot.reply(message, f"Keeping: {track.local_path.name}")
@command("kept", help="Music: !kept [clear] -- list or clear kept files")
async def cmd_kept(bot, message):
"""List or clear kept audio files in data/music/."""
if not _is_mumble(bot):
await bot.reply(message, "Mumble-only feature")
return
parts = message.text.split()
if len(parts) >= 2 and parts[1].lower() == "clear":
count = 0
if _MUSIC_DIR.is_dir():
for f in _MUSIC_DIR.iterdir():
if f.is_file():
f.unlink()
count += 1
await bot.reply(message, f"Deleted {count} file(s)")
return
files = sorted(_MUSIC_DIR.iterdir()) if _MUSIC_DIR.is_dir() else []
files = [f for f in files if f.is_file()]
if not files:
await bot.reply(message, "No kept files")
return
lines = [f"Kept files ({len(files)}):"]
for f in files:
size_mb = f.stat().st_size / (1024 * 1024)
lines.append(f" {f.name} ({size_mb:.1f}MB)")
for line in lines:
await bot.reply(message, line)
# -- Plugin lifecycle --------------------------------------------------------
async def on_connected(bot) -> None:
"""Called by MumbleBot after each (re)connection.
Ensures the reconnect watcher is running -- triggers boot-resume
and reconnect-resume without waiting for a user command.
"""
if not _is_mumble(bot):
return
ps = _ps(bot)
saved_vol = bot.state.get("music", "volume")
if saved_vol is not None:
try:
ps["volume"] = max(0, min(100, int(saved_vol)))
except ValueError:
pass
if ps["_watcher_task"] is None and hasattr(bot, "_spawn"):
ps["_watcher_task"] = bot._spawn(
_reconnect_watcher(bot), name="music-reconnect-watcher",
)

347
plugins/voice.py Normal file
View File

@@ -0,0 +1,347 @@
"""Plugin: voice STT/TTS for Mumble channels.
Listens for voice audio via pymumble's sound callback, buffers PCM per
user, transcribes via Whisper STT on silence, and provides TTS playback
via Piper. Commands: !listen, !say.
"""
from __future__ import annotations
import asyncio
import io
import json
import logging
import threading
import time
import wave
import urllib.request
from derp.http import urlopen as _urlopen
from derp.plugin import command
log = logging.getLogger(__name__)
# -- Constants ---------------------------------------------------------------
_SAMPLE_RATE = 48000
_CHANNELS = 1
_SAMPLE_WIDTH = 2 # s16le = 2 bytes per sample
_SILENCE_GAP = 1.5 # seconds of silence before flushing
_MIN_DURATION = 0.5 # discard utterances shorter than this
_MAX_DURATION = 30.0 # cap buffer at this many seconds
_MIN_BYTES = int(_MIN_DURATION * _SAMPLE_RATE * _SAMPLE_WIDTH)
_MAX_BYTES = int(_MAX_DURATION * _SAMPLE_RATE * _SAMPLE_WIDTH)
_FLUSH_INTERVAL = 0.5 # flush monitor poll interval
_MAX_SAY_LEN = 500 # max characters for !say
_WHISPER_URL = "http://192.168.129.9:8080/inference"
_PIPER_URL = "http://192.168.129.9:5100/"
# -- Per-bot state -----------------------------------------------------------
def _ps(bot):
"""Per-bot plugin runtime state."""
cfg = getattr(bot, "config", {}).get("voice", {})
return bot._pstate.setdefault("voice", {
"listen": False,
"trigger": cfg.get("trigger", ""),
"buffers": {}, # {username: bytearray}
"last_ts": {}, # {username: float monotonic}
"flush_task": None,
"lock": threading.Lock(),
"silence_gap": cfg.get("silence_gap", _SILENCE_GAP),
"whisper_url": cfg.get("whisper_url", _WHISPER_URL),
"piper_url": cfg.get("piper_url", _PIPER_URL),
"_listener_registered": False,
})
# -- Helpers -----------------------------------------------------------------
def _is_mumble(bot) -> bool:
"""Check if bot supports voice streaming."""
return hasattr(bot, "stream_audio")
def _pcm_to_wav(pcm: bytes) -> bytes:
"""Wrap raw s16le 48kHz mono PCM in a WAV container."""
buf = io.BytesIO()
with wave.open(buf, "wb") as wf:
wf.setnchannels(_CHANNELS)
wf.setsampwidth(_SAMPLE_WIDTH)
wf.setframerate(_SAMPLE_RATE)
wf.writeframes(pcm)
return buf.getvalue()
# -- STT: Sound listener (pymumble thread) ----------------------------------
def _on_voice(bot, user, sound_chunk):
"""Buffer incoming voice PCM per user. Runs on pymumble thread."""
ps = _ps(bot)
if not ps["listen"] and not ps["trigger"]:
return
try:
name = user["name"]
except (KeyError, TypeError):
name = None
if not name or name == bot.nick:
return
pcm = sound_chunk.pcm
if not pcm:
return
with ps["lock"]:
if name not in ps["buffers"]:
ps["buffers"][name] = bytearray()
buf = ps["buffers"][name]
buf.extend(pcm)
if len(buf) > _MAX_BYTES:
ps["buffers"][name] = bytearray(buf[-_MAX_BYTES:])
ps["last_ts"][name] = time.monotonic()
# -- STT: Whisper transcription ---------------------------------------------
def _transcribe(ps, pcm: bytes) -> str:
"""POST PCM (as WAV) to Whisper and return transcribed text. Blocking."""
wav_data = _pcm_to_wav(pcm)
boundary = "----derp_voice_boundary"
body = (
f"--{boundary}\r\n"
f'Content-Disposition: form-data; name="file"; filename="audio.wav"\r\n'
f"Content-Type: audio/wav\r\n\r\n"
).encode() + wav_data + (
f"\r\n--{boundary}\r\n"
f'Content-Disposition: form-data; name="response_format"\r\n\r\n'
f"json\r\n--{boundary}--\r\n"
).encode()
req = urllib.request.Request(ps["whisper_url"], data=body, method="POST")
req.add_header("Content-Type", f"multipart/form-data; boundary={boundary}")
resp = _urlopen(req, timeout=30, proxy=False)
data = json.loads(resp.read())
resp.close()
return data.get("text", "").strip()
# -- STT: Flush monitor (asyncio background task) ---------------------------
async def _flush_monitor(bot):
"""Poll for silence gaps and transcribe completed utterances."""
ps = _ps(bot)
loop = asyncio.get_running_loop()
try:
while ps["listen"] or ps["trigger"]:
await asyncio.sleep(_FLUSH_INTERVAL)
now = time.monotonic()
to_flush: list[tuple[str, bytes]] = []
with ps["lock"]:
for name in list(ps["last_ts"]):
elapsed = now - ps["last_ts"][name]
if elapsed >= ps["silence_gap"] and name in ps["buffers"]:
pcm = bytes(ps["buffers"].pop(name))
del ps["last_ts"][name]
to_flush.append((name, pcm))
for name, pcm in to_flush:
if len(pcm) < _MIN_BYTES:
continue
try:
text = await loop.run_in_executor(
None, _transcribe, ps, pcm,
)
except Exception:
log.exception("voice: transcription failed for %s", name)
continue
if not text or text.strip("., ") == "":
continue
trigger = ps["trigger"]
if trigger and text.lower().startswith(trigger.lower()):
remainder = text[len(trigger):].strip()
if remainder:
log.info("voice: trigger from %s: %s", name, remainder)
bot._spawn(
_tts_play(bot, remainder), name="voice-tts",
)
continue
if ps["listen"]:
log.info("voice: %s said: %s", name, text)
await bot.action("0", f"heard {name} say: {text}")
except asyncio.CancelledError:
pass
except Exception:
log.exception("voice: flush monitor error")
# -- TTS: Piper fetch + playback --------------------------------------------
def _fetch_tts(piper_url: str, text: str) -> str | None:
"""POST text to Piper TTS and save the WAV response. Blocking."""
import tempfile
try:
payload = json.dumps({"text": text}).encode()
req = urllib.request.Request(
piper_url, data=payload, method="POST",
)
req.add_header("Content-Type", "application/json")
resp = _urlopen(req, timeout=30, proxy=False)
data = resp.read()
resp.close()
if not data:
return None
tmp = tempfile.NamedTemporaryFile(
suffix=".wav", prefix="derp_tts_", delete=False,
)
tmp.write(data)
tmp.close()
return tmp.name
except Exception:
log.exception("voice: TTS fetch failed")
return None
async def _tts_play(bot, text: str):
"""Fetch TTS audio and play it via stream_audio."""
from pathlib import Path
ps = _ps(bot)
loop = asyncio.get_running_loop()
wav_path = await loop.run_in_executor(
None, _fetch_tts, ps["piper_url"], text,
)
if wav_path is None:
return
try:
done = asyncio.Event()
await bot.stream_audio(str(wav_path), volume=1.0, on_done=done)
await done.wait()
finally:
Path(wav_path).unlink(missing_ok=True)
# -- Listener lifecycle -----------------------------------------------------
def _ensure_listener(bot):
"""Register the sound listener callback (idempotent)."""
ps = _ps(bot)
if ps["_listener_registered"]:
return
if not hasattr(bot, "_sound_listeners"):
return
bot._sound_listeners.append(lambda user, chunk: _on_voice(bot, user, chunk))
ps["_listener_registered"] = True
log.info("voice: registered sound listener")
def _ensure_flush_task(bot):
"""Start the flush monitor if not running."""
ps = _ps(bot)
task = ps.get("flush_task")
if task and not task.done():
return
ps["flush_task"] = bot._spawn(
_flush_monitor(bot), name="voice-flush-monitor",
)
def _stop_flush_task(bot):
"""Cancel the flush monitor."""
ps = _ps(bot)
task = ps.get("flush_task")
if task and not task.done():
task.cancel()
ps["flush_task"] = None
# -- Commands ----------------------------------------------------------------
@command("listen", help="Voice: !listen [on|off] -- toggle STT", tier="admin")
async def cmd_listen(bot, message):
"""Toggle voice-to-text transcription."""
if not _is_mumble(bot):
await bot.reply(message, "Voice is Mumble-only")
return
ps = _ps(bot)
parts = message.text.split()
if len(parts) < 2:
state = "on" if ps["listen"] else "off"
trigger = ps["trigger"]
info = f"Listen: {state}"
if trigger:
info += f" | Trigger: {trigger}"
await bot.reply(message, info)
return
sub = parts[1].lower()
if sub == "on":
ps["listen"] = True
_ensure_listener(bot)
_ensure_flush_task(bot)
await bot.reply(message, "Listening for voice")
elif sub == "off":
ps["listen"] = False
if not ps["trigger"]:
with ps["lock"]:
ps["buffers"].clear()
ps["last_ts"].clear()
_stop_flush_task(bot)
await bot.reply(message, "Stopped listening")
else:
await bot.reply(message, "Usage: !listen [on|off]")
@command("say", help="Voice: !say <text> -- text-to-speech")
async def cmd_say(bot, message):
"""Speak text aloud via Piper TTS."""
if not _is_mumble(bot):
await bot.reply(message, "Voice is Mumble-only")
return
parts = message.text.split(None, 1)
if len(parts) < 2:
await bot.reply(message, "Usage: !say <text>")
return
text = parts[1].strip()
if len(text) > _MAX_SAY_LEN:
await bot.reply(message, f"Text too long (max {_MAX_SAY_LEN} chars)")
return
bot._spawn(_tts_play(bot, text), name="voice-tts")
# -- Plugin lifecycle --------------------------------------------------------
async def on_connected(bot) -> None:
"""Re-register listener after reconnect; play TTS greeting on first join."""
if not _is_mumble(bot):
return
ps = _ps(bot)
# TTS greeting on first connect
greet = bot.config.get("mumble", {}).get("greet")
if greet and not ps.get("_greeted"):
ps["_greeted"] = True
# Wait for audio subsystem to be ready
for _ in range(20):
if bot._is_audio_ready():
break
await asyncio.sleep(0.5)
bot._spawn(_tts_play(bot, greet), name="voice-greet")
if ps["listen"] or ps["trigger"]:
_ensure_listener(bot)
_ensure_flush_task(bot)

View File

@@ -96,6 +96,7 @@ class Bot:
self._tasks: set[asyncio.Task] = set()
self._reconnect_delay: float = 5.0
self._admins: list[str] = config.get("bot", {}).get("admins", [])
self._sorcerers: list[str] = config.get("bot", {}).get("sorcerers", [])
self._operators: list[str] = config.get("bot", {}).get("operators", [])
self._trusted: list[str] = config.get("bot", {}).get("trusted", [])
self._opers: set[str] = set() # hostmasks of known IRC operators
@@ -377,6 +378,9 @@ class Bot:
for pattern in self._admins:
if fnmatch.fnmatch(msg.prefix, pattern):
return "admin"
for pattern in self._sorcerers:
if fnmatch.fnmatch(msg.prefix, pattern):
return "sorcerer"
for pattern in self._operators:
if fnmatch.fnmatch(msg.prefix, pattern):
return "oper"

View File

@@ -155,6 +155,20 @@ def main(argv: list[str] | None = None) -> int:
mumble_bot = MumbleBot("mumble", config, registry)
bots.append(mumble_bot)
# Additional Mumble bots (e.g. merlin)
for extra in config.get("mumble", {}).get("extra", []):
extra_cfg = dict(config)
merged_mu = dict(config["mumble"])
merged_mu.update(extra)
merged_mu.pop("extra", None)
extra_cfg["mumble"] = merged_mu
# Extra bots don't run voice trigger by default
if "voice" not in extra:
extra_cfg["voice"] = {}
username = extra.get("username", f"mumble-{len(bots)}")
bot = MumbleBot(username, extra_cfg, registry)
bots.append(bot)
names = ", ".join(b.name for b in bots)
log.info("servers: %s", names)

View File

@@ -6,8 +6,10 @@ import array
import asyncio
import html
import logging
import os
import re
import struct
import threading
import time
from dataclasses import dataclass, field
from pathlib import Path
@@ -16,6 +18,7 @@ import pymumble_py3 as pymumble
from pymumble_py3.constants import (
PYMUMBLE_CLBK_CONNECTED,
PYMUMBLE_CLBK_DISCONNECTED,
PYMUMBLE_CLBK_SOUNDRECEIVED,
PYMUMBLE_CLBK_TEXTMESSAGERECEIVED,
)
@@ -135,6 +138,8 @@ class MumbleBot:
self._port: int = mu_cfg.get("port", 64738)
self._username: str = mu_cfg.get("username", "derp")
self._password: str = mu_cfg.get("password", "")
self._certfile: str | None = mu_cfg.get("certfile")
self._keyfile: str | None = mu_cfg.get("keyfile")
self.nick: str = self._username
self.prefix: str = (
mu_cfg.get("prefix")
@@ -144,6 +149,7 @@ class MumbleBot:
self._started: float = time.monotonic()
self._tasks: set[asyncio.Task] = set()
self._admins: list[str] = [str(x) for x in mu_cfg.get("admins", [])]
self._sorcerers: list[str] = [str(x) for x in mu_cfg.get("sorcerers", [])]
self._operators: list[str] = [str(x) for x in mu_cfg.get("operators", [])]
self._trusted: list[str] = [str(x) for x in mu_cfg.get("trusted", [])]
self.state = StateStore(f"data/state-{name}.db")
@@ -151,6 +157,9 @@ class MumbleBot:
# pymumble state
self._mumble: pymumble.Mumble | None = None
self._loop: asyncio.AbstractEventLoop | None = None
self._last_voice_ts: float = 0.0
self._connect_count: int = 0
self._sound_listeners: list = []
rate_cfg = config.get("bot", {})
self._bucket = _TokenBucket(
@@ -165,8 +174,14 @@ class MumbleBot:
self._mumble = pymumble.Mumble(
self._host, self._username,
port=self._port, password=self._password,
certfile=self._certfile, keyfile=self._keyfile,
reconnect=True,
)
# pymumble captures threading.current_thread() as parent_thread at
# init time. Since we run in an executor thread (which is transient),
# the parent check in pymumble's loop would see a dead thread and
# disconnect. Point it at the main thread instead.
self._mumble.parent_thread = threading.main_thread()
self._mumble.callbacks.set_callback(
PYMUMBLE_CLBK_TEXTMESSAGERECEIVED,
self._on_text_message,
@@ -179,19 +194,59 @@ class MumbleBot:
PYMUMBLE_CLBK_DISCONNECTED,
self._on_disconnected,
)
self._mumble.set_receive_sound(False)
self._mumble.callbacks.set_callback(
PYMUMBLE_CLBK_SOUNDRECEIVED,
self._on_sound_received,
)
self._mumble.set_receive_sound(True)
self._mumble.start()
self._mumble.is_ready()
def _on_connected(self) -> None:
"""Callback from pymumble thread: connection established."""
self._connect_count += 1
kind = "reconnected" if self._connect_count > 1 else "connected"
session = getattr(self._mumble.users, "myself_session", "?")
log.info("mumble: connected as %s on %s:%d (session=%s)",
self._username, self._host, self._port, session)
log.info("mumble: %s as %s on %s:%d (session=%s)",
kind, self._username, self._host, self._port, session)
if self._loop:
asyncio.run_coroutine_threadsafe(
self._notify_plugins_connected(), self._loop,
)
async def _notify_plugins_connected(self) -> None:
"""Call on_connected(bot) in each loaded plugin that defines it."""
for name, mod in self.registry._modules.items():
fn = getattr(mod, "on_connected", None)
if fn is None or not asyncio.iscoroutinefunction(fn):
continue
try:
await fn(self)
except Exception:
log.exception("mumble: on_connected hook failed in %s", name)
def _on_disconnected(self) -> None:
"""Callback from pymumble thread: connection lost."""
log.warning("mumble: disconnected")
self._last_voice_ts = 0.0
def _on_sound_received(self, user, sound_chunk) -> None:
"""Callback from pymumble thread: voice audio received.
Updates the timestamp used by the music plugin's duck monitor.
When this callback is registered, pymumble passes decoded PCM
directly and does not queue it -- no memory buildup.
"""
prev = self._last_voice_ts
self._last_voice_ts = time.monotonic()
if prev == 0.0:
name = user["name"] if isinstance(user, dict) else "?"
log.info("mumble: first voice packet from %s", name)
for fn in self._sound_listeners:
try:
fn(user, sound_chunk)
except Exception:
log.exception("mumble: sound listener error")
def _on_text_message(self, message) -> None:
"""Callback from pymumble thread: text message received.
@@ -250,6 +305,11 @@ class MumbleBot:
while self._running:
await asyncio.sleep(1)
finally:
# Cancel background tasks first so play-loop can save resume state
for task in list(self._tasks):
task.cancel()
if self._tasks:
await asyncio.gather(*self._tasks, return_exceptions=True)
if self._mumble:
self._mumble.stop()
self._mumble = None
@@ -322,6 +382,9 @@ class MumbleBot:
for name in self._admins:
if msg.prefix == name:
return "admin"
for name in self._sorcerers:
if msg.prefix == name:
return "sorcerer"
for name in self._operators:
if msg.prefix == name:
return "oper"
@@ -413,6 +476,16 @@ class MumbleBot:
# -- Voice streaming -----------------------------------------------------
def _is_audio_ready(self) -> bool:
"""Check if pymumble can accept audio (connected + encoder ready)."""
if self._mumble is None:
return False
try:
so = self._mumble.sound_output
return so is not None and so.encoder is not None
except AttributeError:
return False
async def test_tone(self, duration: float = 3.0) -> None:
"""Send a 440Hz sine test tone for debugging voice output."""
import math
@@ -446,37 +519,47 @@ class MumbleBot:
*,
volume=0.5,
on_done=None,
seek: float = 0.0,
progress: list | None = None,
) -> None:
"""Stream audio from URL through yt-dlp|ffmpeg to voice channel.
Pipeline:
yt-dlp -o - -f bestaudio <url>
| ffmpeg -i pipe:0 -f s16le -ar 48000 -ac 1 pipe:1
| ffmpeg [-ss N.NNN] -i pipe:0 -f s16le -ar 48000 -ac 1 pipe:1
Feeds raw PCM to pymumble's sound_output which handles Opus
encoding, packetization, and timing.
``volume`` may be a float (static) or a callable returning float
(dynamic, re-read each frame).
(dynamic, re-read each frame). ``seek`` skips into the track
(seconds). ``progress`` is a mutable ``[0]`` list updated to the
current frame count each frame.
"""
if self._mumble is None:
return
_get_vol = volume if callable(volume) else lambda: volume
log.info("stream_audio: starting pipeline for %s (vol=%.0f%%)",
url, _get_vol() * 100)
log.info("stream_audio: starting pipeline for %s (vol=%.0f%%, seek=%.1fs)",
url, _get_vol() * 100, seek)
seek_flag = f" -ss {seek:.3f}" if seek > 0 else ""
if os.path.isfile(url):
cmd = (f"ffmpeg{seek_flag} -i {_shell_quote(url)}"
f" -f s16le -ar 48000 -ac 1 -loglevel error pipe:1")
else:
cmd = (f"yt-dlp -o - -f bestaudio --no-warnings {_shell_quote(url)}"
f" | ffmpeg{seek_flag} -i pipe:0 -f s16le -ar 48000 -ac 1"
f" -loglevel error pipe:1")
proc = await asyncio.create_subprocess_exec(
"sh", "-c",
f"yt-dlp -o - -f bestaudio --no-warnings {_shell_quote(url)}"
f" | ffmpeg -i pipe:0 -f s16le -ar 48000 -ac 1"
f" -loglevel error pipe:1",
"sh", "-c", cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
_max_step = 0.1 # max volume change per frame (~200ms full ramp)
_max_step = 0.005 # max volume change per frame (~4s full ramp)
_cur_vol = _get_vol()
_was_feeding = True # track connected/disconnected transitions
frames = 0
try:
@@ -487,6 +570,24 @@ class MumbleBot:
if len(pcm) < _FRAME_BYTES:
pcm += b"\x00" * (_FRAME_BYTES - len(pcm))
frames += 1
if progress is not None:
progress[0] = frames
if not self._is_audio_ready():
# Disconnected -- keep reading ffmpeg at real-time pace
if _was_feeding:
log.warning("stream_audio: connection lost, "
"dropping frames at %d", frames)
_was_feeding = False
await asyncio.sleep(0.02)
continue
if not _was_feeding:
log.info("stream_audio: connection restored, "
"resuming feed at frame %d", frames)
_was_feeding = True
target = _get_vol()
if _cur_vol == target:
# Fast path: flat scaling
@@ -504,32 +605,53 @@ class MumbleBot:
pcm = _scale_pcm_ramp(pcm, _cur_vol, next_vol)
_cur_vol = next_vol
self._mumble.sound_output.add_sound(pcm)
frames += 1
try:
self._mumble.sound_output.add_sound(pcm)
except (TypeError, AttributeError, OSError):
# Disconnected mid-feed -- skip this frame
await asyncio.sleep(0.02)
continue
if frames == 1:
log.info("stream_audio: first frame fed to pymumble")
# Keep buffer at most 1 second ahead
while self._mumble.sound_output.get_buffer_size() > 1.0:
await asyncio.sleep(0.05)
try:
while (self._is_audio_ready()
and self._mumble.sound_output.get_buffer_size() > 1.0):
await asyncio.sleep(0.05)
except (TypeError, AttributeError):
pass
# Wait for buffer to drain
while self._mumble.sound_output.get_buffer_size() > 0:
await asyncio.sleep(0.1)
try:
while (self._is_audio_ready()
and self._mumble.sound_output.get_buffer_size() > 0):
await asyncio.sleep(0.1)
except (TypeError, AttributeError):
pass
log.info("stream_audio: finished, %d frames", frames)
except asyncio.CancelledError:
self._mumble.sound_output.clear_buffer()
try:
if self._is_audio_ready():
self._mumble.sound_output.clear_buffer()
except Exception:
pass
log.info("stream_audio: cancelled at frame %d", frames)
raise
except Exception:
log.exception("stream_audio: error at frame %d", frames)
raise
finally:
try:
proc.kill()
except ProcessLookupError:
pass
stderr_out = await proc.stderr.read()
await proc.wait()
try:
stderr_out = await asyncio.wait_for(proc.stderr.read(), timeout=3)
await asyncio.wait_for(proc.wait(), timeout=3)
except (asyncio.TimeoutError, asyncio.CancelledError):
stderr_out = b""
if stderr_out:
log.warning("stream_audio: subprocess stderr: %s",
stderr_out.decode(errors="replace")[:500])

View File

@@ -12,7 +12,7 @@ from typing import Any, Callable
log = logging.getLogger(__name__)
TIERS: tuple[str, ...] = ("user", "trusted", "oper", "admin")
TIERS: tuple[str, ...] = ("user", "trusted", "oper", "sorcerer", "admin")
@dataclass(slots=True)

View File

@@ -107,7 +107,7 @@ def _msg(text: str, prefix: str = "nick!user@host") -> Message:
class TestTierConstants:
def test_tier_order(self):
assert TIERS == ("user", "trusted", "oper", "admin")
assert TIERS == ("user", "trusted", "oper", "sorcerer", "admin")
def test_index_comparison(self):
assert TIERS.index("user") < TIERS.index("trusted")

538
tests/test_llm.py Normal file
View File

@@ -0,0 +1,538 @@
"""Tests for the OpenRouter LLM chat plugin."""
import asyncio
import importlib.util
import json
import sys
import time
import urllib.error
from pathlib import Path
from unittest.mock import patch
from derp.irc import Message
# plugins/ is not a Python package -- load the module from file path
_spec = importlib.util.spec_from_file_location(
"plugins.llm", Path(__file__).resolve().parent.parent / "plugins" / "llm.py",
)
_mod = importlib.util.module_from_spec(_spec)
sys.modules[_spec.name] = _mod
_spec.loader.exec_module(_mod)
from plugins.llm import ( # noqa: E402
_COOLDOWN,
_MAX_HISTORY,
_MAX_REPLY_LEN,
_chat_request,
_check_cooldown,
_extract_reply,
_get_api_key,
_get_model,
_ps,
_set_cooldown,
_truncate,
cmd_ask,
cmd_chat,
)
# -- Helpers -----------------------------------------------------------------
class _FakeState:
"""In-memory stand-in for bot.state."""
def __init__(self):
self._store: dict[str, dict[str, str]] = {}
def get(self, plugin: str, key: str, default: str | None = None) -> str | None:
return self._store.get(plugin, {}).get(key, default)
def set(self, plugin: str, key: str, value: str) -> None:
self._store.setdefault(plugin, {})[key] = value
def delete(self, plugin: str, key: str) -> bool:
try:
del self._store[plugin][key]
return True
except KeyError:
return False
def keys(self, plugin: str) -> list[str]:
return sorted(self._store.get(plugin, {}).keys())
class _FakeRegistry:
"""Minimal registry stand-in."""
def __init__(self):
self._modules: dict = {}
class _FakeBot:
"""Minimal bot stand-in that captures sent/replied messages."""
def __init__(self, *, admin: bool = False, config: dict | None = None):
self.sent: list[tuple[str, str]] = []
self.actions: list[tuple[str, str]] = []
self.replied: list[str] = []
self.state = _FakeState()
self._pstate: dict = {}
self.registry = _FakeRegistry()
self._admin = admin
self.config = config or {}
async def send(self, target: str, text: str) -> None:
self.sent.append((target, text))
async def action(self, target: str, text: str) -> None:
self.actions.append((target, text))
async def reply(self, message, text: str) -> None:
self.replied.append(text)
async def long_reply(self, message, lines, *, label: str = "") -> None:
for line in lines:
self.replied.append(line)
def _is_admin(self, message) -> bool:
return self._admin
def _msg(text: str, nick: str = "alice", target: str = "#test") -> Message:
"""Create a channel PRIVMSG."""
return Message(
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
command="PRIVMSG", params=[target, text], tags={},
)
def _pm(text: str, nick: str = "alice") -> Message:
"""Create a private PRIVMSG."""
return Message(
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
command="PRIVMSG", params=["botname", text], tags={},
)
def _api_response(content: str = "Hello!", reasoning: str = "") -> dict:
"""Build a mock API response."""
msg = {"role": "assistant", "content": content}
if reasoning:
msg["reasoning"] = reasoning
return {"choices": [{"message": msg}]}
class _FakeResp:
"""Mock HTTP response."""
def __init__(self, data: dict):
self._data = json.dumps(data).encode()
def read(self):
return self._data
def close(self):
pass
def _clear(bot=None) -> None:
"""Reset per-bot plugin state between tests."""
if bot is None:
return
ps = _ps(bot)
ps["histories"].clear()
ps["cooldowns"].clear()
ps["model"] = ""
# ---------------------------------------------------------------------------
# TestTruncate
# ---------------------------------------------------------------------------
class TestTruncate:
def test_short_text_unchanged(self):
assert _truncate("hello") == "hello"
def test_exact_length_unchanged(self):
text = "a" * _MAX_REPLY_LEN
assert _truncate(text) == text
def test_long_text_truncated(self):
text = "a" * 600
result = _truncate(text)
assert len(result) == _MAX_REPLY_LEN
assert result.endswith("...")
def test_custom_max(self):
result = _truncate("abcdefghij", 7)
assert result == "abcd..."
# ---------------------------------------------------------------------------
# TestExtractReply
# ---------------------------------------------------------------------------
class TestExtractReply:
def test_normal_content(self):
data = _api_response(content="Hello world")
assert _extract_reply(data) == "Hello world"
def test_empty_content_falls_back_to_reasoning(self):
data = _api_response(content="", reasoning="Thinking about it")
assert _extract_reply(data) == "Thinking about it"
def test_content_preferred_over_reasoning(self):
data = _api_response(content="Answer", reasoning="Reasoning")
assert _extract_reply(data) == "Answer"
def test_empty_choices(self):
assert _extract_reply({"choices": []}) == ""
def test_no_choices(self):
assert _extract_reply({}) == ""
def test_whitespace_content_falls_back(self):
data = _api_response(content=" ", reasoning="Fallback")
assert _extract_reply(data) == "Fallback"
# ---------------------------------------------------------------------------
# TestGetApiKey
# ---------------------------------------------------------------------------
class TestGetApiKey:
def test_from_env(self):
bot = _FakeBot()
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "env-key"}):
assert _get_api_key(bot) == "env-key"
def test_from_config(self):
bot = _FakeBot(config={"openrouter": {"api_key": "cfg-key"}})
with patch.dict("os.environ", {}, clear=True):
import os
os.environ.pop("OPENROUTER_API_KEY", None)
assert _get_api_key(bot) == "cfg-key"
def test_env_takes_precedence(self):
bot = _FakeBot(config={"openrouter": {"api_key": "cfg-key"}})
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "env-key"}):
assert _get_api_key(bot) == "env-key"
def test_missing_returns_empty(self):
bot = _FakeBot()
with patch.dict("os.environ", {}, clear=True):
import os
os.environ.pop("OPENROUTER_API_KEY", None)
assert _get_api_key(bot) == ""
# ---------------------------------------------------------------------------
# TestGetModel
# ---------------------------------------------------------------------------
class TestGetModel:
def test_default_model(self):
bot = _FakeBot()
assert _get_model(bot) == "openrouter/auto"
def test_from_config(self):
bot = _FakeBot(config={"openrouter": {"model": "some/model"}})
assert _get_model(bot) == "some/model"
def test_runtime_override(self):
bot = _FakeBot(config={"openrouter": {"model": "some/model"}})
_ps(bot)["model"] = "override/model"
assert _get_model(bot) == "override/model"
# ---------------------------------------------------------------------------
# TestCooldown
# ---------------------------------------------------------------------------
class TestCooldown:
def test_first_request_not_limited(self):
bot = _FakeBot()
_clear(bot)
assert _check_cooldown(bot, "alice") is False
def test_second_request_within_cooldown(self):
bot = _FakeBot()
_clear(bot)
_set_cooldown(bot, "alice")
assert _check_cooldown(bot, "alice") is True
def test_different_users_independent(self):
bot = _FakeBot()
_clear(bot)
_set_cooldown(bot, "alice")
assert _check_cooldown(bot, "bob") is False
def test_after_cooldown_passes(self):
bot = _FakeBot()
_clear(bot)
_set_cooldown(bot, "alice")
# Simulate time passing
_ps(bot)["cooldowns"]["alice"] = time.monotonic() - _COOLDOWN - 1
assert _check_cooldown(bot, "alice") is False
# ---------------------------------------------------------------------------
# TestCmdAsk
# ---------------------------------------------------------------------------
class TestCmdAsk:
def test_no_args(self):
bot = _FakeBot()
asyncio.run(cmd_ask(bot, _msg("!ask")))
assert "Usage:" in bot.replied[0]
def test_empty_args(self):
bot = _FakeBot()
asyncio.run(cmd_ask(bot, _msg("!ask ")))
assert "Usage:" in bot.replied[0]
def test_no_api_key(self):
bot = _FakeBot()
_clear(bot)
with patch.dict("os.environ", {}, clear=True):
import os
os.environ.pop("OPENROUTER_API_KEY", None)
asyncio.run(cmd_ask(bot, _msg("!ask what is python")))
assert "not configured" in bot.replied[0]
def test_success(self):
bot = _FakeBot()
_clear(bot)
resp = _FakeResp(_api_response(content="Python is a programming language."))
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "test-key"}):
with patch.object(_mod, "_urlopen", return_value=resp):
asyncio.run(cmd_ask(bot, _msg("!ask what is python")))
assert len(bot.replied) == 1
assert "Python is a programming language" in bot.replied[0]
def test_api_error_429(self):
bot = _FakeBot()
_clear(bot)
err = urllib.error.HTTPError(
"url", 429, "Too Many Requests", {}, None,
)
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "test-key"}):
with patch.object(_mod, "_urlopen", side_effect=err):
asyncio.run(cmd_ask(bot, _msg("!ask hello")))
assert "Rate limited" in bot.replied[0]
def test_api_error_500(self):
bot = _FakeBot()
_clear(bot)
err = urllib.error.HTTPError(
"url", 500, "Internal Server Error", {}, None,
)
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "test-key"}):
with patch.object(_mod, "_urlopen", side_effect=err):
asyncio.run(cmd_ask(bot, _msg("!ask hello")))
assert "API error" in bot.replied[0]
assert "500" in bot.replied[0]
def test_connection_error(self):
bot = _FakeBot()
_clear(bot)
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "test-key"}):
with patch.object(_mod, "_urlopen", side_effect=ConnectionError("fail")):
asyncio.run(cmd_ask(bot, _msg("!ask hello")))
assert "Request failed" in bot.replied[0]
def test_empty_response(self):
bot = _FakeBot()
_clear(bot)
resp = _FakeResp({"choices": []})
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "test-key"}):
with patch.object(_mod, "_urlopen", return_value=resp):
asyncio.run(cmd_ask(bot, _msg("!ask hello")))
assert "No response" in bot.replied[0]
def test_cooldown(self):
bot = _FakeBot()
_clear(bot)
resp = _FakeResp(_api_response(content="Hello!"))
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "test-key"}):
with patch.object(_mod, "_urlopen", return_value=resp):
asyncio.run(cmd_ask(bot, _msg("!ask first")))
bot.replied.clear()
asyncio.run(cmd_ask(bot, _msg("!ask second")))
assert "Cooldown" in bot.replied[0]
def test_response_truncation(self):
bot = _FakeBot()
_clear(bot)
long_text = "a" * 600
resp = _FakeResp(_api_response(content=long_text))
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "test-key"}):
with patch.object(_mod, "_urlopen", return_value=resp):
asyncio.run(cmd_ask(bot, _msg("!ask hello")))
assert len(bot.replied[0]) == _MAX_REPLY_LEN
assert bot.replied[0].endswith("...")
def test_reasoning_model_fallback(self):
bot = _FakeBot()
_clear(bot)
resp = _FakeResp(_api_response(content="", reasoning="Deep thought"))
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "test-key"}):
with patch.object(_mod, "_urlopen", return_value=resp):
asyncio.run(cmd_ask(bot, _msg("!ask meaning of life")))
assert "Deep thought" in bot.replied[0]
def test_multiline_uses_long_reply(self):
bot = _FakeBot()
_clear(bot)
resp = _FakeResp(_api_response(content="Line one\nLine two\nLine three"))
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "test-key"}):
with patch.object(_mod, "_urlopen", return_value=resp):
asyncio.run(cmd_ask(bot, _msg("!ask hello")))
assert len(bot.replied) == 3
assert bot.replied[0] == "Line one"
# ---------------------------------------------------------------------------
# TestCmdChat
# ---------------------------------------------------------------------------
class TestCmdChat:
def test_no_args(self):
bot = _FakeBot()
asyncio.run(cmd_chat(bot, _msg("!chat")))
assert "Usage:" in bot.replied[0]
def test_chat_with_history(self):
bot = _FakeBot()
_clear(bot)
resp1 = _FakeResp(_api_response(content="I am an assistant."))
resp2 = _FakeResp(_api_response(content="You asked who I am."))
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "test-key"}):
with patch.object(_mod, "_urlopen", return_value=resp1):
asyncio.run(cmd_chat(bot, _msg("!chat who are you")))
# Clear cooldown for second request
_ps(bot)["cooldowns"].clear()
with patch.object(_mod, "_urlopen", return_value=resp2) as mock_url:
asyncio.run(cmd_chat(bot, _msg("!chat what did I ask")))
# Verify the history was sent with the second request
call_args = mock_url.call_args
req = call_args[0][0]
body = json.loads(req.data)
# System + user1 + assistant1 + user2 = 4 messages
assert len(body["messages"]) == 4
assert body["messages"][1]["content"] == "who are you"
assert body["messages"][2]["content"] == "I am an assistant."
assert body["messages"][3]["content"] == "what did I ask"
assert "I am an assistant" in bot.replied[0]
assert "You asked who I am" in bot.replied[1]
def test_chat_clear(self):
bot = _FakeBot()
_clear(bot)
# Pre-populate history
_ps(bot)["histories"]["alice"] = [
{"role": "user", "content": "hello"},
{"role": "assistant", "content": "hi"},
]
asyncio.run(cmd_chat(bot, _msg("!chat clear")))
assert "cleared" in bot.replied[0].lower()
assert "alice" not in _ps(bot)["histories"]
def test_chat_cooldown(self):
bot = _FakeBot()
_clear(bot)
resp = _FakeResp(_api_response(content="Hello!"))
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "test-key"}):
with patch.object(_mod, "_urlopen", return_value=resp):
asyncio.run(cmd_chat(bot, _msg("!chat first")))
bot.replied.clear()
asyncio.run(cmd_chat(bot, _msg("!chat second")))
assert "Cooldown" in bot.replied[0]
def test_chat_model_show(self):
bot = _FakeBot()
_clear(bot)
asyncio.run(cmd_chat(bot, _msg("!chat model")))
assert "openrouter/auto" in bot.replied[0]
def test_chat_model_switch(self):
bot = _FakeBot(admin=True)
_clear(bot)
asyncio.run(cmd_chat(bot, _msg("!chat model meta-llama/llama-3.3-70b-instruct:free")))
assert "Model set to" in bot.replied[0]
assert _ps(bot)["model"] == "meta-llama/llama-3.3-70b-instruct:free"
def test_chat_models_list(self):
bot = _FakeBot()
_clear(bot)
asyncio.run(cmd_chat(bot, _msg("!chat models")))
assert len(bot.replied) >= 3
assert any("openrouter/auto" in r for r in bot.replied)
def test_chat_no_api_key(self):
bot = _FakeBot()
_clear(bot)
with patch.dict("os.environ", {}, clear=True):
import os
os.environ.pop("OPENROUTER_API_KEY", None)
asyncio.run(cmd_chat(bot, _msg("!chat hello")))
assert "not configured" in bot.replied[0]
def test_history_cap(self):
bot = _FakeBot()
_clear(bot)
# Pre-populate with MAX_HISTORY messages
ps = _ps(bot)
ps["histories"]["alice"] = [
{"role": "user" if i % 2 == 0 else "assistant", "content": f"msg{i}"}
for i in range(_MAX_HISTORY)
]
resp = _FakeResp(_api_response(content="Latest reply"))
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "test-key"}):
with patch.object(_mod, "_urlopen", return_value=resp):
asyncio.run(cmd_chat(bot, _msg("!chat overflow")))
history = ps["histories"]["alice"]
# History should be capped at MAX_HISTORY
assert len(history) <= _MAX_HISTORY
def test_chat_api_error_removes_user_msg(self):
"""On API failure, the user message should be removed from history."""
bot = _FakeBot()
_clear(bot)
err = urllib.error.HTTPError(
"url", 500, "Internal Server Error", {}, None,
)
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "test-key"}):
with patch.object(_mod, "_urlopen", side_effect=err):
asyncio.run(cmd_chat(bot, _msg("!chat hello")))
ps = _ps(bot)
# History should be empty -- user msg was removed on failure
assert len(ps["histories"].get("alice", [])) == 0

View File

@@ -2,7 +2,7 @@
import asyncio
import struct
from unittest.mock import patch
from unittest.mock import AsyncMock, MagicMock, patch
from derp.mumble import (
MumbleBot,
@@ -645,3 +645,95 @@ class TestPcmRamping:
assert samples[1] == 7500
assert samples[2] == 5000
assert samples[3] == 2500
# ---------------------------------------------------------------------------
# TestIsAudioReady
# ---------------------------------------------------------------------------
class TestIsAudioReady:
def test_no_mumble_object(self):
bot = _make_bot()
bot._mumble = None
assert bot._is_audio_ready() is False
def test_no_sound_output(self):
bot = _make_bot()
bot._mumble = MagicMock()
bot._mumble.sound_output = None
assert bot._is_audio_ready() is False
def test_no_encoder(self):
bot = _make_bot()
bot._mumble = MagicMock()
bot._mumble.sound_output.encoder = None
assert bot._is_audio_ready() is False
def test_ready(self):
bot = _make_bot()
bot._mumble = MagicMock()
bot._mumble.sound_output.encoder = MagicMock()
assert bot._is_audio_ready() is True
def test_attribute_error_handled(self):
bot = _make_bot()
bot._mumble = MagicMock()
del bot._mumble.sound_output
assert bot._is_audio_ready() is False
# ---------------------------------------------------------------------------
# TestStreamAudioDisconnect
# ---------------------------------------------------------------------------
class TestStreamAudioDisconnect:
def test_stream_survives_disconnect(self):
"""stream_audio keeps ffmpeg alive when connection drops mid-stream."""
bot = _make_bot()
bot._mumble = MagicMock()
bot._mumble.sound_output.encoder = MagicMock()
bot._mumble.sound_output.get_buffer_size.return_value = 0.0
frame = b"\x00" * 1920
# Track which frame we're on; disconnect after frame 3
frame_count = [0]
connected = [True]
async def _fake_read(n):
if frame_count[0] < 5:
frame_count[0] += 1
# Disconnect after 3 frames are read
if frame_count[0] > 3:
connected[0] = False
return frame
return b""
def _ready():
return connected[0]
proc = MagicMock()
proc.stdout.read = _fake_read
proc.stderr.read = AsyncMock(return_value=b"")
proc.wait = AsyncMock(return_value=0)
proc.kill = MagicMock()
progress = [0]
async def _run():
with patch.object(bot, "_is_audio_ready", side_effect=_ready):
with patch("asyncio.create_subprocess_exec",
return_value=proc):
await bot.stream_audio(
"http://example.com/audio",
volume=0.5,
progress=progress,
)
asyncio.run(_run())
# All 5 frames were read (progress tracks all, connected or not)
assert progress[0] == 5
# Only 3 frames were fed to sound_output (the connected ones)
assert bot._mumble.sound_output.add_sound.call_count == 3

View File

@@ -3,6 +3,8 @@
import asyncio
import importlib.util
import sys
import time
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
# -- Load plugin module directly ---------------------------------------------
@@ -40,6 +42,7 @@ class _FakeBot:
self.sent: list[tuple[str, str]] = []
self.replied: list[str] = []
self.state = _FakeState()
self.config: dict = {}
self._pstate: dict = {}
self._tasks: set[asyncio.Task] = set()
if mumble:
@@ -134,6 +137,25 @@ class TestPlayCommand:
assert len(ps["queue"]) == 1
assert ps["queue"][0].title == "Test Track"
def test_play_search_query(self):
bot = _FakeBot()
msg = _Msg(text="!play classical music")
tracks = [
("https://youtube.com/watch?v=a", "Result 1"),
("https://youtube.com/watch?v=b", "Result 2"),
("https://youtube.com/watch?v=c", "Result 3"),
]
with patch.object(_mod, "_resolve_tracks", return_value=tracks) as mock_rt:
with patch.object(_mod, "_ensure_loop"):
asyncio.run(_mod.cmd_play(bot, msg))
# Should prepend ytsearch10: for non-URL input
mock_rt.assert_called_once()
assert mock_rt.call_args[0][0] == "ytsearch10:classical music"
# Should pick one random result, not enqueue all
ps = _mod._ps(bot)
assert len(ps["queue"]) == 1
assert any("Playing" in r for r in bot.replied)
def test_play_shows_queued_when_busy(self):
bot = _FakeBot()
ps = _mod._ps(bot)
@@ -300,8 +322,41 @@ class TestVolumeCommand:
asyncio.run(_mod.cmd_volume(bot, msg))
assert any("0-100" in r for r in bot.replied)
def test_volume_negative(self):
def test_volume_negative_absolute(self):
"""Bare negative that underflows clamps at 0-100 error."""
bot = _FakeBot()
_mod._ps(bot)["volume"] = 5
msg = _Msg(text="!volume -10")
asyncio.run(_mod.cmd_volume(bot, msg))
assert any("0-100" in r for r in bot.replied)
def test_volume_relative_up(self):
bot = _FakeBot()
msg = _Msg(text="!volume +15")
asyncio.run(_mod.cmd_volume(bot, msg))
ps = _mod._ps(bot)
assert ps["volume"] == 65
assert any("65%" in r for r in bot.replied)
def test_volume_relative_down(self):
bot = _FakeBot()
_mod._ps(bot)["volume"] = 80
msg = _Msg(text="!volume -20")
asyncio.run(_mod.cmd_volume(bot, msg))
ps = _mod._ps(bot)
assert ps["volume"] == 60
assert any("60%" in r for r in bot.replied)
def test_volume_relative_clamp_over(self):
bot = _FakeBot()
_mod._ps(bot)["volume"] = 95
msg = _Msg(text="!volume +10")
asyncio.run(_mod.cmd_volume(bot, msg))
assert any("0-100" in r for r in bot.replied)
def test_volume_relative_clamp_under(self):
bot = _FakeBot()
_mod._ps(bot)["volume"] = 5
msg = _Msg(text="!volume -10")
asyncio.run(_mod.cmd_volume(bot, msg))
assert any("0-100" in r for r in bot.replied)
@@ -354,6 +409,21 @@ class TestMusicHelpers:
assert len(result) == 80
assert result.endswith("...")
def test_is_url_http(self):
assert _mod._is_url("https://youtube.com/watch?v=abc") is True
def test_is_url_plain_http(self):
assert _mod._is_url("http://example.com") is True
def test_is_url_ytsearch(self):
assert _mod._is_url("ytsearch:classical music") is True
def test_is_url_search_query(self):
assert _mod._is_url("classical music") is False
def test_is_url_single_word(self):
assert _mod._is_url("jazz") is False
# ---------------------------------------------------------------------------
# TestPlaylistExpansion
@@ -431,6 +501,14 @@ class TestPlaylistExpansion:
tracks = _mod._resolve_tracks("https://example.com/v1")
assert tracks == [("https://example.com/v1", "Single Video")]
def test_resolve_tracks_na_url_fallback(self):
"""--flat-playlist prints NA for single videos; use original URL."""
result = MagicMock()
result.stdout = "NA\nSingle Video\n"
with patch("subprocess.run", return_value=result):
tracks = _mod._resolve_tracks("https://example.com/v1")
assert tracks == [("https://example.com/v1", "Single Video")]
def test_resolve_tracks_playlist(self):
"""Subprocess returning multiple url+title pairs."""
result = MagicMock()
@@ -457,3 +535,827 @@ class TestPlaylistExpansion:
with patch("subprocess.run", return_value=result):
tracks = _mod._resolve_tracks("https://example.com/empty")
assert tracks == [("https://example.com/empty", "https://example.com/empty")]
# ---------------------------------------------------------------------------
# TestResumeState
# ---------------------------------------------------------------------------
class TestResumeState:
def test_save_load_roundtrip(self):
bot = _FakeBot()
track = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
_mod._save_resume(bot, track, 125.5)
data = _mod._load_resume(bot)
assert data is not None
assert data["url"] == "https://example.com/a"
assert data["title"] == "Song"
assert data["requester"] == "Alice"
assert data["elapsed"] == 125.5
def test_clear_removes_state(self):
bot = _FakeBot()
track = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
_mod._save_resume(bot, track, 60.0)
_mod._clear_resume(bot)
assert _mod._load_resume(bot) is None
def test_load_returns_none_when_empty(self):
bot = _FakeBot()
assert _mod._load_resume(bot) is None
def test_load_returns_none_on_corrupt_json(self):
bot = _FakeBot()
bot.state.set("music", "resume", "not-json{{{")
assert _mod._load_resume(bot) is None
def test_load_returns_none_on_missing_url(self):
bot = _FakeBot()
bot.state.set("music", "resume", '{"title": "x"}')
assert _mod._load_resume(bot) is None
# ---------------------------------------------------------------------------
# TestResumeCommand
# ---------------------------------------------------------------------------
class TestResumeCommand:
def test_nothing_saved(self):
bot = _FakeBot()
msg = _Msg(text="!resume")
asyncio.run(_mod.cmd_resume(bot, msg))
assert any("Nothing to resume" in r for r in bot.replied)
def test_already_playing(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["current"] = _mod._Track(url="x", title="Playing", requester="a")
msg = _Msg(text="!resume")
asyncio.run(_mod.cmd_resume(bot, msg))
assert any("Already playing" in r for r in bot.replied)
def test_non_mumble(self):
bot = _FakeBot(mumble=False)
msg = _Msg(text="!resume")
asyncio.run(_mod.cmd_resume(bot, msg))
assert any("Mumble-only" in r for r in bot.replied)
def test_loads_track_and_seeks(self):
bot = _FakeBot()
track = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
_mod._save_resume(bot, track, 225.0)
msg = _Msg(text="!resume")
with patch.object(_mod, "_ensure_loop") as mock_loop:
asyncio.run(_mod.cmd_resume(bot, msg))
mock_loop.assert_called_once_with(bot, seek=225.0)
ps = _mod._ps(bot)
assert len(ps["queue"]) == 1
assert ps["queue"][0].url == "https://example.com/a"
assert any("Resuming" in r for r in bot.replied)
def test_time_format_in_reply(self):
bot = _FakeBot()
track = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
_mod._save_resume(bot, track, 225.0)
msg = _Msg(text="!resume")
with patch.object(_mod, "_ensure_loop"):
asyncio.run(_mod.cmd_resume(bot, msg))
assert any("3:45" in r for r in bot.replied)
def test_clears_resume_state_after_loading(self):
bot = _FakeBot()
track = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
_mod._save_resume(bot, track, 60.0)
msg = _Msg(text="!resume")
with patch.object(_mod, "_ensure_loop"):
asyncio.run(_mod.cmd_resume(bot, msg))
assert _mod._load_resume(bot) is None
# ---------------------------------------------------------------------------
# TestFmtTime
# ---------------------------------------------------------------------------
class TestFmtTime:
def test_zero(self):
assert _mod._fmt_time(0) == "0:00"
def test_seconds_only(self):
assert _mod._fmt_time(45) == "0:45"
def test_minutes_and_seconds(self):
assert _mod._fmt_time(225) == "3:45"
def test_large_value(self):
assert _mod._fmt_time(3661) == "61:01"
# ---------------------------------------------------------------------------
# TestDuckCommand
# ---------------------------------------------------------------------------
class TestDuckCommand:
def test_show_status(self):
bot = _FakeBot()
msg = _Msg(text="!duck")
asyncio.run(_mod.cmd_duck(bot, msg))
assert any("Duck:" in r for r in bot.replied)
assert any("floor=1%" in r for r in bot.replied)
assert any("restore=30s" in r for r in bot.replied)
def test_toggle_on(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["duck_enabled"] = False
msg = _Msg(text="!duck on")
asyncio.run(_mod.cmd_duck(bot, msg))
assert ps["duck_enabled"] is True
assert any("enabled" in r for r in bot.replied)
def test_toggle_off(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["duck_enabled"] = True
ps["duck_vol"] = 5.0
msg = _Msg(text="!duck off")
asyncio.run(_mod.cmd_duck(bot, msg))
assert ps["duck_enabled"] is False
assert ps["duck_vol"] is None
assert any("disabled" in r for r in bot.replied)
def test_set_floor(self):
bot = _FakeBot()
msg = _Msg(text="!duck floor 10")
asyncio.run(_mod.cmd_duck(bot, msg))
ps = _mod._ps(bot)
assert ps["duck_floor"] == 10
assert any("10%" in r for r in bot.replied)
def test_set_floor_invalid(self):
bot = _FakeBot()
msg = _Msg(text="!duck floor 200")
asyncio.run(_mod.cmd_duck(bot, msg))
assert any("0-100" in r for r in bot.replied)
def test_set_silence(self):
bot = _FakeBot()
msg = _Msg(text="!duck silence 30")
asyncio.run(_mod.cmd_duck(bot, msg))
ps = _mod._ps(bot)
assert ps["duck_silence"] == 30
assert any("30s" in r for r in bot.replied)
def test_set_restore(self):
bot = _FakeBot()
msg = _Msg(text="!duck restore 45")
asyncio.run(_mod.cmd_duck(bot, msg))
ps = _mod._ps(bot)
assert ps["duck_restore"] == 45
assert any("45s" in r for r in bot.replied)
def test_non_mumble(self):
bot = _FakeBot(mumble=False)
msg = _Msg(text="!duck")
asyncio.run(_mod.cmd_duck(bot, msg))
assert any("Mumble-only" in r for r in bot.replied)
# ---------------------------------------------------------------------------
# TestDuckMonitor
# ---------------------------------------------------------------------------
class TestDuckMonitor:
def test_voice_detected_ducks_to_floor(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["duck_enabled"] = True
ps["duck_floor"] = 5
bot._last_voice_ts = time.monotonic()
async def _check():
task = asyncio.create_task(_mod._duck_monitor(bot))
await asyncio.sleep(1.5)
assert ps["duck_vol"] == 5.0
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(_check())
def test_silence_begins_smooth_restore(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["duck_enabled"] = True
ps["duck_floor"] = 1
ps["duck_restore"] = 10 # 10s total restore
ps["volume"] = 50
bot._last_voice_ts = time.monotonic() - 100
ps["duck_vol"] = 1.0 # already ducked
async def _check():
task = asyncio.create_task(_mod._duck_monitor(bot))
await asyncio.sleep(1.5)
# After ~1s into a 10s ramp from 1->50, vol should be ~5-6
vol = ps["duck_vol"]
assert vol is not None and vol > 1.0
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(_check())
def test_full_restore_sets_none(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["duck_enabled"] = True
ps["duck_floor"] = 1
ps["duck_restore"] = 1 # 1s restore -- completes quickly
ps["volume"] = 50
bot._last_voice_ts = time.monotonic() - 100
ps["duck_vol"] = 1.0
async def _check():
task = asyncio.create_task(_mod._duck_monitor(bot))
# First tick starts restore, second tick sees elapsed >= dur
await asyncio.sleep(2.5)
assert ps["duck_vol"] is None
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(_check())
def test_reduck_during_restore(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["duck_enabled"] = True
ps["duck_floor"] = 5
ps["duck_restore"] = 30
ps["volume"] = 50
bot._last_voice_ts = time.monotonic() - 100
ps["duck_vol"] = 30.0 # mid-restore
async def _check():
task = asyncio.create_task(_mod._duck_monitor(bot))
await asyncio.sleep(0.5)
# Simulate voice arriving now
bot._last_voice_ts = time.monotonic()
await asyncio.sleep(1.5)
assert ps["duck_vol"] == 5.0 # re-ducked to floor
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(_check())
def test_disabled_no_ducking(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["duck_enabled"] = False
bot._last_voice_ts = time.monotonic()
async def _check():
task = asyncio.create_task(_mod._duck_monitor(bot))
await asyncio.sleep(1.5)
assert ps["duck_vol"] is None
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(_check())
# ---------------------------------------------------------------------------
# TestAutoResume
# ---------------------------------------------------------------------------
class TestAutoResume:
def test_resume_on_silence(self):
"""Auto-resume loads saved state when channel is silent."""
bot = _FakeBot()
bot._connect_count = 2
bot._last_voice_ts = 0.0
track = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
_mod._save_resume(bot, track, 120.0)
with patch.object(_mod, "_ensure_loop") as mock_loop:
asyncio.run(_mod._auto_resume(bot))
mock_loop.assert_called_once_with(bot, seek=120.0)
ps = _mod._ps(bot)
assert len(ps["queue"]) == 1
assert ps["queue"][0].url == "https://example.com/a"
# Resume state cleared after loading
assert _mod._load_resume(bot) is None
def test_no_resume_if_playing(self):
"""Auto-resume returns early when already playing."""
bot = _FakeBot()
ps = _mod._ps(bot)
ps["current"] = _mod._Track(url="x", title="Playing", requester="a")
track = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
_mod._save_resume(bot, track, 60.0)
with patch.object(_mod, "_ensure_loop") as mock_loop:
asyncio.run(_mod._auto_resume(bot))
mock_loop.assert_not_called()
def test_no_resume_if_no_state(self):
"""Auto-resume returns early when nothing is saved."""
bot = _FakeBot()
bot._last_voice_ts = 0.0
with patch.object(_mod, "_ensure_loop") as mock_loop:
asyncio.run(_mod._auto_resume(bot))
mock_loop.assert_not_called()
def test_abort_if_voice_active(self):
"""Auto-resume aborts if voice never goes silent within deadline."""
bot = _FakeBot()
now = time.monotonic()
bot._last_voice_ts = now
ps = _mod._ps(bot)
ps["duck_silence"] = 15
track = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
_mod._save_resume(bot, track, 60.0)
async def _check():
# Patch monotonic to jump past the 60s deadline; keep voice active
mono_val = [now]
_real_sleep = asyncio.sleep
def _fast_mono():
return mono_val[0]
async def _fast_sleep(s):
mono_val[0] += s
bot._last_voice_ts = mono_val[0]
await _real_sleep(0)
with patch.object(time, "monotonic", side_effect=_fast_mono):
with patch("asyncio.sleep", side_effect=_fast_sleep):
with patch.object(_mod, "_ensure_loop") as mock_loop:
await _mod._auto_resume(bot)
mock_loop.assert_not_called()
asyncio.run(_check())
def test_reconnect_watcher_triggers_resume(self):
"""Watcher detects connect_count increment and calls _auto_resume."""
bot = _FakeBot()
bot._connect_count = 1
async def _check():
with patch.object(_mod, "_auto_resume", new_callable=AsyncMock) as mock_ar:
task = asyncio.create_task(_mod._reconnect_watcher(bot))
await asyncio.sleep(0.5)
# Simulate reconnection
bot._connect_count = 2
await asyncio.sleep(3)
mock_ar.assert_called_once_with(bot)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(_check())
def test_watcher_ignores_first_connect(self):
"""Watcher does not trigger on initial connection (count 0->1) without saved state."""
bot = _FakeBot()
bot._connect_count = 0
async def _check():
with patch.object(_mod, "_auto_resume", new_callable=AsyncMock) as mock_ar:
task = asyncio.create_task(_mod._reconnect_watcher(bot))
await asyncio.sleep(0.5)
bot._connect_count = 1
await asyncio.sleep(3)
mock_ar.assert_not_called()
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(_check())
def test_watcher_boot_resume_with_saved_state(self):
"""Watcher triggers boot-resume on first connect when state exists."""
bot = _FakeBot()
bot._connect_count = 0
track = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
_mod._save_resume(bot, track, 30.0)
async def _check():
with patch.object(_mod, "_auto_resume", new_callable=AsyncMock) as mock_ar:
task = asyncio.create_task(_mod._reconnect_watcher(bot))
await asyncio.sleep(0.5)
bot._connect_count = 1
await asyncio.sleep(3)
mock_ar.assert_called_once_with(bot)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(_check())
def test_on_connected_starts_watcher(self):
"""on_connected() starts the reconnect watcher task."""
bot = _FakeBot()
spawned = []
def fake_spawn(coro, *, name=None):
task = MagicMock()
task.done.return_value = False
spawned.append(name)
# Close the coroutine to avoid RuntimeWarning
coro.close()
return task
bot._spawn = fake_spawn
asyncio.run(_mod.on_connected(bot))
assert "music-reconnect-watcher" in spawned
ps = _mod._ps(bot)
assert ps["_watcher_task"] is not None
def test_on_connected_no_double_start(self):
"""on_connected() does not start a second watcher."""
bot = _FakeBot()
spawned = []
def fake_spawn(coro, *, name=None):
task = MagicMock()
task.done.return_value = False
spawned.append(name)
coro.close()
return task
bot._spawn = fake_spawn
asyncio.run(_mod.on_connected(bot))
asyncio.run(_mod.on_connected(bot))
assert spawned.count("music-reconnect-watcher") == 1
# ---------------------------------------------------------------------------
# TestDownloadTrack
# ---------------------------------------------------------------------------
class TestDownloadTrack:
def test_download_success(self, tmp_path):
"""Successful download returns a Path."""
music_dir = tmp_path / "music"
with patch.object(_mod, "_MUSIC_DIR", music_dir):
result = MagicMock()
result.stdout = str(music_dir / "abc123.opus") + "\n"
result.returncode = 0
# Create the file so is_file() returns True
music_dir.mkdir(parents=True)
(music_dir / "abc123.opus").write_bytes(b"audio")
with patch("subprocess.run", return_value=result):
path = _mod._download_track("https://example.com/v", "abc123")
assert path is not None
assert path.name == "abc123.opus"
def test_download_fallback_glob(self, tmp_path):
"""Falls back to glob when --print output is empty."""
music_dir = tmp_path / "music"
with patch.object(_mod, "_MUSIC_DIR", music_dir):
result = MagicMock()
result.stdout = ""
result.returncode = 0
music_dir.mkdir(parents=True)
(music_dir / "abc123.webm").write_bytes(b"audio")
with patch("subprocess.run", return_value=result):
path = _mod._download_track("https://example.com/v", "abc123")
assert path is not None
assert path.name == "abc123.webm"
def test_download_failure_returns_none(self, tmp_path):
"""Exception during download returns None."""
music_dir = tmp_path / "music"
with patch.object(_mod, "_MUSIC_DIR", music_dir):
with patch("subprocess.run", side_effect=Exception("fail")):
path = _mod._download_track("https://example.com/v", "abc123")
assert path is None
def test_download_no_file_returns_none(self, tmp_path):
"""No matching file on disk returns None."""
music_dir = tmp_path / "music"
with patch.object(_mod, "_MUSIC_DIR", music_dir):
result = MagicMock()
result.stdout = "/nonexistent/path.opus\n"
result.returncode = 0
music_dir.mkdir(parents=True)
with patch("subprocess.run", return_value=result):
path = _mod._download_track("https://example.com/v", "abc123")
assert path is None
# ---------------------------------------------------------------------------
# TestCleanupTrack
# ---------------------------------------------------------------------------
class TestCleanupTrack:
def test_cleanup_deletes_file(self, tmp_path):
"""Cleanup deletes the local file when keep=False."""
f = tmp_path / "test.opus"
f.write_bytes(b"audio")
track = _mod._Track(
url="x", title="t", requester="a",
local_path=f, keep=False,
)
_mod._cleanup_track(track)
assert not f.exists()
def test_cleanup_keeps_file_when_flagged(self, tmp_path):
"""Cleanup preserves the file when keep=True."""
f = tmp_path / "test.opus"
f.write_bytes(b"audio")
track = _mod._Track(
url="x", title="t", requester="a",
local_path=f, keep=True,
)
_mod._cleanup_track(track)
assert f.exists()
def test_cleanup_noop_when_no_path(self):
"""Cleanup does nothing when local_path is None."""
track = _mod._Track(url="x", title="t", requester="a")
_mod._cleanup_track(track) # should not raise
# ---------------------------------------------------------------------------
# TestKeepCommand
# ---------------------------------------------------------------------------
class TestKeepCommand:
def test_keep_nothing_playing(self):
bot = _FakeBot()
msg = _Msg(text="!keep")
asyncio.run(_mod.cmd_keep(bot, msg))
assert any("Nothing playing" in r for r in bot.replied)
def test_keep_no_local_file(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["current"] = _mod._Track(url="x", title="t", requester="a")
msg = _Msg(text="!keep")
asyncio.run(_mod.cmd_keep(bot, msg))
assert any("No local file" in r for r in bot.replied)
def test_keep_marks_track(self, tmp_path):
bot = _FakeBot()
ps = _mod._ps(bot)
f = tmp_path / "abc123.opus"
f.write_bytes(b"audio")
track = _mod._Track(
url="x", title="t", requester="a", local_path=f,
)
ps["current"] = track
msg = _Msg(text="!keep")
asyncio.run(_mod.cmd_keep(bot, msg))
assert track.keep is True
assert any("Keeping" in r for r in bot.replied)
def test_keep_non_mumble(self):
bot = _FakeBot(mumble=False)
msg = _Msg(text="!keep")
asyncio.run(_mod.cmd_keep(bot, msg))
assert any("Mumble-only" in r for r in bot.replied)
# ---------------------------------------------------------------------------
# TestKeptCommand
# ---------------------------------------------------------------------------
class TestKeptCommand:
def test_kept_empty(self, tmp_path):
bot = _FakeBot()
with patch.object(_mod, "_MUSIC_DIR", tmp_path / "empty"):
msg = _Msg(text="!kept")
asyncio.run(_mod.cmd_kept(bot, msg))
assert any("No kept files" in r for r in bot.replied)
def test_kept_lists_files(self, tmp_path):
bot = _FakeBot()
music_dir = tmp_path / "music"
music_dir.mkdir()
(music_dir / "abc123.opus").write_bytes(b"x" * 1024)
with patch.object(_mod, "_MUSIC_DIR", music_dir):
msg = _Msg(text="!kept")
asyncio.run(_mod.cmd_kept(bot, msg))
assert any("Kept files" in r for r in bot.replied)
assert any("abc123.opus" in r for r in bot.replied)
def test_kept_clear(self, tmp_path):
bot = _FakeBot()
music_dir = tmp_path / "music"
music_dir.mkdir()
(music_dir / "abc123.opus").write_bytes(b"audio")
(music_dir / "def456.webm").write_bytes(b"audio")
with patch.object(_mod, "_MUSIC_DIR", music_dir):
msg = _Msg(text="!kept clear")
asyncio.run(_mod.cmd_kept(bot, msg))
assert any("Deleted 2 file(s)" in r for r in bot.replied)
assert not list(music_dir.iterdir())
def test_kept_non_mumble(self):
bot = _FakeBot(mumble=False)
msg = _Msg(text="!kept")
asyncio.run(_mod.cmd_kept(bot, msg))
assert any("Mumble-only" in r for r in bot.replied)
# ---------------------------------------------------------------------------
# TestParseSeek
# ---------------------------------------------------------------------------
class TestParseSeek:
def test_absolute_seconds(self):
assert _mod._parse_seek("90") == ("abs", 90.0)
def test_absolute_mss(self):
assert _mod._parse_seek("1:30") == ("abs", 90.0)
def test_relative_forward(self):
assert _mod._parse_seek("+30") == ("rel", 30.0)
def test_relative_backward(self):
assert _mod._parse_seek("-30") == ("rel", -30.0)
def test_relative_mss(self):
assert _mod._parse_seek("+1:30") == ("rel", 90.0)
def test_relative_backward_mss(self):
assert _mod._parse_seek("-1:30") == ("rel", -90.0)
def test_invalid_raises(self):
import pytest
with pytest.raises(ValueError):
_mod._parse_seek("abc")
def test_empty_raises(self):
import pytest
with pytest.raises(ValueError):
_mod._parse_seek("")
# ---------------------------------------------------------------------------
# TestSeekCommand
# ---------------------------------------------------------------------------
class TestSeekCommand:
def test_seek_nothing_playing(self):
bot = _FakeBot()
msg = _Msg(text="!seek 1:30")
asyncio.run(_mod.cmd_seek(bot, msg))
assert any("Nothing playing" in r for r in bot.replied)
def test_seek_non_mumble(self):
bot = _FakeBot(mumble=False)
msg = _Msg(text="!seek 1:30")
asyncio.run(_mod.cmd_seek(bot, msg))
assert bot.replied == []
def test_seek_no_arg(self):
bot = _FakeBot()
msg = _Msg(text="!seek")
asyncio.run(_mod.cmd_seek(bot, msg))
assert any("Usage" in r for r in bot.replied)
def test_seek_invalid_arg(self):
bot = _FakeBot()
msg = _Msg(text="!seek xyz")
asyncio.run(_mod.cmd_seek(bot, msg))
assert any("Usage" in r for r in bot.replied)
def test_seek_absolute(self):
bot = _FakeBot()
ps = _mod._ps(bot)
track = _mod._Track(url="x", title="Song", requester="a")
ps["current"] = track
mock_task = MagicMock()
mock_task.done.return_value = False
ps["task"] = mock_task
msg = _Msg(text="!seek 1:30")
with patch.object(_mod, "_ensure_loop") as mock_loop:
asyncio.run(_mod.cmd_seek(bot, msg))
mock_loop.assert_called_once_with(bot, seek=90.0)
assert ps["queue"][0] is track
assert any("1:30" in r for r in bot.replied)
mock_task.cancel.assert_called_once()
def test_seek_relative_forward(self):
bot = _FakeBot()
ps = _mod._ps(bot)
track = _mod._Track(url="x", title="Song", requester="a")
ps["current"] = track
ps["progress"] = [1500] # 1500 * 0.02 = 30s
ps["cur_seek"] = 60.0 # started at 60s
mock_task = MagicMock()
mock_task.done.return_value = False
ps["task"] = mock_task
msg = _Msg(text="!seek +30")
with patch.object(_mod, "_ensure_loop") as mock_loop:
asyncio.run(_mod.cmd_seek(bot, msg))
# elapsed = 60 + 30 = 90, target = 90 + 30 = 120
mock_loop.assert_called_once_with(bot, seek=120.0)
def test_seek_relative_backward_clamps(self):
bot = _FakeBot()
ps = _mod._ps(bot)
track = _mod._Track(url="x", title="Song", requester="a")
ps["current"] = track
ps["progress"] = [500] # 500 * 0.02 = 10s
ps["cur_seek"] = 0.0
mock_task = MagicMock()
mock_task.done.return_value = False
ps["task"] = mock_task
msg = _Msg(text="!seek -30")
with patch.object(_mod, "_ensure_loop") as mock_loop:
asyncio.run(_mod.cmd_seek(bot, msg))
# elapsed = 0 + 10 = 10, target = 10 - 30 = -20, clamped to 0
mock_loop.assert_called_once_with(bot, seek=0.0)
# ---------------------------------------------------------------------------
# TestVolumePersistence
# ---------------------------------------------------------------------------
class TestVolumePersistence:
def test_volume_persists_to_state(self):
bot = _FakeBot()
msg = _Msg(text="!volume 75")
asyncio.run(_mod.cmd_volume(bot, msg))
assert bot.state.get("music", "volume") == "75"
def test_volume_loads_on_connect(self):
bot = _FakeBot()
bot.state.set("music", "volume", "80")
spawned = []
def fake_spawn(coro, *, name=None):
task = MagicMock()
task.done.return_value = False
spawned.append(name)
coro.close()
return task
bot._spawn = fake_spawn
asyncio.run(_mod.on_connected(bot))
ps = _mod._ps(bot)
assert ps["volume"] == 80
def test_volume_loads_clamps_high(self):
bot = _FakeBot()
bot.state.set("music", "volume", "200")
spawned = []
def fake_spawn(coro, *, name=None):
task = MagicMock()
task.done.return_value = False
spawned.append(name)
coro.close()
return task
bot._spawn = fake_spawn
asyncio.run(_mod.on_connected(bot))
ps = _mod._ps(bot)
assert ps["volume"] == 100
def test_volume_loads_ignores_invalid(self):
bot = _FakeBot()
bot.state.set("music", "volume", "notanumber")
spawned = []
def fake_spawn(coro, *, name=None):
task = MagicMock()
task.done.return_value = False
spawned.append(name)
coro.close()
return task
bot._spawn = fake_spawn
asyncio.run(_mod.on_connected(bot))
ps = _mod._ps(bot)
assert ps["volume"] == 50 # default unchanged

798
tests/test_voice.py Normal file
View File

@@ -0,0 +1,798 @@
"""Tests for the voice STT/TTS plugin."""
import asyncio
import importlib.util
import io
import sys
import time
import wave
from unittest.mock import AsyncMock, MagicMock, patch
# -- Load plugin module directly ---------------------------------------------
_spec = importlib.util.spec_from_file_location("voice", "plugins/voice.py")
_mod = importlib.util.module_from_spec(_spec)
sys.modules["voice"] = _mod
_spec.loader.exec_module(_mod)
# -- Fakes -------------------------------------------------------------------
class _FakeState:
def __init__(self):
self._store: dict[str, dict[str, str]] = {}
def get(self, ns: str, key: str) -> str | None:
return self._store.get(ns, {}).get(key)
def set(self, ns: str, key: str, value: str) -> None:
self._store.setdefault(ns, {})[key] = value
def delete(self, ns: str, key: str) -> None:
self._store.get(ns, {}).pop(key, None)
def keys(self, ns: str) -> list[str]:
return list(self._store.get(ns, {}).keys())
class _FakeBot:
"""Minimal bot for voice plugin testing."""
def __init__(self, *, mumble: bool = True):
self.sent: list[tuple[str, str]] = []
self.replied: list[str] = []
self.actions: list[tuple[str, str]] = []
self.state = _FakeState()
self.config: dict = {}
self._pstate: dict = {}
self._tasks: set[asyncio.Task] = set()
self.nick = "derp"
self._sound_listeners: list = []
if mumble:
self.stream_audio = AsyncMock()
async def send(self, target: str, text: str) -> None:
self.sent.append((target, text))
async def reply(self, message, text: str) -> None:
self.replied.append(text)
async def action(self, target: str, text: str) -> None:
self.actions.append((target, text))
def _spawn(self, coro, *, name=None):
task = asyncio.ensure_future(coro)
self._tasks.add(task)
task.add_done_callback(self._tasks.discard)
return task
class _Msg:
"""Minimal message object."""
def __init__(self, text="!listen", nick="Alice", target="0",
is_channel=True):
self.text = text
self.nick = nick
self.target = target
self.is_channel = is_channel
self.prefix = nick
self.command = "PRIVMSG"
self.params = [target, text]
self.tags = {}
self.raw = {}
class _FakeSoundChunk:
"""Minimal sound chunk with PCM data."""
def __init__(self, pcm: bytes = b"\x00\x00" * 960):
self.pcm = pcm
# ---------------------------------------------------------------------------
# TestMumbleGuard
# ---------------------------------------------------------------------------
class TestMumbleGuard:
def test_is_mumble_true(self):
bot = _FakeBot(mumble=True)
assert _mod._is_mumble(bot) is True
def test_is_mumble_false(self):
bot = _FakeBot(mumble=False)
assert _mod._is_mumble(bot) is False
def test_listen_non_mumble(self):
bot = _FakeBot(mumble=False)
msg = _Msg(text="!listen on")
asyncio.run(_mod.cmd_listen(bot, msg))
assert any("Mumble-only" in r for r in bot.replied)
def test_say_non_mumble(self):
bot = _FakeBot(mumble=False)
msg = _Msg(text="!say hello")
asyncio.run(_mod.cmd_say(bot, msg))
assert any("Mumble-only" in r for r in bot.replied)
# ---------------------------------------------------------------------------
# TestListenCommand
# ---------------------------------------------------------------------------
class TestListenCommand:
def test_listen_status(self):
bot = _FakeBot()
msg = _Msg(text="!listen")
asyncio.run(_mod.cmd_listen(bot, msg))
assert any("off" in r.lower() for r in bot.replied)
def test_listen_on(self):
bot = _FakeBot()
msg = _Msg(text="!listen on")
asyncio.run(_mod.cmd_listen(bot, msg))
ps = _mod._ps(bot)
assert ps["listen"] is True
assert any("Listening" in r for r in bot.replied)
def test_listen_off(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["listen"] = True
ps["buffers"]["Alice"] = bytearray(b"\x00" * 100)
ps["last_ts"]["Alice"] = time.monotonic()
msg = _Msg(text="!listen off")
asyncio.run(_mod.cmd_listen(bot, msg))
assert ps["listen"] is False
assert ps["buffers"] == {}
assert ps["last_ts"] == {}
assert any("Stopped" in r for r in bot.replied)
def test_listen_invalid(self):
bot = _FakeBot()
msg = _Msg(text="!listen maybe")
asyncio.run(_mod.cmd_listen(bot, msg))
assert any("Usage" in r for r in bot.replied)
# ---------------------------------------------------------------------------
# TestSayCommand
# ---------------------------------------------------------------------------
class TestSayCommand:
def test_say_no_text(self):
bot = _FakeBot()
msg = _Msg(text="!say")
asyncio.run(_mod.cmd_say(bot, msg))
assert any("Usage" in r for r in bot.replied)
def test_say_too_long(self):
bot = _FakeBot()
text = "x" * 501
msg = _Msg(text=f"!say {text}")
asyncio.run(_mod.cmd_say(bot, msg))
assert any("too long" in r.lower() for r in bot.replied)
def test_say_spawns_task(self):
bot = _FakeBot()
msg = _Msg(text="!say hello world")
spawned = []
original_spawn = bot._spawn
def track_spawn(coro, *, name=None):
spawned.append(name)
coro.close()
task = MagicMock()
task.done.return_value = False
return task
bot._spawn = track_spawn
asyncio.run(_mod.cmd_say(bot, msg))
assert "voice-tts" in spawned
# ---------------------------------------------------------------------------
# TestAudioBuffering
# ---------------------------------------------------------------------------
class TestAudioBuffering:
def test_accumulates_pcm(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["listen"] = True
user = {"name": "Alice"}
chunk = _FakeSoundChunk(b"\x01\x02" * 480)
_mod._on_voice(bot, user, chunk)
assert "Alice" in ps["buffers"]
assert len(ps["buffers"]["Alice"]) == 960
def test_ignores_own_nick(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["listen"] = True
user = {"name": "derp"}
chunk = _FakeSoundChunk(b"\x01\x02" * 480)
_mod._on_voice(bot, user, chunk)
assert "derp" not in ps["buffers"]
def test_respects_listen_false(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["listen"] = False
user = {"name": "Alice"}
chunk = _FakeSoundChunk(b"\x01\x02" * 480)
_mod._on_voice(bot, user, chunk)
assert ps["buffers"] == {}
def test_caps_at_max_bytes(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["listen"] = True
user = {"name": "Alice"}
# Fill beyond max
big_chunk = _FakeSoundChunk(b"\x00\x01" * (_mod._MAX_BYTES // 2 + 100))
_mod._on_voice(bot, user, big_chunk)
assert len(ps["buffers"]["Alice"]) <= _mod._MAX_BYTES
def test_empty_pcm_ignored(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["listen"] = True
user = {"name": "Alice"}
chunk = _FakeSoundChunk(b"")
_mod._on_voice(bot, user, chunk)
assert "Alice" not in ps["buffers"]
def test_none_user_ignored(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["listen"] = True
chunk = _FakeSoundChunk(b"\x01\x02" * 480)
_mod._on_voice(bot, "not_a_dict", chunk)
assert ps["buffers"] == {}
def test_updates_timestamp(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["listen"] = True
user = {"name": "Alice"}
chunk = _FakeSoundChunk(b"\x01\x02" * 480)
_mod._on_voice(bot, user, chunk)
assert "Alice" in ps["last_ts"]
ts1 = ps["last_ts"]["Alice"]
_mod._on_voice(bot, user, chunk)
assert ps["last_ts"]["Alice"] >= ts1
# ---------------------------------------------------------------------------
# TestFlushLogic
# ---------------------------------------------------------------------------
class TestFlushLogic:
def test_silence_gap_triggers_flush(self):
"""Buffer is flushed and transcribed after silence gap."""
bot = _FakeBot()
ps = _mod._ps(bot)
ps["listen"] = True
ps["silence_gap"] = 0.1 # very short for testing
# Pre-populate buffer with enough PCM (> _MIN_BYTES)
pcm = b"\x00\x01" * (_mod._MIN_BYTES // 2 + 100)
with ps["lock"]:
ps["buffers"]["Alice"] = bytearray(pcm)
ps["last_ts"]["Alice"] = time.monotonic() - 1.0 # already silent
async def _check():
with patch.object(_mod, "_transcribe", return_value="hello"):
task = asyncio.create_task(_mod._flush_monitor(bot))
await asyncio.sleep(1.0)
ps["listen"] = False # stop the monitor
await asyncio.sleep(0.2)
try:
await asyncio.wait_for(task, timeout=2)
except (asyncio.CancelledError, asyncio.TimeoutError):
pass
assert any("hello" in a[1] for a in bot.actions)
asyncio.run(_check())
def test_min_duration_filter(self):
"""Short utterances (< _MIN_BYTES) are discarded."""
bot = _FakeBot()
ps = _mod._ps(bot)
ps["listen"] = True
ps["silence_gap"] = 0.1
# Buffer too small
with ps["lock"]:
ps["buffers"]["Alice"] = bytearray(b"\x00\x01" * 10)
ps["last_ts"]["Alice"] = time.monotonic() - 1.0
async def _check():
with patch.object(_mod, "_transcribe", return_value="x") as mock_t:
task = asyncio.create_task(_mod._flush_monitor(bot))
await asyncio.sleep(0.5)
ps["listen"] = False
await asyncio.sleep(0.2)
try:
await asyncio.wait_for(task, timeout=2)
except (asyncio.CancelledError, asyncio.TimeoutError):
pass
mock_t.assert_not_called()
asyncio.run(_check())
def test_buffer_cleared_after_flush(self):
"""Buffer and timestamp are removed after flushing."""
bot = _FakeBot()
ps = _mod._ps(bot)
ps["listen"] = True
ps["silence_gap"] = 0.1
pcm = b"\x00\x01" * (_mod._MIN_BYTES // 2 + 100)
with ps["lock"]:
ps["buffers"]["Alice"] = bytearray(pcm)
ps["last_ts"]["Alice"] = time.monotonic() - 1.0
async def _check():
with patch.object(_mod, "_transcribe", return_value="test"):
task = asyncio.create_task(_mod._flush_monitor(bot))
await asyncio.sleep(0.5)
ps["listen"] = False
await asyncio.sleep(0.2)
try:
await asyncio.wait_for(task, timeout=2)
except (asyncio.CancelledError, asyncio.TimeoutError):
pass
assert "Alice" not in ps["buffers"]
assert "Alice" not in ps["last_ts"]
asyncio.run(_check())
# ---------------------------------------------------------------------------
# TestPcmToWav
# ---------------------------------------------------------------------------
class TestPcmToWav:
def test_valid_wav(self):
pcm = b"\x00\x00" * 48000 # 1 second of silence
wav_data = _mod._pcm_to_wav(pcm)
# Should start with RIFF header
assert wav_data[:4] == b"RIFF"
# Parse it back
buf = io.BytesIO(wav_data)
with wave.open(buf, "rb") as wf:
assert wf.getnchannels() == 1
assert wf.getsampwidth() == 2
assert wf.getframerate() == 48000
assert wf.getnframes() == 48000
def test_empty_pcm(self):
wav_data = _mod._pcm_to_wav(b"")
buf = io.BytesIO(wav_data)
with wave.open(buf, "rb") as wf:
assert wf.getnframes() == 0
# ---------------------------------------------------------------------------
# TestTranscribe
# ---------------------------------------------------------------------------
class TestTranscribe:
def test_parse_json_response(self):
ps = {"whisper_url": "http://localhost:8080/inference"}
pcm = b"\x00\x00" * 4800 # 0.1s
resp = MagicMock()
resp.read.return_value = b'{"text": "hello world"}'
with patch.object(_mod, "_urlopen", return_value=resp):
text = _mod._transcribe(ps, pcm)
assert text == "hello world"
def test_empty_text(self):
ps = {"whisper_url": "http://localhost:8080/inference"}
pcm = b"\x00\x00" * 4800
resp = MagicMock()
resp.read.return_value = b'{"text": ""}'
with patch.object(_mod, "_urlopen", return_value=resp):
text = _mod._transcribe(ps, pcm)
assert text == ""
def test_missing_text_key(self):
ps = {"whisper_url": "http://localhost:8080/inference"}
pcm = b"\x00\x00" * 4800
resp = MagicMock()
resp.read.return_value = b'{"result": "something"}'
with patch.object(_mod, "_urlopen", return_value=resp):
text = _mod._transcribe(ps, pcm)
assert text == ""
# ---------------------------------------------------------------------------
# TestPerBotState
# ---------------------------------------------------------------------------
class TestPerBotState:
def test_ps_initializes(self):
bot = _FakeBot()
ps = _mod._ps(bot)
assert ps["listen"] is False
assert ps["buffers"] == {}
assert ps["last_ts"] == {}
def test_ps_stable_reference(self):
bot = _FakeBot()
ps1 = _mod._ps(bot)
ps2 = _mod._ps(bot)
assert ps1 is ps2
def test_ps_isolated_per_bot(self):
bot1 = _FakeBot()
bot2 = _FakeBot()
_mod._ps(bot1)["listen"] = True
assert _mod._ps(bot2)["listen"] is False
def test_ps_config_override(self):
bot = _FakeBot()
bot.config = {"voice": {"silence_gap": 3.0}}
ps = _mod._ps(bot)
assert ps["silence_gap"] == 3.0
# ---------------------------------------------------------------------------
# TestEnsureListener
# ---------------------------------------------------------------------------
class TestEnsureListener:
def test_registers_callback(self):
bot = _FakeBot()
_mod._ps(bot) # init state
_mod._ensure_listener(bot)
assert len(bot._sound_listeners) == 1
ps = _mod._ps(bot)
assert ps["_listener_registered"] is True
def test_idempotent(self):
bot = _FakeBot()
_mod._ps(bot)
_mod._ensure_listener(bot)
_mod._ensure_listener(bot)
assert len(bot._sound_listeners) == 1
def test_no_listener_without_attr(self):
bot = _FakeBot()
del bot._sound_listeners
_mod._ps(bot)
_mod._ensure_listener(bot)
# Should not raise, just skip
def test_callback_calls_on_voice(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["listen"] = True
_mod._ensure_listener(bot)
user = {"name": "Alice"}
chunk = _FakeSoundChunk(b"\x01\x02" * 480)
bot._sound_listeners[0](user, chunk)
assert "Alice" in ps["buffers"]
# ---------------------------------------------------------------------------
# TestOnConnected
# ---------------------------------------------------------------------------
class TestOnConnected:
def test_reregisters_when_listening(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["listen"] = True
spawned = []
def fake_spawn(coro, *, name=None):
task = MagicMock()
task.done.return_value = False
spawned.append(name)
coro.close()
return task
bot._spawn = fake_spawn
asyncio.run(_mod.on_connected(bot))
assert ps["_listener_registered"] is True
assert "voice-flush-monitor" in spawned
def test_noop_when_not_listening(self):
bot = _FakeBot()
_mod._ps(bot) # init but listen=False
spawned = []
def fake_spawn(coro, *, name=None):
spawned.append(name)
coro.close()
return MagicMock()
bot._spawn = fake_spawn
asyncio.run(_mod.on_connected(bot))
assert "voice-flush-monitor" not in spawned
def test_noop_non_mumble(self):
bot = _FakeBot(mumble=False)
asyncio.run(_mod.on_connected(bot))
# Should not raise or register anything
# ---------------------------------------------------------------------------
# TestTriggerMode
# ---------------------------------------------------------------------------
class TestTriggerMode:
def test_trigger_config(self):
"""_ps() reads trigger from config."""
bot = _FakeBot()
bot.config = {"voice": {"trigger": "claude"}}
ps = _mod._ps(bot)
assert ps["trigger"] == "claude"
def test_trigger_default_empty(self):
"""trigger defaults to empty string (disabled)."""
bot = _FakeBot()
ps = _mod._ps(bot)
assert ps["trigger"] == ""
def test_trigger_buffers_without_listen(self):
"""_on_voice buffers when trigger is set, even with listen=False."""
bot = _FakeBot()
bot.config = {"voice": {"trigger": "claude"}}
ps = _mod._ps(bot)
assert ps["listen"] is False
user = {"name": "Alice"}
chunk = _FakeSoundChunk(b"\x01\x02" * 480)
_mod._on_voice(bot, user, chunk)
assert "Alice" in ps["buffers"]
assert len(ps["buffers"]["Alice"]) == 960
def test_trigger_detected_spawns_tts(self):
"""Flush monitor detects trigger word and spawns TTS."""
bot = _FakeBot()
bot.config = {"voice": {"trigger": "claude"}}
ps = _mod._ps(bot)
ps["silence_gap"] = 0.1
pcm = b"\x00\x01" * (_mod._MIN_BYTES // 2 + 100)
with ps["lock"]:
ps["buffers"]["Alice"] = bytearray(pcm)
ps["last_ts"]["Alice"] = time.monotonic() - 1.0
spawned = []
async def _check():
tts_hit = asyncio.Event()
def track_spawn(coro, *, name=None):
spawned.append(name)
if name == "voice-tts":
tts_hit.set()
coro.close()
task = MagicMock()
task.done.return_value = False
return task
bot._spawn = track_spawn
with patch.object(_mod, "_transcribe",
return_value="claude hello world"):
task = asyncio.create_task(_mod._flush_monitor(bot))
await asyncio.wait_for(tts_hit.wait(), timeout=5)
ps["trigger"] = ""
await asyncio.sleep(0.1)
try:
await asyncio.wait_for(task, timeout=2)
except (asyncio.CancelledError, asyncio.TimeoutError):
pass
assert "voice-tts" in spawned
asyncio.run(_check())
def test_trigger_strips_word(self):
"""Trigger word is stripped; only remainder goes to TTS."""
bot = _FakeBot()
bot.config = {"voice": {"trigger": "claude"}}
ps = _mod._ps(bot)
ps["silence_gap"] = 0.1
pcm = b"\x00\x01" * (_mod._MIN_BYTES // 2 + 100)
with ps["lock"]:
ps["buffers"]["Alice"] = bytearray(pcm)
ps["last_ts"]["Alice"] = time.monotonic() - 1.0
tts_texts = []
async def _check():
tts_hit = asyncio.Event()
async def _noop():
pass
def capturing_tts(bot_, text):
tts_texts.append(text)
return _noop()
def track_spawn(coro, *, name=None):
if name == "voice-tts":
tts_hit.set()
coro.close()
task = MagicMock()
task.done.return_value = False
return task
bot._spawn = track_spawn
original_tts = _mod._tts_play
_mod._tts_play = capturing_tts
try:
with patch.object(_mod, "_transcribe",
return_value="Claude hello world"):
task = asyncio.create_task(_mod._flush_monitor(bot))
await asyncio.wait_for(tts_hit.wait(), timeout=5)
ps["trigger"] = ""
await asyncio.sleep(0.1)
try:
await asyncio.wait_for(task, timeout=2)
except (asyncio.CancelledError, asyncio.TimeoutError):
pass
finally:
_mod._tts_play = original_tts
assert tts_texts == ["hello world"]
asyncio.run(_check())
def test_no_trigger_discards(self):
"""Non-triggered speech is silently discarded when only trigger active."""
bot = _FakeBot()
bot.config = {"voice": {"trigger": "claude"}}
ps = _mod._ps(bot)
ps["silence_gap"] = 0.1
pcm = b"\x00\x01" * (_mod._MIN_BYTES // 2 + 100)
with ps["lock"]:
ps["buffers"]["Alice"] = bytearray(pcm)
ps["last_ts"]["Alice"] = time.monotonic() - 1.0
async def _check():
transcribed = asyncio.Event()
loop = asyncio.get_running_loop()
def mock_transcribe(ps_, pcm_):
loop.call_soon_threadsafe(transcribed.set)
return "hello world"
with patch.object(_mod, "_transcribe",
side_effect=mock_transcribe):
task = asyncio.create_task(_mod._flush_monitor(bot))
await asyncio.wait_for(transcribed.wait(), timeout=5)
# Give the monitor a moment to process the result
await asyncio.sleep(0.2)
ps["trigger"] = ""
await asyncio.sleep(0.1)
try:
await asyncio.wait_for(task, timeout=2)
except (asyncio.CancelledError, asyncio.TimeoutError):
pass
assert bot.actions == []
asyncio.run(_check())
def test_on_connected_starts_with_trigger(self):
"""Listener and flush task start on connect when trigger is set."""
bot = _FakeBot()
bot.config = {"voice": {"trigger": "claude"}}
ps = _mod._ps(bot)
spawned = []
def fake_spawn(coro, *, name=None):
task = MagicMock()
task.done.return_value = False
spawned.append(name)
coro.close()
return task
bot._spawn = fake_spawn
asyncio.run(_mod.on_connected(bot))
assert ps["_listener_registered"] is True
assert "voice-flush-monitor" in spawned
def test_listen_status_shows_trigger(self):
"""!listen status includes trigger info when set."""
bot = _FakeBot()
bot.config = {"voice": {"trigger": "claude"}}
_mod._ps(bot)
msg = _Msg(text="!listen")
asyncio.run(_mod.cmd_listen(bot, msg))
assert any("Trigger: claude" in r for r in bot.replied)
# ---------------------------------------------------------------------------
# TestGreeting
# ---------------------------------------------------------------------------
class TestGreeting:
def test_greet_on_first_connect(self):
"""TTS greeting fires on first connect when configured."""
bot = _FakeBot()
bot.config = {"mumble": {"greet": "Hello there."}}
bot._is_audio_ready = lambda: True
spawned = []
def fake_spawn(coro, *, name=None):
spawned.append(name)
coro.close()
task = MagicMock()
task.done.return_value = False
return task
bot._spawn = fake_spawn
asyncio.run(_mod.on_connected(bot))
assert "voice-greet" in spawned
def test_greet_only_once(self):
"""Greeting fires only on first connect, not on reconnect."""
bot = _FakeBot()
bot.config = {"mumble": {"greet": "Hello there."}}
bot._is_audio_ready = lambda: True
spawned = []
def fake_spawn(coro, *, name=None):
spawned.append(name)
coro.close()
task = MagicMock()
task.done.return_value = False
return task
bot._spawn = fake_spawn
asyncio.run(_mod.on_connected(bot))
assert spawned.count("voice-greet") == 1
asyncio.run(_mod.on_connected(bot))
assert spawned.count("voice-greet") == 1
def test_no_greet_without_config(self):
"""No greeting when mumble.greet is not set."""
bot = _FakeBot()
bot.config = {}
spawned = []
def fake_spawn(coro, *, name=None):
spawned.append(name)
coro.close()
task = MagicMock()
task.done.return_value = False
return task
bot._spawn = fake_spawn
asyncio.run(_mod.on_connected(bot))
assert "voice-greet" not in spawned
def test_no_greet_non_mumble(self):
"""Greeting skipped for non-Mumble bots."""
bot = _FakeBot(mumble=False)
bot.config = {"mumble": {"greet": "Hello there."}}
asyncio.run(_mod.on_connected(bot))
# Should not raise or try to greet