"""Tests for the music playback plugin.""" import asyncio import importlib.util import sys from unittest.mock import AsyncMock, MagicMock, patch # -- Load plugin module directly --------------------------------------------- _spec = importlib.util.spec_from_file_location("music", "plugins/music.py") _mod = importlib.util.module_from_spec(_spec) sys.modules["music"] = _mod _spec.loader.exec_module(_mod) # -- Fakes ------------------------------------------------------------------- class _FakeState: def __init__(self): self._store: dict[str, dict[str, str]] = {} def get(self, ns: str, key: str) -> str | None: return self._store.get(ns, {}).get(key) def set(self, ns: str, key: str, value: str) -> None: self._store.setdefault(ns, {})[key] = value def delete(self, ns: str, key: str) -> None: self._store.get(ns, {}).pop(key, None) def keys(self, ns: str) -> list[str]: return list(self._store.get(ns, {}).keys()) class _FakeBot: """Minimal bot for music plugin testing.""" def __init__(self, *, mumble: bool = True): self.sent: list[tuple[str, str]] = [] self.replied: list[str] = [] self.state = _FakeState() self._pstate: dict = {} self._tasks: set[asyncio.Task] = set() if mumble: self.stream_audio = AsyncMock() async def send(self, target: str, text: str) -> None: self.sent.append((target, text)) async def reply(self, message, text: str) -> None: self.replied.append(text) def _is_admin(self, message) -> bool: return False def _spawn(self, coro, *, name=None): task = asyncio.ensure_future(coro) self._tasks.add(task) task.add_done_callback(self._tasks.discard) return task class _Msg: """Minimal message object.""" def __init__(self, text="!play url", nick="Alice", target="0", is_channel=True): self.text = text self.nick = nick self.target = target self.is_channel = is_channel self.prefix = nick self.command = "PRIVMSG" self.params = [target, text] self.tags = {} self.raw = {} # --------------------------------------------------------------------------- # TestMumbleGuard # 
# ---------------------------------------------------------------------------


class TestMumbleGuard:
    """Behaviour of the commands on bots with and without Mumble support."""

    def test_is_mumble_true(self):
        assert _mod._is_mumble(_FakeBot(mumble=True)) is True

    def test_is_mumble_false(self):
        assert _mod._is_mumble(_FakeBot(mumble=False)) is False

    def test_play_non_mumble(self):
        """!play on a non-Mumble bot replies with a Mumble-only notice."""
        bot = _FakeBot(mumble=False)
        asyncio.run(_mod.cmd_play(bot, _Msg(text="!play https://example.com")))
        assert any("Mumble-only" in line for line in bot.replied)

    def test_stop_non_mumble_silent(self):
        """!stop on a non-Mumble bot sends no reply."""
        bot = _FakeBot(mumble=False)
        asyncio.run(_mod.cmd_stop(bot, _Msg(text="!stop")))
        assert bot.replied == []

    def test_skip_non_mumble_silent(self):
        """!skip on a non-Mumble bot sends no reply."""
        bot = _FakeBot(mumble=False)
        asyncio.run(_mod.cmd_skip(bot, _Msg(text="!skip")))
        assert bot.replied == []


# ---------------------------------------------------------------------------
# TestPlayCommand
# ---------------------------------------------------------------------------


class TestPlayCommand:
    """Checks on the replies and queue contents produced by !play."""

    def test_play_no_url(self):
        bot = _FakeBot()
        asyncio.run(_mod.cmd_play(bot, _Msg(text="!play")))
        assert any("Usage" in line for line in bot.replied)

    def test_play_queues_track(self):
        bot = _FakeBot()
        request = _Msg(text="!play https://example.com/track")
        resolved = [("https://example.com/track", "Test Track")]
        with (
            patch.object(_mod, "_resolve_tracks", return_value=resolved),
            patch.object(_mod, "_ensure_loop"),
        ):
            asyncio.run(_mod.cmd_play(bot, request))
        assert any("Playing" in line for line in bot.replied)
        state = _mod._ps(bot)
        assert len(state["queue"]) == 1
        assert state["queue"][0].title == "Test Track"

    def test_play_search_query(self):
        bot = _FakeBot()
        request = _Msg(text="!play classical music")
        results = [
            ("https://youtube.com/watch?v=a", "Result 1"),
            ("https://youtube.com/watch?v=b", "Result 2"),
            ("https://youtube.com/watch?v=c", "Result 3"),
        ]
        with (
            patch.object(_mod, "_resolve_tracks", return_value=results) as resolver,
            patch.object(_mod, "_ensure_loop"),
        ):
            asyncio.run(_mod.cmd_play(bot, request))
        # Should prepend ytsearch10: for non-URL input
        resolver.assert_called_once()
        assert resolver.call_args.args[0] == "ytsearch10:classical music"
        # Should pick one random result, not enqueue all
        state = _mod._ps(bot)
        assert len(state["queue"]) == 1
        assert any("Playing" in line for line in bot.replied)

    def test_play_shows_queued_when_busy(self):
        bot = _FakeBot()
        state = _mod._ps(bot)
        state["current"] = _mod._Track(
            url="x", title="Current", requester="Bob",
        )
        request = _Msg(text="!play https://example.com/next")
        resolved = [("https://example.com/next", "Next Track")]
        with patch.object(_mod, "_resolve_tracks", return_value=resolved):
            asyncio.run(_mod.cmd_play(bot, request))
        assert any("Queued" in line for line in bot.replied)

    def test_play_queue_full(self):
        bot = _FakeBot()
        state = _mod._ps(bot)
        # Pre-fill the queue with _MAX_QUEUE entries.
        state["queue"] = [
            _mod._Track(url="x", title="t", requester="a")
            for _ in range(_mod._MAX_QUEUE)
        ]
        request = _Msg(text="!play https://example.com/overflow")
        asyncio.run(_mod.cmd_play(bot, request))
        assert any("full" in line.lower() for line in bot.replied)


# ---------------------------------------------------------------------------
# TestStopCommand
# ---------------------------------------------------------------------------


class TestStopCommand:
    """!stop empties the queue and clears the current track."""

    def test_stop_clears_queue(self):
        bot = _FakeBot()
        state = _mod._ps(bot)
        state["queue"] = [_mod._Track(url="x", title="t", requester="a")]
        state["current"] = _mod._Track(url="y", title="s", requester="b")
        asyncio.run(_mod.cmd_stop(bot, _Msg(text="!stop")))
        assert state["queue"] == []
        assert state["current"] is None
        assert any("Stopped" in line for line in bot.replied)


# ---------------------------------------------------------------------------
# TestSkipCommand
# ---------------------------------------------------------------------------


class TestSkipCommand:
    """Checks on !skip with nothing playing, with a queue, and without one."""

    def test_skip_nothing_playing(self):
        bot = _FakeBot()
        asyncio.run(_mod.cmd_skip(bot, _Msg(text="!skip")))
        assert any("Nothing" in line for line in bot.replied)

    def test_skip_with_queue(self):
        bot = _FakeBot()
        state = _mod._ps(bot)
        state["current"] = _mod._Track(url="a", title="First", requester="x")
        state["queue"] = [_mod._Track(url="b", title="Second", requester="y")]
        # We need to mock the task: a stand-in that reports itself as not done.
        running = MagicMock()
        running.done.return_value = False
        state["task"] = running
        with patch.object(_mod, "_ensure_loop"):
            asyncio.run(_mod.cmd_skip(bot, _Msg(text="!skip")))
        assert any("Skipped" in line for line in bot.replied)
        running.cancel.assert_called_once()

    def test_skip_empty_queue(self):
        bot = _FakeBot()
        state = _mod._ps(bot)
        state["current"] = _mod._Track(url="a", title="Only", requester="x")
        running = MagicMock()
        running.done.return_value = False
        state["task"] = running
        asyncio.run(_mod.cmd_skip(bot, _Msg(text="!skip")))
        assert any("empty" in line.lower() for line in bot.replied)


# ---------------------------------------------------------------------------
# TestQueueCommand
# ---------------------------------------------------------------------------


class TestQueueCommand:
    """Checks on !queue with and without an argument."""

    def test_queue_empty(self):
        bot = _FakeBot()
        asyncio.run(_mod.cmd_queue(bot, _Msg(text="!queue")))
        assert any("empty" in line.lower() for line in bot.replied)

    def test_queue_with_tracks(self):
        bot = _FakeBot()
        state = _mod._ps(bot)
        state["current"] = _mod._Track(url="a", title="Now", requester="x")
        state["queue"] = [
            _mod._Track(url="b", title="Next", requester="y"),
        ]
        asyncio.run(_mod.cmd_queue(bot, _Msg(text="!queue")))
        assert any("Now" in line for line in bot.replied)
        assert any("Next" in line for line in bot.replied)

    def test_queue_with_url_delegates(self):
        bot = _FakeBot()
        request = _Msg(text="!queue https://example.com/track")
        resolved = [("https://example.com/track", "Title")]
        with (
            patch.object(_mod, "_resolve_tracks", return_value=resolved),
            patch.object(_mod, "_ensure_loop"),
        ):
            asyncio.run(_mod.cmd_queue(bot, request))
        # Should have called cmd_play logic
        assert any("Playing" in line or "Queued" in line for line in bot.replied)


# ---------------------------------------------------------------------------
# TestNpCommand
# ---------------------------------------------------------------------------


class TestNpCommand:
    """Checks on the !np reply with and without a current track."""

    def test_np_nothing(self):
        bot = _FakeBot()
        asyncio.run(_mod.cmd_np(bot, _Msg(text="!np")))
        assert any("Nothing" in line for line in bot.replied)

    def test_np_playing(self):
        bot = _FakeBot()
        state = _mod._ps(bot)
        state["current"] = _mod._Track(
            url="x", title="Cool Song", requester="DJ",
        )
        asyncio.run(_mod.cmd_np(bot, _Msg(text="!np")))
        assert any("Cool Song" in line for line in bot.replied)
        assert any("DJ" in line for line in bot.replied)


# ---------------------------------------------------------------------------
# TestVolumeCommand
# ---------------------------------------------------------------------------


class TestVolumeCommand:
    """Checks on !volume: showing, setting, and rejecting bad values."""

    def test_volume_show(self):
        bot = _FakeBot()
        asyncio.run(_mod.cmd_volume(bot, _Msg(text="!volume")))
        assert any("50%" in line for line in bot.replied)

    def test_volume_set(self):
        bot = _FakeBot()
        asyncio.run(_mod.cmd_volume(bot, _Msg(text="!volume 75")))
        state = _mod._ps(bot)
        assert state["volume"] == 75
        assert any("75%" in line for line in bot.replied)

    def test_volume_out_of_range(self):
        bot = _FakeBot()
        asyncio.run(_mod.cmd_volume(bot, _Msg(text="!volume 150")))
        assert any("0-100" in line for line in bot.replied)

    def test_volume_negative(self):
        bot = _FakeBot()
        asyncio.run(_mod.cmd_volume(bot, _Msg(text="!volume -10")))
        assert any("0-100" in line for line in bot.replied)

    def test_volume_invalid(self):
        bot = _FakeBot()
        asyncio.run(_mod.cmd_volume(bot, _Msg(text="!volume abc")))
        assert any("Usage" in line for line in bot.replied)


# ---------------------------------------------------------------------------
# TestPerBotState
# ---------------------------------------------------------------------------


class TestPerBotState:
    """Checks on the per-bot state dict returned by ``_ps``."""

    def test_ps_initializes(self):
        state = _mod._ps(_FakeBot())
        assert state["queue"] == []
        assert state["current"] is None
        assert state["volume"] == 50

    def test_ps_stable_reference(self):
        bot = _FakeBot()
        first = _mod._ps(bot)
        second = _mod._ps(bot)
        assert first is second

    def test_ps_isolated_per_bot(self):
        bot_a = _FakeBot()
        bot_b = _FakeBot()
        _mod._ps(bot_a)["volume"] = 80
        assert _mod._ps(bot_b)["volume"] == 50


# ---------------------------------------------------------------------------
# TestMusicHelpers
# ---------------------------------------------------------------------------


class TestMusicHelpers:
    """Checks on the ``_truncate`` and ``_is_url`` helpers."""

    def test_truncate_short(self):
        assert _mod._truncate("short") == "short"

    def test_truncate_long(self):
        text = "x" * 100
        shortened = _mod._truncate(text)
        assert len(shortened) == 80
        assert shortened.endswith("...")

    def test_is_url_http(self):
        assert _mod._is_url("https://youtube.com/watch?v=abc") is True

    def test_is_url_plain_http(self):
        assert _mod._is_url("http://example.com") is True

    def test_is_url_ytsearch(self):
        assert _mod._is_url("ytsearch:classical music") is True

    def test_is_url_search_query(self):
        assert _mod._is_url("classical music") is False

    def test_is_url_single_word(self):
        assert _mod._is_url("jazz") is False


# ---------------------------------------------------------------------------
# TestPlaylistExpansion
# ---------------------------------------------------------------------------


class TestPlaylistExpansion:
    """Checks on multi-track !play requests and on ``_resolve_tracks`` parsing."""

    def test_enqueue_multiple_tracks(self):
        bot = _FakeBot()
        request = _Msg(text="!play https://example.com/playlist")
        resolved = [
            ("https://example.com/1", "Track 1"),
            ("https://example.com/2", "Track 2"),
            ("https://example.com/3", "Track 3"),
        ]
        with (
            patch.object(_mod, "_resolve_tracks", return_value=resolved),
            patch.object(_mod, "_ensure_loop"),
        ):
            asyncio.run(_mod.cmd_play(bot, request))
        state = _mod._ps(bot)
        assert len(state["queue"]) == 3
        assert any("Queued 3 tracks" in line for line in bot.replied)

    def test_truncate_at_queue_limit(self):
        bot = _FakeBot()
        state = _mod._ps(bot)
        state["current"] = _mod._Track(url="x", title="Playing", requester="a")
        # Fill queue to 2 slots remaining
        state["queue"] = [
            _mod._Track(url="x", title="t", requester="a")
            for _ in range(_mod._MAX_QUEUE - 2)
        ]
        request = _Msg(text="!play https://example.com/playlist")
        resolved = [
            ("https://example.com/1", "Track 1"),
            ("https://example.com/2", "Track 2"),
            ("https://example.com/3", "Track 3"),
            ("https://example.com/4", "Track 4"),
        ]
        with patch.object(_mod, "_resolve_tracks", return_value=resolved):
            asyncio.run(_mod.cmd_play(bot, request))
        assert len(state["queue"]) == _mod._MAX_QUEUE
        assert any("2 of 4" in line for line in bot.replied)

    def test_start_loop_when_idle(self):
        bot = _FakeBot()
        request = _Msg(text="!play https://example.com/playlist")
        resolved = [
            ("https://example.com/1", "Track 1"),
            ("https://example.com/2", "Track 2"),
        ]
        with (
            patch.object(_mod, "_resolve_tracks", return_value=resolved),
            patch.object(_mod, "_ensure_loop") as loop_starter,
        ):
            asyncio.run(_mod.cmd_play(bot, request))
        loop_starter.assert_called_once()

    def test_no_loop_start_when_busy(self):
        bot = _FakeBot()
        state = _mod._ps(bot)
        state["current"] = _mod._Track(url="x", title="Current", requester="a")
        request = _Msg(text="!play https://example.com/playlist")
        resolved = [
            ("https://example.com/1", "Track 1"),
            ("https://example.com/2", "Track 2"),
        ]
        with (
            patch.object(_mod, "_resolve_tracks", return_value=resolved),
            patch.object(_mod, "_ensure_loop") as loop_starter,
        ):
            asyncio.run(_mod.cmd_play(bot, request))
        loop_starter.assert_not_called()

    def test_resolve_tracks_single_video(self):
        """Subprocess returning a single url+title pair."""
        completed = MagicMock(stdout="https://example.com/v1\nSingle Video\n")
        with patch("subprocess.run", return_value=completed):
            tracks = _mod._resolve_tracks("https://example.com/v1")
        assert tracks == [("https://example.com/v1", "Single Video")]

    def test_resolve_tracks_na_url_fallback(self):
        """--flat-playlist prints NA for single videos; use original URL."""
        completed = MagicMock(stdout="NA\nSingle Video\n")
        with patch("subprocess.run", return_value=completed):
            tracks = _mod._resolve_tracks("https://example.com/v1")
        assert tracks == [("https://example.com/v1", "Single Video")]

    def test_resolve_tracks_playlist(self):
        """Subprocess returning multiple url+title pairs."""
        completed = MagicMock(
            stdout=(
                "https://example.com/1\nFirst\n"
                "https://example.com/2\nSecond\n"
            ),
        )
        with patch("subprocess.run", return_value=completed):
            tracks = _mod._resolve_tracks("https://example.com/pl")
        assert len(tracks) == 2
        assert tracks[0] == ("https://example.com/1", "First")
        assert tracks[1] == ("https://example.com/2", "Second")

    def test_resolve_tracks_error_fallback(self):
        """On error, returns [(url, url)]."""
        url = "https://example.com/bad"
        with patch("subprocess.run", side_effect=Exception("fail")):
            tracks = _mod._resolve_tracks(url)
        assert tracks == [(url, url)]

    def test_resolve_tracks_empty_output(self):
        """Empty stdout returns fallback."""
        url = "https://example.com/empty"
        completed = MagicMock(stdout="")
        with patch("subprocess.run", return_value=completed):
            tracks = _mod._resolve_tracks(url)
        assert tracks == [(url, url)]