Non-URL input (e.g. !play classical music) searches YouTube for 10 results and picks one randomly. Also fixes --flat-playlist returning "NA" as the URL for single videos by falling back to the original input URL.
502 lines
17 KiB
Python
502 lines
17 KiB
Python
"""Tests for the music playback plugin."""
|
|
|
|
import asyncio
|
|
import importlib.util
|
|
import sys
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
# -- Load plugin module directly ---------------------------------------------
|
|
|
|
_spec = importlib.util.spec_from_file_location("music", "plugins/music.py")
|
|
_mod = importlib.util.module_from_spec(_spec)
|
|
sys.modules["music"] = _mod
|
|
_spec.loader.exec_module(_mod)
|
|
|
|
|
|
# -- Fakes -------------------------------------------------------------------
|
|
|
|
|
|
class _FakeState:
|
|
def __init__(self):
|
|
self._store: dict[str, dict[str, str]] = {}
|
|
|
|
def get(self, ns: str, key: str) -> str | None:
|
|
return self._store.get(ns, {}).get(key)
|
|
|
|
def set(self, ns: str, key: str, value: str) -> None:
|
|
self._store.setdefault(ns, {})[key] = value
|
|
|
|
def delete(self, ns: str, key: str) -> None:
|
|
self._store.get(ns, {}).pop(key, None)
|
|
|
|
def keys(self, ns: str) -> list[str]:
|
|
return list(self._store.get(ns, {}).keys())
|
|
|
|
|
|
class _FakeBot:
|
|
"""Minimal bot for music plugin testing."""
|
|
|
|
def __init__(self, *, mumble: bool = True):
|
|
self.sent: list[tuple[str, str]] = []
|
|
self.replied: list[str] = []
|
|
self.state = _FakeState()
|
|
self._pstate: dict = {}
|
|
self._tasks: set[asyncio.Task] = set()
|
|
if mumble:
|
|
self.stream_audio = AsyncMock()
|
|
|
|
async def send(self, target: str, text: str) -> None:
|
|
self.sent.append((target, text))
|
|
|
|
async def reply(self, message, text: str) -> None:
|
|
self.replied.append(text)
|
|
|
|
def _is_admin(self, message) -> bool:
|
|
return False
|
|
|
|
def _spawn(self, coro, *, name=None):
|
|
task = asyncio.ensure_future(coro)
|
|
self._tasks.add(task)
|
|
task.add_done_callback(self._tasks.discard)
|
|
return task
|
|
|
|
|
|
class _Msg:
|
|
"""Minimal message object."""
|
|
|
|
def __init__(self, text="!play url", nick="Alice", target="0",
|
|
is_channel=True):
|
|
self.text = text
|
|
self.nick = nick
|
|
self.target = target
|
|
self.is_channel = is_channel
|
|
self.prefix = nick
|
|
self.command = "PRIVMSG"
|
|
self.params = [target, text]
|
|
self.tags = {}
|
|
self.raw = {}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestMumbleGuard
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestMumbleGuard:
|
|
def test_is_mumble_true(self):
|
|
bot = _FakeBot(mumble=True)
|
|
assert _mod._is_mumble(bot) is True
|
|
|
|
def test_is_mumble_false(self):
|
|
bot = _FakeBot(mumble=False)
|
|
assert _mod._is_mumble(bot) is False
|
|
|
|
def test_play_non_mumble(self):
|
|
bot = _FakeBot(mumble=False)
|
|
msg = _Msg(text="!play https://example.com")
|
|
asyncio.run(_mod.cmd_play(bot, msg))
|
|
assert any("Mumble-only" in r for r in bot.replied)
|
|
|
|
def test_stop_non_mumble_silent(self):
|
|
bot = _FakeBot(mumble=False)
|
|
msg = _Msg(text="!stop")
|
|
asyncio.run(_mod.cmd_stop(bot, msg))
|
|
assert bot.replied == []
|
|
|
|
def test_skip_non_mumble_silent(self):
|
|
bot = _FakeBot(mumble=False)
|
|
msg = _Msg(text="!skip")
|
|
asyncio.run(_mod.cmd_skip(bot, msg))
|
|
assert bot.replied == []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestPlayCommand
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestPlayCommand:
|
|
def test_play_no_url(self):
|
|
bot = _FakeBot()
|
|
msg = _Msg(text="!play")
|
|
asyncio.run(_mod.cmd_play(bot, msg))
|
|
assert any("Usage" in r for r in bot.replied)
|
|
|
|
def test_play_queues_track(self):
|
|
bot = _FakeBot()
|
|
msg = _Msg(text="!play https://example.com/track")
|
|
tracks = [("https://example.com/track", "Test Track")]
|
|
with patch.object(_mod, "_resolve_tracks", return_value=tracks):
|
|
with patch.object(_mod, "_ensure_loop"):
|
|
asyncio.run(_mod.cmd_play(bot, msg))
|
|
assert any("Playing" in r for r in bot.replied)
|
|
ps = _mod._ps(bot)
|
|
assert len(ps["queue"]) == 1
|
|
assert ps["queue"][0].title == "Test Track"
|
|
|
|
def test_play_search_query(self):
|
|
bot = _FakeBot()
|
|
msg = _Msg(text="!play classical music")
|
|
tracks = [
|
|
("https://youtube.com/watch?v=a", "Result 1"),
|
|
("https://youtube.com/watch?v=b", "Result 2"),
|
|
("https://youtube.com/watch?v=c", "Result 3"),
|
|
]
|
|
with patch.object(_mod, "_resolve_tracks", return_value=tracks) as mock_rt:
|
|
with patch.object(_mod, "_ensure_loop"):
|
|
asyncio.run(_mod.cmd_play(bot, msg))
|
|
# Should prepend ytsearch10: for non-URL input
|
|
mock_rt.assert_called_once()
|
|
assert mock_rt.call_args[0][0] == "ytsearch10:classical music"
|
|
# Should pick one random result, not enqueue all
|
|
ps = _mod._ps(bot)
|
|
assert len(ps["queue"]) == 1
|
|
assert any("Playing" in r for r in bot.replied)
|
|
|
|
def test_play_shows_queued_when_busy(self):
|
|
bot = _FakeBot()
|
|
ps = _mod._ps(bot)
|
|
ps["current"] = _mod._Track(
|
|
url="x", title="Current", requester="Bob",
|
|
)
|
|
msg = _Msg(text="!play https://example.com/next")
|
|
tracks = [("https://example.com/next", "Next Track")]
|
|
with patch.object(_mod, "_resolve_tracks", return_value=tracks):
|
|
asyncio.run(_mod.cmd_play(bot, msg))
|
|
assert any("Queued" in r for r in bot.replied)
|
|
|
|
def test_play_queue_full(self):
|
|
bot = _FakeBot()
|
|
ps = _mod._ps(bot)
|
|
ps["queue"] = [
|
|
_mod._Track(url="x", title="t", requester="a")
|
|
for _ in range(_mod._MAX_QUEUE)
|
|
]
|
|
msg = _Msg(text="!play https://example.com/overflow")
|
|
asyncio.run(_mod.cmd_play(bot, msg))
|
|
assert any("full" in r.lower() for r in bot.replied)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestStopCommand
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestStopCommand:
|
|
def test_stop_clears_queue(self):
|
|
bot = _FakeBot()
|
|
ps = _mod._ps(bot)
|
|
ps["queue"] = [_mod._Track(url="x", title="t", requester="a")]
|
|
ps["current"] = _mod._Track(url="y", title="s", requester="b")
|
|
msg = _Msg(text="!stop")
|
|
asyncio.run(_mod.cmd_stop(bot, msg))
|
|
assert ps["queue"] == []
|
|
assert ps["current"] is None
|
|
assert any("Stopped" in r for r in bot.replied)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestSkipCommand
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestSkipCommand:
|
|
def test_skip_nothing_playing(self):
|
|
bot = _FakeBot()
|
|
msg = _Msg(text="!skip")
|
|
asyncio.run(_mod.cmd_skip(bot, msg))
|
|
assert any("Nothing" in r for r in bot.replied)
|
|
|
|
def test_skip_with_queue(self):
|
|
bot = _FakeBot()
|
|
ps = _mod._ps(bot)
|
|
ps["current"] = _mod._Track(url="a", title="First", requester="x")
|
|
ps["queue"] = [_mod._Track(url="b", title="Second", requester="y")]
|
|
# We need to mock the task
|
|
mock_task = MagicMock()
|
|
mock_task.done.return_value = False
|
|
ps["task"] = mock_task
|
|
msg = _Msg(text="!skip")
|
|
with patch.object(_mod, "_ensure_loop"):
|
|
asyncio.run(_mod.cmd_skip(bot, msg))
|
|
assert any("Skipped" in r for r in bot.replied)
|
|
mock_task.cancel.assert_called_once()
|
|
|
|
def test_skip_empty_queue(self):
|
|
bot = _FakeBot()
|
|
ps = _mod._ps(bot)
|
|
ps["current"] = _mod._Track(url="a", title="Only", requester="x")
|
|
mock_task = MagicMock()
|
|
mock_task.done.return_value = False
|
|
ps["task"] = mock_task
|
|
msg = _Msg(text="!skip")
|
|
asyncio.run(_mod.cmd_skip(bot, msg))
|
|
assert any("empty" in r.lower() for r in bot.replied)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestQueueCommand
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestQueueCommand:
|
|
def test_queue_empty(self):
|
|
bot = _FakeBot()
|
|
msg = _Msg(text="!queue")
|
|
asyncio.run(_mod.cmd_queue(bot, msg))
|
|
assert any("empty" in r.lower() for r in bot.replied)
|
|
|
|
def test_queue_with_tracks(self):
|
|
bot = _FakeBot()
|
|
ps = _mod._ps(bot)
|
|
ps["current"] = _mod._Track(url="a", title="Now", requester="x")
|
|
ps["queue"] = [
|
|
_mod._Track(url="b", title="Next", requester="y"),
|
|
]
|
|
msg = _Msg(text="!queue")
|
|
asyncio.run(_mod.cmd_queue(bot, msg))
|
|
assert any("Now" in r for r in bot.replied)
|
|
assert any("Next" in r for r in bot.replied)
|
|
|
|
def test_queue_with_url_delegates(self):
|
|
bot = _FakeBot()
|
|
msg = _Msg(text="!queue https://example.com/track")
|
|
tracks = [("https://example.com/track", "Title")]
|
|
with patch.object(_mod, "_resolve_tracks", return_value=tracks):
|
|
with patch.object(_mod, "_ensure_loop"):
|
|
asyncio.run(_mod.cmd_queue(bot, msg))
|
|
# Should have called cmd_play logic
|
|
assert any("Playing" in r or "Queued" in r for r in bot.replied)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestNpCommand
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestNpCommand:
|
|
def test_np_nothing(self):
|
|
bot = _FakeBot()
|
|
msg = _Msg(text="!np")
|
|
asyncio.run(_mod.cmd_np(bot, msg))
|
|
assert any("Nothing" in r for r in bot.replied)
|
|
|
|
def test_np_playing(self):
|
|
bot = _FakeBot()
|
|
ps = _mod._ps(bot)
|
|
ps["current"] = _mod._Track(
|
|
url="x", title="Cool Song", requester="DJ",
|
|
)
|
|
msg = _Msg(text="!np")
|
|
asyncio.run(_mod.cmd_np(bot, msg))
|
|
assert any("Cool Song" in r for r in bot.replied)
|
|
assert any("DJ" in r for r in bot.replied)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestVolumeCommand
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestVolumeCommand:
|
|
def test_volume_show(self):
|
|
bot = _FakeBot()
|
|
msg = _Msg(text="!volume")
|
|
asyncio.run(_mod.cmd_volume(bot, msg))
|
|
assert any("50%" in r for r in bot.replied)
|
|
|
|
def test_volume_set(self):
|
|
bot = _FakeBot()
|
|
msg = _Msg(text="!volume 75")
|
|
asyncio.run(_mod.cmd_volume(bot, msg))
|
|
ps = _mod._ps(bot)
|
|
assert ps["volume"] == 75
|
|
assert any("75%" in r for r in bot.replied)
|
|
|
|
def test_volume_out_of_range(self):
|
|
bot = _FakeBot()
|
|
msg = _Msg(text="!volume 150")
|
|
asyncio.run(_mod.cmd_volume(bot, msg))
|
|
assert any("0-100" in r for r in bot.replied)
|
|
|
|
def test_volume_negative(self):
|
|
bot = _FakeBot()
|
|
msg = _Msg(text="!volume -10")
|
|
asyncio.run(_mod.cmd_volume(bot, msg))
|
|
assert any("0-100" in r for r in bot.replied)
|
|
|
|
def test_volume_invalid(self):
|
|
bot = _FakeBot()
|
|
msg = _Msg(text="!volume abc")
|
|
asyncio.run(_mod.cmd_volume(bot, msg))
|
|
assert any("Usage" in r for r in bot.replied)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestPerBotState
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestPerBotState:
|
|
def test_ps_initializes(self):
|
|
bot = _FakeBot()
|
|
ps = _mod._ps(bot)
|
|
assert ps["queue"] == []
|
|
assert ps["current"] is None
|
|
assert ps["volume"] == 50
|
|
|
|
def test_ps_stable_reference(self):
|
|
bot = _FakeBot()
|
|
ps1 = _mod._ps(bot)
|
|
ps2 = _mod._ps(bot)
|
|
assert ps1 is ps2
|
|
|
|
def test_ps_isolated_per_bot(self):
|
|
bot1 = _FakeBot()
|
|
bot2 = _FakeBot()
|
|
_mod._ps(bot1)["volume"] = 80
|
|
assert _mod._ps(bot2)["volume"] == 50
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestHelpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestMusicHelpers:
|
|
def test_truncate_short(self):
|
|
assert _mod._truncate("short") == "short"
|
|
|
|
def test_truncate_long(self):
|
|
long = "x" * 100
|
|
result = _mod._truncate(long)
|
|
assert len(result) == 80
|
|
assert result.endswith("...")
|
|
|
|
def test_is_url_http(self):
|
|
assert _mod._is_url("https://youtube.com/watch?v=abc") is True
|
|
|
|
def test_is_url_plain_http(self):
|
|
assert _mod._is_url("http://example.com") is True
|
|
|
|
def test_is_url_ytsearch(self):
|
|
assert _mod._is_url("ytsearch:classical music") is True
|
|
|
|
def test_is_url_search_query(self):
|
|
assert _mod._is_url("classical music") is False
|
|
|
|
def test_is_url_single_word(self):
|
|
assert _mod._is_url("jazz") is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestPlaylistExpansion
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestPlaylistExpansion:
|
|
def test_enqueue_multiple_tracks(self):
|
|
bot = _FakeBot()
|
|
msg = _Msg(text="!play https://example.com/playlist")
|
|
tracks = [
|
|
("https://example.com/1", "Track 1"),
|
|
("https://example.com/2", "Track 2"),
|
|
("https://example.com/3", "Track 3"),
|
|
]
|
|
with patch.object(_mod, "_resolve_tracks", return_value=tracks):
|
|
with patch.object(_mod, "_ensure_loop"):
|
|
asyncio.run(_mod.cmd_play(bot, msg))
|
|
ps = _mod._ps(bot)
|
|
assert len(ps["queue"]) == 3
|
|
assert any("Queued 3 tracks" in r for r in bot.replied)
|
|
|
|
def test_truncate_at_queue_limit(self):
|
|
bot = _FakeBot()
|
|
ps = _mod._ps(bot)
|
|
ps["current"] = _mod._Track(url="x", title="Playing", requester="a")
|
|
# Fill queue to 2 slots remaining
|
|
ps["queue"] = [
|
|
_mod._Track(url="x", title="t", requester="a")
|
|
for _ in range(_mod._MAX_QUEUE - 2)
|
|
]
|
|
msg = _Msg(text="!play https://example.com/playlist")
|
|
tracks = [
|
|
("https://example.com/1", "Track 1"),
|
|
("https://example.com/2", "Track 2"),
|
|
("https://example.com/3", "Track 3"),
|
|
("https://example.com/4", "Track 4"),
|
|
]
|
|
with patch.object(_mod, "_resolve_tracks", return_value=tracks):
|
|
asyncio.run(_mod.cmd_play(bot, msg))
|
|
assert len(ps["queue"]) == _mod._MAX_QUEUE
|
|
assert any("2 of 4" in r for r in bot.replied)
|
|
|
|
def test_start_loop_when_idle(self):
|
|
bot = _FakeBot()
|
|
msg = _Msg(text="!play https://example.com/playlist")
|
|
tracks = [
|
|
("https://example.com/1", "Track 1"),
|
|
("https://example.com/2", "Track 2"),
|
|
]
|
|
with patch.object(_mod, "_resolve_tracks", return_value=tracks):
|
|
with patch.object(_mod, "_ensure_loop") as mock_loop:
|
|
asyncio.run(_mod.cmd_play(bot, msg))
|
|
mock_loop.assert_called_once()
|
|
|
|
def test_no_loop_start_when_busy(self):
|
|
bot = _FakeBot()
|
|
ps = _mod._ps(bot)
|
|
ps["current"] = _mod._Track(url="x", title="Current", requester="a")
|
|
msg = _Msg(text="!play https://example.com/playlist")
|
|
tracks = [
|
|
("https://example.com/1", "Track 1"),
|
|
("https://example.com/2", "Track 2"),
|
|
]
|
|
with patch.object(_mod, "_resolve_tracks", return_value=tracks):
|
|
with patch.object(_mod, "_ensure_loop") as mock_loop:
|
|
asyncio.run(_mod.cmd_play(bot, msg))
|
|
mock_loop.assert_not_called()
|
|
|
|
def test_resolve_tracks_single_video(self):
|
|
"""Subprocess returning a single url+title pair."""
|
|
result = MagicMock()
|
|
result.stdout = "https://example.com/v1\nSingle Video\n"
|
|
with patch("subprocess.run", return_value=result):
|
|
tracks = _mod._resolve_tracks("https://example.com/v1")
|
|
assert tracks == [("https://example.com/v1", "Single Video")]
|
|
|
|
def test_resolve_tracks_na_url_fallback(self):
|
|
"""--flat-playlist prints NA for single videos; use original URL."""
|
|
result = MagicMock()
|
|
result.stdout = "NA\nSingle Video\n"
|
|
with patch("subprocess.run", return_value=result):
|
|
tracks = _mod._resolve_tracks("https://example.com/v1")
|
|
assert tracks == [("https://example.com/v1", "Single Video")]
|
|
|
|
def test_resolve_tracks_playlist(self):
|
|
"""Subprocess returning multiple url+title pairs."""
|
|
result = MagicMock()
|
|
result.stdout = (
|
|
"https://example.com/1\nFirst\n"
|
|
"https://example.com/2\nSecond\n"
|
|
)
|
|
with patch("subprocess.run", return_value=result):
|
|
tracks = _mod._resolve_tracks("https://example.com/pl")
|
|
assert len(tracks) == 2
|
|
assert tracks[0] == ("https://example.com/1", "First")
|
|
assert tracks[1] == ("https://example.com/2", "Second")
|
|
|
|
def test_resolve_tracks_error_fallback(self):
|
|
"""On error, returns [(url, url)]."""
|
|
with patch("subprocess.run", side_effect=Exception("fail")):
|
|
tracks = _mod._resolve_tracks("https://example.com/bad")
|
|
assert tracks == [("https://example.com/bad", "https://example.com/bad")]
|
|
|
|
def test_resolve_tracks_empty_output(self):
|
|
"""Empty stdout returns fallback."""
|
|
result = MagicMock()
|
|
result.stdout = ""
|
|
with patch("subprocess.run", return_value=result):
|
|
tracks = _mod._resolve_tracks("https://example.com/empty")
|
|
assert tracks == [("https://example.com/empty", "https://example.com/empty")]
|