"""Plugin: music discovery via Last.fm API.""" from __future__ import annotations import json import logging import os import random from urllib.parse import urlencode from derp.plugin import command log = logging.getLogger(__name__) _BASE = "https://ws.audioscrobbler.com/2.0/" # -- Config ------------------------------------------------------------------ def _get_api_key(bot) -> str: """Resolve Last.fm API key from env or config.""" return (os.environ.get("LASTFM_API_KEY", "") or bot.config.get("lastfm", {}).get("api_key", "")) # -- API helpers ------------------------------------------------------------- def _api_call(api_key: str, method: str, **params) -> dict: """Blocking Last.fm API call. Run in executor.""" from derp.http import urlopen qs = urlencode({ "method": method, "api_key": api_key, "format": "json", **params, }) url = f"{_BASE}?{qs}" try: resp = urlopen(url, timeout=10) return json.loads(resp.read().decode()) except Exception: log.exception("lastfm: API call failed: %s", method) return {} def _get_similar_artists(api_key: str, artist: str, limit: int = 10) -> list[dict]: """Fetch similar artists for a given artist name.""" data = _api_call(api_key, "artist.getSimilar", artist=artist, limit=str(limit)) artists = data.get("similarartists", {}).get("artist", []) if isinstance(artists, dict): artists = [artists] return artists def _get_top_tags(api_key: str, artist: str) -> list[dict]: """Fetch top tags for an artist.""" data = _api_call(api_key, "artist.getTopTags", artist=artist) tags = data.get("toptags", {}).get("tag", []) if isinstance(tags, dict): tags = [tags] return tags def _get_similar_tracks(api_key: str, artist: str, track: str, limit: int = 10) -> list[dict]: """Fetch similar tracks for a given artist + track.""" data = _api_call(api_key, "track.getSimilar", artist=artist, track=track, limit=str(limit)) tracks = data.get("similartracks", {}).get("track", []) if isinstance(tracks, dict): tracks = [tracks] return tracks def _search_track(api_key: str, query: str, limit: int = 5) -> list[dict]: """Search Last.fm for tracks matching a query.""" data = _api_call(api_key, "track.search", track=query, limit=str(limit)) results = data.get("results", {}).get("trackmatches", {}).get("track", []) if isinstance(results, dict): results = [results] return results # -- Metadata extraction ----------------------------------------------------- def _current_meta(bot) -> tuple[str, str]: """Extract artist and title from the currently playing track. Returns (artist, title). Either or both may be empty. Tries the music plugin's current track metadata, falling back to splitting the title on common separators. """ music_ps = bot._pstate.get("music", {}) current = music_ps.get("current") if current is None: return ("", "") raw_title = current.title or "" # Try common "Artist - Title" patterns for sep in (" - ", " -- ", " | ", " ~ "): if sep in raw_title: parts = raw_title.split(sep, 1) return (parts[0].strip(), parts[1].strip()) # No separator -- treat whole thing as a search query return ("", raw_title) # -- Formatting -------------------------------------------------------------- def _fmt_match(m: float | str) -> str: """Format a Last.fm match score as a percentage.""" try: return f"{float(m) * 100:.0f}%" except (ValueError, TypeError): return "" # -- Commands ---------------------------------------------------------------- @command("similar", help="Music: !similar [artist|play] -- find similar music") async def cmd_similar(bot, message): """Find similar artists or tracks. Usage: !similar Similar to currently playing track !similar Similar artists to named artist !similar play Queue a random similar track !similar play Queue a similar track for named artist """ api_key = _get_api_key(bot) if not api_key: await bot.reply(message, "Last.fm API key not configured") return parts = message.text.split(None, 2) # !similar play [artist] play_mode = len(parts) >= 2 and parts[1].lower() == "play" if play_mode: query = parts[2].strip() if len(parts) > 2 else "" else: query = parts[1].strip() if len(parts) > 1 else "" import asyncio loop = asyncio.get_running_loop() # Resolve artist from query or current track if query: artist = query title = "" else: artist, title = _current_meta(bot) if not artist and not title: await bot.reply(message, "Nothing playing and no artist given") return # Try track-level similarity first if we have both artist + title similar = [] if artist and title: similar = await loop.run_in_executor( None, _get_similar_tracks, api_key, artist, title, ) # Fall back to artist-level similarity if not similar: search_artist = artist or title similar_artists = await loop.run_in_executor( None, _get_similar_artists, api_key, search_artist, ) if not similar_artists: await bot.reply(message, f"No similar artists found for '{search_artist}'") return if play_mode: # Pick a random similar artist and search YouTube pick = random.choice(similar_artists[:10]) pick_name = pick.get("name", "") if not pick_name: await bot.reply(message, "No playable result found") return # Inject a !play command with a YouTube search message.text = f"!play {pick_name}" music_mod = bot.registry._modules.get("music") if music_mod: await music_mod.cmd_play(bot, message) return # Display similar artists lines = [f"Similar to {search_artist}:"] for a in similar_artists[:8]: name = a.get("name", "?") match = _fmt_match(a.get("match", "")) suffix = f" ({match})" if match else "" lines.append(f" {name}{suffix}") await bot.long_reply(message, lines, label="similar artists") return # Track-level results if play_mode: pick = random.choice(similar[:10]) pick_artist = pick.get("artist", {}).get("name", "") pick_title = pick.get("name", "") search = f"{pick_artist} {pick_title}".strip() if not search: await bot.reply(message, "No playable result found") return message.text = f"!play {search}" music_mod = bot.registry._modules.get("music") if music_mod: await music_mod.cmd_play(bot, message) return # Display similar tracks lines = [f"Similar to {artist} - {title}:"] for t in similar[:8]: t_artist = t.get("artist", {}).get("name", "") t_name = t.get("name", "?") match = _fmt_match(t.get("match", "")) suffix = f" ({match})" if match else "" lines.append(f" {t_artist} - {t_name}{suffix}") await bot.long_reply(message, lines, label="similar tracks") @command("tags", help="Music: !tags [artist] -- show genre tags") async def cmd_tags(bot, message): """Show genre/style tags for an artist. Usage: !tags Tags for currently playing artist !tags Tags for named artist """ api_key = _get_api_key(bot) if not api_key: await bot.reply(message, "Last.fm API key not configured") return parts = message.text.split(None, 1) query = parts[1].strip() if len(parts) > 1 else "" import asyncio loop = asyncio.get_running_loop() if query: artist = query else: artist, title = _current_meta(bot) artist = artist or title if not artist: await bot.reply(message, "Nothing playing and no artist given") return tags = await loop.run_in_executor( None, _get_top_tags, api_key, artist, ) if not tags: await bot.reply(message, f"No tags found for '{artist}'") return # Show top tags with counts tag_names = [t.get("name", "?") for t in tags[:10] if t.get("name")] await bot.reply(message, f"{artist}: {', '.join(tag_names)}")