"""Plugin: music discovery via Last.fm API.""" from __future__ import annotations import asyncio import json import logging import os import random from urllib.parse import urlencode from derp.plugin import command log = logging.getLogger(__name__) _BASE = "https://ws.audioscrobbler.com/2.0/" # -- Config ------------------------------------------------------------------ def _get_api_key(bot) -> str: """Resolve Last.fm API key from env or config.""" return (os.environ.get("LASTFM_API_KEY", "") or bot.config.get("lastfm", {}).get("api_key", "")) # -- API helpers ------------------------------------------------------------- def _api_call(api_key: str, method: str, **params) -> dict: """Blocking Last.fm API call. Run in executor.""" from derp.http import urlopen qs = urlencode({ "method": method, "api_key": api_key, "format": "json", **params, }) url = f"{_BASE}?{qs}" try: resp = urlopen(url, timeout=10) return json.loads(resp.read().decode()) except Exception: log.exception("lastfm: API call failed: %s", method) return {} def _get_similar_artists(api_key: str, artist: str, limit: int = 10) -> list[dict]: """Fetch similar artists for a given artist name.""" data = _api_call(api_key, "artist.getSimilar", artist=artist, limit=str(limit)) artists = data.get("similarartists", {}).get("artist", []) if isinstance(artists, dict): artists = [artists] return artists def _get_top_tags(api_key: str, artist: str) -> list[dict]: """Fetch top tags for an artist.""" data = _api_call(api_key, "artist.getTopTags", artist=artist) tags = data.get("toptags", {}).get("tag", []) if isinstance(tags, dict): tags = [tags] return tags def _get_similar_tracks(api_key: str, artist: str, track: str, limit: int = 10) -> list[dict]: """Fetch similar tracks for a given artist + track.""" data = _api_call(api_key, "track.getSimilar", artist=artist, track=track, limit=str(limit)) tracks = data.get("similartracks", {}).get("track", []) if isinstance(tracks, dict): tracks = [tracks] return tracks def _search_track(api_key: str, query: str, limit: int = 5) -> list[dict]: """Search Last.fm for tracks matching a query.""" data = _api_call(api_key, "track.search", track=query, limit=str(limit)) results = data.get("results", {}).get("trackmatches", {}).get("track", []) if isinstance(results, dict): results = [results] return results # -- Metadata extraction ----------------------------------------------------- def _parse_title(raw_title: str) -> tuple[str, str]: """Split a raw track title into (artist, title). Tries common separators: `` - ``, `` -- ``, `` | ``, `` ~ ``. Returns ``("", raw_title)`` if no separator is found. """ for sep in (" - ", " -- ", " | ", " ~ "): if sep in raw_title: parts = raw_title.split(sep, 1) return (parts[0].strip(), parts[1].strip()) return ("", raw_title) def _current_meta(bot) -> tuple[str, str]: """Extract artist and title from the currently playing track. Returns (artist, title). Either or both may be empty. Tries the music plugin's current track metadata, falling back to splitting the title on common separators. """ music_ps = bot._pstate.get("music", {}) current = music_ps.get("current") if current is None: return ("", "") raw_title = current.title or "" return _parse_title(raw_title) # -- Discovery orchestrator -------------------------------------------------- async def discover_similar(bot, last_track_title: str) -> tuple[str, str] | None: """Find a similar track via Last.fm or MusicBrainz fallback. Returns ``(artist, title)`` or ``None`` if nothing found. """ artist, title = _parse_title(last_track_title) loop = asyncio.get_running_loop() # -- Last.fm path -- api_key = _get_api_key(bot) if api_key and artist: try: similar = await loop.run_in_executor( None, _get_similar_tracks, api_key, artist, title, 20, ) if similar: pick = random.choice(similar) pick_artist = pick.get("artist", {}).get("name", "") pick_title = pick.get("name", "") if pick_artist and pick_title: return (pick_artist, pick_title) except Exception: log.warning("lastfm: discover via Last.fm failed", exc_info=True) # -- MusicBrainz fallback -- if artist: try: from plugins._musicbrainz import ( mb_artist_tags, mb_find_similar_recordings, mb_search_artist, ) mbid = await loop.run_in_executor(None, mb_search_artist, artist) if mbid: tags = await loop.run_in_executor(None, mb_artist_tags, mbid) if tags: picks = await loop.run_in_executor( None, mb_find_similar_recordings, artist, tags, 20, ) if picks: pick = random.choice(picks) return (pick["artist"], pick["title"]) except Exception: log.warning("lastfm: discover via MusicBrainz failed", exc_info=True) return None # -- Formatting -------------------------------------------------------------- def _fmt_match(m: float | str) -> str: """Format a Last.fm match score as a percentage.""" try: return f"{float(m) * 100:.0f}%" except (ValueError, TypeError): return "" # -- Commands ---------------------------------------------------------------- @command("similar", help="Music: !similar [artist|play] -- find similar music") async def cmd_similar(bot, message): """Find similar artists or tracks. Usage: !similar Similar to currently playing track !similar Similar artists to named artist !similar play Queue a random similar track !similar play Queue a similar track for named artist """ api_key = _get_api_key(bot) if not api_key: await bot.reply(message, "Last.fm API key not configured") return parts = message.text.split(None, 2) # !similar play [artist] play_mode = len(parts) >= 2 and parts[1].lower() == "play" if play_mode: query = parts[2].strip() if len(parts) > 2 else "" else: query = parts[1].strip() if len(parts) > 1 else "" import asyncio loop = asyncio.get_running_loop() # Resolve artist from query or current track if query: artist = query title = "" else: artist, title = _current_meta(bot) if not artist and not title: await bot.reply(message, "Nothing playing and no artist given") return # Try track-level similarity first if we have both artist + title similar = [] if artist and title: similar = await loop.run_in_executor( None, _get_similar_tracks, api_key, artist, title, ) # Fall back to artist-level similarity if not similar: search_artist = artist or title similar_artists = await loop.run_in_executor( None, _get_similar_artists, api_key, search_artist, ) if not similar_artists: await bot.reply(message, f"No similar artists found for '{search_artist}'") return if play_mode: # Pick a random similar artist and search YouTube pick = random.choice(similar_artists[:10]) pick_name = pick.get("name", "") if not pick_name: await bot.reply(message, "No playable result found") return # Inject a !play command with a YouTube search message.text = f"!play {pick_name}" music_mod = bot.registry._modules.get("music") if music_mod: await music_mod.cmd_play(bot, message) return # Display similar artists lines = [f"Similar to {search_artist}:"] for a in similar_artists[:8]: name = a.get("name", "?") match = _fmt_match(a.get("match", "")) suffix = f" ({match})" if match else "" lines.append(f" {name}{suffix}") await bot.long_reply(message, lines, label="similar artists") return # Track-level results if play_mode: pick = random.choice(similar[:10]) pick_artist = pick.get("artist", {}).get("name", "") pick_title = pick.get("name", "") search = f"{pick_artist} {pick_title}".strip() if not search: await bot.reply(message, "No playable result found") return message.text = f"!play {search}" music_mod = bot.registry._modules.get("music") if music_mod: await music_mod.cmd_play(bot, message) return # Display similar tracks lines = [f"Similar to {artist} - {title}:"] for t in similar[:8]: t_artist = t.get("artist", {}).get("name", "") t_name = t.get("name", "?") match = _fmt_match(t.get("match", "")) suffix = f" ({match})" if match else "" lines.append(f" {t_artist} - {t_name}{suffix}") await bot.long_reply(message, lines, label="similar tracks") @command("tags", help="Music: !tags [artist] -- show genre tags") async def cmd_tags(bot, message): """Show genre/style tags for an artist. Usage: !tags Tags for currently playing artist !tags Tags for named artist """ api_key = _get_api_key(bot) if not api_key: await bot.reply(message, "Last.fm API key not configured") return parts = message.text.split(None, 1) query = parts[1].strip() if len(parts) > 1 else "" import asyncio loop = asyncio.get_running_loop() if query: artist = query else: artist, title = _current_meta(bot) artist = artist or title if not artist: await bot.reply(message, "Nothing playing and no artist given") return tags = await loop.run_in_executor( None, _get_top_tags, api_key, artist, ) if not tags: await bot.reply(message, f"No tags found for '{artist}'") return # Show top tags with counts tag_names = [t.get("name", "?") for t in tags[:10] if t.get("name")] await bot.reply(message, f"{artist}: {', '.join(tag_names)}")