"""Plugin: music discovery via Last.fm API.""" from __future__ import annotations import asyncio import json import logging import os import random from urllib.parse import urlencode from derp.plugin import command log = logging.getLogger(__name__) _BASE = "https://ws.audioscrobbler.com/2.0/" # -- Config ------------------------------------------------------------------ def _get_api_key(bot) -> str: """Resolve Last.fm API key from env or config.""" return (os.environ.get("LASTFM_API_KEY", "") or bot.config.get("lastfm", {}).get("api_key", "")) # -- API helpers ------------------------------------------------------------- def _api_call(api_key: str, method: str, **params) -> dict: """Blocking Last.fm API call. Run in executor.""" from derp.http import urlopen qs = urlencode({ "method": method, "api_key": api_key, "format": "json", **params, }) url = f"{_BASE}?{qs}" try: resp = urlopen(url, timeout=10) return json.loads(resp.read().decode()) except Exception: log.exception("lastfm: API call failed: %s", method) return {} def _get_similar_artists(api_key: str, artist: str, limit: int = 10) -> list[dict]: """Fetch similar artists for a given artist name.""" data = _api_call(api_key, "artist.getSimilar", artist=artist, limit=str(limit)) artists = data.get("similarartists", {}).get("artist", []) if isinstance(artists, dict): artists = [artists] return artists def _get_top_tags(api_key: str, artist: str) -> list[dict]: """Fetch top tags for an artist.""" data = _api_call(api_key, "artist.getTopTags", artist=artist) tags = data.get("toptags", {}).get("tag", []) if isinstance(tags, dict): tags = [tags] return tags def _get_similar_tracks(api_key: str, artist: str, track: str, limit: int = 10) -> list[dict]: """Fetch similar tracks for a given artist + track.""" data = _api_call(api_key, "track.getSimilar", artist=artist, track=track, limit=str(limit)) tracks = data.get("similartracks", {}).get("track", []) if isinstance(tracks, dict): tracks = [tracks] return tracks def _search_track(api_key: str, query: str, limit: int = 5) -> list[dict]: """Search Last.fm for tracks matching a query.""" data = _api_call(api_key, "track.search", track=query, limit=str(limit)) results = data.get("results", {}).get("trackmatches", {}).get("track", []) if isinstance(results, dict): results = [results] return results # -- Metadata extraction ----------------------------------------------------- def _parse_title(raw_title: str) -> tuple[str, str]: """Split a raw track title into (artist, title). Tries common separators: `` - ``, `` -- ``, `` | ``, `` ~ ``. Returns ``("", raw_title)`` if no separator is found. """ for sep in (" - ", " -- ", " | ", " ~ "): if sep in raw_title: parts = raw_title.split(sep, 1) return (parts[0].strip(), parts[1].strip()) return ("", raw_title) def _current_meta(bot) -> tuple[str, str]: """Extract artist and title from the currently playing track. Returns (artist, title). Either or both may be empty. Tries the music plugin's current track metadata, falling back to splitting the title on common separators. """ music_ps = bot._pstate.get("music", {}) current = music_ps.get("current") if current is None: return ("", "") raw_title = current.title or "" return _parse_title(raw_title) # -- Discovery orchestrator -------------------------------------------------- async def discover_similar(bot, last_track_title: str) -> tuple[str, str] | None: """Find a similar track via Last.fm or MusicBrainz fallback. Returns ``(artist, title)`` or ``None`` if nothing found. """ artist, title = _parse_title(last_track_title) loop = asyncio.get_running_loop() # -- Last.fm path -- api_key = _get_api_key(bot) if api_key and artist: try: similar = await loop.run_in_executor( None, _get_similar_tracks, api_key, artist, title, 20, ) if similar: pick = random.choice(similar) pick_artist = pick.get("artist", {}).get("name", "") pick_title = pick.get("name", "") if pick_artist and pick_title: return (pick_artist, pick_title) except Exception: log.warning("lastfm: discover via Last.fm failed", exc_info=True) # -- MusicBrainz fallback -- if artist: try: from plugins._musicbrainz import ( mb_artist_tags, mb_find_similar_recordings, mb_search_artist, ) mbid = await loop.run_in_executor(None, mb_search_artist, artist) if mbid: tags = await loop.run_in_executor(None, mb_artist_tags, mbid) if tags: picks = await loop.run_in_executor( None, mb_find_similar_recordings, artist, tags, 20, ) if picks: pick = random.choice(picks) return (pick["artist"], pick["title"]) except Exception: log.warning("lastfm: discover via MusicBrainz failed", exc_info=True) return None # -- Formatting -------------------------------------------------------------- def _fmt_match(m: float | str) -> str: """Format a Last.fm match score as a percentage.""" try: return f"{float(m) * 100:.0f}%" except (ValueError, TypeError): return "" # -- Commands ---------------------------------------------------------------- @command("similar", help="Music: !similar [artist|play] -- find similar music") async def cmd_similar(bot, message): """Find similar artists or tracks. Usage: !similar Similar to currently playing track !similar Similar artists to named artist !similar play Queue a random similar track !similar play Queue a similar track for named artist """ api_key = _get_api_key(bot) parts = message.text.split(None, 2) # !similar play [artist] play_mode = len(parts) >= 2 and parts[1].lower() == "play" if play_mode: query = parts[2].strip() if len(parts) > 2 else "" else: query = parts[1].strip() if len(parts) > 1 else "" import asyncio loop = asyncio.get_running_loop() # Resolve artist from query or current track if query: artist = query title = "" else: artist, title = _current_meta(bot) if not artist and not title: await bot.reply(message, "Nothing playing and no artist given") return # -- Last.fm path -- similar = [] similar_artists = [] if api_key: # Try track-level similarity first if we have both artist + title if artist and title: similar = await loop.run_in_executor( None, _get_similar_tracks, api_key, artist, title, ) # Fall back to artist-level similarity if not similar: search_artist = artist or title similar_artists = await loop.run_in_executor( None, _get_similar_artists, api_key, search_artist, ) # -- MusicBrainz fallback -- mb_results = [] if not similar and not similar_artists: search_artist = artist or title try: from plugins._musicbrainz import ( mb_artist_tags, mb_find_similar_recordings, mb_search_artist, ) mbid = await loop.run_in_executor( None, mb_search_artist, search_artist, ) if mbid: tags = await loop.run_in_executor(None, mb_artist_tags, mbid) if tags: mb_results = await loop.run_in_executor( None, mb_find_similar_recordings, search_artist, tags, 20, ) except Exception: log.warning("lastfm: MusicBrainz fallback failed", exc_info=True) # -- Track-level results (Last.fm) -- if similar: if play_mode: pick = random.choice(similar[:10]) pick_artist = pick.get("artist", {}).get("name", "") pick_title = pick.get("name", "") search = f"{pick_artist} {pick_title}".strip() if not search: await bot.reply(message, "No playable result found") return message.text = f"!play {search}" music_mod = bot.registry._modules.get("music") if music_mod: await music_mod.cmd_play(bot, message) return lines = [f"Similar to {artist} - {title}:"] for t in similar[:8]: t_artist = t.get("artist", {}).get("name", "") t_name = t.get("name", "?") match = _fmt_match(t.get("match", "")) suffix = f" ({match})" if match else "" lines.append(f" {t_artist} - {t_name}{suffix}") await bot.long_reply(message, lines, label="similar tracks") return # -- Artist-level results (Last.fm) -- if similar_artists: search_artist = artist or title if play_mode: pick = random.choice(similar_artists[:10]) pick_name = pick.get("name", "") if not pick_name: await bot.reply(message, "No playable result found") return message.text = f"!play {pick_name}" music_mod = bot.registry._modules.get("music") if music_mod: await music_mod.cmd_play(bot, message) return lines = [f"Similar to {search_artist}:"] for a in similar_artists[:8]: name = a.get("name", "?") match = _fmt_match(a.get("match", "")) suffix = f" ({match})" if match else "" lines.append(f" {name}{suffix}") await bot.long_reply(message, lines, label="similar artists") return # -- MusicBrainz results -- if mb_results: search_artist = artist or title if play_mode: pick = random.choice(mb_results[:10]) search = f"{pick['artist']} {pick['title']}".strip() message.text = f"!play {search}" music_mod = bot.registry._modules.get("music") if music_mod: await music_mod.cmd_play(bot, message) return lines = [f"Similar to {search_artist}:"] for r in mb_results[:8]: lines.append(f" {r['artist']} - {r['title']}") await bot.long_reply(message, lines, label="similar tracks") return # Nothing found search_artist = artist or title await bot.reply(message, f"No similar artists found for '{search_artist}'") @command("tags", help="Music: !tags [artist] -- show genre tags") async def cmd_tags(bot, message): """Show genre/style tags for an artist. Usage: !tags Tags for currently playing artist !tags Tags for named artist """ api_key = _get_api_key(bot) parts = message.text.split(None, 1) query = parts[1].strip() if len(parts) > 1 else "" import asyncio loop = asyncio.get_running_loop() if query: artist = query else: artist, title = _current_meta(bot) artist = artist or title if not artist: await bot.reply(message, "Nothing playing and no artist given") return # -- Last.fm path -- tags = [] if api_key: tags = await loop.run_in_executor( None, _get_top_tags, api_key, artist, ) # -- MusicBrainz fallback -- if not tags: try: from plugins._musicbrainz import mb_artist_tags, mb_search_artist mbid = await loop.run_in_executor(None, mb_search_artist, artist) if mbid: mb_tags = await loop.run_in_executor( None, mb_artist_tags, mbid, ) if mb_tags: await bot.reply(message, f"{artist}: {', '.join(mb_tags)}") return except Exception: log.warning("lastfm: MusicBrainz tag fallback failed", exc_info=True) if not tags: await bot.reply(message, f"No tags found for '{artist}'") return # Show top tags with counts tag_names = [t.get("name", "?") for t in tags[:10] if t.get("name")] await bot.reply(message, f"{artist}: {', '.join(tag_names)}")