"""Plugin: music discovery via Last.fm API.""" from __future__ import annotations import asyncio import json import logging import os import random from urllib.parse import urlencode from derp.plugin import command log = logging.getLogger(__name__) _BASE = "https://ws.audioscrobbler.com/2.0/" # -- Config ------------------------------------------------------------------ def _get_api_key(bot) -> str: """Resolve Last.fm API key from env or config.""" return (os.environ.get("LASTFM_API_KEY", "") or bot.config.get("lastfm", {}).get("api_key", "")) # -- API helpers ------------------------------------------------------------- def _api_call(api_key: str, method: str, **params) -> dict: """Blocking Last.fm API call. Run in executor.""" from derp.http import urlopen qs = urlencode({ "method": method, "api_key": api_key, "format": "json", **params, }) url = f"{_BASE}?{qs}" try: resp = urlopen(url, timeout=10) return json.loads(resp.read().decode()) except Exception: log.exception("lastfm: API call failed: %s", method) return {} def _get_similar_artists(api_key: str, artist: str, limit: int = 10) -> list[dict]: """Fetch similar artists for a given artist name.""" data = _api_call(api_key, "artist.getSimilar", artist=artist, limit=str(limit)) artists = data.get("similarartists", {}).get("artist", []) if isinstance(artists, dict): artists = [artists] return artists def _get_top_tags(api_key: str, artist: str) -> list[dict]: """Fetch top tags for an artist.""" data = _api_call(api_key, "artist.getTopTags", artist=artist) tags = data.get("toptags", {}).get("tag", []) if isinstance(tags, dict): tags = [tags] return tags def _get_similar_tracks(api_key: str, artist: str, track: str, limit: int = 10) -> list[dict]: """Fetch similar tracks for a given artist + track.""" data = _api_call(api_key, "track.getSimilar", artist=artist, track=track, limit=str(limit)) tracks = data.get("similartracks", {}).get("track", []) if isinstance(tracks, dict): tracks = [tracks] return tracks def _search_track(api_key: str, query: str, limit: int = 5) -> list[dict]: """Search Last.fm for tracks matching a query.""" data = _api_call(api_key, "track.search", track=query, limit=str(limit)) results = data.get("results", {}).get("trackmatches", {}).get("track", []) if isinstance(results, dict): results = [results] return results # -- Metadata extraction ----------------------------------------------------- def _parse_title(raw_title: str) -> tuple[str, str]: """Split a raw track title into (artist, title). Tries common separators: `` - ``, `` -- ``, `` | ``, `` ~ ``. Returns ``("", raw_title)`` if no separator is found. """ for sep in (" - ", " -- ", " | ", " ~ "): if sep in raw_title: parts = raw_title.split(sep, 1) return (parts[0].strip(), parts[1].strip()) return ("", raw_title) def _current_meta(bot) -> tuple[str, str]: """Extract artist and title from the currently playing track. Returns (artist, title). Either or both may be empty. Tries the music plugin's current track metadata on this bot first, then checks peer bots (shared registry) so extra bots can see what the music bot is playing. """ # Check this bot first, then peers candidates = [bot] for peer in getattr(getattr(bot, "registry", None), "_bots", {}).values(): if peer is not bot: candidates.append(peer) for b in candidates: music_ps = getattr(b, "_pstate", {}).get("music", {}) current = music_ps.get("current") if current is not None: raw_title = current.title or "" if raw_title: return _parse_title(raw_title) return ("", "") # -- Discovery orchestrator -------------------------------------------------- async def discover_similar(bot, last_track_title: str) -> tuple[str, str] | None: """Find a similar track via Last.fm or MusicBrainz fallback. Returns ``(artist, title)`` or ``None`` if nothing found. """ artist, title = _parse_title(last_track_title) loop = asyncio.get_running_loop() # -- Last.fm path -- api_key = _get_api_key(bot) if api_key and artist: try: similar = await loop.run_in_executor( None, _get_similar_tracks, api_key, artist, title, 20, ) if similar: pick = random.choice(similar) pick_artist = pick.get("artist", {}).get("name", "") pick_title = pick.get("name", "") if pick_artist and pick_title: return (pick_artist, pick_title) except Exception: log.warning("lastfm: discover via Last.fm failed", exc_info=True) # -- MusicBrainz fallback -- if artist: try: from plugins._musicbrainz import ( mb_artist_tags, mb_find_similar_recordings, mb_search_artist, ) mbid = await loop.run_in_executor(None, mb_search_artist, artist) if mbid: tags = await loop.run_in_executor(None, mb_artist_tags, mbid) if tags: picks = await loop.run_in_executor( None, mb_find_similar_recordings, artist, tags, 20, ) if picks: pick = random.choice(picks) return (pick["artist"], pick["title"]) except Exception: log.warning("lastfm: discover via MusicBrainz failed", exc_info=True) return None # -- Formatting -------------------------------------------------------------- def _fmt_match(m: float | str) -> str: """Format a Last.fm match score as a percentage.""" try: return f"{float(m) * 100:.0f}%" except (ValueError, TypeError): return "" # -- Playlist helpers -------------------------------------------------------- def _search_queries(similar: list[dict], similar_artists: list[dict], mb_results: list[dict], limit: int = 10) -> list[str]: """Normalize discovery results into YouTube search strings. Processes track results (``{artist: {name}, name}``), artist results (``{name}``), and MusicBrainz results (``{artist, title}``) into a flat list of search query strings, up to *limit*. """ queries: list[str] = [] for t in similar: a = t.get("artist", {}).get("name", "") n = t.get("name", "") q = f"{a} {n}".strip() if q: queries.append(q) for a in similar_artists: name = a.get("name", "") if name: queries.append(name) for r in mb_results: q = f"{r.get('artist', '')} {r.get('title', '')}".strip() if q: queries.append(q) return queries[:limit] async def _resolve_playlist(bot, queries: list[str], requester: str) -> list: """Resolve search queries to Track objects via yt-dlp in parallel. Uses the music plugin's ``_resolve_tracks`` and ``_Track`` to build a playlist. Returns a list of ``_Track`` objects (empty on failure). """ music_mod = bot.registry._modules.get("music") if not music_mod: return [] loop = asyncio.get_running_loop() resolve = music_mod._resolve_tracks Track = music_mod._Track pool = _get_yt_pool() async def _resolve_one(query: str): try: pairs = await loop.run_in_executor( pool, resolve, f"ytsearch1:{query}", 1, ) if pairs: url, title = pairs[0] return Track(url=url, title=title, requester=requester) except Exception: log.debug("lastfm: resolve failed for %r", query) return None tasks = [_resolve_one(q) for q in queries] results = await asyncio.gather(*tasks) return [t for t in results if t is not None] _yt_pool = None def _get_yt_pool(): """Lazy-init a shared ThreadPoolExecutor for yt-dlp resolution.""" global _yt_pool if _yt_pool is None: from concurrent.futures import ThreadPoolExecutor _yt_pool = ThreadPoolExecutor(max_workers=4) return _yt_pool async def _display_results(bot, message, similar: list[dict], similar_artists: list[dict], mb_results: list[dict], artist: str, title: str) -> None: """Format and display discovery results (list mode).""" if similar: lines = [f"Similar to {artist} - {title}:"] for t in similar[:8]: t_artist = t.get("artist", {}).get("name", "") t_name = t.get("name", "?") match = _fmt_match(t.get("match", "")) suffix = f" ({match})" if match else "" lines.append(f" {t_artist} - {t_name}{suffix}") await bot.long_reply(message, lines, label="similar tracks") return search_artist = artist or title if similar_artists: lines = [f"Similar to {search_artist}:"] for a in similar_artists[:8]: name = a.get("name", "?") match = _fmt_match(a.get("match", "")) suffix = f" ({match})" if match else "" lines.append(f" {name}{suffix}") await bot.long_reply(message, lines, label="similar artists") return if mb_results: lines = [f"Similar to {search_artist}:"] for r in mb_results[:8]: lines.append(f" {r['artist']} - {r['title']}") await bot.long_reply(message, lines, label="similar tracks") return await bot.reply(message, f"No similar artists found for '{search_artist}'") # -- Commands ---------------------------------------------------------------- @command("similar", help="Music: !similar [list] [artist] -- discover & play similar music") async def cmd_similar(bot, message): """Discover and play similar music. Usage: !similar Discover + play similar to current track !similar Discover + play similar to named artist !similar list Show similar (display only) !similar list Show similar for named artist """ api_key = _get_api_key(bot) parts = message.text.split(None, 2) # !similar list [artist] list_mode = len(parts) >= 2 and parts[1].lower() == "list" if list_mode: query = parts[2].strip() if len(parts) > 2 else "" else: query = parts[1].strip() if len(parts) > 1 else "" loop = asyncio.get_running_loop() # Resolve artist from query or current track if query: artist = query title = "" else: artist, title = _current_meta(bot) if not artist and not title: await bot.reply(message, "Nothing playing and no artist given") return # -- Last.fm path -- similar: list[dict] = [] similar_artists: list[dict] = [] if api_key: # Try track-level similarity first if we have both artist + title if artist and title: similar = await loop.run_in_executor( None, _get_similar_tracks, api_key, artist, title, ) # Fall back to artist-level similarity if not similar: search_artist = artist or title similar_artists = await loop.run_in_executor( None, _get_similar_artists, api_key, search_artist, ) # -- MusicBrainz fallback -- mb_results: list[dict] = [] if not similar and not similar_artists: search_artist = artist or title try: from plugins._musicbrainz import ( mb_artist_tags, mb_find_similar_recordings, mb_search_artist, ) mbid = await loop.run_in_executor( None, mb_search_artist, search_artist, ) if mbid: tags = await loop.run_in_executor(None, mb_artist_tags, mbid) if tags: mb_results = await loop.run_in_executor( None, mb_find_similar_recordings, search_artist, tags, 20, ) except Exception: log.warning("lastfm: MusicBrainz fallback failed", exc_info=True) # Nothing found at all if not similar and not similar_artists and not mb_results: search_artist = artist or title await bot.reply(message, f"No similar artists found for '{search_artist}'") return # -- List mode (display only) -- if list_mode: await _display_results(bot, message, similar, similar_artists, mb_results, artist, title) return # -- Play mode (default): build playlist and transition -- search_artist = artist or title queries = _search_queries(similar, similar_artists, mb_results, limit=10) if not queries: await bot.reply(message, f"No similar artists found for '{search_artist}'") return music_mod = bot.registry._modules.get("music") if not music_mod: # No music plugin -- fall back to display await _display_results(bot, message, similar, similar_artists, mb_results, artist, title) return await bot.reply(message, f"Discovering similar to {search_artist}...") tracks = await _resolve_playlist(bot, queries, message.nick) if not tracks: await bot.reply(message, "No playable tracks resolved") return # Transition: fade out current, load new playlist ps = music_mod._ps(bot) await music_mod._fade_and_cancel(bot, duration=3.0) ps["queue"].clear() ps["current"] = None ps["queue"] = list(tracks) music_mod._ensure_loop(bot, fade_in=True) await bot.reply(message, f"Playing {len(tracks)} similar tracks for {search_artist}") @command("tags", help="Music: !tags [artist] -- show genre tags") async def cmd_tags(bot, message): """Show genre/style tags for an artist. Usage: !tags Tags for currently playing artist !tags Tags for named artist """ api_key = _get_api_key(bot) parts = message.text.split(None, 1) query = parts[1].strip() if len(parts) > 1 else "" loop = asyncio.get_running_loop() if query: artist = query else: artist, title = _current_meta(bot) artist = artist or title if not artist: await bot.reply(message, "Nothing playing and no artist given") return # -- Last.fm path -- tags = [] if api_key: tags = await loop.run_in_executor( None, _get_top_tags, api_key, artist, ) # -- MusicBrainz fallback -- if not tags: try: from plugins._musicbrainz import mb_artist_tags, mb_search_artist mbid = await loop.run_in_executor(None, mb_search_artist, artist) if mbid: mb_tags = await loop.run_in_executor( None, mb_artist_tags, mbid, ) if mb_tags: await bot.reply(message, f"{artist}: {', '.join(mb_tags)}") return except Exception: log.warning("lastfm: MusicBrainz tag fallback failed", exc_info=True) if not tags: await bot.reply(message, f"No tags found for '{artist}'") return # Show top tags with counts tag_names = [t.get("name", "?") for t in tags[:10] if t.get("name")] await bot.reply(message, f"{artist}: {', '.join(tag_names)}")