diff --git a/plugins/lastfm.py b/plugins/lastfm.py index 9c88704..40676bb 100644 --- a/plugins/lastfm.py +++ b/plugins/lastfm.py @@ -105,26 +105,41 @@ def _parse_title(raw_title: str) -> tuple[str, str]: return ("", raw_title) -def _current_meta(bot) -> tuple[str, str]: - """Extract artist and title from the currently playing track. +def _music_bot(bot): + """Return the bot instance that owns music playback. - Returns (artist, title). Either or both may be empty. - Tries the music plugin's current track metadata on this bot first, - then checks peer bots (shared registry) so extra bots can see what - the music bot is playing. + Checks the calling bot first, then peer bots via the shared registry. + Returns the first bot with an active music state, or ``bot`` as fallback. """ - # Check this bot first, then peers candidates = [bot] for peer in getattr(getattr(bot, "registry", None), "_bots", {}).values(): if peer is not bot: candidates.append(peer) for b in candidates: music_ps = getattr(b, "_pstate", {}).get("music", {}) - current = music_ps.get("current") - if current is not None: - raw_title = current.title or "" - if raw_title: - return _parse_title(raw_title) + if music_ps.get("current") is not None or music_ps.get("queue"): + return b + # No active music state -- prefer a bot that allows the music plugin + for b in candidates: + only = getattr(b, "_only_plugins", None) + if only is not None and "music" in only: + return b + return bot + + +def _current_meta(bot) -> tuple[str, str]: + """Extract artist and title from the currently playing track. + + Returns (artist, title). Either or both may be empty. + Checks the music bot (via ``_music_bot``) for now-playing metadata. + """ + mb = _music_bot(bot) + music_ps = getattr(mb, "_pstate", {}).get("music", {}) + current = music_ps.get("current") + if current is not None: + raw_title = current.title or "" + if raw_title: + return _parse_title(raw_title) return ("", "") @@ -410,13 +425,14 @@ async def cmd_similar(bot, message): await bot.reply(message, "No playable tracks resolved") return - # Transition: fade out current, load new playlist - ps = music_mod._ps(bot) - await music_mod._fade_and_cancel(bot, duration=3.0) + # Transition on the music bot (derp), not the calling bot (may be merlin) + dj = _music_bot(bot) + ps = music_mod._ps(dj) + await music_mod._fade_and_cancel(dj, duration=3.0) ps["queue"].clear() ps["current"] = None ps["queue"] = list(tracks) - music_mod._ensure_loop(bot, fade_in=True) + music_mod._ensure_loop(dj, fade_in=True) await bot.reply(message, f"Playing {len(tracks)} similar tracks for {search_artist}") diff --git a/tests/test_lastfm.py b/tests/test_lastfm.py index b807edd..9c586b1 100644 --- a/tests/test_lastfm.py +++ b/tests/test_lastfm.py @@ -2,7 +2,6 @@ import asyncio import importlib.util -import json import sys from dataclasses import dataclass from unittest.mock import AsyncMock, MagicMock, patch @@ -21,14 +20,17 @@ _spec.loader.exec_module(_mod) class _FakeRegistry: def __init__(self): self._modules: dict = {} + self._bots: dict = {} class _FakeBot: - def __init__(self, *, api_key: str = "test-key"): + def __init__(self, *, api_key: str = "test-key", name: str = "derp"): self.replied: list[str] = [] self.config: dict = {"lastfm": {"api_key": api_key}} if api_key else {} self._pstate: dict = {} + self._only_plugins: set[str] | None = None self.registry = _FakeRegistry() + self._username = name async def reply(self, message, text: str) -> None: self.replied.append(text) @@ -615,6 +617,47 @@ class TestCmdSimilar: asyncio.run(_mod.cmd_similar(bot, msg)) assert any("Playing 1 similar" in r for r in bot.replied) + def test_cross_bot_delegates_to_music_bot(self): + """When merlin runs !similar, playback starts on derp (music bot).""" + # derp = music bot (has active music state) + derp = _FakeBot(api_key="test-key", name="derp") + derp._only_plugins = {"music", "voice"} + derp._pstate["music"] = { + "current": _FakeTrack(title="Tool - Lateralus"), + "queue": [], + } + # merlin = calling bot (no music plugin) + merlin = _FakeBot(api_key="test-key", name="merlin") + shared_reg = _FakeRegistry() + shared_reg._bots = {"derp": derp, "merlin": merlin} + derp.registry = shared_reg + merlin.registry = shared_reg + + artists = [{"name": "Deftones", "match": "0.8"}] + fake_tracks = [_FakeTrack(url="http://yt/1", title="Song 1")] + + music_mod = MagicMock() + music_mod._ps.return_value = { + "queue": [], "current": None, "task": None, + } + music_mod._fade_and_cancel = AsyncMock() + music_mod._ensure_loop = MagicMock() + shared_reg._modules["music"] = music_mod + + msg = _Msg(text="!similar Tool") + with patch.object(_mod, "_get_similar_tracks", return_value=[]), \ + patch.object(_mod, "_get_similar_artists", return_value=artists), \ + patch.object(_mod, "_resolve_playlist", + return_value=fake_tracks): + asyncio.run(_mod.cmd_similar(merlin, msg)) + + # Music ops must target derp, not merlin + music_mod._fade_and_cancel.assert_called_once_with(derp, duration=3.0) + music_mod._ensure_loop.assert_called_once_with(derp, fade_in=True) + music_mod._ps.assert_called_with(derp) + # Reply still goes through merlin (the bot the user talked to) + assert any("Playing 1 similar" in r for r in merlin.replied) + # --------------------------------------------------------------------------- # TestCmdTags @@ -694,6 +737,46 @@ class TestCmdTags: assert any("Lateralus:" in r for r in bot.replied) +# --------------------------------------------------------------------------- +# TestMusicBot +# --------------------------------------------------------------------------- + + +class TestMusicBot: + def test_returns_self_when_active(self): + """Returns calling bot when it has active music state.""" + bot = _FakeBot() + bot._pstate["music"] = {"current": _FakeTrack(title="X"), "queue": []} + assert _mod._music_bot(bot) is bot + + def test_returns_peer_with_active_state(self): + """Returns peer bot that has music playing.""" + derp = _FakeBot(name="derp") + derp._pstate["music"] = {"current": _FakeTrack(title="X"), "queue": []} + merlin = _FakeBot(name="merlin") + reg = _FakeRegistry() + reg._bots = {"derp": derp, "merlin": merlin} + derp.registry = reg + merlin.registry = reg + assert _mod._music_bot(merlin) is derp + + def test_falls_back_to_only_plugins(self): + """Returns bot with only_plugins containing 'music' when no active state.""" + derp = _FakeBot(name="derp") + derp._only_plugins = {"music", "voice"} + merlin = _FakeBot(name="merlin") + reg = _FakeRegistry() + reg._bots = {"derp": derp, "merlin": merlin} + derp.registry = reg + merlin.registry = reg + assert _mod._music_bot(merlin) is derp + + def test_returns_self_as_last_resort(self): + """Returns calling bot when no peer has music state or filters.""" + bot = _FakeBot() + assert _mod._music_bot(bot) is bot + + # --------------------------------------------------------------------------- # TestParseTitle # ---------------------------------------------------------------------------