diff --git a/plugins/_musicbrainz.py b/plugins/_musicbrainz.py new file mode 100644 index 0000000..a529b9a --- /dev/null +++ b/plugins/_musicbrainz.py @@ -0,0 +1,118 @@ +"""MusicBrainz API helper for music discovery fallback. + +Private module (underscore prefix) -- plugin loader skips it. +All functions are blocking; callers should run them in an executor. +""" + +from __future__ import annotations + +import json +import logging +import time +from urllib.request import Request + +log = logging.getLogger(__name__) + +_BASE = "https://musicbrainz.org/ws/2" +_UA = "derp-bot/2.0.0 (https://git.mymx.me/username/derp)" + +# Rate limit: MusicBrainz requires max 1 request/second. +# We use 1.1s between calls to stay well within limits. +_RATE_INTERVAL = 1.1 +_last_request: float = 0.0 + + +def _mb_request(path: str, params: dict | None = None) -> dict: + """Rate-limited GET to MusicBrainz API. Blocking.""" + global _last_request + from derp.http import urlopen + + elapsed = time.monotonic() - _last_request + if elapsed < _RATE_INTERVAL: + time.sleep(_RATE_INTERVAL - elapsed) + + qs = "&".join(f"{k}={v}" for k, v in (params or {}).items()) + url = f"{_BASE}/{path}?fmt=json&{qs}" if qs else f"{_BASE}/{path}?fmt=json" + req = Request(url, headers={"User-Agent": _UA}) + + try: + resp = urlopen(req, timeout=10, proxy=False) + _last_request = time.monotonic() + return json.loads(resp.read().decode()) + except Exception: + _last_request = time.monotonic() + log.warning("musicbrainz: request failed: %s", path, exc_info=True) + return {} + + +def mb_search_artist(name: str) -> str | None: + """Search for an artist by name, return MBID or None.""" + from urllib.parse import quote + + data = _mb_request("artist", {"query": quote(name), "limit": "1"}) + artists = data.get("artists", []) + if not artists: + return None + # Require a reasonable score to avoid false matches + score = artists[0].get("score", 0) + if score < 50: + return None + return artists[0].get("id") + + +def mb_artist_tags(mbid: str) -> list[str]: + """Fetch top 5 tags for an artist by MBID.""" + data = _mb_request(f"artist/{mbid}", {"inc": "tags"}) + tags = data.get("tags", []) + if not tags: + return [] + # Sort by count descending, take top 5 + sorted_tags = sorted(tags, key=lambda t: t.get("count", 0), reverse=True) + return [t["name"] for t in sorted_tags[:5] if t.get("name")] + + +def mb_find_similar_recordings(artist: str, tags: list[str], + limit: int = 10) -> list[dict]: + """Find recordings by other artists sharing top tags. + + Searches MusicBrainz for recordings tagged with the top 2 tags, + excluding the original artist. Returns [{"artist": str, "title": str}]. + """ + from urllib.parse import quote + + if not tags: + return [] + + # Use top 2 tags for the query + tag_query = " AND ".join(f'tag:"{t}"' for t in tags[:2]) + query = f'({tag_query}) AND NOT artist:"{artist}"' + + data = _mb_request("recording", { + "query": quote(query), + "limit": str(limit), + }) + recordings = data.get("recordings", []) + if not recordings: + return [] + + seen = set() + results = [] + for rec in recordings: + title = rec.get("title", "") + credits = rec.get("artist-credit", []) + if not credits or not title: + continue + rec_artist = credits[0].get("name", "") if credits else "" + if not rec_artist: + continue + # Skip the original artist (case-insensitive) + if rec_artist.lower() == artist.lower(): + continue + # Deduplicate by artist+title + key = f"{rec_artist.lower()}:{title.lower()}" + if key in seen: + continue + seen.add(key) + results.append({"artist": rec_artist, "title": title}) + + return results diff --git a/plugins/lastfm.py b/plugins/lastfm.py index 28b37d4..94c36cb 100644 --- a/plugins/lastfm.py +++ b/plugins/lastfm.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio import json import logging import os @@ -91,6 +92,19 @@ def _search_track(api_key: str, query: str, # -- Metadata extraction ----------------------------------------------------- +def _parse_title(raw_title: str) -> tuple[str, str]: + """Split a raw track title into (artist, title). + + Tries common separators: `` - ``, `` -- ``, `` | ``, `` ~ ``. + Returns ``("", raw_title)`` if no separator is found. + """ + for sep in (" - ", " -- ", " | ", " ~ "): + if sep in raw_title: + parts = raw_title.split(sep, 1) + return (parts[0].strip(), parts[1].strip()) + return ("", raw_title) + + def _current_meta(bot) -> tuple[str, str]: """Extract artist and title from the currently playing track. @@ -103,15 +117,60 @@ def _current_meta(bot) -> tuple[str, str]: if current is None: return ("", "") raw_title = current.title or "" + return _parse_title(raw_title) - # Try common "Artist - Title" patterns - for sep in (" - ", " -- ", " | ", " ~ "): - if sep in raw_title: - parts = raw_title.split(sep, 1) - return (parts[0].strip(), parts[1].strip()) - # No separator -- treat whole thing as a search query - return ("", raw_title) +# -- Discovery orchestrator -------------------------------------------------- + + +async def discover_similar(bot, last_track_title: str) -> tuple[str, str] | None: + """Find a similar track via Last.fm or MusicBrainz fallback. + + Returns ``(artist, title)`` or ``None`` if nothing found. + """ + artist, title = _parse_title(last_track_title) + loop = asyncio.get_running_loop() + + # -- Last.fm path -- + api_key = _get_api_key(bot) + if api_key and artist: + try: + similar = await loop.run_in_executor( + None, _get_similar_tracks, api_key, artist, title, 20, + ) + if similar: + pick = random.choice(similar) + pick_artist = pick.get("artist", {}).get("name", "") + pick_title = pick.get("name", "") + if pick_artist and pick_title: + return (pick_artist, pick_title) + except Exception: + log.warning("lastfm: discover via Last.fm failed", exc_info=True) + + # -- MusicBrainz fallback -- + if artist: + try: + from plugins._musicbrainz import ( + mb_artist_tags, + mb_find_similar_recordings, + mb_search_artist, + ) + + mbid = await loop.run_in_executor(None, mb_search_artist, artist) + if mbid: + tags = await loop.run_in_executor(None, mb_artist_tags, mbid) + if tags: + picks = await loop.run_in_executor( + None, mb_find_similar_recordings, artist, tags, 20, + ) + if picks: + pick = random.choice(picks) + return (pick["artist"], pick["title"]) + except Exception: + log.warning("lastfm: discover via MusicBrainz failed", + exc_info=True) + + return None # -- Formatting -------------------------------------------------------------- diff --git a/plugins/music.py b/plugins/music.py index 66cede7..83b0b62 100644 --- a/plugins/music.py +++ b/plugins/music.py @@ -58,6 +58,8 @@ def _ps(bot): "history": [], "autoplay": cfg.get("autoplay", True), "autoplay_cooldown": cfg.get("autoplay_cooldown", 30), + "discover": cfg.get("discover", True), + "discover_ratio": cfg.get("discover_ratio", 3), "announce": cfg.get("announce", False), "paused": None, "_watcher_task": None, @@ -575,17 +577,64 @@ async def _play_loop(bot, *, seek: float = 0.0, fade_in: float | bool = True) -> seek_req = [None] ps["seek_req"] = seek_req _autoplay_pool: list[_Track] = [] # shuffled deck, refilled each cycle + _discover_seen: set[str] = set() # "artist:title" dedup within session + _autoplay_count: int = 0 # autoplay picks since loop start try: while ps["queue"] or ps.get("autoplay"): # Autoplay: cooldown + silence wait, then pick next from shuffled deck if not ps["queue"]: - if not _autoplay_pool: - kept = _load_kept_tracks(bot) - if not kept: - break - random.shuffle(kept) - _autoplay_pool = kept - log.info("music: autoplay shuffled %d kept tracks", len(kept)) + _autoplay_count += 1 + + # -- Discovery attempt on every Nth autoplay pick -- + discovered = False + ratio = ps.get("discover_ratio", 3) + if (ps.get("discover") and ratio > 0 + and _autoplay_count % ratio == 0 + and ps["history"]): + last = ps["history"][-1] + try: + lfm = bot.registry._modules.get("lastfm") + if lfm and hasattr(lfm, "discover_similar"): + pair = await lfm.discover_similar(bot, last.title) + if pair: + a, t = pair + key = f"{a.lower()}:{t.lower()}" + if key not in _discover_seen: + _discover_seen.add(key) + loop = asyncio.get_running_loop() + res = await loop.run_in_executor( + None, _resolve_tracks, + f"{a} {t}", 1, + ) + if res: + discovered = True + pick = _Track( + url=res[0][0], title=res[0][1], + requester="discover", + ) + log.info( + "music: discovered '%s' " + "similar to '%s'", + pick.title, last.title, + ) + except Exception: + log.warning( + "music: discovery failed, using kept deck", + exc_info=True, + ) + + # -- Kept-deck fallback -- + if not discovered: + if not _autoplay_pool: + kept = _load_kept_tracks(bot) + if not kept: + break + random.shuffle(kept) + _autoplay_pool = kept + log.info("music: autoplay shuffled %d kept tracks", + len(kept)) + pick = _autoplay_pool.pop(0) + cooldown = ps.get("autoplay_cooldown", 30) log.info("music: autoplay cooldown %ds before next track", cooldown) @@ -602,9 +651,8 @@ async def _play_loop(bot, *, seek: float = 0.0, fade_in: float | bool = True) -> # Re-check: someone may have queued something or stopped if ps["queue"]: continue - pick = _autoplay_pool.pop(0) ps["queue"].append(pick) - log.info("music: autoplay picked '%s' (%d remaining)", + log.info("music: autoplay queued '%s' (%d pool remaining)", pick.title, len(_autoplay_pool)) track = ps["queue"].pop(0) ps["current"] = track diff --git a/tests/test_lastfm.py b/tests/test_lastfm.py index 4e8e46c..856b445 100644 --- a/tests/test_lastfm.py +++ b/tests/test_lastfm.py @@ -5,7 +5,7 @@ import importlib.util import json import sys from dataclasses import dataclass -from unittest.mock import MagicMock, patch +from unittest.mock import AsyncMock, MagicMock, patch # -- Load plugin module directly --------------------------------------------- @@ -521,3 +521,142 @@ class TestCmdTags: with patch.object(_mod, "_get_top_tags", return_value=tags): asyncio.run(_mod.cmd_tags(bot, msg)) assert any("Lateralus:" in r for r in bot.replied) + + +# --------------------------------------------------------------------------- +# TestParseTitle +# --------------------------------------------------------------------------- + + +class TestParseTitle: + def test_dash_separator(self): + assert _mod._parse_title("Tool - Lateralus") == ("Tool", "Lateralus") + + def test_double_dash(self): + assert _mod._parse_title("Tool -- Lateralus") == ("Tool", "Lateralus") + + def test_pipe_separator(self): + assert _mod._parse_title("Tool | Lateralus") == ("Tool", "Lateralus") + + def test_tilde_separator(self): + assert _mod._parse_title("Tool ~ Lateralus") == ("Tool", "Lateralus") + + def test_no_separator(self): + assert _mod._parse_title("Lateralus") == ("", "Lateralus") + + def test_empty_string(self): + assert _mod._parse_title("") == ("", "") + + def test_strips_whitespace(self): + assert _mod._parse_title(" Tool - Lateralus ") == ("Tool", "Lateralus") + + def test_first_separator_wins(self): + """Only the first matching separator is used.""" + assert _mod._parse_title("A - B - C") == ("A", "B - C") + + def test_dash_priority_over_pipe(self): + """Dash separator is tried before pipe.""" + assert _mod._parse_title("A - B | C") == ("A", "B | C") + + +# --------------------------------------------------------------------------- +# TestDiscoverSimilar +# --------------------------------------------------------------------------- + + +class TestDiscoverSimilar: + def test_lastfm_path(self): + """Returns Last.fm result when API key + results available.""" + bot = _FakeBot(api_key="test-key") + tracks = [{"name": "Found", "artist": {"name": "Band"}, "match": "0.9"}] + with patch.object(_mod, "_get_similar_tracks", return_value=tracks): + result = asyncio.run( + _mod.discover_similar(bot, "Tool - Lateralus"), + ) + assert result == ("Band", "Found") + + def test_lastfm_empty_falls_to_musicbrainz(self): + """Falls back to MusicBrainz when Last.fm returns nothing.""" + bot = _FakeBot(api_key="test-key") + mb_picks = [{"artist": "MB Artist", "title": "MB Song"}] + with patch.object(_mod, "_get_similar_tracks", return_value=[]), \ + patch("plugins._musicbrainz.mb_search_artist", return_value="mbid"), \ + patch("plugins._musicbrainz.mb_artist_tags", return_value=["rock"]), \ + patch("plugins._musicbrainz.mb_find_similar_recordings", + return_value=mb_picks): + result = asyncio.run( + _mod.discover_similar(bot, "Tool - Lateralus"), + ) + assert result == ("MB Artist", "MB Song") + + def test_no_api_key_uses_musicbrainz(self): + """Skips Last.fm when no API key, goes straight to MusicBrainz.""" + bot = _FakeBot(api_key="") + mb_picks = [{"artist": "MB Band", "title": "MB Track"}] + with patch.dict("os.environ", {}, clear=True), \ + patch("plugins._musicbrainz.mb_search_artist", return_value="mbid"), \ + patch("plugins._musicbrainz.mb_artist_tags", return_value=["rock"]), \ + patch("plugins._musicbrainz.mb_find_similar_recordings", + return_value=mb_picks): + result = asyncio.run( + _mod.discover_similar(bot, "Tool - Lateralus"), + ) + assert result == ("MB Band", "MB Track") + + def test_both_fail_returns_none(self): + """Returns None when both Last.fm and MusicBrainz fail.""" + bot = _FakeBot(api_key="test-key") + with patch.object(_mod, "_get_similar_tracks", return_value=[]), \ + patch("plugins._musicbrainz.mb_search_artist", return_value=None): + result = asyncio.run( + _mod.discover_similar(bot, "Tool - Lateralus"), + ) + assert result is None + + def test_no_artist_returns_none(self): + """Returns None when title has no artist component.""" + bot = _FakeBot(api_key="test-key") + result = asyncio.run( + _mod.discover_similar(bot, "Lateralus"), + ) + assert result is None + + def test_musicbrainz_import_error_handled(self): + """Gracefully handles import error for _musicbrainz module.""" + bot = _FakeBot(api_key="test-key") + with patch.object(_mod, "_get_similar_tracks", return_value=[]), \ + patch.dict("sys.modules", {"plugins._musicbrainz": None}): + result = asyncio.run( + _mod.discover_similar(bot, "Tool - Lateralus"), + ) + assert result is None + + def test_lastfm_exception_falls_to_musicbrainz(self): + """Last.fm exception triggers MusicBrainz fallback.""" + bot = _FakeBot(api_key="test-key") + mb_picks = [{"artist": "Fallback", "title": "Song"}] + with patch.object(_mod, "_get_similar_tracks", + side_effect=Exception("API down")), \ + patch("plugins._musicbrainz.mb_search_artist", return_value="mbid"), \ + patch("plugins._musicbrainz.mb_artist_tags", return_value=["rock"]), \ + patch("plugins._musicbrainz.mb_find_similar_recordings", + return_value=mb_picks): + result = asyncio.run( + _mod.discover_similar(bot, "Tool - Lateralus"), + ) + assert result == ("Fallback", "Song") + + def test_lastfm_pick_missing_name_falls_to_musicbrainz(self): + """Falls to MB when Last.fm result has empty artist/title.""" + bot = _FakeBot(api_key="test-key") + tracks = [{"name": "", "artist": {"name": ""}, "match": "0.9"}] + mb_picks = [{"artist": "MB", "title": "Track"}] + with patch.object(_mod, "_get_similar_tracks", return_value=tracks), \ + patch("plugins._musicbrainz.mb_search_artist", return_value="mbid"), \ + patch("plugins._musicbrainz.mb_artist_tags", return_value=["rock"]), \ + patch("plugins._musicbrainz.mb_find_similar_recordings", + return_value=mb_picks): + result = asyncio.run( + _mod.discover_similar(bot, "Tool - Lateralus"), + ) + assert result == ("MB", "Track") diff --git a/tests/test_music.py b/tests/test_music.py index a9dd53a..4cd691b 100644 --- a/tests/test_music.py +++ b/tests/test_music.py @@ -2362,3 +2362,245 @@ class TestKeptRepair: stored = json.loads(raw) assert stored["filename"] == "song.webm" assert (music_dir / "song.webm").is_file() + + +# --------------------------------------------------------------------------- +# TestAutoplayDiscovery +# --------------------------------------------------------------------------- + + +class TestAutoplayDiscovery: + """Tests for the discovery integration in _play_loop autoplay.""" + + def test_config_defaults(self): + """Default discover/discover_ratio values are set.""" + bot = _FakeBot() + ps = _mod._ps(bot) + assert ps["discover"] is True + assert ps["discover_ratio"] == 3 + + def test_config_from_toml(self): + """Config values are read from bot config.""" + bot = _FakeBot() + bot.config = {"music": {"discover": False, "discover_ratio": 5}} + # Reset pstate so _ps re-reads config + bot._pstate.clear() + ps = _mod._ps(bot) + assert ps["discover"] is False + assert ps["discover_ratio"] == 5 + + def test_discovery_triggers_on_ratio(self, tmp_path): + """Discovery is attempted when autoplay_count is a multiple of ratio.""" + bot = _FakeBot() + ps = _mod._ps(bot) + ps["autoplay"] = True + ps["discover"] = True + ps["discover_ratio"] = 1 # trigger every pick + ps["autoplay_cooldown"] = 0 + ps["duck_silence"] = 0 + + # Seed history so discovery has something to reference + ps["history"] = [ + _mod._Track(url="x", title="Tool - Lateralus", requester="a"), + ] + + # Set up kept tracks for fallback pool + music_dir = tmp_path / "music" + music_dir.mkdir() + (music_dir / "a.opus").write_bytes(b"audio") + bot.state.set("music", "keep:1", json.dumps({ + "url": "https://example.com/a", "title": "Kept Track", + "filename": "a.opus", "id": 1, + })) + + discover_called = [] + + async def fake_discover(b, title): + discover_called.append(title) + return ("Deftones", "Change") + + lastfm_mod = MagicMock() + lastfm_mod.discover_similar = fake_discover + bot.registry._modules = {"lastfm": lastfm_mod} + + resolved = [("https://youtube.com/watch?v=x", "Deftones - Change")] + + async def _run(): + with patch.object(_mod, "_MUSIC_DIR", music_dir), \ + patch.object(_mod, "_resolve_tracks", return_value=resolved), \ + patch.object(_mod, "_download_track", return_value=None): + task = asyncio.create_task( + _mod._play_loop(bot, seek=0.0, fade_in=False), + ) + # Let it pick a track, then cancel + await asyncio.sleep(0.5) + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + asyncio.run(_run()) + assert len(discover_called) >= 1 + assert discover_called[0] == "Tool - Lateralus" + + def test_discovery_disabled(self, tmp_path): + """Discovery is skipped when discover=False.""" + bot = _FakeBot() + ps = _mod._ps(bot) + ps["autoplay"] = True + ps["discover"] = False + ps["discover_ratio"] = 1 + ps["autoplay_cooldown"] = 0 + ps["duck_silence"] = 0 + ps["history"] = [ + _mod._Track(url="x", title="Tool - Lateralus", requester="a"), + ] + + music_dir = tmp_path / "music" + music_dir.mkdir() + (music_dir / "a.opus").write_bytes(b"audio") + bot.state.set("music", "keep:1", json.dumps({ + "url": "https://example.com/a", "title": "Kept Track", + "filename": "a.opus", "id": 1, + })) + + discover_called = [] + + async def fake_discover(b, title): + discover_called.append(title) + return ("X", "Y") + + lastfm_mod = MagicMock() + lastfm_mod.discover_similar = fake_discover + bot.registry._modules = {"lastfm": lastfm_mod} + + async def _run(): + with patch.object(_mod, "_MUSIC_DIR", music_dir), \ + patch.object(_mod, "_download_track", return_value=None): + task = asyncio.create_task( + _mod._play_loop(bot, seek=0.0, fade_in=False), + ) + await asyncio.sleep(0.5) + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + asyncio.run(_run()) + assert discover_called == [] + + def test_discovery_dedup(self): + """Same discovered track is not resolved twice (dedup by seen set).""" + # Unit-test the dedup logic directly: simulate the set-based + # deduplication that _play_loop uses with _discover_seen. + _discover_seen: set[str] = set() + + def _would_resolve(artist: str, title: str) -> bool: + key = f"{artist.lower()}:{title.lower()}" + if key in _discover_seen: + return False + _discover_seen.add(key) + return True + + assert _would_resolve("Deftones", "Change") is True + assert _would_resolve("Deftones", "Change") is False + assert _would_resolve("deftones", "change") is False + assert _would_resolve("Tool", "Sober") is True + + def test_discovery_fallback_to_kept(self, tmp_path): + """Falls back to kept deck when discovery returns None.""" + bot = _FakeBot() + ps = _mod._ps(bot) + ps["autoplay"] = True + ps["discover"] = True + ps["discover_ratio"] = 1 + ps["autoplay_cooldown"] = 0 + ps["duck_silence"] = 0 + ps["history"] = [ + _mod._Track(url="x", title="Tool - Lateralus", requester="a"), + ] + + music_dir = tmp_path / "music" + music_dir.mkdir() + (music_dir / "a.opus").write_bytes(b"audio") + bot.state.set("music", "keep:1", json.dumps({ + "url": "https://example.com/a", "title": "Kept Track", + "filename": "a.opus", "id": 1, + })) + + async def fake_discover(b, title): + return None + + lastfm_mod = MagicMock() + lastfm_mod.discover_similar = fake_discover + bot.registry._modules = {"lastfm": lastfm_mod} + + queued_titles = [] + + async def _run(): + with patch.object(_mod, "_MUSIC_DIR", music_dir), \ + patch.object(_mod, "_download_track", return_value=None): + task = asyncio.create_task( + _mod._play_loop(bot, seek=0.0, fade_in=False), + ) + await asyncio.sleep(0.5) + # Check what was queued -- should be kept track, not discovered + if ps.get("current"): + queued_titles.append(ps["current"].title) + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + asyncio.run(_run()) + # The kept track should have been used as fallback + if queued_titles: + assert queued_titles[0] == "Kept Track" + + def test_no_history_skips_discovery(self, tmp_path): + """Discovery is skipped when history is empty.""" + bot = _FakeBot() + ps = _mod._ps(bot) + ps["autoplay"] = True + ps["discover"] = True + ps["discover_ratio"] = 1 + ps["autoplay_cooldown"] = 0 + ps["duck_silence"] = 0 + ps["history"] = [] + + music_dir = tmp_path / "music" + music_dir.mkdir() + (music_dir / "a.opus").write_bytes(b"audio") + bot.state.set("music", "keep:1", json.dumps({ + "url": "https://example.com/a", "title": "Kept Track", + "filename": "a.opus", "id": 1, + })) + + discover_called = [] + + async def fake_discover(b, title): + discover_called.append(title) + return ("X", "Y") + + lastfm_mod = MagicMock() + lastfm_mod.discover_similar = fake_discover + bot.registry._modules = {"lastfm": lastfm_mod} + + async def _run(): + with patch.object(_mod, "_MUSIC_DIR", music_dir), \ + patch.object(_mod, "_download_track", return_value=None): + task = asyncio.create_task( + _mod._play_loop(bot, seek=0.0, fade_in=False), + ) + await asyncio.sleep(0.5) + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + asyncio.run(_run()) + assert discover_called == [] diff --git a/tests/test_musicbrainz.py b/tests/test_musicbrainz.py new file mode 100644 index 0000000..c672720 --- /dev/null +++ b/tests/test_musicbrainz.py @@ -0,0 +1,310 @@ +"""Tests for the MusicBrainz API helper module.""" + +import importlib.util +import json +import sys +import time +from io import BytesIO +from unittest.mock import MagicMock, patch + +# -- Load module directly ---------------------------------------------------- + +_spec = importlib.util.spec_from_file_location( + "_musicbrainz", "plugins/_musicbrainz.py", +) +_mod = importlib.util.module_from_spec(_spec) +sys.modules["_musicbrainz"] = _mod +_spec.loader.exec_module(_mod) + + +# -- Helpers ----------------------------------------------------------------- + + +def _make_resp(data: dict) -> MagicMock: + """Create a fake HTTP response with JSON body.""" + resp = MagicMock() + resp.read.return_value = json.dumps(data).encode() + return resp + + +# --------------------------------------------------------------------------- +# TestMbRequest +# --------------------------------------------------------------------------- + + +class TestMbRequest: + def setup_method(self): + _mod._last_request = 0.0 + + def test_returns_parsed_json(self): + resp = _make_resp({"status": "ok"}) + with patch("derp.http.urlopen", return_value=resp): + result = _mod._mb_request("artist", {"query": "Tool"}) + assert result == {"status": "ok"} + + def test_rate_delay_enforced(self): + """Second call within rate interval triggers sleep.""" + _mod._last_request = time.monotonic() + resp = _make_resp({}) + slept = [] + with patch("derp.http.urlopen", return_value=resp), \ + patch.object(_mod.time, "sleep", side_effect=slept.append), \ + patch.object(_mod.time, "monotonic", return_value=_mod._last_request + 0.2): + _mod._mb_request("artist", {"query": "X"}) + assert len(slept) == 1 + assert slept[0] > 0 + + def test_no_delay_when_interval_elapsed(self): + """No sleep when enough time has passed since last request.""" + _mod._last_request = time.monotonic() - 5.0 + resp = _make_resp({}) + with patch("derp.http.urlopen", return_value=resp), \ + patch.object(_mod.time, "sleep") as mock_sleep: + _mod._mb_request("artist", {"query": "X"}) + mock_sleep.assert_not_called() + + def test_returns_empty_on_error(self): + with patch("derp.http.urlopen", side_effect=ConnectionError("fail")): + result = _mod._mb_request("artist", {"query": "X"}) + assert result == {} + + def test_updates_last_request_on_success(self): + _mod._last_request = 0.0 + resp = _make_resp({}) + with patch("derp.http.urlopen", return_value=resp): + _mod._mb_request("test") + assert _mod._last_request > 0 + + def test_updates_last_request_on_error(self): + _mod._last_request = 0.0 + with patch("derp.http.urlopen", side_effect=Exception("boom")): + _mod._mb_request("test") + assert _mod._last_request > 0 + + def test_none_params(self): + """Handles None params without error.""" + resp = _make_resp({"ok": True}) + with patch("derp.http.urlopen", return_value=resp): + result = _mod._mb_request("test", None) + assert result == {"ok": True} + + +# --------------------------------------------------------------------------- +# TestMbSearchArtist +# --------------------------------------------------------------------------- + + +class TestMbSearchArtist: + def setup_method(self): + _mod._last_request = 0.0 + + def test_returns_mbid(self): + data = {"artists": [{"id": "abc-123", "name": "Tool", "score": 100}]} + with patch.object(_mod, "_mb_request", return_value=data): + result = _mod.mb_search_artist("Tool") + assert result == "abc-123" + + def test_returns_none_no_results(self): + with patch.object(_mod, "_mb_request", return_value={"artists": []}): + assert _mod.mb_search_artist("Unknown") is None + + def test_returns_none_on_empty_response(self): + with patch.object(_mod, "_mb_request", return_value={}): + assert _mod.mb_search_artist("X") is None + + def test_returns_none_low_score(self): + """Rejects matches with score below 50.""" + data = {"artists": [{"id": "low", "name": "Mismatch", "score": 30}]} + with patch.object(_mod, "_mb_request", return_value=data): + assert _mod.mb_search_artist("Tool") is None + + def test_returns_none_on_error(self): + with patch.object(_mod, "_mb_request", return_value={}): + assert _mod.mb_search_artist("Error") is None + + def test_accepts_high_score(self): + data = {"artists": [{"id": "abc", "name": "Tool", "score": 85}]} + with patch.object(_mod, "_mb_request", return_value=data): + assert _mod.mb_search_artist("Tool") == "abc" + + +# --------------------------------------------------------------------------- +# TestMbArtistTags +# --------------------------------------------------------------------------- + + +class TestMbArtistTags: + def setup_method(self): + _mod._last_request = 0.0 + + def test_returns_sorted_top_5(self): + data = {"tags": [ + {"name": "rock", "count": 50}, + {"name": "metal", "count": 100}, + {"name": "prog", "count": 80}, + {"name": "alternative", "count": 60}, + {"name": "hard rock", "count": 40}, + {"name": "grunge", "count": 30}, + ]} + with patch.object(_mod, "_mb_request", return_value=data): + result = _mod.mb_artist_tags("mbid-123") + assert len(result) == 5 + assert result[0] == "metal" + assert result[1] == "prog" + assert result[2] == "alternative" + assert result[3] == "rock" + assert result[4] == "hard rock" + + def test_empty_tags(self): + with patch.object(_mod, "_mb_request", return_value={"tags": []}): + assert _mod.mb_artist_tags("mbid") == [] + + def test_no_tags_key(self): + with patch.object(_mod, "_mb_request", return_value={}): + assert _mod.mb_artist_tags("mbid") == [] + + def test_skips_nameless_tags(self): + data = {"tags": [ + {"name": "rock", "count": 50}, + {"count": 100}, # no name + {"name": "", "count": 80}, # empty name + ]} + with patch.object(_mod, "_mb_request", return_value=data): + result = _mod.mb_artist_tags("mbid") + assert result == ["rock"] + + def test_fewer_than_5_tags(self): + data = {"tags": [{"name": "jazz", "count": 10}]} + with patch.object(_mod, "_mb_request", return_value=data): + result = _mod.mb_artist_tags("mbid") + assert result == ["jazz"] + + +# --------------------------------------------------------------------------- +# TestMbFindSimilarRecordings +# --------------------------------------------------------------------------- + + +class TestMbFindSimilarRecordings: + def setup_method(self): + _mod._last_request = 0.0 + + def test_returns_dicts(self): + data = {"recordings": [ + { + "title": "Song A", + "artist-credit": [{"name": "Other Artist"}], + }, + { + "title": "Song B", + "artist-credit": [{"name": "Another Band"}], + }, + ]} + with patch.object(_mod, "_mb_request", return_value=data): + result = _mod.mb_find_similar_recordings( + "Tool", ["rock", "metal"], + ) + assert len(result) == 2 + assert result[0] == {"artist": "Other Artist", "title": "Song A"} + assert result[1] == {"artist": "Another Band", "title": "Song B"} + + def test_excludes_original_artist(self): + data = {"recordings": [ + { + "title": "Own Song", + "artist-credit": [{"name": "Tool"}], + }, + { + "title": "Other Song", + "artist-credit": [{"name": "Deftones"}], + }, + ]} + with patch.object(_mod, "_mb_request", return_value=data): + result = _mod.mb_find_similar_recordings( + "Tool", ["rock"], + ) + assert len(result) == 1 + assert result[0]["artist"] == "Deftones" + + def test_excludes_original_artist_case_insensitive(self): + data = {"recordings": [ + { + "title": "Song", + "artist-credit": [{"name": "TOOL"}], + }, + ]} + with patch.object(_mod, "_mb_request", return_value=data): + result = _mod.mb_find_similar_recordings( + "Tool", ["rock"], + ) + assert result == [] + + def test_deduplicates(self): + data = {"recordings": [ + { + "title": "Song A", + "artist-credit": [{"name": "Band X"}], + }, + { + "title": "Song A", + "artist-credit": [{"name": "Band X"}], + }, + ]} + with patch.object(_mod, "_mb_request", return_value=data): + result = _mod.mb_find_similar_recordings( + "Other", ["rock"], + ) + assert len(result) == 1 + + def test_empty_tags(self): + result = _mod.mb_find_similar_recordings("Tool", []) + assert result == [] + + def test_no_recordings(self): + with patch.object(_mod, "_mb_request", return_value={"recordings": []}): + result = _mod.mb_find_similar_recordings( + "Tool", ["rock"], + ) + assert result == [] + + def test_empty_response(self): + with patch.object(_mod, "_mb_request", return_value={}): + result = _mod.mb_find_similar_recordings( + "Tool", ["rock"], + ) + assert result == [] + + def test_skips_missing_title(self): + data = {"recordings": [ + { + "title": "", + "artist-credit": [{"name": "Band"}], + }, + ]} + with patch.object(_mod, "_mb_request", return_value=data): + result = _mod.mb_find_similar_recordings( + "Other", ["rock"], + ) + assert result == [] + + def test_skips_missing_artist_credit(self): + data = {"recordings": [ + {"title": "Song", "artist-credit": []}, + {"title": "Song2"}, + ]} + with patch.object(_mod, "_mb_request", return_value=data): + result = _mod.mb_find_similar_recordings( + "Other", ["rock"], + ) + assert result == [] + + def test_uses_top_two_tags(self): + """Query should use at most 2 tags.""" + with patch.object(_mod, "_mb_request", return_value={}) as mock_req: + _mod.mb_find_similar_recordings( + "Tool", ["rock", "metal", "prog"], + ) + call_args = mock_req.call_args + query = call_args[1]["query"] if "query" in (call_args[1] or {}) else call_args[0][1].get("query", "") + # Verify the query contains both tag references + assert "rock" in query or "metal" in query