feat: auto-discover similar tracks during autoplay via Last.fm/MusicBrainz

Every Nth autoplay pick (configurable via discover_ratio), query Last.fm
for similar tracks. When Last.fm has no key or returns nothing, fall back
to MusicBrainz tag-based recording search (no API key needed). Discovered
tracks are resolved via yt-dlp and deduplicated within the session. If
discovery fails, the kept-deck shuffle continues as before.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
user
2026-02-23 21:19:41 +01:00
parent 56f6b9822f
commit da9ed51c74
6 changed files with 933 additions and 17 deletions

118
plugins/_musicbrainz.py Normal file
View File

@@ -0,0 +1,118 @@
"""MusicBrainz API helper for music discovery fallback.
Private module (underscore prefix) -- plugin loader skips it.
All functions are blocking; callers should run them in an executor.
"""
from __future__ import annotations
import json
import logging
import time
from urllib.request import Request
log = logging.getLogger(__name__)
_BASE = "https://musicbrainz.org/ws/2"
_UA = "derp-bot/2.0.0 (https://git.mymx.me/username/derp)"
# Rate limit: MusicBrainz requires max 1 request/second.
# We use 1.1s between calls to stay well within limits.
_RATE_INTERVAL = 1.1
_last_request: float = 0.0
def _mb_request(path: str, params: dict | None = None) -> dict:
"""Rate-limited GET to MusicBrainz API. Blocking."""
global _last_request
from derp.http import urlopen
elapsed = time.monotonic() - _last_request
if elapsed < _RATE_INTERVAL:
time.sleep(_RATE_INTERVAL - elapsed)
qs = "&".join(f"{k}={v}" for k, v in (params or {}).items())
url = f"{_BASE}/{path}?fmt=json&{qs}" if qs else f"{_BASE}/{path}?fmt=json"
req = Request(url, headers={"User-Agent": _UA})
try:
resp = urlopen(req, timeout=10, proxy=False)
_last_request = time.monotonic()
return json.loads(resp.read().decode())
except Exception:
_last_request = time.monotonic()
log.warning("musicbrainz: request failed: %s", path, exc_info=True)
return {}
def mb_search_artist(name: str) -> str | None:
"""Search for an artist by name, return MBID or None."""
from urllib.parse import quote
data = _mb_request("artist", {"query": quote(name), "limit": "1"})
artists = data.get("artists", [])
if not artists:
return None
# Require a reasonable score to avoid false matches
score = artists[0].get("score", 0)
if score < 50:
return None
return artists[0].get("id")
def mb_artist_tags(mbid: str) -> list[str]:
"""Fetch top 5 tags for an artist by MBID."""
data = _mb_request(f"artist/{mbid}", {"inc": "tags"})
tags = data.get("tags", [])
if not tags:
return []
# Sort by count descending, take top 5
sorted_tags = sorted(tags, key=lambda t: t.get("count", 0), reverse=True)
return [t["name"] for t in sorted_tags[:5] if t.get("name")]
def mb_find_similar_recordings(artist: str, tags: list[str],
limit: int = 10) -> list[dict]:
"""Find recordings by other artists sharing top tags.
Searches MusicBrainz for recordings tagged with the top 2 tags,
excluding the original artist. Returns [{"artist": str, "title": str}].
"""
from urllib.parse import quote
if not tags:
return []
# Use top 2 tags for the query
tag_query = " AND ".join(f'tag:"{t}"' for t in tags[:2])
query = f'({tag_query}) AND NOT artist:"{artist}"'
data = _mb_request("recording", {
"query": quote(query),
"limit": str(limit),
})
recordings = data.get("recordings", [])
if not recordings:
return []
seen = set()
results = []
for rec in recordings:
title = rec.get("title", "")
credits = rec.get("artist-credit", [])
if not credits or not title:
continue
rec_artist = credits[0].get("name", "") if credits else ""
if not rec_artist:
continue
# Skip the original artist (case-insensitive)
if rec_artist.lower() == artist.lower():
continue
# Deduplicate by artist+title
key = f"{rec_artist.lower()}:{title.lower()}"
if key in seen:
continue
seen.add(key)
results.append({"artist": rec_artist, "title": title})
return results

View File

@@ -2,6 +2,7 @@
from __future__ import annotations
import asyncio
import json
import logging
import os
@@ -91,6 +92,19 @@ def _search_track(api_key: str, query: str,
# -- Metadata extraction -----------------------------------------------------
def _parse_title(raw_title: str) -> tuple[str, str]:
"""Split a raw track title into (artist, title).
Tries common separators: `` - ``, `` -- ``, `` | ``, `` ~ ``.
Returns ``("", raw_title)`` if no separator is found.
"""
for sep in (" - ", " -- ", " | ", " ~ "):
if sep in raw_title:
parts = raw_title.split(sep, 1)
return (parts[0].strip(), parts[1].strip())
return ("", raw_title)
def _current_meta(bot) -> tuple[str, str]:
"""Extract artist and title from the currently playing track.
@@ -103,15 +117,60 @@ def _current_meta(bot) -> tuple[str, str]:
if current is None:
return ("", "")
raw_title = current.title or ""
return _parse_title(raw_title)
# Try common "Artist - Title" patterns
for sep in (" - ", " -- ", " | ", " ~ "):
if sep in raw_title:
parts = raw_title.split(sep, 1)
return (parts[0].strip(), parts[1].strip())
# No separator -- treat whole thing as a search query
return ("", raw_title)
# -- Discovery orchestrator --------------------------------------------------
async def discover_similar(bot, last_track_title: str) -> tuple[str, str] | None:
"""Find a similar track via Last.fm or MusicBrainz fallback.
Returns ``(artist, title)`` or ``None`` if nothing found.
"""
artist, title = _parse_title(last_track_title)
loop = asyncio.get_running_loop()
# -- Last.fm path --
api_key = _get_api_key(bot)
if api_key and artist:
try:
similar = await loop.run_in_executor(
None, _get_similar_tracks, api_key, artist, title, 20,
)
if similar:
pick = random.choice(similar)
pick_artist = pick.get("artist", {}).get("name", "")
pick_title = pick.get("name", "")
if pick_artist and pick_title:
return (pick_artist, pick_title)
except Exception:
log.warning("lastfm: discover via Last.fm failed", exc_info=True)
# -- MusicBrainz fallback --
if artist:
try:
from plugins._musicbrainz import (
mb_artist_tags,
mb_find_similar_recordings,
mb_search_artist,
)
mbid = await loop.run_in_executor(None, mb_search_artist, artist)
if mbid:
tags = await loop.run_in_executor(None, mb_artist_tags, mbid)
if tags:
picks = await loop.run_in_executor(
None, mb_find_similar_recordings, artist, tags, 20,
)
if picks:
pick = random.choice(picks)
return (pick["artist"], pick["title"])
except Exception:
log.warning("lastfm: discover via MusicBrainz failed",
exc_info=True)
return None
# -- Formatting --------------------------------------------------------------

View File

@@ -58,6 +58,8 @@ def _ps(bot):
"history": [],
"autoplay": cfg.get("autoplay", True),
"autoplay_cooldown": cfg.get("autoplay_cooldown", 30),
"discover": cfg.get("discover", True),
"discover_ratio": cfg.get("discover_ratio", 3),
"announce": cfg.get("announce", False),
"paused": None,
"_watcher_task": None,
@@ -575,17 +577,64 @@ async def _play_loop(bot, *, seek: float = 0.0, fade_in: float | bool = True) ->
seek_req = [None]
ps["seek_req"] = seek_req
_autoplay_pool: list[_Track] = [] # shuffled deck, refilled each cycle
_discover_seen: set[str] = set() # "artist:title" dedup within session
_autoplay_count: int = 0 # autoplay picks since loop start
try:
while ps["queue"] or ps.get("autoplay"):
# Autoplay: cooldown + silence wait, then pick next from shuffled deck
if not ps["queue"]:
if not _autoplay_pool:
kept = _load_kept_tracks(bot)
if not kept:
break
random.shuffle(kept)
_autoplay_pool = kept
log.info("music: autoplay shuffled %d kept tracks", len(kept))
_autoplay_count += 1
# -- Discovery attempt on every Nth autoplay pick --
discovered = False
ratio = ps.get("discover_ratio", 3)
if (ps.get("discover") and ratio > 0
and _autoplay_count % ratio == 0
and ps["history"]):
last = ps["history"][-1]
try:
lfm = bot.registry._modules.get("lastfm")
if lfm and hasattr(lfm, "discover_similar"):
pair = await lfm.discover_similar(bot, last.title)
if pair:
a, t = pair
key = f"{a.lower()}:{t.lower()}"
if key not in _discover_seen:
_discover_seen.add(key)
loop = asyncio.get_running_loop()
res = await loop.run_in_executor(
None, _resolve_tracks,
f"{a} {t}", 1,
)
if res:
discovered = True
pick = _Track(
url=res[0][0], title=res[0][1],
requester="discover",
)
log.info(
"music: discovered '%s' "
"similar to '%s'",
pick.title, last.title,
)
except Exception:
log.warning(
"music: discovery failed, using kept deck",
exc_info=True,
)
# -- Kept-deck fallback --
if not discovered:
if not _autoplay_pool:
kept = _load_kept_tracks(bot)
if not kept:
break
random.shuffle(kept)
_autoplay_pool = kept
log.info("music: autoplay shuffled %d kept tracks",
len(kept))
pick = _autoplay_pool.pop(0)
cooldown = ps.get("autoplay_cooldown", 30)
log.info("music: autoplay cooldown %ds before next track",
cooldown)
@@ -602,9 +651,8 @@ async def _play_loop(bot, *, seek: float = 0.0, fade_in: float | bool = True) ->
# Re-check: someone may have queued something or stopped
if ps["queue"]:
continue
pick = _autoplay_pool.pop(0)
ps["queue"].append(pick)
log.info("music: autoplay picked '%s' (%d remaining)",
log.info("music: autoplay queued '%s' (%d pool remaining)",
pick.title, len(_autoplay_pool))
track = ps["queue"].pop(0)
ps["current"] = track

View File

@@ -5,7 +5,7 @@ import importlib.util
import json
import sys
from dataclasses import dataclass
from unittest.mock import MagicMock, patch
from unittest.mock import AsyncMock, MagicMock, patch
# -- Load plugin module directly ---------------------------------------------
@@ -521,3 +521,142 @@ class TestCmdTags:
with patch.object(_mod, "_get_top_tags", return_value=tags):
asyncio.run(_mod.cmd_tags(bot, msg))
assert any("Lateralus:" in r for r in bot.replied)
# ---------------------------------------------------------------------------
# TestParseTitle
# ---------------------------------------------------------------------------
class TestParseTitle:
def test_dash_separator(self):
assert _mod._parse_title("Tool - Lateralus") == ("Tool", "Lateralus")
def test_double_dash(self):
assert _mod._parse_title("Tool -- Lateralus") == ("Tool", "Lateralus")
def test_pipe_separator(self):
assert _mod._parse_title("Tool | Lateralus") == ("Tool", "Lateralus")
def test_tilde_separator(self):
assert _mod._parse_title("Tool ~ Lateralus") == ("Tool", "Lateralus")
def test_no_separator(self):
assert _mod._parse_title("Lateralus") == ("", "Lateralus")
def test_empty_string(self):
assert _mod._parse_title("") == ("", "")
def test_strips_whitespace(self):
assert _mod._parse_title(" Tool - Lateralus ") == ("Tool", "Lateralus")
def test_first_separator_wins(self):
"""Only the first matching separator is used."""
assert _mod._parse_title("A - B - C") == ("A", "B - C")
def test_dash_priority_over_pipe(self):
"""Dash separator is tried before pipe."""
assert _mod._parse_title("A - B | C") == ("A", "B | C")
# ---------------------------------------------------------------------------
# TestDiscoverSimilar
# ---------------------------------------------------------------------------
class TestDiscoverSimilar:
def test_lastfm_path(self):
"""Returns Last.fm result when API key + results available."""
bot = _FakeBot(api_key="test-key")
tracks = [{"name": "Found", "artist": {"name": "Band"}, "match": "0.9"}]
with patch.object(_mod, "_get_similar_tracks", return_value=tracks):
result = asyncio.run(
_mod.discover_similar(bot, "Tool - Lateralus"),
)
assert result == ("Band", "Found")
def test_lastfm_empty_falls_to_musicbrainz(self):
"""Falls back to MusicBrainz when Last.fm returns nothing."""
bot = _FakeBot(api_key="test-key")
mb_picks = [{"artist": "MB Artist", "title": "MB Song"}]
with patch.object(_mod, "_get_similar_tracks", return_value=[]), \
patch("plugins._musicbrainz.mb_search_artist", return_value="mbid"), \
patch("plugins._musicbrainz.mb_artist_tags", return_value=["rock"]), \
patch("plugins._musicbrainz.mb_find_similar_recordings",
return_value=mb_picks):
result = asyncio.run(
_mod.discover_similar(bot, "Tool - Lateralus"),
)
assert result == ("MB Artist", "MB Song")
def test_no_api_key_uses_musicbrainz(self):
"""Skips Last.fm when no API key, goes straight to MusicBrainz."""
bot = _FakeBot(api_key="")
mb_picks = [{"artist": "MB Band", "title": "MB Track"}]
with patch.dict("os.environ", {}, clear=True), \
patch("plugins._musicbrainz.mb_search_artist", return_value="mbid"), \
patch("plugins._musicbrainz.mb_artist_tags", return_value=["rock"]), \
patch("plugins._musicbrainz.mb_find_similar_recordings",
return_value=mb_picks):
result = asyncio.run(
_mod.discover_similar(bot, "Tool - Lateralus"),
)
assert result == ("MB Band", "MB Track")
def test_both_fail_returns_none(self):
"""Returns None when both Last.fm and MusicBrainz fail."""
bot = _FakeBot(api_key="test-key")
with patch.object(_mod, "_get_similar_tracks", return_value=[]), \
patch("plugins._musicbrainz.mb_search_artist", return_value=None):
result = asyncio.run(
_mod.discover_similar(bot, "Tool - Lateralus"),
)
assert result is None
def test_no_artist_returns_none(self):
"""Returns None when title has no artist component."""
bot = _FakeBot(api_key="test-key")
result = asyncio.run(
_mod.discover_similar(bot, "Lateralus"),
)
assert result is None
def test_musicbrainz_import_error_handled(self):
"""Gracefully handles import error for _musicbrainz module."""
bot = _FakeBot(api_key="test-key")
with patch.object(_mod, "_get_similar_tracks", return_value=[]), \
patch.dict("sys.modules", {"plugins._musicbrainz": None}):
result = asyncio.run(
_mod.discover_similar(bot, "Tool - Lateralus"),
)
assert result is None
def test_lastfm_exception_falls_to_musicbrainz(self):
"""Last.fm exception triggers MusicBrainz fallback."""
bot = _FakeBot(api_key="test-key")
mb_picks = [{"artist": "Fallback", "title": "Song"}]
with patch.object(_mod, "_get_similar_tracks",
side_effect=Exception("API down")), \
patch("plugins._musicbrainz.mb_search_artist", return_value="mbid"), \
patch("plugins._musicbrainz.mb_artist_tags", return_value=["rock"]), \
patch("plugins._musicbrainz.mb_find_similar_recordings",
return_value=mb_picks):
result = asyncio.run(
_mod.discover_similar(bot, "Tool - Lateralus"),
)
assert result == ("Fallback", "Song")
def test_lastfm_pick_missing_name_falls_to_musicbrainz(self):
"""Falls to MB when Last.fm result has empty artist/title."""
bot = _FakeBot(api_key="test-key")
tracks = [{"name": "", "artist": {"name": ""}, "match": "0.9"}]
mb_picks = [{"artist": "MB", "title": "Track"}]
with patch.object(_mod, "_get_similar_tracks", return_value=tracks), \
patch("plugins._musicbrainz.mb_search_artist", return_value="mbid"), \
patch("plugins._musicbrainz.mb_artist_tags", return_value=["rock"]), \
patch("plugins._musicbrainz.mb_find_similar_recordings",
return_value=mb_picks):
result = asyncio.run(
_mod.discover_similar(bot, "Tool - Lateralus"),
)
assert result == ("MB", "Track")

View File

@@ -2362,3 +2362,245 @@ class TestKeptRepair:
stored = json.loads(raw)
assert stored["filename"] == "song.webm"
assert (music_dir / "song.webm").is_file()
# ---------------------------------------------------------------------------
# TestAutoplayDiscovery
# ---------------------------------------------------------------------------
class TestAutoplayDiscovery:
"""Tests for the discovery integration in _play_loop autoplay."""
def test_config_defaults(self):
"""Default discover/discover_ratio values are set."""
bot = _FakeBot()
ps = _mod._ps(bot)
assert ps["discover"] is True
assert ps["discover_ratio"] == 3
def test_config_from_toml(self):
"""Config values are read from bot config."""
bot = _FakeBot()
bot.config = {"music": {"discover": False, "discover_ratio": 5}}
# Reset pstate so _ps re-reads config
bot._pstate.clear()
ps = _mod._ps(bot)
assert ps["discover"] is False
assert ps["discover_ratio"] == 5
def test_discovery_triggers_on_ratio(self, tmp_path):
"""Discovery is attempted when autoplay_count is a multiple of ratio."""
bot = _FakeBot()
ps = _mod._ps(bot)
ps["autoplay"] = True
ps["discover"] = True
ps["discover_ratio"] = 1 # trigger every pick
ps["autoplay_cooldown"] = 0
ps["duck_silence"] = 0
# Seed history so discovery has something to reference
ps["history"] = [
_mod._Track(url="x", title="Tool - Lateralus", requester="a"),
]
# Set up kept tracks for fallback pool
music_dir = tmp_path / "music"
music_dir.mkdir()
(music_dir / "a.opus").write_bytes(b"audio")
bot.state.set("music", "keep:1", json.dumps({
"url": "https://example.com/a", "title": "Kept Track",
"filename": "a.opus", "id": 1,
}))
discover_called = []
async def fake_discover(b, title):
discover_called.append(title)
return ("Deftones", "Change")
lastfm_mod = MagicMock()
lastfm_mod.discover_similar = fake_discover
bot.registry._modules = {"lastfm": lastfm_mod}
resolved = [("https://youtube.com/watch?v=x", "Deftones - Change")]
async def _run():
with patch.object(_mod, "_MUSIC_DIR", music_dir), \
patch.object(_mod, "_resolve_tracks", return_value=resolved), \
patch.object(_mod, "_download_track", return_value=None):
task = asyncio.create_task(
_mod._play_loop(bot, seek=0.0, fade_in=False),
)
# Let it pick a track, then cancel
await asyncio.sleep(0.5)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(_run())
assert len(discover_called) >= 1
assert discover_called[0] == "Tool - Lateralus"
def test_discovery_disabled(self, tmp_path):
"""Discovery is skipped when discover=False."""
bot = _FakeBot()
ps = _mod._ps(bot)
ps["autoplay"] = True
ps["discover"] = False
ps["discover_ratio"] = 1
ps["autoplay_cooldown"] = 0
ps["duck_silence"] = 0
ps["history"] = [
_mod._Track(url="x", title="Tool - Lateralus", requester="a"),
]
music_dir = tmp_path / "music"
music_dir.mkdir()
(music_dir / "a.opus").write_bytes(b"audio")
bot.state.set("music", "keep:1", json.dumps({
"url": "https://example.com/a", "title": "Kept Track",
"filename": "a.opus", "id": 1,
}))
discover_called = []
async def fake_discover(b, title):
discover_called.append(title)
return ("X", "Y")
lastfm_mod = MagicMock()
lastfm_mod.discover_similar = fake_discover
bot.registry._modules = {"lastfm": lastfm_mod}
async def _run():
with patch.object(_mod, "_MUSIC_DIR", music_dir), \
patch.object(_mod, "_download_track", return_value=None):
task = asyncio.create_task(
_mod._play_loop(bot, seek=0.0, fade_in=False),
)
await asyncio.sleep(0.5)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(_run())
assert discover_called == []
def test_discovery_dedup(self):
"""Same discovered track is not resolved twice (dedup by seen set)."""
# Unit-test the dedup logic directly: simulate the set-based
# deduplication that _play_loop uses with _discover_seen.
_discover_seen: set[str] = set()
def _would_resolve(artist: str, title: str) -> bool:
key = f"{artist.lower()}:{title.lower()}"
if key in _discover_seen:
return False
_discover_seen.add(key)
return True
assert _would_resolve("Deftones", "Change") is True
assert _would_resolve("Deftones", "Change") is False
assert _would_resolve("deftones", "change") is False
assert _would_resolve("Tool", "Sober") is True
def test_discovery_fallback_to_kept(self, tmp_path):
"""Falls back to kept deck when discovery returns None."""
bot = _FakeBot()
ps = _mod._ps(bot)
ps["autoplay"] = True
ps["discover"] = True
ps["discover_ratio"] = 1
ps["autoplay_cooldown"] = 0
ps["duck_silence"] = 0
ps["history"] = [
_mod._Track(url="x", title="Tool - Lateralus", requester="a"),
]
music_dir = tmp_path / "music"
music_dir.mkdir()
(music_dir / "a.opus").write_bytes(b"audio")
bot.state.set("music", "keep:1", json.dumps({
"url": "https://example.com/a", "title": "Kept Track",
"filename": "a.opus", "id": 1,
}))
async def fake_discover(b, title):
return None
lastfm_mod = MagicMock()
lastfm_mod.discover_similar = fake_discover
bot.registry._modules = {"lastfm": lastfm_mod}
queued_titles = []
async def _run():
with patch.object(_mod, "_MUSIC_DIR", music_dir), \
patch.object(_mod, "_download_track", return_value=None):
task = asyncio.create_task(
_mod._play_loop(bot, seek=0.0, fade_in=False),
)
await asyncio.sleep(0.5)
# Check what was queued -- should be kept track, not discovered
if ps.get("current"):
queued_titles.append(ps["current"].title)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(_run())
# The kept track should have been used as fallback
if queued_titles:
assert queued_titles[0] == "Kept Track"
def test_no_history_skips_discovery(self, tmp_path):
"""Discovery is skipped when history is empty."""
bot = _FakeBot()
ps = _mod._ps(bot)
ps["autoplay"] = True
ps["discover"] = True
ps["discover_ratio"] = 1
ps["autoplay_cooldown"] = 0
ps["duck_silence"] = 0
ps["history"] = []
music_dir = tmp_path / "music"
music_dir.mkdir()
(music_dir / "a.opus").write_bytes(b"audio")
bot.state.set("music", "keep:1", json.dumps({
"url": "https://example.com/a", "title": "Kept Track",
"filename": "a.opus", "id": 1,
}))
discover_called = []
async def fake_discover(b, title):
discover_called.append(title)
return ("X", "Y")
lastfm_mod = MagicMock()
lastfm_mod.discover_similar = fake_discover
bot.registry._modules = {"lastfm": lastfm_mod}
async def _run():
with patch.object(_mod, "_MUSIC_DIR", music_dir), \
patch.object(_mod, "_download_track", return_value=None):
task = asyncio.create_task(
_mod._play_loop(bot, seek=0.0, fade_in=False),
)
await asyncio.sleep(0.5)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(_run())
assert discover_called == []

310
tests/test_musicbrainz.py Normal file
View File

@@ -0,0 +1,310 @@
"""Tests for the MusicBrainz API helper module."""
import importlib.util
import json
import sys
import time
from io import BytesIO
from unittest.mock import MagicMock, patch
# -- Load module directly ----------------------------------------------------
_spec = importlib.util.spec_from_file_location(
"_musicbrainz", "plugins/_musicbrainz.py",
)
_mod = importlib.util.module_from_spec(_spec)
sys.modules["_musicbrainz"] = _mod
_spec.loader.exec_module(_mod)
# -- Helpers -----------------------------------------------------------------
def _make_resp(data: dict) -> MagicMock:
"""Create a fake HTTP response with JSON body."""
resp = MagicMock()
resp.read.return_value = json.dumps(data).encode()
return resp
# ---------------------------------------------------------------------------
# TestMbRequest
# ---------------------------------------------------------------------------
class TestMbRequest:
def setup_method(self):
_mod._last_request = 0.0
def test_returns_parsed_json(self):
resp = _make_resp({"status": "ok"})
with patch("derp.http.urlopen", return_value=resp):
result = _mod._mb_request("artist", {"query": "Tool"})
assert result == {"status": "ok"}
def test_rate_delay_enforced(self):
"""Second call within rate interval triggers sleep."""
_mod._last_request = time.monotonic()
resp = _make_resp({})
slept = []
with patch("derp.http.urlopen", return_value=resp), \
patch.object(_mod.time, "sleep", side_effect=slept.append), \
patch.object(_mod.time, "monotonic", return_value=_mod._last_request + 0.2):
_mod._mb_request("artist", {"query": "X"})
assert len(slept) == 1
assert slept[0] > 0
def test_no_delay_when_interval_elapsed(self):
"""No sleep when enough time has passed since last request."""
_mod._last_request = time.monotonic() - 5.0
resp = _make_resp({})
with patch("derp.http.urlopen", return_value=resp), \
patch.object(_mod.time, "sleep") as mock_sleep:
_mod._mb_request("artist", {"query": "X"})
mock_sleep.assert_not_called()
def test_returns_empty_on_error(self):
with patch("derp.http.urlopen", side_effect=ConnectionError("fail")):
result = _mod._mb_request("artist", {"query": "X"})
assert result == {}
def test_updates_last_request_on_success(self):
_mod._last_request = 0.0
resp = _make_resp({})
with patch("derp.http.urlopen", return_value=resp):
_mod._mb_request("test")
assert _mod._last_request > 0
def test_updates_last_request_on_error(self):
_mod._last_request = 0.0
with patch("derp.http.urlopen", side_effect=Exception("boom")):
_mod._mb_request("test")
assert _mod._last_request > 0
def test_none_params(self):
"""Handles None params without error."""
resp = _make_resp({"ok": True})
with patch("derp.http.urlopen", return_value=resp):
result = _mod._mb_request("test", None)
assert result == {"ok": True}
# ---------------------------------------------------------------------------
# TestMbSearchArtist
# ---------------------------------------------------------------------------
class TestMbSearchArtist:
def setup_method(self):
_mod._last_request = 0.0
def test_returns_mbid(self):
data = {"artists": [{"id": "abc-123", "name": "Tool", "score": 100}]}
with patch.object(_mod, "_mb_request", return_value=data):
result = _mod.mb_search_artist("Tool")
assert result == "abc-123"
def test_returns_none_no_results(self):
with patch.object(_mod, "_mb_request", return_value={"artists": []}):
assert _mod.mb_search_artist("Unknown") is None
def test_returns_none_on_empty_response(self):
with patch.object(_mod, "_mb_request", return_value={}):
assert _mod.mb_search_artist("X") is None
def test_returns_none_low_score(self):
"""Rejects matches with score below 50."""
data = {"artists": [{"id": "low", "name": "Mismatch", "score": 30}]}
with patch.object(_mod, "_mb_request", return_value=data):
assert _mod.mb_search_artist("Tool") is None
def test_returns_none_on_error(self):
with patch.object(_mod, "_mb_request", return_value={}):
assert _mod.mb_search_artist("Error") is None
def test_accepts_high_score(self):
data = {"artists": [{"id": "abc", "name": "Tool", "score": 85}]}
with patch.object(_mod, "_mb_request", return_value=data):
assert _mod.mb_search_artist("Tool") == "abc"
# ---------------------------------------------------------------------------
# TestMbArtistTags
# ---------------------------------------------------------------------------
class TestMbArtistTags:
def setup_method(self):
_mod._last_request = 0.0
def test_returns_sorted_top_5(self):
data = {"tags": [
{"name": "rock", "count": 50},
{"name": "metal", "count": 100},
{"name": "prog", "count": 80},
{"name": "alternative", "count": 60},
{"name": "hard rock", "count": 40},
{"name": "grunge", "count": 30},
]}
with patch.object(_mod, "_mb_request", return_value=data):
result = _mod.mb_artist_tags("mbid-123")
assert len(result) == 5
assert result[0] == "metal"
assert result[1] == "prog"
assert result[2] == "alternative"
assert result[3] == "rock"
assert result[4] == "hard rock"
def test_empty_tags(self):
with patch.object(_mod, "_mb_request", return_value={"tags": []}):
assert _mod.mb_artist_tags("mbid") == []
def test_no_tags_key(self):
with patch.object(_mod, "_mb_request", return_value={}):
assert _mod.mb_artist_tags("mbid") == []
def test_skips_nameless_tags(self):
data = {"tags": [
{"name": "rock", "count": 50},
{"count": 100}, # no name
{"name": "", "count": 80}, # empty name
]}
with patch.object(_mod, "_mb_request", return_value=data):
result = _mod.mb_artist_tags("mbid")
assert result == ["rock"]
def test_fewer_than_5_tags(self):
data = {"tags": [{"name": "jazz", "count": 10}]}
with patch.object(_mod, "_mb_request", return_value=data):
result = _mod.mb_artist_tags("mbid")
assert result == ["jazz"]
# ---------------------------------------------------------------------------
# TestMbFindSimilarRecordings
# ---------------------------------------------------------------------------
class TestMbFindSimilarRecordings:
def setup_method(self):
_mod._last_request = 0.0
def test_returns_dicts(self):
data = {"recordings": [
{
"title": "Song A",
"artist-credit": [{"name": "Other Artist"}],
},
{
"title": "Song B",
"artist-credit": [{"name": "Another Band"}],
},
]}
with patch.object(_mod, "_mb_request", return_value=data):
result = _mod.mb_find_similar_recordings(
"Tool", ["rock", "metal"],
)
assert len(result) == 2
assert result[0] == {"artist": "Other Artist", "title": "Song A"}
assert result[1] == {"artist": "Another Band", "title": "Song B"}
def test_excludes_original_artist(self):
data = {"recordings": [
{
"title": "Own Song",
"artist-credit": [{"name": "Tool"}],
},
{
"title": "Other Song",
"artist-credit": [{"name": "Deftones"}],
},
]}
with patch.object(_mod, "_mb_request", return_value=data):
result = _mod.mb_find_similar_recordings(
"Tool", ["rock"],
)
assert len(result) == 1
assert result[0]["artist"] == "Deftones"
def test_excludes_original_artist_case_insensitive(self):
data = {"recordings": [
{
"title": "Song",
"artist-credit": [{"name": "TOOL"}],
},
]}
with patch.object(_mod, "_mb_request", return_value=data):
result = _mod.mb_find_similar_recordings(
"Tool", ["rock"],
)
assert result == []
def test_deduplicates(self):
data = {"recordings": [
{
"title": "Song A",
"artist-credit": [{"name": "Band X"}],
},
{
"title": "Song A",
"artist-credit": [{"name": "Band X"}],
},
]}
with patch.object(_mod, "_mb_request", return_value=data):
result = _mod.mb_find_similar_recordings(
"Other", ["rock"],
)
assert len(result) == 1
def test_empty_tags(self):
result = _mod.mb_find_similar_recordings("Tool", [])
assert result == []
def test_no_recordings(self):
with patch.object(_mod, "_mb_request", return_value={"recordings": []}):
result = _mod.mb_find_similar_recordings(
"Tool", ["rock"],
)
assert result == []
def test_empty_response(self):
with patch.object(_mod, "_mb_request", return_value={}):
result = _mod.mb_find_similar_recordings(
"Tool", ["rock"],
)
assert result == []
def test_skips_missing_title(self):
data = {"recordings": [
{
"title": "",
"artist-credit": [{"name": "Band"}],
},
]}
with patch.object(_mod, "_mb_request", return_value=data):
result = _mod.mb_find_similar_recordings(
"Other", ["rock"],
)
assert result == []
def test_skips_missing_artist_credit(self):
data = {"recordings": [
{"title": "Song", "artist-credit": []},
{"title": "Song2"},
]}
with patch.object(_mod, "_mb_request", return_value=data):
result = _mod.mb_find_similar_recordings(
"Other", ["rock"],
)
assert result == []
def test_uses_top_two_tags(self):
"""Query should use at most 2 tags."""
with patch.object(_mod, "_mb_request", return_value={}) as mock_req:
_mod.mb_find_similar_recordings(
"Tool", ["rock", "metal", "prog"],
)
call_args = mock_req.call_args
query = call_args[1]["query"] if "query" in (call_args[1] or {}) else call_args[0][1].get("query", "")
# Verify the query contains both tag references
assert "rock" in query or "metal" in query