"""Plugin: music discovery via Last.fm API.""" from __future__ import annotations import asyncio import json import logging import os import random from urllib.parse import urlencode from derp.plugin import command log = logging.getLogger(__name__) _BASE = "https://ws.audioscrobbler.com/2.0/" # -- Config ------------------------------------------------------------------ def _get_api_key(bot) -> str: """Resolve Last.fm API key from env or config.""" return (os.environ.get("LASTFM_API_KEY", "") or bot.config.get("lastfm", {}).get("api_key", "")) # -- API helpers ------------------------------------------------------------- def _api_call(api_key: str, method: str, **params) -> dict: """Blocking Last.fm API call. Run in executor.""" from derp.http import urlopen qs = urlencode({ "method": method, "api_key": api_key, "format": "json", **params, }) url = f"{_BASE}?{qs}" try: resp = urlopen(url, timeout=10) return json.loads(resp.read().decode()) except Exception: log.exception("lastfm: API call failed: %s", method) return {} def _get_similar_artists(api_key: str, artist: str, limit: int = 10) -> list[dict]: """Fetch similar artists for a given artist name.""" data = _api_call(api_key, "artist.getSimilar", artist=artist, limit=str(limit)) artists = data.get("similarartists", {}).get("artist", []) if isinstance(artists, dict): artists = [artists] return artists def _get_top_tags(api_key: str, artist: str) -> list[dict]: """Fetch top tags for an artist.""" data = _api_call(api_key, "artist.getTopTags", artist=artist) tags = data.get("toptags", {}).get("tag", []) if isinstance(tags, dict): tags = [tags] return tags def _get_similar_tracks(api_key: str, artist: str, track: str, limit: int = 10) -> list[dict]: """Fetch similar tracks for a given artist + track.""" data = _api_call(api_key, "track.getSimilar", artist=artist, track=track, limit=str(limit)) tracks = data.get("similartracks", {}).get("track", []) if isinstance(tracks, dict): tracks = [tracks] return tracks def _search_track(api_key: str, query: str, limit: int = 5) -> list[dict]: """Search Last.fm for tracks matching a query.""" data = _api_call(api_key, "track.search", track=query, limit=str(limit)) results = data.get("results", {}).get("trackmatches", {}).get("track", []) if isinstance(results, dict): results = [results] return results # -- Metadata extraction ----------------------------------------------------- def _parse_title(raw_title: str) -> tuple[str, str]: """Split a raw track title into (artist, title). Tries common separators: `` - ``, `` -- ``, `` | ``, `` ~ ``. Returns ``("", raw_title)`` if no separator is found. """ for sep in (" - ", " -- ", " | ", " ~ "): if sep in raw_title: parts = raw_title.split(sep, 1) return (parts[0].strip(), parts[1].strip()) return ("", raw_title) def _music_bot(bot): """Return the bot instance that owns music playback. Checks the calling bot first, then peer bots via the shared registry. Returns the first bot with an active music state, or ``bot`` as fallback. """ candidates = [bot] for peer in getattr(getattr(bot, "registry", None), "_bots", {}).values(): if peer is not bot: candidates.append(peer) for b in candidates: music_ps = getattr(b, "_pstate", {}).get("music", {}) if music_ps.get("current") is not None or music_ps.get("queue"): return b # No active music state -- prefer a bot that allows the music plugin for b in candidates: only = getattr(b, "_only_plugins", None) if only is not None and "music" in only: return b return bot def _current_meta(bot) -> tuple[str, str]: """Extract artist and title from the currently playing track. Returns (artist, title). Either or both may be empty. Checks the music bot (via ``_music_bot``) for now-playing metadata. """ mb = _music_bot(bot) music_ps = getattr(mb, "_pstate", {}).get("music", {}) current = music_ps.get("current") if current is not None: raw_title = current.title or "" if raw_title: return _parse_title(raw_title) return ("", "") # -- Discovery orchestrator -------------------------------------------------- async def discover_similar(bot, last_track_title: str) -> tuple[str, str] | None: """Find a similar track via Last.fm or MusicBrainz fallback. Returns ``(artist, title)`` or ``None`` if nothing found. """ artist, title = _parse_title(last_track_title) loop = asyncio.get_running_loop() # -- Last.fm path -- api_key = _get_api_key(bot) if api_key and artist: try: similar = await loop.run_in_executor( None, _get_similar_tracks, api_key, artist, title, 20, ) if similar: pick = random.choice(similar) pick_artist = pick.get("artist", {}).get("name", "") pick_title = pick.get("name", "") if pick_artist and pick_title: return (pick_artist, pick_title) except Exception: log.warning("lastfm: discover via Last.fm failed", exc_info=True) # -- MusicBrainz fallback -- if artist: try: from plugins._musicbrainz import ( mb_artist_tags, mb_find_similar_recordings, mb_search_artist, ) mbid = await loop.run_in_executor(None, mb_search_artist, artist) if mbid: tags = await loop.run_in_executor(None, mb_artist_tags, mbid) if tags: picks = await loop.run_in_executor( None, mb_find_similar_recordings, artist, tags, 20, ) if picks: pick = random.choice(picks) return (pick["artist"], pick["title"]) except Exception: log.warning("lastfm: discover via MusicBrainz failed", exc_info=True) return None # -- Formatting -------------------------------------------------------------- def _fmt_match(m: float | str) -> str: """Format a Last.fm match score as a percentage.""" try: return f"{float(m) * 100:.0f}%" except (ValueError, TypeError): return "" # -- Playlist helpers -------------------------------------------------------- def _search_queries(similar: list[dict], similar_artists: list[dict], mb_results: list[dict], limit: int = 10) -> list[str]: """Normalize discovery results into YouTube search strings. Processes track results (``{artist: {name}, name}``), artist results (``{name}``), and MusicBrainz results (``{artist, title}``) into a flat list of search query strings, up to *limit*. """ queries: list[str] = [] for t in similar: a = t.get("artist", {}).get("name", "") n = t.get("name", "") q = f"{a} {n}".strip() if q: queries.append(q) for a in similar_artists: name = a.get("name", "") if name: queries.append(name) for r in mb_results: q = f"{r.get('artist', '')} {r.get('title', '')}".strip() if q: queries.append(q) return queries[:limit] async def _resolve_playlist(bot, queries: list[str], requester: str) -> list: """Resolve search queries to Track objects via yt-dlp in parallel. Uses the music plugin's ``_resolve_tracks`` and ``_Track`` to build a playlist. Returns a list of ``_Track`` objects (empty on failure). """ music_mod = bot.registry._modules.get("music") if not music_mod: return [] loop = asyncio.get_running_loop() resolve = music_mod._resolve_tracks Track = music_mod._Track pool = _get_yt_pool() async def _resolve_one(query: str): try: pairs = await loop.run_in_executor( pool, resolve, f"ytsearch1:{query}", 1, ) if pairs: url, title = pairs[0] return Track(url=url, title=title, requester=requester) except Exception: log.debug("lastfm: resolve failed for %r", query) return None tasks = [_resolve_one(q) for q in queries] results = await asyncio.gather(*tasks) return [t for t in results if t is not None] _yt_pool = None def _get_yt_pool(): """Lazy-init a shared ThreadPoolExecutor for yt-dlp resolution.""" global _yt_pool if _yt_pool is None: from concurrent.futures import ThreadPoolExecutor _yt_pool = ThreadPoolExecutor(max_workers=4) return _yt_pool async def _display_results(bot, message, similar: list[dict], similar_artists: list[dict], mb_results: list[dict], artist: str, title: str) -> None: """Format and display discovery results (list mode).""" if similar: lines = [f"Similar to {artist} - {title}:"] for t in similar[:8]: t_artist = t.get("artist", {}).get("name", "") t_name = t.get("name", "?") match = _fmt_match(t.get("match", "")) suffix = f" ({match})" if match else "" lines.append(f" {t_artist} - {t_name}{suffix}") await bot.long_reply(message, lines, label="similar tracks") return search_artist = artist or title if similar_artists: lines = [f"Similar to {search_artist}:"] for a in similar_artists[:8]: name = a.get("name", "?") match = _fmt_match(a.get("match", "")) suffix = f" ({match})" if match else "" lines.append(f" {name}{suffix}") await bot.long_reply(message, lines, label="similar artists") return if mb_results: lines = [f"Similar to {search_artist}:"] for r in mb_results[:8]: lines.append(f" {r['artist']} - {r['title']}") await bot.long_reply(message, lines, label="similar tracks") return await bot.reply(message, f"No similar artists found for '{search_artist}'") # -- Commands ---------------------------------------------------------------- @command("similar", help="Music: !similar [list] [artist] -- discover & play similar music") async def cmd_similar(bot, message): """Discover and play similar music. Usage: !similar Discover + play similar to current track !similar Discover + play similar to named artist !similar list Show similar (display only) !similar list Show similar for named artist """ api_key = _get_api_key(bot) parts = message.text.split(None, 2) # !similar list [artist] list_mode = len(parts) >= 2 and parts[1].lower() == "list" if list_mode: query = parts[2].strip() if len(parts) > 2 else "" else: query = parts[1].strip() if len(parts) > 1 else "" loop = asyncio.get_running_loop() # Resolve artist from query or current track if query: artist = query title = "" else: artist, title = _current_meta(bot) if not artist and not title: await bot.reply(message, "Nothing playing and no artist given") return # -- Last.fm path -- similar: list[dict] = [] similar_artists: list[dict] = [] if api_key: # Try track-level similarity first if we have both artist + title if artist and title: similar = await loop.run_in_executor( None, _get_similar_tracks, api_key, artist, title, ) # Fall back to artist-level similarity if not similar: search_artist = artist or title similar_artists = await loop.run_in_executor( None, _get_similar_artists, api_key, search_artist, ) # -- MusicBrainz fallback -- mb_results: list[dict] = [] if not similar and not similar_artists: search_artist = artist or title try: from plugins._musicbrainz import ( mb_artist_tags, mb_find_similar_recordings, mb_search_artist, ) mbid = await loop.run_in_executor( None, mb_search_artist, search_artist, ) if mbid: tags = await loop.run_in_executor(None, mb_artist_tags, mbid) if tags: mb_results = await loop.run_in_executor( None, mb_find_similar_recordings, search_artist, tags, 20, ) except Exception: log.warning("lastfm: MusicBrainz fallback failed", exc_info=True) # Nothing found at all if not similar and not similar_artists and not mb_results: search_artist = artist or title await bot.reply(message, f"No similar artists found for '{search_artist}'") return # -- List mode (display only) -- if list_mode: await _display_results(bot, message, similar, similar_artists, mb_results, artist, title) return # -- Play mode (default): build playlist and transition -- search_artist = artist or title queries = _search_queries(similar, similar_artists, mb_results, limit=10) if not queries: await bot.reply(message, f"No similar artists found for '{search_artist}'") return music_mod = bot.registry._modules.get("music") if not music_mod: # No music plugin -- fall back to display await _display_results(bot, message, similar, similar_artists, mb_results, artist, title) return await bot.reply(message, f"Discovering similar to {search_artist}...") tracks = await _resolve_playlist(bot, queries, message.nick) if not tracks: await bot.reply(message, "No playable tracks resolved") return # Transition on the music bot (derp), not the calling bot (may be merlin) dj = _music_bot(bot) ps = music_mod._ps(dj) await music_mod._fade_and_cancel(dj, duration=3.0) ps["queue"].clear() ps["current"] = None ps["queue"] = list(tracks) music_mod._ensure_loop(dj, fade_in=True) await bot.reply(message, f"Playing {len(tracks)} similar tracks for {search_artist}") @command("tags", help="Music: !tags [artist] -- show genre tags") async def cmd_tags(bot, message): """Show genre/style tags for an artist. Usage: !tags Tags for currently playing artist !tags Tags for named artist """ api_key = _get_api_key(bot) parts = message.text.split(None, 1) query = parts[1].strip() if len(parts) > 1 else "" loop = asyncio.get_running_loop() if query: artist = query else: artist, title = _current_meta(bot) artist = artist or title if not artist: await bot.reply(message, "Nothing playing and no artist given") return # -- Last.fm path -- tags = [] if api_key: tags = await loop.run_in_executor( None, _get_top_tags, api_key, artist, ) # -- MusicBrainz fallback -- if not tags: try: from plugins._musicbrainz import mb_artist_tags, mb_search_artist mbid = await loop.run_in_executor(None, mb_search_artist, artist) if mbid: mb_tags = await loop.run_in_executor( None, mb_artist_tags, mbid, ) if mb_tags: await bot.reply(message, f"{artist}: {', '.join(mb_tags)}") return except Exception: log.warning("lastfm: MusicBrainz tag fallback failed", exc_info=True) if not tags: await bot.reply(message, f"No tags found for '{artist}'") return # Show top tags with counts tag_names = [t.get("name", "?") for t in tags[:10] if t.get("name")] await bot.reply(message, f"{artist}: {', '.join(tag_names)}")