Files
derp/plugins/lastfm.py
user 28f4c63e99
Some checks failed
CI / gitleaks (push) Failing after 3s
CI / lint (push) Failing after 23s
CI / test (3.11) (push) Has been skipped
CI / test (3.12) (push) Has been skipped
CI / test (3.13) (push) Has been skipped
CI / build (push) Has been skipped
fix: delegate !similar playback to music bot, not calling bot
When merlin ran !similar, music operations (fade, queue, play loop)
targeted merlin instead of derp, causing audio to play over derp's
stream. New _music_bot() helper resolves the DJ bot via active state
or plugin filter config, so playback always routes to derp.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 10:39:38 +01:00

493 lines
16 KiB
Python

"""Plugin: music discovery via Last.fm API."""
from __future__ import annotations
import asyncio
import json
import logging
import os
import random
from urllib.parse import urlencode
from derp.plugin import command
log = logging.getLogger(__name__)
_BASE = "https://ws.audioscrobbler.com/2.0/"
# -- Config ------------------------------------------------------------------
def _get_api_key(bot) -> str:
"""Resolve Last.fm API key from env or config."""
return (os.environ.get("LASTFM_API_KEY", "")
or bot.config.get("lastfm", {}).get("api_key", ""))
# -- API helpers -------------------------------------------------------------
def _api_call(api_key: str, method: str, **params) -> dict:
"""Blocking Last.fm API call. Run in executor."""
from derp.http import urlopen
qs = urlencode({
"method": method,
"api_key": api_key,
"format": "json",
**params,
})
url = f"{_BASE}?{qs}"
try:
resp = urlopen(url, timeout=10)
return json.loads(resp.read().decode())
except Exception:
log.exception("lastfm: API call failed: %s", method)
return {}
def _get_similar_artists(api_key: str, artist: str,
limit: int = 10) -> list[dict]:
"""Fetch similar artists for a given artist name."""
data = _api_call(api_key, "artist.getSimilar",
artist=artist, limit=str(limit))
artists = data.get("similarartists", {}).get("artist", [])
if isinstance(artists, dict):
artists = [artists]
return artists
def _get_top_tags(api_key: str, artist: str) -> list[dict]:
"""Fetch top tags for an artist."""
data = _api_call(api_key, "artist.getTopTags", artist=artist)
tags = data.get("toptags", {}).get("tag", [])
if isinstance(tags, dict):
tags = [tags]
return tags
def _get_similar_tracks(api_key: str, artist: str, track: str,
limit: int = 10) -> list[dict]:
"""Fetch similar tracks for a given artist + track."""
data = _api_call(api_key, "track.getSimilar",
artist=artist, track=track, limit=str(limit))
tracks = data.get("similartracks", {}).get("track", [])
if isinstance(tracks, dict):
tracks = [tracks]
return tracks
def _search_track(api_key: str, query: str,
limit: int = 5) -> list[dict]:
"""Search Last.fm for tracks matching a query."""
data = _api_call(api_key, "track.search",
track=query, limit=str(limit))
results = data.get("results", {}).get("trackmatches", {}).get("track", [])
if isinstance(results, dict):
results = [results]
return results
# -- Metadata extraction -----------------------------------------------------
def _parse_title(raw_title: str) -> tuple[str, str]:
"""Split a raw track title into (artist, title).
Tries common separators: `` - ``, `` -- ``, `` | ``, `` ~ ``.
Returns ``("", raw_title)`` if no separator is found.
"""
for sep in (" - ", " -- ", " | ", " ~ "):
if sep in raw_title:
parts = raw_title.split(sep, 1)
return (parts[0].strip(), parts[1].strip())
return ("", raw_title)
def _music_bot(bot):
"""Return the bot instance that owns music playback.
Checks the calling bot first, then peer bots via the shared registry.
Returns the first bot with an active music state, or ``bot`` as fallback.
"""
candidates = [bot]
for peer in getattr(getattr(bot, "registry", None), "_bots", {}).values():
if peer is not bot:
candidates.append(peer)
for b in candidates:
music_ps = getattr(b, "_pstate", {}).get("music", {})
if music_ps.get("current") is not None or music_ps.get("queue"):
return b
# No active music state -- prefer a bot that allows the music plugin
for b in candidates:
only = getattr(b, "_only_plugins", None)
if only is not None and "music" in only:
return b
return bot
def _current_meta(bot) -> tuple[str, str]:
"""Extract artist and title from the currently playing track.
Returns (artist, title). Either or both may be empty.
Checks the music bot (via ``_music_bot``) for now-playing metadata.
"""
mb = _music_bot(bot)
music_ps = getattr(mb, "_pstate", {}).get("music", {})
current = music_ps.get("current")
if current is not None:
raw_title = current.title or ""
if raw_title:
return _parse_title(raw_title)
return ("", "")
# -- Discovery orchestrator --------------------------------------------------
async def discover_similar(bot, last_track_title: str) -> tuple[str, str] | None:
"""Find a similar track via Last.fm or MusicBrainz fallback.
Returns ``(artist, title)`` or ``None`` if nothing found.
"""
artist, title = _parse_title(last_track_title)
loop = asyncio.get_running_loop()
# -- Last.fm path --
api_key = _get_api_key(bot)
if api_key and artist:
try:
similar = await loop.run_in_executor(
None, _get_similar_tracks, api_key, artist, title, 20,
)
if similar:
pick = random.choice(similar)
pick_artist = pick.get("artist", {}).get("name", "")
pick_title = pick.get("name", "")
if pick_artist and pick_title:
return (pick_artist, pick_title)
except Exception:
log.warning("lastfm: discover via Last.fm failed", exc_info=True)
# -- MusicBrainz fallback --
if artist:
try:
from plugins._musicbrainz import (
mb_artist_tags,
mb_find_similar_recordings,
mb_search_artist,
)
mbid = await loop.run_in_executor(None, mb_search_artist, artist)
if mbid:
tags = await loop.run_in_executor(None, mb_artist_tags, mbid)
if tags:
picks = await loop.run_in_executor(
None, mb_find_similar_recordings, artist, tags, 20,
)
if picks:
pick = random.choice(picks)
return (pick["artist"], pick["title"])
except Exception:
log.warning("lastfm: discover via MusicBrainz failed",
exc_info=True)
return None
# -- Formatting --------------------------------------------------------------
def _fmt_match(m: float | str) -> str:
"""Format a Last.fm match score as a percentage."""
try:
return f"{float(m) * 100:.0f}%"
except (ValueError, TypeError):
return ""
# -- Playlist helpers --------------------------------------------------------
def _search_queries(similar: list[dict], similar_artists: list[dict],
mb_results: list[dict], limit: int = 10) -> list[str]:
"""Normalize discovery results into YouTube search strings.
Processes track results (``{artist: {name}, name}``), artist results
(``{name}``), and MusicBrainz results (``{artist, title}``) into a
flat list of search query strings, up to *limit*.
"""
queries: list[str] = []
for t in similar:
a = t.get("artist", {}).get("name", "")
n = t.get("name", "")
q = f"{a} {n}".strip()
if q:
queries.append(q)
for a in similar_artists:
name = a.get("name", "")
if name:
queries.append(name)
for r in mb_results:
q = f"{r.get('artist', '')} {r.get('title', '')}".strip()
if q:
queries.append(q)
return queries[:limit]
async def _resolve_playlist(bot, queries: list[str],
requester: str) -> list:
"""Resolve search queries to Track objects via yt-dlp in parallel.
Uses the music plugin's ``_resolve_tracks`` and ``_Track`` to build
a playlist. Returns a list of ``_Track`` objects (empty on failure).
"""
music_mod = bot.registry._modules.get("music")
if not music_mod:
return []
loop = asyncio.get_running_loop()
resolve = music_mod._resolve_tracks
Track = music_mod._Track
pool = _get_yt_pool()
async def _resolve_one(query: str):
try:
pairs = await loop.run_in_executor(
pool, resolve, f"ytsearch1:{query}", 1,
)
if pairs:
url, title = pairs[0]
return Track(url=url, title=title, requester=requester)
except Exception:
log.debug("lastfm: resolve failed for %r", query)
return None
tasks = [_resolve_one(q) for q in queries]
results = await asyncio.gather(*tasks)
return [t for t in results if t is not None]
_yt_pool = None
def _get_yt_pool():
"""Lazy-init a shared ThreadPoolExecutor for yt-dlp resolution."""
global _yt_pool
if _yt_pool is None:
from concurrent.futures import ThreadPoolExecutor
_yt_pool = ThreadPoolExecutor(max_workers=4)
return _yt_pool
async def _display_results(bot, message, similar: list[dict],
similar_artists: list[dict],
mb_results: list[dict],
artist: str, title: str) -> None:
"""Format and display discovery results (list mode)."""
if similar:
lines = [f"Similar to {artist} - {title}:"]
for t in similar[:8]:
t_artist = t.get("artist", {}).get("name", "")
t_name = t.get("name", "?")
match = _fmt_match(t.get("match", ""))
suffix = f" ({match})" if match else ""
lines.append(f" {t_artist} - {t_name}{suffix}")
await bot.long_reply(message, lines, label="similar tracks")
return
search_artist = artist or title
if similar_artists:
lines = [f"Similar to {search_artist}:"]
for a in similar_artists[:8]:
name = a.get("name", "?")
match = _fmt_match(a.get("match", ""))
suffix = f" ({match})" if match else ""
lines.append(f" {name}{suffix}")
await bot.long_reply(message, lines, label="similar artists")
return
if mb_results:
lines = [f"Similar to {search_artist}:"]
for r in mb_results[:8]:
lines.append(f" {r['artist']} - {r['title']}")
await bot.long_reply(message, lines, label="similar tracks")
return
await bot.reply(message, f"No similar artists found for '{search_artist}'")
# -- Commands ----------------------------------------------------------------
@command("similar", help="Music: !similar [list] [artist] -- discover & play similar music")
async def cmd_similar(bot, message):
"""Discover and play similar music.
Usage:
!similar Discover + play similar to current track
!similar <artist> Discover + play similar to named artist
!similar list Show similar (display only)
!similar list <artist> Show similar for named artist
"""
api_key = _get_api_key(bot)
parts = message.text.split(None, 2)
# !similar list [artist]
list_mode = len(parts) >= 2 and parts[1].lower() == "list"
if list_mode:
query = parts[2].strip() if len(parts) > 2 else ""
else:
query = parts[1].strip() if len(parts) > 1 else ""
loop = asyncio.get_running_loop()
# Resolve artist from query or current track
if query:
artist = query
title = ""
else:
artist, title = _current_meta(bot)
if not artist and not title:
await bot.reply(message, "Nothing playing and no artist given")
return
# -- Last.fm path --
similar: list[dict] = []
similar_artists: list[dict] = []
if api_key:
# Try track-level similarity first if we have both artist + title
if artist and title:
similar = await loop.run_in_executor(
None, _get_similar_tracks, api_key, artist, title,
)
# Fall back to artist-level similarity
if not similar:
search_artist = artist or title
similar_artists = await loop.run_in_executor(
None, _get_similar_artists, api_key, search_artist,
)
# -- MusicBrainz fallback --
mb_results: list[dict] = []
if not similar and not similar_artists:
search_artist = artist or title
try:
from plugins._musicbrainz import (
mb_artist_tags,
mb_find_similar_recordings,
mb_search_artist,
)
mbid = await loop.run_in_executor(
None, mb_search_artist, search_artist,
)
if mbid:
tags = await loop.run_in_executor(None, mb_artist_tags, mbid)
if tags:
mb_results = await loop.run_in_executor(
None, mb_find_similar_recordings, search_artist,
tags, 20,
)
except Exception:
log.warning("lastfm: MusicBrainz fallback failed", exc_info=True)
# Nothing found at all
if not similar and not similar_artists and not mb_results:
search_artist = artist or title
await bot.reply(message, f"No similar artists found for '{search_artist}'")
return
# -- List mode (display only) --
if list_mode:
await _display_results(bot, message, similar, similar_artists,
mb_results, artist, title)
return
# -- Play mode (default): build playlist and transition --
search_artist = artist or title
queries = _search_queries(similar, similar_artists, mb_results, limit=10)
if not queries:
await bot.reply(message, f"No similar artists found for '{search_artist}'")
return
music_mod = bot.registry._modules.get("music")
if not music_mod:
# No music plugin -- fall back to display
await _display_results(bot, message, similar, similar_artists,
mb_results, artist, title)
return
await bot.reply(message, f"Discovering similar to {search_artist}...")
tracks = await _resolve_playlist(bot, queries, message.nick)
if not tracks:
await bot.reply(message, "No playable tracks resolved")
return
# Transition on the music bot (derp), not the calling bot (may be merlin)
dj = _music_bot(bot)
ps = music_mod._ps(dj)
await music_mod._fade_and_cancel(dj, duration=3.0)
ps["queue"].clear()
ps["current"] = None
ps["queue"] = list(tracks)
music_mod._ensure_loop(dj, fade_in=True)
await bot.reply(message, f"Playing {len(tracks)} similar tracks for {search_artist}")
@command("tags", help="Music: !tags [artist] -- show genre tags")
async def cmd_tags(bot, message):
"""Show genre/style tags for an artist.
Usage:
!tags Tags for currently playing artist
!tags <artist> Tags for named artist
"""
api_key = _get_api_key(bot)
parts = message.text.split(None, 1)
query = parts[1].strip() if len(parts) > 1 else ""
loop = asyncio.get_running_loop()
if query:
artist = query
else:
artist, title = _current_meta(bot)
artist = artist or title
if not artist:
await bot.reply(message, "Nothing playing and no artist given")
return
# -- Last.fm path --
tags = []
if api_key:
tags = await loop.run_in_executor(
None, _get_top_tags, api_key, artist,
)
# -- MusicBrainz fallback --
if not tags:
try:
from plugins._musicbrainz import mb_artist_tags, mb_search_artist
mbid = await loop.run_in_executor(None, mb_search_artist, artist)
if mbid:
mb_tags = await loop.run_in_executor(
None, mb_artist_tags, mbid,
)
if mb_tags:
await bot.reply(message, f"{artist}: {', '.join(mb_tags)}")
return
except Exception:
log.warning("lastfm: MusicBrainz tag fallback failed",
exc_info=True)
if not tags:
await bot.reply(message, f"No tags found for '{artist}'")
return
# Show top tags with counts
tag_names = [t.get("name", "?") for t in tags[:10] if t.get("name")]
await bot.reply(message, f"{artist}: {', '.join(tag_names)}")