_resolve_title replaced with _resolve_tracks using --flat-playlist to enumerate playlist entries. cmd_play enqueues each track individually, with truncation when the queue is nearly full. Single-video behavior unchanged.
460 lines
16 KiB
Python
460 lines
16 KiB
Python
"""Tests for the music playback plugin."""
|
|
|
|
import asyncio
|
|
import importlib.util
|
|
import sys
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
# -- Load plugin module directly ---------------------------------------------

# The plugin is loaded straight from its file path rather than through a
# regular package import.
_spec = importlib.util.spec_from_file_location("music", "plugins/music.py")
_mod = importlib.util.module_from_spec(_spec)
# Register the module under the name "music" before its body is executed.
sys.modules["music"] = _mod
_spec.loader.exec_module(_mod)
|
|
|
|
|
|
# -- Fakes -------------------------------------------------------------------
|
|
|
|
|
|
class _FakeState:
|
|
def __init__(self):
|
|
self._store: dict[str, dict[str, str]] = {}
|
|
|
|
def get(self, ns: str, key: str) -> str | None:
|
|
return self._store.get(ns, {}).get(key)
|
|
|
|
def set(self, ns: str, key: str, value: str) -> None:
|
|
self._store.setdefault(ns, {})[key] = value
|
|
|
|
def delete(self, ns: str, key: str) -> None:
|
|
self._store.get(ns, {}).pop(key, None)
|
|
|
|
def keys(self, ns: str) -> list[str]:
|
|
return list(self._store.get(ns, {}).keys())
|
|
|
|
|
|
class _FakeBot:
    """Minimal bot for music plugin testing.

    Outgoing messages are recorded in ``sent`` and ``replied`` so tests can
    inspect them afterwards.
    """

    def __init__(self, *, mumble: bool = True):
        # Recorded output: (target, text) pairs and plain reply texts.
        self.sent: list[tuple[str, str]] = []
        self.replied: list[str] = []
        # Tasks started through _spawn that have not finished yet.
        self._tasks: set[asyncio.Task] = set()
        self._pstate: dict = {}
        self.state = _FakeState()
        # The ``stream_audio`` attribute only exists when ``mumble`` is true.
        if mumble:
            self.stream_audio = AsyncMock()

    async def send(self, target: str, text: str) -> None:
        """Record a message addressed to *target*."""
        self.sent.append((target, text))

    async def reply(self, message, text: str) -> None:
        """Record a reply text; *message* itself is not stored."""
        self.replied.append(text)

    def _is_admin(self, message) -> bool:
        """Always report the sender as non-admin."""
        return False

    def _spawn(self, coro, *, name=None):
        """Schedule *coro* and track it until done; *name* is accepted but unused."""
        spawned = asyncio.ensure_future(coro)
        self._tasks.add(spawned)
        # Drop the task from the tracking set once it completes.
        spawned.add_done_callback(self._tasks.discard)
        return spawned
|
|
|
|
|
|
class _Msg:
    """Minimal message object.

    ``prefix`` mirrors ``nick`` and ``params`` is ``[target, text]``; the
    command is always ``"PRIVMSG"``.
    """

    def __init__(self, text="!play url", nick="Alice", target="0",
                 is_channel=True):
        # Fixed fields that do not depend on the arguments.
        self.command = "PRIVMSG"
        self.tags = {}
        self.raw = {}
        # Fields taken directly from the arguments.
        self.text = text
        self.nick = nick
        self.target = target
        self.is_channel = is_channel
        # Fields derived from the arguments.
        self.prefix = nick
        self.params = [target, text]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestMumbleGuard
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestMumbleGuard:
    """Behaviour on bots built with and without ``stream_audio``."""

    def test_is_mumble_true(self):
        """``_is_mumble`` is True for a bot created with ``mumble=True``."""
        assert _mod._is_mumble(_FakeBot(mumble=True)) is True

    def test_is_mumble_false(self):
        """``_is_mumble`` is False for a bot created with ``mumble=False``."""
        assert _mod._is_mumble(_FakeBot(mumble=False)) is False

    def test_play_non_mumble(self):
        """``cmd_play`` on a non-Mumble bot replies with a "Mumble-only" notice."""
        bot = _FakeBot(mumble=False)
        request = _Msg(text="!play https://example.com")
        asyncio.run(_mod.cmd_play(bot, request))
        assert any("Mumble-only" in reply for reply in bot.replied)

    def test_stop_non_mumble_silent(self):
        """``cmd_stop`` on a non-Mumble bot sends no reply."""
        bot = _FakeBot(mumble=False)
        asyncio.run(_mod.cmd_stop(bot, _Msg(text="!stop")))
        assert bot.replied == []

    def test_skip_non_mumble_silent(self):
        """``cmd_skip`` on a non-Mumble bot sends no reply."""
        bot = _FakeBot(mumble=False)
        asyncio.run(_mod.cmd_skip(bot, _Msg(text="!skip")))
        assert bot.replied == []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestPlayCommand
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestPlayCommand:
    """Tests for ``cmd_play`` with a single resolved track."""

    def test_play_no_url(self):
        """A bare ``!play`` yields a usage reply."""
        bot = _FakeBot()
        asyncio.run(_mod.cmd_play(bot, _Msg(text="!play")))
        assert any("Usage" in reply for reply in bot.replied)

    def test_play_queues_track(self):
        """On an idle bot the track is queued and a "Playing" reply is sent."""
        bot = _FakeBot()
        request = _Msg(text="!play https://example.com/track")
        resolved = [("https://example.com/track", "Test Track")]
        with (
            patch.object(_mod, "_resolve_tracks", return_value=resolved),
            patch.object(_mod, "_ensure_loop"),
        ):
            asyncio.run(_mod.cmd_play(bot, request))
        assert any("Playing" in reply for reply in bot.replied)
        queue = _mod._ps(bot)["queue"]
        assert len(queue) == 1
        assert queue[0].title == "Test Track"

    def test_play_shows_queued_when_busy(self):
        """With a current track already set, the reply says "Queued"."""
        bot = _FakeBot()
        state = _mod._ps(bot)
        state["current"] = _mod._Track(
            url="x", title="Current", requester="Bob",
        )
        request = _Msg(text="!play https://example.com/next")
        resolved = [("https://example.com/next", "Next Track")]
        with patch.object(_mod, "_resolve_tracks", return_value=resolved):
            asyncio.run(_mod.cmd_play(bot, request))
        assert any("Queued" in reply for reply in bot.replied)

    def test_play_queue_full(self):
        """With ``_MAX_QUEUE`` tracks queued, the reply mentions "full"."""
        bot = _FakeBot()
        state = _mod._ps(bot)
        # Pre-fill the queue with exactly _MAX_QUEUE distinct track objects.
        state["queue"] = [
            _mod._Track(url="x", title="t", requester="a")
            for _ in range(_mod._MAX_QUEUE)
        ]
        request = _Msg(text="!play https://example.com/overflow")
        asyncio.run(_mod.cmd_play(bot, request))
        assert any("full" in reply.lower() for reply in bot.replied)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestStopCommand
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestStopCommand:
    """Tests for ``cmd_stop``."""

    def test_stop_clears_queue(self):
        """Stopping empties the queue, clears the current track and replies."""
        bot = _FakeBot()
        state = _mod._ps(bot)
        state["current"] = _mod._Track(url="y", title="s", requester="b")
        state["queue"] = [_mod._Track(url="x", title="t", requester="a")]
        asyncio.run(_mod.cmd_stop(bot, _Msg(text="!stop")))
        assert state["queue"] == []
        assert state["current"] is None
        assert any("Stopped" in reply for reply in bot.replied)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestSkipCommand
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestSkipCommand:
    """Tests for ``cmd_skip``."""

    def test_skip_nothing_playing(self):
        """With no current track, the reply contains "Nothing"."""
        bot = _FakeBot()
        asyncio.run(_mod.cmd_skip(bot, _Msg(text="!skip")))
        assert any("Nothing" in reply for reply in bot.replied)

    def test_skip_with_queue(self):
        """Skipping with a queued track cancels the task and replies "Skipped"."""
        bot = _FakeBot()
        state = _mod._ps(bot)
        state["current"] = _mod._Track(url="a", title="First", requester="x")
        state["queue"] = [_mod._Track(url="b", title="Second", requester="y")]
        # Stand-in for the playback task; it reports itself as still running.
        playback = MagicMock()
        playback.done.return_value = False
        state["task"] = playback
        request = _Msg(text="!skip")
        with patch.object(_mod, "_ensure_loop"):
            asyncio.run(_mod.cmd_skip(bot, request))
        assert any("Skipped" in reply for reply in bot.replied)
        playback.cancel.assert_called_once()

    def test_skip_empty_queue(self):
        """Skipping the only track yields a reply mentioning "empty"."""
        bot = _FakeBot()
        state = _mod._ps(bot)
        state["current"] = _mod._Track(url="a", title="Only", requester="x")
        # Stand-in for the playback task; it reports itself as still running.
        playback = MagicMock()
        playback.done.return_value = False
        state["task"] = playback
        asyncio.run(_mod.cmd_skip(bot, _Msg(text="!skip")))
        assert any("empty" in reply.lower() for reply in bot.replied)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestQueueCommand
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestQueueCommand:
    """Tests for ``cmd_queue``."""

    def test_queue_empty(self):
        """With nothing queued, the reply mentions "empty"."""
        bot = _FakeBot()
        asyncio.run(_mod.cmd_queue(bot, _Msg(text="!queue")))
        assert any("empty" in reply.lower() for reply in bot.replied)

    def test_queue_with_tracks(self):
        """Both the current and the queued track titles show up in replies."""
        bot = _FakeBot()
        state = _mod._ps(bot)
        state["current"] = _mod._Track(url="a", title="Now", requester="x")
        state["queue"] = [
            _mod._Track(url="b", title="Next", requester="y"),
        ]
        asyncio.run(_mod.cmd_queue(bot, _Msg(text="!queue")))
        assert any("Now" in reply for reply in bot.replied)
        assert any("Next" in reply for reply in bot.replied)

    def test_queue_with_url_delegates(self):
        """``!queue <url>`` produces a "Playing" or "Queued" reply."""
        bot = _FakeBot()
        request = _Msg(text="!queue https://example.com/track")
        resolved = [("https://example.com/track", "Title")]
        with (
            patch.object(_mod, "_resolve_tracks", return_value=resolved),
            patch.object(_mod, "_ensure_loop"),
        ):
            asyncio.run(_mod.cmd_queue(bot, request))
        # Should have called cmd_play logic
        assert any(
            "Playing" in reply or "Queued" in reply for reply in bot.replied
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestNpCommand
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestNpCommand:
    """Tests for ``cmd_np``."""

    def test_np_nothing(self):
        """With no current track, the reply contains "Nothing"."""
        bot = _FakeBot()
        asyncio.run(_mod.cmd_np(bot, _Msg(text="!np")))
        assert any("Nothing" in reply for reply in bot.replied)

    def test_np_playing(self):
        """The current track's title and requester both appear in replies."""
        bot = _FakeBot()
        state = _mod._ps(bot)
        state["current"] = _mod._Track(
            url="x", title="Cool Song", requester="DJ",
        )
        asyncio.run(_mod.cmd_np(bot, _Msg(text="!np")))
        assert any("Cool Song" in reply for reply in bot.replied)
        assert any("DJ" in reply for reply in bot.replied)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestVolumeCommand
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestVolumeCommand:
    """Tests for ``cmd_volume``."""

    def test_volume_show(self):
        """A bare ``!volume`` on a fresh bot reports "50%"."""
        bot = _FakeBot()
        asyncio.run(_mod.cmd_volume(bot, _Msg(text="!volume")))
        assert any("50%" in reply for reply in bot.replied)

    def test_volume_set(self):
        """``!volume 75`` stores 75 in the per-bot state and reports "75%"."""
        bot = _FakeBot()
        asyncio.run(_mod.cmd_volume(bot, _Msg(text="!volume 75")))
        state = _mod._ps(bot)
        assert state["volume"] == 75
        assert any("75%" in reply for reply in bot.replied)

    def test_volume_out_of_range(self):
        """A value of 150 produces a reply containing "0-100"."""
        bot = _FakeBot()
        asyncio.run(_mod.cmd_volume(bot, _Msg(text="!volume 150")))
        assert any("0-100" in reply for reply in bot.replied)

    def test_volume_negative(self):
        """A value of -10 produces a reply containing "0-100"."""
        bot = _FakeBot()
        asyncio.run(_mod.cmd_volume(bot, _Msg(text="!volume -10")))
        assert any("0-100" in reply for reply in bot.replied)

    def test_volume_invalid(self):
        """A non-numeric argument produces a usage reply."""
        bot = _FakeBot()
        asyncio.run(_mod.cmd_volume(bot, _Msg(text="!volume abc")))
        assert any("Usage" in reply for reply in bot.replied)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestPerBotState
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestPerBotState:
    """Tests for the per-bot state returned by ``_ps``."""

    def test_ps_initializes(self):
        """A fresh bot has an empty queue, no current track and volume 50."""
        state = _mod._ps(_FakeBot())
        assert state["queue"] == []
        assert state["current"] is None
        assert state["volume"] == 50

    def test_ps_stable_reference(self):
        """Two calls for the same bot return the very same object."""
        bot = _FakeBot()
        first = _mod._ps(bot)
        second = _mod._ps(bot)
        assert first is second

    def test_ps_isolated_per_bot(self):
        """Changing one bot's volume leaves another bot at 50."""
        changed = _FakeBot()
        untouched = _FakeBot()
        _mod._ps(changed)["volume"] = 80
        assert _mod._ps(untouched)["volume"] == 50
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestMusicHelpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestMusicHelpers:
    """Tests for the ``_truncate`` helper."""

    def test_truncate_short(self):
        """A short string comes back unchanged."""
        assert _mod._truncate("short") == "short"

    def test_truncate_long(self):
        """A 100-character string is cut to 80 characters ending in "..."."""
        text = "x" * 100
        shortened = _mod._truncate(text)
        assert len(shortened) == 80
        assert shortened.endswith("...")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestPlaylistExpansion
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestPlaylistExpansion:
    """Tests for multi-track handling in ``cmd_play`` and ``_resolve_tracks``."""

    def test_enqueue_multiple_tracks(self):
        """Three resolved tracks are all queued and reported as "Queued 3 tracks"."""
        bot = _FakeBot()
        request = _Msg(text="!play https://example.com/playlist")
        resolved = [
            ("https://example.com/1", "Track 1"),
            ("https://example.com/2", "Track 2"),
            ("https://example.com/3", "Track 3"),
        ]
        with (
            patch.object(_mod, "_resolve_tracks", return_value=resolved),
            patch.object(_mod, "_ensure_loop"),
        ):
            asyncio.run(_mod.cmd_play(bot, request))
        state = _mod._ps(bot)
        assert len(state["queue"]) == 3
        assert any("Queued 3 tracks" in reply for reply in bot.replied)

    def test_truncate_at_queue_limit(self):
        """With two free slots, four resolved tracks fill the queue ("2 of 4")."""
        bot = _FakeBot()
        state = _mod._ps(bot)
        state["current"] = _mod._Track(url="x", title="Playing", requester="a")
        # Leave exactly two free slots below _MAX_QUEUE.
        state["queue"] = [
            _mod._Track(url="x", title="t", requester="a")
            for _ in range(_mod._MAX_QUEUE - 2)
        ]
        request = _Msg(text="!play https://example.com/playlist")
        resolved = [
            ("https://example.com/1", "Track 1"),
            ("https://example.com/2", "Track 2"),
            ("https://example.com/3", "Track 3"),
            ("https://example.com/4", "Track 4"),
        ]
        with patch.object(_mod, "_resolve_tracks", return_value=resolved):
            asyncio.run(_mod.cmd_play(bot, request))
        assert len(state["queue"]) == _mod._MAX_QUEUE
        assert any("2 of 4" in reply for reply in bot.replied)

    def test_start_loop_when_idle(self):
        """On an idle bot ``_ensure_loop`` is called exactly once."""
        bot = _FakeBot()
        request = _Msg(text="!play https://example.com/playlist")
        resolved = [
            ("https://example.com/1", "Track 1"),
            ("https://example.com/2", "Track 2"),
        ]
        with (
            patch.object(_mod, "_resolve_tracks", return_value=resolved),
            patch.object(_mod, "_ensure_loop") as ensure_loop,
        ):
            asyncio.run(_mod.cmd_play(bot, request))
        ensure_loop.assert_called_once()

    def test_no_loop_start_when_busy(self):
        """With a current track set, ``_ensure_loop`` is not called."""
        bot = _FakeBot()
        state = _mod._ps(bot)
        state["current"] = _mod._Track(url="x", title="Current", requester="a")
        request = _Msg(text="!play https://example.com/playlist")
        resolved = [
            ("https://example.com/1", "Track 1"),
            ("https://example.com/2", "Track 2"),
        ]
        with (
            patch.object(_mod, "_resolve_tracks", return_value=resolved),
            patch.object(_mod, "_ensure_loop") as ensure_loop,
        ):
            asyncio.run(_mod.cmd_play(bot, request))
        ensure_loop.assert_not_called()

    def test_resolve_tracks_single_video(self):
        """Subprocess output holding one url line followed by one title line."""
        completed = MagicMock()
        completed.stdout = "https://example.com/v1\nSingle Video\n"
        with patch("subprocess.run", return_value=completed):
            tracks = _mod._resolve_tracks("https://example.com/v1")
        assert tracks == [("https://example.com/v1", "Single Video")]

    def test_resolve_tracks_playlist(self):
        """Subprocess output holding two url/title line pairs."""
        completed = MagicMock()
        completed.stdout = (
            "https://example.com/1\nFirst\n"
            "https://example.com/2\nSecond\n"
        )
        with patch("subprocess.run", return_value=completed):
            tracks = _mod._resolve_tracks("https://example.com/pl")
        assert len(tracks) == 2
        first, second = tracks[0], tracks[1]
        assert first == ("https://example.com/1", "First")
        assert second == ("https://example.com/2", "Second")

    def test_resolve_tracks_error_fallback(self):
        """When the subprocess call raises, the result is ``[(url, url)]``."""
        url = "https://example.com/bad"
        with patch("subprocess.run", side_effect=Exception("fail")):
            tracks = _mod._resolve_tracks(url)
        assert tracks == [(url, url)]

    def test_resolve_tracks_empty_output(self):
        """Empty stdout gives the same ``[(url, url)]`` fallback."""
        url = "https://example.com/empty"
        completed = MagicMock()
        completed.stdout = ""
        with patch("subprocess.run", return_value=completed):
            tracks = _mod._resolve_tracks(url)
        assert tracks == [(url, url)]
|