From e9d17e8b006179d56ee60b86a4c07c6f1240755b Mon Sep 17 00:00:00 2001 From: user Date: Sun, 22 Feb 2026 11:41:00 +0100 Subject: [PATCH] feat: voice profiles, rubberband FX, per-bot plugin filtering - Add rubberband package to container for pitch-shifting FX - Split FX chain: rubberband CLI for pitch, ffmpeg for filters - Configurable voice profile (voice, fx, piper params) in [voice] - Extra bots inherit voice config (minus trigger) for own TTS - Greeting is voice-only, spoken directly by the greeting bot - Per-bot only_plugins/except_plugins filtering on Mumble - Alias plugin, core plugin tests Co-Authored-By: Claude Opus 4.6 --- Containerfile | 2 +- plugins/alias.py | 85 ++++++++++++ plugins/core.py | 28 ++++ plugins/music.py | 181 +++++++++++++++++++++---- plugins/voice.py | 257 +++++++++++++++++++++++++++++++++--- src/derp/bot.py | 6 + src/derp/cli.py | 12 +- src/derp/mumble.py | 181 ++++++++++++++++++++++--- src/derp/plugin.py | 28 +++- tests/test_alias.py | 212 ++++++++++++++++++++++++++++++ tests/test_core.py | 92 +++++++++++++ tests/test_music.py | 306 +++++++++++++++++++++++++++++++++++++------ tests/test_plugin.py | 119 +++++++++++++++++ 13 files changed, 1398 insertions(+), 111 deletions(-) create mode 100644 plugins/alias.py create mode 100644 tests/test_alias.py create mode 100644 tests/test_core.py diff --git a/Containerfile b/Containerfile index d49424e..9373822 100644 --- a/Containerfile +++ b/Containerfile @@ -1,6 +1,6 @@ FROM python:3.13-alpine -RUN apk add --no-cache opus ffmpeg yt-dlp && \ +RUN apk add --no-cache opus ffmpeg yt-dlp rubberband && \ ln -s /usr/lib/libopus.so.0 /usr/lib/libopus.so WORKDIR /app diff --git a/plugins/alias.py b/plugins/alias.py new file mode 100644 index 0000000..b26dc1a --- /dev/null +++ b/plugins/alias.py @@ -0,0 +1,85 @@ +"""Plugin: user-defined command aliases (persistent).""" + +from __future__ import annotations + +import logging + +from derp.plugin import command + +log = logging.getLogger(__name__) + +_NS = "alias" + + +@command("alias", help="Aliases: !alias add|del|list|clear") +async def cmd_alias(bot, message): + """Create short aliases for existing bot commands. + + Usage: + !alias add Create alias (e.g. !alias add s skip) + !alias del Remove alias + !alias list Show all aliases + !alias clear Remove all aliases (admin only) + """ + parts = message.text.split(None, 3) + if len(parts) < 2: + await bot.reply(message, "Usage: !alias [args]") + return + + sub = parts[1].lower() + + if sub == "add": + if len(parts) < 4: + await bot.reply(message, "Usage: !alias add ") + return + name = parts[2].lower() + target = parts[3].lower() + + # Cannot shadow an existing registered command + if name in bot.registry.commands: + await bot.reply(message, f"'{name}' is already a registered command") + return + + # Cannot alias to another alias (single-level only) + if bot.state.get(_NS, target) is not None: + await bot.reply(message, f"'{target}' is itself an alias; no chaining") + return + + # Target must resolve to a real command + if target not in bot.registry.commands: + await bot.reply(message, f"unknown command: {target}") + return + + bot.state.set(_NS, name, target) + await bot.reply(message, f"alias: {name} -> {target}") + + elif sub == "del": + if len(parts) < 3: + await bot.reply(message, "Usage: !alias del ") + return + name = parts[2].lower() + if bot.state.delete(_NS, name): + await bot.reply(message, f"alias removed: {name}") + else: + await bot.reply(message, f"no alias: {name}") + + elif sub == "list": + keys = bot.state.keys(_NS) + if not keys: + await bot.reply(message, "No aliases defined") + return + entries = [] + for key in sorted(keys): + target = bot.state.get(_NS, key) + entries.append(f"{key} -> {target}") + await bot.reply(message, "Aliases: " + ", ".join(entries)) + + elif sub == "clear": + if not bot._is_admin(message): + await bot.reply(message, "Permission denied: clear requires admin") + return + count = bot.state.clear(_NS) + await bot.reply(message, f"Cleared {count} alias(es)") + + else: + await bot.reply(message, "Usage: !alias [args]") diff --git a/plugins/core.py b/plugins/core.py index 65f70a2..c5b3680 100644 --- a/plugins/core.py +++ b/plugins/core.py @@ -174,6 +174,34 @@ async def cmd_admins(bot, message): await bot.reply(message, " | ".join(parts)) +@command("deaf", help="Toggle voice listener deaf on Mumble") +async def cmd_deaf(bot, message): + """Toggle the voice listener's deaf state on Mumble. + + Targets the bot with ``receive_sound = true`` (merlin) so that + deafening stops ducking without affecting the music bot's playback. + """ + # Find the listener bot (receive_sound=true) among registered peers + listener = None + bots = getattr(bot.registry, "_bots", {}) + for peer in bots.values(): + if getattr(peer, "_receive_sound", False): + listener = peer + break + mumble = getattr(listener or bot, "_mumble", None) + if mumble is None: + return + myself = mumble.users.myself + name = getattr(listener, "nick", "bot") + if myself.get("self_deaf", False): + myself.undeafen() + myself.unmute() + await bot.reply(message, f"{name}: undeafened") + else: + myself.deafen() + await bot.reply(message, f"{name}: deafened") + + @command("state", help="Inspect plugin state: !state ...", admin=True) async def cmd_state(bot, message): """Manage the plugin state store. diff --git a/plugins/music.py b/plugins/music.py index 5462e78..3a73889 100644 --- a/plugins/music.py +++ b/plugins/music.py @@ -13,6 +13,7 @@ import subprocess import time from dataclasses import dataclass from pathlib import Path +from urllib.parse import parse_qs, urlencode, urlparse, urlunparse from derp.plugin import command @@ -53,6 +54,7 @@ def _ps(bot): "fade_vol": None, "fade_step": None, "history": [], + "autoplay": cfg.get("autoplay", True), "_watcher_task": None, }) @@ -122,10 +124,27 @@ def _parse_seek(arg: str) -> tuple[str, float]: # -- Resume state persistence ------------------------------------------------ +def _strip_playlist_params(url: str) -> str: + """Strip playlist context params from a YouTube URL. + + Keeps only the video identifier so resume/download targets the + exact video instead of resolving through a radio mix or playlist. + """ + parsed = urlparse(url) + if "youtube.com" not in parsed.netloc and "youtu.be" not in parsed.netloc: + return url + params = parse_qs(parsed.query, keep_blank_values=True) + # Keep only the video ID; drop list, index, start_radio, pp, etc. + clean = {k: v for k, v in params.items() if k == "v"} + if not clean: + return url + return urlunparse(parsed._replace(query=urlencode(clean, doseq=True))) + + def _save_resume(bot, track: _Track, elapsed: float) -> None: """Persist current track and elapsed position for later resumption.""" data = json.dumps({ - "url": track.origin or track.url, + "url": _strip_playlist_params(track.url), "title": track.title, "requester": track.requester, "elapsed": round(elapsed, 2), @@ -157,6 +176,10 @@ def _resolve_tracks(url: str, max_tracks: int = _MAX_QUEUE) -> list[tuple[str, s Handles both single videos and playlists. For playlists, returns up to ``max_tracks`` individual entries. Falls back to ``[(url, url)]`` on error. + + YouTube URLs with ``&list=`` are passed through intact so yt-dlp can + resolve the full playlist. Playlist params are only stripped in + ``_save_resume()`` where we need the exact video for resume. """ try: result = subprocess.run( @@ -234,18 +257,21 @@ def _download_track(url: str, track_id: str, title: str = "") -> Path | None: go to the cache dir and are cleaned up after playback unless kept. """ filename = _sanitize_filename(title, track_id) if title else track_id + _MIN_CACHE_SIZE = 100 * 1024 # 100 KB -- skip partial downloads # Reuse existing kept or cached file for d in (_MUSIC_DIR, _CACHE_DIR): for name in (filename, track_id): existing = list(d.glob(f"{name}.*")) if d.is_dir() else [] - if existing: - return existing[0] + for f in existing: + # Trust kept files; skip suspiciously small cache files + if d == _MUSIC_DIR or f.stat().st_size >= _MIN_CACHE_SIZE: + return f _CACHE_DIR.mkdir(parents=True, exist_ok=True) template = str(_CACHE_DIR / f"{track_id}.%(ext)s") try: result = subprocess.run( ["yt-dlp", "-f", "bestaudio", "-x", "-c", "--no-overwrites", - "--no-warnings", "-o", template, + "--no-playlist", "--no-warnings", "-o", template, "--print", "after_move:filepath", url], capture_output=True, text=True, timeout=300, ) @@ -292,7 +318,7 @@ async def _duck_monitor(bot) -> None: ps["duck_vol"] = None restore_start = 0.0 continue - ts = getattr(bot, "_last_voice_ts", 0.0) + ts = getattr(bot.registry, "_voice_ts", 0.0) if ts == 0.0: continue silence = time.monotonic() - ts @@ -346,7 +372,7 @@ async def _auto_resume(bot) -> None: deadline = time.monotonic() + 60 silence_needed = ps.get("duck_silence", 15) - ts = getattr(bot, "_last_voice_ts", 0.0) + ts = getattr(bot.registry, "_voice_ts", 0.0) if ts != 0.0 and time.monotonic() - ts < silence_needed: await bot.send("0", f"Resuming '{title}' at {pos} once silent for " @@ -356,7 +382,7 @@ async def _auto_resume(bot) -> None: while time.monotonic() < deadline: await asyncio.sleep(2) - ts = getattr(bot, "_last_voice_ts", 0.0) + ts = getattr(bot.registry, "_voice_ts", 0.0) if ts == 0.0: break if time.monotonic() - ts >= silence_needed: @@ -387,6 +413,76 @@ async def _auto_resume(bot) -> None: _ensure_loop(bot, seek=elapsed) +def _load_kept_tracks(bot) -> list[_Track]: + """Load all kept tracks from state with valid local files.""" + tracks = [] + for key in bot.state.keys("music"): + if not key.startswith("keep:"): + continue + raw = bot.state.get("music", key) + if not raw: + continue + try: + meta = json.loads(raw) + except (json.JSONDecodeError, TypeError): + continue + filename = meta.get("filename", "") + if not filename: + continue + fpath = _MUSIC_DIR / filename + if not fpath.is_file(): + continue + tracks.append(_Track( + url=meta.get("url", str(fpath)), + title=meta.get("title") or filename, + requester="autoplay", + local_path=fpath, + keep=True, + )) + return tracks + + +async def _autoplay_kept(bot) -> None: + """Shuffle kept tracks and start playback when idle after reconnect.""" + ps = _ps(bot) + if ps["current"] is not None: + return + + kept = _load_kept_tracks(bot) + if not kept: + return + + # Let pymumble fully stabilize + await asyncio.sleep(10) + + # Wait for silence + deadline = time.monotonic() + 60 + silence_needed = ps.get("duck_silence", 15) + ts = getattr(bot.registry, "_voice_ts", 0.0) + if ts != 0.0 and time.monotonic() - ts < silence_needed: + await bot.send("0", + f"Shuffling {len(kept)} kept tracks once silent") + + while time.monotonic() < deadline: + await asyncio.sleep(2) + ts = getattr(bot.registry, "_voice_ts", 0.0) + if ts == 0.0: + break + if time.monotonic() - ts >= silence_needed: + break + else: + log.info("music: autoplay aborted, channel not silent after 60s") + return + + if ps["current"] is not None: + return + + random.shuffle(kept) + ps["queue"].extend(kept) + log.info("music: autoplay %d kept tracks (shuffled)", len(kept)) + _ensure_loop(bot) + + async def _reconnect_watcher(bot) -> None: """Poll for reconnections and trigger auto-resume. @@ -399,30 +495,37 @@ async def _reconnect_watcher(bot) -> None: await asyncio.sleep(2) count = getattr(bot, "_connect_count", 0) - # Cold-start: resume saved state after first connection + # Cold-start: resume or autoplay after first connection if not boot_checked and count >= 1: boot_checked = True if _load_resume(bot) is not None: log.info("music: saved state found on boot, attempting auto-resume") await _auto_resume(bot) - continue + elif _ps(bot).get("autoplay", True): + await _autoplay_kept(bot) + continue if count > last_seen and count > 1: last_seen = count - log.info("music: reconnection detected, attempting auto-resume") - await _auto_resume(bot) + if _load_resume(bot) is not None: + log.info("music: reconnection detected, attempting auto-resume") + await _auto_resume(bot) + elif _ps(bot).get("autoplay", True): + await _autoplay_kept(bot) last_seen = max(last_seen, count) # -- Play loop --------------------------------------------------------------- -async def _play_loop(bot, *, seek: float = 0.0) -> None: +async def _play_loop(bot, *, seek: float = 0.0, fade_in: float | bool = True) -> None: """Pop tracks from queue and stream them sequentially.""" ps = _ps(bot) duck_task = bot._spawn(_duck_monitor(bot), name="music-duck-monitor") ps["duck_task"] = duck_task first = True + seek_req = [None] + ps["seek_req"] = seek_req try: while ps["queue"]: track = ps["queue"].pop(0) @@ -434,6 +537,8 @@ async def _play_loop(bot, *, seek: float = 0.0) -> None: ps["done_event"] = done cur_seek = seek if first else 0.0 + if not first: + fade_in = True # always fade in after first track first = False progress = [0] ps["progress"] = progress @@ -470,7 +575,8 @@ async def _play_loop(bot, *, seek: float = 0.0) -> None: seek=cur_seek, progress=progress, fade_step=lambda: ps.get("fade_step"), - fade_in=True, + fade_in=fade_in, + seek_req=seek_req, ) except asyncio.CancelledError: elapsed = cur_seek + progress[0] * 0.02 @@ -512,16 +618,17 @@ async def _play_loop(bot, *, seek: float = 0.0) -> None: ps["fade_step"] = None ps["progress"] = None ps["cur_seek"] = 0.0 + ps["seek_req"] = None -def _ensure_loop(bot, *, seek: float = 0.0) -> None: +def _ensure_loop(bot, *, seek: float = 0.0, fade_in: float | bool = True) -> None: """Start the play loop if not already running.""" ps = _ps(bot) task = ps.get("task") if task and not task.done(): return ps["task"] = bot._spawn( - _play_loop(bot, seek=seek), name="music-play-loop", + _play_loop(bot, seek=seek, fade_in=fade_in), name="music-play-loop", ) @@ -723,7 +830,7 @@ async def cmd_resume(bot, message): _ensure_loop(bot, seek=elapsed) -@command("skip", help="Music: !skip") +@command("skip", help="Music: !skip", aliases=["next"]) async def cmd_skip(bot, message): """Skip current track, advance to next in queue.""" if not _is_mumble(bot): @@ -807,11 +914,6 @@ async def cmd_seek(bot, message): await bot.reply(message, "Usage: !seek (e.g. 1:30, +30, -30)") return - track = ps["current"] - if track is None: - await bot.reply(message, "Nothing playing") - return - # Compute target position if mode == "abs": target = seconds @@ -823,12 +925,14 @@ async def cmd_seek(bot, message): target = max(0.0, target) - # Re-insert current track at front of queue (local_path intact) - ps["queue"].insert(0, track) - - await _fade_and_cancel(bot) - - _ensure_loop(bot, seek=target) + seek_req = ps.get("seek_req") + if not seek_req: + await bot.reply(message, "Nothing playing") + return + seek_req[0] = target + ps["cur_seek"] = target + if ps.get("progress"): + ps["progress"][0] = 0 await bot.reply(message, f"Seeking to {_fmt_time(target)}") @@ -881,9 +985,13 @@ async def cmd_np(bot, message): return track = ps["current"] + progress = ps.get("progress") + cur_seek = ps.get("cur_seek", 0.0) + elapsed = cur_seek + (progress[0] * 0.02 if progress else 0.0) await bot.reply( message, - f"Now playing: {_truncate(track.title)} [{track.requester}]", + f"Now playing: {_truncate(track.title)} [{track.requester}]" + f" ({_fmt_time(elapsed)})", ) @@ -1047,6 +1155,23 @@ async def cmd_keep(bot, message): return track.keep = True + # Check if this track is already kept (by normalized URL) + norm_url = _strip_playlist_params(track.url) + for key in bot.state.keys("music"): + if not key.startswith("keep:"): + continue + raw = bot.state.get("music", key) + if not raw: + continue + try: + existing = json.loads(raw) + except (json.JSONDecodeError, TypeError): + continue + if _strip_playlist_params(existing.get("url", "")) == norm_url: + kid = existing.get("id", key.split(":", 1)[1]) + await bot.reply(message, f"Already kept as #{kid}") + return + # Assign a unique short ID last_id = int(bot.state.get("music", "keep_next_id") or "1") keep_id = last_id diff --git a/plugins/voice.py b/plugins/voice.py index 0ec6e30..c93a569 100644 --- a/plugins/voice.py +++ b/plugins/voice.py @@ -54,6 +54,11 @@ def _ps(bot): "silence_gap": cfg.get("silence_gap", _SILENCE_GAP), "whisper_url": cfg.get("whisper_url", _WHISPER_URL), "piper_url": cfg.get("piper_url", _PIPER_URL), + "voice": cfg.get("voice", ""), + "length_scale": cfg.get("length_scale", 1.0), + "noise_scale": cfg.get("noise_scale", 0.667), + "noise_w": cfg.get("noise_w", 0.8), + "fx": cfg.get("fx", ""), "_listener_registered": False, }) @@ -210,14 +215,30 @@ def _fetch_tts(piper_url: str, text: str) -> str | None: async def _tts_play(bot, text: str): - """Fetch TTS audio and play it via stream_audio.""" + """Fetch TTS audio and play it via stream_audio. + + Uses the configured voice profile (voice, fx, piper params) when set, + otherwise falls back to Piper's default voice. + """ from pathlib import Path ps = _ps(bot) loop = asyncio.get_running_loop() - wav_path = await loop.run_in_executor( - None, _fetch_tts, ps["piper_url"], text, - ) + if ps["voice"] or ps["fx"]: + wav_path = await loop.run_in_executor( + None, lambda: _fetch_tts_voice( + ps["piper_url"], text, + voice=ps["voice"], + length_scale=ps["length_scale"], + noise_scale=ps["noise_scale"], + noise_w=ps["noise_w"], + fx=ps["fx"], + ), + ) + else: + wav_path = await loop.run_in_executor( + None, _fetch_tts, ps["piper_url"], text, + ) if wav_path is None: return try: @@ -322,26 +343,228 @@ async def cmd_say(bot, message): bot._spawn(_tts_play(bot, text), name="voice-tts") +def _split_fx(fx: str) -> tuple[list[str], str]: + """Split FX chain into rubberband CLI args and ffmpeg filter string. + + Alpine's ffmpeg lacks librubberband, so pitch shifting is handled by + the ``rubberband`` CLI tool and remaining filters by ffmpeg. + """ + import math + parts = fx.split(",") + rb_args: list[str] = [] + ff_parts: list[str] = [] + for part in parts: + if part.startswith("rubberband="): + opts: dict[str, str] = {} + for kv in part[len("rubberband="):].split(":"): + k, _, v = kv.partition("=") + opts[k] = v + if "pitch" in opts: + semitones = 12 * math.log2(float(opts["pitch"])) + rb_args += ["--pitch", f"{semitones:.2f}"] + if opts.get("formant") == "1": + rb_args.append("--formant") + else: + ff_parts.append(part) + return rb_args, ",".join(ff_parts) + + +def _fetch_tts_voice(piper_url: str, text: str, *, voice: str = "", + speaker_id: int = 0, length_scale: float = 1.0, + noise_scale: float = 0.667, noise_w: float = 0.8, + fx: str = "") -> str | None: + """Fetch TTS with explicit voice params and optional FX. Blocking. + + Pitch shifting uses the ``rubberband`` CLI (Alpine ffmpeg has no + librubberband); remaining audio filters go through ffmpeg. + """ + import os + import subprocess + import tempfile + payload = {"text": text} + if voice: + payload["voice"] = voice + if speaker_id: + payload["speaker_id"] = speaker_id + payload["length_scale"] = length_scale + payload["noise_scale"] = noise_scale + payload["noise_w"] = noise_w + data = json.dumps(payload).encode() + req = urllib.request.Request(piper_url, data=data, method="POST") + req.add_header("Content-Type", "application/json") + resp = _urlopen(req, timeout=30, proxy=False) + wav_data = resp.read() + resp.close() + if not wav_data: + return None + tmp = tempfile.NamedTemporaryFile(suffix=".wav", prefix="derp_aud_", delete=False) + tmp.write(wav_data) + tmp.close() + if not fx: + return tmp.name + + rb_args, ff_filters = _split_fx(fx) + current = tmp.name + + # Pitch shift via rubberband CLI + if rb_args: + rb_out = tempfile.NamedTemporaryFile( + suffix=".wav", prefix="derp_aud_", delete=False, + ) + rb_out.close() + r = subprocess.run( + ["rubberband"] + rb_args + [current, rb_out.name], + capture_output=True, timeout=15, + ) + os.unlink(current) + if r.returncode != 0: + log.warning("voice: rubberband failed: %s", r.stderr[:200]) + os.unlink(rb_out.name) + return None + current = rb_out.name + + # Remaining filters via ffmpeg + if ff_filters: + ff_out = tempfile.NamedTemporaryFile( + suffix=".wav", prefix="derp_aud_", delete=False, + ) + ff_out.close() + r = subprocess.run( + ["ffmpeg", "-y", "-i", current, "-af", ff_filters, ff_out.name], + capture_output=True, timeout=15, + ) + os.unlink(current) + if r.returncode != 0: + log.warning("voice: ffmpeg failed: %s", r.stderr[:200]) + os.unlink(ff_out.name) + return None + current = ff_out.name + + return current + + +@command("audition", help="Voice: !audition -- play voice samples", tier="admin") +async def cmd_audition(bot, message): + """Play voice samples through Mumble for comparison.""" + if not _is_mumble(bot): + return + + ps = _ps(bot) + piper_url = ps["piper_url"] + phrase = "The sorcerer has arrived. I have seen things beyond your understanding." + + # FX building blocks + _deep = "rubberband=pitch=0.87:formant=1" + _bass = "bass=g=6:f=110:w=0.6" + _bass_heavy = "equalizer=f=80:t=h:w=150:g=8" + _echo_subtle = "aecho=0.8:0.6:25|40:0.25|0.15" + _echo_chamber = "aecho=0.8:0.88:60:0.35" + _echo_cave = "aecho=0.8:0.7:40|70|100:0.3|0.2|0.1" + + samples = [ + # -- Base voices (no FX) for reference + ("ryan-high raw", "en_US-ryan-high", 0, ""), + ("lessac-high raw", "en_US-lessac-high", 0, ""), + # -- Deep pitch only + ("ryan deep", "en_US-ryan-high", 0, + _deep), + ("lessac deep", "en_US-lessac-high", 0, + _deep), + # -- Deep + bass boost + ("ryan deep+bass", "en_US-ryan-high", 0, + f"{_deep},{_bass}"), + ("lessac deep+bass", "en_US-lessac-high", 0, + f"{_deep},{_bass}"), + # -- Deep + heavy bass + ("ryan deep+heavy bass", "en_US-ryan-high", 0, + f"{_deep},{_bass_heavy}"), + # -- Deep + bass + subtle echo + ("ryan deep+bass+echo", "en_US-ryan-high", 0, + f"{_deep},{_bass},{_echo_subtle}"), + ("lessac deep+bass+echo", "en_US-lessac-high", 0, + f"{_deep},{_bass},{_echo_subtle}"), + # -- Deep + bass + chamber reverb + ("ryan deep+bass+chamber", "en_US-ryan-high", 0, + f"{_deep},{_bass},{_echo_chamber}"), + ("lessac deep+bass+chamber", "en_US-lessac-high", 0, + f"{_deep},{_bass},{_echo_chamber}"), + # -- Deep + heavy bass + cave reverb + ("ryan deep+heavybass+cave", "en_US-ryan-high", 0, + f"{_deep},{_bass_heavy},{_echo_cave}"), + # -- Libritts best candidates with full sorcerer chain + ("libritts #20 deep+bass+echo", "en_US-libritts_r-medium", 20, + f"{_deep},{_bass},{_echo_subtle}"), + ("libritts #22 deep+bass+echo", "en_US-libritts_r-medium", 22, + f"{_deep},{_bass},{_echo_subtle}"), + ("libritts #79 deep+bass+chamber", "en_US-libritts_r-medium", 79, + f"{_deep},{_bass},{_echo_chamber}"), + ] + + # Find merlin (the listener bot) -- plays the audition samples + merlin = None + for peer in getattr(bot.registry, "_bots", {}).values(): + if getattr(peer, "_receive_sound", False): + merlin = peer + break + + await bot.reply(message, f"Auditioning {len(samples)} voice samples...") + loop = asyncio.get_running_loop() + from pathlib import Path + + # Pre-generate derp's default voice (same phrase, no FX) + derp_wav = await loop.run_in_executor( + None, lambda: _fetch_tts_voice(piper_url, phrase), + ) + + for i, (label, voice, sid, fx) in enumerate(samples, 1): + announcer = merlin or bot + await announcer.send("0", f"[{i}/{len(samples)}] {label}") + await asyncio.sleep(1) + # Generate the audition sample (merlin's candidate voice) + sample_wav = await loop.run_in_executor( + None, lambda v=voice, s=sid, f=fx: _fetch_tts_voice( + piper_url, phrase, voice=v, speaker_id=s, + length_scale=1.15, noise_scale=0.4, noise_w=0.5, fx=f, + ), + ) + if sample_wav is None: + await bot.send("0", " (failed)") + continue + try: + # Both bots speak simultaneously: + # merlin plays the audition sample, derp plays its default voice + merlin_done = asyncio.Event() + derp_done = asyncio.Event() + if merlin: + merlin_task = asyncio.create_task( + merlin.stream_audio(sample_wav, volume=1.0, + on_done=merlin_done)) + derp_task = asyncio.create_task( + bot.stream_audio(derp_wav, volume=1.0, + on_done=derp_done)) + await asyncio.gather(merlin_task, derp_task) + else: + await bot.stream_audio(sample_wav, volume=1.0, + on_done=merlin_done) + await merlin_done.wait() + finally: + Path(sample_wav).unlink(missing_ok=True) + await asyncio.sleep(2) + + if derp_wav: + Path(derp_wav).unlink(missing_ok=True) + announcer = merlin or bot + await announcer.send("0", "Audition complete.") + + # -- Plugin lifecycle -------------------------------------------------------- async def on_connected(bot) -> None: - """Re-register listener after reconnect; play TTS greeting on first join.""" + """Re-register listener after reconnect.""" if not _is_mumble(bot): return ps = _ps(bot) - - # TTS greeting on first connect - greet = bot.config.get("mumble", {}).get("greet") - if greet and not ps.get("_greeted"): - ps["_greeted"] = True - # Wait for audio subsystem to be ready - for _ in range(20): - if bot._is_audio_ready(): - break - await asyncio.sleep(0.5) - bot._spawn(_tts_play(bot, greet), name="voice-greet") - if ps["listen"] or ps["trigger"]: _ensure_listener(bot) _ensure_flush_task(bot) diff --git a/src/derp/bot.py b/src/derp/bot.py index 1bd04de..4789d89 100644 --- a/src/derp/bot.py +++ b/src/derp/bot.py @@ -405,6 +405,12 @@ class Bot: parts = text[len(self.prefix):].split(None, 1) cmd_name = parts[0].lower() if parts else "" handler = self._resolve_command(cmd_name) + if handler is None: + # Check user-defined aliases + target = self.state.get("alias", cmd_name) if hasattr(self, "state") else None + if target: + cmd_name = target + handler = self._resolve_command(cmd_name) if handler is None: return if handler is _AMBIGUOUS: diff --git a/src/derp/cli.py b/src/derp/cli.py index 228349b..d3ce980 100644 --- a/src/derp/cli.py +++ b/src/derp/cli.py @@ -161,10 +161,18 @@ def main(argv: list[str] | None = None) -> int: merged_mu = dict(config["mumble"]) merged_mu.update(extra) merged_mu.pop("extra", None) + # Plugin filters are exclusive; don't inherit the parent's + if "only_plugins" in extra: + merged_mu.pop("except_plugins", None) + elif "except_plugins" in extra: + merged_mu.pop("only_plugins", None) extra_cfg["mumble"] = merged_mu - # Extra bots don't run voice trigger by default + # Extra bots inherit [voice] config but not the trigger if "voice" not in extra: - extra_cfg["voice"] = {} + extra_cfg["voice"] = { + k: v for k, v in config.get("voice", {}).items() + if k != "trigger" + } username = extra.get("username", f"mumble-{len(bots)}") bot = MumbleBot(username, extra_cfg, registry) bots.append(bot) diff --git a/src/derp/mumble.py b/src/derp/mumble.py index eed53b1..47eb22b 100644 --- a/src/derp/mumble.py +++ b/src/derp/mumble.py @@ -164,6 +164,18 @@ class MumbleBot: self._last_voice_ts: float = 0.0 self._connect_count: int = 0 self._sound_listeners: list = [] + self._receive_sound: bool = mu_cfg.get("receive_sound", True) + self._only_plugins: set[str] | None = ( + set(mu_cfg["only_plugins"]) if "only_plugins" in mu_cfg else None + ) + self._except_plugins: set[str] | None = ( + set(mu_cfg["except_plugins"]) if "except_plugins" in mu_cfg else None + ) + + # Register in shared bot index so plugins can find peers + if not hasattr(registry, "_bots"): + registry._bots = {} + registry._bots[self._username] = self rate_cfg = config.get("bot", {}) self._bucket = _TokenBucket( @@ -202,7 +214,7 @@ class MumbleBot: PYMUMBLE_CLBK_SOUNDRECEIVED, self._on_sound_received, ) - self._mumble.set_receive_sound(True) + self._mumble.set_receive_sound(self._receive_sound) self._mumble.start() self._mumble.is_ready() @@ -219,8 +231,20 @@ class MumbleBot: ) async def _notify_plugins_connected(self) -> None: - """Call on_connected(bot) in each loaded plugin that defines it.""" + """Call on_connected(bot) in each loaded plugin that defines it. + + Respects ``only_plugins`` / ``except_plugins`` so lifecycle hooks + only fire for plugins this bot is allowed to handle. + + After plugin hooks, checks for a ``greet`` config on the connecting + bot. If present and this is the first connection, the greeting is + spoken through the voice-capable peer (the bot whose ``only_plugins`` + includes ``voice``), so that a non-speaking bot like merlin can + still have an audible entrance announced by derp. + """ for name, mod in self.registry._modules.items(): + if not self._plugin_allowed(name, None): + continue fn = getattr(mod, "on_connected", None) if fn is None or not asyncio.iscoroutinefunction(fn): continue @@ -228,6 +252,22 @@ class MumbleBot: await fn(self) except Exception: log.exception("mumble: on_connected hook failed in %s", name) + await self._play_greet() + + async def _play_greet(self) -> None: + """Speak the greeting via TTS on connect (voice only, no text).""" + greet = self.config.get("mumble", {}).get("greet") + if not greet: + return + voice_mod = self.registry._modules.get("voice") + tts_play = getattr(voice_mod, "_tts_play", None) if voice_mod else None + if tts_play is None: + return + for _ in range(20): + if self._is_audio_ready(): + break + await asyncio.sleep(0.5) + self._spawn(tts_play(self, greet), name="voice-greet") def _on_disconnected(self) -> None: """Callback from pymumble thread: connection lost.""" @@ -243,6 +283,7 @@ class MumbleBot: """ prev = self._last_voice_ts self._last_voice_ts = time.monotonic() + self.registry._voice_ts = self._last_voice_ts if prev == 0.0: name = user["name"] if isinstance(user, dict) else "?" log.info("mumble: first voice packet from %s", name) @@ -361,12 +402,17 @@ class MumbleBot: log.exception("mumble: error in command handler '%s'", cmd_name) def _resolve_command(self, name: str): - """Resolve command name with unambiguous prefix matching.""" + """Resolve command name with unambiguous prefix matching. + + Only considers commands from plugins this bot is allowed to handle, + so filtered-out plugins never trigger ambiguity or dispatch. + """ handler = self.registry.commands.get(name) - if handler is not None: + if handler is not None and self._plugin_allowed(handler.plugin, None): return handler matches = [v for k, v in self.registry.commands.items() - if k.startswith(name)] + if k.startswith(name) + and self._plugin_allowed(v.plugin, None)] if len(matches) == 1: return matches[0] if len(matches) > 1: @@ -374,7 +420,11 @@ class MumbleBot: return None def _plugin_allowed(self, plugin_name: str, channel: str | None) -> bool: - """Channel filtering is IRC-only; all plugins are allowed on Mumble.""" + """Check if this bot handles commands from the given plugin.""" + if self._only_plugins is not None: + return plugin_name in self._only_plugins + if self._except_plugins is not None: + return plugin_name not in self._except_plugins return True # -- Permission tiers ---------------------------------------------------- @@ -526,7 +576,8 @@ class MumbleBot: seek: float = 0.0, progress: list | None = None, fade_step=None, - fade_in: bool = False, + fade_in: float | bool = False, + seek_req: list | None = None, ) -> None: """Stream audio from URL through yt-dlp|ffmpeg to voice channel. @@ -543,8 +594,12 @@ class MumbleBot: current frame count each frame. ``fade_step`` is an optional callable returning a float or None; when non-None it overrides the default ramp step for fast fades (e.g. skip/stop). - ``fade_in`` starts playback from silence and ramps up to the - target volume over ~0.8s. + ``fade_in`` controls the initial volume ramp: ``False``/``0`` = + no fade-in, ``True`` = 5.0s ramp, or a float for a custom + duration in seconds. ``seek_req`` is a mutable ``[None]`` list; + when ``seek_req[0]`` is set to a float, the stream swaps its + ffmpeg pipeline in-place (fade-out, swap, fade-in) without + cancelling the task. """ if self._mumble is None: return @@ -553,14 +608,16 @@ class MumbleBot: log.info("stream_audio: starting pipeline for %s (vol=%.0f%%, seek=%.1fs)", url, _get_vol() * 100, seek) - seek_flag = f" -ss {seek:.3f}" if seek > 0 else "" - if os.path.isfile(url): - cmd = (f"ffmpeg{seek_flag} -i {_shell_quote(url)}" - f" -f s16le -ar 48000 -ac 1 -loglevel error pipe:1") - else: - cmd = (f"yt-dlp -o - -f bestaudio --no-warnings {_shell_quote(url)}" - f" | ffmpeg{seek_flag} -i pipe:0 -f s16le -ar 48000 -ac 1" - f" -loglevel error pipe:1") + def _build_cmd(seek_pos): + seek_flag = f" -ss {seek_pos:.3f}" if seek_pos > 0 else "" + if os.path.isfile(url): + return (f"ffmpeg{seek_flag} -i {_shell_quote(url)}" + f" -f s16le -ar 48000 -ac 1 -loglevel error pipe:1") + return (f"yt-dlp -o - -f bestaudio --no-warnings {_shell_quote(url)}" + f" | ffmpeg{seek_flag} -i pipe:0 -f s16le -ar 48000 -ac 1" + f" -loglevel error pipe:1") + + cmd = _build_cmd(seek) proc = await asyncio.create_subprocess_exec( "sh", "-c", cmd, stdout=asyncio.subprocess.PIPE, @@ -568,18 +625,71 @@ class MumbleBot: ) _max_step = 0.005 # max volume change per frame (~4s full ramp) + # Normalize fade_in to a duration in seconds + if fade_in is True: + _fade_dur = 5.0 + elif fade_in: + _fade_dur = float(fade_in) + else: + _fade_dur = 0.0 _fade_in_target = _get_vol() - _cur_vol = 0.0 if fade_in else _fade_in_target - # Fade-in: constant step to ramp linearly over ~5s (250 frames) - _fade_in_total = int(5.0 / 0.02) if fade_in else 0 + _cur_vol = 0.0 if _fade_dur > 0 else _fade_in_target + _fade_in_total = int(_fade_dur / 0.02) if _fade_dur > 0 else 0 _fade_in_frames = _fade_in_total _fade_in_step = (_fade_in_target / _fade_in_total) if _fade_in_total else 0 _was_feeding = True # track connected/disconnected transitions + # Seek state (in-stream pipeline swap) + _seek_fading = False + _seek_target = 0.0 + _seek_fade_out = 0 + _SEEK_FADE_FRAMES = 10 # 0.2s ramp-down + frames = 0 try: while True: + # Seek: swap pipeline when fade-out complete + if _seek_fading and _seek_fade_out <= 0: + try: + if self._is_audio_ready(): + self._mumble.sound_output.clear_buffer() + except Exception: + pass + try: + proc.kill() + except ProcessLookupError: + pass + try: + await asyncio.wait_for(proc.stderr.read(), timeout=3) + await asyncio.wait_for(proc.wait(), timeout=3) + except (asyncio.TimeoutError, asyncio.CancelledError): + pass + cmd = _build_cmd(_seek_target) + proc = await asyncio.create_subprocess_exec( + "sh", "-c", cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + frames = 0 + if progress is not None: + progress[0] = 0 + seek = _seek_target + _fade_in_total = 25 # 0.5s fade-in + _fade_in_frames = _fade_in_total + _fade_in_target = _get_vol() + _fade_in_step = ( + (_fade_in_target / _fade_in_total) + if _fade_in_total else 0 + ) + _cur_vol = 0.0 + _seek_fading = False + log.info("stream_audio: seek to %.1fs", _seek_target) + continue + pcm = await proc.stdout.read(_FRAME_BYTES) + if not pcm and _seek_fading: + _seek_fade_out = 0 + continue if not pcm: break if len(pcm) < _FRAME_BYTES: @@ -603,6 +713,37 @@ class MumbleBot: "resuming feed at frame %d", frames) _was_feeding = True + # Seek: fade-out in progress + if _seek_fading: + if (seek_req is not None + and seek_req[0] is not None + and seek_req[0] != _seek_target): + _seek_target = seek_req[0] + seek_req[0] = None + fade_ratio = _seek_fade_out / _SEEK_FADE_FRAMES + pcm = _scale_pcm(pcm, _cur_vol * fade_ratio) + try: + self._mumble.sound_output.add_sound(pcm) + except (TypeError, AttributeError, OSError): + pass + _seek_fade_out -= 1 + try: + while (self._is_audio_ready() + and self._mumble.sound_output.get_buffer_size() > 1.0): + await asyncio.sleep(0.05) + except (TypeError, AttributeError): + pass + continue + + # Seek: check for new request + if seek_req is not None and seek_req[0] is not None: + _seek_target = seek_req[0] + seek_req[0] = None + _seek_fading = True + _seek_fade_out = _SEEK_FADE_FRAMES + log.info("stream_audio: seek to %.1fs, fading out", + _seek_target) + target = _get_vol() step = _max_step if _fade_in_frames > 0: diff --git a/src/derp/plugin.py b/src/derp/plugin.py index 294c936..dbd2d57 100644 --- a/src/derp/plugin.py +++ b/src/derp/plugin.py @@ -27,7 +27,13 @@ class Handler: tier: str = "user" -def command(name: str, help: str = "", admin: bool = False, tier: str = "") -> Callable: +def command( + name: str, + help: str = "", + admin: bool = False, + tier: str = "", + aliases: list[str] | None = None, +) -> Callable: """Decorator to register an async function as a bot command. Usage:: @@ -40,8 +46,8 @@ def command(name: str, help: str = "", admin: bool = False, tier: str = "") -> C async def cmd_reload(bot, message): ... - @command("trusted_cmd", help="Trusted-only", tier="trusted") - async def cmd_trusted(bot, message): + @command("skip", help="Skip track", aliases=["next"]) + async def cmd_skip(bot, message): ... """ @@ -50,6 +56,7 @@ def command(name: str, help: str = "", admin: bool = False, tier: str = "") -> C func._derp_help = help # type: ignore[attr-defined] func._derp_admin = admin # type: ignore[attr-defined] func._derp_tier = tier if tier else ("admin" if admin else "user") # type: ignore[attr-defined] + func._derp_aliases = aliases or [] # type: ignore[attr-defined] return func return decorator @@ -107,14 +114,25 @@ class PluginRegistry: count = 0 for _name, obj in inspect.getmembers(module, inspect.isfunction): if hasattr(obj, "_derp_command"): + cmd_tier = getattr(obj, "_derp_tier", "user") + cmd_admin = getattr(obj, "_derp_admin", False) self.register_command( obj._derp_command, obj, help=getattr(obj, "_derp_help", ""), plugin=plugin_name, - admin=getattr(obj, "_derp_admin", False), - tier=getattr(obj, "_derp_tier", "user"), + admin=cmd_admin, + tier=cmd_tier, ) count += 1 + for alias in getattr(obj, "_derp_aliases", []): + self.register_command( + alias, obj, + help=f"alias for !{obj._derp_command}", + plugin=plugin_name, + admin=cmd_admin, + tier=cmd_tier, + ) + count += 1 if hasattr(obj, "_derp_event"): self.register_event(obj._derp_event, obj, plugin=plugin_name) count += 1 diff --git a/tests/test_alias.py b/tests/test_alias.py new file mode 100644 index 0000000..7b13497 --- /dev/null +++ b/tests/test_alias.py @@ -0,0 +1,212 @@ +"""Tests for the alias plugin.""" + +import asyncio +import importlib.util +import sys + +from derp.plugin import PluginRegistry + +# -- Load plugin module directly --------------------------------------------- + +_spec = importlib.util.spec_from_file_location("alias", "plugins/alias.py") +_mod = importlib.util.module_from_spec(_spec) +sys.modules["alias"] = _mod +_spec.loader.exec_module(_mod) + + +# -- Fakes ------------------------------------------------------------------- + + +class _FakeState: + def __init__(self): + self._store: dict[str, dict[str, str]] = {} + + def get(self, ns: str, key: str) -> str | None: + return self._store.get(ns, {}).get(key) + + def set(self, ns: str, key: str, value: str) -> None: + self._store.setdefault(ns, {})[key] = value + + def delete(self, ns: str, key: str) -> bool: + if ns in self._store and key in self._store[ns]: + del self._store[ns][key] + return True + return False + + def keys(self, ns: str) -> list[str]: + return list(self._store.get(ns, {}).keys()) + + def clear(self, ns: str) -> int: + count = len(self._store.get(ns, {})) + self._store.pop(ns, None) + return count + + +class _FakeBot: + def __init__(self, *, admin: bool = False): + self.replied: list[str] = [] + self.state = _FakeState() + self.registry = PluginRegistry() + self._admin = admin + + async def reply(self, message, text: str) -> None: + self.replied.append(text) + + def _is_admin(self, message) -> bool: + return self._admin + + +class _Msg: + def __init__(self, text="!alias"): + self.text = text + self.nick = "Alice" + self.target = "#test" + self.is_channel = True + self.prefix = "Alice!~alice@host" + + +# --------------------------------------------------------------------------- +# TestAliasAdd +# --------------------------------------------------------------------------- + + +class TestAliasAdd: + def test_add_creates_alias(self): + bot = _FakeBot() + # Register a target command + async def _noop(b, m): pass + bot.registry.register_command("skip", _noop, plugin="music") + msg = _Msg(text="!alias add s skip") + asyncio.run(_mod.cmd_alias(bot, msg)) + assert bot.state.get("alias", "s") == "skip" + assert any("s -> skip" in r for r in bot.replied) + + def test_add_rejects_existing_command(self): + bot = _FakeBot() + async def _noop(b, m): pass + bot.registry.register_command("skip", _noop, plugin="music") + msg = _Msg(text="!alias add skip stop") + asyncio.run(_mod.cmd_alias(bot, msg)) + assert any("already a registered command" in r for r in bot.replied) + assert bot.state.get("alias", "skip") is None + + def test_add_rejects_chaining(self): + bot = _FakeBot() + async def _noop(b, m): pass + bot.registry.register_command("skip", _noop, plugin="music") + bot.state.set("alias", "sk", "skip") + msg = _Msg(text="!alias add x sk") + asyncio.run(_mod.cmd_alias(bot, msg)) + assert any("no chaining" in r for r in bot.replied) + + def test_add_rejects_unknown_target(self): + bot = _FakeBot() + msg = _Msg(text="!alias add s nonexistent") + asyncio.run(_mod.cmd_alias(bot, msg)) + assert any("unknown command" in r for r in bot.replied) + + def test_add_lowercases_name(self): + bot = _FakeBot() + async def _noop(b, m): pass + bot.registry.register_command("skip", _noop, plugin="music") + msg = _Msg(text="!alias add S skip") + asyncio.run(_mod.cmd_alias(bot, msg)) + assert bot.state.get("alias", "s") == "skip" + + def test_add_missing_args(self): + bot = _FakeBot() + msg = _Msg(text="!alias add s") + asyncio.run(_mod.cmd_alias(bot, msg)) + assert any("Usage" in r for r in bot.replied) + + +# --------------------------------------------------------------------------- +# TestAliasDel +# --------------------------------------------------------------------------- + + +class TestAliasDel: + def test_del_removes_alias(self): + bot = _FakeBot() + bot.state.set("alias", "s", "skip") + msg = _Msg(text="!alias del s") + asyncio.run(_mod.cmd_alias(bot, msg)) + assert bot.state.get("alias", "s") is None + assert any("removed" in r for r in bot.replied) + + def test_del_nonexistent(self): + bot = _FakeBot() + msg = _Msg(text="!alias del x") + asyncio.run(_mod.cmd_alias(bot, msg)) + assert any("no alias" in r for r in bot.replied) + + def test_del_missing_name(self): + bot = _FakeBot() + msg = _Msg(text="!alias del") + asyncio.run(_mod.cmd_alias(bot, msg)) + assert any("Usage" in r for r in bot.replied) + + +# --------------------------------------------------------------------------- +# TestAliasList +# --------------------------------------------------------------------------- + + +class TestAliasList: + def test_list_empty(self): + bot = _FakeBot() + msg = _Msg(text="!alias list") + asyncio.run(_mod.cmd_alias(bot, msg)) + assert any("No aliases" in r for r in bot.replied) + + def test_list_shows_entries(self): + bot = _FakeBot() + bot.state.set("alias", "s", "skip") + bot.state.set("alias", "np", "nowplaying") + msg = _Msg(text="!alias list") + asyncio.run(_mod.cmd_alias(bot, msg)) + assert any("s -> skip" in r for r in bot.replied) + assert any("np -> nowplaying" in r for r in bot.replied) + + +# --------------------------------------------------------------------------- +# TestAliasClear +# --------------------------------------------------------------------------- + + +class TestAliasClear: + def test_clear_as_admin(self): + bot = _FakeBot(admin=True) + bot.state.set("alias", "s", "skip") + bot.state.set("alias", "np", "nowplaying") + msg = _Msg(text="!alias clear") + asyncio.run(_mod.cmd_alias(bot, msg)) + assert any("Cleared 2" in r for r in bot.replied) + assert bot.state.keys("alias") == [] + + def test_clear_denied_non_admin(self): + bot = _FakeBot(admin=False) + bot.state.set("alias", "s", "skip") + msg = _Msg(text="!alias clear") + asyncio.run(_mod.cmd_alias(bot, msg)) + assert any("Permission denied" in r for r in bot.replied) + assert bot.state.get("alias", "s") == "skip" + + +# --------------------------------------------------------------------------- +# TestAliasUsage +# --------------------------------------------------------------------------- + + +class TestAliasUsage: + def test_no_subcommand(self): + bot = _FakeBot() + msg = _Msg(text="!alias") + asyncio.run(_mod.cmd_alias(bot, msg)) + assert any("Usage" in r for r in bot.replied) + + def test_unknown_subcommand(self): + bot = _FakeBot() + msg = _Msg(text="!alias foo") + asyncio.run(_mod.cmd_alias(bot, msg)) + assert any("Usage" in r for r in bot.replied) diff --git a/tests/test_core.py b/tests/test_core.py new file mode 100644 index 0000000..789513b --- /dev/null +++ b/tests/test_core.py @@ -0,0 +1,92 @@ +"""Tests for the core plugin.""" + +import asyncio +import importlib.util +import sys +from unittest.mock import MagicMock + +# -- Load plugin module directly --------------------------------------------- + +_spec = importlib.util.spec_from_file_location("core", "plugins/core.py") +_mod = importlib.util.module_from_spec(_spec) +sys.modules["core"] = _mod +_spec.loader.exec_module(_mod) + + +# -- Fakes ------------------------------------------------------------------- + + +class _FakeRegistry: + def __init__(self): + self._bots: dict = {} + + +class _FakeBot: + def __init__(self, *, mumble: bool = False): + self.replied: list[str] = [] + self.registry = _FakeRegistry() + self.nick = "derp" + self._receive_sound = False + if mumble: + self._mumble = MagicMock() + + async def reply(self, message, text: str) -> None: + self.replied.append(text) + + +def _make_listener(): + """Create a fake listener bot (merlin) with _receive_sound=True.""" + listener = _FakeBot(mumble=True) + listener.nick = "merlin" + listener._receive_sound = True + return listener + + +class _Msg: + def __init__(self, text="!deaf"): + self.text = text + self.nick = "Alice" + self.target = "0" + self.is_channel = True + self.prefix = "Alice" + + +# -- Tests ------------------------------------------------------------------- + + +class TestDeafCommand: + def test_deaf_targets_listener(self): + """!deaf toggles the listener bot (merlin), not the calling bot.""" + bot = _FakeBot(mumble=True) + listener = _make_listener() + bot.registry._bots = {"derp": bot, "merlin": listener} + listener._mumble.users.myself.get.return_value = False + msg = _Msg(text="!deaf") + asyncio.run(_mod.cmd_deaf(bot, msg)) + listener._mumble.users.myself.deafen.assert_called_once() + assert any("merlin" in r and "deafened" in r for r in bot.replied) + + def test_deaf_toggle_off(self): + bot = _FakeBot(mumble=True) + listener = _make_listener() + bot.registry._bots = {"derp": bot, "merlin": listener} + listener._mumble.users.myself.get.return_value = True + msg = _Msg(text="!deaf") + asyncio.run(_mod.cmd_deaf(bot, msg)) + listener._mumble.users.myself.undeafen.assert_called_once() + listener._mumble.users.myself.unmute.assert_called_once() + assert any("merlin" in r and "undeafened" in r for r in bot.replied) + + def test_deaf_non_mumble_silent(self): + bot = _FakeBot(mumble=False) + msg = _Msg(text="!deaf") + asyncio.run(_mod.cmd_deaf(bot, msg)) + assert bot.replied == [] + + def test_deaf_fallback_no_listener(self): + """Falls back to calling bot when no listener is registered.""" + bot = _FakeBot(mumble=True) + bot._mumble.users.myself.get.return_value = False + msg = _Msg(text="!deaf") + asyncio.run(_mod.cmd_deaf(bot, msg)) + bot._mumble.users.myself.deafen.assert_called_once() diff --git a/tests/test_music.py b/tests/test_music.py index d8ff378..825b4f7 100644 --- a/tests/test_music.py +++ b/tests/test_music.py @@ -35,6 +35,13 @@ class _FakeState: return list(self._store.get(ns, {}).keys()) +class _FakeRegistry: + """Minimal registry with shared voice timestamp.""" + + def __init__(self): + self._voice_ts: float = 0.0 + + class _FakeBot: """Minimal bot for music plugin testing.""" @@ -45,6 +52,7 @@ class _FakeBot: self.config: dict = {} self._pstate: dict = {} self._tasks: set[asyncio.Task] = set() + self.registry = _FakeRegistry() if mumble: self.stream_audio = AsyncMock() @@ -299,6 +307,19 @@ class TestNpCommand: asyncio.run(_mod.cmd_np(bot, msg)) assert any("Cool Song" in r for r in bot.replied) assert any("DJ" in r for r in bot.replied) + assert any("0:00" in r for r in bot.replied) + + def test_np_shows_elapsed(self): + bot = _FakeBot() + ps = _mod._ps(bot) + ps["current"] = _mod._Track( + url="x", title="Cool Song", requester="DJ", + ) + ps["cur_seek"] = 60.0 + ps["progress"] = [1500] # 1500 * 0.02 = 30s + msg = _Msg(text="!np") + asyncio.run(_mod.cmd_np(bot, msg)) + assert any("1:30" in r for r in bot.replied) # --------------------------------------------------------------------------- @@ -527,6 +548,21 @@ class TestPlaylistExpansion: assert tracks[0] == ("https://example.com/1", "First") assert tracks[1] == ("https://example.com/2", "Second") + def test_resolve_tracks_preserves_playlist_url(self): + """Video+playlist URL passes through to yt-dlp intact.""" + result = MagicMock() + result.stdout = ( + "https://youtube.com/watch?v=a\nFirst\n" + "https://youtube.com/watch?v=b\nSecond\n" + ) + url = "https://www.youtube.com/watch?v=a&list=PLxyz&index=1" + with patch("subprocess.run", return_value=result) as mock_run: + tracks = _mod._resolve_tracks(url) + # URL must reach yt-dlp with &list= intact + called_url = mock_run.call_args[0][0][-1] + assert "list=PLxyz" in called_url + assert len(tracks) == 2 + def test_resolve_tracks_error_fallback(self): """On error, returns [(url, url)].""" with patch("subprocess.run", side_effect=Exception("fail")): @@ -580,6 +616,56 @@ class TestResumeState: bot.state.set("music", "resume", '{"title": "x"}') assert _mod._load_resume(bot) is None + def test_save_strips_youtube_playlist_params(self): + """_save_resume strips &list= and other playlist params from YouTube URLs.""" + bot = _FakeBot() + track = _mod._Track( + url="https://www.youtube.com/watch?v=abc123&list=RDabc123&start_radio=1&pp=xyz", + title="Song", requester="Alice", + ) + _mod._save_resume(bot, track, 60.0) + data = _mod._load_resume(bot) + assert data is not None + assert data["url"] == "https://www.youtube.com/watch?v=abc123" + + def test_save_preserves_non_youtube_urls(self): + """_save_resume leaves non-YouTube URLs unchanged.""" + bot = _FakeBot() + track = _mod._Track( + url="https://soundcloud.com/artist/track?ref=playlist", + title="Song", requester="Alice", + ) + _mod._save_resume(bot, track, 30.0) + data = _mod._load_resume(bot) + assert data["url"] == "https://soundcloud.com/artist/track?ref=playlist" + + +# --------------------------------------------------------------------------- +# TestStripPlaylistParams +# --------------------------------------------------------------------------- + + +class TestStripPlaylistParams: + def test_strips_list_param(self): + url = "https://www.youtube.com/watch?v=abc&list=PLxyz&index=3" + assert _mod._strip_playlist_params(url) == "https://www.youtube.com/watch?v=abc" + + def test_strips_radio_params(self): + url = "https://www.youtube.com/watch?v=abc&list=RDabc&start_radio=1&pp=xyz" + assert _mod._strip_playlist_params(url) == "https://www.youtube.com/watch?v=abc" + + def test_preserves_plain_url(self): + url = "https://www.youtube.com/watch?v=abc123" + assert _mod._strip_playlist_params(url) == "https://www.youtube.com/watch?v=abc123" + + def test_non_youtube_unchanged(self): + url = "https://soundcloud.com/track?list=abc" + assert _mod._strip_playlist_params(url) == url + + def test_youtu_be_without_v_param(self): + url = "https://youtu.be/abc123" + assert _mod._strip_playlist_params(url) == url + # --------------------------------------------------------------------------- # TestResumeCommand @@ -740,7 +826,7 @@ class TestDuckMonitor: ps = _mod._ps(bot) ps["duck_enabled"] = True ps["duck_floor"] = 5 - bot._last_voice_ts = time.monotonic() + bot.registry._voice_ts = time.monotonic() async def _check(): task = asyncio.create_task(_mod._duck_monitor(bot)) @@ -760,7 +846,7 @@ class TestDuckMonitor: ps["duck_floor"] = 1 ps["duck_restore"] = 10 # 10s total restore ps["volume"] = 50 - bot._last_voice_ts = time.monotonic() - 100 + bot.registry._voice_ts = time.monotonic() - 100 ps["duck_vol"] = 1.0 # already ducked async def _check(): @@ -783,7 +869,7 @@ class TestDuckMonitor: ps["duck_floor"] = 1 ps["duck_restore"] = 1 # 1s restore -- completes quickly ps["volume"] = 50 - bot._last_voice_ts = time.monotonic() - 100 + bot.registry._voice_ts = time.monotonic() - 100 ps["duck_vol"] = 1.0 async def _check(): @@ -805,14 +891,14 @@ class TestDuckMonitor: ps["duck_floor"] = 5 ps["duck_restore"] = 30 ps["volume"] = 50 - bot._last_voice_ts = time.monotonic() - 100 + bot.registry._voice_ts = time.monotonic() - 100 ps["duck_vol"] = 30.0 # mid-restore async def _check(): task = asyncio.create_task(_mod._duck_monitor(bot)) await asyncio.sleep(0.5) # Simulate voice arriving now - bot._last_voice_ts = time.monotonic() + bot.registry._voice_ts = time.monotonic() await asyncio.sleep(1.5) assert ps["duck_vol"] == 5.0 # re-ducked to floor task.cancel() @@ -826,7 +912,7 @@ class TestDuckMonitor: bot = _FakeBot() ps = _mod._ps(bot) ps["duck_enabled"] = False - bot._last_voice_ts = time.monotonic() + bot.registry._voice_ts = time.monotonic() async def _check(): task = asyncio.create_task(_mod._duck_monitor(bot)) @@ -850,7 +936,7 @@ class TestAutoResume: """Auto-resume loads saved state when channel is silent.""" bot = _FakeBot() bot._connect_count = 2 - bot._last_voice_ts = 0.0 + bot.registry._voice_ts = 0.0 track = _mod._Track(url="https://example.com/a", title="Song", requester="Alice") _mod._save_resume(bot, track, 120.0) @@ -878,7 +964,7 @@ class TestAutoResume: def test_no_resume_if_no_state(self): """Auto-resume returns early when nothing is saved.""" bot = _FakeBot() - bot._last_voice_ts = 0.0 + bot.registry._voice_ts = 0.0 with patch.object(_mod, "_ensure_loop") as mock_loop: asyncio.run(_mod._auto_resume(bot)) mock_loop.assert_not_called() @@ -887,7 +973,7 @@ class TestAutoResume: """Auto-resume aborts if voice never goes silent within deadline.""" bot = _FakeBot() now = time.monotonic() - bot._last_voice_ts = now + bot.registry._voice_ts = now ps = _mod._ps(bot) ps["duck_silence"] = 15 track = _mod._Track(url="https://example.com/a", title="Song", requester="Alice") @@ -903,7 +989,7 @@ class TestAutoResume: async def _fast_sleep(s): mono_val[0] += s - bot._last_voice_ts = mono_val[0] + bot.registry._voice_ts = mono_val[0] await _real_sleep(0) with patch.object(time, "monotonic", side_effect=_fast_mono): @@ -918,6 +1004,9 @@ class TestAutoResume: """Watcher detects connect_count increment and calls _auto_resume.""" bot = _FakeBot() bot._connect_count = 1 + # Resume state must exist for watcher to call _auto_resume + track = _mod._Track(url="https://example.com/a", title="Song", requester="Alice") + _mod._save_resume(bot, track, 60.0) async def _check(): with patch.object(_mod, "_auto_resume", new_callable=AsyncMock) as mock_ar: @@ -1014,6 +1103,111 @@ class TestAutoResume: assert spawned.count("music-reconnect-watcher") == 1 +# --------------------------------------------------------------------------- +# TestAutoplayKept +# --------------------------------------------------------------------------- + + +class TestAutoplayKept: + def test_shuffles_kept_tracks(self, tmp_path): + """Autoplay loads kept tracks, shuffles, and starts playback.""" + bot = _FakeBot() + bot.registry._voice_ts = 0.0 + music_dir = tmp_path / "music" + music_dir.mkdir() + # Create two kept files + (music_dir / "a.opus").write_bytes(b"audio") + (music_dir / "b.opus").write_bytes(b"audio") + bot.state.set("music", "keep:1", json.dumps({ + "url": "https://example.com/a", "title": "Track A", + "filename": "a.opus", "id": 1, + })) + bot.state.set("music", "keep:2", json.dumps({ + "url": "https://example.com/b", "title": "Track B", + "filename": "b.opus", "id": 2, + })) + with patch.object(_mod, "_MUSIC_DIR", music_dir), \ + patch.object(_mod, "_ensure_loop") as mock_loop: + asyncio.run(_mod._autoplay_kept(bot)) + mock_loop.assert_called_once_with(bot) + ps = _mod._ps(bot) + assert len(ps["queue"]) == 2 + titles = {t.title for t in ps["queue"]} + assert titles == {"Track A", "Track B"} + # All tracks marked keep=True + assert all(t.keep for t in ps["queue"]) + + def test_skips_when_already_playing(self): + bot = _FakeBot() + ps = _mod._ps(bot) + ps["current"] = _mod._Track(url="x", title="Playing", requester="a") + with patch.object(_mod, "_ensure_loop") as mock_loop: + asyncio.run(_mod._autoplay_kept(bot)) + mock_loop.assert_not_called() + + def test_skips_when_no_kept_tracks(self): + bot = _FakeBot() + bot.registry._voice_ts = 0.0 + with patch.object(_mod, "_ensure_loop") as mock_loop: + asyncio.run(_mod._autoplay_kept(bot)) + mock_loop.assert_not_called() + + def test_load_kept_tracks_skips_missing_files(self, tmp_path): + """Tracks with missing local files are excluded.""" + bot = _FakeBot() + music_dir = tmp_path / "music" + music_dir.mkdir() + bot.state.set("music", "keep:1", json.dumps({ + "url": "https://example.com/a", "title": "Gone", + "filename": "missing.opus", "id": 1, + })) + with patch.object(_mod, "_MUSIC_DIR", music_dir): + tracks = _mod._load_kept_tracks(bot) + assert tracks == [] + + def test_watcher_autoplay_on_boot_no_resume(self): + """Watcher triggers autoplay on boot when no resume state exists.""" + bot = _FakeBot() + bot._connect_count = 0 + + async def _check(): + with patch.object(_mod, "_autoplay_kept", + new_callable=AsyncMock) as mock_ap: + task = asyncio.create_task(_mod._reconnect_watcher(bot)) + await asyncio.sleep(0.5) + bot._connect_count = 1 + await asyncio.sleep(3) + mock_ap.assert_called_once_with(bot) + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + asyncio.run(_check()) + + def test_watcher_autoplay_on_reconnect_no_resume(self): + """Watcher triggers autoplay on reconnect when no resume state.""" + bot = _FakeBot() + bot._connect_count = 1 + + async def _check(): + with patch.object(_mod, "_autoplay_kept", + new_callable=AsyncMock) as mock_ap: + task = asyncio.create_task(_mod._reconnect_watcher(bot)) + await asyncio.sleep(0.5) + bot._connect_count = 2 + await asyncio.sleep(3) + mock_ap.assert_called_once_with(bot) + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + asyncio.run(_check()) + + # --------------------------------------------------------------------------- # TestDownloadTrack # --------------------------------------------------------------------------- @@ -1143,6 +1337,49 @@ class TestKeepCommand: assert track.keep is True assert any("Keeping" in r for r in bot.replied) + def test_keep_duplicate_blocked(self, tmp_path): + bot = _FakeBot() + ps = _mod._ps(bot) + f = tmp_path / "abc123.opus" + f.write_bytes(b"audio") + track = _mod._Track( + url="https://example.com/v", title="t", requester="a", + local_path=f, + ) + ps["current"] = track + # Pre-existing kept entry with same URL + bot.state.set("music", "keep:1", json.dumps({ + "url": "https://example.com/v", "id": 1, + })) + bot.state.set("music", "keep_next_id", "2") + msg = _Msg(text="!keep") + asyncio.run(_mod.cmd_keep(bot, msg)) + assert any("Already kept" in r for r in bot.replied) + assert any("#1" in r for r in bot.replied) + # ID counter should not have incremented + assert bot.state.get("music", "keep_next_id") == "2" + + def test_keep_duplicate_with_playlist_params(self, tmp_path): + bot = _FakeBot() + ps = _mod._ps(bot) + f = tmp_path / "abc123.opus" + f.write_bytes(b"audio") + # Track URL has playlist cruft + track = _mod._Track( + url="https://www.youtube.com/watch?v=abc&list=RDabc&start_radio=1", + title="t", requester="a", local_path=f, + ) + ps["current"] = track + # Existing entry stored with clean URL + bot.state.set("music", "keep:1", json.dumps({ + "url": "https://www.youtube.com/watch?v=abc", "id": 1, + })) + bot.state.set("music", "keep_next_id", "2") + msg = _Msg(text="!keep") + asyncio.run(_mod.cmd_keep(bot, msg)) + assert any("Already kept" in r for r in bot.replied) + assert bot.state.get("music", "keep_next_id") == "2" + def test_keep_non_mumble(self): bot = _FakeBot(mumble=False) msg = _Msg(text="!keep") @@ -1267,50 +1504,43 @@ class TestSeekCommand: def test_seek_absolute(self): bot = _FakeBot() ps = _mod._ps(bot) - track = _mod._Track(url="x", title="Song", requester="a") - ps["current"] = track - mock_task = MagicMock() - mock_task.done.return_value = False - ps["task"] = mock_task + ps["current"] = _mod._Track(url="x", title="Song", requester="a") + ps["seek_req"] = [None] + ps["progress"] = [100] msg = _Msg(text="!seek 1:30") - with patch.object(_mod, "_ensure_loop") as mock_loop: - asyncio.run(_mod.cmd_seek(bot, msg)) - mock_loop.assert_called_once_with(bot, seek=90.0) - assert ps["queue"][0] is track + asyncio.run(_mod.cmd_seek(bot, msg)) + assert ps["seek_req"][0] == 90.0 + assert ps["cur_seek"] == 90.0 + assert ps["progress"][0] == 0 assert any("1:30" in r for r in bot.replied) - mock_task.cancel.assert_called_once() def test_seek_relative_forward(self): bot = _FakeBot() ps = _mod._ps(bot) - track = _mod._Track(url="x", title="Song", requester="a") - ps["current"] = track + ps["current"] = _mod._Track(url="x", title="Song", requester="a") + ps["seek_req"] = [None] ps["progress"] = [1500] # 1500 * 0.02 = 30s ps["cur_seek"] = 60.0 # started at 60s - mock_task = MagicMock() - mock_task.done.return_value = False - ps["task"] = mock_task msg = _Msg(text="!seek +30") - with patch.object(_mod, "_ensure_loop") as mock_loop: - asyncio.run(_mod.cmd_seek(bot, msg)) - # elapsed = 60 + 30 = 90, target = 90 + 30 = 120 - mock_loop.assert_called_once_with(bot, seek=120.0) + asyncio.run(_mod.cmd_seek(bot, msg)) + # elapsed = 60 + 30 = 90, target = 90 + 30 = 120 + assert ps["seek_req"][0] == 120.0 + assert ps["cur_seek"] == 120.0 + assert ps["progress"][0] == 0 def test_seek_relative_backward_clamps(self): bot = _FakeBot() ps = _mod._ps(bot) - track = _mod._Track(url="x", title="Song", requester="a") - ps["current"] = track + ps["current"] = _mod._Track(url="x", title="Song", requester="a") + ps["seek_req"] = [None] ps["progress"] = [500] # 500 * 0.02 = 10s ps["cur_seek"] = 0.0 - mock_task = MagicMock() - mock_task.done.return_value = False - ps["task"] = mock_task msg = _Msg(text="!seek -30") - with patch.object(_mod, "_ensure_loop") as mock_loop: - asyncio.run(_mod.cmd_seek(bot, msg)) - # elapsed = 0 + 10 = 10, target = 10 - 30 = -20, clamped to 0 - mock_loop.assert_called_once_with(bot, seek=0.0) + asyncio.run(_mod.cmd_seek(bot, msg)) + # elapsed = 0 + 10 = 10, target = 10 - 30 = -20, clamped to 0 + assert ps["seek_req"][0] == 0.0 + assert ps["cur_seek"] == 0.0 + assert ps["progress"][0] == 0 # --------------------------------------------------------------------------- diff --git a/tests/test_plugin.py b/tests/test_plugin.py index b8e6ad0..82674bc 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -36,6 +36,20 @@ class TestDecorators: assert handler._derp_event == "PRIVMSG" + def test_command_decorator_aliases(self): + @command("skip", help="skip track", aliases=["next", "s"]) + async def handler(bot, msg): + pass + + assert handler._derp_aliases == ["next", "s"] + + def test_command_decorator_aliases_default(self): + @command("ping", help="ping") + async def handler(bot, msg): + pass + + assert handler._derp_aliases == [] + def test_command_decorator_admin(self): @command("secret", help="admin only", admin=True) async def handler(bot, msg): @@ -208,6 +222,46 @@ class TestRegistry: assert registry.commands["secret"].admin is True assert registry.commands["public"].admin is False + def test_load_plugin_aliases(self, tmp_path: Path): + plugin_file = tmp_path / "aliased.py" + plugin_file.write_text(textwrap.dedent("""\ + from derp.plugin import command + + @command("skip", help="Skip track", aliases=["next", "s"]) + async def cmd_skip(bot, msg): + pass + """)) + + registry = PluginRegistry() + count = registry.load_plugin(plugin_file) + assert count == 3 # primary + 2 aliases + assert "skip" in registry.commands + assert "next" in registry.commands + assert "s" in registry.commands + # Aliases point to the same callback + assert registry.commands["next"].callback is registry.commands["skip"].callback + assert registry.commands["s"].callback is registry.commands["skip"].callback + # Alias help text references the primary command + assert registry.commands["next"].help == "alias for !skip" + + def test_unload_removes_aliases(self, tmp_path: Path): + plugin_file = tmp_path / "aliased.py" + plugin_file.write_text(textwrap.dedent("""\ + from derp.plugin import command + + @command("skip", help="Skip track", aliases=["next"]) + async def cmd_skip(bot, msg): + pass + """)) + + registry = PluginRegistry() + registry.load_plugin(plugin_file) + assert "next" in registry.commands + + registry.unload_plugin("aliased") + assert "skip" not in registry.commands + assert "next" not in registry.commands + def test_load_plugin_stores_path(self, tmp_path: Path): plugin_file = tmp_path / "pathed.py" plugin_file.write_text(textwrap.dedent("""\ @@ -677,6 +731,71 @@ class TestChannelFilter: assert bot._plugin_allowed("encode", "&local") is False +class TestAliasDispatch: + """Test alias fallback in _dispatch_command.""" + + @staticmethod + def _make_bot_with_alias(alias_name: str, target_cmd: str) -> tuple[Bot, list]: + """Create a Bot with a command and an alias pointing to it.""" + config = { + "server": {"host": "localhost", "port": 6667, "tls": False, + "nick": "test", "user": "test", "realname": "test"}, + "bot": {"prefix": "!", "channels": [], "plugins_dir": "plugins"}, + } + registry = PluginRegistry() + called = [] + + async def _handler(bot, msg): + called.append(msg.text) + + registry.register_command(target_cmd, _handler, plugin="test") + bot = Bot("test", config, registry) + bot.conn = _FakeConnection() + bot.state.set("alias", alias_name, target_cmd) + return bot, called + + def test_alias_resolves_command(self): + """An alias triggers the target command handler.""" + bot, called = self._make_bot_with_alias("s", "skip") + msg = Message(raw="", prefix="nick!u@h", nick="nick", + command="PRIVMSG", params=["#ch", "!s"], tags={}) + + async def _run(): + bot._dispatch_command(msg) + await asyncio.sleep(0.05) # let spawned task run + + asyncio.run(_run()) + assert len(called) == 1 + + def test_alias_ignored_when_command_exists(self): + """Direct command match takes priority over alias.""" + bot, called = self._make_bot_with_alias("skip", "stop") + # "skip" is both a real command and an alias to "stop"; real wins + msg = Message(raw="", prefix="nick!u@h", nick="nick", + command="PRIVMSG", params=["#ch", "!skip"], tags={}) + + async def _run(): + bot._dispatch_command(msg) + await asyncio.sleep(0.05) + + asyncio.run(_run()) + assert len(called) == 1 + # Handler was the "skip" handler, not "stop" + + def test_no_alias_no_crash(self): + """Unknown command with no alias silently returns.""" + config = { + "server": {"host": "localhost", "port": 6667, "tls": False, + "nick": "test", "user": "test", "realname": "test"}, + "bot": {"prefix": "!", "channels": [], "plugins_dir": "plugins"}, + } + bot = Bot("test", config, PluginRegistry()) + bot.conn = _FakeConnection() + msg = Message(raw="", prefix="nick!u@h", nick="nick", + command="PRIVMSG", params=["#ch", "!nonexistent"], tags={}) + bot._dispatch_command(msg) # should not raise + + class TestSplitUtf8: """Test UTF-8 safe message splitting."""