feat: voice profiles, rubberband FX, per-bot plugin filtering
- Add rubberband package to container for pitch-shifting FX - Split FX chain: rubberband CLI for pitch, ffmpeg for filters - Configurable voice profile (voice, fx, piper params) in [voice] - Extra bots inherit voice config (minus trigger) for own TTS - Greeting is voice-only, spoken directly by the greeting bot - Per-bot only_plugins/except_plugins filtering on Mumble - Alias plugin, core plugin tests Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
212
tests/test_alias.py
Normal file
212
tests/test_alias.py
Normal file
@@ -0,0 +1,212 @@
|
||||
"""Tests for the alias plugin."""
|
||||
|
||||
import asyncio
|
||||
import importlib.util
|
||||
import sys
|
||||
|
||||
from derp.plugin import PluginRegistry
|
||||
|
||||
# -- Load plugin module directly ---------------------------------------------
|
||||
|
||||
_spec = importlib.util.spec_from_file_location("alias", "plugins/alias.py")
|
||||
_mod = importlib.util.module_from_spec(_spec)
|
||||
sys.modules["alias"] = _mod
|
||||
_spec.loader.exec_module(_mod)
|
||||
|
||||
|
||||
# -- Fakes -------------------------------------------------------------------
|
||||
|
||||
|
||||
class _FakeState:
|
||||
def __init__(self):
|
||||
self._store: dict[str, dict[str, str]] = {}
|
||||
|
||||
def get(self, ns: str, key: str) -> str | None:
|
||||
return self._store.get(ns, {}).get(key)
|
||||
|
||||
def set(self, ns: str, key: str, value: str) -> None:
|
||||
self._store.setdefault(ns, {})[key] = value
|
||||
|
||||
def delete(self, ns: str, key: str) -> bool:
|
||||
if ns in self._store and key in self._store[ns]:
|
||||
del self._store[ns][key]
|
||||
return True
|
||||
return False
|
||||
|
||||
def keys(self, ns: str) -> list[str]:
|
||||
return list(self._store.get(ns, {}).keys())
|
||||
|
||||
def clear(self, ns: str) -> int:
|
||||
count = len(self._store.get(ns, {}))
|
||||
self._store.pop(ns, None)
|
||||
return count
|
||||
|
||||
|
||||
class _FakeBot:
|
||||
def __init__(self, *, admin: bool = False):
|
||||
self.replied: list[str] = []
|
||||
self.state = _FakeState()
|
||||
self.registry = PluginRegistry()
|
||||
self._admin = admin
|
||||
|
||||
async def reply(self, message, text: str) -> None:
|
||||
self.replied.append(text)
|
||||
|
||||
def _is_admin(self, message) -> bool:
|
||||
return self._admin
|
||||
|
||||
|
||||
class _Msg:
|
||||
def __init__(self, text="!alias"):
|
||||
self.text = text
|
||||
self.nick = "Alice"
|
||||
self.target = "#test"
|
||||
self.is_channel = True
|
||||
self.prefix = "Alice!~alice@host"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestAliasAdd
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestAliasAdd:
|
||||
def test_add_creates_alias(self):
|
||||
bot = _FakeBot()
|
||||
# Register a target command
|
||||
async def _noop(b, m): pass
|
||||
bot.registry.register_command("skip", _noop, plugin="music")
|
||||
msg = _Msg(text="!alias add s skip")
|
||||
asyncio.run(_mod.cmd_alias(bot, msg))
|
||||
assert bot.state.get("alias", "s") == "skip"
|
||||
assert any("s -> skip" in r for r in bot.replied)
|
||||
|
||||
def test_add_rejects_existing_command(self):
|
||||
bot = _FakeBot()
|
||||
async def _noop(b, m): pass
|
||||
bot.registry.register_command("skip", _noop, plugin="music")
|
||||
msg = _Msg(text="!alias add skip stop")
|
||||
asyncio.run(_mod.cmd_alias(bot, msg))
|
||||
assert any("already a registered command" in r for r in bot.replied)
|
||||
assert bot.state.get("alias", "skip") is None
|
||||
|
||||
def test_add_rejects_chaining(self):
|
||||
bot = _FakeBot()
|
||||
async def _noop(b, m): pass
|
||||
bot.registry.register_command("skip", _noop, plugin="music")
|
||||
bot.state.set("alias", "sk", "skip")
|
||||
msg = _Msg(text="!alias add x sk")
|
||||
asyncio.run(_mod.cmd_alias(bot, msg))
|
||||
assert any("no chaining" in r for r in bot.replied)
|
||||
|
||||
def test_add_rejects_unknown_target(self):
|
||||
bot = _FakeBot()
|
||||
msg = _Msg(text="!alias add s nonexistent")
|
||||
asyncio.run(_mod.cmd_alias(bot, msg))
|
||||
assert any("unknown command" in r for r in bot.replied)
|
||||
|
||||
def test_add_lowercases_name(self):
|
||||
bot = _FakeBot()
|
||||
async def _noop(b, m): pass
|
||||
bot.registry.register_command("skip", _noop, plugin="music")
|
||||
msg = _Msg(text="!alias add S skip")
|
||||
asyncio.run(_mod.cmd_alias(bot, msg))
|
||||
assert bot.state.get("alias", "s") == "skip"
|
||||
|
||||
def test_add_missing_args(self):
|
||||
bot = _FakeBot()
|
||||
msg = _Msg(text="!alias add s")
|
||||
asyncio.run(_mod.cmd_alias(bot, msg))
|
||||
assert any("Usage" in r for r in bot.replied)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestAliasDel
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestAliasDel:
|
||||
def test_del_removes_alias(self):
|
||||
bot = _FakeBot()
|
||||
bot.state.set("alias", "s", "skip")
|
||||
msg = _Msg(text="!alias del s")
|
||||
asyncio.run(_mod.cmd_alias(bot, msg))
|
||||
assert bot.state.get("alias", "s") is None
|
||||
assert any("removed" in r for r in bot.replied)
|
||||
|
||||
def test_del_nonexistent(self):
|
||||
bot = _FakeBot()
|
||||
msg = _Msg(text="!alias del x")
|
||||
asyncio.run(_mod.cmd_alias(bot, msg))
|
||||
assert any("no alias" in r for r in bot.replied)
|
||||
|
||||
def test_del_missing_name(self):
|
||||
bot = _FakeBot()
|
||||
msg = _Msg(text="!alias del")
|
||||
asyncio.run(_mod.cmd_alias(bot, msg))
|
||||
assert any("Usage" in r for r in bot.replied)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestAliasList
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestAliasList:
|
||||
def test_list_empty(self):
|
||||
bot = _FakeBot()
|
||||
msg = _Msg(text="!alias list")
|
||||
asyncio.run(_mod.cmd_alias(bot, msg))
|
||||
assert any("No aliases" in r for r in bot.replied)
|
||||
|
||||
def test_list_shows_entries(self):
|
||||
bot = _FakeBot()
|
||||
bot.state.set("alias", "s", "skip")
|
||||
bot.state.set("alias", "np", "nowplaying")
|
||||
msg = _Msg(text="!alias list")
|
||||
asyncio.run(_mod.cmd_alias(bot, msg))
|
||||
assert any("s -> skip" in r for r in bot.replied)
|
||||
assert any("np -> nowplaying" in r for r in bot.replied)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestAliasClear
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestAliasClear:
|
||||
def test_clear_as_admin(self):
|
||||
bot = _FakeBot(admin=True)
|
||||
bot.state.set("alias", "s", "skip")
|
||||
bot.state.set("alias", "np", "nowplaying")
|
||||
msg = _Msg(text="!alias clear")
|
||||
asyncio.run(_mod.cmd_alias(bot, msg))
|
||||
assert any("Cleared 2" in r for r in bot.replied)
|
||||
assert bot.state.keys("alias") == []
|
||||
|
||||
def test_clear_denied_non_admin(self):
|
||||
bot = _FakeBot(admin=False)
|
||||
bot.state.set("alias", "s", "skip")
|
||||
msg = _Msg(text="!alias clear")
|
||||
asyncio.run(_mod.cmd_alias(bot, msg))
|
||||
assert any("Permission denied" in r for r in bot.replied)
|
||||
assert bot.state.get("alias", "s") == "skip"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestAliasUsage
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestAliasUsage:
|
||||
def test_no_subcommand(self):
|
||||
bot = _FakeBot()
|
||||
msg = _Msg(text="!alias")
|
||||
asyncio.run(_mod.cmd_alias(bot, msg))
|
||||
assert any("Usage" in r for r in bot.replied)
|
||||
|
||||
def test_unknown_subcommand(self):
|
||||
bot = _FakeBot()
|
||||
msg = _Msg(text="!alias foo")
|
||||
asyncio.run(_mod.cmd_alias(bot, msg))
|
||||
assert any("Usage" in r for r in bot.replied)
|
||||
92
tests/test_core.py
Normal file
92
tests/test_core.py
Normal file
@@ -0,0 +1,92 @@
|
||||
"""Tests for the core plugin."""
|
||||
|
||||
import asyncio
|
||||
import importlib.util
|
||||
import sys
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
# -- Load plugin module directly ---------------------------------------------
|
||||
|
||||
_spec = importlib.util.spec_from_file_location("core", "plugins/core.py")
|
||||
_mod = importlib.util.module_from_spec(_spec)
|
||||
sys.modules["core"] = _mod
|
||||
_spec.loader.exec_module(_mod)
|
||||
|
||||
|
||||
# -- Fakes -------------------------------------------------------------------
|
||||
|
||||
|
||||
class _FakeRegistry:
|
||||
def __init__(self):
|
||||
self._bots: dict = {}
|
||||
|
||||
|
||||
class _FakeBot:
|
||||
def __init__(self, *, mumble: bool = False):
|
||||
self.replied: list[str] = []
|
||||
self.registry = _FakeRegistry()
|
||||
self.nick = "derp"
|
||||
self._receive_sound = False
|
||||
if mumble:
|
||||
self._mumble = MagicMock()
|
||||
|
||||
async def reply(self, message, text: str) -> None:
|
||||
self.replied.append(text)
|
||||
|
||||
|
||||
def _make_listener():
|
||||
"""Create a fake listener bot (merlin) with _receive_sound=True."""
|
||||
listener = _FakeBot(mumble=True)
|
||||
listener.nick = "merlin"
|
||||
listener._receive_sound = True
|
||||
return listener
|
||||
|
||||
|
||||
class _Msg:
|
||||
def __init__(self, text="!deaf"):
|
||||
self.text = text
|
||||
self.nick = "Alice"
|
||||
self.target = "0"
|
||||
self.is_channel = True
|
||||
self.prefix = "Alice"
|
||||
|
||||
|
||||
# -- Tests -------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestDeafCommand:
|
||||
def test_deaf_targets_listener(self):
|
||||
"""!deaf toggles the listener bot (merlin), not the calling bot."""
|
||||
bot = _FakeBot(mumble=True)
|
||||
listener = _make_listener()
|
||||
bot.registry._bots = {"derp": bot, "merlin": listener}
|
||||
listener._mumble.users.myself.get.return_value = False
|
||||
msg = _Msg(text="!deaf")
|
||||
asyncio.run(_mod.cmd_deaf(bot, msg))
|
||||
listener._mumble.users.myself.deafen.assert_called_once()
|
||||
assert any("merlin" in r and "deafened" in r for r in bot.replied)
|
||||
|
||||
def test_deaf_toggle_off(self):
|
||||
bot = _FakeBot(mumble=True)
|
||||
listener = _make_listener()
|
||||
bot.registry._bots = {"derp": bot, "merlin": listener}
|
||||
listener._mumble.users.myself.get.return_value = True
|
||||
msg = _Msg(text="!deaf")
|
||||
asyncio.run(_mod.cmd_deaf(bot, msg))
|
||||
listener._mumble.users.myself.undeafen.assert_called_once()
|
||||
listener._mumble.users.myself.unmute.assert_called_once()
|
||||
assert any("merlin" in r and "undeafened" in r for r in bot.replied)
|
||||
|
||||
def test_deaf_non_mumble_silent(self):
|
||||
bot = _FakeBot(mumble=False)
|
||||
msg = _Msg(text="!deaf")
|
||||
asyncio.run(_mod.cmd_deaf(bot, msg))
|
||||
assert bot.replied == []
|
||||
|
||||
def test_deaf_fallback_no_listener(self):
|
||||
"""Falls back to calling bot when no listener is registered."""
|
||||
bot = _FakeBot(mumble=True)
|
||||
bot._mumble.users.myself.get.return_value = False
|
||||
msg = _Msg(text="!deaf")
|
||||
asyncio.run(_mod.cmd_deaf(bot, msg))
|
||||
bot._mumble.users.myself.deafen.assert_called_once()
|
||||
@@ -35,6 +35,13 @@ class _FakeState:
|
||||
return list(self._store.get(ns, {}).keys())
|
||||
|
||||
|
||||
class _FakeRegistry:
|
||||
"""Minimal registry with shared voice timestamp."""
|
||||
|
||||
def __init__(self):
|
||||
self._voice_ts: float = 0.0
|
||||
|
||||
|
||||
class _FakeBot:
|
||||
"""Minimal bot for music plugin testing."""
|
||||
|
||||
@@ -45,6 +52,7 @@ class _FakeBot:
|
||||
self.config: dict = {}
|
||||
self._pstate: dict = {}
|
||||
self._tasks: set[asyncio.Task] = set()
|
||||
self.registry = _FakeRegistry()
|
||||
if mumble:
|
||||
self.stream_audio = AsyncMock()
|
||||
|
||||
@@ -299,6 +307,19 @@ class TestNpCommand:
|
||||
asyncio.run(_mod.cmd_np(bot, msg))
|
||||
assert any("Cool Song" in r for r in bot.replied)
|
||||
assert any("DJ" in r for r in bot.replied)
|
||||
assert any("0:00" in r for r in bot.replied)
|
||||
|
||||
def test_np_shows_elapsed(self):
|
||||
bot = _FakeBot()
|
||||
ps = _mod._ps(bot)
|
||||
ps["current"] = _mod._Track(
|
||||
url="x", title="Cool Song", requester="DJ",
|
||||
)
|
||||
ps["cur_seek"] = 60.0
|
||||
ps["progress"] = [1500] # 1500 * 0.02 = 30s
|
||||
msg = _Msg(text="!np")
|
||||
asyncio.run(_mod.cmd_np(bot, msg))
|
||||
assert any("1:30" in r for r in bot.replied)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -527,6 +548,21 @@ class TestPlaylistExpansion:
|
||||
assert tracks[0] == ("https://example.com/1", "First")
|
||||
assert tracks[1] == ("https://example.com/2", "Second")
|
||||
|
||||
def test_resolve_tracks_preserves_playlist_url(self):
|
||||
"""Video+playlist URL passes through to yt-dlp intact."""
|
||||
result = MagicMock()
|
||||
result.stdout = (
|
||||
"https://youtube.com/watch?v=a\nFirst\n"
|
||||
"https://youtube.com/watch?v=b\nSecond\n"
|
||||
)
|
||||
url = "https://www.youtube.com/watch?v=a&list=PLxyz&index=1"
|
||||
with patch("subprocess.run", return_value=result) as mock_run:
|
||||
tracks = _mod._resolve_tracks(url)
|
||||
# URL must reach yt-dlp with &list= intact
|
||||
called_url = mock_run.call_args[0][0][-1]
|
||||
assert "list=PLxyz" in called_url
|
||||
assert len(tracks) == 2
|
||||
|
||||
def test_resolve_tracks_error_fallback(self):
|
||||
"""On error, returns [(url, url)]."""
|
||||
with patch("subprocess.run", side_effect=Exception("fail")):
|
||||
@@ -580,6 +616,56 @@ class TestResumeState:
|
||||
bot.state.set("music", "resume", '{"title": "x"}')
|
||||
assert _mod._load_resume(bot) is None
|
||||
|
||||
def test_save_strips_youtube_playlist_params(self):
|
||||
"""_save_resume strips &list= and other playlist params from YouTube URLs."""
|
||||
bot = _FakeBot()
|
||||
track = _mod._Track(
|
||||
url="https://www.youtube.com/watch?v=abc123&list=RDabc123&start_radio=1&pp=xyz",
|
||||
title="Song", requester="Alice",
|
||||
)
|
||||
_mod._save_resume(bot, track, 60.0)
|
||||
data = _mod._load_resume(bot)
|
||||
assert data is not None
|
||||
assert data["url"] == "https://www.youtube.com/watch?v=abc123"
|
||||
|
||||
def test_save_preserves_non_youtube_urls(self):
|
||||
"""_save_resume leaves non-YouTube URLs unchanged."""
|
||||
bot = _FakeBot()
|
||||
track = _mod._Track(
|
||||
url="https://soundcloud.com/artist/track?ref=playlist",
|
||||
title="Song", requester="Alice",
|
||||
)
|
||||
_mod._save_resume(bot, track, 30.0)
|
||||
data = _mod._load_resume(bot)
|
||||
assert data["url"] == "https://soundcloud.com/artist/track?ref=playlist"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestStripPlaylistParams
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestStripPlaylistParams:
|
||||
def test_strips_list_param(self):
|
||||
url = "https://www.youtube.com/watch?v=abc&list=PLxyz&index=3"
|
||||
assert _mod._strip_playlist_params(url) == "https://www.youtube.com/watch?v=abc"
|
||||
|
||||
def test_strips_radio_params(self):
|
||||
url = "https://www.youtube.com/watch?v=abc&list=RDabc&start_radio=1&pp=xyz"
|
||||
assert _mod._strip_playlist_params(url) == "https://www.youtube.com/watch?v=abc"
|
||||
|
||||
def test_preserves_plain_url(self):
|
||||
url = "https://www.youtube.com/watch?v=abc123"
|
||||
assert _mod._strip_playlist_params(url) == "https://www.youtube.com/watch?v=abc123"
|
||||
|
||||
def test_non_youtube_unchanged(self):
|
||||
url = "https://soundcloud.com/track?list=abc"
|
||||
assert _mod._strip_playlist_params(url) == url
|
||||
|
||||
def test_youtu_be_without_v_param(self):
|
||||
url = "https://youtu.be/abc123"
|
||||
assert _mod._strip_playlist_params(url) == url
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestResumeCommand
|
||||
@@ -740,7 +826,7 @@ class TestDuckMonitor:
|
||||
ps = _mod._ps(bot)
|
||||
ps["duck_enabled"] = True
|
||||
ps["duck_floor"] = 5
|
||||
bot._last_voice_ts = time.monotonic()
|
||||
bot.registry._voice_ts = time.monotonic()
|
||||
|
||||
async def _check():
|
||||
task = asyncio.create_task(_mod._duck_monitor(bot))
|
||||
@@ -760,7 +846,7 @@ class TestDuckMonitor:
|
||||
ps["duck_floor"] = 1
|
||||
ps["duck_restore"] = 10 # 10s total restore
|
||||
ps["volume"] = 50
|
||||
bot._last_voice_ts = time.monotonic() - 100
|
||||
bot.registry._voice_ts = time.monotonic() - 100
|
||||
ps["duck_vol"] = 1.0 # already ducked
|
||||
|
||||
async def _check():
|
||||
@@ -783,7 +869,7 @@ class TestDuckMonitor:
|
||||
ps["duck_floor"] = 1
|
||||
ps["duck_restore"] = 1 # 1s restore -- completes quickly
|
||||
ps["volume"] = 50
|
||||
bot._last_voice_ts = time.monotonic() - 100
|
||||
bot.registry._voice_ts = time.monotonic() - 100
|
||||
ps["duck_vol"] = 1.0
|
||||
|
||||
async def _check():
|
||||
@@ -805,14 +891,14 @@ class TestDuckMonitor:
|
||||
ps["duck_floor"] = 5
|
||||
ps["duck_restore"] = 30
|
||||
ps["volume"] = 50
|
||||
bot._last_voice_ts = time.monotonic() - 100
|
||||
bot.registry._voice_ts = time.monotonic() - 100
|
||||
ps["duck_vol"] = 30.0 # mid-restore
|
||||
|
||||
async def _check():
|
||||
task = asyncio.create_task(_mod._duck_monitor(bot))
|
||||
await asyncio.sleep(0.5)
|
||||
# Simulate voice arriving now
|
||||
bot._last_voice_ts = time.monotonic()
|
||||
bot.registry._voice_ts = time.monotonic()
|
||||
await asyncio.sleep(1.5)
|
||||
assert ps["duck_vol"] == 5.0 # re-ducked to floor
|
||||
task.cancel()
|
||||
@@ -826,7 +912,7 @@ class TestDuckMonitor:
|
||||
bot = _FakeBot()
|
||||
ps = _mod._ps(bot)
|
||||
ps["duck_enabled"] = False
|
||||
bot._last_voice_ts = time.monotonic()
|
||||
bot.registry._voice_ts = time.monotonic()
|
||||
|
||||
async def _check():
|
||||
task = asyncio.create_task(_mod._duck_monitor(bot))
|
||||
@@ -850,7 +936,7 @@ class TestAutoResume:
|
||||
"""Auto-resume loads saved state when channel is silent."""
|
||||
bot = _FakeBot()
|
||||
bot._connect_count = 2
|
||||
bot._last_voice_ts = 0.0
|
||||
bot.registry._voice_ts = 0.0
|
||||
track = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
|
||||
_mod._save_resume(bot, track, 120.0)
|
||||
|
||||
@@ -878,7 +964,7 @@ class TestAutoResume:
|
||||
def test_no_resume_if_no_state(self):
|
||||
"""Auto-resume returns early when nothing is saved."""
|
||||
bot = _FakeBot()
|
||||
bot._last_voice_ts = 0.0
|
||||
bot.registry._voice_ts = 0.0
|
||||
with patch.object(_mod, "_ensure_loop") as mock_loop:
|
||||
asyncio.run(_mod._auto_resume(bot))
|
||||
mock_loop.assert_not_called()
|
||||
@@ -887,7 +973,7 @@ class TestAutoResume:
|
||||
"""Auto-resume aborts if voice never goes silent within deadline."""
|
||||
bot = _FakeBot()
|
||||
now = time.monotonic()
|
||||
bot._last_voice_ts = now
|
||||
bot.registry._voice_ts = now
|
||||
ps = _mod._ps(bot)
|
||||
ps["duck_silence"] = 15
|
||||
track = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
|
||||
@@ -903,7 +989,7 @@ class TestAutoResume:
|
||||
|
||||
async def _fast_sleep(s):
|
||||
mono_val[0] += s
|
||||
bot._last_voice_ts = mono_val[0]
|
||||
bot.registry._voice_ts = mono_val[0]
|
||||
await _real_sleep(0)
|
||||
|
||||
with patch.object(time, "monotonic", side_effect=_fast_mono):
|
||||
@@ -918,6 +1004,9 @@ class TestAutoResume:
|
||||
"""Watcher detects connect_count increment and calls _auto_resume."""
|
||||
bot = _FakeBot()
|
||||
bot._connect_count = 1
|
||||
# Resume state must exist for watcher to call _auto_resume
|
||||
track = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
|
||||
_mod._save_resume(bot, track, 60.0)
|
||||
|
||||
async def _check():
|
||||
with patch.object(_mod, "_auto_resume", new_callable=AsyncMock) as mock_ar:
|
||||
@@ -1014,6 +1103,111 @@ class TestAutoResume:
|
||||
assert spawned.count("music-reconnect-watcher") == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestAutoplayKept
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestAutoplayKept:
|
||||
def test_shuffles_kept_tracks(self, tmp_path):
|
||||
"""Autoplay loads kept tracks, shuffles, and starts playback."""
|
||||
bot = _FakeBot()
|
||||
bot.registry._voice_ts = 0.0
|
||||
music_dir = tmp_path / "music"
|
||||
music_dir.mkdir()
|
||||
# Create two kept files
|
||||
(music_dir / "a.opus").write_bytes(b"audio")
|
||||
(music_dir / "b.opus").write_bytes(b"audio")
|
||||
bot.state.set("music", "keep:1", json.dumps({
|
||||
"url": "https://example.com/a", "title": "Track A",
|
||||
"filename": "a.opus", "id": 1,
|
||||
}))
|
||||
bot.state.set("music", "keep:2", json.dumps({
|
||||
"url": "https://example.com/b", "title": "Track B",
|
||||
"filename": "b.opus", "id": 2,
|
||||
}))
|
||||
with patch.object(_mod, "_MUSIC_DIR", music_dir), \
|
||||
patch.object(_mod, "_ensure_loop") as mock_loop:
|
||||
asyncio.run(_mod._autoplay_kept(bot))
|
||||
mock_loop.assert_called_once_with(bot)
|
||||
ps = _mod._ps(bot)
|
||||
assert len(ps["queue"]) == 2
|
||||
titles = {t.title for t in ps["queue"]}
|
||||
assert titles == {"Track A", "Track B"}
|
||||
# All tracks marked keep=True
|
||||
assert all(t.keep for t in ps["queue"])
|
||||
|
||||
def test_skips_when_already_playing(self):
|
||||
bot = _FakeBot()
|
||||
ps = _mod._ps(bot)
|
||||
ps["current"] = _mod._Track(url="x", title="Playing", requester="a")
|
||||
with patch.object(_mod, "_ensure_loop") as mock_loop:
|
||||
asyncio.run(_mod._autoplay_kept(bot))
|
||||
mock_loop.assert_not_called()
|
||||
|
||||
def test_skips_when_no_kept_tracks(self):
|
||||
bot = _FakeBot()
|
||||
bot.registry._voice_ts = 0.0
|
||||
with patch.object(_mod, "_ensure_loop") as mock_loop:
|
||||
asyncio.run(_mod._autoplay_kept(bot))
|
||||
mock_loop.assert_not_called()
|
||||
|
||||
def test_load_kept_tracks_skips_missing_files(self, tmp_path):
|
||||
"""Tracks with missing local files are excluded."""
|
||||
bot = _FakeBot()
|
||||
music_dir = tmp_path / "music"
|
||||
music_dir.mkdir()
|
||||
bot.state.set("music", "keep:1", json.dumps({
|
||||
"url": "https://example.com/a", "title": "Gone",
|
||||
"filename": "missing.opus", "id": 1,
|
||||
}))
|
||||
with patch.object(_mod, "_MUSIC_DIR", music_dir):
|
||||
tracks = _mod._load_kept_tracks(bot)
|
||||
assert tracks == []
|
||||
|
||||
def test_watcher_autoplay_on_boot_no_resume(self):
|
||||
"""Watcher triggers autoplay on boot when no resume state exists."""
|
||||
bot = _FakeBot()
|
||||
bot._connect_count = 0
|
||||
|
||||
async def _check():
|
||||
with patch.object(_mod, "_autoplay_kept",
|
||||
new_callable=AsyncMock) as mock_ap:
|
||||
task = asyncio.create_task(_mod._reconnect_watcher(bot))
|
||||
await asyncio.sleep(0.5)
|
||||
bot._connect_count = 1
|
||||
await asyncio.sleep(3)
|
||||
mock_ap.assert_called_once_with(bot)
|
||||
task.cancel()
|
||||
try:
|
||||
await task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
asyncio.run(_check())
|
||||
|
||||
def test_watcher_autoplay_on_reconnect_no_resume(self):
|
||||
"""Watcher triggers autoplay on reconnect when no resume state."""
|
||||
bot = _FakeBot()
|
||||
bot._connect_count = 1
|
||||
|
||||
async def _check():
|
||||
with patch.object(_mod, "_autoplay_kept",
|
||||
new_callable=AsyncMock) as mock_ap:
|
||||
task = asyncio.create_task(_mod._reconnect_watcher(bot))
|
||||
await asyncio.sleep(0.5)
|
||||
bot._connect_count = 2
|
||||
await asyncio.sleep(3)
|
||||
mock_ap.assert_called_once_with(bot)
|
||||
task.cancel()
|
||||
try:
|
||||
await task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
asyncio.run(_check())
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestDownloadTrack
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -1143,6 +1337,49 @@ class TestKeepCommand:
|
||||
assert track.keep is True
|
||||
assert any("Keeping" in r for r in bot.replied)
|
||||
|
||||
def test_keep_duplicate_blocked(self, tmp_path):
|
||||
bot = _FakeBot()
|
||||
ps = _mod._ps(bot)
|
||||
f = tmp_path / "abc123.opus"
|
||||
f.write_bytes(b"audio")
|
||||
track = _mod._Track(
|
||||
url="https://example.com/v", title="t", requester="a",
|
||||
local_path=f,
|
||||
)
|
||||
ps["current"] = track
|
||||
# Pre-existing kept entry with same URL
|
||||
bot.state.set("music", "keep:1", json.dumps({
|
||||
"url": "https://example.com/v", "id": 1,
|
||||
}))
|
||||
bot.state.set("music", "keep_next_id", "2")
|
||||
msg = _Msg(text="!keep")
|
||||
asyncio.run(_mod.cmd_keep(bot, msg))
|
||||
assert any("Already kept" in r for r in bot.replied)
|
||||
assert any("#1" in r for r in bot.replied)
|
||||
# ID counter should not have incremented
|
||||
assert bot.state.get("music", "keep_next_id") == "2"
|
||||
|
||||
def test_keep_duplicate_with_playlist_params(self, tmp_path):
|
||||
bot = _FakeBot()
|
||||
ps = _mod._ps(bot)
|
||||
f = tmp_path / "abc123.opus"
|
||||
f.write_bytes(b"audio")
|
||||
# Track URL has playlist cruft
|
||||
track = _mod._Track(
|
||||
url="https://www.youtube.com/watch?v=abc&list=RDabc&start_radio=1",
|
||||
title="t", requester="a", local_path=f,
|
||||
)
|
||||
ps["current"] = track
|
||||
# Existing entry stored with clean URL
|
||||
bot.state.set("music", "keep:1", json.dumps({
|
||||
"url": "https://www.youtube.com/watch?v=abc", "id": 1,
|
||||
}))
|
||||
bot.state.set("music", "keep_next_id", "2")
|
||||
msg = _Msg(text="!keep")
|
||||
asyncio.run(_mod.cmd_keep(bot, msg))
|
||||
assert any("Already kept" in r for r in bot.replied)
|
||||
assert bot.state.get("music", "keep_next_id") == "2"
|
||||
|
||||
def test_keep_non_mumble(self):
|
||||
bot = _FakeBot(mumble=False)
|
||||
msg = _Msg(text="!keep")
|
||||
@@ -1267,50 +1504,43 @@ class TestSeekCommand:
|
||||
def test_seek_absolute(self):
|
||||
bot = _FakeBot()
|
||||
ps = _mod._ps(bot)
|
||||
track = _mod._Track(url="x", title="Song", requester="a")
|
||||
ps["current"] = track
|
||||
mock_task = MagicMock()
|
||||
mock_task.done.return_value = False
|
||||
ps["task"] = mock_task
|
||||
ps["current"] = _mod._Track(url="x", title="Song", requester="a")
|
||||
ps["seek_req"] = [None]
|
||||
ps["progress"] = [100]
|
||||
msg = _Msg(text="!seek 1:30")
|
||||
with patch.object(_mod, "_ensure_loop") as mock_loop:
|
||||
asyncio.run(_mod.cmd_seek(bot, msg))
|
||||
mock_loop.assert_called_once_with(bot, seek=90.0)
|
||||
assert ps["queue"][0] is track
|
||||
asyncio.run(_mod.cmd_seek(bot, msg))
|
||||
assert ps["seek_req"][0] == 90.0
|
||||
assert ps["cur_seek"] == 90.0
|
||||
assert ps["progress"][0] == 0
|
||||
assert any("1:30" in r for r in bot.replied)
|
||||
mock_task.cancel.assert_called_once()
|
||||
|
||||
def test_seek_relative_forward(self):
|
||||
bot = _FakeBot()
|
||||
ps = _mod._ps(bot)
|
||||
track = _mod._Track(url="x", title="Song", requester="a")
|
||||
ps["current"] = track
|
||||
ps["current"] = _mod._Track(url="x", title="Song", requester="a")
|
||||
ps["seek_req"] = [None]
|
||||
ps["progress"] = [1500] # 1500 * 0.02 = 30s
|
||||
ps["cur_seek"] = 60.0 # started at 60s
|
||||
mock_task = MagicMock()
|
||||
mock_task.done.return_value = False
|
||||
ps["task"] = mock_task
|
||||
msg = _Msg(text="!seek +30")
|
||||
with patch.object(_mod, "_ensure_loop") as mock_loop:
|
||||
asyncio.run(_mod.cmd_seek(bot, msg))
|
||||
# elapsed = 60 + 30 = 90, target = 90 + 30 = 120
|
||||
mock_loop.assert_called_once_with(bot, seek=120.0)
|
||||
asyncio.run(_mod.cmd_seek(bot, msg))
|
||||
# elapsed = 60 + 30 = 90, target = 90 + 30 = 120
|
||||
assert ps["seek_req"][0] == 120.0
|
||||
assert ps["cur_seek"] == 120.0
|
||||
assert ps["progress"][0] == 0
|
||||
|
||||
def test_seek_relative_backward_clamps(self):
|
||||
bot = _FakeBot()
|
||||
ps = _mod._ps(bot)
|
||||
track = _mod._Track(url="x", title="Song", requester="a")
|
||||
ps["current"] = track
|
||||
ps["current"] = _mod._Track(url="x", title="Song", requester="a")
|
||||
ps["seek_req"] = [None]
|
||||
ps["progress"] = [500] # 500 * 0.02 = 10s
|
||||
ps["cur_seek"] = 0.0
|
||||
mock_task = MagicMock()
|
||||
mock_task.done.return_value = False
|
||||
ps["task"] = mock_task
|
||||
msg = _Msg(text="!seek -30")
|
||||
with patch.object(_mod, "_ensure_loop") as mock_loop:
|
||||
asyncio.run(_mod.cmd_seek(bot, msg))
|
||||
# elapsed = 0 + 10 = 10, target = 10 - 30 = -20, clamped to 0
|
||||
mock_loop.assert_called_once_with(bot, seek=0.0)
|
||||
asyncio.run(_mod.cmd_seek(bot, msg))
|
||||
# elapsed = 0 + 10 = 10, target = 10 - 30 = -20, clamped to 0
|
||||
assert ps["seek_req"][0] == 0.0
|
||||
assert ps["cur_seek"] == 0.0
|
||||
assert ps["progress"][0] == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@@ -36,6 +36,20 @@ class TestDecorators:
|
||||
assert handler._derp_event == "PRIVMSG"
|
||||
|
||||
|
||||
def test_command_decorator_aliases(self):
|
||||
@command("skip", help="skip track", aliases=["next", "s"])
|
||||
async def handler(bot, msg):
|
||||
pass
|
||||
|
||||
assert handler._derp_aliases == ["next", "s"]
|
||||
|
||||
def test_command_decorator_aliases_default(self):
|
||||
@command("ping", help="ping")
|
||||
async def handler(bot, msg):
|
||||
pass
|
||||
|
||||
assert handler._derp_aliases == []
|
||||
|
||||
def test_command_decorator_admin(self):
|
||||
@command("secret", help="admin only", admin=True)
|
||||
async def handler(bot, msg):
|
||||
@@ -208,6 +222,46 @@ class TestRegistry:
|
||||
assert registry.commands["secret"].admin is True
|
||||
assert registry.commands["public"].admin is False
|
||||
|
||||
def test_load_plugin_aliases(self, tmp_path: Path):
|
||||
plugin_file = tmp_path / "aliased.py"
|
||||
plugin_file.write_text(textwrap.dedent("""\
|
||||
from derp.plugin import command
|
||||
|
||||
@command("skip", help="Skip track", aliases=["next", "s"])
|
||||
async def cmd_skip(bot, msg):
|
||||
pass
|
||||
"""))
|
||||
|
||||
registry = PluginRegistry()
|
||||
count = registry.load_plugin(plugin_file)
|
||||
assert count == 3 # primary + 2 aliases
|
||||
assert "skip" in registry.commands
|
||||
assert "next" in registry.commands
|
||||
assert "s" in registry.commands
|
||||
# Aliases point to the same callback
|
||||
assert registry.commands["next"].callback is registry.commands["skip"].callback
|
||||
assert registry.commands["s"].callback is registry.commands["skip"].callback
|
||||
# Alias help text references the primary command
|
||||
assert registry.commands["next"].help == "alias for !skip"
|
||||
|
||||
def test_unload_removes_aliases(self, tmp_path: Path):
|
||||
plugin_file = tmp_path / "aliased.py"
|
||||
plugin_file.write_text(textwrap.dedent("""\
|
||||
from derp.plugin import command
|
||||
|
||||
@command("skip", help="Skip track", aliases=["next"])
|
||||
async def cmd_skip(bot, msg):
|
||||
pass
|
||||
"""))
|
||||
|
||||
registry = PluginRegistry()
|
||||
registry.load_plugin(plugin_file)
|
||||
assert "next" in registry.commands
|
||||
|
||||
registry.unload_plugin("aliased")
|
||||
assert "skip" not in registry.commands
|
||||
assert "next" not in registry.commands
|
||||
|
||||
def test_load_plugin_stores_path(self, tmp_path: Path):
|
||||
plugin_file = tmp_path / "pathed.py"
|
||||
plugin_file.write_text(textwrap.dedent("""\
|
||||
@@ -677,6 +731,71 @@ class TestChannelFilter:
|
||||
assert bot._plugin_allowed("encode", "&local") is False
|
||||
|
||||
|
||||
class TestAliasDispatch:
|
||||
"""Test alias fallback in _dispatch_command."""
|
||||
|
||||
@staticmethod
|
||||
def _make_bot_with_alias(alias_name: str, target_cmd: str) -> tuple[Bot, list]:
|
||||
"""Create a Bot with a command and an alias pointing to it."""
|
||||
config = {
|
||||
"server": {"host": "localhost", "port": 6667, "tls": False,
|
||||
"nick": "test", "user": "test", "realname": "test"},
|
||||
"bot": {"prefix": "!", "channels": [], "plugins_dir": "plugins"},
|
||||
}
|
||||
registry = PluginRegistry()
|
||||
called = []
|
||||
|
||||
async def _handler(bot, msg):
|
||||
called.append(msg.text)
|
||||
|
||||
registry.register_command(target_cmd, _handler, plugin="test")
|
||||
bot = Bot("test", config, registry)
|
||||
bot.conn = _FakeConnection()
|
||||
bot.state.set("alias", alias_name, target_cmd)
|
||||
return bot, called
|
||||
|
||||
def test_alias_resolves_command(self):
|
||||
"""An alias triggers the target command handler."""
|
||||
bot, called = self._make_bot_with_alias("s", "skip")
|
||||
msg = Message(raw="", prefix="nick!u@h", nick="nick",
|
||||
command="PRIVMSG", params=["#ch", "!s"], tags={})
|
||||
|
||||
async def _run():
|
||||
bot._dispatch_command(msg)
|
||||
await asyncio.sleep(0.05) # let spawned task run
|
||||
|
||||
asyncio.run(_run())
|
||||
assert len(called) == 1
|
||||
|
||||
def test_alias_ignored_when_command_exists(self):
|
||||
"""Direct command match takes priority over alias."""
|
||||
bot, called = self._make_bot_with_alias("skip", "stop")
|
||||
# "skip" is both a real command and an alias to "stop"; real wins
|
||||
msg = Message(raw="", prefix="nick!u@h", nick="nick",
|
||||
command="PRIVMSG", params=["#ch", "!skip"], tags={})
|
||||
|
||||
async def _run():
|
||||
bot._dispatch_command(msg)
|
||||
await asyncio.sleep(0.05)
|
||||
|
||||
asyncio.run(_run())
|
||||
assert len(called) == 1
|
||||
# Handler was the "skip" handler, not "stop"
|
||||
|
||||
def test_no_alias_no_crash(self):
|
||||
"""Unknown command with no alias silently returns."""
|
||||
config = {
|
||||
"server": {"host": "localhost", "port": 6667, "tls": False,
|
||||
"nick": "test", "user": "test", "realname": "test"},
|
||||
"bot": {"prefix": "!", "channels": [], "plugins_dir": "plugins"},
|
||||
}
|
||||
bot = Bot("test", config, PluginRegistry())
|
||||
bot.conn = _FakeConnection()
|
||||
msg = Message(raw="", prefix="nick!u@h", nick="nick",
|
||||
command="PRIVMSG", params=["#ch", "!nonexistent"], tags={})
|
||||
bot._dispatch_command(msg) # should not raise
|
||||
|
||||
|
||||
class TestSplitUtf8:
|
||||
"""Test UTF-8 safe message splitting."""
|
||||
|
||||
|
||||
Reference in New Issue
Block a user